1pub mod provider;
2pub mod snapshot;
3pub mod storage;
4pub mod types;
5
6use crate::actor::context::ActorContext;
7use crate::actor::message::{Message, MessageUnwrapErr, MessageWrapErr};
8use crate::persistent::journal::snapshot::Snapshot;
9use crate::persistent::journal::storage::{JournalEntry, JournalStorageRef};
10use crate::persistent::journal::types::{init_journal_types, JournalTypes};
11use crate::persistent::{PersistentActor, Recover, RecoverSnapshot};
12
13use crate::actor::metrics::ActorMetrics;
14use crate::actor::Actor;
15use crate::persistent::batch::EventBatch;
16use std::error::Error;
17use std::fmt::{Display, Formatter};
18use std::marker::PhantomData;
19use std::ops::Range;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23pub mod proto;
24
25pub struct Journal<A: PersistentActor> {
26 persistence_id: String,
27 last_sequence_id: i64,
28 last_snapshot_sequence_id: Option<i64>,
29 storage: JournalStorageRef,
30 types: Arc<JournalTypes<A>>,
31}
32
33impl<A: PersistentActor> Journal<A> {
34 pub fn new(persistence_id: String, storage: JournalStorageRef) -> Self {
35 let last_sequence_id = 0;
36 let last_snapshot_sequence_id = None;
37 let types = init_journal_types::<A>();
38 Self {
39 persistence_id,
40 last_sequence_id,
41 last_snapshot_sequence_id,
42 storage,
43 types,
44 }
45 }
46
47 pub fn get_types(&self) -> Arc<JournalTypes<A>> {
48 self.types.clone()
49 }
50
51 pub fn last_sequence_id(&self) -> i64 {
52 self.last_sequence_id
53 }
54}
55
/// Shared, dynamically dispatched handle to a [`RecoveryHandler`] for actor type `A`.
type RecoveryHandlerRef<A> = Arc<dyn RecoveryHandler<A>>;
57
/// Errors that can be reported while recovering an actor from its journal.
#[derive(Debug)]
pub enum RecoveryErr {
    /// A journalled message could not be turned back into its message type.
    MessageDeserialisation {
        /// Underlying unwrap/deserialisation failure.
        error: MessageUnwrapErr,
        /// Type name of the message that failed to deserialise.
        message_type: &'static str,
        /// Type name of the actor being recovered.
        actor_type: &'static str,
        /// Sequence id of the journal entry that failed.
        message_sequence_id: i64,
    },

    /// A stored snapshot could not be turned back into its snapshot type.
    SnapshotDeserialisation {
        /// Underlying unwrap/deserialisation failure.
        error: MessageUnwrapErr,
        /// Type name of the snapshot that failed to deserialise.
        snapshot_type: &'static str,
        /// Type name of the actor being recovered.
        actor_type: &'static str,
    },

