1use std::fmt;
20use std::fs::{self, File, OpenOptions};
21#[cfg(feature = "encryption")]
22use std::io::Read as _;
23use std::io::{self, BufReader, BufWriter, Write};
24use std::path::{Path, PathBuf};
25
26use bytes::Bytes;
27
28use crate::format::{self, FormatError};
29
30fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
32 let bytes = format::read_bytes(r)?;
33 String::from_utf8(bytes).map_err(|_| {
34 FormatError::Io(io::Error::new(
35 io::ErrorKind::InvalidData,
36 format!("{field} is not valid utf-8"),
37 ))
38 })
39}
40
41fn read_string_list(r: &mut impl io::Read, field: &str) -> Result<Vec<String>, FormatError> {
44 let count = format::read_u32(r)?;
45 format::validate_collection_count(count, field)?;
46 let mut items = Vec::with_capacity(format::capped_capacity(count));
47 for _ in 0..count {
48 items.push(read_string(r, field)?);
49 }
50 Ok(items)
51}
52
53fn read_bytes_list(r: &mut impl io::Read, label: &str) -> Result<Vec<Bytes>, FormatError> {
56 let count = format::read_u32(r)?;
57 format::validate_collection_count(count, label)?;
58 let mut items = Vec::with_capacity(format::capped_capacity(count));
59 for _ in 0..count {
60 items.push(Bytes::from(format::read_bytes(r)?));
61 }
62 Ok(items)
63}
64
// Record tag bytes.
//
// `AofRecord::to_bytes` writes the tag as the first byte of every record and
// `AofRecord::from_bytes` dispatches on it, so changing a value would make
// previously written records decode as a different (or unknown) type.
// The values are unique but not contiguous in declaration order; the grouping
// below follows the constant names.

// String-value records.
const TAG_SET: u8 = 1;
const TAG_INCR: u8 = 12;
const TAG_DECR: u8 = 13;
const TAG_INCRBY: u8 = 19;
const TAG_DECRBY: u8 = 20;
const TAG_APPEND: u8 = 21;

// List push/pop records.
const TAG_LPUSH: u8 = 4;
const TAG_RPUSH: u8 = 5;
const TAG_LPOP: u8 = 6;
const TAG_RPOP: u8 = 7;

// Sorted-set records.
const TAG_ZADD: u8 = 8;
const TAG_ZREM: u8 = 9;

// Hash records.
const TAG_HSET: u8 = 14;
const TAG_HDEL: u8 = 15;
const TAG_HINCRBY: u8 = 16;

// Set records.
const TAG_SADD: u8 = 17;
const TAG_SREM: u8 = 18;

// Key lifecycle records, plus the later-numbered list and range records.
const TAG_DEL: u8 = 2;
const TAG_EXPIRE: u8 = 3;
const TAG_PERSIST: u8 = 10;
const TAG_PEXPIRE: u8 = 11;
const TAG_RENAME: u8 = 22;
const TAG_COPY: u8 = 27;
const TAG_LSET: u8 = 28;
const TAG_LTRIM: u8 = 29;
const TAG_LINSERT: u8 = 30;
const TAG_LREM: u8 = 31;
const TAG_SETRANGE: u8 = 32;

// Vector records; only compiled with the `vector` feature.
#[cfg(feature = "vector")]
const TAG_VADD: u8 = 25;
#[cfg(feature = "vector")]
const TAG_VREM: u8 = 26;

