1use crate::OxirsError;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
/// Upper bound (100 MiB) on the decompressed output accepted by
/// `decompress_lz4`, guarding against decompression bombs.
const LZ4_MAX_DECOMPRESS_SIZE: usize = 100 * 1024 * 1024;
12
/// Compression algorithm selection for [`Compressor`].
#[derive(Debug, Clone)]
pub enum Algorithm {
    /// Pass data through unchanged.
    None,
    /// LZ4 block compression. The `level` is currently ignored by the
    /// backend (`compress_lz4` discards it).
    Lz4 { level: u32 },
    /// Zstandard compression at the given level.
    Zstd { level: i32 },
    /// RDF-aware custom compression (URI dictionary + zstd envelope).
    RdfCustom { options: RdfCompressionOptions },
}
25
/// Tuning options for the RDF-aware custom compression algorithm.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RdfCompressionOptions {
    /// Build a URI dictionary from the input before compressing.
    pub uri_dictionary: bool,
    /// NOTE(review): not consulted by any visible code path — presumably
    /// enables namespace-prefix compression; confirm intended use.
    pub prefix_compression: bool,
    /// NOTE(review): not enforced by the visible code — presumably a cap on
    /// dictionary entries; confirm intended use.
    pub dictionary_size: usize,
    /// NOTE(review): not consulted by any visible code path — presumably
    /// enables datatype-specific literal compression; confirm intended use.
    pub datatype_compression: bool,
}
38
39impl Default for RdfCompressionOptions {
40 fn default() -> Self {
41 RdfCompressionOptions {
42 uri_dictionary: true,
43 prefix_compression: true,
44 dictionary_size: 16384,
45 datatype_compression: true,
46 }
47 }
48}
49
/// Outcome of a single [`Compressor::compress`] call.
#[derive(Debug, Clone)]
pub struct CompressionResult {
    /// The compressed bytes.
    pub data: Vec<u8>,
    /// Input size in bytes.
    pub original_size: usize,
    /// Output size in bytes.
    pub compressed_size: usize,
    /// `original_size / compressed_size` — values > 1.0 indicate shrinkage.
    pub ratio: f64,
    /// Algorithm label: "none", "lz4", "zstd", or "rdf_custom".
    pub algorithm: String,
    /// Wall-clock time spent compressing, in microseconds.
    pub compression_time_us: u64,
}
66
/// Stateful compressor that tracks cumulative statistics and, for the
/// RDF-custom algorithm, maintains a URI dictionary across calls.
pub struct Compressor {
    // Algorithm chosen at construction; fixed for the compressor's lifetime.
    algorithm: Algorithm,
    // Present only for Algorithm::RdfCustom with uri_dictionary enabled.
    uri_dictionary: Option<UriDictionary>,
    // Running totals updated by each compress() call.
    stats: CompressionStats,
}
73
/// Bidirectional URI ↔ numeric-id dictionary used by the RDF-custom codec.
struct UriDictionary {
    // Forward map: URI string to its interned id.
    uri_to_id: HashMap<String, u32>,
    // Reverse map: id back to the URI string.
    id_to_uri: HashMap<u32, String>,
    // Next id to hand out; monotonically increasing.
    next_id: u32,
    // (prefix, namespace) pairs seeded in new(). NOTE(review): written but
    // never read by the visible code — confirm intended use.
    prefixes: Vec<(String, String)>,
}
85
/// Cumulative counters updated by every `Compressor::compress` call.
#[derive(Debug, Default)]
struct CompressionStats {
    // Total compressed bytes produced.
    total_compressed: u64,
    // Total input bytes consumed.
    total_original: u64,
    // Number of compress() calls.
    compression_count: u64,
    // Total compression wall-clock time, microseconds.
    total_time_us: u64,
}
94
95impl Compressor {
96 pub fn new(algorithm: Algorithm) -> Self {
98 let uri_dictionary = match &algorithm {
99 Algorithm::RdfCustom { options } if options.uri_dictionary => {
100 Some(UriDictionary::new())
101 }
102 _ => None,
103 };
104
105 Compressor {
106 algorithm,
107 uri_dictionary,
108 stats: CompressionStats::default(),
109 }
110 }
111
112 pub fn compress(&mut self, data: &[u8]) -> Result<CompressionResult, OxirsError> {
114 let start = std::time::Instant::now();
115 let original_size = data.len();
116
117 let (compressed, algorithm_name) = match self.algorithm.clone() {
118 Algorithm::None => (data.to_vec(), "none"),
119 Algorithm::Lz4 { level } => {
120 let compressed = self.compress_lz4(data, level)?;
121 (compressed, "lz4")
122 }
123 Algorithm::Zstd { level } => {
124 let compressed = self.compress_zstd(data, level)?;
125 (compressed, "zstd")
126 }
127 Algorithm::RdfCustom { options } => {
128 let compressed = self.compress_rdf_custom(data, &options)?;
129 (compressed, "rdf_custom")
130 }
131 };
132
133 let compressed_size = compressed.len();
134 let ratio = original_size as f64 / compressed_size as f64;
135 let compression_time_us = start.elapsed().as_micros() as u64;
136
137 self.stats.total_original += original_size as u64;
139 self.stats.total_compressed += compressed_size as u64;
140 self.stats.compression_count += 1;
141 self.stats.total_time_us += compression_time_us;
142
143 Ok(CompressionResult {
144 data: compressed,
145 original_size,
146 compressed_size,
147 ratio,
148 algorithm: algorithm_name.to_string(),
149 compression_time_us,
150 })
151 }
152
153 pub fn decompress(&mut self, data: &[u8]) -> Result<Vec<u8>, OxirsError> {
155 match &self.algorithm {
156 Algorithm::None => Ok(data.to_vec()),
157 Algorithm::Lz4 { .. } => self.decompress_lz4(data),
158 Algorithm::Zstd { .. } => self.decompress_zstd(data),
159 Algorithm::RdfCustom { .. } => self.decompress_rdf_custom(data),
160 }
161 }
162
163 fn compress_lz4(&self, data: &[u8], _level: u32) -> Result<Vec<u8>, OxirsError> {
165 oxiarc_lz4::compress(data)
166 .map_err(|e| OxirsError::Io(format!("LZ4 compression failed: {}", e)))
167 }
168
169 fn decompress_lz4(&self, data: &[u8]) -> Result<Vec<u8>, OxirsError> {
171 oxiarc_lz4::decompress(data, LZ4_MAX_DECOMPRESS_SIZE)
172 .map_err(|e| OxirsError::Io(format!("LZ4 decompression failed: {}", e)))
173 }
174
175 fn compress_zstd(&self, data: &[u8], level: i32) -> Result<Vec<u8>, OxirsError> {
177 oxiarc_zstd::encode_all(data, level)
178 .map_err(|e| OxirsError::Io(format!("Zstd compression failed: {}", e)))
179 }
180
181 fn decompress_zstd(&self, data: &[u8]) -> Result<Vec<u8>, OxirsError> {
183 oxiarc_zstd::decode_all(data)
184 .map_err(|e| OxirsError::Io(format!("Zstd decompression failed: {}", e)))
185 }
186
187 fn compress_rdf_custom(
189 &mut self,
190 data: &[u8],
191 options: &RdfCompressionOptions,
192 ) -> Result<Vec<u8>, OxirsError> {
193 let rdf_data = String::from_utf8_lossy(data);
195
196 let mut compressed = RdfCompressedData {
198 version: 1,
199 options: options.clone(),
200 dictionary: Vec::new(),
201 triples: Vec::new(),
202 };
203
204 if options.uri_dictionary {
206 if let Some(dict) = &mut self.uri_dictionary {
207 for line in rdf_data.lines() {
210 if let Some(uri) = extract_uri(line) {
211 dict.add_uri(uri);
212 }
213 }
214
215 compressed.dictionary = dict.export();
217 }
218 }
219
220 compressed.triples = data.to_vec();
223
224 let serialized = oxicode::serde::encode_to_vec(&compressed, oxicode::config::standard())?;
226 self.compress_zstd(&serialized, 3)
227 }
228
229 fn decompress_rdf_custom(&mut self, data: &[u8]) -> Result<Vec<u8>, OxirsError> {
231 let decompressed = self.decompress_zstd(data)?;
233
234 let compressed: RdfCompressedData =
236 oxicode::serde::decode_from_slice(&decompressed, oxicode::config::standard())
237 .map(|(v, _)| v)?;
238
239 if compressed.options.uri_dictionary && !compressed.dictionary.is_empty() {
241 if let Some(dict) = &mut self.uri_dictionary {
242 dict.import(compressed.dictionary);
243 }
244 }
245
246 Ok(compressed.triples)
249 }
250
251 pub fn stats(&self) -> CompressionStatsSummary {
253 CompressionStatsSummary {
254 total_compressed_mb: self.stats.total_compressed as f64 / 1_048_576.0,
255 total_original_mb: self.stats.total_original as f64 / 1_048_576.0,
256 average_ratio: if self.stats.total_compressed > 0 {
257 self.stats.total_original as f64 / self.stats.total_compressed as f64
258 } else {
259 1.0
260 },
261 compression_count: self.stats.compression_count,
262 avg_time_us: self
263 .stats
264 .total_time_us
265 .checked_div(self.stats.compression_count)
266 .unwrap_or(0),
267 }
268 }
269}
270
/// Snapshot of cumulative compression statistics, as returned by
/// [`Compressor::stats`].
#[derive(Debug, Clone)]
pub struct CompressionStatsSummary {
    /// Total compressed output, in MiB.
    pub total_compressed_mb: f64,
    /// Total original input, in MiB.
    pub total_original_mb: f64,
    /// Overall original/compressed ratio; 1.0 when nothing was compressed.
    pub average_ratio: f64,
    /// Number of compress() calls recorded.
    pub compression_count: u64,
    /// Mean compression time per call, microseconds (0 before any call).
    pub avg_time_us: u64,
}
280
/// Serialized envelope produced by the RDF-custom codec before the final
/// zstd pass.
#[derive(Debug, Serialize, Deserialize)]
struct RdfCompressedData {
    // Format version; currently always 1.
    version: u32,
    // Options used at compression time, echoed back for decompression.
    options: RdfCompressionOptions,
    // Exported (uri, id) pairs; empty when the dictionary is disabled.
    dictionary: Vec<(String, u32)>,
    // The raw input bytes, stored verbatim (no URI substitution yet).
    triples: Vec<u8>,
}
289
290impl UriDictionary {
291 fn new() -> Self {
292 let mut dict = UriDictionary {
293 uri_to_id: HashMap::new(),
294 id_to_uri: HashMap::new(),
295 next_id: 0,
296 prefixes: Vec::new(),
297 };
298
299 dict.add_prefix("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#");
301 dict.add_prefix("rdfs", "http://www.w3.org/2000/01/rdf-schema#");
302 dict.add_prefix("xsd", "http://www.w3.org/2001/XMLSchema#");
303 dict.add_prefix("owl", "http://www.w3.org/2002/07/owl#");
304
305 dict
306 }
307
308 fn add_prefix(&mut self, prefix: &str, uri: &str) {
309 self.prefixes.push((prefix.to_string(), uri.to_string()));
310 }
311
312 fn add_uri(&mut self, uri: &str) -> u32 {
313 if let Some(&id) = self.uri_to_id.get(uri) {
314 return id;
315 }
316
317 let id = self.next_id;
318 self.uri_to_id.insert(uri.to_string(), id);
319 self.id_to_uri.insert(id, uri.to_string());
320 self.next_id += 1;
321 id
322 }
323
324 #[allow(dead_code)]
325 fn get_uri(&self, id: u32) -> Option<&str> {
326 self.id_to_uri.get(&id).map(|s| s.as_str())
327 }
328
329 fn export(&self) -> Vec<(String, u32)> {
330 self.uri_to_id
331 .iter()
332 .map(|(uri, id)| (uri.clone(), *id))
333 .collect()
334 }
335
336 fn import(&mut self, data: Vec<(String, u32)>) {
337 self.uri_to_id.clear();
338 self.id_to_uri.clear();
339
340 for (uri, id) in data {
341 self.uri_to_id.insert(uri.clone(), id);
342 self.id_to_uri.insert(id, uri);
343 self.next_id = self.next_id.max(id + 1);
344 }
345 }
346}
347
/// Extracts the first `<...>`-delimited URI from `line`, without the angle
/// brackets. Returns `None` when no complete `<...>` pair follows the first
/// `<`. Byte-offset slicing is safe because `<` and `>` are ASCII.
fn extract_uri(line: &str) -> Option<&str> {
    let open = line.find('<')?;
    let rest = &line[open..];
    let close = rest.find('>')?;
    Some(&rest[1..close])
}
357
358pub async fn compress_rdf(
360 data: &[u8],
361 algorithm: Algorithm,
362) -> Result<CompressionResult, OxirsError> {
363 let mut compressor = Compressor::new(algorithm);
364 compressor.compress(data)
365}
366
367pub async fn decompress_rdf(data: &[u8], algorithm: Algorithm) -> Result<Vec<u8>, OxirsError> {
369 let mut compressor = Compressor::new(algorithm);
370 compressor.decompress(data)
371}
372
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips repetitive N-Triples-like data through LZ4. The input is
    /// repeated 50x so the size-reduction assertion cannot flake on a short,
    /// barely-compressible input (block codecs may expand tiny inputs).
    #[test]
    fn test_lz4_compression() {
        let triple = b"<http://example.org/s> <http://example.org/p> <http://example.org/o> .\n";
        let data: Vec<u8> = triple
            .iter()
            .copied()
            .cycle()
            .take(triple.len() * 50)
            .collect();
        let mut compressor = Compressor::new(Algorithm::Lz4 { level: 1 });

        let result = compressor
            .compress(&data)
            .expect("compression should succeed");
        assert!(result.compressed_size < result.original_size);

        let decompressed = compressor
            .decompress(&result.data)
            .expect("decompression should succeed");
        assert_eq!(decompressed, data);
    }

    /// Round-trips repetitive data through zstd and checks it shrinks.
    #[test]
    fn test_zstd_compression() {
        let triple = b"<http://example.org/s> <http://example.org/p> \"literal value\" .\n";
        let data: Vec<u8> = triple
            .iter()
            .copied()
            .cycle()
            .take(triple.len() * 50)
            .collect();
        let mut compressor = Compressor::new(Algorithm::Zstd { level: 3 });

        let result = compressor.compress(&data).expect("zstd compression failed");
        assert!(result.compressed_size < result.original_size);

        let decompressed = compressor
            .decompress(&result.data)
            .expect("zstd decompression failed");
        assert_eq!(decompressed, data);
    }

    /// Algorithm::None must pass data through unchanged in both directions.
    #[test]
    fn test_none_roundtrip() {
        let data = b"plain passthrough data";
        let mut compressor = Compressor::new(Algorithm::None);

        let result = compressor.compress(data).expect("none compression failed");
        assert_eq!(result.data, data);
        assert_eq!(result.compressed_size, result.original_size);

        let decompressed = compressor
            .decompress(&result.data)
            .expect("none decompression failed");
        assert_eq!(decompressed, data);
    }

    /// Interning the same URI twice yields the same id; distinct URIs get
    /// distinct ids; reverse lookup returns the original string.
    #[test]
    fn test_uri_dictionary() {
        let mut dict = UriDictionary::new();

        let id1 = dict.add_uri("http://example.org/test");
        let id2 = dict.add_uri("http://example.org/test");
        assert_eq!(id1, id2);

        let id3 = dict.add_uri("http://example.org/other");
        assert_ne!(id1, id3);

        assert_eq!(dict.get_uri(id1), Some("http://example.org/test"));
    }
}