1use std::fmt;
20use std::fs::{self, File, OpenOptions};
21#[cfg(feature = "encryption")]
22use std::io::Read as _;
23use std::io::{self, BufReader, BufWriter, Write};
24use std::path::{Path, PathBuf};
25
26use bytes::Bytes;
27
28use crate::format::{self, FormatError};
29
30fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
32 let bytes = format::read_bytes(r)?;
33 String::from_utf8(bytes).map_err(|_| {
34 FormatError::Io(io::Error::new(
35 io::ErrorKind::InvalidData,
36 format!("{field} is not valid utf-8"),
37 ))
38 })
39}
40
41fn read_string_list(r: &mut impl io::Read, field: &str) -> Result<Vec<String>, FormatError> {
44 let count = format::read_u32(r)?;
45 format::validate_collection_count(count, field)?;
46 let mut items = Vec::with_capacity(format::capped_capacity(count));
47 for _ in 0..count {
48 items.push(read_string(r, field)?);
49 }
50 Ok(items)
51}
52
53fn read_bytes_list(r: &mut impl io::Read, label: &str) -> Result<Vec<Bytes>, FormatError> {
56 let count = format::read_u32(r)?;
57 format::validate_collection_count(count, label)?;
58 let mut items = Vec::with_capacity(format::capped_capacity(count));
59 for _ in 0..count {
60 items.push(Bytes::from(format::read_bytes(r)?));
61 }
62 Ok(items)
63}
64
65const TAG_SET: u8 = 1;
70const TAG_INCR: u8 = 12;
71const TAG_DECR: u8 = 13;
72const TAG_INCRBY: u8 = 19;
73const TAG_DECRBY: u8 = 20;
74const TAG_APPEND: u8 = 21;
75
76const TAG_LPUSH: u8 = 4;
78const TAG_RPUSH: u8 = 5;
79const TAG_LPOP: u8 = 6;
80const TAG_RPOP: u8 = 7;
81
82const TAG_ZADD: u8 = 8;
84const TAG_ZREM: u8 = 9;
85
86const TAG_HSET: u8 = 14;
88const TAG_HDEL: u8 = 15;
89const TAG_HINCRBY: u8 = 16;
90
91const TAG_SADD: u8 = 17;
93const TAG_SREM: u8 = 18;
94
95const TAG_DEL: u8 = 2;
97const TAG_EXPIRE: u8 = 3;
98const TAG_PERSIST: u8 = 10;
99const TAG_PEXPIRE: u8 = 11;
100const TAG_PEXPIREAT: u8 = 35;
101const TAG_RENAME: u8 = 22;
102const TAG_COPY: u8 = 27;
103const TAG_LSET: u8 = 28;
104const TAG_LTRIM: u8 = 29;
105const TAG_LINSERT: u8 = 30;
106const TAG_LREM: u8 = 31;
107const TAG_SETRANGE: u8 = 32;
108
109const TAG_SETBIT: u8 = 33;
111const TAG_BITOP: u8 = 34;
112
113#[cfg(feature = "vector")]
115const TAG_VADD: u8 = 25;
116#[cfg(feature = "vector")]
117const TAG_VREM: u8 = 26;
118
119#[cfg(feature = "protobuf")]
121const TAG_PROTO_SET: u8 = 23;
122#[cfg(feature = "protobuf")]
123const TAG_PROTO_REGISTER: u8 = 24;
124
125#[derive(Debug, Clone, PartialEq)]
127pub enum AofRecord {
128 Set {
130 key: String,
131 value: Bytes,
132 expire_ms: i64,
133 },
134 Del { key: String },
136 Expire { key: String, seconds: u64 },
138 LPush { key: String, values: Vec<Bytes> },
140 RPush { key: String, values: Vec<Bytes> },
142 LPop { key: String },
144 RPop { key: String },
146 LSet {
148 key: String,
149 index: i64,
150 value: Bytes,
151 },
152 LTrim { key: String, start: i64, stop: i64 },
154 LInsert {
156 key: String,
157 before: bool,
158 pivot: Bytes,
159 value: Bytes,
160 },
161 LRem {
163 key: String,
164 count: i64,
165 value: Bytes,
166 },
167 ZAdd {
169 key: String,
170 members: Vec<(f64, String)>,
171 },
172 ZRem { key: String, members: Vec<String> },
174 Persist { key: String },
176 Pexpire { key: String, milliseconds: u64 },
178 Pexpireat { key: String, timestamp_ms: u64 },
184 Incr { key: String },
186 Decr { key: String },
188 HSet {
190 key: String,
191 fields: Vec<(String, Bytes)>,
192 },
193 HDel { key: String, fields: Vec<String> },
195 HIncrBy {
197 key: String,
198 field: String,
199 delta: i64,
200 },
201 SAdd { key: String, members: Vec<String> },
203 SRem { key: String, members: Vec<String> },
205 IncrBy { key: String, delta: i64 },
207 DecrBy { key: String, delta: i64 },
209 Append { key: String, value: Bytes },
211 SetRange {
213 key: String,
214 offset: usize,
215 value: Bytes,
216 },
217 SetBit { key: String, offset: u64, value: u8 },
219 BitOp {
223 op: u8,
224 dest: String,
225 keys: Vec<String>,
226 },
227 Rename { key: String, newkey: String },
229 Copy {
231 source: String,
232 destination: String,
233 replace: bool,
234 },
235 #[cfg(feature = "vector")]
238 VAdd {
239 key: String,
240 element: String,
241 vector: Vec<f32>,
242 metric: u8,
244 quantization: u8,
246 connectivity: u32,
247 expansion_add: u32,
248 },
249 #[cfg(feature = "vector")]
251 VRem { key: String, element: String },
252 #[cfg(feature = "protobuf")]
254 ProtoSet {
255 key: String,
256 type_name: String,
257 data: Bytes,
258 expire_ms: i64,
259 },
260 #[cfg(feature = "protobuf")]
262 ProtoRegister { name: String, descriptor: Bytes },
263}
264
265impl AofRecord {
266 fn tag(&self) -> u8 {
276 match self {
277 AofRecord::Set { .. } => TAG_SET,
278 AofRecord::Del { .. } => TAG_DEL,
279 AofRecord::Expire { .. } => TAG_EXPIRE,
280 AofRecord::LPush { .. } => TAG_LPUSH,
281 AofRecord::RPush { .. } => TAG_RPUSH,
282 AofRecord::LPop { .. } => TAG_LPOP,
283 AofRecord::RPop { .. } => TAG_RPOP,
284 AofRecord::LSet { .. } => TAG_LSET,
285 AofRecord::LTrim { .. } => TAG_LTRIM,
286 AofRecord::LInsert { .. } => TAG_LINSERT,
287 AofRecord::LRem { .. } => TAG_LREM,
288 AofRecord::ZAdd { .. } => TAG_ZADD,
289 AofRecord::ZRem { .. } => TAG_ZREM,
290 AofRecord::Persist { .. } => TAG_PERSIST,
291 AofRecord::Pexpire { .. } => TAG_PEXPIRE,
292 AofRecord::Pexpireat { .. } => TAG_PEXPIREAT,
293 AofRecord::Incr { .. } => TAG_INCR,
294 AofRecord::Decr { .. } => TAG_DECR,
295 AofRecord::HSet { .. } => TAG_HSET,
296 AofRecord::HDel { .. } => TAG_HDEL,
297 AofRecord::HIncrBy { .. } => TAG_HINCRBY,
298 AofRecord::SAdd { .. } => TAG_SADD,
299 AofRecord::SRem { .. } => TAG_SREM,
300 AofRecord::IncrBy { .. } => TAG_INCRBY,
301 AofRecord::DecrBy { .. } => TAG_DECRBY,
302 AofRecord::Append { .. } => TAG_APPEND,
303 AofRecord::SetRange { .. } => TAG_SETRANGE,
304 AofRecord::SetBit { .. } => TAG_SETBIT,
305 AofRecord::BitOp { .. } => TAG_BITOP,
306 AofRecord::Rename { .. } => TAG_RENAME,
307 AofRecord::Copy { .. } => TAG_COPY,
308 #[cfg(feature = "vector")]
309 AofRecord::VAdd { .. } => TAG_VADD,
310 #[cfg(feature = "vector")]
311 AofRecord::VRem { .. } => TAG_VREM,
312 #[cfg(feature = "protobuf")]
313 AofRecord::ProtoSet { .. } => TAG_PROTO_SET,
314 #[cfg(feature = "protobuf")]
315 AofRecord::ProtoRegister { .. } => TAG_PROTO_REGISTER,
316 }
317 }
318
319 fn estimated_size(&self) -> usize {
325 const LEN_PREFIX: usize = 4;
327
328 match self {
329 AofRecord::Set {
331 key,
332 value,
333 expire_ms: _,
334 } => 1 + LEN_PREFIX + key.len() + LEN_PREFIX + value.len() + 8,
335 AofRecord::Del { key }
337 | AofRecord::LPop { key }
338 | AofRecord::RPop { key }
339 | AofRecord::Persist { key }
340 | AofRecord::Incr { key }
341 | AofRecord::Decr { key } => 1 + LEN_PREFIX + key.len(),
342 AofRecord::Expire { key, .. }
344 | AofRecord::Pexpire { key, .. }
345 | AofRecord::Pexpireat { key, .. } => 1 + LEN_PREFIX + key.len() + 8,
346 AofRecord::LPush { key, values } | AofRecord::RPush { key, values } => {
348 let values_size: usize = values.iter().map(|v| LEN_PREFIX + v.len()).sum();
349 1 + LEN_PREFIX + key.len() + 4 + values_size
350 }
351 AofRecord::LSet { key, value, .. } => {
353 1 + LEN_PREFIX + key.len() + 8 + LEN_PREFIX + value.len()
354 }
355 AofRecord::LTrim { key, .. } => 1 + LEN_PREFIX + key.len() + 8 + 8,
357 AofRecord::LInsert {
359 key, pivot, value, ..
360 } => {
361 1 + LEN_PREFIX + key.len() + 1 + LEN_PREFIX + pivot.len() + LEN_PREFIX + value.len()
362 }
363 AofRecord::LRem { key, value, .. } => {
365 1 + LEN_PREFIX + key.len() + 8 + LEN_PREFIX + value.len()
366 }
367 AofRecord::ZAdd { key, members } => {
368 let members_size: usize =
369 members.iter().map(|(_, m)| 8 + LEN_PREFIX + m.len()).sum();
370 1 + LEN_PREFIX + key.len() + 4 + members_size
371 }
372 AofRecord::ZRem { key, members }
373 | AofRecord::SAdd { key, members }
374 | AofRecord::SRem { key, members } => {
375 let members_size: usize = members.iter().map(|m| LEN_PREFIX + m.len()).sum();
376 1 + LEN_PREFIX + key.len() + 4 + members_size
377 }
378 AofRecord::HSet { key, fields } => {
379 let fields_size: usize = fields
380 .iter()
381 .map(|(f, v)| LEN_PREFIX + f.len() + LEN_PREFIX + v.len())
382 .sum();
383 1 + LEN_PREFIX + key.len() + 4 + fields_size
384 }
385 AofRecord::HDel { key, fields } => {
386 let fields_size: usize = fields.iter().map(|f| LEN_PREFIX + f.len()).sum();
387 1 + LEN_PREFIX + key.len() + 4 + fields_size
388 }
389 AofRecord::HIncrBy { key, field, .. } => {
390 1 + LEN_PREFIX + key.len() + LEN_PREFIX + field.len() + 8
391 }
392 AofRecord::IncrBy { key, .. } | AofRecord::DecrBy { key, .. } => {
393 1 + LEN_PREFIX + key.len() + 8
394 }
395 AofRecord::Append { key, value } => {
396 1 + LEN_PREFIX + key.len() + LEN_PREFIX + value.len()
397 }
398 AofRecord::SetRange { key, value, .. } => {
399 1 + LEN_PREFIX + key.len() + 8 + LEN_PREFIX + value.len()
400 }
401 AofRecord::SetBit { key, .. } => 1 + LEN_PREFIX + key.len() + 8 + 1,
403 AofRecord::BitOp { dest, keys, .. } => {
405 let keys_size: usize = keys.iter().map(|k| LEN_PREFIX + k.len()).sum();
406 1 + 1 + LEN_PREFIX + dest.len() + 4 + keys_size
407 }
408 AofRecord::Rename { key, newkey } => {
409 1 + LEN_PREFIX + key.len() + LEN_PREFIX + newkey.len()
410 }
411 AofRecord::Copy {
412 source,
413 destination,
414 ..
415 } => 1 + LEN_PREFIX + source.len() + LEN_PREFIX + destination.len() + 1,
416 #[cfg(feature = "vector")]
417 AofRecord::VAdd {
418 key,
419 element,
420 vector,
421 ..
422 } => {
423 1 + LEN_PREFIX + key.len() + LEN_PREFIX + element.len() + 4 + vector.len() * 4 + 10
424 }
425 #[cfg(feature = "vector")]
426 AofRecord::VRem { key, element } => {
427 1 + LEN_PREFIX + key.len() + LEN_PREFIX + element.len()
428 }
429 #[cfg(feature = "protobuf")]
430 AofRecord::ProtoSet {
431 key,
432 type_name,
433 data,
434 ..
435 } => {
436 1 + LEN_PREFIX
437 + key.len()
438 + LEN_PREFIX
439 + type_name.len()
440 + LEN_PREFIX
441 + data.len()
442 + 8
443 }
444 #[cfg(feature = "protobuf")]
445 AofRecord::ProtoRegister { name, descriptor } => {
446 1 + LEN_PREFIX + name.len() + LEN_PREFIX + descriptor.len()
447 }
448 }
449 }
450
451 pub fn to_bytes(&self) -> Result<Vec<u8>, FormatError> {
453 let mut buf = Vec::with_capacity(self.estimated_size());
454 format::write_u8(&mut buf, self.tag())?;
455
456 match self {
457 AofRecord::Del { key }
459 | AofRecord::LPop { key }
460 | AofRecord::RPop { key }
461 | AofRecord::Persist { key }
462 | AofRecord::Incr { key }
463 | AofRecord::Decr { key } => {
464 format::write_bytes(&mut buf, key.as_bytes())?;
465 }
466
467 AofRecord::Set {
469 key,
470 value,
471 expire_ms,
472 } => {
473 format::write_bytes(&mut buf, key.as_bytes())?;
474 format::write_bytes(&mut buf, value)?;
475 format::write_i64(&mut buf, *expire_ms)?;
476 }
477
478 AofRecord::Expire { key, seconds } => {
481 format::write_bytes(&mut buf, key.as_bytes())?;
482 format::write_i64(&mut buf, (*seconds).min(i64::MAX as u64) as i64)?;
483 }
484 AofRecord::Pexpire { key, milliseconds } => {
485 format::write_bytes(&mut buf, key.as_bytes())?;
486 format::write_i64(&mut buf, (*milliseconds).min(i64::MAX as u64) as i64)?;
487 }
488 AofRecord::Pexpireat { key, timestamp_ms } => {
489 format::write_bytes(&mut buf, key.as_bytes())?;
490 format::write_i64(&mut buf, (*timestamp_ms).min(i64::MAX as u64) as i64)?;
491 }
492 AofRecord::IncrBy { key, delta } | AofRecord::DecrBy { key, delta } => {
493 format::write_bytes(&mut buf, key.as_bytes())?;
494 format::write_i64(&mut buf, *delta)?;
495 }
496
497 AofRecord::LPush { key, values } | AofRecord::RPush { key, values } => {
499 format::write_bytes(&mut buf, key.as_bytes())?;
500 format::write_len(&mut buf, values.len())?;
501 for v in values {
502 format::write_bytes(&mut buf, v)?;
503 }
504 }
505
506 AofRecord::LSet { key, index, value } => {
508 format::write_bytes(&mut buf, key.as_bytes())?;
509 format::write_i64(&mut buf, *index)?;
510 format::write_bytes(&mut buf, value)?;
511 }
512
513 AofRecord::LTrim { key, start, stop } => {
515 format::write_bytes(&mut buf, key.as_bytes())?;
516 format::write_i64(&mut buf, *start)?;
517 format::write_i64(&mut buf, *stop)?;
518 }
519
520 AofRecord::LInsert {
522 key,
523 before,
524 pivot,
525 value,
526 } => {
527 format::write_bytes(&mut buf, key.as_bytes())?;
528 format::write_u8(&mut buf, if *before { 1 } else { 0 })?;
529 format::write_bytes(&mut buf, pivot)?;
530 format::write_bytes(&mut buf, value)?;
531 }
532
533 AofRecord::LRem { key, count, value } => {
535 format::write_bytes(&mut buf, key.as_bytes())?;
536 format::write_i64(&mut buf, *count)?;
537 format::write_bytes(&mut buf, value)?;
538 }
539
540 AofRecord::ZRem { key, members }
542 | AofRecord::SAdd { key, members }
543 | AofRecord::SRem { key, members } => {
544 format::write_bytes(&mut buf, key.as_bytes())?;
545 format::write_len(&mut buf, members.len())?;
546 for member in members {
547 format::write_bytes(&mut buf, member.as_bytes())?;
548 }
549 }
550 AofRecord::HDel { key, fields } => {
551 format::write_bytes(&mut buf, key.as_bytes())?;
552 format::write_len(&mut buf, fields.len())?;
553 for field in fields {
554 format::write_bytes(&mut buf, field.as_bytes())?;
555 }
556 }
557
558 AofRecord::ZAdd { key, members } => {
560 format::write_bytes(&mut buf, key.as_bytes())?;
561 format::write_len(&mut buf, members.len())?;
562 for (score, member) in members {
563 format::write_f64(&mut buf, *score)?;
564 format::write_bytes(&mut buf, member.as_bytes())?;
565 }
566 }
567
568 AofRecord::HSet { key, fields } => {
570 format::write_bytes(&mut buf, key.as_bytes())?;
571 format::write_len(&mut buf, fields.len())?;
572 for (field, value) in fields {
573 format::write_bytes(&mut buf, field.as_bytes())?;
574 format::write_bytes(&mut buf, value)?;
575 }
576 }
577
578 AofRecord::HIncrBy { key, field, delta } => {
580 format::write_bytes(&mut buf, key.as_bytes())?;
581 format::write_bytes(&mut buf, field.as_bytes())?;
582 format::write_i64(&mut buf, *delta)?;
583 }
584
585 AofRecord::Append { key, value } => {
587 format::write_bytes(&mut buf, key.as_bytes())?;
588 format::write_bytes(&mut buf, value)?;
589 }
590
591 AofRecord::SetRange { key, offset, value } => {
593 format::write_bytes(&mut buf, key.as_bytes())?;
594 format::write_i64(&mut buf, *offset as i64)?;
595 format::write_bytes(&mut buf, value)?;
596 }
597
598 AofRecord::SetBit { key, offset, value } => {
600 format::write_bytes(&mut buf, key.as_bytes())?;
601 format::write_i64(&mut buf, *offset as i64)?;
603 format::write_u8(&mut buf, *value)?;
604 }
605
606 AofRecord::BitOp { op, dest, keys } => {
608 format::write_u8(&mut buf, *op)?;
609 format::write_bytes(&mut buf, dest.as_bytes())?;
610 format::write_len(&mut buf, keys.len())?;
611 for key in keys {
612 format::write_bytes(&mut buf, key.as_bytes())?;
613 }
614 }
615
616 AofRecord::Rename { key, newkey } => {
618 format::write_bytes(&mut buf, key.as_bytes())?;
619 format::write_bytes(&mut buf, newkey.as_bytes())?;
620 }
621
622 AofRecord::Copy {
624 source,
625 destination,
626 replace,
627 } => {
628 format::write_bytes(&mut buf, source.as_bytes())?;
629 format::write_bytes(&mut buf, destination.as_bytes())?;
630 buf.push(u8::from(*replace));
631 }
632
633 #[cfg(feature = "vector")]
634 AofRecord::VAdd {
635 key,
636 element,
637 vector,
638 metric,
639 quantization,
640 connectivity,
641 expansion_add,
642 } => {
643 format::write_bytes(&mut buf, key.as_bytes())?;
644 format::write_bytes(&mut buf, element.as_bytes())?;
645 format::write_len(&mut buf, vector.len())?;
646 for &v in vector {
647 format::write_f32(&mut buf, v)?;
648 }
649 format::write_u8(&mut buf, *metric)?;
650 format::write_u8(&mut buf, *quantization)?;
651 format::write_u32(&mut buf, *connectivity)?;
652 format::write_u32(&mut buf, *expansion_add)?;
653 }
654 #[cfg(feature = "vector")]
655 AofRecord::VRem { key, element } => {
656 format::write_bytes(&mut buf, key.as_bytes())?;
657 format::write_bytes(&mut buf, element.as_bytes())?;
658 }
659
660 #[cfg(feature = "protobuf")]
661 AofRecord::ProtoSet {
662 key,
663 type_name,
664 data,
665 expire_ms,
666 } => {
667 format::write_bytes(&mut buf, key.as_bytes())?;
668 format::write_bytes(&mut buf, type_name.as_bytes())?;
669 format::write_bytes(&mut buf, data)?;
670 format::write_i64(&mut buf, *expire_ms)?;
671 }
672 #[cfg(feature = "protobuf")]
673 AofRecord::ProtoRegister { name, descriptor } => {
674 format::write_bytes(&mut buf, name.as_bytes())?;
675 format::write_bytes(&mut buf, descriptor)?;
676 }
677 }
678 Ok(buf)
679 }
680
681 pub fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
686 let mut cursor = io::Cursor::new(data);
687 let tag = format::read_u8(&mut cursor)?;
688 match tag {
689 TAG_SET => {
690 let key = read_string(&mut cursor, "key")?;
691 let value = format::read_bytes(&mut cursor)?;
692 let expire_ms = format::read_i64(&mut cursor)?;
693 Ok(AofRecord::Set {
694 key,
695 value: Bytes::from(value),
696 expire_ms,
697 })
698 }
699 TAG_DEL => {
700 let key = read_string(&mut cursor, "key")?;
701 Ok(AofRecord::Del { key })
702 }
703 TAG_EXPIRE => {
704 let key = read_string(&mut cursor, "key")?;
705 let raw = format::read_i64(&mut cursor)?;
706 let seconds = u64::try_from(raw).map_err(|_| {
707 FormatError::InvalidData(format!(
708 "EXPIRE seconds is negative ({raw}) in AOF record"
709 ))
710 })?;
711 Ok(AofRecord::Expire { key, seconds })
712 }
713 TAG_LPUSH | TAG_RPUSH => {
714 let key = read_string(&mut cursor, "key")?;
715 let values = read_bytes_list(&mut cursor, "list")?;
716 if tag == TAG_LPUSH {
717 Ok(AofRecord::LPush { key, values })
718 } else {
719 Ok(AofRecord::RPush { key, values })
720 }
721 }
722 TAG_LPOP => {
723 let key = read_string(&mut cursor, "key")?;
724 Ok(AofRecord::LPop { key })
725 }
726 TAG_RPOP => {
727 let key = read_string(&mut cursor, "key")?;
728 Ok(AofRecord::RPop { key })
729 }
730 TAG_LSET => {
731 let key = read_string(&mut cursor, "key")?;
732 let index = format::read_i64(&mut cursor)?;
733 let value = Bytes::from(format::read_bytes(&mut cursor)?);
734 Ok(AofRecord::LSet { key, index, value })
735 }
736 TAG_LTRIM => {
737 let key = read_string(&mut cursor, "key")?;
738 let start = format::read_i64(&mut cursor)?;
739 let stop = format::read_i64(&mut cursor)?;
740 Ok(AofRecord::LTrim { key, start, stop })
741 }
742 TAG_LINSERT => {
743 let key = read_string(&mut cursor, "key")?;
744 let before_byte = format::read_u8(&mut cursor)?;
745 let before = before_byte != 0;
746 let pivot = Bytes::from(format::read_bytes(&mut cursor)?);
747 let value = Bytes::from(format::read_bytes(&mut cursor)?);
748 Ok(AofRecord::LInsert {
749 key,
750 before,
751 pivot,
752 value,
753 })
754 }
755 TAG_LREM => {
756 let key = read_string(&mut cursor, "key")?;
757 let count = format::read_i64(&mut cursor)?;
758 let value = Bytes::from(format::read_bytes(&mut cursor)?);
759 Ok(AofRecord::LRem { key, count, value })
760 }
761 TAG_ZADD => {
762 let key = read_string(&mut cursor, "key")?;
763 let count = format::read_u32(&mut cursor)?;
764 format::validate_collection_count(count, "sorted set")?;
765 let mut members = Vec::with_capacity(format::capped_capacity(count));
766 for _ in 0..count {
767 let score = format::read_f64(&mut cursor)?;
768 let member = read_string(&mut cursor, "member")?;
769 members.push((score, member));
770 }
771 Ok(AofRecord::ZAdd { key, members })
772 }
773 TAG_ZREM => {
774 let key = read_string(&mut cursor, "key")?;
775 let members = read_string_list(&mut cursor, "member")?;
776 Ok(AofRecord::ZRem { key, members })
777 }
778 TAG_PERSIST => {
779 let key = read_string(&mut cursor, "key")?;
780 Ok(AofRecord::Persist { key })
781 }
782 TAG_PEXPIRE => {
783 let key = read_string(&mut cursor, "key")?;
784 let raw = format::read_i64(&mut cursor)?;
785 let milliseconds = u64::try_from(raw).map_err(|_| {
786 FormatError::InvalidData(format!(
787 "PEXPIRE milliseconds is negative ({raw}) in AOF record"
788 ))
789 })?;
790 Ok(AofRecord::Pexpire { key, milliseconds })
791 }
792 TAG_PEXPIREAT => {
793 let key = read_string(&mut cursor, "key")?;
794 let raw = format::read_i64(&mut cursor)?;
795 let timestamp_ms = u64::try_from(raw).map_err(|_| {
796 FormatError::InvalidData(format!(
797 "PEXPIREAT timestamp_ms is negative ({raw}) in AOF record"
798 ))
799 })?;
800 Ok(AofRecord::Pexpireat { key, timestamp_ms })
801 }
802 TAG_INCR => {
803 let key = read_string(&mut cursor, "key")?;
804 Ok(AofRecord::Incr { key })
805 }
806 TAG_DECR => {
807 let key = read_string(&mut cursor, "key")?;
808 Ok(AofRecord::Decr { key })
809 }
810 TAG_HSET => {
811 let key = read_string(&mut cursor, "key")?;
812 let count = format::read_u32(&mut cursor)?;
813 format::validate_collection_count(count, "hash")?;
814 let mut fields = Vec::with_capacity(format::capped_capacity(count));
815 for _ in 0..count {
816 let field = read_string(&mut cursor, "field")?;
817 let value = Bytes::from(format::read_bytes(&mut cursor)?);
818 fields.push((field, value));
819 }
820 Ok(AofRecord::HSet { key, fields })
821 }
822 TAG_HDEL => {
823 let key = read_string(&mut cursor, "key")?;
824 let fields = read_string_list(&mut cursor, "field")?;
825 Ok(AofRecord::HDel { key, fields })
826 }
827 TAG_HINCRBY => {
828 let key = read_string(&mut cursor, "key")?;
829 let field = read_string(&mut cursor, "field")?;
830 let delta = format::read_i64(&mut cursor)?;
831 Ok(AofRecord::HIncrBy { key, field, delta })
832 }
833 TAG_SADD => {
834 let key = read_string(&mut cursor, "key")?;
835 let members = read_string_list(&mut cursor, "member")?;
836 Ok(AofRecord::SAdd { key, members })
837 }
838 TAG_SREM => {
839 let key = read_string(&mut cursor, "key")?;
840 let members = read_string_list(&mut cursor, "member")?;
841 Ok(AofRecord::SRem { key, members })
842 }
843 TAG_INCRBY => {
844 let key = read_string(&mut cursor, "key")?;
845 let delta = format::read_i64(&mut cursor)?;
846 Ok(AofRecord::IncrBy { key, delta })
847 }
848 TAG_DECRBY => {
849 let key = read_string(&mut cursor, "key")?;
850 let delta = format::read_i64(&mut cursor)?;
851 Ok(AofRecord::DecrBy { key, delta })
852 }
853 TAG_APPEND => {
854 let key = read_string(&mut cursor, "key")?;
855 let value = Bytes::from(format::read_bytes(&mut cursor)?);
856 Ok(AofRecord::Append { key, value })
857 }
858 TAG_SETRANGE => {
859 let key = read_string(&mut cursor, "key")?;
860 let offset = format::read_i64(&mut cursor)? as usize;
861 let value = Bytes::from(format::read_bytes(&mut cursor)?);
862 Ok(AofRecord::SetRange { key, offset, value })
863 }
864 TAG_RENAME => {
865 let key = read_string(&mut cursor, "key")?;
866 let newkey = read_string(&mut cursor, "newkey")?;
867 Ok(AofRecord::Rename { key, newkey })
868 }
869 TAG_COPY => {
870 let source = read_string(&mut cursor, "source")?;
871 let destination = read_string(&mut cursor, "destination")?;
872 let replace = format::read_u8(&mut cursor)? != 0;
873 Ok(AofRecord::Copy {
874 source,
875 destination,
876 replace,
877 })
878 }
879 TAG_SETBIT => {
880 let key = read_string(&mut cursor, "key")?;
881 let offset = format::read_i64(&mut cursor)? as u64;
882 let value = format::read_u8(&mut cursor)?;
883 Ok(AofRecord::SetBit { key, offset, value })
884 }
885 TAG_BITOP => {
886 let op = format::read_u8(&mut cursor)?;
887 if op > 3 {
888 return Err(FormatError::InvalidData(format!(
889 "BITOP: unknown op byte {op} in AOF record"
890 )));
891 }
892 let dest = read_string(&mut cursor, "dest")?;
893 let keys = read_string_list(&mut cursor, "key")?;
894 Ok(AofRecord::BitOp { op, dest, keys })
895 }
896 #[cfg(feature = "vector")]
897 TAG_VADD => {
898 let key = read_string(&mut cursor, "key")?;
899 let element = read_string(&mut cursor, "element")?;
900 let dim = format::read_u32(&mut cursor)?;
901 if dim > format::MAX_PERSISTED_VECTOR_DIMS {
902 return Err(FormatError::InvalidData(format!(
903 "AOF VADD dimension {dim} exceeds max {}",
904 format::MAX_PERSISTED_VECTOR_DIMS
905 )));
906 }
907 let mut vector = Vec::with_capacity(dim as usize);
908 for _ in 0..dim {
909 vector.push(format::read_f32(&mut cursor)?);
910 }
911 let metric = format::read_u8(&mut cursor)?;
912 let quantization = format::read_u8(&mut cursor)?;
913 let connectivity = format::read_u32(&mut cursor)?;
914 let expansion_add = format::read_u32(&mut cursor)?;
915 Ok(AofRecord::VAdd {
916 key,
917 element,
918 vector,
919 metric,
920 quantization,
921 connectivity,
922 expansion_add,
923 })
924 }
925 #[cfg(feature = "vector")]
926 TAG_VREM => {
927 let key = read_string(&mut cursor, "key")?;
928 let element = read_string(&mut cursor, "element")?;
929 Ok(AofRecord::VRem { key, element })
930 }
931 #[cfg(feature = "protobuf")]
932 TAG_PROTO_SET => {
933 let key = read_string(&mut cursor, "key")?;
934 let type_name = read_string(&mut cursor, "type_name")?;
935 let data = format::read_bytes(&mut cursor)?;
936 let expire_ms = format::read_i64(&mut cursor)?;
937 Ok(AofRecord::ProtoSet {
938 key,
939 type_name,
940 data: Bytes::from(data),
941 expire_ms,
942 })
943 }
944 #[cfg(feature = "protobuf")]
945 TAG_PROTO_REGISTER => {
946 let name = read_string(&mut cursor, "name")?;
947 let descriptor = format::read_bytes(&mut cursor)?;
948 Ok(AofRecord::ProtoRegister {
949 name,
950 descriptor: Bytes::from(descriptor),
951 })
952 }
953 _ => Err(FormatError::UnknownTag(tag)),
954 }
955 }
956}
957
958#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
960pub enum FsyncPolicy {
961 Always,
963 #[default]
965 EverySec,
966 No,
968}
969
970pub struct AofWriter {
972 writer: BufWriter<File>,
973 path: PathBuf,
974 #[cfg(feature = "encryption")]
975 encryption_key: Option<crate::encryption::EncryptionKey>,
976}
977
978impl AofWriter {
979 pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
982 let path = path.into();
983 let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
984
985 let file = open_persistence_file(&path)?;
986 let mut writer = BufWriter::new(file);
987
988 if !exists {
989 format::write_header(&mut writer, format::AOF_MAGIC)?;
990 writer.flush()?;
991 }
992
993 Ok(Self {
994 writer,
995 path,
996 #[cfg(feature = "encryption")]
997 encryption_key: None,
998 })
999 }
1000
1001 #[cfg(feature = "encryption")]
1007 pub fn open_encrypted(
1008 path: impl Into<PathBuf>,
1009 key: crate::encryption::EncryptionKey,
1010 ) -> Result<Self, FormatError> {
1011 let path = path.into();
1012 let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
1013
1014 let file = open_persistence_file(&path)?;
1015 let mut writer = BufWriter::new(file);
1016
1017 if !exists {
1018 format::write_header_versioned(
1019 &mut writer,
1020 format::AOF_MAGIC,
1021 format::FORMAT_VERSION_ENCRYPTED,
1022 )?;
1023 writer.flush()?;
1024 }
1025
1026 Ok(Self {
1027 writer,
1028 path,
1029 encryption_key: Some(key),
1030 })
1031 }
1032
1033 pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
1038 let payload = record.to_bytes()?;
1039
1040 #[cfg(feature = "encryption")]
1041 if let Some(ref key) = self.encryption_key {
1042 let (nonce, ciphertext) = crate::encryption::encrypt_record(key, &payload)?;
1043 self.writer.write_all(&nonce)?;
1044 format::write_len(&mut self.writer, ciphertext.len())?;
1045 self.writer.write_all(&ciphertext)?;
1046 return Ok(());
1047 }
1048
1049 let checksum = format::crc32(&payload);
1050 self.writer.write_all(&payload)?;
1051 format::write_u32(&mut self.writer, checksum)?;
1052 Ok(())
1053 }
1054
1055 pub fn flush(&mut self) -> Result<(), FormatError> {
1057 self.writer.flush()?;
1058 Ok(())
1059 }
1060
1061 pub fn sync(&mut self) -> Result<(), FormatError> {
1063 self.writer.flush()?;
1064 self.writer.get_ref().sync_all()?;
1065 Ok(())
1066 }
1067
1068 pub fn path(&self) -> &Path {
1070 &self.path
1071 }
1072
1073 pub fn truncate(&mut self) -> Result<(), FormatError> {
1079 self.writer.flush()?;
1081
1082 let tmp_path = self.path.with_extension("aof.tmp");
1084 let mut opts = OpenOptions::new();
1085 opts.create(true).write(true).truncate(true);
1086 #[cfg(unix)]
1087 {
1088 use std::os::unix::fs::OpenOptionsExt;
1089 opts.mode(0o600);
1090 }
1091 let tmp_file = opts.open(&tmp_path)?;
1092 let mut tmp_writer = BufWriter::new(tmp_file);
1093
1094 #[cfg(feature = "encryption")]
1095 if self.encryption_key.is_some() {
1096 format::write_header_versioned(
1097 &mut tmp_writer,
1098 format::AOF_MAGIC,
1099 format::FORMAT_VERSION_ENCRYPTED,
1100 )?;
1101 } else {
1102 format::write_header(&mut tmp_writer, format::AOF_MAGIC)?;
1103 }
1104 #[cfg(not(feature = "encryption"))]
1105 format::write_header(&mut tmp_writer, format::AOF_MAGIC)?;
1106
1107 tmp_writer.flush()?;
1108 tmp_writer.get_ref().sync_all()?;
1109
1110 std::fs::rename(&tmp_path, &self.path)?;
1112
1113 let file = OpenOptions::new().append(true).open(&self.path)?;
1115 self.writer = BufWriter::new(file);
1116 Ok(())
1117 }
1118}
1119
1120pub struct AofReader {
1122 reader: BufReader<File>,
1123 version: u8,
1125 #[cfg(feature = "encryption")]
1126 encryption_key: Option<crate::encryption::EncryptionKey>,
1127}
1128
1129impl fmt::Debug for AofReader {
1130 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1131 f.debug_struct("AofReader")
1132 .field("version", &self.version)
1133 .finish()
1134 }
1135}
1136
1137impl AofReader {
1138 pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
1140 let file = File::open(path.as_ref())?;
1141 let mut reader = BufReader::new(file);
1142 let version = format::read_header(&mut reader, format::AOF_MAGIC)?;
1143
1144 if version == format::FORMAT_VERSION_ENCRYPTED {
1145 return Err(FormatError::EncryptionRequired);
1146 }
1147
1148 Ok(Self {
1149 reader,
1150 version,
1151 #[cfg(feature = "encryption")]
1152 encryption_key: None,
1153 })
1154 }
1155
1156 #[cfg(feature = "encryption")]
1161 pub fn open_encrypted(
1162 path: impl AsRef<Path>,
1163 key: crate::encryption::EncryptionKey,
1164 ) -> Result<Self, FormatError> {
1165 let file = File::open(path.as_ref())?;
1166 let mut reader = BufReader::new(file);
1167 let version = format::read_header(&mut reader, format::AOF_MAGIC)?;
1168
1169 Ok(Self {
1170 reader,
1171 version,
1172 encryption_key: Some(key),
1173 })
1174 }
1175
1176 pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
1182 #[cfg(feature = "encryption")]
1183 if self.version == format::FORMAT_VERSION_ENCRYPTED {
1184 return self.read_encrypted_record();
1185 }
1186
1187 self.read_v2_record()
1188 }
1189
1190 fn read_v2_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
1192 let tag = match format::read_u8(&mut self.reader) {
1194 Ok(t) => t,
1195 Err(FormatError::UnexpectedEof) => return Ok(None),
1196 Err(e) => return Err(e),
1197 };
1198
1199 let record_result = self.read_payload_for_tag(tag);
1202 match record_result {
1203 Ok((payload, stored_crc)) => {
1204 let mut full = Vec::with_capacity(1 + payload.len());
1206 full.push(tag);
1207 full.extend_from_slice(&payload);
1208 format::verify_crc32(&full, stored_crc)?;
1209 AofRecord::from_bytes(&full).map(Some)
1210 }
1211 Err(FormatError::UnexpectedEof) => Ok(None),
1213 Err(e) => Err(e),
1214 }
1215 }
1216
1217 #[cfg(feature = "encryption")]
1219 fn read_encrypted_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
1220 let key = self
1221 .encryption_key
1222 .as_ref()
1223 .ok_or(FormatError::EncryptionRequired)?;
1224
1225 let mut nonce = [0u8; crate::encryption::NONCE_SIZE];
1227 if let Err(e) = self.reader.read_exact(&mut nonce) {
1228 return if e.kind() == io::ErrorKind::UnexpectedEof {
1229 Ok(None)
1230 } else {
1231 Err(FormatError::Io(e))
1232 };
1233 }
1234
1235 let ct_len = match format::read_u32(&mut self.reader) {
1237 Ok(n) => n as usize,
1238 Err(FormatError::UnexpectedEof) => return Ok(None),
1239 Err(e) => return Err(e),
1240 };
1241
1242 if ct_len > format::MAX_FIELD_LEN {
1243 return Err(FormatError::Io(io::Error::new(
1244 io::ErrorKind::InvalidData,
1245 format!("encrypted record length {ct_len} exceeds maximum"),
1246 )));
1247 }
1248
1249 let mut ciphertext = vec![0u8; ct_len];
1250 if let Err(e) = self.reader.read_exact(&mut ciphertext) {
1251 return if e.kind() == io::ErrorKind::UnexpectedEof {
1252 Ok(None)
1253 } else {
1254 Err(FormatError::Io(e))
1255 };
1256 }
1257
1258 let plaintext = crate::encryption::decrypt_record(key, &nonce, &ciphertext)?;
1259 AofRecord::from_bytes(&plaintext).map(Some)
1260 }
1261
1262 fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
1264 let mut payload = Vec::new();
1265 match tag {
1266 TAG_SET => {
1267 let key = format::read_bytes(&mut self.reader)?;
1269 format::write_bytes(&mut payload, &key)?;
1270 let value = format::read_bytes(&mut self.reader)?;
1271 format::write_bytes(&mut payload, &value)?;
1272 let expire_ms = format::read_i64(&mut self.reader)?;
1273 format::write_i64(&mut payload, expire_ms)?;
1274 }
1275 TAG_DEL => {
1276 let key = format::read_bytes(&mut self.reader)?;
1277 format::write_bytes(&mut payload, &key)?;
1278 }
1279 TAG_EXPIRE => {
1280 let key = format::read_bytes(&mut self.reader)?;
1281 format::write_bytes(&mut payload, &key)?;
1282 let seconds = format::read_i64(&mut self.reader)?;
1283 format::write_i64(&mut payload, seconds)?;
1284 }
1285 TAG_LPUSH | TAG_RPUSH => {
1286 let key = format::read_bytes(&mut self.reader)?;
1287 format::write_bytes(&mut payload, &key)?;
1288 let count = format::read_u32(&mut self.reader)?;
1289 format::validate_collection_count(count, "list")?;
1290 format::write_u32(&mut payload, count)?;
1291 for _ in 0..count {
1292 let val = format::read_bytes(&mut self.reader)?;
1293 format::write_bytes(&mut payload, &val)?;
1294 }
1295 }
1296 TAG_LPOP | TAG_RPOP => {
1297 let key = format::read_bytes(&mut self.reader)?;
1298 format::write_bytes(&mut payload, &key)?;
1299 }
1300 TAG_ZADD => {
1301 let key = format::read_bytes(&mut self.reader)?;
1302 format::write_bytes(&mut payload, &key)?;
1303 let count = format::read_u32(&mut self.reader)?;
1304 format::validate_collection_count(count, "sorted set")?;
1305 format::write_u32(&mut payload, count)?;
1306 for _ in 0..count {
1307 let score = format::read_f64(&mut self.reader)?;
1308 format::write_f64(&mut payload, score)?;
1309 let member = format::read_bytes(&mut self.reader)?;
1310 format::write_bytes(&mut payload, &member)?;
1311 }
1312 }
1313 TAG_ZREM => {
1314 let key = format::read_bytes(&mut self.reader)?;
1315 format::write_bytes(&mut payload, &key)?;
1316 let count = format::read_u32(&mut self.reader)?;
1317 format::validate_collection_count(count, "sorted set")?;
1318 format::write_u32(&mut payload, count)?;
1319 for _ in 0..count {
1320 let member = format::read_bytes(&mut self.reader)?;
1321 format::write_bytes(&mut payload, &member)?;
1322 }
1323 }
1324 TAG_PERSIST => {
1325 let key = format::read_bytes(&mut self.reader)?;
1326 format::write_bytes(&mut payload, &key)?;
1327 }
1328 TAG_PEXPIRE => {
1329 let key = format::read_bytes(&mut self.reader)?;
1330 format::write_bytes(&mut payload, &key)?;
1331 let millis = format::read_i64(&mut self.reader)?;
1332 format::write_i64(&mut payload, millis)?;
1333 }
1334 TAG_INCR | TAG_DECR => {
1335 let key = format::read_bytes(&mut self.reader)?;
1336 format::write_bytes(&mut payload, &key)?;
1337 }
1338 TAG_HSET => {
1339 let key = format::read_bytes(&mut self.reader)?;
1340 format::write_bytes(&mut payload, &key)?;
1341 let count = format::read_u32(&mut self.reader)?;
1342 format::validate_collection_count(count, "hash")?;
1343 format::write_u32(&mut payload, count)?;
1344 for _ in 0..count {
1345 let field = format::read_bytes(&mut self.reader)?;
1346 format::write_bytes(&mut payload, &field)?;
1347 let value = format::read_bytes(&mut self.reader)?;
1348 format::write_bytes(&mut payload, &value)?;
1349 }
1350 }
1351 TAG_HDEL | TAG_SADD | TAG_SREM => {
1352 let key = format::read_bytes(&mut self.reader)?;
1353 format::write_bytes(&mut payload, &key)?;
1354 let count = format::read_u32(&mut self.reader)?;
1355 format::validate_collection_count(count, "set")?;
1356 format::write_u32(&mut payload, count)?;
1357 for _ in 0..count {
1358 let item = format::read_bytes(&mut self.reader)?;
1359 format::write_bytes(&mut payload, &item)?;
1360 }
1361 }
1362 TAG_HINCRBY => {
1363 let key = format::read_bytes(&mut self.reader)?;
1364 format::write_bytes(&mut payload, &key)?;
1365 let field = format::read_bytes(&mut self.reader)?;
1366 format::write_bytes(&mut payload, &field)?;
1367 let delta = format::read_i64(&mut self.reader)?;
1368 format::write_i64(&mut payload, delta)?;
1369 }
1370 TAG_INCRBY | TAG_DECRBY => {
1371 let key = format::read_bytes(&mut self.reader)?;
1372 format::write_bytes(&mut payload, &key)?;
1373 let delta = format::read_i64(&mut self.reader)?;
1374 format::write_i64(&mut payload, delta)?;
1375 }
1376 TAG_APPEND => {
1377 let key = format::read_bytes(&mut self.reader)?;
1378 format::write_bytes(&mut payload, &key)?;
1379 let value = format::read_bytes(&mut self.reader)?;
1380 format::write_bytes(&mut payload, &value)?;
1381 }
1382 TAG_RENAME => {
1383 let key = format::read_bytes(&mut self.reader)?;
1384 format::write_bytes(&mut payload, &key)?;
1385 let newkey = format::read_bytes(&mut self.reader)?;
1386 format::write_bytes(&mut payload, &newkey)?;
1387 }
1388 TAG_COPY => {
1389 let source = format::read_bytes(&mut self.reader)?;
1390 format::write_bytes(&mut payload, &source)?;
1391 let dest = format::read_bytes(&mut self.reader)?;
1392 format::write_bytes(&mut payload, &dest)?;
1393 let replace = format::read_u8(&mut self.reader)?;
1394 payload.push(replace);
1395 }
1396 #[cfg(feature = "vector")]
1397 TAG_VADD => {
1398 let key = format::read_bytes(&mut self.reader)?;
1399 format::write_bytes(&mut payload, &key)?;
1400 let element = format::read_bytes(&mut self.reader)?;
1401 format::write_bytes(&mut payload, &element)?;
1402 let dim = format::read_u32(&mut self.reader)?;
1403 if dim > format::MAX_PERSISTED_VECTOR_DIMS {
1404 return Err(FormatError::InvalidData(format!(
1405 "AOF VADD dimension {dim} exceeds max {}",
1406 format::MAX_PERSISTED_VECTOR_DIMS
1407 )));
1408 }
1409 format::write_u32(&mut payload, dim)?;
1410 for _ in 0..dim {
1411 let v = format::read_f32(&mut self.reader)?;
1412 format::write_f32(&mut payload, v)?;
1413 }
1414 let metric = format::read_u8(&mut self.reader)?;
1415 format::write_u8(&mut payload, metric)?;
1416 let quantization = format::read_u8(&mut self.reader)?;
1417 format::write_u8(&mut payload, quantization)?;
1418 let connectivity = format::read_u32(&mut self.reader)?;
1419 format::write_u32(&mut payload, connectivity)?;
1420 let expansion_add = format::read_u32(&mut self.reader)?;
1421 format::write_u32(&mut payload, expansion_add)?;
1422 }
1423 #[cfg(feature = "vector")]
1424 TAG_VREM => {
1425 let key = format::read_bytes(&mut self.reader)?;
1426 format::write_bytes(&mut payload, &key)?;
1427 let element = format::read_bytes(&mut self.reader)?;
1428 format::write_bytes(&mut payload, &element)?;
1429 }
1430 #[cfg(feature = "protobuf")]
1431 TAG_PROTO_SET => {
1432 let key = format::read_bytes(&mut self.reader)?;
1433 format::write_bytes(&mut payload, &key)?;
1434 let type_name = format::read_bytes(&mut self.reader)?;
1435 format::write_bytes(&mut payload, &type_name)?;
1436 let data = format::read_bytes(&mut self.reader)?;
1437 format::write_bytes(&mut payload, &data)?;
1438 let expire_ms = format::read_i64(&mut self.reader)?;
1439 format::write_i64(&mut payload, expire_ms)?;
1440 }
1441 #[cfg(feature = "protobuf")]
1442 TAG_PROTO_REGISTER => {
1443 let name = format::read_bytes(&mut self.reader)?;
1444 format::write_bytes(&mut payload, &name)?;
1445 let descriptor = format::read_bytes(&mut self.reader)?;
1446 format::write_bytes(&mut payload, &descriptor)?;
1447 }
1448 _ => return Err(FormatError::UnknownTag(tag)),
1449 }
1450 let stored_crc = format::read_u32(&mut self.reader)?;
1451 Ok((payload, stored_crc))
1452 }
1453}
1454
1455fn open_persistence_file(path: &Path) -> Result<File, FormatError> {
1457 let mut opts = OpenOptions::new();
1458 opts.create(true).append(true);
1459 #[cfg(unix)]
1460 {
1461 use std::os::unix::fs::OpenOptionsExt;
1462 opts.mode(0o600);
1463 }
1464 Ok(opts.open(path)?)
1465}
1466
1467pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
1469 data_dir.join(format!("shard-{shard_id}.aof"))
1470}
1471
1472#[cfg(test)]
1473mod tests {
1474 use super::*;
1475
1476 type Result = std::result::Result<(), Box<dyn std::error::Error>>;
1477
1478 fn temp_dir() -> tempfile::TempDir {
1479 tempfile::tempdir().expect("create temp dir")
1480 }
1481
1482 #[test]
1483 fn record_round_trip_set() -> Result {
1484 let rec = AofRecord::Set {
1485 key: "hello".into(),
1486 value: Bytes::from("world"),
1487 expire_ms: 5000,
1488 };
1489 let bytes = rec.to_bytes()?;
1490 let decoded = AofRecord::from_bytes(&bytes)?;
1491 assert_eq!(rec, decoded);
1492 Ok(())
1493 }
1494
1495 #[test]
1496 fn record_round_trip_del() -> Result {
1497 let rec = AofRecord::Del { key: "gone".into() };
1498 let bytes = rec.to_bytes()?;
1499 let decoded = AofRecord::from_bytes(&bytes)?;
1500 assert_eq!(rec, decoded);
1501 Ok(())
1502 }
1503
1504 #[test]
1505 fn record_round_trip_expire() -> Result {
1506 let rec = AofRecord::Expire {
1507 key: "ttl".into(),
1508 seconds: 300,
1509 };
1510 let bytes = rec.to_bytes()?;
1511 let decoded = AofRecord::from_bytes(&bytes)?;
1512 assert_eq!(rec, decoded);
1513 Ok(())
1514 }
1515
1516 #[test]
1517 fn set_with_no_expiry() -> Result {
1518 let rec = AofRecord::Set {
1519 key: "k".into(),
1520 value: Bytes::from("v"),
1521 expire_ms: -1,
1522 };
1523 let bytes = rec.to_bytes()?;
1524 let decoded = AofRecord::from_bytes(&bytes)?;
1525 assert_eq!(rec, decoded);
1526 Ok(())
1527 }
1528
1529 #[test]
1530 fn writer_reader_round_trip() -> Result {
1531 let dir = temp_dir();
1532 let path = dir.path().join("test.aof");
1533
1534 let records = vec![
1535 AofRecord::Set {
1536 key: "a".into(),
1537 value: Bytes::from("1"),
1538 expire_ms: -1,
1539 },
1540 AofRecord::Set {
1541 key: "b".into(),
1542 value: Bytes::from("2"),
1543 expire_ms: 10_000,
1544 },
1545 AofRecord::Del { key: "a".into() },
1546 AofRecord::Expire {
1547 key: "b".into(),
1548 seconds: 60,
1549 },
1550 ];
1551
1552 {
1554 let mut writer = AofWriter::open(&path)?;
1555 for rec in &records {
1556 writer.write_record(rec)?;
1557 }
1558 writer.sync()?;
1559 }
1560
1561 let mut reader = AofReader::open(&path)?;
1563 let mut got = Vec::new();
1564 while let Some(rec) = reader.read_record()? {
1565 got.push(rec);
1566 }
1567 assert_eq!(records, got);
1568 Ok(())
1569 }
1570
1571 #[test]
1572 fn empty_aof_returns_no_records() -> Result {
1573 let dir = temp_dir();
1574 let path = dir.path().join("empty.aof");
1575
1576 {
1578 let _writer = AofWriter::open(&path)?;
1579 }
1580
1581 let mut reader = AofReader::open(&path)?;
1582 assert!(reader.read_record()?.is_none());
1583 Ok(())
1584 }
1585
1586 #[test]
1587 fn truncated_record_treated_as_eof() -> Result {
1588 let dir = temp_dir();
1589 let path = dir.path().join("trunc.aof");
1590
1591 {
1593 let mut writer = AofWriter::open(&path)?;
1594 writer.write_record(&AofRecord::Set {
1595 key: "ok".into(),
1596 value: Bytes::from("good"),
1597 expire_ms: -1,
1598 })?;
1599 writer.flush()?;
1600 }
1601
1602 {
1604 let mut file = OpenOptions::new().append(true).open(&path)?;
1605 file.write_all(&[TAG_SET])?;
1606 }
1607
1608 let mut reader = AofReader::open(&path)?;
1609 let rec = reader.read_record()?.unwrap();
1611 assert!(matches!(rec, AofRecord::Set { .. }));
1612 assert!(reader.read_record()?.is_none());
1614 Ok(())
1615 }
1616
1617 #[test]
1618 fn corrupt_crc_detected() -> Result {
1619 let dir = temp_dir();
1620 let path = dir.path().join("corrupt.aof");
1621
1622 {
1623 let mut writer = AofWriter::open(&path)?;
1624 writer.write_record(&AofRecord::Set {
1625 key: "k".into(),
1626 value: Bytes::from("v"),
1627 expire_ms: -1,
1628 })?;
1629 writer.flush()?;
1630 }
1631
1632 let mut data = fs::read(&path)?;
1634 let last = data.len() - 1;
1635 data[last] ^= 0xFF;
1636 fs::write(&path, &data)?;
1637
1638 let mut reader = AofReader::open(&path)?;
1639 let err = reader.read_record().unwrap_err();
1640 assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
1641 Ok(())
1642 }
1643
1644 #[test]
1645 fn missing_magic_is_error() {
1646 let dir = temp_dir();
1647 let path = dir.path().join("bad.aof");
1648 fs::write(&path, b"NOT_AOF_DATA").unwrap();
1649
1650 let err = AofReader::open(&path).unwrap_err();
1651 assert!(matches!(err, FormatError::InvalidMagic));
1652 }
1653
1654 #[test]
1655 fn truncate_resets_aof() -> Result {
1656 let dir = temp_dir();
1657 let path = dir.path().join("reset.aof");
1658
1659 {
1660 let mut writer = AofWriter::open(&path)?;
1661 writer.write_record(&AofRecord::Set {
1662 key: "old".into(),
1663 value: Bytes::from("data"),
1664 expire_ms: -1,
1665 })?;
1666 writer.truncate()?;
1667
1668 writer.write_record(&AofRecord::Set {
1670 key: "new".into(),
1671 value: Bytes::from("fresh"),
1672 expire_ms: -1,
1673 })?;
1674 writer.sync()?;
1675 }
1676
1677 let mut reader = AofReader::open(&path)?;
1678 let rec = reader.read_record()?.unwrap();
1679 match rec {
1680 AofRecord::Set { key, .. } => assert_eq!(key, "new"),
1681 other => panic!("expected Set, got {other:?}"),
1682 }
1683 assert!(reader.read_record()?.is_none());
1685 Ok(())
1686 }
1687
1688 #[test]
1689 fn record_round_trip_lpush() -> Result {
1690 let rec = AofRecord::LPush {
1691 key: "list".into(),
1692 values: vec![Bytes::from("a"), Bytes::from("b")],
1693 };
1694 let bytes = rec.to_bytes()?;
1695 let decoded = AofRecord::from_bytes(&bytes)?;
1696 assert_eq!(rec, decoded);
1697 Ok(())
1698 }
1699
1700 #[test]
1701 fn record_round_trip_rpush() -> Result {
1702 let rec = AofRecord::RPush {
1703 key: "list".into(),
1704 values: vec![Bytes::from("x")],
1705 };
1706 let bytes = rec.to_bytes()?;
1707 let decoded = AofRecord::from_bytes(&bytes)?;
1708 assert_eq!(rec, decoded);
1709 Ok(())
1710 }
1711
1712 #[test]
1713 fn record_round_trip_lpop() -> Result {
1714 let rec = AofRecord::LPop { key: "list".into() };
1715 let bytes = rec.to_bytes()?;
1716 let decoded = AofRecord::from_bytes(&bytes)?;
1717 assert_eq!(rec, decoded);
1718 Ok(())
1719 }
1720
1721 #[test]
1722 fn record_round_trip_rpop() -> Result {
1723 let rec = AofRecord::RPop { key: "list".into() };
1724 let bytes = rec.to_bytes()?;
1725 let decoded = AofRecord::from_bytes(&bytes)?;
1726 assert_eq!(rec, decoded);
1727 Ok(())
1728 }
1729
1730 #[test]
1731 fn writer_reader_round_trip_with_list_records() -> Result {
1732 let dir = temp_dir();
1733 let path = dir.path().join("list.aof");
1734
1735 let records = vec![
1736 AofRecord::LPush {
1737 key: "l".into(),
1738 values: vec![Bytes::from("a"), Bytes::from("b")],
1739 },
1740 AofRecord::RPush {
1741 key: "l".into(),
1742 values: vec![Bytes::from("c")],
1743 },
1744 AofRecord::LPop { key: "l".into() },
1745 AofRecord::RPop { key: "l".into() },
1746 ];
1747
1748 {
1749 let mut writer = AofWriter::open(&path)?;
1750 for rec in &records {
1751 writer.write_record(rec)?;
1752 }
1753 writer.sync()?;
1754 }
1755
1756 let mut reader = AofReader::open(&path)?;
1757 let mut got = Vec::new();
1758 while let Some(rec) = reader.read_record()? {
1759 got.push(rec);
1760 }
1761 assert_eq!(records, got);
1762 Ok(())
1763 }
1764
1765 #[test]
1766 fn record_round_trip_zadd() -> Result {
1767 let rec = AofRecord::ZAdd {
1768 key: "board".into(),
1769 members: vec![(100.0, "alice".into()), (200.5, "bob".into())],
1770 };
1771 let bytes = rec.to_bytes()?;
1772 let decoded = AofRecord::from_bytes(&bytes)?;
1773 assert_eq!(rec, decoded);
1774 Ok(())
1775 }
1776
1777 #[test]
1778 fn record_round_trip_zrem() -> Result {
1779 let rec = AofRecord::ZRem {
1780 key: "board".into(),
1781 members: vec!["alice".into(), "bob".into()],
1782 };
1783 let bytes = rec.to_bytes()?;
1784 let decoded = AofRecord::from_bytes(&bytes)?;
1785 assert_eq!(rec, decoded);
1786 Ok(())
1787 }
1788
1789 #[test]
1790 fn writer_reader_round_trip_with_sorted_set_records() -> Result {
1791 let dir = temp_dir();
1792 let path = dir.path().join("zset.aof");
1793
1794 let records = vec![
1795 AofRecord::ZAdd {
1796 key: "board".into(),
1797 members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
1798 },
1799 AofRecord::ZRem {
1800 key: "board".into(),
1801 members: vec!["alice".into()],
1802 },
1803 ];
1804
1805 {
1806 let mut writer = AofWriter::open(&path)?;
1807 for rec in &records {
1808 writer.write_record(rec)?;
1809 }
1810 writer.sync()?;
1811 }
1812
1813 let mut reader = AofReader::open(&path)?;
1814 let mut got = Vec::new();
1815 while let Some(rec) = reader.read_record()? {
1816 got.push(rec);
1817 }
1818 assert_eq!(records, got);
1819 Ok(())
1820 }
1821
1822 #[test]
1823 fn record_round_trip_persist() -> Result {
1824 let rec = AofRecord::Persist {
1825 key: "mykey".into(),
1826 };
1827 let bytes = rec.to_bytes()?;
1828 let decoded = AofRecord::from_bytes(&bytes)?;
1829 assert_eq!(rec, decoded);
1830 Ok(())
1831 }
1832
1833 #[test]
1834 fn record_round_trip_pexpire() -> Result {
1835 let rec = AofRecord::Pexpire {
1836 key: "mykey".into(),
1837 milliseconds: 5000,
1838 };
1839 let bytes = rec.to_bytes()?;
1840 let decoded = AofRecord::from_bytes(&bytes)?;
1841 assert_eq!(rec, decoded);
1842 Ok(())
1843 }
1844
1845 #[test]
1846 fn record_round_trip_incr() -> Result {
1847 let rec = AofRecord::Incr {
1848 key: "counter".into(),
1849 };
1850 let bytes = rec.to_bytes()?;
1851 let decoded = AofRecord::from_bytes(&bytes)?;
1852 assert_eq!(rec, decoded);
1853 Ok(())
1854 }
1855
1856 #[test]
1857 fn record_round_trip_decr() -> Result {
1858 let rec = AofRecord::Decr {
1859 key: "counter".into(),
1860 };
1861 let bytes = rec.to_bytes()?;
1862 let decoded = AofRecord::from_bytes(&bytes)?;
1863 assert_eq!(rec, decoded);
1864 Ok(())
1865 }
1866
1867 #[test]
1868 fn writer_reader_round_trip_with_persist_pexpire() -> Result {
1869 let dir = temp_dir();
1870 let path = dir.path().join("persist_pexpire.aof");
1871
1872 let records = vec![
1873 AofRecord::Set {
1874 key: "k".into(),
1875 value: Bytes::from("v"),
1876 expire_ms: 5000,
1877 },
1878 AofRecord::Persist { key: "k".into() },
1879 AofRecord::Pexpire {
1880 key: "k".into(),
1881 milliseconds: 3000,
1882 },
1883 ];
1884
1885 {
1886 let mut writer = AofWriter::open(&path)?;
1887 for rec in &records {
1888 writer.write_record(rec)?;
1889 }
1890 writer.sync()?;
1891 }
1892
1893 let mut reader = AofReader::open(&path)?;
1894 let mut got = Vec::new();
1895 while let Some(rec) = reader.read_record()? {
1896 got.push(rec);
1897 }
1898 assert_eq!(records, got);
1899 Ok(())
1900 }
1901
1902 #[test]
1903 fn aof_path_format() {
1904 let p = aof_path(Path::new("/data"), 3);
1905 assert_eq!(p, PathBuf::from("/data/shard-3.aof"));
1906 }
1907
1908 #[test]
1909 fn record_round_trip_hset() -> Result {
1910 let rec = AofRecord::HSet {
1911 key: "hash".into(),
1912 fields: vec![
1913 ("f1".into(), Bytes::from("v1")),
1914 ("f2".into(), Bytes::from("v2")),
1915 ],
1916 };
1917 let bytes = rec.to_bytes()?;
1918 let decoded = AofRecord::from_bytes(&bytes)?;
1919 assert_eq!(rec, decoded);
1920 Ok(())
1921 }
1922
1923 #[test]
1924 fn record_round_trip_hdel() -> Result {
1925 let rec = AofRecord::HDel {
1926 key: "hash".into(),
1927 fields: vec!["f1".into(), "f2".into()],
1928 };
1929 let bytes = rec.to_bytes()?;
1930 let decoded = AofRecord::from_bytes(&bytes)?;
1931 assert_eq!(rec, decoded);
1932 Ok(())
1933 }
1934
1935 #[test]
1936 fn record_round_trip_hincrby() -> Result {
1937 let rec = AofRecord::HIncrBy {
1938 key: "hash".into(),
1939 field: "counter".into(),
1940 delta: -42,
1941 };
1942 let bytes = rec.to_bytes()?;
1943 let decoded = AofRecord::from_bytes(&bytes)?;
1944 assert_eq!(rec, decoded);
1945 Ok(())
1946 }
1947
1948 #[test]
1949 fn record_round_trip_sadd() -> Result {
1950 let rec = AofRecord::SAdd {
1951 key: "set".into(),
1952 members: vec!["m1".into(), "m2".into(), "m3".into()],
1953 };
1954 let bytes = rec.to_bytes()?;
1955 let decoded = AofRecord::from_bytes(&bytes)?;
1956 assert_eq!(rec, decoded);
1957 Ok(())
1958 }
1959
1960 #[test]
1961 fn record_round_trip_srem() -> Result {
1962 let rec = AofRecord::SRem {
1963 key: "set".into(),
1964 members: vec!["m1".into()],
1965 };
1966 let bytes = rec.to_bytes()?;
1967 let decoded = AofRecord::from_bytes(&bytes)?;
1968 assert_eq!(rec, decoded);
1969 Ok(())
1970 }
1971
1972 #[cfg(feature = "vector")]
1973 #[test]
1974 fn record_round_trip_vadd() -> Result {
1975 let rec = AofRecord::VAdd {
1976 key: "embeddings".into(),
1977 element: "doc1".into(),
1978 vector: vec![0.1, 0.2, 0.3],
1979 metric: 0, quantization: 0, connectivity: 16,
1982 expansion_add: 64,
1983 };
1984 let bytes = rec.to_bytes()?;
1985 let decoded = AofRecord::from_bytes(&bytes)?;
1986 assert_eq!(rec, decoded);
1987 Ok(())
1988 }
1989
1990 #[cfg(feature = "vector")]
1991 #[test]
1992 fn record_round_trip_vadd_high_dim() -> Result {
1993 let rec = AofRecord::VAdd {
1994 key: "vecs".into(),
1995 element: "e".into(),
1996 vector: vec![0.0; 1536], metric: 1, quantization: 1, connectivity: 32,
2000 expansion_add: 128,
2001 };
2002 let bytes = rec.to_bytes()?;
2003 let decoded = AofRecord::from_bytes(&bytes)?;
2004 assert_eq!(rec, decoded);
2005 Ok(())
2006 }
2007
2008 #[cfg(feature = "vector")]
2009 #[test]
2010 fn record_round_trip_vrem() -> Result {
2011 let rec = AofRecord::VRem {
2012 key: "embeddings".into(),
2013 element: "doc1".into(),
2014 };
2015 let bytes = rec.to_bytes()?;
2016 let decoded = AofRecord::from_bytes(&bytes)?;
2017 assert_eq!(rec, decoded);
2018 Ok(())
2019 }
2020
2021 #[cfg(feature = "encryption")]
2022 mod encrypted {
2023 use super::*;
2024 use crate::encryption::EncryptionKey;
2025
2026 type Result = std::result::Result<(), Box<dyn std::error::Error>>;
2027
2028 fn test_key() -> EncryptionKey {
2029 EncryptionKey::from_bytes([0x42; 32])
2030 }
2031
2032 #[test]
2033 fn encrypted_writer_reader_round_trip() -> Result {
2034 let dir = temp_dir();
2035 let path = dir.path().join("enc.aof");
2036 let key = test_key();
2037
2038 let records = vec![
2039 AofRecord::Set {
2040 key: "a".into(),
2041 value: Bytes::from("1"),
2042 expire_ms: -1,
2043 },
2044 AofRecord::Del { key: "a".into() },
2045 AofRecord::LPush {
2046 key: "list".into(),
2047 values: vec![Bytes::from("x"), Bytes::from("y")],
2048 },
2049 AofRecord::ZAdd {
2050 key: "zs".into(),
2051 members: vec![(1.0, "m".into())],
2052 },
2053 ];
2054
2055 {
2056 let mut writer = AofWriter::open_encrypted(&path, key.clone())?;
2057 for rec in &records {
2058 writer.write_record(rec)?;
2059 }
2060 writer.sync()?;
2061 }
2062
2063 let mut reader = AofReader::open_encrypted(&path, key)?;
2064 let mut got = Vec::new();
2065 while let Some(rec) = reader.read_record()? {
2066 got.push(rec);
2067 }
2068 assert_eq!(records, got);
2069 Ok(())
2070 }
2071
2072 #[test]
2073 fn encrypted_aof_wrong_key_fails() -> Result {
2074 let dir = temp_dir();
2075 let path = dir.path().join("enc_bad.aof");
2076 let key = test_key();
2077 let wrong_key = EncryptionKey::from_bytes([0xFF; 32]);
2078
2079 {
2080 let mut writer = AofWriter::open_encrypted(&path, key)?;
2081 writer.write_record(&AofRecord::Set {
2082 key: "k".into(),
2083 value: Bytes::from("v"),
2084 expire_ms: -1,
2085 })?;
2086 writer.sync()?;
2087 }
2088
2089 let mut reader = AofReader::open_encrypted(&path, wrong_key)?;
2090 let err = reader.read_record().unwrap_err();
2091 assert!(matches!(err, FormatError::DecryptionFailed));
2092 Ok(())
2093 }
2094
2095 #[test]
2096 fn v2_file_readable_with_encryption_key() -> Result {
2097 let dir = temp_dir();
2098 let path = dir.path().join("v2.aof");
2099 let key = test_key();
2100
2101 {
2103 let mut writer = AofWriter::open(&path)?;
2104 writer.write_record(&AofRecord::Set {
2105 key: "k".into(),
2106 value: Bytes::from("v"),
2107 expire_ms: -1,
2108 })?;
2109 writer.sync()?;
2110 }
2111
2112 let mut reader = AofReader::open_encrypted(&path, key)?;
2114 let rec = reader.read_record()?.unwrap();
2115 assert!(matches!(rec, AofRecord::Set { .. }));
2116 Ok(())
2117 }
2118
2119 #[test]
2120 fn v3_file_without_key_returns_error() -> Result {
2121 let dir = temp_dir();
2122 let path = dir.path().join("v3_nokey.aof");
2123 let key = test_key();
2124
2125 {
2127 let mut writer = AofWriter::open_encrypted(&path, key)?;
2128 writer.write_record(&AofRecord::Set {
2129 key: "k".into(),
2130 value: Bytes::from("v"),
2131 expire_ms: -1,
2132 })?;
2133 writer.sync()?;
2134 }
2135
2136 let err = AofReader::open(&path).unwrap_err();
2138 assert!(matches!(err, FormatError::EncryptionRequired));
2139 Ok(())
2140 }
2141
2142 #[test]
2143 fn encrypted_truncate_preserves_encryption() -> Result {
2144 let dir = temp_dir();
2145 let path = dir.path().join("enc_trunc.aof");
2146 let key = test_key();
2147
2148 {
2149 let mut writer = AofWriter::open_encrypted(&path, key.clone())?;
2150 writer.write_record(&AofRecord::Set {
2151 key: "old".into(),
2152 value: Bytes::from("data"),
2153 expire_ms: -1,
2154 })?;
2155 writer.truncate()?;
2156
2157 writer.write_record(&AofRecord::Set {
2158 key: "new".into(),
2159 value: Bytes::from("fresh"),
2160 expire_ms: -1,
2161 })?;
2162 writer.sync()?;
2163 }
2164
2165 let mut reader = AofReader::open_encrypted(&path, key)?;
2166 let rec = reader.read_record()?.unwrap();
2167 match rec {
2168 AofRecord::Set { key, .. } => assert_eq!(key, "new"),
2169 other => panic!("expected Set, got {other:?}"),
2170 }
2171 assert!(reader.read_record()?.is_none());
2172 Ok(())
2173 }
2174 }
2175}