// Protobuf records; only compiled with the `protobuf` feature.
#[cfg(feature = "protobuf")]
const TAG_PROTO_SET: u8 = 23;
#[cfg(feature = "protobuf")]
const TAG_PROTO_REGISTER: u8 = 24;
119
/// One mutation stored in the append-only file.
///
/// Each variant has its own tag byte (see `AofRecord::tag`) and is converted
/// to and from its binary form by `to_bytes` / `from_bytes`. Variant names
/// follow the commands they appear to log; how a record is applied on replay
/// is not visible in this file.
#[derive(Debug, Clone, PartialEq)]
pub enum AofRecord {
    /// Key, value, and an `expire_ms` field, encoded in that order.
    /// NOTE(review): the meaning of `expire_ms` (including any "no expiry"
    /// sentinel) is not visible here; confirm against the replay code.
    Set {
        key: String,
        value: Bytes,
        expire_ms: i64,
    },
    /// Carries only the key.
    Del { key: String },
    /// `seconds` is stored on disk as an `i64`: values above `i64::MAX` are
    /// clamped when encoding and negative stored values are rejected when
    /// decoding.
    Expire { key: String, seconds: u64 },
    /// Key followed by a counted list of raw values.
    LPush { key: String, values: Vec<Bytes> },
    /// Key followed by a counted list of raw values.
    RPush { key: String, values: Vec<Bytes> },
    /// Carries only the key.
    LPop { key: String },
    /// Carries only the key.
    RPop { key: String },
    /// Key, signed `index`, then the raw value.
    LSet {
        key: String,
        index: i64,
        value: Bytes,
    },
    /// Key followed by the signed `start` and `stop` bounds.
    LTrim { key: String, start: i64, stop: i64 },
    /// `before` is stored as a single byte (1 for true, 0 for false); any
    /// non-zero byte decodes as `true`.
    LInsert {
        key: String,
        before: bool,
        pivot: Bytes,
        value: Bytes,
    },
    /// Key, signed `count`, then the raw value.
    LRem {
        key: String,
        count: i64,
        value: Bytes,
    },
    /// Key followed by a counted list of `(score, member)` pairs; each pair is
    /// stored score first.
    ZAdd {
        key: String,
        members: Vec<(f64, String)>,
    },
    /// Key followed by a counted list of member names.
    ZRem { key: String, members: Vec<String> },
    /// Carries only the key.
    Persist { key: String },
    /// `milliseconds` is stored on disk as an `i64`, with the same clamping
    /// and negative-value rejection as `Expire`.
    Pexpire { key: String, milliseconds: u64 },
    /// Carries only the key.
    Incr { key: String },
    /// Carries only the key.
    Decr { key: String },
    /// Key followed by a counted list of `(field, value)` pairs.
    HSet {
        key: String,
        fields: Vec<(String, Bytes)>,
    },
    /// Key followed by a counted list of field names.
    HDel { key: String, fields: Vec<String> },
    /// Key, field name, then the signed `delta`.
    HIncrBy {
        key: String,
        field: String,
        delta: i64,
    },
    /// Key followed by a counted list of member names.
    SAdd { key: String, members: Vec<String> },
    /// Key followed by a counted list of member names.
    SRem { key: String, members: Vec<String> },
    /// Key followed by the signed `delta`.
    IncrBy { key: String, delta: i64 },
    /// Key followed by the signed `delta`.
    DecrBy { key: String, delta: i64 },
    /// Key followed by the raw value.
    Append { key: String, value: Bytes },
    /// `offset` is stored on disk as an `i64`.
    SetRange {
        key: String,
        offset: usize,
        value: Bytes,
    },
    /// Two key names, `key` first and `newkey` second.
    Rename { key: String, newkey: String },
    /// `replace` is stored as a single byte; any non-zero byte decodes as
    /// `true`.
    Copy {
        source: String,
        destination: String,
        replace: bool,
    },
    /// Only available with the `vector` feature.
    ///
    /// `vector` is stored as a `u32` dimension count followed by that many
    /// `f32` components. `metric` and `quantization` are raw one-byte codes
    /// whose meaning is defined outside this file.
    #[cfg(feature = "vector")]
    VAdd {
        key: String,
        element: String,
        vector: Vec<f32>,
        metric: u8,
        quantization: u8,
        connectivity: u32,
        expansion_add: u32,
    },
    /// Only available with the `vector` feature. Key followed by the element
    /// name.
    #[cfg(feature = "vector")]
    VRem { key: String, element: String },
    /// Only available with the `protobuf` feature. Key, type name, raw
    /// `data`, then the `expire_ms` field.
    #[cfg(feature = "protobuf")]
    ProtoSet {
        key: String,
        type_name: String,
        data: Bytes,
        expire_ms: i64,
    },
    /// Only available with the `protobuf` feature. Name followed by the raw
    /// descriptor bytes.
    #[cfg(feature = "protobuf")]
    ProtoRegister { name: String, descriptor: Bytes },
}
243
244impl AofRecord {
245 fn tag(&self) -> u8 {
255 match self {
256 AofRecord::Set { .. } => TAG_SET,
257 AofRecord::Del { .. } => TAG_DEL,
258 AofRecord::Expire { .. } => TAG_EXPIRE,
259 AofRecord::LPush { .. } => TAG_LPUSH,
260 AofRecord::RPush { .. } => TAG_RPUSH,
261 AofRecord::LPop { .. } => TAG_LPOP,
262 AofRecord::RPop { .. } => TAG_RPOP,
263 AofRecord::LSet { .. } => TAG_LSET,
264 AofRecord::LTrim { .. } => TAG_LTRIM,
265 AofRecord::LInsert { .. } => TAG_LINSERT,
266 AofRecord::LRem { .. } => TAG_LREM,
267 AofRecord::ZAdd { .. } => TAG_ZADD,
268 AofRecord::ZRem { .. } => TAG_ZREM,
269 AofRecord::Persist { .. } => TAG_PERSIST,
270 AofRecord::Pexpire { .. } => TAG_PEXPIRE,
271 AofRecord::Incr { .. } => TAG_INCR,
272 AofRecord::Decr { .. } => TAG_DECR,
273 AofRecord::HSet { .. } => TAG_HSET,
274 AofRecord::HDel { .. } => TAG_HDEL,
275 AofRecord::HIncrBy { .. } => TAG_HINCRBY,
276 AofRecord::SAdd { .. } => TAG_SADD,
277 AofRecord::SRem { .. } => TAG_SREM,
278 AofRecord::IncrBy { .. } => TAG_INCRBY,
279 AofRecord::DecrBy { .. } => TAG_DECRBY,
280 AofRecord::Append { .. } => TAG_APPEND,
281 AofRecord::SetRange { .. } => TAG_SETRANGE,
282 AofRecord::Rename { .. } => TAG_RENAME,
283 AofRecord::Copy { .. } => TAG_COPY,
284 #[cfg(feature = "vector")]
285 AofRecord::VAdd { .. } => TAG_VADD,
286 #[cfg(feature = "vector")]
287 AofRecord::VRem { .. } => TAG_VREM,
288 #[cfg(feature = "protobuf")]
289 AofRecord::ProtoSet { .. } => TAG_PROTO_SET,
290 #[cfg(feature = "protobuf")]
291 AofRecord::ProtoRegister { .. } => TAG_PROTO_REGISTER,
292 }
293 }
294
295 fn estimated_size(&self) -> usize {
301 const LEN_PREFIX: usize = 4;
303
304 match self {
305 AofRecord::Set {
307 key,
308 value,
309 expire_ms: _,
310 } => 1 + LEN_PREFIX + key.len() + LEN_PREFIX + value.len() + 8,
311 AofRecord::Del { key }
313 | AofRecord::LPop { key }
314 | AofRecord::RPop { key }
315 | AofRecord::Persist { key }
316 | AofRecord::Incr { key }
317 | AofRecord::Decr { key } => 1 + LEN_PREFIX + key.len(),
318 AofRecord::Expire { key, .. } | AofRecord::Pexpire { key, .. } => {
320 1 + LEN_PREFIX + key.len() + 8
321 }
322 AofRecord::LPush { key, values } | AofRecord::RPush { key, values } => {
324 let values_size: usize = values.iter().map(|v| LEN_PREFIX + v.len()).sum();
325 1 + LEN_PREFIX + key.len() + 4 + values_size
326 }
327 AofRecord::LSet { key, value, .. } => {
329 1 + LEN_PREFIX + key.len() + 8 + LEN_PREFIX + value.len()
330 }
331 AofRecord::LTrim { key, .. } => 1 + LEN_PREFIX + key.len() + 8 + 8,
333 AofRecord::LInsert {
335 key, pivot, value, ..
336 } => {
337 1 + LEN_PREFIX + key.len() + 1 + LEN_PREFIX + pivot.len() + LEN_PREFIX + value.len()
338 }
339 AofRecord::LRem { key, value, .. } => {
341 1 + LEN_PREFIX + key.len() + 8 + LEN_PREFIX + value.len()
342 }
343 AofRecord::ZAdd { key, members } => {
344 let members_size: usize =
345 members.iter().map(|(_, m)| 8 + LEN_PREFIX + m.len()).sum();
346 1 + LEN_PREFIX + key.len() + 4 + members_size
347 }
348 AofRecord::ZRem { key, members }
349 | AofRecord::SAdd { key, members }
350 | AofRecord::SRem { key, members } => {
351 let members_size: usize = members.iter().map(|m| LEN_PREFIX + m.len()).sum();
352 1 + LEN_PREFIX + key.len() + 4 + members_size
353 }
354 AofRecord::HSet { key, fields } => {
355 let fields_size: usize = fields
356 .iter()
357 .map(|(f, v)| LEN_PREFIX + f.len() + LEN_PREFIX + v.len())
358 .sum();
359 1 + LEN_PREFIX + key.len() + 4 + fields_size
360 }
361 AofRecord::HDel { key, fields } => {
362 let fields_size: usize = fields.iter().map(|f| LEN_PREFIX + f.len()).sum();
363 1 + LEN_PREFIX + key.len() + 4 + fields_size
364 }
365 AofRecord::HIncrBy { key, field, .. } => {
366 1 + LEN_PREFIX + key.len() + LEN_PREFIX + field.len() + 8
367 }
368 AofRecord::IncrBy { key, .. } | AofRecord::DecrBy { key, .. } => {
369 1 + LEN_PREFIX + key.len() + 8
370 }
371 AofRecord::Append { key, value } => {
372 1 + LEN_PREFIX + key.len() + LEN_PREFIX + value.len()
373 }
374 AofRecord::SetRange { key, value, .. } => {
375 1 + LEN_PREFIX + key.len() + 8 + LEN_PREFIX + value.len()
376 }
377 AofRecord::Rename { key, newkey } => {
378 1 + LEN_PREFIX + key.len() + LEN_PREFIX + newkey.len()
379 }
380 AofRecord::Copy {
381 source,
382 destination,
383 ..
384 } => 1 + LEN_PREFIX + source.len() + LEN_PREFIX + destination.len() + 1,
385 #[cfg(feature = "vector")]
386 AofRecord::VAdd {
387 key,
388 element,
389 vector,
390 ..
391 } => {
392 1 + LEN_PREFIX + key.len() + LEN_PREFIX + element.len() + 4 + vector.len() * 4 + 10
393 }
394 #[cfg(feature = "vector")]
395 AofRecord::VRem { key, element } => {
396 1 + LEN_PREFIX + key.len() + LEN_PREFIX + element.len()
397 }
398 #[cfg(feature = "protobuf")]
399 AofRecord::ProtoSet {
400 key,
401 type_name,
402 data,
403 ..
404 } => {
405 1 + LEN_PREFIX
406 + key.len()
407 + LEN_PREFIX
408 + type_name.len()
409 + LEN_PREFIX
410 + data.len()
411 + 8
412 }
413 #[cfg(feature = "protobuf")]
414 AofRecord::ProtoRegister { name, descriptor } => {
415 1 + LEN_PREFIX + name.len() + LEN_PREFIX + descriptor.len()
416 }
417 }
418 }
419
420 pub fn to_bytes(&self) -> Result<Vec<u8>, FormatError> {
422 let mut buf = Vec::with_capacity(self.estimated_size());
423 format::write_u8(&mut buf, self.tag())?;
424
425 match self {
426 AofRecord::Del { key }
428 | AofRecord::LPop { key }
429 | AofRecord::RPop { key }
430 | AofRecord::Persist { key }
431 | AofRecord::Incr { key }
432 | AofRecord::Decr { key } => {
433 format::write_bytes(&mut buf, key.as_bytes())?;
434 }
435
436 AofRecord::Set {
438 key,
439 value,
440 expire_ms,
441 } => {
442 format::write_bytes(&mut buf, key.as_bytes())?;
443 format::write_bytes(&mut buf, value)?;
444 format::write_i64(&mut buf, *expire_ms)?;
445 }
446
447 AofRecord::Expire { key, seconds } => {
450 format::write_bytes(&mut buf, key.as_bytes())?;
451 format::write_i64(&mut buf, (*seconds).min(i64::MAX as u64) as i64)?;
452 }
453 AofRecord::Pexpire { key, milliseconds } => {
454 format::write_bytes(&mut buf, key.as_bytes())?;
455 format::write_i64(&mut buf, (*milliseconds).min(i64::MAX as u64) as i64)?;
456 }
457 AofRecord::IncrBy { key, delta } | AofRecord::DecrBy { key, delta } => {
458 format::write_bytes(&mut buf, key.as_bytes())?;
459 format::write_i64(&mut buf, *delta)?;
460 }
461
462 AofRecord::LPush { key, values } | AofRecord::RPush { key, values } => {
464 format::write_bytes(&mut buf, key.as_bytes())?;
465 format::write_len(&mut buf, values.len())?;
466 for v in values {
467 format::write_bytes(&mut buf, v)?;
468 }
469 }
470
471 AofRecord::LSet { key, index, value } => {
473 format::write_bytes(&mut buf, key.as_bytes())?;
474 format::write_i64(&mut buf, *index)?;
475 format::write_bytes(&mut buf, value)?;
476 }
477
478 AofRecord::LTrim { key, start, stop } => {
480 format::write_bytes(&mut buf, key.as_bytes())?;
481 format::write_i64(&mut buf, *start)?;
482 format::write_i64(&mut buf, *stop)?;
483 }
484
485 AofRecord::LInsert {
487 key,
488 before,
489 pivot,
490 value,
491 } => {
492 format::write_bytes(&mut buf, key.as_bytes())?;
493 format::write_u8(&mut buf, if *before { 1 } else { 0 })?;
494 format::write_bytes(&mut buf, pivot)?;
495 format::write_bytes(&mut buf, value)?;
496 }
497
498 AofRecord::LRem { key, count, value } => {
500 format::write_bytes(&mut buf, key.as_bytes())?;
501 format::write_i64(&mut buf, *count)?;
502 format::write_bytes(&mut buf, value)?;
503 }
504
505 AofRecord::ZRem { key, members }
507 | AofRecord::SAdd { key, members }
508 | AofRecord::SRem { key, members } => {
509 format::write_bytes(&mut buf, key.as_bytes())?;
510 format::write_len(&mut buf, members.len())?;
511 for member in members {
512 format::write_bytes(&mut buf, member.as_bytes())?;
513 }
514 }
515 AofRecord::HDel { key, fields } => {
516 format::write_bytes(&mut buf, key.as_bytes())?;
517 format::write_len(&mut buf, fields.len())?;
518 for field in fields {
519 format::write_bytes(&mut buf, field.as_bytes())?;
520 }
521 }
522
523 AofRecord::ZAdd { key, members } => {
525 format::write_bytes(&mut buf, key.as_bytes())?;
526 format::write_len(&mut buf, members.len())?;
527 for (score, member) in members {
528 format::write_f64(&mut buf, *score)?;
529 format::write_bytes(&mut buf, member.as_bytes())?;
530 }
531 }
532
533 AofRecord::HSet { key, fields } => {
535 format::write_bytes(&mut buf, key.as_bytes())?;
536 format::write_len(&mut buf, fields.len())?;
537 for (field, value) in fields {
538 format::write_bytes(&mut buf, field.as_bytes())?;
539 format::write_bytes(&mut buf, value)?;
540 }
541 }
542
543 AofRecord::HIncrBy { key, field, delta } => {
545 format::write_bytes(&mut buf, key.as_bytes())?;
546 format::write_bytes(&mut buf, field.as_bytes())?;
547 format::write_i64(&mut buf, *delta)?;
548 }
549
550 AofRecord::Append { key, value } => {
552 format::write_bytes(&mut buf, key.as_bytes())?;
553 format::write_bytes(&mut buf, value)?;
554 }
555
556 AofRecord::SetRange { key, offset, value } => {
558 format::write_bytes(&mut buf, key.as_bytes())?;
559 format::write_i64(&mut buf, *offset as i64)?;
560 format::write_bytes(&mut buf, value)?;
561 }
562
563 AofRecord::Rename { key, newkey } => {
565 format::write_bytes(&mut buf, key.as_bytes())?;
566 format::write_bytes(&mut buf, newkey.as_bytes())?;
567 }
568
569 AofRecord::Copy {
571 source,
572 destination,
573 replace,
574 } => {
575 format::write_bytes(&mut buf, source.as_bytes())?;
576 format::write_bytes(&mut buf, destination.as_bytes())?;
577 buf.push(u8::from(*replace));
578 }
579
580 #[cfg(feature = "vector")]
581 AofRecord::VAdd {
582 key,
583 element,
584 vector,
585 metric,
586 quantization,
587 connectivity,
588 expansion_add,
589 } => {
590 format::write_bytes(&mut buf, key.as_bytes())?;
591 format::write_bytes(&mut buf, element.as_bytes())?;
592 format::write_len(&mut buf, vector.len())?;
593 for &v in vector {
594 format::write_f32(&mut buf, v)?;
595 }
596 format::write_u8(&mut buf, *metric)?;
597 format::write_u8(&mut buf, *quantization)?;
598 format::write_u32(&mut buf, *connectivity)?;
599 format::write_u32(&mut buf, *expansion_add)?;
600 }
601 #[cfg(feature = "vector")]
602 AofRecord::VRem { key, element } => {
603 format::write_bytes(&mut buf, key.as_bytes())?;
604 format::write_bytes(&mut buf, element.as_bytes())?;
605 }
606
607 #[cfg(feature = "protobuf")]
608 AofRecord::ProtoSet {
609 key,
610 type_name,
611 data,
612 expire_ms,
613 } => {
614 format::write_bytes(&mut buf, key.as_bytes())?;
615 format::write_bytes(&mut buf, type_name.as_bytes())?;
616 format::write_bytes(&mut buf, data)?;
617 format::write_i64(&mut buf, *expire_ms)?;
618 }
619 #[cfg(feature = "protobuf")]
620 AofRecord::ProtoRegister { name, descriptor } => {
621 format::write_bytes(&mut buf, name.as_bytes())?;
622 format::write_bytes(&mut buf, descriptor)?;
623 }
624 }
625 Ok(buf)
626 }
627
628 pub fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
633 let mut cursor = io::Cursor::new(data);
634 let tag = format::read_u8(&mut cursor)?;
635 match tag {
636 TAG_SET => {
637 let key = read_string(&mut cursor, "key")?;
638 let value = format::read_bytes(&mut cursor)?;
639 let expire_ms = format::read_i64(&mut cursor)?;
640 Ok(AofRecord::Set {
641 key,
642 value: Bytes::from(value),
643 expire_ms,
644 })
645 }
646 TAG_DEL => {
647 let key = read_string(&mut cursor, "key")?;
648 Ok(AofRecord::Del { key })
649 }
650 TAG_EXPIRE => {
651 let key = read_string(&mut cursor, "key")?;
652 let raw = format::read_i64(&mut cursor)?;
653 let seconds = u64::try_from(raw).map_err(|_| {
654 FormatError::InvalidData(format!(
655 "EXPIRE seconds is negative ({raw}) in AOF record"
656 ))
657 })?;
658 Ok(AofRecord::Expire { key, seconds })
659 }
660 TAG_LPUSH | TAG_RPUSH => {
661 let key = read_string(&mut cursor, "key")?;
662 let values = read_bytes_list(&mut cursor, "list")?;
663 if tag == TAG_LPUSH {
664 Ok(AofRecord::LPush { key, values })
665 } else {
666 Ok(AofRecord::RPush { key, values })
667 }
668 }
669 TAG_LPOP => {
670 let key = read_string(&mut cursor, "key")?;
671 Ok(AofRecord::LPop { key })
672 }
673 TAG_RPOP => {
674 let key = read_string(&mut cursor, "key")?;
675 Ok(AofRecord::RPop { key })
676 }
677 TAG_LSET => {
678 let key = read_string(&mut cursor, "key")?;
679 let index = format::read_i64(&mut cursor)?;
680 let value = Bytes::from(format::read_bytes(&mut cursor)?);
681 Ok(AofRecord::LSet { key, index, value })
682 }
683 TAG_LTRIM => {
684 let key = read_string(&mut cursor, "key")?;
685 let start = format::read_i64(&mut cursor)?;
686 let stop = format::read_i64(&mut cursor)?;
687 Ok(AofRecord::LTrim { key, start, stop })
688 }
689 TAG_LINSERT => {
690 let key = read_string(&mut cursor, "key")?;
691 let before_byte = format::read_u8(&mut cursor)?;
692 let before = before_byte != 0;
693 let pivot = Bytes::from(format::read_bytes(&mut cursor)?);
694 let value = Bytes::from(format::read_bytes(&mut cursor)?);
695 Ok(AofRecord::LInsert {
696 key,
697 before,
698 pivot,
699 value,
700 })
701 }
702 TAG_LREM => {
703 let key = read_string(&mut cursor, "key")?;
704 let count = format::read_i64(&mut cursor)?;
705 let value = Bytes::from(format::read_bytes(&mut cursor)?);
706 Ok(AofRecord::LRem { key, count, value })
707 }
708 TAG_ZADD => {
709 let key = read_string(&mut cursor, "key")?;
710 let count = format::read_u32(&mut cursor)?;
711 format::validate_collection_count(count, "sorted set")?;
712 let mut members = Vec::with_capacity(format::capped_capacity(count));
713 for _ in 0..count {
714 let score = format::read_f64(&mut cursor)?;
715 let member = read_string(&mut cursor, "member")?;
716 members.push((score, member));
717 }
718 Ok(AofRecord::ZAdd { key, members })
719 }
720 TAG_ZREM => {
721 let key = read_string(&mut cursor, "key")?;
722 let members = read_string_list(&mut cursor, "member")?;
723 Ok(AofRecord::ZRem { key, members })
724 }
725 TAG_PERSIST => {
726 let key = read_string(&mut cursor, "key")?;
727 Ok(AofRecord::Persist { key })
728 }
729 TAG_PEXPIRE => {
730 let key = read_string(&mut cursor, "key")?;
731 let raw = format::read_i64(&mut cursor)?;
732 let milliseconds = u64::try_from(raw).map_err(|_| {
733 FormatError::InvalidData(format!(
734 "PEXPIRE milliseconds is negative ({raw}) in AOF record"
735 ))
736 })?;
737 Ok(AofRecord::Pexpire { key, milliseconds })
738 }
739 TAG_INCR => {
740 let key = read_string(&mut cursor, "key")?;
741 Ok(AofRecord::Incr { key })
742 }
743 TAG_DECR => {
744 let key = read_string(&mut cursor, "key")?;
745 Ok(AofRecord::Decr { key })
746 }
747 TAG_HSET => {
748 let key = read_string(&mut cursor, "key")?;
749 let count = format::read_u32(&mut cursor)?;
750 format::validate_collection_count(count, "hash")?;
751 let mut fields = Vec::with_capacity(format::capped_capacity(count));
752 for _ in 0..count {
753 let field = read_string(&mut cursor, "field")?;
754 let value = Bytes::from(format::read_bytes(&mut cursor)?);
755 fields.push((field, value));
756 }
757 Ok(AofRecord::HSet { key, fields })
758 }
759 TAG_HDEL => {
760 let key = read_string(&mut cursor, "key")?;
761 let fields = read_string_list(&mut cursor, "field")?;
762 Ok(AofRecord::HDel { key, fields })
763 }
764 TAG_HINCRBY => {
765 let key = read_string(&mut cursor, "key")?;
766 let field = read_string(&mut cursor, "field")?;
767 let delta = format::read_i64(&mut cursor)?;
768 Ok(AofRecord::HIncrBy { key, field, delta })
769 }
770 TAG_SADD => {
771 let key = read_string(&mut cursor, "key")?;
772 let members = read_string_list(&mut cursor, "member")?;
773 Ok(AofRecord::SAdd { key, members })
774 }
775 TAG_SREM => {
776 let key = read_string(&mut cursor, "key")?;
777 let members = read_string_list(&mut cursor, "member")?;
778 Ok(AofRecord::SRem { key, members })
779 }
780 TAG_INCRBY => {
781 let key = read_string(&mut cursor, "key")?;
782 let delta = format::read_i64(&mut cursor)?;
783 Ok(AofRecord::IncrBy { key, delta })
784 }
785 TAG_DECRBY => {
786 let key = read_string(&mut cursor, "key")?;
787 let delta = format::read_i64(&mut cursor)?;
788 Ok(AofRecord::DecrBy { key, delta })
789 }
790 TAG_APPEND => {
791 let key = read_string(&mut cursor, "key")?;
792 let value = Bytes::from(format::read_bytes(&mut cursor)?);
793 Ok(AofRecord::Append { key, value })
794 }
795 TAG_SETRANGE => {
796 let key = read_string(&mut cursor, "key")?;
797 let offset = format::read_i64(&mut cursor)? as usize;
798 let value = Bytes::from(format::read_bytes(&mut cursor)?);
799 Ok(AofRecord::SetRange { key, offset, value })
800 }
801 TAG_RENAME => {
802 let key = read_string(&mut cursor, "key")?;
803 let newkey = read_string(&mut cursor, "newkey")?;
804 Ok(AofRecord::Rename { key, newkey })
805 }
806 TAG_COPY => {
807 let source = read_string(&mut cursor, "source")?;
808 let destination = read_string(&mut cursor, "destination")?;
809 let replace = format::read_u8(&mut cursor)? != 0;
810 Ok(AofRecord::Copy {
811 source,
812 destination,
813 replace,
814 })
815 }
816 #[cfg(feature = "vector")]
817 TAG_VADD => {
818 let key = read_string(&mut cursor, "key")?;
819 let element = read_string(&mut cursor, "element")?;
820 let dim = format::read_u32(&mut cursor)?;
821 if dim > format::MAX_PERSISTED_VECTOR_DIMS {
822 return Err(FormatError::InvalidData(format!(
823 "AOF VADD dimension {dim} exceeds max {}",
824 format::MAX_PERSISTED_VECTOR_DIMS
825 )));
826 }
827 let mut vector = Vec::with_capacity(dim as usize);
828 for _ in 0..dim {
829 vector.push(format::read_f32(&mut cursor)?);
830 }
831 let metric = format::read_u8(&mut cursor)?;
832 let quantization = format::read_u8(&mut cursor)?;
833 let connectivity = format::read_u32(&mut cursor)?;
834 let expansion_add = format::read_u32(&mut cursor)?;
835 Ok(AofRecord::VAdd {
836 key,
837 element,
838 vector,
839 metric,
840 quantization,
841 connectivity,
842 expansion_add,
843 })
844 }
845 #[cfg(feature = "vector")]
846 TAG_VREM => {
847 let key = read_string(&mut cursor, "key")?;
848 let element = read_string(&mut cursor, "element")?;
849 Ok(AofRecord::VRem { key, element })
850 }
851 #[cfg(feature = "protobuf")]
852 TAG_PROTO_SET => {
853 let key = read_string(&mut cursor, "key")?;
854 let type_name = read_string(&mut cursor, "type_name")?;
855 let data = format::read_bytes(&mut cursor)?;
856 let expire_ms = format::read_i64(&mut cursor)?;
857 Ok(AofRecord::ProtoSet {
858 key,
859 type_name,
860 data: Bytes::from(data),
861 expire_ms,
862 })
863 }
864 #[cfg(feature = "protobuf")]
865 TAG_PROTO_REGISTER => {
866 let name = read_string(&mut cursor, "name")?;
867 let descriptor = format::read_bytes(&mut cursor)?;
868 Ok(AofRecord::ProtoRegister {
869 name,
870 descriptor: Bytes::from(descriptor),
871 })
872 }
873 _ => Err(FormatError::UnknownTag(tag)),
874 }
875 }
876}
877
/// Selects how eagerly the append-only file is synced to disk.
///
/// NOTE(review): nothing in the visible part of this file consumes this type,
/// so the per-variant notes below are inferred from the names; confirm them
/// against the code that drives `AofWriter::sync`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum FsyncPolicy {
    /// Presumably: sync after every write.
    Always,
    /// Presumably: sync about once per second. This is the `Default` variant.
    #[default]
    EverySec,
    /// Presumably: never sync explicitly.
    No,
}
889
/// Buffered writer that appends `AofRecord`s to a file.
pub struct AofWriter {
    /// Buffered handle to the file currently being written.
    writer: BufWriter<File>,
    /// Location of the file; `truncate` uses it to replace and reopen the file.
    path: PathBuf,
    /// When `Some`, `write_record` encrypts each record with this key instead
    /// of writing payload plus CRC32.
    #[cfg(feature = "encryption")]
    encryption_key: Option<crate::encryption::EncryptionKey>,
}
897
898impl AofWriter {
899 pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
902 let path = path.into();
903 let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
904
905 let file = open_persistence_file(&path)?;
906 let mut writer = BufWriter::new(file);
907
908 if !exists {
909 format::write_header(&mut writer, format::AOF_MAGIC)?;
910 writer.flush()?;
911 }
912
913 Ok(Self {
914 writer,
915 path,
916 #[cfg(feature = "encryption")]
917 encryption_key: None,
918 })
919 }
920
921 #[cfg(feature = "encryption")]
927 pub fn open_encrypted(
928 path: impl Into<PathBuf>,
929 key: crate::encryption::EncryptionKey,
930 ) -> Result<Self, FormatError> {
931 let path = path.into();
932 let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
933
934 let file = open_persistence_file(&path)?;
935 let mut writer = BufWriter::new(file);
936
937 if !exists {
938 format::write_header_versioned(
939 &mut writer,
940 format::AOF_MAGIC,
941 format::FORMAT_VERSION_ENCRYPTED,
942 )?;
943 writer.flush()?;
944 }
945
946 Ok(Self {
947 writer,
948 path,
949 encryption_key: Some(key),
950 })
951 }
952
953 pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
958 let payload = record.to_bytes()?;
959
960 #[cfg(feature = "encryption")]
961 if let Some(ref key) = self.encryption_key {
962 let (nonce, ciphertext) = crate::encryption::encrypt_record(key, &payload)?;
963 self.writer.write_all(&nonce)?;
964 format::write_len(&mut self.writer, ciphertext.len())?;
965 self.writer.write_all(&ciphertext)?;
966 return Ok(());
967 }
968
969 let checksum = format::crc32(&payload);
970 self.writer.write_all(&payload)?;
971 format::write_u32(&mut self.writer, checksum)?;
972 Ok(())
973 }
974
975 pub fn flush(&mut self) -> Result<(), FormatError> {
977 self.writer.flush()?;
978 Ok(())
979 }
980
981 pub fn sync(&mut self) -> Result<(), FormatError> {
983 self.writer.flush()?;
984 self.writer.get_ref().sync_all()?;
985 Ok(())
986 }
987
988 pub fn path(&self) -> &Path {
990 &self.path
991 }
992
    /// Replaces the file with a fresh one that contains only the header.
    ///
    /// The new file is built at a temporary path and renamed over the
    /// original, then reopened in append mode for further writes.
    ///
    /// # Errors
    /// Propagates any I/O or header-writing failure.
    ///
    /// NOTE(review): if a step after creating the temporary file fails, that
    /// file is left on disk. The parent directory is not synced after the
    /// rename.
    pub fn truncate(&mut self) -> Result<(), FormatError> {
        // Push anything still buffered into the old file before replacing it.
        self.writer.flush()?;

        // `with_extension` swaps the current extension for "aof.tmp".
        let tmp_path = self.path.with_extension("aof.tmp");
        let mut opts = OpenOptions::new();
        opts.create(true).write(true).truncate(true);
        #[cfg(unix)]
        {
            // Create the file readable and writable by the owner only.
            use std::os::unix::fs::OpenOptionsExt;
            opts.mode(0o600);
        }
        let tmp_file = opts.open(&tmp_path)?;
        let mut tmp_writer = BufWriter::new(tmp_file);

        // The header version has to match how this writer encodes records.
        #[cfg(feature = "encryption")]
        if self.encryption_key.is_some() {
            format::write_header_versioned(
                &mut tmp_writer,
                format::AOF_MAGIC,
                format::FORMAT_VERSION_ENCRYPTED,
            )?;
        } else {
            format::write_header(&mut tmp_writer, format::AOF_MAGIC)?;
        }
        #[cfg(not(feature = "encryption"))]
        format::write_header(&mut tmp_writer, format::AOF_MAGIC)?;

        // Flush and sync the replacement before it takes the original's name.
        tmp_writer.flush()?;
        tmp_writer.get_ref().sync_all()?;

        std::fs::rename(&tmp_path, &self.path)?;

        // The old handle still refers to the replaced file, so reopen by path.
        let file = OpenOptions::new().append(true).open(&self.path)?;
        self.writer = BufWriter::new(file);
        Ok(())
    }
