1use oxirs_core::model::{NamedNode, Subject, Triple};
5
6use super::mapping_types::{
7 build_triple_from_pom, DataSource, MappingError, MappingResult, MappingRule, ObjectSpec,
8 PredicateObjectMap, Row, Template,
9};
10
/// Runs mapping rules over tabular data and collects the resulting triples.
///
/// The engine carries a single piece of configuration, the error policy.
#[derive(Clone, Debug, Default)]
pub struct MappingEngine {
    /// Error policy for per-row failures.
    ///
    /// `false` (the `Default`) makes the first failure abort the run;
    /// `true` makes the engine drop the offending row or triple and carry on.
    pub skip_errors: bool,
}
22
23impl MappingEngine {
24 pub fn new() -> Self {
26 Self::default()
27 }
28
29 pub fn new_lenient() -> Self {
31 Self { skip_errors: true }
32 }
33
34 pub fn execute(&self, rule: &MappingRule) -> MappingResult<Vec<Triple>> {
36 let (headers, rows) = self.extract_rows(&rule.source)?;
37 let _ = headers; self.map_rows(rule, &rows)
39 }
40
41 pub fn execute_all(&self, rules: &[MappingRule]) -> MappingResult<Vec<Triple>> {
43 let mut all_triples = Vec::new();
44 for rule in rules {
45 let mut triples = self.execute(rule)?;
46 all_triples.append(&mut triples);
47 }
48 Ok(all_triples)
49 }
50
51 fn extract_rows(&self, source: &DataSource) -> MappingResult<(Vec<String>, Vec<Row>)> {
54 match source {
55 DataSource::Csv { content, delimiter } => Self::parse_csv(content, *delimiter),
56 DataSource::Json { content, json_path } => {
57 let rows = Self::parse_json(content, json_path.as_deref())?;
58 Ok((Vec::new(), rows))
60 }
61 DataSource::InlineValues { rows, headers } => {
62 let parsed_rows: Vec<Row> = rows
63 .iter()
64 .map(|row_values| {
65 let pairs = headers
66 .iter()
67 .zip(row_values.iter())
68 .map(|(h, v)| (h.clone(), v.clone()));
69 Row::from_pairs(pairs)
70 })
71 .collect();
72 Ok((headers.clone(), parsed_rows))
73 }
74 }
75 }
76
77 fn map_rows(&self, rule: &MappingRule, rows: &[Row]) -> MappingResult<Vec<Triple>> {
78 let mut triples = Vec::with_capacity(rows.len() * rule.predicate_object_maps.len());
79
80 for (row_idx, row) in rows.iter().enumerate() {
81 let subject_iri = match rule.subject_template.render(row, row_idx) {
83 Ok(iri) => iri,
84 Err(e) => {
85 if self.skip_errors {
86 continue;
87 }
88 return Err(e);
89 }
90 };
91
92 let subject_node =
93 NamedNode::new(&subject_iri).map_err(|e| MappingError::InvalidIri {
94 template: rule.subject_template.pattern.clone(),
95 iri: format!("{subject_iri} ({e})"),
96 })?;
97 let subject: Subject = subject_node.into();
98
99 for pom in &rule.predicate_object_maps {
101 let result = build_triple_from_pom(&subject, pom, row, row_idx);
102 match result {
103 Ok(triple) => triples.push(triple),
104 Err(e) => {
105 if self.skip_errors {
106 continue;
107 }
108 return Err(e);
109 }
110 }
111 }
112 }
113 Ok(triples)
114 }
115
116 pub fn parse_csv(content: &str, delimiter: char) -> MappingResult<(Vec<String>, Vec<Row>)> {
126 let lines = split_csv_lines(content);
127 if lines.is_empty() {
128 return Ok((Vec::new(), Vec::new()));
129 }
130
131 let headers = parse_csv_line(&lines[0], delimiter);
133 if headers.is_empty() {
134 return Err(MappingError::CsvParseError {
135 line: 1,
136 message: "empty header row".to_string(),
137 });
138 }
139
140 let mut rows = Vec::with_capacity(lines.len().saturating_sub(1));
141 for (line_idx, line) in lines.iter().enumerate().skip(1) {
142 if line.trim().is_empty() {
143 continue;
144 }
145 let values = parse_csv_line(line, delimiter);
146 if values.len() != headers.len() {
147 return Err(MappingError::CsvParseError {
148 line: line_idx + 1,
149 message: format!("expected {} fields but got {}", headers.len(), values.len()),
150 });
151 }
152 let row = Row::from_pairs(headers.iter().cloned().zip(values));
153 rows.push(row);
154 }
155 Ok((headers, rows))
156 }
157
158 pub fn parse_json(content: &str, json_path: Option<&str>) -> MappingResult<Vec<Row>> {
169 let value: serde_json::Value =
170 serde_json::from_str(content).map_err(|e| MappingError::JsonParseError {
171 message: e.to_string(),
172 })?;
173
174 let array = if let Some(path) = json_path {
176 navigate_json_path(&value, path)?
177 } else {
178 &value
179 };
180
181 let arr = array.as_array().ok_or_else(|| {
182 let path_desc = json_path.unwrap_or("<root>");
183 MappingError::JsonPathNoMatch {
184 path: path_desc.to_string(),
185 }
186 })?;
187
188 let mut rows = Vec::with_capacity(arr.len());
189 for element in arr {
190 let obj = element
191 .as_object()
192 .ok_or_else(|| MappingError::JsonParseError {
193 message: "JSON array element is not an object".to_string(),
194 })?;
195 let row = Row::from_pairs(
196 obj.iter()
197 .map(|(k, v)| (k.clone(), json_value_to_string(v))),
198 );
199 rows.push(row);
200 }
201 Ok(rows)
202 }
203}
204
205fn navigate_json_path<'a>(
208 value: &'a serde_json::Value,
209 path: &str,
210) -> MappingResult<&'a serde_json::Value> {
211 let mut current = value;
212 for key in path.split('.') {
213 current = current
214 .get(key)
215 .ok_or_else(|| MappingError::JsonPathNoMatch {
216 path: path.to_string(),
217 })?;
218 }
219 Ok(current)
220}
221
222fn json_value_to_string(v: &serde_json::Value) -> String {
223 match v {
224 serde_json::Value::String(s) => s.clone(),
225 serde_json::Value::Null => String::new(),
226 serde_json::Value::Bool(b) => b.to_string(),
227 serde_json::Value::Number(n) => n.to_string(),
228 other => other.to_string(),
229 }
230}
231
/// Splits raw CSV text into logical records, honouring double quotes.
///
/// * `\n`, `\r\n` and a lone `\r` all terminate a record when seen outside
///   quotes.
/// * Inside quotes a line break is kept as part of the record; `\r\n` and a
///   lone `\r` are both stored as a single `\n`.
/// * Quote characters are left in the output so that field parsing can
///   interpret them later.
/// * A trailing record is emitted only if it is non-empty, so a final line
///   terminator does not produce an extra empty record.
fn split_csv_lines(content: &str) -> Vec<String> {
    let mut records: Vec<String> = Vec::new();
    let mut pending = String::new();
    let mut quoted = false;
    let mut stream = content.chars().peekable();

    while let Some(c) = stream.next() {
        if c == '"' {
            // Every quote flips the state; an escaped `""` flips it twice.
            quoted = !quoted;
            pending.push(c);
            continue;
        }
        if c != '\r' && c != '\n' {
            pending.push(c);
            continue;
        }

        // From here on `c` is a line terminator. Collapse CRLF into one.
        if c == '\r' && stream.peek() == Some(&'\n') {
            let _ = stream.next();
        }
        if quoted {
            pending.push('\n');
        } else {
            records.push(std::mem::take(&mut pending));
        }
    }

    if !pending.is_empty() {
        records.push(pending);
    }
    records
}
271
/// Splits one CSV record into its fields.
///
/// A double quote opens a quoted section in which `delimiter` is literal
/// text; inside such a section `""` stands for one literal quote and a single
/// `"` closes the section. The enclosing quotes are not part of the field.
/// The result always holds at least one field (an empty record yields one
/// empty string), and an unterminated quote simply runs to the end of input.
fn parse_csv_line(line: &str, delimiter: char) -> Vec<String> {
    let mut fields: Vec<String> = Vec::new();
    let mut field = String::new();
    let mut quoted = false;
    let mut stream = line.chars().peekable();

    while let Some(c) = stream.next() {
        match (quoted, c) {
            (true, '"') => {
                if stream.peek() == Some(&'"') {
                    // Doubled quote: emit one and consume its twin.
                    field.push('"');
                    let _ = stream.next();
                } else {
                    quoted = false;
                }
            }
            (true, other) => field.push(other),
            (false, '"') => quoted = true,
            (false, sep) if sep == delimiter => fields.push(std::mem::take(&mut field)),
            (false, other) => field.push(other),
        }
    }

    // The last field has no trailing delimiter, so flush it explicitly.
    fields.push(field);
    fields
}
303
304#[derive(Debug)]
320pub struct MappingRuleBuilder {
321 rule: MappingRule,
322}
323
324impl MappingRuleBuilder {
325 pub fn new(name: impl Into<String>) -> Self {
327 let name_str = name.into();
328 Self {
329 rule: MappingRule {
330 name: name_str,
331 source: DataSource::Csv {
332 content: String::new(),
333 delimiter: ',',
334 },
335 subject_template: Template::new(""),
336 predicate_object_maps: Vec::new(),
337 graph_name: None,
338 },
339 }
340 }
341
342 pub fn csv_source(mut self, content: impl Into<String>) -> Self {
344 self.rule.source = DataSource::Csv {
345 content: content.into(),
346 delimiter: ',',
347 };
348 self
349 }
350
351 pub fn csv_source_with_delimiter(
353 mut self,
354 content: impl Into<String>,
355 delimiter: char,
356 ) -> Self {
357 self.rule.source = DataSource::Csv {
358 content: content.into(),
359 delimiter,
360 };
361 self
362 }
363
364 pub fn json_source(mut self, content: impl Into<String>) -> Self {
366 self.rule.source = DataSource::Json {
367 content: content.into(),
368 json_path: None,
369 };
370 self
371 }
372
373 pub fn json_source_with_path(
375 mut self,
376 content: impl Into<String>,
377 json_path: impl Into<String>,
378 ) -> Self {
379 self.rule.source = DataSource::Json {
380 content: content.into(),
381 json_path: Some(json_path.into()),
382 };
383 self
384 }
385
386 pub fn inline_source(mut self, headers: Vec<String>, rows: Vec<Vec<String>>) -> Self {
388 self.rule.source = DataSource::InlineValues { rows, headers };
389 self
390 }
391
392 pub fn subject_template(mut self, template: impl Into<String>) -> Self {
394 self.rule.subject_template = Template::new(template);
395 self
396 }
397
398 pub fn map(mut self, predicate: impl Into<String>, object: ObjectSpec) -> Self {
400 self.rule.predicate_object_maps.push(PredicateObjectMap {
401 predicate: predicate.into(),
402 object_template: object,
403 });
404 self
405 }
406
407 pub fn graph(mut self, graph_name: impl Into<String>) -> Self {
409 self.rule.graph_name = Some(graph_name.into());
410 self
411 }
412
413 pub fn build(self) -> MappingRule {
415 self.rule
416 }
417}