1use crate::interceptor::SendMode;
7use crate::node::{ActorId, NodeId};
8use uuid::Uuid;
9
10pub trait RemoteMessage: crate::message::Message + Send + 'static {}
13
14pub trait MessageSerializer: Send + Sync + 'static {
19 fn name(&self) -> &'static str;
21
22 fn serialize(&self, value: &dyn std::any::Any) -> Result<Vec<u8>, SerializationError>;
24
25 fn deserialize(
27 &self,
28 bytes: &[u8],
29 type_name: &str,
30 ) -> Result<Box<dyn std::any::Any + Send>, SerializationError>;
31}
32
33#[derive(Debug, Clone)]
35pub struct SerializationError {
36 pub message: String,
38}
39
40impl SerializationError {
41 pub fn new(message: impl Into<String>) -> Self {
43 Self {
44 message: message.into(),
45 }
46 }
47}
48
49impl std::fmt::Display for SerializationError {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 write!(f, "serialization error: {}", self.message)
52 }
53}
54
55impl std::error::Error for SerializationError {}
56
57#[derive(Debug, Clone)]
60#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
61pub struct WireEnvelope {
62 pub target: ActorId,
64 pub target_name: String,
66 pub message_type: String,
68 pub send_mode: SendMode,
70 pub headers: WireHeaders,
72 pub body: Vec<u8>,
74 pub request_id: Option<Uuid>,
76 pub version: Option<u32>,
78}
79
80#[derive(Debug, Clone, Default)]
83#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
84pub struct WireHeaders {
85 pub entries: std::collections::HashMap<String, Vec<u8>>,
87}
88
89impl WireHeaders {
90 pub fn new() -> Self {
92 Self {
93 entries: std::collections::HashMap::new(),
94 }
95 }
96
97 pub fn insert(&mut self, name: String, value: Vec<u8>) {
99 self.entries.insert(name, value);
100 }
101
102 pub fn get(&self, name: &str) -> Option<&[u8]> {
104 self.entries.get(name).map(|v| v.as_slice())
105 }
106
107 pub fn is_empty(&self) -> bool {
109 self.entries.is_empty()
110 }
111 pub fn len(&self) -> usize {
113 self.entries.len()
114 }
115
116 pub fn to_headers(&self, registry: &HeaderRegistry) -> crate::message::Headers {
119 let mut headers = crate::message::Headers::new();
120 for (name, bytes) in &self.entries {
121 if let Some(header_value) = registry.deserialize(name, bytes) {
122 headers.insert_boxed(header_value);
123 }
124 }
125 headers
126 }
127}
128
129pub type HeaderDeserializerFn =
131 Box<dyn Fn(&[u8]) -> Option<Box<dyn crate::message::HeaderValue>> + Send + Sync>;
132
133pub struct HeaderRegistry {
139 deserializers: std::collections::HashMap<String, HeaderDeserializerFn>,
141}
142
143impl HeaderRegistry {
144 pub fn new() -> Self {
146 Self {
147 deserializers: std::collections::HashMap::new(),
148 }
149 }
150
151 pub fn register(
153 &mut self,
154 header_name: impl Into<String>,
155 deserializer: impl Fn(&[u8]) -> Option<Box<dyn crate::message::HeaderValue>>
156 + Send
157 + Sync
158 + 'static,
159 ) {
160 self.deserializers
161 .insert(header_name.into(), Box::new(deserializer));
162 }
163
164 pub fn deserialize(
167 &self,
168 header_name: &str,
169 bytes: &[u8],
170 ) -> Option<Box<dyn crate::message::HeaderValue>> {
171 let deser = self.deserializers.get(header_name)?;
172 deser(bytes)
173 }
174
175 pub fn len(&self) -> usize {
177 self.deserializers.len()
178 }
179
180 pub fn is_empty(&self) -> bool {
182 self.deserializers.is_empty()
183 }
184}
185
186impl Default for HeaderRegistry {
187 fn default() -> Self {
188 Self::new()
189 }
190}
191
192pub trait MessageVersionHandler: Send + Sync + 'static {
195 fn message_type(&self) -> &'static str;
197
198 fn migrate(&self, payload: &[u8], from_version: u32, to_version: u32) -> Option<Vec<u8>>;
201}
202
203#[derive(Debug, Clone)]
217#[non_exhaustive]
218pub struct ClusterState {
219 pub local_node: NodeId,
221 pub nodes: Vec<NodeId>,
223 pub is_leader: bool,
225 pub wire_version: crate::version::WireVersion,
227 pub app_version: Option<String>,
230 pub peer_versions: std::collections::HashMap<NodeId, PeerVersionInfo>,
234}
235
236impl ClusterState {
237 pub fn new(local_node: NodeId, nodes: Vec<NodeId>) -> Self {
243 Self {
244 local_node,
245 nodes,
246 is_leader: false,
247 wire_version: crate::version::WireVersion::parse(
248 crate::version::DACTOR_WIRE_VERSION,
249 )
250 .expect("DACTOR_WIRE_VERSION must be valid"),
251 app_version: None,
252 peer_versions: std::collections::HashMap::new(),
253 }
254 }
255
256 pub fn node_count(&self) -> usize {
258 self.nodes.len()
259 }
260
261 pub fn contains(&self, node_id: &NodeId) -> bool {
263 self.nodes.contains(node_id)
264 }
265
266 pub fn peer_version(&self, node_id: &NodeId) -> Option<&PeerVersionInfo> {
268 self.peer_versions.get(node_id)
269 }
270}
271
272#[derive(Debug, Clone, PartialEq, Eq)]
277pub struct PeerVersionInfo {
278 pub wire_version: crate::version::WireVersion,
280 pub app_version: Option<String>,
282 pub adapter: String,
284}
285
286#[derive(Debug, Clone)]
288pub struct DiscoveryError {
289 pub message: String,
291}
292
293impl DiscoveryError {
294 pub fn new(message: impl Into<String>) -> Self {
296 Self {
297 message: message.into(),
298 }
299 }
300}
301
302impl std::fmt::Display for DiscoveryError {
303 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304 write!(f, "{}", self.message)
305 }
306}
307
308impl std::error::Error for DiscoveryError {}
309
310#[derive(Debug, Clone, PartialEq, Eq, Hash)]
316pub struct DiscoveredPeer {
317 pub node_id: NodeId,
319 pub address: String,
321}
322
323impl DiscoveredPeer {
324 pub fn new(node_id: NodeId, address: impl Into<String>) -> Self {
326 Self {
327 node_id,
328 address: address.into(),
329 }
330 }
331
332 pub fn from_address(address: impl Into<String>) -> Self {
345 let addr = address.into();
346 Self {
347 node_id: NodeId(addr.clone()),
348 address: addr,
349 }
350 }
351}
352
353#[async_trait::async_trait]
355pub trait ClusterDiscovery: Send + Sync + 'static {
356 async fn discover(&self) -> Result<Vec<DiscoveredPeer>, DiscoveryError>;
363}
364
365pub struct StaticSeeds {
367 pub peers: Vec<DiscoveredPeer>,
369}
370
371impl StaticSeeds {
372 pub fn new(addresses: Vec<String>) -> Self {
377 Self {
378 peers: addresses.into_iter().map(DiscoveredPeer::from_address).collect(),
379 }
380 }
381
382 pub fn from_peers(peers: Vec<DiscoveredPeer>) -> Self {
385 Self { peers }
386 }
387}
388
389#[async_trait::async_trait]
390impl ClusterDiscovery for StaticSeeds {
391 async fn discover(&self) -> Result<Vec<DiscoveredPeer>, DiscoveryError> {
392 Ok(self.peers.clone())
393 }
394}
395
396#[cfg(feature = "serde")]
406pub struct JsonSerializer;
407
408#[cfg(feature = "serde")]
409impl JsonSerializer {
410 pub fn serialize_typed<T: serde::Serialize>(value: &T) -> Result<Vec<u8>, SerializationError> {
412 serde_json::to_vec(value)
413 .map_err(|e| SerializationError::new(format!("json serialize: {e}")))
414 }
415
416 pub fn deserialize_typed<T: serde::de::DeserializeOwned>(
418 bytes: &[u8],
419 ) -> Result<T, SerializationError> {
420 serde_json::from_slice(bytes)
421 .map_err(|e| SerializationError::new(format!("json deserialize: {e}")))
422 }
423}
424
425#[cfg(feature = "serde")]
434pub fn build_tell_envelope<M: serde::Serialize>(
435 target: crate::node::ActorId,
436 target_name: impl Into<String>,
437 msg: &M,
438 headers: WireHeaders,
439) -> Result<WireEnvelope, SerializationError> {
440 let body = JsonSerializer::serialize_typed(msg)?;
441 Ok(WireEnvelope {
442 target,
443 target_name: target_name.into(),
444 message_type: std::any::type_name::<M>().to_string(),
445 send_mode: crate::interceptor::SendMode::Tell,
446 headers,
447 body,
448 request_id: None,
449 version: None,
450 })
451}
452
453#[cfg(feature = "serde")]
457pub fn build_ask_envelope<M: serde::Serialize>(
458 target: crate::node::ActorId,
459 target_name: impl Into<String>,
460 msg: &M,
461 headers: WireHeaders,
462 request_id: uuid::Uuid,
463) -> Result<WireEnvelope, SerializationError> {
464 let body = JsonSerializer::serialize_typed(msg)?;
465 Ok(WireEnvelope {
466 target,
467 target_name: target_name.into(),
468 message_type: std::any::type_name::<M>().to_string(),
469 send_mode: crate::interceptor::SendMode::Ask,
470 headers,
471 body,
472 request_id: Some(request_id),
473 version: None,
474 })
475}
476
477#[cfg(feature = "serde")]
481pub fn build_wire_envelope<M: serde::Serialize>(
482 target: crate::node::ActorId,
483 target_name: impl Into<String>,
484 msg: &M,
485 send_mode: crate::interceptor::SendMode,
486 headers: WireHeaders,
487 request_id: Option<uuid::Uuid>,
488 version: Option<u32>,
489) -> Result<WireEnvelope, SerializationError> {
490 let body = JsonSerializer::serialize_typed(msg)?;
491 Ok(WireEnvelope {
492 target,
493 target_name: target_name.into(),
494 message_type: std::any::type_name::<M>().to_string(),
495 send_mode,
496 headers,
497 body,
498 request_id,
499 version,
500 })
501}
502
503pub fn receive_envelope_body(
509 envelope: &WireEnvelope,
510 registry: &crate::type_registry::TypeRegistry,
511) -> Result<Box<dyn std::any::Any + Send>, SerializationError> {
512 registry.deserialize(&envelope.message_type, &envelope.body)
513}
514
515pub fn receive_envelope_body_versioned(
518 envelope: &WireEnvelope,
519 registry: &crate::type_registry::TypeRegistry,
520 version_handlers: &std::collections::HashMap<String, Box<dyn MessageVersionHandler>>,
521 expected_version: Option<u32>,
522) -> Result<Box<dyn std::any::Any + Send>, SerializationError> {
523 let body = match (envelope.version, expected_version) {
524 (Some(received), Some(expected)) if received != expected => {
525 if let Some(handler) = version_handlers.get(&envelope.message_type) {
527 handler
528 .migrate(&envelope.body, received, expected)
529 .ok_or_else(|| {
530 SerializationError::new(format!(
531 "{}: cannot migrate from v{received} to v{expected}",
532 envelope.message_type
533 ))
534 })?
535 } else {
536 envelope.body.clone()
538 }
539 }
540 _ => envelope.body.clone(),
541 };
542
543 registry.deserialize(&envelope.message_type, &body)
544}
545
546#[cfg(test)]
547mod tests {
548 use super::*;
549 use crate::interceptor::SendMode;
550 use crate::node::NodeId;
551
552 #[test]
553 fn test_wire_envelope_construction() {
554 let envelope = WireEnvelope {
555 target: ActorId {
556 node: NodeId("node-1".into()),
557 local: 42,
558 },
559 target_name: "test".into(),
560 message_type: "my_crate::Increment".into(),
561 send_mode: SendMode::Tell,
562 headers: WireHeaders::new(),
563 body: vec![1, 2, 3],
564 request_id: None,
565 version: Some(1),
566 };
567 assert_eq!(envelope.message_type, "my_crate::Increment");
568 assert_eq!(envelope.body, vec![1, 2, 3]);
569 assert_eq!(envelope.version, Some(1));
570 }
571
572 #[test]
573 fn test_wire_headers() {
574 let mut headers = WireHeaders::new();
575 assert!(headers.is_empty());
576 headers.insert("trace-id".into(), b"abc-123".to_vec());
577 headers.insert("priority".into(), vec![128]);
578 assert_eq!(headers.len(), 2);
579 assert_eq!(headers.get("trace-id").unwrap(), b"abc-123");
580 assert_eq!(headers.get("priority").unwrap(), &[128]);
581 assert!(headers.get("missing").is_none());
582 }
583
584 #[test]
585 fn test_serialization_error() {
586 let err = SerializationError::new("invalid format");
587 assert!(format!("{}", err).contains("invalid format"));
588 }
589
590 #[test]
591 fn test_cluster_state() {
592 let mut state = ClusterState::new(
593 NodeId("node-1".into()),
594 vec![
595 NodeId("node-1".into()),
596 NodeId("node-2".into()),
597 NodeId("node-3".into()),
598 ],
599 );
600 state.is_leader = true;
601 assert_eq!(state.node_count(), 3);
602 assert!(state.contains(&NodeId("node-2".into())));
603 assert!(!state.contains(&NodeId("node-99".into())));
604 assert!(state.is_leader);
605 assert!(state.app_version.is_none());
606 assert_eq!(
607 state.wire_version,
608 crate::version::WireVersion::parse(crate::version::DACTOR_WIRE_VERSION).unwrap()
609 );
610 assert!(state.peer_versions.is_empty());
611 }
612
613 #[test]
614 fn test_cluster_state_with_app_version() {
615 let mut state = ClusterState::new(
616 NodeId("node-1".into()),
617 vec![NodeId("node-1".into()), NodeId("node-2".into())],
618 );
619 state.app_version = Some("2.3.1".into());
620 assert_eq!(state.app_version.as_deref(), Some("2.3.1"));
621 }
622
623 #[test]
624 fn test_cluster_state_peer_versions() {
625 let mut state = ClusterState::new(
626 NodeId("node-1".into()),
627 vec![
628 NodeId("node-1".into()),
629 NodeId("node-2".into()),
630 NodeId("node-3".into()),
631 ],
632 );
633 state.peer_versions.insert(
634 NodeId("node-2".into()),
635 PeerVersionInfo {
636 wire_version: crate::version::WireVersion::parse("0.2.0").unwrap(),
637 app_version: Some("1.0.0".into()),
638 adapter: "ractor".into(),
639 },
640 );
641 state.peer_versions.insert(
642 NodeId("node-3".into()),
643 PeerVersionInfo {
644 wire_version: crate::version::WireVersion::parse("0.2.0").unwrap(),
645 app_version: Some("1.0.1".into()),
646 adapter: "ractor".into(),
647 },
648 );
649
650 let p2 = state.peer_version(&NodeId("node-2".into())).unwrap();
652 assert_eq!(p2.app_version.as_deref(), Some("1.0.0"));
653 assert_eq!(p2.adapter, "ractor");
654
655 let p3 = state.peer_version(&NodeId("node-3".into())).unwrap();
656 assert_eq!(p3.app_version.as_deref(), Some("1.0.1"));
657
658 assert!(state.peer_version(&NodeId("node-1".into())).is_none());
660
661 assert!(state.peer_version(&NodeId("node-99".into())).is_none());
663 }
664
665 #[test]
666 fn test_cluster_state_mixed_app_versions() {
667 let mut state = ClusterState::new(
668 NodeId("node-1".into()),
669 vec![
670 NodeId("node-1".into()),
671 NodeId("node-2".into()),
672 NodeId("node-3".into()),
673 ],
674 );
675 state.app_version = Some("2.3.1".into());
676 state.peer_versions.insert(
677 NodeId("node-2".into()),
678 PeerVersionInfo {
679 wire_version: crate::version::WireVersion::parse("0.2.0").unwrap(),
680 app_version: Some("2.3.0".into()),
681 adapter: "ractor".into(),
682 },
683 );
684 state.peer_versions.insert(
685 NodeId("node-3".into()),
686 PeerVersionInfo {
687 wire_version: crate::version::WireVersion::parse("0.2.0").unwrap(),
688 app_version: Some("2.3.1".into()),
689 adapter: "ractor".into(),
690 },
691 );
692
693 let total = state.node_count();
695 let on_latest = 1 + state.peer_versions.values()
697 .filter(|p| p.app_version.as_deref() == Some("2.3.1"))
698 .count();
699 assert_eq!(total, 3);
700 assert_eq!(on_latest, 2); }
702
703 #[tokio::test]
704 async fn test_static_seeds() {
705 let seeds = StaticSeeds::new(vec!["node1:4697".into(), "node2:4697".into()]);
706 let discovered = seeds.discover().await.unwrap();
707 assert_eq!(discovered.len(), 2);
708 assert_eq!(discovered[0].address, "node1:4697");
709 assert_eq!(discovered[0].node_id, NodeId("node1:4697".into()));
710 }
711
712 #[tokio::test]
713 async fn test_static_seeds_from_peers() {
714 let seeds = StaticSeeds::from_peers(vec![
715 DiscoveredPeer::new(NodeId("node-a".into()), "10.0.0.1:9000"),
716 DiscoveredPeer::new(NodeId("node-b".into()), "10.0.0.2:9000"),
717 ]);
718 let discovered = seeds.discover().await.unwrap();
719 assert_eq!(discovered.len(), 2);
720 assert_eq!(discovered[0].node_id, NodeId("node-a".into()));
721 assert_eq!(discovered[0].address, "10.0.0.1:9000");
722 assert_eq!(discovered[1].node_id, NodeId("node-b".into()));
723 }
724
725 #[test]
726 fn test_discovered_peer_from_address() {
727 let peer = DiscoveredPeer::from_address("10.0.0.1:9000");
728 assert_eq!(peer.node_id, NodeId("10.0.0.1:9000".into()));
729 assert_eq!(peer.address, "10.0.0.1:9000");
730 }
731
732 #[test]
733 fn test_wire_envelope_with_request_id() {
734 let envelope = WireEnvelope {
735 target: ActorId {
736 node: NodeId("n".into()),
737 local: 1,
738 },
739 target_name: "test".into(),
740 message_type: "Ask".into(),
741 send_mode: SendMode::Ask,
742 headers: WireHeaders::new(),
743 body: vec![],
744 request_id: Some(Uuid::new_v4()),
745 version: None,
746 };
747 assert!(envelope.request_id.is_some());
748 assert_eq!(envelope.send_mode, SendMode::Ask);
749 }
750
751 #[test]
752 fn test_header_registry_roundtrip() {
753 use crate::message::HeaderValue;
754 use std::any::Any;
755
756 #[derive(Debug, Clone)]
757 struct TraceId(String);
758 impl HeaderValue for TraceId {
759 fn header_name(&self) -> &'static str {
760 "trace-id"
761 }
762 fn to_bytes(&self) -> Option<Vec<u8>> {
763 Some(self.0.as_bytes().to_vec())
764 }
765 fn as_any(&self) -> &dyn Any {
766 self
767 }
768 }
769
770 let mut registry = HeaderRegistry::new();
772 registry.register("trace-id", |bytes: &[u8]| {
773 let s = String::from_utf8(bytes.to_vec()).ok()?;
774 Some(Box::new(TraceId(s)) as Box<dyn HeaderValue>)
775 });
776
777 assert_eq!(registry.len(), 1);
778 assert!(!registry.is_empty());
779
780 let mut headers = crate::message::Headers::new();
782 headers.insert(TraceId("abc-123".into()));
783 let wire = headers.to_wire();
784
785 assert_eq!(wire.len(), 1);
786 assert_eq!(wire.get("trace-id").unwrap(), b"abc-123");
787
788 let restored = wire.to_headers(®istry);
790 let trace = restored.get::<TraceId>().unwrap();
791 assert_eq!(trace.0, "abc-123");
792 }
793
794 #[test]
795 fn test_header_registry_missing_deserializer() {
796 let registry = HeaderRegistry::new();
797 assert!(registry.deserialize("unknown", &[]).is_none());
798 }
799
800 #[test]
801 fn test_headers_to_wire_skips_local_only() {
802 use crate::message::HeaderValue;
803 use std::any::Any;
804
805 #[derive(Debug)]
806 struct LocalOnlyHeader;
807 impl HeaderValue for LocalOnlyHeader {
808 fn header_name(&self) -> &'static str {
809 "local-only"
810 }
811 fn to_bytes(&self) -> Option<Vec<u8>> {
812 None
813 }
814 fn as_any(&self) -> &dyn Any {
815 self
816 }
817 }
818
819 let mut headers = crate::message::Headers::new();
820 headers.insert(LocalOnlyHeader);
821 let wire = headers.to_wire();
822 assert!(wire.is_empty());
823 }
824
825 #[test]
826 fn test_receive_envelope_body() {
827 let mut registry = crate::type_registry::TypeRegistry::new();
828 registry.register("test::Amount", |bytes: &[u8]| {
829 if bytes.len() != 8 {
830 return Err(SerializationError::new("expected 8 bytes"));
831 }
832 let val = u64::from_be_bytes(bytes.try_into().unwrap());
833 Ok(Box::new(val))
834 });
835
836 let envelope = WireEnvelope {
837 target: ActorId {
838 node: NodeId("n".into()),
839 local: 1,
840 },
841 target_name: "test".into(),
842 message_type: "test::Amount".into(),
843 send_mode: SendMode::Tell,
844 headers: WireHeaders::new(),
845 body: 42u64.to_be_bytes().to_vec(),
846 request_id: None,
847 version: None,
848 };
849
850 let any = receive_envelope_body(&envelope, ®istry).unwrap();
851 let val = any.downcast::<u64>().unwrap();
852 assert_eq!(*val, 42);
853 }
854
855 #[test]
856 fn test_receive_envelope_body_unknown_type() {
857 let registry = crate::type_registry::TypeRegistry::new();
858 let envelope = WireEnvelope {
859 target: ActorId {
860 node: NodeId("n".into()),
861 local: 1,
862 },
863 target_name: "test".into(),
864 message_type: "unknown::Type".into(),
865 send_mode: SendMode::Tell,
866 headers: WireHeaders::new(),
867 body: vec![],
868 request_id: None,
869 version: None,
870 };
871
872 let result = receive_envelope_body(&envelope, ®istry);
873 assert!(result.is_err());
874 assert!(result.unwrap_err().message.contains("no deserializer"));
875 }
876
877 #[test]
878 fn test_version_mismatch_with_handler() {
879 let mut registry = crate::type_registry::TypeRegistry::new();
880 registry.register("test::Versioned", |bytes: &[u8]| {
882 if bytes.len() != 8 {
883 return Err(SerializationError::new("expected 8 bytes"));
884 }
885 let val = u64::from_be_bytes(bytes.try_into().unwrap());
886 Ok(Box::new(val))
887 });
888
889 struct DoubleMigrator;
891 impl MessageVersionHandler for DoubleMigrator {
892 fn message_type(&self) -> &'static str {
893 "test::Versioned"
894 }
895 fn migrate(&self, payload: &[u8], _from: u32, _to: u32) -> Option<Vec<u8>> {
896 if payload.len() != 8 {
897 return None;
898 }
899 let val = u64::from_be_bytes(payload.try_into().unwrap());
900 Some((val * 2).to_be_bytes().to_vec())
901 }
902 }
903
904 let mut version_handlers: std::collections::HashMap<
905 String,
906 Box<dyn MessageVersionHandler>,
907 > = std::collections::HashMap::new();
908 version_handlers.insert("test::Versioned".into(), Box::new(DoubleMigrator));
909
910 let envelope = WireEnvelope {
911 target: ActorId {
912 node: NodeId("n".into()),
913 local: 1,
914 },
915 target_name: "test".into(),
916 message_type: "test::Versioned".into(),
917 send_mode: SendMode::Tell,
918 headers: WireHeaders::new(),
919 body: 21u64.to_be_bytes().to_vec(),
920 request_id: None,
921 version: Some(1), };
923
924 let any = receive_envelope_body_versioned(
925 &envelope,
926 ®istry,
927 &version_handlers,
928 Some(2), )
930 .unwrap();
931 let val = any.downcast::<u64>().unwrap();
932 assert_eq!(*val, 42); }
934
935 #[test]
936 fn test_version_match_skips_migration() {
937 let mut registry = crate::type_registry::TypeRegistry::new();
939 registry.register("test::Same", |bytes: &[u8]| Ok(Box::new(bytes.to_vec())));
940
941 struct PanicMigrator;
942 impl MessageVersionHandler for PanicMigrator {
943 fn message_type(&self) -> &'static str {
944 "test::Same"
945 }
946 fn migrate(&self, _payload: &[u8], _from: u32, _to: u32) -> Option<Vec<u8>> {
947 panic!("migrate should not be called when versions match");
948 }
949 }
950
951 let mut version_handlers: std::collections::HashMap<
952 String,
953 Box<dyn MessageVersionHandler>,
954 > = std::collections::HashMap::new();
955 version_handlers.insert("test::Same".into(), Box::new(PanicMigrator));
956
957 let envelope = WireEnvelope {
958 target: ActorId {
959 node: NodeId("n".into()),
960 local: 1,
961 },
962 target_name: "test".into(),
963 message_type: "test::Same".into(),
964 send_mode: SendMode::Tell,
965 headers: WireHeaders::new(),
966 body: vec![1, 2, 3],
967 request_id: None,
968 version: Some(2),
969 };
970
971 let any = receive_envelope_body_versioned(
972 &envelope,
973 ®istry,
974 &version_handlers,
975 Some(2), )
977 .unwrap();
978 let val = any.downcast::<Vec<u8>>().unwrap();
979 assert_eq!(*val, vec![1, 2, 3]);
980 }
981
982 #[test]
985 fn test_version_mismatch_no_handler_falls_through() {
986 let mut registry = crate::type_registry::TypeRegistry::new();
989 registry.register("test::NoHandler", |bytes: &[u8]| Ok(Box::new(bytes.to_vec())));
990
991 let version_handlers: std::collections::HashMap<String, Box<dyn MessageVersionHandler>> =
992 std::collections::HashMap::new();
993
994 let envelope = WireEnvelope {
995 target: ActorId {
996 node: NodeId("n".into()),
997 local: 1,
998 },
999 target_name: "test".into(),
1000 message_type: "test::NoHandler".into(),
1001 send_mode: SendMode::Tell,
1002 headers: WireHeaders::new(),
1003 body: vec![10, 20],
1004 request_id: None,
1005 version: Some(1), };
1007
1008 let any = receive_envelope_body_versioned(
1010 &envelope,
1011 ®istry,
1012 &version_handlers,
1013 Some(2),
1014 )
1015 .unwrap();
1016 let val = any.downcast::<Vec<u8>>().unwrap();
1017 assert_eq!(*val, vec![10, 20]);
1018 }
1019
1020 #[test]
1021 fn test_version_mismatch_handler_returns_none_rejects() {
1022 let mut registry = crate::type_registry::TypeRegistry::new();
1025 registry.register("test::FailMigrate", |bytes: &[u8]| Ok(Box::new(bytes.to_vec())));
1026
1027 struct RejectingMigrator;
1028 impl MessageVersionHandler for RejectingMigrator {
1029 fn message_type(&self) -> &'static str {
1030 "test::FailMigrate"
1031 }
1032 fn migrate(&self, _payload: &[u8], _from: u32, _to: u32) -> Option<Vec<u8>> {
1033 None }
1035 }
1036
1037 let mut version_handlers: std::collections::HashMap<
1038 String,
1039 Box<dyn MessageVersionHandler>,
1040 > = std::collections::HashMap::new();
1041 version_handlers.insert("test::FailMigrate".into(), Box::new(RejectingMigrator));
1042
1043 let envelope = WireEnvelope {
1044 target: ActorId {
1045 node: NodeId("n".into()),
1046 local: 1,
1047 },
1048 target_name: "test".into(),
1049 message_type: "test::FailMigrate".into(),
1050 send_mode: SendMode::Tell,
1051 headers: WireHeaders::new(),
1052 body: vec![1, 2, 3],
1053 request_id: None,
1054 version: Some(1), };
1056
1057 let result = receive_envelope_body_versioned(
1058 &envelope,
1059 ®istry,
1060 &version_handlers,
1061 Some(2), );
1063 assert!(result.is_err());
1064 let err = result.unwrap_err();
1065 assert!(
1066 err.message.contains("cannot migrate from v1 to v2"),
1067 "expected migration rejection, got: {}",
1068 err.message
1069 );
1070 }
1071
1072 #[test]
1073 fn test_version_none_on_sender_skips_migration() {
1074 let mut registry = crate::type_registry::TypeRegistry::new();
1077 registry.register("test::OptionalVersion", |bytes: &[u8]| Ok(Box::new(bytes.to_vec())));
1078
1079 struct PanicMigrator;
1081 impl MessageVersionHandler for PanicMigrator {
1082 fn message_type(&self) -> &'static str {
1083 "test::OptionalVersion"
1084 }
1085 fn migrate(&self, _payload: &[u8], _from: u32, _to: u32) -> Option<Vec<u8>> {
1086 panic!("migrate should not be called when sender has no version");
1087 }
1088 }
1089
1090 let mut version_handlers: std::collections::HashMap<
1091 String,
1092 Box<dyn MessageVersionHandler>,
1093 > = std::collections::HashMap::new();
1094 version_handlers.insert("test::OptionalVersion".into(), Box::new(PanicMigrator));
1095
1096 let envelope = WireEnvelope {
1097 target: ActorId {
1098 node: NodeId("n".into()),
1099 local: 1,
1100 },
1101 target_name: "test".into(),
1102 message_type: "test::OptionalVersion".into(),
1103 send_mode: SendMode::Tell,
1104 headers: WireHeaders::new(),
1105 body: vec![7, 8, 9],
1106 request_id: None,
1107 version: None, };
1109
1110 let any = receive_envelope_body_versioned(
1111 &envelope,
1112 ®istry,
1113 &version_handlers,
1114 Some(2), )
1116 .unwrap();
1117 let val = any.downcast::<Vec<u8>>().unwrap();
1118 assert_eq!(*val, vec![7, 8, 9]);
1119 }
1120
1121 #[test]
1122 fn test_version_none_on_both_sides_skips_migration() {
1123 let mut registry = crate::type_registry::TypeRegistry::new();
1125 registry.register("test::NoVersion", |bytes: &[u8]| Ok(Box::new(bytes.to_vec())));
1126
1127 let version_handlers: std::collections::HashMap<String, Box<dyn MessageVersionHandler>> =
1128 std::collections::HashMap::new();
1129
1130 let envelope = WireEnvelope {
1131 target: ActorId {
1132 node: NodeId("n".into()),
1133 local: 1,
1134 },
1135 target_name: "test".into(),
1136 message_type: "test::NoVersion".into(),
1137 send_mode: SendMode::Tell,
1138 headers: WireHeaders::new(),
1139 body: vec![4, 5, 6],
1140 request_id: None,
1141 version: None,
1142 };
1143
1144 let any = receive_envelope_body_versioned(
1145 &envelope,
1146 ®istry,
1147 &version_handlers,
1148 None, )
1150 .unwrap();
1151 let val = any.downcast::<Vec<u8>>().unwrap();
1152 assert_eq!(*val, vec![4, 5, 6]);
1153 }
1154
1155 #[test]
1156 fn test_version_none_on_receiver_skips_migration() {
1157 let mut registry = crate::type_registry::TypeRegistry::new();
1160 registry.register("test::ReceiverNone", |bytes: &[u8]| Ok(Box::new(bytes.to_vec())));
1161
1162 struct PanicMigrator;
1163 impl MessageVersionHandler for PanicMigrator {
1164 fn message_type(&self) -> &'static str {
1165 "test::ReceiverNone"
1166 }
1167 fn migrate(&self, _payload: &[u8], _from: u32, _to: u32) -> Option<Vec<u8>> {
1168 panic!("migrate should not be called when receiver has no version expectation");
1169 }
1170 }
1171
1172 let mut version_handlers: std::collections::HashMap<
1173 String,
1174 Box<dyn MessageVersionHandler>,
1175 > = std::collections::HashMap::new();
1176 version_handlers.insert("test::ReceiverNone".into(), Box::new(PanicMigrator));
1177
1178 let envelope = WireEnvelope {
1179 target: ActorId {
1180 node: NodeId("n".into()),
1181 local: 1,
1182 },
1183 target_name: "test".into(),
1184 message_type: "test::ReceiverNone".into(),
1185 send_mode: SendMode::Tell,
1186 headers: WireHeaders::new(),
1187 body: vec![11, 22, 33],
1188 request_id: None,
1189 version: Some(3), };
1191
1192 let any = receive_envelope_body_versioned(
1193 &envelope,
1194 ®istry,
1195 &version_handlers,
1196 None, )
1198 .unwrap();
1199 let val = any.downcast::<Vec<u8>>().unwrap();
1200 assert_eq!(*val, vec![11, 22, 33]);
1201 }
1202
1203 #[test]
1204 fn test_version_backward_migration_v2_to_v1() {
1205 let mut registry = crate::type_registry::TypeRegistry::new();
1207 registry.register("test::Backward", |bytes: &[u8]| {
1208 if bytes.len() != 8 {
1209 return Err(SerializationError::new("expected 8 bytes"));
1210 }
1211 let val = u64::from_be_bytes(bytes.try_into().unwrap());
1212 Ok(Box::new(val))
1213 });
1214
1215 struct HalveMigrator;
1216 impl MessageVersionHandler for HalveMigrator {
1217 fn message_type(&self) -> &'static str {
1218 "test::Backward"
1219 }
1220 fn migrate(&self, payload: &[u8], from: u32, to: u32) -> Option<Vec<u8>> {
1221 if from > to {
1222 let val = u64::from_be_bytes(payload.try_into().ok()?);
1224 Some((val / 2).to_be_bytes().to_vec())
1225 } else {
1226 None
1227 }
1228 }
1229 }
1230
1231 let mut version_handlers: std::collections::HashMap<
1232 String,
1233 Box<dyn MessageVersionHandler>,
1234 > = std::collections::HashMap::new();
1235 version_handlers.insert("test::Backward".into(), Box::new(HalveMigrator));
1236
1237 let envelope = WireEnvelope {
1238 target: ActorId {
1239 node: NodeId("n".into()),
1240 local: 1,
1241 },
1242 target_name: "test".into(),
1243 message_type: "test::Backward".into(),
1244 send_mode: SendMode::Tell,
1245 headers: WireHeaders::new(),
1246 body: 100u64.to_be_bytes().to_vec(),
1247 request_id: None,
1248 version: Some(2), };
1250
1251 let any = receive_envelope_body_versioned(
1252 &envelope,
1253 ®istry,
1254 &version_handlers,
1255 Some(1), )
1257 .unwrap();
1258 let val = any.downcast::<u64>().unwrap();
1259 assert_eq!(*val, 50); }
1261
1262 #[cfg(feature = "serde")]
1263 mod serde_tests {
1264 use super::*;
1265
1266 #[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
1267 struct Increment {
1268 amount: u64,
1269 }
1270
1271 #[test]
1272 fn json_serializer_roundtrip() {
1273 let msg = Increment { amount: 42 };
1274 let bytes = JsonSerializer::serialize_typed(&msg).unwrap();
1275 let deserialized: Increment = JsonSerializer::deserialize_typed(&bytes).unwrap();
1276 assert_eq!(deserialized, msg);
1277 }
1278
1279 #[test]
1280 fn json_serializer_invalid_bytes() {
1281 let result = JsonSerializer::deserialize_typed::<Increment>(b"not json");
1282 assert!(result.is_err());
1283 assert!(result.unwrap_err().message.contains("json deserialize"));
1284 }
1285
1286 #[test]
1287 fn build_tell_envelope_roundtrip() {
1288 let target = ActorId {
1289 node: NodeId("node-2".into()),
1290 local: 7,
1291 };
1292 let msg = Increment { amount: 100 };
1293 let envelope =
1294 build_tell_envelope(target.clone(), "counter", &msg, WireHeaders::new()).unwrap();
1295
1296 assert_eq!(envelope.target, target);
1297 assert_eq!(envelope.send_mode, SendMode::Tell);
1298 assert!(envelope.request_id.is_none());
1299 assert!(envelope.message_type.contains("Increment"));
1300
1301 let deserialized: Increment =
1303 JsonSerializer::deserialize_typed(&envelope.body).unwrap();
1304 assert_eq!(deserialized.amount, 100);
1305 }
1306
1307 #[test]
1308 fn build_ask_envelope_roundtrip() {
1309 let target = ActorId {
1310 node: NodeId("node-3".into()),
1311 local: 42,
1312 };
1313 let msg = Increment { amount: 5 };
1314 let request_id = Uuid::new_v4();
1315 let envelope = build_ask_envelope(
1316 target.clone(),
1317 "counter",
1318 &msg,
1319 WireHeaders::new(),
1320 request_id,
1321 )
1322 .unwrap();
1323
1324 assert_eq!(envelope.target, target);
1325 assert_eq!(envelope.send_mode, SendMode::Ask);
1326 assert_eq!(envelope.request_id, Some(request_id));
1327 }
1328
1329 #[test]
1330 fn full_pipeline_send_and_receive() {
1331 let target = ActorId {
1333 node: NodeId("node-2".into()),
1334 local: 1,
1335 };
1336 let msg = Increment { amount: 77 };
1337 let envelope =
1338 build_tell_envelope(target, "counter", &msg, WireHeaders::new()).unwrap();
1339
1340 let mut registry = crate::type_registry::TypeRegistry::new();
1342 registry.register_type::<Increment>();
1343
1344 let any = receive_envelope_body(&envelope, ®istry).unwrap();
1346 let received = any.downcast::<Increment>().unwrap();
1347 assert_eq!(received.amount, 77);
1348 }
1349
1350 #[test]
1351 fn full_pipeline_with_headers() {
1352 use crate::message::HeaderValue;
1353 use std::any::Any;
1354
1355 #[derive(Debug, Clone)]
1356 struct Priority(u8);
1357 impl HeaderValue for Priority {
1358 fn header_name(&self) -> &'static str {
1359 "priority"
1360 }
1361 fn to_bytes(&self) -> Option<Vec<u8>> {
1362 Some(vec![self.0])
1363 }
1364 fn as_any(&self) -> &dyn Any {
1365 self
1366 }
1367 }
1368
1369 let mut headers = crate::message::Headers::new();
1371 headers.insert(Priority(5));
1372 let wire_headers = headers.to_wire();
1373
1374 let target = ActorId {
1375 node: NodeId("node-2".into()),
1376 local: 1,
1377 };
1378 let msg = Increment { amount: 10 };
1379 let envelope = build_tell_envelope(target, "counter", &msg, wire_headers).unwrap();
1380
1381 let mut header_registry = HeaderRegistry::new();
1383 header_registry.register("priority", |bytes: &[u8]| {
1384 if bytes.len() != 1 {
1385 return None;
1386 }
1387 Some(Box::new(Priority(bytes[0])) as Box<dyn HeaderValue>)
1388 });
1389
1390 let restored_headers = envelope.headers.to_headers(&header_registry);
1391 let priority = restored_headers.get::<Priority>().unwrap();
1392 assert_eq!(priority.0, 5);
1393 }
1394 }
1395}