ringkernel_procint/fabric/
pipeline.rs1use crate::fabric::{GeneratorConfig, ProcessEventGenerator, SectorTemplate};
6use crate::models::GpuObjectEvent;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::Arc;
9
10#[derive(Debug, Clone)]
12pub struct PipelineConfig {
13 pub generator: GeneratorConfig,
15 pub gpu_batch_size: usize,
17 pub enable_dfg: bool,
19 pub enable_patterns: bool,
21 pub enable_conformance: bool,
23 pub enable_partial_order: bool,
25}
26
27impl Default for PipelineConfig {
28 fn default() -> Self {
29 Self {
30 generator: GeneratorConfig::default(),
31 gpu_batch_size: 4096,
32 enable_dfg: true,
33 enable_patterns: true,
34 enable_conformance: true,
35 enable_partial_order: true,
36 }
37 }
38}
39
/// Snapshot of pipeline counters.
///
/// A plain value type: every field is a number, so snapshots can be cloned,
/// defaulted to all-zero, and compared with `==` (useful in tests and for
/// change detection). `Eq` is not derived because of the `f64` field.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct PipelineStats {
    /// Total number of events counted by the pipeline.
    pub events_processed: u64,
    /// Total number of batches counted by the pipeline.
    pub batches_processed: u64,
    /// Number of DFG updates.
    pub dfg_updates: u64,
    /// Number of detected patterns.
    pub patterns_detected: u64,
    /// Number of conformance checks.
    pub conformance_checks: u64,
    /// Average time per batch, in microseconds (per the field name).
    pub avg_batch_time_us: f64,
}
56
57pub struct ProcessingPipeline {
59 sector: SectorTemplate,
61 config: PipelineConfig,
63 generator: ProcessEventGenerator,
65 running: Arc<AtomicBool>,
67 event_count: Arc<AtomicU64>,
69 batch_count: Arc<AtomicU64>,
71}
72
73impl ProcessingPipeline {
74 pub fn new(sector: SectorTemplate, config: PipelineConfig) -> Self {
76 let generator = ProcessEventGenerator::new(sector.clone(), config.generator.clone());
77
78 Self {
79 sector,
80 config,
81 generator,
82 running: Arc::new(AtomicBool::new(false)),
83 event_count: Arc::new(AtomicU64::new(0)),
84 batch_count: Arc::new(AtomicU64::new(0)),
85 }
86 }
87
88 pub fn sector(&self) -> &SectorTemplate {
90 &self.sector
91 }
92
93 pub fn config(&self) -> &PipelineConfig {
95 &self.config
96 }
97
98 pub fn is_running(&self) -> bool {
100 self.running.load(Ordering::Relaxed)
101 }
102
103 pub fn start(&self) {
105 self.running.store(true, Ordering::Relaxed);
106 }
107
108 pub fn stop(&self) {
110 self.running.store(false, Ordering::Relaxed);
111 }
112
113 pub fn generate_batch(&mut self) -> Vec<GpuObjectEvent> {
115 if !self.is_running() {
116 return Vec::new();
117 }
118
119 let events = self.generator.generate_batch(self.config.gpu_batch_size);
120 self.event_count
121 .fetch_add(events.len() as u64, Ordering::Relaxed);
122 self.batch_count.fetch_add(1, Ordering::Relaxed);
123 events
124 }
125
126 pub fn stats(&self) -> PipelineStats {
128 let generator_stats = self.generator.stats();
129 PipelineStats {
130 events_processed: self.event_count.load(Ordering::Relaxed),
131 batches_processed: self.batch_count.load(Ordering::Relaxed),
132 dfg_updates: generator_stats.total_events,
133 patterns_detected: generator_stats.bottleneck_count
134 + generator_stats.rework_count
135 + generator_stats.long_running_count,
136 conformance_checks: generator_stats.cases_completed,
137 avg_batch_time_us: 0.0, }
139 }
140
141 pub fn generator_stats(&self) -> &crate::fabric::GeneratorStats {
143 self.generator.stats()
144 }
145
146 pub fn throughput(&self) -> f32 {
148 self.generator.throughput()
149 }
150
151 pub fn reset_stats(&mut self) {
153 self.event_count.store(0, Ordering::Relaxed);
154 self.batch_count.store(0, Ordering::Relaxed);
155 }
156
157 pub fn set_sector(&mut self, sector: SectorTemplate) {
159 self.sector = sector.clone();
160 self.generator = ProcessEventGenerator::new(sector, self.config.generator.clone());
161 self.reset_stats();
162 }
163
164 pub fn set_generator_config(&mut self, config: GeneratorConfig) {
166 self.config.generator = config.clone();
167 self.generator = ProcessEventGenerator::new(self.sector.clone(), config);
168 }
169}
170
171#[derive(Debug, Default)]
173pub struct PipelineBuilder {
174 sector: Option<SectorTemplate>,
175 config: PipelineConfig,
176}
177
178impl PipelineBuilder {
179 pub fn new() -> Self {
181 Self::default()
182 }
183
184 pub fn with_sector(mut self, sector: SectorTemplate) -> Self {
186 self.sector = Some(sector);
187 self
188 }
189
190 pub fn with_events_per_second(mut self, eps: u32) -> Self {
192 self.config.generator.events_per_second = eps;
193 self
194 }
195
196 pub fn with_gpu_batch_size(mut self, size: usize) -> Self {
198 self.config.gpu_batch_size = size;
199 self
200 }
201
202 pub fn with_dfg(mut self, enabled: bool) -> Self {
204 self.config.enable_dfg = enabled;
205 self
206 }
207
208 pub fn with_patterns(mut self, enabled: bool) -> Self {
210 self.config.enable_patterns = enabled;
211 self
212 }
213
214 pub fn with_conformance(mut self, enabled: bool) -> Self {
216 self.config.enable_conformance = enabled;
217 self
218 }
219
220 pub fn build(self) -> ProcessingPipeline {
222 let sector = self.sector.unwrap_or_default();
223 ProcessingPipeline::new(sector, self.config)
224 }
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use crate::fabric::{HealthcareConfig, ManufacturingConfig};

    /// A freshly built pipeline is stopped and reports the sector it was given.
    #[test]
    fn test_pipeline_creation() {
        let healthcare = SectorTemplate::Healthcare(HealthcareConfig::default());
        let pipeline = PipelineBuilder::new()
            .with_sector(healthcare)
            .with_events_per_second(10_000)
            .build();

        assert!(!pipeline.is_running());
        assert_eq!(pipeline.sector().name(), "Healthcare");
    }

    /// `start` and `stop` toggle the run flag.
    #[test]
    fn test_pipeline_start_stop() {
        let sector = SectorTemplate::default();
        let config = PipelineConfig::default();
        let pipeline = ProcessingPipeline::new(sector, config);

        assert!(!pipeline.is_running());

        pipeline.start();
        assert!(pipeline.is_running());

        pipeline.stop();
        assert!(!pipeline.is_running());
    }

    /// A running pipeline yields events and counts them.
    #[test]
    fn test_generate_batch() {
        let manufacturing = SectorTemplate::Manufacturing(ManufacturingConfig::default());
        let mut pipeline = PipelineBuilder::new()
            .with_sector(manufacturing)
            .with_gpu_batch_size(100)
            .build();
        pipeline.start();

        let batch = pipeline.generate_batch();
        assert!(!batch.is_empty());

        let stats = pipeline.stats();
        assert!(stats.events_processed > 0);
    }
}