use eventcore_types::{
BackoffMultiplier, CheckpointStore, Event, EventReader, MaxConsecutiveFailures,
MaxRetryAttempts, Projector, StreamPosition,
};
use std::time::Duration;
/// Polling settings that `ProjectionRunner::run` hands to the projection pipeline.
///
/// NOTE(review): the pipeline that interprets these values is not part of this
/// file, so the per-field notes below are inferred from the field names and
/// should be confirmed against the pipeline implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct PollConfig {
/// Presumably the pause between successive polls (default: 100 ms).
pub(crate) poll_interval: Duration,
/// Presumably the pause applied after a poll that returned no events (default: 50 ms).
pub(crate) empty_poll_backoff: Duration,
/// Presumably the pause applied after a poll that failed (default: 100 ms).
pub(crate) poll_failure_backoff: Duration,
/// Presumably the number of back-to-back poll failures tolerated before
/// the run gives up (default: 5).
pub(crate) max_consecutive_poll_failures: MaxConsecutiveFailures,
}
impl Default for PollConfig {
    /// Builds the stock polling settings: a 100 ms poll interval, a 50 ms
    /// empty-poll backoff, a 100 ms poll-failure backoff, and a limit of five
    /// consecutive poll failures.
    fn default() -> Self {
        let poll_interval = Duration::from_millis(100);
        let empty_poll_backoff = Duration::from_millis(50);
        let poll_failure_backoff = Duration::from_millis(100);
        // `NonZeroU32::new` only returns `None` for zero, so this never panics.
        let failure_limit = std::num::NonZeroU32::new(5).expect("5 is non-zero");

        Self {
            poll_interval,
            empty_poll_backoff,
            poll_failure_backoff,
            max_consecutive_poll_failures: MaxConsecutiveFailures::new(failure_limit),
        }
    }
}
/// Per-event retry settings that `ProjectionRunner::run` hands to the
/// projection pipeline.
///
/// NOTE(review): the pipeline that interprets these values is not part of this
/// file, so the per-field notes below are inferred from the field names and
/// should be confirmed against the pipeline implementation.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct EventRetryConfig {
/// Presumably how many times a failing event is retried (default: 3).
pub(crate) max_retry_attempts: MaxRetryAttempts,
/// Presumably the delay before the first retry (default: 100 ms).
pub(crate) retry_delay: Duration,
/// Presumably the factor by which the delay grows between retries (default: 2.0).
pub(crate) retry_backoff_multiplier: BackoffMultiplier,
/// Presumably the upper bound on the retry delay (default: 5 s).
pub(crate) max_retry_delay: Duration,
}
impl Default for EventRetryConfig {
    /// Builds the stock retry settings: three attempts, a 100 ms initial
    /// delay, a 2.0 backoff multiplier, and a 5 s maximum delay.
    fn default() -> Self {
        let max_retry_attempts = MaxRetryAttempts::new(3);
        let retry_delay = Duration::from_millis(100);
        // The literal is a fixed constant; the `expect` message records the
        // assumption that 2.0 is accepted by `BackoffMultiplier::try_new`.
        let retry_backoff_multiplier =
            BackoffMultiplier::try_new(2.0).expect("2.0 is a valid BackoffMultiplier value");
        let max_retry_delay = Duration::from_secs(5);

        Self {
            max_retry_attempts,
            retry_delay,
            retry_backoff_multiplier,
            max_retry_delay,
        }
    }
}
/// Selects how the projection pipeline polls for events.
///
/// `ProjectionRunner::new` starts in `Batch`; `ProjectionConfig` maps its
/// `continuous` flag to `Continuous`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum PollMode {
// NOTE(review): presumably processes the available events once and then
// finishes — confirm against the pipeline implementation.
Batch,
// NOTE(review): presumably keeps polling for new events — confirm against
// the pipeline implementation.
Continuous,
}
/// Couples a projector with an event source and an optional checkpoint store,
/// together with the polling and retry settings used when it is run.
///
/// `run` hands the projector and settings to a `ProjectionPipeline` and
/// performs the checkpoint, read, and sleep effects that the pipeline yields.
pub(crate) struct ProjectionRunner<E, R, P, C>
where
E: Event,
R: EventReader,
P: Projector<Event = E>,
C: CheckpointStore,
{
/// Projector that is moved into the pipeline when the runner is run.
projector: P,
/// Event source queried via `read_events`.
store: R,
/// Checkpoint persistence; when `None`, `run` answers load requests with
/// `Ok(None)` and save requests with `Ok(())` without touching any store.
checkpoint_store: Option<C>,
/// Polling mode forwarded to the pipeline.
poll_mode: PollMode,
/// Polling settings forwarded to the pipeline.
poll_config: PollConfig,
/// Per-event retry settings forwarded to the pipeline.
event_retry_config: EventRetryConfig,
/// Marker tying the runner to its event type `E` without storing a value.
_event: std::marker::PhantomData<E>,
}
/// Placeholder checkpoint store used as the `C` type parameter of a
/// `ProjectionRunner` created with `ProjectionRunner::new`, before any real
/// store has been attached.
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct NoCheckpointStore;
/// Error type associated with `NoCheckpointStore`.
///
/// The `CheckpointStore` impl for `NoCheckpointStore` in this file always
/// returns `Ok`, so this value is never produced by it.
#[derive(Debug, Clone, Copy, thiserror::Error)]
#[error("no checkpoint store configured")]
pub(crate) struct NoCheckpointError;
impl CheckpointStore for NoCheckpointStore {
    type Error = NoCheckpointError;

