// eventfold_es/actor.rs
//! Actor loop that owns an aggregate and processes commands.
//!
//! The actor runs on a blocking thread and sequentially processes messages
//! from an `mpsc` channel. It exclusively owns the `EventWriter` (and
//! therefore the flock), the `View<A>` for maintaining aggregate state,
//! and the `EventReader` for refreshing the view before each command.
//!
//! Public API: [`AggregateHandle`] (cloneable async handle) and
//! [`spawn_actor`] (factory that opens the log and starts the actor thread).

11use std::io;
12use std::path::Path;
13use std::time::Duration;
14
15use eventfold::{EventReader, EventWriter, View};
16use tokio::sync::{mpsc, oneshot};
17
18use crate::aggregate::{Aggregate, reducer, to_eventfold_event};
19use crate::command::CommandContext;
20use crate::error::{ExecuteError, StateError};
21
/// Maximum number of optimistic concurrency retries before giving up.
///
/// Currently unused because the actor exclusively owns the writer, so
/// conflicts cannot occur. Retained for future `append_if` support.
#[allow(dead_code)] // Kept for the planned `append_if` code path (see above).
const DEFAULT_MAX_RETRIES: u32 = 3;
28
/// Configuration for the actor loop.
///
/// Internal to the crate -- callers configure idle timeout through
/// [`AggregateStoreBuilder::idle_timeout`](crate::AggregateStoreBuilder::idle_timeout).
pub(crate) struct ActorConfig {
    /// How long the actor waits for a message before shutting down.
    /// An effectively infinite value (see [`spawn_actor`], which uses
    /// `u64::MAX / 2` seconds) means the actor never idles out.
    pub idle_timeout: Duration,
}
38
/// Result type sent back through the `Execute` reply channel: the domain
/// events produced by the command on success, or an [`ExecuteError`]
/// wrapping the aggregate's own error type on failure.
type ExecuteResult<A> =
    Result<Vec<<A as Aggregate>::DomainEvent>, ExecuteError<<A as Aggregate>::Error>>;
42
/// Messages sent from `AggregateHandle` to the actor loop.
///
/// Each variant carries a `oneshot::Sender` for the actor to reply on
/// once the operation completes. If the reply receiver has been dropped,
/// the actor discards the result rather than treating it as an error.
pub(crate) enum ActorMessage<A: Aggregate> {
    /// Execute a command against the aggregate.
    Execute {
        /// The domain command to execute.
        cmd: A::Command,
        /// Cross-cutting metadata (actor identity, correlation ID, etc.).
        ctx: CommandContext,
        /// Channel to send back the produced domain events or an error.
        reply: oneshot::Sender<ExecuteResult<A>>,
    },

    /// Retrieve the current aggregate state.
    GetState {
        /// Channel to send back a clone of the current state or an error.
        reply: oneshot::Sender<Result<A, StateError>>,
    },

    /// Inject a pre-validated `eventfold::Event` directly into the stream.
    ///
    /// The actor appends the event via its owned `EventWriter` and sends
    /// the I/O result back on `reply`. This keeps the actor as the sole
    /// writer for any stream that has a live actor.
    Inject {
        /// The pre-validated event to append as-is.
        event: eventfold::Event,
        /// Channel to send back the I/O result.
        reply: oneshot::Sender<io::Result<()>>,
    },

