use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::{Cursor, Read, Write};
use std::time::{Duration, Instant};
/// Compression codec applied to serialized payloads.
///
/// NOTE(review): variant order matters for wire formats that encode enum
/// discriminants positionally (e.g. the oxicode/bincode-style codec used
/// below) — do not reorder variants without a schema version bump.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum CompressionAlgorithm {
    /// Payload stored as-is, no compression.
    None,
    /// LZ4 — very fast, moderate ratio.
    Lz4,
    /// Zstandard — slower than LZ4, better ratio (level 3 is used below).
    Zstd,
    /// DEFLATE via `flate2`.
    Deflate,
}
impl Default for CompressionAlgorithm {
fn default() -> Self {
Self::Lz4
}
}
/// Wire format used to encode a message before optional compression.
///
/// NOTE(review): variant order matters for positional enum encodings — do
/// not reorder without a schema version bump.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SerializationFormat {
    /// MessagePack via `rmp_serde`.
    MessagePack,
    /// NOTE(review): currently encoded with the oxicode (bincode-style)
    /// codec rather than an actual protobuf encoder — see
    /// `MessageSerializer::serialize`.
    ProtocolBuffers,
    /// Bincode-style encoding via `oxicode`.
    Bincode,
    /// Human-readable JSON via `serde_json`.
    Json,
}
impl Default for SerializationFormat {
fn default() -> Self {
Self::MessagePack
}
}
/// Semantic-style schema version stamped on every serialized message and
/// checked on deserialization (see `is_compatible_with`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchemaVersion {
    /// Incompatible-change counter; majors must match exactly.
    pub major: u32,
    /// Backward-compatible-addition counter.
    pub minor: u32,
    /// Patch level; ignored by the compatibility check.
    pub patch: u32,
}
impl Default for SchemaVersion {
fn default() -> Self {
Self {
major: 1,
minor: 0,
patch: 0,
}
}
}
impl SchemaVersion {
    /// Returns `true` when a reader at version `self` can decode a message
    /// written at version `other`: the majors must match exactly and the
    /// reader's minor must be at least the writer's. The patch component is
    /// ignored.
    pub fn is_compatible_with(&self, other: &SchemaVersion) -> bool {
        self.major == other.major && self.minor >= other.minor
    }
}
/// Tunables controlling how messages are encoded, compressed, and verified.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializationConfig {
    /// Codec applied to payloads larger than `compression_threshold`.
    pub compression: CompressionAlgorithm,
    /// Wire format used to encode messages.
    pub format: SerializationFormat,
    /// Version stamped on every outgoing message and checked on decode.
    pub schema_version: SchemaVersion,
    /// When `true`, a CRC32 of the final payload is attached and verified.
    pub enable_checksums: bool,
    /// Encoded sizes at or below this many bytes skip compression entirely.
    pub compression_threshold: usize,
    /// When `true`, counters and timings accumulate in `SerializationMetrics`.
    pub enable_metrics: bool,
}
impl Default for SerializationConfig {
fn default() -> Self {
Self {
compression: CompressionAlgorithm::Lz4,
format: SerializationFormat::MessagePack,
schema_version: SchemaVersion::default(),
enable_checksums: true,
compression_threshold: 1024, enable_metrics: true,
}
}
}
/// Self-describing envelope: the encoded (and possibly compressed) bytes
/// plus the metadata needed to decode them on the other side.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedMessage {
    /// Schema version of the writer; checked for compatibility on decode.
    pub schema_version: SchemaVersion,
    /// Codec actually applied (`None` when the payload was below threshold).
    pub compression: CompressionAlgorithm,
    /// Wire format of the (decompressed) payload.
    pub format: SerializationFormat,
    /// Encoded — and possibly compressed — message bytes.
    pub payload: Vec<u8>,
    /// CRC32 of `payload` when checksums were enabled at serialization time.
    pub checksum: Option<u32>,
    /// Size in bytes of the encoded payload before compression.
    pub original_size: usize,
    /// `payload.len() / original_size`; values below 1.0 mean compression helped.
    pub compression_ratio: f32,
}
/// Running counters and timings accumulated by `MessageSerializer` when
/// `enable_metrics` is set.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SerializationMetrics {
    /// Successful `serialize` calls.
    pub messages_serialized: u64,
    /// Successful `deserialize` calls.
    pub messages_deserialized: u64,
    /// Total encoded bytes, counted before compression.
    pub bytes_serialized: u64,
    /// Total decoded bytes, counted after decompression.
    pub bytes_deserialized: u64,
    /// Wall time of whole `serialize` calls (compression included).
    pub serialization_time: Duration,
    /// Wall time of whole `deserialize` calls (decompression included).
    pub deserialization_time: Duration,
    /// Time spent in the compression step only (also inside `serialization_time`).
    pub compression_time: Duration,
    /// Time spent in the decompression step only (also inside `deserialization_time`).
    pub decompression_time: Duration,
    /// Running mean of per-message compression ratios.
    pub avg_compression_ratio: f32,
    /// Messages rejected because the CRC32 did not match.
    pub checksum_failures: u64,
    /// Messages rejected because of an incompatible schema version.
    pub schema_mismatches: u64,
}
impl SerializationMetrics {
    /// Messages processed per second across serialize + deserialize,
    /// or 0.0 when no time has been recorded yet.
    pub fn throughput(&self) -> f64 {
        let total = self.serialization_time + self.deserialization_time;
        if total.is_zero() {
            return 0.0;
        }
        let count = self.messages_serialized + self.messages_deserialized;
        count as f64 / total.as_secs_f64()
    }

