1use std::sync::Arc;
8
9use bytes::{Bytes, BytesMut};
10
11use crate::actor::{Actor, RemoteAddressable};
12use crate::address::{Address, Recipient, RemoteMailbox, RemoteProxy, SenderInfo};
13use crate::message::{Message, MessageId};
14
15#[cfg(feature = "derive")]
16#[cfg_attr(docsrs, doc(cfg(feature = "derive")))]
17pub use acktor_derive::{Decode, Encode};
18
19mod error;
20pub use error::{DecodeError, EncodeError};
21
22mod table;
23pub use table::{Codec, CodecTable, MessageCodec};
24
25mod control_message;
26mod ipc_message;
27
28mod protobuf_helper;
29
30mod common_codec;
31#[cfg(not(feature = "prost-codec"))]
32mod default_codec;
33#[cfg(feature = "prost-codec")]
34mod prost_codec;
35
36pub trait EncodeContext {
38 fn register(&self, actor: RemoteMailbox) -> Result<(), EncodeError>;
42}
43
44pub trait DecodeContext {
46 fn remote_proxy(&self) -> Option<Arc<dyn RemoteProxy + Send + Sync>>;
48}
49
50pub trait Encode {
52 fn encoded_len(&self) -> usize;
54
55 fn encode(
60 &self,
61 buf: &mut BytesMut,
62 ctx: Option<&dyn EncodeContext>,
63 ) -> Result<(), EncodeError>;
64
65 fn encode_to_bytes(&self, ctx: Option<&dyn EncodeContext>) -> Result<Bytes, EncodeError> {
67 let mut buf = BytesMut::with_capacity(self.encoded_len());
68 self.encode(&mut buf, ctx)?;
69
70 Ok(buf.freeze())
71 }
72}
73
74pub trait Decode {
76 fn decode(buf: Bytes, ctx: Option<&dyn DecodeContext>) -> Result<Self, DecodeError>
78 where
79 Self: Sized;
80}
81
82impl<A> Address<A>
83where
84 A: Actor + RemoteAddressable,
85{
86 pub fn register(&self, ctx: &dyn EncodeContext) -> Result<(), EncodeError> {
87 let actor_id = self.index();
88
89 if actor_id.is_remote() {
90 Err(EncodeError::EncodeRemoteAddress)
91 } else {
92 ctx.register(
93 self.remote_mailbox()
94 .ok_or(EncodeError::NotRemoteAddressable)?,
95 )
96 }
97 }
98
99 pub fn new_with_decode_context(
100 index: u64,
101 ctx: &dyn DecodeContext,
102 ) -> Result<Self, DecodeError> {
103 let proxy = ctx.remote_proxy().ok_or(DecodeError::MissingRemoteProxy)?;
104 Ok(Address::new_remote(index, proxy))
105 }
106}
107
108impl<A> Encode for Address<A>
109where
110 A: Actor + RemoteAddressable,
111{
112 #[inline]
113 fn encoded_len(&self) -> usize {
114 prost::Message::encoded_len(&self.index().as_local())
115 }
116
117 #[inline]
118 fn encode(
119 &self,
120 buf: &mut BytesMut,
121 ctx: Option<&dyn EncodeContext>,
122 ) -> Result<(), EncodeError> {
123 self.register(ctx.ok_or(EncodeError::MissingEncodeContext)?)?;
125 prost::Message::encode(&self.index().as_local(), buf).map_err(Into::into)
126 }
127}
128
129impl<A> Decode for Address<A>
130where
131 A: Actor + RemoteAddressable,
132{
133 #[inline]
134 fn decode(buf: Bytes, ctx: Option<&dyn DecodeContext>) -> Result<Self, DecodeError> {
135 let actor_id = <u64 as prost::Message>::decode(buf)?;
136 Self::new_with_decode_context(actor_id, ctx.ok_or(DecodeError::MissingDecodeContext)?)
137 }
138}
139
140impl<M> Recipient<M>
141where
142 M: Message,
143{
144 pub fn register(&self, ctx: &dyn EncodeContext) -> Result<(), EncodeError> {
145 let actor_id = self.index();
146
147 if actor_id.is_remote() {
148 Err(EncodeError::EncodeRemoteAddress)
149 } else {
150 ctx.register(
151 self.remote_mailbox()
152 .ok_or(EncodeError::NotRemoteAddressable)?,
153 )
154 }
155 }
156
157 pub fn new_with_decode_context(index: u64, ctx: &dyn DecodeContext) -> Result<Self, DecodeError>
158 where
159 M: MessageId + Encode,
160 M::Result: Decode,
161 {
162 let proxy = ctx.remote_proxy().ok_or(DecodeError::MissingRemoteProxy)?;
163 Ok(Recipient::new_remote(index, proxy))
164 }
165}
166
167impl<M> Encode for Recipient<M>
168where
169 M: Message + MessageId + Encode,
170 M::Result: Decode,
171{
172 #[inline]
173 fn encoded_len(&self) -> usize {
174 prost::Message::encoded_len(&self.index().as_local())
175 }
176
177 #[inline]
178 fn encode(
179 &self,
180 buf: &mut BytesMut,
181 ctx: Option<&dyn EncodeContext>,
182 ) -> Result<(), EncodeError> {
183 self.register(ctx.ok_or(EncodeError::MissingEncodeContext)?)?;
185 prost::Message::encode(&self.index().as_local(), buf).map_err(Into::into)
186 }
187}
188
189impl<M> Decode for Recipient<M>
190where
191 M: Message + MessageId + Encode,
192 M::Result: Decode,
193{
194 #[inline]
195 fn decode(buf: Bytes, ctx: Option<&dyn DecodeContext>) -> Result<Self, DecodeError> {
196 let actor_id = <u64 as prost::Message>::decode(buf)?;
197 Self::new_with_decode_context(actor_id, ctx.ok_or(DecodeError::MissingDecodeContext)?)
198 }
199}
200
#[cfg(test)]
mod tests {
    use std::fmt::Debug;
    use std::sync::Arc;

