1use std::collections::{HashMap, HashSet};
22
23use crate::node::{ActorId, NodeId};
24use crate::remote::SerializationError;
25
/// Wire message-type tag for spawn requests (see [`SpawnRequest`]).
///
/// The `SYSTEM_MSG_TYPE_*` strings are wire protocol values and must not change.
pub const SYSTEM_MSG_TYPE_SPAWN: &str = "dactor::system_actors::SpawnRequest";

/// Wire message-type tag for watch requests (see [`WatchRequest`]).
pub const SYSTEM_MSG_TYPE_WATCH: &str = "dactor::system_actors::WatchRequest";

/// Wire message-type tag for unwatch requests (see [`UnwatchRequest`]).
pub const SYSTEM_MSG_TYPE_UNWATCH: &str = "dactor::system_actors::UnwatchRequest";

/// Wire message-type tag for cancel requests (see [`CancelRequest`]).
pub const SYSTEM_MSG_TYPE_CANCEL: &str = "dactor::system_actors::CancelRequest";

/// Wire message-type tag for peer-connect messages.
pub const SYSTEM_MSG_TYPE_CONNECT_PEER: &str = "dactor::system_actors::ConnectPeer";

/// Wire message-type tag for peer-disconnect messages.
pub const SYSTEM_MSG_TYPE_DISCONNECT_PEER: &str = "dactor::system_actors::DisconnectPeer";

