Skip to main content

eventcore_types/
projection.rs

1//! Projection types and traits for building read models from event streams.
2//!
3//! This module provides the core abstractions for event projection:
4//! - `Projector`: Trait for transforming events into read model updates
5//! - `EventReader`: Trait for reading events globally for projections
6//! - `StreamPosition`: Global position in the event stream
7
8use crate::store::StreamPrefix;
9use nutype::nutype;
10use std::future::Future;
11use uuid::Uuid;
12
13/// Context provided to error handler when event processing fails.
14///
15/// This struct bundles together all the information needed to make
16/// informed decisions about how to handle a projection failure.
17///
18/// # Type Parameters
19///
20/// - `E`: The error type returned by the projector's `apply()` method
21///
22/// # Fields
23///
24/// - `error`: Reference to the error that occurred
25/// - `position`: Global stream position where the failure occurred
26/// - `retry_count`: Number of times this event has been retried (0 on first failure)
27#[derive(Debug)]
28pub struct FailureContext<'a, E> {
29    /// Reference to the error that occurred during event processing.
30    pub error: &'a E,
31    /// Global stream position of the event that failed to process.
32    pub position: StreamPosition,
33    /// Number of retry attempts so far (0 on initial failure).
34    pub retry_count: RetryCount,
35}
36
37/// Strategy for handling event processing failures.
38///
39/// When a projector's `apply()` method returns an error, the `on_error()`
40/// callback determines how the projection runner should respond. This enum
41/// represents the available failure strategies.
42///
43/// # Variants
44///
45/// - `Fatal`: Stop processing immediately and return the error
46/// - `Skip`: Log the error and continue processing the next event
47/// - `Retry`: Attempt to reprocess the event according to retry configuration
48///
49/// # Example
50///
51/// ```ignore
52/// fn on_error(
53///     &mut self,
54///     ctx: FailureContext<Self::Error>,
55/// ) -> FailureStrategy {
56///     match ctx.error {
57///         MyError::Transient(_) if ctx.retry_count < 3 => FailureStrategy::Retry,
58///         MyError::PoisonEvent(_) => FailureStrategy::Skip,
59///         _ => FailureStrategy::Fatal,
60///     }
61/// }
62/// ```
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64pub enum FailureStrategy {
65    /// Stop processing immediately and return the error to the caller.
66    ///
67    /// Use this when:
68    /// - The error is unrecoverable (e.g., database schema mismatch)
69    /// - The projector requires manual intervention
70    /// - Continuing would corrupt the read model
71    Fatal,
72
73    /// Skip this event and continue processing the next one.
74    ///
75    /// Use this when:
76    /// - The event is malformed or invalid (poison event)
77    /// - Processing this event is not critical
78    /// - Continuing without this event is acceptable
79    Skip,
80
81    /// Retry processing this event according to retry configuration.
82    ///
83    /// Use this when:
84    /// - The error is likely transient (e.g., network timeout)
85    /// - Retrying might succeed
86    /// - The event is important and should not be skipped
87    Retry,
88}
89
90/// Global stream position representing a location in the ordered event log.
91///
92/// StreamPosition uniquely identifies a position in the global event stream
93/// across all individual streams. Used by projectors to track progress and
94/// enable resumable event processing.
95///
96/// Positions are UUID7 values (timestamp-ordered UUIDs) assigned at event
97/// append time. They are monotonically increasing and globally sortable.
98#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Display))]
99pub struct StreamPosition(Uuid);
100
101/// Trait for transforming events into read model updates.
102///
103/// Projectors consume events from the event store and update read models.
104/// They implement the "Q" (Query) side of CQRS by building denormalized
105/// views optimized for reading.
106///
107/// # Type Parameters
108///
109/// - `Event`: The domain event type this projector handles
110/// - `Error`: The error type returned when projection fails
111/// - `Context`: Shared context for database connections, caches, etc.
112///
113/// # Required Methods
114///
115/// - `apply`: Process a single event and update the read model
116/// - `name`: Return a unique identifier for this projector
117///
118/// # Example
119///
120/// ```ignore
121/// struct AccountBalanceProjector {
122///     balances: HashMap<AccountId, Money>,
123/// }
124///
125/// impl Projector for AccountBalanceProjector {
126///     type Event = AccountEvent;
127///     type Error = Infallible;
128///     type Context = ();
129///
130///     fn apply(
131///         &mut self,
132///         event: Self::Event,
133///         _position: StreamPosition,
134///         _ctx: &mut Self::Context,
135///     ) -> Result<(), Self::Error> {
136///         match event {
137///             AccountEvent::Deposited { account_id, amount } => {
138///                 *self.balances.entry(account_id).or_default() += amount;
139///             }
140///             AccountEvent::Withdrawn { account_id, amount } => {
141///                 *self.balances.entry(account_id).or_default() -= amount;
142///             }
143///         }
144///         Ok(())
145///     }
146///
147///     fn name(&self) -> &str {
148///         "account-balance"
149///     }
150/// }
151/// ```
152pub trait Projector {
153    /// The domain event type this projector handles.
154    type Event;
155
156    /// The error type returned when projection fails.
157    type Error;
158
159    /// Shared context for database connections, caches, etc.
160    type Context;
161
162    /// Process a single event and update the read model.
163    ///
164    /// This method is called for each event in stream order. Implementations
165    /// should update their read model state based on the event content.
166    ///
167    /// # Parameters
168    ///
169    /// - `event`: The domain event to process
170    /// - `position`: The global stream position of this event
171    /// - `ctx`: Mutable reference to shared context
172    ///
173    /// # Returns
174    ///
175    /// - `Ok(())`: Event was successfully processed
176    /// - `Err(Self::Error)`: Projection failed (triggers error handling)
177    fn apply(
178        &mut self,
179        event: Self::Event,
180        position: StreamPosition,
181        ctx: &mut Self::Context,
182    ) -> Result<(), Self::Error>;
183
184    /// Return a unique identifier for this projector.
185    ///
186    /// The name is used for:
187    /// - Logging and tracing
188    /// - Checkpoint storage (to resume from last position)
189    /// - Coordination (leader election key)
190    ///
191    /// Names should be stable across deployments. Changing a projector's
192    /// name will cause it to reprocess all events from the beginning.
193    fn name(&self) -> &str;
194
195    /// Handle event processing errors and determine failure strategy.
196    ///
197    /// Called when `apply()` returns an error. The projector can inspect
198    /// the error context and decide how the runner should respond.
199    ///
200    /// # Parameters
201    ///
202    /// - `ctx`: Context containing the error, position, and retry count
203    ///
204    /// # Returns
205    ///
206    /// The failure strategy the runner should use:
207    /// - `FailureStrategy::Fatal`: Stop processing and return error
208    /// - `FailureStrategy::Skip`: Skip this event and continue
209    /// - `FailureStrategy::Retry`: Retry processing this event
210    ///
211    /// # Default Implementation
212    ///
213    /// Returns `FailureStrategy::Fatal` for all errors. This is the safest
214    /// default - projectors that need different behavior should override
215    /// this method.
216    ///
217    /// # Example
218    ///
219    /// ```ignore
220    /// fn on_error(
221    ///     &mut self,
222    ///     ctx: FailureContext<Self::Error>,
223    /// ) -> FailureStrategy {
224    ///     match ctx.error {
225    ///         MyError::Transient(_) if ctx.retry_count < 3 => FailureStrategy::Retry,
226    ///         MyError::PoisonEvent(_) => FailureStrategy::Skip,
227    ///         _ => FailureStrategy::Fatal,
228    ///     }
229    /// }
230    /// ```
231    fn on_error(&mut self, _ctx: FailureContext<'_, Self::Error>) -> FailureStrategy {
232        FailureStrategy::Fatal
233    }
234}
235
236/// Batch size domain type for limiting query results.
237///
238/// BatchSize represents the maximum number of events to return in a single
239/// query. Callers are responsible for choosing appropriate batch sizes based
240/// on their memory constraints and use case requirements.
241///
242/// A batch size of zero is valid and will return an empty result set.
243///
244/// # Examples
245///
246/// ```ignore
247/// use eventcore_types::projection::BatchSize;
248///
249/// let small = BatchSize::new(100);
250/// let large = BatchSize::new(1_000_000);
251/// let empty = BatchSize::new(0);  // Valid - returns no events
252/// ```
253#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Display))]
254pub struct BatchSize(usize);
255
256/// Maximum number of retry attempts for event processing.
257///
258/// MaxRetryAttempts represents the maximum number of times to retry processing
259/// a failed event before escalating to a fatal error. A value of 0 means no
260/// retries are attempted.
261///
262/// # Examples
263///
264/// ```ignore
265/// use eventcore_types::projection::MaxRetryAttempts;
266///
267/// let no_retries = MaxRetryAttempts::new(0);
268/// let standard = MaxRetryAttempts::new(3);
269/// let aggressive = MaxRetryAttempts::new(10);
270/// let very_aggressive = MaxRetryAttempts::new(1000); // Library doesn't impose limits
271/// ```
272#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Display, Into))]
273pub struct MaxRetryAttempts(u32);
274
275/// Backoff multiplier for exponential retry delays.
276///
277/// BackoffMultiplier represents the factor by which retry delays grow on each
278/// attempt. A value of 1.0 means constant delay (no backoff), while values
279/// greater than 1.0 implement exponential backoff.
280///
281/// The minimum value of 1.0 prevents decreasing delays, which would not make
282/// sense for retry backoff. Common values are 2.0 (double each time) or 1.5
283/// (50% increase).
284///
285/// # Examples
286///
287/// ```ignore
288/// use eventcore_types::projection::BackoffMultiplier;
289///
290/// let constant = BackoffMultiplier::try_new(1.0).expect("1.0 is valid");  // No backoff
291/// let standard = BackoffMultiplier::try_new(2.0).expect("2.0 is valid");  // Double each time
292/// let gentle = BackoffMultiplier::try_new(1.5).expect("1.5 is valid");    // 50% increase
293///
294/// // Values below 1.0 are rejected
295/// assert!(BackoffMultiplier::try_new(0.5).is_err());
296/// ```
297#[nutype(
298    validate(greater_or_equal = 1.0),
299    derive(Debug, Clone, Copy, PartialEq, PartialOrd, Display, Into)
300)]
301pub struct BackoffMultiplier(f64);
302
303/// Maximum number of consecutive poll failures before stopping.
304///
305/// MaxConsecutiveFailures represents the threshold for consecutive errors
306/// during event polling. Must be at least 1, enforced by using NonZeroU32
307/// as the underlying type.
308///
309/// # Examples
310///
311/// ```ignore
312/// use eventcore_types::projection::MaxConsecutiveFailures;
313/// use std::num::NonZeroU32;
314///
315/// let lenient = MaxConsecutiveFailures::new(NonZeroU32::new(10).expect("10 is non-zero"));
316/// let strict = MaxConsecutiveFailures::new(NonZeroU32::new(3).expect("3 is non-zero"));
317///
318/// // Zero failures not allowed by type system
319/// // let zero = NonZeroU32::new(0); // Returns None
320/// ```
321#[nutype(derive(
322    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Display, AsRef, Into
323))]
324pub struct MaxConsecutiveFailures(std::num::NonZeroU32);
325
326/// Maximum number of retry attempts.
327///
328/// MaxRetries represents the maximum number of times to retry an operation
329/// before giving up. A value of 0 means no retries (fail immediately).
330///
331/// # Examples
332///
333/// ```ignore
334/// use eventcore_types::projection::MaxRetries;
335///
336/// let no_retry = MaxRetries::new(0);
337/// let standard = MaxRetries::new(3);
338/// let aggressive = MaxRetries::new(10);
339/// let very_aggressive = MaxRetries::new(1000); // Library doesn't impose limits
340/// ```
341#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Display, Into))]
342pub struct MaxRetries(u32);
343
344/// Delay in milliseconds for retry or backoff operations.
345///
346/// DelayMilliseconds represents a time delay expressed in milliseconds.
347/// Used for retry delays, backoff intervals, and polling intervals.
348///
349/// # Examples
350///
351/// ```ignore
352/// use eventcore_types::projection::DelayMilliseconds;
353///
354/// let short = DelayMilliseconds::new(100);
355/// let medium = DelayMilliseconds::new(1000);
356/// let long = DelayMilliseconds::new(5000);
357/// ```
358#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Display, Into))]
359pub struct DelayMilliseconds(u64);
360
361/// Attempt number for retry operations (1-based).
362///
363/// AttemptNumber represents which attempt is currently being made, starting
364/// from 1 for the first attempt. Must be at least 1, enforced by using NonZeroU32
365/// as the underlying type.
366///
367/// # Examples
368///
369/// ```ignore
370/// use eventcore_types::projection::AttemptNumber;
371/// use std::num::NonZeroU32;
372///
373/// let first_attempt = AttemptNumber::new(NonZeroU32::new(1).expect("1 is non-zero"));
374/// let retry_attempt = AttemptNumber::new(NonZeroU32::new(3).expect("3 is non-zero"));
375///
376/// // Zero attempts not allowed by type system
377/// // let zero = NonZeroU32::new(0); // Returns None
378/// ```
379#[nutype(derive(
380    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Display, AsRef, Into
381))]
382pub struct AttemptNumber(std::num::NonZeroU32);
383
384/// Count of retry attempts that have been made (0-based).
385///
386/// RetryCount represents how many retry attempts have been made so far.
387/// Starts at 0 on the initial failure (before any retries).
388///
389/// # Examples
390///
391/// ```ignore
392/// use eventcore_types::projection::RetryCount;
393///
394/// let initial_failure = RetryCount::new(0);
395/// let after_first_retry = RetryCount::new(1);
396/// let after_three_retries = RetryCount::new(3);
397/// ```
398#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Display, Into))]
399pub struct RetryCount(u32);
400
401/// Pagination parameters for reading events.
402///
403/// EventPage bundles together the cursor position and page size for paginating
404/// through events. This separates pagination concerns from filtering concerns.
405///
406/// # Examples
407///
408/// ```ignore
409/// use eventcore_types::projection::{EventPage, BatchSize};
410///
411/// // First page
412/// let page = EventPage::first(BatchSize::new(100));
413/// let events = reader.read_events(filter, page).await?;
414///
415/// // Next page using the last event's position
416/// if let Some(next_page) = page.next_from_results(&events) {
417///     let more = reader.read_events(filter, next_page).await?;
418/// }
419/// ```
420#[derive(Debug, Clone, Copy, PartialEq, Eq)]
421pub struct EventPage {
422    after_position: Option<StreamPosition>,
423    limit: BatchSize,
424}
425
426impl EventPage {
427    /// Create the first page with the given limit.
428    ///
429    /// Starts reading from the beginning of the event stream.
430    pub fn first(limit: BatchSize) -> Self {
431        Self {
432            after_position: None,
433            limit,
434        }
435    }
436
437    /// Create a page starting after the given position.
438    ///
439    /// Only events with position > `after_position` will be returned.
440    pub fn after(position: StreamPosition, limit: BatchSize) -> Self {
441        Self {
442            after_position: Some(position),
443            limit,
444        }
445    }
446
447    /// Create the next page using the last position from previous results.
448    ///
449    /// This is a convenience method for the common pagination pattern.
450    /// Returns `Some(next_page)` if events were returned, `None` if empty.
451    ///
452    /// # Examples
453    ///
454    /// ```ignore
455    /// let mut page = EventPage::first(BatchSize::new(100));
456    /// loop {
457    ///     let events = reader.read_events(filter, page).await?;
458    ///     if events.is_empty() {
459    ///         break;
460    ///     }
461    ///     // Process events...
462    ///
463    ///     // Get next page
464    ///     page = match page.next_from_results(&events) {
465    ///         Some(next) => next,
466    ///         None => break,
467    ///     };
468    /// }
469    /// ```
470    pub fn next_from_results<E>(&self, events: &[(E, StreamPosition)]) -> Option<Self> {
471        events.last().map(|(_, pos)| Self {
472            after_position: Some(*pos),
473            limit: self.limit,
474        })
475    }
476
477    /// Create the next page using an explicit position.
478    ///
479    /// Returns a new page that starts after the given position with the same limit.
480    pub fn next(&self, last_position: StreamPosition) -> Self {
481        Self {
482            after_position: Some(last_position),
483            limit: self.limit,
484        }
485    }
486
487    /// Get the cursor position for this page.
488    pub fn after_position(&self) -> Option<StreamPosition> {
489        self.after_position
490    }
491
492    /// Get the page size limit.
493    pub fn limit(&self) -> BatchSize {
494        self.limit
495    }
496}
497
498/// Filter criteria for selecting which events to read from the event store.
499///
500/// EventFilter specifies filtering criteria (e.g., stream prefix) separate from
501/// pagination concerns (position and limit). Use `::all()` to match all events,
502/// or `::prefix()` to filter by stream ID prefix.
503///
504/// # Examples
505///
506/// ```ignore
507/// // Match all events
508/// let filter = EventFilter::all();
509///
510/// // Filter by stream prefix
511/// let filter = EventFilter::prefix("account-");
512/// ```
513#[derive(Debug, Clone)]
514pub struct EventFilter {
515    stream_prefix: Option<StreamPrefix>,
516}
517
518impl EventFilter {
519    /// Create a filter that matches all events from all streams.
520    ///
521    /// This is the most permissive filter - it matches every event
522    /// in the store.
523    pub fn all() -> Self {
524        Self {
525            stream_prefix: None,
526        }
527    }
528
529    /// Create a filter that matches events from streams with the given prefix.
530    ///
531    /// Only events whose stream ID starts with the specified prefix will match.
532    ///
533    /// # Examples
534    ///
535    /// ```ignore
536    /// use eventcore_types::{EventFilter, StreamPrefix};
537    ///
538    /// let prefix = StreamPrefix::try_new("account-").unwrap();
539    /// let filter = EventFilter::prefix(prefix);
540    /// ```
541    pub fn prefix(prefix: StreamPrefix) -> Self {
542        Self {
543            stream_prefix: Some(prefix),
544        }
545    }
546
547    /// Get the stream prefix filter, if any.
548    ///
549    /// Returns `Some(&StreamPrefix)` if a prefix filter is set, or `None`
550    /// if this filter matches all streams.
551    pub fn stream_prefix(&self) -> Option<&StreamPrefix> {
552        self.stream_prefix.as_ref()
553    }
554}
555
556/// Trait for reading events globally for projections.
557///
558/// EventReader provides access to all events in global order, which is
559/// required for building read models that aggregate data across streams.
560///
561/// # Pagination and Filtering
562///
563/// The `read_events` method requires explicit pagination via `EventPage`
564/// to prevent accidental memory exhaustion. Filtering is specified via `EventFilter`.
565///
566/// # Type Safety
567///
568/// The method is generic over the event type, allowing the caller to specify
569/// which event type to deserialize. Events that cannot be deserialized to the
570/// requested type are skipped.
571pub trait EventReader {
572    /// Error type returned by read operations.
573    type Error;
574
575    /// Read events matching filter criteria with pagination.
576    ///
577    /// Returns a vector of tuples containing the event and its global position.
578    /// Events are ordered by their append time (oldest first).
579    ///
580    /// # Type Parameters
581    ///
582    /// - `E`: The event type to deserialize events as
583    ///
584    /// # Parameters
585    ///
586    /// - `filter`: Filtering criteria (stream prefix, etc.)
587    /// - `page`: Pagination parameters (cursor position and limit)
588    ///
589    /// # Returns
590    ///
591    /// - `Ok(Vec<(E, StreamPosition)>)`: Events with their positions
592    /// - `Err(Self::Error)`: If the read operation fails
593    ///
594    /// # Examples
595    ///
596    /// ```ignore
597    /// let page = EventPage::first(BatchSize::new(100));
598    /// let events = reader.read_events(EventFilter::all(), page).await?;
599    /// ```
600    fn read_events<E: crate::Event>(
601        &self,
602        filter: EventFilter,
603        page: EventPage,
604    ) -> impl Future<Output = Result<Vec<(E, StreamPosition)>, Self::Error>> + Send;
605}
606
607/// Blanket implementation allowing EventReader trait to work with references.
608///
609/// This is a trivial forwarding implementation that cannot be meaningfully tested
610/// in isolation - mutations here would break all EventReader usage through references.
611// cargo-mutants: skip (trivial forwarding impl)
612impl<T: EventReader + Sync> EventReader for &T {
613    type Error = T::Error;
614
615    async fn read_events<E: crate::Event>(
616        &self,
617        filter: EventFilter,
618        page: EventPage,
619    ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
620        (*self).read_events(filter, page).await
621    }
622}
623
624/// Trait for persisting and retrieving projection checkpoints.
625///
626/// CheckpointStore provides durable storage for projection progress, enabling
627/// projections to resume from their last known position after restarts or failures.
628///
629/// # Type Parameters
630///
631/// - `Error`: The error type returned by checkpoint operations
632///
633/// # Required Methods
634///
635/// - `load`: Retrieve the last saved position for a subscription
636/// - `save`: Persist the current position for a subscription
637///
638/// # Example
639///
640/// ```ignore
641/// impl CheckpointStore for MyCheckpointStore {
642///     type Error = MyError;
643///
644///     async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
645///         // Load from database, file, etc.
646///     }
647///
648///     async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
649///         // Persist to database, file, etc.
650///     }
651/// }
652/// ```
653pub trait CheckpointStore: Send + Sync {
654    /// Error type returned by checkpoint operations.
655    type Error: std::error::Error + Send + Sync + 'static;
656
657    /// Load the last saved checkpoint position for a subscription.
658    ///
659    /// # Parameters
660    ///
661    /// - `name`: The unique name identifying the subscription/projector
662    ///
663    /// # Returns
664    ///
665    /// - `Ok(Some(position))`: The last saved position
666    /// - `Ok(None)`: No checkpoint exists for this subscription
667    /// - `Err(Self::Error)`: If the load operation fails
668    fn load(
669        &self,
670        name: &str,
671    ) -> impl Future<Output = Result<Option<StreamPosition>, Self::Error>> + Send;
672
673    /// Save a checkpoint position for a subscription.
674    ///
675    /// This overwrites any previously saved position for the same subscription.
676    ///
677    /// # Parameters
678    ///
679    /// - `name`: The unique name identifying the subscription/projector
680    /// - `position`: The stream position to save
681    ///
682    /// # Returns
683    ///
684    /// - `Ok(())`: The checkpoint was saved successfully
685    /// - `Err(Self::Error)`: If the save operation fails
686    fn save(
687        &self,
688        name: &str,
689        position: StreamPosition,
690    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
691}
692
693/// Blanket implementation allowing CheckpointStore trait to work with references.
694///
695/// This is a trivial forwarding implementation that cannot be meaningfully tested
696/// in isolation - mutations here would break all CheckpointStore usage through references.
697// cargo-mutants: skip (trivial forwarding impl)
698impl<T: CheckpointStore + Sync> CheckpointStore for &T {
699    type Error = T::Error;
700
701    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
702        (*self).load(name).await
703    }
704
705    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
706        (*self).save(name, position).await
707    }
708}
709
710/// Trait for coordinating projector leadership across multiple instances.
711///
712/// ProjectorCoordinator enables single-leader projector execution in distributed
713/// deployments. Only one instance at a time should process events for a given
714/// subscription to prevent duplicate processing and ensure ordering guarantees.
715///
716/// # Design
717///
718/// This trait uses a non-blocking, try-acquire pattern (per ADR-028):
719/// - `try_acquire` returns immediately, never blocks
720/// - Returns a guard on success; dropping the guard releases leadership
721/// - Returns an error if another instance holds leadership
722///
723/// Callers decide how to handle acquisition failure:
724/// - Exit the process (recommended for Kubernetes/systemd deployments)
725/// - Sleep and retry (for environments without restart orchestration)
726/// - Continue without this projector (for degraded-mode operation)
727///
728/// # Type Parameters
729///
730/// - `Error`: Error type returned when acquisition fails
731/// - `Guard`: Guard type that releases leadership when dropped (must be Send)
732///
733/// # Example
734///
735/// ```ignore
736/// let result = coordinator.try_acquire("my-projector").await;
737/// match result {
738///     Ok(guard) => {
739///         // We have leadership - run the projector
740///         run_projector().await;
741///         // Guard is dropped when we're done, releasing leadership
742///     }
743///     Err(_) => {
744///         // Another instance has leadership - exit or retry
745///         std::process::exit(0);
746///     }
747/// }
748/// ```
749pub trait ProjectorCoordinator {
750    /// Error type returned when leadership acquisition fails.
751    ///
752    /// Should distinguish between:
753    /// - Lock not acquired (another instance has leadership)
754    /// - Infrastructure errors (database connectivity, etc.)
755    type Error: std::error::Error + Send + Sync + 'static;
756
757    /// Guard type that releases leadership when dropped.
758    ///
759    /// The guard must be Send to support async workflows where the guard
760    /// may be held across await points.
761    type Guard: Send;
762
763    /// Attempt to acquire leadership for a subscription.
764    ///
765    /// This method is non-blocking per ADR-028. It returns immediately with
766    /// either a guard (success) or an error (failure).
767    ///
768    /// # Parameters
769    ///
770    /// - `subscription_name`: Unique identifier for the projector/subscription
771    ///
772    /// # Returns
773    ///
774    /// - `Ok(Guard)`: Leadership acquired; dropping the guard releases it
775    /// - `Err(Self::Error)`: Leadership not acquired (held by another instance)
776    ///   or infrastructure error
777    fn try_acquire(
778        &self,
779        subscription_name: &str,
780    ) -> impl Future<Output = Result<Self::Guard, Self::Error>> + Send;
781}
782
783#[cfg(test)]
784mod tests {
785    use super::*;
786
787    #[test]
788    fn event_page_first_has_no_after_position() {
789        let page = EventPage::first(BatchSize::new(100));
790        assert_eq!(page.after_position(), None);
791        assert_eq!(page.limit().into_inner(), 100);
792    }
793
794    #[test]
795    fn event_page_after_has_correct_position() {
796        let uuid = Uuid::parse_str("018e8c5e-8c5e-7000-8000-000000000001").unwrap();
797        let position = StreamPosition::new(uuid);
798        let page = EventPage::after(position, BatchSize::new(50));
799        assert_eq!(page.after_position(), Some(position));
800        assert_eq!(page.limit().into_inner(), 50);
801    }
802
803    #[test]
804    fn event_page_next_preserves_limit_and_updates_position() {
805        let page = EventPage::first(BatchSize::new(100));
806        let uuid = Uuid::parse_str("018e8c5e-8c5e-7000-8000-000000000002").unwrap();
807        let new_position = StreamPosition::new(uuid);
808        let next_page = page.next(new_position);
809        assert_eq!(next_page.after_position(), Some(new_position));
810        assert_eq!(next_page.limit().into_inner(), 100);
811    }
812}