Skip to main content

oxirs_core/storage/
compression.rs

1//! Advanced compression for RDF data
2//!
3//! This module provides compression algorithms optimized for RDF data,
4//! including custom RDF-specific compression techniques.
5
6use crate::OxirsError;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
/// Safety limit on LZ4 decompression output: 100 MiB (100 × 2^20 bytes).
///
/// Passed to `oxiarc_lz4::decompress` as its maximum output size.
const LZ4_MAX_DECOMPRESS_SIZE: usize = 100 << 20;
12
13/// Compression algorithm
14#[derive(Debug, Clone)]
15pub enum Algorithm {
16    /// No compression
17    None,
18    /// LZ4 compression (fast)
19    Lz4 { level: u32 },
20    /// Zstandard compression (high ratio)
21    Zstd { level: i32 },
22    /// Custom RDF compression
23    RdfCustom { options: RdfCompressionOptions },
24}
25
26/// RDF-specific compression options
27#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
28pub struct RdfCompressionOptions {
29    /// Use dictionary compression for URIs
30    pub uri_dictionary: bool,
31    /// Use prefix compression
32    pub prefix_compression: bool,
33    /// Dictionary size limit
34    pub dictionary_size: usize,
35    /// Use datatype-specific compression
36    pub datatype_compression: bool,
37}
38
39impl Default for RdfCompressionOptions {
40    fn default() -> Self {
41        RdfCompressionOptions {
42            uri_dictionary: true,
43            prefix_compression: true,
44            dictionary_size: 16384,
45            datatype_compression: true,
46        }
47    }
48}
49
/// Outcome of a single `Compressor::compress` call.
#[derive(Clone, Debug)]
pub struct CompressionResult {
    /// The encoded bytes.
    pub data: Vec<u8>,
    /// Input length in bytes.
    pub original_size: usize,
    /// Length of `data` in bytes.
    pub compressed_size: usize,
    /// `original_size / compressed_size`.
    pub ratio: f64,
    /// Codec name: `"none"`, `"lz4"`, `"zstd"` or `"rdf_custom"`.
    pub algorithm: String,
    /// Wall-clock time spent compressing, in microseconds.
    pub compression_time_us: u64,
}
66
67/// RDF compressor
68pub struct Compressor {
69    algorithm: Algorithm,
70    uri_dictionary: Option<UriDictionary>,
71    stats: CompressionStats,
72}
73
/// Bidirectional URI <-> numeric ID table used by the RDF compression path.
struct UriDictionary {
    /// Forward lookup: URI text to its assigned ID.
    uri_to_id: HashMap<String, u32>,
    /// Reverse lookup: ID back to URI text.
    id_to_uri: HashMap<u32, String>,
    /// ID that the next unseen URI will receive.
    next_id: u32,
    /// Registered `(prefix, namespace)` pairs.
    ///
    /// NOTE(review): not yet consulted during compression.
    prefixes: Vec<(String, String)>,
}
85
/// Running totals accumulated by `Compressor::compress`.
#[derive(Default, Debug)]
struct CompressionStats {
    /// Sum of compressed output sizes, in bytes.
    total_compressed: u64,
    /// Sum of input sizes, in bytes.
    total_original: u64,
    /// Number of `compress` calls recorded.
    compression_count: u64,
    /// Sum of per-call compression times, in microseconds.
    total_time_us: u64,
}
94
95impl Compressor {
96    /// Create a new compressor
97    pub fn new(algorithm: Algorithm) -> Self {
98        let uri_dictionary = match &algorithm {
99            Algorithm::RdfCustom { options } if options.uri_dictionary => {
100                Some(UriDictionary::new())
101            }
102            _ => None,
103        };
104
105        Compressor {
106            algorithm,
107            uri_dictionary,
108            stats: CompressionStats::default(),
109        }
110    }
111
112    /// Compress data
113    pub fn compress(&mut self, data: &[u8]) -> Result<CompressionResult, OxirsError> {
114        let start = std::time::Instant::now();
115        let original_size = data.len();
116
117        let (compressed, algorithm_name) = match self.algorithm.clone() {
118            Algorithm::None => (data.to_vec(), "none"),
119            Algorithm::Lz4 { level } => {
120                let compressed = self.compress_lz4(data, level)?;
121                (compressed, "lz4")
122            }
123            Algorithm::Zstd { level } => {
124                let compressed = self.compress_zstd(data, level)?;
125                (compressed, "zstd")
126            }
127            Algorithm::RdfCustom { options } => {
128                let compressed = self.compress_rdf_custom(data, &options)?;
129                (compressed, "rdf_custom")
130            }
131        };
132
133        let compressed_size = compressed.len();
134        let ratio = original_size as f64 / compressed_size as f64;
135        let compression_time_us = start.elapsed().as_micros() as u64;
136
137        // Update stats
138        self.stats.total_original += original_size as u64;
139        self.stats.total_compressed += compressed_size as u64;
140        self.stats.compression_count += 1;
141        self.stats.total_time_us += compression_time_us;
142
143        Ok(CompressionResult {
144            data: compressed,
145            original_size,
146            compressed_size,
147            ratio,
148            algorithm: algorithm_name.to_string(),
149            compression_time_us,
150        })
151    }
152
153    /// Decompress data
154    pub fn decompress(&mut self, data: &[u8]) -> Result<Vec<u8>, OxirsError> {
155        match &self.algorithm {
156            Algorithm::None => Ok(data.to_vec()),
157            Algorithm::Lz4 { .. } => self.decompress_lz4(data),
158            Algorithm::Zstd { .. } => self.decompress_zstd(data),
159            Algorithm::RdfCustom { .. } => self.decompress_rdf_custom(data),
160        }
161    }
162
163    /// Compress using LZ4
164    fn compress_lz4(&self, data: &[u8], _level: u32) -> Result<Vec<u8>, OxirsError> {
165        oxiarc_lz4::compress(data)
166            .map_err(|e| OxirsError::Io(format!("LZ4 compression failed: {}", e)))
167    }
168
169    /// Decompress LZ4
170    fn decompress_lz4(&self, data: &[u8]) -> Result<Vec<u8>, OxirsError> {
171        oxiarc_lz4::decompress(data, LZ4_MAX_DECOMPRESS_SIZE)
172            .map_err(|e| OxirsError::Io(format!("LZ4 decompression failed: {}", e)))
173    }
174
175    /// Compress using Zstandard
176    fn compress_zstd(&self, data: &[u8], level: i32) -> Result<Vec<u8>, OxirsError> {
177        oxiarc_zstd::encode_all(data, level)
178            .map_err(|e| OxirsError::Io(format!("Zstd compression failed: {}", e)))
179    }
180
181    /// Decompress Zstandard
182    fn decompress_zstd(&self, data: &[u8]) -> Result<Vec<u8>, OxirsError> {
183        oxiarc_zstd::decode_all(data)
184            .map_err(|e| OxirsError::Io(format!("Zstd decompression failed: {}", e)))
185    }
186
187    /// Custom RDF compression
188    fn compress_rdf_custom(
189        &mut self,
190        data: &[u8],
191        options: &RdfCompressionOptions,
192    ) -> Result<Vec<u8>, OxirsError> {
193        // Parse RDF data
194        let rdf_data = String::from_utf8_lossy(data);
195
196        // Build compressed representation
197        let mut compressed = RdfCompressedData {
198            version: 1,
199            options: options.clone(),
200            dictionary: Vec::new(),
201            triples: Vec::new(),
202        };
203
204        // If using URI dictionary, build it
205        if options.uri_dictionary {
206            if let Some(dict) = &mut self.uri_dictionary {
207                // Extract URIs and build dictionary
208                // This is a simplified implementation
209                for line in rdf_data.lines() {
210                    if let Some(uri) = extract_uri(line) {
211                        dict.add_uri(uri);
212                    }
213                }
214
215                // Store dictionary in compressed data
216                compressed.dictionary = dict.export();
217            }
218        }
219
220        // Compress triples using dictionary
221        // This is a placeholder - real implementation would parse and compress properly
222        compressed.triples = data.to_vec();
223
224        // Apply secondary compression
225        let serialized = oxicode::serde::encode_to_vec(&compressed, oxicode::config::standard())?;
226        self.compress_zstd(&serialized, 3)
227    }
228
229    /// Decompress custom RDF format
230    fn decompress_rdf_custom(&mut self, data: &[u8]) -> Result<Vec<u8>, OxirsError> {
231        // Decompress outer layer
232        let decompressed = self.decompress_zstd(data)?;
233
234        // Deserialize
235        let compressed: RdfCompressedData =
236            oxicode::serde::decode_from_slice(&decompressed, oxicode::config::standard())
237                .map(|(v, _)| v)?;
238
239        // Restore dictionary
240        if compressed.options.uri_dictionary && !compressed.dictionary.is_empty() {
241            if let Some(dict) = &mut self.uri_dictionary {
242                dict.import(compressed.dictionary);
243            }
244        }
245
246        // Decompress triples
247        // This is a placeholder - real implementation would properly reconstruct
248        Ok(compressed.triples)
249    }
250
251    /// Get compression statistics
252    pub fn stats(&self) -> CompressionStatsSummary {
253        CompressionStatsSummary {
254            total_compressed_mb: self.stats.total_compressed as f64 / 1_048_576.0,
255            total_original_mb: self.stats.total_original as f64 / 1_048_576.0,
256            average_ratio: if self.stats.total_compressed > 0 {
257                self.stats.total_original as f64 / self.stats.total_compressed as f64
258            } else {
259                1.0
260            },
261            compression_count: self.stats.compression_count,
262            avg_time_us: self
263                .stats
264                .total_time_us
265                .checked_div(self.stats.compression_count)
266                .unwrap_or(0),
267        }
268    }
269}
270
/// Snapshot of a compressor's cumulative statistics, as returned by `Compressor::stats`.
#[derive(Clone, Debug)]
pub struct CompressionStatsSummary {
    /// Total compressed output, in MiB.
    pub total_compressed_mb: f64,
    /// Total input consumed, in MiB.
    pub total_original_mb: f64,
    /// Total input over total output; `1.0` before any data was compressed.
    pub average_ratio: f64,
    /// Number of `compress` calls.
    pub compression_count: u64,
    /// Mean compression time per call, in microseconds; `0` when no calls were made.
    pub avg_time_us: u64,
}
280
281/// Compressed RDF data format
282#[derive(Debug, Serialize, Deserialize)]
283struct RdfCompressedData {
284    version: u32,
285    options: RdfCompressionOptions,
286    dictionary: Vec<(String, u32)>,
287    triples: Vec<u8>,
288}
289
290impl UriDictionary {
291    fn new() -> Self {
292        let mut dict = UriDictionary {
293            uri_to_id: HashMap::new(),
294            id_to_uri: HashMap::new(),
295            next_id: 0,
296            prefixes: Vec::new(),
297        };
298
299        // Add common RDF prefixes
300        dict.add_prefix("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#");
301        dict.add_prefix("rdfs", "http://www.w3.org/2000/01/rdf-schema#");
302        dict.add_prefix("xsd", "http://www.w3.org/2001/XMLSchema#");
303        dict.add_prefix("owl", "http://www.w3.org/2002/07/owl#");
304
305        dict
306    }
307
308    fn add_prefix(&mut self, prefix: &str, uri: &str) {
309        self.prefixes.push((prefix.to_string(), uri.to_string()));
310    }
311
312    fn add_uri(&mut self, uri: &str) -> u32 {
313        if let Some(&id) = self.uri_to_id.get(uri) {
314            return id;
315        }
316
317        let id = self.next_id;
318        self.uri_to_id.insert(uri.to_string(), id);
319        self.id_to_uri.insert(id, uri.to_string());
320        self.next_id += 1;
321        id
322    }
323
324    #[allow(dead_code)]
325    fn get_uri(&self, id: u32) -> Option<&str> {
326        self.id_to_uri.get(&id).map(|s| s.as_str())
327    }
328
329    fn export(&self) -> Vec<(String, u32)> {
330        self.uri_to_id
331            .iter()
332            .map(|(uri, id)| (uri.clone(), *id))
333            .collect()
334    }
335
336    fn import(&mut self, data: Vec<(String, u32)>) {
337        self.uri_to_id.clear();
338        self.id_to_uri.clear();
339
340        for (uri, id) in data {
341            self.uri_to_id.insert(uri.clone(), id);
342            self.id_to_uri.insert(id, uri);
343            self.next_id = self.next_id.max(id + 1);
344        }
345    }
346}
347
/// Return the text between the first `<` in `line` and the next `>` after it.
///
/// Returns `None` if either delimiter is missing. This is a simplified scan:
/// only the first IRI on a line is found, and a `<` inside a literal would be
/// taken as an IRI start.
fn extract_uri(line: &str) -> Option<&str> {
    let (_, after_open) = line.split_once('<')?;
    let (uri, _) = after_open.split_once('>')?;
    Some(uri)
}
357
358/// Compress RDF data using the specified algorithm
359pub async fn compress_rdf(
360    data: &[u8],
361    algorithm: Algorithm,
362) -> Result<CompressionResult, OxirsError> {
363    let mut compressor = Compressor::new(algorithm);
364    compressor.compress(data)
365}
366
367/// Decompress RDF data
368pub async fn decompress_rdf(data: &[u8], algorithm: Algorithm) -> Result<Vec<u8>, OxirsError> {
369    let mut compressor = Compressor::new(algorithm);
370    compressor.decompress(data)
371}
372
#[cfg(test)]
mod tests {
    use super::*;

