use std::time::{Duration, Instant};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use parking_lot::RwLock;
use crate::{debug_log};
/// Tunable parameters for the adaptive flush scheduler.
///
/// Every `*_interval_ms` field is expressed in milliseconds.
#[derive(Clone, Debug)]
pub struct SmartFlushConfig {
    /// Starting interval that load-based adjustment scales up or down; also
    /// the delay reported as-is when `enabled` is `false`.
    pub base_interval_ms: usize,
    /// Lower bound applied when the interval is shortened under high write load.
    pub min_interval_ms: usize,
    /// Upper bound applied when the interval is lengthened under low write load.
    pub max_interval_ms: usize,
    /// Write rate (operations per second) above which load counts as high.
    pub write_rate_threshold: u64,
    /// Number of bytes recorded since the last completed flush at which an
    /// immediate flush is requested.
    pub accumulated_bytes_threshold: usize,
    /// Master switch for adaptive scheduling.
    pub enabled: bool,
}
impl Default for SmartFlushConfig {
fn default() -> Self {
Self {
base_interval_ms: 200, min_interval_ms: 50, max_interval_ms: 2000, write_rate_threshold: 10000, accumulated_bytes_threshold: 4 * 1024 * 1024, enabled: true,
}
}
}
/// Lock-light counters describing recent write activity.
///
/// The counters are atomics so writers can record activity through a shared
/// reference; only the timing reference sits behind a lock.
#[doc(hidden)]
#[derive(Debug)]
pub struct WriteLoadStats {
    /// Writes recorded since `update_rates` last drained the counters.
    write_count: AtomicU64,
    /// Bytes recorded since `update_rates` last drained the counters.
    write_bytes: AtomicU64,
    /// Reference instant `update_rates` measures elapsed time against.
    last_stats_time: RwLock<Instant>,
    /// Most recently computed write rate, in writes per second.
    current_write_rate: AtomicU64,
    /// Most recently computed throughput, in bytes per second.
    current_byte_rate: AtomicU64,
    /// Bytes recorded since the last `reset_accumulated_bytes` call.
    accumulated_bytes: AtomicUsize,
}
impl WriteLoadStats {
    /// Creates a tracker with all counters at zero and a measurement window
    /// that starts now.
    pub fn new() -> Self {
        Self {
            write_count: AtomicU64::new(0),
            write_bytes: AtomicU64::new(0),
            last_stats_time: RwLock::new(Instant::now()),
            current_write_rate: AtomicU64::new(0),
            current_byte_rate: AtomicU64::new(0),
            accumulated_bytes: AtomicUsize::new(0),
        }
    }

    /// Records a single write of `bytes_written` bytes.
    ///
    /// Feeds both the rate counters (drained by [`update_rates`](Self::update_rates))
    /// and the accumulated-bytes total (cleared by
    /// [`reset_accumulated_bytes`](Self::reset_accumulated_bytes)).
    pub fn record_write(&self, bytes_written: usize) {
        self.write_count.fetch_add(1, Ordering::Relaxed);
        self.write_bytes.fetch_add(bytes_written as u64, Ordering::Relaxed);
        self.accumulated_bytes.fetch_add(bytes_written, Ordering::Relaxed);
    }

    /// Recomputes the per-second write and byte rates once the current
    /// measurement window is at least one second old.
    ///
    /// Calls made before the window has matured leave everything untouched.
    /// In particular they do NOT restart the window: the window start only
    /// advances when the rates are actually recomputed. Restarting it on
    /// every call would mean a caller polling more often than once per second
    /// never sees a full window, so the rates would stay at zero forever
    /// while the counters grow without bound.
    pub fn update_rates(&self) {
        let mut window_start = self.last_stats_time.write();
        // Sample the clock while holding the lock so `now` cannot predate a
        // window start stored by a concurrent caller; saturate to zero rather
        // than depend on how `duration_since` treats a reversed pair.
        let now = Instant::now();
        let elapsed = now.saturating_duration_since(*window_start);
        if elapsed.as_secs() == 0 {
            return;
        }

        // Drain the counters so the next window starts from zero.
        let write_count = self.write_count.swap(0, Ordering::Relaxed);
        let write_bytes = self.write_bytes.swap(0, Ordering::Relaxed);

        let secs = elapsed.as_secs_f64();
        let write_rate = (write_count as f64 / secs) as u64;
        let byte_rate = (write_bytes as f64 / secs) as u64;
        self.current_write_rate.store(write_rate, Ordering::Relaxed);
        self.current_byte_rate.store(byte_rate, Ordering::Relaxed);

        *window_start = now;
    }

    /// Returns the most recently computed write rate, in writes per second.
    pub fn get_write_rate(&self) -> u64 {
        self.current_write_rate.load(Ordering::Relaxed)
    }

    /// Returns the most recently computed throughput, in bytes per second.
    pub fn get_byte_rate(&self) -> u64 {
        self.current_byte_rate.load(Ordering::Relaxed)
    }

    /// Returns the number of bytes recorded since the accumulated total was
    /// last reset.
    pub fn get_accumulated_bytes(&self) -> usize {
        self.accumulated_bytes.load(Ordering::Relaxed)
    }

