evento_core/
projection.rs

1//! Projections and event subscriptions.
2//!
3//! This module provides the core building blocks for event sourcing:
4//! - Projections that build read models from events
5//! - Subscriptions that continuously process events
6//! - Loading aggregate state from event streams
7//!
8//! # Key Types
9//!
10//! - [`Projection`] - Defines handlers for building projections
11//! - [`LoadBuilder`] - Loads aggregate state from events
12//! - [`SubscriptionBuilder`] - Builds continuous event subscriptions
13//! - [`Subscription`] - Handle to a running subscription
14//! - [`EventData`] - Typed event with deserialized data and metadata
15//!
16//! # Example
17//!
18//! ```rust,ignore
19//! use evento::projection::Projection;
20//!
21//! // Define a projection with event handlers
22//! let projection = Projection::<AccountView, _>::new("accounts")
23//!     .handler(account_opened)
24//!     .handler(money_deposited);
25//!
26//! // Load aggregate state
27//! let result = projection
28//!     .load::<Account>("account-123")
29//!     .execute(&executor)
30//!     .await?;
31//!
32//! // Or start a subscription
33//! let subscription = projection
34//!     .subscription()
35//!     .routing_key("accounts")
36//!     .start(&executor)
37//!     .await?;
38//! ```
39
40use std::{
41    collections::HashMap,
42    future::Future,
43    ops::{Deref, DerefMut},
44    pin::Pin,
45    time::Duration,
46};
47use tokio::time::{interval_at, Instant};
48use ulid::Ulid;
49
50use backon::{ExponentialBuilder, Retryable};
51
52use crate::{context, cursor::Args, Executor, ReadAggregator};
53
/// Selects which events a read or subscription should see, based on
/// their routing key.
///
/// Routing keys partition the event stream so that independent consumers
/// can each work on their own slice, or so that a subscription can be
/// limited to one specific stream.
#[derive(Clone)]
pub enum RoutingKey {
    /// No filtering: every event matches, whatever its routing key is.
    All,
    /// Only events carrying exactly this routing key match.
    /// `Value(None)` matches events that have no routing key.
    Value(Option<String>),
}
65
66/// Handler context providing access to executor and shared data.
67///
68/// `Context` wraps an [`RwContext`](crate::context::RwContext) for type-safe
69/// data storage and provides access to the executor for database operations.
70///
71/// # Example
72///
73/// ```rust,ignore
74/// #[evento::handler]
75/// async fn my_handler<E: Executor>(
76///     event: Event<MyEventData>,
77///     action: Action<'_, MyView, E>,
78/// ) -> anyhow::Result<()> {
79///     if let Action::Handle(ctx) = action {
80///         // Access shared data
81///         let config: Data<AppConfig> = ctx.extract();
82///
83///         // Use executor for queries
84///         let events = ctx.executor.read(...).await?;
85///     }
86///     Ok(())
87/// }
88/// ```
89#[derive(Clone)]
90pub struct Context<'a, E: Executor> {
91    context: context::RwContext,
92    /// Reference to the executor for database operations
93    pub executor: &'a E,
94}
95
96impl<'a, E: Executor> Deref for Context<'a, E> {
97    type Target = context::RwContext;
98
99    fn deref(&self) -> &Self::Target {
100        &self.context
101    }
102}
103
/// Marks a type as an aggregate root.
///
/// In event sourcing the aggregate is the root entity whose history is
/// recorded as events. Every aggregate type exposes a unique string
/// identifier, which is used when storing and routing its events.
///
/// Implementations are normally generated by the `#[evento::aggregator]`
/// macro rather than written by hand.
///
/// # Example
///
/// ```rust,ignore
/// #[evento::aggregator("myapp/Account")]
/// #[derive(Default)]
/// pub struct Account {
///     pub balance: i64,
///     pub owner: String,
/// }
/// ```
pub trait Aggregator: Default {
    /// The unique type identifier of this aggregate, e.g. `"myapp/Account"`.
    fn aggregator_type() -> &'static str;
}
125
126/// Trait for event types.
127///
128/// Events represent state changes that have occurred. Each event type
129/// has a name and belongs to an aggregator type.
130///
131/// This trait is typically derived using the `#[evento::aggregator]` macro.
132///
133/// # Example
134///
135/// ```rust,ignore
136/// #[evento::aggregator("myapp/Account")]
137/// #[derive(bitcode::Encode, bitcode::Decode)]
138/// pub struct AccountOpened {
139///     pub owner: String,
140/// }
141/// ```
142pub trait Event: Aggregator {
143    /// Returns the event name (e.g., "AccountOpened")
144    fn event_name() -> &'static str;
145}
146
147/// Trait for event handlers.
148///
149/// Handlers process events in two modes:
150/// - `handle`: For subscriptions that perform side effects (send emails, update read models)
151/// - `apply`: For loading aggregate state by replaying events
152///
153/// This trait is typically implemented via the `#[evento::handler]` macro.
154pub trait Handler<P: 'static, E: Executor>: Sync + Send {
155    /// Handles an event during subscription processing.
156    ///
157    /// This is called when processing events in a subscription context,
158    /// where side effects like database updates or API calls are appropriate.
159    fn handle<'a>(
160        &'a self,
161        context: &'a Context<'a, E>,
162        event: &'a crate::Event,
163    ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;
164
165    /// Applies an event to build projection state.
166    ///
167    /// This is called when loading aggregate state by replaying events.
168    /// It should be a pure function that modifies the projection without side effects.
169    fn apply<'a>(
170        &'a self,
171        projection: &'a mut P,
172        event: &'a crate::Event,
173    ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;
174
175    /// Returns the aggregator type this handler processes.
176    fn aggregator_type(&self) -> &'static str;
177    /// Returns the event name this handler processes.
178    fn event_name(&self) -> &'static str;
179}
180
181/// Action passed to event handlers.
182///
183/// Determines whether the handler should apply state changes or
184/// handle the event with side effects.
185pub enum Action<'a, P: 'static, E: Executor> {
186    /// Apply event to projection state (for loading)
187    Apply(&'a mut P),
188    /// Handle event with context (for subscriptions)
189    Handle(&'a Context<'a, E>),
190}
191
192/// Typed event with deserialized data and metadata.
193///
194/// `EventData` wraps a raw [`Event`](crate::Event) and provides typed access
195/// to the deserialized event data and metadata. It implements `Deref` to
196/// provide access to the underlying event fields (id, timestamp, version, etc.).
197///
198/// # Type Parameters
199///
200/// - `D`: The event data type (e.g., `AccountOpened`)
201/// - `M`: The metadata type (defaults to `bool` for no metadata)
202///
203/// # Example
204///
205/// ```rust,ignore
206/// use evento::metadata::Event;
207///
208/// #[evento::handler]
209/// async fn handle_deposit<E: Executor>(
210///     event: Event<MoneyDeposited>,
211///     action: Action<'_, AccountView, E>,
212/// ) -> anyhow::Result<()> {
213///     // Access typed data
214///     println!("Amount: {}", event.data.amount);
215///
216///     // Access metadata
217///     if let Ok(user) = event.metadata.user() {
218///         println!("By user: {}", user);
219///     }
220///
221///     // Access underlying event fields via Deref
222///     println!("Event ID: {}", event.id);
223///     println!("Version: {}", event.version);
224///
225///     Ok(())
226/// }
227/// ```
228pub struct EventData<D, M = bool> {
229    event: crate::Event,
230    /// The typed event data
231    pub data: D,
232    /// The typed event metadata
233    pub metadata: M,
234}
235
236impl<D, M> Deref for EventData<D, M> {
237    type Target = crate::Event;
238
239    fn deref(&self) -> &Self::Target {
240        &self.event
241    }
242}
243
244impl<D, M> TryFrom<&crate::Event> for EventData<D, M>
245where
246    D: bitcode::DecodeOwned,
247    M: bitcode::DecodeOwned,
248{
249    type Error = bitcode::Error;
250
251    fn try_from(value: &crate::Event) -> Result<Self, Self::Error> {
252        let data = bitcode::decode::<D>(&value.data)?;
253        let metadata = bitcode::decode::<M>(&value.metadata)?;
254        Ok(EventData {
255            data,
256            metadata,
257            event: value.clone(),
258        })
259    }
260}
261
262/// Container for event handlers that build a projection.
263///
264/// A `Projection` groups related event handlers together and provides
265/// methods to load aggregate state or create subscriptions.
266///
267/// # Type Parameters
268///
269/// - `P`: The projection/view type being built
270/// - `E`: The executor type for database operations
271///
272/// # Example
273///
274/// ```rust,ignore
275/// let projection = Projection::<AccountView, _>::new("accounts")
276///     .handler(account_opened)
277///     .handler(money_deposited)
278///     .handler(money_withdrawn);
279///
280/// // Use for loading state
281/// let state = projection.clone()
282///     .load::<Account>("account-123")
283///     .execute(&executor)
284///     .await?;
285///
286/// // Or create a subscription
287/// let sub = projection
288///     .subscription()
289///     .start(&executor)
290///     .await?;
291/// ```
292pub struct Projection<P: 'static, E: Executor> {
293    key: String,
294    handlers: HashMap<String, Box<dyn Handler<P, E>>>,
295}
296
297impl<P: 'static, E: Executor> Projection<P, E> {
298    /// Creates a new projection with the given key.
299    ///
300    /// The key is used as the subscription identifier for cursor tracking.
301    pub fn new(key: impl Into<String>) -> Self {
302        Self {
303            key: key.into(),
304            handlers: HashMap::new(),
305        }
306    }
307
308    /// Registers an event handler with this projection.
309    ///
310    /// # Panics
311    ///
312    /// Panics if a handler for the same event type is already registered.
313    pub fn handler<H: Handler<P, E> + 'static>(mut self, h: H) -> Self {
314        let key = format!("{}_{}", h.aggregator_type(), h.event_name());
315        if self.handlers.insert(key.to_owned(), Box::new(h)).is_some() {
316            panic!("Cannot register event handler: key {} already exists", key);
317        }
318        self
319    }
320
321    /// Creates a builder for loading aggregate state.
322    ///
323    /// This consumes the projection and returns a [`LoadBuilder`] configured
324    /// to load the state for the specified aggregate.
325    ///
326    /// # Type Parameters
327    ///
328    /// - `A`: The aggregate type to load
329    pub fn load<A: Aggregator>(self, id: impl Into<String>) -> LoadBuilder<P, E>
330    where
331        P: Snapshot + Default,
332    {
333        let id = id.into();
334        let mut aggregators = HashMap::new();
335        aggregators.insert(A::aggregator_type().to_owned(), id.to_owned());
336
337        LoadBuilder {
338            key: self.key.to_owned(),
339            id,
340            aggregators,
341            handlers: self.handlers,
342            context: Default::default(),
343            filter_events_by_name: true,
344        }
345    }
346
347    /// Creates a builder for a continuous event subscription.
348    ///
349    /// This consumes the projection and returns a [`SubscriptionBuilder`]
350    /// that can be configured and started.
351    pub fn subscription(self) -> SubscriptionBuilder<P, E> {
352        SubscriptionBuilder {
353            key: self.key.to_owned(),
354            context: Default::default(),
355            handlers: self.handlers,
356            delay: None,
357            retry: Some(30),
358            chunk_size: 300,
359            is_accept_failure: false,
360            routing_key: RoutingKey::Value(None),
361            aggregators: Default::default(),
362        }
363    }
364}
365
/// The outcome of loading an aggregate's state.
///
/// Holds the rebuilt projection value together with the aggregate's
/// current version and routing key. `Deref` and `DerefMut` forward to
/// `item`, so the projection's own fields and methods can be used
/// directly on a `LoadResult`.
///
/// # Example
///
/// ```rust,ignore
/// let result: LoadResult<AccountView> = projection
///     .load::<Account>("account-123")
///     .execute(&executor)
///     .await?
///     .expect("Account not found");
///
/// // Access inner item via Deref
/// println!("Balance: {}", result.balance);
///
/// // Access metadata
/// println!("Version: {}", result.version);
/// println!("Routing key: {:?}", result.routing_key);
/// ```
#[derive(Clone, Debug, Default)]
pub struct LoadResult<A> {
    /// The loaded projection/view value.
    pub item: A,
    /// Current version of the aggregate.
    pub version: u16,
    /// Routing key of the aggregate, if one is set.
    pub routing_key: Option<String>,
}

