exocore_core/framing/
capnp.rs

1use std::io;
2
3use bytes::Bytes;
4use capnp::{
5    message::{Builder, HeapAllocator},
6    traits::Owned,
7};
8use exocore_protos::{capnp, generated::MessageType};
9
10use super::{check_into_size, Error, FrameBuilder, FrameReader};
11
12/// Frame that wraps a Capnproto message
13pub struct CapnpFrame<I: FrameReader> {
14    inner: I,
15}
16
17impl<I: FrameReader> CapnpFrame<I> {
18    pub fn new(inner: I) -> Result<CapnpFrame<I>, capnp::Error> {
19        Ok(CapnpFrame { inner })
20    }
21
22    pub fn inner(&self) -> &I {
23        &self.inner
24    }
25}
26
27impl<I: FrameReader> FrameReader for CapnpFrame<I> {
28    type OwnedType = CapnpFrame<I::OwnedType>;
29
30    fn exposed_data(&self) -> &[u8] {
31        self.inner.exposed_data()
32    }
33
34    fn whole_data(&self) -> &[u8] {
35        self.inner.whole_data()
36    }
37
38    fn to_owned_frame(&self) -> Self::OwnedType {
39        let owned_inner = self.inner.to_owned_frame();
40        CapnpFrame::new(owned_inner).expect("Couldn't read owned version of self")
41    }
42}
43
44impl<I: FrameReader + Clone> Clone for CapnpFrame<I> {
45    fn clone(&self) -> Self {
46        CapnpFrame {
47            inner: self.inner.clone(),
48        }
49    }
50}
51
52/// Frame that wraps a Capnpframe with type annotation.
53pub struct TypedCapnpFrame<I: FrameReader, T>
54where
55    T: MessageType,
56{
57    inner: CapnpFrame<I>,
58    reader: capnp::message::Reader<capnp::serialize::OwnedSegments>,
59    phantom: std::marker::PhantomData<T>,
60}
61
62impl<I: FrameReader, T> TypedCapnpFrame<I, T>
63where
64    T: MessageType,
65{
66    pub fn new(data: I) -> Result<TypedCapnpFrame<I, T>, capnp::Error> {
67        let frame = CapnpFrame::new(data)?;
68        Self::from_capnp(frame)
69    }
70
71    pub fn from_capnp(capnp_frame: CapnpFrame<I>) -> Result<TypedCapnpFrame<I, T>, capnp::Error> {
72        let opts = capnp::message::ReaderOptions {
73            // This remove security limit, but we keep reusing the reader and we eventually reach
74            // that limit because of it.
75            traversal_limit_in_words: None,
76            ..Default::default()
77        };
78        let mut data = capnp_frame.exposed_data();
79        let reader = capnp::serialize::read_message(&mut data, opts)?;
80
81        Ok(TypedCapnpFrame {
82            inner: capnp_frame,
83            reader,
84            phantom: std::marker::PhantomData,
85        })
86    }
87
88    pub fn inner(&self) -> &CapnpFrame<I> {
89        &self.inner
90    }
91
92    pub fn get_reader(&self) -> Result<<T as Owned>::Reader<'_>, capnp::Error> {
93        self.reader.get_root()
94    }
95
96    pub fn to_owned(&self) -> TypedCapnpFrame<I::OwnedType, T> {
97        let inner_owned = self.inner.to_owned_frame();
98        TypedCapnpFrame::from_capnp(inner_owned).unwrap()
99    }
100}
101
102impl<I: FrameReader, T> FrameReader for TypedCapnpFrame<I, T>
103where
104    T: MessageType,
105{
106    type OwnedType = TypedCapnpFrame<CapnpFrame<I::OwnedType>, T>;
107
108    fn exposed_data(&self) -> &[u8] {
109        self.inner.exposed_data()
110    }
111
112    fn whole_data(&self) -> &[u8] {
113        self.inner.whole_data()
114    }
115
116    fn to_owned_frame(&self) -> Self::OwnedType {
117        let owned_inner = self.inner.to_owned_frame();
118        TypedCapnpFrame::new(owned_inner).expect("Couldn't read owned version of self")
119    }
120}
121
122impl<I: FrameReader + Clone, T> Clone for TypedCapnpFrame<I, T>
123where
124    T: MessageType,
125{
126    fn clone(&self) -> Self {
127        Self::from_capnp(self.inner.clone()).unwrap()
128    }
129}
130
131/// Capnproto frame builder
132pub struct CapnpFrameBuilder<T>
133where
134    T: MessageType,
135{
136    builder: Builder<HeapAllocator>,
137    phantom: std::marker::PhantomData<T>,
138}
139
140impl<T> CapnpFrameBuilder<T>
141where
142    T: MessageType,
143{
144    pub fn new() -> CapnpFrameBuilder<T> {
145        let builder = Builder::new_default();
146        CapnpFrameBuilder {
147            builder,
148            phantom: std::marker::PhantomData,
149        }
150    }
151
152    pub fn get_builder(&mut self) -> <T as capnp::traits::Owned>::Builder<'_> {
153        self.builder.get_root().unwrap()
154    }
155}
156
157impl<T> FrameBuilder for CapnpFrameBuilder<T>
158where
159    T: MessageType,
160{
161    type OwnedFrameType = TypedCapnpFrame<Bytes, T>;
162
163    fn write_to<W: io::Write>(&self, writer: &mut W) -> Result<usize, Error> {
164        let mut buffer = Vec::new();
165        capnp::serialize::write_message(&mut buffer, &self.builder)?;
166        writer.write_all(&buffer)?;
167        Ok(buffer.len())
168    }
169
170    fn write_into(&self, into: &mut [u8]) -> Result<usize, Error> {
171        let mut buffer = Vec::new();
172        capnp::serialize::write_message(&mut buffer, &self.builder)?;
173        check_into_size(buffer.len(), into)?;
174        into[0..buffer.len()].copy_from_slice(&buffer);
175        Ok(buffer.len())
176    }
177
178    fn expected_size(&self) -> Option<usize> {
179        None
180    }
181
182    fn as_owned_frame(&self) -> Self::OwnedFrameType {
183        let bytes = self.as_bytes();
184        TypedCapnpFrame::new(bytes).expect("Couldn't read just-created frame")
185    }
186}
187
188impl<T> Default for CapnpFrameBuilder<T>
189where
190    T: MessageType,
191{
192    fn default() -> Self {
193        Self::new()
194    }
195}
196
197#[cfg(test)]
198mod tests {
199    use exocore_protos::generated::data_chain_capnp::block_header;
200
201    use super::*;
202    use crate::framing::assert_builder_equals;
203
204    #[test]
205    fn assert_typed_frame_send_sync() -> anyhow::Result<()> {
206        fn test_sync<S: Send + Sync>(_sync: S) {}
207
208        let mut frame_builder = CapnpFrameBuilder::<block_header::Owned>::new();
209        let mut builder = frame_builder.get_builder();
210        builder.set_height(1234);
211
212        let frame = TypedCapnpFrame::<_, block_header::Owned>::new(frame_builder.as_bytes())?;
213        test_sync(frame);
214
215        Ok(())
216    }
217
218    #[test]
219    fn can_build_and_read() -> anyhow::Result<()> {
220        let mut frame_builder = CapnpFrameBuilder::<block_header::Owned>::new();
221        let mut builder = frame_builder.get_builder();
222        builder.set_height(1234);
223
224        assert_builder_equals(&frame_builder)?;
225        let frame_bytes = frame_builder.as_bytes();
226
227        let capnp_frame = TypedCapnpFrame::<_, block_header::Owned>::new(frame_bytes)?;
228        let reader = capnp_frame.get_reader()?;
229        assert_eq!(1234, reader.get_height());
230
231        let capnp_frame_owned = capnp_frame.to_owned();
232        let reader = capnp_frame_owned.get_reader()?;
233        assert_eq!(1234, reader.get_height());
234
235        Ok(())
236    }
237
238    #[test]
239    fn can_build_to_owned() -> anyhow::Result<()> {
240        let mut frame_builder = CapnpFrameBuilder::<block_header::Owned>::new();
241        let mut builder = frame_builder.get_builder();
242        builder.set_height(1234);
243
244        let capnp_frame = frame_builder.as_owned_frame();
245        let reader = capnp_frame.get_reader()?;
246        assert_eq!(1234, reader.get_height());
247
248        Ok(())
249    }
250}