    /// Mean encoded size of a serialized message in bytes, or 0.0 when
    /// nothing has been serialized yet.
    pub fn avg_message_size(&self) -> f64 {
        match self.messages_serialized {
            0 => 0.0,
            n => self.bytes_serialized as f64 / n as f64,
        }
    }
}
/// Stateful serializer/deserializer that applies a `SerializationConfig`
/// and accumulates `SerializationMetrics` across calls.
#[derive(Debug)]
pub struct MessageSerializer {
    // Active configuration; replaceable via `update_config`.
    config: SerializationConfig,
    // Running counters; reset via `reset_metrics`.
    metrics: SerializationMetrics,
}
impl MessageSerializer {
    /// Creates a serializer with `SerializationConfig::default()`
    /// (MessagePack + LZ4, checksums and metrics enabled).
    pub fn new() -> Self {
        Self::with_config(SerializationConfig::default())
    }

    /// Creates a serializer using `config`, with zeroed metrics.
    pub fn with_config(config: SerializationConfig) -> Self {
        Self {
            config,
            metrics: SerializationMetrics::default(),
        }
    }

    /// Encodes `message` with the configured format, compresses the encoded
    /// bytes when they exceed `compression_threshold`, and optionally
    /// attaches a CRC32 checksum of the final (post-compression) payload.
    ///
    /// # Errors
    /// Returns an error when encoding or compression fails.
    pub fn serialize<T: Serialize>(&mut self, message: &T) -> Result<SerializedMessage> {
        let start_time = Instant::now();
        // Encode with the configured wire format.
        // NOTE(review): the ProtocolBuffers arm reuses the oxicode
        // (bincode-style) codec instead of a real protobuf encoder —
        // confirm this placeholder is intentional.
        let serialized_data = match self.config.format {
            SerializationFormat::MessagePack => rmp_serde::to_vec(message)
                .map_err(|e| anyhow!("MessagePack serialization failed: {}", e))?,
            SerializationFormat::Bincode => {
                oxicode::serde::encode_to_vec(&message, oxicode::config::standard())
                    .map_err(|e| anyhow!("Bincode serialization failed: {}", e))?
            }
            SerializationFormat::Json => serde_json::to_vec(message)
                .map_err(|e| anyhow!("JSON serialization failed: {}", e))?,
            SerializationFormat::ProtocolBuffers => {
                oxicode::serde::encode_to_vec(&message, oxicode::config::standard())
                    .map_err(|e| anyhow!("ProtocolBuffers serialization failed: {}", e))?
            }
        };
        let original_size = serialized_data.len();
        // Compress only payloads above the threshold: small buffers tend to
        // grow under compression and aren't worth the CPU.
        let (compressed_data, compression_used) =
            if original_size > self.config.compression_threshold {
                let compression_start = Instant::now();
                let compressed = match self.config.compression {
                    // Move the buffer through unchanged. (The previous code
                    // cloned it here, a needless full copy of the encoded
                    // bytes — nothing uses `serialized_data` afterwards.)
                    CompressionAlgorithm::None => {
                        (serialized_data, CompressionAlgorithm::None)
                    }
                    CompressionAlgorithm::Lz4 => {
                        let compressed = oxiarc_lz4::compress(&serialized_data)
                            .map_err(|e| anyhow!("LZ4 compression failed: {}", e))?;
                        (compressed, CompressionAlgorithm::Lz4)
                    }
                    CompressionAlgorithm::Zstd => {
                        // Level 3: zstd's conventional speed/ratio middle ground.
                        let compressed = oxiarc_zstd::encode_all(&serialized_data, 3)
                            .map_err(|e| anyhow!("Zstd compression failed: {}", e))?;
                        (compressed, CompressionAlgorithm::Zstd)
                    }
                    CompressionAlgorithm::Deflate => {
                        use flate2::write::DeflateEncoder;
                        use flate2::Compression;
                        let mut encoder = DeflateEncoder::new(Vec::new(), Compression::default());
                        encoder
                            .write_all(&serialized_data)
                            .map_err(|e| anyhow!("Deflate compression failed: {}", e))?;
                        // `finish` flushes the stream and returns the sink Vec.
                        let compressed = encoder
                            .finish()
                            .map_err(|e| anyhow!("Deflate compression failed: {}", e))?;
                        (compressed, CompressionAlgorithm::Deflate)
                    }
                };
                if self.config.enable_metrics {
                    self.metrics.compression_time += compression_start.elapsed();
                }
                compressed
            } else {
                // Below threshold: send uncompressed regardless of config.
                (serialized_data, CompressionAlgorithm::None)
            };
        // Ratio < 1.0 means compression shrank the payload.
        let compression_ratio = if original_size > 0 {
            compressed_data.len() as f32 / original_size as f32
        } else {
            1.0
        };
        // Checksum covers the bytes as transmitted (post-compression).
        let checksum = if self.config.enable_checksums {
            Some(crc32fast::hash(&compressed_data))
        } else {
            None
        };
        let serialized_message = SerializedMessage {
            schema_version: self.config.schema_version,
            compression: compression_used,
            format: self.config.format,
            payload: compressed_data,
            checksum,
            original_size,
            compression_ratio,
        };
        if self.config.enable_metrics {
            self.metrics.messages_serialized += 1;
            self.metrics.bytes_serialized += original_size as u64;
            self.metrics.serialization_time += start_time.elapsed();
            // Incremental running mean over all serialized messages.
            let total_messages = self.metrics.messages_serialized as f32;
            self.metrics.avg_compression_ratio =
                (self.metrics.avg_compression_ratio * (total_messages - 1.0) + compression_ratio)
                    / total_messages;
        }
        Ok(serialized_message)
    }

