Skip to main content

laminar_core/streaming/
mod.rs

1//! # Streaming API
2//!
3//! In-memory streaming API for `LaminarDB` - embedded Kafka Streams-like semantics
4//! with zero external dependencies.
5//!
6//! ## Overview
7//!
8//! This module provides a type-safe streaming API with:
9//!
10//! - **Source**: Entry point for data into a pipeline
11//! - **Sink**: Consumption endpoint for data from a pipeline
12//! - **Subscription**: Interface for receiving records from a sink
13//! - **Channels**: Lock-free SPSC/MPSC communication
14//!
15//! ## Key Design Principles
16//!
17//! 1. **Channel type is auto-derived** - Never user-specified
18//! 2. **SPSC → MPSC upgrade** - Automatic on `source.clone()`
19//! 3. **Zero allocations on hot path** - Arena and batch operations
20//! 4. **Checkpointing is optional** - Zero overhead when disabled
21//!
22//! ## Quick Start
23//!
24//! ```rust,ignore
25//! use laminar_core::streaming::{self, Record, Source, Sink, Subscription};
26//!
27//! // Define your event type
28//! #[derive(Clone)]
29//! struct MyEvent {
30//!     id: i64,
31//!     value: f64,
32//!     timestamp: i64,
33//! }
34//!
35//! impl Record for MyEvent {
36//!     fn schema() -> SchemaRef { /* ... */ }
37//!     fn to_record_batch(&self) -> RecordBatch { /* ... */ }
38//!     fn event_time(&self) -> Option<i64> { Some(self.timestamp) }
39//! }
40//!
41//! // Create source and sink
42//! let (source, sink) = streaming::create::<MyEvent>(1024);
43//!
44//! // Push data (single producer = SPSC mode)
45//! source.push(event)?;
46//!
47//! // Clone for multiple producers (triggers MPSC upgrade)
48//! let source2 = source.clone();
49//! assert!(source.is_mpsc());
50//!
51//! // Subscribe to receive data
52//! let subscription = sink.subscribe();
53//!
54//! // Consume via polling
55//! while let Some(batch) = subscription.poll() {
56//!     process(batch);
57//! }
58//!
59//! // Or iterate
60//! for batch in subscription {
61//!     process(batch);
62//! }
63//! ```
64//!
65//! ## Module Structure
66//!
67//! - [`config`]: Configuration types for channels, sources, and sinks
68//! - [`error`]: Error types for streaming operations
69//! - [`ring_buffer`]: Lock-free ring buffer implementation
70//! - [`channel()`]: SPSC/MPSC channel with automatic upgrade
71//! - [`source`]: Source API and Record trait
72//! - [`sink`]: Sink API with subscription support
73//! - [`subscription`]: Subscription API for consuming records
74//!
75//! ## Architecture
76//!
77//! ```text
78//! ┌─────────────┐     ┌─────────────┐     ┌─────────────────┐
79//! │   Source    │────▶│   Channel   │────▶│      Sink       │
80//! │             │     │ (SPSC/MPSC) │     │                 │
81//! │ push()      │     │             │     │ subscribe()     │
82//! │ watermark() │     │             │     │                 │
83//! └─────────────┘     └─────────────┘     └────────┬────────┘
84//!       │                                          │
85//!       │ clone()                                  │
86//!       │ (triggers MPSC)                          ▼
87//!       │                                 ┌─────────────────┐
88//!       │                                 │  Subscription   │
89//!       │                                 │                 │
90//!       │                                 │ poll()          │
91//!       │                                 │ recv()          │
92//!       │                                 │ Iterator        │
93//!       │                                 └─────────────────┘
94//! ```
95//!
96//! ## Performance
97//!
98//! | Operation | Target | Notes |
99//! |-----------|--------|-------|
100//! | Ring buffer push | < 20ns | Power-of-2, cache-padded |
101//! | SPSC channel push | < 50ns | Lock-free, batch support |
102//! | MPSC channel push | < 150ns | CAS-based slot claiming |
103//! | Source push | < 100ns | Includes watermark update |
104//! | Arrow batch push | < 1μs | Zero-copy schema validation |
105//!
106//! ## Backpressure
107//!
108//! Configurable strategies when buffers are full:
109//!
110//! - **Block**: Wait until space is available (default, exactly-once friendly)
111//! - **`DropOldest`**: Overwrite oldest data (real-time systems)
112//! - **Reject**: Return error immediately (caller decides)
113
114pub mod broadcast;
115pub mod channel;
116pub mod checkpoint;
117pub mod config;
118pub mod error;
119pub mod ring_buffer;
120pub mod sink;
121pub mod source;
122pub mod subscription;
123
124// Re-export key types
125pub use channel::{channel, channel_with_config, ChannelMode, Consumer, Producer};
126pub use checkpoint::{
127    CheckpointError, StreamCheckpoint, StreamCheckpointConfig, StreamCheckpointManager, WalMode,
128};
129pub use config::{
130    BackpressureStrategy, ChannelConfig, ChannelStats, SinkConfig, SourceConfig, WaitStrategy,
131};
132pub use error::{RecvError, StreamingError, TryPushError};
133pub use ring_buffer::RingBuffer;
134pub use sink::{Sink, SinkMode};
135pub use source::{create, create_with_config, Record, Source};
136pub use subscription::{Subscription, SubscriptionMessage};
137
138// Broadcast channel re-exports
139pub use broadcast::{
140    BroadcastChannel, BroadcastConfig, BroadcastConfigBuilder, BroadcastError,
141    SlowSubscriberPolicy, SubscriberInfo,
142};