use std::fs::{File, OpenOptions};
use std::io::{self, BufWriter, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use kevy_resp::ArgvView;
use kevy_store::Store;
use crate::{
dump_store_to_buf, estimate_multibulk_bytes, write_multibulk,
};
pub(crate) const AOF_MAGIC: &[u8; 9] = b"KEVYAOF1\n";
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Fsync {
Always,
EverySec,
No,
}
pub struct Aof {
file: BufWriter<File>,
path: PathBuf,
fsync: Fsync,
dirty: bool,
last_sync: Instant,
size_bytes: u64,
size_at_last_rewrite: u64,
rewrites_total: u64,
deferred: bool,
rewrite_tee: Option<Vec<u8>>,
}
pub struct RewritePlan {
pub body: Vec<u8>,
pub tmp: PathBuf,
pub keys: u64,
}
#[derive(Debug, Clone, Copy)]
pub struct RewriteStats {
pub keys: u64,
pub bytes: u64,
}
impl Aof {
pub fn open(path: &Path, fsync: Fsync) -> io::Result<Self> {
let mut file = OpenOptions::new().create(true).append(true).open(path)?;
let mut size = file.metadata().map(|m| m.len()).unwrap_or(0);
if size == 0 {
file.write_all(AOF_MAGIC)?;
file.sync_data()?;
size = AOF_MAGIC.len() as u64;
}
Ok(Aof {
file: BufWriter::new(file),
path: path.to_path_buf(),
fsync,
dirty: false,
last_sync: Instant::now(),
size_bytes: size,
size_at_last_rewrite: size,
rewrites_total: 0,
deferred: false,
rewrite_tee: None,
})
}
#[inline]
pub fn fsync_policy(&self) -> Fsync {
self.fsync
}
pub fn set_fsync(&mut self, fsync: Fsync) -> io::Result<()> {
let upgrading_to_always = matches!(fsync, Fsync::Always) && !matches!(self.fsync, Fsync::Always);
self.fsync = fsync;
if upgrading_to_always && self.dirty {
self.file.flush()?;
self.file.get_ref().sync_data()?;
self.dirty = false;
self.last_sync = Instant::now();
}
Ok(())
}
pub fn append<A: ArgvView + ?Sized>(&mut self, args: &A) -> io::Result<()> {
write_multibulk(&mut self.file, args)?;
if let Some(tee) = &mut self.rewrite_tee {
write_multibulk(tee, args)?;
}
self.size_bytes = self
.size_bytes
.saturating_add(estimate_multibulk_bytes(args));
match self.fsync {
Fsync::Always if self.deferred => self.dirty = true,
Fsync::Always => {
self.file.flush()?;
self.file.get_ref().sync_data()?;
}
Fsync::EverySec | Fsync::No => self.dirty = true,
}
Ok(())
}
#[inline]
pub fn begin_group(&mut self) {
if matches!(self.fsync, Fsync::Always) {
self.deferred = true;
}
}
#[inline]
pub fn end_group(&mut self) -> io::Result<()> {
if self.deferred {
self.deferred = false;
if self.dirty {
self.file.flush()?;
self.file.get_ref().sync_data()?;
self.dirty = false;
self.last_sync = Instant::now();
}
}
Ok(())
}
pub fn maybe_sync(&mut self) -> io::Result<()> {
if matches!(self.fsync, Fsync::EverySec)
&& self.dirty
&& self.last_sync.elapsed() >= Duration::from_secs(1)
{
self.file.flush()?;
self.file.get_ref().sync_data()?;
self.dirty = false;
self.last_sync = Instant::now();
}
Ok(())
}
pub fn truncate(&mut self) -> io::Result<()> {
self.file.flush()?;
let f = self.file.get_mut();
f.set_len(0)?;
f.seek(SeekFrom::Start(0))?; f.write_all(AOF_MAGIC)?;
f.sync_all()?;
self.dirty = false;
self.size_bytes = AOF_MAGIC.len() as u64;
self.size_at_last_rewrite = AOF_MAGIC.len() as u64;
Ok(())
}
#[inline]
pub fn size_bytes(&self) -> u64 {
self.size_bytes
}
#[inline]
pub fn size_at_last_rewrite(&self) -> u64 {
self.size_at_last_rewrite
}
#[inline]
pub fn rewrites_total(&self) -> u64 {
self.rewrites_total
}
pub fn rewrite_from(&mut self, store: &Store) -> io::Result<RewriteStats> {
self.file.flush()?;
let tmp = rewrite_tmp_path(&self.path);
let (keys, bytes) = crate::dump_aof(&tmp, store)?;
std::fs::rename(&tmp, &self.path)?;
let f = OpenOptions::new().append(true).open(&self.path)?;
self.file = BufWriter::new(f);
self.size_bytes = bytes;
self.size_at_last_rewrite = bytes;
self.dirty = false;
self.rewrites_total = self.rewrites_total.saturating_add(1);
Ok(RewriteStats { keys, bytes })
}
#[inline]
pub fn is_rewriting(&self) -> bool {
self.rewrite_tee.is_some()
}
pub fn begin_concurrent_rewrite(&mut self, store: &Store) -> io::Result<RewritePlan> {
let (body, keys) = dump_store_to_buf(store);
self.rewrite_tee = Some(Vec::new());
Ok(RewritePlan {
body,
tmp: rewrite_tmp_path(&self.path),
keys,
})
}
pub fn finish_concurrent_rewrite(&mut self, tmp: &Path, keys: u64) -> io::Result<RewriteStats> {
let tee = self.rewrite_tee.take().unwrap_or_default();
{
let mut f = OpenOptions::new().append(true).open(tmp)?;
f.write_all(&tee)?;
f.sync_all()?;
}
std::fs::rename(tmp, &self.path)?;
let f = OpenOptions::new().append(true).open(&self.path)?;
let bytes = f.metadata().map(|m| m.len()).unwrap_or(0);
self.file = BufWriter::new(f);
self.size_bytes = bytes;
self.size_at_last_rewrite = bytes;
self.dirty = false;
self.rewrites_total = self.rewrites_total.saturating_add(1);
Ok(RewriteStats { keys, bytes })
}
pub fn abort_concurrent_rewrite(&mut self) {
self.rewrite_tee = None;
}
pub fn begin_view_rewrite(&mut self) -> io::Result<std::path::PathBuf> {
self.file.flush()?;
self.rewrite_tee = Some(Vec::new());
Ok(rewrite_tmp_path(&self.path))
}
}
pub fn write_aof_base(path: &Path) -> io::Result<()> {
let mut f = File::create(path)?;
f.write_all(AOF_MAGIC)?;
f.sync_all()
}
fn rewrite_tmp_path(path: &Path) -> PathBuf {
let mut p = path.to_path_buf();
let new_name = match path.file_name() {
Some(n) => {
let mut s = n.to_os_string();
s.push(".rewrite");
s
}
None => std::ffi::OsString::from("aof.rewrite"),
};
p.set_file_name(new_name);
p
}