    /// Decodes a `SerializedMessage` back into `T`: verifies schema
    /// compatibility, then the checksum (when enabled on this side and
    /// present on the message), then decompresses and decodes using the
    /// format recorded in the envelope.
    ///
    /// # Errors
    /// Returns an error on schema mismatch, checksum failure, or any
    /// decompression/decoding failure.
    pub fn deserialize<T: for<'de> Deserialize<'de>>(
        &mut self,
        message: &SerializedMessage,
    ) -> Result<T> {
        let start_time = Instant::now();
        if !self
            .config
            .schema_version
            .is_compatible_with(&message.schema_version)
        {
            if self.config.enable_metrics {
                self.metrics.schema_mismatches += 1;
            }
            return Err(anyhow!(
                "Schema version mismatch: expected {:?}, got {:?}",
                self.config.schema_version,
                message.schema_version
            ));
        }
        // Verify integrity before spending time on decompression.
        if self.config.enable_checksums {
            if let Some(expected_checksum) = message.checksum {
                let actual_checksum = crc32fast::hash(&message.payload);
                if actual_checksum != expected_checksum {
                    if self.config.enable_metrics {
                        self.metrics.checksum_failures += 1;
                    }
                    return Err(anyhow!(
                        "Checksum verification failed: expected {}, got {}",
                        expected_checksum,
                        actual_checksum
                    ));
                }
            }
        }
        let decompression_start = Instant::now();
        let decompressed_data = match message.compression {
            CompressionAlgorithm::None => message.payload.clone(),
            CompressionAlgorithm::Lz4 => oxiarc_lz4::decompress(
                &message.payload,
                // NOTE(review): `.max(...)` makes this bound *at least*
                // 100 MiB, and unbounded for larger `original_size` values.
                // If the second argument is meant to cap decompressed output
                // (a decompression-bomb guard), `.min` or validation of
                // `original_size` was likely intended — confirm against the
                // oxiarc_lz4 API before changing.
                message.original_size.max(100 * 1024 * 1024),
            )
            .map_err(|e| anyhow!("LZ4 decompression failed: {}", e))?,
            CompressionAlgorithm::Zstd => oxiarc_zstd::decode_all(&message.payload)
                .map_err(|e| anyhow!("Zstd decompression failed: {}", e))?,
            CompressionAlgorithm::Deflate => {
                use flate2::read::DeflateDecoder;
                let mut decoder = DeflateDecoder::new(Cursor::new(&message.payload));
                let mut decompressed = Vec::new();
                decoder
                    .read_to_end(&mut decompressed)
                    .map_err(|e| anyhow!("Deflate decompression failed: {}", e))?;
                decompressed
            }
        };
        if self.config.enable_metrics {
            self.metrics.decompression_time += decompression_start.elapsed();
        }
        // Decode with the format recorded in the message (not our config),
        // so messages from differently-configured peers still decode.
        let deserialized: T = match message.format {
            SerializationFormat::MessagePack => rmp_serde::from_slice(&decompressed_data)
                .map_err(|e| anyhow!("MessagePack deserialization failed: {}", e))?,
            SerializationFormat::Bincode => {
                oxicode::serde::decode_from_slice(&decompressed_data, oxicode::config::standard())
                    .map(|(v, _)| v)
                    .map_err(|e| anyhow!("Bincode deserialization failed: {}", e))?
            }
            SerializationFormat::Json => serde_json::from_slice(&decompressed_data)
                .map_err(|e| anyhow!("JSON deserialization failed: {}", e))?,
            SerializationFormat::ProtocolBuffers => {
                oxicode::serde::decode_from_slice(&decompressed_data, oxicode::config::standard())
                    .map(|(v, _)| v)
                    .map_err(|e| anyhow!("ProtocolBuffers deserialization failed: {}", e))?
            }
        };
        if self.config.enable_metrics {
            self.metrics.messages_deserialized += 1;
            self.metrics.bytes_deserialized += decompressed_data.len() as u64;
            self.metrics.deserialization_time += start_time.elapsed();
        }
        Ok(deserialized)
    }

