rsp_rs/parsing/
rspql_parser.rs

1use crate::parsed_query::{Operator, ParsedQuery, WindowDefinition};
2use regex::Regex;
3use std::collections::HashMap;
4
/// Parser for RSP-QL query text.
///
/// Holds the raw query string; [`RSPQLParser::parse`] walks it line by line
/// and produces a `ParsedQuery`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RSPQLParser {
    /// The complete RSP-QL query text, exactly as supplied by the caller.
    pub rspql_query: String,
}
8
9impl RSPQLParser {
10    pub fn new(query: String) -> Self {
11        Self { rspql_query: query }
12    }
13
14    pub fn parse(&self) -> ParsedQuery {
15        let mut parsed = ParsedQuery::new("".to_string());
16        let mut sparql_lines: Vec<String> = Vec::new();
17        let mut prefix_mapper: HashMap<String, String> = HashMap::new();
18
19        for line in self.rspql_query.lines() {
20            let trimmed_line = line.trim();
21            if trimmed_line.starts_with("REGISTER") {
22                let re = Regex::new(r"REGISTER +([^ ]+) +<([^>]+)> AS").unwrap();
23                for captures in re.captures_iter(trimmed_line) {
24                    let op_str = captures.get(1).unwrap().as_str();
25                    let name = captures.get(2).unwrap().as_str();
26                    if let Some(operator) = Self::parse_operator(op_str) {
27                        parsed.set_r2s(operator, name.to_string());
28                    }
29                }
30            } else if trimmed_line.starts_with("FROM NAMED WINDOW") {
31                let re = Regex::new(r"FROM +NAMED +WINDOW +([^ ]+) +ON +STREAM +([^ ]+) +\[RANGE +([^ ]+) +STEP +([^ ]+)\]").unwrap();
32                for captures in re.captures_iter(trimmed_line) {
33                    let window_name =
34                        Self::unwrap(captures.get(1).unwrap().as_str(), &prefix_mapper);
35                    let stream_name =
36                        Self::unwrap(captures.get(2).unwrap().as_str(), &prefix_mapper);
37                    let width = captures
38                        .get(3)
39                        .unwrap()
40                        .as_str()
41                        .parse::<i64>()
42                        .unwrap_or(0);
43                    let slide = captures
44                        .get(4)
45                        .unwrap()
46                        .as_str()
47                        .parse::<i64>()
48                        .unwrap_or(0);
49                    let window_def = WindowDefinition {
50                        window_name,
51                        stream_name,
52                        width,
53                        slide,
54                    };
55                    parsed.add_s2r_window(window_def);
56                }
57            } else {
58                let mut sparql_line = trimmed_line.to_string();
59                if sparql_line.starts_with("WINDOW") {
60                    sparql_line = sparql_line.replace("WINDOW", "GRAPH");
61                }
62                if sparql_line.starts_with("PREFIX") {
63                    let re = Regex::new(r"PREFIX +([^:]*): +<([^>]+)>").unwrap();
64                    for captures in re.captures_iter(&sparql_line) {
65                        let prefix = captures.get(1).unwrap().as_str().to_string();
66                        let iri = captures.get(2).unwrap().as_str().to_string();
67                        prefix_mapper.insert(prefix, iri);
68                    }
69                }
70                sparql_lines.push(sparql_line);
71            }
72        }
73        parsed.set_sparql_query(sparql_lines.join("\n"));
74        parsed
75    }
76
77    fn parse_operator(op_str: &str) -> Option<Operator> {
78        match op_str {
79            "RStream" => Some(Operator::RStream),
80            "IStream" => Some(Operator::IStream),
81            "DStream" => Some(Operator::DStream),
82            _ => None,
83        }
84    }
85
86    fn unwrap(prefixed_iri: &str, mapper: &HashMap<String, String>) -> String {
87        let trimmed = prefixed_iri.trim();
88        if trimmed.starts_with('<') && trimmed.ends_with('>') {
89            trimmed[1..trimmed.len() - 1].to_string()
90        } else {
91            let parts: Vec<&str> = trimmed.split(':').collect();
92            if parts.len() == 2 {
93                if let Some(iri) = mapper.get(parts[0]) {
94                    format!("{}{}", iri, parts[1])
95                } else {
96                    "".to_string()
97                }
98            } else {
99                "".to_string()
100            }
101        }
102    }
103}