1038}
1039
/// Sequential reader over the records in an AOF file.
pub struct AofReader {
    /// Buffered handle, positioned after the header once the reader is open.
    reader: BufReader<File>,
    /// Format version from the header; `read_record` uses it to choose
    /// between the encrypted and plaintext decoders.
    version: u8,
    /// Key used to decrypt records in files with the encrypted format version.
    #[cfg(feature = "encryption")]
    encryption_key: Option<crate::encryption::EncryptionKey>,
}
1048
1049impl fmt::Debug for AofReader {
1050 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1051 f.debug_struct("AofReader")
1052 .field("version", &self.version)
1053 .finish()
1054 }
1055}
1056
1057impl AofReader {
1058 pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
1060 let file = File::open(path.as_ref())?;
1061 let mut reader = BufReader::new(file);
1062 let version = format::read_header(&mut reader, format::AOF_MAGIC)?;
1063
1064 if version == format::FORMAT_VERSION_ENCRYPTED {
1065 return Err(FormatError::EncryptionRequired);
1066 }
1067
1068 Ok(Self {
1069 reader,
1070 version,
1071 #[cfg(feature = "encryption")]
1072 encryption_key: None,
1073 })
1074 }
1075
1076 #[cfg(feature = "encryption")]
1081 pub fn open_encrypted(
1082 path: impl AsRef<Path>,
1083 key: crate::encryption::EncryptionKey,
1084 ) -> Result<Self, FormatError> {
1085 let file = File::open(path.as_ref())?;
1086 let mut reader = BufReader::new(file);
1087 let version = format::read_header(&mut reader, format::AOF_MAGIC)?;
1088
1089 Ok(Self {
1090 reader,
1091 version,
1092 encryption_key: Some(key),
1093 })
1094 }
1095
1096 pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
1102 #[cfg(feature = "encryption")]
1103 if self.version == format::FORMAT_VERSION_ENCRYPTED {
1104 return self.read_encrypted_record();
1105 }
1106
1107 self.read_v2_record()
1108 }
1109
1110 fn read_v2_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
1112 let tag = match format::read_u8(&mut self.reader) {
1114 Ok(t) => t,
1115 Err(FormatError::UnexpectedEof) => return Ok(None),
1116 Err(e) => return Err(e),
1117 };
1118
1119 let record_result = self.read_payload_for_tag(tag);
1122 match record_result {
1123 Ok((payload, stored_crc)) => {
1124 let mut full = Vec::with_capacity(1 + payload.len());
1126 full.push(tag);
1127 full.extend_from_slice(&payload);
1128 format::verify_crc32(&full, stored_crc)?;
1129 AofRecord::from_bytes(&full).map(Some)
1130 }
1131 Err(FormatError::UnexpectedEof) => Ok(None),
1133 Err(e) => Err(e),
1134 }
1135 }
1136
1137 #[cfg(feature = "encryption")]
1139 fn read_encrypted_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
1140 let key = self
1141 .encryption_key
1142 .as_ref()
1143 .ok_or(FormatError::EncryptionRequired)?;
1144
1145 let mut nonce = [0u8; crate::encryption::NONCE_SIZE];
1147 if let Err(e) = self.reader.read_exact(&mut nonce) {
1148 return if e.kind() == io::ErrorKind::UnexpectedEof {
1149 Ok(None)
1150 } else {
1151 Err(FormatError::Io(e))
1152 };
1153 }
1154
1155 let ct_len = match format::read_u32(&mut self.reader) {
1157 Ok(n) => n as usize,
1158 Err(FormatError::UnexpectedEof) => return Ok(None),
1159 Err(e) => return Err(e),
1160 };
1161
1162 if ct_len > format::MAX_FIELD_LEN {
1163 return Err(FormatError::Io(io::Error::new(
1164 io::ErrorKind::InvalidData,
1165 format!("encrypted record length {ct_len} exceeds maximum"),
1166 )));
1167 }
1168
1169 let mut ciphertext = vec![0u8; ct_len];
1170 if let Err(e) = self.reader.read_exact(&mut ciphertext) {
1171 return if e.kind() == io::ErrorKind::UnexpectedEof {
1172 Ok(None)
1173 } else {
1174 Err(FormatError::Io(e))
1175 };
1176 }
1177
1178 let plaintext = crate::encryption::decrypt_record(key, &nonce, &ciphertext)?;
1179 AofRecord::from_bytes(&plaintext).map(Some)
1180 }
1181
1182 fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
1184 let mut payload = Vec::new();
1185 match tag {
1186 TAG_SET => {
1187 let key = format::read_bytes(&mut self.reader)?;
1189 format::write_bytes(&mut payload, &key)?;
1190 let value = format::read_bytes(&mut self.reader)?;
1191 format::write_bytes(&mut payload, &value)?;
1192 let expire_ms = format::read_i64(&mut self.reader)?;
1193 format::write_i64(&mut payload, expire_ms)?;
1194 }
1195 TAG_DEL => {
1196 let key = format::read_bytes(&mut self.reader)?;
1197 format::write_bytes(&mut payload, &key)?;
1198 }
1199 TAG_EXPIRE => {
1200 let key = format::read_bytes(&mut self.reader)?;
1201 format::write_bytes(&mut payload, &key)?;
1202 let seconds = format::read_i64(&mut self.reader)?;
1203 format::write_i64(&mut payload, seconds)?;
1204 }
1205 TAG_LPUSH | TAG_RPUSH => {
1206 let key = format::read_bytes(&mut self.reader)?;
1207 format::write_bytes(&mut payload, &key)?;
1208 let count = format::read_u32(&mut self.reader)?;
1209 format::validate_collection_count(count, "list")?;
1210 format::write_u32(&mut payload, count)?;
1211 for _ in 0..count {
1212 let val = format::read_bytes(&mut self.reader)?;
1213 format::write_bytes(&mut payload, &val)?;
1214 }
1215 }
1216 TAG_LPOP | TAG_RPOP => {
1217 let key = format::read_bytes(&mut self.reader)?;
1218 format::write_bytes(&mut payload, &key)?;
1219 }
1220 TAG_ZADD => {
1221 let key = format::read_bytes(&mut self.reader)?;
1222 format::write_bytes(&mut payload, &key)?;
1223 let count = format::read_u32(&mut self.reader)?;
1224 format::validate_collection_count(count, "sorted set")?;
1225 format::write_u32(&mut payload, count)?;
1226 for _ in 0..count {
1227 let score = format::read_f64(&mut self.reader)?;
1228 format::write_f64(&mut payload, score)?;
1229 let member = format::read_bytes(&mut self.reader)?;
1230 format::write_bytes(&mut payload, &member)?;
1231 }
1232 }
1233 TAG_ZREM => {
1234 let key = format::read_bytes(&mut self.reader)?;
1235 format::write_bytes(&mut payload, &key)?;
1236 let count = format::read_u32(&mut self.reader)?;
1237 format::validate_collection_count(count, "sorted set")?;
1238 format::write_u32(&mut payload, count)?;
1239 for _ in 0..count {
1240 let member = format::read_bytes(&mut self.reader)?;
1241 format::write_bytes(&mut payload, &member)?;
1242 }
1243 }
1244 TAG_PERSIST => {
1245 let key = format::read_bytes(&mut self.reader)?;
1246 format::write_bytes(&mut payload, &key)?;
1247 }
1248 TAG_PEXPIRE => {
1249 let key = format::read_bytes(&mut self.reader)?;
1250 format::write_bytes(&mut payload, &key)?;
1251 let millis = format::read_i64(&mut self.reader)?;
1252 format::write_i64(&mut payload, millis)?;
1253 }
1254 TAG_INCR | TAG_DECR => {
1255 let key = format::read_bytes(&mut self.reader)?;
1256 format::write_bytes(&mut payload, &key)?;
1257 }
1258 TAG_HSET => {
1259 let key = format::read_bytes(&mut self.reader)?;
1260 format::write_bytes(&mut payload, &key)?;
1261 let count = format::read_u32(&mut self.reader)?;
1262 format::validate_collection_count(count, "hash")?;
1263 format::write_u32(&mut payload, count)?;
1264 for _ in 0..count {
1265 let field = format::read_bytes(&mut self.reader)?;
1266 format::write_bytes(&mut payload, &field)?;
1267 let value = format::read_bytes(&mut self.reader)?;
1268 format::write_bytes(&mut payload, &value)?;
1269 }
1270 }
1271 TAG_HDEL | TAG_SADD | TAG_SREM => {
1272 let key = format::read_bytes(&mut self.reader)?;
1273 format::write_bytes(&mut payload, &key)?;
1274 let count = format::read_u32(&mut self.reader)?;
1275 format::validate_collection_count(count, "set")?;
1276 format::write_u32(&mut payload, count)?;
1277 for _ in 0..count {
1278 let item = format::read_bytes(&mut self.reader)?;
1279 format::write_bytes(&mut payload, &item)?;
1280 }
1281 }
1282 TAG_HINCRBY => {
1283 let key = format::read_bytes(&mut self.reader)?;
1284 format::write_bytes(&mut payload, &key)?;
1285 let field = format::read_bytes(&mut self.reader)?;
1286 format::write_bytes(&mut payload, &field)?;
1287 let delta = format::read_i64(&mut self.reader)?;
1288 format::write_i64(&mut payload, delta)?;
1289 }
1290 TAG_INCRBY | TAG_DECRBY => {
1291 let key = format::read_bytes(&mut self.reader)?;
1292 format::write_bytes(&mut payload, &key)?;
1293 let delta = format::read_i64(&mut self.reader)?;
1294 format::write_i64(&mut payload, delta)?;
1295 }
1296 TAG_APPEND => {
1297 let key = format::read_bytes(&mut self.reader)?;
1298 format::write_bytes(&mut payload, &key)?;
1299 let value = format::read_bytes(&mut self.reader)?;
1300 format::write_bytes(&mut payload, &value)?;
1301 }
1302 TAG_RENAME => {
1303 let key = format::read_bytes(&mut self.reader)?;
1304 format::write_bytes(&mut payload, &key)?;
1305 let newkey = format::read_bytes(&mut self.reader)?;
1306 format::write_bytes(&mut payload, &newkey)?;
1307 }
1308 TAG_COPY => {
1309 let source = format::read_bytes(&mut self.reader)?;
1310 format::write_bytes(&mut payload, &source)?;
1311 let dest = format::read_bytes(&mut self.reader)?;
1312 format::write_bytes(&mut payload, &dest)?;
1313 let replace = format::read_u8(&mut self.reader)?;
1314 payload.push(replace);
1315 }
1316 #[cfg(feature = "vector")]
1317 TAG_VADD => {
1318 let key = format::read_bytes(&mut self.reader)?;
1319 format::write_bytes(&mut payload, &key)?;
1320 let element = format::read_bytes(&mut self.reader)?;
1321 format::write_bytes(&mut payload, &element)?;
1322 let dim = format::read_u32(&mut self.reader)?;
1323 if dim > format::MAX_PERSISTED_VECTOR_DIMS {
1324 return Err(FormatError::InvalidData(format!(
1325 "AOF VADD dimension {dim} exceeds max {}",
1326 format::MAX_PERSISTED_VECTOR_DIMS
1327 )));
1328 }
1329 format::write_u32(&mut payload, dim)?;
1330 for _ in 0..dim {
1331 let v = format::read_f32(&mut self.reader)?;
1332 format::write_f32(&mut payload, v)?;
1333 }
1334 let metric = format::read_u8(&mut self.reader)?;
1335 format::write_u8(&mut payload, metric)?;
1336 let quantization = format::read_u8(&mut self.reader)?;
1337 format::write_u8(&mut payload, quantization)?;
1338 let connectivity = format::read_u32(&mut self.reader)?;
1339 format::write_u32(&mut payload, connectivity)?;
1340 let expansion_add = format::read_u32(&mut self.reader)?;
1341 format::write_u32(&mut payload, expansion_add)?;
1342 }
1343 #[cfg(feature = "vector")]
1344 TAG_VREM => {
1345 let key = format::read_bytes(&mut self.reader)?;
1346 format::write_bytes(&mut payload, &key)?;
1347 let element = format::read_bytes(&mut self.reader)?;
1348 format::write_bytes(&mut payload, &element)?;
1349 }
1350 #[cfg(feature = "protobuf")]
1351 TAG_PROTO_SET => {
1352 let key = format::read_bytes(&mut self.reader)?;
1353 format::write_bytes(&mut payload, &key)?;
1354 let type_name = format::read_bytes(&mut self.reader)?;
1355 format::write_bytes(&mut payload, &type_name)?;
1356 let data = format::read_bytes(&mut self.reader)?;
1357 format::write_bytes(&mut payload, &data)?;
1358 let expire_ms = format::read_i64(&mut self.reader)?;
1359 format::write_i64(&mut payload, expire_ms)?;
1360 }
1361 #[cfg(feature = "protobuf")]
1362 TAG_PROTO_REGISTER => {
1363 let name = format::read_bytes(&mut self.reader)?;
1364 format::write_bytes(&mut payload, &name)?;
1365 let descriptor = format::read_bytes(&mut self.reader)?;
1366 format::write_bytes(&mut payload, &descriptor)?;
1367 }
1368 _ => return Err(FormatError::UnknownTag(tag)),
1369 }
1370 let stored_crc = format::read_u32(&mut self.reader)?;
1371 Ok((payload, stored_crc))
1372 }
1373}
1374
1375fn open_persistence_file(path: &Path) -> Result<File, FormatError> {
1377 let mut opts = OpenOptions::new();
1378 opts.create(true).append(true);
1379 #[cfg(unix)]
1380 {
1381 use std::os::unix::fs::OpenOptionsExt;
1382 opts.mode(0o600);
1383 }
1384 Ok(opts.open(path)?)
1385}
1386
/// Returns the AOF file path for `shard_id` inside `data_dir`.
///
/// The file name has the form `shard-<shard_id>.aof`.
pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
    let file_name = format!("shard-{shard_id}.aof");
    data_dir.join(file_name)
}
1391
#[cfg(test)]
mod tests {
    use super::*;

