Skip to main content

dactor/
remote.rs

1//! Remote actor type definitions.
2//!
3//! Contains marker traits, wire-format types, and cluster discovery stubs
4//! for future remote actor communication. No actual networking is implemented.
5
6use crate::interceptor::SendMode;
7use crate::node::{ActorId, NodeId};
8use uuid::Uuid;
9
10/// Marker trait for messages that can be sent to remote actors.
11/// Messages must be serializable to cross the network boundary.
12pub trait RemoteMessage: crate::message::Message + Send + 'static {}
13
14/// Trait for serializing and deserializing messages for wire transport.
15///
16/// The runtime uses this for all remote communication: tell, ask, stream, feed,
17/// remote spawn (actor Args), error payloads, and headers.
18pub trait MessageSerializer: Send + Sync + 'static {
19    /// Human-readable name (for logging and diagnostics).
20    fn name(&self) -> &'static str;
21
22    /// Serialize a value to bytes.
23    fn serialize(&self, value: &dyn std::any::Any) -> Result<Vec<u8>, SerializationError>;
24
25    /// Deserialize bytes back to a typed value.
26    fn deserialize(
27        &self,
28        bytes: &[u8],
29        type_name: &str,
30    ) -> Result<Box<dyn std::any::Any + Send>, SerializationError>;
31}
32
/// Failure raised while encoding or decoding a message.
#[derive(Debug, Clone)]
pub struct SerializationError {
    /// Human-readable explanation of what went wrong.
    pub message: String,
}
39
40impl SerializationError {
41    /// Create a new serialization error with the given message.
42    pub fn new(message: impl Into<String>) -> Self {
43        Self {
44            message: message.into(),
45        }
46    }
47}
48
49impl std::fmt::Display for SerializationError {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        write!(f, "serialization error: {}", self.message)
52    }
53}
54
55impl std::error::Error for SerializationError {}
56
57/// Wire-format envelope for remote messages.
58/// All remote messages travel as WireEnvelopes over the network.
59#[derive(Debug, Clone)]
60#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
61pub struct WireEnvelope {
62    /// Target actor ID.
63    pub target: ActorId,
64    /// Target actor's human-readable name.
65    pub target_name: String,
66    /// Rust type name of the message (for deserialization dispatch).
67    pub message_type: String,
68    /// How the message was sent (Tell, Ask, Expand, Reduce).
69    pub send_mode: SendMode,
70    /// Serialized headers (string key → bytes).
71    pub headers: WireHeaders,
72    /// Serialized message body.
73    pub body: Vec<u8>,
74    /// Request ID for correlating ask replies (None for tell).
75    pub request_id: Option<Uuid>,
76    /// Message version for schema evolution (None = current version).
77    pub version: Option<u32>,
78}
79
/// Headers in wire form: each header name maps to its serialized bytes.
///
/// This is the representation used to carry headers between nodes.
#[derive(Debug, Clone, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct WireHeaders {
    /// Serialized header payloads, keyed by header name.
    pub entries: std::collections::HashMap<String, Vec<u8>>,
}
88
89impl WireHeaders {
90    /// Create an empty wire headers map.
91    pub fn new() -> Self {
92        Self {
93            entries: std::collections::HashMap::new(),
94        }
95    }
96
97    /// Insert a header entry.
98    pub fn insert(&mut self, name: String, value: Vec<u8>) {
99        self.entries.insert(name, value);
100    }
101
102    /// Get a header value by name.
103    pub fn get(&self, name: &str) -> Option<&[u8]> {
104        self.entries.get(name).map(|v| v.as_slice())
105    }
106
107    /// Whether there are no headers.
108    pub fn is_empty(&self) -> bool {
109        self.entries.is_empty()
110    }
111    /// Number of header entries.
112    pub fn len(&self) -> usize {
113        self.entries.len()
114    }
115
116    /// Reconstruct typed [`Headers`](crate::message::Headers) from wire format
117    /// using a [`HeaderRegistry`] to look up deserializers by name.
118    pub fn to_headers(&self, registry: &HeaderRegistry) -> crate::message::Headers {
119        let mut headers = crate::message::Headers::new();
120        for (name, bytes) in &self.entries {
121            if let Some(header_value) = registry.deserialize(name, bytes) {
122                headers.insert_boxed(header_value);
123            }
124        }
125        headers
126    }
127}
128
129/// A function that deserializes header bytes into a typed header value.
130pub type HeaderDeserializerFn =
131    Box<dyn Fn(&[u8]) -> Option<Box<dyn crate::message::HeaderValue>> + Send + Sync>;
132
133/// Registry for deserializing wire header bytes back to typed
134/// [`HeaderValue`](crate::message::HeaderValue) instances.
135///
136/// Populated at startup with one entry per header type that can arrive
137/// from remote nodes.
138pub struct HeaderRegistry {
139    /// header_name → deserializer function.
140    deserializers: std::collections::HashMap<String, HeaderDeserializerFn>,
141}
142
143impl HeaderRegistry {
144    /// Create an empty registry.
145    pub fn new() -> Self {
146        Self {
147            deserializers: std::collections::HashMap::new(),
148        }
149    }
150
151    /// Register a deserializer for a named header.
152    pub fn register(
153        &mut self,
154        header_name: impl Into<String>,
155        deserializer: impl Fn(&[u8]) -> Option<Box<dyn crate::message::HeaderValue>>
156            + Send
157            + Sync
158            + 'static,
159    ) {
160        self.deserializers
161            .insert(header_name.into(), Box::new(deserializer));
162    }
163
164    /// Deserialize header bytes using the registered deserializer.
165    /// Returns `None` if no deserializer is registered or deserialization fails.
166    pub fn deserialize(
167        &self,
168        header_name: &str,
169        bytes: &[u8],
170    ) -> Option<Box<dyn crate::message::HeaderValue>> {
171        let deser = self.deserializers.get(header_name)?;
172        deser(bytes)
173    }
174
175    /// Number of registered header deserializers.
176    pub fn len(&self) -> usize {
177        self.deserializers.len()
178    }
179
180    /// Whether the registry is empty.
181    pub fn is_empty(&self) -> bool {
182        self.deserializers.is_empty()
183    }
184}
185
186impl Default for HeaderRegistry {
187    fn default() -> Self {
188        Self::new()
189    }
190}
191
/// Schema-evolution hook for a single message type.
///
/// One handler is registered per message type and is asked to convert
/// payloads between versions.
pub trait MessageVersionHandler: Send + Sync + 'static {
    /// Name of the message type this handler is responsible for.
    fn message_type(&self) -> &'static str;

    /// Convert `payload` from `from_version` to `to_version`.
    ///
    /// Returns the converted bytes, or `None` when the migration cannot
    /// be performed.
    fn migrate(&self, payload: &[u8], from_version: u32, to_version: u32) -> Option<Vec<u8>>;
}
202
203/// Snapshot of the cluster state at a point in time.
204///
205/// Includes topology (which nodes are known), version information for
206/// each peer, and the local node's own version metadata. This is the
207/// primary type for operational visibility during rolling upgrades.
208///
209/// # Invariants
210///
211/// - `nodes` includes the local node.
212/// - `peer_versions` excludes the local node (local version info is in
213///   `wire_version` and `app_version`).
214/// - Every connected remote node in `nodes` should have a corresponding
215///   entry in `peer_versions`.
216#[derive(Debug, Clone)]
217#[non_exhaustive]
218pub struct ClusterState {
219    /// The local node's identity.
220    pub local_node: NodeId,
221    /// All known nodes in the cluster (including local).
222    pub nodes: Vec<NodeId>,
223    /// Whether this node considers itself the leader (if applicable).
224    pub is_leader: bool,
225    /// This node's wire protocol version.
226    pub wire_version: crate::version::WireVersion,
227    /// This node's application version, if configured. Purely
228    /// informational — does not affect compatibility.
229    pub app_version: Option<String>,
230    /// Version metadata for each connected remote peer. Populated from
231    /// successful handshake responses. Keyed by [`NodeId`]; does **not**
232    /// include the local node.
233    pub peer_versions: std::collections::HashMap<NodeId, PeerVersionInfo>,
234}
235
236impl ClusterState {
237    /// Create a new ClusterState with the given local node and defaults.
238    ///
239    /// Sets `wire_version` to [`DACTOR_WIRE_VERSION`](crate::version::DACTOR_WIRE_VERSION),
240    /// `app_version` to `None`, `is_leader` to `false`, and empty
241    /// `peer_versions`.
242    pub fn new(local_node: NodeId, nodes: Vec<NodeId>) -> Self {
243        Self {
244            local_node,
245            nodes,
246            is_leader: false,
247            wire_version: crate::version::WireVersion::parse(
248                crate::version::DACTOR_WIRE_VERSION,
249            )
250            .expect("DACTOR_WIRE_VERSION must be valid"),
251            app_version: None,
252            peer_versions: std::collections::HashMap::new(),
253        }
254    }
255
256    /// Number of nodes in the cluster.
257    pub fn node_count(&self) -> usize {
258        self.nodes.len()
259    }
260
261    /// Check if a specific node is in the cluster.
262    pub fn contains(&self, node_id: &NodeId) -> bool {
263        self.nodes.contains(node_id)
264    }
265
266    /// Look up version information for a remote peer.
267    pub fn peer_version(&self, node_id: &NodeId) -> Option<&PeerVersionInfo> {
268        self.peer_versions.get(node_id)
269    }
270}
271
272/// Version metadata for a connected remote peer.
273///
274/// Populated from a successful [`HandshakeResponse::Accepted`](crate::system_actors::HandshakeResponse)
275/// during connection setup.
276#[derive(Debug, Clone, PartialEq, Eq)]
277pub struct PeerVersionInfo {
278    /// The peer's wire protocol version.
279    pub wire_version: crate::version::WireVersion,
280    /// The peer's application version, if configured.
281    pub app_version: Option<String>,
282    /// The peer's actor framework adapter (e.g. "ractor", "kameo").
283    pub adapter: String,
284}
285
/// Failure reported by a [`ClusterDiscovery`] implementation.
#[derive(Debug, Clone)]
pub struct DiscoveryError {
    /// Human-readable explanation of the failure.
    pub message: String,
}
292
293impl DiscoveryError {
294    /// Create a new discovery error with the given message.
295    pub fn new(message: impl Into<String>) -> Self {
296        Self {
297            message: message.into(),
298        }
299    }
300}
301
302impl std::fmt::Display for DiscoveryError {
303    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304        write!(f, "{}", self.message)
305    }
306}
307
308impl std::error::Error for DiscoveryError {}
309
310/// A peer found by [`ClusterDiscovery`].
311///
312/// Combines a stable identity ([`NodeId`]) with a network address.
313/// Discovery backends that only know addresses (e.g., DNS-based discovery)
314/// can use the address as the node ID via [`DiscoveredPeer::from_address`].
315#[derive(Debug, Clone, PartialEq, Eq, Hash)]
316pub struct DiscoveredPeer {
317    /// The peer's stable identity in the cluster.
318    pub node_id: NodeId,
319    /// The peer's network address (e.g., "10.0.0.1:9000").
320    pub address: String,
321}
322
323impl DiscoveredPeer {
324    /// Create a discovered peer with an explicit node ID.
325    pub fn new(node_id: NodeId, address: impl Into<String>) -> Self {
326        Self {
327            node_id,
328            address: address.into(),
329        }
330    }
331
332    /// Create a discovered peer using the address string as the node ID.
333    ///
334    /// Convenience for discovery backends that don't provide stable
335    /// identities (e.g., DNS resolution). The address is used as both
336    /// the identity and the endpoint.
337    ///
338    /// **Important:** When using this with [`verify_peer_identity`](crate::verify_peer_identity),
339    /// the remote node must be configured with a `node_id` that matches
340    /// this address string exactly. If the remote node uses a different
341    /// `node_id` (e.g., `"my-node"` instead of `"10.0.0.1:9000"`), the
342    /// identity verification will fail. For such cases,
343    /// use [`DiscoveredPeer::new`] with an explicit `node_id` instead.
344    pub fn from_address(address: impl Into<String>) -> Self {
345        let addr = address.into();
346        Self {
347            node_id: NodeId(addr.clone()),
348            address: addr,
349        }
350    }
351}
352
353/// Trait for cluster discovery — how nodes find each other.
354#[async_trait::async_trait]
355pub trait ClusterDiscovery: Send + Sync + 'static {
356    /// Discover peers to connect to.
357    ///
358    /// Returns a list of [`DiscoveredPeer`]s. The runtime will compare this
359    /// against currently connected peers and attempt to connect new ones
360    /// via [`verify_peer_identity`](crate::verify_peer_identity) and
361    /// [`connect_peer`] on the adapter runtime.
362    async fn discover(&self) -> Result<Vec<DiscoveredPeer>, DiscoveryError>;
363}
364
365/// Static list of seed peers (simplest discovery mechanism).
366pub struct StaticSeeds {
367    /// Pre-configured peers.
368    pub peers: Vec<DiscoveredPeer>,
369}
370
371impl StaticSeeds {
372    /// Create static seeds from a list of addresses.
373    ///
374    /// Each address is used as both the node ID and the endpoint
375    /// (via [`DiscoveredPeer::from_address`]).
376    pub fn new(addresses: Vec<String>) -> Self {
377        Self {
378            peers: addresses.into_iter().map(DiscoveredPeer::from_address).collect(),
379        }
380    }
381
382    /// Create static seeds from a list of [`DiscoveredPeer`]s with
383    /// explicit node IDs.
384    pub fn from_peers(peers: Vec<DiscoveredPeer>) -> Self {
385        Self { peers }
386    }
387}
388
389#[async_trait::async_trait]
390impl ClusterDiscovery for StaticSeeds {
391    async fn discover(&self) -> Result<Vec<DiscoveredPeer>, DiscoveryError> {
392        Ok(self.peers.clone())
393    }
394}
395
396// ---------------------------------------------------------------------------
397// JsonSerializer (serde feature)
398// ---------------------------------------------------------------------------
399
/// JSON-based message serializer using `serde_json`.
///
/// Human-readable format, good for debugging and interop. Slightly slower
/// and larger than binary formats but excellent for development and
/// diagnostics.
///
/// The type carries no state, so it derives `Debug`, `Clone`, `Copy` and
/// `Default` and can be freely logged, copied, or default-constructed.
#[cfg(feature = "serde")]
#[derive(Debug, Clone, Copy, Default)]
pub struct JsonSerializer;
407
#[cfg(feature = "serde")]
impl JsonSerializer {
    /// Encode `value` as JSON bytes.
    ///
    /// # Errors
    ///
    /// Returns a [`SerializationError`] wrapping the `serde_json` error
    /// text when encoding fails.
    pub fn serialize_typed<T: serde::Serialize>(value: &T) -> Result<Vec<u8>, SerializationError> {
        match serde_json::to_vec(value) {
            Ok(encoded) => Ok(encoded),
            Err(e) => Err(SerializationError::new(format!("json serialize: {e}"))),
        }
    }

