use async_trait::async_trait;
use dactor::actor::Actor;
use dactor::persistence::*;
/// Events that change the running total of a `CounterActor`.
#[derive(Debug, Clone)]
enum CounterEvent {
    /// Adds the carried amount to the counter.
    Add(i64),
    /// Subtracts the carried amount from the counter.
    Subtract(i64),
}
/// Test actor that keeps a single signed counter plus the sequence id it
/// reports through `last_sequence_id`.
struct CounterActor {
    /// Entity identifier; paired with the literal `"Counter"` when building
    /// the persistence id.
    id: String,
    /// Current counter total.
    value: i64,
    /// Sequence id returned by `last_sequence_id` and overwritten by
    /// `set_last_sequence_id`.
    last_seq: SequenceId,
}
impl CounterActor {
    /// Creates a counter for the given entity id, starting at value `0` and
    /// sequence id `0`.
    fn new(id: &str) -> Self {
        let id = id.to_owned();
        let last_seq = SequenceId(0);
        Self {
            id,
            value: 0,
            last_seq,
        }
    }
}
/// `Actor` wiring for the counter: it takes no arguments and no dependencies.
impl Actor for CounterActor {
    type Args = ();
    type Deps = ();

    /// Ignores both inputs and builds a counter whose id is `"default"`.
    fn create(_args: (), _deps: ()) -> Self {
        Self::new("default")
    }
}
impl PersistentActor for CounterActor {
    /// Builds the persistence id from the literal `"Counter"` and this
    /// actor's `id` field.
    fn persistence_id(&self) -> PersistenceId {
        let entity_id = &self.id;
        PersistenceId::new("Counter", entity_id)
    }
}
#[async_trait]
impl EventSourced for CounterActor {
type Event = CounterEvent;
fn apply(&mut self, event: &CounterEvent) {
match event {
CounterEvent::Add(v) => self.value += v,
CounterEvent::Subtract(v) => self.value -= v,
}
}
fn serialize_event(&self, event: &CounterEvent) -> Result<Vec<u8>, PersistError> {
let bytes = match event {
CounterEvent::Add(v) => [b"A:".as_slice(), &v.to_le_bytes()].concat(),
CounterEvent::Subtract(v) => [b"S:".as_slice(), &v.to_le_bytes()].concat(),
};
Ok(bytes)
}
fn deserialize_event(&self, payload: &[u8]) -> Result<CounterEvent, PersistError> {
if payload.len() != 10 {
return Err(PersistError::SerializationFailed(
format!("expected 10 bytes, got {}", payload.len()),
));
}
let val = i64::from_le_bytes(payload[2..10].try_into().unwrap());
match &payload[..2] {
b"A:" => Ok(CounterEvent::Add(val)),
b"S:" => Ok(CounterEvent::Subtract(val)),
_ => Err(PersistError::SerializationFailed("unknown tag".into())),
}
}
fn snapshot_payload(&self) -> Result<Vec<u8>, PersistError> {
Ok(self.value.to_le_bytes().to_vec())
}
fn restore_snapshot(&mut self, payload: Vec<u8>) -> Result<(), PersistError> {
if payload.len() != 8 {
return Err(PersistError::SerializationFailed("bad snapshot len".into()));
}
self.value = i64::from_le_bytes(payload.try_into().unwrap());
Ok(())
}
fn last_sequence_id(&self) -> SequenceId {
self.last_seq
}
fn set_last_sequence_id(&mut self, seq: SequenceId) {
self.last_seq = seq;
}
}
/// Test actor whose whole state is a single string.
struct ConfigActor {
    /// Entity identifier; paired with the literal `"Config"` when building
    /// the persistence id.
    id: String,
    /// The state that is serialized and restored as UTF-8 bytes.
    data: String,
}
impl ConfigActor {
    /// Creates a config actor for the given entity id with empty `data`.
    fn new(id: &str) -> Self {
        let id = id.to_owned();
        let data = String::new();
        Self { id, data }
    }
}
/// `Actor` wiring for the config actor: it takes no arguments and no
/// dependencies.
impl Actor for ConfigActor {
    type Args = ();
    type Deps = ();

