atomr_patterns/cqrs/event_codec.rs
1//! [`EventCodecRegistry`] — manifest-keyed event decoder dispatch.
2//!
3//! Use it when an aggregate's event schema evolves. For each
4//! historical manifest your journal might contain, register a decoder
5//! that returns the *current* `Event` type:
6//!
7//! ```ignore
8//! let registry = EventCodecRegistry::<OrderEvent>::new()
9//! .register("order-evt-v1", |b| OrderEventV1::decode(b).map(Into::into))
10//! .register("order-evt-v2", |b| OrderEventV2::decode(b))
11//! .with_default(|b| OrderEventV2::decode(b));
12//!
13//! CqrsPattern::<Order>::builder(journal)
14//! .factory(...)
15//! .with_event_codecs(registry)
16//! .with_reader(MyReader)
17//! .build()?
18//! ```
19//!
//! On replay, the runner inspects each
//! [`atomr_persistence_query::EventEnvelope::manifest`] and dispatches
//! to the matching registered decoder. When no entry matches, the
//! registry's catch-all decoder (see [`EventCodecRegistry::with_default`])
//! applies if one was set; otherwise the reader's
//! [`crate::cqrs::Reader::decode`] is used as a final fallback.
25
26use std::collections::HashMap;
27use std::sync::Arc;
28
/// Shared decoding callback: turns raw event bytes into an `E`, or into a
/// `String` describing why decoding failed. Held behind an [`Arc`] with
/// `Send + Sync` bounds so it can be stored once and shared.
type Decoder<E> = Arc<dyn Fn(&[u8]) -> Result<E, String> + Sync + Send + 'static>;
30
31/// Manifest -> decoder map plus an optional catch-all decoder.
32pub struct EventCodecRegistry<E: Send + 'static> {
33 pub(crate) decoders: HashMap<String, Decoder<E>>,
34 pub(crate) default: Option<Decoder<E>>,
35}
36
37impl<E: Send + 'static> Default for EventCodecRegistry<E> {
38 fn default() -> Self {
39 Self { decoders: HashMap::new(), default: None }
40 }
41}
42
43impl<E: Send + 'static> EventCodecRegistry<E> {
44 pub fn new() -> Self {
45 Self::default()
46 }
47
48 /// Register `decode` as the decoder for events written with
49 /// `manifest`.
50 pub fn register<F>(mut self, manifest: impl Into<String>, decode: F) -> Self
51 where
52 F: Fn(&[u8]) -> Result<E, String> + Send + Sync + 'static,
53 {
54 self.decoders.insert(manifest.into(), Arc::new(decode));
55 self
56 }
57
58 /// Set a catch-all decoder used when no manifest matches.
59 pub fn with_default<F>(mut self, decode: F) -> Self
60 where
61 F: Fn(&[u8]) -> Result<E, String> + Send + Sync + 'static,
62 {
63 self.default = Some(Arc::new(decode));
64 self
65 }
66
67 /// Look up a decoder for `manifest` or fall back to the default.
68 pub fn decode(&self, manifest: &str, bytes: &[u8]) -> Option<Result<E, String>> {
69 if let Some(decoder) = self.decoders.get(manifest) {
70 return Some(decoder(bytes));
71 }
72 self.default.as_ref().map(|d| d(bytes))
73 }
74}