use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use serde::de::DeserializeOwned;
use sqlx::PgPool;
use sqlx::postgres::Postgres;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use crate::lease::{self, AcquireOutcome, LeaseStatus};
use crate::{Aggregate, EventData, EventStore, EventStoreError, RecordedEvent, Subscription};
/// Default for how long `ProjectionRunner::run` waits for the remaining lease loops to
/// finish once shutdown has been triggered, before aborting them. Can be overridden with
/// `ProjectionRunnerBuilder::shutdown_grace`.
const DEFAULT_SHUTDOWN_GRACE: Duration = Duration::from_secs(10);
/// A typed event together with the stored record it was decoded from.
///
/// This is the value handed to [`EventHandler::handle`].
pub struct HandledEvent<E: EventData> {
/// The typed event payload, deserialized from the record's `data` field.
pub event: E,
/// The record the payload came from (stream, position, metadata, timestamps).
pub recorded: RecordedEvent,
}
impl<E: EventData> HandledEvent<E> {
pub fn global_position(&self) -> i64 {
self.recorded.global_position
}
pub fn stream_id(&self) -> &str {
&self.recorded.stream_id
}
pub fn stream_version(&self) -> i64 {
self.recorded.stream_version
}
pub fn event_type(&self) -> &str {
&self.recorded.event_type
}
pub fn metadata(&self) -> &serde_json::Value {
&self.recorded.metadata
}
pub fn transaction_id(&self) -> u64 {
self.recorded.transaction_id
}
pub fn created_at(&self) -> chrono::DateTime<chrono::Utc> {
self.recorded.created_at
}
}
/// A typed projection handler for the events of a single aggregate type.
///
/// Handlers are registered with `ProjectionRunnerBuilder::subscribe`, which polls the
/// category returned by `Aggregate::stream_category()` for [`Self::Aggregate`].
pub trait EventHandler: Send + Sync + 'static {
/// The aggregate whose `Event` type this handler consumes.
type Aggregate: Aggregate;
/// Handles one decoded event.
///
/// Returning `Err` makes the runner retry the same event; once the configured
/// `max_attempts` is exhausted the subscription's lease loop stops with that error.
fn handle(
&self,
event: HandledEvent<<Self::Aggregate as Aggregate>::Event>,
) -> impl std::future::Future<Output = anyhow::Result<()>> + Send;
}
/// Type-erased counterpart of [`EventHandler`].
///
/// It takes the raw [`RecordedEvent`] and returns a boxed future, so handlers for
/// different aggregate types can be stored behind `Arc<dyn EventHandlerDyn>`.
trait EventHandlerDyn: Send + Sync {
/// Handles one recorded event; the returned future borrows both `self` and `event`.
fn handle_dyn<'a>(
&'a self,
event: &'a RecordedEvent,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + 'a>>;
}
/// Wraps a typed [`EventHandler`] so it can be used through [`EventHandlerDyn`].
struct TypedAdapter<H: EventHandler> {
/// The user-supplied typed handler.
inner: H,
}
impl<H> EventHandlerDyn for TypedAdapter<H>
where
H: EventHandler,
<H::Aggregate as Aggregate>::Event: DeserializeOwned,
{
fn handle_dyn<'a>(
&'a self,
recorded: &'a RecordedEvent,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + 'a>> {
Box::pin(async move {
let event =
serde_json::from_value::<<H::Aggregate as Aggregate>::Event>(recorded.data.clone())
.map_err(|source| EventStoreError::Deserialization {
stream_id: recorded.stream_id.clone(),
global_position: recorded.global_position,
event_type: recorded.event_type.clone(),
source,
})?;
self.inner
.handle(HandledEvent {
event,
recorded: recorded.clone(),
})
.await
})
}
}
/// One registered subscription: which category to poll, under which id, and with which
/// handler.
struct CategorySubscription {
/// Stream category to poll, taken from `Aggregate::stream_category()` at registration.
category: String,
/// Identifier used for this subscription's lease and checkpoint; must be unique per runner.
subscription_id: String,
/// Type-erased handler invoked for every polled event.
handler: Arc<dyn EventHandlerDyn>,
}
/// Builder for [`ProjectionRunner`]; obtain one with `ProjectionRunner::builder`.
pub struct ProjectionRunnerBuilder {
/// Event store the subscriptions read from; its pool is also used for lease operations.
event_store: EventStore,
/// Subscriptions registered so far via `subscribe`.
subscriptions: Vec<CategorySubscription>,
/// Sleep between polls when a poll returned no events.
poll_interval: Duration,
/// Batch size passed to `Subscription::create`.
batch_size: i64,
/// Number of handler attempts per event before the subscription stops with an error.
max_attempts: u32,
/// Identity this runner presents when acquiring, renewing and releasing leases.
worker_id: String,
/// Lease time-to-live; the heartbeat renews the lease every `lease_ttl / 3`.
lease_ttl: Duration,
/// Wait before retrying lease acquisition when the lease is held elsewhere or after an error.
lease_recheck_interval: Duration,
/// How long `run` waits for lease loops to drain on shutdown before aborting them.
shutdown_grace: Duration,
}
impl ProjectionRunnerBuilder {
    /// Registers `handler` under `subscription_id`.
    ///
    /// The polled category is `H::Aggregate::stream_category()`. Ids must be unique within
    /// one runner; duplicates are rejected by [`Self::build`].
    pub fn subscribe<H>(mut self, subscription_id: impl Into<String>, handler: H) -> Self
    where
        H: EventHandler,
        <H::Aggregate as Aggregate>::Event: DeserializeOwned,
    {
        let registration = CategorySubscription {
            category: H::Aggregate::stream_category().to_string(),
            subscription_id: subscription_id.into(),
            handler: Arc::new(TypedAdapter { inner: handler }),
        };
        self.subscriptions.push(registration);
        self
    }

