use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug, Clone, Copy)]
pub struct BgWriterConfig {
pub delay: Duration,
pub max_pages_per_round: usize,
pub lru_multiplier: f64,
pub max_dirty_fraction: f64,
}
impl Default for BgWriterConfig {
fn default() -> Self {
Self {
delay: Duration::from_millis(200),
max_pages_per_round: 100,
lru_multiplier: 2.0,
max_dirty_fraction: 0.5,
}
}
}
#[derive(Debug, Default)]
pub struct BgWriterStats {
pub rounds: AtomicU64,
pub pages_flushed: AtomicU64,
pub max_round_hit: AtomicU64,
pub last_dirty_fraction_milli: AtomicU64,
}
impl BgWriterStats {
pub fn new() -> Arc<Self> {
Arc::new(Self::default())
}
pub fn snapshot(&self) -> BgWriterStatsSnapshot {
BgWriterStatsSnapshot {
rounds: self.rounds.load(Ordering::Relaxed),
pages_flushed: self.pages_flushed.load(Ordering::Relaxed),
max_round_hit: self.max_round_hit.load(Ordering::Relaxed),
last_dirty_fraction: self.last_dirty_fraction_milli.load(Ordering::Relaxed) as f64
/ 1000.0,
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct BgWriterStatsSnapshot {
pub rounds: u64,
pub pages_flushed: u64,
pub max_round_hit: u64,
pub last_dirty_fraction: f64,
}
pub trait DirtyPageFlusher: Send + Sync {
fn dirty_fraction(&self) -> f64;
fn flush_some(&self, max: usize) -> usize;
}
pub struct PagerDirtyFlusher {
pager: std::sync::Weak<crate::storage::engine::pager::Pager>,
}
impl PagerDirtyFlusher {
pub fn new(pager: std::sync::Weak<crate::storage::engine::pager::Pager>) -> Self {
Self { pager }
}
}
impl DirtyPageFlusher for PagerDirtyFlusher {
fn dirty_fraction(&self) -> f64 {
match self.pager.upgrade() {
Some(p) => p.dirty_fraction(),
None => 0.0,
}
}
fn flush_some(&self, max: usize) -> usize {
let Some(p) = self.pager.upgrade() else {
return 0;
};
match p.flush_some_dirty(max) {
Ok(n) => n,
Err(err) => {
tracing::warn!(error = ?err, "bgwriter flush_some_dirty failed");
0
}
}
}
}
pub struct BgWriterHandle {
stop: Arc<AtomicBool>,
pub stats: Arc<BgWriterStats>,
}
impl BgWriterHandle {
pub fn stop(&self) {
self.stop.store(true, Ordering::Release);
}
}
impl Drop for BgWriterHandle {
fn drop(&mut self) {
self.stop();
}
}
pub fn spawn(flusher: Arc<dyn DirtyPageFlusher>, config: BgWriterConfig) -> BgWriterHandle {
let stop = Arc::new(AtomicBool::new(false));
let stats = BgWriterStats::new();
let stop_clone = Arc::clone(&stop);
let stats_clone = Arc::clone(&stats);
std::thread::spawn(move || {
loop {
if stop_clone.load(Ordering::Acquire) {
break;
}
let dirty = flusher.dirty_fraction();
stats_clone
.last_dirty_fraction_milli
.store((dirty * 1000.0) as u64, Ordering::Relaxed);
let target_pages = if dirty > config.max_dirty_fraction {
config.max_pages_per_round
} else {
((config.max_pages_per_round as f64 / 4.0) * config.lru_multiplier) as usize
};
let target_pages = target_pages.min(config.max_pages_per_round);
let flushed = flusher.flush_some(target_pages);
stats_clone
.pages_flushed
.fetch_add(flushed as u64, Ordering::Relaxed);
stats_clone.rounds.fetch_add(1, Ordering::Relaxed);
if flushed >= config.max_pages_per_round {
stats_clone.max_round_hit.fetch_add(1, Ordering::Relaxed);
}
std::thread::sleep(config.delay);
}
});
BgWriterHandle { stop, stats }
}