1use std::fs::{self, File, OpenOptions};
20use std::io::{self, BufReader, BufWriter, Write};
21use std::path::{Path, PathBuf};
22
23use bytes::Bytes;
24
25use crate::format::{self, FormatError};
26
27fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
29 let bytes = format::read_bytes(r)?;
30 String::from_utf8(bytes).map_err(|_| {
31 FormatError::Io(io::Error::new(
32 io::ErrorKind::InvalidData,
33 format!("{field} is not valid utf-8"),
34 ))
35 })
36}
37
// One-byte record tags. `AofRecord::to_bytes` writes the tag as the first byte
// of every record, and `AofRecord::from_bytes` / `AofReader` dispatch on it.
// These values end up in the AOF file, so they have to stay stable for files
// that were already written.
const TAG_SET: u8 = 1;
const TAG_DEL: u8 = 2;
const TAG_EXPIRE: u8 = 3;
const TAG_LPUSH: u8 = 4;
const TAG_RPUSH: u8 = 5;
const TAG_LPOP: u8 = 6;
const TAG_RPOP: u8 = 7;
const TAG_ZADD: u8 = 8;
const TAG_ZREM: u8 = 9;
const TAG_PERSIST: u8 = 10;
const TAG_PEXPIRE: u8 = 11;
const TAG_INCR: u8 = 12;
const TAG_DECR: u8 = 13;
52
/// One entry of the append-only file.
///
/// `to_bytes` encodes every variant as its tag byte, then the key, then the
/// variant's remaining fields; list-like fields are preceded by a `u32`
/// element count.
#[derive(Debug, Clone, PartialEq)]
pub enum AofRecord {
    /// Encoded with `TAG_SET`.
    Set {
        key: String,
        value: Bytes,
        // The tests in this file use -1 in `set_with_no_expiry`; presumably
        // -1 means "no expiry" — confirm against the code that replays records.
        expire_ms: i64,
    },
    /// Encoded with `TAG_DEL`.
    Del { key: String },
    /// Encoded with `TAG_EXPIRE`; `seconds` is stored on disk as an `i64`.
    Expire { key: String, seconds: u64 },
    /// Encoded with `TAG_LPUSH`.
    LPush { key: String, values: Vec<Bytes> },
    /// Encoded with `TAG_RPUSH`.
    RPush { key: String, values: Vec<Bytes> },
    /// Encoded with `TAG_LPOP`.
    LPop { key: String },
    /// Encoded with `TAG_RPOP`.
    RPop { key: String },
    /// Encoded with `TAG_ZADD`; each member is written as score, then name.
    ZAdd {
        key: String,
        members: Vec<(f64, String)>,
    },
    /// Encoded with `TAG_ZREM`.
    ZRem { key: String, members: Vec<String> },
    /// Encoded with `TAG_PERSIST`.
    Persist { key: String },
    /// Encoded with `TAG_PEXPIRE`; `milliseconds` is stored on disk as an `i64`.
    Pexpire { key: String, milliseconds: u64 },
    /// Encoded with `TAG_INCR`.
    Incr { key: String },
    /// Encoded with `TAG_DECR`.
    Decr { key: String },
}
90
91impl AofRecord {
92 fn to_bytes(&self) -> Vec<u8> {
94 let mut buf = Vec::new();
95 match self {
96 AofRecord::Set {
97 key,
98 value,
99 expire_ms,
100 } => {
101 format::write_u8(&mut buf, TAG_SET).expect("vec write");
102 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
103 format::write_bytes(&mut buf, value).expect("vec write");
104 format::write_i64(&mut buf, *expire_ms).expect("vec write");
105 }
106 AofRecord::Del { key } => {
107 format::write_u8(&mut buf, TAG_DEL).expect("vec write");
108 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
109 }
110 AofRecord::Expire { key, seconds } => {
111 format::write_u8(&mut buf, TAG_EXPIRE).expect("vec write");
112 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
113 format::write_i64(&mut buf, *seconds as i64).expect("vec write");
114 }
115 AofRecord::LPush { key, values } => {
116 format::write_u8(&mut buf, TAG_LPUSH).expect("vec write");
117 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
118 format::write_u32(&mut buf, values.len() as u32).expect("vec write");
119 for v in values {
120 format::write_bytes(&mut buf, v).expect("vec write");
121 }
122 }
123 AofRecord::RPush { key, values } => {
124 format::write_u8(&mut buf, TAG_RPUSH).expect("vec write");
125 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
126 format::write_u32(&mut buf, values.len() as u32).expect("vec write");
127 for v in values {
128 format::write_bytes(&mut buf, v).expect("vec write");
129 }
130 }
131 AofRecord::LPop { key } => {
132 format::write_u8(&mut buf, TAG_LPOP).expect("vec write");
133 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
134 }
135 AofRecord::RPop { key } => {
136 format::write_u8(&mut buf, TAG_RPOP).expect("vec write");
137 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
138 }
139 AofRecord::ZAdd { key, members } => {
140 format::write_u8(&mut buf, TAG_ZADD).expect("vec write");
141 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
142 format::write_u32(&mut buf, members.len() as u32).expect("vec write");
143 for (score, member) in members {
144 format::write_f64(&mut buf, *score).expect("vec write");
145 format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
146 }
147 }
148 AofRecord::ZRem { key, members } => {
149 format::write_u8(&mut buf, TAG_ZREM).expect("vec write");
150 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
151 format::write_u32(&mut buf, members.len() as u32).expect("vec write");
152 for member in members {
153 format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
154 }
155 }
156 AofRecord::Persist { key } => {
157 format::write_u8(&mut buf, TAG_PERSIST).expect("vec write");
158 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
159 }
160 AofRecord::Pexpire { key, milliseconds } => {
161 format::write_u8(&mut buf, TAG_PEXPIRE).expect("vec write");
162 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
163 format::write_i64(&mut buf, *milliseconds as i64).expect("vec write");
164 }
165 AofRecord::Incr { key } => {
166 format::write_u8(&mut buf, TAG_INCR).expect("vec write");
167 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
168 }
169 AofRecord::Decr { key } => {
170 format::write_u8(&mut buf, TAG_DECR).expect("vec write");
171 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
172 }
173 }
174 buf
175 }
176
177 fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
179 let mut cursor = io::Cursor::new(data);
180 let tag = format::read_u8(&mut cursor)?;
181 match tag {
182 TAG_SET => {
183 let key = read_string(&mut cursor, "key")?;
184 let value = format::read_bytes(&mut cursor)?;
185 let expire_ms = format::read_i64(&mut cursor)?;
186 Ok(AofRecord::Set {
187 key,
188 value: Bytes::from(value),
189 expire_ms,
190 })
191 }
192 TAG_DEL => {
193 let key = read_string(&mut cursor, "key")?;
194 Ok(AofRecord::Del { key })
195 }
196 TAG_EXPIRE => {
197 let key = read_string(&mut cursor, "key")?;
198 let seconds = format::read_i64(&mut cursor)? as u64;
199 Ok(AofRecord::Expire { key, seconds })
200 }
201 TAG_LPUSH | TAG_RPUSH => {
202 let key = read_string(&mut cursor, "key")?;
203 let count = format::read_u32(&mut cursor)?;
204 let mut values = Vec::with_capacity(count as usize);
205 for _ in 0..count {
206 values.push(Bytes::from(format::read_bytes(&mut cursor)?));
207 }
208 if tag == TAG_LPUSH {
209 Ok(AofRecord::LPush { key, values })
210 } else {
211 Ok(AofRecord::RPush { key, values })
212 }
213 }
214 TAG_LPOP => {
215 let key = read_string(&mut cursor, "key")?;
216 Ok(AofRecord::LPop { key })
217 }
218 TAG_RPOP => {
219 let key = read_string(&mut cursor, "key")?;
220 Ok(AofRecord::RPop { key })
221 }
222 TAG_ZADD => {
223 let key = read_string(&mut cursor, "key")?;
224 let count = format::read_u32(&mut cursor)?;
225 let mut members = Vec::with_capacity(count as usize);
226 for _ in 0..count {
227 let score = format::read_f64(&mut cursor)?;
228 let member = read_string(&mut cursor, "member")?;
229 members.push((score, member));
230 }
231 Ok(AofRecord::ZAdd { key, members })
232 }
233 TAG_ZREM => {
234 let key = read_string(&mut cursor, "key")?;
235 let count = format::read_u32(&mut cursor)?;
236 let mut members = Vec::with_capacity(count as usize);
237 for _ in 0..count {
238 members.push(read_string(&mut cursor, "member")?);
239 }
240 Ok(AofRecord::ZRem { key, members })
241 }
242 TAG_PERSIST => {
243 let key = read_string(&mut cursor, "key")?;
244 Ok(AofRecord::Persist { key })
245 }
246 TAG_PEXPIRE => {
247 let key = read_string(&mut cursor, "key")?;
248 let milliseconds = format::read_i64(&mut cursor)? as u64;
249 Ok(AofRecord::Pexpire { key, milliseconds })
250 }
251 TAG_INCR => {
252 let key = read_string(&mut cursor, "key")?;
253 Ok(AofRecord::Incr { key })
254 }
255 TAG_DECR => {
256 let key = read_string(&mut cursor, "key")?;
257 Ok(AofRecord::Decr { key })
258 }
259 _ => Err(FormatError::UnknownTag(tag)),
260 }
261 }
262}
263
/// Selects an fsync policy.
///
/// NOTE(review): nothing in this file reads this type. The variant names
/// suggest "sync on every write", "sync about once per second" and "never sync
/// explicitly" — confirm against the code that drives `AofWriter::sync`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum FsyncPolicy {
    Always,
    /// The value returned by `FsyncPolicy::default()`.
    #[default]
    EverySec,
    No,
}
275
/// Buffered writer that appends checksummed records to an AOF file.
pub struct AofWriter {
    // Buffered handle to the AOF file; replaced by `truncate`.
    writer: BufWriter<File>,
    // Path the file was opened from; `truncate` reopens the file through it.
    path: PathBuf,
}
281
282impl AofWriter {
283 pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
286 let path = path.into();
287 let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
288
289 let file = OpenOptions::new().create(true).append(true).open(&path)?;
290 let mut writer = BufWriter::new(file);
291
292 if !exists {
293 format::write_header(&mut writer, format::AOF_MAGIC)?;
294 writer.flush()?;
295 }
296
297 Ok(Self { writer, path })
298 }
299
300 pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
302 let payload = record.to_bytes();
303 let checksum = format::crc32(&payload);
304 self.writer.write_all(&payload)?;
305 format::write_u32(&mut self.writer, checksum)?;
306 Ok(())
307 }
308
309 pub fn flush(&mut self) -> Result<(), FormatError> {
311 self.writer.flush()?;
312 Ok(())
313 }
314
315 pub fn sync(&mut self) -> Result<(), FormatError> {
317 self.writer.flush()?;
318 self.writer.get_ref().sync_all()?;
319 Ok(())
320 }
321
322 pub fn path(&self) -> &Path {
324 &self.path
325 }
326
327 pub fn truncate(&mut self) -> Result<(), FormatError> {
330 self.writer.flush()?;
332
333 let file = OpenOptions::new()
335 .create(true)
336 .write(true)
337 .truncate(true)
338 .open(&self.path)?;
339 let mut writer = BufWriter::new(file);
340 format::write_header(&mut writer, format::AOF_MAGIC)?;
341 writer.flush()?;
342 self.writer = writer;
343 Ok(())
344 }
345}
346
/// Sequential reader over the records of an AOF file.
#[derive(Debug)]
pub struct AofReader {
    // Buffered handle positioned after the header once `open` has returned.
    reader: BufReader<File>,
}
352
353impl AofReader {
354 pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
356 let file = File::open(path.as_ref())?;
357 let mut reader = BufReader::new(file);
358 let _version = format::read_header(&mut reader, format::AOF_MAGIC)?;
359 Ok(Self { reader })
360 }
361
362 pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
368 let tag = match format::read_u8(&mut self.reader) {
370 Ok(t) => t,
371 Err(FormatError::UnexpectedEof) => return Ok(None),
372 Err(e) => return Err(e),
373 };
374
375 let record_result = self.read_payload_for_tag(tag);
378 match record_result {
379 Ok((payload, stored_crc)) => {
380 let mut full = Vec::with_capacity(1 + payload.len());
382 full.push(tag);
383 full.extend_from_slice(&payload);
384 format::verify_crc32(&full, stored_crc)?;
385 AofRecord::from_bytes(&full).map(Some)
386 }
387 Err(FormatError::UnexpectedEof) => Ok(None),
389 Err(e) => Err(e),
390 }
391 }
392
393 fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
395 let mut payload = Vec::new();
396 match tag {
397 TAG_SET => {
398 let key = format::read_bytes(&mut self.reader)?;
400 format::write_bytes(&mut payload, &key).expect("vec write");
401 let value = format::read_bytes(&mut self.reader)?;
402 format::write_bytes(&mut payload, &value).expect("vec write");
403 let expire_ms = format::read_i64(&mut self.reader)?;
404 format::write_i64(&mut payload, expire_ms).expect("vec write");
405 }
406 TAG_DEL => {
407 let key = format::read_bytes(&mut self.reader)?;
408 format::write_bytes(&mut payload, &key).expect("vec write");
409 }
410 TAG_EXPIRE => {
411 let key = format::read_bytes(&mut self.reader)?;
412 format::write_bytes(&mut payload, &key).expect("vec write");
413 let seconds = format::read_i64(&mut self.reader)?;
414 format::write_i64(&mut payload, seconds).expect("vec write");
415 }
416 TAG_LPUSH | TAG_RPUSH => {
417 let key = format::read_bytes(&mut self.reader)?;
418 format::write_bytes(&mut payload, &key).expect("vec write");
419 let count = format::read_u32(&mut self.reader)?;
420 format::write_u32(&mut payload, count).expect("vec write");
421 for _ in 0..count {
422 let val = format::read_bytes(&mut self.reader)?;
423 format::write_bytes(&mut payload, &val).expect("vec write");
424 }
425 }
426 TAG_LPOP | TAG_RPOP => {
427 let key = format::read_bytes(&mut self.reader)?;
428 format::write_bytes(&mut payload, &key).expect("vec write");
429 }
430 TAG_ZADD => {
431 let key = format::read_bytes(&mut self.reader)?;
432 format::write_bytes(&mut payload, &key).expect("vec write");
433 let count = format::read_u32(&mut self.reader)?;
434 format::write_u32(&mut payload, count).expect("vec write");
435 for _ in 0..count {
436 let score = format::read_f64(&mut self.reader)?;
437 format::write_f64(&mut payload, score).expect("vec write");
438 let member = format::read_bytes(&mut self.reader)?;
439 format::write_bytes(&mut payload, &member).expect("vec write");
440 }
441 }
442 TAG_ZREM => {
443 let key = format::read_bytes(&mut self.reader)?;
444 format::write_bytes(&mut payload, &key).expect("vec write");
445 let count = format::read_u32(&mut self.reader)?;
446 format::write_u32(&mut payload, count).expect("vec write");
447 for _ in 0..count {
448 let member = format::read_bytes(&mut self.reader)?;
449 format::write_bytes(&mut payload, &member).expect("vec write");
450 }
451 }
452 TAG_PERSIST => {
453 let key = format::read_bytes(&mut self.reader)?;
454 format::write_bytes(&mut payload, &key).expect("vec write");
455 }
456 TAG_PEXPIRE => {
457 let key = format::read_bytes(&mut self.reader)?;
458 format::write_bytes(&mut payload, &key).expect("vec write");
459 let millis = format::read_i64(&mut self.reader)?;
460 format::write_i64(&mut payload, millis).expect("vec write");
461 }
462 TAG_INCR | TAG_DECR => {
463 let key = format::read_bytes(&mut self.reader)?;
464 format::write_bytes(&mut payload, &key).expect("vec write");
465 }
466 _ => return Err(FormatError::UnknownTag(tag)),
467 }
468 let stored_crc = format::read_u32(&mut self.reader)?;
469 Ok((payload, stored_crc))
470 }
471}
472
/// Builds the AOF file path for a shard: `<data_dir>/shard-<shard_id>.aof`.
pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
    let mut path = data_dir.to_path_buf();
    path.push(format!("shard-{shard_id}.aof"));
    path
}
477
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a fresh temporary directory for a single test.
    fn temp_dir() -> tempfile::TempDir {
        tempfile::tempdir().expect("create temp dir")
    }

    /// Encodes `rec`, decodes the bytes again, and asserts both are equal.
    fn assert_round_trip(rec: AofRecord) {
        let bytes = rec.to_bytes();
        let decoded = AofRecord::from_bytes(&bytes).unwrap();
        assert_eq!(rec, decoded);
    }

    /// Writes `records` to the AOF at `path`, syncs, and reads every record back.
    fn write_and_read_back(path: &Path, records: &[AofRecord]) -> Vec<AofRecord> {
        let mut writer = AofWriter::open(path).unwrap();
        for rec in records {
            writer.write_record(rec).unwrap();
        }
        writer.sync().unwrap();
        drop(writer);

        let mut reader = AofReader::open(path).unwrap();
        let mut got = Vec::new();
        while let Some(rec) = reader.read_record().unwrap() {
            got.push(rec);
        }
        got
    }

    #[test]
    fn record_round_trip_set() {
        assert_round_trip(AofRecord::Set {
            key: "hello".into(),
            value: Bytes::from("world"),
            expire_ms: 5000,
        });
    }

    #[test]
    fn record_round_trip_del() {
        assert_round_trip(AofRecord::Del { key: "gone".into() });
    }

    #[test]
    fn record_round_trip_expire() {
        assert_round_trip(AofRecord::Expire {
            key: "ttl".into(),
            seconds: 300,
        });
    }

    #[test]
    fn set_with_no_expiry() {
        assert_round_trip(AofRecord::Set {
            key: "k".into(),
            value: Bytes::from("v"),
            expire_ms: -1,
        });
    }

    #[test]
    fn writer_reader_round_trip() {
        let dir = temp_dir();
        let path = dir.path().join("test.aof");

        let records = vec![
            AofRecord::Set {
                key: "a".into(),
                value: Bytes::from("1"),
                expire_ms: -1,
            },
            AofRecord::Set {
                key: "b".into(),
                value: Bytes::from("2"),
                expire_ms: 10_000,
            },
            AofRecord::Del { key: "a".into() },
            AofRecord::Expire {
                key: "b".into(),
                seconds: 60,
            },
        ];

        let got = write_and_read_back(&path, &records);
        assert_eq!(records, got);
    }

    #[test]
    fn empty_aof_returns_no_records() {
        let dir = temp_dir();
        let path = dir.path().join("empty.aof");

        // Opening and immediately dropping the writer leaves a header-only file.
        drop(AofWriter::open(&path).unwrap());

        let mut reader = AofReader::open(&path).unwrap();
        assert!(reader.read_record().unwrap().is_none());
    }

    #[test]
    fn truncated_record_treated_as_eof() {
        let dir = temp_dir();
        let path = dir.path().join("trunc.aof");

        let mut writer = AofWriter::open(&path).unwrap();
        writer
            .write_record(&AofRecord::Set {
                key: "ok".into(),
                value: Bytes::from("good"),
                expire_ms: -1,
            })
            .unwrap();
        writer.flush().unwrap();
        drop(writer);

        // Append a lone tag byte to simulate a record cut off mid-write.
        let mut file = OpenOptions::new().append(true).open(&path).unwrap();
        file.write_all(&[TAG_SET]).unwrap();
        drop(file);

        let mut reader = AofReader::open(&path).unwrap();
        let rec = reader.read_record().unwrap().unwrap();
        assert!(matches!(rec, AofRecord::Set { .. }));
        assert!(reader.read_record().unwrap().is_none());
    }

    #[test]
    fn corrupt_crc_detected() {
        let dir = temp_dir();
        let path = dir.path().join("corrupt.aof");

        let mut writer = AofWriter::open(&path).unwrap();
        writer
            .write_record(&AofRecord::Set {
                key: "k".into(),
                value: Bytes::from("v"),
                expire_ms: -1,
            })
            .unwrap();
        writer.flush().unwrap();
        drop(writer);

        // Flip every bit of the final byte, which belongs to the stored CRC.
        let mut data = fs::read(&path).unwrap();
        *data.last_mut().unwrap() ^= 0xFF;
        fs::write(&path, &data).unwrap();

        let mut reader = AofReader::open(&path).unwrap();
        let err = reader.read_record().unwrap_err();
        assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
    }

    #[test]
    fn missing_magic_is_error() {
        let dir = temp_dir();
        let path = dir.path().join("bad.aof");
        fs::write(&path, b"NOT_AOF_DATA").unwrap();

        let err = AofReader::open(&path).unwrap_err();
        assert!(matches!(err, FormatError::InvalidMagic));
    }

    #[test]
    fn truncate_resets_aof() {
        let dir = temp_dir();
        let path = dir.path().join("reset.aof");

        let mut writer = AofWriter::open(&path).unwrap();
        writer
            .write_record(&AofRecord::Set {
                key: "old".into(),
                value: Bytes::from("data"),
                expire_ms: -1,
            })
            .unwrap();
        writer.truncate().unwrap();

        writer
            .write_record(&AofRecord::Set {
                key: "new".into(),
                value: Bytes::from("fresh"),
                expire_ms: -1,
            })
            .unwrap();
        writer.sync().unwrap();
        drop(writer);

        let mut reader = AofReader::open(&path).unwrap();
        let rec = reader.read_record().unwrap().unwrap();
        match rec {
            AofRecord::Set { key, .. } => assert_eq!(key, "new"),
            other => panic!("expected Set, got {other:?}"),
        }
        assert!(reader.read_record().unwrap().is_none());
    }

    #[test]
    fn record_round_trip_lpush() {
        assert_round_trip(AofRecord::LPush {
            key: "list".into(),
            values: vec![Bytes::from("a"), Bytes::from("b")],
        });
    }

    #[test]
    fn record_round_trip_rpush() {
        assert_round_trip(AofRecord::RPush {
            key: "list".into(),
            values: vec![Bytes::from("x")],
        });
    }

    #[test]
    fn record_round_trip_lpop() {
        assert_round_trip(AofRecord::LPop { key: "list".into() });
    }

    #[test]
    fn record_round_trip_rpop() {
        assert_round_trip(AofRecord::RPop { key: "list".into() });
    }

    #[test]
    fn writer_reader_round_trip_with_list_records() {
        let dir = temp_dir();
        let path = dir.path().join("list.aof");

        let records = vec![
            AofRecord::LPush {
                key: "l".into(),
                values: vec![Bytes::from("a"), Bytes::from("b")],
            },
            AofRecord::RPush {
                key: "l".into(),
                values: vec![Bytes::from("c")],
            },
            AofRecord::LPop { key: "l".into() },
            AofRecord::RPop { key: "l".into() },
        ];

        let got = write_and_read_back(&path, &records);
        assert_eq!(records, got);
    }

    #[test]
    fn record_round_trip_zadd() {
        assert_round_trip(AofRecord::ZAdd {
            key: "board".into(),
            members: vec![(100.0, "alice".into()), (200.5, "bob".into())],
        });
    }

    #[test]
    fn record_round_trip_zrem() {
        assert_round_trip(AofRecord::ZRem {
            key: "board".into(),
            members: vec!["alice".into(), "bob".into()],
        });
    }

    #[test]
    fn writer_reader_round_trip_with_sorted_set_records() {
        let dir = temp_dir();
        let path = dir.path().join("zset.aof");

        let records = vec![
            AofRecord::ZAdd {
                key: "board".into(),
                members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
            },
            AofRecord::ZRem {
                key: "board".into(),
                members: vec!["alice".into()],
            },
        ];

        let got = write_and_read_back(&path, &records);
        assert_eq!(records, got);
    }

    #[test]
    fn record_round_trip_persist() {
        assert_round_trip(AofRecord::Persist {
            key: "mykey".into(),
        });
    }

    #[test]
    fn record_round_trip_pexpire() {
        assert_round_trip(AofRecord::Pexpire {
            key: "mykey".into(),
            milliseconds: 5000,
        });
    }

    #[test]
    fn record_round_trip_incr() {
        assert_round_trip(AofRecord::Incr {
            key: "counter".into(),
        });
    }

    #[test]
    fn record_round_trip_decr() {
        assert_round_trip(AofRecord::Decr {
            key: "counter".into(),
        });
    }

    #[test]
    fn writer_reader_round_trip_with_persist_pexpire() {
        let dir = temp_dir();
        let path = dir.path().join("persist_pexpire.aof");

        let records = vec![
            AofRecord::Set {
                key: "k".into(),
                value: Bytes::from("v"),
                expire_ms: 5000,
            },
            AofRecord::Persist { key: "k".into() },
            AofRecord::Pexpire {
                key: "k".into(),
                milliseconds: 3000,
            },
        ];

        let got = write_and_read_back(&path, &records);
        assert_eq!(records, got);
    }

    #[test]
    fn aof_path_format() {
        let p = aof_path(Path::new("/data"), 3);
        assert_eq!(p, PathBuf::from("/data/shard-3.aof"));
    }
}