    /// Read access to the accumulated metrics.
    pub fn metrics(&self) -> &SerializationMetrics {
        &self.metrics
    }

    /// Zeroes all counters and timings.
    pub fn reset_metrics(&mut self) {
        self.metrics = SerializationMetrics::default();
    }

    /// Replaces the active configuration; metrics are kept.
    pub fn update_config(&mut self, config: SerializationConfig) {
        self.config = config;
    }

    /// Read access to the active configuration.
    pub fn config(&self) -> &SerializationConfig {
        &self.config
    }

    /// Round-trips `message` `iterations` times, collecting per-iteration
    /// timing extremes, totals, and compression ratios.
    ///
    /// Note: this calls `serialize`/`deserialize` directly, so the
    /// serializer's own running metrics are updated as a side effect.
    ///
    /// # Errors
    /// Propagates the first serialization or deserialization failure.
    pub fn benchmark<T: Serialize + for<'de> Deserialize<'de> + Clone>(
        &mut self,
        message: &T,
        iterations: usize,
    ) -> Result<BenchmarkResults> {
        let mut results = BenchmarkResults {
            iterations,
            total_serialization_time: Duration::ZERO,
            total_deserialization_time: Duration::ZERO,
            // MAX/ZERO seeds so the first sample always becomes the extreme.
            min_serialization_time: Duration::MAX,
            max_serialization_time: Duration::ZERO,
            min_deserialization_time: Duration::MAX,
            max_deserialization_time: Duration::ZERO,
            total_compressed_size: 0,
            total_uncompressed_size: 0,
            compression_ratios: Vec::new(),
        };
        for _ in 0..iterations {
            let serialize_start = Instant::now();
            let serialized = self.serialize(message)?;
            let serialize_time = serialize_start.elapsed();
            results.total_serialization_time += serialize_time;
            results.min_serialization_time = results.min_serialization_time.min(serialize_time);
            results.max_serialization_time = results.max_serialization_time.max(serialize_time);
            results.total_compressed_size += serialized.payload.len();
            results.total_uncompressed_size += serialized.original_size;
            results
                .compression_ratios
                .push(serialized.compression_ratio);
            let deserialize_start = Instant::now();
            let _deserialized: T = self.deserialize(&serialized)?;
            let deserialize_time = deserialize_start.elapsed();
            results.total_deserialization_time += deserialize_time;
            results.min_deserialization_time =
                results.min_deserialization_time.min(deserialize_time);
            results.max_deserialization_time =
                results.max_deserialization_time.max(deserialize_time);
        }
        Ok(results)
    }
}
impl Default for MessageSerializer {
fn default() -> Self {
Self::new()
}
}
/// Aggregated timings and sizes produced by `MessageSerializer::benchmark`.
#[derive(Debug, Clone)]
pub struct BenchmarkResults {
    /// Number of round-trip iterations that were run.
    pub iterations: usize,
    pub total_serialization_time: Duration,
    pub total_deserialization_time: Duration,
    /// `Duration::MAX` when no iterations were run.
    pub min_serialization_time: Duration,
    pub max_serialization_time: Duration,
    /// `Duration::MAX` when no iterations were run.
    pub min_deserialization_time: Duration,
    pub max_deserialization_time: Duration,
    /// Sum of post-compression payload sizes across iterations.
    pub total_compressed_size: usize,
    /// Sum of pre-compression encoded sizes across iterations.
    pub total_uncompressed_size: usize,
    /// Per-iteration compressed/original ratios, in order.
    pub compression_ratios: Vec<f32>,
}

