Skip to main content

ember_persistence/
aof.rs

1//! Append-only file for recording mutations.
2//!
3//! Each shard writes its own AOF file (`shard-{id}.aof`). Records are
4//! written after successful mutations. The binary format uses a simple
5//! tag + payload + CRC32 structure for each record.
6//!
7//! File layout:
8//! ```text
9//! [EAOF magic: 4B][version: 1B]
10//! [record]*
11//! ```
12//!
13//! Record layout:
14//! ```text
15//! [tag: 1B][payload...][crc32: 4B]
16//! ```
17//! The CRC32 covers the tag + payload bytes.
18
19use std::fs::{self, File, OpenOptions};
20use std::io::{self, BufReader, BufWriter, Write};
21use std::path::{Path, PathBuf};
22
23use bytes::Bytes;
24
25use crate::format::{self, FormatError};
26
27/// Reads a length-prefixed field and decodes it as UTF-8.
28fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
29    let bytes = format::read_bytes(r)?;
30    String::from_utf8(bytes).map_err(|_| {
31        FormatError::Io(io::Error::new(
32            io::ErrorKind::InvalidData,
33            format!("{field} is not valid utf-8"),
34        ))
35    })
36}
37
// Record tags for the AOF format. The tag is the first byte of every
// encoded record and selects the payload layout that follows it.

// -- strings and key lifetime --
/// Tag for `AofRecord::Set`.
const TAG_SET: u8 = 1;
/// Tag for `AofRecord::Del`.
const TAG_DEL: u8 = 2;
/// Tag for `AofRecord::Expire`.
const TAG_EXPIRE: u8 = 3;

// -- lists --
/// Tag for `AofRecord::LPush`.
const TAG_LPUSH: u8 = 4;
/// Tag for `AofRecord::RPush`.
const TAG_RPUSH: u8 = 5;
/// Tag for `AofRecord::LPop`.
const TAG_LPOP: u8 = 6;
/// Tag for `AofRecord::RPop`.
const TAG_RPOP: u8 = 7;

// -- sorted sets --
/// Tag for `AofRecord::ZAdd`.
const TAG_ZADD: u8 = 8;
/// Tag for `AofRecord::ZRem`.
const TAG_ZREM: u8 = 9;

// -- expiration updates and counters --
/// Tag for `AofRecord::Persist`.
const TAG_PERSIST: u8 = 10;
/// Tag for `AofRecord::Pexpire`.
const TAG_PEXPIRE: u8 = 11;
/// Tag for `AofRecord::Incr`.
const TAG_INCR: u8 = 12;
/// Tag for `AofRecord::Decr`.
const TAG_DECR: u8 = 13;

