1use std::any::Any;
5use std::collections::{HashMap, HashSet};
6use std::io;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use std::time::Duration;
10
11use tokio::sync::RwLock;
12
13use crate::actor::{ActorConfig, AggregateHandle, spawn_actor_with_config};
14use crate::aggregate::Aggregate;
15use crate::process_manager::{
16 AggregateDispatcher, ProcessManagerCatchUp, ProcessManagerReport, ProcessManagerRunner,
17 TypedDispatcher, append_dead_letter,
18};
19use crate::projection::{Projection, ProjectionRunner};
20use crate::storage::StreamLayout;
21
/// Live actor handles keyed by `(aggregate_type, instance_id)`.
/// Values are type-erased `AggregateHandle<A>` boxes, downcast on access.
type HandleCache = HashMap<(String, String), Box<dyn Any + Send + Sync>>;

/// Registered projection runners keyed by projection name.
/// Values are type-erased `Arc<Mutex<ProjectionRunner<P>>>` boxes.
type ProjectionMap = HashMap<String, Box<dyn Any + Send + Sync>>;

/// All registered process managers, each behind its own mutex so they can be
/// caught up and saved independently.
type ProcessManagerList = Vec<std::sync::Mutex<Box<dyn ProcessManagerCatchUp>>>;

/// Command dispatchers keyed by aggregate type name.
type DispatcherMap = HashMap<String, Box<dyn AggregateDispatcher>>;

/// Catch-up entry points for every registered projection, run after injecting
/// external events.
type ProjectionCatchUpList = Vec<std::sync::Mutex<Box<dyn ProjectionCatchUpFn>>>;
52
/// Object-safe hook for advancing a projection to the tail of its streams,
/// allowing heterogeneous projections to be stored in one list.
trait ProjectionCatchUpFn: Send + Sync {
    /// Replays any new events into the projection.
    fn catch_up(&mut self) -> io::Result<()>;
}
62
/// Adapter exposing a shared `ProjectionRunner` through the object-safe
/// [`ProjectionCatchUpFn`] trait.
struct SharedProjectionCatchUp<P: Projection> {
    // Same `Arc` that is stored (type-erased) in the store's projection map,
    // so catch-up here is visible to `AggregateStore::projection`.
    inner: Arc<std::sync::Mutex<ProjectionRunner<P>>>,
}
69
70impl<P: Projection> ProjectionCatchUpFn for SharedProjectionCatchUp<P> {
71 fn catch_up(&mut self) -> io::Result<()> {
72 let mut runner = self
73 .inner
74 .lock()
75 .map_err(|e| io::Error::other(e.to_string()))?;
76 runner.catch_up()
77 }
78}
79
80const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(300);
82
/// Options controlling the side effects of [`AggregateStore::inject_event`].
#[derive(Debug, Clone, Default)]
pub struct InjectOptions {
    /// When `true`, run all process managers after the event is appended.
    pub run_process_managers: bool,
}
104
/// Entry point for working with event-sourced aggregates on disk.
///
/// Cloning is cheap: every field is `Arc`-shared, so clones observe the same
/// actor cache, projections, and process managers.
#[derive(Clone)]
pub struct AggregateStore {
    // On-disk directory layout for streams and metadata.
    layout: StreamLayout,
    // Live actor handles keyed by (aggregate_type, instance_id). Async lock
    // because lookups happen on the async path.
    cache: Arc<RwLock<HandleCache>>,
    // Type-erased projection runners, keyed by projection name.
    projections: Arc<std::sync::RwLock<ProjectionMap>>,
    // Catch-up hooks for every projection, run after event injection.
    projection_catch_ups: Arc<std::sync::RwLock<ProjectionCatchUpList>>,
    // Registered process managers, each individually lockable.
    process_managers: Arc<std::sync::RwLock<ProcessManagerList>>,
    // Command dispatchers keyed by aggregate type; immutable after build.
    dispatchers: Arc<DispatcherMap>,
    // Event ids already injected, making `inject_event` idempotent per id.
    seen_ids: Arc<std::sync::Mutex<HashSet<String>>>,
    // Idle timeout applied to every actor spawned by `get`.
    idle_timeout: Duration,
}
135
136impl std::fmt::Debug for AggregateStore {
139 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140 f.debug_struct("AggregateStore")
141 .field("base_dir", &self.layout.base_dir())
142 .finish()
143 }
144}
145
impl AggregateStore {
    /// Opens a store at `base_dir` with no projections, process managers, or
    /// dispatchers registered. Creates the metadata directory if needed.
    ///
    /// # Errors
    /// Fails if the metadata directory cannot be created, or if the blocking
    /// task is cancelled.
    pub async fn open(base_dir: impl AsRef<Path>) -> io::Result<Self> {
        let layout = StreamLayout::new(base_dir.as_ref());
        let meta_dir = layout.meta_dir();
        // Filesystem work runs off the async executor; the outer `?` covers
        // the JoinError, the inner one the io::Result.
        tokio::task::spawn_blocking(move || std::fs::create_dir_all(meta_dir))
            .await
            .map_err(io::Error::other)??;
        Ok(Self {
            layout,
            cache: Arc::new(RwLock::new(HashMap::new())),
            projections: Arc::new(std::sync::RwLock::new(HashMap::new())),
            projection_catch_ups: Arc::new(std::sync::RwLock::new(Vec::new())),
            process_managers: Arc::new(std::sync::RwLock::new(Vec::new())),
            dispatchers: Arc::new(HashMap::new()),
            seen_ids: Arc::new(std::sync::Mutex::new(HashSet::new())),
            idle_timeout: DEFAULT_IDLE_TIMEOUT,
        })
    }

    /// Returns a handle to the actor for aggregate `A` with the given `id`,
    /// spawning (or respawning) the actor if no live cached handle exists.
    ///
    /// # Errors
    /// Fails if the stream directory cannot be created or the actor cannot
    /// be spawned.
    pub async fn get<A: Aggregate>(&self, id: &str) -> io::Result<AggregateHandle<A>> {
        let key = (A::AGGREGATE_TYPE.to_owned(), id.to_owned());

        {
            // Fast path: reuse a cached handle if it downcasts to the right
            // type and its actor is still running.
            let cache = self.cache.read().await;
            if let Some(boxed) = cache.get(&key)
                && let Some(handle) = boxed.downcast_ref::<AggregateHandle<A>>()
                && handle.is_alive()
            {
                return Ok(handle.clone());
            }
        }

        {
            // Drop any stale (dead or wrongly-typed) entry before respawning.
            let mut cache = self.cache.write().await;
            cache.remove(&key);
        }

        // NOTE(review): between releasing the read lock above and the insert
        // below, two concurrent calls for the same id can each spawn an
        // actor; the later insert wins the cache slot. Confirm this is
        // acceptable for the actor implementation.
        let layout = self.layout.clone();
        let agg_type = A::AGGREGATE_TYPE.to_owned();
        let inst_id = id.to_owned();
        let stream_dir =
            tokio::task::spawn_blocking(move || layout.ensure_stream(&agg_type, &inst_id))
                .await
                .map_err(io::Error::other)??;

        tracing::debug!(
            aggregate_type = A::AGGREGATE_TYPE,
            instance_id = %id,
            "spawning actor"
        );

        let config = ActorConfig {
            idle_timeout: self.idle_timeout,
        };
        let handle = spawn_actor_with_config::<A>(&stream_dir, config)?;

        let mut cache = self.cache.write().await;
        cache.insert(key, Box::new(handle.clone()));
        Ok(handle)
    }

