Skip to main content

eventcore_testing/
contract.rs

1use eventcore_types::{
2    BatchSize, CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore,
3    EventStoreError, ProjectorCoordinator, StreamId, StreamPattern, StreamPosition, StreamPrefix,
4    StreamVersion, StreamWrites, collect_events,
5};
6
7use serde::{Deserialize, Serialize};
8use uuid::Uuid;
9
10#[derive(Debug, thiserror::Error)]
11#[error("[{scenario}] {detail}")]
12pub struct ContractTestFailure {
13    scenario: &'static str,
14    detail: String,
15}
16
17impl ContractTestFailure {
18    fn new(scenario: &'static str, detail: impl Into<String>) -> Self {
19        Self {
20            scenario,
21            detail: detail.into(),
22        }
23    }
24
25    fn builder_error(scenario: &'static str, phase: &'static str, error: EventStoreError) -> Self {
26        Self::new(scenario, format!("builder failure during {phase}: {error}"))
27    }
28
29    fn store_error(
30        scenario: &'static str,
31        operation: &'static str,
32        error: EventStoreError,
33    ) -> Self {
34        Self::new(
35            scenario,
36            format!("{operation} operation returned unexpected error: {error}"),
37        )
38    }
39
40    fn assertion(scenario: &'static str, detail: impl Into<String>) -> Self {
41        Self::new(scenario, detail)
42    }
43}
44
45pub type ContractTestResult = Result<(), ContractTestFailure>;
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct ContractTestEvent {
49    stream_id: StreamId,
50}
51
52impl ContractTestEvent {
53    pub fn new(stream_id: StreamId) -> Self {
54        Self { stream_id }
55    }
56}
57
58impl Event for ContractTestEvent {
59    fn stream_id(&self) -> &StreamId {
60        &self.stream_id
61    }
62
63    fn event_type_name() -> &'static str {
64        "ContractTestEvent"
65    }
66}
67
68fn contract_stream_id(
69    scenario: &'static str,
70    label: &str,
71) -> Result<StreamId, ContractTestFailure> {
72    // Include UUID for parallel test execution against shared database
73    let raw = format!("contract::{}::{}::{}", scenario, label, Uuid::now_v7());
74
75    StreamId::try_new(raw.clone()).map_err(|error| {
76        ContractTestFailure::assertion(
77            scenario,
78            format!("unable to construct stream id `{}`: {}", raw, error),
79        )
80    })
81}
82
83fn builder_step(
84    scenario: &'static str,
85    phase: &'static str,
86    result: Result<StreamWrites, EventStoreError>,
87) -> Result<StreamWrites, ContractTestFailure> {
88    result.map_err(|error| ContractTestFailure::builder_error(scenario, phase, error))
89}
90
91fn register_contract_stream(
92    scenario: &'static str,
93    writes: StreamWrites,
94    stream_id: &StreamId,
95    expected_version: StreamVersion,
96) -> Result<StreamWrites, ContractTestFailure> {
97    builder_step(
98        scenario,
99        "register_stream",
100        writes.register_stream(stream_id.clone(), expected_version),
101    )
102}
103
104fn append_contract_event(
105    scenario: &'static str,
106    writes: StreamWrites,
107    stream_id: &StreamId,
108) -> Result<StreamWrites, ContractTestFailure> {
109    let event = ContractTestEvent::new(stream_id.clone());
110    builder_step(scenario, "append", writes.append(event))
111}
112
113pub async fn test_basic_read_write<F, S>(make_store: F) -> ContractTestResult
114where
115    F: Fn() -> S + Send + Sync + Clone + 'static,
116    S: EventStore + Send + Sync + 'static,
117{
118    const SCENARIO: &str = "basic_read_write";
119
120    let store = make_store();
121    let stream_id = contract_stream_id(SCENARIO, "single");
122
123    let stream_id = stream_id?;
124
125    let writes = register_contract_stream(
126        SCENARIO,
127        StreamWrites::new(),
128        &stream_id,
129        StreamVersion::new(0),
130    )?;
131    let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
132
133    let _ = store
134        .append_events(writes)
135        .await
136        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
137
138    let stream = store
139        .read_stream::<ContractTestEvent>(stream_id.clone())
140        .await
141        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
142    let events = collect_events(stream)
143        .await
144        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
145
146    let len = events.len();
147    let empty = events.is_empty();
148
149    if empty {
150        return Err(ContractTestFailure::assertion(
151            SCENARIO,
152            "expected stream to contain events but it was empty",
153        ));
154    }
155
156    if len != 1 {
157        return Err(ContractTestFailure::assertion(
158            SCENARIO,
159            format!(
160                "expected stream to contain exactly one event, observed len={}",
161                len
162            ),
163        ));
164    }
165
166    Ok(())
167}
168
169pub async fn test_concurrent_version_conflicts<F, S>(make_store: F) -> ContractTestResult
170where
171    F: Fn() -> S + Send + Sync + Clone + 'static,
172    S: EventStore + Send + Sync + 'static,
173{
174    const SCENARIO: &str = "concurrent_version_conflicts";
175
176    let store = make_store();
177    let stream_id = contract_stream_id(SCENARIO, "shared")?;
178
179    let first_writes = register_contract_stream(
180        SCENARIO,
181        StreamWrites::new(),
182        &stream_id,
183        StreamVersion::new(0),
184    )?;
185    let first_writes = append_contract_event(SCENARIO, first_writes, &stream_id)?;
186
187    let _ = store
188        .append_events(first_writes)
189        .await
190        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
191
192    let conflicting_writes = register_contract_stream(
193        SCENARIO,
194        StreamWrites::new(),
195        &stream_id,
196        StreamVersion::new(0),
197    )?;
198    let conflicting_writes = append_contract_event(SCENARIO, conflicting_writes, &stream_id)?;
199
200    match store.append_events(conflicting_writes).await {
201        Err(EventStoreError::VersionConflict { .. }) => Ok(()),
202        Err(error) => Err(ContractTestFailure::store_error(
203            SCENARIO,
204            "append_events",
205            error,
206        )),
207        Ok(_) => Err(ContractTestFailure::assertion(
208            SCENARIO,
209            "expected version conflict but append succeeded",
210        )),
211    }
212}
213
214pub async fn test_stream_isolation<F, S>(make_store: F) -> ContractTestResult
215where
216    F: Fn() -> S + Send + Sync + Clone + 'static,
217    S: EventStore + Send + Sync + 'static,
218{
219    const SCENARIO: &str = "stream_isolation";
220
221    let store = make_store();
222    let left_stream = contract_stream_id(SCENARIO, "left")?;
223    let right_stream = contract_stream_id(SCENARIO, "right")?;
224
225    let writes = register_contract_stream(
226        SCENARIO,
227        StreamWrites::new(),
228        &left_stream,
229        StreamVersion::new(0),
230    )?;
231    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
232    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
233    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
234
235    let _ = store
236        .append_events(writes)
237        .await
238        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
239
240    let left_stream_events = store
241        .read_stream::<ContractTestEvent>(left_stream.clone())
242        .await
243        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
244    let left_events = collect_events(left_stream_events)
245        .await
246        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
247
248    let right_stream_events = store
249        .read_stream::<ContractTestEvent>(right_stream.clone())
250        .await
251        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
252    let right_events = collect_events(right_stream_events)
253        .await
254        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
255
256    let left_len = left_events.len();
257    if left_len != 1 {
258        return Err(ContractTestFailure::assertion(
259            SCENARIO,
260            format!(
261                "left stream expected exactly one event but observed {}",
262                left_len
263            ),
264        ));
265    }
266
267    if left_events
268        .iter()
269        .any(|event| event.stream_id() != &left_stream)
270    {
271        return Err(ContractTestFailure::assertion(
272            SCENARIO,
273            "left stream read events belonging to another stream",
274        ));
275    }
276
277    let right_len = right_events.len();
278    if right_len != 1 {
279        return Err(ContractTestFailure::assertion(
280            SCENARIO,
281            format!(
282                "right stream expected exactly one event but observed {}",
283                right_len
284            ),
285        ));
286    }
287
288    if right_events
289        .iter()
290        .any(|event| event.stream_id() != &right_stream)
291    {
292        return Err(ContractTestFailure::assertion(
293            SCENARIO,
294            "right stream read events belonging to another stream",
295        ));
296    }
297
298    Ok(())
299}
300
301pub async fn test_missing_stream_reads<F, S>(make_store: F) -> ContractTestResult
302where
303    F: Fn() -> S + Send + Sync + Clone + 'static,
304    S: EventStore + Send + Sync + 'static,
305{
306    const SCENARIO: &str = "missing_stream_reads";
307
308    let store = make_store();
309    let stream_id = contract_stream_id(SCENARIO, "ghost")?;
310
311    let stream = store
312        .read_stream::<ContractTestEvent>(stream_id.clone())
313        .await
314        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
315    let events = collect_events(stream)
316        .await
317        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
318
319    if !events.is_empty() {
320        return Err(ContractTestFailure::assertion(
321            SCENARIO,
322            "expected read_stream to succeed with no events for an untouched stream",
323        ));
324    }
325
326    Ok(())
327}
328
329pub async fn test_conflict_preserves_atomicity<F, S>(make_store: F) -> ContractTestResult
330where
331    F: Fn() -> S + Send + Sync + Clone + 'static,
332    S: EventStore + Send + Sync + 'static,
333{
334    const SCENARIO: &str = "conflict_preserves_atomicity";
335
336    let store = make_store();
337    let left_stream = contract_stream_id(SCENARIO, "left")?;
338    let right_stream = contract_stream_id(SCENARIO, "right")?;
339
340    // Seed one event per stream so we can introduce a single-stream conflict later.
341    let writes = register_contract_stream(
342        SCENARIO,
343        StreamWrites::new(),
344        &left_stream,
345        StreamVersion::new(0),
346    )?;
347    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
348    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
349    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
350
351    let _ = store
352        .append_events(writes)
353        .await
354        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
355
356    // Build a batch where the left stream has a stale expected version and the right stream is current.
357    let writes = register_contract_stream(
358        SCENARIO,
359        StreamWrites::new(),
360        &left_stream,
361        StreamVersion::new(0),
362    )?;
363    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(1))?;
364    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
365    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
366
367    match store.append_events(writes).await {
368        Err(EventStoreError::VersionConflict { .. }) => {
369            let left_stream_events = store
370                .read_stream::<ContractTestEvent>(left_stream.clone())
371                .await
372                .map_err(|error| {
373                    ContractTestFailure::store_error(SCENARIO, "read_stream", error)
374                })?;
375            let left_events = collect_events(left_stream_events).await.map_err(|error| {
376                ContractTestFailure::store_error(SCENARIO, "read_stream", error)
377            })?;
378            if left_events.len() != 1 {
379                return Err(ContractTestFailure::assertion(
380                    SCENARIO,
381                    format!(
382                        "expected left stream to remain at len=1 after failed append, observed {}",
383                        left_events.len()
384                    ),
385                ));
386            }
387
388            let right_stream_events = store
389                .read_stream::<ContractTestEvent>(right_stream.clone())
390                .await
391                .map_err(|error| {
392                    ContractTestFailure::store_error(SCENARIO, "read_stream", error)
393                })?;
394            let right_events = collect_events(right_stream_events).await.map_err(|error| {
395                ContractTestFailure::store_error(SCENARIO, "read_stream", error)
396            })?;
397            if right_events.len() != 1 {
398                return Err(ContractTestFailure::assertion(
399                    SCENARIO,
400                    format!(
401                        "expected right stream to remain at len=1 after failed append, observed {}",
402                        right_events.len()
403                    ),
404                ));
405            }
406
407            Ok(())
408        }
409        Err(error) => Err(ContractTestFailure::store_error(
410            SCENARIO,
411            "append_events",
412            error,
413        )),
414        Ok(_) => Err(ContractTestFailure::assertion(
415            SCENARIO,
416            "expected version conflict but append succeeded",
417        )),
418    }
419}
420
421/// A different event type used to test type mismatch detection in read_stream.
422#[derive(Debug, Clone, Serialize, Deserialize)]
423pub struct MismatchedEvent {
424    stream_id: StreamId,
425    extra_field: String,
426}
427
428impl Event for MismatchedEvent {
429    fn stream_id(&self) -> &StreamId {
430        &self.stream_id
431    }
432
433    fn event_type_name() -> &'static str {
434        "MismatchedEvent"
435    }
436}
437
438/// Contract test: read_stream errors when events on the stream don't match
439/// the requested type.
440///
441/// This verifies that all backends behave consistently when a stream contains
442/// events that were written with one type but read with a different type.
443/// The correct behavior is to return `EventStoreError::DeserializationFailed`,
444/// not to silently skip unrecognized events.
445pub async fn test_read_stream_errors_on_type_mismatch<F, S>(make_store: F) -> ContractTestResult
446where
447    F: Fn() -> S + Send + Sync + Clone + 'static,
448    S: EventStore + Send + Sync + 'static,
449{
450    const SCENARIO: &str = "read_stream_errors_on_type_mismatch";
451
452    let store = make_store();
453    let stream_id = contract_stream_id(SCENARIO, "mismatched")?;
454
455    // Write an event using ContractTestEvent
456    let writes = register_contract_stream(
457        SCENARIO,
458        StreamWrites::new(),
459        &stream_id,
460        StreamVersion::new(0),
461    )?;
462    let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
463
464    let _ = store
465        .append_events(writes)
466        .await
467        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
468
469    // Read the same stream but request a different event type. Opening the
470    // stream may succeed; the type mismatch surfaces as a per-event decode
471    // failure when the stream is consumed. Either path (an open-time error or
472    // an item-level error during collection) is acceptable, but the stream
473    // MUST NOT silently yield events of the wrong type.
474    let result = match store.read_stream::<MismatchedEvent>(stream_id).await {
475        Ok(stream) => collect_events(stream).await,
476        Err(error) => Err(error),
477    };
478
479    match result {
480        Err(EventStoreError::DeserializationFailed { .. }) => Ok(()),
481        Err(other) => Err(ContractTestFailure::assertion(
482            SCENARIO,
483            format!(
484                "expected DeserializationFailed error, got different error: {}",
485                other
486            ),
487        )),
488        Ok(events) if events.is_empty() => Err(ContractTestFailure::assertion(
489            SCENARIO,
490            "read_stream silently returned empty results instead of erroring on type mismatch",
491        )),
492        Ok(events) => Err(ContractTestFailure::assertion(
493            SCENARIO,
494            format!(
495                "read_stream returned {} events instead of erroring on type mismatch",
496                events.len()
497            ),
498        )),
499    }
500}
501
502// NOTE: The old fragmented macros (event_store_contract_tests!, event_reader_contract_tests!)
503// have been removed. Use backend_contract_tests! which runs ALL contract tests.
504
505/// Unified contract test macro for all backend implementations.
506///
507/// This macro generates ALL contract tests for a backend implementation.
508/// When new contract tests are added to eventcore-testing, they automatically
509/// run for all backends that use this macro—no changes to backend test files required.
510///
511/// # Example
512///
513/// ```ignore
514/// backend_contract_tests! {
515///     suite = my_backend,
516///     make_store = || MyEventStore::new(),
517///     make_checkpoint_store = || MyCheckpointStore::new(),
518///     make_coordinator = || MyProjectorCoordinator::new(),
519/// }
520/// ```
521///
522/// # Requirements
523///
524/// The store type must implement both `EventStore` and `EventReader` traits.
525/// The checkpoint store type must implement `CheckpointStore` trait.
526/// The coordinator type must implement `ProjectorCoordinator` trait.
527#[macro_export]
528macro_rules! backend_contract_tests {
529    (suite = $suite:ident, make_store = $make_store:expr, make_checkpoint_store = $make_checkpoint_store:expr, make_coordinator = $make_coordinator:expr $(,)?) => {
530        #[allow(non_snake_case)]
531        mod $suite {
532            use $crate::contract::{
533                test_basic_read_write, test_batch_limiting,
534                test_checkpoint_independent_subscriptions,
535                test_checkpoint_load_missing_returns_none, test_checkpoint_save_and_load,
536                test_checkpoint_update_overwrites, test_concurrent_version_conflicts,
537                test_conflict_preserves_atomicity, test_coordination_acquire_leadership,
538                test_coordination_independent_subscriptions,
539                test_coordination_leadership_released_on_guard_drop,
540                test_coordination_second_instance_blocked, test_event_ordering_across_streams,
541                test_missing_stream_reads, test_position_based_resumption,
542                test_read_stream_errors_on_type_mismatch, test_stream_isolation,
543                test_stream_pattern_char_class, test_stream_pattern_filtering,
544                test_stream_pattern_single_char, test_stream_prefix_filtering,
545                test_stream_prefix_requires_prefix_match,
546            };
547
548            #[tokio::test(flavor = "multi_thread")]
549            async fn basic_read_write_contract() {
550                test_basic_read_write($make_store)
551                    .await
552                    .expect("event store contract failed");
553            }
554
555            #[tokio::test(flavor = "multi_thread")]
556            async fn concurrent_version_conflicts_contract() {
557                test_concurrent_version_conflicts($make_store)
558                    .await
559                    .expect("event store contract failed");
560            }
561
562            #[tokio::test(flavor = "multi_thread")]
563            async fn stream_isolation_contract() {
564                test_stream_isolation($make_store)
565                    .await
566                    .expect("event store contract failed");
567            }
568
569            #[tokio::test(flavor = "multi_thread")]
570            async fn missing_stream_reads_contract() {
571                test_missing_stream_reads($make_store)
572                    .await
573                    .expect("event store contract failed");
574            }
575
576            #[tokio::test(flavor = "multi_thread")]
577            async fn conflict_preserves_atomicity_contract() {
578                test_conflict_preserves_atomicity($make_store)
579                    .await
580                    .expect("event store contract failed");
581            }
582
583            #[tokio::test(flavor = "multi_thread")]
584            async fn read_stream_errors_on_type_mismatch_contract() {
585                test_read_stream_errors_on_type_mismatch($make_store)
586                    .await
587                    .expect("event store contract failed");
588            }
589
590            #[tokio::test(flavor = "multi_thread")]
591            async fn event_ordering_across_streams_contract() {
592                test_event_ordering_across_streams($make_store)
593                    .await
594                    .expect("event reader contract failed");
595            }
596
597            #[tokio::test(flavor = "multi_thread")]
598            async fn position_based_resumption_contract() {
599                test_position_based_resumption($make_store)
600                    .await
601                    .expect("event reader contract failed");
602            }
603
604            #[tokio::test(flavor = "multi_thread")]
605            async fn stream_prefix_filtering_contract() {
606                test_stream_prefix_filtering($make_store)
607                    .await
608                    .expect("event reader contract failed");
609            }
610
611            #[tokio::test(flavor = "multi_thread")]
612            async fn stream_prefix_requires_prefix_match_contract() {
613                test_stream_prefix_requires_prefix_match($make_store)
614                    .await
615                    .expect("event reader contract failed");
616            }
617
618            #[tokio::test(flavor = "multi_thread")]
619            async fn stream_pattern_filtering_contract() {
620                test_stream_pattern_filtering($make_store)
621                    .await
622                    .expect("event reader contract failed");
623            }
624
625            #[tokio::test(flavor = "multi_thread")]
626            async fn stream_pattern_single_char_contract() {
627                test_stream_pattern_single_char($make_store)
628                    .await
629                    .expect("event reader contract failed");
630            }
631
632            #[tokio::test(flavor = "multi_thread")]
633            async fn stream_pattern_char_class_contract() {
634                test_stream_pattern_char_class($make_store)
635                    .await
636                    .expect("event reader contract failed");
637            }
638
639            #[tokio::test(flavor = "multi_thread")]
640            async fn batch_limiting_contract() {
641                test_batch_limiting($make_store)
642                    .await
643                    .expect("event reader contract failed");
644            }
645
646            // CheckpointStore contract tests
647            #[tokio::test(flavor = "multi_thread")]
648            async fn checkpoint_save_and_load_contract() {
649                test_checkpoint_save_and_load($make_checkpoint_store)
650                    .await
651                    .expect("checkpoint store contract failed");
652            }
653
654            #[tokio::test(flavor = "multi_thread")]
655            async fn checkpoint_update_overwrites_contract() {
656                test_checkpoint_update_overwrites($make_checkpoint_store)
657                    .await
658                    .expect("checkpoint store contract failed");
659            }
660
661            #[tokio::test(flavor = "multi_thread")]
662            async fn checkpoint_load_missing_returns_none_contract() {
663                test_checkpoint_load_missing_returns_none($make_checkpoint_store)
664                    .await
665                    .expect("checkpoint store contract failed");
666            }
667
668            #[tokio::test(flavor = "multi_thread")]
669            async fn checkpoint_independent_subscriptions_contract() {
670                test_checkpoint_independent_subscriptions($make_checkpoint_store)
671                    .await
672                    .expect("checkpoint store contract failed");
673            }
674
675            // ProjectorCoordinator contract tests
676            #[tokio::test(flavor = "multi_thread")]
677            async fn coordination_acquire_leadership_contract() {
678                test_coordination_acquire_leadership($make_coordinator)
679                    .await
680                    .expect("coordinator contract failed");
681            }
682
683            #[tokio::test(flavor = "multi_thread")]
684            async fn coordination_second_instance_blocked_contract() {
685                test_coordination_second_instance_blocked($make_coordinator)
686                    .await
687                    .expect("coordinator contract failed");
688            }
689
690            #[tokio::test(flavor = "multi_thread")]
691            async fn coordination_independent_subscriptions_contract() {
692                test_coordination_independent_subscriptions($make_coordinator)
693                    .await
694                    .expect("coordinator contract failed");
695            }
696
697            #[tokio::test(flavor = "multi_thread")]
698            async fn coordination_leadership_released_on_guard_drop_contract() {
699                test_coordination_leadership_released_on_guard_drop($make_coordinator)
700                    .await
701                    .expect("coordinator contract failed");
702            }
703        }
704    };
705}
706
707pub use backend_contract_tests;
708
709/// Contract test: Events from multiple streams are read in global append order
710pub async fn test_event_ordering_across_streams<F, S>(make_store: F) -> ContractTestResult
711where
712    F: Fn() -> S + Send + Sync + Clone + 'static,
713    S: EventStore + EventReader + Send + Sync + 'static,
714{
715    const SCENARIO: &str = "event_ordering_across_streams";
716
717    let store = make_store();
718
719    // Given: Three streams with events appended in specific order
720    let stream_a = contract_stream_id(SCENARIO, "stream-a")?;
721    let stream_b = contract_stream_id(SCENARIO, "stream-b")?;
722    let stream_c = contract_stream_id(SCENARIO, "stream-c")?;
723
724    // Append event to stream A
725    let writes = register_contract_stream(
726        SCENARIO,
727        StreamWrites::new(),
728        &stream_a,
729        StreamVersion::new(0),
730    )?;
731    let writes = append_contract_event(SCENARIO, writes, &stream_a)?;
732    let _ = store
733        .append_events(writes)
734        .await
735        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
736
737    // Append event to stream B
738    let writes = register_contract_stream(
739        SCENARIO,
740        StreamWrites::new(),
741        &stream_b,
742        StreamVersion::new(0),
743    )?;
744    let writes = append_contract_event(SCENARIO, writes, &stream_b)?;
745    let _ = store
746        .append_events(writes)
747        .await
748        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
749
750    // Append event to stream C
751    let writes = register_contract_stream(
752        SCENARIO,
753        StreamWrites::new(),
754        &stream_c,
755        StreamVersion::new(0),
756    )?;
757    let writes = append_contract_event(SCENARIO, writes, &stream_c)?;
758    let _ = store
759        .append_events(writes)
760        .await
761        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
762
763    // When: Reading all events via EventReader with no position filter
764    let filter = EventFilter::all();
765    let page = EventPage::first(BatchSize::new(100));
766    let events = store
767        .read_events::<ContractTestEvent>(filter, page)
768        .await
769        .map_err(|_error| {
770            ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
771        })?;
772
773    // Then: Events are returned in global append order (A, B, C)
774    if events.len() != 3 {
775        return Err(ContractTestFailure::assertion(
776            SCENARIO,
777            format!("expected 3 events but got {}", events.len()),
778        ));
779    }
780
781    // And: Verify complete ordering across all three streams
782    let (first_event, _) = &events[0];
783    if first_event.stream_id() != &stream_a {
784        return Err(ContractTestFailure::assertion(
785            SCENARIO,
786            format!(
787                "expected first event from stream_a but got from {:?}",
788                first_event.stream_id()
789            ),
790        ));
791    }
792
793    let (second_event, _) = &events[1];
794    if second_event.stream_id() != &stream_b {
795        return Err(ContractTestFailure::assertion(
796            SCENARIO,
797            format!(
798                "expected second event from stream_b but got from {:?}",
799                second_event.stream_id()
800            ),
801        ));
802    }
803
804    let (third_event, _) = &events[2];
805    if third_event.stream_id() != &stream_c {
806        return Err(ContractTestFailure::assertion(
807            SCENARIO,
808            format!(
809                "expected third event from stream_c but got from {:?}",
810                third_event.stream_id()
811            ),
812        ));
813    }
814
815    Ok(())
816}
817
818/// Contract test: Position-based resumption works correctly
819pub async fn test_position_based_resumption<F, S>(make_store: F) -> ContractTestResult
820where
821    F: Fn() -> S + Send + Sync + Clone + 'static,
822    S: EventStore + EventReader + Send + Sync + 'static,
823{
824    const SCENARIO: &str = "position_based_resumption";
825
826    let store = make_store();
827
828    // Given: Events at positions 0, 1, 2, 3, 4 (5 events total)
829    let stream = contract_stream_id(SCENARIO, "stream")?;
830
831    let mut writes = register_contract_stream(
832        SCENARIO,
833        StreamWrites::new(),
834        &stream,
835        StreamVersion::new(0),
836    )?;
837
838    // Append 5 events
839    for _ in 0..5 {
840        writes = append_contract_event(SCENARIO, writes, &stream)?;
841    }
842
843    let _ = store
844        .append_events(writes)
845        .await
846        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
847
848    // Get position of third event (index 2, position 2)
849    let filter = EventFilter::all();
850    let page = EventPage::first(BatchSize::new(100));
851    let all_events = store
852        .read_events::<ContractTestEvent>(filter.clone(), page)
853        .await
854        .map_err(|_error| {
855            ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
856        })?;
857
858    let (_third_event, third_position) = &all_events[2];
859
860    // When: Reading events after position 2
861    let page_after = EventPage::after(*third_position, BatchSize::new(100));
862    let events_after = store
863        .read_events::<ContractTestEvent>(filter, page_after)
864        .await
865        .map_err(|_error| {
866            ContractTestFailure::assertion(
867                SCENARIO,
868                "read_events failed when reading after position",
869            )
870        })?;
871
872    // Then: Only events at positions 3 and 4 are returned (2 events)
873    if events_after.len() != 2 {
874        return Err(ContractTestFailure::assertion(
875            SCENARIO,
876            format!(
877                "expected 2 events after position {} but got {}",
878                third_position,
879                events_after.len()
880            ),
881        ));
882    }
883
884    // And: Position 2 event is NOT included (verify exclusivity)
885    for (_event, position) in events_after.iter() {
886        if *position == *third_position {
887            return Err(ContractTestFailure::assertion(
888                SCENARIO,
889                format!(
890                    "expected position {} to be excluded but it was included in results",
891                    third_position
892                ),
893            ));
894        }
895    }
896
897    // And: Returned event positions are greater than third_position and in ascending order
898    let (_event1, pos1) = &events_after[0];
899    let (_event2, pos2) = &events_after[1];
900
901    if *pos1 <= *third_position {
902        return Err(ContractTestFailure::assertion(
903            SCENARIO,
904            format!(
905                "expected first returned position to be > {} but got {}",
906                third_position, pos1
907            ),
908        ));
909    }
910
911    if *pos2 <= *pos1 {
912        return Err(ContractTestFailure::assertion(
913            SCENARIO,
914            format!(
915                "expected positions to be in ascending order but {} <= {}",
916                pos2, pos1
917            ),
918        ));
919    }
920
921    Ok(())
922}
923
924/// Contract test: Stream prefix filtering returns only matching streams
925pub async fn test_stream_prefix_filtering<F, S>(make_store: F) -> ContractTestResult
926where
927    F: Fn() -> S + Send + Sync + Clone + 'static,
928    S: EventStore + EventReader + Send + Sync + 'static,
929{
930    const SCENARIO: &str = "stream_prefix_filtering";
931
932    let store = make_store();
933
934    // Given: Events on streams with IDs that actually start with "account-" or "order-"
935    let account_1 = StreamId::try_new(format!("account-1-{}", Uuid::now_v7())).map_err(|e| {
936        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
937    })?;
938    let account_2 = StreamId::try_new(format!("account-2-{}", Uuid::now_v7())).map_err(|e| {
939        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
940    })?;
941    let order_1 = StreamId::try_new(format!("order-1-{}", Uuid::now_v7())).map_err(|e| {
942        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
943    })?;
944
945    let mut writes = register_contract_stream(
946        SCENARIO,
947        StreamWrites::new(),
948        &account_1,
949        StreamVersion::new(0),
950    )?;
951    writes = register_contract_stream(SCENARIO, writes, &account_2, StreamVersion::new(0))?;
952    writes = register_contract_stream(SCENARIO, writes, &order_1, StreamVersion::new(0))?;
953
954    writes = append_contract_event(SCENARIO, writes, &account_1)?;
955    writes = append_contract_event(SCENARIO, writes, &account_2)?;
956    writes = append_contract_event(SCENARIO, writes, &order_1)?;
957
958    let _ = store
959        .append_events(writes)
960        .await
961        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
962
963    // When: Reading with prefix filter "account-"
964    let prefix = StreamPrefix::try_new("account-").map_err(|e| {
965        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
966    })?;
967    let filter = EventFilter::prefix(prefix);
968    let page = EventPage::first(BatchSize::new(100));
969    let events = store
970        .read_events::<ContractTestEvent>(filter, page)
971        .await
972        .map_err(|_error| {
973            ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
974        })?;
975
976    // Then: Only events from account-1 and account-2 are returned
977    if events.len() != 2 {
978        return Err(ContractTestFailure::assertion(
979            SCENARIO,
980            format!(
981                "expected 2 events from account-* streams but got {}",
982                events.len()
983            ),
984        ));
985    }
986
987    // And: All events are from streams starting with "account-"
988    for (event, _) in events.iter() {
989        let stream_id_str = event.stream_id().as_ref();
990        if !stream_id_str.starts_with("account-") {
991            return Err(ContractTestFailure::assertion(
992                SCENARIO,
993                format!(
994                    "expected all events from streams starting with 'account-' but found event from {}",
995                    stream_id_str
996                ),
997            ));
998        }
999    }
1000
1001    // And: order-1 events are filtered out (verified by length check above)
1002
1003    Ok(())
1004}
1005
1006/// Contract test: Stream prefix filtering requires true prefix match (not substring match)
1007pub async fn test_stream_prefix_requires_prefix_match<F, S>(make_store: F) -> ContractTestResult
1008where
1009    F: Fn() -> S + Send + Sync + Clone + 'static,
1010    S: EventStore + EventReader + Send + Sync + 'static,
1011{
1012    const SCENARIO: &str = "stream_prefix_requires_prefix_match";
1013
1014    let store = make_store();
1015
1016    // Given: Three streams with actual prefixes: "account-123", "my-account-456", "order-789"
1017    // We want to verify that prefix "account-" matches ONLY "account-123", not "my-account-456"
1018    let account_stream =
1019        StreamId::try_new(format!("account-123-{}", Uuid::now_v7())).map_err(|e| {
1020            ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1021        })?;
1022    let my_account_stream = StreamId::try_new(format!("my-account-456-{}", Uuid::now_v7()))
1023        .map_err(|e| {
1024            ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1025        })?;
1026    let order_stream = StreamId::try_new(format!("order-789-{}", Uuid::now_v7())).map_err(|e| {
1027        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1028    })?;
1029
1030    let mut writes = register_contract_stream(
1031        SCENARIO,
1032        StreamWrites::new(),
1033        &account_stream,
1034        StreamVersion::new(0),
1035    )?;
1036    writes = register_contract_stream(SCENARIO, writes, &my_account_stream, StreamVersion::new(0))?;
1037    writes = register_contract_stream(SCENARIO, writes, &order_stream, StreamVersion::new(0))?;
1038
1039    writes = append_contract_event(SCENARIO, writes, &account_stream)?;
1040    writes = append_contract_event(SCENARIO, writes, &my_account_stream)?;
1041    writes = append_contract_event(SCENARIO, writes, &order_stream)?;
1042
1043    let _ = store
1044        .append_events(writes)
1045        .await
1046        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1047
1048    // When: Reading with prefix filter "account-"
1049    let prefix = StreamPrefix::try_new("account-").map_err(|e| {
1050        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
1051    })?;
1052    let filter = EventFilter::prefix(prefix);
1053    let page = EventPage::first(BatchSize::new(100));
1054    let events = store
1055        .read_events::<ContractTestEvent>(filter, page)
1056        .await
1057        .map_err(|_error| {
1058            ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
1059        })?;
1060
1061    // Then: ONLY "account-123" stream should be returned (not "my-account-456")
1062    if events.len() != 1 {
1063        return Err(ContractTestFailure::assertion(
1064            SCENARIO,
1065            format!(
1066                "expected exactly 1 event from account-* prefix but got {} (bug: implementation uses contains() instead of starts_with())",
1067                events.len()
1068            ),
1069        ));
1070    }
1071
1072    // And: The event must be from a stream starting with "account-123"
1073    let (event, _) = &events[0];
1074    let stream_id_str = event.stream_id().as_ref();
1075    if !stream_id_str.starts_with("account-123") {
1076        return Err(ContractTestFailure::assertion(
1077            SCENARIO,
1078            format!(
1079                "expected event from stream starting with 'account-123' but got from {}",
1080                stream_id_str
1081            ),
1082        ));
1083    }
1084
1085    // And: Verify it's NOT from my-account-456 (proves we're not doing substring matching)
1086    if stream_id_str.starts_with("my-account-456") {
1087        return Err(ContractTestFailure::assertion(
1088            SCENARIO,
1089            "BUG EXPOSED: got event from stream starting with 'my-account-456' when filtering for prefix 'account-' - implementation must use prefix matching from the start of the stream ID",
1090        ));
1091    }
1092
1093    Ok(())
1094}
1095
1096/// Contract test: Glob `*` wildcard pattern filtering selects only matching streams.
1097///
1098/// Per ADR-0047, `EventFilter::pattern` matches stream IDs against a POSIX glob
1099/// pattern. The wildcard `*` matches any sequence of characters (including the
1100/// stream separator `/`). This test proves the filter is applied at the query
1101/// level: it appends enough non-matching events to cross the pagination `LIMIT`
1102/// boundary before any matching events, so an implementation that applied `LIMIT`
1103/// before the pattern filter would return zero matches.
1104pub async fn test_stream_pattern_filtering<F, S>(make_store: F) -> ContractTestResult
1105where
1106    F: Fn() -> S + Send + Sync + Clone + 'static,
1107    S: EventStore + EventReader + Send + Sync + 'static,
1108{
1109    const SCENARIO: &str = "stream_pattern_filtering";
1110
1111    let store = make_store();
1112    let run = Uuid::now_v7();
1113
1114    // Given: many non-matching "order-*" streams followed by two matching
1115    // "account-*" streams. The non-matching events alone exceed the page limit.
1116    let mut writes = StreamWrites::new();
1117    let mut order_streams = Vec::new();
1118    for _ in 0..50 {
1119        let order = StreamId::try_new(format!("order-{}", Uuid::now_v7())).map_err(|e| {
1120            ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1121        })?;
1122        writes = register_contract_stream(SCENARIO, writes, &order, StreamVersion::new(0))?;
1123        order_streams.push(order);
1124    }
1125
1126    let account_1 = StreamId::try_new(format!("account-1-{run}")).map_err(|e| {
1127        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1128    })?;
1129    let account_2 = StreamId::try_new(format!("account-2-{run}")).map_err(|e| {
1130        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1131    })?;
1132    writes = register_contract_stream(SCENARIO, writes, &account_1, StreamVersion::new(0))?;
1133    writes = register_contract_stream(SCENARIO, writes, &account_2, StreamVersion::new(0))?;
1134
1135    for order in &order_streams {
1136        writes = append_contract_event(SCENARIO, writes, order)?;
1137    }
1138    writes = append_contract_event(SCENARIO, writes, &account_1)?;
1139    writes = append_contract_event(SCENARIO, writes, &account_2)?;
1140
1141    let _ = store
1142        .append_events(writes)
1143        .await
1144        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1145
1146    // When: Reading with glob pattern "account-*" and a page limit smaller than
1147    // the number of non-matching events that precede the matches.
1148    let pattern = StreamPattern::try_new("account-*").map_err(|e| {
1149        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream pattern: {}", e))
1150    })?;
1151    let filter = EventFilter::pattern(pattern);
1152    let page = EventPage::first(BatchSize::new(10));
1153    let events = store
1154        .read_events::<ContractTestEvent>(filter, page)
1155        .await
1156        .map_err(|_error| {
1157            ContractTestFailure::assertion(
1158                SCENARIO,
1159                "read_events failed with stream pattern filter",
1160            )
1161        })?;
1162
1163    // Then: exactly the two "account-*" events are returned. If the limit were
1164    // applied before filtering, this would be zero.
1165    if events.len() != 2 {
1166        return Err(ContractTestFailure::assertion(
1167            SCENARIO,
1168            format!(
1169                "expected exactly 2 events matching glob 'account-*' but got {} (pattern filter must be applied before the page limit)",
1170                events.len()
1171            ),
1172        ));
1173    }
1174
1175    for (event, _) in events.iter() {
1176        let stream_id_str = event.stream_id().as_ref();
1177        if !stream_id_str.starts_with("account-") {
1178            return Err(ContractTestFailure::assertion(
1179                SCENARIO,
1180                format!(
1181                    "expected all events from streams matching 'account-*' but found event from {}",
1182                    stream_id_str
1183                ),
1184            ));
1185        }
1186    }
1187
1188    Ok(())
1189}
1190
1191/// Contract test: Glob `?` matches exactly one character.
1192///
1193/// Per ADR-0047, `?` matches a single arbitrary character. Pattern `account-?`
1194/// must match `account-1` but not `account-12` (two trailing characters) and
1195/// not `order-1`.
1196pub async fn test_stream_pattern_single_char<F, S>(make_store: F) -> ContractTestResult
1197where
1198    F: Fn() -> S + Send + Sync + Clone + 'static,
1199    S: EventStore + EventReader + Send + Sync + 'static,
1200{
1201    const SCENARIO: &str = "stream_pattern_single_char";
1202
1203    let store = make_store();
1204    let run = Uuid::now_v7();
1205
1206    // Given: one single-char-suffix stream, one two-char-suffix stream, one order.
1207    let account_x = StreamId::try_new(format!("account-x{run}")).map_err(|e| {
1208        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1209    })?;
1210    let account_xy = StreamId::try_new(format!("account-xy{run}")).map_err(|e| {
1211        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1212    })?;
1213    let order = StreamId::try_new(format!("order-z{run}")).map_err(|e| {
1214        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1215    })?;
1216
1217    let mut writes = register_contract_stream(
1218        SCENARIO,
1219        StreamWrites::new(),
1220        &account_x,
1221        StreamVersion::new(0),
1222    )?;
1223    writes = register_contract_stream(SCENARIO, writes, &account_xy, StreamVersion::new(0))?;
1224    writes = register_contract_stream(SCENARIO, writes, &order, StreamVersion::new(0))?;
1225
1226    writes = append_contract_event(SCENARIO, writes, &account_x)?;
1227    writes = append_contract_event(SCENARIO, writes, &account_xy)?;
1228    writes = append_contract_event(SCENARIO, writes, &order)?;
1229
1230    let _ = store
1231        .append_events(writes)
1232        .await
1233        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1234
1235    // When: Reading with glob pattern "account-?{run}" (single char between
1236    // "account-" and the run suffix).
1237    let pattern = StreamPattern::try_new(format!("account-?{run}")).map_err(|e| {
1238        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream pattern: {}", e))
1239    })?;
1240    let filter = EventFilter::pattern(pattern);
1241    let page = EventPage::first(BatchSize::new(100));
1242    let events = store
1243        .read_events::<ContractTestEvent>(filter, page)
1244        .await
1245        .map_err(|_error| {
1246            ContractTestFailure::assertion(
1247                SCENARIO,
1248                "read_events failed with stream pattern filter",
1249            )
1250        })?;
1251
1252    // Then: only "account-x{run}" matches (single char). "account-xy{run}" has
1253    // two chars; "order-z{run}" has the wrong prefix.
1254    if events.len() != 1 {
1255        return Err(ContractTestFailure::assertion(
1256            SCENARIO,
1257            format!(
1258                "expected exactly 1 event matching glob 'account-?{run}' but got {}",
1259                events.len()
1260            ),
1261        ));
1262    }
1263
1264    let (event, _) = &events[0];
1265    let stream_id_str = event.stream_id().as_ref();
1266    if stream_id_str != account_x.as_ref() {
1267        return Err(ContractTestFailure::assertion(
1268            SCENARIO,
1269            format!(
1270                "expected the single-character match {} but got {}",
1271                account_x.as_ref(),
1272                stream_id_str
1273            ),
1274        ));
1275    }
1276
1277    Ok(())
1278}
1279
1280/// Contract test: Glob `[0-9]` character class matches a single digit.
1281///
1282/// Per ADR-0047, a bracketed character class `[...]` matches one character from
1283/// the set. Pattern `account-[0-9]*` must match streams whose suffix begins with
1284/// a digit and reject those beginning with a non-digit.
1285pub async fn test_stream_pattern_char_class<F, S>(make_store: F) -> ContractTestResult
1286where
1287    F: Fn() -> S + Send + Sync + Clone + 'static,
1288    S: EventStore + EventReader + Send + Sync + 'static,
1289{
1290    const SCENARIO: &str = "stream_pattern_char_class";
1291
1292    let store = make_store();
1293    let run = Uuid::now_v7().simple().to_string();
1294
1295    // Given: a digit-prefixed account stream and a letter-prefixed account stream.
1296    let digit_stream = StreamId::try_new(format!("account-7-{run}")).map_err(|e| {
1297        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1298    })?;
1299    let letter_stream = StreamId::try_new(format!("account-a-{run}")).map_err(|e| {
1300        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
1301    })?;
1302
1303    let mut writes = register_contract_stream(
1304        SCENARIO,
1305        StreamWrites::new(),
1306        &digit_stream,
1307        StreamVersion::new(0),
1308    )?;
1309    writes = register_contract_stream(SCENARIO, writes, &letter_stream, StreamVersion::new(0))?;
1310
1311    writes = append_contract_event(SCENARIO, writes, &digit_stream)?;
1312    writes = append_contract_event(SCENARIO, writes, &letter_stream)?;
1313
1314    let _ = store
1315        .append_events(writes)
1316        .await
1317        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1318
1319    // When: Reading with glob pattern "account-[0-9]*".
1320    let pattern = StreamPattern::try_new("account-[0-9]*").map_err(|e| {
1321        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream pattern: {}", e))
1322    })?;
1323    let filter = EventFilter::pattern(pattern);
1324    let page = EventPage::first(BatchSize::new(100));
1325    let events = store
1326        .read_events::<ContractTestEvent>(filter, page)
1327        .await
1328        .map_err(|_error| {
1329            ContractTestFailure::assertion(
1330                SCENARIO,
1331                "read_events failed with stream pattern filter",
1332            )
1333        })?;
1334
1335    // Then: only the digit-prefixed stream matches.
1336    if events.len() != 1 {
1337        return Err(ContractTestFailure::assertion(
1338            SCENARIO,
1339            format!(
1340                "expected exactly 1 event matching glob 'account-[0-9]*' but got {}",
1341                events.len()
1342            ),
1343        ));
1344    }
1345
1346    let (event, _) = &events[0];
1347    let stream_id_str = event.stream_id().as_ref();
1348    if stream_id_str != digit_stream.as_ref() {
1349        return Err(ContractTestFailure::assertion(
1350            SCENARIO,
1351            format!(
1352                "expected the digit-prefixed match {} but got {}",
1353                digit_stream.as_ref(),
1354                stream_id_str
1355            ),
1356        ));
1357    }
1358
1359    Ok(())
1360}
1361
1362/// Contract test: Batch limiting returns exactly the specified number of events
1363pub async fn test_batch_limiting<F, S>(make_store: F) -> ContractTestResult
1364where
1365    F: Fn() -> S + Send + Sync + Clone + 'static,
1366    S: EventStore + EventReader + Send + Sync + 'static,
1367{
1368    const SCENARIO: &str = "batch_limiting";
1369
1370    let store = make_store();
1371
1372    // Given: 20 events in the store
1373    let stream = contract_stream_id(SCENARIO, "stream")?;
1374
1375    let mut writes = register_contract_stream(
1376        SCENARIO,
1377        StreamWrites::new(),
1378        &stream,
1379        StreamVersion::new(0),
1380    )?;
1381
1382    // Append 20 events
1383    for _ in 0..20 {
1384        writes = append_contract_event(SCENARIO, writes, &stream)?;
1385    }
1386
1387    let _ = store
1388        .append_events(writes)
1389        .await
1390        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1391
1392    // When: Read events with limit of 10
1393    let filter = EventFilter::all();
1394    let page = EventPage::first(BatchSize::new(10));
1395    let events = store
1396        .read_events::<ContractTestEvent>(filter, page)
1397        .await
1398        .map_err(|_error| {
1399            ContractTestFailure::assertion(SCENARIO, "read_events failed with limit")
1400        })?;
1401
1402    // Then: Exactly 10 events are returned
1403    if events.len() != 10 {
1404        return Err(ContractTestFailure::assertion(
1405            SCENARIO,
1406            format!("expected exactly 10 events but got {}", events.len()),
1407        ));
1408    }
1409
1410    // And: Events are the FIRST 10 in global order
1411    // (We verify this by checking we got exactly 10 events - the implementation
1412    // must return events in order, so if we got 10 events they must be the first 10)
1413
1414    Ok(())
1415}
1416
1417// ============================================================================
1418// CheckpointStore Contract Tests
1419// ============================================================================
1420
1421/// Contract test: Save a checkpoint and load it back
1422pub async fn test_checkpoint_save_and_load<F, CS>(make_checkpoint_store: F) -> ContractTestResult
1423where
1424    F: Fn() -> CS + Send + Sync + Clone + 'static,
1425    CS: CheckpointStore + Send + Sync + 'static,
1426{
1427    const SCENARIO: &str = "checkpoint_save_and_load";
1428
1429    let store = make_checkpoint_store();
1430
1431    // Given: A subscription name and position
1432    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1433    let position = StreamPosition::new(Uuid::now_v7());
1434
1435    // When: Saving the checkpoint
1436    store
1437        .save(&subscription_name, position)
1438        .await
1439        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save failed"))?;
1440
1441    // Then: Loading returns the saved position
1442    let loaded = store
1443        .load(&subscription_name)
1444        .await
1445        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1446
1447    if loaded != Some(position) {
1448        return Err(ContractTestFailure::assertion(
1449            SCENARIO,
1450            format!(
1451                "expected loaded position {:?} but got {:?}",
1452                Some(position),
1453                loaded
1454            ),
1455        ));
1456    }
1457
1458    Ok(())
1459}
1460
1461/// Contract test: Saving a checkpoint overwrites the previous value
1462pub async fn test_checkpoint_update_overwrites<F, CS>(
1463    make_checkpoint_store: F,
1464) -> ContractTestResult
1465where
1466    F: Fn() -> CS + Send + Sync + Clone + 'static,
1467    CS: CheckpointStore + Send + Sync + 'static,
1468{
1469    const SCENARIO: &str = "checkpoint_update_overwrites";
1470
1471    let store = make_checkpoint_store();
1472
1473    // Given: A subscription with an initial checkpoint
1474    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1475    let first_position = StreamPosition::new(Uuid::now_v7());
1476
1477    store
1478        .save(&subscription_name, first_position)
1479        .await
1480        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "first save failed"))?;
1481
1482    // When: Saving a new position
1483    let second_position = StreamPosition::new(Uuid::now_v7());
1484    store
1485        .save(&subscription_name, second_position)
1486        .await
1487        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "second save failed"))?;
1488
1489    // Then: Loading returns the new position, not the old one
1490    let loaded = store
1491        .load(&subscription_name)
1492        .await
1493        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1494
1495    if loaded != Some(second_position) {
1496        return Err(ContractTestFailure::assertion(
1497            SCENARIO,
1498            format!(
1499                "expected updated position {:?} but got {:?}",
1500                Some(second_position),
1501                loaded
1502            ),
1503        ));
1504    }
1505
1506    Ok(())
1507}
1508
1509/// Contract test: Loading a non-existent checkpoint returns None
1510pub async fn test_checkpoint_load_missing_returns_none<F, CS>(
1511    make_checkpoint_store: F,
1512) -> ContractTestResult
1513where
1514    F: Fn() -> CS + Send + Sync + Clone + 'static,
1515    CS: CheckpointStore + Send + Sync + 'static,
1516{
1517    const SCENARIO: &str = "checkpoint_load_missing_returns_none";
1518
1519    let store = make_checkpoint_store();
1520
1521    // Given: A subscription name that has never been saved
1522    let subscription_name = format!("contract::{}::ghost::{}", SCENARIO, Uuid::now_v7());
1523
1524    // When: Loading the checkpoint
1525    let loaded = store
1526        .load(&subscription_name)
1527        .await
1528        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1529
1530    // Then: None is returned
1531    if loaded.is_some() {
1532        return Err(ContractTestFailure::assertion(
1533            SCENARIO,
1534            format!("expected None for missing checkpoint but got {:?}", loaded),
1535        ));
1536    }
1537
1538    Ok(())
1539}
1540
1541/// Contract test: Different subscription names have independent checkpoints
1542pub async fn test_checkpoint_independent_subscriptions<F, CS>(
1543    make_checkpoint_store: F,
1544) -> ContractTestResult
1545where
1546    F: Fn() -> CS + Send + Sync + Clone + 'static,
1547    CS: CheckpointStore + Send + Sync + 'static,
1548{
1549    const SCENARIO: &str = "checkpoint_independent_subscriptions";
1550
1551    let store = make_checkpoint_store();
1552
1553    // Given: Two subscription names
1554    let subscription_a = format!("contract::{}::sub-a::{}", SCENARIO, Uuid::now_v7());
1555    let subscription_b = format!("contract::{}::sub-b::{}", SCENARIO, Uuid::now_v7());
1556
1557    let position_a = StreamPosition::new(Uuid::now_v7());
1558    let position_b = StreamPosition::new(Uuid::now_v7());
1559
1560    // When: Saving different positions for each
1561    store
1562        .save(&subscription_a, position_a)
1563        .await
1564        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save A failed"))?;
1565
1566    store
1567        .save(&subscription_b, position_b)
1568        .await
1569        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save B failed"))?;
1570
1571    // Then: Each subscription loads its own position
1572    let loaded_a = store
1573        .load(&subscription_a)
1574        .await
1575        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load A failed"))?;
1576
1577    let loaded_b = store
1578        .load(&subscription_b)
1579        .await
1580        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load B failed"))?;
1581
1582    if loaded_a != Some(position_a) {
1583        return Err(ContractTestFailure::assertion(
1584            SCENARIO,
1585            format!(
1586                "subscription A: expected {:?} but got {:?}",
1587                Some(position_a),
1588                loaded_a
1589            ),
1590        ));
1591    }
1592
1593    if loaded_b != Some(position_b) {
1594        return Err(ContractTestFailure::assertion(
1595            SCENARIO,
1596            format!(
1597                "subscription B: expected {:?} but got {:?}",
1598                Some(position_b),
1599                loaded_b
1600            ),
1601        ));
1602    }
1603
1604    Ok(())
1605}
1606
1607// ============================================================================
1608// ProjectorCoordinator Contract Tests
1609// ============================================================================
1610
1611/// Contract test: First instance can acquire leadership successfully
1612///
1613/// Observable behavior: When no other instance holds leadership for a subscription,
1614/// calling try_acquire returns a guard indicating successful acquisition.
1615pub async fn test_coordination_acquire_leadership<F, C>(make_coordinator: F) -> ContractTestResult
1616where
1617    F: Fn() -> C + Send + Sync + Clone + 'static,
1618    C: ProjectorCoordinator + Send + Sync + 'static,
1619{
1620    const SCENARIO: &str = "coordination_acquire_leadership";
1621
1622    let coordinator = make_coordinator();
1623
1624    // Given: A unique subscription name (no existing leadership)
1625    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1626
1627    // When: Attempting to acquire leadership
1628    let result = coordinator.try_acquire(&subscription_name).await;
1629
1630    // Then: Acquisition succeeds (returns Ok with guard)
1631    if result.is_err() {
1632        return Err(ContractTestFailure::assertion(
1633            SCENARIO,
1634            "expected first instance to acquire leadership successfully, but try_acquire failed",
1635        ));
1636    }
1637
1638    Ok(())
1639}
1640
1641/// Contract test: Second instance returns error when leadership unavailable
1642///
1643/// Observable behavior: When one instance holds leadership for a subscription,
1644/// a second attempt to acquire leadership for the same subscription returns an error.
1645pub async fn test_coordination_second_instance_blocked<F, C>(
1646    make_coordinator: F,
1647) -> ContractTestResult
1648where
1649    F: Fn() -> C + Send + Sync + Clone + 'static,
1650    C: ProjectorCoordinator + Send + Sync + 'static,
1651{
1652    const SCENARIO: &str = "coordination_second_instance_blocked";
1653
1654    let coordinator = make_coordinator();
1655
1656    // Given: A unique subscription name
1657    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1658
1659    // And: First instance acquires leadership
1660    let _first_guard = coordinator
1661        .try_acquire(&subscription_name)
1662        .await
1663        .map_err(|_| {
1664            ContractTestFailure::assertion(SCENARIO, "first instance failed to acquire leadership")
1665        })?;
1666
1667    // When: Second instance attempts to acquire leadership while first holds it
1668    let second_result = coordinator.try_acquire(&subscription_name).await;
1669
1670    // Then: Second attempt returns an error (leadership unavailable)
1671    if second_result.is_ok() {
1672        return Err(ContractTestFailure::assertion(
1673            SCENARIO,
1674            "expected second instance to be blocked but try_acquire succeeded",
1675        ));
1676    }
1677
1678    Ok(())
1679}
1680
1681/// Contract test: Different projectors have independent coordination (different lock keys)
1682///
1683/// Observable behavior: Leadership for one subscription does not block leadership
1684/// acquisition for a different subscription. Each subscription/projector has its own
1685/// independent coordination scope.
1686pub async fn test_coordination_independent_subscriptions<F, C>(
1687    make_coordinator: F,
1688) -> ContractTestResult
1689where
1690    F: Fn() -> C + Send + Sync + Clone + 'static,
1691    C: ProjectorCoordinator + Send + Sync + 'static,
1692{
1693    const SCENARIO: &str = "coordination_independent_subscriptions";
1694
1695    let coordinator = make_coordinator();
1696
1697    // Given: Two unique subscription names (different projectors)
1698    let subscription_a = format!("contract::{}::projector-A::{}", SCENARIO, Uuid::now_v7());
1699    let subscription_b = format!("contract::{}::projector-B::{}", SCENARIO, Uuid::now_v7());
1700
1701    // And: First projector acquires leadership for subscription A
1702    let _guard_a = coordinator
1703        .try_acquire(&subscription_a)
1704        .await
1705        .map_err(|_| {
1706            ContractTestFailure::assertion(
1707                SCENARIO,
1708                "projector-A failed to acquire leadership for its subscription",
1709            )
1710        })?;
1711
1712    // When: Second projector attempts to acquire leadership for subscription B
1713    // (while first projector still holds leadership for A)
1714    let result_b = coordinator.try_acquire(&subscription_b).await;
1715
1716    // Then: Second acquisition succeeds (different subscriptions are independent)
1717    if result_b.is_err() {
1718        return Err(ContractTestFailure::assertion(
1719            SCENARIO,
1720            "expected projector-B to acquire leadership for its own subscription, but try_acquire failed - different projectors should have independent coordination",
1721        ));
1722    }
1723
1724    Ok(())
1725}
1726
1727/// Contract test: Leadership is released when guard is dropped (crash/disconnect recovery)
1728///
1729/// Observable behavior: When an instance holding leadership drops its guard (simulating
1730/// process exit, crash, or connection close), the lock is released and another instance
1731/// can acquire leadership for the same subscription.
1732pub async fn test_coordination_leadership_released_on_guard_drop<F, C>(
1733    make_coordinator: F,
1734) -> ContractTestResult
1735where
1736    F: Fn() -> C + Send + Sync + Clone + 'static,
1737    C: ProjectorCoordinator + Send + Sync + 'static,
1738{
1739    const SCENARIO: &str = "coordination_leadership_released_on_guard_drop";
1740
1741    let coordinator = make_coordinator();
1742
1743    // Given: A unique subscription name
1744    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1745
1746    // And: First instance acquires leadership, then drops the guard
1747    {
1748        let _first_guard = coordinator
1749            .try_acquire(&subscription_name)
1750            .await
1751            .map_err(|_| {
1752                ContractTestFailure::assertion(
1753                    SCENARIO,
1754                    "first instance failed to acquire leadership",
1755                )
1756            })?;
1757        // Guard is dropped here when scope ends (simulates process exit/crash)
1758    }
1759
1760    // When: Second instance attempts to acquire leadership after first guard dropped
1761    let second_result = coordinator.try_acquire(&subscription_name).await;
1762
1763    // Then: Second acquisition succeeds (leadership was released)
1764    if second_result.is_err() {
1765        return Err(ContractTestFailure::assertion(
1766            SCENARIO,
1767            "expected second instance to acquire leadership after first guard dropped, but try_acquire failed - leadership should be released when guard is dropped",
1768        ));
1769    }
1770
1771    Ok(())
1772}