
Crate caducus


Bounded async MPSC/SPSC channel with item expiry.

Caducus (Latin) = perishable

Caducus is a bounded asynchronous channel with two operating modes: single-producer single-consumer (SpscBuilder / SpscSender) and multi-producer single-consumer (MpscBuilder / MpscSender). Items carry a time-to-live and are evicted when they expire; the eviction is observable through user-supplied ReportChannels.

Use shutdown() on the sender side for controlled channel teardown. Dropping the last sender also triggers a hard shutdown that drains and reports any items still in the buffer.

Caducus runs on Tokio. A runtime handle must be available either implicitly (call build() from within a Tokio task) or explicitly via SpscBuilder::runtime / MpscBuilder::runtime.

§Example

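use std::time::Duration;
use caducus::MpscBuilder;

#[tokio::main]
async fn main() {
    // Capacity of 128 items, default TTL of 5 seconds.
    // build() is called inside a Tokio task, so the runtime handle is implicit.
    let (tx, rx) = MpscBuilder::<u32>::new(128, Duration::from_secs(5))
        .build()
        .expect("tokio runtime available");

    // send() does not await; a failed send hands the item back in the error.
    tx.send(42).expect("buffer not full");
    // No receive deadline is supplied here.
    let item = rx.next(None).await.expect("received before timeout");
    assert_eq!(item, 42);
}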

§Caducus Bounded Buffer With Expiry

Status: Developed

§Objectives

  • Provide a bounded asynchronous channel with expiry and hard shutdown semantics.
  • Support two operating modes: SPSC (single-producer, single-consumer) and MPSC (multi-producer, single-consumer). The mode is set at build time and cannot be changed afterwards.
  • Keep the implementation split into storage, concurrency, reclaimer, sender, and receiver layers.
  • Use a mutex-based architecture with Vec-backed ring buffer storage.
  • Report expiry and shutdown outcomes through sender-owned report channels.
  • Support per-item expiry sends that can override the configured default TTL with either a validated per-send TTL or a future absolute deadline.

§Technical Details

§Layering

Five modules, strict separation of concerns. Each module doc is the single source of truth for that module’s behaviour.

§Ring Buffer — Data Ownership And Integrity

src/concurrency/ring_buffer.rs, private submodule of the concurrency module.

The ring buffer is the single authority for stored queue accounting data and its validity. Default sends let the ring calculate expiry from the configured TTL. Per-item send variants validate their caller-supplied TTL or deadline before locking the ring and pass the resolved absolute deadline down for storage. Upper layers do not inspect or rewrite occupied ring state.

See docs/ring-buffer.md for structure, operations, and validation.
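
The split between default sends and per-item sends can be illustrated with a minimal, std-only sketch. This is not the crate's ring implementation: ToyRing, push_default, push_with_deadline, and pop_live are hypothetical names, and a VecDeque stands in for the Vec-backed ring. The sketch only assumes what is stated above, namely that each stored item carries an absolute deadline and that delivery is FIFO.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the ring: every slot stores its absolute deadline.
struct ToyRing<T> {
    slots: VecDeque<(T, Instant)>,
    capacity: usize,
    default_ttl: Duration,
}

impl<T> ToyRing<T> {
    fn new(capacity: usize, default_ttl: Duration) -> Self {
        Self {
            slots: VecDeque::with_capacity(capacity),
            capacity,
            default_ttl,
        }
    }

    /// Default send: the ring derives the deadline from the configured TTL.
    fn push_default(&mut self, item: T, now: Instant) -> Result<(), T> {
        let deadline = now + self.default_ttl;
        self.push_with_deadline(item, deadline)
    }

    /// Per-item send: the caller has already resolved and validated the deadline.
    fn push_with_deadline(&mut self, item: T, deadline: Instant) -> Result<(), T> {
        if self.slots.len() >= self.capacity {
            return Err(item); // full: hand the item back to the caller
        }
        self.slots.push_back((item, deadline));
        Ok(())
    }

    /// Pops in FIFO order, moving expired items into `expired` and
    /// returning the first item that is still live, if any.
    fn pop_live(&mut self, now: Instant, expired: &mut Vec<T>) -> Option<T> {
        while let Some((item, deadline)) = self.slots.pop_front() {
            if deadline <= now {
                expired.push(item);
            } else {
                return Some(item);
            }
        }
        None
    }
}
```

In this model an item pushed with a one-second deadline ahead of an item using a five-second default TTL is moved to the expired list when pop_live runs two seconds later, and the second item is returned.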

§Error Model

src/error.rs, public module.

All errors use the unified CaducusError<T> / CaducusErrorKind<T> types. The default type parameter is ().

CaducusErrorKind<T> variants:

  • InvalidArgument – invalid non-send argument or configuration value (e.g. configured TTL out of range).
  • InvalidTTL(T) – invalid per-send TTL or deadline. Carries the rejected item. Used when send_with_ttl receives a duration outside 1ms..=1 year, or send_with_deadline receives a deadline at or before validation time.
  • InvalidPattern(T) – wrong push variant for the configured mode. Carries the rejected item.
  • Timeout – blocking pop deadline reached.
  • Shutdown(T) – queue has been shut down. Carries the rejected item on the send path; carries () on the receive path.
  • Full(T) – queue is at capacity. Carries the rejected item.
  • NoRuntime – no Tokio runtime available when build() is called.

The send path returns Result<(), CaducusError<T>> so the caller always gets the item back on failure. This guarantee also applies to per-item expiry sends that fail validation with InvalidTTL(item), such as an out-of-range per-send TTL or a deadline that is not in the future. The receive path and configuration methods return Result<..., CaducusError<()>>.

CaducusError<T>::into_inner() returns Option<T>, extracting the item from item-carrying send errors: Full, Shutdown, InvalidTTL, and InvalidPattern.
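
The item-recovery guarantee can be sketched with a simplified, std-only model of the variants listed above. ToyErrorKind and send_or_keep are hypothetical names; the real CaducusError<T> / CaducusErrorKind<T> types may be structured differently, so treat this as an illustration of the pattern rather than the crate's definition.

```rust
/// Simplified model of the error kinds listed above (not the crate's definition).
#[derive(Debug, PartialEq)]
enum ToyErrorKind<T = ()> {
    InvalidArgument,
    InvalidTtl(T),
    InvalidPattern(T),
    Timeout,
    Shutdown(T),
    Full(T),
    NoRuntime,
}

impl<T> ToyErrorKind<T> {
    /// Returns the rejected item for item-carrying variants, `None` otherwise.
    fn into_inner(self) -> Option<T> {
        match self {
            Self::InvalidTtl(item)
            | Self::InvalidPattern(item)
            | Self::Shutdown(item)
            | Self::Full(item) => Some(item),
            Self::InvalidArgument | Self::Timeout | Self::NoRuntime => None,
        }
    }
}

/// Hypothetical caller-side helper: keep the item from a failed send
/// in a local backlog instead of losing it.
fn send_or_keep<T>(result: Result<(), ToyErrorKind<T>>, backlog: &mut Vec<T>) {
    if let Err(err) = result {
        if let Some(item) = err.into_inner() {
            backlog.push(item);
        }
    }
}
```

Because the item travels inside the error value, a caller can retry or reroute it without cloning it before the send.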

§Concurrency — Serialised Access And Wakeup Coordination

src/concurrency.rs, crate-private module.

Serialises access to the ring buffer via Mutex<Ring<T>> and coordinates wakeups via two Arc<Notify> handles (one for the reclaimer task, one for the receiver). Provides a unified drain method controlled by DrainMode for both the reclaimer and receiver paths.

See docs/concurrency.md for send/drain paths, error model, and notification flow.

§Reclaimer — Expiry Task, Receiver Support, And Reporting

src/reclaimer.rs, crate-private module.

Owns the reclaimer task (expiry draining loop), all item reporting logic (expiry and shutdown), and the try_receive function that serves the receiver. The receiver calls try_receive instead of accessing the concurrency layer directly.

See docs/reclaimer.md for the reclaimer task loop, try_receive, reporting functions, and wakeup correctness.

§Sender — Public Send API

src/sender.rs, the public send-side interface.

Owns builders and sender-local report-channel configuration. Sender drop triggers shutdown (SPSC always, MPSC on last sender via Arc<AtomicUsize> sender counting).

See docs/sender.md for builders, send patterns, clone semantics, and drop behavior.
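
The last-sender rule can be sketched with an atomic counter shared between clones. This is a std-only model under stated assumptions: ToySender and Shared are hypothetical names, and setting a flag stands in for the real shutdown, which also drains and reports buffered items.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

/// State shared by all clones of the hypothetical sender.
struct Shared {
    senders: AtomicUsize,
    closed: AtomicBool,
}

impl Shared {
    fn is_closed(&self) -> bool {
        self.closed.load(Ordering::Acquire)
    }
}

/// Hypothetical cloneable sender that closes the channel when the last clone drops.
struct ToySender {
    shared: Arc<Shared>,
}

impl ToySender {
    fn new() -> Self {
        let shared = Arc::new(Shared {
            senders: AtomicUsize::new(1),
            closed: AtomicBool::new(false),
        });
        Self { shared }
    }

    /// Hands out a handle to the shared state so the closed flag can be observed.
    fn shared(&self) -> Arc<Shared> {
        Arc::clone(&self.shared)
    }
}

impl Clone for ToySender {
    fn clone(&self) -> Self {
        // Each clone registers itself as one more live sender.
        self.shared.senders.fetch_add(1, Ordering::Relaxed);
        Self { shared: Arc::clone(&self.shared) }
    }
}

impl Drop for ToySender {
    fn drop(&mut self) {
        // fetch_sub returns the previous count; 1 means this was the last sender.
        if self.shared.senders.fetch_sub(1, Ordering::AcqRel) == 1 {
            self.shared.closed.store(true, Ordering::Release);
        }
    }
}
```

A separate counter is used here, rather than the Arc's own strong count, because the receiver side would also hold a reference to the shared state.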

§Receiver — Public Receive API

src/receiver.rs, the public receive-side interface.

Owns the wait loop and timeout logic. Delegates to reclaimer::try_receive for drain-and-claim operations. Mode-agnostic.

See docs/receiver.md for receive behaviour and lifecycle.

§Visibility

Of the layers above, only the sender and receiver modules expose public APIs; the error module is public as well. The concurrency layer is pub(crate). The ring buffer is a private submodule of the concurrency module, invisible to sender and receiver.

§No Panic Paths In Live Code

expect(), unwrap(), panic!, unreachable!, unimplemented!, and todo! are prohibited in all live (non-test) code. Every fallible operation must be handled through Result, Option combinators, or pattern matching. Safe non-panicking variants such as unwrap_or, unwrap_or_else, and unwrap_or_default are permitted.
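
As a small illustration of this rule, the sketch below handles a fallible parse through Result and a permitted unwrap_or fallback instead of a panicking call. The functions parse_ttl_ms and ttl_or_default are hypothetical and are not part of the crate.

```rust
use std::time::Duration;

/// Hypothetical config parser: returns an error instead of panicking on bad input.
fn parse_ttl_ms(raw: &str) -> Result<Duration, String> {
    let ms: u64 = raw
        .trim()
        .parse()
        .map_err(|e| format!("invalid TTL {:?}: {}", raw, e))?;
    Ok(Duration::from_millis(ms))
}

/// Falls back to a default via `unwrap_or`, one of the permitted non-panicking variants.
fn ttl_or_default(raw: Option<&str>, default: Duration) -> Duration {
    raw.and_then(|s| parse_ttl_ms(s).ok()).unwrap_or(default)
}
```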

§Public API Summary

Two builders produce mode-specific sender types. Both are created with new(capacity, ttl) and offer optional expiry_channel(...), shutdown_channel(...), and runtime(...) methods:

  • SpscBuilder<T>::build(...) returns (SpscSender<T>, Receiver<T>) on success.
  • MpscBuilder<T>::build(...) returns (MpscSender<T>, Receiver<T>) on success.

SpscSender<T> – not cloneable; report channels (if configured) are fixed at construction, at ring level. If a channel is not configured, the corresponding outcome is dropped silently. Methods: send(item), send_with_ttl(item, ttl), send_with_deadline(item, deadline), update_capacity, update_ttl, shutdown, is_closed.

MpscSender<T> – cloneable, each clone captures a snapshot of the current report channels. Methods: send(item), send_with_ttl(item, ttl), send_with_deadline(item, deadline), set_expiry_channel, set_shutdown_channel, set_channels, update_capacity, update_ttl, shutdown, is_closed.

Default send(item) uses the configured channel TTL. send_with_ttl validates the supplied duration against the library TTL limits. send_with_deadline rejects deadlines at or before validation time and otherwise accepts caller-supplied future deadlines without applying library TTL limits. Per-item expiry sends do not change the configured default TTL used by later send(item) calls. Receive delivery remains FIFO; per-item expiry does not provide earliest-deadline-first, priority, or fair scheduling.
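
The two validation rules can be sketched as plain functions. This is a std-only model, not the crate's code: resolve_ttl and resolve_deadline are hypothetical names, a bare Err(item) stands in for InvalidTTL(item), and "1 year" is assumed to mean 365 days, which the crate may define differently.

```rust
use std::time::{Duration, Instant};

const MIN_TTL: Duration = Duration::from_millis(1);
// Assumption: "1 year" is taken as 365 days; the crate may define it differently.
const MAX_TTL: Duration = Duration::from_secs(365 * 24 * 60 * 60);

/// Resolves a per-send TTL to an absolute deadline.
/// On rejection the item is returned to the caller.
fn resolve_ttl<T>(item: T, ttl: Duration, now: Instant) -> Result<(T, Instant), T> {
    if ttl < MIN_TTL || ttl > MAX_TTL {
        return Err(item);
    }
    // checked_add avoids a panic if the platform clock cannot represent the sum.
    match now.checked_add(ttl) {
        Some(deadline) => Ok((item, deadline)),
        None => Err(item),
    }
}

/// Accepts any strictly future deadline; no TTL range is applied.
fn resolve_deadline<T>(item: T, deadline: Instant, now: Instant) -> Result<(T, Instant), T> {
    if deadline <= now {
        return Err(item);
    }
    Ok((item, deadline))
}
```

Under these assumptions a deadline two years out is accepted by resolve_deadline, while the equivalent TTL is rejected by resolve_ttl, which mirrors the asymmetry described above.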

Receiver<T> – mode-agnostic, single consumer. Methods: next(deadline: Option<Instant>), is_closed.

Error variants: InvalidArgument, InvalidTTL, InvalidPattern, NoRuntime, Timeout, Shutdown, Full. Full API detail lives in the sender and receiver docs.

§Conservation Invariant

Every item sent through the channel is accounted for exactly once:

sent == received + expired + shutdown

This is enforced by the architecture: reclaimer::try_receive reports expired items on the receiver path, the reclaimer task reports expired items on the drain path, and shutdown drains report through shutdown channels. Performance tests validate strict equality.
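
The invariant can be checked with a simple tally. This is a hypothetical bookkeeping sketch and not the crate's test code; it assumes sent counts only items the channel accepted, since rejected items are returned to the caller.

```rust
/// The three final outcomes an accepted item can have.
enum Outcome {
    Received,
    Expired,
    Shutdown,
}

/// Hypothetical tally of the four quantities named in the invariant.
#[derive(Debug, Default, Clone, Copy)]
struct Tally {
    sent: u64,
    received: u64,
    expired: u64,
    shutdown: u64,
}

impl Tally {
    /// Records one accepted send.
    fn record_sent(&mut self) {
        self.sent += 1;
    }

    /// Records the single final outcome of one item.
    fn settle(&mut self, outcome: Outcome) {
        match outcome {
            Outcome::Received => self.received += 1,
            Outcome::Expired => self.expired += 1,
            Outcome::Shutdown => self.shutdown += 1,
        }
    }

    /// Items accepted but not yet assigned a final outcome.
    fn in_flight(&self) -> u64 {
        self.sent
            .saturating_sub(self.received + self.expired + self.shutdown)
    }

    /// True once sent == received + expired + shutdown.
    fn is_conserved(&self) -> bool {
        self.sent == self.received + self.expired + self.shutdown
    }
}
```

While items remain buffered the equality does not yet hold; it is expected to hold once the channel has been shut down and drained.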

§Validation

  • tests/ring_buffer.rs validates storage invariants and resize behavior.
  • tests/concurrency.rs validates drain, shutdown, and notification behavior.
  • tests/caducus.rs validates the public sender and receiver contract.
  • tests/performance.rs validates throughput, conservation, and stress scenarios.
  • Verification passes with: cargo test --test caducus, cargo test, and cargo clippy --all-targets --all-features -- -D warnings.

Re-exports§

pub use error::CaducusError;
pub use error::CaducusErrorKind;
pub use receiver::Receiver;
pub use sender::MpscBuilder;
pub use sender::MpscSender;
pub use sender::SpscBuilder;
pub use sender::SpscSender;

Modules§

error
Error types returned from every fallible operation in the public API.
receiver
Receive side of the channel. See Receiver.
sender
Send side of the channel. See SpscBuilder, MpscBuilder, SpscSender, and MpscSender.

Traits§

ReportChannel
Report channel trait for expiry and shutdown reporting.