use std::fs::{File, OpenOptions};
use std::io::{self, BufWriter, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use kevy_resp::ArgvView;
use kevy_store::Store;
use crate::{
dump_store_to_aof, estimate_multibulk_bytes, write_multibulk,
};
/// Nine-byte file header: the ASCII text `KEVYAOF1` followed by a newline.
///
/// `Aof::open` writes it to a zero-length file and `Aof::truncate` writes it
/// again after cutting the file back to zero bytes.
pub(crate) const AOF_MAGIC: &[u8; 9] = b"KEVYAOF1\n";
/// Policy controlling when appended data is flushed and synced to disk.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Fsync {
/// Every `Aof::append` flushes the buffer and calls `sync_data` before returning.
Always,
/// `Aof::append` only marks the log dirty; `Aof::maybe_sync` flushes and syncs
/// once at least one second has passed since the previous sync.
EverySec,
/// `Aof::append` only marks the log dirty and `Aof::maybe_sync` does nothing.
No,
}
/// Handle to an append-only command log on disk.
///
/// Holds the buffered file handle together with the active fsync policy,
/// a pending-write flag, and running size/rewrite counters.
pub struct Aof {
/// Buffered writer over the log file, opened in append mode.
file: BufWriter<File>,
/// Location of the log; `rewrite_from` derives its scratch path from it and
/// reopens it after the swap.
path: PathBuf,
/// Active sync policy, changeable through `set_fsync`.
fsync: Fsync,
/// Set by `append` under `EverySec`/`No`; cleared whenever a sync, truncate,
/// or rewrite completes.
dirty: bool,
/// When `set_fsync` or `maybe_sync` last synced (initialised to the time of `open`).
last_sync: Instant,
/// Tracked log size: the length seen at `open` plus the per-append estimates,
/// reset by `truncate` and `rewrite_from`.
size_bytes: u64,
/// Value of `size_bytes` recorded at `open`, `truncate`, or the latest `rewrite_from`.
size_at_last_rewrite: u64,
/// Number of `rewrite_from` calls that completed since this handle was opened.
rewrites_total: u64,
}
/// Figures reported by `Aof::rewrite_from`, taken from the pair returned by
/// `dump_store_to_aof`.
#[derive(Debug, Clone, Copy)]
pub struct RewriteStats {
/// First element of the dump result — presumably the number of keys written
/// (TODO confirm against `dump_store_to_aof`).
pub keys: u64,
/// Second element of the dump result; `rewrite_from` also adopts it as the
/// new tracked log size.
pub bytes: u64,
}
impl Aof {
    /// Opens the append-only log at `path`, creating it when it does not exist.
    ///
    /// A zero-length file receives the `AOF_MAGIC` header, which is synced to
    /// disk before this returns. Both size counters start at the on-disk
    /// length (or the header length for a fresh file).
    ///
    /// # Errors
    ///
    /// Returns any I/O error from opening the file, reading its metadata, or
    /// writing and syncing the header.
    pub fn open(path: &Path, fsync: Fsync) -> io::Result<Self> {
        let mut file = OpenOptions::new().create(true).append(true).open(path)?;
        // A metadata failure must not be mistaken for an empty file: the
        // handle is in append mode, so writing the header on that assumption
        // would place it after whatever the log already contains.
        let mut size = file.metadata()?.len();
        if size == 0 {
            file.write_all(AOF_MAGIC)?;
            file.sync_data()?;
            size = AOF_MAGIC.len() as u64;
        }
        Ok(Aof {
            file: BufWriter::new(file),
            path: path.to_path_buf(),
            fsync,
            dirty: false,
            last_sync: Instant::now(),
            size_bytes: size,
            size_at_last_rewrite: size,
            rewrites_total: 0,
        })
    }

    /// Returns the fsync policy currently in effect.
    #[inline]
    pub fn fsync_policy(&self) -> Fsync {
        self.fsync
    }

    /// Replaces the fsync policy.
    ///
    /// When the policy changes to [`Fsync::Always`] from anything else while
    /// appends are still pending, they are flushed and synced right away.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from that flush or sync. The new policy has
    /// already been stored by then.
    pub fn set_fsync(&mut self, fsync: Fsync) -> io::Result<()> {
        let upgrading_to_always =
            matches!(fsync, Fsync::Always) && !matches!(self.fsync, Fsync::Always);
        self.fsync = fsync;
        if upgrading_to_always && self.dirty {
            self.file.flush()?;
            self.file.get_ref().sync_data()?;
            self.dirty = false;
            self.last_sync = Instant::now();
        }
        Ok(())
    }

