RSP-RS
An RDF Stream Processing Engine in Rust built on top of Oxigraph for SPARQL querying and Tokio for async processing.
Installation
Add this to your Cargo.toml:
[]
= "0.1.0"
Or install with cargo:
Usage
You can define a query using the RSP-QL syntax. An example query is shown below:
use RSPEngine;
let query = r#"
PREFIX ex: <https://rsp.rs/>
REGISTER RStream <output> AS
SELECT *
FROM NAMED WINDOW ex:w1 ON STREAM ex:stream1 [RANGE 10 STEP 2]
WHERE {
WINDOW ex:w1 { ?s ?p ?o }
}
"#;
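To make the `[RANGE 10 STEP 2]` clause concrete, here is a minimal sketch of how a sliding window of width 10 that advances every 2 time units maps a timestamp to the windows containing it. This is not rsp-rs code: the `Window` struct, the `windows_containing` function, the half-open `[open, close)` interval convention, and the assumption that windows open at `start`, `start + step`, and so on are all illustrative.

```rust
/// A half-open window interval [open, close) on the stream's timeline.
/// Hypothetical type for illustration; not part of rsp-rs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Window {
    open: i64,
    close: i64,
}

/// Returns every window that contains timestamp `t`, assuming windows open at
/// `start`, `start + step`, `start + 2 * step`, ... and each spans `range` units.
fn windows_containing(t: i64, start: i64, range: i64, step: i64) -> Vec<Window> {
    assert!(range > 0 && step > 0, "range and step must be positive");
    let mut windows = Vec::new();
    if t < start {
        // No window has opened yet.
        return windows;
    }
    // Index of the latest window that opened at or before `t`.
    let mut k = (t - start) / step;
    loop {
        let open = start + k * step;
        if open + range <= t {
            // This window (and every earlier one) closed before `t`.
            break;
        }
        windows.push(Window { open, close: open + range });
        if k == 0 {
            break;
        }
        k -= 1;
    }
    // Collected newest-first; return them oldest-first.
    windows.reverse();
    windows
}
```

Under these assumptions, `windows_containing(11, 0, 10, 2)` yields the five windows from `[2, 12)` through `[10, 20)`, which is why a single stream element can contribute to several consecutive query evaluations when the step is smaller than the range.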
You can then create an instance of the RSPEngine and pass the query to it:
let mut rsp_engine = RSPEngine::new(query.to_string());
Initialize the engine to create windows and streams:
rsp_engine.initialize()?;
Stream elements are added to the RSPEngine through its streams. First get a reference to the stream declared in the query:
let stream = rsp_engine.get_stream("https://rsp.rs/stream1").unwrap();
Then add quads with timestamps:
use *;
let quad = Quad::new(/* subject, predicate, object, graph name */);
stream.add_quads(/* quads and their timestamp */)?;
Here's a complete example:
use *;
use RSPEngine;
use tokio;
async
async
Features
- RSP-QL Support: Full RSP-QL syntax for defining continuous queries
- Multiple Windows: Support for multiple sliding/tumbling windows
- Stream-Static Joins: Join streaming data with static background knowledge
- SPARQL Aggregations: COUNT, AVG, MIN, MAX, SUM with GROUP BY
- Async Processing: Built on Tokio for high-performance async processing
- Named Graphs: Full support for RDF named graphs in queries
- Real-time Results: Continuous query evaluation with RStream/IStream/DStream semantics
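The RStream/IStream/DStream operators mentioned in the last item differ in what each evaluation emits. The sketch below illustrates the usual definitions over plain string results: RStream emits the whole current result, IStream only what is new since the previous evaluation, and DStream only what has disappeared. The `StreamOperator` enum and `emit` function are hypothetical names for illustration and do not reflect how rsp-rs implements these operators.

```rust
use std::collections::BTreeSet;

/// The three relation-to-stream operators named in RSP-QL.
/// Hypothetical enum for illustration; not the rsp-rs type.
#[derive(Debug, Clone, Copy)]
enum StreamOperator {
    RStream,
    IStream,
    DStream,
}

/// Computes what one evaluation emits, given the previous and current result sets.
fn emit(
    op: StreamOperator,
    previous: &BTreeSet<String>,
    current: &BTreeSet<String>,
) -> BTreeSet<String> {
    match op {
        // Everything that currently matches.
        StreamOperator::RStream => current.clone(),
        // Only results that were absent from the previous evaluation.
        StreamOperator::IStream => current.difference(previous).cloned().collect(),
        // Only results that were present before and are now gone.
        StreamOperator::DStream => previous.difference(current).cloned().collect(),
    }
}
```

For example, with a previous result of {a, b} and a current result of {b, c}, RStream emits {b, c}, IStream emits {c}, and DStream emits {a}.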
Testing
Run the test suite:
Run integration tests specifically:
API Documentation
RSPEngine
- new(query: String) - Create a new RSP engine with RSP-QL query
- initialize() - Initialize windows and streams from the query
- start_processing() - Start async processing and return results receiver
- get_stream(name: &str) - Get a stream by name for adding data
- add_static_data(quad: Quad) - Add static background knowledge
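The "results receiver" pattern described for start_processing() can be illustrated with a small stand-alone sketch. It deliberately uses the standard library's `std::sync::mpsc` channel and a thread instead of Tokio, and the `start_counting` function and its batch-counting "query" are hypothetical stand-ins. The actual receiver type returned by rsp-rs is not stated here, so treat this only as a picture of the producer/consumer shape.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical stand-in for a continuous query: a worker thread "evaluates"
/// each batch by counting its elements and sends one result per batch.
fn start_counting(batches: Vec<Vec<i64>>) -> Receiver<usize> {
    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        for batch in batches {
            // Stop early if the consumer has dropped the receiver.
            if sender.send(batch.len()).is_err() {
                break;
            }
        }
        // `sender` is dropped here, which ends the receiver's iteration.
    });
    receiver
}
```

A caller would typically keep the returned receiver and loop over it, for instance with `for result in receiver.iter() { ... }`, handling each result as it arrives while data continues to be fed into the streams elsewhere.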
CSPARQLWindow
- new(name, range, slide, strategy, tick, start_time) - Create a window
- add(quad, timestamp) - Add a quad to the window
- subscribe(stream_type, callback) - Subscribe to window emissions
R2ROperator
- new(query: String) - Create R2R operator with SPARQL query
- add_static_data(quad) - Add static data for joins
- execute(container) - Execute query on streaming data
Examples
See the integration tests in tests/integration/ for comprehensive examples:
- Basic RSP engine usage
- Aggregation queries (COUNT, AVG, MIN/MAX, SUM)
- Window-R2R integration
- Named graph queries
- Static data joins
License
This code is copyrighted by Ghent University - imec and released under the MIT License.
Acknowledgments
This project is a Rust port of RSP-JS, an RDF Stream Processing library for JavaScript/TypeScript.
We would like to thank the original authors and contributors of RSP-JS for their excellent work and for providing the foundation that made this Rust implementation possible.
The core concepts, RSP-QL syntax support, and windowing semantics have been adapted from the original TypeScript implementation to provide the same functionality in a high-performance Rust library.
Contact
For any questions, please contact Kush or create an issue in the repository.