ringkernel_procint/actors/
coordinator.rs1use super::{
6 ControlMessage, DfgUpdateMessage, PartialOrderBatchMessage, PatternBatchMessage,
7 PipelineMessage, PipelineStats,
8};
9use crate::analytics::AnalyticsEngine;
10use crate::cuda::GpuStatus;
11use crate::fabric::{PipelineConfig, ProcessingPipeline, SectorTemplate};
12use crate::kernels::{
13 ConformanceKernel, DfgConstructionKernel, PartialOrderKernel, PatternConfig,
14 PatternDetectionKernel,
15};
16use crate::models::ProcessModel;
17use std::collections::HashMap;
18use std::sync::mpsc::{channel, Receiver, Sender};
19
20#[derive(Debug, Clone)]
22pub struct ProcessVariation {
23 pub signature: String,
25 pub activities: Vec<u32>,
27 pub count: u64,
29 pub avg_duration_ms: f32,
31 pub is_conformant: bool,
33}
34
35#[derive(Debug, Clone, Default)]
37pub struct GpuStatsReport {
38 pub dfg_kernel_launches: u64,
40 pub pattern_kernel_launches: u64,
42 pub total_gpu_time_us: u64,
44 pub total_elements_gpu: u64,
46 pub bytes_transferred: u64,
48 pub dfg_using_gpu: bool,
50 pub pattern_using_gpu: bool,
52}
53
54impl GpuStatsReport {
55 pub fn is_gpu_active(&self) -> bool {
57 self.dfg_using_gpu || self.pattern_using_gpu
58 }
59
60 pub fn throughput(&self) -> f64 {
62 if self.total_gpu_time_us > 0 {
63 self.total_elements_gpu as f64 * 1_000_000.0 / self.total_gpu_time_us as f64
64 } else {
65 0.0
66 }
67 }
68
69 pub fn total_launches(&self) -> u64 {
71 self.dfg_kernel_launches + self.pattern_kernel_launches
72 }
73}
74
75pub struct PipelineCoordinator {
77 pipeline: ProcessingPipeline,
79 dfg_kernel: DfgConstructionKernel,
81 pattern_kernel: PatternDetectionKernel,
83 partial_order_kernel: PartialOrderKernel,
85 conformance_kernel: ConformanceKernel,
87 analytics: AnalyticsEngine,
89 stats: PipelineStats,
91 result_sender: Option<Sender<PipelineMessage>>,
93 next_batch_id: u64,
95 current_dfg: crate::models::DFGGraph,
97 current_patterns: Vec<crate::models::GpuPatternMatch>,
99 current_partial_orders: Vec<crate::models::GpuPartialOrderTrace>,
101 current_conformance: Option<crate::kernels::ConformanceCheckResult>,
103 variations: std::collections::HashMap<String, ProcessVariation>,
105}
106
107impl PipelineCoordinator {
108 pub fn new(sector: SectorTemplate, config: PipelineConfig) -> Self {
110 let model = Self::create_model_from_sector(§or);
112 let pipeline = ProcessingPipeline::new(sector, config);
113
114 Self {
115 pipeline,
116 dfg_kernel: DfgConstructionKernel::default(),
117 pattern_kernel: PatternDetectionKernel::new(PatternConfig::default()),
118 partial_order_kernel: PartialOrderKernel::default(),
119 conformance_kernel: ConformanceKernel::new(model),
120 analytics: AnalyticsEngine::new(),
121 stats: PipelineStats::default(),
122 result_sender: None,
123 next_batch_id: 1,
124 current_dfg: crate::models::DFGGraph::default(),
125 current_patterns: Vec::new(),
126 current_partial_orders: Vec::new(),
127 current_conformance: None,
128 variations: HashMap::new(),
129 }
130 }
131
132 fn create_model_from_sector(sector: &SectorTemplate) -> ProcessModel {
134 use crate::models::ProcessModelType;
135
136 let mut model = ProcessModel::new(1, sector.name(), ProcessModelType::DFG);
137 let registry = sector.build_registry();
138
139 for trans in sector.transitions() {
141 if let (Some(source), Some(target)) = (
142 registry.get_by_name(trans.source),
143 registry.get_by_name(trans.target),
144 ) {
145 model.add_transition(source.id, target.id);
146 }
147 }
148
149 for name in sector.start_activities() {
151 if let Some(activity) = registry.get_by_name(name) {
152 model.start_activities.push(activity.id);
153 }
154 }
155 for name in sector.end_activities() {
156 if let Some(activity) = registry.get_by_name(name) {
157 model.end_activities.push(activity.id);
158 }
159 }
160
161 model
162 }
163
164 pub fn with_conformance_model(mut self, model: ProcessModel) -> Self {
166 self.conformance_kernel = ConformanceKernel::new(model);
167 self
168 }
169
170 pub fn create_result_channel(&mut self) -> Receiver<PipelineMessage> {
172 let (sender, receiver) = channel();
173 self.result_sender = Some(sender);
174 receiver
175 }
176
177 pub fn stats(&self) -> &PipelineStats {
179 &self.stats
180 }
181
182 pub fn analytics(&self) -> &AnalyticsEngine {
184 &self.analytics
185 }
186
187 pub fn analytics_mut(&mut self) -> &mut AnalyticsEngine {
189 &mut self.analytics
190 }
191
192 pub fn current_dfg(&self) -> &crate::models::DFGGraph {
194 &self.current_dfg
195 }
196
197 pub fn current_patterns(&self) -> &[crate::models::GpuPatternMatch] {
199 &self.current_patterns
200 }
201
202 pub fn current_partial_orders(&self) -> &[crate::models::GpuPartialOrderTrace] {
204 &self.current_partial_orders
205 }
206
207 pub fn current_conformance(&self) -> Option<&crate::kernels::ConformanceCheckResult> {
209 self.current_conformance.as_ref()
210 }
211
212 pub fn variations(&self) -> &HashMap<String, ProcessVariation> {
214 &self.variations
215 }
216
217 pub fn dfg_gpu_status(&self) -> GpuStatus {
219 self.dfg_kernel.gpu_status()
220 }
221
222 pub fn pattern_gpu_status(&self) -> GpuStatus {
224 self.pattern_kernel.gpu_status()
225 }
226
227 pub fn is_dfg_using_gpu(&self) -> bool {
229 self.dfg_kernel.is_using_gpu()
230 }
231
232 pub fn is_pattern_using_gpu(&self) -> bool {
234 self.pattern_kernel.is_using_gpu()
235 }
236
237 pub fn gpu_stats(&self) -> GpuStatsReport {
239 let dfg_stats = self.dfg_kernel.gpu_stats();
240 let pattern_stats = self.pattern_kernel.gpu_stats();
241
242 GpuStatsReport {
243 dfg_kernel_launches: dfg_stats.kernel_launches,
244 pattern_kernel_launches: pattern_stats.kernel_launches,
245 total_gpu_time_us: dfg_stats.total_gpu_time_us + pattern_stats.total_gpu_time_us,
246 total_elements_gpu: dfg_stats.total_elements_gpu + pattern_stats.total_elements_gpu,
247 bytes_transferred: dfg_stats.bytes_to_gpu
248 + dfg_stats.bytes_from_gpu
249 + pattern_stats.bytes_to_gpu
250 + pattern_stats.bytes_from_gpu,
251 dfg_using_gpu: self.dfg_kernel.is_using_gpu(),
252 pattern_using_gpu: self.pattern_kernel.is_using_gpu(),
253 }
254 }
255
256 pub fn handle_control(&mut self, msg: ControlMessage) {
258 match msg {
259 ControlMessage::Start => {
260 self.pipeline.start();
261 self.stats.is_running = true;
262 }
263 ControlMessage::Pause => {
264 self.pipeline.stop();
265 self.stats.is_running = false;
266 }
267 ControlMessage::Stop => {
268 self.pipeline.stop();
269 self.stats.is_running = false;
270 }
271 ControlMessage::Reset => {
272 self.reset();
273 }
274 ControlMessage::GetStats => {
275 }
277 }
278 }
279
280 pub fn tick(&mut self) -> Option<ProcessedBatch> {
282 if !self.pipeline.is_running() {
283 return None;
284 }
285
286 let start = std::time::Instant::now();
287
288 let events = self.pipeline.generate_batch();
290 if events.is_empty() {
291 return None;
292 }
293
294 let batch_id = self.next_batch_id;
295 self.next_batch_id += 1;
296
297 let dfg_result = self.dfg_kernel.process(&events);
299
300 self.analytics.dfg_metrics.calculate(&dfg_result.dfg);
302
303 let pattern_result = self.pattern_kernel.detect(dfg_result.dfg.nodes());
305 self.analytics
306 .pattern_aggregator
307 .add_patterns(&pattern_result.patterns);
308
309 let partial_order_result = self.partial_order_kernel.derive(&events);
311
312 let conformance_results = self.conformance_kernel.check(&events);
314
315 self.update_variations(&partial_order_result.traces, &conformance_results);
317
318 let total_time = start.elapsed().as_micros() as u64;
320 self.stats.record_batch(events.len() as u64, total_time);
321
322 let avg_duration = dfg_result
324 .dfg
325 .nodes()
326 .iter()
327 .map(|n| n.avg_duration_ms)
328 .sum::<f32>()
329 / dfg_result.dfg.nodes().len().max(1) as f32;
330
331 let fitness = conformance_results.avg_fitness();
332
333 self.analytics
334 .kpi_tracker
335 .update(events.len() as u64, avg_duration, fitness);
336 self.analytics
337 .kpi_tracker
338 .set_pattern_count(pattern_result.patterns.len() as u64);
339
340 self.current_dfg = dfg_result.dfg.clone();
342 self.current_patterns = pattern_result.patterns.clone();
343 self.current_partial_orders = partial_order_result.traces.clone();
344 self.current_conformance = Some(conformance_results);
345
346 if let Some(sender) = &self.result_sender {
348 let _ = sender.send(PipelineMessage::DfgUpdate(DfgUpdateMessage {
349 batch_id,
350 nodes: self.current_dfg.nodes().to_vec(),
351 edges: self.current_dfg.edges().to_vec(),
352 processing_time_us: dfg_result.total_time_us,
353 }));
354
355 let _ = sender.send(PipelineMessage::PatternsDetected(PatternBatchMessage {
356 batch_id,
357 patterns: self.current_patterns.clone(),
358 processing_time_us: pattern_result.total_time_us,
359 }));
360
361 let _ = sender.send(PipelineMessage::PartialOrders(PartialOrderBatchMessage {
362 batch_id,
363 traces: self.current_partial_orders.clone(),
364 processing_time_us: partial_order_result.total_time_us,
365 }));
366 }
367
368 Some(ProcessedBatch {
369 batch_id,
370 event_count: events.len(),
371 dfg_nodes: dfg_result.dfg.nodes().len(),
372 dfg_edges: dfg_result.dfg.edges().len(),
373 patterns_detected: pattern_result.patterns.len(),
374 partial_orders: partial_order_result.traces.len(),
375 processing_time_us: total_time,
376 })
377 }
378
379 pub fn reset(&mut self) {
381 self.pipeline.stop();
382 self.pipeline.reset_stats();
383 self.analytics = AnalyticsEngine::new();
384 self.stats = PipelineStats::default();
385 self.next_batch_id = 1;
386 self.current_dfg = crate::models::DFGGraph::default();
387 self.current_patterns.clear();
388 self.current_partial_orders.clear();
389 self.current_conformance = None;
390 self.variations.clear();
391 self.dfg_kernel = DfgConstructionKernel::default();
392 self.partial_order_kernel = PartialOrderKernel::default();
393 }
394
395 pub fn set_sector(&mut self, sector: SectorTemplate) {
397 let model = Self::create_model_from_sector(§or);
399 self.conformance_kernel = ConformanceKernel::new(model);
400 self.pipeline.set_sector(sector);
401 self.reset();
402 }
403
404 fn update_variations(
406 &mut self,
407 traces: &[crate::models::GpuPartialOrderTrace],
408 conformance: &crate::kernels::ConformanceCheckResult,
409 ) {
410 for trace in traces {
411 let activities: Vec<u32> = trace.activity_ids[..trace.activity_count as usize].to_vec();
413 let signature: String = activities
414 .iter()
415 .map(|a| a.to_string())
416 .collect::<Vec<_>>()
417 .join("->");
418
419 let is_conformant = conformance
421 .results
422 .iter()
423 .find(|r| r.trace_id == trace.case_id)
424 .map(|r| r.is_conformant())
425 .unwrap_or(true);
426
427 let duration = (trace.end_time.physical_ms - trace.start_time.physical_ms) as f32;
429
430 if let Some(var) = self.variations.get_mut(&signature) {
431 let total_duration = var.avg_duration_ms * var.count as f32 + duration;
433 var.count += 1;
434 var.avg_duration_ms = total_duration / var.count as f32;
435 } else {
436 self.variations.insert(
438 signature.clone(),
439 ProcessVariation {
440 signature,
441 activities,
442 count: 1,
443 avg_duration_ms: duration,
444 is_conformant,
445 },
446 );
447 }
448 }
449 }
450
451 pub fn pipeline(&self) -> &ProcessingPipeline {
453 &self.pipeline
454 }
455
456 pub fn pipeline_mut(&mut self) -> &mut ProcessingPipeline {
458 &mut self.pipeline
459 }
460}
461
462#[derive(Debug, Clone)]
464pub struct ProcessedBatch {
465 pub batch_id: u64,
467 pub event_count: usize,
469 pub dfg_nodes: usize,
471 pub dfg_edges: usize,
473 pub patterns_detected: usize,
475 pub partial_orders: usize,
477 pub processing_time_us: u64,
479}
480
481#[cfg(test)]
482mod tests {
483 use super::*;
484 use crate::fabric::{FinanceConfig, HealthcareConfig, ManufacturingConfig, PipelineConfig};
485
486 #[test]
487 fn test_coordinator_creation() {
488 let coord = PipelineCoordinator::new(
489 SectorTemplate::Healthcare(HealthcareConfig::default()),
490 PipelineConfig::default(),
491 );
492 assert!(!coord.stats().is_running);
493 }
494
495 #[test]
496 fn test_coordinator_lifecycle() {
497 let mut coord = PipelineCoordinator::new(
498 SectorTemplate::Manufacturing(ManufacturingConfig::default()),
499 PipelineConfig::default(),
500 );
501
502 coord.handle_control(ControlMessage::Start);
503 assert!(coord.stats().is_running);
504
505 coord.handle_control(ControlMessage::Pause);
506 assert!(!coord.stats().is_running);
507 }
508
509 #[test]
510 fn test_tick_processing() {
511 let mut coord = PipelineCoordinator::new(
512 SectorTemplate::Finance(FinanceConfig::default()),
513 PipelineConfig::default(),
514 );
515
516 coord.handle_control(ControlMessage::Start);
517 let result = coord.tick();
518
519 assert!(result.is_some());
520 let batch = result.unwrap();
521 assert!(batch.event_count > 0);
522 }
523}