Skip to main content

oxirs_ttl/patch/
patch_parser.rs

1//! RDF Patch parser (batch and streaming) and graph mutation helpers.
2//!
3//! Exports: [`PatchParser`], [`apply_patch`], [`diff_to_patch`].
4
5use std::collections::BTreeMap;
6use std::io::{BufRead, BufReader, Read};
7
8use super::patch_types::{
9    Graph, PatchChange, PatchError, PatchHeader, PatchQuad, PatchResult, PatchStats, PatchTerm,
10    PatchTriple, RdfPatch,
11};
12
13// ─── PatchParser ─────────────────────────────────────────────────────────────
14
/// Parser for the RDF Patch text format.
///
/// This is a stateless unit type: it carries no data and only serves as a
/// namespace for the parsing entry points. The common marker traits are
/// derived so the type can be printed, copied and default-constructed like
/// any other zero-sized handle.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PatchParser;
17
18impl PatchParser {
19    /// Parse an entire RDF Patch document from a string
20    pub fn parse(input: &str) -> PatchResult<RdfPatch> {
21        let mut headers = Vec::new();
22        let mut changes = Vec::new();
23        let mut prefixes: BTreeMap<String, String> = BTreeMap::new();
24
25        for (idx, raw_line) in input.lines().enumerate() {
26            let line_no = idx + 1;
27            let line = raw_line.trim();
28
29            // Skip blank lines and comments
30            if line.is_empty() || line.starts_with('#') {
31                continue;
32            }
33
34            if let Some(rest) = line.strip_prefix("H ") {
35                let header = Self::parse_header(rest.trim(), line_no)?;
36                headers.push(header);
37            } else if line == "TX" {
38                changes.push(PatchChange::TransactionBegin);
39            } else if line == "TC" {
40                changes.push(PatchChange::TransactionCommit);
41            } else if line == "TA" {
42                changes.push(PatchChange::TransactionAbort);
43            } else if let Some(rest) = line.strip_prefix("PA ") {
44                let (prefix, iri) = Self::parse_prefix_decl(rest.trim(), line_no)?;
45                prefixes.insert(prefix.clone(), iri.clone());
46                changes.push(PatchChange::AddPrefix { prefix, iri });
47            } else if let Some(rest) = line.strip_prefix("PD ") {
48                let (prefix, iri) = Self::parse_prefix_decl(rest.trim(), line_no)?;
49                changes.push(PatchChange::DeletePrefix { prefix, iri });
50            } else if let Some(rest) = line.strip_prefix("A ") {
51                let change = Self::parse_triple_or_quad("A", rest.trim(), &prefixes, line_no)?;
52                changes.push(change);
53            } else if let Some(rest) = line.strip_prefix("D ") {
54                let change = Self::parse_triple_or_quad("D", rest.trim(), &prefixes, line_no)?;
55                changes.push(change);
56            } else {
57                return Err(PatchError::at(
58                    line_no,
59                    format!("unrecognised line: {line:?}"),
60                ));
61            }
62        }
63
64        Ok(RdfPatch { headers, changes })
65    }
66
67    /// Create a streaming iterator that parses one [`PatchChange`] at a time.
68    /// Headers are skipped in streaming mode (only change lines are yielded).
69    pub fn parse_streaming(reader: impl Read) -> impl Iterator<Item = PatchResult<PatchChange>> {
70        StreamingPatchParser::new(reader)
71    }
72
73    // ── Internal helpers ──────────────────────────────────────────────────
74
75    fn parse_header(rest: &str, line_no: usize) -> PatchResult<PatchHeader> {
76        // rest is `key <value>` or `key value`
77        let mut parts = rest.splitn(2, ' ');
78        let key = parts
79            .next()
80            .ok_or_else(|| PatchError::at(line_no, "missing header key"))?
81            .trim();
82        let value_raw = parts.next().unwrap_or("").trim();
83        let value = strip_angle_brackets(value_raw);
84        match key {
85            "version" => Ok(PatchHeader::Version(value.to_string())),
86            "prev" => Ok(PatchHeader::Previous(value.to_string())),
87            "id" => Ok(PatchHeader::Id(value.to_string())),
88            other => Ok(PatchHeader::Unknown {
89                key: other.to_string(),
90                value: value.to_string(),
91            }),
92        }
93    }
94
95    fn parse_prefix_decl(rest: &str, line_no: usize) -> PatchResult<(String, String)> {
96        // rest is `prefix <iri>` or `prefix: <iri>`
97        let mut parts = rest.splitn(2, ' ');
98        let prefix_raw = parts
99            .next()
100            .ok_or_else(|| PatchError::at(line_no, "missing prefix name"))?
101            .trim_end_matches(':');
102        let iri_raw = parts
103            .next()
104            .ok_or_else(|| PatchError::at(line_no, "missing prefix IRI"))?
105            .trim();
106        let iri = strip_angle_brackets(iri_raw);
107        Ok((prefix_raw.to_string(), iri.to_string()))
108    }
109
110    pub(crate) fn parse_triple_or_quad(
111        op: &str,
112        rest: &str,
113        prefixes: &BTreeMap<String, String>,
114        line_no: usize,
115    ) -> PatchResult<PatchChange> {
116        // Strip trailing ' .' if present
117        let rest = rest.trim_end_matches('.').trim();
118        let terms = tokenise_terms(rest, prefixes, line_no)?;
119        match terms.len() {
120            3 => {
121                let triple = PatchTriple::new(terms[0].clone(), terms[1].clone(), terms[2].clone());
122                if op == "A" {
123                    Ok(PatchChange::AddTriple(triple))
124                } else {
125                    Ok(PatchChange::DeleteTriple(triple))
126                }
127            }
128            4 => {
129                let quad = PatchQuad::new(
130                    terms[0].clone(),
131                    terms[1].clone(),
132                    terms[2].clone(),
133                    terms[3].clone(),
134                );
135                if op == "A" {
136                    Ok(PatchChange::AddQuad(quad))
137                } else {
138                    Ok(PatchChange::DeleteQuad(quad))
139                }
140            }
141            n => Err(PatchError::at(
142                line_no,
143                format!("expected 3 or 4 terms, got {n}"),
144            )),
145        }
146    }
147}
148
149// ─── Streaming parser ────────────────────────────────────────────────────────
150
/// Line-oriented RDF Patch reader that yields one change per call to `next`.
struct StreamingPatchParser<R>
where
    R: Read,
{
    /// Buffered wrapper around the caller-supplied byte source.
    reader: BufReader<R>,
    /// Number of lines consumed so far; starts at 0.
    line_no: usize,
    /// Prefix name → namespace IRI table, initially empty.
    prefixes: BTreeMap<String, String>,
    /// Latched flag; starts as `false`.
    done: bool,
}

