Skip to main content

dactor_test_harness/
node.rs

1use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
2use std::sync::Arc;
3use std::time::Instant;
4use tokio::sync::{broadcast, Notify};
5use tokio_stream::StreamExt;
6use tonic::{Request, Response, Status};
7
8use crate::fault::FaultInjector;
9use crate::handler::CommandHandler;
10use crate::protocol::test_node_service_server::{TestNodeService, TestNodeServiceServer};
11use crate::protocol::*;
12
/// Static settings for a single test node.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so a config can be logged,
/// reused for several nodes, and compared directly in test assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TestNodeConfig {
    /// Identifier the node reports in its ping and node-info responses.
    pub node_id: String,
    /// TCP port of the control service; `TestNode::run` binds it on 127.0.0.1.
    pub control_port: u16,
}
17
18impl TestNodeConfig {
19    pub fn from_args(node_id: &str, port: u16) -> Self {
20        Self {
21            node_id: node_id.to_string(),
22            control_port: port,
23        }
24    }
25}
26
27pub struct TestNode {
28    config: TestNodeConfig,
29    start_time: Instant,
30    fault_injector: Arc<FaultInjector>,
31    event_tx: broadcast::Sender<NodeEvent>,
32    shutdown_flag: Arc<AtomicBool>,
33    shutdown_notify: Arc<Notify>,
34    actor_count: Arc<AtomicU32>,
35    handler: Option<Arc<dyn CommandHandler>>,
36}
37
38impl TestNode {
39    pub fn new(config: TestNodeConfig) -> Self {
40        let (event_tx, _) = broadcast::channel(256);
41        Self {
42            config,
43            start_time: Instant::now(),
44            fault_injector: Arc::new(FaultInjector::new()),
45            event_tx,
46            shutdown_flag: Arc::new(AtomicBool::new(false)),
47            shutdown_notify: Arc::new(Notify::new()),
48            actor_count: Arc::new(AtomicU32::new(0)),
49            handler: None,
50        }
51    }
52
53    /// Create a TestNode with a command handler for actor management.
54    pub fn with_handler(config: TestNodeConfig, handler: Arc<dyn CommandHandler>) -> Self {
55        let (event_tx, _) = broadcast::channel(256);
56        Self {
57            config,
58            start_time: Instant::now(),
59            fault_injector: Arc::new(FaultInjector::new()),
60            event_tx,
61            shutdown_flag: Arc::new(AtomicBool::new(false)),
62            shutdown_notify: Arc::new(Notify::new()),
63            actor_count: Arc::new(AtomicU32::new(0)),
64            handler: Some(handler),
65        }
66    }
67
68    pub async fn run(self) -> Result<(), Box<dyn std::error::Error>> {
69        let addr = format!("127.0.0.1:{}", self.config.control_port).parse()?;
70        let node_id = self.config.node_id.clone();
71        let shutdown_notify = self.shutdown_notify.clone();
72
73        let svc = TestNodeServiceServer::new(self);
74
75        tracing::info!(node_id = %node_id, addr = %addr, "Test node starting");
76
77        tonic::transport::Server::builder()
78            .add_service(svc)
79            .serve_with_shutdown(addr, async move {
80                shutdown_notify.notified().await;
81            })
82            .await?;
83
84        Ok(())
85    }
86
87    pub fn emit_event(&self, event_type: &str, detail: &str) {
88        let event = NodeEvent {
89            event_type: event_type.to_string(),
90            detail: detail.to_string(),
91            timestamp_ms: self.start_time.elapsed().as_millis() as u64,
92        };
93        let _ = self.event_tx.send(event);
94    }
95}
96
97#[tonic::async_trait]
98impl TestNodeService for TestNode {
99    async fn ping(&self, request: Request<PingRequest>) -> Result<Response<PingResponse>, Status> {
100        let req = request.into_inner();
101        Ok(Response::new(PingResponse {
102            echo: req.echo,
103            node_id: self.config.node_id.clone(),
104            uptime_ms: self.start_time.elapsed().as_millis() as u64,
105        }))
106    }
107
108    async fn get_node_info(
109        &self,
110        _request: Request<Empty>,
111    ) -> Result<Response<NodeInfoResponse>, Status> {
112        let actor_count = if let Some(ref handler) = self.handler {
113            handler.actor_count()
114        } else {
115            self.actor_count.load(Ordering::Relaxed)
116        };
117        Ok(Response::new(NodeInfoResponse {
118            node_id: self.config.node_id.clone(),
119            uptime_ms: self.start_time.elapsed().as_millis() as u64,
120            adapter: if let Some(ref handler) = self.handler {
121                handler.adapter_name().to_string()
122            } else {
123                "none".to_string()
124            },
125            actor_count,
126        }))
127    }
128
129    async fn shutdown(&self, request: Request<ShutdownRequest>) -> Result<Response<Empty>, Status> {
130        let req = request.into_inner();
131        self.shutdown_flag.store(true, Ordering::SeqCst);
132        self.emit_event(
133            "node_shutdown",
134            &serde_json::json!({
135                "graceful": req.graceful,
136                "timeout_ms": req.timeout_ms,
137            })
138            .to_string(),
139        );
140        // Signal the server to shut down gracefully
141        self.shutdown_notify.notify_one();
142        Ok(Response::new(Empty {}))
143    }
144
145    async fn inject_fault(
146        &self,
147        request: Request<FaultRequest>,
148    ) -> Result<Response<Empty>, Status> {
149        let req = request.into_inner();
150        self.fault_injector
151            .add_fault(&req.fault_type, &req.target, req.duration_ms, req.count);
152        self.emit_event(
153            "fault_injected",
154            &serde_json::json!({
155                "fault_type": req.fault_type,
156                "target": req.target,
157            })
158            .to_string(),
159        );
160        Ok(Response::new(Empty {}))
161    }
162
163    async fn clear_faults(&self, _request: Request<Empty>) -> Result<Response<Empty>, Status> {
164        self.fault_injector.clear_all();
165        self.emit_event("faults_cleared", "{}");
166        Ok(Response::new(Empty {}))
167    }
168
169    type SubscribeEventsStream =
170        std::pin::Pin<Box<dyn tokio_stream::Stream<Item = Result<NodeEvent, Status>> + Send>>;
171
172    async fn subscribe_events(
173        &self,
174        request: Request<EventFilter>,
175    ) -> Result<Response<Self::SubscribeEventsStream>, Status> {
176        let filter = request.into_inner();
177        let event_types = filter.event_types;
178        let rx = self.event_tx.subscribe();
179        let stream = tokio_stream::wrappers::BroadcastStream::new(rx)
180            .filter_map(|result| result.ok())
181            .filter(move |event| event_types.is_empty() || event_types.contains(&event.event_type))
182            .map(Ok);
183        Ok(Response::new(Box::pin(stream)))
184    }
185
186    async fn custom_command(
187        &self,
188        request: Request<CustomRequest>,
189    ) -> Result<Response<CustomResponse>, Status> {
190        let req = request.into_inner();
191        Err(Status::unimplemented(format!(
192            "custom command '{}' not registered",
193            req.command_type
194        )))
195    }
196
197    async fn spawn_actor(
198        &self,
199        request: Request<SpawnActorRequest>,
200    ) -> Result<Response<SpawnActorResponse>, Status> {
201        let handler = self
202            .handler
203            .as_ref()
204            .ok_or_else(|| Status::unimplemented("no command handler registered"))?;
205        let req = request.into_inner();
206        match handler
207            .spawn_actor(&req.actor_type, &req.actor_name, &req.args)
208            .await
209        {
210            Ok(actor_id) => {
211                self.emit_event(
212                    "actor_spawned",
213                    &serde_json::json!({
214                        "actor_type": req.actor_type,
215                        "actor_name": req.actor_name,
216                        "actor_id": actor_id,
217                    })
218                    .to_string(),
219                );
220                Ok(Response::new(SpawnActorResponse {
221                    success: true,
222                    actor_id,
223                    error: String::new(),
224                }))
225            }
226            Err(e) => Ok(Response::new(SpawnActorResponse {
227                success: false,
228                actor_id: String::new(),
229                error: e,
230            })),
231        }
232    }
233
234    async fn tell_actor(
235        &self,
236        request: Request<TellActorRequest>,
237    ) -> Result<Response<TellActorResponse>, Status> {
238        let handler = self
239            .handler
240            .as_ref()
241            .ok_or_else(|| Status::unimplemented("no command handler registered"))?;
242        let req = request.into_inner();
243
244        // Check for active fault injection
245        if self
246            .fault_injector
247            .has_fault("partition", &req.actor_name)
248        {
249            return Ok(Response::new(TellActorResponse {
250                success: false,
251                error: "partition: message delivery blocked".to_string(),
252            }));
253        }
254
255        match handler
256            .tell_actor(&req.actor_name, &req.message_type, &req.payload)
257            .await
258        {
259            Ok(()) => Ok(Response::new(TellActorResponse {
260                success: true,
261                error: String::new(),
262            })),
263            Err(e) => Ok(Response::new(TellActorResponse {
264                success: false,
265                error: e,
266            })),
267        }
268    }
269
270    async fn ask_actor(
271        &self,
272        request: Request<AskActorRequest>,
273    ) -> Result<Response<AskActorResponse>, Status> {
274        let handler = self
275            .handler
276            .as_ref()
277            .ok_or_else(|| Status::unimplemented("no command handler registered"))?;
278        let req = request.into_inner();
279
280        // Check for active fault injection
281        if self
282            .fault_injector
283            .has_fault("partition", &req.actor_name)
284        {
285            return Ok(Response::new(AskActorResponse {
286                success: false,
287                payload: Vec::new(),
288                error: "partition: message delivery blocked".to_string(),
289            }));
290        }
291
292        match handler
293            .ask_actor(&req.actor_name, &req.message_type, &req.payload, req.timeout_ms)
294            .await
295        {
296            Ok(payload) => Ok(Response::new(AskActorResponse {
297                success: true,
298                payload,
299                error: String::new(),
300            })),
301            Err(e) => Ok(Response::new(AskActorResponse {
302                success: false,
303                payload: Vec::new(),
304                error: e,
305            })),
306        }
307    }
308
309    async fn stop_actor(
310        &self,
311        request: Request<StopActorRequest>,
312    ) -> Result<Response<StopActorResponse>, Status> {
313        let handler = self
314            .handler
315            .as_ref()
316            .ok_or_else(|| Status::unimplemented("no command handler registered"))?;
317        let req = request.into_inner();
318        match handler.stop_actor(&req.actor_name).await {
319            Ok(()) => {
320                self.emit_event(
321                    "actor_stopped",
322                    &serde_json::json!({ "actor_name": req.actor_name }).to_string(),
323                );
324                Ok(Response::new(StopActorResponse {
325                    success: true,
326                    error: String::new(),
327                }))
328            }
329            Err(e) => Ok(Response::new(StopActorResponse {
330                success: false,
331                error: e,
332            })),
333        }
334    }
335
336    async fn watch_actor(
337        &self,
338        request: Request<WatchActorRequest>,
339    ) -> Result<Response<WatchActorResponse>, Status> {
340        let handler = self
341            .handler
342            .as_ref()
343            .ok_or_else(|| Status::unimplemented("no command handler registered"))?;
344        let req = request.into_inner();
345        match handler
346            .watch_actor(&req.watcher_name, &req.target_name)
347            .await
348        {
349            Ok(()) => {
350                self.emit_event(
351                    "actor_watch_registered",
352                    &serde_json::json!({
353                        "watcher": req.watcher_name,
354                        "target": req.target_name,
355                    })
356                    .to_string(),
357                );
358                Ok(Response::new(WatchActorResponse {
359                    success: true,
360                    error: String::new(),
361                }))
362            }
363            Err(e) => Ok(Response::new(WatchActorResponse {
364                success: false,
365                error: e,
366            })),
367        }
368    }
369}