1#![forbid(unsafe_code)]
38
39mod aof;
40pub mod layout;
41mod replay;
42pub mod reshard;
43mod rewrite_fmt;
44mod shards_meta;
45
46pub use aof::{Aof, Fsync, RewritePlan, RewriteStats, write_aof_base};
47pub use replay::replay_aof;
48pub use shards_meta::{Routing, ShardsMeta, read_shards_meta, write_shards_meta};
49pub use kevy_resp::{Argv, ArgvView};
50pub use rewrite_fmt::dump_aof;
51pub(crate) use rewrite_fmt::{dump_store_to_buf, estimate_multibulk_bytes, write_multibulk};
52use kevy_store::Store;
53use kevy_store::Value;
54use std::fs::File;
56use std::io::{self, BufReader, BufWriter, Read, Write};
57use std::path::Path;
58
/// Eight-byte signature written at the start of every snapshot and verified on load.
const MAGIC: &[u8; 8] = b"KEVYSNAP";
/// Format version emitted by the writer; also the newest version the loader accepts.
/// Only snapshots of at least this version carry stream consumer-group data.
const VERSION: u8 = 4;
/// Oldest version the loader accepts. Below `VERSION_ABSOLUTE_TTL` the stored TTL
/// value is handed to the store unchanged.
const VERSION_RELATIVE_TTL: u8 = 2;
/// From this version on, the loader treats the stored TTL as an absolute deadline
/// and subtracts the current time from it.
const VERSION_ABSOLUTE_TTL: u8 = 3;

/// Opcode that terminates the entry list.
const OP_EOF: u8 = 0;
/// Opcode for a string entry.
const OP_STR: u8 = 1;
/// Opcode for a hash entry.
const OP_HASH: u8 = 2;
/// Opcode for a list entry.
const OP_LIST: u8 = 3;
/// Opcode for a set entry.
const OP_SET: u8 = 4;
/// Opcode for a sorted-set entry.
const OP_ZSET: u8 = 5;
/// Opcode for a stream entry.
const OP_STREAM: u8 = 6;