impl BenchmarkResults {
    /// Mean serialization time per iteration, or `Duration::ZERO` when no
    /// iterations were run (the unguarded division previously panicked on
    /// `iterations == 0`).
    pub fn avg_serialization_time(&self) -> Duration {
        if self.iterations == 0 {
            Duration::ZERO
        } else {
            // `as u32` is safe for any realistic iteration count; Duration
            // division only accepts u32.
            self.total_serialization_time / self.iterations as u32
        }
    }

    /// Mean deserialization time per iteration, or `Duration::ZERO` when no
    /// iterations were run.
    pub fn avg_deserialization_time(&self) -> Duration {
        if self.iterations == 0 {
            Duration::ZERO
        } else {
            self.total_deserialization_time / self.iterations as u32
        }
    }

    /// Mean compression ratio across iterations; 1.0 (no compression)
    /// when no samples were collected.
    pub fn avg_compression_ratio(&self) -> f32 {
        if self.compression_ratios.is_empty() {
            1.0
        } else {
            self.compression_ratios.iter().sum::<f32>() / self.compression_ratios.len() as f32
        }
    }

    /// Operations (serializations + deserializations) per second, or 0.0
    /// when no time was recorded.
    pub fn throughput(&self) -> f64 {
        let total_time = self.total_serialization_time + self.total_deserialization_time;
        if total_time.is_zero() {
            0.0
        } else {
            (self.iterations * 2) as f64 / total_time.as_secs_f64()
        }
    }
}
/// Periodically benchmarks all codecs against live data samples and tracks
/// which one currently scores best.
pub struct AdaptiveCompression {
    // Rolling-history length per algorithm; also the re-benchmark period.
    sample_size: usize,
    // Recent scores per algorithm, oldest first.
    performance_history: HashMap<CompressionAlgorithm, Vec<f32>>,
    // Algorithm returned by `evaluate_and_select` between benchmark runs.
    current_best: CompressionAlgorithm,
}
impl AdaptiveCompression {
    /// Creates a selector defaulting to LZ4 with a 100-sample rolling
    /// history per algorithm.
    pub fn new() -> Self {
        Self {
            sample_size: 100,
            performance_history: HashMap::new(),
            current_best: CompressionAlgorithm::Lz4,
        }
    }

    /// Returns the algorithm to use for `data`.
    ///
    /// Buffers under 1 KiB are never benchmarked (too small to score
    /// meaningfully). Otherwise, a benchmark run of all codecs is triggered
    /// whenever the total recorded sample count is a multiple of
    /// `sample_size` — including the very first call, when it is zero.
    pub fn evaluate_and_select(&mut self, data: &[u8]) -> CompressionAlgorithm {
        if data.len() < 1024 {
            return self.current_best;
        }
        let total_samples: usize = self.performance_history.values().map(|v| v.len()).sum();
        if total_samples % self.sample_size == 0 {
            self.benchmark_algorithms(data);
        }
        self.current_best
    }

