1use std::collections::HashMap;
2use std::process::{Child, Command, Stdio};
3use std::time::Duration;
4
5use crate::events::EventStream;
6use crate::protocol::test_node_service_client::TestNodeServiceClient;
7use crate::protocol::*;
8
9pub struct TestNodeHandle {
10 pub process: Child,
11 pub node_id: String,
12 pub control_port: u16,
13 client: Option<TestNodeServiceClient<tonic::transport::Channel>>,
14}
15
16pub struct TestCluster {
17 nodes: HashMap<String, TestNodeHandle>,
18}
19
/// Collects node definitions before any process is launched.
///
/// The default value is an empty builder, equivalent to what
/// `TestCluster::builder()` returns.
#[derive(Debug, Default)]
pub struct TestClusterBuilder {
    /// One entry per node, in registration order:
    /// `(node_id, binary, args, control_port)`.
    nodes: Vec<(String, String, Vec<String>, u16)>,
}
23
24impl TestCluster {
25 pub fn builder() -> TestClusterBuilder {
26 TestClusterBuilder { nodes: Vec::new() }
27 }
28
29 pub async fn ping(
31 &mut self,
32 node_id: &str,
33 echo: &str,
34 ) -> Result<PingResponse, Box<dyn std::error::Error>> {
35 let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
36 let client = handle.client.as_mut().ok_or("not connected")?;
37 let response = client
38 .ping(PingRequest {
39 echo: echo.to_string(),
40 })
41 .await?;
42 Ok(response.into_inner())
43 }
44
45 pub async fn get_node_info(
47 &mut self,
48 node_id: &str,
49 ) -> Result<NodeInfoResponse, Box<dyn std::error::Error>> {
50 let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
51 let client = handle.client.as_mut().ok_or("not connected")?;
52 let response = client.get_node_info(Empty {}).await?;
53 Ok(response.into_inner())
54 }
55
56 pub async fn inject_fault(
58 &mut self,
59 node_id: &str,
60 fault_type: &str,
61 target: &str,
62 duration_ms: u64,
63 count: u32,
64 ) -> Result<(), Box<dyn std::error::Error>> {
65 let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
66 let client = handle.client.as_mut().ok_or("not connected")?;
67 client
68 .inject_fault(FaultRequest {
69 fault_type: fault_type.to_string(),
70 target: target.to_string(),
71 duration_ms,
72 count,
73 detail: String::new(),
74 })
75 .await?;
76 Ok(())
77 }
78
79 pub async fn clear_faults(&mut self, node_id: &str) -> Result<(), Box<dyn std::error::Error>> {
81 let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
82 let client = handle.client.as_mut().ok_or("not connected")?;
83 client.clear_faults(Empty {}).await?;
84 Ok(())
85 }
86
87 pub async fn subscribe_events(
89 &mut self,
90 node_id: &str,
91 event_types: &[&str],
92 ) -> Result<EventStream, Box<dyn std::error::Error>> {
93 let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
94 let client = handle.client.as_mut().ok_or("not connected")?;
95 let response = client
96 .subscribe_events(EventFilter {
97 event_types: event_types.iter().map(|s| s.to_string()).collect(),
98 })
99 .await?;
100 Ok(EventStream::new(response.into_inner()))
101 }
102
103 pub async fn custom(
105 &mut self,
106 node_id: &str,
107 command_type: &str,
108 payload: &[u8],
109 ) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
110 let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
111 let client = handle.client.as_mut().ok_or("not connected")?;
112 let response = client
113 .custom_command(CustomRequest {
114 command_type: command_type.to_string(),
115 payload: payload.to_vec(),
116 })
117 .await?;
118 Ok(response.into_inner().payload)
119 }
120
121 pub async fn spawn_actor(
123 &mut self,
124 node_id: &str,
125 actor_type: &str,
126 actor_name: &str,
127 args: &[u8],
128 ) -> Result<SpawnActorResponse, Box<dyn std::error::Error>> {
129 let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
130 let client = handle.client.as_mut().ok_or("not connected")?;
131 let response = client
132 .spawn_actor(SpawnActorRequest {
133 actor_type: actor_type.to_string(),
134 actor_name: actor_name.to_string(),
135 args: args.to_vec(),
136 })
137 .await?;
138 Ok(response.into_inner())
139 }
140
141 pub async fn tell_actor(
143 &mut self,
144 node_id: &str,
145 actor_name: &str,
146 message_type: &str,
147 payload: &[u8],
148 ) -> Result<TellActorResponse, Box<dyn std::error::Error>> {
149 let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
150 let client = handle.client.as_mut().ok_or("not connected")?;
151 let response = client
152 .tell_actor(TellActorRequest {
153 actor_name: actor_name.to_string(),
154 message_type: message_type.to_string(),
155 payload: payload.to_vec(),
156 })
157 .await?;
158 Ok(response.into_inner())
159 }
160
161 pub async fn ask_actor(
163 &mut self,
164 node_id: &str,
165 actor_name: &str,
166 message_type: &str,
167 payload: &[u8],
168 ) -> Result<AskActorResponse, Box<dyn std::error::Error>> {
169 let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
170 let client = handle.client.as_mut().ok_or("not connected")?;
171 let response = client
172 .ask_actor(AskActorRequest {
173 actor_name: actor_name.to_string(),
174 message_type: message_type.to_string(),
175 payload: payload.to_vec(),
176 timeout_ms: 0,
177 })
178 .await?;
179 Ok(response.into_inner())
180 }
181
182 pub async fn ask_actor_with_timeout(
185 &mut self,
186 node_id: &str,
187 actor_name: &str,
188 message_type: &str,
189 payload: &[u8],
190 timeout_ms: u64,
191 ) -> Result<AskActorResponse, Box<dyn std::error::Error>> {
192 let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
193 let client = handle.client.as_mut().ok_or("not connected")?;
194 let response = client
195 .ask_actor(AskActorRequest {
196 actor_name: actor_name.to_string(),
197 message_type: message_type.to_string(),
198 payload: payload.to_vec(),
199 timeout_ms,
200 })
201 .await?;
202 Ok(response.into_inner())
203 }
204
205 pub async fn stop_actor(
207 &mut self,
208 node_id: &str,
209 actor_name: &str,
210 ) -> Result<StopActorResponse, Box<dyn std::error::Error>> {
211 let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
212 let client = handle.client.as_mut().ok_or("not connected")?;
213 let response = client
214 .stop_actor(StopActorRequest {
215 actor_name: actor_name.to_string(),
216 })
217 .await?;
218 Ok(response.into_inner())
219 }
220
221 pub async fn watch_actor(
223 &mut self,
224 node_id: &str,
225 watcher_name: &str,
226 target_name: &str,
227 ) -> Result<WatchActorResponse, Box<dyn std::error::Error>> {
228 let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
229 let client = handle.client.as_mut().ok_or("not connected")?;
230 let response = client
231 .watch_actor(WatchActorRequest {
232 watcher_name: watcher_name.to_string(),
233 target_name: target_name.to_string(),
234 })
235 .await?;
236 Ok(response.into_inner())
237 }
238
239 pub async fn shutdown_node(&mut self, node_id: &str) -> Result<(), Box<dyn std::error::Error>> {
241 if let Some(handle) = self.nodes.get_mut(node_id) {
242 if let Some(client) = handle.client.as_mut() {
243 let _ = client
244 .shutdown(ShutdownRequest {
245 graceful: true,
246 timeout_ms: 5000,
247 })
248 .await;
249 }
250 let _ = handle.process.kill();
251 let _ = handle.process.wait();
252 }
253 Ok(())
254 }
255
256 pub async fn shutdown(&mut self) {
258 let node_ids: Vec<String> = self.nodes.keys().cloned().collect();
259 for node_id in node_ids {
260 let _ = self.shutdown_node(&node_id).await;
261 }
262 }
263}
264
265impl Drop for TestCluster {
266 fn drop(&mut self) {
267 for (_, handle) in self.nodes.iter_mut() {
268 let _ = handle.process.kill();
269 let _ = handle.process.wait(); }
271 }
272}
273
274impl TestClusterBuilder {
275 pub fn node(mut self, node_id: &str, binary: &str, args: &[&str], port: u16) -> Self {
277 self.nodes.push((
278 node_id.to_string(),
279 binary.to_string(),
280 args.iter().map(|s| s.to_string()).collect(),
281 port,
282 ));
283 self
284 }
285
286 pub async fn build(self) -> TestCluster {
289 let mut nodes: HashMap<String, TestNodeHandle> = HashMap::new();
290
291 for (node_id, binary, args, port) in self.nodes {
292 let mut process = Command::new(&binary)
293 .args(&args)
294 .env("DACTOR_NODE_ID", &node_id)
295 .env("DACTOR_CONTROL_PORT", port.to_string())
296 .stdout(Stdio::null())
297 .stderr(Stdio::null())
298 .spawn()
299 .unwrap_or_else(|e| {
300 for (_, handle) in nodes.iter_mut() {
302 let _ = handle.process.kill();
303 let _ = handle.process.wait();
304 }
305 panic!(
306 "Failed to launch test node '{}' ({}): {}",
307 node_id, binary, e
308 )
309 });
310
311 let addr = format!("http://127.0.0.1:{}", port);
313 let mut client = None;
314 for _ in 0..50 {
315 tokio::time::sleep(Duration::from_millis(100)).await;
316 match TestNodeServiceClient::connect(addr.clone()).await {
317 Ok(c) => {
318 client = Some(c);
319 break;
320 }
321 Err(_) => continue,
322 }
323 }
324
325 if client.is_none() {
326 let _ = process.kill();
328 let _ = process.wait();
329 for (_, handle) in nodes.iter_mut() {
330 let _ = handle.process.kill();
331 let _ = handle.process.wait();
332 }
333 panic!(
334 "Failed to connect to test node '{}' at {} after 5s of retries",
335 node_id, addr
336 );
337 }
338
339 nodes.insert(
340 node_id.clone(),
341 TestNodeHandle {
342 process,
343 node_id,
344 control_port: port,
345 client,
346 },
347 );
348 }
349
350 TestCluster { nodes }
351 }
352}