    /// Always reports that no checkpoint exists, whatever the name.
    async fn load(&self, _name: &str) -> Result<Option<StreamPosition>, Self::Error> {
        let stored_position: Option<StreamPosition> = None;
        Ok(stored_position)
    }

    /// Accepts the position and discards it; always succeeds.
    async fn save(&self, _name: &str, _position: StreamPosition) -> Result<(), Self::Error> {
        let nothing_persisted = ();
        Ok(nothing_persisted)
    }
}
impl<P, R> ProjectionRunner<P::Event, R, P, NoCheckpointStore>
where
    P: Projector,
    P::Event: Event + Clone,
    P::Context: Default,
    R: EventReader,
{
    /// Creates a runner with no checkpoint store, `PollMode::Batch`, and the
    /// default polling and retry settings.
    pub(crate) fn new(projector: P, store: R) -> Self {
        let poll_config = PollConfig::default();
        let event_retry_config = EventRetryConfig::default();

        Self {
            projector,
            store,
            checkpoint_store: None,
            poll_mode: PollMode::Batch,
            poll_config,
            event_retry_config,
            _event: std::marker::PhantomData,
        }
    }

    /// Attaches `checkpoint_store`, changing the runner's checkpoint type
    /// parameter to `C` while carrying every other setting over unchanged.
    pub(crate) fn with_checkpoint_store<C: CheckpointStore>(
        self,
        checkpoint_store: C,
    ) -> ProjectionRunner<P::Event, R, P, C> {
        // Take the runner apart; the old (always placeholder-typed)
        // checkpoint slot and the marker are simply left behind.
        let Self {
            projector,
            store,
            poll_mode,
            poll_config,
            event_retry_config,
            ..
        } = self;

        ProjectionRunner {
            projector,
            store,
            checkpoint_store: Some(checkpoint_store),
            poll_mode,
            poll_config,
            event_retry_config,
            _event: std::marker::PhantomData,
        }
    }
}
impl<E, R, P, C> ProjectionRunner<E, R, P, C>
where
    E: Event + Clone,
    R: EventReader,
    P: Projector<Event = E>,
    P::Context: Default,
    C: CheckpointStore,
{
    /// Returns the runner with its polling mode replaced by `mode`.
    pub(crate) fn with_poll_mode(self, mode: PollMode) -> Self {
        Self {
            poll_mode: mode,
            ..self
        }
    }

    /// Returns the runner with its polling settings replaced by `config`.
    pub(crate) fn with_poll_config(self, config: PollConfig) -> Self {
        Self {
            poll_config: config,
            ..self
        }
    }

    /// Returns the runner with its per-event retry settings replaced by `config`.
    pub(crate) fn with_event_retry_config(self, config: EventRetryConfig) -> Self {
        Self {
            event_retry_config: config,
            ..self
        }
    }

    /// Drives a `ProjectionPipeline` to completion.
    ///
    /// The pipeline is stepped repeatedly. Each effect it yields (load or
    /// save a checkpoint, read events, sleep) is carried out here and the
    /// outcome is fed back through `resume`. Store and reader errors are
    /// converted to strings before being handed back to the pipeline.
    ///
    /// Returns whatever result the pipeline reports when it signals `Done`.
    pub(crate) async fn run(self) -> Result<(), ProjectionError>
    where
        P::Error: std::fmt::Debug,
        R::Error: std::fmt::Display,
    {
        use crate::projection_pipeline::{
            ProjectionEffect, ProjectionEffectResult, ProjectionPipeline, ProjectionStep,
        };

        // Captured up front: the pipeline only needs to know whether a store
        // exists, while the store itself stays with the runner.
        let has_checkpoint_store = self.checkpoint_store.is_some();
        let mut pipeline = ProjectionPipeline::new(
            self.projector,
            has_checkpoint_store,
            self.poll_mode,
            self.poll_config,
            self.event_retry_config,
        );

        let mut step = pipeline.step();
        loop {
            // Either the pipeline has finished, or it wants an effect performed.
            let effect = match step {
                ProjectionStep::Done(outcome) => return outcome,
                ProjectionStep::Yield(requested) => requested,
            };

            step = match effect {
                ProjectionEffect::LoadCheckpoint { name } => {
                    let loaded = if let Some(cs) = &self.checkpoint_store {
                        cs.load(&name).await.map_err(|err| err.to_string())
                    } else {
                        // Without a store there is never a saved position.
                        Ok(None)
                    };
                    pipeline.resume(ProjectionEffectResult::CheckpointLoaded(loaded))
                }
                ProjectionEffect::ReadEvents { filter, page } => {
                    let fetched = self.store.read_events(filter, page).await;
                    let fetched = fetched.map_err(|err| err.to_string());
                    pipeline.resume(ProjectionEffectResult::EventsRead(fetched))
                }
                ProjectionEffect::SaveCheckpoint { name, position } => {
                    let saved = if let Some(cs) = &self.checkpoint_store {
                        cs.save(&name, position).await.map_err(|err| err.to_string())
                    } else {
                        // Without a store, saving is reported as a success.
                        Ok(())
                    };
                    pipeline.resume(ProjectionEffectResult::CheckpointSaved(saved))
                }
                ProjectionEffect::Sleep { duration } => {
                    tokio::time::sleep(duration).await;
                    pipeline.resume(ProjectionEffectResult::Slept)
                }
            };
        }
    }
}
/// Errors returned when running a projection.
#[derive(thiserror::Error, Debug)]
pub enum ProjectionError {
/// The projection run failed; the payload is the failure description.
#[error("projection failed: {0}")]
Failed(String),
/// Leadership could not be acquired for the projector; produced by
/// `run_projection` when `try_acquire` on the backend fails. The payload is
/// the stringified coordinator error.
#[error("failed to acquire leadership: {0}")]
LeadershipError(String),
}
/// Public, builder-style configuration accepted by `run_projection`.
///
/// The fields are private and set through the builder methods. The `Default`
/// value takes its numbers from `PollConfig::default()` and
/// `EventRetryConfig::default()` and leaves `continuous` off.
#[derive(Debug, Clone)]
pub struct ProjectionConfig {
// When true the runner is put in `PollMode::Continuous`, otherwise `PollMode::Batch`.
continuous: bool,
// Copied into `PollConfig::poll_interval`.
poll_interval: Duration,
// Copied into `PollConfig::empty_poll_backoff`.
empty_poll_backoff: Duration,
// Copied into `PollConfig::poll_failure_backoff`.
poll_failure_backoff: Duration,
// Copied into `PollConfig::max_consecutive_poll_failures`.
max_consecutive_poll_failures: MaxConsecutiveFailures,
// Copied into `EventRetryConfig::max_retry_attempts`.
event_retry_max_attempts: MaxRetryAttempts,
// Copied into `EventRetryConfig::retry_delay`.
event_retry_delay: Duration,
// Copied into `EventRetryConfig::retry_backoff_multiplier`.
event_retry_backoff_multiplier: BackoffMultiplier,
// Copied into `EventRetryConfig::max_retry_delay`.
event_retry_max_delay: Duration,
}
impl Default for ProjectionConfig {
fn default() -> Self {
let poll_defaults = PollConfig::default();
let retry_defaults = EventRetryConfig::default();
Self {
continuous: false,
poll_interval: poll_defaults.poll_interval,
empty_poll_backoff: poll_defaults.empty_poll_backoff,
poll_failure_backoff: poll_defaults.poll_failure_backoff,
max_consecutive_poll_failures: poll_defaults.max_consecutive_poll_failures,
event_retry_max_attempts: retry_defaults.max_retry_attempts,
event_retry_delay: retry_defaults.retry_delay,
event_retry_backoff_multiplier: retry_defaults.retry_backoff_multiplier,
event_retry_max_delay: retry_defaults.max_retry_delay,
}
}
}
impl ProjectionConfig {
    /// Switches the configuration to continuous mode.
    pub fn continuous(self) -> Self {
        Self {
            continuous: true,
            ..self
        }
    }

