eventcore_testing/
contract.rs

1use eventcore_types::{
2    BatchSize, CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore,
3    EventStoreError, StreamId, StreamPosition, StreamPrefix, StreamVersion, StreamWrites,
4};
5use std::fmt;
6
7use serde::{Deserialize, Serialize};
8use uuid::Uuid;
9
/// Describes why a contract scenario did not pass.
///
/// Carries the scenario name together with a free-form explanation; the
/// `Display` implementation renders both as `[scenario] detail`.
#[derive(Debug)]
pub struct ContractTestFailure {
    /// Name of the contract scenario that produced the failure.
    scenario: &'static str,
    /// Human-readable explanation of what went wrong.
    detail: String,
}
15
16impl ContractTestFailure {
17    fn new(scenario: &'static str, detail: impl Into<String>) -> Self {
18        Self {
19            scenario,
20            detail: detail.into(),
21        }
22    }
23
24    fn builder_error(scenario: &'static str, phase: &'static str, error: EventStoreError) -> Self {
25        Self::new(scenario, format!("builder failure during {phase}: {error}"))
26    }
27
28    fn store_error(
29        scenario: &'static str,
30        operation: &'static str,
31        error: EventStoreError,
32    ) -> Self {
33        Self::new(
34            scenario,
35            format!("{operation} operation returned unexpected error: {error}"),
36        )
37    }
38
39    fn assertion(scenario: &'static str, detail: impl Into<String>) -> Self {
40        Self::new(scenario, detail)
41    }
42}
43
44impl fmt::Display for ContractTestFailure {
45    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46        write!(f, "[{}] {}", self.scenario, self.detail)
47    }
48}
49
50impl std::error::Error for ContractTestFailure {}
51
52pub type ContractTestResult = Result<(), ContractTestFailure>;
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct ContractTestEvent {
56    stream_id: StreamId,
57}
58
59impl ContractTestEvent {
60    pub fn new(stream_id: StreamId) -> Self {
61        Self { stream_id }
62    }
63}
64
65impl Event for ContractTestEvent {
66    fn stream_id(&self) -> &StreamId {
67        &self.stream_id
68    }
69}
70
71fn contract_stream_id(
72    scenario: &'static str,
73    label: &str,
74) -> Result<StreamId, ContractTestFailure> {
75    // Include UUID for parallel test execution against shared database
76    let raw = format!("contract::{}::{}::{}", scenario, label, Uuid::now_v7());
77
78    StreamId::try_new(raw.clone()).map_err(|error| {
79        ContractTestFailure::assertion(
80            scenario,
81            format!("unable to construct stream id `{}`: {}", raw, error),
82        )
83    })
84}
85
86fn builder_step(
87    scenario: &'static str,
88    phase: &'static str,
89    result: Result<StreamWrites, EventStoreError>,
90) -> Result<StreamWrites, ContractTestFailure> {
91    result.map_err(|error| ContractTestFailure::builder_error(scenario, phase, error))
92}
93
94fn register_contract_stream(
95    scenario: &'static str,
96    writes: StreamWrites,
97    stream_id: &StreamId,
98    expected_version: StreamVersion,
99) -> Result<StreamWrites, ContractTestFailure> {
100    builder_step(
101        scenario,
102        "register_stream",
103        writes.register_stream(stream_id.clone(), expected_version),
104    )
105}
106
107fn append_contract_event(
108    scenario: &'static str,
109    writes: StreamWrites,
110    stream_id: &StreamId,
111) -> Result<StreamWrites, ContractTestFailure> {
112    let event = ContractTestEvent::new(stream_id.clone());
113    builder_step(scenario, "append", writes.append(event))
114}
115
116pub async fn test_basic_read_write<F, S>(make_store: F) -> ContractTestResult
117where
118    F: Fn() -> S + Send + Sync + Clone + 'static,
119    S: EventStore + Send + Sync + 'static,
120{
121    const SCENARIO: &str = "basic_read_write";
122
123    let store = make_store();
124    let stream_id = contract_stream_id(SCENARIO, "single");
125
126    let stream_id = stream_id?;
127
128    let writes = register_contract_stream(
129        SCENARIO,
130        StreamWrites::new(),
131        &stream_id,
132        StreamVersion::new(0),
133    )?;
134    let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
135
136    let _ = store
137        .append_events(writes)
138        .await
139        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
140
141    let reader = store
142        .read_stream::<ContractTestEvent>(stream_id.clone())
143        .await
144        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
145
146    let len = reader.len();
147    let empty = reader.is_empty();
148
149    if empty {
150        return Err(ContractTestFailure::assertion(
151            SCENARIO,
152            "expected stream to contain events but it was empty",
153        ));
154    }
155
156    if len != 1 {
157        return Err(ContractTestFailure::assertion(
158            SCENARIO,
159            format!(
160                "expected stream to contain exactly one event, observed len={}",
161                len
162            ),
163        ));
164    }
165
166    Ok(())
167}
168
169pub async fn test_concurrent_version_conflicts<F, S>(make_store: F) -> ContractTestResult
170where
171    F: Fn() -> S + Send + Sync + Clone + 'static,
172    S: EventStore + Send + Sync + 'static,
173{
174    const SCENARIO: &str = "concurrent_version_conflicts";
175
176    let store = make_store();
177    let stream_id = contract_stream_id(SCENARIO, "shared")?;
178
179    let first_writes = register_contract_stream(
180        SCENARIO,
181        StreamWrites::new(),
182        &stream_id,
183        StreamVersion::new(0),
184    )?;
185    let first_writes = append_contract_event(SCENARIO, first_writes, &stream_id)?;
186
187    let _ = store
188        .append_events(first_writes)
189        .await
190        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
191
192    let conflicting_writes = register_contract_stream(
193        SCENARIO,
194        StreamWrites::new(),
195        &stream_id,
196        StreamVersion::new(0),
197    )?;
198    let conflicting_writes = append_contract_event(SCENARIO, conflicting_writes, &stream_id)?;
199
200    match store.append_events(conflicting_writes).await {
201        Err(EventStoreError::VersionConflict) => Ok(()),
202        Err(error) => Err(ContractTestFailure::store_error(
203            SCENARIO,
204            "append_events",
205            error,
206        )),
207        Ok(_) => Err(ContractTestFailure::assertion(
208            SCENARIO,
209            "expected version conflict but append succeeded",
210        )),
211    }
212}
213
214pub async fn test_stream_isolation<F, S>(make_store: F) -> ContractTestResult
215where
216    F: Fn() -> S + Send + Sync + Clone + 'static,
217    S: EventStore + Send + Sync + 'static,
218{
219    const SCENARIO: &str = "stream_isolation";
220
221    let store = make_store();
222    let left_stream = contract_stream_id(SCENARIO, "left")?;
223    let right_stream = contract_stream_id(SCENARIO, "right")?;
224
225    let writes = register_contract_stream(
226        SCENARIO,
227        StreamWrites::new(),
228        &left_stream,
229        StreamVersion::new(0),
230    )?;
231    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
232    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
233    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
234
235    let _ = store
236        .append_events(writes)
237        .await
238        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
239
240    let left_reader = store
241        .read_stream::<ContractTestEvent>(left_stream.clone())
242        .await
243        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
244
245    let right_reader = store
246        .read_stream::<ContractTestEvent>(right_stream.clone())
247        .await
248        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
249
250    let left_len = left_reader.len();
251    if left_len != 1 {
252        return Err(ContractTestFailure::assertion(
253            SCENARIO,
254            format!(
255                "left stream expected exactly one event but observed {}",
256                left_len
257            ),
258        ));
259    }
260
261    if left_reader
262        .iter()
263        .any(|event| event.stream_id() != &left_stream)
264    {
265        return Err(ContractTestFailure::assertion(
266            SCENARIO,
267            "left stream read events belonging to another stream",
268        ));
269    }
270
271    let right_len = right_reader.len();
272    if right_len != 1 {
273        return Err(ContractTestFailure::assertion(
274            SCENARIO,
275            format!(
276                "right stream expected exactly one event but observed {}",
277                right_len
278            ),
279        ));
280    }
281
282    if right_reader
283        .iter()
284        .any(|event| event.stream_id() != &right_stream)
285    {
286        return Err(ContractTestFailure::assertion(
287            SCENARIO,
288            "right stream read events belonging to another stream",
289        ));
290    }
291
292    Ok(())
293}
294
295pub async fn test_missing_stream_reads<F, S>(make_store: F) -> ContractTestResult
296where
297    F: Fn() -> S + Send + Sync + Clone + 'static,
298    S: EventStore + Send + Sync + 'static,
299{
300    const SCENARIO: &str = "missing_stream_reads";
301
302    let store = make_store();
303    let stream_id = contract_stream_id(SCENARIO, "ghost")?;
304
305    let reader = store
306        .read_stream::<ContractTestEvent>(stream_id.clone())
307        .await
308        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
309
310    if !reader.is_empty() {
311        return Err(ContractTestFailure::assertion(
312            SCENARIO,
313            "expected read_stream to succeed with no events for an untouched stream",
314        ));
315    }
316
317    Ok(())
318}
319
320pub async fn test_conflict_preserves_atomicity<F, S>(make_store: F) -> ContractTestResult
321where
322    F: Fn() -> S + Send + Sync + Clone + 'static,
323    S: EventStore + Send + Sync + 'static,
324{
325    const SCENARIO: &str = "conflict_preserves_atomicity";
326
327    let store = make_store();
328    let left_stream = contract_stream_id(SCENARIO, "left")?;
329    let right_stream = contract_stream_id(SCENARIO, "right")?;
330
331    // Seed one event per stream so we can introduce a single-stream conflict later.
332    let writes = register_contract_stream(
333        SCENARIO,
334        StreamWrites::new(),
335        &left_stream,
336        StreamVersion::new(0),
337    )?;
338    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
339    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
340    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
341
342    let _ = store
343        .append_events(writes)
344        .await
345        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
346
347    // Build a batch where the left stream has a stale expected version and the right stream is current.
348    let writes = register_contract_stream(
349        SCENARIO,
350        StreamWrites::new(),
351        &left_stream,
352        StreamVersion::new(0),
353    )?;
354    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(1))?;
355    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
356    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
357
358    match store.append_events(writes).await {
359        Err(EventStoreError::VersionConflict) => {
360            let left_reader = store
361                .read_stream::<ContractTestEvent>(left_stream.clone())
362                .await
363                .map_err(|error| {
364                    ContractTestFailure::store_error(SCENARIO, "read_stream", error)
365                })?;
366            if left_reader.len() != 1 {
367                return Err(ContractTestFailure::assertion(
368                    SCENARIO,
369                    format!(
370                        "expected left stream to remain at len=1 after failed append, observed {}",
371                        left_reader.len()
372                    ),
373                ));
374            }
375
376            let right_reader = store
377                .read_stream::<ContractTestEvent>(right_stream.clone())
378                .await
379                .map_err(|error| {
380                    ContractTestFailure::store_error(SCENARIO, "read_stream", error)
381                })?;
382            if right_reader.len() != 1 {
383                return Err(ContractTestFailure::assertion(
384                    SCENARIO,
385                    format!(
386                        "expected right stream to remain at len=1 after failed append, observed {}",
387                        right_reader.len()
388                    ),
389                ));
390            }
391
392            Ok(())
393        }
394        Err(error) => Err(ContractTestFailure::store_error(
395            SCENARIO,
396            "append_events",
397            error,
398        )),
399        Ok(_) => Err(ContractTestFailure::assertion(
400            SCENARIO,
401            "expected version conflict but append succeeded",
402        )),
403    }
404}
405
406// NOTE: The old fragmented macros (event_store_contract_tests!, event_reader_contract_tests!)
407// have been removed. Use backend_contract_tests! which runs ALL contract tests.
408
/// Generates the complete contract test suite for one backend implementation.
///
/// Every contract scenario exported by this crate gets its own
/// `#[tokio::test(flavor = "multi_thread")]` function inside a module named
/// after `suite`. Scenarios added to the macro later are picked up by every
/// backend that invokes it, without touching the backend's test files.
///
/// # Example
///
/// ```ignore
/// backend_contract_tests! {
///     suite = my_backend,
///     make_store = || MyEventStore::new(),
///     make_checkpoint_store = || MyCheckpointStore::new(),
/// }
/// ```
///
/// # Requirements
///
/// The store type must implement both `EventStore` and `EventReader` traits.
/// The checkpoint store type must implement `CheckpointStore` trait.
#[macro_export]
macro_rules! backend_contract_tests {
    // Internal rule: emits one test function that awaits a contract function
    // with the given factory and panics with `$message` if it reports a failure.
    (@case $test_name:ident, $contract_fn:ident, $factory:expr, $message:literal) => {
        #[tokio::test(flavor = "multi_thread")]
        async fn $test_name() {
            $crate::contract::$contract_fn($factory)
                .await
                .expect($message);
        }
    };

    (suite = $suite:ident, make_store = $make_store:expr, make_checkpoint_store = $make_checkpoint_store:expr $(,)?) => {
        #[allow(non_snake_case)]
        mod $suite {
            // EventStore contract tests
            $crate::backend_contract_tests! {
                @case basic_read_write_contract,
                test_basic_read_write,
                $make_store,
                "event store contract failed"
            }
            $crate::backend_contract_tests! {
                @case concurrent_version_conflicts_contract,
                test_concurrent_version_conflicts,
                $make_store,
                "event store contract failed"
            }
            $crate::backend_contract_tests! {
                @case stream_isolation_contract,
                test_stream_isolation,
                $make_store,
                "event store contract failed"
            }
            $crate::backend_contract_tests! {
                @case missing_stream_reads_contract,
                test_missing_stream_reads,
                $make_store,
                "event store contract failed"
            }
            $crate::backend_contract_tests! {
                @case conflict_preserves_atomicity_contract,
                test_conflict_preserves_atomicity,
                $make_store,
                "event store contract failed"
            }

            // EventReader contract tests
            $crate::backend_contract_tests! {
                @case event_ordering_across_streams_contract,
                test_event_ordering_across_streams,
                $make_store,
                "event reader contract failed"
            }
            $crate::backend_contract_tests! {
                @case position_based_resumption_contract,
                test_position_based_resumption,
                $make_store,
                "event reader contract failed"
            }
            $crate::backend_contract_tests! {
                @case stream_prefix_filtering_contract,
                test_stream_prefix_filtering,
                $make_store,
                "event reader contract failed"
            }
            $crate::backend_contract_tests! {
                @case stream_prefix_requires_prefix_match_contract,
                test_stream_prefix_requires_prefix_match,
                $make_store,
                "event reader contract failed"
            }
            $crate::backend_contract_tests! {
                @case batch_limiting_contract,
                test_batch_limiting,
                $make_store,
                "event reader contract failed"
            }

            // CheckpointStore contract tests
            $crate::backend_contract_tests! {
                @case checkpoint_save_and_load_contract,
                test_checkpoint_save_and_load,
                $make_checkpoint_store,
                "checkpoint store contract failed"
            }
            $crate::backend_contract_tests! {
                @case checkpoint_update_overwrites_contract,
                test_checkpoint_update_overwrites,
                $make_checkpoint_store,
                "checkpoint store contract failed"
            }
            $crate::backend_contract_tests! {
                @case checkpoint_load_missing_returns_none_contract,
                test_checkpoint_load_missing_returns_none,
                $make_checkpoint_store,
                "checkpoint store contract failed"
            }
            $crate::backend_contract_tests! {
                @case checkpoint_independent_subscriptions_contract,
                test_checkpoint_independent_subscriptions,
                $make_checkpoint_store,
                "checkpoint store contract failed"
            }
        }
    };
}
545
546pub use backend_contract_tests;
547
548/// Contract test: Events from multiple streams are read in global append order
549pub async fn test_event_ordering_across_streams<F, S>(make_store: F) -> ContractTestResult
550where
551    F: Fn() -> S + Send + Sync + Clone + 'static,
552    S: EventStore + EventReader + Send + Sync + 'static,
553{
554    const SCENARIO: &str = "event_ordering_across_streams";
555
556    let store = make_store();
557
558    // Given: Three streams with events appended in specific order
559    let stream_a = contract_stream_id(SCENARIO, "stream-a")?;
560    let stream_b = contract_stream_id(SCENARIO, "stream-b")?;
561    let stream_c = contract_stream_id(SCENARIO, "stream-c")?;
562
563    // Append event to stream A
564    let writes = register_contract_stream(
565        SCENARIO,
566        StreamWrites::new(),
567        &stream_a,
568        StreamVersion::new(0),
569    )?;
570    let writes = append_contract_event(SCENARIO, writes, &stream_a)?;
571    let _ = store
572        .append_events(writes)
573        .await
574        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
575
576    // Append event to stream B
577    let writes = register_contract_stream(
578        SCENARIO,
579        StreamWrites::new(),
580        &stream_b,
581        StreamVersion::new(0),
582    )?;
583    let writes = append_contract_event(SCENARIO, writes, &stream_b)?;
584    let _ = store
585        .append_events(writes)
586        .await
587        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
588
589    // Append event to stream C
590    let writes = register_contract_stream(
591        SCENARIO,
592        StreamWrites::new(),
593        &stream_c,
594        StreamVersion::new(0),
595    )?;
596    let writes = append_contract_event(SCENARIO, writes, &stream_c)?;
597    let _ = store
598        .append_events(writes)
599        .await
600        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
601
602    // When: Reading all events via EventReader with no position filter
603    let filter = EventFilter::all();
604    let page = EventPage::first(BatchSize::new(100));
605    let events = store
606        .read_events::<ContractTestEvent>(filter, page)
607        .await
608        .map_err(|_error| {
609            ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
610        })?;
611
612    // Then: Events are returned in global append order (A, B, C)
613    if events.len() != 3 {
614        return Err(ContractTestFailure::assertion(
615            SCENARIO,
616            format!("expected 3 events but got {}", events.len()),
617        ));
618    }
619
620    // And: Verify complete ordering across all three streams
621    let (first_event, _) = &events[0];
622    if first_event.stream_id() != &stream_a {
623        return Err(ContractTestFailure::assertion(
624            SCENARIO,
625            format!(
626                "expected first event from stream_a but got from {:?}",
627                first_event.stream_id()
628            ),
629        ));
630    }
631
632    let (second_event, _) = &events[1];
633    if second_event.stream_id() != &stream_b {
634        return Err(ContractTestFailure::assertion(
635            SCENARIO,
636            format!(
637                "expected second event from stream_b but got from {:?}",
638                second_event.stream_id()
639            ),
640        ));
641    }
642
643    let (third_event, _) = &events[2];
644    if third_event.stream_id() != &stream_c {
645        return Err(ContractTestFailure::assertion(
646            SCENARIO,
647            format!(
648                "expected third event from stream_c but got from {:?}",
649                third_event.stream_id()
650            ),
651        ));
652    }
653
654    Ok(())
655}
656
657/// Contract test: Position-based resumption works correctly
658pub async fn test_position_based_resumption<F, S>(make_store: F) -> ContractTestResult
659where
660    F: Fn() -> S + Send + Sync + Clone + 'static,
661    S: EventStore + EventReader + Send + Sync + 'static,
662{
663    const SCENARIO: &str = "position_based_resumption";
664
665    let store = make_store();
666
667    // Given: Events at positions 0, 1, 2, 3, 4 (5 events total)
668    let stream = contract_stream_id(SCENARIO, "stream")?;
669
670    let mut writes = register_contract_stream(
671        SCENARIO,
672        StreamWrites::new(),
673        &stream,
674        StreamVersion::new(0),
675    )?;
676
677    // Append 5 events
678    for _ in 0..5 {
679        writes = append_contract_event(SCENARIO, writes, &stream)?;
680    }
681
682    let _ = store
683        .append_events(writes)
684        .await
685        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
686
687    // Get position of third event (index 2, position 2)
688    let filter = EventFilter::all();
689    let page = EventPage::first(BatchSize::new(100));
690    let all_events = store
691        .read_events::<ContractTestEvent>(filter.clone(), page)
692        .await
693        .map_err(|_error| {
694            ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
695        })?;
696
697    let (_third_event, third_position) = &all_events[2];
698
699    // When: Reading events after position 2
700    let page_after = EventPage::after(*third_position, BatchSize::new(100));
701    let events_after = store
702        .read_events::<ContractTestEvent>(filter, page_after)
703        .await
704        .map_err(|_error| {
705            ContractTestFailure::assertion(
706                SCENARIO,
707                "read_events failed when reading after position",
708            )
709        })?;
710
711    // Then: Only events at positions 3 and 4 are returned (2 events)
712    if events_after.len() != 2 {
713        return Err(ContractTestFailure::assertion(
714            SCENARIO,
715            format!(
716                "expected 2 events after position {} but got {}",
717                third_position,
718                events_after.len()
719            ),
720        ));
721    }
722
723    // And: Position 2 event is NOT included (verify exclusivity)
724    for (_event, position) in events_after.iter() {
725        if *position == *third_position {
726            return Err(ContractTestFailure::assertion(
727                SCENARIO,
728                format!(
729                    "expected position {} to be excluded but it was included in results",
730                    third_position
731                ),
732            ));
733        }
734    }
735
736    // And: Returned event positions are greater than third_position and in ascending order
737    let (_event1, pos1) = &events_after[0];
738    let (_event2, pos2) = &events_after[1];
739
740    if *pos1 <= *third_position {
741        return Err(ContractTestFailure::assertion(
742            SCENARIO,
743            format!(
744                "expected first returned position to be > {} but got {}",
745                third_position, pos1
746            ),
747        ));
748    }
749
750    if *pos2 <= *pos1 {
751        return Err(ContractTestFailure::assertion(
752            SCENARIO,
753            format!(
754                "expected positions to be in ascending order but {} <= {}",
755                pos2, pos1
756            ),
757        ));
758    }
759
760    Ok(())
761}
762
763/// Contract test: Stream prefix filtering returns only matching streams
764pub async fn test_stream_prefix_filtering<F, S>(make_store: F) -> ContractTestResult
765where
766    F: Fn() -> S + Send + Sync + Clone + 'static,
767    S: EventStore + EventReader + Send + Sync + 'static,
768{
769    const SCENARIO: &str = "stream_prefix_filtering";
770
771    let store = make_store();
772
773    // Given: Events on streams with IDs that actually start with "account-" or "order-"
774    let account_1 = StreamId::try_new(format!("account-1-{}", Uuid::now_v7())).map_err(|e| {
775        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
776    })?;
777    let account_2 = StreamId::try_new(format!("account-2-{}", Uuid::now_v7())).map_err(|e| {
778        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
779    })?;
780    let order_1 = StreamId::try_new(format!("order-1-{}", Uuid::now_v7())).map_err(|e| {
781        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
782    })?;
783
784    let mut writes = register_contract_stream(
785        SCENARIO,
786        StreamWrites::new(),
787        &account_1,
788        StreamVersion::new(0),
789    )?;
790    writes = register_contract_stream(SCENARIO, writes, &account_2, StreamVersion::new(0))?;
791    writes = register_contract_stream(SCENARIO, writes, &order_1, StreamVersion::new(0))?;
792
793    writes = append_contract_event(SCENARIO, writes, &account_1)?;
794    writes = append_contract_event(SCENARIO, writes, &account_2)?;
795    writes = append_contract_event(SCENARIO, writes, &order_1)?;
796
797    let _ = store
798        .append_events(writes)
799        .await
800        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
801
802    // When: Reading with prefix filter "account-"
803    let prefix = StreamPrefix::try_new("account-").map_err(|e| {
804        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
805    })?;
806    let filter = EventFilter::prefix(prefix);
807    let page = EventPage::first(BatchSize::new(100));
808    let events = store
809        .read_events::<ContractTestEvent>(filter, page)
810        .await
811        .map_err(|_error| {
812            ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
813        })?;
814
815    // Then: Only events from account-1 and account-2 are returned
816    if events.len() != 2 {
817        return Err(ContractTestFailure::assertion(
818            SCENARIO,
819            format!(
820                "expected 2 events from account-* streams but got {}",
821                events.len()
822            ),
823        ));
824    }
825
826    // And: All events are from streams starting with "account-"
827    for (event, _) in events.iter() {
828        let stream_id_str = event.stream_id().as_ref();
829        if !stream_id_str.starts_with("account-") {
830            return Err(ContractTestFailure::assertion(
831                SCENARIO,
832                format!(
833                    "expected all events from streams starting with 'account-' but found event from {}",
834                    stream_id_str
835                ),
836            ));
837        }
838    }
839
840    // And: order-1 events are filtered out (verified by length check above)
841
842    Ok(())
843}
844
845/// Contract test: Stream prefix filtering requires true prefix match (not substring match)
846pub async fn test_stream_prefix_requires_prefix_match<F, S>(make_store: F) -> ContractTestResult
847where
848    F: Fn() -> S + Send + Sync + Clone + 'static,
849    S: EventStore + EventReader + Send + Sync + 'static,
850{
851    const SCENARIO: &str = "stream_prefix_requires_prefix_match";
852
853    let store = make_store();
854
855    // Given: Three streams with actual prefixes: "account-123", "my-account-456", "order-789"
856    // We want to verify that prefix "account-" matches ONLY "account-123", not "my-account-456"
857    let account_stream =
858        StreamId::try_new(format!("account-123-{}", Uuid::now_v7())).map_err(|e| {
859            ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
860        })?;
861    let my_account_stream = StreamId::try_new(format!("my-account-456-{}", Uuid::now_v7()))
862        .map_err(|e| {
863            ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
864        })?;
865    let order_stream = StreamId::try_new(format!("order-789-{}", Uuid::now_v7())).map_err(|e| {
866        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
867    })?;
868
869    let mut writes = register_contract_stream(
870        SCENARIO,
871        StreamWrites::new(),
872        &account_stream,
873        StreamVersion::new(0),
874    )?;
875    writes = register_contract_stream(SCENARIO, writes, &my_account_stream, StreamVersion::new(0))?;
876    writes = register_contract_stream(SCENARIO, writes, &order_stream, StreamVersion::new(0))?;
877
878    writes = append_contract_event(SCENARIO, writes, &account_stream)?;
879    writes = append_contract_event(SCENARIO, writes, &my_account_stream)?;
880    writes = append_contract_event(SCENARIO, writes, &order_stream)?;
881
882    let _ = store
883        .append_events(writes)
884        .await
885        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
886
887    // When: Reading with prefix filter "account-"
888    let prefix = StreamPrefix::try_new("account-").map_err(|e| {
889        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
890    })?;
891    let filter = EventFilter::prefix(prefix);
892    let page = EventPage::first(BatchSize::new(100));
893    let events = store
894        .read_events::<ContractTestEvent>(filter, page)
895        .await
896        .map_err(|_error| {
897            ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
898        })?;
899
900    // Then: ONLY "account-123" stream should be returned (not "my-account-456")
901    if events.len() != 1 {
902        return Err(ContractTestFailure::assertion(
903            SCENARIO,
904            format!(
905                "expected exactly 1 event from account-* prefix but got {} (bug: implementation uses contains() instead of starts_with())",
906                events.len()
907            ),
908        ));
909    }
910
911    // And: The event must be from a stream starting with "account-123"
912    let (event, _) = &events[0];
913    let stream_id_str = event.stream_id().as_ref();
914    if !stream_id_str.starts_with("account-123") {
915        return Err(ContractTestFailure::assertion(
916            SCENARIO,
917            format!(
918                "expected event from stream starting with 'account-123' but got from {}",
919                stream_id_str
920            ),
921        ));
922    }
923
924    // And: Verify it's NOT from my-account-456 (proves we're not doing substring matching)
925    if stream_id_str.starts_with("my-account-456") {
926        return Err(ContractTestFailure::assertion(
927            SCENARIO,
928            "BUG EXPOSED: got event from stream starting with 'my-account-456' when filtering for prefix 'account-' - implementation must use prefix matching from the start of the stream ID",
929        ));
930    }
931
932    Ok(())
933}
934
935/// Contract test: Batch limiting returns exactly the specified number of events
936pub async fn test_batch_limiting<F, S>(make_store: F) -> ContractTestResult
937where
938    F: Fn() -> S + Send + Sync + Clone + 'static,
939    S: EventStore + EventReader + Send + Sync + 'static,
940{
941    const SCENARIO: &str = "batch_limiting";
942
943    let store = make_store();
944
945    // Given: 20 events in the store
946    let stream = contract_stream_id(SCENARIO, "stream")?;
947
948    let mut writes = register_contract_stream(
949        SCENARIO,
950        StreamWrites::new(),
951        &stream,
952        StreamVersion::new(0),
953    )?;
954
955    // Append 20 events
956    for _ in 0..20 {
957        writes = append_contract_event(SCENARIO, writes, &stream)?;
958    }
959
960    let _ = store
961        .append_events(writes)
962        .await
963        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
964
965    // When: Read events with limit of 10
966    let filter = EventFilter::all();
967    let page = EventPage::first(BatchSize::new(10));
968    let events = store
969        .read_events::<ContractTestEvent>(filter, page)
970        .await
971        .map_err(|_error| {
972            ContractTestFailure::assertion(SCENARIO, "read_events failed with limit")
973        })?;
974
975    // Then: Exactly 10 events are returned
976    if events.len() != 10 {
977        return Err(ContractTestFailure::assertion(
978            SCENARIO,
979            format!("expected exactly 10 events but got {}", events.len()),
980        ));
981    }
982
983    // And: Events are the FIRST 10 in global order
984    // (We verify this by checking we got exactly 10 events - the implementation
985    // must return events in order, so if we got 10 events they must be the first 10)
986
987    Ok(())
988}
989
990// ============================================================================
991// CheckpointStore Contract Tests
992// ============================================================================
993
994/// Contract test: Save a checkpoint and load it back
995pub async fn test_checkpoint_save_and_load<F, CS>(make_checkpoint_store: F) -> ContractTestResult
996where
997    F: Fn() -> CS + Send + Sync + Clone + 'static,
998    CS: CheckpointStore + Send + Sync + 'static,
999{
1000    const SCENARIO: &str = "checkpoint_save_and_load";
1001
1002    let store = make_checkpoint_store();
1003
1004    // Given: A subscription name and position
1005    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1006    let position = StreamPosition::new(Uuid::now_v7());
1007
1008    // When: Saving the checkpoint
1009    store
1010        .save(&subscription_name, position)
1011        .await
1012        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save failed"))?;
1013
1014    // Then: Loading returns the saved position
1015    let loaded = store
1016        .load(&subscription_name)
1017        .await
1018        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1019
1020    if loaded != Some(position) {
1021        return Err(ContractTestFailure::assertion(
1022            SCENARIO,
1023            format!(
1024                "expected loaded position {:?} but got {:?}",
1025                Some(position),
1026                loaded
1027            ),
1028        ));
1029    }
1030
1031    Ok(())
1032}
1033
1034/// Contract test: Saving a checkpoint overwrites the previous value
1035pub async fn test_checkpoint_update_overwrites<F, CS>(
1036    make_checkpoint_store: F,
1037) -> ContractTestResult
1038where
1039    F: Fn() -> CS + Send + Sync + Clone + 'static,
1040    CS: CheckpointStore + Send + Sync + 'static,
1041{
1042    const SCENARIO: &str = "checkpoint_update_overwrites";
1043
1044    let store = make_checkpoint_store();
1045
1046    // Given: A subscription with an initial checkpoint
1047    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1048    let first_position = StreamPosition::new(Uuid::now_v7());
1049
1050    store
1051        .save(&subscription_name, first_position)
1052        .await
1053        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "first save failed"))?;
1054
1055    // When: Saving a new position
1056    let second_position = StreamPosition::new(Uuid::now_v7());
1057    store
1058        .save(&subscription_name, second_position)
1059        .await
1060        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "second save failed"))?;
1061
1062    // Then: Loading returns the new position, not the old one
1063    let loaded = store
1064        .load(&subscription_name)
1065        .await
1066        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1067
1068    if loaded != Some(second_position) {
1069        return Err(ContractTestFailure::assertion(
1070            SCENARIO,
1071            format!(
1072                "expected updated position {:?} but got {:?}",
1073                Some(second_position),
1074                loaded
1075            ),
1076        ));
1077    }
1078
1079    Ok(())
1080}
1081
1082/// Contract test: Loading a non-existent checkpoint returns None
1083pub async fn test_checkpoint_load_missing_returns_none<F, CS>(
1084    make_checkpoint_store: F,
1085) -> ContractTestResult
1086where
1087    F: Fn() -> CS + Send + Sync + Clone + 'static,
1088    CS: CheckpointStore + Send + Sync + 'static,
1089{
1090    const SCENARIO: &str = "checkpoint_load_missing_returns_none";
1091
1092    let store = make_checkpoint_store();
1093
1094    // Given: A subscription name that has never been saved
1095    let subscription_name = format!("contract::{}::ghost::{}", SCENARIO, Uuid::now_v7());
1096
1097    // When: Loading the checkpoint
1098    let loaded = store
1099        .load(&subscription_name)
1100        .await
1101        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1102
1103    // Then: None is returned
1104    if loaded.is_some() {
1105        return Err(ContractTestFailure::assertion(
1106            SCENARIO,
1107            format!("expected None for missing checkpoint but got {:?}", loaded),
1108        ));
1109    }
1110
1111    Ok(())
1112}
1113
1114/// Contract test: Different subscription names have independent checkpoints
1115pub async fn test_checkpoint_independent_subscriptions<F, CS>(
1116    make_checkpoint_store: F,
1117) -> ContractTestResult
1118where
1119    F: Fn() -> CS + Send + Sync + Clone + 'static,
1120    CS: CheckpointStore + Send + Sync + 'static,
1121{
1122    const SCENARIO: &str = "checkpoint_independent_subscriptions";
1123
1124    let store = make_checkpoint_store();
1125
1126    // Given: Two subscription names
1127    let subscription_a = format!("contract::{}::sub-a::{}", SCENARIO, Uuid::now_v7());
1128    let subscription_b = format!("contract::{}::sub-b::{}", SCENARIO, Uuid::now_v7());
1129
1130    let position_a = StreamPosition::new(Uuid::now_v7());
1131    let position_b = StreamPosition::new(Uuid::now_v7());
1132
1133    // When: Saving different positions for each
1134    store
1135        .save(&subscription_a, position_a)
1136        .await
1137        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save A failed"))?;
1138
1139    store
1140        .save(&subscription_b, position_b)
1141        .await
1142        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save B failed"))?;
1143
1144    // Then: Each subscription loads its own position
1145    let loaded_a = store
1146        .load(&subscription_a)
1147        .await
1148        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load A failed"))?;
1149
1150    let loaded_b = store
1151        .load(&subscription_b)
1152        .await
1153        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load B failed"))?;
1154
1155    if loaded_a != Some(position_a) {
1156        return Err(ContractTestFailure::assertion(
1157            SCENARIO,
1158            format!(
1159                "subscription A: expected {:?} but got {:?}",
1160                Some(position_a),
1161                loaded_a
1162            ),
1163        ));
1164    }
1165
1166    if loaded_b != Some(position_b) {
1167        return Err(ContractTestFailure::assertion(
1168            SCENARIO,
1169            format!(
1170                "subscription B: expected {:?} but got {:?}",
1171                Some(position_b),
1172                loaded_b
1173            ),
1174        ));
1175    }
1176
1177    Ok(())
1178}