arcly-http 0.3.0

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! Transactional Outbox — reliable event publishing without dual writes.
//!
//! ## The dual-write problem
//!
//! "Commit the order, then publish `order.created`" is two systems with no
//! shared transaction: crash between them and the database and the event
//! stream disagree — silently. The outbox pattern closes the gap by writing
//! the event into an **outbox table inside the same database transaction** as
//! the business data, then letting a background relay publish it afterwards
//! with at-least-once semantics (consumers dedupe on `idempotency_key`).
//!
//! ## Division of labour
//!
//! - The app implements [`OutboxTx`] (enqueue inside its DB transaction),
//!   [`OutboxStore`] (fetch/ack pending rows), and [`OutboxPublisher`]
//!   (Kafka / NATS / webhook).
//! - The framework ships [`with_transaction`] (begin → work → commit/rollback)
//!   and [`OutboxRelay`] (background poll-publish-ack loop, spawned from
//!   `ArclyPlugin::on_start`).
//!
//! Nothing here touches the request hot path beyond the app's own
//! transaction; the relay is a single background task.

use std::sync::Arc;
use std::time::Duration;

use futures::future::BoxFuture;

use crate::data::{DataError, DataSource};

// ─── Entry ────────────────────────────────────────────────────────────────────

/// One event enqueued in the same transaction as the business write.
///
/// The same type flows through the whole pipeline: it is passed by value to
/// [`OutboxTx::enqueue`], returned in batches by
/// [`OutboxStore::fetch_pending`], and handed by reference to
/// [`OutboxPublisher::publish`].
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct OutboxEntry {
    /// Store-assigned sequence (0 before persistence). The relay passes this
    /// value to [`OutboxStore::mark_published`] after a successful publish.
    pub id: u64,
    /// Destination topic name, e.g. `"order.created"`.
    pub topic: String,
    /// Event body as an arbitrary JSON value.
    pub payload: serde_json::Value,
    /// Stable per-event key — consumers dedupe on this under at-least-once.
    pub idempotency_key: String,
    /// Optional tenant identifier.
    /// NOTE(review): presumably the tenant that produced the event, for
    /// multi-tenant routing — confirm against the store/publisher impls.
    pub tenant: Option<String>,
    /// W3C trace context of the producing request — consumers continue the
    /// trace instead of starting an orphan root (`TraceContext::from_traceparent`).
    ///
    /// Marked `#[serde(default)]`, so serialized entries that lack this field
    /// deserialize with `None`.
    #[serde(default)]
    pub traceparent: Option<String>,
}

// ─── Transaction-side contracts ───────────────────────────────────────────────

/// A live database transaction that can also enqueue outbox entries.
///
/// Implemented by the application. [`with_transaction`] drives it as a
/// `Box<dyn OutboxTx>`: the caller's closure receives `&mut` access (and may
/// call [`enqueue`](OutboxTx::enqueue)), after which the transaction is
/// finished with either [`commit`](OutboxTx::commit) or
/// [`rollback`](OutboxTx::rollback).
pub trait OutboxTx: Send {
    /// INSERT into the outbox table under THIS transaction — atomic with the
    /// business write. Visible to the relay only after commit.
    ///
    /// The returned future borrows the transaction mutably for `'a`.
    fn enqueue<'a>(&'a mut self, entry: OutboxEntry) -> BoxFuture<'a, Result<(), DataError>>;

    /// Commit the transaction, consuming it.
    ///
    /// Takes `Box<Self>` and returns a `'static` future, so the future must
    /// own everything it needs to finish the commit.
    fn commit(self: Box<Self>) -> BoxFuture<'static, Result<(), DataError>>;
    /// Abort the transaction, consuming it. Same ownership shape as
    /// [`commit`](OutboxTx::commit).
    fn rollback(self: Box<Self>) -> BoxFuture<'static, Result<(), DataError>>;
}

/// A datasource that can open outbox-capable transactions.
///
/// Extends [`DataSource`]; this is the bound [`with_transaction`] requires of
/// its `ds` argument.
pub trait TransactionalDataSource: DataSource {
    /// Open a new transaction and return it as a boxed [`OutboxTx`].
    ///
    /// The returned future borrows `self`; the transaction it yields is an
    /// owned `Box` and is later consumed by `commit` or `rollback`.
    fn begin(&self) -> BoxFuture<'_, Result<Box<dyn OutboxTx>, DataError>>;
}

