ringkernel_procint/actors/
messages.rs1use crate::models::{
4 ConformanceResult, GpuDFGEdge, GpuDFGNode, GpuObjectEvent, GpuPartialOrderTrace,
5 GpuPatternMatch,
6};
7
8#[derive(Debug, Clone)]
10pub enum PipelineMessage {
11 Events(EventBatch),
13 DfgUpdate(DfgUpdateMessage),
15 PatternsDetected(PatternBatchMessage),
17 ConformanceResults(ConformanceBatchMessage),
19 PartialOrders(PartialOrderBatchMessage),
21 Control(ControlMessage),
23}
24
25#[derive(Debug, Clone)]
27pub struct EventBatch {
28 pub batch_id: u64,
30 pub events: Vec<GpuObjectEvent>,
32 pub timestamp_ms: u64,
34}
35
36impl EventBatch {
37 pub fn new(batch_id: u64, events: Vec<GpuObjectEvent>) -> Self {
39 Self {
40 batch_id,
41 events,
42 timestamp_ms: std::time::SystemTime::now()
43 .duration_since(std::time::UNIX_EPOCH)
44 .unwrap()
45 .as_millis() as u64,
46 }
47 }
48
49 pub fn len(&self) -> usize {
51 self.events.len()
52 }
53
54 pub fn is_empty(&self) -> bool {
56 self.events.is_empty()
57 }
58}
59
60#[derive(Debug, Clone)]
62pub struct DfgUpdateMessage {
63 pub batch_id: u64,
65 pub nodes: Vec<GpuDFGNode>,
67 pub edges: Vec<GpuDFGEdge>,
69 pub processing_time_us: u64,
71}
72
73#[derive(Debug, Clone)]
75pub struct PatternBatchMessage {
76 pub batch_id: u64,
78 pub patterns: Vec<GpuPatternMatch>,
80 pub processing_time_us: u64,
82}
83
84#[derive(Debug, Clone)]
86pub struct ConformanceBatchMessage {
87 pub batch_id: u64,
89 pub results: Vec<ConformanceResult>,
91 pub processing_time_us: u64,
93}
94
95#[derive(Debug, Clone)]
97pub struct PartialOrderBatchMessage {
98 pub batch_id: u64,
100 pub traces: Vec<GpuPartialOrderTrace>,
102 pub processing_time_us: u64,
104}
105
/// Control commands that can be sent through the pipeline.
///
/// The enum is a plain, field-less tag, so it derives `PartialEq`, `Eq` and
/// `Hash` in addition to `Copy`: receivers can compare commands with `==`
/// and use them as map or set keys. How each command is acted upon is
/// decided by the receiving side, not by this type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ControlMessage {
    /// Request to start.
    Start,
    /// Request to pause.
    Pause,
    /// Request to stop.
    Stop,
    /// Request to reset.
    Reset,
    /// Request for statistics.
    GetStats,
}
120
/// Running counters and smoothed timing figures for the pipeline.
///
/// `total_events`, `batches_processed`, `avg_batch_time_us` and
/// `events_per_second` are maintained by [`PipelineStats::record_batch`];
/// the remaining fields are plain public state that this type's methods do
/// not touch.
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
    /// Sum of the event counts passed to `record_batch`.
    pub total_events: u64,
    /// Number of `record_batch` calls.
    pub batches_processed: u64,
    /// Pattern counter; not updated by `record_batch`.
    pub patterns_detected: u64,
    /// Conformance-check counter; not updated by `record_batch`.
    pub conformance_checks: u64,
    /// Exponentially smoothed batch processing time, in microseconds.
    pub avg_batch_time_us: f64,
    /// Exponentially smoothed throughput, in events per second.
    pub events_per_second: f64,
    /// Running flag; not updated by `record_batch`.
    pub is_running: bool,
}

impl PipelineStats {
    /// Weight given to the newest sample in the exponential moving averages.
    const EMA_ALPHA: f64 = 0.1;

    /// Records one processed batch.
    ///
    /// * `events` - number of events in the batch.
    /// * `processing_time_us` - time spent on the batch, in microseconds.
    ///
    /// Updates the counters and both moving averages. The first sample seeds
    /// each average directly; blending it with the default `0.0` would make
    /// the reported figures start at 10% of their real value and stay biased
    /// low for many batches. A batch with `processing_time_us == 0` leaves
    /// `events_per_second` unchanged, since no throughput can be derived
    /// from it.
    pub fn record_batch(&mut self, events: u64, processing_time_us: u64) {
        self.total_events += events;
        self.batches_processed += 1;

        let batch_time_us = processing_time_us as f64;
        self.avg_batch_time_us = if self.batches_processed == 1 {
            batch_time_us
        } else {
            Self::blend(self.avg_batch_time_us, batch_time_us)
        };

        if processing_time_us > 0 {
            // Microseconds -> seconds: events / (us / 1_000_000).
            let batch_eps = events as f64 * 1_000_000.0 / batch_time_us;
            // An exact 0.0 means no throughput sample has been taken yet
            // (earlier batches may have had a zero duration), so seed here.
            self.events_per_second = if self.events_per_second == 0.0 {
                batch_eps
            } else {
                Self::blend(self.events_per_second, batch_eps)
            };
        }
    }

    /// Folds `sample` into `previous` using the `EMA_ALPHA` weight.
    fn blend(previous: f64, sample: f64) -> f64 {
        previous * (1.0 - Self::EMA_ALPHA) + sample * Self::EMA_ALPHA
    }
}
158
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::HybridTimestamp;

    /// A batch built from a single event reports length 1 and is not empty.
    #[test]
    fn test_event_batch() {
        let single_event = GpuObjectEvent {
            event_id: 1,
            object_id: 100,
            activity_id: 1,
            timestamp: HybridTimestamp::now(),
            ..Default::default()
        };

        let batch = EventBatch::new(1, vec![single_event]);
        assert!(!batch.is_empty());
        assert_eq!(batch.len(), 1);
    }

    /// Recording a batch with a non-zero duration yields a positive throughput.
    #[test]
    fn test_pipeline_stats() {
        let mut stats = PipelineStats::default();
        stats.record_batch(1000, 1000);

        assert!(stats.events_per_second > 0.0);
    }
}