1use crate::{
4 database::{Collection, DbManager, State},
5 error::{Error, StoreOperation},
6};
7
8use ave_actors_actor::{
9 Actor, ActorContext, ActorPath, EncryptedKey, Error as ActorError, Event,
10 Handler, IntoActor, Message, Response,
11};
12
13use async_trait::async_trait;
14
15use borsh::{BorshDeserialize, BorshSerialize};
16
17use chacha20poly1305::{
18 XChaCha20Poly1305, XNonce,
19 aead::{Aead, AeadCore, KeyInit, OsRng},
20};
21
22use serde::{Deserialize, Serialize};
23
24use tracing::{debug, error, info_span, warn};
25
26use std::fmt::Debug;
27
28const NONCE_SIZE: usize = 24;
30
31fn store_error(operation: StoreOperation, reason: impl ToString) -> Error {
32 Error::Store {
33 operation,
34 reason: reason.to_string(),
35 }
36}
37
38fn actor_store_error(
39 operation: StoreOperation,
40 reason: impl ToString,
41) -> ActorError {
42 ActorError::StoreOperation {
43 operation: operation.to_string(),
44 reason: reason.to_string(),
45 }
46}
47
48#[derive(Debug, Clone)]
53pub enum PersistenceType {
54 Light,
59 Full,
64}
65
66pub struct LightPersistence;
68
69pub struct FullPersistence;
71
72pub trait Persistence {
74 fn get_persistence() -> PersistenceType;
76}
77
78impl Persistence for LightPersistence {
79 fn get_persistence() -> PersistenceType {
80 PersistenceType::Light
81 }
82}
83
84impl Persistence for FullPersistence {
85 fn get_persistence() -> PersistenceType {
86 PersistenceType::Full
87 }
88}
89
90#[derive(Debug)]
95pub struct InitializedActor<A>(A);
96
97impl<A> InitializedActor<A> {
98 pub(crate) const fn new(actor: A) -> Self {
99 Self(actor)
100 }
101}
102
103impl<A> IntoActor<A> for InitializedActor<A>
104where
105 A: PersistentActor,
106 A::Event: BorshSerialize + BorshDeserialize,
107{
108 fn into_actor(self) -> A {
109 self.0
110 }
111}
112
113#[async_trait]
122pub trait PersistentActor:
123 Actor + Handler<Self> + Debug + Clone + BorshSerialize + BorshDeserialize
124where
125 Self::Event: BorshSerialize + BorshDeserialize,
126{
127 type Persistence: Persistence;
129
130 type InitParams;
132
133 fn create_initial(params: Self::InitParams) -> Self;
138
139 fn initial(params: Self::InitParams) -> InitializedActor<Self>
144 where
145 Self: Sized,
146 {
147 InitializedActor::new(Self::create_initial(params))
148 }
149
150 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError>;
157
158 fn update(&mut self, state: Self) {
163 *self = state;
164 }
165
166 fn snapshot_every() -> Option<u64> {
174 Some(100)
175 }
176
177 fn compact_on_snapshot() -> bool {
183 false
184 }
185
186 async fn persist(
194 &mut self,
195 event: &Self::Event,
196 ctx: &mut ActorContext<Self>,
197 ) -> Result<(), ActorError> {
198 let store = ctx.get_child::<Store<Self>>("store").await?;
199
200 let prev_state = self.clone();
201
202 if let Err(e) = self.apply(event) {
203 self.update(prev_state.clone());
204 return Err(e);
205 }
206
207 let response = match Self::Persistence::get_persistence() {
208 PersistenceType::Light => {
209 match store
210 .ask(StoreCommand::PersistLight(
211 event.clone(),
212 self.clone(),
213 ))
214 .await
215 {
216 Ok(response) => response,
217 Err(e) => {
218 self.update(prev_state.clone());
219 return Err(actor_store_error(
220 StoreOperation::PersistLight,
221 e,
222 ));
223 }
224 }
225 }
226 PersistenceType::Full => {
227 match store
228 .ask(StoreCommand::PersistFullEvent {
229 event: event.clone(),
230 snapshot_every: Self::snapshot_every(),
231 })
232 .await
233 {
234 Ok(response) => response,
235 Err(e) => {
236 self.update(prev_state.clone());
237 return Err(actor_store_error(
238 StoreOperation::PersistFull,
239 e,
240 ));
241 }
242 }
243 }
244 };
245
246 match response {
247 StoreResponse::Persisted => Ok(()),
248 StoreResponse::SnapshotRequired => {
249 self.snapshot(ctx).await?;
250 Ok(())
251 }
252 _ => {
253 self.update(prev_state);
254 Err(ActorError::UnexpectedResponse {
255 path: ActorPath::from(format!(
256 "{}/store",
257 ctx.path().clone()
258 )),
259 expected:
260 "StoreResponse::Persisted | StoreResponse::SnapshotRequired"
261 .to_owned(),
262 })
263 }
264 }
265 }
266
267 async fn snapshot(
273 &self,
274 ctx: &mut ActorContext<Self>,
275 ) -> Result<(), ActorError> {
276 let store = ctx.get_child::<Store<Self>>("store").await?;
277 store
278 .ask(StoreCommand::Snapshot(self.clone()))
279 .await
280 .map_err(|e| actor_store_error(StoreOperation::Snapshot, e))?;
281 Ok(())
282 }
283
284 async fn start_store<C: Collection, S: State>(
290 &mut self,
291 name: &str,
292 prefix: Option<String>,
293 ctx: &mut ActorContext<Self>,
294 manager: impl DbManager<C, S>,
295 key_box: Option<EncryptedKey>,
296 ) -> Result<(), ActorError> {
297 let prefix = prefix.unwrap_or_else(|| ctx.path().key());
298
299 let store =
300 Store::<Self>::new(name, &prefix, manager, key_box, self.clone())
301 .map_err(|e| actor_store_error(StoreOperation::StoreInit, e))?;
302 let store = ctx.create_child("store", store).await?;
303 let response = store.ask(StoreCommand::Recover).await?;
304
305 if let StoreResponse::State(Some(state)) = response {
306 self.update(state);
307 }
308
309 Ok(())
310 }
311}
312
313pub struct Store<P>
320where
321 P: PersistentActor,
322 P::Event: BorshSerialize + BorshDeserialize,
323{
324 event_counter: u64,
329 state_counter: u64,
333 compacted_until: u64,
335 events: Box<dyn Collection>,
337 states: Box<dyn State>,
339 metadata: Box<dyn State>,
341 key_box: Option<EncryptedKey>,
344 initial_state: P,
347}
348
349impl<P> ave_actors_actor::NotPersistentActor for Store<P>
350where
351 P: PersistentActor,
352 P::Event: BorshSerialize + BorshDeserialize,
353{
354}
355
356#[derive(Debug, Clone, BorshSerialize, BorshDeserialize)]
357struct StoreMetadata {
358 next_event_index: u64,
359 compacted_until: u64,
360}
361
362impl<P> Store<P>
363where
364 P: PersistentActor,
365 P::Event: BorshSerialize + BorshDeserialize,
366{
367 pub fn new<C, S>(
373 name: &str,
374 prefix: &str,
375 manager: impl DbManager<C, S>,
376 key_box: Option<EncryptedKey>,
377 initial_state: P,
378 ) -> Result<Self, Error>
379 where
380 C: Collection + 'static,
381 S: State + 'static,
382 {
383 let events =
384 manager.create_collection(&format!("{}_events", name), prefix)?;
385 let states =
386 manager.create_state(&format!("{}_states", name), prefix)?;
387 let metadata =
388 manager.create_state(&format!("{}_metadata", name), prefix)?;
389
390 let mut store = Self {
391 event_counter: 0,
392 state_counter: 0,
393 compacted_until: 0,
394 events: Box::new(events),
395 states: Box::new(states),
396 metadata: Box::new(metadata),
397 key_box,
398 initial_state,
399 };
400
401 let last_event_counter = if let Some((key, _)) = store.events.last()? {
405 key.parse::<u64>()
406 .map_err(|e| store_error(StoreOperation::ParseEventKey, e))?
407 + 1
408 } else {
409 0
410 };
411
412 let snapshot_counter = if let Some((_, counter)) = store.get_state()? {
413 counter
414 } else {
415 0
416 };
417
418 if let Some(metadata) = store.get_metadata()? {
419 store.event_counter =
420 last_event_counter.max(metadata.next_event_index);
421 store.compacted_until = metadata.compacted_until;
422 } else {
423 store.event_counter = last_event_counter.max(snapshot_counter);
424 }
425
426 debug!(
427 "Initializing Store with event_counter: {}, compacted_until: {}",
428 store.event_counter, store.compacted_until
429 );
430
431 Ok(store)
432 }
433
434 const fn pending_events_since_snapshot(&self) -> u64 {
435 self.event_counter.saturating_sub(self.state_counter)
436 }
437
438 fn get_metadata(&self) -> Result<Option<StoreMetadata>, Error> {
439 let data = match self.metadata.get() {
440 Ok(data) => data,
441 Err(Error::EntryNotFound { .. }) => return Ok(None),
442 Err(err) => return Err(err),
443 };
444
445 let bytes = if let Some(key_box) = &self.key_box {
446 self.decrypt(key_box, data.as_slice())?
447 } else {
448 data
449 };
450
451 let metadata: StoreMetadata =
452 borsh::from_slice(&bytes).map_err(|e| {
453 error!("Can't decode metadata: {}", e);
454 store_error(StoreOperation::DecodeState, e)
455 })?;
456
457 Ok(Some(metadata))
458 }
459
460 fn persist_metadata(&mut self) -> Result<(), Error> {
461 let metadata = StoreMetadata {
462 next_event_index: self.event_counter,
463 compacted_until: self.compacted_until,
464 };
465 let data = borsh::to_vec(&metadata).map_err(|e| {
466 error!("Can't encode metadata: {}", e);
467 store_error(StoreOperation::EncodeActor, e)
468 })?;
469
470 let bytes = if let Some(key_box) = &self.key_box {
471 self.encrypt(key_box, data.as_slice())?
472 } else {
473 data
474 };
475
476 self.metadata.put(&bytes)
477 }
478
479 fn compact_to_snapshot(&mut self) -> Result<(), Error> {
480 for idx in self.compacted_until..self.state_counter {
481 let key = format!("{:020}", idx);
482 match self.events.del(&key) {
483 Ok(()) | Err(Error::EntryNotFound { .. }) => {
484 self.compacted_until = idx + 1;
485 }
486 Err(err) => return Err(err),
487 }
488 }
489 Ok(())
490 }
491
492 fn persist<E>(&mut self, event: &E) -> Result<(), Error>
494 where
495 E: Event + BorshSerialize + BorshDeserialize,
496 {
497 debug!("Persisting event: {:?}", event);
498
499 let bytes = borsh::to_vec(event).map_err(|e| {
500 error!("Can't encode event: {}", e);
501 store_error(StoreOperation::EncodeEvent, e)
502 })?;
503
504 let bytes = if let Some(key_box) = &self.key_box {
505 self.encrypt(key_box, &bytes)?
506 } else {
507 bytes
508 };
509
510 let next_event_number = self.event_counter;
513
514 debug!(
515 "Persisting event {} at index {}",
516 std::any::type_name::<E>(),
517 next_event_number
518 );
519
520 let result = self
522 .events
523 .put(&format!("{:020}", next_event_number), &bytes);
524
525 if result.is_ok() {
527 self.event_counter += 1;
528 debug!(
529 "Successfully persisted event, event_counter now: {}",
530 self.event_counter
531 );
532 }
533
534 result
535 }
536
537 fn persist_state<E>(&mut self, event: &E, state: &P) -> Result<(), Error>
539 where
540 E: Event + BorshSerialize + BorshDeserialize,
541 {
542 debug!("Persisting event: {:?}", event);
543
544 let bytes = borsh::to_vec(event).map_err(|e| {
545 error!("Can't encode event: {}", e);
546 store_error(StoreOperation::EncodeEvent, e)
547 })?;
548
549 let bytes = if let Some(key_box) = &self.key_box {
550 self.encrypt(key_box, &bytes)?
551 } else {
552 bytes
553 };
554
555 let next_event_number = self.event_counter;
558
559 debug!(
560 "Persisting event {} at index {} with LightPersistence",
561 std::any::type_name::<E>(),
562 next_event_number
563 );
564
565 let event_key = format!("{:020}", next_event_number);
567 let result = self.events.put(&event_key, &bytes);
568
569 if result.is_ok() {
571 self.event_counter += 1;
572 debug!(
573 "Successfully persisted event, event_counter now: {}",
574 self.event_counter
575 );
576 } else {
577 return result;
578 }
579
580 if let Err(snapshot_err) = self.snapshot(state) {
583 self.event_counter = next_event_number;
584 if let Err(rollback_err) = self.events.del(&event_key) {
585 return Err(store_error(
586 StoreOperation::RollbackPersistLight,
587 format!(
588 "snapshot failed: {}; rollback delete failed: {}",
589 snapshot_err, rollback_err
590 ),
591 ));
592 }
593 return Err(snapshot_err);
594 }
595
596 Ok(())
597 }
598
599 fn last_event(&self) -> Result<Option<P::Event>, Error> {
601 if let Some((_, data)) = self.events.last()? {
602 let data = if let Some(key_box) = &self.key_box {
603 self.decrypt(key_box, data.as_slice())?
604 } else {
605 data
606 };
607
608 let event: P::Event = borsh::from_slice(&data).map_err(|e| {
609 error!("Can't decode event: {}", e);
610 store_error(StoreOperation::DecodeEvent, e)
611 })?;
612
613 Ok(Some(event))
614 } else {
615 Ok(None)
616 }
617 }
618
619 fn get_state(&self) -> Result<Option<(P, u64)>, Error> {
620 let data = match self.states.get() {
621 Ok(data) => data,
622 Err(e) => {
623 if let Error::EntryNotFound { .. } = e {
624 return Ok(None);
625 } else {
626 return Err(e);
627 }
628 }
629 };
630
631 let bytes = if let Some(key_box) = &self.key_box {
632 self.decrypt(key_box, data.as_slice())?
633 } else {
634 data
635 };
636
637 let state: (P, u64) = borsh::from_slice(&bytes).map_err(|e| {
638 error!("Can't decode state: {}", e);
639 store_error(StoreOperation::DecodeState, e)
640 })?;
641
642 Ok(Some(state))
643 }
644
645 fn events(&self, from: u64, to: u64) -> Result<Vec<P::Event>, Error> {
647 if from > to {
648 return Ok(Vec::new());
649 }
650
651 let mut events = Vec::new();
652
653 for i in from..=to {
654 let key = format!("{:020}", i);
655 let data = self.events.get(&key)?;
656 let data = if let Some(key_box) = &self.key_box {
657 self.decrypt(key_box, data.as_slice())?
658 } else {
659 data
660 };
661
662 let event: P::Event = borsh::from_slice(&data).map_err(|e| {
663 error!("Can't decode event: {}", e);
664 store_error(StoreOperation::DecodeEvent, e)
665 })?;
666
667 events.push(event);
668 }
669 Ok(events)
670 }
671
672 fn query_events(&self, from: u64, to: u64) -> Result<Vec<P::Event>, Error> {
679 if from > to || from >= self.event_counter {
680 return Ok(Vec::new());
681 }
682
683 let upper = to.min(self.event_counter.saturating_sub(1));
684 self.events(from, upper)
685 }
686
687 fn snapshot(&mut self, actor: &P) -> Result<(), Error> {
689 debug!("Snapshotting state: {:?}", actor);
690
691 let next_state_counter = self.event_counter;
692
693 let data =
694 borsh::to_vec(&(actor, next_state_counter)).map_err(|e| {
695 error!("Can't encode actor: {}", e);
696 store_error(StoreOperation::EncodeActor, e)
697 })?;
698
699 let bytes = if let Some(key_box) = &self.key_box {
700 self.encrypt(key_box, data.as_slice())?
701 } else {
702 data
703 };
704
705 self.states.put(&bytes)?;
706 self.state_counter = next_state_counter;
707 self.persist_metadata()?;
708 if P::compact_on_snapshot() {
709 if let Err(err) = self.compact_to_snapshot() {
710 warn!(
711 error = %err,
712 "Snapshot persisted but event compaction failed; keeping event log"
713 );
714 } else if let Err(err) = self.persist_metadata() {
715 warn!(
716 error = %err,
717 "Snapshot metadata persisted but compaction watermark update failed"
718 );
719 }
720 }
721 Ok(())
722 }
723
724 fn recover(&mut self) -> Result<Option<P>, Error> {
726 debug!("Starting recovery process");
727
728 if let Some((mut state, counter)) = self.get_state()? {
729 self.state_counter = counter;
730 debug!("Recovered state with counter: {}", counter);
731
732 let last_event_counter =
733 if let Some((key, ..)) = self.events.last()? {
734 key.parse::<u64>().map_err(|e| {
735 store_error(StoreOperation::ParseEventKey, e)
736 })? + 1
737 } else {
738 0
739 };
740
741 self.event_counter = self.state_counter.max(last_event_counter);
744
745 debug!(
746 "Recovery state: event_counter={}, state_counter={}",
747 self.event_counter, self.state_counter
748 );
749
750 if self.event_counter > self.state_counter {
751 warn!(
752 event_counter = self.event_counter,
753 state_counter = self.state_counter,
754 "State mismatch detected, replaying events"
755 );
756 debug!(
757 "Applying events from {} to {}",
758 self.state_counter,
759 self.event_counter - 1
760 );
761 let events =
762 self.events(self.state_counter, self.event_counter - 1)?;
763 debug!("Found {} events to replay", events.len());
764
765 for (i, event) in events.iter().enumerate() {
766 debug!("Applying event {} of {}", i + 1, events.len());
767 state.apply(event).map_err(|e| {
768 store_error(StoreOperation::ApplyEvent, e)
769 })?;
770 }
771
772 debug!(
773 "Updating snapshot after applying {} events",
774 events.len()
775 );
776 self.snapshot(&state)?;
777 debug!(
778 "Recovery completed. Final event_counter: {}",
779 self.event_counter
780 );
781 } else {
784 debug!("State is up to date, no events to apply");
785 }
786
787 Ok(Some(state))
788 } else {
789 debug!("No previous state found");
790
791 if let Some((key, ..)) = self.events.last()? {
793 debug!(
794 "No snapshot but events found - replaying from beginning"
795 );
796
797 self.event_counter = key.parse::<u64>().map_err(|e| {
798 store_error(StoreOperation::ParseEventKey, e)
799 })? + 1;
800 self.state_counter = 0;
801
802 debug!(
803 "Using provided initial state and applying {} events",
804 self.event_counter
805 );
806
807 let mut state = self.initial_state.clone();
810
811 let events = self.events(0, self.event_counter - 1)?;
813 debug!("Replaying {} events from scratch", events.len());
814
815 for (i, event) in events.iter().enumerate() {
816 debug!("Applying event {} of {}", i + 1, events.len());
817 state.apply(event).map_err(|e| {
818 store_error(StoreOperation::ApplyEvent, e)
819 })?;
820 }
821
822 debug!("Creating snapshot after replaying events");
824 self.snapshot(&state)?;
825
826 debug!(
827 "Recovery completed. Final event_counter: {}",
828 self.event_counter
829 );
830
831 Ok(Some(state))
832 } else {
833 debug!("No previous state and no events found, starting fresh");
834 Ok(None)
835 }
836 }
837 }
838
839 fn snapshot_if_needed(&mut self) -> Result<(), Error> {
845 if !matches!(P::Persistence::get_persistence(), PersistenceType::Full) {
846 return Ok(());
847 }
848
849 if self.event_counter == 0 || self.event_counter <= self.state_counter {
850 return Ok(());
851 }
852
853 let mut state = if let Some((s, _)) = self.get_state()? {
855 s
856 } else {
857 self.initial_state.clone()
858 };
859
860 let events = self.events(self.state_counter, self.event_counter - 1)?;
861 for event in &events {
862 state.apply(event).map_err(|e| {
863 store_error(StoreOperation::ApplyEventOnStop, e)
864 })?;
865 }
866
867 self.snapshot(&state)
868 }
869
870 pub fn purge(&mut self) -> Result<(), Error> {
874 self.events.purge()?;
875 self.states.purge()?;
876 self.metadata.purge()?;
877 self.event_counter = 0;
878 self.state_counter = 0;
879 self.compacted_until = 0;
880 Ok(())
881 }
882
883 fn encrypt(
889 &self,
890 key_box: &EncryptedKey,
891 bytes: &[u8],
892 ) -> Result<Vec<u8>, Error> {
893 if let Ok(key) = key_box.key() {
894 if key.len() != 32 {
896 error!(
897 expected = 32,
898 got = key.len(),
899 "Invalid encryption key length"
900 );
901 return Err(Error::Store {
902 operation: StoreOperation::ValidateKeyLength,
903 reason: format!(
904 "Invalid key length: expected 32 bytes, got {}",
905 key.len()
906 ),
907 });
908 }
909
910 let cipher = XChaCha20Poly1305::new(key.as_ref().into());
912
913 let nonce = XChaCha20Poly1305::generate_nonce(&mut OsRng);
916
917 let ciphertext: Vec<u8> =
920 cipher.encrypt(&nonce, bytes.as_ref()).map_err(|e| {
921 error!(error = %e, "Encryption failed");
922 store_error(StoreOperation::EncryptData, e)
923 })?;
924
925 Ok([nonce.to_vec(), ciphertext].concat())
928 } else {
929 error!("Failed to decrypt encryption key");
930 Err(store_error(StoreOperation::DecryptKey, "Can't decrypt key"))
931 }
932 }
933
934 fn decrypt(
940 &self,
941 key_box: &EncryptedKey,
942 ciphertext: &[u8],
943 ) -> Result<Vec<u8>, Error> {
944 if ciphertext.len() < NONCE_SIZE + 16 {
946 warn!(
947 expected_min = NONCE_SIZE + 16,
948 got = ciphertext.len(),
949 "Invalid ciphertext length, possible corruption"
950 );
951 return Err(Error::Store {
952 operation: StoreOperation::ValidateCiphertext,
953 reason: format!(
954 "Invalid ciphertext length: expected at least {} bytes, got {}",
955 NONCE_SIZE + 16,
956 ciphertext.len()
957 ),
958 });
959 }
960
961 if let Ok(key) = key_box.key() {
962 if key.len() != 32 {
964 error!(
965 expected = 32,
966 got = key.len(),
967 "Invalid decryption key length"
968 );
969 return Err(store_error(
970 StoreOperation::ValidateKeyLength,
971 format!(
972 "Invalid key length: expected 32 bytes, got {}",
973 key.len()
974 ),
975 ));
976 }
977
978 let nonce = XNonce::from_slice(&ciphertext[..NONCE_SIZE]);
980
981 let ciphertext_data = &ciphertext[NONCE_SIZE..];
983
984 let cipher = XChaCha20Poly1305::new(key.as_ref().into());
986
987 let plaintext =
990 cipher.decrypt(nonce, ciphertext_data).map_err(|e| {
991 warn!(error = %e, "Decryption failed, possible tampering or corruption");
992 store_error(
993 StoreOperation::DecryptData,
994 format!(
995 "Decryption failed (possible tampering): {}",
996 e
997 ),
998 )
999 })?;
1000
1001 Ok(plaintext)
1002 } else {
1003 error!("Failed to decrypt decryption key");
1004 Err(store_error(StoreOperation::DecryptKey, "Can't decrypt key"))
1005 }
1006 }
1007}
1008
1009#[derive(Debug, Clone)]
1011pub enum StoreCommand<P, E> {
1012 Persist(E),
1014 PersistFullEvent {
1016 event: E,
1018 snapshot_every: Option<u64>,
1020 },
1021 PersistFull {
1023 event: E,
1025 actor: P,
1027 snapshot_every: Option<u64>,
1029 },
1030 PersistLight(E, P),
1032 Snapshot(P),
1034 Compact,
1036 LastEvent,
1038 LastEventNumber,
1040 LastEventsFrom(u64),
1042 GetEvents { from: u64, to: u64 },
1044 Recover,
1046 Purge,
1048}
1049
1050impl<P, E> Message for StoreCommand<P, E>
1051where
1052 P: PersistentActor,
1053 P::Event: BorshSerialize + BorshDeserialize,
1054 E: Event + BorshSerialize + BorshDeserialize,
1055{
1056}
1057
1058#[derive(Debug, Clone)]
1060pub enum StoreResponse<P>
1061where
1062 P: PersistentActor,
1063 P::Event: BorshSerialize + BorshDeserialize,
1064{
1065 None,
1067 Persisted,
1069 SnapshotRequired,
1071 Snapshotted,
1073 Compacted,
1075 State(Option<P>),
1077 LastEvent(Option<P::Event>),
1079 LastEventNumber(u64),
1081 Events(Vec<P::Event>),
1083}
1084
1085impl<P> Response for StoreResponse<P>
1086where
1087 P: PersistentActor,
1088 P::Event: BorshSerialize + BorshDeserialize,
1089{
1090}
1091
1092#[derive(
1094 Debug, Clone, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
1095)]
1096pub enum StoreEvent {
1097 Persisted,
1099 Snapshotted,
1101}
1102
1103impl Event for StoreEvent {}
1104
1105#[async_trait]
1106impl<P> Actor for Store<P>
1107where
1108 P: PersistentActor,
1109 P::Event: BorshSerialize + BorshDeserialize,
1110{
1111 type Message = StoreCommand<P, P::Event>;
1112 type Response = StoreResponse<P>;
1113 type Event = StoreEvent;
1114
1115 fn get_span(
1116 id: &str,
1117 _parent_span: Option<tracing::Span>,
1118 ) -> tracing::Span {
1119 info_span!("Store", id = %id)
1120 }
1121
1122 async fn pre_stop(
1126 &mut self,
1127 ctx: &mut ActorContext<Self>,
1128 ) -> Result<(), ActorError> {
1129 if let Err(e) = self.snapshot_if_needed() {
1130 error!(error = %e, "Failed to snapshot state during Store shutdown");
1131 let _ = ctx
1132 .emit_error(actor_store_error(
1133 StoreOperation::EmitPreStopError,
1134 e,
1135 ))
1136 .await;
1137 }
1138 Ok(())
1139 }
1140}
1141
1142#[async_trait]
1143impl<P> Handler<Self> for Store<P>
1144where
1145 P: PersistentActor,
1146 P::Event: BorshSerialize + BorshDeserialize,
1147{
1148 async fn handle_message(
1149 &mut self,
1150 _sender: ActorPath,
1151 msg: StoreCommand<P, P::Event>,
1152 _ctx: &mut ActorContext<Self>,
1153 ) -> Result<StoreResponse<P>, ActorError> {
1154 match msg {
1156 StoreCommand::Persist(event) => {
1158 self.persist(&event).map_err(|e| {
1159 actor_store_error(StoreOperation::Persist, e)
1160 })?;
1161 debug!("Persisted event: {:?}", event);
1162 Ok(StoreResponse::Persisted)
1163 }
1164 StoreCommand::PersistFullEvent {
1165 event,
1166 snapshot_every,
1167 } => {
1168 self.persist(&event).map_err(|e| {
1169 actor_store_error(StoreOperation::PersistFull, e)
1170 })?;
1171
1172 if snapshot_every.is_some_and(|every| {
1173 self.pending_events_since_snapshot() >= every
1174 }) {
1175 debug!("Persisted full event and snapshot is now required");
1176 Ok(StoreResponse::SnapshotRequired)
1177 } else {
1178 debug!("Persisted full event: {:?}", event);
1179 Ok(StoreResponse::Persisted)
1180 }
1181 }
1182 StoreCommand::PersistFull {
1183 event,
1184 actor,
1185 snapshot_every,
1186 } => {
1187 self.persist(&event).map_err(|e| {
1188 actor_store_error(StoreOperation::PersistFull, e)
1189 })?;
1190
1191 if snapshot_every.is_some_and(|every| {
1192 self.pending_events_since_snapshot() >= every
1193 }) {
1194 self.snapshot(&actor).map_err(|e| {
1195 actor_store_error(StoreOperation::Snapshot, e)
1196 })?;
1197 }
1198
1199 debug!("Persisted full event: {:?}", event);
1200 Ok(StoreResponse::Persisted)
1201 }
1202 StoreCommand::PersistLight(event, actor) => {
1204 self.persist_state(&event, &actor).map_err(|e| {
1205 actor_store_error(StoreOperation::PersistLight, e)
1206 })?;
1207 debug!("Light persistence of event: {:?}", event);
1208 Ok(StoreResponse::Persisted)
1209 }
1210 StoreCommand::Snapshot(actor) => {
1212 self.snapshot(&actor).map_err(|e| {
1213 actor_store_error(StoreOperation::Snapshot, e)
1214 })?;
1215 debug!("Snapshotted state: {:?}", actor);
1216 Ok(StoreResponse::Snapshotted)
1217 }
1218 StoreCommand::Compact => {
1219 self.compact_to_snapshot().map_err(|e| {
1220 actor_store_error(StoreOperation::Compact, e)
1221 })?;
1222 debug!("Compacted events covered by the latest snapshot");
1223 Ok(StoreResponse::Compacted)
1224 }
1225 StoreCommand::Recover => {
1227 let state = self.recover().map_err(|e| {
1228 actor_store_error(StoreOperation::Recover, e)
1229 })?;
1230 debug!("Recovered state: {:?}", state);
1231 Ok(StoreResponse::State(state))
1232 }
1233 StoreCommand::GetEvents { from, to } => {
1234 let events = self.query_events(from, to).map_err(|e| {
1235 actor_store_error(
1236 StoreOperation::GetEventsRange,
1237 format!("Unable to get events range: {}", e),
1238 )
1239 })?;
1240 Ok(StoreResponse::Events(events))
1241 }
1242 StoreCommand::LastEvent => {
1244 let event = self.last_event().map_err(|e| {
1245 actor_store_error(StoreOperation::LastEvent, e)
1246 })?;
1247 debug!("Last event: {:?}", event);
1248 Ok(StoreResponse::LastEvent(event))
1249 }
1250 StoreCommand::Purge => {
1252 self.purge()
1253 .map_err(|e| actor_store_error(StoreOperation::Purge, e))?;
1254 debug!("Purged store");
1255 Ok(StoreResponse::None)
1256 }
1257 StoreCommand::LastEventNumber => {
1259 Ok(StoreResponse::LastEventNumber(self.event_counter))
1260 }
1261 StoreCommand::LastEventsFrom(from) => {
1263 let to = self.event_counter.saturating_sub(1);
1264 let events = self.events(from, to).map_err(|e| {
1265 actor_store_error(
1266 StoreOperation::GetLatestEvents,
1267 format!("Unable to get the latest events: {}", e),
1268 )
1269 })?;
1270 Ok(StoreResponse::Events(events))
1271 }
1272 }
1273 }
1274}
1275
1276#[cfg(test)]
1277mod tests {
1278 use std::vec;
1279 use test_log::test;
1280 use tokio_util::sync::CancellationToken;
1281 use tracing::info_span;
1282
1283 use super::*;
1284 use crate::memory::MemoryManager;
1285
1286 use ave_actors_actor::{ActorRef, ActorSystem, Error as ActorError};
1287
1288 use async_trait::async_trait;
1289
1290 #[derive(
1291 Debug,
1292 Clone,
1293 Serialize,
1294 Deserialize,
1295 BorshSerialize,
1296 BorshDeserialize,
1297 Default,
1298 )]
1299 struct TestActor {
1300 pub version: usize,
1301 pub value: i32,
1302 }
1303
1304 #[derive(
1305 Debug,
1306 Clone,
1307 Serialize,
1308 Deserialize,
1309 BorshSerialize,
1310 BorshDeserialize,
1311 Default,
1312 )]
1313 struct TestActorLight {
1314 pub data: Vec<i32>,
1315 }
1316
1317 #[derive(Debug, Clone, Serialize, Deserialize)]
1318 enum TestMessageLight {
1319 SetData(Vec<i32>),
1320 GetData,
1321 }
1322
1323 #[derive(Debug, Clone, Serialize, Deserialize)]
1324 enum TestMessage {
1325 Increment(i32),
1326 Recover,
1327 Snapshot,
1328 GetValue,
1329 }
1330
1331 impl Message for TestMessage {}
1332 impl Message for TestMessageLight {}
1333
1334 #[derive(
1335 Debug, Clone, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
1336 )]
1337 struct TestEvent(i32);
1338
1339 impl Event for TestEvent {}
1340
1341 #[derive(
1342 Debug, Clone, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
1343 )]
1344 struct TestEventLight(Vec<i32>);
1345
1346 impl Event for TestEventLight {}
1347
1348 #[derive(Debug, Clone, PartialEq)]
1349 enum TestResponse {
1350 Value(i32),
1351 None,
1352 }
1353
1354 #[derive(Debug, Clone, PartialEq)]
1355 enum TestResponseLight {
1356 Data(Vec<i32>),
1357 None,
1358 }
1359
1360 impl Response for TestResponse {}
1361 impl Response for TestResponseLight {}
1362
1363 #[async_trait]
1364 impl Actor for TestActorLight {
1365 type Message = TestMessageLight;
1366 type Event = TestEventLight;
1367 type Response = TestResponseLight;
1368
1369 fn get_span(
1370 id: &str,
1371 _parent_span: Option<tracing::Span>,
1372 ) -> tracing::Span {
1373 info_span!("TestActorLight", id = %id)
1374 }
1375
1376 async fn pre_start(
1377 &mut self,
1378 ctx: &mut ActorContext<Self>,
1379 ) -> Result<(), ActorError> {
1380 let memory_db: MemoryManager =
1381 ctx.system().get_helper("db").await.unwrap();
1382
1383 let encrypt_key = EncryptedKey::new(&[3u8; 32]).unwrap();
1384
1385 let db = Store::<Self>::new(
1386 "store",
1387 "prefix",
1388 memory_db,
1389 Some(encrypt_key),
1390 Self::create_initial(()),
1391 )
1392 .unwrap();
1393
1394 let store = ctx.create_child("store", db).await.unwrap();
1395 let response = store.ask(StoreCommand::Recover).await?;
1396
1397 if let StoreResponse::State(Some(state)) = response {
1398 self.update(state);
1399 } else {
1400 debug!("Create first snapshot");
1401 store
1402 .tell(StoreCommand::Snapshot(self.clone()))
1403 .await
1404 .unwrap();
1405 }
1406
1407 Ok(())
1408 }
1409 }
1410
1411 #[async_trait]
1412 impl Actor for TestActor {
1413 type Message = TestMessage;
1414 type Event = TestEvent;
1415 type Response = TestResponse;
1416
1417 fn get_span(
1418 id: &str,
1419 _parent_span: Option<tracing::Span>,
1420 ) -> tracing::Span {
1421 info_span!("TestActor", id = %id)
1422 }
1423
1424 async fn pre_start(
1425 &mut self,
1426 ctx: &mut ActorContext<Self>,
1427 ) -> Result<(), ActorError> {
1428 let db = Store::<Self>::new(
1429 "store",
1430 "prefix",
1431 MemoryManager::default(),
1432 None,
1433 Self::create_initial(()),
1434 )
1435 .unwrap();
1436 let store = ctx.create_child("store", db).await.unwrap();
1437 let response = store.ask(StoreCommand::Recover).await.unwrap();
1438 debug!("Recover response: {:?}", response);
1439 if let StoreResponse::State(Some(state)) = response {
1440 debug!("Recovering state: {:?}", state);
1441 self.update(state);
1442 }
1443 Ok(())
1444 }
1445 }
1446
1447 #[async_trait]
1448 impl PersistentActor for TestActorLight {
1449 type Persistence = LightPersistence;
1450 type InitParams = ();
1451
1452 fn create_initial(_: ()) -> Self {
1453 Self { data: Vec::new() }
1454 }
1455
1456 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1457 self.data.clone_from(&event.0);
1458 Ok(())
1459 }
1460 }
1461
1462 #[async_trait]
1463 impl PersistentActor for TestActor {
1464 type Persistence = FullPersistence;
1465 type InitParams = ();
1466
1467 fn create_initial(_: ()) -> Self {
1468 Self {
1469 version: 0,
1470 value: 0,
1471 }
1472 }
1473
1474 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1475 self.version += 1;
1476 self.value += event.0;
1477 Ok(())
1478 }
1479 }
1480
1481 #[async_trait]
1482 impl Handler<TestActorLight> for TestActorLight {
1483 async fn handle_message(
1484 &mut self,
1485 _sender: ActorPath,
1486 msg: TestMessageLight,
1487 ctx: &mut ActorContext<TestActorLight>,
1488 ) -> Result<TestResponseLight, ActorError> {
1489 match msg {
1490 TestMessageLight::SetData(data) => {
1491 self.on_event(TestEventLight(data), ctx).await;
1492 Ok(TestResponseLight::None)
1493 }
1494 TestMessageLight::GetData => {
1495 Ok(TestResponseLight::Data(self.data.clone()))
1496 }
1497 }
1498 }
1499
1500 async fn on_event(
1501 &mut self,
1502 event: TestEventLight,
1503 ctx: &mut ActorContext<TestActorLight>,
1504 ) -> () {
1505 self.persist(&event, ctx).await.unwrap();
1506 }
1507 }
1508
1509 #[async_trait]
1510 impl Handler<TestActor> for TestActor {
1511 async fn handle_message(
1512 &mut self,
1513 _sender: ActorPath,
1514 msg: TestMessage,
1515 ctx: &mut ActorContext<TestActor>,
1516 ) -> Result<TestResponse, ActorError> {
1517 match msg {
1518 TestMessage::Increment(value) => {
1519 let event = TestEvent(value);
1520 self.on_event(event, ctx).await;
1521 Ok(TestResponse::None)
1522 }
1523 TestMessage::Recover => {
1524 let store: ActorRef<Store<Self>> =
1525 ctx.get_child("store").await.unwrap();
1526 let response =
1527 store.ask(StoreCommand::Recover).await.unwrap();
1528 if let StoreResponse::State(Some(state)) = response {
1529 self.update(state.clone());
1530 Ok(TestResponse::Value(state.value))
1531 } else {
1532 Ok(TestResponse::None)
1533 }
1534 }
1535 TestMessage::Snapshot => {
1536 let store: ActorRef<Store<Self>> =
1537 ctx.get_child("store").await.unwrap();
1538 store
1539 .ask(StoreCommand::Snapshot(self.clone()))
1540 .await
1541 .unwrap();
1542 Ok(TestResponse::None)
1543 }
1544 TestMessage::GetValue => Ok(TestResponse::Value(self.value)),
1545 }
1546 }
1547
1548 async fn on_event(
1549 &mut self,
1550 event: TestEvent,
1551 ctx: &mut ActorContext<TestActor>,
1552 ) -> () {
1553 self.persist(&event, ctx).await.unwrap();
1554 }
1555 }
1556
1557 #[test(tokio::test)]
1558 async fn test_store_actor() {
1559 let (system, mut runner) = ActorSystem::create(
1560 CancellationToken::new(),
1561 CancellationToken::new(),
1562 );
1563 tokio::spawn(async move {
1565 runner.run().await;
1566 });
1567
1568 let encrypt_key =
1569 EncryptedKey::new(b"0123456789abcdef0123456789abcdef").unwrap();
1570 let db = Store::<TestActor>::new(
1571 "store",
1572 "test",
1573 MemoryManager::default(),
1574 Some(encrypt_key),
1575 TestActor::create_initial(()),
1576 )
1577 .unwrap();
1578 let store = system.create_root_actor("store", db).await.unwrap();
1579
1580 let mut actor = TestActor {
1581 version: 0,
1582 value: 0,
1583 };
1584 store
1585 .tell(StoreCommand::Snapshot(actor.clone()))
1586 .await
1587 .unwrap();
1588 store
1589 .tell(StoreCommand::Persist(TestEvent(10)))
1590 .await
1591 .unwrap();
1592 actor.apply(&TestEvent(10)).unwrap();
1593 store
1594 .tell(StoreCommand::Snapshot(actor.clone()))
1595 .await
1596 .unwrap();
1597 store
1598 .tell(StoreCommand::Persist(TestEvent(10)))
1599 .await
1600 .unwrap();
1601
1602 actor.apply(&TestEvent(10)).unwrap();
1603 let response = store.ask(StoreCommand::Recover).await.unwrap();
1604 if let StoreResponse::State(Some(state)) = response {
1605 assert_eq!(state.value, actor.value);
1606 }
1607 let response = store.ask(StoreCommand::Recover).await.unwrap();
1608 if let StoreResponse::State(Some(state)) = response {
1609 assert_eq!(state.value, actor.value);
1610 }
1611 let response = store.ask(StoreCommand::LastEvent).await.unwrap();
1612 if let StoreResponse::LastEvent(Some(event)) = response {
1613 assert_eq!(event.0, 10);
1614 } else {
1615 panic!("Event not found");
1616 }
1617 let response = store.ask(StoreCommand::LastEventNumber).await.unwrap();
1618 if let StoreResponse::LastEventNumber(number) = response {
1619 assert_eq!(number, 2);
1620 } else {
1621 panic!("Event number not found");
1622 }
1623 let response =
1624 store.ask(StoreCommand::LastEventsFrom(1)).await.unwrap();
1625 if let StoreResponse::Events(events) = response {
1626 assert_eq!(events.len(), 1);
1627 assert_eq!(events[0].0, 10);
1628 } else {
1629 panic!("Events not found");
1630 }
1631 let response = store
1632 .ask(StoreCommand::GetEvents { from: 0, to: 1 })
1633 .await
1634 .unwrap();
1635 if let StoreResponse::Events(events) = response {
1636 assert_eq!(events.len(), 2);
1637 assert_eq!(events[0].0, 10);
1638 assert_eq!(events[1].0, 10);
1639 } else {
1640 panic!("Events not found");
1641 }
1642 }
1643
1644 #[test(tokio::test)]
1645 async fn test_persistent_light_actor() {
1646 let (system, ..) = ActorSystem::create(
1647 CancellationToken::new(),
1648 CancellationToken::new(),
1649 );
1650
1651 system.add_helper("db", MemoryManager::default()).await;
1652
1653 let actor_ref = system
1654 .create_root_actor("test", TestActorLight::initial(()))
1655 .await
1656 .unwrap();
1657
1658 let result = actor_ref
1659 .ask(TestMessageLight::SetData(vec![12, 13, 14, 15]))
1660 .await
1661 .unwrap();
1662
1663 assert_eq!(result, TestResponseLight::None);
1664
1665 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1666
1667 actor_ref.ask_stop().await.unwrap();
1668
1669 let actor_ref = system
1670 .create_root_actor("test", TestActorLight::initial(()))
1671 .await
1672 .unwrap();
1673
1674 let result = actor_ref.ask(TestMessageLight::GetData).await.unwrap();
1675
1676 let TestResponseLight::Data(data) = result else {
1677 panic!("Invalid response")
1678 };
1679
1680 assert_eq!(data, vec![12, 13, 14, 15]);
1681 }
1682
1683 #[test(tokio::test)]
1684 async fn test_persistent_actor() {
1685 let (system, mut runner) = ActorSystem::create(
1686 CancellationToken::new(),
1687 CancellationToken::new(),
1688 );
1689 tokio::spawn(async move {
1691 runner.run().await;
1692 });
1693
1694 let actor_ref = system
1695 .create_root_actor("test", TestActor::initial(()))
1696 .await
1697 .unwrap();
1698
1699 let result = actor_ref.ask(TestMessage::Increment(10)).await.unwrap();
1700
1701 assert_eq!(result, TestResponse::None);
1702
1703 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1704
1705 actor_ref.tell(TestMessage::Snapshot).await.unwrap();
1706
1707 let result = actor_ref.ask(TestMessage::GetValue).await.unwrap();
1708
1709 assert_eq!(result, TestResponse::Value(10));
1710 actor_ref.tell(TestMessage::Increment(10)).await.unwrap();
1711 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1712
1713 let value = actor_ref.ask(TestMessage::GetValue).await.unwrap();
1714
1715 assert_eq!(value, TestResponse::Value(20));
1716
1717 actor_ref.ask(TestMessage::Recover).await.unwrap();
1718
1719 let value = actor_ref.ask(TestMessage::GetValue).await.unwrap();
1720
1721 assert_eq!(value, TestResponse::Value(20));
1722
1723 actor_ref.ask_stop().await.unwrap();
1724 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1725 }
1726
1727 #[test(tokio::test)]
1728 async fn test_encrypt_decrypt() {
1729 let encrypt_key = EncryptedKey::new(&[0u8; 32]).unwrap();
1730
1731 let store = Store::<TestActor>::new(
1732 "store",
1733 "test",
1734 MemoryManager::default(),
1735 Some(encrypt_key),
1736 TestActor::create_initial(()),
1737 )
1738 .unwrap();
1739 let data = b"Hello, world!";
1740 let encrypted = store
1741 .encrypt(&store.key_box.clone().unwrap(), data)
1742 .unwrap();
1743 let decrypted = store
1744 .decrypt(&store.key_box.clone().unwrap(), &encrypted)
1745 .unwrap();
1746 assert_eq!(data, decrypted.as_slice());
1747 }
1748}