coerce 0.8.6

Async actor runtime and distributed systems framework
Documentation
use crate::actor::context::{ActorContext, ActorStatus};
use crate::actor::message::Message;
use crate::actor::system::ActorSystem;
use crate::actor::{Actor, ActorId, BoxedActorRef};

use crate::persistent::failure::{should_retry, PersistFailurePolicy, RecoveryFailurePolicy};
use crate::persistent::journal::snapshot::Snapshot;
use crate::persistent::journal::types::JournalTypes;
use crate::persistent::journal::{PersistErr, RecoveryErr};
use crate::persistent::recovery::{ActorRecovery, Recovery};

use crate::persistent::batch::EventBatch;
use crate::persistent::storage::JournalEntry;
use crate::persistent::ReadMessages;
use std::sync::Arc;

#[async_trait]
pub trait PersistentActor: 'static + Sized + Send + Sync {
    fn persistence_key(&self, ctx: &ActorContext) -> String {
        ctx.id().to_string()
    }

    fn configure(types: &mut JournalTypes<Self>);

    async fn pre_recovery(&mut self, _ctx: &mut ActorContext) {}

    async fn post_recovery(&mut self, _ctx: &mut ActorContext) {}

    async fn stopped(&mut self, _ctx: &mut ActorContext) {}

    async fn persist<M: Message>(
        &self,
        message: &M,
        ctx: &mut ActorContext,
    ) -> Result<(), PersistErr>
    where
        Self: Recover<M>,
    {
        let message_bytes = message.as_bytes();
        match message_bytes {
            Ok(bytes) => {
                let mut attempts = 1;
                let bytes = Arc::new(bytes);
                loop {
                    let result = ctx
                        .persistence_mut()
                        .journal_mut::<Self>()
                        .persist_message::<M>(bytes.clone())
                        .await;

                    if let Some(res) = check(result, &mut attempts, self, ctx).await {
                        return res;
                    }
                }
            }

            Err(e) => return Err(PersistErr::Serialisation(e)),
        }
    }

    async fn persist_batch(
        &self,
        batch: EventBatch<Self>,
        ctx: &mut ActorContext,
    ) -> Result<(), PersistErr> {
        if batch.entries().is_empty() {
            return Ok(());
        }
        let mut attempts = 1;
        loop {
            let result = ctx
                .persistence_mut()
                .journal_mut::<Self>()
                .persist_batch(&batch)
                .await;

            if let Some(res) = check(result, &mut attempts, self, ctx).await {
                return res;
            }
        }
    }

    async fn snapshot<S: Snapshot>(
        &self,
        snapshot: S,
        ctx: &mut ActorContext,
    ) -> Result<(), PersistErr>
    where
        Self: RecoverSnapshot<S>,
    {
        let snapshot_bytes = snapshot.into_remote_envelope();
        match snapshot_bytes {
            Ok(bytes) => {
                let bytes = Arc::new(bytes.into_bytes());

                let mut attempts = 1;
                loop {
                    let result = ctx
                        .persistence_mut()
                        .journal_mut::<Self>()
                        .persist_snapshot::<S>(bytes.clone())
                        .await;

                    if let Some(res) = check(result, &mut attempts, self, ctx).await {
                        return res;
                    }
                }
            }

            Err(e) => return Err(PersistErr::Serialisation(e)),
        }
    }

    async fn recover(&mut self, persistence_key: String, ctx: &mut ActorContext) -> Recovery<Self> {
        ActorRecovery::recover_journal(self, Some(persistence_key), ctx).await
    }

    fn last_sequence_id(&self, ctx: &mut ActorContext) -> i64 {
        ctx.persistence_mut()
            .journal_mut::<Self>()
            .last_sequence_id()
    }

    async fn read_messages<'a>(
        &self,
        args: ReadMessages<'a>,
        ctx: &mut ActorContext,
    ) -> anyhow::Result<Option<Vec<JournalEntry>>> {
        ctx.persistence_mut()
            .journal_mut::<Self>()
            .read_messages(args)
            .await
    }

    async fn clear_old_messages(&mut self, ctx: &mut ActorContext) -> bool {
        ctx.persistence_mut()
            .journal_mut::<Self>()
            .clear_old_messages()
            .await
    }

    fn recovery_failure_policy(&self) -> RecoveryFailurePolicy {
        RecoveryFailurePolicy::default()
    }

    fn persist_failure_policy(&self) -> PersistFailurePolicy {
        PersistFailurePolicy::default()
    }

    fn event_batch(&self, ctx: &ActorContext) -> EventBatch<Self> {
        EventBatch::create(ctx)
    }

    async fn on_recovery_err(&mut self, _err: RecoveryErr, _ctx: &mut ActorContext) {}

    async fn on_recovery_failed(&mut self, _ctx: &mut ActorContext) {}

    async fn on_child_stopped(&mut self, _id: &ActorId, _ctx: &mut ActorContext) {}
}

