Skip to main content

oxirs_stream/patch/
result.rs

1//! Patch result and reporting
2
3use crate::{PatchOperation, RdfPatch};
4use anyhow::Result;
5use chrono::{DateTime, Utc};
6use std::fmt;
7use tracing::debug;
8
9pub struct PatchResult {
10    pub patch_id: String,
11    pub total_operations: usize,
12    pub operations_applied: usize,
13    pub errors: Vec<String>,
14    pub applied_at: DateTime<Utc>,
15}
16
17impl Default for PatchResult {
18    fn default() -> Self {
19        Self::new()
20    }
21}
22
23impl PatchResult {
24    pub fn new() -> Self {
25        Self {
26            patch_id: String::new(),
27            total_operations: 0,
28            operations_applied: 0,
29            errors: Vec::new(),
30            applied_at: Utc::now(),
31        }
32    }
33
34    pub fn is_success(&self) -> bool {
35        self.errors.is_empty() && self.operations_applied == self.total_operations
36    }
37
38    pub fn success_rate(&self) -> f64 {
39        if self.total_operations == 0 {
40            1.0
41        } else {
42            self.operations_applied as f64 / self.total_operations as f64
43        }
44    }
45}
46
47impl fmt::Display for PatchResult {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        write!(
50            f,
51            "Patch {} applied: {}/{} operations ({:.1}% success)",
52            self.patch_id,
53            self.operations_applied,
54            self.total_operations,
55            self.success_rate() * 100.0
56        )
57    }
58}
59
60/// Create RDF Patch from SPARQL Update
61pub fn create_patch_from_sparql(update: &str) -> Result<RdfPatch> {
62    let mut delta_computer = crate::delta::DeltaComputer::new();
63    delta_computer.sparql_to_patch(update)
64}
65
66/// Create transactional patch
67pub fn create_transactional_patch(operations: Vec<PatchOperation>) -> RdfPatch {
68    let mut patch = RdfPatch::new();
69    let transaction_id = uuid::Uuid::new_v4().to_string();
70
71    // Add transaction begin
72    patch.add_operation(PatchOperation::TransactionBegin {
73        transaction_id: Some(transaction_id.clone()),
74    });
75    patch.transaction_id = Some(transaction_id);
76
77    // Add all operations
78    for op in operations {
79        patch.add_operation(op);
80    }
81
82    // Add transaction commit
83    patch.add_operation(PatchOperation::TransactionCommit);
84
85    patch
86}
87
88/// Create reverse patch from an existing patch
89pub fn create_reverse_patch(patch: &RdfPatch) -> Result<RdfPatch> {
90    let mut reverse_patch = RdfPatch::new();
91    reverse_patch.id = format!("{}-reverse", patch.id);
92
93    // Copy headers and prefixes
94    reverse_patch.headers = patch.headers.clone();
95    reverse_patch.prefixes = patch.prefixes.clone();
96
97    // Track if we're reversing a transaction
98    let mut reversing_transaction = false;
99    let mut transaction_operations = Vec::new();
100
101    // Reverse the operations and their order
102    for operation in patch.operations.iter().rev() {
103        let reverse_operation = match operation {
104            PatchOperation::Add {
105                subject,
106                predicate,
107                object,
108            } => PatchOperation::Delete {
109                subject: subject.clone(),
110                predicate: predicate.clone(),
111                object: object.clone(),
112            },
113            PatchOperation::Delete {
114                subject,
115                predicate,
116                object,
117            } => PatchOperation::Add {
118                subject: subject.clone(),
119                predicate: predicate.clone(),
120                object: object.clone(),
121            },
122            PatchOperation::AddGraph { graph } => PatchOperation::DeleteGraph {
123                graph: graph.clone(),
124            },
125            PatchOperation::DeleteGraph { graph } => PatchOperation::AddGraph {
126                graph: graph.clone(),
127            },
128            PatchOperation::AddPrefix {
129                prefix,
130                namespace: _namespace,
131            } => PatchOperation::DeletePrefix {
132                prefix: prefix.clone(),
133            },
134            PatchOperation::DeletePrefix { prefix } => {
135                // Can't reverse a prefix deletion without knowing the namespace
136                // Skip or add as header
137                reverse_patch.add_operation(PatchOperation::Header {
138                    key: "warning".to_string(),
139                    value: format!("Cannot reverse prefix deletion for '{prefix}'"),
140                });
141                continue;
142            }
143            PatchOperation::TransactionBegin { transaction_id: _ } => {
144                // End of transaction (we're reversing)
145                reversing_transaction = false;
146                // Add all collected operations
147                for op in transaction_operations.drain(..) {
148                    reverse_patch.add_operation(op);
149                }
150                PatchOperation::TransactionCommit
151            }
152            PatchOperation::TransactionCommit => {
153                // Start of transaction (we're reversing)
154                reversing_transaction = true;
155                PatchOperation::TransactionBegin {
156                    transaction_id: patch.transaction_id.clone(),
157                }
158            }
159            PatchOperation::TransactionAbort => {
160                // Transaction was aborted, no need to reverse
161                continue;
162            }
163            PatchOperation::Header { key, value } => PatchOperation::Header {
164                key: format!("reversed-{key}"),
165                value: value.clone(),
166            },
167        };
168
169        if reversing_transaction && !matches!(operation, PatchOperation::TransactionCommit) {
170            transaction_operations.push(reverse_operation);
171        } else {
172            reverse_patch.add_operation(reverse_operation);
173        }
174    }
175
176    debug!(
177        "Created reverse patch with {} operations",
178        reverse_patch.operations.len()
179    );
180    Ok(reverse_patch)
181}
182
183/// Merge multiple patches into one
184pub fn merge_patches(patches: &[RdfPatch]) -> Result<RdfPatch> {
185    let mut merged = RdfPatch::new();
186    merged.id = format!("merged-{}", uuid::Uuid::new_v4());
187
188    for patch in patches {
189        for operation in &patch.operations {
190            merged.add_operation(operation.clone());
191        }
192    }
193
194    debug!(
195        "Merged {} patches into {} operations",
196        patches.len(),
197        merged.operations.len()
198    );
199    Ok(merged)
200}
201
202/// Optimize patch by removing redundant operations
203pub fn optimize_patch(patch: &RdfPatch) -> Result<RdfPatch> {
204    let mut optimized = RdfPatch::new();
205    optimized.id = format!("{}-optimized", patch.id);
206
207    let mut seen_operations = std::collections::HashSet::new();
208
209    for operation in &patch.operations {
210        let operation_key = format!("{operation:?}");
211
212        // Skip duplicate operations
213        if seen_operations.contains(&operation_key) {
214            continue;
215        }
216
217        seen_operations.insert(operation_key);
218        optimized.add_operation(operation.clone());
219    }
220
221    debug!(
222        "Optimized patch from {} to {} operations",
223        patch.operations.len(),
224        optimized.operations.len()
225    );
226
227    Ok(optimized)
228}
229
230/// Validate patch consistency
231pub fn validate_patch(patch: &RdfPatch) -> Result<Vec<String>> {
232    let mut warnings = Vec::new();
233
234    // Check for conflicting operations
235    let mut adds = std::collections::HashSet::new();
236    let mut deletes = std::collections::HashSet::new();
237
238    for operation in &patch.operations {
239        match operation {
240            PatchOperation::Add {
241                subject,
242                predicate,
243                object,
244            } => {
245                let triple = (subject.clone(), predicate.clone(), object.clone());
246                if deletes.contains(&triple) {
247                    warnings.push(format!("Triple added after being deleted: {triple:?}"));
248                }
249                adds.insert(triple);
250            }
251            PatchOperation::Delete {
252                subject,
253                predicate,
254                object,
255            } => {
256                let triple = (subject.clone(), predicate.clone(), object.clone());
257                if !adds.contains(&triple) {
258                    warnings.push(format!("Triple deleted without prior addition: {triple:?}"));
259                }
260                deletes.insert(triple);
261            }
262            _ => {} // Graph operations don't conflict in the same way
263        }
264    }
265
266    Ok(warnings)
267}