Skip to main content

oxirs_stream/patch/
compressor.rs

1//! Patch compression and optimization
2
3use super::{PatchParser, PatchSerializer};
4use crate::{PatchOperation, RdfPatch};
5use anyhow::{anyhow, Result};
6use std::collections::HashMap;
7use tracing::info;
8
9pub struct PatchCompressor {
10    compression_level: u32,
11    enable_dictionary: bool,
12    prefix_compression: bool,
13}
14
15impl PatchCompressor {
16    pub fn new() -> Self {
17        Self {
18            compression_level: 6,
19            enable_dictionary: true,
20            prefix_compression: true,
21        }
22    }
23
24    pub fn with_compression_level(mut self, level: u32) -> Self {
25        self.compression_level = level.min(9);
26        self
27    }
28
29    pub fn with_dictionary_compression(mut self, enabled: bool) -> Self {
30        self.enable_dictionary = enabled;
31        self
32    }
33
34    pub fn with_prefix_compression(mut self, enabled: bool) -> Self {
35        self.prefix_compression = enabled;
36        self
37    }
38
39    /// Compress patch using gzip compression
40    pub fn compress_patch(&self, patch: &RdfPatch) -> Result<Vec<u8>> {
41        // Serialize patch to string
42        let serializer = PatchSerializer::new().with_pretty_print(false);
43        let patch_str = serializer.serialize(patch)?;
44        let original_len = patch_str.len();
45
46        // Apply dictionary compression if enabled
47        let optimized_str = if self.enable_dictionary {
48            self.apply_dictionary_compression(&patch_str)?
49        } else {
50            patch_str
51        };
52
53        // Apply gzip compression
54        let compressed =
55            oxiarc_deflate::gzip_compress(optimized_str.as_bytes(), self.compression_level as u8)
56                .map_err(|e| anyhow!("Gzip compression failed: {e}"))?;
57
58        info!(
59            "Compressed patch: {} -> {} bytes ({:.1}% reduction)",
60            original_len,
61            compressed.len(),
62            (1.0 - compressed.len() as f64 / original_len as f64) * 100.0
63        );
64
65        Ok(compressed)
66    }
67
68    /// Decompress patch from compressed bytes
69    pub fn decompress_patch(&self, compressed_data: &[u8]) -> Result<RdfPatch> {
70        // Decompress gzip
71        let decompressed_bytes = oxiarc_deflate::gzip_decompress(compressed_data)
72            .map_err(|e| anyhow!("Gzip decompression failed: {e}"))?;
73        let decompressed = String::from_utf8(decompressed_bytes)
74            .map_err(|e| anyhow!("Decompressed patch is not valid UTF-8: {e}"))?;
75
76        // Apply dictionary decompression if needed
77        let patch_str = if self.enable_dictionary {
78            self.apply_dictionary_decompression(&decompressed)?
79        } else {
80            decompressed
81        };
82
83        // Parse patch
84        let mut parser = PatchParser::new();
85        parser.parse(&patch_str)
86    }
87
88    fn apply_dictionary_compression(&self, patch_str: &str) -> Result<String> {
89        // Build frequency dictionary of common terms
90        let mut word_freq = HashMap::new();
91        for word in patch_str.split_whitespace() {
92            *word_freq.entry(word.to_string()).or_insert(0) += 1;
93        }
94
95        // Create dictionary of most frequent terms
96        let mut freq_words: Vec<_> = word_freq.into_iter().collect();
97        freq_words.sort_by_key(|b| std::cmp::Reverse(b.1));
98
99        let mut dictionary = HashMap::new();
100        let mut compressed = patch_str.to_string();
101
102        // Replace most frequent words with short codes
103        for (i, (word, freq)) in freq_words.iter().take(256).enumerate() {
104            if word.len() > 3 && *freq > 2 {
105                let code = format!("#{i:02x}");
106                dictionary.insert(code.clone(), word.clone());
107                compressed = compressed.replace(word, &code);
108            }
109        }
110
111        // Prepend dictionary to compressed string
112        let mut dict_header = String::new();
113        for (code, word) in dictionary {
114            dict_header.push_str(&format!("{code}={word}\n"));
115        }
116        dict_header.push_str("---\n");
117        dict_header.push_str(&compressed);
118
119        Ok(dict_header)
120    }
121
122    fn apply_dictionary_decompression(&self, compressed_str: &str) -> Result<String> {
123        if let Some(separator_pos) = compressed_str.find("---\n") {
124            let (dict_part, content_part) = compressed_str.split_at(separator_pos);
125            let content = &content_part[4..]; // Skip "---\n"
126
127            let mut dictionary = HashMap::new();
128            for line in dict_part.lines() {
129                if let Some(eq_pos) = line.find('=') {
130                    let code = &line[..eq_pos];
131                    let word = &line[eq_pos + 1..];
132                    dictionary.insert(code, word);
133                }
134            }
135
136            let mut decompressed = content.to_string();
137            for (code, word) in dictionary {
138                decompressed = decompressed.replace(code, word);
139            }
140
141            Ok(decompressed)
142        } else {
143            Ok(compressed_str.to_string())
144        }
145    }
146
147    /// Compress using prefix compression for common namespaces
148    pub fn compress_with_prefixes(&self, patch: &RdfPatch) -> Result<RdfPatch> {
149        let mut compressed = patch.clone();
150        compressed.id = format!("{}-prefix-compressed", patch.id);
151
152        if !self.prefix_compression {
153            return Ok(compressed);
154        }
155
156        // Build frequency map of URI prefixes
157        let mut prefix_freq = HashMap::new();
158        for operation in &patch.operations {
159            self.collect_uris_from_operation(operation, &mut prefix_freq);
160        }
161
162        // Find common prefixes
163        let mut common_prefixes = HashMap::new();
164        for (uri, freq) in prefix_freq {
165            if freq > 2 {
166                if let Some(prefix) = self.extract_namespace_prefix(&uri) {
167                    if prefix.len() > 10 {
168                        let short_prefix = format!("ns{}", common_prefixes.len());
169                        common_prefixes.insert(prefix, short_prefix);
170                    }
171                }
172            }
173        }
174
175        // Add prefix declarations to patch
176        for (namespace, prefix) in &common_prefixes {
177            compressed.add_operation(PatchOperation::AddPrefix {
178                prefix: prefix.clone(),
179                namespace: namespace.clone(),
180            });
181        }
182
183        // Replace URIs with prefixed forms
184        for operation in &mut compressed.operations {
185            self.apply_prefix_compression_to_operation(operation, &common_prefixes);
186        }
187
188        info!(
189            "Applied prefix compression: {} prefixes defined",
190            common_prefixes.len()
191        );
192        Ok(compressed)
193    }
194
195    fn collect_uris_from_operation(
196        &self,
197        operation: &PatchOperation,
198        prefix_freq: &mut HashMap<String, usize>,
199    ) {
200        match operation {
201            PatchOperation::Add {
202                subject,
203                predicate,
204                object,
205            } => {
206                *prefix_freq.entry(subject.clone()).or_insert(0) += 1;
207                *prefix_freq.entry(predicate.clone()).or_insert(0) += 1;
208                *prefix_freq.entry(object.clone()).or_insert(0) += 1;
209            }
210            PatchOperation::Delete {
211                subject,
212                predicate,
213                object,
214            } => {
215                *prefix_freq.entry(subject.clone()).or_insert(0) += 1;
216                *prefix_freq.entry(predicate.clone()).or_insert(0) += 1;
217                *prefix_freq.entry(object.clone()).or_insert(0) += 1;
218            }
219            PatchOperation::AddGraph { graph } | PatchOperation::DeleteGraph { graph } => {
220                *prefix_freq.entry(graph.clone()).or_insert(0) += 1;
221            }
222            _ => {}
223        }
224    }
225
226    fn extract_namespace_prefix(&self, uri: &str) -> Option<String> {
227        // Extract namespace part of URI (everything up to last # or /)
228        if let Some(pos) = uri.rfind('#') {
229            Some(uri[..pos + 1].to_string())
230        } else {
231            uri.rfind('/').map(|pos| uri[..pos + 1].to_string())
232        }
233    }
234
235    fn apply_prefix_compression_to_operation(
236        &self,
237        operation: &mut PatchOperation,
238        prefixes: &HashMap<String, String>,
239    ) {
240        match operation {
241            PatchOperation::Add {
242                subject,
243                predicate,
244                object,
245            } => {
246                *subject = self.compress_uri_with_prefixes(subject, prefixes);
247                *predicate = self.compress_uri_with_prefixes(predicate, prefixes);
248                *object = self.compress_uri_with_prefixes(object, prefixes);
249            }
250            PatchOperation::Delete {
251                subject,
252                predicate,
253                object,
254            } => {
255                *subject = self.compress_uri_with_prefixes(subject, prefixes);
256                *predicate = self.compress_uri_with_prefixes(predicate, prefixes);
257                *object = self.compress_uri_with_prefixes(object, prefixes);
258            }
259            PatchOperation::AddGraph { graph } | PatchOperation::DeleteGraph { graph } => {
260                *graph = self.compress_uri_with_prefixes(graph, prefixes);
261            }
262            _ => {}
263        }
264    }
265
266    fn compress_uri_with_prefixes(&self, uri: &str, prefixes: &HashMap<String, String>) -> String {
267        for (namespace, prefix) in prefixes {
268            if uri.starts_with(namespace) {
269                let local_name = &uri[namespace.len()..];
270                return format!("{prefix}:{local_name}");
271            }
272        }
273        uri.to_string()
274    }
275}
276
277impl Default for PatchCompressor {
278    fn default() -> Self {
279        Self::new()
280    }
281}
282
283#[cfg(test)]
284mod tests {
285    use super::*;
286    use crate::patch::context::{apply_patch_with_context, PatchContext};
287    use crate::patch::result::{
288        create_reverse_patch, create_transactional_patch, optimize_patch, validate_patch,
289    };
290
291    #[test]
292    fn test_patch_serialization() {
293        let mut patch = RdfPatch::new();
294        patch.add_operation(PatchOperation::Header {
295            key: "creator".to_string(),
296            value: "test-suite".to_string(),
297        });
298        patch.add_operation(PatchOperation::TransactionBegin {
299            transaction_id: Some("tx-123".to_string()),
300        });
301        patch.add_operation(PatchOperation::AddPrefix {
302            prefix: "ex".to_string(),
303            namespace: "http://example.org/".to_string(),
304        });
305        patch.add_operation(PatchOperation::Add {
306            subject: "http://example.org/subject".to_string(),
307            predicate: "http://example.org/predicate".to_string(),
308            object: "\"Object literal\"".to_string(),
309        });
310        patch.add_operation(PatchOperation::Delete {
311            subject: "http://example.org/subject2".to_string(),
312            predicate: "http://example.org/predicate2".to_string(),
313            object: "http://example.org/object2".to_string(),
314        });
315        patch.add_operation(PatchOperation::TransactionCommit);
316
317        let serializer = PatchSerializer::new();
318        let result = serializer.serialize(&patch);
319
320        assert!(result.is_ok());
321        let serialized = result.unwrap();
322        assert!(serialized.contains("H creator test-suite"));
323        assert!(serialized.contains("TX tx-123"));
324        assert!(serialized.contains("PA ex:"));
325        assert!(serialized.contains("A "));
326        assert!(serialized.contains("D "));
327        assert!(serialized.contains("TC"));
328        assert!(serialized.contains("@prefix"));
329    }
330
331    #[test]
332    fn test_patch_parsing() {
333        let patch_content = r#"
334@prefix ex: <http://example.org/> .
335@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
336
337H creator test-parser .
338TX tx-456 .
339PA ex2: <http://example2.org/> .
340A ex:subject ex:predicate "Object literal" .
341D ex:subject2 ex:predicate2 ex:object2 .
342GA ex:graph1 .
343GD ex:graph2 .
344PD old: .
345TC .
346"#;
347
348        let mut parser = PatchParser::new();
349        let result = parser.parse(patch_content);
350
351        assert!(result.is_ok());
352        let patch = result.unwrap();
353        assert_eq!(patch.operations.len(), 9);
354
355        // Check header was captured
356        assert_eq!(
357            patch.headers.get("creator"),
358            Some(&"test-parser".to_string())
359        );
360
361        // Check transaction ID was captured
362        assert_eq!(patch.transaction_id, Some("tx-456".to_string()));
363
364        // Check prefix was captured
365        assert_eq!(
366            patch.prefixes.get("ex2"),
367            Some(&"http://example2.org/".to_string())
368        );
369
370        match &patch.operations[0] {
371            PatchOperation::Header { key, value } => {
372                assert_eq!(key, "creator");
373                assert_eq!(value, "test-parser");
374            }
375            _ => panic!("Expected Header operation"),
376        }
377
378        match &patch.operations[1] {
379            PatchOperation::TransactionBegin { transaction_id } => {
380                assert_eq!(transaction_id, &Some("tx-456".to_string()));
381            }
382            _ => panic!("Expected TransactionBegin operation"),
383        }
384    }
385
386    #[test]
387    fn test_patch_round_trip() {
388        let mut original_patch = RdfPatch::new();
389        original_patch.add_operation(PatchOperation::Add {
390            subject: "http://example.org/subject".to_string(),
391            predicate: "http://example.org/predicate".to_string(),
392            object: "\"Test object\"".to_string(),
393        });
394
395        // Serialize to string
396        let serialized = original_patch.to_rdf_patch_format().unwrap();
397
398        // Parse back from string
399        let parsed_patch = RdfPatch::from_rdf_patch_format(&serialized).unwrap();
400
401        // Check that we get the same operations
402        assert_eq!(
403            original_patch.operations.len(),
404            parsed_patch.operations.len()
405        );
406
407        match (&original_patch.operations[0], &parsed_patch.operations[0]) {
408            (
409                PatchOperation::Add {
410                    subject: s1,
411                    predicate: p1,
412                    object: o1,
413                },
414                PatchOperation::Add {
415                    subject: s2,
416                    predicate: p2,
417                    object: o2,
418                },
419            ) => {
420                assert_eq!(s1, s2);
421                assert_eq!(p1, p2);
422                assert_eq!(o1, o2);
423            }
424            _ => panic!("Operations don't match"),
425        }
426    }
427
428    #[test]
429    fn test_reverse_patch() {
430        let mut patch = RdfPatch::new();
431        patch.add_operation(PatchOperation::TransactionBegin {
432            transaction_id: Some("tx-789".to_string()),
433        });
434        patch.add_operation(PatchOperation::Add {
435            subject: "http://example.org/s".to_string(),
436            predicate: "http://example.org/p".to_string(),
437            object: "http://example.org/o".to_string(),
438        });
439        patch.add_operation(PatchOperation::AddGraph {
440            graph: "http://example.org/graph".to_string(),
441        });
442        patch.add_operation(PatchOperation::TransactionCommit);
443        patch.transaction_id = Some("tx-789".to_string());
444
445        let reverse = create_reverse_patch(&patch).unwrap();
446
447        // Should have TX, two reversed operations, and TC
448        assert!(reverse.operations.len() >= 4);
449
450        // First should be transaction begin (reversing the commit)
451        match &reverse.operations[0] {
452            PatchOperation::TransactionBegin { .. } => {}
453            _ => panic!("Expected TransactionBegin operation"),
454        }
455
456        // Find the reversed operations
457        let has_delete_graph = reverse.operations.iter().any(|op| {
458            matches!(op, PatchOperation::DeleteGraph { graph } if graph == "http://example.org/graph")
459        });
460        let has_delete_triple = reverse.operations.iter().any(|op| {
461            matches!(op, PatchOperation::Delete { subject, .. } if subject == "http://example.org/s")
462        });
463
464        assert!(has_delete_graph);
465        assert!(has_delete_triple);
466
467        // Last should be transaction commit
468        match reverse.operations.last() {
469            Some(PatchOperation::TransactionCommit) => {}
470            _ => panic!("Expected TransactionCommit as last operation"),
471        }
472    }
473
474    #[test]
475    fn test_patch_optimization() {
476        let mut patch = RdfPatch::new();
477        let operation = PatchOperation::Add {
478            subject: "http://example.org/s".to_string(),
479            predicate: "http://example.org/p".to_string(),
480            object: "http://example.org/o".to_string(),
481        };
482
483        // Add the same operation twice
484        patch.add_operation(operation.clone());
485        patch.add_operation(operation);
486
487        let optimized = optimize_patch(&patch).unwrap();
488
489        // Should remove duplicate
490        assert_eq!(optimized.operations.len(), 1);
491    }
492
493    #[test]
494    fn test_transactional_patch() {
495        let operations = vec![
496            PatchOperation::Add {
497                subject: "s1".to_string(),
498                predicate: "p1".to_string(),
499                object: "o1".to_string(),
500            },
501            PatchOperation::Delete {
502                subject: "s2".to_string(),
503                predicate: "p2".to_string(),
504                object: "o2".to_string(),
505            },
506        ];
507
508        let patch = create_transactional_patch(operations);
509
510        // Should have TX + 2 operations + TC = 4 total
511        assert_eq!(patch.operations.len(), 4);
512
513        // First should be transaction begin
514        assert!(matches!(
515            &patch.operations[0],
516            PatchOperation::TransactionBegin { .. }
517        ));
518
519        // Last should be transaction commit
520        assert!(matches!(
521            &patch.operations[3],
522            PatchOperation::TransactionCommit
523        ));
524
525        // Should have transaction ID set
526        assert!(patch.transaction_id.is_some());
527    }
528
529    #[test]
530    fn test_patch_validation() {
531        let mut patch = RdfPatch::new();
532        patch.add_operation(PatchOperation::Delete {
533            subject: "http://example.org/s".to_string(),
534            predicate: "http://example.org/p".to_string(),
535            object: "http://example.org/o".to_string(),
536        });
537
538        let warnings = validate_patch(&patch).unwrap();
539
540        // Should warn about deleting without prior addition
541        assert!(!warnings.is_empty());
542        assert!(warnings[0].contains("deleted without prior addition"));
543    }
544
545    #[test]
546    fn test_gzip_compress_patch_round_trip() {
547        // Exercises the oxiarc_deflate gzip compress/decompress path.
548        let mut patch = RdfPatch::new();
549        patch.add_operation(PatchOperation::AddPrefix {
550            prefix: "ex".to_string(),
551            namespace: "http://example.org/".to_string(),
552        });
553        for i in 0..32 {
554            patch.add_operation(PatchOperation::Add {
555                subject: format!("http://example.org/subject/{i}"),
556                predicate: "http://example.org/predicate".to_string(),
557                object: format!("\"value-{i}\""),
558            });
559        }
560
561        let compressor = PatchCompressor::new();
562        let compressed = compressor
563            .compress_patch(&patch)
564            .expect("gzip compress_patch");
565        let restored = compressor
566            .decompress_patch(&compressed)
567            .expect("gzip decompress_patch");
568
569        assert_eq!(
570            patch.operations.len(),
571            restored.operations.len(),
572            "operation count mismatch after gzip round-trip"
573        );
574    }
575
576    #[test]
577    fn test_gzip_compress_patch_round_trip_no_dictionary() {
578        // Round-trip with dictionary compression disabled so the gzip layer
579        // operates on the raw serialized patch bytes.
580        let mut patch = RdfPatch::new();
581        patch.add_operation(PatchOperation::Add {
582            subject: "http://example.org/s".to_string(),
583            predicate: "http://example.org/p".to_string(),
584            object: "\"literal object value\"".to_string(),
585        });
586
587        let compressor = PatchCompressor::new().with_dictionary_compression(false);
588        let compressed = compressor
589            .compress_patch(&patch)
590            .expect("gzip compress_patch no-dict");
591        let restored = compressor
592            .decompress_patch(&compressed)
593            .expect("gzip decompress_patch no-dict");
594
595        assert_eq!(patch.operations.len(), restored.operations.len());
596    }
597
598    #[test]
599    fn test_patch_application() {
600        let mut patch = RdfPatch::new();
601        patch.add_operation(PatchOperation::Add {
602            subject: "http://example.org/s".to_string(),
603            predicate: "http://example.org/p".to_string(),
604            object: "http://example.org/o".to_string(),
605        });
606
607        let context = PatchContext {
608            strict_mode: false,
609            validate_operations: true,
610            dry_run: true,
611        };
612
613        let result = apply_patch_with_context(&patch, &context).unwrap();
614
615        assert_eq!(result.total_operations, 1);
616        assert_eq!(result.operations_applied, 1);
617        assert!(result.is_success());
618        assert_eq!(result.success_rate(), 1.0);
619    }
620}