Skip to main content

eventfold_es/
store.rs

1//! Top-level entry point that composes storage layout, actor spawning, and
2//! handle caching into a single `AggregateStore` type.
3
4use std::any::Any;
5use std::collections::{HashMap, HashSet};
6use std::io;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use std::time::Duration;
10
11use tokio::sync::RwLock;
12
13use crate::actor::{ActorConfig, AggregateHandle, spawn_actor_with_config};
14use crate::aggregate::Aggregate;
15use crate::process_manager::{
16    AggregateDispatcher, ProcessManagerCatchUp, ProcessManagerReport, ProcessManagerRunner,
17    TypedDispatcher, append_dead_letter,
18};
19use crate::projection::{Projection, ProjectionRunner};
20use crate::storage::StreamLayout;
21
/// Type-erased handle cache keyed by `(aggregate_type, instance_id)`.
///
/// `Box<dyn Any + Send + Sync>` lets a single map hold `AggregateHandle<A>`
/// for any concrete `A`. Downcasting recovers the typed handle.
type HandleCache = HashMap<(String, String), Box<dyn Any + Send + Sync>>;

/// Type-erased projection map keyed by projection name.
///
/// Each value is a `std::sync::Mutex<ProjectionRunner<P>>` erased to `dyn Any`.
/// We use `std::sync::Mutex` (not `tokio::sync::Mutex`) because the lock is
/// held briefly and `catch_up` does blocking I/O that should not hold an async
/// mutex across an `.await` point.
type ProjectionMap = HashMap<String, Box<dyn Any + Send + Sync>>;

/// Type-erased list of process manager runners.
///
/// Each runner is wrapped in `std::sync::Mutex` because `catch_up` does
/// blocking file I/O that must not hold an async mutex.
type ProcessManagerList = Vec<std::sync::Mutex<Box<dyn ProcessManagerCatchUp>>>;

/// Type-erased dispatcher map keyed by aggregate type name.
///
/// Consulted by `run_process_managers` to route each `CommandEnvelope`
/// to the aggregate type it targets.
type DispatcherMap = HashMap<String, Box<dyn AggregateDispatcher>>;

