evento_core/projection.rs
1//! Projections and event subscriptions.
2//!
3//! This module provides the core building blocks for event sourcing:
4//! - Projections that build read models from events
5//! - Subscriptions that continuously process events
6//! - Loading aggregate state from event streams
7//!
8//! # Key Types
9//!
10//! - [`Projection`] - Defines handlers for building projections
11//! - [`LoadBuilder`] - Loads aggregate state from events
12//! - [`SubscriptionBuilder`] - Builds continuous event subscriptions
13//! - [`Subscription`] - Handle to a running subscription
14//! - [`EventData`] - Typed event with deserialized data and metadata
15//!
16//! # Example
17//!
18//! ```rust,ignore
19//! use evento::projection::Projection;
20//!
21//! // Define a projection with event handlers
22//! let projection = Projection::<AccountView, _>::new("accounts")
23//! .handler(account_opened)
24//! .handler(money_deposited);
25//!
26//! // Load aggregate state
27//! let result = projection
28//! .load::<Account>("account-123")
29//! .execute(&executor)
30//! .await?;
31//!
32//! // Or start a subscription
33//! let subscription = projection
34//! .subscription()
35//! .routing_key("accounts")
36//! .start(&executor)
37//! .await?;
38//! ```
39
40use std::{
41 collections::HashMap,
42 future::Future,
43 ops::{Deref, DerefMut},
44 pin::Pin,
45 time::Duration,
46};
47use tokio::time::{interval_at, Instant};
48use ulid::Ulid;
49
50use backon::{ExponentialBuilder, Retryable};
51
52use crate::{context, cursor::Args, Executor, ReadAggregator};
53
/// Selects which events are matched, based on their routing key.
///
/// Routing keys make it possible to partition events for parallel
/// processing, or to restrict a subscription to specific event streams.
#[derive(Clone)]
pub enum RoutingKey {
    /// Matches every event, whatever its routing key is.
    All,
    /// Matches events carrying exactly this routing key; `None` matches
    /// events that have no routing key.
    Value(Option<String>),
}
65
66/// Handler context providing access to executor and shared data.
67///
68/// `Context` wraps an [`RwContext`](crate::context::RwContext) for type-safe
69/// data storage and provides access to the executor for database operations.
70///
71/// # Example
72///
73/// ```rust,ignore
74/// #[evento::handler]
75/// async fn my_handler<E: Executor>(
76/// event: Event<MyEventData>,
77/// action: Action<'_, MyView, E>,
78/// ) -> anyhow::Result<()> {
79/// if let Action::Handle(ctx) = action {
80/// // Access shared data
81/// let config: Data<AppConfig> = ctx.extract();
82///
83/// // Use executor for queries
84/// let events = ctx.executor.read(...).await?;
85/// }
86/// Ok(())
87/// }
88/// ```
89#[derive(Clone)]
90pub struct Context<'a, E: Executor> {
91 context: context::RwContext,
92 /// Reference to the executor for database operations
93 pub executor: &'a E,
94}
95
96impl<'a, E: Executor> Deref for Context<'a, E> {
97 type Target = context::RwContext;
98
99 fn deref(&self) -> &Self::Target {
100 &self.context
101 }
102}
103
/// Marks a type as an aggregate.
///
/// Aggregates are the root entities in event sourcing. Every aggregate
/// type exposes a unique identifier string that is used for event storage
/// and routing.
///
/// Usually derived with the `#[evento::aggregator]` macro rather than
/// implemented by hand.
///
/// # Example
///
/// ```rust,ignore
/// #[evento::aggregator("myapp/Account")]
/// #[derive(Default)]
/// pub struct Account {
///     pub balance: i64,
///     pub owner: String,
/// }
/// ```
pub trait Aggregator: Default {
    /// Unique type identifier of this aggregate, e.g. `"myapp/Account"`.
    fn aggregator_type() -> &'static str;
}
125
126/// Trait for event types.
127///
128/// Events represent state changes that have occurred. Each event type
129/// has a name and belongs to an aggregator type.
130///
131/// This trait is typically derived using the `#[evento::aggregator]` macro.
132///
133/// # Example
134///
135/// ```rust,ignore
136/// #[evento::aggregator("myapp/Account")]
137/// #[derive(bitcode::Encode, bitcode::Decode)]
138/// pub struct AccountOpened {
139/// pub owner: String,
140/// }
141/// ```
142pub trait Event: Aggregator {
143 /// Returns the event name (e.g., "AccountOpened")
144 fn event_name() -> &'static str;
145}
146
147/// Trait for event handlers.
148///
149/// Handlers process events in two modes:
150/// - `handle`: For subscriptions that perform side effects (send emails, update read models)
151/// - `apply`: For loading aggregate state by replaying events
152///
153/// This trait is typically implemented via the `#[evento::handler]` macro.
154pub trait Handler<P: 'static, E: Executor>: Sync + Send {
155 /// Handles an event during subscription processing.
156 ///
157 /// This is called when processing events in a subscription context,
158 /// where side effects like database updates or API calls are appropriate.
159 fn handle<'a>(
160 &'a self,
161 context: &'a Context<'a, E>,
162 event: &'a crate::Event,
163 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;
164
165 /// Applies an event to build projection state.
166 ///
167 /// This is called when loading aggregate state by replaying events.
168 /// It should be a pure function that modifies the projection without side effects.
169 fn apply<'a>(
170 &'a self,
171 projection: &'a mut P,
172 event: &'a crate::Event,
173 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;
174
175 /// Returns the aggregator type this handler processes.
176 fn aggregator_type(&self) -> &'static str;
177 /// Returns the event name this handler processes.
178 fn event_name(&self) -> &'static str;
179}
180
181/// Action passed to event handlers.
182///
183/// Determines whether the handler should apply state changes or
184/// handle the event with side effects.
185pub enum Action<'a, P: 'static, E: Executor> {
186 /// Apply event to projection state (for loading)
187 Apply(&'a mut P),
188 /// Handle event with context (for subscriptions)
189 Handle(&'a Context<'a, E>),
190}
191
192/// Typed event with deserialized data and metadata.
193///
194/// `EventData` wraps a raw [`Event`](crate::Event) and provides typed access
195/// to the deserialized event data and metadata. It implements `Deref` to
196/// provide access to the underlying event fields (id, timestamp, version, etc.).
197///
198/// # Type Parameters
199///
200/// - `D`: The event data type (e.g., `AccountOpened`)
201/// - `M`: The metadata type (defaults to `bool` for no metadata)
202///
203/// # Example
204///
205/// ```rust,ignore
206/// use evento::metadata::Event;
207///
208/// #[evento::handler]
209/// async fn handle_deposit<E: Executor>(
210/// event: Event<MoneyDeposited>,
211/// action: Action<'_, AccountView, E>,
212/// ) -> anyhow::Result<()> {
213/// // Access typed data
214/// println!("Amount: {}", event.data.amount);
215///
216/// // Access metadata
217/// if let Ok(user) = event.metadata.user() {
218/// println!("By user: {}", user);
219/// }
220///
221/// // Access underlying event fields via Deref
222/// println!("Event ID: {}", event.id);
223/// println!("Version: {}", event.version);
224///
225/// Ok(())
226/// }
227/// ```
228pub struct EventData<D, M = bool> {
229 event: crate::Event,
230 /// The typed event data
231 pub data: D,
232 /// The typed event metadata
233 pub metadata: M,
234}
235
236impl<D, M> Deref for EventData<D, M> {
237 type Target = crate::Event;
238
239 fn deref(&self) -> &Self::Target {
240 &self.event
241 }
242}
243
244impl<D, M> TryFrom<&crate::Event> for EventData<D, M>
245where
246 D: bitcode::DecodeOwned,
247 M: bitcode::DecodeOwned,
248{
249 type Error = bitcode::Error;
250
251 fn try_from(value: &crate::Event) -> Result<Self, Self::Error> {
252 let data = bitcode::decode::<D>(&value.data)?;
253 let metadata = bitcode::decode::<M>(&value.metadata)?;
254 Ok(EventData {
255 data,
256 metadata,
257 event: value.clone(),
258 })
259 }
260}
261
262/// Container for event handlers that build a projection.
263///
264/// A `Projection` groups related event handlers together and provides
265/// methods to load aggregate state or create subscriptions.
266///
267/// # Type Parameters
268///
269/// - `P`: The projection/view type being built
270/// - `E`: The executor type for database operations
271///
272/// # Example
273///
274/// ```rust,ignore
275/// let projection = Projection::<AccountView, _>::new("accounts")
276/// .handler(account_opened)
277/// .handler(money_deposited)
278/// .handler(money_withdrawn);
279///
280/// // Use for loading state
281/// let state = projection.clone()
282/// .load::<Account>("account-123")
283/// .execute(&executor)
284/// .await?;
285///
286/// // Or create a subscription
287/// let sub = projection
288/// .subscription()
289/// .start(&executor)
290/// .await?;
291/// ```
292pub struct Projection<P: 'static, E: Executor> {
293 key: String,
294 handlers: HashMap<String, Box<dyn Handler<P, E>>>,
295}
296
297impl<P: 'static, E: Executor> Projection<P, E> {
298 /// Creates a new projection with the given key.
299 ///
300 /// The key is used as the subscription identifier for cursor tracking.
301 pub fn new(key: impl Into<String>) -> Self {
302 Self {
303 key: key.into(),
304 handlers: HashMap::new(),
305 }
306 }
307
308 /// Registers an event handler with this projection.
309 ///
310 /// # Panics
311 ///
312 /// Panics if a handler for the same event type is already registered.
313 pub fn handler<H: Handler<P, E> + 'static>(mut self, h: H) -> Self {
314 let key = format!("{}_{}", h.aggregator_type(), h.event_name());
315 if self.handlers.insert(key.to_owned(), Box::new(h)).is_some() {
316 panic!("Cannot register event handler: key {} already exists", key);
317 }
318 self
319 }
320
321 /// Creates a builder for loading aggregate state.
322 ///
323 /// This consumes the projection and returns a [`LoadBuilder`] configured
324 /// to load the state for the specified aggregate.
325 ///
326 /// # Type Parameters
327 ///
328 /// - `A`: The aggregate type to load
329 pub fn load<A: Aggregator>(self, id: impl Into<String>) -> LoadBuilder<P, E>
330 where
331 P: Snapshot + Default,
332 {
333 let id = id.into();
334 let mut aggregators = HashMap::new();
335 aggregators.insert(A::aggregator_type().to_owned(), id.to_owned());
336
337 LoadBuilder {
338 key: self.key.to_owned(),
339 id,
340 aggregators,
341 handlers: self.handlers,
342 context: Default::default(),
343 filter_events_by_name: true,
344 }
345 }
346
347 /// Creates a builder for a continuous event subscription.
348 ///
349 /// This consumes the projection and returns a [`SubscriptionBuilder`]
350 /// that can be configured and started.
351 pub fn subscription(self) -> SubscriptionBuilder<P, E> {
352 SubscriptionBuilder {
353 key: self.key.to_owned(),
354 context: Default::default(),
355 handlers: self.handlers,
356 delay: None,
357 retry: Some(30),
358 chunk_size: 300,
359 is_accept_failure: false,
360 routing_key: RoutingKey::Value(None),
361 aggregators: Default::default(),
362 }
363 }
364}
365
/// Outcome of loading an aggregate's state.
///
/// Bundles the rebuilt projection state with the aggregate's current
/// version and routing key. `Deref` and `DerefMut` give transparent access
/// to the inner item.
///
/// # Example
///
/// ```rust,ignore
/// let result: LoadResult<AccountView> = projection
///     .load::<Account>("account-123")
///     .execute(&executor)
///     .await?
///     .expect("Account not found");
///
/// // Access inner item via Deref
/// println!("Balance: {}", result.balance);
///
/// // Access metadata
/// println!("Version: {}", result.version);
/// println!("Routing key: {:?}", result.routing_key);
/// ```
#[derive(Debug, Clone, Default)]
pub struct LoadResult<A> {
    /// The loaded projection/view state.
    pub item: A,
    /// Current version of the aggregate.
    pub version: u16,
    /// Routing key of the aggregate, when one is set.
    pub routing_key: Option<String>,
}
397
398impl<A> Deref for LoadResult<A> {
399 type Target = A;
400 fn deref(&self) -> &Self::Target {
401 &self.item
402 }
403}
404
405impl<A> DerefMut for LoadResult<A> {
406 fn deref_mut(&mut self) -> &mut Self::Target {
407 &mut self.item
408 }
409}
410
411/// Type alias for an optional load result.
412pub type OptionLoadResult<A> = Option<LoadResult<A>>;
413
414/// Trait for types that can be restored from snapshots.
415///
416/// Snapshots provide a performance optimization by storing pre-computed
417/// state, avoiding the need to replay all events from the beginning.
418///
419/// This trait is typically implemented via the `#[evento::snapshot]` macro.
420///
421/// # Example
422///
423/// ```rust,ignore
424/// #[evento::snapshot]
425/// #[derive(Default)]
426/// pub struct AccountView {
427/// pub balance: i64,
428/// pub owner: String,
429/// }
430///
431/// // The macro generates the restore implementation that loads
432/// // from a snapshot table if available
433/// ```
434pub trait Snapshot: Sized {
435 /// Restores state from a snapshot if available.
436 ///
437 /// Returns `None` if no snapshot exists for the given ID.
438 fn restore<'a>(
439 context: &'a context::RwContext,
440 id: String,
441 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OptionLoadResult<Self>>> + Send + 'a>>;
442}
443
444/// Builder for loading aggregate state from events.
445///
446/// Created via [`Projection::load`], this builder configures how to
447/// load an aggregate's state by replaying events.
448///
449/// # Example
450///
451/// ```rust,ignore
452/// let result = projection
453/// .load::<Account>("account-123")
454/// .data(app_config) // Add shared data
455/// .aggregator::<User>("user-456") // Add related aggregate
456/// .execute(&executor)
457/// .await?;
458/// ```
459pub struct LoadBuilder<P: Snapshot + Default + 'static, E: Executor> {
460 key: String,
461 id: String,
462 aggregators: HashMap<String, String>,
463 handlers: HashMap<String, Box<dyn Handler<P, E>>>,
464 context: context::RwContext,
465 filter_events_by_name: bool,
466}
467
468impl<P: Snapshot + Default + 'static, E: Executor> LoadBuilder<P, E> {
469 /// Adds shared data to the load context.
470 ///
471 /// Data added here is accessible in handlers via the context.
472 pub fn data<D: Send + Sync + 'static>(&mut self, v: D) -> &mut Self {
473 self.context.insert(v);
474
475 self
476 }
477
478 /// Adds a related aggregate to load events from.
479 ///
480 /// Use this when the projection needs events from multiple aggregates.
481 pub fn aggregator<A: Aggregator>(&mut self, id: impl Into<String>) -> &mut Self {
482 self.aggregators
483 .insert(A::aggregator_type().to_owned(), id.into());
484
485 self
486 }
487
488 pub fn filter_events_by_name(&mut self, v: bool) -> &mut Self {
489 self.filter_events_by_name = v;
490
491 self
492 }
493
494 /// Executes the load operation, returning the rebuilt state.
495 ///
496 /// Returns `None` if no events exist for the aggregate.
497 /// Returns `Err` if there are too many events to process in one batch.
498 pub async fn execute(&self, executor: &E) -> anyhow::Result<Option<LoadResult<P>>> {
499 let context = Context {
500 context: self.context.clone(),
501 executor,
502 };
503
504 let cursor = executor.get_subscriber_cursor(self.key.to_owned()).await?;
505 let loaded = P::restore(&context, self.id.to_owned()).await?;
506 let has_loaded = loaded.is_some();
507 let mut snapshot = loaded.unwrap_or_default();
508
509 let mut read_aggregators = vec![];
510 for handler in self.handlers.values() {
511 let Some(id) = self.aggregators.get(handler.aggregator_type()) else {
512 anyhow::bail!(
513 "Failed to load projection {}/{}: id not found",
514 handler.aggregator_type(),
515 handler.event_name()
516 );
517 };
518
519 read_aggregators.push(ReadAggregator {
520 aggregator_type: handler.aggregator_type().to_owned(),
521 aggregator_id: Some(id.to_owned()),
522 name: if self.filter_events_by_name {
523 Some(handler.event_name().to_owned())
524 } else {
525 None
526 },
527 });
528 }
529
530 let events = executor
531 .read(
532 Some(read_aggregators.to_vec()),
533 None,
534 Args::forward(100, cursor.clone()),
535 )
536 .await?;
537
538 for event in events.edges.iter() {
539 let key = format!("{}_{}", event.node.aggregator_type, event.node.name);
540 let Some(handler) = self.handlers.get(&key) else {
541 tracing::debug!("No handler found for {}/{key}", self.key);
542 continue;
543 };
544
545 handler.apply(&mut snapshot, &event.node).await?;
546 }
547
548 if events.page_info.has_next_page {
549 anyhow::bail!("Too busy");
550 }
551
552 if let Some(event) = events.edges.last() {
553 snapshot.version = event.node.version;
554 snapshot.routing_key = event.node.routing_key.to_owned();
555
556 return Ok(Some(snapshot));
557 }
558
559 if !has_loaded {
560 return Ok(None);
561 }
562
563 let events = executor
564 .read(
565 Some(read_aggregators.to_vec()),
566 None,
567 Args::backward(1, None),
568 )
569 .await?;
570
571 if let Some(event) = events.edges.first() {
572 snapshot.version = event.node.version;
573 snapshot.routing_key = event.node.routing_key.to_owned();
574
575 return Ok(Some(snapshot));
576 }
577
578 Ok(None)
579 }
580}
581
582/// Builder for creating event subscriptions.
583///
584/// Created via [`Projection::subscription`], this builder configures
585/// a continuous event processing subscription with retry logic,
586/// routing key filtering, and graceful shutdown support.
587///
588/// # Example
589///
590/// ```rust,ignore
591/// let subscription = projection
592/// .subscription()
593/// .routing_key("accounts")
594/// .chunk_size(100)
595/// .retry(5)
596/// .delay(Duration::from_secs(10))
597/// .start(&executor)
598/// .await?;
599///
600/// // Later, gracefully shutdown
601/// subscription.shutdown().await?;
602/// ```
603pub struct SubscriptionBuilder<P: 'static, E: Executor> {
604 key: String,
605 handlers: HashMap<String, Box<dyn Handler<P, E>>>,
606 context: context::RwContext,
607 routing_key: RoutingKey,
608 delay: Option<Duration>,
609 chunk_size: u16,
610 is_accept_failure: bool,
611 retry: Option<u8>,
612 aggregators: HashMap<String, String>,
613}
614
615impl<P, E: Executor + 'static> SubscriptionBuilder<P, E> {
616 /// Adds shared data to the load context.
617 ///
618 /// Data added here is accessible in handlers via the context.
619 pub fn data<D: Send + Sync + 'static>(self, v: D) -> Self {
620 self.context.insert(v);
621
622 self
623 }
624
625 /// Allows the subscription to continue after handler failures.
626 ///
627 /// By default, subscriptions stop on the first error. With this flag,
628 /// errors are logged but processing continues.
629 pub fn accept_failure(mut self) -> Self {
630 self.is_accept_failure = true;
631
632 self
633 }
634
635 /// Sets the number of events to process per batch.
636 ///
637 /// Default is 300.
638 pub fn chunk_size(mut self, v: u16) -> Self {
639 self.chunk_size = v;
640
641 self
642 }
643
644 /// Sets a delay before starting the subscription.
645 ///
646 /// Useful for staggering subscription starts in multi-node deployments.
647 pub fn delay(mut self, v: Duration) -> Self {
648 self.delay = Some(v);
649
650 self
651 }
652
653 /// Filters events by routing key.
654 ///
655 /// Only events with the matching routing key will be processed.
656 pub fn routing_key(mut self, v: impl Into<String>) -> Self {
657 self.routing_key = RoutingKey::Value(Some(v.into()));
658
659 self
660 }
661
662 /// Sets the maximum number of retries on failure.
663 ///
664 /// Uses exponential backoff. Default is 30.
665 pub fn retry(mut self, v: u8) -> Self {
666 self.retry = Some(v);
667
668 self
669 }
670
671 fn without_retry(mut self) -> Self {
672 self.retry = None;
673
674 self
675 }
676
677 /// Processes all events regardless of routing key.
678 pub fn all(mut self) -> Self {
679 self.routing_key = RoutingKey::All;
680
681 self
682 }
683
684 /// Adds a related aggregate to process events from.
685 pub fn aggregator<A: Aggregator>(mut self, id: impl Into<String>) -> Self {
686 self.aggregators
687 .insert(A::aggregator_type().to_owned(), id.into());
688
689 self
690 }
691
692 fn read_aggregators(&self) -> Vec<ReadAggregator> {
693 self.handlers
694 .values()
695 .map(|h| match self.aggregators.get(h.aggregator_type()) {
696 Some(id) => ReadAggregator {
697 aggregator_type: h.aggregator_type().to_owned(),
698 aggregator_id: Some(id.to_owned()),
699 name: Some(h.event_name().to_owned()),
700 },
701 _ => ReadAggregator::event(h.aggregator_type(), h.event_name()),
702 })
703 .collect()
704 }
705
706 fn key(&self) -> String {
707 if let RoutingKey::Value(Some(ref key)) = self.routing_key {
708 return format!("{key}.{}", self.key);
709 }
710
711 self.key.to_owned()
712 }
713
714 async fn process(
715 &self,
716 executor: &E,
717 id: &Ulid,
718 aggregators: &[ReadAggregator],
719 mut rx: Option<&mut tokio::sync::oneshot::Receiver<()>>,
720 ) -> anyhow::Result<()> {
721 let mut interval = interval_at(
722 Instant::now() - Duration::from_millis(400),
723 Duration::from_millis(300),
724 );
725
726 loop {
727 interval.tick().await;
728
729 if !executor.is_subscriber_running(self.key(), *id).await? {
730 return Ok(());
731 }
732
733 let cursor = executor.get_subscriber_cursor(self.key()).await?;
734
735 let timestamp = executor
736 .read(
737 Some(aggregators.to_vec()),
738 Some(self.routing_key.to_owned()),
739 Args::backward(1, None),
740 )
741 .await?
742 .edges
743 .last()
744 .map(|e| e.node.timestamp)
745 .unwrap_or_default();
746
747 let res = executor
748 .read(
749 Some(aggregators.to_vec()),
750 Some(self.routing_key.to_owned()),
751 Args::forward(self.chunk_size, cursor),
752 )
753 .await?;
754
755 if res.edges.is_empty() {
756 return Ok(());
757 }
758
759 let context = Context {
760 context: self.context.clone(),
761 executor,
762 };
763
764 for event in res.edges {
765 if let Some(rx) = rx.as_mut() {
766 if rx.try_recv().is_ok() {
767 tracing::info!(
768 key = self.key(),
769 "Subscription received shutdown signal, stopping gracefull"
770 );
771
772 return Ok(());
773 }
774 }
775
776 tracing::Span::current().record("aggregator_type", &event.node.aggregator_type);
777 tracing::Span::current().record("aggregator_id", &event.node.aggregator_id);
778 tracing::Span::current().record("event", &event.node.name);
779
780 let key = format!("{}_{}", event.node.aggregator_type, event.node.name);
781 let Some(handler) = self.handlers.get(&key) else {
782 panic!("No handler found for {}/{key}", self.key());
783 };
784
785 handler.handle(&context, &event.node).await?;
786
787 executor
788 .acknowledge(
789 self.key(),
790 event.cursor.to_owned(),
791 timestamp - event.node.timestamp,
792 )
793 .await?;
794 }
795 }
796 }
797
798 /// Starts the subscription without retry logic.
799 ///
800 /// Equivalent to calling `start()` with retries disabled.
801 pub async fn unretry_start(self, executor: &E) -> anyhow::Result<Subscription>
802 where
803 E: Clone,
804 {
805 self.without_retry().start(executor).await
806 }
807
808 /// Starts a continuous background subscription.
809 ///
810 /// Returns a [`Subscription`] handle that can be used for graceful shutdown.
811 /// The subscription runs in a spawned tokio task and polls for new events.
812 pub async fn start(self, executor: &E) -> anyhow::Result<Subscription>
813 where
814 E: Clone,
815 {
816 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
817 let executor = executor.clone();
818 let id = Ulid::new();
819 let subscription_id = id;
820
821 executor
822 .upsert_subscriber(self.key(), id.to_owned())
823 .await?;
824
825 let task_handle = tokio::spawn(async move {
826 let read_aggregators = self.read_aggregators();
827 let start = self
828 .delay
829 .map(|d| Instant::now() + d)
830 .unwrap_or_else(Instant::now);
831
832 let mut interval = interval_at(
833 start - Duration::from_millis(1200),
834 Duration::from_millis(1000),
835 );
836
837 loop {
838 if shutdown_rx.try_recv().is_ok() {
839 tracing::info!(
840 key = self.key(),
841 "Subscription received shutdown signal, stopping gracefull"
842 );
843
844 break;
845 }
846
847 interval.tick().await;
848
849 let _ = tracing::error_span!(
850 "start",
851 key = self.key(),
852 aggregator_type = tracing::field::Empty,
853 aggregator_id = tracing::field::Empty,
854 event = tracing::field::Empty,
855 )
856 .entered();
857
858 let result = match self.retry {
859 Some(retry) => {
860 (|| async { self.process(&executor, &id, &read_aggregators, None).await })
861 .retry(ExponentialBuilder::default().with_max_times(retry.into()))
862 .sleep(tokio::time::sleep)
863 .notify(|err, dur| {
864 tracing::error!(
865 error = %err,
866 duration = ?dur,
867 "Failed to process event"
868 );
869 })
870 .await
871 }
872 _ => self.process(&executor, &id, &read_aggregators, None).await,
873 };
874
875 let Err(err) = result else {
876 continue;
877 };
878
879 tracing::error!(error = %err, "Failed to process event");
880
881 if !self.is_accept_failure {
882 break;
883 }
884 }
885 });
886
887 Ok(Subscription {
888 id: subscription_id,
889 task_handle,
890 shutdown_tx,
891 })
892 }
893
894 /// Executes the subscription once without retry logic.
895 ///
896 /// Processes all pending events and returns. Does not poll for new events.
897 pub async fn unretry_execute(self, executor: &E) -> anyhow::Result<()> {
898 self.without_retry().execute(executor).await
899 }
900
901 /// Executes the subscription once, processing all pending events.
902 ///
903 /// Unlike `start()`, this does not run continuously. It processes
904 /// all currently pending events and returns.
905 pub async fn execute(&self, executor: &E) -> anyhow::Result<()> {
906 let id = Ulid::new();
907
908 executor
909 .upsert_subscriber(self.key(), id.to_owned())
910 .await?;
911
912 let read_aggregators = self.read_aggregators();
913
914 let _ = tracing::error_span!(
915 "execute",
916 key = self.key(),
917 aggregator_type = tracing::field::Empty,
918 aggregator_id = tracing::field::Empty,
919 event = tracing::field::Empty,
920 )
921 .entered();
922
923 match self.retry {
924 Some(retry) => {
925 (|| async { self.process(executor, &id, &read_aggregators, None).await })
926 .retry(ExponentialBuilder::default().with_max_times(retry.into()))
927 .sleep(tokio::time::sleep)
928 .notify(|err, dur| {
929 tracing::error!(
930 error = %err,
931 duration = ?dur,
932 "Failed to process event"
933 );
934 })
935 .await
936 }
937 _ => self.process(executor, &id, &read_aggregators, None).await,
938 }
939 }
940}
941
942/// Handle to a running event subscription.
943///
944/// Returned by [`SubscriptionBuilder::start`], this handle provides
945/// the subscription ID and a method for graceful shutdown.
946///
947/// # Example
948///
949/// ```rust,ignore
950/// let subscription = projection
951/// .subscription()
952/// .start(&executor)
953/// .await?;
954///
955/// println!("Started subscription: {}", subscription.id);
956///
957/// // On application shutdown
958/// subscription.shutdown().await?;
959/// ```
960#[derive(Debug)]
961pub struct Subscription {
962 /// Unique ID for this subscription instance
963 pub id: Ulid,
964 task_handle: tokio::task::JoinHandle<()>,
965 shutdown_tx: tokio::sync::oneshot::Sender<()>,
966}
967
968impl Subscription {
969 /// Gracefully shuts down the subscription.
970 ///
971 /// Signals the subscription to stop and waits for it to finish
972 /// processing the current event before returning.
973 pub async fn shutdown(self) -> Result<(), tokio::task::JoinError> {
974 let _ = self.shutdown_tx.send(());
975
976 self.task_handle.await
977 }
978}