Skip to main content

oxirs_stream/
testing_framework.rs

1//! # Stream Testing Framework
2//!
3//! This module provides a comprehensive testing framework for stream
4//! applications, enabling developers to write reliable and maintainable tests.
5//!
6//! ## Features
7//! - Test harness for stream applications
8//! - Mock streams and event generators
9//! - Time manipulation for testing windows
10//! - Assertions for stream output
11//! - Performance testing utilities
12//! - Test fixtures and builders
13//! - Test report generation
14//!
15//! ## Example
16//! ```rust,ignore
17//! use oxirs_stream::testing_framework::*;
18//!
19//! #[tokio::test]
20//! async fn test_stream_processing() {
21//!     let harness = TestHarness::builder()
22//!         .with_mock_clock()
23//!         .with_event_generator(EventGenerator::sequential(100))
24//!         .build()
25//!         .await?;
26//!
27//!     harness.push_events(vec![/* events */]).await;
28//!     harness.advance_time(Duration::from_secs(60)).await;
29//!
30//!     assert_stream_output!(harness, contains(expected));
31//! }
32//! ```
33
34use anyhow::{anyhow, Result};
35use chrono::{DateTime, Utc};
36use serde::{Deserialize, Serialize};
37use std::collections::{HashMap, VecDeque};
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::sync::Arc;
40use std::time::{Duration, Instant};
41use tokio::sync::{mpsc, RwLock};
42use tracing::{debug, info};
43use uuid::Uuid;
44
45use crate::event::{EventMetadata, StreamEvent};
46
47/// Configuration for the test harness
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct TestHarnessConfig {
50    /// Use mock clock for time manipulation
51    pub use_mock_clock: bool,
52    /// Initial mock time
53    pub initial_time: Option<DateTime<Utc>>,
54    /// Event buffer size
55    pub event_buffer_size: usize,
56    /// Timeout for assertions
57    pub assertion_timeout: Duration,
58    /// Enable verbose logging
59    pub verbose: bool,
60    /// Capture all events for inspection
61    pub capture_events: bool,
62    /// Maximum events to capture
63    pub max_captured_events: usize,
64    /// Enable performance metrics
65    pub enable_metrics: bool,
66}
67
68impl Default for TestHarnessConfig {
69    fn default() -> Self {
70        Self {
71            use_mock_clock: true,
72            initial_time: None,
73            event_buffer_size: 10000,
74            assertion_timeout: Duration::from_secs(10),
75            verbose: false,
76            capture_events: true,
77            max_captured_events: 100000,
78            enable_metrics: true,
79        }
80    }
81}
82
83/// Mock clock for time manipulation in tests
84pub struct MockClock {
85    /// Current time
86    current_time: Arc<RwLock<DateTime<Utc>>>,
87    /// Time advancement listeners
88    listeners: Arc<RwLock<Vec<mpsc::Sender<DateTime<Utc>>>>>,
89}
90
91impl MockClock {
92    /// Create a new mock clock
93    pub fn new(initial_time: DateTime<Utc>) -> Self {
94        Self {
95            current_time: Arc::new(RwLock::new(initial_time)),
96            listeners: Arc::new(RwLock::new(Vec::new())),
97        }
98    }
99
100    /// Get current time
101    pub async fn now(&self) -> DateTime<Utc> {
102        *self.current_time.read().await
103    }
104
105    /// Advance time by duration
106    pub async fn advance(&self, duration: Duration) {
107        let mut time = self.current_time.write().await;
108        *time += chrono::Duration::from_std(duration).unwrap_or_default();
109
110        let new_time = *time;
111        drop(time);
112
113        // Notify listeners
114        let listeners = self.listeners.read().await;
115        for listener in listeners.iter() {
116            let _ = listener.send(new_time).await;
117        }
118    }
119
120    /// Set time to specific value
121    pub async fn set_time(&self, time: DateTime<Utc>) {
122        let mut current = self.current_time.write().await;
123        *current = time;
124
125        drop(current);
126
127        let listeners = self.listeners.read().await;
128        for listener in listeners.iter() {
129            let _ = listener.send(time).await;
130        }
131    }
132
133    /// Subscribe to time changes
134    pub async fn subscribe(&self) -> mpsc::Receiver<DateTime<Utc>> {
135        let (tx, rx) = mpsc::channel(100);
136        let mut listeners = self.listeners.write().await;
137        listeners.push(tx);
138        rx
139    }
140}
141
142/// Event generator for creating test events
143pub struct EventGenerator {
144    /// Generator type
145    generator_type: GeneratorType,
146    /// Event counter
147    counter: AtomicU64,
148    /// Configuration
149    config: GeneratorConfig,
150}
151
152/// Generator configuration
153#[derive(Debug, Clone)]
154pub struct GeneratorConfig {
155    /// Source name for events
156    pub source: String,
157    /// Event properties template
158    pub properties: HashMap<String, String>,
159    /// Timestamp increment
160    pub timestamp_increment: Duration,
161}
162
163impl Default for GeneratorConfig {
164    fn default() -> Self {
165        Self {
166            source: "test_generator".to_string(),
167            properties: HashMap::new(),
168            timestamp_increment: Duration::from_millis(100),
169        }
170    }
171}
172
173/// Types of event generators
174#[derive(Debug, Clone)]
175pub enum GeneratorType {
176    /// Sequential integer events
177    Sequential { start: u64, step: u64 },
178    /// Random events
179    Random { min: f64, max: f64 },
180    /// Cyclic pattern
181    Cyclic { pattern: Vec<f64>, index: usize },
182    /// Gaussian distribution
183    Gaussian { mean: f64, stddev: f64 },
184    /// Custom generator function
185    Custom,
186}
187
188impl EventGenerator {
189    /// Create a sequential generator
190    pub fn sequential(_count: u64) -> Self {
191        Self {
192            generator_type: GeneratorType::Sequential { start: 0, step: 1 },
193            counter: AtomicU64::new(0),
194            config: GeneratorConfig::default(),
195        }
196    }
197
198    /// Create a random generator
199    pub fn random(min: f64, max: f64) -> Self {
200        Self {
201            generator_type: GeneratorType::Random { min, max },
202            counter: AtomicU64::new(0),
203            config: GeneratorConfig::default(),
204        }
205    }
206
207    /// Create a cyclic generator
208    pub fn cyclic(pattern: Vec<f64>) -> Self {
209        Self {
210            generator_type: GeneratorType::Cyclic { pattern, index: 0 },
211            counter: AtomicU64::new(0),
212            config: GeneratorConfig::default(),
213        }
214    }
215
216    /// Create a gaussian generator
217    pub fn gaussian(mean: f64, stddev: f64) -> Self {
218        Self {
219            generator_type: GeneratorType::Gaussian { mean, stddev },
220            counter: AtomicU64::new(0),
221            config: GeneratorConfig::default(),
222        }
223    }
224
225    /// Set source name
226    pub fn with_source(mut self, source: String) -> Self {
227        self.config.source = source;
228        self
229    }
230
231    /// Set properties
232    pub fn with_properties(mut self, properties: HashMap<String, String>) -> Self {
233        self.config.properties = properties;
234        self
235    }
236
237    /// Generate next event
238    pub fn next_event(&self, timestamp: DateTime<Utc>) -> StreamEvent {
239        let count = self.counter.fetch_add(1, Ordering::SeqCst);
240
241        let value = match &self.generator_type {
242            GeneratorType::Sequential { start, step } => {
243                format!("{}", start + count * step)
244            }
245            GeneratorType::Random { min, max } => {
246                let range = max - min;
247                let value = min + (count as f64 % 1000.0) / 1000.0 * range;
248                format!("{:.2}", value)
249            }
250            GeneratorType::Cyclic { pattern, .. } => {
251                let index = count as usize % pattern.len();
252                format!("{:.2}", pattern[index])
253            }
254            GeneratorType::Gaussian { mean, stddev } => {
255                // Simple approximation
256                let value = mean + (count as f64 % 10.0 - 5.0) * stddev / 5.0;
257                format!("{:.2}", value)
258            }
259            GeneratorType::Custom => {
260                format!("{}", count)
261            }
262        };
263
264        let metadata = EventMetadata {
265            event_id: Uuid::new_v4().to_string(),
266            timestamp,
267            source: self.config.source.clone(),
268            user: None,
269            context: Some(format!("test_event_{}", count)),
270            caused_by: None,
271            version: "1.0".to_string(),
272            properties: self.config.properties.clone(),
273            checksum: None,
274        };
275
276        // Use TripleAdded as a test event type
277        StreamEvent::TripleAdded {
278            subject: format!("test:subject_{}", count),
279            predicate: "test:predicate".to_string(),
280            object: value,
281            graph: None,
282            metadata,
283        }
284    }
285
286    /// Generate batch of events
287    pub fn generate_batch(&self, count: usize, start_time: DateTime<Utc>) -> Vec<StreamEvent> {
288        let mut events = Vec::with_capacity(count);
289        let mut time = start_time;
290
291        for _ in 0..count {
292            events.push(self.next_event(time));
293            time += chrono::Duration::from_std(self.config.timestamp_increment).unwrap_or_default();
294        }
295
296        events
297    }
298}
299
300/// Test harness for stream testing
301pub struct TestHarness {
302    /// Configuration
303    config: TestHarnessConfig,
304    /// Mock clock
305    clock: Arc<MockClock>,
306    /// Event generator
307    generator: Option<Arc<EventGenerator>>,
308    /// Input events channel
309    input_tx: mpsc::Sender<StreamEvent>,
310    /// Input events receiver
311    input_rx: Arc<RwLock<mpsc::Receiver<StreamEvent>>>,
312    /// Output events
313    output_events: Arc<RwLock<VecDeque<StreamEvent>>>,
314    /// Captured events for inspection
315    captured_events: Arc<RwLock<Vec<CapturedEvent>>>,
316    /// Test metrics
317    metrics: Arc<RwLock<TestMetrics>>,
318    /// Assertions
319    assertions: Arc<RwLock<Vec<Assertion>>>,
320}
321
322/// Captured event with metadata
323#[derive(Debug, Clone, Serialize, Deserialize)]
324pub struct CapturedEvent {
325    /// Original event
326    pub event: StreamEvent,
327    /// Capture time
328    pub captured_at: DateTime<Utc>,
329    /// Processing time
330    pub processing_time: Option<Duration>,
331    /// Source
332    pub source: String,
333}
334
335/// Test metrics
336#[derive(Debug, Clone, Default, Serialize, Deserialize)]
337pub struct TestMetrics {
338    /// Total events pushed
339    pub events_pushed: u64,
340    /// Total events received
341    pub events_received: u64,
342    /// Total assertions
343    pub total_assertions: u64,
344    /// Passed assertions
345    pub passed_assertions: u64,
346    /// Failed assertions
347    pub failed_assertions: u64,
348    /// Average processing time
349    pub avg_processing_time_us: f64,
350    /// Max processing time
351    pub max_processing_time_us: u64,
352    /// Test duration
353    pub test_duration: Duration,
354    /// Memory usage
355    pub memory_usage_bytes: usize,
356}
357
358/// Assertion for testing
359#[derive(Debug, Clone, Serialize, Deserialize)]
360pub struct Assertion {
361    /// Assertion type
362    pub assertion_type: AssertionType,
363    /// Expected value
364    pub expected: String,
365    /// Actual value
366    pub actual: Option<String>,
367    /// Result
368    pub passed: bool,
369    /// Error message
370    pub error_message: Option<String>,
371    /// Timestamp
372    pub timestamp: DateTime<Utc>,
373}
374
375/// Assertion types
376#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
377pub enum AssertionType {
378    /// Assert event count
379    EventCount,
380    /// Assert event contains
381    Contains,
382    /// Assert event order
383    Order,
384    /// Assert no events
385    NoEvents,
386    /// Assert event property
387    Property,
388    /// Assert within duration
389    WithinDuration,
390    /// Assert performance
391    Performance,
392    /// Custom assertion
393    Custom(String),
394}
395
396/// Test harness builder
397pub struct TestHarnessBuilder {
398    config: TestHarnessConfig,
399    generator: Option<EventGenerator>,
400}
401
402impl TestHarnessBuilder {
403    /// Create a new builder
404    pub fn new() -> Self {
405        Self {
406            config: TestHarnessConfig::default(),
407            generator: None,
408        }
409    }
410
411    /// Use mock clock
412    pub fn with_mock_clock(mut self) -> Self {
413        self.config.use_mock_clock = true;
414        self
415    }
416
417    /// Set initial time
418    pub fn with_initial_time(mut self, time: DateTime<Utc>) -> Self {
419        self.config.initial_time = Some(time);
420        self
421    }
422
423    /// Set event buffer size
424    pub fn with_buffer_size(mut self, size: usize) -> Self {
425        self.config.event_buffer_size = size;
426        self
427    }
428
429    /// Set assertion timeout
430    pub fn with_timeout(mut self, timeout: Duration) -> Self {
431        self.config.assertion_timeout = timeout;
432        self
433    }
434
435    /// Enable verbose logging
436    pub fn verbose(mut self) -> Self {
437        self.config.verbose = true;
438        self
439    }
440
441    /// Set event generator
442    pub fn with_event_generator(mut self, generator: EventGenerator) -> Self {
443        self.generator = Some(generator);
444        self
445    }
446
447    /// Build the test harness
448    pub async fn build(self) -> Result<TestHarness> {
449        let initial_time = self.config.initial_time.unwrap_or_else(Utc::now);
450        let clock = Arc::new(MockClock::new(initial_time));
451
452        let (input_tx, input_rx) = mpsc::channel(self.config.event_buffer_size);
453
454        let harness = TestHarness {
455            config: self.config,
456            clock,
457            generator: self.generator.map(Arc::new),
458            input_tx,
459            input_rx: Arc::new(RwLock::new(input_rx)),
460            output_events: Arc::new(RwLock::new(VecDeque::new())),
461            captured_events: Arc::new(RwLock::new(Vec::new())),
462            metrics: Arc::new(RwLock::new(TestMetrics::default())),
463            assertions: Arc::new(RwLock::new(Vec::new())),
464        };
465
466        if harness.config.verbose {
467            info!("Test harness created with config: {:?}", harness.config);
468        }
469
470        Ok(harness)
471    }
472}
473
474impl Default for TestHarnessBuilder {
475    fn default() -> Self {
476        Self::new()
477    }
478}
479
480impl TestHarness {
481    /// Create a new test harness builder
482    pub fn builder() -> TestHarnessBuilder {
483        TestHarnessBuilder::new()
484    }
485
486    /// Get current mock time
487    pub async fn now(&self) -> DateTime<Utc> {
488        self.clock.now().await
489    }
490
491    /// Advance mock time
492    pub async fn advance_time(&self, duration: Duration) {
493        if self.config.verbose {
494            debug!("Advancing time by {:?}", duration);
495        }
496        self.clock.advance(duration).await;
497    }
498
499    /// Set mock time
500    pub async fn set_time(&self, time: DateTime<Utc>) {
501        if self.config.verbose {
502            debug!("Setting time to {:?}", time);
503        }
504        self.clock.set_time(time).await;
505    }
506
507    /// Push a single event
508    pub async fn push_event(&self, event: StreamEvent) -> Result<()> {
509        self.input_tx
510            .send(event.clone())
511            .await
512            .map_err(|e| anyhow!("Failed to push event: {}", e))?;
513
514        if self.config.capture_events {
515            let mut captured = self.captured_events.write().await;
516            if captured.len() < self.config.max_captured_events {
517                captured.push(CapturedEvent {
518                    event,
519                    captured_at: self.clock.now().await,
520                    processing_time: None,
521                    source: "input".to_string(),
522                });
523            }
524        }
525
526        let mut metrics = self.metrics.write().await;
527        metrics.events_pushed += 1;
528
529        Ok(())
530    }
531
532    /// Push multiple events
533    pub async fn push_events(&self, events: Vec<StreamEvent>) -> Result<()> {
534        for event in events {
535            self.push_event(event).await?;
536        }
537        Ok(())
538    }
539
540    /// Generate and push events
541    pub async fn generate_events(&self, count: usize) -> Result<()> {
542        if let Some(generator) = &self.generator {
543            let time = self.clock.now().await;
544            let events = generator.generate_batch(count, time);
545            self.push_events(events).await
546        } else {
547            Err(anyhow!("No event generator configured"))
548        }
549    }
550
551    /// Add output event (called by stream processor)
552    pub async fn add_output(&self, event: StreamEvent) {
553        let mut output = self.output_events.write().await;
554        output.push_back(event.clone());
555
556        if self.config.capture_events {
557            let mut captured = self.captured_events.write().await;
558            if captured.len() < self.config.max_captured_events {
559                captured.push(CapturedEvent {
560                    event,
561                    captured_at: self.clock.now().await,
562                    processing_time: None,
563                    source: "output".to_string(),
564                });
565            }
566        }
567
568        let mut metrics = self.metrics.write().await;
569        metrics.events_received += 1;
570    }
571
572    /// Get output events
573    pub async fn get_output(&self) -> Vec<StreamEvent> {
574        let output = self.output_events.read().await;
575        output.iter().cloned().collect()
576    }
577
578    /// Clear output events
579    pub async fn clear_output(&self) {
580        let mut output = self.output_events.write().await;
581        output.clear();
582    }
583
584    /// Get captured events
585    pub async fn get_captured_events(&self) -> Vec<CapturedEvent> {
586        let captured = self.captured_events.read().await;
587        captured.clone()
588    }
589
590    /// Assert event count
591    pub async fn assert_event_count(&self, expected: usize) -> Result<()> {
592        let output = self.output_events.read().await;
593        let actual = output.len();
594
595        let passed = actual == expected;
596        let error_message = if passed {
597            None
598        } else {
599            Some(format!("Expected {} events, got {}", expected, actual))
600        };
601
602        let assertion = Assertion {
603            assertion_type: AssertionType::EventCount,
604            expected: expected.to_string(),
605            actual: Some(actual.to_string()),
606            passed,
607            error_message: error_message.clone(),
608            timestamp: self.clock.now().await,
609        };
610
611        let mut assertions = self.assertions.write().await;
612        assertions.push(assertion);
613
614        let mut metrics = self.metrics.write().await;
615        metrics.total_assertions += 1;
616        if passed {
617            metrics.passed_assertions += 1;
618        } else {
619            metrics.failed_assertions += 1;
620        }
621
622        if passed {
623            Ok(())
624        } else {
625            Err(anyhow!(
626                error_message.expect("error_message should be set when assertion fails")
627            ))
628        }
629    }
630
631    /// Assert output contains event
632    pub async fn assert_contains(&self, predicate: impl Fn(&StreamEvent) -> bool) -> Result<()> {
633        let output = self.output_events.read().await;
634        let found = output.iter().any(predicate);
635
636        let passed = found;
637        let error_message = if passed {
638            None
639        } else {
640            Some("Expected event not found in output".to_string())
641        };
642
643        let assertion = Assertion {
644            assertion_type: AssertionType::Contains,
645            expected: "matching event".to_string(),
646            actual: Some(format!("{} events checked", output.len())),
647            passed,
648            error_message: error_message.clone(),
649            timestamp: self.clock.now().await,
650        };
651
652        let mut assertions = self.assertions.write().await;
653        assertions.push(assertion);
654
655        let mut metrics = self.metrics.write().await;
656        metrics.total_assertions += 1;
657        if passed {
658            metrics.passed_assertions += 1;
659        } else {
660            metrics.failed_assertions += 1;
661        }
662
663        if passed {
664            Ok(())
665        } else {
666            Err(anyhow!(
667                error_message.expect("error_message should be set when assertion fails")
668            ))
669        }
670    }
671
672    /// Assert no output events
673    pub async fn assert_no_events(&self) -> Result<()> {
674        self.assert_event_count(0).await
675    }
676
677    /// Assert events within duration
678    pub async fn assert_within(
679        &self,
680        duration: Duration,
681        condition: impl Fn(&[StreamEvent]) -> bool,
682    ) -> Result<()> {
683        let start = Instant::now();
684
685        while start.elapsed() < duration {
686            let output = self.output_events.read().await;
687            let events: Vec<_> = output.iter().cloned().collect();
688            drop(output);
689
690            if condition(&events) {
691                let assertion = Assertion {
692                    assertion_type: AssertionType::WithinDuration,
693                    expected: format!("condition within {:?}", duration),
694                    actual: Some(format!("satisfied after {:?}", start.elapsed())),
695                    passed: true,
696                    error_message: None,
697                    timestamp: self.clock.now().await,
698                };
699
700                let mut assertions = self.assertions.write().await;
701                assertions.push(assertion);
702
703                let mut metrics = self.metrics.write().await;
704                metrics.total_assertions += 1;
705                metrics.passed_assertions += 1;
706
707                return Ok(());
708            }
709
710            tokio::time::sleep(Duration::from_millis(10)).await;
711        }
712
713        let error_message = format!("Condition not satisfied within {:?}", duration);
714
715        let assertion = Assertion {
716            assertion_type: AssertionType::WithinDuration,
717            expected: format!("condition within {:?}", duration),
718            actual: Some("timeout".to_string()),
719            passed: false,
720            error_message: Some(error_message.clone()),
721            timestamp: self.clock.now().await,
722        };
723
724        let mut assertions = self.assertions.write().await;
725        assertions.push(assertion);
726
727        let mut metrics = self.metrics.write().await;
728        metrics.total_assertions += 1;
729        metrics.failed_assertions += 1;
730
731        Err(anyhow!(error_message))
732    }
733
734    /// Assert performance metric
735    pub async fn assert_performance(
736        &self,
737        metric: PerformanceMetric,
738        threshold: f64,
739    ) -> Result<()> {
740        let metrics = self.metrics.read().await;
741
742        let (actual, passed) = match metric {
743            PerformanceMetric::AvgLatency => (
744                metrics.avg_processing_time_us,
745                metrics.avg_processing_time_us <= threshold,
746            ),
747            PerformanceMetric::MaxLatency => (
748                metrics.max_processing_time_us as f64,
749                metrics.max_processing_time_us as f64 <= threshold,
750            ),
751            PerformanceMetric::Throughput => {
752                let throughput = if metrics.test_duration.as_secs_f64() > 0.0 {
753                    metrics.events_received as f64 / metrics.test_duration.as_secs_f64()
754                } else {
755                    0.0
756                };
757                (throughput, throughput >= threshold)
758            }
759        };
760
761        drop(metrics);
762
763        let error_message = if passed {
764            None
765        } else {
766            Some(format!(
767                "{:?} {} does not meet threshold {}",
768                metric, actual, threshold
769            ))
770        };
771
772        let assertion = Assertion {
773            assertion_type: AssertionType::Performance,
774            expected: format!("{:?} <= {}", metric, threshold),
775            actual: Some(actual.to_string()),
776            passed,
777            error_message: error_message.clone(),
778            timestamp: self.clock.now().await,
779        };
780
781        let mut assertions = self.assertions.write().await;
782        assertions.push(assertion);
783
784        let mut metrics = self.metrics.write().await;
785        metrics.total_assertions += 1;
786        if passed {
787            metrics.passed_assertions += 1;
788        } else {
789            metrics.failed_assertions += 1;
790        }
791
792        if passed {
793            Ok(())
794        } else {
795            Err(anyhow!(
796                error_message.expect("error_message should be set when assertion fails")
797            ))
798        }
799    }
800
801    /// Get test metrics
802    pub async fn get_metrics(&self) -> TestMetrics {
803        self.metrics.read().await.clone()
804    }
805
806    /// Get all assertions
807    pub async fn get_assertions(&self) -> Vec<Assertion> {
808        self.assertions.read().await.clone()
809    }
810
811    /// Generate test report
812    pub async fn generate_report(&self) -> TestReport {
813        let metrics = self.metrics.read().await;
814        let assertions = self.assertions.read().await;
815        let captured = self.captured_events.read().await;
816
817        TestReport {
818            test_name: "stream_test".to_string(),
819            status: if metrics.failed_assertions == 0 {
820                TestStatus::Passed
821            } else {
822                TestStatus::Failed
823            },
824            metrics: metrics.clone(),
825            assertions: assertions.clone(),
826            event_count: captured.len(),
827            generated_at: Utc::now(),
828        }
829    }
830
831    /// Reset harness state
832    pub async fn reset(&self) {
833        self.output_events.write().await.clear();
834        self.captured_events.write().await.clear();
835        *self.metrics.write().await = TestMetrics::default();
836        self.assertions.write().await.clear();
837
838        if self.config.verbose {
839            info!("Test harness reset");
840        }
841    }
842}
843
844/// Performance metrics
845#[derive(Debug, Clone, Copy)]
846pub enum PerformanceMetric {
847    /// Average latency in microseconds
848    AvgLatency,
849    /// Maximum latency in microseconds
850    MaxLatency,
851    /// Throughput in events per second
852    Throughput,
853}
854
855/// Test report
856#[derive(Debug, Clone, Serialize, Deserialize)]
857pub struct TestReport {
858    /// Test name
859    pub test_name: String,
860    /// Test status
861    pub status: TestStatus,
862    /// Test metrics
863    pub metrics: TestMetrics,
864    /// Assertions
865    pub assertions: Vec<Assertion>,
866    /// Total event count
867    pub event_count: usize,
868    /// Report generation time
869    pub generated_at: DateTime<Utc>,
870}
871
872/// Test status
873#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
874pub enum TestStatus {
875    Passed,
876    Failed,
877    Skipped,
878    Error,
879}
880
881impl TestReport {
882    /// Convert to JSON
883    pub fn to_json(&self) -> Result<String> {
884        serde_json::to_string_pretty(self).map_err(|e| anyhow!("JSON error: {}", e))
885    }
886
887    /// Print summary
888    pub fn print_summary(&self) {
889        println!("\n=== Test Report: {} ===", self.test_name);
890        println!("Status: {:?}", self.status);
891        println!("Events pushed: {}", self.metrics.events_pushed);
892        println!("Events received: {}", self.metrics.events_received);
893        println!(
894            "Assertions: {}/{} passed",
895            self.metrics.passed_assertions, self.metrics.total_assertions
896        );
897        if self.metrics.total_assertions > 0 && self.metrics.failed_assertions > 0 {
898            println!("Failed assertions:");
899            for assertion in &self.assertions {
900                if !assertion.passed {
901                    println!(
902                        "  - {:?}: {}",
903                        assertion.assertion_type,
904                        assertion.error_message.clone().unwrap_or_default()
905                    );
906                }
907            }
908        }
909        println!("========================\n");
910    }
911}
912
913/// Test fixture for common test scenarios
914pub struct TestFixture {
915    /// Name
916    pub name: String,
917    /// Setup events
918    pub setup_events: Vec<StreamEvent>,
919    /// Expected outputs
920    pub expected_outputs: Vec<StreamEvent>,
921    /// Time advancement
922    pub time_advance: Option<Duration>,
923}
924
925impl TestFixture {
926    /// Create a new fixture
927    pub fn new(name: &str) -> Self {
928        Self {
929            name: name.to_string(),
930            setup_events: Vec::new(),
931            expected_outputs: Vec::new(),
932            time_advance: None,
933        }
934    }
935
936    /// Add setup event
937    pub fn with_input(mut self, event: StreamEvent) -> Self {
938        self.setup_events.push(event);
939        self
940    }
941
942    /// Add expected output
943    pub fn expect_output(mut self, event: StreamEvent) -> Self {
944        self.expected_outputs.push(event);
945        self
946    }
947
948    /// Set time advancement
949    pub fn advance_time(mut self, duration: Duration) -> Self {
950        self.time_advance = Some(duration);
951        self
952    }
953
954    /// Run fixture with harness
955    pub async fn run(&self, harness: &TestHarness) -> Result<()> {
956        // Push setup events
957        harness.push_events(self.setup_events.clone()).await?;
958
959        // Advance time if configured
960        if let Some(duration) = self.time_advance {
961            harness.advance_time(duration).await;
962        }
963
964        // Verify outputs
965        harness
966            .assert_event_count(self.expected_outputs.len())
967            .await?;
968
969        Ok(())
970    }
971}
972
973/// Event predicate function type
974type EventPredicate = Box<dyn Fn(&StreamEvent) -> bool + Send + Sync>;
975
976/// Event matcher for assertions
977pub struct EventMatcher {
978    conditions: Vec<EventPredicate>,
979}
980
981impl EventMatcher {
982    /// Create a new matcher
983    pub fn new() -> Self {
984        Self {
985            conditions: Vec::new(),
986        }
987    }
988
989    /// Add condition
990    pub fn with_condition<F>(mut self, condition: F) -> Self
991    where
992        F: Fn(&StreamEvent) -> bool + Send + Sync + 'static,
993    {
994        self.conditions.push(Box::new(condition));
995        self
996    }
997
998    /// Match triple added events
999    pub fn triple_added(mut self) -> Self {
1000        self.conditions
1001            .push(Box::new(|e| matches!(e, StreamEvent::TripleAdded { .. })));
1002        self
1003    }
1004
1005    /// Match triple removed events
1006    pub fn triple_removed(mut self) -> Self {
1007        self.conditions
1008            .push(Box::new(|e| matches!(e, StreamEvent::TripleRemoved { .. })));
1009        self
1010    }
1011
1012    /// Match events by source
1013    pub fn with_source(mut self, source: &str) -> Self {
1014        let source = source.to_string();
1015        self.conditions.push(Box::new(move |e| match e {
1016            StreamEvent::TripleAdded { metadata, .. }
1017            | StreamEvent::TripleRemoved { metadata, .. }
1018            | StreamEvent::GraphCreated { metadata, .. }
1019            | StreamEvent::GraphDeleted { metadata, .. }
1020            | StreamEvent::TransactionBegin { metadata, .. }
1021            | StreamEvent::TransactionCommit { metadata, .. }
1022            | StreamEvent::TransactionAbort { metadata, .. }
1023            | StreamEvent::Heartbeat { metadata, .. } => metadata.source == source,
1024            _ => false,
1025        }));
1026        self
1027    }
1028
1029    /// Match SPARQL update events
1030    pub fn sparql_update(mut self) -> Self {
1031        self.conditions
1032            .push(Box::new(|e| matches!(e, StreamEvent::SparqlUpdate { .. })));
1033        self
1034    }
1035
1036    /// Match heartbeat events
1037    pub fn heartbeat(mut self) -> Self {
1038        self.conditions
1039            .push(Box::new(|e| matches!(e, StreamEvent::Heartbeat { .. })));
1040        self
1041    }
1042
1043    /// Check if event matches all conditions
1044    pub fn matches(&self, event: &StreamEvent) -> bool {
1045        self.conditions.iter().all(|c| c(event))
1046    }
1047}
1048
1049impl Default for EventMatcher {
1050    fn default() -> Self {
1051        Self::new()
1052    }
1053}
1054
1055/// Macros for common assertions
1056#[macro_export]
1057macro_rules! assert_stream_output {
1058    ($harness:expr, count($expected:expr)) => {
1059        $harness.assert_event_count($expected).await
1060    };
1061    ($harness:expr, contains($predicate:expr)) => {
1062        $harness.assert_contains($predicate).await
1063    };
1064    ($harness:expr, empty) => {
1065        $harness.assert_no_events().await
1066    };
1067}
1068
1069#[cfg(test)]
1070mod tests {
1071    use super::*;
1072
1073    #[tokio::test]
1074    async fn test_harness_builder() {
1075        let harness = TestHarness::builder()
1076            .with_mock_clock()
1077            .with_buffer_size(1000)
1078            .with_timeout(Duration::from_secs(5))
1079            .build()
1080            .await
1081            .unwrap();
1082
1083        assert!(harness.config.use_mock_clock);
1084        assert_eq!(harness.config.event_buffer_size, 1000);
1085    }
1086
1087    #[tokio::test]
1088    async fn test_mock_clock() {
1089        let clock = MockClock::new(Utc::now());
1090        let initial = clock.now().await;
1091
1092        clock.advance(Duration::from_secs(60)).await;
1093        let after = clock.now().await;
1094
1095        let diff = (after - initial).num_seconds();
1096        assert_eq!(diff, 60);
1097    }
1098
1099    #[tokio::test]
1100    async fn test_event_generator() {
1101        let generator = EventGenerator::sequential(10);
1102        let time = Utc::now();
1103
1104        let events = generator.generate_batch(5, time);
1105        assert_eq!(events.len(), 5);
1106    }
1107
1108    #[tokio::test]
1109    async fn test_push_events() {
1110        let harness = TestHarness::builder().build().await.unwrap();
1111
1112        let metadata = EventMetadata {
1113            event_id: Uuid::new_v4().to_string(),
1114            timestamp: Utc::now(),
1115            source: "test".to_string(),
1116            user: None,
1117            context: None,
1118            caused_by: None,
1119            version: "1.0".to_string(),
1120            properties: HashMap::new(),
1121            checksum: None,
1122        };
1123
1124        let event = StreamEvent::TripleAdded {
1125            subject: "test:subject".to_string(),
1126            predicate: "test:predicate".to_string(),
1127            object: "value1".to_string(),
1128            graph: None,
1129            metadata,
1130        };
1131
1132        harness.push_event(event).await.unwrap();
1133
1134        let metrics = harness.get_metrics().await;
1135        assert_eq!(metrics.events_pushed, 1);
1136    }
1137
1138    #[tokio::test]
1139    async fn test_assert_event_count() {
1140        let harness = TestHarness::builder().build().await.unwrap();
1141
1142        // Should pass with 0 events
1143        harness.assert_event_count(0).await.unwrap();
1144
1145        // Add an output event
1146        let metadata = EventMetadata {
1147            event_id: Uuid::new_v4().to_string(),
1148            timestamp: Utc::now(),
1149            source: "test".to_string(),
1150            user: None,
1151            context: None,
1152            caused_by: None,
1153            version: "1.0".to_string(),
1154            properties: HashMap::new(),
1155            checksum: None,
1156        };
1157
1158        let event = StreamEvent::TripleAdded {
1159            subject: "test:subject".to_string(),
1160            predicate: "test:predicate".to_string(),
1161            object: "value1".to_string(),
1162            graph: None,
1163            metadata,
1164        };
1165
1166        harness.add_output(event).await;
1167
1168        // Should pass with 1 event
1169        harness.assert_event_count(1).await.unwrap();
1170
1171        // Should fail with wrong count
1172        assert!(harness.assert_event_count(2).await.is_err());
1173    }
1174
1175    #[tokio::test]
1176    async fn test_assert_contains() {
1177        let harness = TestHarness::builder().build().await.unwrap();
1178
1179        let metadata = EventMetadata {
1180            event_id: Uuid::new_v4().to_string(),
1181            timestamp: Utc::now(),
1182            source: "test".to_string(),
1183            user: None,
1184            context: None,
1185            caused_by: None,
1186            version: "1.0".to_string(),
1187            properties: HashMap::new(),
1188            checksum: None,
1189        };
1190
1191        let event = StreamEvent::TripleAdded {
1192            subject: "test:subject".to_string(),
1193            predicate: "test:predicate".to_string(),
1194            object: "value42".to_string(),
1195            graph: None,
1196            metadata,
1197        };
1198
1199        harness.add_output(event).await;
1200
1201        // Should find triple added event
1202        harness.assert_contains(|e| {
1203            matches!(e, StreamEvent::TripleAdded { subject, .. } if subject == "test:subject")
1204        }).await.unwrap();
1205
1206        // Should not find non-existent event
1207        assert!(harness
1208            .assert_contains(|e| {
1209                matches!(e, StreamEvent::TripleAdded { subject, .. } if subject == "other:subject")
1210            })
1211            .await
1212            .is_err());
1213    }
1214
1215    #[tokio::test]
1216    async fn test_event_matcher() {
1217        let matcher = EventMatcher::new().triple_added();
1218
1219        let metadata = EventMetadata {
1220            event_id: Uuid::new_v4().to_string(),
1221            timestamp: Utc::now(),
1222            source: "test".to_string(),
1223            user: None,
1224            context: None,
1225            caused_by: None,
1226            version: "1.0".to_string(),
1227            properties: HashMap::new(),
1228            checksum: None,
1229        };
1230
1231        let event = StreamEvent::TripleAdded {
1232            subject: "test:subject".to_string(),
1233            predicate: "test:predicate".to_string(),
1234            object: "value".to_string(),
1235            graph: None,
1236            metadata,
1237        };
1238
1239        assert!(matcher.matches(&event));
1240    }
1241
1242    #[tokio::test]
1243    async fn test_generate_report() {
1244        let harness = TestHarness::builder().build().await.unwrap();
1245
1246        harness.assert_event_count(0).await.unwrap();
1247
1248        let report = harness.generate_report().await;
1249        assert_eq!(report.status, TestStatus::Passed);
1250        assert_eq!(report.metrics.total_assertions, 1);
1251        assert_eq!(report.metrics.passed_assertions, 1);
1252    }
1253
1254    #[tokio::test]
1255    async fn test_fixture() {
1256        let harness = TestHarness::builder().build().await.unwrap();
1257
1258        let fixture = TestFixture::new("basic_test").advance_time(Duration::from_secs(60));
1259
1260        // Should pass with no inputs/outputs
1261        fixture.run(&harness).await.unwrap();
1262    }
1263
1264    #[tokio::test]
1265    async fn test_harness_reset() {
1266        let harness = TestHarness::builder().build().await.unwrap();
1267
1268        let metadata = EventMetadata {
1269            event_id: Uuid::new_v4().to_string(),
1270            timestamp: Utc::now(),
1271            source: "test".to_string(),
1272            user: None,
1273            context: None,
1274            caused_by: None,
1275            version: "1.0".to_string(),
1276            properties: HashMap::new(),
1277            checksum: None,
1278        };
1279
1280        let event = StreamEvent::TripleAdded {
1281            subject: "test:subject".to_string(),
1282            predicate: "test:predicate".to_string(),
1283            object: "value".to_string(),
1284            graph: None,
1285            metadata,
1286        };
1287
1288        harness.add_output(event).await;
1289        harness.assert_event_count(1).await.unwrap();
1290
1291        harness.reset().await;
1292
1293        let metrics = harness.get_metrics().await;
1294        assert_eq!(metrics.events_received, 0);
1295        assert_eq!(metrics.total_assertions, 0);
1296
1297        harness.assert_event_count(0).await.unwrap();
1298    }
1299
1300    #[tokio::test]
1301    async fn test_captured_events() {
1302        let harness = TestHarness::builder().build().await.unwrap();
1303
1304        let metadata = EventMetadata {
1305            event_id: Uuid::new_v4().to_string(),
1306            timestamp: Utc::now(),
1307            source: "test".to_string(),
1308            user: None,
1309            context: None,
1310            caused_by: None,
1311            version: "1.0".to_string(),
1312            properties: HashMap::new(),
1313            checksum: None,
1314        };
1315
1316        let event = StreamEvent::TripleAdded {
1317            subject: "test:subject".to_string(),
1318            predicate: "test:predicate".to_string(),
1319            object: "value".to_string(),
1320            graph: None,
1321            metadata,
1322        };
1323
1324        harness.push_event(event).await.unwrap();
1325
1326        let captured = harness.get_captured_events().await;
1327        assert_eq!(captured.len(), 1);
1328        assert_eq!(captured[0].source, "input");
1329    }
1330
1331    #[tokio::test]
1332    async fn test_time_advancement() {
1333        let initial = Utc::now();
1334        let harness = TestHarness::builder()
1335            .with_mock_clock()
1336            .with_initial_time(initial)
1337            .build()
1338            .await
1339            .unwrap();
1340
1341        assert_eq!(harness.now().await, initial);
1342
1343        harness.advance_time(Duration::from_secs(3600)).await;
1344        let after = harness.now().await;
1345        let diff = (after - initial).num_seconds();
1346        assert_eq!(diff, 3600);
1347    }
1348}