Skip to main content

Module streaming

Module streaming 

Source
Expand description

§Streaming API

In-memory streaming API for LaminarDB - embedded Kafka Streams-like semantics with zero external dependencies.

§Overview

This module provides a type-safe streaming API with:

  • Source: Entry point for data into a pipeline
  • Sink: Consumption endpoint for data from a pipeline
  • Subscription: Interface for receiving records from a sink
  • Channels: Lock-free SPSC/MPSC communication

§Key Design Principles

  1. Channel type is auto-derived - Never user-specified
  2. SPSC → MPSC upgrade - Automatic on source.clone()
  3. Zero allocations on hot path - Arena and batch operations
  4. Checkpointing is optional - Zero overhead when disabled

§Quick Start

use laminar_core::streaming::{self, Record, Source, Sink, Subscription};

// Define your event type
#[derive(Clone)]
struct MyEvent {
    id: i64,
    value: f64,
    timestamp: i64,
}

impl Record for MyEvent {
    fn schema() -> SchemaRef { /* ... */ }
    fn to_record_batch(&self) -> RecordBatch { /* ... */ }
    fn event_time(&self) -> Option<i64> { Some(self.timestamp) }
}

// Create source and sink
let (source, sink) = streaming::create::<MyEvent>(1024);

// Push data (single producer = SPSC mode)
source.push(event)?;

// Clone for multiple producers (triggers MPSC upgrade)
let source2 = source.clone();
assert!(source.is_mpsc());

// Subscribe to receive data
let subscription = sink.subscribe();

// Consume via polling
while let Some(batch) = subscription.poll() {
    process(batch);
}

// Or iterate
for batch in subscription {
    process(batch);
}

§Module Structure

  • config: Configuration types for channels, sources, and sinks
  • error: Error types for streaming operations
  • ring_buffer: Lock-free ring buffer implementation
  • channel(): SPSC/MPSC channel with automatic upgrade
  • source: Source API and Record trait
  • sink: Sink API with subscription support
  • subscription: Subscription API for consuming records

§Architecture

┌─────────────┐     ┌─────────────┐     ┌─────────────────┐
│   Source    │────▶│   Channel   │────▶│      Sink       │
│             │     │ (SPSC/MPSC) │     │                 │
│ push()      │     │             │     │ subscribe()     │
│ watermark() │     │             │     │                 │
└─────────────┘     └─────────────┘     └────────┬────────┘
      │                                          │
      │ clone()                                  │
      │ (triggers MPSC)                          ▼
      │                                 ┌─────────────────┐
      │                                 │  Subscription   │
      │                                 │                 │
      │                                 │ poll()          │
      │                                 │ recv()          │
      │                                 │ Iterator        │
      │                                 └─────────────────┘

§Performance

OperationTargetNotes
Ring buffer push< 20nsPower-of-2, cache-padded
SPSC channel push< 50nsLock-free, batch support
MPSC channel push< 150nsCAS-based slot claiming
Source push< 100nsIncludes watermark update
Arrow batch push< 1μsZero-copy schema validation

§Backpressure

Configurable strategies when buffers are full:

  • Block: Wait until space is available (default, exactly-once friendly)
  • DropOldest: Overwrite oldest data (real-time systems)
  • Reject: Return error immediately (caller decides)

Re-exports§

pub use channel::channel;
pub use channel::channel_with_config;
pub use channel::ChannelMode;
pub use channel::Consumer;
pub use channel::Producer;
pub use checkpoint::CheckpointError;
pub use checkpoint::StreamCheckpoint;
pub use checkpoint::StreamCheckpointConfig;
pub use checkpoint::StreamCheckpointManager;
pub use checkpoint::WalMode;
pub use config::BackpressureStrategy;
pub use config::ChannelConfig;
pub use config::ChannelStats;
pub use config::SinkConfig;
pub use config::SourceConfig;
pub use config::WaitStrategy;
pub use error::RecvError;
pub use error::StreamingError;
pub use error::TryPushError;
pub use ring_buffer::RingBuffer;
pub use sink::Sink;
pub use sink::SinkMode;
pub use source::create;
pub use source::create_with_config;
pub use source::Record;
pub use source::Source;
pub use subscription::Subscription;
pub use subscription::SubscriptionMessage;
pub use broadcast::BroadcastChannel;
pub use broadcast::BroadcastConfig;
pub use broadcast::BroadcastConfigBuilder;
pub use broadcast::BroadcastError;
pub use broadcast::SlowSubscriberPolicy;
pub use broadcast::SubscriberInfo;

Modules§

broadcast
Broadcast Channel for multi-consumer streaming.
channel
Lock-free streaming channels with automatic SPSC → MPSC upgrade.
checkpoint
Streaming checkpoint support.
config
Streaming API configuration types.
error
Streaming API error types.
ring_buffer
Lock-free ring buffer for streaming channels.
sink
Streaming Sink API.
source
Streaming Source API.
subscription
Streaming Subscription API.