Skip to main content

varpulis_datagen/
lib.rs

1#![allow(missing_docs)]
2//! Event generation library for Varpulis demos and testing.
3//!
//! Provides pluggable event schemas (fraud, IoT, sysmon, trading) with configurable
5//! rates, pattern injection, and output formats.
6
7pub mod schemas;
8
9use std::collections::HashMap;
10
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13
14/// Configuration for the event generator.
15#[derive(Debug, Clone)]
16pub struct GeneratorConfig {
17    /// Schema to use for event generation.
18    pub schema: SchemaType,
19    /// Events per second to generate.
20    pub rate: u64,
21    /// Total duration in seconds (0 = infinite).
22    pub duration_secs: u64,
23    /// Fraction of events that should contain injected anomaly patterns (0.0-1.0).
24    pub anomaly_rate: f64,
25    /// Random seed for reproducibility (None = random).
26    pub seed: Option<u64>,
27}
28
29impl Default for GeneratorConfig {
30    fn default() -> Self {
31        Self {
32            schema: SchemaType::Fraud,
33            rate: 1000,
34            duration_secs: 60,
35            anomaly_rate: 0.05,
36            seed: None,
37        }
38    }
39}
40
/// Built-in event schemas, one per generator in the `schemas` module.
///
/// Parsed case-insensitively from, and displayed as, `"fraud"`, `"iot"`, `"sysmon"` and
/// `"trading"`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SchemaType {
    /// Fraud events (`login`, `transaction`, `transfer`, `card_payment`), built by
    /// `schemas::fraud::FraudSchema`.
    Fraud,
    /// IoT sensor events (`sensor_reading`, `sensor_alert`, `sensor_heartbeat`), built by
    /// `schemas::iot::IotSchema`.
    Iot,
    /// Events built by `schemas::sysmon::SysmonSchema`.
    Sysmon,
    /// Market events (`trade`, `quote`, `order_new`, `order_cancel`), built by
    /// `schemas::trading::TradingSchema`.
    Trading,
}
49
50impl std::str::FromStr for SchemaType {
51    type Err = String;
52    fn from_str(s: &str) -> Result<Self, Self::Err> {
53        match s.to_lowercase().as_str() {
54            "fraud" => Ok(Self::Fraud),
55            "iot" => Ok(Self::Iot),
56            "sysmon" => Ok(Self::Sysmon),
57            "trading" => Ok(Self::Trading),
58            other => Err(format!(
59                "Unknown schema: {other}. Available: fraud, iot, sysmon, trading"
60            )),
61        }
62    }
63}
64
65impl std::fmt::Display for SchemaType {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        match self {
68            Self::Fraud => write!(f, "fraud"),
69            Self::Iot => write!(f, "iot"),
70            Self::Sysmon => write!(f, "sysmon"),
71            Self::Trading => write!(f, "trading"),
72        }
73    }
74}
75
76/// A generated event.
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct GeneratedEvent {
79    pub event_type: String,
80    pub timestamp: DateTime<Utc>,
81    pub fields: HashMap<String, serde_json::Value>,
82    /// Whether this event is part of an injected anomaly pattern.
83    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
84    pub is_anomaly: bool,
85}
86
87/// Trait for event generation schemas.
88pub trait EventSchema: Send + Sync {
89    /// Generate the next event.
90    fn next_event(&mut self) -> GeneratedEvent;
91
92    /// Get the list of event types this schema can produce.
93    fn event_types(&self) -> Vec<String>;
94
95    /// Get a description of the schema.
96    fn description(&self) -> &str;
97}
98
99/// Create an event schema from a schema type.
100pub fn create_schema(schema_type: SchemaType, seed: Option<u64>) -> Box<dyn EventSchema> {
101    match schema_type {
102        SchemaType::Fraud => Box::new(schemas::fraud::FraudSchema::new(seed)),
103        SchemaType::Iot => Box::new(schemas::iot::IotSchema::new(seed)),
104        SchemaType::Sysmon => Box::new(schemas::sysmon::SysmonSchema::new(seed)),
105        SchemaType::Trading => Box::new(schemas::trading::TradingSchema::new(seed)),
106    }
107}
108
109/// Generate a batch of events.
110pub fn generate_batch(config: &GeneratorConfig, count: usize) -> Vec<GeneratedEvent> {
111    let mut schema = create_schema(config.schema, config.seed);
112    (0..count).map(|_| schema.next_event()).collect()
113}
114
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;

    /// Every `SchemaType` variant, so per-variant tests cannot silently skip one.
    const ALL_SCHEMAS: [SchemaType; 4] = [
        SchemaType::Fraud,
        SchemaType::Iot,
        SchemaType::Sysmon,
        SchemaType::Trading,
    ];

    // =========================================================================
    // GeneratorConfig
    // =========================================================================

    #[test]
    fn default_config_has_expected_values() {
        let config = GeneratorConfig::default();
        assert_eq!(config.schema, SchemaType::Fraud);
        assert_eq!(config.rate, 1000);
        assert_eq!(config.duration_secs, 60);
        assert!((config.anomaly_rate - 0.05).abs() < f64::EPSILON);
        assert!(config.seed.is_none());
    }

    // =========================================================================
    // SchemaType
    // =========================================================================

    #[test]
    fn schema_type_from_str_valid() {
        assert_eq!("fraud".parse::<SchemaType>().unwrap(), SchemaType::Fraud);
        assert_eq!("iot".parse::<SchemaType>().unwrap(), SchemaType::Iot);
        assert_eq!("sysmon".parse::<SchemaType>().unwrap(), SchemaType::Sysmon);
        assert_eq!(
            "trading".parse::<SchemaType>().unwrap(),
            SchemaType::Trading
        );
    }

    #[test]
    fn schema_type_from_str_case_insensitive() {
        assert_eq!("FRAUD".parse::<SchemaType>().unwrap(), SchemaType::Fraud);
        assert_eq!("IoT".parse::<SchemaType>().unwrap(), SchemaType::Iot);
        assert_eq!("SysMon".parse::<SchemaType>().unwrap(), SchemaType::Sysmon);
        assert_eq!(
            "Trading".parse::<SchemaType>().unwrap(),
            SchemaType::Trading
        );
    }

    #[test]
    fn schema_type_from_str_invalid() {
        let err = "unknown".parse::<SchemaType>().unwrap_err();
        assert!(err.contains("Unknown schema"));
        assert!(err.contains("fraud"));
    }

    #[test]
    fn schema_type_display() {
        assert_eq!(SchemaType::Fraud.to_string(), "fraud");
        assert_eq!(SchemaType::Iot.to_string(), "iot");
        assert_eq!(SchemaType::Sysmon.to_string(), "sysmon");
        assert_eq!(SchemaType::Trading.to_string(), "trading");
    }

    #[test]
    fn schema_type_display_roundtrips() {
        for st in ALL_SCHEMAS {
            let parsed: SchemaType = st.to_string().parse().unwrap();
            assert_eq!(parsed, st);
        }
    }

    // =========================================================================
    // create_schema factory
    // =========================================================================

    #[test]
    fn create_schema_returns_correct_type() {
        let fraud = create_schema(SchemaType::Fraud, Some(42));
        assert!(fraud.description().contains("fraud") || fraud.description().contains("Banking"));

        let iot = create_schema(SchemaType::Iot, Some(42));
        assert!(iot.description().contains("IoT") || iot.description().contains("sensor"));

        let trading = create_schema(SchemaType::Trading, Some(42));
        assert!(trading.description().contains("Market") || trading.description().contains("trad"));
    }

    // =========================================================================
    // generate_batch
    // =========================================================================

    #[test]
    fn generate_batch_returns_correct_count() {
        for schema in ALL_SCHEMAS {
            let config = GeneratorConfig {
                schema,
                seed: Some(42),
                ..Default::default()
            };
            assert_eq!(generate_batch(&config, 0).len(), 0, "{schema}");
            assert_eq!(generate_batch(&config, 1).len(), 1, "{schema}");
            assert_eq!(generate_batch(&config, 100).len(), 100, "{schema}");
        }
    }

    #[test]
    fn generate_batch_deterministic_with_seed() {
        let config = GeneratorConfig {
            seed: Some(123),
            ..Default::default()
        };
        let batch1 = generate_batch(&config, 50);
        let batch2 = generate_batch(&config, 50);

        // `zip` stops at the shorter side, so check lengths explicitly first.
        assert_eq!(batch1.len(), batch2.len());
        for (a, b) in batch1.iter().zip(&batch2) {
            assert_eq!(a.event_type, b.event_type);
            assert_eq!(a.is_anomaly, b.is_anomaly);
            assert_eq!(a.fields.len(), b.fields.len());
        }
    }

    #[test]
    fn generate_batch_events_have_nonempty_type() {
        let config = GeneratorConfig {
            seed: Some(42),
            ..Default::default()
        };
        for event in generate_batch(&config, 200) {
            assert!(!event.event_type.is_empty());
        }
    }

    // =========================================================================
    // GeneratedEvent serialization
    // =========================================================================

    #[test]
    fn generated_event_json_roundtrip() {
        let config = GeneratorConfig {
            seed: Some(42),
            ..Default::default()
        };
        let events = generate_batch(&config, 10);
        for event in &events {
            let json = serde_json::to_string(event).unwrap();
            let restored: GeneratedEvent = serde_json::from_str(&json).unwrap();
            assert_eq!(restored.event_type, event.event_type);
            assert_eq!(restored.is_anomaly, event.is_anomaly);
        }
    }

    #[test]
    fn is_anomaly_skipped_when_false() {
        let event = GeneratedEvent {
            event_type: "test".into(),
            timestamp: Utc::now(),
            fields: HashMap::new(),
            is_anomaly: false,
        };
        let json = serde_json::to_string(&event).unwrap();
        assert!(!json.contains("is_anomaly"));
    }

    #[test]
    fn is_anomaly_present_when_true() {
        let event = GeneratedEvent {
            event_type: "test".into(),
            timestamp: Utc::now(),
            fields: HashMap::new(),
            is_anomaly: true,
        };
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("is_anomaly"));
    }

    // =========================================================================
    // Fraud schema
    // =========================================================================

    #[test]
    fn fraud_event_types() {
        let schema = schemas::fraud::FraudSchema::new(Some(42));
        let types = schema.event_types();
        assert!(types.contains(&"login".to_string()));
        assert!(types.contains(&"transaction".to_string()));
        assert!(types.contains(&"transfer".to_string()));
        assert!(types.contains(&"card_payment".to_string()));
    }

    #[test]
    fn fraud_generates_valid_events() {
        let mut schema = schemas::fraud::FraudSchema::new(Some(42));
        let allowed: HashSet<&str> =
            HashSet::from(["login", "transaction", "transfer", "card_payment"]);

        for _ in 0..500 {
            let event = schema.next_event();
            assert!(
                allowed.contains(event.event_type.as_str()),
                "Unexpected event type: {}",
                event.event_type
            );
            assert!(event.fields.contains_key("user_id"));
        }
    }

    #[test]
    fn fraud_anomalies_are_injected() {
        let mut schema = schemas::fraud::FraudSchema::new(Some(42));
        let mut anomaly_count = 0;
        let n = 2000;
        for _ in 0..n {
            if schema.next_event().is_anomaly {
                anomaly_count += 1;
            }
        }
        // Anomalies should exist (5% base rate + multi-event sequences)
        assert!(anomaly_count > 0, "No anomalies in {n} events");
        // But not all events should be anomalies
        assert!(anomaly_count < n, "All events were anomalies");
    }

    #[test]
    fn fraud_anomaly_sequence_has_login_then_transfers() {
        let mut schema = schemas::fraud::FraudSchema::new(Some(42));
        let events: Vec<_> = (0..5000).map(|_| schema.next_event()).collect();

        // Find anomaly sequences: login (with new_location) followed by high-value transfers
        let mut found_login = false;
        let mut found_transfer = false;
        for event in &events {
            if event.is_anomaly && event.event_type == "login" {
                found_login = true;
                assert!(event.fields.contains_key("new_location"));
            }
            if event.is_anomaly && event.event_type == "transfer" {
                found_transfer = true;
                let amount = event.fields["amount"].as_f64().unwrap();
                assert!(
                    (5000.0..=50000.0).contains(&amount),
                    "Anomaly transfer amount {amount} out of range"
                );
            }
        }
        assert!(found_login, "No anomaly login found in 5000 events");
        assert!(found_transfer, "No anomaly transfer found in 5000 events");
    }

    #[test]
    fn fraud_login_events_have_required_fields() {
        let mut schema = schemas::fraud::FraudSchema::new(Some(42));
        for _ in 0..500 {
            let event = schema.next_event();
            if event.event_type == "login" && !event.is_anomaly {
                assert!(event.fields.contains_key("city"));
                assert!(event.fields.contains_key("success"));
                assert!(event.fields.contains_key("device"));
            }
        }
    }

    #[test]
    fn fraud_transaction_events_have_amount() {
        let mut schema = schemas::fraud::FraudSchema::new(Some(42));
        for _ in 0..500 {
            let event = schema.next_event();
            if event.event_type == "transaction" || event.event_type == "card_payment" {
                assert!(event.fields.contains_key("amount"));
                assert!(event.fields.contains_key("merchant"));
                let amount = event.fields["amount"].as_f64().unwrap();
                assert!((5.0..=500.0).contains(&amount));
            }
        }
    }

    // =========================================================================
    // IoT schema
    // =========================================================================

    #[test]
    fn iot_event_types() {
        let schema = schemas::iot::IotSchema::new(Some(42));
        let types = schema.event_types();
        assert!(types.contains(&"sensor_reading".to_string()));
        assert!(types.contains(&"sensor_alert".to_string()));
        assert!(types.contains(&"sensor_heartbeat".to_string()));
    }

    #[test]
    fn iot_generates_valid_events() {
        let mut schema = schemas::iot::IotSchema::new(Some(42));
        let allowed: HashSet<&str> =
            HashSet::from(["sensor_reading", "sensor_alert", "sensor_heartbeat"]);

        for _ in 0..500 {
            let event = schema.next_event();
            assert!(
                allowed.contains(event.event_type.as_str()),
                "Unexpected event type: {}",
                event.event_type
            );
            assert!(event.fields.contains_key("sensor_id"));
            assert!(event.fields.contains_key("zone"));
            assert!(event.fields.contains_key("temperature"));
            assert!(event.fields.contains_key("humidity"));
            assert!(event.fields.contains_key("pressure"));
        }
    }

    #[test]
    fn iot_sensors_use_correct_zones() {
        let mut schema = schemas::iot::IotSchema::new(Some(42));
        let valid_zones: HashSet<&str> = HashSet::from(["zone_a", "zone_b", "zone_c", "zone_d"]);

        for _ in 0..200 {
            let event = schema.next_event();
            let zone = event.fields["zone"].as_str().unwrap();
            assert!(valid_zones.contains(zone), "Unexpected zone: {zone}");
        }
    }

    // Only checks that anomalies occur; the shape of an anomaly (e.g. a temperature spike)
    // is not asserted here.
    #[test]
    fn iot_anomalies_are_injected() {
        let mut schema = schemas::iot::IotSchema::new(Some(42));
        let mut found_anomaly = false;
        for _ in 0..2000 {
            let event = schema.next_event();
            if event.is_anomaly {
                found_anomaly = true;
            }
        }
        assert!(found_anomaly, "No anomalies in 2000 IoT events");
    }

    #[test]
    fn iot_alert_events_have_alert_type() {
        let mut schema = schemas::iot::IotSchema::new(Some(42));
        for _ in 0..500 {
            let event = schema.next_event();
            if event.event_type == "sensor_alert" {
                assert!(event.fields.contains_key("alert_type"));
            }
        }
    }

    // =========================================================================
    // Trading schema
    // =========================================================================

    #[test]
    fn trading_event_types() {
        let schema = schemas::trading::TradingSchema::new(Some(42));
        let types = schema.event_types();
        assert!(types.contains(&"trade".to_string()));
        assert!(types.contains(&"quote".to_string()));
        assert!(types.contains(&"order_new".to_string()));
        assert!(types.contains(&"order_cancel".to_string()));
    }

    #[test]
    fn trading_generates_valid_events() {
        let mut schema = schemas::trading::TradingSchema::new(Some(42));
        let allowed: HashSet<&str> = HashSet::from(["trade", "quote", "order_new", "order_cancel"]);

        for _ in 0..500 {
            let event = schema.next_event();
            assert!(
                allowed.contains(event.event_type.as_str()),
                "Unexpected event type: {}",
                event.event_type
            );
            assert!(event.fields.contains_key("symbol"));
            assert!(event.fields.contains_key("exchange"));
            assert!(event.fields.contains_key("price"));
        }
    }

    #[test]
    fn trading_prices_are_positive() {
        let mut schema = schemas::trading::TradingSchema::new(Some(42));
        for _ in 0..1000 {
            let event = schema.next_event();
            let price = event.fields["price"].as_f64().unwrap();
            assert!(price > 0.0, "Price must be positive, got {price}");
        }
    }

    #[test]
    fn trading_trade_events_have_volume_and_side() {
        let mut schema = schemas::trading::TradingSchema::new(Some(42));
        for _ in 0..500 {
            let event = schema.next_event();
            if event.event_type == "trade" {
                assert!(event.fields.contains_key("volume"));
                assert!(event.fields.contains_key("side"));
                assert!(event.fields.contains_key("trade_id"));
                let side = event.fields["side"].as_str().unwrap();
                assert!(side == "buy" || side == "sell");
            }
        }
    }

    #[test]
    fn trading_quote_events_have_bid_ask() {
        let mut schema = schemas::trading::TradingSchema::new(Some(42));
        for _ in 0..500 {
            let event = schema.next_event();
            if event.event_type == "quote" {
                assert!(event.fields.contains_key("bid"));
                assert!(event.fields.contains_key("ask"));
                let bid = event.fields["bid"].as_f64().unwrap();
                let ask = event.fields["ask"].as_f64().unwrap();
                assert!(bid < ask, "Bid {bid} should be less than ask {ask}");
            }
        }
    }

    #[test]
    fn trading_anomalies_have_large_volume() {
        let mut schema = schemas::trading::TradingSchema::new(Some(42));
        let mut found_anomaly_trade = false;
        for _ in 0..5000 {
            let event = schema.next_event();
            if event.is_anomaly && event.event_type == "trade" {
                found_anomaly_trade = true;
                let volume = event.fields["volume"].as_u64().unwrap();
                assert!(
                    volume >= 10000,
                    "Anomaly trade volume should be >= 10000, got {volume}"
                );
            }
        }
        assert!(found_anomaly_trade, "No anomaly trades in 5000 events");
    }

    #[test]
    fn trading_uses_known_symbols() {
        let mut schema = schemas::trading::TradingSchema::new(Some(42));
        let valid: HashSet<&str> = HashSet::from([
            "AAPL", "GOOGL", "MSFT", "TSLA", "AMZN", "META", "NVDA", "JPM",
        ]);

        for _ in 0..200 {
            let event = schema.next_event();
            let sym = event.fields["symbol"].as_str().unwrap();
            assert!(valid.contains(sym), "Unknown symbol: {sym}");
        }
    }

    // =========================================================================
    // Cross-schema determinism
    // =========================================================================

    #[test]
    fn all_schemas_deterministic_with_same_seed() {
        for schema_type in ALL_SCHEMAS {
            let mut s1 = create_schema(schema_type, Some(99));
            let mut s2 = create_schema(schema_type, Some(99));

            for i in 0..100 {
                let e1 = s1.next_event();
                let e2 = s2.next_event();
                assert_eq!(
                    e1.event_type, e2.event_type,
                    "{schema_type} event {i}: types differ"
                );
                assert_eq!(
                    e1.is_anomaly, e2.is_anomaly,
                    "{schema_type} event {i}: anomaly flag differs"
                );
            }
        }
    }

    #[test]
    fn different_seeds_produce_different_sequences() {
        let mut s1 = create_schema(SchemaType::Fraud, Some(1));
        let mut s2 = create_schema(SchemaType::Fraud, Some(2));

        let events1: Vec<_> = (0..50).map(|_| s1.next_event().event_type).collect();
        let events2: Vec<_> = (0..50).map(|_| s2.next_event().event_type).collect();

        // Very unlikely (but not impossible) for 50 events to be identical with different seeds
        assert_ne!(events1, events2);
    }
}