1use std::fs::{self, File, OpenOptions};
20use std::io::{self, BufReader, BufWriter, Write};
21use std::path::{Path, PathBuf};
22
23use bytes::Bytes;
24
25use crate::format::{self, FormatError};
26
27fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
29 let bytes = format::read_bytes(r)?;
30 String::from_utf8(bytes).map_err(|_| {
31 FormatError::Io(io::Error::new(
32 io::ErrorKind::InvalidData,
33 format!("{field} is not valid utf-8"),
34 ))
35 })
36}
37
38const TAG_SET: u8 = 1;
40const TAG_DEL: u8 = 2;
41const TAG_EXPIRE: u8 = 3;
42const TAG_LPUSH: u8 = 4;
43const TAG_RPUSH: u8 = 5;
44const TAG_LPOP: u8 = 6;
45const TAG_RPOP: u8 = 7;
46const TAG_ZADD: u8 = 8;
47const TAG_ZREM: u8 = 9;
48const TAG_PERSIST: u8 = 10;
49const TAG_PEXPIRE: u8 = 11;
50const TAG_INCR: u8 = 12;
51const TAG_DECR: u8 = 13;
52const TAG_HSET: u8 = 14;
53const TAG_HDEL: u8 = 15;
54const TAG_HINCRBY: u8 = 16;
55const TAG_SADD: u8 = 17;
56const TAG_SREM: u8 = 18;
57
58#[derive(Debug, Clone, PartialEq)]
60pub enum AofRecord {
61 Set {
63 key: String,
64 value: Bytes,
65 expire_ms: i64,
66 },
67 Del { key: String },
69 Expire { key: String, seconds: u64 },
71 LPush { key: String, values: Vec<Bytes> },
73 RPush { key: String, values: Vec<Bytes> },
75 LPop { key: String },
77 RPop { key: String },
79 ZAdd {
81 key: String,
82 members: Vec<(f64, String)>,
83 },
84 ZRem { key: String, members: Vec<String> },
86 Persist { key: String },
88 Pexpire { key: String, milliseconds: u64 },
90 Incr { key: String },
92 Decr { key: String },
94 HSet {
96 key: String,
97 fields: Vec<(String, Bytes)>,
98 },
99 HDel { key: String, fields: Vec<String> },
101 HIncrBy {
103 key: String,
104 field: String,
105 delta: i64,
106 },
107 SAdd { key: String, members: Vec<String> },
109 SRem { key: String, members: Vec<String> },
111}
112
113impl AofRecord {
114 fn to_bytes(&self) -> Vec<u8> {
116 let mut buf = Vec::new();
117 match self {
118 AofRecord::Set {
119 key,
120 value,
121 expire_ms,
122 } => {
123 format::write_u8(&mut buf, TAG_SET).expect("vec write");
124 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
125 format::write_bytes(&mut buf, value).expect("vec write");
126 format::write_i64(&mut buf, *expire_ms).expect("vec write");
127 }
128 AofRecord::Del { key } => {
129 format::write_u8(&mut buf, TAG_DEL).expect("vec write");
130 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
131 }
132 AofRecord::Expire { key, seconds } => {
133 format::write_u8(&mut buf, TAG_EXPIRE).expect("vec write");
134 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
135 format::write_i64(&mut buf, *seconds as i64).expect("vec write");
136 }
137 AofRecord::LPush { key, values } => {
138 format::write_u8(&mut buf, TAG_LPUSH).expect("vec write");
139 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
140 format::write_u32(&mut buf, values.len() as u32).expect("vec write");
141 for v in values {
142 format::write_bytes(&mut buf, v).expect("vec write");
143 }
144 }
145 AofRecord::RPush { key, values } => {
146 format::write_u8(&mut buf, TAG_RPUSH).expect("vec write");
147 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
148 format::write_u32(&mut buf, values.len() as u32).expect("vec write");
149 for v in values {
150 format::write_bytes(&mut buf, v).expect("vec write");
151 }
152 }
153 AofRecord::LPop { key } => {
154 format::write_u8(&mut buf, TAG_LPOP).expect("vec write");
155 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
156 }
157 AofRecord::RPop { key } => {
158 format::write_u8(&mut buf, TAG_RPOP).expect("vec write");
159 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
160 }
161 AofRecord::ZAdd { key, members } => {
162 format::write_u8(&mut buf, TAG_ZADD).expect("vec write");
163 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
164 format::write_u32(&mut buf, members.len() as u32).expect("vec write");
165 for (score, member) in members {
166 format::write_f64(&mut buf, *score).expect("vec write");
167 format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
168 }
169 }
170 AofRecord::ZRem { key, members } => {
171 format::write_u8(&mut buf, TAG_ZREM).expect("vec write");
172 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
173 format::write_u32(&mut buf, members.len() as u32).expect("vec write");
174 for member in members {
175 format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
176 }
177 }
178 AofRecord::Persist { key } => {
179 format::write_u8(&mut buf, TAG_PERSIST).expect("vec write");
180 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
181 }
182 AofRecord::Pexpire { key, milliseconds } => {
183 format::write_u8(&mut buf, TAG_PEXPIRE).expect("vec write");
184 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
185 format::write_i64(&mut buf, *milliseconds as i64).expect("vec write");
186 }
187 AofRecord::Incr { key } => {
188 format::write_u8(&mut buf, TAG_INCR).expect("vec write");
189 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
190 }
191 AofRecord::Decr { key } => {
192 format::write_u8(&mut buf, TAG_DECR).expect("vec write");
193 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
194 }
195 AofRecord::HSet { key, fields } => {
196 format::write_u8(&mut buf, TAG_HSET).expect("vec write");
197 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
198 format::write_u32(&mut buf, fields.len() as u32).expect("vec write");
199 for (field, value) in fields {
200 format::write_bytes(&mut buf, field.as_bytes()).expect("vec write");
201 format::write_bytes(&mut buf, value).expect("vec write");
202 }
203 }
204 AofRecord::HDel { key, fields } => {
205 format::write_u8(&mut buf, TAG_HDEL).expect("vec write");
206 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
207 format::write_u32(&mut buf, fields.len() as u32).expect("vec write");
208 for field in fields {
209 format::write_bytes(&mut buf, field.as_bytes()).expect("vec write");
210 }
211 }
212 AofRecord::HIncrBy { key, field, delta } => {
213 format::write_u8(&mut buf, TAG_HINCRBY).expect("vec write");
214 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
215 format::write_bytes(&mut buf, field.as_bytes()).expect("vec write");
216 format::write_i64(&mut buf, *delta).expect("vec write");
217 }
218 AofRecord::SAdd { key, members } => {
219 format::write_u8(&mut buf, TAG_SADD).expect("vec write");
220 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
221 format::write_u32(&mut buf, members.len() as u32).expect("vec write");
222 for member in members {
223 format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
224 }
225 }
226 AofRecord::SRem { key, members } => {
227 format::write_u8(&mut buf, TAG_SREM).expect("vec write");
228 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
229 format::write_u32(&mut buf, members.len() as u32).expect("vec write");
230 for member in members {
231 format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
232 }
233 }
234 }
235 buf
236 }
237
238 fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
240 let mut cursor = io::Cursor::new(data);
241 let tag = format::read_u8(&mut cursor)?;
242 match tag {
243 TAG_SET => {
244 let key = read_string(&mut cursor, "key")?;
245 let value = format::read_bytes(&mut cursor)?;
246 let expire_ms = format::read_i64(&mut cursor)?;
247 Ok(AofRecord::Set {
248 key,
249 value: Bytes::from(value),
250 expire_ms,
251 })
252 }
253 TAG_DEL => {
254 let key = read_string(&mut cursor, "key")?;
255 Ok(AofRecord::Del { key })
256 }
257 TAG_EXPIRE => {
258 let key = read_string(&mut cursor, "key")?;
259 let seconds = format::read_i64(&mut cursor)? as u64;
260 Ok(AofRecord::Expire { key, seconds })
261 }
262 TAG_LPUSH | TAG_RPUSH => {
263 let key = read_string(&mut cursor, "key")?;
264 let count = format::read_u32(&mut cursor)?;
265 let mut values = Vec::with_capacity(count as usize);
266 for _ in 0..count {
267 values.push(Bytes::from(format::read_bytes(&mut cursor)?));
268 }
269 if tag == TAG_LPUSH {
270 Ok(AofRecord::LPush { key, values })
271 } else {
272 Ok(AofRecord::RPush { key, values })
273 }
274 }
275 TAG_LPOP => {
276 let key = read_string(&mut cursor, "key")?;
277 Ok(AofRecord::LPop { key })
278 }
279 TAG_RPOP => {
280 let key = read_string(&mut cursor, "key")?;
281 Ok(AofRecord::RPop { key })
282 }
283 TAG_ZADD => {
284 let key = read_string(&mut cursor, "key")?;
285 let count = format::read_u32(&mut cursor)?;
286 let mut members = Vec::with_capacity(count as usize);
287 for _ in 0..count {
288 let score = format::read_f64(&mut cursor)?;
289 let member = read_string(&mut cursor, "member")?;
290 members.push((score, member));
291 }
292 Ok(AofRecord::ZAdd { key, members })
293 }
294 TAG_ZREM => {
295 let key = read_string(&mut cursor, "key")?;
296 let count = format::read_u32(&mut cursor)?;
297 let mut members = Vec::with_capacity(count as usize);
298 for _ in 0..count {
299 members.push(read_string(&mut cursor, "member")?);
300 }
301 Ok(AofRecord::ZRem { key, members })
302 }
303 TAG_PERSIST => {
304 let key = read_string(&mut cursor, "key")?;
305 Ok(AofRecord::Persist { key })
306 }
307 TAG_PEXPIRE => {
308 let key = read_string(&mut cursor, "key")?;
309 let milliseconds = format::read_i64(&mut cursor)? as u64;
310 Ok(AofRecord::Pexpire { key, milliseconds })
311 }
312 TAG_INCR => {
313 let key = read_string(&mut cursor, "key")?;
314 Ok(AofRecord::Incr { key })
315 }
316 TAG_DECR => {
317 let key = read_string(&mut cursor, "key")?;
318 Ok(AofRecord::Decr { key })
319 }
320 TAG_HSET => {
321 let key = read_string(&mut cursor, "key")?;
322 let count = format::read_u32(&mut cursor)?;
323 let mut fields = Vec::with_capacity(count as usize);
324 for _ in 0..count {
325 let field = read_string(&mut cursor, "field")?;
326 let value = Bytes::from(format::read_bytes(&mut cursor)?);
327 fields.push((field, value));
328 }
329 Ok(AofRecord::HSet { key, fields })
330 }
331 TAG_HDEL => {
332 let key = read_string(&mut cursor, "key")?;
333 let count = format::read_u32(&mut cursor)?;
334 let mut fields = Vec::with_capacity(count as usize);
335 for _ in 0..count {
336 fields.push(read_string(&mut cursor, "field")?);
337 }
338 Ok(AofRecord::HDel { key, fields })
339 }
340 TAG_HINCRBY => {
341 let key = read_string(&mut cursor, "key")?;
342 let field = read_string(&mut cursor, "field")?;
343 let delta = format::read_i64(&mut cursor)?;
344 Ok(AofRecord::HIncrBy { key, field, delta })
345 }
346 TAG_SADD => {
347 let key = read_string(&mut cursor, "key")?;
348 let count = format::read_u32(&mut cursor)?;
349 let mut members = Vec::with_capacity(count as usize);
350 for _ in 0..count {
351 members.push(read_string(&mut cursor, "member")?);
352 }
353 Ok(AofRecord::SAdd { key, members })
354 }
355 TAG_SREM => {
356 let key = read_string(&mut cursor, "key")?;
357 let count = format::read_u32(&mut cursor)?;
358 let mut members = Vec::with_capacity(count as usize);
359 for _ in 0..count {
360 members.push(read_string(&mut cursor, "member")?);
361 }
362 Ok(AofRecord::SRem { key, members })
363 }
364 _ => Err(FormatError::UnknownTag(tag)),
365 }
366 }
367}
368
369#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
371pub enum FsyncPolicy {
372 Always,
374 #[default]
376 EverySec,
377 No,
379}
380
381pub struct AofWriter {
383 writer: BufWriter<File>,
384 path: PathBuf,
385}
386
387impl AofWriter {
388 pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
391 let path = path.into();
392 let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
393
394 let file = OpenOptions::new().create(true).append(true).open(&path)?;
395 let mut writer = BufWriter::new(file);
396
397 if !exists {
398 format::write_header(&mut writer, format::AOF_MAGIC)?;
399 writer.flush()?;
400 }
401
402 Ok(Self { writer, path })
403 }
404
405 pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
407 let payload = record.to_bytes();
408 let checksum = format::crc32(&payload);
409 self.writer.write_all(&payload)?;
410 format::write_u32(&mut self.writer, checksum)?;
411 Ok(())
412 }
413
414 pub fn flush(&mut self) -> Result<(), FormatError> {
416 self.writer.flush()?;
417 Ok(())
418 }
419
420 pub fn sync(&mut self) -> Result<(), FormatError> {
422 self.writer.flush()?;
423 self.writer.get_ref().sync_all()?;
424 Ok(())
425 }
426
427 pub fn path(&self) -> &Path {
429 &self.path
430 }
431
432 pub fn truncate(&mut self) -> Result<(), FormatError> {
435 self.writer.flush()?;
437
438 let file = OpenOptions::new()
440 .create(true)
441 .write(true)
442 .truncate(true)
443 .open(&self.path)?;
444 let mut writer = BufWriter::new(file);
445 format::write_header(&mut writer, format::AOF_MAGIC)?;
446 writer.flush()?;
447 writer.get_ref().sync_all()?;
449 self.writer = writer;
450 Ok(())
451 }
452}
453
454#[derive(Debug)]
456pub struct AofReader {
457 reader: BufReader<File>,
458}
459
460impl AofReader {
461 pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
463 let file = File::open(path.as_ref())?;
464 let mut reader = BufReader::new(file);
465 let _version = format::read_header(&mut reader, format::AOF_MAGIC)?;
466 Ok(Self { reader })
467 }
468
469 pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
475 let tag = match format::read_u8(&mut self.reader) {
477 Ok(t) => t,
478 Err(FormatError::UnexpectedEof) => return Ok(None),
479 Err(e) => return Err(e),
480 };
481
482 let record_result = self.read_payload_for_tag(tag);
485 match record_result {
486 Ok((payload, stored_crc)) => {
487 let mut full = Vec::with_capacity(1 + payload.len());
489 full.push(tag);
490 full.extend_from_slice(&payload);
491 format::verify_crc32(&full, stored_crc)?;
492 AofRecord::from_bytes(&full).map(Some)
493 }
494 Err(FormatError::UnexpectedEof) => Ok(None),
496 Err(e) => Err(e),
497 }
498 }
499
500 fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
502 let mut payload = Vec::new();
503 match tag {
504 TAG_SET => {
505 let key = format::read_bytes(&mut self.reader)?;
507 format::write_bytes(&mut payload, &key).expect("vec write");
508 let value = format::read_bytes(&mut self.reader)?;
509 format::write_bytes(&mut payload, &value).expect("vec write");
510 let expire_ms = format::read_i64(&mut self.reader)?;
511 format::write_i64(&mut payload, expire_ms).expect("vec write");
512 }
513 TAG_DEL => {
514 let key = format::read_bytes(&mut self.reader)?;
515 format::write_bytes(&mut payload, &key).expect("vec write");
516 }
517 TAG_EXPIRE => {
518 let key = format::read_bytes(&mut self.reader)?;
519 format::write_bytes(&mut payload, &key).expect("vec write");
520 let seconds = format::read_i64(&mut self.reader)?;
521 format::write_i64(&mut payload, seconds).expect("vec write");
522 }
523 TAG_LPUSH | TAG_RPUSH => {
524 let key = format::read_bytes(&mut self.reader)?;
525 format::write_bytes(&mut payload, &key).expect("vec write");
526 let count = format::read_u32(&mut self.reader)?;
527 format::write_u32(&mut payload, count).expect("vec write");
528 for _ in 0..count {
529 let val = format::read_bytes(&mut self.reader)?;
530 format::write_bytes(&mut payload, &val).expect("vec write");
531 }
532 }
533 TAG_LPOP | TAG_RPOP => {
534 let key = format::read_bytes(&mut self.reader)?;
535 format::write_bytes(&mut payload, &key).expect("vec write");
536 }
537 TAG_ZADD => {
538 let key = format::read_bytes(&mut self.reader)?;
539 format::write_bytes(&mut payload, &key).expect("vec write");
540 let count = format::read_u32(&mut self.reader)?;
541 format::write_u32(&mut payload, count).expect("vec write");
542 for _ in 0..count {
543 let score = format::read_f64(&mut self.reader)?;
544 format::write_f64(&mut payload, score).expect("vec write");
545 let member = format::read_bytes(&mut self.reader)?;
546 format::write_bytes(&mut payload, &member).expect("vec write");
547 }
548 }
549 TAG_ZREM => {
550 let key = format::read_bytes(&mut self.reader)?;
551 format::write_bytes(&mut payload, &key).expect("vec write");
552 let count = format::read_u32(&mut self.reader)?;
553 format::write_u32(&mut payload, count).expect("vec write");
554 for _ in 0..count {
555 let member = format::read_bytes(&mut self.reader)?;
556 format::write_bytes(&mut payload, &member).expect("vec write");
557 }
558 }
559 TAG_PERSIST => {
560 let key = format::read_bytes(&mut self.reader)?;
561 format::write_bytes(&mut payload, &key).expect("vec write");
562 }
563 TAG_PEXPIRE => {
564 let key = format::read_bytes(&mut self.reader)?;
565 format::write_bytes(&mut payload, &key).expect("vec write");
566 let millis = format::read_i64(&mut self.reader)?;
567 format::write_i64(&mut payload, millis).expect("vec write");
568 }
569 TAG_INCR | TAG_DECR => {
570 let key = format::read_bytes(&mut self.reader)?;
571 format::write_bytes(&mut payload, &key).expect("vec write");
572 }
573 _ => return Err(FormatError::UnknownTag(tag)),
574 }
575 let stored_crc = format::read_u32(&mut self.reader)?;
576 Ok((payload, stored_crc))
577 }
578}
579
580pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
582 data_dir.join(format!("shard-{shard_id}.aof"))
583}
584
585#[cfg(test)]
586mod tests {
587 use super::*;
588 fn temp_dir() -> tempfile::TempDir {
589 tempfile::tempdir().expect("create temp dir")
590 }
591
592 #[test]
593 fn record_round_trip_set() {
594 let rec = AofRecord::Set {
595 key: "hello".into(),
596 value: Bytes::from("world"),
597 expire_ms: 5000,
598 };
599 let bytes = rec.to_bytes();
600 let decoded = AofRecord::from_bytes(&bytes).unwrap();
601 assert_eq!(rec, decoded);
602 }
603
604 #[test]
605 fn record_round_trip_del() {
606 let rec = AofRecord::Del { key: "gone".into() };
607 let bytes = rec.to_bytes();
608 let decoded = AofRecord::from_bytes(&bytes).unwrap();
609 assert_eq!(rec, decoded);
610 }
611
612 #[test]
613 fn record_round_trip_expire() {
614 let rec = AofRecord::Expire {
615 key: "ttl".into(),
616 seconds: 300,
617 };
618 let bytes = rec.to_bytes();
619 let decoded = AofRecord::from_bytes(&bytes).unwrap();
620 assert_eq!(rec, decoded);
621 }
622
623 #[test]
624 fn set_with_no_expiry() {
625 let rec = AofRecord::Set {
626 key: "k".into(),
627 value: Bytes::from("v"),
628 expire_ms: -1,
629 };
630 let bytes = rec.to_bytes();
631 let decoded = AofRecord::from_bytes(&bytes).unwrap();
632 assert_eq!(rec, decoded);
633 }
634
635 #[test]
636 fn writer_reader_round_trip() {
637 let dir = temp_dir();
638 let path = dir.path().join("test.aof");
639
640 let records = vec![
641 AofRecord::Set {
642 key: "a".into(),
643 value: Bytes::from("1"),
644 expire_ms: -1,
645 },
646 AofRecord::Set {
647 key: "b".into(),
648 value: Bytes::from("2"),
649 expire_ms: 10_000,
650 },
651 AofRecord::Del { key: "a".into() },
652 AofRecord::Expire {
653 key: "b".into(),
654 seconds: 60,
655 },
656 ];
657
658 {
660 let mut writer = AofWriter::open(&path).unwrap();
661 for rec in &records {
662 writer.write_record(rec).unwrap();
663 }
664 writer.sync().unwrap();
665 }
666
667 let mut reader = AofReader::open(&path).unwrap();
669 let mut got = Vec::new();
670 while let Some(rec) = reader.read_record().unwrap() {
671 got.push(rec);
672 }
673 assert_eq!(records, got);
674 }
675
676 #[test]
677 fn empty_aof_returns_no_records() {
678 let dir = temp_dir();
679 let path = dir.path().join("empty.aof");
680
681 {
683 let _writer = AofWriter::open(&path).unwrap();
684 }
685
686 let mut reader = AofReader::open(&path).unwrap();
687 assert!(reader.read_record().unwrap().is_none());
688 }
689
690 #[test]
691 fn truncated_record_treated_as_eof() {
692 let dir = temp_dir();
693 let path = dir.path().join("trunc.aof");
694
695 {
697 let mut writer = AofWriter::open(&path).unwrap();
698 writer
699 .write_record(&AofRecord::Set {
700 key: "ok".into(),
701 value: Bytes::from("good"),
702 expire_ms: -1,
703 })
704 .unwrap();
705 writer.flush().unwrap();
706 }
707
708 {
710 let mut file = OpenOptions::new().append(true).open(&path).unwrap();
711 file.write_all(&[TAG_SET]).unwrap();
712 }
713
714 let mut reader = AofReader::open(&path).unwrap();
715 let rec = reader.read_record().unwrap().unwrap();
717 assert!(matches!(rec, AofRecord::Set { .. }));
718 assert!(reader.read_record().unwrap().is_none());
720 }
721
722 #[test]
723 fn corrupt_crc_detected() {
724 let dir = temp_dir();
725 let path = dir.path().join("corrupt.aof");
726
727 {
728 let mut writer = AofWriter::open(&path).unwrap();
729 writer
730 .write_record(&AofRecord::Set {
731 key: "k".into(),
732 value: Bytes::from("v"),
733 expire_ms: -1,
734 })
735 .unwrap();
736 writer.flush().unwrap();
737 }
738
739 let mut data = fs::read(&path).unwrap();
741 let last = data.len() - 1;
742 data[last] ^= 0xFF;
743 fs::write(&path, &data).unwrap();
744
745 let mut reader = AofReader::open(&path).unwrap();
746 let err = reader.read_record().unwrap_err();
747 assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
748 }
749
750 #[test]
751 fn missing_magic_is_error() {
752 let dir = temp_dir();
753 let path = dir.path().join("bad.aof");
754 fs::write(&path, b"NOT_AOF_DATA").unwrap();
755
756 let err = AofReader::open(&path).unwrap_err();
757 assert!(matches!(err, FormatError::InvalidMagic));
758 }
759
760 #[test]
761 fn truncate_resets_aof() {
762 let dir = temp_dir();
763 let path = dir.path().join("reset.aof");
764
765 {
766 let mut writer = AofWriter::open(&path).unwrap();
767 writer
768 .write_record(&AofRecord::Set {
769 key: "old".into(),
770 value: Bytes::from("data"),
771 expire_ms: -1,
772 })
773 .unwrap();
774 writer.truncate().unwrap();
775
776 writer
778 .write_record(&AofRecord::Set {
779 key: "new".into(),
780 value: Bytes::from("fresh"),
781 expire_ms: -1,
782 })
783 .unwrap();
784 writer.sync().unwrap();
785 }
786
787 let mut reader = AofReader::open(&path).unwrap();
788 let rec = reader.read_record().unwrap().unwrap();
789 match rec {
790 AofRecord::Set { key, .. } => assert_eq!(key, "new"),
791 other => panic!("expected Set, got {other:?}"),
792 }
793 assert!(reader.read_record().unwrap().is_none());
795 }
796
797 #[test]
798 fn record_round_trip_lpush() {
799 let rec = AofRecord::LPush {
800 key: "list".into(),
801 values: vec![Bytes::from("a"), Bytes::from("b")],
802 };
803 let bytes = rec.to_bytes();
804 let decoded = AofRecord::from_bytes(&bytes).unwrap();
805 assert_eq!(rec, decoded);
806 }
807
808 #[test]
809 fn record_round_trip_rpush() {
810 let rec = AofRecord::RPush {
811 key: "list".into(),
812 values: vec![Bytes::from("x")],
813 };
814 let bytes = rec.to_bytes();
815 let decoded = AofRecord::from_bytes(&bytes).unwrap();
816 assert_eq!(rec, decoded);
817 }
818
819 #[test]
820 fn record_round_trip_lpop() {
821 let rec = AofRecord::LPop { key: "list".into() };
822 let bytes = rec.to_bytes();
823 let decoded = AofRecord::from_bytes(&bytes).unwrap();
824 assert_eq!(rec, decoded);
825 }
826
827 #[test]
828 fn record_round_trip_rpop() {
829 let rec = AofRecord::RPop { key: "list".into() };
830 let bytes = rec.to_bytes();
831 let decoded = AofRecord::from_bytes(&bytes).unwrap();
832 assert_eq!(rec, decoded);
833 }
834
835 #[test]
836 fn writer_reader_round_trip_with_list_records() {
837 let dir = temp_dir();
838 let path = dir.path().join("list.aof");
839
840 let records = vec![
841 AofRecord::LPush {
842 key: "l".into(),
843 values: vec![Bytes::from("a"), Bytes::from("b")],
844 },
845 AofRecord::RPush {
846 key: "l".into(),
847 values: vec![Bytes::from("c")],
848 },
849 AofRecord::LPop { key: "l".into() },
850 AofRecord::RPop { key: "l".into() },
851 ];
852
853 {
854 let mut writer = AofWriter::open(&path).unwrap();
855 for rec in &records {
856 writer.write_record(rec).unwrap();
857 }
858 writer.sync().unwrap();
859 }
860
861 let mut reader = AofReader::open(&path).unwrap();
862 let mut got = Vec::new();
863 while let Some(rec) = reader.read_record().unwrap() {
864 got.push(rec);
865 }
866 assert_eq!(records, got);
867 }
868
869 #[test]
870 fn record_round_trip_zadd() {
871 let rec = AofRecord::ZAdd {
872 key: "board".into(),
873 members: vec![(100.0, "alice".into()), (200.5, "bob".into())],
874 };
875 let bytes = rec.to_bytes();
876 let decoded = AofRecord::from_bytes(&bytes).unwrap();
877 assert_eq!(rec, decoded);
878 }
879
880 #[test]
881 fn record_round_trip_zrem() {
882 let rec = AofRecord::ZRem {
883 key: "board".into(),
884 members: vec!["alice".into(), "bob".into()],
885 };
886 let bytes = rec.to_bytes();
887 let decoded = AofRecord::from_bytes(&bytes).unwrap();
888 assert_eq!(rec, decoded);
889 }
890
891 #[test]
892 fn writer_reader_round_trip_with_sorted_set_records() {
893 let dir = temp_dir();
894 let path = dir.path().join("zset.aof");
895
896 let records = vec![
897 AofRecord::ZAdd {
898 key: "board".into(),
899 members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
900 },
901 AofRecord::ZRem {
902 key: "board".into(),
903 members: vec!["alice".into()],
904 },
905 ];
906
907 {
908 let mut writer = AofWriter::open(&path).unwrap();
909 for rec in &records {
910 writer.write_record(rec).unwrap();
911 }
912 writer.sync().unwrap();
913 }
914
915 let mut reader = AofReader::open(&path).unwrap();
916 let mut got = Vec::new();
917 while let Some(rec) = reader.read_record().unwrap() {
918 got.push(rec);
919 }
920 assert_eq!(records, got);
921 }
922
923 #[test]
924 fn record_round_trip_persist() {
925 let rec = AofRecord::Persist {
926 key: "mykey".into(),
927 };
928 let bytes = rec.to_bytes();
929 let decoded = AofRecord::from_bytes(&bytes).unwrap();
930 assert_eq!(rec, decoded);
931 }
932
933 #[test]
934 fn record_round_trip_pexpire() {
935 let rec = AofRecord::Pexpire {
936 key: "mykey".into(),
937 milliseconds: 5000,
938 };
939 let bytes = rec.to_bytes();
940 let decoded = AofRecord::from_bytes(&bytes).unwrap();
941 assert_eq!(rec, decoded);
942 }
943
944 #[test]
945 fn record_round_trip_incr() {
946 let rec = AofRecord::Incr {
947 key: "counter".into(),
948 };
949 let bytes = rec.to_bytes();
950 let decoded = AofRecord::from_bytes(&bytes).unwrap();
951 assert_eq!(rec, decoded);
952 }
953
954 #[test]
955 fn record_round_trip_decr() {
956 let rec = AofRecord::Decr {
957 key: "counter".into(),
958 };
959 let bytes = rec.to_bytes();
960 let decoded = AofRecord::from_bytes(&bytes).unwrap();
961 assert_eq!(rec, decoded);
962 }
963
964 #[test]
965 fn writer_reader_round_trip_with_persist_pexpire() {
966 let dir = temp_dir();
967 let path = dir.path().join("persist_pexpire.aof");
968
969 let records = vec![
970 AofRecord::Set {
971 key: "k".into(),
972 value: Bytes::from("v"),
973 expire_ms: 5000,
974 },
975 AofRecord::Persist { key: "k".into() },
976 AofRecord::Pexpire {
977 key: "k".into(),
978 milliseconds: 3000,
979 },
980 ];
981
982 {
983 let mut writer = AofWriter::open(&path).unwrap();
984 for rec in &records {
985 writer.write_record(rec).unwrap();
986 }
987 writer.sync().unwrap();
988 }
989
990 let mut reader = AofReader::open(&path).unwrap();
991 let mut got = Vec::new();
992 while let Some(rec) = reader.read_record().unwrap() {
993 got.push(rec);
994 }
995 assert_eq!(records, got);
996 }
997
998 #[test]
999 fn aof_path_format() {
1000 let p = aof_path(Path::new("/data"), 3);
1001 assert_eq!(p, PathBuf::from("/data/shard-3.aof"));
1002 }
1003
1004 #[test]
1005 fn record_round_trip_hset() {
1006 let rec = AofRecord::HSet {
1007 key: "hash".into(),
1008 fields: vec![
1009 ("f1".into(), Bytes::from("v1")),
1010 ("f2".into(), Bytes::from("v2")),
1011 ],
1012 };
1013 let bytes = rec.to_bytes();
1014 let decoded = AofRecord::from_bytes(&bytes).unwrap();
1015 assert_eq!(rec, decoded);
1016 }
1017
1018 #[test]
1019 fn record_round_trip_hdel() {
1020 let rec = AofRecord::HDel {
1021 key: "hash".into(),
1022 fields: vec!["f1".into(), "f2".into()],
1023 };
1024 let bytes = rec.to_bytes();
1025 let decoded = AofRecord::from_bytes(&bytes).unwrap();
1026 assert_eq!(rec, decoded);
1027 }
1028
1029 #[test]
1030 fn record_round_trip_hincrby() {
1031 let rec = AofRecord::HIncrBy {
1032 key: "hash".into(),
1033 field: "counter".into(),
1034 delta: -42,
1035 };
1036 let bytes = rec.to_bytes();
1037 let decoded = AofRecord::from_bytes(&bytes).unwrap();
1038 assert_eq!(rec, decoded);
1039 }
1040
1041 #[test]
1042 fn record_round_trip_sadd() {
1043 let rec = AofRecord::SAdd {
1044 key: "set".into(),
1045 members: vec!["m1".into(), "m2".into(), "m3".into()],
1046 };
1047 let bytes = rec.to_bytes();
1048 let decoded = AofRecord::from_bytes(&bytes).unwrap();
1049 assert_eq!(rec, decoded);
1050 }
1051
1052 #[test]
1053 fn record_round_trip_srem() {
1054 let rec = AofRecord::SRem {
1055 key: "set".into(),
1056 members: vec!["m1".into()],
1057 };
1058 let bytes = rec.to_bytes();
1059 let decoded = AofRecord::from_bytes(&bytes).unwrap();
1060 assert_eq!(rec, decoded);
1061 }
1062}