1use eventcore_types::{
2 BatchSize, CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore,
3 EventStoreError, ProjectorCoordinator, StreamId, StreamPosition, StreamPrefix, StreamVersion,
4 StreamWrites,
5};
6use std::fmt;
7
8use serde::{Deserialize, Serialize};
9use uuid::Uuid;
10
/// Describes why a contract scenario failed.
///
/// Carries the name of the scenario that failed together with a
/// human-readable explanation; both are rendered by the `Display` impl.
#[derive(Debug)]
pub struct ContractTestFailure {
    /// Static name of the contract scenario that produced the failure.
    scenario: &'static str,
    /// Free-form description of what went wrong.
    detail: String,
}
16
17impl ContractTestFailure {
18 fn new(scenario: &'static str, detail: impl Into<String>) -> Self {
19 Self {
20 scenario,
21 detail: detail.into(),
22 }
23 }
24
25 fn builder_error(scenario: &'static str, phase: &'static str, error: EventStoreError) -> Self {
26 Self::new(scenario, format!("builder failure during {phase}: {error}"))
27 }
28
29 fn store_error(
30 scenario: &'static str,
31 operation: &'static str,
32 error: EventStoreError,
33 ) -> Self {
34 Self::new(
35 scenario,
36 format!("{operation} operation returned unexpected error: {error}"),
37 )
38 }
39
40 fn assertion(scenario: &'static str, detail: impl Into<String>) -> Self {
41 Self::new(scenario, detail)
42 }
43}
44
45impl fmt::Display for ContractTestFailure {
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 write!(f, "[{}] {}", self.scenario, self.detail)
48 }
49}
50
51impl std::error::Error for ContractTestFailure {}
52
/// Outcome of a contract scenario: `Ok(())` on success, otherwise a
/// [`ContractTestFailure`] describing what went wrong.
pub type ContractTestResult = Result<(), ContractTestFailure>;
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct ContractTestEvent {
57 stream_id: StreamId,
58}
59
60impl ContractTestEvent {
61 pub fn new(stream_id: StreamId) -> Self {
62 Self { stream_id }
63 }
64}
65
66impl Event for ContractTestEvent {
67 fn stream_id(&self) -> &StreamId {
68 &self.stream_id
69 }
70}
71
72fn contract_stream_id(
73 scenario: &'static str,
74 label: &str,
75) -> Result<StreamId, ContractTestFailure> {
76 let raw = format!("contract::{}::{}::{}", scenario, label, Uuid::now_v7());
78
79 StreamId::try_new(raw.clone()).map_err(|error| {
80 ContractTestFailure::assertion(
81 scenario,
82 format!("unable to construct stream id `{}`: {}", raw, error),
83 )
84 })
85}
86
87fn builder_step(
88 scenario: &'static str,
89 phase: &'static str,
90 result: Result<StreamWrites, EventStoreError>,
91) -> Result<StreamWrites, ContractTestFailure> {
92 result.map_err(|error| ContractTestFailure::builder_error(scenario, phase, error))
93}
94
95fn register_contract_stream(
96 scenario: &'static str,
97 writes: StreamWrites,
98 stream_id: &StreamId,
99 expected_version: StreamVersion,
100) -> Result<StreamWrites, ContractTestFailure> {
101 builder_step(
102 scenario,
103 "register_stream",
104 writes.register_stream(stream_id.clone(), expected_version),
105 )
106}
107
108fn append_contract_event(
109 scenario: &'static str,
110 writes: StreamWrites,
111 stream_id: &StreamId,
112) -> Result<StreamWrites, ContractTestFailure> {
113 let event = ContractTestEvent::new(stream_id.clone());
114 builder_step(scenario, "append", writes.append(event))
115}
116
117pub async fn test_basic_read_write<F, S>(make_store: F) -> ContractTestResult
118where
119 F: Fn() -> S + Send + Sync + Clone + 'static,
120 S: EventStore + Send + Sync + 'static,
121{
122 const SCENARIO: &str = "basic_read_write";
123
124 let store = make_store();
125 let stream_id = contract_stream_id(SCENARIO, "single");
126
127 let stream_id = stream_id?;
128
129 let writes = register_contract_stream(
130 SCENARIO,
131 StreamWrites::new(),
132 &stream_id,
133 StreamVersion::new(0),
134 )?;
135 let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
136
137 let _ = store
138 .append_events(writes)
139 .await
140 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
141
142 let reader = store
143 .read_stream::<ContractTestEvent>(stream_id.clone())
144 .await
145 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
146
147 let len = reader.len();
148 let empty = reader.is_empty();
149
150 if empty {
151 return Err(ContractTestFailure::assertion(
152 SCENARIO,
153 "expected stream to contain events but it was empty",
154 ));
155 }
156
157 if len != 1 {
158 return Err(ContractTestFailure::assertion(
159 SCENARIO,
160 format!(
161 "expected stream to contain exactly one event, observed len={}",
162 len
163 ),
164 ));
165 }
166
167 Ok(())
168}
169
170pub async fn test_concurrent_version_conflicts<F, S>(make_store: F) -> ContractTestResult
171where
172 F: Fn() -> S + Send + Sync + Clone + 'static,
173 S: EventStore + Send + Sync + 'static,
174{
175 const SCENARIO: &str = "concurrent_version_conflicts";
176
177 let store = make_store();
178 let stream_id = contract_stream_id(SCENARIO, "shared")?;
179
180 let first_writes = register_contract_stream(
181 SCENARIO,
182 StreamWrites::new(),
183 &stream_id,
184 StreamVersion::new(0),
185 )?;
186 let first_writes = append_contract_event(SCENARIO, first_writes, &stream_id)?;
187
188 let _ = store
189 .append_events(first_writes)
190 .await
191 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
192
193 let conflicting_writes = register_contract_stream(
194 SCENARIO,
195 StreamWrites::new(),
196 &stream_id,
197 StreamVersion::new(0),
198 )?;
199 let conflicting_writes = append_contract_event(SCENARIO, conflicting_writes, &stream_id)?;
200
201 match store.append_events(conflicting_writes).await {
202 Err(EventStoreError::VersionConflict) => Ok(()),
203 Err(error) => Err(ContractTestFailure::store_error(
204 SCENARIO,
205 "append_events",
206 error,
207 )),
208 Ok(_) => Err(ContractTestFailure::assertion(
209 SCENARIO,
210 "expected version conflict but append succeeded",
211 )),
212 }
213}
214
215pub async fn test_stream_isolation<F, S>(make_store: F) -> ContractTestResult
216where
217 F: Fn() -> S + Send + Sync + Clone + 'static,
218 S: EventStore + Send + Sync + 'static,
219{
220 const SCENARIO: &str = "stream_isolation";
221
222 let store = make_store();
223 let left_stream = contract_stream_id(SCENARIO, "left")?;
224 let right_stream = contract_stream_id(SCENARIO, "right")?;
225
226 let writes = register_contract_stream(
227 SCENARIO,
228 StreamWrites::new(),
229 &left_stream,
230 StreamVersion::new(0),
231 )?;
232 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
233 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
234 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
235
236 let _ = store
237 .append_events(writes)
238 .await
239 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
240
241 let left_reader = store
242 .read_stream::<ContractTestEvent>(left_stream.clone())
243 .await
244 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
245
246 let right_reader = store
247 .read_stream::<ContractTestEvent>(right_stream.clone())
248 .await
249 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
250
251 let left_len = left_reader.len();
252 if left_len != 1 {
253 return Err(ContractTestFailure::assertion(
254 SCENARIO,
255 format!(
256 "left stream expected exactly one event but observed {}",
257 left_len
258 ),
259 ));
260 }
261
262 if left_reader
263 .iter()
264 .any(|event| event.stream_id() != &left_stream)
265 {
266 return Err(ContractTestFailure::assertion(
267 SCENARIO,
268 "left stream read events belonging to another stream",
269 ));
270 }
271
272 let right_len = right_reader.len();
273 if right_len != 1 {
274 return Err(ContractTestFailure::assertion(
275 SCENARIO,
276 format!(
277 "right stream expected exactly one event but observed {}",
278 right_len
279 ),
280 ));
281 }
282
283 if right_reader
284 .iter()
285 .any(|event| event.stream_id() != &right_stream)
286 {
287 return Err(ContractTestFailure::assertion(
288 SCENARIO,
289 "right stream read events belonging to another stream",
290 ));
291 }
292
293 Ok(())
294}
295
296pub async fn test_missing_stream_reads<F, S>(make_store: F) -> ContractTestResult
297where
298 F: Fn() -> S + Send + Sync + Clone + 'static,
299 S: EventStore + Send + Sync + 'static,
300{
301 const SCENARIO: &str = "missing_stream_reads";
302
303 let store = make_store();
304 let stream_id = contract_stream_id(SCENARIO, "ghost")?;
305
306 let reader = store
307 .read_stream::<ContractTestEvent>(stream_id.clone())
308 .await
309 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
310
311 if !reader.is_empty() {
312 return Err(ContractTestFailure::assertion(
313 SCENARIO,
314 "expected read_stream to succeed with no events for an untouched stream",
315 ));
316 }
317
318 Ok(())
319}
320
321pub async fn test_conflict_preserves_atomicity<F, S>(make_store: F) -> ContractTestResult
322where
323 F: Fn() -> S + Send + Sync + Clone + 'static,
324 S: EventStore + Send + Sync + 'static,
325{
326 const SCENARIO: &str = "conflict_preserves_atomicity";
327
328 let store = make_store();
329 let left_stream = contract_stream_id(SCENARIO, "left")?;
330 let right_stream = contract_stream_id(SCENARIO, "right")?;
331
332 let writes = register_contract_stream(
334 SCENARIO,
335 StreamWrites::new(),
336 &left_stream,
337 StreamVersion::new(0),
338 )?;
339 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
340 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
341 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
342
343 let _ = store
344 .append_events(writes)
345 .await
346 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
347
348 let writes = register_contract_stream(
350 SCENARIO,
351 StreamWrites::new(),
352 &left_stream,
353 StreamVersion::new(0),
354 )?;
355 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(1))?;
356 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
357 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
358
359 match store.append_events(writes).await {
360 Err(EventStoreError::VersionConflict) => {
361 let left_reader = store
362 .read_stream::<ContractTestEvent>(left_stream.clone())
363 .await
364 .map_err(|error| {
365 ContractTestFailure::store_error(SCENARIO, "read_stream", error)
366 })?;
367 if left_reader.len() != 1 {
368 return Err(ContractTestFailure::assertion(
369 SCENARIO,
370 format!(
371 "expected left stream to remain at len=1 after failed append, observed {}",
372 left_reader.len()
373 ),
374 ));
375 }
376
377 let right_reader = store
378 .read_stream::<ContractTestEvent>(right_stream.clone())
379 .await
380 .map_err(|error| {
381 ContractTestFailure::store_error(SCENARIO, "read_stream", error)
382 })?;
383 if right_reader.len() != 1 {
384 return Err(ContractTestFailure::assertion(
385 SCENARIO,
386 format!(
387 "expected right stream to remain at len=1 after failed append, observed {}",
388 right_reader.len()
389 ),
390 ));
391 }
392
393 Ok(())
394 }
395 Err(error) => Err(ContractTestFailure::store_error(
396 SCENARIO,
397 "append_events",
398 error,
399 )),
400 Ok(_) => Err(ContractTestFailure::assertion(
401 SCENARIO,
402 "expected version conflict but append succeeded",
403 )),
404 }
405}
406
/// Generates a test module that runs every backend contract scenario.
///
/// Expands to `mod $suite` containing one
/// `#[tokio::test(flavor = "multi_thread")]` function per scenario. Each test
/// passes the matching factory expression to the contract function and
/// panics via `expect` when the scenario returns an error.
///
/// Arguments (a trailing comma is accepted):
/// - `suite`: identifier used as the generated module name.
/// - `make_store`: expression passed to the event store and event reader
///   scenarios.
/// - `make_checkpoint_store`: expression passed to the checkpoint scenarios.
/// - `make_coordinator`: expression passed to the coordination scenarios.
#[macro_export]
macro_rules! backend_contract_tests {
    (
        suite = $suite:ident,
        make_store = $make_store:expr,
        make_checkpoint_store = $make_checkpoint_store:expr,
        make_coordinator = $make_coordinator:expr $(,)?
    ) => {
        #[allow(non_snake_case)]
        mod $suite {
            // Event store scenarios.
            use $crate::contract::{
                test_basic_read_write, test_concurrent_version_conflicts,
                test_conflict_preserves_atomicity, test_missing_stream_reads,
                test_stream_isolation,
            };
            // Event reader scenarios.
            use $crate::contract::{
                test_batch_limiting, test_event_ordering_across_streams,
                test_position_based_resumption, test_stream_prefix_filtering,
                test_stream_prefix_requires_prefix_match,
            };
            // Checkpoint store scenarios.
            use $crate::contract::{
                test_checkpoint_independent_subscriptions,
                test_checkpoint_load_missing_returns_none, test_checkpoint_save_and_load,
                test_checkpoint_update_overwrites,
            };
            // Projector coordinator scenarios.
            use $crate::contract::{
                test_coordination_acquire_leadership,
                test_coordination_independent_subscriptions,
                test_coordination_leadership_released_on_guard_drop,
                test_coordination_second_instance_blocked,
            };

            #[tokio::test(flavor = "multi_thread")]
            async fn basic_read_write_contract() {
                test_basic_read_write($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn concurrent_version_conflicts_contract() {
                test_concurrent_version_conflicts($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn stream_isolation_contract() {
                test_stream_isolation($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn missing_stream_reads_contract() {
                test_missing_stream_reads($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn conflict_preserves_atomicity_contract() {
                test_conflict_preserves_atomicity($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn event_ordering_across_streams_contract() {
                test_event_ordering_across_streams($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn position_based_resumption_contract() {
                test_position_based_resumption($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn stream_prefix_filtering_contract() {
                test_stream_prefix_filtering($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn stream_prefix_requires_prefix_match_contract() {
                test_stream_prefix_requires_prefix_match($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn batch_limiting_contract() {
                test_batch_limiting($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn checkpoint_save_and_load_contract() {
                test_checkpoint_save_and_load($make_checkpoint_store)
                    .await
                    .expect("checkpoint store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn checkpoint_update_overwrites_contract() {
                test_checkpoint_update_overwrites($make_checkpoint_store)
                    .await
                    .expect("checkpoint store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn checkpoint_load_missing_returns_none_contract() {
                test_checkpoint_load_missing_returns_none($make_checkpoint_store)
                    .await
                    .expect("checkpoint store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn checkpoint_independent_subscriptions_contract() {
                test_checkpoint_independent_subscriptions($make_checkpoint_store)
                    .await
                    .expect("checkpoint store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn coordination_acquire_leadership_contract() {
                test_coordination_acquire_leadership($make_coordinator)
                    .await
                    .expect("coordinator contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn coordination_second_instance_blocked_contract() {
                test_coordination_second_instance_blocked($make_coordinator)
                    .await
                    .expect("coordinator contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn coordination_independent_subscriptions_contract() {
                test_coordination_independent_subscriptions($make_coordinator)
                    .await
                    .expect("coordinator contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn coordination_leadership_released_on_guard_drop_contract() {
                test_coordination_leadership_released_on_guard_drop($make_coordinator)
                    .await
                    .expect("coordinator contract failed");
            }
        }
    };
}
579
580pub use backend_contract_tests;
581
582pub async fn test_event_ordering_across_streams<F, S>(make_store: F) -> ContractTestResult
584where
585 F: Fn() -> S + Send + Sync + Clone + 'static,
586 S: EventStore + EventReader + Send + Sync + 'static,
587{
588 const SCENARIO: &str = "event_ordering_across_streams";
589
590 let store = make_store();
591
592 let stream_a = contract_stream_id(SCENARIO, "stream-a")?;
594 let stream_b = contract_stream_id(SCENARIO, "stream-b")?;
595 let stream_c = contract_stream_id(SCENARIO, "stream-c")?;
596
597 let writes = register_contract_stream(
599 SCENARIO,
600 StreamWrites::new(),
601 &stream_a,
602 StreamVersion::new(0),
603 )?;
604 let writes = append_contract_event(SCENARIO, writes, &stream_a)?;
605 let _ = store
606 .append_events(writes)
607 .await
608 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
609
610 let writes = register_contract_stream(
612 SCENARIO,
613 StreamWrites::new(),
614 &stream_b,
615 StreamVersion::new(0),
616 )?;
617 let writes = append_contract_event(SCENARIO, writes, &stream_b)?;
618 let _ = store
619 .append_events(writes)
620 .await
621 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
622
623 let writes = register_contract_stream(
625 SCENARIO,
626 StreamWrites::new(),
627 &stream_c,
628 StreamVersion::new(0),
629 )?;
630 let writes = append_contract_event(SCENARIO, writes, &stream_c)?;
631 let _ = store
632 .append_events(writes)
633 .await
634 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
635
636 let filter = EventFilter::all();
638 let page = EventPage::first(BatchSize::new(100));
639 let events = store
640 .read_events::<ContractTestEvent>(filter, page)
641 .await
642 .map_err(|_error| {
643 ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
644 })?;
645
646 if events.len() != 3 {
648 return Err(ContractTestFailure::assertion(
649 SCENARIO,
650 format!("expected 3 events but got {}", events.len()),
651 ));
652 }
653
654 let (first_event, _) = &events[0];
656 if first_event.stream_id() != &stream_a {
657 return Err(ContractTestFailure::assertion(
658 SCENARIO,
659 format!(
660 "expected first event from stream_a but got from {:?}",
661 first_event.stream_id()
662 ),
663 ));
664 }
665
666 let (second_event, _) = &events[1];
667 if second_event.stream_id() != &stream_b {
668 return Err(ContractTestFailure::assertion(
669 SCENARIO,
670 format!(
671 "expected second event from stream_b but got from {:?}",
672 second_event.stream_id()
673 ),
674 ));
675 }
676
677 let (third_event, _) = &events[2];
678 if third_event.stream_id() != &stream_c {
679 return Err(ContractTestFailure::assertion(
680 SCENARIO,
681 format!(
682 "expected third event from stream_c but got from {:?}",
683 third_event.stream_id()
684 ),
685 ));
686 }
687
688 Ok(())
689}
690
691pub async fn test_position_based_resumption<F, S>(make_store: F) -> ContractTestResult
693where
694 F: Fn() -> S + Send + Sync + Clone + 'static,
695 S: EventStore + EventReader + Send + Sync + 'static,
696{
697 const SCENARIO: &str = "position_based_resumption";
698
699 let store = make_store();
700
701 let stream = contract_stream_id(SCENARIO, "stream")?;
703
704 let mut writes = register_contract_stream(
705 SCENARIO,
706 StreamWrites::new(),
707 &stream,
708 StreamVersion::new(0),
709 )?;
710
711 for _ in 0..5 {
713 writes = append_contract_event(SCENARIO, writes, &stream)?;
714 }
715
716 let _ = store
717 .append_events(writes)
718 .await
719 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
720
721 let filter = EventFilter::all();
723 let page = EventPage::first(BatchSize::new(100));
724 let all_events = store
725 .read_events::<ContractTestEvent>(filter.clone(), page)
726 .await
727 .map_err(|_error| {
728 ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
729 })?;
730
731 let (_third_event, third_position) = &all_events[2];
732
733 let page_after = EventPage::after(*third_position, BatchSize::new(100));
735 let events_after = store
736 .read_events::<ContractTestEvent>(filter, page_after)
737 .await
738 .map_err(|_error| {
739 ContractTestFailure::assertion(
740 SCENARIO,
741 "read_events failed when reading after position",
742 )
743 })?;
744
745 if events_after.len() != 2 {
747 return Err(ContractTestFailure::assertion(
748 SCENARIO,
749 format!(
750 "expected 2 events after position {} but got {}",
751 third_position,
752 events_after.len()
753 ),
754 ));
755 }
756
757 for (_event, position) in events_after.iter() {
759 if *position == *third_position {
760 return Err(ContractTestFailure::assertion(
761 SCENARIO,
762 format!(
763 "expected position {} to be excluded but it was included in results",
764 third_position
765 ),
766 ));
767 }
768 }
769
770 let (_event1, pos1) = &events_after[0];
772 let (_event2, pos2) = &events_after[1];
773
774 if *pos1 <= *third_position {
775 return Err(ContractTestFailure::assertion(
776 SCENARIO,
777 format!(
778 "expected first returned position to be > {} but got {}",
779 third_position, pos1
780 ),
781 ));
782 }
783
784 if *pos2 <= *pos1 {
785 return Err(ContractTestFailure::assertion(
786 SCENARIO,
787 format!(
788 "expected positions to be in ascending order but {} <= {}",
789 pos2, pos1
790 ),
791 ));
792 }
793
794 Ok(())
795}
796
797pub async fn test_stream_prefix_filtering<F, S>(make_store: F) -> ContractTestResult
799where
800 F: Fn() -> S + Send + Sync + Clone + 'static,
801 S: EventStore + EventReader + Send + Sync + 'static,
802{
803 const SCENARIO: &str = "stream_prefix_filtering";
804
805 let store = make_store();
806
807 let account_1 = StreamId::try_new(format!("account-1-{}", Uuid::now_v7())).map_err(|e| {
809 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
810 })?;
811 let account_2 = StreamId::try_new(format!("account-2-{}", Uuid::now_v7())).map_err(|e| {
812 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
813 })?;
814 let order_1 = StreamId::try_new(format!("order-1-{}", Uuid::now_v7())).map_err(|e| {
815 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
816 })?;
817
818 let mut writes = register_contract_stream(
819 SCENARIO,
820 StreamWrites::new(),
821 &account_1,
822 StreamVersion::new(0),
823 )?;
824 writes = register_contract_stream(SCENARIO, writes, &account_2, StreamVersion::new(0))?;
825 writes = register_contract_stream(SCENARIO, writes, &order_1, StreamVersion::new(0))?;
826
827 writes = append_contract_event(SCENARIO, writes, &account_1)?;
828 writes = append_contract_event(SCENARIO, writes, &account_2)?;
829 writes = append_contract_event(SCENARIO, writes, &order_1)?;
830
831 let _ = store
832 .append_events(writes)
833 .await
834 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
835
836 let prefix = StreamPrefix::try_new("account-").map_err(|e| {
838 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
839 })?;
840 let filter = EventFilter::prefix(prefix);
841 let page = EventPage::first(BatchSize::new(100));
842 let events = store
843 .read_events::<ContractTestEvent>(filter, page)
844 .await
845 .map_err(|_error| {
846 ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
847 })?;
848
849 if events.len() != 2 {
851 return Err(ContractTestFailure::assertion(
852 SCENARIO,
853 format!(
854 "expected 2 events from account-* streams but got {}",
855 events.len()
856 ),
857 ));
858 }
859
860 for (event, _) in events.iter() {
862 let stream_id_str = event.stream_id().as_ref();
863 if !stream_id_str.starts_with("account-") {
864 return Err(ContractTestFailure::assertion(
865 SCENARIO,
866 format!(
867 "expected all events from streams starting with 'account-' but found event from {}",
868 stream_id_str
869 ),
870 ));
871 }
872 }
873
874 Ok(())
877}
878
879pub async fn test_stream_prefix_requires_prefix_match<F, S>(make_store: F) -> ContractTestResult
881where
882 F: Fn() -> S + Send + Sync + Clone + 'static,
883 S: EventStore + EventReader + Send + Sync + 'static,
884{
885 const SCENARIO: &str = "stream_prefix_requires_prefix_match";
886
887 let store = make_store();
888
889 let account_stream =
892 StreamId::try_new(format!("account-123-{}", Uuid::now_v7())).map_err(|e| {
893 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
894 })?;
895 let my_account_stream = StreamId::try_new(format!("my-account-456-{}", Uuid::now_v7()))
896 .map_err(|e| {
897 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
898 })?;
899 let order_stream = StreamId::try_new(format!("order-789-{}", Uuid::now_v7())).map_err(|e| {
900 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
901 })?;
902
903 let mut writes = register_contract_stream(
904 SCENARIO,
905 StreamWrites::new(),
906 &account_stream,
907 StreamVersion::new(0),
908 )?;
909 writes = register_contract_stream(SCENARIO, writes, &my_account_stream, StreamVersion::new(0))?;
910 writes = register_contract_stream(SCENARIO, writes, &order_stream, StreamVersion::new(0))?;
911
912 writes = append_contract_event(SCENARIO, writes, &account_stream)?;
913 writes = append_contract_event(SCENARIO, writes, &my_account_stream)?;
914 writes = append_contract_event(SCENARIO, writes, &order_stream)?;
915
916 let _ = store
917 .append_events(writes)
918 .await
919 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
920
921 let prefix = StreamPrefix::try_new("account-").map_err(|e| {
923 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
924 })?;
925 let filter = EventFilter::prefix(prefix);
926 let page = EventPage::first(BatchSize::new(100));
927 let events = store
928 .read_events::<ContractTestEvent>(filter, page)
929 .await
930 .map_err(|_error| {
931 ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
932 })?;
933
934 if events.len() != 1 {
936 return Err(ContractTestFailure::assertion(
937 SCENARIO,
938 format!(
939 "expected exactly 1 event from account-* prefix but got {} (bug: implementation uses contains() instead of starts_with())",
940 events.len()
941 ),
942 ));
943 }
944
945 let (event, _) = &events[0];
947 let stream_id_str = event.stream_id().as_ref();
948 if !stream_id_str.starts_with("account-123") {
949 return Err(ContractTestFailure::assertion(
950 SCENARIO,
951 format!(
952 "expected event from stream starting with 'account-123' but got from {}",
953 stream_id_str
954 ),
955 ));
956 }
957
958 if stream_id_str.starts_with("my-account-456") {
960 return Err(ContractTestFailure::assertion(
961 SCENARIO,
962 "BUG EXPOSED: got event from stream starting with 'my-account-456' when filtering for prefix 'account-' - implementation must use prefix matching from the start of the stream ID",
963 ));
964 }
965
966 Ok(())
967}
968
969pub async fn test_batch_limiting<F, S>(make_store: F) -> ContractTestResult
971where
972 F: Fn() -> S + Send + Sync + Clone + 'static,
973 S: EventStore + EventReader + Send + Sync + 'static,
974{
975 const SCENARIO: &str = "batch_limiting";
976
977 let store = make_store();
978
979 let stream = contract_stream_id(SCENARIO, "stream")?;
981
982 let mut writes = register_contract_stream(
983 SCENARIO,
984 StreamWrites::new(),
985 &stream,
986 StreamVersion::new(0),
987 )?;
988
989 for _ in 0..20 {
991 writes = append_contract_event(SCENARIO, writes, &stream)?;
992 }
993
994 let _ = store
995 .append_events(writes)
996 .await
997 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
998
999 let filter = EventFilter::all();
1001 let page = EventPage::first(BatchSize::new(10));
1002 let events = store
1003 .read_events::<ContractTestEvent>(filter, page)
1004 .await
1005 .map_err(|_error| {
1006 ContractTestFailure::assertion(SCENARIO, "read_events failed with limit")
1007 })?;
1008
1009 if events.len() != 10 {
1011 return Err(ContractTestFailure::assertion(
1012 SCENARIO,
1013 format!("expected exactly 10 events but got {}", events.len()),
1014 ));
1015 }
1016
1017 Ok(())
1022}
1023
1024pub async fn test_checkpoint_save_and_load<F, CS>(make_checkpoint_store: F) -> ContractTestResult
1030where
1031 F: Fn() -> CS + Send + Sync + Clone + 'static,
1032 CS: CheckpointStore + Send + Sync + 'static,
1033{
1034 const SCENARIO: &str = "checkpoint_save_and_load";
1035
1036 let store = make_checkpoint_store();
1037
1038 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1040 let position = StreamPosition::new(Uuid::now_v7());
1041
1042 store
1044 .save(&subscription_name, position)
1045 .await
1046 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save failed"))?;
1047
1048 let loaded = store
1050 .load(&subscription_name)
1051 .await
1052 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1053
1054 if loaded != Some(position) {
1055 return Err(ContractTestFailure::assertion(
1056 SCENARIO,
1057 format!(
1058 "expected loaded position {:?} but got {:?}",
1059 Some(position),
1060 loaded
1061 ),
1062 ));
1063 }
1064
1065 Ok(())
1066}
1067
1068pub async fn test_checkpoint_update_overwrites<F, CS>(
1070 make_checkpoint_store: F,
1071) -> ContractTestResult
1072where
1073 F: Fn() -> CS + Send + Sync + Clone + 'static,
1074 CS: CheckpointStore + Send + Sync + 'static,
1075{
1076 const SCENARIO: &str = "checkpoint_update_overwrites";
1077
1078 let store = make_checkpoint_store();
1079
1080 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1082 let first_position = StreamPosition::new(Uuid::now_v7());
1083
1084 store
1085 .save(&subscription_name, first_position)
1086 .await
1087 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "first save failed"))?;
1088
1089 let second_position = StreamPosition::new(Uuid::now_v7());
1091 store
1092 .save(&subscription_name, second_position)
1093 .await
1094 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "second save failed"))?;
1095
1096 let loaded = store
1098 .load(&subscription_name)
1099 .await
1100 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1101
1102 if loaded != Some(second_position) {
1103 return Err(ContractTestFailure::assertion(
1104 SCENARIO,
1105 format!(
1106 "expected updated position {:?} but got {:?}",
1107 Some(second_position),
1108 loaded
1109 ),
1110 ));
1111 }
1112
1113 Ok(())
1114}
1115
1116pub async fn test_checkpoint_load_missing_returns_none<F, CS>(
1118 make_checkpoint_store: F,
1119) -> ContractTestResult
1120where
1121 F: Fn() -> CS + Send + Sync + Clone + 'static,
1122 CS: CheckpointStore + Send + Sync + 'static,
1123{
1124 const SCENARIO: &str = "checkpoint_load_missing_returns_none";
1125
1126 let store = make_checkpoint_store();
1127
1128 let subscription_name = format!("contract::{}::ghost::{}", SCENARIO, Uuid::now_v7());
1130
1131 let loaded = store
1133 .load(&subscription_name)
1134 .await
1135 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1136
1137 if loaded.is_some() {
1139 return Err(ContractTestFailure::assertion(
1140 SCENARIO,
1141 format!("expected None for missing checkpoint but got {:?}", loaded),
1142 ));
1143 }
1144
1145 Ok(())
1146}
1147
1148pub async fn test_checkpoint_independent_subscriptions<F, CS>(
1150 make_checkpoint_store: F,
1151) -> ContractTestResult
1152where
1153 F: Fn() -> CS + Send + Sync + Clone + 'static,
1154 CS: CheckpointStore + Send + Sync + 'static,
1155{
1156 const SCENARIO: &str = "checkpoint_independent_subscriptions";
1157
1158 let store = make_checkpoint_store();
1159
1160 let subscription_a = format!("contract::{}::sub-a::{}", SCENARIO, Uuid::now_v7());
1162 let subscription_b = format!("contract::{}::sub-b::{}", SCENARIO, Uuid::now_v7());
1163
1164 let position_a = StreamPosition::new(Uuid::now_v7());
1165 let position_b = StreamPosition::new(Uuid::now_v7());
1166
1167 store
1169 .save(&subscription_a, position_a)
1170 .await
1171 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save A failed"))?;
1172
1173 store
1174 .save(&subscription_b, position_b)
1175 .await
1176 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save B failed"))?;
1177
1178 let loaded_a = store
1180 .load(&subscription_a)
1181 .await
1182 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load A failed"))?;
1183
1184 let loaded_b = store
1185 .load(&subscription_b)
1186 .await
1187 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load B failed"))?;
1188
1189 if loaded_a != Some(position_a) {
1190 return Err(ContractTestFailure::assertion(
1191 SCENARIO,
1192 format!(
1193 "subscription A: expected {:?} but got {:?}",
1194 Some(position_a),
1195 loaded_a
1196 ),
1197 ));
1198 }
1199
1200 if loaded_b != Some(position_b) {
1201 return Err(ContractTestFailure::assertion(
1202 SCENARIO,
1203 format!(
1204 "subscription B: expected {:?} but got {:?}",
1205 Some(position_b),
1206 loaded_b
1207 ),
1208 ));
1209 }
1210
1211 Ok(())
1212}
1213
1214pub async fn test_coordination_acquire_leadership<F, C>(make_coordinator: F) -> ContractTestResult
1223where
1224 F: Fn() -> C + Send + Sync + Clone + 'static,
1225 C: ProjectorCoordinator + Send + Sync + 'static,
1226{
1227 const SCENARIO: &str = "coordination_acquire_leadership";
1228
1229 let coordinator = make_coordinator();
1230
1231 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1233
1234 let result = coordinator.try_acquire(&subscription_name).await;
1236
1237 if result.is_err() {
1239 return Err(ContractTestFailure::assertion(
1240 SCENARIO,
1241 "expected first instance to acquire leadership successfully, but try_acquire failed",
1242 ));
1243 }
1244
1245 Ok(())
1246}
1247
1248pub async fn test_coordination_second_instance_blocked<F, C>(
1253 make_coordinator: F,
1254) -> ContractTestResult
1255where
1256 F: Fn() -> C + Send + Sync + Clone + 'static,
1257 C: ProjectorCoordinator + Send + Sync + 'static,
1258{
1259 const SCENARIO: &str = "coordination_second_instance_blocked";
1260
1261 let coordinator = make_coordinator();
1262
1263 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1265
1266 let _first_guard = coordinator
1268 .try_acquire(&subscription_name)
1269 .await
1270 .map_err(|_| {
1271 ContractTestFailure::assertion(SCENARIO, "first instance failed to acquire leadership")
1272 })?;
1273
1274 let second_result = coordinator.try_acquire(&subscription_name).await;
1276
1277 if second_result.is_ok() {
1279 return Err(ContractTestFailure::assertion(
1280 SCENARIO,
1281 "expected second instance to be blocked but try_acquire succeeded",
1282 ));
1283 }
1284
1285 Ok(())
1286}
1287
1288pub async fn test_coordination_independent_subscriptions<F, C>(
1294 make_coordinator: F,
1295) -> ContractTestResult
1296where
1297 F: Fn() -> C + Send + Sync + Clone + 'static,
1298 C: ProjectorCoordinator + Send + Sync + 'static,
1299{
1300 const SCENARIO: &str = "coordination_independent_subscriptions";
1301
1302 let coordinator = make_coordinator();
1303
1304 let subscription_a = format!("contract::{}::projector-A::{}", SCENARIO, Uuid::now_v7());
1306 let subscription_b = format!("contract::{}::projector-B::{}", SCENARIO, Uuid::now_v7());
1307
1308 let _guard_a = coordinator
1310 .try_acquire(&subscription_a)
1311 .await
1312 .map_err(|_| {
1313 ContractTestFailure::assertion(
1314 SCENARIO,
1315 "projector-A failed to acquire leadership for its subscription",
1316 )
1317 })?;
1318
1319 let result_b = coordinator.try_acquire(&subscription_b).await;
1322
1323 if result_b.is_err() {
1325 return Err(ContractTestFailure::assertion(
1326 SCENARIO,
1327 "expected projector-B to acquire leadership for its own subscription, but try_acquire failed - different projectors should have independent coordination",
1328 ));
1329 }
1330
1331 Ok(())
1332}
1333
1334pub async fn test_coordination_leadership_released_on_guard_drop<F, C>(
1340 make_coordinator: F,
1341) -> ContractTestResult
1342where
1343 F: Fn() -> C + Send + Sync + Clone + 'static,
1344 C: ProjectorCoordinator + Send + Sync + 'static,
1345{
1346 const SCENARIO: &str = "coordination_leadership_released_on_guard_drop";
1347
1348 let coordinator = make_coordinator();
1349
1350 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1352
1353 {
1355 let _first_guard = coordinator
1356 .try_acquire(&subscription_name)
1357 .await
1358 .map_err(|_| {
1359 ContractTestFailure::assertion(
1360 SCENARIO,
1361 "first instance failed to acquire leadership",
1362 )
1363 })?;
1364 }
1366
1367 let second_result = coordinator.try_acquire(&subscription_name).await;
1369
1370 if second_result.is_err() {
1372 return Err(ContractTestFailure::assertion(
1373 SCENARIO,
1374 "expected second instance to acquire leadership after first guard dropped, but try_acquire failed - leadership should be released when guard is dropped",
1375 ));
1376 }
1377
1378 Ok(())
1379}