    /// Sets the sleep between polls when a poll returned no events.
    pub fn poll_interval(self, interval: Duration) -> Self {
        Self {
            poll_interval: interval,
            ..self
        }
    }

    /// Sets the batch size handed to each subscription.
    pub fn batch_size(self, batch_size: i64) -> Self {
        Self { batch_size, ..self }
    }

    /// Sets how many times a handler is attempted per event.
    pub fn max_attempts(self, max_attempts: u32) -> Self {
        Self {
            max_attempts,
            ..self
        }
    }

    /// Overrides the worker identity used for lease operations.
    pub fn worker_id(self, id: impl Into<String>) -> Self {
        Self {
            worker_id: id.into(),
            ..self
        }
    }

    /// Sets the lease time-to-live.
    pub fn lease_ttl(self, ttl: Duration) -> Self {
        Self {
            lease_ttl: ttl,
            ..self
        }
    }

    /// Sets the wait before lease acquisition is retried.
    pub fn lease_recheck_interval(self, d: Duration) -> Self {
        Self {
            lease_recheck_interval: d,
            ..self
        }
    }

    /// Sets how long `run` waits for lease loops to drain on shutdown.
    pub fn shutdown_grace(self, d: Duration) -> Self {
        Self {
            shutdown_grace: d,
            ..self
        }
    }

