Skip to main content

event_stream_processor/
event_stream_processor.rs

//! Real test: event stream processor with conservation laws and degradation detection.
//!
//! Simulates processing 1000 events in batches, applying conservation checks
//! between batches, and detecting when the system is degrading.
//!
//! This tests crackle-runtime in a scenario closer to production — event streams
//! with throughput, latency, and error metrics.
8
9use crackle_runtime::{CrackleTask, Kiln, ThermalProfile, TaskOutput};
10use std::time::{Duration, Instant};
11
/// An event being processed through the stream processor.
///
/// Each field is exported as a named metric by this type's `CrackleTask`
/// implementation. The common traits are derived so events can be printed
/// with `{:?}`, duplicated, and compared in diagnostics and tests.
#[derive(Debug, Clone, PartialEq)]
struct ProcessedEvent {
    /// Identifier of the event.
    event_id: u64,
    /// Identifier of the batch the event belongs to.
    batch_id: u32,
    /// Time spent processing the event, in milliseconds.
    processing_time_ms: f64,
    /// Events/sec for this batch.
    throughput: f64,
    /// Memory used during processing, in megabytes.
    memory_mb: f64,
    /// Fraction of failed records, 0.0–1.0.
    error_rate: f64,
    /// Number of records entering the processor.
    records_in: f64,
    /// Number of records leaving the processor.
    records_out: f64,
    /// System temperature proxy, in degrees Celsius.
    cpu_temp_c: f64,
    /// p99 latency, in milliseconds.
    latency_p99_ms: f64,
    /// Kafka-style consumer lag.
    consumer_lag: f64,
    /// GC pause time, in milliseconds.
    gc_pause_ms: f64,
}
27
28impl CrackleTask for ProcessedEvent {
29    type Output = f64;
30
31    fn fire(&self) -> TaskOutput<Self::Output> {
32        TaskOutput::new(
33            self.processing_time_ms,
34            vec![
35                ("processing_time_ms".into(),     self.processing_time_ms),
36                ("throughput".into(),              self.throughput),
37                ("memory_mb".into(),               self.memory_mb),
38                ("error_rate".into(),              self.error_rate),
39                ("records_in".into(),              self.records_in),
40                ("records_out".into(),             self.records_out),
41                ("cpu_temp_c".into(),              self.cpu_temp_c),
42                ("latency_p99_ms".into(),          self.latency_p99_ms),
43                ("consumer_lag".into(),            self.consumer_lag),
44                ("gc_pause_ms".into(),             self.gc_pause_ms),
45                ("event_id".into(),                self.event_id as f64),
46                ("batch_id".into(),                self.batch_id as f64),
47            ],
48        )
49    }
50
51    fn label(&self) -> String {
52        format!("batch-{}-event-{}", self.batch_id, self.event_id)
53    }
54}
55
56/// Simulate a batch of events with given characteristics.
57fn simulate_batch(
58    batch_id: u32,
59    batch_size: u32,
60    base_latency_ms: f64,
61    degradation_factor: f64,  // 0.0 = healthy, increases over time
62) -> Vec<ProcessedEvent> {
63    let mut events = Vec::with_capacity(batch_size as usize);
64    for i in 0..batch_size {
65        let event_id = batch_id as u64 * 1000 + i as u64;
66        let noise: f64 = (i as f64 * 7.0).sin() * 0.1; // small sine wave noise
67        let degradation = degradation_factor * (i as f64 / batch_size as f64);
68
69        let processing_time_ms = base_latency_ms * (1.0 + degradation + noise);
70        let throughput = 1000.0 / processing_time_ms * 1000.0;
71        let memory_mb = 128.0 + degradation * 64.0 + noise * 10.0;
72        let error_rate = (degradation * 0.5 + noise.abs() * 0.05).min(0.5);
73        let records_in = 1000.0;
74        let records_out = records_in * (1.0 - error_rate);
75        let cpu_temp_c = 65.0 + degradation * 30.0 + noise * 5.0;
76        let latency_p99_ms = processing_time_ms * 2.5;
77        let consumer_lag = degradation * 10000.0 + (i as f64 * 3.0).sin().abs() * 100.0;
78        let gc_pause_ms = 5.0 + degradation * 50.0;
79
80        events.push(ProcessedEvent {
81            event_id,
82            batch_id,
83            processing_time_ms,
84            throughput,
85            memory_mb,
86            error_rate,
87            records_in,
88            records_out,
89            cpu_temp_c,
90            latency_p99_ms,
91            consumer_lag,
92            gc_pause_ms,
93        });
94    }
95    events
96}
97
98fn main() {
99    let total_events = 1000;
100    let batch_size = 100;
101    let num_batches = total_events / batch_size;
102
103    println!("═══════════════════════════════════════════════════════════════");
104    println!("  CRACKLE-RUNTIME: EVENT STREAM PROCESSOR TEST");
105    println!("═══════════════════════════════════════════════════════════════\n");
106    println!("Processing {} events across {} batches of {}...\n",
107             total_events, num_batches, batch_size);
108
109    let start = Instant::now();
110
111    // Phase 1: Healthy system (batches 1-4)
112    println!("─── Phase 1: Healthy System ───");
113    let mut healthy_kiln = Kiln::new(
114        ThermalProfile::fast_cooling()
115    );
116
117    // Batch 1-4: all healthy, 10ms baseline latency
118    for batch_id in 1..=4 {
119        let events = simulate_batch(batch_id, batch_size as u32, 10.0, 0.05);
120        for event in events {
121            healthy_kiln.fire_and_record(event).unwrap();
122        }
123
124        let elapsed = start.elapsed();
125        println!("  Batch {}/{} fired ({} events, {:.0}ms)",
126                 batch_id, num_batches, batch_size, elapsed.as_millis());
127    }
128
129    // Cool healthy phase
130    let healthy_patterns = healthy_kiln.cool();
131    println!("\n  Healthy system patterns ({} detected):", healthy_patterns.len());
132    for p in &healthy_patterns {
133        println!("    [{:?}] {} (conf: {:.2})", p.kind(), p.description(), p.confidence());
134    }
135    // Also check conservation on records_in/records_out
136    let conservation = healthy_kiln.distribution_shift(10);
137    println!("    Distribution shifts:");
138    for (name, shift) in &conservation[..conservation.len().min(3)] {
139        println!("      {} → KL = {:.4}", name, shift);
140    }
141
142    println!();
143
144    // Phase 2: Degrading system (batches 5-8)
145    println!("─── Phase 2: Degrading System ───");
146
147    let mut degrading_kiln = Kiln::new(
148        ThermalProfile::fast_cooling()
149    );
150
151    // Batches where degradation ramps up
152    let degradation_levels = [0.2, 0.5, 0.9, 1.5];
153    for (i, &degradation) in degradation_levels.iter().enumerate() {
154        let batch_id = (5 + i) as u32;
155        // latency increases with degradation
156        let base_latency = 10.0 + degradation * 50.0;
157
158        let events = simulate_batch(batch_id, batch_size as u32, base_latency, degradation);
159        for event in events {
160            degrading_kiln.fire_and_record(event).unwrap();
161        }
162
163        let elapsed = start.elapsed();
164        println!("  Batch {}/{} [degradation={:.1}x] fired (latency baseline: {:.0}ms, {:.0}ms total)",
165                 batch_id, num_batches, degradation, base_latency, elapsed.as_millis());
166    }
167
168    let degrading_patterns = degrading_kiln.cool();
169    println!("\n  Degrading system patterns ({} detected):", degrading_patterns.len());
170    for p in &degrading_patterns {
171        println!("    [{:?}] {} (conf: {:.2})", p.kind(), p.description(), p.confidence());
172    }
173
174    // JSD shift (distribution shape changes)
175    let jsd_shifts = degrading_kiln.jsd_shift(10);
176    println!("    JSD shifts (distribution shape changes):");
177    for (name, shift) in &jsd_shifts[..jsd_shifts.len().min(5)] {
178        println!("      {} → JSD = {:.4}", name, shift);
179    }
180
181    // Permutation entropy (temporal structure)
182    let pes = degrading_kiln.permutation_entropies(4);
183    println!("    Permutation entropies (temporal structure):");
184    for (name, pe) in &pes {
185        println!("      {} → PE = {:.4}", name, pe);
186    }
187
188    println!();
189
190    // Phase 3: Conservation law test — records in = records out should be violated
191    // as errors increase
192    println!("─── Phase 3: Conservation Law Verification ───");
193    let mut conservation_kiln = Kiln::new(
194        ThermalProfile::fast_cooling()
195    );
196
197    // Force records_in = records_out for first 3 batches (healthy conservation)
198    for batch_id in 1..=3 {
199        let event = ProcessedEvent {
200            event_id: batch_id as u64,
201            batch_id: batch_id as u32,
202            processing_time_ms: 10.0,
203            throughput: 100.0,
204            memory_mb: 128.0,
205            error_rate: 0.01,
206            records_in: 1000.0,
207            records_out: 995.0,  // nearly conserved
208            cpu_temp_c: 65.0,
209            latency_p99_ms: 25.0,
210            consumer_lag: 10.0,
211            gc_pause_ms: 5.0,
212        };
213        conservation_kiln.fire_and_record(event).unwrap();
214    }
215
216    // Force violation of records conservation (errors spike)
217    for batch_id in 4..=6 {
218        let event = ProcessedEvent {
219            event_id: batch_id as u64,
220            batch_id: batch_id as u32,
221            processing_time_ms: 10.0 + batch_id as f64 * 10.0,
222            throughput: 50.0,
223            memory_mb: 256.0,
224            error_rate: 0.3,
225            records_in: 1000.0,
226            records_out: 700.0,  // 30% loss — conservation violated
227            cpu_temp_c: 80.0,
228            latency_p99_ms: 100.0,
229            consumer_lag: 500.0,
230            gc_pause_ms: 20.0,
231        };
232        conservation_kiln.fire_and_record(event).unwrap();
233    }
234
235    let conservation_patterns = conservation_kiln.cool();
236    println!("  Conservation-specific patterns ({} detected):", conservation_patterns.len());
237    for p in &conservation_patterns {
238        println!("    [{:?}] {} (conf: {:.2})", p.kind(), p.description(), p.confidence());
239    }
240
241    // MI matrix to show nonlinear dependencies
242    let mi = conservation_kiln.mi_matrix(8);
243    println!("  Mutual Information matrix (8 bins):");
244    let metric_names = ["processing_time_ms", "error_rate", "records_in", "records_out", "memory_mb"];
245    for (i, name_i) in metric_names.iter().enumerate() {
246        for (j, name_j) in metric_names.iter().enumerate() {
247            if i == j {
248                println!("    I({:25}; {:25}) = H({})", name_i, name_j, name_i);
249            } else {
250                println!("    I({:25}; {:25}) = {:.4} bits", name_i, name_j, mi[i][j]);
251            }
252        }
253    }
254
255    println!();
256    println!("───────────────────────────────────────────────────────────────");
257    println!("  SUMMARY");
258    println!("───────────────────────────────────────────────────────────────\n");
259
260    // Degradation detection analysis
261    println!("  Degradation Metrics:");
262    let pt = degrading_patterns.iter().filter(|p| matches!(p.kind(), crackle_runtime::PatternKind::PhaseTransition));
263    let pt_count = pt.count();
264    println!("    Phase transitions detected in degraded system: {} alerts",
265             pt_count);
266
267    let corr = degrading_patterns.iter().filter(|p| matches!(p.kind(), crackle_runtime::PatternKind::Correlation));
268    let corr_count = corr.count();
269    println!("    Correlations in degraded system: {} pairs", corr_count);
270
271    let cons = degrading_patterns.iter().filter(|p| matches!(p.kind(), crackle_runtime::PatternKind::Conservation));
272    let cons_count = cons.count();
273    println!("    Conservation laws in degraded system: {} found", cons_count);
274
275    let clust = degrading_patterns.iter().filter(|p| matches!(p.kind(), crackle_runtime::PatternKind::Clustering));
276    let clust_count = clust.count();
277    println!("    Clusters in degraded system: {} groups", clust_count);
278
279    println!();
280    let total_duration = start.elapsed();
281    println!("  Total test duration: {:.2}s", total_duration.as_secs_f64());
282    println!("  Events processed: {}", total_events);
283    println!("  Avg throughput: {:.0} events/s",
284             total_events as f64 / total_duration.as_secs_f64());
285}