Skip to main content

acktor/
codec.rs

1//! Codec traits for encoding and decoding remote messages.
2//!
3//! This module provides the [`Encode`] and [`Decode`] traits along with implementations for
4//! primitive types, standard library containers, and acktor types.
5//!
6
7use std::sync::Arc;
8
9use bytes::{Bytes, BytesMut};
10
11use crate::actor::{Actor, RemoteAddressable};
12use crate::address::{Address, Recipient, RemoteMailbox, RemoteProxy, SenderInfo};
13use crate::message::{Message, MessageId};
14
15#[cfg(feature = "derive")]
16#[cfg_attr(docsrs, doc(cfg(feature = "derive")))]
17pub use acktor_derive::{Decode, Encode};
18
19mod error;
20pub use error::{DecodeError, EncodeError};
21
22mod table;
23pub use table::{Codec, CodecTable, MessageCodec};
24
25mod control_message;
26mod ipc_message;
27
28mod protobuf_helper;
29
30mod common_codec;
31#[cfg(not(feature = "prost-codec"))]
32mod default_codec;
33#[cfg(feature = "prost-codec")]
34mod prost_codec;
35
36/// Context for encoding messages.
37pub trait EncodeContext {
38    /// Registers an actor with its [`RemoteMailbox`].
39    ///
40    /// The actor becomes reachable from other processes after registration.
41    fn register(&self, actor: RemoteMailbox) -> Result<(), EncodeError>;
42}
43
44/// Context for decoding messages.
45pub trait DecodeContext {
46    /// Returns the [`RemoteProxy`] associated with this context, if any.
47    fn remote_proxy(&self) -> Option<Arc<dyn RemoteProxy + Send + Sync>>;
48}
49
50/// Describes how to encode a message.
51pub trait Encode {
52    /// Returns the number of bytes this message will encode to.
53    fn encoded_len(&self) -> usize;
54
55    /// Encodes the message into the provided buffer.
56    ///
57    /// The buffer must have at least `self.encoded_len()` bytes of capacity. If not, the encoding
58    /// may or may not fail with an error, depending on the implementation.
59    fn encode(
60        &self,
61        buf: &mut BytesMut,
62        ctx: Option<&dyn EncodeContext>,
63    ) -> Result<(), EncodeError>;
64
65    /// Encodes the message into a freshly allocated [`Bytes`].
66    fn encode_to_bytes(&self, ctx: Option<&dyn EncodeContext>) -> Result<Bytes, EncodeError> {
67        let mut buf = BytesMut::with_capacity(self.encoded_len());
68        self.encode(&mut buf, ctx)?;
69
70        Ok(buf.freeze())
71    }
72}
73
74/// Describes how to decode a message.
75pub trait Decode {
76    /// Decodes the message from the provided buffer.
77    fn decode(buf: Bytes, ctx: Option<&dyn DecodeContext>) -> Result<Self, DecodeError>
78    where
79        Self: Sized;
80}
81
82impl<A> Address<A>
83where
84    A: Actor + RemoteAddressable,
85{
86    pub fn register(&self, ctx: &dyn EncodeContext) -> Result<(), EncodeError> {
87        let actor_id = self.index();
88
89        if actor_id.is_remote() {
90            Err(EncodeError::EncodeRemoteAddress)
91        } else {
92            ctx.register(
93                self.remote_mailbox()
94                    .ok_or(EncodeError::NotRemoteAddressable)?,
95            )
96        }
97    }
98
99    pub fn new_with_decode_context(
100        index: u64,
101        ctx: &dyn DecodeContext,
102    ) -> Result<Self, DecodeError> {
103        let proxy = ctx.remote_proxy().ok_or(DecodeError::MissingRemoteProxy)?;
104        Ok(Address::new_remote(index, proxy))
105    }
106}
107
108impl<A> Encode for Address<A>
109where
110    A: Actor + RemoteAddressable,
111{
112    #[inline]
113    fn encoded_len(&self) -> usize {
114        prost::Message::encoded_len(&self.index().as_local())
115    }
116
117    #[inline]
118    fn encode(
119        &self,
120        buf: &mut BytesMut,
121        ctx: Option<&dyn EncodeContext>,
122    ) -> Result<(), EncodeError> {
123        // auto-register the address if it is an local address
124        self.register(ctx.ok_or(EncodeError::MissingEncodeContext)?)?;
125        prost::Message::encode(&self.index().as_local(), buf).map_err(Into::into)
126    }
127}
128
129impl<A> Decode for Address<A>
130where
131    A: Actor + RemoteAddressable,
132{
133    #[inline]
134    fn decode(buf: Bytes, ctx: Option<&dyn DecodeContext>) -> Result<Self, DecodeError> {
135        let actor_id = <u64 as prost::Message>::decode(buf)?;
136        Self::new_with_decode_context(actor_id, ctx.ok_or(DecodeError::MissingDecodeContext)?)
137    }
138}
139
140impl<M> Recipient<M>
141where
142    M: Message,
143{
144    pub fn register(&self, ctx: &dyn EncodeContext) -> Result<(), EncodeError> {
145        let actor_id = self.index();
146
147        if actor_id.is_remote() {
148            Err(EncodeError::EncodeRemoteAddress)
149        } else {
150            ctx.register(
151                self.remote_mailbox()
152                    .ok_or(EncodeError::NotRemoteAddressable)?,
153            )
154        }
155    }
156
157    pub fn new_with_decode_context(index: u64, ctx: &dyn DecodeContext) -> Result<Self, DecodeError>
158    where
159        M: MessageId + Encode,
160        M::Result: Decode,
161    {
162        let proxy = ctx.remote_proxy().ok_or(DecodeError::MissingRemoteProxy)?;
163        Ok(Recipient::new_remote(index, proxy))
164    }
165}
166
167impl<M> Encode for Recipient<M>
168where
169    M: Message + MessageId + Encode,
170    M::Result: Decode,
171{
172    #[inline]
173    fn encoded_len(&self) -> usize {
174        prost::Message::encoded_len(&self.index().as_local())
175    }
176
177    #[inline]
178    fn encode(
179        &self,
180        buf: &mut BytesMut,
181        ctx: Option<&dyn EncodeContext>,
182    ) -> Result<(), EncodeError> {
183        // auto-register the recipient if it is an local address
184        self.register(ctx.ok_or(EncodeError::MissingEncodeContext)?)?;
185        prost::Message::encode(&self.index().as_local(), buf).map_err(Into::into)
186    }
187}
188
189impl<M> Decode for Recipient<M>
190where
191    M: Message + MessageId + Encode,
192    M::Result: Decode,
193{
194    #[inline]
195    fn decode(buf: Bytes, ctx: Option<&dyn DecodeContext>) -> Result<Self, DecodeError> {
196        let actor_id = <u64 as prost::Message>::decode(buf)?;
197        Self::new_with_decode_context(actor_id, ctx.ok_or(DecodeError::MissingDecodeContext)?)
198    }
199}
200
201#[cfg(test)]
202mod tests {
203    use std::fmt::Debug;
204    use std::sync::Arc;
205
206    use pretty_assertions::assert_eq;
207
208    use super::*;
209    use crate::utils::test_utils::{Dummy, DummyProxy, Ping, make_address};
210
211    fn roundtrip<T>(value: T) -> anyhow::Result<()>
212    where
213        T: Encode + Decode + PartialEq + Debug,
214    {
215        let expected_len = value.encoded_len();
216        let mut buf = BytesMut::with_capacity(expected_len);
217        value.encode(&mut buf, None)?;
218        let buf = buf.freeze();
219        assert_eq!(buf.len(), expected_len);
220
221        let direct = value.encode_to_bytes(None)?;
222        assert_eq!(direct.len(), expected_len);
223        assert_eq!(buf, direct);
224
225        let decoded = T::decode(buf, None)?;
226        assert_eq!(value, decoded);
227
228        Ok(())
229    }
230
231    #[test]
232    fn test_primitive() -> anyhow::Result<()> {
233        roundtrip(())?;
234        roundtrip(true)?;
235        roundtrip(42_u8)?;
236        roundtrip(4242_u16)?;
237        roundtrip(424242_u32)?;
238        roundtrip(42424242_u64)?;
239        roundtrip(4242424242_usize)?;
240        roundtrip(-42_i8)?;
241        roundtrip(-4242_i16)?;
242        roundtrip(-424242_i32)?;
243        roundtrip(-42424242_i64)?;
244        roundtrip(-4242424242_isize)?;
245        roundtrip(42.42_f32)?;
246        roundtrip(42.42_f64)?;
247        roundtrip("hello".to_string())?;
248
249        Ok(())
250    }
251
252    #[test]
253    fn test_vector() -> anyhow::Result<()> {
254        roundtrip(vec![true, false, true])?;
255        roundtrip(vec![42_u8, 42_u8, 42_u8])?;
256        roundtrip(vec![4242_u16, 4242_u16, 4242_u16])?;
257        roundtrip(vec![424242_u32, 424242_u32, 424242_u32])?;
258        roundtrip(vec![42424242_u64, 42424242_u64, 42424242_u64])?;
259        roundtrip(vec![42424242_usize, 42424242_usize, 42424242_usize])?;
260        roundtrip(vec![-42_i8, -42_i8, -42_i8])?;
261        roundtrip(vec![-4242_i16, -4242_i16, -4242_i16])?;
262        roundtrip(vec![-424242_i32, -424242_i32, -424242_i32])?;
263        roundtrip(vec![-42424242_i64, -42424242_i64, -42424242_i64])?;
264        roundtrip(vec![-42424242_isize, -42424242_isize, -42424242_isize])?;
265        roundtrip(vec![42.42_f32, 42.42_f32, 42.42_f32])?;
266        roundtrip(vec![42.42_f64, 42.42_f64, 42.42_f64])?;
267        // empty vector
268        roundtrip(Vec::<bool>::new())?;
269        roundtrip(Vec::<u16>::new())?;
270        roundtrip(Vec::<f32>::new())?;
271        roundtrip(Vec::<usize>::new())?;
272        roundtrip(Vec::<isize>::new())?;
273
274        Ok(())
275    }
276
277    #[test]
278    fn test_option() -> anyhow::Result<()> {
279        roundtrip(None::<u16>)?;
280        roundtrip(Some(4242_u16))?;
281
282        Ok(())
283    }
284
285    #[test]
286    fn test_result() -> anyhow::Result<()> {
287        roundtrip(Ok::<String, String>("hello".to_string()))?;
288        roundtrip(Err::<String, String>("boom".to_string()))?;
289
290        Ok(())
291    }
292
293    #[test]
294    fn test_smart_pointer() -> anyhow::Result<()> {
295        roundtrip(Box::new(vec![4242_u16, 4242_u16, 4242_u16]))?;
296        roundtrip(Arc::new(vec![4242_u16, 4242_u16, 4242_u16]))?;
297
298        Ok(())
299    }
300
301    #[test]
302    fn test_tuple() -> anyhow::Result<()> {
303        roundtrip((42_u32, "hello".to_string()))?;
304        roundtrip((-42424242_i64, true, "hello".to_string(), Some(4242_u16)))?;
305        // tuple in tuple
306        roundtrip((42_u8, (-424242_i32, "hello".to_string())))?;
307
308        #[cfg(not(feature = "prost-codec"))]
309        {
310            use crate::error::ErrorReport;
311
312            let bad: Bytes = vec![0_u8, 1_u8, 2_u8].into();
313            let result = <(u32, u32)>::decode(bad, None);
314            assert_eq!(
315                result.unwrap_err().report(),
316                "could not decode the message: missing the tuple element length"
317            );
318
319            let bad: Bytes = vec![0xff_u8, 0xff_u8, 0xff_u8, 0xff_u8, 42_u8].into();
320            let result = <(u32, u32)>::decode(bad, None);
321            assert_eq!(
322                result.unwrap_err().report(),
323                "could not decode the message: missing the tuple element data"
324            );
325        }
326
327        Ok(())
328    }
329
330    #[tokio::test]
331    async fn test_address() -> anyhow::Result<()> {
332        use crate::error::ErrorReport;
333
334        let proxy = DummyProxy::new();
335
336        let (address, _) = make_address(1);
337
338        let expected_len = address.encoded_len();
339        let mut buf = BytesMut::with_capacity(expected_len);
340        address.encode(&mut buf, proxy.encode_context())?;
341        let buf = buf.freeze();
342        assert_eq!(buf.len(), expected_len);
343
344        let direct = address.encode_to_bytes(proxy.encode_context())?;
345        assert_eq!(direct.len(), expected_len);
346        assert_eq!(buf, direct);
347
348        let decoded = Address::<Dummy>::decode(buf, proxy.decode_context())?;
349        assert_eq!(address.index().as_local(), decoded.index().as_local());
350
351        let address = Address::<Dummy>::new_remote(42, proxy.clone());
352        let result = address.encode_to_bytes(proxy.encode_context());
353        assert_eq!(
354            result.unwrap_err().report(),
355            "remote address should not be encoded into a message"
356        );
357
358        Ok(())
359    }
360
361    #[tokio::test]
362    async fn test_recipient() -> anyhow::Result<()> {
363        use crate::error::ErrorReport;
364
365        let proxy = DummyProxy::new();
366
367        let (address, _) = make_address(1);
368        let recipient: Recipient<Ping> = address.into();
369
370        let expected_len = recipient.encoded_len();
371        let mut buf = BytesMut::with_capacity(expected_len);
372        recipient.encode(&mut buf, proxy.encode_context())?;
373        let buf = buf.freeze();
374        assert_eq!(buf.len(), expected_len);
375
376        let direct = recipient.encode_to_bytes(proxy.encode_context())?;
377        assert_eq!(direct.len(), expected_len);
378        assert_eq!(buf, direct);
379
380        let decoded = Recipient::<Ping>::decode(buf, proxy.decode_context())?;
381        assert_eq!(recipient.index().as_local(), decoded.index().as_local());
382
383        let recipient = Recipient::<Ping>::new_remote(42, proxy.clone());
384        let result = recipient.encode_to_bytes(proxy.encode_context());
385        assert_eq!(
386            result.unwrap_err().report(),
387            "remote address should not be encoded into a message"
388        );
389
390        Ok(())
391    }
392}