    /// Ignores both inputs and builds a config actor whose id is `"default"`.
    fn create(_args: (), _deps: ()) -> Self {
        Self::new("default")
    }
}
impl PersistentActor for ConfigActor {
    /// Builds the persistence id from the literal `"Config"` and this
    /// actor's `id` field.
    fn persistence_id(&self) -> PersistenceId {
        let entity_id = &self.id;
        PersistenceId::new("Config", entity_id)
    }
}
/// Durable-state hooks for `ConfigActor`: the state is the UTF-8 bytes of
/// `data`.
#[async_trait]
impl DurableState for ConfigActor {
    /// Returns a copy of `data` as raw bytes. Never returns `Err`.
    fn serialize_state(&self) -> Result<Vec<u8>, PersistError> {
        Ok(self.data.clone().into_bytes())
    }

    /// Replaces `data` with the payload decoded as UTF-8.
    ///
    /// Returns `PersistError::SerializationFailed` and leaves `data`
    /// untouched when the payload is not valid UTF-8.
    fn restore_state(&mut self, payload: Vec<u8>) -> Result<(), PersistError> {
        match String::from_utf8(payload) {
            Ok(text) => {
                self.data = text;
                Ok(())
            }
            Err(err) => Err(PersistError::SerializationFailed(err.to_string())),
        }
    }
}
/// Persists five events, takes a snapshot, persists three more, and checks
/// the counter value and sequence id along the way.
#[tokio::test]
async fn test_event_sourced_full_lifecycle() {
    let store = InMemoryStorage::new();
    let mut counter = CounterActor::new("lifecycle");

    // Each step pairs an event with the total expected right after it.
    let steps = [
        (CounterEvent::Add(10), 10),
        (CounterEvent::Add(5), 15),
        (CounterEvent::Subtract(3), 12),
        (CounterEvent::Add(10), 22),
        (CounterEvent::Subtract(5), 17),
    ];
    for (i, (event, expected)) in steps.into_iter().enumerate() {
        counter.persist(event, &store).await.unwrap();
        assert_eq!(counter.value, expected, "mismatch after event {}", i);
    }
    assert_eq!(counter.last_sequence_id(), SequenceId(5));

    counter.snapshot(&store).await.unwrap();

    // Sequence ids are expected to keep counting up after the snapshot.
    let after_snapshot = [
        CounterEvent::Add(100),
        CounterEvent::Subtract(7),
        CounterEvent::Add(3),
    ];
    for event in after_snapshot {
        counter.persist(event, &store).await.unwrap();
    }
    assert_eq!(counter.value, 113);
    assert_eq!(counter.last_sequence_id(), SequenceId(8));
}
/// With no snapshot stored, recovering a fresh actor must reproduce the
/// value and sequence id of the actor that wrote the ten events.
#[tokio::test]
async fn test_event_sourced_recovery_from_events() {
    let store = InMemoryStorage::new();
    let expected_sum: i64 = (1..=10).sum();

    let mut writer = CounterActor::new("evonly");
    for amount in 1..=10_i64 {
        let event = CounterEvent::Add(amount);
        writer.persist(event, &store).await.unwrap();
    }
    assert_eq!(writer.value, expected_sum);

    let mut recovered = CounterActor::new("evonly");
    recover_event_sourced(&mut recovered, &store, &store)
        .await
        .unwrap();
    assert_eq!(recovered.value, expected_sum);
    assert_eq!(recovered.last_sequence_id(), SequenceId(10));
}
/// Five additions, a snapshot, then five subtractions: a recovered actor
/// must end at the same value and sequence id as the writer.
#[tokio::test]
async fn test_event_sourced_recovery_with_snapshot() {
    let store = InMemoryStorage::new();
    let mut writer = CounterActor::new("snaprec");

    // 10 + 20 + 30 + 40 + 50 = 150
    for step in 1..=5_i64 {
        let event = CounterEvent::Add(step * 10);
        writer.persist(event, &store).await.unwrap();
    }
    assert_eq!(writer.value, 150);

    writer.snapshot(&store).await.unwrap();

    // 150 - (1 + 2 + 3 + 4 + 5) = 135
    for amount in 1..=5_i64 {
        let event = CounterEvent::Subtract(amount);
        writer.persist(event, &store).await.unwrap();
    }
    assert_eq!(writer.value, 135);

    let mut recovered = CounterActor::new("snaprec");
    recover_event_sourced(&mut recovered, &store, &store)
        .await
        .unwrap();
    assert_eq!(recovered.value, 135);
    assert_eq!(recovered.last_sequence_id(), SequenceId(10));
}
/// Saves a config actor's state and checks that a fresh actor with the same
/// id recovers it.
#[tokio::test]
async fn test_durable_state_save_and_recover() {
    let store = InMemoryStorage::new();

    let mut config = ConfigActor::new("cfg1");
    config.data = String::from("persistent-value-42");
    DurableState::save_state(&config, &store as &dyn StateStorage)
        .await
        .unwrap();

    // The fresh actor starts empty, so a match afterwards can only come
    // from recovery.
    let mut recovered = ConfigActor::new("cfg1");
    assert_eq!(recovered.data, "");
    recover_durable_state(&mut recovered, &store).await.unwrap();
    assert_eq!(recovered.data, "persistent-value-42");
}
/// Persists ten events in one batch and checks the returned sequence id,
/// the stored entries, and that recovery reproduces the same state.
#[tokio::test]
async fn test_event_sourced_batch_persist() {
    let store = InMemoryStorage::new();
    let expected_sum: i64 = (1..=10).sum();

    let mut writer = CounterActor::new("batch");
    let batch: Vec<CounterEvent> = (1..=10).map(CounterEvent::Add).collect();
    let seq = writer.persist_batch(batch, &store).await.unwrap();
    assert_eq!(writer.value, expected_sum);
    assert_eq!(seq, SequenceId(10));
    assert_eq!(writer.last_sequence_id(), SequenceId(10));

    // Reading from sequence id 1 must return one entry per batched event.
    let pid = writer.persistence_id();
    let entries = store.read_events(&pid, SequenceId(1)).await.unwrap();
    assert_eq!(entries.len(), 10);

    let mut recovered = CounterActor::new("batch");
    recover_event_sourced(&mut recovered, &store, &store)
        .await
        .unwrap();
    assert_eq!(
        recovered.value, expected_sum,
        "recovered state should match batch sum"
    );
    assert_eq!(recovered.last_sequence_id(), SequenceId(10));
}
/// After a snapshot, deleting every event up to sequence id 10 must leave
/// no events behind and must not prevent recovery of the full state.
#[tokio::test]
async fn test_journal_cleanup_after_snapshot() {
    let store = InMemoryStorage::new();
    let mut writer = CounterActor::new("cleanup");

    // 2 + 4 + ... + 20 = 110
    for step in 1..=10_i64 {
        let event = CounterEvent::Add(step * 2);
        writer.persist(event, &store).await.unwrap();
    }
    assert_eq!(writer.value, 110);

    writer.snapshot(&store).await.unwrap();

    let pid = writer.persistence_id();
    store.delete_events_to(&pid, SequenceId(10)).await.unwrap();
    let leftover = store.read_events(&pid, SequenceId(1)).await.unwrap();
    assert!(leftover.is_empty());

    let mut recovered = CounterActor::new("cleanup");
    recover_event_sourced(&mut recovered, &store, &store)
        .await
        .unwrap();
    assert_eq!(recovered.value, 110);
    assert_eq!(recovered.last_sequence_id(), SequenceId(10));
}
/// Recovering against an empty store must succeed and leave the actor at
/// its initial value and sequence id.
#[tokio::test]
async fn test_empty_recovery_creates_fresh_actor() {
    let store = InMemoryStorage::new();
    let mut counter = CounterActor::new("fresh");

    recover_event_sourced(&mut counter, &store, &store)
        .await
        .unwrap();

    assert_eq!(counter.value, 0);
    assert_eq!(counter.last_sequence_id(), SequenceId(0));
}
/// Two counters with different ids share one store; each must recover only
/// its own events.
#[tokio::test]
async fn test_multiple_actors_independent_persistence() {
    let store = InMemoryStorage::new();

    // "alpha": 10 + 20 + 30 = 60 over three events.
    let mut alpha = CounterActor::new("alpha");
    for amount in [10, 20, 30] {
        let event = CounterEvent::Add(amount);
        alpha.persist(event, &store).await.unwrap();
    }
    assert_eq!(alpha.value, 60);

    // "beta": -5 + 100 = 95 over two events.
    let mut beta = CounterActor::new("beta");
    for event in [CounterEvent::Subtract(5), CounterEvent::Add(100)] {
        beta.persist(event, &store).await.unwrap();
    }
    assert_eq!(beta.value, 95);

    let mut recovered_alpha = CounterActor::new("alpha");
    recover_event_sourced(&mut recovered_alpha, &store, &store)
        .await
        .unwrap();
    assert_eq!(recovered_alpha.value, 60);
    assert_eq!(recovered_alpha.last_sequence_id(), SequenceId(3));

    let mut recovered_beta = CounterActor::new("beta");
    recover_event_sourced(&mut recovered_beta, &store, &store)
        .await
        .unwrap();
    assert_eq!(recovered_beta.value, 95);
    assert_eq!(recovered_beta.last_sequence_id(), SequenceId(2));
}
/// A valid event followed by a two-byte payload, which `deserialize_event`
/// rejects because it is not 10 bytes long, must make recovery fail and
/// leave the recovering actor at its initial value.
#[tokio::test]
async fn test_deserialize_corrupted_event_fails() {
    let store = InMemoryStorage::new();
    let pid = PersistenceId::new("Counter", "corrupt-ev");

    // Sequence id 1: a well-formed event written through the actor.
    let mut writer = CounterActor::new("corrupt-ev");
    writer
        .persist(CounterEvent::Add(100), &store)
        .await
        .unwrap();

    // Sequence id 2: raw bytes written straight into the store.
    let garbage: [u8; 2] = [0xFF, 0xFF];
    store
        .write_event(&pid, SequenceId(2), "garbage", &garbage)
        .await
        .unwrap();

    let mut recovering = CounterActor::new("corrupt-ev");
    let outcome = recover_event_sourced(&mut recovering, &store, &store).await;
    assert!(
        outcome.is_err(),
        "recovery should fail on corrupted event payload"
    );
    assert_eq!(
        recovering.value, 0,
        "actor state should not change on failed recovery"
    );
}
/// A two-byte snapshot, which `restore_snapshot` rejects because it is not
/// eight bytes long, must make recovery fail and leave the recovering actor
/// at its initial value.
#[tokio::test]
async fn test_deserialize_corrupted_snapshot_fails() {
    let store = InMemoryStorage::new();
    let pid = PersistenceId::new("Counter", "corrupt-snap");

    let mut writer = CounterActor::new("corrupt-snap");
    writer.persist(CounterEvent::Add(1), &store).await.unwrap();

    // Store a malformed snapshot directly, bypassing the actor.
    let bad_snapshot: [u8; 2] = [0xDE, 0xAD];
    store
        .save_snapshot(&pid, SequenceId(1), &bad_snapshot)
        .await
        .unwrap();

    let mut recovering = CounterActor::new("corrupt-snap");
    let outcome = recover_event_sourced(&mut recovering, &store, &store).await;
    assert!(
        outcome.is_err(),
        "recovery should fail on corrupted snapshot payload"
    );
    assert_eq!(
        recovering.value, 0,
        "actor state should not change on failed recovery"
    );
}
/// A stored state that is not valid UTF-8 must make durable-state recovery
/// fail and leave the actor's `data` empty.
#[tokio::test]
async fn test_durable_state_corrupted_payload() {
    let store = InMemoryStorage::new();
    let pid = PersistenceId::new("Config", "corrupt-cfg");

    // Store invalid UTF-8 directly, bypassing the actor.
    let invalid_utf8: [u8; 3] = [0xFF, 0xFE, 0x80];
    store.save_state(&pid, &invalid_utf8).await.unwrap();

    let mut config = ConfigActor::new("corrupt-cfg");
    let outcome = recover_durable_state(&mut config, &store).await;
    assert!(
        outcome.is_err(),
        "recovery should fail on non-UTF-8 state payload"
    );
    assert_eq!(
        config.data, "",
        "actor state should not change on failed recovery"
    );
}
/// Persisting an empty batch must succeed, return sequence id 0, leave the
/// value untouched and write nothing to the store.
#[tokio::test]
async fn test_persist_batch_empty() {
    let store = InMemoryStorage::new();
    let mut counter = CounterActor::new("empty-batch");

    let seq = counter.persist_batch(Vec::new(), &store).await.unwrap();
    assert_eq!(seq, SequenceId(0));
    assert_eq!(counter.value, 0);

    let pid = counter.persistence_id();
    let entries = store.read_events(&pid, SequenceId(1)).await.unwrap();
    assert!(
        entries.is_empty(),
        "no entries should be written for an empty batch"
    );
}
/// After the saved state is deleted, recovery must still succeed and leave
/// a fresh actor with empty `data`.
#[tokio::test]
async fn test_durable_state_delete_then_recover() {
    let store = InMemoryStorage::new();

    let mut config = ConfigActor::new("del-cfg");
    config.data = String::from("important-data");
    DurableState::save_state(&config, &store as &dyn StateStorage)
        .await
        .unwrap();

    let pid = config.persistence_id();
    store.delete_state(&pid).await.unwrap();

    let mut recovered = ConfigActor::new("del-cfg");
    recover_durable_state(&mut recovered, &store).await.unwrap();
    assert_eq!(
        recovered.data, "",
        "after delete, recovery should yield default state"
    );
}
/// Stores a snapshot at sequence id 5, deletes events up to that id, and
/// checks that events 6-10 remain and recovery reaches the full total.
#[tokio::test]
async fn test_snapshot_cleanup_preserves_later_events() {
    let store = InMemoryStorage::new();
    let mut writer = CounterActor::new("snap-cleanup");

    // 1 + 2 + ... + 10 = 55
    for amount in 1..=10_i64 {
        let event = CounterEvent::Add(amount);
        writer.persist(event, &store).await.unwrap();
    }
    assert_eq!(writer.value, 55);

    // Hand-written snapshot: 15 is the total after events 1-5, encoded the
    // way `restore_snapshot` expects (eight little-endian bytes).
    let pid = PersistenceId::new("Counter", "snap-cleanup");
    let snapshot_value: i64 = 15;
    let snapshot_bytes = snapshot_value.to_le_bytes();
    store
        .save_snapshot(&pid, SequenceId(5), &snapshot_bytes)
        .await
        .unwrap();

    store.delete_events_to(&pid, SequenceId(5)).await.unwrap();
    let remaining = store.read_events(&pid, SequenceId(1)).await.unwrap();
    assert_eq!(remaining.len(), 5, "events 6-10 should remain");
    assert_eq!(remaining[0].sequence_id, SequenceId(6));

    let mut recovered = CounterActor::new("snap-cleanup");
    recover_event_sourced(&mut recovered, &store, &store)
        .await
        .unwrap();
    assert_eq!(recovered.value, 55);
    assert_eq!(recovered.last_sequence_id(), SequenceId(10));
}
/// Saving state twice for the same actor must make recovery return the
/// most recently saved value.
#[tokio::test]
async fn test_durable_state_overwrite() {
    let store = InMemoryStorage::new();
    let mut config = ConfigActor::new("overwrite");

    // Save "v1", then overwrite it with "v2".
    for version in ["v1", "v2"] {
        config.data = version.to_string();
        DurableState::save_state(&config, &store as &dyn StateStorage)
            .await
            .unwrap();
    }

    let mut recovered = ConfigActor::new("overwrite");
    recover_durable_state(&mut recovered, &store).await.unwrap();
    assert_eq!(recovered.data, "v2");
}
/// With every event deleted after the snapshot, recovery must still restore
/// both the value and the sequence id.
#[tokio::test]
async fn test_recovery_from_snapshot_without_events() {
    let store = InMemoryStorage::new();
    let mut writer = CounterActor::new("snap-only");

    // 3 + 6 + 9 + 12 + 15 = 45
    for step in 1..=5_i64 {
        let event = CounterEvent::Add(step * 3);
        writer.persist(event, &store).await.unwrap();
    }
    assert_eq!(writer.value, 45);

    writer.snapshot(&store).await.unwrap();

    let pid = writer.persistence_id();
    store.delete_events_to(&pid, SequenceId(5)).await.unwrap();
    let leftover = store.read_events(&pid, SequenceId(1)).await.unwrap();
    assert!(leftover.is_empty());

    let mut recovered = CounterActor::new("snap-only");
    recover_event_sourced(&mut recovered, &store, &store)
        .await
        .unwrap();
    assert_eq!(recovered.value, 45);
    assert_eq!(recovered.last_sequence_id(), SequenceId(5));
}