    use pretty_assertions::assert_eq;

    use super::*;
    use crate::utils::test_utils::{Dummy, DummyProxy, Ping, make_address};

    /// Encodes `value` without a context through both [`Encode::encode`] and
    /// [`Encode::encode_to_bytes`], checks that both outputs are identical and
    /// match [`Encode::encoded_len`], then decodes the bytes and compares the
    /// result with `value`.
    fn roundtrip<T>(value: T) -> anyhow::Result<()>
    where
        T: Encode + Decode + PartialEq + Debug,
    {
        let announced_len = value.encoded_len();

        // Path 1: encode into a caller-provided buffer.
        let mut scratch = BytesMut::with_capacity(announced_len);
        value.encode(&mut scratch, None)?;
        let encoded = scratch.freeze();
        assert_eq!(encoded.len(), announced_len);

        // Path 2: the convenience helper must produce the very same bytes.
        let one_shot = value.encode_to_bytes(None)?;
        assert_eq!(one_shot.len(), announced_len);
        assert_eq!(encoded, one_shot);

        let restored = T::decode(encoded, None)?;
        assert_eq!(value, restored);

        Ok(())
    }

    /// Round-trips the unit type, booleans, integers, floats and `String`.
    #[test]
    fn test_primitive() -> anyhow::Result<()> {
        roundtrip(())?;
        roundtrip(true)?;

        // Unsigned integers.
        roundtrip(42_u8)?;
        roundtrip(4242_u16)?;
        roundtrip(424242_u32)?;
        roundtrip(42424242_u64)?;
        roundtrip(4242424242_usize)?;

        // Signed integers.
        roundtrip(-42_i8)?;
        roundtrip(-4242_i16)?;
        roundtrip(-424242_i32)?;
        roundtrip(-42424242_i64)?;
        // NOTE(review): this literal does not fit a 32-bit `isize`; presumably
        // the tests only target 64-bit platforms — confirm.
        roundtrip(-4242424242_isize)?;

        // Floats and strings.
        roundtrip(42.42_f32)?;
        roundtrip(42.42_f64)?;
        roundtrip("hello".to_string())?;

        Ok(())
    }

    /// Round-trips populated and empty vectors of primitive element types.
    #[test]
    fn test_vector() -> anyhow::Result<()> {
        roundtrip(vec![true, false, true])?;

        // Three identical elements per vector.
        roundtrip(vec![42_u8; 3])?;
        roundtrip(vec![4242_u16; 3])?;
        roundtrip(vec![424242_u32; 3])?;
        roundtrip(vec![42424242_u64; 3])?;
        roundtrip(vec![42424242_usize; 3])?;
        roundtrip(vec![-42_i8; 3])?;
        roundtrip(vec![-4242_i16; 3])?;
        roundtrip(vec![-424242_i32; 3])?;
        roundtrip(vec![-42424242_i64; 3])?;
        roundtrip(vec![-42424242_isize; 3])?;
        roundtrip(vec![42.42_f32; 3])?;
        roundtrip(vec![42.42_f64; 3])?;

        // Empty vectors.
        roundtrip(Vec::<bool>::new())?;
        roundtrip(Vec::<u16>::new())?;
        roundtrip(Vec::<f32>::new())?;
        roundtrip(Vec::<usize>::new())?;
        roundtrip(Vec::<isize>::new())?;

        Ok(())
    }

