1use eventcore_types::{
2 BatchSize, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError, StreamId,
3 StreamPrefix, StreamVersion, StreamWrites,
4};
5use std::fmt;
6
7use serde::{Deserialize, Serialize};
8use uuid::Uuid;
9
/// Failure reported by a contract scenario.
///
/// Pairs the name of the scenario that failed with a human-readable
/// description of what went wrong.
#[derive(Debug)]
pub struct ContractTestFailure {
    /// Name of the contract scenario that produced the failure.
    scenario: &'static str,
    /// Human-readable description of the failure.
    detail: String,
}
15
16impl ContractTestFailure {
17 fn new(scenario: &'static str, detail: impl Into<String>) -> Self {
18 Self {
19 scenario,
20 detail: detail.into(),
21 }
22 }
23
24 fn builder_error(scenario: &'static str, phase: &'static str, error: EventStoreError) -> Self {
25 Self::new(scenario, format!("builder failure during {phase}: {error}"))
26 }
27
28 fn store_error(
29 scenario: &'static str,
30 operation: &'static str,
31 error: EventStoreError,
32 ) -> Self {
33 Self::new(
34 scenario,
35 format!("{operation} operation returned unexpected error: {error}"),
36 )
37 }
38
39 fn assertion(scenario: &'static str, detail: impl Into<String>) -> Self {
40 Self::new(scenario, detail)
41 }
42}
43
44impl fmt::Display for ContractTestFailure {
45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46 write!(f, "[{}] {}", self.scenario, self.detail)
47 }
48}
49
// Empty impl: no `Error` methods are overridden, so the defaults apply.
impl std::error::Error for ContractTestFailure {}

