Expand description
§Streaming API
In-memory streaming API for LaminarDB - embedded Kafka Streams-like semantics
with zero external dependencies.
§Overview
This module provides a type-safe streaming API with:
- Source: Entry point for data into a pipeline
- Sink: Consumption endpoint for data from a pipeline
- Subscription: Interface for receiving records from a sink
- Channels: Lock-free SPSC/MPSC communication
§Key Design Principles
- Channel type is auto-derived - Never user-specified
- SPSC → MPSC upgrade - Automatic on
source.clone() - Zero allocations on hot path - Arena and batch operations
- Checkpointing is optional - Zero overhead when disabled
§Quick Start
ⓘ
use laminar_core::streaming::{self, Record, Source, Sink, Subscription};
// Define your event type
#[derive(Clone)]
struct MyEvent {
id: i64,
value: f64,
timestamp: i64,
}
impl Record for MyEvent {
fn schema() -> SchemaRef { /* ... */ }
fn to_record_batch(&self) -> RecordBatch { /* ... */ }
fn event_time(&self) -> Option<i64> { Some(self.timestamp) }
}
// Create source and sink
let (source, sink) = streaming::create::<MyEvent>(1024);
// Push data (single producer = SPSC mode)
source.push(event)?;
// Clone for multiple producers (triggers MPSC upgrade)
let source2 = source.clone();
assert!(source.is_mpsc());
// Subscribe to receive data
let subscription = sink.subscribe();
// Consume via polling
while let Some(batch) = subscription.poll() {
process(batch);
}
// Or iterate
for batch in subscription {
process(batch);
}§Module Structure
config: Configuration types for channels, sources, and sinkserror: Error types for streaming operationsring_buffer: Lock-free ring buffer implementationchannel(): SPSC/MPSC channel with automatic upgradesource: Source API and Record traitsink: Sink API with subscription supportsubscription: Subscription API for consuming records
§Architecture
┌─────────────┐ ┌─────────────┐ ┌─────────────────┐
│ Source │────▶│ Channel │────▶│ Sink │
│ │ │ (SPSC/MPSC) │ │ │
│ push() │ │ │ │ subscribe() │
│ watermark() │ │ │ │ │
└─────────────┘ └─────────────┘ └────────┬────────┘
│ │
│ clone() │
│ (triggers MPSC) ▼
│ ┌─────────────────┐
│ │ Subscription │
│ │ │
│ │ poll() │
│ │ recv() │
│ │ Iterator │
│ └─────────────────┘§Performance
| Operation | Target | Notes |
|---|---|---|
| Ring buffer push | < 20ns | Power-of-2, cache-padded |
| SPSC channel push | < 50ns | Lock-free, batch support |
| MPSC channel push | < 150ns | CAS-based slot claiming |
| Source push | < 100ns | Includes watermark update |
| Arrow batch push | < 1μs | Zero-copy schema validation |
§Backpressure
Configurable strategies when buffers are full:
- Block: Wait until space is available (default, exactly-once friendly)
DropOldest: Overwrite oldest data (real-time systems)- Reject: Return error immediately (caller decides)
Re-exports§
pub use channel::channel;pub use channel::channel_with_config;pub use channel::ChannelMode;pub use channel::Consumer;pub use channel::Producer;pub use checkpoint::CheckpointError;pub use checkpoint::StreamCheckpoint;pub use checkpoint::StreamCheckpointConfig;pub use checkpoint::StreamCheckpointManager;pub use checkpoint::WalMode;pub use config::BackpressureStrategy;pub use config::ChannelConfig;pub use config::ChannelStats;pub use config::SinkConfig;pub use config::SourceConfig;pub use config::WaitStrategy;pub use error::RecvError;pub use error::StreamingError;pub use error::TryPushError;pub use ring_buffer::RingBuffer;pub use sink::Sink;pub use sink::SinkMode;pub use source::create;pub use source::create_with_config;pub use source::Record;pub use source::Source;pub use subscription::Subscription;pub use subscription::SubscriptionMessage;pub use broadcast::BroadcastChannel;pub use broadcast::BroadcastConfig;pub use broadcast::BroadcastConfigBuilder;pub use broadcast::BroadcastError;pub use broadcast::SlowSubscriberPolicy;pub use broadcast::SubscriberInfo;
Modules§
- broadcast
- Broadcast Channel for multi-consumer streaming.
- channel
- Lock-free streaming channels with automatic SPSC → MPSC upgrade.
- checkpoint
- Streaming checkpoint support.
- config
- Streaming API configuration types.
- error
- Streaming API error types.
- ring_
buffer - Lock-free ring buffer for streaming channels.
- sink
- Streaming Sink API.
- source
- Streaming Source API.
- subscription
- Streaming Subscription API.