Skip to main content

atomr_persistence/
receive_persistent.rs

1//! `ReceivePersistent` — closure-style helper for persistent actors.
2//!
3//! Phase 11.D of `docs/full-port-plan.md`. Akka.NET parity:
4//! `Akka.Persistence.ReceivePersistentActor`. Where [`crate::Eventsourced`]
5//! makes the user implement a trait, `ReceivePersistent` lets ad-hoc /
6//! script-shaped actors register handler closures up front and run a
7//! command loop without a custom struct.
8//!
9//! ```ignore
10//! use atomr_persistence::{ReceivePersistent, RecoveryPermitter, InMemoryJournal};
11//! # async fn ex() {
12//! let journal = std::sync::Arc::new(InMemoryJournal::default());
13//! let mut rp: ReceivePersistent<i64, i64, std::convert::Infallible> = ReceivePersistent::new("counter")
14//!     .on_command(|state, cmd| Ok(vec![cmd]))
15//!     .on_event(|state, evt| { *state += evt; })
16//!     .with_codec(
17//!         |e| Ok(e.to_le_bytes().to_vec()),
18//!         |b| Ok(i64::from_le_bytes(b.try_into().map_err(|_| "len".to_string())?)),
19//!     );
20//! let permits = RecoveryPermitter::new(1);
21//! rp.recover(journal.clone(), &permits).await.unwrap();
22//! rp.handle(journal.clone(), 5).await.unwrap();
23//! assert_eq!(rp.state(), &5);
24//! # }
25//! ```
26
27use std::sync::Arc;
28
29use crate::eventsourced::EventsourcedError;
30use crate::journal::{Journal, PersistentRepr};
31use crate::recovery_permitter::RecoveryPermitter;
32
// --- Codec closures -------------------------------------------------------

/// Serialises one event into the bytes stored as a journal payload; a
/// failure is reported as a plain message string.
type EncodeFn<E> = Box<dyn Fn(&E) -> Result<Vec<u8>, String> + Send + Sync>;

/// Rebuilds one event from journal payload bytes; a failure is reported as
/// a plain message string.
type DecodeFn<E> = Box<dyn Fn(&[u8]) -> Result<E, String> + Send + Sync>;

// --- Behaviour closures ---------------------------------------------------

/// Command handler: given a shared view of the state and an owned command,
/// produces the events to persist or a domain error.
type CommandFn<S, C, E, Err> = Box<dyn FnMut(&S, C) -> Result<Vec<E>, Err> + Send>;

