use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use serde::de::DeserializeOwned;
use sqlx::PgPool;
use sqlx::postgres::Postgres;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use crate::lease::{self, AcquireOutcome, LeaseStatus};
use crate::{Aggregate, EventData, EventStore, EventStoreError, RecordedEvent, Subscription};
/// Default `shutdown_grace` applied by `ProjectionRunner::builder`: 10 seconds.
const DEFAULT_SHUTDOWN_GRACE: Duration = Duration::from_secs(10);
/// A deserialized event paired with the stored record it was decoded from.
pub struct HandledEvent<E: EventData> {
/// The typed event payload, decoded from the record's `data`.
pub event: E,
/// The stored record the payload came from (positions, stream identity, metadata, timestamp).
pub recorded: RecordedEvent,
}
/// Read-only shortcuts into the underlying [`RecordedEvent`].
impl<E: EventData> HandledEvent<E> {
    /// Returns the record's `global_position`.
    pub fn global_position(&self) -> i64 {
        let Self { recorded, .. } = self;
        recorded.global_position
    }

    /// Returns the record's `stream_id`.
    pub fn stream_id(&self) -> &str {
        let Self { recorded, .. } = self;
        &recorded.stream_id
    }

    /// Returns the record's `stream_version`.
    pub fn stream_version(&self) -> i64 {
        let Self { recorded, .. } = self;
        recorded.stream_version
    }

    /// Returns the record's `event_type`.
    pub fn event_type(&self) -> &str {
        let Self { recorded, .. } = self;
        &recorded.event_type
    }

    /// Returns the record's JSON `metadata`.
    pub fn metadata(&self) -> &serde_json::Value {
        let rec = &self.recorded;
        &rec.metadata
    }

    /// Returns the record's `transaction_id`.
    pub fn transaction_id(&self) -> u64 {
        let rec = &self.recorded;
        rec.transaction_id
    }

