Skip to main content

atomr_remote/
remote_props.rs

//! `RemoteProps` — typed Props serialization for `Deploy::remote`.
//!
//! Phase 5.I of `docs/full-port-plan.md`. Akka.NET parity:
//! `Akka.Remote.RemoteDeploymentWatcher` + Hyperion-serialized
//! Props. Without a portable Props codec the remote deployer can
//! only ship `(manifest, bytes)` pairs — this module gives users an
//! opt-in registry where each manifest maps to a typed factory that
//! reconstructs the actor on the receiving node.
9
10use std::collections::HashMap;
11use std::sync::Arc;
12
13use parking_lot::RwLock;
14use thiserror::Error;
15
16#[derive(Debug, Error)]
17#[non_exhaustive]
18pub enum RemotePropsError {
19    #[error("no factory registered for manifest `{0}`")]
20    UnknownManifest(String),
21    #[error("codec error: {0}")]
22    Codec(String),
23}
24
25/// Boxed factory closure: given the serialized payload, produce the
26/// reconstructed actor handle as `Arc<dyn std::any::Any + Send + Sync>`
27/// (downcast on the receiving side).
28type Factory =
29    Arc<dyn Fn(&[u8]) -> Result<Arc<dyn std::any::Any + Send + Sync>, RemotePropsError> + Send + Sync>;
30
31/// Per-system registry of `(manifest, factory)` pairs.
32#[derive(Default, Clone)]
33pub struct RemotePropsRegistry {
34    inner: Arc<RwLock<HashMap<String, Factory>>>,
35}
36
37impl RemotePropsRegistry {
38    pub fn new() -> Self {
39        Self::default()
40    }
41
42    /// Register a factory for `manifest`. The factory receives the
43    /// serialized payload and returns a reconstructed type-erased
44    /// value the receiver can downcast.
45    pub fn register<F>(&self, manifest: impl Into<String>, factory: F)
46    where
47        F: Fn(&[u8]) -> Result<Arc<dyn std::any::Any + Send + Sync>, RemotePropsError>
48            + Send
49            + Sync
50            + 'static,
51    {
52        self.inner.write().insert(manifest.into(), Arc::new(factory));
53    }
54
55    /// Reconstruct an actor from a `(manifest, bytes)` pair.
56    pub fn instantiate(
57        &self,
58        manifest: &str,
59        bytes: &[u8],
60    ) -> Result<Arc<dyn std::any::Any + Send + Sync>, RemotePropsError> {
61        let factory = self
62            .inner
63            .read()
64            .get(manifest)
65            .cloned()
66            .ok_or_else(|| RemotePropsError::UnknownManifest(manifest.into()))?;
67        factory(bytes)
68    }
69
70    pub fn manifests(&self) -> Vec<String> {
71        let mut v: Vec<String> = self.inner.read().keys().cloned().collect();
72        v.sort();
73        v
74    }
75
76    pub fn len(&self) -> usize {
77        self.inner.read().len()
78    }
79
80    pub fn is_empty(&self) -> bool {
81        self.inner.read().is_empty()
82    }
83}
84
85/// Convenience: register a factory that decodes a `serde::Deserialize`
86/// type via bincode. Eliminates the per-manifest boilerplate.
87pub fn register_bincode<T>(reg: &RemotePropsRegistry, manifest: impl Into<String>)
88where
89    T: for<'de> serde::Deserialize<'de> + Send + Sync + 'static,
90{
91    reg.register(manifest, |bytes: &[u8]| {
92        let cfg = bincode::config::standard();
93        let (v, _): (T, _) = bincode::serde::decode_from_slice(bytes, cfg)
94            .map_err(|e| RemotePropsError::Codec(e.to_string()))?;
95        Ok(Arc::new(v) as Arc<dyn std::any::Any + Send + Sync>)
96    });
97}
98
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    /// Minimal serde-capable payload type used by the tests below.
    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    struct Greeter {
        prefix: String,
    }

    /// Looking up a manifest that was never registered is a typed error.
    #[test]
    fn unknown_manifest_errors() {
        let registry = RemotePropsRegistry::new();
        assert!(matches!(
            registry.instantiate("nope", &[]),
            Err(RemotePropsError::UnknownManifest(_))
        ));
    }

    /// A value encoded with bincode comes back intact through the registry.
    #[test]
    fn register_bincode_round_trip() {
        let original = Greeter { prefix: "hi".into() };
        let payload =
            bincode::serde::encode_to_vec(&original, bincode::config::standard()).unwrap();

        let registry = RemotePropsRegistry::new();
        register_bincode::<Greeter>(&registry, "Greeter");

        let rebuilt = registry.instantiate("Greeter", &payload).unwrap();
        let greeter = rebuilt.downcast_ref::<Greeter>().unwrap();
        assert_eq!(greeter.prefix, "hi");
    }

    /// `manifests` reports names sorted regardless of registration order.
    #[test]
    fn manifests_listed_sorted() {
        let registry = RemotePropsRegistry::new();
        for name in ["ZGreeter", "AGreeter", "MGreeter"] {
            register_bincode::<Greeter>(&registry, name);
        }
        assert_eq!(registry.manifests(), vec!["AGreeter", "MGreeter", "ZGreeter"]);
        assert_eq!(registry.len(), 3);
    }

    /// Undecodable bytes surface as `Codec`, not as a panic.
    #[test]
    fn codec_failure_is_typed() {
        let registry = RemotePropsRegistry::new();
        register_bincode::<Greeter>(&registry, "G");
        let garbage = [0xff_u8, 0xff];
        assert!(matches!(
            registry.instantiate("G", &garbage),
            Err(RemotePropsError::Codec(_))
        ));
    }
}