eventcore_types/projection.rs
1//! Projection types and traits for building read models from event streams.
2//!
3//! This module provides the core abstractions for event projection:
4//! - `Projector`: Trait for transforming events into read model updates
5//! - `EventReader`: Trait for reading events globally for projections
6//! - `StreamPosition`: Global position in the event stream
7
8use crate::store::StreamPrefix;
9use nutype::nutype;
10use std::future::Future;
11use uuid::Uuid;
12
13/// Context provided to error handler when event processing fails.
14///
15/// This struct bundles together all the information needed to make
16/// informed decisions about how to handle a projection failure.
17///
18/// # Type Parameters
19///
20/// - `E`: The error type returned by the projector's `apply()` method
21///
22/// # Fields
23///
24/// - `error`: Reference to the error that occurred
25/// - `position`: Global stream position where the failure occurred
26/// - `retry_count`: Number of times this event has been retried (0 on first failure)
27#[derive(Debug)]
28pub struct FailureContext<'a, E> {
29 /// Reference to the error that occurred during event processing.
30 pub error: &'a E,
31 /// Global stream position of the event that failed to process.
32 pub position: StreamPosition,
33 /// Number of retry attempts so far (0 on initial failure).
34 pub retry_count: RetryCount,
35}
36
37/// Strategy for handling event processing failures.
38///
39/// When a projector's `apply()` method returns an error, the `on_error()`
40/// callback determines how the projection runner should respond. This enum
41/// represents the available failure strategies.
42///
43/// # Variants
44///
45/// - `Fatal`: Stop processing immediately and return the error
46/// - `Skip`: Log the error and continue processing the next event
47/// - `Retry`: Attempt to reprocess the event according to retry configuration
48///
49/// # Example
50///
51/// ```ignore
52/// fn on_error(
53/// &mut self,
54/// ctx: FailureContext<Self::Error>,
55/// ) -> FailureStrategy {
56/// match ctx.error {
57/// MyError::Transient(_) if ctx.retry_count < 3 => FailureStrategy::Retry,
58/// MyError::PoisonEvent(_) => FailureStrategy::Skip,
59/// _ => FailureStrategy::Fatal,
60/// }
61/// }
62/// ```
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64pub enum FailureStrategy {
65 /// Stop processing immediately and return the error to the caller.
66 ///
67 /// Use this when:
68 /// - The error is unrecoverable (e.g., database schema mismatch)
69 /// - The projector requires manual intervention
70 /// - Continuing would corrupt the read model
71 Fatal,
72
73 /// Skip this event and continue processing the next one.
74 ///
75 /// Use this when:
76 /// - The event is malformed or invalid (poison event)
77 /// - Processing this event is not critical
78 /// - Continuing without this event is acceptable
79 Skip,
80
81 /// Retry processing this event according to retry configuration.
82 ///
83 /// Use this when:
84 /// - The error is likely transient (e.g., network timeout)
85 /// - Retrying might succeed
86 /// - The event is important and should not be skipped
87 Retry,
88}
89
90/// Global stream position representing a location in the ordered event log.
91///
92/// StreamPosition uniquely identifies a position in the global event stream
93/// across all individual streams. Used by projectors to track progress and
94/// enable resumable event processing.
95///
96/// Positions are UUID7 values (timestamp-ordered UUIDs) assigned at event
97/// append time. They are monotonically increasing and globally sortable.
98#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Display))]
99pub struct StreamPosition(Uuid);
100
101/// Trait for transforming events into read model updates.
102///
103/// Projectors consume events from the event store and update read models.
104/// They implement the "Q" (Query) side of CQRS by building denormalized
105/// views optimized for reading.
106///
107/// # Type Parameters
108///
109/// - `Event`: The domain event type this projector handles
110/// - `Error`: The error type returned when projection fails
111/// - `Context`: Shared context for database connections, caches, etc.
112///
113/// # Required Methods
114///
115/// - `apply`: Process a single event and update the read model
116/// - `name`: Return a unique identifier for this projector
117///
118/// # Example
119///
120/// ```ignore
121/// struct AccountBalanceProjector {
122/// balances: HashMap<AccountId, Money>,
123/// }
124///
125/// impl Projector for AccountBalanceProjector {
126/// type Event = AccountEvent;
127/// type Error = Infallible;
128/// type Context = ();
129///
130/// fn apply(
131/// &mut self,
132/// event: Self::Event,
133/// _position: StreamPosition,
134/// _ctx: &mut Self::Context,
135/// ) -> Result<(), Self::Error> {
136/// match event {
137/// AccountEvent::Deposited { account_id, amount } => {
138/// *self.balances.entry(account_id).or_default() += amount;
139/// }
140/// AccountEvent::Withdrawn { account_id, amount } => {
141/// *self.balances.entry(account_id).or_default() -= amount;
142/// }
143/// }
144/// Ok(())
145/// }
146///
147/// fn name(&self) -> &str {
148/// "account-balance"
149/// }
150/// }
151/// ```
152pub trait Projector {
153 /// The domain event type this projector handles.
154 type Event;
155
156 /// The error type returned when projection fails.
157 type Error;
158
159 /// Shared context for database connections, caches, etc.
160 type Context;
161
162 /// Process a single event and update the read model.
163 ///
164 /// This method is called for each event in stream order. Implementations
165 /// should update their read model state based on the event content.
166 ///
167 /// # Parameters
168 ///
169 /// - `event`: The domain event to process
170 /// - `position`: The global stream position of this event
171 /// - `ctx`: Mutable reference to shared context
172 ///
173 /// # Returns
174 ///
175 /// - `Ok(())`: Event was successfully processed
176 /// - `Err(Self::Error)`: Projection failed (triggers error handling)
177 fn apply(
178 &mut self,
179 event: Self::Event,
180 position: StreamPosition,
181 ctx: &mut Self::Context,
182 ) -> Result<(), Self::Error>;
183
184 /// Return a unique identifier for this projector.
185 ///
186 /// The name is used for:
187 /// - Logging and tracing
188 /// - Checkpoint storage (to resume from last position)
189 /// - Coordination (leader election key)
190 ///
191 /// Names should be stable across deployments. Changing a projector's
192 /// name will cause it to reprocess all events from the beginning.
193 fn name(&self) -> &str;
194
195 /// Handle event processing errors and determine failure strategy.
196 ///
197 /// Called when `apply()` returns an error. The projector can inspect
198 /// the error context and decide how the runner should respond.
199 ///
200 /// # Parameters
201 ///
202 /// - `ctx`: Context containing the error, position, and retry count
203 ///
204 /// # Returns
205 ///
206 /// The failure strategy the runner should use:
207 /// - `FailureStrategy::Fatal`: Stop processing and return error
208 /// - `FailureStrategy::Skip`: Skip this event and continue
209 /// - `FailureStrategy::Retry`: Retry processing this event
210 ///
211 /// # Default Implementation
212 ///
213 /// Returns `FailureStrategy::Fatal` for all errors. This is the safest
214 /// default - projectors that need different behavior should override
215 /// this method.
216 ///
217 /// # Example
218 ///
219 /// ```ignore
220 /// fn on_error(
221 /// &mut self,
222 /// ctx: FailureContext<Self::Error>,
223 /// ) -> FailureStrategy {
224 /// match ctx.error {
225 /// MyError::Transient(_) if ctx.retry_count < 3 => FailureStrategy::Retry,
226 /// MyError::PoisonEvent(_) => FailureStrategy::Skip,
227 /// _ => FailureStrategy::Fatal,
228 /// }
229 /// }
230 /// ```
231 fn on_error(&mut self, _ctx: FailureContext<'_, Self::Error>) -> FailureStrategy {
232 FailureStrategy::Fatal
233 }
234}
235
236/// Batch size domain type for limiting query results.
237///
238/// BatchSize represents the maximum number of events to return in a single
239/// query. Callers are responsible for choosing appropriate batch sizes based
240/// on their memory constraints and use case requirements.
241///
242/// A batch size of zero is valid and will return an empty result set.
243///
244/// # Examples
245///
246/// ```ignore
247/// use eventcore_types::projection::BatchSize;
248///
249/// let small = BatchSize::new(100);
250/// let large = BatchSize::new(1_000_000);
251/// let empty = BatchSize::new(0); // Valid - returns no events
252/// ```
253#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Display))]
254pub struct BatchSize(usize);
255
256/// Maximum number of retry attempts for event processing.
257///
258/// MaxRetryAttempts represents the maximum number of times to retry processing
259/// a failed event before escalating to a fatal error. A value of 0 means no
260/// retries are attempted.
261///
262/// # Examples
263///
264/// ```ignore
265/// use eventcore_types::projection::MaxRetryAttempts;
266///
267/// let no_retries = MaxRetryAttempts::new(0);
268/// let standard = MaxRetryAttempts::new(3);
269/// let aggressive = MaxRetryAttempts::new(10);
270/// let very_aggressive = MaxRetryAttempts::new(1000); // Library doesn't impose limits
271/// ```
272#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Display, Into))]
273pub struct MaxRetryAttempts(u32);
274
275/// Backoff multiplier for exponential retry delays.
276///
277/// BackoffMultiplier represents the factor by which retry delays grow on each
278/// attempt. A value of 1.0 means constant delay (no backoff), while values
279/// greater than 1.0 implement exponential backoff.
280///
281/// The minimum value of 1.0 prevents decreasing delays, which would not make
282/// sense for retry backoff. Common values are 2.0 (double each time) or 1.5
283/// (50% increase).
284///
285/// # Examples
286///
287/// ```ignore
288/// use eventcore_types::projection::BackoffMultiplier;
289///
290/// let constant = BackoffMultiplier::try_new(1.0).expect("1.0 is valid"); // No backoff
291/// let standard = BackoffMultiplier::try_new(2.0).expect("2.0 is valid"); // Double each time
292/// let gentle = BackoffMultiplier::try_new(1.5).expect("1.5 is valid"); // 50% increase
293///
294/// // Values below 1.0 are rejected
295/// assert!(BackoffMultiplier::try_new(0.5).is_err());
296/// ```
297#[nutype(
298 validate(greater_or_equal = 1.0),
299 derive(Debug, Clone, Copy, PartialEq, PartialOrd, Display, Into)
300)]
301pub struct BackoffMultiplier(f64);
302
303/// Maximum number of consecutive poll failures before stopping.
304///
305/// MaxConsecutiveFailures represents the threshold for consecutive errors
306/// during event polling. Must be at least 1, enforced by using NonZeroU32
307/// as the underlying type.
308///
309/// # Examples
310///
311/// ```ignore
312/// use eventcore_types::projection::MaxConsecutiveFailures;
313/// use std::num::NonZeroU32;
314///
315/// let lenient = MaxConsecutiveFailures::new(NonZeroU32::new(10).expect("10 is non-zero"));
316/// let strict = MaxConsecutiveFailures::new(NonZeroU32::new(3).expect("3 is non-zero"));
317///
318/// // Zero failures not allowed by type system
319/// // let zero = NonZeroU32::new(0); // Returns None
320/// ```
321#[nutype(derive(
322 Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Display, AsRef, Into
323))]
324pub struct MaxConsecutiveFailures(std::num::NonZeroU32);
325
326/// Maximum number of retry attempts.
327///
328/// MaxRetries represents the maximum number of times to retry an operation
329/// before giving up. A value of 0 means no retries (fail immediately).
330///
331/// # Examples
332///
333/// ```ignore
334/// use eventcore_types::projection::MaxRetries;
335///
336/// let no_retry = MaxRetries::new(0);
337/// let standard = MaxRetries::new(3);
338/// let aggressive = MaxRetries::new(10);
339/// let very_aggressive = MaxRetries::new(1000); // Library doesn't impose limits
340/// ```
341#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Display, Into))]
342pub struct MaxRetries(u32);
343
344/// Delay in milliseconds for retry or backoff operations.
345///
346/// DelayMilliseconds represents a time delay expressed in milliseconds.
347/// Used for retry delays, backoff intervals, and polling intervals.
348///
349/// # Examples
350///
351/// ```ignore
352/// use eventcore_types::projection::DelayMilliseconds;
353///
354/// let short = DelayMilliseconds::new(100);
355/// let medium = DelayMilliseconds::new(1000);
356/// let long = DelayMilliseconds::new(5000);
357/// ```
358#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Display, Into))]
359pub struct DelayMilliseconds(u64);
360
361/// Attempt number for retry operations (1-based).
362///
363/// AttemptNumber represents which attempt is currently being made, starting
364/// from 1 for the first attempt. Must be at least 1, enforced by using NonZeroU32
365/// as the underlying type.
366///
367/// # Examples
368///
369/// ```ignore
370/// use eventcore_types::projection::AttemptNumber;
371/// use std::num::NonZeroU32;
372///
373/// let first_attempt = AttemptNumber::new(NonZeroU32::new(1).expect("1 is non-zero"));
374/// let retry_attempt = AttemptNumber::new(NonZeroU32::new(3).expect("3 is non-zero"));
375///
376/// // Zero attempts not allowed by type system
377/// // let zero = NonZeroU32::new(0); // Returns None
378/// ```
379#[nutype(derive(
380 Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Display, AsRef, Into
381))]
382pub struct AttemptNumber(std::num::NonZeroU32);
383
384/// Count of retry attempts that have been made (0-based).
385///
386/// RetryCount represents how many retry attempts have been made so far.
387/// Starts at 0 on the initial failure (before any retries).
388///
389/// # Examples
390///
391/// ```ignore
392/// use eventcore_types::projection::RetryCount;
393///
394/// let initial_failure = RetryCount::new(0);
395/// let after_first_retry = RetryCount::new(1);
396/// let after_three_retries = RetryCount::new(3);
397/// ```
398#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Display, Into))]
399pub struct RetryCount(u32);
400
401/// Pagination parameters for reading events.
402///
403/// EventPage bundles together the cursor position and page size for paginating
404/// through events. This separates pagination concerns from filtering concerns.
405///
406/// # Examples
407///
408/// ```ignore
409/// use eventcore_types::projection::{EventPage, BatchSize};
410///
411/// // First page
412/// let page = EventPage::first(BatchSize::new(100));
413/// let events = reader.read_events(filter, page).await?;
414///
415/// // Next page using the last event's position
416/// if let Some(next_page) = page.next_from_results(&events) {
417/// let more = reader.read_events(filter, next_page).await?;
418/// }
419/// ```
420#[derive(Debug, Clone, Copy, PartialEq, Eq)]
421pub struct EventPage {
422 after_position: Option<StreamPosition>,
423 limit: BatchSize,
424}
425
426impl EventPage {
427 /// Create the first page with the given limit.
428 ///
429 /// Starts reading from the beginning of the event stream.
430 pub fn first(limit: BatchSize) -> Self {
431 Self {
432 after_position: None,
433 limit,
434 }
435 }
436
437 /// Create a page starting after the given position.
438 ///
439 /// Only events with position > `after_position` will be returned.
440 pub fn after(position: StreamPosition, limit: BatchSize) -> Self {
441 Self {
442 after_position: Some(position),
443 limit,
444 }
445 }
446
447 /// Create the next page using the last position from previous results.
448 ///
449 /// This is a convenience method for the common pagination pattern.
450 /// Returns `Some(next_page)` if events were returned, `None` if empty.
451 ///
452 /// # Examples
453 ///
454 /// ```ignore
455 /// let mut page = EventPage::first(BatchSize::new(100));
456 /// loop {
457 /// let events = reader.read_events(filter, page).await?;
458 /// if events.is_empty() {
459 /// break;
460 /// }
461 /// // Process events...
462 ///
463 /// // Get next page
464 /// page = match page.next_from_results(&events) {
465 /// Some(next) => next,
466 /// None => break,
467 /// };
468 /// }
469 /// ```
470 pub fn next_from_results<E>(&self, events: &[(E, StreamPosition)]) -> Option<Self> {
471 events.last().map(|(_, pos)| Self {
472 after_position: Some(*pos),
473 limit: self.limit,
474 })
475 }
476
477 /// Create the next page using an explicit position.
478 ///
479 /// Returns a new page that starts after the given position with the same limit.
480 pub fn next(&self, last_position: StreamPosition) -> Self {
481 Self {
482 after_position: Some(last_position),
483 limit: self.limit,
484 }
485 }
486
487 /// Get the cursor position for this page.
488 pub fn after_position(&self) -> Option<StreamPosition> {
489 self.after_position
490 }
491
492 /// Get the page size limit.
493 pub fn limit(&self) -> BatchSize {
494 self.limit
495 }
496}
497
498/// Filter criteria for selecting which events to read from the event store.
499///
500/// EventFilter specifies filtering criteria (e.g., stream prefix) separate from
501/// pagination concerns (position and limit). Use `::all()` to match all events,
502/// or `::prefix()` to filter by stream ID prefix.
503///
504/// # Examples
505///
506/// ```ignore
507/// // Match all events
508/// let filter = EventFilter::all();
509///
510/// // Filter by stream prefix
511/// let filter = EventFilter::prefix("account-");
512/// ```
513#[derive(Debug, Clone)]
514pub struct EventFilter {
515 stream_prefix: Option<StreamPrefix>,
516}
517
518impl EventFilter {
519 /// Create a filter that matches all events from all streams.
520 ///
521 /// This is the most permissive filter - it matches every event
522 /// in the store.
523 pub fn all() -> Self {
524 Self {
525 stream_prefix: None,
526 }
527 }
528
529 /// Create a filter that matches events from streams with the given prefix.
530 ///
531 /// Only events whose stream ID starts with the specified prefix will match.
532 ///
533 /// # Examples
534 ///
535 /// ```ignore
536 /// use eventcore_types::{EventFilter, StreamPrefix};
537 ///
538 /// let prefix = StreamPrefix::try_new("account-").unwrap();
539 /// let filter = EventFilter::prefix(prefix);
540 /// ```
541 pub fn prefix(prefix: StreamPrefix) -> Self {
542 Self {
543 stream_prefix: Some(prefix),
544 }
545 }
546
547 /// Get the stream prefix filter, if any.
548 ///
549 /// Returns `Some(&StreamPrefix)` if a prefix filter is set, or `None`
550 /// if this filter matches all streams.
551 pub fn stream_prefix(&self) -> Option<&StreamPrefix> {
552 self.stream_prefix.as_ref()
553 }
554}
555
556/// Trait for reading events globally for projections.
557///
558/// EventReader provides access to all events in global order, which is
559/// required for building read models that aggregate data across streams.
560///
561/// # Pagination and Filtering
562///
563/// The `read_events` method requires explicit pagination via `EventPage`
564/// to prevent accidental memory exhaustion. Filtering is specified via `EventFilter`.
565///
566/// # Type Safety
567///
568/// The method is generic over the event type, allowing the caller to specify
569/// which event type to deserialize. Events that cannot be deserialized to the
570/// requested type are skipped.
571pub trait EventReader {
572 /// Error type returned by read operations.
573 type Error;
574
575 /// Read events matching filter criteria with pagination.
576 ///
577 /// Returns a vector of tuples containing the event and its global position.
578 /// Events are ordered by their append time (oldest first).
579 ///
580 /// # Type Parameters
581 ///
582 /// - `E`: The event type to deserialize events as
583 ///
584 /// # Parameters
585 ///
586 /// - `filter`: Filtering criteria (stream prefix, etc.)
587 /// - `page`: Pagination parameters (cursor position and limit)
588 ///
589 /// # Returns
590 ///
591 /// - `Ok(Vec<(E, StreamPosition)>)`: Events with their positions
592 /// - `Err(Self::Error)`: If the read operation fails
593 ///
594 /// # Examples
595 ///
596 /// ```ignore
597 /// let page = EventPage::first(BatchSize::new(100));
598 /// let events = reader.read_events(EventFilter::all(), page).await?;
599 /// ```
600 fn read_events<E: crate::Event>(
601 &self,
602 filter: EventFilter,
603 page: EventPage,
604 ) -> impl Future<Output = Result<Vec<(E, StreamPosition)>, Self::Error>> + Send;
605}
606
607/// Blanket implementation allowing EventReader trait to work with references.
608///
609/// This is a trivial forwarding implementation that cannot be meaningfully tested
610/// in isolation - mutations here would break all EventReader usage through references.
611// cargo-mutants: skip (trivial forwarding impl)
612impl<T: EventReader + Sync> EventReader for &T {
613 type Error = T::Error;
614
615 async fn read_events<E: crate::Event>(
616 &self,
617 filter: EventFilter,
618 page: EventPage,
619 ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
620 (*self).read_events(filter, page).await
621 }
622}
623
624/// Trait for persisting and retrieving projection checkpoints.
625///
626/// CheckpointStore provides durable storage for projection progress, enabling
627/// projections to resume from their last known position after restarts or failures.
628///
629/// # Type Parameters
630///
631/// - `Error`: The error type returned by checkpoint operations
632///
633/// # Required Methods
634///
635/// - `load`: Retrieve the last saved position for a subscription
636/// - `save`: Persist the current position for a subscription
637///
638/// # Example
639///
640/// ```ignore
641/// impl CheckpointStore for MyCheckpointStore {
642/// type Error = MyError;
643///
644/// async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
645/// // Load from database, file, etc.
646/// }
647///
648/// async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
649/// // Persist to database, file, etc.
650/// }
651/// }
652/// ```
653pub trait CheckpointStore: Send + Sync {
654 /// Error type returned by checkpoint operations.
655 type Error: std::error::Error + Send + Sync + 'static;
656
657 /// Load the last saved checkpoint position for a subscription.
658 ///
659 /// # Parameters
660 ///
661 /// - `name`: The unique name identifying the subscription/projector
662 ///
663 /// # Returns
664 ///
665 /// - `Ok(Some(position))`: The last saved position
666 /// - `Ok(None)`: No checkpoint exists for this subscription
667 /// - `Err(Self::Error)`: If the load operation fails
668 fn load(
669 &self,
670 name: &str,
671 ) -> impl Future<Output = Result<Option<StreamPosition>, Self::Error>> + Send;
672
673 /// Save a checkpoint position for a subscription.
674 ///
675 /// This overwrites any previously saved position for the same subscription.
676 ///
677 /// # Parameters
678 ///
679 /// - `name`: The unique name identifying the subscription/projector
680 /// - `position`: The stream position to save
681 ///
682 /// # Returns
683 ///
684 /// - `Ok(())`: The checkpoint was saved successfully
685 /// - `Err(Self::Error)`: If the save operation fails
686 fn save(
687 &self,
688 name: &str,
689 position: StreamPosition,
690 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
691}
692
693/// Blanket implementation allowing CheckpointStore trait to work with references.
694///
695/// This is a trivial forwarding implementation that cannot be meaningfully tested
696/// in isolation - mutations here would break all CheckpointStore usage through references.
697// cargo-mutants: skip (trivial forwarding impl)
698impl<T: CheckpointStore + Sync> CheckpointStore for &T {
699 type Error = T::Error;
700
701 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
702 (*self).load(name).await
703 }
704
705 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
706 (*self).save(name, position).await
707 }
708}
709
710/// Trait for coordinating projector leadership across multiple instances.
711///
712/// ProjectorCoordinator enables single-leader projector execution in distributed
713/// deployments. Only one instance at a time should process events for a given
714/// subscription to prevent duplicate processing and ensure ordering guarantees.
715///
716/// # Design
717///
718/// This trait uses a non-blocking, try-acquire pattern (per ADR-028):
719/// - `try_acquire` returns immediately, never blocks
720/// - Returns a guard on success; dropping the guard releases leadership
721/// - Returns an error if another instance holds leadership
722///
723/// Callers decide how to handle acquisition failure:
724/// - Exit the process (recommended for Kubernetes/systemd deployments)
725/// - Sleep and retry (for environments without restart orchestration)
726/// - Continue without this projector (for degraded-mode operation)
727///
728/// # Type Parameters
729///
730/// - `Error`: Error type returned when acquisition fails
731/// - `Guard`: Guard type that releases leadership when dropped (must be Send)
732///
733/// # Example
734///
735/// ```ignore
736/// let result = coordinator.try_acquire("my-projector").await;
737/// match result {
738/// Ok(guard) => {
739/// // We have leadership - run the projector
740/// run_projector().await;
741/// // Guard is dropped when we're done, releasing leadership
742/// }
743/// Err(_) => {
744/// // Another instance has leadership - exit or retry
745/// std::process::exit(0);
746/// }
747/// }
748/// ```
749pub trait ProjectorCoordinator {
750 /// Error type returned when leadership acquisition fails.
751 ///
752 /// Should distinguish between:
753 /// - Lock not acquired (another instance has leadership)
754 /// - Infrastructure errors (database connectivity, etc.)
755 type Error: std::error::Error + Send + Sync + 'static;
756
757 /// Guard type that releases leadership when dropped.
758 ///
759 /// The guard must be Send to support async workflows where the guard
760 /// may be held across await points.
761 type Guard: Send;
762
763 /// Attempt to acquire leadership for a subscription.
764 ///
765 /// This method is non-blocking per ADR-028. It returns immediately with
766 /// either a guard (success) or an error (failure).
767 ///
768 /// # Parameters
769 ///
770 /// - `subscription_name`: Unique identifier for the projector/subscription
771 ///
772 /// # Returns
773 ///
774 /// - `Ok(Guard)`: Leadership acquired; dropping the guard releases it
775 /// - `Err(Self::Error)`: Leadership not acquired (held by another instance)
776 /// or infrastructure error
777 fn try_acquire(
778 &self,
779 subscription_name: &str,
780 ) -> impl Future<Output = Result<Self::Guard, Self::Error>> + Send;
781}
782
783#[cfg(test)]
784mod tests {
785 use super::*;
786
787 #[test]
788 fn event_page_first_has_no_after_position() {
789 let page = EventPage::first(BatchSize::new(100));
790 assert_eq!(page.after_position(), None);
791 assert_eq!(page.limit().into_inner(), 100);
792 }
793
794 #[test]
795 fn event_page_after_has_correct_position() {
796 let uuid = Uuid::parse_str("018e8c5e-8c5e-7000-8000-000000000001").unwrap();
797 let position = StreamPosition::new(uuid);
798 let page = EventPage::after(position, BatchSize::new(50));
799 assert_eq!(page.after_position(), Some(position));
800 assert_eq!(page.limit().into_inner(), 50);
801 }
802
803 #[test]
804 fn event_page_next_preserves_limit_and_updates_position() {
805 let page = EventPage::first(BatchSize::new(100));
806 let uuid = Uuid::parse_str("018e8c5e-8c5e-7000-8000-000000000002").unwrap();
807 let new_position = StreamPosition::new(uuid);
808 let next_page = page.next(new_position);
809 assert_eq!(next_page.after_position(), Some(new_position));
810 assert_eq!(next_page.limit().into_inner(), 100);
811 }
812}