1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3pub mod binarize;
58pub mod event_log;
59pub mod snapshot_store;
60
61mod util;
62
63use crate::{
64 binarize::Binarize,
65 event_log::EventLog,
66 snapshot_store::{Snapshot, SnapshotStore},
67 util::StreamExt as ThisStreamExt,
68};
69use error_ext::{BoxError, StdErrorExt};
70use futures::{future::ok, TryStreamExt};
71use serde::{Deserialize, Serialize};
72use std::{
73 any::Any,
74 fmt::Debug,
75 marker::PhantomData,
76 num::{NonZeroU64, NonZeroUsize},
77};
78use thiserror::Error;
79use tokio::{
80 sync::{mpsc, oneshot},
81 task,
82};
83use tracing::{debug, error, instrument};
84
85type BoxedCommand<E> = Box<dyn ErasedCommand<E> + Send>;
86type BoxedCommandEffect<E> = Result<
87 (
88 Option<<E as EventSourced>::Event>,
89 Box<dyn FnOnce(&E) -> BoxedAny + Send + Sync>,
90 ),
91 BoxedAny,
92>;
93type BoxedAny = Box<dyn Any + Send>;
94type BoxedMsg<E> = (BoxedCommand<E>, oneshot::Sender<Result<BoxedAny, BoxedAny>>);
95
96pub trait EventSourced
98where
99 Self: Send + Sync + Sized + 'static,
100{
101 type Id: Debug + Clone + Send;
103
104 type Event: Debug + Send + Sync + 'static;
106
107 const TYPE_NAME: &'static str;
109
110 fn handle_event(self, event: Self::Event) -> Self;
112}
113
114pub trait Command<E>
116where
117 Self: Debug + Send + 'static,
118 E: EventSourced,
119{
120 type Reply: Send + Sync + 'static;
122
123 type Error: Send + 'static;
125
126 fn handle_command(self, id: &E::Id, state: &E) -> CommandEffect<E, Self::Reply, Self::Error>;
131}
132
133pub enum CommandEffect<E, Reply, Error>
136where
137 E: EventSourced,
138{
139 EmitAndReply(E::Event, Box<dyn FnOnce(&E) -> Reply + Send + Sync>),
140 Reply(Reply),
141 Reject(Error),
142}
143
144impl<E, Reply, Error> CommandEffect<E, Reply, Error>
145where
146 E: EventSourced,
147{
148 pub fn emit_and_reply(
151 event: E::Event,
152 make_reply: impl FnOnce(&E) -> Reply + Send + Sync + 'static,
153 ) -> Self {
154 Self::EmitAndReply(event, Box::new(make_reply))
155 }
156
157 pub fn reply(reply: Reply) -> Self {
159 Self::Reply(reply)
160 }
161
162 pub fn reject(error: Error) -> Self {
164 Self::Reject(error)
165 }
166}
167
168impl<E, Error> CommandEffect<E, (), Error>
169where
170 E: EventSourced,
171{
172 pub fn emit(event: E::Event) -> Self {
174 Self::emit_and_reply(event, |_| ())
175 }
176}
177
178#[derive(Debug, Clone)]
180pub struct EntityRef<E>
181where
182 E: EventSourced,
183{
184 command_in: mpsc::Sender<BoxedMsg<E>>,
185 id: E::Id,
186 _e: PhantomData<E>,
187}
188
189impl<E> EntityRef<E>
190where
191 E: EventSourced,
192{
193 pub fn id(&self) -> &E::Id {
195 &self.id
196 }
197
198 #[instrument(skip(self))]
203 pub async fn handle_command<C>(
204 &self,
205 command: C,
206 ) -> Result<Result<C::Reply, C::Error>, HandleCommandError>
207 where
208 C: Command<E>,
209 {
210 let (result_in, result_out) = oneshot::channel();
211 self.command_in
212 .send((Box::new(command), result_in))
213 .await
214 .map_err(|_| HandleCommandError("cannot send command".to_string()))?;
215 let result = result_out
216 .await
217 .map_err(|_| HandleCommandError("cannot receive command handler result".to_string()))?;
218 let result = result
219 .map_err(|error| *error.downcast::<C::Error>().expect("downcast error"))
220 .map(|reply| *reply.downcast::<C::Reply>().expect("downcast reply"));
221 Ok(result)
222 }
223}
224
225pub trait EventSourcedExt
227where
228 Self: EventSourced,
229{
230 fn entity(self) -> EventSourcedEntity<Self> {
232 EventSourcedEntity(self)
233 }
234}
235
236impl<E> EventSourcedExt for E where E: EventSourced {}
237
238#[derive(Debug, Clone)]
240pub struct EventSourcedEntity<E>(E)
241where
242 E: EventSourced;
243
244impl<E> EventSourcedEntity<E>
245where
246 E: EventSourced,
247{
248 #[instrument(skip(self, event_log, snapshot_store, binarize))]
251 pub async fn spawn<L, S, B>(
252 self,
253 id: E::Id,
254 snapshot_after: Option<NonZeroU64>,
255 command_buffer: NonZeroUsize,
256 mut event_log: L,
257 mut snapshot_store: S,
258 binarize: B,
259 ) -> Result<EntityRef<E>, SpawnError>
260 where
261 L: EventLog<Id = E::Id>,
262 S: SnapshotStore<Id = E::Id>,
263 B: Binarize<E::Event, E>,
264 {
265 let (snapshot_seq_no, state) = snapshot_store
267 .load::<E, _, _>(&id, |bytes| binarize.state_from_bytes(bytes))
268 .await
269 .map_err(|error| SpawnError::LoadSnapshot(error.into()))?
270 .map(|Snapshot { seq_no, state }| {
271 debug!(?id, seq_no, "restored snapshot");
272 (seq_no, state)
273 })
274 .unzip();
275 let mut state = state.unwrap_or(self.0);
276
277 let mut last_seq_no = event_log
279 .last_seq_no(E::TYPE_NAME, &id)
280 .await
281 .map_err(|error| SpawnError::LastNonZeroU64(error.into()))?;
282 if last_seq_no < snapshot_seq_no {
283 return Err(SpawnError::InvalidLastSeqNo(last_seq_no, snapshot_seq_no));
284 };
285
286 if snapshot_seq_no < last_seq_no {
288 let seq_no = snapshot_seq_no
289 .map(|n| n.saturating_add(1))
290 .unwrap_or(NonZeroU64::MIN);
291 let to_seq_no = last_seq_no.unwrap(); debug!(?id, seq_no, to_seq_no, "replaying events");
293
294 let events = event_log
295 .events_by_id::<E::Event, _, _>(E::TYPE_NAME, &id, seq_no, move |bytes| {
296 binarize.event_from_bytes(bytes)
297 })
298 .await
299 .map_err(|error| SpawnError::EventsById(error.into()))?;
300
301 state = events
302 .map_err(|error| SpawnError::NextEvent(error.into()))
303 .take_until_predicate(move |result| {
304 result
305 .as_ref()
306 .ok()
307 .map(|&(seq_no, _)| seq_no >= to_seq_no)
308 .unwrap_or(true)
309 })
310 .try_fold(state, |state, (_, event)| ok(state.handle_event(event)))
311 .await?;
312
313 debug!(?id, "replayed events");
314 }
315
316 let (command_in, mut command_out) = mpsc::channel::<BoxedMsg<E>>(command_buffer.get());
318 task::spawn({
319 let id = id.clone();
320 let mut event_count = 0u64;
321
322 async move {
323 while let Some((command, result_sender)) = command_out.recv().await {
324 debug!(?id, ?command, "handling command");
325
326 let result = command.handle_command(&id, &state);
327 match result {
328 Ok((Some(event), make_reply)) => {
329 debug!(?id, ?event, "persisting event");
330
331 match event_log
332 .persist::<E::Event, _, _>(
333 E::TYPE_NAME,
334 &id,
335 last_seq_no,
336 &event,
337 &|event| binarize.event_to_bytes(event),
338 )
339 .await
340 {
341 Ok(seq_no) => {
342 debug!(?id, ?event, seq_no, "persited event");
343
344 last_seq_no = Some(seq_no);
345 state = state.handle_event(event);
346
347 event_count += 1;
348 if snapshot_after
349 .map(|a| event_count % a == 0)
350 .unwrap_or_default()
351 {
352 debug!(?id, seq_no, event_count, "saving snapshot");
353
354 if let Err(error) = snapshot_store
355 .save(&id, seq_no, &state, &|state| {
356 binarize.state_to_bytes(state)
357 })
358 .await
359 {
360 error!(
361 error = error.as_chain(),
362 ?id,
363 "cannot save snapshot"
364 );
365 };
366 }
367
368 let reply = make_reply(&state);
369 if result_sender.send(Ok(reply)).is_err() {
370 error!(?id, "cannot send command reply");
371 };
372 }
373
374 Err(error) => {
375 error!(error = error.as_chain(), ?id, "cannot persist event");
376 break;
378 }
379 }
380 }
381
382 Ok((None, make_reply)) => {
383 let reply = make_reply(&state);
384 if result_sender.send(Ok(reply)).is_err() {
385 error!(?id, "cannot send command reply");
386 }
387 }
388
389 Err(error) => {
390 if result_sender.send(Err(error)).is_err() {
391 error!(?id, "cannot send command error");
392 }
393 }
394 };
395 }
396
397 debug!(?id, "entity terminated");
398 }
399 });
400
401 Ok(EntityRef {
402 command_in,
403 id,
404 _e: PhantomData,
405 })
406 }
407}
408
409#[derive(Debug, Error, Serialize, Deserialize)]
412#[error("{0}")]
413pub struct HandleCommandError(String);
414
415#[derive(Debug, Error)]
417pub enum SpawnError {
418 #[error("cannot load snapshot from snapshot store")]
419 LoadSnapshot(#[source] BoxError),
420
421 #[error("last sequence number {0:?} less than snapshot sequence number {0:?}")]
422 InvalidLastSeqNo(Option<NonZeroU64>, Option<NonZeroU64>),
423
424 #[error("cannot get last seqence number from event log")]
425 LastNonZeroU64(#[source] BoxError),
426
427 #[error("cannot get events by ID stream from event log")]
428 EventsById(#[source] BoxError),
429
430 #[error("cannot get next event from events by ID stream")]
431 NextEvent(#[source] BoxError),
432}
433
434trait ErasedCommand<E>
435where
436 Self: Debug,
437 E: EventSourced,
438{
439 fn handle_command(self: Box<Self>, id: &E::Id, state: &E) -> BoxedCommandEffect<E>;
440}
441
442impl<C, E, Reply, Error> ErasedCommand<E> for C
443where
444 C: Command<E, Reply = Reply, Error = Error>,
445 E: EventSourced,
446 Reply: Send + Sync + 'static,
447 Error: Send + 'static,
448{
449 fn handle_command(self: Box<Self>, id: &E::Id, state: &E) -> BoxedCommandEffect<E> {
450 match <C as Command<E>>::handle_command(*self, id, state) {
451 CommandEffect::EmitAndReply(event, make_reply) => {
452 Ok((Some(event), Box::new(|s| Box::new(make_reply(s)))))
453 }
454 CommandEffect::Reply(reply) => Ok((None, Box::new(|_s| Box::new(reply)))),
455 CommandEffect::Reject(error) => Err(Box::new(error) as BoxedAny),
456 }
457 }
458}
459
460#[cfg(all(test, feature = "serde_json"))]
461mod tests {
462 use crate::{
463 binarize::serde_json::*,
464 event_log::{test::TestEventLog, EventLog},
465 snapshot_store::{test::TestSnapshotStore, SnapshotStore},
466 Command, CommandEffect, EntityRef, EventSourced, EventSourcedExt,
467 };
468 use assert_matches::assert_matches;
469 use error_ext::BoxError;
470 use serde::{Deserialize, Serialize};
471 use tracing_test::traced_test;
472 use uuid::Uuid;
473
474 #[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
475 pub struct Counter(u64);
476
477 impl EventSourced for Counter {
478 type Id = Uuid;
479 type Event = CounterEvent;
480
481 const TYPE_NAME: &'static str = "counter";
482
483 fn handle_event(self, event: CounterEvent) -> Self {
484 match event {
485 CounterEvent::Increased(_, n) => Self(self.0 + n),
486 CounterEvent::Decreased(_, n) => Self(self.0 - n),
487 }
488 }
489 }
490
491 #[derive(Debug, Serialize, Deserialize)]
492 pub enum CounterEvent {
493 Increased(Uuid, u64),
494 Decreased(Uuid, u64),
495 }
496
497 #[derive(Debug)]
498 pub struct IncreaseCounter(pub u64);
499
500 impl Command<Counter> for IncreaseCounter {
501 type Error = Overflow;
502 type Reply = u64;
503
504 fn handle_command(
505 self,
506 id: &Uuid,
507 state: &Counter,
508 ) -> CommandEffect<Counter, u64, Overflow> {
509 if u64::MAX - state.0 < self.0 {
510 CommandEffect::reject(Overflow)
511 } else {
512 CommandEffect::emit_and_reply(
513 CounterEvent::Increased(*id, self.0),
514 |state: &Counter| state.0,
515 )
516 }
517 }
518 }
519
520 #[derive(Debug)]
521 pub struct Overflow;
522
523 #[derive(Debug)]
524 pub struct DecreaseCounter(pub u64);
525
526 impl Command<Counter> for DecreaseCounter {
527 type Error = Underflow;
528 type Reply = u64;
529
530 fn handle_command(
531 self,
532 id: &Uuid,
533 state: &Counter,
534 ) -> CommandEffect<Counter, u64, Underflow> {
535 if state.0 < self.0 {
536 CommandEffect::reject(Underflow)
537 } else {
538 CommandEffect::emit_and_reply(
539 CounterEvent::Decreased(*id, self.0),
540 move |state: &Counter| state.0 + self.0 - self.0, )
545 }
546 }
547 }
548
549 #[derive(Debug, PartialEq, Eq)]
550 pub struct Underflow;
551
552 #[derive(Debug)]
553 pub struct GetCounter;
554 impl Command<Counter> for GetCounter {
555 type Error = ();
556 type Reply = u64;
557
558 fn handle_command(self, _id: &Uuid, state: &Counter) -> CommandEffect<Counter, u64, ()> {
559 CommandEffect::reply(state.0)
560 }
561 }
562
563 #[tokio::test]
564 #[traced_test]
565 async fn test() -> Result<(), BoxError> {
566 let id = Uuid::from_u128(1);
567
568 let mut event_log = TestEventLog::default();
569 for _ in 0..42 {
570 event_log
571 .persist(
572 "counter",
573 &id,
574 None,
575 &CounterEvent::Increased(id, 1),
576 &to_bytes,
577 )
578 .await?;
579 }
580
581 let mut snapshot_store = TestSnapshotStore::default();
582 snapshot_store
583 .save(&id, 21.try_into()?, &Counter(21), &to_bytes)
584 .await?;
585
586 let entity: EntityRef<Counter> = Counter::default()
587 .entity()
588 .spawn(
589 id,
590 None,
591 1.try_into()?,
592 event_log,
593 snapshot_store,
594 SerdeJsonBinarize,
595 )
596 .await?;
597
598 let reply = entity.handle_command(IncreaseCounter(1)).await?;
599 assert_matches!(reply, Ok(43));
600 let reply = entity.handle_command(DecreaseCounter(100)).await?;
601 assert_matches!(reply, Err(error) if error == Underflow);
602 let reply = entity.handle_command(DecreaseCounter(1)).await?;
603 assert_matches!(reply, Ok(42));
604
605 let reply = entity.handle_command(GetCounter).await?;
606 assert_matches!(reply, Ok(42));
607
608 Ok(())
609 }
610}