    type Result = std::result::Result<(), Box<dyn std::error::Error>>;

    /// Every record read back from an AOF file, or the first error hit.
    type Replay = std::result::Result<Vec<AofRecord>, Box<dyn std::error::Error>>;

    fn temp_dir() -> tempfile::TempDir {
        tempfile::tempdir().expect("create temp dir")
    }

    /// Builds a `Set` record with `expire_ms: -1`.
    fn plain_set(key: &'static str, value: &'static str) -> AofRecord {
        AofRecord::Set {
            key: key.into(),
            value: Bytes::from(value),
            expire_ms: -1,
        }
    }

    /// Encodes `rec` with `to_bytes`, decodes it with `from_bytes`, and
    /// asserts that the decoded record equals the input.
    #[track_caller]
    fn assert_round_trip(rec: AofRecord) -> Result {
        let encoded = rec.to_bytes()?;
        let decoded = AofRecord::from_bytes(&encoded)?;
        assert_eq!(rec, decoded);
        Ok(())
    }

    /// Opens a plaintext writer on `path`, writes `records` in order, then syncs.
    fn write_synced(path: &Path, records: &[AofRecord]) -> Result {
        let mut writer = AofWriter::open(path)?;
        for rec in records {
            writer.write_record(rec)?;
        }
        writer.sync()?;
        Ok(())
    }

