oxirs_stream/patch/
normalizer.rs1use crate::{PatchOperation, RdfPatch};
4use anyhow::Result;
5use std::collections::BTreeSet;
6use tracing::info;
7
/// Configuration for normalizing an RDF patch.
///
/// Each flag switches one pass of [`PatchNormalizer::normalize`] on or off.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PatchNormalizer {
    /// Regroup operations by kind (headers, prefixes, transaction begin,
    /// graph operations, deletes, adds, transaction end).
    canonical_ordering: bool,
    /// Drop operations whose `Debug` rendering has already been seen.
    deduplicate_operations: bool,
    /// Rewrite the string terms of triple and graph operations through the
    /// URI normalizer.
    normalize_uris: bool,
    /// Order triple operations by their subject.
    sort_by_subject: bool,
}
14
15impl PatchNormalizer {
16 pub fn new() -> Self {
17 Self {
18 canonical_ordering: true,
19 deduplicate_operations: true,
20 normalize_uris: true,
21 sort_by_subject: true,
22 }
23 }
24
25 pub fn with_canonical_ordering(mut self, enabled: bool) -> Self {
26 self.canonical_ordering = enabled;
27 self
28 }
29
30 pub fn with_deduplication(mut self, enabled: bool) -> Self {
31 self.deduplicate_operations = enabled;
32 self
33 }
34
35 pub fn with_uri_normalization(mut self, enabled: bool) -> Self {
36 self.normalize_uris = enabled;
37 self
38 }
39
40 pub fn normalize(&self, patch: &RdfPatch) -> Result<RdfPatch> {
42 let mut normalized = patch.clone();
43 normalized.id = format!("{}-normalized", patch.id);
44
45 if self.normalize_uris {
47 normalized = self.normalize_uris_in_patch(normalized)?;
48 }
49
50 if self.deduplicate_operations {
52 normalized = self.deduplicate_operations_in_patch(normalized)?;
53 }
54
55 if self.canonical_ordering {
57 normalized = self.apply_canonical_ordering(normalized)?;
58 }
59
60 if self.sort_by_subject {
62 normalized = self.sort_operations_by_subject(normalized)?;
63 }
64
65 info!(
66 "Normalized patch: {} -> {} operations",
67 patch.operations.len(),
68 normalized.operations.len()
69 );
70 Ok(normalized)
71 }
72
73 fn normalize_uris_in_patch(&self, mut patch: RdfPatch) -> Result<RdfPatch> {
74 for operation in &mut patch.operations {
75 match operation {
76 PatchOperation::Add {
77 subject,
78 predicate,
79 object,
80 } => {
81 *subject = self.normalize_uri(subject);
82 *predicate = self.normalize_uri(predicate);
83 *object = self.normalize_uri(object);
84 }
85 PatchOperation::Delete {
86 subject,
87 predicate,
88 object,
89 } => {
90 *subject = self.normalize_uri(subject);
91 *predicate = self.normalize_uri(predicate);
92 *object = self.normalize_uri(object);
93 }
94 PatchOperation::AddGraph { graph } => {
95 *graph = self.normalize_uri(graph);
96 }
97 PatchOperation::DeleteGraph { graph } => {
98 *graph = self.normalize_uri(graph);
99 }
100 _ => {} }
102 }
103 Ok(patch)
104 }
105
106 fn normalize_uri(&self, uri: &str) -> String {
107 let mut normalized = uri.trim_end_matches('/').to_string();
109
110 if normalized.starts_with("http://") || normalized.starts_with("https://") {
112 if let Some(pos) = normalized.find("://") {
113 let (scheme, rest) = normalized.split_at(pos + 3);
114 if let Some(domain_end) = rest.find('/') {
115 let (domain, path) = rest.split_at(domain_end);
116 normalized =
117 format!("{}{}{}", scheme.to_lowercase(), domain.to_lowercase(), path);
118 } else {
119 normalized = format!("{}{}", scheme.to_lowercase(), rest.to_lowercase());
120 }
121 }
122 }
123
124 normalized
125 }
126
127 fn deduplicate_operations_in_patch(&self, mut patch: RdfPatch) -> Result<RdfPatch> {
128 let mut seen = BTreeSet::new();
129 patch.operations.retain(|op| {
130 let key = format!("{op:?}");
131 if seen.contains(&key) {
132 false
133 } else {
134 seen.insert(key);
135 true
136 }
137 });
138 Ok(patch)
139 }
140
141 fn apply_canonical_ordering(&self, mut patch: RdfPatch) -> Result<RdfPatch> {
142 let mut headers = Vec::new();
144 let mut prefixes = Vec::new();
145 let mut tx_begin = Vec::new();
146 let mut adds = Vec::new();
147 let mut deletes = Vec::new();
148 let mut graphs = Vec::new();
149 let mut tx_end = Vec::new();
150
151 for operation in &patch.operations {
152 match operation {
153 PatchOperation::Header { .. } => headers.push(operation.clone()),
154 PatchOperation::AddPrefix { .. } | PatchOperation::DeletePrefix { .. } => {
155 prefixes.push(operation.clone())
156 }
157 PatchOperation::TransactionBegin { .. } => tx_begin.push(operation.clone()),
158 PatchOperation::Add { .. } => adds.push(operation.clone()),
159 PatchOperation::Delete { .. } => deletes.push(operation.clone()),
160 PatchOperation::AddGraph { .. } | PatchOperation::DeleteGraph { .. } => {
161 graphs.push(operation.clone())
162 }
163 PatchOperation::TransactionCommit | PatchOperation::TransactionAbort => {
164 tx_end.push(operation.clone())
165 }
166 }
167 }
168
169 patch.operations.clear();
171 patch.operations.extend(headers);
172 patch.operations.extend(prefixes);
173 patch.operations.extend(tx_begin);
174 patch.operations.extend(graphs);
175 patch.operations.extend(deletes); patch.operations.extend(adds);
177 patch.operations.extend(tx_end);
178
179 Ok(patch)
180 }
181
182 fn sort_operations_by_subject(&self, mut patch: RdfPatch) -> Result<RdfPatch> {
183 patch.operations.sort_by(|a, b| {
185 let subject_a = self.extract_subject(a);
186 let subject_b = self.extract_subject(b);
187 subject_a.cmp(&subject_b)
188 });
189
190 Ok(patch)
191 }
192
193 fn extract_subject(&self, operation: &PatchOperation) -> String {
194 match operation {
195 PatchOperation::Add { subject, .. } | PatchOperation::Delete { subject, .. } => {
196 subject.clone()
197 }
198 _ => String::new(), }
200 }
201}
202
203impl Default for PatchNormalizer {
204 fn default() -> Self {
205 Self::new()
206 }
207}