oxigdal-streaming
Real-time data processing and streaming pipelines for OxiGDAL.
Overview
oxigdal-streaming provides a framework for processing geospatial data in real time. It supports event-time processing, windowing, stateful operations, and fault tolerance.
Features
Streaming Core
- Stream Traits and Abstractions: Flexible stream processing with sources, sinks, and operators
- Backpressure Handling: Adaptive backpressure management to prevent buffer overflow
- Flow Control: Rate limiting and flow control mechanisms
- Error Recovery: Configurable recovery strategies with exponential backoff
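Exponential backoff usually means doubling the delay after each failed attempt, up to a cap, until a retry budget runs out. The sketch below shows that schedule in plain Rust. `RetryPolicy` and `retry_with_backoff` are hypothetical names for illustration, not part of the oxigdal-streaming API.

```rust
use std::time::Duration;

/// Hypothetical retry policy (not the oxigdal-streaming API).
#[derive(Debug, Clone, Copy)]
pub struct RetryPolicy {
    /// Delay before the first retry.
    pub initial_delay: Duration,
    /// Upper bound for any single delay.
    pub max_delay: Duration,
    /// Number of retries allowed before giving up.
    pub max_retries: u32,
}

impl RetryPolicy {
    /// Delay before retry `attempt` (0-based): initial_delay * 2^attempt, capped at
    /// max_delay. Returns None once the retry budget is used up.
    pub fn delay_for(&self, attempt: u32) -> Option<Duration> {
        if attempt >= self.max_retries {
            return None;
        }
        // Avoid overflow for large attempt numbers; the result is capped below anyway.
        let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
        let delay = self.initial_delay.checked_mul(factor).unwrap_or(self.max_delay);
        Some(delay.min(self.max_delay))
    }
}

/// Calls `op` until it succeeds, sleeping according to `policy` between failures.
/// Returns the last error once the policy gives up.
pub fn retry_with_backoff<T, E>(
    policy: RetryPolicy,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(err) => match policy.delay_for(attempt) {
                Some(delay) => {
                    std::thread::sleep(delay);
                    attempt += 1;
                }
                None => return Err(err),
            },
        }
    }
}
```

With a 100 ms initial delay and a 1 s cap, the delays run 100, 200, 400, 800, then 1000 ms; the cap keeps a long outage from producing unbounded waits.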
Windowing & Watermarking
- Tumbling Windows: Fixed, non-overlapping time windows
- Sliding Windows: Overlapping time windows with configurable slide intervals
- Session Windows: Dynamic windows based on activity gaps
- Event Time Processing: Watermark generation for handling out-of-order events
- Late Data Handling: Configurable policies for late-arriving data
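A common way to generate watermarks for out-of-order events is a bounded out-of-orderness rule: the watermark trails the largest event time seen so far by a fixed allowance, and an event older than the watermark counts as late. This is a minimal sketch of that rule, assuming millisecond timestamps; `BoundedWatermark` and `Lateness` are hypothetical, and the crate's own generator may work differently.

```rust
/// Hypothetical bounded-out-of-orderness watermark generator (not the crate's API).
/// Event times are milliseconds since the Unix epoch.
pub struct BoundedWatermark {
    max_out_of_orderness_ms: i64,
    max_seen_ms: Option<i64>,
}

/// Classification of an event relative to the watermark at its arrival.
#[derive(Debug, PartialEq, Eq)]
pub enum Lateness {
    OnTime,
    Late,
}

impl BoundedWatermark {
    pub fn new(max_out_of_orderness_ms: i64) -> Self {
        Self { max_out_of_orderness_ms, max_seen_ms: None }
    }

    /// Largest event time seen so far minus the allowed delay; None before any event.
    pub fn current(&self) -> Option<i64> {
        self.max_seen_ms.map(|m| m - self.max_out_of_orderness_ms)
    }

    /// Classifies `event_time_ms` against the current watermark, then advances the
    /// watermark. Because it tracks a running maximum, the watermark never moves backwards.
    pub fn observe(&mut self, event_time_ms: i64) -> Lateness {
        let lateness = match self.current() {
            Some(wm) if event_time_ms < wm => Lateness::Late,
            _ => Lateness::OnTime,
        };
        let new_max = self.max_seen_ms.map_or(event_time_ms, |m| m.max(event_time_ms));
        self.max_seen_ms = Some(new_max);
        lateness
    }
}
```

What happens to a `Late` event (drop it, send it to a side output, or update a window that was already emitted) is the late-data policy; this sketch leaves that choice to the caller.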
Transformations
- Basic Operations: Map, filter, flatMap
- Aggregations: Count, sum, average, min, max
- Reduce Operations: Reduce, fold, scan
- Join Operations: Inner, left, right, full outer joins
- Partitioning: Hash, range, round-robin partitioning strategies
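The three partitioning strategies can each be sketched as a small assignment rule. The names below (`hash_partition`, `range_partition`, `RoundRobin`) are hypothetical; the sketch shows the usual rule for each strategy, not how the crate implements it.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash partitioning: equal keys always map to the same partition.
/// DefaultHasher is deterministic within one build but not guaranteed stable
/// across Rust releases, so don't persist these assignments.
pub fn hash_partition<K: Hash + ?Sized>(key: &K, num_partitions: usize) -> Option<usize> {
    if num_partitions == 0 {
        return None;
    }
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    Some((hasher.finish() % num_partitions as u64) as usize)
}

/// Range partitioning: `split_points` must be sorted ascending. This yields
/// split_points.len() + 1 partitions; a key equal to a split point goes to the higher one.
pub fn range_partition<K: Ord>(key: &K, split_points: &[K]) -> usize {
    split_points.partition_point(|split| split <= key)
}

/// Round-robin partitioning: ignores the key and cycles through partitions evenly.
pub struct RoundRobin {
    next: usize,
    num_partitions: usize,
}

impl RoundRobin {
    pub fn new(num_partitions: usize) -> Option<Self> {
        if num_partitions == 0 {
            return None;
        }
        Some(Self { next: 0, num_partitions })
    }

    pub fn next_partition(&mut self) -> usize {
        let partition = self.next;
        self.next = (self.next + 1) % self.num_partitions;
        partition
    }
}
```

Hash partitioning keeps per-key state on one worker, range partitioning preserves key order across partitions, and round-robin balances load when records need no key affinity.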
State Management
- Keyed State: Value, list, map, reducing, and aggregating state
- Operator State: Broadcast and union list state
- Checkpointing: Periodic checkpointing for fault tolerance
- State Backends: In-memory and RocksDB backends
- Recovery: Automatic state recovery from checkpoints
Installation
Add this to your Cargo.toml:
[dependencies]
oxigdal-streaming = "0.1.3"
For RocksDB backend support:
[dependencies]
oxigdal-streaming = { version = "0.1.3", features = ["rocksdb-backend"] }
Usage
Basic Stream Processing
use ;
use Utc;
async
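As an illustration of the source, operator, and sink shape (using only the standard library, not the crate's API), the sketch below wires three stages together with bounded channels. `sync_channel` blocks a sender while its buffer is full, which is the simplest form of backpressure. `run_pipeline` and the filter and unit conversion it applies are assumptions made for the example.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical three-stage pipeline (std only, not the oxigdal-streaming API):
/// source -> operator (filter + map) -> sink, connected by bounded channels.
pub fn run_pipeline(readings: Vec<f64>, capacity: usize) -> Vec<f64> {
    let (src_tx, src_rx) = sync_channel::<f64>(capacity);
    let (out_tx, out_rx) = sync_channel::<f64>(capacity);

    // Source: `send` blocks while the channel is full, so a slow operator throttles it.
    let source = thread::spawn(move || {
        for reading in readings {
            if src_tx.send(reading).is_err() {
                break; // downstream stage has hung up
            }
        }
        // src_tx is dropped here, which ends the operator's receive loop.
    });

    // Operator: drop negative readings, convert the rest from meters to kilometers.
    let operator = thread::spawn(move || {
        for reading in src_rx {
            if reading < 0.0 {
                continue;
            }
            if out_tx.send(reading / 1000.0).is_err() {
                break;
            }
        }
    });

    // Sink: drain results on the calling thread until the operator hangs up.
    let results: Vec<f64> = out_rx.iter().collect();
    // Both stages have finished sending by now; joining just reaps the threads.
    let _ = source.join();
    let _ = operator.join();
    results
}
```

A slow sink fills the operator's output buffer, which stalls the operator, which in turn stalls the source, so the number of in-flight elements stays bounded by the channel capacities.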
Windowing
use TumblingAssigner;
use WindowAssigner;
use Duration;
let assigner = new;
let windows = assigner.assign_windows?;
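Tumbling and sliding window assignment both reduce to arithmetic on the event timestamp. The sketch below assumes millisecond timestamps and half-open windows `[start, end)`; `Window`, `tumbling_window`, and `sliding_windows` are hypothetical helpers, not the crate's `TumblingAssigner`.

```rust
/// Half-open event-time window [start_ms, end_ms).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Window {
    pub start_ms: i64,
    pub end_ms: i64,
}

/// Tumbling assignment: each timestamp belongs to exactly one window of `size_ms`.
/// rem_euclid keeps pre-epoch (negative) timestamps aligned correctly.
pub fn tumbling_window(ts_ms: i64, size_ms: i64) -> Option<Window> {
    if size_ms <= 0 {
        return None;
    }
    let start_ms = ts_ms - ts_ms.rem_euclid(size_ms);
    Some(Window { start_ms, end_ms: start_ms + size_ms })
}

/// Sliding assignment: windows of `size_ms` start every `slide_ms`, so a timestamp
/// falls into size_ms / slide_ms windows when size is a multiple of slide.
pub fn sliding_windows(ts_ms: i64, size_ms: i64, slide_ms: i64) -> Vec<Window> {
    let mut windows = Vec::new();
    if size_ms <= 0 || slide_ms <= 0 {
        return windows;
    }
    // Start from the latest window start at or before ts, then walk back
    // while the window still covers ts.
    let mut start_ms = ts_ms - ts_ms.rem_euclid(slide_ms);
    while start_ms > ts_ms - size_ms {
        windows.push(Window { start_ms, end_ms: start_ms + size_ms });
        start_ms -= slide_ms;
    }
    windows
}
```

For example, with 10 ms windows sliding every 5 ms, timestamp 12 belongs to `[10, 20)` and `[5, 15)` but not to `[0, 10)`.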
Aggregation
use ;
let operator = new;
for elem in elements
let result = operator.get_result.await;
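Aggregations such as count, sum, average, min, and max can be maintained incrementally, so each element updates a small accumulator instead of being buffered. `StatsAccumulator` below is a hypothetical sketch of that idea, including a `merge` step for combining partial results computed on parallel partitions.

```rust
/// Hypothetical incremental accumulator (not the crate's API): each element
/// updates O(1) state, so the raw stream never has to be buffered.
#[derive(Debug, Default, Clone, Copy)]
pub struct StatsAccumulator {
    pub count: u64,
    pub sum: f64,
    pub min: Option<f64>,
    pub max: Option<f64>,
}

impl StatsAccumulator {
    /// Folds one element into the running aggregates.
    pub fn add(&mut self, value: f64) {
        self.count += 1;
        self.sum += value;
        self.min = Some(self.min.map_or(value, |m| m.min(value)));
        self.max = Some(self.max.map_or(value, |m| m.max(value)));
    }

    /// Combines a partial aggregate, e.g. one computed on another partition.
    pub fn merge(&mut self, other: &StatsAccumulator) {
        self.count += other.count;
        self.sum += other.sum;
        self.min = match (self.min, other.min) {
            (Some(a), Some(b)) => Some(a.min(b)),
            (a, b) => a.or(b),
        };
        self.max = match (self.max, other.max) {
            (Some(a), Some(b)) => Some(a.max(b)),
            (a, b) => a.or(b),
        };
    }

    /// Mean of all elements seen, or None for an empty accumulator.
    pub fn average(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}
```

Keeping the average as a (sum, count) pair rather than a running mean is what makes `merge` exact: two partial averages cannot be combined without their counts.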
Join Operations
use ;
let config = default;
let join = new;
join.process_left.await?;
let results = join.process_right.await?;
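A join that accepts records from either input in any order is often built as a symmetric hash join: each side buffers its records by key and probes the other side's buffer on arrival. The sketch below shows an inner join under that assumption; `SymmetricHashJoin`, `on_left`, and `on_right` are hypothetical names, and the crate may use a different strategy.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical symmetric hash join producing inner-join matches (not the crate's API).
pub struct SymmetricHashJoin<K, L, R> {
    left: HashMap<K, Vec<L>>,
    right: HashMap<K, Vec<R>>,
}

impl<K: Hash + Eq + Clone, L: Clone, R: Clone> SymmetricHashJoin<K, L, R> {
    pub fn new() -> Self {
        Self { left: HashMap::new(), right: HashMap::new() }
    }

    /// Probes buffered right-side records with the same key, then buffers `value`.
    pub fn on_left(&mut self, key: K, value: L) -> Vec<(K, L, R)> {
        let matches: Vec<(K, L, R)> = match self.right.get(&key) {
            Some(rights) => rights
                .iter()
                .map(|r| (key.clone(), value.clone(), r.clone()))
                .collect(),
            None => Vec::new(),
        };
        self.left.entry(key).or_default().push(value);
        matches
    }

    /// Mirror image of `on_left` for the right input.
    pub fn on_right(&mut self, key: K, value: R) -> Vec<(K, L, R)> {
        let matches: Vec<(K, L, R)> = match self.left.get(&key) {
            Some(lefts) => lefts
                .iter()
                .map(|l| (key.clone(), l.clone(), value.clone()))
                .collect(),
            None => Vec::new(),
        };
        self.right.entry(key).or_default().push(value);
        matches
    }
}
```

Both buffers grow without limit in this sketch; a streaming join normally evicts entries once their window closes or the watermark passes them.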
Stateful Processing
use MemoryStateBackend;
use ValueState;
use Arc;
let backend = new;
let state = new;
state.set.await?;
let value = state.get.await?;
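Keyed state scopes each value to a state name and a record key, while every handle shares one backend. The sketch below models that with an in-memory store (`Mutex<HashMap>`) shared through `Arc` and holding `i64` values. `InMemoryBackend` and `ValueStateHandle` are hypothetical and synchronous for brevity; they do not mirror the crate's async API.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical in-memory backend: values keyed by (state name, record key).
#[derive(Default)]
pub struct InMemoryBackend {
    values: Mutex<HashMap<(String, String), i64>>,
}

/// Hypothetical keyed value state: a handle scoped to one state name.
/// Clones of the handle share the same backend through the Arc.
#[derive(Clone)]
pub struct ValueStateHandle {
    backend: Arc<InMemoryBackend>,
    state_name: String,
}

impl ValueStateHandle {
    pub fn new(backend: Arc<InMemoryBackend>, state_name: &str) -> Self {
        Self { backend, state_name: state_name.to_string() }
    }

    /// Returns the value stored for `key`, or Ok(None) if it was never set.
    pub fn get(&self, key: &str) -> Result<Option<i64>, String> {
        let map = self.backend.values.lock().map_err(|_| "state lock poisoned".to_string())?;
        let lookup = (self.state_name.clone(), key.to_string());
        let value = map.get(&lookup).copied();
        Ok(value)
    }

    /// Stores `value` for `key`, replacing any previous value.
    pub fn set(&self, key: &str, value: i64) -> Result<(), String> {
        let mut map = self.backend.values.lock().map_err(|_| "state lock poisoned".to_string())?;
        map.insert((self.state_name.clone(), key.to_string()), value);
        Ok(())
    }
}
```

Errors are returned rather than unwrapped, in keeping with the crate's no-`unwrap()` policy; a poisoned lock surfaces as an `Err` instead of a panic.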
Checkpointing
use ;
let config = default;
let coordinator = new;
let checkpoint_id = coordinator.trigger_checkpoint.await?;
// Process...
coordinator.complete_checkpoint.await?;
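Checkpointing commonly follows a trigger-then-complete protocol: a snapshot is taken when the checkpoint is triggered, but recovery only uses it after the checkpoint has been completed (for example, once every operator has acknowledged it). The sketch below shows that bookkeeping; `Coordinator`, `Snapshot`, and their methods are hypothetical and keep snapshots in memory only.

```rust
use std::collections::{BTreeMap, HashMap};

/// Snapshot of operator state; a plain map keeps the sketch small.
pub type Snapshot = HashMap<String, i64>;

/// Hypothetical checkpoint coordinator (not the crate's API), in-memory only.
#[derive(Default)]
pub struct Coordinator {
    next_id: u64,
    pending: HashMap<u64, Snapshot>,
    completed: BTreeMap<u64, Snapshot>,
}

impl Coordinator {
    /// Starts a new checkpoint by copying the current state; returns its id.
    pub fn trigger(&mut self, state: &Snapshot) -> u64 {
        self.next_id += 1;
        self.pending.insert(self.next_id, state.clone());
        self.next_id
    }

    /// Marks a pending checkpoint as completed; returns false for unknown ids.
    pub fn complete(&mut self, id: u64) -> bool {
        match self.pending.remove(&id) {
            Some(snapshot) => {
                self.completed.insert(id, snapshot);
                true
            }
            None => false,
        }
    }

    /// For recovery: the newest completed checkpoint. Pending ones are ignored,
    /// since they may reflect a snapshot that was never fully acknowledged.
    pub fn latest_completed(&self) -> Option<(u64, &Snapshot)> {
        self.completed.last_key_value().map(|(id, snapshot)| (*id, snapshot))
    }
}
```

If the process fails between `trigger` and `complete`, recovery falls back to the previous completed checkpoint, so a half-written snapshot is never restored.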
Architecture
The crate is organized into several modules:
- core: Stream abstractions, backpressure, flow control, operators, and recovery
- windowing: Window types, assigners, and watermark generation
- transformations: Stream transformations, aggregations, joins, and partitioning
- state: State backends, checkpointing, keyed state, and operator state
Performance
The streaming framework is designed for high performance:
- Lock-free data structures where possible
- Efficient buffer management with adaptive backpressure
- Configurable parallelism for distributed processing
- RocksDB backend for persistent state with minimal overhead
COOLJAPAN Compliance
This crate follows all COOLJAPAN policies:
- ✅ 100% Pure Rust (no C/Fortran dependencies)
- ✅ No unwrap() or panic!() in production code
- ✅ All files under 2000 lines
- ✅ Workspace dependencies
- ✅ Comprehensive tests and benchmarks
License
Licensed under Apache-2.0.
Contributing
Contributions are welcome! Please ensure all tests pass and follow the COOLJAPAN policies.