    /// Opens a plaintext reader on `path` and collects records until
    /// `read_record` returns `None`.
    fn read_all(path: &Path) -> Replay {
        let mut reader = AofReader::open(path)?;
        let mut replayed = Vec::new();
        while let Some(rec) = reader.read_record()? {
            replayed.push(rec);
        }
        Ok(replayed)
    }

    /// Writes `records` to `file_name` in a fresh temp dir and asserts that
    /// reading the file back yields the same records.
    #[track_caller]
    fn assert_file_round_trip(file_name: &str, records: Vec<AofRecord>) -> Result {
        let dir = temp_dir();
        let path = dir.path().join(file_name);
        write_synced(&path, &records)?;
        assert_eq!(records, read_all(&path)?);
        Ok(())
    }

    #[test]
    fn record_round_trip_set() -> Result {
        assert_round_trip(AofRecord::Set {
            key: "hello".into(),
            value: Bytes::from("world"),
            expire_ms: 5000,
        })
    }

    #[test]
    fn record_round_trip_del() -> Result {
        assert_round_trip(AofRecord::Del { key: "gone".into() })
    }

    #[test]
    fn record_round_trip_expire() -> Result {
        assert_round_trip(AofRecord::Expire {
            key: "ttl".into(),
            seconds: 300,
        })
    }

