Skip to main content

eventcore_testing/
contract.rs

1use eventcore_types::{
2    BatchSize, CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore,
3    EventStoreError, ProjectorCoordinator, StreamId, StreamPattern, StreamPosition, StreamPrefix,
4    StreamVersion, StreamWrites, collect_events,
5};
6
7use serde::{Deserialize, Serialize};
8use uuid::Uuid;
9
10#[derive(Debug, thiserror::Error)]
11#[error("[{scenario}] {detail}")]
12pub struct ContractTestFailure {
13    scenario: &'static str,
14    detail: String,
15}
16
17impl ContractTestFailure {
18    fn new(scenario: &'static str, detail: impl Into<String>) -> Self {
19        Self {
20            scenario,
21            detail: detail.into(),
22        }
23    }
24
25    fn builder_error(scenario: &'static str, phase: &'static str, error: EventStoreError) -> Self {
26        Self::new(scenario, format!("builder failure during {phase}: {error}"))
27    }
28
29    fn store_error(
30        scenario: &'static str,
31        operation: &'static str,
32        error: EventStoreError,
33    ) -> Self {
34        Self::new(
35            scenario,
36            format!("{operation} operation returned unexpected error: {error}"),
37        )
38    }
39
40    fn assertion(scenario: &'static str, detail: impl Into<String>) -> Self {
41        Self::new(scenario, detail)
42    }
43}
44
45pub type ContractTestResult = Result<(), ContractTestFailure>;
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct ContractTestEvent {
49    stream_id: StreamId,
50}
51
52impl ContractTestEvent {
53    pub fn new(stream_id: StreamId) -> Self {
54        Self { stream_id }
55    }
56}
57
58impl Event for ContractTestEvent {
59    fn stream_id(&self) -> &StreamId {
60        &self.stream_id
61    }
62
63    fn event_type_name() -> &'static str {
64        "ContractTestEvent"
65    }
66}
67
68fn contract_stream_id(
69    scenario: &'static str,
70    label: &str,
71) -> Result<StreamId, ContractTestFailure> {
72    // Include UUID for parallel test execution against shared database
73    let raw = format!("contract::{}::{}::{}", scenario, label, Uuid::now_v7());
74
75    StreamId::try_new(raw.clone()).map_err(|error| {
76        ContractTestFailure::assertion(
77            scenario,
78            format!("unable to construct stream id `{}`: {}", raw, error),
79        )
80    })
81}
82
83fn builder_step(
84    scenario: &'static str,
85    phase: &'static str,
86    result: Result<StreamWrites, EventStoreError>,
87) -> Result<StreamWrites, ContractTestFailure> {
88    result.map_err(|error| ContractTestFailure::builder_error(scenario, phase, error))
89}
90
91fn register_contract_stream(
92    scenario: &'static str,
93    writes: StreamWrites,
94    stream_id: &StreamId,
95    expected_version: StreamVersion,
96) -> Result<StreamWrites, ContractTestFailure> {
97    builder_step(
98        scenario,
99        "register_stream",
100        writes.register_stream(stream_id.clone(), expected_version),
101    )
102}
103
104fn append_contract_event(
105    scenario: &'static str,
106    writes: StreamWrites,
107    stream_id: &StreamId,
108) -> Result<StreamWrites, ContractTestFailure> {
109    let event = ContractTestEvent::new(stream_id.clone());
110    builder_step(scenario, "append", writes.append(event))
111}
112
113pub async fn test_basic_read_write<F, S>(make_store: F) -> ContractTestResult
114where
115    F: Fn() -> S + Send + Sync + Clone + 'static,
116    S: EventStore + Send + Sync + 'static,
117{
118    const SCENARIO: &str = "basic_read_write";
119
120    let store = make_store();
121    let stream_id = contract_stream_id(SCENARIO, "single");
122
123    let stream_id = stream_id?;
124
125    let writes = register_contract_stream(
126        SCENARIO,
127        StreamWrites::new(),
128        &stream_id,
129        StreamVersion::new(0),
130    )?;
131    let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
132
133    let _ = store
134        .append_events(writes)
135        .await
136        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
137
138    let stream = store
139        .read_stream::<ContractTestEvent>(stream_id.clone())
140        .await
141        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
142    let events = collect_events(stream)
143        .await
144        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
145
146    let len = events.len();
147    let empty = events.is_empty();
148
149    if empty {
150        return Err(ContractTestFailure::assertion(
151            SCENARIO,
152            "expected stream to contain events but it was empty",
153        ));
154    }
155
156    if len != 1 {
157        return Err(ContractTestFailure::assertion(
158            SCENARIO,
159            format!(
160                "expected stream to contain exactly one event, observed len={}",
161                len
162            ),
163        ));
164    }
165
166    Ok(())
167}
168
169pub async fn test_concurrent_version_conflicts<F, S>(make_store: F) -> ContractTestResult
170where
171    F: Fn() -> S + Send + Sync + Clone + 'static,
172    S: EventStore + Send + Sync + 'static,
173{
174    const SCENARIO: &str = "concurrent_version_conflicts";
175
176    let store = make_store();
177    let stream_id = contract_stream_id(SCENARIO, "shared")?;
178
179    let first_writes = register_contract_stream(
180        SCENARIO,
181        StreamWrites::new(),
182        &stream_id,
183        StreamVersion::new(0),
184    )?;
185    let first_writes = append_contract_event(SCENARIO, first_writes, &stream_id)?;
186
187    let _ = store
188        .append_events(first_writes)
189        .await
190        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
191
192    let conflicting_writes = register_contract_stream(
193        SCENARIO,
194        StreamWrites::new(),
195        &stream_id,
196        StreamVersion::new(0),
197    )?;
198    let conflicting_writes = append_contract_event(SCENARIO, conflicting_writes, &stream_id)?;
199
200    match store.append_events(conflicting_writes).await {
201        Err(EventStoreError::VersionConflict { .. }) => Ok(()),
202        Err(error) => Err(ContractTestFailure::store_error(
203            SCENARIO,
204            "append_events",
205            error,
206        )),
207        Ok(_) => Err(ContractTestFailure::assertion(
208            SCENARIO,
209            "expected version conflict but append succeeded",
210        )),
211    }
212}
213
214pub async fn test_stream_isolation<F, S>(make_store: F) -> ContractTestResult
215where
216    F: Fn() -> S + Send + Sync + Clone + 'static,
217    S: EventStore + Send + Sync + 'static,
218{
219    const SCENARIO: &str = "stream_isolation";
220
221    let store = make_store();
222    let left_stream = contract_stream_id(SCENARIO, "left")?;
223    let right_stream = contract_stream_id(SCENARIO, "right")?;
224
225    let writes = register_contract_stream(
226        SCENARIO,
227        StreamWrites::new(),
228        &left_stream,
229        StreamVersion::new(0),
230    )?;
231    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
232    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
233    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
234
235    let _ = store
236        .append_events(writes)
237        .await
238        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
239
240    let left_stream_events = store
241        .read_stream::<ContractTestEvent>(left_stream.clone())
242        .await
243        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
244    let left_events = collect_events(left_stream_events)
245        .await
246        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
247
248    let right_stream_events = store
249        .read_stream::<ContractTestEvent>(right_stream.clone())
250        .await
251        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
252    let right_events = collect_events(right_stream_events)
253        .await
254        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
255
256    let left_len = left_events.len();
257    if left_len != 1 {
258        return Err(ContractTestFailure::assertion(
259            SCENARIO,
260            format!(
261                "left stream expected exactly one event but observed {}",
262                left_len
263            ),
264        ));
265    }
266
267    if left_events
268        .iter()
269        .any(|event| event.stream_id() != &left_stream)
270    {
271        return Err(ContractTestFailure::assertion(
272            SCENARIO,
273            "left stream read events belonging to another stream",
274        ));
275    }
276
277    let right_len = right_events.len();
278    if right_len != 1 {
279        return Err(ContractTestFailure::assertion(
280            SCENARIO,
281            format!(
282                "right stream expected exactly one event but observed {}",
283                right_len
284            ),
285        ));
286    }
287
288    if right_events
289        .iter()
290        .any(|event| event.stream_id() != &right_stream)
291    {
292        return Err(ContractTestFailure::assertion(
293            SCENARIO,
294            "right stream read events belonging to another stream",
295        ));
296    }
297
298    Ok(())
299}
300
301pub async fn test_missing_stream_reads<F, S>(make_store: F) -> ContractTestResult
302where
303    F: Fn() -> S + Send + Sync + Clone + 'static,
304    S: EventStore + Send + Sync + 'static,
305{
306    const SCENARIO: &str = "missing_stream_reads";
307
308    let store = make_store();
309    let stream_id = contract_stream_id(SCENARIO, "ghost")?;
310
311    let stream = store
312        .read_stream::<ContractTestEvent>(stream_id.clone())
313        .await
314        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
315    let events = collect_events(stream)
316        .await
317        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
318
319    if !events.is_empty() {
320        return Err(ContractTestFailure::assertion(
321            SCENARIO,
322            "expected read_stream to succeed with no events for an untouched stream",
323        ));
324    }
325
326    Ok(())
327}
328
329pub async fn test_conflict_preserves_atomicity<F, S>(make_store: F) -> ContractTestResult
330where
331    F: Fn() -> S + Send + Sync + Clone + 'static,
332    S: EventStore + Send + Sync + 'static,
333{
334    const SCENARIO: &str = "conflict_preserves_atomicity";
335
336    let store = make_store();
337    let left_stream = contract_stream_id(SCENARIO, "left")?;
338    let right_stream = contract_stream_id(SCENARIO, "right")?;
339
340    // Seed one event per stream so we can introduce a single-stream conflict later.
341    let writes = register_contract_stream(
342        SCENARIO,
343        StreamWrites::new(),
344        &left_stream,
345        StreamVersion::new(0),
346    )?;
347    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
348    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
349    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
350
351    let _ = store
352        .append_events(writes)
353        .await
354        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
355
356    // Build a batch where the left stream has a stale expected version and the right stream is current.
357    let writes = register_contract_stream(
358        SCENARIO,
359        StreamWrites::new(),
360        &left_stream,
361        StreamVersion::new(0),
362    )?;
363    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(1))?;
364    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
365    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
366
367    match store.append_events(writes).await {
368        Err(EventStoreError::VersionConflict { .. }) => {
369            let left_stream_events = store
370                .read_stream::<ContractTestEvent>(left_stream.clone())
371                .await
372                .map_err(|error| {
373                    ContractTestFailure::store_error(SCENARIO, "read_stream", error)
374                })?;
375            let left_events = collect_events(left_stream_events).await.map_err(|error| {
376                ContractTestFailure::store_error(SCENARIO, "read_stream", error)
377            })?;
378            if left_events.len() != 1 {
379                return Err(ContractTestFailure::assertion(
380                    SCENARIO,
381                    format!(
382                        "expected left stream to remain at len=1 after failed append, observed {}",
383                        left_events.len()
384                    ),
385                ));
386            }
387
388            let right_stream_events = store
389                .read_stream::<ContractTestEvent>(right_stream.clone())
390                .await
391                .map_err(|error| {
392                    ContractTestFailure::store_error(SCENARIO, "read_stream", error)
393                })?;
394            let right_events = collect_events(right_stream_events).await.map_err(|error| {
395                ContractTestFailure::store_error(SCENARIO, "read_stream", error)
396            })?;
397            if right_events.len() != 1 {
398                return Err(ContractTestFailure::assertion(
399                    SCENARIO,
400                    format!(
401                        "expected right stream to remain at len=1 after failed append, observed {}",
402                        right_events.len()
403                    ),
404                ));
405            }
406
407            Ok(())
408        }
409        Err(error) => Err(ContractTestFailure::store_error(
410            SCENARIO,
411            "append_events",
412            error,
413        )),
414        Ok(_) => Err(ContractTestFailure::assertion(
415            SCENARIO,
416            "expected version conflict but append succeeded",
417        )),
418    }
419}
420
421/// A different event type used to test type mismatch detection in read_stream.
422#[derive(Debug, Clone, Serialize, Deserialize)]
423pub struct MismatchedEvent {
424    stream_id: StreamId,
425    extra_field: String,
426}
427
428impl Event for MismatchedEvent {
429    fn stream_id(&self) -> &StreamId {
430        &self.stream_id
431    }
432
433    fn event_type_name() -> &'static str {
434        "MismatchedEvent"
435    }
436}
437
438/// Contract test: read_stream errors when events on the stream don't match
439/// the requested type.
440///
441/// This verifies that all backends behave consistently when a stream contains
442/// events that were written with one type but read with a different type.
443/// The correct behavior is to return `EventStoreError::DeserializationFailed`,
444/// not to silently skip unrecognized events.
445pub async fn test_read_stream_errors_on_type_mismatch<F, S>(make_store: F) -> ContractTestResult
446where
447    F: Fn() -> S + Send + Sync + Clone + 'static,
448    S: EventStore + Send + Sync + 'static,
449{
450    const SCENARIO: &str = "read_stream_errors_on_type_mismatch";
451
452    let store = make_store();
453    let stream_id = contract_stream_id(SCENARIO, "mismatched")?;
454
455    // Write an event using ContractTestEvent
456    let writes = register_contract_stream(
457        SCENARIO,
458        StreamWrites::new(),
459        &stream_id,
460        StreamVersion::new(0),
461    )?;
462    let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
463
464    let _ = store
465        .append_events(writes)
466        .await
467        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
468
469    // Read the same stream but request a different event type. Opening the
470    // stream may succeed; the type mismatch surfaces as a per-event decode
471    // failure when the stream is consumed. Either path (an open-time error or
472    // an item-level error during collection) is acceptable, but the stream
473    // MUST NOT silently yield events of the wrong type.
474    let result = match store.read_stream::<MismatchedEvent>(stream_id).await {
475        Ok(stream) => collect_events(stream).await,
476        Err(error) => Err(error),
477    };
478
479    match result {
480        Err(EventStoreError::DeserializationFailed { .. }) => Ok(()),
481        Err(other) => Err(ContractTestFailure::assertion(
482            SCENARIO,
483            format!(
484                "expected DeserializationFailed error, got different error: {}",
485                other
486            ),
487        )),
488        Ok(events) if events.is_empty() => Err(ContractTestFailure::assertion(
489            SCENARIO,
490            "read_stream silently returned empty results instead of erroring on type mismatch",
491        )),
492        Ok(events) => Err(ContractTestFailure::assertion(
493            SCENARIO,
494            format!(
495                "read_stream returned {} events instead of erroring on type mismatch",
496                events.len()
497            ),
498        )),
499    }
500}
501
502// NOTE: The old fragmented macros (event_store_contract_tests!, event_reader_contract_tests!)
503// have been removed. Use backend_contract_tests! which runs ALL contract tests.
504
505/// Unified contract test macro for all backend implementations.
506///
507/// This macro generates ALL contract tests for a backend implementation.
508/// When new contract tests are added to eventcore-testing, they automatically
509/// run for all backends that use this macro—no changes to backend test files required.
510///
511/// # Example
512///
513/// ```ignore
514/// backend_contract_tests! {
515///     suite = my_backend,
516///     make_store = || MyEventStore::new(),
517///     make_checkpoint_store = || MyCheckpointStore::new(),
518/// }
519/// ```
520///
521/// # Requirements
522///
523/// The store type must implement both `EventStore` and `EventReader` traits.
524/// The checkpoint store type must implement `CheckpointStore` trait.
525/// The coordinator type must implement `ProjectorCoordinator` trait.
526#[macro_export]
527macro_rules! backend_contract_tests {
528    (suite = $suite:ident, make_store = $make_store:expr, make_checkpoint_store = $make_checkpoint_store:expr, make_coordinator = $make_coordinator:expr $(,)?) => {
529        #[allow(non_snake_case)]
530        mod $suite {
531            use $crate::contract::{
532                test_basic_read_write, test_batch_limiting,
533                test_checkpoint_independent_subscriptions,
534                test_checkpoint_load_missing_returns_none, test_checkpoint_save_and_load,
535                test_checkpoint_update_overwrites, test_concurrent_version_conflicts,
536                test_conflict_preserves_atomicity, test_coordination_acquire_leadership,
537                test_coordination_independent_subscriptions,
538                test_coordination_leadership_released_on_guard_drop,
539                test_coordination_second_instance_blocked, test_event_ordering_across_streams,
540                test_missing_stream_reads, test_position_based_resumption,
541                test_read_stream_errors_on_type_mismatch, test_stream_isolation,
542                test_stream_pattern_char_class, test_stream_pattern_filtering,
543                test_stream_pattern_single_char, test_stream_prefix_filtering,
544                test_stream_prefix_requires_prefix_match,
545            };
546
547            #[tokio::test(flavor = "multi_thread")]
548            async fn basic_read_write_contract() {
549                test_basic_read_write($make_store)
550                    .await
551                    .expect("event store contract failed");
552            }
553
554            #[tokio::test(flavor = "multi_thread")]
555            async fn concurrent_version_conflicts_contract() {
556                test_concurrent_version_conflicts($make_store)
557                    .await
558                    .expect("event store contract failed");
559            }
560
561            #[tokio::test(flavor = "multi_thread")]
562            async fn stream_isolation_contract() {
563                test_stream_isolation($make_store)
564                    .await
565                    .expect("event store contract failed");
566            }
567
568            #[tokio::test(flavor = "multi_thread")]
569            async fn missing_stream_reads_contract() {
570                test_missing_stream_reads($make_store)
571                    .await
572                    .expect("event store contract failed");
573            }
574
575            #[tokio::test(flavor = "multi_thread")]
576            async fn conflict_preserves_atomicity_contract() {
577                test_conflict_preserves_atomicity($make_store)
578                    .await
579                    .expect("event store contract failed");
580            }
581
582            #[tokio::test(flavor = "multi_thread")]
583            async fn read_stream_errors_on_type_mismatch_contract() {
584                test_read_stream_errors_on_type_mismatch($make_store)
585                    .await
586                    .expect("event store contract failed");
587            }
588
589            #[tokio::test(flavor = "multi_thread")]
590            async fn event_ordering_across_streams_contract() {
591                test_event_ordering_across_streams($make_store)
592                    .await
593                    .expect("event reader contract failed");
594            }
595
596            #[tokio::test(flavor = "multi_thread")]
597            async fn position_based_resumption_contract() {
598                test_position_based_resumption($make_store)
599                    .await
600                    .expect("event reader contract failed");
601            }
602
603            #[tokio::test(flavor = "multi_thread")]
604            async fn stream_prefix_filtering_contract() {
605                test_stream_prefix_filtering($make_store)
606                    .await
607                    .expect("event reader contract failed");
608            }
609
610            #[tokio::test(flavor = "multi_thread")]
611            async fn stream_prefix_requires_prefix_match_contract() {
612                test_stream_prefix_requires_prefix_match($make_store)
613                    .await
614                    .expect("event reader contract failed");
615            }
616
617            #[tokio::test(flavor = "multi_thread")]
618            async fn stream_pattern_filtering_contract() {
619                test_stream_pattern_filtering($make_store)
620                    .await
621                    .expect("event reader contract failed");
622            }
623
624            #[tokio::test(flavor = "multi_thread")]
625            async fn stream_pattern_single_char_contract() {
626                test_stream_pattern_single_char($make_store)
627                    .await
628                    .expect("event reader contract failed");
629            }
630
631            #[tokio::test(flavor = "multi_thread")]
632            async fn stream_pattern_char_class_contract() {
633                test_stream_pattern_char_class($make_store)
634                    .await
635                    .expect("event reader contract failed");
636            }
637
638            #[tokio::test(flavor = "multi_thread")]
639            async fn batch_limiting_contract() {
640                test_batch_limiting($make_store)
641                    .await
642                    .expect("event reader contract failed");
643            }
644
645            // CheckpointStore contract tests
646            #[tokio::test(flavor = "multi_thread")]
647            async fn checkpoint_save_and_load_contract() {
648                test_checkpoint_save_and_load($make_checkpoint_store)
649                    .await
650                    .expect("checkpoint store contract failed");
651            }
652
653            #[tokio::test(flavor = "multi_thread")]
654            async fn checkpoint_update_overwrites_contract() {
655                test_checkpoint_update_overwrites($make_checkpoint_store)
656                    .await
657                    .expect("checkpoint store contract failed");
658            }
659
660            #[tokio::test(flavor = "multi_thread")]
661            async fn checkpoint_load_missing_returns_none_contract() {
662                test_checkpoint_load_missing_returns_none($make_checkpoint_store)
663                    .await
664                    .expect("checkpoint store contract failed");
665            }
666
667            #[tokio::test(flavor = "multi_thread")]
668            async fn checkpoint_independent_subscriptions_contract() {
669                test_checkpoint_independent_subscriptions($make_checkpoint_store)
670                    .await
671                    .expect("checkpoint store contract failed");
672            }
673
674            // ProjectorCoordinator contract tests
675            #[tokio::test(flavor = "multi_thread")]
676            async fn coordination_acquire_leadership_contract() {
677                test_coordination_acquire_leadership($make_coordinator)
678                    .await
679                    .expect("coordinator contract failed");
680            }
681
682            #[tokio::test(flavor = "multi_thread")]
683            async fn coordination_second_instance_blocked_contract() {
684                test_coordination_second_instance_blocked($make_coordinator)
685                    .await
686                    .expect("coordinator contract failed");
687            }
688
689            #[tokio::test(flavor = "multi_thread")]
690            async fn coordination_independent_subscriptions_contract() {
691                test_coordination_independent_subscriptions($make_coordinator)
692                    .await
693                    .expect("coordinator contract failed");
694            }
695
696            #[tokio::test(flavor = "multi_thread")]
697            async fn coordination_leadership_released_on_guard_drop_contract() {
698                test_coordination_leadership_released_on_guard_drop($make_coordinator)
699                    .await
700                    .expect("coordinator contract failed");
701            }
702        }
703    };
704}
705
706pub use backend_contract_tests;
707
708/// Contract test: Events from multiple streams are read in global append order
709pub async fn test_event_ordering_across_streams<F, S>(make_store: F) -> ContractTestResult
710where
711    F: Fn() -> S + Send + Sync + Clone + 'static,
712    S: EventStore + EventReader + Send + Sync + 'static,
713{
714    const SCENARIO: &str = "event_ordering_across_streams";
715
716    let store = make_store();
717
718    // Given: Three streams with events appended in specific order
719    let stream_a = contract_stream_id(SCENARIO, "stream-a")?;
720    let stream_b = contract_stream_id(SCENARIO, "stream-b")?;
721    let stream_c = contract_stream_id(SCENARIO, "stream-c")?;
722
723    // Append event to stream A
724    let writes = register_contract_stream(
725        SCENARIO,
726        StreamWrites::new(),
727        &stream_a,
728        StreamVersion::new(0),
729    )?;
730    let writes = append_contract_event(SCENARIO, writes, &stream_a)?;
731    let _ = store
732        .append_events(writes)
733        .await
734        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
735
736    // Append event to stream B
737    let writes = register_contract_stream(
738        SCENARIO,
739        StreamWrites::new(),
740        &stream_b,
741        StreamVersion::new(0),
742    )?;
743    let writes = append_contract_event(SCENARIO, writes, &stream_b)?;
744    let _ = store
745        .append_events(writes)
746        .await
747        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
748
749    // Append event to stream C
750    let writes = register_contract_stream(
751        SCENARIO,
752        StreamWrites::new(),
753        &stream_c,
754        StreamVersion::new(0),
755    )?;
756    let writes = append_contract_event(SCENARIO, writes, &stream_c)?;
757    let _ = store
758        .append_events(writes)
759        .await
760        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
761
762    // When: Reading all events via EventReader with no position filter
763    let filter = EventFilter::all();
764    let page = EventPage::first(BatchSize::new(100));
765    let events = store
766        .read_events::<ContractTestEvent>(filter, page)
767        .await
768        .map_err(|_error| {
769            ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
770        })?;
771
772    // Then: Events are returned in global append order (A, B, C)
773    if events.len() != 3 {
774        return Err(ContractTestFailure::assertion(
775            SCENARIO,
776            format!("expected 3 events but got {}", events.len()),
777        ));
778    }
779
780    // And: Verify complete ordering across all three streams
781    let (first_event, _) = &events[0];
782    if first_event.stream_id() != &stream_a {
783        return Err(ContractTestFailure::assertion(
784            SCENARIO,
785            format!(
786                "expected first event from stream_a but got from {:?}",
787                first_event.stream_id()
788            ),
789        ));
790    }
791
792    let (second_event, _) = &events[1];
793    if second_event.stream_id() != &stream_b {
794        return Err(ContractTestFailure::assertion(
795            SCENARIO,
796            format!(
797                "expected second event from stream_b but got from {:?}",
798                second_event.stream_id()
799            ),
800        ));
801    }
802
803    let (third_event, _) = &events[2];
804    if third_event.stream_id() != &stream_c {
805        return Err(ContractTestFailure::assertion(
806            SCENARIO,
807            format!(
808                "expected third event from stream_c but got from {:?}",
809                third_event.stream_id()
810            ),
811        ));
812    }
813
814    Ok(())
815}
816
817/// Contract test: Position-based resumption works correctly
818pub async fn test_position_based_resumption<F, S>(make_store: F) -> ContractTestResult
819where
820    F: Fn() -> S + Send + Sync + Clone + 'static,
821    S: EventStore + EventReader + Send + Sync + 'static,
822{
823    const SCENARIO: &str = "position_based_resumption";
824
825    let store = make_store();
826
827    // Given: Events at positions 0, 1, 2, 3, 4 (5 events total)
828    let stream = contract_stream_id(SCENARIO, "stream")?;
829
830    let mut writes = register_contract_stream(
831        SCENARIO,
832        StreamWrites::new(),
833        &stream,
834        StreamVersion::new(0),
835    )?;
836
837    // Append 5 events
838    for _ in 0..5 {
839        writes = append_contract_event(SCENARIO, writes, &stream)?;
840    }
841
842    let _ = store
843        .append_events(writes)
844        .await
845        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
846
847    // Get position of third event (index 2, position 2)
848    let filter = EventFilter::all();
849    let page = EventPage::first(BatchSize::new(100));
850    let all_events = store
851        .read_events::<ContractTestEvent>(filter.clone(), page)
852        .await
853        .map_err(|_error| {
854            ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
855        })?;
856
857    let (_third_event, third_position) = &all_events[2];
858
859    // When: Reading events after position 2
860    let page_after = EventPage::after(*third_position, BatchSize::new(100));
861    let events_after = store
862        .read_events::<ContractTestEvent>(filter, page_after)
863        .await
864        .map_err(|_error| {
865            ContractTestFailure::assertion(
866                SCENARIO,
867                "read_events failed when reading after position",
868            )
869        })?;
870
871    // Then: Only events at positions 3 and 4 are returned (2 events)
872    if events_after.len() != 2 {
873        return Err(ContractTestFailure::assertion(
874            SCENARIO,
875            format!(
876                "expected 2 events after position {} but got {}",
877                third_position,
878                events_after.len()
879            ),
880        ));
881    }
882
883    // And: Position 2 event is NOT included (verify exclusivity)
884    for (_event, position) in events_after.iter() {
885        if *position == *third_position {
886            return Err(ContractTestFailure::assertion(
887                SCENARIO,
888                format!(
889                    "expected position {} to be excluded but it was included in results",
890                    third_position
891                ),
892            ));
893        }
894    }
895
896    // And: Returned event positions are greater than third_position and in ascending order
897    let (_event1, pos1) = &events_after[0];
898    let (_event2, pos2) = &events_after[1];
899
900    if *pos1 <= *third_position {
901        return Err(ContractTestFailure::assertion(
902            SCENARIO,
903            format!(
904                "expected first returned position to be > {} but got {}",
905                third_position, pos1
906            ),
907        ));
908    }
909
910    if *pos2 <= *pos1 {
911        return Err(ContractTestFailure::assertion(
912            SCENARIO,
913            format!(
914                "expected positions to be in ascending order but {} <= {}",
915                pos2, pos1
916            ),
917        ));
918    }
919
920    Ok(())
921}
922
923/// Contract test: Stream prefix filtering returns only matching streams
924pub async fn test_stream_prefix_filtering<F, S>(make_store: F) -> ContractTestResult
925where
926    F: Fn() -> S + Send + Sync + Clone + 'static,
927    S: EventStore + EventReader + Send + Sync + 'static,
928{
929    const SCENARIO: &str = "stream_prefix_filtering";
930
931    let store = make_store();
932
933    // Given: Events on streams with IDs that actually start with "account-" or "order-"
934    let account_1 = StreamId::try_new(format!("account-1-{}", Uuid::now_v7())).map_err(|e| {
935        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
936    })?;
937    let account_2 = StreamId::try_new(format!("account-2-{}", Uuid::now_v7())).map_err(|e| {
938        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
939    })?;
940    let order_1 = StreamId::try_new(format!("order-1-{}", Uuid::now_v7())).map_err(|e| {
941        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
942    })?;
943
944    let mut writes = register_contract_stream(
945        SCENARIO,
946        StreamWrites::new(),
947        &account_1,
948        StreamVersion::new(0),
949    )?;
950    writes = register_contract_stream(SCENARIO, writes, &account_2, StreamVersion::new(0))?;
951    writes = register_contract_stream(SCENARIO, writes, &order_1, StreamVersion::new(0))?;
952
953    writes = append_contract_event(SCENARIO, writes, &account_1)?;
954    writes = append_contract_event(SCENARIO, writes, &account_2)?;
955    writes = append_contract_event(SCENARIO, writes, &order_1)?;
956
957    let _ = store
958        .append_events(writes)
959        .await
960        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
961
962    // When: Reading with prefix filter "account-"
963    let prefix = StreamPrefix::try_new("account-").map_err(|e| {
964        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
965    })?;
966    let filter = EventFilter::prefix(prefix);
967    let page = EventPage::first(BatchSize::new(100));
968    let events = store
969        .read_events::<ContractTestEvent>(filter, page)
970        .await
971        .map_err(|_error| {
972            ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
973        })?;
974
975    // Then: Only events from account-1 and account-2 are returned
976    if events.len() != 2 {
977        return Err(ContractTestFailure::assertion(
978            SCENARIO,
979            format!(
980                "expected 2 events from account-* streams but got {}",
981                events.len()
982            ),
983        ));
984    }
985
986    // And: All events are from streams starting with "account-"
987    for (event, _) in events.iter() {
988        let stream_id_str = event.stream_id().as_ref();
989        if !stream_id_str.starts_with("account-") {
990            return Err(ContractTestFailure::assertion(
991                SCENARIO,
992                format!(
993                    "expected all events from streams starting with 'account-' but found event from {}",
994                    stream_id_str
995                ),
996            ));
997        }
998    }
999
1000    // And: order-1 events are filtered out (verified by length check above)
1001
1002    Ok(())
1003}
1004
1005/// Contract test: Stream prefix filtering requires true prefix match (not substring match)
1006pub async fn test_stream_prefix_requires_prefix_match<F, S>(make_store: F) -> ContractTestResult
1007where
1008    F: Fn() -> S + Send + Sync + Clone + 'static,
1009    S: EventStore + EventReader + Send + Sync + 'static,
1010{
1011    const SCENARIO: &str = "stream_prefix_requires_prefix_match";
1012
1013    let store = make_store();
1014
1015    // Given: Three streams with actual prefixes: "account-123", "my-account-456", "order-789"
1016    // We want to verify that prefix "account-" matches ONLY "account-123", not "my-account-456"
1017    let account_stream =
1018        StreamId::try_new(format!("account-123-{}", Uuid::now_v7())).map_err(|e| {
1019            ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1020        })?;
1021    let my_account_stream = StreamId::try_new(format!("my-account-456-{}", Uuid::now_v7()))
1022        .map_err(|e| {
1023            ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1024        })?;
1025    let order_stream = StreamId::try_new(format!("order-789-{}", Uuid::now_v7())).map_err(|e| {
1026        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1027    })?;
1028
1029    let mut writes = register_contract_stream(
1030        SCENARIO,
1031        StreamWrites::new(),
1032        &account_stream,
1033        StreamVersion::new(0),
1034    )?;
1035    writes = register_contract_stream(SCENARIO, writes, &my_account_stream, StreamVersion::new(0))?;
1036    writes = register_contract_stream(SCENARIO, writes, &order_stream, StreamVersion::new(0))?;
1037
1038    writes = append_contract_event(SCENARIO, writes, &account_stream)?;
1039    writes = append_contract_event(SCENARIO, writes, &my_account_stream)?;
1040    writes = append_contract_event(SCENARIO, writes, &order_stream)?;
1041
1042    let _ = store
1043        .append_events(writes)
1044        .await
1045        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1046
1047    // When: Reading with prefix filter "account-"
1048    let prefix = StreamPrefix::try_new("account-").map_err(|e| {
1049        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
1050    })?;
1051    let filter = EventFilter::prefix(prefix);
1052    let page = EventPage::first(BatchSize::new(100));
1053    let events = store
1054        .read_events::<ContractTestEvent>(filter, page)
1055        .await
1056        .map_err(|_error| {
1057            ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
1058        })?;
1059
1060    // Then: ONLY "account-123" stream should be returned (not "my-account-456")
1061    if events.len() != 1 {
1062        return Err(ContractTestFailure::assertion(
1063            SCENARIO,
1064            format!(
1065                "expected exactly 1 event from account-* prefix but got {} (bug: implementation uses contains() instead of starts_with())",
1066                events.len()
1067            ),
1068        ));
1069    }
1070
1071    // And: The event must be from a stream starting with "account-123"
1072    let (event, _) = &events[0];
1073    let stream_id_str = event.stream_id().as_ref();
1074    if !stream_id_str.starts_with("account-123") {
1075        return Err(ContractTestFailure::assertion(
1076            SCENARIO,
1077            format!(
1078                "expected event from stream starting with 'account-123' but got from {}",
1079                stream_id_str
1080            ),
1081        ));
1082    }
1083
1084    // And: Verify it's NOT from my-account-456 (proves we're not doing substring matching)
1085    if stream_id_str.starts_with("my-account-456") {
1086        return Err(ContractTestFailure::assertion(
1087            SCENARIO,
1088            "BUG EXPOSED: got event from stream starting with 'my-account-456' when filtering for prefix 'account-' - implementation must use prefix matching from the start of the stream ID",
1089        ));
1090    }
1091
1092    Ok(())
1093}
1094
1095/// Contract test: Glob `*` wildcard pattern filtering selects only matching streams.
1096///
1097/// Per ADR-0047, `EventFilter::pattern` matches stream IDs against a POSIX glob
1098/// pattern. The wildcard `*` matches any sequence of characters (including the
1099/// stream separator `/`). This test proves the filter is applied at the query
1100/// level: it appends enough non-matching events to cross the pagination `LIMIT`
1101/// boundary before any matching events, so an implementation that applied `LIMIT`
1102/// before the pattern filter would return zero matches.
1103pub async fn test_stream_pattern_filtering<F, S>(make_store: F) -> ContractTestResult
1104where
1105    F: Fn() -> S + Send + Sync + Clone + 'static,
1106    S: EventStore + EventReader + Send + Sync + 'static,
1107{
1108    const SCENARIO: &str = "stream_pattern_filtering";
1109
1110    let store = make_store();
1111    let run = Uuid::now_v7();
1112
1113    // Given: many non-matching "order-*" streams followed by two matching
1114    // "account-*" streams. The non-matching events alone exceed the page limit.
1115    let mut writes = StreamWrites::new();
1116    let mut order_streams = Vec::new();
1117    for _ in 0..50 {
1118        let order = StreamId::try_new(format!("order-{}", Uuid::now_v7())).map_err(|e| {
1119            ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1120        })?;
1121        writes = register_contract_stream(SCENARIO, writes, &order, StreamVersion::new(0))?;
1122        order_streams.push(order);
1123    }
1124
1125    let account_1 = StreamId::try_new(format!("account-1-{run}")).map_err(|e| {
1126        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1127    })?;
1128    let account_2 = StreamId::try_new(format!("account-2-{run}")).map_err(|e| {
1129        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1130    })?;
1131    writes = register_contract_stream(SCENARIO, writes, &account_1, StreamVersion::new(0))?;
1132    writes = register_contract_stream(SCENARIO, writes, &account_2, StreamVersion::new(0))?;
1133
1134    for order in &order_streams {
1135        writes = append_contract_event(SCENARIO, writes, order)?;
1136    }
1137    writes = append_contract_event(SCENARIO, writes, &account_1)?;
1138    writes = append_contract_event(SCENARIO, writes, &account_2)?;
1139
1140    let _ = store
1141        .append_events(writes)
1142        .await
1143        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1144
1145    // When: Reading with glob pattern "account-*" and a page limit smaller than
1146    // the number of non-matching events that precede the matches.
1147    let pattern = StreamPattern::try_new("account-*").map_err(|e| {
1148        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream pattern: {}", e))
1149    })?;
1150    let filter = EventFilter::pattern(pattern);
1151    let page = EventPage::first(BatchSize::new(10));
1152    let events = store
1153        .read_events::<ContractTestEvent>(filter, page)
1154        .await
1155        .map_err(|_error| {
1156            ContractTestFailure::assertion(
1157                SCENARIO,
1158                "read_events failed with stream pattern filter",
1159            )
1160        })?;
1161
1162    // Then: exactly the two "account-*" events are returned. If the limit were
1163    // applied before filtering, this would be zero.
1164    if events.len() != 2 {
1165        return Err(ContractTestFailure::assertion(
1166            SCENARIO,
1167            format!(
1168                "expected exactly 2 events matching glob 'account-*' but got {} (pattern filter must be applied before the page limit)",
1169                events.len()
1170            ),
1171        ));
1172    }
1173
1174    for (event, _) in events.iter() {
1175        let stream_id_str = event.stream_id().as_ref();
1176        if !stream_id_str.starts_with("account-") {
1177            return Err(ContractTestFailure::assertion(
1178                SCENARIO,
1179                format!(
1180                    "expected all events from streams matching 'account-*' but found event from {}",
1181                    stream_id_str
1182                ),
1183            ));
1184        }
1185    }
1186
1187    Ok(())
1188}
1189
1190/// Contract test: Glob `?` matches exactly one character.
1191///
1192/// Per ADR-0047, `?` matches a single arbitrary character. Pattern `account-?`
1193/// must match `account-1` but not `account-12` (two trailing characters) and
1194/// not `order-1`.
1195pub async fn test_stream_pattern_single_char<F, S>(make_store: F) -> ContractTestResult
1196where
1197    F: Fn() -> S + Send + Sync + Clone + 'static,
1198    S: EventStore + EventReader + Send + Sync + 'static,
1199{
1200    const SCENARIO: &str = "stream_pattern_single_char";
1201
1202    let store = make_store();
1203    let run = Uuid::now_v7();
1204
1205    // Given: one single-char-suffix stream, one two-char-suffix stream, one order.
1206    let account_x = StreamId::try_new(format!("account-x{run}")).map_err(|e| {
1207        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1208    })?;
1209    let account_xy = StreamId::try_new(format!("account-xy{run}")).map_err(|e| {
1210        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1211    })?;
1212    let order = StreamId::try_new(format!("order-z{run}")).map_err(|e| {
1213        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1214    })?;
1215
1216    let mut writes = register_contract_stream(
1217        SCENARIO,
1218        StreamWrites::new(),
1219        &account_x,
1220        StreamVersion::new(0),
1221    )?;
1222    writes = register_contract_stream(SCENARIO, writes, &account_xy, StreamVersion::new(0))?;
1223    writes = register_contract_stream(SCENARIO, writes, &order, StreamVersion::new(0))?;
1224
1225    writes = append_contract_event(SCENARIO, writes, &account_x)?;
1226    writes = append_contract_event(SCENARIO, writes, &account_xy)?;
1227    writes = append_contract_event(SCENARIO, writes, &order)?;
1228
1229    let _ = store
1230        .append_events(writes)
1231        .await
1232        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1233
1234    // When: Reading with glob pattern "account-?{run}" (single char between
1235    // "account-" and the run suffix).
1236    let pattern = StreamPattern::try_new(format!("account-?{run}")).map_err(|e| {
1237        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream pattern: {}", e))
1238    })?;
1239    let filter = EventFilter::pattern(pattern);
1240    let page = EventPage::first(BatchSize::new(100));
1241    let events = store
1242        .read_events::<ContractTestEvent>(filter, page)
1243        .await
1244        .map_err(|_error| {
1245            ContractTestFailure::assertion(
1246                SCENARIO,
1247                "read_events failed with stream pattern filter",
1248            )
1249        })?;
1250
1251    // Then: only "account-x{run}" matches (single char). "account-xy{run}" has
1252    // two chars; "order-z{run}" has the wrong prefix.
1253    if events.len() != 1 {
1254        return Err(ContractTestFailure::assertion(
1255            SCENARIO,
1256            format!(
1257                "expected exactly 1 event matching glob 'account-?{run}' but got {}",
1258                events.len()
1259            ),
1260        ));
1261    }
1262
1263    let (event, _) = &events[0];
1264    let stream_id_str = event.stream_id().as_ref();
1265    if stream_id_str != account_x.as_ref() {
1266        return Err(ContractTestFailure::assertion(
1267            SCENARIO,
1268            format!(
1269                "expected the single-character match {} but got {}",
1270                account_x.as_ref(),
1271                stream_id_str
1272            ),
1273        ));
1274    }
1275
1276    Ok(())
1277}
1278
1279/// Contract test: Glob `[0-9]` character class matches a single digit.
1280///
1281/// Per ADR-0047, a bracketed character class `[...]` matches one character from
1282/// the set. Pattern `account-[0-9]*` must match streams whose suffix begins with
1283/// a digit and reject those beginning with a non-digit.
1284pub async fn test_stream_pattern_char_class<F, S>(make_store: F) -> ContractTestResult
1285where
1286    F: Fn() -> S + Send + Sync + Clone + 'static,
1287    S: EventStore + EventReader + Send + Sync + 'static,
1288{
1289    const SCENARIO: &str = "stream_pattern_char_class";
1290
1291    let store = make_store();
1292    let run = Uuid::now_v7().simple().to_string();
1293
1294    // Given: a digit-prefixed account stream and a letter-prefixed account stream.
1295    let digit_stream = StreamId::try_new(format!("account-7-{run}")).map_err(|e| {
1296        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1297    })?;
1298    let letter_stream = StreamId::try_new(format!("account-a-{run}")).map_err(|e| {
1299        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1300    })?;
1301
1302    let mut writes = register_contract_stream(
1303        SCENARIO,
1304        StreamWrites::new(),
1305        &digit_stream,
1306        StreamVersion::new(0),
1307    )?;
1308    writes = register_contract_stream(SCENARIO, writes, &letter_stream, StreamVersion::new(0))?;
1309
1310    writes = append_contract_event(SCENARIO, writes, &digit_stream)?;
1311    writes = append_contract_event(SCENARIO, writes, &letter_stream)?;
1312
1313    let _ = store
1314        .append_events(writes)
1315        .await
1316        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1317
1318    // When: Reading with glob pattern "account-[0-9]*".
1319    let pattern = StreamPattern::try_new("account-[0-9]*").map_err(|e| {
1320        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream pattern: {}", e))
1321    })?;
1322    let filter = EventFilter::pattern(pattern);
1323    let page = EventPage::first(BatchSize::new(100));
1324    let events = store
1325        .read_events::<ContractTestEvent>(filter, page)
1326        .await
1327        .map_err(|_error| {
1328            ContractTestFailure::assertion(
1329                SCENARIO,
1330                "read_events failed with stream pattern filter",
1331            )
1332        })?;
1333
1334    // Then: only the digit-prefixed stream matches.
1335    if events.len() != 1 {
1336        return Err(ContractTestFailure::assertion(
1337            SCENARIO,
1338            format!(
1339                "expected exactly 1 event matching glob 'account-[0-9]*' but got {}",
1340                events.len()
1341            ),
1342        ));
1343    }
1344
1345    let (event, _) = &events[0];
1346    let stream_id_str = event.stream_id().as_ref();
1347    if stream_id_str != digit_stream.as_ref() {
1348        return Err(ContractTestFailure::assertion(
1349            SCENARIO,
1350            format!(
1351                "expected the digit-prefixed match {} but got {}",
1352                digit_stream.as_ref(),
1353                stream_id_str
1354            ),
1355        ));
1356    }
1357
1358    Ok(())
1359}
1360
1361/// Contract test: Batch limiting returns exactly the specified number of events
1362pub async fn test_batch_limiting<F, S>(make_store: F) -> ContractTestResult
1363where
1364    F: Fn() -> S + Send + Sync + Clone + 'static,
1365    S: EventStore + EventReader + Send + Sync + 'static,
1366{
1367    const SCENARIO: &str = "batch_limiting";
1368
1369    let store = make_store();
1370
1371    // Given: 20 events in the store
1372    let stream = contract_stream_id(SCENARIO, "stream")?;
1373
1374    let mut writes = register_contract_stream(
1375        SCENARIO,
1376        StreamWrites::new(),
1377        &stream,
1378        StreamVersion::new(0),
1379    )?;
1380
1381    // Append 20 events
1382    for _ in 0..20 {
1383        writes = append_contract_event(SCENARIO, writes, &stream)?;
1384    }
1385
1386    let _ = store
1387        .append_events(writes)
1388        .await
1389        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1390
1391    // When: Read events with limit of 10
1392    let filter = EventFilter::all();
1393    let page = EventPage::first(BatchSize::new(10));
1394    let events = store
1395        .read_events::<ContractTestEvent>(filter, page)
1396        .await
1397        .map_err(|_error| {
1398            ContractTestFailure::assertion(SCENARIO, "read_events failed with limit")
1399        })?;
1400
1401    // Then: Exactly 10 events are returned
1402    if events.len() != 10 {
1403        return Err(ContractTestFailure::assertion(
1404            SCENARIO,
1405            format!("expected exactly 10 events but got {}", events.len()),
1406        ));
1407    }
1408
1409    // And: Events are the FIRST 10 in global order
1410    // (We verify this by checking we got exactly 10 events - the implementation
1411    // must return events in order, so if we got 10 events they must be the first 10)
1412
1413    Ok(())
1414}
1415
1416// ============================================================================
1417// CheckpointStore Contract Tests
1418// ============================================================================
1419
1420/// Contract test: Save a checkpoint and load it back
1421pub async fn test_checkpoint_save_and_load<F, CS>(make_checkpoint_store: F) -> ContractTestResult
1422where
1423    F: Fn() -> CS + Send + Sync + Clone + 'static,
1424    CS: CheckpointStore + Send + Sync + 'static,
1425{
1426    const SCENARIO: &str = "checkpoint_save_and_load";
1427
1428    let store = make_checkpoint_store();
1429
1430    // Given: A subscription name and position
1431    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1432    let position = StreamPosition::new(Uuid::now_v7());
1433
1434    // When: Saving the checkpoint
1435    store
1436        .save(&subscription_name, position)
1437        .await
1438        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save failed"))?;
1439
1440    // Then: Loading returns the saved position
1441    let loaded = store
1442        .load(&subscription_name)
1443        .await
1444        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1445
1446    if loaded != Some(position) {
1447        return Err(ContractTestFailure::assertion(
1448            SCENARIO,
1449            format!(
1450                "expected loaded position {:?} but got {:?}",
1451                Some(position),
1452                loaded
1453            ),
1454        ));
1455    }
1456
1457    Ok(())
1458}
1459
1460/// Contract test: Saving a checkpoint overwrites the previous value
1461pub async fn test_checkpoint_update_overwrites<F, CS>(
1462    make_checkpoint_store: F,
1463) -> ContractTestResult
1464where
1465    F: Fn() -> CS + Send + Sync + Clone + 'static,
1466    CS: CheckpointStore + Send + Sync + 'static,
1467{
1468    const SCENARIO: &str = "checkpoint_update_overwrites";
1469
1470    let store = make_checkpoint_store();
1471
1472    // Given: A subscription with an initial checkpoint
1473    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1474    let first_position = StreamPosition::new(Uuid::now_v7());
1475
1476    store
1477        .save(&subscription_name, first_position)
1478        .await
1479        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "first save failed"))?;
1480
1481    // When: Saving a new position
1482    let second_position = StreamPosition::new(Uuid::now_v7());
1483    store
1484        .save(&subscription_name, second_position)
1485        .await
1486        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "second save failed"))?;
1487
1488    // Then: Loading returns the new position, not the old one
1489    let loaded = store
1490        .load(&subscription_name)
1491        .await
1492        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1493
1494    if loaded != Some(second_position) {
1495        return Err(ContractTestFailure::assertion(
1496            SCENARIO,
1497            format!(
1498                "expected updated position {:?} but got {:?}",
1499                Some(second_position),
1500                loaded
1501            ),
1502        ));
1503    }
1504
1505    Ok(())
1506}
1507
1508/// Contract test: Loading a non-existent checkpoint returns None
1509pub async fn test_checkpoint_load_missing_returns_none<F, CS>(
1510    make_checkpoint_store: F,
1511) -> ContractTestResult
1512where
1513    F: Fn() -> CS + Send + Sync + Clone + 'static,
1514    CS: CheckpointStore + Send + Sync + 'static,
1515{
1516    const SCENARIO: &str = "checkpoint_load_missing_returns_none";
1517
1518    let store = make_checkpoint_store();
1519
1520    // Given: A subscription name that has never been saved
1521    let subscription_name = format!("contract::{}::ghost::{}", SCENARIO, Uuid::now_v7());
1522
1523    // When: Loading the checkpoint
1524    let loaded = store
1525        .load(&subscription_name)
1526        .await
1527        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1528
1529    // Then: None is returned
1530    if loaded.is_some() {
1531        return Err(ContractTestFailure::assertion(
1532            SCENARIO,
1533            format!("expected None for missing checkpoint but got {:?}", loaded),
1534        ));
1535    }
1536
1537    Ok(())
1538}
1539
1540/// Contract test: Different subscription names have independent checkpoints
1541pub async fn test_checkpoint_independent_subscriptions<F, CS>(
1542    make_checkpoint_store: F,
1543) -> ContractTestResult
1544where
1545    F: Fn() -> CS + Send + Sync + Clone + 'static,
1546    CS: CheckpointStore + Send + Sync + 'static,
1547{
1548    const SCENARIO: &str = "checkpoint_independent_subscriptions";
1549
1550    let store = make_checkpoint_store();
1551
1552    // Given: Two subscription names
1553    let subscription_a = format!("contract::{}::sub-a::{}", SCENARIO, Uuid::now_v7());
1554    let subscription_b = format!("contract::{}::sub-b::{}", SCENARIO, Uuid::now_v7());
1555
1556    let position_a = StreamPosition::new(Uuid::now_v7());
1557    let position_b = StreamPosition::new(Uuid::now_v7());
1558
1559    // When: Saving different positions for each
1560    store
1561        .save(&subscription_a, position_a)
1562        .await
1563        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save A failed"))?;
1564
1565    store
1566        .save(&subscription_b, position_b)
1567        .await
1568        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save B failed"))?;
1569
1570    // Then: Each subscription loads its own position
1571    let loaded_a = store
1572        .load(&subscription_a)
1573        .await
1574        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load A failed"))?;
1575
1576    let loaded_b = store
1577        .load(&subscription_b)
1578        .await
1579        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load B failed"))?;
1580
1581    if loaded_a != Some(position_a) {
1582        return Err(ContractTestFailure::assertion(
1583            SCENARIO,
1584            format!(
1585                "subscription A: expected {:?} but got {:?}",
1586                Some(position_a),
1587                loaded_a
1588            ),
1589        ));
1590    }
1591
1592    if loaded_b != Some(position_b) {
1593        return Err(ContractTestFailure::assertion(
1594            SCENARIO,
1595            format!(
1596                "subscription B: expected {:?} but got {:?}",
1597                Some(position_b),
1598                loaded_b
1599            ),
1600        ));
1601    }
1602
1603    Ok(())
1604}
1605
1606// ============================================================================
1607// ProjectorCoordinator Contract Tests
1608// ============================================================================
1609
1610/// Contract test: First instance can acquire leadership successfully
1611///
1612/// Observable behavior: When no other instance holds leadership for a subscription,
1613/// calling try_acquire returns a guard indicating successful acquisition.
1614pub async fn test_coordination_acquire_leadership<F, C>(make_coordinator: F) -> ContractTestResult
1615where
1616    F: Fn() -> C + Send + Sync + Clone + 'static,
1617    C: ProjectorCoordinator + Send + Sync + 'static,
1618{
1619    const SCENARIO: &str = "coordination_acquire_leadership";
1620
1621    let coordinator = make_coordinator();
1622
1623    // Given: A unique subscription name (no existing leadership)
1624    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1625
1626    // When: Attempting to acquire leadership
1627    let result = coordinator.try_acquire(&subscription_name).await;
1628
1629    // Then: Acquisition succeeds (returns Ok with guard)
1630    if result.is_err() {
1631        return Err(ContractTestFailure::assertion(
1632            SCENARIO,
1633            "expected first instance to acquire leadership successfully, but try_acquire failed",
1634        ));
1635    }
1636
1637    Ok(())
1638}
1639
1640/// Contract test: Second instance returns error when leadership unavailable
1641///
1642/// Observable behavior: When one instance holds leadership for a subscription,
1643/// a second attempt to acquire leadership for the same subscription returns an error.
1644pub async fn test_coordination_second_instance_blocked<F, C>(
1645    make_coordinator: F,
1646) -> ContractTestResult
1647where
1648    F: Fn() -> C + Send + Sync + Clone + 'static,
1649    C: ProjectorCoordinator + Send + Sync + 'static,
1650{
1651    const SCENARIO: &str = "coordination_second_instance_blocked";
1652
1653    let coordinator = make_coordinator();
1654
1655    // Given: A unique subscription name
1656    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1657
1658    // And: First instance acquires leadership
1659    let _first_guard = coordinator
1660        .try_acquire(&subscription_name)
1661        .await
1662        .map_err(|_| {
1663            ContractTestFailure::assertion(SCENARIO, "first instance failed to acquire leadership")
1664        })?;
1665
1666    // When: Second instance attempts to acquire leadership while first holds it
1667    let second_result = coordinator.try_acquire(&subscription_name).await;
1668
1669    // Then: Second attempt returns an error (leadership unavailable)
1670    if second_result.is_ok() {
1671        return Err(ContractTestFailure::assertion(
1672            SCENARIO,
1673            "expected second instance to be blocked but try_acquire succeeded",
1674        ));
1675    }
1676
1677    Ok(())
1678}
1679
1680/// Contract test: Different projectors have independent coordination (different lock keys)
1681///
1682/// Observable behavior: Leadership for one subscription does not block leadership
1683/// acquisition for a different subscription. Each subscription/projector has its own
1684/// independent coordination scope.
1685pub async fn test_coordination_independent_subscriptions<F, C>(
1686    make_coordinator: F,
1687) -> ContractTestResult
1688where
1689    F: Fn() -> C + Send + Sync + Clone + 'static,
1690    C: ProjectorCoordinator + Send + Sync + 'static,
1691{
1692    const SCENARIO: &str = "coordination_independent_subscriptions";
1693
1694    let coordinator = make_coordinator();
1695
1696    // Given: Two unique subscription names (different projectors)
1697    let subscription_a = format!("contract::{}::projector-A::{}", SCENARIO, Uuid::now_v7());
1698    let subscription_b = format!("contract::{}::projector-B::{}", SCENARIO, Uuid::now_v7());
1699
1700    // And: First projector acquires leadership for subscription A
1701    let _guard_a = coordinator
1702        .try_acquire(&subscription_a)
1703        .await
1704        .map_err(|_| {
1705            ContractTestFailure::assertion(
1706                SCENARIO,
1707                "projector-A failed to acquire leadership for its subscription",
1708            )
1709        })?;
1710
1711    // When: Second projector attempts to acquire leadership for subscription B
1712    // (while first projector still holds leadership for A)
1713    let result_b = coordinator.try_acquire(&subscription_b).await;
1714
1715    // Then: Second acquisition succeeds (different subscriptions are independent)
1716    if result_b.is_err() {
1717        return Err(ContractTestFailure::assertion(
1718            SCENARIO,
1719            "expected projector-B to acquire leadership for its own subscription, but try_acquire failed - different projectors should have independent coordination",
1720        ));
1721    }
1722
1723    Ok(())
1724}
1725
1726/// Contract test: Leadership is released when guard is dropped (crash/disconnect recovery)
1727///
1728/// Observable behavior: When an instance holding leadership drops its guard (simulating
1729/// process exit, crash, or connection close), the lock is released and another instance
1730/// can acquire leadership for the same subscription.
1731pub async fn test_coordination_leadership_released_on_guard_drop<F, C>(
1732    make_coordinator: F,
1733) -> ContractTestResult
1734where
1735    F: Fn() -> C + Send + Sync + Clone + 'static,
1736    C: ProjectorCoordinator + Send + Sync + 'static,
1737{
1738    const SCENARIO: &str = "coordination_leadership_released_on_guard_drop";
1739
1740    let coordinator = make_coordinator();
1741
1742    // Given: A unique subscription name
1743    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1744
1745    // And: First instance acquires leadership, then drops the guard
1746    {
1747        let _first_guard = coordinator
1748            .try_acquire(&subscription_name)
1749            .await
1750            .map_err(|_| {
1751                ContractTestFailure::assertion(
1752                    SCENARIO,
1753                    "first instance failed to acquire leadership",
1754                )
1755            })?;
1756        // Guard is dropped here when scope ends (simulates process exit/crash)
1757    }
1758
1759    // When: Second instance attempts to acquire leadership after first guard dropped
1760    let second_result = coordinator.try_acquire(&subscription_name).await;
1761
1762    // Then: Second acquisition succeeds (leadership was released)
1763    if second_result.is_err() {
1764        return Err(ContractTestFailure::assertion(
1765            SCENARIO,
1766            "expected second instance to acquire leadership after first guard dropped, but try_acquire failed - leadership should be released when guard is dropped",
1767        ));
1768    }
1769
1770    Ok(())
1771}