#![forbid(unsafe_code)]
mod aof;
pub mod layout;
mod replay;
pub mod reshard;
mod rewrite_fmt;
mod shards_meta;
mod snapshot_payload;
pub use aof::{Aof, Fsync, RewritePlan, RewriteStats, write_aof_base};
pub use replay::replay_aof;
pub use shards_meta::{Routing, ShardsMeta, read_shards_meta, write_shards_meta};
pub use kevy_resp::{Argv, ArgvView};
pub use rewrite_fmt::dump_aof;
pub(crate) use rewrite_fmt::{dump_store_to_buf, estimate_multibulk_bytes, write_multibulk};
use kevy_store::Store;
use kevy_store::Value;
use std::fs::File;
use std::io::{self, BufReader, BufWriter, Read, Write};
use std::path::Path;
// Eight-byte signature written at the very start of every snapshot and
// checked first by the loader.
const MAGIC: &[u8; 8] = b"KEVYSNAP";
// Format version emitted by the writer; also the newest version the loader
// accepts.
const VERSION: u8 = 4;
// Oldest version the loader accepts. Snapshots below `VERSION_ABSOLUTE_TTL`
// have their stored TTL passed to the store unchanged.
const VERSION_RELATIVE_TTL: u8 = 2;
// From this version on, the stored TTL field is a deadline; the loader
// subtracts the current `kevy_store::now_unix_ms()` reading from it.
const VERSION_ABSOLUTE_TTL: u8 = 3;
// One-byte record tags. `OP_EOF` terminates the snapshot; the others select
// which value layout follows the TTL and key of a record.
const OP_EOF: u8 = 0;
const OP_STR: u8 = 1;
const OP_HASH: u8 = 2;
const OP_LIST: u8 = 3;
const OP_SET: u8 = 4;
const OP_ZSET: u8 = 5;
const OP_STREAM: u8 = 6;
// Capacity (1 MiB) of the `BufWriter` that wraps the snapshot sink.
pub(crate) const SNAPSHOT_BUF_CAP: usize = 1 << 20;
/// Anything whose entries can be enumerated for serialization into a snapshot.
pub trait SnapshotSource {
/// Calls `f` once per entry with the key bytes, the value, and an optional TTL.
///
/// The snapshot writer adds the TTL to the current `kevy_store::now_unix_ms()`
/// reading to obtain a deadline, so implementations are expected to report
/// the remaining time-to-live rather than an absolute timestamp.
fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>));
}
/// Lets a `Store` be snapshotted directly.
impl SnapshotSource for Store {
// Forwards the callback unchanged to `Store::snapshot_each`.
fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>)) {
self.snapshot_each(f);
}
}
/// Lets a `kevy_store::SnapshotView` be snapshotted through the same writer
/// as a `Store`.
impl SnapshotSource for kevy_store::SnapshotView {
// Forwards the callback unchanged to `SnapshotView::each`.
fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>)) {
self.each(f);
}
}
/// Serializes `src` to `path`.
///
/// The snapshot is first written and synced to a sibling temporary file (see
/// `write_snapshot_tmp`), which is then renamed onto `path`, so `path` is only
/// replaced once the complete snapshot exists on disk.
///
/// # Errors
/// Returns any I/O error raised while writing the temporary file or while
/// renaming it.
pub fn save_snapshot<S: SnapshotSource>(src: &S, path: &Path) -> io::Result<()> {
    write_snapshot_tmp(src, path).and_then(|staged| std::fs::rename(&staged, path))
}
/// Streams a complete snapshot of `src` into `sink`.
///
/// Output layout: `MAGIC`, one `VERSION` byte, one record per entry, then a
/// single `OP_EOF` byte. Each entry's TTL is converted to a deadline by adding
/// it (saturating) to the `kevy_store::now_unix_ms()` reading taken once before
/// iteration starts. Writes go through a `SNAPSHOT_BUF_CAP`-byte buffer that is
/// flushed before returning successfully.
///
/// # Errors
/// Returns the first I/O error encountered. The source callback cannot be
/// interrupted, so after a failure the remaining entries are still visited but
/// nothing more is written for them.
pub fn write_snapshot_to<S: SnapshotSource, W: Write>(src: &S, sink: &mut W) -> io::Result<()> {
    let mut w = BufWriter::with_capacity(SNAPSHOT_BUF_CAP, sink);
    w.write_all(MAGIC)?;
    w.write_all(&[VERSION])?;

    let now = kevy_store::now_unix_ms();
    let mut first_err: Option<io::Error> = None;
    src.for_each_entry(|key, value, ttl| {
        // Once a write has failed, skip every later entry.
        if first_err.is_some() {
            return;
        }
        let deadline = ttl.map(|ms| now.saturating_add(ms));
        first_err = write_entry(&mut w, key, value, deadline).err();
    });

    match first_err {
        Some(e) => Err(e),
        None => {
            w.write_all(&[OP_EOF])?;
            w.flush()
        }
    }
}
/// Writes a snapshot of `src` to the temporary sibling of `path` (the same
/// path with `.tmp` appended) and returns that temporary path.
///
/// The file is synced with `sync_all` before returning, so the caller can
/// rename it into place afterwards.
///
/// # Errors
/// Returns any I/O error from creating, writing, or syncing the file. If the
/// failure happens after the file was created, the partial temporary file is
/// removed on a best-effort basis so a failed save does not leave a truncated
/// snapshot on disk.
pub fn write_snapshot_tmp<S: SnapshotSource>(
    src: &S,
    path: &Path,
) -> io::Result<std::path::PathBuf> {
    let tmp = tmp_path(path);
    let mut file = File::create(&tmp)?;
    let written = write_snapshot_to(src, &mut file).and_then(|()| file.sync_all());
    // Close the handle before any removal attempt; some platforms refuse to
    // delete a file that is still open.
    drop(file);
    if let Err(e) = written {
        // Cleanup failure is deliberately ignored: the original error is the
        // one the caller needs to see.
        let _ = std::fs::remove_file(&tmp);
        return Err(e);
    }
    Ok(tmp)
}
/// Opens the snapshot file at `path` and loads its entries into `store`.
///
/// The file is read through a `BufReader`; decoding is delegated to
/// `load_snapshot_from`.
///
/// # Errors
/// Returns the error from opening the file, or any error reported by
/// `load_snapshot_from`.
pub fn load_snapshot(store: &mut Store, path: &Path) -> io::Result<()> {
    let file = File::open(path)?;
    load_snapshot_from(store, BufReader::new(file))
}
pub fn load_snapshot_from<R: Read>(store: &mut Store, mut r: R) -> io::Result<()> {
let mut magic = [0u8; 8];
r.read_exact(&mut magic)?;
if &magic != MAGIC {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"kevy snapshot: bad magic",
));
}
let version = read_u8(&mut r)?;
if !(VERSION_RELATIVE_TTL..=VERSION).contains(&version) {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"kevy snapshot: bad version",
));
}
let absolute_ttl = version >= VERSION_ABSOLUTE_TTL;
let now = kevy_store::now_unix_ms();
loop {
let op = read_u8(&mut r)?;
if op == OP_EOF {
return Ok(());
}
let raw_ttl = read_ttl(&mut r)?;
let ttl = if absolute_ttl {
raw_ttl.map(|deadline| deadline.saturating_sub(now))
} else {
raw_ttl
};
let key = read_bytes(&mut r)?;
match op {
OP_STR => {
let val = read_bytes(&mut r)?;
store.load_str(key, val, ttl);
}
OP_HASH => {
let n = read_u32(&mut r)? as usize;
let mut fields = Vec::with_capacity(n);
for _ in 0..n {
let f = read_bytes(&mut r)?;
let v = read_bytes(&mut r)?;
fields.push((f, v));
}
store.load_hash(key, fields, ttl);
}
OP_LIST => {
let n = read_u32(&mut r)? as usize;
let mut items = Vec::with_capacity(n);
for _ in 0..n {
items.push(read_bytes(&mut r)?);
}
store.load_list(key, items, ttl);
}
OP_SET => {
let n = read_u32(&mut r)? as usize;
let mut members = Vec::with_capacity(n);
for _ in 0..n {
members.push(read_bytes(&mut r)?);
}
store.load_set(key, members, ttl);
}
OP_ZSET => {
let n = read_u32(&mut r)? as usize;
let mut pairs = Vec::with_capacity(n);
for _ in 0..n {
let m = read_bytes(&mut r)?;
let score = f64::from_bits(read_u64(&mut r)?);
pairs.push((m, score));
}
store.load_zset(key, pairs, ttl);
}
OP_STREAM => {
let last_ms = read_u64(&mut r)?;
let last_seq = read_u64(&mut r)?;
let mxd_ms = read_u64(&mut r)?;
let mxd_seq = read_u64(&mut r)?;
let entries_added = read_u64(&mut r)?;
let n = read_u32(&mut r)? as usize;
let mut entries = Vec::with_capacity(n);
for _ in 0..n {
let ms = read_u64(&mut r)?;
let seq = read_u64(&mut r)?;
let nf = read_u32(&mut r)? as usize;
let mut fv = Vec::with_capacity(nf);
for _ in 0..nf {
let f = read_bytes(&mut r)?;
let v = read_bytes(&mut r)?;
fv.push((f, v));
}
entries.push((ms, seq, fv));
}
let groups = if version >= VERSION {
read_stream_groups(&mut r)?
} else {
Vec::new()
};
store.load_stream(
key,
entries,
(last_ms, last_seq),
(mxd_ms, mxd_seq),
entries_added,
groups,
ttl,
);
}
other => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("kevy snapshot: unknown opcode {other}"),
));
}
}
}
}
/// Serializes one snapshot record: opcode byte, TTL field, length-prefixed
/// key, then the value payload.
///
/// Every `Value` variant maps to one of the `OP_*` tags; the compact
/// `Small*Inline` variants share the tag of their full-size counterpart.
/// String-like values (`Str`, `Int`, `ArcBulk`) are all written as a
/// length-prefixed byte string, with integers rendered as decimal text.
///
/// `ttl` is written exactly as given; the caller decides whether it is a
/// deadline or a duration.
///
/// # Errors
/// Returns the first I/O error reported by `w`.
fn write_entry<W: Write>(w: &mut W, key: &[u8], value: &Value, ttl: Option<u64>) -> io::Result<()> {
    let opcode: u8 = match value {
        Value::Str(_) | Value::Int(_) | Value::ArcBulk(_) => OP_STR,
        Value::Hash(_) | Value::SmallHashInline(_) => OP_HASH,
        Value::List(_) | Value::SmallListInline(_) => OP_LIST,
        Value::Set(_) | Value::SmallSetInline(_) => OP_SET,
        Value::ZSet(_) | Value::SmallZSetInline(_) => OP_ZSET,
        Value::Stream(_) => OP_STREAM,
    };

    // Record header, common to every value type.
    w.write_all(std::slice::from_ref(&opcode))?;
    write_ttl(w, ttl)?;
    write_bytes(w, key)?;

    // Type-specific payload.
    match value {
        Value::Str(bytes) => write_bytes(w, bytes.as_slice()),
        Value::Int(number) => {
            let digits = number.to_string();
            write_bytes(w, digits.as_bytes())
        }
        Value::ArcBulk(shared) => write_bytes(w, shared.as_ref()),
        Value::Hash(hash) => snapshot_payload::write_hash_payload(w, hash),
        Value::SmallHashInline(hash) => snapshot_payload::write_small_hash_payload(w, hash),
        Value::List(list) => snapshot_payload::write_list_payload(w, list),
        Value::SmallListInline(list) => snapshot_payload::write_small_list_payload(w, list),
        Value::Set(members) => snapshot_payload::write_set_payload(w, members),
        Value::SmallSetInline(members) => snapshot_payload::write_small_set_payload(w, members),
        Value::ZSet(zset) => snapshot_payload::write_zset_payload(w, zset),
        Value::SmallZSetInline(zset) => snapshot_payload::write_small_zset_payload(w, zset),
        Value::Stream(stream) => snapshot_payload::write_stream_payload(w, stream),
    }
}
pub(crate) fn write_stream_groups<W: Write>(w: &mut W, groups: &[kevy_store::LoadedGroup]) -> io::Result<()> {
w.write_all(&(groups.len() as u32).to_le_bytes())?;
for g in groups {
write_bytes(w, &g.name)?;
w.write_all(&g.last_delivered.0.to_le_bytes())?;
w.write_all(&g.last_delivered.1.to_le_bytes())?;
w.write_all(&(g.consumers.len() as u32).to_le_bytes())?;
for (name, last_seen_ms) in &g.consumers {
write_bytes(w, name)?;
w.write_all(&last_seen_ms.to_le_bytes())?;
}
w.write_all(&(g.pel.len() as u32).to_le_bytes())?;
for (ms, seq, consumer, delivery_time_ms, delivery_count) in &g.pel {
w.write_all(&ms.to_le_bytes())?;
w.write_all(&seq.to_le_bytes())?;
write_bytes(w, consumer)?;
w.write_all(&delivery_time_ms.to_le_bytes())?;
w.write_all(&delivery_count.to_le_bytes())?;
}
}
Ok(())
}
/// Decodes the consumer groups of a stream, mirroring the layout produced by
/// `write_stream_groups`: a `u32` group count, then per group its name,
/// `last_delivered` pair, counted consumer list, and counted pending-entry
/// list.
///
/// The three element counts come from the file and are untrusted, so each
/// vector reserves at most `MAX_PREALLOC` slots up front. A corrupt count
/// therefore ends in a read error once the data runs out, not in an oversized
/// allocation.
///
/// # Errors
/// Returns any error from the underlying reader, including `UnexpectedEof` on
/// truncated input.
fn read_stream_groups<R: Read>(r: &mut R) -> io::Result<Vec<kevy_store::LoadedGroup>> {
    const MAX_PREALLOC: usize = 1024;

    let n = read_u32(r)? as usize;
    let mut groups = Vec::with_capacity(n.min(MAX_PREALLOC));
    for _ in 0..n {
        let name = read_bytes(r)?;
        let last_delivered = (read_u64(r)?, read_u64(r)?);

        let nc = read_u32(r)? as usize;
        let mut consumers = Vec::with_capacity(nc.min(MAX_PREALLOC));
        for _ in 0..nc {
            let cname = read_bytes(r)?;
            consumers.push((cname, read_u64(r)?));
        }

        let np = read_u32(r)? as usize;
        let mut pel = Vec::with_capacity(np.min(MAX_PREALLOC));
        for _ in 0..np {
            let ms = read_u64(r)?;
            let seq = read_u64(r)?;
            let consumer = read_bytes(r)?;
            let delivery_time_ms = read_u64(r)?;
            let delivery_count = read_u32(r)?;
            pel.push((ms, seq, consumer, delivery_time_ms, delivery_count));
        }

        groups.push(kevy_store::LoadedGroup { name, last_delivered, consumers, pel });
    }
    Ok(groups)
}
/// Writes the optional TTL field of a record.
///
/// `None` is encoded as a single `0` byte; `Some(ms)` as a `1` byte followed
/// by `ms` as a little-endian `u64`.
fn write_ttl<W: Write>(w: &mut W, ttl: Option<u64>) -> io::Result<()> {
    let Some(ms) = ttl else {
        return w.write_all(&[0u8]);
    };
    w.write_all(&[1u8])?;
    w.write_all(&ms.to_le_bytes())
}
/// Reads the optional TTL field of a record.
///
/// A flag byte of `1` means a little-endian `u64` follows and is returned as
/// `Some`; any other flag value is treated as "no TTL" and nothing further is
/// consumed.
fn read_ttl<R: Read>(r: &mut R) -> io::Result<Option<u64>> {
    let has_ttl = read_u8(r)? == 1;
    if !has_ttl {
        return Ok(None);
    }
    read_u64(r).map(Some)
}
/// Returns `path` with the literal suffix `.tmp` appended to its final
/// component (the existing extension is kept, not replaced).
fn tmp_path(path: &Path) -> std::path::PathBuf {
    let mut suffixed = path.as_os_str().to_os_string();
    suffixed.push(".tmp");
    std::path::PathBuf::from(suffixed)
}
/// Writes `b` as a length-prefixed byte string: a little-endian `u32` length
/// followed by the raw bytes.
///
/// # Errors
/// Returns `InvalidInput` if `b` is longer than `u32::MAX` bytes. The length
/// used to be truncated with `as u32`, which wrote a prefix that did not match
/// the payload and made the rest of the snapshot undecodable. Otherwise
/// returns any I/O error reported by `w`. Nothing is written when the length
/// check fails.
pub(crate) fn write_bytes<W: Write>(w: &mut W, b: &[u8]) -> io::Result<()> {
    let len = u32::try_from(b.len()).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "kevy snapshot: byte string exceeds u32 length prefix",
        )
    })?;
    w.write_all(&len.to_le_bytes())?;
    w.write_all(b)
}
/// Reads a length-prefixed byte string: a little-endian `u32` length followed
/// by that many bytes.
///
/// The length prefix comes from the file and is untrusted. The payload is
/// therefore read through a `take` adapter into a vector whose initial
/// reservation is capped at `MAX_PREALLOC`, instead of allocating and zeroing
/// the full claimed length up front. A corrupt prefix of up to 4 GiB then
/// costs only as much memory as the data that is actually present.
///
/// # Errors
/// Returns `UnexpectedEof` if the reader ends before the announced number of
/// bytes has been read, or any other error from the underlying reader.
fn read_bytes<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    const MAX_PREALLOC: usize = 1 << 20;

    let len = read_u32(r)?;
    let expected = len as usize;
    let mut buf = Vec::with_capacity(expected.min(MAX_PREALLOC));
    let got = r.by_ref().take(u64::from(len)).read_to_end(&mut buf)?;
    if got != expected {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "kevy snapshot: truncated byte string",
        ));
    }
    Ok(buf)
}
/// Reads exactly one byte from `r`.
///
/// # Errors
/// Propagates the reader's error, which is `UnexpectedEof` when no byte is
/// available.
fn read_u8<R: Read>(r: &mut R) -> io::Result<u8> {
    let mut byte = 0u8;
    r.read_exact(std::slice::from_mut(&mut byte))?;
    Ok(byte)
}
/// Reads four bytes from `r` and decodes them as a little-endian `u32`.
///
/// # Errors
/// Propagates the reader's error, which is `UnexpectedEof` when fewer than
/// four bytes remain.
fn read_u32<R: Read>(r: &mut R) -> io::Result<u32> {
    let mut raw = [0u8; 4];
    r.read_exact(&mut raw).map(|()| u32::from_le_bytes(raw))
}
/// Reads eight bytes from `r` and decodes them as a little-endian `u64`.
///
/// # Errors
/// Propagates the reader's error, which is `UnexpectedEof` when fewer than
/// eight bytes remain.
fn read_u64<R: Read>(r: &mut R) -> io::Result<u64> {
    let mut raw = [0u8; 8];
    r.read_exact(&mut raw).map(|()| u64::from_le_bytes(raw))
}
#[cfg(test)]
mod tests;
#[cfg(test)]
mod tests_aof;
#[cfg(test)]
mod tests_rewrite;