use anyhow::*;
use crossbeam::channel::{bounded, Sender};
use std::io::Write;
use std::time::{Duration, Instant};
use std::result::Result::{Ok, Err};
/// Cloneable handle used to submit byte buffers to the AOF writer.
///
/// Wraps the sending half of the bounded channel that `spawn_aof_writer`
/// creates; cloning the handle clones the sender.
#[derive(Clone)]
pub struct AofHandle {
// Sending half of the channel whose receiver is drained by the writer thread.
tx: Sender<Vec<u8>>,
}
pub fn spawn_aof_writer(path: &str) -> Result<AofHandle> {
let (tx, rx) = bounded::<Vec<u8>>(4096);
let path = path.to_string();
std::thread::Builder::new()
.name("aof-writer".into())
.spawn(move || {
let mut f = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.expect("open aof");
let mut last = Instant::now();
loop {
match rx.recv() {
Ok(buf) => {
let _ = f.write_all(&buf);
if last.elapsed() >= Duration::from_millis(1000) {
let _ = f.flush();
let _ = f.sync_data();
last = Instant::now();
}
}
Err(_) => {
let _ = f.flush();
let _ = f.sync_data();
break;
}
}
}
})?;
Ok(AofHandle { tx })
}
impl AofHandle {
    /// Copies `bytes` into an owned buffer and queues it on the channel to the
    /// writer thread.
    ///
    /// The outcome of the send is discarded, so if the send fails the data is
    /// dropped without notifying the caller.
    #[inline]
    pub fn write(&self, bytes: &[u8]) {
        let payload: Vec<u8> = bytes.to_owned();
        // Deliberately best-effort: a send failure is ignored.
        self.tx.send(payload).ok();
    }
}
/// Encodes `SET k v` as a RESP array of three bulk strings.
///
/// Keys and values are copied byte-for-byte, so arbitrary binary data
/// (including invalid UTF-8) is preserved and always matches the `$<len>`
/// header that precedes it.
pub fn emit_aof_set(k: &[u8], v: &[u8]) -> Vec<u8> {
    // Appends one RESP bulk string: `$<len>\r\n<bytes>\r\n`.
    fn push_bulk(out: &mut Vec<u8>, arg: &[u8]) {
        out.push(b'$');
        out.extend_from_slice(arg.len().to_string().as_bytes());
        out.extend_from_slice(b"\r\n");
        out.extend_from_slice(arg);
        out.extend_from_slice(b"\r\n");
    }

    // 48 bytes comfortably covers the array header, the command name and the
    // two length prefixes.
    let mut out = Vec::with_capacity(k.len() + v.len() + 48);
    out.extend_from_slice(b"*3\r\n");
    push_bulk(&mut out, b"SET");
    push_bulk(&mut out, k);
    push_bulk(&mut out, v);
    out
}
/// Encodes `RENAME a b` as a RESP array of three bulk strings.
///
/// Both key names are copied byte-for-byte, so arbitrary binary data
/// (including invalid UTF-8) is preserved and always matches the `$<len>`
/// header that precedes it.
pub fn emit_aof_rename(a: &[u8], b: &[u8]) -> Vec<u8> {
    // Appends one RESP bulk string: `$<len>\r\n<bytes>\r\n`.
    fn push_bulk(out: &mut Vec<u8>, arg: &[u8]) {
        out.push(b'$');
        out.extend_from_slice(arg.len().to_string().as_bytes());
        out.extend_from_slice(b"\r\n");
        out.extend_from_slice(arg);
        out.extend_from_slice(b"\r\n");
    }

    // 48 bytes comfortably covers the array header, the command name and the
    // two length prefixes.
    let mut out = Vec::with_capacity(a.len() + b.len() + 48);
    out.extend_from_slice(b"*3\r\n");
    push_bulk(&mut out, b"RENAME");
    push_bulk(&mut out, a);
    push_bulk(&mut out, b);
    out
}
/// Encodes `INCR k` as a RESP array of two bulk strings.
///
/// The key is copied byte-for-byte, so arbitrary binary data (including
/// invalid UTF-8) is preserved and always matches the `$<len>` header that
/// precedes it.
pub fn emit_aof_incr(k: &[u8]) -> Vec<u8> {
    // Appends one RESP bulk string: `$<len>\r\n<bytes>\r\n`.
    fn push_bulk(out: &mut Vec<u8>, arg: &[u8]) {
        out.push(b'$');
        out.extend_from_slice(arg.len().to_string().as_bytes());
        out.extend_from_slice(b"\r\n");
        out.extend_from_slice(arg);
        out.extend_from_slice(b"\r\n");
    }

    // 40 bytes comfortably covers the array header, the command name and the
    // length prefix.
    let mut out = Vec::with_capacity(k.len() + 40);
    out.extend_from_slice(b"*2\r\n");
    push_bulk(&mut out, b"INCR");
    push_bulk(&mut out, k);
    out
}
use bytes::Bytes;
/// Encodes `MSET k1 v1 k2 v2 ...` as a RESP array of bulk strings.
///
/// The array header announces `1 + 2 * pairs.len()` elements: the command
/// name followed by each key and value in order. Keys and values are copied
/// byte-for-byte, so arbitrary binary data (including invalid UTF-8) is
/// preserved and always matches the `$<len>` header that precedes it.
pub fn emit_aof_mset(pairs: &[(Bytes, Bytes)]) -> Vec<u8> {
    // Appends one RESP bulk string: `$<len>\r\n<bytes>\r\n`.
    fn push_bulk(out: &mut Vec<u8>, arg: &[u8]) {
        out.push(b'$');
        out.extend_from_slice(arg.len().to_string().as_bytes());
        out.extend_from_slice(b"\r\n");
        out.extend_from_slice(arg);
        out.extend_from_slice(b"\r\n");
    }

    let mut out = Vec::new();
    out.push(b'*');
    out.extend_from_slice((1 + pairs.len() * 2).to_string().as_bytes());
    out.extend_from_slice(b"\r\n");
    push_bulk(&mut out, b"MSET");
    for (k, v) in pairs {
        push_bulk(&mut out, k);
        push_bulk(&mut out, v);
    }
    out
}