    #[test]
    fn set_with_no_expiry() -> Result {
        assert_round_trip(plain_set("k", "v"))
    }

    #[test]
    fn writer_reader_round_trip() -> Result {
        assert_file_round_trip(
            "test.aof",
            vec![
                plain_set("a", "1"),
                AofRecord::Set {
                    key: "b".into(),
                    value: Bytes::from("2"),
                    expire_ms: 10_000,
                },
                AofRecord::Del { key: "a".into() },
                AofRecord::Expire {
                    key: "b".into(),
                    seconds: 60,
                },
            ],
        )
    }

    #[test]
    fn empty_aof_returns_no_records() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("empty.aof");

        // Open a writer and drop it immediately without writing a record.
        drop(AofWriter::open(&path)?);

        let mut reader = AofReader::open(&path)?;
        assert!(reader.read_record()?.is_none());
        Ok(())
    }

    #[test]
    fn truncated_record_treated_as_eof() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("trunc.aof");

        {
            let mut writer = AofWriter::open(&path)?;
            writer.write_record(&plain_set("ok", "good"))?;
            writer.flush()?;
        }

        // Append a lone tag byte with nothing after it.
        {
            let mut raw = OpenOptions::new().append(true).open(&path)?;
            raw.write_all(&[TAG_SET])?;
        }

        let mut reader = AofReader::open(&path)?;
        let first = reader.read_record()?.unwrap();
        assert!(matches!(first, AofRecord::Set { .. }));
        assert!(reader.read_record()?.is_none());
        Ok(())
    }

    #[test]
    fn corrupt_crc_detected() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("corrupt.aof");

        {
            let mut writer = AofWriter::open(&path)?;
            writer.write_record(&plain_set("k", "v"))?;
            writer.flush()?;
        }

        // Flip every bit of the final byte of the file.
        let mut raw = fs::read(&path)?;
        let last_byte = raw.last_mut().expect("aof file is not empty");
        *last_byte ^= 0xFF;
        fs::write(&path, &raw)?;

        let mut reader = AofReader::open(&path)?;
        let err = reader.read_record().unwrap_err();
        assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
        Ok(())
    }

    #[test]
    fn missing_magic_is_error() {
        let dir = temp_dir();
        let path = dir.path().join("bad.aof");
        fs::write(&path, b"NOT_AOF_DATA").unwrap();

        let err = AofReader::open(&path).unwrap_err();
        assert!(matches!(err, FormatError::InvalidMagic));
    }

    #[test]
    fn truncate_resets_aof() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("reset.aof");

        {
            let mut writer = AofWriter::open(&path)?;
            writer.write_record(&plain_set("old", "data"))?;
            writer.truncate()?;

            writer.write_record(&plain_set("new", "fresh"))?;
            writer.sync()?;
        }

        let mut reader = AofReader::open(&path)?;
        match reader.read_record()?.unwrap() {
            AofRecord::Set { key, .. } => assert_eq!(key, "new"),
            other => panic!("expected Set, got {other:?}"),
        }
        assert!(reader.read_record()?.is_none());
        Ok(())
    }

    #[test]
    fn record_round_trip_lpush() -> Result {
        assert_round_trip(AofRecord::LPush {
            key: "list".into(),
            values: vec![Bytes::from("a"), Bytes::from("b")],
        })
    }

    #[test]
    fn record_round_trip_rpush() -> Result {
        assert_round_trip(AofRecord::RPush {
            key: "list".into(),
            values: vec![Bytes::from("x")],
        })
    }

    #[test]
    fn record_round_trip_lpop() -> Result {
        assert_round_trip(AofRecord::LPop { key: "list".into() })
    }

    #[test]
    fn record_round_trip_rpop() -> Result {
        assert_round_trip(AofRecord::RPop { key: "list".into() })
    }

    #[test]
    fn writer_reader_round_trip_with_list_records() -> Result {
        assert_file_round_trip(
            "list.aof",
            vec![
                AofRecord::LPush {
                    key: "l".into(),
                    values: vec![Bytes::from("a"), Bytes::from("b")],
                },
                AofRecord::RPush {
                    key: "l".into(),
                    values: vec![Bytes::from("c")],
                },
                AofRecord::LPop { key: "l".into() },
                AofRecord::RPop { key: "l".into() },
            ],
        )
    }

    #[test]
    fn record_round_trip_zadd() -> Result {
        assert_round_trip(AofRecord::ZAdd {
            key: "board".into(),
            members: vec![(100.0, "alice".into()), (200.5, "bob".into())],
        })
    }

    #[test]
    fn record_round_trip_zrem() -> Result {
        assert_round_trip(AofRecord::ZRem {
            key: "board".into(),
            members: vec!["alice".into(), "bob".into()],
        })
    }

    #[test]
    fn writer_reader_round_trip_with_sorted_set_records() -> Result {
        assert_file_round_trip(
            "zset.aof",
            vec![
                AofRecord::ZAdd {
                    key: "board".into(),
                    members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
                },
                AofRecord::ZRem {
                    key: "board".into(),
                    members: vec!["alice".into()],
                },
            ],
        )
    }

    #[test]
    fn record_round_trip_persist() -> Result {
        assert_round_trip(AofRecord::Persist {
            key: "mykey".into(),
        })
    }

    #[test]
    fn record_round_trip_pexpire() -> Result {
        assert_round_trip(AofRecord::Pexpire {
            key: "mykey".into(),
            milliseconds: 5000,
        })
    }

    #[test]
    fn record_round_trip_incr() -> Result {
        assert_round_trip(AofRecord::Incr {
            key: "counter".into(),
        })
    }

    #[test]
    fn record_round_trip_decr() -> Result {
        assert_round_trip(AofRecord::Decr {
            key: "counter".into(),
        })
    }

    #[test]
    fn writer_reader_round_trip_with_persist_pexpire() -> Result {
        assert_file_round_trip(
            "persist_pexpire.aof",
            vec![
                AofRecord::Set {
                    key: "k".into(),
                    value: Bytes::from("v"),
                    expire_ms: 5000,
                },
                AofRecord::Persist { key: "k".into() },
                AofRecord::Pexpire {
                    key: "k".into(),
                    milliseconds: 3000,
                },
            ],
        )
    }

    #[test]
    fn aof_path_format() {
        let expected = PathBuf::from("/data/shard-3.aof");
        assert_eq!(aof_path(Path::new("/data"), 3), expected);
    }

    #[test]
    fn record_round_trip_hset() -> Result {
        assert_round_trip(AofRecord::HSet {
            key: "hash".into(),
            fields: vec![
                ("f1".into(), Bytes::from("v1")),
                ("f2".into(), Bytes::from("v2")),
            ],
        })
    }

    #[test]
    fn record_round_trip_hdel() -> Result {
        assert_round_trip(AofRecord::HDel {
            key: "hash".into(),
            fields: vec!["f1".into(), "f2".into()],
        })
    }

    #[test]
    fn record_round_trip_hincrby() -> Result {
        assert_round_trip(AofRecord::HIncrBy {
            key: "hash".into(),
            field: "counter".into(),
            delta: -42,
        })
    }

    #[test]
    fn record_round_trip_sadd() -> Result {
        assert_round_trip(AofRecord::SAdd {
            key: "set".into(),
            members: vec!["m1".into(), "m2".into(), "m3".into()],
        })
    }

    #[test]
    fn record_round_trip_srem() -> Result {
        assert_round_trip(AofRecord::SRem {
            key: "set".into(),
            members: vec!["m1".into()],
        })
    }

    #[cfg(feature = "vector")]
    #[test]
    fn record_round_trip_vadd() -> Result {
        assert_round_trip(AofRecord::VAdd {
            key: "embeddings".into(),
            element: "doc1".into(),
            vector: vec![0.1, 0.2, 0.3],
            metric: 0,
            quantization: 0,
            connectivity: 16,
            expansion_add: 64,
        })
    }

    #[cfg(feature = "vector")]
    #[test]
    fn record_round_trip_vadd_high_dim() -> Result {
        assert_round_trip(AofRecord::VAdd {
            key: "vecs".into(),
            element: "e".into(),
            vector: vec![0.0; 1536],
            metric: 1,
            quantization: 1,
            connectivity: 32,
            expansion_add: 128,
        })
    }

    #[cfg(feature = "vector")]
    #[test]
    fn record_round_trip_vrem() -> Result {
        assert_round_trip(AofRecord::VRem {
            key: "embeddings".into(),
            element: "doc1".into(),
        })
    }

    #[cfg(feature = "encryption")]
    mod encrypted {
        use super::*;
        use crate::encryption::EncryptionKey;

        type Result = std::result::Result<(), Box<dyn std::error::Error>>;

        fn test_key() -> EncryptionKey {
            EncryptionKey::from_bytes([0x42; 32])
        }

        /// Opens an encrypted writer on `path` with `key`, writes `records`
        /// in order, then syncs.
        fn write_encrypted(path: &Path, key: EncryptionKey, records: &[AofRecord]) -> Result {
            let mut writer = AofWriter::open_encrypted(path, key)?;
            for rec in records {
                writer.write_record(rec)?;
            }
            writer.sync()?;
            Ok(())
        }

        #[test]
        fn encrypted_writer_reader_round_trip() -> Result {
            let dir = temp_dir();
            let path = dir.path().join("enc.aof");
            let key = test_key();

            let records = vec![
                plain_set("a", "1"),
                AofRecord::Del { key: "a".into() },
                AofRecord::LPush {
                    key: "list".into(),
                    values: vec![Bytes::from("x"), Bytes::from("y")],
                },
                AofRecord::ZAdd {
                    key: "zs".into(),
                    members: vec![(1.0, "m".into())],
                },
            ];

            write_encrypted(&path, key.clone(), &records)?;

            let mut reader = AofReader::open_encrypted(&path, key)?;
            let mut replayed = Vec::new();
            while let Some(rec) = reader.read_record()? {
                replayed.push(rec);
            }
            assert_eq!(records, replayed);
            Ok(())
        }

        #[test]
        fn encrypted_aof_wrong_key_fails() -> Result {
            let dir = temp_dir();
            let path = dir.path().join("enc_bad.aof");
            let wrong_key = EncryptionKey::from_bytes([0xFF; 32]);

            write_encrypted(&path, test_key(), &[plain_set("k", "v")])?;

            let mut reader = AofReader::open_encrypted(&path, wrong_key)?;
            let err = reader.read_record().unwrap_err();
            assert!(matches!(err, FormatError::DecryptionFailed));
            Ok(())
        }

        #[test]
        fn v2_file_readable_with_encryption_key() -> Result {
            let dir = temp_dir();
            let path = dir.path().join("v2.aof");

            // Written through the plaintext writer, read with a key supplied.
            write_synced(&path, &[plain_set("k", "v")])?;

            let mut reader = AofReader::open_encrypted(&path, test_key())?;
            let first = reader.read_record()?.unwrap();
            assert!(matches!(first, AofRecord::Set { .. }));
            Ok(())
        }

        #[test]
        fn v3_file_without_key_returns_error() -> Result {
            let dir = temp_dir();
            let path = dir.path().join("v3_nokey.aof");

            // Written through the encrypted writer, opened without a key.
            write_encrypted(&path, test_key(), &[plain_set("k", "v")])?;

            let err = AofReader::open(&path).unwrap_err();
            assert!(matches!(err, FormatError::EncryptionRequired));
            Ok(())
        }

        #[test]
        fn encrypted_truncate_preserves_encryption() -> Result {
            let dir = temp_dir();
            let path = dir.path().join("enc_trunc.aof");
            let key = test_key();

            {
                let mut writer = AofWriter::open_encrypted(&path, key.clone())?;
                writer.write_record(&plain_set("old", "data"))?;
                writer.truncate()?;

                writer.write_record(&plain_set("new", "fresh"))?;
                writer.sync()?;
            }

            let mut reader = AofReader::open_encrypted(&path, key)?;
            match reader.read_record()?.unwrap() {
                AofRecord::Set { key, .. } => assert_eq!(key, "new"),
                other => panic!("expected Set, got {other:?}"),
            }
            assert!(reader.read_record()?.is_none());
            Ok(())
        }
    }
}