// eventfold_es/actor.rs
//! Actor loop that owns an aggregate and processes commands.
//!
//! The actor runs on a blocking thread and sequentially processes messages
//! from an `mpsc` channel. It exclusively owns the `EventWriter` (and
//! therefore the flock), the `View<A>` for maintaining aggregate state,
//! and the `EventReader` for refreshing the view before each command.
//!
//! Public API: [`AggregateHandle`] (cloneable async handle) and
//! [`spawn_actor`] (factory that opens the log and starts the actor thread).

11use std::io;
12use std::path::Path;
13use std::time::Duration;
14
15use eventfold::{EventReader, EventWriter, View};
16use tokio::sync::{mpsc, oneshot};
17
18use crate::aggregate::{Aggregate, reducer, to_eventfold_event};
19use crate::command::CommandContext;
20use crate::error::{ExecuteError, StateError};
21
/// Maximum number of optimistic concurrency retries before giving up.
///
/// Currently unused because the actor exclusively owns the writer, so
/// conflicts cannot occur. Retained for future `append_if` support.
#[allow(dead_code)] // Kept deliberately for the planned `append_if` retry path.
const DEFAULT_MAX_RETRIES: u32 = 3;
28
/// Configuration for the actor loop.
///
/// Internal to the crate -- callers configure idle timeout through
/// [`AggregateStoreBuilder::idle_timeout`](crate::AggregateStoreBuilder::idle_timeout).
pub(crate) struct ActorConfig {
    /// How long the actor waits for a message before shutting down.
    /// An effectively infinite value means the actor never idles out.
    /// Consumed by `run_actor`, which re-arms this timeout on every
    /// channel receive.
    pub idle_timeout: Duration,
}
38
/// Result type sent back through the `Execute` reply channel.
///
/// `Ok` carries the domain events the command produced; `Err` is an
/// [`ExecuteError`] (domain rejection or I/O failure).
type ExecuteResult<A> =
    Result<Vec<<A as Aggregate>::DomainEvent>, ExecuteError<<A as Aggregate>::Error>>;
42
/// Messages sent from `AggregateHandle` to the actor loop.
///
/// Each variant (except `Shutdown`) carries a `oneshot::Sender` for the
/// actor to reply on once the operation completes.
pub(crate) enum ActorMessage<A: Aggregate> {
    /// Execute a command against the aggregate.
    Execute {
        /// The domain command to execute.
        cmd: A::Command,
        /// Cross-cutting metadata (actor identity, correlation ID, etc.).
        ctx: CommandContext,
        /// Channel to send back the produced domain events or an error.
        reply: oneshot::Sender<ExecuteResult<A>>,
    },

    /// Retrieve the current aggregate state.
    GetState {
        /// Channel to send back a clone of the current state or an error.
        reply: oneshot::Sender<Result<A, StateError>>,
    },

    /// Inject a pre-validated `eventfold::Event` directly into the stream.
    ///
    /// The actor appends the event via its owned `EventWriter` and sends
    /// the I/O result back on `reply`. This keeps the actor as the sole
    /// writer for any stream that has a live actor.
    Inject {
        /// The pre-validated event to append as-is.
        event: eventfold::Event,
        /// Channel to send back the I/O result.
        reply: oneshot::Sender<io::Result<()>>,
    },