    /// Finalizes the configuration.
    ///
    /// Emits a warning when the connection pool's `max_connections` is below
    /// `max(2 * subscriptions, subscriptions + 4)`.
    ///
    /// # Panics
    ///
    /// Panics if two subscriptions were registered with the same `subscription_id`.
    pub fn build(self) -> ProjectionRunner {
        // Scoped so the borrowed ids are gone before the fields are moved out below.
        {
            let mut seen = HashSet::with_capacity(self.subscriptions.len());
            for id in self
                .subscriptions
                .iter()
                .map(|sub| sub.subscription_id.as_str())
            {
                let newly_inserted = seen.insert(id);
                if !newly_inserted {
                    panic!(
                        "duplicate subscription_id `{}` registered with ProjectionRunner — \
                         two handlers would race on the same checkpoint row",
                        id,
                    );
                }
            }
        }

        let n_subs = self.subscriptions.len();
        let pool_max = self.event_store.pool().options().get_max_connections() as usize;
        let recommended = std::cmp::max(n_subs.saturating_mul(2), n_subs + 4);
        if pool_max < recommended {
            tracing::warn!(
                pool_max_connections = pool_max,
                subscriptions = n_subs,
                recommended_minimum = recommended,
                "ProjectionRunner: connection pool may be undersized. Each active \
                 lease holds 1 connection for heartbeat; the rest serve polls and \
                 handler queries. Consider PgPoolOptions::new().max_connections(N) \
                 with N >= subscriptions × 2.",
            );
        }

        let Self {
            event_store,
            subscriptions,
            poll_interval,
            batch_size,
            max_attempts,
            worker_id,
            lease_ttl,
            lease_recheck_interval,
            shutdown_grace,
        } = self;
        ProjectionRunner {
            event_store,
            subscriptions,
            poll_interval,
            batch_size,
            max_attempts,
            worker_id,
            lease_ttl,
            lease_recheck_interval,
            shutdown_grace,
        }
    }
}
/// Runs one lease-guarded polling loop per registered subscription.
///
/// Construct it with `ProjectionRunner::builder(..)` and the builder's `build()`, then
/// drive it with `run`.
pub struct ProjectionRunner {
/// Event store the subscriptions read from; its pool is also used for lease operations.
event_store: EventStore,
/// Registered subscriptions; `run` spawns one lease loop for each.
subscriptions: Vec<CategorySubscription>,
/// Sleep between polls when a poll returned no events.
poll_interval: Duration,
/// Batch size passed to `Subscription::create`.
batch_size: i64,
/// Number of handler attempts per event before the subscription stops with an error.
max_attempts: u32,
/// Identity this runner presents when acquiring, renewing and releasing leases.
worker_id: String,
/// Lease time-to-live; the heartbeat renews the lease every `lease_ttl / 3`.
lease_ttl: Duration,
/// Wait before retrying lease acquisition when the lease is held elsewhere or after an error.
lease_recheck_interval: Duration,
/// How long `run` waits for lease loops to drain on shutdown before aborting them.
shutdown_grace: Duration,
}
impl ProjectionRunner {
    /// Starts a builder with the default settings: 100 ms poll interval, batch size 100,
    /// 3 attempts per event, a random UUID v4 worker id, 30 s lease TTL, 5 s lease recheck
    /// interval and the default shutdown grace period.
    pub fn builder(event_store: EventStore) -> ProjectionRunnerBuilder {
        let worker_id = Uuid::new_v4().to_string();
        ProjectionRunnerBuilder {
            event_store,
            subscriptions: Vec::new(),
            poll_interval: Duration::from_millis(100),
            batch_size: 100,
            max_attempts: 3,
            worker_id,
            lease_ttl: Duration::from_secs(30),
            lease_recheck_interval: Duration::from_secs(5),
            shutdown_grace: DEFAULT_SHUTDOWN_GRACE,
        }
    }

