evento_core/
projection.rs

1//! Projections and event subscriptions.
2//!
3//! This module provides the core building blocks for event sourcing:
4//! - Projections that build read models from events
5//! - Subscriptions that continuously process events
6//! - Loading aggregate state from event streams
7//!
8//! # Key Types
9//!
10//! - [`Projection`] - Defines handlers for building projections
11//! - [`LoadBuilder`] - Loads aggregate state from events
12//! - [`SubscriptionBuilder`] - Builds continuous event subscriptions
13//! - [`Subscription`] - Handle to a running subscription
14//! - [`EventData`] - Typed event with deserialized data and metadata
15//!
16//! # Example
17//!
18//! ```rust,ignore
19//! use evento::projection::Projection;
20//!
21//! // Define a projection with event handlers
22//! let projection = Projection::<AccountView, _>::new("accounts")
23//!     .handler(account_opened)
24//!     .handler(money_deposited);
25//!
26//! // Load aggregate state
27//! let result = projection
28//!     .load::<Account>("account-123")
29//!     .execute(&executor)
30//!     .await?;
31//!
32//! // Or start a subscription
33//! let subscription = projection
34//!     .subscription()
35//!     .routing_key("accounts")
36//!     .start(&executor)
37//!     .await?;
38//! ```
39
40use std::{
41    collections::HashMap,
42    future::Future,
43    ops::{Deref, DerefMut},
44    pin::Pin,
45    time::Duration,
46};
47use tokio::time::{interval_at, Instant};
48use ulid::Ulid;
49
50use backon::{ExponentialBuilder, Retryable};
51
52use crate::{
53    context,
54    cursor::{Args, Cursor},
55    Executor, ReadAggregator,
56};
57
/// Selects which events are read, based on their routing key.
///
/// Routing keys allow partitioning events for parallel processing
/// or filtering subscriptions to specific event streams.
///
/// The type is `Debug` so it can appear in logs and error messages, and
/// `PartialEq`/`Eq` so two filters can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutingKey {
    /// Match all events regardless of routing key
    All,
    /// Match events with a specific routing key (or no key if `None`)
    Value(Option<String>),
}
69
70/// Handler context providing access to executor and shared data.
71///
72/// `Context` wraps an [`RwContext`](crate::context::RwContext) for type-safe
73/// data storage and provides access to the executor for database operations.
74///
75/// # Example
76///
77/// ```rust,ignore
78/// #[evento::handler]
79/// async fn my_handler<E: Executor>(
80///     event: Event<MyEventData>,
81///     action: Action<'_, MyView, E>,
82/// ) -> anyhow::Result<()> {
83///     if let Action::Handle(ctx) = action {
84///         // Access shared data
85///         let config: Data<AppConfig> = ctx.extract();
86///
87///         // Use executor for queries
88///         let events = ctx.executor.read(...).await?;
89///     }
90///     Ok(())
91/// }
92/// ```
93#[derive(Clone)]
94pub struct Context<'a, E: Executor> {
95    context: context::RwContext,
96    /// Reference to the executor for database operations
97    pub executor: &'a E,
98}
99
100impl<'a, E: Executor> Deref for Context<'a, E> {
101    type Target = context::RwContext;
102
103    fn deref(&self) -> &Self::Target {
104        &self.context
105    }
106}
107
/// Marks a type as an aggregate and names its event stream.
///
/// Aggregates are the root entities in event sourcing. Every aggregate
/// type exposes one identifier string, which is used for event storage
/// and routing. Implementors must also be [`Default`].
///
/// This trait is typically derived using the `#[evento::aggregator]` macro.
///
/// # Example
///
/// ```rust,ignore
/// #[evento::aggregator("myapp/Account")]
/// #[derive(Default)]
/// pub struct Account {
///     pub balance: i64,
///     pub owner: String,
/// }
/// ```
pub trait Aggregator: Default {
    /// Returns the unique type identifier for this aggregate (e.g., "myapp/Account")
    fn aggregator_type() -> &'static str;
}
129
130/// Trait for event types.
131///
132/// Events represent state changes that have occurred. Each event type
133/// has a name and belongs to an aggregator type.
134///
135/// This trait is typically derived using the `#[evento::aggregator]` macro.
136///
137/// # Example
138///
139/// ```rust,ignore
140/// #[evento::aggregator("myapp/Account")]
141/// #[derive(bitcode::Encode, bitcode::Decode)]
142/// pub struct AccountOpened {
143///     pub owner: String,
144/// }
145/// ```
146pub trait Event: Aggregator {
147    /// Returns the event name (e.g., "AccountOpened")
148    fn event_name() -> &'static str;
149}
150
151/// Trait for event handlers.
152///
153/// Handlers process events in two modes:
154/// - `handle`: For subscriptions that perform side effects (send emails, update read models)
155/// - `apply`: For loading aggregate state by replaying events
156///
157/// This trait is typically implemented via the `#[evento::handler]` macro.
158pub trait Handler<P: 'static, E: Executor>: Sync + Send {
159    /// Handles an event during subscription processing.
160    ///
161    /// This is called when processing events in a subscription context,
162    /// where side effects like database updates or API calls are appropriate.
163    fn handle<'a>(
164        &'a self,
165        context: &'a Context<'a, E>,
166        event: &'a crate::Event,
167    ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;
168
169    /// Applies an event to build projection state.
170    ///
171    /// This is called when loading aggregate state by replaying events.
172    /// It should be a pure function that modifies the projection without side effects.
173    fn apply<'a>(
174        &'a self,
175        projection: &'a mut P,
176        event: &'a crate::Event,
177    ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;
178
179    /// Returns the aggregator type this handler processes.
180    fn aggregator_type(&self) -> &'static str;
181    /// Returns the event name this handler processes.
182    fn event_name(&self) -> &'static str;
183}
184
185/// Action passed to event handlers.
186///
187/// Determines whether the handler should apply state changes or
188/// handle the event with side effects.
189pub enum Action<'a, P: 'static, E: Executor> {
190    /// Apply event to projection state (for loading)
191    Apply(&'a mut P),
192    /// Handle event with context (for subscriptions)
193    Handle(&'a Context<'a, E>),
194}
195
196/// Typed event with deserialized data and metadata.
197///
198/// `EventData` wraps a raw [`Event`](crate::Event) and provides typed access
199/// to the deserialized event data and metadata. It implements `Deref` to
200/// provide access to the underlying event fields (id, timestamp, version, etc.).
201///
202/// # Type Parameters
203///
204/// - `D`: The event data type (e.g., `AccountOpened`)
205/// - `M`: The metadata type (defaults to `bool` for no metadata)
206///
207/// # Example
208///
209/// ```rust,ignore
210/// use evento::metadata::Event;
211///
212/// #[evento::handler]
213/// async fn handle_deposit<E: Executor>(
214///     event: Event<MoneyDeposited>,
215///     action: Action<'_, AccountView, E>,
216/// ) -> anyhow::Result<()> {
217///     // Access typed data
218///     println!("Amount: {}", event.data.amount);
219///
220///     // Access metadata
221///     if let Ok(user) = event.metadata.user() {
222///         println!("By user: {}", user);
223///     }
224///
225///     // Access underlying event fields via Deref
226///     println!("Event ID: {}", event.id);
227///     println!("Version: {}", event.version);
228///
229///     Ok(())
230/// }
231/// ```
232pub struct EventData<D, M = bool> {
233    event: crate::Event,
234    /// The typed event data
235    pub data: D,
236    /// The typed event metadata
237    pub metadata: M,
238}
239
240impl<D, M> Deref for EventData<D, M> {
241    type Target = crate::Event;
242
243    fn deref(&self) -> &Self::Target {
244        &self.event
245    }
246}
247
248impl<D, M> TryFrom<&crate::Event> for EventData<D, M>
249where
250    D: bitcode::DecodeOwned,
251    M: bitcode::DecodeOwned,
252{
253    type Error = bitcode::Error;
254
255    fn try_from(value: &crate::Event) -> Result<Self, Self::Error> {
256        let data = bitcode::decode::<D>(&value.data)?;
257        let metadata = bitcode::decode::<M>(&value.metadata)?;
258        Ok(EventData {
259            data,
260            metadata,
261            event: value.clone(),
262        })
263    }
264}
265
266/// Container for event handlers that build a projection.
267///
268/// A `Projection` groups related event handlers together and provides
269/// methods to load aggregate state or create subscriptions.
270///
271/// # Type Parameters
272///
273/// - `P`: The projection/view type being built
274/// - `E`: The executor type for database operations
275///
276/// # Example
277///
278/// ```rust,ignore
279/// let projection = Projection::<AccountView, _>::new("accounts")
280///     .handler(account_opened)
281///     .handler(money_deposited)
282///     .handler(money_withdrawn);
283///
284/// // Use for loading state
285/// let state = projection.clone()
286///     .load::<Account>("account-123")
287///     .execute(&executor)
288///     .await?;
289///
290/// // Or create a subscription
291/// let sub = projection
292///     .subscription()
293///     .start(&executor)
294///     .await?;
295/// ```
296pub struct Projection<P: 'static, E: Executor> {
297    key: String,
298    handlers: HashMap<String, Box<dyn Handler<P, E>>>,
299}
300
301impl<P: 'static, E: Executor> Projection<P, E> {
302    /// Creates a new projection with the given key.
303    ///
304    /// The key is used as the subscription identifier for cursor tracking.
305    pub fn new(key: impl Into<String>) -> Self {
306        Self {
307            key: key.into(),
308            handlers: HashMap::new(),
309        }
310    }
311
312    /// Registers an event handler with this projection.
313    ///
314    /// # Panics
315    ///
316    /// Panics if a handler for the same event type is already registered.
317    pub fn handler<H: Handler<P, E> + 'static>(mut self, h: H) -> Self {
318        let key = format!("{}_{}", h.aggregator_type(), h.event_name());
319        if self.handlers.insert(key.to_owned(), Box::new(h)).is_some() {
320            panic!("Cannot register event handler: key {} already exists", key);
321        }
322        self
323    }
324
325    /// Creates a builder for loading aggregate state.
326    ///
327    /// This consumes the projection and returns a [`LoadBuilder`] configured
328    /// to load the state for the specified aggregate.
329    ///
330    /// # Type Parameters
331    ///
332    /// - `A`: The aggregate type to load
333    pub fn load<A: Aggregator>(self, id: impl Into<String>) -> LoadBuilder<P, E>
334    where
335        P: Snapshot + Default,
336    {
337        let id = id.into();
338        let mut aggregators = HashMap::new();
339        aggregators.insert(A::aggregator_type().to_owned(), id.to_owned());
340
341        LoadBuilder {
342            key: self.key.to_owned(),
343            id,
344            aggregators,
345            handlers: self.handlers,
346            context: Default::default(),
347            filter_events_by_name: true,
348        }
349    }
350
351    /// Creates a builder for a continuous event subscription.
352    ///
353    /// This consumes the projection and returns a [`SubscriptionBuilder`]
354    /// that can be configured and started.
355    pub fn subscription(self) -> SubscriptionBuilder<P, E> {
356        SubscriptionBuilder {
357            key: self.key.to_owned(),
358            context: Default::default(),
359            handlers: self.handlers,
360            delay: None,
361            retry: Some(30),
362            chunk_size: 300,
363            is_accept_failure: false,
364            routing_key: RoutingKey::Value(None),
365            aggregators: Default::default(),
366        }
367    }
368}
369
/// Outcome of loading a projection: the state plus stream metadata.
///
/// Holds the rebuilt projection state together with the current version
/// and routing key. `Deref` and `DerefMut` forward to the inner item, so
/// its fields and methods can be used directly on the result.
///
/// # Example
///
/// ```rust,ignore
/// let result: LoadResult<AccountView> = projection
///     .load::<Account>("account-123")
///     .execute(&executor)
///     .await?
///     .expect("Account not found");
///
/// // Access inner item via Deref
/// println!("Balance: {}", result.balance);
///
/// // Access metadata
/// println!("Version: {}", result.version);
/// println!("Routing key: {:?}", result.routing_key);
/// ```
#[derive(Debug, Clone, Default)]
pub struct LoadResult<A> {
    /// The loaded projection/view state
    pub item: A,
    /// Current version of the aggregate
    pub version: u16,
    /// Routing key for the aggregate (if set)
    pub routing_key: Option<String>,
}