    /// Scores every codec on `sample_data`, appends each score to its
    /// rolling history, and promotes the algorithm with the best average.
    fn benchmark_algorithms(&mut self, sample_data: &[u8]) {
        let algorithms = [
            CompressionAlgorithm::None,
            CompressionAlgorithm::Lz4,
            CompressionAlgorithm::Zstd,
            CompressionAlgorithm::Deflate,
        ];
        let mut best_score = f32::MIN;
        // Algorithms whose benchmark errors are skipped; if all fail, the
        // previous best is retained.
        let mut best_algorithm = self.current_best;
        for &algorithm in &algorithms {
            if let Ok(score) = self.benchmark_algorithm(algorithm, sample_data) {
                // Single entry-API access. The previous code re-fetched the
                // same vec with `get_mut(..).expect(..)` immediately after
                // inserting it — a redundant second lookup.
                let history = self.performance_history.entry(algorithm).or_default();
                history.push(score);
                if history.len() > self.sample_size {
                    // Drop the oldest score to keep the window bounded.
                    history.remove(0);
                }
                let avg_score = history.iter().sum::<f32>() / history.len() as f32;
                if avg_score > best_score {
                    best_score = avg_score;
                    best_algorithm = algorithm;
                }
            }
        }
        self.current_best = best_algorithm;
    }

    /// Compresses `data` once with `algorithm` and returns a score blending
    /// 30% compression ratio with 70% throughput (scaled to MB/s).
    ///
    /// Note: ratio here is `original / compressed` (bigger is better) — the
    /// inverse of `SerializedMessage::compression_ratio`.
    fn benchmark_algorithm(&self, algorithm: CompressionAlgorithm, data: &[u8]) -> Result<f32> {
        let start_time = Instant::now();
        let compressed_size = match algorithm {
            CompressionAlgorithm::None => data.len(),
            CompressionAlgorithm::Lz4 => oxiarc_lz4::compress(data)
                .map_err(|e| anyhow!("LZ4 benchmark failed: {}", e))?
                .len(),
            CompressionAlgorithm::Zstd => oxiarc_zstd::encode_all(data, 3)
                .map_err(|e| anyhow!("Zstd benchmark failed: {}", e))?
                .len(),
            CompressionAlgorithm::Deflate => {
                use flate2::write::DeflateEncoder;
                use flate2::Compression;
                let mut encoder = DeflateEncoder::new(Vec::new(), Compression::default());
                encoder
                    .write_all(data)
                    .map_err(|e| anyhow!("Deflate benchmark failed: {}", e))?;
                encoder
                    .finish()
                    .map_err(|e| anyhow!("Deflate benchmark failed: {}", e))?
                    .len()
            }
        };
        let compression_time = start_time.elapsed();
        let compression_ratio = data.len() as f32 / compressed_size as f32;
        let speed = data.len() as f32 / compression_time.as_secs_f32();
        let score = (compression_ratio * 0.3) + (speed / 1_000_000.0 * 0.7);
        Ok(score)
    }

    /// The algorithm currently considered best.
    pub fn current_best(&self) -> CompressionAlgorithm {
        self.current_best
    }

    /// Read access to the per-algorithm rolling score history.
    pub fn performance_history(&self) -> &HashMap<CompressionAlgorithm, Vec<f32>> {
        &self.performance_history
    }
}
impl Default for AdaptiveCompression {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    /// Serde-capable fixture shared by every test below.
    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    struct TestMessage {
        id: u64,
        data: String,
        values: Vec<i32>,
    }

    /// Serialize then deserialize with the default config and expect an
    /// identical value back.
    #[test]
    fn test_serialization_roundtrip() {
        let mut serializer = MessageSerializer::new();
        let message = TestMessage {
            id: 12345,
            data: "Hello, distributed world!".to_string(),
            values: vec![1, 2, 3, 4, 5],
        };
        let serialized = serializer.serialize(&message).unwrap();
        let deserialized: TestMessage = serializer.deserialize(&serialized).unwrap();
        assert_eq!(message, deserialized);
    }

