1use super::telemetry::Telemetry;
4use crate::traits::{ParameterId, PortId};
5use parking_lot::RwLock;
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9
10#[derive(Debug, Clone, Default)]
12pub struct ComponentStats {
13 pub operations: u64,
15 pub total_time_ns: u64,
17 pub max_time_ns: u64,
19 pub violations: u64,
21 pub avg_time_ns: f64,
23}
24
25#[derive(Debug, Clone)]
27pub struct Violation {
28 pub component: String,
30 pub expected_ns: u64,
32 pub actual_ns: u64,
34 pub timestamp: u64,
36 pub value: Option<f32>,
38}
39
40#[derive(Debug, Default, Clone)]
42pub struct SandboxSummary {
43 pub total_operations: u64,
45 pub total_violations: u64,
47 pub components: Vec<String>,
49 pub max_time_ns: u64,
51 pub max_time_component: Option<String>,
53 pub violations_count: usize,
55}
56
57#[derive(Debug, Clone)]
59pub struct MicroControlPermit {
60 enabled: Arc<std::sync::atomic::AtomicBool>,
62 max_time_ns: u64,
64 component: String,
66}
67
68impl MicroControlPermit {
69 pub fn new(component: impl Into<String>, max_time_ns: u64) -> Self {
71 Self {
72 enabled: Arc::new(std::sync::atomic::AtomicBool::new(true)),
73 max_time_ns,
74 component: component.into(),
75 }
76 }
77
78 pub fn is_allowed(&self) -> bool {
80 self.enabled.load(std::sync::atomic::Ordering::Relaxed)
81 }
82
83 pub fn revoke(&self) {
85 self.enabled
86 .store(false, std::sync::atomic::Ordering::Relaxed);
87 }
88
89 pub fn max_time_ns(&self) -> u64 {
91 self.max_time_ns
92 }
93
94 pub fn component(&self) -> &str {
96 &self.component
97 }
98}
99
100#[derive(Clone)]
102pub struct MicroControlObserver {
103 stats: Arc<RwLock<HashMap<String, ComponentStats>>>,
105
106 violations: Arc<RwLock<Vec<Violation>>>,
108
109 telemetry_tx: crossbeam_channel::Sender<Telemetry>,
111}
112
113impl MicroControlObserver {
114 pub fn new(telemetry: super::telemetry::TelemetryQueue) -> Self {
116 Self {
117 stats: Arc::new(RwLock::new(HashMap::new())),
118 violations: Arc::new(RwLock::new(Vec::new())),
119 telemetry_tx: telemetry.sender(),
120 }
121 }
122
123 pub fn with_sender(telemetry_tx: crossbeam_channel::Sender<Telemetry>) -> Self {
125 Self {
126 stats: Arc::new(RwLock::new(HashMap::new())),
127 violations: Arc::new(RwLock::new(Vec::new())),
128 telemetry_tx,
129 }
130 }
131
132 pub fn observe_start(&self, component: &str) -> OperationGuard {
134 OperationGuard {
135 component: component.to_string(),
136 start_time: Self::now(),
137 observer: self.clone(),
138 }
139 }
140
141 pub fn observe_start_with_params(
143 &self,
144 component: &str,
145 port: PortId,
146 _parameter: &ParameterId,
147 ) -> OperationGuard {
148 let guard = self.observe_start(component);
149
150 let _ = self.telemetry_tx.send(Telemetry::event(
152 "observer",
153 "micro_start",
154 vec![port.node_id().inner() as f32, port.index() as f32],
155 ));
156
157 guard
158 }
159
160 pub fn record_violation(
162 &self,
163 component: &str,
164 expected_ns: u64,
165 actual_ns: u64,
166 value: Option<f32>,
167 ) {
168 let violation = Violation {
169 component: component.to_string(),
170 expected_ns,
171 actual_ns,
172 timestamp: Self::now(),
173 value,
174 };
175
176 self.violations.write().push(violation.clone());
178
179 let mut stats = self.stats.write();
181 let comp_stats = stats.entry(component.to_string()).or_default();
182 comp_stats.violations += 1;
183
184 let _ = self.telemetry_tx.send(Telemetry::violation(
186 component,
187 expected_ns,
188 actual_ns,
189 value,
190 ));
191
192 println!(
194 "⚠️ Нарушение в {}: {}нс (ожидалось {}нс)",
195 component, actual_ns, expected_ns
196 );
197 }
198
199 pub fn component_stats(&self, component: &str) -> Option<ComponentStats> {
201 self.stats.read().get(component).cloned()
202 }
203
204 pub fn violations(&self) -> Vec<Violation> {
206 self.violations.read().clone()
207 }
208
209 pub fn sandbox_summary(&self) -> SandboxSummary {
211 let stats = self.stats.read();
212 let mut summary = SandboxSummary::default();
213
214 for (component, comp_stats) in stats.iter() {
215 summary.total_operations += comp_stats.operations;
216 summary.total_violations += comp_stats.violations;
217 summary.components.push(component.clone());
218
219 if comp_stats.max_time_ns > summary.max_time_ns {
220 summary.max_time_ns = comp_stats.max_time_ns;
221 summary.max_time_component = Some(component.clone());
222 }
223 }
224
225 summary.violations_count = self.violations.read().len();
226 summary
227 }
228
229 fn now() -> u64 {
231 SystemTime::now()
232 .duration_since(UNIX_EPOCH)
233 .unwrap_or_default()
234 .as_micros() as u64
235 }
236}
237
238pub struct OperationGuard {
240 component: String,
241 start_time: u64,
242 observer: MicroControlObserver,
243}
244
245impl Drop for OperationGuard {
246 fn drop(&mut self) {
247 let duration = (Self::now() - self.start_time) * 1000; let mut stats = self.observer.stats.write();
251 let comp_stats = stats.entry(self.component.clone()).or_default();
252 comp_stats.operations += 1;
253 comp_stats.total_time_ns += duration;
254 if duration > comp_stats.max_time_ns {
255 comp_stats.max_time_ns = duration;
256 }
257 comp_stats.avg_time_ns = comp_stats.total_time_ns as f64 / comp_stats.operations as f64;
258
259 let _ = self.observer.telemetry_tx.send(Telemetry::event(
261 "observer",
262 "micro_complete",
263 vec![duration as f32],
264 ));
265 }
266}
267
268impl OperationGuard {
269 fn now() -> u64 {
270 SystemTime::now()
271 .duration_since(UNIX_EPOCH)
272 .unwrap_or_default()
273 .as_micros() as u64
274 }
275}
276
277#[cfg(test)]
278mod tests {
279 use super::*;
280 use crate::queues::telemetry::TelemetryQueue;
281
282 #[test]
283 fn test_observer_creation() {
284 let (tx, _rx) = crossbeam_channel::unbounded();
285 let observer = MicroControlObserver::with_sender(tx);
286
287 let stats = observer.sandbox_summary();
288 assert_eq!(stats.total_operations, 0);
289 assert_eq!(stats.total_violations, 0);
290 }
291
292 #[test]
293 fn test_observer_record_violation() {
294 let (tx, rx) = crossbeam_channel::unbounded();
295 let observer = MicroControlObserver::with_sender(tx);
296
297 observer.record_violation("test_comp", 100, 250, Some(0.5));
298
299 let stats = observer.sandbox_summary();
300 assert_eq!(stats.total_violations, 1);
301
302 let violations = observer.violations();
303 assert_eq!(violations.len(), 1);
304 assert_eq!(violations[0].component, "test_comp");
305 assert_eq!(violations[0].expected_ns, 100);
306 assert_eq!(violations[0].actual_ns, 250);
307 assert_eq!(violations[0].value, Some(0.5));
308
309 let telemetry = rx.try_recv().unwrap();
311 match telemetry {
312 Telemetry::Violation {
313 component,
314 expected_ns,
315 actual_ns,
316 value,
317 ..
318 } => {
319 assert_eq!(component, "test_comp");
320 assert_eq!(expected_ns, 100);
321 assert_eq!(actual_ns, 250);
322 assert_eq!(value, Some(0.5));
323 }
324 _ => panic!("Expected violation telemetry"),
325 }
326 }
327
328 #[test]
329 fn test_observer_operation_guard() {
330 let (tx, rx) = crossbeam_channel::unbounded();
331 let observer = MicroControlObserver::with_sender(tx);
332
333 {
334 let _guard = observer.observe_start("test_op");
335 std::thread::sleep(std::time::Duration::from_micros(10));
336 } let stats = observer.sandbox_summary();
339 assert_eq!(stats.total_operations, 1);
340 assert!(stats.max_time_ns > 0);
341
342 let telemetry = rx.try_recv().unwrap();
344 match telemetry {
345 Telemetry::Event { kind, .. } => {
346 assert_eq!(kind, "micro_complete");
347 }
348 _ => panic!("Expected event telemetry"),
349 }
350 }
351
352 #[test]
353 fn test_observer_component_stats() {
354 let (tx, _rx) = crossbeam_channel::unbounded();
355 let observer = MicroControlObserver::with_sender(tx);
356
357 for i in 0..5 {
358 let _guard = observer.observe_start("comp1");
359 std::thread::sleep(std::time::Duration::from_micros(i * 10));
360 }
361
362 for i in 0..3 {
363 let _guard = observer.observe_start("comp2");
364 std::thread::sleep(std::time::Duration::from_micros(i * 20));
365 }
366
367 let stats = observer.sandbox_summary();
368 assert_eq!(stats.total_operations, 8);
369 assert_eq!(stats.components.len(), 2);
370
371 let comp1_stats = observer.component_stats("comp1").unwrap();
372 assert_eq!(comp1_stats.operations, 5);
373 assert!(comp1_stats.avg_time_ns > 0.0);
374
375 let comp2_stats = observer.component_stats("comp2").unwrap();
376 assert_eq!(comp2_stats.operations, 3);
377 }
378}