/// Returns `true` when `message_type` is exactly one of the `SYSTEM_MSG_TYPE_*` tags.
///
/// The comparison is an exact, case-sensitive string match; every other value
/// (including the empty string) yields `false`.
pub fn is_system_message_type(message_type: &str) -> bool {
    // All tags recognised as system messages, in declaration order.
    const SYSTEM_TAGS: [&str; 6] = [
        SYSTEM_MSG_TYPE_SPAWN,
        SYSTEM_MSG_TYPE_WATCH,
        SYSTEM_MSG_TYPE_UNWATCH,
        SYSTEM_MSG_TYPE_CANCEL,
        SYSTEM_MSG_TYPE_CONNECT_PEER,
        SYSTEM_MSG_TYPE_DISCONNECT_PEER,
    ];

    SYSTEM_TAGS.iter().any(|&tag| tag == message_type)
}
83
/// Request to create an actor from a type registered in the type registry.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct SpawnRequest {
    /// Registry key; passed to the type registry by [`SpawnManager::create_actor`].
    pub type_name: String,
    /// Serialized construction arguments; passed to the type registry alongside `type_name`.
    pub args_bytes: Vec<u8>,
    /// Name for the actor.
    // NOTE(review): not read anywhere in this file — confirm how callers use it.
    pub name: String,
    /// Identifier of this request.
    // NOTE(review): presumably echoed back in `SpawnResponse::request_id` — confirm.
    pub request_id: String,
}
101
102#[derive(Debug, Clone)]
104#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
105pub enum SpawnResponse {
106 Success {
108 request_id: String,
110 actor_id: ActorId,
112 },
113 Failure {
115 request_id: String,
117 error: String,
119 },
120}
121
122pub struct SpawnManager {
129 type_registry: crate::type_registry::TypeRegistry,
131 spawned: Vec<ActorId>,
133}
134
135impl SpawnManager {
136 pub fn new(type_registry: crate::type_registry::TypeRegistry) -> Self {
138 Self {
139 type_registry,
140 spawned: Vec::new(),
141 }
142 }
143
144 pub fn create_actor(
151 &self,
152 request: &SpawnRequest,
153 ) -> Result<Box<dyn std::any::Any + Send>, SerializationError> {
154 self.type_registry
155 .create_actor(&request.type_name, &request.args_bytes)
156 }
157
158 pub fn record_spawn(&mut self, id: ActorId) {
160 self.spawned.push(id);
161 }
162
163 pub fn spawned_actors(&self) -> &[ActorId] {
165 &self.spawned
166 }
167
168 pub fn type_registry(&self) -> &crate::type_registry::TypeRegistry {
170 &self.type_registry
171 }
172
173 pub fn type_registry_mut(&mut self) -> &mut crate::type_registry::TypeRegistry {
175 &mut self.type_registry
176 }
177}
178
179#[derive(Debug, Clone)]
185#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
186pub struct WatchRequest {
187 pub target: ActorId,
189 pub watcher: ActorId,
191}
192
193#[derive(Debug, Clone)]
195#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
196pub struct UnwatchRequest {
197 pub target: ActorId,
199 pub watcher: ActorId,
201}
202
203#[derive(Debug, Clone)]
205#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
206pub struct WatchNotification {
207 pub terminated: ActorId,
209 pub watcher: ActorId,
211}
212
213pub struct WatchManager {
219 watchers: HashMap<ActorId, HashSet<ActorId>>,
221}
222
223impl WatchManager {
224 pub fn new() -> Self {
226 Self {
227 watchers: HashMap::new(),
228 }
229 }
230
231 pub fn watch(&mut self, target: ActorId, watcher: ActorId) {
233 self.watchers.entry(target).or_default().insert(watcher);
234 }
235
236 pub fn unwatch(&mut self, target: &ActorId, watcher: &ActorId) {
238 if let Some(set) = self.watchers.get_mut(target) {
239 set.remove(watcher);
240 if set.is_empty() {
241 self.watchers.remove(target);
242 }
243 }
244 }
245
246 pub fn on_terminated(&mut self, terminated: &ActorId) -> Vec<WatchNotification> {
249 self.watchers
250 .remove(terminated)
251 .unwrap_or_default()
252 .into_iter()
253 .map(|watcher| WatchNotification {
254 terminated: terminated.clone(),
255 watcher,
256 })
257 .collect()
258 }
259
260 pub fn watchers_of(&self, target: &ActorId) -> Vec<ActorId> {
262 self.watchers
263 .get(target)
264 .map(|s| s.iter().cloned().collect())
265 .unwrap_or_default()
266 }
267
268 pub fn watched_count(&self) -> usize {
270 self.watchers.len()
271 }
272}
273
274impl Default for WatchManager {
275 fn default() -> Self {
276 Self::new()
277 }
278}
279
280#[derive(Debug, Clone)]
286#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
287pub struct CancelRequest {
288 pub target: ActorId,
290 pub request_id: Option<String>,
292}
293
/// Result of a cancellation attempt (see [`CancelManager::cancel`]).
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum CancelResponse {
    /// A matching token was found and cancelled.
    Acknowledged,
    /// No matching request was registered.
    NotFound {
        /// Human-readable explanation.
        reason: String,
    },
}
306
307pub struct CancelManager {
312 tokens: HashMap<String, tokio_util::sync::CancellationToken>,
314}
315
316impl CancelManager {
317 pub fn new() -> Self {
319 Self {
320 tokens: HashMap::new(),
321 }
322 }
323
324 pub fn register(&mut self, request_id: String, token: tokio_util::sync::CancellationToken) {
326 self.tokens.insert(request_id, token);
327 }
328
329 pub fn cancel(&mut self, request_id: &str) -> CancelResponse {
331 if let Some(token) = self.tokens.remove(request_id) {
332 token.cancel();
333 CancelResponse::Acknowledged
334 } else {
335 CancelResponse::NotFound {
336 reason: format!("no active request with id '{request_id}'"),
337 }
338 }
339 }
340
341 pub fn remove(&mut self, request_id: &str) {
343 self.tokens.remove(request_id);
344 }
345
346 pub fn active_count(&self) -> usize {
348 self.tokens.len()
349 }
350}
351
352impl Default for CancelManager {
353 fn default() -> Self {
354 Self::new()
355 }
356}
357
/// Connection state recorded for a peer in [`NodeDirectory`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum PeerStatus {
    /// Initial state assigned by [`NodeDirectory::add_peer`].
    Connecting,
    /// The only state counted by [`NodeDirectory::connected_count`] and
    /// accepted by [`NodeDirectory::is_connected`].
    Connected,
    /// The peer is marked unreachable.
    Unreachable,
    /// The peer is marked disconnected.
    Disconnected,
}
375
376#[derive(Debug, Clone)]
378pub struct PeerInfo {
379 pub node_id: NodeId,
381 pub status: PeerStatus,
383 pub address: Option<String>,
385}
386
387pub struct NodeDirectory {
393 peers: HashMap<NodeId, PeerInfo>,
394}
395
396impl NodeDirectory {
397 pub fn new() -> Self {
399 Self {
400 peers: HashMap::new(),
401 }
402 }
403
404 pub fn add_peer(&mut self, node_id: NodeId, address: Option<String>) {
406 self.peers.insert(
407 node_id.clone(),
408 PeerInfo {
409 node_id,
410 status: PeerStatus::Connecting,
411 address,
412 },
413 );
414 }
415
416 pub fn set_status(&mut self, node_id: &NodeId, status: PeerStatus) {
418 if let Some(info) = self.peers.get_mut(node_id) {
419 info.status = status;
420 }
421 }
422
423 pub fn remove_peer(&mut self, node_id: &NodeId) {
425 self.peers.remove(node_id);
426 }
427
428 pub fn get_peer(&self, node_id: &NodeId) -> Option<&PeerInfo> {
430 self.peers.get(node_id)
431 }
432
433 pub fn peer_nodes(&self) -> Vec<NodeId> {
435 self.peers.keys().cloned().collect()
436 }
437
438 pub fn peers_with_status(&self, status: PeerStatus) -> Vec<&PeerInfo> {
440 self.peers.values().filter(|p| p.status == status).collect()
441 }
442
443 pub fn peer_count(&self) -> usize {
445 self.peers.len()
446 }
447
448 pub fn connected_count(&self) -> usize {
450 self.peers
451 .values()
452 .filter(|p| p.status == PeerStatus::Connected)
453 .count()
454 }
455
456 pub fn is_connected(&self, node_id: &NodeId) -> bool {
458 self.peers
459 .get(node_id)
460 .map(|p| p.status == PeerStatus::Connected)
461 .unwrap_or(false)
462 }
463}
464
465impl Default for NodeDirectory {
466 fn default() -> Self {
467 Self::new()
468 }
469}
470
471#[derive(Debug, Clone, PartialEq, Eq)]
481pub struct HandshakeRequest {
482 pub node_id: NodeId,
484 pub wire_version: crate::version::WireVersion,
486 pub app_version: Option<String>,
489 pub adapter: String,
491}
492
493impl HandshakeRequest {
494 pub fn from_runtime(
501 node_id: NodeId,
502 app_version: Option<String>,
503 adapter: impl Into<String>,
504 ) -> Self {
505 Self {
506 node_id,
507 wire_version: crate::version::WireVersion::parse(
508 crate::version::DACTOR_WIRE_VERSION,
509 )
510 .expect("DACTOR_WIRE_VERSION must be valid"),
511 app_version,
512 adapter: adapter.into(),
513 }
514 }
515}
516
517#[derive(Debug, Clone, PartialEq, Eq)]
522pub enum HandshakeResponse {
523 Accepted {
525 node_id: NodeId,
527 wire_version: crate::version::WireVersion,
529 app_version: Option<String>,
531 adapter: String,
533 },
534 Rejected {
536 node_id: NodeId,
538 wire_version: crate::version::WireVersion,
540 reason: RejectionReason,
542 detail: String,
544 },
545}
546
/// Category of a handshake rejection.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RejectionReason {
    /// The wire protocol versions are not compatible.
    IncompatibleProtocol,
    /// The adapter names differ.
    IncompatibleAdapter,
}

