1use std::fs::{self, File, OpenOptions};
20use std::io::{self, BufReader, BufWriter, Write};
21use std::path::{Path, PathBuf};
22
23use bytes::Bytes;
24
25use crate::format::{self, FormatError};
26
27fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
29 let bytes = format::read_bytes(r)?;
30 String::from_utf8(bytes).map_err(|_| {
31 FormatError::Io(io::Error::new(
32 io::ErrorKind::InvalidData,
33 format!("{field} is not valid utf-8"),
34 ))
35 })
36}
37
// Record type tags: the first byte of every serialized `AofRecord`.
//
// `AofRecord::to_bytes` writes the tag before anything else, and both
// `AofRecord::from_bytes` and `AofReader` dispatch on it. These values end up
// in the file, so changing one changes how previously written files decode.
const TAG_SET: u8 = 1;
const TAG_DEL: u8 = 2;
const TAG_EXPIRE: u8 = 3;
const TAG_LPUSH: u8 = 4;
const TAG_RPUSH: u8 = 5;
const TAG_LPOP: u8 = 6;
const TAG_RPOP: u8 = 7;
const TAG_ZADD: u8 = 8;
const TAG_ZREM: u8 = 9;
const TAG_PERSIST: u8 = 10;
const TAG_PEXPIRE: u8 = 11;
const TAG_INCR: u8 = 12;
const TAG_DECR: u8 = 13;
const TAG_HSET: u8 = 14;
const TAG_HDEL: u8 = 15;
const TAG_HINCRBY: u8 = 16;
const TAG_SADD: u8 = 17;
const TAG_SREM: u8 = 18;
57
/// One logged operation as stored in the append-only file.
///
/// Every variant carries a `key`. `to_bytes` serializes a record as its
/// one-byte tag, then the key, then the variant's remaining fields in
/// declaration order; list-like fields are preceded by a `u32` element count.
///
/// NOTE(review): the variant names look like Redis-style command names; what
/// each operation does when replayed is decided by the caller, not this file.
#[derive(Debug, Clone, PartialEq)]
pub enum AofRecord {
    /// Tag `TAG_SET`: key, raw value bytes, then `expire_ms` as an `i64`.
    Set {
        key: String,
        value: Bytes,
        /// The tests in this file use `-1` for a record "with no expiry".
        /// NOTE(review): confirm whether positive values are relative or
        /// absolute milliseconds.
        expire_ms: i64,
    },
    /// Tag `TAG_DEL`: key only.
    Del { key: String },
    /// Tag `TAG_EXPIRE`: key, then `seconds` stored as an `i64` (cast from
    /// `u64` on write and back to `u64` on read).
    Expire { key: String, seconds: u64 },
    /// Tag `TAG_LPUSH`: key, `u32` count, then each value's bytes.
    LPush { key: String, values: Vec<Bytes> },
    /// Tag `TAG_RPUSH`: same layout as `LPush`.
    RPush { key: String, values: Vec<Bytes> },
    /// Tag `TAG_LPOP`: key only.
    LPop { key: String },
    /// Tag `TAG_RPOP`: key only.
    RPop { key: String },
    /// Tag `TAG_ZADD`: key, `u32` count, then `(f64 score, member)` pairs.
    ZAdd {
        key: String,
        members: Vec<(f64, String)>,
    },
    /// Tag `TAG_ZREM`: key, `u32` count, then each member.
    ZRem { key: String, members: Vec<String> },
    /// Tag `TAG_PERSIST`: key only.
    Persist { key: String },
    /// Tag `TAG_PEXPIRE`: key, then `milliseconds` stored as an `i64` (cast
    /// from `u64` on write and back to `u64` on read).
    Pexpire { key: String, milliseconds: u64 },
    /// Tag `TAG_INCR`: key only.
    Incr { key: String },
    /// Tag `TAG_DECR`: key only.
    Decr { key: String },
    /// Tag `TAG_HSET`: key, `u32` count, then `(field, value bytes)` pairs.
    HSet {
        key: String,
        fields: Vec<(String, Bytes)>,
    },
    /// Tag `TAG_HDEL`: key, `u32` count, then each field name.
    HDel { key: String, fields: Vec<String> },
    /// Tag `TAG_HINCRBY`: key, field name, then `delta` as an `i64`.
    HIncrBy {
        key: String,
        field: String,
        delta: i64,
    },
    /// Tag `TAG_SADD`: key, `u32` count, then each member.
    SAdd { key: String, members: Vec<String> },
    /// Tag `TAG_SREM`: key, `u32` count, then each member.
    SRem { key: String, members: Vec<String> },
}
112
113impl AofRecord {
114 fn to_bytes(&self) -> Vec<u8> {
116 let mut buf = Vec::new();
117 match self {
118 AofRecord::Set {
119 key,
120 value,
121 expire_ms,
122 } => {
123 format::write_u8(&mut buf, TAG_SET).expect("vec write");
124 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
125 format::write_bytes(&mut buf, value).expect("vec write");
126 format::write_i64(&mut buf, *expire_ms).expect("vec write");
127 }
128 AofRecord::Del { key } => {
129 format::write_u8(&mut buf, TAG_DEL).expect("vec write");
130 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
131 }
132 AofRecord::Expire { key, seconds } => {
133 format::write_u8(&mut buf, TAG_EXPIRE).expect("vec write");
134 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
135 format::write_i64(&mut buf, *seconds as i64).expect("vec write");
136 }
137 AofRecord::LPush { key, values } => {
138 format::write_u8(&mut buf, TAG_LPUSH).expect("vec write");
139 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
140 format::write_u32(&mut buf, values.len() as u32).expect("vec write");
141 for v in values {
142 format::write_bytes(&mut buf, v).expect("vec write");
143 }
144 }
145 AofRecord::RPush { key, values } => {
146 format::write_u8(&mut buf, TAG_RPUSH).expect("vec write");
147 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
148 format::write_u32(&mut buf, values.len() as u32).expect("vec write");
149 for v in values {
150 format::write_bytes(&mut buf, v).expect("vec write");
151 }
152 }
153 AofRecord::LPop { key } => {
154 format::write_u8(&mut buf, TAG_LPOP).expect("vec write");
155 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
156 }
157 AofRecord::RPop { key } => {
158 format::write_u8(&mut buf, TAG_RPOP).expect("vec write");
159 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
160 }
161 AofRecord::ZAdd { key, members } => {
162 format::write_u8(&mut buf, TAG_ZADD).expect("vec write");
163 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
164 format::write_u32(&mut buf, members.len() as u32).expect("vec write");
165 for (score, member) in members {
166 format::write_f64(&mut buf, *score).expect("vec write");
167 format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
168 }
169 }
170 AofRecord::ZRem { key, members } => {
171 format::write_u8(&mut buf, TAG_ZREM).expect("vec write");
172 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
173 format::write_u32(&mut buf, members.len() as u32).expect("vec write");
174 for member in members {
175 format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
176 }
177 }
178 AofRecord::Persist { key } => {
179 format::write_u8(&mut buf, TAG_PERSIST).expect("vec write");
180 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
181 }
182 AofRecord::Pexpire { key, milliseconds } => {
183 format::write_u8(&mut buf, TAG_PEXPIRE).expect("vec write");
184 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
185 format::write_i64(&mut buf, *milliseconds as i64).expect("vec write");
186 }
187 AofRecord::Incr { key } => {
188 format::write_u8(&mut buf, TAG_INCR).expect("vec write");
189 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
190 }
191 AofRecord::Decr { key } => {
192 format::write_u8(&mut buf, TAG_DECR).expect("vec write");
193 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
194 }
195 AofRecord::HSet { key, fields } => {
196 format::write_u8(&mut buf, TAG_HSET).expect("vec write");
197 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
198 format::write_u32(&mut buf, fields.len() as u32).expect("vec write");
199 for (field, value) in fields {
200 format::write_bytes(&mut buf, field.as_bytes()).expect("vec write");
201 format::write_bytes(&mut buf, value).expect("vec write");
202 }
203 }
204 AofRecord::HDel { key, fields } => {
205 format::write_u8(&mut buf, TAG_HDEL).expect("vec write");
206 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
207 format::write_u32(&mut buf, fields.len() as u32).expect("vec write");
208 for field in fields {
209 format::write_bytes(&mut buf, field.as_bytes()).expect("vec write");
210 }
211 }
212 AofRecord::HIncrBy { key, field, delta } => {
213 format::write_u8(&mut buf, TAG_HINCRBY).expect("vec write");
214 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
215 format::write_bytes(&mut buf, field.as_bytes()).expect("vec write");
216 format::write_i64(&mut buf, *delta).expect("vec write");
217 }
218 AofRecord::SAdd { key, members } => {
219 format::write_u8(&mut buf, TAG_SADD).expect("vec write");
220 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
221 format::write_u32(&mut buf, members.len() as u32).expect("vec write");
222 for member in members {
223 format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
224 }
225 }
226 AofRecord::SRem { key, members } => {
227 format::write_u8(&mut buf, TAG_SREM).expect("vec write");
228 format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
229 format::write_u32(&mut buf, members.len() as u32).expect("vec write");
230 for member in members {
231 format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
232 }
233 }
234 }
235 buf
236 }
237
238 fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
240 let mut cursor = io::Cursor::new(data);
241 let tag = format::read_u8(&mut cursor)?;
242 match tag {
243 TAG_SET => {
244 let key = read_string(&mut cursor, "key")?;
245 let value = format::read_bytes(&mut cursor)?;
246 let expire_ms = format::read_i64(&mut cursor)?;
247 Ok(AofRecord::Set {
248 key,
249 value: Bytes::from(value),
250 expire_ms,
251 })
252 }
253 TAG_DEL => {
254 let key = read_string(&mut cursor, "key")?;
255 Ok(AofRecord::Del { key })
256 }
257 TAG_EXPIRE => {
258 let key = read_string(&mut cursor, "key")?;
259 let seconds = format::read_i64(&mut cursor)? as u64;
260 Ok(AofRecord::Expire { key, seconds })
261 }
262 TAG_LPUSH | TAG_RPUSH => {
263 let key = read_string(&mut cursor, "key")?;
264 let count = format::read_u32(&mut cursor)?;
265 let mut values = Vec::with_capacity(count as usize);
266 for _ in 0..count {
267 values.push(Bytes::from(format::read_bytes(&mut cursor)?));
268 }
269 if tag == TAG_LPUSH {
270 Ok(AofRecord::LPush { key, values })
271 } else {
272 Ok(AofRecord::RPush { key, values })
273 }
274 }
275 TAG_LPOP => {
276 let key = read_string(&mut cursor, "key")?;
277 Ok(AofRecord::LPop { key })
278 }
279 TAG_RPOP => {
280 let key = read_string(&mut cursor, "key")?;
281 Ok(AofRecord::RPop { key })
282 }
283 TAG_ZADD => {
284 let key = read_string(&mut cursor, "key")?;
285 let count = format::read_u32(&mut cursor)?;
286 let mut members = Vec::with_capacity(count as usize);
287 for _ in 0..count {
288 let score = format::read_f64(&mut cursor)?;
289 let member = read_string(&mut cursor, "member")?;
290 members.push((score, member));
291 }
292 Ok(AofRecord::ZAdd { key, members })
293 }
294 TAG_ZREM => {
295 let key = read_string(&mut cursor, "key")?;
296 let count = format::read_u32(&mut cursor)?;
297 let mut members = Vec::with_capacity(count as usize);
298 for _ in 0..count {
299 members.push(read_string(&mut cursor, "member")?);
300 }
301 Ok(AofRecord::ZRem { key, members })
302 }
303 TAG_PERSIST => {
304 let key = read_string(&mut cursor, "key")?;
305 Ok(AofRecord::Persist { key })
306 }
307 TAG_PEXPIRE => {
308 let key = read_string(&mut cursor, "key")?;
309 let milliseconds = format::read_i64(&mut cursor)? as u64;
310 Ok(AofRecord::Pexpire { key, milliseconds })
311 }
312 TAG_INCR => {
313 let key = read_string(&mut cursor, "key")?;
314 Ok(AofRecord::Incr { key })
315 }
316 TAG_DECR => {
317 let key = read_string(&mut cursor, "key")?;
318 Ok(AofRecord::Decr { key })
319 }
320 TAG_HSET => {
321 let key = read_string(&mut cursor, "key")?;
322 let count = format::read_u32(&mut cursor)?;
323 let mut fields = Vec::with_capacity(count as usize);
324 for _ in 0..count {
325 let field = read_string(&mut cursor, "field")?;
326 let value = Bytes::from(format::read_bytes(&mut cursor)?);
327 fields.push((field, value));
328 }
329 Ok(AofRecord::HSet { key, fields })
330 }
331 TAG_HDEL => {
332 let key = read_string(&mut cursor, "key")?;
333 let count = format::read_u32(&mut cursor)?;
334 let mut fields = Vec::with_capacity(count as usize);
335 for _ in 0..count {
336 fields.push(read_string(&mut cursor, "field")?);
337 }
338 Ok(AofRecord::HDel { key, fields })
339 }
340 TAG_HINCRBY => {
341 let key = read_string(&mut cursor, "key")?;
342 let field = read_string(&mut cursor, "field")?;
343 let delta = format::read_i64(&mut cursor)?;
344 Ok(AofRecord::HIncrBy { key, field, delta })
345 }
346 TAG_SADD => {
347 let key = read_string(&mut cursor, "key")?;
348 let count = format::read_u32(&mut cursor)?;
349 let mut members = Vec::with_capacity(count as usize);
350 for _ in 0..count {
351 members.push(read_string(&mut cursor, "member")?);
352 }
353 Ok(AofRecord::SAdd { key, members })
354 }
355 TAG_SREM => {
356 let key = read_string(&mut cursor, "key")?;
357 let count = format::read_u32(&mut cursor)?;
358 let mut members = Vec::with_capacity(count as usize);
359 for _ in 0..count {
360 members.push(read_string(&mut cursor, "member")?);
361 }
362 Ok(AofRecord::SRem { key, members })
363 }
364 _ => Err(FormatError::UnknownTag(tag)),
365 }
366 }
367}
368
/// Selects an fsync policy for the AOF.
///
/// Nothing in this file reads this type; `AofWriter` exposes `flush` and
/// `sync` and leaves the choice of when to call them to its caller.
/// NOTE(review): the exact meaning of each variant is defined by that caller —
/// confirm there before relying on the names.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum FsyncPolicy {
    Always,
    /// The `Default` policy.
    #[default]
    EverySec,
    No,
}
380
/// Buffered writer that appends checksummed `AofRecord`s to an AOF file.
pub struct AofWriter {
    /// Buffered handle to the open AOF file.
    writer: BufWriter<File>,
    /// Path the file was opened at; `truncate` reopens it and `path()`
    /// returns it.
    path: PathBuf,
}
386
387impl AofWriter {
388 pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
391 let path = path.into();
392 let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
393
394 let file = OpenOptions::new().create(true).append(true).open(&path)?;
395 let mut writer = BufWriter::new(file);
396
397 if !exists {
398 format::write_header(&mut writer, format::AOF_MAGIC)?;
399 writer.flush()?;
400 }
401
402 Ok(Self { writer, path })
403 }
404
405 pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
407 let payload = record.to_bytes();
408 let checksum = format::crc32(&payload);
409 self.writer.write_all(&payload)?;
410 format::write_u32(&mut self.writer, checksum)?;
411 Ok(())
412 }
413
414 pub fn flush(&mut self) -> Result<(), FormatError> {
416 self.writer.flush()?;
417 Ok(())
418 }
419
420 pub fn sync(&mut self) -> Result<(), FormatError> {
422 self.writer.flush()?;
423 self.writer.get_ref().sync_all()?;
424 Ok(())
425 }
426
427 pub fn path(&self) -> &Path {
429 &self.path
430 }
431
432 pub fn truncate(&mut self) -> Result<(), FormatError> {
435 self.writer.flush()?;
437
438 let file = OpenOptions::new()
440 .create(true)
441 .write(true)
442 .truncate(true)
443 .open(&self.path)?;
444 let mut writer = BufWriter::new(file);
445 format::write_header(&mut writer, format::AOF_MAGIC)?;
446 writer.flush()?;
447 self.writer = writer;
448 Ok(())
449 }
450}
451
/// Sequential reader over an AOF file.
///
/// `open` validates and consumes the file header, so `reader` is positioned
/// at the first record (or at end of file) once construction succeeds.
#[derive(Debug)]
pub struct AofReader {
    /// Buffered handle to the AOF file being read.
    reader: BufReader<File>,
}
457
458impl AofReader {
459 pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
461 let file = File::open(path.as_ref())?;
462 let mut reader = BufReader::new(file);
463 let _version = format::read_header(&mut reader, format::AOF_MAGIC)?;
464 Ok(Self { reader })
465 }
466
467 pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
473 let tag = match format::read_u8(&mut self.reader) {
475 Ok(t) => t,
476 Err(FormatError::UnexpectedEof) => return Ok(None),
477 Err(e) => return Err(e),
478 };
479
480 let record_result = self.read_payload_for_tag(tag);
483 match record_result {
484 Ok((payload, stored_crc)) => {
485 let mut full = Vec::with_capacity(1 + payload.len());
487 full.push(tag);
488 full.extend_from_slice(&payload);
489 format::verify_crc32(&full, stored_crc)?;
490 AofRecord::from_bytes(&full).map(Some)
491 }
492 Err(FormatError::UnexpectedEof) => Ok(None),
494 Err(e) => Err(e),
495 }
496 }
497
498 fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
500 let mut payload = Vec::new();
501 match tag {
502 TAG_SET => {
503 let key = format::read_bytes(&mut self.reader)?;
505 format::write_bytes(&mut payload, &key).expect("vec write");
506 let value = format::read_bytes(&mut self.reader)?;
507 format::write_bytes(&mut payload, &value).expect("vec write");
508 let expire_ms = format::read_i64(&mut self.reader)?;
509 format::write_i64(&mut payload, expire_ms).expect("vec write");
510 }
511 TAG_DEL => {
512 let key = format::read_bytes(&mut self.reader)?;
513 format::write_bytes(&mut payload, &key).expect("vec write");
514 }
515 TAG_EXPIRE => {
516 let key = format::read_bytes(&mut self.reader)?;
517 format::write_bytes(&mut payload, &key).expect("vec write");
518 let seconds = format::read_i64(&mut self.reader)?;
519 format::write_i64(&mut payload, seconds).expect("vec write");
520 }
521 TAG_LPUSH | TAG_RPUSH => {
522 let key = format::read_bytes(&mut self.reader)?;
523 format::write_bytes(&mut payload, &key).expect("vec write");
524 let count = format::read_u32(&mut self.reader)?;
525 format::write_u32(&mut payload, count).expect("vec write");
526 for _ in 0..count {
527 let val = format::read_bytes(&mut self.reader)?;
528 format::write_bytes(&mut payload, &val).expect("vec write");
529 }
530 }
531 TAG_LPOP | TAG_RPOP => {
532 let key = format::read_bytes(&mut self.reader)?;
533 format::write_bytes(&mut payload, &key).expect("vec write");
534 }
535 TAG_ZADD => {
536 let key = format::read_bytes(&mut self.reader)?;
537 format::write_bytes(&mut payload, &key).expect("vec write");
538 let count = format::read_u32(&mut self.reader)?;
539 format::write_u32(&mut payload, count).expect("vec write");
540 for _ in 0..count {
541 let score = format::read_f64(&mut self.reader)?;
542 format::write_f64(&mut payload, score).expect("vec write");
543 let member = format::read_bytes(&mut self.reader)?;
544 format::write_bytes(&mut payload, &member).expect("vec write");
545 }
546 }
547 TAG_ZREM => {
548 let key = format::read_bytes(&mut self.reader)?;
549 format::write_bytes(&mut payload, &key).expect("vec write");
550 let count = format::read_u32(&mut self.reader)?;
551 format::write_u32(&mut payload, count).expect("vec write");
552 for _ in 0..count {
553 let member = format::read_bytes(&mut self.reader)?;
554 format::write_bytes(&mut payload, &member).expect("vec write");
555 }
556 }
557 TAG_PERSIST => {
558 let key = format::read_bytes(&mut self.reader)?;
559 format::write_bytes(&mut payload, &key).expect("vec write");
560 }
561 TAG_PEXPIRE => {
562 let key = format::read_bytes(&mut self.reader)?;
563 format::write_bytes(&mut payload, &key).expect("vec write");
564 let millis = format::read_i64(&mut self.reader)?;
565 format::write_i64(&mut payload, millis).expect("vec write");
566 }
567 TAG_INCR | TAG_DECR => {
568 let key = format::read_bytes(&mut self.reader)?;
569 format::write_bytes(&mut payload, &key).expect("vec write");
570 }
571 _ => return Err(FormatError::UnknownTag(tag)),
572 }
573 let stored_crc = format::read_u32(&mut self.reader)?;
574 Ok((payload, stored_crc))
575 }
576}
577
/// Builds the AOF file path for a shard: `<data_dir>/shard-<shard_id>.aof`.
pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
    let file_name = format!("shard-{shard_id}.aof");
    let mut full = data_dir.to_path_buf();
    full.push(file_name);
    full
}
582
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a temporary directory for a test.
    fn temp_dir() -> tempfile::TempDir {
        tempfile::tempdir().expect("create temp dir")
    }

    /// Shorthand for building a `Set` record.
    fn set_rec(key: &str, value: &'static str, expire_ms: i64) -> AofRecord {
        AofRecord::Set {
            key: key.into(),
            value: Bytes::from(value),
            expire_ms,
        }
    }

    /// Encodes `rec` with `to_bytes`, decodes it with `from_bytes`, and
    /// asserts the result equals the input.
    fn assert_codec_round_trip(rec: AofRecord) {
        let bytes = rec.to_bytes();
        let decoded = AofRecord::from_bytes(&bytes).unwrap();
        assert_eq!(rec, decoded);
    }

    /// Reads every record from the AOF at `path`.
    fn read_all(path: &Path) -> Vec<AofRecord> {
        let mut reader = AofReader::open(path).unwrap();
        let mut got = Vec::new();
        while let Some(rec) = reader.read_record().unwrap() {
            got.push(rec);
        }
        got
    }

    /// Writes `records` to a new AOF called `file_name`, syncs, drops the
    /// writer, then asserts that reading the file yields the same records.
    fn assert_file_round_trip(file_name: &str, records: Vec<AofRecord>) {
        let dir = temp_dir();
        let path = dir.path().join(file_name);

        {
            let mut writer = AofWriter::open(&path).unwrap();
            for rec in &records {
                writer.write_record(rec).unwrap();
            }
            writer.sync().unwrap();
        }

        assert_eq!(records, read_all(&path));
    }

    #[test]
    fn record_round_trip_set() {
        assert_codec_round_trip(set_rec("hello", "world", 5000));
    }

    #[test]
    fn record_round_trip_del() {
        assert_codec_round_trip(AofRecord::Del { key: "gone".into() });
    }

    #[test]
    fn record_round_trip_expire() {
        assert_codec_round_trip(AofRecord::Expire {
            key: "ttl".into(),
            seconds: 300,
        });
    }

    #[test]
    fn set_with_no_expiry() {
        assert_codec_round_trip(set_rec("k", "v", -1));
    }

    #[test]
    fn writer_reader_round_trip() {
        assert_file_round_trip(
            "test.aof",
            vec![
                set_rec("a", "1", -1),
                set_rec("b", "2", 10_000),
                AofRecord::Del { key: "a".into() },
                AofRecord::Expire {
                    key: "b".into(),
                    seconds: 60,
                },
            ],
        );
    }

    #[test]
    fn empty_aof_returns_no_records() {
        let dir = temp_dir();
        let path = dir.path().join("empty.aof");

        // Opening and dropping a writer leaves a header-only file.
        drop(AofWriter::open(&path).unwrap());

        let mut reader = AofReader::open(&path).unwrap();
        assert!(reader.read_record().unwrap().is_none());
    }

    #[test]
    fn truncated_record_treated_as_eof() {
        let dir = temp_dir();
        let path = dir.path().join("trunc.aof");

        {
            let mut writer = AofWriter::open(&path).unwrap();
            writer.write_record(&set_rec("ok", "good", -1)).unwrap();
            writer.flush().unwrap();
        }

        // Append a lone tag byte to simulate a record cut off mid-write.
        {
            let mut file = OpenOptions::new().append(true).open(&path).unwrap();
            file.write_all(&[TAG_SET]).unwrap();
        }

        let mut reader = AofReader::open(&path).unwrap();
        let first = reader.read_record().unwrap().unwrap();
        assert!(matches!(first, AofRecord::Set { .. }));
        assert!(reader.read_record().unwrap().is_none());
    }

    #[test]
    fn corrupt_crc_detected() {
        let dir = temp_dir();
        let path = dir.path().join("corrupt.aof");

        {
            let mut writer = AofWriter::open(&path).unwrap();
            writer.write_record(&set_rec("k", "v", -1)).unwrap();
            writer.flush().unwrap();
        }

        // Flip every bit of the final byte, which belongs to the stored CRC.
        let mut data = fs::read(&path).unwrap();
        let last = data.len() - 1;
        data[last] ^= 0xFF;
        fs::write(&path, &data).unwrap();

        let mut reader = AofReader::open(&path).unwrap();
        let err = reader.read_record().unwrap_err();
        assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
    }

    #[test]
    fn missing_magic_is_error() {
        let dir = temp_dir();
        let path = dir.path().join("bad.aof");
        fs::write(&path, b"NOT_AOF_DATA").unwrap();

        let err = AofReader::open(&path).unwrap_err();
        assert!(matches!(err, FormatError::InvalidMagic));
    }

    #[test]
    fn truncate_resets_aof() {
        let dir = temp_dir();
        let path = dir.path().join("reset.aof");

        {
            let mut writer = AofWriter::open(&path).unwrap();
            writer.write_record(&set_rec("old", "data", -1)).unwrap();
            writer.truncate().unwrap();

            writer.write_record(&set_rec("new", "fresh", -1)).unwrap();
            writer.sync().unwrap();
        }

        let mut reader = AofReader::open(&path).unwrap();
        let rec = reader.read_record().unwrap().unwrap();
        match rec {
            AofRecord::Set { key, .. } => assert_eq!(key, "new"),
            other => panic!("expected Set, got {other:?}"),
        }
        assert!(reader.read_record().unwrap().is_none());
    }

    #[test]
    fn record_round_trip_lpush() {
        assert_codec_round_trip(AofRecord::LPush {
            key: "list".into(),
            values: vec![Bytes::from("a"), Bytes::from("b")],
        });
    }

    #[test]
    fn record_round_trip_rpush() {
        assert_codec_round_trip(AofRecord::RPush {
            key: "list".into(),
            values: vec![Bytes::from("x")],
        });
    }

    #[test]
    fn record_round_trip_lpop() {
        assert_codec_round_trip(AofRecord::LPop { key: "list".into() });
    }

    #[test]
    fn record_round_trip_rpop() {
        assert_codec_round_trip(AofRecord::RPop { key: "list".into() });
    }

    #[test]
    fn writer_reader_round_trip_with_list_records() {
        assert_file_round_trip(
            "list.aof",
            vec![
                AofRecord::LPush {
                    key: "l".into(),
                    values: vec![Bytes::from("a"), Bytes::from("b")],
                },
                AofRecord::RPush {
                    key: "l".into(),
                    values: vec![Bytes::from("c")],
                },
                AofRecord::LPop { key: "l".into() },
                AofRecord::RPop { key: "l".into() },
            ],
        );
    }

    #[test]
    fn record_round_trip_zadd() {
        assert_codec_round_trip(AofRecord::ZAdd {
            key: "board".into(),
            members: vec![(100.0, "alice".into()), (200.5, "bob".into())],
        });
    }

    #[test]
    fn record_round_trip_zrem() {
        assert_codec_round_trip(AofRecord::ZRem {
            key: "board".into(),
            members: vec!["alice".into(), "bob".into()],
        });
    }

    #[test]
    fn writer_reader_round_trip_with_sorted_set_records() {
        assert_file_round_trip(
            "zset.aof",
            vec![
                AofRecord::ZAdd {
                    key: "board".into(),
                    members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
                },
                AofRecord::ZRem {
                    key: "board".into(),
                    members: vec!["alice".into()],
                },
            ],
        );
    }

    #[test]
    fn record_round_trip_persist() {
        assert_codec_round_trip(AofRecord::Persist {
            key: "mykey".into(),
        });
    }

    #[test]
    fn record_round_trip_pexpire() {
        assert_codec_round_trip(AofRecord::Pexpire {
            key: "mykey".into(),
            milliseconds: 5000,
        });
    }

    #[test]
    fn record_round_trip_incr() {
        assert_codec_round_trip(AofRecord::Incr {
            key: "counter".into(),
        });
    }

    #[test]
    fn record_round_trip_decr() {
        assert_codec_round_trip(AofRecord::Decr {
            key: "counter".into(),
        });
    }

    #[test]
    fn writer_reader_round_trip_with_persist_pexpire() {
        assert_file_round_trip(
            "persist_pexpire.aof",
            vec![
                set_rec("k", "v", 5000),
                AofRecord::Persist { key: "k".into() },
                AofRecord::Pexpire {
                    key: "k".into(),
                    milliseconds: 3000,
                },
            ],
        );
    }

    #[test]
    fn aof_path_format() {
        let p = aof_path(Path::new("/data"), 3);
        assert_eq!(p, PathBuf::from("/data/shard-3.aof"));
    }

    #[test]
    fn record_round_trip_hset() {
        assert_codec_round_trip(AofRecord::HSet {
            key: "hash".into(),
            fields: vec![
                ("f1".into(), Bytes::from("v1")),
                ("f2".into(), Bytes::from("v2")),
            ],
        });
    }

    #[test]
    fn record_round_trip_hdel() {
        assert_codec_round_trip(AofRecord::HDel {
            key: "hash".into(),
            fields: vec!["f1".into(), "f2".into()],
        });
    }

    #[test]
    fn record_round_trip_hincrby() {
        assert_codec_round_trip(AofRecord::HIncrBy {
            key: "hash".into(),
            field: "counter".into(),
            delta: -42,
        });
    }

    #[test]
    fn record_round_trip_sadd() {
        assert_codec_round_trip(AofRecord::SAdd {
            key: "set".into(),
            members: vec!["m1".into(), "m2".into(), "m3".into()],
        });
    }

    #[test]
    fn record_round_trip_srem() {
        assert_codec_round_trip(AofRecord::SRem {
            key: "set".into(),
            members: vec!["m1".into()],
        });
    }
}