1use eventcore_types::{
2 BatchSize, CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore,
3 EventStoreError, ProjectorCoordinator, StreamId, StreamPattern, StreamPosition, StreamPrefix,
4 StreamVersion, StreamWrites, collect_events,
5};
6
7use serde::{Deserialize, Serialize};
8use uuid::Uuid;
9
10#[derive(Debug, thiserror::Error)]
11#[error("[{scenario}] {detail}")]
12pub struct ContractTestFailure {
13 scenario: &'static str,
14 detail: String,
15}
16
17impl ContractTestFailure {
18 fn new(scenario: &'static str, detail: impl Into<String>) -> Self {
19 Self {
20 scenario,
21 detail: detail.into(),
22 }
23 }
24
25 fn builder_error(scenario: &'static str, phase: &'static str, error: EventStoreError) -> Self {
26 Self::new(scenario, format!("builder failure during {phase}: {error}"))
27 }
28
29 fn store_error(
30 scenario: &'static str,
31 operation: &'static str,
32 error: EventStoreError,
33 ) -> Self {
34 Self::new(
35 scenario,
36 format!("{operation} operation returned unexpected error: {error}"),
37 )
38 }
39
40 fn assertion(scenario: &'static str, detail: impl Into<String>) -> Self {
41 Self::new(scenario, detail)
42 }
43}
44
45pub type ContractTestResult = Result<(), ContractTestFailure>;
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct ContractTestEvent {
49 stream_id: StreamId,
50}
51
52impl ContractTestEvent {
53 pub fn new(stream_id: StreamId) -> Self {
54 Self { stream_id }
55 }
56}
57
58impl Event for ContractTestEvent {
59 fn stream_id(&self) -> &StreamId {
60 &self.stream_id
61 }
62
63 fn event_type_name() -> &'static str {
64 "ContractTestEvent"
65 }
66}
67
68fn contract_stream_id(
69 scenario: &'static str,
70 label: &str,
71) -> Result<StreamId, ContractTestFailure> {
72 let raw = format!("contract::{}::{}::{}", scenario, label, Uuid::now_v7());
74
75 StreamId::try_new(raw.clone()).map_err(|error| {
76 ContractTestFailure::assertion(
77 scenario,
78 format!("unable to construct stream id `{}`: {}", raw, error),
79 )
80 })
81}
82
83fn builder_step(
84 scenario: &'static str,
85 phase: &'static str,
86 result: Result<StreamWrites, EventStoreError>,
87) -> Result<StreamWrites, ContractTestFailure> {
88 result.map_err(|error| ContractTestFailure::builder_error(scenario, phase, error))
89}
90
91fn register_contract_stream(
92 scenario: &'static str,
93 writes: StreamWrites,
94 stream_id: &StreamId,
95 expected_version: StreamVersion,
96) -> Result<StreamWrites, ContractTestFailure> {
97 builder_step(
98 scenario,
99 "register_stream",
100 writes.register_stream(stream_id.clone(), expected_version),
101 )
102}
103
104fn append_contract_event(
105 scenario: &'static str,
106 writes: StreamWrites,
107 stream_id: &StreamId,
108) -> Result<StreamWrites, ContractTestFailure> {
109 let event = ContractTestEvent::new(stream_id.clone());
110 builder_step(scenario, "append", writes.append(event))
111}
112
113pub async fn test_basic_read_write<F, S>(make_store: F) -> ContractTestResult
114where
115 F: Fn() -> S + Send + Sync + Clone + 'static,
116 S: EventStore + Send + Sync + 'static,
117{
118 const SCENARIO: &str = "basic_read_write";
119
120 let store = make_store();
121 let stream_id = contract_stream_id(SCENARIO, "single");
122
123 let stream_id = stream_id?;
124
125 let writes = register_contract_stream(
126 SCENARIO,
127 StreamWrites::new(),
128 &stream_id,
129 StreamVersion::new(0),
130 )?;
131 let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
132
133 let _ = store
134 .append_events(writes)
135 .await
136 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
137
138 let stream = store
139 .read_stream::<ContractTestEvent>(stream_id.clone())
140 .await
141 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
142 let events = collect_events(stream)
143 .await
144 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
145
146 let len = events.len();
147 let empty = events.is_empty();
148
149 if empty {
150 return Err(ContractTestFailure::assertion(
151 SCENARIO,
152 "expected stream to contain events but it was empty",
153 ));
154 }
155
156 if len != 1 {
157 return Err(ContractTestFailure::assertion(
158 SCENARIO,
159 format!(
160 "expected stream to contain exactly one event, observed len={}",
161 len
162 ),
163 ));
164 }
165
166 Ok(())
167}
168
169pub async fn test_concurrent_version_conflicts<F, S>(make_store: F) -> ContractTestResult
170where
171 F: Fn() -> S + Send + Sync + Clone + 'static,
172 S: EventStore + Send + Sync + 'static,
173{
174 const SCENARIO: &str = "concurrent_version_conflicts";
175
176 let store = make_store();
177 let stream_id = contract_stream_id(SCENARIO, "shared")?;
178
179 let first_writes = register_contract_stream(
180 SCENARIO,
181 StreamWrites::new(),
182 &stream_id,
183 StreamVersion::new(0),
184 )?;
185 let first_writes = append_contract_event(SCENARIO, first_writes, &stream_id)?;
186
187 let _ = store
188 .append_events(first_writes)
189 .await
190 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
191
192 let conflicting_writes = register_contract_stream(
193 SCENARIO,
194 StreamWrites::new(),
195 &stream_id,
196 StreamVersion::new(0),
197 )?;
198 let conflicting_writes = append_contract_event(SCENARIO, conflicting_writes, &stream_id)?;
199
200 match store.append_events(conflicting_writes).await {
201 Err(EventStoreError::VersionConflict { .. }) => Ok(()),
202 Err(error) => Err(ContractTestFailure::store_error(
203 SCENARIO,
204 "append_events",
205 error,
206 )),
207 Ok(_) => Err(ContractTestFailure::assertion(
208 SCENARIO,
209 "expected version conflict but append succeeded",
210 )),
211 }
212}
213
214pub async fn test_stream_isolation<F, S>(make_store: F) -> ContractTestResult
215where
216 F: Fn() -> S + Send + Sync + Clone + 'static,
217 S: EventStore + Send + Sync + 'static,
218{
219 const SCENARIO: &str = "stream_isolation";
220
221 let store = make_store();
222 let left_stream = contract_stream_id(SCENARIO, "left")?;
223 let right_stream = contract_stream_id(SCENARIO, "right")?;
224
225 let writes = register_contract_stream(
226 SCENARIO,
227 StreamWrites::new(),
228 &left_stream,
229 StreamVersion::new(0),
230 )?;
231 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
232 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
233 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
234
235 let _ = store
236 .append_events(writes)
237 .await
238 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
239
240 let left_stream_events = store
241 .read_stream::<ContractTestEvent>(left_stream.clone())
242 .await
243 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
244 let left_events = collect_events(left_stream_events)
245 .await
246 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
247
248 let right_stream_events = store
249 .read_stream::<ContractTestEvent>(right_stream.clone())
250 .await
251 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
252 let right_events = collect_events(right_stream_events)
253 .await
254 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
255
256 let left_len = left_events.len();
257 if left_len != 1 {
258 return Err(ContractTestFailure::assertion(
259 SCENARIO,
260 format!(
261 "left stream expected exactly one event but observed {}",
262 left_len
263 ),
264 ));
265 }
266
267 if left_events
268 .iter()
269 .any(|event| event.stream_id() != &left_stream)
270 {
271 return Err(ContractTestFailure::assertion(
272 SCENARIO,
273 "left stream read events belonging to another stream",
274 ));
275 }
276
277 let right_len = right_events.len();
278 if right_len != 1 {
279 return Err(ContractTestFailure::assertion(
280 SCENARIO,
281 format!(
282 "right stream expected exactly one event but observed {}",
283 right_len
284 ),
285 ));
286 }
287
288 if right_events
289 .iter()
290 .any(|event| event.stream_id() != &right_stream)
291 {
292 return Err(ContractTestFailure::assertion(
293 SCENARIO,
294 "right stream read events belonging to another stream",
295 ));
296 }
297
298 Ok(())
299}
300
301pub async fn test_missing_stream_reads<F, S>(make_store: F) -> ContractTestResult
302where
303 F: Fn() -> S + Send + Sync + Clone + 'static,
304 S: EventStore + Send + Sync + 'static,
305{
306 const SCENARIO: &str = "missing_stream_reads";
307
308 let store = make_store();
309 let stream_id = contract_stream_id(SCENARIO, "ghost")?;
310
311 let stream = store
312 .read_stream::<ContractTestEvent>(stream_id.clone())
313 .await
314 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
315 let events = collect_events(stream)
316 .await
317 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
318
319 if !events.is_empty() {
320 return Err(ContractTestFailure::assertion(
321 SCENARIO,
322 "expected read_stream to succeed with no events for an untouched stream",
323 ));
324 }
325
326 Ok(())
327}
328
329pub async fn test_conflict_preserves_atomicity<F, S>(make_store: F) -> ContractTestResult
330where
331 F: Fn() -> S + Send + Sync + Clone + 'static,
332 S: EventStore + Send + Sync + 'static,
333{
334 const SCENARIO: &str = "conflict_preserves_atomicity";
335
336 let store = make_store();
337 let left_stream = contract_stream_id(SCENARIO, "left")?;
338 let right_stream = contract_stream_id(SCENARIO, "right")?;
339
340 let writes = register_contract_stream(
342 SCENARIO,
343 StreamWrites::new(),
344 &left_stream,
345 StreamVersion::new(0),
346 )?;
347 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
348 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
349 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
350
351 let _ = store
352 .append_events(writes)
353 .await
354 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
355
356 let writes = register_contract_stream(
358 SCENARIO,
359 StreamWrites::new(),
360 &left_stream,
361 StreamVersion::new(0),
362 )?;
363 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(1))?;
364 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
365 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
366
367 match store.append_events(writes).await {
368 Err(EventStoreError::VersionConflict { .. }) => {
369 let left_stream_events = store
370 .read_stream::<ContractTestEvent>(left_stream.clone())
371 .await
372 .map_err(|error| {
373 ContractTestFailure::store_error(SCENARIO, "read_stream", error)
374 })?;
375 let left_events = collect_events(left_stream_events).await.map_err(|error| {
376 ContractTestFailure::store_error(SCENARIO, "read_stream", error)
377 })?;
378 if left_events.len() != 1 {
379 return Err(ContractTestFailure::assertion(
380 SCENARIO,
381 format!(
382 "expected left stream to remain at len=1 after failed append, observed {}",
383 left_events.len()
384 ),
385 ));
386 }
387
388 let right_stream_events = store
389 .read_stream::<ContractTestEvent>(right_stream.clone())
390 .await
391 .map_err(|error| {
392 ContractTestFailure::store_error(SCENARIO, "read_stream", error)
393 })?;
394 let right_events = collect_events(right_stream_events).await.map_err(|error| {
395 ContractTestFailure::store_error(SCENARIO, "read_stream", error)
396 })?;
397 if right_events.len() != 1 {
398 return Err(ContractTestFailure::assertion(
399 SCENARIO,
400 format!(
401 "expected right stream to remain at len=1 after failed append, observed {}",
402 right_events.len()
403 ),
404 ));
405 }
406
407 Ok(())
408 }
409 Err(error) => Err(ContractTestFailure::store_error(
410 SCENARIO,
411 "append_events",
412 error,
413 )),
414 Ok(_) => Err(ContractTestFailure::assertion(
415 SCENARIO,
416 "expected version conflict but append succeeded",
417 )),
418 }
419}
420
421#[derive(Debug, Clone, Serialize, Deserialize)]
423pub struct MismatchedEvent {
424 stream_id: StreamId,
425 extra_field: String,
426}
427
428impl Event for MismatchedEvent {
429 fn stream_id(&self) -> &StreamId {
430 &self.stream_id
431 }
432
433 fn event_type_name() -> &'static str {
434 "MismatchedEvent"
435 }
436}
437
438pub async fn test_read_stream_errors_on_type_mismatch<F, S>(make_store: F) -> ContractTestResult
446where
447 F: Fn() -> S + Send + Sync + Clone + 'static,
448 S: EventStore + Send + Sync + 'static,
449{
450 const SCENARIO: &str = "read_stream_errors_on_type_mismatch";
451
452 let store = make_store();
453 let stream_id = contract_stream_id(SCENARIO, "mismatched")?;
454
455 let writes = register_contract_stream(
457 SCENARIO,
458 StreamWrites::new(),
459 &stream_id,
460 StreamVersion::new(0),
461 )?;
462 let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
463
464 let _ = store
465 .append_events(writes)
466 .await
467 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
468
469 let result = match store.read_stream::<MismatchedEvent>(stream_id).await {
475 Ok(stream) => collect_events(stream).await,
476 Err(error) => Err(error),
477 };
478
479 match result {
480 Err(EventStoreError::DeserializationFailed { .. }) => Ok(()),
481 Err(other) => Err(ContractTestFailure::assertion(
482 SCENARIO,
483 format!(
484 "expected DeserializationFailed error, got different error: {}",
485 other
486 ),
487 )),
488 Ok(events) if events.is_empty() => Err(ContractTestFailure::assertion(
489 SCENARIO,
490 "read_stream silently returned empty results instead of erroring on type mismatch",
491 )),
492 Ok(events) => Err(ContractTestFailure::assertion(
493 SCENARIO,
494 format!(
495 "read_stream returned {} events instead of erroring on type mismatch",
496 events.len()
497 ),
498 )),
499 }
500}
501
502#[macro_export]
527macro_rules! backend_contract_tests {
528 (suite = $suite:ident, make_store = $make_store:expr, make_checkpoint_store = $make_checkpoint_store:expr, make_coordinator = $make_coordinator:expr $(,)?) => {
529 #[allow(non_snake_case)]
530 mod $suite {
531 use $crate::contract::{
532 test_basic_read_write, test_batch_limiting,
533 test_checkpoint_independent_subscriptions,
534 test_checkpoint_load_missing_returns_none, test_checkpoint_save_and_load,
535 test_checkpoint_update_overwrites, test_concurrent_version_conflicts,
536 test_conflict_preserves_atomicity, test_coordination_acquire_leadership,
537 test_coordination_independent_subscriptions,
538 test_coordination_leadership_released_on_guard_drop,
539 test_coordination_second_instance_blocked, test_event_ordering_across_streams,
540 test_missing_stream_reads, test_position_based_resumption,
541 test_read_stream_errors_on_type_mismatch, test_stream_isolation,
542 test_stream_pattern_char_class, test_stream_pattern_filtering,
543 test_stream_pattern_single_char, test_stream_prefix_filtering,
544 test_stream_prefix_requires_prefix_match,
545 };
546
547 #[tokio::test(flavor = "multi_thread")]
548 async fn basic_read_write_contract() {
549 test_basic_read_write($make_store)
550 .await
551 .expect("event store contract failed");
552 }
553
554 #[tokio::test(flavor = "multi_thread")]
555 async fn concurrent_version_conflicts_contract() {
556 test_concurrent_version_conflicts($make_store)
557 .await
558 .expect("event store contract failed");
559 }
560
561 #[tokio::test(flavor = "multi_thread")]
562 async fn stream_isolation_contract() {
563 test_stream_isolation($make_store)
564 .await
565 .expect("event store contract failed");
566 }
567
568 #[tokio::test(flavor = "multi_thread")]
569 async fn missing_stream_reads_contract() {
570 test_missing_stream_reads($make_store)
571 .await
572 .expect("event store contract failed");
573 }
574
575 #[tokio::test(flavor = "multi_thread")]
576 async fn conflict_preserves_atomicity_contract() {
577 test_conflict_preserves_atomicity($make_store)
578 .await
579 .expect("event store contract failed");
580 }
581
582 #[tokio::test(flavor = "multi_thread")]
583 async fn read_stream_errors_on_type_mismatch_contract() {
584 test_read_stream_errors_on_type_mismatch($make_store)
585 .await
586 .expect("event store contract failed");
587 }
588
589 #[tokio::test(flavor = "multi_thread")]
590 async fn event_ordering_across_streams_contract() {
591 test_event_ordering_across_streams($make_store)
592 .await
593 .expect("event reader contract failed");
594 }
595
596 #[tokio::test(flavor = "multi_thread")]
597 async fn position_based_resumption_contract() {
598 test_position_based_resumption($make_store)
599 .await
600 .expect("event reader contract failed");
601 }
602
603 #[tokio::test(flavor = "multi_thread")]
604 async fn stream_prefix_filtering_contract() {
605 test_stream_prefix_filtering($make_store)
606 .await
607 .expect("event reader contract failed");
608 }
609
610 #[tokio::test(flavor = "multi_thread")]
611 async fn stream_prefix_requires_prefix_match_contract() {
612 test_stream_prefix_requires_prefix_match($make_store)
613 .await
614 .expect("event reader contract failed");
615 }
616
617 #[tokio::test(flavor = "multi_thread")]
618 async fn stream_pattern_filtering_contract() {
619 test_stream_pattern_filtering($make_store)
620 .await
621 .expect("event reader contract failed");
622 }
623
624 #[tokio::test(flavor = "multi_thread")]
625 async fn stream_pattern_single_char_contract() {
626 test_stream_pattern_single_char($make_store)
627 .await
628 .expect("event reader contract failed");
629 }
630
631 #[tokio::test(flavor = "multi_thread")]
632 async fn stream_pattern_char_class_contract() {
633 test_stream_pattern_char_class($make_store)
634 .await
635 .expect("event reader contract failed");
636 }
637
638 #[tokio::test(flavor = "multi_thread")]
639 async fn batch_limiting_contract() {
640 test_batch_limiting($make_store)
641 .await
642 .expect("event reader contract failed");
643 }
644
645 #[tokio::test(flavor = "multi_thread")]
647 async fn checkpoint_save_and_load_contract() {
648 test_checkpoint_save_and_load($make_checkpoint_store)
649 .await
650 .expect("checkpoint store contract failed");
651 }
652
653 #[tokio::test(flavor = "multi_thread")]
654 async fn checkpoint_update_overwrites_contract() {
655 test_checkpoint_update_overwrites($make_checkpoint_store)
656 .await
657 .expect("checkpoint store contract failed");
658 }
659
660 #[tokio::test(flavor = "multi_thread")]
661 async fn checkpoint_load_missing_returns_none_contract() {
662 test_checkpoint_load_missing_returns_none($make_checkpoint_store)
663 .await
664 .expect("checkpoint store contract failed");
665 }
666
667 #[tokio::test(flavor = "multi_thread")]
668 async fn checkpoint_independent_subscriptions_contract() {
669 test_checkpoint_independent_subscriptions($make_checkpoint_store)
670 .await
671 .expect("checkpoint store contract failed");
672 }
673
674 #[tokio::test(flavor = "multi_thread")]
676 async fn coordination_acquire_leadership_contract() {
677 test_coordination_acquire_leadership($make_coordinator)
678 .await
679 .expect("coordinator contract failed");
680 }
681
682 #[tokio::test(flavor = "multi_thread")]
683 async fn coordination_second_instance_blocked_contract() {
684 test_coordination_second_instance_blocked($make_coordinator)
685 .await
686 .expect("coordinator contract failed");
687 }
688
689 #[tokio::test(flavor = "multi_thread")]
690 async fn coordination_independent_subscriptions_contract() {
691 test_coordination_independent_subscriptions($make_coordinator)
692 .await
693 .expect("coordinator contract failed");
694 }
695
696 #[tokio::test(flavor = "multi_thread")]
697 async fn coordination_leadership_released_on_guard_drop_contract() {
698 test_coordination_leadership_released_on_guard_drop($make_coordinator)
699 .await
700 .expect("coordinator contract failed");
701 }
702 }
703 };
704}
705
706pub use backend_contract_tests;
707
708pub async fn test_event_ordering_across_streams<F, S>(make_store: F) -> ContractTestResult
710where
711 F: Fn() -> S + Send + Sync + Clone + 'static,
712 S: EventStore + EventReader + Send + Sync + 'static,
713{
714 const SCENARIO: &str = "event_ordering_across_streams";
715
716 let store = make_store();
717
718 let stream_a = contract_stream_id(SCENARIO, "stream-a")?;
720 let stream_b = contract_stream_id(SCENARIO, "stream-b")?;
721 let stream_c = contract_stream_id(SCENARIO, "stream-c")?;
722
723 let writes = register_contract_stream(
725 SCENARIO,
726 StreamWrites::new(),
727 &stream_a,
728 StreamVersion::new(0),
729 )?;
730 let writes = append_contract_event(SCENARIO, writes, &stream_a)?;
731 let _ = store
732 .append_events(writes)
733 .await
734 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
735
736 let writes = register_contract_stream(
738 SCENARIO,
739 StreamWrites::new(),
740 &stream_b,
741 StreamVersion::new(0),
742 )?;
743 let writes = append_contract_event(SCENARIO, writes, &stream_b)?;
744 let _ = store
745 .append_events(writes)
746 .await
747 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
748
749 let writes = register_contract_stream(
751 SCENARIO,
752 StreamWrites::new(),
753 &stream_c,
754 StreamVersion::new(0),
755 )?;
756 let writes = append_contract_event(SCENARIO, writes, &stream_c)?;
757 let _ = store
758 .append_events(writes)
759 .await
760 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
761
762 let filter = EventFilter::all();
764 let page = EventPage::first(BatchSize::new(100));
765 let events = store
766 .read_events::<ContractTestEvent>(filter, page)
767 .await
768 .map_err(|_error| {
769 ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
770 })?;
771
772 if events.len() != 3 {
774 return Err(ContractTestFailure::assertion(
775 SCENARIO,
776 format!("expected 3 events but got {}", events.len()),
777 ));
778 }
779
780 let (first_event, _) = &events[0];
782 if first_event.stream_id() != &stream_a {
783 return Err(ContractTestFailure::assertion(
784 SCENARIO,
785 format!(
786 "expected first event from stream_a but got from {:?}",
787 first_event.stream_id()
788 ),
789 ));
790 }
791
792 let (second_event, _) = &events[1];
793 if second_event.stream_id() != &stream_b {
794 return Err(ContractTestFailure::assertion(
795 SCENARIO,
796 format!(
797 "expected second event from stream_b but got from {:?}",
798 second_event.stream_id()
799 ),
800 ));
801 }
802
803 let (third_event, _) = &events[2];
804 if third_event.stream_id() != &stream_c {
805 return Err(ContractTestFailure::assertion(
806 SCENARIO,
807 format!(
808 "expected third event from stream_c but got from {:?}",
809 third_event.stream_id()
810 ),
811 ));
812 }
813
814 Ok(())
815}
816
817pub async fn test_position_based_resumption<F, S>(make_store: F) -> ContractTestResult
819where
820 F: Fn() -> S + Send + Sync + Clone + 'static,
821 S: EventStore + EventReader + Send + Sync + 'static,
822{
823 const SCENARIO: &str = "position_based_resumption";
824
825 let store = make_store();
826
827 let stream = contract_stream_id(SCENARIO, "stream")?;
829
830 let mut writes = register_contract_stream(
831 SCENARIO,
832 StreamWrites::new(),
833 &stream,
834 StreamVersion::new(0),
835 )?;
836
837 for _ in 0..5 {
839 writes = append_contract_event(SCENARIO, writes, &stream)?;
840 }
841
842 let _ = store
843 .append_events(writes)
844 .await
845 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
846
847 let filter = EventFilter::all();
849 let page = EventPage::first(BatchSize::new(100));
850 let all_events = store
851 .read_events::<ContractTestEvent>(filter.clone(), page)
852 .await
853 .map_err(|_error| {
854 ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
855 })?;
856
857 let (_third_event, third_position) = &all_events[2];
858
859 let page_after = EventPage::after(*third_position, BatchSize::new(100));
861 let events_after = store
862 .read_events::<ContractTestEvent>(filter, page_after)
863 .await
864 .map_err(|_error| {
865 ContractTestFailure::assertion(
866 SCENARIO,
867 "read_events failed when reading after position",
868 )
869 })?;
870
871 if events_after.len() != 2 {
873 return Err(ContractTestFailure::assertion(
874 SCENARIO,
875 format!(
876 "expected 2 events after position {} but got {}",
877 third_position,
878 events_after.len()
879 ),
880 ));
881 }
882
883 for (_event, position) in events_after.iter() {
885 if *position == *third_position {
886 return Err(ContractTestFailure::assertion(
887 SCENARIO,
888 format!(
889 "expected position {} to be excluded but it was included in results",
890 third_position
891 ),
892 ));
893 }
894 }
895
896 let (_event1, pos1) = &events_after[0];
898 let (_event2, pos2) = &events_after[1];
899
900 if *pos1 <= *third_position {
901 return Err(ContractTestFailure::assertion(
902 SCENARIO,
903 format!(
904 "expected first returned position to be > {} but got {}",
905 third_position, pos1
906 ),
907 ));
908 }
909
910 if *pos2 <= *pos1 {
911 return Err(ContractTestFailure::assertion(
912 SCENARIO,
913 format!(
914 "expected positions to be in ascending order but {} <= {}",
915 pos2, pos1
916 ),
917 ));
918 }
919
920 Ok(())
921}
922
923pub async fn test_stream_prefix_filtering<F, S>(make_store: F) -> ContractTestResult
925where
926 F: Fn() -> S + Send + Sync + Clone + 'static,
927 S: EventStore + EventReader + Send + Sync + 'static,
928{
929 const SCENARIO: &str = "stream_prefix_filtering";
930
931 let store = make_store();
932
933 let account_1 = StreamId::try_new(format!("account-1-{}", Uuid::now_v7())).map_err(|e| {
935 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
936 })?;
937 let account_2 = StreamId::try_new(format!("account-2-{}", Uuid::now_v7())).map_err(|e| {
938 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
939 })?;
940 let order_1 = StreamId::try_new(format!("order-1-{}", Uuid::now_v7())).map_err(|e| {
941 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
942 })?;
943
944 let mut writes = register_contract_stream(
945 SCENARIO,
946 StreamWrites::new(),
947 &account_1,
948 StreamVersion::new(0),
949 )?;
950 writes = register_contract_stream(SCENARIO, writes, &account_2, StreamVersion::new(0))?;
951 writes = register_contract_stream(SCENARIO, writes, &order_1, StreamVersion::new(0))?;
952
953 writes = append_contract_event(SCENARIO, writes, &account_1)?;
954 writes = append_contract_event(SCENARIO, writes, &account_2)?;
955 writes = append_contract_event(SCENARIO, writes, &order_1)?;
956
957 let _ = store
958 .append_events(writes)
959 .await
960 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
961
962 let prefix = StreamPrefix::try_new("account-").map_err(|e| {
964 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
965 })?;
966 let filter = EventFilter::prefix(prefix);
967 let page = EventPage::first(BatchSize::new(100));
968 let events = store
969 .read_events::<ContractTestEvent>(filter, page)
970 .await
971 .map_err(|_error| {
972 ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
973 })?;
974
975 if events.len() != 2 {
977 return Err(ContractTestFailure::assertion(
978 SCENARIO,
979 format!(
980 "expected 2 events from account-* streams but got {}",
981 events.len()
982 ),
983 ));
984 }
985
986 for (event, _) in events.iter() {
988 let stream_id_str = event.stream_id().as_ref();
989 if !stream_id_str.starts_with("account-") {
990 return Err(ContractTestFailure::assertion(
991 SCENARIO,
992 format!(
993 "expected all events from streams starting with 'account-' but found event from {}",
994 stream_id_str
995 ),
996 ));
997 }
998 }
999
1000 Ok(())
1003}
1004
1005pub async fn test_stream_prefix_requires_prefix_match<F, S>(make_store: F) -> ContractTestResult
1007where
1008 F: Fn() -> S + Send + Sync + Clone + 'static,
1009 S: EventStore + EventReader + Send + Sync + 'static,
1010{
1011 const SCENARIO: &str = "stream_prefix_requires_prefix_match";
1012
1013 let store = make_store();
1014
1015 let account_stream =
1018 StreamId::try_new(format!("account-123-{}", Uuid::now_v7())).map_err(|e| {
1019 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1020 })?;
1021 let my_account_stream = StreamId::try_new(format!("my-account-456-{}", Uuid::now_v7()))
1022 .map_err(|e| {
1023 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1024 })?;
1025 let order_stream = StreamId::try_new(format!("order-789-{}", Uuid::now_v7())).map_err(|e| {
1026 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1027 })?;
1028
1029 let mut writes = register_contract_stream(
1030 SCENARIO,
1031 StreamWrites::new(),
1032 &account_stream,
1033 StreamVersion::new(0),
1034 )?;
1035 writes = register_contract_stream(SCENARIO, writes, &my_account_stream, StreamVersion::new(0))?;
1036 writes = register_contract_stream(SCENARIO, writes, &order_stream, StreamVersion::new(0))?;
1037
1038 writes = append_contract_event(SCENARIO, writes, &account_stream)?;
1039 writes = append_contract_event(SCENARIO, writes, &my_account_stream)?;
1040 writes = append_contract_event(SCENARIO, writes, &order_stream)?;
1041
1042 let _ = store
1043 .append_events(writes)
1044 .await
1045 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1046
1047 let prefix = StreamPrefix::try_new("account-").map_err(|e| {
1049 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
1050 })?;
1051 let filter = EventFilter::prefix(prefix);
1052 let page = EventPage::first(BatchSize::new(100));
1053 let events = store
1054 .read_events::<ContractTestEvent>(filter, page)
1055 .await
1056 .map_err(|_error| {
1057 ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
1058 })?;
1059
1060 if events.len() != 1 {
1062 return Err(ContractTestFailure::assertion(
1063 SCENARIO,
1064 format!(
1065 "expected exactly 1 event from account-* prefix but got {} (bug: implementation uses contains() instead of starts_with())",
1066 events.len()
1067 ),
1068 ));
1069 }
1070
1071 let (event, _) = &events[0];
1073 let stream_id_str = event.stream_id().as_ref();
1074 if !stream_id_str.starts_with("account-123") {
1075 return Err(ContractTestFailure::assertion(
1076 SCENARIO,
1077 format!(
1078 "expected event from stream starting with 'account-123' but got from {}",
1079 stream_id_str
1080 ),
1081 ));
1082 }
1083
1084 if stream_id_str.starts_with("my-account-456") {
1086 return Err(ContractTestFailure::assertion(
1087 SCENARIO,
1088 "BUG EXPOSED: got event from stream starting with 'my-account-456' when filtering for prefix 'account-' - implementation must use prefix matching from the start of the stream ID",
1089 ));
1090 }
1091
1092 Ok(())
1093}
1094
1095pub async fn test_stream_pattern_filtering<F, S>(make_store: F) -> ContractTestResult
1104where
1105 F: Fn() -> S + Send + Sync + Clone + 'static,
1106 S: EventStore + EventReader + Send + Sync + 'static,
1107{
1108 const SCENARIO: &str = "stream_pattern_filtering";
1109
1110 let store = make_store();
1111 let run = Uuid::now_v7();
1112
1113 let mut writes = StreamWrites::new();
1116 let mut order_streams = Vec::new();
1117 for _ in 0..50 {
1118 let order = StreamId::try_new(format!("order-{}", Uuid::now_v7())).map_err(|e| {
1119 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1120 })?;
1121 writes = register_contract_stream(SCENARIO, writes, &order, StreamVersion::new(0))?;
1122 order_streams.push(order);
1123 }
1124
1125 let account_1 = StreamId::try_new(format!("account-1-{run}")).map_err(|e| {
1126 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1127 })?;
1128 let account_2 = StreamId::try_new(format!("account-2-{run}")).map_err(|e| {
1129 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1130 })?;
1131 writes = register_contract_stream(SCENARIO, writes, &account_1, StreamVersion::new(0))?;
1132 writes = register_contract_stream(SCENARIO, writes, &account_2, StreamVersion::new(0))?;
1133
1134 for order in &order_streams {
1135 writes = append_contract_event(SCENARIO, writes, order)?;
1136 }
1137 writes = append_contract_event(SCENARIO, writes, &account_1)?;
1138 writes = append_contract_event(SCENARIO, writes, &account_2)?;
1139
1140 let _ = store
1141 .append_events(writes)
1142 .await
1143 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1144
1145 let pattern = StreamPattern::try_new("account-*").map_err(|e| {
1148 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream pattern: {}", e))
1149 })?;
1150 let filter = EventFilter::pattern(pattern);
1151 let page = EventPage::first(BatchSize::new(10));
1152 let events = store
1153 .read_events::<ContractTestEvent>(filter, page)
1154 .await
1155 .map_err(|_error| {
1156 ContractTestFailure::assertion(
1157 SCENARIO,
1158 "read_events failed with stream pattern filter",
1159 )
1160 })?;
1161
1162 if events.len() != 2 {
1165 return Err(ContractTestFailure::assertion(
1166 SCENARIO,
1167 format!(
1168 "expected exactly 2 events matching glob 'account-*' but got {} (pattern filter must be applied before the page limit)",
1169 events.len()
1170 ),
1171 ));
1172 }
1173
1174 for (event, _) in events.iter() {
1175 let stream_id_str = event.stream_id().as_ref();
1176 if !stream_id_str.starts_with("account-") {
1177 return Err(ContractTestFailure::assertion(
1178 SCENARIO,
1179 format!(
1180 "expected all events from streams matching 'account-*' but found event from {}",
1181 stream_id_str
1182 ),
1183 ));
1184 }
1185 }
1186
1187 Ok(())
1188}
1189
1190pub async fn test_stream_pattern_single_char<F, S>(make_store: F) -> ContractTestResult
1196where
1197 F: Fn() -> S + Send + Sync + Clone + 'static,
1198 S: EventStore + EventReader + Send + Sync + 'static,
1199{
1200 const SCENARIO: &str = "stream_pattern_single_char";
1201
1202 let store = make_store();
1203 let run = Uuid::now_v7();
1204
1205 let account_x = StreamId::try_new(format!("account-x{run}")).map_err(|e| {
1207 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1208 })?;
1209 let account_xy = StreamId::try_new(format!("account-xy{run}")).map_err(|e| {
1210 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1211 })?;
1212 let order = StreamId::try_new(format!("order-z{run}")).map_err(|e| {
1213 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1214 })?;
1215
1216 let mut writes = register_contract_stream(
1217 SCENARIO,
1218 StreamWrites::new(),
1219 &account_x,
1220 StreamVersion::new(0),
1221 )?;
1222 writes = register_contract_stream(SCENARIO, writes, &account_xy, StreamVersion::new(0))?;
1223 writes = register_contract_stream(SCENARIO, writes, &order, StreamVersion::new(0))?;
1224
1225 writes = append_contract_event(SCENARIO, writes, &account_x)?;
1226 writes = append_contract_event(SCENARIO, writes, &account_xy)?;
1227 writes = append_contract_event(SCENARIO, writes, &order)?;
1228
1229 let _ = store
1230 .append_events(writes)
1231 .await
1232 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1233
1234 let pattern = StreamPattern::try_new(format!("account-?{run}")).map_err(|e| {
1237 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream pattern: {}", e))
1238 })?;
1239 let filter = EventFilter::pattern(pattern);
1240 let page = EventPage::first(BatchSize::new(100));
1241 let events = store
1242 .read_events::<ContractTestEvent>(filter, page)
1243 .await
1244 .map_err(|_error| {
1245 ContractTestFailure::assertion(
1246 SCENARIO,
1247 "read_events failed with stream pattern filter",
1248 )
1249 })?;
1250
1251 if events.len() != 1 {
1254 return Err(ContractTestFailure::assertion(
1255 SCENARIO,
1256 format!(
1257 "expected exactly 1 event matching glob 'account-?{run}' but got {}",
1258 events.len()
1259 ),
1260 ));
1261 }
1262
1263 let (event, _) = &events[0];
1264 let stream_id_str = event.stream_id().as_ref();
1265 if stream_id_str != account_x.as_ref() {
1266 return Err(ContractTestFailure::assertion(
1267 SCENARIO,
1268 format!(
1269 "expected the single-character match {} but got {}",
1270 account_x.as_ref(),
1271 stream_id_str
1272 ),
1273 ));
1274 }
1275
1276 Ok(())
1277}
1278
1279pub async fn test_stream_pattern_char_class<F, S>(make_store: F) -> ContractTestResult
1285where
1286 F: Fn() -> S + Send + Sync + Clone + 'static,
1287 S: EventStore + EventReader + Send + Sync + 'static,
1288{
1289 const SCENARIO: &str = "stream_pattern_char_class";
1290
1291 let store = make_store();
1292 let run = Uuid::now_v7().simple().to_string();
1293
1294 let digit_stream = StreamId::try_new(format!("account-7-{run}")).map_err(|e| {
1296 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1297 })?;
1298 let letter_stream = StreamId::try_new(format!("account-a-{run}")).map_err(|e| {
1299 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1300 })?;
1301
1302 let mut writes = register_contract_stream(
1303 SCENARIO,
1304 StreamWrites::new(),
1305 &digit_stream,
1306 StreamVersion::new(0),
1307 )?;
1308 writes = register_contract_stream(SCENARIO, writes, &letter_stream, StreamVersion::new(0))?;
1309
1310 writes = append_contract_event(SCENARIO, writes, &digit_stream)?;
1311 writes = append_contract_event(SCENARIO, writes, &letter_stream)?;
1312
1313 let _ = store
1314 .append_events(writes)
1315 .await
1316 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1317
1318 let pattern = StreamPattern::try_new("account-[0-9]*").map_err(|e| {
1320 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream pattern: {}", e))
1321 })?;
1322 let filter = EventFilter::pattern(pattern);
1323 let page = EventPage::first(BatchSize::new(100));
1324 let events = store
1325 .read_events::<ContractTestEvent>(filter, page)
1326 .await
1327 .map_err(|_error| {
1328 ContractTestFailure::assertion(
1329 SCENARIO,
1330 "read_events failed with stream pattern filter",
1331 )
1332 })?;
1333
1334 if events.len() != 1 {
1336 return Err(ContractTestFailure::assertion(
1337 SCENARIO,
1338 format!(
1339 "expected exactly 1 event matching glob 'account-[0-9]*' but got {}",
1340 events.len()
1341 ),
1342 ));
1343 }
1344
1345 let (event, _) = &events[0];
1346 let stream_id_str = event.stream_id().as_ref();
1347 if stream_id_str != digit_stream.as_ref() {
1348 return Err(ContractTestFailure::assertion(
1349 SCENARIO,
1350 format!(
1351 "expected the digit-prefixed match {} but got {}",
1352 digit_stream.as_ref(),
1353 stream_id_str
1354 ),
1355 ));
1356 }
1357
1358 Ok(())
1359}
1360
1361pub async fn test_batch_limiting<F, S>(make_store: F) -> ContractTestResult
1363where
1364 F: Fn() -> S + Send + Sync + Clone + 'static,
1365 S: EventStore + EventReader + Send + Sync + 'static,
1366{
1367 const SCENARIO: &str = "batch_limiting";
1368
1369 let store = make_store();
1370
1371 let stream = contract_stream_id(SCENARIO, "stream")?;
1373
1374 let mut writes = register_contract_stream(
1375 SCENARIO,
1376 StreamWrites::new(),
1377 &stream,
1378 StreamVersion::new(0),
1379 )?;
1380
1381 for _ in 0..20 {
1383 writes = append_contract_event(SCENARIO, writes, &stream)?;
1384 }
1385
1386 let _ = store
1387 .append_events(writes)
1388 .await
1389 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1390
1391 let filter = EventFilter::all();
1393 let page = EventPage::first(BatchSize::new(10));
1394 let events = store
1395 .read_events::<ContractTestEvent>(filter, page)
1396 .await
1397 .map_err(|_error| {
1398 ContractTestFailure::assertion(SCENARIO, "read_events failed with limit")
1399 })?;
1400
1401 if events.len() != 10 {
1403 return Err(ContractTestFailure::assertion(
1404 SCENARIO,
1405 format!("expected exactly 10 events but got {}", events.len()),
1406 ));
1407 }
1408
1409 Ok(())
1414}
1415
1416pub async fn test_checkpoint_save_and_load<F, CS>(make_checkpoint_store: F) -> ContractTestResult
1422where
1423 F: Fn() -> CS + Send + Sync + Clone + 'static,
1424 CS: CheckpointStore + Send + Sync + 'static,
1425{
1426 const SCENARIO: &str = "checkpoint_save_and_load";
1427
1428 let store = make_checkpoint_store();
1429
1430 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1432 let position = StreamPosition::new(Uuid::now_v7());
1433
1434 store
1436 .save(&subscription_name, position)
1437 .await
1438 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save failed"))?;
1439
1440 let loaded = store
1442 .load(&subscription_name)
1443 .await
1444 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1445
1446 if loaded != Some(position) {
1447 return Err(ContractTestFailure::assertion(
1448 SCENARIO,
1449 format!(
1450 "expected loaded position {:?} but got {:?}",
1451 Some(position),
1452 loaded
1453 ),
1454 ));
1455 }
1456
1457 Ok(())
1458}
1459
1460pub async fn test_checkpoint_update_overwrites<F, CS>(
1462 make_checkpoint_store: F,
1463) -> ContractTestResult
1464where
1465 F: Fn() -> CS + Send + Sync + Clone + 'static,
1466 CS: CheckpointStore + Send + Sync + 'static,
1467{
1468 const SCENARIO: &str = "checkpoint_update_overwrites";
1469
1470 let store = make_checkpoint_store();
1471
1472 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1474 let first_position = StreamPosition::new(Uuid::now_v7());
1475
1476 store
1477 .save(&subscription_name, first_position)
1478 .await
1479 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "first save failed"))?;
1480
1481 let second_position = StreamPosition::new(Uuid::now_v7());
1483 store
1484 .save(&subscription_name, second_position)
1485 .await
1486 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "second save failed"))?;
1487
1488 let loaded = store
1490 .load(&subscription_name)
1491 .await
1492 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1493
1494 if loaded != Some(second_position) {
1495 return Err(ContractTestFailure::assertion(
1496 SCENARIO,
1497 format!(
1498 "expected updated position {:?} but got {:?}",
1499 Some(second_position),
1500 loaded
1501 ),
1502 ));
1503 }
1504
1505 Ok(())
1506}
1507
1508pub async fn test_checkpoint_load_missing_returns_none<F, CS>(
1510 make_checkpoint_store: F,
1511) -> ContractTestResult
1512where
1513 F: Fn() -> CS + Send + Sync + Clone + 'static,
1514 CS: CheckpointStore + Send + Sync + 'static,
1515{
1516 const SCENARIO: &str = "checkpoint_load_missing_returns_none";
1517
1518 let store = make_checkpoint_store();
1519
1520 let subscription_name = format!("contract::{}::ghost::{}", SCENARIO, Uuid::now_v7());
1522
1523 let loaded = store
1525 .load(&subscription_name)
1526 .await
1527 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1528
1529 if loaded.is_some() {
1531 return Err(ContractTestFailure::assertion(
1532 SCENARIO,
1533 format!("expected None for missing checkpoint but got {:?}", loaded),
1534 ));
1535 }
1536
1537 Ok(())
1538}
1539
1540pub async fn test_checkpoint_independent_subscriptions<F, CS>(
1542 make_checkpoint_store: F,
1543) -> ContractTestResult
1544where
1545 F: Fn() -> CS + Send + Sync + Clone + 'static,
1546 CS: CheckpointStore + Send + Sync + 'static,
1547{
1548 const SCENARIO: &str = "checkpoint_independent_subscriptions";
1549
1550 let store = make_checkpoint_store();
1551
1552 let subscription_a = format!("contract::{}::sub-a::{}", SCENARIO, Uuid::now_v7());
1554 let subscription_b = format!("contract::{}::sub-b::{}", SCENARIO, Uuid::now_v7());
1555
1556 let position_a = StreamPosition::new(Uuid::now_v7());
1557 let position_b = StreamPosition::new(Uuid::now_v7());
1558
1559 store
1561 .save(&subscription_a, position_a)
1562 .await
1563 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save A failed"))?;
1564
1565 store
1566 .save(&subscription_b, position_b)
1567 .await
1568 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save B failed"))?;
1569
1570 let loaded_a = store
1572 .load(&subscription_a)
1573 .await
1574 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load A failed"))?;
1575
1576 let loaded_b = store
1577 .load(&subscription_b)
1578 .await
1579 .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load B failed"))?;
1580
1581 if loaded_a != Some(position_a) {
1582 return Err(ContractTestFailure::assertion(
1583 SCENARIO,
1584 format!(
1585 "subscription A: expected {:?} but got {:?}",
1586 Some(position_a),
1587 loaded_a
1588 ),
1589 ));
1590 }
1591
1592 if loaded_b != Some(position_b) {
1593 return Err(ContractTestFailure::assertion(
1594 SCENARIO,
1595 format!(
1596 "subscription B: expected {:?} but got {:?}",
1597 Some(position_b),
1598 loaded_b
1599 ),
1600 ));
1601 }
1602
1603 Ok(())
1604}
1605
1606pub async fn test_coordination_acquire_leadership<F, C>(make_coordinator: F) -> ContractTestResult
1615where
1616 F: Fn() -> C + Send + Sync + Clone + 'static,
1617 C: ProjectorCoordinator + Send + Sync + 'static,
1618{
1619 const SCENARIO: &str = "coordination_acquire_leadership";
1620
1621 let coordinator = make_coordinator();
1622
1623 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1625
1626 let result = coordinator.try_acquire(&subscription_name).await;
1628
1629 if result.is_err() {
1631 return Err(ContractTestFailure::assertion(
1632 SCENARIO,
1633 "expected first instance to acquire leadership successfully, but try_acquire failed",
1634 ));
1635 }
1636
1637 Ok(())
1638}
1639
1640pub async fn test_coordination_second_instance_blocked<F, C>(
1645 make_coordinator: F,
1646) -> ContractTestResult
1647where
1648 F: Fn() -> C + Send + Sync + Clone + 'static,
1649 C: ProjectorCoordinator + Send + Sync + 'static,
1650{
1651 const SCENARIO: &str = "coordination_second_instance_blocked";
1652
1653 let coordinator = make_coordinator();
1654
1655 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1657
1658 let _first_guard = coordinator
1660 .try_acquire(&subscription_name)
1661 .await
1662 .map_err(|_| {
1663 ContractTestFailure::assertion(SCENARIO, "first instance failed to acquire leadership")
1664 })?;
1665
1666 let second_result = coordinator.try_acquire(&subscription_name).await;
1668
1669 if second_result.is_ok() {
1671 return Err(ContractTestFailure::assertion(
1672 SCENARIO,
1673 "expected second instance to be blocked but try_acquire succeeded",
1674 ));
1675 }
1676
1677 Ok(())
1678}
1679
1680pub async fn test_coordination_independent_subscriptions<F, C>(
1686 make_coordinator: F,
1687) -> ContractTestResult
1688where
1689 F: Fn() -> C + Send + Sync + Clone + 'static,
1690 C: ProjectorCoordinator + Send + Sync + 'static,
1691{
1692 const SCENARIO: &str = "coordination_independent_subscriptions";
1693
1694 let coordinator = make_coordinator();
1695
1696 let subscription_a = format!("contract::{}::projector-A::{}", SCENARIO, Uuid::now_v7());
1698 let subscription_b = format!("contract::{}::projector-B::{}", SCENARIO, Uuid::now_v7());
1699
1700 let _guard_a = coordinator
1702 .try_acquire(&subscription_a)
1703 .await
1704 .map_err(|_| {
1705 ContractTestFailure::assertion(
1706 SCENARIO,
1707 "projector-A failed to acquire leadership for its subscription",
1708 )
1709 })?;
1710
1711 let result_b = coordinator.try_acquire(&subscription_b).await;
1714
1715 if result_b.is_err() {
1717 return Err(ContractTestFailure::assertion(
1718 SCENARIO,
1719 "expected projector-B to acquire leadership for its own subscription, but try_acquire failed - different projectors should have independent coordination",
1720 ));
1721 }
1722
1723 Ok(())
1724}
1725
1726pub async fn test_coordination_leadership_released_on_guard_drop<F, C>(
1732 make_coordinator: F,
1733) -> ContractTestResult
1734where
1735 F: Fn() -> C + Send + Sync + Clone + 'static,
1736 C: ProjectorCoordinator + Send + Sync + 'static,
1737{
1738 const SCENARIO: &str = "coordination_leadership_released_on_guard_drop";
1739
1740 let coordinator = make_coordinator();
1741
1742 let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1744
1745 {
1747 let _first_guard = coordinator
1748 .try_acquire(&subscription_name)
1749 .await
1750 .map_err(|_| {
1751 ContractTestFailure::assertion(
1752 SCENARIO,
1753 "first instance failed to acquire leadership",
1754 )
1755 })?;
1756 }
1758
1759 let second_result = coordinator.try_acquire(&subscription_name).await;
1761
1762 if second_result.is_err() {
1764 return Err(ContractTestFailure::assertion(
1765 SCENARIO,
1766 "expected second instance to acquire leadership after first guard dropped, but try_acquire failed - leadership should be released when guard is dropped",
1767 ));
1768 }
1769
1770 Ok(())
1771}