    /// Lists the instance ids that exist on disk for aggregate type `A`.
    ///
    /// # Errors
    /// Fails if the stream directory listing fails.
    pub async fn list<A: Aggregate>(&self) -> io::Result<Vec<String>> {
        let layout = self.layout.clone();
        let agg_type = A::AGGREGATE_TYPE.to_owned();
        tokio::task::spawn_blocking(move || layout.list_streams(&agg_type))
            .await
            .map_err(io::Error::other)?
    }

    /// Starts a builder for a store with projections, process managers,
    /// dispatchers, and a custom idle timeout.
    pub fn builder(base_dir: impl AsRef<Path>) -> AggregateStoreBuilder {
        AggregateStoreBuilder {
            base_dir: base_dir.as_ref().to_owned(),
            projection_factories: Vec::new(),
            process_manager_factories: Vec::new(),
            dispatcher_factories: Vec::new(),
            idle_timeout: DEFAULT_IDLE_TIMEOUT,
        }
    }

    /// Catches up the registered projection `P` and returns a clone of its
    /// current state.
    ///
    /// # Errors
    /// Returns `NotFound` if `P` was never registered with the builder, and
    /// other errors if a lock is poisoned, the stored runner has an
    /// unexpected type, or catch-up fails.
    pub fn projection<P: Projection>(&self) -> io::Result<P> {
        let projections = self
            .projections
            .read()
            .map_err(|e| io::Error::other(e.to_string()))?;
        let runner_any = projections.get(P::NAME).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::NotFound,
                format!("projection '{}' not registered", P::NAME),
            )
        })?;
        // Recover the concrete runner from the type-erased box; a mismatch
        // means two projections registered under the same NAME.
        let runner_arc = runner_any
            .downcast_ref::<Arc<std::sync::Mutex<ProjectionRunner<P>>>>()
            .ok_or_else(|| io::Error::other("projection type mismatch"))?;
        let mut runner = runner_arc
            .lock()
            .map_err(|e| io::Error::other(e.to_string()))?;
        // Always fold in new events before reading, so queries see the tail.
        runner.catch_up()?;
        Ok(runner.state().clone())
    }

    /// Discards projection `P`'s persisted state and replays it from scratch.
    ///
    /// # Errors
    /// Same failure modes as [`Self::projection`], plus rebuild errors.
    pub fn rebuild_projection<P: Projection>(&self) -> io::Result<()> {
        let projections = self
            .projections
            .read()
            .map_err(|e| io::Error::other(e.to_string()))?;
        let runner_any = projections.get(P::NAME).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::NotFound,
                format!("projection '{}' not registered", P::NAME),
            )
        })?;
        let runner_arc = runner_any
            .downcast_ref::<Arc<std::sync::Mutex<ProjectionRunner<P>>>>()
            .ok_or_else(|| io::Error::other("projection type mismatch"))?;
        let mut runner = runner_arc
            .lock()
            .map_err(|e| io::Error::other(e.to_string()))?;
        runner.rebuild()
    }

    /// Runs every registered process manager: catches each up, dispatches the
    /// commands they emit, dead-letters failures, then persists their state.
    ///
    /// # Errors
    /// Fails on poisoned locks, catch-up errors, dead-letter write errors,
    /// or save errors. Individual dispatch failures are dead-lettered and
    /// counted in the report rather than returned.
    pub async fn run_process_managers(&self) -> io::Result<ProcessManagerReport> {
        let mut all_work: Vec<(Vec<crate::command::CommandEnvelope>, std::path::PathBuf)> =
            Vec::new();

        {
            // Phase 1: collect work while holding the locks, so no std lock
            // is held across the awaits in phase 2.
            let pms = self
                .process_managers
                .read()
                .map_err(|e| io::Error::other(e.to_string()))?;
            for pm_mutex in pms.iter() {
                let mut pm = pm_mutex
                    .lock()
                    .map_err(|e| io::Error::other(e.to_string()))?;
                let envelopes = pm.catch_up()?;
                let dead_letter_path = pm.dead_letter_path();
                all_work.push((envelopes, dead_letter_path));
            }
        }

        // Phase 2: dispatch each envelope; failures and unknown aggregate
        // types go to the owning process manager's dead-letter file.
        let mut report = ProcessManagerReport::default();
        for (envelopes, dead_letter_path) in &all_work {
            for envelope in envelopes {
                let agg_type = &envelope.aggregate_type;
                match self.dispatchers.get(agg_type) {
                    Some(dispatcher) => match dispatcher.dispatch(self, envelope.clone()).await {
                        Ok(()) => {
                            // NOTE(review): field names here (target_type /
                            // target_id) differ from the error path's
                            // aggregate_type / instance_id — consider
                            // unifying for log queryability.
                            tracing::info!(
                                target_type = %agg_type,
                                target_id = %envelope.instance_id,
                                "dispatching command"
                            );
                            report.dispatched += 1;
                        }
                        Err(e) => {
                            tracing::error!(
                                aggregate_type = %agg_type,
                                instance_id = %envelope.instance_id,
                                error = %e,
                                "process manager dispatch failed, dead-lettering"
                            );
                            append_dead_letter(dead_letter_path, envelope.clone(), &e.to_string())?;
                            report.dead_lettered += 1;
                        }
                    },
                    None => {
                        let err_msg = format!("unknown aggregate type: {agg_type}");
                        tracing::error!(
                            aggregate_type = %agg_type,
                            "no dispatcher registered, dead-lettering"
                        );
                        append_dead_letter(dead_letter_path, envelope.clone(), &err_msg)?;
                        report.dead_lettered += 1;
                    }
                }
            }
        }

        {
            // Phase 3: persist each process manager's cursor/state only after
            // all dispatch attempts, so re-runs stay idempotent.
            let pms = self
                .process_managers
                .read()
                .map_err(|e| io::Error::other(e.to_string()))?;
            for pm_mutex in pms.iter() {
                let pm = pm_mutex
                    .lock()
                    .map_err(|e| io::Error::other(e.to_string()))?;
                pm.save()?;
            }
        }

        Ok(report)
    }

    /// Returns the on-disk stream layout backing this store.
    pub fn layout(&self) -> &StreamLayout {
        &self.layout
    }

    /// Lists `(aggregate_type, instance_id)` pairs on disk — for one
    /// aggregate type when `aggregate_type` is `Some`, otherwise for all.
    ///
    /// # Errors
    /// Fails if directory listing fails.
    pub async fn list_streams(
        &self,
        aggregate_type: Option<&str>,
    ) -> io::Result<Vec<(String, String)>> {
        let layout = self.layout.clone();
        match aggregate_type {
            Some(agg_type) => {
                let agg_type = agg_type.to_owned();
                tokio::task::spawn_blocking(move || {
                    let ids = layout.list_streams(&agg_type)?;
                    Ok(ids.into_iter().map(|id| (agg_type.clone(), id)).collect())
                })
                .await
                .map_err(io::Error::other)?
            }
            None => tokio::task::spawn_blocking(move || {
                let types = layout.list_aggregate_types()?;
                let mut pairs = Vec::new();
                for agg_type in types {
                    let ids = layout.list_streams(&agg_type)?;
                    pairs.extend(ids.into_iter().map(|id| (agg_type.clone(), id)));
                }
                Ok(pairs)
            })
            .await
            .map_err(io::Error::other)?,
        }
    }

    /// Reads every event of one stream directly from disk, bypassing any
    /// running actor.
    ///
    /// # Errors
    /// Returns `NotFound` if the stream directory does not exist. A missing
    /// event log inside an existing directory yields an empty `Vec`.
    pub async fn read_events(
        &self,
        aggregate_type: &str,
        instance_id: &str,
    ) -> io::Result<Vec<eventfold::Event>> {
        let layout = self.layout.clone();
        let agg_type = aggregate_type.to_owned();
        let inst_id = instance_id.to_owned();
        tokio::task::spawn_blocking(move || {
            let stream_dir = layout.stream_dir(&agg_type, &inst_id);

            if !stream_dir.is_dir() {
                return Err(io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("stream directory not found: {}", stream_dir.display()),
                ));
            }

            let reader = eventfold::EventReader::new(&stream_dir);
            // Directory exists but no log yet: treat as an empty stream.
            let iter = match reader.read_from(0) {
                Ok(iter) => iter,
                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
                Err(e) => return Err(e),
            };

            let mut events = Vec::new();
            for result in iter {
                let (event, _next_offset, _line_hash) = result?;
                events.push(event);
            }
            Ok(events)
        })
        .await
        .map_err(io::Error::other)?
    }

    /// Appends an externally produced event to aggregate `A`'s stream,
    /// idempotently by event id, then catches up projections and optionally
    /// runs process managers (per `opts`).
    ///
    /// Events with an id already seen by this store instance are silently
    /// dropped. Note the seen-id set is in-memory only, so dedup does not
    /// survive a restart.
    ///
    /// # Errors
    /// Fails on poisoned locks, stream I/O errors, actor injection errors,
    /// or projection/process-manager errors.
    pub async fn inject_event<A: Aggregate>(
        &self,
        instance_id: &str,
        event: eventfold::Event,
        opts: InjectOptions,
    ) -> io::Result<()> {
        let event_id = event.id.clone();
        // Idempotency check: skip events whose id was already injected.
        if let Some(ref id) = event_id {
            let seen = self
                .seen_ids
                .lock()
                .map_err(|e| io::Error::other(e.to_string()))?;
            if seen.contains(id) {
                return Ok(());
            }
        }

        let layout = self.layout.clone();
        let agg_type = A::AGGREGATE_TYPE.to_owned();
        let inst_id = instance_id.to_owned();
        let stream_dir =
            tokio::task::spawn_blocking(move || layout.ensure_stream(&agg_type, &inst_id))
                .await
                .map_err(io::Error::other)??;

        // Prefer routing through a live actor so its in-memory state stays
        // consistent with the log. Holding the tokio read guard across the
        // await is fine (tokio locks are await-safe).
        let key = (A::AGGREGATE_TYPE.to_owned(), instance_id.to_owned());
        let injected_via_actor = {
            let cache = self.cache.read().await;
            if let Some(boxed) = cache.get(&key)
                && let Some(handle) = boxed.downcast_ref::<AggregateHandle<A>>()
                && handle.is_alive()
            {
                handle.inject_via_actor(event.clone()).await?;
                true
            } else {
                false
            }
        };

        // No live actor: append directly to the on-disk log.
        if !injected_via_actor {
            let ev = event;
            tokio::task::spawn_blocking(move || {
                let mut writer = eventfold::EventWriter::open(&stream_dir)?;
                writer.append(&ev).map(|_| ())
            })
            .await
            .map_err(io::Error::other)??;
        }

        // Record the id only after a successful write, so failed injections
        // can be retried.
        if let Some(id) = event_id {
            let mut seen = self
                .seen_ids
                .lock()
                .map_err(|e| io::Error::other(e.to_string()))?;
            seen.insert(id);
        }

        {
            // Bring every projection up to date with the new event.
            let catch_ups = self
                .projection_catch_ups
                .read()
                .map_err(|e| io::Error::other(e.to_string()))?;
            for catch_up_mutex in catch_ups.iter() {
                let mut catch_up = catch_up_mutex
                    .lock()
                    .map_err(|e| io::Error::other(e.to_string()))?;
                catch_up.catch_up()?;
            }
        }

        if opts.run_process_managers {
            self.run_process_managers().await?;
        }

        Ok(())
    }
}
701
/// Deferred projection constructor: given the stream layout, builds the
/// type-erased shared runner plus its catch-up hook. Deferred so builders can
/// be configured before the store directory exists.
type ProjectionFactory = Box<
    dyn FnOnce(
        StreamLayout,
    ) -> io::Result<(
        Box<dyn Any + Send + Sync>,
        std::sync::Mutex<Box<dyn ProjectionCatchUpFn>>,
    )>,
