use crate::OxirsError;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
// Upper bound (100 MiB) passed to `oxiarc_lz4::decompress` as the maximum
// allowed output size, so corrupt or hostile input cannot trigger an
// unbounded allocation.
const LZ4_MAX_DECOMPRESS_SIZE: usize = 100 * 1024 * 1024;
/// Compression algorithm selection for [`Compressor`].
#[derive(Debug, Clone)]
pub enum Algorithm {
    /// Pass data through unmodified.
    None,
    /// LZ4 compression. NOTE(review): `level` is currently ignored by the
    /// backend call (`compress_lz4` takes it as `_level`) — confirm intent.
    Lz4 { level: u32 },
    /// Zstandard compression at the given `level`.
    Zstd { level: i32 },
    /// RDF-aware custom scheme layered over zstd; see [`RdfCompressionOptions`].
    RdfCustom { options: RdfCompressionOptions },
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RdfCompressionOptions {
pub uri_dictionary: bool,
pub prefix_compression: bool,
pub dictionary_size: usize,
pub datatype_compression: bool,
}
impl Default for RdfCompressionOptions {
fn default() -> Self {
RdfCompressionOptions {
uri_dictionary: true,
prefix_compression: true,
dictionary_size: 16384,
datatype_compression: true,
}
}
}
/// Outcome of a single [`Compressor::compress`] call.
#[derive(Debug, Clone)]
pub struct CompressionResult {
    /// The compressed payload.
    pub data: Vec<u8>,
    /// Input size in bytes.
    pub original_size: usize,
    /// Output size in bytes.
    pub compressed_size: usize,
    /// original_size / compressed_size (> 1.0 means the data shrank).
    pub ratio: f64,
    /// Algorithm name: "none", "lz4", "zstd", or "rdf_custom".
    pub algorithm: String,
    /// Wall-clock time spent compressing, in microseconds.
    pub compression_time_us: u64,
}
/// Stateful compressor: one [`Algorithm`] plus the mutable state
/// (URI dictionary, running statistics) some algorithms need.
pub struct Compressor {
    /// Algorithm applied by `compress`/`decompress`.
    algorithm: Algorithm,
    /// Present only for `Algorithm::RdfCustom` with `uri_dictionary: true`.
    uri_dictionary: Option<UriDictionary>,
    /// Running totals across all `compress` calls.
    stats: CompressionStats,
}
/// Bidirectional URI <-> integer-id interning table.
struct UriDictionary {
    /// URI -> assigned id.
    uri_to_id: HashMap<String, u32>,
    /// Reverse map: id -> URI.
    id_to_uri: HashMap<u32, String>,
    /// Next id to hand out (monotonically increasing).
    next_id: u32,
    /// (prefix, namespace) pairs; registered but not consulted anywhere
    /// in this module yet.
    prefixes: Vec<(String, String)>,
}
/// Running totals accumulated by [`Compressor::compress`].
#[derive(Debug, Default)]
struct CompressionStats {
    /// Total compressed bytes produced.
    total_compressed: u64,
    /// Total original bytes consumed.
    total_original: u64,
    /// Number of `compress` calls.
    compression_count: u64,
    /// Cumulative compression time in microseconds.
    total_time_us: u64,
}
impl Compressor {
pub fn new(algorithm: Algorithm) -> Self {
let uri_dictionary = match &algorithm {
Algorithm::RdfCustom { options } if options.uri_dictionary => {
Some(UriDictionary::new())
}
_ => None,
};
Compressor {
algorithm,
uri_dictionary,
stats: CompressionStats::default(),
}
}
pub fn compress(&mut self, data: &[u8]) -> Result<CompressionResult, OxirsError> {
let start = std::time::Instant::now();
let original_size = data.len();
let (compressed, algorithm_name) = match self.algorithm.clone() {
Algorithm::None => (data.to_vec(), "none"),
Algorithm::Lz4 { level } => {
let compressed = self.compress_lz4(data, level)?;
(compressed, "lz4")
}
Algorithm::Zstd { level } => {
let compressed = self.compress_zstd(data, level)?;
(compressed, "zstd")
}
Algorithm::RdfCustom { options } => {
let compressed = self.compress_rdf_custom(data, &options)?;
(compressed, "rdf_custom")
}
};
let compressed_size = compressed.len();
let ratio = original_size as f64 / compressed_size as f64;
let compression_time_us = start.elapsed().as_micros() as u64;
self.stats.total_original += original_size as u64;
self.stats.total_compressed += compressed_size as u64;
self.stats.compression_count += 1;
self.stats.total_time_us += compression_time_us;
Ok(CompressionResult {
data: compressed,
original_size,
compressed_size,
ratio,
algorithm: algorithm_name.to_string(),
compression_time_us,
})
}
pub fn decompress(&mut self, data: &[u8]) -> Result<Vec<u8>, OxirsError> {
match &self.algorithm {
Algorithm::None => Ok(data.to_vec()),
Algorithm::Lz4 { .. } => self.decompress_lz4(data),
Algorithm::Zstd { .. } => self.decompress_zstd(data),
Algorithm::RdfCustom { .. } => self.decompress_rdf_custom(data),
}
}
fn compress_lz4(&self, data: &[u8], _level: u32) -> Result<Vec<u8>, OxirsError> {
oxiarc_lz4::compress(data)
.map_err(|e| OxirsError::Io(format!("LZ4 compression failed: {}", e)))
}
fn decompress_lz4(&self, data: &[u8]) -> Result<Vec<u8>, OxirsError> {
oxiarc_lz4::decompress(data, LZ4_MAX_DECOMPRESS_SIZE)
.map_err(|e| OxirsError::Io(format!("LZ4 decompression failed: {}", e)))
}
fn compress_zstd(&self, data: &[u8], level: i32) -> Result<Vec<u8>, OxirsError> {
oxiarc_zstd::encode_all(data, level)
.map_err(|e| OxirsError::Io(format!("Zstd compression failed: {}", e)))
}
fn decompress_zstd(&self, data: &[u8]) -> Result<Vec<u8>, OxirsError> {
oxiarc_zstd::decode_all(data)
.map_err(|e| OxirsError::Io(format!("Zstd decompression failed: {}", e)))
}
fn compress_rdf_custom(
&mut self,
data: &[u8],
options: &RdfCompressionOptions,
) -> Result<Vec<u8>, OxirsError> {
let rdf_data = String::from_utf8_lossy(data);
let mut compressed = RdfCompressedData {
version: 1,
options: options.clone(),
dictionary: Vec::new(),
triples: Vec::new(),
};
if options.uri_dictionary {
if let Some(dict) = &mut self.uri_dictionary {
for line in rdf_data.lines() {
if let Some(uri) = extract_uri(line) {
dict.add_uri(uri);
}
}
compressed.dictionary = dict.export();
}
}
compressed.triples = data.to_vec();
let serialized = oxicode::serde::encode_to_vec(&compressed, oxicode::config::standard())?;
self.compress_zstd(&serialized, 3)
}
fn decompress_rdf_custom(&mut self, data: &[u8]) -> Result<Vec<u8>, OxirsError> {
let decompressed = self.decompress_zstd(data)?;
let compressed: RdfCompressedData =
oxicode::serde::decode_from_slice(&decompressed, oxicode::config::standard())
.map(|(v, _)| v)?;
if compressed.options.uri_dictionary && !compressed.dictionary.is_empty() {
if let Some(dict) = &mut self.uri_dictionary {
dict.import(compressed.dictionary);
}
}
Ok(compressed.triples)
}
pub fn stats(&self) -> CompressionStatsSummary {
CompressionStatsSummary {
total_compressed_mb: self.stats.total_compressed as f64 / 1_048_576.0,
total_original_mb: self.stats.total_original as f64 / 1_048_576.0,
average_ratio: if self.stats.total_compressed > 0 {
self.stats.total_original as f64 / self.stats.total_compressed as f64
} else {
1.0
},
compression_count: self.stats.compression_count,
avg_time_us: self
.stats
.total_time_us
.checked_div(self.stats.compression_count)
.unwrap_or(0),
}
}
}
/// Snapshot of aggregate compression statistics (see [`Compressor::stats`]).
#[derive(Debug, Clone)]
pub struct CompressionStatsSummary {
    /// Total compressed output, in mebibytes.
    pub total_compressed_mb: f64,
    /// Total original input, in mebibytes.
    pub total_original_mb: f64,
    /// Aggregate original/compressed ratio; 1.0 before anything was compressed.
    pub average_ratio: f64,
    /// Number of `compress` calls recorded.
    pub compression_count: u64,
    /// Mean microseconds per `compress` call (0 before the first call).
    pub avg_time_us: u64,
}
/// Wire layout for the custom RDF format: oxicode-serialized, then
/// zstd-compressed by `compress_rdf_custom`.
#[derive(Debug, Serialize, Deserialize)]
struct RdfCompressedData {
    /// Format version (currently always 1).
    version: u32,
    /// Options the payload was produced with.
    options: RdfCompressionOptions,
    /// Exported (uri, id) dictionary entries; empty when disabled.
    dictionary: Vec<(String, u32)>,
    /// The triple payload — currently the verbatim input bytes.
    triples: Vec<u8>,
}
impl UriDictionary {
    /// Creates a dictionary pre-seeded with the standard W3C namespace prefixes.
    fn new() -> Self {
        let seed_prefixes = [
            ("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#"),
            ("rdfs", "http://www.w3.org/2000/01/rdf-schema#"),
            ("xsd", "http://www.w3.org/2001/XMLSchema#"),
            ("owl", "http://www.w3.org/2002/07/owl#"),
        ];
        let mut dict = UriDictionary {
            uri_to_id: HashMap::new(),
            id_to_uri: HashMap::new(),
            next_id: 0,
            prefixes: Vec::new(),
        };
        for &(prefix, uri) in seed_prefixes.iter() {
            dict.add_prefix(prefix, uri);
        }
        dict
    }

