atomr_persistence/
persistent_actor.rs1use std::sync::Arc;
4
5use async_trait::async_trait;
6
7use crate::journal::{Journal, JournalError, PersistentRepr};
8use crate::snapshot::SnapshotStore;
9
10#[async_trait]
11pub trait PersistentActor: Send + 'static {
12 type Command: Send + 'static;
13 type Event: Send + Clone + 'static;
14 type State: Default + Send + 'static;
15
16 fn persistence_id(&self) -> String;
17
18 fn command_to_events(&self, state: &Self::State, cmd: Self::Command) -> Vec<Self::Event>;
19
20 fn apply_event(state: &mut Self::State, event: &Self::Event);
21
22 fn encode_event(event: &Self::Event) -> Vec<u8>;
23 fn decode_event(bytes: &[u8]) -> Self::Event;
24
25 async fn recover<J: Journal>(
26 &self,
27 journal: Arc<J>,
28 state: &mut Self::State,
29 writer_uuid: &str,
30 ) -> Result<u64, JournalError> {
31 let highest = journal.highest_sequence_nr(&self.persistence_id(), 0).await?;
32 let events = journal.replay_messages(&self.persistence_id(), 1, highest, u64::MAX).await?;
33 for e in &events {
34 let evt = Self::decode_event(&e.payload);
35 Self::apply_event(state, &evt);
36 }
37 let _ = writer_uuid;
38 Ok(highest)
39 }
40
41 async fn handle_command<J: Journal>(
42 &self,
43 journal: Arc<J>,
44 state: &mut Self::State,
45 next_seq: &mut u64,
46 writer_uuid: &str,
47 cmd: Self::Command,
48 ) -> Result<(), JournalError> {
49 let events = self.command_to_events(state, cmd);
50 let mut reprs = Vec::with_capacity(events.len());
51 for e in &events {
52 *next_seq += 1;
53 reprs.push(PersistentRepr {
54 persistence_id: self.persistence_id(),
55 sequence_nr: *next_seq,
56 payload: Self::encode_event(e),
57 manifest: "evt".into(),
58 writer_uuid: writer_uuid.into(),
59 deleted: false,
60 tags: Vec::new(),
61 });
62 }
63 journal.write_messages(reprs).await?;
64 for e in &events {
65 Self::apply_event(state, e);
66 }
67 Ok(())
68 }
69
70 async fn save_snapshot<S: SnapshotStore>(&self, store: Arc<S>, sequence_nr: u64, payload: Vec<u8>) {
71 store
72 .save(
73 crate::snapshot::SnapshotMetadata {
74 persistence_id: self.persistence_id(),
75 sequence_nr,
76 timestamp: 0,
77 },
78 payload,
79 )
80 .await;
81 }
82}