use crate::SNAPSHOT_BUF_CAP;
use kevy_resp::{Argv, ArgvView};
use kevy_store::{StreamData, StreamId, Value};
use std::fs::File;
use std::io::{self, BufWriter, Write};
use std::path::Path;
/// Writes every entry of `src` to a freshly created file at `path` as a
/// sequence of RESP commands, preceded by the AOF magic header.
///
/// Returns `(keys, bytes)`: the number of entries written and the file length
/// taken from the file's metadata after the buffer has been flushed. The
/// length is reported as `0` when the metadata cannot be read.
///
/// # Errors
/// Returns the first I/O error hit while creating, writing, flushing or
/// syncing the file. After the first failed entry, the remaining entries are
/// visited but nothing more is written.
pub fn dump_aof<S: crate::SnapshotSource>(path: &Path, src: &S) -> io::Result<(u64, u64)> {
    let mut out = BufWriter::with_capacity(SNAPSHOT_BUF_CAP, File::create(path)?);
    out.write_all(crate::aof::AOF_MAGIC)?;

    let mut written = 0u64;
    // The visitor callback cannot propagate errors itself, so the first
    // failure is latched here and returned once the walk is over.
    let mut first_failure: Option<io::Error> = None;
    src.for_each_entry(|key, value, ttl_ms| {
        if first_failure.is_none() {
            match write_value_as_commands(&mut out, key, value, ttl_ms) {
                Ok(()) => written += 1,
                Err(e) => first_failure = Some(e),
            }
        }
    });
    if let Some(e) = first_failure {
        return Err(e);
    }

    out.flush()?;
    let file = out
        .into_inner()
        .map_err(|e| io::Error::other(e.to_string()))?;
    let size = match file.metadata() {
        Ok(meta) => meta.len(),
        Err(_) => 0,
    };
    file.sync_all()?;
    Ok((written, size))
}
/// Serializes every entry of `src` into an in-memory buffer that starts with
/// the AOF magic header.
///
/// Returns the buffer together with the number of entries visited.
pub(crate) fn dump_store_to_buf<S: crate::SnapshotSource>(src: &S) -> (Vec<u8>, u64) {
    let mut out = Vec::with_capacity(crate::SNAPSHOT_BUF_CAP);
    out.extend_from_slice(crate::aof::AOF_MAGIC);

    let mut visited = 0u64;
    src.for_each_entry(|key, value, ttl_ms| {
        // The result is deliberately ignored; every visited entry is counted.
        let _ = write_value_as_commands(&mut out, key, value, ttl_ms);
        visited += 1;
    });

    (out, visited)
}
/// Writes the command(s) that recreate `value` under `key` to `w`.
///
/// Each value kind maps to one write command (`SET`, `HSET`, `RPUSH`, `SADD`,
/// `ZADD`); streams are delegated to `write_stream_as_commands`. When `ttl_ms`
/// is `Some`, a trailing `PEXPIREAT` is written whose deadline is
/// `kevy_store::now_unix_ms()` plus that many milliseconds (saturating).
///
/// # Errors
/// Returns the first I/O error reported by `w`.
fn write_value_as_commands<W: Write>(
    w: &mut W,
    key: &[u8],
    value: &Value,
    ttl_ms: Option<u64>,
) -> io::Result<()> {
    /// Starts an argument vector with the command name and the key, reserving
    /// room for `extra` further arguments.
    fn command_head(name: &[u8], key: &[u8], extra: usize) -> Vec<Vec<u8>> {
        let mut argv: Vec<Vec<u8>> = Vec::with_capacity(2 + extra);
        argv.push(name.to_vec());
        argv.push(key.to_vec());
        argv
    }

    match value {
        Value::Str(s) => {
            let mut argv = command_head(b"SET", key, 1);
            argv.push(s.to_vec());
            write_multibulk(w, &Argv::from(argv))?;
        }
        Value::Hash(h) => {
            let mut argv = command_head(b"HSET", key, h.len() * 2);
            for (f, v) in h.iter() {
                argv.push(f.to_vec());
                argv.push(v.clone());
            }
            write_multibulk(w, &Argv::from(argv))?;
        }
        Value::List(l) => {
            let mut argv = command_head(b"RPUSH", key, l.len());
            for v in l.iter() {
                argv.push(v.clone());
            }
            write_multibulk(w, &Argv::from(argv))?;
        }
        Value::Set(s) => {
            let mut argv = command_head(b"SADD", key, s.len());
            for m in s.iter() {
                argv.push(m.to_vec());
            }
            write_multibulk(w, &Argv::from(argv))?;
        }
        Value::ZSet(z) => {
            // Two arguments (score, member) per element.
            let mut argv = command_head(b"ZADD", key, z.ordered().count() * 2);
            for (m, sc) in z.ordered() {
                argv.push(fmt_zset_score(sc));
                argv.push(m.to_vec());
            }
            write_multibulk(w, &Argv::from(argv))?;
        }
        Value::Stream(s) => write_stream_as_commands(w, key, s)?,
    }

    // Keys without a TTL need nothing further.
    let Some(ms) = ttl_ms else {
        return Ok(());
    };
    let deadline = kevy_store::now_unix_ms().saturating_add(ms);
    let mut expire = command_head(b"PEXPIREAT", key, 1);
    expire.push(deadline.to_string().into_bytes());
    write_multibulk(w, &Argv::from(expire))
}
/// Writes the commands that recreate the stream `s` stored at `key` to `w`.
///
/// Emitted in order:
/// 1. one `XADD` per entry, carrying the entry's own id and its field/value pairs;
/// 2. a placeholder `XADD key MAXLEN 0 <last-id> x x` when the stream holds no
///    entries but its last id is not `StreamId::MIN`;
/// 3. an `XSETID` with `ENTRIESADDED` / `MAXDELETEDID` when the stream's recorded
///    `(last_id, entries_added, max_deleted_id)` differ from the `natural` tuple;
/// 4. the consumer-group commands, via `write_stream_group_commands`.
///
/// # Errors
/// Returns the first I/O error reported by `w`.
fn write_stream_as_commands<W: Write>(w: &mut W, key: &[u8], s: &StreamData) -> io::Result<()> {
for (id, fv) in s.iter_entries() {
// XADD <key> <id> followed by two arguments per field/value pair.
let mut argv: Vec<Vec<u8>> = Vec::with_capacity(3 + fv.len() * 2);
argv.push(b"XADD".to_vec());
argv.push(key.to_vec());
argv.push(id.encode());
for (f, v) in fv {
argv.push(f.to_vec());
argv.push(v.to_vec());
}
write_multibulk(w, &Argv::from(argv))?;
}
let (len, last, mxd, added) =
(s.length(), s.last_id(), s.max_deleted_id(), s.entries_added());
// Empty stream that nevertheless has a last id: write an XADD with MAXLEN 0 and a
// dummy `x x` field at that id.
// NOTE(review): presumably this makes replay create the key and advance its last id
// while leaving it empty — confirm against the XADD/MAXLEN implementation.
if len == 0 && last != StreamId::MIN {
let argv = vec![
b"XADD".to_vec(), key.to_vec(), b"MAXLEN".to_vec(), b"0".to_vec(),
last.encode(), b"x".to_vec(), b"x".to_vec(),
];
write_multibulk(w, &Argv::from(argv))?;
}
// `natural` is compared below against the recorded (last id, entries added,
// max deleted id).
// NOTE(review): presumably it is the state a replay of the XADDs above would
// produce on its own — confirm.
let natural = if len > 0 {
(s.last_entry().map_or(StreamId::MIN, |(id, _)| id), len, StreamId::MIN)
} else {
(last, u64::from(last != StreamId::MIN), last)
};
// XSETID is written only when the recorded values differ from `natural`.
if natural != (last, added, mxd) {
let argv = vec![
b"XSETID".to_vec(), key.to_vec(), last.encode(),
b"ENTRIESADDED".to_vec(), added.to_string().into_bytes(),
b"MAXDELETEDID".to_vec(), mxd.encode(),
];
write_multibulk(w, &Argv::from(argv))?;
}
write_stream_group_commands(w, key, s)
}
/// Writes the commands that recreate every consumer group of the stream `s`
/// stored at `key`.
///
/// For each group returned by `s.export_groups()` this writes, in order:
/// `XGROUP CREATE ... MKSTREAM` at the group's last-delivered id, one
/// `XGROUP CREATECONSUMER` per consumer, and one
/// `XCLAIM ... TIME .. RETRYCOUNT .. FORCE JUSTID` per pending entry whose id
/// is still present in the stream.
///
/// # Errors
/// Returns the first I/O error reported by `w`.
fn write_stream_group_commands<W: Write>(
    w: &mut W,
    key: &[u8],
    s: &StreamData,
) -> io::Result<()> {
    for group in s.export_groups() {
        let cursor = StreamId {
            ms: group.last_delivered.0,
            seq: group.last_delivered.1,
        };
        let create = vec![
            b"XGROUP".to_vec(),
            b"CREATE".to_vec(),
            key.to_vec(),
            group.name.clone(),
            cursor.encode(),
            b"MKSTREAM".to_vec(),
        ];
        write_multibulk(w, &Argv::from(create))?;

        // The second tuple element (last-seen time) is not written out.
        for (consumer, _last_seen_ms) in &group.consumers {
            let register = vec![
                b"XGROUP".to_vec(),
                b"CREATECONSUMER".to_vec(),
                key.to_vec(),
                group.name.clone(),
                consumer.clone(),
            ];
            write_multibulk(w, &Argv::from(register))?;
        }

        for (ms, seq, consumer, delivery_time_ms, delivery_count) in &group.pel {
            let id = StreamId { ms: *ms, seq: *seq };
            // Pending entries whose id is no longer in the stream are skipped.
            if s.contains_entry(id) {
                let claim = vec![
                    b"XCLAIM".to_vec(),
                    key.to_vec(),
                    group.name.clone(),
                    consumer.clone(),
                    b"0".to_vec(),
                    id.encode(),
                    b"TIME".to_vec(),
                    delivery_time_ms.to_string().into_bytes(),
                    b"RETRYCOUNT".to_vec(),
                    delivery_count.to_string().into_bytes(),
                    b"FORCE".to_vec(),
                    b"JUSTID".to_vec(),
                ];
                write_multibulk(w, &Argv::from(claim))?;
            }
        }
    }
    Ok(())
}
/// Renders a sorted-set score as the textual score argument of a `ZADD`.
///
/// Finite integral scores with magnitude below `1e17` are printed as plain
/// integers (`3`, `-2`). Every other value uses Rust's `Display` form for
/// `f64`, which is the shortest decimal string that parses back to exactly
/// the same value. Infinities print as `inf` / `-inf`.
///
/// A fixed `{:.17}` precision must not be used here: it means 17 digits
/// *after the decimal point*, not 17 significant digits, so small scores would
/// be truncated (for example `1e-20` would be written as
/// `0.00000000000000000` and replayed as `0`).
fn fmt_zset_score(s: f64) -> Vec<u8> {
    if s.is_finite() && s == s.trunc() && s.abs() < 1e17 {
        // Exact: the value is integral and well inside the i64 range.
        (s as i64).to_string().into_bytes()
    } else {
        s.to_string().into_bytes()
    }
}
/// Computes how many bytes `args` occupies when encoded as a RESP multibulk
/// array: the `*<count>\r\n` header plus, for each argument, `$<len>\r\n`,
/// the payload and a trailing `\r\n`.
pub(crate) fn estimate_multibulk_bytes<A: ArgvView + ?Sized>(args: &A) -> u64 {
    let count = args.len();
    // '*' + "\r\n" account for the 3 fixed bytes of the array header.
    let header = 3 + u64::from(decimal_digits(count as u64));
    (0..count).fold(header, |total, i| {
        let payload = args[i].len() as u64;
        // '$' + "\r\n" (3 bytes), the length digits, the payload, then "\r\n".
        total + (3 + u64::from(decimal_digits(payload)) + payload + 2)
    })
}
/// Returns the number of decimal digits needed to print `x`; zero counts as
/// one digit.
#[inline]
fn decimal_digits(x: u64) -> u32 {
    // `checked_ilog10` is `None` only for 0, which still prints as one digit.
    x.checked_ilog10().map_or(1, |log| log + 1)
}
/// Writes `args` to `w` as a RESP multibulk array: `*<count>\r\n` followed by
/// `$<len>\r\n<bytes>\r\n` for each argument.
///
/// # Errors
/// Returns the first I/O error reported by `w`.
pub(crate) fn write_multibulk<W: Write, A: ArgvView + ?Sized>(
    w: &mut W,
    args: &A,
) -> io::Result<()> {
    let argc = args.len();
    write!(w, "*{}\r\n", argc)?;
    (0..argc).try_for_each(|idx| {
        let arg = &args[idx];
        write!(w, "${}\r\n", arg.len())?;
        w.write_all(arg)?;
        w.write_all(b"\r\n")
    })
}