1use crackle_runtime::{CrackleTask, Kiln, ThermalProfile, TaskOutput};
10use std::time::{Duration, Instant};
11
12struct ProcessedEvent {
14 event_id: u64,
15 batch_id: u32,
16 processing_time_ms: f64,
17 throughput: f64, memory_mb: f64, error_rate: f64, records_in: f64,
21 records_out: f64,
22 cpu_temp_c: f64, latency_p99_ms: f64, consumer_lag: f64, gc_pause_ms: f64, }
27
28impl CrackleTask for ProcessedEvent {
29 type Output = f64;
30
31 fn fire(&self) -> TaskOutput<Self::Output> {
32 TaskOutput::new(
33 self.processing_time_ms,
34 vec![
35 ("processing_time_ms".into(), self.processing_time_ms),
36 ("throughput".into(), self.throughput),
37 ("memory_mb".into(), self.memory_mb),
38 ("error_rate".into(), self.error_rate),
39 ("records_in".into(), self.records_in),
40 ("records_out".into(), self.records_out),
41 ("cpu_temp_c".into(), self.cpu_temp_c),
42 ("latency_p99_ms".into(), self.latency_p99_ms),
43 ("consumer_lag".into(), self.consumer_lag),
44 ("gc_pause_ms".into(), self.gc_pause_ms),
45 ("event_id".into(), self.event_id as f64),
46 ("batch_id".into(), self.batch_id as f64),
47 ],
48 )
49 }
50
51 fn label(&self) -> String {
52 format!("batch-{}-event-{}", self.batch_id, self.event_id)
53 }
54}
55
56fn simulate_batch(
58 batch_id: u32,
59 batch_size: u32,
60 base_latency_ms: f64,
61 degradation_factor: f64, ) -> Vec<ProcessedEvent> {
63 let mut events = Vec::with_capacity(batch_size as usize);
64 for i in 0..batch_size {
65 let event_id = batch_id as u64 * 1000 + i as u64;
66 let noise: f64 = (i as f64 * 7.0).sin() * 0.1; let degradation = degradation_factor * (i as f64 / batch_size as f64);
68
69 let processing_time_ms = base_latency_ms * (1.0 + degradation + noise);
70 let throughput = 1000.0 / processing_time_ms * 1000.0;
71 let memory_mb = 128.0 + degradation * 64.0 + noise * 10.0;
72 let error_rate = (degradation * 0.5 + noise.abs() * 0.05).min(0.5);
73 let records_in = 1000.0;
74 let records_out = records_in * (1.0 - error_rate);
75 let cpu_temp_c = 65.0 + degradation * 30.0 + noise * 5.0;
76 let latency_p99_ms = processing_time_ms * 2.5;
77 let consumer_lag = degradation * 10000.0 + (i as f64 * 3.0).sin().abs() * 100.0;
78 let gc_pause_ms = 5.0 + degradation * 50.0;
79
80 events.push(ProcessedEvent {
81 event_id,
82 batch_id,
83 processing_time_ms,
84 throughput,
85 memory_mb,
86 error_rate,
87 records_in,
88 records_out,
89 cpu_temp_c,
90 latency_p99_ms,
91 consumer_lag,
92 gc_pause_ms,
93 });
94 }
95 events
96}
97
98fn main() {
99 let total_events = 1000;
100 let batch_size = 100;
101 let num_batches = total_events / batch_size;
102
103 println!("═══════════════════════════════════════════════════════════════");
104 println!(" CRACKLE-RUNTIME: EVENT STREAM PROCESSOR TEST");
105 println!("═══════════════════════════════════════════════════════════════\n");
106 println!("Processing {} events across {} batches of {}...\n",
107 total_events, num_batches, batch_size);
108
109 let start = Instant::now();
110
111 println!("─── Phase 1: Healthy System ───");
113 let mut healthy_kiln = Kiln::new(
114 ThermalProfile::fast_cooling()
115 );
116
117 for batch_id in 1..=4 {
119 let events = simulate_batch(batch_id, batch_size as u32, 10.0, 0.05);
120 for event in events {
121 healthy_kiln.fire_and_record(event).unwrap();
122 }
123
124 let elapsed = start.elapsed();
125 println!(" Batch {}/{} fired ({} events, {:.0}ms)",
126 batch_id, num_batches, batch_size, elapsed.as_millis());
127 }
128
129 let healthy_patterns = healthy_kiln.cool();
131 println!("\n Healthy system patterns ({} detected):", healthy_patterns.len());
132 for p in &healthy_patterns {
133 println!(" [{:?}] {} (conf: {:.2})", p.kind(), p.description(), p.confidence());
134 }
135 let conservation = healthy_kiln.distribution_shift(10);
137 println!(" Distribution shifts:");
138 for (name, shift) in &conservation[..conservation.len().min(3)] {
139 println!(" {} → KL = {:.4}", name, shift);
140 }
141
142 println!();
143
144 println!("─── Phase 2: Degrading System ───");
146
147 let mut degrading_kiln = Kiln::new(
148 ThermalProfile::fast_cooling()
149 );
150
151 let degradation_levels = [0.2, 0.5, 0.9, 1.5];
153 for (i, °radation) in degradation_levels.iter().enumerate() {
154 let batch_id = (5 + i) as u32;
155 let base_latency = 10.0 + degradation * 50.0;
157
158 let events = simulate_batch(batch_id, batch_size as u32, base_latency, degradation);
159 for event in events {
160 degrading_kiln.fire_and_record(event).unwrap();
161 }
162
163 let elapsed = start.elapsed();
164 println!(" Batch {}/{} [degradation={:.1}x] fired (latency baseline: {:.0}ms, {:.0}ms total)",
165 batch_id, num_batches, degradation, base_latency, elapsed.as_millis());
166 }
167
168 let degrading_patterns = degrading_kiln.cool();
169 println!("\n Degrading system patterns ({} detected):", degrading_patterns.len());
170 for p in °rading_patterns {
171 println!(" [{:?}] {} (conf: {:.2})", p.kind(), p.description(), p.confidence());
172 }
173
174 let jsd_shifts = degrading_kiln.jsd_shift(10);
176 println!(" JSD shifts (distribution shape changes):");
177 for (name, shift) in &jsd_shifts[..jsd_shifts.len().min(5)] {
178 println!(" {} → JSD = {:.4}", name, shift);
179 }
180
181 let pes = degrading_kiln.permutation_entropies(4);
183 println!(" Permutation entropies (temporal structure):");
184 for (name, pe) in &pes {
185 println!(" {} → PE = {:.4}", name, pe);
186 }
187
188 println!();
189
190 println!("─── Phase 3: Conservation Law Verification ───");
193 let mut conservation_kiln = Kiln::new(
194 ThermalProfile::fast_cooling()
195 );
196
197 for batch_id in 1..=3 {
199 let event = ProcessedEvent {
200 event_id: batch_id as u64,
201 batch_id: batch_id as u32,
202 processing_time_ms: 10.0,
203 throughput: 100.0,
204 memory_mb: 128.0,
205 error_rate: 0.01,
206 records_in: 1000.0,
207 records_out: 995.0, cpu_temp_c: 65.0,
209 latency_p99_ms: 25.0,
210 consumer_lag: 10.0,
211 gc_pause_ms: 5.0,
212 };
213 conservation_kiln.fire_and_record(event).unwrap();
214 }
215
216 for batch_id in 4..=6 {
218 let event = ProcessedEvent {
219 event_id: batch_id as u64,
220 batch_id: batch_id as u32,
221 processing_time_ms: 10.0 + batch_id as f64 * 10.0,
222 throughput: 50.0,
223 memory_mb: 256.0,
224 error_rate: 0.3,
225 records_in: 1000.0,
226 records_out: 700.0, cpu_temp_c: 80.0,
228 latency_p99_ms: 100.0,
229 consumer_lag: 500.0,
230 gc_pause_ms: 20.0,
231 };
232 conservation_kiln.fire_and_record(event).unwrap();
233 }
234
235 let conservation_patterns = conservation_kiln.cool();
236 println!(" Conservation-specific patterns ({} detected):", conservation_patterns.len());
237 for p in &conservation_patterns {
238 println!(" [{:?}] {} (conf: {:.2})", p.kind(), p.description(), p.confidence());
239 }
240
241 let mi = conservation_kiln.mi_matrix(8);
243 println!(" Mutual Information matrix (8 bins):");
244 let metric_names = ["processing_time_ms", "error_rate", "records_in", "records_out", "memory_mb"];
245 for (i, name_i) in metric_names.iter().enumerate() {
246 for (j, name_j) in metric_names.iter().enumerate() {
247 if i == j {
248 println!(" I({:25}; {:25}) = H({})", name_i, name_j, name_i);
249 } else {
250 println!(" I({:25}; {:25}) = {:.4} bits", name_i, name_j, mi[i][j]);
251 }
252 }
253 }
254
255 println!();
256 println!("───────────────────────────────────────────────────────────────");
257 println!(" SUMMARY");
258 println!("───────────────────────────────────────────────────────────────\n");
259
260 println!(" Degradation Metrics:");
262 let pt = degrading_patterns.iter().filter(|p| matches!(p.kind(), crackle_runtime::PatternKind::PhaseTransition));
263 let pt_count = pt.count();
264 println!(" Phase transitions detected in degraded system: {} alerts",
265 pt_count);
266
267 let corr = degrading_patterns.iter().filter(|p| matches!(p.kind(), crackle_runtime::PatternKind::Correlation));
268 let corr_count = corr.count();
269 println!(" Correlations in degraded system: {} pairs", corr_count);
270
271 let cons = degrading_patterns.iter().filter(|p| matches!(p.kind(), crackle_runtime::PatternKind::Conservation));
272 let cons_count = cons.count();
273 println!(" Conservation laws in degraded system: {} found", cons_count);
274
275 let clust = degrading_patterns.iter().filter(|p| matches!(p.kind(), crackle_runtime::PatternKind::Clustering));
276 let clust_count = clust.count();
277 println!(" Clusters in degraded system: {} groups", clust_count);
278
279 println!();
280 let total_duration = start.elapsed();
281 println!(" Total test duration: {:.2}s", total_duration.as_secs_f64());
282 println!(" Events processed: {}", total_events);
283 println!(" Avg throughput: {:.0} events/s",
284 total_events as f64 / total_duration.as_secs_f64());
285}