    /// Returns the record's `created_at` timestamp.
    pub fn created_at(&self) -> chrono::DateTime<chrono::Utc> {
        let rec = &self.recorded;
        rec.created_at
    }
}
/// A handler for the events of one aggregate type.
///
/// `handle` receives each event already deserialized into the aggregate's
/// event type, bundled with its stored record in a [`HandledEvent`].
pub trait EventHandler: Send + Sync + 'static {
/// The aggregate whose event type this handler consumes.
type Aggregate: Aggregate;
/// Processes one event; returning `Err` reports that handling failed.
fn handle(
&self,
event: HandledEvent<<Self::Aggregate as Aggregate>::Event>,
) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
}
/// Type-erased form of [`EventHandler`] that works on raw [`RecordedEvent`]s
/// and returns a boxed future, so handlers for different aggregates can be
/// stored behind `Arc<dyn EventHandlerDyn>`.
trait EventHandlerDyn: Send + Sync {
/// Handles one stored record; the returned future borrows both `self` and `event`.
fn handle_dyn<'a>(
&'a self,
event: &'a RecordedEvent,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + 'a>>;
}
/// Wraps a typed [`EventHandler`] so it can be driven through `EventHandlerDyn`.
struct TypedAdapter<H: EventHandler> {
/// The user-supplied handler.
inner: H,
}
impl<H> EventHandlerDyn for TypedAdapter<H>
where
H: EventHandler,
<H::Aggregate as Aggregate>::Event: DeserializeOwned,
{
fn handle_dyn<'a>(
&'a self,
recorded: &'a RecordedEvent,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + 'a>> {
Box::pin(async move {
let event =
serde_json::from_value::<<H::Aggregate as Aggregate>::Event>(recorded.data.clone())
.map_err(|source| EventStoreError::Deserialization {
stream_id: recorded.stream_id.clone(),
global_position: recorded.global_position,
event_type: recorded.event_type.clone(),
source,
})?;
self.inner
.handle(HandledEvent {
event,
recorded: recorded.clone(),
})
.await
})
}
}
/// A handler that is additionally given a database connection for each event.
///
/// Within this file the connection passed in is the transaction in which the
/// batch's checkpoint is also written (see `dispatch_transactional_batch`).
pub trait TransactionalEventHandler: Send + Sync + 'static {
/// The aggregate whose event type this handler consumes.
type Aggregate: Aggregate;
/// Processes one event using `conn`; returning `Err` reports that handling failed.
fn handle(
&self,
event: HandledEvent<<Self::Aggregate as Aggregate>::Event>,
conn: &mut sqlx::PgConnection,
) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
}
/// Type-erased form of [`TransactionalEventHandler`] that works on raw
/// [`RecordedEvent`]s and returns a boxed future, so handlers can be stored
/// behind `Arc<dyn TxEventHandlerDyn>`.
trait TxEventHandlerDyn: Send + Sync {
/// Handles one stored record using `conn`; the returned future borrows all three arguments.
fn handle_dyn<'a>(
&'a self,
recorded: &'a RecordedEvent,
conn: &'a mut sqlx::PgConnection,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + 'a>>;
}
/// Wraps a typed [`TransactionalEventHandler`] so it can be driven through `TxEventHandlerDyn`.
struct TxTypedAdapter<H: TransactionalEventHandler> {
/// The user-supplied handler.
inner: H,
}
impl<H> TxEventHandlerDyn for TxTypedAdapter<H>
where
H: TransactionalEventHandler,
<H::Aggregate as Aggregate>::Event: DeserializeOwned,
{
fn handle_dyn<'a>(
&'a self,
recorded: &'a RecordedEvent,
conn: &'a mut sqlx::PgConnection,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + 'a>> {
Box::pin(async move {
let event =
serde_json::from_value::<<H::Aggregate as Aggregate>::Event>(recorded.data.clone())
.map_err(|source| EventStoreError::Deserialization {
stream_id: recorded.stream_id.clone(),
global_position: recorded.global_position,
event_type: recorded.event_type.clone(),
source,
})?;
self.inner
.handle(
HandledEvent {
event,
recorded: recorded.clone(),
},
conn,
)
.await
})
}
}
/// The two kinds of registered handler; cloning only clones the `Arc`.
#[derive(Clone)]
enum SubHandler {
/// Handler invoked per event with no connection (dispatched by `dispatch_plain_batch`).
Plain(Arc<dyn EventHandlerDyn>),
/// Handler invoked inside a per-batch transaction (dispatched by `dispatch_transactional_batch`).
Transactional(Arc<dyn TxEventHandlerDyn>),
}
/// One registered subscription: which category to poll, under which id, with which handler.
struct CategorySubscription {
/// Stream category, taken from the handler's `Aggregate::stream_category()`.
category: String,
/// Identifier used for the lease, the checkpoint and log fields; must be unique per runner.
subscription_id: String,
/// The handler that receives this subscription's events.
handler: SubHandler,
}
/// Builder for [`ProjectionRunner`]; obtain one from `ProjectionRunner::builder`.
pub struct ProjectionRunnerBuilder {
/// Event store whose connection pool serves leases, polling and handlers.
event_store: EventStore,
/// Subscriptions registered so far.
subscriptions: Vec<CategorySubscription>,
/// How long to sleep after a poll that returned no events.
poll_interval: Duration,
/// Batch size passed to `Subscription::create`.
batch_size: i64,
/// Maximum handler attempts before the subscription stops with an error.
max_attempts: u32,
/// Worker id prefix; `build` appends `#<uuid>` to it.
worker_id: String,
/// Lease time-to-live passed to lease acquisition and heartbeats.
lease_ttl: Duration,
/// How long to wait before retrying lease acquisition.
lease_recheck_interval: Duration,
/// How long `run` waits for lease loops to finish before aborting them.
shutdown_grace: Duration,
}
/// Fluent configuration for a [`ProjectionRunner`].
impl ProjectionRunnerBuilder {
    /// Registers a plain handler under `subscription_id`.
    ///
    /// The category polled is the handler aggregate's `stream_category()`.
    pub fn subscribe<H>(mut self, subscription_id: impl Into<String>, handler: H) -> Self
    where
        H: EventHandler,
        <H::Aggregate as Aggregate>::Event: DeserializeOwned,
    {
        self.subscriptions.push(CategorySubscription {
            category: H::Aggregate::stream_category().to_string(),
            subscription_id: subscription_id.into(),
            handler: SubHandler::Plain(Arc::new(TypedAdapter { inner: handler })),
        });
        self
    }

    /// Registers a transactional handler under `subscription_id`.
    ///
    /// The category polled is the handler aggregate's `stream_category()`.
    pub fn subscribe_transactional<H>(
        mut self,
        subscription_id: impl Into<String>,
        handler: H,
    ) -> Self
    where
        H: TransactionalEventHandler,
        <H::Aggregate as Aggregate>::Event: DeserializeOwned,
    {
        self.subscriptions.push(CategorySubscription {
            category: H::Aggregate::stream_category().to_string(),
            subscription_id: subscription_id.into(),
            handler: SubHandler::Transactional(Arc::new(TxTypedAdapter { inner: handler })),
        });
        self
    }

