Skip to main content

eventcore_testing/
contract.rs

1use eventcore_types::{
2    BatchSize, CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore,
3    EventStoreError, ProjectorCoordinator, StreamId, StreamPosition, StreamPrefix, StreamVersion,
4    StreamWrites,
5};
6use std::fmt;
7
8use serde::{Deserialize, Serialize};
9use uuid::Uuid;
10
11#[derive(Debug)]
12pub struct ContractTestFailure {
13    scenario: &'static str,
14    detail: String,
15}
16
17impl ContractTestFailure {
18    fn new(scenario: &'static str, detail: impl Into<String>) -> Self {
19        Self {
20            scenario,
21            detail: detail.into(),
22        }
23    }
24
25    fn builder_error(scenario: &'static str, phase: &'static str, error: EventStoreError) -> Self {
26        Self::new(scenario, format!("builder failure during {phase}: {error}"))
27    }
28
29    fn store_error(
30        scenario: &'static str,
31        operation: &'static str,
32        error: EventStoreError,
33    ) -> Self {
34        Self::new(
35            scenario,
36            format!("{operation} operation returned unexpected error: {error}"),
37        )
38    }
39
40    fn assertion(scenario: &'static str, detail: impl Into<String>) -> Self {
41        Self::new(scenario, detail)
42    }
43}
44
45impl fmt::Display for ContractTestFailure {
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        write!(f, "[{}] {}", self.scenario, self.detail)
48    }
49}
50
51impl std::error::Error for ContractTestFailure {}
52
53pub type ContractTestResult = Result<(), ContractTestFailure>;
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct ContractTestEvent {
57    stream_id: StreamId,
58}
59
60impl ContractTestEvent {
61    pub fn new(stream_id: StreamId) -> Self {
62        Self { stream_id }
63    }
64}
65
66impl Event for ContractTestEvent {
67    fn stream_id(&self) -> &StreamId {
68        &self.stream_id
69    }
70}
71
72fn contract_stream_id(
73    scenario: &'static str,
74    label: &str,
75) -> Result<StreamId, ContractTestFailure> {
76    // Include UUID for parallel test execution against shared database
77    let raw = format!("contract::{}::{}::{}", scenario, label, Uuid::now_v7());
78
79    StreamId::try_new(raw.clone()).map_err(|error| {
80        ContractTestFailure::assertion(
81            scenario,
82            format!("unable to construct stream id `{}`: {}", raw, error),
83        )
84    })
85}
86
87fn builder_step(
88    scenario: &'static str,
89    phase: &'static str,
90    result: Result<StreamWrites, EventStoreError>,
91) -> Result<StreamWrites, ContractTestFailure> {
92    result.map_err(|error| ContractTestFailure::builder_error(scenario, phase, error))
93}
94
95fn register_contract_stream(
96    scenario: &'static str,
97    writes: StreamWrites,
98    stream_id: &StreamId,
99    expected_version: StreamVersion,
100) -> Result<StreamWrites, ContractTestFailure> {
101    builder_step(
102        scenario,
103        "register_stream",
104        writes.register_stream(stream_id.clone(), expected_version),
105    )
106}
107
108fn append_contract_event(
109    scenario: &'static str,
110    writes: StreamWrites,
111    stream_id: &StreamId,
112) -> Result<StreamWrites, ContractTestFailure> {
113    let event = ContractTestEvent::new(stream_id.clone());
114    builder_step(scenario, "append", writes.append(event))
115}
116
117pub async fn test_basic_read_write<F, S>(make_store: F) -> ContractTestResult
118where
119    F: Fn() -> S + Send + Sync + Clone + 'static,
120    S: EventStore + Send + Sync + 'static,
121{
122    const SCENARIO: &str = "basic_read_write";
123
124    let store = make_store();
125    let stream_id = contract_stream_id(SCENARIO, "single");
126
127    let stream_id = stream_id?;
128
129    let writes = register_contract_stream(
130        SCENARIO,
131        StreamWrites::new(),
132        &stream_id,
133        StreamVersion::new(0),
134    )?;
135    let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
136
137    let _ = store
138        .append_events(writes)
139        .await
140        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
141
142    let reader = store
143        .read_stream::<ContractTestEvent>(stream_id.clone())
144        .await
145        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
146
147    let len = reader.len();
148    let empty = reader.is_empty();
149
150    if empty {
151        return Err(ContractTestFailure::assertion(
152            SCENARIO,
153            "expected stream to contain events but it was empty",
154        ));
155    }
156
157    if len != 1 {
158        return Err(ContractTestFailure::assertion(
159            SCENARIO,
160            format!(
161                "expected stream to contain exactly one event, observed len={}",
162                len
163            ),
164        ));
165    }
166
167    Ok(())
168}
169
170pub async fn test_concurrent_version_conflicts<F, S>(make_store: F) -> ContractTestResult
171where
172    F: Fn() -> S + Send + Sync + Clone + 'static,
173    S: EventStore + Send + Sync + 'static,
174{
175    const SCENARIO: &str = "concurrent_version_conflicts";
176
177    let store = make_store();
178    let stream_id = contract_stream_id(SCENARIO, "shared")?;
179
180    let first_writes = register_contract_stream(
181        SCENARIO,
182        StreamWrites::new(),
183        &stream_id,
184        StreamVersion::new(0),
185    )?;
186    let first_writes = append_contract_event(SCENARIO, first_writes, &stream_id)?;
187
188    let _ = store
189        .append_events(first_writes)
190        .await
191        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
192
193    let conflicting_writes = register_contract_stream(
194        SCENARIO,
195        StreamWrites::new(),
196        &stream_id,
197        StreamVersion::new(0),
198    )?;
199    let conflicting_writes = append_contract_event(SCENARIO, conflicting_writes, &stream_id)?;
200
201    match store.append_events(conflicting_writes).await {
202        Err(EventStoreError::VersionConflict) => Ok(()),
203        Err(error) => Err(ContractTestFailure::store_error(
204            SCENARIO,
205            "append_events",
206            error,
207        )),
208        Ok(_) => Err(ContractTestFailure::assertion(
209            SCENARIO,
210            "expected version conflict but append succeeded",
211        )),
212    }
213}
214
215pub async fn test_stream_isolation<F, S>(make_store: F) -> ContractTestResult
216where
217    F: Fn() -> S + Send + Sync + Clone + 'static,
218    S: EventStore + Send + Sync + 'static,
219{
220    const SCENARIO: &str = "stream_isolation";
221
222    let store = make_store();
223    let left_stream = contract_stream_id(SCENARIO, "left")?;
224    let right_stream = contract_stream_id(SCENARIO, "right")?;
225
226    let writes = register_contract_stream(
227        SCENARIO,
228        StreamWrites::new(),
229        &left_stream,
230        StreamVersion::new(0),
231    )?;
232    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
233    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
234    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
235
236    let _ = store
237        .append_events(writes)
238        .await
239        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
240
241    let left_reader = store
242        .read_stream::<ContractTestEvent>(left_stream.clone())
243        .await
244        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
245
246    let right_reader = store
247        .read_stream::<ContractTestEvent>(right_stream.clone())
248        .await
249        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
250
251    let left_len = left_reader.len();
252    if left_len != 1 {
253        return Err(ContractTestFailure::assertion(
254            SCENARIO,
255            format!(
256                "left stream expected exactly one event but observed {}",
257                left_len
258            ),
259        ));
260    }
261
262    if left_reader
263        .iter()
264        .any(|event| event.stream_id() != &left_stream)
265    {
266        return Err(ContractTestFailure::assertion(
267            SCENARIO,
268            "left stream read events belonging to another stream",
269        ));
270    }
271
272    let right_len = right_reader.len();
273    if right_len != 1 {
274        return Err(ContractTestFailure::assertion(
275            SCENARIO,
276            format!(
277                "right stream expected exactly one event but observed {}",
278                right_len
279            ),
280        ));
281    }
282
283    if right_reader
284        .iter()
285        .any(|event| event.stream_id() != &right_stream)
286    {
287        return Err(ContractTestFailure::assertion(
288            SCENARIO,
289            "right stream read events belonging to another stream",
290        ));
291    }
292
293    Ok(())
294}
295
296pub async fn test_missing_stream_reads<F, S>(make_store: F) -> ContractTestResult
297where
298    F: Fn() -> S + Send + Sync + Clone + 'static,
299    S: EventStore + Send + Sync + 'static,
300{
301    const SCENARIO: &str = "missing_stream_reads";
302
303    let store = make_store();
304    let stream_id = contract_stream_id(SCENARIO, "ghost")?;
305
306    let reader = store
307        .read_stream::<ContractTestEvent>(stream_id.clone())
308        .await
309        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
310
311    if !reader.is_empty() {
312        return Err(ContractTestFailure::assertion(
313            SCENARIO,
314            "expected read_stream to succeed with no events for an untouched stream",
315        ));
316    }
317
318    Ok(())
319}
320
321pub async fn test_conflict_preserves_atomicity<F, S>(make_store: F) -> ContractTestResult
322where
323    F: Fn() -> S + Send + Sync + Clone + 'static,
324    S: EventStore + Send + Sync + 'static,
325{
326    const SCENARIO: &str = "conflict_preserves_atomicity";
327
328    let store = make_store();
329    let left_stream = contract_stream_id(SCENARIO, "left")?;
330    let right_stream = contract_stream_id(SCENARIO, "right")?;
331
332    // Seed one event per stream so we can introduce a single-stream conflict later.
333    let writes = register_contract_stream(
334        SCENARIO,
335        StreamWrites::new(),
336        &left_stream,
337        StreamVersion::new(0),
338    )?;
339    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
340    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
341    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
342
343    let _ = store
344        .append_events(writes)
345        .await
346        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
347
348    // Build a batch where the left stream has a stale expected version and the right stream is current.
349    let writes = register_contract_stream(
350        SCENARIO,
351        StreamWrites::new(),
352        &left_stream,
353        StreamVersion::new(0),
354    )?;
355    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(1))?;
356    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
357    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
358
359    match store.append_events(writes).await {
360        Err(EventStoreError::VersionConflict) => {
361            let left_reader = store
362                .read_stream::<ContractTestEvent>(left_stream.clone())
363                .await
364                .map_err(|error| {
365                    ContractTestFailure::store_error(SCENARIO, "read_stream", error)
366                })?;
367            if left_reader.len() != 1 {
368                return Err(ContractTestFailure::assertion(
369                    SCENARIO,
370                    format!(
371                        "expected left stream to remain at len=1 after failed append, observed {}",
372                        left_reader.len()
373                    ),
374                ));
375            }
376
377            let right_reader = store
378                .read_stream::<ContractTestEvent>(right_stream.clone())
379                .await
380                .map_err(|error| {
381                    ContractTestFailure::store_error(SCENARIO, "read_stream", error)
382                })?;
383            if right_reader.len() != 1 {
384                return Err(ContractTestFailure::assertion(
385                    SCENARIO,
386                    format!(
387                        "expected right stream to remain at len=1 after failed append, observed {}",
388                        right_reader.len()
389                    ),
390                ));
391            }
392
393            Ok(())
394        }
395        Err(error) => Err(ContractTestFailure::store_error(
396            SCENARIO,
397            "append_events",
398            error,
399        )),
400        Ok(_) => Err(ContractTestFailure::assertion(
401            SCENARIO,
402            "expected version conflict but append succeeded",
403        )),
404    }
405}
406
407// NOTE: The old fragmented macros (event_store_contract_tests!, event_reader_contract_tests!)
408// have been removed. Use backend_contract_tests! which runs ALL contract tests.
409
410/// Unified contract test macro for all backend implementations.
411///
412/// This macro generates ALL contract tests for a backend implementation.
413/// When new contract tests are added to eventcore-testing, they automatically
414/// run for all backends that use this macro—no changes to backend test files required.
415///
416/// # Example
417///
418/// ```ignore
419/// backend_contract_tests! {
420///     suite = my_backend,
421///     make_store = || MyEventStore::new(),
422///     make_checkpoint_store = || MyCheckpointStore::new(),
423/// }
424/// ```
425///
426/// # Requirements
427///
428/// The store type must implement both `EventStore` and `EventReader` traits.
429/// The checkpoint store type must implement `CheckpointStore` trait.
430/// The coordinator type must implement `ProjectorCoordinator` trait.
431#[macro_export]
432macro_rules! backend_contract_tests {
433    (suite = $suite:ident, make_store = $make_store:expr, make_checkpoint_store = $make_checkpoint_store:expr, make_coordinator = $make_coordinator:expr $(,)?) => {
434        #[allow(non_snake_case)]
435        mod $suite {
436            use $crate::contract::{
437                test_basic_read_write, test_batch_limiting,
438                test_checkpoint_independent_subscriptions,
439                test_checkpoint_load_missing_returns_none, test_checkpoint_save_and_load,
440                test_checkpoint_update_overwrites, test_concurrent_version_conflicts,
441                test_conflict_preserves_atomicity, test_coordination_acquire_leadership,
442                test_coordination_independent_subscriptions,
443                test_coordination_leadership_released_on_guard_drop,
444                test_coordination_second_instance_blocked, test_event_ordering_across_streams,
445                test_missing_stream_reads, test_position_based_resumption, test_stream_isolation,
446                test_stream_prefix_filtering, test_stream_prefix_requires_prefix_match,
447            };
448
449            #[tokio::test(flavor = "multi_thread")]
450            async fn basic_read_write_contract() {
451                test_basic_read_write($make_store)
452                    .await
453                    .expect("event store contract failed");
454            }
455
456            #[tokio::test(flavor = "multi_thread")]
457            async fn concurrent_version_conflicts_contract() {
458                test_concurrent_version_conflicts($make_store)
459                    .await
460                    .expect("event store contract failed");
461            }
462
463            #[tokio::test(flavor = "multi_thread")]
464            async fn stream_isolation_contract() {
465                test_stream_isolation($make_store)
466                    .await
467                    .expect("event store contract failed");
468            }
469
470            #[tokio::test(flavor = "multi_thread")]
471            async fn missing_stream_reads_contract() {
472                test_missing_stream_reads($make_store)
473                    .await
474                    .expect("event store contract failed");
475            }
476
477            #[tokio::test(flavor = "multi_thread")]
478            async fn conflict_preserves_atomicity_contract() {
479                test_conflict_preserves_atomicity($make_store)
480                    .await
481                    .expect("event store contract failed");
482            }
483
484            #[tokio::test(flavor = "multi_thread")]
485            async fn event_ordering_across_streams_contract() {
486                test_event_ordering_across_streams($make_store)
487                    .await
488                    .expect("event reader contract failed");
489            }
490
491            #[tokio::test(flavor = "multi_thread")]
492            async fn position_based_resumption_contract() {
493                test_position_based_resumption($make_store)
494                    .await
495                    .expect("event reader contract failed");
496            }
497
498            #[tokio::test(flavor = "multi_thread")]
499            async fn stream_prefix_filtering_contract() {
500                test_stream_prefix_filtering($make_store)
501                    .await
502                    .expect("event reader contract failed");
503            }
504
505            #[tokio::test(flavor = "multi_thread")]
506            async fn stream_prefix_requires_prefix_match_contract() {
507                test_stream_prefix_requires_prefix_match($make_store)
508                    .await
509                    .expect("event reader contract failed");
510            }
511
512            #[tokio::test(flavor = "multi_thread")]
513            async fn batch_limiting_contract() {
514                test_batch_limiting($make_store)
515                    .await
516                    .expect("event reader contract failed");
517            }
518
519            // CheckpointStore contract tests
520            #[tokio::test(flavor = "multi_thread")]
521            async fn checkpoint_save_and_load_contract() {
522                test_checkpoint_save_and_load($make_checkpoint_store)
523                    .await
524                    .expect("checkpoint store contract failed");
525            }
526
527            #[tokio::test(flavor = "multi_thread")]
528            async fn checkpoint_update_overwrites_contract() {
529                test_checkpoint_update_overwrites($make_checkpoint_store)
530                    .await
531                    .expect("checkpoint store contract failed");
532            }
533
534            #[tokio::test(flavor = "multi_thread")]
535            async fn checkpoint_load_missing_returns_none_contract() {
536                test_checkpoint_load_missing_returns_none($make_checkpoint_store)
537                    .await
538                    .expect("checkpoint store contract failed");
539            }
540
541            #[tokio::test(flavor = "multi_thread")]
542            async fn checkpoint_independent_subscriptions_contract() {
543                test_checkpoint_independent_subscriptions($make_checkpoint_store)
544                    .await
545                    .expect("checkpoint store contract failed");
546            }
547
548            // ProjectorCoordinator contract tests
549            #[tokio::test(flavor = "multi_thread")]
550            async fn coordination_acquire_leadership_contract() {
551                test_coordination_acquire_leadership($make_coordinator)
552                    .await
553                    .expect("coordinator contract failed");
554            }
555
556            #[tokio::test(flavor = "multi_thread")]
557            async fn coordination_second_instance_blocked_contract() {
558                test_coordination_second_instance_blocked($make_coordinator)
559                    .await
560                    .expect("coordinator contract failed");
561            }
562
563            #[tokio::test(flavor = "multi_thread")]
564            async fn coordination_independent_subscriptions_contract() {
565                test_coordination_independent_subscriptions($make_coordinator)
566                    .await
567                    .expect("coordinator contract failed");
568            }
569
570            #[tokio::test(flavor = "multi_thread")]
571            async fn coordination_leadership_released_on_guard_drop_contract() {
572                test_coordination_leadership_released_on_guard_drop($make_coordinator)
573                    .await
574                    .expect("coordinator contract failed");
575            }
576        }
577    };
578}
579
580pub use backend_contract_tests;
581
582/// Contract test: Events from multiple streams are read in global append order
583pub async fn test_event_ordering_across_streams<F, S>(make_store: F) -> ContractTestResult
584where
585    F: Fn() -> S + Send + Sync + Clone + 'static,
586    S: EventStore + EventReader + Send + Sync + 'static,
587{
588    const SCENARIO: &str = "event_ordering_across_streams";
589
590    let store = make_store();
591
592    // Given: Three streams with events appended in specific order
593    let stream_a = contract_stream_id(SCENARIO, "stream-a")?;
594    let stream_b = contract_stream_id(SCENARIO, "stream-b")?;
595    let stream_c = contract_stream_id(SCENARIO, "stream-c")?;
596
597    // Append event to stream A
598    let writes = register_contract_stream(
599        SCENARIO,
600        StreamWrites::new(),
601        &stream_a,
602        StreamVersion::new(0),
603    )?;
604    let writes = append_contract_event(SCENARIO, writes, &stream_a)?;
605    let _ = store
606        .append_events(writes)
607        .await
608        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
609
610    // Append event to stream B
611    let writes = register_contract_stream(
612        SCENARIO,
613        StreamWrites::new(),
614        &stream_b,
615        StreamVersion::new(0),
616    )?;
617    let writes = append_contract_event(SCENARIO, writes, &stream_b)?;
618    let _ = store
619        .append_events(writes)
620        .await
621        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
622
623    // Append event to stream C
624    let writes = register_contract_stream(
625        SCENARIO,
626        StreamWrites::new(),
627        &stream_c,
628        StreamVersion::new(0),
629    )?;
630    let writes = append_contract_event(SCENARIO, writes, &stream_c)?;
631    let _ = store
632        .append_events(writes)
633        .await
634        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
635
636    // When: Reading all events via EventReader with no position filter
637    let filter = EventFilter::all();
638    let page = EventPage::first(BatchSize::new(100));
639    let events = store
640        .read_events::<ContractTestEvent>(filter, page)
641        .await
642        .map_err(|_error| {
643            ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
644        })?;
645
646    // Then: Events are returned in global append order (A, B, C)
647    if events.len() != 3 {
648        return Err(ContractTestFailure::assertion(
649            SCENARIO,
650            format!("expected 3 events but got {}", events.len()),
651        ));
652    }
653
654    // And: Verify complete ordering across all three streams
655    let (first_event, _) = &events[0];
656    if first_event.stream_id() != &stream_a {
657        return Err(ContractTestFailure::assertion(
658            SCENARIO,
659            format!(
660                "expected first event from stream_a but got from {:?}",
661                first_event.stream_id()
662            ),
663        ));
664    }
665
666    let (second_event, _) = &events[1];
667    if second_event.stream_id() != &stream_b {
668        return Err(ContractTestFailure::assertion(
669            SCENARIO,
670            format!(
671                "expected second event from stream_b but got from {:?}",
672                second_event.stream_id()
673            ),
674        ));
675    }
676
677    let (third_event, _) = &events[2];
678    if third_event.stream_id() != &stream_c {
679        return Err(ContractTestFailure::assertion(
680            SCENARIO,
681            format!(
682                "expected third event from stream_c but got from {:?}",
683                third_event.stream_id()
684            ),
685        ));
686    }
687
688    Ok(())
689}
690
691/// Contract test: Position-based resumption works correctly
692pub async fn test_position_based_resumption<F, S>(make_store: F) -> ContractTestResult
693where
694    F: Fn() -> S + Send + Sync + Clone + 'static,
695    S: EventStore + EventReader + Send + Sync + 'static,
696{
697    const SCENARIO: &str = "position_based_resumption";
698
699    let store = make_store();
700
701    // Given: Events at positions 0, 1, 2, 3, 4 (5 events total)
702    let stream = contract_stream_id(SCENARIO, "stream")?;
703
704    let mut writes = register_contract_stream(
705        SCENARIO,
706        StreamWrites::new(),
707        &stream,
708        StreamVersion::new(0),
709    )?;
710
711    // Append 5 events
712    for _ in 0..5 {
713        writes = append_contract_event(SCENARIO, writes, &stream)?;
714    }
715
716    let _ = store
717        .append_events(writes)
718        .await
719        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
720
721    // Get position of third event (index 2, position 2)
722    let filter = EventFilter::all();
723    let page = EventPage::first(BatchSize::new(100));
724    let all_events = store
725        .read_events::<ContractTestEvent>(filter.clone(), page)
726        .await
727        .map_err(|_error| {
728            ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
729        })?;
730
731    let (_third_event, third_position) = &all_events[2];
732
733    // When: Reading events after position 2
734    let page_after = EventPage::after(*third_position, BatchSize::new(100));
735    let events_after = store
736        .read_events::<ContractTestEvent>(filter, page_after)
737        .await
738        .map_err(|_error| {
739            ContractTestFailure::assertion(
740                SCENARIO,
741                "read_events failed when reading after position",
742            )
743        })?;
744
745    // Then: Only events at positions 3 and 4 are returned (2 events)
746    if events_after.len() != 2 {
747        return Err(ContractTestFailure::assertion(
748            SCENARIO,
749            format!(
750                "expected 2 events after position {} but got {}",
751                third_position,
752                events_after.len()
753            ),
754        ));
755    }
756
757    // And: Position 2 event is NOT included (verify exclusivity)
758    for (_event, position) in events_after.iter() {
759        if *position == *third_position {
760            return Err(ContractTestFailure::assertion(
761                SCENARIO,
762                format!(
763                    "expected position {} to be excluded but it was included in results",
764                    third_position
765                ),
766            ));
767        }
768    }
769
770    // And: Returned event positions are greater than third_position and in ascending order
771    let (_event1, pos1) = &events_after[0];
772    let (_event2, pos2) = &events_after[1];
773
774    if *pos1 <= *third_position {
775        return Err(ContractTestFailure::assertion(
776            SCENARIO,
777            format!(
778                "expected first returned position to be > {} but got {}",
779                third_position, pos1
780            ),
781        ));
782    }
783
784    if *pos2 <= *pos1 {
785        return Err(ContractTestFailure::assertion(
786            SCENARIO,
787            format!(
788                "expected positions to be in ascending order but {} <= {}",
789                pos2, pos1
790            ),
791        ));
792    }
793
794    Ok(())
795}
796
797/// Contract test: Stream prefix filtering returns only matching streams
798pub async fn test_stream_prefix_filtering<F, S>(make_store: F) -> ContractTestResult
799where
800    F: Fn() -> S + Send + Sync + Clone + 'static,
801    S: EventStore + EventReader + Send + Sync + 'static,
802{
803    const SCENARIO: &str = "stream_prefix_filtering";
804
805    let store = make_store();
806
807    // Given: Events on streams with IDs that actually start with "account-" or "order-"
808    let account_1 = StreamId::try_new(format!("account-1-{}", Uuid::now_v7())).map_err(|e| {
809        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
810    })?;
811    let account_2 = StreamId::try_new(format!("account-2-{}", Uuid::now_v7())).map_err(|e| {
812        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
813    })?;
814    let order_1 = StreamId::try_new(format!("order-1-{}", Uuid::now_v7())).map_err(|e| {
815        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
816    })?;
817
818    let mut writes = register_contract_stream(
819        SCENARIO,
820        StreamWrites::new(),
821        &account_1,
822        StreamVersion::new(0),
823    )?;
824    writes = register_contract_stream(SCENARIO, writes, &account_2, StreamVersion::new(0))?;
825    writes = register_contract_stream(SCENARIO, writes, &order_1, StreamVersion::new(0))?;
826
827    writes = append_contract_event(SCENARIO, writes, &account_1)?;
828    writes = append_contract_event(SCENARIO, writes, &account_2)?;
829    writes = append_contract_event(SCENARIO, writes, &order_1)?;
830
831    let _ = store
832        .append_events(writes)
833        .await
834        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
835
836    // When: Reading with prefix filter "account-"
837    let prefix = StreamPrefix::try_new("account-").map_err(|e| {
838        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
839    })?;
840    let filter = EventFilter::prefix(prefix);
841    let page = EventPage::first(BatchSize::new(100));
842    let events = store
843        .read_events::<ContractTestEvent>(filter, page)
844        .await
845        .map_err(|_error| {
846            ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
847        })?;
848
849    // Then: Only events from account-1 and account-2 are returned
850    if events.len() != 2 {
851        return Err(ContractTestFailure::assertion(
852            SCENARIO,
853            format!(
854                "expected 2 events from account-* streams but got {}",
855                events.len()
856            ),
857        ));
858    }
859
860    // And: All events are from streams starting with "account-"
861    for (event, _) in events.iter() {
862        let stream_id_str = event.stream_id().as_ref();
863        if !stream_id_str.starts_with("account-") {
864            return Err(ContractTestFailure::assertion(
865                SCENARIO,
866                format!(
867                    "expected all events from streams starting with 'account-' but found event from {}",
868                    stream_id_str
869                ),
870            ));
871        }
872    }
873
874    // And: order-1 events are filtered out (verified by length check above)
875
876    Ok(())
877}
878
879/// Contract test: Stream prefix filtering requires true prefix match (not substring match)
880pub async fn test_stream_prefix_requires_prefix_match<F, S>(make_store: F) -> ContractTestResult
881where
882    F: Fn() -> S + Send + Sync + Clone + 'static,
883    S: EventStore + EventReader + Send + Sync + 'static,
884{
885    const SCENARIO: &str = "stream_prefix_requires_prefix_match";
886
887    let store = make_store();
888
889    // Given: Three streams with actual prefixes: "account-123", "my-account-456", "order-789"
890    // We want to verify that prefix "account-" matches ONLY "account-123", not "my-account-456"
891    let account_stream =
892        StreamId::try_new(format!("account-123-{}", Uuid::now_v7())).map_err(|e| {
893            ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
894        })?;
895    let my_account_stream = StreamId::try_new(format!("my-account-456-{}", Uuid::now_v7()))
896        .map_err(|e| {
897            ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
898        })?;
899    let order_stream = StreamId::try_new(format!("order-789-{}", Uuid::now_v7())).map_err(|e| {
900        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
901    })?;
902
903    let mut writes = register_contract_stream(
904        SCENARIO,
905        StreamWrites::new(),
906        &account_stream,
907        StreamVersion::new(0),
908    )?;
909    writes = register_contract_stream(SCENARIO, writes, &my_account_stream, StreamVersion::new(0))?;
910    writes = register_contract_stream(SCENARIO, writes, &order_stream, StreamVersion::new(0))?;
911
912    writes = append_contract_event(SCENARIO, writes, &account_stream)?;
913    writes = append_contract_event(SCENARIO, writes, &my_account_stream)?;
914    writes = append_contract_event(SCENARIO, writes, &order_stream)?;
915
916    let _ = store
917        .append_events(writes)
918        .await
919        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
920
921    // When: Reading with prefix filter "account-"
922    let prefix = StreamPrefix::try_new("account-").map_err(|e| {
923        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
924    })?;
925    let filter = EventFilter::prefix(prefix);
926    let page = EventPage::first(BatchSize::new(100));
927    let events = store
928        .read_events::<ContractTestEvent>(filter, page)
929        .await
930        .map_err(|_error| {
931            ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
932        })?;
933
934    // Then: ONLY "account-123" stream should be returned (not "my-account-456")
935    if events.len() != 1 {
936        return Err(ContractTestFailure::assertion(
937            SCENARIO,
938            format!(
939                "expected exactly 1 event from account-* prefix but got {} (bug: implementation uses contains() instead of starts_with())",
940                events.len()
941            ),
942        ));
943    }
944
945    // And: The event must be from a stream starting with "account-123"
946    let (event, _) = &events[0];
947    let stream_id_str = event.stream_id().as_ref();
948    if !stream_id_str.starts_with("account-123") {
949        return Err(ContractTestFailure::assertion(
950            SCENARIO,
951            format!(
952                "expected event from stream starting with 'account-123' but got from {}",
953                stream_id_str
954            ),
955        ));
956    }
957
958    // And: Verify it's NOT from my-account-456 (proves we're not doing substring matching)
959    if stream_id_str.starts_with("my-account-456") {
960        return Err(ContractTestFailure::assertion(
961            SCENARIO,
962            "BUG EXPOSED: got event from stream starting with 'my-account-456' when filtering for prefix 'account-' - implementation must use prefix matching from the start of the stream ID",
963        ));
964    }
965
966    Ok(())
967}
968
969/// Contract test: Batch limiting returns exactly the specified number of events
970pub async fn test_batch_limiting<F, S>(make_store: F) -> ContractTestResult
971where
972    F: Fn() -> S + Send + Sync + Clone + 'static,
973    S: EventStore + EventReader + Send + Sync + 'static,
974{
975    const SCENARIO: &str = "batch_limiting";
976
977    let store = make_store();
978
979    // Given: 20 events in the store
980    let stream = contract_stream_id(SCENARIO, "stream")?;
981
982    let mut writes = register_contract_stream(
983        SCENARIO,
984        StreamWrites::new(),
985        &stream,
986        StreamVersion::new(0),
987    )?;
988
989    // Append 20 events
990    for _ in 0..20 {
991        writes = append_contract_event(SCENARIO, writes, &stream)?;
992    }
993
994    let _ = store
995        .append_events(writes)
996        .await
997        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
998
999    // When: Read events with limit of 10
1000    let filter = EventFilter::all();
1001    let page = EventPage::first(BatchSize::new(10));
1002    let events = store
1003        .read_events::<ContractTestEvent>(filter, page)
1004        .await
1005        .map_err(|_error| {
1006            ContractTestFailure::assertion(SCENARIO, "read_events failed with limit")
1007        })?;
1008
1009    // Then: Exactly 10 events are returned
1010    if events.len() != 10 {
1011        return Err(ContractTestFailure::assertion(
1012            SCENARIO,
1013            format!("expected exactly 10 events but got {}", events.len()),
1014        ));
1015    }
1016
1017    // And: Events are the FIRST 10 in global order
1018    // (We verify this by checking we got exactly 10 events - the implementation
1019    // must return events in order, so if we got 10 events they must be the first 10)
1020
1021    Ok(())
1022}
1023
1024// ============================================================================
1025// CheckpointStore Contract Tests
1026// ============================================================================
1027
1028/// Contract test: Save a checkpoint and load it back
1029pub async fn test_checkpoint_save_and_load<F, CS>(make_checkpoint_store: F) -> ContractTestResult
1030where
1031    F: Fn() -> CS + Send + Sync + Clone + 'static,
1032    CS: CheckpointStore + Send + Sync + 'static,
1033{
1034    const SCENARIO: &str = "checkpoint_save_and_load";
1035
1036    let store = make_checkpoint_store();
1037
1038    // Given: A subscription name and position
1039    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1040    let position = StreamPosition::new(Uuid::now_v7());
1041
1042    // When: Saving the checkpoint
1043    store
1044        .save(&subscription_name, position)
1045        .await
1046        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save failed"))?;
1047
1048    // Then: Loading returns the saved position
1049    let loaded = store
1050        .load(&subscription_name)
1051        .await
1052        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1053
1054    if loaded != Some(position) {
1055        return Err(ContractTestFailure::assertion(
1056            SCENARIO,
1057            format!(
1058                "expected loaded position {:?} but got {:?}",
1059                Some(position),
1060                loaded
1061            ),
1062        ));
1063    }
1064
1065    Ok(())
1066}
1067
1068/// Contract test: Saving a checkpoint overwrites the previous value
1069pub async fn test_checkpoint_update_overwrites<F, CS>(
1070    make_checkpoint_store: F,
1071) -> ContractTestResult
1072where
1073    F: Fn() -> CS + Send + Sync + Clone + 'static,
1074    CS: CheckpointStore + Send + Sync + 'static,
1075{
1076    const SCENARIO: &str = "checkpoint_update_overwrites";
1077
1078    let store = make_checkpoint_store();
1079
1080    // Given: A subscription with an initial checkpoint
1081    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1082    let first_position = StreamPosition::new(Uuid::now_v7());
1083
1084    store
1085        .save(&subscription_name, first_position)
1086        .await
1087        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "first save failed"))?;
1088
1089    // When: Saving a new position
1090    let second_position = StreamPosition::new(Uuid::now_v7());
1091    store
1092        .save(&subscription_name, second_position)
1093        .await
1094        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "second save failed"))?;
1095
1096    // Then: Loading returns the new position, not the old one
1097    let loaded = store
1098        .load(&subscription_name)
1099        .await
1100        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1101
1102    if loaded != Some(second_position) {
1103        return Err(ContractTestFailure::assertion(
1104            SCENARIO,
1105            format!(
1106                "expected updated position {:?} but got {:?}",
1107                Some(second_position),
1108                loaded
1109            ),
1110        ));
1111    }
1112
1113    Ok(())
1114}
1115
1116/// Contract test: Loading a non-existent checkpoint returns None
1117pub async fn test_checkpoint_load_missing_returns_none<F, CS>(
1118    make_checkpoint_store: F,
1119) -> ContractTestResult
1120where
1121    F: Fn() -> CS + Send + Sync + Clone + 'static,
1122    CS: CheckpointStore + Send + Sync + 'static,
1123{
1124    const SCENARIO: &str = "checkpoint_load_missing_returns_none";
1125
1126    let store = make_checkpoint_store();
1127
1128    // Given: A subscription name that has never been saved
1129    let subscription_name = format!("contract::{}::ghost::{}", SCENARIO, Uuid::now_v7());
1130
1131    // When: Loading the checkpoint
1132    let loaded = store
1133        .load(&subscription_name)
1134        .await
1135        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1136
1137    // Then: None is returned
1138    if loaded.is_some() {
1139        return Err(ContractTestFailure::assertion(
1140            SCENARIO,
1141            format!("expected None for missing checkpoint but got {:?}", loaded),
1142        ));
1143    }
1144
1145    Ok(())
1146}
1147
1148/// Contract test: Different subscription names have independent checkpoints
1149pub async fn test_checkpoint_independent_subscriptions<F, CS>(
1150    make_checkpoint_store: F,
1151) -> ContractTestResult
1152where
1153    F: Fn() -> CS + Send + Sync + Clone + 'static,
1154    CS: CheckpointStore + Send + Sync + 'static,
1155{
1156    const SCENARIO: &str = "checkpoint_independent_subscriptions";
1157
1158    let store = make_checkpoint_store();
1159
1160    // Given: Two subscription names
1161    let subscription_a = format!("contract::{}::sub-a::{}", SCENARIO, Uuid::now_v7());
1162    let subscription_b = format!("contract::{}::sub-b::{}", SCENARIO, Uuid::now_v7());
1163
1164    let position_a = StreamPosition::new(Uuid::now_v7());
1165    let position_b = StreamPosition::new(Uuid::now_v7());
1166
1167    // When: Saving different positions for each
1168    store
1169        .save(&subscription_a, position_a)
1170        .await
1171        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save A failed"))?;
1172
1173    store
1174        .save(&subscription_b, position_b)
1175        .await
1176        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save B failed"))?;
1177
1178    // Then: Each subscription loads its own position
1179    let loaded_a = store
1180        .load(&subscription_a)
1181        .await
1182        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load A failed"))?;
1183
1184    let loaded_b = store
1185        .load(&subscription_b)
1186        .await
1187        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load B failed"))?;
1188
1189    if loaded_a != Some(position_a) {
1190        return Err(ContractTestFailure::assertion(
1191            SCENARIO,
1192            format!(
1193                "subscription A: expected {:?} but got {:?}",
1194                Some(position_a),
1195                loaded_a
1196            ),
1197        ));
1198    }
1199
1200    if loaded_b != Some(position_b) {
1201        return Err(ContractTestFailure::assertion(
1202            SCENARIO,
1203            format!(
1204                "subscription B: expected {:?} but got {:?}",
1205                Some(position_b),
1206                loaded_b
1207            ),
1208        ));
1209    }
1210
1211    Ok(())
1212}
1213
1214// ============================================================================
1215// ProjectorCoordinator Contract Tests
1216// ============================================================================
1217
1218/// Contract test: First instance can acquire leadership successfully
1219///
1220/// Observable behavior: When no other instance holds leadership for a subscription,
1221/// calling try_acquire returns a guard indicating successful acquisition.
1222pub async fn test_coordination_acquire_leadership<F, C>(make_coordinator: F) -> ContractTestResult
1223where
1224    F: Fn() -> C + Send + Sync + Clone + 'static,
1225    C: ProjectorCoordinator + Send + Sync + 'static,
1226{
1227    const SCENARIO: &str = "coordination_acquire_leadership";
1228
1229    let coordinator = make_coordinator();
1230
1231    // Given: A unique subscription name (no existing leadership)
1232    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1233
1234    // When: Attempting to acquire leadership
1235    let result = coordinator.try_acquire(&subscription_name).await;
1236
1237    // Then: Acquisition succeeds (returns Ok with guard)
1238    if result.is_err() {
1239        return Err(ContractTestFailure::assertion(
1240            SCENARIO,
1241            "expected first instance to acquire leadership successfully, but try_acquire failed",
1242        ));
1243    }
1244
1245    Ok(())
1246}
1247
1248/// Contract test: Second instance returns error when leadership unavailable
1249///
1250/// Observable behavior: When one instance holds leadership for a subscription,
1251/// a second attempt to acquire leadership for the same subscription returns an error.
1252pub async fn test_coordination_second_instance_blocked<F, C>(
1253    make_coordinator: F,
1254) -> ContractTestResult
1255where
1256    F: Fn() -> C + Send + Sync + Clone + 'static,
1257    C: ProjectorCoordinator + Send + Sync + 'static,
1258{
1259    const SCENARIO: &str = "coordination_second_instance_blocked";
1260
1261    let coordinator = make_coordinator();
1262
1263    // Given: A unique subscription name
1264    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1265
1266    // And: First instance acquires leadership
1267    let _first_guard = coordinator
1268        .try_acquire(&subscription_name)
1269        .await
1270        .map_err(|_| {
1271            ContractTestFailure::assertion(SCENARIO, "first instance failed to acquire leadership")
1272        })?;
1273
1274    // When: Second instance attempts to acquire leadership while first holds it
1275    let second_result = coordinator.try_acquire(&subscription_name).await;
1276
1277    // Then: Second attempt returns an error (leadership unavailable)
1278    if second_result.is_ok() {
1279        return Err(ContractTestFailure::assertion(
1280            SCENARIO,
1281            "expected second instance to be blocked but try_acquire succeeded",
1282        ));
1283    }
1284
1285    Ok(())
1286}
1287
1288/// Contract test: Different projectors have independent coordination (different lock keys)
1289///
1290/// Observable behavior: Leadership for one subscription does not block leadership
1291/// acquisition for a different subscription. Each subscription/projector has its own
1292/// independent coordination scope.
1293pub async fn test_coordination_independent_subscriptions<F, C>(
1294    make_coordinator: F,
1295) -> ContractTestResult
1296where
1297    F: Fn() -> C + Send + Sync + Clone + 'static,
1298    C: ProjectorCoordinator + Send + Sync + 'static,
1299{
1300    const SCENARIO: &str = "coordination_independent_subscriptions";
1301
1302    let coordinator = make_coordinator();
1303
1304    // Given: Two unique subscription names (different projectors)
1305    let subscription_a = format!("contract::{}::projector-A::{}", SCENARIO, Uuid::now_v7());
1306    let subscription_b = format!("contract::{}::projector-B::{}", SCENARIO, Uuid::now_v7());
1307
1308    // And: First projector acquires leadership for subscription A
1309    let _guard_a = coordinator
1310        .try_acquire(&subscription_a)
1311        .await
1312        .map_err(|_| {
1313            ContractTestFailure::assertion(
1314                SCENARIO,
1315                "projector-A failed to acquire leadership for its subscription",
1316            )
1317        })?;
1318
1319    // When: Second projector attempts to acquire leadership for subscription B
1320    // (while first projector still holds leadership for A)
1321    let result_b = coordinator.try_acquire(&subscription_b).await;
1322
1323    // Then: Second acquisition succeeds (different subscriptions are independent)
1324    if result_b.is_err() {
1325        return Err(ContractTestFailure::assertion(
1326            SCENARIO,
1327            "expected projector-B to acquire leadership for its own subscription, but try_acquire failed - different projectors should have independent coordination",
1328        ));
1329    }
1330
1331    Ok(())
1332}
1333
1334/// Contract test: Leadership is released when guard is dropped (crash/disconnect recovery)
1335///
1336/// Observable behavior: When an instance holding leadership drops its guard (simulating
1337/// process exit, crash, or connection close), the lock is released and another instance
1338/// can acquire leadership for the same subscription.
1339pub async fn test_coordination_leadership_released_on_guard_drop<F, C>(
1340    make_coordinator: F,
1341) -> ContractTestResult
1342where
1343    F: Fn() -> C + Send + Sync + Clone + 'static,
1344    C: ProjectorCoordinator + Send + Sync + 'static,
1345{
1346    const SCENARIO: &str = "coordination_leadership_released_on_guard_drop";
1347
1348    let coordinator = make_coordinator();
1349
1350    // Given: A unique subscription name
1351    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1352
1353    // And: First instance acquires leadership, then drops the guard
1354    {
1355        let _first_guard = coordinator
1356            .try_acquire(&subscription_name)
1357            .await
1358            .map_err(|_| {
1359                ContractTestFailure::assertion(
1360                    SCENARIO,
1361                    "first instance failed to acquire leadership",
1362                )
1363            })?;
1364        // Guard is dropped here when scope ends (simulates process exit/crash)
1365    }
1366
1367    // When: Second instance attempts to acquire leadership after first guard dropped
1368    let second_result = coordinator.try_acquire(&subscription_name).await;
1369
1370    // Then: Second acquisition succeeds (leadership was released)
1371    if second_result.is_err() {
1372        return Err(ContractTestFailure::assertion(
1373            SCENARIO,
1374            "expected second instance to acquire leadership after first guard dropped, but try_acquire failed - leadership should be released when guard is dropped",
1375        ));
1376    }
1377
1378    Ok(())
1379}