1use eventcore_types::{
2 BatchSize, CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore,
3 EventStoreError, StreamId, StreamPosition, StreamPrefix, StreamVersion, StreamWrites,
4};
5use std::fmt;
6
7use serde::{Deserialize, Serialize};
8use uuid::Uuid;
9
10#[derive(Debug)]
11pub struct ContractTestFailure {
12 scenario: &'static str,
13 detail: String,
14}
15
16impl ContractTestFailure {
17 fn new(scenario: &'static str, detail: impl Into<String>) -> Self {
18 Self {
19 scenario,
20 detail: detail.into(),
21 }
22 }
23
24 fn builder_error(scenario: &'static str, phase: &'static str, error: EventStoreError) -> Self {
25 Self::new(scenario, format!("builder failure during {phase}: {error}"))
26 }
27
28 fn store_error(
29 scenario: &'static str,
30 operation: &'static str,
31 error: EventStoreError,
32 ) -> Self {
33 Self::new(
34 scenario,
35 format!("{operation} operation returned unexpected error: {error}"),
36 )
37 }
38
39 fn assertion(scenario: &'static str, detail: impl Into<String>) -> Self {
40 Self::new(scenario, detail)
41 }
42}
43
44impl fmt::Display for ContractTestFailure {
45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46 write!(f, "[{}] {}", self.scenario, self.detail)
47 }
48}
49
50impl std::error::Error for ContractTestFailure {}
51
52pub type ContractTestResult = Result<(), ContractTestFailure>;
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct ContractTestEvent {
56 stream_id: StreamId,
57}
58
59impl ContractTestEvent {
60 pub fn new(stream_id: StreamId) -> Self {
61 Self { stream_id }
62 }
63}
64
65impl Event for ContractTestEvent {
66 fn stream_id(&self) -> &StreamId {
67 &self.stream_id
68 }
69}
70
71fn contract_stream_id(
72 scenario: &'static str,
73 label: &str,
74) -> Result<StreamId, ContractTestFailure> {
75 let raw = format!("contract::{}::{}::{}", scenario, label, Uuid::now_v7());
77
78 StreamId::try_new(raw.clone()).map_err(|error| {
79 ContractTestFailure::assertion(
80 scenario,
81 format!("unable to construct stream id `{}`: {}", raw, error),
82 )
83 })
84}
85
86fn builder_step(
87 scenario: &'static str,
88 phase: &'static str,
89 result: Result<StreamWrites, EventStoreError>,
90) -> Result<StreamWrites, ContractTestFailure> {
91 result.map_err(|error| ContractTestFailure::builder_error(scenario, phase, error))
92}
93
94fn register_contract_stream(
95 scenario: &'static str,
96 writes: StreamWrites,
97 stream_id: &StreamId,
98 expected_version: StreamVersion,
99) -> Result<StreamWrites, ContractTestFailure> {
100 builder_step(
101 scenario,
102 "register_stream",
103 writes.register_stream(stream_id.clone(), expected_version),
104 )
105}
106
107fn append_contract_event(
108 scenario: &'static str,
109 writes: StreamWrites,
110 stream_id: &StreamId,
111) -> Result<StreamWrites, ContractTestFailure> {
112 let event = ContractTestEvent::new(stream_id.clone());
113 builder_step(scenario, "append", writes.append(event))
114}
115
116pub async fn test_basic_read_write<F, S>(make_store: F) -> ContractTestResult
117where
118 F: Fn() -> S + Send + Sync + Clone + 'static,
119 S: EventStore + Send + Sync + 'static,
120{
121 const SCENARIO: &str = "basic_read_write";
122
123 let store = make_store();
124 let stream_id = contract_stream_id(SCENARIO, "single");
125
126 let stream_id = stream_id?;
127
128 let writes = register_contract_stream(
129 SCENARIO,
130 StreamWrites::new(),
131 &stream_id,
132 StreamVersion::new(0),
133 )?;
134 let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
135
136 let _ = store
137 .append_events(writes)
138 .await
139 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
140
141 let reader = store
142 .read_stream::<ContractTestEvent>(stream_id.clone())
143 .await
144 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
145
146 let len = reader.len();
147 let empty = reader.is_empty();
148
149 if empty {
150 return Err(ContractTestFailure::assertion(
151 SCENARIO,
152 "expected stream to contain events but it was empty",
153 ));
154 }
155
156 if len != 1 {
157 return Err(ContractTestFailure::assertion(
158 SCENARIO,
159 format!(
160 "expected stream to contain exactly one event, observed len={}",
161 len
162 ),
163 ));
164 }
165
166 Ok(())
167}
168
169pub async fn test_concurrent_version_conflicts<F, S>(make_store: F) -> ContractTestResult
170where
171 F: Fn() -> S + Send + Sync + Clone + 'static,
172 S: EventStore + Send + Sync + 'static,
173{
174 const SCENARIO: &str = "concurrent_version_conflicts";
175
176 let store = make_store();
177 let stream_id = contract_stream_id(SCENARIO, "shared")?;
178
179 let first_writes = register_contract_stream(
180 SCENARIO,
181 StreamWrites::new(),
182 &stream_id,
183 StreamVersion::new(0),
184 )?;
185 let first_writes = append_contract_event(SCENARIO, first_writes, &stream_id)?;
186
187 let _ = store
188 .append_events(first_writes)
189 .await
190 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
191
192 let conflicting_writes = register_contract_stream(
193 SCENARIO,
194 StreamWrites::new(),
195 &stream_id,
196 StreamVersion::new(0),
197 )?;
198 let conflicting_writes = append_contract_event(SCENARIO, conflicting_writes, &stream_id)?;
199
200 match store.append_events(conflicting_writes).await {
201 Err(EventStoreError::VersionConflict) => Ok(()),
202 Err(error) => Err(ContractTestFailure::store_error(
203 SCENARIO,
204 "append_events",
205 error,
206 )),
207 Ok(_) => Err(ContractTestFailure::assertion(
208 SCENARIO,
209 "expected version conflict but append succeeded",
210 )),
211 }
212}
213
214pub async fn test_stream_isolation<F, S>(make_store: F) -> ContractTestResult
215where
216 F: Fn() -> S + Send + Sync + Clone + 'static,
217 S: EventStore + Send + Sync + 'static,
218{
219 const SCENARIO: &str = "stream_isolation";
220
221 let store = make_store();
222 let left_stream = contract_stream_id(SCENARIO, "left")?;
223 let right_stream = contract_stream_id(SCENARIO, "right")?;
224
225 let writes = register_contract_stream(
226 SCENARIO,
227 StreamWrites::new(),
228 &left_stream,
229 StreamVersion::new(0),
230 )?;
231 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
232 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
233 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
234
235 let _ = store
236 .append_events(writes)
237 .await
238 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
239
240 let left_reader = store
241 .read_stream::<ContractTestEvent>(left_stream.clone())
242 .await
243 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
244
245 let right_reader = store
246 .read_stream::<ContractTestEvent>(right_stream.clone())
247 .await
248 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
249
250 let left_len = left_reader.len();
251 if left_len != 1 {
252 return Err(ContractTestFailure::assertion(
253 SCENARIO,
254 format!(
255 "left stream expected exactly one event but observed {}",
256 left_len
257 ),
258 ));
259 }
260
261 if left_reader
262 .iter()
263 .any(|event| event.stream_id() != &left_stream)
264 {
265 return Err(ContractTestFailure::assertion(
266 SCENARIO,
267 "left stream read events belonging to another stream",
268 ));
269 }
270
271 let right_len = right_reader.len();
272 if right_len != 1 {
273 return Err(ContractTestFailure::assertion(
274 SCENARIO,
275 format!(
276 "right stream expected exactly one event but observed {}",
277 right_len
278 ),
279 ));
280 }
281
282 if right_reader
283 .iter()
284 .any(|event| event.stream_id() != &right_stream)
285 {
286 return Err(ContractTestFailure::assertion(
287 SCENARIO,
288 "right stream read events belonging to another stream",
289 ));
290 }
291
292 Ok(())
293}
294
295pub async fn test_missing_stream_reads<F, S>(make_store: F) -> ContractTestResult
296where
297 F: Fn() -> S + Send + Sync + Clone + 'static,
298 S: EventStore + Send + Sync + 'static,
299{
300 const SCENARIO: &str = "missing_stream_reads";
301
302 let store = make_store();
303 let stream_id = contract_stream_id(SCENARIO, "ghost")?;
304
305 let reader = store
306 .read_stream::<ContractTestEvent>(stream_id.clone())
307 .await
308 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
309
310 if !reader.is_empty() {
311 return Err(ContractTestFailure::assertion(
312 SCENARIO,
313 "expected read_stream to succeed with no events for an untouched stream",
314 ));
315 }
316
317 Ok(())
318}
319
320pub async fn test_conflict_preserves_atomicity<F, S>(make_store: F) -> ContractTestResult
321where
322 F: Fn() -> S + Send + Sync + Clone + 'static,
323 S: EventStore + Send + Sync + 'static,
324{
325 const SCENARIO: &str = "conflict_preserves_atomicity";
326
327 let store = make_store();
328 let left_stream = contract_stream_id(SCENARIO, "left")?;
329 let right_stream = contract_stream_id(SCENARIO, "right")?;
330
331 let writes = register_contract_stream(
333 SCENARIO,
334 StreamWrites::new(),
335 &left_stream,
336 StreamVersion::new(0),
337 )?;
338 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
339 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
340 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
341
342 let _ = store
343 .append_events(writes)
344 .await
345 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
346
347 let writes = register_contract_stream(
349 SCENARIO,
350 StreamWrites::new(),
351 &left_stream,
352 StreamVersion::new(0),
353 )?;
354 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(1))?;
355 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
356 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
357
358 match store.append_events(writes).await {
359 Err(EventStoreError::VersionConflict) => {
360 let left_reader = store
361 .read_stream::<ContractTestEvent>(left_stream.clone())
362 .await
363 .map_err(|error| {
364 ContractTestFailure::store_error(SCENARIO, "read_stream", error)
365 })?;
366 if left_reader.len() != 1 {
367 return Err(ContractTestFailure::assertion(
368 SCENARIO,
369 format!(
370 "expected left stream to remain at len=1 after failed append, observed {}",
371 left_reader.len()
372 ),
373 ));
374 }
375
376 let right_reader = store
377 .read_stream::<ContractTestEvent>(right_stream.clone())
378 .await
379 .map_err(|error| {
380 ContractTestFailure::store_error(SCENARIO, "read_stream", error)
381 })?;
382 if right_reader.len() != 1 {
383 return Err(ContractTestFailure::assertion(
384 SCENARIO,
385 format!(
386 "expected right stream to remain at len=1 after failed append, observed {}",
387 right_reader.len()
388 ),
389 ));
390 }
391
392 Ok(())
393 }
394 Err(error) => Err(ContractTestFailure::store_error(
395 SCENARIO,
396 "append_events",
397 error,
398 )),
399 Ok(_) => Err(ContractTestFailure::assertion(
400 SCENARIO,
401 "expected version conflict but append succeeded",
402 )),
403 }
404}
405
406#[macro_export]
430macro_rules! backend_contract_tests {
431 (suite = $suite:ident, make_store = $make_store:expr, make_checkpoint_store = $make_checkpoint_store:expr $(,)?) => {
432 #[allow(non_snake_case)]
433 mod $suite {
434 use $crate::contract::{
435 test_basic_read_write, test_batch_limiting,
436 test_checkpoint_independent_subscriptions,
437 test_checkpoint_load_missing_returns_none, test_checkpoint_save_and_load,
438 test_checkpoint_update_overwrites, test_concurrent_version_conflicts,
439 test_conflict_preserves_atomicity, test_event_ordering_across_streams,
440 test_missing_stream_reads, test_position_based_resumption, test_stream_isolation,
441 test_stream_prefix_filtering, test_stream_prefix_requires_prefix_match,
442 };
443
444 #[tokio::test(flavor = "multi_thread")]
445 async fn basic_read_write_contract() {
446 test_basic_read_write($make_store)
447 .await
448 .expect("event store contract failed");
449 }
450
451 #[tokio::test(flavor = "multi_thread")]
452 async fn concurrent_version_conflicts_contract() {
453 test_concurrent_version_conflicts($make_store)
454 .await
455 .expect("event store contract failed");
456 }
457
458 #[tokio::test(flavor = "multi_thread")]
459 async fn stream_isolation_contract() {
460 test_stream_isolation($make_store)
461 .await
462 .expect("event store contract failed");
463 }
464
465 #[tokio::test(flavor = "multi_thread")]
466 async fn missing_stream_reads_contract() {
467 test_missing_stream_reads($make_store)
468 .await
469 .expect("event store contract failed");
470 }
471
472 #[tokio::test(flavor = "multi_thread")]
473 async fn conflict_preserves_atomicity_contract() {
474 test_conflict_preserves_atomicity($make_store)
475 .await
476 .expect("event store contract failed");
477 }
478
479 #[tokio::test(flavor = "multi_thread")]
480 async fn event_ordering_across_streams_contract() {
481 test_event_ordering_across_streams($make_store)
482 .await
483 .expect("event reader contract failed");
484 }
485
486 #[tokio::test(flavor = "multi_thread")]
487 async fn position_based_resumption_contract() {
488 test_position_based_resumption($make_store)
489 .await
490 .expect("event reader contract failed");
491 }
492
493 #[tokio::test(flavor = "multi_thread")]
494 async fn stream_prefix_filtering_contract() {
495 test_stream_prefix_filtering($make_store)
496 .await
497 .expect("event reader contract failed");
498 }
499
500 #[tokio::test(flavor = "multi_thread")]
501 async fn stream_prefix_requires_prefix_match_contract() {
502 test_stream_prefix_requires_prefix_match($make_store)
503 .await
504 .expect("event reader contract failed");
505 }
506
507 #[tokio::test(flavor = "multi_thread")]
508 async fn batch_limiting_contract() {
509 test_batch_limiting($make_store)
510 .await
511 .expect("event reader contract failed");
512 }
513
514 #[tokio::test(flavor = "multi_thread")]
516 async fn checkpoint_save_and_load_contract() {
517 test_checkpoint_save_and_load($make_checkpoint_store)
518 .await
519 .expect("checkpoint store contract failed");
520 }
521
522 #[tokio::test(flavor = "multi_thread")]
523 async fn checkpoint_update_overwrites_contract() {
524 test_checkpoint_update_overwrites($make_checkpoint_store)
525 .await
526 .expect("checkpoint store contract failed");
527 }
528
529 #[tokio::test(flavor = "multi_thread")]
530 async fn checkpoint_load_missing_returns_none_contract() {
531 test_checkpoint_load_missing_returns_none($make_checkpoint_store)
532 .await
533 .expect("checkpoint store contract failed");
534 }
535
536 #[tokio::test(flavor = "multi_thread")]
537 async fn checkpoint_independent_subscriptions_contract() {
538 test_checkpoint_independent_subscriptions($make_checkpoint_store)
539 .await
540 .expect("checkpoint store contract failed");
541 }
542 }
543 };
544}
545
546pub use backend_contract_tests;
547
548pub async fn test_event_ordering_across_streams<F, S>(make_store: F) -> ContractTestResult
550where
551 F: Fn() -> S + Send + Sync + Clone + 'static,
552 S: EventStore + EventReader + Send + Sync + 'static,
553{
554 const SCENARIO: &str = "event_ordering_across_streams";
555
556 let store = make_store();
557
558 let stream_a = contract_stream_id(SCENARIO, "stream-a")?;
560 let stream_b = contract_stream_id(SCENARIO, "stream-b")?;
561 let stream_c = contract_stream_id(SCENARIO, "stream-c")?;
562
563 let writes = register_contract_stream(
565 SCENARIO,
566 StreamWrites::new(),
567 &stream_a,
568 StreamVersion::new(0),
569 )?;
570 let writes = append_contract_event(SCENARIO, writes, &stream_a)?;
571 let _ = store
572 .append_events(writes)
573 .await
574 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
575
576 let writes = register_contract_stream(
578 SCENARIO,
579 StreamWrites::new(),
580 &stream_b,
581 StreamVersion::new(0),
582 )?;
583 let writes = append_contract_event(SCENARIO, writes, &stream_b)?;
584 let _ = store
585 .append_events(writes)
586 .await
587 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
588
589 let writes = register_contract_stream(
591 SCENARIO,
592 StreamWrites::new(),
593 &stream_c,
594 StreamVersion::new(0),
595 )?;
596 let writes = append_contract_event(SCENARIO, writes, &stream_c)?;
597 let _ = store
598 .append_events(writes)
599 .await
600 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
601
602 let filter = EventFilter::all();
604 let page = EventPage::first(BatchSize::new(100));
605 let events = store
606 .read_events::<ContractTestEvent>(filter, page)
607 .await
608 .map_err(|_error| {
609 ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
610 })?;
611
612 if events.len() != 3 {
614 return Err(ContractTestFailure::assertion(
615 SCENARIO,
616 format!("expected 3 events but got {}", events.len()),
617 ));
618 }
619
620 let (first_event, _) = &events[0];
622 if first_event.stream_id() != &stream_a {
623 return Err(ContractTestFailure::assertion(
624 SCENARIO,
625 format!(
626 "expected first event from stream_a but got from {:?}",
627 first_event.stream_id()
628 ),
629 ));
630 }
631
632 let (second_event, _) = &events[1];
633 if second_event.stream_id() != &stream_b {
634 return Err(ContractTestFailure::assertion(
635 SCENARIO,
636 format!(
637 "expected second event from stream_b but got from {:?}",
638 second_event.stream_id()
639 ),
640 ));
641 }
642
643 let (third_event, _) = &events[2];
644 if third_event.stream_id() != &stream_c {
645 return Err(ContractTestFailure::assertion(
646 SCENARIO,
647 format!(
648 "expected third event from stream_c but got from {:?}",
649 third_event.stream_id()
650 ),
651 ));
652 }
653
654 Ok(())
655}
656
657pub async fn test_position_based_resumption<F, S>(make_store: F) -> ContractTestResult
659where
660 F: Fn() -> S + Send + Sync + Clone + 'static,
661 S: EventStore + EventReader + Send + Sync + 'static,
662{
663 const SCENARIO: &str = "position_based_resumption";
664
665 let store = make_store();
666
667 let stream = contract_stream_id(SCENARIO, "stream")?;
669
670 let mut writes = register_contract_stream(
671 SCENARIO,
672 StreamWrites::new(),
673 &stream,
674 StreamVersion::new(0),
675 )?;
676
677 for _ in 0..5 {
679 writes = append_contract_event(SCENARIO, writes, &stream)?;
680 }
681
682 let _ = store
683 .append_events(writes)
684 .await
685 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
686
687 let filter = EventFilter::all();
689 let page = EventPage::first(BatchSize::new(100));
690 let all_events = store
691 .read_events::<ContractTestEvent>(filter.clone(), page)
692 .await
693 .map_err(|_error| {
694 ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
695 })?;
696
697 let (_third_event, third_position) = &all_events[2];
698
699 let page_after = EventPage::after(*third_position, BatchSize::new(100));
701 let events_after = store
702 .read_events::<ContractTestEvent>(filter, page_after)
703 .await
704 .map_err(|_error| {
705 ContractTestFailure::assertion(
706 SCENARIO,
707 "read_events failed when reading after position",
708 )
709 })?;
710
711 if events_after.len() != 2 {
713 return Err(ContractTestFailure::assertion(
714 SCENARIO,
715 format!(
716 "expected 2 events after position {} but got {}",
717 third_position,
718 events_after.len()
719 ),
720 ));
721 }
722
723 for (_event, position) in events_after.iter() {
725 if *position == *third_position {
726 return Err(ContractTestFailure::assertion(
727 SCENARIO,
728 format!(
729 "expected position {} to be excluded but it was included in results",
730 third_position
731 ),
732 ));
733 }
734 }
735
736 let (_event1, pos1) = &events_after[0];
738 let (_event2, pos2) = &events_after[1];
739
740 if *pos1 <= *third_position {
741 return Err(ContractTestFailure::assertion(
742 SCENARIO,
743 format!(
744 "expected first returned position to be > {} but got {}",
745 third_position, pos1
746 ),
747 ));
748 }
749
750 if *pos2 <= *pos1 {
751 return Err(ContractTestFailure::assertion(
752 SCENARIO,
753 format!(
754 "expected positions to be in ascending order but {} <= {}",
755 pos2, pos1
756 ),
757 ));
758 }
759
760 Ok(())
761}
762
763pub async fn test_stream_prefix_filtering<F, S>(make_store: F) -> ContractTestResult
765where
766 F: Fn() -> S + Send + Sync + Clone + 'static,
767 S: EventStore + EventReader + Send + Sync + 'static,
768{
769 const SCENARIO: &str = "stream_prefix_filtering";
770
771 let store = make_store();
772
773 let account_1 = StreamId::try_new(format!("account-1-{}", Uuid::now_v7())).map_err(|e| {
775 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
776 })?;
777 let account_2 = StreamId::try_new(format!("account-2-{}", Uuid::now_v7())).map_err(|e| {
778 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
779 })?;
780 let order_1 = StreamId::try_new(format!("order-1-{}", Uuid::now_v7())).map_err(|e| {
781 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
782 })?;
783
784 let mut writes = register_contract_stream(
785 SCENARIO,
786 StreamWrites::new(),
787 &account_1,
788 StreamVersion::new(0),
789 )?;
790 writes = register_contract_stream(SCENARIO, writes, &account_2, StreamVersion::new(0))?;
791 writes = register_contract_stream(SCENARIO, writes, &order_1, StreamVersion::new(0))?;
792
793 writes = append_contract_event(SCENARIO, writes, &account_1)?;
794 writes = append_contract_event(SCENARIO, writes, &account_2)?;
795 writes = append_contract_event(SCENARIO, writes, &order_1)?;
796
797 let _ = store
798 .append_events(writes)
799 .await
800 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
801
802 let prefix = StreamPrefix::try_new("account-").map_err(|e| {
804 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
805 })?;
806 let filter = EventFilter::prefix(prefix);
807 let page = EventPage::first(BatchSize::new(100));
808 let events = store
809 .read_events::<ContractTestEvent>(filter, page)
810 .await
811 .map_err(|_error| {
812 ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
813 })?;
814
815 if events.len() != 2 {
817 return Err(ContractTestFailure::assertion(
818 SCENARIO,
819 format!(
820 "expected 2 events from account-* streams but got {}",
821 events.len()
822 ),
823 ));
824 }
825
826 for (event, _) in events.iter() {
828 let stream_id_str = event.stream_id().as_ref();
829 if !stream_id_str.starts_with("account-") {
830 return Err(ContractTestFailure::assertion(
831 SCENARIO,
832 format!(
833 "expected all events from streams starting with 'account-' but found event from {}",
834 stream_id_str
835 ),
836 ));
837 }
838 }
839
840 Ok(())
843}
844
845pub async fn test_stream_prefix_requires_prefix_match<F, S>(make_store: F) -> ContractTestResult
847where
848 F: Fn() -> S + Send + Sync + Clone + 'static,
849 S: EventStore + EventReader + Send + Sync + 'static,
850{
851 const SCENARIO: &str = "stream_prefix_requires_prefix_match";
852
853 let store = make_store();
854
855 let account_stream =
858 StreamId::try_new(format!("account-123-{}", Uuid::now_v7())).map_err(|e| {
859 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
860 })?;
861 let my_account_stream = StreamId::try_new(format!("my-account-456-{}", Uuid::now_v7()))
862 .map_err(|e| {
863 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
864 })?;
865 let order_stream = StreamId::try_new(format!("order-789-{}", Uuid::now_v7())).map_err(|e| {
866 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
867 })?;
868
869 let mut writes = register_contract_stream(
870 SCENARIO,
871 StreamWrites::new(),
872 &account_stream,
873 StreamVersion::new(0),
874 )?;
875 writes = register_contract_stream(SCENARIO, writes, &my_account_stream, StreamVersion::new(0))?;
876 writes = register_contract_stream(SCENARIO, writes, &order_stream, StreamVersion::new(0))?;
877
878 writes = append_contract_event(SCENARIO, writes, &account_stream)?;
879 writes = append_contract_event(SCENARIO, writes, &my_account_stream)?;
880 writes = append_contract_event(SCENARIO, writes, &order_stream)?;
881
882 let _ = store
883 .append_events(writes)
884 .await
885 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
886
887 let prefix = StreamPrefix::try_new("account-").map_err(|e| {
889 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
890 })?;
891 let filter = EventFilter::prefix(prefix);
892 let page = EventPage::first(BatchSize::new(100));
893 let events = store
894 .read_events::<ContractTestEvent>(filter, page)
895 .await
896 .map_err(|_error| {
897 ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
898 })?;
899
900 if events.len() != 1 {
902 return Err(ContractTestFailure::assertion(
903 SCENARIO,
904 format!(
905 "expected exactly 1 event from account-* prefix but got {} (bug: implementation uses contains() instead of starts_with())",
906 events.len()
907 ),
908 ));
909 }
910
911 let (event, _) = &events[0];
913 let stream_id_str = event.stream_id().as_ref();
914 if !stream_id_str.starts_with("account-123") {
915 return Err(ContractTestFailure::assertion(
916 SCENARIO,
917 format!(
918 "expected event from stream starting with 'account-123' but got from {}",
919 stream_id_str
920 ),
921 ));
922 }
923
924 if stream_id_str.starts_with("my-account-456") {
926 return Err(ContractTestFailure::assertion(
927 SCENARIO,
928 "BUG EXPOSED: got event from stream starting with 'my-account-456' when filtering for prefix 'account-' - implementation must use prefix matching from the start of the stream ID",
929 ));
930 }
931
932 Ok(())
933}
934
935pub async fn test_batch_limiting<F, S>(make_store: F) -> ContractTestResult
937where
938 F: Fn() -> S + Send + Sync + Clone + 'static,
939 S: EventStore + EventReader + Send + Sync + 'static,
940{
941 const SCENARIO: &str = "batch_limiting";
942
943 let store = make_store();
944
945 let stream = contract_stream_id(SCENARIO, "stream")?;
947
948 let mut writes = register_contract_stream(
949 SCENARIO,
950 StreamWrites::new(),
951 &stream,
952 StreamVersion::new(0),
953 )?;
954
955 for _ in 0..20 {
957 writes = append_contract_event(SCENARIO, writes, &stream)?;
958 }
959
960 let _ = store
961 .append_events(writes)
962 .await
963 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
964
965 let filter = EventFilter::all();
967 let page = EventPage::first(BatchSize::new(10));
968 let events = store
969 .read_events::<ContractTestEvent>(filter, page)
970 .await
971 .map_err(|_error| {
972 ContractTestFailure::assertion(SCENARIO, "read_events failed with limit")
973 })?;
974
975 if events.len() != 10 {
977 return Err(ContractTestFailure::assertion(
978 SCENARIO,
979 format!("expected exactly 10 events but got {}", events.len()),
980 ));
981 }
982
983 Ok(())
988}
989
990pub async fn test_checkpoint_save_and_load<F, CS>(make_checkpoint_store: F) -> ContractTestResult
996where
997 F: Fn() -> CS + Send + Sync + Clone + 'static,
998 CS: CheckpointStore + Send + Sync + 'static,
999{
1000 const SCENARIO: &str = "checkpoint_save_and_load";
1001
1002 let store = make_checkpoint_store();
1003
1004 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1006 let position = StreamPosition::new(Uuid::now_v7());
1007
1008 store
1010 .save(&subscription_name, position)
1011 .await
1012 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save failed"))?;
1013
1014 let loaded = store
1016 .load(&subscription_name)
1017 .await
1018 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1019
1020 if loaded != Some(position) {
1021 return Err(ContractTestFailure::assertion(
1022 SCENARIO,
1023 format!(
1024 "expected loaded position {:?} but got {:?}",
1025 Some(position),
1026 loaded
1027 ),
1028 ));
1029 }
1030
1031 Ok(())
1032}
1033
1034pub async fn test_checkpoint_update_overwrites<F, CS>(
1036 make_checkpoint_store: F,
1037) -> ContractTestResult
1038where
1039 F: Fn() -> CS + Send + Sync + Clone + 'static,
1040 CS: CheckpointStore + Send + Sync + 'static,
1041{
1042 const SCENARIO: &str = "checkpoint_update_overwrites";
1043
1044 let store = make_checkpoint_store();
1045
1046 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1048 let first_position = StreamPosition::new(Uuid::now_v7());
1049
1050 store
1051 .save(&subscription_name, first_position)
1052 .await
1053 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "first save failed"))?;
1054
1055 let second_position = StreamPosition::new(Uuid::now_v7());
1057 store
1058 .save(&subscription_name, second_position)
1059 .await
1060 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "second save failed"))?;
1061
1062 let loaded = store
1064 .load(&subscription_name)
1065 .await
1066 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1067
1068 if loaded != Some(second_position) {
1069 return Err(ContractTestFailure::assertion(
1070 SCENARIO,
1071 format!(
1072 "expected updated position {:?} but got {:?}",
1073 Some(second_position),
1074 loaded
1075 ),
1076 ));
1077 }
1078
1079 Ok(())
1080}
1081
1082pub async fn test_checkpoint_load_missing_returns_none<F, CS>(
1084 make_checkpoint_store: F,
1085) -> ContractTestResult
1086where
1087 F: Fn() -> CS + Send + Sync + Clone + 'static,
1088 CS: CheckpointStore + Send + Sync + 'static,
1089{
1090 const SCENARIO: &str = "checkpoint_load_missing_returns_none";
1091
1092 let store = make_checkpoint_store();
1093
1094 let subscription_name = format!("contract::{}::ghost::{}", SCENARIO, Uuid::now_v7());
1096
1097 let loaded = store
1099 .load(&subscription_name)
1100 .await
1101 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1102
1103 if loaded.is_some() {
1105 return Err(ContractTestFailure::assertion(
1106 SCENARIO,
1107 format!("expected None for missing checkpoint but got {:?}", loaded),
1108 ));
1109 }
1110
1111 Ok(())
1112}
1113
1114pub async fn test_checkpoint_independent_subscriptions<F, CS>(
1116 make_checkpoint_store: F,
1117) -> ContractTestResult
1118where
1119 F: Fn() -> CS + Send + Sync + Clone + 'static,
1120 CS: CheckpointStore + Send + Sync + 'static,
1121{
1122 const SCENARIO: &str = "checkpoint_independent_subscriptions";
1123
1124 let store = make_checkpoint_store();
1125
1126 let subscription_a = format!("contract::{}::sub-a::{}", SCENARIO, Uuid::now_v7());
1128 let subscription_b = format!("contract::{}::sub-b::{}", SCENARIO, Uuid::now_v7());
1129
1130 let position_a = StreamPosition::new(Uuid::now_v7());
1131 let position_b = StreamPosition::new(Uuid::now_v7());
1132
1133 store
1135 .save(&subscription_a, position_a)
1136 .await
1137 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save A failed"))?;
1138
1139 store
1140 .save(&subscription_b, position_b)
1141 .await
1142 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save B failed"))?;
1143
1144 let loaded_a = store
1146 .load(&subscription_a)
1147 .await
1148 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load A failed"))?;
1149
1150 let loaded_b = store
1151 .load(&subscription_b)
1152 .await
1153 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load B failed"))?;
1154
1155 if loaded_a != Some(position_a) {
1156 return Err(ContractTestFailure::assertion(
1157 SCENARIO,
1158 format!(
1159 "subscription A: expected {:?} but got {:?}",
1160 Some(position_a),
1161 loaded_a
1162 ),
1163 ));
1164 }
1165
1166 if loaded_b != Some(position_b) {
1167 return Err(ContractTestFailure::assertion(
1168 SCENARIO,
1169 format!(
1170 "subscription B: expected {:?} but got {:?}",
1171 Some(position_b),
1172 loaded_b
1173 ),
1174 ));
1175 }
1176
1177 Ok(())
1178}