impl<R> StreamingPatchParser<R>
where
    R: Read,
{
    /// Wrap `reader` in a buffer and start with no lines read, an empty
    /// prefix table and the `done` flag cleared.
    fn new(reader: R) -> Self {
        let reader = BufReader::new(reader);
        let prefixes = BTreeMap::new();
        Self {
            done: false,
            line_no: 0,
            prefixes,
            reader,
        }
    }
}
168
169impl<R: Read> Iterator for StreamingPatchParser<R> {
170    type Item = PatchResult<PatchChange>;
171
172    fn next(&mut self) -> Option<Self::Item> {
173        if self.done {
174            return None;
175        }
176        loop {
177            let mut raw = String::new();
178            match self.reader.read_line(&mut raw) {
179                Ok(0) => {
180                    self.done = true;
181                    return None;
182                }
183                Err(e) => {
184                    self.done = true;
185                    return Some(Err(PatchError::at(self.line_no, e.to_string())));
186                }
187                Ok(_) => {}
188            }
189            self.line_no += 1;
190            let line = raw.trim();
191
192            if line.is_empty() || line.starts_with('#') {
193                continue;
194            }
195
196            // Headers — skip silently in streaming mode
197            if line.starts_with("H ") {
198                continue;
199            }
200
201            let result = if line == "TX" {
202                Ok(PatchChange::TransactionBegin)
203            } else if line == "TC" {
204                Ok(PatchChange::TransactionCommit)
205            } else if line == "TA" {
206                Ok(PatchChange::TransactionAbort)
207            } else if let Some(rest) = line.strip_prefix("PA ") {
208                match parse_prefix_decl_inline(rest.trim(), self.line_no) {
209                    Ok((prefix, iri)) => {
210                        self.prefixes.insert(prefix.clone(), iri.clone());
211                        Ok(PatchChange::AddPrefix { prefix, iri })
212                    }
213                    Err(e) => Err(e),
214                }
215            } else if let Some(rest) = line.strip_prefix("PD ") {
216                match parse_prefix_decl_inline(rest.trim(), self.line_no) {
217                    Ok((prefix, iri)) => Ok(PatchChange::DeletePrefix { prefix, iri }),
218                    Err(e) => Err(e),
219                }
220            } else if let Some(rest) = line.strip_prefix("A ") {
221                PatchParser::parse_triple_or_quad("A", rest.trim(), &self.prefixes, self.line_no)
222            } else if let Some(rest) = line.strip_prefix("D ") {
223                PatchParser::parse_triple_or_quad("D", rest.trim(), &self.prefixes, self.line_no)
224            } else {
225                Err(PatchError::at(
226                    self.line_no,
227                    format!("unrecognised line: {line:?}"),
228                ))
229            };
230
231            return Some(result);
232        }
233    }
234}
235
236// ─── apply_patch ─────────────────────────────────────────────────────────────
237
238/// Apply an [`RdfPatch`] to an in-memory [`Graph`], updating it in place.
239///
240/// Transactions are honoured: changes between `TX`/`TA` are rolled back on abort.
241/// Returns [`PatchStats`] summarising what was modified.
242pub fn apply_patch(graph: &mut Graph, patch: &RdfPatch) -> PatchResult<PatchStats> {
243    let mut stats = PatchStats::default();
244    let mut in_tx = false;
245    // Staged changes for the current transaction block
246    let mut tx_adds: Vec<PatchTriple> = Vec::new();
247    let mut tx_deletes: Vec<PatchTriple> = Vec::new();
248    let mut tx_prefix_adds: Vec<(String, String)> = Vec::new();
249
250    for change in &patch.changes {
251        match change {
252            PatchChange::TransactionBegin => {
253                in_tx = true;
254                tx_adds.clear();
255                tx_deletes.clear();
256                tx_prefix_adds.clear();
257                stats.transactions += 1;
258            }
259            PatchChange::TransactionCommit => {
260                // Commit staged changes
261                for t in tx_adds.drain(..) {
262                    if graph.add_triple(t) {
263                        stats.triples_added += 1;
264                    }
265                }
266                for t in &tx_deletes {
267                    if graph.remove_triple(t) {
268                        stats.triples_deleted += 1;
269                    }
270                }
271                tx_deletes.clear();
272                for (p, i) in tx_prefix_adds.drain(..) {
273                    graph.prefixes.insert(p, i);
274                    stats.prefixes_added += 1;
275                }
276                in_tx = false;
277            }
278            PatchChange::TransactionAbort => {
279                // Discard staged changes
280                tx_adds.clear();
281                tx_deletes.clear();
282                tx_prefix_adds.clear();
283                in_tx = false;
284                stats.aborts += 1;
285            }
286            PatchChange::AddPrefix { prefix, iri } => {
287                if in_tx {
288                    tx_prefix_adds.push((prefix.clone(), iri.clone()));
289                } else {
290                    graph.prefixes.insert(prefix.clone(), iri.clone());
291                    stats.prefixes_added += 1;
292                }
293            }
294            PatchChange::DeletePrefix { prefix, .. } => {
295                graph.prefixes.remove(prefix.as_str());
296                stats.prefixes_deleted += 1;
297            }
298            PatchChange::AddTriple(t) => {
299                if in_tx {
300                    tx_adds.push(t.clone());
301                } else if graph.add_triple(t.clone()) {
302                    stats.triples_added += 1;
303                }
304            }
305            PatchChange::DeleteTriple(t) => {
306                if in_tx {
307                    tx_deletes.push(t.clone());
308                } else if graph.remove_triple(t) {
309                    stats.triples_deleted += 1;
310                }
311            }
312            // Quads are not supported on simple Graph; treat as triple
313            PatchChange::AddQuad(q) => {
314                let t = PatchTriple::new(q.subject.clone(), q.predicate.clone(), q.object.clone());
315                if in_tx {
316                    tx_adds.push(t);
317                } else if graph.add_triple(t) {
318                    stats.triples_added += 1;
319                }
320            }
321            PatchChange::DeleteQuad(q) => {
322                let t = PatchTriple::new(q.subject.clone(), q.predicate.clone(), q.object.clone());
323                if in_tx {
324                    tx_deletes.push(t.clone());
325                } else if graph.remove_triple(&t) {
326                    stats.triples_deleted += 1;
327                }
328            }
329        }
330    }
331
332    Ok(stats)
333}
334
335// ─── diff_to_patch ───────────────────────────────────────────────────────────
336
337/// Generate a minimal [`RdfPatch`] that transforms `old` into `new`.
338///
339/// All deletions come before additions in the generated patch, matching
340/// the convention used by most RDF Patch tools.
341pub fn diff_to_patch(old: &Graph, new: &Graph) -> RdfPatch {
342    let mut changes = Vec::new();
343
344    // Deletes: triples in old but not new
345    for triple in old.iter() {
346        if !new.contains(triple) {
347            changes.push(PatchChange::DeleteTriple(triple.clone()));
348        }
349    }
350
351    // Adds: triples in new but not old
352    for triple in new.iter() {
353        if !old.contains(triple) {
354            changes.push(PatchChange::AddTriple(triple.clone()));
355        }
356    }
357
358    // Prefix adds: in new but not old
359    for (prefix, iri) in &new.prefixes {
360        if old.prefixes.get(prefix) != Some(iri) {
361            changes.push(PatchChange::AddPrefix {
362                prefix: prefix.clone(),
363                iri: iri.clone(),
364            });
365        }
366    }
367
368    // Prefix deletes: in old but not new
369    for (prefix, iri) in &old.prefixes {
370        if !new.prefixes.contains_key(prefix.as_str()) {
371            changes.push(PatchChange::DeletePrefix {
372                prefix: prefix.clone(),
373                iri: iri.clone(),
374            });
375        }
376    }
377
378    RdfPatch {
379        headers: Vec::new(),
380        changes,
381    }
382}
383
384// ─── Term tokeniser ──────────────────────────────────────────────────────────
385
386/// Tokenise a whitespace-separated sequence of RDF terms.
387/// Handles IRIs (`<...>`), blank nodes (`_:id`), literals (`"..."`), and
388/// prefixed names (`prefix:local`).
389pub(crate) fn tokenise_terms(
390    input: &str,
391    prefixes: &BTreeMap<String, String>,
392    line_no: usize,
393) -> PatchResult<Vec<PatchTerm>> {
394    let mut terms = Vec::new();
395    let chars: Vec<char> = input.chars().collect();
396    let mut pos = 0;
397
398    while pos < chars.len() {
399        // Skip whitespace
400        while pos < chars.len() && chars[pos].is_whitespace() {
401            pos += 1;
402        }
403        if pos >= chars.len() {
404            break;
405        }
406
407        if chars[pos] == '<' {
408            // IRI
409            pos += 1;
410            let start = pos;
411            while pos < chars.len() && chars[pos] != '>' {
412                pos += 1;
413            }
414            if pos >= chars.len() {
415                return Err(PatchError::at(line_no, "unterminated IRI"));
416            }
417            let iri: String = chars[start..pos].iter().collect();
418            pos += 1; // consume '>'
419            terms.push(PatchTerm::iri(iri));
420        } else if chars[pos] == '"' {
421            // Literal
422            pos += 1;
423            let mut value = String::new();
424            while pos < chars.len() {
425                if chars[pos] == '\\' && pos + 1 < chars.len() {
426                    pos += 1;
427                    match chars[pos] {
428                        '"' => value.push('"'),
429                        '\\' => value.push('\\'),
430                        'n' => value.push('\n'),
431                        'r' => value.push('\r'),
432                        't' => value.push('\t'),
433                        c => {
434                            value.push('\\');
435                            value.push(c);
436                        }
437                    }
438                    pos += 1;
439                } else if chars[pos] == '"' {
440                    break;
441                } else {
442                    value.push(chars[pos]);
443                    pos += 1;
444                }
445            }
446            if pos >= chars.len() {
447                return Err(PatchError::at(line_no, "unterminated literal"));
448            }
449            pos += 1; // consume closing '"'
450
451            // Check for language tag or datatype
452            if pos < chars.len() && chars[pos] == '@' {
453                pos += 1;
454                let start = pos;
455                while pos < chars.len() && !chars[pos].is_whitespace() {
456                    pos += 1;
457                }
458                let lang: String = chars[start..pos].iter().collect();
459                terms.push(PatchTerm::lang_literal(value, lang));
460            } else if pos + 1 < chars.len() && chars[pos] == '^' && chars[pos + 1] == '^' {
461                pos += 2;
462                if pos >= chars.len() || chars[pos] != '<' {
463                    return Err(PatchError::at(line_no, "expected '<' after '^^'"));
464                }
465                pos += 1;
466                let start = pos;
467                while pos < chars.len() && chars[pos] != '>' {
468                    pos += 1;
469                }
470                if pos >= chars.len() {
471                    return Err(PatchError::at(line_no, "unterminated datatype IRI"));
472                }
473                let dt: String = chars[start..pos].iter().collect();
474                pos += 1;
475                terms.push(PatchTerm::typed_literal(value, dt));
476            } else {
477                terms.push(PatchTerm::literal(value));
478            }
479        } else if pos + 1 < chars.len() && chars[pos] == '_' && chars[pos + 1] == ':' {
480            // Blank node
481            pos += 2;
482            let start = pos;
483            while pos < chars.len() && !chars[pos].is_whitespace() && chars[pos] != '.' {
484                pos += 1;
485            }
486            let id: String = chars[start..pos].iter().collect();
487            terms.push(PatchTerm::blank_node(id));
488        } else if chars[pos] == '.' {
489            // Trailing dot — stop
490            pos += 1;
491        } else {
492            // Possibly a prefixed name `prefix:local`
493            let start = pos;
494            while pos < chars.len() && !chars[pos].is_whitespace() && chars[pos] != '.' {
495                pos += 1;
496            }
497            let token: String = chars[start..pos].iter().collect();
498            if let Some(colon_pos) = token.find(':') {
499                let ns = &token[..colon_pos];
500                let local = &token[colon_pos + 1..];
501                match prefixes.get(ns) {
502                    Some(base) => {
503                        let full = format!("{base}{local}");
504                        terms.push(PatchTerm::iri(full));
505                    }
506                    None => {
507                        return Err(PatchError::at(
508                            line_no,
509                            format!("unknown prefix '{ns}' in '{token}'"),
510                        ))
511                    }
512                }
513            } else if token.is_empty() || token == "." {
514                // skip
515            } else {
516                return Err(PatchError::at(
517                    line_no,
518                    format!("unexpected token '{token}'"),
519                ));
520            }
521        }
522    }
523
524    Ok(terms)
525}
526
/// Return the text between a leading `<` and a trailing `>`.
///
/// When `s` is not wrapped in both brackets it is returned unchanged.
pub(crate) fn strip_angle_brackets(s: &str) -> &str {
    s.strip_prefix('<')
        .and_then(|inner| inner.strip_suffix('>'))
        .unwrap_or(s)
}
535
536/// Inline prefix-decl parser used in the streaming parser
537pub(crate) fn parse_prefix_decl_inline(
538    rest: &str,
539    line_no: usize,
540) -> PatchResult<(String, String)> {
541    let mut parts = rest.splitn(2, ' ');
542    let prefix_raw = parts
543        .next()
544        .ok_or_else(|| PatchError::at(line_no, "missing prefix name"))?
545        .trim_end_matches(':');
546    let iri_raw = parts
547        .next()
548        .ok_or_else(|| PatchError::at(line_no, "missing prefix IRI"))?
549        .trim();
550    let iri = strip_angle_brackets(iri_raw);
551    Ok((prefix_raw.to_string(), iri.to_string()))
552}