    /// Appends one command to the log.
    ///
    /// `args` is serialised with `write_multibulk` into the buffered writer
    /// and the tracked size grows by `estimate_multibulk_bytes(args)`
    /// (saturating). Under [`Fsync::Always`] the buffer is flushed and synced
    /// before returning; under the other policies the log is only marked
    /// dirty.
    ///
    /// # Errors
    ///
    /// Returns any I/O error from serialising, flushing, or syncing.
    pub fn append<A: ArgvView + ?Sized>(&mut self, args: &A) -> io::Result<()> {
        write_multibulk(&mut self.file, args)?;
        self.size_bytes = self
            .size_bytes
            .saturating_add(estimate_multibulk_bytes(args));
        match self.fsync {
            Fsync::Always => {
                self.file.flush()?;
                self.file.get_ref().sync_data()?;
            }
            Fsync::EverySec | Fsync::No => self.dirty = true,
        }
        Ok(())
    }

    /// Performs the periodic sync for [`Fsync::EverySec`].
    ///
    /// Flushes and syncs only when the policy is `EverySec`, appends are
    /// pending, and at least one second has passed since the last sync.
    /// Under any other policy this is a no-op.
    ///
    /// # Errors
    ///
    /// Returns any I/O error from flushing or syncing.
    pub fn maybe_sync(&mut self) -> io::Result<()> {
        if matches!(self.fsync, Fsync::EverySec)
            && self.dirty
            && self.last_sync.elapsed() >= Duration::from_secs(1)
        {
            self.file.flush()?;
            self.file.get_ref().sync_data()?;
            self.dirty = false;
            self.last_sync = Instant::now();
        }
        Ok(())
    }

    /// Resets the log to just the `AOF_MAGIC` header.
    ///
    /// Buffered data is flushed first so nothing stale is written after the
    /// cut. The file is then truncated to zero bytes, the header is written
    /// and synced, and both size counters are reset to the header length.
    ///
    /// # Errors
    ///
    /// Returns any I/O error from flushing, truncating, writing, or syncing.
    pub fn truncate(&mut self) -> io::Result<()> {
        self.file.flush()?;
        let f = self.file.get_mut();
        f.set_len(0)?;
        f.seek(SeekFrom::Start(0))?;
        f.write_all(AOF_MAGIC)?;
        f.sync_all()?;
        self.dirty = false;
        self.size_bytes = AOF_MAGIC.len() as u64;
        self.size_at_last_rewrite = AOF_MAGIC.len() as u64;
        Ok(())
    }

    /// Returns the tracked size of the log in bytes.
    #[inline]
    pub fn size_bytes(&self) -> u64 {
        self.size_bytes
    }

    /// Returns the tracked size recorded at open, truncate, or the latest rewrite.
    #[inline]
    pub fn size_at_last_rewrite(&self) -> u64 {
        self.size_at_last_rewrite
    }

    /// Returns how many rewrites have completed on this handle.
    #[inline]
    pub fn rewrites_total(&self) -> u64 {
        self.rewrites_total
    }

    /// Replaces the log with a fresh dump of `store`.
    ///
    /// The dump is written by `dump_store_to_aof` to a sibling scratch file
    /// (see `rewrite_tmp_path`), renamed over the live path, and the log is
    /// reopened in append mode. On success the size counters take the dumped
    /// byte count and the rewrite counter is incremented.
    ///
    /// # Errors
    ///
    /// Returns any I/O error from flushing, dumping, renaming, or reopening.
    /// If the dump or the rename fails, the scratch file is removed on a
    /// best-effort basis and the live log is left as it was.
    pub fn rewrite_from(&mut self, store: &Store) -> io::Result<RewriteStats> {
        self.file.flush()?;
        let tmp = rewrite_tmp_path(&self.path);
        let swapped: io::Result<(u64, u64)> = (|| {
            let counts = dump_store_to_aof(&tmp, store)?;
            std::fs::rename(&tmp, &self.path)?;
            Ok(counts)
        })();
        let (keys, bytes) = match swapped {
            Ok(counts) => counts,
            Err(err) => {
                // Do not leave a partial or orphaned scratch file behind. The
                // removal result is ignored because the file may never have
                // been created.
                let _ = std::fs::remove_file(&tmp);
                return Err(err);
            }
        };
        // NOTE(review): if this reopen fails, `self.file` still refers to the
        // handle opened before the rename.
        let f = OpenOptions::new().append(true).open(&self.path)?;
        self.file = BufWriter::new(f);
        self.size_bytes = bytes;
        self.size_at_last_rewrite = bytes;
        self.dirty = false;
        self.rewrites_total = self.rewrites_total.saturating_add(1);
        Ok(RewriteStats { keys, bytes })
    }
}
/// Derives the scratch path used while a rewrite is in progress.
///
/// The final component of `path` is replaced by itself with `.rewrite`
/// appended. A path without a final component gets `aof.rewrite` appended
/// instead.
fn rewrite_tmp_path(path: &Path) -> PathBuf {
    let scratch_name = path.file_name().map_or_else(
        || std::ffi::OsString::from("aof.rewrite"),
        |base| {
            let mut name = base.to_os_string();
            name.push(".rewrite");
            name
        },
    );
    path.with_file_name(scratch_name)
}