Skip to main content

atomr_remote/serialization/
mod.rs

1//! Pluggable serializer registry for remote payloads.
2//! Core.
3//!
4//! Every payload that crosses the wire carries a `serializer_id` and a
5//! `manifest` (type name). On the receiving side those two fields key
6//! into the [`SerializerRegistry`] to recover the original Rust type.
7//!
8//! Built-in serializers:
9//!
10//! | id | name      | wire format        |
11//! |----|-----------|--------------------|
12//! | 0  | system    | bincode (control)  |
13//! | 1  | bincode   | bincode v2 + serde |
14//! | 2  | json      | serde_json         |
15
16mod bincode_codec;
17mod json_codec;
18
19use std::any::{Any, TypeId};
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use parking_lot::RwLock;
24use thiserror::Error;
25
26pub use bincode_codec::{bincode_decode, bincode_encode, system_decode, system_encode};
27pub use json_codec::{json_decode, json_encode};
28
29pub const SYSTEM_SERIALIZER_ID: u32 = 0;
30pub const BINCODE_SERIALIZER_ID: u32 = 1;
31pub const JSON_SERIALIZER_ID: u32 = 2;
32
33#[derive(Debug, Error)]
34pub enum SerializeError {
35    #[error("serialization failed: {0}")]
36    Encode(String),
37    #[error("deserialization failed: {0}")]
38    Decode(String),
39    #[error("no serializer registered for manifest={0}")]
40    UnknownManifest(String),
41    #[error("downcast failed for manifest={0}")]
42    Downcast(String),
43}
44
45type EncodeFn = Arc<dyn Fn(&dyn Any) -> Result<Vec<u8>, SerializeError> + Send + Sync>;
46type DecodeFn = Arc<dyn Fn(&[u8]) -> Result<Box<dyn Any + Send>, SerializeError> + Send + Sync>;
47
48/// Closure pair that knows how to encode/decode one Rust type to/from
49/// wire bytes. The registry maps a `manifest` (type name) to one of these.
50#[derive(Clone)]
51pub struct TypeCodec {
52    pub serializer_id: u32,
53    pub manifest: String,
54    pub type_id: TypeId,
55    pub encode: EncodeFn,
56    pub decode: DecodeFn,
57}
58
59/// Per-system registry mapping a `manifest` to a [`TypeCodec`].
60#[derive(Default, Clone)]
61pub struct SerializerRegistry {
62    inner: Arc<RwLock<RegistryInner>>,
63}
64
65#[derive(Default)]
66struct RegistryInner {
67    by_manifest: HashMap<String, TypeCodec>,
68    by_type_id: HashMap<TypeId, TypeCodec>,
69}
70
71impl SerializerRegistry {
72    /// Empty registry. Most callers want [`SerializerRegistry::standard`].
73    pub fn new() -> Self {
74        Self::default()
75    }
76
77    /// Registry pre-populated with codecs for the system control payloads
78    /// (see `bincode_codec::register_system_payloads`).
79    pub fn standard() -> Self {
80        let r = Self::new();
81        bincode_codec::register_system_payloads(&r);
82        r
83    }
84
85    /// Register a type with its serialization closures. The `manifest`
86    /// must match the Rust `std::any::type_name::<T>()` if you want
87    /// `encode_typed::<T>` to find it without an explicit manifest.
88    pub fn register_codec(&self, codec: TypeCodec) {
89        let mut g = self.inner.write();
90        g.by_type_id.insert(codec.type_id, codec.clone());
91        g.by_manifest.insert(codec.manifest.clone(), codec);
92    }
93
94    /// Convenience: register `T` with the bincode codec (id=1).
95    pub fn register_bincode<T>(&self)
96    where
97        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
98    {
99        let manifest = std::any::type_name::<T>().to_string();
100        self.register_codec(TypeCodec {
101            serializer_id: BINCODE_SERIALIZER_ID,
102            manifest: manifest.clone(),
103            type_id: TypeId::of::<T>(),
104            encode: Arc::new(|v: &dyn Any| {
105                let v = v
106                    .downcast_ref::<T>()
107                    .ok_or_else(|| SerializeError::Downcast(std::any::type_name::<T>().to_string()))?;
108                bincode_encode(v)
109            }),
110            decode: Arc::new(|b: &[u8]| {
111                let v: T = bincode_decode(b)?;
112                Ok(Box::new(v) as Box<dyn Any + Send>)
113            }),
114        });
115    }
116
117    /// Convenience: register `T` with the JSON codec (id=2).
118    pub fn register_json<T>(&self)
119    where
120        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
121    {
122        let manifest = std::any::type_name::<T>().to_string();
123        self.register_codec(TypeCodec {
124            serializer_id: JSON_SERIALIZER_ID,
125            manifest: manifest.clone(),
126            type_id: TypeId::of::<T>(),
127            encode: Arc::new(|v: &dyn Any| {
128                let v = v
129                    .downcast_ref::<T>()
130                    .ok_or_else(|| SerializeError::Downcast(std::any::type_name::<T>().to_string()))?;
131                json_encode(v)
132            }),
133            decode: Arc::new(|b: &[u8]| {
134                let v: T = json_decode(b)?;
135                Ok(Box::new(v) as Box<dyn Any + Send>)
136            }),
137        });
138    }
139
140    pub fn codec_for_manifest(&self, manifest: &str) -> Option<TypeCodec> {
141        self.inner.read().by_manifest.get(manifest).cloned()
142    }
143
144    pub fn codec_for_type<T: Any>(&self) -> Option<TypeCodec> {
145        self.inner.read().by_type_id.get(&TypeId::of::<T>()).cloned()
146    }
147
148    /// Encode a typed value, looking up its codec by `TypeId`. Returns
149    /// `(serializer_id, manifest, bytes)` so the caller can fill the
150    /// envelope.
151    pub fn encode_typed<T: Any + Send>(&self, value: &T) -> Result<(u32, String, Vec<u8>), SerializeError> {
152        let codec = self
153            .codec_for_type::<T>()
154            .ok_or_else(|| SerializeError::UnknownManifest(std::any::type_name::<T>().to_string()))?;
155        let bytes = (codec.encode)(value as &dyn Any)?;
156        Ok((codec.serializer_id, codec.manifest.clone(), bytes))
157    }
158
159    /// Decode an inbound payload. Returns the type-erased value plus the
160    /// codec used (so the caller can downcast).
161    pub fn decode_dyn(
162        &self,
163        manifest: &str,
164        _serializer_id: u32,
165        bytes: &[u8],
166    ) -> Result<(Box<dyn Any + Send>, TypeCodec), SerializeError> {
167        let codec = self
168            .codec_for_manifest(manifest)
169            .ok_or_else(|| SerializeError::UnknownManifest(manifest.to_string()))?;
170        let value = (codec.decode)(bytes)?;
171        Ok((value, codec))
172    }
173}