Skip to main content

atomr_remote/
remote_props.rs

//! `RemoteProps` — typed Props serialization for `Deploy::remote`.
//!
//! Without a portable Props codec the remote deployer can only ship
//! `(manifest, bytes)` pairs. This module gives users an opt-in registry
//! in which each manifest maps to a typed factory that reconstructs the
//! actor on the receiving node.
7
8use std::collections::HashMap;
9use std::sync::Arc;
10
11use parking_lot::RwLock;
12use thiserror::Error;
13
14#[derive(Debug, Error)]
15#[non_exhaustive]
16pub enum RemotePropsError {
17    #[error("no factory registered for manifest `{0}`")]
18    UnknownManifest(String),
19    #[error("codec error: {0}")]
20    Codec(String),
21}
22
23/// Boxed factory closure: given the serialized payload, produce the
24/// reconstructed actor handle as `Arc<dyn std::any::Any + Send + Sync>`
25/// (downcast on the receiving side).
26type Factory =
27    Arc<dyn Fn(&[u8]) -> Result<Arc<dyn std::any::Any + Send + Sync>, RemotePropsError> + Send + Sync>;
28
29/// Per-system registry of `(manifest, factory)` pairs.
30#[derive(Default, Clone)]
31pub struct RemotePropsRegistry {
32    inner: Arc<RwLock<HashMap<String, Factory>>>,
33}
34
35impl RemotePropsRegistry {
36    pub fn new() -> Self {
37        Self::default()
38    }
39
40    /// Register a factory for `manifest`. The factory receives the
41    /// serialized payload and returns a reconstructed type-erased
42    /// value the receiver can downcast.
43    pub fn register<F>(&self, manifest: impl Into<String>, factory: F)
44    where
45        F: Fn(&[u8]) -> Result<Arc<dyn std::any::Any + Send + Sync>, RemotePropsError>
46            + Send
47            + Sync
48            + 'static,
49    {
50        self.inner.write().insert(manifest.into(), Arc::new(factory));
51    }
52
53    /// Reconstruct an actor from a `(manifest, bytes)` pair.
54    pub fn instantiate(
55        &self,
56        manifest: &str,
57        bytes: &[u8],
58    ) -> Result<Arc<dyn std::any::Any + Send + Sync>, RemotePropsError> {
59        let factory = self
60            .inner
61            .read()
62            .get(manifest)
63            .cloned()
64            .ok_or_else(|| RemotePropsError::UnknownManifest(manifest.into()))?;
65        factory(bytes)
66    }
67
68    pub fn manifests(&self) -> Vec<String> {
69        let mut v: Vec<String> = self.inner.read().keys().cloned().collect();
70        v.sort();
71        v
72    }
73
74    pub fn len(&self) -> usize {
75        self.inner.read().len()
76    }
77
78    pub fn is_empty(&self) -> bool {
79        self.inner.read().is_empty()
80    }
81}
82
83/// Convenience: register a factory that decodes a `serde::Deserialize`
84/// type via bincode. Eliminates the per-manifest boilerplate.
85pub fn register_bincode<T>(reg: &RemotePropsRegistry, manifest: impl Into<String>)
86where
87    T: for<'de> serde::Deserialize<'de> + Send + Sync + 'static,
88{
89    reg.register(manifest, |bytes: &[u8]| {
90        let cfg = bincode::config::standard();
91        let (v, _): (T, _) = bincode::serde::decode_from_slice(bytes, cfg)
92            .map_err(|e| RemotePropsError::Codec(e.to_string()))?;
93        Ok(Arc::new(v) as Arc<dyn std::any::Any + Send + Sync>)
94    });
95}
96
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    /// Minimal serde-enabled payload used as the decoded type in these tests.
    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    struct Greeter {
        prefix: String,
    }

    /// Looking up a manifest that was never registered yields
    /// `UnknownManifest`.
    #[test]
    fn unknown_manifest_errors() {
        let registry = RemotePropsRegistry::new();
        let outcome = registry.instantiate("nope", &[]);
        assert!(matches!(outcome, Err(RemotePropsError::UnknownManifest(_))));
    }

    /// A value encoded with bincode comes back intact through a factory
    /// installed by `register_bincode`.
    #[test]
    fn register_bincode_round_trip() {
        let registry = RemotePropsRegistry::new();
        register_bincode::<Greeter>(&registry, "Greeter");

        let original = Greeter { prefix: "hi".into() };
        let encoded =
            bincode::serde::encode_to_vec(&original, bincode::config::standard()).unwrap();

        let handle = registry.instantiate("Greeter", &encoded).unwrap();
        let decoded = handle.downcast_ref::<Greeter>().unwrap();
        assert_eq!(decoded.prefix, "hi");
    }

    /// `manifests()` reports names in sorted order regardless of the order
    /// in which they were registered, and `len()` counts them.
    #[test]
    fn manifests_listed_sorted() {
        let registry = RemotePropsRegistry::new();
        for name in ["ZGreeter", "AGreeter", "MGreeter"] {
            register_bincode::<Greeter>(&registry, name);
        }
        assert_eq!(registry.manifests(), vec!["AGreeter", "MGreeter", "ZGreeter"]);
        assert_eq!(registry.len(), 3);
    }

    /// Bytes that do not decode as the registered type surface as a `Codec`
    /// error rather than a panic.
    #[test]
    fn codec_failure_is_typed() {
        let registry = RemotePropsRegistry::new();
        register_bincode::<Greeter>(&registry, "G");
        let outcome = registry.instantiate("G", &[0xff, 0xff]);
        assert!(matches!(outcome, Err(RemotePropsError::Codec(_))));
    }
}