    /// Sets how long to sleep after a poll that returned no events.
    pub fn poll_interval(mut self, interval: Duration) -> Self {
        self.poll_interval = interval;
        self
    }

    /// Sets the batch size passed to `Subscription::create`.
    pub fn batch_size(mut self, batch_size: i64) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Sets the maximum number of handler attempts; must be at least 1 (checked in `build`).
    pub fn max_attempts(mut self, max_attempts: u32) -> Self {
        self.max_attempts = max_attempts;
        self
    }

    /// Sets the worker id prefix; `build` appends `#<uuid>` to it.
    pub fn worker_id(mut self, id: impl Into<String>) -> Self {
        self.worker_id = id.into();
        self
    }

    /// Sets the lease time-to-live; must be large enough that a third of it is
    /// non-zero (checked in `build`).
    pub fn lease_ttl(mut self, ttl: Duration) -> Self {
        self.lease_ttl = ttl;
        self
    }

    /// Sets how long to wait before retrying lease acquisition.
    pub fn lease_recheck_interval(mut self, d: Duration) -> Self {
        self.lease_recheck_interval = d;
        self
    }

    /// Sets how long `run` waits for lease loops to drain before aborting them.
    pub fn shutdown_grace(mut self, d: Duration) -> Self {
        self.shutdown_grace = d;
        self
    }