    /// Sets the poll interval.
    pub fn poll_interval(self, interval: Duration) -> Self {
        Self {
            poll_interval: interval,
            ..self
        }
    }

    /// Sets the empty-poll backoff.
    pub fn empty_poll_backoff(self, backoff: Duration) -> Self {
        Self {
            empty_poll_backoff: backoff,
            ..self
        }
    }

    /// Sets the poll-failure backoff.
    pub fn poll_failure_backoff(self, backoff: Duration) -> Self {
        Self {
            poll_failure_backoff: backoff,
            ..self
        }
    }

    /// Sets the maximum number of consecutive poll failures.
    pub fn max_consecutive_poll_failures(self, max: MaxConsecutiveFailures) -> Self {
        Self {
            max_consecutive_poll_failures: max,
            ..self
        }
    }

    /// Sets the maximum number of retry attempts per event.
    pub fn event_retry_max_attempts(self, max: MaxRetryAttempts) -> Self {
        Self {
            event_retry_max_attempts: max,
            ..self
        }
    }

    /// Sets the event retry delay.
    pub fn event_retry_delay(self, delay: Duration) -> Self {
        Self {
            event_retry_delay: delay,
            ..self
        }
    }

    /// Sets the event retry backoff multiplier.
    pub fn event_retry_backoff_multiplier(self, multiplier: BackoffMultiplier) -> Self {
        Self {
            event_retry_backoff_multiplier: multiplier,
            ..self
        }
    }

