oxirs_stream/
testing_framework.rs

//! # Stream Testing Framework
//!
//! This module provides a comprehensive testing framework for stream
//! applications, enabling developers to write reliable and maintainable tests.
//!
//! ## Features
//! - Test harness for stream applications
//! - Mock streams and event generators
//! - Time manipulation for testing windows
//! - Assertions for stream output
//! - Performance testing utilities
//! - Test fixtures and builders
//! - Test report generation
//!
//! ## Example
//! ```rust,ignore
//! use oxirs_stream::testing_framework::*;
//! use std::time::Duration;
//!
//! #[tokio::test]
//! async fn test_stream_processing() -> anyhow::Result<()> {
//!     let harness = TestHarness::builder()
//!         .with_mock_clock()
//!         .with_event_generator(EventGenerator::sequential(100))
//!         .build()
//!         .await?;
//!
//!     harness.push_events(vec![/* events */]).await?;
//!     harness.advance_time(Duration::from_secs(60)).await;
//!
//!     assert_stream_output!(harness, contains(expected));
//!     Ok(())
//! }
//! ```
33
34use anyhow::{anyhow, Result};
35use chrono::{DateTime, Utc};
36use serde::{Deserialize, Serialize};
37use std::collections::{HashMap, VecDeque};
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::sync::Arc;
40use std::time::{Duration, Instant};
41use tokio::sync::{mpsc, RwLock};
42use tracing::{debug, info};
43use uuid::Uuid;
44
45use crate::event::{EventMetadata, StreamEvent};
46
47/// Configuration for the test harness
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct TestHarnessConfig {
50    /// Use mock clock for time manipulation
51    pub use_mock_clock: bool,
52    /// Initial mock time
53    pub initial_time: Option<DateTime<Utc>>,
54    /// Event buffer size
55    pub event_buffer_size: usize,
56    /// Timeout for assertions
57    pub assertion_timeout: Duration,
58    /// Enable verbose logging
59    pub verbose: bool,
60    /// Capture all events for inspection
61    pub capture_events: bool,
62    /// Maximum events to capture
63    pub max_captured_events: usize,
64    /// Enable performance metrics
65    pub enable_metrics: bool,
66}
67
68impl Default for TestHarnessConfig {
69    fn default() -> Self {
70        Self {
71            use_mock_clock: true,
72            initial_time: None,
73            event_buffer_size: 10000,
74            assertion_timeout: Duration::from_secs(10),
75            verbose: false,
76            capture_events: true,
77            max_captured_events: 100000,
78            enable_metrics: true,
79        }
80    }
81}
82
83/// Mock clock for time manipulation in tests
84pub struct MockClock {
85    /// Current time
86    current_time: Arc<RwLock<DateTime<Utc>>>,
87    /// Time advancement listeners
88    listeners: Arc<RwLock<Vec<mpsc::Sender<DateTime<Utc>>>>>,
89}
90
91impl MockClock {
92    /// Create a new mock clock
93    pub fn new(initial_time: DateTime<Utc>) -> Self {
94        Self {
95            current_time: Arc::new(RwLock::new(initial_time)),
96            listeners: Arc::new(RwLock::new(Vec::new())),
97        }
98    }
99
100    /// Get current time
101    pub async fn now(&self) -> DateTime<Utc> {
102        *self.current_time.read().await
103    }
104
105    /// Advance time by duration
106    pub async fn advance(&self, duration: Duration) {
107        let mut time = self.current_time.write().await;
108        *time += chrono::Duration::from_std(duration).unwrap_or_default();
109
110        let new_time = *time;
111        drop(time);
112
113        // Notify listeners
114        let listeners = self.listeners.read().await;
115        for listener in listeners.iter() {
116            let _ = listener.send(new_time).await;
117        }
118    }
119
120    /// Set time to specific value
121    pub async fn set_time(&self, time: DateTime<Utc>) {
122        let mut current = self.current_time.write().await;
123        *current = time;
124
125        drop(current);
126
127        let listeners = self.listeners.read().await;
128        for listener in listeners.iter() {
129            let _ = listener.send(time).await;
130        }
131    }
132
133    /// Subscribe to time changes
134    pub async fn subscribe(&self) -> mpsc::Receiver<DateTime<Utc>> {
135        let (tx, rx) = mpsc::channel(100);
136        let mut listeners = self.listeners.write().await;
137        listeners.push(tx);
138        rx
139    }
140}
141
142/// Event generator for creating test events
143pub struct EventGenerator {
144    /// Generator type
145    generator_type: GeneratorType,
146    /// Event counter
147    counter: AtomicU64,
148    /// Configuration
149    config: GeneratorConfig,
150}
151
/// Settings shared by every event an [`EventGenerator`] produces.
#[derive(Debug, Clone)]
pub struct GeneratorConfig {
    /// Value written to each event's metadata `source`.
    pub source: String,
    /// Properties copied into each event's metadata.
    pub properties: HashMap<String, String>,
    /// Spacing between the timestamps of consecutive events in a batch.
    pub timestamp_increment: Duration,
}

