Skip to main content

ringkernel_procint/fabric/
pipeline.rs

1//! Streaming pipeline for event processing.
2//!
3//! Connects event generation to GPU kernels and analytics.
4
5use crate::fabric::{GeneratorConfig, ProcessEventGenerator, SectorTemplate};
6use crate::models::GpuObjectEvent;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::Arc;
9
10/// Pipeline configuration.
11#[derive(Debug, Clone)]
12pub struct PipelineConfig {
13    /// Generator configuration.
14    pub generator: GeneratorConfig,
15    /// Batch size for GPU processing.
16    pub gpu_batch_size: usize,
17    /// Enable DFG construction.
18    pub enable_dfg: bool,
19    /// Enable pattern detection.
20    pub enable_patterns: bool,
21    /// Enable conformance checking.
22    pub enable_conformance: bool,
23    /// Enable partial order derivation.
24    pub enable_partial_order: bool,
25}
26
27impl Default for PipelineConfig {
28    fn default() -> Self {
29        Self {
30            generator: GeneratorConfig::default(),
31            gpu_batch_size: 4096,
32            enable_dfg: true,
33            enable_patterns: true,
34            enable_conformance: true,
35            enable_partial_order: true,
36        }
37    }
38}
39
40/// Pipeline statistics.
41#[derive(Debug, Clone, Default)]
42pub struct PipelineStats {
43    /// Total events processed.
44    pub events_processed: u64,
45    /// Total batches processed.
46    pub batches_processed: u64,
47    /// DFG updates.
48    pub dfg_updates: u64,
49    /// Patterns detected.
50    pub patterns_detected: u64,
51    /// Conformance checks performed.
52    pub conformance_checks: u64,
53    /// Average batch processing time (microseconds).
54    pub avg_batch_time_us: f64,
55}
56
57/// Streaming pipeline for process intelligence.
58pub struct ProcessingPipeline {
59    /// Sector template.
60    sector: SectorTemplate,
61    /// Pipeline configuration.
62    config: PipelineConfig,
63    /// Event generator.
64    generator: ProcessEventGenerator,
65    /// Running flag.
66    running: Arc<AtomicBool>,
67    /// Event counter.
68    event_count: Arc<AtomicU64>,
69    /// Batch counter.
70    batch_count: Arc<AtomicU64>,
71}
72
73impl ProcessingPipeline {
74    /// Create a new pipeline.
75    pub fn new(sector: SectorTemplate, config: PipelineConfig) -> Self {
76        let generator = ProcessEventGenerator::new(sector.clone(), config.generator.clone());
77
78        Self {
79            sector,
80            config,
81            generator,
82            running: Arc::new(AtomicBool::new(false)),
83            event_count: Arc::new(AtomicU64::new(0)),
84            batch_count: Arc::new(AtomicU64::new(0)),
85        }
86    }
87
88    /// Get the sector template.
89    pub fn sector(&self) -> &SectorTemplate {
90        &self.sector
91    }
92
93    /// Get the configuration.
94    pub fn config(&self) -> &PipelineConfig {
95        &self.config
96    }
97
98    /// Check if pipeline is running.
99    pub fn is_running(&self) -> bool {
100        self.running.load(Ordering::Relaxed)
101    }
102
103    /// Start the pipeline.
104    pub fn start(&self) {
105        self.running.store(true, Ordering::Relaxed);
106    }
107
108    /// Stop the pipeline.
109    pub fn stop(&self) {
110        self.running.store(false, Ordering::Relaxed);
111    }
112
113    /// Generate next batch of events.
114    pub fn generate_batch(&mut self) -> Vec<GpuObjectEvent> {
115        if !self.is_running() {
116            return Vec::new();
117        }
118
119        let events = self.generator.generate_batch(self.config.gpu_batch_size);
120        self.event_count
121            .fetch_add(events.len() as u64, Ordering::Relaxed);
122        self.batch_count.fetch_add(1, Ordering::Relaxed);
123        events
124    }
125
126    /// Get current statistics.
127    pub fn stats(&self) -> PipelineStats {
128        let generator_stats = self.generator.stats();
129        PipelineStats {
130            events_processed: self.event_count.load(Ordering::Relaxed),
131            batches_processed: self.batch_count.load(Ordering::Relaxed),
132            dfg_updates: generator_stats.total_events,
133            patterns_detected: generator_stats.bottleneck_count
134                + generator_stats.rework_count
135                + generator_stats.long_running_count,
136            conformance_checks: generator_stats.cases_completed,
137            avg_batch_time_us: 0.0, // Will be updated by actual processing
138        }
139    }
140
141    /// Get generator statistics.
142    pub fn generator_stats(&self) -> &crate::fabric::GeneratorStats {
143        self.generator.stats()
144    }
145
146    /// Get estimated throughput.
147    pub fn throughput(&self) -> f32 {
148        self.generator.throughput()
149    }
150
151    /// Reset statistics.
152    pub fn reset_stats(&mut self) {
153        self.event_count.store(0, Ordering::Relaxed);
154        self.batch_count.store(0, Ordering::Relaxed);
155    }
156
157    /// Change sector template.
158    pub fn set_sector(&mut self, sector: SectorTemplate) {
159        self.sector = sector.clone();
160        self.generator = ProcessEventGenerator::new(sector, self.config.generator.clone());
161        self.reset_stats();
162    }
163
164    /// Update generator configuration.
165    pub fn set_generator_config(&mut self, config: GeneratorConfig) {
166        self.config.generator = config.clone();
167        self.generator = ProcessEventGenerator::new(self.sector.clone(), config);
168    }
169}
170
171/// Pipeline builder for convenient setup.
172#[derive(Debug, Default)]
173pub struct PipelineBuilder {
174    sector: Option<SectorTemplate>,
175    config: PipelineConfig,
176}
177
178impl PipelineBuilder {
179    /// Create a new builder.
180    pub fn new() -> Self {
181        Self::default()
182    }
183
184    /// Set sector template.
185    pub fn with_sector(mut self, sector: SectorTemplate) -> Self {
186        self.sector = Some(sector);
187        self
188    }
189
190    /// Set events per second.
191    pub fn with_events_per_second(mut self, eps: u32) -> Self {
192        self.config.generator.events_per_second = eps;
193        self
194    }
195
196    /// Set GPU batch size.
197    pub fn with_gpu_batch_size(mut self, size: usize) -> Self {
198        self.config.gpu_batch_size = size;
199        self
200    }
201
202    /// Enable/disable DFG construction.
203    pub fn with_dfg(mut self, enabled: bool) -> Self {
204        self.config.enable_dfg = enabled;
205        self
206    }
207
208    /// Enable/disable pattern detection.
209    pub fn with_patterns(mut self, enabled: bool) -> Self {
210        self.config.enable_patterns = enabled;
211        self
212    }
213
214    /// Enable/disable conformance checking.
215    pub fn with_conformance(mut self, enabled: bool) -> Self {
216        self.config.enable_conformance = enabled;
217        self
218    }
219
220    /// Build the pipeline.
221    pub fn build(self) -> ProcessingPipeline {
222        let sector = self.sector.unwrap_or_default();
223        ProcessingPipeline::new(sector, self.config)
224    }
225}
226
227#[cfg(test)]
228mod tests {
229    use super::*;
230    use crate::fabric::{HealthcareConfig, ManufacturingConfig};
231
232    #[test]
233    fn test_pipeline_creation() {
234        let pipeline = PipelineBuilder::new()
235            .with_sector(SectorTemplate::Healthcare(HealthcareConfig::default()))
236            .with_events_per_second(10000)
237            .build();
238
239        assert!(!pipeline.is_running());
240        assert_eq!(pipeline.sector().name(), "Healthcare");
241    }
242
243    #[test]
244    fn test_pipeline_start_stop() {
245        let pipeline =
246            ProcessingPipeline::new(SectorTemplate::default(), PipelineConfig::default());
247
248        assert!(!pipeline.is_running());
249        pipeline.start();
250        assert!(pipeline.is_running());
251        pipeline.stop();
252        assert!(!pipeline.is_running());
253    }
254
255    #[test]
256    fn test_generate_batch() {
257        let mut pipeline = PipelineBuilder::new()
258            .with_sector(SectorTemplate::Manufacturing(ManufacturingConfig::default()))
259            .with_gpu_batch_size(100)
260            .build();
261
262        pipeline.start();
263        let batch = pipeline.generate_batch();
264        assert!(!batch.is_empty());
265
266        let stats = pipeline.stats();
267        assert!(stats.events_processed > 0);
268    }
269}