use std::sync::Arc;
use std::time::Duration;
use futures::future::BoxFuture;
use crate::data::{DataError, DataSource};
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct OutboxEntry {
pub id: u64,
pub topic: String,
pub payload: serde_json::Value,
pub idempotency_key: String,
pub tenant: Option<String>,
#[serde(default)]
pub traceparent: Option<String>,
}
pub trait OutboxTx: Send {
fn enqueue<'a>(&'a mut self, entry: OutboxEntry) -> BoxFuture<'a, Result<(), DataError>>;
fn commit(self: Box<Self>) -> BoxFuture<'static, Result<(), DataError>>;
fn rollback(self: Box<Self>) -> BoxFuture<'static, Result<(), DataError>>;
}
pub trait TransactionalDataSource: DataSource {
fn begin(&self) -> BoxFuture<'_, Result<Box<dyn OutboxTx>, DataError>>;
}
pub async fn with_transaction<D, T, F>(ds: &D, work: F) -> Result<T, DataError>
where
D: TransactionalDataSource + ?Sized,
F: for<'t> FnOnce(&'t mut Box<dyn OutboxTx>) -> BoxFuture<'t, Result<T, DataError>>,
{
let mut tx = ds.begin().await?;
match work(&mut tx).await {
Ok(v) => {
tx.commit().await?;
Ok(v)
}
Err(e) => {
let _ = tx.rollback().await;
Err(e)
}
}
}
pub trait OutboxStore: Send + Sync + 'static {
fn fetch_pending(&self, limit: usize) -> BoxFuture<'_, Result<Vec<OutboxEntry>, DataError>>;
fn mark_published(&self, id: u64) -> BoxFuture<'_, Result<(), DataError>>;
}
pub trait OutboxPublisher: Send + Sync + 'static {
fn publish<'a>(&'a self, entry: &'a OutboxEntry) -> BoxFuture<'a, Result<(), String>>;
}
pub struct OutboxRelay {
pub store: Arc<dyn OutboxStore>,
pub publisher: Arc<dyn OutboxPublisher>,
pub poll: Duration,
pub batch: usize,
pub leader: Option<(
crate::resilience::DistributedLock,
Arc<dyn crate::resilience::DLockBackend>,
)>,
}
impl OutboxRelay {
pub fn spawn(self) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(self.poll);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut consecutive_fetch_errors: u32 = 0;
loop {
tick.tick().await;
if crate::observability::health::is_draining() {
tracing::info!("outbox relay: drain flag set — stopping");
return;
}
if consecutive_fetch_errors > 0 {
let skip = (1u32 << consecutive_fetch_errors.min(3)) - 1; for _ in 0..skip {
tick.tick().await;
}
}
let _leader_guard = match &self.leader {
None => None,
Some((lock, backend)) => match lock.try_lock(backend).await {
Some(g) => Some(g),
None => continue, },
};
let pending = match self.store.fetch_pending(self.batch).await {
Ok(p) => {
consecutive_fetch_errors = 0;
p
}
Err(e) => {
consecutive_fetch_errors = consecutive_fetch_errors.saturating_add(1);
metrics::counter!("outbox_fetch_errors_total").increment(1);
tracing::warn!(error = %e, attempts = consecutive_fetch_errors,
"outbox fetch failed — backing off");
continue;
}
};
for entry in &pending {
match self.publisher.publish(entry).await {
Ok(()) => {
if let Err(e) = self.store.mark_published(entry.id).await {
tracing::warn!(id = entry.id, error = %e,
"outbox ack failed — entry will be republished");
} else {
metrics::counter!("outbox_published_total").increment(1);
}
}
Err(e) => {
metrics::counter!("outbox_publish_errors_total").increment(1);
tracing::warn!(id = entry.id, topic = %entry.topic, error = %e,
"outbox publish failed — will retry");
break;
}
}
}
}
});
}
}