Skip to main content

dactor_test_harness/
cluster.rs

1use std::collections::HashMap;
2use std::process::{Child, Command, Stdio};
3use std::time::Duration;
4
5use crate::events::EventStream;
6use crate::protocol::test_node_service_client::TestNodeServiceClient;
7use crate::protocol::*;
8
9pub struct TestNodeHandle {
10    pub process: Child,
11    pub node_id: String,
12    pub control_port: u16,
13    client: Option<TestNodeServiceClient<tonic::transport::Channel>>,
14}
15
/// A set of launched test nodes, keyed by node id.
///
/// Create one with [`TestCluster::builder`]. Dropping the cluster kills and reaps every
/// node process that is still tracked.
pub struct TestCluster {
    // node_id -> handle holding the process and its control-plane client.
    nodes: HashMap<String, TestNodeHandle>,
}
19
/// Launch specification for a single node: `(node_id, binary, args, control_port)`.
type NodeSpec = (String, String, Vec<String>, u16);

/// Accumulates node launch specifications; `build` spawns and connects to them in order.
pub struct TestClusterBuilder {
    nodes: Vec<NodeSpec>,
}
23
24impl TestCluster {
25    pub fn builder() -> TestClusterBuilder {
26        TestClusterBuilder { nodes: Vec::new() }
27    }
28
29    /// Ping a node to verify it's alive.
30    pub async fn ping(
31        &mut self,
32        node_id: &str,
33        echo: &str,
34    ) -> Result<PingResponse, Box<dyn std::error::Error>> {
35        let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
36        let client = handle.client.as_mut().ok_or("not connected")?;
37        let response = client
38            .ping(PingRequest {
39                echo: echo.to_string(),
40            })
41            .await?;
42        Ok(response.into_inner())
43    }
44
45    /// Get node info.
46    pub async fn get_node_info(
47        &mut self,
48        node_id: &str,
49    ) -> Result<NodeInfoResponse, Box<dyn std::error::Error>> {
50        let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
51        let client = handle.client.as_mut().ok_or("not connected")?;
52        let response = client.get_node_info(Empty {}).await?;
53        Ok(response.into_inner())
54    }
55
56    /// Inject a fault on a node.
57    pub async fn inject_fault(
58        &mut self,
59        node_id: &str,
60        fault_type: &str,
61        target: &str,
62        duration_ms: u64,
63        count: u32,
64    ) -> Result<(), Box<dyn std::error::Error>> {
65        let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
66        let client = handle.client.as_mut().ok_or("not connected")?;
67        client
68            .inject_fault(FaultRequest {
69                fault_type: fault_type.to_string(),
70                target: target.to_string(),
71                duration_ms,
72                count,
73                detail: String::new(),
74            })
75            .await?;
76        Ok(())
77    }
78
79    /// Clear all faults on a node.
80    pub async fn clear_faults(&mut self, node_id: &str) -> Result<(), Box<dyn std::error::Error>> {
81        let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
82        let client = handle.client.as_mut().ok_or("not connected")?;
83        client.clear_faults(Empty {}).await?;
84        Ok(())
85    }
86
87    /// Subscribe to events from a node.
88    pub async fn subscribe_events(
89        &mut self,
90        node_id: &str,
91        event_types: &[&str],
92    ) -> Result<EventStream, Box<dyn std::error::Error>> {
93        let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
94        let client = handle.client.as_mut().ok_or("not connected")?;
95        let response = client
96            .subscribe_events(EventFilter {
97                event_types: event_types.iter().map(|s| s.to_string()).collect(),
98            })
99            .await?;
100        Ok(EventStream::new(response.into_inner()))
101    }
102
103    /// Send a custom command.
104    pub async fn custom(
105        &mut self,
106        node_id: &str,
107        command_type: &str,
108        payload: &[u8],
109    ) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
110        let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
111        let client = handle.client.as_mut().ok_or("not connected")?;
112        let response = client
113            .custom_command(CustomRequest {
114                command_type: command_type.to_string(),
115                payload: payload.to_vec(),
116            })
117            .await?;
118        Ok(response.into_inner().payload)
119    }
120
121    /// Spawn an actor on a node.
122    pub async fn spawn_actor(
123        &mut self,
124        node_id: &str,
125        actor_type: &str,
126        actor_name: &str,
127        args: &[u8],
128    ) -> Result<SpawnActorResponse, Box<dyn std::error::Error>> {
129        let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
130        let client = handle.client.as_mut().ok_or("not connected")?;
131        let response = client
132            .spawn_actor(SpawnActorRequest {
133                actor_type: actor_type.to_string(),
134                actor_name: actor_name.to_string(),
135                args: args.to_vec(),
136            })
137            .await?;
138        Ok(response.into_inner())
139    }
140
141    /// Send a fire-and-forget message to an actor on a node.
142    pub async fn tell_actor(
143        &mut self,
144        node_id: &str,
145        actor_name: &str,
146        message_type: &str,
147        payload: &[u8],
148    ) -> Result<TellActorResponse, Box<dyn std::error::Error>> {
149        let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
150        let client = handle.client.as_mut().ok_or("not connected")?;
151        let response = client
152            .tell_actor(TellActorRequest {
153                actor_name: actor_name.to_string(),
154                message_type: message_type.to_string(),
155                payload: payload.to_vec(),
156            })
157            .await?;
158        Ok(response.into_inner())
159    }
160
161    /// Send a request-reply message to an actor on a node.
162    pub async fn ask_actor(
163        &mut self,
164        node_id: &str,
165        actor_name: &str,
166        message_type: &str,
167        payload: &[u8],
168    ) -> Result<AskActorResponse, Box<dyn std::error::Error>> {
169        let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
170        let client = handle.client.as_mut().ok_or("not connected")?;
171        let response = client
172            .ask_actor(AskActorRequest {
173                actor_name: actor_name.to_string(),
174                message_type: message_type.to_string(),
175                payload: payload.to_vec(),
176                timeout_ms: 0,
177            })
178            .await?;
179        Ok(response.into_inner())
180    }
181
182    /// Send a request-reply message to an actor on a node with a timeout.
183    /// If `timeout_ms > 0`, the ask is cancelled after that many milliseconds.
184    pub async fn ask_actor_with_timeout(
185        &mut self,
186        node_id: &str,
187        actor_name: &str,
188        message_type: &str,
189        payload: &[u8],
190        timeout_ms: u64,
191    ) -> Result<AskActorResponse, Box<dyn std::error::Error>> {
192        let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
193        let client = handle.client.as_mut().ok_or("not connected")?;
194        let response = client
195            .ask_actor(AskActorRequest {
196                actor_name: actor_name.to_string(),
197                message_type: message_type.to_string(),
198                payload: payload.to_vec(),
199                timeout_ms,
200            })
201            .await?;
202        Ok(response.into_inner())
203    }
204
205    /// Stop an actor on a node.
206    pub async fn stop_actor(
207        &mut self,
208        node_id: &str,
209        actor_name: &str,
210    ) -> Result<StopActorResponse, Box<dyn std::error::Error>> {
211        let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
212        let client = handle.client.as_mut().ok_or("not connected")?;
213        let response = client
214            .stop_actor(StopActorRequest {
215                actor_name: actor_name.to_string(),
216            })
217            .await?;
218        Ok(response.into_inner())
219    }
220
221    /// Register a watch: when `target_name` stops, `watcher_name` is notified.
222    pub async fn watch_actor(
223        &mut self,
224        node_id: &str,
225        watcher_name: &str,
226        target_name: &str,
227    ) -> Result<WatchActorResponse, Box<dyn std::error::Error>> {
228        let handle = self.nodes.get_mut(node_id).ok_or("node not found")?;
229        let client = handle.client.as_mut().ok_or("not connected")?;
230        let response = client
231            .watch_actor(WatchActorRequest {
232                watcher_name: watcher_name.to_string(),
233                target_name: target_name.to_string(),
234            })
235            .await?;
236        Ok(response.into_inner())
237    }
238
239    /// Graceful shutdown of a specific node.
240    pub async fn shutdown_node(&mut self, node_id: &str) -> Result<(), Box<dyn std::error::Error>> {
241        if let Some(handle) = self.nodes.get_mut(node_id) {
242            if let Some(client) = handle.client.as_mut() {
243                let _ = client
244                    .shutdown(ShutdownRequest {
245                        graceful: true,
246                        timeout_ms: 5000,
247                    })
248                    .await;
249            }
250            let _ = handle.process.kill();
251            let _ = handle.process.wait();
252        }
253        Ok(())
254    }
255
256    /// Shutdown all nodes.
257    pub async fn shutdown(&mut self) {
258        let node_ids: Vec<String> = self.nodes.keys().cloned().collect();
259        for node_id in node_ids {
260            let _ = self.shutdown_node(&node_id).await;
261        }
262    }
263}
264
265impl Drop for TestCluster {
266    fn drop(&mut self) {
267        for (_, handle) in self.nodes.iter_mut() {
268            let _ = handle.process.kill();
269            let _ = handle.process.wait(); // reap to avoid zombies and port reuse races
270        }
271    }
272}
273
274impl TestClusterBuilder {
275    /// Add a node to the cluster.
276    pub fn node(mut self, node_id: &str, binary: &str, args: &[&str], port: u16) -> Self {
277        self.nodes.push((
278            node_id.to_string(),
279            binary.to_string(),
280            args.iter().map(|s| s.to_string()).collect(),
281            port,
282        ));
283        self
284    }
285
286    /// Build and launch all nodes. Waits for each node to become reachable.
287    /// Panics if any node fails to start or connect within the retry window.
288    pub async fn build(self) -> TestCluster {
289        let mut nodes: HashMap<String, TestNodeHandle> = HashMap::new();
290
291        for (node_id, binary, args, port) in self.nodes {
292            let mut process = Command::new(&binary)
293                .args(&args)
294                .env("DACTOR_NODE_ID", &node_id)
295                .env("DACTOR_CONTROL_PORT", port.to_string())
296                .stdout(Stdio::null())
297                .stderr(Stdio::null())
298                .spawn()
299                .unwrap_or_else(|e| {
300                    // Kill already-spawned nodes before panicking
301                    for (_, handle) in nodes.iter_mut() {
302                        let _ = handle.process.kill();
303                        let _ = handle.process.wait();
304                    }
305                    panic!(
306                        "Failed to launch test node '{}' ({}): {}",
307                        node_id, binary, e
308                    )
309                });
310
311            // Connect gRPC client with retry
312            let addr = format!("http://127.0.0.1:{}", port);
313            let mut client = None;
314            for _ in 0..50 {
315                tokio::time::sleep(Duration::from_millis(100)).await;
316                match TestNodeServiceClient::connect(addr.clone()).await {
317                    Ok(c) => {
318                        client = Some(c);
319                        break;
320                    }
321                    Err(_) => continue,
322                }
323            }
324
325            if client.is_none() {
326                // Kill this process and all already-spawned nodes before panicking
327                let _ = process.kill();
328                let _ = process.wait();
329                for (_, handle) in nodes.iter_mut() {
330                    let _ = handle.process.kill();
331                    let _ = handle.process.wait();
332                }
333                panic!(
334                    "Failed to connect to test node '{}' at {} after 5s of retries",
335                    node_id, addr
336                );
337            }
338
339            nodes.insert(
340                node_id.clone(),
341                TestNodeHandle {
342                    process,
343                    node_id,
344                    control_port: port,
345                    client,
346                },
347            );
348        }
349
350        TestCluster { nodes }
351    }
352}