pub struct OutboxManager<S, P, PT> { /* private fields */ }
Long-running worker that publishes pending outbox events to the broker.
The manager owns one concrete OutboxStorage implementation (S), one
Transport implementation (P), and the user’s domain event payload type
(PT). After run is invoked, it will keep processing events
until the watched shutdown channel flips to true.
Prefer constructing through
OutboxManagerBuilder rather than
calling new directly — the builder reports missing
dependencies with a structured OutboxError::ConfigError instead of
requiring all arguments to line up positionally.
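The builder behaviour described above can be illustrated with a minimal, std-only sketch. Every name in it (`ManagerBuilder`, `Manager`, the error messages, and the local `OutboxError` stand-in) is hypothetical and chosen for illustration; it is not the crate's actual builder, which also takes a config and a shutdown receiver.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the crate's error type; only the
/// `ConfigError` variant mentioned in the docs is modelled.
#[derive(Debug, PartialEq)]
pub enum OutboxError {
    ConfigError(String),
}

/// Hypothetical manager with just two dependencies.
#[derive(Debug)]
pub struct Manager<S, P> {
    pub storage: Arc<S>,
    pub publisher: Arc<P>,
}

/// Builder that tracks each dependency as an `Option`,
/// so a missing one can be reported by name.
pub struct ManagerBuilder<S, P> {
    storage: Option<Arc<S>>,
    publisher: Option<Arc<P>>,
}

impl<S, P> ManagerBuilder<S, P> {
    pub fn new() -> Self {
        Self { storage: None, publisher: None }
    }

    pub fn storage(mut self, storage: Arc<S>) -> Self {
        self.storage = Some(storage);
        self
    }

    pub fn publisher(mut self, publisher: Arc<P>) -> Self {
        self.publisher = Some(publisher);
        self
    }

    /// Fails with a named `ConfigError` for the first missing dependency.
    pub fn build(self) -> Result<Manager<S, P>, OutboxError> {
        let storage = self
            .storage
            .ok_or_else(|| OutboxError::ConfigError("storage is not set".to_string()))?;
        let publisher = self
            .publisher
            .ok_or_else(|| OutboxError::ConfigError("publisher is not set".to_string()))?;
        Ok(Manager { storage, publisher })
    }
}
```

With this shape, forgetting `.publisher(...)` yields `Err(OutboxError::ConfigError(..))` from `build()` at startup, whereas a positional constructor gives no indication of which argument was swapped or omitted.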
Implementations§
impl<S, P, PT> OutboxManager<S, P, PT>
pub fn new(
    storage: Arc<S>,
    publisher: Arc<P>,
    config: Arc<OutboxConfig<PT>>,
    shutdown_rx: Receiver<bool>,
) -> Self
Direct constructor used by
OutboxManagerBuilder.
Application code should normally go through the builder.
This signature is compiled when the dlq feature is disabled and omits
the DLQ heap argument.
pub async fn run(self) -> Result<(), OutboxError>
Starts the main outbox worker loop.
This method will run until a shutdown signal is received via the
shutdown_rx channel. It coordinates three concerns:
- Event processing: on each wake-up it drives OutboxProcessor in an inner drain loop until the fetched batch is empty, at which point it returns to waiting.
- Wake-up sources: a tokio::select! races a storage-level LISTEN/notify call, a poll interval (config.poll_interval_secs), and the shutdown receiver. A notification error is logged and the loop sleeps for 5 seconds before retrying.
- Garbage collection: a background task is spawned that ticks on config.gc_interval_secs and calls GarbageCollector::collect_garbage, exiting when the shutdown signal fires.
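The interplay between the inner drain loop and the wake-up sources can be sketched without any async runtime. The example below is a std-only analogue, not the crate's implementation: it replaces `tokio::select!` with `recv_timeout` on a standard channel, where the timeout plays the role of the poll interval. `InMemoryStorage`, `Wake`, `drain`, and this free-standing `run` are hypothetical names introduced for illustration, and error handling and garbage collection are left out.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical in-memory stand-in for an outbox storage.
pub struct InMemoryStorage {
    pending: VecDeque<u64>,
}

impl InMemoryStorage {
    pub fn new(ids: impl IntoIterator<Item = u64>) -> Self {
        Self { pending: ids.into_iter().collect() }
    }

    /// Removes and returns up to `limit` pending event ids.
    pub fn fetch_batch(&mut self, limit: usize) -> Vec<u64> {
        let n = limit.min(self.pending.len());
        self.pending.drain(..n).collect()
    }
}

/// Hypothetical wake-up messages standing in for the notify and shutdown sources.
#[derive(Debug)]
pub enum Wake {
    Notified,
    Shutdown,
}

/// Inner drain loop: keeps fetching batches until one comes back empty.
/// Returns the number of events handed to `publish`.
pub fn drain(
    storage: &mut InMemoryStorage,
    batch_size: usize,
    publish: &mut impl FnMut(u64),
) -> usize {
    let mut published = 0;
    loop {
        let batch = storage.fetch_batch(batch_size);
        if batch.is_empty() {
            return published;
        }
        for id in batch {
            publish(id);
            published += 1;
        }
    }
}

/// Outer loop: drain, then wait for a notification, the poll interval
/// (modelled as a receive timeout), or shutdown.
pub fn run(
    storage: &mut InMemoryStorage,
    wake_rx: &Receiver<Wake>,
    poll_interval: Duration,
    batch_size: usize,
    mut publish: impl FnMut(u64),
) -> usize {
    let mut published = 0;
    loop {
        published += drain(storage, batch_size, &mut publish);
        match wake_rx.recv_timeout(poll_interval) {
            // A notification or an elapsed poll interval triggers another drain.
            Ok(Wake::Notified) | Err(RecvTimeoutError::Timeout) => continue,
            // Shutdown, or a dropped sender, ends the worker.
            Ok(Wake::Shutdown) | Err(RecvTimeoutError::Disconnected) => return published,
        }
    }
}
```

Draining until an empty batch means a single wake-up clears the whole backlog, so the poll interval only bounds the latency for events whose notification was missed.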
§Errors
Returns an OutboxError if the worker encounters a terminal failure
that it cannot recover from. In the current implementation, transient
errors from the storage and transport layers are logged and the loop
continues, so a returned Ok(()) signals that the worker observed a
graceful shutdown via shutdown_rx.
§Example
use std::sync::Arc;
use tokio::sync::watch;
use outbox_core::prelude::*;
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let manager = OutboxManagerBuilder::new()
.storage(storage)
.publisher(publisher)
.config(Arc::new(OutboxConfig::default()))
.shutdown_rx(shutdown_rx)
.build()?;
let handle = tokio::spawn(async move { manager.run().await });
// ... later, on a signal or process exit:
let _ = shutdown_tx.send(true);
handle.await.expect("worker panicked")?;