/// Capacity of the `BufWriter` that wraps the snapshot temp file.
pub(crate) const SNAPSHOT_BUF_CAP: usize = 1 << 20;
87
88pub trait SnapshotSource {
93 fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>));
95}
96
97impl SnapshotSource for Store {
98 fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>)) {
99 self.snapshot_each(f);
100 }
101}
102
103impl SnapshotSource for kevy_store::SnapshotView {
104 fn for_each_entry(&self, f: impl FnMut(&[u8], &Value, Option<u64>)) {
105 self.each(f);
106 }
107}
108
109pub fn save_snapshot<S: SnapshotSource>(src: &S, path: &Path) -> io::Result<()> {
113 let tmp = write_snapshot_tmp(src, path)?;
114 std::fs::rename(&tmp, path)
115}
116
117pub fn write_snapshot_tmp<S: SnapshotSource>(src: &S, path: &Path) -> io::Result<std::path::PathBuf> {
124 let tmp = tmp_path(path);
125 {
126 let mut w = BufWriter::with_capacity(SNAPSHOT_BUF_CAP, File::create(&tmp)?);
127 w.write_all(MAGIC)?;
128 w.write_all(&[VERSION])?;
129 let now = kevy_store::now_unix_ms();
132 let mut err: Option<io::Error> = None;
134 src.for_each_entry(|key, value, ttl| {
135 let deadline = ttl.map(|ms| now.saturating_add(ms));
136 if err.is_none()
137 && let Err(e) = write_entry(&mut w, key, value, deadline)
138 {
139 err = Some(e);
140 }
141 });
142 if let Some(e) = err {
143 return Err(e);
144 }
145 w.write_all(&[OP_EOF])?;
146 w.flush()?;
147 w.get_ref().sync_all()?; }
149 Ok(tmp)
150}
151
152pub fn load_snapshot(store: &mut Store, path: &Path) -> io::Result<()> {
155 let mut r = BufReader::new(File::open(path)?);
156
157 let mut magic = [0u8; 8];
158 r.read_exact(&mut magic)?;
159 if &magic != MAGIC {
160 return Err(io::Error::new(
161 io::ErrorKind::InvalidData,
162 "kevy snapshot: bad magic",
163 ));
164 }
165 let version = read_u8(&mut r)?;
166 if !(VERSION_RELATIVE_TTL..=VERSION).contains(&version) {
167 return Err(io::Error::new(
168 io::ErrorKind::InvalidData,
169 "kevy snapshot: bad version",
170 ));
171 }
172 let absolute_ttl = version >= VERSION_ABSOLUTE_TTL;
178 let now = kevy_store::now_unix_ms();
179
180 loop {
181 let op = read_u8(&mut r)?;
182 if op == OP_EOF {
183 return Ok(());
184 }
185 let raw_ttl = read_ttl(&mut r)?;
186 let ttl = if absolute_ttl {
187 raw_ttl.map(|deadline| deadline.saturating_sub(now))
188 } else {
189 raw_ttl
190 };
191 let key = read_bytes(&mut r)?;
192 match op {
193 OP_STR => {
194 let val = read_bytes(&mut r)?;
195 store.load_str(key, val, ttl);
196 }
197 OP_HASH => {
198 let n = read_u32(&mut r)? as usize;
199 let mut fields = Vec::with_capacity(n);
200 for _ in 0..n {
201 let f = read_bytes(&mut r)?;
202 let v = read_bytes(&mut r)?;
203 fields.push((f, v));
204 }
205 store.load_hash(key, fields, ttl);
206 }
207 OP_LIST => {
208 let n = read_u32(&mut r)? as usize;
209 let mut items = Vec::with_capacity(n);
210 for _ in 0..n {
211 items.push(read_bytes(&mut r)?);
212 }
213 store.load_list(key, items, ttl);
214 }
215 OP_SET => {
216 let n = read_u32(&mut r)? as usize;
217 let mut members = Vec::with_capacity(n);
218 for _ in 0..n {
219 members.push(read_bytes(&mut r)?);
220 }
221 store.load_set(key, members, ttl);
222 }
223 OP_ZSET => {
224 let n = read_u32(&mut r)? as usize;
225 let mut pairs = Vec::with_capacity(n);
226 for _ in 0..n {
227 let m = read_bytes(&mut r)?;
228 let score = f64::from_bits(read_u64(&mut r)?);
229 pairs.push((m, score));
230 }
231 store.load_zset(key, pairs, ttl);
232 }
233 OP_STREAM => {
234 let last_ms = read_u64(&mut r)?;
235 let last_seq = read_u64(&mut r)?;
236 let mxd_ms = read_u64(&mut r)?;
237 let mxd_seq = read_u64(&mut r)?;
238 let entries_added = read_u64(&mut r)?;
239 let n = read_u32(&mut r)? as usize;
240 let mut entries = Vec::with_capacity(n);
241 for _ in 0..n {
242 let ms = read_u64(&mut r)?;
243 let seq = read_u64(&mut r)?;
244 let nf = read_u32(&mut r)? as usize;
245 let mut fv = Vec::with_capacity(nf);
246 for _ in 0..nf {
247 let f = read_bytes(&mut r)?;
248 let v = read_bytes(&mut r)?;
249 fv.push((f, v));
250 }
251 entries.push((ms, seq, fv));
252 }
253 let groups = if version >= VERSION {
256 read_stream_groups(&mut r)?
257 } else {
258 Vec::new()
259 };
260 store.load_stream(
261 key,
262 entries,
263 (last_ms, last_seq),
264 (mxd_ms, mxd_seq),
265 entries_added,
266 groups,
267 ttl,
268 );
269 }
270 other => {
271 return Err(io::Error::new(
272 io::ErrorKind::InvalidData,
273 format!("kevy snapshot: unknown opcode {other}"),
274 ));
275 }
276 }
277 }
278}
279
280fn write_entry<W: Write>(w: &mut W, key: &[u8], value: &Value, ttl: Option<u64>) -> io::Result<()> {
282 match value {
283 Value::Str(v) => {
284 w.write_all(&[OP_STR])?;
285 write_ttl(w, ttl)?;
286 write_bytes(w, key)?;
287 write_bytes(w, v.as_slice())?;
288 }
289 Value::Hash(h) => {
290 w.write_all(&[OP_HASH])?;
291 write_ttl(w, ttl)?;
292 write_bytes(w, key)?;
293 w.write_all(&(h.len() as u32).to_le_bytes())?;
294 for (f, v) in h.iter() {
295 write_bytes(w, f.as_slice())?;
296 write_bytes(w, v)?;
297 }
298 }
299 Value::List(l) => {
300 w.write_all(&[OP_LIST])?;
301 write_ttl(w, ttl)?;
302 write_bytes(w, key)?;
303 w.write_all(&(l.len() as u32).to_le_bytes())?;
304 for item in l.iter() {
305 write_bytes(w, item)?;
306 }
307 }
308 Value::Set(set) => {
309 w.write_all(&[OP_SET])?;
310 write_ttl(w, ttl)?;
311 write_bytes(w, key)?;
312 w.write_all(&(set.len() as u32).to_le_bytes())?;
313 for m in set.iter() {
314 write_bytes(w, m.as_slice())?;
315 }
316 }
317 Value::ZSet(z) => {
318 w.write_all(&[OP_ZSET])?;
319 write_ttl(w, ttl)?;
320 write_bytes(w, key)?;
321 let entries: Vec<(&[u8], f64)> = z.ordered().collect();
322 w.write_all(&(entries.len() as u32).to_le_bytes())?;
323 for (m, score) in entries {
324 write_bytes(w, m)?;
325 w.write_all(&score.to_bits().to_le_bytes())?;
326 }
327 }
328 Value::Stream(s) => {
329 w.write_all(&[OP_STREAM])?;
330 write_ttl(w, ttl)?;
331 write_bytes(w, key)?;
332 w.write_all(&s.last_id().ms.to_le_bytes())?;
333 w.write_all(&s.last_id().seq.to_le_bytes())?;
334 w.write_all(&s.max_deleted_id().ms.to_le_bytes())?;
335 w.write_all(&s.max_deleted_id().seq.to_le_bytes())?;
336 w.write_all(&s.entries_added().to_le_bytes())?;
337 let len = s.length() as u32;
338 w.write_all(&len.to_le_bytes())?;
339 for (id, fv) in s.iter_entries() {
340 w.write_all(&id.ms.to_le_bytes())?;
341 w.write_all(&id.seq.to_le_bytes())?;
342 w.write_all(&(fv.len() as u32).to_le_bytes())?;
343 for (f, v) in fv {
344 write_bytes(w, f.as_slice())?;
345 write_bytes(w, v.as_slice())?;
346 }
347 }
348 write_stream_groups(w, &s.export_groups())?;
349 }
350 }
351 Ok(())
352}
353
354fn write_stream_groups<W: Write>(w: &mut W, groups: &[kevy_store::LoadedGroup]) -> io::Result<()> {
359 w.write_all(&(groups.len() as u32).to_le_bytes())?;
360 for g in groups {
361 write_bytes(w, &g.name)?;
362 w.write_all(&g.last_delivered.0.to_le_bytes())?;
363 w.write_all(&g.last_delivered.1.to_le_bytes())?;
364 w.write_all(&(g.consumers.len() as u32).to_le_bytes())?;
365 for (name, last_seen_ms) in &g.consumers {
366 write_bytes(w, name)?;
367 w.write_all(&last_seen_ms.to_le_bytes())?;
368 }
369 w.write_all(&(g.pel.len() as u32).to_le_bytes())?;
370 for (ms, seq, consumer, delivery_time_ms, delivery_count) in &g.pel {
371 w.write_all(&ms.to_le_bytes())?;
372 w.write_all(&seq.to_le_bytes())?;
373 write_bytes(w, consumer)?;
374 w.write_all(&delivery_time_ms.to_le_bytes())?;
375 w.write_all(&delivery_count.to_le_bytes())?;
376 }
377 }
378 Ok(())
379}
380
381fn read_stream_groups<R: Read>(r: &mut R) -> io::Result<Vec<kevy_store::LoadedGroup>> {
383 let n = read_u32(r)? as usize;
384 let mut groups = Vec::with_capacity(n);
385 for _ in 0..n {
386 let name = read_bytes(r)?;
387 let last_delivered = (read_u64(r)?, read_u64(r)?);
388 let nc = read_u32(r)? as usize;
389 let mut consumers = Vec::with_capacity(nc);
390 for _ in 0..nc {
391 let cname = read_bytes(r)?;
392 consumers.push((cname, read_u64(r)?));
393 }
394 let np = read_u32(r)? as usize;
395 let mut pel = Vec::with_capacity(np);
396 for _ in 0..np {
397 let ms = read_u64(r)?;
398 let seq = read_u64(r)?;
399 let consumer = read_bytes(r)?;
400 let delivery_time_ms = read_u64(r)?;
401 let delivery_count = read_u32(r)?;
402 pel.push((ms, seq, consumer, delivery_time_ms, delivery_count));
403 }
404 groups.push(kevy_store::LoadedGroup { name, last_delivered, consumers, pel });
405 }
406 Ok(groups)
407}
408
/// Writes an optional TTL: a flag byte `1` followed by the value as a
/// little-endian u64 when present, or a single flag byte `0` when absent.
///
/// # Errors
///
/// Returns the first error reported by `w`.
fn write_ttl<W: Write>(w: &mut W, ttl: Option<u64>) -> io::Result<()> {
    if let Some(ms) = ttl {
        w.write_all(&[1u8])?;
        w.write_all(&ms.to_le_bytes())
    } else {
        w.write_all(&[0u8])
    }
}
419
420fn read_ttl<R: Read>(r: &mut R) -> io::Result<Option<u64>> {
421 if read_u8(r)? == 1 {
422 Ok(Some(read_u64(r)?))
423 } else {
424 Ok(None)
425 }
426}
427
/// Returns `path` with `.tmp` appended to its final component (the existing
/// extension, if any, is kept: `a.snap` becomes `a.snap.tmp`).
fn tmp_path(path: &Path) -> std::path::PathBuf {
    let mut name = path.as_os_str().to_os_string();
    name.push(".tmp");
    std::path::PathBuf::from(name)
}
433
/// Writes `b` as a length-prefixed byte string: the length as a little-endian
/// u32, then the bytes themselves.
///
/// # Errors
///
/// * `InvalidInput` if `b` is longer than `u32::MAX` bytes. The prefix cannot
///   represent such a length, and truncating it would produce a record the
///   reader misparses; nothing is written in that case.
/// * Any error reported by `w`.
fn write_bytes<W: Write>(w: &mut W, b: &[u8]) -> io::Result<()> {
    let len = u32::try_from(b.len()).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "byte string too long for a u32 length prefix",
        )
    })?;
    w.write_all(&len.to_le_bytes())?;
    w.write_all(b)
}
438
439fn read_bytes<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
440 let len = read_u32(r)? as usize;
441 let mut buf = vec![0u8; len];
442 r.read_exact(&mut buf)?;
443 Ok(buf)
444}
445
/// Reads exactly one byte from `r`.
///
/// # Errors
///
/// Returns any error from `r`; `UnexpectedEof` if no byte is available.
fn read_u8<R: Read>(r: &mut R) -> io::Result<u8> {
    let mut byte = 0u8;
    r.read_exact(std::slice::from_mut(&mut byte))?;
    Ok(byte)
}
451
/// Reads four bytes from `r` and decodes them as a little-endian `u32`.
///
/// # Errors
///
/// Returns any error from `r`; `UnexpectedEof` if fewer than four bytes remain.
fn read_u32<R: Read>(r: &mut R) -> io::Result<u32> {
    let mut raw = [0u8; 4];
    r.read_exact(&mut raw).map(|()| u32::from_le_bytes(raw))
}
457
/// Reads eight bytes from `r` and decodes them as a little-endian `u64`.
///
/// # Errors
///
/// Returns any error from `r`; `UnexpectedEof` if fewer than eight bytes remain.
fn read_u64<R: Read>(r: &mut R) -> io::Result<u64> {
    let mut raw = [0u8; std::mem::size_of::<u64>()];
    match r.read_exact(&mut raw) {
        Ok(()) => Ok(u64::from_le_bytes(raw)),
        Err(e) => Err(e),
    }
}
463
464#[cfg(test)]
465mod tests;
466#[cfg(test)]
467mod tests_aof;
468#[cfg(test)]
469mod tests_rewrite;