Skip to main content

rill_core/queues/
observer.rs

1//! Наблюдатель за микро-контролем — всевидящее око
2
3use super::telemetry::Telemetry;
4use crate::traits::{ParameterId, PortId};
5use parking_lot::RwLock;
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9
10/// Статистика компонента
11#[derive(Debug, Clone, Default)]
12pub struct ComponentStats {
13    /// Количество операций
14    pub operations: u64,
15    /// Суммарное время (для среднего)
16    pub total_time_ns: u64,
17    /// Максимальное время
18    pub max_time_ns: u64,
19    /// Количество нарушений
20    pub violations: u64,
21    /// Среднее время
22    pub avg_time_ns: f64,
23}
24
25/// Запись о нарушении
26#[derive(Debug, Clone)]
27pub struct Violation {
28    /// Компонент, нарушивший закон
29    pub component: String,
30    /// Ожидаемое время (нс)
31    pub expected_ns: u64,
32    /// Фактическое время (нс)
33    pub actual_ns: u64,
34    /// Время нарушения
35    pub timestamp: u64,
36    /// Примененное значение (если было)
37    pub value: Option<f32>,
38}
39
40/// Сводка по песочнице
41#[derive(Debug, Default, Clone)]
42pub struct SandboxSummary {
43    /// Всего операций микро-контроля
44    pub total_operations: u64,
45    /// Всего нарушений
46    pub total_violations: u64,
47    /// Количество активных компонентов
48    pub components: Vec<String>,
49    /// Максимальное время операции
50    pub max_time_ns: u64,
51    /// Компонент с максимальным временем
52    pub max_time_component: Option<String>,
53    /// Количество записанных нарушений
54    pub violations_count: usize,
55}
56
57/// Разрешение на микро-контроль
58#[derive(Debug, Clone)]
59pub struct MicroControlPermit {
60    /// Флаг, что разрешено прямое управление
61    enabled: Arc<std::sync::atomic::AtomicBool>,
62    /// Максимальное время обработки (в наносекундах)
63    max_time_ns: u64,
64    /// Имя компонента (для отладки)
65    component: String,
66}
67
68impl MicroControlPermit {
69    /// Создать новое разрешение
70    pub fn new(component: impl Into<String>, max_time_ns: u64) -> Self {
71        Self {
72            enabled: Arc::new(std::sync::atomic::AtomicBool::new(true)),
73            max_time_ns,
74            component: component.into(),
75        }
76    }
77
78    /// Проверить, можно ли использовать микро-контроль
79    pub fn is_allowed(&self) -> bool {
80        self.enabled.load(std::sync::atomic::Ordering::Relaxed)
81    }
82
83    /// Запретить микро-контроль
84    pub fn revoke(&self) {
85        self.enabled
86            .store(false, std::sync::atomic::Ordering::Relaxed);
87    }
88
89    /// Получить максимальное время обработки
90    pub fn max_time_ns(&self) -> u64 {
91        self.max_time_ns
92    }
93
94    /// Получить имя компонента
95    pub fn component(&self) -> &str {
96        &self.component
97    }
98}
99
100/// Наблюдатель за микро-контролем
101#[derive(Clone)]
102pub struct MicroControlObserver {
103    /// Статистика по компонентам
104    stats: Arc<RwLock<HashMap<String, ComponentStats>>>,
105
106    /// История нарушений
107    violations: Arc<RwLock<Vec<Violation>>>,
108
109    /// Отправитель телеметрии
110    telemetry_tx: crossbeam_channel::Sender<Telemetry>,
111}
112
113impl MicroControlObserver {
114    /// Создать нового наблюдателя с очередью телеметрии
115    pub fn new(telemetry: super::telemetry::TelemetryQueue) -> Self {
116        Self {
117            stats: Arc::new(RwLock::new(HashMap::new())),
118            violations: Arc::new(RwLock::new(Vec::new())),
119            telemetry_tx: telemetry.sender(),
120        }
121    }
122
123    /// Создать нового наблюдателя с отправителем телеметрии
124    pub fn with_sender(telemetry_tx: crossbeam_channel::Sender<Telemetry>) -> Self {
125        Self {
126            stats: Arc::new(RwLock::new(HashMap::new())),
127            violations: Arc::new(RwLock::new(Vec::new())),
128            telemetry_tx,
129        }
130    }
131
132    /// Наблюдать за началом операции
133    pub fn observe_start(&self, component: &str) -> OperationGuard {
134        OperationGuard {
135            component: component.to_string(),
136            start_time: Self::now(),
137            observer: self.clone(),
138        }
139    }
140
141    /// Наблюдать за началом операции с параметрами
142    pub fn observe_start_with_params(
143        &self,
144        component: &str,
145        port: PortId,
146        _parameter: &ParameterId,
147    ) -> OperationGuard {
148        let guard = self.observe_start(component);
149
150        // Отправляем телеметрию о начале операции
151        let _ = self.telemetry_tx.send(Telemetry::event(
152            "observer",
153            "micro_start",
154            vec![port.node_id().inner() as f32, port.index() as f32],
155        ));
156
157        guard
158    }
159
160    /// Зафиксировать нарушение
161    pub fn record_violation(
162        &self,
163        component: &str,
164        expected_ns: u64,
165        actual_ns: u64,
166        value: Option<f32>,
167    ) {
168        let violation = Violation {
169            component: component.to_string(),
170            expected_ns,
171            actual_ns,
172            timestamp: Self::now(),
173            value,
174        };
175
176        // Сохраняем в историю
177        self.violations.write().push(violation.clone());
178
179        // Обновляем статистику
180        let mut stats = self.stats.write();
181        let comp_stats = stats.entry(component.to_string()).or_default();
182        comp_stats.violations += 1;
183
184        // Отправляем телеметрию напрямую через Sender
185        let _ = self.telemetry_tx.send(Telemetry::violation(
186            component,
187            expected_ns,
188            actual_ns,
189            value,
190        ));
191
192        // Временно используем println вместо log
193        println!(
194            "⚠️ Нарушение в {}: {}нс (ожидалось {}нс)",
195            component, actual_ns, expected_ns
196        );
197    }
198
199    /// Получить статистику по компоненту
200    pub fn component_stats(&self, component: &str) -> Option<ComponentStats> {
201        self.stats.read().get(component).cloned()
202    }
203
204    /// Получить все нарушения
205    pub fn violations(&self) -> Vec<Violation> {
206        self.violations.read().clone()
207    }
208
209    /// Получить сводку по песочнице
210    pub fn sandbox_summary(&self) -> SandboxSummary {
211        let stats = self.stats.read();
212        let mut summary = SandboxSummary::default();
213
214        for (component, comp_stats) in stats.iter() {
215            summary.total_operations += comp_stats.operations;
216            summary.total_violations += comp_stats.violations;
217            summary.components.push(component.clone());
218
219            if comp_stats.max_time_ns > summary.max_time_ns {
220                summary.max_time_ns = comp_stats.max_time_ns;
221                summary.max_time_component = Some(component.clone());
222            }
223        }
224
225        summary.violations_count = self.violations.read().len();
226        summary
227    }
228
229    /// Получить текущее время в микросекундах
230    fn now() -> u64 {
231        SystemTime::now()
232            .duration_since(UNIX_EPOCH)
233            .unwrap_or_default()
234            .as_micros() as u64
235    }
236}
237
238/// Guard, который автоматически фиксирует завершение операции
239pub struct OperationGuard {
240    component: String,
241    start_time: u64,
242    observer: MicroControlObserver,
243}
244
245impl Drop for OperationGuard {
246    fn drop(&mut self) {
247        let duration = (Self::now() - self.start_time) * 1000; // микросекунды -> наносекунды
248
249        // Обновляем статистику
250        let mut stats = self.observer.stats.write();
251        let comp_stats = stats.entry(self.component.clone()).or_default();
252        comp_stats.operations += 1;
253        comp_stats.total_time_ns += duration;
254        if duration > comp_stats.max_time_ns {
255            comp_stats.max_time_ns = duration;
256        }
257        comp_stats.avg_time_ns = comp_stats.total_time_ns as f64 / comp_stats.operations as f64;
258
259        // Отправляем телеметрию через Sender
260        let _ = self.observer.telemetry_tx.send(Telemetry::event(
261            "observer",
262            "micro_complete",
263            vec![duration as f32],
264        ));
265    }
266}
267
268impl OperationGuard {
269    fn now() -> u64 {
270        SystemTime::now()
271            .duration_since(UNIX_EPOCH)
272            .unwrap_or_default()
273            .as_micros() as u64
274    }
275}
276
277#[cfg(test)]
278mod tests {
279    use super::*;
280    #[test]
281    fn test_observer_creation() {
282        let (tx, _rx) = crossbeam_channel::unbounded();
283        let observer = MicroControlObserver::with_sender(tx);
284
285        let stats = observer.sandbox_summary();
286        assert_eq!(stats.total_operations, 0);
287        assert_eq!(stats.total_violations, 0);
288    }
289
290    #[test]
291    fn test_observer_record_violation() {
292        let (tx, rx) = crossbeam_channel::unbounded();
293        let observer = MicroControlObserver::with_sender(tx);
294
295        observer.record_violation("test_comp", 100, 250, Some(0.5));
296
297        let stats = observer.sandbox_summary();
298        assert_eq!(stats.total_violations, 1);
299
300        let violations = observer.violations();
301        assert_eq!(violations.len(), 1);
302        assert_eq!(violations[0].component, "test_comp");
303        assert_eq!(violations[0].expected_ns, 100);
304        assert_eq!(violations[0].actual_ns, 250);
305        assert_eq!(violations[0].value, Some(0.5));
306
307        // Проверяем, что телеметрия была отправлена
308        let telemetry = rx.try_recv().unwrap();
309        match telemetry {
310            Telemetry::Violation {
311                component,
312                expected_ns,
313                actual_ns,
314                value,
315                ..
316            } => {
317                assert_eq!(component, "test_comp");
318                assert_eq!(expected_ns, 100);
319                assert_eq!(actual_ns, 250);
320                assert_eq!(value, Some(0.5));
321            }
322            _ => panic!("Expected violation telemetry"),
323        }
324    }
325
326    #[test]
327    fn test_observer_operation_guard() {
328        let (tx, rx) = crossbeam_channel::unbounded();
329        let observer = MicroControlObserver::with_sender(tx);
330
331        {
332            let _guard = observer.observe_start("test_op");
333            std::thread::sleep(std::time::Duration::from_micros(10));
334        } // guard автоматически фиксирует завершение при drop
335
336        let stats = observer.sandbox_summary();
337        assert_eq!(stats.total_operations, 1);
338        assert!(stats.max_time_ns > 0);
339
340        // Проверяем телеметрию завершения
341        let telemetry = rx.try_recv().unwrap();
342        match telemetry {
343            Telemetry::Event { kind, .. } => {
344                assert_eq!(kind, "micro_complete");
345            }
346            _ => panic!("Expected event telemetry"),
347        }
348    }
349
350    #[test]
351    fn test_observer_component_stats() {
352        let (tx, _rx) = crossbeam_channel::unbounded();
353        let observer = MicroControlObserver::with_sender(tx);
354
355        for i in 0..5 {
356            let _guard = observer.observe_start("comp1");
357            std::thread::sleep(std::time::Duration::from_micros(i * 10));
358        }
359
360        for i in 0..3 {
361            let _guard = observer.observe_start("comp2");
362            std::thread::sleep(std::time::Duration::from_micros(i * 20));
363        }
364
365        let stats = observer.sandbox_summary();
366        assert_eq!(stats.total_operations, 8);
367        assert_eq!(stats.components.len(), 2);
368
369        let comp1_stats = observer.component_stats("comp1").unwrap();
370        assert_eq!(comp1_stats.operations, 5);
371        assert!(comp1_stats.avg_time_ns > 0.0);
372
373        let comp2_stats = observer.component_stats("comp2").unwrap();
374        assert_eq!(comp2_stats.operations, 3);
375    }
376}