    /// Runs every subscription's lease loop until one of them finishes or
    /// `cancellation_token` is cancelled, then shuts the rest down.
    ///
    /// As soon as the first lease loop returns (successfully, with an error, or by
    /// panicking) all other loops are cancelled and given `shutdown_grace` to drain; any
    /// still running after that are aborted.
    ///
    /// # Errors
    ///
    /// Returns the first error observed from any lease loop, including task panics.
    pub async fn run(&self, cancellation_token: CancellationToken) -> anyhow::Result<()> {
        // The loops observe an internal token so that a single loop finishing can cascade
        // shutdown to its siblings without cancelling the caller's token.
        let internal_cancel = CancellationToken::new();
        let bridge = tokio::spawn({
            let internal = internal_cancel.clone();
            let external = cancellation_token.clone();
            async move {
                external.cancelled().await;
                internal.cancel();
            }
        });

        let mut joinset: JoinSet<LeaseLoopResult> = JoinSet::new();
        for cat_sub in &self.subscriptions {
            let work = LeaseLoopWork {
                event_store: self.event_store.clone(),
                category: cat_sub.category.clone(),
                subscription_id: cat_sub.subscription_id.clone(),
                handler: Arc::clone(&cat_sub.handler),
                worker_id: self.worker_id.clone(),
                lease_ttl: self.lease_ttl,
                lease_recheck_interval: self.lease_recheck_interval,
                poll_interval: self.poll_interval,
                batch_size: self.batch_size,
                max_attempts: self.max_attempts,
            };
            joinset.spawn(work.run(internal_cancel.clone()));
        }

        // Phase 1: wait for the first loop to finish (nothing to wait for if none exist).
        let mut first_err: Option<anyhow::Error> = None;
        match joinset.join_next().await {
            None => {}
            Some(Ok(LeaseLoopResult {
                subscription_id,
                outcome,
            })) => match outcome {
                Ok(()) => {
                    tracing::debug!(
                        subscription = %subscription_id,
                        "lease loop completed; cascading shutdown",
                    );
                }
                Err(e) => {
                    tracing::error!(
                        subscription = %subscription_id,
                        error = %e,
                        "lease loop failed; cascading shutdown",
                    );
                    first_err = Some(e);
                }
            },
            Some(Err(join_err)) => {
                tracing::error!(error = ?join_err, "lease loop task panicked");
                first_err = Some(anyhow::anyhow!("lease loop task panicked: {join_err}"));
            }
        }

        // Phase 2: cancel the remaining loops and collect their results.
        internal_cancel.cancel();
        let drain = async {
            while let Some(joined) = joinset.join_next().await {
                let failure = match joined {
                    Ok(LeaseLoopResult {
                        subscription_id,
                        outcome: Ok(()),
                    }) => {
                        tracing::debug!(
                            subscription = %subscription_id,
                            "lease loop drained",
                        );
                        None
                    }
                    Ok(LeaseLoopResult {
                        subscription_id,
                        outcome: Err(e),
                    }) => {
                        tracing::warn!(
                            subscription = %subscription_id,
                            error = %e,
                            "lease loop drained with error",
                        );
                        Some(e)
                    }
                    Err(join_err) => {
                        tracing::error!(error = ?join_err, "lease loop drain panicked");
                        Some(anyhow::anyhow!(
                            "lease loop task panicked during drain: {join_err}"
                        ))
                    }
                };
                // Only the earliest failure is reported to the caller.
                if first_err.is_none() {
                    first_err = failure;
                }
            }
        };
        let drained_in_time = tokio::time::timeout(self.shutdown_grace, drain)
            .await
            .is_ok();
        if !drained_in_time {
            tracing::warn!(
                grace = ?self.shutdown_grace,
                remaining = joinset.len(),
                "lease loops did not drain within grace period; aborting",
            );
            joinset.abort_all();
            while joinset.join_next().await.is_some() {}
        }

        bridge.abort();
        first_err.map_or(Ok(()), Err)
    }

    /// The worker identity this runner uses for lease operations.
    pub fn worker_id(&self) -> &str {
        self.worker_id.as_str()
    }

