Skip to main content

Module tiered_sink

Module tiered_sink 

Source
Available on crate feature tiered-sink only.
Expand description

Tiered sink with automatic disk spillover for resilient message delivery.

This module provides a wrapper around any async sink (Kafka, S3, HTTP, etc.) that automatically spills messages to disk when the primary sink is unavailable or backpressuring, then drains them back when the sink recovers.

§Design

                    ┌─────────────────────────────────────┐
                    │           TieredSink                │
                    │                                     │
   Message ────────►│  try_send() to primary sink        │
                    │         │                           │
                    │         ▼                           │
                    │    ┌─────────┐                      │
                    │    │ Success │──► Done (hot path)   │
                    │    └────┬────┘                      │
                    │         │ Err(Full/Unavailable)     │
                    │         ▼                           │
                    │    ┌─────────┐                      │
                    │    │  Spool  │──► Disk (cold path)  │
                    │    └────┬────┘                      │
                    │         │                           │
                    │    Background drain task            │
                    │    (when primary recovers)          │
                    └─────────────────────────────────────┘

§Features

  • Hot path first: Always tries primary sink with timeout
  • Automatic spillover: Writes to disk only when primary fails
  • Circuit breaker: Avoids hammering a dead sink
  • Background drain: Recovers spooled messages when sink is healthy
  • Configurable ordering: Interleaved (default) or strict FIFO
  • Multiple compression codecs: LZ4 (default), Snappy, Zstd, None

§Example

use hyperi_rustlib::tiered_sink::{TieredSink, TieredSinkConfig, Sink, SinkError};

// Implement Sink for your backend
struct MyKafkaSink { /* ... */ }

#[async_trait::async_trait]
impl Sink for MyKafkaSink {
    type Error = MyError;

    async fn try_send(&self, data: &[u8]) -> Result<(), SinkError<Self::Error>> {
        // Send to Kafka...
        Ok(())
    }
}

// Wrap with TieredSink
let kafka = MyKafkaSink::new();
let config = TieredSinkConfig::new("/var/spool/myapp.queue");
let tiered = TieredSink::new(kafka, config).await?;

// Use tiered - automatically spills to disk if Kafka is down
tiered.send(b"my message").await?;

Structs§

CircuitBreaker
Circuit breaker for protecting against unhealthy sinks.
DiskAwareConfig
Configuration for disk-aware capacity management.
TieredSink
A tiered sink with automatic disk spillover.
TieredSinkConfig
Configuration for TieredSink.

Enums§

CircuitState
Circuit breaker state.
CompressionCodec
Compression codec for spool storage.
DrainStrategy
Strategy for draining spooled messages back to the primary sink.
OrderingMode
Ordering mode for message delivery during drain.
SinkError
Error returned by Sink::try_send.
TieredSinkError
Errors that can occur in TieredSink operations.

Traits§

Sink
A sink that can receive messages asynchronously.

Type Aliases§

Result
Result type for tiered sink operations.