#[async_trait]
pub trait Recover<M: Message> {
    async fn recover(&mut self, message: M, ctx: &mut ActorContext);
}

#[async_trait]
pub trait RecoverSnapshot<S: Snapshot> {
    async fn recover(&mut self, snapshot: S, ctx: &mut ActorContext);
}

async fn check<A: PersistentActor>(
    result: Result<(), PersistErr>,
    attempts: &mut usize,
    actor: &A,
    ctx: &mut ActorContext,
) -> Option<Result<(), PersistErr>> {
    match result {
        Ok(res) => return Some(Ok(res)),
        Err(e) => {
            let failure_policy = actor.persist_failure_policy();

            error!("persist failed, error={error}, actor_id={actor_id}, policy={failure_policy}, attempt={attempt}",
                error = e,
                actor_id = ctx.id(),
                failure_policy = &failure_policy,
                attempt = attempts
            );

            match failure_policy {
                PersistFailurePolicy::Retry(retry_policy) => {
                    if !should_retry(ctx, &attempts, retry_policy).await {
                        return Some(Err(e));
                    }
                }

                PersistFailurePolicy::ReturnErr => {
                    return Some(Err(e));
                }

                PersistFailurePolicy::StopActor => {
                    ctx.stop(None);
                    return Some(Err(PersistErr::ActorStopping(Box::new(e))));
                }

                PersistFailurePolicy::Panic => {
                    panic!("persist failed");
                }
            }

            *attempts += 1;
            None
        }
    }
}

#[async_trait]
impl<A: 'static + PersistentActor + Send + Sync> Actor for A
where
    A: Sized,
{
    fn new_context(
        &self,
        system: Option<ActorSystem>,
        status: ActorStatus,
        boxed_ref: BoxedActorRef,
    ) -> ActorContext {
        ActorContext::new(system, status, boxed_ref).with_persistence()
    }

    async fn started(&mut self, ctx: &mut ActorContext) {
        trace!("persistent actor starting, loading journal");

        self.pre_recovery(ctx).await;

        let persistence_key = self.persistence_key(ctx);
        let (snapshot, messages) = {
            let journal = self.recover(persistence_key.clone(), ctx).await;
            match journal {
                Recovery::Recovered(journal) => {
                    trace!(
                        "persistent actor ({}) recovered {} snapshot(s) and {} message(s)",
                        &persistence_key,
                        if journal.snapshot.is_some() { 1 } else { 0 },
                        journal.messages.as_ref().map_or(0, |m| m.len()),
                    );

                    (journal.snapshot, journal.messages)
                }
                Recovery::Disabled => {
                    trace!(
                        "persistent actor ({}) disabled message recovery",
                        &persistence_key,
                    );

                    (None, None)
                }
                Recovery::Failed => {
                    trace!("recovery failed, ctx_status={:?}", ctx.get_status());
                    self.on_recovery_failed(ctx).await;
                    return;
                }
            }
        };

        if let Some(snapshot) = snapshot {
            if let Err(e) = snapshot.recover(self, ctx).await {
                error!("Error while attempting to recover from a snapshot, error={error}, actor_id={actor_id}, persistence_key={persistence_key}",
                    error = &e,
                    actor_id = ctx.id(),
                    persistence_key = &persistence_key
                );

                self.on_recovery_err(e, ctx).await;
                ctx.stop(None);
                return;
            }
        }

        if let Some(messages) = messages {
            for message in messages {
                if let Err(e) = message.recover(self, ctx).await {
                    error!("Error while attempting to recover from a message, error={error}, actor_id={actor_id}, persistence_key={persistence_key}",
                        error = &e,
                        actor_id = ctx.id(),
                        persistence_key = &persistence_key
                    );

                    self.on_recovery_err(e, ctx).await;
                    ctx.stop(None);
                    return;
                }
            }
        }

        self.post_recovery(ctx).await;
    }

    async fn stopped(&mut self, ctx: &mut ActorContext) {
        trace!("persistent actor stopped");
        self.stopped(ctx).await
    }

    async fn on_child_stopped(&mut self, id: &ActorId, ctx: &mut ActorContext) {
        self.on_child_stopped(id, ctx).await
    }
}