1use anyhow::{anyhow, Result};
35use chrono::{DateTime, Utc};
36use serde::{Deserialize, Serialize};
37use std::collections::{HashMap, VecDeque};
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::sync::Arc;
40use std::time::{Duration, Instant};
41use tokio::sync::{mpsc, RwLock};
42use tracing::{debug, info};
43use uuid::Uuid;
44
45use crate::event::{EventMetadata, StreamEvent};
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct TestHarnessConfig {
50 pub use_mock_clock: bool,
52 pub initial_time: Option<DateTime<Utc>>,
54 pub event_buffer_size: usize,
56 pub assertion_timeout: Duration,
58 pub verbose: bool,
60 pub capture_events: bool,
62 pub max_captured_events: usize,
64 pub enable_metrics: bool,
66}
67
68impl Default for TestHarnessConfig {
69 fn default() -> Self {
70 Self {
71 use_mock_clock: true,
72 initial_time: None,
73 event_buffer_size: 10000,
74 assertion_timeout: Duration::from_secs(10),
75 verbose: false,
76 capture_events: true,
77 max_captured_events: 100000,
78 enable_metrics: true,
79 }
80 }
81}
82
83pub struct MockClock {
85 current_time: Arc<RwLock<DateTime<Utc>>>,
87 listeners: Arc<RwLock<Vec<mpsc::Sender<DateTime<Utc>>>>>,
89}
90
91impl MockClock {
92 pub fn new(initial_time: DateTime<Utc>) -> Self {
94 Self {
95 current_time: Arc::new(RwLock::new(initial_time)),
96 listeners: Arc::new(RwLock::new(Vec::new())),
97 }
98 }
99
100 pub async fn now(&self) -> DateTime<Utc> {
102 *self.current_time.read().await
103 }
104
105 pub async fn advance(&self, duration: Duration) {
107 let mut time = self.current_time.write().await;
108 *time += chrono::Duration::from_std(duration).unwrap_or_default();
109
110 let new_time = *time;
111 drop(time);
112
113 let listeners = self.listeners.read().await;
115 for listener in listeners.iter() {
116 let _ = listener.send(new_time).await;
117 }
118 }
119
120 pub async fn set_time(&self, time: DateTime<Utc>) {
122 let mut current = self.current_time.write().await;
123 *current = time;
124
125 drop(current);
126
127 let listeners = self.listeners.read().await;
128 for listener in listeners.iter() {
129 let _ = listener.send(time).await;
130 }
131 }
132
133 pub async fn subscribe(&self) -> mpsc::Receiver<DateTime<Utc>> {
135 let (tx, rx) = mpsc::channel(100);
136 let mut listeners = self.listeners.write().await;
137 listeners.push(tx);
138 rx
139 }
140}
141
142pub struct EventGenerator {
144 generator_type: GeneratorType,
146 counter: AtomicU64,
148 config: GeneratorConfig,
150}
151
152#[derive(Debug, Clone)]
154pub struct GeneratorConfig {
155 pub source: String,
157 pub properties: HashMap<String, String>,
159 pub timestamp_increment: Duration,
161}
162
163impl Default for GeneratorConfig {
164 fn default() -> Self {
165 Self {
166 source: "test_generator".to_string(),
167 properties: HashMap::new(),
168 timestamp_increment: Duration::from_millis(100),
169 }
170 }
171}
172
173#[derive(Debug, Clone)]
175pub enum GeneratorType {
176 Sequential { start: u64, step: u64 },
178 Random { min: f64, max: f64 },
180 Cyclic { pattern: Vec<f64>, index: usize },
182 Gaussian { mean: f64, stddev: f64 },
184 Custom,
186}
187
188impl EventGenerator {
189 pub fn sequential(_count: u64) -> Self {
191 Self {
192 generator_type: GeneratorType::Sequential { start: 0, step: 1 },
193 counter: AtomicU64::new(0),
194 config: GeneratorConfig::default(),
195 }
196 }
197
198 pub fn random(min: f64, max: f64) -> Self {
200 Self {
201 generator_type: GeneratorType::Random { min, max },
202 counter: AtomicU64::new(0),
203 config: GeneratorConfig::default(),
204 }
205 }
206
207 pub fn cyclic(pattern: Vec<f64>) -> Self {
209 Self {
210 generator_type: GeneratorType::Cyclic { pattern, index: 0 },
211 counter: AtomicU64::new(0),
212 config: GeneratorConfig::default(),
213 }
214 }
215
216 pub fn gaussian(mean: f64, stddev: f64) -> Self {
218 Self {
219 generator_type: GeneratorType::Gaussian { mean, stddev },
220 counter: AtomicU64::new(0),
221 config: GeneratorConfig::default(),
222 }
223 }
224
225 pub fn with_source(mut self, source: String) -> Self {
227 self.config.source = source;
228 self
229 }
230
231 pub fn with_properties(mut self, properties: HashMap<String, String>) -> Self {
233 self.config.properties = properties;
234 self
235 }
236
237 pub fn next_event(&self, timestamp: DateTime<Utc>) -> StreamEvent {
239 let count = self.counter.fetch_add(1, Ordering::SeqCst);
240
241 let value = match &self.generator_type {
242 GeneratorType::Sequential { start, step } => {
243 format!("{}", start + count * step)
244 }
245 GeneratorType::Random { min, max } => {
246 let range = max - min;
247 let value = min + (count as f64 % 1000.0) / 1000.0 * range;
248 format!("{:.2}", value)
249 }
250 GeneratorType::Cyclic { pattern, .. } => {
251 let index = count as usize % pattern.len();
252 format!("{:.2}", pattern[index])
253 }
254 GeneratorType::Gaussian { mean, stddev } => {
255 let value = mean + (count as f64 % 10.0 - 5.0) * stddev / 5.0;
257 format!("{:.2}", value)
258 }
259 GeneratorType::Custom => {
260 format!("{}", count)
261 }
262 };
263
264 let metadata = EventMetadata {
265 event_id: Uuid::new_v4().to_string(),
266 timestamp,
267 source: self.config.source.clone(),
268 user: None,
269 context: Some(format!("test_event_{}", count)),
270 caused_by: None,
271 version: "1.0".to_string(),
272 properties: self.config.properties.clone(),
273 checksum: None,
274 };
275
276 StreamEvent::TripleAdded {
278 subject: format!("test:subject_{}", count),
279 predicate: "test:predicate".to_string(),
280 object: value,
281 graph: None,
282 metadata,
283 }
284 }
285
286 pub fn generate_batch(&self, count: usize, start_time: DateTime<Utc>) -> Vec<StreamEvent> {
288 let mut events = Vec::with_capacity(count);
289 let mut time = start_time;
290
291 for _ in 0..count {
292 events.push(self.next_event(time));
293 time += chrono::Duration::from_std(self.config.timestamp_increment).unwrap_or_default();
294 }
295
296 events
297 }
298}
299
300pub struct TestHarness {
302 config: TestHarnessConfig,
304 clock: Arc<MockClock>,
306 generator: Option<Arc<EventGenerator>>,
308 input_tx: mpsc::Sender<StreamEvent>,
310 input_rx: Arc<RwLock<mpsc::Receiver<StreamEvent>>>,
312 output_events: Arc<RwLock<VecDeque<StreamEvent>>>,
314 captured_events: Arc<RwLock<Vec<CapturedEvent>>>,
316 metrics: Arc<RwLock<TestMetrics>>,
318 assertions: Arc<RwLock<Vec<Assertion>>>,
320}
321
322#[derive(Debug, Clone, Serialize, Deserialize)]
324pub struct CapturedEvent {
325 pub event: StreamEvent,
327 pub captured_at: DateTime<Utc>,
329 pub processing_time: Option<Duration>,
331 pub source: String,
333}
334
335#[derive(Debug, Clone, Default, Serialize, Deserialize)]
337pub struct TestMetrics {
338 pub events_pushed: u64,
340 pub events_received: u64,
342 pub total_assertions: u64,
344 pub passed_assertions: u64,
346 pub failed_assertions: u64,
348 pub avg_processing_time_us: f64,
350 pub max_processing_time_us: u64,
352 pub test_duration: Duration,
354 pub memory_usage_bytes: usize,
356}
357
358#[derive(Debug, Clone, Serialize, Deserialize)]
360pub struct Assertion {
361 pub assertion_type: AssertionType,
363 pub expected: String,
365 pub actual: Option<String>,
367 pub passed: bool,
369 pub error_message: Option<String>,
371 pub timestamp: DateTime<Utc>,
373}
374
375#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
377pub enum AssertionType {
378 EventCount,
380 Contains,
382 Order,
384 NoEvents,
386 Property,
388 WithinDuration,
390 Performance,
392 Custom(String),
394}
395
396pub struct TestHarnessBuilder {
398 config: TestHarnessConfig,
399 generator: Option<EventGenerator>,
400}
401
402impl TestHarnessBuilder {
403 pub fn new() -> Self {
405 Self {
406 config: TestHarnessConfig::default(),
407 generator: None,
408 }
409 }
410
411 pub fn with_mock_clock(mut self) -> Self {
413 self.config.use_mock_clock = true;
414 self
415 }
416
417 pub fn with_initial_time(mut self, time: DateTime<Utc>) -> Self {
419 self.config.initial_time = Some(time);
420 self
421 }
422
423 pub fn with_buffer_size(mut self, size: usize) -> Self {
425 self.config.event_buffer_size = size;
426 self
427 }
428
429 pub fn with_timeout(mut self, timeout: Duration) -> Self {
431 self.config.assertion_timeout = timeout;
432 self
433 }
434
435 pub fn verbose(mut self) -> Self {
437 self.config.verbose = true;
438 self
439 }
440
441 pub fn with_event_generator(mut self, generator: EventGenerator) -> Self {
443 self.generator = Some(generator);
444 self
445 }
446
447 pub async fn build(self) -> Result<TestHarness> {
449 let initial_time = self.config.initial_time.unwrap_or_else(Utc::now);
450 let clock = Arc::new(MockClock::new(initial_time));
451
452 let (input_tx, input_rx) = mpsc::channel(self.config.event_buffer_size);
453
454 let harness = TestHarness {
455 config: self.config,
456 clock,
457 generator: self.generator.map(Arc::new),
458 input_tx,
459 input_rx: Arc::new(RwLock::new(input_rx)),
460 output_events: Arc::new(RwLock::new(VecDeque::new())),
461 captured_events: Arc::new(RwLock::new(Vec::new())),
462 metrics: Arc::new(RwLock::new(TestMetrics::default())),
463 assertions: Arc::new(RwLock::new(Vec::new())),
464 };
465
466 if harness.config.verbose {
467 info!("Test harness created with config: {:?}", harness.config);
468 }
469
470 Ok(harness)
471 }
472}
473
474impl Default for TestHarnessBuilder {
475 fn default() -> Self {
476 Self::new()
477 }
478}
479
480impl TestHarness {
481 pub fn builder() -> TestHarnessBuilder {
483 TestHarnessBuilder::new()
484 }
485
486 pub async fn now(&self) -> DateTime<Utc> {
488 self.clock.now().await
489 }
490
491 pub async fn advance_time(&self, duration: Duration) {
493 if self.config.verbose {
494 debug!("Advancing time by {:?}", duration);
495 }
496 self.clock.advance(duration).await;
497 }
498
499 pub async fn set_time(&self, time: DateTime<Utc>) {
501 if self.config.verbose {
502 debug!("Setting time to {:?}", time);
503 }
504 self.clock.set_time(time).await;
505 }
506
507 pub async fn push_event(&self, event: StreamEvent) -> Result<()> {
509 self.input_tx
510 .send(event.clone())
511 .await
512 .map_err(|e| anyhow!("Failed to push event: {}", e))?;
513
514 if self.config.capture_events {
515 let mut captured = self.captured_events.write().await;
516 if captured.len() < self.config.max_captured_events {
517 captured.push(CapturedEvent {
518 event,
519 captured_at: self.clock.now().await,
520 processing_time: None,
521 source: "input".to_string(),
522 });
523 }
524 }
525
526 let mut metrics = self.metrics.write().await;
527 metrics.events_pushed += 1;
528
529 Ok(())
530 }
531
532 pub async fn push_events(&self, events: Vec<StreamEvent>) -> Result<()> {
534 for event in events {
535 self.push_event(event).await?;
536 }
537 Ok(())
538 }
539
540 pub async fn generate_events(&self, count: usize) -> Result<()> {
542 if let Some(generator) = &self.generator {
543 let time = self.clock.now().await;
544 let events = generator.generate_batch(count, time);
545 self.push_events(events).await
546 } else {
547 Err(anyhow!("No event generator configured"))
548 }
549 }
550
551 pub async fn add_output(&self, event: StreamEvent) {
553 let mut output = self.output_events.write().await;
554 output.push_back(event.clone());
555
556 if self.config.capture_events {
557 let mut captured = self.captured_events.write().await;
558 if captured.len() < self.config.max_captured_events {
559 captured.push(CapturedEvent {
560 event,
561 captured_at: self.clock.now().await,
562 processing_time: None,
563 source: "output".to_string(),
564 });
565 }
566 }
567
568 let mut metrics = self.metrics.write().await;
569 metrics.events_received += 1;
570 }
571
572 pub async fn get_output(&self) -> Vec<StreamEvent> {
574 let output = self.output_events.read().await;
575 output.iter().cloned().collect()
576 }
577
578 pub async fn clear_output(&self) {
580 let mut output = self.output_events.write().await;
581 output.clear();
582 }
583
584 pub async fn get_captured_events(&self) -> Vec<CapturedEvent> {
586 let captured = self.captured_events.read().await;
587 captured.clone()
588 }
589
590 pub async fn assert_event_count(&self, expected: usize) -> Result<()> {
592 let output = self.output_events.read().await;
593 let actual = output.len();
594
595 let passed = actual == expected;
596 let error_message = if passed {
597 None
598 } else {
599 Some(format!("Expected {} events, got {}", expected, actual))
600 };
601
602 let assertion = Assertion {
603 assertion_type: AssertionType::EventCount,
604 expected: expected.to_string(),
605 actual: Some(actual.to_string()),
606 passed,
607 error_message: error_message.clone(),
608 timestamp: self.clock.now().await,
609 };
610
611 let mut assertions = self.assertions.write().await;
612 assertions.push(assertion);
613
614 let mut metrics = self.metrics.write().await;
615 metrics.total_assertions += 1;
616 if passed {
617 metrics.passed_assertions += 1;
618 } else {
619 metrics.failed_assertions += 1;
620 }
621
622 if passed {
623 Ok(())
624 } else {
625 Err(anyhow!(error_message.unwrap()))
626 }
627 }
628
629 pub async fn assert_contains(&self, predicate: impl Fn(&StreamEvent) -> bool) -> Result<()> {
631 let output = self.output_events.read().await;
632 let found = output.iter().any(predicate);
633
634 let passed = found;
635 let error_message = if passed {
636 None
637 } else {
638 Some("Expected event not found in output".to_string())
639 };
640
641 let assertion = Assertion {
642 assertion_type: AssertionType::Contains,
643 expected: "matching event".to_string(),
644 actual: Some(format!("{} events checked", output.len())),
645 passed,
646 error_message: error_message.clone(),
647 timestamp: self.clock.now().await,
648 };
649
650 let mut assertions = self.assertions.write().await;
651 assertions.push(assertion);
652
653 let mut metrics = self.metrics.write().await;
654 metrics.total_assertions += 1;
655 if passed {
656 metrics.passed_assertions += 1;
657 } else {
658 metrics.failed_assertions += 1;
659 }
660
661 if passed {
662 Ok(())
663 } else {
664 Err(anyhow!(error_message.unwrap()))
665 }
666 }
667
668 pub async fn assert_no_events(&self) -> Result<()> {
670 self.assert_event_count(0).await
671 }
672
673 pub async fn assert_within(
675 &self,
676 duration: Duration,
677 condition: impl Fn(&[StreamEvent]) -> bool,
678 ) -> Result<()> {
679 let start = Instant::now();
680
681 while start.elapsed() < duration {
682 let output = self.output_events.read().await;
683 let events: Vec<_> = output.iter().cloned().collect();
684 drop(output);
685
686 if condition(&events) {
687 let assertion = Assertion {
688 assertion_type: AssertionType::WithinDuration,
689 expected: format!("condition within {:?}", duration),
690 actual: Some(format!("satisfied after {:?}", start.elapsed())),
691 passed: true,
692 error_message: None,
693 timestamp: self.clock.now().await,
694 };
695
696 let mut assertions = self.assertions.write().await;
697 assertions.push(assertion);
698
699 let mut metrics = self.metrics.write().await;
700 metrics.total_assertions += 1;
701 metrics.passed_assertions += 1;
702
703 return Ok(());
704 }
705
706 tokio::time::sleep(Duration::from_millis(10)).await;
707 }
708
709 let error_message = format!("Condition not satisfied within {:?}", duration);
710
711 let assertion = Assertion {
712 assertion_type: AssertionType::WithinDuration,
713 expected: format!("condition within {:?}", duration),
714 actual: Some("timeout".to_string()),
715 passed: false,
716 error_message: Some(error_message.clone()),
717 timestamp: self.clock.now().await,
718 };
719
720 let mut assertions = self.assertions.write().await;
721 assertions.push(assertion);
722
723 let mut metrics = self.metrics.write().await;
724 metrics.total_assertions += 1;
725 metrics.failed_assertions += 1;
726
727 Err(anyhow!(error_message))
728 }
729
730 pub async fn assert_performance(
732 &self,
733 metric: PerformanceMetric,
734 threshold: f64,
735 ) -> Result<()> {
736 let metrics = self.metrics.read().await;
737
738 let (actual, passed) = match metric {
739 PerformanceMetric::AvgLatency => (
740 metrics.avg_processing_time_us,
741 metrics.avg_processing_time_us <= threshold,
742 ),
743 PerformanceMetric::MaxLatency => (
744 metrics.max_processing_time_us as f64,
745 metrics.max_processing_time_us as f64 <= threshold,
746 ),
747 PerformanceMetric::Throughput => {
748 let throughput = if metrics.test_duration.as_secs_f64() > 0.0 {
749 metrics.events_received as f64 / metrics.test_duration.as_secs_f64()
750 } else {
751 0.0
752 };
753 (throughput, throughput >= threshold)
754 }
755 };
756
757 drop(metrics);
758
759 let error_message = if passed {
760 None
761 } else {
762 Some(format!(
763 "{:?} {} does not meet threshold {}",
764 metric, actual, threshold
765 ))
766 };
767
768 let assertion = Assertion {
769 assertion_type: AssertionType::Performance,
770 expected: format!("{:?} <= {}", metric, threshold),
771 actual: Some(actual.to_string()),
772 passed,
773 error_message: error_message.clone(),
774 timestamp: self.clock.now().await,
775 };
776
777 let mut assertions = self.assertions.write().await;
778 assertions.push(assertion);
779
780 let mut metrics = self.metrics.write().await;
781 metrics.total_assertions += 1;
782 if passed {
783 metrics.passed_assertions += 1;
784 } else {
785 metrics.failed_assertions += 1;
786 }
787
788 if passed {
789 Ok(())
790 } else {
791 Err(anyhow!(error_message.unwrap()))
792 }
793 }
794
795 pub async fn get_metrics(&self) -> TestMetrics {
797 self.metrics.read().await.clone()
798 }
799
800 pub async fn get_assertions(&self) -> Vec<Assertion> {
802 self.assertions.read().await.clone()
803 }
804
805 pub async fn generate_report(&self) -> TestReport {
807 let metrics = self.metrics.read().await;
808 let assertions = self.assertions.read().await;
809 let captured = self.captured_events.read().await;
810
811 TestReport {
812 test_name: "stream_test".to_string(),
813 status: if metrics.failed_assertions == 0 {
814 TestStatus::Passed
815 } else {
816 TestStatus::Failed
817 },
818 metrics: metrics.clone(),
819 assertions: assertions.clone(),
820 event_count: captured.len(),
821 generated_at: Utc::now(),
822 }
823 }
824
825 pub async fn reset(&self) {
827 self.output_events.write().await.clear();
828 self.captured_events.write().await.clear();
829 *self.metrics.write().await = TestMetrics::default();
830 self.assertions.write().await.clear();
831
832 if self.config.verbose {
833 info!("Test harness reset");
834 }
835 }
836}
837
838#[derive(Debug, Clone, Copy)]
840pub enum PerformanceMetric {
841 AvgLatency,
843 MaxLatency,
845 Throughput,
847}
848
849#[derive(Debug, Clone, Serialize, Deserialize)]
851pub struct TestReport {
852 pub test_name: String,
854 pub status: TestStatus,
856 pub metrics: TestMetrics,
858 pub assertions: Vec<Assertion>,
860 pub event_count: usize,
862 pub generated_at: DateTime<Utc>,
864}
865
866#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
868pub enum TestStatus {
869 Passed,
870 Failed,
871 Skipped,
872 Error,
873}
874
875impl TestReport {
876 pub fn to_json(&self) -> Result<String> {
878 serde_json::to_string_pretty(self).map_err(|e| anyhow!("JSON error: {}", e))
879 }
880
881 pub fn print_summary(&self) {
883 println!("\n=== Test Report: {} ===", self.test_name);
884 println!("Status: {:?}", self.status);
885 println!("Events pushed: {}", self.metrics.events_pushed);
886 println!("Events received: {}", self.metrics.events_received);
887 println!(
888 "Assertions: {}/{} passed",
889 self.metrics.passed_assertions, self.metrics.total_assertions
890 );
891 if self.metrics.total_assertions > 0 && self.metrics.failed_assertions > 0 {
892 println!("Failed assertions:");
893 for assertion in &self.assertions {
894 if !assertion.passed {
895 println!(
896 " - {:?}: {}",
897 assertion.assertion_type,
898 assertion.error_message.clone().unwrap_or_default()
899 );
900 }
901 }
902 }
903 println!("========================\n");
904 }
905}
906
907pub struct TestFixture {
909 pub name: String,
911 pub setup_events: Vec<StreamEvent>,
913 pub expected_outputs: Vec<StreamEvent>,
915 pub time_advance: Option<Duration>,
917}
918
919impl TestFixture {
920 pub fn new(name: &str) -> Self {
922 Self {
923 name: name.to_string(),
924 setup_events: Vec::new(),
925 expected_outputs: Vec::new(),
926 time_advance: None,
927 }
928 }
929
930 pub fn with_input(mut self, event: StreamEvent) -> Self {
932 self.setup_events.push(event);
933 self
934 }
935
936 pub fn expect_output(mut self, event: StreamEvent) -> Self {
938 self.expected_outputs.push(event);
939 self
940 }
941
942 pub fn advance_time(mut self, duration: Duration) -> Self {
944 self.time_advance = Some(duration);
945 self
946 }
947
948 pub async fn run(&self, harness: &TestHarness) -> Result<()> {
950 harness.push_events(self.setup_events.clone()).await?;
952
953 if let Some(duration) = self.time_advance {
955 harness.advance_time(duration).await;
956 }
957
958 harness
960 .assert_event_count(self.expected_outputs.len())
961 .await?;
962
963 Ok(())
964 }
965}
966
967type EventPredicate = Box<dyn Fn(&StreamEvent) -> bool + Send + Sync>;
969
970pub struct EventMatcher {
972 conditions: Vec<EventPredicate>,
973}
974
975impl EventMatcher {
976 pub fn new() -> Self {
978 Self {
979 conditions: Vec::new(),
980 }
981 }
982
983 pub fn with_condition<F>(mut self, condition: F) -> Self
985 where
986 F: Fn(&StreamEvent) -> bool + Send + Sync + 'static,
987 {
988 self.conditions.push(Box::new(condition));
989 self
990 }
991
992 pub fn triple_added(mut self) -> Self {
994 self.conditions
995 .push(Box::new(|e| matches!(e, StreamEvent::TripleAdded { .. })));
996 self
997 }
998
999 pub fn triple_removed(mut self) -> Self {
1001 self.conditions
1002 .push(Box::new(|e| matches!(e, StreamEvent::TripleRemoved { .. })));
1003 self
1004 }
1005
1006 pub fn with_source(mut self, source: &str) -> Self {
1008 let source = source.to_string();
1009 self.conditions.push(Box::new(move |e| match e {
1010 StreamEvent::TripleAdded { metadata, .. }
1011 | StreamEvent::TripleRemoved { metadata, .. }
1012 | StreamEvent::GraphCreated { metadata, .. }
1013 | StreamEvent::GraphDeleted { metadata, .. }
1014 | StreamEvent::TransactionBegin { metadata, .. }
1015 | StreamEvent::TransactionCommit { metadata, .. }
1016 | StreamEvent::TransactionAbort { metadata, .. }
1017 | StreamEvent::Heartbeat { metadata, .. } => metadata.source == source,
1018 _ => false,
1019 }));
1020 self
1021 }
1022
1023 pub fn sparql_update(mut self) -> Self {
1025 self.conditions
1026 .push(Box::new(|e| matches!(e, StreamEvent::SparqlUpdate { .. })));
1027 self
1028 }
1029
1030 pub fn heartbeat(mut self) -> Self {
1032 self.conditions
1033 .push(Box::new(|e| matches!(e, StreamEvent::Heartbeat { .. })));
1034 self
1035 }
1036
1037 pub fn matches(&self, event: &StreamEvent) -> bool {
1039 self.conditions.iter().all(|c| c(event))
1040 }
1041}
1042
1043impl Default for EventMatcher {
1044 fn default() -> Self {
1045 Self::new()
1046 }
1047}
1048
1049#[macro_export]
1051macro_rules! assert_stream_output {
1052 ($harness:expr, count($expected:expr)) => {
1053 $harness.assert_event_count($expected).await
1054 };
1055 ($harness:expr, contains($predicate:expr)) => {
1056 $harness.assert_contains($predicate).await
1057 };
1058 ($harness:expr, empty) => {
1059 $harness.assert_no_events().await
1060 };
1061}
1062
1063#[cfg(test)]
1064mod tests {
1065 use super::*;
1066
1067 #[tokio::test]
1068 async fn test_harness_builder() {
1069 let harness = TestHarness::builder()
1070 .with_mock_clock()
1071 .with_buffer_size(1000)
1072 .with_timeout(Duration::from_secs(5))
1073 .build()
1074 .await
1075 .unwrap();
1076
1077 assert!(harness.config.use_mock_clock);
1078 assert_eq!(harness.config.event_buffer_size, 1000);
1079 }
1080
1081 #[tokio::test]
1082 async fn test_mock_clock() {
1083 let clock = MockClock::new(Utc::now());
1084 let initial = clock.now().await;
1085
1086 clock.advance(Duration::from_secs(60)).await;
1087 let after = clock.now().await;
1088
1089 let diff = (after - initial).num_seconds();
1090 assert_eq!(diff, 60);
1091 }
1092
1093 #[tokio::test]
1094 async fn test_event_generator() {
1095 let generator = EventGenerator::sequential(10);
1096 let time = Utc::now();
1097
1098 let events = generator.generate_batch(5, time);
1099 assert_eq!(events.len(), 5);
1100 }
1101
1102 #[tokio::test]
1103 async fn test_push_events() {
1104 let harness = TestHarness::builder().build().await.unwrap();
1105
1106 let metadata = EventMetadata {
1107 event_id: Uuid::new_v4().to_string(),
1108 timestamp: Utc::now(),
1109 source: "test".to_string(),
1110 user: None,
1111 context: None,
1112 caused_by: None,
1113 version: "1.0".to_string(),
1114 properties: HashMap::new(),
1115 checksum: None,
1116 };
1117
1118 let event = StreamEvent::TripleAdded {
1119 subject: "test:subject".to_string(),
1120 predicate: "test:predicate".to_string(),
1121 object: "value1".to_string(),
1122 graph: None,
1123 metadata,
1124 };
1125
1126 harness.push_event(event).await.unwrap();
1127
1128 let metrics = harness.get_metrics().await;
1129 assert_eq!(metrics.events_pushed, 1);
1130 }
1131
1132 #[tokio::test]
1133 async fn test_assert_event_count() {
1134 let harness = TestHarness::builder().build().await.unwrap();
1135
1136 harness.assert_event_count(0).await.unwrap();
1138
1139 let metadata = EventMetadata {
1141 event_id: Uuid::new_v4().to_string(),
1142 timestamp: Utc::now(),
1143 source: "test".to_string(),
1144 user: None,
1145 context: None,
1146 caused_by: None,
1147 version: "1.0".to_string(),
1148 properties: HashMap::new(),
1149 checksum: None,
1150 };
1151
1152 let event = StreamEvent::TripleAdded {
1153 subject: "test:subject".to_string(),
1154 predicate: "test:predicate".to_string(),
1155 object: "value1".to_string(),
1156 graph: None,
1157 metadata,
1158 };
1159
1160 harness.add_output(event).await;
1161
1162 harness.assert_event_count(1).await.unwrap();
1164
1165 assert!(harness.assert_event_count(2).await.is_err());
1167 }
1168
1169 #[tokio::test]
1170 async fn test_assert_contains() {
1171 let harness = TestHarness::builder().build().await.unwrap();
1172
1173 let metadata = EventMetadata {
1174 event_id: Uuid::new_v4().to_string(),
1175 timestamp: Utc::now(),
1176 source: "test".to_string(),
1177 user: None,
1178 context: None,
1179 caused_by: None,
1180 version: "1.0".to_string(),
1181 properties: HashMap::new(),
1182 checksum: None,
1183 };
1184
1185 let event = StreamEvent::TripleAdded {
1186 subject: "test:subject".to_string(),
1187 predicate: "test:predicate".to_string(),
1188 object: "value42".to_string(),
1189 graph: None,
1190 metadata,
1191 };
1192
1193 harness.add_output(event).await;
1194
1195 harness.assert_contains(|e| {
1197 matches!(e, StreamEvent::TripleAdded { subject, .. } if subject == "test:subject")
1198 }).await.unwrap();
1199
1200 assert!(harness
1202 .assert_contains(|e| {
1203 matches!(e, StreamEvent::TripleAdded { subject, .. } if subject == "other:subject")
1204 })
1205 .await
1206 .is_err());
1207 }
1208
1209 #[tokio::test]
1210 async fn test_event_matcher() {
1211 let matcher = EventMatcher::new().triple_added();
1212
1213 let metadata = EventMetadata {
1214 event_id: Uuid::new_v4().to_string(),
1215 timestamp: Utc::now(),
1216 source: "test".to_string(),
1217 user: None,
1218 context: None,
1219 caused_by: None,
1220 version: "1.0".to_string(),
1221 properties: HashMap::new(),
1222 checksum: None,
1223 };
1224
1225 let event = StreamEvent::TripleAdded {
1226 subject: "test:subject".to_string(),
1227 predicate: "test:predicate".to_string(),
1228 object: "value".to_string(),
1229 graph: None,
1230 metadata,
1231 };
1232
1233 assert!(matcher.matches(&event));
1234 }
1235
1236 #[tokio::test]
1237 async fn test_generate_report() {
1238 let harness = TestHarness::builder().build().await.unwrap();
1239
1240 harness.assert_event_count(0).await.unwrap();
1241
1242 let report = harness.generate_report().await;
1243 assert_eq!(report.status, TestStatus::Passed);
1244 assert_eq!(report.metrics.total_assertions, 1);
1245 assert_eq!(report.metrics.passed_assertions, 1);
1246 }
1247
1248 #[tokio::test]
1249 async fn test_fixture() {
1250 let harness = TestHarness::builder().build().await.unwrap();
1251
1252 let fixture = TestFixture::new("basic_test").advance_time(Duration::from_secs(60));
1253
1254 fixture.run(&harness).await.unwrap();
1256 }
1257
1258 #[tokio::test]
1259 async fn test_harness_reset() {
1260 let harness = TestHarness::builder().build().await.unwrap();
1261
1262 let metadata = EventMetadata {
1263 event_id: Uuid::new_v4().to_string(),
1264 timestamp: Utc::now(),
1265 source: "test".to_string(),
1266 user: None,
1267 context: None,
1268 caused_by: None,
1269 version: "1.0".to_string(),
1270 properties: HashMap::new(),
1271 checksum: None,
1272 };
1273
1274 let event = StreamEvent::TripleAdded {
1275 subject: "test:subject".to_string(),
1276 predicate: "test:predicate".to_string(),
1277 object: "value".to_string(),
1278 graph: None,
1279 metadata,
1280 };
1281
1282 harness.add_output(event).await;
1283 harness.assert_event_count(1).await.unwrap();
1284
1285 harness.reset().await;
1286
1287 let metrics = harness.get_metrics().await;
1288 assert_eq!(metrics.events_received, 0);
1289 assert_eq!(metrics.total_assertions, 0);
1290
1291 harness.assert_event_count(0).await.unwrap();
1292 }
1293
1294 #[tokio::test]
1295 async fn test_captured_events() {
1296 let harness = TestHarness::builder().build().await.unwrap();
1297
1298 let metadata = EventMetadata {
1299 event_id: Uuid::new_v4().to_string(),
1300 timestamp: Utc::now(),
1301 source: "test".to_string(),
1302 user: None,
1303 context: None,
1304 caused_by: None,
1305 version: "1.0".to_string(),
1306 properties: HashMap::new(),
1307 checksum: None,
1308 };
1309
1310 let event = StreamEvent::TripleAdded {
1311 subject: "test:subject".to_string(),
1312 predicate: "test:predicate".to_string(),
1313 object: "value".to_string(),
1314 graph: None,
1315 metadata,
1316 };
1317
1318 harness.push_event(event).await.unwrap();
1319
1320 let captured = harness.get_captured_events().await;
1321 assert_eq!(captured.len(), 1);
1322 assert_eq!(captured[0].source, "input");
1323 }
1324
1325 #[tokio::test]
1326 async fn test_time_advancement() {
1327 let initial = Utc::now();
1328 let harness = TestHarness::builder()
1329 .with_mock_clock()
1330 .with_initial_time(initial)
1331 .build()
1332 .await
1333 .unwrap();
1334
1335 assert_eq!(harness.now().await, initial);
1336
1337 harness.advance_time(Duration::from_secs(3600)).await;
1338 let after = harness.now().await;
1339 let diff = (after - initial).num_seconds();
1340 assert_eq!(diff, 3600);
1341 }
1342}