/// Type-erased catch-up list for projections.
///
/// Mirrors the `ProcessManagerList` pattern: each entry wraps a
/// `ProjectionRunner<P>` behind a `std::sync::Mutex` so that
/// [`inject_event`](AggregateStore::inject_event) can trigger catch-up on
/// all projections without knowing the concrete `P` type.
type ProjectionCatchUpList = Vec<std::sync::Mutex<Box<dyn ProjectionCatchUpFn>>>;
52
53/// Type-erased projection catch-up interface.
54///
55/// Implemented by [`ProjectionRunner<P>`](crate::projection::ProjectionRunner)
56/// wrapper so that the store can catch up all projections without knowing
57/// the concrete `P` type parameter.
58trait ProjectionCatchUpFn: Send + Sync {
59    /// Catch up on all subscribed streams and save the checkpoint.
60    fn catch_up(&mut self) -> io::Result<()>;
61}
62
63/// Wrapper that implements [`ProjectionCatchUpFn`] by delegating to a
64/// shared `Arc<Mutex<ProjectionRunner<P>>>`. This allows the type-erased
65/// catch-up list and the typed projection map to share the same runner.
66struct SharedProjectionCatchUp<P: Projection> {
67    inner: Arc<std::sync::Mutex<ProjectionRunner<P>>>,
68}
69
70impl<P: Projection> ProjectionCatchUpFn for SharedProjectionCatchUp<P> {
71    fn catch_up(&mut self) -> io::Result<()> {
72        let mut runner = self
73            .inner
74            .lock()
75            .map_err(|e| io::Error::other(e.to_string()))?;
76        runner.catch_up()
77    }
78}
79
80/// Default idle timeout for actors: 5 minutes.
81const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(300);
82
83/// Options controlling the behaviour of [`AggregateStore::inject_event`].
84///
85/// # Examples
86///
87/// ```
88/// use eventfold_es::InjectOptions;
89///
90/// // Default: do not run process managers after injection.
91/// let opts = InjectOptions::default();
92/// assert!(!opts.run_process_managers);
93///
94/// // Opt in to process manager triggering.
95/// let opts = InjectOptions { run_process_managers: true };
96/// assert!(opts.run_process_managers);
97/// ```
98#[derive(Debug, Clone, Default)]
99pub struct InjectOptions {
100    /// When `true`, call [`AggregateStore::run_process_managers`] after
101    /// appending the event. Defaults to `false`.
102    pub run_process_managers: bool,
103}
104
105/// Central registry that manages aggregate instance lifecycles.
106///
107/// The store handles directory creation, actor spawning, and handle caching.
108/// It is `Clone + Send + Sync` -- cloning shares the underlying cache.
109///
110/// # Examples
111///
112/// ```no_run
113/// use eventfold_es::{AggregateStore, CommandContext};
114///
115/// # async fn example() -> std::io::Result<()> {
116/// let store = AggregateStore::open("/tmp/my-app").await?;
117/// // Use store.get::<MyAggregate>("instance-id") to get handles
118/// # Ok(())
119/// # }
120/// ```
121#[derive(Clone)]
122pub struct AggregateStore {
123    layout: StreamLayout,
124    cache: Arc<RwLock<HandleCache>>,
125    projections: Arc<std::sync::RwLock<ProjectionMap>>,
126    /// Type-erased projection catch-up runners for [`inject_event`].
127    projection_catch_ups: Arc<std::sync::RwLock<ProjectionCatchUpList>>,
128    process_managers: Arc<std::sync::RwLock<ProcessManagerList>>,
129    dispatchers: Arc<DispatcherMap>,
130    /// In-memory set of event IDs already injected, for deduplication.
131    /// Shared across clones via `Arc`.
132    seen_ids: Arc<std::sync::Mutex<HashSet<String>>>,
133    idle_timeout: Duration,
134}
135
136// Manual `Debug` because `dyn Any` is not `Debug` and we don't want to
137// expose cache internals.
138impl std::fmt::Debug for AggregateStore {
139    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140        f.debug_struct("AggregateStore")
141            .field("base_dir", &self.layout.base_dir())
142            .finish()
143    }
144}
145
146impl AggregateStore {
147    /// Open or create a store rooted at `base_dir`.
148    ///
149    /// Creates the metadata directory if it does not exist.
150    ///
151    /// # Arguments
152    ///
153    /// * `base_dir` - Root directory for all event store data.
154    ///
155    /// # Returns
156    ///
157    /// A new `AggregateStore` ready to spawn aggregate actors.
158    ///
159    /// # Errors
160    ///
161    /// Returns `std::io::Error` if the metadata directory cannot be created.
162    pub async fn open(base_dir: impl AsRef<Path>) -> io::Result<Self> {
163        let layout = StreamLayout::new(base_dir.as_ref());
164        // Create meta dir using blocking I/O wrapped in spawn_blocking
165        // to avoid blocking the tokio reactor.
166        let meta_dir = layout.meta_dir();
167        tokio::task::spawn_blocking(move || std::fs::create_dir_all(meta_dir))
168            .await
169            .map_err(io::Error::other)??;
170        Ok(Self {
171            layout,
172            cache: Arc::new(RwLock::new(HashMap::new())),
173            projections: Arc::new(std::sync::RwLock::new(HashMap::new())),
174            projection_catch_ups: Arc::new(std::sync::RwLock::new(Vec::new())),
175            process_managers: Arc::new(std::sync::RwLock::new(Vec::new())),
176            dispatchers: Arc::new(HashMap::new()),
177            seen_ids: Arc::new(std::sync::Mutex::new(HashSet::new())),
178            idle_timeout: DEFAULT_IDLE_TIMEOUT,
179        })
180    }
181
    /// Get a handle to an aggregate instance, spawning its actor if needed.
    ///
    /// If the actor is already running (cached), returns a clone of the
    /// existing handle. Otherwise, creates the stream directory on disk and
    /// spawns a new actor thread.
    ///
    /// # Arguments
    ///
    /// * `id` - Unique instance identifier within the aggregate type.
    ///
    /// # Returns
    ///
    /// An [`AggregateHandle`] for sending commands and reading state.
    ///
    /// # Errors
    ///
    /// Returns `std::io::Error` if directory creation or event log opening fails.
    pub async fn get<A: Aggregate>(&self, id: &str) -> io::Result<AggregateHandle<A>> {
        let key = (A::AGGREGATE_TYPE.to_owned(), id.to_owned());

        // Fast path: check cache with read lock.
        {
            let cache = self.cache.read().await;
            if let Some(boxed) = cache.get(&key)
                && let Some(handle) = boxed.downcast_ref::<AggregateHandle<A>>()
                && handle.is_alive()
            {
                return Ok(handle.clone());
            }
        }

        // If we get here, either the handle is missing or the actor has
        // exited (e.g. idle timeout). Evict any stale entry and re-spawn.
        {
            let mut cache = self.cache.write().await;
            cache.remove(&key);
        }

        // NOTE(review): no lock is held between the eviction above and the
        // insert below, so two concurrent callers can both reach the spawn
        // path for the same key and the later insert wins. Presumably the
        // actor's exclusive ownership of the stream makes the second spawn
        // fail or be harmless -- confirm against `spawn_actor_with_config`.

        // Slow path: create stream directory and spawn actor.
        // `ensure_stream` does blocking filesystem work, hence spawn_blocking.
        let layout = self.layout.clone();
        let agg_type = A::AGGREGATE_TYPE.to_owned();
        let inst_id = id.to_owned();
        let stream_dir =
            tokio::task::spawn_blocking(move || layout.ensure_stream(&agg_type, &inst_id))
                .await
                .map_err(io::Error::other)??;

        tracing::debug!(
            aggregate_type = A::AGGREGATE_TYPE,
            instance_id = %id,
            "spawning actor"
        );

        // Every actor inherits the store-wide idle timeout.
        let config = ActorConfig {
            idle_timeout: self.idle_timeout,
        };
        let handle = spawn_actor_with_config::<A>(&stream_dir, config)?;

        let mut cache = self.cache.write().await;
        cache.insert(key, Box::new(handle.clone()));
        Ok(handle)
    }
244
245    /// List all instance IDs for a given aggregate type.
246    ///
247    /// # Returns
248    ///
249    /// A sorted `Vec<String>` of instance IDs. Returns an empty vector
250    /// if no instances of the given aggregate type exist.
251    ///
252    /// # Errors
253    ///
254    /// Returns `std::io::Error` if reading the directory fails.
255    pub async fn list<A: Aggregate>(&self) -> io::Result<Vec<String>> {
256        let layout = self.layout.clone();
257        let agg_type = A::AGGREGATE_TYPE.to_owned();
258        tokio::task::spawn_blocking(move || layout.list_streams(&agg_type))
259            .await
260            .map_err(io::Error::other)?
261    }
262
263    /// Create a builder for configuring projections and other options.
264    ///
265    /// # Arguments
266    ///
267    /// * `base_dir` - Root directory for all event store data.
268    ///
269    /// # Returns
270    ///
271    /// An [`AggregateStoreBuilder`] that can register projections before opening.
272    ///
273    /// # Examples
274    ///
275    /// ```no_run
276    /// use eventfold_es::AggregateStore;
277    ///
278    /// # async fn example() -> std::io::Result<()> {
279    /// let store = AggregateStore::builder("/tmp/my-app")
280    ///     // .projection::<MyProjection>()
281    ///     .open()
282    ///     .await?;
283    /// # Ok(())
284    /// # }
285    /// ```
286    pub fn builder(base_dir: impl AsRef<Path>) -> AggregateStoreBuilder {
287        AggregateStoreBuilder {
288            base_dir: base_dir.as_ref().to_owned(),
289            projection_factories: Vec::new(),
290            process_manager_factories: Vec::new(),
291            dispatcher_factories: Vec::new(),
292            idle_timeout: DEFAULT_IDLE_TIMEOUT,
293        }
294    }
295
296    /// Catch up and return the current state of a registered projection.
297    ///
298    /// Triggers a lazy catch-up before returning: reads any new events
299    /// from subscribed streams. Returns a clone of the projection state.
300    ///
301    /// This method is synchronous (not async). It uses `std::sync` locks
302    /// and blocking I/O internally. For embedded use, `catch_up` is fast
303    /// for incremental updates. Callers that need an async boundary can
304    /// wrap this in `tokio::task::spawn_blocking`.
305    ///
306    /// # Returns
307    ///
308    /// A clone of the projection's current state after catching up.
309    ///
310    /// # Errors
311    ///
312    /// Returns `io::ErrorKind::NotFound` if the projection is not registered.
313    /// Returns `io::Error` if catching up on events fails.
314    pub fn projection<P: Projection>(&self) -> io::Result<P> {
315        let projections = self
316            .projections
317            .read()
318            .map_err(|e| io::Error::other(e.to_string()))?;
319        let runner_any = projections.get(P::NAME).ok_or_else(|| {
320            io::Error::new(
321                io::ErrorKind::NotFound,
322                format!("projection '{}' not registered", P::NAME),
323            )
324        })?;
325        // Downcast the type-erased `Box<dyn Any>` back to the concrete
326        // `Arc<Mutex<ProjectionRunner<P>>>`. This is safe because
327        // `projection()` on the builder registered the runner under the
328        // same `P::NAME` key.
329        let runner_arc = runner_any
330            .downcast_ref::<Arc<std::sync::Mutex<ProjectionRunner<P>>>>()
331            .ok_or_else(|| io::Error::other("projection type mismatch"))?;
332        let mut runner = runner_arc
333            .lock()
334            .map_err(|e| io::Error::other(e.to_string()))?;
335        runner.catch_up()?;
336        Ok(runner.state().clone())
337    }
338
339    /// Delete the checkpoint for projection `P` and replay all events from scratch.
340    ///
341    /// This acquires the projections read-lock, downcasts to the concrete
342    /// `ProjectionRunner<P>`, and calls `rebuild()` which:
343    /// 1. Deletes the checkpoint file
344    /// 2. Resets internal state to `ProjectionCheckpoint::default()`
345    /// 3. Calls `catch_up()` to replay all events from offset 0
346    /// 4. Saves the new checkpoint
347    ///
348    /// **Blocking I/O** -- if called from an async context,
349    /// wrap this in `tokio::task::spawn_blocking`.
350    ///
351    /// # Errors
352    ///
353    /// Returns `io::ErrorKind::NotFound` if the projection is not registered.
354    /// Returns `io::Error` if deleting the checkpoint or catching up fails.
355    pub fn rebuild_projection<P: Projection>(&self) -> io::Result<()> {
356        let projections = self
357            .projections
358            .read()
359            .map_err(|e| io::Error::other(e.to_string()))?;
360        let runner_any = projections.get(P::NAME).ok_or_else(|| {
361            io::Error::new(
362                io::ErrorKind::NotFound,
363                format!("projection '{}' not registered", P::NAME),
364            )
365        })?;
366        let runner_arc = runner_any
367            .downcast_ref::<Arc<std::sync::Mutex<ProjectionRunner<P>>>>()
368            .ok_or_else(|| io::Error::other("projection type mismatch"))?;
369        let mut runner = runner_arc
370            .lock()
371            .map_err(|e| io::Error::other(e.to_string()))?;
372        runner.rebuild()
373    }
374
    /// Run all registered process managers through a catch-up pass.
    ///
    /// For each process manager:
    /// 1. Catch up on subscribed streams, collecting command envelopes.
    /// 2. Dispatch each envelope to the target aggregate via the type registry.
    /// 3. Write failed dispatches to the per-PM dead-letter log.
    /// 4. Save the process manager checkpoint after all envelopes are handled.
    ///
    /// # Returns
    ///
    /// A [`ProcessManagerReport`] summarizing how many envelopes were
    /// dispatched and how many were dead-lettered.
    ///
    /// # Errors
    ///
    /// Returns `io::Error` if catching up or saving checkpoints fails.
    pub async fn run_process_managers(&self) -> io::Result<ProcessManagerReport> {
        // Collect envelopes and dead-letter paths from each PM under the
        // std::sync::Mutex. The lock is held only for catch_up (blocking I/O).
        // It is released before any `.await` below, so no std lock is ever
        // held across an await point.
        let mut all_work: Vec<(Vec<crate::command::CommandEnvelope>, std::path::PathBuf)> =
            Vec::new();

        {
            let pms = self
                .process_managers
                .read()
                .map_err(|e| io::Error::other(e.to_string()))?;
            for pm_mutex in pms.iter() {
                let mut pm = pm_mutex
                    .lock()
                    .map_err(|e| io::Error::other(e.to_string()))?;
                let envelopes = pm.catch_up()?;
                let dead_letter_path = pm.dead_letter_path();
                all_work.push((envelopes, dead_letter_path));
            }
        }

        // Dispatch envelopes asynchronously.
        // NOTE(review): a `?` failure while appending to a dead-letter log
        // returns before checkpoints are saved, so already-dispatched
        // envelopes would be re-delivered on the next run. Presumably
        // dispatch is expected to be at-least-once / idempotent -- confirm.
        let mut report = ProcessManagerReport::default();
        for (envelopes, dead_letter_path) in &all_work {
            for envelope in envelopes {
                let agg_type = &envelope.aggregate_type;
                match self.dispatchers.get(agg_type) {
                    // A dispatcher is registered for this aggregate type:
                    // route the envelope; on failure, dead-letter it.
                    Some(dispatcher) => match dispatcher.dispatch(self, envelope.clone()).await {
                        Ok(()) => {
                            tracing::info!(
                                target_type = %agg_type,
                                target_id = %envelope.instance_id,
                                "dispatching command"
                            );
                            report.dispatched += 1;
                        }
                        Err(e) => {
                            tracing::error!(
                                aggregate_type = %agg_type,
                                instance_id = %envelope.instance_id,
                                error = %e,
                                "process manager dispatch failed, dead-lettering"
                            );
                            append_dead_letter(dead_letter_path, envelope.clone(), &e.to_string())?;
                            report.dead_lettered += 1;
                        }
                    },
                    // No dispatcher registered for the target type: the
                    // envelope can never be delivered, so dead-letter it.
                    None => {
                        let err_msg = format!("unknown aggregate type: {agg_type}");
                        tracing::error!(
                            aggregate_type = %agg_type,
                            "no dispatcher registered, dead-lettering"
                        );
                        append_dead_letter(dead_letter_path, envelope.clone(), &err_msg)?;
                        report.dead_lettered += 1;
                    }
                }
            }
        }

        // Save all PM checkpoints after dispatch is complete.
        {
            let pms = self
                .process_managers
                .read()
                .map_err(|e| io::Error::other(e.to_string()))?;
            for pm_mutex in pms.iter() {
                let pm = pm_mutex
                    .lock()
                    .map_err(|e| io::Error::other(e.to_string()))?;
                pm.save()?;
            }
        }

        Ok(report)
    }
467
    /// Returns a reference to the underlying storage layout.
    ///
    /// Useful for callers that need direct access to stream paths
    /// (e.g. tooling or tests).
    pub fn layout(&self) -> &StreamLayout {
        &self.layout
    }
472
473    /// List all known `(aggregate_type, instance_id)` pairs.
474    ///
475    /// When `aggregate_type` is `Some`, returns only streams for that type.
476    /// When `None`, returns streams across all aggregate types. Results are
477    /// sorted by aggregate type then instance ID.
478    ///
479    /// # Arguments
480    ///
481    /// * `aggregate_type` - Optional filter. When `Some`, only streams for
482    ///   that aggregate type are returned. When `None`, all streams are
483    ///   returned.
484    ///
485    /// # Returns
486    ///
487    /// A sorted `Vec<(String, String)>` of `(aggregate_type, instance_id)`
488    /// pairs. Returns an empty vector if no matching streams exist.
489    ///
490    /// # Errors
491    ///
492    /// Returns `std::io::Error` if reading the directory fails.
493    pub async fn list_streams(
494        &self,
495        aggregate_type: Option<&str>,
496    ) -> io::Result<Vec<(String, String)>> {
497        let layout = self.layout.clone();
498        match aggregate_type {
499            Some(agg_type) => {
500                let agg_type = agg_type.to_owned();
501                tokio::task::spawn_blocking(move || {
502                    let ids = layout.list_streams(&agg_type)?;
503                    Ok(ids.into_iter().map(|id| (agg_type.clone(), id)).collect())
504                })
505                .await
506                .map_err(io::Error::other)?
507            }
508            None => tokio::task::spawn_blocking(move || {
509                let types = layout.list_aggregate_types()?;
510                let mut pairs = Vec::new();
511                for agg_type in types {
512                    let ids = layout.list_streams(&agg_type)?;
513                    pairs.extend(ids.into_iter().map(|id| (agg_type.clone(), id)));
514                }
515                Ok(pairs)
516            })
517            .await
518            .map_err(io::Error::other)?,
519        }
520    }
521
522    /// Read all raw events from a stream identified by aggregate type and
523    /// instance ID.
524    ///
525    /// Returns the events in the order they were appended. Does not spawn
526    /// an actor or acquire a write lock on the stream.
527    ///
528    /// # Arguments
529    ///
530    /// * `aggregate_type` - The aggregate type name (e.g. `"counter"`).
531    /// * `instance_id` - The unique instance identifier within that type.
532    ///
533    /// # Returns
534    ///
535    /// A `Vec<eventfold::Event>` containing all events in the stream.
536    /// Returns `Ok(vec![])` if the stream directory exists but no events
537    /// have been written yet.
538    ///
539    /// # Errors
540    ///
541    /// Returns `std::io::Error` with `ErrorKind::NotFound` if the stream
542    /// directory does not exist (i.e. the stream was never created).
543    /// Returns `std::io::Error` for other I/O failures during reading.
544    pub async fn read_events(
545        &self,
546        aggregate_type: &str,
547        instance_id: &str,
548    ) -> io::Result<Vec<eventfold::Event>> {
549        let layout = self.layout.clone();
550        let agg_type = aggregate_type.to_owned();
551        let inst_id = instance_id.to_owned();
552        tokio::task::spawn_blocking(move || {
553            let stream_dir = layout.stream_dir(&agg_type, &inst_id);
554
555            // If the stream directory itself does not exist, return NotFound.
556            if !stream_dir.is_dir() {
557                return Err(io::Error::new(
558                    io::ErrorKind::NotFound,
559                    format!("stream directory not found: {}", stream_dir.display()),
560                ));
561            }
562
563            let reader = eventfold::EventReader::new(&stream_dir);
564            // read_from(0) returns NotFound when app.jsonl doesn't exist yet
565            // (stream dir created but no events written). Map that to empty vec.
566            let iter = match reader.read_from(0) {
567                Ok(iter) => iter,
568                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
569                Err(e) => return Err(e),
570            };
571
572            let mut events = Vec::new();
573            for result in iter {
574                let (event, _next_offset, _line_hash) = result?;
575                events.push(event);
576            }
577            Ok(events)
578        })
579        .await
580        .map_err(io::Error::other)?
581    }
582
    /// Append a pre-validated event directly to a stream, bypassing command
    /// validation.
    ///
    /// This is the primary entry point for relay-sync scenarios where events
    /// have already been validated on the originating client. The event is
    /// written as-is to the stream's JSONL log, projections are caught up,
    /// and process managers are optionally triggered.
    ///
    /// # Deduplication
    ///
    /// If `event.id` is `Some(id)` and that ID has already been seen by this
    /// store instance, the call returns `Ok(())` immediately without writing.
    /// Events with `event.id = None` are never deduplicated.
    ///
    /// # Actor interaction
    ///
    /// If a live actor exists for the target stream, the event is injected
    /// through the actor's channel (preserving the actor's exclusive writer
    /// ownership). Otherwise, a temporary `EventWriter` is opened directly.
    ///
    /// # Arguments
    ///
    /// * `instance_id` - Unique instance identifier within the aggregate type.
    /// * `event` - A pre-validated `eventfold::Event` to append as-is.
    /// * `opts` - Controls whether process managers run after injection.
    ///
    /// # Returns
    ///
    /// `Ok(())` on success (including dedup no-ops).
    ///
    /// # Errors
    ///
    /// Returns `std::io::Error` if directory creation, event writing, or
    /// projection catch-up fails.
    pub async fn inject_event<A: Aggregate>(
        &self,
        instance_id: &str,
        event: eventfold::Event,
        opts: InjectOptions,
    ) -> io::Result<()> {
        // 1. Dedup check: if the event has an ID already seen, no-op.
        // NOTE(review): the ID is only recorded in step 4, after the append
        // succeeds, so two concurrent injections of the same ID can both
        // pass this check and both write. Presumably acceptable as a
        // best-effort, in-memory dedup -- confirm the intended guarantee.
        let event_id = event.id.clone();
        if let Some(ref id) = event_id {
            let seen = self
                .seen_ids
                .lock()
                .map_err(|e| io::Error::other(e.to_string()))?;
            if seen.contains(id) {
                return Ok(());
            }
        }

        // 2. Ensure stream directory exists (blocking filesystem work).
        let layout = self.layout.clone();
        let agg_type = A::AGGREGATE_TYPE.to_owned();
        let inst_id = instance_id.to_owned();
        let stream_dir =
            tokio::task::spawn_blocking(move || layout.ensure_stream(&agg_type, &inst_id))
                .await
                .map_err(io::Error::other)??;

        // 3. Append the event: route through the actor if one is alive,
        //    otherwise open a temporary writer directly.
        let key = (A::AGGREGATE_TYPE.to_owned(), instance_id.to_owned());
        let injected_via_actor = {
            let cache = self.cache.read().await;
            if let Some(boxed) = cache.get(&key)
                && let Some(handle) = boxed.downcast_ref::<AggregateHandle<A>>()
                && handle.is_alive()
            {
                handle.inject_via_actor(event.clone()).await?;
                true
            } else {
                false
            }
        };

        if !injected_via_actor {
            // No live actor: write directly with a short-lived EventWriter.
            let ev = event;
            tokio::task::spawn_blocking(move || {
                let mut writer = eventfold::EventWriter::open(&stream_dir)?;
                writer.append(&ev).map(|_| ())
            })
            .await
            .map_err(io::Error::other)??;
        }

        // 4. Register event ID in seen_ids after successful append.
        if let Some(id) = event_id {
            let mut seen = self
                .seen_ids
                .lock()
                .map_err(|e| io::Error::other(e.to_string()))?;
            seen.insert(id);
        }

        // 5. Catch up all registered projections via the type-erased list.
        //    std locks only; released before the optional await in step 6.
        {
            let catch_ups = self
                .projection_catch_ups
                .read()
                .map_err(|e| io::Error::other(e.to_string()))?;
            for catch_up_mutex in catch_ups.iter() {
                let mut catch_up = catch_up_mutex
                    .lock()
                    .map_err(|e| io::Error::other(e.to_string()))?;
                catch_up.catch_up()?;
            }
        }

        // 6. Optionally trigger process managers.
        if opts.run_process_managers {
            self.run_process_managers().await?;
        }

        Ok(())
    }
700}
701
/// Factory function type for creating a type-erased projection runner.
///
/// Each closure captures the concrete `P: Projection` type, creates a
/// `ProjectionRunner<P>`, and returns both:
/// - A `Box<dyn Any + Send + Sync>` (the `Arc<Mutex<ProjectionRunner<P>>>`) for
///   the typed projection map (used by `projection::<P>()`).
/// - A `Mutex<Box<dyn ProjectionCatchUpFn>>` for the type-erased catch-up list
///   (used by `inject_event`).
type ProjectionFactory = Box<
    dyn FnOnce(
        StreamLayout,
    ) -> io::Result<(
        Box<dyn Any + Send + Sync>,
        std::sync::Mutex<Box<dyn ProjectionCatchUpFn>>,
    )>,
