Skip to main content

oxirs_stream/patch/
compressor.rs

1//! Patch compression and optimization
2
3use super::{PatchParser, PatchSerializer};
4use crate::{PatchOperation, RdfPatch};
5use anyhow::Result;
6use flate2::{read::GzDecoder, write::GzEncoder, Compression};
7use std::collections::HashMap;
8use std::io::{Read, Write};
9use tracing::info;
10
/// Configuration for compressing RDF patches.
///
/// Construct one with [`PatchCompressor::new`] (or `Default`) and adjust it with
/// the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct PatchCompressor {
    /// Gzip level handed to the gzip encoder; the builder clamps it to at most 9.
    compression_level: u32,
    /// Whether the dictionary pass runs before gzip (and is undone after gunzip).
    enable_dictionary: bool,
    /// Whether `compress_with_prefixes` rewrites URIs; when `false` it only
    /// renames the patch id.
    prefix_compression: bool,
}
16
17impl PatchCompressor {
18    pub fn new() -> Self {
19        Self {
20            compression_level: 6,
21            enable_dictionary: true,
22            prefix_compression: true,
23        }
24    }
25
26    pub fn with_compression_level(mut self, level: u32) -> Self {
27        self.compression_level = level.min(9);
28        self
29    }
30
31    pub fn with_dictionary_compression(mut self, enabled: bool) -> Self {
32        self.enable_dictionary = enabled;
33        self
34    }
35
36    pub fn with_prefix_compression(mut self, enabled: bool) -> Self {
37        self.prefix_compression = enabled;
38        self
39    }
40
41    /// Compress patch using gzip compression
42    pub fn compress_patch(&self, patch: &RdfPatch) -> Result<Vec<u8>> {
43        // Serialize patch to string
44        let serializer = PatchSerializer::new().with_pretty_print(false);
45        let patch_str = serializer.serialize(patch)?;
46        let original_len = patch_str.len();
47
48        // Apply dictionary compression if enabled
49        let optimized_str = if self.enable_dictionary {
50            self.apply_dictionary_compression(&patch_str)?
51        } else {
52            patch_str
53        };
54
55        // Apply gzip compression
56        let mut encoder = GzEncoder::new(Vec::new(), Compression::new(self.compression_level));
57        encoder.write_all(optimized_str.as_bytes())?;
58        let compressed = encoder.finish()?;
59
60        info!(
61            "Compressed patch: {} -> {} bytes ({:.1}% reduction)",
62            original_len,
63            compressed.len(),
64            (1.0 - compressed.len() as f64 / original_len as f64) * 100.0
65        );
66
67        Ok(compressed)
68    }
69
70    /// Decompress patch from compressed bytes
71    pub fn decompress_patch(&self, compressed_data: &[u8]) -> Result<RdfPatch> {
72        // Decompress gzip
73        let mut decoder = GzDecoder::new(compressed_data);
74        let mut decompressed = String::new();
75        decoder.read_to_string(&mut decompressed)?;
76
77        // Apply dictionary decompression if needed
78        let patch_str = if self.enable_dictionary {
79            self.apply_dictionary_decompression(&decompressed)?
80        } else {
81            decompressed
82        };
83
84        // Parse patch
85        let mut parser = PatchParser::new();
86        parser.parse(&patch_str)
87    }
88
89    fn apply_dictionary_compression(&self, patch_str: &str) -> Result<String> {
90        // Build frequency dictionary of common terms
91        let mut word_freq = HashMap::new();
92        for word in patch_str.split_whitespace() {
93            *word_freq.entry(word.to_string()).or_insert(0) += 1;
94        }
95
96        // Create dictionary of most frequent terms
97        let mut freq_words: Vec<_> = word_freq.into_iter().collect();
98        freq_words.sort_by(|a, b| b.1.cmp(&a.1));
99
100        let mut dictionary = HashMap::new();
101        let mut compressed = patch_str.to_string();
102
103        // Replace most frequent words with short codes
104        for (i, (word, freq)) in freq_words.iter().take(256).enumerate() {
105            if word.len() > 3 && *freq > 2 {
106                let code = format!("#{i:02x}");
107                dictionary.insert(code.clone(), word.clone());
108                compressed = compressed.replace(word, &code);
109            }
110        }
111
112        // Prepend dictionary to compressed string
113        let mut dict_header = String::new();
114        for (code, word) in dictionary {
115            dict_header.push_str(&format!("{code}={word}\n"));
116        }
117        dict_header.push_str("---\n");
118        dict_header.push_str(&compressed);
119
120        Ok(dict_header)
121    }
122
123    fn apply_dictionary_decompression(&self, compressed_str: &str) -> Result<String> {
124        if let Some(separator_pos) = compressed_str.find("---\n") {
125            let (dict_part, content_part) = compressed_str.split_at(separator_pos);
126            let content = &content_part[4..]; // Skip "---\n"
127
128            let mut dictionary = HashMap::new();
129            for line in dict_part.lines() {
130                if let Some(eq_pos) = line.find('=') {
131                    let code = &line[..eq_pos];
132                    let word = &line[eq_pos + 1..];
133                    dictionary.insert(code, word);
134                }
135            }
136
137            let mut decompressed = content.to_string();
138            for (code, word) in dictionary {
139                decompressed = decompressed.replace(code, word);
140            }
141
142            Ok(decompressed)
143        } else {
144            Ok(compressed_str.to_string())
145        }
146    }
147
148    /// Compress using prefix compression for common namespaces
149    pub fn compress_with_prefixes(&self, patch: &RdfPatch) -> Result<RdfPatch> {
150        let mut compressed = patch.clone();
151        compressed.id = format!("{}-prefix-compressed", patch.id);
152
153        if !self.prefix_compression {
154            return Ok(compressed);
155        }
156
157        // Build frequency map of URI prefixes
158        let mut prefix_freq = HashMap::new();
159        for operation in &patch.operations {
160            self.collect_uris_from_operation(operation, &mut prefix_freq);
161        }
162
163        // Find common prefixes
164        let mut common_prefixes = HashMap::new();
165        for (uri, freq) in prefix_freq {
166            if freq > 2 {
167                if let Some(prefix) = self.extract_namespace_prefix(&uri) {
168                    if prefix.len() > 10 {
169                        let short_prefix = format!("ns{}", common_prefixes.len());
170                        common_prefixes.insert(prefix, short_prefix);
171                    }
172                }
173            }
174        }
175
176        // Add prefix declarations to patch
177        for (namespace, prefix) in &common_prefixes {
178            compressed.add_operation(PatchOperation::AddPrefix {
179                prefix: prefix.clone(),
180                namespace: namespace.clone(),
181            });
182        }
183
184        // Replace URIs with prefixed forms
185        for operation in &mut compressed.operations {
186            self.apply_prefix_compression_to_operation(operation, &common_prefixes);
187        }
188
189        info!(
190            "Applied prefix compression: {} prefixes defined",
191            common_prefixes.len()
192        );
193        Ok(compressed)
194    }
195
196    fn collect_uris_from_operation(
197        &self,
198        operation: &PatchOperation,
199        prefix_freq: &mut HashMap<String, usize>,
200    ) {
201        match operation {
202            PatchOperation::Add {
203                subject,
204                predicate,
205                object,
206            } => {
207                *prefix_freq.entry(subject.clone()).or_insert(0) += 1;
208                *prefix_freq.entry(predicate.clone()).or_insert(0) += 1;
209                *prefix_freq.entry(object.clone()).or_insert(0) += 1;
210            }
211            PatchOperation::Delete {
212                subject,
213                predicate,
214                object,
215            } => {
216                *prefix_freq.entry(subject.clone()).or_insert(0) += 1;
217                *prefix_freq.entry(predicate.clone()).or_insert(0) += 1;
218                *prefix_freq.entry(object.clone()).or_insert(0) += 1;
219            }
220            PatchOperation::AddGraph { graph } | PatchOperation::DeleteGraph { graph } => {
221                *prefix_freq.entry(graph.clone()).or_insert(0) += 1;
222            }
223            _ => {}
224        }
225    }
226
227    fn extract_namespace_prefix(&self, uri: &str) -> Option<String> {
228        // Extract namespace part of URI (everything up to last # or /)
229        if let Some(pos) = uri.rfind('#') {
230            Some(uri[..pos + 1].to_string())
231        } else {
232            uri.rfind('/').map(|pos| uri[..pos + 1].to_string())
233        }
234    }
235
236    fn apply_prefix_compression_to_operation(
237        &self,
238        operation: &mut PatchOperation,
239        prefixes: &HashMap<String, String>,
240    ) {
241        match operation {
242            PatchOperation::Add {
243                subject,
244                predicate,
245                object,
246            } => {
247                *subject = self.compress_uri_with_prefixes(subject, prefixes);
248                *predicate = self.compress_uri_with_prefixes(predicate, prefixes);
249                *object = self.compress_uri_with_prefixes(object, prefixes);
250            }
251            PatchOperation::Delete {
252                subject,
253                predicate,
254                object,
255            } => {
256                *subject = self.compress_uri_with_prefixes(subject, prefixes);
257                *predicate = self.compress_uri_with_prefixes(predicate, prefixes);
258                *object = self.compress_uri_with_prefixes(object, prefixes);
259            }
260            PatchOperation::AddGraph { graph } | PatchOperation::DeleteGraph { graph } => {
261                *graph = self.compress_uri_with_prefixes(graph, prefixes);
262            }
263            _ => {}
264        }
265    }
266
267    fn compress_uri_with_prefixes(&self, uri: &str, prefixes: &HashMap<String, String>) -> String {
268        for (namespace, prefix) in prefixes {
269            if uri.starts_with(namespace) {
270                let local_name = &uri[namespace.len()..];
271                return format!("{prefix}:{local_name}");
272            }
273        }
274        uri.to_string()
275    }
276}
277
278impl Default for PatchCompressor {
279    fn default() -> Self {
280        Self::new()
281    }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::patch::context::{apply_patch_with_context, PatchContext};
    use crate::patch::result::{
        create_reverse_patch, create_transactional_patch, optimize_patch, validate_patch,
    };

    // ---- PatchCompressor ---------------------------------------------------

    /// Levels above 9 are clamped by the builder.
    #[test]
    fn test_compression_level_is_clamped() {
        let compressor = PatchCompressor::new().with_compression_level(42);
        assert_eq!(compressor.compression_level, 9);
    }

    /// Dictionary encoding followed by decoding reproduces the input text.
    #[test]
    fn test_dictionary_round_trip() {
        let compressor = PatchCompressor::new();
        let line = "A <http://example.org/subject> <http://example.org/predicate> \"value\" .\n";
        let text = line.repeat(4);

        let encoded = compressor.apply_dictionary_compression(&text).unwrap();
        assert!(encoded.contains("---\n"));

        let decoded = compressor.apply_dictionary_decompression(&encoded).unwrap();
        assert_eq!(decoded, text);
    }

    /// Text without a dictionary header passes through decoding unchanged.
    #[test]
    fn test_dictionary_decompression_without_header() {
        let compressor = PatchCompressor::new();
        let text = "A s p o .\n";

        let decoded = compressor.apply_dictionary_decompression(text).unwrap();
        assert_eq!(decoded, text);
    }

    /// The namespace ends at the last `#`, or the last `/` when there is no `#`.
    #[test]
    fn test_extract_namespace_prefix() {
        let compressor = PatchCompressor::new();

        assert_eq!(
            compressor.extract_namespace_prefix("http://example.org/vocab#term"),
            Some("http://example.org/vocab#".to_string())
        );
        assert_eq!(
            compressor.extract_namespace_prefix("http://example.org/term"),
            Some("http://example.org/".to_string())
        );
        assert_eq!(compressor.extract_namespace_prefix("ex:term"), None);
    }

    /// A matching namespace is replaced by its prefix; other terms are kept.
    #[test]
    fn test_compress_uri_with_prefixes() {
        let compressor = PatchCompressor::new();
        let mut prefixes = HashMap::new();
        prefixes.insert("http://example.org/".to_string(), "ex".to_string());

        assert_eq!(
            compressor.compress_uri_with_prefixes("http://example.org/thing", &prefixes),
            "ex:thing"
        );
        assert_eq!(
            compressor.compress_uri_with_prefixes("urn:other", &prefixes),
            "urn:other"
        );
    }

    /// With prefix compression disabled only the patch id changes.
    #[test]
    fn test_prefix_compression_disabled() {
        let mut patch = RdfPatch::new();
        patch.add_operation(PatchOperation::Add {
            subject: "http://example.org/vocab/subject".to_string(),
            predicate: "http://example.org/vocab/predicate".to_string(),
            object: "\"plain literal\"".to_string(),
        });

        let compressor = PatchCompressor::new().with_prefix_compression(false);
        let compressed = compressor.compress_with_prefixes(&patch).unwrap();

        assert_eq!(compressed.id, format!("{}-prefix-compressed", patch.id));
        assert_eq!(compressed.operations.len(), patch.operations.len());
    }

    /// Terms seen more than twice are rewritten against one generated prefix.
    #[test]
    fn test_prefix_compression_rewrites_frequent_uris() {
        let mut patch = RdfPatch::new();
        for _ in 0..3 {
            patch.add_operation(PatchOperation::Add {
                subject: "http://example.org/vocab/subject".to_string(),
                predicate: "http://example.org/vocab/predicate".to_string(),
                object: "\"plain literal\"".to_string(),
            });
        }

        let compressed = PatchCompressor::new()
            .compress_with_prefixes(&patch)
            .unwrap();

        let declared = compressed
            .operations
            .iter()
            .filter(|op| matches!(op, PatchOperation::AddPrefix { .. }))
            .count();
        assert_eq!(declared, 1);

        let first_add = compressed
            .operations
            .iter()
            .find(|op| matches!(op, PatchOperation::Add { .. }));
        match first_add {
            Some(PatchOperation::Add {
                subject, object, ..
            }) => {
                assert!(subject.starts_with("ns"));
                assert!(subject.ends_with(":subject"));
                assert_eq!(object, "\"plain literal\"");
            }
            _ => panic!("Expected an Add operation"),
        }
    }

    // ---- Serializer, parser and patch utilities ----------------------------

    #[test]
    fn test_patch_serialization() {
        let mut patch = RdfPatch::new();
        patch.add_operation(PatchOperation::Header {
            key: "creator".to_string(),
            value: "test-suite".to_string(),
        });
        patch.add_operation(PatchOperation::TransactionBegin {
            transaction_id: Some("tx-123".to_string()),
        });
        patch.add_operation(PatchOperation::AddPrefix {
            prefix: "ex".to_string(),
            namespace: "http://example.org/".to_string(),
        });
        patch.add_operation(PatchOperation::Add {
            subject: "http://example.org/subject".to_string(),
            predicate: "http://example.org/predicate".to_string(),
            object: "\"Object literal\"".to_string(),
        });
        patch.add_operation(PatchOperation::Delete {
            subject: "http://example.org/subject2".to_string(),
            predicate: "http://example.org/predicate2".to_string(),
            object: "http://example.org/object2".to_string(),
        });
        patch.add_operation(PatchOperation::TransactionCommit);

        let serializer = PatchSerializer::new();
        let serialized = serializer
            .serialize(&patch)
            .expect("serialization should succeed");

        assert!(serialized.contains("H creator test-suite"));
        assert!(serialized.contains("TX tx-123"));
        assert!(serialized.contains("PA ex:"));
        assert!(serialized.contains("A "));
        assert!(serialized.contains("D "));
        assert!(serialized.contains("TC"));
        assert!(serialized.contains("@prefix"));
    }

    #[test]
    fn test_patch_parsing() {
        let patch_content = r#"
@prefix ex: <http://example.org/> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .

H creator test-parser .
TX tx-456 .
PA ex2: <http://example2.org/> .
A ex:subject ex:predicate "Object literal" .
D ex:subject2 ex:predicate2 ex:object2 .
GA ex:graph1 .
GD ex:graph2 .
PD old: .
TC .
"#;

        let mut parser = PatchParser::new();
        let patch = parser
            .parse(patch_content)
            .expect("parsing should succeed");

        assert_eq!(patch.operations.len(), 9);

        // Check header was captured
        assert_eq!(
            patch.headers.get("creator"),
            Some(&"test-parser".to_string())
        );

        // Check transaction ID was captured
        assert_eq!(patch.transaction_id, Some("tx-456".to_string()));

        // Check prefix was captured
        assert_eq!(
            patch.prefixes.get("ex2"),
            Some(&"http://example2.org/".to_string())
        );

        match &patch.operations[0] {
            PatchOperation::Header { key, value } => {
                assert_eq!(key, "creator");
                assert_eq!(value, "test-parser");
            }
            _ => panic!("Expected Header operation"),
        }

        match &patch.operations[1] {
            PatchOperation::TransactionBegin { transaction_id } => {
                assert_eq!(transaction_id, &Some("tx-456".to_string()));
            }
            _ => panic!("Expected TransactionBegin operation"),
        }
    }

    #[test]
    fn test_patch_round_trip() {
        let mut original_patch = RdfPatch::new();
        original_patch.add_operation(PatchOperation::Add {
            subject: "http://example.org/subject".to_string(),
            predicate: "http://example.org/predicate".to_string(),
            object: "\"Test object\"".to_string(),
        });

        // Serialize to string
        let serialized = original_patch.to_rdf_patch_format().unwrap();

        // Parse back from string
        let parsed_patch = RdfPatch::from_rdf_patch_format(&serialized).unwrap();

        // Check that we get the same operations
        assert_eq!(
            original_patch.operations.len(),
            parsed_patch.operations.len()
        );

        match (&original_patch.operations[0], &parsed_patch.operations[0]) {
            (
                PatchOperation::Add {
                    subject: s1,
                    predicate: p1,
                    object: o1,
                },
                PatchOperation::Add {
                    subject: s2,
                    predicate: p2,
                    object: o2,
                },
            ) => {
                assert_eq!(s1, s2);
                assert_eq!(p1, p2);
                assert_eq!(o1, o2);
            }
            _ => panic!("Operations don't match"),
        }
    }

    #[test]
    fn test_reverse_patch() {
        let mut patch = RdfPatch::new();
        patch.add_operation(PatchOperation::TransactionBegin {
            transaction_id: Some("tx-789".to_string()),
        });
        patch.add_operation(PatchOperation::Add {
            subject: "http://example.org/s".to_string(),
            predicate: "http://example.org/p".to_string(),
            object: "http://example.org/o".to_string(),
        });
        patch.add_operation(PatchOperation::AddGraph {
            graph: "http://example.org/graph".to_string(),
        });
        patch.add_operation(PatchOperation::TransactionCommit);
        patch.transaction_id = Some("tx-789".to_string());

        let reverse = create_reverse_patch(&patch).unwrap();

        // Should have TX, two reversed operations, and TC
        assert!(reverse.operations.len() >= 4);

        // First should be transaction begin (reversing the commit)
        match &reverse.operations[0] {
            PatchOperation::TransactionBegin { .. } => {}
            _ => panic!("Expected TransactionBegin operation"),
        }

        // Find the reversed operations
        let has_delete_graph = reverse.operations.iter().any(|op| {
            matches!(op, PatchOperation::DeleteGraph { graph } if graph == "http://example.org/graph")
        });
        let has_delete_triple = reverse.operations.iter().any(|op| {
            matches!(op, PatchOperation::Delete { subject, .. } if subject == "http://example.org/s")
        });

        assert!(has_delete_graph);
        assert!(has_delete_triple);

        // Last should be transaction commit
        match reverse.operations.last() {
            Some(PatchOperation::TransactionCommit) => {}
            _ => panic!("Expected TransactionCommit as last operation"),
        }
    }

    #[test]
    fn test_patch_optimization() {
        let mut patch = RdfPatch::new();
        let operation = PatchOperation::Add {
            subject: "http://example.org/s".to_string(),
            predicate: "http://example.org/p".to_string(),
            object: "http://example.org/o".to_string(),
        };

        // Add the same operation twice
        patch.add_operation(operation.clone());
        patch.add_operation(operation);

        let optimized = optimize_patch(&patch).unwrap();

        // Should remove duplicate
        assert_eq!(optimized.operations.len(), 1);
    }

    #[test]
    fn test_transactional_patch() {
        let operations = vec![
            PatchOperation::Add {
                subject: "s1".to_string(),
                predicate: "p1".to_string(),
                object: "o1".to_string(),
            },
            PatchOperation::Delete {
                subject: "s2".to_string(),
                predicate: "p2".to_string(),
                object: "o2".to_string(),
            },
        ];

        let patch = create_transactional_patch(operations);

        // Should have TX + 2 operations + TC = 4 total
        assert_eq!(patch.operations.len(), 4);

        // First should be transaction begin
        assert!(matches!(
            &patch.operations[0],
            PatchOperation::TransactionBegin { .. }
        ));

        // Last should be transaction commit
        assert!(matches!(
            &patch.operations[3],
            PatchOperation::TransactionCommit
        ));

        // Should have transaction ID set
        assert!(patch.transaction_id.is_some());
    }

    #[test]
    fn test_patch_validation() {
        let mut patch = RdfPatch::new();
        patch.add_operation(PatchOperation::Delete {
            subject: "http://example.org/s".to_string(),
            predicate: "http://example.org/p".to_string(),
            object: "http://example.org/o".to_string(),
        });

        let warnings = validate_patch(&patch).unwrap();

        // Should warn about deleting without prior addition
        assert!(!warnings.is_empty());
        assert!(warnings[0].contains("deleted without prior addition"));
    }

    #[test]
    fn test_patch_application() {
        let mut patch = RdfPatch::new();
        patch.add_operation(PatchOperation::Add {
            subject: "http://example.org/s".to_string(),
            predicate: "http://example.org/p".to_string(),
            object: "http://example.org/o".to_string(),
        });

        let context = PatchContext {
            strict_mode: false,
            validate_operations: true,
            dry_run: true,
        };

        let result = apply_patch_with_context(&patch, &context).unwrap();

        assert_eq!(result.total_operations, 1);
        assert_eq!(result.operations_applied, 1);
        assert!(result.is_success());
        assert_eq!(result.success_rate(), 1.0);
    }
}