    /// Records a (prefix, namespace) pair.
    fn add_prefix(&mut self, prefix: &str, uri: &str) {
        self.prefixes.push((prefix.to_owned(), uri.to_owned()));
    }

    /// Interns `uri`: returns its existing id, or assigns and returns the next one.
    fn add_uri(&mut self, uri: &str) -> u32 {
        match self.uri_to_id.get(uri) {
            Some(&existing) => existing,
            None => {
                let assigned = self.next_id;
                self.next_id += 1;
                self.uri_to_id.insert(uri.to_owned(), assigned);
                self.id_to_uri.insert(assigned, uri.to_owned());
                assigned
            }
        }
    }

    /// Looks up the URI for a previously assigned id.
    #[allow(dead_code)]
    fn get_uri(&self, id: u32) -> Option<&str> {
        self.id_to_uri.get(&id).map(String::as_str)
    }

    /// Snapshot of all (uri, id) pairs, in arbitrary map order.
    fn export(&self) -> Vec<(String, u32)> {
        self.uri_to_id
            .iter()
            .map(|(uri, &id)| (uri.clone(), id))
            .collect()
    }

    /// Replaces the dictionary contents with `data`, bumping `next_id`
    /// past the highest imported id so later additions never collide.
    fn import(&mut self, data: Vec<(String, u32)>) {
        self.uri_to_id.clear();
        self.id_to_uri.clear();
        for (uri, id) in data {
            self.id_to_uri.insert(id, uri.clone());
            self.uri_to_id.insert(uri, id);
            self.next_id = self.next_id.max(id + 1);
        }
    }
}
/// Returns the text between the first `<` and the following `>` on `line`,
/// or `None` when no such bracketed span exists.
fn extract_uri(line: &str) -> Option<&str> {
    let open = line.find('<')?;
    let tail = &line[open + 1..];
    let close = tail.find('>')?;
    Some(&tail[..close])
}
/// One-shot async convenience: compresses `data` with a throwaway
/// [`Compressor`] configured for `algorithm`.
pub async fn compress_rdf(
    data: &[u8],
    algorithm: Algorithm,
) -> Result<CompressionResult, OxirsError> {
    Compressor::new(algorithm).compress(data)
}
/// One-shot async convenience: decompresses `data` with a throwaway
/// [`Compressor`] configured for `algorithm`.
pub async fn decompress_rdf(data: &[u8], algorithm: Algorithm) -> Result<Vec<u8>, OxirsError> {
    Compressor::new(algorithm).decompress(data)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// LZ4 round-trip on a repetitive N-Triples line: the output must be
    /// smaller than the input and decompress back to the original bytes.
    #[test]
    fn test_lz4_compression() {
        let data = b"<http://example.org/s> <http://example.org/p> <http://example.org/o> .";
        let mut compressor = Compressor::new(Algorithm::Lz4 { level: 1 });
        let result = compressor
            .compress(data)
            .expect("compression should succeed");
        // Repeated URI prefixes make the line compressible.
        assert!(result.compressed_size < result.original_size);
        let decompressed = compressor
            .decompress(&result.data)
            .expect("decompression should succeed");
        assert_eq!(decompressed, data);
    }

    /// Zstd round-trip on 50 copies of one triple (highly compressible).
    #[test]
    fn test_zstd_compression() {
        let triple = b"<http://example.org/s> <http://example.org/p> \"literal value\" .\n";
        // Build the input by cycling the triple 50 times.
        let data: Vec<u8> = triple
            .iter()
            .copied()
            .cycle()
            .take(triple.len() * 50)
            .collect();
        let mut compressor = Compressor::new(Algorithm::Zstd { level: 3 });
        let result = compressor.compress(&data).expect("zstd compression failed");
        assert!(result.compressed_size < result.original_size);
        let decompressed = compressor
            .decompress(&result.data)
            .expect("zstd decompression failed");
        assert_eq!(decompressed, data);
    }

    /// Interning the same URI twice must return the same id; distinct URIs
    /// get distinct ids, and ids resolve back via `get_uri`.
    #[test]
    fn test_uri_dictionary() {
        let mut dict = UriDictionary::new();
        let id1 = dict.add_uri("http://example.org/test");
        let id2 = dict.add_uri("http://example.org/test");
        assert_eq!(id1, id2);
        let id3 = dict.add_uri("http://example.org/other");
        assert_ne!(id1, id3);
        assert_eq!(dict.get_uri(id1), Some("http://example.org/test"));
    }
}