rsp_rs/engine/
r2r.rs

1use crate::quad_container::QuadContainer;
2use oxigraph::model::Quad;
3use oxigraph::sparql::QueryResults;
4use oxigraph::store::Store;
5use std::collections::HashSet;
6
7/// R2R (Relation-to-Relation) Operator
8/// Executes SPARQL queries over streaming data combined with static data
9#[derive(Clone)]
10pub struct R2ROperator {
11    pub(crate) query: String,
12    pub(crate) static_data: HashSet<Quad>,
13}
14
15impl R2ROperator {
16    /// Create a new R2ROperator with a SPARQL query
17    pub fn new(query: String) -> Self {
18        Self {
19            query,
20            static_data: HashSet::new(),
21        }
22    }
23
24    /// Add a static quad to the operator's static data store
25    pub fn add_static_data(&mut self, quad: Quad) {
26        self.static_data.insert(quad);
27    }
28
29    /// Execute the SPARQL query over the container's quads combined with static data
30    pub fn execute(
31        &self,
32        container: &QuadContainer,
33    ) -> Result<QueryResults, Box<dyn std::error::Error>> {
34        // Create an in-memory store
35        let store = Store::new()?;
36
37        // Add all quads from the container
38        for quad in &container.elements {
39            store.insert(quad)?;
40        }
41
42        // Add all static quads
43        for quad in &self.static_data {
44            store.insert(quad)?;
45        }
46
47        #[cfg(debug_assertions)]
48        {
49            println!("[R2R] Executing query:");
50            println!("{}", self.query);
51            println!("[R2R] Container has {} quads", container.len());
52            println!("[R2R] Static data has {} quads", self.static_data.len());
53            for (i, quad) in container.elements.iter().enumerate() {
54                println!("[R2R]   Quad {}: {:?}", i + 1, quad);
55            }
56        }
57
58        // Execute the query
59        // Note: Oxigraph doesn't support custom extension functions in the same way as Comunica
60        // For custom functions like sqrt and pow, you would need to:
61        // 1. Preprocess the query to replace custom functions with SPARQL built-ins
62        // 2. Use SPARQL BIND expressions with standard math operations
63        // 3. Or implement a query rewriter
64        use oxigraph::sparql::SparqlEvaluator;
65        SparqlEvaluator::new()
66            .parse_query(&self.query)?
67            .on_store(&store)
68            .execute()
69            .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
70    }
71
72    /// Execute the SPARQL query and return results as a vector of solution mappings
73    /// This is a convenience method that handles common result types
74    pub fn execute_select(
75        &self,
76        container: &QuadContainer,
77    ) -> Result<Vec<String>, Box<dyn std::error::Error>> {
78        let results = self.execute(container)?;
79
80        let mut output = Vec::new();
81
82        if let QueryResults::Solutions(solutions) = results {
83            for solution in solutions {
84                let solution = solution?;
85                output.push(format!("{:?}", solution));
86            }
87        }
88
89        Ok(output)
90    }
91
92    /// Get a reference to the query string
93    pub fn query(&self) -> &str {
94        &self.query
95    }
96
97    /// Get the number of static quads
98    pub fn static_data_size(&self) -> usize {
99        self.static_data.len()
100    }
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use oxigraph::model::*;

    /// A new operator keeps the query text and starts without static data.
    #[test]
    fn test_r2r_operator_creation() {
        let query = "SELECT * WHERE { ?s ?p ?o }".to_string();
        let operator = R2ROperator::new(query.clone());
        assert_eq!(operator.query(), query);
        assert_eq!(operator.static_data_size(), 0);
    }

    /// Adding one quad grows the static data set to one element.
    #[test]
    fn test_add_static_data() {
        let query = "SELECT * WHERE { ?s ?p ?o }".to_string();
        let mut operator = R2ROperator::new(query);

        let quad = Quad::new(
            NamedNode::new("http://example.org/subject").unwrap(),
            NamedNode::new("http://example.org/predicate").unwrap(),
            NamedNode::new("http://example.org/object").unwrap(),
            GraphName::DefaultGraph,
        );

        operator.add_static_data(quad);
        assert_eq!(operator.static_data_size(), 1);
    }

    /// A match-all SELECT sees both the static quad and the container quad.
    #[test]
    fn test_execute_query() -> Result<(), Box<dyn std::error::Error>> {
        let query = "SELECT * WHERE { ?s ?p ?o }".to_string();
        let mut operator = R2ROperator::new(query);

        // Static side: one quad registered on the operator.
        let static_quad = Quad::new(
            NamedNode::new("http://example.org/static").unwrap(),
            NamedNode::new("http://example.org/isStatic").unwrap(),
            Literal::new_simple_literal("true"),
            GraphName::DefaultGraph,
        );
        operator.add_static_data(static_quad);

        // Streaming side: one quad inside the container.
        let mut container_quads = HashSet::new();
        let stream_quad = Quad::new(
            NamedNode::new("http://example.org/stream").unwrap(),
            NamedNode::new("http://example.org/hasValue").unwrap(),
            Literal::new_simple_literal("42"),
            GraphName::DefaultGraph,
        );
        container_quads.insert(stream_quad);

        let container = QuadContainer::new(container_quads, 0);

        let results = operator.execute(&container)?;

        // Fail loudly on an unexpected result kind instead of silently
        // skipping the assertion.
        match results {
            QueryResults::Solutions(solutions) => {
                // One from static data, one from container.
                assert_eq!(solutions.count(), 2);
            }
            _ => panic!("expected solution mappings from a SELECT query"),
        }

        Ok(())
    }
}