1#[cfg(test)]
6mod tests {
7 use super::*;
8 use crate::runtime::actor::{ActorSystem, Message};
9 use std::sync::{Arc, Mutex};
10 use std::time::Duration;
11 fn create_test_actor_system() -> Arc<Mutex<ActorSystem>> {
13 ActorSystem::new()
14 }
15 fn create_test_config() -> ObservatoryConfig {
16 ObservatoryConfig {
17 max_traces: 100,
18 trace_retention_seconds: 3600,
19 enable_deadlock_detection: true,
20 deadlock_check_interval_ms: 1000,
21 enable_metrics: true,
22 metrics_interval_ms: 5000,
23 max_snapshots: 50,
24 }
25 }
26 fn create_test_observatory() -> ActorObservatory {
27 let system = create_test_actor_system();
28 let config = create_test_config();
29 ActorObservatory::new(system, config)
30 }
31 fn create_test_message_trace() -> MessageTrace {
32 MessageTrace {
33 trace_id: 12345,
34 timestamp: current_timestamp(),
35 source: Some(ActorId(1)),
36 destination: ActorId(2),
37 message: Message::User("test_message".to_string(), vec![]),
38 status: MessageStatus::Queued,
39 processing_duration_us: None,
40 error: None,
41 stack_depth: 1,
42 correlation_id: Some("corr-123".to_string()),
43 }
44 }
45 fn create_test_actor_snapshot() -> ActorSnapshot {
46 ActorSnapshot {
47 actor_id: ActorId(1),
48 name: "test_actor".to_string(),
49 timestamp: current_timestamp(),
50 state: ActorState::Running,
51 mailbox_size: 5,
52 parent: Some(ActorId(0)),
53 children: vec![ActorId(2), ActorId(3)],
54 message_stats: MessageStats::default(),
55 memory_usage: Some(1024),
56 }
57 }
58 fn create_test_message_filter() -> MessageFilter {
59 MessageFilter {
60 name: "test_filter".to_string(),
61 actor_id: Some(ActorId(1)),
62 actor_name_pattern: Some("test_actor".to_string()),
63 message_type_pattern: Some(".*message.*".to_string()),
64 min_processing_time_us: None,
65 failed_only: false,
66 max_stack_depth: Some(10),
67 }
68 }
69 #[test]
71 fn test_observatory_config_default() {
72 let config = ObservatoryConfig::default();
73 assert_eq!(config.max_traces, 10000);
74 assert_eq!(config.trace_retention_seconds, 3600);
75 assert!(config.enable_deadlock_detection);
76 assert_eq!(config.deadlock_check_interval_ms, 1000);
77 assert!(config.enable_metrics);
78 assert_eq!(config.metrics_interval_ms, 5000);
79 assert_eq!(config.max_snapshots, 1000);
80 }
81 #[test]
82 fn test_observatory_config_clone() {
83 let config1 = create_test_config();
84 let config2 = config1.clone();
85 assert_eq!(config1.max_traces, config2.max_traces);
86 assert_eq!(
87 config1.enable_deadlock_detection,
88 config2.enable_deadlock_detection
89 );
90 }
91 #[test]
92 fn test_observatory_config_debug() {
93 let config = create_test_config();
94 let debug_str = format!("{config:?}");
95 assert!(debug_str.contains("ObservatoryConfig"));
96 assert!(debug_str.contains("max_traces"));
97 assert!(debug_str.contains("enable_deadlock_detection"));
98 }
99 #[test]
100 fn test_observatory_config_serialization() {
101 let config = create_test_config();
102 let json = serde_json::to_string(&config).unwrap();
103 let deserialized: ObservatoryConfig = serde_json::from_str(&json).unwrap();
104 assert_eq!(config.max_traces, deserialized.max_traces);
105 assert_eq!(config.enable_metrics, deserialized.enable_metrics);
106 }
107 #[test]
109 fn test_observatory_creation() {
110 let system = create_test_actor_system();
111 let config = create_test_config();
112 let observatory = ActorObservatory::new(system, config.clone());
113 assert_eq!(observatory.config.max_traces, config.max_traces);
114 assert!(observatory.filters.is_empty());
115 assert!(observatory.start_time.elapsed() < Duration::from_secs(1));
116 }
117 #[test]
118 fn test_observatory_with_default_config() {
119 let system = create_test_actor_system();
120 let config = ObservatoryConfig::default();
121 let observatory = ActorObservatory::new(system, config);
122 assert_eq!(observatory.config.max_traces, 10000);
123 assert!(observatory.config.enable_deadlock_detection);
124 }
125 #[test]
126 fn test_observatory_initialization_state() {
127 let observatory = create_test_observatory();
128 assert!(observatory.get_filters().is_empty());
130 let metrics = observatory.metrics.lock().expect("Failed to acquire lock");
131 assert_eq!(metrics.active_actors, 0);
132 assert_eq!(metrics.total_messages_processed, 0);
133 }
134 #[test]
136 fn test_add_message_filter() {
137 let mut observatory = create_test_observatory();
138 let filter = create_test_message_filter();
139 observatory.add_filter(filter.clone());
140 assert_eq!(observatory.get_filters().len(), 1);
141 assert_eq!(observatory.get_filters()[0].name, filter.name);
142 }
143 #[test]
144 fn test_add_multiple_filters() {
145 let mut observatory = create_test_observatory();
146 let filter1 = MessageFilter {
147 name: "filter1".to_string(),
148 actor_id: None,
149 actor_name_pattern: None,
150 message_type_pattern: Some("type1".to_string()),
151 min_processing_time_us: None,
152 failed_only: false,
153 max_stack_depth: None,
154 };
155 let filter2 = MessageFilter {
156 name: "filter2".to_string(),
157 actor_id: Some(ActorId(2)),
158 actor_name_pattern: Some("actor2".to_string()),
159 message_type_pattern: None,
160 min_processing_time_us: Some(1000),
161 failed_only: true,
162 max_stack_depth: Some(5),
163 };
164 observatory.add_filter(filter1);
165 observatory.add_filter(filter2);
166 assert_eq!(observatory.get_filters().len(), 2);
167 assert_eq!(observatory.get_filters()[0].name, "filter1");
168 assert_eq!(observatory.get_filters()[1].name, "filter2");
169 }
170 #[test]
171 fn test_remove_message_filter() {
172 let mut observatory = create_test_observatory();
173 let filter = create_test_message_filter();
174 observatory.add_filter(filter);
175 assert_eq!(observatory.get_filters().len(), 1);
176 let removed = observatory.remove_filter("test_filter");
177 assert!(removed);
178 assert!(observatory.get_filters().is_empty());
179 }
180 #[test]
181 fn test_remove_nonexistent_filter() {
182 let mut observatory = create_test_observatory();
183 let removed = observatory.remove_filter("nonexistent");
184 assert!(!removed);
185 }
186 #[test]
187 fn test_get_filters_empty() {
188 let observatory = create_test_observatory();
189 assert!(observatory.get_filters().is_empty());
190 }
191 #[test]
193 fn test_trace_message() {
194 let observatory = create_test_observatory();
195 let trace = create_test_message_trace();
196 let result = observatory.trace_message(trace.clone());
197 assert!(result.is_ok());
198 let traces = observatory.get_traces(None, None).unwrap();
199 assert_eq!(traces.len(), 1);
200 assert_eq!(traces[0].trace_id, trace.trace_id);
201 }
202 #[test]
203 fn test_trace_message_with_limit() {
204 let mut observatory = create_test_observatory();
205 observatory.config.max_traces = 2;
206 for i in 0..3 {
208 let mut trace = create_test_message_trace();
209 trace.trace_id = i as u64;
210 observatory.trace_message(trace).unwrap();
211 }
212 let traces = observatory.get_traces(None, None).unwrap();
213 assert_eq!(traces.len(), 2);
214 assert_eq!(traces[0].trace_id, 1); assert_eq!(traces[1].trace_id, 2);
216 }
217 #[test]
218 fn test_trace_message_age_retention() {
219 let mut observatory = create_test_observatory();
220 observatory.config.trace_retention_seconds = 1; let mut old_trace = create_test_message_trace();
223 old_trace.timestamp = current_timestamp() - 3600; observatory.trace_message(old_trace).unwrap();
225 let recent_trace = create_test_message_trace();
227 observatory.trace_message(recent_trace).unwrap();
228 let traces = observatory.get_traces(None, None).unwrap();
229 assert_eq!(traces.len(), 1);
231 }
232 #[test]
233 fn test_get_traces_with_limit() {
234 let observatory = create_test_observatory();
235 for i in 0..5 {
237 let mut trace = create_test_message_trace();
238 trace.trace_id = i as u64;
239 observatory.trace_message(trace).unwrap();
240 }
241 let traces = observatory.get_traces(Some(3), None).unwrap();
242 assert_eq!(traces.len(), 3);
243 }
244 #[test]
246 fn test_message_status_variants() {
247 let statuses = [
248 MessageStatus::Queued,
249 MessageStatus::Processing,
250 MessageStatus::Completed,
251 MessageStatus::Failed,
252 MessageStatus::Dropped,
253 ];
254 assert_eq!(statuses.len(), 5);
255 assert_eq!(statuses[0], MessageStatus::Queued);
256 assert_ne!(statuses[0], MessageStatus::Processing);
257 }
258 #[test]
259 fn test_message_status_serialization() {
260 let status = MessageStatus::Processing;
261 let json = serde_json::to_string(&status).unwrap();
262 let deserialized: MessageStatus = serde_json::from_str(&json).unwrap();
263 assert_eq!(status, deserialized);
264 }
265 #[test]
266 fn test_message_status_debug() {
267 let status = MessageStatus::Failed;
268 let debug_str = format!("{status:?}");
269 assert!(debug_str.contains("Failed"));
270 }
271 #[test]
273 fn test_actor_state_variants() {
274 let states = [
275 ActorState::Starting,
276 ActorState::Running,
277 ActorState::Processing("test_message".to_string()),
278 ActorState::Restarting,
279 ActorState::Stopping,
280 ActorState::Stopped,
281 ActorState::Failed("test_error".to_string()),
282 ];
283 assert_eq!(states.len(), 7);
284 assert_eq!(states[0], ActorState::Starting);
285 assert_ne!(states[0], ActorState::Running);
286 }
287 #[test]
288 fn test_actor_state_processing() {
289 let state = ActorState::Processing("handle_request".to_string());
290 if let ActorState::Processing(message_type) = state {
291 assert_eq!(message_type, "handle_request");
292 } else {
293 panic!("Expected Processing state");
294 }
295 }
296 #[test]
297 fn test_actor_state_failed() {
298 let state = ActorState::Failed("connection_timeout".to_string());
299 if let ActorState::Failed(reason) = state {
300 assert_eq!(reason, "connection_timeout");
301 } else {
302 panic!("Expected Failed state");
303 }
304 }
305 #[test]
306 fn test_actor_state_serialization() {
307 let state = ActorState::Processing("test".to_string());
308 let json = serde_json::to_string(&state).unwrap();
309 let deserialized: ActorState = serde_json::from_str(&json).unwrap();
310 assert_eq!(state, deserialized);
311 }
312 #[test]
314 fn test_message_stats_default() {
315 let stats = MessageStats::default();
316 assert_eq!(stats.total_processed, 0);
317 assert_eq!(stats.messages_per_second, 0.0);
318 assert_eq!(stats.avg_processing_time_us, 0.0);
319 assert_eq!(stats.max_processing_time_us, 0);
320 assert_eq!(stats.failed_messages, 0);
321 assert!(stats.last_processed.is_none());
322 }
323 #[test]
324 fn test_message_stats_clone() {
325 let mut stats1 = MessageStats::default();
326 stats1.total_processed = 100;
327 stats1.messages_per_second = 10.5;
328 let stats2 = stats1.clone();
329 assert_eq!(stats1.total_processed, stats2.total_processed);
330 assert_eq!(stats1.messages_per_second, stats2.messages_per_second);
331 }
332 #[test]
333 fn test_message_stats_serialization() {
334 let mut stats = MessageStats::default();
335 stats.total_processed = 500;
336 stats.avg_processing_time_us = 1500.0;
337 let json = serde_json::to_string(&stats).unwrap();
338 let deserialized: MessageStats = serde_json::from_str(&json).unwrap();
339 assert_eq!(stats.total_processed, deserialized.total_processed);
340 }
341 #[test]
343 fn test_system_metrics_default() {
344 let metrics = SystemMetrics::default();
345 assert_eq!(metrics.active_actors, 0);
346 assert_eq!(metrics.total_messages_processed, 0);
347 assert_eq!(metrics.system_messages_per_second, 0.0);
348 assert_eq!(metrics.total_memory_usage, 0);
349 assert_eq!(metrics.total_queued_messages, 0);
350 assert_eq!(metrics.avg_mailbox_size, 0.0);
351 assert_eq!(metrics.recent_restarts, 0);
352 assert!(metrics.last_updated > 0); }
354 #[test]
355 fn test_system_metrics_update() {
356 let mut metrics = SystemMetrics::default();
357 metrics.active_actors = 10;
358 metrics.total_messages_processed = 1000;
359 metrics.system_messages_per_second = 50.5;
360 metrics.last_updated = current_timestamp();
361 assert_eq!(metrics.active_actors, 10);
362 assert_eq!(metrics.total_messages_processed, 1000);
363 assert!(metrics.last_updated > 0);
364 }
365 #[test]
366 fn test_system_metrics_serialization() {
367 let mut metrics = SystemMetrics::default();
368 metrics.active_actors = 5;
369 metrics.total_memory_usage = 1_024_000;
370 let json = serde_json::to_string(&metrics).unwrap();
371 let deserialized: SystemMetrics = serde_json::from_str(&json).unwrap();
372 assert_eq!(metrics.active_actors, deserialized.active_actors);
373 assert_eq!(metrics.total_memory_usage, deserialized.total_memory_usage);
374 }
375 #[test]
377 fn test_actor_snapshot_creation() {
378 let snapshot = create_test_actor_snapshot();
379 assert_eq!(snapshot.actor_id, ActorId(1));
380 assert_eq!(snapshot.name, "test_actor");
381 assert_eq!(snapshot.state, ActorState::Running);
382 assert_eq!(snapshot.mailbox_size, 5);
383 assert_eq!(snapshot.children.len(), 2);
384 assert!(snapshot.memory_usage.is_some());
385 }
386 #[test]
387 fn test_actor_snapshot_with_no_parent() {
388 let mut snapshot = create_test_actor_snapshot();
389 snapshot.parent = None;
390 assert!(snapshot.parent.is_none());
391 assert_eq!(snapshot.children.len(), 2);
392 }
393 #[test]
394 fn test_actor_snapshot_serialization() {
395 let snapshot = create_test_actor_snapshot();
396 let json = serde_json::to_string(&snapshot).unwrap();
397 let deserialized: ActorSnapshot = serde_json::from_str(&json).unwrap();
398 assert_eq!(snapshot.actor_id, deserialized.actor_id);
399 assert_eq!(snapshot.name, deserialized.name);
400 assert_eq!(snapshot.state, deserialized.state);
401 }
402 #[test]
404 fn test_message_filter_creation() {
405 let filter = create_test_message_filter();
406 assert_eq!(filter.name, "test_filter");
407 assert!(filter.actor_id.is_some());
408 assert!(filter.actor_name_pattern.is_some());
409 assert!(filter.message_type_pattern.is_some());
410 assert!(!filter.failed_only);
411 }
412 #[test]
413 fn test_message_filter_with_duration_limits() {
414 let filter = MessageFilter {
415 name: "duration_filter".to_string(),
416 actor_id: None,
417 actor_name_pattern: None,
418 message_type_pattern: None,
419 min_processing_time_us: Some(1000),
420 failed_only: false,
421 max_stack_depth: None,
422 };
423 assert_eq!(filter.min_processing_time_us, Some(1000));
424 }
425 #[test]
426 fn test_message_filter_errors_only() {
427 let filter = MessageFilter {
428 name: "error_filter".to_string(),
429 actor_id: None,
430 actor_name_pattern: None,
431 message_type_pattern: None,
432 min_processing_time_us: None,
433 failed_only: true,
434 max_stack_depth: None,
435 };
436 assert!(filter.failed_only);
437 }
438 #[test]
440 fn test_message_trace_creation() {
441 let trace = create_test_message_trace();
442 assert_eq!(trace.trace_id, 12345);
443 assert!(trace.source.is_some());
444 assert_eq!(trace.destination, ActorId(2));
445 assert_eq!(trace.status, MessageStatus::Queued);
446 assert_eq!(trace.stack_depth, 1);
447 assert!(trace.correlation_id.is_some());
448 }
449 #[test]
450 fn test_message_trace_with_processing() {
451 let mut trace = create_test_message_trace();
452 trace.status = MessageStatus::Processing;
453 trace.processing_duration_us = Some(1500);
454 assert_eq!(trace.status, MessageStatus::Processing);
455 assert_eq!(trace.processing_duration_us, Some(1500));
456 }
457 #[test]
458 fn test_message_trace_with_error() {
459 let mut trace = create_test_message_trace();
460 trace.status = MessageStatus::Failed;
461 trace.error = Some("timeout_error".to_string());
462 assert_eq!(trace.status, MessageStatus::Failed);
463 assert_eq!(trace.error, Some("timeout_error".to_string()));
464 }
465 #[test]
466 fn test_message_trace_serialization() {
467 let trace = create_test_message_trace();
468 let json = serde_json::to_string(&trace).unwrap();
469 let deserialized: MessageTrace = serde_json::from_str(&json).unwrap();
470 assert_eq!(trace.trace_id, deserialized.trace_id);
471 assert_eq!(trace.status, deserialized.status);
472 }
473 #[test]
475 fn test_current_timestamp() {
476 let ts1 = current_timestamp();
477 std::thread::sleep(Duration::from_millis(10)); let ts2 = current_timestamp();
479 assert!(ts2 >= ts1); }
481 #[test]
483 fn test_observatory_full_workflow() {
484 let mut observatory = create_test_observatory();
485 let filter = MessageFilter {
487 name: "permissive_filter".to_string(),
488 actor_id: None, actor_name_pattern: None,
490 message_type_pattern: None, min_processing_time_us: None,
492 failed_only: false,
493 max_stack_depth: None,
494 };
495 observatory.add_filter(filter);
496 assert_eq!(observatory.get_filters().len(), 1);
497 let trace = create_test_message_trace();
499 observatory.trace_message(trace.clone()).unwrap();
500 let traces = observatory.get_traces(None, None).unwrap();
502 assert_eq!(traces.len(), 1);
503 assert_eq!(traces[0].trace_id, trace.trace_id);
504 let removed = observatory.remove_filter("permissive_filter");
506 assert!(removed);
507 assert!(observatory.get_filters().is_empty());
508 }
509 #[test]
510 fn test_observatory_multiple_traces() {
511 let observatory = create_test_observatory();
512 let statuses = [
514 MessageStatus::Queued,
515 MessageStatus::Processing,
516 MessageStatus::Completed,
517 MessageStatus::Failed,
518 ];
519 for (i, status) in statuses.iter().enumerate() {
520 let mut trace = create_test_message_trace();
521 trace.trace_id = i as u64;
522 trace.status = status.clone();
523 observatory.trace_message(trace).unwrap();
524 }
525 let traces = observatory.get_traces(None, None).unwrap();
526 assert_eq!(traces.len(), 4);
527 let mut status_counts = std::collections::HashMap::new();
529 for trace in &traces {
530 *status_counts.entry(&trace.status).or_insert(0) += 1;
531 }
532 assert_eq!(status_counts.len(), 4);
533 }
534 #[test]
535 fn test_observatory_config_variations() {
536 let configs = vec![
537 ObservatoryConfig {
538 max_traces: 50,
539 enable_deadlock_detection: false,
540 enable_metrics: false,
541 ..ObservatoryConfig::default()
542 },
543 ObservatoryConfig {
544 trace_retention_seconds: 7200,
545 deadlock_check_interval_ms: 500,
546 metrics_interval_ms: 2000,
547 ..ObservatoryConfig::default()
548 },
549 ];
550 for config in configs {
551 let system = create_test_actor_system();
552 let observatory = ActorObservatory::new(system, config.clone());
553 assert_eq!(observatory.config.max_traces, config.max_traces);
554 assert_eq!(
555 observatory.config.enable_deadlock_detection,
556 config.enable_deadlock_detection
557 );
558 }
559 }
560 #[test]
561 fn test_observatory_concurrent_access() {
562 use std::thread;
563 let observatory = Arc::new(create_test_observatory());
564 let mut handles = vec![];
565 for i in 0..5 {
567 let obs = observatory.clone();
568 let handle = thread::spawn(move || {
569 let mut trace = create_test_message_trace();
570 trace.trace_id = i;
571 obs.trace_message(trace).unwrap();
572 });
573 handles.push(handle);
574 }
575 for handle in handles {
577 handle.join().expect("Thread failed to join");
578 }
579 let traces = observatory.get_traces(None, None).unwrap();
581 assert_eq!(traces.len(), 5);
582 }
583
584 #[test]
587 fn test_sprint_45_01_complex_deadlock_detection() {
588 let observatory = create_test_observatory();
589
590 let mut detector = observatory.deadlock_detector.lock().unwrap();
592
593 let request_ab = BlockedRequest {
594 requester: ActorId(1),
595 target: ActorId(2),
596 timestamp: Instant::now(),
597 timeout: Duration::from_secs(5),
598 correlation_id: None,
599 };
600
601 let request_bc = BlockedRequest {
602 requester: ActorId(2),
603 target: ActorId(3),
604 timestamp: Instant::now(),
605 timeout: Duration::from_secs(5),
606 correlation_id: None,
607 };
608
609 let request_ca = BlockedRequest {
610 requester: ActorId(3),
611 target: ActorId(1),
612 timestamp: Instant::now(),
613 timeout: Duration::from_secs(5),
614 correlation_id: None,
615 };
616
617 detector.add_blocked_request(request_ab);
618 detector.add_blocked_request(request_bc);
619 detector.add_blocked_request(request_ca);
620
621 let cycles = detector.detect_cycles().unwrap();
622 assert!(!cycles.is_empty(), "Should detect circular dependency");
623 assert_eq!(cycles[0].actors.len(), 3);
624 }
625
626 #[test]
627 fn test_sprint_45_02_message_filter_pattern_matching() {
628 let mut observatory = create_test_observatory();
629
630 let complex_filter = MessageFilter {
632 name: "complex_filter".to_string(),
633 actor_id: Some(ActorId(5)),
634 actor_name_pattern: Some(r"worker_\d+".to_string()),
635 message_type_pattern: Some(r"ProcessJob\{.*\}".to_string()),
636 min_processing_time_us: Some(1000),
637 failed_only: false,
638 max_stack_depth: Some(3),
639 };
640
641 observatory.add_filter(complex_filter);
642
643 let matching_trace = MessageTrace {
645 trace_id: 100,
646 timestamp: current_timestamp(),
647 source: Some(ActorId(1)),
648 destination: ActorId(5),
649 message: Message::User("ProcessJob{id: 42}".to_string(), vec![]),
650 status: MessageStatus::Processing,
651 processing_duration_us: Some(1500),
652 error: None,
653 stack_depth: 2,
654 correlation_id: Some("job-42".to_string()),
655 };
656
657 assert!(observatory.message_matches_filters(&matching_trace));
659
660 let non_matching_trace = MessageTrace {
662 trace_id: 101,
663 timestamp: current_timestamp(),
664 source: Some(ActorId(1)),
665 destination: ActorId(6), message: matching_trace.message,
667 status: MessageStatus::Processing,
668 processing_duration_us: Some(1500),
669 error: None,
670 stack_depth: 2,
671 correlation_id: Some("job-43".to_string()),
672 };
673
674 assert!(!observatory.message_matches_filters(&non_matching_trace));
675 }
676
677 #[test]
678 fn test_sprint_45_03_metrics_calculation_accuracy() {
679 let observatory = create_test_observatory();
680
681 let mut snapshots = observatory.actor_snapshots.lock().unwrap();
683
684 for i in 1..=5 {
685 let snapshot = ActorSnapshot {
686 actor_id: ActorId(i),
687 name: format!("actor_{i}"),
688 timestamp: current_timestamp(),
689 state: ActorState::Running,
690 mailbox_size: (i * 10) as usize, parent: if i > 1 { Some(ActorId(i - 1)) } else { None },
692 children: vec![],
693 message_stats: MessageStats {
694 total_processed: i * 100, messages_per_second: i as f64 * 10.0, avg_processing_time_us: i as f64 * 1000.0, max_processing_time_us: i * 2000, failed_messages: i * 2, last_processed: Some(current_timestamp()),
700 },
701 memory_usage: Some((i * 1024) as usize), };
703 snapshots.insert(ActorId(i), snapshot);
704 }
705 drop(snapshots);
706
707 let result = observatory.update_metrics();
709 assert!(result.is_ok());
710
711 let metrics = observatory.get_metrics().unwrap();
713 assert_eq!(metrics.active_actors, 5);
714 assert_eq!(metrics.total_messages_processed, 1500); assert_eq!(metrics.total_queued_messages, 150); assert!((metrics.avg_mailbox_size - 30.0).abs() < 0.1); assert_eq!(metrics.total_memory_usage, 15360); }
719
720 #[test]
721 fn test_sprint_45_04_trace_correlation_tracking() {
722 let observatory = create_test_observatory();
723
724 let correlation_id = "operation-xyz-123".to_string();
725
726 for i in 1..=3 {
728 let trace = MessageTrace {
729 trace_id: i,
730 timestamp: current_timestamp(),
731 source: Some(ActorId(i)),
732 destination: ActorId(i + 1),
733 message: Message::User(format!("step_{i}"), vec![]),
734 status: MessageStatus::Completed,
735 processing_duration_us: Some(i * 100),
736 error: None,
737 stack_depth: i as usize,
738 correlation_id: Some(correlation_id.clone()),
739 };
740 observatory.trace_message(trace).unwrap();
741 }
742
743 let all_traces = observatory.get_traces(None, None).unwrap();
745 let correlated_traces: Vec<_> = all_traces
746 .iter()
747 .filter(|t| t.correlation_id.as_ref() == Some(&correlation_id))
748 .collect();
749
750 assert_eq!(correlated_traces.len(), 3);
751
752 for (i, trace) in correlated_traces.iter().enumerate() {
754 assert_eq!(trace.trace_id, (i + 1) as u64);
755 }
756 }
757
758 #[test]
759 fn test_sprint_45_05_actor_hierarchy_analysis() {
760 let observatory = create_test_observatory();
761
762 let mut snapshots = observatory.actor_snapshots.lock().unwrap();
764
765 snapshots.insert(
767 ActorId(1),
768 ActorSnapshot {
769 actor_id: ActorId(1),
770 name: "root".to_string(),
771 timestamp: current_timestamp(),
772 state: ActorState::Running,
773 mailbox_size: 0,
774 parent: None,
775 children: vec![ActorId(2), ActorId(3)],
776 message_stats: MessageStats::default(),
777 memory_usage: Some(1024),
778 },
779 );
780
781 for i in 2..=3 {
783 snapshots.insert(
784 ActorId(i),
785 ActorSnapshot {
786 actor_id: ActorId(i),
787 name: format!("child_{i}"),
788 timestamp: current_timestamp(),
789 state: ActorState::Running,
790 mailbox_size: 5,
791 parent: Some(ActorId(1)),
792 children: vec![ActorId(i + 2)], message_stats: MessageStats::default(),
794 memory_usage: Some(512),
795 },
796 );
797 }
798
799 for i in 4..=5 {
801 snapshots.insert(
802 ActorId(i),
803 ActorSnapshot {
804 actor_id: ActorId(i),
805 name: format!("grandchild_{i}"),
806 timestamp: current_timestamp(),
807 state: ActorState::Running,
808 mailbox_size: 2,
809 parent: Some(ActorId(i - 2)),
810 children: vec![],
811 message_stats: MessageStats::default(),
812 memory_usage: Some(256),
813 },
814 );
815 }
816 drop(snapshots);
817
818 let snapshots = observatory.get_actor_snapshots().unwrap();
820 let root = snapshots.get(&ActorId(1)).unwrap();
821 assert_eq!(root.children.len(), 2);
822 assert!(root.parent.is_none());
823
824 let child = snapshots.get(&ActorId(2)).unwrap();
825 assert_eq!(child.parent, Some(ActorId(1)));
826 assert_eq!(child.children.len(), 1);
827 }
828
829 #[test]
830 fn test_sprint_45_06_performance_degradation_detection() {
831 let observatory = create_test_observatory();
832
833 let base_time = current_timestamp();
835
836 for i in 1..=10 {
837 let trace = MessageTrace {
838 trace_id: i,
839 timestamp: base_time + (i * 1000), source: Some(ActorId(1)),
841 destination: ActorId(2),
842 message: Message::User("process".to_string(), vec![]),
843 status: MessageStatus::Completed,
844 processing_duration_us: Some(i * i * 100), error: None,
846 stack_depth: 1,
847 correlation_id: None,
848 };
849 observatory.trace_message(trace).unwrap();
850 }
851
852 let traces = observatory.get_traces(None, None).unwrap();
853 assert_eq!(traces.len(), 10);
854
855 let processing_times: Vec<_> = traces
857 .iter()
858 .filter_map(|t| t.processing_duration_us)
859 .collect();
860
861 for i in 1..processing_times.len() {
863 assert!(processing_times[i] > processing_times[i - 1]);
864 }
865
866 assert!(processing_times.last().unwrap() > &(processing_times[0] * 10));
868 }
869
870 #[test]
871 fn test_sprint_45_07_memory_leak_detection() {
872 let observatory = create_test_observatory();
873
874 let mut snapshots = observatory.actor_snapshots.lock().unwrap();
876
877 let actor_id = ActorId(1);
878 let base_time = current_timestamp();
879
880 for i in 1..=5 {
882 let snapshot = ActorSnapshot {
883 actor_id,
884 name: "leaky_actor".to_string(),
885 timestamp: base_time + (i * 60000), state: ActorState::Running,
887 mailbox_size: 10,
888 parent: None,
889 children: vec![],
890 message_stats: MessageStats::default(),
891 memory_usage: Some((1024 * i * i) as usize), };
893 snapshots.insert(actor_id, snapshot); }
895 drop(snapshots);
896
897 let final_snapshot = observatory.get_actor_snapshot(actor_id).unwrap().unwrap();
899 assert_eq!(final_snapshot.memory_usage, Some(1024 * 25)); assert!(final_snapshot.memory_usage.unwrap() > 10240); }
904
905 #[test]
906 fn test_sprint_45_08_error_propagation_tracking() {
907 let observatory = create_test_observatory();
908
909 let error_msg = "Database connection failed".to_string();
911
912 for i in 1..=4 {
913 let trace = MessageTrace {
914 trace_id: i,
915 timestamp: current_timestamp() + (i * 100),
916 source: Some(ActorId(i)),
917 destination: ActorId(i + 1),
918 message: Message::User("database_query".to_string(), vec![]),
919 status: MessageStatus::Failed,
920 processing_duration_us: Some(50),
921 error: Some(error_msg.clone()),
922 stack_depth: i as usize,
923 correlation_id: Some("db-op-456".to_string()),
924 };
925 observatory.trace_message(trace).unwrap();
926 }
927
928 let traces = observatory.get_traces(None, None).unwrap();
930 let failed_traces: Vec<_> = traces
931 .iter()
932 .filter(|t| t.status == MessageStatus::Failed)
933 .collect();
934
935 assert_eq!(failed_traces.len(), 4);
936
937 for trace in failed_traces {
939 assert_eq!(trace.error.as_ref().unwrap(), &error_msg);
940 assert_eq!(trace.correlation_id.as_ref().unwrap(), "db-op-456");
941 }
942 }
943
944 #[test]
945 fn test_sprint_45_09_concurrent_trace_access() {
946 let observatory = Arc::new(create_test_observatory());
947 let mut handles = vec![];
948
949 for thread_id in 0..3 {
951 let obs = Arc::clone(&observatory);
952 let handle = std::thread::spawn(move || {
953 for i in 0..10 {
954 let trace = MessageTrace {
955 trace_id: (thread_id * 100 + i) as u64,
956 timestamp: current_timestamp(),
957 source: Some(ActorId(thread_id as u64)),
958 destination: ActorId((thread_id + 1) as u64),
959 message: Message::User(format!("thread_{thread_id}_msg_{i}"), vec![]),
960 status: MessageStatus::Completed,
961 processing_duration_us: Some(100 + i as u64),
962 error: None,
963 stack_depth: 1,
964 correlation_id: Some(format!("thread_{thread_id}")),
965 };
966 obs.trace_message(trace).unwrap();
967
968 let _traces = obs.get_traces(None, Some("5")).unwrap();
970 }
971 });
972 handles.push(handle);
973 }
974
975 for handle in handles {
977 handle.join().unwrap();
978 }
979
980 let final_traces = observatory.get_traces(None, None).unwrap();
982 assert_eq!(final_traces.len(), 30); }
984
985 #[test]
986 fn test_sprint_45_10_snapshot_time_series_analysis() {
987 let observatory = create_test_observatory();
988 let actor_id = ActorId(1);
989
990 let mut snapshots = observatory.actor_snapshots.lock().unwrap();
992 let base_time = current_timestamp();
993
994 for i in 1..=5 {
995 let snapshot = ActorSnapshot {
996 actor_id,
997 name: "monitored_actor".to_string(),
998 timestamp: base_time + (i * 10000), state: if i == 5 {
1000 ActorState::Failed("Simulated crash".to_string())
1001 } else {
1002 ActorState::Running
1003 },
1004 mailbox_size: if i < 4 { (i * 2) as usize } else { 0 }, parent: None,
1006 children: vec![],
1007 message_stats: MessageStats {
1008 total_processed: i * 50,
1009 messages_per_second: i as f64 * 5.0,
1010 avg_processing_time_us: 100.0 * i as f64,
1011 max_processing_time_us: i * 200,
1012 failed_messages: if i >= 4 { i } else { 0 },
1013 last_processed: Some(base_time + (i * 10000)),
1014 },
1015 memory_usage: Some((1024 * i) as usize),
1016 };
1017 snapshots.insert(actor_id, snapshot);
1018 }
1019 drop(snapshots);
1020
1021 let final_snapshot = observatory.get_actor_snapshot(actor_id).unwrap().unwrap();
1023 assert!(matches!(final_snapshot.state, ActorState::Failed(_)));
1024 assert_eq!(final_snapshot.mailbox_size, 0);
1025 assert_eq!(final_snapshot.message_stats.failed_messages, 5);
1026 }
1027
1028 #[test]
1029 fn test_sprint_45_11_filter_performance_optimization() {
1030 let mut observatory = create_test_observatory();
1031
1032 for i in 1..=100 {
1034 let filter = MessageFilter {
1035 name: format!("filter_{i}"),
1036 actor_id: Some(ActorId(i)),
1037 actor_name_pattern: None,
1038 message_type_pattern: None,
1039 min_processing_time_us: None,
1040 failed_only: false,
1041 max_stack_depth: None,
1042 };
1043 observatory.add_filter(filter);
1044 }
1045
1046 let test_trace = create_test_message_trace();
1048 let start_time = std::time::Instant::now();
1049
1050 for _ in 0..1000 {
1052 let _matches = observatory.message_matches_filters(&test_trace);
1053 }
1054
1055 let elapsed = start_time.elapsed();
1056 assert!(
1057 elapsed < Duration::from_millis(100),
1058 "Filter matching should be fast even with many filters"
1059 );
1060
1061 assert_eq!(observatory.get_filters().len(), 100);
1063 }
1064
/// Builds a two-actor wait cycle (1 -> 2 -> 1), confirms it is reported, then
/// removes one edge and confirms the cycle is no longer reported.
#[test]
fn test_sprint_45_12_deadlock_resolution_tracking() {
    let observatory = create_test_observatory();
    let mut detector = observatory.deadlock_detector.lock().unwrap();

    // A request from `from` that is blocked waiting on `to`.
    let waiting_on = |from: u64, to: u64| BlockedRequest {
        requester: ActorId(from),
        target: ActorId(to),
        timestamp: Instant::now(),
        timeout: Duration::from_millis(5000),
        correlation_id: None,
    };

    detector.add_blocked_request(waiting_on(1, 2));
    detector.add_blocked_request(waiting_on(2, 1));

    let before = detector.detect_cycles().unwrap();
    assert!(!before.is_empty());

    // Breaking the 1 -> 2 dependency should dissolve the cycle.
    detector.remove_blocked_request(ActorId(1), ActorId(2));

    let after = detector.detect_cycles().unwrap();
    assert!(after.is_empty() || after.len() < before.len());
}
1101
/// Uptime must start near zero and grow by at least the time slept.
#[test]
fn test_sprint_45_13_observatory_uptime_tracking() {
    let pause = Duration::from_millis(10);
    let observatory = create_test_observatory();

    let first_reading = observatory.uptime();
    assert!(first_reading < Duration::from_secs(1));

    std::thread::sleep(pause);

    let second_reading = observatory.uptime();
    assert!(second_reading > first_reading);
    assert!(second_reading >= pause);
}
1118
/// With `max_traces` set to 3, recording five traces must keep only the
/// three most recently recorded ones.
#[test]
fn test_sprint_45_14_message_trace_limits() {
    let capped = ObservatoryConfig {
        max_traces: 3,
        ..create_test_config()
    };
    let observatory = ActorObservatory::new(create_test_actor_system(), capped);

    for seq in 1..=5u64 {
        observatory
            .trace_message(MessageTrace {
                trace_id: seq,
                timestamp: current_timestamp() + (seq * 1000),
                source: Some(ActorId(1)),
                destination: ActorId(2),
                message: Message::User(format!("msg_{seq}"), vec![]),
                status: MessageStatus::Completed,
                processing_duration_us: Some(100),
                error: None,
                stack_depth: 1,
                correlation_id: None,
            })
            .unwrap();
    }

    let retained = observatory.get_traces(None, None).unwrap();
    assert_eq!(retained.len(), 3);

    // The oldest two (ids 1 and 2) must have been evicted.
    let retained_ids: Vec<_> = retained.iter().map(|t| t.trace_id).collect();
    for newest in [3, 4, 5] {
        assert!(retained_ids.contains(&newest));
    }
}
1154
/// Metrics refreshed after a snapshot is registered must report that actor
/// and carry a `last_updated` stamp no older than the previous refresh.
#[test]
fn test_sprint_45_15_metrics_historical_tracking() {
    let observatory = create_test_observatory();

    observatory.update_metrics().unwrap();
    let baseline_stamp = observatory.get_metrics().unwrap().last_updated;

    // The guard is a temporary, so the lock is released at the end of this
    // statement, before metrics are refreshed again.
    observatory
        .actor_snapshots
        .lock()
        .unwrap()
        .insert(ActorId(1), create_test_actor_snapshot());

    std::thread::sleep(Duration::from_millis(100));
    observatory.update_metrics().unwrap();
    let refreshed = observatory.get_metrics().unwrap();

    assert!(refreshed.last_updated >= baseline_stamp);
    assert_eq!(refreshed.active_actors, 1);
}
1178
/// With a latency filter and a failure filter installed side by side, a
/// trace satisfying either one must be accepted.
#[test]
fn test_sprint_45_16_complex_filter_combinations() {
    let mut observatory = create_test_observatory();
    observatory.add_filter(MessageFilter::slow_messages("high_latency_filter", 1000));
    observatory.add_filter(MessageFilter::failed_messages_only("error_filter"));

    // Builds a depth-1 user-message trace; only the fields the two filters
    // look at differ between the cases below.
    let user_trace = |trace_id: u64,
                      (from, to): (u64, u64),
                      label: &str,
                      status: MessageStatus,
                      duration_us: u64,
                      error: Option<&str>| MessageTrace {
        trace_id,
        timestamp: current_timestamp(),
        source: Some(ActorId(from)),
        destination: ActorId(to),
        message: Message::User(label.to_string(), vec![]),
        status,
        processing_duration_us: Some(duration_us),
        error: error.map(str::to_string),
        stack_depth: 1,
        correlation_id: None,
    };

    // Slow but successful: accepted by the latency filter.
    let high_latency_trace = user_trace(
        1,
        (1, 2),
        "slow_operation",
        MessageStatus::Completed,
        2000,
        None,
    );

    // Fast but failed: accepted by the failure filter.
    let error_trace = user_trace(
        2,
        (2, 3),
        "failing_operation",
        MessageStatus::Failed,
        100,
        Some("Operation failed"),
    );

    assert!(observatory.message_matches_filters(&high_latency_trace));
    assert!(observatory.message_matches_filters(&error_trace));
}
1239
/// Overwrites one actor's snapshot through a sequence of states and checks
/// that only the last one (a failure with an empty mailbox) is visible.
#[test]
fn test_sprint_45_17_actor_state_transitions() {
    let observatory = create_test_observatory();
    let subject = ActorId(1);

    let lifecycle = [
        ActorState::Starting,
        ActorState::Running,
        ActorState::Stopping,
        ActorState::Running,
        ActorState::Failed("Simulated failure".to_string()),
    ];

    {
        // Scope the guard so the lock is released before the read-back.
        let mut store = observatory.actor_snapshots.lock().unwrap();
        for (ordinal, phase) in (1u64..).zip(lifecycle.iter()) {
            // A failed actor is recorded with an empty mailbox.
            let queued = if let ActorState::Failed(_) = phase { 0 } else { 5 };
            store.insert(
                subject,
                ActorSnapshot {
                    actor_id: subject,
                    name: "transitioning_actor".to_string(),
                    timestamp: current_timestamp() + ordinal * 1000,
                    state: phase.clone(),
                    mailbox_size: queued,
                    parent: None,
                    children: vec![],
                    message_stats: MessageStats::default(),
                    memory_usage: Some(1024),
                },
            );
        }
    }

    let last_seen = observatory.get_actor_snapshot(subject).unwrap().unwrap();
    assert!(matches!(last_seen.state, ActorState::Failed(_)));
    assert_eq!(last_seen.mailbox_size, 0);
}
1280
/// Records traces at stack depths 1..=10, checks they are all retained, then
/// verifies a `max_stack_depth` filter accepts shallow and rejects deep traces.
#[test]
fn test_sprint_45_18_trace_stack_depth_analysis() {
    let recorder = create_test_observatory();

    // One completed call per depth level, all sharing a correlation id.
    (1..=10u64)
        .map(|level| MessageTrace {
            trace_id: level,
            timestamp: current_timestamp(),
            source: Some(ActorId(level)),
            destination: ActorId(level + 1),
            message: Message::User(format!("depth_{level}_call"), vec![]),
            status: MessageStatus::Completed,
            processing_duration_us: Some(level * 10),
            error: None,
            stack_depth: level as usize,
            correlation_id: Some("deep_call_chain".to_string()),
        })
        .for_each(|call| recorder.trace_message(call).unwrap());

    let recorded = recorder.get_traces(None, None).unwrap();
    assert_eq!(recorded.len(), 10);

    let deepest = recorded.iter().map(|t| t.stack_depth).max().unwrap();
    assert_eq!(deepest, 10);

    // A second observatory that only accepts traces at depth <= 3.
    let mut depth_limited = create_test_observatory();
    depth_limited.add_filter(MessageFilter {
        max_stack_depth: Some(3),
        ..MessageFilter::new("shallow_calls")
    });

    // Probe traces differ only in id, label and stack depth.
    let probe = |trace_id: u64, label: &str, stack_depth: usize| MessageTrace {
        trace_id,
        timestamp: current_timestamp(),
        source: Some(ActorId(1)),
        destination: ActorId(2),
        message: Message::User(label.to_string(), vec![]),
        status: MessageStatus::Completed,
        processing_duration_us: Some(50),
        error: None,
        stack_depth,
        correlation_id: None,
    };

    let shallow_trace = probe(999, "shallow", 2);
    assert!(depth_limited.message_matches_filters(&shallow_trace));

    let deep_trace = probe(1000, "deep", 5);
    assert!(!depth_limited.message_matches_filters(&deep_trace));
}
1355
/// Constructs observatories from two extreme configurations and checks the
/// configuration is stored as given and the instance is freshly started.
#[test]
fn test_sprint_45_19_observatory_config_validation() {
    // Everything switched off / sized to zero.
    let smallest = ObservatoryConfig {
        max_traces: 0,
        trace_retention_seconds: 1,
        enable_deadlock_detection: false,
        deadlock_check_interval_ms: 100,
        enable_metrics: false,
        metrics_interval_ms: 100,
        max_snapshots: 0,
    };
    // Everything switched on with very large capacities.
    let largest = ObservatoryConfig {
        max_traces: 1_000_000,
        trace_retention_seconds: 86_400,
        enable_deadlock_detection: true,
        deadlock_check_interval_ms: 50,
        enable_metrics: true,
        metrics_interval_ms: 10,
        max_snapshots: 1_000_000,
    };

    for expected in [smallest, largest] {
        let observatory = ActorObservatory::new(create_test_actor_system(), expected.clone());
        let applied = &observatory.config;

        assert_eq!(applied.max_traces, expected.max_traces);
        assert_eq!(
            applied.enable_deadlock_detection,
            expected.enable_deadlock_detection
        );
        assert_eq!(applied.enable_metrics, expected.enable_metrics);

        assert!(observatory.uptime() < Duration::from_secs(1));
    }
}
1397
/// End-to-end scenario: two filters, five actor snapshots (one root with four
/// children) and three traces, followed by checks on filters, snapshots,
/// aggregated metrics, retained traces and per-filter matching.
#[test]
fn test_sprint_45_20_comprehensive_observatory_integration() {
    let mut observatory = create_test_observatory();

    // Only failed or slow (>= 1000 us) messages are to be recorded.
    observatory.add_filter(MessageFilter::failed_messages_only("errors"));
    observatory.add_filter(MessageFilter::slow_messages("slow_operations", 1000));

    {
        // Scope the guard so the lock is released before metrics are updated.
        let mut registry = observatory.actor_snapshots.lock().unwrap();
        for n in 1..=5u64 {
            let is_root = n == 1;
            let stats = MessageStats {
                total_processed: n * 10,
                failed_messages: u64::from(n % 2 == 0),
                avg_processing_time_us: 500.0,
                last_processed: Some(current_timestamp()),
                messages_per_second: 10.0,
                max_processing_time_us: 1000,
            };
            registry.insert(
                ActorId(n),
                ActorSnapshot {
                    actor_id: ActorId(n),
                    name: format!("actor_{n}"),
                    timestamp: current_timestamp(),
                    state: ActorState::Running,
                    mailbox_size: (n * 2) as usize,
                    // Actor 1 is the root; actors 2..=5 hang off it.
                    parent: if is_root { None } else { Some(ActorId(1)) },
                    children: if is_root {
                        (2..=5).map(ActorId).collect()
                    } else {
                        Vec::new()
                    },
                    message_stats: stats,
                    memory_usage: Some((1024 * n) as usize),
                },
            );
        }
    }

    // Step `k` of the chain goes from actor k to actor k + 1 at depth k.
    let chain_step = |step: u64,
                      label: &str,
                      status: MessageStatus,
                      duration_us: u64,
                      error: Option<&str>| MessageTrace {
        trace_id: step,
        timestamp: current_timestamp(),
        source: Some(ActorId(step)),
        destination: ActorId(step + 1),
        message: Message::User(label.to_string(), vec![]),
        status,
        processing_duration_us: Some(duration_us),
        error: error.map(str::to_string),
        stack_depth: step as usize,
        correlation_id: Some("op_1".to_string()),
    };

    let workload = [
        chain_step(1, "normal_operation", MessageStatus::Completed, 200, None),
        chain_step(2, "slow_operation", MessageStatus::Completed, 1500, None),
        chain_step(
            3,
            "failing_operation",
            MessageStatus::Failed,
            100,
            Some("Operation failed"),
        ),
    ];
    for step in workload {
        observatory.trace_message(step).unwrap();
    }

    observatory.update_metrics().unwrap();

    assert_eq!(observatory.get_filters().len(), 2);

    let actor_snapshots = observatory.get_actor_snapshots().unwrap();
    assert_eq!(actor_snapshots.len(), 5);

    let metrics = observatory.get_metrics().unwrap();
    assert_eq!(metrics.active_actors, 5);
    // 10 + 20 + 30 + 40 + 50 processed, 2 + 4 + 6 + 8 + 10 queued.
    assert_eq!(metrics.total_messages_processed, 150);
    assert_eq!(metrics.total_queued_messages, 30);

    // The normal (fast, successful) trace matches neither filter and is dropped.
    let kept = observatory.get_traces(None, None).unwrap();
    assert_eq!(kept.len(), 2);

    let uptime = observatory.uptime();
    assert!(uptime < Duration::from_secs(1));

    let (slow_trace, error_trace) = (&kept[0], &kept[1]);
    assert!(observatory.message_matches_filters(slow_trace));
    assert!(observatory.message_matches_filters(error_trace));

    assert!(observatory.trace_matches_filter(slow_trace, "slow_operations"));
    assert!(observatory.trace_matches_filter(error_trace, "errors"));
    assert!(!observatory.trace_matches_filter(slow_trace, "errors"));
    assert!(!observatory.trace_matches_filter(error_trace, "slow_operations"));
}
1542}
1543use crate::runtime::actor::{ActorId, ActorSystem, Message};
1544use anyhow::Result;
1545use serde::{Deserialize, Serialize};
1546use std::collections::{HashMap, HashSet, VecDeque};
1547use std::sync::{Arc, Mutex};
1548use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
/// Collects message traces, per-actor snapshots, deadlock information and
/// aggregate metrics for an actor system.
pub struct ActorObservatory {
    /// Shared handle to the observed actor system; `update_metrics` locks it
    /// while refreshing metrics.
    actor_system: Arc<Mutex<ActorSystem>>,
    /// Recorded traces in insertion order (new entries are pushed at the back,
    /// evictions pop from the front).
    message_traces: Arc<Mutex<VecDeque<MessageTrace>>>,
    /// Most recently stored snapshot for each actor id.
    actor_snapshots: Arc<Mutex<HashMap<ActorId, ActorSnapshot>>>,
    /// Wait-for graph used by `detect_deadlocks`.
    deadlock_detector: Arc<Mutex<DeadlockDetector>>,
    /// Limits and feature switches supplied at construction.
    config: ObservatoryConfig,
    /// Filters applied when recording traces; an empty list accepts everything.
    filters: Vec<MessageFilter>,
    /// Aggregates recomputed by `update_metrics`.
    metrics: Arc<Mutex<SystemMetrics>>,
    /// Moment of construction; basis for `uptime`.
    start_time: Instant,
}
/// Limits and feature switches for an [`ActorObservatory`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObservatoryConfig {
    /// Upper bound on stored traces; the oldest are dropped once exceeded.
    pub max_traces: usize,
    /// Age in seconds (relative to `current_timestamp()`) beyond which traces
    /// at the front of the buffer are discarded.
    pub trace_retention_seconds: u64,
    /// When `false`, `detect_deadlocks` returns an empty list without running.
    pub enable_deadlock_detection: bool,
    /// Intended deadlock check period in milliseconds.
    /// NOTE(review): not read by any code visible in this section — confirm usage.
    pub deadlock_check_interval_ms: u64,
    /// When `false`, `update_metrics` is a no-op.
    pub enable_metrics: bool,
    /// Intended metrics refresh period in milliseconds.
    /// NOTE(review): not read by any code visible in this section — confirm usage.
    pub metrics_interval_ms: u64,
    /// Upper bound on stored actor snapshots; the oldest (by snapshot
    /// timestamp) are evicted once exceeded.
    pub max_snapshots: usize,
}
1586impl Default for ObservatoryConfig {
1587 fn default() -> Self {
1588 Self {
1589 max_traces: 10000,
1590 trace_retention_seconds: 3600, enable_deadlock_detection: true,
1592 deadlock_check_interval_ms: 1000, enable_metrics: true,
1594 metrics_interval_ms: 5000, max_snapshots: 1000,
1596 }
1597 }
1598}
/// A record of one message travelling between actors.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageTrace {
    /// Identifier of this trace.
    pub trace_id: u64,
    /// Time of the trace; compared against `current_timestamp()` (Unix
    /// seconds) when the retention window is applied.
    pub timestamp: u64,
    /// Sending actor, if there is one.
    pub source: Option<ActorId>,
    /// Receiving actor; this is the id that `MessageFilter::actor_id` is
    /// compared with.
    pub destination: ActorId,
    /// The traced message itself.
    pub message: Message,
    /// Processing status of the message.
    pub status: MessageStatus,
    /// Processing duration (microseconds, per the `_us` suffix), if known.
    /// Filters with a minimum processing time reject traces where this is `None`.
    pub processing_duration_us: Option<u64>,
    /// Error description, if any.
    pub error: Option<String>,
    /// Call-stack depth; compared with `MessageFilter::max_stack_depth`.
    pub stack_depth: usize,
    /// Optional identifier for correlating related traces.
    pub correlation_id: Option<String>,
}
/// Processing status attached to a [`MessageTrace`].
///
/// In the filter-matching code visible in this section only `Failed` is
/// inspected (by `failed_only` filters); the other variants are described by
/// their names.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum MessageStatus {
    /// Message is queued.
    Queued,
    /// Message is being processed.
    Processing,
    /// Message was processed to completion.
    Completed,
    /// Message processing failed; required by `failed_only` filters.
    Failed,
    /// Message was dropped.
    Dropped,
}
/// Point-in-time description of a single actor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActorSnapshot {
    /// Actor this snapshot describes; also the key under which it is stored.
    pub actor_id: ActorId,
    /// Actor name.
    pub name: String,
    /// When the snapshot was taken; the oldest timestamps are evicted first
    /// once `max_snapshots` is exceeded.
    pub timestamp: u64,
    /// Lifecycle state at snapshot time.
    pub state: ActorState,
    /// Number of queued messages; summed into `total_queued_messages`.
    pub mailbox_size: usize,
    /// Parent actor, if any.
    pub parent: Option<ActorId>,
    /// Child actors.
    pub children: Vec<ActorId>,
    /// Per-actor message statistics.
    pub message_stats: MessageStats,
    /// Memory usage if known; `None` values are skipped when totals are summed.
    pub memory_usage: Option<usize>,
}
/// Lifecycle state recorded in an [`ActorSnapshot`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ActorState {
    /// Actor is starting.
    Starting,
    /// Actor is running.
    Running,
    /// Actor is processing something.
    /// NOTE(review): the `String` presumably names the message being
    /// processed — confirm against the producer of this state.
    Processing(String),
    /// Actor is restarting.
    Restarting,
    /// Actor is stopping.
    Stopping,
    /// Actor has stopped.
    Stopped,
    /// Actor failed; the tests in this file pass a failure description as the
    /// `String`.
    Failed(String),
}
/// Per-actor message processing statistics carried by an [`ActorSnapshot`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageStats {
    /// Messages processed so far; summed across actors by `update_metrics`.
    pub total_processed: u64,
    /// Message throughput per second.
    pub messages_per_second: f64,
    /// Average processing time (microseconds, per the `_us` suffix).
    pub avg_processing_time_us: f64,
    /// Longest processing time (microseconds, per the `_us` suffix).
    pub max_processing_time_us: u64,
    /// Number of messages whose processing failed.
    pub failed_messages: u64,
    /// Timestamp of the last processed message, if any.
    pub last_processed: Option<u64>,
}
1693impl Default for MessageStats {
1694 fn default() -> Self {
1695 Self {
1696 total_processed: 0,
1697 messages_per_second: 0.0,
1698 avg_processing_time_us: 0.0,
1699 max_processing_time_us: 0,
1700 failed_messages: 0,
1701 last_processed: None,
1702 }
1703 }
1704}
/// System-wide aggregates derived from the stored actor snapshots.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemMetrics {
    /// Number of stored actor snapshots.
    pub active_actors: usize,
    /// Sum of `total_processed` over all snapshots.
    pub total_messages_processed: u64,
    /// System-wide message throughput.
    /// NOTE(review): not written by the `update_metrics` shown in this
    /// section — confirm where it is maintained.
    pub system_messages_per_second: f64,
    /// Sum of the known `memory_usage` values over all snapshots.
    pub total_memory_usage: usize,
    /// Sum of `mailbox_size` over all snapshots.
    pub total_queued_messages: usize,
    /// `total_queued_messages` divided by the snapshot count (0.0 when empty).
    pub avg_mailbox_size: f64,
    /// Count of recent restarts.
    /// NOTE(review): not written by the `update_metrics` shown in this
    /// section — confirm where it is maintained.
    pub recent_restarts: u64,
    /// `current_timestamp()` (Unix seconds) at the last refresh.
    pub last_updated: u64,
}
1725impl Default for SystemMetrics {
1726 fn default() -> Self {
1727 Self {
1728 active_actors: 0,
1729 total_messages_processed: 0,
1730 system_messages_per_second: 0.0,
1731 total_memory_usage: 0,
1732 total_queued_messages: 0,
1733 avg_mailbox_size: 0.0,
1734 recent_restarts: 0,
1735 last_updated: current_timestamp(),
1736 }
1737 }
1738}
/// Criteria deciding whether a [`MessageTrace`] is of interest.
///
/// All criteria that are set must hold for a trace to match the filter.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageFilter {
    /// Name used to look the filter up and to remove it.
    pub name: String,
    /// If set, the trace's destination must be this actor.
    pub actor_id: Option<ActorId>,
    /// Pattern for actor names.
    /// NOTE(review): not consulted by the matching code in this section.
    pub actor_name_pattern: Option<String>,
    /// Pattern for message types.
    /// NOTE(review): not consulted by the matching code in this section.
    pub message_type_pattern: Option<String>,
    /// If set, the trace must have a known processing duration of at least
    /// this value.
    pub min_processing_time_us: Option<u64>,
    /// If `true`, only traces with status `MessageStatus::Failed` match.
    pub failed_only: bool,
    /// If set, the trace's stack depth must not exceed this value.
    pub max_stack_depth: Option<usize>,
}
/// Tracks which actors are blocked on which others and searches that
/// wait-for graph for cycles.
#[derive(Debug)]
pub struct DeadlockDetector {
    /// For each requester, the set of actors it is waiting on.
    dependency_graph: HashMap<ActorId, HashSet<ActorId>>,
    /// For each requester, the blocked requests it currently has outstanding.
    blocked_actors: HashMap<ActorId, Vec<BlockedRequest>>,
    /// Time of construction or of the most recent `detect_cycles` call.
    last_check: Instant,
    /// Every cycle reported by `detect_cycles` so far (appended on each call).
    detected_deadlocks: Vec<DeadlockCycle>,
}
/// A request from one actor that is blocked waiting on another.
#[derive(Debug, Clone)]
pub struct BlockedRequest {
    /// Actor that issued the request and is waiting.
    pub requester: ActorId,
    /// Actor being waited on.
    pub target: ActorId,
    /// When the request was registered; used to estimate how long a cycle
    /// has been blocked.
    pub timestamp: Instant,
    /// Timeout associated with the request.
    /// NOTE(review): not read by the detector code in this section.
    pub timeout: Duration,
    /// Optional identifier for correlating the request with other records.
    pub correlation_id: Option<String>,
}
/// A cycle found in the wait-for graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeadlockCycle {
    /// Actors forming the cycle, in the order they appear on the search path.
    pub actors: Vec<ActorId>,
    /// `current_timestamp()` (Unix seconds) at detection.
    pub detected_at: u64,
    /// Longest time, in milliseconds, that any blocked request of the cycle's
    /// actors has been waiting.
    pub duration_estimate_ms: u64,
    /// Human-readable hint chosen by the number of actors in the cycle.
    pub resolution_suggestion: String,
}
1795impl ActorObservatory {
1796 pub fn new(actor_system: Arc<Mutex<ActorSystem>>, config: ObservatoryConfig) -> Self {
1822 Self {
1823 actor_system,
1824 message_traces: Arc::new(Mutex::new(VecDeque::new())),
1825 actor_snapshots: Arc::new(Mutex::new(HashMap::new())),
1826 deadlock_detector: Arc::new(Mutex::new(DeadlockDetector::new())),
1827 config,
1828 filters: Vec::new(),
1829 metrics: Arc::new(Mutex::new(SystemMetrics::default())),
1830 start_time: Instant::now(),
1831 }
1832 }
1833 pub fn add_filter(&mut self, filter: MessageFilter) {
1844 self.filters.push(filter);
1845 }
1846 pub fn remove_filter(&mut self, name: &str) -> bool {
1856 let initial_len = self.filters.len();
1857 self.filters.retain(|f| f.name != name);
1858 self.filters.len() != initial_len
1859 }
1860 pub fn get_filters(&self) -> &[MessageFilter] {
1870 &self.filters
1871 }
1872 pub fn trace_message(&self, trace: MessageTrace) -> Result<()> {
1882 let mut traces = self
1883 .message_traces
1884 .lock()
1885 .map_err(|_| anyhow::anyhow!("Failed to acquire message traces lock"))?;
1886 if !self.message_matches_filters(&trace) {
1888 return Ok(());
1889 }
1890 traces.push_back(trace);
1891 while traces.len() > self.config.max_traces {
1893 traces.pop_front();
1894 }
1895 let retention_threshold = current_timestamp() - self.config.trace_retention_seconds;
1897 while let Some(front) = traces.front() {
1898 if front.timestamp < retention_threshold {
1899 traces.pop_front();
1900 } else {
1901 break;
1902 }
1903 }
1904 Ok(())
1905 }
1906 pub fn get_traces(
1916 &self,
1917 limit: Option<usize>,
1918 filter_name: Option<&str>,
1919 ) -> Result<Vec<MessageTrace>> {
1920 let traces = self
1921 .message_traces
1922 .lock()
1923 .map_err(|_| anyhow::anyhow!("Failed to acquire message traces lock"))?;
1924 let mut result: Vec<MessageTrace> = if let Some(filter_name) = filter_name {
1925 traces
1926 .iter()
1927 .filter(|trace| self.trace_matches_filter(trace, filter_name))
1928 .cloned()
1929 .collect()
1930 } else {
1931 traces.iter().cloned().collect()
1932 };
1933 if let Some(limit) = limit {
1934 result.truncate(limit);
1935 }
1936 Ok(result)
1937 }
1938 pub fn update_actor_snapshot(&self, snapshot: ActorSnapshot) -> Result<()> {
1948 let mut snapshots = self
1949 .actor_snapshots
1950 .lock()
1951 .map_err(|_| anyhow::anyhow!("Failed to acquire actor snapshots lock"))?;
1952 snapshots.insert(snapshot.actor_id, snapshot);
1953 if snapshots.len() > self.config.max_snapshots {
1955 let mut oldest_actors: Vec<_> = snapshots
1957 .iter()
1958 .map(|(&id, snapshot)| (id, snapshot.timestamp))
1959 .collect();
1960 oldest_actors.sort_by_key(|(_, timestamp)| *timestamp);
1961 let to_remove = snapshots.len() - self.config.max_snapshots;
1962 for i in 0..to_remove {
1963 if let Some((actor_id, _)) = oldest_actors.get(i) {
1964 snapshots.remove(actor_id);
1965 }
1966 }
1967 }
1968 Ok(())
1969 }
1970 pub fn get_actor_snapshots(&self) -> Result<HashMap<ActorId, ActorSnapshot>> {
1980 Ok(self
1981 .actor_snapshots
1982 .lock()
1983 .map_err(|_| anyhow::anyhow!("Failed to acquire actor snapshots lock"))?
1984 .clone())
1985 }
1986 pub fn get_actor_snapshot(&self, actor_id: ActorId) -> Result<Option<ActorSnapshot>> {
1996 Ok(self
1997 .actor_snapshots
1998 .lock()
1999 .map_err(|_| anyhow::anyhow!("Failed to acquire actor snapshots lock"))?
2000 .get(&actor_id)
2001 .cloned())
2002 }
2003 pub fn detect_deadlocks(&self) -> Result<Vec<DeadlockCycle>> {
2013 if !self.config.enable_deadlock_detection {
2014 return Ok(Vec::new());
2015 }
2016 let mut detector = self
2017 .deadlock_detector
2018 .lock()
2019 .map_err(|_| anyhow::anyhow!("Failed to acquire deadlock detector lock"))?;
2020 detector.detect_cycles()
2021 }
2022 pub fn update_metrics(&self) -> Result<()> {
2032 if !self.config.enable_metrics {
2033 return Ok(());
2034 }
2035 let _system = self
2036 .actor_system
2037 .lock()
2038 .map_err(|_| anyhow::anyhow!("Failed to acquire actor system lock"))?;
2039 let mut metrics = self
2040 .metrics
2041 .lock()
2042 .map_err(|_| anyhow::anyhow!("Failed to acquire metrics lock"))?;
2043 let snapshots = self
2044 .actor_snapshots
2045 .lock()
2046 .map_err(|_| anyhow::anyhow!("Failed to acquire actor snapshots lock"))?;
2047 metrics.active_actors = snapshots.len();
2049 metrics.total_messages_processed = snapshots
2050 .values()
2051 .map(|s| s.message_stats.total_processed)
2052 .sum();
2053 metrics.total_queued_messages = snapshots.values().map(|s| s.mailbox_size).sum();
2054 metrics.avg_mailbox_size = if snapshots.is_empty() {
2055 0.0
2056 } else {
2057 metrics.total_queued_messages as f64 / snapshots.len() as f64
2058 };
2059 metrics.total_memory_usage = snapshots.values().filter_map(|s| s.memory_usage).sum();
2060 metrics.last_updated = current_timestamp();
2061 Ok(())
2062 }
2063 pub fn get_metrics(&self) -> Result<SystemMetrics> {
2073 Ok(self
2074 .metrics
2075 .lock()
2076 .map_err(|_| anyhow::anyhow!("Failed to acquire metrics lock"))?
2077 .clone())
2078 }
2079 pub fn uptime(&self) -> Duration {
2089 self.start_time.elapsed()
2090 }
2091 fn message_matches_filters(&self, trace: &MessageTrace) -> bool {
2093 if self.filters.is_empty() {
2094 return true; }
2096 self.filters
2097 .iter()
2098 .any(|filter| self.message_matches_filter(trace, filter))
2099 }
2100 fn message_matches_filter(&self, trace: &MessageTrace, filter: &MessageFilter) -> bool {
2102 if let Some(filter_actor_id) = filter.actor_id {
2104 if trace.destination != filter_actor_id {
2105 return false;
2106 }
2107 }
2108 if let Some(min_time) = filter.min_processing_time_us {
2110 if let Some(duration) = trace.processing_duration_us {
2111 if duration < min_time {
2112 return false;
2113 }
2114 } else {
2115 return false;
2116 }
2117 }
2118 if filter.failed_only && trace.status != MessageStatus::Failed {
2120 return false;
2121 }
2122 if let Some(max_depth) = filter.max_stack_depth {
2124 if trace.stack_depth > max_depth {
2125 return false;
2126 }
2127 }
2128 true
2129 }
2130 fn trace_matches_filter(&self, trace: &MessageTrace, filter_name: &str) -> bool {
2132 self.filters
2133 .iter()
2134 .find(|f| f.name == filter_name)
2135 .is_some_and(|filter| self.message_matches_filter(trace, filter))
2136 }
2137}
2138impl Default for DeadlockDetector {
2139 fn default() -> Self {
2140 Self::new()
2141 }
2142}
2143impl DeadlockDetector {
2144 pub fn new() -> Self {
2146 Self {
2147 dependency_graph: HashMap::new(),
2148 blocked_actors: HashMap::new(),
2149 last_check: Instant::now(),
2150 detected_deadlocks: Vec::new(),
2151 }
2152 }
2153 pub fn add_blocked_request(&mut self, request: BlockedRequest) {
2163 self.blocked_actors
2164 .entry(request.requester)
2165 .or_default()
2166 .push(request.clone());
2167 self.dependency_graph
2169 .entry(request.requester)
2170 .or_default()
2171 .insert(request.target);
2172 }
2173 pub fn remove_blocked_request(&mut self, requester: ActorId, target: ActorId) {
2183 if let Some(requests) = self.blocked_actors.get_mut(&requester) {
2184 requests.retain(|r| r.target != target);
2185 if requests.is_empty() {
2186 self.blocked_actors.remove(&requester);
2187 }
2188 }
2189 if let Some(dependencies) = self.dependency_graph.get_mut(&requester) {
2191 dependencies.remove(&target);
2192 if dependencies.is_empty() {
2193 self.dependency_graph.remove(&requester);
2194 }
2195 }
2196 }
2197 pub fn detect_cycles(&mut self) -> Result<Vec<DeadlockCycle>> {
2207 let mut cycles = Vec::new();
2208 let mut visited = HashSet::new();
2209 let mut path = Vec::new();
2210 for &actor in self.dependency_graph.keys() {
2211 if !visited.contains(&actor) {
2212 self.dfs_detect_cycle(actor, &mut visited, &mut path, &mut cycles)?;
2213 }
2214 }
2215 self.detected_deadlocks.extend(cycles.clone());
2216 self.last_check = Instant::now();
2217 Ok(cycles)
2218 }
2219 fn dfs_detect_cycle(
2221 &self,
2222 actor: ActorId,
2223 visited: &mut HashSet<ActorId>,
2224 path: &mut Vec<ActorId>,
2225 cycles: &mut Vec<DeadlockCycle>,
2226 ) -> Result<()> {
2227 visited.insert(actor);
2228 path.push(actor);
2229 if let Some(dependencies) = self.dependency_graph.get(&actor) {
2230 for &dependent_actor in dependencies {
2231 if let Some(cycle_start_index) = path.iter().position(|&a| a == dependent_actor) {
2232 let cycle_actors = path[cycle_start_index..].to_vec();
2234 let duration_estimate = self.estimate_cycle_duration(&cycle_actors);
2235 cycles.push(DeadlockCycle {
2236 actors: cycle_actors.clone(),
2237 detected_at: current_timestamp(),
2238 duration_estimate_ms: duration_estimate,
2239 resolution_suggestion: self.suggest_resolution(&cycle_actors),
2240 });
2241 } else if !visited.contains(&dependent_actor) {
2242 self.dfs_detect_cycle(dependent_actor, visited, path, cycles)?;
2243 }
2244 }
2245 }
2246 path.pop();
2247 Ok(())
2248 }
2249 fn estimate_cycle_duration(&self, actors: &[ActorId]) -> u64 {
2251 let now = Instant::now();
2252 actors
2253 .iter()
2254 .filter_map(|&actor| self.blocked_actors.get(&actor))
2255 .flatten()
2256 .map(|request| now.duration_since(request.timestamp).as_millis() as u64)
2257 .max()
2258 .unwrap_or(0)
2259 }
2260 fn suggest_resolution(&self, actors: &[ActorId]) -> String {
2262 match actors.len() {
2263 1 => "Self-deadlock: Check for recursive message sending".to_string(),
2264 2 => "Binary deadlock: Consider using ask with timeout or redesign interaction pattern".to_string(),
2265 3..=5 => "Multi-actor deadlock: Implement hierarchical message ordering or use supervision".to_string(),
2266 _ => "Complex deadlock: Consider breaking into smaller subsystems or using event sourcing".to_string(),
2267 }
2268 }
2269}
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        // Clock set before 1970: fall back to a zero duration.
        Err(_) => Duration::default().as_secs(),
    }
}
2277impl MessageFilter {
2279 pub fn new(name: &str) -> Self {
2280 Self {
2281 name: name.to_string(),
2282 actor_id: None,
2283 actor_name_pattern: None,
2284 message_type_pattern: None,
2285 min_processing_time_us: None,
2286 failed_only: false,
2287 max_stack_depth: None,
2288 }
2289 }
2290 pub fn for_actor(name: &str, actor_id: ActorId) -> Self {
2300 Self {
2301 name: name.to_string(),
2302 actor_id: Some(actor_id),
2303 actor_name_pattern: None,
2304 message_type_pattern: None,
2305 min_processing_time_us: None,
2306 failed_only: false,
2307 max_stack_depth: None,
2308 }
2309 }
2310 pub fn failed_messages_only(name: &str) -> Self {
2320 Self {
2321 name: name.to_string(),
2322 actor_id: None,
2323 actor_name_pattern: None,
2324 message_type_pattern: None,
2325 min_processing_time_us: None,
2326 failed_only: true,
2327 max_stack_depth: None,
2328 }
2329 }
2330 pub fn slow_messages(name: &str, min_time_us: u64) -> Self {
2340 Self {
2341 name: name.to_string(),
2342 actor_id: None,
2343 actor_name_pattern: None,
2344 message_type_pattern: None,
2345 min_processing_time_us: Some(min_time_us),
2346 failed_only: false,
2347 max_stack_depth: None,
2348 }
2349 }
2350}
/// Property-based smoke tests driven by arbitrary string input.
#[cfg(test)]
mod property_tests_observatory {
    use proptest::proptest;

    proptest! {
        /// Feeds arbitrary strings through the (currently empty) body and
        /// checks that nothing panics.
        #[test]
        fn test_new_never_panics(input: String) {
            // Cap the input at 100 bytes. Generated strings may contain
            // multi-byte characters, and slicing in the middle of one panics,
            // so back off to the nearest char boundary at or below the cap.
            // Index 0 is always a boundary, so the loop terminates.
            let mut cut = input.len().min(100);
            while !input.is_char_boundary(cut) {
                cut -= 1;
            }
            let _input = &input[..cut];

            // NOTE(review): the closure is empty, so nothing is exercised yet.
            let _ = std::panic::catch_unwind(|| {
            });
        }
    }
}