Expand description
§ave-actors-store
Event-sourced persistence for actors built on ave-actors-actor. Actors record state changes as an immutable event log and recover their state on restart by replaying events and loading snapshots.
This crate is part of the ave-actors workspace.
§Core concepts
| Concept | Description |
|---|---|
PersistentActor | Trait extending Actor with event sourcing — implement apply and call persist |
LightPersistence | Strategy: stores event + state snapshot on every write (fast recovery) |
FullPersistence | Strategy: stores events only, replays on recovery (smaller footprint, full audit trail) |
InitializedActor<A> | Required wrapper returned by PersistentActor::initial(params) |
DbManager<C, S> | Backend factory trait — implement to plug in a custom database |
Collection | Ordered key-value storage for the event log |
State | Single-value storage for state snapshots |
§Main API
| API | Receives | Returns | Purpose |
|---|---|---|---|
PersistentActor::create_initial | InitParams | Self | Builds the base state used on first start and recovery without a snapshot |
PersistentActor::initial | InitParams | InitializedActor<Self> | Wraps the actor so the actor system accepts it as persistent |
PersistentActor::apply | &Event | Result<(), ActorError> | Applies one event to in-memory state |
PersistentActor::start_store | store name, optional prefix, ActorContext, backend manager, optional EncryptedKey | Result<(), ActorError> | Opens the backend and recovers persisted state into the actor |
PersistentActor::persist | &Event, ActorContext | Result<(), ActorError> | Applies and durably records one event, rolling back memory on failure |
PersistentActor::snapshot | ActorContext | Result<(), ActorError> | Forces an immediate snapshot of the current actor state |
§Quick start
use ave_actors_actor::{
Actor, ActorContext, ActorPath, Error as ActorError, Event, Handler,
Message, Response,
};
use ave_actors_store::{
store::{FullPersistence, PersistentActor},
memory::MemoryManager,
};
use async_trait::async_trait;
use borsh::{BorshDeserialize, BorshSerialize};
use serde::{Deserialize, Serialize};
use tracing::info_span;
// --- Events ---
#[derive(Debug, Clone, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
enum CounterEvent {
Incremented(i32),
}
impl Event for CounterEvent {}
// --- Messages & responses ---
#[derive(Debug, Clone, Serialize, Deserialize)]
enum CounterMsg { Increment(i32), GetValue }
#[derive(Debug, Clone, PartialEq)]
enum CounterResp { Ok, Value(i32) }
impl Message for CounterMsg {}
impl Response for CounterResp {}
// --- Actor ---
#[derive(Debug, Clone, Default, BorshSerialize, BorshDeserialize)]
struct Counter { value: i32 }
#[async_trait]
impl Actor for Counter {
type Message = CounterMsg;
type Event = CounterEvent;
type Response = CounterResp;
fn get_span(id: &str, _parent: Option<tracing::Span>) -> tracing::Span {
info_span!("Counter", id)
}
async fn pre_start(&mut self, ctx: &mut ActorContext<Self>) -> Result<(), ActorError> {
// start_store creates the "store" child actor, opens the backend,
// and recovers any previously persisted state into `self`.
self.start_store("counter", None, ctx, MemoryManager::default(), None).await
}
}
#[async_trait]
impl Handler<Counter> for Counter {
async fn handle_message(
&mut self,
_sender: ActorPath,
msg: CounterMsg,
ctx: &mut ActorContext<Self>,
) -> Result<CounterResp, ActorError> {
match msg {
CounterMsg::Increment(n) => {
// persist applies the event to self and saves it durably.
// If persistence fails, the in-memory state is rolled back.
self.persist(&CounterEvent::Incremented(n), ctx).await?;
Ok(CounterResp::Ok)
}
CounterMsg::GetValue => Ok(CounterResp::Value(self.value)),
}
}
}
// --- PersistentActor ---
impl PersistentActor for Counter {
type Persistence = FullPersistence;
type InitParams = ();
fn create_initial(_params: ()) -> Self {
Self::default()
}
fn apply(&mut self, event: &CounterEvent) -> Result<(), ActorError> {
match event {
CounterEvent::Incremented(n) => self.value += n,
}
Ok(())
}
}Create and use the actor:
use ave_actors_actor::{ActorSystem, ActorRef};
use ave_actors_store::store::PersistentActor;
use tokio_util::sync::CancellationToken;
#[tokio::main]
async fn main() {
let graceful = CancellationToken::new();
let crash = CancellationToken::new();
let (system, mut runner) = ActorSystem::create(graceful, crash);
// Use PersistentActor::initial() — the only accepted way to create a persistent actor.
let counter: ActorRef<Counter> = system
.create_root_actor("counter", Counter::initial(()))
.await
.unwrap();
counter.ask(CounterMsg::Increment(10)).await.unwrap();
let resp = counter.ask(CounterMsg::GetValue).await.unwrap();
assert_eq!(resp, CounterResp::Value(10));
system.stop_system();
runner.run().await;
}§Persistence strategies
| Strategy | Write | Recovery | When to use |
|---|---|---|---|
LightPersistence | Event + state snapshot | Load snapshot (no replay) | When recovery speed matters most |
FullPersistence | Event only | Load last snapshot + replay remaining events | When storage efficiency or a full audit trail matters |
For FullPersistence, snapshots are taken automatically every N events (default: 100). Override to tune:
fn snapshot_every() -> Option<u64> {
Some(50) // snapshot every 50 events
}Set compact_on_snapshot() -> bool to true to delete events already covered by a snapshot and reduce storage use:
fn compact_on_snapshot() -> bool {
true
}§Encryption
Pass an [EncryptedKey] to start_store to encrypt all events and snapshots at rest using XChaCha20-Poly1305:
use ave_actors_actor::EncryptedKey;
async fn pre_start(&mut self, ctx: &mut ActorContext<Self>) -> Result<(), ActorError> {
let raw_key: [u8; 32] = /* load from secure storage */;
let key = EncryptedKey::new(&raw_key).map_err(|e| ActorError::Helper {
name: "key".into(),
reason: e.to_string(),
})?;
self.start_store("my-actor", None, ctx, MemoryManager::default(), Some(key)).await
}The key itself is held in memory encrypted via ASCON AEAD (see ave-actors-actor’s EncryptedKey).
§Storage backends
| Crate | Backend | Use case |
|---|---|---|
ave-actors-store (built-in) | MemoryManager | Tests and ephemeral state |
ave-actors-sqlite | SQLite (via rusqlite) | Single-node, embedded persistence |
ave-actors-rocksdb | RocksDB | High-throughput, multi-actor workloads |
All three backends implement DbManager<C, S> and can be swapped without changing actor code.
§Implementing a custom backend
Implement DbManager, Collection, and State from ave_actors_store::database:
use ave_actors_store::database::{DbManager, Collection, State};
struct MyManager { /* ... */ }
struct MyStore { /* ... */ }
impl DbManager<MyStore, MyStore> for MyManager {
fn create_collection(&self, name: &str, prefix: &str) -> Result<MyStore, Error> { /* ... */ }
fn create_state(&self, name: &str, prefix: &str) -> Result<MyStore, Error> { /* ... */ }
}
impl Collection for MyStore { /* ... */ }
impl State for MyStore { /* ... */ }Use the test_store_trait! macro from ave_actors_store to verify your implementation:
ave_actors_store::test_store_trait! {
my_backend_tests: MyManager: MyStore
}Re-exports§
pub use error::Error;pub use error::StoreOperation;pub use store::InitializedActor;
Modules§
- config
- database
- Storage backend traits:
DbManager,Collection, andState. - error
- Error types for the store system.
- memory
- In-memory
DbManagerbackend, intended for tests and ephemeral usage. - store
- Event-sourced persistence for actors via
PersistentActor.