Skip to main content

evento_core/
projection.rs

1//! Projections and event subscriptions.
2//!
3//! This module provides the core building blocks for event sourcing:
4//! - Projections that build read models from events
5//! - Subscriptions that continuously process events
6//! - Loading aggregate state from event streams
7//!
8//! # Key Types
9//!
10//! - [`Projection`] - Defines handlers for building projections
11//! - [`LoadBuilder`] - Loads aggregate state from events
12//! - [`SubscriptionBuilder`] - Builds continuous event subscriptions
13//! - [`Subscription`] - Handle to a running subscription
14//! - [`EventData`] - Typed event with deserialized data and metadata
15//!
16//! # Example
17//!
18//! ```rust,ignore
19//! use evento::projection::Projection;
20//!
21//! // Define a projection with event handlers
22//! let projection = Projection::<AccountView, _>::new("accounts")
23//!     .handler(account_opened)
24//!     .handler(money_deposited);
25//!
26//! // Load aggregate state
27//! let result = projection
28//!     .load::<Account>("account-123")
29//!     .execute(&executor)
30//!     .await?;
31//!
32//! // Or start a subscription
33//! let subscription = projection
34//!     .subscription()
35//!     .routing_key("accounts")
36//!     .start(&executor)
37//!     .await?;
38//! ```
39
40use std::{collections::HashMap, future::Future, marker::PhantomData, ops::Deref, pin::Pin};
41
42use crate::{
43    context,
44    cursor::{self, Args, Cursor},
45    Aggregator, AggregatorBuilder, AggregatorEvent, Executor, ReadAggregator,
46};
47
48/// Handler context providing access to executor and shared data.
49///
50/// `Context` wraps an [`RwContext`](crate::context::RwContext) for type-safe
51/// data storage and provides access to the executor for database operations.
52///
53/// # Example
54///
55/// ```rust,ignore
56/// #[evento::handler]
57/// async fn my_handler<E: Executor>(
58///     event: Event<MyEventData>,
59///     action: Action<'_, MyView, E>,
60/// ) -> anyhow::Result<()> {
61///     if let Action::Handle(ctx) = action {
62///         // Access shared data
63///         let config: Data<AppConfig> = ctx.extract();
64///
65///         // Use executor for queries
66///         let events = ctx.executor.read(...).await?;
67///     }
68///     Ok(())
69/// }
70/// ```
71#[derive(Clone)]
72pub struct Context<'a, E: Executor> {
73    context: context::RwContext,
74    /// Reference to the executor for database operations
75    pub executor: &'a E,
76    pub id: String,
77    revision: u16,
78    aggregator_type: String,
79    aggregators: &'a HashMap<String, String>,
80}
81
82impl<'a, E: Executor> Context<'a, E> {
83    /// Retrieves a stored snapshot for the given ID.
84    ///
85    /// Returns `None` if no snapshot exists.
86    pub async fn get_snapshot<D: bitcode::DecodeOwned + ProjectionCursor>(
87        &self,
88    ) -> anyhow::Result<Option<D>> {
89        let Some((data, cursor)) = self
90            .executor
91            .get_snapshot(
92                self.aggregator_type.to_owned(),
93                self.revision.to_string(),
94                self.id.to_owned(),
95            )
96            .await?
97        else {
98            return Ok(None);
99        };
100
101        let mut data: D = bitcode::decode(&data)?;
102        data.set_cursor(&cursor);
103
104        Ok(Some(data))
105    }
106
107    /// Stores a snapshot for the given ID.
108    ///
109    /// The snapshot cursor is extracted from the data to track the event position.
110    pub async fn take_snapshot<D: bitcode::Encode + ProjectionCursor>(
111        &self,
112        data: &D,
113    ) -> anyhow::Result<()> {
114        let cursor = data.get_cursor();
115        let data = bitcode::encode(data);
116
117        self.executor
118            .save_snapshot(
119                self.aggregator_type.to_owned(),
120                self.revision.to_string(),
121                self.id.to_owned(),
122                data,
123                cursor,
124            )
125            .await
126    }
127
128    /// Returns the aggregate ID for a registered aggregator type.
129    ///
130    /// # Panics
131    ///
132    /// Panics if the aggregator type was not registered via [`Projection::aggregator`].
133    pub async fn aggregator<A: Aggregator>(&self) -> String {
134        tracing::debug!(
135            "Failed to get `Aggregator id <{}>` For the Aggregator id extractor to work \
136        correctly, wrap the data with `Projection::new().aggregator::<MyAggregator>(id)`. \
137        Ensure that types align in both the set and retrieve calls.",
138            A::aggregator_type()
139        );
140
141        self.aggregators
142            .get(A::aggregator_type())
143            .expect("Projection Aggregator not configured correctly. View/enable debug logs for more details.")
144            .to_owned()
145    }
146}
147
148impl<'a, E: Executor> Deref for Context<'a, E> {
149    type Target = context::RwContext;
150
151    fn deref(&self) -> &Self::Target {
152        &self.context
153    }
154}
155
156/// Trait for event handlers.
157///
158/// Handlers process events in two modes:
159/// - `handle`: For subscriptions that perform side effects (send emails, update read models)
160/// - `apply`: For loading aggregate state by replaying events
161///
162/// This trait is typically implemented via the `#[evento::handler]` macro.
163pub trait Handler<P: 'static>: Sync + Send {
164    /// Applies an event to build projection state.
165    ///
166    /// This is called when loading aggregate state by replaying events.
167    /// It should be a pure function that modifies the projection without side effects.
168    fn handle<'a>(
169        &'a self,
170        projection: &'a mut P,
171        event: &'a crate::Event,
172    ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;
173
174    /// Returns the aggregator type this handler processes.
175    fn aggregator_type(&self) -> &'static str;
176    /// Returns the event name this handler processes.
177    fn event_name(&self) -> &'static str;
178}
179
180/// Trait for types that track their cursor position in the event stream.
181///
182/// This trait is typically derived using the `#[evento::projection]` macro.
183pub trait ProjectionCursor {
184    /// Returns the current cursor position.
185    fn get_cursor(&self) -> cursor::Value;
186    /// Sets the cursor position.
187    fn set_cursor(&mut self, v: &cursor::Value);
188}
189
190/// Trait for projections that can create an [`AggregatorBuilder`].
191///
192/// Extends [`ProjectionCursor`] to provide aggregate identity and versioning,
193/// enabling projections to emit new events.
194pub trait ProjectionAggregator: ProjectionCursor {
195    /// Returns the aggregate ID for this projection.
196    ///
197    /// # Panics
198    ///
199    /// Default implementation panics; must be overridden.
200    fn aggregator_id(&self) -> String {
201        todo!("ProjectionCursor.aggregator_id must be implemented for ProjectionCursor.aggregator")
202    }
203
204    /// Returns the current aggregate version from the cursor.
205    ///
206    /// Returns `0` if no cursor is set.
207    fn aggregator_version(&self) -> anyhow::Result<u16> {
208        let value = self.get_cursor();
209        if value == Default::default() {
210            return Ok(0);
211        }
212
213        let cursor = crate::Event::deserialize_cursor(&value)?;
214
215        Ok(cursor.v)
216    }
217
218    /// Creates an [`AggregatorBuilder`] pre-configured with ID and version.
219    ///
220    /// Use this to emit new events from a projection.
221    fn aggregator(&self) -> anyhow::Result<AggregatorBuilder> {
222        Ok(AggregatorBuilder::new(self.aggregator_id())
223            .original_version(self.aggregator_version()?)
224            .to_owned())
225    }
226}
227
228/// Trait for types that can be restored from snapshots.
229///
230/// Snapshots provide a performance optimization by storing pre-computed
231/// state, avoiding the need to replay all events from the beginning.
232///
233/// This trait is typically implemented via the `#[evento::snapshot]` macro.
234///
235/// # Example
236///
237/// ```rust,ignore
238/// #[evento::snapshot]
239/// #[derive(Default)]
240/// pub struct AccountView {
241///     pub balance: i64,
242///     pub owner: String,
243/// }
244///
245/// // The macro generates the restore implementation that loads
246/// // from a snapshot table if available
247/// ```
248pub trait Snapshot<E: Executor>: ProjectionCursor + Sized {
249    /// Restores state from a snapshot if available.
250    ///
251    /// Returns `None` if no snapshot exists for the given ID.
252    fn restore(
253        _context: &Context<'_, E>,
254    ) -> impl Future<Output = anyhow::Result<Option<Self>>> + Send {
255        Box::pin(async { Ok(None) })
256    }
257
258    /// Stores the current state as a snapshot.
259    ///
260    /// Default implementation does nothing.
261    fn take_snapshot(
262        &self,
263        _context: &Context<'_, E>,
264    ) -> impl Future<Output = anyhow::Result<()>> + Send {
265        Box::pin(async { Ok(()) })
266    }
267}
268
269impl<T: bitcode::Encode + bitcode::DecodeOwned + ProjectionCursor + Send + Sync, E: Executor>
270    Snapshot<E> for T
271{
272    async fn restore(context: &Context<'_, E>) -> anyhow::Result<Option<Self>> {
273        context.get_snapshot().await
274    }
275
276    async fn take_snapshot(&self, context: &Context<'_, E>) -> anyhow::Result<()> {
277        context.take_snapshot(self).await
278    }
279}
280
281/// Projection for loading aggregate state from events.
282///
283/// Combines event handlers to rebuild aggregate state by replaying events.
284/// Supports snapshots for performance optimization.
285///
286/// # Example
287///
288/// ```rust,ignore
289/// let result = Projection::<_, AccountView>::new::<Account>("account-123")
290///     .handler(account_opened)
291///     .handler(money_deposited)
292///     .data(app_config)
293///     .execute(&executor)
294///     .await?;
295/// ```
296pub struct Projection<E: Executor, P: Default + 'static> {
297    id: String,
298    aggregator_type: String,
299    revision: u16,
300    aggregators: HashMap<String, String>,
301    handlers: HashMap<String, Box<dyn Handler<P>>>,
302    context: context::RwContext,
303    safety_disabled: bool,
304    executor: PhantomData<E>,
305}
306
307impl<E: Executor, P: Snapshot<E> + Default + 'static> Projection<E, P> {
308    /// Creates a builder for loading aggregate state.
309    ///
310    /// This consumes the projection and returns a [`LoadBuilder`] configured
311    /// to load the state for the specified aggregate.
312    ///
313    /// # Type Parameters
314    ///
315    /// - `A`: The aggregate type to load
316    pub fn new<A: Aggregator>(id: impl Into<String>) -> Projection<E, P>
317    where
318        P: Snapshot<E> + Default,
319    {
320        let id = id.into();
321        let mut aggregators = HashMap::new();
322        aggregators.insert(A::aggregator_type().to_owned(), id.to_owned());
323
324        Projection {
325            id,
326            aggregator_type: A::aggregator_type().to_owned(),
327            aggregators,
328            context: Default::default(),
329            handlers: HashMap::new(),
330            safety_disabled: true,
331            executor: PhantomData,
332            revision: 0,
333        }
334    }
335
336    /// Creates a builder for loading aggregate state.
337    ///
338    /// This consumes the projection and returns a [`LoadBuilder`] configured
339    /// to load the state for the specified aggregate.
340    ///
341    /// # Type Parameters
342    ///
343    /// - `A`: The aggregate type to load
344    pub fn ids<A: Aggregator>(ids: Vec<impl Into<String>>) -> Projection<E, P>
345    where
346        P: Snapshot<E> + Default,
347    {
348        Self::new::<A>(crate::hash_ids(ids))
349    }
350
351    /// Sets the snapshot revision.
352    ///
353    /// Changing the revision invalidates existing snapshots, forcing a full rebuild.
354    pub fn revision(mut self, value: u16) -> Self {
355        self.revision = value;
356
357        self
358    }
359
360    /// Enables safety checks for unhandled events.
361    ///
362    /// When enabled, execution fails if an event is encountered without a handler.
363    pub fn safety_check(mut self) -> Self {
364        self.safety_disabled = false;
365
366        self
367    }
368
369    /// Registers an event handler with this projection.
370    ///
371    /// # Panics
372    ///
373    /// Panics if a handler for the same event type is already registered.
374    pub fn handler<H: Handler<P> + 'static>(mut self, h: H) -> Self {
375        let key = format!("{}_{}", h.aggregator_type(), h.event_name());
376        if self.handlers.insert(key.to_owned(), Box::new(h)).is_some() {
377            panic!("Cannot register event handler: key {} already exists", key);
378        }
379        self
380    }
381
382    /// Registers a skip handler with this projection.
383    ///
384    /// # Panics
385    ///
386    /// Panics if a handler for the same event type is already registered.
387    pub fn skip<EV: AggregatorEvent + Send + Sync + 'static>(self) -> Self {
388        self.handler(SkipHandler::<EV>(PhantomData))
389    }
390
391    /// Adds shared data to the load context.
392    ///
393    /// Data added here is accessible in handlers via the context.
394    pub fn data<D: Send + Sync + 'static>(self, v: D) -> Self {
395        self.context.insert(v);
396
397        self
398    }
399
400    /// Adds a related aggregate to load events from.
401    ///
402    /// Use this when the projection needs events from multiple aggregates.
403    pub fn aggregator<A: Aggregator>(self, id: impl Into<String>) -> Self {
404        self.aggregator_raw(A::aggregator_type().to_owned(), id)
405    }
406
407    /// Adds a related aggregate to load events from.
408    ///
409    /// Use this when the projection needs events from multiple aggregates.
410    pub fn aggregator_raw(
411        mut self,
412        aggregator_type: impl Into<String>,
413        id: impl Into<String>,
414    ) -> Self {
415        self.aggregators.insert(aggregator_type.into(), id.into());
416
417        self
418    }
419
420    /// Executes the load operation, returning the rebuilt state.
421    ///
422    /// Returns `None` if no events exist for the aggregate.
423    /// Returns `Err` if there are too many events to process in one batch.
424    pub async fn execute(&self, executor: &E) -> anyhow::Result<Option<P>> {
425        let context = Context {
426            context: self.context.clone(),
427            executor,
428            id: self.id.to_owned(),
429            aggregator_type: self.aggregator_type.to_owned(),
430            aggregators: &self.aggregators,
431            revision: self.revision,
432        };
433        let snapshot = P::restore(&context).await?;
434        let cursor = snapshot.as_ref().map(|s| s.get_cursor());
435
436        let read_aggregators = self
437            .handlers
438            .values()
439            .map(|h| match self.aggregators.get(h.aggregator_type()) {
440                Some(id) => ReadAggregator {
441                    aggregator_type: h.aggregator_type().to_owned(),
442                    aggregator_id: Some(id.to_owned()),
443                    name: if self.safety_disabled {
444                        Some(h.event_name().to_owned())
445                    } else {
446                        None
447                    },
448                },
449                _ => {
450                    if self.safety_disabled {
451                        ReadAggregator::event(h.aggregator_type(), h.event_name())
452                    } else {
453                        ReadAggregator::aggregator(h.aggregator_type())
454                    }
455                }
456            })
457            .collect::<Vec<_>>();
458
459        let events = executor
460            .read(
461                Some(read_aggregators.to_vec()),
462                None,
463                Args::forward(100, cursor.clone()),
464            )
465            .await?;
466
467        if events.edges.is_empty() && snapshot.is_none() {
468            return Ok(None);
469        }
470
471        let mut snapshot = snapshot.unwrap_or_default();
472
473        for event in events.edges.iter() {
474            let key = format!("{}_{}", event.node.aggregator_type, event.node.name);
475
476            let Some(handler) = self.handlers.get(&key) else {
477                if !self.safety_disabled {
478                    anyhow::bail!("no handler k={key}");
479                }
480
481                continue;
482            };
483
484            handler.handle(&mut snapshot, &event.node).await?;
485        }
486
487        if let Some(event) = events.edges.last() {
488            snapshot.set_cursor(&event.cursor);
489            snapshot.take_snapshot(&context).await?;
490        }
491
492        if events.page_info.has_next_page {
493            anyhow::bail!("Too busy");
494        }
495
496        Ok(Some(snapshot))
497    }
498}
499
500pub(crate) struct SkipHandler<E: AggregatorEvent>(PhantomData<E>);
501
502impl<P: 'static, EV: AggregatorEvent + Send + Sync> Handler<P> for SkipHandler<EV> {
503    fn handle<'a>(
504        &'a self,
505        _projection: &'a mut P,
506        _event: &'a crate::Event,
507    ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>> {
508        Box::pin(async { Ok(()) })
509    }
510
511    fn aggregator_type(&self) -> &'static str {
512        EV::aggregator_type()
513    }
514
515    fn event_name(&self) -> &'static str {
516        EV::event_name()
517    }
518}