Skip to main content

atomr_patterns/cqrs/
event_codec.rs

1//! [`EventCodecRegistry`] — manifest-keyed event decoder dispatch.
2//!
3//! Use it when an aggregate's event schema evolves. For each
4//! historical manifest your journal might contain, register a decoder
5//! that returns the *current* `Event` type:
6//!
7//! ```ignore
8//! let registry = EventCodecRegistry::<OrderEvent>::new()
9//!     .register("order-evt-v1", |b| OrderEventV1::decode(b).map(Into::into))
10//!     .register("order-evt-v2", |b| OrderEventV2::decode(b))
11//!     .with_default(|b| OrderEventV2::decode(b));
12//!
13//! CqrsPattern::<Order>::builder(journal)
14//!     .factory(...)
15//!     .with_event_codecs(registry)
16//!     .with_reader(MyReader)
17//!     .build()?
18//! ```
19//!
20//! On replay, the runner inspects each
21//! [`atomr_persistence_query::EventEnvelope::manifest`] and dispatches
22//! to the matching registered decoder. When no entry matches, the
23//! reader's [`crate::cqrs::Reader::decode`] is used as a final
24//! fallback.
25
26use std::collections::HashMap;
27use std::sync::Arc;
28
29type Decoder<E> = Arc<dyn Fn(&[u8]) -> Result<E, String> + Send + Sync + 'static>;
30
31/// Manifest -> decoder map plus an optional catch-all decoder.
32pub struct EventCodecRegistry<E: Send + 'static> {
33    pub(crate) decoders: HashMap<String, Decoder<E>>,
34    pub(crate) default: Option<Decoder<E>>,
35}
36
37impl<E: Send + 'static> Default for EventCodecRegistry<E> {
38    fn default() -> Self {
39        Self { decoders: HashMap::new(), default: None }
40    }
41}
42
43impl<E: Send + 'static> EventCodecRegistry<E> {
44    pub fn new() -> Self {
45        Self::default()
46    }
47
48    /// Register `decode` as the decoder for events written with
49    /// `manifest`.
50    pub fn register<F>(mut self, manifest: impl Into<String>, decode: F) -> Self
51    where
52        F: Fn(&[u8]) -> Result<E, String> + Send + Sync + 'static,
53    {
54        self.decoders.insert(manifest.into(), Arc::new(decode));
55        self
56    }
57
58    /// Set a catch-all decoder used when no manifest matches.
59    pub fn with_default<F>(mut self, decode: F) -> Self
60    where
61        F: Fn(&[u8]) -> Result<E, String> + Send + Sync + 'static,
62    {
63        self.default = Some(Arc::new(decode));
64        self
65    }
66
67    /// Look up a decoder for `manifest` or fall back to the default.
68    pub fn decode(&self, manifest: &str, bytes: &[u8]) -> Option<Result<E, String>> {
69        if let Some(decoder) = self.decoders.get(manifest) {
70            return Some(decoder(bytes));
71        }
72        self.default.as_ref().map(|d| d(bytes))
73    }
74}