use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use futures::future::BoxFuture;
use crate::core::engine::FrozenDiContainer;
use crate::messaging::{EventContext, EventHandlerDescriptor, InboundMessage, MessageTransport};
use crate::observability::propagation::TraceContext;
/// Signature of an event handler: a plain function pointer that takes the
/// [`EventContext`] by value and returns a boxed `'static` future resolving to
/// `Ok(())` on success or a `String` describing the failure.
type Handler = fn(EventContext) -> BoxFuture<'static, Result<(), String>>;
/// Configuration and shared dependencies of the polling event consumer that is
/// started by [`ConsumerRuntime::spawn`].
pub struct ConsumerRuntime {
/// Transport that is polled for inbound messages and that receives the
/// ack / nack / dead-letter calls.
pub transport: Arc<dyn MessageTransport>,
/// Frozen DI container handed to every handler through `EventContext`; it is also
/// queried for the optional idempotency store and payload masker.
pub container: &'static FrozenDiContainer,
/// Period of the polling timer.
pub poll: Duration,
/// Value passed to `MessageTransport::poll` on every tick — presumably the maximum
/// number of messages to fetch (confirm against the transport contract).
pub batch: usize,
/// A message is dead-lettered once its failure count exceeds this value.
pub max_retries: u32,
/// TTL passed to the idempotency store when claiming and completing a dedupe key;
/// in seconds, going by the field name.
pub dedupe_ttl_secs: u64,
}
impl ConsumerRuntime {
pub fn spawn(self) {
let dispatch: HashMap<&'static str, Handler> =
inventory::iter::<&'static EventHandlerDescriptor>
.into_iter()
.map(|d| (d.topic, d.handler))
.collect();
for d in inventory::iter::<&'static EventHandlerDescriptor> {
tracing::info!(
topic = d.topic,
consumer = d.consumer,
"event handler registered"
);
}
tokio::spawn(async move {
let mut attempts: HashMap<String, u32> = HashMap::new();
let mut tick = tokio::time::interval(self.poll);
loop {
tick.tick().await;
let batch = match self.transport.poll(self.batch).await {
Ok(b) => b,
Err(e) => {
tracing::warn!(error = %e, "transport poll failed — retrying next tick");
continue;
}
};
for msg in batch {
self.process(&dispatch, &mut attempts, msg).await;
}
}
});
}
async fn process(
&self,
dispatch: &HashMap<&'static str, Handler>,
attempts: &mut HashMap<String, u32>,
msg: InboundMessage,
) {
let Some(handler) = dispatch.get(msg.topic.as_str()) else {
tracing::debug!(topic = %msg.topic, "no handler — acking unrouted message");
let _ = self.transport.ack(&msg).await;
return;
};
let store = self
.container
.try_get::<Box<dyn crate::web::idempotency::IdempotencyStore>>();
let dedupe_key = format!("consume:{}:{}", msg.topic, msg.idempotency_key);
if let Some(store) = store {
match store.claim(&dedupe_key, self.dedupe_ttl_secs).await {
crate::web::idempotency::IdempotencyDecision::Fresh => {}
crate::web::idempotency::IdempotencyDecision::Unavailable => {}
crate::web::idempotency::IdempotencyDecision::Replay { .. } => {
metrics::counter!("events_deduped_total").increment(1);
let _ = self.transport.ack(&msg).await;
return;
}
crate::web::idempotency::IdempotencyDecision::InFlight => {
let _ = self.transport.nack(&msg).await;
return;
}
}
}
let trace = msg
.traceparent
.as_deref()
.and_then(TraceContext::from_traceparent)
.unwrap_or_else(TraceContext::new_root);
let ctx = EventContext {
message: msg.clone(),
container: self.container,
trace,
};
match handler(ctx).await {
Ok(()) => {
attempts.remove(&msg.idempotency_key);
if let Some(store) = store {
store
.complete(&dedupe_key, 200, b"", self.dedupe_ttl_secs)
.await;
}
metrics::counter!("events_consumed_total", "topic" => msg.topic.clone())
.increment(1);
if let Err(e) = self.transport.ack(&msg).await {
tracing::warn!(error = %e, "ack failed — message may redeliver");
}
}
Err(reason) => {
if let Some(store) = store {
store.release(&dedupe_key).await; }
let n = attempts.entry(msg.idempotency_key.clone()).or_insert(0);
*n += 1;
if *n > self.max_retries {
attempts.remove(&msg.idempotency_key);
metrics::counter!("events_dead_lettered_total").increment(1);
tracing::error!(topic = %msg.topic, key = %msg.idempotency_key,
attempts = self.max_retries, reason = %reason,
"poison message → dead letter");
let mut parked = msg.clone();
if let Some(masker) = self.container.try_get::<crate::compliance::Masker>() {
masker.apply(&mut parked.payload);
}
let _ = self.transport.dead_letter(&parked, &reason).await;
} else {
metrics::counter!("events_retried_total").increment(1);
tracing::warn!(topic = %msg.topic, attempt = *n, reason = %reason,
"event handler failed — nack for retry");
let _ = self.transport.nack(&msg).await;
}
}
}
}
}