    /// Round-trip under each codec (threshold 0 forces the compression
    /// path) and check the recorded algorithm matches the config.
    /// NOTE(review): Zstd is not covered here — consider adding it.
    #[test]
    fn test_compression_algorithms() {
        let algorithms = [
            CompressionAlgorithm::None,
            CompressionAlgorithm::Lz4,
            CompressionAlgorithm::Deflate,
        ];
        for algorithm in algorithms {
            let config = SerializationConfig {
                compression: algorithm,
                // Zero threshold so even this small message gets compressed.
                compression_threshold: 0,
                ..Default::default()
            };
            let mut serializer = MessageSerializer::with_config(config);
            let message = TestMessage {
                id: 12345,
                // Highly repetitive data so compression has something to do.
                data: "A".repeat(1000),
                values: (0..100).collect(),
            };
            let serialized = serializer.serialize(&message).unwrap();
            let deserialized: TestMessage = serializer.deserialize(&serialized).unwrap();
            assert_eq!(message, deserialized);
            assert_eq!(serialized.compression, algorithm);
        }
    }

    /// Same-major readers accept older minors; everything else is rejected.
    #[test]
    fn test_schema_version_compatibility() {
        let v1_0_0 = SchemaVersion {
            major: 1,
            minor: 0,
            patch: 0,
        };
        let v1_1_0 = SchemaVersion {
            major: 1,
            minor: 1,
            patch: 0,
        };
        let v2_0_0 = SchemaVersion {
            major: 2,
            minor: 0,
            patch: 0,
        };
        // Newer minor can read older minor within the same major...
        assert!(v1_1_0.is_compatible_with(&v1_0_0));
        // ...but not the reverse.
        assert!(!v1_0_0.is_compatible_with(&v1_1_0));
        // Different majors are incompatible in both directions.
        assert!(!v1_0_0.is_compatible_with(&v2_0_0));
        assert!(!v2_0_0.is_compatible_with(&v1_0_0));
    }

    /// Corrupting one payload byte must trip the CRC32 check on decode.
    #[test]
    fn test_checksum_verification() {
        let config = SerializationConfig {
            enable_checksums: true,
            ..Default::default()
        };
        let mut serializer = MessageSerializer::with_config(config);
        let message = TestMessage {
            id: 12345,
            data: "Test message".to_string(),
            values: vec![1, 2, 3],
        };
        let mut serialized = serializer.serialize(&message).unwrap();
        assert!(serialized.checksum.is_some());
        // Flip all bits of the first payload byte to corrupt it.
        serialized.payload[0] ^= 0xFF;
        let result: Result<TestMessage> = serializer.deserialize(&serialized);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Checksum verification failed"));
    }

    /// The adaptive selector returns a valid algorithm and is stable across
    /// consecutive calls on the same data (no re-benchmark in between).
    #[test]
    fn test_adaptive_compression() {
        let mut adaptive = AdaptiveCompression::new();
        let data = b"Hello, world!".repeat(100);
        let algorithm1 = adaptive.evaluate_and_select(&data);
        let algorithm2 = adaptive.evaluate_and_select(&data);
        assert!(matches!(
            algorithm1,
            CompressionAlgorithm::None
                | CompressionAlgorithm::Lz4
                | CompressionAlgorithm::Zstd
                | CompressionAlgorithm::Deflate
        ));
        assert_eq!(algorithm1, algorithm2);
    }

    /// One round trip with metrics on should bump every relevant counter.
    #[test]
    fn test_metrics_collection() {
        let config = SerializationConfig {
            enable_metrics: true,
            ..Default::default()
        };
        let mut serializer = MessageSerializer::with_config(config);
        let message = TestMessage {
            id: 12345,
            data: "Test message".to_string(),
            values: vec![1, 2, 3],
        };
        let serialized = serializer.serialize(&message).unwrap();
        let _deserialized: TestMessage = serializer.deserialize(&serialized).unwrap();
        let metrics = serializer.metrics();
        assert_eq!(metrics.messages_serialized, 1);
        assert_eq!(metrics.messages_deserialized, 1);
        assert!(metrics.bytes_serialized > 0);
        assert!(metrics.bytes_deserialized > 0);
        assert!(metrics.serialization_time > Duration::ZERO);
        assert!(metrics.deserialization_time > Duration::ZERO);
    }
}