>;

/// Deferred constructor for a process manager runner.
type ProcessManagerFactory =
    Box<dyn FnOnce(StreamLayout) -> io::Result<std::sync::Mutex<Box<dyn ProcessManagerCatchUp>>>>;

/// Deferred constructor for an aggregate command dispatcher.
type DispatcherFactory = Box<dyn FnOnce() -> Box<dyn AggregateDispatcher>>;
725
/// Builder for [`AggregateStore`], created via [`AggregateStore::builder`].
///
/// Registration methods only record factories; nothing touches disk until
/// [`AggregateStoreBuilder::open`] runs them.
pub struct AggregateStoreBuilder {
    // Root directory for streams and metadata.
    base_dir: PathBuf,
    // (projection name, factory) pairs, instantiated in `open`.
    projection_factories: Vec<(String, ProjectionFactory)>,
    // (process manager name, factory) pairs, instantiated in `open`.
    process_manager_factories: Vec<(String, ProcessManagerFactory)>,
    // (aggregate type, factory) pairs for command dispatchers.
    dispatcher_factories: Vec<(String, DispatcherFactory)>,
    // Idle timeout applied to every actor the store spawns.
    idle_timeout: Duration,
}
757
impl AggregateStoreBuilder {
    /// Registers projection `P` under `P::NAME`.
    ///
    /// The factory shares one runner between the queryable projection map and
    /// the catch-up list, so both views see the same state.
    pub fn projection<P: Projection>(mut self) -> Self {
        self.projection_factories.push((
            P::NAME.to_owned(),
            Box::new(|layout| {
                let runner = ProjectionRunner::<P>::new(layout)?;
                let shared = Arc::new(std::sync::Mutex::new(runner));
                // Type-erased clone goes into the projection map; the
                // original Arc powers the catch-up hook.
                let any_box: Box<dyn Any + Send + Sync> = Box::new(shared.clone());
                let catch_up: std::sync::Mutex<Box<dyn ProjectionCatchUpFn>> =
                    std::sync::Mutex::new(Box::new(SharedProjectionCatchUp { inner: shared }));
                Ok((any_box, catch_up))
            }),
        ));
        self
    }

