evento_core/projection.rs
1//! Projections and event subscriptions.
2//!
3//! This module provides the core building blocks for event sourcing:
4//! - Projections that build read models from events
5//! - Subscriptions that continuously process events
6//! - Loading aggregate state from event streams
7//!
8//! # Key Types
9//!
10//! - [`Projection`] - Defines handlers for building projections
11//! - [`LoadBuilder`] - Loads aggregate state from events
12//! - [`SubscriptionBuilder`] - Builds continuous event subscriptions
13//! - [`Subscription`] - Handle to a running subscription
14//! - [`EventData`] - Typed event with deserialized data and metadata
15//!
16//! # Example
17//!
18//! ```rust,ignore
19//! use evento::projection::Projection;
20//!
21//! // Define a projection with event handlers
22//! let projection = Projection::<AccountView, _>::new("accounts")
23//! .handler(account_opened)
24//! .handler(money_deposited);
25//!
26//! // Load aggregate state
27//! let result = projection
28//! .load::<Account>("account-123")
29//! .execute(&executor)
30//! .await?;
31//!
32//! // Or start a subscription
33//! let subscription = projection
34//! .subscription()
35//! .routing_key("accounts")
36//! .start(&executor)
37//! .await?;
38//! ```
39
40use std::{
41 collections::HashMap,
42 future::Future,
43 ops::{Deref, DerefMut},
44 pin::Pin,
45 time::Duration,
46};
47use tokio::time::{interval_at, Instant};
48use ulid::Ulid;
49
50use backon::{ExponentialBuilder, Retryable};
51
52use crate::{context, cursor::Args, Executor, ReadAggregator};
53
/// Filter for events by routing key.
///
/// Routing keys allow partitioning events for parallel processing
/// or filtering subscriptions to specific event streams.
///
/// A builder returned by [`Projection::subscription`] starts with
/// `RoutingKey::Value(None)`; [`SubscriptionBuilder::routing_key`] and
/// [`SubscriptionBuilder::all`] replace it with the other forms.
#[derive(Clone)]
pub enum RoutingKey {
    /// Match all events regardless of routing key
    All,
    /// Match events with a specific routing key (or no key if `None`)
    Value(Option<String>),
}
65
/// Handler context providing access to executor and shared data.
///
/// `Context` wraps an [`RwContext`](crate::context::RwContext) for type-safe
/// data storage and provides access to the executor for database operations.
/// The wrapped `RwContext` is private; it is reached through the `Deref`
/// implementation on this type.
///
/// Within this module a `Context` is built from a clone of the builder's
/// `RwContext` plus the executor reference passed to
/// [`LoadBuilder::execute`] or to the subscription processing loop.
///
/// # Example
///
/// ```rust,ignore
/// #[evento::handler]
/// async fn my_handler<E: Executor>(
///     event: Event<MyEventData>,
///     action: Action<'_, MyView, E>,
/// ) -> anyhow::Result<()> {
///     if let Action::Handle(ctx) = action {
///         // Access shared data
///         let config: Data<AppConfig> = ctx.extract();
///
///         // Use executor for queries
///         let events = ctx.executor.read(...).await?;
///     }
///     Ok(())
/// }
/// ```
#[derive(Clone)]
pub struct Context<'a, E: Executor> {
    // Shared data store; exposed to handlers via `Deref`.
    context: context::RwContext,
    /// Reference to the executor for database operations
    pub executor: &'a E,
}
95
96impl<'a, E: Executor> Deref for Context<'a, E> {
97 type Target = context::RwContext;
98
99 fn deref(&self) -> &Self::Target {
100 &self.context
101 }
102}
103
/// Trait for aggregate types.
///
/// Aggregates are the root entities in event sourcing. Each aggregate
/// type has a unique identifier string used for event storage and routing.
///
/// This trait is typically derived using the `#[evento::aggregator]` macro.
///
/// The `Default` supertrait means every aggregate type must be
/// constructible without arguments.
///
/// # Example
///
/// ```rust,ignore
/// #[evento::aggregator("myapp/Account")]
/// #[derive(Default)]
/// pub struct Account {
///     pub balance: i64,
///     pub owner: String,
/// }
/// ```
pub trait Aggregator: Default {
    /// Returns the unique type identifier for this aggregate (e.g., "myapp/Account")
    ///
    /// [`Projection::load`], [`LoadBuilder::aggregator`] and
    /// [`SubscriptionBuilder::aggregator`] use this string as the map key
    /// under which the aggregate id is stored.
    fn aggregator_type() -> &'static str;
}
125
/// Trait for event types.
///
/// Events represent state changes that have occurred. Each event type
/// has a name and belongs to an aggregator type (it inherits
/// [`Aggregator::aggregator_type`] through the `Aggregator` supertrait).
///
/// This trait is typically derived using the `#[evento::aggregator]` macro.
///
/// # Example
///
/// ```rust,ignore
/// #[evento::aggregator("myapp/Account")]
/// #[derive(bitcode::Encode, bitcode::Decode)]
/// pub struct AccountOpened {
///     pub owner: String,
/// }
/// ```
pub trait Event: Aggregator {
    /// Returns the event name (e.g., "AccountOpened")
    fn event_name() -> &'static str;
}
146
/// Trait for event handlers.
///
/// Handlers process events in two modes:
/// - `handle`: For subscriptions that perform side effects (send emails, update read models)
/// - `apply`: For loading aggregate state by replaying events
///
/// This trait is typically implemented via the `#[evento::handler]` macro.
///
/// A [`Projection`] stores handlers boxed as trait objects, keyed by
/// `"{aggregator_type}_{event_name}"`, so the two accessor methods below
/// decide which stored events a handler receives.
pub trait Handler<P: 'static, E: Executor>: Sync + Send {
    /// Handles an event during subscription processing.
    ///
    /// This is called when processing events in a subscription context,
    /// where side effects like database updates or API calls are appropriate.
    ///
    /// Returning `Err` aborts the current processing pass; the event is only
    /// acknowledged after this future resolves to `Ok`.
    fn handle<'a>(
        &'a self,
        context: &'a Context<'a, E>,
        event: &'a crate::Event,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;

    /// Applies an event to build projection state.
    ///
    /// This is called when loading aggregate state by replaying events.
    /// It should be a pure function that modifies the projection without side effects.
    ///
    /// Returning `Err` aborts [`LoadBuilder::execute`] with that error.
    fn apply<'a>(
        &'a self,
        projection: &'a mut P,
        event: &'a crate::Event,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;

    /// Returns the aggregator type this handler processes.
    fn aggregator_type(&self) -> &'static str;
    /// Returns the event name this handler processes.
    fn event_name(&self) -> &'static str;
}
180
/// Action passed to event handlers.
///
/// Determines whether the handler should apply state changes or
/// handle the event with side effects. The two variants mirror the two
/// methods of [`Handler`]: `Apply` carries the mutable projection given to
/// `Handler::apply`, `Handle` carries the context given to `Handler::handle`.
pub enum Action<'a, P: 'static, E: Executor> {
    /// Apply event to projection state (for loading)
    Apply(&'a mut P),
    /// Handle event with context (for subscriptions)
    Handle(&'a Context<'a, E>),
}
191
/// Typed event with deserialized data and metadata.
///
/// `EventData` wraps a raw [`Event`](crate::Event) and provides typed access
/// to the deserialized event data and metadata. It implements `Deref` to
/// provide access to the underlying event fields (id, timestamp, version, etc.).
///
/// Values are created through `TryFrom<&crate::Event>`, which decodes the
/// raw `data` and `metadata` bytes with `bitcode` and keeps a clone of the
/// raw event.
///
/// # Type Parameters
///
/// - `D`: The event data type (e.g., `AccountOpened`)
/// - `M`: The metadata type (defaults to `bool` for no metadata)
///
/// # Example
///
/// ```rust,ignore
/// use evento::metadata::Event;
///
/// #[evento::handler]
/// async fn handle_deposit<E: Executor>(
///     event: Event<MoneyDeposited>,
///     action: Action<'_, AccountView, E>,
/// ) -> anyhow::Result<()> {
///     // Access typed data
///     println!("Amount: {}", event.data.amount);
///
///     // Access metadata
///     if let Ok(user) = event.metadata.user() {
///         println!("By user: {}", user);
///     }
///
///     // Access underlying event fields via Deref
///     println!("Event ID: {}", event.id);
///     println!("Version: {}", event.version);
///
///     Ok(())
/// }
/// ```
pub struct EventData<D, M = bool> {
    // Copy of the raw event this value was decoded from; reached via `Deref`.
    event: crate::Event,
    /// The typed event data
    pub data: D,
    /// The typed event metadata
    pub metadata: M,
}
235
236impl<D, M> Deref for EventData<D, M> {
237 type Target = crate::Event;
238
239 fn deref(&self) -> &Self::Target {
240 &self.event
241 }
242}
243
244impl<D, M> TryFrom<&crate::Event> for EventData<D, M>
245where
246 D: bitcode::DecodeOwned,
247 M: bitcode::DecodeOwned,
248{
249 type Error = bitcode::Error;
250
251 fn try_from(value: &crate::Event) -> Result<Self, Self::Error> {
252 let data = bitcode::decode::<D>(&value.data)?;
253 let metadata = bitcode::decode::<M>(&value.metadata)?;
254 Ok(EventData {
255 data,
256 metadata,
257 event: value.clone(),
258 })
259 }
260}
261
/// Container for event handlers that build a projection.
///
/// A `Projection` groups related event handlers together and provides
/// methods to load aggregate state or create subscriptions.
///
/// Both [`Projection::load`] and [`Projection::subscription`] take the
/// projection by value, and `Projection` does not implement `Clone`, so a
/// given instance can be turned into one builder only.
///
/// # Type Parameters
///
/// - `P`: The projection/view type being built
/// - `E`: The executor type for database operations
///
/// # Example
///
/// ```rust,ignore
/// fn accounts<E: Executor>() -> Projection<AccountView, E> {
///     Projection::new("accounts")
///         .handler(account_opened)
///         .handler(money_deposited)
///         .handler(money_withdrawn)
/// }
///
/// // Use for loading state (consumes the projection)
/// let state = accounts()
///     .load::<Account>("account-123")
///     .execute(&executor)
///     .await?;
///
/// // Or build it again and create a subscription
/// let sub = accounts()
///     .subscription()
///     .start(&executor)
///     .await?;
/// ```
pub struct Projection<P: 'static, E: Executor> {
    // Identifier handed to the builders; used for subscriber cursor lookups.
    key: String,
    // Handlers keyed by "{aggregator_type}_{event_name}".
    handlers: HashMap<String, Box<dyn Handler<P, E>>>,
}
296
297impl<P: 'static, E: Executor> Projection<P, E> {
298 /// Creates a new projection with the given key.
299 ///
300 /// The key is used as the subscription identifier for cursor tracking.
301 pub fn new(key: impl Into<String>) -> Self {
302 Self {
303 key: key.into(),
304 handlers: HashMap::new(),
305 }
306 }
307
308 /// Registers an event handler with this projection.
309 ///
310 /// # Panics
311 ///
312 /// Panics if a handler for the same event type is already registered.
313 pub fn handler<H: Handler<P, E> + 'static>(mut self, h: H) -> Self {
314 let key = format!("{}_{}", h.aggregator_type(), h.event_name());
315 if self.handlers.insert(key.to_owned(), Box::new(h)).is_some() {
316 panic!("Cannot register event handler: key {} already exists", key);
317 }
318 self
319 }
320
321 /// Creates a builder for loading aggregate state.
322 ///
323 /// This consumes the projection and returns a [`LoadBuilder`] configured
324 /// to load the state for the specified aggregate.
325 ///
326 /// # Type Parameters
327 ///
328 /// - `A`: The aggregate type to load
329 pub fn load<A: Aggregator>(self, id: impl Into<String>) -> LoadBuilder<P, E>
330 where
331 P: Snapshot + Default,
332 {
333 let id = id.into();
334 let mut aggregators = HashMap::new();
335 aggregators.insert(A::aggregator_type().to_owned(), id.to_owned());
336
337 LoadBuilder {
338 key: self.key.to_owned(),
339 id,
340 aggregators,
341 handlers: self.handlers,
342 context: Default::default(),
343 }
344 }
345
346 /// Creates a builder for a continuous event subscription.
347 ///
348 /// This consumes the projection and returns a [`SubscriptionBuilder`]
349 /// that can be configured and started.
350 pub fn subscription(self) -> SubscriptionBuilder<P, E> {
351 SubscriptionBuilder {
352 key: self.key.to_owned(),
353 context: Default::default(),
354 handlers: self.handlers,
355 delay: None,
356 retry: Some(30),
357 chunk_size: 300,
358 is_accept_failure: false,
359 routing_key: RoutingKey::Value(None),
360 aggregators: Default::default(),
361 }
362 }
363}
364
/// Outcome of loading an aggregate's state.
///
/// Bundles the rebuilt projection with the version and routing key taken
/// from the most recent event that was read. `Deref` and `DerefMut` forward
/// to [`LoadResult::item`], so the projection's own fields and methods can
/// be used directly on the result.
///
/// # Example
///
/// ```rust,ignore
/// let result: LoadResult<AccountView> = projection
///     .load::<Account>("account-123")
///     .execute(&executor)
///     .await?
///     .expect("Account not found");
///
/// // Inner item, reached through Deref
/// println!("Balance: {}", result.balance);
///
/// // Metadata stored next to the item
/// println!("Version: {}", result.version);
/// println!("Routing key: {:?}", result.routing_key);
/// ```
#[derive(Debug, Clone, Default)]
pub struct LoadResult<A> {
    /// The loaded projection/view state
    pub item: A,
    /// Current version of the aggregate
    pub version: u16,
    /// Routing key for the aggregate (if set)
    pub routing_key: Option<String>,
}