impl std::fmt::Display for RejectionReason {
    /// Writes a short lowercase description of the reason.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match *self {
            RejectionReason::IncompatibleProtocol => "incompatible wire protocol",
            RejectionReason::IncompatibleAdapter => "incompatible adapter",
        };
        // `write_str` emits the text verbatim, ignoring width/padding flags.
        f.write_str(text)
    }
}
566
567pub fn validate_handshake(
575 local: &HandshakeRequest,
576 remote: &HandshakeRequest,
577) -> HandshakeResponse {
578 if !local.wire_version.is_compatible(&remote.wire_version) {
580 return HandshakeResponse::Rejected {
581 node_id: local.node_id.clone(),
582 wire_version: local.wire_version,
583 reason: RejectionReason::IncompatibleProtocol,
584 detail: format!(
585 "wire version {remote} is incompatible with local {local} \
586 (different MAJOR version)",
587 remote = remote.wire_version,
588 local = local.wire_version,
589 ),
590 };
591 }
592
593 if local.adapter != remote.adapter {
595 return HandshakeResponse::Rejected {
596 node_id: local.node_id.clone(),
597 wire_version: local.wire_version,
598 reason: RejectionReason::IncompatibleAdapter,
599 detail: format!(
600 "adapter \"{remote}\" does not match local \"{local}\"",
601 remote = remote.adapter,
602 local = local.adapter,
603 ),
604 };
605 }
606
607 HandshakeResponse::Accepted {
609 node_id: local.node_id.clone(),
610 wire_version: local.wire_version,
611 app_version: local.app_version.clone(),
612 adapter: local.adapter.clone(),
613 }
614}
615
616pub fn verify_peer_identity(
626 expected: &NodeId,
627 response: &HandshakeResponse,
628) -> Result<(), String> {
629 match response {
630 HandshakeResponse::Accepted { node_id, .. } => {
631 if node_id != expected {
632 Err(format!(
633 "peer identity mismatch: expected {expected}, got {node_id}"
634 ))
635 } else {
636 Ok(())
637 }
638 }
639 HandshakeResponse::Rejected { .. } => {
640 Ok(())
642 }
643 }
644}
645
646#[derive(Debug, Clone)]
676pub struct SystemActorConfig {
677 pub spawn_manager_mailbox: crate::mailbox::MailboxConfig,
679
680 pub spawn_manager_pool_size: Option<usize>,
691
692 pub control_plane_mailbox: crate::mailbox::MailboxConfig,
705}
706
707impl Default for SystemActorConfig {
708 fn default() -> Self {
709 Self {
710 spawn_manager_mailbox: crate::mailbox::MailboxConfig::Unbounded,
711 spawn_manager_pool_size: None,
712 control_plane_mailbox: crate::mailbox::MailboxConfig::Unbounded,
713 }
714 }
715}
716
717impl SystemActorConfig {
718 pub fn with_spawn_manager_mailbox(mut self, mailbox: crate::mailbox::MailboxConfig) -> Self {
720 self.spawn_manager_mailbox = mailbox;
721 self
722 }
723
724 pub fn with_spawn_manager_pool_size(mut self, size: usize) -> Self {
728 self.spawn_manager_pool_size = Some(size);
729 self
730 }
731
732 pub fn with_control_plane_mailbox(mut self, mailbox: crate::mailbox::MailboxConfig) -> Self {
735 self.control_plane_mailbox = mailbox;
736 self
737 }
738}
739
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `NodeId` from a string.
    fn node_named(name: &str) -> NodeId {
        NodeId(name.into())
    }

    /// Parses a wire version, panicking on invalid test input.
    fn parse_wire(text: &str) -> crate::version::WireVersion {
        crate::version::WireVersion::parse(text).unwrap()
    }

    // ---- SpawnManager ----

    #[test]
    fn spawn_manager_create_actor() {
        let mut registry = crate::type_registry::TypeRegistry::new();
        registry.register_factory("test::Worker", |bytes: &[u8]| {
            if bytes.len() == 8 {
                let value = u64::from_be_bytes(bytes.try_into().unwrap());
                Ok(Box::new(value))
            } else {
                Err(SerializationError::new("expected 8 bytes"))
            }
        });
        let manager = SpawnManager::new(registry);

        let request = SpawnRequest {
            type_name: "test::Worker".into(),
            args_bytes: 42u64.to_be_bytes().to_vec(),
            name: "worker-1".into(),
            request_id: "req-1".into(),
        };

        let boxed = manager.create_actor(&request).unwrap();
        let value = boxed.downcast::<u64>().unwrap();
        assert_eq!(*value, 42);
    }

    #[test]
    fn spawn_manager_unknown_type() {
        let manager = SpawnManager::new(crate::type_registry::TypeRegistry::new());
        let request = SpawnRequest {
            type_name: "unknown::Type".into(),
            args_bytes: vec![],
            name: "x".into(),
            request_id: "req-2".into(),
        };

        assert!(manager.create_actor(&request).is_err());
    }

    #[test]
    fn spawn_manager_records_spawned() {
        let mut manager = SpawnManager::new(crate::type_registry::TypeRegistry::new());
        assert!(manager.spawned_actors().is_empty());

        let id = ActorId {
            node: node_named("n1"),
            local: 1,
        };
        manager.record_spawn(id.clone());

        let recorded = manager.spawned_actors();
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0], id);
    }

    // ---- WatchManager ----

    #[test]
    fn watch_and_terminate() {
        let target = ActorId {
            node: node_named("n1"),
            local: 1,
        };
        let watcher = ActorId {
            node: node_named("n2"),
            local: 10,
        };
        let mut wm = WatchManager::new();

        wm.watch(target.clone(), watcher.clone());
        assert_eq!(wm.watched_count(), 1);
        assert_eq!(wm.watchers_of(&target).len(), 1);

        let notifications = wm.on_terminated(&target);
        assert_eq!(notifications.len(), 1);
        let first = &notifications[0];
        assert_eq!(first.terminated, target);
        assert_eq!(first.watcher, watcher);
        assert_eq!(wm.watched_count(), 0);
    }

    #[test]
    fn watch_multiple_watchers() {
        let target = ActorId {
            node: node_named("n1"),
            local: 1,
        };
        let w1 = ActorId {
            node: node_named("n2"),
            local: 10,
        };
        let w2 = ActorId {
            node: node_named("n3"),
            local: 20,
        };
        let mut wm = WatchManager::new();

        wm.watch(target.clone(), w1.clone());
        wm.watch(target.clone(), w2.clone());
        assert_eq!(wm.watchers_of(&target).len(), 2);

        assert_eq!(wm.on_terminated(&target).len(), 2);
    }

    #[test]
    fn unwatch_removes_subscription() {
        let target = ActorId {
            node: node_named("n1"),
            local: 1,
        };
        let watcher = ActorId {
            node: node_named("n2"),
            local: 10,
        };
        let mut wm = WatchManager::new();

        wm.watch(target.clone(), watcher.clone());
        wm.unwatch(&target, &watcher);
        assert_eq!(wm.watched_count(), 0);

        assert!(wm.on_terminated(&target).is_empty());
    }

    #[test]
    fn terminate_unwatched_actor_returns_empty() {
        let target = ActorId {
            node: node_named("n1"),
            local: 99,
        };
        let mut wm = WatchManager::new();

        assert!(wm.on_terminated(&target).is_empty());
    }

    // ---- CancelManager ----

    #[test]
    fn cancel_registered_request() {
        let token = tokio_util::sync::CancellationToken::new();
        let observer = token.clone();
        let mut cm = CancelManager::new();

        cm.register("req-1".into(), token);
        assert_eq!(cm.active_count(), 1);

        let response = cm.cancel("req-1");
        assert!(matches!(response, CancelResponse::Acknowledged));
        assert!(observer.is_cancelled());
        assert_eq!(cm.active_count(), 0);
    }

    #[test]
    fn cancel_unknown_request_returns_not_found() {
        let mut cm = CancelManager::new();
        assert!(matches!(
            cm.cancel("nonexistent"),
            CancelResponse::NotFound { .. }
        ));
    }

    #[test]
    fn remove_cleans_up_token() {
        let mut cm = CancelManager::new();
        cm.register("req-1".into(), tokio_util::sync::CancellationToken::new());
        assert_eq!(cm.active_count(), 1);

        cm.remove("req-1");
        assert_eq!(cm.active_count(), 0);
    }

    // ---- NodeDirectory ----

    #[test]
    fn add_and_query_peers() {
        let mut dir = NodeDirectory::new();
        dir.add_peer(node_named("n1"), Some("10.0.0.1:4697".into()));
        dir.add_peer(node_named("n2"), None);

        assert_eq!(dir.peer_count(), 2);
        assert!(!dir.is_connected(&node_named("n1")));

        let info = dir.get_peer(&node_named("n1")).unwrap();
        assert_eq!(info.status, PeerStatus::Connecting);
        assert_eq!(info.address.as_deref(), Some("10.0.0.1:4697"));
    }

    #[test]
    fn set_status_and_filter() {
        let mut dir = NodeDirectory::new();
        for name in ["n1", "n2", "n3"].iter() {
            dir.add_peer(node_named(name), None);
        }

        let updates = [
            ("n1", PeerStatus::Connected),
            ("n2", PeerStatus::Connected),
            ("n3", PeerStatus::Unreachable),
        ];
        for &(name, status) in updates.iter() {
            dir.set_status(&node_named(name), status);
        }

        assert_eq!(dir.connected_count(), 2);
        assert!(dir.is_connected(&node_named("n1")));
        assert!(!dir.is_connected(&node_named("n3")));

        assert_eq!(dir.peers_with_status(PeerStatus::Connected).len(), 2);
        assert_eq!(dir.peers_with_status(PeerStatus::Unreachable).len(), 1);
    }

    #[test]
    fn remove_peer() {
        let id = node_named("n1");
        let mut dir = NodeDirectory::new();
        dir.add_peer(id.clone(), None);
        assert_eq!(dir.peer_count(), 1);

        dir.remove_peer(&id);
        assert_eq!(dir.peer_count(), 0);
        assert!(dir.get_peer(&id).is_none());
    }

    #[test]
    fn peer_nodes_list() {
        let mut dir = NodeDirectory::new();
        dir.add_peer(node_named("n1"), None);
        dir.add_peer(node_named("n2"), None);

        assert_eq!(dir.peer_nodes().len(), 2);
    }

    // ---- Message types ----

    #[test]
    fn spawn_response_variants() {
        let success = SpawnResponse::Success {
            request_id: "r1".into(),
            actor_id: ActorId {
                node: node_named("n1"),
                local: 42,
            },
        };
        assert!(matches!(success, SpawnResponse::Success { .. }));

        let failure = SpawnResponse::Failure {
            request_id: "r2".into(),
            error: "type not found".into(),
        };
        assert!(matches!(failure, SpawnResponse::Failure { .. }));
    }

    #[test]
    fn watch_notification_fields() {
        let terminated = ActorId {
            node: node_named("n1"),
            local: 1,
        };
        let watcher = ActorId {
            node: node_named("n2"),
            local: 2,
        };
        let notif = WatchNotification {
            terminated,
            watcher,
        };
        assert_eq!(notif.terminated.local, 1);
        assert_eq!(notif.watcher.local, 2);
    }

    #[test]
    fn peer_status_transitions() {
        let id = node_named("n1");
        let mut dir = NodeDirectory::new();
        dir.add_peer(id.clone(), None);

        // A freshly added peer starts out as Connecting.
        assert_eq!(dir.get_peer(&id).unwrap().status, PeerStatus::Connecting);

        let sequence = [
            PeerStatus::Connected,
            PeerStatus::Unreachable,
            PeerStatus::Disconnected,
        ];
        for &next in sequence.iter() {
            dir.set_status(&id, next);
            assert_eq!(dir.get_peer(&id).unwrap().status, next);
        }
    }

    // ---- Wire protocol constants ----

    #[test]
    fn wire_protocol_constants_are_stable() {
        let pinned = [
            (
                SYSTEM_MSG_TYPE_SPAWN,
                "dactor::system_actors::SpawnRequest",
                "SYSTEM_MSG_TYPE_SPAWN",
            ),
            (
                SYSTEM_MSG_TYPE_WATCH,
                "dactor::system_actors::WatchRequest",
                "SYSTEM_MSG_TYPE_WATCH",
            ),
            (
                SYSTEM_MSG_TYPE_UNWATCH,
                "dactor::system_actors::UnwatchRequest",
                "SYSTEM_MSG_TYPE_UNWATCH",
            ),
            (
                SYSTEM_MSG_TYPE_CANCEL,
                "dactor::system_actors::CancelRequest",
                "SYSTEM_MSG_TYPE_CANCEL",
            ),
            (
                SYSTEM_MSG_TYPE_CONNECT_PEER,
                "dactor::system_actors::ConnectPeer",
                "SYSTEM_MSG_TYPE_CONNECT_PEER",
            ),
            (
                SYSTEM_MSG_TYPE_DISCONNECT_PEER,
                "dactor::system_actors::DisconnectPeer",
                "SYSTEM_MSG_TYPE_DISCONNECT_PEER",
            ),
        ];

        for &(actual, expected, const_name) in pinned.iter() {
            assert_eq!(
                actual, expected,
                "{} is a wire protocol value — do not change",
                const_name
            );
        }
    }

    // ---- Handshake ----

    /// Builds a handshake request from string inputs.
    fn make_handshake_request(
        node_id: &str,
        wire: &str,
        adapter: &str,
        app_version: Option<&str>,
    ) -> HandshakeRequest {
        HandshakeRequest {
            node_id: node_named(node_id),
            wire_version: parse_wire(wire),
            app_version: app_version.map(String::from),
            adapter: adapter.into(),
        }
    }

    #[test]
    fn handshake_same_version_and_adapter_accepted() {
        let local = make_handshake_request("node-a", "0.2.0", "ractor", None);
        let remote = make_handshake_request("node-b", "0.2.0", "ractor", None);
        assert!(matches!(
            validate_handshake(&local, &remote),
            HandshakeResponse::Accepted { .. }
        ));
    }

    #[test]
    fn handshake_same_major_different_minor_accepted() {
        let local = make_handshake_request("node-a", "0.2.0", "ractor", None);
        let remote = make_handshake_request("node-b", "0.3.0", "ractor", None);
        assert!(matches!(
            validate_handshake(&local, &remote),
            HandshakeResponse::Accepted { .. }
        ));
    }

    #[test]
    fn handshake_different_major_rejected() {
        let local = make_handshake_request("node-a", "0.2.0", "ractor", None);
        let remote = make_handshake_request("node-b", "1.0.0", "ractor", None);

        if let HandshakeResponse::Rejected { reason, detail, .. } =
            validate_handshake(&local, &remote)
        {
            assert_eq!(reason, RejectionReason::IncompatibleProtocol);
            assert!(detail.contains("MAJOR"));
        } else {
            panic!("expected Rejected");
        }
    }

    #[test]
    fn handshake_different_adapter_rejected() {
        let local = make_handshake_request("node-a", "0.2.0", "ractor", None);
        let remote = make_handshake_request("node-b", "0.2.0", "kameo", None);

        if let HandshakeResponse::Rejected { reason, detail, .. } =
            validate_handshake(&local, &remote)
        {
            assert_eq!(reason, RejectionReason::IncompatibleAdapter);
            assert!(detail.contains("kameo"));
            assert!(detail.contains("ractor"));
        } else {
            panic!("expected Rejected");
        }
    }

    #[test]
    fn handshake_protocol_checked_before_adapter() {
        // Both checks would fail; the protocol check must win.
        let local = make_handshake_request("node-a", "0.2.0", "ractor", None);
        let remote = make_handshake_request("node-b", "1.0.0", "kameo", None);

        if let HandshakeResponse::Rejected { reason, .. } = validate_handshake(&local, &remote) {
            assert_eq!(reason, RejectionReason::IncompatibleProtocol);
        } else {
            panic!("expected Rejected");
        }
    }

    #[test]
    fn handshake_accepted_carries_local_info() {
        let local = make_handshake_request("node-a", "0.2.0", "ractor", Some("1.0.0"));
        let remote = make_handshake_request("node-b", "0.2.0", "ractor", Some("2.0.0"));

        let response = validate_handshake(&local, &remote);
        if let HandshakeResponse::Accepted {
            node_id,
            wire_version,
            app_version,
            adapter,
        } = response
        {
            assert_eq!(node_id, node_named("node-a"));
            assert_eq!(wire_version.to_string(), "0.2.0");
            assert_eq!(app_version.as_deref(), Some("1.0.0"));
            assert_eq!(adapter, "ractor");
        } else {
            panic!("expected Accepted");
        }
    }

    #[test]
    fn rejection_reason_display() {
        let cases = [
            (
                RejectionReason::IncompatibleProtocol,
                "incompatible wire protocol",
            ),
            (RejectionReason::IncompatibleAdapter, "incompatible adapter"),
        ];
        for &(reason, text) in cases.iter() {
            assert_eq!(reason.to_string(), text);
        }
    }

    // ---- Peer identity verification ----

    #[test]
    fn verify_peer_identity_matching() {
        let resp = HandshakeResponse::Accepted {
            node_id: node_named("node-2"),
            wire_version: parse_wire("0.2.0"),
            app_version: None,
            adapter: "ractor".into(),
        };
        assert!(verify_peer_identity(&node_named("node-2"), &resp).is_ok());
    }

    #[test]
    fn verify_peer_identity_mismatch() {
        let resp = HandshakeResponse::Accepted {
            node_id: node_named("node-X"),
            wire_version: parse_wire("0.2.0"),
            app_version: None,
            adapter: "ractor".into(),
        };
        let result = verify_peer_identity(&node_named("node-2"), &resp);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("mismatch"));
    }

    #[test]
    fn verify_peer_identity_rejected_is_ok() {
        // A rejected response is not treated as an identity failure.
        let resp = HandshakeResponse::Rejected {
            node_id: node_named("node-2"),
            wire_version: parse_wire("1.0.0"),
            reason: RejectionReason::IncompatibleProtocol,
            detail: "test".into(),
        };
        assert!(verify_peer_identity(&node_named("node-2"), &resp).is_ok());
    }
}