use std::io;
use std::path::Path;
use std::time::Duration;
use eventfold::{EventReader, EventWriter, View};
use tokio::sync::{mpsc, oneshot};
use crate::aggregate::{Aggregate, reducer, to_eventfold_event};
use crate::command::CommandContext;
use crate::error::{ExecuteError, StateError};
#[allow(dead_code)]
const DEFAULT_MAX_RETRIES: u32 = 3;
pub(crate) struct ActorConfig {
pub idle_timeout: Duration,
}
type ExecuteResult<A> =
Result<Vec<<A as Aggregate>::DomainEvent>, ExecuteError<<A as Aggregate>::Error>>;
pub(crate) enum ActorMessage<A: Aggregate> {
Execute {
cmd: A::Command,
ctx: CommandContext,
reply: oneshot::Sender<ExecuteResult<A>>,
},
GetState {
reply: oneshot::Sender<Result<A, StateError>>,
},
Inject {
event: eventfold::Event,
reply: oneshot::Sender<io::Result<()>>,
},
#[allow(dead_code)] Shutdown,
}
pub(crate) fn run_actor<A: Aggregate>(
mut writer: EventWriter,
mut view: View<A>,
reader: EventReader,
mut rx: mpsc::Receiver<ActorMessage<A>>,
config: ActorConfig,
) {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()
.expect("failed to create actor timeout runtime");
loop {
let idle_timeout = config.idle_timeout;
let msg = rt.block_on(async { tokio::time::timeout(idle_timeout, rx.recv()).await });
match msg {
Ok(Some(msg)) => match msg {
ActorMessage::Execute { cmd, ctx, reply } => {
let _span = tracing::info_span!("execute", aggregate_type = A::AGGREGATE_TYPE,)
.entered();
let result = execute_command::<A>(&mut writer, &mut view, &reader, cmd, &ctx);
let _ = reply.send(result);
}
ActorMessage::GetState { reply } => {
let result = get_state::<A>(&mut view, &reader);
let _ = reply.send(result);
}
ActorMessage::Inject { event, reply } => {
let result = writer.append(&event).map(|_| ());
let _ = reply.send(result);
}
ActorMessage::Shutdown => break,
},
Ok(None) => break,
Err(_elapsed) => {
tracing::info!(
aggregate_type = A::AGGREGATE_TYPE,
"actor idle, shutting down"
);
break;
}
}
}
}
fn execute_command<A: Aggregate>(
writer: &mut EventWriter,
view: &mut View<A>,
reader: &EventReader,
cmd: A::Command,
ctx: &CommandContext,
) -> Result<Vec<A::DomainEvent>, ExecuteError<A::Error>> {
view.refresh(reader).map_err(ExecuteError::Io)?;
let state = view.state().clone();
let domain_events = state.handle(cmd).map_err(ExecuteError::Domain)?;
if domain_events.is_empty() {
return Ok(domain_events);
}
for de in &domain_events {
let ef_event = to_eventfold_event::<A>(de, ctx)
.map_err(|e| ExecuteError::Io(io::Error::new(io::ErrorKind::InvalidData, e)))?;
writer.append(&ef_event).map_err(ExecuteError::Io)?;
}
tracing::info!(count = domain_events.len(), "events appended");
Ok(domain_events)
}
fn get_state<A: Aggregate>(view: &mut View<A>, reader: &EventReader) -> Result<A, StateError> {
view.refresh(reader).map_err(StateError::Io)?;
Ok(view.state().clone())
}
#[derive(Debug)]
pub struct AggregateHandle<A: Aggregate> {
sender: mpsc::Sender<ActorMessage<A>>,
reader: EventReader,
}
impl<A: Aggregate> Clone for AggregateHandle<A> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
reader: self.reader.clone(),
}
}
}
impl<A: Aggregate> AggregateHandle<A> {
pub async fn execute(
&self,
cmd: A::Command,
ctx: CommandContext,
) -> Result<Vec<A::DomainEvent>, ExecuteError<A::Error>> {
let (tx, rx) = oneshot::channel();
self.sender
.send(ActorMessage::Execute {
cmd,
ctx,
reply: tx,
})
.await
.map_err(|_| ExecuteError::ActorGone)?;
rx.await.map_err(|_| ExecuteError::ActorGone)?
}
pub async fn state(&self) -> Result<A, StateError> {
let (tx, rx) = oneshot::channel();
self.sender
.send(ActorMessage::GetState { reply: tx })
.await
.map_err(|_| StateError::ActorGone)?;
rx.await.map_err(|_| StateError::ActorGone)?
}
pub(crate) async fn inject_via_actor(&self, event: eventfold::Event) -> io::Result<()> {
let (tx, rx) = oneshot::channel();
self.sender
.send(ActorMessage::Inject { event, reply: tx })
.await
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "actor gone"))?;
rx.await
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "actor gone"))?
}
pub fn reader(&self) -> &EventReader {
&self.reader
}
pub fn is_alive(&self) -> bool {
!self.sender.is_closed()
}
}
pub(crate) fn spawn_actor_with_config<A: Aggregate>(
stream_dir: &Path,
config: ActorConfig,
) -> io::Result<AggregateHandle<A>> {
let writer = EventWriter::open(stream_dir)?;
let reader = writer.reader();
let views_dir = stream_dir.join("views");
let view = View::<A>::new("state", reducer::<A>(), &views_dir);
let (tx, rx) = mpsc::channel::<ActorMessage<A>>(32);
let handle_reader = reader.clone();
std::thread::spawn(move || {
run_actor::<A>(writer, view, reader, rx, config);
});
Ok(AggregateHandle {
sender: tx,
reader: handle_reader,
})
}
pub fn spawn_actor<A: Aggregate>(stream_dir: &Path) -> io::Result<AggregateHandle<A>> {
let config = ActorConfig {
idle_timeout: Duration::from_secs(u64::MAX / 2),
};
spawn_actor_with_config(stream_dir, config)
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use tempfile::TempDir;
use super::*;
use crate::aggregate::test_fixtures::{Counter, CounterCommand, CounterError, CounterEvent};
use crate::error::ExecuteError;
#[tokio::test]
async fn execute_increment_three_times() {
let tmp = TempDir::new().expect("failed to create temp dir");
let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");
let ctx = CommandContext::default();
for _ in 0..3 {
handle
.execute(CounterCommand::Increment, ctx.clone())
.await
.expect("execute should succeed");
}
let state = handle.state().await.expect("state should succeed");
assert_eq!(state.value, 3);
}
#[tokio::test]
async fn execute_decrement_at_zero_returns_domain_error() {
let tmp = TempDir::new().expect("failed to create temp dir");
let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");
let result = handle
.execute(CounterCommand::Decrement, CommandContext::default())
.await;
assert!(
matches!(result, Err(ExecuteError::Domain(CounterError::AlreadyZero))),
"expected Domain(AlreadyZero), got: {result:?}"
);
}
#[tokio::test]
async fn state_persists_across_respawn() {
let tmp = TempDir::new().expect("failed to create temp dir");
{
let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");
let ctx = CommandContext::default();
handle
.execute(CounterCommand::Increment, ctx.clone())
.await
.expect("first increment should succeed");
handle
.execute(CounterCommand::Increment, ctx)
.await
.expect("second increment should succeed");
}
tokio::time::sleep(Duration::from_millis(50)).await;
let handle = spawn_actor::<Counter>(tmp.path()).expect("respawn should succeed");
let state = handle.state().await.expect("state should succeed");
assert_eq!(state.value, 2);
}
#[tokio::test]
async fn sequential_commands_correct() {
let tmp = TempDir::new().expect("failed to create temp dir");
let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");
let ctx = CommandContext::default();
handle
.execute(CounterCommand::Increment, ctx.clone())
.await
.expect("increment should succeed");
handle
.execute(CounterCommand::Add(10), ctx.clone())
.await
.expect("add should succeed");
handle
.execute(CounterCommand::Decrement, ctx)
.await
.expect("decrement should succeed");
let state = handle.state().await.expect("state should succeed");
assert_eq!(state.value, 10);
}
#[tokio::test]
async fn execute_returns_produced_events() {
let tmp = TempDir::new().expect("failed to create temp dir");
let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");
let events = handle
.execute(CounterCommand::Increment, CommandContext::default())
.await
.expect("execute should succeed");
assert_eq!(events, vec![CounterEvent::Incremented]);
}
#[tokio::test]
async fn idle_timeout_shuts_down_actor() {
let tmp = TempDir::new().expect("failed to create temp dir");
let config = ActorConfig {
idle_timeout: Duration::from_millis(200),
};
let handle =
spawn_actor_with_config::<Counter>(tmp.path(), config).expect("spawn should succeed");
handle
.execute(CounterCommand::Increment, CommandContext::default())
.await
.expect("first execute should succeed");
tokio::time::sleep(Duration::from_millis(400)).await;
assert!(
!handle.is_alive(),
"actor should be dead after idle timeout"
);
let config2 = ActorConfig {
idle_timeout: Duration::from_secs(u64::MAX / 2),
};
let handle2 = spawn_actor_with_config::<Counter>(tmp.path(), config2)
.expect("respawn should succeed");
let state = handle2.state().await.expect("state should succeed");
assert_eq!(state.value, 1, "state should reflect the first command");
}
#[tokio::test]
async fn inject_via_actor_appends_and_updates_state() {
let tmp = TempDir::new().expect("failed to create temp dir");
let handle = spawn_actor::<Counter>(tmp.path()).expect("spawn_actor should succeed");
let event = eventfold::Event::new("Incremented", serde_json::Value::Null);
handle
.inject_via_actor(event)
.await
.expect("inject_via_actor should succeed");
let state = handle.state().await.expect("state should succeed");
assert_eq!(
state.value, 1,
"injected Incremented event should bump counter to 1"
);
}
#[tokio::test]
async fn rapid_commands_prevent_idle_eviction() {
let tmp = TempDir::new().expect("failed to create temp dir");
let config = ActorConfig {
idle_timeout: Duration::from_millis(300),
};
let handle =
spawn_actor_with_config::<Counter>(tmp.path(), config).expect("spawn should succeed");
let ctx = CommandContext::default();
for _ in 0..5 {
handle
.execute(CounterCommand::Increment, ctx.clone())
.await
.expect("execute should succeed");
tokio::time::sleep(Duration::from_millis(100)).await;
}
assert!(
handle.is_alive(),
"actor should still be alive during activity"
);
let state = handle.state().await.expect("state should succeed");
assert_eq!(state.value, 5);
}
}