Expand description
Bounded async MPSC/SPSC channel with item expiry.
Caducus (latin) = perishable
Caducus is a bounded asynchronous channel with two operating modes:
single-producer single-consumer (SpscBuilder / SpscSender) and
multi-producer single-consumer (MpscBuilder / MpscSender). Items
carry a time-to-live and are evicted when they expire; the eviction is
observable through user-supplied ReportChannels.
Use shutdown() on the sender side for controlled channel teardown. Dropping the last
sender also triggers a hard shutdown that drains and reports any items still in
the buffer.
Caducus runs on Tokio. A runtime handle must be available either implicitly
(call build() from within a Tokio task) or explicitly via
SpscBuilder::runtime / MpscBuilder::runtime.
§Example
use std::time::Duration;
use caducus::MpscBuilder;
#[tokio::main]
async fn main() {
let (tx, rx) = MpscBuilder::<u32>::new(128, Duration::from_secs(5))
.build()
.expect("tokio runtime available");
tx.send(42).expect("buffer not full");
let item = rx.next(None).await.expect("received before timeout");
assert_eq!(item, 42);
}§Caducus Bounded Buffer With Expiry
Status: Developed
§Objectives
- Provide a bounded asynchronous channel with expiry and hard shutdown semantics.
- Support two operating modes: SPSC (single producer, single consumer) and MPSC (multi producer, single consumer). Mode is set at build time and immutable.
- Keep the implementation split into storage, concurrency, reclaimer, sender, and receiver layers.
- Use a mutex-based architecture with Vec-backed ring buffer storage.
- Report expiry and shutdown outcomes through sender-owned report channels.
- Support per-item expiry sends that can override the configured default TTL with either a validated per-send TTL or a future absolute deadline.
§Technical Details
§Layering
Five modules, strict separation of concerns. Each module doc is the single source of truth for that module’s behaviour.
§Ring Buffer — Data Ownership And Integrity
src/concurrency/ring_buffer.rs, private submodule of the concurrency module.
The ring buffer is the single authority for stored queue accounting data and its validity. Default sends let the ring calculate expiry from the configured TTL. Per-item send variants validate their caller-supplied TTL or deadline before locking the ring and pass the resolved absolute deadline down for storage. Upper layers do not inspect or rewrite occupied ring state.
See docs/ring-buffer.md for structure, operations, and validation.
§Error Model
src/error.rs, public module.
All errors use the unified CaducusError<T> / CaducusErrorKind<T> types. The default type
parameter is ().
CaducusErrorKind<T> variants:
InvalidArgument– invalid non-send argument or configuration value (e.g. configured TTL out of range).InvalidTTL(T)– invalid per-send TTL or deadline. Carries the rejected item. Used whensend_with_ttlreceives a duration outside1ms..=1 year, orsend_with_deadlinereceives a deadline at or before validation time.InvalidPattern(T)– wrong push variant for the configured mode. Carries the rejected item.Timeout– blocking pop deadline reached.Shutdown(T)– queue has been shut down. Carries the rejected item on the send path; carries()on the receive path.Full(T)– queue is at capacity. Carries the rejected item.NoRuntime– no Tokio runtime available whenbuild()is called.
The send path returns Result<(), CaducusError<T>> so the caller always gets the item back on
failure. This guarantee also applies to per-item expiry sends that fail validation with
InvalidTTL(item), such as an out-of-range per-send TTL or a deadline that is not in the future.
The receive path and configuration methods return Result<..., CaducusError<()>>.
CaducusError<T>::into_inner() returns Option<T>, extracting the item from item-carrying send
errors: Full, Shutdown, InvalidTTL, and InvalidPattern.
§Concurrency — Serialised Access And Wakeup Coordination
src/concurrency.rs, crate-private module.
Serialises access to the ring buffer via Mutex<Ring<T>> and coordinates wakeups via two
Arc<Notify> handles (one for the reclaimer task, one for the receiver). Provides a unified
drain method controlled by DrainMode for both the reclaimer and receiver paths.
See docs/concurrency.md for send/drain paths, error model, and notification flow.
§Reclaimer — Expiry Task, Receiver Support, And Reporting
src/reclaimer.rs, crate-private module.
Owns the reclaimer task (expiry draining loop), all item reporting logic (expiry and shutdown),
and the try_receive function that serves the receiver. The receiver calls try_receive instead
of accessing the concurrency layer directly.
See docs/reclaimer.md for the reclaimer task loop, try_receive, reporting functions, and
wakeup correctness.
§Sender — Public Send API
src/sender.rs, the public send-side interface.
Owns builders and sender-local report-channel configuration. Sender drop triggers shutdown
(SPSC always, MPSC on last sender via Arc<AtomicUsize> sender counting).
See docs/sender.md for builders, send patterns, clone semantics, and drop behavior.
§Receiver — Public Receive API
src/receiver.rs, the public receive-side interface.
Owns the wait loop and timeout logic. Delegates to reclaimer::try_receive for drain-and-claim
operations. Mode-agnostic.
See docs/receiver.md for receive behaviour and lifecycle.
§Visibility
Only the sender and receiver modules expose public APIs. The concurrency layer is pub(crate).
The ring buffer is a private submodule of the concurrency module, invisible to sender and receiver.
§No Panic Paths In Live Code
expect, unwrap(), panic!, unreachable!, unimplemented!, and todo! are prohibited in all
live (non-test) code. Every fallible operation must be handled through Result, Option
combinators, or pattern matching. Safe non-panicking variants such as unwrap_or,
unwrap_or_else, and unwrap_or_default are permitted.
§Public API Summary
Two builders produce mode-specific sender types. Both take new(capacity, ttl) plus optional
expiry_channel(...) / shutdown_channel(...) methods:
SpscBuilder<T>::build(...)returns(SpscSender<T>, Receiver<T>).MpscBuilder<T>::build(...)returns(MpscSender<T>, Receiver<T>).
SpscSender<T> – not cloneable, report channels (if configured) fixed at construction at ring
level. Unconfigured channels result in silent drop of the corresponding outcome.
Methods: send(item), send_with_ttl(item, ttl), send_with_deadline(item, deadline),
update_capacity, update_ttl, shutdown, is_closed.
MpscSender<T> – cloneable, each clone captures a snapshot of the current report channels.
Methods: send(item), send_with_ttl(item, ttl), send_with_deadline(item, deadline),
set_expiry_channel, set_shutdown_channel, set_channels, update_capacity, update_ttl,
shutdown, is_closed.
Default send(item) uses the configured channel TTL. send_with_ttl validates the supplied
duration against the library TTL limits. send_with_deadline rejects deadlines at or before
validation time and otherwise accepts caller-supplied future deadlines without applying library TTL
limits. Per-item expiry sends do not change the configured default TTL used by later send(item)
calls. Receive delivery remains FIFO; per-item expiry does not provide earliest-deadline-first,
priority, or fair scheduling.
Receiver<T> – mode-agnostic, single consumer.
Methods: next(deadline: Option<Instant>), is_closed.
Error variants: InvalidArgument, InvalidTTL, InvalidPattern, NoRuntime, Timeout,
Shutdown, Full.
Full API detail lives in the sender and receiver docs.
§Conservation Invariant
Every item sent through the channel is accounted for exactly once:
sent == received + expired + shutdown
This is enforced by the architecture: reclaimer::try_receive reports expired items on the
receiver path, the reclaimer task reports expired items on the drain path, and shutdown drains
report through shutdown channels. Performance tests validate strict equality.
§Validation
tests/ring_buffer.rsvalidates storage invariants and resize behavior.tests/concurrency.rsvalidates drain, shutdown, and notification behavior.tests/caducus.rsvalidates the public sender and receiver contract.tests/performance.rsvalidates throughput, conservation, and stress scenarios.- Verification passes with:
cargo test --test caducuscargo testandcargo clippy --all-targets --all-features -- -D warnings.
Re-exports§
pub use error::CaducusError;pub use error::CaducusErrorKind;pub use receiver::Receiver;pub use sender::MpscBuilder;pub use sender::MpscSender;pub use sender::SpscBuilder;pub use sender::SpscSender;
Modules§
- error
- Error types returned from every fallible operation in the public API.
- receiver
- Receive side of the channel. See
Receiver. - sender
- Send side of the channel. See
SpscBuilder,MpscBuilder,SpscSender, andMpscSender.
Traits§
- Report
Channel - Report channel trait for expiry and shutdown reporting.