1#![forbid(unsafe_code)]
38
39mod aof;
40pub mod layout;
41mod replay;
42pub mod reshard;
43mod rewrite_fmt;
44mod shards_meta;
45
46pub use aof::{Aof, Fsync, RewritePlan, RewriteStats, write_aof_base};
47pub use replay::replay_aof;
48pub use shards_meta::{Routing, ShardsMeta, read_shards_meta, write_shards_meta};
49pub use kevy_resp::{Argv, ArgvView};
50pub use rewrite_fmt::dump_aof;
51pub(crate) use rewrite_fmt::{dump_store_to_buf, estimate_multibulk_bytes, write_multibulk};
52use kevy_store::Store;
53use kevy_store::Value;
54use std::fs::File;
56use std::io::{self, BufReader, BufWriter, Read, Write};
57use std::path::Path;
58
/// Eight-byte signature written at the very start of a snapshot and checked
/// first by the loader; a mismatch is reported as "bad magic".
const MAGIC: &[u8; 8] = b"KEVYSNAP";
/// Format version stamped into every snapshot this module writes. It is also
/// the newest version the loader accepts, and the first version whose stream
/// records carry consumer-group data.
const VERSION: u8 = 4;
/// Oldest version the loader accepts. Versions below `VERSION_ABSOLUTE_TTL`
/// have their TTL field handed to the store unchanged.
const VERSION_RELATIVE_TTL: u8 = 2;
/// First version whose TTL field is an absolute deadline on the
/// `kevy_store::now_unix_ms` clock; the loader subtracts the current time
/// (saturating at zero) before handing it to the store.
const VERSION_ABSOLUTE_TTL: u8 = 3;

/// Opcode that terminates the record stream.
const OP_EOF: u8 = 0;
/// Record opcode for `Value::Str`.
const OP_STR: u8 = 1;
/// Record opcode for `Value::Hash`.
const OP_HASH: u8 = 2;
/// Record opcode for `Value::List`.
const OP_LIST: u8 = 3;
/// Record opcode for `Value::Set`.
const OP_SET: u8 = 4;
/// Record opcode for `Value::ZSet`.
const OP_ZSET: u8 = 5;
/// Record opcode for `Value::Stream`.
const OP_STREAM: u8 = 6;

