OxiRS Stream - Real-time RDF Streaming
Status: Production Release (v0.1.0) - Released January 7, 2026
✨ Production Release: Production-ready with API stability guarantees and comprehensive testing.
Real-time RDF data streaming with support for Kafka, NATS, and other message brokers. Process RDF streams with windowing, aggregation, and pattern matching.
Features
Message Brokers
- Apache Kafka - Distributed streaming platform
- NATS - Lightweight, high-performance messaging
- RabbitMQ - Reliable message queuing
- Custom Adapters - Bring your own message broker
Stream Processing
- Windowing - Tumbling, sliding, and session windows
- Aggregation - Count, sum, average over windows
- Pattern Matching - Detect patterns in RDF streams
- Filtering - Stream-based SPARQL filters
Reliability and Monitoring
- At-Least-Once Delivery - Reliable message processing
- Backpressure - Handle fast producers
- Checkpointing - Resume from failures
- Metrics - Monitor stream performance
Installation
Add to your Cargo.toml:
# Experimental feature
[dependencies]
oxirs-stream = "0.1.0"
# Or enable specific brokers
oxirs-stream = { version = "0.1.0", features = ["kafka", "nats"] }
Quick Start
Basic Streaming
use ;
use Triple;
async
Stream Processing with Windows
use ;
use Duration;
async
Message Broker Configuration
Kafka
use KafkaConfig;
let config = KafkaConfig ;
NATS
use NatsConfig;
let config = NatsConfig ;
Windowing
Tumbling Windows
Fixed-size, non-overlapping windows:
use ;
use Duration;
let config = WindowConfig ;
// Process 60-second windows
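To make the tumbling-window idea concrete, here is a minimal, library-independent sketch of how an event timestamp maps to exactly one fixed-size window. The function name `tumbling_window` and the use of plain seconds are assumptions for illustration and are not part of the oxirs-stream API.

```rust
/// Assign an event timestamp (in seconds) to its tumbling window.
/// Returns the half-open interval `[start, end)`.
/// Hypothetical helper for illustration; not the oxirs-stream API.
pub fn tumbling_window(timestamp: u64, size: u64) -> (u64, u64) {
    assert!(size > 0, "window size must be positive");
    // Round the timestamp down to the nearest multiple of the window size.
    let start = timestamp - timestamp % size;
    (start, start + size)
}
```

With a 60-second size, timestamps 0 through 59 share one window and timestamp 60 opens the next one, so no event is ever counted twice.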
Sliding Windows
Overlapping windows:
let config = WindowConfig ;
// Windows: [0-60s], [30-90s], [60-120s], ...
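The overlap can be sketched without the library: with a 60-second size and a 30-second slide, most timestamps fall into two windows. The helper below is a hypothetical illustration (the name `sliding_windows` and second-based timestamps are assumptions), not the oxirs-stream implementation.

```rust
/// List every sliding window `[start, end)` that contains `timestamp`.
/// Window starts are aligned to multiples of `slide`.
/// Hypothetical helper for illustration; not the oxirs-stream API.
pub fn sliding_windows(timestamp: u64, size: u64, slide: u64) -> Vec<(u64, u64)> {
    assert!(size > 0 && slide > 0, "size and slide must be positive");
    let mut windows = Vec::new();
    // Latest aligned window start at or before the timestamp.
    let mut start = timestamp - timestamp % slide;
    // Walk backwards while the window still covers the timestamp.
    while start + size > timestamp {
        windows.push((start, start + size));
        if start < slide {
            break; // reached the first window
        }
        start -= slide;
    }
    windows.reverse(); // oldest window first
    windows
}
```

For example, an event at 45 seconds belongs to both [0-60s] and [30-90s], which is why aggregates over sliding windows count the same event more than once.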
Session Windows
Dynamic windows based on inactivity gaps:
let config = WindowConfig ;
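As a rough illustration of the inactivity-gap rule, the sketch below groups timestamps into sessions: a new session starts whenever the gap since the previous event exceeds the configured value. The function name and the second-based timestamps are assumptions made for this example, not the oxirs-stream API.

```rust
/// Group event timestamps (seconds) into sessions separated by more than
/// `gap` seconds of inactivity. Returns `(first_event, last_event)` pairs.
/// Hypothetical helper for illustration; not the oxirs-stream API.
pub fn session_windows(timestamps: &[u64], gap: u64) -> Vec<(u64, u64)> {
    let mut sorted = timestamps.to_vec();
    sorted.sort_unstable();

    let mut sessions = Vec::new();
    let mut current: Option<(u64, u64)> = None;
    for t in sorted {
        current = match current {
            // Still within the inactivity gap: extend the open session.
            Some((start, last)) if t - last <= gap => Some((start, t)),
            // Gap exceeded: close the open session and start a new one.
            Some(done) => {
                sessions.push(done);
                Some((t, t))
            }
            None => Some((t, t)),
        };
    }
    if let Some(done) = current {
        sessions.push(done);
    }
    sessions
}
```

Unlike tumbling and sliding windows, the window boundaries here depend on the data itself, so a session cannot be closed until an event arrives after the gap (or a timer fires, in a real streaming setting).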
Stream Operations
Filtering
use SparqlFilter;
let filter = new?;
let filtered_stream = stream.filter;
Mapping
let transformed_stream = stream.map;
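Filtering and mapping compose the same way they do on ordinary Rust iterators. The sketch below uses a hypothetical `Triple` struct and a plain predicate comparison in place of a SPARQL filter; both are assumptions for illustration and do not reflect the oxirs-stream types.

```rust
/// Simplified triple with string terms (hypothetical; not the oxirs type).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Triple {
    pub subject: String,
    pub predicate: String,
    pub object: String,
}

/// Keep only `foaf:name` triples and upper-case their object value.
pub fn names_upper<I>(stream: I) -> impl Iterator<Item = Triple>
where
    I: Iterator<Item = Triple>,
{
    const FOAF_NAME: &str = "http://xmlns.com/foaf/0.1/name";
    stream
        // Filter step: drop triples with any other predicate.
        .filter(|t| t.predicate == FOAF_NAME)
        // Map step: transform each surviving triple.
        .map(|mut t| {
            t.object = t.object.to_uppercase();
            t
        })
}
```

Both steps are lazy, so nothing is processed until the resulting iterator is consumed.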
Aggregation
use ;
let processor = builder
.source
.window
.aggregate
.aggregate
.build?;
let results = processor.process.await?;
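For intuition about what a windowed aggregate produces, here is a small self-contained sketch that computes count, sum, and average per tumbling window over `(timestamp, value)` pairs. The `WindowStats` type and `aggregate_tumbling` function are hypothetical names chosen for this example; the actual oxirs-stream operators may differ.

```rust
use std::collections::BTreeMap;

/// Aggregates for one window (hypothetical type for illustration).
#[derive(Debug, Clone, PartialEq)]
pub struct WindowStats {
    pub count: u64,
    pub sum: f64,
    pub average: f64,
}

/// Compute count, sum, and average per tumbling window.
/// Keys of the returned map are window start times in seconds.
pub fn aggregate_tumbling(events: &[(u64, f64)], size: u64) -> BTreeMap<u64, WindowStats> {
    assert!(size > 0, "window size must be positive");
    let mut stats: BTreeMap<u64, WindowStats> = BTreeMap::new();
    for &(timestamp, value) in events {
        // Same rounding rule as a tumbling-window assigner.
        let start = timestamp - timestamp % size;
        let entry = stats.entry(start).or_insert(WindowStats {
            count: 0,
            sum: 0.0,
            average: 0.0,
        });
        entry.count += 1;
        entry.sum += value;
        entry.average = entry.sum / entry.count as f64;
    }
    stats
}
```

Keeping a running count and sum per window means the average can be updated incrementally, without buffering the events themselves.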
Pattern Matching
Temporal Patterns
use TemporalPattern;
let pattern = builder
.event
.followed_by
.within
.build?;
let matches = stream.detect_pattern.await?;
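The "event A followed by event B within a time bound" pattern can be sketched over an in-memory, timestamp-ordered list of events. Everything here (the tuple representation, the function name, the event kinds used in the example) is an assumption for illustration, not the oxirs-stream pattern DSL.

```rust
/// Find each `first` event that is followed by a `second` event no more than
/// `within` seconds later. Events are `(timestamp_seconds, kind)` pairs and
/// must be sorted by timestamp. Returns `(first_ts, second_ts)` pairs.
/// Hypothetical helper for illustration; not the oxirs-stream API.
pub fn followed_by_within(
    events: &[(u64, &str)],
    first: &str,
    second: &str,
    within: u64,
) -> Vec<(u64, u64)> {
    let mut matches = Vec::new();
    for (i, a) in events.iter().enumerate() {
        if a.1 != first {
            continue;
        }
        for b in &events[i + 1..] {
            // Input is sorted, so once the bound is exceeded we can stop.
            if b.0.saturating_sub(a.0) > within {
                break;
            }
            if b.1 == second {
                matches.push((a.0, b.0));
                break; // report only the earliest follower
            }
        }
    }
    matches
}
```

For instance, with a 10-second bound, a "login" at 0s followed by a "purchase" at 8s matches, while a "login" at 100s followed by a "purchase" at 200s does not.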
Graph Patterns
use GraphPattern;
let pattern = parse?;
let matches = stream.match_pattern.await?;
Reliability
Checkpointing
use CheckpointConfig;
let checkpoint_config = CheckpointConfig ;
let processor = builder
.source
.checkpoint
.build?;
// Automatically recovers from last checkpoint on failure
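The interaction between checkpointing and at-least-once delivery can be illustrated with an in-memory sketch: the offset is saved every few messages, and after a failure processing resumes from the last saved offset, so messages handled after that checkpoint are seen again. The `Checkpoint` struct, the `run_from_checkpoint` function, and the simulated failure are hypothetical and stand in for a durable store and a real broker.

```rust
/// In-memory stand-in for a durable checkpoint (hypothetical).
#[derive(Debug, Default)]
pub struct Checkpoint {
    pub next_offset: usize,
}

/// Process messages starting at the checkpointed offset, saving the offset
/// every `interval` messages. `fail_at` simulates a crash at that offset.
pub fn run_from_checkpoint(
    messages: &[&str],
    checkpoint: &mut Checkpoint,
    interval: usize,
    fail_at: Option<usize>,
    processed: &mut Vec<String>,
) -> Result<(), String> {
    assert!(interval > 0, "checkpoint interval must be positive");
    for offset in checkpoint.next_offset..messages.len() {
        if fail_at == Some(offset) {
            return Err(format!("simulated failure at offset {offset}"));
        }
        processed.push(messages[offset].to_string());
        // Persist progress only at interval boundaries.
        if (offset + 1) % interval == 0 {
            checkpoint.next_offset = offset + 1;
        }
    }
    checkpoint.next_offset = messages.len();
    Ok(())
}
```

In this sketch, a crash between checkpoints causes the messages after the last checkpoint to be processed a second time on restart. That duplication is what at-least-once delivery means in practice, and it is why downstream handlers should be idempotent.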
Error Handling
use ;
let error_policy = ErrorPolicy ;
let processor = builder
.source
.error_policy
.build?;
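One common shape for an error policy is "retry a bounded number of times, then divert the message to a dead-letter list". The sketch below illustrates that idea only; the function name, the retry semantics, and the dead-letter handling are assumptions and may not match the fields of the `ErrorPolicy` type in oxirs-stream.

```rust
/// Run `handler` on each item, retrying failures up to `max_retries` times.
/// Returns `(succeeded, dead_lettered)`. The handler receives the item and
/// the zero-based attempt number.
/// Hypothetical helper for illustration; not the oxirs-stream API.
pub fn process_with_retries<F>(
    items: &[i32],
    max_retries: u32,
    mut handler: F,
) -> (Vec<i32>, Vec<i32>)
where
    F: FnMut(i32, u32) -> Result<(), String>,
{
    let mut succeeded = Vec::new();
    let mut dead_lettered = Vec::new();
    for &item in items {
        let mut attempt = 0;
        loop {
            match handler(item, attempt) {
                Ok(()) => {
                    succeeded.push(item);
                    break;
                }
                // Retries remaining: try the same item again.
                Err(_) if attempt < max_retries => attempt += 1,
                // Retries exhausted: set the item aside instead of stalling.
                Err(_) => {
                    dead_lettered.push(item);
                    break;
                }
            }
        }
    }
    (succeeded, dead_lettered)
}
```

Diverting poisoned messages keeps one bad record from blocking the rest of the stream, at the cost of needing a separate process to inspect the dead-letter list.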
Integration
With oxirs-shacl (Streaming Validation)
use StreamProcessor;
use ValidationEngine;
let validator = new;
let processor = builder
.source
.window
.validate_with
.build?;
let mut results = processor.process.await?;
while let Some = results.next.await
With oxirs-arq (Stream Queries)
use StreamProcessor;
use StreamingQueryEngine;
let query_engine = new;
let query = r#"
PREFIX foaf: <http://xmlns.com/foaf/0.1/>
SELECT ?person (COUNT(?friend) as ?friendCount)
WHERE {
?person a foaf:Person .
?person foaf:knows ?friend .
}
GROUP BY ?person
HAVING (COUNT(?friend) > 10)
"#;
let processor = builder
.source
.window
.query
.build?;
Performance
Throughput Benchmarks
| Message Broker | Throughput | Latency (p99) |
|---|---|---|
| Kafka | 100K triples/s | 15ms |
| NATS | 80K triples/s | 8ms |
| RabbitMQ | 50K triples/s | 20ms |
Benchmarked on an M1 Mac with local brokers.
Optimization Tips
// Batch processing
let processor = builder
.source
.batch_size // Process in batches of 1000
.parallelism // 4 parallel workers
.build?;
// Backpressure control
let processor = builder
.source
.buffer_size
.backpressure_strategy
.build?;
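To show what a bounded buffer does under a fast producer, the sketch below uses the standard library's `std::sync::mpsc::sync_channel` with a drop-newest strategy: once the buffer is full, `try_send` fails and the item is rejected. The function name `offer_all` and the choice of strategy are assumptions for illustration; the strategies offered by oxirs-stream may differ.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Offer every item to a bounded buffer without blocking.
/// Returns `(accepted, dropped)`; no consumer runs while offering, so
/// anything beyond `buffer_size` items is rejected (drop-newest).
/// Hypothetical helper for illustration; not the oxirs-stream API.
pub fn offer_all(items: &[u32], buffer_size: usize) -> (Vec<u32>, Vec<u32>) {
    let (tx, rx) = sync_channel::<u32>(buffer_size);
    let mut dropped = Vec::new();
    for &item in items {
        match tx.try_send(item) {
            Ok(()) => {}
            // Buffer full: apply the drop-newest strategy.
            Err(TrySendError::Full(rejected)) => dropped.push(rejected),
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    // Close the sending side so the drain below terminates.
    drop(tx);
    let accepted: Vec<u32> = rx.iter().collect();
    (accepted, dropped)
}
```

Replacing `try_send` with the blocking `send` gives the other common strategy: the producer is paused until the consumer frees a slot, so nothing is dropped but upstream throughput is capped by the slowest stage.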
Status
Production Release (v0.1.0)
- ✅ Kafka/NATS integrations with persisted offset checkpoints
- ✅ Windowing, filtering, and mapping tied into CLI persistence workflows
- ✅ SPARQL stream federation with SERVICE bridging to remote endpoints
- ✅ Prometheus/SciRS2 metrics for throughput, lag, and error rates
- 🚧 Aggregation operators (tumbling/sliding) final polish (in progress)
- 🚧 Pattern matching DSL and CEP (in progress)
- ⏳ Exactly-once semantics (planned for future release)
- ⏳ Distributed stream processing (planned for v0.2.0)
Contributing
This is an experimental module. Feedback welcome!
License
MIT OR Apache-2.0
See Also
- oxirs-shacl - Stream validation
- oxirs-arq - Stream queries
- oxirs-federate - Federated streams