rsp-rs 0.1.2

A RDF Stream Processing engine in Rust, supporting RSP-QL queries with sliding windows and real-time analytics
Documentation

RSP-RS

An RDF Stream Processing Engine in Rust built on top of Oxigraph for SPARQL querying and Tokio for async processing.

Installation

Add this to your Cargo.toml:

[dependencies]
rsp-rs = "0.1.0"

Or install with cargo:

cargo add rsp-rs

Usage

You can define a query using the RSP-QL syntax. An example query is shown below:

use rsp_rs::RSPEngine;

let query = r#"
    PREFIX ex: <https://rsp.rs/>
    REGISTER RStream <output> AS
    SELECT *
    FROM NAMED WINDOW ex:w1 ON STREAM ex:stream1 [RANGE 10 STEP 2]
    WHERE {
        WINDOW ex:w1 { ?s ?p ?o }
    }
"#;

You can then create an instance of the RSPEngine and pass the query to it:

let mut rsp_engine = RSPEngine::new(query);

Initialize the engine to create windows and streams:

rsp_engine.initialize()?;

You can add stream elements to the RSPEngine using streams. First get a stream reference:

let stream = rsp_engine.get_stream("https://rsp.rs/stream1").unwrap();

Then add quads with timestamps:

use oxigraph::model::*;

let quad = Quad::new(
    NamedNode::new("https://rsp.rs/test_subject_1")?,
    NamedNode::new("https://rsp.rs/test_property")?,
    NamedNode::new("https://rsp.rs/test_object")?,
    GraphName::DefaultGraph,
);

stream.add_quads(vec![quad], timestamp_value)?;

Here's a complete example:

use oxigraph::model::*;
use rsp_rs::RSPEngine;
use tokio;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let query = r#"
        PREFIX ex: <https://rsp.rs/>
        REGISTER RStream <output> AS
        SELECT *
        FROM NAMED WINDOW ex:w1 ON STREAM ex:stream1 [RANGE 10 STEP 2]
        WHERE {
            WINDOW ex:w1 { ?s ?p ?o }
        }
    "#;

    let mut rsp_engine = RSPEngine::new(query);
    rsp_engine.initialize()?;

    let stream = rsp_engine.get_stream("https://rsp.rs/stream1").unwrap();

    // Start processing and get results receiver
    let mut result_receiver = rsp_engine.start_processing();

    // Generate some test data
    generate_data(10, stream).await;

    // Collect results
    let mut results = Vec::new();
    while let Some(result) = result_receiver.recv().await {
        println!("Received result: {}", result.bindings);
        results.push(result.bindings);
    }

    println!("Total results: {}", results.len());
    Ok(())
}

async fn generate_data(num_events: usize, stream: &rsp_rs::RDFStream) {
    for i in 0..num_events {
        let quad = Quad::new(
            NamedNode::new(&format!("https://rsp.rs/test_subject_{}", i)).unwrap(),
            NamedNode::new("https://rsp.rs/test_property").unwrap(),
            NamedNode::new("https://rsp.rs/test_object").unwrap(),
            GraphName::DefaultGraph,
        );

        stream.add_quads(vec![quad], i as i64).unwrap();
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    }
}

Features

  • RSP-QL Support: Full RSP-QL syntax for defining continuous queries
  • Multiple Windows: Support for multiple sliding/tumbling windows
  • Stream-Static Joins: Join streaming data with static background knowledge
  • SPARQL Aggregations: COUNT, AVG, MIN, MAX, SUM with GROUP BY
  • Async Processing: Built on Tokio for high-performance async processing
  • Named Graphs: Full support for RDF named graphs in queries
  • Real-time Results: Continuous query evaluation with RStream/IStream/DStream semantics

Testing

Run the test suite:

cargo test

Run integration tests specifically:

cargo test --test integration_tests

API Documentation

RSPEngine

  • new(query: String) - Create a new RSP engine with RSP-QL query
  • initialize() - Initialize windows and streams from the query
  • start_processing() - Start async processing and return results receiver
  • get_stream(name: &str) - Get a stream by name for adding data
  • add_static_data(quad: Quad) - Add static background knowledge

CSPARQLWindow

  • new(name, range, slide, strategy, tick, start_time) - Create a window
  • add(quad, timestamp) - Add a quad to the window
  • subscribe(stream_type, callback) - Subscribe to window emissions

R2ROperator

  • new(query: String) - Create R2R operator with SPARQL query
  • add_static_data(quad) - Add static data for joins
  • execute(container) - Execute query on streaming data

Examples

See the integration tests in tests/integration/ for comprehensive examples:

  • Basic RSP engine usage
  • Aggregation queries (COUNT, AVG, MIN/MAX, SUM)
  • Window-R2R integration
  • Named graph queries
  • Static data joins

License

This code is copyrighted by Ghent University - imec and released under the MIT Licence

Acknowledgments

This project is a Rust port of RSP-JS, an RDF Stream Processing library for JavaScript/Typescript.

We would like to thank the original authors and contributors of RSP-JS for their excellent work and for providing the foundation that made this Rust implementation possible.

The core concepts, RSP-QL syntax support, and windowing semantics have been adapted from the original TypeScript implementation to provide the same functionality in a high-performance Rust library.

Contact

For any questions, please contact Kush or create an issue in the repository.