ringkernel_procint/actors/
coordinator.rs

1//! Pipeline coordinator for process intelligence.
2//!
3//! Orchestrates the flow of events through GPU kernels.
4
5use super::{
6    ControlMessage, DfgUpdateMessage, PartialOrderBatchMessage, PatternBatchMessage,
7    PipelineMessage, PipelineStats,
8};
9use crate::analytics::AnalyticsEngine;
10use crate::cuda::GpuStatus;
11use crate::fabric::{PipelineConfig, ProcessingPipeline, SectorTemplate};
12use crate::kernels::{
13    ConformanceKernel, DfgConstructionKernel, PartialOrderKernel, PatternConfig,
14    PatternDetectionKernel,
15};
16use crate::models::ProcessModel;
17use std::collections::HashMap;
18use std::sync::mpsc::{channel, Receiver, Sender};
19
20/// A process variation (unique trace pattern).
21#[derive(Debug, Clone)]
22pub struct ProcessVariation {
23    /// Signature (activity sequence as string).
24    pub signature: String,
25    /// Activity IDs in order.
26    pub activities: Vec<u32>,
27    /// Number of cases with this variation.
28    pub count: u64,
29    /// Average duration.
30    pub avg_duration_ms: f32,
31    /// Is conformant to model.
32    pub is_conformant: bool,
33}
34
35/// GPU statistics report.
36#[derive(Debug, Clone, Default)]
37pub struct GpuStatsReport {
38    /// DFG kernel launches.
39    pub dfg_kernel_launches: u64,
40    /// Pattern detection kernel launches.
41    pub pattern_kernel_launches: u64,
42    /// Total GPU execution time in microseconds.
43    pub total_gpu_time_us: u64,
44    /// Total elements processed on GPU.
45    pub total_elements_gpu: u64,
46    /// Total bytes transferred to/from GPU.
47    pub bytes_transferred: u64,
48    /// Whether DFG kernel is using GPU.
49    pub dfg_using_gpu: bool,
50    /// Whether pattern kernel is using GPU.
51    pub pattern_using_gpu: bool,
52}
53
54impl GpuStatsReport {
55    /// Check if any GPU kernels are active.
56    pub fn is_gpu_active(&self) -> bool {
57        self.dfg_using_gpu || self.pattern_using_gpu
58    }
59
60    /// Get GPU throughput in elements per second.
61    pub fn throughput(&self) -> f64 {
62        if self.total_gpu_time_us > 0 {
63            self.total_elements_gpu as f64 * 1_000_000.0 / self.total_gpu_time_us as f64
64        } else {
65            0.0
66        }
67    }
68
69    /// Get total kernel launches.
70    pub fn total_launches(&self) -> u64 {
71        self.dfg_kernel_launches + self.pattern_kernel_launches
72    }
73}
74
75/// Pipeline coordinator managing all processing stages.
76pub struct PipelineCoordinator {
77    /// Data generation pipeline.
78    pipeline: ProcessingPipeline,
79    /// DFG construction kernel.
80    dfg_kernel: DfgConstructionKernel,
81    /// Pattern detection kernel.
82    pattern_kernel: PatternDetectionKernel,
83    /// Partial order kernel.
84    partial_order_kernel: PartialOrderKernel,
85    /// Conformance kernel.
86    conformance_kernel: ConformanceKernel,
87    /// Analytics engine.
88    analytics: AnalyticsEngine,
89    /// Pipeline statistics.
90    stats: PipelineStats,
91    /// Message sender for results.
92    result_sender: Option<Sender<PipelineMessage>>,
93    /// Next batch ID.
94    next_batch_id: u64,
95    /// Current DFG (updated each tick).
96    current_dfg: crate::models::DFGGraph,
97    /// Current patterns.
98    current_patterns: Vec<crate::models::GpuPatternMatch>,
99    /// Current partial orders.
100    current_partial_orders: Vec<crate::models::GpuPartialOrderTrace>,
101    /// Current conformance results.
102    current_conformance: Option<crate::kernels::ConformanceCheckResult>,
103    /// Process variations (unique trace signatures).
104    variations: std::collections::HashMap<String, ProcessVariation>,
105}
106
107impl PipelineCoordinator {
108    /// Create a new pipeline coordinator.
109    pub fn new(sector: SectorTemplate, config: PipelineConfig) -> Self {
110        // Create a reference model from the sector for conformance checking
111        let model = Self::create_model_from_sector(&sector);
112        let pipeline = ProcessingPipeline::new(sector, config);
113
114        Self {
115            pipeline,
116            dfg_kernel: DfgConstructionKernel::default(),
117            pattern_kernel: PatternDetectionKernel::new(PatternConfig::default()),
118            partial_order_kernel: PartialOrderKernel::default(),
119            conformance_kernel: ConformanceKernel::new(model),
120            analytics: AnalyticsEngine::new(),
121            stats: PipelineStats::default(),
122            result_sender: None,
123            next_batch_id: 1,
124            current_dfg: crate::models::DFGGraph::default(),
125            current_patterns: Vec::new(),
126            current_partial_orders: Vec::new(),
127            current_conformance: None,
128            variations: HashMap::new(),
129        }
130    }
131
132    /// Create a process model from sector template.
133    fn create_model_from_sector(sector: &SectorTemplate) -> ProcessModel {
134        use crate::models::ProcessModelType;
135
136        let mut model = ProcessModel::new(1, sector.name(), ProcessModelType::DFG);
137        let registry = sector.build_registry();
138
139        // Add transitions from sector, converting names to IDs
140        for trans in sector.transitions() {
141            if let (Some(source), Some(target)) = (
142                registry.get_by_name(trans.source),
143                registry.get_by_name(trans.target),
144            ) {
145                model.add_transition(source.id, target.id);
146            }
147        }
148
149        // Add start/end activities
150        for name in sector.start_activities() {
151            if let Some(activity) = registry.get_by_name(name) {
152                model.start_activities.push(activity.id);
153            }
154        }
155        for name in sector.end_activities() {
156            if let Some(activity) = registry.get_by_name(name) {
157                model.end_activities.push(activity.id);
158            }
159        }
160
161        model
162    }
163
164    /// Set reference model for conformance checking.
165    pub fn with_conformance_model(mut self, model: ProcessModel) -> Self {
166        self.conformance_kernel = ConformanceKernel::new(model);
167        self
168    }
169
170    /// Create result channel.
171    pub fn create_result_channel(&mut self) -> Receiver<PipelineMessage> {
172        let (sender, receiver) = channel();
173        self.result_sender = Some(sender);
174        receiver
175    }
176
177    /// Get current statistics.
178    pub fn stats(&self) -> &PipelineStats {
179        &self.stats
180    }
181
182    /// Get analytics engine.
183    pub fn analytics(&self) -> &AnalyticsEngine {
184        &self.analytics
185    }
186
187    /// Get mutable analytics engine.
188    pub fn analytics_mut(&mut self) -> &mut AnalyticsEngine {
189        &mut self.analytics
190    }
191
192    /// Get current DFG graph.
193    pub fn current_dfg(&self) -> &crate::models::DFGGraph {
194        &self.current_dfg
195    }
196
197    /// Get current detected patterns.
198    pub fn current_patterns(&self) -> &[crate::models::GpuPatternMatch] {
199        &self.current_patterns
200    }
201
202    /// Get current partial orders.
203    pub fn current_partial_orders(&self) -> &[crate::models::GpuPartialOrderTrace] {
204        &self.current_partial_orders
205    }
206
207    /// Get current conformance results.
208    pub fn current_conformance(&self) -> Option<&crate::kernels::ConformanceCheckResult> {
209        self.current_conformance.as_ref()
210    }
211
212    /// Get process variations.
213    pub fn variations(&self) -> &HashMap<String, ProcessVariation> {
214        &self.variations
215    }
216
217    /// Get GPU status for DFG kernel.
218    pub fn dfg_gpu_status(&self) -> GpuStatus {
219        self.dfg_kernel.gpu_status()
220    }
221
222    /// Get GPU status for pattern detection kernel.
223    pub fn pattern_gpu_status(&self) -> GpuStatus {
224        self.pattern_kernel.gpu_status()
225    }
226
227    /// Check if DFG kernel is using GPU.
228    pub fn is_dfg_using_gpu(&self) -> bool {
229        self.dfg_kernel.is_using_gpu()
230    }
231
232    /// Check if pattern kernel is using GPU.
233    pub fn is_pattern_using_gpu(&self) -> bool {
234        self.pattern_kernel.is_using_gpu()
235    }
236
237    /// Get combined GPU stats.
238    pub fn gpu_stats(&self) -> GpuStatsReport {
239        let dfg_stats = self.dfg_kernel.gpu_stats();
240        let pattern_stats = self.pattern_kernel.gpu_stats();
241
242        GpuStatsReport {
243            dfg_kernel_launches: dfg_stats.kernel_launches,
244            pattern_kernel_launches: pattern_stats.kernel_launches,
245            total_gpu_time_us: dfg_stats.total_gpu_time_us + pattern_stats.total_gpu_time_us,
246            total_elements_gpu: dfg_stats.total_elements_gpu + pattern_stats.total_elements_gpu,
247            bytes_transferred: dfg_stats.bytes_to_gpu
248                + dfg_stats.bytes_from_gpu
249                + pattern_stats.bytes_to_gpu
250                + pattern_stats.bytes_from_gpu,
251            dfg_using_gpu: self.dfg_kernel.is_using_gpu(),
252            pattern_using_gpu: self.pattern_kernel.is_using_gpu(),
253        }
254    }
255
256    /// Handle control message.
257    pub fn handle_control(&mut self, msg: ControlMessage) {
258        match msg {
259            ControlMessage::Start => {
260                self.pipeline.start();
261                self.stats.is_running = true;
262            }
263            ControlMessage::Pause => {
264                self.pipeline.stop();
265                self.stats.is_running = false;
266            }
267            ControlMessage::Stop => {
268                self.pipeline.stop();
269                self.stats.is_running = false;
270            }
271            ControlMessage::Reset => {
272                self.reset();
273            }
274            ControlMessage::GetStats => {
275                // Stats are available via stats() method
276            }
277        }
278    }
279
280    /// Process a single tick (generate + process batch).
281    pub fn tick(&mut self) -> Option<ProcessedBatch> {
282        if !self.pipeline.is_running() {
283            return None;
284        }
285
286        let start = std::time::Instant::now();
287
288        // Generate events
289        let events = self.pipeline.generate_batch();
290        if events.is_empty() {
291            return None;
292        }
293
294        let batch_id = self.next_batch_id;
295        self.next_batch_id += 1;
296
297        // Process through DFG kernel
298        let dfg_result = self.dfg_kernel.process(&events);
299
300        // Update analytics
301        self.analytics.dfg_metrics.calculate(&dfg_result.dfg);
302
303        // Detect patterns
304        let pattern_result = self.pattern_kernel.detect(dfg_result.dfg.nodes());
305        self.analytics
306            .pattern_aggregator
307            .add_patterns(&pattern_result.patterns);
308
309        // Derive partial orders
310        let partial_order_result = self.partial_order_kernel.derive(&events);
311
312        // Conformance checking
313        let conformance_results = self.conformance_kernel.check(&events);
314
315        // Update variations from partial orders
316        self.update_variations(&partial_order_result.traces, &conformance_results);
317
318        // Update KPIs
319        let total_time = start.elapsed().as_micros() as u64;
320        self.stats.record_batch(events.len() as u64, total_time);
321
322        // Update KPI tracker
323        let avg_duration = dfg_result
324            .dfg
325            .nodes()
326            .iter()
327            .map(|n| n.avg_duration_ms)
328            .sum::<f32>()
329            / dfg_result.dfg.nodes().len().max(1) as f32;
330
331        let fitness = conformance_results.avg_fitness();
332
333        self.analytics
334            .kpi_tracker
335            .update(events.len() as u64, avg_duration, fitness);
336        self.analytics
337            .kpi_tracker
338            .set_pattern_count(pattern_result.patterns.len() as u64);
339
340        // Store current results for access via getters
341        self.current_dfg = dfg_result.dfg.clone();
342        self.current_patterns = pattern_result.patterns.clone();
343        self.current_partial_orders = partial_order_result.traces.clone();
344        self.current_conformance = Some(conformance_results);
345
346        // Send results if channel exists
347        if let Some(sender) = &self.result_sender {
348            let _ = sender.send(PipelineMessage::DfgUpdate(DfgUpdateMessage {
349                batch_id,
350                nodes: self.current_dfg.nodes().to_vec(),
351                edges: self.current_dfg.edges().to_vec(),
352                processing_time_us: dfg_result.total_time_us,
353            }));
354
355            let _ = sender.send(PipelineMessage::PatternsDetected(PatternBatchMessage {
356                batch_id,
357                patterns: self.current_patterns.clone(),
358                processing_time_us: pattern_result.total_time_us,
359            }));
360
361            let _ = sender.send(PipelineMessage::PartialOrders(PartialOrderBatchMessage {
362                batch_id,
363                traces: self.current_partial_orders.clone(),
364                processing_time_us: partial_order_result.total_time_us,
365            }));
366        }
367
368        Some(ProcessedBatch {
369            batch_id,
370            event_count: events.len(),
371            dfg_nodes: dfg_result.dfg.nodes().len(),
372            dfg_edges: dfg_result.dfg.edges().len(),
373            patterns_detected: pattern_result.patterns.len(),
374            partial_orders: partial_order_result.traces.len(),
375            processing_time_us: total_time,
376        })
377    }
378
379    /// Reset all state.
380    pub fn reset(&mut self) {
381        self.pipeline.stop();
382        self.pipeline.reset_stats();
383        self.analytics = AnalyticsEngine::new();
384        self.stats = PipelineStats::default();
385        self.next_batch_id = 1;
386        self.current_dfg = crate::models::DFGGraph::default();
387        self.current_patterns.clear();
388        self.current_partial_orders.clear();
389        self.current_conformance = None;
390        self.variations.clear();
391        self.dfg_kernel = DfgConstructionKernel::default();
392        self.partial_order_kernel = PartialOrderKernel::default();
393    }
394
395    /// Change sector.
396    pub fn set_sector(&mut self, sector: SectorTemplate) {
397        // Update conformance model for new sector
398        let model = Self::create_model_from_sector(&sector);
399        self.conformance_kernel = ConformanceKernel::new(model);
400        self.pipeline.set_sector(sector);
401        self.reset();
402    }
403
404    /// Update process variations from traces.
405    fn update_variations(
406        &mut self,
407        traces: &[crate::models::GpuPartialOrderTrace],
408        conformance: &crate::kernels::ConformanceCheckResult,
409    ) {
410        for trace in traces {
411            // Create signature from activity sequence
412            let activities: Vec<u32> = trace.activity_ids[..trace.activity_count as usize].to_vec();
413            let signature: String = activities
414                .iter()
415                .map(|a| a.to_string())
416                .collect::<Vec<_>>()
417                .join("->");
418
419            // Check if this case is conformant
420            let is_conformant = conformance
421                .results
422                .iter()
423                .find(|r| r.trace_id == trace.case_id)
424                .map(|r| r.is_conformant())
425                .unwrap_or(true);
426
427            // Update or create variation
428            let duration = (trace.end_time.physical_ms - trace.start_time.physical_ms) as f32;
429
430            if let Some(var) = self.variations.get_mut(&signature) {
431                // Update existing
432                let total_duration = var.avg_duration_ms * var.count as f32 + duration;
433                var.count += 1;
434                var.avg_duration_ms = total_duration / var.count as f32;
435            } else {
436                // Create new
437                self.variations.insert(
438                    signature.clone(),
439                    ProcessVariation {
440                        signature,
441                        activities,
442                        count: 1,
443                        avg_duration_ms: duration,
444                        is_conformant,
445                    },
446                );
447            }
448        }
449    }
450
451    /// Get pipeline reference.
452    pub fn pipeline(&self) -> &ProcessingPipeline {
453        &self.pipeline
454    }
455
456    /// Get mutable pipeline reference.
457    pub fn pipeline_mut(&mut self) -> &mut ProcessingPipeline {
458        &mut self.pipeline
459    }
460}
461
462/// Summary of a processed batch.
463#[derive(Debug, Clone)]
464pub struct ProcessedBatch {
465    /// Batch ID.
466    pub batch_id: u64,
467    /// Number of events processed.
468    pub event_count: usize,
469    /// Number of DFG nodes.
470    pub dfg_nodes: usize,
471    /// Number of DFG edges.
472    pub dfg_edges: usize,
473    /// Number of patterns detected.
474    pub patterns_detected: usize,
475    /// Number of partial orders derived.
476    pub partial_orders: usize,
477    /// Total processing time in microseconds.
478    pub processing_time_us: u64,
479}
480
481#[cfg(test)]
482mod tests {
483    use super::*;
484    use crate::fabric::{FinanceConfig, HealthcareConfig, ManufacturingConfig, PipelineConfig};
485
486    #[test]
487    fn test_coordinator_creation() {
488        let coord = PipelineCoordinator::new(
489            SectorTemplate::Healthcare(HealthcareConfig::default()),
490            PipelineConfig::default(),
491        );
492        assert!(!coord.stats().is_running);
493    }
494
495    #[test]
496    fn test_coordinator_lifecycle() {
497        let mut coord = PipelineCoordinator::new(
498            SectorTemplate::Manufacturing(ManufacturingConfig::default()),
499            PipelineConfig::default(),
500        );
501
502        coord.handle_control(ControlMessage::Start);
503        assert!(coord.stats().is_running);
504
505        coord.handle_control(ControlMessage::Pause);
506        assert!(!coord.stats().is_running);
507    }
508
509    #[test]
510    fn test_tick_processing() {
511        let mut coord = PipelineCoordinator::new(
512            SectorTemplate::Finance(FinanceConfig::default()),
513            PipelineConfig::default(),
514        );
515
516        coord.handle_control(ControlMessage::Start);
517        let result = coord.tick();
518
519        assert!(result.is_some());
520        let batch = result.unwrap();
521        assert!(batch.event_count > 0);
522    }
523}