use crate::backend::native::v2::wal::V2WALRecord;
use crate::backend::native::{NativeBackendError, NativeResult};
use std::collections::HashMap;
use std::time::{Duration, Instant};
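/// Tunable knobs for the WAL write path: record compression, I/O batching,
/// cluster-affinity grouping, and adaptive tuning driven by runtime metrics.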
#[derive(Debug, Clone)]
pub struct PerformanceConfig {
pub enable_compression: bool,
pub compression_algorithm: CompressionAlgorithm,
pub compression_level: u8,
pub enable_io_batching: bool,
pub max_batch_size: usize,
pub batch_timeout_ms: u64,
pub enable_cluster_affinity: bool,
pub max_cluster_group_size: usize,
pub enable_adaptive_tuning: bool,
pub monitoring_interval_ms: u64,
}
impl Default for PerformanceConfig {
fn default() -> Self {
Self {
enable_compression: false,
compression_algorithm: CompressionAlgorithm::LZ4,
compression_level: 3,
enable_io_batching: true,
max_batch_size: 1000,
batch_timeout_ms: 10,
enable_cluster_affinity: true,
max_cluster_group_size: 50,
enable_adaptive_tuning: true,
monitoring_interval_ms: 1000,
}
}
}
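/// Compression schemes selectable for WAL records. Note that the encoders in
/// this module are simplified, self-contained stand-ins for the named formats
/// rather than bindings to the real LZ4/Zstd/Snappy libraries.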
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionAlgorithm {
None,
LZ4,
Zstd,
Snappy,
RLE,
}
impl CompressionAlgorithm {
pub fn default_level(&self) -> u8 {
match self {
Self::None => 0,
Self::LZ4 => 3,
Self::Zstd => 3,
Self::Snappy => 1,
Self::RLE => 1,
}
}
pub fn validate_level(&self, level: u8) -> bool {
match self {
Self::None => level == 0,
Self::LZ4 => (1..=9).contains(&level),
Self::Zstd => (1..=19).contains(&level),
Self::Snappy => level == 1,
Self::RLE => (1..=3).contains(&level),
}
}
}
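/// Compresses and decompresses WAL record payloads with the configured
/// algorithm, accumulating running statistics for each direction.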
pub struct WALRecordCompressor {
    algorithm: CompressionAlgorithm,
    // Retained for codecs that honor levels; the simplified encoders below
    // only use it for validation in `new`.
    #[allow(dead_code)]
    level: u8,
    compression_stats: CompressionStats,
    // Counted separately from `total_records` so the running decompression
    // average stays correct when compress and decompress counts differ.
    decompression_count: u64,
}
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
pub total_records: u64,
pub total_input_bytes: u64,
pub total_output_bytes: u64,
pub compression_ratio: f64,
pub avg_compression_time_us: u64,
pub avg_decompression_time_us: u64,
}
impl WALRecordCompressor {
pub fn new(algorithm: CompressionAlgorithm, level: u8) -> NativeResult<Self> {
if !algorithm.validate_level(level) {
return Err(NativeBackendError::InvalidConfiguration {
parameter: "compression_level".to_string(),
reason: format!(
"Invalid compression level {} for algorithm {:?}",
level, algorithm
),
});
}
        Ok(Self {
            algorithm,
            level,
            compression_stats: CompressionStats::default(),
            decompression_count: 0,
        })
}
pub fn compress(&mut self, data: &[u8]) -> NativeResult<Vec<u8>> {
let start_time = Instant::now();
let compressed = match self.algorithm {
CompressionAlgorithm::None => data.to_vec(),
CompressionAlgorithm::LZ4 => self.compress_lz4(data)?,
CompressionAlgorithm::Zstd => self.compress_zstd(data)?,
CompressionAlgorithm::Snappy => self.compress_snappy(data)?,
CompressionAlgorithm::RLE => self.compress_rle(data)?,
};
let duration = start_time.elapsed().as_micros() as u64;
self.compression_stats.total_records += 1;
self.compression_stats.total_input_bytes += data.len() as u64;
self.compression_stats.total_output_bytes += compressed.len() as u64;
let total_records = self.compression_stats.total_records;
self.compression_stats.avg_compression_time_us =
((self.compression_stats.avg_compression_time_us * (total_records - 1)) + duration)
/ total_records;
        if self.compression_stats.total_input_bytes > 0 {
            // A ratio below 1.0 means the compressed output is smaller than
            // the input.
            self.compression_stats.compression_ratio = self.compression_stats.total_output_bytes
                as f64
                / self.compression_stats.total_input_bytes as f64;
        }
Ok(compressed)
}
pub fn decompress(&mut self, compressed_data: &[u8]) -> NativeResult<Vec<u8>> {
let start_time = Instant::now();
let decompressed = match self.algorithm {
CompressionAlgorithm::None => compressed_data.to_vec(),
CompressionAlgorithm::LZ4 => self.decompress_lz4(compressed_data)?,
CompressionAlgorithm::Zstd => self.decompress_zstd(compressed_data)?,
CompressionAlgorithm::Snappy => self.decompress_snappy(compressed_data)?,
CompressionAlgorithm::RLE => self.decompress_rle(compressed_data)?,
};
        let duration = start_time.elapsed().as_micros() as u64;
        // Average over decompression calls, which need not be 1:1 with
        // compressions.
        self.decompression_count += 1;
        let n = self.decompression_count;
        self.compression_stats.avg_decompression_time_us =
            ((self.compression_stats.avg_decompression_time_us * (n - 1)) + duration) / n;
Ok(decompressed)
}
pub fn get_stats(&self) -> CompressionStats {
self.compression_stats.clone()
}
    fn compress_lz4(&self, data: &[u8]) -> NativeResult<Vec<u8>> {
        // Simplified byte-run scheme standing in for LZ4. Runs longer than
        // three bytes, and every zero byte, are escaped as `0, count, byte`;
        // short non-zero runs are emitted literally. Because zero is always
        // escaped, a literal byte can never be mistaken for the escape marker.
        let mut compressed = Vec::new();
        let mut i = 0;
        while i < data.len() {
            let current_byte = data[i];
            let mut count = 1;
            while i + count < data.len() && data[i + count] == current_byte && count < 255 {
                count += 1;
            }
            if count > 3 || current_byte == 0 {
                compressed.push(0);
                compressed.push(count as u8);
                compressed.push(current_byte);
            } else {
                for _ in 0..count {
                    compressed.push(current_byte);
                }
            }
            i += count;
        }
        Ok(compressed)
    }
    fn decompress_lz4(&self, compressed: &[u8]) -> NativeResult<Vec<u8>> {
        let mut decompressed = Vec::new();
        let mut i = 0;
        while i < compressed.len() {
            // A zero byte with at least two bytes remaining marks an escape
            // sequence of the form `0, count, byte`.
            if compressed[i] == 0 && i + 2 < compressed.len() {
                let count = compressed[i + 1] as usize;
                let byte = compressed[i + 2];
                decompressed.resize(decompressed.len() + count, byte);
                i += 3;
            } else {
                decompressed.push(compressed[i]);
                i += 1;
            }
        }
        Ok(decompressed)
    }
    fn compress_zstd(&self, data: &[u8]) -> NativeResult<Vec<u8>> {
        // Simplified frequency-table scheme standing in for Zstd: a real
        // implementation would entropy-code the payload, whereas here the
        // table is informational and the payload is stored verbatim.
        let mut compressed = Vec::new();
        if data.is_empty() {
            return Ok(compressed);
        }
        let mut freq = [0u16; 256];
        for &byte in data {
            freq[byte as usize] = freq[byte as usize].saturating_add(1);
        }
        // One (byte, nominal code length) entry per byte value that occurs.
        let entries: Vec<(u8, u8)> = freq
            .iter()
            .enumerate()
            .filter(|(_, &count)| count > 0)
            .map(|(i, &count)| (i as u8, if count > 100 { 4 } else { 8 }))
            .collect();
        // Length-prefix the table (u16 LE) so the decompressor can skip it
        // unambiguously even when byte value 255 appears among the entries.
        compressed.extend_from_slice(&(entries.len() as u16).to_le_bytes());
        for (byte, code_len) in entries {
            compressed.push(byte);
            compressed.push(code_len);
        }
        compressed.extend_from_slice(data);
        Ok(compressed)
    }
    fn decompress_zstd(&self, compressed: &[u8]) -> NativeResult<Vec<u8>> {
        if compressed.is_empty() {
            return Ok(Vec::new());
        }
        if compressed.len() < 2 {
            return Err(NativeBackendError::CorruptionDetected {
                context: "Zstd-style data too short for table length prefix".to_string(),
                source: None,
            });
        }
        let n_entries = u16::from_le_bytes([compressed[0], compressed[1]]) as usize;
        let payload_start = 2 + n_entries * 2;
        if payload_start > compressed.len() {
            return Err(NativeBackendError::CorruptionDetected {
                context: "Zstd-style table length exceeds input".to_string(),
                source: None,
            });
        }
        Ok(compressed[payload_start..].to_vec())
    }
    fn compress_snappy(&self, data: &[u8]) -> NativeResult<Vec<u8>> {
        // Simplified LZ77 scheme standing in for Snappy. Command byte layout:
        //   0xF0..=0xFF => literal run, low nibble = length - 1 (1..=16 bytes follow)
        //   0x00..=0xEF => copy, bits 7..2 = offset - 1 (1..=60), bits 1..0 = length - 3 (3..=6)
        let mut compressed = Vec::new();
        let mut i = 0;
        while i < data.len() {
            let mut best_match_len = 0;
            let mut best_match_offset = 0;
            // Offsets above 60 or lengths above 6 cannot be encoded in one
            // command byte, so the search is capped accordingly.
            let max_offset = std::cmp::min(i, 60);
            let max_match_len = std::cmp::min(6, data.len() - i);
            for offset in 1..=max_offset {
                let mut match_len = 0;
                // Overlapping matches (match_len >= offset) are allowed; the
                // decompressor copies byte by byte from its growing output.
                while match_len < max_match_len
                    && data[i + match_len] == data[i - offset + match_len]
                {
                    match_len += 1;
                }
                if match_len > best_match_len && match_len >= 3 {
                    best_match_len = match_len;
                    best_match_offset = offset;
                }
            }
            if best_match_len >= 3 {
                let cmd = (((best_match_offset - 1) << 2) | (best_match_len - 3)) as u8;
                compressed.push(cmd);
                i += best_match_len;
            } else {
                // Literal run: at most 16 bytes so the length fits the low
                // nibble of the command byte.
                let literal_len = std::cmp::min(16, data.len() - i);
                compressed.push(0xF0 | (literal_len - 1) as u8);
                compressed.extend_from_slice(&data[i..i + literal_len]);
                i += literal_len;
            }
        }
        Ok(compressed)
    }
    fn decompress_snappy(&self, compressed: &[u8]) -> NativeResult<Vec<u8>> {
        let mut decompressed = Vec::new();
        let mut i = 0;
        while i < compressed.len() {
            let byte = compressed[i];
            if byte >= 0xF0 {
                let literal_len = ((byte & 0x0F) + 1) as usize;
                if i + 1 + literal_len > compressed.len() {
                    return Err(NativeBackendError::CorruptionDetected {
                        context: "Invalid literal length in Snappy decompression".to_string(),
                        source: None,
                    });
                }
                decompressed.extend_from_slice(&compressed[i + 1..i + 1 + literal_len]);
                i += 1 + literal_len;
            } else {
                let offset = ((byte >> 2) + 1) as usize;
                let length = ((byte & 0x03) + 3) as usize;
                if offset > decompressed.len() {
                    return Err(NativeBackendError::CorruptionDetected {
                        context: "Invalid copy command in Snappy decompression".to_string(),
                        source: None,
                    });
                }
                let start_pos = decompressed.len() - offset;
                // Copy byte by byte so overlapping runs (length > offset)
                // expand correctly from the output produced so far.
                for j in 0..length {
                    let b = decompressed[start_pos + j];
                    decompressed.push(b);
                }
                i += 1;
            }
        }
        Ok(decompressed)
    }
fn compress_rle(&self, data: &[u8]) -> NativeResult<Vec<u8>> {
if data.is_empty() {
return Ok(Vec::new());
}
let mut compressed = Vec::new();
let mut current = data[0];
let mut count = 1u8;
for &byte in &data[1..] {
if byte == current && count < 255 {
count += 1;
} else {
compressed.push(current);
compressed.push(count);
current = byte;
count = 1;
}
}
compressed.push(current);
compressed.push(count);
Ok(compressed)
}
fn decompress_rle(&self, compressed: &[u8]) -> NativeResult<Vec<u8>> {
if compressed.is_empty() {
return Ok(Vec::new());
}
if compressed.len() % 2 != 0 {
return Err(NativeBackendError::CorruptionDetected {
context: "RLE compressed data length is not even".to_string(),
source: None,
});
}
let mut decompressed = Vec::new();
for chunk in compressed.chunks_exact(2) {
let byte = chunk[0];
let count = chunk[1];
decompressed.extend(std::iter::repeat(byte).take(count as usize));
}
Ok(decompressed)
}
}
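/// Accumulates serialized records and releases them in batches, either when
/// `max_batch_size` records are buffered or when `batch_timeout` has elapsed
/// since the first record of the batch arrived. `add_to_batch` returns
/// `Some(batch)` when a flush is due and `None` otherwise.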
pub struct IOBatcher {
max_batch_size: usize,
batch_timeout: Duration,
current_batch: Vec<Vec<u8>>,
batch_start_time: Instant,
stats: IOBatcherStats,
}
#[derive(Debug, Clone, Default)]
pub struct IOBatcherStats {
pub total_batches: u64,
pub total_records: u64,
pub avg_batch_size: f64,
pub avg_batch_wait_time_us: u64,
pub total_bytes: u64,
}
impl IOBatcher {
pub fn new(max_batch_size: usize, batch_timeout: Duration) -> Self {
Self {
max_batch_size,
batch_timeout,
current_batch: Vec::new(),
batch_start_time: Instant::now(),
stats: IOBatcherStats::default(),
}
}
    pub fn add_to_batch(&mut self, data: Vec<u8>) -> Option<Vec<Vec<u8>>> {
        if self.current_batch.is_empty() {
            // Start the timeout clock when the first record of a batch
            // arrives, so idle time between batches is not counted as wait.
            self.batch_start_time = Instant::now();
        }
        self.current_batch.push(data);
        let wait_time = self.batch_start_time.elapsed();
        if self.current_batch.len() >= self.max_batch_size || wait_time >= self.batch_timeout {
            self.flush_batch()
        } else {
            None
        }
    }
pub fn flush_batch(&mut self) -> Option<Vec<Vec<u8>>> {
if self.current_batch.is_empty() {
return None;
}
let batch = std::mem::take(&mut self.current_batch);
let wait_time = self.batch_start_time.elapsed();
self.stats.total_batches += 1;
self.stats.total_records += batch.len() as u64;
self.stats.total_bytes += batch.iter().map(|data| data.len()).sum::<usize>() as u64;
        let total_batches = self.stats.total_batches;
        self.stats.avg_batch_size = ((self.stats.avg_batch_size * (total_batches - 1) as f64)
            + batch.len() as f64)
            / total_batches as f64;
        self.stats.avg_batch_wait_time_us = ((self.stats.avg_batch_wait_time_us
            * (total_batches - 1))
            + wait_time.as_micros() as u64)
            / total_batches;
self.batch_start_time = Instant::now();
Some(batch)
}
pub fn get_stats(&self) -> IOBatcherStats {
self.stats.clone()
}
}
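/// Groups WAL records by cluster key so that records touching the same
/// cluster can be written together. Full groups are handed back to the caller
/// by `add_record`; partial groups are retrieved with `get_cluster_records`
/// or `flush_all`.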
pub struct ClusterAffinityOptimizer {
max_group_size: usize,
cluster_groups: HashMap<i64, Vec<V2WALRecord>>,
stats: ClusterAffinityStats,
}
#[derive(Debug, Clone, Default)]
pub struct ClusterAffinityStats {
pub total_groups: u64,
pub total_records: u64,
pub avg_group_size: f64,
pub affinity_hit_rate: f64,
}
impl ClusterAffinityOptimizer {
pub fn new(max_group_size: usize) -> Self {
Self {
max_group_size,
cluster_groups: HashMap::new(),
stats: ClusterAffinityStats::default(),
}
}
    /// Adds a record to its cluster group. When the group reaches
    /// `max_group_size`, it is removed from the map and returned so the
    /// caller can write the records out.
    pub fn add_record(&mut self, record: V2WALRecord) -> Option<Vec<V2WALRecord>> {
        let cluster_key = record.cluster_key()?;
        let group = self.cluster_groups.entry(cluster_key).or_default();
        group.push(record);
        self.stats.total_records += 1;
        if group.len() >= self.max_group_size {
            self.stats.total_groups += 1;
            return self.cluster_groups.remove(&cluster_key);
        }
        None
    }
pub fn get_cluster_records(&mut self, cluster_key: i64) -> Option<Vec<V2WALRecord>> {
let records = self.cluster_groups.remove(&cluster_key)?;
if !records.is_empty() {
self.stats.total_groups += 1;
}
Some(records)
}
pub fn flush_all(&mut self) -> Vec<(i64, Vec<V2WALRecord>)> {
let groups: Vec<_> = self.cluster_groups.drain().collect();
if !groups.is_empty() {
self.stats.total_groups += groups.len() as u64;
}
groups
}
    pub fn get_stats(&self) -> ClusterAffinityStats {
        let mut stats = self.stats.clone();
        if self.stats.total_records > 0 {
            stats.avg_group_size =
                self.stats.total_records as f64 / self.stats.total_groups.max(1) as f64;
            // Placeholder: every record carrying a cluster key is grouped, so
            // the hit rate is reported as 1.0 until real miss tracking exists.
            stats.affinity_hit_rate = 1.0;
        }
        stats
    }
}
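/// Adjusts the active `PerformanceConfig` from a sliding window of runtime
/// snapshots: it grows batches when throughput drops, shrinks batches and
/// timeouts when latency climbs, and trims buffer sizes under memory
/// pressure.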
pub struct AdaptivePerformanceTuner {
current_config: PerformanceConfig,
performance_history: Vec<PerformanceSnapshot>,
stats: TuningStats,
max_history_size: usize,
}
#[derive(Debug, Clone)]
pub struct PerformanceSnapshot {
pub timestamp: Instant,
pub write_throughput_rps: f64,
pub avg_write_latency_us: f64,
pub compression_ratio: f64,
pub io_utilization_percent: f64,
pub memory_utilization_percent: f64,
}
#[derive(Debug, Clone, Default)]
pub struct TuningStats {
pub total_adjustments: u64,
pub compression_adjustments: u64,
pub batch_size_adjustments: u64,
pub timeout_adjustments: u64,
}
impl AdaptivePerformanceTuner {
pub fn new(initial_config: PerformanceConfig, max_history_size: usize) -> Self {
Self {
current_config: initial_config,
performance_history: Vec::new(),
stats: TuningStats::default(),
max_history_size,
}
}
pub fn add_snapshot(&mut self, snapshot: PerformanceSnapshot) {
        self.performance_history.push(snapshot);
if self.performance_history.len() > self.max_history_size {
self.performance_history.remove(0);
}
if self.performance_history.len() >= 3 {
if let Some(adjustment) = self.analyze_and_tune() {
self.apply_adjustment(adjustment);
}
}
}
fn analyze_and_tune(&self) -> Option<PerformanceAdjustment> {
if self.performance_history.len() < 3 {
return None;
}
let recent: Vec<_> = self.performance_history.iter().rev().take(3).collect();
let oldest = &recent[2];
let current = &recent[0];
if current.write_throughput_rps < oldest.write_throughput_rps * 0.8 {
return Some(PerformanceAdjustment::ImproveThroughput);
}
if current.avg_write_latency_us > oldest.avg_write_latency_us * 1.5 {
return Some(PerformanceAdjustment::ReduceLatency);
}
if current.memory_utilization_percent > 80.0 {
return Some(PerformanceAdjustment::ReduceMemoryUsage);
}
None
}
fn apply_adjustment(&mut self, adjustment: PerformanceAdjustment) {
match adjustment {
PerformanceAdjustment::ImproveThroughput => {
if self.current_config.max_batch_size < 5000 {
self.current_config.max_batch_size *= 2;
self.stats.batch_size_adjustments += 1;
}
if !self.current_config.enable_compression {
self.current_config.enable_compression = true;
self.stats.compression_adjustments += 1;
}
}
PerformanceAdjustment::ReduceLatency => {
if self.current_config.max_batch_size > 100 {
self.current_config.max_batch_size /= 2;
self.stats.batch_size_adjustments += 1;
}
if self.current_config.batch_timeout_ms > 5 {
self.current_config.batch_timeout_ms /= 2;
self.stats.timeout_adjustments += 1;
}
}
PerformanceAdjustment::ReduceMemoryUsage => {
if self.current_config.max_batch_size > 100 {
self.current_config.max_batch_size = self.current_config.max_batch_size * 3 / 4;
self.stats.batch_size_adjustments += 1;
}
if self.current_config.max_cluster_group_size > 25 {
self.current_config.max_cluster_group_size /= 2;
}
}
}
self.stats.total_adjustments += 1;
}
pub fn get_config(&self) -> PerformanceConfig {
self.current_config.clone()
}
pub fn get_stats(&self) -> TuningStats {
self.stats.clone()
}
}
#[derive(Debug, Clone)]
enum PerformanceAdjustment {
ImproveThroughput,
ReduceLatency,
ReduceMemoryUsage,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_compression_algorithm_validation() {
assert!(CompressionAlgorithm::LZ4.validate_level(5));
assert!(!CompressionAlgorithm::LZ4.validate_level(10));
assert!(CompressionAlgorithm::None.validate_level(0));
assert!(!CompressionAlgorithm::None.validate_level(1));
}
#[test]
fn test_rle_compression() {
let mut compressor = WALRecordCompressor::new(CompressionAlgorithm::RLE, 1).unwrap();
let data = vec![1, 1, 1, 2, 2, 3, 3, 3, 3];
let compressed = compressor.compress(&data).unwrap();
assert!(compressed.len() < data.len());
let decompressed = compressor.decompress(&compressed).unwrap();
assert_eq!(decompressed, data);
}
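    #[test]
    fn test_lz4_style_roundtrip() {
        // Round-trip coverage for the simplified LZ4-style codec; the input
        // mixes zero runs, a long non-zero run, and literals so that both the
        // escaped and literal branches are exercised.
        let mut compressor = WALRecordCompressor::new(CompressionAlgorithm::LZ4, 3).unwrap();
        let data = vec![0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 7, 9];
        let compressed = compressor.compress(&data).unwrap();
        let decompressed = compressor.decompress(&compressed).unwrap();
        assert_eq!(decompressed, data);
    }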
#[test]
fn test_io_batcher() {
let mut batcher = IOBatcher::new(3, Duration::from_millis(100));
let result1 = batcher.add_to_batch(vec![1, 2, 3]);
assert!(result1.is_none());
let result2 = batcher.add_to_batch(vec![4, 5, 6]);
assert!(result2.is_none());
let result3 = batcher.add_to_batch(vec![7, 8, 9]);
assert!(result3.is_some());
let batch = result3.unwrap();
assert_eq!(batch.len(), 3);
}
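    #[test]
    fn test_snappy_style_roundtrip() {
        // Round-trip coverage for the simplified Snappy-style codec; the
        // repeating pattern is long enough that back-reference copies fire
        // after the first literal run.
        let mut compressor = WALRecordCompressor::new(CompressionAlgorithm::Snappy, 1).unwrap();
        let data = b"abcdabcdabcdabcdabcdabcdabcdabcd".to_vec();
        let compressed = compressor.compress(&data).unwrap();
        let decompressed = compressor.decompress(&compressed).unwrap();
        assert_eq!(decompressed, data);
    }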
#[test]
fn test_cluster_affinity_optimizer() {
let mut optimizer = ClusterAffinityOptimizer::new(10);
let record1 = V2WALRecord::NodeInsert {
node_id: 42,
slot_offset: 1024,
node_data: vec![1, 2, 3],
};
let record2 = V2WALRecord::NodeUpdate {
node_id: 42,
slot_offset: 1024,
old_data: vec![1, 2, 3],
new_data: vec![4, 5, 6],
};
        assert!(optimizer.add_record(record1).is_none());
        assert!(optimizer.add_record(record2).is_none());
let stats = optimizer.get_stats();
assert_eq!(stats.total_records, 2);
assert_eq!(stats.total_groups, 0);
let records = optimizer.get_cluster_records(42);
assert!(
records.is_some(),
"Should find records for cluster 42 before auto-flush"
);
assert_eq!(records.unwrap().len(), 2);
        let empty_records = optimizer.get_cluster_records(999);
        assert!(
            empty_records.is_none(),
            "Should return None for non-existent cluster"
        );
        // With max_group_size = 1 the group auto-flushes on insert: the full
        // group comes back from add_record and the map is left empty.
        let mut small_optimizer = ClusterAffinityOptimizer::new(1);
        let flushed = small_optimizer.add_record(V2WALRecord::NodeInsert {
            node_id: 100,
            slot_offset: 2048,
            node_data: vec![7, 8, 9],
        });
        assert!(flushed.is_some(), "Auto-flush should return the full group");
        let flushed_records = small_optimizer.get_cluster_records(100);
        assert!(
            flushed_records.is_none(),
            "Should return None after auto-flush"
        );
}
#[test]
fn test_adaptive_performance_tuner() {
let config = PerformanceConfig::default();
let mut tuner = AdaptivePerformanceTuner::new(config, 5);
let snapshot1 = PerformanceSnapshot {
timestamp: Instant::now(),
write_throughput_rps: 1000.0,
avg_write_latency_us: 100.0,
compression_ratio: 0.7,
io_utilization_percent: 50.0,
memory_utilization_percent: 30.0,
};
tuner.add_snapshot(snapshot1);
let new_config = tuner.get_config();
assert_eq!(new_config.max_batch_size, 1000);
}
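    #[test]
    fn test_zstd_style_roundtrip() {
        // Round-trip coverage for the simplified Zstd-style codec, including
        // byte value 255 in the payload, which must not be confused with the
        // frequency-table bytes, and the empty-input edge case.
        let mut compressor = WALRecordCompressor::new(CompressionAlgorithm::Zstd, 3).unwrap();
        let data = vec![255, 1, 255, 2, 3, 255];
        let compressed = compressor.compress(&data).unwrap();
        let decompressed = compressor.decompress(&compressed).unwrap();
        assert_eq!(decompressed, data);
        assert!(compressor.decompress(&[]).unwrap().is_empty());
    }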
}