evento_core/projection.rs
1//! Projections and event subscriptions.
2//!
3//! This module provides the core building blocks for event sourcing:
4//! - Projections that build read models from events
5//! - Subscriptions that continuously process events
6//! - Loading aggregate state from event streams
7//!
8//! # Key Types
9//!
10//! - [`Projection`] - Defines handlers for building projections
11//! - [`LoadBuilder`] - Loads aggregate state from events
12//! - [`SubscriptionBuilder`] - Builds continuous event subscriptions
13//! - [`Subscription`] - Handle to a running subscription
14//! - [`EventData`] - Typed event with deserialized data and metadata
15//!
16//! # Example
17//!
18//! ```rust,ignore
19//! use evento::projection::Projection;
20//!
21//! // Define a projection with event handlers
22//! let projection = Projection::<AccountView, _>::new("accounts")
23//! .handler(account_opened)
24//! .handler(money_deposited);
25//!
26//! // Load aggregate state
27//! let result = projection
28//! .load::<Account>("account-123")
29//! .execute(&executor)
30//! .await?;
31//!
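//! // Or start a subscription instead (`load` and `subscription` each consume
//! // the projection, so build a separate `Projection` for each use)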
33//! let subscription = projection
34//! .subscription()
35//! .routing_key("accounts")
36//! .start(&executor)
37//! .await?;
38//! ```
39
40use std::{
41 collections::HashMap,
42 future::Future,
43 ops::{Deref, DerefMut},
44 pin::Pin,
45 time::Duration,
46};
47use tokio::time::{interval_at, Instant};
48use ulid::Ulid;
49
50use backon::{ExponentialBuilder, Retryable};
51
52use crate::{
53 context,
54 cursor::{Args, Cursor},
55 Executor, ReadAggregator,
56};
57
/// Selects events by their routing key.
///
/// Routing keys allow partitioning events for parallel processing
/// or narrowing a subscription to a specific event stream.
#[derive(Clone)]
pub enum RoutingKey {
    /// No filtering: every event matches, whatever its routing key.
    All,
    /// Match events carrying exactly this routing key; `None` selects
    /// events that have no routing key.
    Value(Option<String>),
}
69
70/// Handler context providing access to executor and shared data.
71///
72/// `Context` wraps an [`RwContext`](crate::context::RwContext) for type-safe
73/// data storage and provides access to the executor for database operations.
74///
75/// # Example
76///
77/// ```rust,ignore
78/// #[evento::handler]
79/// async fn my_handler<E: Executor>(
80/// event: Event<MyEventData>,
81/// action: Action<'_, MyView, E>,
82/// ) -> anyhow::Result<()> {
83/// if let Action::Handle(ctx) = action {
84/// // Access shared data
85/// let config: Data<AppConfig> = ctx.extract();
86///
87/// // Use executor for queries
88/// let events = ctx.executor.read(...).await?;
89/// }
90/// Ok(())
91/// }
92/// ```
93#[derive(Clone)]
94pub struct Context<'a, E: Executor> {
95 context: context::RwContext,
96 /// Reference to the executor for database operations
97 pub executor: &'a E,
98}
99
100impl<'a, E: Executor> Deref for Context<'a, E> {
101 type Target = context::RwContext;
102
103 fn deref(&self) -> &Self::Target {
104 &self.context
105 }
106}
107
/// Marker trait for aggregate types.
///
/// Aggregates are the root entities in event sourcing. Every aggregate type
/// exposes a unique identifier string that is used for event storage and
/// routing.
///
/// Usually derived with the `#[evento::aggregator]` macro rather than
/// implemented by hand.
///
/// # Example
///
/// ```rust,ignore
/// #[evento::aggregator("myapp/Account")]
/// #[derive(Default)]
/// pub struct Account {
///     pub balance: i64,
///     pub owner: String,
/// }
/// ```
pub trait Aggregator: Default {
    /// The unique type identifier of this aggregate, e.g. `"myapp/Account"`.
    fn aggregator_type() -> &'static str;
}
129
130/// Trait for event types.
131///
132/// Events represent state changes that have occurred. Each event type
133/// has a name and belongs to an aggregator type.
134///
135/// This trait is typically derived using the `#[evento::aggregator]` macro.
136///
137/// # Example
138///
139/// ```rust,ignore
140/// #[evento::aggregator("myapp/Account")]
141/// #[derive(bitcode::Encode, bitcode::Decode)]
142/// pub struct AccountOpened {
143/// pub owner: String,
144/// }
145/// ```
146pub trait Event: Aggregator {
147 /// Returns the event name (e.g., "AccountOpened")
148 fn event_name() -> &'static str;
149}
150
151/// Trait for event handlers.
152///
153/// Handlers process events in two modes:
154/// - `handle`: For subscriptions that perform side effects (send emails, update read models)
155/// - `apply`: For loading aggregate state by replaying events
156///
157/// This trait is typically implemented via the `#[evento::handler]` macro.
158pub trait Handler<P: 'static, E: Executor>: Sync + Send {
159 /// Handles an event during subscription processing.
160 ///
161 /// This is called when processing events in a subscription context,
162 /// where side effects like database updates or API calls are appropriate.
163 fn handle<'a>(
164 &'a self,
165 context: &'a Context<'a, E>,
166 event: &'a crate::Event,
167 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;
168
169 /// Applies an event to build projection state.
170 ///
171 /// This is called when loading aggregate state by replaying events.
172 /// It should be a pure function that modifies the projection without side effects.
173 fn apply<'a>(
174 &'a self,
175 projection: &'a mut P,
176 event: &'a crate::Event,
177 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;
178
179 /// Returns the aggregator type this handler processes.
180 fn aggregator_type(&self) -> &'static str;
181 /// Returns the event name this handler processes.
182 fn event_name(&self) -> &'static str;
183}
184
185/// Action passed to event handlers.
186///
187/// Determines whether the handler should apply state changes or
188/// handle the event with side effects.
189pub enum Action<'a, P: 'static, E: Executor> {
190 /// Apply event to projection state (for loading)
191 Apply(&'a mut P),
192 /// Handle event with context (for subscriptions)
193 Handle(&'a Context<'a, E>),
194}
195
196/// Typed event with deserialized data and metadata.
197///
198/// `EventData` wraps a raw [`Event`](crate::Event) and provides typed access
199/// to the deserialized event data and metadata. It implements `Deref` to
200/// provide access to the underlying event fields (id, timestamp, version, etc.).
201///
202/// # Type Parameters
203///
204/// - `D`: The event data type (e.g., `AccountOpened`)
205/// - `M`: The metadata type (defaults to `bool` for no metadata)
206///
207/// # Example
208///
209/// ```rust,ignore
210/// use evento::metadata::Event;
211///
212/// #[evento::handler]
213/// async fn handle_deposit<E: Executor>(
214/// event: Event<MoneyDeposited>,
215/// action: Action<'_, AccountView, E>,
216/// ) -> anyhow::Result<()> {
217/// // Access typed data
218/// println!("Amount: {}", event.data.amount);
219///
220/// // Access metadata
221/// if let Ok(user) = event.metadata.user() {
222/// println!("By user: {}", user);
223/// }
224///
225/// // Access underlying event fields via Deref
226/// println!("Event ID: {}", event.id);
227/// println!("Version: {}", event.version);
228///
229/// Ok(())
230/// }
231/// ```
232pub struct EventData<D, M = bool> {
233 event: crate::Event,
234 /// The typed event data
235 pub data: D,
236 /// The typed event metadata
237 pub metadata: M,
238}
239
240impl<D, M> Deref for EventData<D, M> {
241 type Target = crate::Event;
242
243 fn deref(&self) -> &Self::Target {
244 &self.event
245 }
246}
247
248impl<D, M> TryFrom<&crate::Event> for EventData<D, M>
249where
250 D: bitcode::DecodeOwned,
251 M: bitcode::DecodeOwned,
252{
253 type Error = bitcode::Error;
254
255 fn try_from(value: &crate::Event) -> Result<Self, Self::Error> {
256 let data = bitcode::decode::<D>(&value.data)?;
257 let metadata = bitcode::decode::<M>(&value.metadata)?;
258 Ok(EventData {
259 data,
260 metadata,
261 event: value.clone(),
262 })
263 }
264}
265
266/// Container for event handlers that build a projection.
267///
268/// A `Projection` groups related event handlers together and provides
269/// methods to load aggregate state or create subscriptions.
270///
271/// # Type Parameters
272///
273/// - `P`: The projection/view type being built
274/// - `E`: The executor type for database operations
275///
276/// # Example
277///
278/// ```rust,ignore
279/// let projection = Projection::<AccountView, _>::new("accounts")
280/// .handler(account_opened)
281/// .handler(money_deposited)
282/// .handler(money_withdrawn);
283///
284/// // Use for loading state
285/// let state = projection.clone()
286/// .load::<Account>("account-123")
287/// .execute(&executor)
288/// .await?;
289///
290/// // Or create a subscription
291/// let sub = projection
292/// .subscription()
293/// .start(&executor)
294/// .await?;
295/// ```
296pub struct Projection<P: 'static, E: Executor> {
297 key: String,
298 handlers: HashMap<String, Box<dyn Handler<P, E>>>,
299}
300
301impl<P: 'static, E: Executor> Projection<P, E> {
302 /// Creates a new projection with the given key.
303 ///
304 /// The key is used as the subscription identifier for cursor tracking.
305 pub fn new(key: impl Into<String>) -> Self {
306 Self {
307 key: key.into(),
308 handlers: HashMap::new(),
309 }
310 }
311
312 /// Registers an event handler with this projection.
313 ///
314 /// # Panics
315 ///
316 /// Panics if a handler for the same event type is already registered.
317 pub fn handler<H: Handler<P, E> + 'static>(mut self, h: H) -> Self {
318 let key = format!("{}_{}", h.aggregator_type(), h.event_name());
319 if self.handlers.insert(key.to_owned(), Box::new(h)).is_some() {
320 panic!("Cannot register event handler: key {} already exists", key);
321 }
322 self
323 }
324
325 /// Creates a builder for loading aggregate state.
326 ///
327 /// This consumes the projection and returns a [`LoadBuilder`] configured
328 /// to load the state for the specified aggregate.
329 ///
330 /// # Type Parameters
331 ///
332 /// - `A`: The aggregate type to load
333 pub fn load<A: Aggregator>(self, id: impl Into<String>) -> LoadBuilder<P, E>
334 where
335 P: Snapshot + Default,
336 {
337 let id = id.into();
338 let mut aggregators = HashMap::new();
339 aggregators.insert(A::aggregator_type().to_owned(), id.to_owned());
340
341 LoadBuilder {
342 key: self.key.to_owned(),
343 id,
344 aggregators,
345 handlers: self.handlers,
346 context: Default::default(),
347 filter_events_by_name: true,
348 }
349 }
350
351 /// Creates a builder for a continuous event subscription.
352 ///
353 /// This consumes the projection and returns a [`SubscriptionBuilder`]
354 /// that can be configured and started.
355 pub fn subscription(self) -> SubscriptionBuilder<P, E> {
356 SubscriptionBuilder {
357 key: self.key.to_owned(),
358 context: Default::default(),
359 handlers: self.handlers,
360 delay: None,
361 retry: Some(30),
362 chunk_size: 300,
363 is_accept_failure: false,
364 routing_key: RoutingKey::Value(None),
365 aggregators: Default::default(),
366 }
367 }
368}
369
/// Outcome of loading an aggregate's state.
///
/// Holds the rebuilt projection state together with the version and routing
/// key it was loaded at. `Deref` and `DerefMut` forward to the inner item,
/// so its fields and methods can be used directly on the result.
///
/// # Example
///
/// ```rust,ignore
/// let result: LoadResult<AccountView> = projection
///     .load::<Account>("account-123")
///     .execute(&executor)
///     .await?
///     .expect("Account not found");
///
/// // Access inner item via Deref
/// println!("Balance: {}", result.balance);
///
/// // Access metadata
/// println!("Version: {}", result.version);
/// println!("Routing key: {:?}", result.routing_key);
/// ```
#[derive(Debug, Clone, Default)]
pub struct LoadResult<A> {
    /// The loaded projection/view state.
    pub item: A,
    /// Current version of the aggregate.
    pub version: u16,
    /// Routing key of the aggregate, if one is set.
    pub routing_key: Option<String>,
}

/// Shared access to the loaded item.
impl<A> Deref for LoadResult<A> {
    type Target = A;

    fn deref(&self) -> &A {
        &self.item
    }
}

/// Mutable access to the loaded item.
impl<A> DerefMut for LoadResult<A> {
    fn deref_mut(&mut self) -> &mut A {
        &mut self.item
    }
}
414
415/// Trait for types that can be restored from snapshots.
416///
417/// Snapshots provide a performance optimization by storing pre-computed
418/// state, avoiding the need to replay all events from the beginning.
419///
420/// This trait is typically implemented via the `#[evento::snapshot]` macro.
421///
422/// # Example
423///
424/// ```rust,ignore
425/// #[evento::snapshot]
426/// #[derive(Default)]
427/// pub struct AccountView {
428/// pub balance: i64,
429/// pub owner: String,
430/// }
431///
432/// // The macro generates the restore implementation that loads
433/// // from a snapshot table if available
434/// ```
435pub trait Snapshot: Sized {
436 /// Restores state from a snapshot if available.
437 ///
438 /// Returns `None` if no snapshot exists for the given ID.
439 fn restore<'a>(
440 _context: &'a context::RwContext,
441 _id: String,
442 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Option<Self>>> + Send + 'a>> {
443 Box::pin(async { Ok(None) })
444 }
445}
446
447/// Builder for loading aggregate state from events.
448///
449/// Created via [`Projection::load`], this builder configures how to
450/// load an aggregate's state by replaying events.
451///
452/// # Example
453///
454/// ```rust,ignore
455/// let result = projection
456/// .load::<Account>("account-123")
457/// .data(app_config) // Add shared data
458/// .aggregator::<User>("user-456") // Add related aggregate
459/// .execute(&executor)
460/// .await?;
461/// ```
462pub struct LoadBuilder<P: Snapshot + Default + 'static, E: Executor> {
463 key: String,
464 id: String,
465 aggregators: HashMap<String, String>,
466 handlers: HashMap<String, Box<dyn Handler<P, E>>>,
467 context: context::RwContext,
468 filter_events_by_name: bool,
469}
470
471impl<P: Snapshot + Default + 'static, E: Executor> LoadBuilder<P, E> {
472 /// Adds shared data to the load context.
473 ///
474 /// Data added here is accessible in handlers via the context.
475 pub fn data<D: Send + Sync + 'static>(&mut self, v: D) -> &mut Self {
476 self.context.insert(v);
477
478 self
479 }
480
481 /// Adds a related aggregate to load events from.
482 ///
483 /// Use this when the projection needs events from multiple aggregates.
484 pub fn aggregator<A: Aggregator>(&mut self, id: impl Into<String>) -> &mut Self {
485 self.aggregators
486 .insert(A::aggregator_type().to_owned(), id.into());
487
488 self
489 }
490
491 pub fn filter_events_by_name(&mut self, v: bool) -> &mut Self {
492 self.filter_events_by_name = v;
493
494 self
495 }
496
497 /// Executes the load operation, returning the rebuilt state.
498 ///
499 /// Returns `None` if no events exist for the aggregate.
500 /// Returns `Err` if there are too many events to process in one batch.
501 pub async fn execute(&self, executor: &E) -> anyhow::Result<Option<LoadResult<P>>> {
502 let context = Context {
503 context: self.context.clone(),
504 executor,
505 };
506
507 let mut cursor = executor.get_subscriber_cursor(self.key.to_owned()).await?;
508 let (mut version, mut routing_key) = match cursor {
509 Some(ref cursor) => {
510 let cursor = crate::Event::deserialize_cursor(cursor)?;
511
512 (cursor.v, cursor.r)
513 }
514 _ => (0, None),
515 };
516 let loaded = P::restore(&context, self.id.to_owned()).await?;
517 if loaded.is_none() {
518 cursor = None;
519 }
520
521 let read_aggregators = self
522 .handlers
523 .values()
524 .map(|h| match self.aggregators.get(h.aggregator_type()) {
525 Some(id) => ReadAggregator {
526 aggregator_type: h.aggregator_type().to_owned(),
527 aggregator_id: Some(id.to_owned()),
528 name: if self.filter_events_by_name {
529 Some(h.event_name().to_owned())
530 } else {
531 None
532 },
533 },
534 _ => ReadAggregator::event(h.aggregator_type(), h.event_name()),
535 })
536 .collect::<Vec<_>>();
537
538 let events = executor
539 .read(
540 Some(read_aggregators.to_vec()),
541 None,
542 Args::forward(100, cursor.clone()),
543 )
544 .await?;
545
546 if events.edges.is_empty() && loaded.is_none() {
547 return Ok(None);
548 }
549
550 let mut snapshot = loaded.unwrap_or_default();
551
552 for event in events.edges.iter() {
553 let key = format!("{}_{}", event.node.aggregator_type, event.node.name);
554 let Some(handler) = self.handlers.get(&key) else {
555 tracing::trace!("No handler found for {}/{key}", self.key);
556 continue;
557 };
558
559 handler.apply(&mut snapshot, &event.node).await?;
560 }
561
562 if events.page_info.has_next_page {
563 anyhow::bail!("Too busy");
564 }
565
566 if let Some(event) = events.edges.last() {
567 version = event.node.version;
568 routing_key = event.node.routing_key.to_owned();
569 }
570
571 Ok(Some(LoadResult {
572 item: snapshot,
573 version,
574 routing_key,
575 }))
576 }
577}
578
579/// Builder for creating event subscriptions.
580///
581/// Created via [`Projection::subscription`], this builder configures
582/// a continuous event processing subscription with retry logic,
583/// routing key filtering, and graceful shutdown support.
584///
585/// # Example
586///
587/// ```rust,ignore
588/// let subscription = projection
589/// .subscription()
590/// .routing_key("accounts")
591/// .chunk_size(100)
592/// .retry(5)
593/// .delay(Duration::from_secs(10))
594/// .start(&executor)
595/// .await?;
596///
597/// // Later, gracefully shutdown
598/// subscription.shutdown().await?;
599/// ```
600pub struct SubscriptionBuilder<P: 'static, E: Executor> {
601 key: String,
602 handlers: HashMap<String, Box<dyn Handler<P, E>>>,
603 context: context::RwContext,
604 routing_key: RoutingKey,
605 delay: Option<Duration>,
606 chunk_size: u16,
607 is_accept_failure: bool,
608 retry: Option<u8>,
609 aggregators: HashMap<String, String>,
610}
611
612impl<P, E: Executor + 'static> SubscriptionBuilder<P, E> {
613 /// Adds shared data to the load context.
614 ///
615 /// Data added here is accessible in handlers via the context.
616 pub fn data<D: Send + Sync + 'static>(self, v: D) -> Self {
617 self.context.insert(v);
618
619 self
620 }
621
622 /// Allows the subscription to continue after handler failures.
623 ///
624 /// By default, subscriptions stop on the first error. With this flag,
625 /// errors are logged but processing continues.
626 pub fn accept_failure(mut self) -> Self {
627 self.is_accept_failure = true;
628
629 self
630 }
631
632 /// Sets the number of events to process per batch.
633 ///
634 /// Default is 300.
635 pub fn chunk_size(mut self, v: u16) -> Self {
636 self.chunk_size = v;
637
638 self
639 }
640
641 /// Sets a delay before starting the subscription.
642 ///
643 /// Useful for staggering subscription starts in multi-node deployments.
644 pub fn delay(mut self, v: Duration) -> Self {
645 self.delay = Some(v);
646
647 self
648 }
649
650 /// Filters events by routing key.
651 ///
652 /// Only events with the matching routing key will be processed.
653 pub fn routing_key(mut self, v: impl Into<String>) -> Self {
654 self.routing_key = RoutingKey::Value(Some(v.into()));
655
656 self
657 }
658
659 /// Sets the maximum number of retries on failure.
660 ///
661 /// Uses exponential backoff. Default is 30.
662 pub fn retry(mut self, v: u8) -> Self {
663 self.retry = Some(v);
664
665 self
666 }
667
668 fn without_retry(mut self) -> Self {
669 self.retry = None;
670
671 self
672 }
673
674 /// Processes all events regardless of routing key.
675 pub fn all(mut self) -> Self {
676 self.routing_key = RoutingKey::All;
677
678 self
679 }
680
681 /// Adds a related aggregate to process events from.
682 pub fn aggregator<A: Aggregator>(mut self, id: impl Into<String>) -> Self {
683 self.aggregators
684 .insert(A::aggregator_type().to_owned(), id.into());
685
686 self
687 }
688
689 fn read_aggregators(&self) -> Vec<ReadAggregator> {
690 self.handlers
691 .values()
692 .map(|h| match self.aggregators.get(h.aggregator_type()) {
693 Some(id) => ReadAggregator {
694 aggregator_type: h.aggregator_type().to_owned(),
695 aggregator_id: Some(id.to_owned()),
696 name: Some(h.event_name().to_owned()),
697 },
698 _ => ReadAggregator::event(h.aggregator_type(), h.event_name()),
699 })
700 .collect()
701 }
702
703 fn key(&self) -> String {
704 if let RoutingKey::Value(Some(ref key)) = self.routing_key {
705 return format!("{key}.{}", self.key);
706 }
707
708 self.key.to_owned()
709 }
710
711 async fn process(
712 &self,
713 executor: &E,
714 id: &Ulid,
715 aggregators: &[ReadAggregator],
716 mut rx: Option<&mut tokio::sync::oneshot::Receiver<()>>,
717 ) -> anyhow::Result<()> {
718 let mut interval = interval_at(
719 Instant::now() - Duration::from_millis(400),
720 Duration::from_millis(300),
721 );
722
723 loop {
724 interval.tick().await;
725
726 if !executor.is_subscriber_running(self.key(), *id).await? {
727 return Ok(());
728 }
729
730 let cursor = executor.get_subscriber_cursor(self.key()).await?;
731
732 let timestamp = executor
733 .read(
734 Some(aggregators.to_vec()),
735 Some(self.routing_key.to_owned()),
736 Args::backward(1, None),
737 )
738 .await?
739 .edges
740 .last()
741 .map(|e| e.node.timestamp)
742 .unwrap_or_default();
743
744 let res = executor
745 .read(
746 Some(aggregators.to_vec()),
747 Some(self.routing_key.to_owned()),
748 Args::forward(self.chunk_size, cursor),
749 )
750 .await?;
751
752 if res.edges.is_empty() {
753 return Ok(());
754 }
755
756 let context = Context {
757 context: self.context.clone(),
758 executor,
759 };
760
761 for event in res.edges {
762 if let Some(rx) = rx.as_mut() {
763 if rx.try_recv().is_ok() {
764 tracing::info!(
765 key = self.key(),
766 "Subscription received shutdown signal, stopping gracefull"
767 );
768
769 return Ok(());
770 }
771 }
772
773 tracing::Span::current().record("subscription", self.key());
774 tracing::Span::current().record("aggregator_type", &event.node.aggregator_type);
775 tracing::Span::current().record("aggregator_id", &event.node.aggregator_id);
776 tracing::Span::current().record("event", &event.node.name);
777
778 let key = format!("{}_{}", event.node.aggregator_type, event.node.name);
779 let Some(handler) = self.handlers.get(&key) else {
780 panic!("No handler found for {}/{key}", self.key());
781 };
782
783 handler.handle(&context, &event.node).await?;
784
785 tracing::debug!("handle completed");
786
787 executor
788 .acknowledge(
789 self.key(),
790 event.cursor.to_owned(),
791 timestamp - event.node.timestamp,
792 )
793 .await?;
794 }
795 }
796 }
797
798 /// Starts the subscription without retry logic.
799 ///
800 /// Equivalent to calling `start()` with retries disabled.
801 pub async fn unretry_start(self, executor: &E) -> anyhow::Result<Subscription>
802 where
803 E: Clone,
804 {
805 self.without_retry().start(executor).await
806 }
807
808 /// Starts a continuous background subscription.
809 ///
810 /// Returns a [`Subscription`] handle that can be used for graceful shutdown.
811 /// The subscription runs in a spawned tokio task and polls for new events.
812 pub async fn start(self, executor: &E) -> anyhow::Result<Subscription>
813 where
814 E: Clone,
815 {
816 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
817 let executor = executor.clone();
818 let id = Ulid::new();
819 let subscription_id = id;
820
821 executor
822 .upsert_subscriber(self.key(), id.to_owned())
823 .await?;
824
825 let task_handle = tokio::spawn(async move {
826 let read_aggregators = self.read_aggregators();
827 let start = self
828 .delay
829 .map(|d| Instant::now() + d)
830 .unwrap_or_else(Instant::now);
831
832 let mut interval = interval_at(
833 start - Duration::from_millis(1200),
834 Duration::from_millis(1000),
835 );
836
837 loop {
838 if shutdown_rx.try_recv().is_ok() {
839 tracing::info!(
840 key = self.key(),
841 "Subscription received shutdown signal, stopping gracefull"
842 );
843
844 break;
845 }
846
847 interval.tick().await;
848
849 let _ = tracing::error_span!(
850 "start",
851 key = self.key(),
852 aggregator_type = tracing::field::Empty,
853 aggregator_id = tracing::field::Empty,
854 event = tracing::field::Empty,
855 )
856 .entered();
857
858 let result = match self.retry {
859 Some(retry) => {
860 (|| async { self.process(&executor, &id, &read_aggregators, None).await })
861 .retry(ExponentialBuilder::default().with_max_times(retry.into()))
862 .sleep(tokio::time::sleep)
863 .notify(|err, dur| {
864 tracing::error!(
865 error = %err,
866 duration = ?dur,
867 "Failed to process event"
868 );
869 })
870 .await
871 }
872 _ => self.process(&executor, &id, &read_aggregators, None).await,
873 };
874
875 let Err(err) = result else {
876 continue;
877 };
878
879 tracing::error!(error = %err, "Failed to process event");
880
881 if !self.is_accept_failure {
882 break;
883 }
884 }
885 });
886
887 Ok(Subscription {
888 id: subscription_id,
889 task_handle,
890 shutdown_tx,
891 })
892 }
893
894 /// Executes the subscription once without retry logic.
895 ///
896 /// Processes all pending events and returns. Does not poll for new events.
897 pub async fn unretry_execute(self, executor: &E) -> anyhow::Result<()> {
898 self.without_retry().execute(executor).await
899 }
900
901 /// Executes the subscription once, processing all pending events.
902 ///
903 /// Unlike `start()`, this does not run continuously. It processes
904 /// all currently pending events and returns.
905 pub async fn execute(&self, executor: &E) -> anyhow::Result<()> {
906 let id = Ulid::new();
907
908 executor
909 .upsert_subscriber(self.key(), id.to_owned())
910 .await?;
911
912 let read_aggregators = self.read_aggregators();
913
914 let _ = tracing::error_span!(
915 "execute",
916 key = self.key(),
917 aggregator_type = tracing::field::Empty,
918 aggregator_id = tracing::field::Empty,
919 event = tracing::field::Empty,
920 )
921 .entered();
922
923 match self.retry {
924 Some(retry) => {
925 (|| async { self.process(executor, &id, &read_aggregators, None).await })
926 .retry(ExponentialBuilder::default().with_max_times(retry.into()))
927 .sleep(tokio::time::sleep)
928 .notify(|err, dur| {
929 tracing::error!(
930 error = %err,
931 duration = ?dur,
932 "Failed to process event"
933 );
934 })
935 .await
936 }
937 _ => self.process(executor, &id, &read_aggregators, None).await,
938 }
939 }
940}
941
942/// Handle to a running event subscription.
943///
944/// Returned by [`SubscriptionBuilder::start`], this handle provides
945/// the subscription ID and a method for graceful shutdown.
946///
947/// # Example
948///
949/// ```rust,ignore
950/// let subscription = projection
951/// .subscription()
952/// .start(&executor)
953/// .await?;
954///
955/// println!("Started subscription: {}", subscription.id);
956///
957/// // On application shutdown
958/// subscription.shutdown().await?;
959/// ```
960#[derive(Debug)]
961pub struct Subscription {
962 /// Unique ID for this subscription instance
963 pub id: Ulid,
964 task_handle: tokio::task::JoinHandle<()>,
965 shutdown_tx: tokio::sync::oneshot::Sender<()>,
966}
967
968impl Subscription {
969 /// Gracefully shuts down the subscription.
970 ///
971 /// Signals the subscription to stop and waits for it to finish
972 /// processing the current event before returning.
973 pub async fn shutdown(self) -> Result<(), tokio::task::JoinError> {
974 let _ = self.shutdown_tx.send(());
975
976 self.task_handle.await
977 }
978}