//! Raft performance optimizations: log compaction, adaptive command batching,
//! entry compression, and parallel/pipelined replication.

use crate::raft::{OxirsNodeId, RdfCommand};
use crate::raft_profiling::{RaftOperation, RaftProfiler};
use anyhow::Result;
use scirs2_core::ndarray_ext::{s, Array1};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
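/// Tuning knobs for Raft log compaction: size thresholds, the minimum interval
/// between runs, and how many recent entries to retain.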
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactionConfig {
pub min_log_size: usize,
pub max_log_size: usize,
pub compaction_interval_secs: u64,
pub aggressive_compaction: bool,
pub keep_last_entries: usize,
}
impl Default for CompactionConfig {
fn default() -> Self {
Self {
min_log_size: 1000,
max_log_size: 10000,
            compaction_interval_secs: 300,
            aggressive_compaction: true,
keep_last_entries: 100,
}
}
}
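/// Tuning knobs for command batching, including dynamic sizing based on load
/// history and cluster-size-adaptive limits.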
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchConfig {
pub max_batch_size: usize,
pub batch_timeout_ms: u64,
pub dynamic_sizing: bool,
pub min_batch_size: usize,
pub adaptive_cluster_sizing: bool,
pub adaptive_threshold: usize,
}
impl Default for BatchConfig {
fn default() -> Self {
Self {
            max_batch_size: 500,
            batch_timeout_ms: 10,
dynamic_sizing: true,
min_batch_size: 10,
adaptive_cluster_sizing: true,
adaptive_threshold: 1000,
}
}
}
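/// Compression settings applied to log entries at least `min_size_bytes` long.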
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressionConfig {
pub enabled: bool,
pub algorithm: CompressionAlgorithm,
pub level: i32,
pub min_size_bytes: usize,
}
impl Default for CompressionConfig {
fn default() -> Self {
Self {
enabled: true,
algorithm: CompressionAlgorithm::Zstd,
level: 3,
min_size_bytes: 1024,
}
}
}
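/// Supported compression codecs for log entries.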
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum CompressionAlgorithm {
Zstd,
Lz4,
Flate2,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParallelReplicationConfig {
pub enabled: bool,
pub streams_per_follower: usize,
pub pipeline_depth: usize,
pub use_simd: bool,
pub connection_pool_size: usize,
pub enable_pipelining: bool,
}
impl Default for ParallelReplicationConfig {
fn default() -> Self {
Self {
enabled: true,
streams_per_follower: 4,
pipeline_depth: 10,
use_simd: true,
            connection_pool_size: 10,
            enable_pipelining: true,
}
}
}
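/// Central entry point for Raft performance optimizations: log compaction,
/// adaptive batching, compression, and parallel replication, with built-in
/// metrics and profiling.
///
/// A minimal usage sketch (illustrative only; `log` and `pending_commands` are
/// hypothetical caller-side variables):
///
/// ```ignore
/// let optimizer = RaftOptimizer::new(1);
/// if optimizer.should_compact(log.len()) {
///     log = optimizer.compact_log(log).await?;
/// }
/// let batches = optimizer.batch_commands(pending_commands).await?;
/// ```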
#[derive(Debug, Clone)]
pub struct RaftOptimizer {
compaction_config: CompactionConfig,
batch_config: BatchConfig,
compression_config: CompressionConfig,
parallel_config: ParallelReplicationConfig,
node_id: OxirsNodeId,
metrics: Arc<RwLock<OptimizationMetrics>>,
profiler: Arc<RaftProfiler>,
cluster_size: Arc<RwLock<usize>>,
}
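/// Counters and gauges tracking the effect of each optimization.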
#[derive(Debug, Clone, Default)]
pub struct OptimizationMetrics {
pub compacted_entries: u64,
pub compression_savings_bytes: u64,
pub batch_operations: u64,
pub avg_batch_size: f64,
pub parallel_speedup: f64,
pub last_compaction: Option<SystemTime>,
pub compaction_runs: u64,
}
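/// Summary statistics (mean, standard deviation, and percentiles) over a set
/// of log-operation latencies, in microseconds.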
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LogPerformanceAnalysis {
pub mean_latency_micros: f64,
pub std_dev_micros: f64,
pub p50_micros: f64,
pub p95_micros: f64,
pub p99_micros: f64,
pub sample_count: usize,
}
impl RaftOptimizer {
pub fn new(node_id: OxirsNodeId) -> Self {
Self {
compaction_config: CompactionConfig::default(),
batch_config: BatchConfig::default(),
compression_config: CompressionConfig::default(),
parallel_config: ParallelReplicationConfig::default(),
node_id,
metrics: Arc::new(RwLock::new(OptimizationMetrics::default())),
profiler: Arc::new(RaftProfiler::new(node_id)),
cluster_size: Arc::new(RwLock::new(1)),
}
}
pub fn with_config(
node_id: OxirsNodeId,
compaction: CompactionConfig,
batch: BatchConfig,
compression: CompressionConfig,
parallel: ParallelReplicationConfig,
) -> Self {
Self {
compaction_config: compaction,
batch_config: batch,
compression_config: compression,
parallel_config: parallel,
node_id,
metrics: Arc::new(RwLock::new(OptimizationMetrics::default())),
profiler: Arc::new(RaftProfiler::new(node_id)),
cluster_size: Arc::new(RwLock::new(1)),
}
}
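    /// Records the current cluster size, consulted by adaptive batch sizing.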
pub async fn update_cluster_size(&self, size: usize) {
let mut cluster_size = self.cluster_size.write().await;
*cluster_size = size;
}
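    /// Picks a batch size from the current cluster size when
    /// `adaptive_cluster_sizing` is enabled: 100 for clusters under 100 nodes,
    /// 200 under 500, 350 under 1000, and 500 otherwise.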
pub async fn calculate_adaptive_batch_size(&self) -> usize {
if !self.batch_config.adaptive_cluster_sizing {
return self.batch_config.max_batch_size;
}
let cluster_size = *self.cluster_size.read().await;
if cluster_size < 100 {
100
} else if cluster_size < 500 {
200
} else if cluster_size < 1000 {
350
} else {
            500
        }
}
pub fn profiler(&self) -> &Arc<RaftProfiler> {
&self.profiler
}
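    /// Returns `true` when the log exceeds `max_log_size`, or when it exceeds
    /// `min_log_size` and the compaction interval has elapsed (or no compaction
    /// has been recorded yet).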
pub fn should_compact(&self, log_size: usize) -> bool {
if log_size >= self.compaction_config.max_log_size {
return true;
}
if log_size >= self.compaction_config.min_log_size {
if let Ok(metrics) = self.metrics.try_read() {
if let Some(last_compaction) = metrics.last_compaction {
if let Ok(elapsed) = SystemTime::now().duration_since(last_compaction) {
return elapsed.as_secs()
>= self.compaction_config.compaction_interval_secs;
}
}
                // No earlier compaction recorded, so compact now.
                return true;
            }
}
false
}
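    /// Truncates the log to the most recent `keep_last_entries` entries,
    /// recording the savings in the optimization metrics.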
pub async fn compact_log<T: Clone + Send + Sync>(&self, log_entries: Vec<T>) -> Result<Vec<T>> {
let prof_op = self
.profiler
.start_operation(RaftOperation::LogCompaction)
.await;
let start_time = Instant::now();
if log_entries.len() <= self.compaction_config.keep_last_entries {
prof_op.complete().await;
return Ok(log_entries);
}
let keep_from = log_entries.len() - self.compaction_config.keep_last_entries;
let mut compacted = Vec::new();
        if log_entries.len() > 1000 && self.parallel_config.enabled {
            // Retention-based compaction is a straight slice copy; the chunk size is
            // computed only as a diagnostic for how the work could be partitioned.
            let cpu_count = num_cpus::get();
            let chunk_size = ((log_entries.len() - keep_from) / cpu_count).max(100);
            let entries_to_keep = &log_entries[keep_from..];
            compacted = entries_to_keep.to_vec();
            debug!(
                "Node {}: Compacted large log ({} CPU cores available, chunk size {})",
                self.node_id, cpu_count, chunk_size
            );
        } else {
            compacted.extend_from_slice(&log_entries[keep_from..]);
        }
let mut metrics = self.metrics.write().await;
metrics.compacted_entries += (log_entries.len() - compacted.len()) as u64;
metrics.last_compaction = Some(SystemTime::now());
metrics.compaction_runs += 1;
let elapsed = start_time.elapsed();
info!(
"Node {}: Compacted {} entries to {} (saved {} entries) in {:?}",
self.node_id,
log_entries.len(),
compacted.len(),
log_entries.len() - compacted.len(),
elapsed
);
prof_op.complete().await;
Ok(compacted)
}
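    /// Splits commands into batches. With `dynamic_sizing` enabled, the batch
    /// size blends the historical average (70%) with the current load (30%),
    /// clamped to the configured bounds.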
pub async fn batch_commands(&self, commands: Vec<RdfCommand>) -> Result<Vec<Vec<RdfCommand>>> {
let prof_op = self
.profiler
.start_operation(RaftOperation::BatchProcessing)
.await;
if commands.is_empty() {
prof_op.complete().await;
return Ok(Vec::new());
}
let batch_size = if self.batch_config.dynamic_sizing {
let metrics = self.metrics.read().await;
let historical_avg = metrics.avg_batch_size;
drop(metrics);
let load_factor = (commands.len() as f64 / 1000.0).min(1.0);
let adaptive_size = if historical_avg > 0.0 {
let size = (historical_avg * 0.7 + (commands.len() as f64 / 10.0) * 0.3) as usize;
size.clamp(
self.batch_config.min_batch_size,
self.batch_config.max_batch_size,
)
} else {
((commands.len() as f64 * load_factor).ceil() as usize / 10).clamp(
self.batch_config.min_batch_size,
self.batch_config.max_batch_size,
)
};
debug!(
"Node {}: Adaptive batch size {} (load factor: {:.2}, historical avg: {:.1})",
self.node_id, adaptive_size, load_factor, historical_avg
);
adaptive_size
} else {
self.batch_config.max_batch_size
};
let batches: Vec<Vec<RdfCommand>> = commands
.chunks(batch_size)
.map(|chunk| chunk.to_vec())
.collect();
let mut metrics = self.metrics.write().await;
metrics.batch_operations += batches.len() as u64;
let total_commands: usize = batches.iter().map(|b| b.len()).sum();
        // Exponentially weighted moving average; alpha weights the newest batch size.
        let alpha = 0.3;
        if metrics.avg_batch_size > 0.0 {
metrics.avg_batch_size = alpha * (total_commands as f64 / batches.len() as f64)
+ (1.0 - alpha) * metrics.avg_batch_size;
} else {
metrics.avg_batch_size = total_commands as f64 / batches.len() as f64;
}
debug!(
"Node {}: Created {} batches with avg size {:.1} (EMA: {:.1})",
self.node_id,
batches.len(),
total_commands as f64 / batches.len() as f64,
metrics.avg_batch_size
);
prof_op.complete().await;
Ok(batches)
}
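    /// Compresses `data` with the configured codec; inputs below
    /// `min_size_bytes` (or with compression disabled) pass through unchanged.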
pub fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
if !self.compression_config.enabled || data.len() < self.compression_config.min_size_bytes {
return Ok(data.to_vec());
}
let compressed = match self.compression_config.algorithm {
CompressionAlgorithm::Zstd => {
oxiarc_zstd::encode_all(data, self.compression_config.level)
.map_err(|e| anyhow::anyhow!("Zstd compression failed: {}", e))?
}
            CompressionAlgorithm::Lz4 => {
                // Falls back to the raw bytes on failure; note that decompress_data
                // always attempts an LZ4 decode, so such a fallback breaks the round trip.
                oxiarc_lz4::compress(data).unwrap_or_else(|_| data.to_vec())
            }
CompressionAlgorithm::Flate2 => {
use flate2::write::GzEncoder;
use flate2::Compression;
use std::io::Write;
let mut encoder = GzEncoder::new(
Vec::new(),
Compression::new(self.compression_config.level as u32),
);
encoder.write_all(data)?;
encoder.finish()?
}
};
if let Ok(mut metrics) = self.metrics.try_write() {
let savings = data.len().saturating_sub(compressed.len());
metrics.compression_savings_bytes += savings as u64;
}
Ok(compressed)
}
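    /// Reverses `compress_data` with the configured codec; passes data through
    /// unchanged when compression is disabled.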
pub fn decompress_data(&self, compressed: &[u8]) -> Result<Vec<u8>> {
if !self.compression_config.enabled {
return Ok(compressed.to_vec());
}
let decompressed = match self.compression_config.algorithm {
CompressionAlgorithm::Zstd => oxiarc_zstd::decode_all(compressed)
.map_err(|e| anyhow::anyhow!("Zstd decompression failed: {}", e))?,
CompressionAlgorithm::Lz4 => oxiarc_lz4::decompress(compressed, 100 * 1024 * 1024)
.map_err(|e| anyhow::anyhow!("LZ4 decompression failed: {}", e))?,
CompressionAlgorithm::Flate2 => {
use flate2::read::GzDecoder;
use std::io::Read;
let mut decoder = GzDecoder::new(compressed);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed)?;
decompressed
}
};
Ok(decompressed)
}
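    /// Compresses a batch of entries, switching to rayon-based parallelism once
    /// the batch reaches the parallel threshold; individual failures fall back
    /// to the uncompressed entry.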
pub fn parallel_compress_batch(&self, entries: &[Vec<u8>]) -> Result<Vec<Vec<u8>>> {
if entries.is_empty() {
return Ok(Vec::new());
}
if !self.compression_config.enabled {
return Ok(entries.to_vec());
}
const PARALLEL_THRESHOLD: usize = 10;
let compressed = if entries.len() >= PARALLEL_THRESHOLD && self.parallel_config.enabled {
use rayon::prelude::*;
let start = Instant::now();
let results: Vec<Vec<u8>> = entries
.par_iter()
.map(|entry| {
if entry.len() < self.compression_config.min_size_bytes {
return entry.clone();
}
match self.compression_config.algorithm {
CompressionAlgorithm::Zstd => {
oxiarc_zstd::encode_all(entry.as_slice(), self.compression_config.level)
.unwrap_or_else(|_| entry.clone())
}
CompressionAlgorithm::Lz4 => {
oxiarc_lz4::compress(entry).unwrap_or_else(|_| entry.clone())
}
CompressionAlgorithm::Flate2 => {
use flate2::write::GzEncoder;
use flate2::Compression;
use std::io::Write;
let mut encoder = GzEncoder::new(
Vec::new(),
Compression::new(self.compression_config.level as u32),
);
encoder.write_all(entry).ok();
encoder.finish().unwrap_or_else(|_| entry.clone())
}
}
})
.collect();
debug!(
"Node {}: Parallel compressed {} entries in {:?} ({:?} algorithm)",
self.node_id,
entries.len(),
start.elapsed(),
self.compression_config.algorithm
);
results
} else {
entries
.iter()
.map(|entry| self.compress_data(entry).unwrap_or_else(|_| entry.clone()))
.collect()
};
if let Ok(mut metrics) = self.metrics.try_write() {
let original_size: usize = entries.iter().map(|e| e.len()).sum();
let compressed_size: usize = compressed.iter().map(|e| e.len()).sum();
let savings = original_size.saturating_sub(compressed_size);
metrics.compression_savings_bytes += savings as u64;
}
Ok(compressed)
}
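    /// Decompresses a batch of entries, mirroring `parallel_compress_batch`.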
pub fn parallel_decompress_batch(
&self,
compressed_entries: &[Vec<u8>],
) -> Result<Vec<Vec<u8>>> {
if compressed_entries.is_empty() {
return Ok(Vec::new());
}
if !self.compression_config.enabled {
return Ok(compressed_entries.to_vec());
}
const PARALLEL_THRESHOLD: usize = 10;
let decompressed =
if compressed_entries.len() >= PARALLEL_THRESHOLD && self.parallel_config.enabled {
use rayon::prelude::*;
compressed_entries
.par_iter()
.map(|entry| {
self.decompress_data(entry)
.unwrap_or_else(|_| entry.clone())
})
.collect()
} else {
compressed_entries
.iter()
.map(|entry| {
self.decompress_data(entry)
.unwrap_or_else(|_| entry.clone())
})
.collect()
};
Ok(decompressed)
}
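    /// Replicates entries to all followers concurrently via spawned tasks,
    /// falling back to sequential replication when parallelism is disabled.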
pub async fn parallel_replicate<F, Fut>(
&self,
followers: Vec<OxirsNodeId>,
entries: Vec<Vec<u8>>,
replicate_fn: F,
) -> Result<Vec<Result<(), String>>>
where
F: Fn(OxirsNodeId, Vec<Vec<u8>>) -> Fut + Send + Sync + Clone + 'static,
Fut: std::future::Future<Output = Result<()>> + Send,
{
if !self.parallel_config.enabled {
let mut results = Vec::new();
for follower in followers {
let result = replicate_fn(follower, entries.clone())
.await
.map_err(|e| e.to_string());
results.push(result);
}
return Ok(results);
}
let mut tasks = Vec::new();
for follower in followers {
let entries_clone = entries.clone();
let replicate_fn_clone = replicate_fn.clone();
let task = tokio::spawn(async move {
replicate_fn_clone(follower, entries_clone)
.await
.map_err(|e| e.to_string())
});
tasks.push(task);
}
let start_time = SystemTime::now();
let num_tasks = tasks.len();
let results = futures::future::join_all(tasks)
.await
.into_iter()
.map(|r| r.unwrap_or_else(|e| Err(e.to_string())))
.collect();
        if let Ok(elapsed) = start_time.elapsed() {
            let mut metrics = self.metrics.write().await;
            // Naive speedup estimate: assumes each follower would have taken the
            // full elapsed time sequentially, so the ratio approximates the task count.
            let estimated_sequential_time = elapsed * num_tasks as u32;
            metrics.parallel_speedup =
                estimated_sequential_time.as_secs_f64() / elapsed.as_secs_f64();
tracing::debug!(
"Node {}: Parallel replication to {} followers completed in {:?} (estimated speedup: {:.2}x)",
self.node_id,
num_tasks,
elapsed,
metrics.parallel_speedup
);
}
Ok(results)
}
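    /// Computes a sliding-window sum-of-squares checksum per entry, using
    /// parallel iteration for large inputs.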
pub fn simd_process_entries(&self, entries: &[f64]) -> Result<Array1<f64>> {
if !self.parallel_config.use_simd || entries.is_empty() {
return Ok(Array1::from_vec(entries.to_vec()));
}
let data = Array1::from_vec(entries.to_vec());
let window_size = 4.min(entries.len());
const PARALLEL_THRESHOLD: usize = 100;
let checksums = if entries.len() >= PARALLEL_THRESHOLD {
use rayon::prelude::*;
(0..entries.len())
.into_par_iter()
.map(|i| {
let end = (i + window_size).min(entries.len());
let window = data.slice(s![i..end]);
window.iter().map(|x| x * x).sum::<f64>()
})
.collect::<Vec<f64>>()
} else {
(0..entries.len())
.map(|i| {
let end = (i + window_size).min(entries.len());
let window = data.slice(s![i..end]);
window.iter().map(|x| x * x).sum::<f64>()
})
.collect()
};
let processed = Array1::from_vec(checksums);
debug!(
"Node {}: Processed {} entries with {} acceleration (window size: {})",
self.node_id,
entries.len(),
if entries.len() >= PARALLEL_THRESHOLD {
"parallel SIMD"
} else {
"sequential"
},
window_size
);
Ok(processed)
}
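    /// Recomputes entry checksums and compares them against the expected
    /// values; validation passes when the summed squared difference stays
    /// below 0.01 per entry.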
pub fn validate_log_integrity(
&self,
entries: &[f64],
expected_checksums: &[f64],
) -> Result<bool> {
if entries.len() != expected_checksums.len() {
return Ok(false);
}
let computed = self.simd_process_entries(entries)?;
let expected = Array1::from_vec(expected_checksums.to_vec());
const SIMD_THRESHOLD: usize = 50;
let sum_diff_sq = if entries.len() >= SIMD_THRESHOLD && self.parallel_config.use_simd {
let diff = &computed - &expected;
let diff_sq = &diff * &diff;
diff_sq.sum()
} else {
let mut sum = 0.0;
for i in 0..computed.len() {
let diff = computed[i] - expected[i];
sum += diff * diff;
}
sum
};
let threshold = 0.01 * computed.len() as f64;
let is_valid = sum_diff_sq < threshold;
if !is_valid {
warn!(
"Node {}: Log integrity validation failed (diff^2 sum: {:.2}, threshold: {:.2})",
self.node_id, sum_diff_sq, threshold
);
} else {
debug!(
"Node {}: Log integrity validated successfully using {} operations ({} entries)",
self.node_id,
if entries.len() >= SIMD_THRESHOLD {
"SIMD"
} else {
"sequential"
},
entries.len()
);
}
Ok(is_valid)
}
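    /// Computes mean, standard deviation, and p50/p95/p99 percentiles for a
    /// set of latency samples (in microseconds).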
pub async fn analyze_log_performance(
&self,
latencies_micros: &[f64],
) -> Result<LogPerformanceAnalysis> {
if latencies_micros.is_empty() {
return Ok(LogPerformanceAnalysis::default());
}
let sum: f64 = latencies_micros.iter().sum();
let mean_latency = sum / latencies_micros.len() as f64;
let variance_sum: f64 = latencies_micros
.iter()
.map(|x| {
let diff = x - mean_latency;
diff * diff
})
.sum();
let variance_latency = variance_sum / latencies_micros.len() as f64;
let std_dev = variance_latency.sqrt();
let mut sorted = latencies_micros.to_vec();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let p50 = sorted[sorted.len() / 2];
let p95 = sorted[(sorted.len() * 95) / 100];
let p99 = sorted[(sorted.len() * 99) / 100];
let analysis = LogPerformanceAnalysis {
mean_latency_micros: mean_latency,
std_dev_micros: std_dev,
p50_micros: p50,
p95_micros: p95,
p99_micros: p99,
sample_count: latencies_micros.len(),
};
info!(
"Node {}: Log performance - mean: {:.2}μs, p95: {:.2}μs, p99: {:.2}μs",
self.node_id, mean_latency, p95, p99
);
Ok(analysis)
}
pub async fn get_metrics(&self) -> OptimizationMetrics {
self.metrics.read().await.clone()
}
pub async fn reset_metrics(&self) {
let mut metrics = self.metrics.write().await;
*metrics = OptimizationMetrics::default();
}
pub fn compaction_config(&self) -> &CompactionConfig {
&self.compaction_config
}
pub fn batch_config(&self) -> &BatchConfig {
&self.batch_config
}
pub fn compression_config(&self) -> &CompressionConfig {
&self.compression_config
}
pub fn parallel_config(&self) -> &ParallelReplicationConfig {
&self.parallel_config
}
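    /// Sends log entries to followers in adaptively sized, pipelined batches.
    /// The per-batch send is currently a placeholder (see the spawned task).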
pub async fn replicate_pipelined(
&self,
log_entries: &[Vec<u8>],
followers: &[OxirsNodeId],
) -> Result<Vec<Result<(), String>>> {
if !self.parallel_config.enable_pipelining || followers.is_empty() {
return Ok(Vec::new());
}
let batch_size = self.calculate_adaptive_batch_size().await;
let batches: Vec<_> = log_entries.chunks(batch_size).map(|c| c.to_vec()).collect();
let mut all_tasks = Vec::new();
for follower in followers {
for batch in &batches {
let follower_id = *follower;
                let _batch_clone = batch.clone();
                // Placeholder for the real network send: simulates per-batch latency
                // so that pipelining behavior can be observed end to end.
                let task = tokio::spawn(async move {
                    tokio::time::sleep(tokio::time::Duration::from_micros(10)).await;
                    Ok::<(), String>(())
                });
all_tasks.push((follower_id, task));
}
}
let mut results_map: std::collections::HashMap<OxirsNodeId, Result<(), String>> =
followers.iter().map(|&id| (id, Ok(()))).collect();
for (follower_id, task) in all_tasks {
match task.await {
Ok(Ok(())) => {}
Ok(Err(e)) => {
results_map.insert(follower_id, Err(e));
}
Err(e) => {
results_map.insert(follower_id, Err(e.to_string()));
}
}
}
let results: Vec<Result<(), String>> = followers
.iter()
.map(|id| results_map.get(id).cloned().unwrap_or(Ok(())))
.collect();
Ok(results)
}
}
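/// A simple per-node connection pool with staleness-based eviction.
///
/// A minimal usage sketch (illustrative only; `peer_id` is a hypothetical
/// caller-side variable):
///
/// ```ignore
/// let pool = ConnectionPool::new(ConnectionPoolConfig::default());
/// let conn = pool.acquire(peer_id).await?;
/// // ... use the connection ...
/// pool.release(conn).await;
/// ```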
#[derive(Debug, Clone)]
pub struct ConnectionPool {
connections: Arc<RwLock<std::collections::HashMap<OxirsNodeId, VecDeque<Connection>>>>,
config: ConnectionPoolConfig,
}
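/// Limits and timeouts governing the connection pool.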
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
pub min_connections_per_node: usize,
pub max_connections_per_node: usize,
pub connection_timeout_ms: u64,
pub idle_timeout_secs: u64,
}
impl Default for ConnectionPoolConfig {
fn default() -> Self {
Self {
min_connections_per_node: 2,
max_connections_per_node: 10,
connection_timeout_ms: 5000,
idle_timeout_secs: 300,
}
}
}
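/// A pooled connection handle tracking creation and last-use times.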
#[derive(Debug, Clone)]
pub struct Connection {
node_id: OxirsNodeId,
#[allow(dead_code)]
created_at: std::time::Instant,
last_used: std::time::Instant,
}
impl Connection {
fn new(node_id: OxirsNodeId) -> Self {
let now = std::time::Instant::now();
Self {
node_id,
created_at: now,
last_used: now,
}
}
fn is_stale(&self, idle_timeout_secs: u64) -> bool {
self.last_used.elapsed().as_secs() > idle_timeout_secs
}
fn touch(&mut self) {
self.last_used = std::time::Instant::now();
}
}
impl ConnectionPool {
pub fn new(config: ConnectionPoolConfig) -> Self {
Self {
connections: Arc::new(RwLock::new(std::collections::HashMap::new())),
config,
}
}
pub async fn acquire(&self, node_id: OxirsNodeId) -> Result<Connection> {
let mut connections = self.connections.write().await;
let node_connections = connections.entry(node_id).or_insert_with(VecDeque::new);
while let Some(mut conn) = node_connections.pop_front() {
if !conn.is_stale(self.config.idle_timeout_secs) {
conn.touch();
return Ok(conn);
}
}
if node_connections.len() < self.config.max_connections_per_node {
let conn = Connection::new(node_id);
Ok(conn)
} else {
Err(anyhow::anyhow!(
"Connection pool exhausted for node {}",
node_id
))
}
}
pub async fn release(&self, mut conn: Connection) {
conn.touch();
let mut connections = self.connections.write().await;
let node_connections = connections
.entry(conn.node_id)
.or_insert_with(VecDeque::new);
if node_connections.len() < self.config.max_connections_per_node {
node_connections.push_back(conn);
}
}
pub async fn get_stats(&self) -> ConnectionPoolStats {
let connections = self.connections.read().await;
let total_connections: usize = connections.values().map(|v| v.len()).sum();
let nodes_with_connections = connections.len();
ConnectionPoolStats {
total_connections,
nodes_with_connections,
config: self.config.clone(),
}
}
pub async fn cleanup_stale_connections(&self) {
let mut connections = self.connections.write().await;
        for node_connections in connections.values_mut() {
node_connections.retain(|conn| !conn.is_stale(self.config.idle_timeout_secs));
}
}
}
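/// A point-in-time snapshot of connection pool occupancy.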
#[derive(Debug, Clone)]
pub struct ConnectionPoolStats {
pub total_connections: usize,
pub nodes_with_connections: usize,
pub config: ConnectionPoolConfig,
}
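/// Accumulates commands and decides when to flush them, based on batch size
/// and a timeout relative to the last flush.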
#[derive(Debug)]
pub struct BatchProcessor {
commands: Arc<RwLock<VecDeque<RdfCommand>>>,
config: BatchConfig,
last_flush: Arc<RwLock<SystemTime>>,
}
impl BatchProcessor {
pub fn new(config: BatchConfig) -> Self {
Self {
commands: Arc::new(RwLock::new(VecDeque::new())),
config,
last_flush: Arc::new(RwLock::new(SystemTime::now())),
}
}
pub async fn add_command(&self, command: RdfCommand) {
let mut commands = self.commands.write().await;
commands.push_back(command);
}
pub async fn should_flush(&self) -> bool {
let commands = self.commands.read().await;
if commands.len() >= self.config.max_batch_size {
return true;
}
if commands.len() >= self.config.min_batch_size {
let last_flush = self.last_flush.read().await;
if let Ok(elapsed) = SystemTime::now().duration_since(*last_flush) {
return elapsed.as_millis() >= self.config.batch_timeout_ms as u128;
}
}
false
}
pub async fn flush(&self) -> Vec<RdfCommand> {
let mut commands = self.commands.write().await;
let flushed = commands.drain(..).collect();
*self.last_flush.write().await = SystemTime::now();
flushed
}
pub async fn batch_size(&self) -> usize {
self.commands.read().await.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_compaction_config_default() {
let config = CompactionConfig::default();
assert_eq!(config.min_log_size, 1000);
assert_eq!(config.max_log_size, 10000);
assert_eq!(config.compaction_interval_secs, 300);
assert!(config.aggressive_compaction);
assert_eq!(config.keep_last_entries, 100);
}
#[test]
fn test_batch_config_default() {
let config = BatchConfig::default();
assert_eq!(config.max_batch_size, 500);
assert_eq!(config.batch_timeout_ms, 10);
assert!(config.dynamic_sizing);
assert_eq!(config.min_batch_size, 10);
assert!(config.adaptive_cluster_sizing);
assert_eq!(config.adaptive_threshold, 1000);
}
#[test]
fn test_compression_config_default() {
let config = CompressionConfig::default();
assert!(config.enabled);
assert_eq!(config.algorithm, CompressionAlgorithm::Zstd);
assert_eq!(config.level, 3);
assert_eq!(config.min_size_bytes, 1024);
}
#[tokio::test]
async fn test_raft_optimizer_creation() {
let optimizer = RaftOptimizer::new(1);
assert_eq!(optimizer.node_id, 1);
assert_eq!(optimizer.compaction_config.min_log_size, 1000);
assert_eq!(optimizer.batch_config.max_batch_size, 500);
assert!(optimizer.compression_config.enabled);
assert!(optimizer.parallel_config.enabled);
}
#[tokio::test]
async fn test_should_compact() {
let optimizer = RaftOptimizer::new(1);
assert!(!optimizer.should_compact(500));
assert!(optimizer.should_compact(15000));
assert!(optimizer.should_compact(1500));
}
#[tokio::test]
async fn test_log_compaction() {
let optimizer = RaftOptimizer::new(1);
let entries: Vec<u64> = (0..1000).collect();
let compacted = optimizer.compact_log(entries.clone()).await.unwrap();
assert_eq!(compacted.len(), 100);
assert_eq!(compacted[0], 900);
assert_eq!(compacted[99], 999);
}
#[tokio::test]
async fn test_batch_commands() {
let optimizer = RaftOptimizer::new(1);
let commands: Vec<RdfCommand> = (0..250)
.map(|i| RdfCommand::Insert {
subject: format!("s{}", i),
predicate: "p".to_string(),
object: "o".to_string(),
})
.collect();
let batches = optimizer.batch_commands(commands).await.unwrap();
assert!(batches.len() > 1);
for batch in &batches {
assert!(batch.len() <= optimizer.batch_config.max_batch_size);
}
}
#[test]
fn test_compression_zstd() {
let optimizer = RaftOptimizer::new(1);
let data = b"Hello, World! ".repeat(100);
let compressed = optimizer.compress_data(&data).unwrap();
assert!(compressed.len() < data.len());
let decompressed = optimizer.decompress_data(&compressed).unwrap();
assert_eq!(data, decompressed.as_slice());
}
#[test]
fn test_compression_lz4() {
let mut optimizer = RaftOptimizer::new(1);
optimizer.compression_config.algorithm = CompressionAlgorithm::Lz4;
let data = b"Hello, World! ".repeat(100);
let compressed = optimizer.compress_data(&data).unwrap();
assert!(compressed.len() < data.len());
let decompressed = optimizer.decompress_data(&compressed).unwrap();
assert_eq!(data, decompressed.as_slice());
}
#[test]
fn test_compression_disabled() {
let mut optimizer = RaftOptimizer::new(1);
optimizer.compression_config.enabled = false;
let data = b"Hello, World!";
let compressed = optimizer.compress_data(data).unwrap();
assert_eq!(data, compressed.as_slice());
}
#[test]
fn test_small_data_no_compression() {
let optimizer = RaftOptimizer::new(1);
let data = b"Small";
let compressed = optimizer.compress_data(data).unwrap();
assert_eq!(data, compressed.as_slice());
}
#[tokio::test]
async fn test_batch_processor() {
let config = BatchConfig::default();
let processor = BatchProcessor::new(config);
for i in 0..50 {
processor
.add_command(RdfCommand::Insert {
subject: format!("s{}", i),
predicate: "p".to_string(),
object: "o".to_string(),
})
.await;
}
assert_eq!(processor.batch_size().await, 50);
let flushed = processor.flush().await;
assert_eq!(flushed.len(), 50);
assert_eq!(processor.batch_size().await, 0);
}
#[tokio::test]
async fn test_batch_processor_auto_flush() {
        let config = BatchConfig {
            max_batch_size: 10,
            ..BatchConfig::default()
        };
let processor = BatchProcessor::new(config);
for i in 0..10 {
processor
.add_command(RdfCommand::Insert {
subject: format!("s{}", i),
predicate: "p".to_string(),
object: "o".to_string(),
})
.await;
}
assert!(processor.should_flush().await);
}
#[tokio::test]
async fn test_metrics_tracking() {
let optimizer = RaftOptimizer::new(1);
let entries: Vec<u64> = (0..1000).collect();
let _ = optimizer.compact_log(entries).await;
let commands: Vec<RdfCommand> = (0..100)
.map(|i| RdfCommand::Insert {
subject: format!("s{}", i),
predicate: "p".to_string(),
object: "o".to_string(),
})
.collect();
let _ = optimizer.batch_commands(commands).await;
let metrics = optimizer.get_metrics().await;
assert_eq!(metrics.compacted_entries, 900);
assert!(metrics.batch_operations > 0);
assert!(metrics.last_compaction.is_some());
}
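    // Additional usage sketches (added): the assertions below follow directly
    // from the default configuration values and arithmetic defined in this module.
    #[tokio::test]
    async fn test_adaptive_batch_size_tiers() {
        let optimizer = RaftOptimizer::new(1);
        // The default cluster size is 1, which falls in the smallest tier.
        assert_eq!(optimizer.calculate_adaptive_batch_size().await, 100);
        optimizer.update_cluster_size(750).await;
        assert_eq!(optimizer.calculate_adaptive_batch_size().await, 350);
        optimizer.update_cluster_size(5000).await;
        assert_eq!(optimizer.calculate_adaptive_batch_size().await, 500);
    }
    #[tokio::test]
    async fn test_connection_pool_acquire_release() {
        let pool = ConnectionPool::new(ConnectionPoolConfig::default());
        let conn = pool.acquire(1).await.unwrap();
        pool.release(conn).await;
        let stats = pool.get_stats().await;
        assert_eq!(stats.total_connections, 1);
        assert_eq!(stats.nodes_with_connections, 1);
    }
    #[test]
    fn test_validate_log_integrity_roundtrip() {
        let optimizer = RaftOptimizer::new(1);
        let entries: Vec<f64> = (0..64).map(|i| i as f64).collect();
        // Checksums computed from the entries themselves must validate exactly.
        let checksums = optimizer.simd_process_entries(&entries).unwrap();
        let valid = optimizer
            .validate_log_integrity(&entries, &checksums.to_vec())
            .unwrap();
        assert!(valid);
    }
    #[tokio::test]
    async fn test_analyze_log_performance() {
        let optimizer = RaftOptimizer::new(1);
        // Latencies 1..=100 give a mean of 50.5 and exact percentile indices.
        let latencies: Vec<f64> = (1..=100).map(|i| i as f64).collect();
        let analysis = optimizer.analyze_log_performance(&latencies).await.unwrap();
        assert_eq!(analysis.sample_count, 100);
        assert!((analysis.mean_latency_micros - 50.5).abs() < 1e-9);
        assert_eq!(analysis.p50_micros, 51.0);
        assert_eq!(analysis.p99_micros, 100.0);
    }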
}