    /// Registers process manager `PM` under `PM::NAME`.
    pub fn process_manager<PM>(mut self) -> Self
    where
        PM: crate::process_manager::ProcessManager,
    {
        self.process_manager_factories.push((
            PM::NAME.to_owned(),
            Box::new(|layout| {
                let runner = ProcessManagerRunner::<PM>::new(layout)?;
                Ok(std::sync::Mutex::new(
                    Box::new(runner) as Box<dyn ProcessManagerCatchUp>
                ))
            }),
        ));
        self
    }

    /// Overrides the default actor idle timeout (see `DEFAULT_IDLE_TIMEOUT`).
    pub fn idle_timeout(mut self, timeout: Duration) -> Self {
        self.idle_timeout = timeout;
        self
    }

    /// Registers a command dispatcher for aggregate `A`, enabling process
    /// managers to target `A::AGGREGATE_TYPE` with JSON-encoded commands.
    pub fn aggregate_type<A>(mut self) -> Self
    where
        A: Aggregate,
        A::Command: serde::de::DeserializeOwned,
    {
        self.dispatcher_factories.push((
            A::AGGREGATE_TYPE.to_owned(),
            Box::new(|| Box::new(TypedDispatcher::<A>::new()) as Box<dyn AggregateDispatcher>),
        ));
        self
    }

