ruchy/runtime/
observatory.rs

1//! Actor observatory for live system introspection (RUCHY-0817)
2//!
3//! Provides comprehensive monitoring and debugging capabilities for the actor system,
4//! including message tracing, deadlock detection, and performance analysis.
5#[cfg(test)]
6mod tests {
7    use super::*;
8    use crate::runtime::actor::{ActorSystem, Message};
9    use std::sync::{Arc, Mutex};
10    use std::time::Duration;
11    // Helper functions for consistent test setup
12    fn create_test_actor_system() -> Arc<Mutex<ActorSystem>> {
13        ActorSystem::new()
14    }
15    fn create_test_config() -> ObservatoryConfig {
16        ObservatoryConfig {
17            max_traces: 100,
18            trace_retention_seconds: 3600,
19            enable_deadlock_detection: true,
20            deadlock_check_interval_ms: 1000,
21            enable_metrics: true,
22            metrics_interval_ms: 5000,
23            max_snapshots: 50,
24        }
25    }
26    fn create_test_observatory() -> ActorObservatory {
27        let system = create_test_actor_system();
28        let config = create_test_config();
29        ActorObservatory::new(system, config)
30    }
31    fn create_test_message_trace() -> MessageTrace {
32        MessageTrace {
33            trace_id: 12345,
34            timestamp: current_timestamp(),
35            source: Some(ActorId(1)),
36            destination: ActorId(2),
37            message: Message::User("test_message".to_string(), vec![]),
38            status: MessageStatus::Queued,
39            processing_duration_us: None,
40            error: None,
41            stack_depth: 1,
42            correlation_id: Some("corr-123".to_string()),
43        }
44    }
45    fn create_test_actor_snapshot() -> ActorSnapshot {
46        ActorSnapshot {
47            actor_id: ActorId(1),
48            name: "test_actor".to_string(),
49            timestamp: current_timestamp(),
50            state: ActorState::Running,
51            mailbox_size: 5,
52            parent: Some(ActorId(0)),
53            children: vec![ActorId(2), ActorId(3)],
54            message_stats: MessageStats::default(),
55            memory_usage: Some(1024),
56        }
57    }
58    fn create_test_message_filter() -> MessageFilter {
59        MessageFilter {
60            name: "test_filter".to_string(),
61            actor_id: Some(ActorId(1)),
62            actor_name_pattern: Some("test_actor".to_string()),
63            message_type_pattern: Some(".*message.*".to_string()),
64            min_processing_time_us: None,
65            failed_only: false,
66            max_stack_depth: Some(10),
67        }
68    }
69    // ========== Observatory Configuration Tests ==========
70    #[test]
71    fn test_observatory_config_default() {
72        let config = ObservatoryConfig::default();
73        assert_eq!(config.max_traces, 10000);
74        assert_eq!(config.trace_retention_seconds, 3600);
75        assert!(config.enable_deadlock_detection);
76        assert_eq!(config.deadlock_check_interval_ms, 1000);
77        assert!(config.enable_metrics);
78        assert_eq!(config.metrics_interval_ms, 5000);
79        assert_eq!(config.max_snapshots, 1000);
80    }
81    #[test]
82    fn test_observatory_config_clone() {
83        let config1 = create_test_config();
84        let config2 = config1.clone();
85        assert_eq!(config1.max_traces, config2.max_traces);
86        assert_eq!(
87            config1.enable_deadlock_detection,
88            config2.enable_deadlock_detection
89        );
90    }
91    #[test]
92    fn test_observatory_config_debug() {
93        let config = create_test_config();
94        let debug_str = format!("{config:?}");
95        assert!(debug_str.contains("ObservatoryConfig"));
96        assert!(debug_str.contains("max_traces"));
97        assert!(debug_str.contains("enable_deadlock_detection"));
98    }
99    #[test]
100    fn test_observatory_config_serialization() {
101        let config = create_test_config();
102        let json = serde_json::to_string(&config).unwrap();
103        let deserialized: ObservatoryConfig = serde_json::from_str(&json).unwrap();
104        assert_eq!(config.max_traces, deserialized.max_traces);
105        assert_eq!(config.enable_metrics, deserialized.enable_metrics);
106    }
107    // ========== Observatory Creation and Setup Tests ==========
108    #[test]
109    fn test_observatory_creation() {
110        let system = create_test_actor_system();
111        let config = create_test_config();
112        let observatory = ActorObservatory::new(system, config.clone());
113        assert_eq!(observatory.config.max_traces, config.max_traces);
114        assert!(observatory.filters.is_empty());
115        assert!(observatory.start_time.elapsed() < Duration::from_secs(1));
116    }
117    #[test]
118    fn test_observatory_with_default_config() {
119        let system = create_test_actor_system();
120        let config = ObservatoryConfig::default();
121        let observatory = ActorObservatory::new(system, config);
122        assert_eq!(observatory.config.max_traces, 10000);
123        assert!(observatory.config.enable_deadlock_detection);
124    }
125    #[test]
126    fn test_observatory_initialization_state() {
127        let observatory = create_test_observatory();
128        // Should start with empty state
129        assert!(observatory.get_filters().is_empty());
130        let metrics = observatory.metrics.lock().expect("Failed to acquire lock");
131        assert_eq!(metrics.active_actors, 0);
132        assert_eq!(metrics.total_messages_processed, 0);
133    }
134    // ========== Message Filter Management Tests ==========
135    #[test]
136    fn test_add_message_filter() {
137        let mut observatory = create_test_observatory();
138        let filter = create_test_message_filter();
139        observatory.add_filter(filter.clone());
140        assert_eq!(observatory.get_filters().len(), 1);
141        assert_eq!(observatory.get_filters()[0].name, filter.name);
142    }
143    #[test]
144    fn test_add_multiple_filters() {
145        let mut observatory = create_test_observatory();
146        let filter1 = MessageFilter {
147            name: "filter1".to_string(),
148            actor_id: None,
149            actor_name_pattern: None,
150            message_type_pattern: Some("type1".to_string()),
151            min_processing_time_us: None,
152            failed_only: false,
153            max_stack_depth: None,
154        };
155        let filter2 = MessageFilter {
156            name: "filter2".to_string(),
157            actor_id: Some(ActorId(2)),
158            actor_name_pattern: Some("actor2".to_string()),
159            message_type_pattern: None,
160            min_processing_time_us: Some(1000),
161            failed_only: true,
162            max_stack_depth: Some(5),
163        };
164        observatory.add_filter(filter1);
165        observatory.add_filter(filter2);
166        assert_eq!(observatory.get_filters().len(), 2);
167        assert_eq!(observatory.get_filters()[0].name, "filter1");
168        assert_eq!(observatory.get_filters()[1].name, "filter2");
169    }
170    #[test]
171    fn test_remove_message_filter() {
172        let mut observatory = create_test_observatory();
173        let filter = create_test_message_filter();
174        observatory.add_filter(filter);
175        assert_eq!(observatory.get_filters().len(), 1);
176        let removed = observatory.remove_filter("test_filter");
177        assert!(removed);
178        assert!(observatory.get_filters().is_empty());
179    }
180    #[test]
181    fn test_remove_nonexistent_filter() {
182        let mut observatory = create_test_observatory();
183        let removed = observatory.remove_filter("nonexistent");
184        assert!(!removed);
185    }
186    #[test]
187    fn test_get_filters_empty() {
188        let observatory = create_test_observatory();
189        assert!(observatory.get_filters().is_empty());
190    }
191    // ========== Message Tracing Tests ==========
192    #[test]
193    fn test_trace_message() {
194        let observatory = create_test_observatory();
195        let trace = create_test_message_trace();
196        let result = observatory.trace_message(trace.clone());
197        assert!(result.is_ok());
198        let traces = observatory.get_traces(None, None).unwrap();
199        assert_eq!(traces.len(), 1);
200        assert_eq!(traces[0].trace_id, trace.trace_id);
201    }
202    #[test]
203    fn test_trace_message_with_limit() {
204        let mut observatory = create_test_observatory();
205        observatory.config.max_traces = 2;
206        // Add 3 traces, should only keep 2 most recent
207        for i in 0..3 {
208            let mut trace = create_test_message_trace();
209            trace.trace_id = i as u64;
210            observatory.trace_message(trace).unwrap();
211        }
212        let traces = observatory.get_traces(None, None).unwrap();
213        assert_eq!(traces.len(), 2);
214        assert_eq!(traces[0].trace_id, 1); // First trace should be evicted
215        assert_eq!(traces[1].trace_id, 2);
216    }
217    #[test]
218    fn test_trace_message_age_retention() {
219        let mut observatory = create_test_observatory();
220        observatory.config.trace_retention_seconds = 1; // 1 second retention
221                                                        // Add an old trace
222        let mut old_trace = create_test_message_trace();
223        old_trace.timestamp = current_timestamp() - 3600; // 1 hour ago
224        observatory.trace_message(old_trace).unwrap();
225        // Add a recent trace
226        let recent_trace = create_test_message_trace();
227        observatory.trace_message(recent_trace).unwrap();
228        let traces = observatory.get_traces(None, None).unwrap();
229        // Only recent trace should remain
230        assert_eq!(traces.len(), 1);
231    }
232    #[test]
233    fn test_get_traces_with_limit() {
234        let observatory = create_test_observatory();
235        // Add 5 traces
236        for i in 0..5 {
237            let mut trace = create_test_message_trace();
238            trace.trace_id = i as u64;
239            observatory.trace_message(trace).unwrap();
240        }
241        let traces = observatory.get_traces(Some(3), None).unwrap();
242        assert_eq!(traces.len(), 3);
243    }
244    // ========== Message Status Tests ==========
245    #[test]
246    fn test_message_status_variants() {
247        let statuses = [
248            MessageStatus::Queued,
249            MessageStatus::Processing,
250            MessageStatus::Completed,
251            MessageStatus::Failed,
252            MessageStatus::Dropped,
253        ];
254        assert_eq!(statuses.len(), 5);
255        assert_eq!(statuses[0], MessageStatus::Queued);
256        assert_ne!(statuses[0], MessageStatus::Processing);
257    }
258    #[test]
259    fn test_message_status_serialization() {
260        let status = MessageStatus::Processing;
261        let json = serde_json::to_string(&status).unwrap();
262        let deserialized: MessageStatus = serde_json::from_str(&json).unwrap();
263        assert_eq!(status, deserialized);
264    }
265    #[test]
266    fn test_message_status_debug() {
267        let status = MessageStatus::Failed;
268        let debug_str = format!("{status:?}");
269        assert!(debug_str.contains("Failed"));
270    }
271    // ========== Actor State Tests ==========
272    #[test]
273    fn test_actor_state_variants() {
274        let states = [
275            ActorState::Starting,
276            ActorState::Running,
277            ActorState::Processing("test_message".to_string()),
278            ActorState::Restarting,
279            ActorState::Stopping,
280            ActorState::Stopped,
281            ActorState::Failed("test_error".to_string()),
282        ];
283        assert_eq!(states.len(), 7);
284        assert_eq!(states[0], ActorState::Starting);
285        assert_ne!(states[0], ActorState::Running);
286    }
287    #[test]
288    fn test_actor_state_processing() {
289        let state = ActorState::Processing("handle_request".to_string());
290        if let ActorState::Processing(message_type) = state {
291            assert_eq!(message_type, "handle_request");
292        } else {
293            panic!("Expected Processing state");
294        }
295    }
296    #[test]
297    fn test_actor_state_failed() {
298        let state = ActorState::Failed("connection_timeout".to_string());
299        if let ActorState::Failed(reason) = state {
300            assert_eq!(reason, "connection_timeout");
301        } else {
302            panic!("Expected Failed state");
303        }
304    }
305    #[test]
306    fn test_actor_state_serialization() {
307        let state = ActorState::Processing("test".to_string());
308        let json = serde_json::to_string(&state).unwrap();
309        let deserialized: ActorState = serde_json::from_str(&json).unwrap();
310        assert_eq!(state, deserialized);
311    }
312    // ========== Message Statistics Tests ==========
313    #[test]
314    fn test_message_stats_default() {
315        let stats = MessageStats::default();
316        assert_eq!(stats.total_processed, 0);
317        assert_eq!(stats.messages_per_second, 0.0);
318        assert_eq!(stats.avg_processing_time_us, 0.0);
319        assert_eq!(stats.max_processing_time_us, 0);
320        assert_eq!(stats.failed_messages, 0);
321        assert!(stats.last_processed.is_none());
322    }
323    #[test]
324    fn test_message_stats_clone() {
325        let mut stats1 = MessageStats::default();
326        stats1.total_processed = 100;
327        stats1.messages_per_second = 10.5;
328        let stats2 = stats1.clone();
329        assert_eq!(stats1.total_processed, stats2.total_processed);
330        assert_eq!(stats1.messages_per_second, stats2.messages_per_second);
331    }
332    #[test]
333    fn test_message_stats_serialization() {
334        let mut stats = MessageStats::default();
335        stats.total_processed = 500;
336        stats.avg_processing_time_us = 1500.0;
337        let json = serde_json::to_string(&stats).unwrap();
338        let deserialized: MessageStats = serde_json::from_str(&json).unwrap();
339        assert_eq!(stats.total_processed, deserialized.total_processed);
340    }
341    // ========== System Metrics Tests ==========
342    #[test]
343    fn test_system_metrics_default() {
344        let metrics = SystemMetrics::default();
345        assert_eq!(metrics.active_actors, 0);
346        assert_eq!(metrics.total_messages_processed, 0);
347        assert_eq!(metrics.system_messages_per_second, 0.0);
348        assert_eq!(metrics.total_memory_usage, 0);
349        assert_eq!(metrics.total_queued_messages, 0);
350        assert_eq!(metrics.avg_mailbox_size, 0.0);
351        assert_eq!(metrics.recent_restarts, 0);
352        assert!(metrics.last_updated > 0); // Uses current timestamp
353    }
354    #[test]
355    fn test_system_metrics_update() {
356        let mut metrics = SystemMetrics::default();
357        metrics.active_actors = 10;
358        metrics.total_messages_processed = 1000;
359        metrics.system_messages_per_second = 50.5;
360        metrics.last_updated = current_timestamp();
361        assert_eq!(metrics.active_actors, 10);
362        assert_eq!(metrics.total_messages_processed, 1000);
363        assert!(metrics.last_updated > 0);
364    }
365    #[test]
366    fn test_system_metrics_serialization() {
367        let mut metrics = SystemMetrics::default();
368        metrics.active_actors = 5;
369        metrics.total_memory_usage = 1_024_000;
370        let json = serde_json::to_string(&metrics).unwrap();
371        let deserialized: SystemMetrics = serde_json::from_str(&json).unwrap();
372        assert_eq!(metrics.active_actors, deserialized.active_actors);
373        assert_eq!(metrics.total_memory_usage, deserialized.total_memory_usage);
374    }
375    // ========== Actor Snapshot Tests ==========
376    #[test]
377    fn test_actor_snapshot_creation() {
378        let snapshot = create_test_actor_snapshot();
379        assert_eq!(snapshot.actor_id, ActorId(1));
380        assert_eq!(snapshot.name, "test_actor");
381        assert_eq!(snapshot.state, ActorState::Running);
382        assert_eq!(snapshot.mailbox_size, 5);
383        assert_eq!(snapshot.children.len(), 2);
384        assert!(snapshot.memory_usage.is_some());
385    }
386    #[test]
387    fn test_actor_snapshot_with_no_parent() {
388        let mut snapshot = create_test_actor_snapshot();
389        snapshot.parent = None;
390        assert!(snapshot.parent.is_none());
391        assert_eq!(snapshot.children.len(), 2);
392    }
393    #[test]
394    fn test_actor_snapshot_serialization() {
395        let snapshot = create_test_actor_snapshot();
396        let json = serde_json::to_string(&snapshot).unwrap();
397        let deserialized: ActorSnapshot = serde_json::from_str(&json).unwrap();
398        assert_eq!(snapshot.actor_id, deserialized.actor_id);
399        assert_eq!(snapshot.name, deserialized.name);
400        assert_eq!(snapshot.state, deserialized.state);
401    }
402    // ========== Message Filter Tests ==========
403    #[test]
404    fn test_message_filter_creation() {
405        let filter = create_test_message_filter();
406        assert_eq!(filter.name, "test_filter");
407        assert!(filter.actor_id.is_some());
408        assert!(filter.actor_name_pattern.is_some());
409        assert!(filter.message_type_pattern.is_some());
410        assert!(!filter.failed_only);
411    }
412    #[test]
413    fn test_message_filter_with_duration_limits() {
414        let filter = MessageFilter {
415            name: "duration_filter".to_string(),
416            actor_id: None,
417            actor_name_pattern: None,
418            message_type_pattern: None,
419            min_processing_time_us: Some(1000),
420            failed_only: false,
421            max_stack_depth: None,
422        };
423        assert_eq!(filter.min_processing_time_us, Some(1000));
424    }
425    #[test]
426    fn test_message_filter_errors_only() {
427        let filter = MessageFilter {
428            name: "error_filter".to_string(),
429            actor_id: None,
430            actor_name_pattern: None,
431            message_type_pattern: None,
432            min_processing_time_us: None,
433            failed_only: true,
434            max_stack_depth: None,
435        };
436        assert!(filter.failed_only);
437    }
438    // ========== Message Trace Tests ==========
439    #[test]
440    fn test_message_trace_creation() {
441        let trace = create_test_message_trace();
442        assert_eq!(trace.trace_id, 12345);
443        assert!(trace.source.is_some());
444        assert_eq!(trace.destination, ActorId(2));
445        assert_eq!(trace.status, MessageStatus::Queued);
446        assert_eq!(trace.stack_depth, 1);
447        assert!(trace.correlation_id.is_some());
448    }
449    #[test]
450    fn test_message_trace_with_processing() {
451        let mut trace = create_test_message_trace();
452        trace.status = MessageStatus::Processing;
453        trace.processing_duration_us = Some(1500);
454        assert_eq!(trace.status, MessageStatus::Processing);
455        assert_eq!(trace.processing_duration_us, Some(1500));
456    }
457    #[test]
458    fn test_message_trace_with_error() {
459        let mut trace = create_test_message_trace();
460        trace.status = MessageStatus::Failed;
461        trace.error = Some("timeout_error".to_string());
462        assert_eq!(trace.status, MessageStatus::Failed);
463        assert_eq!(trace.error, Some("timeout_error".to_string()));
464    }
465    #[test]
466    fn test_message_trace_serialization() {
467        let trace = create_test_message_trace();
468        let json = serde_json::to_string(&trace).unwrap();
469        let deserialized: MessageTrace = serde_json::from_str(&json).unwrap();
470        assert_eq!(trace.trace_id, deserialized.trace_id);
471        assert_eq!(trace.status, deserialized.status);
472    }
473    // ========== Utility Function Tests ==========
474    #[test]
475    fn test_current_timestamp() {
476        let ts1 = current_timestamp();
477        std::thread::sleep(Duration::from_millis(10)); // Increase sleep time for more reliable test
478        let ts2 = current_timestamp();
479        assert!(ts2 >= ts1); // May be equal due to precision
480    }
481    // ========== Integration Tests ==========
482    #[test]
483    fn test_observatory_full_workflow() {
484        let mut observatory = create_test_observatory();
485        // Add filter that matches our test message
486        let filter = MessageFilter {
487            name: "permissive_filter".to_string(),
488            actor_id: None, // Allow all actors
489            actor_name_pattern: None,
490            message_type_pattern: None, // Allow all message types
491            min_processing_time_us: None,
492            failed_only: false,
493            max_stack_depth: None,
494        };
495        observatory.add_filter(filter);
496        assert_eq!(observatory.get_filters().len(), 1);
497        // Add trace
498        let trace = create_test_message_trace();
499        observatory.trace_message(trace.clone()).unwrap();
500        // Verify trace was recorded
501        let traces = observatory.get_traces(None, None).unwrap();
502        assert_eq!(traces.len(), 1);
503        assert_eq!(traces[0].trace_id, trace.trace_id);
504        // Remove filter
505        let removed = observatory.remove_filter("permissive_filter");
506        assert!(removed);
507        assert!(observatory.get_filters().is_empty());
508    }
509    #[test]
510    fn test_observatory_multiple_traces() {
511        let observatory = create_test_observatory();
512        // Add multiple traces with different statuses
513        let statuses = [
514            MessageStatus::Queued,
515            MessageStatus::Processing,
516            MessageStatus::Completed,
517            MessageStatus::Failed,
518        ];
519        for (i, status) in statuses.iter().enumerate() {
520            let mut trace = create_test_message_trace();
521            trace.trace_id = i as u64;
522            trace.status = status.clone();
523            observatory.trace_message(trace).unwrap();
524        }
525        let traces = observatory.get_traces(None, None).unwrap();
526        assert_eq!(traces.len(), 4);
527        // Verify different statuses were recorded - count unique statuses differently
528        let mut status_counts = std::collections::HashMap::new();
529        for trace in &traces {
530            *status_counts.entry(&trace.status).or_insert(0) += 1;
531        }
532        assert_eq!(status_counts.len(), 4);
533    }
534    #[test]
535    fn test_observatory_config_variations() {
536        let configs = vec![
537            ObservatoryConfig {
538                max_traces: 50,
539                enable_deadlock_detection: false,
540                enable_metrics: false,
541                ..ObservatoryConfig::default()
542            },
543            ObservatoryConfig {
544                trace_retention_seconds: 7200,
545                deadlock_check_interval_ms: 500,
546                metrics_interval_ms: 2000,
547                ..ObservatoryConfig::default()
548            },
549        ];
550        for config in configs {
551            let system = create_test_actor_system();
552            let observatory = ActorObservatory::new(system, config.clone());
553            assert_eq!(observatory.config.max_traces, config.max_traces);
554            assert_eq!(
555                observatory.config.enable_deadlock_detection,
556                config.enable_deadlock_detection
557            );
558        }
559    }
560    #[test]
561    fn test_observatory_concurrent_access() {
562        use std::thread;
563        let observatory = Arc::new(create_test_observatory());
564        let mut handles = vec![];
565        // Spawn multiple threads to add traces concurrently
566        for i in 0..5 {
567            let obs = observatory.clone();
568            let handle = thread::spawn(move || {
569                let mut trace = create_test_message_trace();
570                trace.trace_id = i;
571                obs.trace_message(trace).unwrap();
572            });
573            handles.push(handle);
574        }
575        // Wait for all threads to complete
576        for handle in handles {
577            handle.join().expect("Thread failed to join");
578        }
579        // Verify all traces were recorded
580        let traces = observatory.get_traces(None, None).unwrap();
581        assert_eq!(traces.len(), 5);
582    }
583
584    // ========== SPRINT 45: Advanced Observatory Tests (20 tests) ==========
585
586    #[test]
587    fn test_sprint_45_01_complex_deadlock_detection() {
588        let observatory = create_test_observatory();
589
590        // Create complex deadlock scenario: A -> B -> C -> A
591        let mut detector = observatory.deadlock_detector.lock().unwrap();
592
593        let request_ab = BlockedRequest {
594            requester: ActorId(1),
595            target: ActorId(2),
596            timestamp: Instant::now(),
597            timeout: Duration::from_secs(5),
598            correlation_id: None,
599        };
600
601        let request_bc = BlockedRequest {
602            requester: ActorId(2),
603            target: ActorId(3),
604            timestamp: Instant::now(),
605            timeout: Duration::from_secs(5),
606            correlation_id: None,
607        };
608
609        let request_ca = BlockedRequest {
610            requester: ActorId(3),
611            target: ActorId(1),
612            timestamp: Instant::now(),
613            timeout: Duration::from_secs(5),
614            correlation_id: None,
615        };
616
617        detector.add_blocked_request(request_ab);
618        detector.add_blocked_request(request_bc);
619        detector.add_blocked_request(request_ca);
620
621        let cycles = detector.detect_cycles().unwrap();
622        assert!(!cycles.is_empty(), "Should detect circular dependency");
623        assert_eq!(cycles[0].actors.len(), 3);
624    }
625
626    #[test]
627    fn test_sprint_45_02_message_filter_pattern_matching() {
628        let mut observatory = create_test_observatory();
629
630        // Add sophisticated filter
631        let complex_filter = MessageFilter {
632            name: "complex_filter".to_string(),
633            actor_id: Some(ActorId(5)),
634            actor_name_pattern: Some(r"worker_\d+".to_string()),
635            message_type_pattern: Some(r"ProcessJob\{.*\}".to_string()),
636            min_processing_time_us: Some(1000),
637            failed_only: false,
638            max_stack_depth: Some(3),
639        };
640
641        observatory.add_filter(complex_filter);
642
643        // Create matching trace
644        let matching_trace = MessageTrace {
645            trace_id: 100,
646            timestamp: current_timestamp(),
647            source: Some(ActorId(1)),
648            destination: ActorId(5),
649            message: Message::User("ProcessJob{id: 42}".to_string(), vec![]),
650            status: MessageStatus::Processing,
651            processing_duration_us: Some(1500),
652            error: None,
653            stack_depth: 2,
654            correlation_id: Some("job-42".to_string()),
655        };
656
657        // Test filter matching
658        assert!(observatory.message_matches_filters(&matching_trace));
659
660        // Create non-matching trace
661        let non_matching_trace = MessageTrace {
662            trace_id: 101,
663            timestamp: current_timestamp(),
664            source: Some(ActorId(1)),
665            destination: ActorId(6), // Different actor
666            message: matching_trace.message,
667            status: MessageStatus::Processing,
668            processing_duration_us: Some(1500),
669            error: None,
670            stack_depth: 2,
671            correlation_id: Some("job-43".to_string()),
672        };
673
674        assert!(!observatory.message_matches_filters(&non_matching_trace));
675    }
676
677    #[test]
678    fn test_sprint_45_03_metrics_calculation_accuracy() {
679        let observatory = create_test_observatory();
680
681        // Add multiple actor snapshots with varying stats
682        let mut snapshots = observatory.actor_snapshots.lock().unwrap();
683
684        for i in 1..=5 {
685            let snapshot = ActorSnapshot {
686                actor_id: ActorId(i),
687                name: format!("actor_{i}"),
688                timestamp: current_timestamp(),
689                state: ActorState::Running,
690                mailbox_size: (i * 10) as usize, // 10, 20, 30, 40, 50
691                parent: if i > 1 { Some(ActorId(i - 1)) } else { None },
692                children: vec![],
693                message_stats: MessageStats {
694                    total_processed: i * 100,                  // 100, 200, 300, 400, 500
695                    messages_per_second: i as f64 * 10.0,      // 10.0, 20.0, 30.0, 40.0, 50.0
696                    avg_processing_time_us: i as f64 * 1000.0, // 1000, 2000, 3000, 4000, 5000
697                    max_processing_time_us: i * 2000,          // 2000, 4000, 6000, 8000, 10000
698                    failed_messages: i * 2,                    // 2, 4, 6, 8, 10
699                    last_processed: Some(current_timestamp()),
700                },
701                memory_usage: Some((i * 1024) as usize), // 1024, 2048, 3072, 4096, 5120
702            };
703            snapshots.insert(ActorId(i), snapshot);
704        }
705        drop(snapshots);
706
707        // Update metrics
708        let result = observatory.update_metrics();
709        assert!(result.is_ok());
710
711        // Verify calculations
712        let metrics = observatory.get_metrics().unwrap();
713        assert_eq!(metrics.active_actors, 5);
714        assert_eq!(metrics.total_messages_processed, 1500); // 100+200+300+400+500
715        assert_eq!(metrics.total_queued_messages, 150); // 10+20+30+40+50
716        assert!((metrics.avg_mailbox_size - 30.0).abs() < 0.1); // 150/5 = 30
717        assert_eq!(metrics.total_memory_usage, 15360); // 1024+2048+3072+4096+5120
718    }
719
720    #[test]
721    fn test_sprint_45_04_trace_correlation_tracking() {
722        let observatory = create_test_observatory();
723
724        let correlation_id = "operation-xyz-123".to_string();
725
726        // Create related traces with same correlation ID
727        for i in 1..=3 {
728            let trace = MessageTrace {
729                trace_id: i,
730                timestamp: current_timestamp(),
731                source: Some(ActorId(i)),
732                destination: ActorId(i + 1),
733                message: Message::User(format!("step_{i}"), vec![]),
734                status: MessageStatus::Completed,
735                processing_duration_us: Some(i * 100),
736                error: None,
737                stack_depth: i as usize,
738                correlation_id: Some(correlation_id.clone()),
739            };
740            observatory.trace_message(trace).unwrap();
741        }
742
743        // Verify traces can be retrieved by correlation
744        let all_traces = observatory.get_traces(None, None).unwrap();
745        let correlated_traces: Vec<_> = all_traces
746            .iter()
747            .filter(|t| t.correlation_id.as_ref() == Some(&correlation_id))
748            .collect();
749
750        assert_eq!(correlated_traces.len(), 3);
751
752        // Verify they're ordered by trace_id
753        for (i, trace) in correlated_traces.iter().enumerate() {
754            assert_eq!(trace.trace_id, (i + 1) as u64);
755        }
756    }
757
758    #[test]
759    fn test_sprint_45_05_actor_hierarchy_analysis() {
760        let observatory = create_test_observatory();
761
762        // Create hierarchical actor structure
763        let mut snapshots = observatory.actor_snapshots.lock().unwrap();
764
765        // Root actor
766        snapshots.insert(
767            ActorId(1),
768            ActorSnapshot {
769                actor_id: ActorId(1),
770                name: "root".to_string(),
771                timestamp: current_timestamp(),
772                state: ActorState::Running,
773                mailbox_size: 0,
774                parent: None,
775                children: vec![ActorId(2), ActorId(3)],
776                message_stats: MessageStats::default(),
777                memory_usage: Some(1024),
778            },
779        );
780
781        // Child actors
782        for i in 2..=3 {
783            snapshots.insert(
784                ActorId(i),
785                ActorSnapshot {
786                    actor_id: ActorId(i),
787                    name: format!("child_{i}"),
788                    timestamp: current_timestamp(),
789                    state: ActorState::Running,
790                    mailbox_size: 5,
791                    parent: Some(ActorId(1)),
792                    children: vec![ActorId(i + 2)], // grandchildren
793                    message_stats: MessageStats::default(),
794                    memory_usage: Some(512),
795                },
796            );
797        }
798
799        // Grandchild actors
800        for i in 4..=5 {
801            snapshots.insert(
802                ActorId(i),
803                ActorSnapshot {
804                    actor_id: ActorId(i),
805                    name: format!("grandchild_{i}"),
806                    timestamp: current_timestamp(),
807                    state: ActorState::Running,
808                    mailbox_size: 2,
809                    parent: Some(ActorId(i - 2)),
810                    children: vec![],
811                    message_stats: MessageStats::default(),
812                    memory_usage: Some(256),
813                },
814            );
815        }
816        drop(snapshots);
817
818        // Verify hierarchy
819        let snapshots = observatory.get_actor_snapshots().unwrap();
820        let root = snapshots.get(&ActorId(1)).unwrap();
821        assert_eq!(root.children.len(), 2);
822        assert!(root.parent.is_none());
823
824        let child = snapshots.get(&ActorId(2)).unwrap();
825        assert_eq!(child.parent, Some(ActorId(1)));
826        assert_eq!(child.children.len(), 1);
827    }
828
829    #[test]
830    fn test_sprint_45_06_performance_degradation_detection() {
831        let observatory = create_test_observatory();
832
833        // Simulate performance degradation over time
834        let base_time = current_timestamp();
835
836        for i in 1..=10 {
837            let trace = MessageTrace {
838                trace_id: i,
839                timestamp: base_time + (i * 1000), // 1 second intervals
840                source: Some(ActorId(1)),
841                destination: ActorId(2),
842                message: Message::User("process".to_string(), vec![]),
843                status: MessageStatus::Completed,
844                processing_duration_us: Some(i * i * 100), // Quadratic increase: 100, 400, 900...
845                error: None,
846                stack_depth: 1,
847                correlation_id: None,
848            };
849            observatory.trace_message(trace).unwrap();
850        }
851
852        let traces = observatory.get_traces(None, None).unwrap();
853        assert_eq!(traces.len(), 10);
854
855        // Verify performance degradation trend
856        let processing_times: Vec<_> = traces
857            .iter()
858            .filter_map(|t| t.processing_duration_us)
859            .collect();
860
861        // Each subsequent time should be larger (quadratic growth)
862        for i in 1..processing_times.len() {
863            assert!(processing_times[i] > processing_times[i - 1]);
864        }
865
866        // Last trace should be significantly slower than first
867        assert!(processing_times.last().unwrap() > &(processing_times[0] * 10));
868    }
869
870    #[test]
871    fn test_sprint_45_07_memory_leak_detection() {
872        let observatory = create_test_observatory();
873
874        // Simulate memory leak scenario
875        let mut snapshots = observatory.actor_snapshots.lock().unwrap();
876
877        let actor_id = ActorId(1);
878        let base_time = current_timestamp();
879
880        // Create snapshots showing increasing memory usage
881        for i in 1..=5 {
882            let snapshot = ActorSnapshot {
883                actor_id,
884                name: "leaky_actor".to_string(),
885                timestamp: base_time + (i * 60000), // 1 minute intervals
886                state: ActorState::Running,
887                mailbox_size: 10,
888                parent: None,
889                children: vec![],
890                message_stats: MessageStats::default(),
891                memory_usage: Some((1024 * i * i) as usize), // Quadratic growth
892            };
893            snapshots.insert(actor_id, snapshot); // Replace previous snapshot
894        }
895        drop(snapshots);
896
897        // Get final snapshot
898        let final_snapshot = observatory.get_actor_snapshot(actor_id).unwrap().unwrap();
899        assert_eq!(final_snapshot.memory_usage, Some(1024 * 25)); // 1024 * 5^2
900
901        // Memory usage should be significantly higher than baseline
902        assert!(final_snapshot.memory_usage.unwrap() > 10240); // > 10KB indicates potential leak
903    }
904
905    #[test]
906    fn test_sprint_45_08_error_propagation_tracking() {
907        let observatory = create_test_observatory();
908
909        // Create error propagation chain
910        let error_msg = "Database connection failed".to_string();
911
912        for i in 1..=4 {
913            let trace = MessageTrace {
914                trace_id: i,
915                timestamp: current_timestamp() + (i * 100),
916                source: Some(ActorId(i)),
917                destination: ActorId(i + 1),
918                message: Message::User("database_query".to_string(), vec![]),
919                status: MessageStatus::Failed,
920                processing_duration_us: Some(50),
921                error: Some(error_msg.clone()),
922                stack_depth: i as usize,
923                correlation_id: Some("db-op-456".to_string()),
924            };
925            observatory.trace_message(trace).unwrap();
926        }
927
928        // Verify error propagation
929        let traces = observatory.get_traces(None, None).unwrap();
930        let failed_traces: Vec<_> = traces
931            .iter()
932            .filter(|t| t.status == MessageStatus::Failed)
933            .collect();
934
935        assert_eq!(failed_traces.len(), 4);
936
937        // All should have the same error message
938        for trace in failed_traces {
939            assert_eq!(trace.error.as_ref().unwrap(), &error_msg);
940            assert_eq!(trace.correlation_id.as_ref().unwrap(), "db-op-456");
941        }
942    }
943
944    #[test]
945    fn test_sprint_45_09_concurrent_trace_access() {
946        let observatory = Arc::new(create_test_observatory());
947        let mut handles = vec![];
948
949        // Spawn multiple threads accessing traces concurrently
950        for thread_id in 0..3 {
951            let obs = Arc::clone(&observatory);
952            let handle = std::thread::spawn(move || {
953                for i in 0..10 {
954                    let trace = MessageTrace {
955                        trace_id: (thread_id * 100 + i) as u64,
956                        timestamp: current_timestamp(),
957                        source: Some(ActorId(thread_id as u64)),
958                        destination: ActorId((thread_id + 1) as u64),
959                        message: Message::User(format!("thread_{thread_id}_msg_{i}"), vec![]),
960                        status: MessageStatus::Completed,
961                        processing_duration_us: Some(100 + i as u64),
962                        error: None,
963                        stack_depth: 1,
964                        correlation_id: Some(format!("thread_{thread_id}")),
965                    };
966                    obs.trace_message(trace).unwrap();
967
968                    // Also read traces
969                    let _traces = obs.get_traces(None, Some("5")).unwrap();
970                }
971            });
972            handles.push(handle);
973        }
974
975        // Wait for all threads
976        for handle in handles {
977            handle.join().unwrap();
978        }
979
980        // Verify all traces were recorded
981        let final_traces = observatory.get_traces(None, None).unwrap();
982        assert_eq!(final_traces.len(), 30); // 3 threads * 10 messages each
983    }
984
985    #[test]
986    fn test_sprint_45_10_snapshot_time_series_analysis() {
987        let observatory = create_test_observatory();
988        let actor_id = ActorId(1);
989
990        // Create time series of snapshots
991        let mut snapshots = observatory.actor_snapshots.lock().unwrap();
992        let base_time = current_timestamp();
993
994        for i in 1..=5 {
995            let snapshot = ActorSnapshot {
996                actor_id,
997                name: "monitored_actor".to_string(),
998                timestamp: base_time + (i * 10000), // 10 second intervals
999                state: if i == 5 {
1000                    ActorState::Failed("Simulated crash".to_string())
1001                } else {
1002                    ActorState::Running
1003                },
1004                mailbox_size: if i < 4 { (i * 2) as usize } else { 0 }, // Clears before crash
1005                parent: None,
1006                children: vec![],
1007                message_stats: MessageStats {
1008                    total_processed: i * 50,
1009                    messages_per_second: i as f64 * 5.0,
1010                    avg_processing_time_us: 100.0 * i as f64,
1011                    max_processing_time_us: i * 200,
1012                    failed_messages: if i >= 4 { i } else { 0 },
1013                    last_processed: Some(base_time + (i * 10000)),
1014                },
1015                memory_usage: Some((1024 * i) as usize),
1016            };
1017            snapshots.insert(actor_id, snapshot);
1018        }
1019        drop(snapshots);
1020
1021        // Verify final state
1022        let final_snapshot = observatory.get_actor_snapshot(actor_id).unwrap().unwrap();
1023        assert!(matches!(final_snapshot.state, ActorState::Failed(_)));
1024        assert_eq!(final_snapshot.mailbox_size, 0);
1025        assert_eq!(final_snapshot.message_stats.failed_messages, 5);
1026    }
1027
1028    #[test]
1029    fn test_sprint_45_11_filter_performance_optimization() {
1030        let mut observatory = create_test_observatory();
1031
1032        // Add many filters
1033        for i in 1..=100 {
1034            let filter = MessageFilter {
1035                name: format!("filter_{i}"),
1036                actor_id: Some(ActorId(i)),
1037                actor_name_pattern: None,
1038                message_type_pattern: None,
1039                min_processing_time_us: None,
1040                failed_only: false,
1041                max_stack_depth: None,
1042            };
1043            observatory.add_filter(filter);
1044        }
1045
1046        // Test filtering performance
1047        let test_trace = create_test_message_trace();
1048        let start_time = std::time::Instant::now();
1049
1050        // Run filter matching many times
1051        for _ in 0..1000 {
1052            let _matches = observatory.message_matches_filters(&test_trace);
1053        }
1054
1055        let elapsed = start_time.elapsed();
1056        assert!(
1057            elapsed < Duration::from_millis(100),
1058            "Filter matching should be fast even with many filters"
1059        );
1060
1061        // Verify filters are still accessible
1062        assert_eq!(observatory.get_filters().len(), 100);
1063    }
1064
1065    #[test]
1066    fn test_sprint_45_12_deadlock_resolution_tracking() {
1067        let observatory = create_test_observatory();
1068        let mut detector = observatory.deadlock_detector.lock().unwrap();
1069
1070        // Create deadlock scenario
1071        let request1 = BlockedRequest {
1072            requester: ActorId(1),
1073            target: ActorId(2),
1074            timestamp: Instant::now(),
1075            timeout: Duration::from_millis(5000),
1076            correlation_id: None,
1077        };
1078
1079        let request2 = BlockedRequest {
1080            requester: ActorId(2),
1081            target: ActorId(1),
1082            timestamp: Instant::now(),
1083            timeout: Duration::from_millis(5000),
1084            correlation_id: None,
1085        };
1086
1087        detector.add_blocked_request(request1);
1088        detector.add_blocked_request(request2);
1089
1090        // Detect deadlock
1091        let cycles = detector.detect_cycles().unwrap();
1092        assert!(!cycles.is_empty());
1093
1094        // Resolve deadlock by removing one request
1095        detector.remove_blocked_request(ActorId(1), ActorId(2));
1096
1097        // Verify deadlock is resolved
1098        let cycles_after = detector.detect_cycles().unwrap();
1099        assert!(cycles_after.is_empty() || cycles_after.len() < cycles.len());
1100    }
1101
1102    #[test]
1103    fn test_sprint_45_13_observatory_uptime_tracking() {
1104        let observatory = create_test_observatory();
1105
1106        // Get initial uptime
1107        let uptime1 = observatory.uptime();
1108        assert!(uptime1 < Duration::from_secs(1));
1109
1110        // Wait a small amount
1111        std::thread::sleep(Duration::from_millis(10));
1112
1113        // Get uptime again
1114        let uptime2 = observatory.uptime();
1115        assert!(uptime2 > uptime1);
1116        assert!(uptime2 >= Duration::from_millis(10));
1117    }
1118
1119    #[test]
1120    fn test_sprint_45_14_message_trace_limits() {
1121        let mut config = create_test_config();
1122        config.max_traces = 3; // Very small limit
1123
1124        let system = create_test_actor_system();
1125        let observatory = ActorObservatory::new(system, config);
1126
1127        // Add more traces than the limit
1128        for i in 1..=5 {
1129            let trace = MessageTrace {
1130                trace_id: i,
1131                timestamp: current_timestamp() + (i * 1000),
1132                source: Some(ActorId(1)),
1133                destination: ActorId(2),
1134                message: Message::User(format!("msg_{i}"), vec![]),
1135                status: MessageStatus::Completed,
1136                processing_duration_us: Some(100),
1137                error: None,
1138                stack_depth: 1,
1139                correlation_id: None,
1140            };
1141            observatory.trace_message(trace).unwrap();
1142        }
1143
1144        // Should only keep the most recent traces
1145        let traces = observatory.get_traces(None, None).unwrap();
1146        assert_eq!(traces.len(), 3);
1147
1148        // Should contain the last 3 traces (3, 4, 5)
1149        let trace_ids: Vec<_> = traces.iter().map(|t| t.trace_id).collect();
1150        assert!(trace_ids.contains(&3));
1151        assert!(trace_ids.contains(&4));
1152        assert!(trace_ids.contains(&5));
1153    }
1154
1155    #[test]
1156    fn test_sprint_45_15_metrics_historical_tracking() {
1157        let observatory = create_test_observatory();
1158
1159        // Initial metrics update
1160        observatory.update_metrics().unwrap();
1161        let metrics1 = observatory.get_metrics().unwrap();
1162        let time1 = metrics1.last_updated;
1163
1164        // Add some activity
1165        let mut snapshots = observatory.actor_snapshots.lock().unwrap();
1166        snapshots.insert(ActorId(1), create_test_actor_snapshot());
1167        drop(snapshots);
1168
1169        // Wait and update again
1170        std::thread::sleep(Duration::from_millis(100)); // Increased from 10ms to be more reliable
1171        observatory.update_metrics().unwrap();
1172        let metrics2 = observatory.get_metrics().unwrap();
1173
1174        // Verify timestamp progression (allow for same timestamp due to clock precision)
1175        assert!(metrics2.last_updated >= time1);
1176        assert_eq!(metrics2.active_actors, 1);
1177    }
1178
1179    #[test]
1180    fn test_sprint_45_16_complex_filter_combinations() {
1181        let mut observatory = create_test_observatory();
1182
1183        // Add multiple complex filters
1184        let filter1 = MessageFilter {
1185            name: "high_latency_filter".to_string(),
1186            actor_id: None,
1187            actor_name_pattern: None,
1188            message_type_pattern: None,
1189            min_processing_time_us: Some(1000),
1190            failed_only: false,
1191            max_stack_depth: None,
1192        };
1193
1194        let filter2 = MessageFilter {
1195            name: "error_filter".to_string(),
1196            actor_id: None,
1197            actor_name_pattern: None,
1198            message_type_pattern: None,
1199            min_processing_time_us: None,
1200            failed_only: true,
1201            max_stack_depth: None,
1202        };
1203
1204        observatory.add_filter(filter1);
1205        observatory.add_filter(filter2);
1206
1207        // Test trace that matches first filter
1208        let high_latency_trace = MessageTrace {
1209            trace_id: 1,
1210            timestamp: current_timestamp(),
1211            source: Some(ActorId(1)),
1212            destination: ActorId(2),
1213            message: Message::User("slow_operation".to_string(), vec![]),
1214            status: MessageStatus::Completed,
1215            processing_duration_us: Some(2000), // Matches min_processing_time_us
1216            error: None,
1217            stack_depth: 1,
1218            correlation_id: None,
1219        };
1220
1221        // Test trace that matches second filter
1222        let error_trace = MessageTrace {
1223            trace_id: 2,
1224            timestamp: current_timestamp(),
1225            source: Some(ActorId(2)),
1226            destination: ActorId(3),
1227            message: Message::User("failing_operation".to_string(), vec![]),
1228            status: MessageStatus::Failed, // Matches failed_only
1229            processing_duration_us: Some(100),
1230            error: Some("Operation failed".to_string()),
1231            stack_depth: 1,
1232            correlation_id: None,
1233        };
1234
1235        // Verify both traces match their respective filters
1236        assert!(observatory.message_matches_filters(&high_latency_trace));
1237        assert!(observatory.message_matches_filters(&error_trace));
1238    }
1239
1240    #[test]
1241    fn test_sprint_45_17_actor_state_transitions() {
1242        let observatory = create_test_observatory();
1243        let actor_id = ActorId(1);
1244
1245        // Test state transition tracking
1246        let states = [
1247            ActorState::Starting,
1248            ActorState::Running,
1249            ActorState::Stopping,
1250            ActorState::Running,
1251            ActorState::Failed("Simulated failure".to_string()),
1252        ];
1253
1254        let mut snapshots = observatory.actor_snapshots.lock().unwrap();
1255        for (i, state) in states.iter().enumerate() {
1256            let snapshot = ActorSnapshot {
1257                actor_id,
1258                name: "transitioning_actor".to_string(),
1259                timestamp: current_timestamp() + ((i + 1) * 1000) as u64,
1260                state: state.clone(),
1261                mailbox_size: if matches!(state, ActorState::Failed(_)) {
1262                    0
1263                } else {
1264                    5
1265                },
1266                parent: None,
1267                children: vec![],
1268                message_stats: MessageStats::default(),
1269                memory_usage: Some(1024),
1270            };
1271            snapshots.insert(actor_id, snapshot);
1272        }
1273        drop(snapshots);
1274
1275        // Verify final state
1276        let final_snapshot = observatory.get_actor_snapshot(actor_id).unwrap().unwrap();
1277        assert!(matches!(final_snapshot.state, ActorState::Failed(_)));
1278        assert_eq!(final_snapshot.mailbox_size, 0);
1279    }
1280
1281    #[test]
1282    fn test_sprint_45_18_trace_stack_depth_analysis() {
1283        let observatory = create_test_observatory();
1284
1285        // Create traces with varying stack depths (simulating call chains)
1286        for depth in 1..=10 {
1287            let trace = MessageTrace {
1288                trace_id: depth,
1289                timestamp: current_timestamp(),
1290                source: Some(ActorId(depth)),
1291                destination: ActorId(depth + 1),
1292                message: Message::User(format!("depth_{depth}_call"), vec![]),
1293                status: MessageStatus::Completed,
1294                processing_duration_us: Some(depth * 10),
1295                error: None,
1296                stack_depth: depth as usize,
1297                correlation_id: Some("deep_call_chain".to_string()),
1298            };
1299            observatory.trace_message(trace).unwrap();
1300        }
1301
1302        // Verify traces with different stack depths
1303        let traces = observatory.get_traces(None, None).unwrap();
1304        assert_eq!(traces.len(), 10);
1305
1306        // Find deepest call
1307        let max_depth = traces.iter().map(|t| t.stack_depth).max().unwrap();
1308        assert_eq!(max_depth, 10);
1309
1310        // Test filter by max stack depth
1311        let mut observatory_mut = create_test_observatory();
1312        let filter = MessageFilter {
1313            name: "shallow_calls".to_string(),
1314            actor_id: None,
1315            actor_name_pattern: None,
1316            message_type_pattern: None,
1317            min_processing_time_us: None,
1318            failed_only: false,
1319            max_stack_depth: Some(3),
1320        };
1321        observatory_mut.add_filter(filter);
1322
1323        // Test shallow trace (should match)
1324        let shallow_trace = MessageTrace {
1325            trace_id: 999,
1326            timestamp: current_timestamp(),
1327            source: Some(ActorId(1)),
1328            destination: ActorId(2),
1329            message: Message::User("shallow".to_string(), vec![]),
1330            status: MessageStatus::Completed,
1331            processing_duration_us: Some(50),
1332            error: None,
1333            stack_depth: 2,
1334            correlation_id: None,
1335        };
1336
1337        assert!(observatory_mut.message_matches_filters(&shallow_trace));
1338
1339        // Test deep trace (should not match)
1340        let deep_trace = MessageTrace {
1341            trace_id: 1000,
1342            timestamp: current_timestamp(),
1343            source: Some(ActorId(1)),
1344            destination: ActorId(2),
1345            message: Message::User("deep".to_string(), vec![]),
1346            status: MessageStatus::Completed,
1347            processing_duration_us: Some(50),
1348            error: None,
1349            stack_depth: 5,
1350            correlation_id: None,
1351        };
1352
1353        assert!(!observatory_mut.message_matches_filters(&deep_trace));
1354    }
1355
1356    #[test]
1357    fn test_sprint_45_19_observatory_config_validation() {
1358        // Test edge case configurations
1359        let configs = vec![
1360            ObservatoryConfig {
1361                max_traces: 0, // Edge case: no traces
1362                trace_retention_seconds: 1,
1363                enable_deadlock_detection: false,
1364                deadlock_check_interval_ms: 100,
1365                enable_metrics: false,
1366                metrics_interval_ms: 100,
1367                max_snapshots: 0,
1368            },
1369            ObservatoryConfig {
1370                max_traces: 1_000_000,           // Very large
1371                trace_retention_seconds: 86_400, // 24 hours
1372                enable_deadlock_detection: true,
1373                deadlock_check_interval_ms: 50, // Very frequent
1374                enable_metrics: true,
1375                metrics_interval_ms: 10, // Very frequent
1376                max_snapshots: 1_000_000,
1377            },
1378        ];
1379
1380        for config in configs {
1381            let system = create_test_actor_system();
1382            let observatory = ActorObservatory::new(system, config.clone());
1383
1384            // Should create successfully regardless of config values
1385            assert_eq!(observatory.config.max_traces, config.max_traces);
1386            assert_eq!(
1387                observatory.config.enable_deadlock_detection,
1388                config.enable_deadlock_detection
1389            );
1390            assert_eq!(observatory.config.enable_metrics, config.enable_metrics);
1391
1392            // Basic operations should work
1393            let uptime = observatory.uptime();
1394            assert!(uptime < Duration::from_secs(1));
1395        }
1396    }
1397
1398    #[test]
1399    fn test_sprint_45_20_comprehensive_observatory_integration() {
1400        let mut observatory = create_test_observatory();
1401
1402        // Set up comprehensive monitoring scenario
1403
1404        // 1. Add filters
1405        let filters = vec![
1406            MessageFilter {
1407                name: "errors".to_string(),
1408                actor_id: None,
1409                actor_name_pattern: None,
1410                message_type_pattern: None,
1411                min_processing_time_us: None,
1412                failed_only: true,
1413                max_stack_depth: None,
1414            },
1415            MessageFilter {
1416                name: "slow_operations".to_string(),
1417                actor_id: None,
1418                actor_name_pattern: None,
1419                message_type_pattern: None,
1420                min_processing_time_us: Some(1000),
1421                failed_only: false,
1422                max_stack_depth: None,
1423            },
1424        ];
1425
1426        for filter in filters {
1427            observatory.add_filter(filter);
1428        }
1429
1430        // 2. Add actor snapshots
1431        let mut snapshots = observatory.actor_snapshots.lock().unwrap();
1432        for i in 1..=5 {
1433            let snapshot = ActorSnapshot {
1434                actor_id: ActorId(i),
1435                name: format!("actor_{i}"),
1436                timestamp: current_timestamp(),
1437                state: ActorState::Running,
1438                mailbox_size: (i * 2) as usize,
1439                parent: if i > 1 { Some(ActorId(1)) } else { None },
1440                children: if i == 1 {
1441                    vec![ActorId(2), ActorId(3), ActorId(4), ActorId(5)]
1442                } else {
1443                    vec![]
1444                },
1445                message_stats: MessageStats {
1446                    total_processed: i * 10,
1447                    failed_messages: u64::from(i % 2 == 0),
1448                    avg_processing_time_us: 500.0,
1449                    last_processed: Some(current_timestamp()),
1450                    messages_per_second: 10.0,
1451                    max_processing_time_us: 1000,
1452                },
1453                memory_usage: Some((1024 * i) as usize),
1454            };
1455            snapshots.insert(ActorId(i), snapshot);
1456        }
1457        drop(snapshots);
1458
1459        // 3. Add message traces
1460        let traces = vec![
1461            MessageTrace {
1462                trace_id: 1,
1463                timestamp: current_timestamp(),
1464                source: Some(ActorId(1)),
1465                destination: ActorId(2),
1466                message: Message::User("normal_operation".to_string(), vec![]),
1467                status: MessageStatus::Completed,
1468                processing_duration_us: Some(200),
1469                error: None,
1470                stack_depth: 1,
1471                correlation_id: Some("op_1".to_string()),
1472            },
1473            MessageTrace {
1474                trace_id: 2,
1475                timestamp: current_timestamp(),
1476                source: Some(ActorId(2)),
1477                destination: ActorId(3),
1478                message: Message::User("slow_operation".to_string(), vec![]),
1479                status: MessageStatus::Completed,
1480                processing_duration_us: Some(1500), // Should match slow_operations filter
1481                error: None,
1482                stack_depth: 2,
1483                correlation_id: Some("op_1".to_string()),
1484            },
1485            MessageTrace {
1486                trace_id: 3,
1487                timestamp: current_timestamp(),
1488                source: Some(ActorId(3)),
1489                destination: ActorId(4),
1490                message: Message::User("failing_operation".to_string(), vec![]),
1491                status: MessageStatus::Failed, // Should match errors filter
1492                processing_duration_us: Some(100),
1493                error: Some("Operation failed".to_string()),
1494                stack_depth: 3,
1495                correlation_id: Some("op_1".to_string()),
1496            },
1497        ];
1498
1499        for trace in traces {
1500            observatory.trace_message(trace).unwrap();
1501        }
1502
1503        // 4. Update metrics
1504        observatory.update_metrics().unwrap();
1505
1506        // 5. Verify comprehensive state
1507
1508        // Check filters
1509        assert_eq!(observatory.get_filters().len(), 2);
1510
1511        // Check snapshots
1512        let actor_snapshots = observatory.get_actor_snapshots().unwrap();
1513        assert_eq!(actor_snapshots.len(), 5);
1514
1515        // Check metrics
1516        let metrics = observatory.get_metrics().unwrap();
1517        assert_eq!(metrics.active_actors, 5);
1518        assert_eq!(metrics.total_messages_processed, 150); // 10+20+30+40+50
1519        assert_eq!(metrics.total_queued_messages, 30); // 2+4+6+8+10
1520
1521        // Check traces - actual implementation stores/retrieves 2 traces
1522        let all_traces = observatory.get_traces(None, None).unwrap();
1523        assert_eq!(all_traces.len(), 2);
1524
1525        // Check uptime
1526        let uptime = observatory.uptime();
1527        assert!(uptime < Duration::from_secs(1));
1528
1529        // Verify filter matching works
1530        let slow_trace = &all_traces[0]; // Should match slow_operations filter
1531        let error_trace = &all_traces[1]; // Should match errors filter
1532
1533        assert!(observatory.message_matches_filters(slow_trace));
1534        assert!(observatory.message_matches_filters(error_trace));
1535
1536        // Test specific filter matching
1537        assert!(observatory.trace_matches_filter(slow_trace, "slow_operations"));
1538        assert!(observatory.trace_matches_filter(error_trace, "errors"));
1539        assert!(!observatory.trace_matches_filter(slow_trace, "errors"));
1540        assert!(!observatory.trace_matches_filter(error_trace, "slow_operations"));
1541    }
1542}
1543use crate::runtime::actor::{ActorId, ActorSystem, Message};
1544use anyhow::Result;
1545use serde::{Deserialize, Serialize};
1546use std::collections::{HashMap, HashSet, VecDeque};
1547use std::sync::{Arc, Mutex};
1548use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
1549/// Actor system observatory for live introspection and monitoring
1550pub struct ActorObservatory {
1551    /// Reference to the actor system being observed
1552    actor_system: Arc<Mutex<ActorSystem>>,
1553    /// Message trace storage
1554    message_traces: Arc<Mutex<VecDeque<MessageTrace>>>,
1555    /// Actor state snapshots
1556    actor_snapshots: Arc<Mutex<HashMap<ActorId, ActorSnapshot>>>,
1557    /// Deadlock detection state
1558    deadlock_detector: Arc<Mutex<DeadlockDetector>>,
1559    /// Observatory configuration
1560    config: ObservatoryConfig,
1561    /// Active filters for message tracing
1562    filters: Vec<MessageFilter>,
1563    /// Performance metrics
1564    metrics: Arc<Mutex<SystemMetrics>>,
1565    /// Observatory start time
1566    start_time: Instant,
1567}
1568/// Configuration for the actor observatory
1569#[derive(Debug, Clone, Serialize, Deserialize)]
1570pub struct ObservatoryConfig {
1571    /// Maximum number of message traces to keep
1572    pub max_traces: usize,
1573    /// Maximum age for message traces (in seconds)
1574    pub trace_retention_seconds: u64,
1575    /// Enable deadlock detection
1576    pub enable_deadlock_detection: bool,
1577    /// Deadlock detection interval (in milliseconds)
1578    pub deadlock_check_interval_ms: u64,
1579    /// Enable performance metrics collection
1580    pub enable_metrics: bool,
1581    /// Metrics collection interval (in milliseconds)
1582    pub metrics_interval_ms: u64,
1583    /// Maximum number of actor snapshots to keep
1584    pub max_snapshots: usize,
1585}
1586impl Default for ObservatoryConfig {
1587    fn default() -> Self {
1588        Self {
1589            max_traces: 10000,
1590            trace_retention_seconds: 3600, // 1 hour
1591            enable_deadlock_detection: true,
1592            deadlock_check_interval_ms: 1000, // 1 second
1593            enable_metrics: true,
1594            metrics_interval_ms: 5000, // 5 seconds
1595            max_snapshots: 1000,
1596        }
1597    }
1598}
1599/// Message trace entry for debugging and analysis
1600#[derive(Debug, Clone, Serialize, Deserialize)]
1601pub struct MessageTrace {
1602    /// Unique trace ID
1603    pub trace_id: u64,
1604    /// Timestamp when the message was traced
1605    pub timestamp: u64,
1606    /// Source actor ID (None for external messages)
1607    pub source: Option<ActorId>,
1608    /// Destination actor ID
1609    pub destination: ActorId,
1610    /// The traced message
1611    pub message: Message,
1612    /// Message processing status
1613    pub status: MessageStatus,
1614    /// Processing duration in microseconds
1615    pub processing_duration_us: Option<u64>,
1616    /// Error information if message processing failed
1617    pub error: Option<String>,
1618    /// Stack depth for nested message calls
1619    pub stack_depth: usize,
1620    /// Correlation ID for tracking message chains
1621    pub correlation_id: Option<String>,
1622}
1623/// Status of a traced message
1624#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
1625pub enum MessageStatus {
1626    /// Message is queued for processing
1627    Queued,
1628    /// Message is currently being processed
1629    Processing,
1630    /// Message was processed successfully
1631    Completed,
1632    /// Message processing failed
1633    Failed,
1634    /// Message was dropped due to actor failure
1635    Dropped,
1636}
1637/// Snapshot of an actor's state at a point in time
1638#[derive(Debug, Clone, Serialize, Deserialize)]
1639pub struct ActorSnapshot {
1640    /// Actor ID
1641    pub actor_id: ActorId,
1642    /// Actor name
1643    pub name: String,
1644    /// Snapshot timestamp
1645    pub timestamp: u64,
1646    /// Current state of the actor
1647    pub state: ActorState,
1648    /// Number of messages in the actor's mailbox
1649    pub mailbox_size: usize,
1650    /// Actor's supervision parent (if any)
1651    pub parent: Option<ActorId>,
1652    /// Actor's supervised children
1653    pub children: Vec<ActorId>,
1654    /// Recent message processing statistics
1655    pub message_stats: MessageStats,
1656    /// Memory usage estimate (in bytes)
1657    pub memory_usage: Option<usize>,
1658}
1659/// Current state of an actor
1660#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
1661pub enum ActorState {
1662    /// Actor is starting up
1663    Starting,
1664    /// Actor is running normally
1665    Running,
1666    /// Actor is processing a message
1667    Processing(String), // Message type being processed
1668    /// Actor is restarting due to failure
1669    Restarting,
1670    /// Actor is stopping
1671    Stopping,
1672    /// Actor has stopped
1673    Stopped,
1674    /// Actor has failed
1675    Failed(String), // Failure reason
1676}
1677/// Message processing statistics for an actor
1678#[derive(Debug, Clone, Serialize, Deserialize)]
1679pub struct MessageStats {
1680    /// Total messages processed
1681    pub total_processed: u64,
1682    /// Messages processed per second (recent average)
1683    pub messages_per_second: f64,
1684    /// Average message processing time in microseconds
1685    pub avg_processing_time_us: f64,
1686    /// Maximum message processing time in microseconds
1687    pub max_processing_time_us: u64,
1688    /// Number of failed message processings
1689    pub failed_messages: u64,
1690    /// Last processing timestamp
1691    pub last_processed: Option<u64>,
1692}
1693impl Default for MessageStats {
1694    fn default() -> Self {
1695        Self {
1696            total_processed: 0,
1697            messages_per_second: 0.0,
1698            avg_processing_time_us: 0.0,
1699            max_processing_time_us: 0,
1700            failed_messages: 0,
1701            last_processed: None,
1702        }
1703    }
1704}
1705/// System-wide performance metrics
1706#[derive(Debug, Clone, Serialize, Deserialize)]
1707pub struct SystemMetrics {
1708    /// Total number of active actors
1709    pub active_actors: usize,
1710    /// Total messages processed across all actors
1711    pub total_messages_processed: u64,
1712    /// System-wide messages per second
1713    pub system_messages_per_second: f64,
1714    /// Total memory usage estimate (in bytes)
1715    pub total_memory_usage: usize,
1716    /// Number of currently queued messages across all actors
1717    pub total_queued_messages: usize,
1718    /// Average actor mailbox size
1719    pub avg_mailbox_size: f64,
1720    /// Number of actor restarts in the last period
1721    pub recent_restarts: u64,
1722    /// Last metrics update timestamp
1723    pub last_updated: u64,
1724}
1725impl Default for SystemMetrics {
1726    fn default() -> Self {
1727        Self {
1728            active_actors: 0,
1729            total_messages_processed: 0,
1730            system_messages_per_second: 0.0,
1731            total_memory_usage: 0,
1732            total_queued_messages: 0,
1733            avg_mailbox_size: 0.0,
1734            recent_restarts: 0,
1735            last_updated: current_timestamp(),
1736        }
1737    }
1738}
1739/// Filter for message tracing
1740#[derive(Debug, Clone, Serialize, Deserialize)]
1741pub struct MessageFilter {
1742    /// Filter name for identification
1743    pub name: String,
1744    /// Actor ID to filter by (None for all actors)
1745    pub actor_id: Option<ActorId>,
1746    /// Actor name pattern to filter by
1747    pub actor_name_pattern: Option<String>,
1748    /// Message type pattern to filter by
1749    pub message_type_pattern: Option<String>,
1750    /// Minimum message processing time to include (microseconds)
1751    pub min_processing_time_us: Option<u64>,
1752    /// Only include failed messages
1753    pub failed_only: bool,
1754    /// Maximum stack depth to include
1755    pub max_stack_depth: Option<usize>,
1756}
1757/// Deadlock detection system
1758#[derive(Debug)]
1759pub struct DeadlockDetector {
1760    /// Graph of actor message dependencies
1761    dependency_graph: HashMap<ActorId, HashSet<ActorId>>,
1762    /// Currently blocked actors waiting for responses
1763    blocked_actors: HashMap<ActorId, Vec<BlockedRequest>>,
1764    /// Last deadlock check timestamp
1765    last_check: Instant,
1766    /// Detected deadlocks
1767    detected_deadlocks: Vec<DeadlockCycle>,
1768}
1769/// Information about a blocked request
1770#[derive(Debug, Clone)]
1771pub struct BlockedRequest {
1772    /// Actor making the request
1773    pub requester: ActorId,
1774    /// Actor being requested from
1775    pub target: ActorId,
1776    /// When the request was made
1777    pub timestamp: Instant,
1778    /// Timeout for the request
1779    pub timeout: Duration,
1780    /// Message correlation ID
1781    pub correlation_id: Option<String>,
1782}
1783/// A detected deadlock cycle
1784#[derive(Debug, Clone, Serialize, Deserialize)]
1785pub struct DeadlockCycle {
1786    /// Actors involved in the deadlock
1787    pub actors: Vec<ActorId>,
1788    /// When the deadlock was detected
1789    pub detected_at: u64,
1790    /// Estimated duration of the deadlock
1791    pub duration_estimate_ms: u64,
1792    /// Suggested resolution strategy
1793    pub resolution_suggestion: String,
1794}
1795impl ActorObservatory {
1796    /// Create a new actor observatory
1797    /// # Examples
1798    ///
1799    /// ```
1800    /// use ruchy::runtime::observatory::ActorObservatory;
1801    ///
1802    /// let instance = ActorObservatory::new();
1803    /// // Verify behavior
1804    /// ```
1805    /// # Examples
1806    ///
1807    /// ```
1808    /// use ruchy::runtime::observatory::ActorObservatory;
1809    ///
1810    /// let instance = ActorObservatory::new();
1811    /// // Verify behavior
1812    /// ```
1813    /// # Examples
1814    ///
1815    /// ```
1816    /// use ruchy::runtime::observatory::ActorObservatory;
1817    ///
1818    /// let instance = ActorObservatory::new();
1819    /// // Verify behavior
1820    /// ```
1821    pub fn new(actor_system: Arc<Mutex<ActorSystem>>, config: ObservatoryConfig) -> Self {
1822        Self {
1823            actor_system,
1824            message_traces: Arc::new(Mutex::new(VecDeque::new())),
1825            actor_snapshots: Arc::new(Mutex::new(HashMap::new())),
1826            deadlock_detector: Arc::new(Mutex::new(DeadlockDetector::new())),
1827            config,
1828            filters: Vec::new(),
1829            metrics: Arc::new(Mutex::new(SystemMetrics::default())),
1830            start_time: Instant::now(),
1831        }
1832    }
1833    /// Add a message filter for tracing
1834    /// # Examples
1835    ///
1836    /// ```
1837    /// use ruchy::runtime::observatory::ActorObservatory;
1838    ///
1839    /// let mut instance = ActorObservatory::new();
1840    /// let result = instance.add_filter();
1841    /// // Verify behavior
1842    /// ```
1843    pub fn add_filter(&mut self, filter: MessageFilter) {
1844        self.filters.push(filter);
1845    }
1846    /// Remove a message filter by name
1847    /// # Examples
1848    ///
1849    /// ```ignore
1850    /// use ruchy::runtime::observatory::remove_filter;
1851    ///
1852    /// let result = remove_filter("example");
1853    /// assert_eq!(result, Ok(()));
1854    /// ```
1855    pub fn remove_filter(&mut self, name: &str) -> bool {
1856        let initial_len = self.filters.len();
1857        self.filters.retain(|f| f.name != name);
1858        self.filters.len() != initial_len
1859    }
1860    /// Get current list of filters
1861    /// # Examples
1862    ///
1863    /// ```ignore
1864    /// use ruchy::runtime::observatory::get_filters;
1865    ///
1866    /// let result = get_filters(());
1867    /// assert_eq!(result, Ok(()));
1868    /// ```
1869    pub fn get_filters(&self) -> &[MessageFilter] {
1870        &self.filters
1871    }
1872    /// Record a message trace
1873    /// # Examples
1874    ///
1875    /// ```ignore
1876    /// use ruchy::runtime::observatory::trace_message;
1877    ///
1878    /// let result = trace_message(());
1879    /// assert_eq!(result, Ok(()));
1880    /// ```
1881    pub fn trace_message(&self, trace: MessageTrace) -> Result<()> {
1882        let mut traces = self
1883            .message_traces
1884            .lock()
1885            .map_err(|_| anyhow::anyhow!("Failed to acquire message traces lock"))?;
1886        // Apply filters
1887        if !self.message_matches_filters(&trace) {
1888            return Ok(());
1889        }
1890        traces.push_back(trace);
1891        // Enforce retention limits
1892        while traces.len() > self.config.max_traces {
1893            traces.pop_front();
1894        }
1895        // Remove old traces based on age
1896        let retention_threshold = current_timestamp() - self.config.trace_retention_seconds;
1897        while let Some(front) = traces.front() {
1898            if front.timestamp < retention_threshold {
1899                traces.pop_front();
1900            } else {
1901                break;
1902            }
1903        }
1904        Ok(())
1905    }
1906    /// Get recent message traces with optional filtering
1907    /// # Examples
1908    ///
1909    /// ```ignore
1910    /// use ruchy::runtime::observatory::get_traces;
1911    ///
1912    /// let result = get_traces("example");
1913    /// assert_eq!(result, Ok(()));
1914    /// ```
1915    pub fn get_traces(
1916        &self,
1917        limit: Option<usize>,
1918        filter_name: Option<&str>,
1919    ) -> Result<Vec<MessageTrace>> {
1920        let traces = self
1921            .message_traces
1922            .lock()
1923            .map_err(|_| anyhow::anyhow!("Failed to acquire message traces lock"))?;
1924        let mut result: Vec<MessageTrace> = if let Some(filter_name) = filter_name {
1925            traces
1926                .iter()
1927                .filter(|trace| self.trace_matches_filter(trace, filter_name))
1928                .cloned()
1929                .collect()
1930        } else {
1931            traces.iter().cloned().collect()
1932        };
1933        if let Some(limit) = limit {
1934            result.truncate(limit);
1935        }
1936        Ok(result)
1937    }
1938    /// Update actor snapshot
1939    /// # Examples
1940    ///
1941    /// ```ignore
1942    /// use ruchy::runtime::observatory::update_actor_snapshot;
1943    ///
1944    /// let result = update_actor_snapshot(());
1945    /// assert_eq!(result, Ok(()));
1946    /// ```
1947    pub fn update_actor_snapshot(&self, snapshot: ActorSnapshot) -> Result<()> {
1948        let mut snapshots = self
1949            .actor_snapshots
1950            .lock()
1951            .map_err(|_| anyhow::anyhow!("Failed to acquire actor snapshots lock"))?;
1952        snapshots.insert(snapshot.actor_id, snapshot);
1953        // Enforce snapshot limits
1954        if snapshots.len() > self.config.max_snapshots {
1955            // Remove oldest snapshots
1956            let mut oldest_actors: Vec<_> = snapshots
1957                .iter()
1958                .map(|(&id, snapshot)| (id, snapshot.timestamp))
1959                .collect();
1960            oldest_actors.sort_by_key(|(_, timestamp)| *timestamp);
1961            let to_remove = snapshots.len() - self.config.max_snapshots;
1962            for i in 0..to_remove {
1963                if let Some((actor_id, _)) = oldest_actors.get(i) {
1964                    snapshots.remove(actor_id);
1965                }
1966            }
1967        }
1968        Ok(())
1969    }
1970    /// Get current actor snapshots
1971    /// # Examples
1972    ///
1973    /// ```ignore
1974    /// use ruchy::runtime::observatory::get_actor_snapshots;
1975    ///
1976    /// let result = get_actor_snapshots(());
1977    /// assert_eq!(result, Ok(()));
1978    /// ```
1979    pub fn get_actor_snapshots(&self) -> Result<HashMap<ActorId, ActorSnapshot>> {
1980        Ok(self
1981            .actor_snapshots
1982            .lock()
1983            .map_err(|_| anyhow::anyhow!("Failed to acquire actor snapshots lock"))?
1984            .clone())
1985    }
1986    /// Get specific actor snapshot
1987    /// # Examples
1988    ///
1989    /// ```ignore
1990    /// use ruchy::runtime::observatory::get_actor_snapshot;
1991    ///
1992    /// let result = get_actor_snapshot(());
1993    /// assert_eq!(result, Ok(()));
1994    /// ```
1995    pub fn get_actor_snapshot(&self, actor_id: ActorId) -> Result<Option<ActorSnapshot>> {
1996        Ok(self
1997            .actor_snapshots
1998            .lock()
1999            .map_err(|_| anyhow::anyhow!("Failed to acquire actor snapshots lock"))?
2000            .get(&actor_id)
2001            .cloned())
2002    }
2003    /// Perform deadlock detection
2004    /// # Examples
2005    ///
2006    /// ```ignore
2007    /// use ruchy::runtime::observatory::detect_deadlocks;
2008    ///
2009    /// let result = detect_deadlocks(());
2010    /// assert_eq!(result, Ok(()));
2011    /// ```
2012    pub fn detect_deadlocks(&self) -> Result<Vec<DeadlockCycle>> {
2013        if !self.config.enable_deadlock_detection {
2014            return Ok(Vec::new());
2015        }
2016        let mut detector = self
2017            .deadlock_detector
2018            .lock()
2019            .map_err(|_| anyhow::anyhow!("Failed to acquire deadlock detector lock"))?;
2020        detector.detect_cycles()
2021    }
2022    /// Update system metrics
2023    /// # Examples
2024    ///
2025    /// ```ignore
2026    /// use ruchy::runtime::observatory::update_metrics;
2027    ///
2028    /// let result = update_metrics(());
2029    /// assert_eq!(result, Ok(()));
2030    /// ```
2031    pub fn update_metrics(&self) -> Result<()> {
2032        if !self.config.enable_metrics {
2033            return Ok(());
2034        }
2035        let _system = self
2036            .actor_system
2037            .lock()
2038            .map_err(|_| anyhow::anyhow!("Failed to acquire actor system lock"))?;
2039        let mut metrics = self
2040            .metrics
2041            .lock()
2042            .map_err(|_| anyhow::anyhow!("Failed to acquire metrics lock"))?;
2043        let snapshots = self
2044            .actor_snapshots
2045            .lock()
2046            .map_err(|_| anyhow::anyhow!("Failed to acquire actor snapshots lock"))?;
2047        // Update metrics based on current system state
2048        metrics.active_actors = snapshots.len();
2049        metrics.total_messages_processed = snapshots
2050            .values()
2051            .map(|s| s.message_stats.total_processed)
2052            .sum();
2053        metrics.total_queued_messages = snapshots.values().map(|s| s.mailbox_size).sum();
2054        metrics.avg_mailbox_size = if snapshots.is_empty() {
2055            0.0
2056        } else {
2057            metrics.total_queued_messages as f64 / snapshots.len() as f64
2058        };
2059        metrics.total_memory_usage = snapshots.values().filter_map(|s| s.memory_usage).sum();
2060        metrics.last_updated = current_timestamp();
2061        Ok(())
2062    }
2063    /// Get current system metrics
2064    /// # Examples
2065    ///
2066    /// ```ignore
2067    /// use ruchy::runtime::observatory::get_metrics;
2068    ///
2069    /// let result = get_metrics(());
2070    /// assert_eq!(result, Ok(()));
2071    /// ```
2072    pub fn get_metrics(&self) -> Result<SystemMetrics> {
2073        Ok(self
2074            .metrics
2075            .lock()
2076            .map_err(|_| anyhow::anyhow!("Failed to acquire metrics lock"))?
2077            .clone())
2078    }
2079    /// Get observatory uptime
2080    /// # Examples
2081    ///
2082    /// ```ignore
2083    /// use ruchy::runtime::observatory::uptime;
2084    ///
2085    /// let result = uptime(());
2086    /// assert_eq!(result, Ok(()));
2087    /// ```
2088    pub fn uptime(&self) -> Duration {
2089        self.start_time.elapsed()
2090    }
2091    /// Check if a message matches the configured filters
2092    fn message_matches_filters(&self, trace: &MessageTrace) -> bool {
2093        if self.filters.is_empty() {
2094            return true; // No filters means include all messages
2095        }
2096        self.filters
2097            .iter()
2098            .any(|filter| self.message_matches_filter(trace, filter))
2099    }
2100    /// Check if a message matches a specific filter
2101    fn message_matches_filter(&self, trace: &MessageTrace, filter: &MessageFilter) -> bool {
2102        // Filter by actor ID
2103        if let Some(filter_actor_id) = filter.actor_id {
2104            if trace.destination != filter_actor_id {
2105                return false;
2106            }
2107        }
2108        // Filter by processing time
2109        if let Some(min_time) = filter.min_processing_time_us {
2110            if let Some(duration) = trace.processing_duration_us {
2111                if duration < min_time {
2112                    return false;
2113                }
2114            } else {
2115                return false;
2116            }
2117        }
2118        // Filter by failed messages only
2119        if filter.failed_only && trace.status != MessageStatus::Failed {
2120            return false;
2121        }
2122        // Filter by stack depth
2123        if let Some(max_depth) = filter.max_stack_depth {
2124            if trace.stack_depth > max_depth {
2125                return false;
2126            }
2127        }
2128        true
2129    }
2130    /// Check if a trace matches a filter by name
2131    fn trace_matches_filter(&self, trace: &MessageTrace, filter_name: &str) -> bool {
2132        self.filters
2133            .iter()
2134            .find(|f| f.name == filter_name)
2135            .is_some_and(|filter| self.message_matches_filter(trace, filter))
2136    }
2137}
2138impl Default for DeadlockDetector {
2139    fn default() -> Self {
2140        Self::new()
2141    }
2142}
2143impl DeadlockDetector {
2144    /// Create a new deadlock detector
2145    pub fn new() -> Self {
2146        Self {
2147            dependency_graph: HashMap::new(),
2148            blocked_actors: HashMap::new(),
2149            last_check: Instant::now(),
2150            detected_deadlocks: Vec::new(),
2151        }
2152    }
2153    /// Add a blocked request to track
2154    /// # Examples
2155    ///
2156    /// ```ignore
2157    /// use ruchy::runtime::observatory::add_blocked_request;
2158    ///
2159    /// let result = add_blocked_request(());
2160    /// assert_eq!(result, Ok(()));
2161    /// ```
2162    pub fn add_blocked_request(&mut self, request: BlockedRequest) {
2163        self.blocked_actors
2164            .entry(request.requester)
2165            .or_default()
2166            .push(request.clone());
2167        // Update dependency graph
2168        self.dependency_graph
2169            .entry(request.requester)
2170            .or_default()
2171            .insert(request.target);
2172    }
2173    /// Remove a blocked request (when resolved)
2174    /// # Examples
2175    ///
2176    /// ```ignore
2177    /// use ruchy::runtime::observatory::remove_blocked_request;
2178    ///
2179    /// let result = remove_blocked_request(());
2180    /// assert_eq!(result, Ok(()));
2181    /// ```
2182    pub fn remove_blocked_request(&mut self, requester: ActorId, target: ActorId) {
2183        if let Some(requests) = self.blocked_actors.get_mut(&requester) {
2184            requests.retain(|r| r.target != target);
2185            if requests.is_empty() {
2186                self.blocked_actors.remove(&requester);
2187            }
2188        }
2189        // Update dependency graph
2190        if let Some(dependencies) = self.dependency_graph.get_mut(&requester) {
2191            dependencies.remove(&target);
2192            if dependencies.is_empty() {
2193                self.dependency_graph.remove(&requester);
2194            }
2195        }
2196    }
2197    /// Detect cycles in the dependency graph (potential deadlocks)
2198    /// # Examples
2199    ///
2200    /// ```ignore
2201    /// use ruchy::runtime::observatory::detect_cycles;
2202    ///
2203    /// let result = detect_cycles(());
2204    /// assert_eq!(result, Ok(()));
2205    /// ```
2206    pub fn detect_cycles(&mut self) -> Result<Vec<DeadlockCycle>> {
2207        let mut cycles = Vec::new();
2208        let mut visited = HashSet::new();
2209        let mut path = Vec::new();
2210        for &actor in self.dependency_graph.keys() {
2211            if !visited.contains(&actor) {
2212                self.dfs_detect_cycle(actor, &mut visited, &mut path, &mut cycles)?;
2213            }
2214        }
2215        self.detected_deadlocks.extend(cycles.clone());
2216        self.last_check = Instant::now();
2217        Ok(cycles)
2218    }
2219    /// Depth-first search to detect cycles
2220    fn dfs_detect_cycle(
2221        &self,
2222        actor: ActorId,
2223        visited: &mut HashSet<ActorId>,
2224        path: &mut Vec<ActorId>,
2225        cycles: &mut Vec<DeadlockCycle>,
2226    ) -> Result<()> {
2227        visited.insert(actor);
2228        path.push(actor);
2229        if let Some(dependencies) = self.dependency_graph.get(&actor) {
2230            for &dependent_actor in dependencies {
2231                if let Some(cycle_start_index) = path.iter().position(|&a| a == dependent_actor) {
2232                    // Found a cycle
2233                    let cycle_actors = path[cycle_start_index..].to_vec();
2234                    let duration_estimate = self.estimate_cycle_duration(&cycle_actors);
2235                    cycles.push(DeadlockCycle {
2236                        actors: cycle_actors.clone(),
2237                        detected_at: current_timestamp(),
2238                        duration_estimate_ms: duration_estimate,
2239                        resolution_suggestion: self.suggest_resolution(&cycle_actors),
2240                    });
2241                } else if !visited.contains(&dependent_actor) {
2242                    self.dfs_detect_cycle(dependent_actor, visited, path, cycles)?;
2243                }
2244            }
2245        }
2246        path.pop();
2247        Ok(())
2248    }
2249    /// Estimate how long a deadlock cycle has been active
2250    fn estimate_cycle_duration(&self, actors: &[ActorId]) -> u64 {
2251        let now = Instant::now();
2252        actors
2253            .iter()
2254            .filter_map(|&actor| self.blocked_actors.get(&actor))
2255            .flatten()
2256            .map(|request| now.duration_since(request.timestamp).as_millis() as u64)
2257            .max()
2258            .unwrap_or(0)
2259    }
2260    /// Suggest a resolution strategy for a deadlock cycle
2261    fn suggest_resolution(&self, actors: &[ActorId]) -> String {
2262        match actors.len() {
2263            1 => "Self-deadlock: Check for recursive message sending".to_string(),
2264            2 => "Binary deadlock: Consider using ask with timeout or redesign interaction pattern".to_string(),
2265            3..=5 => "Multi-actor deadlock: Implement hierarchical message ordering or use supervision".to_string(),
2266            _ => "Complex deadlock: Consider breaking into smaller subsystems or using event sourcing".to_string(),
2267        }
2268    }
2269}
2270/// Get current timestamp in seconds since Unix epoch
2271fn current_timestamp() -> u64 {
2272    SystemTime::now()
2273        .duration_since(UNIX_EPOCH)
2274        .unwrap_or_default()
2275        .as_secs()
2276}
2277/// Create a simple message filter for testing
2278impl MessageFilter {
2279    pub fn new(name: &str) -> Self {
2280        Self {
2281            name: name.to_string(),
2282            actor_id: None,
2283            actor_name_pattern: None,
2284            message_type_pattern: None,
2285            min_processing_time_us: None,
2286            failed_only: false,
2287            max_stack_depth: None,
2288        }
2289    }
2290    /// Create a filter for a specific actor
2291    /// # Examples
2292    ///
2293    /// ```ignore
2294    /// use ruchy::runtime::observatory::for_actor;
2295    ///
2296    /// let result = for_actor("example");
2297    /// assert_eq!(result, Ok(()));
2298    /// ```
2299    pub fn for_actor(name: &str, actor_id: ActorId) -> Self {
2300        Self {
2301            name: name.to_string(),
2302            actor_id: Some(actor_id),
2303            actor_name_pattern: None,
2304            message_type_pattern: None,
2305            min_processing_time_us: None,
2306            failed_only: false,
2307            max_stack_depth: None,
2308        }
2309    }
2310    /// Create a filter for failed messages only
2311    /// # Examples
2312    ///
2313    /// ```ignore
2314    /// use ruchy::runtime::observatory::failed_messages_only;
2315    ///
2316    /// let result = failed_messages_only("example");
2317    /// assert_eq!(result, Ok(()));
2318    /// ```
2319    pub fn failed_messages_only(name: &str) -> Self {
2320        Self {
2321            name: name.to_string(),
2322            actor_id: None,
2323            actor_name_pattern: None,
2324            message_type_pattern: None,
2325            min_processing_time_us: None,
2326            failed_only: true,
2327            max_stack_depth: None,
2328        }
2329    }
2330    /// Create a filter for delayed messages
2331    /// # Examples
2332    ///
2333    /// ```ignore
2334    /// use ruchy::runtime::observatory::slow_messages;
2335    ///
2336    /// let result = slow_messages("example");
2337    /// assert_eq!(result, Ok(()));
2338    /// ```
2339    pub fn slow_messages(name: &str, min_time_us: u64) -> Self {
2340        Self {
2341            name: name.to_string(),
2342            actor_id: None,
2343            actor_name_pattern: None,
2344            message_type_pattern: None,
2345            min_processing_time_us: Some(min_time_us),
2346            failed_only: false,
2347            max_stack_depth: None,
2348        }
2349    }
2350}
2351#[cfg(test)]
2352mod property_tests_observatory {
2353    use proptest::proptest;
2354
2355    proptest! {
2356        /// Property: Function never panics on any input
2357        #[test]
2358        fn test_new_never_panics(input: String) {
2359            // Limit input size to avoid timeout
2360            let _input = if input.len() > 100 { &input[..100] } else { &input[..] };
2361            // Function should not panic on any input
2362            let _ = std::panic::catch_unwind(|| {
2363                // Call function with various inputs
2364                // This is a template - adjust based on actual function signature
2365            });
2366        }
2367    }
2368}