1use eventcore_types::{
2 BatchSize, CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore,
3 EventStoreError, ProjectorCoordinator, StreamId, StreamPattern, StreamPosition, StreamPrefix,
4 StreamVersion, StreamWrites, collect_events,
5};
6
7use serde::{Deserialize, Serialize};
8use uuid::Uuid;
9
10#[derive(Debug, thiserror::Error)]
11#[error("[{scenario}] {detail}")]
12pub struct ContractTestFailure {
13 scenario: &'static str,
14 detail: String,
15}
16
17impl ContractTestFailure {
18 fn new(scenario: &'static str, detail: impl Into<String>) -> Self {
19 Self {
20 scenario,
21 detail: detail.into(),
22 }
23 }
24
25 fn builder_error(scenario: &'static str, phase: &'static str, error: EventStoreError) -> Self {
26 Self::new(scenario, format!("builder failure during {phase}: {error}"))
27 }
28
29 fn store_error(
30 scenario: &'static str,
31 operation: &'static str,
32 error: EventStoreError,
33 ) -> Self {
34 Self::new(
35 scenario,
36 format!("{operation} operation returned unexpected error: {error}"),
37 )
38 }
39
40 fn assertion(scenario: &'static str, detail: impl Into<String>) -> Self {
41 Self::new(scenario, detail)
42 }
43}
44
45pub type ContractTestResult = Result<(), ContractTestFailure>;
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct ContractTestEvent {
49 stream_id: StreamId,
50}
51
52impl ContractTestEvent {
53 pub fn new(stream_id: StreamId) -> Self {
54 Self { stream_id }
55 }
56}
57
58impl Event for ContractTestEvent {
59 fn stream_id(&self) -> &StreamId {
60 &self.stream_id
61 }
62
63 fn event_type_name() -> &'static str {
64 "ContractTestEvent"
65 }
66}
67
68fn contract_stream_id(
69 scenario: &'static str,
70 label: &str,
71) -> Result<StreamId, ContractTestFailure> {
72 let raw = format!("contract::{}::{}::{}", scenario, label, Uuid::now_v7());
74
75 StreamId::try_new(raw.clone()).map_err(|error| {
76 ContractTestFailure::assertion(
77 scenario,
78 format!("unable to construct stream id `{}`: {}", raw, error),
79 )
80 })
81}
82
83fn builder_step(
84 scenario: &'static str,
85 phase: &'static str,
86 result: Result<StreamWrites, EventStoreError>,
87) -> Result<StreamWrites, ContractTestFailure> {
88 result.map_err(|error| ContractTestFailure::builder_error(scenario, phase, error))
89}
90
91fn register_contract_stream(
92 scenario: &'static str,
93 writes: StreamWrites,
94 stream_id: &StreamId,
95 expected_version: StreamVersion,
96) -> Result<StreamWrites, ContractTestFailure> {
97 builder_step(
98 scenario,
99 "register_stream",
100 writes.register_stream(stream_id.clone(), expected_version),
101 )
102}
103
104fn append_contract_event(
105 scenario: &'static str,
106 writes: StreamWrites,
107 stream_id: &StreamId,
108) -> Result<StreamWrites, ContractTestFailure> {
109 let event = ContractTestEvent::new(stream_id.clone());
110 builder_step(scenario, "append", writes.append(event))
111}
112
113pub async fn test_basic_read_write<F, S>(make_store: F) -> ContractTestResult
114where
115 F: Fn() -> S + Send + Sync + Clone + 'static,
116 S: EventStore + Send + Sync + 'static,
117{
118 const SCENARIO: &str = "basic_read_write";
119
120 let store = make_store();
121 let stream_id = contract_stream_id(SCENARIO, "single");
122
123 let stream_id = stream_id?;
124
125 let writes = register_contract_stream(
126 SCENARIO,
127 StreamWrites::new(),
128 &stream_id,
129 StreamVersion::new(0),
130 )?;
131 let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
132
133 let _ = store
134 .append_events(writes)
135 .await
136 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
137
138 let stream = store
139 .read_stream::<ContractTestEvent>(stream_id.clone())
140 .await
141 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
142 let events = collect_events(stream)
143 .await
144 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
145
146 let len = events.len();
147 let empty = events.is_empty();
148
149 if empty {
150 return Err(ContractTestFailure::assertion(
151 SCENARIO,
152 "expected stream to contain events but it was empty",
153 ));
154 }
155
156 if len != 1 {
157 return Err(ContractTestFailure::assertion(
158 SCENARIO,
159 format!(
160 "expected stream to contain exactly one event, observed len={}",
161 len
162 ),
163 ));
164 }
165
166 Ok(())
167}
168
169pub async fn test_concurrent_version_conflicts<F, S>(make_store: F) -> ContractTestResult
170where
171 F: Fn() -> S + Send + Sync + Clone + 'static,
172 S: EventStore + Send + Sync + 'static,
173{
174 const SCENARIO: &str = "concurrent_version_conflicts";
175
176 let store = make_store();
177 let stream_id = contract_stream_id(SCENARIO, "shared")?;
178
179 let first_writes = register_contract_stream(
180 SCENARIO,
181 StreamWrites::new(),
182 &stream_id,
183 StreamVersion::new(0),
184 )?;
185 let first_writes = append_contract_event(SCENARIO, first_writes, &stream_id)?;
186
187 let _ = store
188 .append_events(first_writes)
189 .await
190 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
191
192 let conflicting_writes = register_contract_stream(
193 SCENARIO,
194 StreamWrites::new(),
195 &stream_id,
196 StreamVersion::new(0),
197 )?;
198 let conflicting_writes = append_contract_event(SCENARIO, conflicting_writes, &stream_id)?;
199
200 match store.append_events(conflicting_writes).await {
201 Err(EventStoreError::VersionConflict { .. }) => Ok(()),
202 Err(error) => Err(ContractTestFailure::store_error(
203 SCENARIO,
204 "append_events",
205 error,
206 )),
207 Ok(_) => Err(ContractTestFailure::assertion(
208 SCENARIO,
209 "expected version conflict but append succeeded",
210 )),
211 }
212}
213
214pub async fn test_stream_isolation<F, S>(make_store: F) -> ContractTestResult
215where
216 F: Fn() -> S + Send + Sync + Clone + 'static,
217 S: EventStore + Send + Sync + 'static,
218{
219 const SCENARIO: &str = "stream_isolation";
220
221 let store = make_store();
222 let left_stream = contract_stream_id(SCENARIO, "left")?;
223 let right_stream = contract_stream_id(SCENARIO, "right")?;
224
225 let writes = register_contract_stream(
226 SCENARIO,
227 StreamWrites::new(),
228 &left_stream,
229 StreamVersion::new(0),
230 )?;
231 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
232 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
233 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
234
235 let _ = store
236 .append_events(writes)
237 .await
238 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
239
240 let left_stream_events = store
241 .read_stream::<ContractTestEvent>(left_stream.clone())
242 .await
243 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
244 let left_events = collect_events(left_stream_events)
245 .await
246 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
247
248 let right_stream_events = store
249 .read_stream::<ContractTestEvent>(right_stream.clone())
250 .await
251 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
252 let right_events = collect_events(right_stream_events)
253 .await
254 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
255
256 let left_len = left_events.len();
257 if left_len != 1 {
258 return Err(ContractTestFailure::assertion(
259 SCENARIO,
260 format!(
261 "left stream expected exactly one event but observed {}",
262 left_len
263 ),
264 ));
265 }
266
267 if left_events
268 .iter()
269 .any(|event| event.stream_id() != &left_stream)
270 {
271 return Err(ContractTestFailure::assertion(
272 SCENARIO,
273 "left stream read events belonging to another stream",
274 ));
275 }
276
277 let right_len = right_events.len();
278 if right_len != 1 {
279 return Err(ContractTestFailure::assertion(
280 SCENARIO,
281 format!(
282 "right stream expected exactly one event but observed {}",
283 right_len
284 ),
285 ));
286 }
287
288 if right_events
289 .iter()
290 .any(|event| event.stream_id() != &right_stream)
291 {
292 return Err(ContractTestFailure::assertion(
293 SCENARIO,
294 "right stream read events belonging to another stream",
295 ));
296 }
297
298 Ok(())
299}
300
301pub async fn test_missing_stream_reads<F, S>(make_store: F) -> ContractTestResult
302where
303 F: Fn() -> S + Send + Sync + Clone + 'static,
304 S: EventStore + Send + Sync + 'static,
305{
306 const SCENARIO: &str = "missing_stream_reads";
307
308 let store = make_store();
309 let stream_id = contract_stream_id(SCENARIO, "ghost")?;
310
311 let stream = store
312 .read_stream::<ContractTestEvent>(stream_id.clone())
313 .await
314 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
315 let events = collect_events(stream)
316 .await
317 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
318
319 if !events.is_empty() {
320 return Err(ContractTestFailure::assertion(
321 SCENARIO,
322 "expected read_stream to succeed with no events for an untouched stream",
323 ));
324 }
325
326 Ok(())
327}
328
329pub async fn test_conflict_preserves_atomicity<F, S>(make_store: F) -> ContractTestResult
330where
331 F: Fn() -> S + Send + Sync + Clone + 'static,
332 S: EventStore + Send + Sync + 'static,
333{
334 const SCENARIO: &str = "conflict_preserves_atomicity";
335
336 let store = make_store();
337 let left_stream = contract_stream_id(SCENARIO, "left")?;
338 let right_stream = contract_stream_id(SCENARIO, "right")?;
339
340 let writes = register_contract_stream(
342 SCENARIO,
343 StreamWrites::new(),
344 &left_stream,
345 StreamVersion::new(0),
346 )?;
347 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
348 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
349 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
350
351 let _ = store
352 .append_events(writes)
353 .await
354 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
355
356 let writes = register_contract_stream(
358 SCENARIO,
359 StreamWrites::new(),
360 &left_stream,
361 StreamVersion::new(0),
362 )?;
363 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(1))?;
364 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
365 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
366
367 match store.append_events(writes).await {
368 Err(EventStoreError::VersionConflict { .. }) => {
369 let left_stream_events = store
370 .read_stream::<ContractTestEvent>(left_stream.clone())
371 .await
372 .map_err(|error| {
373 ContractTestFailure::store_error(SCENARIO, "read_stream", error)
374 })?;
375 let left_events = collect_events(left_stream_events).await.map_err(|error| {
376 ContractTestFailure::store_error(SCENARIO, "read_stream", error)
377 })?;
378 if left_events.len() != 1 {
379 return Err(ContractTestFailure::assertion(
380 SCENARIO,
381 format!(
382 "expected left stream to remain at len=1 after failed append, observed {}",
383 left_events.len()
384 ),
385 ));
386 }
387
388 let right_stream_events = store
389 .read_stream::<ContractTestEvent>(right_stream.clone())
390 .await
391 .map_err(|error| {
392 ContractTestFailure::store_error(SCENARIO, "read_stream", error)
393 })?;
394 let right_events = collect_events(right_stream_events).await.map_err(|error| {
395 ContractTestFailure::store_error(SCENARIO, "read_stream", error)
396 })?;
397 if right_events.len() != 1 {
398 return Err(ContractTestFailure::assertion(
399 SCENARIO,
400 format!(
401 "expected right stream to remain at len=1 after failed append, observed {}",
402 right_events.len()
403 ),
404 ));
405 }
406
407 Ok(())
408 }
409 Err(error) => Err(ContractTestFailure::store_error(
410 SCENARIO,
411 "append_events",
412 error,
413 )),
414 Ok(_) => Err(ContractTestFailure::assertion(
415 SCENARIO,
416 "expected version conflict but append succeeded",
417 )),
418 }
419}
420
421#[derive(Debug, Clone, Serialize, Deserialize)]
423pub struct MismatchedEvent {
424 stream_id: StreamId,
425 extra_field: String,
426}
427
428impl Event for MismatchedEvent {
429 fn stream_id(&self) -> &StreamId {
430 &self.stream_id
431 }
432
433 fn event_type_name() -> &'static str {
434 "MismatchedEvent"
435 }
436}
437
438pub async fn test_read_stream_errors_on_type_mismatch<F, S>(make_store: F) -> ContractTestResult
446where
447 F: Fn() -> S + Send + Sync + Clone + 'static,
448 S: EventStore + Send + Sync + 'static,
449{
450 const SCENARIO: &str = "read_stream_errors_on_type_mismatch";
451
452 let store = make_store();
453 let stream_id = contract_stream_id(SCENARIO, "mismatched")?;
454
455 let writes = register_contract_stream(
457 SCENARIO,
458 StreamWrites::new(),
459 &stream_id,
460 StreamVersion::new(0),
461 )?;
462 let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
463
464 let _ = store
465 .append_events(writes)
466 .await
467 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
468
469 let result = match store.read_stream::<MismatchedEvent>(stream_id).await {
475 Ok(stream) => collect_events(stream).await,
476 Err(error) => Err(error),
477 };
478
479 match result {
480 Err(EventStoreError::DeserializationFailed { .. }) => Ok(()),
481 Err(other) => Err(ContractTestFailure::assertion(
482 SCENARIO,
483 format!(
484 "expected DeserializationFailed error, got different error: {}",
485 other
486 ),
487 )),
488 Ok(events) if events.is_empty() => Err(ContractTestFailure::assertion(
489 SCENARIO,
490 "read_stream silently returned empty results instead of erroring on type mismatch",
491 )),
492 Ok(events) => Err(ContractTestFailure::assertion(
493 SCENARIO,
494 format!(
495 "read_stream returned {} events instead of erroring on type mismatch",
496 events.len()
497 ),
498 )),
499 }
500}
501
502#[macro_export]
528macro_rules! backend_contract_tests {
529 (suite = $suite:ident, make_store = $make_store:expr, make_checkpoint_store = $make_checkpoint_store:expr, make_coordinator = $make_coordinator:expr $(,)?) => {
530 #[allow(non_snake_case)]
531 mod $suite {
532 use $crate::contract::{
533 test_basic_read_write, test_batch_limiting,
534 test_checkpoint_independent_subscriptions,
535 test_checkpoint_load_missing_returns_none, test_checkpoint_save_and_load,
536 test_checkpoint_update_overwrites, test_concurrent_version_conflicts,
537 test_conflict_preserves_atomicity, test_coordination_acquire_leadership,
538 test_coordination_independent_subscriptions,
539 test_coordination_leadership_released_on_guard_drop,
540 test_coordination_second_instance_blocked, test_event_ordering_across_streams,
541 test_missing_stream_reads, test_position_based_resumption,
542 test_read_stream_errors_on_type_mismatch, test_stream_isolation,
543 test_stream_pattern_char_class, test_stream_pattern_filtering,
544 test_stream_pattern_single_char, test_stream_prefix_filtering,
545 test_stream_prefix_requires_prefix_match,
546 };
547
548 #[tokio::test(flavor = "multi_thread")]
549 async fn basic_read_write_contract() {
550 test_basic_read_write($make_store)
551 .await
552 .expect("event store contract failed");
553 }
554
555 #[tokio::test(flavor = "multi_thread")]
556 async fn concurrent_version_conflicts_contract() {
557 test_concurrent_version_conflicts($make_store)
558 .await
559 .expect("event store contract failed");
560 }
561
562 #[tokio::test(flavor = "multi_thread")]
563 async fn stream_isolation_contract() {
564 test_stream_isolation($make_store)
565 .await
566 .expect("event store contract failed");
567 }
568
569 #[tokio::test(flavor = "multi_thread")]
570 async fn missing_stream_reads_contract() {
571 test_missing_stream_reads($make_store)
572 .await
573 .expect("event store contract failed");
574 }
575
576 #[tokio::test(flavor = "multi_thread")]
577 async fn conflict_preserves_atomicity_contract() {
578 test_conflict_preserves_atomicity($make_store)
579 .await
580 .expect("event store contract failed");
581 }
582
583 #[tokio::test(flavor = "multi_thread")]
584 async fn read_stream_errors_on_type_mismatch_contract() {
585 test_read_stream_errors_on_type_mismatch($make_store)
586 .await
587 .expect("event store contract failed");
588 }
589
590 #[tokio::test(flavor = "multi_thread")]
591 async fn event_ordering_across_streams_contract() {
592 test_event_ordering_across_streams($make_store)
593 .await
594 .expect("event reader contract failed");
595 }
596
597 #[tokio::test(flavor = "multi_thread")]
598 async fn position_based_resumption_contract() {
599 test_position_based_resumption($make_store)
600 .await
601 .expect("event reader contract failed");
602 }
603
604 #[tokio::test(flavor = "multi_thread")]
605 async fn stream_prefix_filtering_contract() {
606 test_stream_prefix_filtering($make_store)
607 .await
608 .expect("event reader contract failed");
609 }
610
611 #[tokio::test(flavor = "multi_thread")]
612 async fn stream_prefix_requires_prefix_match_contract() {
613 test_stream_prefix_requires_prefix_match($make_store)
614 .await
615 .expect("event reader contract failed");
616 }
617
618 #[tokio::test(flavor = "multi_thread")]
619 async fn stream_pattern_filtering_contract() {
620 test_stream_pattern_filtering($make_store)
621 .await
622 .expect("event reader contract failed");
623 }
624
625 #[tokio::test(flavor = "multi_thread")]
626 async fn stream_pattern_single_char_contract() {
627 test_stream_pattern_single_char($make_store)
628 .await
629 .expect("event reader contract failed");
630 }
631
632 #[tokio::test(flavor = "multi_thread")]
633 async fn stream_pattern_char_class_contract() {
634 test_stream_pattern_char_class($make_store)
635 .await
636 .expect("event reader contract failed");
637 }
638
639 #[tokio::test(flavor = "multi_thread")]
640 async fn batch_limiting_contract() {
641 test_batch_limiting($make_store)
642 .await
643 .expect("event reader contract failed");
644 }
645
646 #[tokio::test(flavor = "multi_thread")]
648 async fn checkpoint_save_and_load_contract() {
649 test_checkpoint_save_and_load($make_checkpoint_store)
650 .await
651 .expect("checkpoint store contract failed");
652 }
653
654 #[tokio::test(flavor = "multi_thread")]
655 async fn checkpoint_update_overwrites_contract() {
656 test_checkpoint_update_overwrites($make_checkpoint_store)
657 .await
658 .expect("checkpoint store contract failed");
659 }
660
661 #[tokio::test(flavor = "multi_thread")]
662 async fn checkpoint_load_missing_returns_none_contract() {
663 test_checkpoint_load_missing_returns_none($make_checkpoint_store)
664 .await
665 .expect("checkpoint store contract failed");
666 }
667
668 #[tokio::test(flavor = "multi_thread")]
669 async fn checkpoint_independent_subscriptions_contract() {
670 test_checkpoint_independent_subscriptions($make_checkpoint_store)
671 .await
672 .expect("checkpoint store contract failed");
673 }
674
675 #[tokio::test(flavor = "multi_thread")]
677 async fn coordination_acquire_leadership_contract() {
678 test_coordination_acquire_leadership($make_coordinator)
679 .await
680 .expect("coordinator contract failed");
681 }
682
683 #[tokio::test(flavor = "multi_thread")]
684 async fn coordination_second_instance_blocked_contract() {
685 test_coordination_second_instance_blocked($make_coordinator)
686 .await
687 .expect("coordinator contract failed");
688 }
689
690 #[tokio::test(flavor = "multi_thread")]
691 async fn coordination_independent_subscriptions_contract() {
692 test_coordination_independent_subscriptions($make_coordinator)
693 .await
694 .expect("coordinator contract failed");
695 }
696
697 #[tokio::test(flavor = "multi_thread")]
698 async fn coordination_leadership_released_on_guard_drop_contract() {
699 test_coordination_leadership_released_on_guard_drop($make_coordinator)
700 .await
701 .expect("coordinator contract failed");
702 }
703 }
704 };
705}
706
707pub use backend_contract_tests;
708
709pub async fn test_event_ordering_across_streams<F, S>(make_store: F) -> ContractTestResult
711where
712 F: Fn() -> S + Send + Sync + Clone + 'static,
713 S: EventStore + EventReader + Send + Sync + 'static,
714{
715 const SCENARIO: &str = "event_ordering_across_streams";
716
717 let store = make_store();
718
719 let stream_a = contract_stream_id(SCENARIO, "stream-a")?;
721 let stream_b = contract_stream_id(SCENARIO, "stream-b")?;
722 let stream_c = contract_stream_id(SCENARIO, "stream-c")?;
723
724 let writes = register_contract_stream(
726 SCENARIO,
727 StreamWrites::new(),
728 &stream_a,
729 StreamVersion::new(0),
730 )?;
731 let writes = append_contract_event(SCENARIO, writes, &stream_a)?;
732 let _ = store
733 .append_events(writes)
734 .await
735 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
736
737 let writes = register_contract_stream(
739 SCENARIO,
740 StreamWrites::new(),
741 &stream_b,
742 StreamVersion::new(0),
743 )?;
744 let writes = append_contract_event(SCENARIO, writes, &stream_b)?;
745 let _ = store
746 .append_events(writes)
747 .await
748 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
749
750 let writes = register_contract_stream(
752 SCENARIO,
753 StreamWrites::new(),
754 &stream_c,
755 StreamVersion::new(0),
756 )?;
757 let writes = append_contract_event(SCENARIO, writes, &stream_c)?;
758 let _ = store
759 .append_events(writes)
760 .await
761 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
762
763 let filter = EventFilter::all();
765 let page = EventPage::first(BatchSize::new(100));
766 let events = store
767 .read_events::<ContractTestEvent>(filter, page)
768 .await
769 .map_err(|_error| {
770 ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
771 })?;
772
773 if events.len() != 3 {
775 return Err(ContractTestFailure::assertion(
776 SCENARIO,
777 format!("expected 3 events but got {}", events.len()),
778 ));
779 }
780
781 let (first_event, _) = &events[0];
783 if first_event.stream_id() != &stream_a {
784 return Err(ContractTestFailure::assertion(
785 SCENARIO,
786 format!(
787 "expected first event from stream_a but got from {:?}",
788 first_event.stream_id()
789 ),
790 ));
791 }
792
793 let (second_event, _) = &events[1];
794 if second_event.stream_id() != &stream_b {
795 return Err(ContractTestFailure::assertion(
796 SCENARIO,
797 format!(
798 "expected second event from stream_b but got from {:?}",
799 second_event.stream_id()
800 ),
801 ));
802 }
803
804 let (third_event, _) = &events[2];
805 if third_event.stream_id() != &stream_c {
806 return Err(ContractTestFailure::assertion(
807 SCENARIO,
808 format!(
809 "expected third event from stream_c but got from {:?}",
810 third_event.stream_id()
811 ),
812 ));
813 }
814
815 Ok(())
816}
817
818pub async fn test_position_based_resumption<F, S>(make_store: F) -> ContractTestResult
820where
821 F: Fn() -> S + Send + Sync + Clone + 'static,
822 S: EventStore + EventReader + Send + Sync + 'static,
823{
824 const SCENARIO: &str = "position_based_resumption";
825
826 let store = make_store();
827
828 let stream = contract_stream_id(SCENARIO, "stream")?;
830
831 let mut writes = register_contract_stream(
832 SCENARIO,
833 StreamWrites::new(),
834 &stream,
835 StreamVersion::new(0),
836 )?;
837
838 for _ in 0..5 {
840 writes = append_contract_event(SCENARIO, writes, &stream)?;
841 }
842
843 let _ = store
844 .append_events(writes)
845 .await
846 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
847
848 let filter = EventFilter::all();
850 let page = EventPage::first(BatchSize::new(100));
851 let all_events = store
852 .read_events::<ContractTestEvent>(filter.clone(), page)
853 .await
854 .map_err(|_error| {
855 ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
856 })?;
857
858 let (_third_event, third_position) = &all_events[2];
859
860 let page_after = EventPage::after(*third_position, BatchSize::new(100));
862 let events_after = store
863 .read_events::<ContractTestEvent>(filter, page_after)
864 .await
865 .map_err(|_error| {
866 ContractTestFailure::assertion(
867 SCENARIO,
868 "read_events failed when reading after position",
869 )
870 })?;
871
872 if events_after.len() != 2 {
874 return Err(ContractTestFailure::assertion(
875 SCENARIO,
876 format!(
877 "expected 2 events after position {} but got {}",
878 third_position,
879 events_after.len()
880 ),
881 ));
882 }
883
884 for (_event, position) in events_after.iter() {
886 if *position == *third_position {
887 return Err(ContractTestFailure::assertion(
888 SCENARIO,
889 format!(
890 "expected position {} to be excluded but it was included in results",
891 third_position
892 ),
893 ));
894 }
895 }
896
897 let (_event1, pos1) = &events_after[0];
899 let (_event2, pos2) = &events_after[1];
900
901 if *pos1 <= *third_position {
902 return Err(ContractTestFailure::assertion(
903 SCENARIO,
904 format!(
905 "expected first returned position to be > {} but got {}",
906 third_position, pos1
907 ),
908 ));
909 }
910
911 if *pos2 <= *pos1 {
912 return Err(ContractTestFailure::assertion(
913 SCENARIO,
914 format!(
915 "expected positions to be in ascending order but {} <= {}",
916 pos2, pos1
917 ),
918 ));
919 }
920
921 Ok(())
922}
923
924pub async fn test_stream_prefix_filtering<F, S>(make_store: F) -> ContractTestResult
926where
927 F: Fn() -> S + Send + Sync + Clone + 'static,
928 S: EventStore + EventReader + Send + Sync + 'static,
929{
930 const SCENARIO: &str = "stream_prefix_filtering";
931
932 let store = make_store();
933
934 let account_1 = StreamId::try_new(format!("account-1-{}", Uuid::now_v7())).map_err(|e| {
936 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
937 })?;
938 let account_2 = StreamId::try_new(format!("account-2-{}", Uuid::now_v7())).map_err(|e| {
939 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
940 })?;
941 let order_1 = StreamId::try_new(format!("order-1-{}", Uuid::now_v7())).map_err(|e| {
942 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
943 })?;
944
945 let mut writes = register_contract_stream(
946 SCENARIO,
947 StreamWrites::new(),
948 &account_1,
949 StreamVersion::new(0),
950 )?;
951 writes = register_contract_stream(SCENARIO, writes, &account_2, StreamVersion::new(0))?;
952 writes = register_contract_stream(SCENARIO, writes, &order_1, StreamVersion::new(0))?;
953
954 writes = append_contract_event(SCENARIO, writes, &account_1)?;
955 writes = append_contract_event(SCENARIO, writes, &account_2)?;
956 writes = append_contract_event(SCENARIO, writes, &order_1)?;
957
958 let _ = store
959 .append_events(writes)
960 .await
961 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
962
963 let prefix = StreamPrefix::try_new("account-").map_err(|e| {
965 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
966 })?;
967 let filter = EventFilter::prefix(prefix);
968 let page = EventPage::first(BatchSize::new(100));
969 let events = store
970 .read_events::<ContractTestEvent>(filter, page)
971 .await
972 .map_err(|_error| {
973 ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
974 })?;
975
976 if events.len() != 2 {
978 return Err(ContractTestFailure::assertion(
979 SCENARIO,
980 format!(
981 "expected 2 events from account-* streams but got {}",
982 events.len()
983 ),
984 ));
985 }
986
987 for (event, _) in events.iter() {
989 let stream_id_str = event.stream_id().as_ref();
990 if !stream_id_str.starts_with("account-") {
991 return Err(ContractTestFailure::assertion(
992 SCENARIO,
993 format!(
994 "expected all events from streams starting with 'account-' but found event from {}",
995 stream_id_str
996 ),
997 ));
998 }
999 }
1000
1001 Ok(())
1004}
1005
1006pub async fn test_stream_prefix_requires_prefix_match<F, S>(make_store: F) -> ContractTestResult
1008where
1009 F: Fn() -> S + Send + Sync + Clone + 'static,
1010 S: EventStore + EventReader + Send + Sync + 'static,
1011{
1012 const SCENARIO: &str = "stream_prefix_requires_prefix_match";
1013
1014 let store = make_store();
1015
1016 let account_stream =
1019 StreamId::try_new(format!("account-123-{}", Uuid::now_v7())).map_err(|e| {
1020 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1021 })?;
1022 let my_account_stream = StreamId::try_new(format!("my-account-456-{}", Uuid::now_v7()))
1023 .map_err(|e| {
1024 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1025 })?;
1026 let order_stream = StreamId::try_new(format!("order-789-{}", Uuid::now_v7())).map_err(|e| {
1027 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1028 })?;
1029
1030 let mut writes = register_contract_stream(
1031 SCENARIO,
1032 StreamWrites::new(),
1033 &account_stream,
1034 StreamVersion::new(0),
1035 )?;
1036 writes = register_contract_stream(SCENARIO, writes, &my_account_stream, StreamVersion::new(0))?;
1037 writes = register_contract_stream(SCENARIO, writes, &order_stream, StreamVersion::new(0))?;
1038
1039 writes = append_contract_event(SCENARIO, writes, &account_stream)?;
1040 writes = append_contract_event(SCENARIO, writes, &my_account_stream)?;
1041 writes = append_contract_event(SCENARIO, writes, &order_stream)?;
1042
1043 let _ = store
1044 .append_events(writes)
1045 .await
1046 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1047
1048 let prefix = StreamPrefix::try_new("account-").map_err(|e| {
1050 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
1051 })?;
1052 let filter = EventFilter::prefix(prefix);
1053 let page = EventPage::first(BatchSize::new(100));
1054 let events = store
1055 .read_events::<ContractTestEvent>(filter, page)
1056 .await
1057 .map_err(|_error| {
1058 ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
1059 })?;
1060
1061 if events.len() != 1 {
1063 return Err(ContractTestFailure::assertion(
1064 SCENARIO,
1065 format!(
1066 "expected exactly 1 event from account-* prefix but got {} (bug: implementation uses contains() instead of starts_with())",
1067 events.len()
1068 ),
1069 ));
1070 }
1071
1072 let (event, _) = &events[0];
1074 let stream_id_str = event.stream_id().as_ref();
1075 if !stream_id_str.starts_with("account-123") {
1076 return Err(ContractTestFailure::assertion(
1077 SCENARIO,
1078 format!(
1079 "expected event from stream starting with 'account-123' but got from {}",
1080 stream_id_str
1081 ),
1082 ));
1083 }
1084
1085 if stream_id_str.starts_with("my-account-456") {
1087 return Err(ContractTestFailure::assertion(
1088 SCENARIO,
1089 "BUG EXPOSED: got event from stream starting with 'my-account-456' when filtering for prefix 'account-' - implementation must use prefix matching from the start of the stream ID",
1090 ));
1091 }
1092
1093 Ok(())
1094}
1095
1096pub async fn test_stream_pattern_filtering<F, S>(make_store: F) -> ContractTestResult
1105where
1106 F: Fn() -> S + Send + Sync + Clone + 'static,
1107 S: EventStore + EventReader + Send + Sync + 'static,
1108{
1109 const SCENARIO: &str = "stream_pattern_filtering";
1110
1111 let store = make_store();
1112 let run = Uuid::now_v7();
1113
1114 let mut writes = StreamWrites::new();
1117 let mut order_streams = Vec::new();
1118 for _ in 0..50 {
1119 let order = StreamId::try_new(format!("order-{}", Uuid::now_v7())).map_err(|e| {
1120 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1121 })?;
1122 writes = register_contract_stream(SCENARIO, writes, &order, StreamVersion::new(0))?;
1123 order_streams.push(order);
1124 }
1125
1126 let account_1 = StreamId::try_new(format!("account-1-{run}")).map_err(|e| {
1127 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1128 })?;
1129 let account_2 = StreamId::try_new(format!("account-2-{run}")).map_err(|e| {
1130 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1131 })?;
1132 writes = register_contract_stream(SCENARIO, writes, &account_1, StreamVersion::new(0))?;
1133 writes = register_contract_stream(SCENARIO, writes, &account_2, StreamVersion::new(0))?;
1134
1135 for order in &order_streams {
1136 writes = append_contract_event(SCENARIO, writes, order)?;
1137 }
1138 writes = append_contract_event(SCENARIO, writes, &account_1)?;
1139 writes = append_contract_event(SCENARIO, writes, &account_2)?;
1140
1141 let _ = store
1142 .append_events(writes)
1143 .await
1144 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1145
1146 let pattern = StreamPattern::try_new("account-*").map_err(|e| {
1149 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream pattern: {}", e))
1150 })?;
1151 let filter = EventFilter::pattern(pattern);
1152 let page = EventPage::first(BatchSize::new(10));
1153 let events = store
1154 .read_events::<ContractTestEvent>(filter, page)
1155 .await
1156 .map_err(|_error| {
1157 ContractTestFailure::assertion(
1158 SCENARIO,
1159 "read_events failed with stream pattern filter",
1160 )
1161 })?;
1162
1163 if events.len() != 2 {
1166 return Err(ContractTestFailure::assertion(
1167 SCENARIO,
1168 format!(
1169 "expected exactly 2 events matching glob 'account-*' but got {} (pattern filter must be applied before the page limit)",
1170 events.len()
1171 ),
1172 ));
1173 }
1174
1175 for (event, _) in events.iter() {
1176 let stream_id_str = event.stream_id().as_ref();
1177 if !stream_id_str.starts_with("account-") {
1178 return Err(ContractTestFailure::assertion(
1179 SCENARIO,
1180 format!(
1181 "expected all events from streams matching 'account-*' but found event from {}",
1182 stream_id_str
1183 ),
1184 ));
1185 }
1186 }
1187
1188 Ok(())
1189}
1190
1191pub async fn test_stream_pattern_single_char<F, S>(make_store: F) -> ContractTestResult
1197where
1198 F: Fn() -> S + Send + Sync + Clone + 'static,
1199 S: EventStore + EventReader + Send + Sync + 'static,
1200{
1201 const SCENARIO: &str = "stream_pattern_single_char";
1202
1203 let store = make_store();
1204 let run = Uuid::now_v7();
1205
1206 let account_x = StreamId::try_new(format!("account-x{run}")).map_err(|e| {
1208 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1209 })?;
1210 let account_xy = StreamId::try_new(format!("account-xy{run}")).map_err(|e| {
1211 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1212 })?;
1213 let order = StreamId::try_new(format!("order-z{run}")).map_err(|e| {
1214 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1215 })?;
1216
1217 let mut writes = register_contract_stream(
1218 SCENARIO,
1219 StreamWrites::new(),
1220 &account_x,
1221 StreamVersion::new(0),
1222 )?;
1223 writes = register_contract_stream(SCENARIO, writes, &account_xy, StreamVersion::new(0))?;
1224 writes = register_contract_stream(SCENARIO, writes, &order, StreamVersion::new(0))?;
1225
1226 writes = append_contract_event(SCENARIO, writes, &account_x)?;
1227 writes = append_contract_event(SCENARIO, writes, &account_xy)?;
1228 writes = append_contract_event(SCENARIO, writes, &order)?;
1229
1230 let _ = store
1231 .append_events(writes)
1232 .await
1233 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1234
1235 let pattern = StreamPattern::try_new(format!("account-?{run}")).map_err(|e| {
1238 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream pattern: {}", e))
1239 })?;
1240 let filter = EventFilter::pattern(pattern);
1241 let page = EventPage::first(BatchSize::new(100));
1242 let events = store
1243 .read_events::<ContractTestEvent>(filter, page)
1244 .await
1245 .map_err(|_error| {
1246 ContractTestFailure::assertion(
1247 SCENARIO,
1248 "read_events failed with stream pattern filter",
1249 )
1250 })?;
1251
1252 if events.len() != 1 {
1255 return Err(ContractTestFailure::assertion(
1256 SCENARIO,
1257 format!(
1258 "expected exactly 1 event matching glob 'account-?{run}' but got {}",
1259 events.len()
1260 ),
1261 ));
1262 }
1263
1264 let (event, _) = &events[0];
1265 let stream_id_str = event.stream_id().as_ref();
1266 if stream_id_str != account_x.as_ref() {
1267 return Err(ContractTestFailure::assertion(
1268 SCENARIO,
1269 format!(
1270 "expected the single-character match {} but got {}",
1271 account_x.as_ref(),
1272 stream_id_str
1273 ),
1274 ));
1275 }
1276
1277 Ok(())
1278}
1279
1280pub async fn test_stream_pattern_char_class<F, S>(make_store: F) -> ContractTestResult
1286where
1287 F: Fn() -> S + Send + Sync + Clone + 'static,
1288 S: EventStore + EventReader + Send + Sync + 'static,
1289{
1290 const SCENARIO: &str = "stream_pattern_char_class";
1291
1292 let store = make_store();
1293 let run = Uuid::now_v7().simple().to_string();
1294
1295 let digit_stream = StreamId::try_new(format!("account-7-{run}")).map_err(|e| {
1297 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1298 })?;
1299 let letter_stream = StreamId::try_new(format!("account-a-{run}")).map_err(|e| {
1300 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1301 })?;
1302
1303 let mut writes = register_contract_stream(
1304 SCENARIO,
1305 StreamWrites::new(),
1306 &digit_stream,
1307 StreamVersion::new(0),
1308 )?;
1309 writes = register_contract_stream(SCENARIO, writes, &letter_stream, StreamVersion::new(0))?;
1310
1311 writes = append_contract_event(SCENARIO, writes, &digit_stream)?;
1312 writes = append_contract_event(SCENARIO, writes, &letter_stream)?;
1313
1314 let _ = store
1315 .append_events(writes)
1316 .await
1317 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1318
1319 let pattern = StreamPattern::try_new("account-[0-9]*").map_err(|e| {
1321 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream pattern: {}", e))
1322 })?;
1323 let filter = EventFilter::pattern(pattern);
1324 let page = EventPage::first(BatchSize::new(100));
1325 let events = store
1326 .read_events::<ContractTestEvent>(filter, page)
1327 .await
1328 .map_err(|_error| {
1329 ContractTestFailure::assertion(
1330 SCENARIO,
1331 "read_events failed with stream pattern filter",
1332 )
1333 })?;
1334
1335 if events.len() != 1 {
1337 return Err(ContractTestFailure::assertion(
1338 SCENARIO,
1339 format!(
1340 "expected exactly 1 event matching glob 'account-[0-9]*' but got {}",
1341 events.len()
1342 ),
1343 ));
1344 }
1345
1346 let (event, _) = &events[0];
1347 let stream_id_str = event.stream_id().as_ref();
1348 if stream_id_str != digit_stream.as_ref() {
1349 return Err(ContractTestFailure::assertion(
1350 SCENARIO,
1351 format!(
1352 "expected the digit-prefixed match {} but got {}",
1353 digit_stream.as_ref(),
1354 stream_id_str
1355 ),
1356 ));
1357 }
1358
1359 Ok(())
1360}
1361
1362pub async fn test_batch_limiting<F, S>(make_store: F) -> ContractTestResult
1364where
1365 F: Fn() -> S + Send + Sync + Clone + 'static,
1366 S: EventStore + EventReader + Send + Sync + 'static,
1367{
1368 const SCENARIO: &str = "batch_limiting";
1369
1370 let store = make_store();
1371
1372 let stream = contract_stream_id(SCENARIO, "stream")?;
1374
1375 let mut writes = register_contract_stream(
1376 SCENARIO,
1377 StreamWrites::new(),
1378 &stream,
1379 StreamVersion::new(0),
1380 )?;
1381
1382 for _ in 0..20 {
1384 writes = append_contract_event(SCENARIO, writes, &stream)?;
1385 }
1386
1387 let _ = store
1388 .append_events(writes)
1389 .await
1390 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1391
1392 let filter = EventFilter::all();
1394 let page = EventPage::first(BatchSize::new(10));
1395 let events = store
1396 .read_events::<ContractTestEvent>(filter, page)
1397 .await
1398 .map_err(|_error| {
1399 ContractTestFailure::assertion(SCENARIO, "read_events failed with limit")
1400 })?;
1401
1402 if events.len() != 10 {
1404 return Err(ContractTestFailure::assertion(
1405 SCENARIO,
1406 format!("expected exactly 10 events but got {}", events.len()),
1407 ));
1408 }
1409
1410 Ok(())
1415}
1416
1417pub async fn test_checkpoint_save_and_load<F, CS>(make_checkpoint_store: F) -> ContractTestResult
1423where
1424 F: Fn() -> CS + Send + Sync + Clone + 'static,
1425 CS: CheckpointStore + Send + Sync + 'static,
1426{
1427 const SCENARIO: &str = "checkpoint_save_and_load";
1428
1429 let store = make_checkpoint_store();
1430
1431 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1433 let position = StreamPosition::new(Uuid::now_v7());
1434
1435 store
1437 .save(&subscription_name, position)
1438 .await
1439 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save failed"))?;
1440
1441 let loaded = store
1443 .load(&subscription_name)
1444 .await
1445 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1446
1447 if loaded != Some(position) {
1448 return Err(ContractTestFailure::assertion(
1449 SCENARIO,
1450 format!(
1451 "expected loaded position {:?} but got {:?}",
1452 Some(position),
1453 loaded
1454 ),
1455 ));
1456 }
1457
1458 Ok(())
1459}
1460
1461pub async fn test_checkpoint_update_overwrites<F, CS>(
1463 make_checkpoint_store: F,
1464) -> ContractTestResult
1465where
1466 F: Fn() -> CS + Send + Sync + Clone + 'static,
1467 CS: CheckpointStore + Send + Sync + 'static,
1468{
1469 const SCENARIO: &str = "checkpoint_update_overwrites";
1470
1471 let store = make_checkpoint_store();
1472
1473 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1475 let first_position = StreamPosition::new(Uuid::now_v7());
1476
1477 store
1478 .save(&subscription_name, first_position)
1479 .await
1480 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "first save failed"))?;
1481
1482 let second_position = StreamPosition::new(Uuid::now_v7());
1484 store
1485 .save(&subscription_name, second_position)
1486 .await
1487 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "second save failed"))?;
1488
1489 let loaded = store
1491 .load(&subscription_name)
1492 .await
1493 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1494
1495 if loaded != Some(second_position) {
1496 return Err(ContractTestFailure::assertion(
1497 SCENARIO,
1498 format!(
1499 "expected updated position {:?} but got {:?}",
1500 Some(second_position),
1501 loaded
1502 ),
1503 ));
1504 }
1505
1506 Ok(())
1507}
1508
1509pub async fn test_checkpoint_load_missing_returns_none<F, CS>(
1511 make_checkpoint_store: F,
1512) -> ContractTestResult
1513where
1514 F: Fn() -> CS + Send + Sync + Clone + 'static,
1515 CS: CheckpointStore + Send + Sync + 'static,
1516{
1517 const SCENARIO: &str = "checkpoint_load_missing_returns_none";
1518
1519 let store = make_checkpoint_store();
1520
1521 let subscription_name = format!("contract::{}::ghost::{}", SCENARIO, Uuid::now_v7());
1523
1524 let loaded = store
1526 .load(&subscription_name)
1527 .await
1528 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1529
1530 if loaded.is_some() {
1532 return Err(ContractTestFailure::assertion(
1533 SCENARIO,
1534 format!("expected None for missing checkpoint but got {:?}", loaded),
1535 ));
1536 }
1537
1538 Ok(())
1539}
1540
1541pub async fn test_checkpoint_independent_subscriptions<F, CS>(
1543 make_checkpoint_store: F,
1544) -> ContractTestResult
1545where
1546 F: Fn() -> CS + Send + Sync + Clone + 'static,
1547 CS: CheckpointStore + Send + Sync + 'static,
1548{
1549 const SCENARIO: &str = "checkpoint_independent_subscriptions";
1550
1551 let store = make_checkpoint_store();
1552
1553 let subscription_a = format!("contract::{}::sub-a::{}", SCENARIO, Uuid::now_v7());
1555 let subscription_b = format!("contract::{}::sub-b::{}", SCENARIO, Uuid::now_v7());
1556
1557 let position_a = StreamPosition::new(Uuid::now_v7());
1558 let position_b = StreamPosition::new(Uuid::now_v7());
1559
1560 store
1562 .save(&subscription_a, position_a)
1563 .await
1564 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save A failed"))?;
1565
1566 store
1567 .save(&subscription_b, position_b)
1568 .await
1569 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save B failed"))?;
1570
1571 let loaded_a = store
1573 .load(&subscription_a)
1574 .await
1575 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load A failed"))?;
1576
1577 let loaded_b = store
1578 .load(&subscription_b)
1579 .await
1580 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load B failed"))?;
1581
1582 if loaded_a != Some(position_a) {
1583 return Err(ContractTestFailure::assertion(
1584 SCENARIO,
1585 format!(
1586 "subscription A: expected {:?} but got {:?}",
1587 Some(position_a),
1588 loaded_a
1589 ),
1590 ));
1591 }
1592
1593 if loaded_b != Some(position_b) {
1594 return Err(ContractTestFailure::assertion(
1595 SCENARIO,
1596 format!(
1597 "subscription B: expected {:?} but got {:?}",
1598 Some(position_b),
1599 loaded_b
1600 ),
1601 ));
1602 }
1603
1604 Ok(())
1605}
1606
1607pub async fn test_coordination_acquire_leadership<F, C>(make_coordinator: F) -> ContractTestResult
1616where
1617 F: Fn() -> C + Send + Sync + Clone + 'static,
1618 C: ProjectorCoordinator + Send + Sync + 'static,
1619{
1620 const SCENARIO: &str = "coordination_acquire_leadership";
1621
1622 let coordinator = make_coordinator();
1623
1624 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1626
1627 let result = coordinator.try_acquire(&subscription_name).await;
1629
1630 if result.is_err() {
1632 return Err(ContractTestFailure::assertion(
1633 SCENARIO,
1634 "expected first instance to acquire leadership successfully, but try_acquire failed",
1635 ));
1636 }
1637
1638 Ok(())
1639}
1640
1641pub async fn test_coordination_second_instance_blocked<F, C>(
1646 make_coordinator: F,
1647) -> ContractTestResult
1648where
1649 F: Fn() -> C + Send + Sync + Clone + 'static,
1650 C: ProjectorCoordinator + Send + Sync + 'static,
1651{
1652 const SCENARIO: &str = "coordination_second_instance_blocked";
1653
1654 let coordinator = make_coordinator();
1655
1656 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1658
1659 let _first_guard = coordinator
1661 .try_acquire(&subscription_name)
1662 .await
1663 .map_err(|_| {
1664 ContractTestFailure::assertion(SCENARIO, "first instance failed to acquire leadership")
1665 })?;
1666
1667 let second_result = coordinator.try_acquire(&subscription_name).await;
1669
1670 if second_result.is_ok() {
1672 return Err(ContractTestFailure::assertion(
1673 SCENARIO,
1674 "expected second instance to be blocked but try_acquire succeeded",
1675 ));
1676 }
1677
1678 Ok(())
1679}
1680
1681pub async fn test_coordination_independent_subscriptions<F, C>(
1687 make_coordinator: F,
1688) -> ContractTestResult
1689where
1690 F: Fn() -> C + Send + Sync + Clone + 'static,
1691 C: ProjectorCoordinator + Send + Sync + 'static,
1692{
1693 const SCENARIO: &str = "coordination_independent_subscriptions";
1694
1695 let coordinator = make_coordinator();
1696
1697 let subscription_a = format!("contract::{}::projector-A::{}", SCENARIO, Uuid::now_v7());
1699 let subscription_b = format!("contract::{}::projector-B::{}", SCENARIO, Uuid::now_v7());
1700
1701 let _guard_a = coordinator
1703 .try_acquire(&subscription_a)
1704 .await
1705 .map_err(|_| {
1706 ContractTestFailure::assertion(
1707 SCENARIO,
1708 "projector-A failed to acquire leadership for its subscription",
1709 )
1710 })?;
1711
1712 let result_b = coordinator.try_acquire(&subscription_b).await;
1715
1716 if result_b.is_err() {
1718 return Err(ContractTestFailure::assertion(
1719 SCENARIO,
1720 "expected projector-B to acquire leadership for its own subscription, but try_acquire failed - different projectors should have independent coordination",
1721 ));
1722 }
1723
1724 Ok(())
1725}
1726
1727pub async fn test_coordination_leadership_released_on_guard_drop<F, C>(
1733 make_coordinator: F,
1734) -> ContractTestResult
1735where
1736 F: Fn() -> C + Send + Sync + Clone + 'static,
1737 C: ProjectorCoordinator + Send + Sync + 'static,
1738{
1739 const SCENARIO: &str = "coordination_leadership_released_on_guard_drop";
1740
1741 let coordinator = make_coordinator();
1742
1743 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1745
1746 {
1748 let _first_guard = coordinator
1749 .try_acquire(&subscription_name)
1750 .await
1751 .map_err(|_| {
1752 ContractTestFailure::assertion(
1753 SCENARIO,
1754 "first instance failed to acquire leadership",
1755 )
1756 })?;
1757 }
1759
1760 let second_result = coordinator.try_acquire(&subscription_name).await;
1762
1763 if second_result.is_err() {
1765 return Err(ContractTestFailure::assertion(
1766 SCENARIO,
1767 "expected second instance to acquire leadership after first guard dropped, but try_acquire failed - leadership should be released when guard is dropped",
1768 ));
1769 }
1770
1771 Ok(())
1772}