    /// Two N-Triples statements sharing a subject IRI.
    const NTRIPLES: &[u8] = b"<http://example.org/s> <http://example.org/p> <http://example.org/o> .\n\
        <http://example.org/s> <http://example.org/q> \"literal\" .\n";

    #[test]
    fn test_lz4_compression() {
        let data = b"<http://example.org/s> <http://example.org/p> <http://example.org/o> .";
        let mut compressor = Compressor::new(Algorithm::Lz4 { level: 1 });

        let result = compressor
            .compress(data)
            .expect("compression should succeed");
        assert!(result.compressed_size < result.original_size);

        let decompressed = compressor
            .decompress(&result.data)
            .expect("decompression should succeed");
        assert_eq!(decompressed, data);
    }

    #[test]
    fn test_zstd_compression() {
        let triple = b"<http://example.org/s> <http://example.org/p> \"literal value\" .\n";
        let data: Vec<u8> = triple
            .iter()
            .copied()
            .cycle()
            .take(triple.len() * 50)
            .collect();
        let mut compressor = Compressor::new(Algorithm::Zstd { level: 3 });

        let result = compressor.compress(&data).expect("zstd compression failed");
        assert!(result.compressed_size < result.original_size);

        let decompressed = compressor
            .decompress(&result.data)
            .expect("zstd decompression failed");
        assert_eq!(decompressed, data);
    }