/// Capacity in bytes (1 MiB) of the `BufWriter` that `write_snapshot_to`
/// places in front of its sink. Visible crate-wide.
pub(crate) const SNAPSHOT_BUF_CAP: usize = 1 << 20;
87
88pub trait SnapshotSource {
93 fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>));
95}
96
97impl SnapshotSource for Store {
98 fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>)) {
99 self.snapshot_each(f);
100 }
101}
102
103impl SnapshotSource for kevy_store::SnapshotView {
104 fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>)) {
105 self.each(f);
106 }
107}
108
109pub fn save_snapshot<S: SnapshotSource>(src: &S, path: &Path) -> io::Result<()> {
113 let tmp = write_snapshot_tmp(src, path)?;
114 std::fs::rename(&tmp, path)
115}
116
117pub fn write_snapshot_to<S: SnapshotSource, W: Write>(src: &S, sink: &mut W) -> io::Result<()> {
129 let mut w = BufWriter::with_capacity(SNAPSHOT_BUF_CAP, sink);
130 w.write_all(MAGIC)?;
131 w.write_all(&[VERSION])?;
132 let now = kevy_store::now_unix_ms();
135 let mut err: Option<io::Error> = None;
137 src.for_each_entry(|key, value, ttl| {
138 let deadline = ttl.map(|ms| now.saturating_add(ms));
139 if err.is_none()
140 && let Err(e) = write_entry(&mut w, key, value, deadline)
141 {
142 err = Some(e);
143 }
144 });
145 if let Some(e) = err {
146 return Err(e);
147 }
148 w.write_all(&[OP_EOF])?;
149 w.flush()?;
150 Ok(())
151}
152
153pub fn write_snapshot_tmp<S: SnapshotSource>(src: &S, path: &Path) -> io::Result<std::path::PathBuf> {
160 let tmp = tmp_path(path);
161 {
162 let mut file = File::create(&tmp)?;
163 write_snapshot_to(src, &mut file)?;
164 file.sync_all()?; }
166 Ok(tmp)
167}
168
169pub fn load_snapshot(store: &mut Store, path: &Path) -> io::Result<()> {
172 let r = BufReader::new(File::open(path)?);
173 load_snapshot_from(store, r)
174}
175
176pub fn load_snapshot_from<R: Read>(store: &mut Store, mut r: R) -> io::Result<()> {
183 let mut magic = [0u8; 8];
184 r.read_exact(&mut magic)?;
185 if &magic != MAGIC {
186 return Err(io::Error::new(
187 io::ErrorKind::InvalidData,
188 "kevy snapshot: bad magic",
189 ));
190 }
191 let version = read_u8(&mut r)?;
192 if !(VERSION_RELATIVE_TTL..=VERSION).contains(&version) {
193 return Err(io::Error::new(
194 io::ErrorKind::InvalidData,
195 "kevy snapshot: bad version",
196 ));
197 }
198 let absolute_ttl = version >= VERSION_ABSOLUTE_TTL;
204 let now = kevy_store::now_unix_ms();
205
206 loop {
207 let op = read_u8(&mut r)?;
208 if op == OP_EOF {
209 return Ok(());
210 }
211 let raw_ttl = read_ttl(&mut r)?;
212 let ttl = if absolute_ttl {
213 raw_ttl.map(|deadline| deadline.saturating_sub(now))
214 } else {
215 raw_ttl
216 };
217 let key = read_bytes(&mut r)?;
218 match op {
219 OP_STR => {
220 let val = read_bytes(&mut r)?;
221 store.load_str(key, val, ttl);
222 }
223 OP_HASH => {
224 let n = read_u32(&mut r)? as usize;
225 let mut fields = Vec::with_capacity(n);
226 for _ in 0..n {
227 let f = read_bytes(&mut r)?;
228 let v = read_bytes(&mut r)?;
229 fields.push((f, v));
230 }
231 store.load_hash(key, fields, ttl);
232 }
233 OP_LIST => {
234 let n = read_u32(&mut r)? as usize;
235 let mut items = Vec::with_capacity(n);
236 for _ in 0..n {
237 items.push(read_bytes(&mut r)?);
238 }
239 store.load_list(key, items, ttl);
240 }
241 OP_SET => {
242 let n = read_u32(&mut r)? as usize;
243 let mut members = Vec::with_capacity(n);
244 for _ in 0..n {
245 members.push(read_bytes(&mut r)?);
246 }
247 store.load_set(key, members, ttl);
248 }
249 OP_ZSET => {
250 let n = read_u32(&mut r)? as usize;
251 let mut pairs = Vec::with_capacity(n);
252 for _ in 0..n {
253 let m = read_bytes(&mut r)?;
254 let score = f64::from_bits(read_u64(&mut r)?);
255 pairs.push((m, score));
256 }
257 store.load_zset(key, pairs, ttl);
258 }
259 OP_STREAM => {
260 let last_ms = read_u64(&mut r)?;
261 let last_seq = read_u64(&mut r)?;
262 let mxd_ms = read_u64(&mut r)?;
263 let mxd_seq = read_u64(&mut r)?;
264 let entries_added = read_u64(&mut r)?;
265 let n = read_u32(&mut r)? as usize;
266 let mut entries = Vec::with_capacity(n);
267 for _ in 0..n {
268 let ms = read_u64(&mut r)?;
269 let seq = read_u64(&mut r)?;
270 let nf = read_u32(&mut r)? as usize;
271 let mut fv = Vec::with_capacity(nf);
272 for _ in 0..nf {
273 let f = read_bytes(&mut r)?;
274 let v = read_bytes(&mut r)?;
275 fv.push((f, v));
276 }
277 entries.push((ms, seq, fv));
278 }
279 let groups = if version >= VERSION {
282 read_stream_groups(&mut r)?
283 } else {
284 Vec::new()
285 };
286 store.load_stream(
287 key,
288 entries,
289 (last_ms, last_seq),
290 (mxd_ms, mxd_seq),
291 entries_added,
292 groups,
293 ttl,
294 );
295 }
296 other => {
297 return Err(io::Error::new(
298 io::ErrorKind::InvalidData,
299 format!("kevy snapshot: unknown opcode {other}"),
300 ));
301 }
302 }
303 }
304}
305
306fn write_entry<W: Write>(w: &mut W, key: &[u8], value: &Value, ttl: Option<u64>) -> io::Result<()> {
308 let op = match value {
309 Value::Str(_) => OP_STR,
310 Value::Hash(_) => OP_HASH,
311 Value::List(_) => OP_LIST,
312 Value::Set(_) => OP_SET,
313 Value::ZSet(_) => OP_ZSET,
314 Value::Stream(_) => OP_STREAM,
315 };
316 w.write_all(&[op])?;
317 write_ttl(w, ttl)?;
318 write_bytes(w, key)?;
319 match value {
320 Value::Str(v) => write_bytes(w, v.as_slice()),
321 Value::Hash(h) => write_hash_payload(w, h),
322 Value::List(l) => write_list_payload(w, l),
323 Value::Set(set) => write_set_payload(w, set),
324 Value::ZSet(z) => write_zset_payload(w, z),
325 Value::Stream(s) => write_stream_payload(w, s),
326 }
327}
328
329fn write_hash_payload<W: Write>(w: &mut W, h: &kevy_store::HashData) -> io::Result<()> {
330 w.write_all(&(h.len() as u32).to_le_bytes())?;
331 for (f, v) in h {
332 write_bytes(w, f.as_slice())?;
333 write_bytes(w, v)?;
334 }
335 Ok(())
336}
337
338fn write_list_payload<W: Write>(w: &mut W, l: &kevy_store::ListData) -> io::Result<()> {
339 w.write_all(&(l.len() as u32).to_le_bytes())?;
340 for item in l {
341 write_bytes(w, item)?;
342 }
343 Ok(())
344}
345
346fn write_set_payload<W: Write>(w: &mut W, set: &kevy_store::SetData) -> io::Result<()> {
347 w.write_all(&(set.len() as u32).to_le_bytes())?;
348 for m in set {
349 write_bytes(w, m.as_slice())?;
350 }
351 Ok(())
352}
353
354fn write_zset_payload<W: Write>(w: &mut W, z: &kevy_store::ZSetData) -> io::Result<()> {
355 let entries: Vec<(&[u8], f64)> = z.ordered().collect();
356 w.write_all(&(entries.len() as u32).to_le_bytes())?;
357 for (m, score) in entries {
358 write_bytes(w, m)?;
359 w.write_all(&score.to_bits().to_le_bytes())?;
360 }
361 Ok(())
362}
363
364fn write_stream_payload<W: Write>(w: &mut W, s: &kevy_store::StreamData) -> io::Result<()> {
365 w.write_all(&s.last_id().ms.to_le_bytes())?;
366 w.write_all(&s.last_id().seq.to_le_bytes())?;
367 w.write_all(&s.max_deleted_id().ms.to_le_bytes())?;
368 w.write_all(&s.max_deleted_id().seq.to_le_bytes())?;
369 w.write_all(&s.entries_added().to_le_bytes())?;
370 w.write_all(&(s.length() as u32).to_le_bytes())?;
371 for (id, fv) in s.iter_entries() {
372 w.write_all(&id.ms.to_le_bytes())?;
373 w.write_all(&id.seq.to_le_bytes())?;
374 w.write_all(&(fv.len() as u32).to_le_bytes())?;
375 for (f, v) in fv {
376 write_bytes(w, f.as_slice())?;
377 write_bytes(w, v.as_slice())?;
378 }
379 }
380 write_stream_groups(w, &s.export_groups())
381}
382
383fn write_stream_groups<W: Write>(w: &mut W, groups: &[kevy_store::LoadedGroup]) -> io::Result<()> {
388 w.write_all(&(groups.len() as u32).to_le_bytes())?;
389 for g in groups {
390 write_bytes(w, &g.name)?;
391 w.write_all(&g.last_delivered.0.to_le_bytes())?;
392 w.write_all(&g.last_delivered.1.to_le_bytes())?;
393 w.write_all(&(g.consumers.len() as u32).to_le_bytes())?;
394 for (name, last_seen_ms) in &g.consumers {
395 write_bytes(w, name)?;
396 w.write_all(&last_seen_ms.to_le_bytes())?;
397 }
398 w.write_all(&(g.pel.len() as u32).to_le_bytes())?;
399 for (ms, seq, consumer, delivery_time_ms, delivery_count) in &g.pel {
400 w.write_all(&ms.to_le_bytes())?;
401 w.write_all(&seq.to_le_bytes())?;
402 write_bytes(w, consumer)?;
403 w.write_all(&delivery_time_ms.to_le_bytes())?;
404 w.write_all(&delivery_count.to_le_bytes())?;
405 }
406 }
407 Ok(())
408}
409
410fn read_stream_groups<R: Read>(r: &mut R) -> io::Result<Vec<kevy_store::LoadedGroup>> {
412 let n = read_u32(r)? as usize;
413 let mut groups = Vec::with_capacity(n);
414 for _ in 0..n {
415 let name = read_bytes(r)?;
416 let last_delivered = (read_u64(r)?, read_u64(r)?);
417 let nc = read_u32(r)? as usize;
418 let mut consumers = Vec::with_capacity(nc);
419 for _ in 0..nc {
420 let cname = read_bytes(r)?;
421 consumers.push((cname, read_u64(r)?));
422 }
423 let np = read_u32(r)? as usize;
424 let mut pel = Vec::with_capacity(np);
425 for _ in 0..np {
426 let ms = read_u64(r)?;
427 let seq = read_u64(r)?;
428 let consumer = read_bytes(r)?;
429 let delivery_time_ms = read_u64(r)?;
430 let delivery_count = read_u32(r)?;
431 pel.push((ms, seq, consumer, delivery_time_ms, delivery_count));
432 }
433 groups.push(kevy_store::LoadedGroup { name, last_delivered, consumers, pel });
434 }
435 Ok(groups)
436}
437
/// Writes the TTL field: flag byte `1` followed by the value as a
/// little-endian `u64` when present, or the single flag byte `0` when absent.
fn write_ttl<W: Write>(w: &mut W, ttl: Option<u64>) -> io::Result<()> {
    if let Some(ms) = ttl {
        w.write_all(&[1u8])?;
        w.write_all(&ms.to_le_bytes())
    } else {
        w.write_all(&[0u8])
    }
}
448
449fn read_ttl<R: Read>(r: &mut R) -> io::Result<Option<u64>> {
450 if read_u8(r)? == 1 {
451 Ok(Some(read_u64(r)?))
452 } else {
453 Ok(None)
454 }
455}
456
/// Returns `path` with `.tmp` appended to its final component. The suffix is
/// added to the whole name; an existing extension is kept, not replaced.
fn tmp_path(path: &Path) -> std::path::PathBuf {
    let mut name = path.as_os_str().to_os_string();
    name.push(".tmp");
    std::path::PathBuf::from(name)
}
462
/// Writes `b` as a length-prefixed byte string: a little-endian `u32` length
/// followed by the raw bytes.
///
/// # Errors
/// Returns `InvalidInput` without writing anything when `b` is longer than
/// `u32::MAX` bytes. A plain cast would wrap the length and produce a prefix
/// that no longer matches the payload, corrupting every record after it.
/// Otherwise propagates errors from the writer.
fn write_bytes<W: Write>(w: &mut W, b: &[u8]) -> io::Result<()> {
    let len = u32::try_from(b.len()).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "kevy snapshot: byte string too long for u32 length prefix",
        )
    })?;
    w.write_all(&len.to_le_bytes())?;
    w.write_all(b)
}
467
468fn read_bytes<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
469 let len = read_u32(r)? as usize;
470 let mut buf = vec![0u8; len];
471 r.read_exact(&mut buf)?;
472 Ok(buf)
473}
474
/// Reads exactly one byte from `r`, failing with `UnexpectedEof` if the input
/// is exhausted.
fn read_u8<R: Read>(r: &mut R) -> io::Result<u8> {
    let mut byte = 0u8;
    r.read_exact(std::slice::from_mut(&mut byte))?;
    Ok(byte)
}
480
/// Reads four bytes from `r` and decodes them as a little-endian `u32`.
/// Fails with `UnexpectedEof` if fewer than four bytes remain.
fn read_u32<R: Read>(r: &mut R) -> io::Result<u32> {
    let mut raw = [0u8; 4];
    r.read_exact(&mut raw).map(|()| u32::from_le_bytes(raw))
}
486
/// Reads eight bytes from `r` and decodes them as a little-endian `u64`.
/// Fails with `UnexpectedEof` if fewer than eight bytes remain.
fn read_u64<R: Read>(r: &mut R) -> io::Result<u64> {
    let mut raw = [0u8; 8];
    r.read_exact(&mut raw).map(|()| u64::from_le_bytes(raw))
}
492
493#[cfg(test)]
494mod tests;
495#[cfg(test)]
496mod tests_aof;
497#[cfg(test)]
498mod tests_rewrite;