Skip to main content

ave_actors_store/
store.rs

1//! Event-sourced persistence for actors via [`PersistentActor`].
2
3use crate::{
4    database::{Collection, DbManager, State},
5    error::{Error, StoreOperation},
6};
7
8use ave_actors_actor::{
9    Actor, ActorContext, ActorPath, EncryptedKey, Error as ActorError, Event,
10    Handler, IntoActor, Message, Response,
11};
12
13use async_trait::async_trait;
14
15use borsh::{BorshDeserialize, BorshSerialize};
16
17use chacha20poly1305::{
18    XChaCha20Poly1305, XNonce,
19    aead::{Aead, AeadCore, KeyInit, OsRng},
20};
21
22use serde::{Deserialize, Serialize};
23
24use tracing::{debug, error, info_span, warn};
25
26use std::fmt::Debug;
27
28/// Nonce size for XChaCha20-Poly1305 encryption.
29const NONCE_SIZE: usize = 24;
30
31fn store_error(operation: StoreOperation, reason: impl ToString) -> Error {
32    Error::Store {
33        operation,
34        reason: reason.to_string(),
35    }
36}
37
38fn actor_store_error(
39    operation: StoreOperation,
40    reason: impl ToString,
41) -> ActorError {
42    ActorError::StoreOperation {
43        operation: operation.to_string(),
44        reason: reason.to_string(),
45    }
46}
47
48/// Selects the persistence strategy used by a [`PersistentActor`].
49///
50/// `Light` trades storage space for fast recovery; `Full` trades recovery speed
51/// for a smaller storage footprint and a complete audit trail.
52#[derive(Debug, Clone)]
53pub enum PersistenceType {
54    /// Each event is stored together with a state snapshot.
55    ///
56    /// Recovery is fast (load snapshot, done) at the cost of storing the full
57    /// state on every write.
58    Light,
59    /// Only events are stored; state is reconstructed by replaying them.
60    ///
61    /// Uses less storage and preserves a complete audit trail, but recovery time
62    /// grows with the number of events since the last snapshot.
63    Full,
64}
65
66/// Marker type that selects [`PersistenceType::Light`] for a [`PersistentActor`].
67pub struct LightPersistence;
68
69/// Marker type that selects [`PersistenceType::Full`] for a [`PersistentActor`].
70pub struct FullPersistence;
71
72/// Type-level selector that maps a marker type to a [`PersistenceType`] value.
73pub trait Persistence {
74    /// Returns the runtime persistence mode represented by this marker type.
75    fn get_persistence() -> PersistenceType;
76}
77
78impl Persistence for LightPersistence {
79    fn get_persistence() -> PersistenceType {
80        PersistenceType::Light
81    }
82}
83
84impl Persistence for FullPersistence {
85    fn get_persistence() -> PersistenceType {
86        PersistenceType::Full
87    }
88}
89
90/// Wrapper that guarantees a [`PersistentActor`] was constructed via [`PersistentActor::initial`].
91///
92/// The actor system requires this wrapper for persistent actors, preventing users
93/// from bypassing the initialization logic by constructing the actor struct directly.
94#[derive(Debug)]
95pub struct InitializedActor<A>(A);
96
97impl<A> InitializedActor<A> {
98    pub(crate) const fn new(actor: A) -> Self {
99        Self(actor)
100    }
101}
102
103impl<A> IntoActor<A> for InitializedActor<A>
104where
105    A: PersistentActor,
106    A::Event: BorshSerialize + BorshDeserialize,
107{
108    fn into_actor(self) -> A {
109        self.0
110    }
111}
112
113/// Extends [`Actor`] with event-sourced state persistence.
114///
115/// Implement `apply` to define how each event mutates the in-memory state, then
116/// call `persist` inside `handle_message` to durably record a change. The actor
117/// system automatically recovers state on restart by replaying events and/or loading
118/// the latest snapshot, depending on the chosen [`Persistence`] strategy.
119///
120/// Do NOT also implement `NotPersistentActor` on the same type.
121#[async_trait]
122pub trait PersistentActor:
123    Actor + Handler<Self> + Debug + Clone + BorshSerialize + BorshDeserialize
124where
125    Self::Event: BorshSerialize + BorshDeserialize,
126{
127    /// The persistence strategy ([`LightPersistence`] or [`FullPersistence`]).
128    type Persistence: Persistence;
129
130    /// Parameters passed to [`create_initial`](PersistentActor::create_initial). Use `()` if no initialization data is needed.
131    type InitParams;
132
133    /// Creates the actor in its default initial state from the given parameters.
134    ///
135    /// This is called by [`initial`](PersistentActor::initial) and also during recovery
136    /// when no snapshot exists and events must be replayed from scratch.
137    fn create_initial(params: Self::InitParams) -> Self;
138
139    /// Returns an [`InitializedActor`] wrapping the actor's initial state.
140    ///
141    /// This is the only way to create a persistent actor instance accepted by
142    /// `create_root_actor` and `create_child`. Pass the result directly to those methods.
143    fn initial(params: Self::InitParams) -> InitializedActor<Self>
144    where
145        Self: Sized,
146    {
147        InitializedActor::new(Self::create_initial(params))
148    }
149
150    /// Applies `event` to the actor's in-memory state.
151    ///
152    /// This method must be deterministic — the same event applied to the same state
153    /// must always produce the same result. It should only update in-memory fields;
154    /// persistence is handled by [`persist`](PersistentActor::persist).
155    /// Returns an error if the event represents an invalid state transition.
156    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError>;
157
158    /// Replaces the current state with `state`, used during snapshot recovery.
159    ///
160    /// The default implementation is `*self = state`. Override if you need to
161    /// preserve fields that should not be overwritten during recovery.
162    fn update(&mut self, state: Self) {
163        *self = state;
164    }
165
166    /// Snapshot cadence for `FullPersistence`.
167    ///
168    /// - `None`: snapshots are only manual or done during store shutdown.
169    /// - `Some(n)`: after every `n` persisted events since the last snapshot,
170    ///   the store snapshots the current actor state automatically.
171    ///
172    /// Default: `Some(100)`.
173    fn snapshot_every() -> Option<u64> {
174        Some(100)
175    }
176
177    /// Whether already snapshotted events should be compacted after a
178    /// successful snapshot.
179    ///
180    /// Default: `false`, so `FullPersistence` keeps the full audit log unless
181    /// the actor opts into retention explicitly.
182    fn compact_on_snapshot() -> bool {
183        false
184    }
185
186    /// Applies `event` to the in-memory state and durably persists it.
187    ///
188    /// Calls [`apply`](PersistentActor::apply) first; if that succeeds, sends the event
189    /// (and optionally the current state) to the child `store` actor. On any failure the
190    /// in-memory state is rolled back to its pre-call value. For `LightPersistence`, both
191    /// the event and a state snapshot are written atomically; for `FullPersistence`, only
192    /// the event is written, with periodic automatic snapshots controlled by [`snapshot_every`](PersistentActor::snapshot_every).
193    async fn persist(
194        &mut self,
195        event: &Self::Event,
196        ctx: &mut ActorContext<Self>,
197    ) -> Result<(), ActorError> {
198        let store = ctx.get_child::<Store<Self>>("store").await?;
199
200        let prev_state = self.clone();
201
202        if let Err(e) = self.apply(event) {
203            self.update(prev_state.clone());
204            return Err(e);
205        }
206
207        let response = match Self::Persistence::get_persistence() {
208            PersistenceType::Light => {
209                match store
210                    .ask(StoreCommand::PersistLight(
211                        event.clone(),
212                        self.clone(),
213                    ))
214                    .await
215                {
216                    Ok(response) => response,
217                    Err(e) => {
218                        self.update(prev_state.clone());
219                        return Err(actor_store_error(
220                            StoreOperation::PersistLight,
221                            e,
222                        ));
223                    }
224                }
225            }
226            PersistenceType::Full => {
227                match store
228                    .ask(StoreCommand::PersistFullEvent {
229                        event: event.clone(),
230                        snapshot_every: Self::snapshot_every(),
231                    })
232                    .await
233                {
234                    Ok(response) => response,
235                    Err(e) => {
236                        self.update(prev_state.clone());
237                        return Err(actor_store_error(
238                            StoreOperation::PersistFull,
239                            e,
240                        ));
241                    }
242                }
243            }
244        };
245
246        match response {
247            StoreResponse::Persisted => Ok(()),
248            StoreResponse::SnapshotRequired => {
249                self.snapshot(ctx).await?;
250                Ok(())
251            }
252            _ => {
253                self.update(prev_state);
254                Err(ActorError::UnexpectedResponse {
255                    path: ActorPath::from(format!(
256                        "{}/store",
257                        ctx.path().clone()
258                    )),
259                    expected:
260                        "StoreResponse::Persisted | StoreResponse::SnapshotRequired"
261                            .to_owned(),
262                })
263            }
264        }
265    }
266
267    /// Sends the current state to the child `store` actor to be saved as a snapshot.
268    ///
269    /// Snapshots speed up recovery by reducing the number of events that need to be
270    /// replayed. For most use cases, snapshots are triggered automatically; call this
271    /// manually only when you need an immediate checkpoint.
272    async fn snapshot(
273        &self,
274        ctx: &mut ActorContext<Self>,
275    ) -> Result<(), ActorError> {
276        let store = ctx.get_child::<Store<Self>>("store").await?;
277        store
278            .ask(StoreCommand::Snapshot(self.clone()))
279            .await
280            .map_err(|e| actor_store_error(StoreOperation::Snapshot, e))?;
281        Ok(())
282    }
283
284    /// Creates the child `store` actor, opens the storage backend, and recovers any persisted state.
285    ///
286    /// Call this from [`pre_start`](ave_actors_actor::Actor::pre_start), passing the database manager
287    /// and an optional encryption key. If a persisted state exists, it is loaded and applied to `self`
288    /// before `pre_start` returns. `prefix` scopes the storage keys; if `None`, the actor's path key is used.
289    async fn start_store<C: Collection, S: State>(
290        &mut self,
291        name: &str,
292        prefix: Option<String>,
293        ctx: &mut ActorContext<Self>,
294        manager: impl DbManager<C, S>,
295        key_box: Option<EncryptedKey>,
296    ) -> Result<(), ActorError> {
297        let prefix = prefix.unwrap_or_else(|| ctx.path().key());
298
299        let store =
300            Store::<Self>::new(name, &prefix, manager, key_box, self.clone())
301                .map_err(|e| actor_store_error(StoreOperation::StoreInit, e))?;
302        let store = ctx.create_child("store", store).await?;
303        let response = store.ask(StoreCommand::Recover).await?;
304
305        if let StoreResponse::State(Some(state)) = response {
306            self.update(state);
307        }
308
309        Ok(())
310    }
311}
312
313/// Internal child actor that manages event and snapshot persistence for a [`PersistentActor`].
314///
315/// Created automatically by [`start_store`](PersistentActor::start_store). It stores events in
316/// a [`Collection`](crate::database::Collection) with zero-padded sequence-number keys and
317/// snapshots in a [`State`](crate::database::State) store. Data can optionally be encrypted
318/// with XChaCha20-Poly1305.
319pub struct Store<P>
320where
321    P: PersistentActor,
322    P::Event: BorshSerialize + BorshDeserialize,
323{
324    /// Next free event index.
325    ///
326    /// This is not the number of the last persisted event; it is the index that
327    /// the next persisted event will use.
328    event_counter: u64,
329    /// Number of events already included in the latest snapshot.
330    ///
331    /// If `state_counter == 10`, the snapshot contains events `0..=9`.
332    state_counter: u64,
333    /// Exclusive upper bound of the event range already compacted from the log.
334    compacted_until: u64,
335    /// Collection for storing events with sequence numbers as keys.
336    events: Box<dyn Collection>,
337    /// Storage for the latest state snapshot.
338    states: Box<dyn State>,
339    /// Storage for log metadata used to resume after snapshots/compaction.
340    metadata: Box<dyn State>,
341    /// Encrypted password for data encryption (XChaCha20-Poly1305).
342    /// If None, data is stored unencrypted.
343    key_box: Option<EncryptedKey>,
344    /// Initial state to use when recovering without a snapshot.
345    /// This is the state created with `create_initial(params)`.
346    initial_state: P,
347}
348
349impl<P> ave_actors_actor::NotPersistentActor for Store<P>
350where
351    P: PersistentActor,
352    P::Event: BorshSerialize + BorshDeserialize,
353{
354}
355
356#[derive(Debug, Clone, BorshSerialize, BorshDeserialize)]
357struct StoreMetadata {
358    next_event_index: u64,
359    compacted_until: u64,
360}
361
362impl<P> Store<P>
363where
364    P: PersistentActor,
365    P::Event: BorshSerialize + BorshDeserialize,
366{
367    /// Creates and initializes the store, opening the three backend stores (events, state, metadata).
368    ///
369    /// `name` is used to derive the collection names; `prefix` scopes the keys.
370    /// On construction the event counter is read from persisted metadata (or inferred from the
371    /// event log for backward compatibility). Returns an error if any backend allocation fails.
372    pub fn new<C, S>(
373        name: &str,
374        prefix: &str,
375        manager: impl DbManager<C, S>,
376        key_box: Option<EncryptedKey>,
377        initial_state: P,
378    ) -> Result<Self, Error>
379    where
380        C: Collection + 'static,
381        S: State + 'static,
382    {
383        let events =
384            manager.create_collection(&format!("{}_events", name), prefix)?;
385        let states =
386            manager.create_state(&format!("{}_states", name), prefix)?;
387        let metadata =
388            manager.create_state(&format!("{}_metadata", name), prefix)?;
389
390        let mut store = Self {
391            event_counter: 0,
392            state_counter: 0,
393            compacted_until: 0,
394            events: Box::new(events),
395            states: Box::new(states),
396            metadata: Box::new(metadata),
397            key_box,
398            initial_state,
399        };
400
401        // Initialize event_counter from persisted metadata when available.
402        // Fall back to the latest snapshot boundary and event log for
403        // backwards compatibility with stores created before metadata existed.
404        let last_event_counter = if let Some((key, _)) = store.events.last()? {
405            key.parse::<u64>()
406                .map_err(|e| store_error(StoreOperation::ParseEventKey, e))?
407                + 1
408        } else {
409            0
410        };
411
412        let snapshot_counter = if let Some((_, counter)) = store.get_state()? {
413            counter
414        } else {
415            0
416        };
417
418        if let Some(metadata) = store.get_metadata()? {
419            store.event_counter =
420                last_event_counter.max(metadata.next_event_index);
421            store.compacted_until = metadata.compacted_until;
422        } else {
423            store.event_counter = last_event_counter.max(snapshot_counter);
424        }
425
426        debug!(
427            "Initializing Store with event_counter: {}, compacted_until: {}",
428            store.event_counter, store.compacted_until
429        );
430
431        Ok(store)
432    }
433
434    const fn pending_events_since_snapshot(&self) -> u64 {
435        self.event_counter.saturating_sub(self.state_counter)
436    }
437
438    fn get_metadata(&self) -> Result<Option<StoreMetadata>, Error> {
439        let data = match self.metadata.get() {
440            Ok(data) => data,
441            Err(Error::EntryNotFound { .. }) => return Ok(None),
442            Err(err) => return Err(err),
443        };
444
445        let bytes = if let Some(key_box) = &self.key_box {
446            self.decrypt(key_box, data.as_slice())?
447        } else {
448            data
449        };
450
451        let metadata: StoreMetadata =
452            borsh::from_slice(&bytes).map_err(|e| {
453                error!("Can't decode metadata: {}", e);
454                store_error(StoreOperation::DecodeState, e)
455            })?;
456
457        Ok(Some(metadata))
458    }
459
460    fn persist_metadata(&mut self) -> Result<(), Error> {
461        let metadata = StoreMetadata {
462            next_event_index: self.event_counter,
463            compacted_until: self.compacted_until,
464        };
465        let data = borsh::to_vec(&metadata).map_err(|e| {
466            error!("Can't encode metadata: {}", e);
467            store_error(StoreOperation::EncodeActor, e)
468        })?;
469
470        let bytes = if let Some(key_box) = &self.key_box {
471            self.encrypt(key_box, data.as_slice())?
472        } else {
473            data
474        };
475
476        self.metadata.put(&bytes)
477    }
478
479    fn compact_to_snapshot(&mut self) -> Result<(), Error> {
480        for idx in self.compacted_until..self.state_counter {
481            let key = format!("{:020}", idx);
482            match self.events.del(&key) {
483                Ok(()) | Err(Error::EntryNotFound { .. }) => {
484                    self.compacted_until = idx + 1;
485                }
486                Err(err) => return Err(err),
487            }
488        }
489        Ok(())
490    }
491
492    /// Serializes and stores `event` at the next sequence position, then increments the event counter.
493    fn persist<E>(&mut self, event: &E) -> Result<(), Error>
494    where
495        E: Event + BorshSerialize + BorshDeserialize,
496    {
497        debug!("Persisting event: {:?}", event);
498
499        let bytes = borsh::to_vec(event).map_err(|e| {
500            error!("Can't encode event: {}", e);
501            store_error(StoreOperation::EncodeEvent, e)
502        })?;
503
504        let bytes = if let Some(key_box) = &self.key_box {
505            self.encrypt(key_box, &bytes)?
506        } else {
507            bytes
508        };
509
510        // Calculate next event position (0-indexed)
511        // event_counter works like vector.len(): 0 means empty, 1 means one event at position 0
512        let next_event_number = self.event_counter;
513
514        debug!(
515            "Persisting event {} at index {}",
516            std::any::type_name::<E>(),
517            next_event_number
518        );
519
520        // First persist the event, then increment counter (atomic operation)
521        let result = self
522            .events
523            .put(&format!("{:020}", next_event_number), &bytes);
524
525        // Only increment counter if persist was successful
526        if result.is_ok() {
527            self.event_counter += 1;
528            debug!(
529                "Successfully persisted event, event_counter now: {}",
530                self.event_counter
531            );
532        }
533
534        result
535    }
536
537    /// Atomically stores `event` and a state snapshot (used for `LightPersistence`). Rolls back the event if the snapshot fails.
538    fn persist_state<E>(&mut self, event: &E, state: &P) -> Result<(), Error>
539    where
540        E: Event + BorshSerialize + BorshDeserialize,
541    {
542        debug!("Persisting event: {:?}", event);
543
544        let bytes = borsh::to_vec(event).map_err(|e| {
545            error!("Can't encode event: {}", e);
546            store_error(StoreOperation::EncodeEvent, e)
547        })?;
548
549        let bytes = if let Some(key_box) = &self.key_box {
550            self.encrypt(key_box, &bytes)?
551        } else {
552            bytes
553        };
554
555        // Calculate next event position (0-indexed)
556        // event_counter works like vector.len(): 0 means empty, 1 means one event at position 0
557        let next_event_number = self.event_counter;
558
559        debug!(
560            "Persisting event {} at index {} with LightPersistence",
561            std::any::type_name::<E>(),
562            next_event_number
563        );
564
565        // 1. First persist the event
566        let event_key = format!("{:020}", next_event_number);
567        let result = self.events.put(&event_key, &bytes);
568
569        // 2. Only increment counter if persist was successful
570        if result.is_ok() {
571            self.event_counter += 1;
572            debug!(
573                "Successfully persisted event, event_counter now: {}",
574                self.event_counter
575            );
576        } else {
577            return result;
578        }
579
580        // 3. NOW create snapshot with the updated event_counter
581        // This ensures state_counter = event_counter after the snapshot
582        if let Err(snapshot_err) = self.snapshot(state) {
583            self.event_counter = next_event_number;
584            if let Err(rollback_err) = self.events.del(&event_key) {
585                return Err(store_error(
586                    StoreOperation::RollbackPersistLight,
587                    format!(
588                        "snapshot failed: {}; rollback delete failed: {}",
589                        snapshot_err, rollback_err
590                    ),
591                ));
592            }
593            return Err(snapshot_err);
594        }
595
596        Ok(())
597    }
598
599    /// Returns the most recently persisted event, or `None` if no events have been stored.
600    fn last_event(&self) -> Result<Option<P::Event>, Error> {
601        if let Some((_, data)) = self.events.last()? {
602            let data = if let Some(key_box) = &self.key_box {
603                self.decrypt(key_box, data.as_slice())?
604            } else {
605                data
606            };
607
608            let event: P::Event = borsh::from_slice(&data).map_err(|e| {
609                error!("Can't decode event: {}", e);
610                store_error(StoreOperation::DecodeEvent, e)
611            })?;
612
613            Ok(Some(event))
614        } else {
615            Ok(None)
616        }
617    }
618
619    fn get_state(&self) -> Result<Option<(P, u64)>, Error> {
620        let data = match self.states.get() {
621            Ok(data) => data,
622            Err(e) => {
623                if let Error::EntryNotFound { .. } = e {
624                    return Ok(None);
625                } else {
626                    return Err(e);
627                }
628            }
629        };
630
631        let bytes = if let Some(key_box) = &self.key_box {
632            self.decrypt(key_box, data.as_slice())?
633        } else {
634            data
635        };
636
637        let state: (P, u64) = borsh::from_slice(&bytes).map_err(|e| {
638            error!("Can't decode state: {}", e);
639            store_error(StoreOperation::DecodeState, e)
640        })?;
641
642        Ok(Some(state))
643    }
644
645    /// Retrieve events.
646    fn events(&self, from: u64, to: u64) -> Result<Vec<P::Event>, Error> {
647        if from > to {
648            return Ok(Vec::new());
649        }
650
651        let mut events = Vec::new();
652
653        for i in from..=to {
654            let key = format!("{:020}", i);
655            let data = self.events.get(&key)?;
656            let data = if let Some(key_box) = &self.key_box {
657                self.decrypt(key_box, data.as_slice())?
658            } else {
659                data
660            };
661
662            let event: P::Event = borsh::from_slice(&data).map_err(|e| {
663                error!("Can't decode event: {}", e);
664                store_error(StoreOperation::DecodeEvent, e)
665            })?;
666
667            events.push(event);
668        }
669        Ok(events)
670    }
671
672    /// Retrieve events for external queries, tolerating out-of-range bounds.
673    ///
674    /// This keeps recovery strict while making StoreCommand::GetEvents behave
675    /// like a range query: if the requested interval does not intersect with
676    /// existing events, the result is empty; if it overlaps partially, only
677    /// the existing suffix is returned.
678    fn query_events(&self, from: u64, to: u64) -> Result<Vec<P::Event>, Error> {
679        if from > to || from >= self.event_counter {
680            return Ok(Vec::new());
681        }
682
683        let upper = to.min(self.event_counter.saturating_sub(1));
684        self.events(from, upper)
685    }
686
687    /// Serializes `actor`, stores it as the current snapshot, updates the state counter and metadata, and optionally compacts old events.
688    fn snapshot(&mut self, actor: &P) -> Result<(), Error> {
689        debug!("Snapshotting state: {:?}", actor);
690
691        let next_state_counter = self.event_counter;
692
693        let data =
694            borsh::to_vec(&(actor, next_state_counter)).map_err(|e| {
695                error!("Can't encode actor: {}", e);
696                store_error(StoreOperation::EncodeActor, e)
697            })?;
698
699        let bytes = if let Some(key_box) = &self.key_box {
700            self.encrypt(key_box, data.as_slice())?
701        } else {
702            data
703        };
704
705        self.states.put(&bytes)?;
706        self.state_counter = next_state_counter;
707        self.persist_metadata()?;
708        if P::compact_on_snapshot() {
709            if let Err(err) = self.compact_to_snapshot() {
710                warn!(
711                    error = %err,
712                    "Snapshot persisted but event compaction failed; keeping event log"
713                );
714            } else if let Err(err) = self.persist_metadata() {
715                warn!(
716                    error = %err,
717                    "Snapshot metadata persisted but compaction watermark update failed"
718                );
719            }
720        }
721        Ok(())
722    }
723
724    /// Loads the latest snapshot (if any) and replays all subsequent events to reconstruct the current state.
725    fn recover(&mut self) -> Result<Option<P>, Error> {
726        debug!("Starting recovery process");
727
728        if let Some((mut state, counter)) = self.get_state()? {
729            self.state_counter = counter;
730            debug!("Recovered state with counter: {}", counter);
731
732            let last_event_counter =
733                if let Some((key, ..)) = self.events.last()? {
734                    key.parse::<u64>().map_err(|e| {
735                        store_error(StoreOperation::ParseEventKey, e)
736                    })? + 1
737                } else {
738                    0
739                };
740
741            // When old events have been compacted away, the snapshot counter is
742            // still the authoritative lower bound for the next event index.
743            self.event_counter = self.state_counter.max(last_event_counter);
744
745            debug!(
746                "Recovery state: event_counter={}, state_counter={}",
747                self.event_counter, self.state_counter
748            );
749
750            if self.event_counter > self.state_counter {
751                warn!(
752                    event_counter = self.event_counter,
753                    state_counter = self.state_counter,
754                    "State mismatch detected, replaying events"
755                );
756                debug!(
757                    "Applying events from {} to {}",
758                    self.state_counter,
759                    self.event_counter - 1
760                );
761                let events =
762                    self.events(self.state_counter, self.event_counter - 1)?;
763                debug!("Found {} events to replay", events.len());
764
765                for (i, event) in events.iter().enumerate() {
766                    debug!("Applying event {} of {}", i + 1, events.len());
767                    state.apply(event).map_err(|e| {
768                        store_error(StoreOperation::ApplyEvent, e)
769                    })?;
770                }
771
772                debug!(
773                    "Updating snapshot after applying {} events",
774                    events.len()
775                );
776                self.snapshot(&state)?;
777                debug!(
778                    "Recovery completed. Final event_counter: {}",
779                    self.event_counter
780                );
781                // Note: We don't increment event_counter here as it already has the correct value
782                // from the last persisted event key or the latest snapshot boundary.
783            } else {
784                debug!("State is up to date, no events to apply");
785            }
786
787            Ok(Some(state))
788        } else {
789            debug!("No previous state found");
790
791            // Check if there are any events in the database
792            if let Some((key, ..)) = self.events.last()? {
793                debug!(
794                    "No snapshot but events found - replaying from beginning"
795                );
796
797                self.event_counter = key.parse::<u64>().map_err(|e| {
798                    store_error(StoreOperation::ParseEventKey, e)
799                })? + 1;
800                self.state_counter = 0;
801
802                debug!(
803                    "Using provided initial state and applying {} events",
804                    self.event_counter
805                );
806
807                // Use the initial state provided during Store creation
808                // This was created with create_initial(params) by the user
809                let mut state = self.initial_state.clone();
810
811                // Apply ALL events from the beginning
812                let events = self.events(0, self.event_counter - 1)?;
813                debug!("Replaying {} events from scratch", events.len());
814
815                for (i, event) in events.iter().enumerate() {
816                    debug!("Applying event {} of {}", i + 1, events.len());
817                    state.apply(event).map_err(|e| {
818                        store_error(StoreOperation::ApplyEvent, e)
819                    })?;
820                }
821
822                // Create snapshot for future recoveries
823                debug!("Creating snapshot after replaying events");
824                self.snapshot(&state)?;
825
826                debug!(
827                    "Recovery completed. Final event_counter: {}",
828                    self.event_counter
829                );
830
831                Ok(Some(state))
832            } else {
833                debug!("No previous state and no events found, starting fresh");
834                Ok(None)
835            }
836        }
837    }
838
839    /// Snapshot the current reconstructed state if there are events since the
840    /// last snapshot. Only relevant for FullPersistence — LightPersistence
841    /// already stores state with every event.
842    ///
843    /// Called automatically during shutdown via `pre_stop`.
844    fn snapshot_if_needed(&mut self) -> Result<(), Error> {
845        if !matches!(P::Persistence::get_persistence(), PersistenceType::Full) {
846            return Ok(());
847        }
848
849        if self.event_counter == 0 || self.event_counter <= self.state_counter {
850            return Ok(());
851        }
852
853        // Reconstruct state from last snapshot (or initial) + pending events.
854        let mut state = if let Some((s, _)) = self.get_state()? {
855            s
856        } else {
857            self.initial_state.clone()
858        };
859
860        let events = self.events(self.state_counter, self.event_counter - 1)?;
861        for event in &events {
862            state.apply(event).map_err(|e| {
863                store_error(StoreOperation::ApplyEventOnStop, e)
864            })?;
865        }
866
867        self.snapshot(&state)
868    }
869
870    /// Deletes all events, snapshots, and metadata, then resets all counters to zero.
871    ///
872    /// This permanently destroys all persisted data for the actor and cannot be undone.
873    pub fn purge(&mut self) -> Result<(), Error> {
874        self.events.purge()?;
875        self.states.purge()?;
876        self.metadata.purge()?;
877        self.event_counter = 0;
878        self.state_counter = 0;
879        self.compacted_until = 0;
880        Ok(())
881    }
882
883    /// Encrypts `bytes` with XChaCha20-Poly1305 using a fresh random nonce.
884    ///
885    /// The output format is `[nonce (24 bytes) || ciphertext || Poly1305 tag (16 bytes)]`.
886    /// Returns an error if the key cannot be decrypted from the [`EncryptedKey`] or if
887    /// the AEAD encryption fails.
888    fn encrypt(
889        &self,
890        key_box: &EncryptedKey,
891        bytes: &[u8],
892    ) -> Result<Vec<u8>, Error> {
893        if let Ok(key) = key_box.key() {
894            // Validate key size (XChaCha20-Poly1305 requires exactly 32 bytes)
895            if key.len() != 32 {
896                error!(
897                    expected = 32,
898                    got = key.len(),
899                    "Invalid encryption key length"
900                );
901                return Err(Error::Store {
902                    operation: StoreOperation::ValidateKeyLength,
903                    reason: format!(
904                        "Invalid key length: expected 32 bytes, got {}",
905                        key.len()
906                    ),
907                });
908            }
909
910            // Create cipher from key
911            let cipher = XChaCha20Poly1305::new(key.as_ref().into());
912
913            // Generate cryptographically secure random nonce (192-bits/24-bytes)
914            // XChaCha20 uses extended nonce, virtually eliminating collision risk
915            let nonce = XChaCha20Poly1305::generate_nonce(&mut OsRng);
916
917            // Encrypt and authenticate the data
918            // XChaCha20-Poly1305 provides both confidentiality and authenticity
919            let ciphertext: Vec<u8> =
920                cipher.encrypt(&nonce, bytes.as_ref()).map_err(|e| {
921                    error!(error = %e, "Encryption failed");
922                    store_error(StoreOperation::EncryptData, e)
923                })?;
924
925            // Prepend nonce to ciphertext for storage
926            // Format: [nonce (24 bytes) || ciphertext || poly1305_tag (16 bytes)]
927            Ok([nonce.to_vec(), ciphertext].concat())
928        } else {
929            error!("Failed to decrypt encryption key");
930            Err(store_error(StoreOperation::DecryptKey, "Can't decrypt key"))
931        }
932    }
933
934    /// Decrypts a XChaCha20-Poly1305 ciphertext produced by [`encrypt`](Store::encrypt).
935    ///
936    /// Reads the nonce from the first 24 bytes, then decrypts and verifies the authentication
937    /// tag. Returns an error if the ciphertext is too short, the key is unavailable, or
938    /// authentication fails (possible tampering or corruption).
939    fn decrypt(
940        &self,
941        key_box: &EncryptedKey,
942        ciphertext: &[u8],
943    ) -> Result<Vec<u8>, Error> {
944        // Validate ciphertext length (nonce + tag minimum = 24 + 16 = 40 bytes)
945        if ciphertext.len() < NONCE_SIZE + 16 {
946            warn!(
947                expected_min = NONCE_SIZE + 16,
948                got = ciphertext.len(),
949                "Invalid ciphertext length, possible corruption"
950            );
951            return Err(Error::Store {
952                operation: StoreOperation::ValidateCiphertext,
953                reason: format!(
954                    "Invalid ciphertext length: expected at least {} bytes, got {}",
955                    NONCE_SIZE + 16,
956                    ciphertext.len()
957                ),
958            });
959        }
960
961        if let Ok(key) = key_box.key() {
962            // Validate key size (XChaCha20-Poly1305 requires exactly 32 bytes)
963            if key.len() != 32 {
964                error!(
965                    expected = 32,
966                    got = key.len(),
967                    "Invalid decryption key length"
968                );
969                return Err(store_error(
970                    StoreOperation::ValidateKeyLength,
971                    format!(
972                        "Invalid key length: expected 32 bytes, got {}",
973                        key.len()
974                    ),
975                ));
976            }
977
978            // Extract nonce from the beginning of ciphertext
979            let nonce = XNonce::from_slice(&ciphertext[..NONCE_SIZE]);
980
981            // Extract actual ciphertext (includes Poly1305 authentication tag at the end)
982            let ciphertext_data = &ciphertext[NONCE_SIZE..];
983
984            // Create cipher and decrypt
985            let cipher = XChaCha20Poly1305::new(key.as_ref().into());
986
987            // Decrypt and verify authentication tag
988            // This will fail if data has been tampered with or corrupted
989            let plaintext =
990                cipher.decrypt(nonce, ciphertext_data).map_err(|e| {
991                    warn!(error = %e, "Decryption failed, possible tampering or corruption");
992                    store_error(
993                        StoreOperation::DecryptData,
994                        format!(
995                            "Decryption failed (possible tampering): {}",
996                            e
997                        ),
998                    )
999                })?;
1000
1001            Ok(plaintext)
1002        } else {
1003            error!("Failed to decrypt decryption key");
1004            Err(store_error(StoreOperation::DecryptKey, "Can't decrypt key"))
1005        }
1006    }
1007}
1008
1009/// Commands processed by the internal [`Store`] actor.
1010#[derive(Debug, Clone)]
1011pub enum StoreCommand<P, E> {
1012    /// Persist an event without forcing a snapshot.
1013    Persist(E),
1014    /// Persist an event and report whether the caller should snapshot now.
1015    PersistFullEvent {
1016        /// Event to append to the event log.
1017        event: E,
1018        /// Snapshot cadence for `FullPersistence`.
1019        snapshot_every: Option<u64>,
1020    },
1021    /// Persist an event and snapshot the supplied actor state if required.
1022    PersistFull {
1023        /// Event to append to the event log.
1024        event: E,
1025        /// Current actor state, used when a snapshot is triggered.
1026        actor: P,
1027        /// Snapshot cadence for `FullPersistence`.
1028        snapshot_every: Option<u64>,
1029    },
1030    /// Persist an event together with a snapshot of the supplied actor state.
1031    PersistLight(E, P),
1032    /// Snapshot the supplied actor state immediately.
1033    Snapshot(P),
1034    /// Remove event log entries already covered by the latest snapshot.
1035    Compact,
1036    /// Return the most recently persisted event.
1037    LastEvent,
1038    /// Return the next free event index.
1039    LastEventNumber,
1040    /// Return all events from the supplied event index to the end of the log.
1041    LastEventsFrom(u64),
1042    /// Return all events within the inclusive `[from, to]` range.
1043    GetEvents { from: u64, to: u64 },
1044    /// Recover the current actor state from snapshots and events.
1045    Recover,
1046    /// Delete all events, snapshots, and metadata for this actor.
1047    Purge,
1048}
1049
1050impl<P, E> Message for StoreCommand<P, E>
1051where
1052    P: PersistentActor,
1053    P::Event: BorshSerialize + BorshDeserialize,
1054    E: Event + BorshSerialize + BorshDeserialize,
1055{
1056}
1057
1058/// Responses returned by the [`Store`] actor for each [`StoreCommand`].
1059#[derive(Debug, Clone)]
1060pub enum StoreResponse<P>
1061where
1062    P: PersistentActor,
1063    P::Event: BorshSerialize + BorshDeserialize,
1064{
1065    /// Command completed without a payload.
1066    None,
1067    /// An event was persisted successfully.
1068    Persisted,
1069    /// A full-persistence write succeeded and a snapshot should be taken now.
1070    SnapshotRequired,
1071    /// A snapshot was stored successfully.
1072    Snapshotted,
1073    /// Event compaction completed successfully.
1074    Compacted,
1075    /// Recovered actor state, or `None` when no persisted state exists.
1076    State(Option<P>),
1077    /// Most recently persisted event, or `None` when the log is empty.
1078    LastEvent(Option<P::Event>),
1079    /// Next free event index.
1080    LastEventNumber(u64),
1081    /// Event payloads returned by a range query.
1082    Events(Vec<P::Event>),
1083}
1084
1085impl<P> Response for StoreResponse<P>
1086where
1087    P: PersistentActor,
1088    P::Event: BorshSerialize + BorshDeserialize,
1089{
1090}
1091
1092/// Events emitted by the [`Store`] actor (e.g. after a successful persist or snapshot).
1093#[derive(
1094    Debug, Clone, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
1095)]
1096pub enum StoreEvent {
1097    /// Emitted after a successful persist operation.
1098    Persisted,
1099    /// Emitted after a successful snapshot.
1100    Snapshotted,
1101}
1102
1103impl Event for StoreEvent {}
1104
1105#[async_trait]
1106impl<P> Actor for Store<P>
1107where
1108    P: PersistentActor,
1109    P::Event: BorshSerialize + BorshDeserialize,
1110{
1111    type Message = StoreCommand<P, P::Event>;
1112    type Response = StoreResponse<P>;
1113    type Event = StoreEvent;
1114
1115    fn get_span(
1116        id: &str,
1117        _parent_span: Option<tracing::Span>,
1118    ) -> tracing::Span {
1119        info_span!("Store", id = %id)
1120    }
1121
1122    /// On shutdown, snapshot the current state if there are unsnapshotted
1123    /// events (FullPersistence only). This replaces the old `stop_store()`
1124    /// pattern where the parent actor was responsible for triggering snapshots.
1125    async fn pre_stop(
1126        &mut self,
1127        ctx: &mut ActorContext<Self>,
1128    ) -> Result<(), ActorError> {
1129        if let Err(e) = self.snapshot_if_needed() {
1130            error!(error = %e, "Failed to snapshot state during Store shutdown");
1131            let _ = ctx
1132                .emit_error(actor_store_error(
1133                    StoreOperation::EmitPreStopError,
1134                    e,
1135                ))
1136                .await;
1137        }
1138        Ok(())
1139    }
1140}
1141
1142#[async_trait]
1143impl<P> Handler<Self> for Store<P>
1144where
1145    P: PersistentActor,
1146    P::Event: BorshSerialize + BorshDeserialize,
1147{
1148    async fn handle_message(
1149        &mut self,
1150        _sender: ActorPath,
1151        msg: StoreCommand<P, P::Event>,
1152        _ctx: &mut ActorContext<Self>,
1153    ) -> Result<StoreResponse<P>, ActorError> {
1154        // Match the command.
1155        match msg {
1156            // Persist an event.
1157            StoreCommand::Persist(event) => {
1158                self.persist(&event).map_err(|e| {
1159                    actor_store_error(StoreOperation::Persist, e)
1160                })?;
1161                debug!("Persisted event: {:?}", event);
1162                Ok(StoreResponse::Persisted)
1163            }
1164            StoreCommand::PersistFullEvent {
1165                event,
1166                snapshot_every,
1167            } => {
1168                self.persist(&event).map_err(|e| {
1169                    actor_store_error(StoreOperation::PersistFull, e)
1170                })?;
1171
1172                if snapshot_every.is_some_and(|every| {
1173                    self.pending_events_since_snapshot() >= every
1174                }) {
1175                    debug!("Persisted full event and snapshot is now required");
1176                    Ok(StoreResponse::SnapshotRequired)
1177                } else {
1178                    debug!("Persisted full event: {:?}", event);
1179                    Ok(StoreResponse::Persisted)
1180                }
1181            }
1182            StoreCommand::PersistFull {
1183                event,
1184                actor,
1185                snapshot_every,
1186            } => {
1187                self.persist(&event).map_err(|e| {
1188                    actor_store_error(StoreOperation::PersistFull, e)
1189                })?;
1190
1191                if snapshot_every.is_some_and(|every| {
1192                    self.pending_events_since_snapshot() >= every
1193                }) {
1194                    self.snapshot(&actor).map_err(|e| {
1195                        actor_store_error(StoreOperation::Snapshot, e)
1196                    })?;
1197                }
1198
1199                debug!("Persisted full event: {:?}", event);
1200                Ok(StoreResponse::Persisted)
1201            }
1202            // Light persistence of an event.
1203            StoreCommand::PersistLight(event, actor) => {
1204                self.persist_state(&event, &actor).map_err(|e| {
1205                    actor_store_error(StoreOperation::PersistLight, e)
1206                })?;
1207                debug!("Light persistence of event: {:?}", event);
1208                Ok(StoreResponse::Persisted)
1209            }
1210            // Snapshot the state.
1211            StoreCommand::Snapshot(actor) => {
1212                self.snapshot(&actor).map_err(|e| {
1213                    actor_store_error(StoreOperation::Snapshot, e)
1214                })?;
1215                debug!("Snapshotted state: {:?}", actor);
1216                Ok(StoreResponse::Snapshotted)
1217            }
1218            StoreCommand::Compact => {
1219                self.compact_to_snapshot().map_err(|e| {
1220                    actor_store_error(StoreOperation::Compact, e)
1221                })?;
1222                debug!("Compacted events covered by the latest snapshot");
1223                Ok(StoreResponse::Compacted)
1224            }
1225            // Recover the state.
1226            StoreCommand::Recover => {
1227                let state = self.recover().map_err(|e| {
1228                    actor_store_error(StoreOperation::Recover, e)
1229                })?;
1230                debug!("Recovered state: {:?}", state);
1231                Ok(StoreResponse::State(state))
1232            }
1233            StoreCommand::GetEvents { from, to } => {
1234                let events = self.query_events(from, to).map_err(|e| {
1235                    actor_store_error(
1236                        StoreOperation::GetEventsRange,
1237                        format!("Unable to get events range: {}", e),
1238                    )
1239                })?;
1240                Ok(StoreResponse::Events(events))
1241            }
1242            // Get the last event.
1243            StoreCommand::LastEvent => {
1244                let event = self.last_event().map_err(|e| {
1245                    actor_store_error(StoreOperation::LastEvent, e)
1246                })?;
1247                debug!("Last event: {:?}", event);
1248                Ok(StoreResponse::LastEvent(event))
1249            }
1250            // Purge the store.
1251            StoreCommand::Purge => {
1252                self.purge()
1253                    .map_err(|e| actor_store_error(StoreOperation::Purge, e))?;
1254                debug!("Purged store");
1255                Ok(StoreResponse::None)
1256            }
1257            // Get the last event number.
1258            StoreCommand::LastEventNumber => {
1259                Ok(StoreResponse::LastEventNumber(self.event_counter))
1260            }
1261            // Get the last events from a number of counter.
1262            StoreCommand::LastEventsFrom(from) => {
1263                let to = self.event_counter.saturating_sub(1);
1264                let events = self.events(from, to).map_err(|e| {
1265                    actor_store_error(
1266                        StoreOperation::GetLatestEvents,
1267                        format!("Unable to get the latest events: {}", e),
1268                    )
1269                })?;
1270                Ok(StoreResponse::Events(events))
1271            }
1272        }
1273    }
1274}
1275
1276#[cfg(test)]
1277mod tests {
1278    use std::vec;
1279    use test_log::test;
1280    use tokio_util::sync::CancellationToken;
1281    use tracing::info_span;
1282
1283    use super::*;
1284    use crate::memory::MemoryManager;
1285
1286    use ave_actors_actor::{ActorRef, ActorSystem, Error as ActorError};
1287
1288    use async_trait::async_trait;
1289
1290    #[derive(
1291        Debug,
1292        Clone,
1293        Serialize,
1294        Deserialize,
1295        BorshSerialize,
1296        BorshDeserialize,
1297        Default,
1298    )]
1299    struct TestActor {
1300        pub version: usize,
1301        pub value: i32,
1302    }
1303
1304    #[derive(
1305        Debug,
1306        Clone,
1307        Serialize,
1308        Deserialize,
1309        BorshSerialize,
1310        BorshDeserialize,
1311        Default,
1312    )]
1313    struct TestActorLight {
1314        pub data: Vec<i32>,
1315    }
1316
1317    #[derive(Debug, Clone, Serialize, Deserialize)]
1318    enum TestMessageLight {
1319        SetData(Vec<i32>),
1320        GetData,
1321    }
1322
1323    #[derive(Debug, Clone, Serialize, Deserialize)]
1324    enum TestMessage {
1325        Increment(i32),
1326        Recover,
1327        Snapshot,
1328        GetValue,
1329    }
1330
1331    impl Message for TestMessage {}
1332    impl Message for TestMessageLight {}
1333
1334    #[derive(
1335        Debug, Clone, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
1336    )]
1337    struct TestEvent(i32);
1338
1339    impl Event for TestEvent {}
1340
1341    #[derive(
1342        Debug, Clone, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
1343    )]
1344    struct TestEventLight(Vec<i32>);
1345
1346    impl Event for TestEventLight {}
1347
1348    #[derive(Debug, Clone, PartialEq)]
1349    enum TestResponse {
1350        Value(i32),
1351        None,
1352    }
1353
1354    #[derive(Debug, Clone, PartialEq)]
1355    enum TestResponseLight {
1356        Data(Vec<i32>),
1357        None,
1358    }
1359
1360    impl Response for TestResponse {}
1361    impl Response for TestResponseLight {}
1362
1363    #[async_trait]
1364    impl Actor for TestActorLight {
1365        type Message = TestMessageLight;
1366        type Event = TestEventLight;
1367        type Response = TestResponseLight;
1368
1369        fn get_span(
1370            id: &str,
1371            _parent_span: Option<tracing::Span>,
1372        ) -> tracing::Span {
1373            info_span!("TestActorLight", id = %id)
1374        }
1375
1376        async fn pre_start(
1377            &mut self,
1378            ctx: &mut ActorContext<Self>,
1379        ) -> Result<(), ActorError> {
1380            let memory_db: MemoryManager =
1381                ctx.system().get_helper("db").await.unwrap();
1382
1383            let encrypt_key = EncryptedKey::new(&[3u8; 32]).unwrap();
1384
1385            let db = Store::<Self>::new(
1386                "store",
1387                "prefix",
1388                memory_db,
1389                Some(encrypt_key),
1390                Self::create_initial(()),
1391            )
1392            .unwrap();
1393
1394            let store = ctx.create_child("store", db).await.unwrap();
1395            let response = store.ask(StoreCommand::Recover).await?;
1396
1397            if let StoreResponse::State(Some(state)) = response {
1398                self.update(state);
1399            } else {
1400                debug!("Create first snapshot");
1401                store
1402                    .tell(StoreCommand::Snapshot(self.clone()))
1403                    .await
1404                    .unwrap();
1405            }
1406
1407            Ok(())
1408        }
1409    }
1410
1411    #[async_trait]
1412    impl Actor for TestActor {
1413        type Message = TestMessage;
1414        type Event = TestEvent;
1415        type Response = TestResponse;
1416
1417        fn get_span(
1418            id: &str,
1419            _parent_span: Option<tracing::Span>,
1420        ) -> tracing::Span {
1421            info_span!("TestActor", id = %id)
1422        }
1423
1424        async fn pre_start(
1425            &mut self,
1426            ctx: &mut ActorContext<Self>,
1427        ) -> Result<(), ActorError> {
1428            let db = Store::<Self>::new(
1429                "store",
1430                "prefix",
1431                MemoryManager::default(),
1432                None,
1433                Self::create_initial(()),
1434            )
1435            .unwrap();
1436            let store = ctx.create_child("store", db).await.unwrap();
1437            let response = store.ask(StoreCommand::Recover).await.unwrap();
1438            debug!("Recover response: {:?}", response);
1439            if let StoreResponse::State(Some(state)) = response {
1440                debug!("Recovering state: {:?}", state);
1441                self.update(state);
1442            }
1443            Ok(())
1444        }
1445    }
1446
1447    #[async_trait]
1448    impl PersistentActor for TestActorLight {
1449        type Persistence = LightPersistence;
1450        type InitParams = ();
1451
1452        fn create_initial(_: ()) -> Self {
1453            Self { data: Vec::new() }
1454        }
1455
1456        fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1457            self.data.clone_from(&event.0);
1458            Ok(())
1459        }
1460    }
1461
1462    #[async_trait]
1463    impl PersistentActor for TestActor {
1464        type Persistence = FullPersistence;
1465        type InitParams = ();
1466
1467        fn create_initial(_: ()) -> Self {
1468            Self {
1469                version: 0,
1470                value: 0,
1471            }
1472        }
1473
1474        fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1475            self.version += 1;
1476            self.value += event.0;
1477            Ok(())
1478        }
1479    }
1480
1481    #[async_trait]
1482    impl Handler<TestActorLight> for TestActorLight {
1483        async fn handle_message(
1484            &mut self,
1485            _sender: ActorPath,
1486            msg: TestMessageLight,
1487            ctx: &mut ActorContext<TestActorLight>,
1488        ) -> Result<TestResponseLight, ActorError> {
1489            match msg {
1490                TestMessageLight::SetData(data) => {
1491                    self.on_event(TestEventLight(data), ctx).await;
1492                    Ok(TestResponseLight::None)
1493                }
1494                TestMessageLight::GetData => {
1495                    Ok(TestResponseLight::Data(self.data.clone()))
1496                }
1497            }
1498        }
1499
1500        async fn on_event(
1501            &mut self,
1502            event: TestEventLight,
1503            ctx: &mut ActorContext<TestActorLight>,
1504        ) -> () {
1505            self.persist(&event, ctx).await.unwrap();
1506        }
1507    }
1508
1509    #[async_trait]
1510    impl Handler<TestActor> for TestActor {
1511        async fn handle_message(
1512            &mut self,
1513            _sender: ActorPath,
1514            msg: TestMessage,
1515            ctx: &mut ActorContext<TestActor>,
1516        ) -> Result<TestResponse, ActorError> {
1517            match msg {
1518                TestMessage::Increment(value) => {
1519                    let event = TestEvent(value);
1520                    self.on_event(event, ctx).await;
1521                    Ok(TestResponse::None)
1522                }
1523                TestMessage::Recover => {
1524                    let store: ActorRef<Store<Self>> =
1525                        ctx.get_child("store").await.unwrap();
1526                    let response =
1527                        store.ask(StoreCommand::Recover).await.unwrap();
1528                    if let StoreResponse::State(Some(state)) = response {
1529                        self.update(state.clone());
1530                        Ok(TestResponse::Value(state.value))
1531                    } else {
1532                        Ok(TestResponse::None)
1533                    }
1534                }
1535                TestMessage::Snapshot => {
1536                    let store: ActorRef<Store<Self>> =
1537                        ctx.get_child("store").await.unwrap();
1538                    store
1539                        .ask(StoreCommand::Snapshot(self.clone()))
1540                        .await
1541                        .unwrap();
1542                    Ok(TestResponse::None)
1543                }
1544                TestMessage::GetValue => Ok(TestResponse::Value(self.value)),
1545            }
1546        }
1547
1548        async fn on_event(
1549            &mut self,
1550            event: TestEvent,
1551            ctx: &mut ActorContext<TestActor>,
1552        ) -> () {
1553            self.persist(&event, ctx).await.unwrap();
1554        }
1555    }
1556
1557    #[test(tokio::test)]
1558    async fn test_store_actor() {
1559        let (system, mut runner) = ActorSystem::create(
1560            CancellationToken::new(),
1561            CancellationToken::new(),
1562        );
1563        // Init runner.
1564        tokio::spawn(async move {
1565            runner.run().await;
1566        });
1567
1568        let encrypt_key =
1569            EncryptedKey::new(b"0123456789abcdef0123456789abcdef").unwrap();
1570        let db = Store::<TestActor>::new(
1571            "store",
1572            "test",
1573            MemoryManager::default(),
1574            Some(encrypt_key),
1575            TestActor::create_initial(()),
1576        )
1577        .unwrap();
1578        let store = system.create_root_actor("store", db).await.unwrap();
1579
1580        let mut actor = TestActor {
1581            version: 0,
1582            value: 0,
1583        };
1584        store
1585            .tell(StoreCommand::Snapshot(actor.clone()))
1586            .await
1587            .unwrap();
1588        store
1589            .tell(StoreCommand::Persist(TestEvent(10)))
1590            .await
1591            .unwrap();
1592        actor.apply(&TestEvent(10)).unwrap();
1593        store
1594            .tell(StoreCommand::Snapshot(actor.clone()))
1595            .await
1596            .unwrap();
1597        store
1598            .tell(StoreCommand::Persist(TestEvent(10)))
1599            .await
1600            .unwrap();
1601
1602        actor.apply(&TestEvent(10)).unwrap();
1603        let response = store.ask(StoreCommand::Recover).await.unwrap();
1604        if let StoreResponse::State(Some(state)) = response {
1605            assert_eq!(state.value, actor.value);
1606        }
1607        let response = store.ask(StoreCommand::Recover).await.unwrap();
1608        if let StoreResponse::State(Some(state)) = response {
1609            assert_eq!(state.value, actor.value);
1610        }
1611        let response = store.ask(StoreCommand::LastEvent).await.unwrap();
1612        if let StoreResponse::LastEvent(Some(event)) = response {
1613            assert_eq!(event.0, 10);
1614        } else {
1615            panic!("Event not found");
1616        }
1617        let response = store.ask(StoreCommand::LastEventNumber).await.unwrap();
1618        if let StoreResponse::LastEventNumber(number) = response {
1619            assert_eq!(number, 2);
1620        } else {
1621            panic!("Event number not found");
1622        }
1623        let response =
1624            store.ask(StoreCommand::LastEventsFrom(1)).await.unwrap();
1625        if let StoreResponse::Events(events) = response {
1626            assert_eq!(events.len(), 1);
1627            assert_eq!(events[0].0, 10);
1628        } else {
1629            panic!("Events not found");
1630        }
1631        let response = store
1632            .ask(StoreCommand::GetEvents { from: 0, to: 1 })
1633            .await
1634            .unwrap();
1635        if let StoreResponse::Events(events) = response {
1636            assert_eq!(events.len(), 2);
1637            assert_eq!(events[0].0, 10);
1638            assert_eq!(events[1].0, 10);
1639        } else {
1640            panic!("Events not found");
1641        }
1642    }
1643
1644    #[test(tokio::test)]
1645    async fn test_persistent_light_actor() {
1646        let (system, ..) = ActorSystem::create(
1647            CancellationToken::new(),
1648            CancellationToken::new(),
1649        );
1650
1651        system.add_helper("db", MemoryManager::default()).await;
1652
1653        let actor_ref = system
1654            .create_root_actor("test", TestActorLight::initial(()))
1655            .await
1656            .unwrap();
1657
1658        let result = actor_ref
1659            .ask(TestMessageLight::SetData(vec![12, 13, 14, 15]))
1660            .await
1661            .unwrap();
1662
1663        assert_eq!(result, TestResponseLight::None);
1664
1665        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1666
1667        actor_ref.ask_stop().await.unwrap();
1668
1669        let actor_ref = system
1670            .create_root_actor("test", TestActorLight::initial(()))
1671            .await
1672            .unwrap();
1673
1674        let result = actor_ref.ask(TestMessageLight::GetData).await.unwrap();
1675
1676        let TestResponseLight::Data(data) = result else {
1677            panic!("Invalid response")
1678        };
1679
1680        assert_eq!(data, vec![12, 13, 14, 15]);
1681    }
1682
1683    #[test(tokio::test)]
1684    async fn test_persistent_actor() {
1685        let (system, mut runner) = ActorSystem::create(
1686            CancellationToken::new(),
1687            CancellationToken::new(),
1688        );
1689        // Init runner.
1690        tokio::spawn(async move {
1691            runner.run().await;
1692        });
1693
1694        let actor_ref = system
1695            .create_root_actor("test", TestActor::initial(()))
1696            .await
1697            .unwrap();
1698
1699        let result = actor_ref.ask(TestMessage::Increment(10)).await.unwrap();
1700
1701        assert_eq!(result, TestResponse::None);
1702
1703        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1704
1705        actor_ref.tell(TestMessage::Snapshot).await.unwrap();
1706
1707        let result = actor_ref.ask(TestMessage::GetValue).await.unwrap();
1708
1709        assert_eq!(result, TestResponse::Value(10));
1710        actor_ref.tell(TestMessage::Increment(10)).await.unwrap();
1711        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1712
1713        let value = actor_ref.ask(TestMessage::GetValue).await.unwrap();
1714
1715        assert_eq!(value, TestResponse::Value(20));
1716
1717        actor_ref.ask(TestMessage::Recover).await.unwrap();
1718
1719        let value = actor_ref.ask(TestMessage::GetValue).await.unwrap();
1720
1721        assert_eq!(value, TestResponse::Value(20));
1722
1723        actor_ref.ask_stop().await.unwrap();
1724        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
1725    }
1726
1727    #[test(tokio::test)]
1728    async fn test_encrypt_decrypt() {
1729        let encrypt_key = EncryptedKey::new(&[0u8; 32]).unwrap();
1730
1731        let store = Store::<TestActor>::new(
1732            "store",
1733            "test",
1734            MemoryManager::default(),
1735            Some(encrypt_key),
1736            TestActor::create_initial(()),
1737        )
1738        .unwrap();
1739        let data = b"Hello, world!";
1740        let encrypted = store
1741            .encrypt(&store.key_box.clone().unwrap(), data)
1742            .unwrap();
1743        let decrypted = store
1744            .decrypt(&store.key_box.clone().unwrap(), &encrypted)
1745            .unwrap();
1746        assert_eq!(data, decrypted.as_slice());
1747    }
1748}