    /// Gracefully shut down the actor loop.
    #[allow(dead_code)] // Constructed only in tests.
    Shutdown,
}
80
81/// Runs the aggregate actor loop on a blocking thread.
82///
83/// Owns the `EventWriter` and aggregate state `View`. Receives messages
84/// from `AggregateHandle` via the mpsc channel and processes them sequentially.
85/// The loop exits when the channel closes (all senders dropped), a
86/// `Shutdown` message is received, or the idle timeout elapses. On exit the
87/// `EventWriter` is dropped, releasing the flock.
88///
89/// # Arguments
90///
91/// * `writer` - Exclusive writer for the aggregate's event stream.
92/// * `view` - Derived view that holds the current aggregate state.
93/// * `reader` - Reader used to refresh the view before each operation.
94/// * `rx` - Receiving end of the mpsc channel carrying `ActorMessage`s.
95/// * `config` - Actor configuration (idle timeout).
96pub(crate) fn run_actor<A: Aggregate>(
97 mut writer: EventWriter,
98 mut view: View<A>,
99 reader: EventReader,
100 mut rx: mpsc::Receiver<ActorMessage<A>>,
101 config: ActorConfig,
102) {
103 // Build a lightweight current-thread runtime with time enabled.
104 // The actor needs `tokio::time::timeout` to implement idle eviction,
105 // but the parent runtime may be current-thread (common in tests),
106 // which doesn't drive timers from non-runtime threads. A dedicated
107 // minimal runtime avoids that constraint and keeps the actor
108 // self-contained.
109 let rt = tokio::runtime::Builder::new_current_thread()
110 .enable_time()
111 .build()
112 .expect("failed to create actor timeout runtime");
113
114 loop {
115 // Create the timeout future INSIDE `block_on` so that the `Sleep`
116 // timer registers with the local runtime's time driver.
117 let idle_timeout = config.idle_timeout;
118 let msg = rt.block_on(async { tokio::time::timeout(idle_timeout, rx.recv()).await });
119
120 match msg {
121 // Received a message before the timeout elapsed.
122 Ok(Some(msg)) => match msg {
123 ActorMessage::Execute { cmd, ctx, reply } => {
124 let _span = tracing::info_span!("execute", aggregate_type = A::AGGREGATE_TYPE,)
125 .entered();
126 let result = execute_command::<A>(&mut writer, &mut view, &reader, cmd, &ctx);
127 // If the receiver was dropped, the caller no longer cares
128 // about the result. Silently discard it.
129 let _ = reply.send(result);
130 }
131
132 ActorMessage::GetState { reply } => {
133 let result = get_state::<A>(&mut view, &reader);
134 let _ = reply.send(result);
135 }
136
137 ActorMessage::Inject { event, reply } => {
138 let result = writer.append(&event).map(|_| ());
139 let _ = reply.send(result);
140 }
141
142 ActorMessage::Shutdown => break,
143 },
144 // Channel closed: all senders dropped.
145 Ok(None) => break,
146 // Idle timeout elapsed with no messages.
147 Err(_elapsed) => {
148 tracing::info!(
149 aggregate_type = A::AGGREGATE_TYPE,
150 "actor idle, shutting down"
151 );
152 break;
153 }
154 }
155 }
156 // Loop exited: either `Shutdown` received, channel closed, or idle timeout.
157 // `writer` is dropped here, releasing the flock on the event log.
158}
159
160/// Execute a single command: refresh state, handle the command, persist events.
161///
162/// Factored out of the match arm for clarity and to keep the actor loop concise.
163fn execute_command<A: Aggregate>(
164 writer: &mut EventWriter,
165 view: &mut View<A>,
166 reader: &EventReader,
167 cmd: A::Command,
168 ctx: &CommandContext,
169) -> Result<Vec<A::DomainEvent>, ExecuteError<A::Error>> {
170 // 1. Refresh the view to incorporate any events not yet folded.
171 // Although we are the sole writer, a prior iteration may have
172 // appended events that the view hasn't consumed yet.
173 view.refresh(reader).map_err(ExecuteError::Io)?;
174
175 // 2. Decide: run the command handler against current state.
176 let state = view.state().clone();
177 let domain_events = state.handle(cmd).map_err(ExecuteError::Domain)?;
178
179 // 3. No-op commands produce no events.
180 if domain_events.is_empty() {
181 return Ok(domain_events);
182 }
183
184 // 4. Convert each domain event to an `eventfold::Event` and append.
185 for de in &domain_events {
186 let ef_event = to_eventfold_event::<A>(de, ctx)
187 .map_err(|e| ExecuteError::Io(io::Error::new(io::ErrorKind::InvalidData, e)))?;
188 writer.append(&ef_event).map_err(ExecuteError::Io)?;
189 }
190
191 tracing::info!(count = domain_events.len(), "events appended");
192
193 Ok(domain_events)
194}
195
196/// Refresh the view and return a clone of the current aggregate state.
197fn get_state<A: Aggregate>(view: &mut View<A>, reader: &EventReader) -> Result<A, StateError> {
198 view.refresh(reader).map_err(StateError::Io)?;
199 Ok(view.state().clone())
200}
201
/// Async handle to a running aggregate actor.
///
/// Lightweight, cloneable, and `Send + Sync`. Communicates with the
/// actor thread over a bounded channel.
///
/// # Type Parameters
///
/// * `A` - The [`Aggregate`] type this handle controls.
#[derive(Debug)]
pub struct AggregateHandle<A: Aggregate> {
    /// Sending half of the channel into the actor loop.
    sender: mpsc::Sender<ActorMessage<A>>,
    /// Reader for this aggregate's event stream, exposed via [`Self::reader`].
    reader: EventReader,
}
215
216// Manual `Clone` because `A` itself need not be `Clone` for the handle --
217// we only clone the `Sender` and `EventReader`, both of which are always
218// `Clone` regardless of `A`.
219impl<A: Aggregate> Clone for AggregateHandle<A> {
220 fn clone(&self) -> Self {
221 Self {
222 sender: self.sender.clone(),
223 reader: self.reader.clone(),
224 }
225 }
226}
227
228impl<A: Aggregate> AggregateHandle<A> {
229 /// Send a command to the aggregate and wait for the result.
230 ///
231 /// Returns the domain events produced by the command on success.
232 ///
233 /// # Arguments
234 ///
235 /// * `cmd` - The domain command to execute against the aggregate.
236 /// * `ctx` - Cross-cutting metadata (actor identity, correlation ID, etc.).
237 ///
238 /// # Returns
239 ///
240 /// The domain events produced by the command on success.
241 ///
242 /// # Errors
243 ///
244 /// * [`ExecuteError::Domain`] -- the aggregate rejected the command.
245 /// * [`ExecuteError::Io`] -- a disk I/O error occurred.
246 /// * [`ExecuteError::ActorGone`] -- the actor thread has exited.
247 pub async fn execute(
248 &self,
249 cmd: A::Command,
250 ctx: CommandContext,
251 ) -> Result<Vec<A::DomainEvent>, ExecuteError<A::Error>> {
252 let (tx, rx) = oneshot::channel();
253 self.sender
254 .send(ActorMessage::Execute {
255 cmd,
256 ctx,
257 reply: tx,
258 })
259 .await
260 .map_err(|_| ExecuteError::ActorGone)?;
261 rx.await.map_err(|_| ExecuteError::ActorGone)?
262 }
263
264 /// Read the current aggregate state.
265 ///
266 /// Refreshes the view from disk before returning.
267 ///
268 /// # Returns
269 ///
270 /// A clone of the current aggregate state.
271 ///
272 /// # Errors
273 ///
274 /// * [`StateError::Io`] -- a disk I/O error occurred.
275 /// * [`StateError::ActorGone`] -- the actor thread has exited.
276 pub async fn state(&self) -> Result<A, StateError> {
277 let (tx, rx) = oneshot::channel();
278 self.sender
279 .send(ActorMessage::GetState { reply: tx })
280 .await
281 .map_err(|_| StateError::ActorGone)?;
282 rx.await.map_err(|_| StateError::ActorGone)?
283 }
284
285 /// Inject a pre-validated event by sending it to the actor for append.
286 ///
287 /// The actor owns the exclusive `EventWriter`, so this method routes
288 /// the event through the actor's channel to avoid lock contention.
289 /// The actor appends the event and sends the I/O result back.
290 ///
291 /// # Arguments
292 ///
293 /// * `event` - A pre-validated `eventfold::Event` to append as-is.
294 ///
295 /// # Errors
296 ///
297 /// * [`io::Error`] with [`io::ErrorKind::BrokenPipe`] if the actor
298 /// has exited and the channel is closed.
299 /// * [`io::Error`] if the underlying `EventWriter::append` fails.
300 pub(crate) async fn inject_via_actor(&self, event: eventfold::Event) -> io::Result<()> {
301 let (tx, rx) = oneshot::channel();
302 self.sender
303 .send(ActorMessage::Inject { event, reply: tx })
304 .await
305 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "actor gone"))?;
306 rx.await
307 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "actor gone"))?
308 }
309
310 /// Returns a reference to the [`EventReader`] for this aggregate's stream.
311 pub fn reader(&self) -> &EventReader {
312 &self.reader
313 }
314
315 /// Check whether the actor backing this handle is still running.
316 ///
317 /// Returns `false` if the actor thread has exited (e.g. due to idle
318 /// timeout or shutdown). The store uses this to evict stale handles
319 /// from its cache and re-spawn the actor on the next `get` call.
320 pub fn is_alive(&self) -> bool {
321 !self.sender.is_closed()
322 }
323}
324
325/// Spawn a new aggregate actor with explicit configuration.
326///
327/// This is the internal entry point used by [`AggregateStore`](crate::AggregateStore)
328/// to pass an idle timeout to the actor loop.
329///
330/// # Arguments
331///
332/// * `stream_dir` - Path to the directory containing the event log.
333/// * `config` - Actor configuration (idle timeout).
334///
335/// # Returns
336///
337/// An [`AggregateHandle`] for sending commands and reading state.
338///
339/// # Errors
340///
341/// Returns [`std::io::Error`] if the event log cannot be opened.
342pub(crate) fn spawn_actor_with_config<A: Aggregate>(
343 stream_dir: &Path,
344 config: ActorConfig,
345) -> io::Result<AggregateHandle<A>> {
346 let writer = EventWriter::open(stream_dir)?;
347 let reader = writer.reader();
348 let views_dir = stream_dir.join("views");
349 let view = View::<A>::new("state", reducer::<A>(), &views_dir);
350 let (tx, rx) = mpsc::channel::<ActorMessage<A>>(32);
351 let handle_reader = reader.clone();
352
353 std::thread::spawn(move || {
354 run_actor::<A>(writer, view, reader, rx, config);
355 });
356
357 Ok(AggregateHandle {
358 sender: tx,
359 reader: handle_reader,
360 })
361}
362
363/// Spawn a new aggregate actor for the stream at `stream_dir`.
364///
365/// Opens the [`EventWriter`], creates an [`EventReader`] and aggregate
366/// state [`View`], then starts the actor loop on a dedicated blocking
367/// thread.
368///
369/// The actor created by this function uses an effectively infinite idle
370/// timeout. For configurable timeouts, use
371/// [`AggregateStoreBuilder::idle_timeout`](crate::AggregateStoreBuilder::idle_timeout).
372///
373/// # Arguments
374///
375/// * `stream_dir` - Path to the directory containing the event log
376/// (e.g. `<base>/streams/<type>/<id>`).
377///
378/// # Returns
379///
380/// An [`AggregateHandle`] for sending commands and reading state.
381///
382/// # Errors
383///
384/// Returns [`std::io::Error`] if the event log cannot be opened.
385pub fn spawn_actor<A: Aggregate>(stream_dir: &Path) -> io::Result<AggregateHandle<A>> {
386 // Use an effectively infinite timeout so the actor never idles out.
387 // `u64::MAX / 2` avoids overflow when tokio adds the timeout duration
388 // to the current `Instant`.
389 let config = ActorConfig {
390 idle_timeout: Duration::from_secs(u64::MAX / 2),
391 };
392 spawn_actor_with_config(stream_dir, config)
393}
394
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tempfile::TempDir;

    use super::*;
    use crate::aggregate::test_fixtures::{Counter, CounterCommand, CounterError, CounterEvent};
    use crate::error::ExecuteError;

    /// Three increments through the handle fold into a counter value of 3.
    #[tokio::test]
    async fn execute_increment_three_times() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");

        let ctx = CommandContext::default();
        for _ in 0..3 {
            handle
                .execute(CounterCommand::Increment, ctx.clone())
                .await
                .expect("execute should succeed");
        }

        let state = handle.state().await.expect("state should succeed");
        assert_eq!(state.value, 3);
    }

    /// A rejected command surfaces as `ExecuteError::Domain`, not I/O.
    #[tokio::test]
    async fn execute_decrement_at_zero_returns_domain_error() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");

        let result = handle
            .execute(CounterCommand::Decrement, CommandContext::default())
            .await;

        assert!(
            matches!(result, Err(ExecuteError::Domain(CounterError::AlreadyZero))),
            "expected Domain(AlreadyZero), got: {result:?}"
        );
    }

    /// A fresh actor on the same directory re-folds the persisted events.
    #[tokio::test]
    async fn state_persists_across_respawn() {
        let tmp = TempDir::new().expect("failed to create temp dir");

        // First actor: increment twice, then drop the handle.
        {
            let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");
            let ctx = CommandContext::default();
            handle
                .execute(CounterCommand::Increment, ctx.clone())
                .await
                .expect("first increment should succeed");
            handle
                .execute(CounterCommand::Increment, ctx)
                .await
                .expect("second increment should succeed");
        }
        // Handle dropped -- channel closes, actor exits.

        // Brief sleep to let the actor thread finish and release the
        // flock before we open a new writer on the same directory.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Second actor on the same directory should recover the state.
        let handle = spawn_actor::<Counter>(tmp.path()).expect("respawn should succeed");
        let state = handle.state().await.expect("state should succeed");
        assert_eq!(state.value, 2);
    }

    /// Mixed commands applied in order: +1, +10, -1 => 10.
    #[tokio::test]
    async fn sequential_commands_correct() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");
        let ctx = CommandContext::default();

        handle
            .execute(CounterCommand::Increment, ctx.clone())
            .await
            .expect("increment should succeed");
        handle
            .execute(CounterCommand::Add(10), ctx.clone())
            .await
            .expect("add should succeed");
        handle
            .execute(CounterCommand::Decrement, ctx)
            .await
            .expect("decrement should succeed");

        let state = handle.state().await.expect("state should succeed");
        assert_eq!(state.value, 10);
    }

    /// `execute` returns the events the command produced, not just `Ok`.
    #[tokio::test]
    async fn execute_returns_produced_events() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");

        let events = handle
            .execute(CounterCommand::Increment, CommandContext::default())
            .await
            .expect("execute should succeed");

        assert_eq!(events, vec![CounterEvent::Incremented]);
    }

    /// After the idle timeout elapses with no traffic, the actor exits,
    /// `is_alive` reports false, and a respawn recovers state from disk.
    #[tokio::test]
    async fn idle_timeout_shuts_down_actor() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let config = ActorConfig {
            idle_timeout: Duration::from_millis(200),
        };
        let handle =
            spawn_actor_with_config::<Counter>(tmp.path(), config).expect("spawn should succeed");

        // Execute a command before the timeout.
        handle
            .execute(CounterCommand::Increment, CommandContext::default())
            .await
            .expect("first execute should succeed");

        // Wait for idle timeout to elapse (400ms > 200ms timeout).
        tokio::time::sleep(Duration::from_millis(400)).await;

        // Actor should have shut down.
        assert!(
            !handle.is_alive(),
            "actor should be dead after idle timeout"
        );

        // Re-spawn on the same directory recovers state from disk.
        let config2 = ActorConfig {
            idle_timeout: Duration::from_secs(u64::MAX / 2),
        };
        let handle2 = spawn_actor_with_config::<Counter>(tmp.path(), config2)
            .expect("respawn should succeed");
        let state = handle2.state().await.expect("state should succeed");
        assert_eq!(state.value, 1, "state should reflect the first command");
    }

    /// Raw events appended through `inject_via_actor` are visible to the
    /// view on the next `state` call.
    #[tokio::test]
    async fn inject_via_actor_appends_and_updates_state() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");

        // Build a raw eventfold::Event matching the Counter's "Incremented" variant.
        let event = eventfold::Event::new("Incremented", serde_json::Value::Null);

        // Inject the event directly via the actor.
        handle
            .inject_via_actor(event)
            .await
            .expect("inject_via_actor should succeed");

        // The actor's view should pick up the injected event on next refresh.
        let state = handle.state().await.expect("state should succeed");
        assert_eq!(
            state.value, 1,
            "injected Incremented event should bump counter to 1"
        );
    }

    /// Each received message re-arms the idle timer, so steady traffic at
    /// intervals shorter than the timeout keeps the actor alive.
    #[tokio::test]
    async fn rapid_commands_prevent_idle_eviction() {
        let tmp = TempDir::new().expect("failed to create temp dir");
        let config = ActorConfig {
            idle_timeout: Duration::from_millis(300),
        };
        let handle =
            spawn_actor_with_config::<Counter>(tmp.path(), config).expect("spawn should succeed");

        let ctx = CommandContext::default();
        // Send commands at 100ms intervals, each resetting the idle timer.
        for _ in 0..5 {
            handle
                .execute(CounterCommand::Increment, ctx.clone())
                .await
                .expect("execute should succeed");
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        assert!(
            handle.is_alive(),
            "actor should still be alive during activity"
        );
        let state = handle.state().await.expect("state should succeed");
        assert_eq!(state.value, 5);
    }
}