/// Event applier: folds a single event into the state in place.
type EventFn<S, E> = Box<dyn FnMut(&mut S, &E) + Send>;
37
38/// Closure-style persistent actor.
39pub struct ReceivePersistent<S, E, Err>
40where
41    S: Default + Send + 'static,
42    E: Clone + Send + 'static,
43    Err: std::error::Error + Send + 'static,
44{
45    persistence_id: String,
46    state: S,
47    next_seq: u64,
48    writer_uuid: String,
49    on_command: Option<CommandFn<S, E, E, Err>>,
50    on_event: Option<EventFn<S, E>>,
51    encode: Option<EncodeFn<E>>,
52    decode: Option<DecodeFn<E>>,
53}
54
55impl<S, E, Err> ReceivePersistent<S, E, Err>
56where
57    S: Default + Send + 'static,
58    E: Clone + Send + 'static,
59    Err: std::error::Error + Send + 'static,
60{
61    pub fn new(persistence_id: impl Into<String>) -> Self {
62        Self {
63            persistence_id: persistence_id.into(),
64            state: S::default(),
65            next_seq: 0,
66            writer_uuid: format!("{}-{}", std::process::id(), uuid_v4_simple()),
67            on_command: None,
68            on_event: None,
69            encode: None,
70            decode: None,
71        }
72    }
73
74    /// Register the command handler. The closure receives the current
75    /// state + the incoming command (the command type matches the
76    /// event type for this minimal helper; richer command-vs-event
77    /// shapes use `Eventsourced` directly).
78    pub fn on_command<F>(mut self, f: F) -> Self
79    where
80        F: FnMut(&S, E) -> Result<Vec<E>, Err> + Send + 'static,
81    {
82        self.on_command = Some(Box::new(f));
83        self
84    }
85
86    /// Register the event applier — mutates state in-place.
87    pub fn on_event<F>(mut self, f: F) -> Self
88    where
89        F: FnMut(&mut S, &E) + Send + 'static,
90    {
91        self.on_event = Some(Box::new(f));
92        self
93    }
94
95    /// Register the codec used to round-trip events through the journal.
96    pub fn with_codec<EncF, DecF>(mut self, encode: EncF, decode: DecF) -> Self
97    where
98        EncF: Fn(&E) -> Result<Vec<u8>, String> + Send + Sync + 'static,
99        DecF: Fn(&[u8]) -> Result<E, String> + Send + Sync + 'static,
100    {
101        self.encode = Some(Box::new(encode));
102        self.decode = Some(Box::new(decode));
103        self
104    }
105
106    pub fn state(&self) -> &S {
107        &self.state
108    }
109
110    pub fn persistence_id(&self) -> &str {
111        &self.persistence_id
112    }
113
114    /// Replay the journal under `permitter` and apply each event.
115    pub async fn recover<J: Journal>(
116        &mut self,
117        journal: Arc<J>,
118        permitter: &RecoveryPermitter,
119    ) -> Result<u64, EventsourcedError<Err>> {
120        let _permit = permitter.acquire().await.ok_or(EventsourcedError::PermitDenied)?;
121        let on_event = self
122            .on_event
123            .as_mut()
124            .ok_or_else(|| EventsourcedError::Codec("on_event handler not registered".into()))?;
125        let decode =
126            self.decode.as_ref().ok_or_else(|| EventsourcedError::Codec("decoder not registered".into()))?;
127        let highest = journal.highest_sequence_nr(&self.persistence_id, 0).await?;
128        let events = journal.replay_messages(&self.persistence_id, 1, highest, u64::MAX).await?;
129        for e in &events {
130            let evt = decode(&e.payload).map_err(EventsourcedError::Codec)?;
131            on_event(&mut self.state, &evt);
132        }
133        self.next_seq = highest;
134        Ok(highest)
135    }
136
137    /// Apply one command — derive events, persist, apply.
138    pub async fn handle<J: Journal>(
139        &mut self,
140        journal: Arc<J>,
141        cmd: E,
142    ) -> Result<(), EventsourcedError<Err>> {
143        let on_cmd = self
144            .on_command
145            .as_mut()
146            .ok_or_else(|| EventsourcedError::Codec("on_command handler not registered".into()))?;
147        let events = on_cmd(&self.state, cmd).map_err(EventsourcedError::Domain)?;
148        if events.is_empty() {
149            return Ok(());
150        }
151        let on_event = self
152            .on_event
153            .as_mut()
154            .ok_or_else(|| EventsourcedError::Codec("on_event handler not registered".into()))?;
155        let encode =
156            self.encode.as_ref().ok_or_else(|| EventsourcedError::Codec("encoder not registered".into()))?;
157        let mut reprs = Vec::with_capacity(events.len());
158        for e in &events {
159            self.next_seq += 1;
160            let payload = encode(e).map_err(EventsourcedError::Codec)?;
161            reprs.push(PersistentRepr {
162                persistence_id: self.persistence_id.clone(),
163                sequence_nr: self.next_seq,
164                payload,
165                manifest: "evt".into(),
166                writer_uuid: self.writer_uuid.clone(),
167                deleted: false,
168                tags: Vec::new(),
169            });
170        }
171        journal.write_messages(reprs).await?;
172        for e in &events {
173            on_event(&mut self.state, e);
174        }
175        Ok(())
176    }
177}
178
/// Produce a small, non-cryptographic identifier for `writer_uuid`.
///
/// Despite the name this is not an RFC 4122 UUID: the result is the current
/// wall-clock time in nanoseconds since the Unix epoch (hex), followed by
/// `-` and a process-wide call counter (hex).
///
/// The counter makes every value returned within one process distinct even
/// when two calls observe the same clock reading (coarse clocks, or a
/// clock set before the epoch, which maps to `0`).
fn uuid_v4_simple() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Only uniqueness of the fetched value matters, not ordering relative
    // to other memory operations, so `Relaxed` is sufficient.
    static CALLS: AtomicUsize = AtomicUsize::new(0);

    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos()).unwrap_or(0);
    let seq = CALLS.fetch_add(1, Ordering::Relaxed);
    format!("{nanos:x}-{seq:x}")
}
187
#[cfg(test)]
mod tests {
    use super::*;
    use crate::InMemoryJournal;

    /// Placeholder domain error; the handlers in these tests never fail.
    #[derive(Debug, thiserror::Error)]
    #[error("dummy")]
    struct DummyErr;

    /// The actor shape shared by every test: `i64` state, `i64` events.
    type Counter = ReceivePersistent<i64, i64, DummyErr>;

    /// Little-endian encoder for `i64` events.
    fn encode_i64(evt: &i64) -> Result<Vec<u8>, String> {
        Ok(evt.to_le_bytes().to_vec())
    }

    /// Little-endian decoder; anything but exactly eight bytes is rejected.
    fn decode_i64(bytes: &[u8]) -> Result<i64, String> {
        <[u8; 8]>::try_from(bytes).map(i64::from_le_bytes).map_err(|_| "len".to_string())
    }

    /// Fully wired summing actor: each command becomes one event that is
    /// added to the state.
    fn summing_actor(pid: &str) -> Counter {
        Counter::new(pid)
            .on_command(|_state, cmd| Ok(vec![cmd]))
            .on_event(|state, evt| *state += evt)
            .with_codec(encode_i64, decode_i64)
    }

    #[tokio::test]
    async fn closure_actor_persists_and_recovers() {
        let journal = Arc::new(InMemoryJournal::default());
        let permits = RecoveryPermitter::new(1);

        let mut writer = summing_actor("pid-1");
        for cmd in [5, 3, -2] {
            writer.handle(Arc::clone(&journal), cmd).await.unwrap();
        }
        assert_eq!(writer.state(), &6);

        // Fresh replay reaches the same state.
        let mut replayed = summing_actor("pid-1");
        replayed.recover(Arc::clone(&journal), &permits).await.unwrap();
        assert_eq!(replayed.state(), &6);
    }

    #[tokio::test]
    async fn missing_codec_is_a_typed_error() {
        let journal = Arc::new(InMemoryJournal::default());

        // Deliberately no `with_codec`: persisting must fail cleanly.
        let mut actor: Counter = ReceivePersistent::new("pid-2")
            .on_command(|_, c| Ok(vec![c]))
            .on_event(|s, e| {
                *s += e;
            });

        let outcome = actor.handle(journal, 1).await;
        assert!(matches!(outcome, Err(EventsourcedError::Codec(_))));
    }
}
247}