impl<A> Deref for LoadResult<A> {
    type Target = A;

    /// Borrows the loaded item.
    fn deref(&self) -> &Self::Target {
        let Self { item, .. } = self;
        item
    }
}

impl<A> DerefMut for LoadResult<A> {
    /// Mutably borrows the loaded item.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { item, .. } = self;
        item
    }
}
414
415/// Trait for types that can be restored from snapshots.
416///
417/// Snapshots provide a performance optimization by storing pre-computed
418/// state, avoiding the need to replay all events from the beginning.
419///
420/// This trait is typically implemented via the `#[evento::snapshot]` macro.
421///
422/// # Example
423///
424/// ```rust,ignore
425/// #[evento::snapshot]
426/// #[derive(Default)]
427/// pub struct AccountView {
428///     pub balance: i64,
429///     pub owner: String,
430/// }
431///
432/// // The macro generates the restore implementation that loads
433/// // from a snapshot table if available
434/// ```
435pub trait Snapshot: Sized {
436    /// Restores state from a snapshot if available.
437    ///
438    /// Returns `None` if no snapshot exists for the given ID.
439    fn restore<'a>(
440        _context: &'a context::RwContext,
441        _id: String,
442    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Option<Self>>> + Send + 'a>> {
443        Box::pin(async { Ok(None) })
444    }
445}
446
447/// Builder for loading aggregate state from events.
448///
449/// Created via [`Projection::load`], this builder configures how to
450/// load an aggregate's state by replaying events.
451///
452/// # Example
453///
454/// ```rust,ignore
455/// let result = projection
456///     .load::<Account>("account-123")
457///     .data(app_config)  // Add shared data
458///     .aggregator::<User>("user-456")  // Add related aggregate
459///     .execute(&executor)
460///     .await?;
461/// ```
462pub struct LoadBuilder<P: Snapshot + Default + 'static, E: Executor> {
463    key: String,
464    id: String,
465    aggregators: HashMap<String, String>,
466    handlers: HashMap<String, Box<dyn Handler<P, E>>>,
467    context: context::RwContext,
468    filter_events_by_name: bool,
469}
470
471impl<P: Snapshot + Default + 'static, E: Executor> LoadBuilder<P, E> {
472    /// Adds shared data to the load context.
473    ///
474    /// Data added here is accessible in handlers via the context.
475    pub fn data<D: Send + Sync + 'static>(&mut self, v: D) -> &mut Self {
476        self.context.insert(v);
477
478        self
479    }
480
481    /// Adds a related aggregate to load events from.
482    ///
483    /// Use this when the projection needs events from multiple aggregates.
484    pub fn aggregator<A: Aggregator>(&mut self, id: impl Into<String>) -> &mut Self {
485        self.aggregators
486            .insert(A::aggregator_type().to_owned(), id.into());
487
488        self
489    }
490
491    pub fn filter_events_by_name(&mut self, v: bool) -> &mut Self {
492        self.filter_events_by_name = v;
493
494        self
495    }
496
497    /// Executes the load operation, returning the rebuilt state.
498    ///
499    /// Returns `None` if no events exist for the aggregate.
500    /// Returns `Err` if there are too many events to process in one batch.
501    pub async fn execute(&self, executor: &E) -> anyhow::Result<Option<LoadResult<P>>> {
502        let context = Context {
503            context: self.context.clone(),
504            executor,
505        };
506
507        let mut cursor = executor.get_subscriber_cursor(self.key.to_owned()).await?;
508        let (mut version, mut routing_key) = match cursor {
509            Some(ref cursor) => {
510                let cursor = crate::Event::deserialize_cursor(cursor)?;
511
512                (cursor.v, cursor.r)
513            }
514            _ => (0, None),
515        };
516        let loaded = P::restore(&context, self.id.to_owned()).await?;
517        if loaded.is_none() {
518            cursor = None;
519        }
520
521        let mut read_aggregators = vec![];
522        for handler in self.handlers.values() {
523            let Some(id) = self.aggregators.get(handler.aggregator_type()) else {
524                anyhow::bail!(
525                    "Failed to load projection {}/{}: id not found",
526                    handler.aggregator_type(),
527                    handler.event_name()
528                );
529            };
530
531            read_aggregators.push(ReadAggregator {
532                aggregator_type: handler.aggregator_type().to_owned(),
533                aggregator_id: Some(id.to_owned()),
534                name: if self.filter_events_by_name {
535                    Some(handler.event_name().to_owned())
536                } else {
537                    None
538                },
539            });
540        }
541
542        let events = executor
543            .read(
544                Some(read_aggregators.to_vec()),
545                None,
546                Args::forward(100, cursor.clone()),
547            )
548            .await?;
549
550        if events.edges.is_empty() && loaded.is_none() {
551            return Ok(None);
552        }
553
554        let mut snapshot = loaded.unwrap_or_default();
555
556        for event in events.edges.iter() {
557            let key = format!("{}_{}", event.node.aggregator_type, event.node.name);
558            let Some(handler) = self.handlers.get(&key) else {
559                tracing::debug!("No handler found for {}/{key}", self.key);
560                continue;
561            };
562
563            handler.apply(&mut snapshot, &event.node).await?;
564        }
565
566        if events.page_info.has_next_page {
567            anyhow::bail!("Too busy");
568        }
569
570        if let Some(event) = events.edges.last() {
571            version = event.node.version;
572            routing_key = event.node.routing_key.to_owned();
573        }
574
575        Ok(Some(LoadResult {
576            item: snapshot,
577            version,
578            routing_key,
579        }))
580    }
581}
582
583/// Builder for creating event subscriptions.
584///
585/// Created via [`Projection::subscription`], this builder configures
586/// a continuous event processing subscription with retry logic,
587/// routing key filtering, and graceful shutdown support.
588///
589/// # Example
590///
591/// ```rust,ignore
592/// let subscription = projection
593///     .subscription()
594///     .routing_key("accounts")
595///     .chunk_size(100)
596///     .retry(5)
597///     .delay(Duration::from_secs(10))
598///     .start(&executor)
599///     .await?;
600///
601/// // Later, gracefully shutdown
602/// subscription.shutdown().await?;
603/// ```
604pub struct SubscriptionBuilder<P: 'static, E: Executor> {
605    key: String,
606    handlers: HashMap<String, Box<dyn Handler<P, E>>>,
607    context: context::RwContext,
608    routing_key: RoutingKey,
609    delay: Option<Duration>,
610    chunk_size: u16,
611    is_accept_failure: bool,
612    retry: Option<u8>,
613    aggregators: HashMap<String, String>,
614}
615
616impl<P, E: Executor + 'static> SubscriptionBuilder<P, E> {
617    /// Adds shared data to the load context.
618    ///
619    /// Data added here is accessible in handlers via the context.
620    pub fn data<D: Send + Sync + 'static>(self, v: D) -> Self {
621        self.context.insert(v);
622
623        self
624    }
625
626    /// Allows the subscription to continue after handler failures.
627    ///
628    /// By default, subscriptions stop on the first error. With this flag,
629    /// errors are logged but processing continues.
630    pub fn accept_failure(mut self) -> Self {
631        self.is_accept_failure = true;
632
633        self
634    }
635
636    /// Sets the number of events to process per batch.
637    ///
638    /// Default is 300.
639    pub fn chunk_size(mut self, v: u16) -> Self {
640        self.chunk_size = v;
641
642        self
643    }
644
645    /// Sets a delay before starting the subscription.
646    ///
647    /// Useful for staggering subscription starts in multi-node deployments.
648    pub fn delay(mut self, v: Duration) -> Self {
649        self.delay = Some(v);
650
651        self
652    }
653
654    /// Filters events by routing key.
655    ///
656    /// Only events with the matching routing key will be processed.
657    pub fn routing_key(mut self, v: impl Into<String>) -> Self {
658        self.routing_key = RoutingKey::Value(Some(v.into()));
659
660        self
661    }
662
663    /// Sets the maximum number of retries on failure.
664    ///
665    /// Uses exponential backoff. Default is 30.
666    pub fn retry(mut self, v: u8) -> Self {
667        self.retry = Some(v);
668
669        self
670    }
671
672    fn without_retry(mut self) -> Self {
673        self.retry = None;
674
675        self
676    }
677
678    /// Processes all events regardless of routing key.
679    pub fn all(mut self) -> Self {
680        self.routing_key = RoutingKey::All;
681
682        self
683    }
684
685    /// Adds a related aggregate to process events from.
686    pub fn aggregator<A: Aggregator>(mut self, id: impl Into<String>) -> Self {
687        self.aggregators
688            .insert(A::aggregator_type().to_owned(), id.into());
689
690        self
691    }
692
693    fn read_aggregators(&self) -> Vec<ReadAggregator> {
694        self.handlers
695            .values()
696            .map(|h| match self.aggregators.get(h.aggregator_type()) {
697                Some(id) => ReadAggregator {
698                    aggregator_type: h.aggregator_type().to_owned(),
699                    aggregator_id: Some(id.to_owned()),
700                    name: Some(h.event_name().to_owned()),
701                },
702                _ => ReadAggregator::event(h.aggregator_type(), h.event_name()),
703            })
704            .collect()
705    }
706
707    fn key(&self) -> String {
708        if let RoutingKey::Value(Some(ref key)) = self.routing_key {
709            return format!("{key}.{}", self.key);
710        }
711
712        self.key.to_owned()
713    }
714
715    async fn process(
716        &self,
717        executor: &E,
718        id: &Ulid,
719        aggregators: &[ReadAggregator],
720        mut rx: Option<&mut tokio::sync::oneshot::Receiver<()>>,
721    ) -> anyhow::Result<()> {
722        let mut interval = interval_at(
723            Instant::now() - Duration::from_millis(400),
724            Duration::from_millis(300),
725        );
726
727        loop {
728            interval.tick().await;
729
730            if !executor.is_subscriber_running(self.key(), *id).await? {
731                return Ok(());
732            }
733
734            let cursor = executor.get_subscriber_cursor(self.key()).await?;
735
736            let timestamp = executor
737                .read(
738                    Some(aggregators.to_vec()),
739                    Some(self.routing_key.to_owned()),
740                    Args::backward(1, None),
741                )
742                .await?
743                .edges
744                .last()
745                .map(|e| e.node.timestamp)
746                .unwrap_or_default();
747
748            let res = executor
749                .read(
750                    Some(aggregators.to_vec()),
751                    Some(self.routing_key.to_owned()),
752                    Args::forward(self.chunk_size, cursor),
753                )
754                .await?;
755
756            if res.edges.is_empty() {
757                return Ok(());
758            }
759
760            let context = Context {
761                context: self.context.clone(),
762                executor,
763            };
764
765            for event in res.edges {
766                if let Some(rx) = rx.as_mut() {
767                    if rx.try_recv().is_ok() {
768                        tracing::info!(
769                            key = self.key(),
770                            "Subscription received shutdown signal, stopping gracefull"
771                        );
772
773                        return Ok(());
774                    }
775                }
776
777                tracing::Span::current().record("aggregator_type", &event.node.aggregator_type);
778                tracing::Span::current().record("aggregator_id", &event.node.aggregator_id);
779                tracing::Span::current().record("event", &event.node.name);
780
781                let key = format!("{}_{}", event.node.aggregator_type, event.node.name);
782                let Some(handler) = self.handlers.get(&key) else {
783                    panic!("No handler found for {}/{key}", self.key());
784                };
785
786                handler.handle(&context, &event.node).await?;
787
788                executor
789                    .acknowledge(
790                        self.key(),
791                        event.cursor.to_owned(),
792                        timestamp - event.node.timestamp,
793                    )
794                    .await?;
795            }
796        }
797    }
798
799    /// Starts the subscription without retry logic.
800    ///
801    /// Equivalent to calling `start()` with retries disabled.
802    pub async fn unretry_start(self, executor: &E) -> anyhow::Result<Subscription>
803    where
804        E: Clone,
805    {
806        self.without_retry().start(executor).await
807    }
808
809    /// Starts a continuous background subscription.
810    ///
811    /// Returns a [`Subscription`] handle that can be used for graceful shutdown.
812    /// The subscription runs in a spawned tokio task and polls for new events.
813    pub async fn start(self, executor: &E) -> anyhow::Result<Subscription>
814    where
815        E: Clone,
816    {
817        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
818        let executor = executor.clone();
819        let id = Ulid::new();
820        let subscription_id = id;
821
822        executor
823            .upsert_subscriber(self.key(), id.to_owned())
824            .await?;
825
826        let task_handle = tokio::spawn(async move {
827            let read_aggregators = self.read_aggregators();
828            let start = self
829                .delay
830                .map(|d| Instant::now() + d)
831                .unwrap_or_else(Instant::now);
832
833            let mut interval = interval_at(
834                start - Duration::from_millis(1200),
835                Duration::from_millis(1000),
836            );
837
838            loop {
839                if shutdown_rx.try_recv().is_ok() {
840                    tracing::info!(
841                        key = self.key(),
842                        "Subscription received shutdown signal, stopping gracefull"
843                    );
844
845                    break;
846                }
847
848                interval.tick().await;
849
850                let _ = tracing::error_span!(
851                    "start",
852                    key = self.key(),
853                    aggregator_type = tracing::field::Empty,
854                    aggregator_id = tracing::field::Empty,
855                    event = tracing::field::Empty,
856                )
857                .entered();
858
859                let result = match self.retry {
860                    Some(retry) => {
861                        (|| async { self.process(&executor, &id, &read_aggregators, None).await })
862                            .retry(ExponentialBuilder::default().with_max_times(retry.into()))
863                            .sleep(tokio::time::sleep)
864                            .notify(|err, dur| {
865                                tracing::error!(
866                                    error = %err,
867                                    duration = ?dur,
868                                    "Failed to process event"
869                                );
870                            })
871                            .await
872                    }
873                    _ => self.process(&executor, &id, &read_aggregators, None).await,
874                };
875
876                let Err(err) = result else {
877                    continue;
878                };
879
880                tracing::error!(error = %err, "Failed to process event");
881
882                if !self.is_accept_failure {
883                    break;
884                }
885            }
886        });
887
888        Ok(Subscription {
889            id: subscription_id,
890            task_handle,
891            shutdown_tx,
892        })
893    }
894
895    /// Executes the subscription once without retry logic.
896    ///
897    /// Processes all pending events and returns. Does not poll for new events.
898    pub async fn unretry_execute(self, executor: &E) -> anyhow::Result<()> {
899        self.without_retry().execute(executor).await
900    }
901
902    /// Executes the subscription once, processing all pending events.
903    ///
904    /// Unlike `start()`, this does not run continuously. It processes
905    /// all currently pending events and returns.
906    pub async fn execute(&self, executor: &E) -> anyhow::Result<()> {
907        let id = Ulid::new();
908
909        executor
910            .upsert_subscriber(self.key(), id.to_owned())
911            .await?;
912
913        let read_aggregators = self.read_aggregators();
914
915        let _ = tracing::error_span!(
916            "execute",
917            key = self.key(),
918            aggregator_type = tracing::field::Empty,
919            aggregator_id = tracing::field::Empty,
920            event = tracing::field::Empty,
921        )
922        .entered();
923
924        match self.retry {
925            Some(retry) => {
926                (|| async { self.process(executor, &id, &read_aggregators, None).await })
927                    .retry(ExponentialBuilder::default().with_max_times(retry.into()))
928                    .sleep(tokio::time::sleep)
929                    .notify(|err, dur| {
930                        tracing::error!(
931                            error = %err,
932                            duration = ?dur,
933                            "Failed to process event"
934                        );
935                    })
936                    .await
937            }
938            _ => self.process(executor, &id, &read_aggregators, None).await,
939        }
940    }
941}
942
943/// Handle to a running event subscription.
944///
945/// Returned by [`SubscriptionBuilder::start`], this handle provides
946/// the subscription ID and a method for graceful shutdown.
947///
948/// # Example
949///
950/// ```rust,ignore
951/// let subscription = projection
952///     .subscription()
953///     .start(&executor)
954///     .await?;
955///
956/// println!("Started subscription: {}", subscription.id);
957///
958/// // On application shutdown
959/// subscription.shutdown().await?;
960/// ```
961#[derive(Debug)]
962pub struct Subscription {
963    /// Unique ID for this subscription instance
964    pub id: Ulid,
965    task_handle: tokio::task::JoinHandle<()>,
966    shutdown_tx: tokio::sync::oneshot::Sender<()>,
967}
968
969impl Subscription {
970    /// Gracefully shuts down the subscription.
971    ///
972    /// Signals the subscription to stop and waits for it to finish
973    /// processing the current event before returning.
974    pub async fn shutdown(self) -> Result<(), tokio::task::JoinError> {
975        let _ = self.shutdown_tx.send(());
976
977        self.task_handle.await
978    }
979}