1use super::telemetry::Telemetry;
4use crate::traits::{ParameterId, PortId};
5use parking_lot::RwLock;
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9
/// Running counters kept for one observed component.
///
/// A default-constructed value has every counter at zero.
#[derive(Clone, Debug, Default)]
pub struct ComponentStats {
    /// How many operations have been recorded for the component.
    pub operations: u64,
    /// Sum of the recorded operation durations, in nanoseconds.
    pub total_time_ns: u64,
    /// Largest single recorded operation duration, in nanoseconds.
    pub max_time_ns: u64,
    /// How many violations have been recorded for the component.
    pub violations: u64,
    /// Mean operation duration in nanoseconds (`total_time_ns / operations`).
    pub avg_time_ns: f64,
}
24
/// One recorded case of a component reporting a duration other than expected.
#[derive(Clone, Debug)]
pub struct Violation {
    /// Name of the component the violation was recorded against.
    pub component: String,
    /// Expected duration, in nanoseconds.
    pub expected_ns: u64,
    /// Duration that was actually reported, in nanoseconds.
    pub actual_ns: u64,
    /// Moment the violation was recorded. `MicroControlObserver::record_violation`
    /// fills this with microseconds since the Unix epoch.
    pub timestamp: u64,
    /// Optional value supplied by the caller together with the violation.
    // NOTE(review): the meaning of this value is not visible in this file — confirm with callers.
    pub value: Option<f32>,
}
39
/// Aggregate view over every component an observer has statistics for.
///
/// A default-constructed summary is empty: zero counters, no components.
#[derive(Clone, Debug, Default)]
pub struct SandboxSummary {
    /// Sum of `operations` over all components.
    pub total_operations: u64,
    /// Sum of the per-component `violations` counters.
    pub total_violations: u64,
    /// Names of the components that have a statistics entry.
    pub components: Vec<String>,
    /// Largest `max_time_ns` found among the components, in nanoseconds.
    pub max_time_ns: u64,
    /// Component that produced `max_time_ns`; `None` when no component has a
    /// non-zero maximum.
    pub max_time_component: Option<String>,
    /// Number of violation records stored by the observer.
    pub violations_count: usize,
}
56
/// A revocable permit tied to a component name and a nanosecond limit.
///
/// The "allowed" flag lives behind an `Arc`, so every clone of a permit
/// observes a revocation made through any other clone.
#[derive(Clone, Debug)]
pub struct MicroControlPermit {
    /// Shared flag: `true` until `revoke` is called on this permit or a clone.
    enabled: Arc<std::sync::atomic::AtomicBool>,
    /// Limit supplied at construction, in nanoseconds.
    max_time_ns: u64,
    /// Name of the component the permit was issued for.
    component: String,
}

impl MicroControlPermit {
    /// Builds a permit for `component` with the given `max_time_ns`.
    ///
    /// A freshly created permit is allowed.
    pub fn new(component: impl Into<String>, max_time_ns: u64) -> Self {
        use std::sync::atomic::AtomicBool;

        let enabled = Arc::new(AtomicBool::new(true));
        let component = component.into();
        Self {
            enabled,
            max_time_ns,
            component,
        }
    }

    /// Returns `true` while the permit has not been revoked.
    pub fn is_allowed(&self) -> bool {
        use std::sync::atomic::Ordering;

        let flag = &*self.enabled;
        flag.load(Ordering::Relaxed)
    }

    /// Revokes the permit; the change is visible through every clone.
    pub fn revoke(&self) {
        use std::sync::atomic::Ordering;

        let flag = &*self.enabled;
        flag.store(false, Ordering::Relaxed);
    }

    /// Returns the nanosecond limit the permit was created with.
    pub fn max_time_ns(&self) -> u64 {
        let Self { max_time_ns, .. } = self;
        *max_time_ns
    }