    /// Snapshot recovery failed with the wrapped error (rendered as
    /// "Snapshot recovery error" by `Display`).
    Snapshot(anyhow::Error),
    /// Message recovery failed with the wrapped error (rendered as
    /// "Message recovery error" by `Display`).
    Messages(anyhow::Error),
}
76
77impl Display for RecoveryErr {
78 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
79 match &self {
80 RecoveryErr::MessageDeserialisation {
81 error,
82 message_type,
83 actor_type,
84 message_sequence_id,
85 } => {
86 write!(f, "Message deserialisation error (message_type={message_type}, actor_type={actor_type}, sequence_id={sequence_id}) deserialisation error: {error}",
87 error = error,
88 message_type = message_type,
89 actor_type = actor_type,
90 sequence_id = message_sequence_id
91 )
92 }
93
94 RecoveryErr::SnapshotDeserialisation {
95 error,
96 snapshot_type,
97 actor_type,
98 } => {
99 write!(f, "Snapshot deserialisation error, snapshot_type={snapshot_type}, actor_type={actor_type}, deserialisation error: {error}",
100 error = error,
101 snapshot_type = snapshot_type,
102 actor_type = actor_type
103 )
104 }
105
106 RecoveryErr::Snapshot(e) => {
107 write!(f, "Snapshot recovery error: {error}", error = e)
108 }
109
110 RecoveryErr::Messages(e) => {
111 write!(f, "Message recovery error: {error}", error = e)
112 }
113 }
114 }
115}
116
// Relies on the default `Error` methods; the `Display` impl and derived `Debug` supply the text.
impl Error for RecoveryErr {}
118
/// Type-erased hook that applies one recovered payload to an actor.
///
/// Implementations in this file deserialise `bytes` into a concrete message
/// or snapshot type and forward it to the actor's recovery method.
#[async_trait]
pub trait RecoveryHandler<A>: 'static + Send + Sync {
    /// Applies the payload stored at `sequence_id` to `actor`.
    ///
    /// # Errors
    /// Returns a [`RecoveryErr`] when the payload cannot be deserialised.
    async fn recover(
        &self,
        actor: &mut A,
        sequence_id: i64,
        bytes: Vec<u8>,
        ctx: &mut ActorContext,
    ) -> Result<(), RecoveryErr>;
}
129
/// Zero-sized [`RecoveryHandler`] that replays journalled messages of type `M` into actor `A`.
pub struct MessageRecoveryHandler<A: PersistentActor, M: Message>(PhantomData<A>, PhantomData<M>);
131
/// Zero-sized [`RecoveryHandler`] that restores snapshots of type `S` into actor `A`.
pub struct SnapshotRecoveryHandler<A: PersistentActor, S: Snapshot>(PhantomData<A>, PhantomData<S>);
133
134impl<A: PersistentActor, M: Message> MessageRecoveryHandler<A, M> {
135 pub fn new() -> Self {
136 MessageRecoveryHandler(PhantomData, PhantomData)
137 }
138}
139
140impl<A: PersistentActor, S: Snapshot> SnapshotRecoveryHandler<A, S> {
141 pub fn new() -> Self {
142 SnapshotRecoveryHandler(PhantomData, PhantomData)
143 }
144}
145
146pub struct RecoveredPayload<A: PersistentActor> {
147 bytes: Vec<u8>,
148 sequence: i64,
149 handler: RecoveryHandlerRef<A>,
150}
151
152impl<A: PersistentActor> RecoveredPayload<A> {
153 pub async fn recover(self, actor: &mut A, ctx: &mut ActorContext) -> Result<(), RecoveryErr> {
154 self.handler
155 .recover(actor, self.sequence, self.bytes, ctx)
156 .await
157 }
158}
159
/// Errors that can be reported while persisting messages or snapshots.
#[derive(Debug)]
pub enum PersistErr {
    /// The storage backend reported an error; also the target of `From<anyhow::Error>`.
    Storage(anyhow::Error),
    /// Wraps a `MessageWrapErr` (presumably raised while serialising the payload; not constructed in this file).
    Serialisation(MessageWrapErr),
    /// Wraps another `PersistErr`; NOTE(review): presumably the failure that caused the actor to stop, confirm at call sites.
    ActorStopping(Box<PersistErr>),
    /// NOTE(review): not constructed in this file; presumably signals missing persistence configuration.
    NotConfigured(),
}
167
168impl Display for PersistErr {
169 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
170 write!(f, "({:?})", &self)
171 }
172}
173
// Relies on the default `Error` methods; the `Display` impl and derived `Debug` supply the text.
impl Error for PersistErr {}
175
/// Reference-counted byte buffer; the persist methods accept serialised payloads in this form.
type BytesRef = Arc<Vec<u8>>;
177
/// Arguments for `Journal::read_messages`.
#[derive(Clone, Debug)]
pub struct ReadMessages<'a> {
    /// Persistence id to read from; when `None`, the journal's own id is used.
    pub persistence_id: Option<&'a str>,
    /// Which entries to read.
    pub read: Read,
}

impl<'a> ReadMessages<'a> {
    /// Builds a request for the single entry stored at `sequence_id`.
    pub fn message(persistence_id: Option<&'a str>, sequence_id: i64) -> Self {
        let read = Read::Message { sequence_id };
        ReadMessages {
            persistence_id,
            read,
        }
    }

    /// Builds a request for the entries covered by `range`.
    pub fn range(persistence_id: Option<&'a str>, range: Range<i64>) -> Self {
        let read = Read::Range(range);
        ReadMessages {
            persistence_id,
            read,
        }
    }
}

