1#![forbid(unsafe_code)]
38
39mod aof;
40pub mod layout;
41mod replay;
42pub mod reshard;
43mod rewrite_fmt;
44mod shards_meta;
45mod snapshot_payload;
46
47pub use aof::{Aof, Fsync, RewritePlan, RewriteStats, write_aof_base};
48pub use replay::replay_aof;
49pub use shards_meta::{Routing, ShardsMeta, read_shards_meta, write_shards_meta};
50pub use kevy_resp::{Argv, ArgvView};
51pub use rewrite_fmt::dump_aof;
52pub(crate) use rewrite_fmt::{dump_store_to_buf, estimate_multibulk_bytes, write_multibulk};
53use kevy_store::Store;
54use kevy_store::Value;
55use std::fs::File;
57use std::io::{self, BufReader, BufWriter, Read, Write};
58use std::path::Path;
59
/// File signature: written first by `write_snapshot_to`, checked first by `load_snapshot_from`.
const MAGIC: &[u8; 8] = b"KEVYSNAP";
/// Format version emitted by `write_snapshot_to`. Version 4 snapshots also carry stream
/// consumer-group state after each stream record's entries.
const VERSION: u8 = 4;
/// Oldest version the loader accepts. In this version TTLs are stored as remaining
/// milliseconds and handed to the store unchanged.
const VERSION_RELATIVE_TTL: u8 = 2;
/// First version whose TTLs are absolute Unix-ms deadlines. The loader converts them back
/// to remaining time with `deadline.saturating_sub(now)`.
const VERSION_ABSOLUTE_TTL: u8 = 3;

/// Record tag that terminates the entry stream.
const OP_EOF: u8 = 0;
// Per-type record tags. Compact/inline value representations share the tag of their
// full-size counterpart; `Int` and `ArcBulk` are written as plain strings (see `write_entry`).
const OP_STR: u8 = 1;
const OP_HASH: u8 = 2;
const OP_LIST: u8 = 3;
const OP_SET: u8 = 4;
const OP_ZSET: u8 = 5;
const OP_STREAM: u8 = 6;