    /// Returns the name of the component the permit was created for.
    pub fn component(&self) -> &str {
        self.component.as_str()
    }
}
99
100#[derive(Clone)]
102pub struct MicroControlObserver {
103 stats: Arc<RwLock<HashMap<String, ComponentStats>>>,
105
106 violations: Arc<RwLock<Vec<Violation>>>,
108
109 telemetry_tx: crossbeam_channel::Sender<Telemetry>,
111}
112
113impl MicroControlObserver {
114 pub fn new(telemetry: super::telemetry::TelemetryQueue) -> Self {
116 Self {
117 stats: Arc::new(RwLock::new(HashMap::new())),
118 violations: Arc::new(RwLock::new(Vec::new())),
119 telemetry_tx: telemetry.sender(),
120 }
121 }
122
123 pub fn with_sender(telemetry_tx: crossbeam_channel::Sender<Telemetry>) -> Self {
125 Self {
126 stats: Arc::new(RwLock::new(HashMap::new())),
127 violations: Arc::new(RwLock::new(Vec::new())),
128 telemetry_tx,
129 }
130 }
131
132 pub fn observe_start(&self, component: &str) -> OperationGuard {
134 OperationGuard {
135 component: component.to_string(),
136 start_time: Self::now(),
137 observer: self.clone(),
138 }
139 }
140
141 pub fn observe_start_with_params(
143 &self,
144 component: &str,
145 port: PortId,
146 _parameter: &ParameterId,
147 ) -> OperationGuard {
148 let guard = self.observe_start(component);
149
150 let _ = self.telemetry_tx.send(Telemetry::event(
152 "observer",
153 "micro_start",
154 vec![port.node_id().inner() as f32, port.index() as f32],
155 ));
156
157 guard
158 }
159
160 pub fn record_violation(
162 &self,
163 component: &str,
164 expected_ns: u64,
165 actual_ns: u64,
166 value: Option<f32>,
167 ) {
168 let violation = Violation {
169 component: component.to_string(),
170 expected_ns,
171 actual_ns,
172 timestamp: Self::now(),
173 value,
174 };
175
176 self.violations.write().push(violation.clone());
178
179 let mut stats = self.stats.write();
181 let comp_stats = stats.entry(component.to_string()).or_default();
182 comp_stats.violations += 1;
183
184 let _ = self.telemetry_tx.send(Telemetry::violation(
186 component,
187 expected_ns,
188 actual_ns,
189 value,
190 ));
191
192 println!(
194 "⚠️ Нарушение в {}: {}нс (ожидалось {}нс)",
195 component, actual_ns, expected_ns
196 );
197 }
198
199 pub fn component_stats(&self, component: &str) -> Option<ComponentStats> {
201 self.stats.read().get(component).cloned()
202 }
203
204 pub fn violations(&self) -> Vec<Violation> {
206 self.violations.read().clone()
207 }
208
209 pub fn sandbox_summary(&self) -> SandboxSummary {
211 let stats = self.stats.read();
212 let mut summary = SandboxSummary::default();
213
214 for (component, comp_stats) in stats.iter() {
215 summary.total_operations += comp_stats.operations;
216 summary.total_violations += comp_stats.violations;
217 summary.components.push(component.clone());
218
219 if comp_stats.max_time_ns > summary.max_time_ns {
220 summary.max_time_ns = comp_stats.max_time_ns;
221 summary.max_time_component = Some(component.clone());
222 }
223 }
224
225 summary.violations_count = self.violations.read().len();
226 summary
227 }
228
229 fn now() -> u64 {
231 SystemTime::now()
232 .duration_since(UNIX_EPOCH)
233 .unwrap_or_default()
234 .as_micros() as u64
235 }
236}
237
238pub struct OperationGuard {
240 component: String,
241 start_time: u64,
242 observer: MicroControlObserver,
243}
244
245impl Drop for OperationGuard {
246 fn drop(&mut self) {
247 let duration = (Self::now() - self.start_time) * 1000; let mut stats = self.observer.stats.write();
251 let comp_stats = stats.entry(self.component.clone()).or_default();
252 comp_stats.operations += 1;
253 comp_stats.total_time_ns += duration;
254 if duration > comp_stats.max_time_ns {
255 comp_stats.max_time_ns = duration;
256 }
257 comp_stats.avg_time_ns = comp_stats.total_time_ns as f64 / comp_stats.operations as f64;
258
259 let _ = self.observer.telemetry_tx.send(Telemetry::event(
261 "observer",
262 "micro_complete",
263 vec![duration as f32],
264 ));
265 }
266}
267
268impl OperationGuard {
269 fn now() -> u64 {
270 SystemTime::now()
271 .duration_since(UNIX_EPOCH)
272 .unwrap_or_default()
273 .as_micros() as u64
274 }
275}
276
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an observer wired to an unbounded channel and hands back the
    /// receiving half so tests can inspect the emitted telemetry.
    fn observer_with_channel() -> (MicroControlObserver, crossbeam_channel::Receiver<Telemetry>) {
        let (tx, rx) = crossbeam_channel::unbounded();
        (MicroControlObserver::with_sender(tx), rx)
    }

    /// A fresh observer reports an empty summary.
    #[test]
    fn test_observer_creation() {
        let (observer, _rx) = observer_with_channel();

        let summary = observer.sandbox_summary();
        assert_eq!(summary.total_operations, 0);
        assert_eq!(summary.total_violations, 0);
    }

    /// Recording a violation updates the summary, the stored records and
    /// emits matching telemetry.
    #[test]
    fn test_observer_record_violation() {
        let (observer, rx) = observer_with_channel();

        observer.record_violation("test_comp", 100, 250, Some(0.5));

        assert_eq!(observer.sandbox_summary().total_violations, 1);

        let recorded = observer.violations();
        assert_eq!(recorded.len(), 1);
        let first = &recorded[0];
        assert_eq!(first.component, "test_comp");
        assert_eq!(first.expected_ns, 100);
        assert_eq!(first.actual_ns, 250);
        assert_eq!(first.value, Some(0.5));

        let message = rx.try_recv().unwrap();
        if let Telemetry::Violation {
            component,
            expected_ns,
            actual_ns,
            value,
            ..
        } = message
        {
            assert_eq!(component, "test_comp");
            assert_eq!(expected_ns, 100);
            assert_eq!(actual_ns, 250);
            assert_eq!(value, Some(0.5));
        } else {
            panic!("Expected violation telemetry");
        }
    }

    /// Dropping a guard records one operation and emits a completion event.
    #[test]
    fn test_observer_operation_guard() {
        let (observer, rx) = observer_with_channel();

        {
            let _guard = observer.observe_start("test_op");
            std::thread::sleep(std::time::Duration::from_micros(10));
        }

        let summary = observer.sandbox_summary();
        assert_eq!(summary.total_operations, 1);
        assert!(summary.max_time_ns > 0);

        let message = rx.try_recv().unwrap();
        if let Telemetry::Event { kind, .. } = message {
            assert_eq!(kind, "micro_complete");
        } else {
            panic!("Expected event telemetry");
        }
    }

    /// Statistics are tracked separately for each component.
    #[test]
    fn test_observer_component_stats() {
        let (observer, _rx) = observer_with_channel();

        for step in 0..5 {
            let _guard = observer.observe_start("comp1");
            std::thread::sleep(std::time::Duration::from_micros(step * 10));
        }

        for step in 0..3 {
            let _guard = observer.observe_start("comp2");
            std::thread::sleep(std::time::Duration::from_micros(step * 20));
        }

        let summary = observer.sandbox_summary();
        assert_eq!(summary.total_operations, 8);
        assert_eq!(summary.components.len(), 2);

        let first = observer.component_stats("comp1").unwrap();
        assert_eq!(first.operations, 5);
        assert!(first.avg_time_ns > 0.0);

        let second = observer.component_stats("comp2").unwrap();
        assert_eq!(second.operations, 3);
    }
}