Skip to main content

rill_patchbay/
observer.rs

1//! Micro-control observer — RT safety monitor with actor-based telemetry.
2
3use parking_lot::RwLock;
4use rill_core::queues::telemetry::Telemetry;
5use rill_core::traits::{ParameterId, PortId};
6use rill_core_actor::ActorRef;
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{SystemTime, UNIX_EPOCH};
10
/// Aggregated timing statistics for one observed component.
///
/// Every counter starts at zero via the derived `Default`.
#[derive(Debug, Clone, Default)]
pub struct ComponentStats {
    /// How many operations have been recorded.
    pub operations: u64,
    /// Sum of all recorded execution times, in nanoseconds.
    pub total_time_ns: u64,
    /// Longest single execution time seen so far, in nanoseconds.
    pub max_time_ns: u64,
    /// How many timing violations have been recorded.
    pub violations: u64,
    /// Mean execution time per operation, in nanoseconds.
    pub avg_time_ns: f64,
}
25
/// A single recorded time-budget violation.
#[derive(Debug, Clone)]
pub struct Violation {
    /// Component that exceeded its time budget.
    pub component: String,
    /// Time budget the component was expected to meet, in nanoseconds.
    pub expected_ns: u64,
    /// Time the component actually took, in nanoseconds.
    pub actual_ns: u64,
    /// Moment the violation was recorded (microseconds since the UNIX epoch).
    pub timestamp: u64,
    /// Optional value attached to the violation by the reporter.
    pub value: Option<f32>,
}
40
/// Roll-up of the statistics of every observed component.
#[derive(Debug, Default, Clone)]
pub struct SandboxSummary {
    /// Operations recorded, summed over all components.
    pub total_operations: u64,
    /// Violations counted, summed over all components.
    pub total_violations: u64,
    /// Names of the components that have statistics.
    pub components: Vec<String>,
    /// Largest single operation time over all components, in nanoseconds.
    pub max_time_ns: u64,
    /// Component that produced `max_time_ns`, if any.
    pub max_time_component: Option<String>,
    /// Number of stored violation records.
    pub violations_count: usize,
}
57
58/// Micro-control observer using `ActorRef<Telemetry>` for event dispatch.
59#[derive(Clone)]
60pub struct MicroControlObserver {
61    stats: Arc<RwLock<HashMap<String, ComponentStats>>>,
62    violations: Arc<RwLock<Vec<Violation>>>,
63    telemetry_tx: Option<ActorRef<Telemetry>>,
64}
65
66impl Default for MicroControlObserver {
67    fn default() -> Self {
68        Self::new()
69    }
70}
71
72impl MicroControlObserver {
73    /// Create an observer without telemetry reporting.
74    pub fn new() -> Self {
75        Self {
76            stats: Arc::new(RwLock::new(HashMap::new())),
77            violations: Arc::new(RwLock::new(Vec::new())),
78            telemetry_tx: None,
79        }
80    }
81
82    /// Create an observer that sends events to the given actor.
83    pub fn with_actor(tx: ActorRef<Telemetry>) -> Self {
84        Self {
85            stats: Arc::new(RwLock::new(HashMap::new())),
86            violations: Arc::new(RwLock::new(Vec::new())),
87            telemetry_tx: Some(tx),
88        }
89    }
90
91    fn send_telemetry(&self, event: Telemetry) {
92        if let Some(ref tx) = self.telemetry_tx {
93            tx.send(event);
94        }
95    }
96
97    /// Start observing an operation for the given component.
98    pub fn observe_start(&self, component: &str) -> OperationGuard {
99        OperationGuard {
100            component: component.to_string(),
101            start_time: Self::now(),
102            observer: self.clone(),
103        }
104    }
105
106    /// Start observing with parameter context (port + parameter).
107    pub fn observe_start_with_params(
108        &self,
109        component: &str,
110        port: PortId,
111        _parameter: &ParameterId,
112    ) -> OperationGuard {
113        let guard = self.observe_start(component);
114        self.send_telemetry(Telemetry::event(
115            "observer",
116            "micro_start",
117            vec![port.node_id().inner() as f32, port.index() as f32],
118        ));
119        guard
120    }
121
122    /// Record a timing violation for a component.
123    pub fn record_violation(
124        &self,
125        component: &str,
126        expected_ns: u64,
127        actual_ns: u64,
128        value: Option<f32>,
129    ) {
130        let violation = Violation {
131            component: component.to_string(),
132            expected_ns,
133            actual_ns,
134            timestamp: Self::now(),
135            value,
136        };
137        self.violations.write().push(violation.clone());
138        let mut stats = self.stats.write();
139        let comp_stats = stats.entry(component.to_string()).or_default();
140        comp_stats.violations += 1;
141        self.send_telemetry(Telemetry::violation(
142            component,
143            expected_ns,
144            actual_ns,
145            value,
146        ));
147        println!(
148            "⚠️ Violation in {}: {}ns (expected {}ns)",
149            component, actual_ns, expected_ns
150        );
151    }
152
153    /// Get statistics for a specific component.
154    pub fn component_stats(&self, component: &str) -> Option<ComponentStats> {
155        self.stats.read().get(component).cloned()
156    }
157
158    /// Get all recorded violations.
159    pub fn violations(&self) -> Vec<Violation> {
160        self.violations.read().clone()
161    }
162
163    /// Get a summary of the entire sandbox.
164    pub fn sandbox_summary(&self) -> SandboxSummary {
165        let stats = self.stats.read();
166        let mut summary = SandboxSummary::default();
167        for (component, comp_stats) in stats.iter() {
168            summary.total_operations += comp_stats.operations;
169            summary.total_violations += comp_stats.violations;
170            summary.components.push(component.clone());
171            if comp_stats.max_time_ns > summary.max_time_ns {
172                summary.max_time_ns = comp_stats.max_time_ns;
173                summary.max_time_component = Some(component.clone());
174            }
175        }
176        summary.violations_count = self.violations.read().len();
177        summary
178    }
179
180    fn now() -> u64 {
181        SystemTime::now()
182            .duration_since(UNIX_EPOCH)
183            .unwrap_or_default()
184            .as_micros() as u64
185    }
186}
187
188/// Guard that automatically records operation completion
189pub struct OperationGuard {
190    component: String,
191    start_time: u64,
192    observer: MicroControlObserver,
193}
194
195impl Drop for OperationGuard {
196    fn drop(&mut self) {
197        let duration = (Self::now() - self.start_time) * 1000;
198        let mut stats = self.observer.stats.write();
199        let comp_stats = stats.entry(self.component.clone()).or_default();
200        comp_stats.operations += 1;
201        comp_stats.total_time_ns += duration;
202        if duration > comp_stats.max_time_ns {
203            comp_stats.max_time_ns = duration;
204        }
205        comp_stats.avg_time_ns = comp_stats.total_time_ns as f64 / comp_stats.operations as f64;
206        self.observer.send_telemetry(Telemetry::event(
207            "observer",
208            "micro_complete",
209            vec![duration as f32],
210        ));
211    }
212}
213
214impl OperationGuard {
215    fn now() -> u64 {
216        SystemTime::now()
217            .duration_since(UNIX_EPOCH)
218            .unwrap_or_default()
219            .as_micros() as u64
220    }
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use rill_core_actor::ActorSystem;
    use std::sync::{Arc, Mutex};

    /// A freshly created observer reports no operations.
    #[test]
    fn test_observer_creation() {
        let summary = MicroControlObserver::new().sandbox_summary();
        assert_eq!(summary.total_operations, 0);
    }

    /// Recording a violation updates the summary and the violation log, and
    /// any violation event that reaches the actor names the right component.
    #[test]
    fn test_observer_record_violation() {
        let system = ActorSystem::new();
        let received = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&received);
        let mut actor = system.spawn("telemetry", move |msg: Telemetry| {
            sink.lock().unwrap().push(msg);
        });
        let observer = MicroControlObserver::with_actor(actor.actor_ref());

        observer.record_violation("test_comp", 100, 250, Some(0.5));

        assert_eq!(observer.sandbox_summary().total_violations, 1);
        let recorded = observer.violations();
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].component, "test_comp");

        actor.drain();
        let events = received.lock().unwrap();
        events
            .iter()
            .filter_map(|evt| match evt {
                Telemetry::Violation { component, .. } => Some(component),
                _ => None,
            })
            .for_each(|component| assert_eq!(component, "test_comp"));
    }

    /// Dropping a guard records one operation, and any generic event that
    /// reaches the actor is a `micro_complete` event.
    #[test]
    fn test_observer_operation_guard() {
        let system = ActorSystem::new();
        let received = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&received);
        let mut actor = system.spawn("telemetry", move |msg: Telemetry| {
            sink.lock().unwrap().push(msg);
        });
        let observer = MicroControlObserver::with_actor(actor.actor_ref());

        // The guard is dropped at the end of this scope, which records the
        // operation.
        {
            let _guard = observer.observe_start("test_op");
            std::thread::sleep(std::time::Duration::from_micros(10));
        }

        assert_eq!(observer.sandbox_summary().total_operations, 1);

        actor.drain();
        let events = received.lock().unwrap();
        events
            .iter()
            .filter_map(|evt| match evt {
                Telemetry::Event { kind, .. } => Some(kind),
                _ => None,
            })
            .for_each(|kind| assert_eq!(kind, "micro_complete"));
    }
}
283}