impl Default for GeneratorConfig {
    /// Source `"test_generator"`, no properties, 100 ms between timestamps.
    fn default() -> Self {
        let source = String::from("test_generator");
        let timestamp_increment = Duration::from_millis(100);

        Self {
            source,
            properties: HashMap::new(),
            timestamp_increment,
        }
    }
}
172
/// Strategies an [`EventGenerator`] can use to derive event values.
#[derive(Debug, Clone)]
pub enum GeneratorType {
    /// Integers starting at `start` and growing by `step` per event.
    Sequential { start: u64, step: u64 },
    /// Values between `min` and `max`.
    Random { min: f64, max: f64 },
    /// Values taken from `pattern` in order, wrapping around at the end.
    Cyclic { pattern: Vec<f64>, index: usize },
    /// Values spread around `mean`, scaled by `stddev`.
    Gaussian { mean: f64, stddev: f64 },
    /// Placeholder for a user-supplied generator.
    Custom,
}
187
188impl EventGenerator {
189    /// Create a sequential generator
190    pub fn sequential(_count: u64) -> Self {
191        Self {
192            generator_type: GeneratorType::Sequential { start: 0, step: 1 },
193            counter: AtomicU64::new(0),
194            config: GeneratorConfig::default(),
195        }
196    }
197
198    /// Create a random generator
199    pub fn random(min: f64, max: f64) -> Self {
200        Self {
201            generator_type: GeneratorType::Random { min, max },
202            counter: AtomicU64::new(0),
203            config: GeneratorConfig::default(),
204        }
205    }
206
207    /// Create a cyclic generator
208    pub fn cyclic(pattern: Vec<f64>) -> Self {
209        Self {
210            generator_type: GeneratorType::Cyclic { pattern, index: 0 },
211            counter: AtomicU64::new(0),
212            config: GeneratorConfig::default(),
213        }
214    }
215
216    /// Create a gaussian generator
217    pub fn gaussian(mean: f64, stddev: f64) -> Self {
218        Self {
219            generator_type: GeneratorType::Gaussian { mean, stddev },
220            counter: AtomicU64::new(0),
221            config: GeneratorConfig::default(),
222        }
223    }
224
225    /// Set source name
226    pub fn with_source(mut self, source: String) -> Self {
227        self.config.source = source;
228        self
229    }
230
231    /// Set properties
232    pub fn with_properties(mut self, properties: HashMap<String, String>) -> Self {
233        self.config.properties = properties;
234        self
235    }
236
237    /// Generate next event
238    pub fn next_event(&self, timestamp: DateTime<Utc>) -> StreamEvent {
239        let count = self.counter.fetch_add(1, Ordering::SeqCst);
240
241        let value = match &self.generator_type {
242            GeneratorType::Sequential { start, step } => {
243                format!("{}", start + count * step)
244            }
245            GeneratorType::Random { min, max } => {
246                let range = max - min;
247                let value = min + (count as f64 % 1000.0) / 1000.0 * range;
248                format!("{:.2}", value)
249            }
250            GeneratorType::Cyclic { pattern, .. } => {
251                let index = count as usize % pattern.len();
252                format!("{:.2}", pattern[index])
253            }
254            GeneratorType::Gaussian { mean, stddev } => {
255                // Simple approximation
256                let value = mean + (count as f64 % 10.0 - 5.0) * stddev / 5.0;
257                format!("{:.2}", value)
258            }
259            GeneratorType::Custom => {
260                format!("{}", count)
261            }
262        };
263
264        let metadata = EventMetadata {
265            event_id: Uuid::new_v4().to_string(),
266            timestamp,
267            source: self.config.source.clone(),
268            user: None,
269            context: Some(format!("test_event_{}", count)),
270            caused_by: None,
271            version: "1.0".to_string(),
272            properties: self.config.properties.clone(),
273            checksum: None,
274        };
275
276        // Use TripleAdded as a test event type
277        StreamEvent::TripleAdded {
278            subject: format!("test:subject_{}", count),
279            predicate: "test:predicate".to_string(),
280            object: value,
281            graph: None,
282            metadata,
283        }
284    }
285
286    /// Generate batch of events
287    pub fn generate_batch(&self, count: usize, start_time: DateTime<Utc>) -> Vec<StreamEvent> {
288        let mut events = Vec::with_capacity(count);
289        let mut time = start_time;
290
291        for _ in 0..count {
292            events.push(self.next_event(time));
293            time += chrono::Duration::from_std(self.config.timestamp_increment).unwrap_or_default();
294        }
295
296        events
297    }
298}
299
300/// Test harness for stream testing
301pub struct TestHarness {
302    /// Configuration
303    config: TestHarnessConfig,
304    /// Mock clock
305    clock: Arc<MockClock>,
306    /// Event generator
307    generator: Option<Arc<EventGenerator>>,
308    /// Input events channel
309    input_tx: mpsc::Sender<StreamEvent>,
310    /// Input events receiver
311    input_rx: Arc<RwLock<mpsc::Receiver<StreamEvent>>>,
312    /// Output events
313    output_events: Arc<RwLock<VecDeque<StreamEvent>>>,
314    /// Captured events for inspection
315    captured_events: Arc<RwLock<Vec<CapturedEvent>>>,
316    /// Test metrics
317    metrics: Arc<RwLock<TestMetrics>>,
318    /// Assertions
319    assertions: Arc<RwLock<Vec<Assertion>>>,
320}
321
322/// Captured event with metadata
323#[derive(Debug, Clone, Serialize, Deserialize)]
324pub struct CapturedEvent {
325    /// Original event
326    pub event: StreamEvent,
327    /// Capture time
328    pub captured_at: DateTime<Utc>,
329    /// Processing time
330    pub processing_time: Option<Duration>,
331    /// Source
332    pub source: String,
333}
334
335/// Test metrics
336#[derive(Debug, Clone, Default, Serialize, Deserialize)]
337pub struct TestMetrics {
338    /// Total events pushed
339    pub events_pushed: u64,
340    /// Total events received
341    pub events_received: u64,
342    /// Total assertions
343    pub total_assertions: u64,
344    /// Passed assertions
345    pub passed_assertions: u64,
346    /// Failed assertions
347    pub failed_assertions: u64,
348    /// Average processing time
349    pub avg_processing_time_us: f64,
350    /// Max processing time
351    pub max_processing_time_us: u64,
352    /// Test duration
353    pub test_duration: Duration,
354    /// Memory usage
355    pub memory_usage_bytes: usize,
356}
357
358/// Assertion for testing
359#[derive(Debug, Clone, Serialize, Deserialize)]
360pub struct Assertion {
361    /// Assertion type
362    pub assertion_type: AssertionType,
363    /// Expected value
364    pub expected: String,
365    /// Actual value
366    pub actual: Option<String>,
367    /// Result
368    pub passed: bool,
369    /// Error message
370    pub error_message: Option<String>,
371    /// Timestamp
372    pub timestamp: DateTime<Utc>,
373}
374
375/// Assertion types
376#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
377pub enum AssertionType {
378    /// Assert event count
379    EventCount,
380    /// Assert event contains
381    Contains,
382    /// Assert event order
383    Order,
384    /// Assert no events
385    NoEvents,
386    /// Assert event property
387    Property,
388    /// Assert within duration
389    WithinDuration,
390    /// Assert performance
391    Performance,
392    /// Custom assertion
393    Custom(String),
394}
395
396/// Test harness builder
397pub struct TestHarnessBuilder {
398    config: TestHarnessConfig,
399    generator: Option<EventGenerator>,
400}
401
402impl TestHarnessBuilder {
403    /// Create a new builder
404    pub fn new() -> Self {
405        Self {
406            config: TestHarnessConfig::default(),
407            generator: None,
408        }
409    }
410
411    /// Use mock clock
412    pub fn with_mock_clock(mut self) -> Self {
413        self.config.use_mock_clock = true;
414        self
415    }
416
417    /// Set initial time
418    pub fn with_initial_time(mut self, time: DateTime<Utc>) -> Self {
419        self.config.initial_time = Some(time);
420        self
421    }
422
423    /// Set event buffer size
424    pub fn with_buffer_size(mut self, size: usize) -> Self {
425        self.config.event_buffer_size = size;
426        self
427    }
428
429    /// Set assertion timeout
430    pub fn with_timeout(mut self, timeout: Duration) -> Self {
431        self.config.assertion_timeout = timeout;
432        self
433    }
434
435    /// Enable verbose logging
436    pub fn verbose(mut self) -> Self {
437        self.config.verbose = true;
438        self
439    }
440
441    /// Set event generator
442    pub fn with_event_generator(mut self, generator: EventGenerator) -> Self {
443        self.generator = Some(generator);
444        self
445    }
446
447    /// Build the test harness
448    pub async fn build(self) -> Result<TestHarness> {
449        let initial_time = self.config.initial_time.unwrap_or_else(Utc::now);
450        let clock = Arc::new(MockClock::new(initial_time));
451
452        let (input_tx, input_rx) = mpsc::channel(self.config.event_buffer_size);
453
454        let harness = TestHarness {
455            config: self.config,
456            clock,
457            generator: self.generator.map(Arc::new),
458            input_tx,
459            input_rx: Arc::new(RwLock::new(input_rx)),
460            output_events: Arc::new(RwLock::new(VecDeque::new())),
461            captured_events: Arc::new(RwLock::new(Vec::new())),
462            metrics: Arc::new(RwLock::new(TestMetrics::default())),
463            assertions: Arc::new(RwLock::new(Vec::new())),
464        };
465
466        if harness.config.verbose {
467            info!("Test harness created with config: {:?}", harness.config);
468        }
469
470        Ok(harness)
471    }
472}
473
474impl Default for TestHarnessBuilder {
475    fn default() -> Self {
476        Self::new()
477    }
478}
479
480impl TestHarness {
481    /// Create a new test harness builder
482    pub fn builder() -> TestHarnessBuilder {
483        TestHarnessBuilder::new()
484    }
485
486    /// Get current mock time
487    pub async fn now(&self) -> DateTime<Utc> {
488        self.clock.now().await
489    }
490
491    /// Advance mock time
492    pub async fn advance_time(&self, duration: Duration) {
493        if self.config.verbose {
494            debug!("Advancing time by {:?}", duration);
495        }
496        self.clock.advance(duration).await;
497    }
498
499    /// Set mock time
500    pub async fn set_time(&self, time: DateTime<Utc>) {
501        if self.config.verbose {
502            debug!("Setting time to {:?}", time);
503        }
504        self.clock.set_time(time).await;
505    }
506
507    /// Push a single event
508    pub async fn push_event(&self, event: StreamEvent) -> Result<()> {
509        self.input_tx
510            .send(event.clone())
511            .await
512            .map_err(|e| anyhow!("Failed to push event: {}", e))?;
513
514        if self.config.capture_events {
515            let mut captured = self.captured_events.write().await;
516            if captured.len() < self.config.max_captured_events {
517                captured.push(CapturedEvent {
518                    event,
519                    captured_at: self.clock.now().await,
520                    processing_time: None,
521                    source: "input".to_string(),
522                });
523            }
524        }
525
526        let mut metrics = self.metrics.write().await;
527        metrics.events_pushed += 1;
528
529        Ok(())
530    }
531
532    /// Push multiple events
533    pub async fn push_events(&self, events: Vec<StreamEvent>) -> Result<()> {
534        for event in events {
535            self.push_event(event).await?;
536        }
537        Ok(())
538    }
539
540    /// Generate and push events
541    pub async fn generate_events(&self, count: usize) -> Result<()> {
542        if let Some(generator) = &self.generator {
543            let time = self.clock.now().await;
544            let events = generator.generate_batch(count, time);
545            self.push_events(events).await
546        } else {
547            Err(anyhow!("No event generator configured"))
548        }
549    }
550
551    /// Add output event (called by stream processor)
552    pub async fn add_output(&self, event: StreamEvent) {
553        let mut output = self.output_events.write().await;
554        output.push_back(event.clone());
555
556        if self.config.capture_events {
557            let mut captured = self.captured_events.write().await;
558            if captured.len() < self.config.max_captured_events {
559                captured.push(CapturedEvent {
560                    event,
561                    captured_at: self.clock.now().await,
562                    processing_time: None,
563                    source: "output".to_string(),
564                });
565            }
566        }
567
568        let mut metrics = self.metrics.write().await;
569        metrics.events_received += 1;
570    }
571
572    /// Get output events
573    pub async fn get_output(&self) -> Vec<StreamEvent> {
574        let output = self.output_events.read().await;
575        output.iter().cloned().collect()
576    }
577
578    /// Clear output events
579    pub async fn clear_output(&self) {
580        let mut output = self.output_events.write().await;
581        output.clear();
582    }
583
584    /// Get captured events
585    pub async fn get_captured_events(&self) -> Vec<CapturedEvent> {
586        let captured = self.captured_events.read().await;
587        captured.clone()
588    }
589
590    /// Assert event count
591    pub async fn assert_event_count(&self, expected: usize) -> Result<()> {
592        let output = self.output_events.read().await;
593        let actual = output.len();
594
595        let passed = actual == expected;
596        let error_message = if passed {
597            None
598        } else {
599            Some(format!("Expected {} events, got {}", expected, actual))
600        };
601
602        let assertion = Assertion {
603            assertion_type: AssertionType::EventCount,
604            expected: expected.to_string(),
605            actual: Some(actual.to_string()),
606            passed,
607            error_message: error_message.clone(),
608            timestamp: self.clock.now().await,
609        };
610
611        let mut assertions = self.assertions.write().await;
612        assertions.push(assertion);
613
614        let mut metrics = self.metrics.write().await;
615        metrics.total_assertions += 1;
616        if passed {
617            metrics.passed_assertions += 1;
618        } else {
619            metrics.failed_assertions += 1;
620        }
621
622        if passed {
623            Ok(())
624        } else {
625            Err(anyhow!(error_message.unwrap()))
626        }
627    }
628
629    /// Assert output contains event
630    pub async fn assert_contains(&self, predicate: impl Fn(&StreamEvent) -> bool) -> Result<()> {
631        let output = self.output_events.read().await;
632        let found = output.iter().any(predicate);
633
634        let passed = found;
635        let error_message = if passed {
636            None
637        } else {
638            Some("Expected event not found in output".to_string())
639        };
640
641        let assertion = Assertion {
642            assertion_type: AssertionType::Contains,
643            expected: "matching event".to_string(),
644            actual: Some(format!("{} events checked", output.len())),
645            passed,
646            error_message: error_message.clone(),
647            timestamp: self.clock.now().await,
648        };
649
650        let mut assertions = self.assertions.write().await;
651        assertions.push(assertion);
652
653        let mut metrics = self.metrics.write().await;
654        metrics.total_assertions += 1;
655        if passed {
656            metrics.passed_assertions += 1;
657        } else {
658            metrics.failed_assertions += 1;
659        }
660
661        if passed {
662            Ok(())
663        } else {
664            Err(anyhow!(error_message.unwrap()))
665        }
666    }
667
668    /// Assert no output events
669    pub async fn assert_no_events(&self) -> Result<()> {
670        self.assert_event_count(0).await
671    }
672
673    /// Assert events within duration
674    pub async fn assert_within(
675        &self,
676        duration: Duration,
677        condition: impl Fn(&[StreamEvent]) -> bool,
678    ) -> Result<()> {
679        let start = Instant::now();
680
681        while start.elapsed() < duration {
682            let output = self.output_events.read().await;
683            let events: Vec<_> = output.iter().cloned().collect();
684            drop(output);
685
686            if condition(&events) {
687                let assertion = Assertion {
688                    assertion_type: AssertionType::WithinDuration,
689                    expected: format!("condition within {:?}", duration),
690                    actual: Some(format!("satisfied after {:?}", start.elapsed())),
691                    passed: true,
692                    error_message: None,
693                    timestamp: self.clock.now().await,
694                };
695
696                let mut assertions = self.assertions.write().await;
697                assertions.push(assertion);
698
699                let mut metrics = self.metrics.write().await;
700                metrics.total_assertions += 1;
701                metrics.passed_assertions += 1;
702
703                return Ok(());
704            }
705
706            tokio::time::sleep(Duration::from_millis(10)).await;
707        }
708
709        let error_message = format!("Condition not satisfied within {:?}", duration);
710
711        let assertion = Assertion {
712            assertion_type: AssertionType::WithinDuration,
713            expected: format!("condition within {:?}", duration),
714            actual: Some("timeout".to_string()),
715            passed: false,
716            error_message: Some(error_message.clone()),
717            timestamp: self.clock.now().await,
718        };
719
720        let mut assertions = self.assertions.write().await;
721        assertions.push(assertion);
722
723        let mut metrics = self.metrics.write().await;
724        metrics.total_assertions += 1;
725        metrics.failed_assertions += 1;
726
727        Err(anyhow!(error_message))
728    }
729
730    /// Assert performance metric
731    pub async fn assert_performance(
732        &self,
733        metric: PerformanceMetric,
734        threshold: f64,
735    ) -> Result<()> {
736        let metrics = self.metrics.read().await;
737
738        let (actual, passed) = match metric {
739            PerformanceMetric::AvgLatency => (
740                metrics.avg_processing_time_us,
741                metrics.avg_processing_time_us <= threshold,
742            ),
743            PerformanceMetric::MaxLatency => (
744                metrics.max_processing_time_us as f64,
745                metrics.max_processing_time_us as f64 <= threshold,
746            ),
747            PerformanceMetric::Throughput => {
748                let throughput = if metrics.test_duration.as_secs_f64() > 0.0 {
749                    metrics.events_received as f64 / metrics.test_duration.as_secs_f64()
750                } else {
751                    0.0
752                };
753                (throughput, throughput >= threshold)
754            }
755        };
756
757        drop(metrics);
758
759        let error_message = if passed {
760            None
761        } else {
762            Some(format!(
763                "{:?} {} does not meet threshold {}",
764                metric, actual, threshold
765            ))
766        };
767
768        let assertion = Assertion {
769            assertion_type: AssertionType::Performance,
770            expected: format!("{:?} <= {}", metric, threshold),
771            actual: Some(actual.to_string()),
772            passed,
773            error_message: error_message.clone(),
774            timestamp: self.clock.now().await,
775        };
776
777        let mut assertions = self.assertions.write().await;
778        assertions.push(assertion);
779
780        let mut metrics = self.metrics.write().await;
781        metrics.total_assertions += 1;
782        if passed {
783            metrics.passed_assertions += 1;
784        } else {
785            metrics.failed_assertions += 1;
786        }
787
788        if passed {
789            Ok(())
790        } else {
791            Err(anyhow!(error_message.unwrap()))
792        }
793    }
794
795    /// Get test metrics
796    pub async fn get_metrics(&self) -> TestMetrics {
797        self.metrics.read().await.clone()
798    }
799
800    /// Get all assertions
801    pub async fn get_assertions(&self) -> Vec<Assertion> {
802        self.assertions.read().await.clone()
803    }
804
805    /// Generate test report
806    pub async fn generate_report(&self) -> TestReport {
807        let metrics = self.metrics.read().await;
808        let assertions = self.assertions.read().await;
809        let captured = self.captured_events.read().await;
810
811        TestReport {
812            test_name: "stream_test".to_string(),
813            status: if metrics.failed_assertions == 0 {
814                TestStatus::Passed
815            } else {
816                TestStatus::Failed
817            },
818            metrics: metrics.clone(),
819            assertions: assertions.clone(),
820            event_count: captured.len(),
821            generated_at: Utc::now(),
822        }
823    }
824
825    /// Reset harness state
826    pub async fn reset(&self) {
827        self.output_events.write().await.clear();
828        self.captured_events.write().await.clear();
829        *self.metrics.write().await = TestMetrics::default();
830        self.assertions.write().await.clear();
831
832        if self.config.verbose {
833            info!("Test harness reset");
834        }
835    }
836}
837
/// Metrics that can be checked with `TestHarness::assert_performance`.
#[derive(Debug, Clone, Copy)]
pub enum PerformanceMetric {
    /// Mean processing latency, in microseconds.
    AvgLatency,
    /// Worst-case processing latency, in microseconds.
    MaxLatency,
    /// Events processed per second.
    Throughput,
}
848
849/// Test report
850#[derive(Debug, Clone, Serialize, Deserialize)]
851pub struct TestReport {
852    /// Test name
853    pub test_name: String,
854    /// Test status
855    pub status: TestStatus,
856    /// Test metrics
857    pub metrics: TestMetrics,
858    /// Assertions
859    pub assertions: Vec<Assertion>,
860    /// Total event count
861    pub event_count: usize,
862    /// Report generation time
863    pub generated_at: DateTime<Utc>,
864}
865
866/// Test status
867#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
868pub enum TestStatus {
869    Passed,
870    Failed,
871    Skipped,
872    Error,
873}
874
875impl TestReport {
876    /// Convert to JSON
877    pub fn to_json(&self) -> Result<String> {
878        serde_json::to_string_pretty(self).map_err(|e| anyhow!("JSON error: {}", e))
879    }
880
881    /// Print summary
882    pub fn print_summary(&self) {
883        println!("\n=== Test Report: {} ===", self.test_name);
884        println!("Status: {:?}", self.status);
885        println!("Events pushed: {}", self.metrics.events_pushed);
886        println!("Events received: {}", self.metrics.events_received);
887        println!(
888            "Assertions: {}/{} passed",
889            self.metrics.passed_assertions, self.metrics.total_assertions
890        );
891        if self.metrics.total_assertions > 0 && self.metrics.failed_assertions > 0 {
892            println!("Failed assertions:");
893            for assertion in &self.assertions {
894                if !assertion.passed {
895                    println!(
896                        "  - {:?}: {}",
897                        assertion.assertion_type,
898                        assertion.error_message.clone().unwrap_or_default()
899                    );
900                }
901            }
902        }
903        println!("========================\n");
904    }
905}
906
907/// Test fixture for common test scenarios
908pub struct TestFixture {
909    /// Name
910    pub name: String,
911    /// Setup events
912    pub setup_events: Vec<StreamEvent>,
913    /// Expected outputs
914    pub expected_outputs: Vec<StreamEvent>,
915    /// Time advancement
916    pub time_advance: Option<Duration>,
917}
918
919impl TestFixture {
920    /// Create a new fixture
921    pub fn new(name: &str) -> Self {
922        Self {
923            name: name.to_string(),
924            setup_events: Vec::new(),
925            expected_outputs: Vec::new(),
926            time_advance: None,
927        }
928    }
929
930    /// Add setup event
931    pub fn with_input(mut self, event: StreamEvent) -> Self {
932        self.setup_events.push(event);
933        self
934    }
935
936    /// Add expected output
937    pub fn expect_output(mut self, event: StreamEvent) -> Self {
938        self.expected_outputs.push(event);
939        self
940    }
941
942    /// Set time advancement
943    pub fn advance_time(mut self, duration: Duration) -> Self {
944        self.time_advance = Some(duration);
945        self
946    }
947
948    /// Run fixture with harness
949    pub async fn run(&self, harness: &TestHarness) -> Result<()> {
950        // Push setup events
951        harness.push_events(self.setup_events.clone()).await?;
952
953        // Advance time if configured
954        if let Some(duration) = self.time_advance {
955            harness.advance_time(duration).await;
956        }
957
958        // Verify outputs
959        harness
960            .assert_event_count(self.expected_outputs.len())
961            .await?;
962
963        Ok(())
964    }
965}
966
967/// Event predicate function type
968type EventPredicate = Box<dyn Fn(&StreamEvent) -> bool + Send + Sync>;
969
970/// Event matcher for assertions
971pub struct EventMatcher {
972    conditions: Vec<EventPredicate>,
973}
974
975impl EventMatcher {
976    /// Create a new matcher
977    pub fn new() -> Self {
978        Self {
979            conditions: Vec::new(),
980        }
981    }
982
983    /// Add condition
984    pub fn with_condition<F>(mut self, condition: F) -> Self
985    where
986        F: Fn(&StreamEvent) -> bool + Send + Sync + 'static,
987    {
988        self.conditions.push(Box::new(condition));
989        self
990    }
991
992    /// Match triple added events
993    pub fn triple_added(mut self) -> Self {
994        self.conditions
995            .push(Box::new(|e| matches!(e, StreamEvent::TripleAdded { .. })));
996        self
997    }
998
999    /// Match triple removed events
1000    pub fn triple_removed(mut self) -> Self {
1001        self.conditions
1002            .push(Box::new(|e| matches!(e, StreamEvent::TripleRemoved { .. })));
1003        self
1004    }
1005
1006    /// Match events by source
1007    pub fn with_source(mut self, source: &str) -> Self {
1008        let source = source.to_string();
1009        self.conditions.push(Box::new(move |e| match e {
1010            StreamEvent::TripleAdded { metadata, .. }
1011            | StreamEvent::TripleRemoved { metadata, .. }
1012            | StreamEvent::GraphCreated { metadata, .. }
1013            | StreamEvent::GraphDeleted { metadata, .. }
1014            | StreamEvent::TransactionBegin { metadata, .. }
1015            | StreamEvent::TransactionCommit { metadata, .. }
1016            | StreamEvent::TransactionAbort { metadata, .. }
1017            | StreamEvent::Heartbeat { metadata, .. } => metadata.source == source,
1018            _ => false,
1019        }));
1020        self
1021    }
1022
1023    /// Match SPARQL update events
1024    pub fn sparql_update(mut self) -> Self {
1025        self.conditions
1026            .push(Box::new(|e| matches!(e, StreamEvent::SparqlUpdate { .. })));
1027        self
1028    }
1029
1030    /// Match heartbeat events
1031    pub fn heartbeat(mut self) -> Self {
1032        self.conditions
1033            .push(Box::new(|e| matches!(e, StreamEvent::Heartbeat { .. })));
1034        self
1035    }
1036
1037    /// Check if event matches all conditions
1038    pub fn matches(&self, event: &StreamEvent) -> bool {
1039        self.conditions.iter().all(|c| c(event))
1040    }
1041}
1042
1043impl Default for EventMatcher {
1044    fn default() -> Self {
1045        Self::new()
1046    }
1047}
1048
/// Shorthand for awaiting one of the harness assertion methods.
///
/// Supported forms:
/// - `assert_stream_output!(harness, count(n))` expands to
///   `harness.assert_event_count(n).await`
/// - `assert_stream_output!(harness, contains(pred))` expands to
///   `harness.assert_contains(pred).await`
/// - `assert_stream_output!(harness, empty)` expands to
///   `harness.assert_no_events().await`
///
/// The expansion contains `.await`, so it has to be used inside an async
/// context. The macro itself neither unwraps nor panics: the awaited value is
/// the value of the macro expression, and the caller is responsible for
/// checking it (for example with `?` or `.unwrap()`).
#[macro_export]
macro_rules! assert_stream_output {
    ($h:expr, empty) => {
        $h.assert_no_events().await
    };
    ($h:expr, count($n:expr)) => {
        $h.assert_event_count($n).await
    };
    ($h:expr, contains($pred:expr)) => {
        $h.assert_contains($pred).await
    };
}
1062
#[cfg(test)]
mod tests {
    use super::*;