>;

/// Factory function type for creating a type-erased process manager runner.
///
/// Captures a concrete `PM: ProcessManager` and defers construction until
/// the builder's `open` runs with the final `StreamLayout`.
type ProcessManagerFactory =
    Box<dyn FnOnce(StreamLayout) -> io::Result<std::sync::Mutex<Box<dyn ProcessManagerCatchUp>>>>;

/// Factory function type for creating a type-erased aggregate dispatcher.
type DispatcherFactory = Box<dyn FnOnce() -> Box<dyn AggregateDispatcher>>;
725
/// Builder for configuring an [`AggregateStore`] with projections and
/// process managers.
///
/// Created via [`AggregateStore::builder`]. Register projections with
/// [`projection`](AggregateStoreBuilder::projection), process managers with
/// [`process_manager`](AggregateStoreBuilder::process_manager), and dispatch
/// targets with [`aggregate_type`](AggregateStoreBuilder::aggregate_type),
/// then call [`open`](AggregateStoreBuilder::open) to finalize.
///
/// # Examples
///
/// ```no_run
/// use eventfold_es::AggregateStore;
///
/// # async fn example() -> std::io::Result<()> {
/// let store = AggregateStore::builder("/tmp/my-app")
///     // .projection::<MyProjection>()
///     // .process_manager::<MySaga>()
///     // .aggregate_type::<MyAggregate>()
///     .open()
///     .await?;
/// # Ok(())
/// # }
/// ```
pub struct AggregateStoreBuilder {
    /// Root directory the opened store will use.
    base_dir: PathBuf,
    /// Deferred projection constructors, keyed by `P::NAME`.
    projection_factories: Vec<(String, ProjectionFactory)>,
    /// Deferred process manager constructors, keyed by `PM::NAME`.
    process_manager_factories: Vec<(String, ProcessManagerFactory)>,
    /// Deferred dispatcher constructors, keyed by aggregate type name.
    dispatcher_factories: Vec<(String, DispatcherFactory)>,
    /// Idle timeout the opened store will apply to spawned actors.
    idle_timeout: Duration,
}
757
758impl AggregateStoreBuilder {
759    /// Register a projection type to be managed by this store.
760    ///
761    /// The projection will be initialized (loading any existing checkpoint)
762    /// when [`open`](AggregateStoreBuilder::open) is called.
763    ///
764    /// # Type Parameters
765    ///
766    /// * `P` - A type implementing [`Projection`].
767    ///
768    /// # Returns
769    ///
770    /// `self` for method chaining.
771    pub fn projection<P: Projection>(mut self) -> Self {
772        self.projection_factories.push((
773            P::NAME.to_owned(),
774            Box::new(|layout| {
775                let runner = ProjectionRunner::<P>::new(layout)?;
776                let shared = Arc::new(std::sync::Mutex::new(runner));
777                // Store the Arc in the typed projection map for downcasting
778                // by `AggregateStore::projection::<P>()`.
779                let any_box: Box<dyn Any + Send + Sync> = Box::new(shared.clone());
780                // Create a type-erased catch-up wrapper sharing the same runner.
781                let catch_up: std::sync::Mutex<Box<dyn ProjectionCatchUpFn>> =
782                    std::sync::Mutex::new(Box::new(SharedProjectionCatchUp { inner: shared }));
783                Ok((any_box, catch_up))
784            }),
785        ));
786        self
787    }
788
789    /// Register a process manager type to be managed by this store.
790    ///
791    /// The process manager will be initialized (loading any existing
792    /// checkpoint) when [`open`](AggregateStoreBuilder::open) is called.
793    /// Use [`run_process_managers`](AggregateStore::run_process_managers)
794    /// to trigger catch-up and dispatch.
795    ///
796    /// # Type Parameters
797    ///
798    /// * `PM` - A type implementing [`ProcessManager`](crate::process_manager::ProcessManager).
799    ///
800    /// # Returns
801    ///
802    /// `self` for method chaining.
803    pub fn process_manager<PM>(mut self) -> Self
804    where
805        PM: crate::process_manager::ProcessManager,
806    {
807        self.process_manager_factories.push((
808            PM::NAME.to_owned(),
809            Box::new(|layout| {
810                let runner = ProcessManagerRunner::<PM>::new(layout)?;
811                Ok(std::sync::Mutex::new(
812                    Box::new(runner) as Box<dyn ProcessManagerCatchUp>
813                ))
814            }),
815        ));
816        self
817    }
818
819    /// Register an aggregate type as a dispatch target for process managers.
820    ///
821    /// This allows [`CommandEnvelope`](crate::command::CommandEnvelope)s
822    /// targeting this aggregate type to be deserialized and routed. The
823    /// aggregate's `Command` type must implement `DeserializeOwned`.
824    ///
825    /// # Type Parameters
826    ///
827    /// * `A` - A type implementing [`Aggregate`] with `Command: DeserializeOwned`.
828    ///
829    /// # Returns
830    ///
831    /// `self` for method chaining.
832    /// Set the idle timeout for actor eviction.
833    ///
834    /// Actors that receive no messages for this duration will shut down,
835    /// releasing their file lock. The next [`get`](AggregateStore::get) call
836    /// transparently re-spawns the actor and recovers state from disk.
837    ///
838    /// Defaults to 5 minutes. Pass `Duration::from_secs(u64::MAX / 2)` to
839    /// effectively disable idle eviction.
840    ///
841    /// # Arguments
842    ///
843    /// * `timeout` - How long an idle actor waits before shutting down.
844    ///
845    /// # Returns
846    ///
847    /// `self` for method chaining.
848    pub fn idle_timeout(mut self, timeout: Duration) -> Self {
849        self.idle_timeout = timeout;
850        self
851    }
852
853    /// Register an aggregate type as a dispatch target for process managers.
854    ///
855    /// This allows [`CommandEnvelope`](crate::command::CommandEnvelope)s
856    /// targeting this aggregate type to be deserialized and routed. The
857    /// aggregate's `Command` type must implement `DeserializeOwned`.
858    ///
859    /// # Type Parameters
860    ///
861    /// * `A` - A type implementing [`Aggregate`] with `Command: DeserializeOwned`.
862    ///
863    /// # Returns
864    ///
865    /// `self` for method chaining.
866    pub fn aggregate_type<A>(mut self) -> Self
867    where
868        A: Aggregate,
869        A::Command: serde::de::DeserializeOwned,
870    {
871        self.dispatcher_factories.push((
872            A::AGGREGATE_TYPE.to_owned(),
873            Box::new(|| Box::new(TypedDispatcher::<A>::new()) as Box<dyn AggregateDispatcher>),
874        ));
875        self
876    }
877
878    /// Build and open the store, initializing all registered projections
879    /// and process managers.
880    ///
881    /// Creates the metadata directory on disk and instantiates each
882    /// projection runner and process manager runner (loading persisted
883    /// checkpoints if available).
884    ///
885    /// # Returns
886    ///
887    /// A fully initialized [`AggregateStore`].
888    ///
889    /// # Errors
890    ///
891    /// Returns `io::Error` if directory creation or initialization fails.
892    pub async fn open(self) -> io::Result<AggregateStore> {
893        let layout = StreamLayout::new(&self.base_dir);
894        let meta_dir = layout.meta_dir();
895        tokio::task::spawn_blocking(move || std::fs::create_dir_all(meta_dir))
896            .await
897            .map_err(io::Error::other)??;
898
899        let mut projections = HashMap::new();
900        let mut projection_catch_ups: ProjectionCatchUpList = Vec::new();
901        for (name, factory) in self.projection_factories {
902            let (any_runner, catch_up) = factory(layout.clone())?;
903            projections.insert(name, any_runner);
904            projection_catch_ups.push(catch_up);
905        }
906
907        let mut process_managers = Vec::new();
908        for (_name, factory) in self.process_manager_factories {
909            let runner = factory(layout.clone())?;
910            process_managers.push(runner);
911        }
912
913        let mut dispatchers: HashMap<String, Box<dyn AggregateDispatcher>> = HashMap::new();
914        for (name, factory) in self.dispatcher_factories {
915            dispatchers.insert(name, factory());
916        }
917
918        Ok(AggregateStore {
919            layout,
920            cache: Arc::new(RwLock::new(HashMap::new())),
921            projections: Arc::new(std::sync::RwLock::new(projections)),
922            projection_catch_ups: Arc::new(std::sync::RwLock::new(projection_catch_ups)),
923            process_managers: Arc::new(std::sync::RwLock::new(process_managers)),
924            dispatchers: Arc::new(dispatchers),
925            seen_ids: Arc::new(std::sync::Mutex::new(HashSet::new())),
926            idle_timeout: self.idle_timeout,
927        })
928    }
929}
930
931#[cfg(test)]
932mod tests {
933    use std::time::Duration;
934
935    use tempfile::TempDir;
936
937    use super::*;
938    use crate::aggregate::test_fixtures::{Counter, CounterCommand};
939    use crate::command::CommandContext;
940
941    #[tokio::test]
942    async fn full_roundtrip() {
943        let tmp = TempDir::new().expect("failed to create temp dir");
944        let store = AggregateStore::open(tmp.path())
945            .await
946            .expect("open should succeed");
947
948        let handle = store
949            .get::<Counter>("c-1")
950            .await
951            .expect("get should succeed");
952
953        let ctx = CommandContext::default();
954        handle
955            .execute(CounterCommand::Increment, ctx.clone())
956            .await
957            .expect("first increment should succeed");
958        handle
959            .execute(CounterCommand::Increment, ctx)
960            .await
961            .expect("second increment should succeed");
962
963        let state = handle.state().await.expect("state should succeed");
964        assert_eq!(state.value, 2);
965    }
966
967    #[tokio::test]
968    async fn list_empty_initially() {
969        let tmp = TempDir::new().expect("failed to create temp dir");
970        let store = AggregateStore::open(tmp.path())
971            .await
972            .expect("open should succeed");
973
974        let ids = store.list::<Counter>().await.expect("list should succeed");
975        assert!(ids.is_empty());
976    }
977
978    #[tokio::test]
979    async fn list_after_commands() {
980        let tmp = TempDir::new().expect("failed to create temp dir");
981        let store = AggregateStore::open(tmp.path())
982            .await
983            .expect("open should succeed");
984
985        let ctx = CommandContext::default();
986
987        let h1 = store
988            .get::<Counter>("c-1")
989            .await
990            .expect("get c-1 should succeed");
991        h1.execute(CounterCommand::Increment, ctx.clone())
992            .await
993            .expect("c-1 increment should succeed");
994
995        let h2 = store
996            .get::<Counter>("c-2")
997            .await
998            .expect("get c-2 should succeed");
999        h2.execute(CounterCommand::Increment, ctx)
1000            .await
1001            .expect("c-2 increment should succeed");
1002
1003        let mut ids = store.list::<Counter>().await.expect("list should succeed");
1004        ids.sort();
1005        assert_eq!(ids, vec!["c-1", "c-2"]);
1006    }
1007
1008    #[tokio::test]
1009    async fn same_id_returns_shared_handle() {
1010        let tmp = TempDir::new().expect("failed to create temp dir");
1011        let store = AggregateStore::open(tmp.path())
1012            .await
1013            .expect("open should succeed");
1014
1015        let h1 = store
1016            .get::<Counter>("c-1")
1017            .await
1018            .expect("first get should succeed");
1019        let h2 = store
1020            .get::<Counter>("c-1")
1021            .await
1022            .expect("second get should succeed");
1023
1024        h1.execute(CounterCommand::Increment, CommandContext::default())
1025            .await
1026            .expect("increment via h1 should succeed");
1027
1028        let state = h2.state().await.expect("state via h2 should succeed");
1029        assert_eq!(state.value, 1);
1030    }
1031
1032    #[tokio::test]
1033    async fn state_survives_store_reopen() {
1034        let tmp = TempDir::new().expect("failed to create temp dir");
1035
1036        // First store: increment 3 times, then drop everything.
1037        {
1038            let store = AggregateStore::open(tmp.path())
1039                .await
1040                .expect("open should succeed");
1041            let handle = store
1042                .get::<Counter>("c-1")
1043                .await
1044                .expect("get should succeed");
1045            let ctx = CommandContext::default();
1046            for _ in 0..3 {
1047                handle
1048                    .execute(CounterCommand::Increment, ctx.clone())
1049                    .await
1050                    .expect("increment should succeed");
1051            }
1052        }
1053
1054        // Brief sleep so the actor thread exits and releases the flock.
1055        tokio::time::sleep(Duration::from_millis(50)).await;
1056
1057        // Second store on the same directory should recover persisted state.
1058        let store = AggregateStore::open(tmp.path())
1059            .await
1060            .expect("reopen should succeed");
1061        let handle = store
1062            .get::<Counter>("c-1")
1063            .await
1064            .expect("get after reopen should succeed");
1065        let state = handle.state().await.expect("state should succeed");
1066        assert_eq!(state.value, 3);
1067    }
1068
    #[tokio::test]
    async fn two_aggregate_types_coexist() {
        // A minimal second aggregate type for testing type coexistence.
        #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
        struct Toggle {
            pub on: bool,
        }

        #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
        #[serde(tag = "type", content = "data")]
        enum ToggleEvent {
            Toggled,
        }

        // No failure modes: `handle` always succeeds.
        #[derive(Debug, thiserror::Error)]
        enum ToggleError {}

        impl Aggregate for Toggle {
            const AGGREGATE_TYPE: &'static str = "toggle";
            type Command = ();
            type DomainEvent = ToggleEvent;
            type Error = ToggleError;

            // Every command unconditionally emits a single Toggled event.
            fn handle(&self, _cmd: ()) -> Result<Vec<ToggleEvent>, ToggleError> {
                Ok(vec![ToggleEvent::Toggled])
            }

            // Each Toggled event flips the boolean state.
            fn apply(mut self, _event: &ToggleEvent) -> Self {
                self.on = !self.on;
                self
            }
        }

        let tmp = TempDir::new().expect("failed to create temp dir");
        let store = AggregateStore::open(tmp.path())
            .await
            .expect("open should succeed");

        // Interact with the Counter aggregate.
        let counter_handle = store
            .get::<Counter>("c-1")
            .await
            .expect("get counter should succeed");
        counter_handle
            .execute(CounterCommand::Increment, CommandContext::default())
            .await
            .expect("counter increment should succeed");

        // Interact with the Toggle aggregate.
        let toggle_handle = store
            .get::<Toggle>("t-1")
            .await
            .expect("get toggle should succeed");
        toggle_handle
            .execute((), CommandContext::default())
            .await
            .expect("toggle should succeed");

        // Verify states.
        let counter_state = counter_handle
            .state()
            .await
            .expect("counter state should succeed");
        assert_eq!(counter_state.value, 1);

        let toggle_state = toggle_handle
            .state()
            .await
            .expect("toggle state should succeed");
        assert!(toggle_state.on);

        // Verify listing is type-scoped.
        let counter_ids = store
            .list::<Counter>()
            .await
            .expect("list counters should succeed");
        assert_eq!(counter_ids, vec!["c-1"]);

        let toggle_ids = store
            .list::<Toggle>()
            .await
            .expect("list toggles should succeed");
        assert_eq!(toggle_ids, vec!["t-1"]);
    }
1153
1154    // --- AggregateStoreBuilder + projection integration tests ---
1155
1156    use crate::projection::test_fixtures::EventCounter;
1157
1158    /// Helper: execute a single `Increment` command on the given instance.
1159    async fn increment(store: &AggregateStore, id: &str) {
1160        let handle = store.get::<Counter>(id).await.expect("get should succeed");
1161        handle
1162            .execute(CounterCommand::Increment, CommandContext::default())
1163            .await
1164            .expect("increment should succeed");
1165    }
1166
1167    #[tokio::test]
1168    async fn builder_with_projection_roundtrip() {
1169        let tmp = TempDir::new().expect("failed to create temp dir");
1170        let store = AggregateStore::builder(tmp.path())
1171            .projection::<EventCounter>()
1172            .open()
1173            .await
1174            .expect("builder open should succeed");
1175
1176        let handle = store
1177            .get::<Counter>("c-1")
1178            .await
1179            .expect("get should succeed");
1180        let ctx = CommandContext::default();
1181        for _ in 0..3 {
1182            handle
1183                .execute(CounterCommand::Increment, ctx.clone())
1184                .await
1185                .expect("increment should succeed");
1186        }
1187
1188        let counter = store
1189            .projection::<EventCounter>()
1190            .expect("projection query should succeed");
1191        assert_eq!(counter.count, 3);
1192    }
1193
1194    #[tokio::test]
1195    async fn projection_sees_multiple_instances() {
1196        let tmp = TempDir::new().expect("failed to create temp dir");
1197        let store = AggregateStore::builder(tmp.path())
1198            .projection::<EventCounter>()
1199            .open()
1200            .await
1201            .expect("builder open should succeed");
1202
1203        increment(&store, "c-1").await;
1204        increment(&store, "c-2").await;
1205
1206        let counter = store
1207            .projection::<EventCounter>()
1208            .expect("projection query should succeed");
1209        assert_eq!(counter.count, 2);
1210    }
1211
1212    #[tokio::test]
1213    async fn projection_persists_across_restart() {
1214        let tmp = TempDir::new().expect("failed to create temp dir");
1215
1216        // First store: execute events and query the projection (triggers save).
1217        {
1218            let store = AggregateStore::builder(tmp.path())
1219                .projection::<EventCounter>()
1220                .open()
1221                .await
1222                .expect("builder open should succeed");
1223
1224            increment(&store, "c-1").await;
1225            increment(&store, "c-1").await;
1226            increment(&store, "c-2").await;
1227
1228            let counter = store
1229                .projection::<EventCounter>()
1230                .expect("projection query should succeed");
1231            assert_eq!(counter.count, 3);
1232        }
1233
1234        // Brief sleep so actor threads exit and release flocks.
1235        tokio::time::sleep(Duration::from_millis(50)).await;
1236
1237        // Second store on the same directory: projection should restore
1238        // from checkpoint without replaying events.
1239        let store = AggregateStore::builder(tmp.path())
1240            .projection::<EventCounter>()
1241            .open()
1242            .await
1243            .expect("reopen should succeed");
1244
1245        let counter = store
1246            .projection::<EventCounter>()
1247            .expect("projection query after reopen should succeed");
1248        assert_eq!(counter.count, 3);
1249    }
1250
1251    #[tokio::test]
1252    async fn projection_without_registration_returns_error() {
1253        let tmp = TempDir::new().expect("failed to create temp dir");
1254        let store = AggregateStore::open(tmp.path())
1255            .await
1256            .expect("open should succeed");
1257
1258        let result = store.projection::<EventCounter>();
1259        assert!(result.is_err());
1260        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::NotFound);
1261    }
1262
1263    #[tokio::test]
1264    async fn open_convenience_still_works() {
1265        let tmp = TempDir::new().expect("failed to create temp dir");
1266        let store = AggregateStore::open(tmp.path())
1267            .await
1268            .expect("open should succeed");
1269
1270        let handle = store
1271            .get::<Counter>("c-1")
1272            .await
1273            .expect("get should succeed");
1274        handle
1275            .execute(CounterCommand::Increment, CommandContext::default())
1276            .await
1277            .expect("increment should succeed");
1278
1279        let state = handle.state().await.expect("state should succeed");
1280        assert_eq!(state.value, 1);
1281    }
1282
1283    // --- Process manager integration tests ---
1284    //
1285    // These tests use a "Receiver" aggregate that accepts deserializable
1286    // commands, plus a "ForwardSaga" process manager that reacts to Counter
1287    // events and dispatches commands to the Receiver.
1288
    /// A minimal aggregate that accepts JSON-deserializable commands.
    /// Used as a dispatch target for process manager tests.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    struct Receiver {
        // Number of `Accepted` events applied so far.
        pub received_count: u64,
    }

    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    #[serde(tag = "type", content = "data")]
    enum ReceiverCommand {
        Accept,
    }

    #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(tag = "type", content = "data")]
    enum ReceiverEvent {
        Accepted,
    }

    // No failure modes: `handle` always succeeds.
    #[derive(Debug, thiserror::Error)]
    enum ReceiverError {}

    impl Aggregate for Receiver {
        const AGGREGATE_TYPE: &'static str = "receiver";
        type Command = ReceiverCommand;
        type DomainEvent = ReceiverEvent;
        type Error = ReceiverError;

        // Every command unconditionally produces a single `Accepted` event.
        fn handle(&self, _cmd: ReceiverCommand) -> Result<Vec<ReceiverEvent>, ReceiverError> {
            Ok(vec![ReceiverEvent::Accepted])
        }

        // Each `Accepted` event bumps the counter.
        fn apply(mut self, _event: &ReceiverEvent) -> Self {
            self.received_count += 1;
            self
        }
    }
