1use anyhow::{anyhow, Result};
35use chrono::{DateTime, Utc};
36use serde::{Deserialize, Serialize};
37use std::collections::{HashMap, VecDeque};
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::sync::Arc;
40use std::time::{Duration, Instant};
41use tokio::sync::{mpsc, RwLock};
42use tracing::{debug, info};
43use uuid::Uuid;
44
45use crate::event::{EventMetadata, StreamEvent};
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct TestHarnessConfig {
50 pub use_mock_clock: bool,
52 pub initial_time: Option<DateTime<Utc>>,
54 pub event_buffer_size: usize,
56 pub assertion_timeout: Duration,
58 pub verbose: bool,
60 pub capture_events: bool,
62 pub max_captured_events: usize,
64 pub enable_metrics: bool,
66}
67
68impl Default for TestHarnessConfig {
69 fn default() -> Self {
70 Self {
71 use_mock_clock: true,
72 initial_time: None,
73 event_buffer_size: 10000,
74 assertion_timeout: Duration::from_secs(10),
75 verbose: false,
76 capture_events: true,
77 max_captured_events: 100000,
78 enable_metrics: true,
79 }
80 }
81}
82
83pub struct MockClock {
85 current_time: Arc<RwLock<DateTime<Utc>>>,
87 listeners: Arc<RwLock<Vec<mpsc::Sender<DateTime<Utc>>>>>,
89}
90
91impl MockClock {
92 pub fn new(initial_time: DateTime<Utc>) -> Self {
94 Self {
95 current_time: Arc::new(RwLock::new(initial_time)),
96 listeners: Arc::new(RwLock::new(Vec::new())),
97 }
98 }
99
100 pub async fn now(&self) -> DateTime<Utc> {
102 *self.current_time.read().await
103 }
104
105 pub async fn advance(&self, duration: Duration) {
107 let mut time = self.current_time.write().await;
108 *time += chrono::Duration::from_std(duration).unwrap_or_default();
109
110 let new_time = *time;
111 drop(time);
112
113 let listeners = self.listeners.read().await;
115 for listener in listeners.iter() {
116 let _ = listener.send(new_time).await;
117 }
118 }
119
120 pub async fn set_time(&self, time: DateTime<Utc>) {
122 let mut current = self.current_time.write().await;
123 *current = time;
124
125 drop(current);
126
127 let listeners = self.listeners.read().await;
128 for listener in listeners.iter() {
129 let _ = listener.send(time).await;
130 }
131 }
132
133 pub async fn subscribe(&self) -> mpsc::Receiver<DateTime<Utc>> {
135 let (tx, rx) = mpsc::channel(100);
136 let mut listeners = self.listeners.write().await;
137 listeners.push(tx);
138 rx
139 }
140}
141
142pub struct EventGenerator {
144 generator_type: GeneratorType,
146 counter: AtomicU64,
148 config: GeneratorConfig,
150}
151
152#[derive(Debug, Clone)]
154pub struct GeneratorConfig {
155 pub source: String,
157 pub properties: HashMap<String, String>,
159 pub timestamp_increment: Duration,
161}
162
163impl Default for GeneratorConfig {
164 fn default() -> Self {
165 Self {
166 source: "test_generator".to_string(),
167 properties: HashMap::new(),
168 timestamp_increment: Duration::from_millis(100),
169 }
170 }
171}
172
173#[derive(Debug, Clone)]
175pub enum GeneratorType {
176 Sequential { start: u64, step: u64 },
178 Random { min: f64, max: f64 },
180 Cyclic { pattern: Vec<f64>, index: usize },
182 Gaussian { mean: f64, stddev: f64 },
184 Custom,
186}
187
188impl EventGenerator {
189 pub fn sequential(_count: u64) -> Self {
191 Self {
192 generator_type: GeneratorType::Sequential { start: 0, step: 1 },
193 counter: AtomicU64::new(0),
194 config: GeneratorConfig::default(),
195 }
196 }
197
198 pub fn random(min: f64, max: f64) -> Self {
200 Self {
201 generator_type: GeneratorType::Random { min, max },
202 counter: AtomicU64::new(0),
203 config: GeneratorConfig::default(),
204 }
205 }
206
207 pub fn cyclic(pattern: Vec<f64>) -> Self {
209 Self {
210 generator_type: GeneratorType::Cyclic { pattern, index: 0 },
211 counter: AtomicU64::new(0),
212 config: GeneratorConfig::default(),
213 }
214 }
215
216 pub fn gaussian(mean: f64, stddev: f64) -> Self {
218 Self {
219 generator_type: GeneratorType::Gaussian { mean, stddev },
220 counter: AtomicU64::new(0),
221 config: GeneratorConfig::default(),
222 }
223 }
224
225 pub fn with_source(mut self, source: String) -> Self {
227 self.config.source = source;
228 self
229 }
230
231 pub fn with_properties(mut self, properties: HashMap<String, String>) -> Self {
233 self.config.properties = properties;
234 self
235 }
236
237 pub fn next_event(&self, timestamp: DateTime<Utc>) -> StreamEvent {
239 let count = self.counter.fetch_add(1, Ordering::SeqCst);
240
241 let value = match &self.generator_type {
242 GeneratorType::Sequential { start, step } => {
243 format!("{}", start + count * step)
244 }
245 GeneratorType::Random { min, max } => {
246 let range = max - min;
247 let value = min + (count as f64 % 1000.0) / 1000.0 * range;
248 format!("{:.2}", value)
249 }
250 GeneratorType::Cyclic { pattern, .. } => {
251 let index = count as usize % pattern.len();
252 format!("{:.2}", pattern[index])
253 }
254 GeneratorType::Gaussian { mean, stddev } => {
255 let value = mean + (count as f64 % 10.0 - 5.0) * stddev / 5.0;
257 format!("{:.2}", value)
258 }
259 GeneratorType::Custom => {
260 format!("{}", count)
261 }
262 };
263
264 let metadata = EventMetadata {
265 event_id: Uuid::new_v4().to_string(),
266 timestamp,
267 source: self.config.source.clone(),
268 user: None,
269 context: Some(format!("test_event_{}", count)),
270 caused_by: None,
271 version: "1.0".to_string(),
272 properties: self.config.properties.clone(),
273 checksum: None,
274 };
275
276 StreamEvent::TripleAdded {
278 subject: format!("test:subject_{}", count),
279 predicate: "test:predicate".to_string(),
280 object: value,
281 graph: None,
282 metadata,
283 }
284 }
285
286 pub fn generate_batch(&self, count: usize, start_time: DateTime<Utc>) -> Vec<StreamEvent> {
288 let mut events = Vec::with_capacity(count);
289 let mut time = start_time;
290
291 for _ in 0..count {
292 events.push(self.next_event(time));
293 time += chrono::Duration::from_std(self.config.timestamp_increment).unwrap_or_default();
294 }
295
296 events
297 }
298}
299
300pub struct TestHarness {
302 config: TestHarnessConfig,
304 clock: Arc<MockClock>,
306 generator: Option<Arc<EventGenerator>>,
308 input_tx: mpsc::Sender<StreamEvent>,
310 input_rx: Arc<RwLock<mpsc::Receiver<StreamEvent>>>,
312 output_events: Arc<RwLock<VecDeque<StreamEvent>>>,
314 captured_events: Arc<RwLock<Vec<CapturedEvent>>>,
316 metrics: Arc<RwLock<TestMetrics>>,
318 assertions: Arc<RwLock<Vec<Assertion>>>,
320}
321
322#[derive(Debug, Clone, Serialize, Deserialize)]
324pub struct CapturedEvent {
325 pub event: StreamEvent,
327 pub captured_at: DateTime<Utc>,
329 pub processing_time: Option<Duration>,
331 pub source: String,
333}
334
335#[derive(Debug, Clone, Default, Serialize, Deserialize)]
337pub struct TestMetrics {
338 pub events_pushed: u64,
340 pub events_received: u64,
342 pub total_assertions: u64,
344 pub passed_assertions: u64,
346 pub failed_assertions: u64,
348 pub avg_processing_time_us: f64,
350 pub max_processing_time_us: u64,
352 pub test_duration: Duration,
354 pub memory_usage_bytes: usize,
356}
357
358#[derive(Debug, Clone, Serialize, Deserialize)]
360pub struct Assertion {
361 pub assertion_type: AssertionType,
363 pub expected: String,
365 pub actual: Option<String>,
367 pub passed: bool,
369 pub error_message: Option<String>,
371 pub timestamp: DateTime<Utc>,
373}
374
375#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
377pub enum AssertionType {
378 EventCount,
380 Contains,
382 Order,
384 NoEvents,
386 Property,
388 WithinDuration,
390 Performance,
392 Custom(String),
394}
395
396pub struct TestHarnessBuilder {
398 config: TestHarnessConfig,
399 generator: Option<EventGenerator>,
400}
401
402impl TestHarnessBuilder {
403 pub fn new() -> Self {
405 Self {
406 config: TestHarnessConfig::default(),
407 generator: None,
408 }
409 }
410
411 pub fn with_mock_clock(mut self) -> Self {
413 self.config.use_mock_clock = true;
414 self
415 }
416
417 pub fn with_initial_time(mut self, time: DateTime<Utc>) -> Self {
419 self.config.initial_time = Some(time);
420 self
421 }
422
423 pub fn with_buffer_size(mut self, size: usize) -> Self {
425 self.config.event_buffer_size = size;
426 self
427 }
428
429 pub fn with_timeout(mut self, timeout: Duration) -> Self {
431 self.config.assertion_timeout = timeout;
432 self
433 }
434
435 pub fn verbose(mut self) -> Self {
437 self.config.verbose = true;
438 self
439 }
440
441 pub fn with_event_generator(mut self, generator: EventGenerator) -> Self {
443 self.generator = Some(generator);
444 self
445 }
446
447 pub async fn build(self) -> Result<TestHarness> {
449 let initial_time = self.config.initial_time.unwrap_or_else(Utc::now);
450 let clock = Arc::new(MockClock::new(initial_time));
451
452 let (input_tx, input_rx) = mpsc::channel(self.config.event_buffer_size);
453
454 let harness = TestHarness {
455 config: self.config,
456 clock,
457 generator: self.generator.map(Arc::new),
458 input_tx,
459 input_rx: Arc::new(RwLock::new(input_rx)),
460 output_events: Arc::new(RwLock::new(VecDeque::new())),
461 captured_events: Arc::new(RwLock::new(Vec::new())),
462 metrics: Arc::new(RwLock::new(TestMetrics::default())),
463 assertions: Arc::new(RwLock::new(Vec::new())),
464 };
465
466 if harness.config.verbose {
467 info!("Test harness created with config: {:?}", harness.config);
468 }
469
470 Ok(harness)
471 }
472}
473
474impl Default for TestHarnessBuilder {
475 fn default() -> Self {
476 Self::new()
477 }
478}
479
480impl TestHarness {
481 pub fn builder() -> TestHarnessBuilder {
483 TestHarnessBuilder::new()
484 }
485
486 pub async fn now(&self) -> DateTime<Utc> {
488 self.clock.now().await
489 }
490
491 pub async fn advance_time(&self, duration: Duration) {
493 if self.config.verbose {
494 debug!("Advancing time by {:?}", duration);
495 }
496 self.clock.advance(duration).await;
497 }
498
499 pub async fn set_time(&self, time: DateTime<Utc>) {
501 if self.config.verbose {
502 debug!("Setting time to {:?}", time);
503 }
504 self.clock.set_time(time).await;
505 }
506
507 pub async fn push_event(&self, event: StreamEvent) -> Result<()> {
509 self.input_tx
510 .send(event.clone())
511 .await
512 .map_err(|e| anyhow!("Failed to push event: {}", e))?;
513
514 if self.config.capture_events {
515 let mut captured = self.captured_events.write().await;
516 if captured.len() < self.config.max_captured_events {
517 captured.push(CapturedEvent {
518 event,
519 captured_at: self.clock.now().await,
520 processing_time: None,
521 source: "input".to_string(),
522 });
523 }
524 }
525
526 let mut metrics = self.metrics.write().await;
527 metrics.events_pushed += 1;
528
529 Ok(())
530 }
531
532 pub async fn push_events(&self, events: Vec<StreamEvent>) -> Result<()> {
534 for event in events {
535 self.push_event(event).await?;
536 }
537 Ok(())
538 }
539
540 pub async fn generate_events(&self, count: usize) -> Result<()> {
542 if let Some(generator) = &self.generator {
543 let time = self.clock.now().await;
544 let events = generator.generate_batch(count, time);
545 self.push_events(events).await
546 } else {
547 Err(anyhow!("No event generator configured"))
548 }
549 }
550
551 pub async fn add_output(&self, event: StreamEvent) {
553 let mut output = self.output_events.write().await;
554 output.push_back(event.clone());
555
556 if self.config.capture_events {
557 let mut captured = self.captured_events.write().await;
558 if captured.len() < self.config.max_captured_events {
559 captured.push(CapturedEvent {
560 event,
561 captured_at: self.clock.now().await,
562 processing_time: None,
563 source: "output".to_string(),
564 });
565 }
566 }
567
568 let mut metrics = self.metrics.write().await;
569 metrics.events_received += 1;
570 }
571
572 pub async fn get_output(&self) -> Vec<StreamEvent> {
574 let output = self.output_events.read().await;
575 output.iter().cloned().collect()
576 }
577
578 pub async fn clear_output(&self) {
580 let mut output = self.output_events.write().await;
581 output.clear();
582 }
583
584 pub async fn get_captured_events(&self) -> Vec<CapturedEvent> {
586 let captured = self.captured_events.read().await;
587 captured.clone()
588 }
589
590 pub async fn assert_event_count(&self, expected: usize) -> Result<()> {
592 let output = self.output_events.read().await;
593 let actual = output.len();
594
595 let passed = actual == expected;
596 let error_message = if passed {
597 None
598 } else {
599 Some(format!("Expected {} events, got {}", expected, actual))
600 };
601
602 let assertion = Assertion {
603 assertion_type: AssertionType::EventCount,
604 expected: expected.to_string(),
605 actual: Some(actual.to_string()),
606 passed,
607 error_message: error_message.clone(),
608 timestamp: self.clock.now().await,
609 };
610
611 let mut assertions = self.assertions.write().await;
612 assertions.push(assertion);
613
614 let mut metrics = self.metrics.write().await;
615 metrics.total_assertions += 1;
616 if passed {
617 metrics.passed_assertions += 1;
618 } else {
619 metrics.failed_assertions += 1;
620 }
621
622 if passed {
623 Ok(())
624 } else {
625 Err(anyhow!(
626 error_message.expect("error_message should be set when assertion fails")
627 ))
628 }
629 }
630
631 pub async fn assert_contains(&self, predicate: impl Fn(&StreamEvent) -> bool) -> Result<()> {
633 let output = self.output_events.read().await;
634 let found = output.iter().any(predicate);
635
636 let passed = found;
637 let error_message = if passed {
638 None
639 } else {
640 Some("Expected event not found in output".to_string())
641 };
642
643 let assertion = Assertion {
644 assertion_type: AssertionType::Contains,
645 expected: "matching event".to_string(),
646 actual: Some(format!("{} events checked", output.len())),
647 passed,
648 error_message: error_message.clone(),
649 timestamp: self.clock.now().await,
650 };
651
652 let mut assertions = self.assertions.write().await;
653 assertions.push(assertion);
654
655 let mut metrics = self.metrics.write().await;
656 metrics.total_assertions += 1;
657 if passed {
658 metrics.passed_assertions += 1;
659 } else {
660 metrics.failed_assertions += 1;
661 }
662
663 if passed {
664 Ok(())
665 } else {
666 Err(anyhow!(
667 error_message.expect("error_message should be set when assertion fails")
668 ))
669 }
670 }
671
672 pub async fn assert_no_events(&self) -> Result<()> {
674 self.assert_event_count(0).await
675 }
676
677 pub async fn assert_within(
679 &self,
680 duration: Duration,
681 condition: impl Fn(&[StreamEvent]) -> bool,
682 ) -> Result<()> {
683 let start = Instant::now();
684
685 while start.elapsed() < duration {
686 let output = self.output_events.read().await;
687 let events: Vec<_> = output.iter().cloned().collect();
688 drop(output);
689
690 if condition(&events) {
691 let assertion = Assertion {
692 assertion_type: AssertionType::WithinDuration,
693 expected: format!("condition within {:?}", duration),
694 actual: Some(format!("satisfied after {:?}", start.elapsed())),
695 passed: true,
696 error_message: None,
697 timestamp: self.clock.now().await,
698 };
699
700 let mut assertions = self.assertions.write().await;
701 assertions.push(assertion);
702
703 let mut metrics = self.metrics.write().await;
704 metrics.total_assertions += 1;
705 metrics.passed_assertions += 1;
706
707 return Ok(());
708 }
709
710 tokio::time::sleep(Duration::from_millis(10)).await;
711 }
712
713 let error_message = format!("Condition not satisfied within {:?}", duration);
714
715 let assertion = Assertion {
716 assertion_type: AssertionType::WithinDuration,
717 expected: format!("condition within {:?}", duration),
718 actual: Some("timeout".to_string()),
719 passed: false,
720 error_message: Some(error_message.clone()),
721 timestamp: self.clock.now().await,
722 };
723
724 let mut assertions = self.assertions.write().await;
725 assertions.push(assertion);
726
727 let mut metrics = self.metrics.write().await;
728 metrics.total_assertions += 1;
729 metrics.failed_assertions += 1;
730
731 Err(anyhow!(error_message))
732 }
733
734 pub async fn assert_performance(
736 &self,
737 metric: PerformanceMetric,
738 threshold: f64,
739 ) -> Result<()> {
740 let metrics = self.metrics.read().await;
741
742 let (actual, passed) = match metric {
743 PerformanceMetric::AvgLatency => (
744 metrics.avg_processing_time_us,
745 metrics.avg_processing_time_us <= threshold,
746 ),
747 PerformanceMetric::MaxLatency => (
748 metrics.max_processing_time_us as f64,
749 metrics.max_processing_time_us as f64 <= threshold,
750 ),
751 PerformanceMetric::Throughput => {
752 let throughput = if metrics.test_duration.as_secs_f64() > 0.0 {
753 metrics.events_received as f64 / metrics.test_duration.as_secs_f64()
754 } else {
755 0.0
756 };
757 (throughput, throughput >= threshold)
758 }
759 };
760
761 drop(metrics);
762
763 let error_message = if passed {
764 None
765 } else {
766 Some(format!(
767 "{:?} {} does not meet threshold {}",
768 metric, actual, threshold
769 ))
770 };
771
772 let assertion = Assertion {
773 assertion_type: AssertionType::Performance,
774 expected: format!("{:?} <= {}", metric, threshold),
775 actual: Some(actual.to_string()),
776 passed,
777 error_message: error_message.clone(),
778 timestamp: self.clock.now().await,
779 };
780
781 let mut assertions = self.assertions.write().await;
782 assertions.push(assertion);
783
784 let mut metrics = self.metrics.write().await;
785 metrics.total_assertions += 1;
786 if passed {
787 metrics.passed_assertions += 1;
788 } else {
789 metrics.failed_assertions += 1;
790 }
791
792 if passed {
793 Ok(())
794 } else {
795 Err(anyhow!(
796 error_message.expect("error_message should be set when assertion fails")
797 ))
798 }
799 }
800
801 pub async fn get_metrics(&self) -> TestMetrics {
803 self.metrics.read().await.clone()
804 }
805
806 pub async fn get_assertions(&self) -> Vec<Assertion> {
808 self.assertions.read().await.clone()
809 }
810
811 pub async fn generate_report(&self) -> TestReport {
813 let metrics = self.metrics.read().await;
814 let assertions = self.assertions.read().await;
815 let captured = self.captured_events.read().await;
816
817 TestReport {
818 test_name: "stream_test".to_string(),
819 status: if metrics.failed_assertions == 0 {
820 TestStatus::Passed
821 } else {
822 TestStatus::Failed
823 },
824 metrics: metrics.clone(),
825 assertions: assertions.clone(),
826 event_count: captured.len(),
827 generated_at: Utc::now(),
828 }
829 }
830
831 pub async fn reset(&self) {
833 self.output_events.write().await.clear();
834 self.captured_events.write().await.clear();
835 *self.metrics.write().await = TestMetrics::default();
836 self.assertions.write().await.clear();
837
838 if self.config.verbose {
839 info!("Test harness reset");
840 }
841 }
842}
843
844#[derive(Debug, Clone, Copy)]
846pub enum PerformanceMetric {
847 AvgLatency,
849 MaxLatency,
851 Throughput,
853}
854
855#[derive(Debug, Clone, Serialize, Deserialize)]
857pub struct TestReport {
858 pub test_name: String,
860 pub status: TestStatus,
862 pub metrics: TestMetrics,
864 pub assertions: Vec<Assertion>,
866 pub event_count: usize,
868 pub generated_at: DateTime<Utc>,
870}
871
872#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
874pub enum TestStatus {
875 Passed,
876 Failed,
877 Skipped,
878 Error,
879}
880
881impl TestReport {
882 pub fn to_json(&self) -> Result<String> {
884 serde_json::to_string_pretty(self).map_err(|e| anyhow!("JSON error: {}", e))
885 }
886
887 pub fn print_summary(&self) {
889 println!("\n=== Test Report: {} ===", self.test_name);
890 println!("Status: {:?}", self.status);
891 println!("Events pushed: {}", self.metrics.events_pushed);
892 println!("Events received: {}", self.metrics.events_received);
893 println!(
894 "Assertions: {}/{} passed",
895 self.metrics.passed_assertions, self.metrics.total_assertions
896 );
897 if self.metrics.total_assertions > 0 && self.metrics.failed_assertions > 0 {
898 println!("Failed assertions:");
899 for assertion in &self.assertions {
900 if !assertion.passed {
901 println!(
902 " - {:?}: {}",
903 assertion.assertion_type,
904 assertion.error_message.clone().unwrap_or_default()
905 );
906 }
907 }
908 }
909 println!("========================\n");
910 }
911}
912
913pub struct TestFixture {
915 pub name: String,
917 pub setup_events: Vec<StreamEvent>,
919 pub expected_outputs: Vec<StreamEvent>,
921 pub time_advance: Option<Duration>,
923}
924
925impl TestFixture {
926 pub fn new(name: &str) -> Self {
928 Self {
929 name: name.to_string(),
930 setup_events: Vec::new(),
931 expected_outputs: Vec::new(),
932 time_advance: None,
933 }
934 }
935
936 pub fn with_input(mut self, event: StreamEvent) -> Self {
938 self.setup_events.push(event);
939 self
940 }
941
942 pub fn expect_output(mut self, event: StreamEvent) -> Self {
944 self.expected_outputs.push(event);
945 self
946 }
947
948 pub fn advance_time(mut self, duration: Duration) -> Self {
950 self.time_advance = Some(duration);
951 self
952 }
953
954 pub async fn run(&self, harness: &TestHarness) -> Result<()> {
956 harness.push_events(self.setup_events.clone()).await?;
958
959 if let Some(duration) = self.time_advance {
961 harness.advance_time(duration).await;
962 }
963
964 harness
966 .assert_event_count(self.expected_outputs.len())
967 .await?;
968
969 Ok(())
970 }
971}
972
973type EventPredicate = Box<dyn Fn(&StreamEvent) -> bool + Send + Sync>;
975
976pub struct EventMatcher {
978 conditions: Vec<EventPredicate>,
979}
980
981impl EventMatcher {
982 pub fn new() -> Self {
984 Self {
985 conditions: Vec::new(),
986 }
987 }
988
989 pub fn with_condition<F>(mut self, condition: F) -> Self
991 where
992 F: Fn(&StreamEvent) -> bool + Send + Sync + 'static,
993 {
994 self.conditions.push(Box::new(condition));
995 self
996 }
997
998 pub fn triple_added(mut self) -> Self {
1000 self.conditions
1001 .push(Box::new(|e| matches!(e, StreamEvent::TripleAdded { .. })));
1002 self
1003 }
1004
1005 pub fn triple_removed(mut self) -> Self {
1007 self.conditions
1008 .push(Box::new(|e| matches!(e, StreamEvent::TripleRemoved { .. })));
1009 self
1010 }
1011
1012 pub fn with_source(mut self, source: &str) -> Self {
1014 let source = source.to_string();
1015 self.conditions.push(Box::new(move |e| match e {
1016 StreamEvent::TripleAdded { metadata, .. }
1017 | StreamEvent::TripleRemoved { metadata, .. }
1018 | StreamEvent::GraphCreated { metadata, .. }
1019 | StreamEvent::GraphDeleted { metadata, .. }
1020 | StreamEvent::TransactionBegin { metadata, .. }
1021 | StreamEvent::TransactionCommit { metadata, .. }
1022 | StreamEvent::TransactionAbort { metadata, .. }
1023 | StreamEvent::Heartbeat { metadata, .. } => metadata.source == source,
1024 _ => false,
1025 }));
1026 self
1027 }
1028
1029 pub fn sparql_update(mut self) -> Self {
1031 self.conditions
1032 .push(Box::new(|e| matches!(e, StreamEvent::SparqlUpdate { .. })));
1033 self
1034 }
1035
1036 pub fn heartbeat(mut self) -> Self {
1038 self.conditions
1039 .push(Box::new(|e| matches!(e, StreamEvent::Heartbeat { .. })));
1040 self
1041 }
1042
1043 pub fn matches(&self, event: &StreamEvent) -> bool {
1045 self.conditions.iter().all(|c| c(event))
1046 }
1047}
1048
1049impl Default for EventMatcher {
1050 fn default() -> Self {
1051 Self::new()
1052 }
1053}
1054
1055#[macro_export]
1057macro_rules! assert_stream_output {
1058 ($harness:expr, count($expected:expr)) => {
1059 $harness.assert_event_count($expected).await
1060 };
1061 ($harness:expr, contains($predicate:expr)) => {
1062 $harness.assert_contains($predicate).await
1063 };
1064 ($harness:expr, empty) => {
1065 $harness.assert_no_events().await
1066 };
1067}
1068
1069#[cfg(test)]
1070mod tests {
1071 use super::*;
1072
1073 #[tokio::test]
1074 async fn test_harness_builder() {
1075 let harness = TestHarness::builder()
1076 .with_mock_clock()
1077 .with_buffer_size(1000)
1078 .with_timeout(Duration::from_secs(5))
1079 .build()
1080 .await
1081 .unwrap();
1082
1083 assert!(harness.config.use_mock_clock);
1084 assert_eq!(harness.config.event_buffer_size, 1000);
1085 }
1086
1087 #[tokio::test]
1088 async fn test_mock_clock() {
1089 let clock = MockClock::new(Utc::now());
1090 let initial = clock.now().await;
1091
1092 clock.advance(Duration::from_secs(60)).await;
1093 let after = clock.now().await;
1094
1095 let diff = (after - initial).num_seconds();
1096 assert_eq!(diff, 60);
1097 }
1098
1099 #[tokio::test]
1100 async fn test_event_generator() {
1101 let generator = EventGenerator::sequential(10);
1102 let time = Utc::now();
1103
1104 let events = generator.generate_batch(5, time);
1105 assert_eq!(events.len(), 5);
1106 }
1107
1108 #[tokio::test]
1109 async fn test_push_events() {
1110 let harness = TestHarness::builder().build().await.unwrap();
1111
1112 let metadata = EventMetadata {
1113 event_id: Uuid::new_v4().to_string(),
1114 timestamp: Utc::now(),
1115 source: "test".to_string(),
1116 user: None,
1117 context: None,
1118 caused_by: None,
1119 version: "1.0".to_string(),
1120 properties: HashMap::new(),
1121 checksum: None,
1122 };
1123
1124 let event = StreamEvent::TripleAdded {
1125 subject: "test:subject".to_string(),
1126 predicate: "test:predicate".to_string(),
1127 object: "value1".to_string(),
1128 graph: None,
1129 metadata,
1130 };
1131
1132 harness.push_event(event).await.unwrap();
1133
1134 let metrics = harness.get_metrics().await;
1135 assert_eq!(metrics.events_pushed, 1);
1136 }
1137
1138 #[tokio::test]
1139 async fn test_assert_event_count() {
1140 let harness = TestHarness::builder().build().await.unwrap();
1141
1142 harness.assert_event_count(0).await.unwrap();
1144
1145 let metadata = EventMetadata {
1147 event_id: Uuid::new_v4().to_string(),
1148 timestamp: Utc::now(),
1149 source: "test".to_string(),
1150 user: None,
1151 context: None,
1152 caused_by: None,
1153 version: "1.0".to_string(),
1154 properties: HashMap::new(),
1155 checksum: None,
1156 };
1157
1158 let event = StreamEvent::TripleAdded {
1159 subject: "test:subject".to_string(),
1160 predicate: "test:predicate".to_string(),
1161 object: "value1".to_string(),
1162 graph: None,
1163 metadata,
1164 };
1165
1166 harness.add_output(event).await;
1167
1168 harness.assert_event_count(1).await.unwrap();
1170
1171 assert!(harness.assert_event_count(2).await.is_err());
1173 }
1174
1175 #[tokio::test]
1176 async fn test_assert_contains() {
1177 let harness = TestHarness::builder().build().await.unwrap();
1178
1179 let metadata = EventMetadata {
1180 event_id: Uuid::new_v4().to_string(),
1181 timestamp: Utc::now(),
1182 source: "test".to_string(),
1183 user: None,
1184 context: None,
1185 caused_by: None,
1186 version: "1.0".to_string(),
1187 properties: HashMap::new(),
1188 checksum: None,
1189 };
1190
1191 let event = StreamEvent::TripleAdded {
1192 subject: "test:subject".to_string(),
1193 predicate: "test:predicate".to_string(),
1194 object: "value42".to_string(),
1195 graph: None,
1196 metadata,
1197 };
1198
1199 harness.add_output(event).await;
1200
1201 harness.assert_contains(|e| {
1203 matches!(e, StreamEvent::TripleAdded { subject, .. } if subject == "test:subject")
1204 }).await.unwrap();
1205
1206 assert!(harness
1208 .assert_contains(|e| {
1209 matches!(e, StreamEvent::TripleAdded { subject, .. } if subject == "other:subject")
1210 })
1211 .await
1212 .is_err());
1213 }
1214
1215 #[tokio::test]
1216 async fn test_event_matcher() {
1217 let matcher = EventMatcher::new().triple_added();
1218
1219 let metadata = EventMetadata {
1220 event_id: Uuid::new_v4().to_string(),
1221 timestamp: Utc::now(),
1222 source: "test".to_string(),
1223 user: None,
1224 context: None,
1225 caused_by: None,
1226 version: "1.0".to_string(),
1227 properties: HashMap::new(),
1228 checksum: None,
1229 };
1230
1231 let event = StreamEvent::TripleAdded {
1232 subject: "test:subject".to_string(),
1233 predicate: "test:predicate".to_string(),
1234 object: "value".to_string(),
1235 graph: None,
1236 metadata,
1237 };
1238
1239 assert!(matcher.matches(&event));
1240 }
1241
1242 #[tokio::test]
1243 async fn test_generate_report() {
1244 let harness = TestHarness::builder().build().await.unwrap();
1245
1246 harness.assert_event_count(0).await.unwrap();
1247
1248 let report = harness.generate_report().await;
1249 assert_eq!(report.status, TestStatus::Passed);
1250 assert_eq!(report.metrics.total_assertions, 1);
1251 assert_eq!(report.metrics.passed_assertions, 1);
1252 }
1253
1254 #[tokio::test]
1255 async fn test_fixture() {
1256 let harness = TestHarness::builder().build().await.unwrap();
1257
1258 let fixture = TestFixture::new("basic_test").advance_time(Duration::from_secs(60));
1259
1260 fixture.run(&harness).await.unwrap();
1262 }
1263
1264 #[tokio::test]
1265 async fn test_harness_reset() {
1266 let harness = TestHarness::builder().build().await.unwrap();
1267
1268 let metadata = EventMetadata {
1269 event_id: Uuid::new_v4().to_string(),
1270 timestamp: Utc::now(),
1271 source: "test".to_string(),
1272 user: None,
1273 context: None,
1274 caused_by: None,
1275 version: "1.0".to_string(),
1276 properties: HashMap::new(),
1277 checksum: None,
1278 };
1279
1280 let event = StreamEvent::TripleAdded {
1281 subject: "test:subject".to_string(),
1282 predicate: "test:predicate".to_string(),
1283 object: "value".to_string(),
1284 graph: None,
1285 metadata,
1286 };
1287
1288 harness.add_output(event).await;
1289 harness.assert_event_count(1).await.unwrap();
1290
1291 harness.reset().await;
1292
1293 let metrics = harness.get_metrics().await;
1294 assert_eq!(metrics.events_received, 0);
1295 assert_eq!(metrics.total_assertions, 0);
1296
1297 harness.assert_event_count(0).await.unwrap();
1298 }
1299
1300 #[tokio::test]
1301 async fn test_captured_events() {
1302 let harness = TestHarness::builder().build().await.unwrap();
1303
1304 let metadata = EventMetadata {
1305 event_id: Uuid::new_v4().to_string(),
1306 timestamp: Utc::now(),
1307 source: "test".to_string(),
1308 user: None,
1309 context: None,
1310 caused_by: None,
1311 version: "1.0".to_string(),
1312 properties: HashMap::new(),
1313 checksum: None,
1314 };
1315
1316 let event = StreamEvent::TripleAdded {
1317 subject: "test:subject".to_string(),
1318 predicate: "test:predicate".to_string(),
1319 object: "value".to_string(),
1320 graph: None,
1321 metadata,
1322 };
1323
1324 harness.push_event(event).await.unwrap();
1325
1326 let captured = harness.get_captured_events().await;
1327 assert_eq!(captured.len(), 1);
1328 assert_eq!(captured[0].source, "input");
1329 }
1330
1331 #[tokio::test]
1332 async fn test_time_advancement() {
1333 let initial = Utc::now();
1334 let harness = TestHarness::builder()
1335 .with_mock_clock()
1336 .with_initial_time(initial)
1337 .build()
1338 .await
1339 .unwrap();
1340
1341 assert_eq!(harness.now().await, initial);
1342
1343 harness.advance_time(Duration::from_secs(3600)).await;
1344 let after = harness.now().await;
1345 let diff = (after - initial).num_seconds();
1346 assert_eq!(diff, 3600);
1347 }
1348}