1use parking_lot::RwLock;
4use rill_core::queues::telemetry::Telemetry;
5use rill_core::traits::{ParameterId, PortId};
6use rill_core_actor::ActorRef;
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{SystemTime, UNIX_EPOCH};
10
/// Aggregated timing and violation counters for one named component.
#[derive(Clone, Debug, Default)]
pub struct ComponentStats {
    /// Number of completed operations recorded for the component.
    pub operations: u64,
    /// Sum of all recorded operation durations, in nanoseconds.
    pub total_time_ns: u64,
    /// Longest single recorded operation duration, in nanoseconds.
    pub max_time_ns: u64,
    /// Number of violations recorded against the component.
    pub violations: u64,
    /// Mean operation duration in nanoseconds (`total_time_ns / operations`).
    pub avg_time_ns: f64,
}
25
/// A single recorded timing violation.
#[derive(Clone, Debug)]
pub struct Violation {
    /// Name of the component the violation was recorded against.
    pub component: String,
    /// Expected duration, in nanoseconds.
    pub expected_ns: u64,
    /// Actually observed duration, in nanoseconds.
    pub actual_ns: u64,
    /// Time at which the violation was recorded.
    ///
    /// NOTE(review): the recorder in this file fills this with wall-clock
    /// microseconds since the Unix epoch — confirm for other producers.
    pub timestamp: u64,
    /// Optional value supplied by the caller together with the violation.
    pub value: Option<f32>,
}
40
/// Roll-up of the statistics of every observed component.
#[derive(Clone, Debug, Default)]
pub struct SandboxSummary {
    /// Operations summed over all components.
    pub total_operations: u64,
    /// Per-component violation counters summed over all components.
    pub total_violations: u64,
    /// Names of all components that have statistics.
    pub components: Vec<String>,
    /// Largest `max_time_ns` found among the components.
    pub max_time_ns: u64,
    /// Component that produced `max_time_ns`, if any non-zero time was seen.
    pub max_time_component: Option<String>,
    /// Number of stored violation records.
    pub violations_count: usize,
}
57
58#[derive(Clone)]
60pub struct MicroControlObserver {
61 stats: Arc<RwLock<HashMap<String, ComponentStats>>>,
62 violations: Arc<RwLock<Vec<Violation>>>,
63 telemetry_tx: Option<ActorRef<Telemetry>>,
64}
65
66impl Default for MicroControlObserver {
67 fn default() -> Self {
68 Self::new()
69 }
70}
71
72impl MicroControlObserver {
73 pub fn new() -> Self {
75 Self {
76 stats: Arc::new(RwLock::new(HashMap::new())),
77 violations: Arc::new(RwLock::new(Vec::new())),
78 telemetry_tx: None,
79 }
80 }
81
82 pub fn with_actor(tx: ActorRef<Telemetry>) -> Self {
84 Self {
85 stats: Arc::new(RwLock::new(HashMap::new())),
86 violations: Arc::new(RwLock::new(Vec::new())),
87 telemetry_tx: Some(tx),
88 }
89 }
90
91 fn send_telemetry(&self, event: Telemetry) {
92 if let Some(ref tx) = self.telemetry_tx {
93 tx.send(event);
94 }
95 }
96
97 pub fn observe_start(&self, component: &str) -> OperationGuard {
99 OperationGuard {
100 component: component.to_string(),
101 start_time: Self::now(),
102 observer: self.clone(),
103 }
104 }
105
106 pub fn observe_start_with_params(
108 &self,
109 component: &str,
110 port: PortId,
111 _parameter: &ParameterId,
112 ) -> OperationGuard {
113 let guard = self.observe_start(component);
114 self.send_telemetry(Telemetry::event(
115 "observer",
116 "micro_start",
117 vec![port.node_id().inner() as f32, port.index() as f32],
118 ));
119 guard
120 }
121
122 pub fn record_violation(
124 &self,
125 component: &str,
126 expected_ns: u64,
127 actual_ns: u64,
128 value: Option<f32>,
129 ) {
130 let violation = Violation {
131 component: component.to_string(),
132 expected_ns,
133 actual_ns,
134 timestamp: Self::now(),
135 value,
136 };
137 self.violations.write().push(violation.clone());
138 let mut stats = self.stats.write();
139 let comp_stats = stats.entry(component.to_string()).or_default();
140 comp_stats.violations += 1;
141 self.send_telemetry(Telemetry::violation(
142 component,
143 expected_ns,
144 actual_ns,
145 value,
146 ));
147 println!(
148 "⚠️ Violation in {}: {}ns (expected {}ns)",
149 component, actual_ns, expected_ns
150 );
151 }
152
153 pub fn component_stats(&self, component: &str) -> Option<ComponentStats> {
155 self.stats.read().get(component).cloned()
156 }
157
158 pub fn violations(&self) -> Vec<Violation> {
160 self.violations.read().clone()
161 }
162
163 pub fn sandbox_summary(&self) -> SandboxSummary {
165 let stats = self.stats.read();
166 let mut summary = SandboxSummary::default();
167 for (component, comp_stats) in stats.iter() {
168 summary.total_operations += comp_stats.operations;
169 summary.total_violations += comp_stats.violations;
170 summary.components.push(component.clone());
171 if comp_stats.max_time_ns > summary.max_time_ns {
172 summary.max_time_ns = comp_stats.max_time_ns;
173 summary.max_time_component = Some(component.clone());
174 }
175 }
176 summary.violations_count = self.violations.read().len();
177 summary
178 }
179
180 fn now() -> u64 {
181 SystemTime::now()
182 .duration_since(UNIX_EPOCH)
183 .unwrap_or_default()
184 .as_micros() as u64
185 }
186}
187
188pub struct OperationGuard {
190 component: String,
191 start_time: u64,
192 observer: MicroControlObserver,
193}
194
195impl Drop for OperationGuard {
196 fn drop(&mut self) {
197 let duration = (Self::now() - self.start_time) * 1000;
198 let mut stats = self.observer.stats.write();
199 let comp_stats = stats.entry(self.component.clone()).or_default();
200 comp_stats.operations += 1;
201 comp_stats.total_time_ns += duration;
202 if duration > comp_stats.max_time_ns {
203 comp_stats.max_time_ns = duration;
204 }
205 comp_stats.avg_time_ns = comp_stats.total_time_ns as f64 / comp_stats.operations as f64;
206 self.observer.send_telemetry(Telemetry::event(
207 "observer",
208 "micro_complete",
209 vec![duration as f32],
210 ));
211 }
212}
213
214impl OperationGuard {
215 fn now() -> u64 {
216 SystemTime::now()
217 .duration_since(UNIX_EPOCH)
218 .unwrap_or_default()
219 .as_micros() as u64
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use rill_core_actor::ActorSystem;
    use std::sync::{Arc, Mutex};

    /// A freshly created observer reports no operations.
    #[test]
    fn test_observer_creation() {
        let observer = MicroControlObserver::new();
        let summary = observer.sandbox_summary();
        assert_eq!(summary.total_operations, 0);
    }

    /// Recording a violation updates the summary and the violation list, and
    /// any violation event that reaches the actor names the right component.
    #[test]
    fn test_observer_record_violation() {
        let system = ActorSystem::new();
        let inbox = Arc::new(Mutex::new(Vec::new()));
        let sink = inbox.clone();
        let mut actor = system.spawn("telemetry", move |msg: Telemetry| {
            sink.lock().unwrap().push(msg);
        });
        let observer = MicroControlObserver::with_actor(actor.actor_ref());

        observer.record_violation("test_comp", 100, 250, Some(0.5));

        let summary = observer.sandbox_summary();
        assert_eq!(summary.total_violations, 1);

        let recorded = observer.violations();
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].component, "test_comp");

        actor.drain();
        let delivered = inbox.lock().unwrap();
        for message in delivered.iter() {
            if let Telemetry::Violation { component, .. } = message {
                assert_eq!(component, "test_comp");
            }
        }
    }

    /// Dropping a guard counts one operation, and any generic event that
    /// reaches the actor is of kind `micro_complete`.
    #[test]
    fn test_observer_operation_guard() {
        let system = ActorSystem::new();
        let inbox = Arc::new(Mutex::new(Vec::new()));
        let sink = inbox.clone();
        let mut actor = system.spawn("telemetry", move |msg: Telemetry| {
            sink.lock().unwrap().push(msg);
        });
        let observer = MicroControlObserver::with_actor(actor.actor_ref());

        {
            // The guard is dropped at the end of this scope, which records the operation.
            let _guard = observer.observe_start("test_op");
            std::thread::sleep(std::time::Duration::from_micros(10));
        }

        let summary = observer.sandbox_summary();
        assert_eq!(summary.total_operations, 1);

        actor.drain();
        let delivered = inbox.lock().unwrap();
        for message in delivered.iter() {
            if let Telemetry::Event { kind, .. } = message {
                assert_eq!(kind, "micro_complete");
            }
        }
    }
}