Skip to main content

ringkernel_procint/actors/
messages.rs

//! Actor message types for the process intelligence pipeline.
2
3use crate::models::{
4    ConformanceResult, GpuDFGEdge, GpuDFGNode, GpuObjectEvent, GpuPartialOrderTrace,
5    GpuPatternMatch,
6};
7
8/// Message types for the processing pipeline.
9#[derive(Debug, Clone)]
10pub enum PipelineMessage {
11    /// New events to process.
12    Events(EventBatch),
13    /// DFG update notification.
14    DfgUpdate(DfgUpdateMessage),
15    /// Pattern detection results.
16    PatternsDetected(PatternBatchMessage),
17    /// Conformance check results.
18    ConformanceResults(ConformanceBatchMessage),
19    /// Partial order derivation results.
20    PartialOrders(PartialOrderBatchMessage),
21    /// Pipeline control.
22    Control(ControlMessage),
23}
24
25/// Batch of events to process.
26#[derive(Debug, Clone)]
27pub struct EventBatch {
28    /// Batch ID.
29    pub batch_id: u64,
30    /// Events in the batch.
31    pub events: Vec<GpuObjectEvent>,
32    /// Timestamp when batch was created.
33    pub timestamp_ms: u64,
34}
35
36impl EventBatch {
37    /// Create a new event batch.
38    pub fn new(batch_id: u64, events: Vec<GpuObjectEvent>) -> Self {
39        Self {
40            batch_id,
41            events,
42            timestamp_ms: std::time::SystemTime::now()
43                .duration_since(std::time::UNIX_EPOCH)
44                .unwrap()
45                .as_millis() as u64,
46        }
47    }
48
49    /// Get batch size.
50    pub fn len(&self) -> usize {
51        self.events.len()
52    }
53
54    /// Check if batch is empty.
55    pub fn is_empty(&self) -> bool {
56        self.events.is_empty()
57    }
58}
59
60/// DFG update message.
61#[derive(Debug, Clone)]
62pub struct DfgUpdateMessage {
63    /// Batch ID that triggered update.
64    pub batch_id: u64,
65    /// Updated nodes.
66    pub nodes: Vec<GpuDFGNode>,
67    /// Updated edges.
68    pub edges: Vec<GpuDFGEdge>,
69    /// Processing time in microseconds.
70    pub processing_time_us: u64,
71}
72
73/// Pattern batch message.
74#[derive(Debug, Clone)]
75pub struct PatternBatchMessage {
76    /// Batch ID.
77    pub batch_id: u64,
78    /// Detected patterns.
79    pub patterns: Vec<GpuPatternMatch>,
80    /// Processing time in microseconds.
81    pub processing_time_us: u64,
82}
83
84/// Conformance batch message.
85#[derive(Debug, Clone)]
86pub struct ConformanceBatchMessage {
87    /// Batch ID.
88    pub batch_id: u64,
89    /// Conformance results.
90    pub results: Vec<ConformanceResult>,
91    /// Processing time in microseconds.
92    pub processing_time_us: u64,
93}
94
95/// Partial order batch message.
96#[derive(Debug, Clone)]
97pub struct PartialOrderBatchMessage {
98    /// Batch ID.
99    pub batch_id: u64,
100    /// Derived partial orders.
101    pub traces: Vec<GpuPartialOrderTrace>,
102    /// Processing time in microseconds.
103    pub processing_time_us: u64,
104}
105
/// Commands used to control the pipeline.
///
/// The enum carries no data, so it is `Copy` and can be passed around by value.
#[derive(Debug, Clone, Copy)]
pub enum ControlMessage {
    /// Begin processing.
    Start,
    /// Pause processing.
    Pause,
    /// Stop processing.
    Stop,
    /// Reset all state.
    Reset,
    /// Ask for statistics.
    GetStats,
}
120
/// Running statistics for the pipeline.
///
/// All counters start at zero and `is_running` starts as `false` (see the derived
/// `Default`). Counters and averages are advanced by [`PipelineStats::record_batch`].
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
    /// Total number of events recorded so far.
    pub total_events: u64,
    /// Total number of batches recorded so far.
    pub batches_processed: u64,
    /// Total patterns detected.
    pub patterns_detected: u64,
    /// Total conformance checks.
    pub conformance_checks: u64,
    /// Exponentially weighted average of the batch processing time, in microseconds.
    pub avg_batch_time_us: f64,
    /// Exponentially weighted estimate of throughput, in events per second.
    pub events_per_second: f64,
    /// Whether the pipeline is running.
    pub is_running: bool,
}

impl PipelineStats {
    /// Fold the metrics of one processed batch into the statistics.
    ///
    /// * `events` - number of events in the batch; added to `total_events`.
    /// * `processing_time_us` - time the batch took, in microseconds.
    ///
    /// `batches_processed` is incremented by one. `avg_batch_time_us` and
    /// `events_per_second` are exponential moving averages that give the newest sample
    /// a weight of 0.1. The very first batch (when `batches_processed` is still zero)
    /// seeds both averages with its own sample instead of blending it with the initial
    /// `0.0`, which would otherwise under-report them by a factor of ten.
    ///
    /// A batch with `processing_time_us == 0` has no defined throughput, so it leaves
    /// `events_per_second` untouched.
    pub fn record_batch(&mut self, events: u64, processing_time_us: u64) {
        // Weight of the newest sample in the moving averages.
        const ALPHA: f64 = 0.1;

        // Must be read before the counter is bumped below.
        let first_batch = self.batches_processed == 0;

        self.total_events += events;
        self.batches_processed += 1;

        // Seed with the sample on the first batch, blend afterwards.
        let smooth = |previous: f64, sample: f64| -> f64 {
            if first_batch {
                sample
            } else {
                previous * (1.0 - ALPHA) + sample * ALPHA
            }
        };

        self.avg_batch_time_us = smooth(self.avg_batch_time_us, processing_time_us as f64);

        // Guard against division by zero for batches that report no elapsed time.
        if processing_time_us > 0 {
            let batch_eps = events as f64 * 1_000_000.0 / processing_time_us as f64;
            self.events_per_second = smooth(self.events_per_second, batch_eps);
        }
    }
}
158
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::HybridTimestamp;

    /// A batch built from a single event reports a length of one and is not empty.
    #[test]
    fn test_event_batch() {
        let single_event = GpuObjectEvent {
            event_id: 1,
            object_id: 100,
            activity_id: 1,
            timestamp: HybridTimestamp::now(),
            ..Default::default()
        };

        let batch = EventBatch::new(1, vec![single_event]);

        assert_eq!(batch.len(), 1);
        assert!(!batch.is_empty());
    }

    /// Recording a batch with non-zero duration yields a positive throughput estimate.
    #[test]
    fn test_pipeline_stats() {
        let mut stats = PipelineStats::default();

        // The sample itself is 1000 events in 1 ms, i.e. 1M events per second.
        let (event_count, elapsed_us) = (1000, 1000);
        stats.record_batch(event_count, elapsed_us);

        assert!(stats.events_per_second > 0.0);
    }
}