/// Run `work` inside one transaction: commit on `Ok`, roll back on `Err`.
///
/// ```ignore
/// let order = with_transaction(ds, |tx| Box::pin(async move {
///     let order = repo.insert_order(tx, &dto).await?;
///     tx.enqueue(OutboxEntry { topic: "order.created".into() /* ... */ }).await?;
///     Ok(order)
/// })).await?;
/// ```
pub async fn with_transaction<D, T, F>(ds: &D, work: F) -> Result<T, DataError>
where
    D: TransactionalDataSource + ?Sized,
    F: for<'t> FnOnce(&'t mut Box<dyn OutboxTx>) -> BoxFuture<'t, Result<T, DataError>>,
{
    let mut tx = ds.begin().await?;
    match work(&mut tx).await {
        Ok(v) => {
            tx.commit().await?;
            Ok(v)
        }
        Err(e) => {
            // Roll back best-effort; surface the original error.
            let _ = tx.rollback().await;
            Err(e)
        }
    }
}

// ─── Relay-side contracts ─────────────────────────────────────────────────────

/// Committed-but-unpublished entries, fetched/acked by the relay.
///
/// Implemented by the application. [`OutboxRelay`] holds it as
/// `Arc<dyn OutboxStore>` and calls it from a background task, hence the
/// `Send + Sync + 'static` bounds.
pub trait OutboxStore: Send + Sync + 'static {
    /// Oldest committed-but-unpublished entries, up to `limit`, in order.
    ///
    /// The relay passes its `batch` setting as `limit`. An `Err` makes the
    /// relay skip the tick and back off before fetching again.
    fn fetch_pending(&self, limit: usize) -> BoxFuture<'_, Result<Vec<OutboxEntry>, DataError>>;
    /// Mark one entry delivered (idempotent — the relay may retry).
    ///
    /// `id` is the [`OutboxEntry::id`] of an entry that was just published.
    /// If this returns `Err` the relay only logs it, so the entry stays
    /// pending and will be published again.
    fn mark_published(&self, id: u64) -> BoxFuture<'_, Result<(), DataError>>;
}

/// Downstream event transport (Kafka, NATS, webhook fan-out, …).
///
/// Implemented by the application. [`OutboxRelay`] holds it as
/// `Arc<dyn OutboxPublisher>` and calls it from a background task, hence the
/// `Send + Sync + 'static` bounds.
pub trait OutboxPublisher: Send + Sync + 'static {
    /// Deliver one entry downstream.
    ///
    /// The relay calls this once per fetched entry, in fetch order. `Ok(())`
    /// leads to the entry being acked via [`OutboxStore::mark_published`];
    /// `Err` leaves it unacked and makes the relay abandon the rest of the
    /// current batch, so the entry is retried on a later tick.
    ///
    /// The returned future borrows both the publisher and the entry for `'a`.
    fn publish<'a>(
        &'a self,
        entry: &'a OutboxEntry,
    ) -> BoxFuture<'a, Result<(), crate::messaging::BoxError>>;
}

// ─── Relay ────────────────────────────────────────────────────────────────────

/// Background poll → publish → ack loop. **At-least-once**: an entry is acked
/// only after a successful publish, so a crash between the two replays it —
/// consumers must dedupe on `idempotency_key`.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build
/// it with a struct literal; use [`OutboxRelay::new`] and the builder-style
/// setters instead.
#[non_exhaustive]
pub struct OutboxRelay {
    /// Source of pending entries and sink for acks.
    pub store: Arc<dyn OutboxStore>,
    /// Transport each pending entry is published through.
    pub publisher: Arc<dyn OutboxPublisher>,
    /// Period of the polling tick.
    pub poll: Duration,
    /// Maximum number of entries requested from the store per tick (passed
    /// as `limit` to [`OutboxStore::fetch_pending`]).
    pub batch: usize,
    /// Leader election: when set, only the replica holding this lock drains
    /// the outbox each tick — others skip silently. Eliminates duplicate
    /// publishing across a fleet (consumers still dedupe on idempotency_key
    /// for the at-least-once edge cases).
    pub leader: Option<(
        crate::resilience::DistributedLock,
        Arc<dyn crate::resilience::DLockBackend>,
    )>,
}

impl OutboxRelay {
    /// Smallest tick period the relay will run with.
    ///
    /// `tokio::time::interval` panics when given a zero period, and a panic
    /// inside the detached relay task would stop the relay with nobody
    /// observing it, so [`spawn`](Self::spawn) clamps `poll` up to this value.
    const MIN_POLL: Duration = Duration::from_millis(1);