    /// Looks up the lease status of `subscription_id` via `lease::status`.
    pub async fn lease_status(
        &self,
        subscription_id: &str,
    ) -> Result<Option<LeaseStatus>, EventStoreError> {
        let pool = self.event_store.pool();
        lease::status(pool, subscription_id).await
    }
}
/// Owned copy of everything one subscription's lease loop needs, so the loop can run as a
/// spawned task independent of the `ProjectionRunner` borrow.
struct LeaseLoopWork {
/// Event store handle (cloned from the runner).
event_store: EventStore,
/// Stream category to poll.
category: String,
/// Subscription id used for the lease and checkpoint.
subscription_id: String,
/// Type-erased handler invoked for every polled event.
handler: Arc<dyn EventHandlerDyn>,
/// Identity used for lease operations.
worker_id: String,
/// Lease time-to-live.
lease_ttl: Duration,
/// Wait before lease acquisition is retried.
lease_recheck_interval: Duration,
/// Sleep between polls when a poll returned no events.
poll_interval: Duration,
/// Batch size passed to `Subscription::create`.
batch_size: i64,
/// Number of handler attempts per event.
max_attempts: u32,
}
/// Result of one lease loop task, tagged with the subscription it belonged to so the
/// runner can log which subscription finished or failed.
struct LeaseLoopResult {
/// Subscription the loop was running for.
subscription_id: String,
/// What `lease_loop` returned.
outcome: anyhow::Result<()>,
}
impl LeaseLoopWork {
    /// Runs `lease_loop` with this work item's settings and wraps its outcome together
    /// with the subscription id.
    async fn run(self, cancel: CancellationToken) -> LeaseLoopResult {
        let Self {
            event_store,
            category,
            subscription_id,
            handler,
            worker_id,
            lease_ttl,
            lease_recheck_interval,
            poll_interval,
            batch_size,
            max_attempts,
        } = self;

        let outcome = lease_loop(
            &event_store,
            &category,
            &subscription_id,
            &*handler,
            &worker_id,
            lease_ttl,
            lease_recheck_interval,
            poll_interval,
            batch_size,
            max_attempts,
            cancel,
        )
        .await;

        LeaseLoopResult {
            subscription_id,
            outcome,
        }
    }
}
/// Why `inner_loop` (or setting it up) stopped while a lease was held; `lease_loop`
/// decides from this whether to release, re-acquire, back off or fail.
enum InnerExit {
/// The cancellation token fired; the lease is released and the loop ends with `Ok`.
Cancelled,
/// The fence signal fired or a checkpoint was rejected; the loop goes back to acquiring.
Fenced,
/// The handler kept failing for one event until `max_attempts` was exhausted; fatal.
PoisonHandler(anyhow::Error),
/// Creating the subscription, polling or checkpointing failed; retried after a backoff.
Transient(EventStoreError),
}
/// Drives one subscription: repeatedly acquires the lease, processes events while holding
/// it (with a heartbeat task renewing it every `lease_ttl / 3`), and reacts to how the
/// processing phase ended.
///
/// * cancelled — release the lease and return `Ok(())`;
/// * fenced — go straight back to acquiring;
/// * handler failed past `max_attempts` — release the lease and return that error;
/// * transient store error — release the lease, wait `lease_recheck_interval`, retry.
///
/// Every wait is raced against `cancel`, which ends the loop with `Ok(())`.
// Takes the flattened fields of `LeaseLoopWork`, hence the long parameter list.
#[allow(clippy::too_many_arguments)]
async fn lease_loop(
    event_store: &EventStore,
    category: &str,
    subscription_id: &str,
    handler: &dyn EventHandlerDyn,
    worker_id: &str,
    lease_ttl: Duration,
    lease_recheck_interval: Duration,
    poll_interval: Duration,
    batch_size: i64,
    max_attempts: u32,
    cancel: CancellationToken,
) -> anyhow::Result<()> {
    /// Waits out `delay`; resolves to `true` when `cancel` fired instead.
    async fn cancelled_while_waiting(delay: Duration, cancel: &CancellationToken) -> bool {
        tokio::select! {
            _ = tokio::time::sleep(delay) => false,
            _ = cancel.cancelled() => true,
        }
    }

    let pool = event_store.pool().clone();
    let heartbeat_interval = lease_ttl / 3;

    while !cancel.is_cancelled() {
        // Step 1: become the lease holder, or wait and try again.
        let acquired = lease::try_acquire(&pool, subscription_id, worker_id, lease_ttl).await;
        let fence_token = match acquired {
            Ok(AcquireOutcome::Acquired { fence_token }) => fence_token,
            Ok(AcquireOutcome::Held) => {
                if cancelled_while_waiting(lease_recheck_interval, &cancel).await {
                    return Ok(());
                }
                continue;
            }
            Err(e) => {
                tracing::warn!(
                    subscription = %subscription_id,
                    worker = %worker_id,
                    error = %e,
                    "lease acquire failed; backing off",
                );
                if cancelled_while_waiting(lease_recheck_interval, &cancel).await {
                    return Ok(());
                }
                continue;
            }
        };
        tracing::info!(
            subscription = %subscription_id,
            worker = %worker_id,
            fence_token,
            "acquired projection lease",
        );

        // Step 2: the heartbeat gets its own pooled connection; without one the lease is
        // handed back instead of being held unrenewed.
        let hb_conn = match pool.acquire().await {
            Ok(conn) => conn,
            Err(e) => {
                tracing::warn!(
                    subscription = %subscription_id,
                    error = %e,
                    "could not acquire heartbeat connection; releasing lease",
                );
                let _ = lease::release(&pool, subscription_id, worker_id, fence_token).await;
                if cancelled_while_waiting(lease_recheck_interval, &cancel).await {
                    return Ok(());
                }
                continue;
            }
        };

        // `fence_signal` works in both directions: the heartbeat fires it when the lease is
        // lost, and this loop fires it below to stop the heartbeat.
        let fence_signal = CancellationToken::new();
        let hb_handle = tokio::spawn(heartbeat_task(
            hb_conn,
            subscription_id.to_string(),
            worker_id.to_string(),
            fence_token,
            lease_ttl,
            heartbeat_interval,
            fence_signal.clone(),
            cancel.clone(),
        ));

        // Step 3: process events until something ends the session.
        let created = Subscription::create(
            event_store.clone(),
            pool.clone(),
            subscription_id,
            batch_size,
        )
        .await;
        let exit = match created {
            Err(e) => InnerExit::Transient(e),
            Ok(mut subscription) => {
                inner_loop(
                    &pool,
                    subscription_id,
                    category,
                    handler,
                    &mut subscription,
                    fence_token,
                    poll_interval,
                    max_attempts,
                    cancel.clone(),
                    fence_signal.clone(),
                )
                .await
            }
        };

        // Step 4: stop the heartbeat before touching the lease again.
        fence_signal.cancel();
        let _ = hb_handle.await;

        // Step 5: act on the reason the session ended.
        match exit {
            InnerExit::Fenced => {
                tracing::info!(
                    subscription = %subscription_id,
                    fence_token,
                    "projection lease fenced; re-acquiring",
                );
            }
            InnerExit::Cancelled => {
                let release_result =
                    lease::release(&pool, subscription_id, worker_id, fence_token).await;
                let released = release_result.unwrap_or(false);
                tracing::info!(
                    subscription = %subscription_id,
                    fence_token,
                    released,
                    "released projection lease on shutdown",
                );
                return Ok(());
            }
            InnerExit::PoisonHandler(err) => {
                let _ = lease::release(&pool, subscription_id, worker_id, fence_token).await;
                tracing::error!(
                    subscription = %subscription_id,
                    error = %err,
                    "subscription stopped: handler failed past max_attempts",
                );
                return Err(err);
            }
            InnerExit::Transient(e) => {
                let _ = lease::release(&pool, subscription_id, worker_id, fence_token).await;
                tracing::warn!(
                    subscription = %subscription_id,
                    error = %e,
                    "transient lease-loop error; backing off then re-acquiring",
                );
                if cancelled_while_waiting(lease_recheck_interval, &cancel).await {
                    return Ok(());
                }
            }
        }
    }

    Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn inner_loop(
pool: &PgPool,
subscription_id: &str,
category: &str,
handler: &dyn EventHandlerDyn,
subscription: &mut Subscription,
fence_token: i64,
poll_interval: Duration,
max_attempts: u32,
cancel: CancellationToken,
fence_signal: CancellationToken,
) -> InnerExit {
loop {
if cancel.is_cancelled() {
return InnerExit::Cancelled;
}
if fence_signal.is_cancelled() {
return InnerExit::Fenced;
}
let events = match subscription.poll_category(category).await {
Ok(e) => e,
Err(e) => return InnerExit::Transient(e),
};
if events.is_empty() {
tokio::select! {
_ = tokio::time::sleep(poll_interval) => continue,
_ = cancel.cancelled() => return InnerExit::Cancelled,
_ = fence_signal.cancelled() => return InnerExit::Fenced,
}
}
let mut last_done: Option<(i64, u64)> = None;
for event in &events {
if let Err(e) =
dispatch_with_retries(handler, event, max_attempts, subscription_id).await
{
if let Some((pos, txid)) = last_done {
let _ = lease::checkpoint(pool, subscription_id, fence_token, pos, txid).await;
}
return InnerExit::PoisonHandler(e);
}
last_done = Some((event.global_position, event.transaction_id));
if cancel.is_cancelled() {
if let Some((pos, txid)) = last_done {
let _ = lease::checkpoint(pool, subscription_id, fence_token, pos, txid).await;
}
return InnerExit::Cancelled;
}
if fence_signal.is_cancelled() {
return InnerExit::Fenced;
}
}
if let Some((pos, txid)) = last_done {
let ok = match lease::checkpoint(pool, subscription_id, fence_token, pos, txid).await {
Ok(b) => b,
Err(e) => return InnerExit::Transient(e),
};
if !ok {
return InnerExit::Fenced;
}
}
}
}
#[allow(clippy::too_many_arguments)]
async fn heartbeat_task(
mut conn: sqlx::pool::PoolConnection<Postgres>,
subscription_id: String,
worker_id: String,
fence_token: i64,
lease_ttl: Duration,
heartbeat_interval: Duration,
fence_signal: CancellationToken,
cancel: CancellationToken,
) {
let mut interval = tokio::time::interval(heartbeat_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
interval.tick().await;
loop {
tokio::select! {
_ = interval.tick() => {
match lease::heartbeat(
&mut conn, &subscription_id, &worker_id, fence_token, lease_ttl,
).await {
Ok(true) => {}
Ok(false) => {
tracing::info!(
subscription = %subscription_id,
fence_token,
"heartbeat fenced; signalling inner loop",
);
fence_signal.cancel();
return;
}
Err(e) => {
tracing::warn!(
subscription = %subscription_id,
error = %e,
"heartbeat error; treating as fenced to re-acquire",
);
fence_signal.cancel();
return;
}
}
}
_ = cancel.cancelled() => return,
_ = fence_signal.cancelled() => return,
}
}
}
/// Invokes `handler` for `event`, retrying failed attempts with a fixed backoff schedule
/// (100 ms after the first failure, 500 ms after the second, 2 s after each later one).
///
/// `max_attempts` is the total number of handler invocations. A value of `0` is treated
/// as `1`, so the handler always runs at least once and this function never panics on a
/// misconfigured runner.
///
/// No backoff sleep happens after the final failed attempt: the error is returned
/// immediately.
///
/// # Errors
///
/// Returns the error of the last attempt once all attempts have failed.
async fn dispatch_with_retries(
    handler: &dyn EventHandlerDyn,
    event: &RecordedEvent,
    max_attempts: u32,
    subscription_id: &str,
) -> anyhow::Result<()> {
    let attempts = max_attempts.max(1);
    // 1-based number of the attempt currently being made.
    let mut attempt: u32 = 1;
    loop {
        match handler.handle_dyn(event).await {
            Ok(()) => return Ok(()),
            // Final attempt: report and give up without sleeping.
            Err(err) if attempt >= attempts => {
                tracing::error!(
                    subscription = %subscription_id,
                    event_type = %event.event_type,
                    global_position = event.global_position,
                    "projection handler failed after {} attempts: {err:#}",
                    attempts,
                );
                return Err(err);
            }
            Err(e) => {
                let delay = match attempt {
                    1 => Duration::from_millis(100),
                    2 => Duration::from_millis(500),
                    _ => Duration::from_secs(2),
                };
                tracing::warn!(
                    subscription = %subscription_id,
                    event_type = %event.event_type,
                    global_position = event.global_position,
                    attempt,
                    "projection handler failed, retrying in {delay:?}: {e:#}",
                );
                tokio::time::sleep(delay).await;
            }
        }
        // Only reached when `attempt < attempts`, so this cannot overflow.
        attempt += 1;
    }
}