    /// Gracefully shut down the actor loop.
    #[allow(dead_code)] // Constructed only in tests.
    Shutdown,
}
80
81/// Runs the aggregate actor loop on a blocking thread.
82///
83/// Owns the `EventWriter` and aggregate state `View`. Receives messages
84/// from `AggregateHandle` via the mpsc channel and processes them sequentially.
85/// The loop exits when the channel closes (all senders dropped), a
86/// `Shutdown` message is received, or the idle timeout elapses. On exit the
87/// `EventWriter` is dropped, releasing the flock.
88///
89/// # Arguments
90///
91/// * `writer` - Exclusive writer for the aggregate's event stream.
92/// * `view` - Derived view that holds the current aggregate state.
93/// * `reader` - Reader used to refresh the view before each operation.
94/// * `rx` - Receiving end of the mpsc channel carrying `ActorMessage`s.
95/// * `config` - Actor configuration (idle timeout).
96pub(crate) fn run_actor<A: Aggregate>(
97    mut writer: EventWriter,
98    mut view: View<A>,
99    reader: EventReader,
100    mut rx: mpsc::Receiver<ActorMessage<A>>,
101    config: ActorConfig,
102) {
103    // Build a lightweight current-thread runtime with time enabled.
104    // The actor needs `tokio::time::timeout` to implement idle eviction,
105    // but the parent runtime may be current-thread (common in tests),
106    // which doesn't drive timers from non-runtime threads. A dedicated
107    // minimal runtime avoids that constraint and keeps the actor
108    // self-contained.
109    let rt = tokio::runtime::Builder::new_current_thread()
110        .enable_time()
111        .build()
112        .expect("failed to create actor timeout runtime");
113
114    loop {
115        // Create the timeout future INSIDE `block_on` so that the `Sleep`
116        // timer registers with the local runtime's time driver.
117        let idle_timeout = config.idle_timeout;
118        let msg = rt.block_on(async { tokio::time::timeout(idle_timeout, rx.recv()).await });
119
120        match msg {
121            // Received a message before the timeout elapsed.
122            Ok(Some(msg)) => match msg {
123                ActorMessage::Execute { cmd, ctx, reply } => {
124                    let _span = tracing::info_span!("execute", aggregate_type = A::AGGREGATE_TYPE,)
125                        .entered();
126                    let result = execute_command::<A>(&mut writer, &mut view, &reader, cmd, &ctx);
127                    // If the receiver was dropped, the caller no longer cares
128                    // about the result. Silently discard it.
129                    let _ = reply.send(result);
130                }
131
132                ActorMessage::GetState { reply } => {
133                    let result = get_state::<A>(&mut view, &reader);
134                    let _ = reply.send(result);
135                }
136
137                ActorMessage::Inject { event, reply } => {
138                    let result = writer.append(&event).map(|_| ());
139                    let _ = reply.send(result);
140                }
141
142                ActorMessage::Shutdown => break,
143            },
144            // Channel closed: all senders dropped.
145            Ok(None) => break,
146            // Idle timeout elapsed with no messages.
147            Err(_elapsed) => {
148                tracing::info!(
149                    aggregate_type = A::AGGREGATE_TYPE,
150                    "actor idle, shutting down"
151                );
152                break;
153            }
154        }
155    }
156    // Loop exited: either `Shutdown` received, channel closed, or idle timeout.
157    // `writer` is dropped here, releasing the flock on the event log.
158}
159
160/// Execute a single command: refresh state, handle the command, persist events.
161///
162/// Factored out of the match arm for clarity and to keep the actor loop concise.
163fn execute_command<A: Aggregate>(
164    writer: &mut EventWriter,
165    view: &mut View<A>,
166    reader: &EventReader,
167    cmd: A::Command,
168    ctx: &CommandContext,
169) -> Result<Vec<A::DomainEvent>, ExecuteError<A::Error>> {
170    // 1. Refresh the view to incorporate any events not yet folded.
171    //    Although we are the sole writer, a prior iteration may have
172    //    appended events that the view hasn't consumed yet.
173    view.refresh(reader).map_err(ExecuteError::Io)?;
174
175    // 2. Decide: run the command handler against current state.
176    let state = view.state().clone();
177    let domain_events = state.handle(cmd).map_err(ExecuteError::Domain)?;
178
179    // 3. No-op commands produce no events.
180    if domain_events.is_empty() {
181        return Ok(domain_events);
182    }
183
184    // 4. Convert each domain event to an `eventfold::Event` and append.
185    for de in &domain_events {
186        let ef_event = to_eventfold_event::<A>(de, ctx)
187            .map_err(|e| ExecuteError::Io(io::Error::new(io::ErrorKind::InvalidData, e)))?;
188        writer.append(&ef_event).map_err(ExecuteError::Io)?;
189    }
190
191    tracing::info!(count = domain_events.len(), "events appended");
192
193    Ok(domain_events)
194}
195
196/// Refresh the view and return a clone of the current aggregate state.
197fn get_state<A: Aggregate>(view: &mut View<A>, reader: &EventReader) -> Result<A, StateError> {
198    view.refresh(reader).map_err(StateError::Io)?;
199    Ok(view.state().clone())
200}
201
/// Async handle to a running aggregate actor.
///
/// Lightweight, cloneable, and `Send + Sync`. Communicates with the
/// actor thread over a bounded channel.
///
/// # Type Parameters
///
/// * `A` - The [`Aggregate`] type this handle controls.
#[derive(Debug)]
pub struct AggregateHandle<A: Aggregate> {
    // Bounded channel into the actor loop; a closed sender indicates the
    // actor thread has exited (see `is_alive`).
    sender: mpsc::Sender<ActorMessage<A>>,
    // Exposed via `reader()` so callers can read the stream directly
    // without routing through the actor.
    reader: EventReader,
}
215
216// Manual `Clone` because `A` itself need not be `Clone` for the handle --
217// we only clone the `Sender` and `EventReader`, both of which are always
218// `Clone` regardless of `A`.
219impl<A: Aggregate> Clone for AggregateHandle<A> {
220    fn clone(&self) -> Self {
221        Self {
222            sender: self.sender.clone(),
223            reader: self.reader.clone(),
224        }
225    }
226}
227
228impl<A: Aggregate> AggregateHandle<A> {
229    /// Send a command to the aggregate and wait for the result.
230    ///
231    /// Returns the domain events produced by the command on success.
232    ///
233    /// # Arguments
234    ///
235    /// * `cmd` - The domain command to execute against the aggregate.
236    /// * `ctx` - Cross-cutting metadata (actor identity, correlation ID, etc.).
237    ///
238    /// # Returns
239    ///
240    /// The domain events produced by the command on success.
241    ///
242    /// # Errors
243    ///
244    /// * [`ExecuteError::Domain`] -- the aggregate rejected the command.
245    /// * [`ExecuteError::Io`] -- a disk I/O error occurred.
246    /// * [`ExecuteError::ActorGone`] -- the actor thread has exited.
247    pub async fn execute(
248        &self,
249        cmd: A::Command,
250        ctx: CommandContext,
251    ) -> Result<Vec<A::DomainEvent>, ExecuteError<A::Error>> {
252        let (tx, rx) = oneshot::channel();
253        self.sender
254            .send(ActorMessage::Execute {
255                cmd,
256                ctx,
257                reply: tx,
258            })
259            .await
260            .map_err(|_| ExecuteError::ActorGone)?;
261        rx.await.map_err(|_| ExecuteError::ActorGone)?
262    }
263
264    /// Read the current aggregate state.
265    ///
266    /// Refreshes the view from disk before returning.
267    ///
268    /// # Returns
269    ///
270    /// A clone of the current aggregate state.
271    ///
272    /// # Errors
273    ///
274    /// * [`StateError::Io`] -- a disk I/O error occurred.
275    /// * [`StateError::ActorGone`] -- the actor thread has exited.
276    pub async fn state(&self) -> Result<A, StateError> {
277        let (tx, rx) = oneshot::channel();
278        self.sender
279            .send(ActorMessage::GetState { reply: tx })
280            .await
281            .map_err(|_| StateError::ActorGone)?;
282        rx.await.map_err(|_| StateError::ActorGone)?
283    }
284
285    /// Inject a pre-validated event by sending it to the actor for append.
286    ///
287    /// The actor owns the exclusive `EventWriter`, so this method routes
288    /// the event through the actor's channel to avoid lock contention.
289    /// The actor appends the event and sends the I/O result back.
290    ///
291    /// # Arguments
292    ///
293    /// * `event` - A pre-validated `eventfold::Event` to append as-is.
294    ///
295    /// # Errors
296    ///
297    /// * [`io::Error`] with [`io::ErrorKind::BrokenPipe`] if the actor
298    ///   has exited and the channel is closed.
299    /// * [`io::Error`] if the underlying `EventWriter::append` fails.
300    pub(crate) async fn inject_via_actor(&self, event: eventfold::Event) -> io::Result<()> {
301        let (tx, rx) = oneshot::channel();
302        self.sender
303            .send(ActorMessage::Inject { event, reply: tx })
304            .await
305            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "actor gone"))?;
306        rx.await
307            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "actor gone"))?
308    }
309
310    /// Returns a reference to the [`EventReader`] for this aggregate's stream.
311    pub fn reader(&self) -> &EventReader {
312        &self.reader
313    }
314
315    /// Check whether the actor backing this handle is still running.
316    ///
317    /// Returns `false` if the actor thread has exited (e.g. due to idle
318    /// timeout or shutdown). The store uses this to evict stale handles
319    /// from its cache and re-spawn the actor on the next `get` call.
320    pub fn is_alive(&self) -> bool {
321        !self.sender.is_closed()
322    }
323}
324
325/// Spawn a new aggregate actor with explicit configuration.
326///
327/// This is the internal entry point used by [`AggregateStore`](crate::AggregateStore)
328/// to pass an idle timeout to the actor loop.
329///
330/// # Arguments
331///
332/// * `stream_dir` - Path to the directory containing the event log.
333/// * `config` - Actor configuration (idle timeout).
334///
335/// # Returns
336///
337/// An [`AggregateHandle`] for sending commands and reading state.
338///
339/// # Errors
340///
341/// Returns [`std::io::Error`] if the event log cannot be opened.
342pub(crate) fn spawn_actor_with_config<A: Aggregate>(
343    stream_dir: &Path,
344    config: ActorConfig,
345) -> io::Result<AggregateHandle<A>> {
346    let writer = EventWriter::open(stream_dir)?;
347    let reader = writer.reader();
348    let views_dir = stream_dir.join("views");
349    let view = View::<A>::new("state", reducer::<A>(), &views_dir);
350    let (tx, rx) = mpsc::channel::<ActorMessage<A>>(32);
351    let handle_reader = reader.clone();
352
353    std::thread::spawn(move || {
354        run_actor::<A>(writer, view, reader, rx, config);
355    });
356
357    Ok(AggregateHandle {
358        sender: tx,
359        reader: handle_reader,
360    })
361}
362
363/// Spawn a new aggregate actor for the stream at `stream_dir`.
364///
365/// Opens the [`EventWriter`], creates an [`EventReader`] and aggregate
366/// state [`View`], then starts the actor loop on a dedicated blocking
367/// thread.
368///
369/// The actor created by this function uses an effectively infinite idle
370/// timeout. For configurable timeouts, use
371/// [`AggregateStoreBuilder::idle_timeout`](crate::AggregateStoreBuilder::idle_timeout).
372///
373/// # Arguments
374///
375/// * `stream_dir` - Path to the directory containing the event log
376///   (e.g. `<base>/streams/<type>/<id>`).
377///
378/// # Returns
379///
380/// An [`AggregateHandle`] for sending commands and reading state.
381///
382/// # Errors
383///
384/// Returns [`std::io::Error`] if the event log cannot be opened.
385pub fn spawn_actor<A: Aggregate>(stream_dir: &Path) -> io::Result<AggregateHandle<A>> {
386    // Use an effectively infinite timeout so the actor never idles out.
387    // `u64::MAX / 2` avoids overflow when tokio adds the timeout duration
388    // to the current `Instant`.
389    let config = ActorConfig {
390        idle_timeout: Duration::from_secs(u64::MAX / 2),
391    };
392    spawn_actor_with_config(stream_dir, config)
393}
394
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tempfile::TempDir;

    use super::*;
    use crate::aggregate::test_fixtures::{Counter, CounterCommand, CounterError, CounterEvent};
    use crate::error::ExecuteError;

    /// Three increments through one handle fold into a state of 3.
    #[tokio::test]
    async fn execute_increment_three_times() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");

        let ctx = CommandContext::default();
        for _ in 0..3 {
            handle
                .execute(CounterCommand::Increment, ctx.clone())
                .await
                .expect("execute should succeed");
        }

        let state = handle.state().await.expect("state should succeed");
        assert_eq!(state.value, 3);
    }

    /// A rejected command surfaces as `ExecuteError::Domain`, not I/O.
    #[tokio::test]
    async fn execute_decrement_at_zero_returns_domain_error() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");

        let result = handle
            .execute(CounterCommand::Decrement, CommandContext::default())
            .await;

        assert!(
            matches!(result, Err(ExecuteError::Domain(CounterError::AlreadyZero))),
            "expected Domain(AlreadyZero), got: {result:?}"
        );
    }

    /// Events persisted by one actor are replayed by a fresh actor on the
    /// same directory after the first one shuts down.
    #[tokio::test]
    async fn state_persists_across_respawn() {
        let tmp = TempDir::new().expect("failed to create temp dir");

        // First actor: increment twice, then drop the handle.
        {
            let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");
            let ctx = CommandContext::default();
            handle
                .execute(CounterCommand::Increment, ctx.clone())
                .await
                .expect("first increment should succeed");
            handle
                .execute(CounterCommand::Increment, ctx)
                .await
                .expect("second increment should succeed");
        }
        // Handle dropped -- channel closes, actor exits.

        // Brief sleep to let the actor thread finish and release the
        // flock before we open a new writer on the same directory.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Second actor on the same directory should recover the state.
        let handle = spawn_actor::<Counter>(tmp.path()).expect("respawn should succeed");
        let state = handle.state().await.expect("state should succeed");
        assert_eq!(state.value, 2);
    }

    /// Mixed command sequence applies in order: +1, +10, -1 == 10.
    #[tokio::test]
    async fn sequential_commands_correct() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");
        let ctx = CommandContext::default();

        handle
            .execute(CounterCommand::Increment, ctx.clone())
            .await
            .expect("increment should succeed");
        handle
            .execute(CounterCommand::Add(10), ctx.clone())
            .await
            .expect("add should succeed");
        handle
            .execute(CounterCommand::Decrement, ctx)
            .await
            .expect("decrement should succeed");

        let state = handle.state().await.expect("state should succeed");
        assert_eq!(state.value, 10);
    }

    /// `execute` returns the domain events the command produced.
    #[tokio::test]
    async fn execute_returns_produced_events() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");

        let events = handle
            .execute(CounterCommand::Increment, CommandContext::default())
            .await
            .expect("execute should succeed");

        assert_eq!(events, vec![CounterEvent::Incremented]);
    }

    /// After the idle timeout the actor exits (handle reports dead), and a
    /// respawn on the same directory recovers the persisted state.
    /// NOTE: timing-based -- 400ms sleep vs. a 200ms timeout leaves margin
    /// for scheduler jitter.
    #[tokio::test]
    async fn idle_timeout_shuts_down_actor() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let config = ActorConfig {
            idle_timeout: Duration::from_millis(200),
        };
        let handle =
            spawn_actor_with_config::<Counter>(tmp.path(), config).expect("spawn should succeed");

        // Execute a command before the timeout.
        handle
            .execute(CounterCommand::Increment, CommandContext::default())
            .await
            .expect("first execute should succeed");

        // Wait for idle timeout to elapse.
        tokio::time::sleep(Duration::from_millis(400)).await;

        // Actor should have shut down.
        assert!(
            !handle.is_alive(),
            "actor should be dead after idle timeout"
        );

        // Re-spawn on the same directory recovers state from disk.
        let config2 = ActorConfig {
            idle_timeout: Duration::from_secs(u64::MAX / 2),
        };
        let handle2 = spawn_actor_with_config::<Counter>(tmp.path(), config2)
            .expect("respawn should succeed");
        let state = handle2.state().await.expect("state should succeed");
        assert_eq!(state.value, 1, "state should reflect the first command");
    }

    /// An event injected through the actor is visible on the next refresh.
    #[tokio::test]
    async fn inject_via_actor_appends_and_updates_state() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");

        // Build a raw eventfold::Event matching the Counter's "Incremented" variant.
        let event = eventfold::Event::new("Incremented", serde_json::Value::Null);

        // Inject the event directly via the actor.
        handle
            .inject_via_actor(event)
            .await
            .expect("inject_via_actor should succeed");

        // The actor's view should pick up the injected event on next refresh.
        let state = handle.state().await.expect("state should succeed");
        assert_eq!(
            state.value, 1,
            "injected Incremented event should bump counter to 1"
        );
    }

    /// Each received message restarts the idle timer, so steady traffic at
    /// intervals shorter than the timeout keeps the actor alive.
    #[tokio::test]
    async fn rapid_commands_prevent_idle_eviction() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let config = ActorConfig {
            idle_timeout: Duration::from_millis(300),
        };
        let handle =
            spawn_actor_with_config::<Counter>(tmp.path(), config).expect("spawn should succeed");

        let ctx = CommandContext::default();
        // Send commands at 100ms intervals, each resetting the idle timer.
        for _ in 0..5 {
            handle
                .execute(CounterCommand::Increment, ctx.clone())
                .await
                .expect("execute should succeed");
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        assert!(
            handle.is_alive(),
            "actor should still be alive during activity"
        );
        let state = handle.state().await.expect("state should succeed");
        assert_eq!(state.value, 5);
    }
}