use std::{any::type_name, fmt::Debug, future::Future};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::{debug, warn};
use crate::{
actor::{Actor, ActorArgs, ActorId},
actor_ref::ActorRef,
context::{Context, RootContext, spawn_with_id_impl},
};
pub trait PersistentStorage: Send + Sync {
fn try_read(&self, id: ActorId) -> impl Future<Output = Result<Vec<u8>, anyhow::Error>> + Send;
fn try_write(
&self,
id: ActorId,
bytes: Vec<u8>,
) -> impl Future<Output = Result<(), anyhow::Error>> + Send;
}
pub trait PersistentActor: Actor {
type Snapshot: Debug
+ Clone
+ Send
+ Sync
+ Serialize
+ for<'a> Deserialize<'a>
+ for<'a> From<&'a Self>;
type RuntimeArgs: Debug + Send;
type ActorArgs: ActorArgs<Actor = Self>;
fn persistent_args(
snapshot: Self::Snapshot,
runtime_args: Self::RuntimeArgs,
) -> Self::ActorArgs;
}
pub trait PersistentSpawnExt {
fn spawn_persistent<S, Args>(
&self,
storage: &S,
actor_id: ActorId,
args: Args,
) -> impl Future<Output = Result<ActorRef<Args::Actor>, PersistenceError>> + Send
where
S: PersistentStorage,
Args: ActorArgs,
<Args as ActorArgs>::Actor: PersistentActor;
fn respawn<S, B>(
&self,
storage: &S,
actor_id: ActorId,
runtime_args: B::RuntimeArgs,
) -> impl Future<Output = Result<ActorRef<B>, PersistenceError>> + Send
where
S: PersistentStorage,
B: Actor + PersistentActor;
fn respawn_or<S, Args>(
&self,
storage: &S,
actor_id: ActorId,
runtime_args: impl FnOnce() -> <<Args as ActorArgs>::Actor as PersistentActor>::RuntimeArgs
+ Send,
default_args: impl FnOnce() -> Args + Send,
) -> impl Future<Output = Result<ActorRef<Args::Actor>, PersistenceError>> + Send
where
S: PersistentStorage,
Args: ActorArgs,
<Args as ActorArgs>::Actor: PersistentActor;
}
pub trait PersistentContextExt<A>
where
A: Actor + PersistentActor,
{
fn save_snapshot<S: PersistentStorage>(
&self,
storage: &S,
state: &A,
) -> impl Future<Output = anyhow::Result<()>> + Send;
}
#[derive(Debug, Error)]
pub enum PersistenceError {
#[error(transparent)]
IoError(#[from] anyhow::Error),
#[error(transparent)]
SerializeError(postcard::Error),
#[error(transparent)]
DeserializeError(postcard::Error),
}
impl<A> PersistentSpawnExt for Context<A>
where
A: Actor,
{
async fn spawn_persistent<S, Args>(
&self,
_storage: &S,
actor_id: ActorId,
args: Args,
) -> Result<ActorRef<Args::Actor>, PersistenceError>
where
S: PersistentStorage,
Args: ActorArgs,
<Args as ActorArgs>::Actor: PersistentActor,
{
let (hdl, actor) = spawn_with_id_impl(actor_id, &self.this_hdl, args);
self.child_hdls.lock().unwrap().push(hdl.downgrade());
Ok(actor)
}
async fn respawn<S, B>(
&self,
storage: &S,
actor_id: ActorId,
runtime_args: B::RuntimeArgs,
) -> Result<ActorRef<B>, PersistenceError>
where
S: PersistentStorage,
B: Actor + PersistentActor,
{
let bytes = storage.try_read(actor_id).await?;
let snapshot: B::Snapshot =
postcard::from_bytes(&bytes).map_err(PersistenceError::DeserializeError)?;
let args = B::persistent_args(snapshot, runtime_args);
let actor = self.spawn_persistent(storage, actor_id, args).await?;
Ok(actor)
}
async fn respawn_or<S, Args>(
&self,
storage: &S,
actor_id: ActorId,
runtime_args: impl FnOnce() -> <<Args as ActorArgs>::Actor as PersistentActor>::RuntimeArgs
+ Send,
default_args: impl FnOnce() -> Args + Send,
) -> Result<ActorRef<Args::Actor>, PersistenceError>
where
S: PersistentStorage,
Args: ActorArgs,
<Args as ActorArgs>::Actor: PersistentActor,
{
let bytes = match storage.try_read(actor_id).await {
Err(err) => {
debug!(
actor = format_args!("{}({actor_id})", type_name::<< Args as ActorArgs >::Actor
> ()), % err, "failed to read snapshot"
);
return self
.spawn_persistent(storage, actor_id, default_args())
.await;
}
Ok(bytes) => bytes,
};
let snapshot: <Args::Actor as PersistentActor>::Snapshot = match postcard::from_bytes(
&bytes,
) {
Err(err) => {
warn!(
actor = format_args!("{}({actor_id})", type_name::<< Args as ActorArgs >::Actor
> ()), % err, "failed to deserialize snapshot"
);
return self
.spawn_persistent(storage, actor_id, default_args())
.await;
}
Ok(snapshot) => snapshot,
};
let args = Args::Actor::persistent_args(snapshot, runtime_args());
self.spawn_persistent(storage, actor_id, args).await
}
}
impl PersistentSpawnExt for RootContext {
async fn spawn_persistent<S, Args>(
&self,
_storage: &S,
actor_id: ActorId,
args: Args,
) -> Result<ActorRef<Args::Actor>, PersistenceError>
where
S: PersistentStorage,
Args: ActorArgs,
<Args as ActorArgs>::Actor: PersistentActor,
{
let (hdl, actor) = spawn_with_id_impl(actor_id, &self.this_hdl, args);
self.child_hdls.lock().unwrap().push(hdl.downgrade());
Ok(actor)
}
async fn respawn<S, B>(
&self,
storage: &S,
actor_id: ActorId,
runtime_args: B::RuntimeArgs,
) -> Result<ActorRef<B>, PersistenceError>
where
S: PersistentStorage,
B: Actor + PersistentActor,
{
let bytes = storage.try_read(actor_id).await?;
let snapshot: B::Snapshot =
postcard::from_bytes(&bytes).map_err(PersistenceError::DeserializeError)?;
let args = B::persistent_args(snapshot, runtime_args);
let actor = self.spawn_persistent(storage, actor_id, args).await?;
Ok(actor)
}
async fn respawn_or<S, Args>(
&self,
storage: &S,
actor_id: ActorId,
runtime_args: impl FnOnce() -> <<Args as ActorArgs>::Actor as PersistentActor>::RuntimeArgs,
default_args: impl FnOnce() -> Args,
) -> Result<ActorRef<Args::Actor>, PersistenceError>
where
S: PersistentStorage,
Args: ActorArgs,
<Args as ActorArgs>::Actor: PersistentActor,
{
let bytes = match storage.try_read(actor_id).await {
Err(err) => {
debug!(
actor = format_args!("{}({actor_id})", type_name::<< Args as ActorArgs >::Actor
> ()), % err, "failed to read snapshot"
);
return self
.spawn_persistent(storage, actor_id, default_args())
.await;
}
Ok(bytes) => bytes,
};
let snapshot: <Args::Actor as PersistentActor>::Snapshot = match postcard::from_bytes(
&bytes,
) {
Err(err) => {
warn!(
actor = format_args!("{}({actor_id})", type_name::<< Args as ActorArgs >::Actor
> ()), % err, "failed to deserialize snapshot"
);
return self
.spawn_persistent(storage, actor_id, default_args())
.await;
}
Ok(snapshot) => snapshot,
};
let args = Args::Actor::persistent_args(snapshot, runtime_args());
self.spawn_persistent(storage, actor_id, args).await
}
}
impl<A> PersistentContextExt<A> for Context<A>
where
A: Actor + PersistentActor,
{
fn save_snapshot<S: PersistentStorage>(
&self,
storage: &S,
state: &A,
) -> impl Future<Output = anyhow::Result<()>> + Send {
let snapshot = A::Snapshot::from(state);
let actor_id = self.id();
async move {
storage
.try_write(actor_id, postcard::to_stdvec(&snapshot)?)
.await?;
Ok(())
}
}
}