    /// Sets the maximum event retry delay.
    pub fn event_retry_max_delay(self, max_delay: Duration) -> Self {
        Self {
            event_retry_max_delay: max_delay,
            ..self
        }
    }

    /// Copies the polling-related fields into an internal `PollConfig`.
    fn to_poll_config(&self) -> PollConfig {
        let &Self {
            poll_interval,
            empty_poll_backoff,
            poll_failure_backoff,
            max_consecutive_poll_failures,
            ..
        } = self;

        PollConfig {
            poll_interval,
            empty_poll_backoff,
            poll_failure_backoff,
            max_consecutive_poll_failures,
        }
    }

    /// Copies the retry-related fields into an internal `EventRetryConfig`.
    fn to_event_retry_config(&self) -> EventRetryConfig {
        let &Self {
            event_retry_max_attempts,
            event_retry_delay,
            event_retry_backoff_multiplier,
            event_retry_max_delay,
            ..
        } = self;

        EventRetryConfig {
            max_retry_attempts: event_retry_max_attempts,
            retry_delay: event_retry_delay,
            retry_backoff_multiplier: event_retry_backoff_multiplier,
            max_retry_delay: event_retry_max_delay,
        }
    }

    /// Maps the `continuous` flag onto the internal `PollMode`.
    fn to_poll_mode(&self) -> PollMode {
        match self.continuous {
            true => PollMode::Continuous,
            false => PollMode::Batch,
        }
    }
}
/// Runs `projector` against `backend` using the settings in `config`.
///
/// Leadership for the projector's name is acquired from the backend first,
/// and the resulting guard is held until the run finishes. The backend then
/// serves as both the event reader and the checkpoint store.
///
/// # Errors
///
/// Returns `ProjectionError::LeadershipError` if leadership cannot be
/// acquired, otherwise whatever error the projection run itself reports.
pub async fn run_projection<P, B>(
    projector: P,
    backend: &B,
    config: ProjectionConfig,
) -> Result<(), ProjectionError>
where
    P: Projector,
    P::Event: Event + Clone,
    P::Context: Default,
    P::Error: std::fmt::Debug,
    B: EventReader + CheckpointStore + eventcore_types::ProjectorCoordinator,
    <B as EventReader>::Error: std::fmt::Display,
{
    // Bound to a named variable so the guard lives until the function returns.
    let _guard = match backend.try_acquire(projector.name()).await {
        Ok(guard) => guard,
        Err(e) => return Err(ProjectionError::LeadershipError(e.to_string())),
    };

    // Translate the public configuration into the runner's internal settings.
    let poll_mode = config.to_poll_mode();
    let poll_config = config.to_poll_config();
    let event_retry_config = config.to_event_retry_config();

    ProjectionRunner::new(projector, backend)
        .with_checkpoint_store(backend)
        .with_poll_mode(poll_mode)
        .with_poll_config(poll_config)
        .with_event_retry_config(event_retry_config)
        .run()
        .await
}