1326
    /// A process manager that reacts to counter events by dispatching
    /// `Accept` commands to the Receiver aggregate.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    struct ForwardSaga {
        // Number of events this saga has reacted to.
        pub forwarded: u64,
    }

    impl crate::process_manager::ProcessManager for ForwardSaga {
        const NAME: &'static str = "forward-saga";

        // Subscribe only to the Counter aggregate's stream type.
        fn subscriptions(&self) -> &'static [&'static str] {
            &["counter"]
        }

        // For every counter event, emit one `Accept` command targeting the
        // Receiver instance with the same stream id.
        fn react(
            &mut self,
            _aggregate_type: &str,
            stream_id: &str,
            _event: &eventfold::Event,
        ) -> Vec<crate::command::CommandEnvelope> {
            self.forwarded += 1;
            vec![crate::command::CommandEnvelope {
                aggregate_type: "receiver".to_string(),
                instance_id: stream_id.to_string(),
                command: serde_json::json!({"type": "Accept"}),
                context: CommandContext::default(),
            }]
        }
    }
1356
1357    #[tokio::test]
1358    async fn end_to_end_process_manager_dispatch() {
1359        let tmp = TempDir::new().expect("failed to create temp dir");
1360        let store = AggregateStore::builder(tmp.path())
1361            .process_manager::<ForwardSaga>()
1362            .aggregate_type::<Receiver>()
1363            .open()
1364            .await
1365            .expect("builder open should succeed");
1366
1367        // Produce events in the Counter aggregate.
1368        increment(&store, "c-1").await;
1369        increment(&store, "c-1").await;
1370
1371        // Run process managers: should catch up, dispatch to Receiver.
1372        let report = store
1373            .run_process_managers()
1374            .await
1375            .expect("run_process_managers should succeed");
1376
1377        assert_eq!(report.dispatched, 2);
1378        assert_eq!(report.dead_lettered, 0);
1379
1380        // Verify the Receiver aggregate received the dispatched commands.
1381        let receiver_handle = store
1382            .get::<Receiver>("c-1")
1383            .await
1384            .expect("get receiver should succeed");
1385        let receiver_state = receiver_handle
1386            .state()
1387            .await
1388            .expect("receiver state should succeed");
1389        assert_eq!(receiver_state.received_count, 2);
1390    }
1391
1392    #[tokio::test]
1393    async fn process_manager_dead_letters_unknown_type() {
1394        /// A process manager that emits commands to a non-existent aggregate.
1395        #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
1396        struct BadTargetSaga {
1397            seen: u64,
1398        }
1399
1400        impl crate::process_manager::ProcessManager for BadTargetSaga {
1401            const NAME: &'static str = "bad-target-saga";
1402
1403            fn subscriptions(&self) -> &'static [&'static str] {
1404                &["counter"]
1405            }
1406
1407            fn react(
1408                &mut self,
1409                _aggregate_type: &str,
1410                _stream_id: &str,
1411                _event: &eventfold::Event,
1412            ) -> Vec<crate::command::CommandEnvelope> {
1413                self.seen += 1;
1414                vec![crate::command::CommandEnvelope {
1415                    aggregate_type: "nonexistent".to_string(),
1416                    instance_id: "x".to_string(),
1417                    command: serde_json::json!({}),
1418                    context: CommandContext::default(),
1419                }]
1420            }
1421        }
1422
1423        let tmp = TempDir::new().expect("failed to create temp dir");
1424        let store = AggregateStore::builder(tmp.path())
1425            .process_manager::<BadTargetSaga>()
1426            .open()
1427            .await
1428            .expect("builder open should succeed");
1429
1430        increment(&store, "c-1").await;
1431
1432        let report = store
1433            .run_process_managers()
1434            .await
1435            .expect("run_process_managers should succeed");
1436
1437        assert_eq!(report.dispatched, 0);
1438        assert_eq!(report.dead_lettered, 1);
1439
1440        // Verify dead-letter file is readable JSONL.
1441        let dl_path = tmp
1442            .path()
1443            .join("process_managers/bad-target-saga/dead_letters.jsonl");
1444        let contents = std::fs::read_to_string(&dl_path).expect("dead-letter file should exist");
1445        let entry: serde_json::Value =
1446            serde_json::from_str(contents.trim()).expect("dead-letter entry should be valid JSON");
1447        assert!(
1448            entry["error"]
1449                .as_str()
1450                .expect("error field should be a string")
1451                .contains("nonexistent")
1452        );
1453    }
1454
1455    #[tokio::test]
1456    async fn run_process_managers_idempotent() {
1457        let tmp = TempDir::new().expect("failed to create temp dir");
1458        let store = AggregateStore::builder(tmp.path())
1459            .process_manager::<ForwardSaga>()
1460            .aggregate_type::<Receiver>()
1461            .open()
1462            .await
1463            .expect("builder open should succeed");
1464
1465        increment(&store, "c-1").await;
1466
1467        // First run: dispatches 1 envelope.
1468        let first = store
1469            .run_process_managers()
1470            .await
1471            .expect("first run should succeed");
1472        assert_eq!(first.dispatched, 1);
1473
1474        // Second run with no new events: should be a no-op.
1475        let second = store
1476            .run_process_managers()
1477            .await
1478            .expect("second run should succeed");
1479        assert_eq!(second.dispatched, 0);
1480        assert_eq!(second.dead_lettered, 0);
1481    }
1482
1483    #[tokio::test]
1484    async fn process_manager_recovers_after_restart() {
1485        let tmp = TempDir::new().expect("failed to create temp dir");
1486
1487        // First store: process 2 events.
1488        {
1489            let store = AggregateStore::builder(tmp.path())
1490                .process_manager::<ForwardSaga>()
1491                .aggregate_type::<Receiver>()
1492                .open()
1493                .await
1494                .expect("builder open should succeed");
1495
1496            increment(&store, "c-1").await;
1497            increment(&store, "c-2").await;
1498
1499            let report = store
1500                .run_process_managers()
1501                .await
1502                .expect("run should succeed");
1503            assert_eq!(report.dispatched, 2);
1504        }
1505
1506        // Brief sleep so actor threads exit.
1507        tokio::time::sleep(Duration::from_millis(50)).await;
1508
1509        // Second store: add 1 more event, run PM again.
1510        let store = AggregateStore::builder(tmp.path())
1511            .process_manager::<ForwardSaga>()
1512            .aggregate_type::<Receiver>()
1513            .open()
1514            .await
1515            .expect("reopen should succeed");
1516
1517        increment(&store, "c-1").await;
1518
1519        let report = store
1520            .run_process_managers()
1521            .await
1522            .expect("run after restart should succeed");
1523
1524        // Should only dispatch the 1 new event, not replay old ones.
1525        assert_eq!(report.dispatched, 1);
1526        assert_eq!(report.dead_lettered, 0);
1527    }
1528
1529    // --- Idle eviction tests ---
1530
1531    #[tokio::test]
1532    async fn idle_actor_evicted_and_respawned() {
1533        let tmp = TempDir::new().expect("failed to create temp dir");
1534        let store = AggregateStore::builder(tmp.path())
1535            .idle_timeout(Duration::from_millis(200))
1536            .open()
1537            .await
1538            .expect("builder open should succeed");
1539
1540        // Execute a command.
1541        let handle = store
1542            .get::<Counter>("c-1")
1543            .await
1544            .expect("get should succeed");
1545        handle
1546            .execute(CounterCommand::Increment, CommandContext::default())
1547            .await
1548            .expect("increment should succeed");
1549
1550        // Wait for the actor to idle out.
1551        tokio::time::sleep(Duration::from_millis(400)).await;
1552        assert!(
1553            !handle.is_alive(),
1554            "actor should be dead after idle timeout"
1555        );
1556
1557        // A new `get` should transparently re-spawn.
1558        let handle2 = store
1559            .get::<Counter>("c-1")
1560            .await
1561            .expect("get after eviction should succeed");
1562        let state = handle2.state().await.expect("state should succeed");
1563        assert_eq!(state.value, 1, "state should reflect persisted events");
1564    }
1565
1566    #[tokio::test]
1567    async fn rapid_commands_keep_actor_alive() {
1568        let tmp = TempDir::new().expect("failed to create temp dir");
1569        let store = AggregateStore::builder(tmp.path())
1570            .idle_timeout(Duration::from_millis(300))
1571            .open()
1572            .await
1573            .expect("builder open should succeed");
1574
1575        let handle = store
1576            .get::<Counter>("c-1")
1577            .await
1578            .expect("get should succeed");
1579
1580        let ctx = CommandContext::default();
1581        for _ in 0..5 {
1582            handle
1583                .execute(CounterCommand::Increment, ctx.clone())
1584                .await
1585                .expect("execute should succeed");
1586            tokio::time::sleep(Duration::from_millis(100)).await;
1587        }
1588
1589        assert!(
1590            handle.is_alive(),
1591            "actor should remain alive during activity"
1592        );
1593        let state = handle.state().await.expect("state should succeed");
1594        assert_eq!(state.value, 5);
1595    }
1596
1597    // --- inject_event tests ---
1598
1599    /// Helper: build an event matching Counter's "Incremented" variant.
1600    fn incremented_event() -> eventfold::Event {
1601        eventfold::Event::new("Incremented", serde_json::Value::Null)
1602    }
1603
1604    #[tokio::test]
1605    async fn inject_event_appends_to_stream() {
1606        let tmp = TempDir::new().expect("failed to create temp dir");
1607        let store = AggregateStore::open(tmp.path())
1608            .await
1609            .expect("open should succeed");
1610
1611        store
1612            .inject_event::<Counter>("c-1", incremented_event(), InjectOptions::default())
1613            .await
1614            .expect("inject_event should succeed");
1615
1616        // Verify the event appears in the JSONL file.
1617        let jsonl_path = tmp.path().join("streams/counter/c-1/app.jsonl");
1618        let contents = std::fs::read_to_string(&jsonl_path).expect("app.jsonl should exist");
1619        assert_eq!(
1620            contents.lines().count(),
1621            1,
1622            "should have exactly one event line"
1623        );
1624    }
1625
1626    #[tokio::test]
1627    async fn inject_event_projections_reflect_event() {
1628        let tmp = TempDir::new().expect("failed to create temp dir");
1629        let store = AggregateStore::builder(tmp.path())
1630            .projection::<EventCounter>()
1631            .open()
1632            .await
1633            .expect("builder open should succeed");
1634
1635        store
1636            .inject_event::<Counter>("c-1", incremented_event(), InjectOptions::default())
1637            .await
1638            .expect("inject_event should succeed");
1639
1640        let counter = store
1641            .projection::<EventCounter>()
1642            .expect("projection query should succeed");
1643        assert_eq!(counter.count, 1);
1644    }
1645
1646    #[tokio::test]
1647    async fn inject_event_dedup_by_id() {
1648        let tmp = TempDir::new().expect("failed to create temp dir");
1649        let store = AggregateStore::open(tmp.path())
1650            .await
1651            .expect("open should succeed");
1652
1653        let event = incremented_event().with_id("ev-1".to_string());
1654
1655        // First injection should succeed and write.
1656        store
1657            .inject_event::<Counter>("c-1", event.clone(), InjectOptions::default())
1658            .await
1659            .expect("first inject should succeed");
1660
1661        // Second injection with the same ID should be a no-op.
1662        store
1663            .inject_event::<Counter>("c-1", event, InjectOptions::default())
1664            .await
1665            .expect("second inject should succeed (no-op)");
1666
1667        // Verify only one event in the JSONL.
1668        let jsonl_path = tmp.path().join("streams/counter/c-1/app.jsonl");
1669        let contents = std::fs::read_to_string(&jsonl_path).expect("app.jsonl should exist");
1670        assert_eq!(
1671            contents.lines().count(),
1672            1,
1673            "dedup should prevent second write"
1674        );
1675    }
1676
1677    #[tokio::test]
1678    async fn inject_event_no_dedup_for_none_id() {
1679        let tmp = TempDir::new().expect("failed to create temp dir");
1680        let store = AggregateStore::open(tmp.path())
1681            .await
1682            .expect("open should succeed");
1683
1684        // Events with id=None should never be deduplicated.
1685        let event = incremented_event();
1686        assert!(event.id.is_none(), "precondition: id is None");
1687
1688        store
1689            .inject_event::<Counter>("c-1", event.clone(), InjectOptions::default())
1690            .await
1691            .expect("first inject should succeed");
1692
1693        store
1694            .inject_event::<Counter>("c-1", event, InjectOptions::default())
1695            .await
1696            .expect("second inject should succeed");
1697
1698        let jsonl_path = tmp.path().join("streams/counter/c-1/app.jsonl");
1699        let contents = std::fs::read_to_string(&jsonl_path).expect("app.jsonl should exist");
1700        assert_eq!(contents.lines().count(), 2, "both events should be written");
1701    }
1702
1703    #[tokio::test]
1704    async fn inject_options_default_does_not_run_process_managers() {
1705        let opts = InjectOptions::default();
1706        assert!(!opts.run_process_managers);
1707    }
1708
1709    #[tokio::test]
1710    async fn inject_event_with_process_managers() {
1711        let tmp = TempDir::new().expect("failed to create temp dir");
1712        let store = AggregateStore::builder(tmp.path())
1713            .process_manager::<ForwardSaga>()
1714            .aggregate_type::<Receiver>()
1715            .open()
1716            .await
1717            .expect("builder open should succeed");
1718
1719        store
1720            .inject_event::<Counter>(
1721                "c-1",
1722                incremented_event(),
1723                InjectOptions {
1724                    run_process_managers: true,
1725                },
1726            )
1727            .await
1728            .expect("inject_event should succeed");
1729
1730        // The ForwardSaga should have dispatched a command to Receiver.
1731        let receiver_handle = store
1732            .get::<Receiver>("c-1")
1733            .await
1734            .expect("get receiver should succeed");
1735        let receiver_state = receiver_handle
1736            .state()
1737            .await
1738            .expect("receiver state should succeed");
1739        assert_eq!(
1740            receiver_state.received_count, 1,
1741            "process manager should have dispatched"
1742        );
1743    }
1744
1745    #[tokio::test]
1746    async fn inject_event_with_live_actor() {
1747        let tmp = TempDir::new().expect("failed to create temp dir");
1748        let store = AggregateStore::open(tmp.path())
1749            .await
1750            .expect("open should succeed");
1751
1752        // Spawn an actor for c-1 by getting a handle.
1753        let handle = store
1754            .get::<Counter>("c-1")
1755            .await
1756            .expect("get should succeed");
1757        assert!(handle.is_alive(), "actor should be alive");
1758
1759        // Inject an event -- should route through the live actor.
1760        store
1761            .inject_event::<Counter>("c-1", incremented_event(), InjectOptions::default())
1762            .await
1763            .expect("inject_event with live actor should succeed");
1764
1765        // The actor's view should reflect the injected event.
1766        let state = handle.state().await.expect("state should succeed");
1767        assert_eq!(state.value, 1, "actor should see the injected event");
1768    }
1769
1770    #[tokio::test]
1771    async fn inject_event_creates_new_stream() {
1772        let tmp = TempDir::new().expect("failed to create temp dir");
1773        let store = AggregateStore::open(tmp.path())
1774            .await
1775            .expect("open should succeed");
1776
1777        // Inject into a stream that doesn't exist yet.
1778        store
1779            .inject_event::<Counter>(
1780                "new-instance",
1781                incremented_event(),
1782                InjectOptions::default(),
1783            )
1784            .await
1785            .expect("inject_event should create stream");
1786
1787        // Verify the directory was created.
1788        let stream_dir = tmp.path().join("streams/counter/new-instance");
1789        assert!(stream_dir.is_dir(), "stream directory should exist");
1790
1791        // Verify state via a fresh actor.
1792        let handle = store
1793            .get::<Counter>("new-instance")
1794            .await
1795            .expect("get should succeed after inject");
1796        let state = handle.state().await.expect("state should succeed");
1797        assert_eq!(state.value, 1, "actor should replay the injected event");
1798    }
1799
1800    // --- list_streams / read_events tests ---
1801
1802    // Minimal second aggregate type shared by list_streams tests.
    /// Minimal test aggregate: a single boolean flipped by each event.
    #[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
    struct Toggle {
        // Flipped by every `ToggleEvent::Toggled` applied to this aggregate.
        pub on: bool,
    }