    #[test]
    fn test_none_is_identity() {
        let mut compressor = Compressor::new(Algorithm::None);

        let result = compressor
            .compress(NTRIPLES)
            .expect("identity compression should succeed");
        assert_eq!(result.data, NTRIPLES);
        assert_eq!(result.original_size, NTRIPLES.len());
        assert_eq!(result.compressed_size, NTRIPLES.len());
        assert_eq!(result.algorithm, "none");
        assert!((result.ratio - 1.0).abs() < f64::EPSILON);

        let decompressed = compressor
            .decompress(&result.data)
            .expect("identity decompression should succeed");
        assert_eq!(decompressed, NTRIPLES);
    }

    #[test]
    fn test_rdf_custom_roundtrip() {
        let algorithm = Algorithm::RdfCustom {
            options: RdfCompressionOptions::default(),
        };
        let mut compressor = Compressor::new(algorithm.clone());

        let result = compressor
            .compress(NTRIPLES)
            .expect("rdf_custom compression should succeed");
        assert_eq!(result.algorithm, "rdf_custom");

        // A fresh compressor must decode the container without shared state.
        let mut fresh = Compressor::new(algorithm);
        let decompressed = fresh
            .decompress(&result.data)
            .expect("rdf_custom decompression should succeed");
        assert_eq!(decompressed, NTRIPLES);
    }