/// Shared access to the wrapped projection value.
impl<A> Deref for LoadResult<A> {
    type Target = A;

    fn deref(&self) -> &A {
        &self.item
    }
}

/// Mutable access to the wrapped projection value.
impl<A> DerefMut for LoadResult<A> {
    fn deref_mut(&mut self) -> &mut A {
        &mut self.item
    }
}

/// Shorthand for a load result that may be absent.
pub type OptionLoadResult<A> = Option<LoadResult<A>>;
413
414/// Trait for types that can be restored from snapshots.
415///
416/// Snapshots provide a performance optimization by storing pre-computed
417/// state, avoiding the need to replay all events from the beginning.
418///
419/// This trait is typically implemented via the `#[evento::snapshot]` macro.
420///
421/// # Example
422///
423/// ```rust,ignore
424/// #[evento::snapshot]
425/// #[derive(Default)]
426/// pub struct AccountView {
427///     pub balance: i64,
428///     pub owner: String,
429/// }
430///
431/// // The macro generates the restore implementation that loads
432/// // from a snapshot table if available
433/// ```
434pub trait Snapshot: Sized {
435    /// Restores state from a snapshot if available.
436    ///
437    /// Returns `None` if no snapshot exists for the given ID.
438    fn restore<'a>(
439        context: &'a context::RwContext,
440        id: String,
441    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OptionLoadResult<Self>>> + Send + 'a>>;
442}
443
444/// Builder for loading aggregate state from events.
445///
446/// Created via [`Projection::load`], this builder configures how to
447/// load an aggregate's state by replaying events.
448///
449/// # Example
450///
451/// ```rust,ignore
452/// let result = projection
453///     .load::<Account>("account-123")
454///     .data(app_config)  // Add shared data
455///     .aggregator::<User>("user-456")  // Add related aggregate
456///     .execute(&executor)
457///     .await?;
458/// ```
459pub struct LoadBuilder<P: Snapshot + Default + 'static, E: Executor> {
460    key: String,
461    id: String,
462    aggregators: HashMap<String, String>,
463    handlers: HashMap<String, Box<dyn Handler<P, E>>>,
464    context: context::RwContext,
465    filter_events_by_name: bool,
466}
467
468impl<P: Snapshot + Default + 'static, E: Executor> LoadBuilder<P, E> {
469    /// Adds shared data to the load context.
470    ///
471    /// Data added here is accessible in handlers via the context.
472    pub fn data<D: Send + Sync + 'static>(&mut self, v: D) -> &mut Self {
473        self.context.insert(v);
474
475        self
476    }
477
478    /// Adds a related aggregate to load events from.
479    ///
480    /// Use this when the projection needs events from multiple aggregates.
481    pub fn aggregator<A: Aggregator>(&mut self, id: impl Into<String>) -> &mut Self {
482        self.aggregators
483            .insert(A::aggregator_type().to_owned(), id.into());
484
485        self
486    }
487
488    pub fn filter_events_by_name(&mut self, v: bool) -> &mut Self {
489        self.filter_events_by_name = v;
490
491        self
492    }
493
494    /// Executes the load operation, returning the rebuilt state.
495    ///
496    /// Returns `None` if no events exist for the aggregate.
497    /// Returns `Err` if there are too many events to process in one batch.
498    pub async fn execute(&self, executor: &E) -> anyhow::Result<Option<LoadResult<P>>> {
499        let context = Context {
500            context: self.context.clone(),
501            executor,
502        };
503
504        let cursor = executor.get_subscriber_cursor(self.key.to_owned()).await?;
505        let loaded = P::restore(&context, self.id.to_owned()).await?;
506        let has_loaded = loaded.is_some();
507        let mut snapshot = loaded.unwrap_or_default();
508
509        let mut read_aggregators = vec![];
510        for handler in self.handlers.values() {
511            let Some(id) = self.aggregators.get(handler.aggregator_type()) else {
512                anyhow::bail!(
513                    "Failed to load projection {}/{}: id not found",
514                    handler.aggregator_type(),
515                    handler.event_name()
516                );
517            };
518
519            read_aggregators.push(ReadAggregator {
520                aggregator_type: handler.aggregator_type().to_owned(),
521                aggregator_id: Some(id.to_owned()),
522                name: if self.filter_events_by_name {
523                    Some(handler.event_name().to_owned())
524                } else {
525                    None
526                },
527            });
528        }
529
530        let events = executor
531            .read(
532                Some(read_aggregators.to_vec()),
533                None,
534                Args::forward(100, cursor.clone()),
535            )
536            .await?;
537
538        for event in events.edges.iter() {
539            let key = format!("{}_{}", event.node.aggregator_type, event.node.name);
540            let Some(handler) = self.handlers.get(&key) else {
541                tracing::debug!("No handler found for {}/{key}", self.key);
542                continue;
543            };
544
545            handler.apply(&mut snapshot, &event.node).await?;
546        }
547
548        if events.page_info.has_next_page {
549            anyhow::bail!("Too busy");
550        }
551
552        if let Some(event) = events.edges.last() {
553            snapshot.version = event.node.version;
554            snapshot.routing_key = event.node.routing_key.to_owned();
555
556            return Ok(Some(snapshot));
557        }
558
559        if !has_loaded {
560            return Ok(None);
561        }
562
563        let events = executor
564            .read(
565                Some(read_aggregators.to_vec()),
566                None,
567                Args::backward(1, None),
568            )
569            .await?;
570
571        if let Some(event) = events.edges.first() {
572            snapshot.version = event.node.version;
573            snapshot.routing_key = event.node.routing_key.to_owned();
574
575            return Ok(Some(snapshot));
576        }
577
578        Ok(None)
579    }
580}
581
582/// Builder for creating event subscriptions.
583///
584/// Created via [`Projection::subscription`], this builder configures
585/// a continuous event processing subscription with retry logic,
586/// routing key filtering, and graceful shutdown support.
587///
588/// # Example
589///
590/// ```rust,ignore
591/// let subscription = projection
592///     .subscription()
593///     .routing_key("accounts")
594///     .chunk_size(100)
595///     .retry(5)
596///     .delay(Duration::from_secs(10))
597///     .start(&executor)
598///     .await?;
599///
600/// // Later, gracefully shutdown
601/// subscription.shutdown().await?;
602/// ```
603pub struct SubscriptionBuilder<P: 'static, E: Executor> {
604    key: String,
605    handlers: HashMap<String, Box<dyn Handler<P, E>>>,
606    context: context::RwContext,
607    routing_key: RoutingKey,
608    delay: Option<Duration>,
609    chunk_size: u16,
610    is_accept_failure: bool,
611    retry: Option<u8>,
612    aggregators: HashMap<String, String>,
613}
614
615impl<P, E: Executor + 'static> SubscriptionBuilder<P, E> {
616    /// Adds shared data to the load context.
617    ///
618    /// Data added here is accessible in handlers via the context.
619    pub fn data<D: Send + Sync + 'static>(self, v: D) -> Self {
620        self.context.insert(v);
621
622        self
623    }
624
625    /// Allows the subscription to continue after handler failures.
626    ///
627    /// By default, subscriptions stop on the first error. With this flag,
628    /// errors are logged but processing continues.
629    pub fn accept_failure(mut self) -> Self {
630        self.is_accept_failure = true;
631
632        self
633    }
634
635    /// Sets the number of events to process per batch.
636    ///
637    /// Default is 300.
638    pub fn chunk_size(mut self, v: u16) -> Self {
639        self.chunk_size = v;
640
641        self
642    }
643
644    /// Sets a delay before starting the subscription.
645    ///
646    /// Useful for staggering subscription starts in multi-node deployments.
647    pub fn delay(mut self, v: Duration) -> Self {
648        self.delay = Some(v);
649
650        self
651    }
652
653    /// Filters events by routing key.
654    ///
655    /// Only events with the matching routing key will be processed.
656    pub fn routing_key(mut self, v: impl Into<String>) -> Self {
657        self.routing_key = RoutingKey::Value(Some(v.into()));
658
659        self
660    }
661
662    /// Sets the maximum number of retries on failure.
663    ///
664    /// Uses exponential backoff. Default is 30.
665    pub fn retry(mut self, v: u8) -> Self {
666        self.retry = Some(v);
667
668        self
669    }
670
671    fn without_retry(mut self) -> Self {
672        self.retry = None;
673
674        self
675    }
676
677    /// Processes all events regardless of routing key.
678    pub fn all(mut self) -> Self {
679        self.routing_key = RoutingKey::All;
680
681        self
682    }
683
684    /// Adds a related aggregate to process events from.
685    pub fn aggregator<A: Aggregator>(mut self, id: impl Into<String>) -> Self {
686        self.aggregators
687            .insert(A::aggregator_type().to_owned(), id.into());
688
689        self
690    }
691
692    fn read_aggregators(&self) -> Vec<ReadAggregator> {
693        self.handlers
694            .values()
695            .map(|h| match self.aggregators.get(h.aggregator_type()) {
696                Some(id) => ReadAggregator {
697                    aggregator_type: h.aggregator_type().to_owned(),
698                    aggregator_id: Some(id.to_owned()),
699                    name: Some(h.event_name().to_owned()),
700                },
701                _ => ReadAggregator::event(h.aggregator_type(), h.event_name()),
702            })
703            .collect()
704    }
705
706    fn key(&self) -> String {
707        if let RoutingKey::Value(Some(ref key)) = self.routing_key {
708            return format!("{key}.{}", self.key);
709        }
710
711        self.key.to_owned()
712    }
713
714    async fn process(
715        &self,
716        executor: &E,
717        id: &Ulid,
718        aggregators: &[ReadAggregator],
719        mut rx: Option<&mut tokio::sync::oneshot::Receiver<()>>,
720    ) -> anyhow::Result<()> {
721        let mut interval = interval_at(
722            Instant::now() - Duration::from_millis(400),
723            Duration::from_millis(300),
724        );
725
726        loop {
727            interval.tick().await;
728
729            if !executor.is_subscriber_running(self.key(), *id).await? {
730                return Ok(());
731            }
732
733            let cursor = executor.get_subscriber_cursor(self.key()).await?;
734
735            let timestamp = executor
736                .read(
737                    Some(aggregators.to_vec()),
738                    Some(self.routing_key.to_owned()),
739                    Args::backward(1, None),
740                )
741                .await?
742                .edges
743                .last()
744                .map(|e| e.node.timestamp)
745                .unwrap_or_default();
746
747            let res = executor
748                .read(
749                    Some(aggregators.to_vec()),
750                    Some(self.routing_key.to_owned()),
751                    Args::forward(self.chunk_size, cursor),
752                )
753                .await?;
754
755            if res.edges.is_empty() {
756                return Ok(());
757            }
758
759            let context = Context {
760                context: self.context.clone(),
761                executor,
762            };
763
764            for event in res.edges {
765                if let Some(rx) = rx.as_mut() {
766                    if rx.try_recv().is_ok() {
767                        tracing::info!(
768                            key = self.key(),
769                            "Subscription received shutdown signal, stopping gracefull"
770                        );
771
772                        return Ok(());
773                    }
774                }
775
776                tracing::Span::current().record("aggregator_type", &event.node.aggregator_type);
777                tracing::Span::current().record("aggregator_id", &event.node.aggregator_id);
778                tracing::Span::current().record("event", &event.node.name);
779
780                let key = format!("{}_{}", event.node.aggregator_type, event.node.name);
781                let Some(handler) = self.handlers.get(&key) else {
782                    panic!("No handler found for {}/{key}", self.key());
783                };
784
785                handler.handle(&context, &event.node).await?;
786
787                executor
788                    .acknowledge(
789                        self.key(),
790                        event.cursor.to_owned(),
791                        timestamp - event.node.timestamp,
792                    )
793                    .await?;
794            }
795        }
796    }
797
798    /// Starts the subscription without retry logic.
799    ///
800    /// Equivalent to calling `start()` with retries disabled.
801    pub async fn unretry_start(self, executor: &E) -> anyhow::Result<Subscription>
802    where
803        E: Clone,
804    {
805        self.without_retry().start(executor).await
806    }
807
808    /// Starts a continuous background subscription.
809    ///
810    /// Returns a [`Subscription`] handle that can be used for graceful shutdown.
811    /// The subscription runs in a spawned tokio task and polls for new events.
812    pub async fn start(self, executor: &E) -> anyhow::Result<Subscription>
813    where
814        E: Clone,
815    {
816        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
817        let executor = executor.clone();
818        let id = Ulid::new();
819        let subscription_id = id;
820
821        executor
822            .upsert_subscriber(self.key(), id.to_owned())
823            .await?;
824
825        let task_handle = tokio::spawn(async move {
826            let read_aggregators = self.read_aggregators();
827            let start = self
828                .delay
829                .map(|d| Instant::now() + d)
830                .unwrap_or_else(Instant::now);
831
832            let mut interval = interval_at(
833                start - Duration::from_millis(1200),
834                Duration::from_millis(1000),
835            );
836
837            loop {
838                if shutdown_rx.try_recv().is_ok() {
839                    tracing::info!(
840                        key = self.key(),
841                        "Subscription received shutdown signal, stopping gracefull"
842                    );
843
844                    break;
845                }
846
847                interval.tick().await;
848
849                let _ = tracing::error_span!(
850                    "start",
851                    key = self.key(),
852                    aggregator_type = tracing::field::Empty,
853                    aggregator_id = tracing::field::Empty,
854                    event = tracing::field::Empty,
855                )
856                .entered();
857
858                let result = match self.retry {
859                    Some(retry) => {
860                        (|| async { self.process(&executor, &id, &read_aggregators, None).await })
861                            .retry(ExponentialBuilder::default().with_max_times(retry.into()))
862                            .sleep(tokio::time::sleep)
863                            .notify(|err, dur| {
864                                tracing::error!(
865                                    error = %err,
866                                    duration = ?dur,
867                                    "Failed to process event"
868                                );
869                            })
870                            .await
871                    }
872                    _ => self.process(&executor, &id, &read_aggregators, None).await,
873                };
874
875                let Err(err) = result else {
876                    continue;
877                };
878
879                tracing::error!(error = %err, "Failed to process event");
880
881                if !self.is_accept_failure {
882                    break;
883                }
884            }
885        });
886
887        Ok(Subscription {
888            id: subscription_id,
889            task_handle,
890            shutdown_tx,
891        })
892    }
893
894    /// Executes the subscription once without retry logic.
895    ///
896    /// Processes all pending events and returns. Does not poll for new events.
897    pub async fn unretry_execute(self, executor: &E) -> anyhow::Result<()> {
898        self.without_retry().execute(executor).await
899    }
900
901    /// Executes the subscription once, processing all pending events.
902    ///
903    /// Unlike `start()`, this does not run continuously. It processes
904    /// all currently pending events and returns.
905    pub async fn execute(&self, executor: &E) -> anyhow::Result<()> {
906        let id = Ulid::new();
907
908        executor
909            .upsert_subscriber(self.key(), id.to_owned())
910            .await?;
911
912        let read_aggregators = self.read_aggregators();
913
914        let _ = tracing::error_span!(
915            "execute",
916            key = self.key(),
917            aggregator_type = tracing::field::Empty,
918            aggregator_id = tracing::field::Empty,
919            event = tracing::field::Empty,
920        )
921        .entered();
922
923        match self.retry {
924            Some(retry) => {
925                (|| async { self.process(executor, &id, &read_aggregators, None).await })
926                    .retry(ExponentialBuilder::default().with_max_times(retry.into()))
927                    .sleep(tokio::time::sleep)
928                    .notify(|err, dur| {
929                        tracing::error!(
930                            error = %err,
931                            duration = ?dur,
932                            "Failed to process event"
933                        );
934                    })
935                    .await
936            }
937            _ => self.process(executor, &id, &read_aggregators, None).await,
938        }
939    }
940}
941
942/// Handle to a running event subscription.
943///
944/// Returned by [`SubscriptionBuilder::start`], this handle provides
945/// the subscription ID and a method for graceful shutdown.
946///
947/// # Example
948///
949/// ```rust,ignore
950/// let subscription = projection
951///     .subscription()
952///     .start(&executor)
953///     .await?;
954///
955/// println!("Started subscription: {}", subscription.id);
956///
957/// // On application shutdown
958/// subscription.shutdown().await?;
959/// ```
960#[derive(Debug)]
961pub struct Subscription {
962    /// Unique ID for this subscription instance
963    pub id: Ulid,
964    task_handle: tokio::task::JoinHandle<()>,
965    shutdown_tx: tokio::sync::oneshot::Sender<()>,
966}
967
968impl Subscription {
969    /// Gracefully shuts down the subscription.
970    ///
971    /// Signals the subscription to stop and waits for it to finish
972    /// processing the current event before returning.
973    pub async fn shutdown(self) -> Result<(), tokio::task::JoinError> {
974        let _ = self.shutdown_tx.send(());
975
976        self.task_handle.await
977    }
978}