    /// Decode JSON `bytes` into a value of type `T`.
    ///
    /// # Errors
    ///
    /// Returns a [`SerializationError`] wrapping the `serde_json` error
    /// text when decoding fails.
    pub fn deserialize_typed<T: serde::de::DeserializeOwned>(
        bytes: &[u8],
    ) -> Result<T, SerializationError> {
        match serde_json::from_slice(bytes) {
            Ok(decoded) => Ok(decoded),
            Err(e) => Err(SerializationError::new(format!("json deserialize: {e}"))),
        }
    }
}
424
425// ---------------------------------------------------------------------------
426// Wire pipeline helpers
427// ---------------------------------------------------------------------------
428
/// Build a [`WireEnvelope`] for a remote tell (fire-and-forget).
///
/// Serializes the message body to JSON bytes and packages it with the
/// target, headers, and metadata into a wire-ready envelope. The envelope
/// has `send_mode` set to `Tell`, no `request_id`, and no `version`.
///
/// This is a thin wrapper around [`build_wire_envelope`], so the envelope
/// layout is defined in a single place.
///
/// # Errors
///
/// Returns a [`SerializationError`] if `msg` cannot be serialized.
#[cfg(feature = "serde")]
pub fn build_tell_envelope<M: serde::Serialize>(
    target: crate::node::ActorId,
    target_name: impl Into<String>,
    msg: &M,
    headers: WireHeaders,
) -> Result<WireEnvelope, SerializationError> {
    build_wire_envelope(
        target,
        target_name,
        msg,
        crate::interceptor::SendMode::Tell,
        headers,
        None,
        None,
    )
}
452
/// Build a [`WireEnvelope`] for a remote ask (request-reply).
///
/// The envelope has `send_mode` set to `Ask`, carries `request_id` so the
/// reply can be correlated with the request, and has no `version`.
///
/// This is a thin wrapper around [`build_wire_envelope`], so the envelope
/// layout is defined in a single place.
///
/// # Errors
///
/// Returns a [`SerializationError`] if `msg` cannot be serialized.
#[cfg(feature = "serde")]
pub fn build_ask_envelope<M: serde::Serialize>(
    target: crate::node::ActorId,
    target_name: impl Into<String>,
    msg: &M,
    headers: WireHeaders,
    request_id: uuid::Uuid,
) -> Result<WireEnvelope, SerializationError> {
    build_wire_envelope(
        target,
        target_name,
        msg,
        crate::interceptor::SendMode::Ask,
        headers,
        Some(request_id),
        None,
    )
}
476
/// Assemble a [`WireEnvelope`] from an explicit send mode, an optional
/// request ID, and an optional version.
///
/// This is the lower-level builder, meant for stream and feed modes.
/// The message body is serialized with [`JsonSerializer`] and
/// `message_type` is taken from `std::any::type_name::<M>()`.
///
/// # Errors
///
/// Returns a [`SerializationError`] if `msg` cannot be serialized.
#[cfg(feature = "serde")]
pub fn build_wire_envelope<M: serde::Serialize>(
    target: crate::node::ActorId,
    target_name: impl Into<String>,
    msg: &M,
    send_mode: crate::interceptor::SendMode,
    headers: WireHeaders,
    request_id: Option<uuid::Uuid>,
    version: Option<u32>,
) -> Result<WireEnvelope, SerializationError> {
    // Serialize first so a failure returns before anything else is built.
    let body = JsonSerializer::serialize_typed(msg)?;
    let target_name: String = target_name.into();
    let message_type = std::any::type_name::<M>().to_string();

    let envelope = WireEnvelope {
        target,
        target_name,
        message_type,
        send_mode,
        headers,
        body,
        request_id,
        version,
    };
    Ok(envelope)
}
502
503/// Receive-side: deserialize a [`WireEnvelope`]'s body using a
504/// [`TypeRegistry`](crate::type_registry::TypeRegistry).
505///
506/// Returns the type-erased deserialized message. The caller can downcast
507/// to the concrete type using `Any::downcast::<M>()`.
508pub fn receive_envelope_body(
509    envelope: &WireEnvelope,
510    registry: &crate::type_registry::TypeRegistry,
511) -> Result<Box<dyn std::any::Any + Send>, SerializationError> {
512    registry.deserialize(&envelope.message_type, &envelope.body)
513}
514
515/// Receive-side with version checking: deserialize a [`WireEnvelope`]'s body,
516/// applying version migration if a [`MessageVersionHandler`] is registered.
517pub fn receive_envelope_body_versioned(
518    envelope: &WireEnvelope,
519    registry: &crate::type_registry::TypeRegistry,
520    version_handlers: &std::collections::HashMap<String, Box<dyn MessageVersionHandler>>,
521    expected_version: Option<u32>,
522) -> Result<Box<dyn std::any::Any + Send>, SerializationError> {
523    let body = match (envelope.version, expected_version) {
524        (Some(received), Some(expected)) if received != expected => {
525            // Version mismatch — try migration
526            if let Some(handler) = version_handlers.get(&envelope.message_type) {
527                handler
528                    .migrate(&envelope.body, received, expected)
529                    .ok_or_else(|| {
530                        SerializationError::new(format!(
531                            "{}: cannot migrate from v{received} to v{expected}",
532                            envelope.message_type
533                        ))
534                    })?
535            } else {
536                // No handler registered — use body as-is (rely on serde defaults)
537                envelope.body.clone()
538            }
539        }
540        _ => envelope.body.clone(),
541    };
542
543    registry.deserialize(&envelope.message_type, &body)
544}
545
546#[cfg(test)]
547mod tests {
548    use super::*;
549    use crate::interceptor::SendMode;
550    use crate::node::NodeId;
551
552    #[test]
553    fn test_wire_envelope_construction() {
554        let envelope = WireEnvelope {
555            target: ActorId {
556                node: NodeId("node-1".into()),
557                local: 42,
558            },
559            target_name: "test".into(),
560            message_type: "my_crate::Increment".into(),
561            send_mode: SendMode::Tell,
562            headers: WireHeaders::new(),
563            body: vec![1, 2, 3],
564            request_id: None,
565            version: Some(1),
566        };
567        assert_eq!(envelope.message_type, "my_crate::Increment");
568        assert_eq!(envelope.body, vec![1, 2, 3]);
569        assert_eq!(envelope.version, Some(1));
570    }
571
572    #[test]
573    fn test_wire_headers() {
574        let mut headers = WireHeaders::new();
575        assert!(headers.is_empty());
576        headers.insert("trace-id".into(), b"abc-123".to_vec());
577        headers.insert("priority".into(), vec![128]);
578        assert_eq!(headers.len(), 2);
579        assert_eq!(headers.get("trace-id").unwrap(), b"abc-123");
580        assert_eq!(headers.get("priority").unwrap(), &[128]);
581        assert!(headers.get("missing").is_none());
582    }
583
584    #[test]
585    fn test_serialization_error() {
586        let err = SerializationError::new("invalid format");
587        assert!(format!("{}", err).contains("invalid format"));
588    }
589
590    #[test]
591    fn test_cluster_state() {
592        let mut state = ClusterState::new(
593            NodeId("node-1".into()),
594            vec![
595                NodeId("node-1".into()),
596                NodeId("node-2".into()),
597                NodeId("node-3".into()),
598            ],
599        );
600        state.is_leader = true;
601        assert_eq!(state.node_count(), 3);
602        assert!(state.contains(&NodeId("node-2".into())));
603        assert!(!state.contains(&NodeId("node-99".into())));
604        assert!(state.is_leader);
605        assert!(state.app_version.is_none());
606        assert_eq!(
607            state.wire_version,
608            crate::version::WireVersion::parse(crate::version::DACTOR_WIRE_VERSION).unwrap()
609        );
610        assert!(state.peer_versions.is_empty());
611    }
612
613    #[test]
614    fn test_cluster_state_with_app_version() {
615        let mut state = ClusterState::new(
616            NodeId("node-1".into()),
617            vec![NodeId("node-1".into()), NodeId("node-2".into())],
618        );
619        state.app_version = Some("2.3.1".into());
620        assert_eq!(state.app_version.as_deref(), Some("2.3.1"));
621    }
622
623    #[test]
624    fn test_cluster_state_peer_versions() {
625        let mut state = ClusterState::new(
626            NodeId("node-1".into()),
627            vec![
628                NodeId("node-1".into()),
629                NodeId("node-2".into()),
630                NodeId("node-3".into()),
631            ],
632        );
633        state.peer_versions.insert(
634            NodeId("node-2".into()),
635            PeerVersionInfo {
636                wire_version: crate::version::WireVersion::parse("0.2.0").unwrap(),
637                app_version: Some("1.0.0".into()),
638                adapter: "ractor".into(),
639            },
640        );
641        state.peer_versions.insert(
642            NodeId("node-3".into()),
643            PeerVersionInfo {
644                wire_version: crate::version::WireVersion::parse("0.2.0").unwrap(),
645                app_version: Some("1.0.1".into()),
646                adapter: "ractor".into(),
647            },
648        );
649
650        // Lookup works
651        let p2 = state.peer_version(&NodeId("node-2".into())).unwrap();
652        assert_eq!(p2.app_version.as_deref(), Some("1.0.0"));
653        assert_eq!(p2.adapter, "ractor");
654
655        let p3 = state.peer_version(&NodeId("node-3".into())).unwrap();
656        assert_eq!(p3.app_version.as_deref(), Some("1.0.1"));
657
658        // Local node not in peer_versions
659        assert!(state.peer_version(&NodeId("node-1".into())).is_none());
660
661        // Unknown node not in peer_versions
662        assert!(state.peer_version(&NodeId("node-99".into())).is_none());
663    }
664
665    #[test]
666    fn test_cluster_state_mixed_app_versions() {
667        let mut state = ClusterState::new(
668            NodeId("node-1".into()),
669            vec![
670                NodeId("node-1".into()),
671                NodeId("node-2".into()),
672                NodeId("node-3".into()),
673            ],
674        );
675        state.app_version = Some("2.3.1".into());
676        state.peer_versions.insert(
677            NodeId("node-2".into()),
678            PeerVersionInfo {
679                wire_version: crate::version::WireVersion::parse("0.2.0").unwrap(),
680                app_version: Some("2.3.0".into()),
681                adapter: "ractor".into(),
682            },
683        );
684        state.peer_versions.insert(
685            NodeId("node-3".into()),
686            PeerVersionInfo {
687                wire_version: crate::version::WireVersion::parse("0.2.0").unwrap(),
688                app_version: Some("2.3.1".into()),
689                adapter: "ractor".into(),
690            },
691        );
692
693        // Can compute rollout progress
694        let total = state.node_count();
695        let on_latest = 1 // local
696            + state.peer_versions.values()
697                .filter(|p| p.app_version.as_deref() == Some("2.3.1"))
698                .count();
699        assert_eq!(total, 3);
700        assert_eq!(on_latest, 2); // node-1 (local) + node-3
701    }
702
703    #[tokio::test]
704    async fn test_static_seeds() {
705        let seeds = StaticSeeds::new(vec!["node1:4697".into(), "node2:4697".into()]);
706        let discovered = seeds.discover().await.unwrap();
707        assert_eq!(discovered.len(), 2);
708        assert_eq!(discovered[0].address, "node1:4697");
709        assert_eq!(discovered[0].node_id, NodeId("node1:4697".into()));
710    }
711
712    #[tokio::test]
713    async fn test_static_seeds_from_peers() {
714        let seeds = StaticSeeds::from_peers(vec![
715            DiscoveredPeer::new(NodeId("node-a".into()), "10.0.0.1:9000"),
716            DiscoveredPeer::new(NodeId("node-b".into()), "10.0.0.2:9000"),
717        ]);
718        let discovered = seeds.discover().await.unwrap();
719        assert_eq!(discovered.len(), 2);
720        assert_eq!(discovered[0].node_id, NodeId("node-a".into()));
721        assert_eq!(discovered[0].address, "10.0.0.1:9000");
722        assert_eq!(discovered[1].node_id, NodeId("node-b".into()));
723    }
724
725    #[test]
726    fn test_discovered_peer_from_address() {
727        let peer = DiscoveredPeer::from_address("10.0.0.1:9000");
728        assert_eq!(peer.node_id, NodeId("10.0.0.1:9000".into()));
729        assert_eq!(peer.address, "10.0.0.1:9000");
730    }
731
732    #[test]
733    fn test_wire_envelope_with_request_id() {
734        let envelope = WireEnvelope {
735            target: ActorId {
736                node: NodeId("n".into()),
737                local: 1,
738            },
739            target_name: "test".into(),
740            message_type: "Ask".into(),
741            send_mode: SendMode::Ask,
742            headers: WireHeaders::new(),
743            body: vec![],
744            request_id: Some(Uuid::new_v4()),
745            version: None,
746        };
747        assert!(envelope.request_id.is_some());
748        assert_eq!(envelope.send_mode, SendMode::Ask);
749    }
750
751    #[test]
752    fn test_header_registry_roundtrip() {
753        use crate::message::HeaderValue;
754        use std::any::Any;
755
756        #[derive(Debug, Clone)]
757        struct TraceId(String);
758        impl HeaderValue for TraceId {
759            fn header_name(&self) -> &'static str {
760                "trace-id"
761            }
762            fn to_bytes(&self) -> Option<Vec<u8>> {
763                Some(self.0.as_bytes().to_vec())
764            }
765            fn as_any(&self) -> &dyn Any {
766                self
767            }
768        }
769
770        // Set up header registry
771        let mut registry = HeaderRegistry::new();
772        registry.register("trace-id", |bytes: &[u8]| {
773            let s = String::from_utf8(bytes.to_vec()).ok()?;
774            Some(Box::new(TraceId(s)) as Box<dyn HeaderValue>)
775        });
776
777        assert_eq!(registry.len(), 1);
778        assert!(!registry.is_empty());
779
780        // Create typed headers and convert to wire
781        let mut headers = crate::message::Headers::new();
782        headers.insert(TraceId("abc-123".into()));
783        let wire = headers.to_wire();
784
785        assert_eq!(wire.len(), 1);
786        assert_eq!(wire.get("trace-id").unwrap(), b"abc-123");
787
788        // Convert wire back to typed headers via registry
789        let restored = wire.to_headers(&registry);
790        let trace = restored.get::<TraceId>().unwrap();
791        assert_eq!(trace.0, "abc-123");
792    }
793
794    #[test]
795    fn test_header_registry_missing_deserializer() {
796        let registry = HeaderRegistry::new();
797        assert!(registry.deserialize("unknown", &[]).is_none());
798    }
799
800    #[test]
801    fn test_headers_to_wire_skips_local_only() {
802        use crate::message::HeaderValue;
803        use std::any::Any;
804
805        #[derive(Debug)]
806        struct LocalOnlyHeader;
807        impl HeaderValue for LocalOnlyHeader {
808            fn header_name(&self) -> &'static str {
809                "local-only"
810            }
811            fn to_bytes(&self) -> Option<Vec<u8>> {
812                None
813            }
814            fn as_any(&self) -> &dyn Any {
815                self
816            }
817        }
818
819        let mut headers = crate::message::Headers::new();
820        headers.insert(LocalOnlyHeader);
821        let wire = headers.to_wire();
822        assert!(wire.is_empty());
823    }
824
825    #[test]
826    fn test_receive_envelope_body() {
827        let mut registry = crate::type_registry::TypeRegistry::new();
828        registry.register("test::Amount", |bytes: &[u8]| {
829            if bytes.len() != 8 {
830                return Err(SerializationError::new("expected 8 bytes"));
831            }
832            let val = u64::from_be_bytes(bytes.try_into().unwrap());
833            Ok(Box::new(val))
834        });
835
836        let envelope = WireEnvelope {
837            target: ActorId {
838                node: NodeId("n".into()),
839                local: 1,
840            },
841            target_name: "test".into(),
842            message_type: "test::Amount".into(),
843            send_mode: SendMode::Tell,
844            headers: WireHeaders::new(),
845            body: 42u64.to_be_bytes().to_vec(),
846            request_id: None,
847            version: None,
848        };
849
850        let any = receive_envelope_body(&envelope, &registry).unwrap();
851        let val = any.downcast::<u64>().unwrap();
852        assert_eq!(*val, 42);
853    }
854
855    #[test]
856    fn test_receive_envelope_body_unknown_type() {
857        let registry = crate::type_registry::TypeRegistry::new();
858        let envelope = WireEnvelope {
859            target: ActorId {
860                node: NodeId("n".into()),
861                local: 1,
862            },
863            target_name: "test".into(),
864            message_type: "unknown::Type".into(),
865            send_mode: SendMode::Tell,
866            headers: WireHeaders::new(),
867            body: vec![],
868            request_id: None,
869            version: None,
870        };
871
872        let result = receive_envelope_body(&envelope, &registry);
873        assert!(result.is_err());
874        assert!(result.unwrap_err().message.contains("no deserializer"));
875    }
876
877    #[test]
878    fn test_version_mismatch_with_handler() {
879        let mut registry = crate::type_registry::TypeRegistry::new();
880        // Register v2 deserializer (u64 big-endian)
881        registry.register("test::Versioned", |bytes: &[u8]| {
882            if bytes.len() != 8 {
883                return Err(SerializationError::new("expected 8 bytes"));
884            }
885            let val = u64::from_be_bytes(bytes.try_into().unwrap());
886            Ok(Box::new(val))
887        });
888
889        // Version handler that doubles the value during migration
890        struct DoubleMigrator;
891        impl MessageVersionHandler for DoubleMigrator {
892            fn message_type(&self) -> &'static str {
893                "test::Versioned"
894            }
895            fn migrate(&self, payload: &[u8], _from: u32, _to: u32) -> Option<Vec<u8>> {
896                if payload.len() != 8 {
897                    return None;
898                }
899                let val = u64::from_be_bytes(payload.try_into().unwrap());
900                Some((val * 2).to_be_bytes().to_vec())
901            }
902        }
903
904        let mut version_handlers: std::collections::HashMap<
905            String,
906            Box<dyn MessageVersionHandler>,
907        > = std::collections::HashMap::new();
908        version_handlers.insert("test::Versioned".into(), Box::new(DoubleMigrator));
909
910        let envelope = WireEnvelope {
911            target: ActorId {
912                node: NodeId("n".into()),
913                local: 1,
914            },
915            target_name: "test".into(),
916            message_type: "test::Versioned".into(),
917            send_mode: SendMode::Tell,
918            headers: WireHeaders::new(),
919            body: 21u64.to_be_bytes().to_vec(),
920            request_id: None,
921            version: Some(1), // sender is v1
922        };
923
924        let any = receive_envelope_body_versioned(
925            &envelope,
926            &registry,
927            &version_handlers,
928            Some(2), // we expect v2
929        )
930        .unwrap();
931        let val = any.downcast::<u64>().unwrap();
932        assert_eq!(*val, 42); // 21 * 2 = 42 (migrated)
933    }
934
935    #[test]
936    fn test_version_match_skips_migration() {
937        // When versions match, no migration is attempted — even if a handler exists.
938        let mut registry = crate::type_registry::TypeRegistry::new();
939        registry.register("test::Same", |bytes: &[u8]| Ok(Box::new(bytes.to_vec())));
940
941        struct PanicMigrator;
942        impl MessageVersionHandler for PanicMigrator {
943            fn message_type(&self) -> &'static str {
944                "test::Same"
945            }
946            fn migrate(&self, _payload: &[u8], _from: u32, _to: u32) -> Option<Vec<u8>> {
947                panic!("migrate should not be called when versions match");
948            }
949        }
950
951        let mut version_handlers: std::collections::HashMap<
952            String,
953            Box<dyn MessageVersionHandler>,
954        > = std::collections::HashMap::new();
955        version_handlers.insert("test::Same".into(), Box::new(PanicMigrator));
956
957        let envelope = WireEnvelope {
958            target: ActorId {
959                node: NodeId("n".into()),
960                local: 1,
961            },
962            target_name: "test".into(),
963            message_type: "test::Same".into(),
964            send_mode: SendMode::Tell,
965            headers: WireHeaders::new(),
966            body: vec![1, 2, 3],
967            request_id: None,
968            version: Some(2),
969        };
970
971        let any = receive_envelope_body_versioned(
972            &envelope,
973            &registry,
974            &version_handlers,
975            Some(2), // same version — handler must NOT be called
976        )
977        .unwrap();
978        let val = any.downcast::<Vec<u8>>().unwrap();
979        assert_eq!(*val, vec![1, 2, 3]);
980    }
981
982    // --- T11: Version breaking change — migration rejection scenarios ---
983
984    #[test]
985    fn test_version_mismatch_no_handler_falls_through() {
986        // When no MessageVersionHandler is registered, version mismatch
987        // falls through to deserialize the body as-is (relies on serde defaults).
988        let mut registry = crate::type_registry::TypeRegistry::new();
989        registry.register("test::NoHandler", |bytes: &[u8]| Ok(Box::new(bytes.to_vec())));
990
991        let version_handlers: std::collections::HashMap<String, Box<dyn MessageVersionHandler>> =
992            std::collections::HashMap::new();
993
994        let envelope = WireEnvelope {
995            target: ActorId {
996                node: NodeId("n".into()),
997                local: 1,
998            },
999            target_name: "test".into(),
1000            message_type: "test::NoHandler".into(),
1001            send_mode: SendMode::Tell,
1002            headers: WireHeaders::new(),
1003            body: vec![10, 20],
1004            request_id: None,
1005            version: Some(1), // sender v1
1006        };
1007
1008        // Receiver expects v2 but has no handler — body passes through
1009        let any = receive_envelope_body_versioned(
1010            &envelope,
1011            &registry,
1012            &version_handlers,
1013            Some(2),
1014        )
1015        .unwrap();
1016        let val = any.downcast::<Vec<u8>>().unwrap();
1017        assert_eq!(*val, vec![10, 20]);
1018    }
1019
1020    #[test]
1021    fn test_version_mismatch_handler_returns_none_rejects() {
1022        // When the MessageVersionHandler cannot migrate (returns None),
1023        // the call should fail with a clear error.
1024        let mut registry = crate::type_registry::TypeRegistry::new();
1025        registry.register("test::FailMigrate", |bytes: &[u8]| Ok(Box::new(bytes.to_vec())));
1026
1027        struct RejectingMigrator;
1028        impl MessageVersionHandler for RejectingMigrator {
1029            fn message_type(&self) -> &'static str {
1030                "test::FailMigrate"
1031            }
1032            fn migrate(&self, _payload: &[u8], _from: u32, _to: u32) -> Option<Vec<u8>> {
1033                None // migration not possible
1034            }
1035        }
1036
1037        let mut version_handlers: std::collections::HashMap<
1038            String,
1039            Box<dyn MessageVersionHandler>,
1040        > = std::collections::HashMap::new();
1041        version_handlers.insert("test::FailMigrate".into(), Box::new(RejectingMigrator));
1042
1043        let envelope = WireEnvelope {
1044            target: ActorId {
1045                node: NodeId("n".into()),
1046                local: 1,
1047            },
1048            target_name: "test".into(),
1049            message_type: "test::FailMigrate".into(),
1050            send_mode: SendMode::Tell,
1051            headers: WireHeaders::new(),
1052            body: vec![1, 2, 3],
1053            request_id: None,
1054            version: Some(1), // sender v1
1055        };
1056
1057        let result = receive_envelope_body_versioned(
1058            &envelope,
1059            &registry,
1060            &version_handlers,
1061            Some(2), // receiver expects v2
1062        );
1063        assert!(result.is_err());
1064        let err = result.unwrap_err();
1065        assert!(
1066            err.message.contains("cannot migrate from v1 to v2"),
1067            "expected migration rejection, got: {}",
1068            err.message
1069        );
1070    }
1071
1072    #[test]
1073    fn test_version_none_on_sender_skips_migration() {
1074        // When the sender doesn't set a version (None), no migration is attempted
1075        // regardless of the receiver's expected version — even if a handler exists.
1076        let mut registry = crate::type_registry::TypeRegistry::new();
1077        registry.register("test::OptionalVersion", |bytes: &[u8]| Ok(Box::new(bytes.to_vec())));
1078
1079        // Register a panicking handler to prove it's never called
1080        struct PanicMigrator;
1081        impl MessageVersionHandler for PanicMigrator {
1082            fn message_type(&self) -> &'static str {
1083                "test::OptionalVersion"
1084            }
1085            fn migrate(&self, _payload: &[u8], _from: u32, _to: u32) -> Option<Vec<u8>> {
1086                panic!("migrate should not be called when sender has no version");
1087            }
1088        }
1089
1090        let mut version_handlers: std::collections::HashMap<
1091            String,
1092            Box<dyn MessageVersionHandler>,
1093        > = std::collections::HashMap::new();
1094        version_handlers.insert("test::OptionalVersion".into(), Box::new(PanicMigrator));
1095
1096        let envelope = WireEnvelope {
1097            target: ActorId {
1098                node: NodeId("n".into()),
1099                local: 1,
1100            },
1101            target_name: "test".into(),
1102            message_type: "test::OptionalVersion".into(),
1103            send_mode: SendMode::Tell,
1104            headers: WireHeaders::new(),
1105            body: vec![7, 8, 9],
1106            request_id: None,
1107            version: None, // sender has no version
1108        };
1109
1110        let any = receive_envelope_body_versioned(
1111            &envelope,
1112            &registry,
1113            &version_handlers,
1114            Some(2), // receiver expects v2
1115        )
1116        .unwrap();
1117        let val = any.downcast::<Vec<u8>>().unwrap();
1118        assert_eq!(*val, vec![7, 8, 9]);
1119    }
1120
1121    #[test]
1122    fn test_version_none_on_both_sides_skips_migration() {
1123        // When neither side specifies a version, no migration is attempted.
1124        let mut registry = crate::type_registry::TypeRegistry::new();
1125        registry.register("test::NoVersion", |bytes: &[u8]| Ok(Box::new(bytes.to_vec())));
1126
1127        let version_handlers: std::collections::HashMap<String, Box<dyn MessageVersionHandler>> =
1128            std::collections::HashMap::new();
1129
1130        let envelope = WireEnvelope {
1131            target: ActorId {
1132                node: NodeId("n".into()),
1133                local: 1,
1134            },
1135            target_name: "test".into(),
1136            message_type: "test::NoVersion".into(),
1137            send_mode: SendMode::Tell,
1138            headers: WireHeaders::new(),
1139            body: vec![4, 5, 6],
1140            request_id: None,
1141            version: None,
1142        };
1143
1144        let any = receive_envelope_body_versioned(
1145            &envelope,
1146            &registry,
1147            &version_handlers,
1148            None, // receiver also has no version expectation
1149        )
1150        .unwrap();
1151        let val = any.downcast::<Vec<u8>>().unwrap();
1152        assert_eq!(*val, vec![4, 5, 6]);
1153    }
1154
1155    #[test]
1156    fn test_version_none_on_receiver_skips_migration() {
1157        // When receiver has no version expectation (None) but sender has a
1158        // version, no migration is attempted — even if a handler exists.
1159        let mut registry = crate::type_registry::TypeRegistry::new();
1160        registry.register("test::ReceiverNone", |bytes: &[u8]| Ok(Box::new(bytes.to_vec())));
1161
1162        struct PanicMigrator;
1163        impl MessageVersionHandler for PanicMigrator {
1164            fn message_type(&self) -> &'static str {
1165                "test::ReceiverNone"
1166            }
1167            fn migrate(&self, _payload: &[u8], _from: u32, _to: u32) -> Option<Vec<u8>> {
1168                panic!("migrate should not be called when receiver has no version expectation");
1169            }
1170        }
1171
1172        let mut version_handlers: std::collections::HashMap<
1173            String,
1174            Box<dyn MessageVersionHandler>,
1175        > = std::collections::HashMap::new();
1176        version_handlers.insert("test::ReceiverNone".into(), Box::new(PanicMigrator));
1177
1178        let envelope = WireEnvelope {
1179            target: ActorId {
1180                node: NodeId("n".into()),
1181                local: 1,
1182            },
1183            target_name: "test".into(),
1184            message_type: "test::ReceiverNone".into(),
1185            send_mode: SendMode::Tell,
1186            headers: WireHeaders::new(),
1187            body: vec![11, 22, 33],
1188            request_id: None,
1189            version: Some(3), // sender has v3
1190        };
1191
1192        let any = receive_envelope_body_versioned(
1193            &envelope,
1194            &registry,
1195            &version_handlers,
1196            None, // receiver has no version expectation
1197        )
1198        .unwrap();
1199        let val = any.downcast::<Vec<u8>>().unwrap();
1200        assert_eq!(*val, vec![11, 22, 33]);
1201    }
1202
1203    #[test]
1204    fn test_version_backward_migration_v2_to_v1() {
1205        // Verify migration works in reverse direction (newer sender, older receiver).
1206        let mut registry = crate::type_registry::TypeRegistry::new();
1207        registry.register("test::Backward", |bytes: &[u8]| {
1208            if bytes.len() != 8 {
1209                return Err(SerializationError::new("expected 8 bytes"));
1210            }
1211            let val = u64::from_be_bytes(bytes.try_into().unwrap());
1212            Ok(Box::new(val))
1213        });
1214
1215        struct HalveMigrator;
1216        impl MessageVersionHandler for HalveMigrator {
1217            fn message_type(&self) -> &'static str {
1218                "test::Backward"
1219            }
1220            fn migrate(&self, payload: &[u8], from: u32, to: u32) -> Option<Vec<u8>> {
1221                if from > to {
1222                    // Downgrade: halve the value
1223                    let val = u64::from_be_bytes(payload.try_into().ok()?);
1224                    Some((val / 2).to_be_bytes().to_vec())
1225                } else {
1226                    None
1227                }
1228            }
1229        }
1230
1231        let mut version_handlers: std::collections::HashMap<
1232            String,
1233            Box<dyn MessageVersionHandler>,
1234        > = std::collections::HashMap::new();
1235        version_handlers.insert("test::Backward".into(), Box::new(HalveMigrator));
1236
1237        let envelope = WireEnvelope {
1238            target: ActorId {
1239                node: NodeId("n".into()),
1240                local: 1,
1241            },
1242            target_name: "test".into(),
1243            message_type: "test::Backward".into(),
1244            send_mode: SendMode::Tell,
1245            headers: WireHeaders::new(),
1246            body: 100u64.to_be_bytes().to_vec(),
1247            request_id: None,
1248            version: Some(2), // sender is v2
1249        };
1250
1251        let any = receive_envelope_body_versioned(
1252            &envelope,
1253            &registry,
1254            &version_handlers,
1255            Some(1), // receiver expects v1
1256        )
1257        .unwrap();
1258        let val = any.downcast::<u64>().unwrap();
1259        assert_eq!(*val, 50); // 100 / 2 = 50 (downgraded)
1260    }
1261
1262    #[cfg(feature = "serde")]
1263    mod serde_tests {
1264        use super::*;
1265
1266        #[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
1267        struct Increment {
1268            amount: u64,
1269        }
1270
1271        #[test]
1272        fn json_serializer_roundtrip() {
1273            let msg = Increment { amount: 42 };
1274            let bytes = JsonSerializer::serialize_typed(&msg).unwrap();
1275            let deserialized: Increment = JsonSerializer::deserialize_typed(&bytes).unwrap();
1276            assert_eq!(deserialized, msg);
1277        }
1278
1279        #[test]
1280        fn json_serializer_invalid_bytes() {
1281            let result = JsonSerializer::deserialize_typed::<Increment>(b"not json");
1282            assert!(result.is_err());
1283            assert!(result.unwrap_err().message.contains("json deserialize"));
1284        }
1285
1286        #[test]
1287        fn build_tell_envelope_roundtrip() {
1288            let target = ActorId {
1289                node: NodeId("node-2".into()),
1290                local: 7,
1291            };
1292            let msg = Increment { amount: 100 };
1293            let envelope =
1294                build_tell_envelope(target.clone(), "counter", &msg, WireHeaders::new()).unwrap();
1295
1296            assert_eq!(envelope.target, target);
1297            assert_eq!(envelope.send_mode, SendMode::Tell);
1298            assert!(envelope.request_id.is_none());
1299            assert!(envelope.message_type.contains("Increment"));
1300
1301            // Deserialize body back
1302            let deserialized: Increment =
1303                JsonSerializer::deserialize_typed(&envelope.body).unwrap();
1304            assert_eq!(deserialized.amount, 100);
1305        }
1306
1307        #[test]
1308        fn build_ask_envelope_roundtrip() {
1309            let target = ActorId {
1310                node: NodeId("node-3".into()),
1311                local: 42,
1312            };
1313            let msg = Increment { amount: 5 };
1314            let request_id = Uuid::new_v4();
1315            let envelope = build_ask_envelope(
1316                target.clone(),
1317                "counter",
1318                &msg,
1319                WireHeaders::new(),
1320                request_id,
1321            )
1322            .unwrap();
1323
1324            assert_eq!(envelope.target, target);
1325            assert_eq!(envelope.send_mode, SendMode::Ask);
1326            assert_eq!(envelope.request_id, Some(request_id));
1327        }
1328
1329        #[test]
1330        fn full_pipeline_send_and_receive() {
1331            // 1. Build envelope (sender side)
1332            let target = ActorId {
1333                node: NodeId("node-2".into()),
1334                local: 1,
1335            };
1336            let msg = Increment { amount: 77 };
1337            let envelope =
1338                build_tell_envelope(target, "counter", &msg, WireHeaders::new()).unwrap();
1339
1340            // 2. Register type on receiver side
1341            let mut registry = crate::type_registry::TypeRegistry::new();
1342            registry.register_type::<Increment>();
1343
1344            // 3. Receive and deserialize
1345            let any = receive_envelope_body(&envelope, &registry).unwrap();
1346            let received = any.downcast::<Increment>().unwrap();
1347            assert_eq!(received.amount, 77);
1348        }
1349
1350        #[test]
1351        fn full_pipeline_with_headers() {
1352            use crate::message::HeaderValue;
1353            use std::any::Any;
1354
1355            #[derive(Debug, Clone)]
1356            struct Priority(u8);
1357            impl HeaderValue for Priority {
1358                fn header_name(&self) -> &'static str {
1359                    "priority"
1360                }
1361                fn to_bytes(&self) -> Option<Vec<u8>> {
1362                    Some(vec![self.0])
1363                }
1364                fn as_any(&self) -> &dyn Any {
1365                    self
1366                }
1367            }
1368
1369            // Sender: typed headers → wire
1370            let mut headers = crate::message::Headers::new();
1371            headers.insert(Priority(5));
1372            let wire_headers = headers.to_wire();
1373
1374            let target = ActorId {
1375                node: NodeId("node-2".into()),
1376                local: 1,
1377            };
1378            let msg = Increment { amount: 10 };
1379            let envelope = build_tell_envelope(target, "counter", &msg, wire_headers).unwrap();
1380
1381            // Receiver: wire headers → typed (via registry)
1382            let mut header_registry = HeaderRegistry::new();
1383            header_registry.register("priority", |bytes: &[u8]| {
1384                if bytes.len() != 1 {
1385                    return None;
1386                }
1387                Some(Box::new(Priority(bytes[0])) as Box<dyn HeaderValue>)
1388            });
1389
1390            let restored_headers = envelope.headers.to_headers(&header_registry);
1391            let priority = restored_headers.get::<Priority>().unwrap();
1392            assert_eq!(priority.0, 5);
1393        }
1394    }
1395}