Skip to main content

atomr_remote/serialization/
mod.rs

1//! Pluggable serializer registry for remote payloads.
2//! akka.net: `Remote/Serialization/MessageSerializer.cs` +
3//! `Akka.Serialization` core.
4//!
5//! Every payload that crosses the wire carries a `serializer_id` and a
6//! `manifest` (type name). On the receiving side those two fields key
7//! into the [`SerializerRegistry`] to recover the original Rust type.
8//!
9//! Built-in serializers:
10//!
11//! | id | name      | wire format        |
12//! |----|-----------|--------------------|
13//! | 0  | system    | bincode (control)  |
14//! | 1  | bincode   | bincode v2 + serde |
15//! | 2  | json      | serde_json         |
16
17mod bincode_codec;
18mod json_codec;
19
20use std::any::{Any, TypeId};
21use std::collections::HashMap;
22use std::sync::Arc;
23
24use parking_lot::RwLock;
25use thiserror::Error;
26
27pub use bincode_codec::{bincode_decode, bincode_encode, system_decode, system_encode};
28pub use json_codec::{json_decode, json_encode};
29
30pub const SYSTEM_SERIALIZER_ID: u32 = 0;
31pub const BINCODE_SERIALIZER_ID: u32 = 1;
32pub const JSON_SERIALIZER_ID: u32 = 2;
33
34#[derive(Debug, Error)]
35pub enum SerializeError {
36    #[error("serialization failed: {0}")]
37    Encode(String),
38    #[error("deserialization failed: {0}")]
39    Decode(String),
40    #[error("no serializer registered for manifest={0}")]
41    UnknownManifest(String),
42    #[error("downcast failed for manifest={0}")]
43    Downcast(String),
44}
45
46type EncodeFn = Arc<dyn Fn(&dyn Any) -> Result<Vec<u8>, SerializeError> + Send + Sync>;
47type DecodeFn = Arc<dyn Fn(&[u8]) -> Result<Box<dyn Any + Send>, SerializeError> + Send + Sync>;
48
49/// Closure pair that knows how to encode/decode one Rust type to/from
50/// wire bytes. The registry maps a `manifest` (type name) to one of these.
51#[derive(Clone)]
52pub struct TypeCodec {
53    pub serializer_id: u32,
54    pub manifest: String,
55    pub type_id: TypeId,
56    pub encode: EncodeFn,
57    pub decode: DecodeFn,
58}
59
60/// Per-system registry mapping a `manifest` to a [`TypeCodec`].
61#[derive(Default, Clone)]
62pub struct SerializerRegistry {
63    inner: Arc<RwLock<RegistryInner>>,
64}
65
66#[derive(Default)]
67struct RegistryInner {
68    by_manifest: HashMap<String, TypeCodec>,
69    by_type_id: HashMap<TypeId, TypeCodec>,
70}
71
72impl SerializerRegistry {
73    /// Empty registry. Most callers want [`SerializerRegistry::standard`].
74    pub fn new() -> Self {
75        Self::default()
76    }
77
78    /// Registry pre-populated with codecs for the system control payloads
79    /// (see `bincode_codec::register_system_payloads`).
80    pub fn standard() -> Self {
81        let r = Self::new();
82        bincode_codec::register_system_payloads(&r);
83        r
84    }
85
86    /// Register a type with its serialization closures. The `manifest`
87    /// must match the Rust `std::any::type_name::<T>()` if you want
88    /// `encode_typed::<T>` to find it without an explicit manifest.
89    pub fn register_codec(&self, codec: TypeCodec) {
90        let mut g = self.inner.write();
91        g.by_type_id.insert(codec.type_id, codec.clone());
92        g.by_manifest.insert(codec.manifest.clone(), codec);
93    }
94
95    /// Convenience: register `T` with the bincode codec (id=1).
96    pub fn register_bincode<T>(&self)
97    where
98        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
99    {
100        let manifest = std::any::type_name::<T>().to_string();
101        self.register_codec(TypeCodec {
102            serializer_id: BINCODE_SERIALIZER_ID,
103            manifest: manifest.clone(),
104            type_id: TypeId::of::<T>(),
105            encode: Arc::new(|v: &dyn Any| {
106                let v = v
107                    .downcast_ref::<T>()
108                    .ok_or_else(|| SerializeError::Downcast(std::any::type_name::<T>().to_string()))?;
109                bincode_encode(v)
110            }),
111            decode: Arc::new(|b: &[u8]| {
112                let v: T = bincode_decode(b)?;
113                Ok(Box::new(v) as Box<dyn Any + Send>)
114            }),
115        });
116    }
117
118    /// Convenience: register `T` with the JSON codec (id=2).
119    pub fn register_json<T>(&self)
120    where
121        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
122    {
123        let manifest = std::any::type_name::<T>().to_string();
124        self.register_codec(TypeCodec {
125            serializer_id: JSON_SERIALIZER_ID,
126            manifest: manifest.clone(),
127            type_id: TypeId::of::<T>(),
128            encode: Arc::new(|v: &dyn Any| {
129                let v = v
130                    .downcast_ref::<T>()
131                    .ok_or_else(|| SerializeError::Downcast(std::any::type_name::<T>().to_string()))?;
132                json_encode(v)
133            }),
134            decode: Arc::new(|b: &[u8]| {
135                let v: T = json_decode(b)?;
136                Ok(Box::new(v) as Box<dyn Any + Send>)
137            }),
138        });
139    }
140
141    pub fn codec_for_manifest(&self, manifest: &str) -> Option<TypeCodec> {
142        self.inner.read().by_manifest.get(manifest).cloned()
143    }
144
145    pub fn codec_for_type<T: Any>(&self) -> Option<TypeCodec> {
146        self.inner.read().by_type_id.get(&TypeId::of::<T>()).cloned()
147    }
148
149    /// Encode a typed value, looking up its codec by `TypeId`. Returns
150    /// `(serializer_id, manifest, bytes)` so the caller can fill the
151    /// envelope.
152    pub fn encode_typed<T: Any + Send>(&self, value: &T) -> Result<(u32, String, Vec<u8>), SerializeError> {
153        let codec = self
154            .codec_for_type::<T>()
155            .ok_or_else(|| SerializeError::UnknownManifest(std::any::type_name::<T>().to_string()))?;
156        let bytes = (codec.encode)(value as &dyn Any)?;
157        Ok((codec.serializer_id, codec.manifest.clone(), bytes))
158    }
159
160    /// Decode an inbound payload. Returns the type-erased value plus the
161    /// codec used (so the caller can downcast).
162    pub fn decode_dyn(
163        &self,
164        manifest: &str,
165        _serializer_id: u32,
166        bytes: &[u8],
167    ) -> Result<(Box<dyn Any + Send>, TypeCodec), SerializeError> {
168        let codec = self
169            .codec_for_manifest(manifest)
170            .ok_or_else(|| SerializeError::UnknownManifest(manifest.to_string()))?;
171        let value = (codec.decode)(bytes)?;
172        Ok((value, codec))
173    }
174}