rsp_rs/parsing/
rspql_parser.rs1use crate::parsed_query::{Operator, ParsedQuery, WindowDefinition};
2use regex::Regex;
3use std::collections::HashMap;
4
/// Line-oriented parser for a single query string.
///
/// The parser only stores the raw query text; all work happens in `parse`,
/// which can be called any number of times because it borrows `self`
/// immutably.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RSPQLParser {
    /// The raw query text exactly as it was handed to the constructor.
    pub rspql_query: String,
}
8
9impl RSPQLParser {
10 pub fn new(query: String) -> Self {
11 Self { rspql_query: query }
12 }
13
14 pub fn parse(&self) -> ParsedQuery {
15 let mut parsed = ParsedQuery::new("".to_string());
16 let mut sparql_lines: Vec<String> = Vec::new();
17 let mut prefix_mapper: HashMap<String, String> = HashMap::new();
18
19 for line in self.rspql_query.lines() {
20 let trimmed_line = line.trim();
21 if trimmed_line.starts_with("REGISTER") {
22 let re = Regex::new(r"REGISTER +([^ ]+) +<([^>]+)> AS").unwrap();
23 for captures in re.captures_iter(trimmed_line) {
24 let op_str = captures.get(1).unwrap().as_str();
25 let name = captures.get(2).unwrap().as_str();
26 if let Some(operator) = Self::parse_operator(op_str) {
27 parsed.set_r2s(operator, name.to_string());
28 }
29 }
30 } else if trimmed_line.starts_with("FROM NAMED WINDOW") {
31 let re = Regex::new(r"FROM +NAMED +WINDOW +([^ ]+) +ON +STREAM +([^ ]+) +\[RANGE +([^ ]+) +STEP +([^ ]+)\]").unwrap();
32 for captures in re.captures_iter(trimmed_line) {
33 let window_name =
34 Self::unwrap(captures.get(1).unwrap().as_str(), &prefix_mapper);
35 let stream_name =
36 Self::unwrap(captures.get(2).unwrap().as_str(), &prefix_mapper);
37 let width = captures
38 .get(3)
39 .unwrap()
40 .as_str()
41 .parse::<i64>()
42 .unwrap_or(0);
43 let slide = captures
44 .get(4)
45 .unwrap()
46 .as_str()
47 .parse::<i64>()
48 .unwrap_or(0);
49 let window_def = WindowDefinition {
50 window_name,
51 stream_name,
52 width,
53 slide,
54 };
55 parsed.add_s2r_window(window_def);
56 }
57 } else {
58 let mut sparql_line = trimmed_line.to_string();
59 if sparql_line.starts_with("WINDOW") {
60 sparql_line = sparql_line.replace("WINDOW", "GRAPH");
61 }
62 if sparql_line.starts_with("PREFIX") {
63 let re = Regex::new(r"PREFIX +([^:]*): +<([^>]+)>").unwrap();
64 for captures in re.captures_iter(&sparql_line) {
65 let prefix = captures.get(1).unwrap().as_str().to_string();
66 let iri = captures.get(2).unwrap().as_str().to_string();
67 prefix_mapper.insert(prefix, iri);
68 }
69 }
70 sparql_lines.push(sparql_line);
71 }
72 }
73 parsed.set_sparql_query(sparql_lines.join("\n"));
74 parsed
75 }
76
77 fn parse_operator(op_str: &str) -> Option<Operator> {
78 match op_str {
79 "RStream" => Some(Operator::RStream),
80 "IStream" => Some(Operator::IStream),
81 "DStream" => Some(Operator::DStream),
82 _ => None,
83 }
84 }
85
/// Resolves an IRI reference to a plain IRI string.
///
/// Surrounding whitespace is ignored. Two forms are understood:
///
/// * `<iri>` — the angle brackets are removed and the content returned
///   unchanged.
/// * `prefix:local` — `prefix` is looked up in `mapper` and the mapped
///   namespace is concatenated with `local`. Only the first `:` separates
///   the prefix from the local part, so local parts that contain further
///   colons are preserved.
///
/// Returns an empty string when the input has no `:` or when the prefix is
/// not present in `mapper`.
fn unwrap(prefixed_iri: &str, mapper: &HashMap<String, String>) -> String {
    let trimmed = prefixed_iri.trim();

    // Bracketed form: both delimiters must be present.
    if let Some(iri) = trimmed
        .strip_prefix('<')
        .and_then(|rest| rest.strip_suffix('>'))
    {
        return iri.to_string();
    }

    // Prefixed form: split on the first colon only.
    trimmed
        .split_once(':')
        .and_then(|(prefix, local)| {
            mapper
                .get(prefix)
                .map(|namespace| format!("{}{}", namespace, local))
        })
        .unwrap_or_default()
}
103}