streamweave-window
Windowing operations for StreamWeave
Group stream elements into bounded windows for aggregation and processing.
The streamweave-window package provides windowing operations for StreamWeave streams. It supports time-based windows (tumbling, sliding), count-based windows, session windows, and configurable window triggers and late data policies.
β¨ Key Features
- Window Types: Tumbling, Sliding, Session windows
- Time-Based Windows: Event time and processing time support
- Count-Based Windows: Window by number of elements
- Window Assigners: Assign elements to windows
- Window Triggers: Control when windows fire
- Late Data Policy: Handle late-arriving elements
- Window Transformers: Ready-to-use window transformers
π¦ Installation
Add this to your Cargo.toml:
[]
= "0.4.0"
π Quick Start
Tumbling Window
use ;
use Duration;
// Create a tumbling window of 5 seconds
let assigner = new;
let config = default;
// Use with window transformer
let transformer = new;
Sliding Window
use SlidingWindowAssigner;
// Create a sliding window: 10 second size, 5 second slide
let assigner = new;
π API Overview
Window Types
TimeWindow:
- Represents a window with start and end timestamps
- Used for time-based windowing
Window Assigners:
TumblingWindowAssigner- Fixed-size, non-overlapping windowsSlidingWindowAssigner- Fixed-size, overlapping windowsSessionWindowAssigner- Gap-based dynamic windows
Window Configuration
Window Triggers
Control when windows fire:
Late Data Policy
Handle late-arriving elements:
π Usage Examples
Tumbling Window
Fixed-size, non-overlapping windows:
use ;
use Duration;
let assigner = new;
let config = default;
// Elements are grouped into 5-second windows
// [0-5s), [5-10s), [10-15s), ...
Sliding Window
Fixed-size, overlapping windows:
use SlidingWindowAssigner;
// 10 second window, 5 second slide
let assigner = new;
// Windows: [0-10s), [5-15s), [10-20s), ...
Session Window
Gap-based dynamic windows:
use SessionWindowAssigner;
// Session gap of 30 seconds
let assigner = new;
// Windows are created dynamically based on activity gaps
Count-Based Windows
Window by number of elements:
use CountWindowAssigner;
// Window of 100 elements
let assigner = new;
Window Transformers
Use ready-to-use window transformers:
use WindowTransformer;
let assigner = new;
let transformer = new;
// Use in pipeline
pipeline.transformer;
Late Data Handling
Configure late data policy:
use ;
use Duration;
// Allow lateness up to 1 minute
let config = WindowConfig ;
// Or drop late data
let config = WindowConfig ;
// Or emit to side output
let config = WindowConfig ;
Time-Based Windows
Use event time or processing time:
use ;
// Create a time window
let window = new;
// Check if timestamp is in window
let timestamp = now;
if window.contains
ποΈ Architecture
Windows group elements for bounded processing:
βββββββββββββββ
β Stream ββββelementsβββ>ββββββββββββββββ
βββββββββββββββ β WindowAssignerβ
β β
β ββββββββββ β
β β Window β β
β ββββββββββ β
ββββββββββββββββ
β
βΌ
ββββββββββββββββ
β Aggregation β
ββββββββββββββββ
Window Flow:
- Elements arrive in stream
- Window assigner assigns elements to windows
- Elements accumulate in windows
- Window trigger fires window
- Aggregation processes window contents
- Results emitted
π§ Configuration
Window Assigners
Tumbling:
- Fixed-size, non-overlapping
- Simple and efficient
- Best for regular aggregations
Sliding:
- Fixed-size, overlapping
- More windows, more computation
- Best for smooth aggregations
Session:
- Gap-based, dynamic size
- Adapts to data patterns
- Best for user sessions
Late Data Policies
Drop:
- Discard late elements
- Simple, no overhead
- Best for real-time processing
SideOutput:
- Emit to separate stream
- Enables separate processing
- Best for analysis
AllowLateness:
- Include in window
- Refire if needed
- Best for accuracy
π Error Handling
Window operations return WindowResult<T>:
β‘ Performance Considerations
- Window State: Windows maintain state for elements
- Memory Usage: More windows = more memory
- Trigger Frequency: Frequent triggers = more computation
- Late Data: Late data handling adds overhead
π Examples
For more examples, see:
π Dependencies
streamweave-window depends on:
streamweave- Core traitsstreamweave-error- Error handlingstreamweave-message(optional) - Message envelope supporttokio- Async runtimefutures- Stream utilitieschrono- Timestamp supportasync-stream- Stream generation
π― Use Cases
Windowing is used for:
- Time-Based Aggregations: Sum, average, count over time
- Sliding Aggregations: Moving averages, trends
- Session Analysis: User session tracking
- Bounded Processing: Process unbounded streams in bounded windows
- Late Data Handling: Handle out-of-order data
π Documentation
π See Also
- streamweave - Core traits
- streamweave-pipeline - Pipeline API
- streamweave-graph - Graph API
- streamweave-stateful - Stateful processing
π€ Contributing
Contributions are welcome! Please see the Contributing Guide for details.
π License
This project is licensed under the CC BY-SA 4.0 license.