/// Outcome of a single contract scenario: `Ok(())` when the scenario passed,
/// otherwise the [`ContractTestFailure`] describing what went wrong.
pub type ContractTestResult = Result<(), ContractTestFailure>;
53
/// Event type written and read by the contract scenarios.
///
/// It carries no payload other than the id of the stream it belongs to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContractTestEvent {
    /// Stream this event is appended to.
    stream_id: StreamId,
}
58
59impl ContractTestEvent {
60 pub fn new(stream_id: StreamId) -> Self {
61 Self { stream_id }
62 }
63}
64
65impl Event for ContractTestEvent {
66 fn stream_id(&self) -> &StreamId {
67 &self.stream_id
68 }
69}
70
71fn contract_stream_id(
72 scenario: &'static str,
73 label: &str,
74) -> Result<StreamId, ContractTestFailure> {
75 let raw = format!("contract::{}::{}::{}", scenario, label, Uuid::now_v7());
77
78 StreamId::try_new(raw.clone()).map_err(|error| {
79 ContractTestFailure::assertion(
80 scenario,
81 format!("unable to construct stream id `{}`: {}", raw, error),
82 )
83 })
84}
85
86fn builder_step(
87 scenario: &'static str,
88 phase: &'static str,
89 result: Result<StreamWrites, EventStoreError>,
90) -> Result<StreamWrites, ContractTestFailure> {
91 result.map_err(|error| ContractTestFailure::builder_error(scenario, phase, error))
92}
93
94fn register_contract_stream(
95 scenario: &'static str,
96 writes: StreamWrites,
97 stream_id: &StreamId,
98 expected_version: StreamVersion,
99) -> Result<StreamWrites, ContractTestFailure> {
100 builder_step(
101 scenario,
102 "register_stream",
103 writes.register_stream(stream_id.clone(), expected_version),
104 )
105}
106
107fn append_contract_event(
108 scenario: &'static str,
109 writes: StreamWrites,
110 stream_id: &StreamId,
111) -> Result<StreamWrites, ContractTestFailure> {
112 let event = ContractTestEvent::new(stream_id.clone());
113 builder_step(scenario, "append", writes.append(event))
114}
115
116pub async fn test_basic_read_write<F, S>(make_store: F) -> ContractTestResult
117where
118 F: Fn() -> S + Send + Sync + Clone + 'static,
119 S: EventStore + Send + Sync + 'static,
120{
121 const SCENARIO: &str = "basic_read_write";
122
123 let store = make_store();
124 let stream_id = contract_stream_id(SCENARIO, "single");
125
126 let stream_id = stream_id?;
127
128 let writes = register_contract_stream(
129 SCENARIO,
130 StreamWrites::new(),
131 &stream_id,
132 StreamVersion::new(0),
133 )?;
134 let writes = append_contract_event(SCENARIO, writes, &stream_id)?;
135
136 let _ = store
137 .append_events(writes)
138 .await
139 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
140
141 let reader = store
142 .read_stream::<ContractTestEvent>(stream_id.clone())
143 .await
144 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
145
146 let len = reader.len();
147 let empty = reader.is_empty();
148
149 if empty {
150 return Err(ContractTestFailure::assertion(
151 SCENARIO,
152 "expected stream to contain events but it was empty",
153 ));
154 }
155
156 if len != 1 {
157 return Err(ContractTestFailure::assertion(
158 SCENARIO,
159 format!(
160 "expected stream to contain exactly one event, observed len={}",
161 len
162 ),
163 ));
164 }
165
166 Ok(())
167}
168
169pub async fn test_concurrent_version_conflicts<F, S>(make_store: F) -> ContractTestResult
170where
171 F: Fn() -> S + Send + Sync + Clone + 'static,
172 S: EventStore + Send + Sync + 'static,
173{
174 const SCENARIO: &str = "concurrent_version_conflicts";
175
176 let store = make_store();
177 let stream_id = contract_stream_id(SCENARIO, "shared")?;
178
179 let first_writes = register_contract_stream(
180 SCENARIO,
181 StreamWrites::new(),
182 &stream_id,
183 StreamVersion::new(0),
184 )?;
185 let first_writes = append_contract_event(SCENARIO, first_writes, &stream_id)?;
186
187 let _ = store
188 .append_events(first_writes)
189 .await
190 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
191
192 let conflicting_writes = register_contract_stream(
193 SCENARIO,
194 StreamWrites::new(),
195 &stream_id,
196 StreamVersion::new(0),
197 )?;
198 let conflicting_writes = append_contract_event(SCENARIO, conflicting_writes, &stream_id)?;
199
200 match store.append_events(conflicting_writes).await {
201 Err(EventStoreError::VersionConflict) => Ok(()),
202 Err(error) => Err(ContractTestFailure::store_error(
203 SCENARIO,
204 "append_events",
205 error,
206 )),
207 Ok(_) => Err(ContractTestFailure::assertion(
208 SCENARIO,
209 "expected version conflict but append succeeded",
210 )),
211 }
212}
213
214pub async fn test_stream_isolation<F, S>(make_store: F) -> ContractTestResult
215where
216 F: Fn() -> S + Send + Sync + Clone + 'static,
217 S: EventStore + Send + Sync + 'static,
218{
219 const SCENARIO: &str = "stream_isolation";
220
221 let store = make_store();
222 let left_stream = contract_stream_id(SCENARIO, "left")?;
223 let right_stream = contract_stream_id(SCENARIO, "right")?;
224
225 let writes = register_contract_stream(
226 SCENARIO,
227 StreamWrites::new(),
228 &left_stream,
229 StreamVersion::new(0),
230 )?;
231 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
232 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
233 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
234
235 let _ = store
236 .append_events(writes)
237 .await
238 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
239
240 let left_reader = store
241 .read_stream::<ContractTestEvent>(left_stream.clone())
242 .await
243 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
244
245 let right_reader = store
246 .read_stream::<ContractTestEvent>(right_stream.clone())
247 .await
248 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
249
250 let left_len = left_reader.len();
251 if left_len != 1 {
252 return Err(ContractTestFailure::assertion(
253 SCENARIO,
254 format!(
255 "left stream expected exactly one event but observed {}",
256 left_len
257 ),
258 ));
259 }
260
261 if left_reader
262 .iter()
263 .any(|event| event.stream_id() != &left_stream)
264 {
265 return Err(ContractTestFailure::assertion(
266 SCENARIO,
267 "left stream read events belonging to another stream",
268 ));
269 }
270
271 let right_len = right_reader.len();
272 if right_len != 1 {
273 return Err(ContractTestFailure::assertion(
274 SCENARIO,
275 format!(
276 "right stream expected exactly one event but observed {}",
277 right_len
278 ),
279 ));
280 }
281
282 if right_reader
283 .iter()
284 .any(|event| event.stream_id() != &right_stream)
285 {
286 return Err(ContractTestFailure::assertion(
287 SCENARIO,
288 "right stream read events belonging to another stream",
289 ));
290 }
291
292 Ok(())
293}
294
295pub async fn test_missing_stream_reads<F, S>(make_store: F) -> ContractTestResult
296where
297 F: Fn() -> S + Send + Sync + Clone + 'static,
298 S: EventStore + Send + Sync + 'static,
299{
300 const SCENARIO: &str = "missing_stream_reads";
301
302 let store = make_store();
303 let stream_id = contract_stream_id(SCENARIO, "ghost")?;
304
305 let reader = store
306 .read_stream::<ContractTestEvent>(stream_id.clone())
307 .await
308 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "read_stream", error))?;
309
310 if !reader.is_empty() {
311 return Err(ContractTestFailure::assertion(
312 SCENARIO,
313 "expected read_stream to succeed with no events for an untouched stream",
314 ));
315 }
316
317 Ok(())
318}
319
320pub async fn test_conflict_preserves_atomicity<F, S>(make_store: F) -> ContractTestResult
321where
322 F: Fn() -> S + Send + Sync + Clone + 'static,
323 S: EventStore + Send + Sync + 'static,
324{
325 const SCENARIO: &str = "conflict_preserves_atomicity";
326
327 let store = make_store();
328 let left_stream = contract_stream_id(SCENARIO, "left")?;
329 let right_stream = contract_stream_id(SCENARIO, "right")?;
330
331 let writes = register_contract_stream(
333 SCENARIO,
334 StreamWrites::new(),
335 &left_stream,
336 StreamVersion::new(0),
337 )?;
338 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(0))?;
339 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
340 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
341
342 let _ = store
343 .append_events(writes)
344 .await
345 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
346
347 let writes = register_contract_stream(
349 SCENARIO,
350 StreamWrites::new(),
351 &left_stream,
352 StreamVersion::new(0),
353 )?;
354 let writes = register_contract_stream(SCENARIO, writes, &right_stream, StreamVersion::new(1))?;
355 let writes = append_contract_event(SCENARIO, writes, &left_stream)?;
356 let writes = append_contract_event(SCENARIO, writes, &right_stream)?;
357
358 match store.append_events(writes).await {
359 Err(EventStoreError::VersionConflict) => {
360 let left_reader = store
361 .read_stream::<ContractTestEvent>(left_stream.clone())
362 .await
363 .map_err(|error| {
364 ContractTestFailure::store_error(SCENARIO, "read_stream", error)
365 })?;
366 if left_reader.len() != 1 {
367 return Err(ContractTestFailure::assertion(
368 SCENARIO,
369 format!(
370 "expected left stream to remain at len=1 after failed append, observed {}",
371 left_reader.len()
372 ),
373 ));
374 }
375
376 let right_reader = store
377 .read_stream::<ContractTestEvent>(right_stream.clone())
378 .await
379 .map_err(|error| {
380 ContractTestFailure::store_error(SCENARIO, "read_stream", error)
381 })?;
382 if right_reader.len() != 1 {
383 return Err(ContractTestFailure::assertion(
384 SCENARIO,
385 format!(
386 "expected right stream to remain at len=1 after failed append, observed {}",
387 right_reader.len()
388 ),
389 ));
390 }
391
392 Ok(())
393 }
394 Err(error) => Err(ContractTestFailure::store_error(
395 SCENARIO,
396 "append_events",
397 error,
398 )),
399 Ok(_) => Err(ContractTestFailure::assertion(
400 SCENARIO,
401 "expected version conflict but append succeeded",
402 )),
403 }
404}
405
/// Generates a test module that runs the event-store contract scenarios.
///
/// Invocation: `event_store_contract_tests!(suite = <ident>, make_store = <expr>)`
/// (a trailing comma is accepted).
///
/// The expansion is a module named `$suite` containing five
/// `#[tokio::test(flavor = "multi_thread")]` functions. Each one passes
/// `$make_store` to one contract function imported from `$crate::contract`
/// and panics via `expect` if the scenario returns an error. The
/// `$make_store` expression is expanded once per generated test.
#[macro_export]
macro_rules! event_store_contract_tests {
    (suite = $suite:ident, make_store = $make_store:expr $(,)?) => {
        // NOTE(review): presumably allows callers to pass a suite name that is
        // not snake_case without triggering a lint — confirm.
        #[allow(non_snake_case)]
        mod $suite {
            use $crate::contract::{
                test_basic_read_write, test_concurrent_version_conflicts,
                test_conflict_preserves_atomicity, test_missing_stream_reads,
                test_stream_isolation,
            };

            #[tokio::test(flavor = "multi_thread")]
            async fn basic_read_write_contract() {
                test_basic_read_write($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn concurrent_version_conflicts_contract() {
                test_concurrent_version_conflicts($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn stream_isolation_contract() {
                test_stream_isolation($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn missing_stream_reads_contract() {
                test_missing_stream_reads($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn conflict_preserves_atomicity_contract() {
                test_conflict_preserves_atomicity($make_store)
                    .await
                    .expect("event store contract failed");
            }
        }
    };
}
454
/// Generates a test module that runs the event-reader contract scenarios.
///
/// Invocation: `event_reader_contract_tests!(suite = <ident>, make_store = <expr>)`
/// (a trailing comma is accepted).
///
/// The expansion is a module named `$suite` containing five
/// `#[tokio::test(flavor = "multi_thread")]` functions. Each one passes
/// `$make_store` to one reader contract function imported from
/// `$crate::contract` and panics via `expect` if the scenario returns an
/// error. The `$make_store` expression is expanded once per generated test.
#[macro_export]
macro_rules! event_reader_contract_tests {
    (suite = $suite:ident, make_store = $make_store:expr $(,)?) => {
        // NOTE(review): presumably allows callers to pass a suite name that is
        // not snake_case without triggering a lint — confirm.
        #[allow(non_snake_case)]
        mod $suite {
            use $crate::contract::{
                test_batch_limiting, test_event_ordering_across_streams,
                test_position_based_resumption, test_stream_prefix_filtering,
                test_stream_prefix_requires_prefix_match,
            };

            #[tokio::test(flavor = "multi_thread")]
            async fn event_ordering_across_streams_contract() {
                test_event_ordering_across_streams($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn position_based_resumption_contract() {
                test_position_based_resumption($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn stream_prefix_filtering_contract() {
                test_stream_prefix_filtering($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn stream_prefix_requires_prefix_match_contract() {
                test_stream_prefix_requires_prefix_match($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn batch_limiting_contract() {
                test_batch_limiting($make_store)
                    .await
                    .expect("event reader contract failed");
            }
        }
    };
}
503
504pub use event_reader_contract_tests;
505pub use event_store_contract_tests;
506
/// Generates a single test module that runs both the event-store and the
/// event-reader contract scenarios.
///
/// Invocation: `event_store_suite!(suite = <ident>, make_store = <expr>)`
/// (a trailing comma is accepted).
///
/// The expansion is a module named `$suite` containing ten
/// `#[tokio::test(flavor = "multi_thread")]` functions: the five store
/// scenarios followed by the five reader scenarios. Each one passes
/// `$make_store` to a contract function imported from `$crate::contract` and
/// panics via `expect` if the scenario returns an error. The `$make_store`
/// expression is expanded once per generated test.
#[macro_export]
macro_rules! event_store_suite {
    (suite = $suite:ident, make_store = $make_store:expr $(,)?) => {
        // NOTE(review): presumably allows callers to pass a suite name that is
        // not snake_case without triggering a lint — confirm.
        #[allow(non_snake_case)]
        mod $suite {
            use $crate::contract::{
                test_basic_read_write, test_batch_limiting, test_concurrent_version_conflicts,
                test_conflict_preserves_atomicity, test_event_ordering_across_streams,
                test_missing_stream_reads, test_position_based_resumption, test_stream_isolation,
                test_stream_prefix_filtering, test_stream_prefix_requires_prefix_match,
            };

            // Event-store scenarios.

            #[tokio::test(flavor = "multi_thread")]
            async fn basic_read_write_contract() {
                test_basic_read_write($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn concurrent_version_conflicts_contract() {
                test_concurrent_version_conflicts($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn stream_isolation_contract() {
                test_stream_isolation($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn missing_stream_reads_contract() {
                test_missing_stream_reads($make_store)
                    .await
                    .expect("event store contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn conflict_preserves_atomicity_contract() {
                test_conflict_preserves_atomicity($make_store)
                    .await
                    .expect("event store contract failed");
            }

            // Event-reader scenarios.

            #[tokio::test(flavor = "multi_thread")]
            async fn event_ordering_across_streams_contract() {
                test_event_ordering_across_streams($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn position_based_resumption_contract() {
                test_position_based_resumption($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn stream_prefix_filtering_contract() {
                test_stream_prefix_filtering($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn stream_prefix_requires_prefix_match_contract() {
                test_stream_prefix_requires_prefix_match($make_store)
                    .await
                    .expect("event reader contract failed");
            }

            #[tokio::test(flavor = "multi_thread")]
            async fn batch_limiting_contract() {
                test_batch_limiting($make_store)
                    .await
                    .expect("event reader contract failed");
            }
        }
    };
}
591
592pub use event_store_suite;
593
594pub async fn test_event_ordering_across_streams<F, S>(make_store: F) -> ContractTestResult
596where
597 F: Fn() -> S + Send + Sync + Clone + 'static,
598 S: EventStore + EventReader + Send + Sync + 'static,
599{
600 const SCENARIO: &str = "event_ordering_across_streams";
601
602 let store = make_store();
603
604 let stream_a = contract_stream_id(SCENARIO, "stream-a")?;
606 let stream_b = contract_stream_id(SCENARIO, "stream-b")?;
607 let stream_c = contract_stream_id(SCENARIO, "stream-c")?;
608
609 let writes = register_contract_stream(
611 SCENARIO,
612 StreamWrites::new(),
613 &stream_a,
614 StreamVersion::new(0),
615 )?;
616 let writes = append_contract_event(SCENARIO, writes, &stream_a)?;
617 let _ = store
618 .append_events(writes)
619 .await
620 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
621
622 let writes = register_contract_stream(
624 SCENARIO,
625 StreamWrites::new(),
626 &stream_b,
627 StreamVersion::new(0),
628 )?;
629 let writes = append_contract_event(SCENARIO, writes, &stream_b)?;
630 let _ = store
631 .append_events(writes)
632 .await
633 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
634
635 let writes = register_contract_stream(
637 SCENARIO,
638 StreamWrites::new(),
639 &stream_c,
640 StreamVersion::new(0),
641 )?;
642 let writes = append_contract_event(SCENARIO, writes, &stream_c)?;
643 let _ = store
644 .append_events(writes)
645 .await
646 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
647
648 let filter = EventFilter::all();
650 let page = EventPage::first(BatchSize::new(100));
651 let events = store
652 .read_events::<ContractTestEvent>(filter, page)
653 .await
654 .map_err(|_error| {
655 ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
656 })?;
657
658 if events.len() != 3 {
660 return Err(ContractTestFailure::assertion(
661 SCENARIO,
662 format!("expected 3 events but got {}", events.len()),
663 ));
664 }
665
666 let (first_event, _) = &events[0];
668 if first_event.stream_id() != &stream_a {
669 return Err(ContractTestFailure::assertion(
670 SCENARIO,
671 format!(
672 "expected first event from stream_a but got from {:?}",
673 first_event.stream_id()
674 ),
675 ));
676 }
677
678 let (second_event, _) = &events[1];
679 if second_event.stream_id() != &stream_b {
680 return Err(ContractTestFailure::assertion(
681 SCENARIO,
682 format!(
683 "expected second event from stream_b but got from {:?}",
684 second_event.stream_id()
685 ),
686 ));
687 }
688
689 let (third_event, _) = &events[2];
690 if third_event.stream_id() != &stream_c {
691 return Err(ContractTestFailure::assertion(
692 SCENARIO,
693 format!(
694 "expected third event from stream_c but got from {:?}",
695 third_event.stream_id()
696 ),
697 ));
698 }
699
700 Ok(())
701}
702
703pub async fn test_position_based_resumption<F, S>(make_store: F) -> ContractTestResult
705where
706 F: Fn() -> S + Send + Sync + Clone + 'static,
707 S: EventStore + EventReader + Send + Sync + 'static,
708{
709 const SCENARIO: &str = "position_based_resumption";
710
711 let store = make_store();
712
713 let stream = contract_stream_id(SCENARIO, "stream")?;
715
716 let mut writes = register_contract_stream(
717 SCENARIO,
718 StreamWrites::new(),
719 &stream,
720 StreamVersion::new(0),
721 )?;
722
723 for _ in 0..5 {
725 writes = append_contract_event(SCENARIO, writes, &stream)?;
726 }
727
728 let _ = store
729 .append_events(writes)
730 .await
731 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
732
733 let filter = EventFilter::all();
735 let page = EventPage::first(BatchSize::new(100));
736 let all_events = store
737 .read_events::<ContractTestEvent>(filter.clone(), page)
738 .await
739 .map_err(|_error| {
740 ContractTestFailure::assertion(SCENARIO, "read_events failed to read events")
741 })?;
742
743 let (_third_event, third_position) = &all_events[2];
744
745 let page_after = EventPage::after(*third_position, BatchSize::new(100));
747 let events_after = store
748 .read_events::<ContractTestEvent>(filter, page_after)
749 .await
750 .map_err(|_error| {
751 ContractTestFailure::assertion(
752 SCENARIO,
753 "read_events failed when reading after position",
754 )
755 })?;
756
757 if events_after.len() != 2 {
759 return Err(ContractTestFailure::assertion(
760 SCENARIO,
761 format!(
762 "expected 2 events after position {} but got {}",
763 third_position,
764 events_after.len()
765 ),
766 ));
767 }
768
769 for (_event, position) in events_after.iter() {
771 if *position == *third_position {
772 return Err(ContractTestFailure::assertion(
773 SCENARIO,
774 format!(
775 "expected position {} to be excluded but it was included in results",
776 third_position
777 ),
778 ));
779 }
780 }
781
782 let (_event1, pos1) = &events_after[0];
784 let (_event2, pos2) = &events_after[1];
785
786 if *pos1 <= *third_position {
787 return Err(ContractTestFailure::assertion(
788 SCENARIO,
789 format!(
790 "expected first returned position to be > {} but got {}",
791 third_position, pos1
792 ),
793 ));
794 }
795
796 if *pos2 <= *pos1 {
797 return Err(ContractTestFailure::assertion(
798 SCENARIO,
799 format!(
800 "expected positions to be in ascending order but {} <= {}",
801 pos2, pos1
802 ),
803 ));
804 }
805
806 Ok(())
807}
808
809pub async fn test_stream_prefix_filtering<F, S>(make_store: F) -> ContractTestResult
811where
812 F: Fn() -> S + Send + Sync + Clone + 'static,
813 S: EventStore + EventReader + Send + Sync + 'static,
814{
815 const SCENARIO: &str = "stream_prefix_filtering";
816
817 let store = make_store();
818
819 let account_1 = StreamId::try_new(format!("account-1-{}", Uuid::now_v7())).map_err(|e| {
821 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
822 })?;
823 let account_2 = StreamId::try_new(format!("account-2-{}", Uuid::now_v7())).map_err(|e| {
824 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
825 })?;
826 let order_1 = StreamId::try_new(format!("order-1-{}", Uuid::now_v7())).map_err(|e| {
827 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
828 })?;
829
830 let mut writes = register_contract_stream(
831 SCENARIO,
832 StreamWrites::new(),
833 &account_1,
834 StreamVersion::new(0),
835 )?;
836 writes = register_contract_stream(SCENARIO, writes, &account_2, StreamVersion::new(0))?;
837 writes = register_contract_stream(SCENARIO, writes, &order_1, StreamVersion::new(0))?;
838
839 writes = append_contract_event(SCENARIO, writes, &account_1)?;
840 writes = append_contract_event(SCENARIO, writes, &account_2)?;
841 writes = append_contract_event(SCENARIO, writes, &order_1)?;
842
843 let _ = store
844 .append_events(writes)
845 .await
846 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
847
848 let prefix = StreamPrefix::try_new("account-").map_err(|e| {
850 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
851 })?;
852 let filter = EventFilter::prefix(prefix);
853 let page = EventPage::first(BatchSize::new(100));
854 let events = store
855 .read_events::<ContractTestEvent>(filter, page)
856 .await
857 .map_err(|_error| {
858 ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
859 })?;
860
861 if events.len() != 2 {
863 return Err(ContractTestFailure::assertion(
864 SCENARIO,
865 format!(
866 "expected 2 events from account-* streams but got {}",
867 events.len()
868 ),
869 ));
870 }
871
872 for (event, _) in events.iter() {
874 let stream_id_str = event.stream_id().as_ref();
875 if !stream_id_str.starts_with("account-") {
876 return Err(ContractTestFailure::assertion(
877 SCENARIO,
878 format!(
879 "expected all events from streams starting with 'account-' but found event from {}",
880 stream_id_str
881 ),
882 ));
883 }
884 }
885
886 Ok(())
889}
890
891pub async fn test_stream_prefix_requires_prefix_match<F, S>(make_store: F) -> ContractTestResult
893where
894 F: Fn() -> S + Send + Sync + Clone + 'static,
895 S: EventStore + EventReader + Send + Sync + 'static,
896{
897 const SCENARIO: &str = "stream_prefix_requires_prefix_match";
898
899 let store = make_store();
900
901 let account_stream =
904 StreamId::try_new(format!("account-123-{}", Uuid::now_v7())).map_err(|e| {
905 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
906 })?;
907 let my_account_stream = StreamId::try_new(format!("my-account-456-{}", Uuid::now_v7()))
908 .map_err(|e| {
909 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
910 })?;
911 let order_stream = StreamId::try_new(format!("order-789-{}", Uuid::now_v7())).map_err(|e| {
912 ContractTestFailure::assertion(SCENARIO, format!("invalid stream id: {}", e))
913 })?;
914
915 let mut writes = register_contract_stream(
916 SCENARIO,
917 StreamWrites::new(),
918 &account_stream,
919 StreamVersion::new(0),
920 )?;
921 writes = register_contract_stream(SCENARIO, writes, &my_account_stream, StreamVersion::new(0))?;
922 writes = register_contract_stream(SCENARIO, writes, &order_stream, StreamVersion::new(0))?;
923
924 writes = append_contract_event(SCENARIO, writes, &account_stream)?;
925 writes = append_contract_event(SCENARIO, writes, &my_account_stream)?;
926 writes = append_contract_event(SCENARIO, writes, &order_stream)?;
927
928 let _ = store
929 .append_events(writes)
930 .await
931 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
932
933 let prefix = StreamPrefix::try_new("account-").map_err(|e| {
935 ContractTestFailure::assertion(SCENARIO, format!("failed to create stream prefix: {}", e))
936 })?;
937 let filter = EventFilter::prefix(prefix);
938 let page = EventPage::first(BatchSize::new(100));
939 let events = store
940 .read_events::<ContractTestEvent>(filter, page)
941 .await
942 .map_err(|_error| {
943 ContractTestFailure::assertion(SCENARIO, "read_events failed with stream prefix filter")
944 })?;
945
946 if events.len() != 1 {
948 return Err(ContractTestFailure::assertion(
949 SCENARIO,
950 format!(
951 "expected exactly 1 event from account-* prefix but got {} (bug: implementation uses contains() instead of starts_with())",
952 events.len()
953 ),
954 ));
955 }
956
957 let (event, _) = &events[0];
959 let stream_id_str = event.stream_id().as_ref();
960 if !stream_id_str.starts_with("account-123") {
961 return Err(ContractTestFailure::assertion(
962 SCENARIO,
963 format!(
964 "expected event from stream starting with 'account-123' but got from {}",
965 stream_id_str
966 ),
967 ));
968 }
969
970 if stream_id_str.starts_with("my-account-456") {
972 return Err(ContractTestFailure::assertion(
973 SCENARIO,
974 "BUG EXPOSED: got event from stream starting with 'my-account-456' when filtering for prefix 'account-' - implementation must use prefix matching from the start of the stream ID",
975 ));
976 }
977
978 Ok(())
979}
980
981pub async fn test_batch_limiting<F, S>(make_store: F) -> ContractTestResult
983where
984 F: Fn() -> S + Send + Sync + Clone + 'static,
985 S: EventStore + EventReader + Send + Sync + 'static,
986{
987 const SCENARIO: &str = "batch_limiting";
988
989 let store = make_store();
990
991 let stream = contract_stream_id(SCENARIO, "stream")?;
993
994 let mut writes = register_contract_stream(
995 SCENARIO,
996 StreamWrites::new(),
997 &stream,
998 StreamVersion::new(0),
999 )?;
1000
1001 for _ in 0..20 {
1003 writes = append_contract_event(SCENARIO, writes, &stream)?;
1004 }
1005
1006 let _ = store
1007 .append_events(writes)
1008 .await
1009 .map_err(|error| ContractTestFailure::store_error(SCENARIO, "append_events", error))?;
1010
1011 let filter = EventFilter::all();
1013 let page = EventPage::first(BatchSize::new(10));
1014 let events = store
1015 .read_events::<ContractTestEvent>(filter, page)
1016 .await
1017 .map_err(|_error| {
1018 ContractTestFailure::assertion(SCENARIO, "read_events failed with limit")
1019 })?;
1020
1021 if events.len() != 10 {
1023 return Err(ContractTestFailure::assertion(
1024 SCENARIO,
1025 format!("expected exactly 10 events but got {}", events.len()),
1026 ));
1027 }
1028
1029 Ok(())
1034}