Skip to main content

coerce/persistent/journal/
mod.rs

1pub mod provider;
2pub mod snapshot;
3pub mod storage;
4pub mod types;
5
6use crate::actor::context::ActorContext;
7use crate::actor::message::{Message, MessageUnwrapErr, MessageWrapErr};
8use crate::persistent::journal::snapshot::Snapshot;
9use crate::persistent::journal::storage::{JournalEntry, JournalStorageRef};
10use crate::persistent::journal::types::{init_journal_types, JournalTypes};
11use crate::persistent::{PersistentActor, Recover, RecoverSnapshot};
12
13use crate::actor::metrics::ActorMetrics;
14use crate::actor::Actor;
15use crate::persistent::batch::EventBatch;
16use std::error::Error;
17use std::fmt::{Display, Formatter};
18use std::marker::PhantomData;
19use std::ops::Range;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23pub mod proto;
24
/// Journal handle for a single persistent actor of type `A`.
///
/// Bundles the persistence id used as the key for every storage call, the
/// sequence bookkeeping maintained by the persist/recover methods, the storage
/// backend handle and the journal type registry for `A`.
25pub struct Journal<A: PersistentActor> {
    // Key handed to the storage backend on every read, write and delete.
26    persistence_id: String,
    // Sequence counter for journal entries; set to 0 by `new` and advanced by
    // the persist and recover methods.
27    last_sequence_id: i64,
    // Sequence id of the latest snapshot persisted or recovered through this
    // journal; `None` until that happens.
28    last_snapshot_sequence_id: Option<i64>,
    // Storage backend that journal entries and snapshots are written to and
    // read from.
29    storage: JournalStorageRef,
    // Shared journal type registry for `A`, obtained from `init_journal_types`.
30    types: Arc<JournalTypes<A>>,
31}
32
33impl<A: PersistentActor> Journal<A> {
34    pub fn new(persistence_id: String, storage: JournalStorageRef) -> Self {
35        let last_sequence_id = 0;
36        let last_snapshot_sequence_id = None;
37        let types = init_journal_types::<A>();
38        Self {
39            persistence_id,
40            last_sequence_id,
41            last_snapshot_sequence_id,
42            storage,
43            types,
44        }
45    }
46
47    pub fn get_types(&self) -> Arc<JournalTypes<A>> {
48        self.types.clone()
49    }
50
51    pub fn last_sequence_id(&self) -> i64 {
52        self.last_sequence_id
53    }
54}
55
/// Shared, dynamically dispatched `RecoveryHandler` for actor type `A`.
56type RecoveryHandlerRef<A> = Arc<dyn RecoveryHandler<A>>;
57
/// Errors reported while recovering an actor from its journal.
58#[derive(Debug)]
59pub enum RecoveryErr {
    // A journalled message could not be decoded into its message type.
    // Carries the decode error, the message and actor type names and the
    // sequence id of the offending journal entry.
60    MessageDeserialisation {
61        error: MessageUnwrapErr,
62        message_type: &'static str,
63        actor_type: &'static str,
64        message_sequence_id: i64,
65    },
66
    // A stored snapshot could not be decoded into its snapshot type.
    // Carries the decode error plus the snapshot and actor type names.
67    SnapshotDeserialisation {
68        error: MessageUnwrapErr,
69        snapshot_type: &'static str,
70        actor_type: &'static str,
71    },
72
    // Displayed as "Snapshot recovery error: ..."; not constructed in this
    // file — presumably wraps a failure to load the snapshot (confirm at the
    // call sites).
73    Snapshot(anyhow::Error),
    // Displayed as "Message recovery error: ..."; not constructed in this
    // file — presumably wraps a failure to load the messages (confirm at the
    // call sites).
74    Messages(anyhow::Error),
75}
76
77impl Display for RecoveryErr {
78    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
79        match &self {
80            RecoveryErr::MessageDeserialisation {
81                error,
82                message_type,
83                actor_type,
84                message_sequence_id,
85            } => {
86                write!(f, "Message deserialisation error (message_type={message_type}, actor_type={actor_type}, sequence_id={sequence_id}) deserialisation error: {error}",
87                       error = error,
88                       message_type = message_type,
89                       actor_type = actor_type,
90                       sequence_id = message_sequence_id
91                )
92            }
93
94            RecoveryErr::SnapshotDeserialisation {
95                error,
96                snapshot_type,
97                actor_type,
98            } => {
99                write!(f, "Snapshot deserialisation error, snapshot_type={snapshot_type}, actor_type={actor_type}, deserialisation error: {error}",
100                       error = error,
101                       snapshot_type = snapshot_type,
102                       actor_type = actor_type
103                )
104            }
105
106            RecoveryErr::Snapshot(e) => {
107                write!(f, "Snapshot recovery error: {error}", error = e)
108            }
109
110            RecoveryErr::Messages(e) => {
111                write!(f, "Message recovery error: {error}", error = e)
112            }
113        }
114    }
115}
116
// Marks `RecoveryErr` as a standard error type; all trait methods use their defaults.
117impl Error for RecoveryErr {}
118
/// Applies one recovered journal payload to an actor of type `A`.
///
/// Implementors receive the raw `bytes` of the payload together with its
/// `sequence_id`, and get mutable access to the actor and its context.
/// Returns `Err(RecoveryErr)` when the payload cannot be applied.
119#[async_trait]
120pub trait RecoveryHandler<A>: 'static + Send + Sync {
121    async fn recover(
122        &self,
123        actor: &mut A,
124        sequence_id: i64,
125        bytes: Vec<u8>,
126        ctx: &mut ActorContext,
127    ) -> Result<(), RecoveryErr>;
128}
129
/// Marker type (holds only `PhantomData`) tying actor type `A` to message type `M`;
/// it implements `RecoveryHandler<A>` when `A: Recover<M>`.
130pub struct MessageRecoveryHandler<A: PersistentActor, M: Message>(PhantomData<A>, PhantomData<M>);
131
/// Marker type (holds only `PhantomData`) tying actor type `A` to snapshot type `S`;
/// it implements `RecoveryHandler<A>` when `A: RecoverSnapshot<S>`.
132pub struct SnapshotRecoveryHandler<A: PersistentActor, S: Snapshot>(PhantomData<A>, PhantomData<S>);
133
134impl<A: PersistentActor, M: Message> MessageRecoveryHandler<A, M> {
135    pub fn new() -> Self {
136        MessageRecoveryHandler(PhantomData, PhantomData)
137    }
138}
139
140impl<A: PersistentActor, S: Snapshot> SnapshotRecoveryHandler<A, S> {
141    pub fn new() -> Self {
142        SnapshotRecoveryHandler(PhantomData, PhantomData)
143    }
144}
145
/// A journal entry or snapshot read back from storage, paired with the handler
/// that knows how to apply it to an actor of type `A`.
146pub struct RecoveredPayload<A: PersistentActor> {
    // Raw serialised payload, handed to the handler on recovery.
147    bytes: Vec<u8>,
    // Sequence id of the journal entry / snapshot the payload came from.
148    sequence: i64,
    // Handler registered for the payload's type.
149    handler: RecoveryHandlerRef<A>,
150}
151
152impl<A: PersistentActor> RecoveredPayload<A> {
153    pub async fn recover(self, actor: &mut A, ctx: &mut ActorContext) -> Result<(), RecoveryErr> {
154        self.handler
155            .recover(actor, self.sequence, self.bytes, ctx)
156            .await
157    }
158}
159
/// Errors returned by the journal's persist operations.
160#[derive(Debug)]
161pub enum PersistErr {
    // The storage backend failed; also the target of `From<anyhow::Error>`.
162    Storage(anyhow::Error),
    // Wraps a `MessageWrapErr`; not constructed in this file — presumably
    // raised when a message cannot be serialised (confirm at the call sites).
163    Serialisation(MessageWrapErr),
    // Wraps another `PersistErr`; not constructed in this file — presumably
    // used when the actor stops because of a persist failure (confirm).
164    ActorStopping(Box<PersistErr>),
    // Not constructed in this file — presumably signals that persistence is
    // not configured (confirm).
165    NotConfigured(),
166}
167
168impl Display for PersistErr {
169    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
170        write!(f, "({:?})", &self)
171    }
172}
173
// Marks `PersistErr` as a standard error type; all trait methods use their defaults.
174impl Error for PersistErr {}
175
/// Reference-counted serialised payload, as accepted by the persist methods.
176type BytesRef = Arc<Vec<u8>>;
177
/// Arguments for `Journal::read_messages`.
178#[derive(Clone, Debug)]
179pub struct ReadMessages<'a> {
    // Persistence id to read from; when `None`, `Journal::read_messages`
    // falls back to the journal's own persistence id.
