1use eventcore_types::{
2 BatchSize, CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore,
3 EventStoreError, ProjectorCoordinator, StreamId, StreamPosition, StreamPrefix, StreamVersion,
4 StreamWrites,
5};
6
7use serde::{Deserialize, Serialize};
8use uuid::Uuid;
9
/// Failure produced by a contract-test scenario.
///
/// Its `Display` output is `[<scenario>] <detail>`, as given by the `#[error]`
/// format string below.
#[derive(Debug, thiserror::Error)]
#[error("[{scenario}] {detail}")]
pub struct ContractTestFailure {
    /// Name of the scenario that reported the failure.
    scenario: &'static str,
    /// Free-form description of what went wrong.
    detail: String,
}
16
17impl ContractTestFailure {
18 fn new(scenario: &'static str, detail: impl Into<String>) -> Self {
19 Self {
20 scenario,
21 detail: detail.into(),
22 }
23 }
24
25 fn builder_error(scenario: &'static str, phase: &'static str, error: EventStoreError) -> Self {
26 Self::new(scenario, format!("builder failure during {phase}: {error}"))
27 }
28
29 fn store_error(
30 scenario: &'static str,
31 operation: &'static str,
32 error: EventStoreError,
33 ) -> Self {
34 Self::new(
35 scenario,
36 format!("{operation} operation returned unexpected error: {error}"),
37 )
38 }
39
40 fn assertion(scenario: &'static str, detail: impl Into<String>) -> Self {
41 Self::new(scenario, detail)
42 }
43}
44
/// Result returned by the contract-test functions: `Ok(())` on success, otherwise a
/// [`ContractTestFailure`].
pub type ContractTestResult = Result<(), ContractTestFailure>;
46
/// Event type written and read back by the contract tests.
///
/// It carries only the id of the stream it belongs to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContractTestEvent {
    /// Stream this event belongs to; returned by `Event::stream_id`.
    stream_id: StreamId,
}
51
52impl ContractTestEvent {
53 pub fn new(stream_id: StreamId) -> Self {
54 Self { stream_id }
55 }
56}
57
58impl Event for ContractTestEvent {
59 fn stream_id(&self) -> &StreamId {
60 &self.stream_id
61 }
62
63 fn event_type_name() -> &'static str {
64 "ContractTestEvent"
65 }
66}
67
68fn contract_stream_id(
69 scenario: &'static str,
70 label: &str,
71) -> Result<StreamId, ContractTestFailure> {
72 let raw = format!("contract::{}::{}::{}", scenario, label, Uuid::now_v7());
74
75 StreamId::try_new(raw.clone()).map_err(|error| {
76 ContractTestFailure::assertion(
77 scenario,
78 format!("unable to construct stream id `{}`: {}", raw, error),
79 )
80 })
81}
82
83fn builder_step(
84 scenario: &'static str,
85 phase: &'static str,
86 result: Result<StreamWrites, EventStoreError>,
87) -> Result<StreamWrites, ContractTestFailure> {
88 result.map_err(|error| ContractTestFailure::builder_error(scenario, phase, error))
89}
90
91fn register_contract_stream(
92 scenario: &'static str,
93 writes: StreamWrites,
94 stream_id: &StreamId,
95 expected_version: StreamVersion,
96) -> Result<StreamWrites, ContractTestFailure> {
97 builder_step(
98 scenario,
99 "register_stream",
100 writes.register_stream(stream_id.clone(), expected_version),
101 )
102}
103
104fn append_contract_event(
105 scenario: &'static str,
106 writes: StreamWrites,
107 stream_id: &StreamId,
108) -> Result<StreamWrites, ContractTestFailure> {
109 let event = ContractTestEvent::new(stream_id.clone());
110 builder_step(scenario, "append", writes.append(event))
111}
112
113pub async fn test_basic_read_write<F, S>(make_store: F) -> ContractTestResult
114where
115 F: Fn() -> S + Send + Sync + Clone + 'static,
116 S: EventStore + Send + Sync + 'static,
117{
118 const SCENARIO: &str = "basic_read_write";
119
120 let store = make_store();
121 let stream_id = contract_stream_id(SCENARIO, "single");
122
123 let stream_id = stream_id?;
124
125 let writes = register_contract_stream(
126 SCENARIO,
127 StreamWrites::new(),
128 &stream_id,
129 StreamVersion::new(0),
130 )?;
131 let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
132
133 let _ = store
134 .append_events(writes)
135 .await
136 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
137
138 let reader = store
139 .read_stream::<ContractTestEvent>(stream_id.clone())
140 .await
141 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
142
143 let len = reader.len();
144 let empty = reader.is_empty();
145
146 if empty {
147 return Err(ContractTestFailure::assertion(
148 SCENARIO,
149 "expected stream to contain events but it was empty",
150 ));
151 }
152
153 if len != 1 {
154 return Err(ContractTestFailure::assertion(
155 SCENARIO,
156 format!(
157 "expected stream to contain exactly one event, observed len={}",
158 len
159 ),
160 ));
161 }
162
163 Ok(())
164}
165
166pub async fn test_concurrent_version_conflicts<F, S>(make_store: F) -> ContractTestResult
167where
168 F: Fn() -> S + Send + Sync + Clone + 'static,
169 S: EventStore + Send + Sync + 'static,
170{
171 const SCENARIO: &str = "concurrent_version_conflicts";
172
173 let store = make_store();
174 let stream_id = contract_stream_id(SCENARIO, "shared")?;
175
176 let first_writes = register_contract_stream(
177 SCENARIO,
178 StreamWrites::new(),
179 &stream_id,
180 StreamVersion::new(0),
181 )?;
182 let first_writes = append_contract_event(SCENARIO, first_writes, &stream_id)?;
183
184 let _ = store
185 .append_events(first_writes)
186 .await
187 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
188
189 let conflicting_writes = register_contract_stream(
190 SCENARIO,
191 StreamWrites::new(),
192 &stream_id,
193 StreamVersion::new(0),
194 )?;
195 let conflicting_writes = append_contract_event(SCENARIO, conflicting_writes, &stream_id)?;
196
197 match store.append_events(conflicting_writes).await {
198 Err(EventStoreError::VersionConflict { .. }) => Ok(()),
199 Err(error) => Err(ContractTestFailure::store_error(
200 SCENARIO,
201 "append_events",
202 error,
203 )),
204 Ok(_) => Err(ContractTestFailure::assertion(
205 SCENARIO,
206 "expected version conflict but append succeeded",
207 )),
208 }
209}
210
211pub async fn test_stream_isolation<F, S>(make_store: F) -> ContractTestResult
212where
213 F: Fn() -> S + Send + Sync + Clone + 'static,
214 S: EventStore + Send + Sync + 'static,
215{
216 const SCENARIO: &str = "stream_isolation";
217
218 let store = make_store();
219 let left_stream = contract_stream_id(SCENARIO, "left")?;
220 let right_stream = contract_stream_id(SCENARIO, "right")?;
221
222 let writes = register_contract_stream(
223 SCENARIO,
224 StreamWrites::new(),
225 &left_stream,
226 StreamVersion::new(0),
227 )?;
228 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
229 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
230 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
231
232 let _ = store
233 .append_events(writes)
234 .await
235 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
236
237 let left_reader = store
238 .read_stream::<ContractTestEvent>(left_stream.clone())
239 .await
240 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
241
242 let right_reader = store
243 .read_stream::<ContractTestEvent>(right_stream.clone())
244 .await
245 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
246
247 let left_len = left_reader.len();
248 if left_len != 1 {
249 return Err(ContractTestFailure::assertion(
250 SCENARIO,
251 format!(
252 "left stream expected exactly one event but observed {}",
253 left_len
254 ),
255 ));
256 }
257
258 if left_reader
259 .iter()
260 .any(|event| event.stream_id() != &left_stream)
261 {
262 return Err(ContractTestFailure::assertion(
263 SCENARIO,
264 "left stream read events belonging to another stream",
265 ));
266 }
267
268 let right_len = right_reader.len();
269 if right_len != 1 {
270 return Err(ContractTestFailure::assertion(
271 SCENARIO,
272 format!(
273 "right stream expected exactly one event but observed {}",
274 right_len
275 ),
276 ));
277 }
278
279 if right_reader
280 .iter()
281 .any(|event| event.stream_id() != &right_stream)
282 {
283 return Err(ContractTestFailure::assertion(
284 SCENARIO,
285 "right stream read events belonging to another stream",
286 ));
287 }
288
289 Ok(())
290}
291
292pub async fn test_missing_stream_reads<F, S>(make_store: F) -> ContractTestResult
293where
294 F: Fn() -> S + Send + Sync + Clone + 'static,
295 S: EventStore + Send + Sync + 'static,
296{
297 const SCENARIO: &str = "missing_stream_reads";
298
299 let store = make_store();
300 let stream_id = contract_stream_id(SCENARIO, "ghost")?;
301
302 let reader = store
303 .read_stream::<ContractTestEvent>(stream_id.clone())
304 .await
305 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
306
307 if !reader.is_empty() {
308 return Err(ContractTestFailure::assertion(
309 SCENARIO,
310 "expected read_stream to succeed with no events for an untouched stream",
311 ));
312 }
313
314 Ok(())
315}
316
317pub async fn test_conflict_preserves_atomicity<F, S>(make_store: F) -> ContractTestResult
318where
319 F: Fn() -> S + Send + Sync + Clone + 'static,
320 S: EventStore + Send + Sync + 'static,
321{
322 const SCENARIO: &str = "conflict_preserves_atomicity";
323
324 let store = make_store();
325 let left_stream = contract_stream_id(SCENARIO, "left")?;
326 let right_stream = contract_stream_id(SCENARIO, "right")?;
327
328 let writes = register_contract_stream(
330 SCENARIO,
331 StreamWrites::new(),
332 &left_stream,
333 StreamVersion::new(0),
334 )?;
335 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
336 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
337 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
338
339 let _ = store
340 .append_events(writes)
341 .await
342 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
343
344 let writes = register_contract_stream(
346 SCENARIO,
347 StreamWrites::new(),
348 &left_stream,
349 StreamVersion::new(0),
350 )?;
351 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(1))?;
352 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
353 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
354
355 match store.append_events(writes).await {
356 Err(EventStoreError::VersionConflict { .. }) => {
357 let left_reader = store
358 .read_stream::<ContractTestEvent>(left_stream.clone())
359 .await
360 .map_err(|error| {
361 ContractTestFailure::store_error(SCENARIO, "read_stream", error)
362 })?;
363 if left_reader.len() != 1 {
364 return Err(ContractTestFailure::assertion(
365 SCENARIO,
366 format!(
367 "expected left stream to remain at len=1 after failed append, observed {}",
368 left_reader.len()
369 ),
370 ));
371 }
372
373 let right_reader = store
374 .read_stream::<ContractTestEvent>(right_stream.clone())
375 .await
376 .map_err(|error| {
377 ContractTestFailure::store_error(SCENARIO, "read_stream", error)
378 })?;
379 if right_reader.len() != 1 {
380 return Err(ContractTestFailure::assertion(
381 SCENARIO,
382 format!(
383 "expected right stream to remain at len=1 after failed append, observed {}",
384 right_reader.len()
385 ),
386 ));
387 }
388
389 Ok(())
390 }
391 Err(error) => Err(ContractTestFailure::store_error(
392 SCENARIO,
393 "append_events",
394 error,
395 )),
396 Ok(_) => Err(ContractTestFailure::assertion(
397 SCENARIO,
398 "expected version conflict but append succeeded",
399 )),
400 }
401}
402
/// Event type whose shape differs from [`ContractTestEvent`] by one additional field.
///
/// `test_read_stream_errors_on_type_mismatch` uses it as the target type when
/// reading a stream that was written with `ContractTestEvent`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MismatchedEvent {
    /// Stream this event belongs to.
    stream_id: StreamId,
    /// Field that `ContractTestEvent` does not have.
    extra_field: String,
}
409
410impl Event for MismatchedEvent {
411 fn stream_id(&self) -> &StreamId {
412 &self.stream_id
413 }
414
415 fn event_type_name() -> &'static str {
416 "MismatchedEvent"
417 }
418}
419
420pub async fn test_read_stream_errors_on_type_mismatch<F, S>(make_store: F) -> ContractTestResult
428where
429 F: Fn() -> S + Send + Sync + Clone + 'static,
430 S: EventStore + Send + Sync + 'static,
431{
432 const SCENARIO: &str = "read_stream_errors_on_type_mismatch";
433
434 let store = make_store();
435 let stream_id = contract_stream_id(SCENARIO, "mismatched")?;
436
437 let writes = register_contract_stream(
439 SCENARIO,
440 StreamWrites::new(),
441 &stream_id,
442 StreamVersion::new(0),
443 )?;
444 let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
445
446 let _ = store
447 .append_events(writes)
448 .await
449 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
450
451 let result = store.read_stream::<MismatchedEvent>(stream_id).await;
453
454 match result {
455 Err(EventStoreError::DeserializationFailed { .. }) => Ok(()),
456 Err(other) => Err(ContractTestFailure::assertion(
457 SCENARIO,
458 format!(
459 "expected DeserializationFailed error, got different error: {}",
460 other
461 ),
462 )),
463 Ok(reader) if reader.is_empty() => Err(ContractTestFailure::assertion(
464 SCENARIO,
465 "read_stream silently returned empty results instead of erroring on type mismatch",
466 )),
467 Ok(reader) => Err(ContractTestFailure::assertion(
468 SCENARIO,
469 format!(
470 "read_stream returned {} events instead of erroring on type mismatch",
471 reader.len()
472 ),
473 )),
474 }
475}
476
/// Generates a test module that runs the backend contract suite.
///
/// The expansion is a module named `$suite` containing one
/// `#[tokio::test(flavor = "multi_thread")]` function per contract scenario.
/// Each generated test passes one of the supplied factory expressions to the
/// matching `test_*` function from `$crate::contract`, awaits it, and panics
/// through `expect` if the scenario returns an error.
///
/// # Arguments
///
/// * `suite` - identifier used as the name of the generated module.
/// * `make_store` - expression passed to the event store and event reader scenarios.
/// * `make_checkpoint_store` - expression passed to the checkpoint store scenarios.
/// * `make_coordinator` - expression passed to the coordinator scenarios.
///
/// A trailing comma after the last argument is accepted.
#[macro_export]
macro_rules! backend_contract_tests {
    (suite = $suite:ident, make_store = $make_store:expr, make_checkpoint_store = $make_checkpoint_store:expr, make_coordinator = $make_coordinator:expr $(,)?) => {
        // NOTE(review): presumably this allows suite identifiers that are not snake_case — confirm.
        #[allow(non_snake_case)]
        mod $suite {
            use $crate::contract::{
                test_basic_read_write, test_batch_limiting,
                test_checkpoint_independent_subscriptions,
                test_checkpoint_load_missing_returns_none, test_checkpoint_save_and_load,
                test_checkpoint_update_overwrites, test_concurrent_version_conflicts,
                test_conflict_preserves_atomicity, test_coordination_acquire_leadership,
                test_coordination_independent_subscriptions,
                test_coordination_leadership_released_on_guard_drop,
                test_coordination_second_instance_blocked, test_event_ordering_across_streams,
                test_missing_stream_reads, test_position_based_resumption,
                test_read_stream_errors_on_type_mismatch, test_stream_isolation,
                test_stream_prefix_filtering, test_stream_prefix_requires_prefix_match,
            };

            // Event store scenarios, driven by `$make_store`.

            #[tokio::test(flavor = "multi_thread")]
            async fn basic_read_write_contract() {
                test_basic_read_write($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn concurrent_version_conflicts_contract() {
                test_concurrent_version_conflicts($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn stream_isolation_contract() {
                test_stream_isolation($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn missing_stream_reads_contract() {
                test_missing_stream_reads($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn conflict_preserves_atomicity_contract() {
                test_conflict_preserves_atomicity($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn read_stream_errors_on_type_mismatch_contract() {
                test_read_stream_errors_on_type_mismatch($make_store)
                    .await
                    .expect("event store contract failed");
            }

            // Event reader scenarios, also driven by `$make_store`.

            #[tokio::test(flavor = "multi_thread")]
            async fn event_ordering_across_streams_contract() {
                test_event_ordering_across_streams($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn position_based_resumption_contract() {
                test_position_based_resumption($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn stream_prefix_filtering_contract() {
                test_stream_prefix_filtering($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn stream_prefix_requires_prefix_match_contract() {
                test_stream_prefix_requires_prefix_match($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn batch_limiting_contract() {
                test_batch_limiting($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            // Checkpoint store scenarios, driven by `$make_checkpoint_store`.

            #[tokio::test(flavor = "multi_thread")]
            async fn checkpoint_save_and_load_contract() {
                test_checkpoint_save_and_load($make_checkpoint_store)
                    .await
                    .expect("checkpoint store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn checkpoint_update_overwrites_contract() {
                test_checkpoint_update_overwrites($make_checkpoint_store)
                    .await
                    .expect("checkpoint store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn checkpoint_load_missing_returns_none_contract() {
                test_checkpoint_load_missing_returns_none($make_checkpoint_store)
                    .await
                    .expect("checkpoint store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn checkpoint_independent_subscriptions_contract() {
                test_checkpoint_independent_subscriptions($make_checkpoint_store)
                    .await
                    .expect("checkpoint store contract failed");
            }

            // Coordinator scenarios, driven by `$make_coordinator`.

            #[tokio::test(flavor = "multi_thread")]
            async fn coordination_acquire_leadership_contract() {
                test_coordination_acquire_leadership($make_coordinator)
                    .await
                    .expect("coordinator contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn coordination_second_instance_blocked_contract() {
                test_coordination_second_instance_blocked($make_coordinator)
                    .await
                    .expect("coordinator contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn coordination_independent_subscriptions_contract() {
                test_coordination_independent_subscriptions($make_coordinator)
                    .await
                    .expect("coordinator contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn coordination_leadership_released_on_guard_drop_contract() {
                test_coordination_leadership_released_on_guard_drop($make_coordinator)
                    .await
                    .expect("coordinator contract failed");
            }
        }
    };
}
657
658pub use backend_contract_tests;
659
660pub async fn test_event_ordering_across_streams<F, S>(make_store: F) -> ContractTestResult
662where
663 F: Fn() -> S + Send + Sync + Clone + 'static,
664 S: EventStore + EventReader + Send + Sync + 'static,
665{
666 const SCENARIO: &str = "event_ordering_across_streams";
667
668 let store = make_store();
669
670 let stream_a = contract_stream_id(SCENARIO, "stream-a")?;
672 let stream_b = contract_stream_id(SCENARIO, "stream-b")?;
673 let stream_c = contract_stream_id(SCENARIO, "stream-c")?;
674
675 let writes = register_contract_stream(
677 SCENARIO,
678 StreamWrites::new(),
679 &stream_a,
680 StreamVersion::new(0),
681 )?;
682 let writes = append_contract_event(SCENARIO, writes, &stream_a)?;
683 let _ = store
684 .append_events(writes)
685 .await
686 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
687
688 let writes = register_contract_stream(
690 SCENARIO,
691 StreamWrites::new(),
692 &stream_b,
693 StreamVersion::new(0),
694 )?;
695 let writes = append_contract_event(SCENARIO, writes, &stream_b)?;
696 let _ = store
697 .append_events(writes)
698 .await
699 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
700
701 let writes = register_contract_stream(
703 SCENARIO,
704 StreamWrites::new(),
705 &stream_c,
706 StreamVersion::new(0),
707 )?;
708 let writes = append_contract_event(SCENARIO, writes, &stream_c)?;
709 let _ = store
710 .append_events(writes)
711 .await
712 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
713
714 let filter = EventFilter::all();
716 let page = EventPage::first(BatchSize::new(100));
717 let events = store
718 .read_events::<ContractTestEvent>(filter, page)
719 .await
720 .map_err(|_error| {
721 ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
722 })?;
723
724 if events.len() != 3 {
726 return Err(ContractTestFailure::assertion(
727 SCENARIO,
728 format!("expected 3 events but got {}", events.len()),
729 ));
730 }
731
732 let (first_event, _) = &events[0];
734 if first_event.stream_id() != &stream_a {
735 return Err(ContractTestFailure::assertion(
736 SCENARIO,
737 format!(
738 "expected first event from stream_a but got from {:?}",
739 first_event.stream_id()
740 ),
741 ));
742 }
743
744 let (second_event, _) = &events[1];
745 if second_event.stream_id() != &stream_b {
746 return Err(ContractTestFailure::assertion(
747 SCENARIO,
748 format!(
749 "expected second event from stream_b but got from {:?}",
750 second_event.stream_id()
751 ),
752 ));
753 }
754
755 let (third_event, _) = &events[2];
756 if third_event.stream_id() != &stream_c {
757 return Err(ContractTestFailure::assertion(
758 SCENARIO,
759 format!(
760 "expected third event from stream_c but got from {:?}",
761 third_event.stream_id()
762 ),
763 ));
764 }
765
766 Ok(())
767}
768
769pub async fn test_position_based_resumption<F, S>(make_store: F) -> ContractTestResult
771where
772 F: Fn() -> S + Send + Sync + Clone + 'static,
773 S: EventStore + EventReader + Send + Sync + 'static,
774{
775 const SCENARIO: &str = "position_based_resumption";
776
777 let store = make_store();
778
779 let stream = contract_stream_id(SCENARIO, "stream")?;
781
782 let mut writes = register_contract_stream(
783 SCENARIO,
784 StreamWrites::new(),
785 &stream,
786 StreamVersion::new(0),
787 )?;
788
789 for _ in 0..5 {
791 writes = append_contract_event(SCENARIO, writes, &stream)?;
792 }
793
794 let _ = store
795 .append_events(writes)
796 .await
797 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
798
799 let filter = EventFilter::all();
801 let page = EventPage::first(BatchSize::new(100));
802 let all_events = store
803 .read_events::<ContractTestEvent>(filter.clone(), page)
804 .await
805 .map_err(|_error| {
806 ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
807 })?;
808
809 let (_third_event, third_position) = &all_events[2];
810
811 let page_after = EventPage::after(*third_position, BatchSize::new(100));
813 let events_after = store
814 .read_events::<ContractTestEvent>(filter, page_after)
815 .await
816 .map_err(|_error| {
817 ContractTestFailure::assertion(
818 SCENARIO,
819 "read_events failed when reading after position",
820 )
821 })?;
822
823 if events_after.len() != 2 {
825 return Err(ContractTestFailure::assertion(
826 SCENARIO,
827 format!(
828 "expected 2 events after position {} but got {}",
829 third_position,
830 events_after.len()
831 ),
832 ));
833 }
834
835 for (_event, position) in events_after.iter() {
837 if *position == *third_position {
838 return Err(ContractTestFailure::assertion(
839 SCENARIO,
840 format!(
841 "expected position {} to be excluded but it was included in results",
842 third_position
843 ),
844 ));
845 }
846 }
847
848 let (_event1, pos1) = &events_after[0];
850 let (_event2, pos2) = &events_after[1];
851
852 if *pos1 <= *third_position {
853 return Err(ContractTestFailure::assertion(
854 SCENARIO,
855 format!(
856 "expected first returned position to be > {} but got {}",
857 third_position, pos1
858 ),
859 ));
860 }
861
862 if *pos2 <= *pos1 {
863 return Err(ContractTestFailure::assertion(
864 SCENARIO,
865 format!(
866 "expected positions to be in ascending order but {} <= {}",
867 pos2, pos1
868 ),
869 ));
870 }
871
872 Ok(())
873}
874
875pub async fn test_stream_prefix_filtering<F, S>(make_store: F) -> ContractTestResult
877where
878 F: Fn() -> S + Send + Sync + Clone + 'static,
879 S: EventStore + EventReader + Send + Sync + 'static,
880{
881 const SCENARIO: &str = "stream_prefix_filtering";
882
883 let store = make_store();
884
885 let account_1 = StreamId::try_new(format!("account-1-{}", Uuid::now_v7())).map_err(|e| {
887 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
888 })?;
889 let account_2 = StreamId::try_new(format!("account-2-{}", Uuid::now_v7())).map_err(|e| {
890 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
891 })?;
892 let order_1 = StreamId::try_new(format!("order-1-{}", Uuid::now_v7())).map_err(|e| {
893 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
894 })?;
895
896 let mut writes = register_contract_stream(
897 SCENARIO,
898 StreamWrites::new(),
899 &account_1,
900 StreamVersion::new(0),
901 )?;
902 writes = register_contract_stream(SCENARIO, writes, &account_2, StreamVersion::new(0))?;
903 writes = register_contract_stream(SCENARIO, writes, &order_1, StreamVersion::new(0))?;
904
905 writes = append_contract_event(SCENARIO, writes, &account_1)?;
906 writes = append_contract_event(SCENARIO, writes, &account_2)?;
907 writes = append_contract_event(SCENARIO, writes, &order_1)?;
908
909 let _ = store
910 .append_events(writes)
911 .await
912 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
913
914 let prefix = StreamPrefix::try_new("account-").map_err(|e| {
916 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
917 })?;
918 let filter = EventFilter::prefix(prefix);
919 let page = EventPage::first(BatchSize::new(100));
920 let events = store
921 .read_events::<ContractTestEvent>(filter, page)
922 .await
923 .map_err(|_error| {
924 ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
925 })?;
926
927 if events.len() != 2 {
929 return Err(ContractTestFailure::assertion(
930 SCENARIO,
931 format!(
932 "expected 2 events from account-* streams but got {}",
933 events.len()
934 ),
935 ));
936 }
937
938 for (event, _) in events.iter() {
940 let stream_id_str = event.stream_id().as_ref();
941 if !stream_id_str.starts_with("account-") {
942 return Err(ContractTestFailure::assertion(
943 SCENARIO,
944 format!(
945 "expected all events from streams starting with 'account-' but found event from {}",
946 stream_id_str
947 ),
948 ));
949 }
950 }
951
952 Ok(())
955}
956
957pub async fn test_stream_prefix_requires_prefix_match<F, S>(make_store: F) -> ContractTestResult
959where
960 F: Fn() -> S + Send + Sync + Clone + 'static,
961 S: EventStore + EventReader + Send + Sync + 'static,
962{
963 const SCENARIO: &str = "stream_prefix_requires_prefix_match";
964
965 let store = make_store();
966
967 let account_stream =
970 StreamId::try_new(format!("account-123-{}", Uuid::now_v7())).map_err(|e| {
971 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
972 })?;
973 let my_account_stream = StreamId::try_new(format!("my-account-456-{}", Uuid::now_v7()))
974 .map_err(|e| {
975 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
976 })?;
977 let order_stream = StreamId::try_new(format!("order-789-{}", Uuid::now_v7())).map_err(|e| {
978 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
979 })?;
980
981 let mut writes = register_contract_stream(
982 SCENARIO,
983 StreamWrites::new(),
984 &account_stream,
985 StreamVersion::new(0),
986 )?;
987 writes = register_contract_stream(SCENARIO, writes, &my_account_stream, StreamVersion::new(0))?;
988 writes = register_contract_stream(SCENARIO, writes, &order_stream, StreamVersion::new(0))?;
989
990 writes = append_contract_event(SCENARIO, writes, &account_stream)?;
991 writes = append_contract_event(SCENARIO, writes, &my_account_stream)?;
992 writes = append_contract_event(SCENARIO, writes, &order_stream)?;
993
994 let _ = store
995 .append_events(writes)
996 .await
997 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
998
999 let prefix = StreamPrefix::try_new("account-").map_err(|e| {
1001 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
1002 })?;
1003 let filter = EventFilter::prefix(prefix);
1004 let page = EventPage::first(BatchSize::new(100));
1005 let events = store
1006 .read_events::<ContractTestEvent>(filter, page)
1007 .await
1008 .map_err(|_error| {
1009 ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
1010 })?;
1011
1012 if events.len() != 1 {
1014 return Err(ContractTestFailure::assertion(
1015 SCENARIO,
1016 format!(
1017 "expected exactly 1 event from account-* prefix but got {} (bug: implementation uses contains() instead of starts_with())",
1018 events.len()
1019 ),
1020 ));
1021 }
1022
1023 let (event, _) = &events[0];
1025 let stream_id_str = event.stream_id().as_ref();
1026 if !stream_id_str.starts_with("account-123") {
1027 return Err(ContractTestFailure::assertion(
1028 SCENARIO,
1029 format!(
1030 "expected event from stream starting with 'account-123' but got from {}",
1031 stream_id_str
1032 ),
1033 ));
1034 }
1035
1036 if stream_id_str.starts_with("my-account-456") {
1038 return Err(ContractTestFailure::assertion(
1039 SCENARIO,
1040 "BUG EXPOSED: got event from stream starting with 'my-account-456' when filtering for prefix 'account-' - implementation must use prefix matching from the start of the stream ID",
1041 ));
1042 }
1043
1044 Ok(())
1045}
1046
1047pub async fn test_batch_limiting<F, S>(make_store: F) -> ContractTestResult
1049where
1050 F: Fn() -> S + Send + Sync + Clone + 'static,
1051 S: EventStore + EventReader + Send + Sync + 'static,
1052{
1053 const SCENARIO: &str = "batch_limiting";
1054
1055 let store = make_store();
1056
1057 let stream = contract_stream_id(SCENARIO, "stream")?;
1059
1060 let mut writes = register_contract_stream(
1061 SCENARIO,
1062 StreamWrites::new(),
1063 &stream,
1064 StreamVersion::new(0),
1065 )?;
1066
1067 for _ in 0..20 {
1069 writes = append_contract_event(SCENARIO, writes, &stream)?;
1070 }
1071
1072 let _ = store
1073 .append_events(writes)
1074 .await
1075 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1076
1077 let filter = EventFilter::all();
1079 let page = EventPage::first(BatchSize::new(10));
1080 let events = store
1081 .read_events::<ContractTestEvent>(filter, page)
1082 .await
1083 .map_err(|_error| {
1084 ContractTestFailure::assertion(SCENARIO, "read_events failed with limit")
1085 })?;
1086
1087 if events.len() != 10 {
1089 return Err(ContractTestFailure::assertion(
1090 SCENARIO,
1091 format!("expected exactly 10 events but got {}", events.len()),
1092 ));
1093 }
1094
1095 Ok(())
1100}
1101
1102pub async fn test_checkpoint_save_and_load<F, CS>(make_checkpoint_store: F) -> ContractTestResult
1108where
1109 F: Fn() -> CS + Send + Sync + Clone + 'static,
1110 CS: CheckpointStore + Send + Sync + 'static,
1111{
1112 const SCENARIO: &str = "checkpoint_save_and_load";
1113
1114 let store = make_checkpoint_store();
1115
1116 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1118 let position = StreamPosition::new(Uuid::now_v7());
1119
1120 store
1122 .save(&subscription_name, position)
1123 .await
1124 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save failed"))?;
1125
1126 let loaded = store
1128 .load(&subscription_name)
1129 .await
1130 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1131
1132 if loaded != Some(position) {
1133 return Err(ContractTestFailure::assertion(
1134 SCENARIO,
1135 format!(
1136 "expected loaded position {:?} but got {:?}",
1137 Some(position),
1138 loaded
1139 ),
1140 ));
1141 }
1142
1143 Ok(())
1144}
1145
1146pub async fn test_checkpoint_update_overwrites<F, CS>(
1148 make_checkpoint_store: F,
1149) -> ContractTestResult
1150where
1151 F: Fn() -> CS + Send + Sync + Clone + 'static,
1152 CS: CheckpointStore + Send + Sync + 'static,
1153{
1154 const SCENARIO: &str = "checkpoint_update_overwrites";
1155
1156 let store = make_checkpoint_store();
1157
1158 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1160 let first_position = StreamPosition::new(Uuid::now_v7());
1161
1162 store
1163 .save(&subscription_name, first_position)
1164 .await
1165 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "first save failed"))?;
1166
1167 let second_position = StreamPosition::new(Uuid::now_v7());
1169 store
1170 .save(&subscription_name, second_position)
1171 .await
1172 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "second save failed"))?;
1173
1174 let loaded = store
1176 .load(&subscription_name)
1177 .await
1178 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1179
1180 if loaded != Some(second_position) {
1181 return Err(ContractTestFailure::assertion(
1182 SCENARIO,
1183 format!(
1184 "expected updated position {:?} but got {:?}",
1185 Some(second_position),
1186 loaded
1187 ),
1188 ));
1189 }
1190
1191 Ok(())
1192}
1193
1194pub async fn test_checkpoint_load_missing_returns_none<F, CS>(
1196 make_checkpoint_store: F,
1197) -> ContractTestResult
1198where
1199 F: Fn() -> CS + Send + Sync + Clone + 'static,
1200 CS: CheckpointStore + Send + Sync + 'static,
1201{
1202 const SCENARIO: &str = "checkpoint_load_missing_returns_none";
1203
1204 let store = make_checkpoint_store();
1205
1206 let subscription_name = format!("contract::{}::ghost::{}", SCENARIO, Uuid::now_v7());
1208
1209 let loaded = store
1211 .load(&subscription_name)
1212 .await
1213 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1214
1215 if loaded.is_some() {
1217 return Err(ContractTestFailure::assertion(
1218 SCENARIO,
1219 format!("expected None for missing checkpoint but got {:?}", loaded),
1220 ));
1221 }
1222
1223 Ok(())
1224}
1225
1226pub async fn test_checkpoint_independent_subscriptions<F, CS>(
1228 make_checkpoint_store: F,
1229) -> ContractTestResult
1230where
1231 F: Fn() -> CS + Send + Sync + Clone + 'static,
1232 CS: CheckpointStore + Send + Sync + 'static,
1233{
1234 const SCENARIO: &str = "checkpoint_independent_subscriptions";
1235
1236 let store = make_checkpoint_store();
1237
1238 let subscription_a = format!("contract::{}::sub-a::{}", SCENARIO, Uuid::now_v7());
1240 let subscription_b = format!("contract::{}::sub-b::{}", SCENARIO, Uuid::now_v7());
1241
1242 let position_a = StreamPosition::new(Uuid::now_v7());
1243 let position_b = StreamPosition::new(Uuid::now_v7());
1244
1245 store
1247 .save(&subscription_a, position_a)
1248 .await
1249 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save A failed"))?;
1250
1251 store
1252 .save(&subscription_b, position_b)
1253 .await
1254 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save B failed"))?;
1255
1256 let loaded_a = store
1258 .load(&subscription_a)
1259 .await
1260 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load A failed"))?;
1261
1262 let loaded_b = store
1263 .load(&subscription_b)
1264 .await
1265 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load B failed"))?;
1266
1267 if loaded_a != Some(position_a) {
1268 return Err(ContractTestFailure::assertion(
1269 SCENARIO,
1270 format!(
1271 "subscription A: expected {:?} but got {:?}",
1272 Some(position_a),
1273 loaded_a
1274 ),
1275 ));
1276 }
1277
1278 if loaded_b != Some(position_b) {
1279 return Err(ContractTestFailure::assertion(
1280 SCENARIO,
1281 format!(
1282 "subscription B: expected {:?} but got {:?}",
1283 Some(position_b),
1284 loaded_b
1285 ),
1286 ));
1287 }
1288
1289 Ok(())
1290}
1291
1292pub async fn test_coordination_acquire_leadership<F, C>(make_coordinator: F) -> ContractTestResult
1301where
1302 F: Fn() -> C + Send + Sync + Clone + 'static,
1303 C: ProjectorCoordinator + Send + Sync + 'static,
1304{
1305 const SCENARIO: &str = "coordination_acquire_leadership";
1306
1307 let coordinator = make_coordinator();
1308
1309 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1311
1312 let result = coordinator.try_acquire(&subscription_name).await;
1314
1315 if result.is_err() {
1317 return Err(ContractTestFailure::assertion(
1318 SCENARIO,
1319 "expected first instance to acquire leadership successfully, but try_acquire failed",
1320 ));
1321 }
1322
1323 Ok(())
1324}
1325
1326pub async fn test_coordination_second_instance_blocked<F, C>(
1331 make_coordinator: F,
1332) -> ContractTestResult
1333where
1334 F: Fn() -> C + Send + Sync + Clone + 'static,
1335 C: ProjectorCoordinator + Send + Sync + 'static,
1336{
1337 const SCENARIO: &str = "coordination_second_instance_blocked";
1338
1339 let coordinator = make_coordinator();
1340
1341 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1343
1344 let _first_guard = coordinator
1346 .try_acquire(&subscription_name)
1347 .await
1348 .map_err(|_| {
1349 ContractTestFailure::assertion(SCENARIO, "first instance failed to acquire leadership")
1350 })?;
1351
1352 let second_result = coordinator.try_acquire(&subscription_name).await;
1354
1355 if second_result.is_ok() {
1357 return Err(ContractTestFailure::assertion(
1358 SCENARIO,
1359 "expected second instance to be blocked but try_acquire succeeded",
1360 ));
1361 }
1362
1363 Ok(())
1364}
1365
1366pub async fn test_coordination_independent_subscriptions<F, C>(
1372 make_coordinator: F,
1373) -> ContractTestResult
1374where
1375 F: Fn() -> C + Send + Sync + Clone + 'static,
1376 C: ProjectorCoordinator + Send + Sync + 'static,
1377{
1378 const SCENARIO: &str = "coordination_independent_subscriptions";
1379
1380 let coordinator = make_coordinator();
1381
1382 let subscription_a = format!("contract::{}::projector-A::{}", SCENARIO, Uuid::now_v7());
1384 let subscription_b = format!("contract::{}::projector-B::{}", SCENARIO, Uuid::now_v7());
1385
1386 let _guard_a = coordinator
1388 .try_acquire(&subscription_a)
1389 .await
1390 .map_err(|_| {
1391 ContractTestFailure::assertion(
1392 SCENARIO,
1393 "projector-A failed to acquire leadership for its subscription",
1394 )
1395 })?;
1396
1397 let result_b = coordinator.try_acquire(&subscription_b).await;
1400
1401 if result_b.is_err() {
1403 return Err(ContractTestFailure::assertion(
1404 SCENARIO,
1405 "expected projector-B to acquire leadership for its own subscription, but try_acquire failed - different projectors should have independent coordination",
1406 ));
1407 }
1408
1409 Ok(())
1410}
1411
1412pub async fn test_coordination_leadership_released_on_guard_drop<F, C>(
1418 make_coordinator: F,
1419) -> ContractTestResult
1420where
1421 F: Fn() -> C + Send + Sync + Clone + 'static,
1422 C: ProjectorCoordinator + Send + Sync + 'static,
1423{
1424 const SCENARIO: &str = "coordination_leadership_released_on_guard_drop";
1425
1426 let coordinator = make_coordinator();
1427
1428 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1430
1431 {
1433 let _first_guard = coordinator
1434 .try_acquire(&subscription_name)
1435 .await
1436 .map_err(|_| {
1437 ContractTestFailure::assertion(
1438 SCENARIO,
1439 "first instance failed to acquire leadership",
1440 )
1441 })?;
1442 }
1444
1445 let second_result = coordinator.try_acquire(&subscription_name).await;
1447
1448 if second_result.is_err() {
1450 return Err(ContractTestFailure::assertion(
1451 SCENARIO,
1452 "expected second instance to acquire leadership after first guard dropped, but try_acquire failed - leadership should be released when guard is dropped",
1453 ));
1454 }
1455
1456 Ok(())
1457}