1#![allow(missing_docs)]
2pub mod schemas;
8
9use std::collections::HashMap;
10
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13
14#[derive(Debug, Clone)]
16pub struct GeneratorConfig {
17 pub schema: SchemaType,
19 pub rate: u64,
21 pub duration_secs: u64,
23 pub anomaly_rate: f64,
25 pub seed: Option<u64>,
27}
28
29impl Default for GeneratorConfig {
30 fn default() -> Self {
31 Self {
32 schema: SchemaType::Fraud,
33 rate: 1000,
34 duration_secs: 60,
35 anomaly_rate: 0.05,
36 seed: None,
37 }
38 }
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum SchemaType {
44 Fraud,
45 Iot,
46 Sysmon,
47 Trading,
48}
49
50impl std::str::FromStr for SchemaType {
51 type Err = String;
52 fn from_str(s: &str) -> Result<Self, Self::Err> {
53 match s.to_lowercase().as_str() {
54 "fraud" => Ok(Self::Fraud),
55 "iot" => Ok(Self::Iot),
56 "sysmon" => Ok(Self::Sysmon),
57 "trading" => Ok(Self::Trading),
58 other => Err(format!(
59 "Unknown schema: {other}. Available: fraud, iot, sysmon, trading"
60 )),
61 }
62 }
63}
64
65impl std::fmt::Display for SchemaType {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 match self {
68 Self::Fraud => write!(f, "fraud"),
69 Self::Iot => write!(f, "iot"),
70 Self::Sysmon => write!(f, "sysmon"),
71 Self::Trading => write!(f, "trading"),
72 }
73 }
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct GeneratedEvent {
79 pub event_type: String,
80 pub timestamp: DateTime<Utc>,
81 pub fields: HashMap<String, serde_json::Value>,
82 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
84 pub is_anomaly: bool,
85}
86
87pub trait EventSchema: Send + Sync {
89 fn next_event(&mut self) -> GeneratedEvent;
91
92 fn event_types(&self) -> Vec<String>;
94
95 fn description(&self) -> &str;
97}
98
99pub fn create_schema(schema_type: SchemaType, seed: Option<u64>) -> Box<dyn EventSchema> {
101 match schema_type {
102 SchemaType::Fraud => Box::new(schemas::fraud::FraudSchema::new(seed)),
103 SchemaType::Iot => Box::new(schemas::iot::IotSchema::new(seed)),
104 SchemaType::Sysmon => Box::new(schemas::sysmon::SysmonSchema::new(seed)),
105 SchemaType::Trading => Box::new(schemas::trading::TradingSchema::new(seed)),
106 }
107}
108
109pub fn generate_batch(config: &GeneratorConfig, count: usize) -> Vec<GeneratedEvent> {
111 let mut schema = create_schema(config.schema, config.seed);
112 (0..count).map(|_| schema.next_event()).collect()
113}
114
115#[cfg(test)]
116mod tests {
117 use std::collections::HashSet;
118
119 use super::*;
120
121 #[test]
126 fn default_config_has_expected_values() {
127 let config = GeneratorConfig::default();
128 assert_eq!(config.schema, SchemaType::Fraud);
129 assert_eq!(config.rate, 1000);
130 assert_eq!(config.duration_secs, 60);
131 assert!((config.anomaly_rate - 0.05).abs() < f64::EPSILON);
132 assert!(config.seed.is_none());
133 }
134
135 #[test]
140 fn schema_type_from_str_valid() {
141 assert_eq!("fraud".parse::<SchemaType>().unwrap(), SchemaType::Fraud);
142 assert_eq!("iot".parse::<SchemaType>().unwrap(), SchemaType::Iot);
143 assert_eq!(
144 "trading".parse::<SchemaType>().unwrap(),
145 SchemaType::Trading
146 );
147 }
148
149 #[test]
150 fn schema_type_from_str_case_insensitive() {
151 assert_eq!("FRAUD".parse::<SchemaType>().unwrap(), SchemaType::Fraud);
152 assert_eq!("IoT".parse::<SchemaType>().unwrap(), SchemaType::Iot);
153 assert_eq!(
154 "Trading".parse::<SchemaType>().unwrap(),
155 SchemaType::Trading
156 );
157 }
158
159 #[test]
160 fn schema_type_from_str_invalid() {
161 let err = "unknown".parse::<SchemaType>().unwrap_err();
162 assert!(err.contains("Unknown schema"));
163 assert!(err.contains("fraud"));
164 }
165
166 #[test]
167 fn schema_type_display() {
168 assert_eq!(SchemaType::Fraud.to_string(), "fraud");
169 assert_eq!(SchemaType::Iot.to_string(), "iot");
170 assert_eq!(SchemaType::Trading.to_string(), "trading");
171 }
172
173 #[test]
174 fn schema_type_display_roundtrips() {
175 for st in [SchemaType::Fraud, SchemaType::Iot, SchemaType::Trading] {
176 let s = st.to_string();
177 let parsed: SchemaType = s.parse().unwrap();
178 assert_eq!(parsed, st);
179 }
180 }
181
182 #[test]
187 fn create_schema_returns_correct_type() {
188 let fraud = create_schema(SchemaType::Fraud, Some(42));
189 assert!(fraud.description().contains("fraud") || fraud.description().contains("Banking"));
190
191 let iot = create_schema(SchemaType::Iot, Some(42));
192 assert!(iot.description().contains("IoT") || iot.description().contains("sensor"));
193
194 let trading = create_schema(SchemaType::Trading, Some(42));
195 assert!(trading.description().contains("Market") || trading.description().contains("trad"));
196 }
197
198 #[test]
203 fn generate_batch_returns_correct_count() {
204 let config = GeneratorConfig {
205 seed: Some(42),
206 ..Default::default()
207 };
208 assert_eq!(generate_batch(&config, 0).len(), 0);
209 assert_eq!(generate_batch(&config, 1).len(), 1);
210 assert_eq!(generate_batch(&config, 100).len(), 100);
211 }
212
213 #[test]
214 fn generate_batch_deterministic_with_seed() {
215 let config = GeneratorConfig {
216 seed: Some(123),
217 ..Default::default()
218 };
219 let batch1 = generate_batch(&config, 50);
220 let batch2 = generate_batch(&config, 50);
221
222 for (a, b) in batch1.iter().zip(batch2.iter()) {
223 assert_eq!(a.event_type, b.event_type);
224 assert_eq!(a.is_anomaly, b.is_anomaly);
225 assert_eq!(a.fields.len(), b.fields.len());
226 }
227 }
228
229 #[test]
230 fn generate_batch_events_have_nonempty_type() {
231 let config = GeneratorConfig {
232 seed: Some(42),
233 ..Default::default()
234 };
235 for event in generate_batch(&config, 200) {
236 assert!(!event.event_type.is_empty());
237 }
238 }
239
240 #[test]
245 fn generated_event_json_roundtrip() {
246 let config = GeneratorConfig {
247 seed: Some(42),
248 ..Default::default()
249 };
250 let events = generate_batch(&config, 10);
251 for event in &events {
252 let json = serde_json::to_string(event).unwrap();
253 let restored: GeneratedEvent = serde_json::from_str(&json).unwrap();
254 assert_eq!(restored.event_type, event.event_type);
255 assert_eq!(restored.is_anomaly, event.is_anomaly);
256 }
257 }
258
259 #[test]
260 fn is_anomaly_skipped_when_false() {
261 let event = GeneratedEvent {
262 event_type: "test".into(),
263 timestamp: Utc::now(),
264 fields: HashMap::new(),
265 is_anomaly: false,
266 };
267 let json = serde_json::to_string(&event).unwrap();
268 assert!(!json.contains("is_anomaly"));
269 }
270
271 #[test]
272 fn is_anomaly_present_when_true() {
273 let event = GeneratedEvent {
274 event_type: "test".into(),
275 timestamp: Utc::now(),
276 fields: HashMap::new(),
277 is_anomaly: true,
278 };
279 let json = serde_json::to_string(&event).unwrap();
280 assert!(json.contains("is_anomaly"));
281 }
282
283 #[test]
288 fn fraud_event_types() {
289 let schema = schemas::fraud::FraudSchema::new(Some(42));
290 let types = schema.event_types();
291 assert!(types.contains(&"login".to_string()));
292 assert!(types.contains(&"transaction".to_string()));
293 assert!(types.contains(&"transfer".to_string()));
294 assert!(types.contains(&"card_payment".to_string()));
295 }
296
297 #[test]
298 fn fraud_generates_valid_events() {
299 let mut schema = schemas::fraud::FraudSchema::new(Some(42));
300 let allowed: HashSet<&str> = ["login", "transaction", "transfer", "card_payment"]
301 .iter()
302 .copied()
303 .collect();
304
305 for _ in 0..500 {
306 let event = schema.next_event();
307 assert!(
308 allowed.contains(event.event_type.as_str()),
309 "Unexpected event type: {}",
310 event.event_type
311 );
312 assert!(event.fields.contains_key("user_id"));
313 }
314 }
315
316 #[test]
317 fn fraud_anomalies_are_injected() {
318 let mut schema = schemas::fraud::FraudSchema::new(Some(42));
319 let mut anomaly_count = 0;
320 let n = 2000;
321 for _ in 0..n {
322 if schema.next_event().is_anomaly {
323 anomaly_count += 1;
324 }
325 }
326 assert!(anomaly_count > 0, "No anomalies in {n} events");
328 assert!(anomaly_count < n, "All events were anomalies");
330 }
331
332 #[test]
333 fn fraud_anomaly_sequence_has_login_then_transfers() {
334 let mut schema = schemas::fraud::FraudSchema::new(Some(42));
335 let events: Vec<_> = (0..5000).map(|_| schema.next_event()).collect();
336
337 let mut found_login = false;
339 let mut found_transfer = false;
340 for event in &events {
341 if event.is_anomaly && event.event_type == "login" {
342 found_login = true;
343 assert!(event.fields.contains_key("new_location"));
344 }
345 if event.is_anomaly && event.event_type == "transfer" {
346 found_transfer = true;
347 let amount = event.fields["amount"].as_f64().unwrap();
348 assert!(
349 (5000.0..=50000.0).contains(&amount),
350 "Anomaly transfer amount {amount} out of range"
351 );
352 }
353 }
354 assert!(found_login, "No anomaly login found in 5000 events");
355 assert!(found_transfer, "No anomaly transfer found in 5000 events");
356 }
357
358 #[test]
359 fn fraud_login_events_have_required_fields() {
360 let mut schema = schemas::fraud::FraudSchema::new(Some(42));
361 for _ in 0..500 {
362 let event = schema.next_event();
363 if event.event_type == "login" && !event.is_anomaly {
364 assert!(event.fields.contains_key("city"));
365 assert!(event.fields.contains_key("success"));
366 assert!(event.fields.contains_key("device"));
367 }
368 }
369 }
370
371 #[test]
372 fn fraud_transaction_events_have_amount() {
373 let mut schema = schemas::fraud::FraudSchema::new(Some(42));
374 for _ in 0..500 {
375 let event = schema.next_event();
376 if event.event_type == "transaction" || event.event_type == "card_payment" {
377 assert!(event.fields.contains_key("amount"));
378 assert!(event.fields.contains_key("merchant"));
379 let amount = event.fields["amount"].as_f64().unwrap();
380 assert!((5.0..=500.0).contains(&amount));
381 }
382 }
383 }
384
385 #[test]
390 fn iot_event_types() {
391 let schema = schemas::iot::IotSchema::new(Some(42));
392 let types = schema.event_types();
393 assert!(types.contains(&"sensor_reading".to_string()));
394 assert!(types.contains(&"sensor_alert".to_string()));
395 assert!(types.contains(&"sensor_heartbeat".to_string()));
396 }
397
398 #[test]
399 fn iot_generates_valid_events() {
400 let mut schema = schemas::iot::IotSchema::new(Some(42));
401 let allowed: HashSet<&str> = ["sensor_reading", "sensor_alert", "sensor_heartbeat"]
402 .iter()
403 .copied()
404 .collect();
405
406 for _ in 0..500 {
407 let event = schema.next_event();
408 assert!(
409 allowed.contains(event.event_type.as_str()),
410 "Unexpected event type: {}",
411 event.event_type
412 );
413 assert!(event.fields.contains_key("sensor_id"));
414 assert!(event.fields.contains_key("zone"));
415 assert!(event.fields.contains_key("temperature"));
416 assert!(event.fields.contains_key("humidity"));
417 assert!(event.fields.contains_key("pressure"));
418 }
419 }
420
421 #[test]
422 fn iot_sensors_use_correct_zones() {
423 let mut schema = schemas::iot::IotSchema::new(Some(42));
424 let valid_zones: HashSet<&str> = ["zone_a", "zone_b", "zone_c", "zone_d"]
425 .iter()
426 .copied()
427 .collect();
428
429 for _ in 0..200 {
430 let event = schema.next_event();
431 let zone = event.fields["zone"].as_str().unwrap();
432 assert!(valid_zones.contains(zone), "Unexpected zone: {zone}");
433 }
434 }
435
436 #[test]
437 fn iot_anomalies_cause_temperature_spike() {
438 let mut schema = schemas::iot::IotSchema::new(Some(42));
439 let mut found_anomaly = false;
440 for _ in 0..2000 {
441 let event = schema.next_event();
442 if event.is_anomaly {
443 found_anomaly = true;
444 }
445 }
446 assert!(found_anomaly, "No anomalies in 2000 IoT events");
447 }
448
449 #[test]
450 fn iot_alert_events_have_alert_type() {
451 let mut schema = schemas::iot::IotSchema::new(Some(42));
452 for _ in 0..500 {
453 let event = schema.next_event();
454 if event.event_type == "sensor_alert" {
455 assert!(event.fields.contains_key("alert_type"));
456 }
457 }
458 }
459
460 #[test]
465 fn trading_event_types() {
466 let schema = schemas::trading::TradingSchema::new(Some(42));
467 let types = schema.event_types();
468 assert!(types.contains(&"trade".to_string()));
469 assert!(types.contains(&"quote".to_string()));
470 assert!(types.contains(&"order_new".to_string()));
471 assert!(types.contains(&"order_cancel".to_string()));
472 }
473
474 #[test]
475 fn trading_generates_valid_events() {
476 let mut schema = schemas::trading::TradingSchema::new(Some(42));
477 let allowed: HashSet<&str> = ["trade", "quote", "order_new", "order_cancel"]
478 .iter()
479 .copied()
480 .collect();
481
482 for _ in 0..500 {
483 let event = schema.next_event();
484 assert!(
485 allowed.contains(event.event_type.as_str()),
486 "Unexpected event type: {}",
487 event.event_type
488 );
489 assert!(event.fields.contains_key("symbol"));
490 assert!(event.fields.contains_key("exchange"));
491 assert!(event.fields.contains_key("price"));
492 }
493 }
494
495 #[test]
496 fn trading_prices_are_positive() {
497 let mut schema = schemas::trading::TradingSchema::new(Some(42));
498 for _ in 0..1000 {
499 let event = schema.next_event();
500 let price = event.fields["price"].as_f64().unwrap();
501 assert!(price > 0.0, "Price must be positive, got {price}");
502 }
503 }
504
505 #[test]
506 fn trading_trade_events_have_volume_and_side() {
507 let mut schema = schemas::trading::TradingSchema::new(Some(42));
508 for _ in 0..500 {
509 let event = schema.next_event();
510 if event.event_type == "trade" {
511 assert!(event.fields.contains_key("volume"));
512 assert!(event.fields.contains_key("side"));
513 assert!(event.fields.contains_key("trade_id"));
514 let side = event.fields["side"].as_str().unwrap();
515 assert!(side == "buy" || side == "sell");
516 }
517 }
518 }
519
520 #[test]
521 fn trading_quote_events_have_bid_ask() {
522 let mut schema = schemas::trading::TradingSchema::new(Some(42));
523 for _ in 0..500 {
524 let event = schema.next_event();
525 if event.event_type == "quote" {
526 assert!(event.fields.contains_key("bid"));
527 assert!(event.fields.contains_key("ask"));
528 let bid = event.fields["bid"].as_f64().unwrap();
529 let ask = event.fields["ask"].as_f64().unwrap();
530 assert!(bid < ask, "Bid {bid} should be less than ask {ask}");
531 }
532 }
533 }
534
535 #[test]
536 fn trading_anomalies_have_large_volume() {
537 let mut schema = schemas::trading::TradingSchema::new(Some(42));
538 let mut found_anomaly_trade = false;
539 for _ in 0..5000 {
540 let event = schema.next_event();
541 if event.is_anomaly && event.event_type == "trade" {
542 found_anomaly_trade = true;
543 let volume = event.fields["volume"].as_u64().unwrap();
544 assert!(
545 volume >= 10000,
546 "Anomaly trade volume should be >= 10000, got {volume}"
547 );
548 }
549 }
550 assert!(found_anomaly_trade, "No anomaly trades in 5000 events");
551 }
552
553 #[test]
554 fn trading_uses_known_symbols() {
555 let mut schema = schemas::trading::TradingSchema::new(Some(42));
556 let valid: HashSet<&str> = [
557 "AAPL", "GOOGL", "MSFT", "TSLA", "AMZN", "META", "NVDA", "JPM",
558 ]
559 .iter()
560 .copied()
561 .collect();
562
563 for _ in 0..200 {
564 let event = schema.next_event();
565 let sym = event.fields["symbol"].as_str().unwrap();
566 assert!(valid.contains(sym), "Unknown symbol: {sym}");
567 }
568 }
569
570 #[test]
575 fn all_schemas_deterministic_with_same_seed() {
576 for schema_type in [SchemaType::Fraud, SchemaType::Iot, SchemaType::Trading] {
577 let mut s1 = create_schema(schema_type, Some(99));
578 let mut s2 = create_schema(schema_type, Some(99));
579
580 for i in 0..100 {
581 let e1 = s1.next_event();
582 let e2 = s2.next_event();
583 assert_eq!(
584 e1.event_type, e2.event_type,
585 "{schema_type} event {i}: types differ"
586 );
587 assert_eq!(
588 e1.is_anomaly, e2.is_anomaly,
589 "{schema_type} event {i}: anomaly flag differs"
590 );
591 }
592 }
593 }
594
595 #[test]
596 fn different_seeds_produce_different_sequences() {
597 let mut s1 = create_schema(SchemaType::Fraud, Some(1));
598 let mut s2 = create_schema(SchemaType::Fraud, Some(2));
599
600 let events1: Vec<_> = (0..50).map(|_| s1.next_event().event_type).collect();
601 let events2: Vec<_> = (0..50).map(|_| s2.next_event().event_type).collect();
602
603 assert_ne!(events1, events2);
605 }
606}