evento_core/projection.rs
1//! Projections and event subscriptions.
2//!
3//! This module provides the core building blocks for event sourcing:
4//! - Projections that build read models from events
5//! - Subscriptions that continuously process events
6//! - Loading aggregate state from event streams
7//!
8//! # Key Types
9//!
10//! - [`Projection`] - Defines handlers for building projections
11//! - [`LoadBuilder`] - Loads aggregate state from events
12//! - [`SubscriptionBuilder`] - Builds continuous event subscriptions
13//! - [`Subscription`] - Handle to a running subscription
14//! - [`EventData`] - Typed event with deserialized data and metadata
15//!
16//! # Example
17//!
18//! ```rust,ignore
19//! use evento::projection::Projection;
20//!
21//! // Define a projection with event handlers
22//! let projection = Projection::<AccountView, _>::new("accounts")
23//! .handler(account_opened)
24//! .handler(money_deposited);
25//!
26//! // Load aggregate state
27//! let result = projection
28//! .load::<Account>("account-123")
29//! .execute(&executor)
30//! .await?;
31//!
32//! // Or start a subscription
33//! let subscription = projection
34//! .subscription()
35//! .routing_key("accounts")
36//! .start(&executor)
37//! .await?;
38//! ```
39
40use std::{
41 collections::HashMap,
42 future::Future,
43 ops::{Deref, DerefMut},
44 pin::Pin,
45 time::Duration,
46};
47use tokio::time::{interval_at, Instant};
48use ulid::Ulid;
49
50use backon::{ExponentialBuilder, Retryable};
51
52use crate::{context, cursor::Args, Executor, ReadAggregator};
53
/// Selects which events a read or subscription sees, based on routing key.
///
/// Routing keys partition the event stream so that events can be processed
/// in parallel, or so that a subscription can be limited to one stream.
#[derive(Clone)]
pub enum RoutingKey {
    /// No filtering: every event matches, whatever its routing key.
    All,
    /// Only events carrying this routing key match; `None` selects events
    /// that have no routing key.
    Value(Option<String>),
}
65
66/// Handler context providing access to executor and shared data.
67///
68/// `Context` wraps an [`RwContext`](crate::context::RwContext) for type-safe
69/// data storage and provides access to the executor for database operations.
70///
71/// # Example
72///
73/// ```rust,ignore
74/// #[evento::handler]
75/// async fn my_handler<E: Executor>(
76/// event: Event<MyEventData>,
77/// action: Action<'_, MyView, E>,
78/// ) -> anyhow::Result<()> {
79/// if let Action::Handle(ctx) = action {
80/// // Access shared data
81/// let config: Data<AppConfig> = ctx.extract();
82///
83/// // Use executor for queries
84/// let events = ctx.executor.read(...).await?;
85/// }
86/// Ok(())
87/// }
88/// ```
89#[derive(Clone)]
90pub struct Context<'a, E: Executor> {
91 context: context::RwContext,
92 /// Reference to the executor for database operations
93 pub executor: &'a E,
94}
95
96impl<'a, E: Executor> Deref for Context<'a, E> {
97 type Target = context::RwContext;
98
99 fn deref(&self) -> &Self::Target {
100 &self.context
101 }
102}
103
/// Implemented by aggregate types.
///
/// An aggregate is a root entity in event sourcing. Every aggregate type
/// exposes a unique identifier string that is used for event storage and
/// routing.
///
/// Usually this trait is not written by hand but generated by the
/// `#[evento::aggregator]` macro.
///
/// # Example
///
/// ```rust,ignore
/// #[evento::aggregator("myapp/Account")]
/// #[derive(Default)]
/// pub struct Account {
///     pub balance: i64,
///     pub owner: String,
/// }
/// ```
pub trait Aggregator: Default {
    /// The unique type identifier of this aggregate, e.g. `"myapp/Account"`.
    fn aggregator_type() -> &'static str;
}
125
126/// Trait for event types.
127///
128/// Events represent state changes that have occurred. Each event type
129/// has a name and belongs to an aggregator type.
130///
131/// This trait is typically derived using the `#[evento::aggregator]` macro.
132///
133/// # Example
134///
135/// ```rust,ignore
136/// #[evento::aggregator("myapp/Account")]
137/// #[derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
138/// pub struct AccountOpened {
139/// pub owner: String,
140/// }
141/// ```
142pub trait Event: Aggregator {
143 /// Returns the event name (e.g., "AccountOpened")
144 fn event_name() -> &'static str;
145}
146
147/// Trait for event handlers.
148///
149/// Handlers process events in two modes:
150/// - `handle`: For subscriptions that perform side effects (send emails, update read models)
151/// - `apply`: For loading aggregate state by replaying events
152///
153/// This trait is typically implemented via the `#[evento::handler]` macro.
154pub trait Handler<P: 'static, E: Executor>: Sync + Send {
155 /// Handles an event during subscription processing.
156 ///
157 /// This is called when processing events in a subscription context,
158 /// where side effects like database updates or API calls are appropriate.
159 fn handle<'a>(
160 &'a self,
161 context: &'a Context<'a, E>,
162 event: &'a crate::Event,
163 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;
164
165 /// Applies an event to build projection state.
166 ///
167 /// This is called when loading aggregate state by replaying events.
168 /// It should be a pure function that modifies the projection without side effects.
169 fn apply<'a>(
170 &'a self,
171 projection: &'a mut P,
172 event: &'a crate::Event,
173 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;
174
175 /// Returns the aggregator type this handler processes.
176 fn aggregator_type(&self) -> &'static str;
177 /// Returns the event name this handler processes.
178 fn event_name(&self) -> &'static str;
179}
180
181/// Action passed to event handlers.
182///
183/// Determines whether the handler should apply state changes or
184/// handle the event with side effects.
185pub enum Action<'a, P: 'static, E: Executor> {
186 /// Apply event to projection state (for loading)
187 Apply(&'a mut P),
188 /// Handle event with context (for subscriptions)
189 Handle(&'a Context<'a, E>),
190}
191
192/// Typed event with deserialized data and metadata.
193///
194/// `EventData` wraps a raw [`Event`](crate::Event) and provides typed access
195/// to the deserialized event data and metadata. It implements `Deref` to
196/// provide access to the underlying event fields (id, timestamp, version, etc.).
197///
198/// # Type Parameters
199///
200/// - `D`: The event data type (e.g., `AccountOpened`)
201/// - `M`: The metadata type (defaults to `bool` for no metadata)
202///
203/// # Example
204///
205/// ```rust,ignore
206/// use evento::metadata::Event;
207///
208/// #[evento::handler]
209/// async fn handle_deposit<E: Executor>(
210/// event: Event<MoneyDeposited>,
211/// action: Action<'_, AccountView, E>,
212/// ) -> anyhow::Result<()> {
213/// // Access typed data
214/// println!("Amount: {}", event.data.amount);
215///
216/// // Access metadata
217/// if let Ok(user) = event.metadata.user() {
218/// println!("By user: {}", user);
219/// }
220///
221/// // Access underlying event fields via Deref
222/// println!("Event ID: {}", event.id);
223/// println!("Version: {}", event.version);
224///
225/// Ok(())
226/// }
227/// ```
228pub struct EventData<D, M = bool> {
229 event: crate::Event,
230 /// The typed event data
231 pub data: D,
232 /// The typed event metadata
233 pub metadata: M,
234}
235
236impl<D, M> Deref for EventData<D, M> {
237 type Target = crate::Event;
238
239 fn deref(&self) -> &Self::Target {
240 &self.event
241 }
242}
243
244impl<D, M> TryFrom<&crate::Event> for EventData<D, M>
245where
246 D: rkyv::Archive,
247 M: rkyv::Archive,
248 <D as rkyv::Archive>::Archived: for<'a> rkyv::bytecheck::CheckBytes<
249 rkyv::rancor::Strategy<
250 rkyv::validation::Validator<
251 rkyv::validation::archive::ArchiveValidator<'a>,
252 rkyv::validation::shared::SharedValidator,
253 >,
254 rkyv::rancor::Error,
255 >,
256 > + rkyv::Deserialize<D, rkyv::rancor::Strategy<rkyv::de::Pool, rkyv::rancor::Error>>,
257 <M as rkyv::Archive>::Archived: for<'a> rkyv::bytecheck::CheckBytes<
258 rkyv::rancor::Strategy<
259 rkyv::validation::Validator<
260 rkyv::validation::archive::ArchiveValidator<'a>,
261 rkyv::validation::shared::SharedValidator,
262 >,
263 rkyv::rancor::Error,
264 >,
265 > + rkyv::Deserialize<M, rkyv::rancor::Strategy<rkyv::de::Pool, rkyv::rancor::Error>>,
266{
267 type Error = rkyv::rancor::Error;
268
269 fn try_from(value: &crate::Event) -> Result<Self, Self::Error> {
270 let data = rkyv::from_bytes::<D, rkyv::rancor::Error>(&value.data)?;
271 let metadata = rkyv::from_bytes::<M, rkyv::rancor::Error>(&value.metadata)?;
272 Ok(EventData {
273 data,
274 metadata,
275 event: value.clone(),
276 })
277 }
278}
279
280/// Container for event handlers that build a projection.
281///
282/// A `Projection` groups related event handlers together and provides
283/// methods to load aggregate state or create subscriptions.
284///
285/// # Type Parameters
286///
287/// - `P`: The projection/view type being built
288/// - `E`: The executor type for database operations
289///
290/// # Example
291///
292/// ```rust,ignore
293/// let projection = Projection::<AccountView, _>::new("accounts")
294/// .handler(account_opened)
295/// .handler(money_deposited)
296/// .handler(money_withdrawn);
297///
298/// // Use for loading state
299/// let state = projection.clone()
300/// .load::<Account>("account-123")
301/// .execute(&executor)
302/// .await?;
303///
304/// // Or create a subscription
305/// let sub = projection
306/// .subscription()
307/// .start(&executor)
308/// .await?;
309/// ```
310pub struct Projection<P: 'static, E: Executor> {
311 key: String,
312 handlers: HashMap<String, Box<dyn Handler<P, E>>>,
313}
314
315impl<P: 'static, E: Executor> Projection<P, E> {
316 /// Creates a new projection with the given key.
317 ///
318 /// The key is used as the subscription identifier for cursor tracking.
319 pub fn new(key: impl Into<String>) -> Self {
320 Self {
321 key: key.into(),
322 handlers: HashMap::new(),
323 }
324 }
325
326 /// Registers an event handler with this projection.
327 ///
328 /// # Panics
329 ///
330 /// Panics if a handler for the same event type is already registered.
331 pub fn handler<H: Handler<P, E> + 'static>(mut self, h: H) -> Self {
332 let key = format!("{}_{}", h.aggregator_type(), h.event_name());
333 if self.handlers.insert(key.to_owned(), Box::new(h)).is_some() {
334 panic!("Cannot register event handler: key {} already exists", key);
335 }
336 self
337 }
338
339 /// Creates a builder for loading aggregate state.
340 ///
341 /// This consumes the projection and returns a [`LoadBuilder`] configured
342 /// to load the state for the specified aggregate.
343 ///
344 /// # Type Parameters
345 ///
346 /// - `A`: The aggregate type to load
347 pub fn load<A: Aggregator>(self, id: impl Into<String>) -> LoadBuilder<P, E>
348 where
349 P: Snapshot + Default,
350 {
351 let id = id.into();
352 let mut aggregators = HashMap::new();
353 aggregators.insert(A::aggregator_type().to_owned(), id.to_owned());
354
355 LoadBuilder {
356 key: self.key.to_owned(),
357 id,
358 aggregators,
359 handlers: self.handlers,
360 context: Default::default(),
361 }
362 }
363
364 /// Creates a builder for a continuous event subscription.
365 ///
366 /// This consumes the projection and returns a [`SubscriptionBuilder`]
367 /// that can be configured and started.
368 pub fn subscription(self) -> SubscriptionBuilder<P, E> {
369 SubscriptionBuilder {
370 key: self.key.to_owned(),
371 context: Default::default(),
372 handlers: self.handlers,
373 delay: None,
374 retry: Some(30),
375 chunk_size: 300,
376 is_accept_failure: false,
377 routing_key: RoutingKey::Value(None),
378 aggregators: Default::default(),
379 }
380 }
381}
382
/// Outcome of loading an aggregate's state.
///
/// Holds the rebuilt projection state together with the current version and
/// routing key. `Deref` and `DerefMut` give transparent access to the inner
/// item.
///
/// # Example
///
/// ```rust,ignore
/// let result: LoadResult<AccountView> = projection
///     .load::<Account>("account-123")
///     .execute(&executor)
///     .await?
///     .expect("Account not found");
///
/// // Access inner item via Deref
/// println!("Balance: {}", result.balance);
///
/// // Access metadata
/// println!("Version: {}", result.version);
/// println!("Routing key: {:?}", result.routing_key);
/// ```
#[derive(Debug, Clone, Default)]
pub struct LoadResult<A> {
    /// The loaded projection/view state
    pub item: A,
    /// Current version of the aggregate
    pub version: u16,
    /// Routing key for the aggregate (if set)
    pub routing_key: Option<String>,
}