    #[test]
    fn test_stats_accumulate() {
        let mut compressor = Compressor::new(Algorithm::None);

        let initial = compressor.stats();
        assert_eq!(initial.compression_count, 0);
        assert_eq!(initial.avg_time_us, 0);
        assert!((initial.average_ratio - 1.0).abs() < f64::EPSILON);

        compressor.compress(b"abcd").expect("compression should succeed");
        compressor.compress(b"efgh").expect("compression should succeed");

        let stats = compressor.stats();
        assert_eq!(stats.compression_count, 2);
        assert!((stats.average_ratio - 1.0).abs() < f64::EPSILON);
        assert!((stats.total_original_mb - 8.0 / 1_048_576.0).abs() < 1e-12);
        assert!((stats.total_compressed_mb - 8.0 / 1_048_576.0).abs() < 1e-12);
    }

    #[test]
    fn test_uri_dictionary() {
        let mut dict = UriDictionary::new();

        let id1 = dict.add_uri("http://example.org/test");
        let id2 = dict.add_uri("http://example.org/test");
        assert_eq!(id1, id2);

        let id3 = dict.add_uri("http://example.org/other");
        assert_ne!(id1, id3);

        assert_eq!(dict.get_uri(id1), Some("http://example.org/test"));
    }

    #[test]
    fn test_extract_uri() {
        assert_eq!(extract_uri("<http://a> <http://b> ."), Some("http://a"));
        assert_eq!(extract_uri("no iri here"), None);
        assert_eq!(extract_uri("<unterminated"), None);
    }
}