    /// Creates the metadata directory, runs all registered factories, and
    /// assembles the store.
    ///
    /// # Errors
    /// Fails if directory creation or any projection/process-manager factory
    /// fails. Note duplicate registrations under the same name overwrite
    /// earlier ones in the maps.
    pub async fn open(self) -> io::Result<AggregateStore> {
        let layout = StreamLayout::new(&self.base_dir);
        let meta_dir = layout.meta_dir();
        // Directory creation runs off the async executor.
        tokio::task::spawn_blocking(move || std::fs::create_dir_all(meta_dir))
            .await
            .map_err(io::Error::other)??;

        let mut projections = HashMap::new();
        let mut projection_catch_ups: ProjectionCatchUpList = Vec::new();
        for (name, factory) in self.projection_factories {
            let (any_runner, catch_up) = factory(layout.clone())?;
            projections.insert(name, any_runner);
            projection_catch_ups.push(catch_up);
        }

        let mut process_managers = Vec::new();
        for (_name, factory) in self.process_manager_factories {
            let runner = factory(layout.clone())?;
            process_managers.push(runner);
        }

        let mut dispatchers: HashMap<String, Box<dyn AggregateDispatcher>> = HashMap::new();
        for (name, factory) in self.dispatcher_factories {
            dispatchers.insert(name, factory());
        }

        Ok(AggregateStore {
            layout,
            cache: Arc::new(RwLock::new(HashMap::new())),
            projections: Arc::new(std::sync::RwLock::new(projections)),
            projection_catch_ups: Arc::new(std::sync::RwLock::new(projection_catch_ups)),
            process_managers: Arc::new(std::sync::RwLock::new(process_managers)),
            dispatchers: Arc::new(dispatchers),
            seen_ids: Arc::new(std::sync::Mutex::new(HashSet::new())),
            idle_timeout: self.idle_timeout,
        })
    }
}
930
931#[cfg(test)]
932mod tests {
933 use std::time::Duration;
934
935 use tempfile::TempDir;
936
937 use super::*;
938 use crate::aggregate::test_fixtures::{Counter, CounterCommand};
939 use crate::command::CommandContext;
940
941 #[tokio::test]
942 async fn full_roundtrip() {
943 let tmp = TempDir::new().expect("failed to create temp dir");
944 let store = AggregateStore::open(tmp.path())
945 .await
946 .expect("open should succeed");
947
948 let handle = store
949 .get::<Counter>("c-1")
950 .await
951 .expect("get should succeed");
952
953 let ctx = CommandContext::default();
954 handle
955 .execute(CounterCommand::Increment, ctx.clone())
956 .await
957 .expect("first increment should succeed");
958 handle
959 .execute(CounterCommand::Increment, ctx)
960 .await
961 .expect("second increment should succeed");
962
963 let state = handle.state().await.expect("state should succeed");
964 assert_eq!(state.value, 2);
965 }
966
967 #[tokio::test]
968 async fn list_empty_initially() {
969 let tmp = TempDir::new().expect("failed to create temp dir");
970 let store = AggregateStore::open(tmp.path())
971 .await
972 .expect("open should succeed");
973
974 let ids = store.list::<Counter>().await.expect("list should succeed");
975 assert!(ids.is_empty());
976 }
977
978 #[tokio::test]
979 async fn list_after_commands() {
980 let tmp = TempDir::new().expect("failed to create temp dir");
981 let store = AggregateStore::open(tmp.path())
982 .await
983 .expect("open should succeed");
984
985 let ctx = CommandContext::default();
986
987 let h1 = store
988 .get::<Counter>("c-1")
989 .await
990 .expect("get c-1 should succeed");
991 h1.execute(CounterCommand::Increment, ctx.clone())
992 .await
993 .expect("c-1 increment should succeed");
994
995 let h2 = store
996 .get::<Counter>("c-2")
997 .await
998 .expect("get c-2 should succeed");
999 h2.execute(CounterCommand::Increment, ctx)
1000 .await
1001 .expect("c-2 increment should succeed");
1002
1003 let mut ids = store.list::<Counter>().await.expect("list should succeed");
1004 ids.sort();
1005 assert_eq!(ids, vec!["c-1", "c-2"]);
1006 }
1007
1008 #[tokio::test]
1009 async fn same_id_returns_shared_handle() {
1010 let tmp = TempDir::new().expect("failed to create temp dir");
1011 let store = AggregateStore::open(tmp.path())
1012 .await
1013 .expect("open should succeed");
1014
1015 let h1 = store
1016 .get::<Counter>("c-1")
1017 .await
1018 .expect("first get should succeed");
1019 let h2 = store
1020 .get::<Counter>("c-1")
1021 .await
1022 .expect("second get should succeed");
1023
1024 h1.execute(CounterCommand::Increment, CommandContext::default())
1025 .await
1026 .expect("increment via h1 should succeed");
1027
1028 let state = h2.state().await.expect("state via h2 should succeed");
1029 assert_eq!(state.value, 1);
1030 }
1031
1032 #[tokio::test]
1033 async fn state_survives_store_reopen() {
1034 let tmp = TempDir::new().expect("failed to create temp dir");
1035
1036 {
1038 let store = AggregateStore::open(tmp.path())
1039 .await
1040 .expect("open should succeed");
1041 let handle = store
1042 .get::<Counter>("c-1")
1043 .await
1044 .expect("get should succeed");
1045 let ctx = CommandContext::default();
1046 for _ in 0..3 {
1047 handle
1048 .execute(CounterCommand::Increment, ctx.clone())
1049 .await
1050 .expect("increment should succeed");
1051 }
1052 }
1053
1054 tokio::time::sleep(Duration::from_millis(50)).await;
1056
1057 let store = AggregateStore::open(tmp.path())
1059 .await
1060 .expect("reopen should succeed");
1061 let handle = store
1062 .get::<Counter>("c-1")
1063 .await
1064 .expect("get after reopen should succeed");
1065 let state = handle.state().await.expect("state should succeed");
1066 assert_eq!(state.value, 3);
1067 }
1068
1069 #[tokio::test]
1070 async fn two_aggregate_types_coexist() {
1071 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
1073 struct Toggle {
1074 pub on: bool,
1075 }
1076
1077 #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
1078 #[serde(tag = "type", content = "data")]
1079 enum ToggleEvent {
1080 Toggled,
1081 }
1082
1083 #[derive(Debug, thiserror::Error)]
1084 enum ToggleError {}
1085
1086 impl Aggregate for Toggle {
1087 const AGGREGATE_TYPE: &'static str = "toggle";
1088 type Command = ();
1089 type DomainEvent = ToggleEvent;
1090 type Error = ToggleError;
1091
1092 fn handle(&self, _cmd: ()) -> Result<Vec<ToggleEvent>, ToggleError> {
1093 Ok(vec![ToggleEvent::Toggled])
1094 }
1095
1096 fn apply(mut self, _event: &ToggleEvent) -> Self {
1097 self.on = !self.on;
1098 self
1099 }
1100 }
1101
1102 let tmp = TempDir::new().expect("failed to create temp dir");
1103 let store = AggregateStore::open(tmp.path())
1104 .await
1105 .expect("open should succeed");
1106
1107 let counter_handle = store
1109 .get::<Counter>("c-1")
1110 .await
1111 .expect("get counter should succeed");
1112 counter_handle
1113 .execute(CounterCommand::Increment, CommandContext::default())
1114 .await
1115 .expect("counter increment should succeed");
1116
1117 let toggle_handle = store
1119 .get::<Toggle>("t-1")
1120 .await
1121 .expect("get toggle should succeed");
1122 toggle_handle
1123 .execute((), CommandContext::default())
1124 .await
1125 .expect("toggle should succeed");
1126
1127 let counter_state = counter_handle
1129 .state()
1130 .await
1131 .expect("counter state should succeed");
1132 assert_eq!(counter_state.value, 1);
1133
1134 let toggle_state = toggle_handle
1135 .state()
1136 .await
1137 .expect("toggle state should succeed");
1138 assert!(toggle_state.on);
1139
1140 let counter_ids = store
1142 .list::<Counter>()
1143 .await
1144 .expect("list counters should succeed");
1145 assert_eq!(counter_ids, vec!["c-1"]);
1146
1147 let toggle_ids = store
1148 .list::<Toggle>()
1149 .await
1150 .expect("list toggles should succeed");
1151 assert_eq!(toggle_ids, vec!["t-1"]);
1152 }
1153
1154 use crate::projection::test_fixtures::EventCounter;
1157
1158 async fn increment(store: &AggregateStore, id: &str) {
1160 let handle = store.get::<Counter>(id).await.expect("get should succeed");
1161 handle
1162 .execute(CounterCommand::Increment, CommandContext::default())
1163 .await
1164 .expect("increment should succeed");
1165 }
1166
1167 #[tokio::test]
1168 async fn builder_with_projection_roundtrip() {
1169 let tmp = TempDir::new().expect("failed to create temp dir");
1170 let store = AggregateStore::builder(tmp.path())
1171 .projection::<EventCounter>()
1172 .open()
1173 .await
1174 .expect("builder open should succeed");
1175
1176 let handle = store
1177 .get::<Counter>("c-1")
1178 .await
1179 .expect("get should succeed");
1180 let ctx = CommandContext::default();
1181 for _ in 0..3 {
1182 handle
1183 .execute(CounterCommand::Increment, ctx.clone())
1184 .await
1185 .expect("increment should succeed");
1186 }
1187
1188 let counter = store
1189 .projection::<EventCounter>()
1190 .expect("projection query should succeed");
1191 assert_eq!(counter.count, 3);
1192 }
1193
1194 #[tokio::test]
1195 async fn projection_sees_multiple_instances() {
1196 let tmp = TempDir::new().expect("failed to create temp dir");
1197 let store = AggregateStore::builder(tmp.path())
1198 .projection::<EventCounter>()
1199 .open()
1200 .await
1201 .expect("builder open should succeed");
1202
1203 increment(&store, "c-1").await;
1204 increment(&store, "c-2").await;
1205
1206 let counter = store
1207 .projection::<EventCounter>()
1208 .expect("projection query should succeed");
1209 assert_eq!(counter.count, 2);
1210 }
1211
1212 #[tokio::test]
1213 async fn projection_persists_across_restart() {
1214 let tmp = TempDir::new().expect("failed to create temp dir");
1215
1216 {
1218 let store = AggregateStore::builder(tmp.path())
1219 .projection::<EventCounter>()
1220 .open()
1221 .await
1222 .expect("builder open should succeed");
1223
1224 increment(&store, "c-1").await;
1225 increment(&store, "c-1").await;
1226 increment(&store, "c-2").await;
1227
1228 let counter = store
1229 .projection::<EventCounter>()
1230 .expect("projection query should succeed");
1231 assert_eq!(counter.count, 3);
1232 }
1233
1234 tokio::time::sleep(Duration::from_millis(50)).await;
1236
1237 let store = AggregateStore::builder(tmp.path())
1240 .projection::<EventCounter>()
1241 .open()
1242 .await
1243 .expect("reopen should succeed");
1244
1245 let counter = store
1246 .projection::<EventCounter>()
1247 .expect("projection query after reopen should succeed");
1248 assert_eq!(counter.count, 3);
1249 }
1250
1251 #[tokio::test]
1252 async fn projection_without_registration_returns_error() {
1253 let tmp = TempDir::new().expect("failed to create temp dir");
1254 let store = AggregateStore::open(tmp.path())
1255 .await
1256 .expect("open should succeed");
1257
1258 let result = store.projection::<EventCounter>();
1259 assert!(result.is_err());
1260 assert_eq!(result.unwrap_err().kind(), io::ErrorKind::NotFound);
1261 }
1262
1263 #[tokio::test]
1264 async fn open_convenience_still_works() {
1265 let tmp = TempDir::new().expect("failed to create temp dir");
1266 let store = AggregateStore::open(tmp.path())
1267 .await
1268 .expect("open should succeed");
1269
1270 let handle = store
1271 .get::<Counter>("c-1")
1272 .await
1273 .expect("get should succeed");
1274 handle
1275 .execute(CounterCommand::Increment, CommandContext::default())
1276 .await
1277 .expect("increment should succeed");
1278
1279 let state = handle.state().await.expect("state should succeed");
1280 assert_eq!(state.value, 1);
1281 }
1282
1283 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
1292 struct Receiver {
1293 pub received_count: u64,
1294 }
1295
1296 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1297 #[serde(tag = "type", content = "data")]
1298 enum ReceiverCommand {
1299 Accept,
1300 }
1301
1302 #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
1303 #[serde(tag = "type", content = "data")]
1304 enum ReceiverEvent {
1305 Accepted,
1306 }
1307
1308 #[derive(Debug, thiserror::Error)]
1309 enum ReceiverError {}
1310
1311 impl Aggregate for Receiver {
1312 const AGGREGATE_TYPE: &'static str = "receiver";
1313 type Command = ReceiverCommand;
1314 type DomainEvent = ReceiverEvent;
1315 type Error = ReceiverError;
1316
1317 fn handle(&self, _cmd: ReceiverCommand) -> Result<Vec<ReceiverEvent>, ReceiverError> {
1318 Ok(vec![ReceiverEvent::Accepted])
1319 }
1320
1321 fn apply(mut self, _event: &ReceiverEvent) -> Self {
1322 self.received_count += 1;
1323 self
1324 }
1325 }
1326
1327 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
1330 struct ForwardSaga {
1331 pub forwarded: u64,
1332 }
1333
1334 impl crate::process_manager::ProcessManager for ForwardSaga {
1335 const NAME: &'static str = "forward-saga";
1336
1337 fn subscriptions(&self) -> &'static [&'static str] {
1338 &["counter"]
1339 }
1340
1341 fn react(
1342 &mut self,
1343 _aggregate_type: &str,
1344 stream_id: &str,
1345 _event: &eventfold::Event,
1346 ) -> Vec<crate::command::CommandEnvelope> {
1347 self.forwarded += 1;
1348 vec![crate::command::CommandEnvelope {
1349 aggregate_type: "receiver".to_string(),
1350 instance_id: stream_id.to_string(),
1351 command: serde_json::json!({"type": "Accept"}),
1352 context: CommandContext::default(),
1353 }]
1354 }
1355 }
1356
1357 #[tokio::test]
1358 async fn end_to_end_process_manager_dispatch() {
1359 let tmp = TempDir::new().expect("failed to create temp dir");
1360 let store = AggregateStore::builder(tmp.path())
1361 .process_manager::<ForwardSaga>()
1362 .aggregate_type::<Receiver>()
1363 .open()
1364 .await
1365 .expect("builder open should succeed");
1366
1367 increment(&store, "c-1").await;
1369 increment(&store, "c-1").await;
1370
1371 let report = store
1373 .run_process_managers()
1374 .await
1375 .expect("run_process_managers should succeed");
1376
1377 assert_eq!(report.dispatched, 2);
1378 assert_eq!(report.dead_lettered, 0);
1379
1380 let receiver_handle = store
1382 .get::<Receiver>("c-1")
1383 .await
1384 .expect("get receiver should succeed");
1385 let receiver_state = receiver_handle
1386 .state()
1387 .await
1388 .expect("receiver state should succeed");
1389 assert_eq!(receiver_state.received_count, 2);
1390 }
1391
1392 #[tokio::test]
1393 async fn process_manager_dead_letters_unknown_type() {
1394 #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
1396 struct BadTargetSaga {
1397 seen: u64,
1398 }
1399
1400 impl crate::process_manager::ProcessManager for BadTargetSaga {
1401 const NAME: &'static str = "bad-target-saga";
1402
1403 fn subscriptions(&self) -> &'static [&'static str] {
1404 &["counter"]
1405 }
1406
1407 fn react(
1408 &mut self,
1409 _aggregate_type: &str,
1410 _stream_id: &str,
1411 _event: &eventfold::Event,
1412 ) -> Vec<crate::command::CommandEnvelope> {
1413 self.seen += 1;
1414 vec![crate::command::CommandEnvelope {
1415 aggregate_type: "nonexistent".to_string(),
1416 instance_id: "x".to_string(),
1417 command: serde_json::json!({}),
1418 context: CommandContext::default(),
1419 }]
1420 }
1421 }
1422
1423 let tmp = TempDir::new().expect("failed to create temp dir");
1424 let store = AggregateStore::builder(tmp.path())
1425 .process_manager::<BadTargetSaga>()
1426 .open()
1427 .await
1428 .expect("builder open should succeed");
1429
1430 increment(&store, "c-1").await;
1431
1432 let report = store
1433 .run_process_managers()
1434 .await
1435 .expect("run_process_managers should succeed");
1436
1437 assert_eq!(report.dispatched, 0);
1438 assert_eq!(report.dead_lettered, 1);
1439
1440 let dl_path = tmp
1442 .path()
1443 .join("process_managers/bad-target-saga/dead_letters.jsonl");
1444 let contents = std::fs::read_to_string(&dl_path).expect("dead-letter file should exist");
1445 let entry: serde_json::Value =
1446 serde_json::from_str(contents.trim()).expect("dead-letter entry should be valid JSON");
1447 assert!(
1448 entry["error"]
1449 .as_str()
1450 .expect("error field should be a string")
1451 .contains("nonexistent")
1452 );
1453 }
1454
1455 #[tokio::test]
1456 async fn run_process_managers_idempotent() {
1457 let tmp = TempDir::new().expect("failed to create temp dir");
1458 let store = AggregateStore::builder(tmp.path())
1459 .process_manager::<ForwardSaga>()
1460 .aggregate_type::<Receiver>()
1461 .open()
1462 .await
1463 .expect("builder open should succeed");
1464
1465 increment(&store, "c-1").await;
1466
1467 let first = store
1469 .run_process_managers()
1470 .await
1471 .expect("first run should succeed");
1472 assert_eq!(first.dispatched, 1);
1473
1474 let second = store
1476 .run_process_managers()
1477 .await
1478 .expect("second run should succeed");
1479 assert_eq!(second.dispatched, 0);
1480 assert_eq!(second.dead_lettered, 0);
1481 }
1482
1483 #[tokio::test]
1484 async fn process_manager_recovers_after_restart() {
1485 let tmp = TempDir::new().expect("failed to create temp dir");
1486
1487 {
1489 let store = AggregateStore::builder(tmp.path())
1490 .process_manager::<ForwardSaga>()
1491 .aggregate_type::<Receiver>()
1492 .open()
1493 .await
1494 .expect("builder open should succeed");
1495
1496 increment(&store, "c-1").await;
1497 increment(&store, "c-2").await;
1498
1499 let report = store
1500 .run_process_managers()
1501 .await
1502 .expect("run should succeed");
1503 assert_eq!(report.dispatched, 2);
1504 }
1505
1506 tokio::time::sleep(Duration::from_millis(50)).await;
1508
1509 let store = AggregateStore::builder(tmp.path())
1511 .process_manager::<ForwardSaga>()
1512 .aggregate_type::<Receiver>()
1513 .open()
1514 .await
1515 .expect("reopen should succeed");
1516
1517 increment(&store, "c-1").await;
1518
1519 let report = store
1520 .run_process_managers()
1521 .await
1522 .expect("run after restart should succeed");
1523
1524 assert_eq!(report.dispatched, 1);
1526 assert_eq!(report.dead_lettered, 0);
1527 }
1528
    #[tokio::test]
    async fn idle_actor_evicted_and_respawned() {
        // An actor idle for longer than the configured timeout should be
        // torn down, and a later `get` should respawn it with state rebuilt
        // from the persisted event log.
        let tmp = TempDir::new().expect("failed to create temp dir");
        let store = AggregateStore::builder(tmp.path())
            .idle_timeout(Duration::from_millis(200))
            .open()
            .await
            .expect("builder open should succeed");

        let handle = store
            .get::<Counter>("c-1")
            .await
            .expect("get should succeed");
        handle
            .execute(CounterCommand::Increment, CommandContext::default())
            .await
            .expect("increment should succeed");

        // Sleep for twice the idle timeout so eviction has definitely fired.
        tokio::time::sleep(Duration::from_millis(400)).await;
        assert!(
            !handle.is_alive(),
            "actor should be dead after idle timeout"
        );

        // A fresh `get` respawns the actor; replaying the single persisted
        // increment must restore value == 1.
        let handle2 = store
            .get::<Counter>("c-1")
            .await
            .expect("get after eviction should succeed");
        let state = handle2.state().await.expect("state should succeed");
        assert_eq!(state.value, 1, "state should reflect persisted events");
    }
