evento_core/
projection.rs

1//! Projections and event subscriptions.
2//!
3//! This module provides the core building blocks for event sourcing:
4//! - Projections that build read models from events
5//! - Subscriptions that continuously process events
6//! - Loading aggregate state from event streams
7//!
8//! # Key Types
9//!
10//! - [`Projection`] - Defines handlers for building projections
11//! - [`LoadBuilder`] - Loads aggregate state from events
12//! - [`SubscriptionBuilder`] - Builds continuous event subscriptions
13//! - [`Subscription`] - Handle to a running subscription
14//! - [`EventData`] - Typed event with deserialized data and metadata
15//!
16//! # Example
17//!
18//! ```rust,ignore
19//! use evento::projection::Projection;
20//!
21//! // Define a projection with event handlers
22//! let projection = Projection::<AccountView, _>::new("accounts")
23//!     .handler(account_opened)
24//!     .handler(money_deposited);
25//!
26//! // Load aggregate state
27//! let result = projection
28//!     .load::<Account>("account-123")
29//!     .execute(&executor)
30//!     .await?;
31//!
32//! // Or start a subscription
33//! let subscription = projection
34//!     .subscription()
35//!     .routing_key("accounts")
36//!     .start(&executor)
37//!     .await?;
38//! ```
39
40use std::{
41    collections::HashMap,
42    future::Future,
43    ops::{Deref, DerefMut},
44    pin::Pin,
45    time::Duration,
46};
47use tokio::time::{interval_at, Instant};
48use ulid::Ulid;
49
50use backon::{ExponentialBuilder, Retryable};
51
52use crate::{context, cursor::Args, Executor, ReadAggregator};
53
/// Selects which events a read or subscription sees, based on routing key.
///
/// Routing keys make it possible to partition events for parallel
/// processing, or to restrict a subscription to one specific event stream.
#[derive(Clone)]
pub enum RoutingKey {
    /// No filtering: every event matches, whatever its routing key.
    All,
    /// Only events carrying exactly this routing key match; `None` selects
    /// the events that have no routing key.
    Value(Option<String>),
}
65
66/// Handler context providing access to executor and shared data.
67///
68/// `Context` wraps an [`RwContext`](crate::context::RwContext) for type-safe
69/// data storage and provides access to the executor for database operations.
70///
71/// # Example
72///
73/// ```rust,ignore
74/// #[evento::handler]
75/// async fn my_handler<E: Executor>(
76///     event: Event<MyEventData>,
77///     action: Action<'_, MyView, E>,
78/// ) -> anyhow::Result<()> {
79///     if let Action::Handle(ctx) = action {
80///         // Access shared data
81///         let config: Data<AppConfig> = ctx.extract();
82///
83///         // Use executor for queries
84///         let events = ctx.executor.read(...).await?;
85///     }
86///     Ok(())
87/// }
88/// ```
89#[derive(Clone)]
90pub struct Context<'a, E: Executor> {
91    context: context::RwContext,
92    /// Reference to the executor for database operations
93    pub executor: &'a E,
94}
95
96impl<'a, E: Executor> Deref for Context<'a, E> {
97    type Target = context::RwContext;
98
99    fn deref(&self) -> &Self::Target {
100        &self.context
101    }
102}
103
/// Marks a type as an aggregate.
///
/// Aggregates are the root entities in event sourcing. Every aggregate type
/// exposes a unique identifier string that is used for event storage and
/// routing. Implementors must also be [`Default`].
///
/// This trait is typically derived using the `#[evento::aggregator]` macro.
///
/// # Example
///
/// ```rust,ignore
/// #[evento::aggregator("myapp/Account")]
/// #[derive(Default)]
/// pub struct Account {
///     pub balance: i64,
///     pub owner: String,
/// }
/// ```
pub trait Aggregator: Default {
    /// Unique type identifier of this aggregate, e.g. `"myapp/Account"`.
    fn aggregator_type() -> &'static str;
}
125
126/// Trait for event types.
127///
128/// Events represent state changes that have occurred. Each event type
129/// has a name and belongs to an aggregator type.
130///
131/// This trait is typically derived using the `#[evento::aggregator]` macro.
132///
133/// # Example
134///
135/// ```rust,ignore
136/// #[evento::aggregator("myapp/Account")]
137/// #[derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
138/// pub struct AccountOpened {
139///     pub owner: String,
140/// }
141/// ```
142pub trait Event: Aggregator {
143    /// Returns the event name (e.g., "AccountOpened")
144    fn event_name() -> &'static str;
145}
146
147/// Trait for event handlers.
148///
149/// Handlers process events in two modes:
150/// - `handle`: For subscriptions that perform side effects (send emails, update read models)
151/// - `apply`: For loading aggregate state by replaying events
152///
153/// This trait is typically implemented via the `#[evento::handler]` macro.
154pub trait Handler<P: 'static, E: Executor>: Sync + Send {
155    /// Handles an event during subscription processing.
156    ///
157    /// This is called when processing events in a subscription context,
158    /// where side effects like database updates or API calls are appropriate.
159    fn handle<'a>(
160        &'a self,
161        context: &'a Context<'a, E>,
162        event: &'a crate::Event,
163    ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;
164
165    /// Applies an event to build projection state.
166    ///
167    /// This is called when loading aggregate state by replaying events.
168    /// It should be a pure function that modifies the projection without side effects.
169    fn apply<'a>(
170        &'a self,
171        projection: &'a mut P,
172        event: &'a crate::Event,
173    ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;
174
175    /// Returns the aggregator type this handler processes.
176    fn aggregator_type(&self) -> &'static str;
177    /// Returns the event name this handler processes.
178    fn event_name(&self) -> &'static str;
179}
180
181/// Action passed to event handlers.
182///
183/// Determines whether the handler should apply state changes or
184/// handle the event with side effects.
185pub enum Action<'a, P: 'static, E: Executor> {
186    /// Apply event to projection state (for loading)
187    Apply(&'a mut P),
188    /// Handle event with context (for subscriptions)
189    Handle(&'a Context<'a, E>),
190}
191
192/// Typed event with deserialized data and metadata.
193///
194/// `EventData` wraps a raw [`Event`](crate::Event) and provides typed access
195/// to the deserialized event data and metadata. It implements `Deref` to
196/// provide access to the underlying event fields (id, timestamp, version, etc.).
197///
198/// # Type Parameters
199///
200/// - `D`: The event data type (e.g., `AccountOpened`)
201/// - `M`: The metadata type (defaults to `bool` for no metadata)
202///
203/// # Example
204///
205/// ```rust,ignore
206/// use evento::metadata::Event;
207///
208/// #[evento::handler]
209/// async fn handle_deposit<E: Executor>(
210///     event: Event<MoneyDeposited>,
211///     action: Action<'_, AccountView, E>,
212/// ) -> anyhow::Result<()> {
213///     // Access typed data
214///     println!("Amount: {}", event.data.amount);
215///
216///     // Access metadata
217///     if let Ok(user) = event.metadata.user() {
218///         println!("By user: {}", user);
219///     }
220///
221///     // Access underlying event fields via Deref
222///     println!("Event ID: {}", event.id);
223///     println!("Version: {}", event.version);
224///
225///     Ok(())
226/// }
227/// ```
228pub struct EventData<D, M = bool> {
229    event: crate::Event,
230    /// The typed event data
231    pub data: D,
232    /// The typed event metadata
233    pub metadata: M,
234}
235
236impl<D, M> Deref for EventData<D, M> {
237    type Target = crate::Event;
238
239    fn deref(&self) -> &Self::Target {
240        &self.event
241    }
242}
243
244impl<D, M> TryFrom<&crate::Event> for EventData<D, M>
245where
246    D: rkyv::Archive,
247    M: rkyv::Archive,
248    <D as rkyv::Archive>::Archived: for<'a> rkyv::bytecheck::CheckBytes<
249            rkyv::rancor::Strategy<
250                rkyv::validation::Validator<
251                    rkyv::validation::archive::ArchiveValidator<'a>,
252                    rkyv::validation::shared::SharedValidator,
253                >,
254                rkyv::rancor::Error,
255            >,
256        > + rkyv::Deserialize<D, rkyv::rancor::Strategy<rkyv::de::Pool, rkyv::rancor::Error>>,
257    <M as rkyv::Archive>::Archived: for<'a> rkyv::bytecheck::CheckBytes<
258            rkyv::rancor::Strategy<
259                rkyv::validation::Validator<
260                    rkyv::validation::archive::ArchiveValidator<'a>,
261                    rkyv::validation::shared::SharedValidator,
262                >,
263                rkyv::rancor::Error,
264            >,
265        > + rkyv::Deserialize<M, rkyv::rancor::Strategy<rkyv::de::Pool, rkyv::rancor::Error>>,
266{
267    type Error = rkyv::rancor::Error;
268
269    fn try_from(value: &crate::Event) -> Result<Self, Self::Error> {
270        let data = rkyv::from_bytes::<D, rkyv::rancor::Error>(&value.data)?;
271        let metadata = rkyv::from_bytes::<M, rkyv::rancor::Error>(&value.metadata)?;
272        Ok(EventData {
273            data,
274            metadata,
275            event: value.clone(),
276        })
277    }
278}
279
280/// Container for event handlers that build a projection.
281///
282/// A `Projection` groups related event handlers together and provides
283/// methods to load aggregate state or create subscriptions.
284///
285/// # Type Parameters
286///
287/// - `P`: The projection/view type being built
288/// - `E`: The executor type for database operations
289///
290/// # Example
291///
292/// ```rust,ignore
293/// let projection = Projection::<AccountView, _>::new("accounts")
294///     .handler(account_opened)
295///     .handler(money_deposited)
296///     .handler(money_withdrawn);
297///
298/// // Use for loading state
299/// let state = projection.clone()
300///     .load::<Account>("account-123")
301///     .execute(&executor)
302///     .await?;
303///
304/// // Or create a subscription
305/// let sub = projection
306///     .subscription()
307///     .start(&executor)
308///     .await?;
309/// ```
310pub struct Projection<P: 'static, E: Executor> {
311    key: String,
312    handlers: HashMap<String, Box<dyn Handler<P, E>>>,
313}
314
315impl<P: 'static, E: Executor> Projection<P, E> {
316    /// Creates a new projection with the given key.
317    ///
318    /// The key is used as the subscription identifier for cursor tracking.
319    pub fn new(key: impl Into<String>) -> Self {
320        Self {
321            key: key.into(),
322            handlers: HashMap::new(),
323        }
324    }
325
326    /// Registers an event handler with this projection.
327    ///
328    /// # Panics
329    ///
330    /// Panics if a handler for the same event type is already registered.
331    pub fn handler<H: Handler<P, E> + 'static>(mut self, h: H) -> Self {
332        let key = format!("{}_{}", h.aggregator_type(), h.event_name());
333        if self.handlers.insert(key.to_owned(), Box::new(h)).is_some() {
334            panic!("Cannot register event handler: key {} already exists", key);
335        }
336        self
337    }
338
339    /// Creates a builder for loading aggregate state.
340    ///
341    /// This consumes the projection and returns a [`LoadBuilder`] configured
342    /// to load the state for the specified aggregate.
343    ///
344    /// # Type Parameters
345    ///
346    /// - `A`: The aggregate type to load
347    pub fn load<A: Aggregator>(self, id: impl Into<String>) -> LoadBuilder<P, E>
348    where
349        P: Snapshot + Default,
350    {
351        let id = id.into();
352        let mut aggregators = HashMap::new();
353        aggregators.insert(A::aggregator_type().to_owned(), id.to_owned());
354
355        LoadBuilder {
356            key: self.key.to_owned(),
357            id,
358            aggregators,
359            handlers: self.handlers,
360            context: Default::default(),
361        }
362    }
363
364    /// Creates a builder for a continuous event subscription.
365    ///
366    /// This consumes the projection and returns a [`SubscriptionBuilder`]
367    /// that can be configured and started.
368    pub fn subscription(self) -> SubscriptionBuilder<P, E> {
369        SubscriptionBuilder {
370            key: self.key.to_owned(),
371            context: Default::default(),
372            handlers: self.handlers,
373            delay: None,
374            retry: Some(30),
375            chunk_size: 300,
376            is_accept_failure: false,
377            routing_key: RoutingKey::Value(None),
378            aggregators: Default::default(),
379        }
380    }
381}
382
/// Outcome of loading state for an aggregate.
///
/// Bundles the rebuilt projection state with the version and routing key it
/// was loaded at. `Deref` and `DerefMut` forward to the inner item, so its
/// fields and methods can be used directly on the result.
///
/// # Example
///
/// ```rust,ignore
/// let result: LoadResult<AccountView> = projection
///     .load::<Account>("account-123")
///     .execute(&executor)
///     .await?
///     .expect("Account not found");
///
/// // Inner item, reached through Deref
/// println!("Balance: {}", result.balance);
///
/// // Load metadata
/// println!("Version: {}", result.version);
/// println!("Routing key: {:?}", result.routing_key);
/// ```
#[derive(Debug, Clone, Default)]
pub struct LoadResult<A> {
    /// The loaded projection/view state
    pub item: A,
    /// Current version of the aggregate
    pub version: u16,
    /// Routing key for the aggregate (if set)
    pub routing_key: Option<String>,
}
414
415impl<A> Deref for LoadResult<A> {
416    type Target = A;
417    fn deref(&self) -> &Self::Target {
418        &self.item
419    }
420}
421
422impl<A> DerefMut for LoadResult<A> {
423    fn deref_mut(&mut self) -> &mut Self::Target {
424        &mut self.item
425    }
426}
427
428/// Type alias for an optional load result.
429pub type OptionLoadResult<A> = Option<LoadResult<A>>;
430
431/// Trait for types that can be restored from snapshots.
432///
433/// Snapshots provide a performance optimization by storing pre-computed
434/// state, avoiding the need to replay all events from the beginning.
435///
436/// This trait is typically implemented via the `#[evento::snapshot]` macro.
437///
438/// # Example
439///
440/// ```rust,ignore
441/// #[evento::snapshot]
442/// #[derive(Default)]
443/// pub struct AccountView {
444///     pub balance: i64,
445///     pub owner: String,
446/// }
447///
448/// // The macro generates the restore implementation that loads
449/// // from a snapshot table if available
450/// ```
451pub trait Snapshot: Sized {
452    /// Restores state from a snapshot if available.
453    ///
454    /// Returns `None` if no snapshot exists for the given ID.
455    fn restore<'a>(
456        context: &'a context::RwContext,
457        id: String,
458    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OptionLoadResult<Self>>> + Send + 'a>>;
459}
460
461/// Builder for loading aggregate state from events.
462///
463/// Created via [`Projection::load`], this builder configures how to
464/// load an aggregate's state by replaying events.
465///
466/// # Example
467///
468/// ```rust,ignore
469/// let result = projection
470///     .load::<Account>("account-123")
471///     .data(app_config)  // Add shared data
472///     .aggregator::<User>("user-456")  // Add related aggregate
473///     .execute(&executor)
474///     .await?;
475/// ```
476pub struct LoadBuilder<P: Snapshot + Default + 'static, E: Executor> {
477    key: String,
478    id: String,
479    aggregators: HashMap<String, String>,
480    handlers: HashMap<String, Box<dyn Handler<P, E>>>,
481    context: context::RwContext,
482}
483
484impl<P: Snapshot + Default + 'static, E: Executor> LoadBuilder<P, E> {
485    /// Adds shared data to the load context.
486    ///
487    /// Data added here is accessible in handlers via the context.
488    pub fn data<D: Send + Sync + 'static>(&mut self, v: D) -> &mut Self {
489        self.context.insert(v);
490
491        self
492    }
493
494    /// Adds a related aggregate to load events from.
495    ///
496    /// Use this when the projection needs events from multiple aggregates.
497    pub fn aggregator<A: Aggregator>(&mut self, id: impl Into<String>) -> &mut Self {
498        self.aggregators
499            .insert(A::aggregator_type().to_owned(), id.into());
500
501        self
502    }
503
504    /// Executes the load operation, returning the rebuilt state.
505    ///
506    /// Returns `None` if no events exist for the aggregate.
507    /// Returns `Err` if there are too many events to process in one batch.
508    pub async fn execute(&self, executor: &E) -> anyhow::Result<Option<LoadResult<P>>> {
509        let context = Context {
510            context: self.context.clone(),
511            executor,
512        };
513
514        let cursor = executor.get_subscriber_cursor(self.key.to_owned()).await?;
515        let loaded = P::restore(&context, self.id.to_owned()).await?;
516        let has_loaded = loaded.is_some();
517        let mut snapshot = loaded.unwrap_or_default();
518
519        let read_aggregators = self
520            .handlers
521            .values()
522            .map(|h| match self.aggregators.get(h.aggregator_type()) {
523                Some(id) => ReadAggregator {
524                    aggregator_type: h.aggregator_type().to_owned(),
525                    aggregator_id: Some(id.to_owned()),
526                    name: None,
527                },
528                _ => ReadAggregator::event(h.aggregator_type(), h.event_name()),
529            })
530            .collect::<Vec<_>>();
531
532        let events = executor
533            .read(
534                Some(read_aggregators.to_vec()),
535                None,
536                Args::forward(100, cursor.clone()),
537            )
538            .await?;
539
540        for event in events.edges.iter() {
541            let key = format!("{}_{}", event.node.aggregator_type, event.node.name);
542            let Some(handler) = self.handlers.get(&key) else {
543                tracing::debug!("No handler found for {}/{key}", self.key);
544                continue;
545            };
546
547            handler.apply(&mut snapshot, &event.node).await?;
548        }
549
550        if events.page_info.has_next_page {
551            anyhow::bail!("Too busy");
552        }
553
554        if let Some(event) = events.edges.last() {
555            snapshot.version = event.node.version;
556            snapshot.routing_key = event.node.routing_key.to_owned();
557
558            return Ok(Some(snapshot));
559        }
560
561        if !has_loaded {
562            return Ok(None);
563        }
564
565        let events = executor
566            .read(
567                Some(read_aggregators.to_vec()),
568                None,
569                Args::backward(1, None),
570            )
571            .await?;
572
573        if let Some(event) = events.edges.first() {
574            snapshot.version = event.node.version;
575            snapshot.routing_key = event.node.routing_key.to_owned();
576
577            return Ok(Some(snapshot));
578        }
579
580        Ok(None)
581    }
582}
583
584/// Builder for creating event subscriptions.
585///
586/// Created via [`Projection::subscription`], this builder configures
587/// a continuous event processing subscription with retry logic,
588/// routing key filtering, and graceful shutdown support.
589///
590/// # Example
591///
592/// ```rust,ignore
593/// let subscription = projection
594///     .subscription()
595///     .routing_key("accounts")
596///     .chunk_size(100)
597///     .retry(5)
598///     .delay(Duration::from_secs(10))
599///     .start(&executor)
600///     .await?;
601///
602/// // Later, gracefully shutdown
603/// subscription.shutdown().await?;
604/// ```
605pub struct SubscriptionBuilder<P: 'static, E: Executor> {
606    key: String,
607    handlers: HashMap<String, Box<dyn Handler<P, E>>>,
608    context: context::RwContext,
609    routing_key: RoutingKey,
610    delay: Option<Duration>,
611    chunk_size: u16,
612    is_accept_failure: bool,
613    retry: Option<u8>,
614    aggregators: HashMap<String, String>,
615}
616
617impl<P, E: Executor + 'static> SubscriptionBuilder<P, E> {
618    /// Allows the subscription to continue after handler failures.
619    ///
620    /// By default, subscriptions stop on the first error. With this flag,
621    /// errors are logged but processing continues.
622    pub fn accept_failure(mut self) -> Self {
623        self.is_accept_failure = true;
624
625        self
626    }
627
628    /// Sets the number of events to process per batch.
629    ///
630    /// Default is 300.
631    pub fn chunk_size(mut self, v: u16) -> Self {
632        self.chunk_size = v;
633
634        self
635    }
636
637    /// Sets a delay before starting the subscription.
638    ///
639    /// Useful for staggering subscription starts in multi-node deployments.
640    pub fn delay(mut self, v: Duration) -> Self {
641        self.delay = Some(v);
642
643        self
644    }
645
646    /// Filters events by routing key.
647    ///
648    /// Only events with the matching routing key will be processed.
649    pub fn routing_key(mut self, v: impl Into<String>) -> Self {
650        self.routing_key = RoutingKey::Value(Some(v.into()));
651
652        self
653    }
654
655    /// Sets the maximum number of retries on failure.
656    ///
657    /// Uses exponential backoff. Default is 30.
658    pub fn retry(mut self, v: u8) -> Self {
659        self.retry = Some(v);
660
661        self
662    }
663
664    fn without_retry(mut self) -> Self {
665        self.retry = None;
666
667        self
668    }
669
670    /// Processes all events regardless of routing key.
671    pub fn all(mut self) -> Self {
672        self.routing_key = RoutingKey::All;
673
674        self
675    }
676
677    /// Adds a related aggregate to process events from.
678    pub fn aggregator<A: Aggregator>(mut self, id: impl Into<String>) -> Self {
679        self.aggregators
680            .insert(A::aggregator_type().to_owned(), id.into());
681
682        self
683    }
684
685    fn read_aggregators(&self) -> Vec<ReadAggregator> {
686        self.handlers
687            .values()
688            .map(|h| match self.aggregators.get(h.aggregator_type()) {
689                Some(id) => ReadAggregator {
690                    aggregator_type: h.aggregator_type().to_owned(),
691                    aggregator_id: Some(id.to_owned()),
692                    name: Some(h.event_name().to_owned()),
693                },
694                _ => ReadAggregator::event(h.aggregator_type(), h.event_name()),
695            })
696            .collect()
697    }
698
699    fn key(&self) -> String {
700        if let RoutingKey::Value(Some(ref key)) = self.routing_key {
701            return format!("{key}.{}", self.key);
702        }
703
704        self.key.to_owned()
705    }
706
707    async fn process(
708        &self,
709        executor: &E,
710        id: &Ulid,
711        aggregators: &[ReadAggregator],
712        mut rx: Option<&mut tokio::sync::oneshot::Receiver<()>>,
713    ) -> anyhow::Result<()> {
714        let mut interval = interval_at(
715            Instant::now() - Duration::from_millis(400),
716            Duration::from_millis(300),
717        );
718
719        loop {
720            interval.tick().await;
721
722            if !executor.is_subscriber_running(self.key(), *id).await? {
723                return Ok(());
724            }
725
726            let cursor = executor.get_subscriber_cursor(self.key()).await?;
727
728            let timestamp = executor
729                .read(
730                    Some(aggregators.to_vec()),
731                    Some(self.routing_key.to_owned()),
732                    Args::backward(1, None),
733                )
734                .await?
735                .edges
736                .last()
737                .map(|e| e.node.timestamp)
738                .unwrap_or_default();
739
740            let res = executor
741                .read(
742                    Some(aggregators.to_vec()),
743                    Some(self.routing_key.to_owned()),
744                    Args::forward(self.chunk_size, cursor),
745                )
746                .await?;
747
748            if res.edges.is_empty() {
749                return Ok(());
750            }
751
752            let context = Context {
753                context: self.context.clone(),
754                executor,
755            };
756
757            for event in res.edges {
758                if let Some(rx) = rx.as_mut() {
759                    if rx.try_recv().is_ok() {
760                        tracing::info!(
761                            key = self.key(),
762                            "Subscription received shutdown signal, stopping gracefull"
763                        );
764
765                        return Ok(());
766                    }
767                }
768
769                tracing::Span::current().record("aggregator_type", &event.node.aggregator_type);
770                tracing::Span::current().record("aggregator_id", &event.node.aggregator_id);
771                tracing::Span::current().record("event", &event.node.name);
772
773                let key = format!("{}_{}", event.node.aggregator_type, event.node.name);
774                let Some(handler) = self.handlers.get(&key) else {
775                    panic!("No handler found for {}/{key}", self.key());
776                };
777
778                handler.handle(&context, &event.node).await?;
779
780                executor
781                    .acknowledge(
782                        self.key(),
783                        event.cursor.to_owned(),
784                        timestamp - event.node.timestamp,
785                    )
786                    .await?;
787            }
788        }
789    }
790
791    /// Starts the subscription without retry logic.
792    ///
793    /// Equivalent to calling `start()` with retries disabled.
794    pub async fn unretry_start(self, executor: &E) -> anyhow::Result<Subscription>
795    where
796        E: Clone,
797    {
798        self.without_retry().start(executor).await
799    }
800
801    /// Starts a continuous background subscription.
802    ///
803    /// Returns a [`Subscription`] handle that can be used for graceful shutdown.
804    /// The subscription runs in a spawned tokio task and polls for new events.
805    pub async fn start(self, executor: &E) -> anyhow::Result<Subscription>
806    where
807        E: Clone,
808    {
809        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
810        let executor = executor.clone();
811        let id = Ulid::new();
812        let subscription_id = id;
813
814        executor
815            .upsert_subscriber(self.key(), id.to_owned())
816            .await?;
817
818        let task_handle = tokio::spawn(async move {
819            let read_aggregators = self.read_aggregators();
820            let start = self
821                .delay
822                .map(|d| Instant::now() + d)
823                .unwrap_or_else(Instant::now);
824
825            let mut interval = interval_at(
826                start - Duration::from_millis(1200),
827                Duration::from_millis(1000),
828            );
829
830            loop {
831                if shutdown_rx.try_recv().is_ok() {
832                    tracing::info!(
833                        key = self.key(),
834                        "Subscription received shutdown signal, stopping gracefull"
835                    );
836
837                    break;
838                }
839
840                interval.tick().await;
841
842                let _ = tracing::error_span!(
843                    "start",
844                    key = self.key(),
845                    aggregator_type = tracing::field::Empty,
846                    aggregator_id = tracing::field::Empty,
847                    event = tracing::field::Empty,
848                )
849                .entered();
850
851                let result = match self.retry {
852                    Some(retry) => {
853                        (|| async { self.process(&executor, &id, &read_aggregators, None).await })
854                            .retry(ExponentialBuilder::default().with_max_times(retry.into()))
855                            .sleep(tokio::time::sleep)
856                            .notify(|err, dur| {
857                                tracing::error!(
858                                    error = %err,
859                                    duration = ?dur,
860                                    "Failed to process event"
861                                );
862                            })
863                            .await
864                    }
865                    _ => self.process(&executor, &id, &read_aggregators, None).await,
866                };
867
868                let Err(err) = result else {
869                    continue;
870                };
871
872                tracing::error!(error = %err, "Failed to process event");
873
874                if !self.is_accept_failure {
875                    break;
876                }
877            }
878        });
879
880        Ok(Subscription {
881            id: subscription_id,
882            task_handle,
883            shutdown_tx,
884        })
885    }
886
887    /// Executes the subscription once without retry logic.
888    ///
889    /// Processes all pending events and returns. Does not poll for new events.
890    pub async fn unretry_execute(self, executor: &E) -> anyhow::Result<()> {
891        self.without_retry().execute(executor).await
892    }
893
894    /// Executes the subscription once, processing all pending events.
895    ///
896    /// Unlike `start()`, this does not run continuously. It processes
897    /// all currently pending events and returns.
898    pub async fn execute(&self, executor: &E) -> anyhow::Result<()> {
899        let id = Ulid::new();
900
901        executor
902            .upsert_subscriber(self.key(), id.to_owned())
903            .await?;
904
905        let read_aggregators = self.read_aggregators();
906
907        let _ = tracing::error_span!(
908            "execute",
909            key = self.key(),
910            aggregator_type = tracing::field::Empty,
911            aggregator_id = tracing::field::Empty,
912            event = tracing::field::Empty,
913        )
914        .entered();
915
916        match self.retry {
917            Some(retry) => {
918                (|| async { self.process(executor, &id, &read_aggregators, None).await })
919                    .retry(ExponentialBuilder::default().with_max_times(retry.into()))
920                    .sleep(tokio::time::sleep)
921                    .notify(|err, dur| {
922                        tracing::error!(
923                            error = %err,
924                            duration = ?dur,
925                            "Failed to process event"
926                        );
927                    })
928                    .await
929            }
930            _ => self.process(executor, &id, &read_aggregators, None).await,
931        }
932    }
933}
934
935/// Handle to a running event subscription.
936///
937/// Returned by [`SubscriptionBuilder::start`], this handle provides
938/// the subscription ID and a method for graceful shutdown.
939///
940/// # Example
941///
942/// ```rust,ignore
943/// let subscription = projection
944///     .subscription()
945///     .start(&executor)
946///     .await?;
947///
948/// println!("Started subscription: {}", subscription.id);
949///
950/// // On application shutdown
951/// subscription.shutdown().await?;
952/// ```
953#[derive(Debug)]
954pub struct Subscription {
955    /// Unique ID for this subscription instance
956    pub id: Ulid,
957    task_handle: tokio::task::JoinHandle<()>,
958    shutdown_tx: tokio::sync::oneshot::Sender<()>,
959}
960
961impl Subscription {
962    /// Gracefully shuts down the subscription.
963    ///
964    /// Signals the subscription to stop and waits for it to finish
965    /// processing the current event before returning.
966    pub async fn shutdown(self) -> Result<(), tokio::task::JoinError> {
967        let _ = self.shutdown_tx.send(());
968
969        self.task_handle.await
970    }
971}