1use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
2use std::sync::Arc;
3use std::time::Instant;
4use tokio::sync::{broadcast, Notify};
5use tokio_stream::StreamExt;
6use tonic::{Request, Response, Status};
7
8use crate::fault::FaultInjector;
9use crate::handler::CommandHandler;
10use crate::protocol::test_node_service_server::{TestNodeService, TestNodeServiceServer};
11use crate::protocol::*;
12
13pub struct TestNodeConfig {
14 pub node_id: String,
15 pub control_port: u16,
16}
17
18impl TestNodeConfig {
19 pub fn from_args(node_id: &str, port: u16) -> Self {
20 Self {
21 node_id: node_id.to_string(),
22 control_port: port,
23 }
24 }
25}
26
27pub struct TestNode {
28 config: TestNodeConfig,
29 start_time: Instant,
30 fault_injector: Arc<FaultInjector>,
31 event_tx: broadcast::Sender<NodeEvent>,
32 shutdown_flag: Arc<AtomicBool>,
33 shutdown_notify: Arc<Notify>,
34 actor_count: Arc<AtomicU32>,
35 handler: Option<Arc<dyn CommandHandler>>,
36}
37
38impl TestNode {
39 pub fn new(config: TestNodeConfig) -> Self {
40 let (event_tx, _) = broadcast::channel(256);
41 Self {
42 config,
43 start_time: Instant::now(),
44 fault_injector: Arc::new(FaultInjector::new()),
45 event_tx,
46 shutdown_flag: Arc::new(AtomicBool::new(false)),
47 shutdown_notify: Arc::new(Notify::new()),
48 actor_count: Arc::new(AtomicU32::new(0)),
49 handler: None,
50 }
51 }
52
53 pub fn with_handler(config: TestNodeConfig, handler: Arc<dyn CommandHandler>) -> Self {
55 let (event_tx, _) = broadcast::channel(256);
56 Self {
57 config,
58 start_time: Instant::now(),
59 fault_injector: Arc::new(FaultInjector::new()),
60 event_tx,
61 shutdown_flag: Arc::new(AtomicBool::new(false)),
62 shutdown_notify: Arc::new(Notify::new()),
63 actor_count: Arc::new(AtomicU32::new(0)),
64 handler: Some(handler),
65 }
66 }
67
68 pub async fn run(self) -> Result<(), Box<dyn std::error::Error>> {
69 let addr = format!("127.0.0.1:{}", self.config.control_port).parse()?;
70 let node_id = self.config.node_id.clone();
71 let shutdown_notify = self.shutdown_notify.clone();
72
73 let svc = TestNodeServiceServer::new(self);
74
75 tracing::info!(node_id = %node_id, addr = %addr, "Test node starting");
76
77 tonic::transport::Server::builder()
78 .add_service(svc)
79 .serve_with_shutdown(addr, async move {
80 shutdown_notify.notified().await;
81 })
82 .await?;
83
84 Ok(())
85 }
86
87 pub fn emit_event(&self, event_type: &str, detail: &str) {
88 let event = NodeEvent {
89 event_type: event_type.to_string(),
90 detail: detail.to_string(),
91 timestamp_ms: self.start_time.elapsed().as_millis() as u64,
92 };
93 let _ = self.event_tx.send(event);
94 }
95}
96
97#[tonic::async_trait]
98impl TestNodeService for TestNode {
99 async fn ping(&self, request: Request<PingRequest>) -> Result<Response<PingResponse>, Status> {
100 let req = request.into_inner();
101 Ok(Response::new(PingResponse {
102 echo: req.echo,
103 node_id: self.config.node_id.clone(),
104 uptime_ms: self.start_time.elapsed().as_millis() as u64,
105 }))
106 }
107
108 async fn get_node_info(
109 &self,
110 _request: Request<Empty>,
111 ) -> Result<Response<NodeInfoResponse>, Status> {
112 let actor_count = if let Some(ref handler) = self.handler {
113 handler.actor_count()
114 } else {
115 self.actor_count.load(Ordering::Relaxed)
116 };
117 Ok(Response::new(NodeInfoResponse {
118 node_id: self.config.node_id.clone(),
119 uptime_ms: self.start_time.elapsed().as_millis() as u64,
120 adapter: if let Some(ref handler) = self.handler {
121 handler.adapter_name().to_string()
122 } else {
123 "none".to_string()
124 },
125 actor_count,
126 }))
127 }
128
129 async fn shutdown(&self, request: Request<ShutdownRequest>) -> Result<Response<Empty>, Status> {
130 let req = request.into_inner();
131 self.shutdown_flag.store(true, Ordering::SeqCst);
132 self.emit_event(
133 "node_shutdown",
134 &serde_json::json!({
135 "graceful": req.graceful,
136 "timeout_ms": req.timeout_ms,
137 })
138 .to_string(),
139 );
140 self.shutdown_notify.notify_one();
142 Ok(Response::new(Empty {}))
143 }
144
145 async fn inject_fault(
146 &self,
147 request: Request<FaultRequest>,
148 ) -> Result<Response<Empty>, Status> {
149 let req = request.into_inner();
150 self.fault_injector
151 .add_fault(&req.fault_type, &req.target, req.duration_ms, req.count);
152 self.emit_event(
153 "fault_injected",
154 &serde_json::json!({
155 "fault_type": req.fault_type,
156 "target": req.target,
157 })
158 .to_string(),
159 );
160 Ok(Response::new(Empty {}))
161 }
162
163 async fn clear_faults(&self, _request: Request<Empty>) -> Result<Response<Empty>, Status> {
164 self.fault_injector.clear_all();
165 self.emit_event("faults_cleared", "{}");
166 Ok(Response::new(Empty {}))
167 }
168
169 type SubscribeEventsStream =
170 std::pin::Pin<Box<dyn tokio_stream::Stream<Item = Result<NodeEvent, Status>> + Send>>;
171
172 async fn subscribe_events(
173 &self,
174 request: Request<EventFilter>,
175 ) -> Result<Response<Self::SubscribeEventsStream>, Status> {
176 let filter = request.into_inner();
177 let event_types = filter.event_types;
178 let rx = self.event_tx.subscribe();
179 let stream = tokio_stream::wrappers::BroadcastStream::new(rx)
180 .filter_map(|result| result.ok())
181 .filter(move |event| event_types.is_empty() || event_types.contains(&event.event_type))
182 .map(Ok);
183 Ok(Response::new(Box::pin(stream)))
184 }
185
186 async fn custom_command(
187 &self,
188 request: Request<CustomRequest>,
189 ) -> Result<Response<CustomResponse>, Status> {
190 let req = request.into_inner();
191 Err(Status::unimplemented(format!(
192 "custom command '{}' not registered",
193 req.command_type
194 )))
195 }
196
197 async fn spawn_actor(
198 &self,
199 request: Request<SpawnActorRequest>,
200 ) -> Result<Response<SpawnActorResponse>, Status> {
201 let handler = self
202 .handler
203 .as_ref()
204 .ok_or_else(|| Status::unimplemented("no command handler registered"))?;
205 let req = request.into_inner();
206 match handler
207 .spawn_actor(&req.actor_type, &req.actor_name, &req.args)
208 .await
209 {
210 Ok(actor_id) => {
211 self.emit_event(
212 "actor_spawned",
213 &serde_json::json!({
214 "actor_type": req.actor_type,
215 "actor_name": req.actor_name,
216 "actor_id": actor_id,
217 })
218 .to_string(),
219 );
220 Ok(Response::new(SpawnActorResponse {
221 success: true,
222 actor_id,
223 error: String::new(),
224 }))
225 }
226 Err(e) => Ok(Response::new(SpawnActorResponse {
227 success: false,
228 actor_id: String::new(),
229 error: e,
230 })),
231 }
232 }
233
234 async fn tell_actor(
235 &self,
236 request: Request<TellActorRequest>,
237 ) -> Result<Response<TellActorResponse>, Status> {
238 let handler = self
239 .handler
240 .as_ref()
241 .ok_or_else(|| Status::unimplemented("no command handler registered"))?;
242 let req = request.into_inner();
243
244 if self
246 .fault_injector
247 .has_fault("partition", &req.actor_name)
248 {
249 return Ok(Response::new(TellActorResponse {
250 success: false,
251 error: "partition: message delivery blocked".to_string(),
252 }));
253 }
254
255 match handler
256 .tell_actor(&req.actor_name, &req.message_type, &req.payload)
257 .await
258 {
259 Ok(()) => Ok(Response::new(TellActorResponse {
260 success: true,
261 error: String::new(),
262 })),
263 Err(e) => Ok(Response::new(TellActorResponse {
264 success: false,
265 error: e,
266 })),
267 }
268 }
269
270 async fn ask_actor(
271 &self,
272 request: Request<AskActorRequest>,
273 ) -> Result<Response<AskActorResponse>, Status> {
274 let handler = self
275 .handler
276 .as_ref()
277 .ok_or_else(|| Status::unimplemented("no command handler registered"))?;
278 let req = request.into_inner();
279
280 if self
282 .fault_injector
283 .has_fault("partition", &req.actor_name)
284 {
285 return Ok(Response::new(AskActorResponse {
286 success: false,
287 payload: Vec::new(),
288 error: "partition: message delivery blocked".to_string(),
289 }));
290 }
291
292 match handler
293 .ask_actor(&req.actor_name, &req.message_type, &req.payload, req.timeout_ms)
294 .await
295 {
296 Ok(payload) => Ok(Response::new(AskActorResponse {
297 success: true,
298 payload,
299 error: String::new(),
300 })),
301 Err(e) => Ok(Response::new(AskActorResponse {
302 success: false,
303 payload: Vec::new(),
304 error: e,
305 })),
306 }
307 }
308
309 async fn stop_actor(
310 &self,
311 request: Request<StopActorRequest>,
312 ) -> Result<Response<StopActorResponse>, Status> {
313 let handler = self
314 .handler
315 .as_ref()
316 .ok_or_else(|| Status::unimplemented("no command handler registered"))?;
317 let req = request.into_inner();
318 match handler.stop_actor(&req.actor_name).await {
319 Ok(()) => {
320 self.emit_event(
321 "actor_stopped",
322 &serde_json::json!({ "actor_name": req.actor_name }).to_string(),
323 );
324 Ok(Response::new(StopActorResponse {
325 success: true,
326 error: String::new(),
327 }))
328 }
329 Err(e) => Ok(Response::new(StopActorResponse {
330 success: false,
331 error: e,
332 })),
333 }
334 }
335
336 async fn watch_actor(
337 &self,
338 request: Request<WatchActorRequest>,
339 ) -> Result<Response<WatchActorResponse>, Status> {
340 let handler = self
341 .handler
342 .as_ref()
343 .ok_or_else(|| Status::unimplemented("no command handler registered"))?;
344 let req = request.into_inner();
345 match handler
346 .watch_actor(&req.watcher_name, &req.target_name)
347 .await
348 {
349 Ok(()) => {
350 self.emit_event(
351 "actor_watch_registered",
352 &serde_json::json!({
353 "watcher": req.watcher_name,
354 "target": req.target_name,
355 })
356 .to_string(),
357 );
358 Ok(Response::new(WatchActorResponse {
359 success: true,
360 error: String::new(),
361 }))
362 }
363 Err(e) => Ok(Response::new(WatchActorResponse {
364 success: false,
365 error: e,
366 })),
367 }
368 }
369}