/// Read access to the wrapped item.
impl<A> Deref for LoadResult<A> {
    type Target = A;

    fn deref(&self) -> &A {
        &self.item
    }
}

/// Write access to the wrapped item.
impl<A> DerefMut for LoadResult<A> {
    fn deref_mut(&mut self) -> &mut A {
        &mut self.item
    }
}

/// Shorthand for a load result that may be absent.
pub type OptionLoadResult<A> = Option<LoadResult<A>>;
430
431/// Trait for types that can be restored from snapshots.
432///
433/// Snapshots provide a performance optimization by storing pre-computed
434/// state, avoiding the need to replay all events from the beginning.
435///
436/// This trait is typically implemented via the `#[evento::snapshot]` macro.
437///
438/// # Example
439///
440/// ```rust,ignore
441/// #[evento::snapshot]
442/// #[derive(Default)]
443/// pub struct AccountView {
444/// pub balance: i64,
445/// pub owner: String,
446/// }
447///
448/// // The macro generates the restore implementation that loads
449/// // from a snapshot table if available
450/// ```
451pub trait Snapshot: Sized {
452 /// Restores state from a snapshot if available.
453 ///
454 /// Returns `None` if no snapshot exists for the given ID.
455 fn restore<'a>(
456 context: &'a context::RwContext,
457 id: String,
458 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OptionLoadResult<Self>>> + Send + 'a>>;
459}
460
461/// Builder for loading aggregate state from events.
462///
463/// Created via [`Projection::load`], this builder configures how to
464/// load an aggregate's state by replaying events.
465///
466/// # Example
467///
468/// ```rust,ignore
469/// let result = projection
470/// .load::<Account>("account-123")
471/// .data(app_config) // Add shared data
472/// .aggregator::<User>("user-456") // Add related aggregate
473/// .execute(&executor)
474/// .await?;
475/// ```
476pub struct LoadBuilder<P: Snapshot + Default + 'static, E: Executor> {
477 key: String,
478 id: String,
479 aggregators: HashMap<String, String>,
480 handlers: HashMap<String, Box<dyn Handler<P, E>>>,
481 context: context::RwContext,
482}
483
484impl<P: Snapshot + Default + 'static, E: Executor> LoadBuilder<P, E> {
485 /// Adds shared data to the load context.
486 ///
487 /// Data added here is accessible in handlers via the context.
488 pub fn data<D: Send + Sync + 'static>(&mut self, v: D) -> &mut Self {
489 self.context.insert(v);
490
491 self
492 }
493
494 /// Adds a related aggregate to load events from.
495 ///
496 /// Use this when the projection needs events from multiple aggregates.
497 pub fn aggregator<A: Aggregator>(&mut self, id: impl Into<String>) -> &mut Self {
498 self.aggregators
499 .insert(A::aggregator_type().to_owned(), id.into());
500
501 self
502 }
503
504 /// Executes the load operation, returning the rebuilt state.
505 ///
506 /// Returns `None` if no events exist for the aggregate.
507 /// Returns `Err` if there are too many events to process in one batch.
508 pub async fn execute(&self, executor: &E) -> anyhow::Result<Option<LoadResult<P>>> {
509 let context = Context {
510 context: self.context.clone(),
511 executor,
512 };
513
514 let cursor = executor.get_subscriber_cursor(self.key.to_owned()).await?;
515 let loaded = P::restore(&context, self.id.to_owned()).await?;
516 let has_loaded = loaded.is_some();
517 let mut snapshot = loaded.unwrap_or_default();
518
519 let read_aggregators = self
520 .handlers
521 .values()
522 .map(|h| match self.aggregators.get(h.aggregator_type()) {
523 Some(id) => ReadAggregator {
524 aggregator_type: h.aggregator_type().to_owned(),
525 aggregator_id: Some(id.to_owned()),
526 name: None,
527 },
528 _ => ReadAggregator::event(h.aggregator_type(), h.event_name()),
529 })
530 .collect::<Vec<_>>();
531
532 let events = executor
533 .read(
534 Some(read_aggregators.to_vec()),
535 None,
536 Args::forward(100, cursor.clone()),
537 )
538 .await?;
539
540 for event in events.edges.iter() {
541 let key = format!("{}_{}", event.node.aggregator_type, event.node.name);
542 let Some(handler) = self.handlers.get(&key) else {
543 tracing::debug!("No handler found for {}/{key}", self.key);
544 continue;
545 };
546
547 handler.apply(&mut snapshot, &event.node).await?;
548 }
549
550 if events.page_info.has_next_page {
551 anyhow::bail!("Too busy");
552 }
553
554 if let Some(event) = events.edges.last() {
555 snapshot.version = event.node.version;
556 snapshot.routing_key = event.node.routing_key.to_owned();
557
558 return Ok(Some(snapshot));
559 }
560
561 if !has_loaded {
562 return Ok(None);
563 }
564
565 let events = executor
566 .read(
567 Some(read_aggregators.to_vec()),
568 None,
569 Args::backward(1, None),
570 )
571 .await?;
572
573 if let Some(event) = events.edges.first() {
574 snapshot.version = event.node.version;
575 snapshot.routing_key = event.node.routing_key.to_owned();
576
577 return Ok(Some(snapshot));
578 }
579
580 Ok(None)
581 }
582}
583
584/// Builder for creating event subscriptions.
585///
586/// Created via [`Projection::subscription`], this builder configures
587/// a continuous event processing subscription with retry logic,
588/// routing key filtering, and graceful shutdown support.
589///
590/// # Example
591///
592/// ```rust,ignore
593/// let subscription = projection
594/// .subscription()
595/// .routing_key("accounts")
596/// .chunk_size(100)
597/// .retry(5)
598/// .delay(Duration::from_secs(10))
599/// .start(&executor)
600/// .await?;
601///
602/// // Later, gracefully shutdown
603/// subscription.shutdown().await?;
604/// ```
605pub struct SubscriptionBuilder<P: 'static, E: Executor> {
606 key: String,
607 handlers: HashMap<String, Box<dyn Handler<P, E>>>,
608 context: context::RwContext,
609 routing_key: RoutingKey,
610 delay: Option<Duration>,
611 chunk_size: u16,
612 is_accept_failure: bool,
613 retry: Option<u8>,
614 aggregators: HashMap<String, String>,
615}
616
617impl<P, E: Executor + 'static> SubscriptionBuilder<P, E> {
618 /// Allows the subscription to continue after handler failures.
619 ///
620 /// By default, subscriptions stop on the first error. With this flag,
621 /// errors are logged but processing continues.
622 pub fn accept_failure(mut self) -> Self {
623 self.is_accept_failure = true;
624
625 self
626 }
627
628 /// Sets the number of events to process per batch.
629 ///
630 /// Default is 300.
631 pub fn chunk_size(mut self, v: u16) -> Self {
632 self.chunk_size = v;
633
634 self
635 }
636
637 /// Sets a delay before starting the subscription.
638 ///
639 /// Useful for staggering subscription starts in multi-node deployments.
640 pub fn delay(mut self, v: Duration) -> Self {
641 self.delay = Some(v);
642
643 self
644 }
645
646 /// Filters events by routing key.
647 ///
648 /// Only events with the matching routing key will be processed.
649 pub fn routing_key(mut self, v: impl Into<String>) -> Self {
650 self.routing_key = RoutingKey::Value(Some(v.into()));
651
652 self
653 }
654
655 /// Sets the maximum number of retries on failure.
656 ///
657 /// Uses exponential backoff. Default is 30.
658 pub fn retry(mut self, v: u8) -> Self {
659 self.retry = Some(v);
660
661 self
662 }
663
664 fn without_retry(mut self) -> Self {
665 self.retry = None;
666
667 self
668 }
669
670 /// Processes all events regardless of routing key.
671 pub fn all(mut self) -> Self {
672 self.routing_key = RoutingKey::All;
673
674 self
675 }
676
677 /// Adds a related aggregate to process events from.
678 pub fn aggregator<A: Aggregator>(mut self, id: impl Into<String>) -> Self {
679 self.aggregators
680 .insert(A::aggregator_type().to_owned(), id.into());
681
682 self
683 }
684
685 fn read_aggregators(&self) -> Vec<ReadAggregator> {
686 self.handlers
687 .values()
688 .map(|h| match self.aggregators.get(h.aggregator_type()) {
689 Some(id) => ReadAggregator {
690 aggregator_type: h.aggregator_type().to_owned(),
691 aggregator_id: Some(id.to_owned()),
692 name: Some(h.event_name().to_owned()),
693 },
694 _ => ReadAggregator::event(h.aggregator_type(), h.event_name()),
695 })
696 .collect()
697 }
698
699 fn key(&self) -> String {
700 if let RoutingKey::Value(Some(ref key)) = self.routing_key {
701 return format!("{key}.{}", self.key);
702 }
703
704 self.key.to_owned()
705 }
706
707 async fn process(
708 &self,
709 executor: &E,
710 id: &Ulid,
711 aggregators: &[ReadAggregator],
712 mut rx: Option<&mut tokio::sync::oneshot::Receiver<()>>,
713 ) -> anyhow::Result<()> {
714 let mut interval = interval_at(
715 Instant::now() - Duration::from_millis(400),
716 Duration::from_millis(300),
717 );
718
719 loop {
720 interval.tick().await;
721
722 if !executor.is_subscriber_running(self.key(), *id).await? {
723 return Ok(());
724 }
725
726 let cursor = executor.get_subscriber_cursor(self.key()).await?;
727
728 let timestamp = executor
729 .read(
730 Some(aggregators.to_vec()),
731 Some(self.routing_key.to_owned()),
732 Args::backward(1, None),
733 )
734 .await?
735 .edges
736 .last()
737 .map(|e| e.node.timestamp)
738 .unwrap_or_default();
739
740 let res = executor
741 .read(
742 Some(aggregators.to_vec()),
743 Some(self.routing_key.to_owned()),
744 Args::forward(self.chunk_size, cursor),
745 )
746 .await?;
747
748 if res.edges.is_empty() {
749 return Ok(());
750 }
751
752 let context = Context {
753 context: self.context.clone(),
754 executor,
755 };
756
757 for event in res.edges {
758 if let Some(rx) = rx.as_mut() {
759 if rx.try_recv().is_ok() {
760 tracing::info!(
761 key = self.key(),
762 "Subscription received shutdown signal, stopping gracefull"
763 );
764
765 return Ok(());
766 }
767 }
768
769 tracing::Span::current().record("aggregator_type", &event.node.aggregator_type);
770 tracing::Span::current().record("aggregator_id", &event.node.aggregator_id);
771 tracing::Span::current().record("event", &event.node.name);
772
773 let key = format!("{}_{}", event.node.aggregator_type, event.node.name);
774 let Some(handler) = self.handlers.get(&key) else {
775 panic!("No handler found for {}/{key}", self.key());
776 };
777
778 handler.handle(&context, &event.node).await?;
779
780 executor
781 .acknowledge(
782 self.key(),
783 event.cursor.to_owned(),
784 timestamp - event.node.timestamp,
785 )
786 .await?;
787 }
788 }
789 }
790
791 /// Starts the subscription without retry logic.
792 ///
793 /// Equivalent to calling `start()` with retries disabled.
794 pub async fn unretry_start(self, executor: &E) -> anyhow::Result<Subscription>
795 where
796 E: Clone,
797 {
798 self.without_retry().start(executor).await
799 }
800
801 /// Starts a continuous background subscription.
802 ///
803 /// Returns a [`Subscription`] handle that can be used for graceful shutdown.
804 /// The subscription runs in a spawned tokio task and polls for new events.
805 pub async fn start(self, executor: &E) -> anyhow::Result<Subscription>
806 where
807 E: Clone,
808 {
809 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
810 let executor = executor.clone();
811 let id = Ulid::new();
812 let subscription_id = id;
813
814 executor
815 .upsert_subscriber(self.key(), id.to_owned())
816 .await?;
817
818 let task_handle = tokio::spawn(async move {
819 let read_aggregators = self.read_aggregators();
820 let start = self
821 .delay
822 .map(|d| Instant::now() + d)
823 .unwrap_or_else(Instant::now);
824
825 let mut interval = interval_at(
826 start - Duration::from_millis(1200),
827 Duration::from_millis(1000),
828 );
829
830 loop {
831 if shutdown_rx.try_recv().is_ok() {
832 tracing::info!(
833 key = self.key(),
834 "Subscription received shutdown signal, stopping gracefull"
835 );
836
837 break;
838 }
839
840 interval.tick().await;
841
842 let _ = tracing::error_span!(
843 "start",
844 key = self.key(),
845 aggregator_type = tracing::field::Empty,
846 aggregator_id = tracing::field::Empty,
847 event = tracing::field::Empty,
848 )
849 .entered();
850
851 let result = match self.retry {
852 Some(retry) => {
853 (|| async { self.process(&executor, &id, &read_aggregators, None).await })
854 .retry(ExponentialBuilder::default().with_max_times(retry.into()))
855 .sleep(tokio::time::sleep)
856 .notify(|err, dur| {
857 tracing::error!(
858 error = %err,
859 duration = ?dur,
860 "Failed to process event"
861 );
862 })
863 .await
864 }
865 _ => self.process(&executor, &id, &read_aggregators, None).await,
866 };
867
868 let Err(err) = result else {
869 continue;
870 };
871
872 tracing::error!(error = %err, "Failed to process event");
873
874 if !self.is_accept_failure {
875 break;
876 }
877 }
878 });
879
880 Ok(Subscription {
881 id: subscription_id,
882 task_handle,
883 shutdown_tx,
884 })
885 }
886
887 /// Executes the subscription once without retry logic.
888 ///
889 /// Processes all pending events and returns. Does not poll for new events.
890 pub async fn unretry_execute(self, executor: &E) -> anyhow::Result<()> {
891 self.without_retry().execute(executor).await
892 }
893
894 /// Executes the subscription once, processing all pending events.
895 ///
896 /// Unlike `start()`, this does not run continuously. It processes
897 /// all currently pending events and returns.
898 pub async fn execute(&self, executor: &E) -> anyhow::Result<()> {
899 let id = Ulid::new();
900
901 executor
902 .upsert_subscriber(self.key(), id.to_owned())
903 .await?;
904
905 let read_aggregators = self.read_aggregators();
906
907 let _ = tracing::error_span!(
908 "execute",
909 key = self.key(),
910 aggregator_type = tracing::field::Empty,
911 aggregator_id = tracing::field::Empty,
912 event = tracing::field::Empty,
913 )
914 .entered();
915
916 match self.retry {
917 Some(retry) => {
918 (|| async { self.process(executor, &id, &read_aggregators, None).await })
919 .retry(ExponentialBuilder::default().with_max_times(retry.into()))
920 .sleep(tokio::time::sleep)
921 .notify(|err, dur| {
922 tracing::error!(
923 error = %err,
924 duration = ?dur,
925 "Failed to process event"
926 );
927 })
928 .await
929 }
930 _ => self.process(executor, &id, &read_aggregators, None).await,
931 }
932 }
933}
934
935/// Handle to a running event subscription.
936///
937/// Returned by [`SubscriptionBuilder::start`], this handle provides
938/// the subscription ID and a method for graceful shutdown.
939///
940/// # Example
941///
942/// ```rust,ignore
943/// let subscription = projection
944/// .subscription()
945/// .start(&executor)
946/// .await?;
947///
948/// println!("Started subscription: {}", subscription.id);
949///
950/// // On application shutdown
951/// subscription.shutdown().await?;
952/// ```
953#[derive(Debug)]
954pub struct Subscription {
955 /// Unique ID for this subscription instance
956 pub id: Ulid,
957 task_handle: tokio::task::JoinHandle<()>,
958 shutdown_tx: tokio::sync::oneshot::Sender<()>,
959}
960
961impl Subscription {
962 /// Gracefully shuts down the subscription.
963 ///
964 /// Signals the subscription to stop and waits for it to finish
965 /// processing the current event before returning.
966 pub async fn shutdown(self) -> Result<(), tokio::task::JoinError> {
967 let _ = self.shutdown_tx.send(());
968
969 self.task_handle.await
970 }
971}