evento_core/projection.rs
1//! Projections and event subscriptions.
2//!
3//! This module provides the core building blocks for event sourcing:
4//! - Projections that build read models from events
5//! - Subscriptions that continuously process events
6//! - Loading aggregate state from event streams
7//!
8//! # Key Types
9//!
10//! - [`Projection`] - Defines handlers for building projections
11//! - [`LoadBuilder`] - Loads aggregate state from events
12//! - [`SubscriptionBuilder`] - Builds continuous event subscriptions
13//! - [`Subscription`] - Handle to a running subscription
14//! - [`EventData`] - Typed event with deserialized data and metadata
15//!
16//! # Example
17//!
18//! ```rust,ignore
19//! use evento::projection::Projection;
20//!
21//! // Define a projection with event handlers
22//! let projection = Projection::<AccountView, _>::new("accounts")
23//! .handler(account_opened)
24//! .handler(money_deposited);
25//!
26//! // Load aggregate state
27//! let result = projection
28//! .load::<Account>("account-123")
29//! .execute(&executor)
30//! .await?;
31//!
32//! // Or start a subscription
33//! let subscription = projection
34//! .subscription()
35//! .routing_key("accounts")
36//! .start(&executor)
37//! .await?;
38//! ```
39
40use std::{collections::HashMap, future::Future, marker::PhantomData, ops::Deref, pin::Pin};
41
42use crate::{
43 context,
44 cursor::{self, Args, Cursor},
45 Aggregator, AggregatorBuilder, AggregatorEvent, Executor, ReadAggregator,
46};
47
/// Handler context providing access to executor and shared data.
///
/// `Context` wraps an [`RwContext`](crate::context::RwContext) for type-safe
/// data storage and provides access to the executor for database operations.
/// It also carries the id, aggregator type and revision that are passed to
/// the executor when snapshots are read or saved.
///
/// # Example
///
/// ```rust,ignore
/// #[evento::handler]
/// async fn my_handler<E: Executor>(
///     event: Event<MyEventData>,
///     action: Action<'_, MyView, E>,
/// ) -> anyhow::Result<()> {
///     if let Action::Handle(ctx) = action {
///         // Access shared data
///         let config: Data<AppConfig> = ctx.extract();
///
///         // Use executor for queries
///         let events = ctx.executor.read(...).await?;
///     }
///     Ok(())
/// }
/// ```
#[derive(Clone)]
pub struct Context<'a, E: Executor> {
    /// Shared data store; reachable through the `Deref` implementation.
    context: context::RwContext,
    /// Reference to the executor for database operations
    pub executor: &'a E,
    /// Id passed to the executor when reading or saving snapshots.
    pub id: String,
    /// Snapshot revision; handed to the executor in its string form.
    revision: u16,
    /// Aggregator type passed to the executor when reading or saving snapshots.
    aggregator_type: String,
    /// Aggregate ids keyed by aggregator type name, looked up by `aggregator`.
    aggregators: &'a HashMap<String, String>,
}
81
82impl<'a, E: Executor> Context<'a, E> {
83 /// Retrieves a stored snapshot for the given ID.
84 ///
85 /// Returns `None` if no snapshot exists.
86 pub async fn get_snapshot<D: bitcode::DecodeOwned + ProjectionCursor>(
87 &self,
88 ) -> anyhow::Result<Option<D>> {
89 let Some((data, cursor)) = self
90 .executor
91 .get_snapshot(
92 self.aggregator_type.to_owned(),
93 self.revision.to_string(),
94 self.id.to_owned(),
95 )
96 .await?
97 else {
98 return Ok(None);
99 };
100
101 let mut data: D = bitcode::decode(&data)?;
102 data.set_cursor(&cursor);
103
104 Ok(Some(data))
105 }
106
107 /// Stores a snapshot for the given ID.
108 ///
109 /// The snapshot cursor is extracted from the data to track the event position.
110 pub async fn take_snapshot<D: bitcode::Encode + ProjectionCursor>(
111 &self,
112 data: &D,
113 ) -> anyhow::Result<()> {
114 let cursor = data.get_cursor();
115 let data = bitcode::encode(data);
116
117 self.executor
118 .save_snapshot(
119 self.aggregator_type.to_owned(),
120 self.revision.to_string(),
121 self.id.to_owned(),
122 data,
123 cursor,
124 )
125 .await
126 }
127
128 /// Returns the aggregate ID for a registered aggregator type.
129 ///
130 /// # Panics
131 ///
132 /// Panics if the aggregator type was not registered via [`Projection::aggregator`].
133 pub async fn aggregator<A: Aggregator>(&self) -> String {
134 tracing::debug!(
135 "Failed to get `Aggregator id <{}>` For the Aggregator id extractor to work \
136 correctly, wrap the data with `Projection::new().aggregator::<MyAggregator>(id)`. \
137 Ensure that types align in both the set and retrieve calls.",
138 A::aggregator_type()
139 );
140
141 self.aggregators
142 .get(A::aggregator_type())
143 .expect("Projection Aggregator not configured correctly. View/enable debug logs for more details.")
144 .to_owned()
145 }
146}
147
148impl<'a, E: Executor> Deref for Context<'a, E> {
149 type Target = context::RwContext;
150
151 fn deref(&self) -> &Self::Target {
152 &self.context
153 }
154}
155
/// Trait for event handlers.
///
/// A handler is tied to one event, identified by
/// [`aggregator_type`](Handler::aggregator_type) and
/// [`event_name`](Handler::event_name), and applies that event to a
/// projection of type `P` through [`handle`](Handler::handle).
///
/// This trait is typically implemented via the `#[evento::handler]` macro.
pub trait Handler<P: 'static>: Sync + Send {
    /// Applies an event to build projection state.
    ///
    /// [`Projection::execute`] calls this for each event it reads whose
    /// aggregator type and name match this handler. It should be a pure
    /// function that modifies the projection without side effects.
    ///
    /// The returned future borrows the handler, the projection and the event
    /// for `'a`; an `Err` aborts the load that invoked the handler.
    fn handle<'a>(
        &'a self,
        projection: &'a mut P,
        event: &'a crate::Event,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;

    /// Returns the aggregator type this handler processes.
    fn aggregator_type(&self) -> &'static str;
    /// Returns the event name this handler processes.
    fn event_name(&self) -> &'static str;
}
179
/// Trait for types that track their cursor position in the event stream.
///
/// The cursor is what snapshot storage saves next to the encoded state and
/// what a load resumes reading from.
///
/// This trait is typically derived using the `#[evento::projection]` macro.
pub trait ProjectionCursor {
    /// Returns the current cursor position.
    fn get_cursor(&self) -> cursor::Value;
    /// Sets the cursor position.
    fn set_cursor(&mut self, v: &cursor::Value);
}
189
190/// Trait for projections that can create an [`AggregatorBuilder`].
191///
192/// Extends [`ProjectionCursor`] to provide aggregate identity and versioning,
193/// enabling projections to emit new events.
194pub trait ProjectionAggregator: ProjectionCursor {
195 /// Returns the aggregate ID for this projection.
196 ///
197 /// # Panics
198 ///
199 /// Default implementation panics; must be overridden.
200 fn aggregator_id(&self) -> String {
201 todo!("ProjectionCursor.aggregator_id must be implemented for ProjectionCursor.aggregator")
202 }
203
204 /// Returns the current aggregate version from the cursor.
205 ///
206 /// Returns `0` if no cursor is set.
207 fn aggregator_version(&self) -> anyhow::Result<u16> {
208 let value = self.get_cursor();
209 if value == Default::default() {
210 return Ok(0);
211 }
212
213 let cursor = crate::Event::deserialize_cursor(&value)?;
214
215 Ok(cursor.v)
216 }
217
218 /// Creates an [`AggregatorBuilder`] pre-configured with ID and version.
219 ///
220 /// Use this to emit new events from a projection.
221 fn aggregator(&self) -> anyhow::Result<AggregatorBuilder> {
222 Ok(AggregatorBuilder::new(self.aggregator_id())
223 .original_version(self.aggregator_version()?)
224 .to_owned())
225 }
226}
227
228/// Trait for types that can be restored from snapshots.
229///
230/// Snapshots provide a performance optimization by storing pre-computed
231/// state, avoiding the need to replay all events from the beginning.
232///
233/// This trait is typically implemented via the `#[evento::snapshot]` macro.
234///
235/// # Example
236///
237/// ```rust,ignore
238/// #[evento::snapshot]
239/// #[derive(Default)]
240/// pub struct AccountView {
241/// pub balance: i64,
242/// pub owner: String,
243/// }
244///
245/// // The macro generates the restore implementation that loads
246/// // from a snapshot table if available
247/// ```
248pub trait Snapshot<E: Executor>: ProjectionCursor + Sized {
249 /// Restores state from a snapshot if available.
250 ///
251 /// Returns `None` if no snapshot exists for the given ID.
252 fn restore(
253 _context: &Context<'_, E>,
254 ) -> impl Future<Output = anyhow::Result<Option<Self>>> + Send {
255 Box::pin(async { Ok(None) })
256 }
257
258 /// Stores the current state as a snapshot.
259 ///
260 /// Default implementation does nothing.
261 fn take_snapshot(
262 &self,
263 _context: &Context<'_, E>,
264 ) -> impl Future<Output = anyhow::Result<()>> + Send {
265 Box::pin(async { Ok(()) })
266 }
267}
268
269impl<T: bitcode::Encode + bitcode::DecodeOwned + ProjectionCursor + Send + Sync, E: Executor>
270 Snapshot<E> for T
271{
272 async fn restore(context: &Context<'_, E>) -> anyhow::Result<Option<Self>> {
273 context.get_snapshot().await
274 }
275
276 async fn take_snapshot(&self, context: &Context<'_, E>) -> anyhow::Result<()> {
277 context.take_snapshot(self).await
278 }
279}
280
/// Projection for loading aggregate state from events.
///
/// Combines event handlers to rebuild aggregate state by replaying events.
/// Supports snapshots for performance optimization.
///
/// # Example
///
/// ```rust,ignore
/// let result = Projection::<_, AccountView>::new::<Account>("account-123")
///     .handler(account_opened)
///     .handler(money_deposited)
///     .data(app_config)
///     .execute(&executor)
///     .await?;
/// ```
pub struct Projection<E: Executor, P: Default + 'static> {
    /// Id of the primary aggregate; also the id snapshots are stored under.
    id: String,
    /// Aggregator type of the primary aggregate; used for snapshot storage.
    aggregator_type: String,
    /// Snapshot revision (set through `revision`, `0` by default).
    revision: u16,
    /// Aggregate ids keyed by aggregator type name.
    aggregators: HashMap<String, String>,
    /// Registered handlers keyed by `"{aggregator_type}_{event_name}"`.
    handlers: HashMap<String, Box<dyn Handler<P>>>,
    /// Shared data made available to handlers through the load context.
    context: context::RwContext,
    /// When `true`, events without a registered handler are silently skipped;
    /// when `false`, such an event makes `execute` fail.
    safety_disabled: bool,
    /// Binds the projection to an executor type without storing an executor.
    executor: PhantomData<E>,
}
306
307impl<E: Executor, P: Snapshot<E> + Default + 'static> Projection<E, P> {
308 /// Creates a builder for loading aggregate state.
309 ///
310 /// This consumes the projection and returns a [`LoadBuilder`] configured
311 /// to load the state for the specified aggregate.
312 ///
313 /// # Type Parameters
314 ///
315 /// - `A`: The aggregate type to load
316 pub fn new<A: Aggregator>(id: impl Into<String>) -> Projection<E, P>
317 where
318 P: Snapshot<E> + Default,
319 {
320 let id = id.into();
321 let mut aggregators = HashMap::new();
322 aggregators.insert(A::aggregator_type().to_owned(), id.to_owned());
323
324 Projection {
325 id,
326 aggregator_type: A::aggregator_type().to_owned(),
327 aggregators,
328 context: Default::default(),
329 handlers: HashMap::new(),
330 safety_disabled: true,
331 executor: PhantomData,
332 revision: 0,
333 }
334 }
335
336 /// Creates a builder for loading aggregate state.
337 ///
338 /// This consumes the projection and returns a [`LoadBuilder`] configured
339 /// to load the state for the specified aggregate.
340 ///
341 /// # Type Parameters
342 ///
343 /// - `A`: The aggregate type to load
344 pub fn ids<A: Aggregator>(ids: Vec<impl Into<String>>) -> Projection<E, P>
345 where
346 P: Snapshot<E> + Default,
347 {
348 Self::new::<A>(crate::hash_ids(ids))
349 }
350
351 /// Sets the snapshot revision.
352 ///
353 /// Changing the revision invalidates existing snapshots, forcing a full rebuild.
354 pub fn revision(mut self, value: u16) -> Self {
355 self.revision = value;
356
357 self
358 }
359
360 /// Enables safety checks for unhandled events.
361 ///
362 /// When enabled, execution fails if an event is encountered without a handler.
363 pub fn safety_check(mut self) -> Self {
364 self.safety_disabled = false;
365
366 self
367 }
368
369 /// Registers an event handler with this projection.
370 ///
371 /// # Panics
372 ///
373 /// Panics if a handler for the same event type is already registered.
374 pub fn handler<H: Handler<P> + 'static>(mut self, h: H) -> Self {
375 let key = format!("{}_{}", h.aggregator_type(), h.event_name());
376 if self.handlers.insert(key.to_owned(), Box::new(h)).is_some() {
377 panic!("Cannot register event handler: key {} already exists", key);
378 }
379 self
380 }
381
382 /// Registers a skip handler with this projection.
383 ///
384 /// # Panics
385 ///
386 /// Panics if a handler for the same event type is already registered.
387 pub fn skip<EV: AggregatorEvent + Send + Sync + 'static>(self) -> Self {
388 self.handler(SkipHandler::<EV>(PhantomData))
389 }
390
391 /// Adds shared data to the load context.
392 ///
393 /// Data added here is accessible in handlers via the context.
394 pub fn data<D: Send + Sync + 'static>(self, v: D) -> Self {
395 self.context.insert(v);
396
397 self
398 }
399
400 /// Adds a related aggregate to load events from.
401 ///
402 /// Use this when the projection needs events from multiple aggregates.
403 pub fn aggregator<A: Aggregator>(self, id: impl Into<String>) -> Self {
404 self.aggregator_raw(A::aggregator_type().to_owned(), id)
405 }
406
407 /// Adds a related aggregate to load events from.
408 ///
409 /// Use this when the projection needs events from multiple aggregates.
410 pub fn aggregator_raw(
411 mut self,
412 aggregator_type: impl Into<String>,
413 id: impl Into<String>,
414 ) -> Self {
415 self.aggregators.insert(aggregator_type.into(), id.into());
416
417 self
418 }
419
420 /// Executes the load operation, returning the rebuilt state.
421 ///
422 /// Returns `None` if no events exist for the aggregate.
423 /// Returns `Err` if there are too many events to process in one batch.
424 pub async fn execute(&self, executor: &E) -> anyhow::Result<Option<P>> {
425 let context = Context {
426 context: self.context.clone(),
427 executor,
428 id: self.id.to_owned(),
429 aggregator_type: self.aggregator_type.to_owned(),
430 aggregators: &self.aggregators,
431 revision: self.revision,
432 };
433 let snapshot = P::restore(&context).await?;
434 let cursor = snapshot.as_ref().map(|s| s.get_cursor());
435
436 let read_aggregators = self
437 .handlers
438 .values()
439 .map(|h| match self.aggregators.get(h.aggregator_type()) {
440 Some(id) => ReadAggregator {
441 aggregator_type: h.aggregator_type().to_owned(),
442 aggregator_id: Some(id.to_owned()),
443 name: if self.safety_disabled {
444 Some(h.event_name().to_owned())
445 } else {
446 None
447 },
448 },
449 _ => {
450 if self.safety_disabled {
451 ReadAggregator::event(h.aggregator_type(), h.event_name())
452 } else {
453 ReadAggregator::aggregator(h.aggregator_type())
454 }
455 }
456 })
457 .collect::<Vec<_>>();
458
459 let events = executor
460 .read(
461 Some(read_aggregators.to_vec()),
462 None,
463 Args::forward(100, cursor.clone()),
464 )
465 .await?;
466
467 if events.edges.is_empty() && snapshot.is_none() {
468 return Ok(None);
469 }
470
471 let mut snapshot = snapshot.unwrap_or_default();
472
473 for event in events.edges.iter() {
474 let key = format!("{}_{}", event.node.aggregator_type, event.node.name);
475
476 let Some(handler) = self.handlers.get(&key) else {
477 if !self.safety_disabled {
478 anyhow::bail!("no handler k={key}");
479 }
480
481 continue;
482 };
483
484 handler.handle(&mut snapshot, &event.node).await?;
485 }
486
487 if let Some(event) = events.edges.last() {
488 snapshot.set_cursor(&event.cursor);
489 snapshot.take_snapshot(&context).await?;
490 }
491
492 if events.page_info.has_next_page {
493 anyhow::bail!("Too busy");
494 }
495
496 Ok(Some(snapshot))
497 }
498}
499
500pub(crate) struct SkipHandler<E: AggregatorEvent>(PhantomData<E>);
501
502impl<P: 'static, EV: AggregatorEvent + Send + Sync> Handler<P> for SkipHandler<EV> {
503 fn handle<'a>(
504 &'a self,
505 _projection: &'a mut P,
506 _event: &'a crate::Event,
507 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>> {
508 Box::pin(async { Ok(()) })
509 }
510
511 fn aggregator_type(&self) -> &'static str {
512 EV::aggregator_type()
513 }
514
515 fn event_name(&self) -> &'static str {
516 EV::event_name()
517 }
518}