use super::{AnomalyConfig, ParallelActivityDef, SectorTemplate};
use crate::models::{ActivityId, ActivityRegistry, EventType, GpuObjectEvent, HybridTimestamp};
use rand::prelude::*;
use rand_distr::Normal;
use std::collections::HashMap;

/// Configuration for the process event generator.
#[derive(Debug, Clone)]
pub struct GeneratorConfig {
    /// Target event generation rate (events per second).
    pub events_per_second: u32,
    /// Number of events produced per batch.
    pub batch_size: usize,
    /// Number of cases kept active concurrently.
    pub concurrent_cases: u32,
    /// Rate at which cases deviate from the nominal process flow.
    pub deviation_rate: f32,
    /// Anomaly injection rates.
    pub anomalies: AnomalyConfig,
    /// Optional RNG seed for reproducible generation.
    pub seed: Option<u64>,
}

impl Default for GeneratorConfig {
    fn default() -> Self {
        Self {
            events_per_second: 10_000,
            batch_size: 1000,
            concurrent_cases: 100,
            deviation_rate: 0.1,
            anomalies: AnomalyConfig::default(),
            seed: None,
        }
    }
}
38
39impl GeneratorConfig {
40 pub fn with_sector(self, _sector: SectorTemplate) -> Self {
42 self
43 }
44
45 pub fn with_events_per_second(mut self, eps: u32) -> Self {
47 self.events_per_second = eps;
48 self
49 }
50
51 pub fn with_deviation_rate(mut self, rate: f32) -> Self {
53 self.deviation_rate = rate;
54 self
55 }
56
57 pub fn with_anomalies(mut self, anomalies: AnomalyConfig) -> Self {
59 self.anomalies = anomalies;
60 self
61 }
62}
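
// Usage sketch: the builder methods above chain on a `GeneratorConfig` value.
// The rate and deviation values below are illustrative, not defaults:
//
//     let config = GeneratorConfig::default()
//         .with_events_per_second(50_000)
//         .with_deviation_rate(0.2)
//         .with_anomalies(AnomalyConfig::default());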

/// Mutable state tracked for a single in-flight case.
#[derive(Debug, Clone)]
struct CaseState {
    case_id: u64,
    current_activity: Option<ActivityId>,
    completed_activities: Vec<ActivityId>,
    last_event_time: HybridTimestamp,
    is_complete: bool,
    anomaly: Option<AnomalyType>,
    parallel_activities: Vec<ParallelActivityState>,
    join_to: Option<ActivityId>,
}

/// State of one branch of a forked (parallel) activity.
#[derive(Debug, Clone)]
struct ParallelActivityState {
    activity_id: ActivityId,
    start_time: HybridTimestamp,
    duration_ms: u32,
    event_emitted: bool,
}

/// Kinds of anomalies that can be injected into a case.
#[derive(Debug, Clone, Copy)]
enum AnomalyType {
    Bottleneck,
    Rework,
    LongRunning,
    Skip,
}

/// Generates synthetic process events for a sector template.
pub struct ProcessEventGenerator {
    sector: SectorTemplate,
    config: GeneratorConfig,
    registry: ActivityRegistry,
    /// Outgoing transitions per activity: (target, probability, avg transition ms).
    transitions: HashMap<ActivityId, Vec<(ActivityId, f32, u32)>>,
    /// Parallel (fork/join) definitions keyed by their fork activity.
    parallel_defs: HashMap<ActivityId, ParallelActivityDef>,
    rng: StdRng,
    next_event_id: u64,
    next_case_id: u64,
    active_cases: HashMap<u64, CaseState>,
    current_time: HybridTimestamp,
    stats: GeneratorStats,
}

/// Counters describing what the generator has produced so far.
#[derive(Debug, Clone, Default)]
pub struct GeneratorStats {
    pub total_events: u64,
    pub cases_started: u64,
    pub cases_completed: u64,
    pub bottleneck_count: u64,
    pub rework_count: u64,
    pub long_running_count: u64,
    pub skip_count: u64,
}

impl ProcessEventGenerator {
    /// Creates a generator for the given sector, seeding the RNG if a seed is configured.
    pub fn new(sector: SectorTemplate, config: GeneratorConfig) -> Self {
        let rng = match config.seed {
            Some(seed) => StdRng::seed_from_u64(seed),
            None => StdRng::from_entropy(),
        };

        let registry = sector.build_registry();
        let transitions = Self::build_transition_map(&sector, &registry);
        let parallel_defs = Self::build_parallel_map(&sector, &registry);

        Self {
            sector,
            config,
            registry,
            transitions,
            parallel_defs,
            rng,
            next_event_id: 1,
            next_case_id: 1,
            active_cases: HashMap::new(),
            current_time: HybridTimestamp::now(),
            stats: GeneratorStats::default(),
        }
    }

    /// Indexes the sector's parallel (fork/join) definitions by their fork activity.
    fn build_parallel_map(
        sector: &SectorTemplate,
        registry: &ActivityRegistry,
    ) -> HashMap<ActivityId, ParallelActivityDef> {
        let mut map = HashMap::new();
        for def in sector.parallel_activities() {
            if let Some(fork_activity) = registry.get_by_name(def.fork_from) {
                map.insert(fork_activity.id, def);
            }
        }
        map
    }

    /// Builds the per-activity outgoing transition table and normalizes each
    /// activity's outgoing probabilities so they sum to 1.0.
    fn build_transition_map(
        sector: &SectorTemplate,
        registry: &ActivityRegistry,
    ) -> HashMap<ActivityId, Vec<(ActivityId, f32, u32)>> {
        let mut map: HashMap<ActivityId, Vec<(ActivityId, f32, u32)>> = HashMap::new();

        for trans in sector.transitions() {
            if let (Some(source), Some(target)) = (
                registry.get_by_name(trans.source),
                registry.get_by_name(trans.target),
            ) {
                map.entry(source.id).or_default().push((
                    target.id,
                    trans.probability,
                    trans.avg_transition_ms,
                ));
            }
        }

        // Normalize outgoing probabilities per source activity.
        for transitions in map.values_mut() {
            let total: f32 = transitions.iter().map(|(_, p, _)| p).sum();
            if total > 0.0 {
                for (_, p, _) in transitions.iter_mut() {
                    *p /= total;
                }
            }
        }

        map
    }
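
    // Example (illustrative weights, not from any sector template): if an activity
    // has outgoing transitions with raw probabilities [0.5, 1.0], the loop above
    // rescales them to [1/3, 2/3], so the cumulative roulette-wheel draw in
    // `transition_case` always lands on one of the listed targets.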

    /// Returns the generator's running statistics.
    pub fn stats(&self) -> &GeneratorStats {
        &self.stats
    }

    /// Returns the configured target throughput in events per second.
    pub fn throughput(&self) -> f32 {
        self.config.events_per_second as f32
    }

    /// Generates up to `batch_size` events, topping up the pool of active cases first.
    pub fn generate_batch(&mut self, batch_size: usize) -> Vec<GpuObjectEvent> {
        let mut events = Vec::with_capacity(batch_size);

        while self.active_cases.len() < self.config.concurrent_cases as usize {
            self.start_new_case();
        }

        for _ in 0..batch_size {
            if let Some(event) = self.generate_next_event() {
                events.push(event);
            }
        }

        events
    }
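
    // Note: the returned batch can be shorter than `batch_size`, since new cases are
    // only started at the top of the call; if every active case completes mid-batch,
    // the remaining iterations yield no events.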

    /// Starts a new case at one of the sector's start activities, optionally
    /// tagging it with an injected anomaly.
    fn start_new_case(&mut self) {
        let case_id = self.next_case_id;
        self.next_case_id += 1;

        let start_names = self.sector.start_activities();
        let start_name = start_names[self.rng.gen_range(0..start_names.len())];
        let start_activity = self
            .registry
            .get_by_name(start_name)
            .map(|a| a.id)
            .unwrap_or(1);

        let anomaly = self.determine_anomaly();

        let state = CaseState {
            case_id,
            current_activity: Some(start_activity),
            completed_activities: Vec::new(),
            last_event_time: self.current_time,
            is_complete: false,
            anomaly,
            parallel_activities: Vec::new(),
            join_to: None,
        };

        self.active_cases.insert(case_id, state);
        self.stats.cases_started += 1;

        if let Some(anomaly) = anomaly {
            match anomaly {
                AnomalyType::Bottleneck => self.stats.bottleneck_count += 1,
                AnomalyType::Rework => self.stats.rework_count += 1,
                AnomalyType::LongRunning => self.stats.long_running_count += 1,
                AnomalyType::Skip => self.stats.skip_count += 1,
            }
        }
    }

    /// Draws at most one anomaly for a new case, using the configured rates as
    /// cumulative thresholds over a single uniform sample.
    fn determine_anomaly(&mut self) -> Option<AnomalyType> {
        let r: f32 = self.rng.gen();

        if r < self.config.anomalies.bottleneck_rate {
            Some(AnomalyType::Bottleneck)
        } else if r < self.config.anomalies.bottleneck_rate + self.config.anomalies.rework_rate {
            Some(AnomalyType::Rework)
        } else if r < self.config.anomalies.bottleneck_rate
            + self.config.anomalies.rework_rate
            + self.config.anomalies.long_running_rate
        {
            Some(AnomalyType::LongRunning)
        } else if r < self.config.anomalies.bottleneck_rate
            + self.config.anomalies.rework_rate
            + self.config.anomalies.long_running_rate
            + self.config.anomalies.skip_rate
        {
            Some(AnomalyType::Skip)
        } else {
            None
        }
    }
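
    // Example partition (hypothetical rates, not the AnomalyConfig defaults): with
    // bottleneck 0.05, rework 0.03, long-running 0.02, and skip 0.01, a uniform draw
    // r maps to Bottleneck on [0, 0.05), Rework on [0.05, 0.08), LongRunning on
    // [0.08, 0.10), Skip on [0.10, 0.11), and no anomaly otherwise.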

    /// Picks a random active case and advances it by one event.
    fn generate_next_event(&mut self) -> Option<GpuObjectEvent> {
        let case_ids: Vec<u64> = self.active_cases.keys().copied().collect();
        if case_ids.is_empty() {
            return None;
        }

        let case_id = case_ids[self.rng.gen_range(0..case_ids.len())];

        // Temporarily remove the case so it can be mutated without holding a map borrow.
        let mut case = self.active_cases.remove(&case_id)?;

        // Drain any outstanding parallel branches before continuing the main flow.
        if !case.parallel_activities.is_empty() {
            let event = self.create_parallel_event(&mut case);

            if case.parallel_activities.iter().all(|p| p.event_emitted) {
                case.parallel_activities.clear();
                if let Some(join_activity) = case.join_to.take() {
                    case.current_activity = Some(join_activity);
                }
            }

            self.active_cases.insert(case_id, case);
            return event;
        }

        let event = self.create_event(&mut case);

        let is_complete = self.transition_case(&mut case);

        if is_complete {
            self.stats.cases_completed += 1;
        } else {
            self.active_cases.insert(case_id, case);
        }

        Some(event)
    }

    /// Emits the event for one not-yet-emitted parallel branch of the case.
    fn create_parallel_event(&mut self, case: &mut CaseState) -> Option<GpuObjectEvent> {
        let idx = case
            .parallel_activities
            .iter()
            .position(|p| !p.event_emitted)?;

        let parallel = &mut case.parallel_activities[idx];
        parallel.event_emitted = true;

        let event_id = self.next_event_id;
        self.next_event_id += 1;

        let timestamp = parallel.start_time;
        let duration = parallel.duration_ms;
        let activity_id = parallel.activity_id;

        case.completed_activities.push(activity_id);
        self.stats.total_events += 1;

        // Advance the case and generator clocks if this branch finishes later.
        let event_end = HybridTimestamp::new(
            timestamp.physical_ms + duration as u64,
            timestamp.logical + 1,
        );
        if event_end.physical_ms > case.last_event_time.physical_ms {
            case.last_event_time = event_end;
        }

        if event_end.physical_ms > self.current_time.physical_ms {
            self.current_time = event_end;
        }

        Some(GpuObjectEvent {
            event_id,
            object_id: case.case_id,
            activity_id,
            event_type: EventType::Complete as u8,
            _padding1: [0; 3],
            timestamp,
            resource_id: self.rng.gen_range(1..100),
            cost: self.rng.gen_range(10.0..1000.0),
            duration_ms: duration,
            flags: 0,
            attributes: [0; 4],
            object_type_id: 0,
            related_object_id: 0,
            _reserved: [0; 36],
        })
    }

    /// Emits the event for the case's current activity and advances its clock.
    fn create_event(&mut self, case: &mut CaseState) -> GpuObjectEvent {
        let event_id = self.next_event_id;
        self.next_event_id += 1;

        let current_activity = case.current_activity.unwrap_or(1);

        let activity = self.registry.get(current_activity);
        let base_duration = activity.map(|a| a.expected_duration_ms).unwrap_or(60_000);

        // Bottleneck/long-running cases get inflated durations; everything else
        // samples from a normal distribution around the expected duration.
        let duration = match case.anomaly {
            Some(AnomalyType::Bottleneck) | Some(AnomalyType::LongRunning) => {
                (base_duration as f32 * self.rng.gen_range(3.0..10.0)) as u32
            }
            _ => {
                let std_dev = base_duration as f32 * 0.3;
                let dist = Normal::new(base_duration as f64, std_dev as f64).unwrap();
                dist.sample(&mut self.rng).max(1000.0) as u32
            }
        };

        let activity_start_time = case.last_event_time;

        case.last_event_time = HybridTimestamp::new(
            activity_start_time.physical_ms + duration as u64,
            activity_start_time.logical + 1,
        );

        if case.last_event_time.physical_ms > self.current_time.physical_ms {
            self.current_time = case.last_event_time;
        }

        case.completed_activities.push(current_activity);
        self.stats.total_events += 1;

        GpuObjectEvent {
            event_id,
            object_id: case.case_id,
            activity_id: current_activity,
            event_type: EventType::Complete as u8,
            _padding1: [0; 3],
            timestamp: activity_start_time,
            resource_id: self.rng.gen_range(1..100),
            cost: self.rng.gen_range(10.0..1000.0),
            duration_ms: duration,
            flags: 0,
            attributes: [0; 4],
            object_type_id: 0,
            related_object_id: 0,
            _reserved: [0; 36],
        }
    }
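
    // Worked example: with an expected duration of 60_000 ms, the normal path samples
    // from N(60_000, 18_000) and clamps to at least 1_000 ms, while a Bottleneck or
    // LongRunning case instead multiplies the expected duration by a uniform 3x-10x factor.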

    /// Moves the case to its next activity. Returns `true` if the case is complete.
    fn transition_case(&mut self, case: &mut CaseState) -> bool {
        let current_activity = match case.current_activity {
            Some(act) => act,
            None => {
                case.is_complete = true;
                return true;
            }
        };

        // Cases finish once they reach one of the sector's end activities.
        let end_names = self.sector.end_activities();
        if let Some(activity) = self.registry.get(current_activity) {
            if end_names.contains(&activity.name.as_str()) {
                case.is_complete = true;
                return true;
            }
        }

        // Optionally fork into parallel activities at a configured fork point.
        if let Some(parallel_def) = self.parallel_defs.get(&current_activity) {
            if self.rng.gen::<f32>() < parallel_def.probability {
                return self.start_parallel_activities(case, parallel_def.clone());
            }
        }

        let transitions = self.transitions.get(&current_activity);

        if let Some(trans) = transitions {
            if trans.is_empty() {
                case.is_complete = true;
                return true;
            }

            // Rework anomaly: occasionally jump back to the previous activity.
            if matches!(case.anomaly, Some(AnomalyType::Rework))
                && self.rng.gen::<f32>() < 0.3
                && case.completed_activities.len() >= 2
            {
                let prev = case.completed_activities[case.completed_activities.len() - 2];
                case.current_activity = Some(prev);
                return false;
            }

            // Roulette-wheel selection over the normalized transition probabilities.
            let r: f32 = self.rng.gen();
            let mut cumulative = 0.0;
            for (target, prob, _time) in trans {
                cumulative += prob;
                if r <= cumulative {
                    case.current_activity = Some(*target);
                    return false;
                }
            }

            // Floating-point rounding can leave r above the cumulative sum; fall back
            // to the first target.
            case.current_activity = Some(trans[0].0);
            false
        } else {
            case.is_complete = true;
            true
        }
    }

    /// Forks the case into the definition's parallel activities, all starting at the
    /// same fork time. Returns `false` because the case is never complete after a fork.
    fn start_parallel_activities(
        &mut self,
        case: &mut CaseState,
        parallel_def: ParallelActivityDef,
    ) -> bool {
        let join_activity = match self.registry.get_by_name(parallel_def.join_to) {
            Some(a) => a.id,
            None => {
                return false;
            }
        };

        let fork_time = case.last_event_time;
        let mut parallel_states = Vec::new();

        for activity_name in &parallel_def.parallel_activities {
            if let Some(activity) = self.registry.get_by_name(activity_name) {
                let base_duration = activity.expected_duration_ms;
                let std_dev = base_duration as f32 * 0.3;
                let dist = Normal::new(base_duration as f64, std_dev as f64).unwrap();
                let duration = dist.sample(&mut self.rng).max(1000.0) as u32;

                parallel_states.push(ParallelActivityState {
                    activity_id: activity.id,
                    start_time: fork_time,
                    duration_ms: duration,
                    event_emitted: false,
                });
            }
        }

        if parallel_states.is_empty() {
            return false;
        }

        case.current_activity = None;
        case.parallel_activities = parallel_states;
        case.join_to = Some(join_activity);

        false
    }
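
    // All forked branches share `fork_time` as their start timestamp; the parallel
    // activity test at the bottom of this file relies on that shared timestamp to
    // detect concurrent Lab Tests / Imaging events.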

    /// Returns the sector template this generator was built from.
    pub fn sector(&self) -> &SectorTemplate {
        &self.sector
    }

    /// Returns the activity registry built from the sector template.
    pub fn registry(&self) -> &ActivityRegistry {
        &self.registry
    }

    /// Returns the number of cases currently in flight.
    pub fn active_case_count(&self) -> usize {
        self.active_cases.len()
    }
}
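
// End-to-end usage sketch (illustrative values; `SectorTemplate::default()` mirrors
// the tests below):
//
//     let config = GeneratorConfig {
//         seed: Some(7),
//         concurrent_cases: 25,
//         ..Default::default()
//     };
//     let mut generator = ProcessEventGenerator::new(SectorTemplate::default(), config);
//     let events = generator.generate_batch(500);
//     println!("generated {} events for {} active cases",
//              events.len(), generator.active_case_count());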

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_generator_creation() {
        let sector = SectorTemplate::default();
        let config = GeneratorConfig::default();
        let generator = ProcessEventGenerator::new(sector, config);

        assert_eq!(generator.active_case_count(), 0);
    }

    #[test]
    fn test_batch_generation() {
        let sector = SectorTemplate::default();
        let config = GeneratorConfig {
            concurrent_cases: 10,
            ..Default::default()
        };
        let mut generator = ProcessEventGenerator::new(sector, config);

        let events = generator.generate_batch(100);
        assert!(!events.is_empty());
        assert!(generator.stats().total_events > 0);
    }
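
    // Added sanity-check sketch: asserts invariants that follow directly from the
    // generator code above (monotonic event IDs, bounded active cases, and case IDs
    // drawn only from the cases started for this batch).
    #[test]
    fn test_batch_invariants() {
        let sector = SectorTemplate::default();
        let config = GeneratorConfig {
            seed: Some(7),
            concurrent_cases: 10,
            ..Default::default()
        };
        let mut generator = ProcessEventGenerator::new(sector, config);

        let events = generator.generate_batch(100);

        // Event IDs come from a monotonically increasing counter.
        assert!(events.windows(2).all(|w| w[0].event_id < w[1].event_id));
        // No more cases are active than the configured concurrency.
        assert!(generator.active_case_count() <= 10);
        // Exactly `concurrent_cases` cases were started before the first batch.
        assert_eq!(generator.stats().cases_started, 10);
        // Every event belongs to one of those cases (case IDs start at 1).
        assert!(events.iter().all(|e| (1..=10u64).contains(&e.object_id)));
    }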

    #[test]
    fn test_deterministic_with_seed() {
        let sector = SectorTemplate::default();
        let config = GeneratorConfig {
            seed: Some(42),
            concurrent_cases: 5,
            ..Default::default()
        };

        let mut gen1 = ProcessEventGenerator::new(sector.clone(), config.clone());
        let events1 = gen1.generate_batch(50);

        let mut gen2 = ProcessEventGenerator::new(sector, config);
        let events2 = gen2.generate_batch(50);

        // Case selection iterates a `HashMap`, so even with an identical seed the two
        // generators can interleave cases differently; only the overall batch sizes
        // are compared here.
        assert!(!events1.is_empty());
        assert!(!events2.is_empty());
        let len_diff = (events1.len() as i32 - events2.len() as i32).abs();
        let avg_len = (events1.len() + events2.len()) / 2;
        assert!(
            len_diff <= (avg_len as f32 * 0.3) as i32,
            "Batch sizes too different: {} vs {}",
            events1.len(),
            events2.len()
        );
    }

    #[test]
    fn test_parallel_activity_generation() {
        use crate::fabric::HealthcareConfig;

        let sector = SectorTemplate::Healthcare(HealthcareConfig::default());
        let config = GeneratorConfig {
            seed: Some(12345),
            concurrent_cases: 50,
            ..Default::default()
        };
        let mut generator = ProcessEventGenerator::new(sector, config);

        let events = generator.generate_batch(2000);
        assert!(!events.is_empty());

        // Group events by case so parallel branches within the same case can be compared.
        let mut cases: std::collections::HashMap<u64, Vec<&GpuObjectEvent>> =
            std::collections::HashMap::new();
        for event in &events {
            cases.entry(event.object_id).or_default().push(event);
        }

        // Forked branches share the fork timestamp, so Lab Tests (activity 4) and
        // Imaging (activity 5) events with equal physical timestamps indicate a fork.
        let mut found_parallel = false;
        for case_events in cases.values() {
            let lab_tests: Vec<_> = case_events.iter().filter(|e| e.activity_id == 4).collect();
            let imaging: Vec<_> = case_events.iter().filter(|e| e.activity_id == 5).collect();

            if !lab_tests.is_empty() && !imaging.is_empty() {
                for lt in &lab_tests {
                    for img in &imaging {
                        if lt.timestamp.physical_ms == img.timestamp.physical_ms {
                            found_parallel = true;
                            break;
                        }
                    }
                }
            }
        }

        assert!(
            found_parallel,
            "Expected to find parallel Lab Tests and Imaging activities"
        );
    }
}