1807
    /// Domain events for the `Toggle` test aggregate.
    ///
    /// Serialized with an adjacently tagged representation
    /// (`{"type": ..., "data": ...}`) to match the store's event layout.
    #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
    #[serde(tag = "type", content = "data")]
    enum ToggleEvent {
        // Flips the aggregate's `on` flag.
        Toggled,
    }
1813
    /// Uninhabited error type: `Toggle` command handling can never fail.
    #[derive(Debug, thiserror::Error)]
    enum ToggleError {}
1816
1817    impl Aggregate for Toggle {
1818        const AGGREGATE_TYPE: &'static str = "toggle";
1819        type Command = ();
1820        type DomainEvent = ToggleEvent;
1821        type Error = ToggleError;
1822
1823        fn handle(&self, _cmd: ()) -> Result<Vec<ToggleEvent>, ToggleError> {
1824            Ok(vec![ToggleEvent::Toggled])
1825        }
1826
1827        fn apply(mut self, _event: &ToggleEvent) -> Self {
1828            self.on = !self.on;
1829            self
1830        }
1831    }
1832
1833    /// Helper: execute a single Toggle command on the given instance.
1834    async fn toggle(store: &AggregateStore, id: &str) {
1835        let handle = store
1836            .get::<Toggle>(id)
1837            .await
1838            .expect("get toggle should succeed");
1839        handle
1840            .execute((), CommandContext::default())
1841            .await
1842            .expect("toggle should succeed");
1843    }
1844
1845    #[tokio::test]
1846    async fn list_streams_none_returns_all_sorted() {
1847        let tmp = TempDir::new().expect("failed to create temp dir");
1848        let store = AggregateStore::open(tmp.path())
1849            .await
1850            .expect("open should succeed");
1851
1852        // Create counter instances c-1, c-2 and toggle instance t-1.
1853        increment(&store, "c-1").await;
1854        increment(&store, "c-2").await;
1855        toggle(&store, "t-1").await;
1856
1857        let pairs = store
1858            .list_streams(None)
1859            .await
1860            .expect("list_streams(None) should succeed");
1861
1862        assert_eq!(
1863            pairs,
1864            vec![
1865                ("counter".to_owned(), "c-1".to_owned()),
1866                ("counter".to_owned(), "c-2".to_owned()),
1867                ("toggle".to_owned(), "t-1".to_owned()),
1868            ]
1869        );
1870    }
1871
1872    #[tokio::test]
1873    async fn list_streams_some_filters_by_type() {
1874        let tmp = TempDir::new().expect("failed to create temp dir");
1875        let store = AggregateStore::open(tmp.path())
1876            .await
1877            .expect("open should succeed");
1878
1879        increment(&store, "c-1").await;
1880        increment(&store, "c-2").await;
1881        toggle(&store, "t-1").await;
1882
1883        let pairs = store
1884            .list_streams(Some("counter"))
1885            .await
1886            .expect("list_streams(Some) should succeed");
1887
1888        assert_eq!(
1889            pairs,
1890            vec![
1891                ("counter".to_owned(), "c-1".to_owned()),
1892                ("counter".to_owned(), "c-2".to_owned()),
1893            ]
1894        );
1895    }
1896
1897    #[tokio::test]
1898    async fn list_streams_none_empty_store() {
1899        let tmp = TempDir::new().expect("failed to create temp dir");
1900        let store = AggregateStore::open(tmp.path())
1901            .await
1902            .expect("open should succeed");
1903
1904        let pairs = store
1905            .list_streams(None)
1906            .await
1907            .expect("list_streams(None) on empty store should succeed");
1908
1909        assert!(pairs.is_empty());
1910    }
1911
1912    #[tokio::test]
1913    async fn list_streams_some_nonexistent_type() {
1914        let tmp = TempDir::new().expect("failed to create temp dir");
1915        let store = AggregateStore::open(tmp.path())
1916            .await
1917            .expect("open should succeed");
1918
1919        let pairs = store
1920            .list_streams(Some("nonexistent"))
1921            .await
1922            .expect("list_streams(Some(nonexistent)) should succeed");
1923
1924        assert!(pairs.is_empty());
1925    }
1926
1927    #[tokio::test]
1928    async fn read_events_returns_all_events() {
1929        let tmp = TempDir::new().expect("failed to create temp dir");
1930        let store = AggregateStore::open(tmp.path())
1931            .await
1932            .expect("open should succeed");
1933
1934        increment(&store, "c-1").await;
1935        increment(&store, "c-1").await;
1936
1937        let events = store
1938            .read_events("counter", "c-1")
1939            .await
1940            .expect("read_events should succeed");
1941
1942        assert_eq!(events.len(), 2);
1943        assert_eq!(events[0].event_type, "Incremented");
1944        assert_eq!(events[1].event_type, "Incremented");
1945    }
1946
1947    #[tokio::test]
1948    async fn read_events_empty_stream_returns_empty_vec() {
1949        let tmp = TempDir::new().expect("failed to create temp dir");
1950        let store = AggregateStore::open(tmp.path())
1951            .await
1952            .expect("open should succeed");
1953
1954        // Create the stream directory without executing any commands.
1955        let _handle = store
1956            .get::<Counter>("c-1")
1957            .await
1958            .expect("get should succeed");
1959
1960        // Drop the handle's actor so the flock is released, then read.
1961        // The stream directory exists but app.jsonl may or may not exist.
1962        // In either case read_events should return Ok(vec![]).
1963        let events = store
1964            .read_events("counter", "c-1")
1965            .await
1966            .expect("read_events on empty stream should succeed");
1967
1968        assert!(events.is_empty());
1969    }
1970
1971    #[tokio::test]
1972    async fn read_events_nonexistent_stream_returns_not_found() {
1973        let tmp = TempDir::new().expect("failed to create temp dir");
1974        let store = AggregateStore::open(tmp.path())
1975            .await
1976            .expect("open should succeed");
1977
1978        let result = store.read_events("nonexistent", "x").await;
1979
1980        assert!(result.is_err());
1981        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::NotFound);
1982    }
1983}