use crate::log_manager::LogManager;
use noxu_util::daemon::DaemonThread;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
pub struct LogFlusher {
flush_sync_daemon: Option<DaemonThread>,
flush_no_sync_daemon: Option<DaemonThread>,
flush_sync_interval_ms: u64,
flush_no_sync_interval_ms: u64,
last_n_commits_sync: Arc<AtomicU64>,
last_n_commits_no_sync: Arc<AtomicU64>,
flush_sync_count: Arc<AtomicU64>,
flush_no_sync_count: Arc<AtomicU64>,
}
impl LogFlusher {
pub fn new(
log_manager: Arc<LogManager>,
flush_sync_interval_ms: u64,
flush_no_sync_interval_ms: u64,
get_n_commits: Arc<dyn Fn() -> u64 + Send + Sync>,
) -> Self {
let last_n_commits_sync = Arc::new(AtomicU64::new(get_n_commits()));
let last_n_commits_no_sync = Arc::new(AtomicU64::new(get_n_commits()));
let flush_sync_count = Arc::new(AtomicU64::new(0));
let flush_no_sync_count = Arc::new(AtomicU64::new(0));
let flush_sync_daemon = if flush_sync_interval_ms > 0 {
let lm = Arc::clone(&log_manager);
let get_commits = Arc::clone(&get_n_commits);
let last_commits = Arc::clone(&last_n_commits_sync);
let count = Arc::clone(&flush_sync_count);
Some(DaemonThread::spawn(
"LogFlusher-Sync",
Duration::from_millis(flush_sync_interval_ms),
move || {
let current_commits = get_commits();
let last = last_commits.load(Ordering::Relaxed);
if current_commits > last {
if let Err(e) = lm.flush_sync() {
log::error!(
"LogFlusher (sync) flush failed: {e}; commits \
between {last} and {current_commits} are not \
yet durable on disk and will be retried on \
the next tick"
);
} else {
last_commits
.store(current_commits, Ordering::Relaxed);
count.fetch_add(1, Ordering::Relaxed);
}
}
true },
))
} else {
None
};
let flush_no_sync_daemon = if flush_no_sync_interval_ms > 0 {
let lm = Arc::clone(&log_manager);
let get_commits = Arc::clone(&get_n_commits);
let last_commits = Arc::clone(&last_n_commits_no_sync);
let count = Arc::clone(&flush_no_sync_count);
Some(DaemonThread::spawn(
"LogFlusher-NoSync",
Duration::from_millis(flush_no_sync_interval_ms),
move || {
let current_commits = get_commits();
let last = last_commits.load(Ordering::Relaxed);
if current_commits > last {
if let Err(e) = lm.flush_no_sync() {
log::error!(
"LogFlusher (no-sync) flush failed: {e}; \
commits between {last} and {current_commits} \
are not yet flushed and will be retried on \
the next tick"
);
} else {
last_commits
.store(current_commits, Ordering::Relaxed);
count.fetch_add(1, Ordering::Relaxed);
}
}
true },
))
} else {
None
};
LogFlusher {
flush_sync_daemon,
flush_no_sync_daemon,
flush_sync_interval_ms,
flush_no_sync_interval_ms,
last_n_commits_sync,
last_n_commits_no_sync,
flush_sync_count,
flush_no_sync_count,
}
}
pub fn get_flush_sync_interval(&self) -> u64 {
self.flush_sync_interval_ms
}
pub fn get_flush_no_sync_interval(&self) -> u64 {
self.flush_no_sync_interval_ms
}
pub fn get_flush_sync_count(&self) -> u64 {
self.flush_sync_count.load(Ordering::Relaxed)
}
pub fn get_flush_no_sync_count(&self) -> u64 {
self.flush_no_sync_count.load(Ordering::Relaxed)
}
pub fn request_shutdown(&self) {
if let Some(ref daemon) = self.flush_sync_daemon {
daemon.request_shutdown();
}
if let Some(ref daemon) = self.flush_no_sync_daemon {
daemon.request_shutdown();
}
}
pub fn shutdown(self) {
if let Some(daemon) = self.flush_sync_daemon {
daemon.shutdown();
}
if let Some(daemon) = self.flush_no_sync_daemon {
daemon.shutdown();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::file_manager::FileManager;
use crate::log_manager::LogManager;
use std::sync::atomic::AtomicU64;
use tempfile::TempDir;
fn make_log_manager(dir: &TempDir) -> Arc<LogManager> {
let fm = Arc::new(
FileManager::new(dir.path(), false, 10_000_000, 100).unwrap(),
);
Arc::new(LogManager::new(fm, 3, 1024 * 1024, 4096))
}
#[test]
fn test_new_flusher() {
let dir = TempDir::new().unwrap();
let lm = make_log_manager(&dir);
let commit_count = Arc::new(AtomicU64::new(0));
let commit_count_clone = Arc::clone(&commit_count);
let flusher = LogFlusher::new(
lm,
1000,
500,
Arc::new(move || commit_count_clone.load(Ordering::Relaxed)),
);
assert_eq!(flusher.get_flush_sync_interval(), 1000);
assert_eq!(flusher.get_flush_no_sync_interval(), 500);
flusher.shutdown();
}
#[test]
fn test_flusher_triggers_on_commits() {
let dir = TempDir::new().unwrap();
let lm = make_log_manager(&dir);
let commit_count = Arc::new(AtomicU64::new(0));
let commit_count_clone = Arc::clone(&commit_count);
let flusher = LogFlusher::new(
lm,
50, 0, Arc::new(move || commit_count_clone.load(Ordering::Relaxed)),
);
commit_count.store(10, Ordering::Relaxed);
std::thread::sleep(Duration::from_millis(200));
assert!(flusher.get_flush_sync_count() > 0);
flusher.shutdown();
}
#[test]
fn test_flusher_no_flush_without_commits() {
let dir = TempDir::new().unwrap();
let lm = make_log_manager(&dir);
let commit_count = Arc::new(AtomicU64::new(5));
let commit_count_clone = Arc::clone(&commit_count);
let flusher = LogFlusher::new(
lm,
50, 0,
Arc::new(move || commit_count_clone.load(Ordering::Relaxed)),
);
std::thread::sleep(Duration::from_millis(200));
assert_eq!(flusher.get_flush_sync_count(), 0);
flusher.shutdown();
}
}