    /// Round-trips both variants of `Option`.
    #[test]
    fn test_option() -> anyhow::Result<()> {
        roundtrip(None::<u16>)?;
        roundtrip(Some(4242_u16))?;

        Ok(())
    }

    /// Round-trips both variants of `Result`.
    #[test]
    fn test_result() -> anyhow::Result<()> {
        roundtrip(Ok::<String, String>("hello".to_string()))?;
        roundtrip(Err::<String, String>("boom".to_string()))?;

        Ok(())
    }

    /// Round-trips values wrapped in `Box` and `Arc`.
    #[test]
    fn test_smart_pointer() -> anyhow::Result<()> {
        roundtrip(Box::new(vec![4242_u16; 3]))?;
        roundtrip(Arc::new(vec![4242_u16; 3]))?;

        Ok(())
    }

    /// Round-trips flat and nested tuples and, without the `prost-codec`
    /// feature, checks the error reports for two malformed inputs.
    #[test]
    fn test_tuple() -> anyhow::Result<()> {
        roundtrip((42_u32, "hello".to_string()))?;
        roundtrip((-42424242_i64, true, "hello".to_string(), Some(4242_u16)))?;
        roundtrip((42_u8, (-424242_i32, "hello".to_string())))?;

        #[cfg(not(feature = "prost-codec"))]
        {
            use crate::error::ErrorReport;

            let no_length = Bytes::from(vec![0_u8, 1_u8, 2_u8]);
            let failure = <(u32, u32)>::decode(no_length, None).unwrap_err();
            assert_eq!(
                failure.report(),
                "could not decode the message: missing the tuple element length"
            );

            let no_data = Bytes::from(vec![0xff_u8, 0xff_u8, 0xff_u8, 0xff_u8, 42_u8]);
            let failure = <(u32, u32)>::decode(no_data, None).unwrap_err();
            assert_eq!(
                failure.report(),
                "could not decode the message: missing the tuple element data"
            );
        }

        Ok(())
    }

    /// Encodes a local address with a proxy-provided context, decodes it back
    /// and checks that encoding a remote address is rejected.
    #[tokio::test]
    async fn test_address() -> anyhow::Result<()> {
        use crate::error::ErrorReport;

        let proxy = DummyProxy::new();

        let (address, _) = make_address(1);

        let announced_len = address.encoded_len();
        let mut scratch = BytesMut::with_capacity(announced_len);
        address.encode(&mut scratch, proxy.encode_context())?;
        let encoded = scratch.freeze();
        assert_eq!(encoded.len(), announced_len);

        let one_shot = address.encode_to_bytes(proxy.encode_context())?;
        assert_eq!(one_shot.len(), announced_len);
        assert_eq!(encoded, one_shot);

        let restored = Address::<Dummy>::decode(encoded, proxy.decode_context())?;
        assert_eq!(address.index().as_local(), restored.index().as_local());

        // A remote address must be refused by the encoder.
        let remote_address = Address::<Dummy>::new_remote(42, proxy.clone());
        let failure = remote_address
            .encode_to_bytes(proxy.encode_context())
            .unwrap_err();
        assert_eq!(
            failure.report(),
            "remote address should not be encoded into a message"
        );

        Ok(())
    }

    /// Encodes a local recipient with a proxy-provided context, decodes it
    /// back and checks that encoding a remote recipient is rejected.
    #[tokio::test]
    async fn test_recipient() -> anyhow::Result<()> {
        use crate::error::ErrorReport;

        let proxy = DummyProxy::new();

        let (address, _) = make_address(1);
        let recipient: Recipient<Ping> = address.into();

        let announced_len = recipient.encoded_len();
        let mut scratch = BytesMut::with_capacity(announced_len);
        recipient.encode(&mut scratch, proxy.encode_context())?;
        let encoded = scratch.freeze();
        assert_eq!(encoded.len(), announced_len);

        let one_shot = recipient.encode_to_bytes(proxy.encode_context())?;
        assert_eq!(one_shot.len(), announced_len);
        assert_eq!(encoded, one_shot);

        let restored = Recipient::<Ping>::decode(encoded, proxy.decode_context())?;
        assert_eq!(recipient.index().as_local(), restored.index().as_local());

        // A remote recipient must be refused by the encoder.
        let remote_recipient = Recipient::<Ping>::new_remote(42, proxy.clone());
        let failure = remote_recipient
            .encode_to_bytes(proxy.encode_context())
            .unwrap_err();
        assert_eq!(
            failure.report(),
            "remote address should not be encoded into a message"
        );

        Ok(())
    }
}