1use std::fmt;
20use std::fs::{self, File, OpenOptions};
21#[cfg(feature = "encryption")]
22use std::io::Read as _;
23use std::io::{self, BufReader, BufWriter, Write};
24use std::path::{Path, PathBuf};
25
26use bytes::Bytes;
27
28use crate::format::{self, FormatError};
29
30fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
32 let bytes = format::read_bytes(r)?;
33 String::from_utf8(bytes).map_err(|_| {
34 FormatError::Io(io::Error::new(
35 io::ErrorKind::InvalidData,
36 format!("{field} is not valid utf-8"),
37 ))
38 })
39}
40
41fn read_string_list(r: &mut impl io::Read, field: &str) -> Result<Vec<String>, FormatError> {
44 let count = format::read_u32(r)?;
45 format::validate_collection_count(count, field)?;
46 let mut items = Vec::with_capacity(format::capped_capacity(count));
47 for _ in 0..count {
48 items.push(read_string(r, field)?);
49 }
50 Ok(items)
51}
52
53fn read_bytes_list(r: &mut impl io::Read, label: &str) -> Result<Vec<Bytes>, FormatError> {
56 let count = format::read_u32(r)?;
57 format::validate_collection_count(count, label)?;
58 let mut items = Vec::with_capacity(format::capped_capacity(count));
59 for _ in 0..count {
60 items.push(Bytes::from(format::read_bytes(r)?));
61 }
62 Ok(items)
63}
64
65const TAG_SET: u8 = 1;
70const TAG_INCR: u8 = 12;
71const TAG_DECR: u8 = 13;
72const TAG_INCRBY: u8 = 19;
73const TAG_DECRBY: u8 = 20;
74const TAG_APPEND: u8 = 21;
75
76const TAG_LPUSH: u8 = 4;
78const TAG_RPUSH: u8 = 5;
79const TAG_LPOP: u8 = 6;
80const TAG_RPOP: u8 = 7;
81
82const TAG_ZADD: u8 = 8;
84const TAG_ZREM: u8 = 9;
85
86const TAG_HSET: u8 = 14;
88const TAG_HDEL: u8 = 15;
89const TAG_HINCRBY: u8 = 16;
90
91const TAG_SADD: u8 = 17;
93const TAG_SREM: u8 = 18;
94
95const TAG_DEL: u8 = 2;
97const TAG_EXPIRE: u8 = 3;
98const TAG_PERSIST: u8 = 10;
99const TAG_PEXPIRE: u8 = 11;
100const TAG_RENAME: u8 = 22;
101
102#[cfg(feature = "vector")]
104const TAG_VADD: u8 = 25;
105#[cfg(feature = "vector")]
106const TAG_VREM: u8 = 26;
107
108#[cfg(feature = "protobuf")]
110const TAG_PROTO_SET: u8 = 23;
111#[cfg(feature = "protobuf")]
112const TAG_PROTO_REGISTER: u8 = 24;
113
114#[derive(Debug, Clone, PartialEq)]
116pub enum AofRecord {
117 Set {
119 key: String,
120 value: Bytes,
121 expire_ms: i64,
122 },
123 Del { key: String },
125 Expire { key: String, seconds: u64 },
127 LPush { key: String, values: Vec<Bytes> },
129 RPush { key: String, values: Vec<Bytes> },
131 LPop { key: String },
133 RPop { key: String },
135 ZAdd {
137 key: String,
138 members: Vec<(f64, String)>,
139 },
140 ZRem { key: String, members: Vec<String> },
142 Persist { key: String },
144 Pexpire { key: String, milliseconds: u64 },
146 Incr { key: String },
148 Decr { key: String },
150 HSet {
152 key: String,
153 fields: Vec<(String, Bytes)>,
154 },
155 HDel { key: String, fields: Vec<String> },
157 HIncrBy {
159 key: String,
160 field: String,
161 delta: i64,
162 },
163 SAdd { key: String, members: Vec<String> },
165 SRem { key: String, members: Vec<String> },
167 IncrBy { key: String, delta: i64 },
169 DecrBy { key: String, delta: i64 },
171 Append { key: String, value: Bytes },
173 Rename { key: String, newkey: String },
175 #[cfg(feature = "vector")]
178 VAdd {
179 key: String,
180 element: String,
181 vector: Vec<f32>,
182 metric: u8,
184 quantization: u8,
186 connectivity: u32,
187 expansion_add: u32,
188 },
189 #[cfg(feature = "vector")]
191 VRem { key: String, element: String },
192 #[cfg(feature = "protobuf")]
194 ProtoSet {
195 key: String,
196 type_name: String,
197 data: Bytes,
198 expire_ms: i64,
199 },
200 #[cfg(feature = "protobuf")]
202 ProtoRegister { name: String, descriptor: Bytes },
203}
204
205impl AofRecord {
206 fn tag(&self) -> u8 {
216 match self {
217 AofRecord::Set { .. } => TAG_SET,
218 AofRecord::Del { .. } => TAG_DEL,
219 AofRecord::Expire { .. } => TAG_EXPIRE,
220 AofRecord::LPush { .. } => TAG_LPUSH,
221 AofRecord::RPush { .. } => TAG_RPUSH,
222 AofRecord::LPop { .. } => TAG_LPOP,
223 AofRecord::RPop { .. } => TAG_RPOP,
224 AofRecord::ZAdd { .. } => TAG_ZADD,
225 AofRecord::ZRem { .. } => TAG_ZREM,
226 AofRecord::Persist { .. } => TAG_PERSIST,
227 AofRecord::Pexpire { .. } => TAG_PEXPIRE,
228 AofRecord::Incr { .. } => TAG_INCR,
229 AofRecord::Decr { .. } => TAG_DECR,
230 AofRecord::HSet { .. } => TAG_HSET,
231 AofRecord::HDel { .. } => TAG_HDEL,
232 AofRecord::HIncrBy { .. } => TAG_HINCRBY,
233 AofRecord::SAdd { .. } => TAG_SADD,
234 AofRecord::SRem { .. } => TAG_SREM,
235 AofRecord::IncrBy { .. } => TAG_INCRBY,
236 AofRecord::DecrBy { .. } => TAG_DECRBY,
237 AofRecord::Append { .. } => TAG_APPEND,
238 AofRecord::Rename { .. } => TAG_RENAME,
239 #[cfg(feature = "vector")]
240 AofRecord::VAdd { .. } => TAG_VADD,
241 #[cfg(feature = "vector")]
242 AofRecord::VRem { .. } => TAG_VREM,
243 #[cfg(feature = "protobuf")]
244 AofRecord::ProtoSet { .. } => TAG_PROTO_SET,
245 #[cfg(feature = "protobuf")]
246 AofRecord::ProtoRegister { .. } => TAG_PROTO_REGISTER,
247 }
248 }
249
250 fn estimated_size(&self) -> usize {
256 const LEN_PREFIX: usize = 4;
258
259 match self {
260 AofRecord::Set {
262 key,
263 value,
264 expire_ms: _,
265 } => 1 + LEN_PREFIX + key.len() + LEN_PREFIX + value.len() + 8,
266 AofRecord::Del { key }
268 | AofRecord::LPop { key }
269 | AofRecord::RPop { key }
270 | AofRecord::Persist { key }
271 | AofRecord::Incr { key }
272 | AofRecord::Decr { key } => 1 + LEN_PREFIX + key.len(),
273 AofRecord::Expire { key, .. } | AofRecord::Pexpire { key, .. } => {
275 1 + LEN_PREFIX + key.len() + 8
276 }
277 AofRecord::LPush { key, values } | AofRecord::RPush { key, values } => {
279 let values_size: usize = values.iter().map(|v| LEN_PREFIX + v.len()).sum();
280 1 + LEN_PREFIX + key.len() + 4 + values_size
281 }
282 AofRecord::ZAdd { key, members } => {
283 let members_size: usize =
284 members.iter().map(|(_, m)| 8 + LEN_PREFIX + m.len()).sum();
285 1 + LEN_PREFIX + key.len() + 4 + members_size
286 }
287 AofRecord::ZRem { key, members }
288 | AofRecord::SAdd { key, members }
289 | AofRecord::SRem { key, members } => {
290 let members_size: usize = members.iter().map(|m| LEN_PREFIX + m.len()).sum();
291 1 + LEN_PREFIX + key.len() + 4 + members_size
292 }
293 AofRecord::HSet { key, fields } => {
294 let fields_size: usize = fields
295 .iter()
296 .map(|(f, v)| LEN_PREFIX + f.len() + LEN_PREFIX + v.len())
297 .sum();
298 1 + LEN_PREFIX + key.len() + 4 + fields_size
299 }
300 AofRecord::HDel { key, fields } => {
301 let fields_size: usize = fields.iter().map(|f| LEN_PREFIX + f.len()).sum();
302 1 + LEN_PREFIX + key.len() + 4 + fields_size
303 }
304 AofRecord::HIncrBy { key, field, .. } => {
305 1 + LEN_PREFIX + key.len() + LEN_PREFIX + field.len() + 8
306 }
307 AofRecord::IncrBy { key, .. } | AofRecord::DecrBy { key, .. } => {
308 1 + LEN_PREFIX + key.len() + 8
309 }
310 AofRecord::Append { key, value } => {
311 1 + LEN_PREFIX + key.len() + LEN_PREFIX + value.len()
312 }
313 AofRecord::Rename { key, newkey } => {
314 1 + LEN_PREFIX + key.len() + LEN_PREFIX + newkey.len()
315 }
316 #[cfg(feature = "vector")]
317 AofRecord::VAdd {
318 key,
319 element,
320 vector,
321 ..
322 } => {
323 1 + LEN_PREFIX + key.len() + LEN_PREFIX + element.len() + 4 + vector.len() * 4 + 10
324 }
325 #[cfg(feature = "vector")]
326 AofRecord::VRem { key, element } => {
327 1 + LEN_PREFIX + key.len() + LEN_PREFIX + element.len()
328 }
329 #[cfg(feature = "protobuf")]
330 AofRecord::ProtoSet {
331 key,
332 type_name,
333 data,
334 ..
335 } => {
336 1 + LEN_PREFIX
337 + key.len()
338 + LEN_PREFIX
339 + type_name.len()
340 + LEN_PREFIX
341 + data.len()
342 + 8
343 }
344 #[cfg(feature = "protobuf")]
345 AofRecord::ProtoRegister { name, descriptor } => {
346 1 + LEN_PREFIX + name.len() + LEN_PREFIX + descriptor.len()
347 }
348 }
349 }
350
351 pub fn to_bytes(&self) -> Result<Vec<u8>, FormatError> {
353 let mut buf = Vec::with_capacity(self.estimated_size());
354 format::write_u8(&mut buf, self.tag())?;
355
356 match self {
357 AofRecord::Del { key }
359 | AofRecord::LPop { key }
360 | AofRecord::RPop { key }
361 | AofRecord::Persist { key }
362 | AofRecord::Incr { key }
363 | AofRecord::Decr { key } => {
364 format::write_bytes(&mut buf, key.as_bytes())?;
365 }
366
367 AofRecord::Set {
369 key,
370 value,
371 expire_ms,
372 } => {
373 format::write_bytes(&mut buf, key.as_bytes())?;
374 format::write_bytes(&mut buf, value)?;
375 format::write_i64(&mut buf, *expire_ms)?;
376 }
377
378 AofRecord::Expire { key, seconds } => {
381 format::write_bytes(&mut buf, key.as_bytes())?;
382 format::write_i64(&mut buf, (*seconds).min(i64::MAX as u64) as i64)?;
383 }
384 AofRecord::Pexpire { key, milliseconds } => {
385 format::write_bytes(&mut buf, key.as_bytes())?;
386 format::write_i64(&mut buf, (*milliseconds).min(i64::MAX as u64) as i64)?;
387 }
388 AofRecord::IncrBy { key, delta } | AofRecord::DecrBy { key, delta } => {
389 format::write_bytes(&mut buf, key.as_bytes())?;
390 format::write_i64(&mut buf, *delta)?;
391 }
392
393 AofRecord::LPush { key, values } | AofRecord::RPush { key, values } => {
395 format::write_bytes(&mut buf, key.as_bytes())?;
396 format::write_len(&mut buf, values.len())?;
397 for v in values {
398 format::write_bytes(&mut buf, v)?;
399 }
400 }
401
402 AofRecord::ZRem { key, members }
404 | AofRecord::SAdd { key, members }
405 | AofRecord::SRem { key, members } => {
406 format::write_bytes(&mut buf, key.as_bytes())?;
407 format::write_len(&mut buf, members.len())?;
408 for member in members {
409 format::write_bytes(&mut buf, member.as_bytes())?;
410 }
411 }
412 AofRecord::HDel { key, fields } => {
413 format::write_bytes(&mut buf, key.as_bytes())?;
414 format::write_len(&mut buf, fields.len())?;
415 for field in fields {
416 format::write_bytes(&mut buf, field.as_bytes())?;
417 }
418 }
419
420 AofRecord::ZAdd { key, members } => {
422 format::write_bytes(&mut buf, key.as_bytes())?;
423 format::write_len(&mut buf, members.len())?;
424 for (score, member) in members {
425 format::write_f64(&mut buf, *score)?;
426 format::write_bytes(&mut buf, member.as_bytes())?;
427 }
428 }
429
430 AofRecord::HSet { key, fields } => {
432 format::write_bytes(&mut buf, key.as_bytes())?;
433 format::write_len(&mut buf, fields.len())?;
434 for (field, value) in fields {
435 format::write_bytes(&mut buf, field.as_bytes())?;
436 format::write_bytes(&mut buf, value)?;
437 }
438 }
439
440 AofRecord::HIncrBy { key, field, delta } => {
442 format::write_bytes(&mut buf, key.as_bytes())?;
443 format::write_bytes(&mut buf, field.as_bytes())?;
444 format::write_i64(&mut buf, *delta)?;
445 }
446
447 AofRecord::Append { key, value } => {
449 format::write_bytes(&mut buf, key.as_bytes())?;
450 format::write_bytes(&mut buf, value)?;
451 }
452
453 AofRecord::Rename { key, newkey } => {
455 format::write_bytes(&mut buf, key.as_bytes())?;
456 format::write_bytes(&mut buf, newkey.as_bytes())?;
457 }
458
459 #[cfg(feature = "vector")]
460 AofRecord::VAdd {
461 key,
462 element,
463 vector,
464 metric,
465 quantization,
466 connectivity,
467 expansion_add,
468 } => {
469 format::write_bytes(&mut buf, key.as_bytes())?;
470 format::write_bytes(&mut buf, element.as_bytes())?;
471 format::write_len(&mut buf, vector.len())?;
472 for &v in vector {
473 format::write_f32(&mut buf, v)?;
474 }
475 format::write_u8(&mut buf, *metric)?;
476 format::write_u8(&mut buf, *quantization)?;
477 format::write_u32(&mut buf, *connectivity)?;
478 format::write_u32(&mut buf, *expansion_add)?;
479 }
480 #[cfg(feature = "vector")]
481 AofRecord::VRem { key, element } => {
482 format::write_bytes(&mut buf, key.as_bytes())?;
483 format::write_bytes(&mut buf, element.as_bytes())?;
484 }
485
486 #[cfg(feature = "protobuf")]
487 AofRecord::ProtoSet {
488 key,
489 type_name,
490 data,
491 expire_ms,
492 } => {
493 format::write_bytes(&mut buf, key.as_bytes())?;
494 format::write_bytes(&mut buf, type_name.as_bytes())?;
495 format::write_bytes(&mut buf, data)?;
496 format::write_i64(&mut buf, *expire_ms)?;
497 }
498 #[cfg(feature = "protobuf")]
499 AofRecord::ProtoRegister { name, descriptor } => {
500 format::write_bytes(&mut buf, name.as_bytes())?;
501 format::write_bytes(&mut buf, descriptor)?;
502 }
503 }
504 Ok(buf)
505 }
506
507 pub fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
512 let mut cursor = io::Cursor::new(data);
513 let tag = format::read_u8(&mut cursor)?;
514 match tag {
515 TAG_SET => {
516 let key = read_string(&mut cursor, "key")?;
517 let value = format::read_bytes(&mut cursor)?;
518 let expire_ms = format::read_i64(&mut cursor)?;
519 Ok(AofRecord::Set {
520 key,
521 value: Bytes::from(value),
522 expire_ms,
523 })
524 }
525 TAG_DEL => {
526 let key = read_string(&mut cursor, "key")?;
527 Ok(AofRecord::Del { key })
528 }
529 TAG_EXPIRE => {
530 let key = read_string(&mut cursor, "key")?;
531 let raw = format::read_i64(&mut cursor)?;
532 let seconds = u64::try_from(raw).map_err(|_| {
533 FormatError::InvalidData(format!(
534 "EXPIRE seconds is negative ({raw}) in AOF record"
535 ))
536 })?;
537 Ok(AofRecord::Expire { key, seconds })
538 }
539 TAG_LPUSH | TAG_RPUSH => {
540 let key = read_string(&mut cursor, "key")?;
541 let values = read_bytes_list(&mut cursor, "list")?;
542 if tag == TAG_LPUSH {
543 Ok(AofRecord::LPush { key, values })
544 } else {
545 Ok(AofRecord::RPush { key, values })
546 }
547 }
548 TAG_LPOP => {
549 let key = read_string(&mut cursor, "key")?;
550 Ok(AofRecord::LPop { key })
551 }
552 TAG_RPOP => {
553 let key = read_string(&mut cursor, "key")?;
554 Ok(AofRecord::RPop { key })
555 }
556 TAG_ZADD => {
557 let key = read_string(&mut cursor, "key")?;
558 let count = format::read_u32(&mut cursor)?;
559 format::validate_collection_count(count, "sorted set")?;
560 let mut members = Vec::with_capacity(format::capped_capacity(count));
561 for _ in 0..count {
562 let score = format::read_f64(&mut cursor)?;
563 let member = read_string(&mut cursor, "member")?;
564 members.push((score, member));
565 }
566 Ok(AofRecord::ZAdd { key, members })
567 }
568 TAG_ZREM => {
569 let key = read_string(&mut cursor, "key")?;
570 let members = read_string_list(&mut cursor, "member")?;
571 Ok(AofRecord::ZRem { key, members })
572 }
573 TAG_PERSIST => {
574 let key = read_string(&mut cursor, "key")?;
575 Ok(AofRecord::Persist { key })
576 }
577 TAG_PEXPIRE => {
578 let key = read_string(&mut cursor, "key")?;
579 let raw = format::read_i64(&mut cursor)?;
580 let milliseconds = u64::try_from(raw).map_err(|_| {
581 FormatError::InvalidData(format!(
582 "PEXPIRE milliseconds is negative ({raw}) in AOF record"
583 ))
584 })?;
585 Ok(AofRecord::Pexpire { key, milliseconds })
586 }
587 TAG_INCR => {
588 let key = read_string(&mut cursor, "key")?;
589 Ok(AofRecord::Incr { key })
590 }
591 TAG_DECR => {
592 let key = read_string(&mut cursor, "key")?;
593 Ok(AofRecord::Decr { key })
594 }
595 TAG_HSET => {
596 let key = read_string(&mut cursor, "key")?;
597 let count = format::read_u32(&mut cursor)?;
598 format::validate_collection_count(count, "hash")?;
599 let mut fields = Vec::with_capacity(format::capped_capacity(count));
600 for _ in 0..count {
601 let field = read_string(&mut cursor, "field")?;
602 let value = Bytes::from(format::read_bytes(&mut cursor)?);
603 fields.push((field, value));
604 }
605 Ok(AofRecord::HSet { key, fields })
606 }
607 TAG_HDEL => {
608 let key = read_string(&mut cursor, "key")?;
609 let fields = read_string_list(&mut cursor, "field")?;
610 Ok(AofRecord::HDel { key, fields })
611 }
612 TAG_HINCRBY => {
613 let key = read_string(&mut cursor, "key")?;
614 let field = read_string(&mut cursor, "field")?;
615 let delta = format::read_i64(&mut cursor)?;
616 Ok(AofRecord::HIncrBy { key, field, delta })
617 }
618 TAG_SADD => {
619 let key = read_string(&mut cursor, "key")?;
620 let members = read_string_list(&mut cursor, "member")?;
621 Ok(AofRecord::SAdd { key, members })
622 }
623 TAG_SREM => {
624 let key = read_string(&mut cursor, "key")?;
625 let members = read_string_list(&mut cursor, "member")?;
626 Ok(AofRecord::SRem { key, members })
627 }
628 TAG_INCRBY => {
629 let key = read_string(&mut cursor, "key")?;
630 let delta = format::read_i64(&mut cursor)?;
631 Ok(AofRecord::IncrBy { key, delta })
632 }
633 TAG_DECRBY => {
634 let key = read_string(&mut cursor, "key")?;
635 let delta = format::read_i64(&mut cursor)?;
636 Ok(AofRecord::DecrBy { key, delta })
637 }
638 TAG_APPEND => {
639 let key = read_string(&mut cursor, "key")?;
640 let value = Bytes::from(format::read_bytes(&mut cursor)?);
641 Ok(AofRecord::Append { key, value })
642 }
643 TAG_RENAME => {
644 let key = read_string(&mut cursor, "key")?;
645 let newkey = read_string(&mut cursor, "newkey")?;
646 Ok(AofRecord::Rename { key, newkey })
647 }
648 #[cfg(feature = "vector")]
649 TAG_VADD => {
650 let key = read_string(&mut cursor, "key")?;
651 let element = read_string(&mut cursor, "element")?;
652 let dim = format::read_u32(&mut cursor)?;
653 if dim > format::MAX_PERSISTED_VECTOR_DIMS {
654 return Err(FormatError::InvalidData(format!(
655 "AOF VADD dimension {dim} exceeds max {}",
656 format::MAX_PERSISTED_VECTOR_DIMS
657 )));
658 }
659 let mut vector = Vec::with_capacity(dim as usize);
660 for _ in 0..dim {
661 vector.push(format::read_f32(&mut cursor)?);
662 }
663 let metric = format::read_u8(&mut cursor)?;
664 let quantization = format::read_u8(&mut cursor)?;
665 let connectivity = format::read_u32(&mut cursor)?;
666 let expansion_add = format::read_u32(&mut cursor)?;
667 Ok(AofRecord::VAdd {
668 key,
669 element,
670 vector,
671 metric,
672 quantization,
673 connectivity,
674 expansion_add,
675 })
676 }
677 #[cfg(feature = "vector")]
678 TAG_VREM => {
679 let key = read_string(&mut cursor, "key")?;
680 let element = read_string(&mut cursor, "element")?;
681 Ok(AofRecord::VRem { key, element })
682 }
683 #[cfg(feature = "protobuf")]
684 TAG_PROTO_SET => {
685 let key = read_string(&mut cursor, "key")?;
686 let type_name = read_string(&mut cursor, "type_name")?;
687 let data = format::read_bytes(&mut cursor)?;
688 let expire_ms = format::read_i64(&mut cursor)?;
689 Ok(AofRecord::ProtoSet {
690 key,
691 type_name,
692 data: Bytes::from(data),
693 expire_ms,
694 })
695 }
696 #[cfg(feature = "protobuf")]
697 TAG_PROTO_REGISTER => {
698 let name = read_string(&mut cursor, "name")?;
699 let descriptor = format::read_bytes(&mut cursor)?;
700 Ok(AofRecord::ProtoRegister {
701 name,
702 descriptor: Bytes::from(descriptor),
703 })
704 }
705 _ => Err(FormatError::UnknownTag(tag)),
706 }
707 }
708}
709
710#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
712pub enum FsyncPolicy {
713 Always,
715 #[default]
717 EverySec,
718 No,
720}
721
722pub struct AofWriter {
724 writer: BufWriter<File>,
725 path: PathBuf,
726 #[cfg(feature = "encryption")]
727 encryption_key: Option<crate::encryption::EncryptionKey>,
728}
729
730impl AofWriter {
731 pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
734 let path = path.into();
735 let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
736
737 let file = open_persistence_file(&path)?;
738 let mut writer = BufWriter::new(file);
739
740 if !exists {
741 format::write_header(&mut writer, format::AOF_MAGIC)?;
742 writer.flush()?;
743 }
744
745 Ok(Self {
746 writer,
747 path,
748 #[cfg(feature = "encryption")]
749 encryption_key: None,
750 })
751 }
752
753 #[cfg(feature = "encryption")]
759 pub fn open_encrypted(
760 path: impl Into<PathBuf>,
761 key: crate::encryption::EncryptionKey,
762 ) -> Result<Self, FormatError> {
763 let path = path.into();
764 let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
765
766 let file = open_persistence_file(&path)?;
767 let mut writer = BufWriter::new(file);
768
769 if !exists {
770 format::write_header_versioned(
771 &mut writer,
772 format::AOF_MAGIC,
773 format::FORMAT_VERSION_ENCRYPTED,
774 )?;
775 writer.flush()?;
776 }
777
778 Ok(Self {
779 writer,
780 path,
781 encryption_key: Some(key),
782 })
783 }
784
785 pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
790 let payload = record.to_bytes()?;
791
792 #[cfg(feature = "encryption")]
793 if let Some(ref key) = self.encryption_key {
794 let (nonce, ciphertext) = crate::encryption::encrypt_record(key, &payload)?;
795 self.writer.write_all(&nonce)?;
796 format::write_len(&mut self.writer, ciphertext.len())?;
797 self.writer.write_all(&ciphertext)?;
798 return Ok(());
799 }
800
801 let checksum = format::crc32(&payload);
802 self.writer.write_all(&payload)?;
803 format::write_u32(&mut self.writer, checksum)?;
804 Ok(())
805 }
806
807 pub fn flush(&mut self) -> Result<(), FormatError> {
809 self.writer.flush()?;
810 Ok(())
811 }
812
813 pub fn sync(&mut self) -> Result<(), FormatError> {
815 self.writer.flush()?;
816 self.writer.get_ref().sync_all()?;
817 Ok(())
818 }
819
820 pub fn path(&self) -> &Path {
822 &self.path
823 }
824
825 pub fn truncate(&mut self) -> Result<(), FormatError> {
831 self.writer.flush()?;
833
834 let tmp_path = self.path.with_extension("aof.tmp");
836 let mut opts = OpenOptions::new();
837 opts.create(true).write(true).truncate(true);
838 #[cfg(unix)]
839 {
840 use std::os::unix::fs::OpenOptionsExt;
841 opts.mode(0o600);
842 }
843 let tmp_file = opts.open(&tmp_path)?;
844 let mut tmp_writer = BufWriter::new(tmp_file);
845
846 #[cfg(feature = "encryption")]
847 if self.encryption_key.is_some() {
848 format::write_header_versioned(
849 &mut tmp_writer,
850 format::AOF_MAGIC,
851 format::FORMAT_VERSION_ENCRYPTED,
852 )?;
853 } else {
854 format::write_header(&mut tmp_writer, format::AOF_MAGIC)?;
855 }
856 #[cfg(not(feature = "encryption"))]
857 format::write_header(&mut tmp_writer, format::AOF_MAGIC)?;
858
859 tmp_writer.flush()?;
860 tmp_writer.get_ref().sync_all()?;
861
862 std::fs::rename(&tmp_path, &self.path)?;
864
865 let file = OpenOptions::new().append(true).open(&self.path)?;
867 self.writer = BufWriter::new(file);
868 Ok(())
869 }
870}
871
872pub struct AofReader {
874 reader: BufReader<File>,
875 version: u8,
877 #[cfg(feature = "encryption")]
878 encryption_key: Option<crate::encryption::EncryptionKey>,
879}
880
881impl fmt::Debug for AofReader {
882 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
883 f.debug_struct("AofReader")
884 .field("version", &self.version)
885 .finish()
886 }
887}
888
889impl AofReader {
890 pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
892 let file = File::open(path.as_ref())?;
893 let mut reader = BufReader::new(file);
894 let version = format::read_header(&mut reader, format::AOF_MAGIC)?;
895
896 if version == format::FORMAT_VERSION_ENCRYPTED {
897 return Err(FormatError::EncryptionRequired);
898 }
899
900 Ok(Self {
901 reader,
902 version,
903 #[cfg(feature = "encryption")]
904 encryption_key: None,
905 })
906 }
907
908 #[cfg(feature = "encryption")]
913 pub fn open_encrypted(
914 path: impl AsRef<Path>,
915 key: crate::encryption::EncryptionKey,
916 ) -> Result<Self, FormatError> {
917 let file = File::open(path.as_ref())?;
918 let mut reader = BufReader::new(file);
919 let version = format::read_header(&mut reader, format::AOF_MAGIC)?;
920
921 Ok(Self {
922 reader,
923 version,
924 encryption_key: Some(key),
925 })
926 }
927
928 pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
934 #[cfg(feature = "encryption")]
935 if self.version == format::FORMAT_VERSION_ENCRYPTED {
936 return self.read_encrypted_record();
937 }
938
939 self.read_v2_record()
940 }
941
942 fn read_v2_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
944 let tag = match format::read_u8(&mut self.reader) {
946 Ok(t) => t,
947 Err(FormatError::UnexpectedEof) => return Ok(None),
948 Err(e) => return Err(e),
949 };
950
951 let record_result = self.read_payload_for_tag(tag);
954 match record_result {
955 Ok((payload, stored_crc)) => {
956 let mut full = Vec::with_capacity(1 + payload.len());
958 full.push(tag);
959 full.extend_from_slice(&payload);
960 format::verify_crc32(&full, stored_crc)?;
961 AofRecord::from_bytes(&full).map(Some)
962 }
963 Err(FormatError::UnexpectedEof) => Ok(None),
965 Err(e) => Err(e),
966 }
967 }
968
969 #[cfg(feature = "encryption")]
971 fn read_encrypted_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
972 let key = self
973 .encryption_key
974 .as_ref()
975 .ok_or(FormatError::EncryptionRequired)?;
976
977 let mut nonce = [0u8; crate::encryption::NONCE_SIZE];
979 if let Err(e) = self.reader.read_exact(&mut nonce) {
980 return if e.kind() == io::ErrorKind::UnexpectedEof {
981 Ok(None)
982 } else {
983 Err(FormatError::Io(e))
984 };
985 }
986
987 let ct_len = match format::read_u32(&mut self.reader) {
989 Ok(n) => n as usize,
990 Err(FormatError::UnexpectedEof) => return Ok(None),
991 Err(e) => return Err(e),
992 };
993
994 if ct_len > format::MAX_FIELD_LEN {
995 return Err(FormatError::Io(io::Error::new(
996 io::ErrorKind::InvalidData,
997 format!("encrypted record length {ct_len} exceeds maximum"),
998 )));
999 }
1000
1001 let mut ciphertext = vec![0u8; ct_len];
1002 if let Err(e) = self.reader.read_exact(&mut ciphertext) {
1003 return if e.kind() == io::ErrorKind::UnexpectedEof {
1004 Ok(None)
1005 } else {
1006 Err(FormatError::Io(e))
1007 };
1008 }
1009
1010 let plaintext = crate::encryption::decrypt_record(key, &nonce, &ciphertext)?;
1011 AofRecord::from_bytes(&plaintext).map(Some)
1012 }
1013
1014 fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
1016 let mut payload = Vec::new();
1017 match tag {
1018 TAG_SET => {
1019 let key = format::read_bytes(&mut self.reader)?;
1021 format::write_bytes(&mut payload, &key)?;
1022 let value = format::read_bytes(&mut self.reader)?;
1023 format::write_bytes(&mut payload, &value)?;
1024 let expire_ms = format::read_i64(&mut self.reader)?;
1025 format::write_i64(&mut payload, expire_ms)?;
1026 }
1027 TAG_DEL => {
1028 let key = format::read_bytes(&mut self.reader)?;
1029 format::write_bytes(&mut payload, &key)?;
1030 }
1031 TAG_EXPIRE => {
1032 let key = format::read_bytes(&mut self.reader)?;
1033 format::write_bytes(&mut payload, &key)?;
1034 let seconds = format::read_i64(&mut self.reader)?;
1035 format::write_i64(&mut payload, seconds)?;
1036 }
1037 TAG_LPUSH | TAG_RPUSH => {
1038 let key = format::read_bytes(&mut self.reader)?;
1039 format::write_bytes(&mut payload, &key)?;
1040 let count = format::read_u32(&mut self.reader)?;
1041 format::validate_collection_count(count, "list")?;
1042 format::write_u32(&mut payload, count)?;
1043 for _ in 0..count {
1044 let val = format::read_bytes(&mut self.reader)?;
1045 format::write_bytes(&mut payload, &val)?;
1046 }
1047 }
1048 TAG_LPOP | TAG_RPOP => {
1049 let key = format::read_bytes(&mut self.reader)?;
1050 format::write_bytes(&mut payload, &key)?;
1051 }
1052 TAG_ZADD => {
1053 let key = format::read_bytes(&mut self.reader)?;
1054 format::write_bytes(&mut payload, &key)?;
1055 let count = format::read_u32(&mut self.reader)?;
1056 format::validate_collection_count(count, "sorted set")?;
1057 format::write_u32(&mut payload, count)?;
1058 for _ in 0..count {
1059 let score = format::read_f64(&mut self.reader)?;
1060 format::write_f64(&mut payload, score)?;
1061 let member = format::read_bytes(&mut self.reader)?;
1062 format::write_bytes(&mut payload, &member)?;
1063 }
1064 }
1065 TAG_ZREM => {
1066 let key = format::read_bytes(&mut self.reader)?;
1067 format::write_bytes(&mut payload, &key)?;
1068 let count = format::read_u32(&mut self.reader)?;
1069 format::validate_collection_count(count, "sorted set")?;
1070 format::write_u32(&mut payload, count)?;
1071 for _ in 0..count {
1072 let member = format::read_bytes(&mut self.reader)?;
1073 format::write_bytes(&mut payload, &member)?;
1074 }
1075 }
1076 TAG_PERSIST => {
1077 let key = format::read_bytes(&mut self.reader)?;
1078 format::write_bytes(&mut payload, &key)?;
1079 }
1080 TAG_PEXPIRE => {
1081 let key = format::read_bytes(&mut self.reader)?;
1082 format::write_bytes(&mut payload, &key)?;
1083 let millis = format::read_i64(&mut self.reader)?;
1084 format::write_i64(&mut payload, millis)?;
1085 }
1086 TAG_INCR | TAG_DECR => {
1087 let key = format::read_bytes(&mut self.reader)?;
1088 format::write_bytes(&mut payload, &key)?;
1089 }
1090 TAG_HSET => {
1091 let key = format::read_bytes(&mut self.reader)?;
1092 format::write_bytes(&mut payload, &key)?;
1093 let count = format::read_u32(&mut self.reader)?;
1094 format::validate_collection_count(count, "hash")?;
1095 format::write_u32(&mut payload, count)?;
1096 for _ in 0..count {
1097 let field = format::read_bytes(&mut self.reader)?;
1098 format::write_bytes(&mut payload, &field)?;
1099 let value = format::read_bytes(&mut self.reader)?;
1100 format::write_bytes(&mut payload, &value)?;
1101 }
1102 }
1103 TAG_HDEL | TAG_SADD | TAG_SREM => {
1104 let key = format::read_bytes(&mut self.reader)?;
1105 format::write_bytes(&mut payload, &key)?;
1106 let count = format::read_u32(&mut self.reader)?;
1107 format::validate_collection_count(count, "set")?;
1108 format::write_u32(&mut payload, count)?;
1109 for _ in 0..count {
1110 let item = format::read_bytes(&mut self.reader)?;
1111 format::write_bytes(&mut payload, &item)?;
1112 }
1113 }
1114 TAG_HINCRBY => {
1115 let key = format::read_bytes(&mut self.reader)?;
1116 format::write_bytes(&mut payload, &key)?;
1117 let field = format::read_bytes(&mut self.reader)?;
1118 format::write_bytes(&mut payload, &field)?;
1119 let delta = format::read_i64(&mut self.reader)?;
1120 format::write_i64(&mut payload, delta)?;
1121 }
1122 TAG_INCRBY | TAG_DECRBY => {
1123 let key = format::read_bytes(&mut self.reader)?;
1124 format::write_bytes(&mut payload, &key)?;
1125 let delta = format::read_i64(&mut self.reader)?;
1126 format::write_i64(&mut payload, delta)?;
1127 }
1128 TAG_APPEND => {
1129 let key = format::read_bytes(&mut self.reader)?;
1130 format::write_bytes(&mut payload, &key)?;
1131 let value = format::read_bytes(&mut self.reader)?;
1132 format::write_bytes(&mut payload, &value)?;
1133 }
1134 TAG_RENAME => {
1135 let key = format::read_bytes(&mut self.reader)?;
1136 format::write_bytes(&mut payload, &key)?;
1137 let newkey = format::read_bytes(&mut self.reader)?;
1138 format::write_bytes(&mut payload, &newkey)?;
1139 }
1140 #[cfg(feature = "vector")]
1141 TAG_VADD => {
1142 let key = format::read_bytes(&mut self.reader)?;
1143 format::write_bytes(&mut payload, &key)?;
1144 let element = format::read_bytes(&mut self.reader)?;
1145 format::write_bytes(&mut payload, &element)?;
1146 let dim = format::read_u32(&mut self.reader)?;
1147 if dim > format::MAX_PERSISTED_VECTOR_DIMS {
1148 return Err(FormatError::InvalidData(format!(
1149 "AOF VADD dimension {dim} exceeds max {}",
1150 format::MAX_PERSISTED_VECTOR_DIMS
1151 )));
1152 }
1153 format::write_u32(&mut payload, dim)?;
1154 for _ in 0..dim {
1155 let v = format::read_f32(&mut self.reader)?;
1156 format::write_f32(&mut payload, v)?;
1157 }
1158 let metric = format::read_u8(&mut self.reader)?;
1159 format::write_u8(&mut payload, metric)?;
1160 let quantization = format::read_u8(&mut self.reader)?;
1161 format::write_u8(&mut payload, quantization)?;
1162 let connectivity = format::read_u32(&mut self.reader)?;
1163 format::write_u32(&mut payload, connectivity)?;
1164 let expansion_add = format::read_u32(&mut self.reader)?;
1165 format::write_u32(&mut payload, expansion_add)?;
1166 }
1167 #[cfg(feature = "vector")]
1168 TAG_VREM => {
1169 let key = format::read_bytes(&mut self.reader)?;
1170 format::write_bytes(&mut payload, &key)?;
1171 let element = format::read_bytes(&mut self.reader)?;
1172 format::write_bytes(&mut payload, &element)?;
1173 }
1174 #[cfg(feature = "protobuf")]
1175 TAG_PROTO_SET => {
1176 let key = format::read_bytes(&mut self.reader)?;
1177 format::write_bytes(&mut payload, &key)?;
1178 let type_name = format::read_bytes(&mut self.reader)?;
1179 format::write_bytes(&mut payload, &type_name)?;
1180 let data = format::read_bytes(&mut self.reader)?;
1181 format::write_bytes(&mut payload, &data)?;
1182 let expire_ms = format::read_i64(&mut self.reader)?;
1183 format::write_i64(&mut payload, expire_ms)?;
1184 }
1185 #[cfg(feature = "protobuf")]
1186 TAG_PROTO_REGISTER => {
1187 let name = format::read_bytes(&mut self.reader)?;
1188 format::write_bytes(&mut payload, &name)?;
1189 let descriptor = format::read_bytes(&mut self.reader)?;
1190 format::write_bytes(&mut payload, &descriptor)?;
1191 }
1192 _ => return Err(FormatError::UnknownTag(tag)),
1193 }
1194 let stored_crc = format::read_u32(&mut self.reader)?;
1195 Ok((payload, stored_crc))
1196 }
1197}
1198
1199fn open_persistence_file(path: &Path) -> Result<File, FormatError> {
1201 let mut opts = OpenOptions::new();
1202 opts.create(true).append(true);
1203 #[cfg(unix)]
1204 {
1205 use std::os::unix::fs::OpenOptionsExt;
1206 opts.mode(0o600);
1207 }
1208 Ok(opts.open(path)?)
1209}
1210
1211pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
1213 data_dir.join(format!("shard-{shard_id}.aof"))
1214}
1215
1216#[cfg(test)]
1217mod tests {
1218 use super::*;
1219
1220 type Result = std::result::Result<(), Box<dyn std::error::Error>>;
1221
1222 fn temp_dir() -> tempfile::TempDir {
1223 tempfile::tempdir().expect("create temp dir")
1224 }
1225
1226 #[test]
1227 fn record_round_trip_set() -> Result {
1228 let rec = AofRecord::Set {
1229 key: "hello".into(),
1230 value: Bytes::from("world"),
1231 expire_ms: 5000,
1232 };
1233 let bytes = rec.to_bytes()?;
1234 let decoded = AofRecord::from_bytes(&bytes)?;
1235 assert_eq!(rec, decoded);
1236 Ok(())
1237 }
1238
1239 #[test]
1240 fn record_round_trip_del() -> Result {
1241 let rec = AofRecord::Del { key: "gone".into() };
1242 let bytes = rec.to_bytes()?;
1243 let decoded = AofRecord::from_bytes(&bytes)?;
1244 assert_eq!(rec, decoded);
1245 Ok(())
1246 }
1247
1248 #[test]
1249 fn record_round_trip_expire() -> Result {
1250 let rec = AofRecord::Expire {
1251 key: "ttl".into(),
1252 seconds: 300,
1253 };
1254 let bytes = rec.to_bytes()?;
1255 let decoded = AofRecord::from_bytes(&bytes)?;
1256 assert_eq!(rec, decoded);
1257 Ok(())
1258 }
1259
1260 #[test]
1261 fn set_with_no_expiry() -> Result {
1262 let rec = AofRecord::Set {
1263 key: "k".into(),
1264 value: Bytes::from("v"),
1265 expire_ms: -1,
1266 };
1267 let bytes = rec.to_bytes()?;
1268 let decoded = AofRecord::from_bytes(&bytes)?;
1269 assert_eq!(rec, decoded);
1270 Ok(())
1271 }
1272
1273 #[test]
1274 fn writer_reader_round_trip() -> Result {
1275 let dir = temp_dir();
1276 let path = dir.path().join("test.aof");
1277
1278 let records = vec![
1279 AofRecord::Set {
1280 key: "a".into(),
1281 value: Bytes::from("1"),
1282 expire_ms: -1,
1283 },
1284 AofRecord::Set {
1285 key: "b".into(),
1286 value: Bytes::from("2"),
1287 expire_ms: 10_000,
1288 },
1289 AofRecord::Del { key: "a".into() },
1290 AofRecord::Expire {
1291 key: "b".into(),
1292 seconds: 60,
1293 },
1294 ];
1295
1296 {
1298 let mut writer = AofWriter::open(&path)?;
1299 for rec in &records {
1300 writer.write_record(rec)?;
1301 }
1302 writer.sync()?;
1303 }
1304
1305 let mut reader = AofReader::open(&path)?;
1307 let mut got = Vec::new();
1308 while let Some(rec) = reader.read_record()? {
1309 got.push(rec);
1310 }
1311 assert_eq!(records, got);
1312 Ok(())
1313 }
1314
1315 #[test]
1316 fn empty_aof_returns_no_records() -> Result {
1317 let dir = temp_dir();
1318 let path = dir.path().join("empty.aof");
1319
1320 {
1322 let _writer = AofWriter::open(&path)?;
1323 }
1324
1325 let mut reader = AofReader::open(&path)?;
1326 assert!(reader.read_record()?.is_none());
1327 Ok(())
1328 }
1329
1330 #[test]
1331 fn truncated_record_treated_as_eof() -> Result {
1332 let dir = temp_dir();
1333 let path = dir.path().join("trunc.aof");
1334
1335 {
1337 let mut writer = AofWriter::open(&path)?;
1338 writer.write_record(&AofRecord::Set {
1339 key: "ok".into(),
1340 value: Bytes::from("good"),
1341 expire_ms: -1,
1342 })?;
1343 writer.flush()?;
1344 }
1345
1346 {
1348 let mut file = OpenOptions::new().append(true).open(&path)?;
1349 file.write_all(&[TAG_SET])?;
1350 }
1351
1352 let mut reader = AofReader::open(&path)?;
1353 let rec = reader.read_record()?.unwrap();
1355 assert!(matches!(rec, AofRecord::Set { .. }));
1356 assert!(reader.read_record()?.is_none());
1358 Ok(())
1359 }
1360
1361 #[test]
1362 fn corrupt_crc_detected() -> Result {
1363 let dir = temp_dir();
1364 let path = dir.path().join("corrupt.aof");
1365
1366 {
1367 let mut writer = AofWriter::open(&path)?;
1368 writer.write_record(&AofRecord::Set {
1369 key: "k".into(),
1370 value: Bytes::from("v"),
1371 expire_ms: -1,
1372 })?;
1373 writer.flush()?;
1374 }
1375
1376 let mut data = fs::read(&path)?;
1378 let last = data.len() - 1;
1379 data[last] ^= 0xFF;
1380 fs::write(&path, &data)?;
1381
1382 let mut reader = AofReader::open(&path)?;
1383 let err = reader.read_record().unwrap_err();
1384 assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
1385 Ok(())
1386 }
1387
1388 #[test]
1389 fn missing_magic_is_error() {
1390 let dir = temp_dir();
1391 let path = dir.path().join("bad.aof");
1392 fs::write(&path, b"NOT_AOF_DATA").unwrap();
1393
1394 let err = AofReader::open(&path).unwrap_err();
1395 assert!(matches!(err, FormatError::InvalidMagic));
1396 }
1397
1398 #[test]
1399 fn truncate_resets_aof() -> Result {
1400 let dir = temp_dir();
1401 let path = dir.path().join("reset.aof");
1402
1403 {
1404 let mut writer = AofWriter::open(&path)?;
1405 writer.write_record(&AofRecord::Set {
1406 key: "old".into(),
1407 value: Bytes::from("data"),
1408 expire_ms: -1,
1409 })?;
1410 writer.truncate()?;
1411
1412 writer.write_record(&AofRecord::Set {
1414 key: "new".into(),
1415 value: Bytes::from("fresh"),
1416 expire_ms: -1,
1417 })?;
1418 writer.sync()?;
1419 }
1420
1421 let mut reader = AofReader::open(&path)?;
1422 let rec = reader.read_record()?.unwrap();
1423 match rec {
1424 AofRecord::Set { key, .. } => assert_eq!(key, "new"),
1425 other => panic!("expected Set, got {other:?}"),
1426 }
1427 assert!(reader.read_record()?.is_none());
1429 Ok(())
1430 }
1431
1432 #[test]
1433 fn record_round_trip_lpush() -> Result {
1434 let rec = AofRecord::LPush {
1435 key: "list".into(),
1436 values: vec![Bytes::from("a"), Bytes::from("b")],
1437 };
1438 let bytes = rec.to_bytes()?;
1439 let decoded = AofRecord::from_bytes(&bytes)?;
1440 assert_eq!(rec, decoded);
1441 Ok(())
1442 }
1443
1444 #[test]
1445 fn record_round_trip_rpush() -> Result {
1446 let rec = AofRecord::RPush {
1447 key: "list".into(),
1448 values: vec![Bytes::from("x")],
1449 };
1450 let bytes = rec.to_bytes()?;
1451 let decoded = AofRecord::from_bytes(&bytes)?;
1452 assert_eq!(rec, decoded);
1453 Ok(())
1454 }
1455
1456 #[test]
1457 fn record_round_trip_lpop() -> Result {
1458 let rec = AofRecord::LPop { key: "list".into() };
1459 let bytes = rec.to_bytes()?;
1460 let decoded = AofRecord::from_bytes(&bytes)?;
1461 assert_eq!(rec, decoded);
1462 Ok(())
1463 }
1464
1465 #[test]
1466 fn record_round_trip_rpop() -> Result {
1467 let rec = AofRecord::RPop { key: "list".into() };
1468 let bytes = rec.to_bytes()?;
1469 let decoded = AofRecord::from_bytes(&bytes)?;
1470 assert_eq!(rec, decoded);
1471 Ok(())
1472 }
1473
1474 #[test]
1475 fn writer_reader_round_trip_with_list_records() -> Result {
1476 let dir = temp_dir();
1477 let path = dir.path().join("list.aof");
1478
1479 let records = vec![
1480 AofRecord::LPush {
1481 key: "l".into(),
1482 values: vec![Bytes::from("a"), Bytes::from("b")],
1483 },
1484 AofRecord::RPush {
1485 key: "l".into(),
1486 values: vec![Bytes::from("c")],
1487 },
1488 AofRecord::LPop { key: "l".into() },
1489 AofRecord::RPop { key: "l".into() },
1490 ];
1491
1492 {
1493 let mut writer = AofWriter::open(&path)?;
1494 for rec in &records {
1495 writer.write_record(rec)?;
1496 }
1497 writer.sync()?;
1498 }
1499
1500 let mut reader = AofReader::open(&path)?;
1501 let mut got = Vec::new();
1502 while let Some(rec) = reader.read_record()? {
1503 got.push(rec);
1504 }
1505 assert_eq!(records, got);
1506 Ok(())
1507 }
1508
1509 #[test]
1510 fn record_round_trip_zadd() -> Result {
1511 let rec = AofRecord::ZAdd {
1512 key: "board".into(),
1513 members: vec![(100.0, "alice".into()), (200.5, "bob".into())],
1514 };
1515 let bytes = rec.to_bytes()?;
1516 let decoded = AofRecord::from_bytes(&bytes)?;
1517 assert_eq!(rec, decoded);
1518 Ok(())
1519 }
1520
1521 #[test]
1522 fn record_round_trip_zrem() -> Result {
1523 let rec = AofRecord::ZRem {
1524 key: "board".into(),
1525 members: vec!["alice".into(), "bob".into()],
1526 };
1527 let bytes = rec.to_bytes()?;
1528 let decoded = AofRecord::from_bytes(&bytes)?;
1529 assert_eq!(rec, decoded);
1530 Ok(())
1531 }
1532
1533 #[test]
1534 fn writer_reader_round_trip_with_sorted_set_records() -> Result {
1535 let dir = temp_dir();
1536 let path = dir.path().join("zset.aof");
1537
1538 let records = vec![
1539 AofRecord::ZAdd {
1540 key: "board".into(),
1541 members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
1542 },
1543 AofRecord::ZRem {
1544 key: "board".into(),
1545 members: vec!["alice".into()],
1546 },
1547 ];
1548
1549 {
1550 let mut writer = AofWriter::open(&path)?;
1551 for rec in &records {
1552 writer.write_record(rec)?;
1553 }
1554 writer.sync()?;
1555 }
1556
1557 let mut reader = AofReader::open(&path)?;
1558 let mut got = Vec::new();
1559 while let Some(rec) = reader.read_record()? {
1560 got.push(rec);
1561 }
1562 assert_eq!(records, got);
1563 Ok(())
1564 }
1565
1566 #[test]
1567 fn record_round_trip_persist() -> Result {
1568 let rec = AofRecord::Persist {
1569 key: "mykey".into(),
1570 };
1571 let bytes = rec.to_bytes()?;
1572 let decoded = AofRecord::from_bytes(&bytes)?;
1573 assert_eq!(rec, decoded);
1574 Ok(())
1575 }
1576
1577 #[test]
1578 fn record_round_trip_pexpire() -> Result {
1579 let rec = AofRecord::Pexpire {
1580 key: "mykey".into(),
1581 milliseconds: 5000,
1582 };
1583 let bytes = rec.to_bytes()?;
1584 let decoded = AofRecord::from_bytes(&bytes)?;
1585 assert_eq!(rec, decoded);
1586 Ok(())
1587 }
1588
1589 #[test]
1590 fn record_round_trip_incr() -> Result {
1591 let rec = AofRecord::Incr {
1592 key: "counter".into(),
1593 };
1594 let bytes = rec.to_bytes()?;
1595 let decoded = AofRecord::from_bytes(&bytes)?;
1596 assert_eq!(rec, decoded);
1597 Ok(())
1598 }
1599
1600 #[test]
1601 fn record_round_trip_decr() -> Result {
1602 let rec = AofRecord::Decr {
1603 key: "counter".into(),
1604 };
1605 let bytes = rec.to_bytes()?;
1606 let decoded = AofRecord::from_bytes(&bytes)?;
1607 assert_eq!(rec, decoded);
1608 Ok(())
1609 }
1610
1611 #[test]
1612 fn writer_reader_round_trip_with_persist_pexpire() -> Result {
1613 let dir = temp_dir();
1614 let path = dir.path().join("persist_pexpire.aof");
1615
1616 let records = vec![
1617 AofRecord::Set {
1618 key: "k".into(),
1619 value: Bytes::from("v"),
1620 expire_ms: 5000,
1621 },
1622 AofRecord::Persist { key: "k".into() },
1623 AofRecord::Pexpire {
1624 key: "k".into(),
1625 milliseconds: 3000,
1626 },
1627 ];
1628
1629 {
1630 let mut writer = AofWriter::open(&path)?;
1631 for rec in &records {
1632 writer.write_record(rec)?;
1633 }
1634 writer.sync()?;
1635 }
1636
1637 let mut reader = AofReader::open(&path)?;
1638 let mut got = Vec::new();
1639 while let Some(rec) = reader.read_record()? {
1640 got.push(rec);
1641 }
1642 assert_eq!(records, got);
1643 Ok(())
1644 }
1645
1646 #[test]
1647 fn aof_path_format() {
1648 let p = aof_path(Path::new("/data"), 3);
1649 assert_eq!(p, PathBuf::from("/data/shard-3.aof"));
1650 }
1651
1652 #[test]
1653 fn record_round_trip_hset() -> Result {
1654 let rec = AofRecord::HSet {
1655 key: "hash".into(),
1656 fields: vec![
1657 ("f1".into(), Bytes::from("v1")),
1658 ("f2".into(), Bytes::from("v2")),
1659 ],
1660 };
1661 let bytes = rec.to_bytes()?;
1662 let decoded = AofRecord::from_bytes(&bytes)?;
1663 assert_eq!(rec, decoded);
1664 Ok(())
1665 }
1666
1667 #[test]
1668 fn record_round_trip_hdel() -> Result {
1669 let rec = AofRecord::HDel {
1670 key: "hash".into(),
1671 fields: vec!["f1".into(), "f2".into()],
1672 };
1673 let bytes = rec.to_bytes()?;
1674 let decoded = AofRecord::from_bytes(&bytes)?;
1675 assert_eq!(rec, decoded);
1676 Ok(())
1677 }
1678
1679 #[test]
1680 fn record_round_trip_hincrby() -> Result {
1681 let rec = AofRecord::HIncrBy {
1682 key: "hash".into(),
1683 field: "counter".into(),
1684 delta: -42,
1685 };
1686 let bytes = rec.to_bytes()?;
1687 let decoded = AofRecord::from_bytes(&bytes)?;
1688 assert_eq!(rec, decoded);
1689 Ok(())
1690 }
1691
1692 #[test]
1693 fn record_round_trip_sadd() -> Result {
1694 let rec = AofRecord::SAdd {
1695 key: "set".into(),
1696 members: vec!["m1".into(), "m2".into(), "m3".into()],
1697 };
1698 let bytes = rec.to_bytes()?;
1699 let decoded = AofRecord::from_bytes(&bytes)?;
1700 assert_eq!(rec, decoded);
1701 Ok(())
1702 }
1703
1704 #[test]
1705 fn record_round_trip_srem() -> Result {
1706 let rec = AofRecord::SRem {
1707 key: "set".into(),
1708 members: vec!["m1".into()],
1709 };
1710 let bytes = rec.to_bytes()?;
1711 let decoded = AofRecord::from_bytes(&bytes)?;
1712 assert_eq!(rec, decoded);
1713 Ok(())
1714 }
1715
1716 #[cfg(feature = "vector")]
1717 #[test]
1718 fn record_round_trip_vadd() -> Result {
1719 let rec = AofRecord::VAdd {
1720 key: "embeddings".into(),
1721 element: "doc1".into(),
1722 vector: vec![0.1, 0.2, 0.3],
1723 metric: 0, quantization: 0, connectivity: 16,
1726 expansion_add: 64,
1727 };
1728 let bytes = rec.to_bytes()?;
1729 let decoded = AofRecord::from_bytes(&bytes)?;
1730 assert_eq!(rec, decoded);
1731 Ok(())
1732 }
1733
1734 #[cfg(feature = "vector")]
1735 #[test]
1736 fn record_round_trip_vadd_high_dim() -> Result {
1737 let rec = AofRecord::VAdd {
1738 key: "vecs".into(),
1739 element: "e".into(),
1740 vector: vec![0.0; 1536], metric: 1, quantization: 1, connectivity: 32,
1744 expansion_add: 128,
1745 };
1746 let bytes = rec.to_bytes()?;
1747 let decoded = AofRecord::from_bytes(&bytes)?;
1748 assert_eq!(rec, decoded);
1749 Ok(())
1750 }
1751
1752 #[cfg(feature = "vector")]
1753 #[test]
1754 fn record_round_trip_vrem() -> Result {
1755 let rec = AofRecord::VRem {
1756 key: "embeddings".into(),
1757 element: "doc1".into(),
1758 };
1759 let bytes = rec.to_bytes()?;
1760 let decoded = AofRecord::from_bytes(&bytes)?;
1761 assert_eq!(rec, decoded);
1762 Ok(())
1763 }
1764
1765 #[cfg(feature = "encryption")]
1766 mod encrypted {
1767 use super::*;
1768 use crate::encryption::EncryptionKey;
1769
1770 type Result = std::result::Result<(), Box<dyn std::error::Error>>;
1771
1772 fn test_key() -> EncryptionKey {
1773 EncryptionKey::from_bytes([0x42; 32])
1774 }
1775
1776 #[test]
1777 fn encrypted_writer_reader_round_trip() -> Result {
1778 let dir = temp_dir();
1779 let path = dir.path().join("enc.aof");
1780 let key = test_key();
1781
1782 let records = vec![
1783 AofRecord::Set {
1784 key: "a".into(),
1785 value: Bytes::from("1"),
1786 expire_ms: -1,
1787 },
1788 AofRecord::Del { key: "a".into() },
1789 AofRecord::LPush {
1790 key: "list".into(),
1791 values: vec![Bytes::from("x"), Bytes::from("y")],
1792 },
1793 AofRecord::ZAdd {
1794 key: "zs".into(),
1795 members: vec![(1.0, "m".into())],
1796 },
1797 ];
1798
1799 {
1800 let mut writer = AofWriter::open_encrypted(&path, key.clone())?;
1801 for rec in &records {
1802 writer.write_record(rec)?;
1803 }
1804 writer.sync()?;
1805 }
1806
1807 let mut reader = AofReader::open_encrypted(&path, key)?;
1808 let mut got = Vec::new();
1809 while let Some(rec) = reader.read_record()? {
1810 got.push(rec);
1811 }
1812 assert_eq!(records, got);
1813 Ok(())
1814 }
1815
1816 #[test]
1817 fn encrypted_aof_wrong_key_fails() -> Result {
1818 let dir = temp_dir();
1819 let path = dir.path().join("enc_bad.aof");
1820 let key = test_key();
1821 let wrong_key = EncryptionKey::from_bytes([0xFF; 32]);
1822
1823 {
1824 let mut writer = AofWriter::open_encrypted(&path, key)?;
1825 writer.write_record(&AofRecord::Set {
1826 key: "k".into(),
1827 value: Bytes::from("v"),
1828 expire_ms: -1,
1829 })?;
1830 writer.sync()?;
1831 }
1832
1833 let mut reader = AofReader::open_encrypted(&path, wrong_key)?;
1834 let err = reader.read_record().unwrap_err();
1835 assert!(matches!(err, FormatError::DecryptionFailed));
1836 Ok(())
1837 }
1838
1839 #[test]
1840 fn v2_file_readable_with_encryption_key() -> Result {
1841 let dir = temp_dir();
1842 let path = dir.path().join("v2.aof");
1843 let key = test_key();
1844
1845 {
1847 let mut writer = AofWriter::open(&path)?;
1848 writer.write_record(&AofRecord::Set {
1849 key: "k".into(),
1850 value: Bytes::from("v"),
1851 expire_ms: -1,
1852 })?;
1853 writer.sync()?;
1854 }
1855
1856 let mut reader = AofReader::open_encrypted(&path, key)?;
1858 let rec = reader.read_record()?.unwrap();
1859 assert!(matches!(rec, AofRecord::Set { .. }));
1860 Ok(())
1861 }
1862
1863 #[test]
1864 fn v3_file_without_key_returns_error() -> Result {
1865 let dir = temp_dir();
1866 let path = dir.path().join("v3_nokey.aof");
1867 let key = test_key();
1868
1869 {
1871 let mut writer = AofWriter::open_encrypted(&path, key)?;
1872 writer.write_record(&AofRecord::Set {
1873 key: "k".into(),
1874 value: Bytes::from("v"),
1875 expire_ms: -1,
1876 })?;
1877 writer.sync()?;
1878 }
1879
1880 let err = AofReader::open(&path).unwrap_err();
1882 assert!(matches!(err, FormatError::EncryptionRequired));
1883 Ok(())
1884 }
1885
1886 #[test]
1887 fn encrypted_truncate_preserves_encryption() -> Result {
1888 let dir = temp_dir();
1889 let path = dir.path().join("enc_trunc.aof");
1890 let key = test_key();
1891
1892 {
1893 let mut writer = AofWriter::open_encrypted(&path, key.clone())?;
1894 writer.write_record(&AofRecord::Set {
1895 key: "old".into(),
1896 value: Bytes::from("data"),
1897 expire_ms: -1,
1898 })?;
1899 writer.truncate()?;
1900
1901 writer.write_record(&AofRecord::Set {
1902 key: "new".into(),
1903 value: Bytes::from("fresh"),
1904 expire_ms: -1,
1905 })?;
1906 writer.sync()?;
1907 }
1908
1909 let mut reader = AofReader::open_encrypted(&path, key)?;
1910 let rec = reader.read_record()?.unwrap();
1911 match rec {
1912 AofRecord::Set { key, .. } => assert_eq!(key, "new"),
1913 other => panic!("expected Set, got {other:?}"),
1914 }
1915 assert!(reader.read_record()?.is_none());
1916 Ok(())
1917 }
1918 }
1919}