Skip to main content

atomr_persistence/
persistent_actor.rs

1//! Persistent actor trait.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6
7use crate::journal::{Journal, JournalError, PersistentRepr};
8use crate::snapshot::SnapshotStore;
9
10#[async_trait]
11pub trait PersistentActor: Send + 'static {
12    type Command: Send + 'static;
13    type Event: Send + Clone + 'static;
14    type State: Default + Send + 'static;
15
16    fn persistence_id(&self) -> String;
17
18    fn command_to_events(&self, state: &Self::State, cmd: Self::Command) -> Vec<Self::Event>;
19
20    fn apply_event(state: &mut Self::State, event: &Self::Event);
21
22    fn encode_event(event: &Self::Event) -> Vec<u8>;
23    fn decode_event(bytes: &[u8]) -> Self::Event;
24
25    async fn recover<J: Journal>(
26        &self,
27        journal: Arc<J>,
28        state: &mut Self::State,
29        writer_uuid: &str,
30    ) -> Result<u64, JournalError> {
31        let highest = journal.highest_sequence_nr(&self.persistence_id(), 0).await?;
32        let events = journal.replay_messages(&self.persistence_id(), 1, highest, u64::MAX).await?;
33        for e in &events {
34            let evt = Self::decode_event(&e.payload);
35            Self::apply_event(state, &evt);
36        }
37        let _ = writer_uuid;
38        Ok(highest)
39    }
40
41    async fn handle_command<J: Journal>(
42        &self,
43        journal: Arc<J>,
44        state: &mut Self::State,
45        next_seq: &mut u64,
46        writer_uuid: &str,
47        cmd: Self::Command,
48    ) -> Result<(), JournalError> {
49        let events = self.command_to_events(state, cmd);
50        let mut reprs = Vec::with_capacity(events.len());
51        for e in &events {
52            *next_seq += 1;
53            reprs.push(PersistentRepr {
54                persistence_id: self.persistence_id(),
55                sequence_nr: *next_seq,
56                payload: Self::encode_event(e),
57                manifest: "evt".into(),
58                writer_uuid: writer_uuid.into(),
59                deleted: false,
60                tags: Vec::new(),
61            });
62        }
63        journal.write_messages(reprs).await?;
64        for e in &events {
65            Self::apply_event(state, e);
66        }
67        Ok(())
68    }
69
70    async fn save_snapshot<S: SnapshotStore>(&self, store: Arc<S>, sequence_nr: u64, payload: Vec<u8>) {
71        store
72            .save(
73                crate::snapshot::SnapshotMetadata {
74                    persistence_id: self.persistence_id(),
75                    sequence_nr,
76                    timestamp: 0,
77                },
78                payload,
79            )
80            .await;
81    }
82}