1use std::collections::HashMap;
2use std::sync::Arc;
3
4use dactor::actor::{Actor, Handler};
5use dactor::node::{ActorId, NodeId};
6use dactor::remote::ClusterState;
7use dactor::supervision::ChildTerminated;
8use dactor::test_support::test_runtime::TestActorRef;
9
10use crate::network::MockNetwork;
11use crate::node::MockNode;
12
13pub struct MockCluster {
19 nodes: HashMap<NodeId, MockNode>,
20 network: Arc<MockNetwork>,
21}
22
23impl MockCluster {
24 pub fn new(node_ids: &[&str]) -> Self {
28 let ids: Vec<NodeId> = node_ids.iter().map(|id| NodeId(id.to_string())).collect();
29 let mut nodes = HashMap::new();
30
31 for id in &ids {
32 let mut node = MockNode::new(id.clone());
33 for peer in &ids {
35 if peer != id {
36 node.connect_peer(peer);
37 }
38 }
39 nodes.insert(id.clone(), node);
40 }
41
42 Self {
43 nodes,
44 network: Arc::new(MockNetwork::new()),
45 }
46 }
47
48 pub fn node(&self, id: &str) -> &MockNode {
50 self.nodes
51 .get(&NodeId(id.to_string()))
52 .unwrap_or_else(|| panic!("node '{}' not found in cluster", id))
53 }
54
55 pub fn node_mut(&mut self, id: &str) -> &mut MockNode {
57 self.nodes
58 .get_mut(&NodeId(id.to_string()))
59 .unwrap_or_else(|| panic!("node '{}' not found in cluster", id))
60 }
61
62 pub fn network(&self) -> &MockNetwork {
64 &self.network
65 }
66
67 pub fn node_count(&self) -> usize {
69 self.nodes.len()
70 }
71
72 pub fn node_ids(&self) -> Vec<NodeId> {
74 self.nodes.keys().cloned().collect()
75 }
76
77 pub fn crash_node(&mut self, id: &str) {
82 let crashed = NodeId(id.to_string());
83 self.nodes.remove(&crashed);
84 for node in self.nodes.values_mut() {
86 node.disconnect_peer(&crashed);
87 }
88 }
89
90 pub fn restart_node(&mut self, id: &str) {
93 let node_id = NodeId(id.to_string());
94 let mut new_node = MockNode::new(node_id.clone());
95 for peer_id in self.nodes.keys() {
97 new_node.connect_peer(peer_id);
98 }
99 for node in self.nodes.values_mut() {
101 node.connect_peer(&node_id);
102 }
103 self.nodes.insert(node_id, new_node);
104 }
105
106 pub fn freeze_node(&mut self, id: &str) -> Option<MockNode> {
110 self.nodes.remove(&NodeId(id.to_string()))
111 }
112
113 pub fn unfreeze_node(&mut self, node: MockNode) {
116 self.nodes.insert(node.node_id.clone(), node);
117 }
118
119 pub fn watch<W>(&self, watcher_node: &str, watcher: &TestActorRef<W>, target_id: ActorId)
122 where
123 W: Actor + Handler<ChildTerminated> + 'static,
124 {
125 let node = self.node(watcher_node);
126 node.runtime.watch(watcher, target_id);
127 }
128
129 pub fn unwatch(&self, node_id: &str, watcher_id: &ActorId, target_id: &ActorId) {
131 let node = self.node(node_id);
132 node.runtime.unwatch(watcher_id, target_id);
133 }
134
135 pub fn state_for(&self, local_node: &str) -> Option<ClusterState> {
141 let local = NodeId(local_node.into());
142 if !self.nodes.contains_key(&local) {
143 return None;
144 }
145 let node_ids: Vec<NodeId> = self.nodes.keys().cloned().collect();
146 let mut state = ClusterState::new(local.clone(), node_ids);
147 for node_id in &state.nodes {
148 if node_id != &local {
149 state.peer_versions.insert(
150 node_id.clone(),
151 dactor::remote::PeerVersionInfo {
152 wire_version: dactor::version::WireVersion::parse(
153 dactor::version::DACTOR_WIRE_VERSION,
154 )
155 .unwrap(),
156 app_version: None,
157 adapter: "mock".into(),
158 },
159 );
160 }
161 }
162 Some(state)
163 }
164
165 pub fn state(&self) -> Option<ClusterState> {
170 if self.nodes.is_empty() {
171 return None;
172 }
173 let mut node_ids: Vec<NodeId> = self.nodes.keys().cloned().collect();
174 node_ids.sort_by(|a, b| a.0.cmp(&b.0));
175 self.state_for(&node_ids[0].0)
176 }
177
178 pub fn remote_watch(&mut self, target_node: &str, target: ActorId, watcher: ActorId) {
180 let node = self.node_mut(target_node);
181 node.watch_manager.watch(target.clone(), watcher);
182 }
183
184 pub fn remote_unwatch(&mut self, target_node: &str, target: &ActorId, watcher: &ActorId) {
186 let node = self.node_mut(target_node);
187 node.watch_manager.unwatch(target, watcher);
188 }
189
190 pub fn notify_terminated(
193 &mut self,
194 node_id: &str,
195 terminated: &ActorId,
196 ) -> Vec<dactor::system_actors::WatchNotification> {
197 let node = self.node_mut(node_id);
198 node.watch_manager.on_terminated(terminated)
199 }
200
201 pub fn register_cancel(
203 &mut self,
204 node_id: &str,
205 request_id: String,
206 token: tokio_util::sync::CancellationToken,
207 ) {
208 let node = self.node_mut(node_id);
209 node.cancel_manager.register(request_id, token);
210 }
211
212 pub fn cancel_request(
214 &mut self,
215 node_id: &str,
216 request_id: &str,
217 ) -> dactor::system_actors::CancelResponse {
218 let node = self.node_mut(node_id);
219 node.cancel_manager.cancel(request_id)
220 }
221}