oxirs_stream/patch/
normalizer.rs

1//! Patch normalizer
2
3use crate::{PatchOperation, RdfPatch};
4use anyhow::Result;
5use std::collections::BTreeSet;
6use tracing::info;
7
/// Configuration for RDF patch normalization.
///
/// Each flag switches one normalization step on or off; the steps themselves
/// are applied by `PatchNormalizer::normalize`.
#[derive(Debug, Clone)]
pub struct PatchNormalizer {
    /// Regroup operations by kind into a fixed canonical order.
    canonical_ordering: bool,
    /// Drop operations that are exact repeats of an earlier operation.
    deduplicate_operations: bool,
    /// Normalize the URI strings carried by triple and graph operations.
    normalize_uris: bool,
    /// Order triple operations by their subject.
    sort_by_subject: bool,
}
14
15impl PatchNormalizer {
16    pub fn new() -> Self {
17        Self {
18            canonical_ordering: true,
19            deduplicate_operations: true,
20            normalize_uris: true,
21            sort_by_subject: true,
22        }
23    }
24
25    pub fn with_canonical_ordering(mut self, enabled: bool) -> Self {
26        self.canonical_ordering = enabled;
27        self
28    }
29
30    pub fn with_deduplication(mut self, enabled: bool) -> Self {
31        self.deduplicate_operations = enabled;
32        self
33    }
34
35    pub fn with_uri_normalization(mut self, enabled: bool) -> Self {
36        self.normalize_uris = enabled;
37        self
38    }
39
40    /// Normalize a patch according to configured rules
41    pub fn normalize(&self, patch: &RdfPatch) -> Result<RdfPatch> {
42        let mut normalized = patch.clone();
43        normalized.id = format!("{}-normalized", patch.id);
44
45        // Step 1: Normalize URIs
46        if self.normalize_uris {
47            normalized = self.normalize_uris_in_patch(normalized)?;
48        }
49
50        // Step 2: Deduplicate operations
51        if self.deduplicate_operations {
52            normalized = self.deduplicate_operations_in_patch(normalized)?;
53        }
54
55        // Step 3: Canonical ordering
56        if self.canonical_ordering {
57            normalized = self.apply_canonical_ordering(normalized)?;
58        }
59
60        // Step 4: Sort by subject if enabled
61        if self.sort_by_subject {
62            normalized = self.sort_operations_by_subject(normalized)?;
63        }
64
65        info!(
66            "Normalized patch: {} -> {} operations",
67            patch.operations.len(),
68            normalized.operations.len()
69        );
70        Ok(normalized)
71    }
72
73    fn normalize_uris_in_patch(&self, mut patch: RdfPatch) -> Result<RdfPatch> {
74        for operation in &mut patch.operations {
75            match operation {
76                PatchOperation::Add {
77                    subject,
78                    predicate,
79                    object,
80                } => {
81                    *subject = self.normalize_uri(subject);
82                    *predicate = self.normalize_uri(predicate);
83                    *object = self.normalize_uri(object);
84                }
85                PatchOperation::Delete {
86                    subject,
87                    predicate,
88                    object,
89                } => {
90                    *subject = self.normalize_uri(subject);
91                    *predicate = self.normalize_uri(predicate);
92                    *object = self.normalize_uri(object);
93                }
94                PatchOperation::AddGraph { graph } => {
95                    *graph = self.normalize_uri(graph);
96                }
97                PatchOperation::DeleteGraph { graph } => {
98                    *graph = self.normalize_uri(graph);
99                }
100                _ => {} // Other operations don't have URIs to normalize
101            }
102        }
103        Ok(patch)
104    }
105
106    fn normalize_uri(&self, uri: &str) -> String {
107        // Remove trailing slashes, normalize case, etc.
108        let mut normalized = uri.trim_end_matches('/').to_string();
109
110        // Convert to lowercase for schemes
111        if normalized.starts_with("http://") || normalized.starts_with("https://") {
112            if let Some(pos) = normalized.find("://") {
113                let (scheme, rest) = normalized.split_at(pos + 3);
114                if let Some(domain_end) = rest.find('/') {
115                    let (domain, path) = rest.split_at(domain_end);
116                    normalized =
117                        format!("{}{}{}", scheme.to_lowercase(), domain.to_lowercase(), path);
118                } else {
119                    normalized = format!("{}{}", scheme.to_lowercase(), rest.to_lowercase());
120                }
121            }
122        }
123
124        normalized
125    }
126
127    fn deduplicate_operations_in_patch(&self, mut patch: RdfPatch) -> Result<RdfPatch> {
128        let mut seen = BTreeSet::new();
129        patch.operations.retain(|op| {
130            let key = format!("{op:?}");
131            if seen.contains(&key) {
132                false
133            } else {
134                seen.insert(key);
135                true
136            }
137        });
138        Ok(patch)
139    }
140
141    fn apply_canonical_ordering(&self, mut patch: RdfPatch) -> Result<RdfPatch> {
142        // Group operations by type and apply canonical ordering within each group
143        let mut headers = Vec::new();
144        let mut prefixes = Vec::new();
145        let mut tx_begin = Vec::new();
146        let mut adds = Vec::new();
147        let mut deletes = Vec::new();
148        let mut graphs = Vec::new();
149        let mut tx_end = Vec::new();
150
151        for operation in &patch.operations {
152            match operation {
153                PatchOperation::Header { .. } => headers.push(operation.clone()),
154                PatchOperation::AddPrefix { .. } | PatchOperation::DeletePrefix { .. } => {
155                    prefixes.push(operation.clone())
156                }
157                PatchOperation::TransactionBegin { .. } => tx_begin.push(operation.clone()),
158                PatchOperation::Add { .. } => adds.push(operation.clone()),
159                PatchOperation::Delete { .. } => deletes.push(operation.clone()),
160                PatchOperation::AddGraph { .. } | PatchOperation::DeleteGraph { .. } => {
161                    graphs.push(operation.clone())
162                }
163                PatchOperation::TransactionCommit | PatchOperation::TransactionAbort => {
164                    tx_end.push(operation.clone())
165                }
166            }
167        }
168
169        // Rebuild operations in canonical order
170        patch.operations.clear();
171        patch.operations.extend(headers);
172        patch.operations.extend(prefixes);
173        patch.operations.extend(tx_begin);
174        patch.operations.extend(graphs);
175        patch.operations.extend(deletes); // Deletes before adds
176        patch.operations.extend(adds);
177        patch.operations.extend(tx_end);
178
179        Ok(patch)
180    }
181
182    fn sort_operations_by_subject(&self, mut patch: RdfPatch) -> Result<RdfPatch> {
183        // Sort triple operations by subject
184        patch.operations.sort_by(|a, b| {
185            let subject_a = self.extract_subject(a);
186            let subject_b = self.extract_subject(b);
187            subject_a.cmp(&subject_b)
188        });
189
190        Ok(patch)
191    }
192
193    fn extract_subject(&self, operation: &PatchOperation) -> String {
194        match operation {
195            PatchOperation::Add { subject, .. } | PatchOperation::Delete { subject, .. } => {
196                subject.clone()
197            }
198            _ => String::new(), // Non-triple operations sort first
199        }
200    }
201}
202
203impl Default for PatchNormalizer {
204    fn default() -> Self {
205        Self::new()
206    }
207}