Skip to main content

dactor_mock/
cluster.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use dactor::actor::{Actor, Handler};
5use dactor::node::{ActorId, NodeId};
6use dactor::remote::ClusterState;
7use dactor::supervision::ChildTerminated;
8use dactor::test_support::test_runtime::TestActorRef;
9
10use crate::network::MockNetwork;
11use crate::node::MockNode;
12
13/// A simulated multi-node cluster for testing.
14///
15/// Each node has its own runtime and system actors (SpawnManager,
16/// WatchManager, CancelManager, NodeDirectory). Cross-node messaging
17/// goes through the MockNetwork which can simulate failures.
18pub struct MockCluster {
19    nodes: HashMap<NodeId, MockNode>,
20    network: Arc<MockNetwork>,
21}
22
23impl MockCluster {
24    /// Create a cluster with the given node IDs.
25    ///
26    /// All nodes are connected to each other in the NodeDirectory.
27    pub fn new(node_ids: &[&str]) -> Self {
28        let ids: Vec<NodeId> = node_ids.iter().map(|id| NodeId(id.to_string())).collect();
29        let mut nodes = HashMap::new();
30
31        for id in &ids {
32            let mut node = MockNode::new(id.clone());
33            // Connect each node to all other nodes
34            for peer in &ids {
35                if peer != id {
36                    node.connect_peer(peer);
37                }
38            }
39            nodes.insert(id.clone(), node);
40        }
41
42        Self {
43            nodes,
44            network: Arc::new(MockNetwork::new()),
45        }
46    }
47
48    /// Get a reference to a node.
49    pub fn node(&self, id: &str) -> &MockNode {
50        self.nodes
51            .get(&NodeId(id.to_string()))
52            .unwrap_or_else(|| panic!("node '{}' not found in cluster", id))
53    }
54
55    /// Get a mutable reference to a node.
56    pub fn node_mut(&mut self, id: &str) -> &mut MockNode {
57        self.nodes
58            .get_mut(&NodeId(id.to_string()))
59            .unwrap_or_else(|| panic!("node '{}' not found in cluster", id))
60    }
61
62    /// Get the network for fault injection.
63    pub fn network(&self) -> &MockNetwork {
64        &self.network
65    }
66
67    /// Number of nodes in the cluster.
68    pub fn node_count(&self) -> usize {
69        self.nodes.len()
70    }
71
72    /// All node IDs.
73    pub fn node_ids(&self) -> Vec<NodeId> {
74        self.nodes.keys().cloned().collect()
75    }
76
77    /// Remove a node from the cluster.
78    ///
79    /// Updates all surviving nodes' NodeDirectory to mark the crashed
80    /// node as Disconnected.
81    pub fn crash_node(&mut self, id: &str) {
82        let crashed = NodeId(id.to_string());
83        self.nodes.remove(&crashed);
84        // Update surviving nodes
85        for node in self.nodes.values_mut() {
86            node.disconnect_peer(&crashed);
87        }
88    }
89
90    /// Restart a node — creates a fresh node with same ID.
91    /// Reconnects it to all existing peers and updates their directories.
92    pub fn restart_node(&mut self, id: &str) {
93        let node_id = NodeId(id.to_string());
94        let mut new_node = MockNode::new(node_id.clone());
95        // Connect new node to all existing peers
96        for peer_id in self.nodes.keys() {
97            new_node.connect_peer(peer_id);
98        }
99        // Update existing nodes to know about the restarted node
100        for node in self.nodes.values_mut() {
101            node.connect_peer(&node_id);
102        }
103        self.nodes.insert(node_id, new_node);
104    }
105
106    /// Freeze a node — removes it from the cluster temporarily.
107    /// The returned `MockNode` retains its runtime and actors.
108    /// Use `unfreeze_node` to restore it.
109    pub fn freeze_node(&mut self, id: &str) -> Option<MockNode> {
110        self.nodes.remove(&NodeId(id.to_string()))
111    }
112
113    /// Unfreeze a node — restore it to the cluster.
114    /// The node is re-inserted using its original `node_id`.
115    pub fn unfreeze_node(&mut self, node: MockNode) {
116        self.nodes.insert(node.node_id.clone(), node);
117    }
118
119    /// Watch an actor from a watcher on the same node.
120    /// Both watcher and target must be on nodes in this cluster.
121    pub fn watch<W>(&self, watcher_node: &str, watcher: &TestActorRef<W>, target_id: ActorId)
122    where
123        W: Actor + Handler<ChildTerminated> + 'static,
124    {
125        let node = self.node(watcher_node);
126        node.runtime.watch(watcher, target_id);
127    }
128
129    /// Unwatch an actor.
130    pub fn unwatch(&self, node_id: &str, watcher_id: &ActorId, target_id: &ActorId) {
131        let node = self.node(node_id);
132        node.runtime.unwatch(watcher_id, target_id);
133    }
134
135    /// Get a ClusterState snapshot from the perspective of a specific node.
136    ///
137    /// The `local_node` parameter determines which node is treated as local
138    /// (excluded from `peer_versions`). Returns `None` if the cluster is
139    /// empty or the node is not in the cluster.
140    pub fn state_for(&self, local_node: &str) -> Option<ClusterState> {
141        let local = NodeId(local_node.into());
142        if !self.nodes.contains_key(&local) {
143            return None;
144        }
145        let node_ids: Vec<NodeId> = self.nodes.keys().cloned().collect();
146        let mut state = ClusterState::new(local.clone(), node_ids);
147        for node_id in &state.nodes {
148            if node_id != &local {
149                state.peer_versions.insert(
150                    node_id.clone(),
151                    dactor::remote::PeerVersionInfo {
152                        wire_version: dactor::version::WireVersion::parse(
153                            dactor::version::DACTOR_WIRE_VERSION,
154                        )
155                        .unwrap(),
156                        app_version: None,
157                        adapter: "mock".into(),
158                    },
159                );
160            }
161        }
162        Some(state)
163    }
164
165    /// Get a ClusterState snapshot. Returns `None` if the cluster is empty.
166    ///
167    /// Uses the first node (sorted by ID) as the local node perspective.
168    /// For deterministic results, prefer [`state_for`](Self::state_for).
169    pub fn state(&self) -> Option<ClusterState> {
170        if self.nodes.is_empty() {
171            return None;
172        }
173        let mut node_ids: Vec<NodeId> = self.nodes.keys().cloned().collect();
174        node_ids.sort_by(|a, b| a.0.cmp(&b.0));
175        self.state_for(&node_ids[0].0)
176    }
177
178    /// Register a remote watch via the target node's WatchManager.
179    pub fn remote_watch(&mut self, target_node: &str, target: ActorId, watcher: ActorId) {
180        let node = self.node_mut(target_node);
181        node.watch_manager.watch(target.clone(), watcher);
182    }
183
184    /// Remove a remote watch via the target node's WatchManager.
185    pub fn remote_unwatch(&mut self, target_node: &str, target: &ActorId, watcher: &ActorId) {
186        let node = self.node_mut(target_node);
187        node.watch_manager.unwatch(target, watcher);
188    }
189
190    /// Notify the target node's WatchManager that an actor has terminated.
191    /// Returns notifications for all remote watchers of this actor.
192    pub fn notify_terminated(
193        &mut self,
194        node_id: &str,
195        terminated: &ActorId,
196    ) -> Vec<dactor::system_actors::WatchNotification> {
197        let node = self.node_mut(node_id);
198        node.watch_manager.on_terminated(terminated)
199    }
200
201    /// Register a cancellation token on a node's CancelManager.
202    pub fn register_cancel(
203        &mut self,
204        node_id: &str,
205        request_id: String,
206        token: tokio_util::sync::CancellationToken,
207    ) {
208        let node = self.node_mut(node_id);
209        node.cancel_manager.register(request_id, token);
210    }
211
212    /// Cancel a request on a node's CancelManager.
213    pub fn cancel_request(
214        &mut self,
215        node_id: &str,
216        request_id: &str,
217    ) -> dactor::system_actors::CancelResponse {
218        let node = self.node_mut(node_id);
219        node.cancel_manager.cancel(request_id)
220    }
221}