eventcore_testing/
contract.rs

1use eventcore_types::{
2    BatchSize, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError, StreamId,
3    StreamPrefix, StreamVersion, StreamWrites,
4};
5use std::fmt;
6
7use serde::{Deserialize, Serialize};
8use uuid::Uuid;
9
10#[derive(Debug)]
11pub struct ContractTestFailure {
12    scenario: &'static str,
13    detail: String,
14}
15
16impl ContractTestFailure {
17    fn new(scenario: &'static str, detail: impl Into<String>) -> Self {
18        Self {
19            scenario,
20            detail: detail.into(),
21        }
22    }
23
24    fn builder_error(scenario: &'static str, phase: &'static str, error: EventStoreError) -> Self {
25        Self::new(scenario, format!("builder failure during {phase}: {error}"))
26    }
27
28    fn store_error(
29        scenario: &'static str,
30        operation: &'static str,
31        error: EventStoreError,
32    ) -> Self {
33        Self::new(
34            scenario,
35            format!("{operation} operation returned unexpected error: {error}"),
36        )
37    }
38
39    fn assertion(scenario: &'static str, detail: impl Into<String>) -> Self {
40        Self::new(scenario, detail)
41    }
42}
43
44impl fmt::Display for ContractTestFailure {
45    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46        write!(f, "[{}] {}", self.scenario, self.detail)
47    }
48}
49
50impl std::error::Error for ContractTestFailure {}
51
52pub type ContractTestResult = Result<(), ContractTestFailure>;
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct ContractTestEvent {
56    stream_id: StreamId,
57}
58
59impl ContractTestEvent {
60    pub fn new(stream_id: StreamId) -> Self {
61        Self { stream_id }
62    }
63}
64
65impl Event for ContractTestEvent {
66    fn stream_id(&self) -> &StreamId {
67        &self.stream_id
68    }
69}
70
71fn contract_stream_id(
72    scenario: &'static str,
73    label: &str,
74) -> Result<StreamId, ContractTestFailure> {
75    // Include UUID for parallel test execution against shared database
76    let raw = format!("contract::{}::{}::{}", scenario, label, Uuid::now_v7());
77
78    StreamId::try_new(raw.clone()).map_err(|error| {
79        ContractTestFailure::assertion(
80            scenario,
81            format!("unable to construct stream id `{}`: {}", raw, error),
82        )
83    })
84}
85
86fn builder_step(
87    scenario: &'static str,
88    phase: &'static str,
89    result: Result<StreamWrites, EventStoreError>,
90) -> Result<StreamWrites, ContractTestFailure> {
91    result.map_err(|error| ContractTestFailure::builder_error(scenario, phase, error))
92}
93
94fn register_contract_stream(
95    scenario: &'static str,
96    writes: StreamWrites,
97    stream_id: &StreamId,
98    expected_version: StreamVersion,
99) -> Result<StreamWrites, ContractTestFailure> {
100    builder_step(
101        scenario,
102        "register_stream",
103        writes.register_stream(stream_id.clone(), expected_version),
104    )
105}
106
107fn append_contract_event(
108    scenario: &'static str,
109    writes: StreamWrites,
110    stream_id: &StreamId,
111) -> Result<StreamWrites, ContractTestFailure> {
112    let event = ContractTestEvent::new(stream_id.clone());
113    builder_step(scenario, "append", writes.append(event))
114}
115
116pub async fn test_basic_read_write<F, S>(make_store: F) -> ContractTestResult
117where
118    F: Fn() -> S + Send + Sync + Clone + 'static,
119    S: EventStore + Send + Sync + 'static,
120{
121    const SCENARIO: &str = "basic_read_write";
122
123    let store = make_store();
124    let stream_id = contract_stream_id(SCENARIO, "single");
125
126    let stream_id = stream_id?;
127
128    let writes = register_contract_stream(
129        SCENARIO,
130        StreamWrites::new(),
131        &stream_id,
132        StreamVersion::new(0),
133    )?;
134    let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
135
136    let _ = store
137        .append_events(writes)
138        .await
139        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
140
141    let reader = store
142        .read_stream::<ContractTestEvent>(stream_id.clone())
143        .await
144        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
145
146    let len = reader.len();
147    let empty = reader.is_empty();
148
149    if empty {
150        return Err(ContractTestFailure::assertion(
151            SCENARIO,
152            "expected stream to contain events but it was empty",
153        ));
154    }
155
156    if len != 1 {
157        return Err(ContractTestFailure::assertion(
158            SCENARIO,
159            format!(
160                "expected stream to contain exactly one event, observed len={}",
161                len
162            ),
163        ));
164    }
165
166    Ok(())
167}
168
169pub async fn test_concurrent_version_conflicts<F, S>(make_store: F) -> ContractTestResult
170where
171    F: Fn() -> S + Send + Sync + Clone + 'static,
172    S: EventStore + Send + Sync + 'static,
173{
174    const SCENARIO: &str = "concurrent_version_conflicts";
175
176    let store = make_store();
177    let stream_id = contract_stream_id(SCENARIO, "shared")?;
178
179    let first_writes = register_contract_stream(
180        SCENARIO,
181        StreamWrites::new(),
182        &stream_id,
183        StreamVersion::new(0),
184    )?;
185    let first_writes = append_contract_event(SCENARIO, first_writes, &stream_id)?;
186
187    let _ = store
188        .append_events(first_writes)
189        .await
190        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
191
192    let conflicting_writes = register_contract_stream(
193        SCENARIO,
194        StreamWrites::new(),
195        &stream_id,
196        StreamVersion::new(0),
197    )?;
198    let conflicting_writes = append_contract_event(SCENARIO, conflicting_writes, &stream_id)?;
199
200    match store.append_events(conflicting_writes).await {
201        Err(EventStoreError::VersionConflict) => Ok(()),
202        Err(error) => Err(ContractTestFailure::store_error(
203            SCENARIO,
204            "append_events",
205            error,
206        )),
207        Ok(_) => Err(ContractTestFailure::assertion(
208            SCENARIO,
209            "expected version conflict but append succeeded",
210        )),
211    }
212}
213
214pub async fn test_stream_isolation<F, S>(make_store: F) -> ContractTestResult
215where
216    F: Fn() -> S + Send + Sync + Clone + 'static,
217    S: EventStore + Send + Sync + 'static,
218{
219    const SCENARIO: &str = "stream_isolation";
220
221    let store = make_store();
222    let left_stream = contract_stream_id(SCENARIO, "left")?;
223    let right_stream = contract_stream_id(SCENARIO, "right")?;
224
225    let writes = register_contract_stream(
226        SCENARIO,
227        StreamWrites::new(),
228        &left_stream,
229        StreamVersion::new(0),
230    )?;
231    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
232    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
233    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
234
235    let _ = store
236        .append_events(writes)
237        .await
238        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
239
240    let left_reader = store
241        .read_stream::<ContractTestEvent>(left_stream.clone())
242        .await
243        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
244
245    let right_reader = store
246        .read_stream::<ContractTestEvent>(right_stream.clone())
247        .await
248        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
249
250    let left_len = left_reader.len();
251    if left_len != 1 {
252        return Err(ContractTestFailure::assertion(
253            SCENARIO,
254            format!(
255                "left stream expected exactly one event but observed {}",
256                left_len
257            ),
258        ));
259    }
260
261    if left_reader
262        .iter()
263        .any(|event| event.stream_id() != &left_stream)
264    {
265        return Err(ContractTestFailure::assertion(
266            SCENARIO,
267            "left stream read events belonging to another stream",
268        ));
269    }
270
271    let right_len = right_reader.len();
272    if right_len != 1 {
273        return Err(ContractTestFailure::assertion(
274            SCENARIO,
275            format!(
276                "right stream expected exactly one event but observed {}",
277                right_len
278            ),
279        ));
280    }
281
282    if right_reader
283        .iter()
284        .any(|event| event.stream_id() != &right_stream)
285    {
286        return Err(ContractTestFailure::assertion(
287            SCENARIO,
288            "right stream read events belonging to another stream",
289        ));
290    }
291
292    Ok(())
293}
294
295pub async fn test_missing_stream_reads<F, S>(make_store: F) -> ContractTestResult
296where
297    F: Fn() -> S + Send + Sync + Clone + 'static,
298    S: EventStore + Send + Sync + 'static,
299{
300    const SCENARIO: &str = "missing_stream_reads";
301
302    let store = make_store();
303    let stream_id = contract_stream_id(SCENARIO, "ghost")?;
304
305    let reader = store
306        .read_stream::<ContractTestEvent>(stream_id.clone())
307        .await
308        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
309
310    if !reader.is_empty() {
311        return Err(ContractTestFailure::assertion(
312            SCENARIO,
313            "expected read_stream to succeed with no events for an untouched stream",
314        ));
315    }
316
317    Ok(())
318}
319
320pub async fn test_conflict_preserves_atomicity<F, S>(make_store: F) -> ContractTestResult
321where
322    F: Fn() -> S + Send + Sync + Clone + 'static,
323    S: EventStore + Send + Sync + 'static,
324{
325    const SCENARIO: &str = "conflict_preserves_atomicity";
326
327    let store = make_store();
328    let left_stream = contract_stream_id(SCENARIO, "left")?;
329    let right_stream = contract_stream_id(SCENARIO, "right")?;
330
331    // Seed one event per stream so we can introduce a single-stream conflict later.
332    let writes = register_contract_stream(
333        SCENARIO,
334        StreamWrites::new(),
335        &left_stream,
336        StreamVersion::new(0),
337    )?;
338    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
339    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
340    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
341
342    let _ = store
343        .append_events(writes)
344        .await
345        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
346
347    // Build a batch where the left stream has a stale expected version and the right stream is current.
348    let writes = register_contract_stream(
349        SCENARIO,
350        StreamWrites::new(),
351        &left_stream,
352        StreamVersion::new(0),
353    )?;
354    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(1))?;
355    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
356    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
357
358    match store.append_events(writes).await {
359        Err(EventStoreError::VersionConflict) => {
360            let left_reader = store
361                .read_stream::<ContractTestEvent>(left_stream.clone())
362                .await
363                .map_err(|error| {
364                    ContractTestFailure::store_error(SCENARIO, "read_stream", error)
365                })?;
366            if left_reader.len() != 1 {
367                return Err(ContractTestFailure::assertion(
368                    SCENARIO,
369                    format!(
370                        "expected left stream to remain at len=1 after failed append, observed {}",
371                        left_reader.len()
372                    ),
373                ));
374            }
375
376            let right_reader = store
377                .read_stream::<ContractTestEvent>(right_stream.clone())
378                .await
379                .map_err(|error| {
380                    ContractTestFailure::store_error(SCENARIO, "read_stream", error)
381                })?;
382            if right_reader.len() != 1 {
383                return Err(ContractTestFailure::assertion(
384                    SCENARIO,
385                    format!(
386                        "expected right stream to remain at len=1 after failed append, observed {}",
387                        right_reader.len()
388                    ),
389                ));
390            }
391
392            Ok(())
393        }
394        Err(error) => Err(ContractTestFailure::store_error(
395            SCENARIO,
396            "append_events",
397            error,
398        )),
399        Ok(_) => Err(ContractTestFailure::assertion(
400            SCENARIO,
401            "expected version conflict but append succeeded",
402        )),
403    }
404}
405
406#[macro_export]
407macro_rules! event_store_contract_tests {
408    (suite = $suite:ident, make_store = $make_store:expr $(,)?) => {
409        #[allow(non_snake_case)]
410        mod $suite {
411            use $crate::contract::{
412                test_basic_read_write, test_concurrent_version_conflicts,
413                test_conflict_preserves_atomicity, test_missing_stream_reads,
414                test_stream_isolation,
415            };
416
417            #[tokio::test(flavor = "multi_thread")]
418            async fn basic_read_write_contract() {
419                test_basic_read_write($make_store)
420                    .await
421                    .expect("event store contract failed");
422            }
423
424            #[tokio::test(flavor = "multi_thread")]
425            async fn concurrent_version_conflicts_contract() {
426                test_concurrent_version_conflicts($make_store)
427                    .await
428                    .expect("event store contract failed");
429            }
430
431            #[tokio::test(flavor = "multi_thread")]
432            async fn stream_isolation_contract() {
433                test_stream_isolation($make_store)
434                    .await
435                    .expect("event store contract failed");
436            }
437
438            #[tokio::test(flavor = "multi_thread")]
439            async fn missing_stream_reads_contract() {
440                test_missing_stream_reads($make_store)
441                    .await
442                    .expect("event store contract failed");
443            }
444
445            #[tokio::test(flavor = "multi_thread")]
446            async fn conflict_preserves_atomicity_contract() {
447                test_conflict_preserves_atomicity($make_store)
448                    .await
449                    .expect("event store contract failed");
450            }
451        }
452    };
453}
454
455#[macro_export]
456macro_rules! event_reader_contract_tests {
457    (suite = $suite:ident, make_store = $make_store:expr $(,)?) => {
458        #[allow(non_snake_case)]
459        mod $suite {
460            use $crate::contract::{
461                test_batch_limiting, test_event_ordering_across_streams,
462                test_position_based_resumption, test_stream_prefix_filtering,
463                test_stream_prefix_requires_prefix_match,
464            };
465
466            #[tokio::test(flavor = "multi_thread")]
467            async fn event_ordering_across_streams_contract() {
468                test_event_ordering_across_streams($make_store)
469                    .await
470                    .expect("event reader contract failed");
471            }
472
473            #[tokio::test(flavor = "multi_thread")]
474            async fn position_based_resumption_contract() {
475                test_position_based_resumption($make_store)
476                    .await
477                    .expect("event reader contract failed");
478            }
479
480            #[tokio::test(flavor = "multi_thread")]
481            async fn stream_prefix_filtering_contract() {
482                test_stream_prefix_filtering($make_store)
483                    .await
484                    .expect("event reader contract failed");
485            }
486
487            #[tokio::test(flavor = "multi_thread")]
488            async fn stream_prefix_requires_prefix_match_contract() {
489                test_stream_prefix_requires_prefix_match($make_store)
490                    .await
491                    .expect("event reader contract failed");
492            }
493
494            #[tokio::test(flavor = "multi_thread")]
495            async fn batch_limiting_contract() {
496                test_batch_limiting($make_store)
497                    .await
498                    .expect("event reader contract failed");
499            }
500        }
501    };
502}
503
504pub use event_reader_contract_tests;
505pub use event_store_contract_tests;
506
507#[macro_export]
508macro_rules! event_store_suite {
509    (suite = $suite:ident, make_store = $make_store:expr $(,)?) => {
510        #[allow(non_snake_case)]
511        mod $suite {
512            use $crate::contract::{
513                test_basic_read_write, test_batch_limiting, test_concurrent_version_conflicts,
514                test_conflict_preserves_atomicity, test_event_ordering_across_streams,
515                test_missing_stream_reads, test_position_based_resumption, test_stream_isolation,
516                test_stream_prefix_filtering, test_stream_prefix_requires_prefix_match,
517            };
518
519            #[tokio::test(flavor = "multi_thread")]
520            async fn basic_read_write_contract() {
521                test_basic_read_write($make_store)
522                    .await
523                    .expect("event store contract failed");
524            }
525
526            #[tokio::test(flavor = "multi_thread")]
527            async fn concurrent_version_conflicts_contract() {
528                test_concurrent_version_conflicts($make_store)
529                    .await
530                    .expect("event store contract failed");
531            }
532
533            #[tokio::test(flavor = "multi_thread")]
534            async fn stream_isolation_contract() {
535                test_stream_isolation($make_store)
536                    .await
537                    .expect("event store contract failed");
538            }
539
540            #[tokio::test(flavor = "multi_thread")]
541            async fn missing_stream_reads_contract() {
542                test_missing_stream_reads($make_store)
543                    .await
544                    .expect("event store contract failed");
545            }
546
547            #[tokio::test(flavor = "multi_thread")]
548            async fn conflict_preserves_atomicity_contract() {
549                test_conflict_preserves_atomicity($make_store)
550                    .await
551                    .expect("event store contract failed");
552            }
553
554            #[tokio::test(flavor = "multi_thread")]
555            async fn event_ordering_across_streams_contract() {
556                test_event_ordering_across_streams($make_store)
557                    .await
558                    .expect("event reader contract failed");
559            }
560
561            #[tokio::test(flavor = "multi_thread")]
562            async fn position_based_resumption_contract() {
563                test_position_based_resumption($make_store)
564                    .await
565                    .expect("event reader contract failed");
566            }
567
568            #[tokio::test(flavor = "multi_thread")]
569            async fn stream_prefix_filtering_contract() {
570                test_stream_prefix_filtering($make_store)
571                    .await
572                    .expect("event reader contract failed");
573            }
574
575            #[tokio::test(flavor = "multi_thread")]
576            async fn stream_prefix_requires_prefix_match_contract() {
577                test_stream_prefix_requires_prefix_match($make_store)
578                    .await
579                    .expect("event reader contract failed");
580            }
581
582            #[tokio::test(flavor = "multi_thread")]
583            async fn batch_limiting_contract() {
584                test_batch_limiting($make_store)
585                    .await
586                    .expect("event reader contract failed");
587            }
588        }
589    };
590}
591
592pub use event_store_suite;
593
594/// Contract test: Events from multiple streams are read in global append order
595pub async fn test_event_ordering_across_streams<F, S>(make_store: F) -> ContractTestResult
596where
597    F: Fn() -> S + Send + Sync + Clone + 'static,
598    S: EventStore + EventReader + Send + Sync + 'static,
599{
600    const SCENARIO: &str = "event_ordering_across_streams";
601
602    let store = make_store();
603
604    // Given: Three streams with events appended in specific order
605    let stream_a = contract_stream_id(SCENARIO, "stream-a")?;
606    let stream_b = contract_stream_id(SCENARIO, "stream-b")?;
607    let stream_c = contract_stream_id(SCENARIO, "stream-c")?;
608
609    // Append event to stream A
610    let writes = register_contract_stream(
611        SCENARIO,
612        StreamWrites::new(),
613        &stream_a,
614        StreamVersion::new(0),
615    )?;
616    let writes = append_contract_event(SCENARIO, writes, &stream_a)?;
617    let _ = store
618        .append_events(writes)
619        .await
620        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
621
622    // Append event to stream B
623    let writes = register_contract_stream(
624        SCENARIO,
625        StreamWrites::new(),
626        &stream_b,
627        StreamVersion::new(0),
628    )?;
629    let writes = append_contract_event(SCENARIO, writes, &stream_b)?;
630    let _ = store
631        .append_events(writes)
632        .await
633        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
634
635    // Append event to stream C
636    let writes = register_contract_stream(
637        SCENARIO,
638        StreamWrites::new(),
639        &stream_c,
640        StreamVersion::new(0),
641    )?;
642    let writes = append_contract_event(SCENARIO, writes, &stream_c)?;
643    let _ = store
644        .append_events(writes)
645        .await
646        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
647
648    // When: Reading all events via EventReader with no position filter
649    let filter = EventFilter::all();
650    let page = EventPage::first(BatchSize::new(100));
651    let events = store
652        .read_events::<ContractTestEvent>(filter, page)
653        .await
654        .map_err(|_error| {
655            ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
656        })?;
657
658    // Then: Events are returned in global append order (A, B, C)
659    if events.len() != 3 {
660        return Err(ContractTestFailure::assertion(
661            SCENARIO,
662            format!("expected 3 events but got {}", events.len()),
663        ));
664    }
665
666    // And: Verify complete ordering across all three streams
667    let (first_event, _) = &events[0];
668    if first_event.stream_id() != &stream_a {
669        return Err(ContractTestFailure::assertion(
670            SCENARIO,
671            format!(
672                "expected first event from stream_a but got from {:?}",
673                first_event.stream_id()
674            ),
675        ));
676    }
677
678    let (second_event, _) = &events[1];
679    if second_event.stream_id() != &stream_b {
680        return Err(ContractTestFailure::assertion(
681            SCENARIO,
682            format!(
683                "expected second event from stream_b but got from {:?}",
684                second_event.stream_id()
685            ),
686        ));
687    }
688
689    let (third_event, _) = &events[2];
690    if third_event.stream_id() != &stream_c {
691        return Err(ContractTestFailure::assertion(
692            SCENARIO,
693            format!(
694                "expected third event from stream_c but got from {:?}",
695                third_event.stream_id()
696            ),
697        ));
698    }
699
700    Ok(())
701}
702
703/// Contract test: Position-based resumption works correctly
704pub async fn test_position_based_resumption<F, S>(make_store: F) -> ContractTestResult
705where
706    F: Fn() -> S + Send + Sync + Clone + 'static,
707    S: EventStore + EventReader + Send + Sync + 'static,
708{
709    const SCENARIO: &str = "position_based_resumption";
710
711    let store = make_store();
712
713    // Given: Events at positions 0, 1, 2, 3, 4 (5 events total)
714    let stream = contract_stream_id(SCENARIO, "stream")?;
715
716    let mut writes = register_contract_stream(
717        SCENARIO,
718        StreamWrites::new(),
719        &stream,
720        StreamVersion::new(0),
721    )?;
722
723    // Append 5 events
724    for _ in 0..5 {
725        writes = append_contract_event(SCENARIO, writes, &stream)?;
726    }
727
728    let _ = store
729        .append_events(writes)
730        .await
731        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
732
733    // Get position of third event (index 2, position 2)
734    let filter = EventFilter::all();
735    let page = EventPage::first(BatchSize::new(100));
736    let all_events = store
737        .read_events::<ContractTestEvent>(filter.clone(), page)
738        .await
739        .map_err(|_error| {
740            ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
741        })?;
742
743    let (_third_event, third_position) = &all_events[2];
744
745    // When: Reading events after position 2
746    let page_after = EventPage::after(*third_position, BatchSize::new(100));
747    let events_after = store
748        .read_events::<ContractTestEvent>(filter, page_after)
749        .await
750        .map_err(|_error| {
751            ContractTestFailure::assertion(
752                SCENARIO,
753                "read_events failed when reading after position",
754            )
755        })?;
756
757    // Then: Only events at positions 3 and 4 are returned (2 events)
758    if events_after.len() != 2 {
759        return Err(ContractTestFailure::assertion(
760            SCENARIO,
761            format!(
762                "expected 2 events after position {} but got {}",
763                third_position,
764                events_after.len()
765            ),
766        ));
767    }
768
769    // And: Position 2 event is NOT included (verify exclusivity)
770    for (_event, position) in events_after.iter() {
771        if *position == *third_position {
772            return Err(ContractTestFailure::assertion(
773                SCENARIO,
774                format!(
775                    "expected position {} to be excluded but it was included in results",
776                    third_position
777                ),
778            ));
779        }
780    }
781
782    // And: Returned event positions are greater than third_position and in ascending order
783    let (_event1, pos1) = &events_after[0];
784    let (_event2, pos2) = &events_after[1];
785
786    if *pos1 <= *third_position {
787        return Err(ContractTestFailure::assertion(
788            SCENARIO,
789            format!(
790                "expected first returned position to be > {} but got {}",
791                third_position, pos1
792            ),
793        ));
794    }
795
796    if *pos2 <= *pos1 {
797        return Err(ContractTestFailure::assertion(
798            SCENARIO,
799            format!(
800                "expected positions to be in ascending order but {} <= {}",
801                pos2, pos1
802            ),
803        ));
804    }
805
806    Ok(())
807}
808
809/// Contract test: Stream prefix filtering returns only matching streams
810pub async fn test_stream_prefix_filtering<F, S>(make_store: F) -> ContractTestResult
811where
812    F: Fn() -> S + Send + Sync + Clone + 'static,
813    S: EventStore + EventReader + Send + Sync + 'static,
814{
815    const SCENARIO: &str = "stream_prefix_filtering";
816
817    let store = make_store();
818
819    // Given: Events on streams with IDs that actually start with "account-" or "order-"
820    let account_1 = StreamId::try_new(format!("account-1-{}", Uuid::now_v7())).map_err(|e| {
821        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
822    })?;
823    let account_2 = StreamId::try_new(format!("account-2-{}", Uuid::now_v7())).map_err(|e| {
824        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
825    })?;
826    let order_1 = StreamId::try_new(format!("order-1-{}", Uuid::now_v7())).map_err(|e| {
827        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
828    })?;
829
830    let mut writes = register_contract_stream(
831        SCENARIO,
832        StreamWrites::new(),
833        &account_1,
834        StreamVersion::new(0),
835    )?;
836    writes = register_contract_stream(SCENARIO, writes, &account_2, StreamVersion::new(0))?;
837    writes = register_contract_stream(SCENARIO, writes, &order_1, StreamVersion::new(0))?;
838
839    writes = append_contract_event(SCENARIO, writes, &account_1)?;
840    writes = append_contract_event(SCENARIO, writes, &account_2)?;
841    writes = append_contract_event(SCENARIO, writes, &order_1)?;
842
843    let _ = store
844        .append_events(writes)
845        .await
846        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
847
848    // When: Reading with prefix filter "account-"
849    let prefix = StreamPrefix::try_new("account-").map_err(|e| {
850        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
851    })?;
852    let filter = EventFilter::prefix(prefix);
853    let page = EventPage::first(BatchSize::new(100));
854    let events = store
855        .read_events::<ContractTestEvent>(filter, page)
856        .await
857        .map_err(|_error| {
858            ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
859        })?;
860
861    // Then: Only events from account-1 and account-2 are returned
862    if events.len() != 2 {
863        return Err(ContractTestFailure::assertion(
864            SCENARIO,
865            format!(
866                "expected 2 events from account-* streams but got {}",
867                events.len()
868            ),
869        ));
870    }
871
872    // And: All events are from streams starting with "account-"
873    for (event, _) in events.iter() {
874        let stream_id_str = event.stream_id().as_ref();
875        if !stream_id_str.starts_with("account-") {
876            return Err(ContractTestFailure::assertion(
877                SCENARIO,
878                format!(
879                    "expected all events from streams starting with 'account-' but found event from {}",
880                    stream_id_str
881                ),
882            ));
883        }
884    }
885
886    // And: order-1 events are filtered out (verified by length check above)
887
888    Ok(())
889}
890
891/// Contract test: Stream prefix filtering requires true prefix match (not substring match)
892pub async fn test_stream_prefix_requires_prefix_match<F, S>(make_store: F) -> ContractTestResult
893where
894    F: Fn() -> S + Send + Sync + Clone + 'static,
895    S: EventStore + EventReader + Send + Sync + 'static,
896{
897    const SCENARIO: &str = "stream_prefix_requires_prefix_match";
898
899    let store = make_store();
900
901    // Given: Three streams with actual prefixes: "account-123", "my-account-456", "order-789"
902    // We want to verify that prefix "account-" matches ONLY "account-123", not "my-account-456"
903    let account_stream =
904        StreamId::try_new(format!("account-123-{}", Uuid::now_v7())).map_err(|e| {
905            ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
906        })?;
907    let my_account_stream = StreamId::try_new(format!("my-account-456-{}", Uuid::now_v7()))
908        .map_err(|e| {
909            ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
910        })?;
911    let order_stream = StreamId::try_new(format!("order-789-{}", Uuid::now_v7())).map_err(|e| {
912        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
913    })?;
914
915    let mut writes = register_contract_stream(
916        SCENARIO,
917        StreamWrites::new(),
918        &account_stream,
919        StreamVersion::new(0),
920    )?;
921    writes = register_contract_stream(SCENARIO, writes, &my_account_stream, StreamVersion::new(0))?;
922    writes = register_contract_stream(SCENARIO, writes, &order_stream, StreamVersion::new(0))?;
923
924    writes = append_contract_event(SCENARIO, writes, &account_stream)?;
925    writes = append_contract_event(SCENARIO, writes, &my_account_stream)?;
926    writes = append_contract_event(SCENARIO, writes, &order_stream)?;
927
928    let _ = store
929        .append_events(writes)
930        .await
931        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
932
933    // When: Reading with prefix filter "account-"
934    let prefix = StreamPrefix::try_new("account-").map_err(|e| {
935        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
936    })?;
937    let filter = EventFilter::prefix(prefix);
938    let page = EventPage::first(BatchSize::new(100));
939    let events = store
940        .read_events::<ContractTestEvent>(filter, page)
941        .await
942        .map_err(|_error| {
943            ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
944        })?;
945
946    // Then: ONLY "account-123" stream should be returned (not "my-account-456")
947    if events.len() != 1 {
948        return Err(ContractTestFailure::assertion(
949            SCENARIO,
950            format!(
951                "expected exactly 1 event from account-* prefix but got {} (bug: implementation uses contains() instead of starts_with())",
952                events.len()
953            ),
954        ));
955    }
956
957    // And: The event must be from a stream starting with "account-123"
958    let (event, _) = &events[0];
959    let stream_id_str = event.stream_id().as_ref();
960    if !stream_id_str.starts_with("account-123") {
961        return Err(ContractTestFailure::assertion(
962            SCENARIO,
963            format!(
964                "expected event from stream starting with 'account-123' but got from {}",
965                stream_id_str
966            ),
967        ));
968    }
969
970    // And: Verify it's NOT from my-account-456 (proves we're not doing substring matching)
971    if stream_id_str.starts_with("my-account-456") {
972        return Err(ContractTestFailure::assertion(
973            SCENARIO,
974            "BUG EXPOSED: got event from stream starting with 'my-account-456' when filtering for prefix 'account-' - implementation must use prefix matching from the start of the stream ID",
975        ));
976    }
977
978    Ok(())
979}
980
981/// Contract test: Batch limiting returns exactly the specified number of events
982pub async fn test_batch_limiting<F, S>(make_store: F) -> ContractTestResult
983where
984    F: Fn() -> S + Send + Sync + Clone + 'static,
985    S: EventStore + EventReader + Send + Sync + 'static,
986{
987    const SCENARIO: &str = "batch_limiting";
988
989    let store = make_store();
990
991    // Given: 20 events in the store
992    let stream = contract_stream_id(SCENARIO, "stream")?;
993
994    let mut writes = register_contract_stream(
995        SCENARIO,
996        StreamWrites::new(),
997        &stream,
998        StreamVersion::new(0),
999    )?;
1000
1001    // Append 20 events
1002    for _ in 0..20 {
1003        writes = append_contract_event(SCENARIO, writes, &stream)?;
1004    }
1005
1006    let _ = store
1007        .append_events(writes)
1008        .await
1009        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1010
1011    // When: Read events with limit of 10
1012    let filter = EventFilter::all();
1013    let page = EventPage::first(BatchSize::new(10));
1014    let events = store
1015        .read_events::<ContractTestEvent>(filter, page)
1016        .await
1017        .map_err(|_error| {
1018            ContractTestFailure::assertion(SCENARIO, "read_events failed with limit")
1019        })?;
1020
1021    // Then: Exactly 10 events are returned
1022    if events.len() != 10 {
1023        return Err(ContractTestFailure::assertion(
1024            SCENARIO,
1025            format!("expected exactly 10 events but got {}", events.len()),
1026        ));
1027    }
1028
1029    // And: Events are the FIRST 10 in global order
1030    // (We verify this by checking we got exactly 10 events - the implementation
1031    // must return events in order, so if we got 10 events they must be the first 10)
1032
1033    Ok(())
1034}