52
53/// A single mutation record stored in the AOF.
54#[derive(Debug, Clone, PartialEq)]
55pub enum AofRecord {
56    /// SET key value [expire_ms]. expire_ms is -1 for no expiration.
57    Set {
58        key: String,
59        value: Bytes,
60        expire_ms: i64,
61    },
62    /// DEL key.
63    Del { key: String },
64    /// EXPIRE key seconds.
65    Expire { key: String, seconds: u64 },
66    /// LPUSH key value [value ...].
67    LPush { key: String, values: Vec<Bytes> },
68    /// RPUSH key value [value ...].
69    RPush { key: String, values: Vec<Bytes> },
70    /// LPOP key.
71    LPop { key: String },
72    /// RPOP key.
73    RPop { key: String },
74    /// ZADD key score member [score member ...].
75    ZAdd {
76        key: String,
77        members: Vec<(f64, String)>,
78    },
79    /// ZREM key member [member ...].
80    ZRem { key: String, members: Vec<String> },
81    /// PERSIST key — remove expiration.
82    Persist { key: String },
83    /// PEXPIRE key milliseconds.
84    Pexpire { key: String, milliseconds: u64 },
85    /// INCR key.
86    Incr { key: String },
87    /// DECR key.
88    Decr { key: String },
89}
90
91impl AofRecord {
92    /// Serializes this record into a byte vector (tag + payload, no CRC).
93    fn to_bytes(&self) -> Vec<u8> {
94        let mut buf = Vec::new();
95        match self {
96            AofRecord::Set {
97                key,
98                value,
99                expire_ms,
100            } => {
101                format::write_u8(&mut buf, TAG_SET).expect("vec write");
102                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
103                format::write_bytes(&mut buf, value).expect("vec write");
104                format::write_i64(&mut buf, *expire_ms).expect("vec write");
105            }
106            AofRecord::Del { key } => {
107                format::write_u8(&mut buf, TAG_DEL).expect("vec write");
108                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
109            }
110            AofRecord::Expire { key, seconds } => {
111                format::write_u8(&mut buf, TAG_EXPIRE).expect("vec write");
112                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
113                format::write_i64(&mut buf, *seconds as i64).expect("vec write");
114            }
115            AofRecord::LPush { key, values } => {
116                format::write_u8(&mut buf, TAG_LPUSH).expect("vec write");
117                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
118                format::write_u32(&mut buf, values.len() as u32).expect("vec write");
119                for v in values {
120                    format::write_bytes(&mut buf, v).expect("vec write");
121                }
122            }
123            AofRecord::RPush { key, values } => {
124                format::write_u8(&mut buf, TAG_RPUSH).expect("vec write");
125                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
126                format::write_u32(&mut buf, values.len() as u32).expect("vec write");
127                for v in values {
128                    format::write_bytes(&mut buf, v).expect("vec write");
129                }
130            }
131            AofRecord::LPop { key } => {
132                format::write_u8(&mut buf, TAG_LPOP).expect("vec write");
133                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
134            }
135            AofRecord::RPop { key } => {
136                format::write_u8(&mut buf, TAG_RPOP).expect("vec write");
137                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
138            }
139            AofRecord::ZAdd { key, members } => {
140                format::write_u8(&mut buf, TAG_ZADD).expect("vec write");
141                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
142                format::write_u32(&mut buf, members.len() as u32).expect("vec write");
143                for (score, member) in members {
144                    format::write_f64(&mut buf, *score).expect("vec write");
145                    format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
146                }
147            }
148            AofRecord::ZRem { key, members } => {
149                format::write_u8(&mut buf, TAG_ZREM).expect("vec write");
150                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
151                format::write_u32(&mut buf, members.len() as u32).expect("vec write");
152                for member in members {
153                    format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
154                }
155            }
156            AofRecord::Persist { key } => {
157                format::write_u8(&mut buf, TAG_PERSIST).expect("vec write");
158                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
159            }
160            AofRecord::Pexpire { key, milliseconds } => {
161                format::write_u8(&mut buf, TAG_PEXPIRE).expect("vec write");
162                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
163                format::write_i64(&mut buf, *milliseconds as i64).expect("vec write");
164            }
165            AofRecord::Incr { key } => {
166                format::write_u8(&mut buf, TAG_INCR).expect("vec write");
167                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
168            }
169            AofRecord::Decr { key } => {
170                format::write_u8(&mut buf, TAG_DECR).expect("vec write");
171                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
172            }
173        }
174        buf
175    }
176
177    /// Deserializes a record from a byte slice (tag + payload, no CRC).
178    fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
179        let mut cursor = io::Cursor::new(data);
180        let tag = format::read_u8(&mut cursor)?;
181        match tag {
182            TAG_SET => {
183                let key = read_string(&mut cursor, "key")?;
184                let value = format::read_bytes(&mut cursor)?;
185                let expire_ms = format::read_i64(&mut cursor)?;
186                Ok(AofRecord::Set {
187                    key,
188                    value: Bytes::from(value),
189                    expire_ms,
190                })
191            }
192            TAG_DEL => {
193                let key = read_string(&mut cursor, "key")?;
194                Ok(AofRecord::Del { key })
195            }
196            TAG_EXPIRE => {
197                let key = read_string(&mut cursor, "key")?;
198                let seconds = format::read_i64(&mut cursor)? as u64;
199                Ok(AofRecord::Expire { key, seconds })
200            }
201            TAG_LPUSH | TAG_RPUSH => {
202                let key = read_string(&mut cursor, "key")?;
203                let count = format::read_u32(&mut cursor)?;
204                let mut values = Vec::with_capacity(count as usize);
205                for _ in 0..count {
206                    values.push(Bytes::from(format::read_bytes(&mut cursor)?));
207                }
208                if tag == TAG_LPUSH {
209                    Ok(AofRecord::LPush { key, values })
210                } else {
211                    Ok(AofRecord::RPush { key, values })
212                }
213            }
214            TAG_LPOP => {
215                let key = read_string(&mut cursor, "key")?;
216                Ok(AofRecord::LPop { key })
217            }
218            TAG_RPOP => {
219                let key = read_string(&mut cursor, "key")?;
220                Ok(AofRecord::RPop { key })
221            }
222            TAG_ZADD => {
223                let key = read_string(&mut cursor, "key")?;
224                let count = format::read_u32(&mut cursor)?;
225                let mut members = Vec::with_capacity(count as usize);
226                for _ in 0..count {
227                    let score = format::read_f64(&mut cursor)?;
228                    let member = read_string(&mut cursor, "member")?;
229                    members.push((score, member));
230                }
231                Ok(AofRecord::ZAdd { key, members })
232            }
233            TAG_ZREM => {
234                let key = read_string(&mut cursor, "key")?;
235                let count = format::read_u32(&mut cursor)?;
236                let mut members = Vec::with_capacity(count as usize);
237                for _ in 0..count {
238                    members.push(read_string(&mut cursor, "member")?);
239                }
240                Ok(AofRecord::ZRem { key, members })
241            }
242            TAG_PERSIST => {
243                let key = read_string(&mut cursor, "key")?;
244                Ok(AofRecord::Persist { key })
245            }
246            TAG_PEXPIRE => {
247                let key = read_string(&mut cursor, "key")?;
248                let milliseconds = format::read_i64(&mut cursor)? as u64;
249                Ok(AofRecord::Pexpire { key, milliseconds })
250            }
251            TAG_INCR => {
252                let key = read_string(&mut cursor, "key")?;
253                Ok(AofRecord::Incr { key })
254            }
255            TAG_DECR => {
256                let key = read_string(&mut cursor, "key")?;
257                Ok(AofRecord::Decr { key })
258            }
259            _ => Err(FormatError::UnknownTag(tag)),
260        }
261    }
262}
263
/// Selects when data written to the AOF is fsynced.
///
/// The default is `FsyncPolicy::EverySec`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum FsyncPolicy {
    /// Fsync after every write. Safest, slowest.
    Always,
    /// Fsync once per second. The shard tick drives this.
    #[default]
    EverySec,
    /// Let the OS decide when to flush. Fastest, least durable.
    No,
}
275
276/// Buffered writer for appending AOF records to a file.
277pub struct AofWriter {
278    writer: BufWriter<File>,
279    path: PathBuf,
280}
281
282impl AofWriter {
283    /// Opens (or creates) an AOF file. If the file is new, writes the header.
284    /// If the file already exists, appends to it.
285    pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
286        let path = path.into();
287        let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
288
289        let file = OpenOptions::new().create(true).append(true).open(&path)?;
290        let mut writer = BufWriter::new(file);
291
292        if !exists {
293            format::write_header(&mut writer, format::AOF_MAGIC)?;
294            writer.flush()?;
295        }
296
297        Ok(Self { writer, path })
298    }
299
300    /// Appends a record to the AOF. Writes tag+payload+crc32.
301    pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
302        let payload = record.to_bytes();
303        let checksum = format::crc32(&payload);
304        self.writer.write_all(&payload)?;
305        format::write_u32(&mut self.writer, checksum)?;
306        Ok(())
307    }
308
309    /// Flushes the internal buffer to the OS.
310    pub fn flush(&mut self) -> Result<(), FormatError> {
311        self.writer.flush()?;
312        Ok(())
313    }
314
315    /// Flushes and fsyncs the file to disk.
316    pub fn sync(&mut self) -> Result<(), FormatError> {
317        self.writer.flush()?;
318        self.writer.get_ref().sync_all()?;
319        Ok(())
320    }
321
322    /// Returns the file path.
323    pub fn path(&self) -> &Path {
324        &self.path
325    }
326
327    /// Truncates the AOF file back to just the header.
328    /// Used after a successful snapshot to reset the log.
329    pub fn truncate(&mut self) -> Result<(), FormatError> {
330        // flush and drop the old writer
331        self.writer.flush()?;
332
333        // reopen the file with truncation, write fresh header
334        let file = OpenOptions::new()
335            .create(true)
336            .write(true)
337            .truncate(true)
338            .open(&self.path)?;
339        let mut writer = BufWriter::new(file);
340        format::write_header(&mut writer, format::AOF_MAGIC)?;
341        writer.flush()?;
342        self.writer = writer;
343        Ok(())
344    }
345}
346
347/// Reader for iterating over AOF records.
348#[derive(Debug)]
349pub struct AofReader {
350    reader: BufReader<File>,
351}
352
353impl AofReader {
354    /// Opens an AOF file and validates the header.
355    pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
356        let file = File::open(path.as_ref())?;
357        let mut reader = BufReader::new(file);
358        let _version = format::read_header(&mut reader, format::AOF_MAGIC)?;
359        Ok(Self { reader })
360    }
361
362    /// Reads the next record from the AOF.
363    ///
364    /// Returns `Ok(None)` at end-of-file. On a truncated record (the
365    /// server crashed mid-write), returns `Ok(None)` rather than an error
366    /// — this is the expected recovery behavior.
367    pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
368        // peek for EOF — try reading the tag byte
369        let tag = match format::read_u8(&mut self.reader) {
370            Ok(t) => t,
371            Err(FormatError::UnexpectedEof) => return Ok(None),
372            Err(e) => return Err(e),
373        };
374
375        // read the rest of the payload based on tag, building the full
376        // record bytes for CRC verification
377        let record_result = self.read_payload_for_tag(tag);
378        match record_result {
379            Ok((payload, stored_crc)) => {
380                // prepend the tag to the payload for CRC check
381                let mut full = Vec::with_capacity(1 + payload.len());
382                full.push(tag);
383                full.extend_from_slice(&payload);
384                format::verify_crc32(&full, stored_crc)?;
385                AofRecord::from_bytes(&full).map(Some)
386            }
387            // truncated record — treat as end of usable data
388            Err(FormatError::UnexpectedEof) => Ok(None),
389            Err(e) => Err(e),
390        }
391    }
392
393    /// Reads the remaining payload bytes (after the tag) and the trailing CRC.
394    fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
395        let mut payload = Vec::new();
396        match tag {
397            TAG_SET => {
398                // key_len + key + value_len + value + expire_ms
399                let key = format::read_bytes(&mut self.reader)?;
400                format::write_bytes(&mut payload, &key).expect("vec write");
401                let value = format::read_bytes(&mut self.reader)?;
402                format::write_bytes(&mut payload, &value).expect("vec write");
403                let expire_ms = format::read_i64(&mut self.reader)?;
404                format::write_i64(&mut payload, expire_ms).expect("vec write");
405            }
406            TAG_DEL => {
407                let key = format::read_bytes(&mut self.reader)?;
408                format::write_bytes(&mut payload, &key).expect("vec write");
409            }
410            TAG_EXPIRE => {
411                let key = format::read_bytes(&mut self.reader)?;
412                format::write_bytes(&mut payload, &key).expect("vec write");
413                let seconds = format::read_i64(&mut self.reader)?;
414                format::write_i64(&mut payload, seconds).expect("vec write");
415            }
416            TAG_LPUSH | TAG_RPUSH => {
417                let key = format::read_bytes(&mut self.reader)?;
418                format::write_bytes(&mut payload, &key).expect("vec write");
419                let count = format::read_u32(&mut self.reader)?;
420                format::write_u32(&mut payload, count).expect("vec write");
421                for _ in 0..count {
422                    let val = format::read_bytes(&mut self.reader)?;
423                    format::write_bytes(&mut payload, &val).expect("vec write");
424                }
425            }
426            TAG_LPOP | TAG_RPOP => {
427                let key = format::read_bytes(&mut self.reader)?;
428                format::write_bytes(&mut payload, &key).expect("vec write");
429            }
430            TAG_ZADD => {
431                let key = format::read_bytes(&mut self.reader)?;
432                format::write_bytes(&mut payload, &key).expect("vec write");
433                let count = format::read_u32(&mut self.reader)?;
434                format::write_u32(&mut payload, count).expect("vec write");
435                for _ in 0..count {
436                    let score = format::read_f64(&mut self.reader)?;
437                    format::write_f64(&mut payload, score).expect("vec write");
438                    let member = format::read_bytes(&mut self.reader)?;
439                    format::write_bytes(&mut payload, &member).expect("vec write");
440                }
441            }
442            TAG_ZREM => {
443                let key = format::read_bytes(&mut self.reader)?;
444                format::write_bytes(&mut payload, &key).expect("vec write");
445                let count = format::read_u32(&mut self.reader)?;
446                format::write_u32(&mut payload, count).expect("vec write");
447                for _ in 0..count {
448                    let member = format::read_bytes(&mut self.reader)?;
449                    format::write_bytes(&mut payload, &member).expect("vec write");
450                }
451            }
452            TAG_PERSIST => {
453                let key = format::read_bytes(&mut self.reader)?;
454                format::write_bytes(&mut payload, &key).expect("vec write");
455            }
456            TAG_PEXPIRE => {
457                let key = format::read_bytes(&mut self.reader)?;
458                format::write_bytes(&mut payload, &key).expect("vec write");
459                let millis = format::read_i64(&mut self.reader)?;
460                format::write_i64(&mut payload, millis).expect("vec write");
461            }
462            TAG_INCR | TAG_DECR => {
463                let key = format::read_bytes(&mut self.reader)?;
464                format::write_bytes(&mut payload, &key).expect("vec write");
465            }
466            _ => return Err(FormatError::UnknownTag(tag)),
467        }
468        let stored_crc = format::read_u32(&mut self.reader)?;
469        Ok((payload, stored_crc))
470    }
471}
472
/// Builds the path of a shard's AOF file inside `data_dir`.
///
/// The file is named `shard-{shard_id}.aof`.
pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
    let file_name = format!("shard-{shard_id}.aof");
    let mut path = data_dir.to_path_buf();
    path.push(file_name);
    path
}
477
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a scratch directory that is removed when the guard drops.
    fn temp_dir() -> tempfile::TempDir {
        tempfile::tempdir().expect("create temp dir")
    }

    /// Builds a `Set` record from string literals.
    fn set(key: &str, value: &'static str, expire_ms: i64) -> AofRecord {
        AofRecord::Set {
            key: key.into(),
            value: Bytes::from(value),
            expire_ms,
        }
    }

    /// Encodes `rec`, decodes the bytes again and checks nothing changed.
    fn assert_round_trip(rec: AofRecord) {
        let encoded = rec.to_bytes();
        let decoded = AofRecord::from_bytes(&encoded).unwrap();
        assert_eq!(rec, decoded);
    }

    /// Writes `records` to the AOF at `path` and syncs it.
    fn write_records(path: &Path, records: &[AofRecord]) {
        let mut writer = AofWriter::open(path).unwrap();
        for rec in records {
            writer.write_record(rec).unwrap();
        }
        writer.sync().unwrap();
    }

    /// Reads every record from the AOF at `path`.
    fn read_records(path: &Path) -> Vec<AofRecord> {
        let mut reader = AofReader::open(path).unwrap();
        let mut got = Vec::new();
        while let Some(rec) = reader.read_record().unwrap() {
            got.push(rec);
        }
        got
    }

    /// Writes `records` to a fresh file and checks they read back unchanged.
    fn assert_file_round_trip(file_name: &str, records: Vec<AofRecord>) {
        let dir = temp_dir();
        let path = dir.path().join(file_name);
        write_records(&path, &records);
        assert_eq!(records, read_records(&path));
    }

    #[test]
    fn record_round_trip_set() {
        assert_round_trip(set("hello", "world", 5000));
    }

    #[test]
    fn record_round_trip_del() {
        assert_round_trip(AofRecord::Del { key: "gone".into() });
    }

    #[test]
    fn record_round_trip_expire() {
        assert_round_trip(AofRecord::Expire {
            key: "ttl".into(),
            seconds: 300,
        });
    }

    #[test]
    fn set_with_no_expiry() {
        assert_round_trip(set("k", "v", -1));
    }

    #[test]
    fn writer_reader_round_trip() {
        assert_file_round_trip(
            "test.aof",
            vec![
                set("a", "1", -1),
                set("b", "2", 10_000),
                AofRecord::Del { key: "a".into() },
                AofRecord::Expire {
                    key: "b".into(),
                    seconds: 60,
                },
            ],
        );
    }

    #[test]
    fn empty_aof_returns_no_records() {
        let dir = temp_dir();
        let path = dir.path().join("empty.aof");

        // opening and dropping the writer leaves just the header
        drop(AofWriter::open(&path).unwrap());

        let mut reader = AofReader::open(&path).unwrap();
        assert!(reader.read_record().unwrap().is_none());
    }

    #[test]
    fn truncated_record_treated_as_eof() {
        let dir = temp_dir();
        let path = dir.path().join("trunc.aof");

        // one complete record
        {
            let mut writer = AofWriter::open(&path).unwrap();
            writer.write_record(&set("ok", "good", -1)).unwrap();
            writer.flush().unwrap();
        }

        // followed by a lone tag byte with no payload, as a crash
        // mid-write would leave behind
        {
            let mut file = OpenOptions::new().append(true).open(&path).unwrap();
            file.write_all(&[TAG_SET]).unwrap();
        }

        let mut reader = AofReader::open(&path).unwrap();
        // the complete record is readable
        let first = reader.read_record().unwrap().unwrap();
        assert!(matches!(first, AofRecord::Set { .. }));
        // the partial one reads as end of data
        assert!(reader.read_record().unwrap().is_none());
    }

    #[test]
    fn corrupt_crc_detected() {
        let dir = temp_dir();
        let path = dir.path().join("corrupt.aof");

        {
            let mut writer = AofWriter::open(&path).unwrap();
            writer.write_record(&set("k", "v", -1)).unwrap();
            writer.flush().unwrap();
        }

        // flip every bit of the final byte, which belongs to the CRC
        let mut data = fs::read(&path).unwrap();
        let last = data.len() - 1;
        data[last] ^= 0xFF;
        fs::write(&path, &data).unwrap();

        let mut reader = AofReader::open(&path).unwrap();
        let err = reader.read_record().unwrap_err();
        assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
    }

    #[test]
    fn missing_magic_is_error() {
        let dir = temp_dir();
        let path = dir.path().join("bad.aof");
        fs::write(&path, b"NOT_AOF_DATA").unwrap();

        let err = AofReader::open(&path).unwrap_err();
        assert!(matches!(err, FormatError::InvalidMagic));
    }

    #[test]
    fn truncate_resets_aof() {
        let dir = temp_dir();
        let path = dir.path().join("reset.aof");

        {
            let mut writer = AofWriter::open(&path).unwrap();
            writer.write_record(&set("old", "data", -1)).unwrap();
            writer.truncate().unwrap();

            // a record written after the reset must be the only survivor
            writer.write_record(&set("new", "fresh", -1)).unwrap();
            writer.sync().unwrap();
        }

        let mut reader = AofReader::open(&path).unwrap();
        match reader.read_record().unwrap().unwrap() {
            AofRecord::Set { key, .. } => assert_eq!(key, "new"),
            other => panic!("expected Set, got {other:?}"),
        }
        // only one record after truncation
        assert!(reader.read_record().unwrap().is_none());
    }

    #[test]
    fn record_round_trip_lpush() {
        assert_round_trip(AofRecord::LPush {
            key: "list".into(),
            values: vec![Bytes::from("a"), Bytes::from("b")],
        });
    }

    #[test]
    fn record_round_trip_rpush() {
        assert_round_trip(AofRecord::RPush {
            key: "list".into(),
            values: vec![Bytes::from("x")],
        });
    }

    #[test]
    fn record_round_trip_lpop() {
        assert_round_trip(AofRecord::LPop { key: "list".into() });
    }

    #[test]
    fn record_round_trip_rpop() {
        assert_round_trip(AofRecord::RPop { key: "list".into() });
    }

    #[test]
    fn writer_reader_round_trip_with_list_records() {
        assert_file_round_trip(
            "list.aof",
            vec![
                AofRecord::LPush {
                    key: "l".into(),
                    values: vec![Bytes::from("a"), Bytes::from("b")],
                },
                AofRecord::RPush {
                    key: "l".into(),
                    values: vec![Bytes::from("c")],
                },
                AofRecord::LPop { key: "l".into() },
                AofRecord::RPop { key: "l".into() },
            ],
        );
    }

    #[test]
    fn record_round_trip_zadd() {
        assert_round_trip(AofRecord::ZAdd {
            key: "board".into(),
            members: vec![(100.0, "alice".into()), (200.5, "bob".into())],
        });
    }

    #[test]
    fn record_round_trip_zrem() {
        assert_round_trip(AofRecord::ZRem {
            key: "board".into(),
            members: vec!["alice".into(), "bob".into()],
        });
    }

    #[test]
    fn writer_reader_round_trip_with_sorted_set_records() {
        assert_file_round_trip(
            "zset.aof",
            vec![
                AofRecord::ZAdd {
                    key: "board".into(),
                    members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
                },
                AofRecord::ZRem {
                    key: "board".into(),
                    members: vec!["alice".into()],
                },
            ],
        );
    }

    #[test]
    fn record_round_trip_persist() {
        assert_round_trip(AofRecord::Persist {
            key: "mykey".into(),
        });
    }

    #[test]
    fn record_round_trip_pexpire() {
        assert_round_trip(AofRecord::Pexpire {
            key: "mykey".into(),
            milliseconds: 5000,
        });
    }

    #[test]
    fn record_round_trip_incr() {
        assert_round_trip(AofRecord::Incr {
            key: "counter".into(),
        });
    }

    #[test]
    fn record_round_trip_decr() {
        assert_round_trip(AofRecord::Decr {
            key: "counter".into(),
        });
    }

    #[test]
    fn writer_reader_round_trip_with_persist_pexpire() {
        assert_file_round_trip(
            "persist_pexpire.aof",
            vec![
                set("k", "v", 5000),
                AofRecord::Persist { key: "k".into() },
                AofRecord::Pexpire {
                    key: "k".into(),
                    milliseconds: 3000,
                },
            ],
        );
    }

    #[test]
    fn aof_path_format() {
        let p = aof_path(Path::new("/data"), 3);
        assert_eq!(p, PathBuf::from("/data/shard-3.aof"));
    }
}
896}