1use std::collections::BTreeMap;
6use std::io::{BufRead, BufReader, Read};
7
8use super::patch_types::{
9 Graph, PatchChange, PatchError, PatchHeader, PatchQuad, PatchResult, PatchStats, PatchTerm,
10 PatchTriple, RdfPatch,
11};
12
/// Stateless entry point for parsing RDF Patch text.
///
/// The type carries no data; all functionality is exposed through associated
/// functions such as `PatchParser::parse` and `PatchParser::parse_streaming`.
pub struct PatchParser;
17
18impl PatchParser {
19 pub fn parse(input: &str) -> PatchResult<RdfPatch> {
21 let mut headers = Vec::new();
22 let mut changes = Vec::new();
23 let mut prefixes: BTreeMap<String, String> = BTreeMap::new();
24
25 for (idx, raw_line) in input.lines().enumerate() {
26 let line_no = idx + 1;
27 let line = raw_line.trim();
28
29 if line.is_empty() || line.starts_with('#') {
31 continue;
32 }
33
34 if let Some(rest) = line.strip_prefix("H ") {
35 let header = Self::parse_header(rest.trim(), line_no)?;
36 headers.push(header);
37 } else if line == "TX" {
38 changes.push(PatchChange::TransactionBegin);
39 } else if line == "TC" {
40 changes.push(PatchChange::TransactionCommit);
41 } else if line == "TA" {
42 changes.push(PatchChange::TransactionAbort);
43 } else if let Some(rest) = line.strip_prefix("PA ") {
44 let (prefix, iri) = Self::parse_prefix_decl(rest.trim(), line_no)?;
45 prefixes.insert(prefix.clone(), iri.clone());
46 changes.push(PatchChange::AddPrefix { prefix, iri });
47 } else if let Some(rest) = line.strip_prefix("PD ") {
48 let (prefix, iri) = Self::parse_prefix_decl(rest.trim(), line_no)?;
49 changes.push(PatchChange::DeletePrefix { prefix, iri });
50 } else if let Some(rest) = line.strip_prefix("A ") {
51 let change = Self::parse_triple_or_quad("A", rest.trim(), &prefixes, line_no)?;
52 changes.push(change);
53 } else if let Some(rest) = line.strip_prefix("D ") {
54 let change = Self::parse_triple_or_quad("D", rest.trim(), &prefixes, line_no)?;
55 changes.push(change);
56 } else {
57 return Err(PatchError::at(
58 line_no,
59 format!("unrecognised line: {line:?}"),
60 ));
61 }
62 }
63
64 Ok(RdfPatch { headers, changes })
65 }
66
67 pub fn parse_streaming(reader: impl Read) -> impl Iterator<Item = PatchResult<PatchChange>> {
70 StreamingPatchParser::new(reader)
71 }
72
73 fn parse_header(rest: &str, line_no: usize) -> PatchResult<PatchHeader> {
76 let mut parts = rest.splitn(2, ' ');
78 let key = parts
79 .next()
80 .ok_or_else(|| PatchError::at(line_no, "missing header key"))?
81 .trim();
82 let value_raw = parts.next().unwrap_or("").trim();
83 let value = strip_angle_brackets(value_raw);
84 match key {
85 "version" => Ok(PatchHeader::Version(value.to_string())),
86 "prev" => Ok(PatchHeader::Previous(value.to_string())),
87 "id" => Ok(PatchHeader::Id(value.to_string())),
88 other => Ok(PatchHeader::Unknown {
89 key: other.to_string(),
90 value: value.to_string(),
91 }),
92 }
93 }
94
95 fn parse_prefix_decl(rest: &str, line_no: usize) -> PatchResult<(String, String)> {
96 let mut parts = rest.splitn(2, ' ');
98 let prefix_raw = parts
99 .next()
100 .ok_or_else(|| PatchError::at(line_no, "missing prefix name"))?
101 .trim_end_matches(':');
102 let iri_raw = parts
103 .next()
104 .ok_or_else(|| PatchError::at(line_no, "missing prefix IRI"))?
105 .trim();
106 let iri = strip_angle_brackets(iri_raw);
107 Ok((prefix_raw.to_string(), iri.to_string()))
108 }
109
110 pub(crate) fn parse_triple_or_quad(
111 op: &str,
112 rest: &str,
113 prefixes: &BTreeMap<String, String>,
114 line_no: usize,
115 ) -> PatchResult<PatchChange> {
116 let rest = rest.trim_end_matches('.').trim();
118 let terms = tokenise_terms(rest, prefixes, line_no)?;
119 match terms.len() {
120 3 => {
121 let triple = PatchTriple::new(terms[0].clone(), terms[1].clone(), terms[2].clone());
122 if op == "A" {
123 Ok(PatchChange::AddTriple(triple))
124 } else {
125 Ok(PatchChange::DeleteTriple(triple))
126 }
127 }
128 4 => {
129 let quad = PatchQuad::new(
130 terms[0].clone(),
131 terms[1].clone(),
132 terms[2].clone(),
133 terms[3].clone(),
134 );
135 if op == "A" {
136 Ok(PatchChange::AddQuad(quad))
137 } else {
138 Ok(PatchChange::DeleteQuad(quad))
139 }
140 }
141 n => Err(PatchError::at(
142 line_no,
143 format!("expected 3 or 4 terms, got {n}"),
144 )),
145 }
146 }
147}
148
/// Line-at-a-time patch reader state.
///
/// Holds the buffered input together with the bookkeeping needed between
/// rows: the current line counter, the prefixes seen so far and a flag that
/// records that the input has been exhausted or has failed.
struct StreamingPatchParser<R>
where
    R: Read,
{
    /// Buffered source the rows are read from.
    reader: BufReader<R>,
    /// Number of lines consumed from `reader` so far.
    line_no: usize,
    /// Prefix table (name to IRI) used to expand prefixed names.
    prefixes: BTreeMap<String, String>,
    /// Set once end of input or a read error has been reached.
    done: bool,
}
157
158impl<R: Read> StreamingPatchParser<R> {
159 fn new(reader: R) -> Self {
160 Self {
161 reader: BufReader::new(reader),
162 line_no: 0,
163 prefixes: BTreeMap::new(),
164 done: false,
165 }
166 }
167}
168
169impl<R: Read> Iterator for StreamingPatchParser<R> {
170 type Item = PatchResult<PatchChange>;
171
172 fn next(&mut self) -> Option<Self::Item> {
173 if self.done {
174 return None;
175 }
176 loop {
177 let mut raw = String::new();
178 match self.reader.read_line(&mut raw) {
179 Ok(0) => {
180 self.done = true;
181 return None;
182 }
183 Err(e) => {
184 self.done = true;
185 return Some(Err(PatchError::at(self.line_no, e.to_string())));
186 }
187 Ok(_) => {}
188 }
189 self.line_no += 1;
190 let line = raw.trim();
191
192 if line.is_empty() || line.starts_with('#') {
193 continue;
194 }
195
196 if line.starts_with("H ") {
198 continue;
199 }
200
201 let result = if line == "TX" {
202 Ok(PatchChange::TransactionBegin)
203 } else if line == "TC" {
204 Ok(PatchChange::TransactionCommit)
205 } else if line == "TA" {
206 Ok(PatchChange::TransactionAbort)
207 } else if let Some(rest) = line.strip_prefix("PA ") {
208 match parse_prefix_decl_inline(rest.trim(), self.line_no) {
209 Ok((prefix, iri)) => {
210 self.prefixes.insert(prefix.clone(), iri.clone());
211 Ok(PatchChange::AddPrefix { prefix, iri })
212 }
213 Err(e) => Err(e),
214 }
215 } else if let Some(rest) = line.strip_prefix("PD ") {
216 match parse_prefix_decl_inline(rest.trim(), self.line_no) {
217 Ok((prefix, iri)) => Ok(PatchChange::DeletePrefix { prefix, iri }),
218 Err(e) => Err(e),
219 }
220 } else if let Some(rest) = line.strip_prefix("A ") {
221 PatchParser::parse_triple_or_quad("A", rest.trim(), &self.prefixes, self.line_no)
222 } else if let Some(rest) = line.strip_prefix("D ") {
223 PatchParser::parse_triple_or_quad("D", rest.trim(), &self.prefixes, self.line_no)
224 } else {
225 Err(PatchError::at(
226 self.line_no,
227 format!("unrecognised line: {line:?}"),
228 ))
229 };
230
231 return Some(result);
232 }
233 }
234}
235
236pub fn apply_patch(graph: &mut Graph, patch: &RdfPatch) -> PatchResult<PatchStats> {
243 let mut stats = PatchStats::default();
244 let mut in_tx = false;
245 let mut tx_adds: Vec<PatchTriple> = Vec::new();
247 let mut tx_deletes: Vec<PatchTriple> = Vec::new();
248 let mut tx_prefix_adds: Vec<(String, String)> = Vec::new();
249
250 for change in &patch.changes {
251 match change {
252 PatchChange::TransactionBegin => {
253 in_tx = true;
254 tx_adds.clear();
255 tx_deletes.clear();
256 tx_prefix_adds.clear();
257 stats.transactions += 1;
258 }
259 PatchChange::TransactionCommit => {
260 for t in tx_adds.drain(..) {
262 if graph.add_triple(t) {
263 stats.triples_added += 1;
264 }
265 }
266 for t in &tx_deletes {
267 if graph.remove_triple(t) {
268 stats.triples_deleted += 1;
269 }
270 }
271 tx_deletes.clear();
272 for (p, i) in tx_prefix_adds.drain(..) {
273 graph.prefixes.insert(p, i);
274 stats.prefixes_added += 1;
275 }
276 in_tx = false;
277 }
278 PatchChange::TransactionAbort => {
279 tx_adds.clear();
281 tx_deletes.clear();
282 tx_prefix_adds.clear();
283 in_tx = false;
284 stats.aborts += 1;
285 }
286 PatchChange::AddPrefix { prefix, iri } => {
287 if in_tx {
288 tx_prefix_adds.push((prefix.clone(), iri.clone()));
289 } else {
290 graph.prefixes.insert(prefix.clone(), iri.clone());
291 stats.prefixes_added += 1;
292 }
293 }
294 PatchChange::DeletePrefix { prefix, .. } => {
295 graph.prefixes.remove(prefix.as_str());
296 stats.prefixes_deleted += 1;
297 }
298 PatchChange::AddTriple(t) => {
299 if in_tx {
300 tx_adds.push(t.clone());
301 } else if graph.add_triple(t.clone()) {
302 stats.triples_added += 1;
303 }
304 }
305 PatchChange::DeleteTriple(t) => {
306 if in_tx {
307 tx_deletes.push(t.clone());
308 } else if graph.remove_triple(t) {
309 stats.triples_deleted += 1;
310 }
311 }
312 PatchChange::AddQuad(q) => {
314 let t = PatchTriple::new(q.subject.clone(), q.predicate.clone(), q.object.clone());
315 if in_tx {
316 tx_adds.push(t);
317 } else if graph.add_triple(t) {
318 stats.triples_added += 1;
319 }
320 }
321 PatchChange::DeleteQuad(q) => {
322 let t = PatchTriple::new(q.subject.clone(), q.predicate.clone(), q.object.clone());
323 if in_tx {
324 tx_deletes.push(t.clone());
325 } else if graph.remove_triple(&t) {
326 stats.triples_deleted += 1;
327 }
328 }
329 }
330 }
331
332 Ok(stats)
333}
334
335pub fn diff_to_patch(old: &Graph, new: &Graph) -> RdfPatch {
342 let mut changes = Vec::new();
343
344 for triple in old.iter() {
346 if !new.contains(triple) {
347 changes.push(PatchChange::DeleteTriple(triple.clone()));
348 }
349 }
350
351 for triple in new.iter() {
353 if !old.contains(triple) {
354 changes.push(PatchChange::AddTriple(triple.clone()));
355 }
356 }
357
358 for (prefix, iri) in &new.prefixes {
360 if old.prefixes.get(prefix) != Some(iri) {
361 changes.push(PatchChange::AddPrefix {
362 prefix: prefix.clone(),
363 iri: iri.clone(),
364 });
365 }
366 }
367
368 for (prefix, iri) in &old.prefixes {
370 if !new.prefixes.contains_key(prefix.as_str()) {
371 changes.push(PatchChange::DeletePrefix {
372 prefix: prefix.clone(),
373 iri: iri.clone(),
374 });
375 }
376 }
377
378 RdfPatch {
379 headers: Vec::new(),
380 changes,
381 }
382}
383
384pub(crate) fn tokenise_terms(
390 input: &str,
391 prefixes: &BTreeMap<String, String>,
392 line_no: usize,
393) -> PatchResult<Vec<PatchTerm>> {
394 let mut terms = Vec::new();
395 let chars: Vec<char> = input.chars().collect();
396 let mut pos = 0;
397
398 while pos < chars.len() {
399 while pos < chars.len() && chars[pos].is_whitespace() {
401 pos += 1;
402 }
403 if pos >= chars.len() {
404 break;
405 }
406
407 if chars[pos] == '<' {
408 pos += 1;
410 let start = pos;
411 while pos < chars.len() && chars[pos] != '>' {
412 pos += 1;
413 }
414 if pos >= chars.len() {
415 return Err(PatchError::at(line_no, "unterminated IRI"));
416 }
417 let iri: String = chars[start..pos].iter().collect();
418 pos += 1; terms.push(PatchTerm::iri(iri));
420 } else if chars[pos] == '"' {
421 pos += 1;
423 let mut value = String::new();
424 while pos < chars.len() {
425 if chars[pos] == '\\' && pos + 1 < chars.len() {
426 pos += 1;
427 match chars[pos] {
428 '"' => value.push('"'),
429 '\\' => value.push('\\'),
430 'n' => value.push('\n'),
431 'r' => value.push('\r'),
432 't' => value.push('\t'),
433 c => {
434 value.push('\\');
435 value.push(c);
436 }
437 }
438 pos += 1;
439 } else if chars[pos] == '"' {
440 break;
441 } else {
442 value.push(chars[pos]);
443 pos += 1;
444 }
445 }
446 if pos >= chars.len() {
447 return Err(PatchError::at(line_no, "unterminated literal"));
448 }
449 pos += 1; if pos < chars.len() && chars[pos] == '@' {
453 pos += 1;
454 let start = pos;
455 while pos < chars.len() && !chars[pos].is_whitespace() {
456 pos += 1;
457 }
458 let lang: String = chars[start..pos].iter().collect();
459 terms.push(PatchTerm::lang_literal(value, lang));
460 } else if pos + 1 < chars.len() && chars[pos] == '^' && chars[pos + 1] == '^' {
461 pos += 2;
462 if pos >= chars.len() || chars[pos] != '<' {
463 return Err(PatchError::at(line_no, "expected '<' after '^^'"));
464 }
465 pos += 1;
466 let start = pos;
467 while pos < chars.len() && chars[pos] != '>' {
468 pos += 1;
469 }
470 if pos >= chars.len() {
471 return Err(PatchError::at(line_no, "unterminated datatype IRI"));
472 }
473 let dt: String = chars[start..pos].iter().collect();
474 pos += 1;
475 terms.push(PatchTerm::typed_literal(value, dt));
476 } else {
477 terms.push(PatchTerm::literal(value));
478 }
479 } else if pos + 1 < chars.len() && chars[pos] == '_' && chars[pos + 1] == ':' {
480 pos += 2;
482 let start = pos;
483 while pos < chars.len() && !chars[pos].is_whitespace() && chars[pos] != '.' {
484 pos += 1;
485 }
486 let id: String = chars[start..pos].iter().collect();
487 terms.push(PatchTerm::blank_node(id));
488 } else if chars[pos] == '.' {
489 pos += 1;
491 } else {
492 let start = pos;
494 while pos < chars.len() && !chars[pos].is_whitespace() && chars[pos] != '.' {
495 pos += 1;
496 }
497 let token: String = chars[start..pos].iter().collect();
498 if let Some(colon_pos) = token.find(':') {
499 let ns = &token[..colon_pos];
500 let local = &token[colon_pos + 1..];
501 match prefixes.get(ns) {
502 Some(base) => {
503 let full = format!("{base}{local}");
504 terms.push(PatchTerm::iri(full));
505 }
506 None => {
507 return Err(PatchError::at(
508 line_no,
509 format!("unknown prefix '{ns}' in '{token}'"),
510 ))
511 }
512 }
513 } else if token.is_empty() || token == "." {
514 } else {
516 return Err(PatchError::at(
517 line_no,
518 format!("unexpected token '{token}'"),
519 ));
520 }
521 }
522 }
523
524 Ok(terms)
525}
526
/// Removes one pair of enclosing angle brackets from `s`.
///
/// The input is returned unchanged unless it both starts with `<` and ends
/// with `>`; only the outermost pair is removed.
pub(crate) fn strip_angle_brackets(s: &str) -> &str {
    s.strip_prefix('<')
        .and_then(|inner| inner.strip_suffix('>'))
        .unwrap_or(s)
}
535
536pub(crate) fn parse_prefix_decl_inline(
538 rest: &str,
539 line_no: usize,
540) -> PatchResult<(String, String)> {
541 let mut parts = rest.splitn(2, ' ');
542 let prefix_raw = parts
543 .next()
544 .ok_or_else(|| PatchError::at(line_no, "missing prefix name"))?
545 .trim_end_matches(':');
546 let iri_raw = parts
547 .next()
548 .ok_or_else(|| PatchError::at(line_no, "missing prefix IRI"))?
549 .trim();
550 let iri = strip_angle_brackets(iri_raw);
551 Ok((prefix_raw.to_string(), iri.to_string()))
552}