    /// Create a relay over `store` and `publisher`.
    ///
    /// Defaults: 500ms poll, batch 32, no leader election.
    #[must_use]
    pub fn new(store: Arc<dyn OutboxStore>, publisher: Arc<dyn OutboxPublisher>) -> Self {
        Self {
            store,
            publisher,
            poll: Duration::from_millis(500),
            batch: 32,
            leader: None,
        }
    }

    /// Set the polling period (builder style; returns the updated relay).
    #[must_use]
    pub fn poll(mut self, v: Duration) -> Self {
        self.poll = v;
        self
    }

    /// Set the maximum number of entries fetched per tick (builder style).
    #[must_use]
    pub fn batch(mut self, v: usize) -> Self {
        self.batch = v;
        self
    }

    /// Enable leader election: only the lock holder drains each tick.
    #[must_use]
    pub fn leader(
        mut self,
        lock: crate::resilience::DistributedLock,
        backend: Arc<dyn crate::resilience::DLockBackend>,
    ) -> Self {
        self.leader = Some((lock, backend));
        self
    }

    /// Spawn the relay. Call from `ArclyPlugin::on_start`; the task dies with
    /// the runtime, so graceful shutdown needs no extra plumbing.
    ///
    /// **Drain-aware**: once a shutdown signal flips the process drain flag
    /// the relay stops claiming new batches — pending rows are durable and
    /// the next leader picks them up. The flag is checked after every tick,
    /// including the ticks skipped while backing off. **Backoff**:
    /// consecutive fetch failures double the wait (up to 8× the poll
    /// interval) instead of hammering a struggling store at full poll rate.
    ///
    /// A `poll` shorter than 1ms (including zero) is raised to 1ms, because
    /// a zero period would panic the timer and silently end the relay task.
    pub fn spawn(self) {
        tokio::spawn(async move {
            // Guard against a zero period: `interval` panics on it, and this
            // task has no supervisor that would notice.
            let period = self.poll.max(Self::MIN_POLL);
            if period != self.poll {
                tracing::warn!(
                    configured_ms = self.poll.as_millis() as u64,
                    effective_ms = period.as_millis() as u64,
                    "outbox relay: poll interval too small — clamped"
                );
            }

            let mut tick = tokio::time::interval(period);
            tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            let mut consecutive_fetch_errors: u32 = 0;
            loop {
                // Error backoff: after 1 / 2 / 3+ consecutive fetch failures
                // wait 1 / 3 / 7 extra ticks on top of the regular one.
                let backoff_ticks = match consecutive_fetch_errors {
                    0 => 0,
                    n => (1u32 << n.min(3)) - 1,
                };
                for _ in 0..=backoff_ticks {
                    tick.tick().await;

                    // Shutdown in progress: leave pending rows for the next
                    // leader instead of starting work mid-drain. Checked on
                    // every tick so a long backoff cannot run past the flag.
                    if crate::observability::health::is_draining() {
                        tracing::info!("outbox relay: drain flag set — stopping");
                        return;
                    }
                }

                // Leader election (optional): skip the tick unless we win.
                // The guard is held for the whole batch and dropped after.
                let _leader_guard = match &self.leader {
                    None => None,
                    Some((lock, backend)) => match lock.try_lock(backend).await {
                        Some(g) => Some(g),
                        None => continue, // another replica is draining
                    },
                };

                let pending = match self.store.fetch_pending(self.batch).await {
                    Ok(p) => {
                        consecutive_fetch_errors = 0;
                        p
                    }
                    Err(e) => {
                        consecutive_fetch_errors = consecutive_fetch_errors.saturating_add(1);
                        metrics::counter!("outbox_fetch_errors_total").increment(1);
                        tracing::warn!(error = %e, attempts = consecutive_fetch_errors,
                            "outbox fetch failed — backing off");
                        continue;
                    }
                };
                for entry in &pending {
                    match self.publisher.publish(entry).await {
                        Ok(()) => {
                            // Ack only after a successful publish; a failed
                            // ack leaves the row pending (at-least-once).
                            if let Err(e) = self.store.mark_published(entry.id).await {
                                tracing::warn!(id = entry.id, error = %e,
                                    "outbox ack failed — entry will be republished");
                            } else {
                                metrics::counter!("outbox_published_total").increment(1);
                            }
                        }
                        Err(e) => {
                            metrics::counter!("outbox_publish_errors_total").increment(1);
                            tracing::warn!(id = entry.id, topic = %entry.topic, error = %e,
                                "outbox publish failed — will retry");
                            // Stop the batch: preserve per-topic ordering.
                            break;
                        }
                    }
                }
            }
        });
    }
}