1565
    #[tokio::test]
    async fn rapid_commands_keep_actor_alive() {
        // Commands arriving faster than the idle timeout must keep resetting
        // the idle clock, so the actor is never evicted mid-burst.
        let tmp = TempDir::new().expect("failed to create temp dir");
        let store = AggregateStore::builder(tmp.path())
            .idle_timeout(Duration::from_millis(300))
            .open()
            .await
            .expect("builder open should succeed");

        let handle = store
            .get::<Counter>("c-1")
            .await
            .expect("get should succeed");

        // Five commands spaced 100ms apart — each gap is well under the
        // 300ms idle timeout configured above.
        let ctx = CommandContext::default();
        for _ in 0..5 {
            handle
                .execute(CounterCommand::Increment, ctx.clone())
                .await
                .expect("execute should succeed");
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        assert!(
            handle.is_alive(),
            "actor should remain alive during activity"
        );
        let state = handle.state().await.expect("state should succeed");
        assert_eq!(state.value, 5);
    }
1596
1597 fn incremented_event() -> eventfold::Event {
1601 eventfold::Event::new("Incremented", serde_json::Value::Null)
1602 }
1603
1604 #[tokio::test]
1605 async fn inject_event_appends_to_stream() {
1606 let tmp = TempDir::new().expect("failed to create temp dir");
1607 let store = AggregateStore::open(tmp.path())
1608 .await
1609 .expect("open should succeed");
1610
1611 store
1612 .inject_event::<Counter>("c-1", incremented_event(), InjectOptions::default())
1613 .await
1614 .expect("inject_event should succeed");
1615
1616 let jsonl_path = tmp.path().join("streams/counter/c-1/app.jsonl");
1618 let contents = std::fs::read_to_string(&jsonl_path).expect("app.jsonl should exist");
1619 assert_eq!(
1620 contents.lines().count(),
1621 1,
1622 "should have exactly one event line"
1623 );
1624 }
1625
1626 #[tokio::test]
1627 async fn inject_event_projections_reflect_event() {
1628 let tmp = TempDir::new().expect("failed to create temp dir");
1629 let store = AggregateStore::builder(tmp.path())
1630 .projection::<EventCounter>()
1631 .open()
1632 .await
1633 .expect("builder open should succeed");
1634
1635 store
1636 .inject_event::<Counter>("c-1", incremented_event(), InjectOptions::default())
1637 .await
1638 .expect("inject_event should succeed");
1639
1640 let counter = store
1641 .projection::<EventCounter>()
1642 .expect("projection query should succeed");
1643 assert_eq!(counter.count, 1);
1644 }
1645
1646 #[tokio::test]
1647 async fn inject_event_dedup_by_id() {
1648 let tmp = TempDir::new().expect("failed to create temp dir");
1649 let store = AggregateStore::open(tmp.path())
1650 .await
1651 .expect("open should succeed");
1652
1653 let event = incremented_event().with_id("ev-1".to_string());
1654
1655 store
1657 .inject_event::<Counter>("c-1", event.clone(), InjectOptions::default())
1658 .await
1659 .expect("first inject should succeed");
1660
1661 store
1663 .inject_event::<Counter>("c-1", event, InjectOptions::default())
1664 .await
1665 .expect("second inject should succeed (no-op)");
1666
1667 let jsonl_path = tmp.path().join("streams/counter/c-1/app.jsonl");
1669 let contents = std::fs::read_to_string(&jsonl_path).expect("app.jsonl should exist");
1670 assert_eq!(
1671 contents.lines().count(),
1672 1,
1673 "dedup should prevent second write"
1674 );
1675 }
1676
1677 #[tokio::test]
1678 async fn inject_event_no_dedup_for_none_id() {
1679 let tmp = TempDir::new().expect("failed to create temp dir");
1680 let store = AggregateStore::open(tmp.path())
1681 .await
1682 .expect("open should succeed");
1683
1684 let event = incremented_event();
1686 assert!(event.id.is_none(), "precondition: id is None");
1687
1688 store
1689 .inject_event::<Counter>("c-1", event.clone(), InjectOptions::default())
1690 .await
1691 .expect("first inject should succeed");
1692
1693 store
1694 .inject_event::<Counter>("c-1", event, InjectOptions::default())
1695 .await
1696 .expect("second inject should succeed");
1697
1698 let jsonl_path = tmp.path().join("streams/counter/c-1/app.jsonl");
1699 let contents = std::fs::read_to_string(&jsonl_path).expect("app.jsonl should exist");
1700 assert_eq!(contents.lines().count(), 2, "both events should be written");
1701 }
1702
1703 #[tokio::test]
1704 async fn inject_options_default_does_not_run_process_managers() {
1705 let opts = InjectOptions::default();
1706 assert!(!opts.run_process_managers);
1707 }
1708
1709 #[tokio::test]
1710 async fn inject_event_with_process_managers() {
1711 let tmp = TempDir::new().expect("failed to create temp dir");
1712 let store = AggregateStore::builder(tmp.path())
1713 .process_manager::<ForwardSaga>()
1714 .aggregate_type::<Receiver>()
1715 .open()
1716 .await
1717 .expect("builder open should succeed");
1718
1719 store
1720 .inject_event::<Counter>(
1721 "c-1",
1722 incremented_event(),
1723 InjectOptions {
1724 run_process_managers: true,
1725 },
1726 )
1727 .await
1728 .expect("inject_event should succeed");
1729
1730 let receiver_handle = store
1732 .get::<Receiver>("c-1")
1733 .await
1734 .expect("get receiver should succeed");
1735 let receiver_state = receiver_handle
1736 .state()
1737 .await
1738 .expect("receiver state should succeed");
1739 assert_eq!(
1740 receiver_state.received_count, 1,
1741 "process manager should have dispatched"
1742 );
1743 }
1744
1745 #[tokio::test]
1746 async fn inject_event_with_live_actor() {
1747 let tmp = TempDir::new().expect("failed to create temp dir");
1748 let store = AggregateStore::open(tmp.path())
1749 .await
1750 .expect("open should succeed");
1751
1752 let handle = store
1754 .get::<Counter>("c-1")
1755 .await
1756 .expect("get should succeed");
1757 assert!(handle.is_alive(), "actor should be alive");
1758
1759 store
1761 .inject_event::<Counter>("c-1", incremented_event(), InjectOptions::default())
1762 .await
1763 .expect("inject_event with live actor should succeed");
1764
1765 let state = handle.state().await.expect("state should succeed");
1767 assert_eq!(state.value, 1, "actor should see the injected event");
1768 }
1769
1770 #[tokio::test]
1771 async fn inject_event_creates_new_stream() {
1772 let tmp = TempDir::new().expect("failed to create temp dir");
1773 let store = AggregateStore::open(tmp.path())
1774 .await
1775 .expect("open should succeed");
1776
1777 store
1779 .inject_event::<Counter>(
1780 "new-instance",
1781 incremented_event(),
1782 InjectOptions::default(),
1783 )
1784 .await
1785 .expect("inject_event should create stream");
1786
1787 let stream_dir = tmp.path().join("streams/counter/new-instance");
1789 assert!(stream_dir.is_dir(), "stream directory should exist");
1790
1791 let handle = store
1793 .get::<Counter>("new-instance")
1794 .await
1795 .expect("get should succeed after inject");
1796 let state = handle.state().await.expect("state should succeed");
1797 assert_eq!(state.value, 1, "actor should replay the injected event");
1798 }
1799
    // Minimal second aggregate type used to exercise multi-type stream
    // listing below; `on` flips on every applied `Toggled` event.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    struct Toggle {
        pub on: bool,
    }
