oxirs_stream/patch/
result.rs1use crate::{PatchOperation, RdfPatch};
4use anyhow::Result;
5use chrono::{DateTime, Utc};
6use std::fmt;
7use tracing::debug;
8
9pub struct PatchResult {
10 pub patch_id: String,
11 pub total_operations: usize,
12 pub operations_applied: usize,
13 pub errors: Vec<String>,
14 pub applied_at: DateTime<Utc>,
15}
16
17impl Default for PatchResult {
18 fn default() -> Self {
19 Self::new()
20 }
21}
22
23impl PatchResult {
24 pub fn new() -> Self {
25 Self {
26 patch_id: String::new(),
27 total_operations: 0,
28 operations_applied: 0,
29 errors: Vec::new(),
30 applied_at: Utc::now(),
31 }
32 }
33
34 pub fn is_success(&self) -> bool {
35 self.errors.is_empty() && self.operations_applied == self.total_operations
36 }
37
38 pub fn success_rate(&self) -> f64 {
39 if self.total_operations == 0 {
40 1.0
41 } else {
42 self.operations_applied as f64 / self.total_operations as f64
43 }
44 }
45}
46
47impl fmt::Display for PatchResult {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 write!(
50 f,
51 "Patch {} applied: {}/{} operations ({:.1}% success)",
52 self.patch_id,
53 self.operations_applied,
54 self.total_operations,
55 self.success_rate() * 100.0
56 )
57 }
58}
59
60pub fn create_patch_from_sparql(update: &str) -> Result<RdfPatch> {
62 let mut delta_computer = crate::delta::DeltaComputer::new();
63 delta_computer.sparql_to_patch(update)
64}
65
66pub fn create_transactional_patch(operations: Vec<PatchOperation>) -> RdfPatch {
68 let mut patch = RdfPatch::new();
69 let transaction_id = uuid::Uuid::new_v4().to_string();
70
71 patch.add_operation(PatchOperation::TransactionBegin {
73 transaction_id: Some(transaction_id.clone()),
74 });
75 patch.transaction_id = Some(transaction_id);
76
77 for op in operations {
79 patch.add_operation(op);
80 }
81
82 patch.add_operation(PatchOperation::TransactionCommit);
84
85 patch
86}
87
88pub fn create_reverse_patch(patch: &RdfPatch) -> Result<RdfPatch> {
90 let mut reverse_patch = RdfPatch::new();
91 reverse_patch.id = format!("{}-reverse", patch.id);
92
93 reverse_patch.headers = patch.headers.clone();
95 reverse_patch.prefixes = patch.prefixes.clone();
96
97 let mut reversing_transaction = false;
99 let mut transaction_operations = Vec::new();
100
101 for operation in patch.operations.iter().rev() {
103 let reverse_operation = match operation {
104 PatchOperation::Add {
105 subject,
106 predicate,
107 object,
108 } => PatchOperation::Delete {
109 subject: subject.clone(),
110 predicate: predicate.clone(),
111 object: object.clone(),
112 },
113 PatchOperation::Delete {
114 subject,
115 predicate,
116 object,
117 } => PatchOperation::Add {
118 subject: subject.clone(),
119 predicate: predicate.clone(),
120 object: object.clone(),
121 },
122 PatchOperation::AddGraph { graph } => PatchOperation::DeleteGraph {
123 graph: graph.clone(),
124 },
125 PatchOperation::DeleteGraph { graph } => PatchOperation::AddGraph {
126 graph: graph.clone(),
127 },
128 PatchOperation::AddPrefix {
129 prefix,
130 namespace: _namespace,
131 } => PatchOperation::DeletePrefix {
132 prefix: prefix.clone(),
133 },
134 PatchOperation::DeletePrefix { prefix } => {
135 reverse_patch.add_operation(PatchOperation::Header {
138 key: "warning".to_string(),
139 value: format!("Cannot reverse prefix deletion for '{prefix}'"),
140 });
141 continue;
142 }
143 PatchOperation::TransactionBegin { transaction_id: _ } => {
144 reversing_transaction = false;
146 for op in transaction_operations.drain(..) {
148 reverse_patch.add_operation(op);
149 }
150 PatchOperation::TransactionCommit
151 }
152 PatchOperation::TransactionCommit => {
153 reversing_transaction = true;
155 PatchOperation::TransactionBegin {
156 transaction_id: patch.transaction_id.clone(),
157 }
158 }
159 PatchOperation::TransactionAbort => {
160 continue;
162 }
163 PatchOperation::Header { key, value } => PatchOperation::Header {
164 key: format!("reversed-{key}"),
165 value: value.clone(),
166 },
167 };
168
169 if reversing_transaction && !matches!(operation, PatchOperation::TransactionCommit) {
170 transaction_operations.push(reverse_operation);
171 } else {
172 reverse_patch.add_operation(reverse_operation);
173 }
174 }
175
176 debug!(
177 "Created reverse patch with {} operations",
178 reverse_patch.operations.len()
179 );
180 Ok(reverse_patch)
181}
182
183pub fn merge_patches(patches: &[RdfPatch]) -> Result<RdfPatch> {
185 let mut merged = RdfPatch::new();
186 merged.id = format!("merged-{}", uuid::Uuid::new_v4());
187
188 for patch in patches {
189 for operation in &patch.operations {
190 merged.add_operation(operation.clone());
191 }
192 }
193
194 debug!(
195 "Merged {} patches into {} operations",
196 patches.len(),
197 merged.operations.len()
198 );
199 Ok(merged)
200}
201
202pub fn optimize_patch(patch: &RdfPatch) -> Result<RdfPatch> {
204 let mut optimized = RdfPatch::new();
205 optimized.id = format!("{}-optimized", patch.id);
206
207 let mut seen_operations = std::collections::HashSet::new();
208
209 for operation in &patch.operations {
210 let operation_key = format!("{operation:?}");
211
212 if seen_operations.contains(&operation_key) {
214 continue;
215 }
216
217 seen_operations.insert(operation_key);
218 optimized.add_operation(operation.clone());
219 }
220
221 debug!(
222 "Optimized patch from {} to {} operations",
223 patch.operations.len(),
224 optimized.operations.len()
225 );
226
227 Ok(optimized)
228}
229
230pub fn validate_patch(patch: &RdfPatch) -> Result<Vec<String>> {
232 let mut warnings = Vec::new();
233
234 let mut adds = std::collections::HashSet::new();
236 let mut deletes = std::collections::HashSet::new();
237
238 for operation in &patch.operations {
239 match operation {
240 PatchOperation::Add {
241 subject,
242 predicate,
243 object,
244 } => {
245 let triple = (subject.clone(), predicate.clone(), object.clone());
246 if deletes.contains(&triple) {
247 warnings.push(format!("Triple added after being deleted: {triple:?}"));
248 }
249 adds.insert(triple);
250 }
251 PatchOperation::Delete {
252 subject,
253 predicate,
254 object,
255 } => {
256 let triple = (subject.clone(), predicate.clone(), object.clone());
257 if !adds.contains(&triple) {
258 warnings.push(format!("Triple deleted without prior addition: {triple:?}"));
259 }
260 deletes.insert(triple);
261 }
262 _ => {} }
264 }
265
266 Ok(warnings)
267}