1use super::{PatchParser, PatchSerializer};
4use crate::{PatchOperation, RdfPatch};
5use anyhow::Result;
6use flate2::{read::GzDecoder, write::GzEncoder, Compression};
7use std::collections::HashMap;
8use std::io::{Read, Write};
9use tracing::info;
10
11pub struct PatchCompressor {
12 compression_level: u32,
13 enable_dictionary: bool,
14 prefix_compression: bool,
15}
16
17impl PatchCompressor {
18 pub fn new() -> Self {
19 Self {
20 compression_level: 6,
21 enable_dictionary: true,
22 prefix_compression: true,
23 }
24 }
25
26 pub fn with_compression_level(mut self, level: u32) -> Self {
27 self.compression_level = level.min(9);
28 self
29 }
30
31 pub fn with_dictionary_compression(mut self, enabled: bool) -> Self {
32 self.enable_dictionary = enabled;
33 self
34 }
35
36 pub fn with_prefix_compression(mut self, enabled: bool) -> Self {
37 self.prefix_compression = enabled;
38 self
39 }
40
41 pub fn compress_patch(&self, patch: &RdfPatch) -> Result<Vec<u8>> {
43 let serializer = PatchSerializer::new().with_pretty_print(false);
45 let patch_str = serializer.serialize(patch)?;
46 let original_len = patch_str.len();
47
48 let optimized_str = if self.enable_dictionary {
50 self.apply_dictionary_compression(&patch_str)?
51 } else {
52 patch_str
53 };
54
55 let mut encoder = GzEncoder::new(Vec::new(), Compression::new(self.compression_level));
57 encoder.write_all(optimized_str.as_bytes())?;
58 let compressed = encoder.finish()?;
59
60 info!(
61 "Compressed patch: {} -> {} bytes ({:.1}% reduction)",
62 original_len,
63 compressed.len(),
64 (1.0 - compressed.len() as f64 / original_len as f64) * 100.0
65 );
66
67 Ok(compressed)
68 }
69
70 pub fn decompress_patch(&self, compressed_data: &[u8]) -> Result<RdfPatch> {
72 let mut decoder = GzDecoder::new(compressed_data);
74 let mut decompressed = String::new();
75 decoder.read_to_string(&mut decompressed)?;
76
77 let patch_str = if self.enable_dictionary {
79 self.apply_dictionary_decompression(&decompressed)?
80 } else {
81 decompressed
82 };
83
84 let mut parser = PatchParser::new();
86 parser.parse(&patch_str)
87 }
88
89 fn apply_dictionary_compression(&self, patch_str: &str) -> Result<String> {
90 let mut word_freq = HashMap::new();
92 for word in patch_str.split_whitespace() {
93 *word_freq.entry(word.to_string()).or_insert(0) += 1;
94 }
95
96 let mut freq_words: Vec<_> = word_freq.into_iter().collect();
98 freq_words.sort_by(|a, b| b.1.cmp(&a.1));
99
100 let mut dictionary = HashMap::new();
101 let mut compressed = patch_str.to_string();
102
103 for (i, (word, freq)) in freq_words.iter().take(256).enumerate() {
105 if word.len() > 3 && *freq > 2 {
106 let code = format!("#{i:02x}");
107 dictionary.insert(code.clone(), word.clone());
108 compressed = compressed.replace(word, &code);
109 }
110 }
111
112 let mut dict_header = String::new();
114 for (code, word) in dictionary {
115 dict_header.push_str(&format!("{code}={word}\n"));
116 }
117 dict_header.push_str("---\n");
118 dict_header.push_str(&compressed);
119
120 Ok(dict_header)
121 }
122
123 fn apply_dictionary_decompression(&self, compressed_str: &str) -> Result<String> {
124 if let Some(separator_pos) = compressed_str.find("---\n") {
125 let (dict_part, content_part) = compressed_str.split_at(separator_pos);
126 let content = &content_part[4..]; let mut dictionary = HashMap::new();
129 for line in dict_part.lines() {
130 if let Some(eq_pos) = line.find('=') {
131 let code = &line[..eq_pos];
132 let word = &line[eq_pos + 1..];
133 dictionary.insert(code, word);
134 }
135 }
136
137 let mut decompressed = content.to_string();
138 for (code, word) in dictionary {
139 decompressed = decompressed.replace(code, word);
140 }
141
142 Ok(decompressed)
143 } else {
144 Ok(compressed_str.to_string())
145 }
146 }
147
148 pub fn compress_with_prefixes(&self, patch: &RdfPatch) -> Result<RdfPatch> {
150 let mut compressed = patch.clone();
151 compressed.id = format!("{}-prefix-compressed", patch.id);
152
153 if !self.prefix_compression {
154 return Ok(compressed);
155 }
156
157 let mut prefix_freq = HashMap::new();
159 for operation in &patch.operations {
160 self.collect_uris_from_operation(operation, &mut prefix_freq);
161 }
162
163 let mut common_prefixes = HashMap::new();
165 for (uri, freq) in prefix_freq {
166 if freq > 2 {
167 if let Some(prefix) = self.extract_namespace_prefix(&uri) {
168 if prefix.len() > 10 {
169 let short_prefix = format!("ns{}", common_prefixes.len());
170 common_prefixes.insert(prefix, short_prefix);
171 }
172 }
173 }
174 }
175
176 for (namespace, prefix) in &common_prefixes {
178 compressed.add_operation(PatchOperation::AddPrefix {
179 prefix: prefix.clone(),
180 namespace: namespace.clone(),
181 });
182 }
183
184 for operation in &mut compressed.operations {
186 self.apply_prefix_compression_to_operation(operation, &common_prefixes);
187 }
188
189 info!(
190 "Applied prefix compression: {} prefixes defined",
191 common_prefixes.len()
192 );
193 Ok(compressed)
194 }
195
196 fn collect_uris_from_operation(
197 &self,
198 operation: &PatchOperation,
199 prefix_freq: &mut HashMap<String, usize>,
200 ) {
201 match operation {
202 PatchOperation::Add {
203 subject,
204 predicate,
205 object,
206 } => {
207 *prefix_freq.entry(subject.clone()).or_insert(0) += 1;
208 *prefix_freq.entry(predicate.clone()).or_insert(0) += 1;
209 *prefix_freq.entry(object.clone()).or_insert(0) += 1;
210 }
211 PatchOperation::Delete {
212 subject,
213 predicate,
214 object,
215 } => {
216 *prefix_freq.entry(subject.clone()).or_insert(0) += 1;
217 *prefix_freq.entry(predicate.clone()).or_insert(0) += 1;
218 *prefix_freq.entry(object.clone()).or_insert(0) += 1;
219 }
220 PatchOperation::AddGraph { graph } | PatchOperation::DeleteGraph { graph } => {
221 *prefix_freq.entry(graph.clone()).or_insert(0) += 1;
222 }
223 _ => {}
224 }
225 }
226
227 fn extract_namespace_prefix(&self, uri: &str) -> Option<String> {
228 if let Some(pos) = uri.rfind('#') {
230 Some(uri[..pos + 1].to_string())
231 } else {
232 uri.rfind('/').map(|pos| uri[..pos + 1].to_string())
233 }
234 }
235
236 fn apply_prefix_compression_to_operation(
237 &self,
238 operation: &mut PatchOperation,
239 prefixes: &HashMap<String, String>,
240 ) {
241 match operation {
242 PatchOperation::Add {
243 subject,
244 predicate,
245 object,
246 } => {
247 *subject = self.compress_uri_with_prefixes(subject, prefixes);
248 *predicate = self.compress_uri_with_prefixes(predicate, prefixes);
249 *object = self.compress_uri_with_prefixes(object, prefixes);
250 }
251 PatchOperation::Delete {
252 subject,
253 predicate,
254 object,
255 } => {
256 *subject = self.compress_uri_with_prefixes(subject, prefixes);
257 *predicate = self.compress_uri_with_prefixes(predicate, prefixes);
258 *object = self.compress_uri_with_prefixes(object, prefixes);
259 }
260 PatchOperation::AddGraph { graph } | PatchOperation::DeleteGraph { graph } => {
261 *graph = self.compress_uri_with_prefixes(graph, prefixes);
262 }
263 _ => {}
264 }
265 }
266
267 fn compress_uri_with_prefixes(&self, uri: &str, prefixes: &HashMap<String, String>) -> String {
268 for (namespace, prefix) in prefixes {
269 if uri.starts_with(namespace) {
270 let local_name = &uri[namespace.len()..];
271 return format!("{prefix}:{local_name}");
272 }
273 }
274 uri.to_string()
275 }
276}
277
278impl Default for PatchCompressor {
279 fn default() -> Self {
280 Self::new()
281 }
282}
283
284#[cfg(test)]
285mod tests {
286 use super::*;
287 use crate::patch::context::{apply_patch_with_context, PatchContext};
288 use crate::patch::result::{
289 create_reverse_patch, create_transactional_patch, optimize_patch, validate_patch,
290 };
291
292 #[test]
293 fn test_patch_serialization() {
294 let mut patch = RdfPatch::new();
295 patch.add_operation(PatchOperation::Header {
296 key: "creator".to_string(),
297 value: "test-suite".to_string(),
298 });
299 patch.add_operation(PatchOperation::TransactionBegin {
300 transaction_id: Some("tx-123".to_string()),
301 });
302 patch.add_operation(PatchOperation::AddPrefix {
303 prefix: "ex".to_string(),
304 namespace: "http://example.org/".to_string(),
305 });
306 patch.add_operation(PatchOperation::Add {
307 subject: "http://example.org/subject".to_string(),
308 predicate: "http://example.org/predicate".to_string(),
309 object: "\"Object literal\"".to_string(),
310 });
311 patch.add_operation(PatchOperation::Delete {
312 subject: "http://example.org/subject2".to_string(),
313 predicate: "http://example.org/predicate2".to_string(),
314 object: "http://example.org/object2".to_string(),
315 });
316 patch.add_operation(PatchOperation::TransactionCommit);
317
318 let serializer = PatchSerializer::new();
319 let result = serializer.serialize(&patch);
320
321 assert!(result.is_ok());
322 let serialized = result.unwrap();
323 assert!(serialized.contains("H creator test-suite"));
324 assert!(serialized.contains("TX tx-123"));
325 assert!(serialized.contains("PA ex:"));
326 assert!(serialized.contains("A "));
327 assert!(serialized.contains("D "));
328 assert!(serialized.contains("TC"));
329 assert!(serialized.contains("@prefix"));
330 }
331
332 #[test]
333 fn test_patch_parsing() {
334 let patch_content = r#"
335@prefix ex: <http://example.org/> .
336@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
337
338H creator test-parser .
339TX tx-456 .
340PA ex2: <http://example2.org/> .
341A ex:subject ex:predicate "Object literal" .
342D ex:subject2 ex:predicate2 ex:object2 .
343GA ex:graph1 .
344GD ex:graph2 .
345PD old: .
346TC .
347"#;
348
349 let mut parser = PatchParser::new();
350 let result = parser.parse(patch_content);
351
352 assert!(result.is_ok());
353 let patch = result.unwrap();
354 assert_eq!(patch.operations.len(), 9);
355
356 assert_eq!(
358 patch.headers.get("creator"),
359 Some(&"test-parser".to_string())
360 );
361
362 assert_eq!(patch.transaction_id, Some("tx-456".to_string()));
364
365 assert_eq!(
367 patch.prefixes.get("ex2"),
368 Some(&"http://example2.org/".to_string())
369 );
370
371 match &patch.operations[0] {
372 PatchOperation::Header { key, value } => {
373 assert_eq!(key, "creator");
374 assert_eq!(value, "test-parser");
375 }
376 _ => panic!("Expected Header operation"),
377 }
378
379 match &patch.operations[1] {
380 PatchOperation::TransactionBegin { transaction_id } => {
381 assert_eq!(transaction_id, &Some("tx-456".to_string()));
382 }
383 _ => panic!("Expected TransactionBegin operation"),
384 }
385 }
386
387 #[test]
388 fn test_patch_round_trip() {
389 let mut original_patch = RdfPatch::new();
390 original_patch.add_operation(PatchOperation::Add {
391 subject: "http://example.org/subject".to_string(),
392 predicate: "http://example.org/predicate".to_string(),
393 object: "\"Test object\"".to_string(),
394 });
395
396 let serialized = original_patch.to_rdf_patch_format().unwrap();
398
399 let parsed_patch = RdfPatch::from_rdf_patch_format(&serialized).unwrap();
401
402 assert_eq!(
404 original_patch.operations.len(),
405 parsed_patch.operations.len()
406 );
407
408 match (&original_patch.operations[0], &parsed_patch.operations[0]) {
409 (
410 PatchOperation::Add {
411 subject: s1,
412 predicate: p1,
413 object: o1,
414 },
415 PatchOperation::Add {
416 subject: s2,
417 predicate: p2,
418 object: o2,
419 },
420 ) => {
421 assert_eq!(s1, s2);
422 assert_eq!(p1, p2);
423 assert_eq!(o1, o2);
424 }
425 _ => panic!("Operations don't match"),
426 }
427 }
428
429 #[test]
430 fn test_reverse_patch() {
431 let mut patch = RdfPatch::new();
432 patch.add_operation(PatchOperation::TransactionBegin {
433 transaction_id: Some("tx-789".to_string()),
434 });
435 patch.add_operation(PatchOperation::Add {
436 subject: "http://example.org/s".to_string(),
437 predicate: "http://example.org/p".to_string(),
438 object: "http://example.org/o".to_string(),
439 });
440 patch.add_operation(PatchOperation::AddGraph {
441 graph: "http://example.org/graph".to_string(),
442 });
443 patch.add_operation(PatchOperation::TransactionCommit);
444 patch.transaction_id = Some("tx-789".to_string());
445
446 let reverse = create_reverse_patch(&patch).unwrap();
447
448 assert!(reverse.operations.len() >= 4);
450
451 match &reverse.operations[0] {
453 PatchOperation::TransactionBegin { .. } => {}
454 _ => panic!("Expected TransactionBegin operation"),
455 }
456
457 let has_delete_graph = reverse.operations.iter().any(|op| {
459 matches!(op, PatchOperation::DeleteGraph { graph } if graph == "http://example.org/graph")
460 });
461 let has_delete_triple = reverse.operations.iter().any(|op| {
462 matches!(op, PatchOperation::Delete { subject, .. } if subject == "http://example.org/s")
463 });
464
465 assert!(has_delete_graph);
466 assert!(has_delete_triple);
467
468 match reverse.operations.last() {
470 Some(PatchOperation::TransactionCommit) => {}
471 _ => panic!("Expected TransactionCommit as last operation"),
472 }
473 }
474
475 #[test]
476 fn test_patch_optimization() {
477 let mut patch = RdfPatch::new();
478 let operation = PatchOperation::Add {
479 subject: "http://example.org/s".to_string(),
480 predicate: "http://example.org/p".to_string(),
481 object: "http://example.org/o".to_string(),
482 };
483
484 patch.add_operation(operation.clone());
486 patch.add_operation(operation);
487
488 let optimized = optimize_patch(&patch).unwrap();
489
490 assert_eq!(optimized.operations.len(), 1);
492 }
493
494 #[test]
495 fn test_transactional_patch() {
496 let operations = vec![
497 PatchOperation::Add {
498 subject: "s1".to_string(),
499 predicate: "p1".to_string(),
500 object: "o1".to_string(),
501 },
502 PatchOperation::Delete {
503 subject: "s2".to_string(),
504 predicate: "p2".to_string(),
505 object: "o2".to_string(),
506 },
507 ];
508
509 let patch = create_transactional_patch(operations);
510
511 assert_eq!(patch.operations.len(), 4);
513
514 assert!(matches!(
516 &patch.operations[0],
517 PatchOperation::TransactionBegin { .. }
518 ));
519
520 assert!(matches!(
522 &patch.operations[3],
523 PatchOperation::TransactionCommit
524 ));
525
526 assert!(patch.transaction_id.is_some());
528 }
529
530 #[test]
531 fn test_patch_validation() {
532 let mut patch = RdfPatch::new();
533 patch.add_operation(PatchOperation::Delete {
534 subject: "http://example.org/s".to_string(),
535 predicate: "http://example.org/p".to_string(),
536 object: "http://example.org/o".to_string(),
537 });
538
539 let warnings = validate_patch(&patch).unwrap();
540
541 assert!(!warnings.is_empty());
543 assert!(warnings[0].contains("deleted without prior addition"));
544 }
545
546 #[test]
547 fn test_patch_application() {
548 let mut patch = RdfPatch::new();
549 patch.add_operation(PatchOperation::Add {
550 subject: "http://example.org/s".to_string(),
551 predicate: "http://example.org/p".to_string(),
552 object: "http://example.org/o".to_string(),
553 });
554
555 let context = PatchContext {
556 strict_mode: false,
557 validate_operations: true,
558 dry_run: true,
559 };
560
561 let result = apply_patch_with_context(&patch, &context).unwrap();
562
563 assert_eq!(result.total_operations, 1);
564 assert_eq!(result.operations_applied, 1);
565 assert!(result.is_success());
566 assert_eq!(result.success_rate(), 1.0);
567 }
568}