/// Selects which journal entries a [`ReadMessages`] request targets.
#[derive(Clone, Debug)]
pub enum Read {
    /// Exactly one entry, identified by its sequence id.
    Message { sequence_id: i64 },
    /// A span of sequence ids; its `start` and `end` are passed to storage as given.
    Range(Range<i64>),
}
205
206impl<A: PersistentActor> Journal<A> {
207 pub async fn persist_batch(&mut self, batch: &EventBatch<A>) -> Result<(), PersistErr> {
208 let mut sequence_id = self.last_sequence_id;
209 let batch = batch
210 .entries()
211 .iter()
212 .map(|e| {
213 sequence_id += 1;
214 JournalEntry {
215 sequence: sequence_id,
216 payload_type: e.payload_type.clone(),
217 bytes: e.bytes.clone(),
218 }
219 })
220 .collect();
221
222 let res = self
223 .storage
224 .write_message_batch(&self.persistence_id, batch)
225 .await
226 .map_err(|e| PersistErr::Storage(e));
227
228 if res.is_ok() {
229 self.last_sequence_id = sequence_id;
230 }
231
232 res
233 }
234
235 pub async fn read_messages(
236 &mut self,
237 args: ReadMessages<'_>,
238 ) -> anyhow::Result<Option<Vec<JournalEntry>>> {
239 let persistence_id = args.persistence_id.unwrap_or(self.persistence_id.as_ref());
240 match args.read {
241 Read::Message { sequence_id } => Ok(self
242 .storage
243 .read_message(persistence_id, sequence_id)
244 .await?
245 .map(|m| vec![m])),
246
247 Read::Range(range) => Ok(self
248 .storage
249 .read_messages(persistence_id, range.start, range.end)
250 .await?),
251 }
252 }
253
254 pub async fn persist_message<M: Message>(&mut self, bytes: BytesRef) -> Result<(), PersistErr>
255 where
256 A: Recover<M>,
257 {
258 trace!(
259 "persisting message, persistence_id={}, message_type={}",
260 &self.persistence_id,
261 M::type_name()
262 );
263
264 let payload_type = self
265 .types
266 .message_type_mapping::<M>()
267 .expect("message type not configured");
268
269 self.storage
270 .write_message(
271 &self.persistence_id,
272 JournalEntry {
273 sequence: self.last_sequence_id,
274 payload_type: payload_type.clone(),
275 bytes,
276 },
277 )
278 .await?;
279
280 debug!(
281 "persisted message, persistence_id={}, message_type={}",
282 &self.persistence_id,
283 M::type_name()
284 );
285
286 self.last_sequence_id += 1;
287 Ok(())
288 }
289
290 pub async fn persist_snapshot<S: Snapshot>(
291 &mut self,
292 bytes: BytesRef,
293 ) -> Result<(), PersistErr> {
294 debug!(
295 "persisting snapshot, persistence_id={}",
296 &self.persistence_id
297 );
298
299 let payload_type = self
300 .types
301 .snapshot_type_mapping::<S>()
302 .expect("snapshot type not configured");
303
304 let sequence = self.last_sequence_id + 1;
305
306 self.storage
307 .write_snapshot(
308 &self.persistence_id,
309 JournalEntry {
310 sequence,
311 payload_type,
312 bytes,
313 },
314 )
315 .await?;
316
317 self.last_sequence_id = sequence;
318 self.last_snapshot_sequence_id = Some(sequence);
319 Ok(())
320 }
321
322 pub async fn recover_snapshot(&mut self) -> Result<Option<RecoveredPayload<A>>, anyhow::Error> {
323 if let Some(raw_snapshot) = self
324 .storage
325 .read_latest_snapshot(&self.persistence_id)
326 .await?
327 {
328 let handler = self
329 .types
330 .recoverable_snapshots()
331 .get(raw_snapshot.payload_type.as_ref());
332
333 let sequence = raw_snapshot.sequence;
334 let bytes =
335 Arc::try_unwrap(raw_snapshot.bytes).map_or_else(|e| e.as_ref().clone(), |s| s);
336
337 self.last_sequence_id = sequence;
338 self.last_snapshot_sequence_id = Some(sequence);
339
340 debug!(
341 "snapshot recovered (persistence_id={}), last sequence={}, type={}",
342 &self.persistence_id, &self.last_sequence_id, &raw_snapshot.payload_type
343 );
344
345 Ok(handler.map(|handler| RecoveredPayload {
346 bytes,
347 sequence,
348 handler: handler.clone(),
349 }))
350 } else {
351 Ok(None)
352 }
353 }
354
355 pub async fn recover_messages(
356 &mut self,
357 ) -> Result<Option<Vec<RecoveredPayload<A>>>, anyhow::Error> {
358 if let Some(messages) = self
362 .storage
363 .read_latest_messages(&self.persistence_id, self.last_sequence_id)
364 .await?
365 {
366 let starting_sequence = self.last_sequence_id;
367 let mut recoverable_messages = vec![];
368 for entry in messages {
369 self.last_sequence_id = entry.sequence;
370
371 if let Some(handler) = self
372 .types
373 .recoverable_messages()
374 .get(entry.payload_type.as_ref())
375 {
376 trace!(
377 "message recovered (persistence_id={}), sequence={}, starting_sequence={} type={}",
378 &self.persistence_id,
379 &self.last_sequence_id,
380 starting_sequence,
381 &entry.payload_type
382 );
383
384 let bytes =
385 Arc::try_unwrap(entry.bytes).map_or_else(|e| e.as_ref().clone(), |s| s);
386
387 recoverable_messages.push(RecoveredPayload {
388 bytes,
389 sequence: entry.sequence,
390 handler: handler.clone(),
391 })
392 } else {
393 error!("persistence_id={} recovered message (type={}) but actor is not configured to process it", &self.persistence_id, &entry.payload_type);
394 }
396 }
397
398 trace!(
399 "recovery complete, last_sequence_id={}",
400 &self.last_sequence_id
401 );
402
403 if recoverable_messages.len() == 0 {
404 Ok(None)
405 } else {
406 Ok(Some(recoverable_messages))
407 }
408 } else {
409 Ok(None)
410 }
411 }
412
413 pub async fn clear(&mut self) -> bool {
414 self.storage.delete_all(&self.persistence_id).await.is_ok()
415 }
416
417 pub async fn clear_old_messages(&mut self) -> bool {
418 if let Some(snapshot_sequence_id) = self.last_snapshot_sequence_id {
419 self.storage
420 .delete_messages_to(&self.persistence_id, snapshot_sequence_id)
421 .await
422 .is_ok()
423 } else {
424 false
426 }
427 }
428}
429
430#[async_trait]
431impl<A: PersistentActor, M: Message> RecoveryHandler<A> for MessageRecoveryHandler<A, M>
432where
433 A: Recover<M>,
434{
435 async fn recover(
436 &self,
437 actor: &mut A,
438 sequence_id: i64,
439 bytes: Vec<u8>,
440 ctx: &mut ActorContext,
441 ) -> Result<(), RecoveryErr> {
442 let message =
443 M::from_bytes(bytes).map_err(|error| RecoveryErr::MessageDeserialisation {
444 error,
445 message_type: M::type_name(),
446 actor_type: A::type_name(),
447 message_sequence_id: sequence_id,
448 })?;
449
450 let start = Instant::now();
451
452 Recover::recover(actor, message, ctx).await;
453 let message_processing_took = start.elapsed();
454
455 ActorMetrics::incr_messages_processed(
456 A::type_name(),
457 M::type_name(),
458 Duration::default(),
459 message_processing_took,
460 );
461
462 Ok(())
463 }
464}
465
466#[async_trait]
467impl<A: PersistentActor, S: Snapshot> RecoveryHandler<A> for SnapshotRecoveryHandler<A, S>
468where
469 A: RecoverSnapshot<S>,
470{
471 async fn recover(
472 &self,
473 actor: &mut A,
474 _sequence_id: i64,
475 bytes: Vec<u8>,
476 ctx: &mut ActorContext,
477 ) -> Result<(), RecoveryErr> {
478 RecoverSnapshot::recover(
479 actor,
480 S::from_remote_envelope(bytes).map_err(|error| {
481 RecoveryErr::SnapshotDeserialisation {
482 error,
483 snapshot_type: S::type_name(),
484 actor_type: A::type_name(),
485 }
486 })?,
487 ctx,
488 )
489 .await;
490
491 Ok(())
492 }
493}
494
495impl From<anyhow::Error> for PersistErr {
496 fn from(e: anyhow::Error) -> Self {
497 Self::Storage(e)
498 }
499}