Skip to main content

rill_core/queues/
observer.rs

1//! Наблюдатель за микро-контролем — всевидящее око
2
3use super::telemetry::Telemetry;
4use crate::traits::{ParameterId, PortId};
5use parking_lot::RwLock;
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9
10/// Статистика компонента
11#[derive(Debug, Clone, Default)]
12pub struct ComponentStats {
13    /// Количество операций
14    pub operations: u64,
15    /// Суммарное время (для среднего)
16    pub total_time_ns: u64,
17    /// Максимальное время
18    pub max_time_ns: u64,
19    /// Количество нарушений
20    pub violations: u64,
21    /// Среднее время
22    pub avg_time_ns: f64,
23}
24
25/// Запись о нарушении
26#[derive(Debug, Clone)]
27pub struct Violation {
28    /// Компонент, нарушивший закон
29    pub component: String,
30    /// Ожидаемое время (нс)
31    pub expected_ns: u64,
32    /// Фактическое время (нс)
33    pub actual_ns: u64,
34    /// Время нарушения
35    pub timestamp: u64,
36    /// Примененное значение (если было)
37    pub value: Option<f32>,
38}
39
40/// Сводка по песочнице
41#[derive(Debug, Default, Clone)]
42pub struct SandboxSummary {
43    /// Всего операций микро-контроля
44    pub total_operations: u64,
45    /// Всего нарушений
46    pub total_violations: u64,
47    /// Количество активных компонентов
48    pub components: Vec<String>,
49    /// Максимальное время операции
50    pub max_time_ns: u64,
51    /// Компонент с максимальным временем
52    pub max_time_component: Option<String>,
53    /// Количество записанных нарушений
54    pub violations_count: usize,
55}
56
57/// Разрешение на микро-контроль
58#[derive(Debug, Clone)]
59pub struct MicroControlPermit {
60    /// Флаг, что разрешено прямое управление
61    enabled: Arc<std::sync::atomic::AtomicBool>,
62    /// Максимальное время обработки (в наносекундах)
63    max_time_ns: u64,
64    /// Имя компонента (для отладки)
65    component: String,
66}
67
68impl MicroControlPermit {
69    /// Создать новое разрешение
70    pub fn new(component: impl Into<String>, max_time_ns: u64) -> Self {
71        Self {
72            enabled: Arc::new(std::sync::atomic::AtomicBool::new(true)),
73            max_time_ns,
74            component: component.into(),
75        }
76    }
77
78    /// Проверить, можно ли использовать микро-контроль
79    pub fn is_allowed(&self) -> bool {
80        self.enabled.load(std::sync::atomic::Ordering::Relaxed)
81    }
82
83    /// Запретить микро-контроль
84    pub fn revoke(&self) {
85        self.enabled
86            .store(false, std::sync::atomic::Ordering::Relaxed);
87    }
88
89    /// Получить максимальное время обработки
90    pub fn max_time_ns(&self) -> u64 {
91        self.max_time_ns
92    }
93
94    /// Получить имя компонента
95    pub fn component(&self) -> &str {
96        &self.component
97    }
98}
99
100/// Наблюдатель за микро-контролем
101#[derive(Clone)]
102pub struct MicroControlObserver {
103    /// Статистика по компонентам
104    stats: Arc<RwLock<HashMap<String, ComponentStats>>>,
105
106    /// История нарушений
107    violations: Arc<RwLock<Vec<Violation>>>,
108
109    /// Отправитель телеметрии
110    telemetry_tx: crossbeam_channel::Sender<Telemetry>,
111}
112
113impl MicroControlObserver {
114    /// Создать нового наблюдателя с очередью телеметрии
115    pub fn new(telemetry: super::telemetry::TelemetryQueue) -> Self {
116        Self {
117            stats: Arc::new(RwLock::new(HashMap::new())),
118            violations: Arc::new(RwLock::new(Vec::new())),
119            telemetry_tx: telemetry.sender(),
120        }
121    }
122
123    /// Создать нового наблюдателя с отправителем телеметрии
124    pub fn with_sender(telemetry_tx: crossbeam_channel::Sender<Telemetry>) -> Self {
125        Self {
126            stats: Arc::new(RwLock::new(HashMap::new())),
127            violations: Arc::new(RwLock::new(Vec::new())),
128            telemetry_tx,
129        }
130    }
131
132    /// Наблюдать за началом операции
133    pub fn observe_start(&self, component: &str) -> OperationGuard {
134        OperationGuard {
135            component: component.to_string(),
136            start_time: Self::now(),
137            observer: self.clone(),
138        }
139    }
140
141    /// Наблюдать за началом операции с параметрами
142    pub fn observe_start_with_params(
143        &self,
144        component: &str,
145        port: PortId,
146        _parameter: &ParameterId,
147    ) -> OperationGuard {
148        let guard = self.observe_start(component);
149
150        // Отправляем телеметрию о начале операции
151        let _ = self.telemetry_tx.send(Telemetry::event(
152            "observer",
153            "micro_start",
154            vec![port.node_id().inner() as f32, port.index() as f32],
155        ));
156
157        guard
158    }
159
160    /// Зафиксировать нарушение
161    pub fn record_violation(
162        &self,
163        component: &str,
164        expected_ns: u64,
165        actual_ns: u64,
166        value: Option<f32>,
167    ) {
168        let violation = Violation {
169            component: component.to_string(),
170            expected_ns,
171            actual_ns,
172            timestamp: Self::now(),
173            value,
174        };
175
176        // Сохраняем в историю
177        self.violations.write().push(violation.clone());
178
179        // Обновляем статистику
180        let mut stats = self.stats.write();
181        let comp_stats = stats.entry(component.to_string()).or_default();
182        comp_stats.violations += 1;
183
184        // Отправляем телеметрию напрямую через Sender
185        let _ = self.telemetry_tx.send(Telemetry::violation(
186            component,
187            expected_ns,
188            actual_ns,
189            value,
190        ));
191
192        // Временно используем println вместо log
193        println!(
194            "⚠️ Нарушение в {}: {}нс (ожидалось {}нс)",
195            component, actual_ns, expected_ns
196        );
197    }
198
199    /// Получить статистику по компоненту
200    pub fn component_stats(&self, component: &str) -> Option<ComponentStats> {
201        self.stats.read().get(component).cloned()
202    }
203
204    /// Получить все нарушения
205    pub fn violations(&self) -> Vec<Violation> {
206        self.violations.read().clone()
207    }
208
209    /// Получить сводку по песочнице
210    pub fn sandbox_summary(&self) -> SandboxSummary {
211        let stats = self.stats.read();
212        let mut summary = SandboxSummary::default();
213
214        for (component, comp_stats) in stats.iter() {
215            summary.total_operations += comp_stats.operations;
216            summary.total_violations += comp_stats.violations;
217            summary.components.push(component.clone());
218
219            if comp_stats.max_time_ns > summary.max_time_ns {
220                summary.max_time_ns = comp_stats.max_time_ns;
221                summary.max_time_component = Some(component.clone());
222            }
223        }
224
225        summary.violations_count = self.violations.read().len();
226        summary
227    }
228
229    /// Получить текущее время в микросекундах
230    fn now() -> u64 {
231        SystemTime::now()
232            .duration_since(UNIX_EPOCH)
233            .unwrap_or_default()
234            .as_micros() as u64
235    }
236}
237
238/// Guard, который автоматически фиксирует завершение операции
239pub struct OperationGuard {
240    component: String,
241    start_time: u64,
242    observer: MicroControlObserver,
243}
244
245impl Drop for OperationGuard {
246    fn drop(&mut self) {
247        let duration = (Self::now() - self.start_time) * 1000; // микросекунды -> наносекунды
248
249        // Обновляем статистику
250        let mut stats = self.observer.stats.write();
251        let comp_stats = stats.entry(self.component.clone()).or_default();
252        comp_stats.operations += 1;
253        comp_stats.total_time_ns += duration;
254        if duration > comp_stats.max_time_ns {
255            comp_stats.max_time_ns = duration;
256        }
257        comp_stats.avg_time_ns = comp_stats.total_time_ns as f64 / comp_stats.operations as f64;
258
259        // Отправляем телеметрию через Sender
260        let _ = self.observer.telemetry_tx.send(Telemetry::event(
261            "observer",
262            "micro_complete",
263            vec![duration as f32],
264        ));
265    }
266}
267
268impl OperationGuard {
269    fn now() -> u64 {
270        SystemTime::now()
271            .duration_since(UNIX_EPOCH)
272            .unwrap_or_default()
273            .as_micros() as u64
274    }
275}
276
277#[cfg(test)]
278mod tests {
279    use super::*;
280    use crate::queues::telemetry::TelemetryQueue;
281
282    #[test]
283    fn test_observer_creation() {
284        let (tx, _rx) = crossbeam_channel::unbounded();
285        let observer = MicroControlObserver::with_sender(tx);
286
287        let stats = observer.sandbox_summary();
288        assert_eq!(stats.total_operations, 0);
289        assert_eq!(stats.total_violations, 0);
290    }
291
292    #[test]
293    fn test_observer_record_violation() {
294        let (tx, rx) = crossbeam_channel::unbounded();
295        let observer = MicroControlObserver::with_sender(tx);
296
297        observer.record_violation("test_comp", 100, 250, Some(0.5));
298
299        let stats = observer.sandbox_summary();
300        assert_eq!(stats.total_violations, 1);
301
302        let violations = observer.violations();
303        assert_eq!(violations.len(), 1);
304        assert_eq!(violations[0].component, "test_comp");
305        assert_eq!(violations[0].expected_ns, 100);
306        assert_eq!(violations[0].actual_ns, 250);
307        assert_eq!(violations[0].value, Some(0.5));
308
309        // Проверяем, что телеметрия была отправлена
310        let telemetry = rx.try_recv().unwrap();
311        match telemetry {
312            Telemetry::Violation {
313                component,
314                expected_ns,
315                actual_ns,
316                value,
317                ..
318            } => {
319                assert_eq!(component, "test_comp");
320                assert_eq!(expected_ns, 100);
321                assert_eq!(actual_ns, 250);
322                assert_eq!(value, Some(0.5));
323            }
324            _ => panic!("Expected violation telemetry"),
325        }
326    }
327
328    #[test]
329    fn test_observer_operation_guard() {
330        let (tx, rx) = crossbeam_channel::unbounded();
331        let observer = MicroControlObserver::with_sender(tx);
332
333        {
334            let _guard = observer.observe_start("test_op");
335            std::thread::sleep(std::time::Duration::from_micros(10));
336        } // guard автоматически фиксирует завершение при drop
337
338        let stats = observer.sandbox_summary();
339        assert_eq!(stats.total_operations, 1);
340        assert!(stats.max_time_ns > 0);
341
342        // Проверяем телеметрию завершения
343        let telemetry = rx.try_recv().unwrap();
344        match telemetry {
345            Telemetry::Event { kind, .. } => {
346                assert_eq!(kind, "micro_complete");
347            }
348            _ => panic!("Expected event telemetry"),
349        }
350    }
351
352    #[test]
353    fn test_observer_component_stats() {
354        let (tx, _rx) = crossbeam_channel::unbounded();
355        let observer = MicroControlObserver::with_sender(tx);
356
357        for i in 0..5 {
358            let _guard = observer.observe_start("comp1");
359            std::thread::sleep(std::time::Duration::from_micros(i * 10));
360        }
361
362        for i in 0..3 {
363            let _guard = observer.observe_start("comp2");
364            std::thread::sleep(std::time::Duration::from_micros(i * 20));
365        }
366
367        let stats = observer.sandbox_summary();
368        assert_eq!(stats.total_operations, 8);
369        assert_eq!(stats.components.len(), 2);
370
371        let comp1_stats = observer.component_stats("comp1").unwrap();
372        assert_eq!(comp1_stats.operations, 5);
373        assert!(comp1_stats.avg_time_ns > 0.0);
374
375        let comp2_stats = observer.component_stats("comp2").unwrap();
376        assert_eq!(comp2_stats.operations, 3);
377    }
378}