/// Capacity (1 MiB) of the `BufWriter` that `write_snapshot_to` wraps around its sink.
pub(crate) const SNAPSHOT_BUF_CAP: usize = 1 << 20;
88
/// A source of keyspace entries that can be serialized into a snapshot.
///
/// Implemented below for [`Store`] and `kevy_store::SnapshotView`; `write_snapshot_to`
/// and the functions built on it consume it.
pub trait SnapshotSource {
    /// Invokes `f(key, value, ttl)` once per entry to be written.
    ///
    /// `write_snapshot_to` turns `ttl` into an absolute deadline with
    /// `now.saturating_add(ms)`. It is therefore presumably the remaining time-to-live in
    /// milliseconds, with `None` meaning no expiry. NOTE(review): confirm against the
    /// implementors.
    fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>));
}
97
/// Snapshots a live [`Store`] by delegating entry enumeration to `Store::snapshot_each`.
impl SnapshotSource for Store {
    fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>)) {
        self.snapshot_each(f);
    }
}
103
/// Snapshots a `kevy_store::SnapshotView` by delegating entry enumeration to its `each`
/// method. NOTE(review): presumably a detached/point-in-time view used for background
/// saves; confirm in `kevy_store`.
impl SnapshotSource for kevy_store::SnapshotView {
    fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>)) {
        self.each(f);
    }
}
109
110pub fn save_snapshot<S: SnapshotSource>(src: &S, path: &Path) -> io::Result<()> {
114 let tmp = write_snapshot_tmp(src, path)?;
115 std::fs::rename(&tmp, path)
116}
117
118pub fn write_snapshot_to<S: SnapshotSource, W: Write>(src: &S, sink: &mut W) -> io::Result<()> {
130 let mut w = BufWriter::with_capacity(SNAPSHOT_BUF_CAP, sink);
131 w.write_all(MAGIC)?;
132 w.write_all(&[VERSION])?;
133 let now = kevy_store::now_unix_ms();
136 let mut err: Option<io::Error> = None;
138 src.for_each_entry(|key, value, ttl| {
139 let deadline = ttl.map(|ms| now.saturating_add(ms));
140 if err.is_none()
141 && let Err(e) = write_entry(&mut w, key, value, deadline)
142 {
143 err = Some(e);
144 }
145 });
146 if let Some(e) = err {
147 return Err(e);
148 }
149 w.write_all(&[OP_EOF])?;
150 w.flush()?;
151 Ok(())
152}
153
154pub fn write_snapshot_tmp<S: SnapshotSource>(src: &S, path: &Path) -> io::Result<std::path::PathBuf> {
161 let tmp = tmp_path(path);
162 {
163 let mut file = File::create(&tmp)?;
164 write_snapshot_to(src, &mut file)?;
165 file.sync_all()?; }
167 Ok(tmp)
168}
169
170pub fn load_snapshot(store: &mut Store, path: &Path) -> io::Result<()> {
173 let r = BufReader::new(File::open(path)?);
174 load_snapshot_from(store, r)
175}
176
177pub fn load_snapshot_from<R: Read>(store: &mut Store, mut r: R) -> io::Result<()> {
184 let mut magic = [0u8; 8];
185 r.read_exact(&mut magic)?;
186 if &magic != MAGIC {
187 return Err(io::Error::new(
188 io::ErrorKind::InvalidData,
189 "kevy snapshot: bad magic",
190 ));
191 }
192 let version = read_u8(&mut r)?;
193 if !(VERSION_RELATIVE_TTL..=VERSION).contains(&version) {
194 return Err(io::Error::new(
195 io::ErrorKind::InvalidData,
196 "kevy snapshot: bad version",
197 ));
198 }
199 let absolute_ttl = version >= VERSION_ABSOLUTE_TTL;
205 let now = kevy_store::now_unix_ms();
206
207 loop {
208 let op = read_u8(&mut r)?;
209 if op == OP_EOF {
210 return Ok(());
211 }
212 let raw_ttl = read_ttl(&mut r)?;
213 let ttl = if absolute_ttl {
214 raw_ttl.map(|deadline| deadline.saturating_sub(now))
215 } else {
216 raw_ttl
217 };
218 let key = read_bytes(&mut r)?;
219 match op {
220 OP_STR => {
221 let val = read_bytes(&mut r)?;
222 store.load_str(key, val, ttl);
223 }
224 OP_HASH => {
225 let n = read_u32(&mut r)? as usize;
226 let mut fields = Vec::with_capacity(n);
227 for _ in 0..n {
228 let f = read_bytes(&mut r)?;
229 let v = read_bytes(&mut r)?;
230 fields.push((f, v));
231 }
232 store.load_hash(key, fields, ttl);
233 }
234 OP_LIST => {
235 let n = read_u32(&mut r)? as usize;
236 let mut items = Vec::with_capacity(n);
237 for _ in 0..n {
238 items.push(read_bytes(&mut r)?);
239 }
240 store.load_list(key, items, ttl);
241 }
242 OP_SET => {
243 let n = read_u32(&mut r)? as usize;
244 let mut members = Vec::with_capacity(n);
245 for _ in 0..n {
246 members.push(read_bytes(&mut r)?);
247 }
248 store.load_set(key, members, ttl);
249 }
250 OP_ZSET => {
251 let n = read_u32(&mut r)? as usize;
252 let mut pairs = Vec::with_capacity(n);
253 for _ in 0..n {
254 let m = read_bytes(&mut r)?;
255 let score = f64::from_bits(read_u64(&mut r)?);
256 pairs.push((m, score));
257 }
258 store.load_zset(key, pairs, ttl);
259 }
260 OP_STREAM => {
261 let last_ms = read_u64(&mut r)?;
262 let last_seq = read_u64(&mut r)?;
263 let mxd_ms = read_u64(&mut r)?;
264 let mxd_seq = read_u64(&mut r)?;
265 let entries_added = read_u64(&mut r)?;
266 let n = read_u32(&mut r)? as usize;
267 let mut entries = Vec::with_capacity(n);
268 for _ in 0..n {
269 let ms = read_u64(&mut r)?;
270 let seq = read_u64(&mut r)?;
271 let nf = read_u32(&mut r)? as usize;
272 let mut fv = Vec::with_capacity(nf);
273 for _ in 0..nf {
274 let f = read_bytes(&mut r)?;
275 let v = read_bytes(&mut r)?;
276 fv.push((f, v));
277 }
278 entries.push((ms, seq, fv));
279 }
280 let groups = if version >= VERSION {
283 read_stream_groups(&mut r)?
284 } else {
285 Vec::new()
286 };
287 store.load_stream(
288 key,
289 entries,
290 (last_ms, last_seq),
291 (mxd_ms, mxd_seq),
292 entries_added,
293 groups,
294 ttl,
295 );
296 }
297 other => {
298 return Err(io::Error::new(
299 io::ErrorKind::InvalidData,
300 format!("kevy snapshot: unknown opcode {other}"),
301 ));
302 }
303 }
304 }
305}
306
307fn write_entry<W: Write>(w: &mut W, key: &[u8], value: &Value, ttl: Option<u64>) -> io::Result<()> {
309 let op = match value {
310 Value::Str(_) | Value::Int(_) | Value::ArcBulk(_) => OP_STR, Value::Hash(_) | Value::SmallHashInline(_) => OP_HASH,
313 Value::List(_) | Value::SmallListInline(_) => OP_LIST,
314 Value::Set(_) | Value::SmallSetInline(_) => OP_SET,
319 Value::ZSet(_) | Value::SmallZSetInline(_) => OP_ZSET,
320 Value::Stream(_) => OP_STREAM,
321 };
322 w.write_all(&[op])?;
323 write_ttl(w, ttl)?;
324 write_bytes(w, key)?;
325 match value {
326 Value::Str(v) => write_bytes(w, v.as_slice()),
327 Value::Int(n) => write_bytes(w, n.to_string().as_bytes()),
328 Value::ArcBulk(a) => write_bytes(w, a.as_ref()),
329 Value::Hash(h) => snapshot_payload::write_hash_payload(w, h),
330 Value::SmallHashInline(h) => snapshot_payload::write_small_hash_payload(w, h),
331 Value::List(l) => snapshot_payload::write_list_payload(w, l),
332 Value::SmallListInline(l) => snapshot_payload::write_small_list_payload(w, l),
333 Value::Set(set) => snapshot_payload::write_set_payload(w, set),
334 Value::SmallSetInline(s) => snapshot_payload::write_small_set_payload(w, s),
335 Value::ZSet(z) => snapshot_payload::write_zset_payload(w, z),
336 Value::SmallZSetInline(z) => snapshot_payload::write_small_zset_payload(w, z),
337 Value::Stream(s) => snapshot_payload::write_stream_payload(w, s),
338 }
339}
340
341pub(crate) fn write_stream_groups<W: Write>(w: &mut W, groups: &[kevy_store::LoadedGroup]) -> io::Result<()> {
346 w.write_all(&(groups.len() as u32).to_le_bytes())?;
347 for g in groups {
348 write_bytes(w, &g.name)?;
349 w.write_all(&g.last_delivered.0.to_le_bytes())?;
350 w.write_all(&g.last_delivered.1.to_le_bytes())?;
351 w.write_all(&(g.consumers.len() as u32).to_le_bytes())?;
352 for (name, last_seen_ms) in &g.consumers {
353 write_bytes(w, name)?;
354 w.write_all(&last_seen_ms.to_le_bytes())?;
355 }
356 w.write_all(&(g.pel.len() as u32).to_le_bytes())?;
357 for (ms, seq, consumer, delivery_time_ms, delivery_count) in &g.pel {
358 w.write_all(&ms.to_le_bytes())?;
359 w.write_all(&seq.to_le_bytes())?;
360 write_bytes(w, consumer)?;
361 w.write_all(&delivery_time_ms.to_le_bytes())?;
362 w.write_all(&delivery_count.to_le_bytes())?;
363 }
364 }
365 Ok(())
366}
367
368fn read_stream_groups<R: Read>(r: &mut R) -> io::Result<Vec<kevy_store::LoadedGroup>> {
370 let n = read_u32(r)? as usize;
371 let mut groups = Vec::with_capacity(n);
372 for _ in 0..n {
373 let name = read_bytes(r)?;
374 let last_delivered = (read_u64(r)?, read_u64(r)?);
375 let nc = read_u32(r)? as usize;
376 let mut consumers = Vec::with_capacity(nc);
377 for _ in 0..nc {
378 let cname = read_bytes(r)?;
379 consumers.push((cname, read_u64(r)?));
380 }
381 let np = read_u32(r)? as usize;
382 let mut pel = Vec::with_capacity(np);
383 for _ in 0..np {
384 let ms = read_u64(r)?;
385 let seq = read_u64(r)?;
386 let consumer = read_bytes(r)?;
387 let delivery_time_ms = read_u64(r)?;
388 let delivery_count = read_u32(r)?;
389 pel.push((ms, seq, consumer, delivery_time_ms, delivery_count));
390 }
391 groups.push(kevy_store::LoadedGroup { name, last_delivered, consumers, pel });
392 }
393 Ok(groups)
394}
395
/// Writes an optional TTL. `None` becomes the single byte `0`; `Some(ms)` becomes byte
/// `1` followed by `ms` as a little-endian `u64`.
fn write_ttl<W: Write>(w: &mut W, ttl: Option<u64>) -> io::Result<()> {
    let Some(ms) = ttl else {
        return w.write_all(&[0u8]);
    };
    w.write_all(&[1u8])?;
    w.write_all(&ms.to_le_bytes())
}
406
407fn read_ttl<R: Read>(r: &mut R) -> io::Result<Option<u64>> {
408 if read_u8(r)? == 1 {
409 Ok(Some(read_u64(r)?))
410 } else {
411 Ok(None)
412 }
413}
414
/// Returns `path` with `.tmp` appended to the whole file name rather than replacing the
/// extension (`dump.kv` → `dump.kv.tmp`).
fn tmp_path(path: &Path) -> std::path::PathBuf {
    let mut name = std::ffi::OsString::from(path.as_os_str());
    name.push(".tmp");
    std::path::PathBuf::from(name)
}
420
/// Writes `b` as a length-prefixed byte string: a little-endian `u32` length followed by
/// the raw bytes.
///
/// # Errors
/// - `InvalidInput` if `b` is longer than `u32::MAX` bytes. The length cannot be
///   represented in the format, and silently truncating it would corrupt the file.
/// - Any error from `w`.
pub(crate) fn write_bytes<W: Write>(w: &mut W, b: &[u8]) -> io::Result<()> {
    let len = u32::try_from(b.len()).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("kevy snapshot: {}-byte string exceeds the u32 length prefix", b.len()),
        )
    })?;
    w.write_all(&len.to_le_bytes())?;
    w.write_all(b)
}
425
426fn read_bytes<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
427 let len = read_u32(r)? as usize;
428 let mut buf = vec![0u8; len];
429 r.read_exact(&mut buf)?;
430 Ok(buf)
431}
432
/// Reads exactly one byte from `r`.
///
/// # Errors
/// `UnexpectedEof` if `r` is exhausted; otherwise any I/O error from `r`.
fn read_u8<R: Read>(r: &mut R) -> io::Result<u8> {
    let mut byte = [0u8];
    r.read_exact(&mut byte)?;
    let [value] = byte;
    Ok(value)
}
438
/// Reads a little-endian `u32` (4 bytes) from `r`.
///
/// # Errors
/// `UnexpectedEof` if fewer than 4 bytes remain; otherwise any I/O error from `r`.
fn read_u32<R: Read>(r: &mut R) -> io::Result<u32> {
    let mut word = [0u8; std::mem::size_of::<u32>()];
    r.read_exact(&mut word).map(|()| u32::from_le_bytes(word))
}
444
/// Reads a little-endian `u64` (8 bytes) from `r`.
///
/// # Errors
/// `UnexpectedEof` if fewer than 8 bytes remain; otherwise any I/O error from `r`.
fn read_u64<R: Read>(r: &mut R) -> io::Result<u64> {
    let mut raw = [0u8; std::mem::size_of::<u64>()];
    r.read_exact(&mut raw)?;
    let value = u64::from_le_bytes(raw);
    Ok(value)
}
450
451#[cfg(test)]
452mod tests;
453#[cfg(test)]
454mod tests_aof;
455#[cfg(test)]
456mod tests_rewrite;