    /// Validates the configuration and produces the runner.
    ///
    /// Logs a warning (but still builds) when the pool's `max_connections` is
    /// below `max(2 * subscriptions, subscriptions + 4)`.
    ///
    /// # Panics
    ///
    /// - if `max_attempts` is 0;
    /// - if `lease_ttl / 3` is zero;
    /// - if two subscriptions share the same `subscription_id`.
    pub fn build(self) -> ProjectionRunner {
        assert!(
            self.max_attempts >= 1,
            "max_attempts must be >= 1 (got 0) — a handler must run at least once",
        );
        // The lease loop heartbeats every `lease_ttl / 3` via `tokio::time::interval`,
        // which panics on a zero period. That panic would happen inside the spawned
        // heartbeat task and go unnoticed, leaving the lease without heartbeats, so
        // reject the configuration here instead.
        assert!(
            self.lease_ttl / 3 > Duration::ZERO,
            "lease_ttl is too small (got {:?}) — the heartbeat interval is lease_ttl / 3 \
             and must be non-zero",
            self.lease_ttl,
        );
        let mut seen = HashSet::with_capacity(self.subscriptions.len());
        for sub in &self.subscriptions {
            if !seen.insert(sub.subscription_id.as_str()) {
                panic!(
                    "duplicate subscription_id `{}` registered with ProjectionRunner — \
                     two handlers would race on the same checkpoint row",
                    sub.subscription_id,
                );
            }
        }
        let n_subs = self.subscriptions.len();
        let pool_max = self.event_store.pool().options().get_max_connections() as usize;
        let recommended = n_subs.saturating_mul(2).max(n_subs + 4);
        if pool_max < recommended {
            tracing::warn!(
                pool_max_connections = pool_max,
                subscriptions = n_subs,
                recommended_minimum = recommended,
                "ProjectionRunner: connection pool may be undersized. Each active \
                 lease holds 1 connection for heartbeat; the rest serve polls and \
                 handler queries. Consider PgPoolOptions::new().max_connections(N) \
                 with N >= subscriptions × 2.",
            );
        }
        // A fresh UUID suffix makes every built runner's worker id distinct, even
        // when the configured prefix is reused.
        let effective_worker_id = format!("{}#{}", self.worker_id, Uuid::new_v4());
        ProjectionRunner {
            event_store: self.event_store,
            subscriptions: self.subscriptions,
            poll_interval: self.poll_interval,
            batch_size: self.batch_size,
            max_attempts: self.max_attempts,
            worker_id: effective_worker_id,
            lease_ttl: self.lease_ttl,
            lease_recheck_interval: self.lease_recheck_interval,
            shutdown_grace: self.shutdown_grace,
        }
    }
}
/// Runs one lease loop per registered subscription; created via `ProjectionRunner::builder`.
pub struct ProjectionRunner {
/// Event store whose connection pool serves leases, polling and handlers.
event_store: EventStore,
/// The subscriptions to run.
subscriptions: Vec<CategorySubscription>,
/// How long to sleep after a poll that returned no events.
poll_interval: Duration,
/// Batch size passed to `Subscription::create`.
batch_size: i64,
/// Maximum handler attempts before a subscription stops with an error.
max_attempts: u32,
/// Effective worker id (`<configured prefix>#<uuid>`), used as the lease owner.
worker_id: String,
/// Lease time-to-live passed to lease acquisition and heartbeats.
lease_ttl: Duration,
/// How long to wait before retrying lease acquisition.
lease_recheck_interval: Duration,
/// How long `run` waits for lease loops to finish before aborting them.
shutdown_grace: Duration,
}
impl ProjectionRunner {
pub fn builder(event_store: EventStore) -> ProjectionRunnerBuilder {
ProjectionRunnerBuilder {
event_store,
subscriptions: Vec::new(),
poll_interval: Duration::from_millis(100),
batch_size: 100,
max_attempts: 3,
worker_id: Uuid::new_v4().to_string(),
lease_ttl: Duration::from_secs(30),
lease_recheck_interval: Duration::from_secs(5),
shutdown_grace: DEFAULT_SHUTDOWN_GRACE,
}
}
pub async fn run(self, cancellation_token: CancellationToken) -> anyhow::Result<()> {
let internal_cancel = CancellationToken::new();
let bridge = {
let internal = internal_cancel.clone();
let external = cancellation_token.clone();
tokio::spawn(async move {
external.cancelled().await;
internal.cancel();
})
};
let mut joinset: JoinSet<LeaseLoopResult> = JoinSet::new();
for cat_sub in &self.subscriptions {
let work = LeaseLoopWork {
event_store: self.event_store.clone(),
category: cat_sub.category.clone(),
subscription_id: cat_sub.subscription_id.clone(),
handler: cat_sub.handler.clone(),
worker_id: self.worker_id.clone(),
lease_ttl: self.lease_ttl,
lease_recheck_interval: self.lease_recheck_interval,
poll_interval: self.poll_interval,
batch_size: self.batch_size,
max_attempts: self.max_attempts,
};
let cancel = internal_cancel.clone();
joinset.spawn(work.run(cancel));
}
let mut first_err: Option<anyhow::Error> = None;
if let Some(joined) = joinset.join_next().await {
match joined {
Ok(LeaseLoopResult {
subscription_id,
outcome: Ok(()),
}) => {
tracing::debug!(
subscription = %subscription_id,
"lease loop completed; cascading shutdown",
);
}
Ok(LeaseLoopResult {
subscription_id,
outcome: Err(e),
}) => {
tracing::error!(
subscription = %subscription_id,
error = %e,
"lease loop failed; cascading shutdown",
);
first_err = Some(e);
}
Err(join_err) => {
tracing::error!(error = ?join_err, "lease loop task panicked");
first_err = Some(anyhow::anyhow!("lease loop task panicked: {join_err}"));
}
}
}
internal_cancel.cancel();
let drain = async {
while let Some(joined) = joinset.join_next().await {
match joined {
Ok(LeaseLoopResult {
subscription_id,
outcome: Ok(()),
}) => {
tracing::debug!(
subscription = %subscription_id,
"lease loop drained",
);
}
Ok(LeaseLoopResult {
subscription_id,
outcome: Err(e),
}) => {
tracing::warn!(
subscription = %subscription_id,
error = %e,
"lease loop drained with error",
);
if first_err.is_none() {
first_err = Some(e);
}
}
Err(join_err) => {
tracing::error!(error = ?join_err, "lease loop drain panicked");
if first_err.is_none() {
first_err = Some(anyhow::anyhow!(
"lease loop task panicked during drain: {join_err}"
));
}
}
}
}
};
if tokio::time::timeout(self.shutdown_grace, drain)
.await
.is_err()
{
tracing::warn!(
grace = ?self.shutdown_grace,
remaining = joinset.len(),
"lease loops did not drain within grace period; aborting",
);
joinset.abort_all();
while joinset.join_next().await.is_some() {}
}
bridge.abort();
match first_err {
Some(e) => Err(e),
None => Ok(()),
}
}
pub fn worker_id(&self) -> &str {
&self.worker_id
}
pub async fn lease_status(
&self,
subscription_id: &str,
) -> Result<Option<LeaseStatus>, EventStoreError> {
lease::status(self.event_store.pool(), subscription_id).await
}
}
/// Owned copy of everything one subscription's lease loop needs, so the loop
/// can run as a spawned task.
struct LeaseLoopWork {
/// Event store used for polling and for its connection pool.
event_store: EventStore,
/// Stream category to poll.
category: String,
/// Subscription identifier (lease and checkpoint key).
subscription_id: String,
/// Handler that receives the events.
handler: SubHandler,
/// Lease owner id of this runner.
worker_id: String,
/// Lease time-to-live.
lease_ttl: Duration,
/// Wait before retrying lease acquisition.
lease_recheck_interval: Duration,
/// Sleep after an empty poll.
poll_interval: Duration,
/// Batch size passed to `Subscription::create`.
batch_size: i64,
/// Maximum handler attempts.
max_attempts: u32,
}
/// What a finished lease loop reports back to `ProjectionRunner::run`.
struct LeaseLoopResult {
/// Which subscription the loop was serving (used in log fields).
subscription_id: String,
/// `Ok(())` on clean exit, `Err` when the loop stopped with an error.
outcome: anyhow::Result<()>,
}
impl LeaseLoopWork {
    /// Runs `lease_loop` with this work item's settings and tags the outcome
    /// with the subscription id.
    async fn run(self, cancel: CancellationToken) -> LeaseLoopResult {
        let Self {
            event_store,
            category,
            subscription_id,
            handler,
            worker_id,
            lease_ttl,
            lease_recheck_interval,
            poll_interval,
            batch_size,
            max_attempts,
        } = self;
        let outcome = lease_loop(
            &event_store,
            &category,
            &subscription_id,
            &handler,
            &worker_id,
            lease_ttl,
            lease_recheck_interval,
            poll_interval,
            batch_size,
            max_attempts,
            cancel,
        )
        .await;
        LeaseLoopResult {
            subscription_id,
            outcome,
        }
    }
}
/// Why processing under a held lease stopped; `lease_loop` decides what to do next from this.
enum InnerExit {
/// The cancellation token fired; `lease_loop` releases the lease and returns `Ok(())`.
Cancelled,
/// The lease is no longer considered held (fence signal fired or a checkpoint
/// returned `false`); `lease_loop` re-acquires without releasing.
Fenced,
/// Unrecoverable: a handler failed on every attempt, or polling reported an
/// ordering violation; `lease_loop` releases the lease and returns the error.
PoisonHandler(anyhow::Error),
/// A store/database error; `lease_loop` releases the lease, backs off and re-acquires.
Transient(EventStoreError),
}
/// Drives one subscription: acquire its lease, process events while a
/// heartbeat task keeps the lease alive, then release and stop or go around
/// again.
///
/// Returns `Ok(())` whenever `cancel` fires. Returns `Err` only when the inner
/// loop exits with `InnerExit::PoisonHandler`. Acquisition failures, heartbeat
/// connection failures and transient errors are logged and retried after
/// `lease_recheck_interval`.
// The argument list mirrors the fields of `LeaseLoopWork`, hence the lint allowance.
#[allow(clippy::too_many_arguments)]
async fn lease_loop(
event_store: &EventStore,
category: &str,
subscription_id: &str,
handler: &SubHandler,
worker_id: &str,
lease_ttl: Duration,
lease_recheck_interval: Duration,
poll_interval: Duration,
batch_size: i64,
max_attempts: u32,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let pool = event_store.pool().clone();
// Heartbeat three times per TTL window.
let heartbeat_interval = lease_ttl / 3;
loop {
if cancel.is_cancelled() {
return Ok(());
}
// Step 1: try to take the lease. If another holder has it, or the attempt
// fails, wait `lease_recheck_interval` (or until cancelled) and retry.
let fence_token =
match lease::try_acquire(&pool, subscription_id, worker_id, lease_ttl).await {
Ok(AcquireOutcome::Acquired { fence_token }) => fence_token,
Ok(AcquireOutcome::Held) => {
tokio::select! {
_ = tokio::time::sleep(lease_recheck_interval) => continue,
_ = cancel.cancelled() => return Ok(()),
}
}
Err(e) => {
tracing::warn!(
subscription = %subscription_id,
worker = %worker_id,
error = %e,
"lease acquire failed; backing off",
);
tokio::select! {
_ = tokio::time::sleep(lease_recheck_interval) => continue,
_ = cancel.cancelled() => return Ok(()),
}
}
};
tracing::info!(
subscription = %subscription_id,
worker = %worker_id,
fence_token,
"acquired projection lease",
);
// Step 2: take a pool connection for the heartbeat task to own. If none is
// available, release the lease (best effort) and retry after a pause.
let hb_conn = match pool.acquire().await {
Ok(c) => c,
Err(e) => {
tracing::warn!(
subscription = %subscription_id,
error = %e,
"could not acquire heartbeat connection; releasing lease",
);
let _ = lease::release(&pool, subscription_id, worker_id, fence_token).await;
tokio::select! {
_ = tokio::time::sleep(lease_recheck_interval) => continue,
_ = cancel.cancelled() => return Ok(()),
}
}
};
// `fence_signal` is per-acquisition: the heartbeat task cancels it when the
// lease is lost, and this function cancels it to stop the heartbeat task.
let fence_signal = CancellationToken::new();
let hb_handle = {
let sub_id = subscription_id.to_string();
let worker = worker_id.to_string();
let fence_signal = fence_signal.clone();
let cancel = cancel.clone();
tokio::spawn(heartbeat_task(
hb_conn,
sub_id,
worker,
fence_token,
lease_ttl,
heartbeat_interval,
fence_signal,
cancel,
))
};
// Step 3: open the subscription and process events until something stops us.
let subscription_result = Subscription::create(
event_store.clone(),
pool.clone(),
subscription_id,
batch_size,
)
.await;
let exit = match subscription_result {
Ok(mut subscription) => {
inner_loop(
&pool,
subscription_id,
category,
handler,
&mut subscription,
fence_token,
poll_interval,
max_attempts,
cancel.clone(),
fence_signal.clone(),
)
.await
}
// Failing to create the subscription is handled like any other transient error.
Err(e) => InnerExit::Transient(e),
};
// Step 4: stop the heartbeat task and wait for it to finish before touching
// the lease again. The join result (including a panic) is ignored.
fence_signal.cancel();
let _ = hb_handle.await;
// Step 5: act on the reason processing stopped.
match exit {
InnerExit::Cancelled => {
// A failed release is reported as `released = false` rather than as an error.
let released = lease::release(&pool, subscription_id, worker_id, fence_token)
.await
.unwrap_or(false);
tracing::info!(
subscription = %subscription_id,
fence_token,
released,
"released projection lease on shutdown",
);
return Ok(());
}
InnerExit::Fenced => {
// No release here: go straight back to acquisition, with no back-off.
tracing::info!(
subscription = %subscription_id,
fence_token,
"projection lease fenced; re-acquiring",
);
continue;
}
InnerExit::PoisonHandler(err) => {
// Best-effort release, then surface the error to the runner.
let _ = lease::release(&pool, subscription_id, worker_id, fence_token).await;
tracing::error!(
subscription = %subscription_id,
error = %err,
"subscription stopped: handler failed past max_attempts",
);
return Err(err);
}
InnerExit::Transient(e) => {
// Best-effort release, then pause before trying to re-acquire.
let _ = lease::release(&pool, subscription_id, worker_id, fence_token).await;
tracing::warn!(
subscription = %subscription_id,
error = %e,
"transient lease-loop error; backing off then re-acquiring",
);
tokio::select! {
_ = tokio::time::sleep(lease_recheck_interval) => continue,
_ = cancel.cancelled() => return Ok(()),
}
}
}
}
}
/// Polls `category` and dispatches each batch to `handler` for as long as the
/// lease is held.
///
/// Returns the reason processing stopped:
/// - `Cancelled` / `Fenced` when the corresponding token fires (checked before
///   each poll and while sleeping after an empty poll);
/// - `PoisonHandler` when polling reports an ordering violation;
/// - `Transient` for any other polling error;
/// - whatever exit the batch dispatcher reports.
#[allow(clippy::too_many_arguments)]
async fn inner_loop(
    pool: &PgPool,
    subscription_id: &str,
    category: &str,
    handler: &SubHandler,
    subscription: &mut Subscription,
    fence_token: i64,
    poll_interval: Duration,
    max_attempts: u32,
    cancel: CancellationToken,
    fence_signal: CancellationToken,
) -> InnerExit {
    loop {
        if cancel.is_cancelled() {
            return InnerExit::Cancelled;
        }
        if fence_signal.is_cancelled() {
            return InnerExit::Fenced;
        }

        let events = match subscription.poll_category(category).await {
            Ok(batch) => batch,
            Err(e) => {
                // An ordering violation will not fix itself on retry, so it is
                // treated as fatal; everything else is retried by the caller.
                return if matches!(e, EventStoreError::OrderingViolation { .. }) {
                    InnerExit::PoisonHandler(e.into())
                } else {
                    InnerExit::Transient(e)
                };
            }
        };

        if events.is_empty() {
            tokio::select! {
                _ = tokio::time::sleep(poll_interval) => continue,
                _ = cancel.cancelled() => return InnerExit::Cancelled,
                _ = fence_signal.cancelled() => return InnerExit::Fenced,
            }
        }

        // cursors[k] is the checkpoint cursor to store once the first k events
        // of the batch have been handled (k = 0..=len).
        let batch_len = events.len();
        let mut cursors: Vec<(u64, i64)> = Vec::with_capacity(batch_len + 1);
        for k in 0..=batch_len {
            cursors.push(subscription.checkpoint_cursor_after(k));
        }

        let outcome = match handler {
            SubHandler::Transactional(h) => {
                // A transactional batch is all-or-nothing, so only the
                // end-of-batch cursor is needed.
                dispatch_transactional_batch(
                    pool,
                    subscription_id,
                    h.as_ref(),
                    &events,
                    cursors[batch_len],
                    fence_token,
                    max_attempts,
                    &fence_signal,
                )
                .await
            }
            SubHandler::Plain(h) => {
                dispatch_plain_batch(
                    pool,
                    subscription_id,
                    h.as_ref(),
                    &events,
                    &cursors,
                    fence_token,
                    max_attempts,
                    &cancel,
                    &fence_signal,
                )
                .await
            }
        };

        match outcome {
            Some(exit) => return exit,
            None => {}
        }
    }
}
/// Feeds a batch to a plain handler one event at a time, then checkpoints.
///
/// `cursors[k]` must be the checkpoint cursor for "the first `k` events are
/// done", so `cursors` has one more entry than `events`.
///
/// Returns `None` when the whole batch was handled and checkpointed, otherwise
/// the reason to stop:
/// - `PoisonHandler` when an event exhausts its retries (progress made before
///   it is checkpointed best-effort);
/// - `Cancelled` when `cancel` fires between events (progress so far is
///   checkpointed best-effort);
/// - `Fenced` when `fence_signal` fires between events or the final checkpoint
///   returns `false`;
/// - `Transient` when the final checkpoint fails.
#[allow(clippy::too_many_arguments)]
async fn dispatch_plain_batch(
    pool: &PgPool,
    subscription_id: &str,
    handler: &dyn EventHandlerDyn,
    events: &[RecordedEvent],
    cursors: &[(u64, i64)],
    fence_token: i64,
    max_attempts: u32,
    cancel: &CancellationToken,
    fence_signal: &CancellationToken,
) -> Option<InnerExit> {
    for (idx, event) in events.iter().enumerate() {
        let attempt = dispatch_with_retries(handler, event, max_attempts, subscription_id).await;
        if let Err(e) = attempt {
            // `idx` events succeeded before this one; save that progress if any.
            if idx > 0 {
                let (txid, pos) = cursors[idx];
                let _ = lease::checkpoint(pool, subscription_id, fence_token, pos, txid).await;
            }
            return Some(InnerExit::PoisonHandler(e));
        }

        let done = idx + 1;
        if cancel.is_cancelled() {
            let (txid, pos) = cursors[done];
            let _ = lease::checkpoint(pool, subscription_id, fence_token, pos, txid).await;
            return Some(InnerExit::Cancelled);
        }
        if fence_signal.is_cancelled() {
            return Some(InnerExit::Fenced);
        }
    }

    let (txid, pos) = cursors[events.len()];
    match lease::checkpoint(pool, subscription_id, fence_token, pos, txid).await {
        Ok(true) => None,
        Ok(false) => Some(InnerExit::Fenced),
        Err(e) => Some(InnerExit::Transient(e)),
    }
}
#[allow(clippy::too_many_arguments)]
async fn dispatch_transactional_batch(
pool: &PgPool,
subscription_id: &str,
handler: &dyn TxEventHandlerDyn,
events: &[RecordedEvent],
end_cursor: (u64, i64),
fence_token: i64,
max_attempts: u32,
fence_signal: &CancellationToken,
) -> Option<InnerExit> {
if events.is_empty() {
return None;
}
let (txid, pos) = end_cursor;
for attempt in 0..max_attempts {
if fence_signal.is_cancelled() {
return Some(InnerExit::Fenced);
}
let mut tx = match pool.begin().await {
Ok(t) => t,
Err(e) => return Some(InnerExit::Transient(e.into())),
};
let mut handler_err = None;
for event in events {
if let Err(e) = handler.handle_dyn(event, &mut tx).await {
handler_err = Some(e);
break;
}
}
if let Some(e) = handler_err {
let _ = tx.rollback().await;
if attempt + 1 >= max_attempts {
return Some(InnerExit::PoisonHandler(e));
}
tokio::time::sleep(Duration::from_millis(20 * (attempt as u64 + 1))).await;
continue;
}
match lease::checkpoint(&mut *tx, subscription_id, fence_token, pos, txid).await {
Ok(true) => match tx.commit().await {
Ok(()) => return None,
Err(e) => return Some(InnerExit::Transient(e.into())),
},
Ok(false) => {
let _ = tx.rollback().await;
return Some(InnerExit::Fenced);
}
Err(e) => {
let _ = tx.rollback().await;
return Some(InnerExit::Transient(e));
}
}
}
None
}
/// Periodically renews the lease on a dedicated connection until told to stop.
///
/// Every `heartbeat_interval` it calls `lease::heartbeat`. If the heartbeat
/// returns `false` or fails, it cancels `fence_signal` and exits. It also exits
/// (without signalling) as soon as `cancel` or `fence_signal` is cancelled by
/// someone else.
#[allow(clippy::too_many_arguments)]
async fn heartbeat_task(
    mut conn: sqlx::pool::PoolConnection<Postgres>,
    subscription_id: String,
    worker_id: String,
    fence_token: i64,
    lease_ttl: Duration,
    heartbeat_interval: Duration,
    fence_signal: CancellationToken,
    cancel: CancellationToken,
) {
    let mut ticker = tokio::time::interval(heartbeat_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
    // The first tick of a tokio interval completes immediately; consume it so
    // the first heartbeat is sent one full interval after the task starts.
    ticker.tick().await;

    loop {
        tokio::select! {
            _ = ticker.tick() => {
                let beat = lease::heartbeat(
                    &mut conn, &subscription_id, &worker_id, fence_token, lease_ttl,
                ).await;
                let still_held = match beat {
                    Ok(true) => true,
                    Ok(false) => {
                        tracing::info!(
                            subscription = %subscription_id,
                            fence_token,
                            "heartbeat fenced; signalling inner loop",
                        );
                        false
                    }
                    Err(e) => {
                        tracing::warn!(
                            subscription = %subscription_id,
                            error = %e,
                            "heartbeat error; treating as fenced to re-acquire",
                        );
                        false
                    }
                };
                if !still_held {
                    fence_signal.cancel();
                    return;
                }
            }
            _ = cancel.cancelled() => return,
            _ = fence_signal.cancelled() => return,
        }
    }
}
/// Invokes `handler` on `event`, retrying on failure up to `max_attempts` times.
///
/// Between attempts it sleeps 100 ms after the first failure, 500 ms after the
/// second and 2 s after every later one. There is no sleep and no "retrying"
/// log after the final attempt: the error is logged and returned immediately.
///
/// # Errors
///
/// Returns the error from the last attempt when every attempt fails.
///
/// # Panics
///
/// Panics if `max_attempts` is 0 (the builder rejects that configuration).
async fn dispatch_with_retries(
    handler: &dyn EventHandlerDyn,
    event: &RecordedEvent,
    max_attempts: u32,
    subscription_id: &str,
) -> anyhow::Result<()> {
    let mut last_err = None;
    for attempt in 0..max_attempts {
        let e = match handler.handle_dyn(event).await {
            Ok(()) => return Ok(()),
            Err(e) => e,
        };
        // Out of attempts: nothing will be retried, so do not announce a retry
        // or delay the caller with a pointless back-off sleep.
        if attempt + 1 >= max_attempts {
            last_err = Some(e);
            break;
        }
        let delay = match attempt {
            0 => Duration::from_millis(100),
            1 => Duration::from_millis(500),
            _ => Duration::from_secs(2),
        };
        tracing::warn!(
            subscription = %subscription_id,
            event_type = %event.event_type,
            global_position = event.global_position,
            attempt = attempt + 1,
            "projection handler failed, retrying in {delay:?}: {e:#}",
        );
        tokio::time::sleep(delay).await;
    }
    let err = last_err.expect("max_attempts > 0");
    tracing::error!(
        subscription = %subscription_id,
        event_type = %event.event_type,
        global_position = event.global_position,
        "projection handler failed after {} attempts: {err:#}",
        max_attempts,
    );
    Err(err)
}