1use std::sync::Arc;
28
29use crate::eventsourced::EventsourcedError;
30use crate::journal::{Journal, PersistentRepr};
31use crate::recovery_permitter::RecoveryPermitter;
32
33type CommandFn<S, C, E, Err> = Box<dyn FnMut(&S, C) -> Result<Vec<E>, Err> + Send>;
34type EventFn<S, E> = Box<dyn FnMut(&mut S, &E) + Send>;
35type EncodeFn<E> = Box<dyn Fn(&E) -> Result<Vec<u8>, String> + Send + Sync>;
36type DecodeFn<E> = Box<dyn Fn(&[u8]) -> Result<E, String> + Send + Sync>;
37
38pub struct ReceivePersistent<S, E, Err>
40where
41 S: Default + Send + 'static,
42 E: Clone + Send + 'static,
43 Err: std::error::Error + Send + 'static,
44{
45 persistence_id: String,
46 state: S,
47 next_seq: u64,
48 writer_uuid: String,
49 on_command: Option<CommandFn<S, E, E, Err>>,
50 on_event: Option<EventFn<S, E>>,
51 encode: Option<EncodeFn<E>>,
52 decode: Option<DecodeFn<E>>,
53}
54
55impl<S, E, Err> ReceivePersistent<S, E, Err>
56where
57 S: Default + Send + 'static,
58 E: Clone + Send + 'static,
59 Err: std::error::Error + Send + 'static,
60{
61 pub fn new(persistence_id: impl Into<String>) -> Self {
62 Self {
63 persistence_id: persistence_id.into(),
64 state: S::default(),
65 next_seq: 0,
66 writer_uuid: format!("{}-{}", std::process::id(), uuid_v4_simple()),
67 on_command: None,
68 on_event: None,
69 encode: None,
70 decode: None,
71 }
72 }
73
74 pub fn on_command<F>(mut self, f: F) -> Self
79 where
80 F: FnMut(&S, E) -> Result<Vec<E>, Err> + Send + 'static,
81 {
82 self.on_command = Some(Box::new(f));
83 self
84 }
85
86 pub fn on_event<F>(mut self, f: F) -> Self
88 where
89 F: FnMut(&mut S, &E) + Send + 'static,
90 {
91 self.on_event = Some(Box::new(f));
92 self
93 }
94
95 pub fn with_codec<EncF, DecF>(mut self, encode: EncF, decode: DecF) -> Self
97 where
98 EncF: Fn(&E) -> Result<Vec<u8>, String> + Send + Sync + 'static,
99 DecF: Fn(&[u8]) -> Result<E, String> + Send + Sync + 'static,
100 {
101 self.encode = Some(Box::new(encode));
102 self.decode = Some(Box::new(decode));
103 self
104 }
105
106 pub fn state(&self) -> &S {
107 &self.state
108 }
109
110 pub fn persistence_id(&self) -> &str {
111 &self.persistence_id
112 }
113
114 pub async fn recover<J: Journal>(
116 &mut self,
117 journal: Arc<J>,
118 permitter: &RecoveryPermitter,
119 ) -> Result<u64, EventsourcedError<Err>> {
120 let _permit = permitter.acquire().await.ok_or(EventsourcedError::PermitDenied)?;
121 let on_event = self
122 .on_event
123 .as_mut()
124 .ok_or_else(|| EventsourcedError::Codec("on_event handler not registered".into()))?;
125 let decode =
126 self.decode.as_ref().ok_or_else(|| EventsourcedError::Codec("decoder not registered".into()))?;
127 let highest = journal.highest_sequence_nr(&self.persistence_id, 0).await?;
128 let events = journal.replay_messages(&self.persistence_id, 1, highest, u64::MAX).await?;
129 for e in &events {
130 let evt = decode(&e.payload).map_err(EventsourcedError::Codec)?;
131 on_event(&mut self.state, &evt);
132 }
133 self.next_seq = highest;
134 Ok(highest)
135 }
136
137 pub async fn handle<J: Journal>(
139 &mut self,
140 journal: Arc<J>,
141 cmd: E,
142 ) -> Result<(), EventsourcedError<Err>> {
143 let on_cmd = self
144 .on_command
145 .as_mut()
146 .ok_or_else(|| EventsourcedError::Codec("on_command handler not registered".into()))?;
147 let events = on_cmd(&self.state, cmd).map_err(EventsourcedError::Domain)?;
148 if events.is_empty() {
149 return Ok(());
150 }
151 let on_event = self
152 .on_event
153 .as_mut()
154 .ok_or_else(|| EventsourcedError::Codec("on_event handler not registered".into()))?;
155 let encode =
156 self.encode.as_ref().ok_or_else(|| EventsourcedError::Codec("encoder not registered".into()))?;
157 let mut reprs = Vec::with_capacity(events.len());
158 for e in &events {
159 self.next_seq += 1;
160 let payload = encode(e).map_err(EventsourcedError::Codec)?;
161 reprs.push(PersistentRepr {
162 persistence_id: self.persistence_id.clone(),
163 sequence_nr: self.next_seq,
164 payload,
165 manifest: "evt".into(),
166 writer_uuid: self.writer_uuid.clone(),
167 deleted: false,
168 tags: Vec::new(),
169 });
170 }
171 journal.write_messages(reprs).await?;
172 for e in &events {
173 on_event(&mut self.state, e);
174 }
175 Ok(())
176 }
177}
178
179fn uuid_v4_simple() -> String {
180 use std::time::{SystemTime, UNIX_EPOCH};
184 let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos()).unwrap_or(0);
185 format!("{nanos:x}")
186}
187
188#[cfg(test)]
189mod tests {
190 use super::*;
191 use crate::InMemoryJournal;
192
193 #[derive(Debug, thiserror::Error)]
194 #[error("dummy")]
195 struct DummyErr;
196
197 #[tokio::test]
198 async fn closure_actor_persists_and_recovers() {
199 let journal = Arc::new(InMemoryJournal::default());
200 let permits = RecoveryPermitter::new(1);
201
202 let mut rp: ReceivePersistent<i64, i64, DummyErr> = ReceivePersistent::new("pid-1")
203 .on_command(|_state, cmd| Ok(vec![cmd]))
204 .on_event(|state, evt| {
205 *state += evt;
206 })
207 .with_codec(
208 |e: &i64| Ok(e.to_le_bytes().to_vec()),
209 |b: &[u8]| {
210 let arr: [u8; 8] = b.try_into().map_err(|_| "len".to_string())?;
211 Ok(i64::from_le_bytes(arr))
212 },
213 );
214
215 rp.handle(journal.clone(), 5).await.unwrap();
216 rp.handle(journal.clone(), 3).await.unwrap();
217 rp.handle(journal.clone(), -2).await.unwrap();
218 assert_eq!(rp.state(), &6);
219
220 let mut rp2: ReceivePersistent<i64, i64, DummyErr> = ReceivePersistent::new("pid-1")
222 .on_command(|_state, cmd| Ok(vec![cmd]))
223 .on_event(|state, evt| {
224 *state += evt;
225 })
226 .with_codec(
227 |e: &i64| Ok(e.to_le_bytes().to_vec()),
228 |b: &[u8]| {
229 let arr: [u8; 8] = b.try_into().map_err(|_| "len".to_string())?;
230 Ok(i64::from_le_bytes(arr))
231 },
232 );
233 rp2.recover(journal.clone(), &permits).await.unwrap();
234 assert_eq!(rp2.state(), &6);
235 }
236
237 #[tokio::test]
238 async fn missing_codec_is_a_typed_error() {
239 let journal = Arc::new(InMemoryJournal::default());
240 let mut rp: ReceivePersistent<i64, i64, DummyErr> =
241 ReceivePersistent::new("pid-2").on_command(|_, c| Ok(vec![c])).on_event(|s, e| {
242 *s += e;
243 });
244 let r = rp.handle(journal, 1).await;
245 assert!(matches!(r, Err(EventsourcedError::Codec(_))));
246 }
247}