    /// Metadata shared by the sample events: fresh UUID, current time, source `"test"`.
    fn sample_metadata() -> EventMetadata {
        EventMetadata {
            event_id: Uuid::new_v4().to_string(),
            timestamp: Utc::now(),
            source: "test".to_string(),
            user: None,
            context: None,
            caused_by: None,
            version: "1.0".to_string(),
            properties: HashMap::new(),
            checksum: None,
        }
    }

    /// `TripleAdded` event on `test:subject` / `test:predicate` carrying `object`.
    fn sample_triple_added(object: &str) -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: "test:subject".to_string(),
            predicate: "test:predicate".to_string(),
            object: object.to_string(),
            graph: None,
            metadata: sample_metadata(),
        }
    }

    #[tokio::test]
    async fn test_harness_builder() {
        let harness = TestHarness::builder()
            .with_mock_clock()
            .with_buffer_size(1000)
            .with_timeout(Duration::from_secs(5))
            .build()
            .await
            .unwrap();

        let config = &harness.config;
        assert!(config.use_mock_clock);
        assert_eq!(config.event_buffer_size, 1000);
    }

    #[tokio::test]
    async fn test_mock_clock() {
        let clock = MockClock::new(Utc::now());
        let before = clock.now().await;

        clock.advance(Duration::from_secs(60)).await;

        let elapsed = clock.now().await - before;
        assert_eq!(elapsed.num_seconds(), 60);
    }

    #[tokio::test]
    async fn test_event_generator() {
        let generator = EventGenerator::sequential(10);

        let batch = generator.generate_batch(5, Utc::now());
        assert_eq!(batch.len(), 5);
    }

    #[tokio::test]
    async fn test_push_events() {
        let harness = TestHarness::builder().build().await.unwrap();

        harness
            .push_event(sample_triple_added("value1"))
            .await
            .unwrap();

        let metrics = harness.get_metrics().await;
        assert_eq!(metrics.events_pushed, 1);
    }

    #[tokio::test]
    async fn test_assert_event_count() {
        let harness = TestHarness::builder().build().await.unwrap();

        // Nothing recorded yet: a count of 0 must pass.
        harness.assert_event_count(0).await.unwrap();

        // Record one output event.
        harness.add_output(sample_triple_added("value1")).await;

        // The matching count passes ...
        harness.assert_event_count(1).await.unwrap();

        // ... and a mismatching count is reported as an error.
        let mismatch = harness.assert_event_count(2).await;
        assert!(mismatch.is_err());
    }

    #[tokio::test]
    async fn test_assert_contains() {
        let harness = TestHarness::builder().build().await.unwrap();

        harness.add_output(sample_triple_added("value42")).await;

        // The recorded subject is found.
        harness
            .assert_contains(|event| {
                matches!(event, StreamEvent::TripleAdded { subject, .. } if subject == "test:subject")
            })
            .await
            .unwrap();

        // A subject that was never recorded is reported as an error.
        let missing = harness
            .assert_contains(|event| {
                matches!(event, StreamEvent::TripleAdded { subject, .. } if subject == "other:subject")
            })
            .await;
        assert!(missing.is_err());
    }

    #[tokio::test]
    async fn test_event_matcher() {
        let matcher = EventMatcher::new().triple_added();

        let event = sample_triple_added("value");

        assert!(matcher.matches(&event));
    }

    #[tokio::test]
    async fn test_generate_report() {
        let harness = TestHarness::builder().build().await.unwrap();

        harness.assert_event_count(0).await.unwrap();

        let report = harness.generate_report().await;
        assert_eq!(report.status, TestStatus::Passed);
        assert_eq!(report.metrics.total_assertions, 1);
        assert_eq!(report.metrics.passed_assertions, 1);
    }

    #[tokio::test]
    async fn test_fixture() {
        let harness = TestHarness::builder().build().await.unwrap();

        let step = Duration::from_secs(60);
        let fixture = TestFixture::new("basic_test").advance_time(step);

        // No inputs and no expected outputs: the run must succeed.
        fixture.run(&harness).await.unwrap();
    }

    #[tokio::test]
    async fn test_harness_reset() {
        let harness = TestHarness::builder().build().await.unwrap();

        harness.add_output(sample_triple_added("value")).await;
        harness.assert_event_count(1).await.unwrap();

        harness.reset().await;

        let metrics = harness.get_metrics().await;
        assert_eq!(metrics.events_received, 0);
        assert_eq!(metrics.total_assertions, 0);

        harness.assert_event_count(0).await.unwrap();
    }

    #[tokio::test]
    async fn test_captured_events() {
        let harness = TestHarness::builder().build().await.unwrap();

        harness
            .push_event(sample_triple_added("value"))
            .await
            .unwrap();

        let captured = harness.get_captured_events().await;
        assert_eq!(captured.len(), 1);
        assert_eq!(captured[0].source, "input");
    }

    #[tokio::test]
    async fn test_time_advancement() {
        let initial = Utc::now();
        let harness = TestHarness::builder()
            .with_mock_clock()
            .with_initial_time(initial)
            .build()
            .await
            .unwrap();

        assert_eq!(harness.now().await, initial);

        harness.advance_time(Duration::from_secs(3600)).await;

        let elapsed = harness.now().await - initial;
        assert_eq!(elapsed.num_seconds(), 3600);
    }
}