ringkernel_procint/fabric/
event_gen.rs

1//! Process event generator for synthetic data production.
2
3use super::{AnomalyConfig, ParallelActivityDef, SectorTemplate};
4use crate::models::{ActivityId, ActivityRegistry, EventType, GpuObjectEvent, HybridTimestamp};
5use rand::prelude::*;
6use rand_distr::Normal;
7use std::collections::HashMap;
8
/// Generator configuration.
///
/// Tunable parameters for a [`ProcessEventGenerator`]. Start from
/// [`Default`] and adjust with the `with_*` builder methods or with
/// struct-update syntax.
#[derive(Debug, Clone)]
pub struct GeneratorConfig {
    /// Target events per second.
    ///
    /// Within this file the value is only reported back by
    /// `ProcessEventGenerator::throughput`; it does not pace generation.
    pub events_per_second: u32,
    /// Batch size for generation.
    ///
    /// NOTE(review): `ProcessEventGenerator::generate_batch` takes its size as
    /// an argument and does not read this field in this file; confirm the
    /// caller passes it through.
    pub batch_size: usize,
    /// Number of concurrent cases.
    ///
    /// `generate_batch` starts new cases until this many are active.
    pub concurrent_cases: u32,
    /// Deviation rate from reference model (0.0 = perfect conformance).
    ///
    /// NOTE(review): stored but not read anywhere in this file; confirm
    /// whether another module consumes it.
    pub deviation_rate: f32,
    /// Anomaly injection configuration (per-case anomaly rates).
    pub anomalies: AnomalyConfig,
    /// Random seed for reproducibility.
    ///
    /// `Some(seed)` seeds the generator's RNG from that value; `None` seeds it
    /// via `StdRng::from_entropy`.
    pub seed: Option<u64>,
}
25
26impl Default for GeneratorConfig {
27    fn default() -> Self {
28        Self {
29            events_per_second: 10_000,
30            batch_size: 1000,
31            concurrent_cases: 100,
32            deviation_rate: 0.1,
33            anomalies: AnomalyConfig::default(),
34            seed: None,
35        }
36    }
37}
38
39impl GeneratorConfig {
40    /// Set sector template.
41    pub fn with_sector(self, _sector: SectorTemplate) -> Self {
42        self
43    }
44
45    /// Set events per second.
46    pub fn with_events_per_second(mut self, eps: u32) -> Self {
47        self.events_per_second = eps;
48        self
49    }
50
51    /// Set deviation rate.
52    pub fn with_deviation_rate(mut self, rate: f32) -> Self {
53        self.deviation_rate = rate;
54        self
55    }
56
57    /// Set anomaly config.
58    pub fn with_anomalies(mut self, anomalies: AnomalyConfig) -> Self {
59        self.anomalies = anomalies;
60        self
61    }
62}
63
/// State of an active case.
///
/// One instance exists per in-flight case in
/// `ProcessEventGenerator::active_cases`. A case is either in sequential mode
/// (`current_activity` is `Some`, `parallel_activities` is empty) or in
/// parallel mode (`current_activity` is `None`, `parallel_activities` holds the
/// forked branches and `join_to` the activity to resume at).
#[derive(Debug, Clone)]
struct CaseState {
    /// Case/object ID; emitted as `object_id` on every event of the case.
    case_id: u64,
    /// Activity the next sequential event will be emitted for
    /// (`None` while the case is in parallel mode).
    current_activity: Option<ActivityId>,
    /// Activities completed so far, in emission order.
    completed_activities: Vec<ActivityId>,
    /// End time of the most recently emitted activity; the next sequential
    /// activity starts at this time.
    last_event_time: HybridTimestamp,
    /// Is case complete? Set by `transition_case` when the case finishes.
    is_complete: bool,
    /// Injected anomaly type (if any), chosen once when the case is started.
    anomaly: Option<AnomalyType>,
    /// Parallel activities in flight (empty unless the case has forked).
    parallel_activities: Vec<ParallelActivityState>,
    /// Join activity to transition to after all parallel activities complete.
    join_to: Option<ActivityId>,
}
84
/// State of a parallel activity in flight.
///
/// Created by `start_parallel_activities`, one per forked branch, and consumed
/// by `create_parallel_event`.
#[derive(Debug, Clone)]
struct ParallelActivityState {
    /// Activity ID.
    activity_id: ActivityId,
    /// When this activity started. All branches of one fork share the same
    /// start time (the fork time).
    start_time: HybridTimestamp,
    /// Expected duration in ms, sampled when the fork is created.
    duration_ms: u32,
    /// Whether the event for this branch has been emitted.
    event_emitted: bool,
}
97
/// Type of anomaly injected.
///
/// A case is assigned at most one anomaly type when it is started, and keeps
/// it for its whole lifetime.
#[derive(Debug, Clone, Copy)]
enum AnomalyType {
    /// `create_event` scales the activity's base duration by a random factor
    /// in `3.0..10.0` instead of sampling around the base duration.
    Bottleneck,
    /// `transition_case` may (30% chance per transition) send the case back to
    /// the activity completed before the current one.
    Rework,
    /// Handled identically to `Bottleneck` in this file (durations scaled by a
    /// random factor in `3.0..10.0`).
    LongRunning,
    /// Counted in `GeneratorStats::skip_count`.
    /// NOTE(review): no code in this file alters the event flow for this
    /// variant; confirm whether skipping is implemented elsewhere or pending.
    Skip,
}
106
/// Process event generator.
///
/// Simulates a pool of concurrently running cases that walk the sector's
/// activity graph, and emits one `GpuObjectEvent` per completed activity.
pub struct ProcessEventGenerator {
    /// Sector template the registry, transitions and fork definitions were
    /// built from; also queried for start and end activity names.
    sector: SectorTemplate,
    /// Generator configuration.
    config: GeneratorConfig,
    /// Activity registry (built by `sector.build_registry()`).
    registry: ActivityRegistry,
    /// Transition map: activity_id -> [(target_id, probability, avg_time)].
    /// Probabilities are normalized per source activity when the map is built.
    transitions: HashMap<ActivityId, Vec<(ActivityId, f32, u32)>>,
    /// Parallel activity definitions: fork_activity_id -> ParallelActivityDef
    parallel_defs: HashMap<ActivityId, ParallelActivityDef>,
    /// Random number generator (seeded from `config.seed` when present).
    rng: StdRng,
    /// Next event ID (starts at 1, incremented per emitted event).
    next_event_id: u64,
    /// Next case ID (starts at 1, incremented per started case).
    next_case_id: u64,
    /// Active cases, keyed by case ID. Completed cases are removed.
    active_cases: HashMap<u64, CaseState>,
    /// Current simulation time: the latest activity end time seen so far.
    /// New cases start at this time.
    current_time: HybridTimestamp,
    /// Statistics.
    stats: GeneratorStats,
}
132
/// Generator statistics.
///
/// Running counters maintained by [`ProcessEventGenerator`]. The anomaly
/// counters are incremented once per case, at the moment a case is started
/// with that anomaly type assigned.
#[derive(Debug, Clone, Default)]
pub struct GeneratorStats {
    /// Total events generated.
    pub total_events: u64,
    /// Total cases started.
    pub cases_started: u64,
    /// Total cases completed.
    pub cases_completed: u64,
    /// Cases started with a bottleneck anomaly.
    pub bottleneck_count: u64,
    /// Cases started with a rework anomaly.
    pub rework_count: u64,
    /// Cases started with a long-running anomaly.
    pub long_running_count: u64,
    /// Cases started with a skip anomaly.
    pub skip_count: u64,
}
151
152impl ProcessEventGenerator {
153    /// Create a new generator.
154    pub fn new(sector: SectorTemplate, config: GeneratorConfig) -> Self {
155        let rng = match config.seed {
156            Some(seed) => StdRng::seed_from_u64(seed),
157            None => StdRng::from_entropy(),
158        };
159
160        let registry = sector.build_registry();
161        let transitions = Self::build_transition_map(&sector, &registry);
162        let parallel_defs = Self::build_parallel_map(&sector, &registry);
163
164        Self {
165            sector,
166            config,
167            registry,
168            transitions,
169            parallel_defs,
170            rng,
171            next_event_id: 1,
172            next_case_id: 1,
173            active_cases: HashMap::new(),
174            current_time: HybridTimestamp::now(),
175            stats: GeneratorStats::default(),
176        }
177    }
178
179    /// Build parallel activity map from sector template.
180    fn build_parallel_map(
181        sector: &SectorTemplate,
182        registry: &ActivityRegistry,
183    ) -> HashMap<ActivityId, ParallelActivityDef> {
184        let mut map = HashMap::new();
185        for def in sector.parallel_activities() {
186            if let Some(fork_activity) = registry.get_by_name(def.fork_from) {
187                map.insert(fork_activity.id, def);
188            }
189        }
190        map
191    }
192
193    /// Build transition map from sector template.
194    fn build_transition_map(
195        sector: &SectorTemplate,
196        registry: &ActivityRegistry,
197    ) -> HashMap<ActivityId, Vec<(ActivityId, f32, u32)>> {
198        let mut map: HashMap<ActivityId, Vec<(ActivityId, f32, u32)>> = HashMap::new();
199
200        for trans in sector.transitions() {
201            if let (Some(source), Some(target)) = (
202                registry.get_by_name(trans.source),
203                registry.get_by_name(trans.target),
204            ) {
205                map.entry(source.id).or_default().push((
206                    target.id,
207                    trans.probability,
208                    trans.avg_transition_ms,
209                ));
210            }
211        }
212
213        // Normalize probabilities
214        for transitions in map.values_mut() {
215            let total: f32 = transitions.iter().map(|(_, p, _)| p).sum();
216            if total > 0.0 {
217                for (_, p, _) in transitions.iter_mut() {
218                    *p /= total;
219                }
220            }
221        }
222
223        map
224    }
225
    /// Get statistics.
    ///
    /// Returns a borrow of the running counters; they keep accumulating across
    /// batches for the lifetime of the generator.
    pub fn stats(&self) -> &GeneratorStats {
        &self.stats
    }
230
    /// Get the configured target throughput in events per second.
    ///
    /// This returns `config.events_per_second` converted to `f32`; it is the
    /// configured target, not a rate measured from generated events.
    pub fn throughput(&self) -> f32 {
        self.config.events_per_second as f32
    }
235
236    /// Generate a batch of events.
237    pub fn generate_batch(&mut self, batch_size: usize) -> Vec<GpuObjectEvent> {
238        let mut events = Vec::with_capacity(batch_size);
239
240        // Ensure we have enough active cases
241        while self.active_cases.len() < self.config.concurrent_cases as usize {
242            self.start_new_case();
243        }
244
245        for _ in 0..batch_size {
246            if let Some(event) = self.generate_next_event() {
247                events.push(event);
248            }
249        }
250
251        events
252    }
253
254    /// Start a new case.
255    fn start_new_case(&mut self) {
256        let case_id = self.next_case_id;
257        self.next_case_id += 1;
258
259        // Get start activity
260        let start_names = self.sector.start_activities();
261        let start_name = start_names[self.rng.gen_range(0..start_names.len())];
262        let start_activity = self
263            .registry
264            .get_by_name(start_name)
265            .map(|a| a.id)
266            .unwrap_or(1);
267
268        // Determine if this case will have an anomaly
269        let anomaly = self.determine_anomaly();
270
271        let state = CaseState {
272            case_id,
273            current_activity: Some(start_activity),
274            completed_activities: Vec::new(),
275            last_event_time: self.current_time,
276            is_complete: false,
277            anomaly,
278            parallel_activities: Vec::new(),
279            join_to: None,
280        };
281
282        self.active_cases.insert(case_id, state);
283        self.stats.cases_started += 1;
284
285        if let Some(anomaly) = anomaly {
286            match anomaly {
287                AnomalyType::Bottleneck => self.stats.bottleneck_count += 1,
288                AnomalyType::Rework => self.stats.rework_count += 1,
289                AnomalyType::LongRunning => self.stats.long_running_count += 1,
290                AnomalyType::Skip => self.stats.skip_count += 1,
291            }
292        }
293    }
294
295    /// Determine if and what anomaly to inject.
296    fn determine_anomaly(&mut self) -> Option<AnomalyType> {
297        let r: f32 = self.rng.gen();
298
299        if r < self.config.anomalies.bottleneck_rate {
300            Some(AnomalyType::Bottleneck)
301        } else if r < self.config.anomalies.bottleneck_rate + self.config.anomalies.rework_rate {
302            Some(AnomalyType::Rework)
303        } else if r < self.config.anomalies.bottleneck_rate
304            + self.config.anomalies.rework_rate
305            + self.config.anomalies.long_running_rate
306        {
307            Some(AnomalyType::LongRunning)
308        } else if r < self.config.anomalies.bottleneck_rate
309            + self.config.anomalies.rework_rate
310            + self.config.anomalies.long_running_rate
311            + self.config.anomalies.skip_rate
312        {
313            Some(AnomalyType::Skip)
314        } else {
315            None
316        }
317    }
318
319    /// Generate next event from active cases.
320    fn generate_next_event(&mut self) -> Option<GpuObjectEvent> {
321        // Pick a random active case
322        let case_ids: Vec<u64> = self.active_cases.keys().copied().collect();
323        if case_ids.is_empty() {
324            return None;
325        }
326
327        let case_id = case_ids[self.rng.gen_range(0..case_ids.len())];
328
329        // Remove case temporarily to avoid borrow conflicts
330        let mut case = self.active_cases.remove(&case_id)?;
331
332        // Check if case is in parallel mode
333        if !case.parallel_activities.is_empty() {
334            // Generate event for a parallel activity
335            let event = self.create_parallel_event(&mut case);
336
337            // Check if all parallel activities are done
338            if case.parallel_activities.iter().all(|p| p.event_emitted) {
339                // Join: all parallel activities complete, transition to join activity
340                case.parallel_activities.clear();
341                if let Some(join_activity) = case.join_to.take() {
342                    case.current_activity = Some(join_activity);
343                }
344            }
345
346            self.active_cases.insert(case_id, case);
347            return event;
348        }
349
350        // Normal sequential mode
351        let event = self.create_event(&mut case);
352
353        // Transition to next activity (may enter parallel mode)
354        let is_complete = self.transition_case(&mut case);
355
356        if is_complete {
357            self.stats.cases_completed += 1;
358        } else {
359            // Put case back
360            self.active_cases.insert(case_id, case);
361        }
362
363        Some(event)
364    }
365
366    /// Create event for a parallel activity.
367    fn create_parallel_event(&mut self, case: &mut CaseState) -> Option<GpuObjectEvent> {
368        // Find next un-emitted parallel activity
369        let idx = case
370            .parallel_activities
371            .iter()
372            .position(|p| !p.event_emitted)?;
373
374        let parallel = &mut case.parallel_activities[idx];
375        parallel.event_emitted = true;
376
377        let event_id = self.next_event_id;
378        self.next_event_id += 1;
379
380        // The event timestamp is the parallel activity's start time (they all start at the same time!)
381        let timestamp = parallel.start_time;
382        let duration = parallel.duration_ms;
383        let activity_id = parallel.activity_id;
384
385        case.completed_activities.push(activity_id);
386        self.stats.total_events += 1;
387
388        // Update last event time to max of parallel end times (for when join happens)
389        let event_end = HybridTimestamp::new(
390            timestamp.physical_ms + duration as u64,
391            timestamp.logical + 1,
392        );
393        if event_end.physical_ms > case.last_event_time.physical_ms {
394            case.last_event_time = event_end;
395        }
396
397        // Also update global time
398        if event_end.physical_ms > self.current_time.physical_ms {
399            self.current_time = event_end;
400        }
401
402        Some(GpuObjectEvent {
403            event_id,
404            object_id: case.case_id,
405            activity_id,
406            event_type: EventType::Complete as u8,
407            _padding1: [0; 3],
408            timestamp, // Same start time for all parallel activities!
409            resource_id: self.rng.gen_range(1..100),
410            cost: self.rng.gen_range(10.0..1000.0),
411            duration_ms: duration,
412            flags: 0,
413            attributes: [0; 4],
414            object_type_id: 0,
415            related_object_id: 0,
416            _reserved: [0; 36],
417        })
418    }
419
420    /// Create an event for the current activity of a case.
421    fn create_event(&mut self, case: &mut CaseState) -> GpuObjectEvent {
422        let event_id = self.next_event_id;
423        self.next_event_id += 1;
424
425        let current_activity = case.current_activity.unwrap_or(1);
426
427        // Get activity details
428        let activity = self.registry.get(current_activity);
429        let base_duration = activity.map(|a| a.expected_duration_ms).unwrap_or(60_000);
430
431        // Apply anomaly modifications
432        let duration = match case.anomaly {
433            Some(AnomalyType::Bottleneck) | Some(AnomalyType::LongRunning) => {
434                (base_duration as f32 * self.rng.gen_range(3.0..10.0)) as u32
435            }
436            _ => {
437                // Normal distribution around base duration
438                let std_dev = base_duration as f32 * 0.3;
439                let dist = Normal::new(base_duration as f64, std_dev as f64).unwrap();
440                dist.sample(&mut self.rng).max(1000.0) as u32
441            }
442        };
443
444        // Activity starts when the previous activity ended
445        // (case.last_event_time tracks the end of the last activity)
446        let activity_start_time = case.last_event_time;
447
448        // Update case's last_event_time to the END of this activity
449        case.last_event_time = HybridTimestamp::new(
450            activity_start_time.physical_ms + duration as u64,
451            activity_start_time.logical + 1,
452        );
453
454        // Also advance global time (for inter-case ordering)
455        if case.last_event_time.physical_ms > self.current_time.physical_ms {
456            self.current_time = case.last_event_time;
457        }
458
459        case.completed_activities.push(current_activity);
460        self.stats.total_events += 1;
461
462        GpuObjectEvent {
463            event_id,
464            object_id: case.case_id,
465            activity_id: current_activity,
466            event_type: EventType::Complete as u8,
467            _padding1: [0; 3],
468            timestamp: activity_start_time, // Activity START time
469            resource_id: self.rng.gen_range(1..100),
470            cost: self.rng.gen_range(10.0..1000.0),
471            duration_ms: duration,
472            flags: 0,
473            attributes: [0; 4],
474            object_type_id: 0,
475            related_object_id: 0,
476            _reserved: [0; 36],
477        }
478    }
479
480    /// Transition case to next activity. Returns true if case is complete.
481    fn transition_case(&mut self, case: &mut CaseState) -> bool {
482        let current_activity = match case.current_activity {
483            Some(act) => act,
484            None => {
485                case.is_complete = true;
486                return true;
487            }
488        };
489
490        // Check if current activity is an end activity
491        let end_names = self.sector.end_activities();
492        if let Some(activity) = self.registry.get(current_activity) {
493            if end_names.contains(&activity.name.as_str()) {
494                case.is_complete = true;
495                return true;
496            }
497        }
498
499        // Check if this activity triggers parallel execution
500        if let Some(parallel_def) = self.parallel_defs.get(&current_activity) {
501            // Roll for parallel vs sequential
502            if self.rng.gen::<f32>() < parallel_def.probability {
503                // Fork into parallel activities
504                return self.start_parallel_activities(case, parallel_def.clone());
505            }
506        }
507
508        // Get possible transitions
509        let transitions = self.transitions.get(&current_activity);
510
511        if let Some(trans) = transitions {
512            if trans.is_empty() {
513                case.is_complete = true;
514                return true;
515            }
516
517            // Handle rework anomaly - repeat previous activity
518            if matches!(case.anomaly, Some(AnomalyType::Rework))
519                && self.rng.gen::<f32>() < 0.3
520                && case.completed_activities.len() >= 2
521            {
522                let prev = case.completed_activities[case.completed_activities.len() - 2];
523                case.current_activity = Some(prev);
524                return false;
525            }
526
527            // Select next activity based on probabilities
528            let r: f32 = self.rng.gen();
529            let mut cumulative = 0.0;
530            for (target, prob, _time) in trans {
531                cumulative += prob;
532                if r <= cumulative {
533                    case.current_activity = Some(*target);
534                    return false;
535                }
536            }
537
538            // Fallback to first transition
539            case.current_activity = Some(trans[0].0);
540            false
541        } else {
542            // No transitions, case complete
543            case.is_complete = true;
544            true
545        }
546    }
547
548    /// Start parallel activities for a case.
549    fn start_parallel_activities(
550        &mut self,
551        case: &mut CaseState,
552        parallel_def: ParallelActivityDef,
553    ) -> bool {
554        // Get the join activity ID
555        let join_activity = match self.registry.get_by_name(parallel_def.join_to) {
556            Some(a) => a.id,
557            None => {
558                // Can't find join activity, fall back to sequential
559                return false;
560            }
561        };
562
563        // Create parallel activity states - all start at the SAME time (concurrency!)
564        let fork_time = case.last_event_time;
565        let mut parallel_states = Vec::new();
566
567        for activity_name in &parallel_def.parallel_activities {
568            if let Some(activity) = self.registry.get_by_name(activity_name) {
569                let base_duration = activity.expected_duration_ms;
570                // Add some variance to duration
571                let std_dev = base_duration as f32 * 0.3;
572                let dist = Normal::new(base_duration as f64, std_dev as f64).unwrap();
573                let duration = dist.sample(&mut self.rng).max(1000.0) as u32;
574
575                parallel_states.push(ParallelActivityState {
576                    activity_id: activity.id,
577                    start_time: fork_time,
578                    duration_ms: duration,
579                    event_emitted: false,
580                });
581            }
582        }
583
584        if parallel_states.is_empty() {
585            return false;
586        }
587
588        // Enter parallel mode
589        case.current_activity = None; // No single current activity
590        case.parallel_activities = parallel_states;
591        case.join_to = Some(join_activity);
592
593        false // Not complete
594    }
595
    /// Get the sector template this generator was constructed with.
    pub fn sector(&self) -> &SectorTemplate {
        &self.sector
    }
600
    /// Get the activity registry built from the sector template in `new`.
    pub fn registry(&self) -> &ActivityRegistry {
        &self.registry
    }
605
    /// Number of active cases.
    ///
    /// Counts the cases currently held in the active set; completed cases are
    /// removed from it and no longer counted.
    pub fn active_case_count(&self) -> usize {
        self.active_cases.len()
    }
610}
611
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed generator has not started any case yet.
    #[test]
    fn test_generator_creation() {
        let generator =
            ProcessEventGenerator::new(SectorTemplate::default(), GeneratorConfig::default());

        assert_eq!(generator.active_case_count(), 0);
    }

    /// Generating a batch yields events and updates the statistics.
    #[test]
    fn test_batch_generation() {
        let config = GeneratorConfig {
            concurrent_cases: 10,
            ..Default::default()
        };
        let mut generator = ProcessEventGenerator::new(SectorTemplate::default(), config);

        let events = generator.generate_batch(100);
        assert!(!events.is_empty());
        assert!(generator.stats().total_events > 0);
    }

    /// Two generators built from the same seed produce batches of comparable
    /// size.
    #[test]
    fn test_deterministic_with_seed() {
        let sector = SectorTemplate::default();
        let config = GeneratorConfig {
            seed: Some(42),
            concurrent_cases: 5,
            ..Default::default()
        };

        let mut gen1 = ProcessEventGenerator::new(sector.clone(), config.clone());
        let events1 = gen1.generate_batch(50);

        let mut gen2 = ProcessEventGenerator::new(sector, config);
        let events2 = gen2.generate_batch(50);

        assert!(!events1.is_empty());
        assert!(!events2.is_empty());

        // Only the batch lengths are compared, and only to within 30% of
        // their average; event contents are not checked for equality.
        // Note: exact determinism may vary due to timestamp-based logic.
        let (n1, n2) = (events1.len(), events2.len());
        let len_diff = (n1 as i32 - n2 as i32).abs();
        let avg_len = (n1 + n2) / 2;
        assert!(
            len_diff <= (avg_len as f32 * 0.3) as i32,
            "Batch sizes too different: {} vs {}",
            n1,
            n2
        );
    }

    /// Forked activities of one case share the same start timestamp.
    #[test]
    fn test_parallel_activity_generation() {
        use crate::fabric::HealthcareConfig;

        // Healthcare has Lab Tests and Imaging that can run in parallel
        let sector = SectorTemplate::Healthcare(HealthcareConfig::default());
        let config = GeneratorConfig {
            seed: Some(12345), // Use seed for reproducibility
            concurrent_cases: 50,
            ..Default::default()
        };
        let mut generator = ProcessEventGenerator::new(sector, config);

        // Generate enough events to likely trigger parallel activities
        let events = generator.generate_batch(2000);
        assert!(!events.is_empty());

        // Group events by case
        let mut cases: std::collections::HashMap<u64, Vec<&GpuObjectEvent>> =
            std::collections::HashMap::new();
        for event in &events {
            cases.entry(event.object_id).or_default().push(event);
        }

        // Look for a case in which a Lab Tests event and an Imaging event
        // start at the same physical time, i.e. ran in parallel.
        // NOTE(review): activity IDs 4 (Lab Tests) and 5 (Imaging) are taken
        // from the original test; confirm against the Healthcare registry.
        let found_parallel = cases.values().any(|case_events| {
            case_events
                .iter()
                .filter(|e| e.activity_id == 4)
                .any(|lt| {
                    case_events
                        .iter()
                        .filter(|e| e.activity_id == 5)
                        .any(|img| lt.timestamp.physical_ms == img.timestamp.physical_ms)
                })
        });

        // With 40% probability of parallel execution in Healthcare config,
        // we should find at least one case with parallel activities
        assert!(
            found_parallel,
            "Expected to find parallel Lab Tests and Imaging activities"
        );
    }
}
718}