180    pub persistence_id: Option<&'a str>,
    // Which entries to read: a single message or a range.
181    pub read: Read,
182}
183
184impl<'a> ReadMessages<'a> {
185    pub fn message(persistence_id: Option<&'a str>, sequence_id: i64) -> Self {
186        Self {
187            persistence_id,
188            read: Read::Message { sequence_id },
189        }
190    }
191
192    pub fn range(persistence_id: Option<&'a str>, range: Range<i64>) -> Self {
193        Self {
194            persistence_id,
195            read: Read::Range(range),
196        }
197    }
198}
199
/// Selects which journal entries a `ReadMessages` request targets.
200#[derive(Clone, Debug)]
201pub enum Read {
    // A single entry, looked up by its sequence id.
202    Message { sequence_id: i64 },
    // A span of sequence ids; `start` and `end` are forwarded to the storage
    // backend as-is. NOTE(review): whether `end` is treated as inclusive is
    // decided by the storage implementation — confirm there.
203    Range(Range<i64>),
204}
205
206impl<A: PersistentActor> Journal<A> {
207    pub async fn persist_batch(&mut self, batch: &EventBatch<A>) -> Result<(), PersistErr> {
208        let mut sequence_id = self.last_sequence_id;
209        let batch = batch
210            .entries()
211            .iter()
212            .map(|e| {
213                sequence_id += 1;
214                JournalEntry {
215                    sequence: sequence_id,
216                    payload_type: e.payload_type.clone(),
217                    bytes: e.bytes.clone(),
218                }
219            })
220            .collect();
221
222        let res = self
223            .storage
224            .write_message_batch(&self.persistence_id, batch)
225            .await
226            .map_err(|e| PersistErr::Storage(e));
227
228        if res.is_ok() {
229            self.last_sequence_id = sequence_id;
230        }
231
232        res
233    }
234
235    pub async fn read_messages(
236        &mut self,
237        args: ReadMessages<'_>,
238    ) -> anyhow::Result<Option<Vec<JournalEntry>>> {
239        let persistence_id = args.persistence_id.unwrap_or(self.persistence_id.as_ref());
240        match args.read {
241            Read::Message { sequence_id } => Ok(self
242                .storage
243                .read_message(persistence_id, sequence_id)
244                .await?
245                .map(|m| vec![m])),
246
247            Read::Range(range) => Ok(self
248                .storage
249                .read_messages(persistence_id, range.start, range.end)
250                .await?),
251        }
252    }
253
254    pub async fn persist_message<M: Message>(&mut self, bytes: BytesRef) -> Result<(), PersistErr>
255    where
256        A: Recover<M>,
257    {
258        trace!(
259            "persisting message, persistence_id={}, message_type={}",
260            &self.persistence_id,
261            M::type_name()
262        );
263
264        let payload_type = self
265            .types
266            .message_type_mapping::<M>()
267            .expect("message type not configured");
268
269        self.storage
270            .write_message(
271                &self.persistence_id,
272                JournalEntry {
273                    sequence: self.last_sequence_id,
274                    payload_type: payload_type.clone(),
275                    bytes,
276                },
277            )
278            .await?;
279
280        debug!(
281            "persisted message, persistence_id={}, message_type={}",
282            &self.persistence_id,
283            M::type_name()
284        );
285
286        self.last_sequence_id += 1;
287        Ok(())
288    }
289
290    pub async fn persist_snapshot<S: Snapshot>(
291        &mut self,
292        bytes: BytesRef,
293    ) -> Result<(), PersistErr> {
294        debug!(
295            "persisting snapshot, persistence_id={}",
296            &self.persistence_id
297        );
298
299        let payload_type = self
300            .types
301            .snapshot_type_mapping::<S>()
302            .expect("snapshot type not configured");
303
304        let sequence = self.last_sequence_id + 1;
305
306        self.storage
307            .write_snapshot(
308                &self.persistence_id,
309                JournalEntry {
310                    sequence,
311                    payload_type,
312                    bytes,
313                },
314            )
315            .await?;
316
317        self.last_sequence_id = sequence;
318        self.last_snapshot_sequence_id = Some(sequence);
319        Ok(())
320    }
321
322    pub async fn recover_snapshot(&mut self) -> Result<Option<RecoveredPayload<A>>, anyhow::Error> {
323        if let Some(raw_snapshot) = self
324            .storage
325            .read_latest_snapshot(&self.persistence_id)
326            .await?
327        {
328            let handler = self
329                .types
330                .recoverable_snapshots()
331                .get(raw_snapshot.payload_type.as_ref());
332
333            let sequence = raw_snapshot.sequence;
334            let bytes =
335                Arc::try_unwrap(raw_snapshot.bytes).map_or_else(|e| e.as_ref().clone(), |s| s);
336
337            self.last_sequence_id = sequence;
338            self.last_snapshot_sequence_id = Some(sequence);
339
340            debug!(
341                "snapshot recovered (persistence_id={}), last sequence={}, type={}",
342                &self.persistence_id, &self.last_sequence_id, &raw_snapshot.payload_type
343            );
344
345            Ok(handler.map(|handler| RecoveredPayload {
346                bytes,
347                sequence,
348                handler: handler.clone(),
349            }))
350        } else {
351            Ok(None)
352        }
353    }
354
355    pub async fn recover_messages(
356        &mut self,
357    ) -> Result<Option<Vec<RecoveredPayload<A>>>, anyhow::Error> {
358        // TODO: route journal recovery through a system that we can apply limiting to so we can only
359        //       recover {n} entities at a time, so we don't end up bringing down
360        //       the storage backend
361        if let Some(messages) = self
362            .storage
363            .read_latest_messages(&self.persistence_id, self.last_sequence_id)
364            .await?
365        {
366            let starting_sequence = self.last_sequence_id;
367            let mut recoverable_messages = vec![];
368            for entry in messages {
369                self.last_sequence_id = entry.sequence;
370
371                if let Some(handler) = self
372                    .types
373                    .recoverable_messages()
374                    .get(entry.payload_type.as_ref())
375                {
376                    trace!(
377                        "message recovered (persistence_id={}), sequence={}, starting_sequence={} type={}",
378                        &self.persistence_id,
379                        &self.last_sequence_id,
380                        starting_sequence,
381                        &entry.payload_type
382                    );
383
384                    let bytes =
385                        Arc::try_unwrap(entry.bytes).map_or_else(|e| e.as_ref().clone(), |s| s);
386
387                    recoverable_messages.push(RecoveredPayload {
388                        bytes,
389                        sequence: entry.sequence,
390                        handler: handler.clone(),
391                    })
392                } else {
393                    error!("persistence_id={} recovered message (type={}) but actor is not configured to process it",  &self.persistence_id, &entry.payload_type);
394                    // TODO: this should fail recovery
395                }
396            }
397
398            trace!(
399                "recovery complete, last_sequence_id={}",
400                &self.last_sequence_id
401            );
402
403            if recoverable_messages.len() == 0 {
404                Ok(None)
405            } else {
406                Ok(Some(recoverable_messages))
407            }
408        } else {
409            Ok(None)
410        }
411    }
412
413    pub async fn clear(&mut self) -> bool {
414        self.storage.delete_all(&self.persistence_id).await.is_ok()
415    }
416
417    pub async fn clear_old_messages(&mut self) -> bool {
418        if let Some(snapshot_sequence_id) = self.last_snapshot_sequence_id {
419            self.storage
420                .delete_messages_to(&self.persistence_id, snapshot_sequence_id)
421                .await
422                .is_ok()
423        } else {
424            // no messages to delete
425            false
426        }
427    }
428}
429
430#[async_trait]
431impl<A: PersistentActor, M: Message> RecoveryHandler<A> for MessageRecoveryHandler<A, M>
432where
433    A: Recover<M>,
434{
435    async fn recover(
436        &self,
437        actor: &mut A,
438        sequence_id: i64,
439        bytes: Vec<u8>,
440        ctx: &mut ActorContext,
441    ) -> Result<(), RecoveryErr> {
442        let message =
443            M::from_bytes(bytes).map_err(|error| RecoveryErr::MessageDeserialisation {
444                error,
445                message_type: M::type_name(),
446                actor_type: A::type_name(),
447                message_sequence_id: sequence_id,
448            })?;
449
450        let start = Instant::now();
451
452        Recover::recover(actor, message, ctx).await;
453        let message_processing_took = start.elapsed();
454
455        ActorMetrics::incr_messages_processed(
456            A::type_name(),
457            M::type_name(),
458            Duration::default(),
459            message_processing_took,
460        );
461
462        Ok(())
463    }
464}
465
466#[async_trait]
467impl<A: PersistentActor, S: Snapshot> RecoveryHandler<A> for SnapshotRecoveryHandler<A, S>
468where
469    A: RecoverSnapshot<S>,
470{
471    async fn recover(
472        &self,
473        actor: &mut A,
474        _sequence_id: i64,
475        bytes: Vec<u8>,
476        ctx: &mut ActorContext,
477    ) -> Result<(), RecoveryErr> {
478        RecoverSnapshot::recover(
479            actor,
480            S::from_remote_envelope(bytes).map_err(|error| {
481                RecoveryErr::SnapshotDeserialisation {
482                    error,
483                    snapshot_type: S::type_name(),
484                    actor_type: A::type_name(),
485                }
486            })?,
487            ctx,
488        )
489        .await;
490
491        Ok(())
492    }
493}
494
495impl From<anyhow::Error> for PersistErr {
496    fn from(e: anyhow::Error) -> Self {
497        Self::Storage(e)
498    }
499}