Skip to main content

eventcore_testing/
contract.rs

1use eventcore_types::{
2    BatchSize, CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore,
3    EventStoreError, ProjectorCoordinator, StreamId, StreamPosition, StreamPrefix, StreamVersion,
4    StreamWrites,
5};
6
7use serde::{Deserialize, Serialize};
8use uuid::Uuid;
9
10#[derive(Debug, thiserror::Error)]
11#[error("[{scenario}] {detail}")]
12pub struct ContractTestFailure {
13    scenario: &'static str,
14    detail: String,
15}
16
17impl ContractTestFailure {
18    fn new(scenario: &'static str, detail: impl Into<String>) -> Self {
19        Self {
20            scenario,
21            detail: detail.into(),
22        }
23    }
24
25    fn builder_error(scenario: &'static str, phase: &'static str, error: EventStoreError) -> Self {
26        Self::new(scenario, format!("builder failure during {phase}: {error}"))
27    }
28
29    fn store_error(
30        scenario: &'static str,
31        operation: &'static str,
32        error: EventStoreError,
33    ) -> Self {
34        Self::new(
35            scenario,
36            format!("{operation} operation returned unexpected error: {error}"),
37        )
38    }
39
40    fn assertion(scenario: &'static str, detail: impl Into<String>) -> Self {
41        Self::new(scenario, detail)
42    }
43}
44
45pub type ContractTestResult = Result<(), ContractTestFailure>;
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct ContractTestEvent {
49    stream_id: StreamId,
50}
51
52impl ContractTestEvent {
53    pub fn new(stream_id: StreamId) -> Self {
54        Self { stream_id }
55    }
56}
57
58impl Event for ContractTestEvent {
59    fn stream_id(&self) -> &StreamId {
60        &self.stream_id
61    }
62
63    fn event_type_name() -> &'static str {
64        "ContractTestEvent"
65    }
66}
67
68fn contract_stream_id(
69    scenario: &'static str,
70    label: &str,
71) -> Result<StreamId, ContractTestFailure> {
72    // Include UUID for parallel test execution against shared database
73    let raw = format!("contract::{}::{}::{}", scenario, label, Uuid::now_v7());
74
75    StreamId::try_new(raw.clone()).map_err(|error| {
76        ContractTestFailure::assertion(
77            scenario,
78            format!("unable to construct stream id `{}`: {}", raw, error),
79        )
80    })
81}
82
83fn builder_step(
84    scenario: &'static str,
85    phase: &'static str,
86    result: Result<StreamWrites, EventStoreError>,
87) -> Result<StreamWrites, ContractTestFailure> {
88    result.map_err(|error| ContractTestFailure::builder_error(scenario, phase, error))
89}
90
91fn register_contract_stream(
92    scenario: &'static str,
93    writes: StreamWrites,
94    stream_id: &StreamId,
95    expected_version: StreamVersion,
96) -> Result<StreamWrites, ContractTestFailure> {
97    builder_step(
98        scenario,
99        "register_stream",
100        writes.register_stream(stream_id.clone(), expected_version),
101    )
102}
103
104fn append_contract_event(
105    scenario: &'static str,
106    writes: StreamWrites,
107    stream_id: &StreamId,
108) -> Result<StreamWrites, ContractTestFailure> {
109    let event = ContractTestEvent::new(stream_id.clone());
110    builder_step(scenario, "append", writes.append(event))
111}
112
113pub async fn test_basic_read_write<F, S>(make_store: F) -> ContractTestResult
114where
115    F: Fn() -> S + Send + Sync + Clone + 'static,
116    S: EventStore + Send + Sync + 'static,
117{
118    const SCENARIO: &str = "basic_read_write";
119
120    let store = make_store();
121    let stream_id = contract_stream_id(SCENARIO, "single");
122
123    let stream_id = stream_id?;
124
125    let writes = register_contract_stream(
126        SCENARIO,
127        StreamWrites::new(),
128        &stream_id,
129        StreamVersion::new(0),
130    )?;
131    let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
132
133    let _ = store
134        .append_events(writes)
135        .await
136        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
137
138    let reader = store
139        .read_stream::<ContractTestEvent>(stream_id.clone())
140        .await
141        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
142
143    let len = reader.len();
144    let empty = reader.is_empty();
145
146    if empty {
147        return Err(ContractTestFailure::assertion(
148            SCENARIO,
149            "expected stream to contain events but it was empty",
150        ));
151    }
152
153    if len != 1 {
154        return Err(ContractTestFailure::assertion(
155            SCENARIO,
156            format!(
157                "expected stream to contain exactly one event, observed len={}",
158                len
159            ),
160        ));
161    }
162
163    Ok(())
164}
165
166pub async fn test_concurrent_version_conflicts<F, S>(make_store: F) -> ContractTestResult
167where
168    F: Fn() -> S + Send + Sync + Clone + 'static,
169    S: EventStore + Send + Sync + 'static,
170{
171    const SCENARIO: &str = "concurrent_version_conflicts";
172
173    let store = make_store();
174    let stream_id = contract_stream_id(SCENARIO, "shared")?;
175
176    let first_writes = register_contract_stream(
177        SCENARIO,
178        StreamWrites::new(),
179        &stream_id,
180        StreamVersion::new(0),
181    )?;
182    let first_writes = append_contract_event(SCENARIO, first_writes, &stream_id)?;
183
184    let _ = store
185        .append_events(first_writes)
186        .await
187        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
188
189    let conflicting_writes = register_contract_stream(
190        SCENARIO,
191        StreamWrites::new(),
192        &stream_id,
193        StreamVersion::new(0),
194    )?;
195    let conflicting_writes = append_contract_event(SCENARIO, conflicting_writes, &stream_id)?;
196
197    match store.append_events(conflicting_writes).await {
198        Err(EventStoreError::VersionConflict { .. }) => Ok(()),
199        Err(error) => Err(ContractTestFailure::store_error(
200            SCENARIO,
201            "append_events",
202            error,
203        )),
204        Ok(_) => Err(ContractTestFailure::assertion(
205            SCENARIO,
206            "expected version conflict but append succeeded",
207        )),
208    }
209}
210
211pub async fn test_stream_isolation<F, S>(make_store: F) -> ContractTestResult
212where
213    F: Fn() -> S + Send + Sync + Clone + 'static,
214    S: EventStore + Send + Sync + 'static,
215{
216    const SCENARIO: &str = "stream_isolation";
217
218    let store = make_store();
219    let left_stream = contract_stream_id(SCENARIO, "left")?;
220    let right_stream = contract_stream_id(SCENARIO, "right")?;
221
222    let writes = register_contract_stream(
223        SCENARIO,
224        StreamWrites::new(),
225        &left_stream,
226        StreamVersion::new(0),
227    )?;
228    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
229    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
230    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
231
232    let _ = store
233        .append_events(writes)
234        .await
235        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
236
237    let left_reader = store
238        .read_stream::<ContractTestEvent>(left_stream.clone())
239        .await
240        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
241
242    let right_reader = store
243        .read_stream::<ContractTestEvent>(right_stream.clone())
244        .await
245        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
246
247    let left_len = left_reader.len();
248    if left_len != 1 {
249        return Err(ContractTestFailure::assertion(
250            SCENARIO,
251            format!(
252                "left stream expected exactly one event but observed {}",
253                left_len
254            ),
255        ));
256    }
257
258    if left_reader
259        .iter()
260        .any(|event| event.stream_id() != &left_stream)
261    {
262        return Err(ContractTestFailure::assertion(
263            SCENARIO,
264            "left stream read events belonging to another stream",
265        ));
266    }
267
268    let right_len = right_reader.len();
269    if right_len != 1 {
270        return Err(ContractTestFailure::assertion(
271            SCENARIO,
272            format!(
273                "right stream expected exactly one event but observed {}",
274                right_len
275            ),
276        ));
277    }
278
279    if right_reader
280        .iter()
281        .any(|event| event.stream_id() != &right_stream)
282    {
283        return Err(ContractTestFailure::assertion(
284            SCENARIO,
285            "right stream read events belonging to another stream",
286        ));
287    }
288
289    Ok(())
290}
291
292pub async fn test_missing_stream_reads<F, S>(make_store: F) -> ContractTestResult
293where
294    F: Fn() -> S + Send + Sync + Clone + 'static,
295    S: EventStore + Send + Sync + 'static,
296{
297    const SCENARIO: &str = "missing_stream_reads";
298
299    let store = make_store();
300    let stream_id = contract_stream_id(SCENARIO, "ghost")?;
301
302    let reader = store
303        .read_stream::<ContractTestEvent>(stream_id.clone())
304        .await
305        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
306
307    if !reader.is_empty() {
308        return Err(ContractTestFailure::assertion(
309            SCENARIO,
310            "expected read_stream to succeed with no events for an untouched stream",
311        ));
312    }
313
314    Ok(())
315}
316
317pub async fn test_conflict_preserves_atomicity<F, S>(make_store: F) -> ContractTestResult
318where
319    F: Fn() -> S + Send + Sync + Clone + 'static,
320    S: EventStore + Send + Sync + 'static,
321{
322    const SCENARIO: &str = "conflict_preserves_atomicity";
323
324    let store = make_store();
325    let left_stream = contract_stream_id(SCENARIO, "left")?;
326    let right_stream = contract_stream_id(SCENARIO, "right")?;
327
328    // Seed one event per stream so we can introduce a single-stream conflict later.
329    let writes = register_contract_stream(
330        SCENARIO,
331        StreamWrites::new(),
332        &left_stream,
333        StreamVersion::new(0),
334    )?;
335    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
336    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
337    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
338
339    let _ = store
340        .append_events(writes)
341        .await
342        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
343
344    // Build a batch where the left stream has a stale expected version and the right stream is current.
345    let writes = register_contract_stream(
346        SCENARIO,
347        StreamWrites::new(),
348        &left_stream,
349        StreamVersion::new(0),
350    )?;
351    let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(1))?;
352    let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
353    let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
354
355    match store.append_events(writes).await {
356        Err(EventStoreError::VersionConflict { .. }) => {
357            let left_reader = store
358                .read_stream::<ContractTestEvent>(left_stream.clone())
359                .await
360                .map_err(|error| {
361                    ContractTestFailure::store_error(SCENARIO, "read_stream", error)
362                })?;
363            if left_reader.len() != 1 {
364                return Err(ContractTestFailure::assertion(
365                    SCENARIO,
366                    format!(
367                        "expected left stream to remain at len=1 after failed append, observed {}",
368                        left_reader.len()
369                    ),
370                ));
371            }
372
373            let right_reader = store
374                .read_stream::<ContractTestEvent>(right_stream.clone())
375                .await
376                .map_err(|error| {
377                    ContractTestFailure::store_error(SCENARIO, "read_stream", error)
378                })?;
379            if right_reader.len() != 1 {
380                return Err(ContractTestFailure::assertion(
381                    SCENARIO,
382                    format!(
383                        "expected right stream to remain at len=1 after failed append, observed {}",
384                        right_reader.len()
385                    ),
386                ));
387            }
388
389            Ok(())
390        }
391        Err(error) => Err(ContractTestFailure::store_error(
392            SCENARIO,
393            "append_events",
394            error,
395        )),
396        Ok(_) => Err(ContractTestFailure::assertion(
397            SCENARIO,
398            "expected version conflict but append succeeded",
399        )),
400    }
401}
402
403/// A different event type used to test type mismatch detection in read_stream.
404#[derive(Debug, Clone, Serialize, Deserialize)]
405pub struct MismatchedEvent {
406    stream_id: StreamId,
407    extra_field: String,
408}
409
410impl Event for MismatchedEvent {
411    fn stream_id(&self) -> &StreamId {
412        &self.stream_id
413    }
414
415    fn event_type_name() -> &'static str {
416        "MismatchedEvent"
417    }
418}
419
420/// Contract test: read_stream errors when events on the stream don't match
421/// the requested type.
422///
423/// This verifies that all backends behave consistently when a stream contains
424/// events that were written with one type but read with a different type.
425/// The correct behavior is to return `EventStoreError::DeserializationFailed`,
426/// not to silently skip unrecognized events.
427pub async fn test_read_stream_errors_on_type_mismatch<F, S>(make_store: F) -> ContractTestResult
428where
429    F: Fn() -> S + Send + Sync + Clone + 'static,
430    S: EventStore + Send + Sync + 'static,
431{
432    const SCENARIO: &str = "read_stream_errors_on_type_mismatch";
433
434    let store = make_store();
435    let stream_id = contract_stream_id(SCENARIO, "mismatched")?;
436
437    // Write an event using ContractTestEvent
438    let writes = register_contract_stream(
439        SCENARIO,
440        StreamWrites::new(),
441        &stream_id,
442        StreamVersion::new(0),
443    )?;
444    let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
445
446    let _ = store
447        .append_events(writes)
448        .await
449        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
450
451    // Read the same stream but request a different event type
452    let result = store.read_stream::<MismatchedEvent>(stream_id).await;
453
454    match result {
455        Err(EventStoreError::DeserializationFailed { .. }) => Ok(()),
456        Err(other) => Err(ContractTestFailure::assertion(
457            SCENARIO,
458            format!(
459                "expected DeserializationFailed error, got different error: {}",
460                other
461            ),
462        )),
463        Ok(reader) if reader.is_empty() => Err(ContractTestFailure::assertion(
464            SCENARIO,
465            "read_stream silently returned empty results instead of erroring on type mismatch",
466        )),
467        Ok(reader) => Err(ContractTestFailure::assertion(
468            SCENARIO,
469            format!(
470                "read_stream returned {} events instead of erroring on type mismatch",
471                reader.len()
472            ),
473        )),
474    }
475}
476
477// NOTE: The old fragmented macros (event_store_contract_tests!, event_reader_contract_tests!)
478// have been removed. Use backend_contract_tests! which runs ALL contract tests.
479
/// Unified contract test macro for all backend implementations.
///
/// This macro generates ALL contract tests for a backend implementation.
/// When new contract tests are added to eventcore-testing, they automatically
/// run for all backends that use this macro—no changes to backend test files required.
///
/// The generated tests live in a module named after `suite`, and each one is a
/// `#[tokio::test(flavor = "multi_thread")]` that panics via `expect` when the
/// underlying contract function returns an error.
///
/// # Arguments
///
/// All four arguments are required and must appear in this order (a trailing comma
/// is optional):
///
/// - `suite`: identifier used as the name of the generated test module.
/// - `make_store`: expression passed to the event store / event reader contract tests.
/// - `make_checkpoint_store`: expression passed to the checkpoint store contract tests.
/// - `make_coordinator`: expression passed to the projector coordinator contract tests.
///
/// Each factory expression is expanded once per generated test that uses it.
///
/// # Example
///
/// ```ignore
/// backend_contract_tests! {
///     suite = my_backend,
///     make_store = || MyEventStore::new(),
///     make_checkpoint_store = || MyCheckpointStore::new(),
///     make_coordinator = || MyCoordinator::new(),
/// }
/// ```
///
/// # Requirements
///
/// The store type must implement both `EventStore` and `EventReader` traits.
/// The checkpoint store type must implement `CheckpointStore` trait.
/// The coordinator type must implement `ProjectorCoordinator` trait.
#[macro_export]
macro_rules! backend_contract_tests {
    (suite = $suite:ident, make_store = $make_store:expr, make_checkpoint_store = $make_checkpoint_store:expr, make_coordinator = $make_coordinator:expr $(,)?) => {
        #[allow(non_snake_case)]
        mod $suite {
            use $crate::contract::{
                test_basic_read_write, test_batch_limiting,
                test_checkpoint_independent_subscriptions,
                test_checkpoint_load_missing_returns_none, test_checkpoint_save_and_load,
                test_checkpoint_update_overwrites, test_concurrent_version_conflicts,
                test_conflict_preserves_atomicity, test_coordination_acquire_leadership,
                test_coordination_independent_subscriptions,
                test_coordination_leadership_released_on_guard_drop,
                test_coordination_second_instance_blocked, test_event_ordering_across_streams,
                test_missing_stream_reads, test_position_based_resumption,
                test_read_stream_errors_on_type_mismatch, test_stream_isolation,
                test_stream_prefix_filtering, test_stream_prefix_requires_prefix_match,
            };

            // EventStore contract tests
            #[tokio::test(flavor = "multi_thread")]
            async fn basic_read_write_contract() {
                test_basic_read_write($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn concurrent_version_conflicts_contract() {
                test_concurrent_version_conflicts($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn stream_isolation_contract() {
                test_stream_isolation($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn missing_stream_reads_contract() {
                test_missing_stream_reads($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn conflict_preserves_atomicity_contract() {
                test_conflict_preserves_atomicity($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn read_stream_errors_on_type_mismatch_contract() {
                test_read_stream_errors_on_type_mismatch($make_store)
                    .await
                    .expect("event store contract failed");
            }

            // EventReader contract tests
            #[tokio::test(flavor = "multi_thread")]
            async fn event_ordering_across_streams_contract() {
                test_event_ordering_across_streams($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn position_based_resumption_contract() {
                test_position_based_resumption($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn stream_prefix_filtering_contract() {
                test_stream_prefix_filtering($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn stream_prefix_requires_prefix_match_contract() {
                test_stream_prefix_requires_prefix_match($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn batch_limiting_contract() {
                test_batch_limiting($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            // CheckpointStore contract tests
            #[tokio::test(flavor = "multi_thread")]
            async fn checkpoint_save_and_load_contract() {
                test_checkpoint_save_and_load($make_checkpoint_store)
                    .await
                    .expect("checkpoint store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn checkpoint_update_overwrites_contract() {
                test_checkpoint_update_overwrites($make_checkpoint_store)
                    .await
                    .expect("checkpoint store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn checkpoint_load_missing_returns_none_contract() {
                test_checkpoint_load_missing_returns_none($make_checkpoint_store)
                    .await
                    .expect("checkpoint store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn checkpoint_independent_subscriptions_contract() {
                test_checkpoint_independent_subscriptions($make_checkpoint_store)
                    .await
                    .expect("checkpoint store contract failed");
            }

            // ProjectorCoordinator contract tests
            #[tokio::test(flavor = "multi_thread")]
            async fn coordination_acquire_leadership_contract() {
                test_coordination_acquire_leadership($make_coordinator)
                    .await
                    .expect("coordinator contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn coordination_second_instance_blocked_contract() {
                test_coordination_second_instance_blocked($make_coordinator)
                    .await
                    .expect("coordinator contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn coordination_independent_subscriptions_contract() {
                test_coordination_independent_subscriptions($make_coordinator)
                    .await
                    .expect("coordinator contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn coordination_leadership_released_on_guard_drop_contract() {
                test_coordination_leadership_released_on_guard_drop($make_coordinator)
                    .await
                    .expect("coordinator contract failed");
            }
        }
    };
}
657
658pub use backend_contract_tests;
659
660/// Contract test: Events from multiple streams are read in global append order
661pub async fn test_event_ordering_across_streams<F, S>(make_store: F) -> ContractTestResult
662where
663    F: Fn() -> S + Send + Sync + Clone + 'static,
664    S: EventStore + EventReader + Send + Sync + 'static,
665{
666    const SCENARIO: &str = "event_ordering_across_streams";
667
668    let store = make_store();
669
670    // Given: Three streams with events appended in specific order
671    let stream_a = contract_stream_id(SCENARIO, "stream-a")?;
672    let stream_b = contract_stream_id(SCENARIO, "stream-b")?;
673    let stream_c = contract_stream_id(SCENARIO, "stream-c")?;
674
675    // Append event to stream A
676    let writes = register_contract_stream(
677        SCENARIO,
678        StreamWrites::new(),
679        &stream_a,
680        StreamVersion::new(0),
681    )?;
682    let writes = append_contract_event(SCENARIO, writes, &stream_a)?;
683    let _ = store
684        .append_events(writes)
685        .await
686        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
687
688    // Append event to stream B
689    let writes = register_contract_stream(
690        SCENARIO,
691        StreamWrites::new(),
692        &stream_b,
693        StreamVersion::new(0),
694    )?;
695    let writes = append_contract_event(SCENARIO, writes, &stream_b)?;
696    let _ = store
697        .append_events(writes)
698        .await
699        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
700
701    // Append event to stream C
702    let writes = register_contract_stream(
703        SCENARIO,
704        StreamWrites::new(),
705        &stream_c,
706        StreamVersion::new(0),
707    )?;
708    let writes = append_contract_event(SCENARIO, writes, &stream_c)?;
709    let _ = store
710        .append_events(writes)
711        .await
712        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
713
714    // When: Reading all events via EventReader with no position filter
715    let filter = EventFilter::all();
716    let page = EventPage::first(BatchSize::new(100));
717    let events = store
718        .read_events::<ContractTestEvent>(filter, page)
719        .await
720        .map_err(|_error| {
721            ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
722        })?;
723
724    // Then: Events are returned in global append order (A, B, C)
725    if events.len() != 3 {
726        return Err(ContractTestFailure::assertion(
727            SCENARIO,
728            format!("expected 3 events but got {}", events.len()),
729        ));
730    }
731
732    // And: Verify complete ordering across all three streams
733    let (first_event, _) = &events[0];
734    if first_event.stream_id() != &stream_a {
735        return Err(ContractTestFailure::assertion(
736            SCENARIO,
737            format!(
738                "expected first event from stream_a but got from {:?}",
739                first_event.stream_id()
740            ),
741        ));
742    }
743
744    let (second_event, _) = &events[1];
745    if second_event.stream_id() != &stream_b {
746        return Err(ContractTestFailure::assertion(
747            SCENARIO,
748            format!(
749                "expected second event from stream_b but got from {:?}",
750                second_event.stream_id()
751            ),
752        ));
753    }
754
755    let (third_event, _) = &events[2];
756    if third_event.stream_id() != &stream_c {
757        return Err(ContractTestFailure::assertion(
758            SCENARIO,
759            format!(
760                "expected third event from stream_c but got from {:?}",
761                third_event.stream_id()
762            ),
763        ));
764    }
765
766    Ok(())
767}
768
769/// Contract test: Position-based resumption works correctly
770pub async fn test_position_based_resumption<F, S>(make_store: F) -> ContractTestResult
771where
772    F: Fn() -> S + Send + Sync + Clone + 'static,
773    S: EventStore + EventReader + Send + Sync + 'static,
774{
775    const SCENARIO: &str = "position_based_resumption";
776
777    let store = make_store();
778
779    // Given: Events at positions 0, 1, 2, 3, 4 (5 events total)
780    let stream = contract_stream_id(SCENARIO, "stream")?;
781
782    let mut writes = register_contract_stream(
783        SCENARIO,
784        StreamWrites::new(),
785        &stream,
786        StreamVersion::new(0),
787    )?;
788
789    // Append 5 events
790    for _ in 0..5 {
791        writes = append_contract_event(SCENARIO, writes, &stream)?;
792    }
793
794    let _ = store
795        .append_events(writes)
796        .await
797        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
798
799    // Get position of third event (index 2, position 2)
800    let filter = EventFilter::all();
801    let page = EventPage::first(BatchSize::new(100));
802    let all_events = store
803        .read_events::<ContractTestEvent>(filter.clone(), page)
804        .await
805        .map_err(|_error| {
806            ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
807        })?;
808
809    let (_third_event, third_position) = &all_events[2];
810
811    // When: Reading events after position 2
812    let page_after = EventPage::after(*third_position, BatchSize::new(100));
813    let events_after = store
814        .read_events::<ContractTestEvent>(filter, page_after)
815        .await
816        .map_err(|_error| {
817            ContractTestFailure::assertion(
818                SCENARIO,
819                "read_events failed when reading after position",
820            )
821        })?;
822
823    // Then: Only events at positions 3 and 4 are returned (2 events)
824    if events_after.len() != 2 {
825        return Err(ContractTestFailure::assertion(
826            SCENARIO,
827            format!(
828                "expected 2 events after position {} but got {}",
829                third_position,
830                events_after.len()
831            ),
832        ));
833    }
834
835    // And: Position 2 event is NOT included (verify exclusivity)
836    for (_event, position) in events_after.iter() {
837        if *position == *third_position {
838            return Err(ContractTestFailure::assertion(
839                SCENARIO,
840                format!(
841                    "expected position {} to be excluded but it was included in results",
842                    third_position
843                ),
844            ));
845        }
846    }
847
848    // And: Returned event positions are greater than third_position and in ascending order
849    let (_event1, pos1) = &events_after[0];
850    let (_event2, pos2) = &events_after[1];
851
852    if *pos1 <= *third_position {
853        return Err(ContractTestFailure::assertion(
854            SCENARIO,
855            format!(
856                "expected first returned position to be > {} but got {}",
857                third_position, pos1
858            ),
859        ));
860    }
861
862    if *pos2 <= *pos1 {
863        return Err(ContractTestFailure::assertion(
864            SCENARIO,
865            format!(
866                "expected positions to be in ascending order but {} <= {}",
867                pos2, pos1
868            ),
869        ));
870    }
871
872    Ok(())
873}
874
875/// Contract test: Stream prefix filtering returns only matching streams
876pub async fn test_stream_prefix_filtering<F, S>(make_store: F) -> ContractTestResult
877where
878    F: Fn() -> S + Send + Sync + Clone + 'static,
879    S: EventStore + EventReader + Send + Sync + 'static,
880{
881    const SCENARIO: &str = "stream_prefix_filtering";
882
883    let store = make_store();
884
885    // Given: Events on streams with IDs that actually start with "account-" or "order-"
886    let account_1 = StreamId::try_new(format!("account-1-{}", Uuid::now_v7())).map_err(|e| {
887        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
888    })?;
889    let account_2 = StreamId::try_new(format!("account-2-{}", Uuid::now_v7())).map_err(|e| {
890        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
891    })?;
892    let order_1 = StreamId::try_new(format!("order-1-{}", Uuid::now_v7())).map_err(|e| {
893        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
894    })?;
895
896    let mut writes = register_contract_stream(
897        SCENARIO,
898        StreamWrites::new(),
899        &account_1,
900        StreamVersion::new(0),
901    )?;
902    writes = register_contract_stream(SCENARIO, writes, &account_2, StreamVersion::new(0))?;
903    writes = register_contract_stream(SCENARIO, writes, &order_1, StreamVersion::new(0))?;
904
905    writes = append_contract_event(SCENARIO, writes, &account_1)?;
906    writes = append_contract_event(SCENARIO, writes, &account_2)?;
907    writes = append_contract_event(SCENARIO, writes, &order_1)?;
908
909    let _ = store
910        .append_events(writes)
911        .await
912        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
913
914    // When: Reading with prefix filter "account-"
915    let prefix = StreamPrefix::try_new("account-").map_err(|e| {
916        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
917    })?;
918    let filter = EventFilter::prefix(prefix);
919    let page = EventPage::first(BatchSize::new(100));
920    let events = store
921        .read_events::<ContractTestEvent>(filter, page)
922        .await
923        .map_err(|_error| {
924            ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
925        })?;
926
927    // Then: Only events from account-1 and account-2 are returned
928    if events.len() != 2 {
929        return Err(ContractTestFailure::assertion(
930            SCENARIO,
931            format!(
932                "expected 2 events from account-* streams but got {}",
933                events.len()
934            ),
935        ));
936    }
937
938    // And: All events are from streams starting with "account-"
939    for (event, _) in events.iter() {
940        let stream_id_str = event.stream_id().as_ref();
941        if !stream_id_str.starts_with("account-") {
942            return Err(ContractTestFailure::assertion(
943                SCENARIO,
944                format!(
945                    "expected all events from streams starting with 'account-' but found event from {}",
946                    stream_id_str
947                ),
948            ));
949        }
950    }
951
952    // And: order-1 events are filtered out (verified by length check above)
953
954    Ok(())
955}
956
957/// Contract test: Stream prefix filtering requires true prefix match (not substring match)
958pub async fn test_stream_prefix_requires_prefix_match<F, S>(make_store: F) -> ContractTestResult
959where
960    F: Fn() -> S + Send + Sync + Clone + 'static,
961    S: EventStore + EventReader + Send + Sync + 'static,
962{
963    const SCENARIO: &str = "stream_prefix_requires_prefix_match";
964
965    let store = make_store();
966
967    // Given: Three streams with actual prefixes: "account-123", "my-account-456", "order-789"
968    // We want to verify that prefix "account-" matches ONLY "account-123", not "my-account-456"
969    let account_stream =
970        StreamId::try_new(format!("account-123-{}", Uuid::now_v7())).map_err(|e| {
971            ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
972        })?;
973    let my_account_stream = StreamId::try_new(format!("my-account-456-{}", Uuid::now_v7()))
974        .map_err(|e| {
975            ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
976        })?;
977    let order_stream = StreamId::try_new(format!("order-789-{}", Uuid::now_v7())).map_err(|e| {
978        ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
979    })?;
980
981    let mut writes = register_contract_stream(
982        SCENARIO,
983        StreamWrites::new(),
984        &account_stream,
985        StreamVersion::new(0),
986    )?;
987    writes = register_contract_stream(SCENARIO, writes, &my_account_stream, StreamVersion::new(0))?;
988    writes = register_contract_stream(SCENARIO, writes, &order_stream, StreamVersion::new(0))?;
989
990    writes = append_contract_event(SCENARIO, writes, &account_stream)?;
991    writes = append_contract_event(SCENARIO, writes, &my_account_stream)?;
992    writes = append_contract_event(SCENARIO, writes, &order_stream)?;
993
994    let _ = store
995        .append_events(writes)
996        .await
997        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
998
999    // When: Reading with prefix filter "account-"
1000    let prefix = StreamPrefix::try_new("account-").map_err(|e| {
1001        ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
1002    })?;
1003    let filter = EventFilter::prefix(prefix);
1004    let page = EventPage::first(BatchSize::new(100));
1005    let events = store
1006        .read_events::<ContractTestEvent>(filter, page)
1007        .await
1008        .map_err(|_error| {
1009            ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
1010        })?;
1011
1012    // Then: ONLY "account-123" stream should be returned (not "my-account-456")
1013    if events.len() != 1 {
1014        return Err(ContractTestFailure::assertion(
1015            SCENARIO,
1016            format!(
1017                "expected exactly 1 event from account-* prefix but got {} (bug: implementation uses contains() instead of starts_with())",
1018                events.len()
1019            ),
1020        ));
1021    }
1022
1023    // And: The event must be from a stream starting with "account-123"
1024    let (event, _) = &events[0];
1025    let stream_id_str = event.stream_id().as_ref();
1026    if !stream_id_str.starts_with("account-123") {
1027        return Err(ContractTestFailure::assertion(
1028            SCENARIO,
1029            format!(
1030                "expected event from stream starting with 'account-123' but got from {}",
1031                stream_id_str
1032            ),
1033        ));
1034    }
1035
1036    // And: Verify it's NOT from my-account-456 (proves we're not doing substring matching)
1037    if stream_id_str.starts_with("my-account-456") {
1038        return Err(ContractTestFailure::assertion(
1039            SCENARIO,
1040            "BUG EXPOSED: got event from stream starting with 'my-account-456' when filtering for prefix 'account-' - implementation must use prefix matching from the start of the stream ID",
1041        ));
1042    }
1043
1044    Ok(())
1045}
1046
1047/// Contract test: Batch limiting returns exactly the specified number of events
1048pub async fn test_batch_limiting<F, S>(make_store: F) -> ContractTestResult
1049where
1050    F: Fn() -> S + Send + Sync + Clone + 'static,
1051    S: EventStore + EventReader + Send + Sync + 'static,
1052{
1053    const SCENARIO: &str = "batch_limiting";
1054
1055    let store = make_store();
1056
1057    // Given: 20 events in the store
1058    let stream = contract_stream_id(SCENARIO, "stream")?;
1059
1060    let mut writes = register_contract_stream(
1061        SCENARIO,
1062        StreamWrites::new(),
1063        &stream,
1064        StreamVersion::new(0),
1065    )?;
1066
1067    // Append 20 events
1068    for _ in 0..20 {
1069        writes = append_contract_event(SCENARIO, writes, &stream)?;
1070    }
1071
1072    let _ = store
1073        .append_events(writes)
1074        .await
1075        .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1076
1077    // When: Read events with limit of 10
1078    let filter = EventFilter::all();
1079    let page = EventPage::first(BatchSize::new(10));
1080    let events = store
1081        .read_events::<ContractTestEvent>(filter, page)
1082        .await
1083        .map_err(|_error| {
1084            ContractTestFailure::assertion(SCENARIO, "read_events failed with limit")
1085        })?;
1086
1087    // Then: Exactly 10 events are returned
1088    if events.len() != 10 {
1089        return Err(ContractTestFailure::assertion(
1090            SCENARIO,
1091            format!("expected exactly 10 events but got {}", events.len()),
1092        ));
1093    }
1094
1095    // And: Events are the FIRST 10 in global order
1096    // (We verify this by checking we got exactly 10 events - the implementation
1097    // must return events in order, so if we got 10 events they must be the first 10)
1098
1099    Ok(())
1100}
1101
1102// ============================================================================
1103// CheckpointStore Contract Tests
1104// ============================================================================
1105
1106/// Contract test: Save a checkpoint and load it back
1107pub async fn test_checkpoint_save_and_load<F, CS>(make_checkpoint_store: F) -> ContractTestResult
1108where
1109    F: Fn() -> CS + Send + Sync + Clone + 'static,
1110    CS: CheckpointStore + Send + Sync + 'static,
1111{
1112    const SCENARIO: &str = "checkpoint_save_and_load";
1113
1114    let store = make_checkpoint_store();
1115
1116    // Given: A subscription name and position
1117    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1118    let position = StreamPosition::new(Uuid::now_v7());
1119
1120    // When: Saving the checkpoint
1121    store
1122        .save(&subscription_name, position)
1123        .await
1124        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save failed"))?;
1125
1126    // Then: Loading returns the saved position
1127    let loaded = store
1128        .load(&subscription_name)
1129        .await
1130        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1131
1132    if loaded != Some(position) {
1133        return Err(ContractTestFailure::assertion(
1134            SCENARIO,
1135            format!(
1136                "expected loaded position {:?} but got {:?}",
1137                Some(position),
1138                loaded
1139            ),
1140        ));
1141    }
1142
1143    Ok(())
1144}
1145
1146/// Contract test: Saving a checkpoint overwrites the previous value
1147pub async fn test_checkpoint_update_overwrites<F, CS>(
1148    make_checkpoint_store: F,
1149) -> ContractTestResult
1150where
1151    F: Fn() -> CS + Send + Sync + Clone + 'static,
1152    CS: CheckpointStore + Send + Sync + 'static,
1153{
1154    const SCENARIO: &str = "checkpoint_update_overwrites";
1155
1156    let store = make_checkpoint_store();
1157
1158    // Given: A subscription with an initial checkpoint
1159    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1160    let first_position = StreamPosition::new(Uuid::now_v7());
1161
1162    store
1163        .save(&subscription_name, first_position)
1164        .await
1165        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "first save failed"))?;
1166
1167    // When: Saving a new position
1168    let second_position = StreamPosition::new(Uuid::now_v7());
1169    store
1170        .save(&subscription_name, second_position)
1171        .await
1172        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "second save failed"))?;
1173
1174    // Then: Loading returns the new position, not the old one
1175    let loaded = store
1176        .load(&subscription_name)
1177        .await
1178        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1179
1180    if loaded != Some(second_position) {
1181        return Err(ContractTestFailure::assertion(
1182            SCENARIO,
1183            format!(
1184                "expected updated position {:?} but got {:?}",
1185                Some(second_position),
1186                loaded
1187            ),
1188        ));
1189    }
1190
1191    Ok(())
1192}
1193
1194/// Contract test: Loading a non-existent checkpoint returns None
1195pub async fn test_checkpoint_load_missing_returns_none<F, CS>(
1196    make_checkpoint_store: F,
1197) -> ContractTestResult
1198where
1199    F: Fn() -> CS + Send + Sync + Clone + 'static,
1200    CS: CheckpointStore + Send + Sync + 'static,
1201{
1202    const SCENARIO: &str = "checkpoint_load_missing_returns_none";
1203
1204    let store = make_checkpoint_store();
1205
1206    // Given: A subscription name that has never been saved
1207    let subscription_name = format!("contract::{}::ghost::{}", SCENARIO, Uuid::now_v7());
1208
1209    // When: Loading the checkpoint
1210    let loaded = store
1211        .load(&subscription_name)
1212        .await
1213        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?;
1214
1215    // Then: None is returned
1216    if loaded.is_some() {
1217        return Err(ContractTestFailure::assertion(
1218            SCENARIO,
1219            format!("expected None for missing checkpoint but got {:?}", loaded),
1220        ));
1221    }
1222
1223    Ok(())
1224}
1225
1226/// Contract test: Different subscription names have independent checkpoints
1227pub async fn test_checkpoint_independent_subscriptions<F, CS>(
1228    make_checkpoint_store: F,
1229) -> ContractTestResult
1230where
1231    F: Fn() -> CS + Send + Sync + Clone + 'static,
1232    CS: CheckpointStore + Send + Sync + 'static,
1233{
1234    const SCENARIO: &str = "checkpoint_independent_subscriptions";
1235
1236    let store = make_checkpoint_store();
1237
1238    // Given: Two subscription names
1239    let subscription_a = format!("contract::{}::sub-a::{}", SCENARIO, Uuid::now_v7());
1240    let subscription_b = format!("contract::{}::sub-b::{}", SCENARIO, Uuid::now_v7());
1241
1242    let position_a = StreamPosition::new(Uuid::now_v7());
1243    let position_b = StreamPosition::new(Uuid::now_v7());
1244
1245    // When: Saving different positions for each
1246    store
1247        .save(&subscription_a, position_a)
1248        .await
1249        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save A failed"))?;
1250
1251    store
1252        .save(&subscription_b, position_b)
1253        .await
1254        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save B failed"))?;
1255
1256    // Then: Each subscription loads its own position
1257    let loaded_a = store
1258        .load(&subscription_a)
1259        .await
1260        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load A failed"))?;
1261
1262    let loaded_b = store
1263        .load(&subscription_b)
1264        .await
1265        .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load B failed"))?;
1266
1267    if loaded_a != Some(position_a) {
1268        return Err(ContractTestFailure::assertion(
1269            SCENARIO,
1270            format!(
1271                "subscription A: expected {:?} but got {:?}",
1272                Some(position_a),
1273                loaded_a
1274            ),
1275        ));
1276    }
1277
1278    if loaded_b != Some(position_b) {
1279        return Err(ContractTestFailure::assertion(
1280            SCENARIO,
1281            format!(
1282                "subscription B: expected {:?} but got {:?}",
1283                Some(position_b),
1284                loaded_b
1285            ),
1286        ));
1287    }
1288
1289    Ok(())
1290}
1291
1292// ============================================================================
1293// ProjectorCoordinator Contract Tests
1294// ============================================================================
1295
1296/// Contract test: First instance can acquire leadership successfully
1297///
1298/// Observable behavior: When no other instance holds leadership for a subscription,
1299/// calling try_acquire returns a guard indicating successful acquisition.
1300pub async fn test_coordination_acquire_leadership<F, C>(make_coordinator: F) -> ContractTestResult
1301where
1302    F: Fn() -> C + Send + Sync + Clone + 'static,
1303    C: ProjectorCoordinator + Send + Sync + 'static,
1304{
1305    const SCENARIO: &str = "coordination_acquire_leadership";
1306
1307    let coordinator = make_coordinator();
1308
1309    // Given: A unique subscription name (no existing leadership)
1310    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1311
1312    // When: Attempting to acquire leadership
1313    let result = coordinator.try_acquire(&subscription_name).await;
1314
1315    // Then: Acquisition succeeds (returns Ok with guard)
1316    if result.is_err() {
1317        return Err(ContractTestFailure::assertion(
1318            SCENARIO,
1319            "expected first instance to acquire leadership successfully, but try_acquire failed",
1320        ));
1321    }
1322
1323    Ok(())
1324}
1325
1326/// Contract test: Second instance returns error when leadership unavailable
1327///
1328/// Observable behavior: When one instance holds leadership for a subscription,
1329/// a second attempt to acquire leadership for the same subscription returns an error.
1330pub async fn test_coordination_second_instance_blocked<F, C>(
1331    make_coordinator: F,
1332) -> ContractTestResult
1333where
1334    F: Fn() -> C + Send + Sync + Clone + 'static,
1335    C: ProjectorCoordinator + Send + Sync + 'static,
1336{
1337    const SCENARIO: &str = "coordination_second_instance_blocked";
1338
1339    let coordinator = make_coordinator();
1340
1341    // Given: A unique subscription name
1342    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1343
1344    // And: First instance acquires leadership
1345    let _first_guard = coordinator
1346        .try_acquire(&subscription_name)
1347        .await
1348        .map_err(|_| {
1349            ContractTestFailure::assertion(SCENARIO, "first instance failed to acquire leadership")
1350        })?;
1351
1352    // When: Second instance attempts to acquire leadership while first holds it
1353    let second_result = coordinator.try_acquire(&subscription_name).await;
1354
1355    // Then: Second attempt returns an error (leadership unavailable)
1356    if second_result.is_ok() {
1357        return Err(ContractTestFailure::assertion(
1358            SCENARIO,
1359            "expected second instance to be blocked but try_acquire succeeded",
1360        ));
1361    }
1362
1363    Ok(())
1364}
1365
1366/// Contract test: Different projectors have independent coordination (different lock keys)
1367///
1368/// Observable behavior: Leadership for one subscription does not block leadership
1369/// acquisition for a different subscription. Each subscription/projector has its own
1370/// independent coordination scope.
1371pub async fn test_coordination_independent_subscriptions<F, C>(
1372    make_coordinator: F,
1373) -> ContractTestResult
1374where
1375    F: Fn() -> C + Send + Sync + Clone + 'static,
1376    C: ProjectorCoordinator + Send + Sync + 'static,
1377{
1378    const SCENARIO: &str = "coordination_independent_subscriptions";
1379
1380    let coordinator = make_coordinator();
1381
1382    // Given: Two unique subscription names (different projectors)
1383    let subscription_a = format!("contract::{}::projector-A::{}", SCENARIO, Uuid::now_v7());
1384    let subscription_b = format!("contract::{}::projector-B::{}", SCENARIO, Uuid::now_v7());
1385
1386    // And: First projector acquires leadership for subscription A
1387    let _guard_a = coordinator
1388        .try_acquire(&subscription_a)
1389        .await
1390        .map_err(|_| {
1391            ContractTestFailure::assertion(
1392                SCENARIO,
1393                "projector-A failed to acquire leadership for its subscription",
1394            )
1395        })?;
1396
1397    // When: Second projector attempts to acquire leadership for subscription B
1398    // (while first projector still holds leadership for A)
1399    let result_b = coordinator.try_acquire(&subscription_b).await;
1400
1401    // Then: Second acquisition succeeds (different subscriptions are independent)
1402    if result_b.is_err() {
1403        return Err(ContractTestFailure::assertion(
1404            SCENARIO,
1405            "expected projector-B to acquire leadership for its own subscription, but try_acquire failed - different projectors should have independent coordination",
1406        ));
1407    }
1408
1409    Ok(())
1410}
1411
1412/// Contract test: Leadership is released when guard is dropped (crash/disconnect recovery)
1413///
1414/// Observable behavior: When an instance holding leadership drops its guard (simulating
1415/// process exit, crash, or connection close), the lock is released and another instance
1416/// can acquire leadership for the same subscription.
1417pub async fn test_coordination_leadership_released_on_guard_drop<F, C>(
1418    make_coordinator: F,
1419) -> ContractTestResult
1420where
1421    F: Fn() -> C + Send + Sync + Clone + 'static,
1422    C: ProjectorCoordinator + Send + Sync + 'static,
1423{
1424    const SCENARIO: &str = "coordination_leadership_released_on_guard_drop";
1425
1426    let coordinator = make_coordinator();
1427
1428    // Given: A unique subscription name
1429    let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7());
1430
1431    // And: First instance acquires leadership, then drops the guard
1432    {
1433        let _first_guard = coordinator
1434            .try_acquire(&subscription_name)
1435            .await
1436            .map_err(|_| {
1437                ContractTestFailure::assertion(
1438                    SCENARIO,
1439                    "first instance failed to acquire leadership",
1440                )
1441            })?;
1442        // Guard is dropped here when scope ends (simulates process exit/crash)
1443    }
1444
1445    // When: Second instance attempts to acquire leadership after first guard dropped
1446    let second_result = coordinator.try_acquire(&subscription_name).await;
1447
1448    // Then: Second acquisition succeeds (leadership was released)
1449    if second_result.is_err() {
1450        return Err(ContractTestFailure::assertion(
1451            SCENARIO,
1452            "expected second instance to acquire leadership after first guard dropped, but try_acquire failed - leadership should be released when guard is dropped",
1453        ));
1454    }
1455
1456    Ok(())
1457}