impl<A> Deref for LoadResult<A> {
    type Target = A;

    fn deref(&self) -> &A {
        let Self { item, .. } = self;
        item
    }
}

impl<A> DerefMut for LoadResult<A> {
    fn deref_mut(&mut self) -> &mut A {
        let Self { item, .. } = self;
        item
    }
}
409
/// Type alias for an optional load result.
///
/// This is the success type produced by [`Snapshot::restore`]; `None`
/// means no stored state was found.
pub type OptionLoadResult<A> = Option<LoadResult<A>>;
412
/// Trait for types that can be restored from snapshots.
///
/// Snapshots provide a performance optimization by storing pre-computed
/// state, avoiding the need to replay all events from the beginning.
///
/// This trait is typically implemented via the `#[evento::snapshot]` macro.
///
/// [`LoadBuilder::execute`] calls [`Snapshot::restore`] before reading
/// events and falls back to `LoadResult::default()` when it yields `None`.
///
/// # Example
///
/// ```rust,ignore
/// #[evento::snapshot]
/// #[derive(Default)]
/// pub struct AccountView {
///     pub balance: i64,
///     pub owner: String,
/// }
///
/// // The macro generates the restore implementation that loads
/// // from a snapshot table if available
/// ```
pub trait Snapshot: Sized {
    /// Restores state from a snapshot if available.
    ///
    /// `context` is the shared-data store of the load operation and `id` is
    /// the id of the aggregate being loaded.
    ///
    /// Returns `None` if no snapshot exists for the given ID.
    fn restore<'a>(
        context: &'a context::RwContext,
        id: String,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OptionLoadResult<Self>>> + Send + 'a>>;
}
442
/// Builder for loading aggregate state from events.
///
/// Created via [`Projection::load`], this builder configures how to
/// load an aggregate's state by replaying events.
///
/// The configuration methods take `&mut self` and return `&mut Self`, and
/// [`LoadBuilder::execute`] takes `&self`, so the whole chain can be written
/// in one expression or the builder can be kept and executed repeatedly.
///
/// # Example
///
/// ```rust,ignore
/// let result = projection
///     .load::<Account>("account-123")
///     .data(app_config) // Add shared data
///     .aggregator::<User>("user-456") // Add related aggregate
///     .execute(&executor)
///     .await?;
/// ```
pub struct LoadBuilder<P: Snapshot + Default + 'static, E: Executor> {
    // Projection key; used to look up the subscriber cursor.
    key: String,
    // Id of the primary aggregate, passed to `Snapshot::restore`.
    id: String,
    // Aggregator type -> aggregate id filters.
    aggregators: HashMap<String, String>,
    // Handlers keyed by "{aggregator_type}_{event_name}".
    handlers: HashMap<String, Box<dyn Handler<P, E>>>,
    // Shared data made available while loading.
    context: context::RwContext,
}
465
466impl<P: Snapshot + Default + 'static, E: Executor> LoadBuilder<P, E> {
467 /// Adds shared data to the load context.
468 ///
469 /// Data added here is accessible in handlers via the context.
470 pub fn data<D: Send + Sync + 'static>(&mut self, v: D) -> &mut Self {
471 self.context.insert(v);
472
473 self
474 }
475
476 /// Adds a related aggregate to load events from.
477 ///
478 /// Use this when the projection needs events from multiple aggregates.
479 pub fn aggregator<A: Aggregator>(&mut self, id: impl Into<String>) -> &mut Self {
480 self.aggregators
481 .insert(A::aggregator_type().to_owned(), id.into());
482
483 self
484 }
485
486 /// Executes the load operation, returning the rebuilt state.
487 ///
488 /// Returns `None` if no events exist for the aggregate.
489 /// Returns `Err` if there are too many events to process in one batch.
490 pub async fn execute(&self, executor: &E) -> anyhow::Result<Option<LoadResult<P>>> {
491 let context = Context {
492 context: self.context.clone(),
493 executor,
494 };
495
496 let cursor = executor.get_subscriber_cursor(self.key.to_owned()).await?;
497 let loaded = P::restore(&context, self.id.to_owned()).await?;
498 let has_loaded = loaded.is_some();
499 let mut snapshot = loaded.unwrap_or_default();
500
501 let read_aggregators = self
502 .handlers
503 .values()
504 .map(|h| match self.aggregators.get(h.aggregator_type()) {
505 Some(id) => ReadAggregator {
506 aggregator_type: h.aggregator_type().to_owned(),
507 aggregator_id: Some(id.to_owned()),
508 name: None,
509 },
510 _ => ReadAggregator::event(h.aggregator_type(), h.event_name()),
511 })
512 .collect::<Vec<_>>();
513
514 let events = executor
515 .read(
516 Some(read_aggregators.to_vec()),
517 None,
518 Args::forward(100, cursor.clone()),
519 )
520 .await?;
521
522 for event in events.edges.iter() {
523 let key = format!("{}_{}", event.node.aggregator_type, event.node.name);
524 let Some(handler) = self.handlers.get(&key) else {
525 tracing::debug!("No handler found for {}/{key}", self.key);
526 continue;
527 };
528
529 handler.apply(&mut snapshot, &event.node).await?;
530 }
531
532 if events.page_info.has_next_page {
533 anyhow::bail!("Too busy");
534 }
535
536 if let Some(event) = events.edges.last() {
537 snapshot.version = event.node.version;
538 snapshot.routing_key = event.node.routing_key.to_owned();
539
540 return Ok(Some(snapshot));
541 }
542
543 if !has_loaded {
544 return Ok(None);
545 }
546
547 let events = executor
548 .read(
549 Some(read_aggregators.to_vec()),
550 None,
551 Args::backward(1, None),
552 )
553 .await?;
554
555 if let Some(event) = events.edges.first() {
556 snapshot.version = event.node.version;
557 snapshot.routing_key = event.node.routing_key.to_owned();
558
559 return Ok(Some(snapshot));
560 }
561
562 Ok(None)
563 }
564}
565
/// Builder for creating event subscriptions.
///
/// Created via [`Projection::subscription`], this builder configures
/// a continuous event processing subscription with retry logic,
/// routing key filtering, and graceful shutdown support.
///
/// # Example
///
/// ```rust,ignore
/// let subscription = projection
///     .subscription()
///     .routing_key("accounts")
///     .chunk_size(100)
///     .retry(5)
///     .delay(Duration::from_secs(10))
///     .start(&executor)
///     .await?;
///
/// // Later, gracefully shutdown
/// subscription.shutdown().await?;
/// ```
pub struct SubscriptionBuilder<P: 'static, E: Executor> {
    // Projection key; prefixed with the routing key (when one is set) to
    // form the subscriber key.
    key: String,
    // Handlers keyed by "{aggregator_type}_{event_name}".
    handlers: HashMap<String, Box<dyn Handler<P, E>>>,
    // Shared data cloned into the `Context` given to handlers.
    context: context::RwContext,
    // Which routing keys the subscription reads.
    routing_key: RoutingKey,
    // Optional delay before the first poll of a started subscription.
    delay: Option<Duration>,
    // Maximum number of events read per batch.
    chunk_size: u16,
    // When true, a started subscription keeps polling after a failure.
    is_accept_failure: bool,
    // Maximum retry count; `None` disables retrying.
    retry: Option<u8>,
    // Aggregator type -> aggregate id filters.
    aggregators: HashMap<String, String>,
}
598
599impl<P, E: Executor + 'static> SubscriptionBuilder<P, E> {
600 /// Allows the subscription to continue after handler failures.
601 ///
602 /// By default, subscriptions stop on the first error. With this flag,
603 /// errors are logged but processing continues.
604 pub fn accept_failure(mut self) -> Self {
605 self.is_accept_failure = true;
606
607 self
608 }
609
610 /// Sets the number of events to process per batch.
611 ///
612 /// Default is 300.
613 pub fn chunk_size(mut self, v: u16) -> Self {
614 self.chunk_size = v;
615
616 self
617 }
618
619 /// Sets a delay before starting the subscription.
620 ///
621 /// Useful for staggering subscription starts in multi-node deployments.
622 pub fn delay(mut self, v: Duration) -> Self {
623 self.delay = Some(v);
624
625 self
626 }
627
628 /// Filters events by routing key.
629 ///
630 /// Only events with the matching routing key will be processed.
631 pub fn routing_key(mut self, v: impl Into<String>) -> Self {
632 self.routing_key = RoutingKey::Value(Some(v.into()));
633
634 self
635 }
636
637 /// Sets the maximum number of retries on failure.
638 ///
639 /// Uses exponential backoff. Default is 30.
640 pub fn retry(mut self, v: u8) -> Self {
641 self.retry = Some(v);
642
643 self
644 }
645
646 fn without_retry(mut self) -> Self {
647 self.retry = None;
648
649 self
650 }
651
652 /// Processes all events regardless of routing key.
653 pub fn all(mut self) -> Self {
654 self.routing_key = RoutingKey::All;
655
656 self
657 }
658
659 /// Adds a related aggregate to process events from.
660 pub fn aggregator<A: Aggregator>(mut self, id: impl Into<String>) -> Self {
661 self.aggregators
662 .insert(A::aggregator_type().to_owned(), id.into());
663
664 self
665 }
666
667 fn read_aggregators(&self) -> Vec<ReadAggregator> {
668 self.handlers
669 .values()
670 .map(|h| match self.aggregators.get(h.aggregator_type()) {
671 Some(id) => ReadAggregator {
672 aggregator_type: h.aggregator_type().to_owned(),
673 aggregator_id: Some(id.to_owned()),
674 name: Some(h.event_name().to_owned()),
675 },
676 _ => ReadAggregator::event(h.aggregator_type(), h.event_name()),
677 })
678 .collect()
679 }
680
681 fn key(&self) -> String {
682 if let RoutingKey::Value(Some(ref key)) = self.routing_key {
683 return format!("{key}.{}", self.key);
684 }
685
686 self.key.to_owned()
687 }
688
689 async fn process(
690 &self,
691 executor: &E,
692 id: &Ulid,
693 aggregators: &[ReadAggregator],
694 mut rx: Option<&mut tokio::sync::oneshot::Receiver<()>>,
695 ) -> anyhow::Result<()> {
696 let mut interval = interval_at(
697 Instant::now() - Duration::from_millis(400),
698 Duration::from_millis(300),
699 );
700
701 loop {
702 interval.tick().await;
703
704 if !executor.is_subscriber_running(self.key(), *id).await? {
705 return Ok(());
706 }
707
708 let cursor = executor.get_subscriber_cursor(self.key()).await?;
709
710 let timestamp = executor
711 .read(
712 Some(aggregators.to_vec()),
713 Some(self.routing_key.to_owned()),
714 Args::backward(1, None),
715 )
716 .await?
717 .edges
718 .last()
719 .map(|e| e.node.timestamp)
720 .unwrap_or_default();
721
722 let res = executor
723 .read(
724 Some(aggregators.to_vec()),
725 Some(self.routing_key.to_owned()),
726 Args::forward(self.chunk_size, cursor),
727 )
728 .await?;
729
730 if res.edges.is_empty() {
731 return Ok(());
732 }
733
734 let context = Context {
735 context: self.context.clone(),
736 executor,
737 };
738
739 for event in res.edges {
740 if let Some(rx) = rx.as_mut() {
741 if rx.try_recv().is_ok() {
742 tracing::info!(
743 key = self.key(),
744 "Subscription received shutdown signal, stopping gracefull"
745 );
746
747 return Ok(());
748 }
749 }
750
751 tracing::Span::current().record("aggregator_type", &event.node.aggregator_type);
752 tracing::Span::current().record("aggregator_id", &event.node.aggregator_id);
753 tracing::Span::current().record("event", &event.node.name);
754
755 let key = format!("{}_{}", event.node.aggregator_type, event.node.name);
756 let Some(handler) = self.handlers.get(&key) else {
757 panic!("No handler found for {}/{key}", self.key());
758 };
759
760 handler.handle(&context, &event.node).await?;
761
762 executor
763 .acknowledge(
764 self.key(),
765 event.cursor.to_owned(),
766 timestamp - event.node.timestamp,
767 )
768 .await?;
769 }
770 }
771 }
772
773 /// Starts the subscription without retry logic.
774 ///
775 /// Equivalent to calling `start()` with retries disabled.
776 pub async fn unretry_start(self, executor: &E) -> anyhow::Result<Subscription>
777 where
778 E: Clone,
779 {
780 self.without_retry().start(executor).await
781 }
782
783 /// Starts a continuous background subscription.
784 ///
785 /// Returns a [`Subscription`] handle that can be used for graceful shutdown.
786 /// The subscription runs in a spawned tokio task and polls for new events.
787 pub async fn start(self, executor: &E) -> anyhow::Result<Subscription>
788 where
789 E: Clone,
790 {
791 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
792 let executor = executor.clone();
793 let id = Ulid::new();
794 let subscription_id = id;
795
796 executor
797 .upsert_subscriber(self.key(), id.to_owned())
798 .await?;
799
800 let task_handle = tokio::spawn(async move {
801 let read_aggregators = self.read_aggregators();
802 let start = self
803 .delay
804 .map(|d| Instant::now() + d)
805 .unwrap_or_else(Instant::now);
806
807 let mut interval = interval_at(
808 start - Duration::from_millis(1200),
809 Duration::from_millis(1000),
810 );
811
812 loop {
813 if shutdown_rx.try_recv().is_ok() {
814 tracing::info!(
815 key = self.key(),
816 "Subscription received shutdown signal, stopping gracefull"
817 );
818
819 break;
820 }
821
822 interval.tick().await;
823
824 let _ = tracing::error_span!(
825 "start",
826 key = self.key(),
827 aggregator_type = tracing::field::Empty,
828 aggregator_id = tracing::field::Empty,
829 event = tracing::field::Empty,
830 )
831 .entered();
832
833 let result = match self.retry {
834 Some(retry) => {
835 (|| async { self.process(&executor, &id, &read_aggregators, None).await })
836 .retry(ExponentialBuilder::default().with_max_times(retry.into()))
837 .sleep(tokio::time::sleep)
838 .notify(|err, dur| {
839 tracing::error!(
840 error = %err,
841 duration = ?dur,
842 "Failed to process event"
843 );
844 })
845 .await
846 }
847 _ => self.process(&executor, &id, &read_aggregators, None).await,
848 };
849
850 let Err(err) = result else {
851 continue;
852 };
853
854 tracing::error!(error = %err, "Failed to process event");
855
856 if !self.is_accept_failure {
857 break;
858 }
859 }
860 });
861
862 Ok(Subscription {
863 id: subscription_id,
864 task_handle,
865 shutdown_tx,
866 })
867 }
868
869 /// Executes the subscription once without retry logic.
870 ///
871 /// Processes all pending events and returns. Does not poll for new events.
872 pub async fn unretry_execute(self, executor: &E) -> anyhow::Result<()> {
873 self.without_retry().execute(executor).await
874 }
875
876 /// Executes the subscription once, processing all pending events.
877 ///
878 /// Unlike `start()`, this does not run continuously. It processes
879 /// all currently pending events and returns.
880 pub async fn execute(&self, executor: &E) -> anyhow::Result<()> {
881 let id = Ulid::new();
882
883 executor
884 .upsert_subscriber(self.key(), id.to_owned())
885 .await?;
886
887 let read_aggregators = self.read_aggregators();
888
889 let _ = tracing::error_span!(
890 "execute",
891 key = self.key(),
892 aggregator_type = tracing::field::Empty,
893 aggregator_id = tracing::field::Empty,
894 event = tracing::field::Empty,
895 )
896 .entered();
897
898 match self.retry {
899 Some(retry) => {
900 (|| async { self.process(executor, &id, &read_aggregators, None).await })
901 .retry(ExponentialBuilder::default().with_max_times(retry.into()))
902 .sleep(tokio::time::sleep)
903 .notify(|err, dur| {
904 tracing::error!(
905 error = %err,
906 duration = ?dur,
907 "Failed to process event"
908 );
909 })
910 .await
911 }
912 _ => self.process(executor, &id, &read_aggregators, None).await,
913 }
914 }
915}
916
/// Handle to a running event subscription.
///
/// Returned by [`SubscriptionBuilder::start`], this handle provides
/// the subscription ID and a method for graceful shutdown.
///
/// # Example
///
/// ```rust,ignore
/// let subscription = projection
///     .subscription()
///     .start(&executor)
///     .await?;
///
/// println!("Started subscription: {}", subscription.id);
///
/// // On application shutdown
/// subscription.shutdown().await?;
/// ```
#[derive(Debug)]
pub struct Subscription {
    /// Unique ID for this subscription instance
    pub id: Ulid,
    // Join handle of the spawned polling task.
    task_handle: tokio::task::JoinHandle<()>,
    // Sender half of the shutdown channel polled by the task.
    shutdown_tx: tokio::sync::oneshot::Sender<()>,
}
942
943impl Subscription {
944 /// Gracefully shuts down the subscription.
945 ///
946 /// Signals the subscription to stop and waits for it to finish
947 /// processing the current event before returning.
948 pub async fn shutdown(self) -> Result<(), tokio::task::JoinError> {
949 let _ = self.shutdown_tx.send(());
950
951 self.task_handle.await
952 }
953}