    /// Clears the accumulated-bytes total.
    pub fn reset_accumulated_bytes(&self) {
        self.accumulated_bytes.store(0, Ordering::Relaxed);
    }
}
/// Decides how long to wait before the next flush, based on recent write
/// load and on how much data has been recorded since the last flush.
#[doc(hidden)]
pub struct SmartFlushScheduler {
    /// Active tuning parameters.
    config: SmartFlushConfig,
    /// Write statistics, shared with whoever records writes.
    stats: Arc<WriteLoadStats>,
    /// Instant at which the most recent flush was reported complete
    /// (initially the scheduler's creation time).
    last_flush_time: RwLock<Instant>,
}
impl SmartFlushScheduler {
    /// Creates a scheduler with fresh statistics; the last-flush timestamp
    /// is initialised to the moment of construction.
    pub fn new(config: SmartFlushConfig) -> Self {
        Self {
            config,
            stats: Arc::new(WriteLoadStats::new()),
            last_flush_time: RwLock::new(Instant::now()),
        }
    }

    /// Returns a shared handle to the write statistics so callers can record
    /// writes against this scheduler.
    pub fn get_stats(&self) -> Arc<WriteLoadStats> {
        Arc::clone(&self.stats)
    }

    /// Computes how long to wait before the next flush.
    ///
    /// * Adaptive scheduling disabled: always the configured base interval.
    /// * Accumulated bytes at or above the threshold: zero (flush now).
    /// * Otherwise: an interval derived from the current write rate, kept
    ///   within `[min_interval_ms, max_interval_ms]`, minus the time already
    ///   elapsed since the last completed flush (never below zero).
    ///
    /// Under high load (rate above the threshold) the base interval is
    /// divided by the load ratio, capped at 5x. Under low load it is
    /// stretched by a factor between 1.0 and 1.9.
    pub fn calculate_next_flush_delay(&self) -> Duration {
        if !self.config.enabled {
            return Duration::from_millis(self.config.base_interval_ms as u64);
        }

        self.stats.update_rates();
        let write_rate = self.stats.get_write_rate();
        let accumulated_bytes = self.stats.get_accumulated_bytes();
        let last_flush = *self.last_flush_time.read();
        let time_since_last_flush = last_flush.elapsed();

        if accumulated_bytes >= self.config.accumulated_bytes_threshold {
            debug_log!("智能flush: 累积字节{}超过阈值{}, 立即flush",
                accumulated_bytes, self.config.accumulated_bytes_threshold);
            return Duration::from_millis(0);
        }

        let base_ms = self.config.base_interval_ms as f64;
        // A zero threshold makes this +inf (or NaN when the rate is zero too);
        // the `min`/`max` calls below map those to 5.0 and 0.1 respectively,
        // because `f64::min`/`f64::max` return the non-NaN operand.
        let load_ratio = write_rate as f64 / self.config.write_rate_threshold as f64;

        let interval_ms = if write_rate > self.config.write_rate_threshold {
            let load_factor = load_ratio.min(5.0);
            // Apply both bounds; the minimum goes last so it wins if the
            // configuration is inconsistent (min > max).
            let interval_ms = ((base_ms / load_factor) as usize)
                .min(self.config.max_interval_ms)
                .max(self.config.min_interval_ms);
            debug_log!("智能flush: 高写入负载{} ops/sec, 调整间隔为{}ms",
                write_rate, interval_ms);
            interval_ms
        } else {
            let load_factor = load_ratio.max(0.1);
            // Apply both bounds; the maximum goes last so it wins if the
            // configuration is inconsistent (min > max).
            let interval_ms = ((base_ms * (2.0 - load_factor)) as usize)
                .max(self.config.min_interval_ms)
                .min(self.config.max_interval_ms);
            debug_log!("智能flush: 低写入负载{} ops/sec, 调整间隔为{}ms",
                write_rate, interval_ms);
            interval_ms
        };

        // Time already spent waiting counts against the interval; an overdue
        // flush yields zero.
        Duration::from_millis(interval_ms as u64)
            .checked_sub(time_since_last_flush)
            .unwrap_or_default()
    }

    /// Marks a flush as finished: restarts the flush timer and clears the
    /// accumulated-bytes total.
    pub fn notify_flush_completed(&self) {
        *self.last_flush_time.write() = Instant::now();
        self.stats.reset_accumulated_bytes();
    }

    /// Replaces the active configuration; takes effect on the next call to
    /// [`calculate_next_flush_delay`](Self::calculate_next_flush_delay).
    pub fn update_config(&mut self, config: SmartFlushConfig) {
        self.config = config;
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Accumulated bytes sum across recorded writes and can be cleared.
    #[test]
    fn test_write_load_stats() {
        let tracker = WriteLoadStats::new();
        for &size in &[100usize, 200, 150] {
            tracker.record_write(size);
        }
        tracker.update_rates();

        let total = tracker.get_accumulated_bytes();
        assert_eq!(total, 450);

        tracker.reset_accumulated_bytes();
        let after_reset = tracker.get_accumulated_bytes();
        assert_eq!(after_reset, 0);
    }

    /// Crossing the byte threshold requests an immediate flush; once the
    /// flush is acknowledged the scheduler asks for a non-zero wait again.
    #[test]
    fn test_smart_flush_scheduler() {
        let scheduler = SmartFlushScheduler::new(SmartFlushConfig {
            base_interval_ms: 100,
            min_interval_ms: 50,
            max_interval_ms: 500,
            write_rate_threshold: 1000,
            accumulated_bytes_threshold: 1000,
            enabled: true,
        });

        // 1200 bytes exceeds the 1000-byte threshold.
        let shared_stats = scheduler.get_stats();
        shared_stats.record_write(1200);
        let immediate = scheduler.calculate_next_flush_delay();
        assert_eq!(immediate, Duration::from_millis(0));

        scheduler.notify_flush_completed();
        let deferred = scheduler.calculate_next_flush_delay();
        assert!(deferred > Duration::from_millis(0));
    }
}