# streamweave-window
Windowing operations for StreamWeave
Group stream elements into bounded windows for aggregation and processing.
The streamweave-window package provides windowing operations for StreamWeave streams. It supports time-based windows (tumbling, sliding), count-based windows, session windows, and configurable window triggers and late data policies.
## Key Features
- Window Types: Tumbling, Sliding, Session windows
- Time-Based Windows: Event time and processing time support
- Count-Based Windows: Window by number of elements
- Window Assigners: Assign elements to windows
- Window Triggers: Control when windows fire
- Late Data Policy: Handle late-arriving elements
- Window Transformers: Ready-to-use window transformers
## Installation
Add this to your `Cargo.toml`:

```toml
[dependencies]
streamweave-window = "0.4.0"
```

## Quick Start
### Tumbling Window
```rust
use std::time::Duration;
use streamweave_window::{TumblingWindowAssigner, WindowConfig, WindowTransformer};

// Create a tumbling window of 5 seconds
let assigner = TumblingWindowAssigner::new(Duration::from_secs(5));
let config = WindowConfig::default();

// Use with a window transformer
let transformer = WindowTransformer::new(assigner, config);
```

### Sliding Window
```rust
use std::time::Duration;
use streamweave_window::SlidingWindowAssigner;

// Create a sliding window: 10 second size, 5 second slide
let assigner = SlidingWindowAssigner::new(
    Duration::from_secs(10),
    Duration::from_secs(5),
);
```

## API Overview
### Window Types
TimeWindow:
- Represents a window with start and end timestamps
- Used for time-based windowing
Window Assigners:
- `TumblingWindowAssigner`: fixed-size, non-overlapping windows
- `SlidingWindowAssigner`: fixed-size, overlapping windows
- `SessionWindowAssigner`: gap-based dynamic windows
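To see how the assigners differ, the following standalone sketch computes window membership for a single timestamp. It is not the crate's implementation: the `Window` struct, `assign_tumbling`, and `assign_sliding` are hypothetical names, timestamps are assumed to be `u64` milliseconds, and bounds are half-open `[start, end)` to match the `[0-5s)` notation in the usage examples.

```rust
/// Stand-in for a time window with half-open bounds [start, end),
/// using millisecond timestamps (an assumption for this sketch).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Window {
    pub start: u64,
    pub end: u64,
}

impl Window {
    /// True if `ts` falls inside [start, end).
    pub fn contains(&self, ts: u64) -> bool {
        self.start <= ts && ts < self.end
    }
}

/// Tumbling: every timestamp belongs to exactly one window of length `size`.
pub fn assign_tumbling(ts: u64, size: u64) -> Window {
    assert!(size > 0, "size must be positive");
    let start = ts - ts % size;
    Window { start, end: start + size }
}

/// Sliding: a window of length `size` starts every `slide`, so one
/// timestamp can fall into several overlapping windows.
pub fn assign_sliding(ts: u64, size: u64, slide: u64) -> Vec<Window> {
    assert!(size > 0 && slide > 0, "size and slide must be positive");
    let mut windows = Vec::new();
    // Start from the latest window that begins at or before `ts`
    // and walk backwards while the window still covers `ts`.
    let mut start = ts - ts % slide;
    loop {
        let w = Window { start, end: start + size };
        if !w.contains(ts) {
            break;
        }
        windows.push(w);
        if start < slide {
            break; // no windows start before time 0
        }
        start -= slide;
    }
    windows.reverse(); // earliest window first
    windows
}
```

For a 10-second window sliding every 5 seconds, `assign_sliding(7_000, 10_000, 5_000)` returns the windows `[0, 10000)` and `[5000, 15000)`, while `assign_tumbling(7_000, 5_000)` returns the single window `[5000, 10000)`.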
### Window Configuration
```rust
pub struct WindowConfig {
    pub late_data_policy: LateDataPolicy,
    pub allowed_lateness: Option<Duration>,
    // ... other config options
}
```

### Window Triggers
Control when windows fire:
```rust
pub enum TriggerResult {
    Continue,     // Keep accumulating
    Fire,         // Emit results, keep state
    FireAndPurge, // Emit results, clear state
    Purge,        // Discard state
}
```

### Late Data Policy
Handle late-arriving elements:
```rust
pub enum LateDataPolicy {
    Drop,                     // Drop late elements
    SideOutput,               // Emit to side output
    AllowLateness(Duration),  // Include in window
}
```

## Usage Examples
### Tumbling Window
Fixed-size, non-overlapping windows:
```rust
use std::time::Duration;
use streamweave_window::{TumblingWindowAssigner, WindowConfig};

let assigner = TumblingWindowAssigner::new(Duration::from_secs(5));
let config = WindowConfig::default();

// Elements are grouped into 5-second windows:
// [0-5s), [5-10s), [10-15s), ...
```

### Sliding Window
Fixed-size, overlapping windows:
```rust
use std::time::Duration;
use streamweave_window::SlidingWindowAssigner;

// 10 second window, 5 second slide
let assigner = SlidingWindowAssigner::new(
    Duration::from_secs(10),
    Duration::from_secs(5),
);

// Windows: [0-10s), [5-15s), [10-20s), ...
```

### Session Window
Gap-based dynamic windows:
```rust
use std::time::Duration;
use streamweave_window::SessionWindowAssigner;

// Session gap of 30 seconds
let assigner = SessionWindowAssigner::new(Duration::from_secs(30));

// Windows are created dynamically based on gaps in activity
```

### Count-Based Windows
Window by number of elements:
```rust
use streamweave_window::CountWindowAssigner;

// Window of 100 elements
let assigner = CountWindowAssigner::new(100);
```

### Window Transformers
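Plug the ready-made window transformer into a pipeline:

```rust
use std::time::Duration;
use streamweave_window::transformers::WindowTransformer;
use streamweave_window::{TumblingWindowAssigner, WindowConfig};

let assigner = TumblingWindowAssigner::new(Duration::from_secs(5));
let transformer = WindowTransformer::new(assigner, WindowConfig::default());

// Use in a pipeline (`pipeline` is an existing pipeline builder)
pipeline.transformer(transformer);
```

### Late Data Handling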
Configure late data policy:
```rust
use std::time::Duration;
use streamweave_window::{LateDataPolicy, WindowConfig};

// Allow lateness up to 1 minute
let config = WindowConfig {
    late_data_policy: LateDataPolicy::AllowLateness(Duration::from_secs(60)),
    ..Default::default()
};

// Or drop late data
let config = WindowConfig {
    late_data_policy: LateDataPolicy::Drop,
    ..Default::default()
};

// Or emit to side output
let config = WindowConfig {
    late_data_policy: LateDataPolicy::SideOutput,
    ..Default::default()
};
```

### Time-Based Windows
Use event time or processing time:
```rust
use chrono::Duration;
use streamweave_window::{TimeWindow, Utc};

// Create a 10-second time window starting now
// (capture `now` once so start and end are consistent)
let start = Utc::now();
let window = TimeWindow::new(start, start + Duration::seconds(10));

// Check if a timestamp falls within the window
let timestamp = Utc::now();
if window.contains(timestamp) {
    // Process element
}
```

## Architecture
Windows group elements for bounded processing:
```text
┌─────────────┐
│   Stream    │───elements───>┌────────────────┐
└─────────────┘               │ WindowAssigner │
                              │                │
                              │  ┌────────┐    │
                              │  │ Window │    │
                              │  └────────┘    │
                              └────────────────┘
                                      │
                                      ▼
                              ┌────────────────┐
                              │  Aggregation   │
                              └────────────────┘
```

Window flow:
1. Elements arrive in the stream.
2. The window assigner assigns each element to one or more windows.
3. Elements accumulate in their windows.
4. A window trigger fires the window.
5. The aggregation processes the window contents.
6. Results are emitted.
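The flow above can be sketched end to end for tumbling event-time windows that sum integer values. This is an illustration under assumptions, not the streamweave-window API: `WindowedSum`, `on_element`, `on_watermark`, and the two-variant `TriggerResult` are hypothetical, timestamps are `u64` milliseconds, and the trigger rule (fire once the watermark reaches the window end) is one common choice among several.

```rust
use std::collections::BTreeMap;

/// Local subset of the trigger outcomes listed under "Window Triggers".
#[derive(Debug, PartialEq, Eq)]
enum TriggerResult {
    Continue,
    FireAndPurge,
}

/// Hypothetical operator: tumbling event-time windows of `size` ms that sum values.
struct WindowedSum {
    size: u64,
    /// Window state: open windows keyed by start timestamp, holding a running sum.
    open: BTreeMap<u64, i64>,
}

impl WindowedSum {
    fn new(size: u64) -> Self {
        assert!(size > 0, "size must be positive");
        Self { size, open: BTreeMap::new() }
    }

    /// Assign the element to its window and accumulate it.
    fn on_element(&mut self, ts: u64, value: i64) {
        let start = ts - ts % self.size;
        *self.open.entry(start).or_insert(0) += value;
    }

    /// Trigger: fire (and purge) once the watermark reaches the window end.
    fn trigger(&self, start: u64, watermark: u64) -> TriggerResult {
        if watermark >= start + self.size {
            TriggerResult::FireAndPurge
        } else {
            TriggerResult::Continue
        }
    }

    /// Emit (window_start, sum) for every window that fires, then drop its state.
    fn on_watermark(&mut self, watermark: u64) -> Vec<(u64, i64)> {
        let ready: Vec<u64> = self
            .open
            .keys()
            .copied()
            .filter(|&start| self.trigger(start, watermark) == TriggerResult::FireAndPurge)
            .collect();
        ready
            .into_iter()
            .map(|start| (start, self.open.remove(&start).unwrap()))
            .collect()
    }
}
```

With 5-second windows, elements at 1s (value 2) and 4s (value 3) produce `(0, 5)` once the watermark reaches 5s, while an element at 6s stays buffered until the watermark reaches 10s. Keeping a running sum per window, instead of the raw elements, is what keeps the window state small in this sketch.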
## Configuration
### Window Assigners
Tumbling:
- Fixed-size, non-overlapping
- Simple and efficient
- Best for regular aggregations
Sliding:
- Fixed-size, overlapping
- More windows, more computation
- Best for smooth aggregations
Session:
- Gap-based, dynamic size
- Adapts to data patterns
- Best for user sessions
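Session windows are the one assigner whose boundaries depend on the data. A minimal sketch of the grouping rule for a batch of timestamps, assuming sorted `u64` millisecond input and a hypothetical `sessions` helper: an element closer than `gap` to the previous one extends the current session, otherwise it opens a new one. A streaming implementation would additionally need to merge sessions when an out-of-order element bridges two of them.

```rust
/// Groups timestamps (ms, sorted ascending) into sessions separated by
/// gaps of at least `gap`. Returns (first_ts, last_ts) for each session.
pub fn sessions(sorted_ts: &[u64], gap: u64) -> Vec<(u64, u64)> {
    let mut out: Vec<(u64, u64)> = Vec::new();
    for &ts in sorted_ts {
        if let Some(last) = out.last_mut() {
            if ts - last.1 < gap {
                last.1 = ts; // within the gap: extend the current session
                continue;
            }
        }
        out.push((ts, ts)); // first element, or gap exceeded: new session
    }
    out
}
```

With the 30-second gap from the session example, activity at 0s, 10s, and 25s forms one session, and activity resuming at 70s opens a second.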
### Late Data Policies
Drop:
- Discard late elements
- Simple, no overhead
- Best for real-time processing
SideOutput:
- Emit to separate stream
- Enables separate processing
- Best for analysis
AllowLateness:
- Include in window
- Refire if needed
- Best for accuracy
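All three policies only come into play once an element is late. The following sketch shows one way to make that decision, assuming lateness is measured against a watermark and timestamps are `u64` milliseconds. The local `LateDataPolicy` mirrors the crate's enum but takes milliseconds instead of a `Duration`, and `Routing` and `route` are hypothetical names.

```rust
/// Local mirror of the policy enum, with lateness in milliseconds (assumption).
#[derive(Debug, Clone, Copy)]
pub enum LateDataPolicy {
    Drop,
    SideOutput,
    AllowLateness(u64),
}

/// Where an element goes, given its window's end and the current watermark.
#[derive(Debug, PartialEq, Eq)]
pub enum Routing {
    Window,     // on time, or late but within the allowed lateness
    SideOutput, // late, sent to a separate stream
    Dropped,    // late, discarded
}

pub fn route(window_end: u64, watermark: u64, policy: LateDataPolicy) -> Routing {
    // On time while the watermark has not yet reached the window end.
    if watermark < window_end {
        return Routing::Window;
    }
    match policy {
        LateDataPolicy::Drop => Routing::Dropped,
        LateDataPolicy::SideOutput => Routing::SideOutput,
        // Accept late elements until the watermark passes end + lateness;
        // after that this sketch drops them.
        LateDataPolicy::AllowLateness(lateness) if watermark < window_end + lateness => {
            Routing::Window
        }
        LateDataPolicy::AllowLateness(_) => Routing::Dropped,
    }
}
```

Under `AllowLateness`, a `Routing::Window` result for an element arriving after its window has already fired is where the refire mentioned above would happen: the window emits an updated result that includes the late element.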
## Error Handling
Window operations return `WindowResult<T>`. The possible errors are:

```rust
pub enum WindowError {
    InvalidConfig(String),
    NotFound(String),
    WindowClosed(String),
    StateError(String),
}
```

## Performance Considerations
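- Window state: every open window keeps state for its elements until a trigger purges it.
- Memory usage: memory grows with the number of windows open at once, so overlapping sliding windows cost more than tumbling ones.
- Trigger frequency: firing more often means running the aggregation more often.
- Late data: accepting late elements can require keeping window state past the window's end, which adds overhead.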
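The memory and computation points can be estimated with simple arithmetic. In an implementation that keeps per-window state, each element contributes to every window that contains it, and for sliding windows that count is at most `ceil(size / slide)` (exactly `size / slide` when the slide divides the size). The helper is a back-of-the-envelope sketch with a hypothetical name, not part of the crate.

```rust
/// Upper bound on how many sliding windows (length `size`, hop `slide`)
/// contain a single element; exact when `slide` divides `size`.
/// Tumbling windows are the case `slide == size`, giving 1.
pub fn windows_per_element(size: u64, slide: u64) -> u64 {
    assert!(size > 0 && slide > 0, "size and slide must be positive");
    size.div_ceil(slide)
}
```

The 10-second window sliding every 5 seconds from the examples keeps each element in 2 windows; a 60-second window sliding every second keeps it in 60, so per-element state and aggregation work grow roughly 60-fold compared with a 60-second tumbling window.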
## Examples
For more examples, see:
## Dependencies
streamweave-window depends on:
- `streamweave`: core traits
- `streamweave-error`: error handling
- `streamweave-message` (optional): message envelope support
- `tokio`: async runtime
- `futures`: stream utilities
- `chrono`: timestamp support
- `async-stream`: stream generation
## Use Cases
Windowing is used for:
- Time-Based Aggregations: Sum, average, count over time
- Sliding Aggregations: Moving averages, trends
- Session Analysis: User session tracking
- Bounded Processing: Process unbounded streams in bounded windows
- Late Data Handling: Handle out-of-order data
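For the sliding-aggregation use case, a moving average over the last `n` elements is a count-based sliding window that advances one element at a time. A standalone sketch with a hypothetical `MovingAverage` type, using `std::collections::VecDeque` as the window buffer:

```rust
use std::collections::VecDeque;

/// Moving average over the most recent `n` values
/// (a count-based sliding window with a slide of one element).
pub struct MovingAverage {
    n: usize,
    buf: VecDeque<f64>,
    sum: f64,
}

impl MovingAverage {
    pub fn new(n: usize) -> Self {
        assert!(n > 0, "window size must be positive");
        Self { n, buf: VecDeque::with_capacity(n), sum: 0.0 }
    }

    /// Adds a value; returns the average once the window holds `n` values.
    pub fn push(&mut self, x: f64) -> Option<f64> {
        self.buf.push_back(x);
        self.sum += x;
        if self.buf.len() > self.n {
            // Evict the oldest value so the window keeps exactly `n` elements.
            self.sum -= self.buf.pop_front().unwrap();
        }
        (self.buf.len() == self.n).then(|| self.sum / self.n as f64)
    }
}
```

The running sum makes each update O(1) instead of re-summing the window; over very long streams it can accumulate floating-point rounding error, which a periodic recomputation from `buf` would correct.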
## Documentation
## See Also
- streamweave - Core traits
- streamweave-pipeline - Pipeline API
- streamweave-graph - Graph API
- streamweave-stateful - Stateful processing
## Contributing
Contributions are welcome! Please see the Contributing Guide for details.
## License
This project is licensed under the CC BY-SA 4.0 license.
## Re-exports

```rust
pub use transformers::*;
pub use window::*;
```
## Modules

- `transformers`: Window transformer for StreamWeave
- `window`: Windowing operations for stream processing.