1807
    // Sole domain event for `Toggle`, serialized with an adjacently tagged
    // representation ("type" / "data").
    #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(tag = "type", content = "data")]
    enum ToggleEvent {
        Toggled,
    }
1813
    // Uninhabited error type: handling a toggle command can never fail.
    #[derive(Debug, thiserror::Error)]
    enum ToggleError {}
1816
1817 impl Aggregate for Toggle {
1818 const AGGREGATE_TYPE: &'static str = "toggle";
1819 type Command = ();
1820 type DomainEvent = ToggleEvent;
1821 type Error = ToggleError;
1822
1823 fn handle(&self, _cmd: ()) -> Result<Vec<ToggleEvent>, ToggleError> {
1824 Ok(vec![ToggleEvent::Toggled])
1825 }
1826
1827 fn apply(mut self, _event: &ToggleEvent) -> Self {
1828 self.on = !self.on;
1829 self
1830 }
1831 }
1832
1833 async fn toggle(store: &AggregateStore, id: &str) {
1835 let handle = store
1836 .get::<Toggle>(id)
1837 .await
1838 .expect("get toggle should succeed");
1839 handle
1840 .execute((), CommandContext::default())
1841 .await
1842 .expect("toggle should succeed");
1843 }
1844
1845 #[tokio::test]
1846 async fn list_streams_none_returns_all_sorted() {
1847 let tmp = TempDir::new().expect("failed to create temp dir");
1848 let store = AggregateStore::open(tmp.path())
1849 .await
1850 .expect("open should succeed");
1851
1852 increment(&store, "c-1").await;
1854 increment(&store, "c-2").await;
1855 toggle(&store, "t-1").await;
1856
1857 let pairs = store
1858 .list_streams(None)
1859 .await
1860 .expect("list_streams(None) should succeed");
1861
1862 assert_eq!(
1863 pairs,
1864 vec![
1865 ("counter".to_owned(), "c-1".to_owned()),
1866 ("counter".to_owned(), "c-2".to_owned()),
1867 ("toggle".to_owned(), "t-1".to_owned()),
1868 ]
1869 );
1870 }
1871
1872 #[tokio::test]
1873 async fn list_streams_some_filters_by_type() {
1874 let tmp = TempDir::new().expect("failed to create temp dir");
1875 let store = AggregateStore::open(tmp.path())
1876 .await
1877 .expect("open should succeed");
1878
1879 increment(&store, "c-1").await;
1880 increment(&store, "c-2").await;
1881 toggle(&store, "t-1").await;
1882
1883 let pairs = store
1884 .list_streams(Some("counter"))
1885 .await
1886 .expect("list_streams(Some) should succeed");
1887
1888 assert_eq!(
1889 pairs,
1890 vec![
1891 ("counter".to_owned(), "c-1".to_owned()),
1892 ("counter".to_owned(), "c-2".to_owned()),
1893 ]
1894 );
1895 }
1896
1897 #[tokio::test]
1898 async fn list_streams_none_empty_store() {
1899 let tmp = TempDir::new().expect("failed to create temp dir");
1900 let store = AggregateStore::open(tmp.path())
1901 .await
1902 .expect("open should succeed");
1903
1904 let pairs = store
1905 .list_streams(None)
1906 .await
1907 .expect("list_streams(None) on empty store should succeed");
1908
1909 assert!(pairs.is_empty());
1910 }
1911
1912 #[tokio::test]
1913 async fn list_streams_some_nonexistent_type() {
1914 let tmp = TempDir::new().expect("failed to create temp dir");
1915 let store = AggregateStore::open(tmp.path())
1916 .await
1917 .expect("open should succeed");
1918
1919 let pairs = store
1920 .list_streams(Some("nonexistent"))
1921 .await
1922 .expect("list_streams(Some(nonexistent)) should succeed");
1923
1924 assert!(pairs.is_empty());
1925 }
1926
1927 #[tokio::test]
1928 async fn read_events_returns_all_events() {
1929 let tmp = TempDir::new().expect("failed to create temp dir");
1930 let store = AggregateStore::open(tmp.path())
1931 .await
1932 .expect("open should succeed");
1933
1934 increment(&store, "c-1").await;
1935 increment(&store, "c-1").await;
1936
1937 let events = store
1938 .read_events("counter", "c-1")
1939 .await
1940 .expect("read_events should succeed");
1941
1942 assert_eq!(events.len(), 2);
1943 assert_eq!(events[0].event_type, "Incremented");
1944 assert_eq!(events[1].event_type, "Incremented");
1945 }
1946
1947 #[tokio::test]
1948 async fn read_events_empty_stream_returns_empty_vec() {
1949 let tmp = TempDir::new().expect("failed to create temp dir");
1950 let store = AggregateStore::open(tmp.path())
1951 .await
1952 .expect("open should succeed");
1953
1954 let _handle = store
1956 .get::<Counter>("c-1")
1957 .await
1958 .expect("get should succeed");
1959
1960 let events = store
1964 .read_events("counter", "c-1")
1965 .await
1966 .expect("read_events on empty stream should succeed");
1967
1968 assert!(events.is_empty());
1969 }
1970
1971 #[tokio::test]
1972 async fn read_events_nonexistent_stream_returns_not_found() {
1973 let tmp = TempDir::new().expect("failed to create temp dir");
1974 let store = AggregateStore::open(tmp.path())
1975 .await
1976 .expect("open should succeed");
1977
1978 let result = store.read_events("nonexistent", "x").await;
1979
1980 assert!(result.is_err());
1981 assert_eq!(result.unwrap_err().kind(), io::ErrorKind::NotFound);
1982 }
1983}