Skip to main content

ember_persistence/
aof.rs

//! Append-only file for recording mutations.
//!
//! Each shard writes its own AOF file (`shard-{id}.aof`). Records are
//! written after successful mutations. The binary format uses a simple
//! tag + payload + CRC32 structure for each record.
//!
//! File layout:
//! ```text
//! [EAOF magic: 4B][version: 1B]
//! [record]*
//! ```
//!
//! Record layout:
//! ```text
//! [tag: 1B][payload...][crc32: 4B]
//! ```
//! The CRC32 covers the tag + payload bytes.
18
19use std::fs::{self, File, OpenOptions};
20use std::io::{self, BufReader, BufWriter, Write};
21use std::path::{Path, PathBuf};
22
23use bytes::Bytes;
24
25use crate::format::{self, FormatError};
26
27/// Reads a length-prefixed field and decodes it as UTF-8.
28fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
29    let bytes = format::read_bytes(r)?;
30    String::from_utf8(bytes).map_err(|_| {
31        FormatError::Io(io::Error::new(
32            io::ErrorKind::InvalidData,
33            format!("{field} is not valid utf-8"),
34        ))
35    })
36}
37
// One-byte record tags. Each value is the first byte of a serialized
// record and is matched on again when decoding, so changing a value changes
// how already-written files are interpreted.

/// Tag for `AofRecord::Set`.
const TAG_SET: u8 = 1;
/// Tag for `AofRecord::Del`.
const TAG_DEL: u8 = 2;
/// Tag for `AofRecord::Expire`.
const TAG_EXPIRE: u8 = 3;
/// Tag for `AofRecord::LPush`.
const TAG_LPUSH: u8 = 4;
/// Tag for `AofRecord::RPush`.
const TAG_RPUSH: u8 = 5;
/// Tag for `AofRecord::LPop`.
const TAG_LPOP: u8 = 6;
/// Tag for `AofRecord::RPop`.
const TAG_RPOP: u8 = 7;
/// Tag for `AofRecord::ZAdd`.
const TAG_ZADD: u8 = 8;
/// Tag for `AofRecord::ZRem`.
const TAG_ZREM: u8 = 9;
/// Tag for `AofRecord::Persist`.
const TAG_PERSIST: u8 = 10;
/// Tag for `AofRecord::Pexpire`.
const TAG_PEXPIRE: u8 = 11;
/// Tag for `AofRecord::Incr`.
const TAG_INCR: u8 = 12;
/// Tag for `AofRecord::Decr`.
const TAG_DECR: u8 = 13;
/// Tag for `AofRecord::HSet`.
const TAG_HSET: u8 = 14;
/// Tag for `AofRecord::HDel`.
const TAG_HDEL: u8 = 15;
/// Tag for `AofRecord::HIncrBy`.
const TAG_HINCRBY: u8 = 16;
/// Tag for `AofRecord::SAdd`.
const TAG_SADD: u8 = 17;
/// Tag for `AofRecord::SRem`.
const TAG_SREM: u8 = 18;
57
58/// A single mutation record stored in the AOF.
59#[derive(Debug, Clone, PartialEq)]
60pub enum AofRecord {
61    /// SET key value [expire_ms]. expire_ms is -1 for no expiration.
62    Set {
63        key: String,
64        value: Bytes,
65        expire_ms: i64,
66    },
67    /// DEL key.
68    Del { key: String },
69    /// EXPIRE key seconds.
70    Expire { key: String, seconds: u64 },
71    /// LPUSH key value [value ...].
72    LPush { key: String, values: Vec<Bytes> },
73    /// RPUSH key value [value ...].
74    RPush { key: String, values: Vec<Bytes> },
75    /// LPOP key.
76    LPop { key: String },
77    /// RPOP key.
78    RPop { key: String },
79    /// ZADD key score member [score member ...].
80    ZAdd {
81        key: String,
82        members: Vec<(f64, String)>,
83    },
84    /// ZREM key member [member ...].
85    ZRem { key: String, members: Vec<String> },
86    /// PERSIST key — remove expiration.
87    Persist { key: String },
88    /// PEXPIRE key milliseconds.
89    Pexpire { key: String, milliseconds: u64 },
90    /// INCR key.
91    Incr { key: String },
92    /// DECR key.
93    Decr { key: String },
94    /// HSET key field value [field value ...].
95    HSet {
96        key: String,
97        fields: Vec<(String, Bytes)>,
98    },
99    /// HDEL key field [field ...].
100    HDel { key: String, fields: Vec<String> },
101    /// HINCRBY key field delta.
102    HIncrBy {
103        key: String,
104        field: String,
105        delta: i64,
106    },
107    /// SADD key member [member ...].
108    SAdd { key: String, members: Vec<String> },
109    /// SREM key member [member ...].
110    SRem { key: String, members: Vec<String> },
111}
112
113impl AofRecord {
114    /// Serializes this record into a byte vector (tag + payload, no CRC).
115    fn to_bytes(&self) -> Vec<u8> {
116        let mut buf = Vec::new();
117        match self {
118            AofRecord::Set {
119                key,
120                value,
121                expire_ms,
122            } => {
123                format::write_u8(&mut buf, TAG_SET).expect("vec write");
124                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
125                format::write_bytes(&mut buf, value).expect("vec write");
126                format::write_i64(&mut buf, *expire_ms).expect("vec write");
127            }
128            AofRecord::Del { key } => {
129                format::write_u8(&mut buf, TAG_DEL).expect("vec write");
130                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
131            }
132            AofRecord::Expire { key, seconds } => {
133                format::write_u8(&mut buf, TAG_EXPIRE).expect("vec write");
134                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
135                format::write_i64(&mut buf, *seconds as i64).expect("vec write");
136            }
137            AofRecord::LPush { key, values } => {
138                format::write_u8(&mut buf, TAG_LPUSH).expect("vec write");
139                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
140                format::write_u32(&mut buf, values.len() as u32).expect("vec write");
141                for v in values {
142                    format::write_bytes(&mut buf, v).expect("vec write");
143                }
144            }
145            AofRecord::RPush { key, values } => {
146                format::write_u8(&mut buf, TAG_RPUSH).expect("vec write");
147                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
148                format::write_u32(&mut buf, values.len() as u32).expect("vec write");
149                for v in values {
150                    format::write_bytes(&mut buf, v).expect("vec write");
151                }
152            }
153            AofRecord::LPop { key } => {
154                format::write_u8(&mut buf, TAG_LPOP).expect("vec write");
155                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
156            }
157            AofRecord::RPop { key } => {
158                format::write_u8(&mut buf, TAG_RPOP).expect("vec write");
159                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
160            }
161            AofRecord::ZAdd { key, members } => {
162                format::write_u8(&mut buf, TAG_ZADD).expect("vec write");
163                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
164                format::write_u32(&mut buf, members.len() as u32).expect("vec write");
165                for (score, member) in members {
166                    format::write_f64(&mut buf, *score).expect("vec write");
167                    format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
168                }
169            }
170            AofRecord::ZRem { key, members } => {
171                format::write_u8(&mut buf, TAG_ZREM).expect("vec write");
172                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
173                format::write_u32(&mut buf, members.len() as u32).expect("vec write");
174                for member in members {
175                    format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
176                }
177            }
178            AofRecord::Persist { key } => {
179                format::write_u8(&mut buf, TAG_PERSIST).expect("vec write");
180                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
181            }
182            AofRecord::Pexpire { key, milliseconds } => {
183                format::write_u8(&mut buf, TAG_PEXPIRE).expect("vec write");
184                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
185                format::write_i64(&mut buf, *milliseconds as i64).expect("vec write");
186            }
187            AofRecord::Incr { key } => {
188                format::write_u8(&mut buf, TAG_INCR).expect("vec write");
189                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
190            }
191            AofRecord::Decr { key } => {
192                format::write_u8(&mut buf, TAG_DECR).expect("vec write");
193                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
194            }
195            AofRecord::HSet { key, fields } => {
196                format::write_u8(&mut buf, TAG_HSET).expect("vec write");
197                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
198                format::write_u32(&mut buf, fields.len() as u32).expect("vec write");
199                for (field, value) in fields {
200                    format::write_bytes(&mut buf, field.as_bytes()).expect("vec write");
201                    format::write_bytes(&mut buf, value).expect("vec write");
202                }
203            }
204            AofRecord::HDel { key, fields } => {
205                format::write_u8(&mut buf, TAG_HDEL).expect("vec write");
206                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
207                format::write_u32(&mut buf, fields.len() as u32).expect("vec write");
208                for field in fields {
209                    format::write_bytes(&mut buf, field.as_bytes()).expect("vec write");
210                }
211            }
212            AofRecord::HIncrBy { key, field, delta } => {
213                format::write_u8(&mut buf, TAG_HINCRBY).expect("vec write");
214                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
215                format::write_bytes(&mut buf, field.as_bytes()).expect("vec write");
216                format::write_i64(&mut buf, *delta).expect("vec write");
217            }
218            AofRecord::SAdd { key, members } => {
219                format::write_u8(&mut buf, TAG_SADD).expect("vec write");
220                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
221                format::write_u32(&mut buf, members.len() as u32).expect("vec write");
222                for member in members {
223                    format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
224                }
225            }
226            AofRecord::SRem { key, members } => {
227                format::write_u8(&mut buf, TAG_SREM).expect("vec write");
228                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
229                format::write_u32(&mut buf, members.len() as u32).expect("vec write");
230                for member in members {
231                    format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
232                }
233            }
234        }
235        buf
236    }
237
238    /// Deserializes a record from a byte slice (tag + payload, no CRC).
239    fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
240        let mut cursor = io::Cursor::new(data);
241        let tag = format::read_u8(&mut cursor)?;
242        match tag {
243            TAG_SET => {
244                let key = read_string(&mut cursor, "key")?;
245                let value = format::read_bytes(&mut cursor)?;
246                let expire_ms = format::read_i64(&mut cursor)?;
247                Ok(AofRecord::Set {
248                    key,
249                    value: Bytes::from(value),
250                    expire_ms,
251                })
252            }
253            TAG_DEL => {
254                let key = read_string(&mut cursor, "key")?;
255                Ok(AofRecord::Del { key })
256            }
257            TAG_EXPIRE => {
258                let key = read_string(&mut cursor, "key")?;
259                let seconds = format::read_i64(&mut cursor)? as u64;
260                Ok(AofRecord::Expire { key, seconds })
261            }
262            TAG_LPUSH | TAG_RPUSH => {
263                let key = read_string(&mut cursor, "key")?;
264                let count = format::read_u32(&mut cursor)?;
265                let mut values = Vec::with_capacity(count as usize);
266                for _ in 0..count {
267                    values.push(Bytes::from(format::read_bytes(&mut cursor)?));
268                }
269                if tag == TAG_LPUSH {
270                    Ok(AofRecord::LPush { key, values })
271                } else {
272                    Ok(AofRecord::RPush { key, values })
273                }
274            }
275            TAG_LPOP => {
276                let key = read_string(&mut cursor, "key")?;
277                Ok(AofRecord::LPop { key })
278            }
279            TAG_RPOP => {
280                let key = read_string(&mut cursor, "key")?;
281                Ok(AofRecord::RPop { key })
282            }
283            TAG_ZADD => {
284                let key = read_string(&mut cursor, "key")?;
285                let count = format::read_u32(&mut cursor)?;
286                let mut members = Vec::with_capacity(count as usize);
287                for _ in 0..count {
288                    let score = format::read_f64(&mut cursor)?;
289                    let member = read_string(&mut cursor, "member")?;
290                    members.push((score, member));
291                }
292                Ok(AofRecord::ZAdd { key, members })
293            }
294            TAG_ZREM => {
295                let key = read_string(&mut cursor, "key")?;
296                let count = format::read_u32(&mut cursor)?;
297                let mut members = Vec::with_capacity(count as usize);
298                for _ in 0..count {
299                    members.push(read_string(&mut cursor, "member")?);
300                }
301                Ok(AofRecord::ZRem { key, members })
302            }
303            TAG_PERSIST => {
304                let key = read_string(&mut cursor, "key")?;
305                Ok(AofRecord::Persist { key })
306            }
307            TAG_PEXPIRE => {
308                let key = read_string(&mut cursor, "key")?;
309                let milliseconds = format::read_i64(&mut cursor)? as u64;
310                Ok(AofRecord::Pexpire { key, milliseconds })
311            }
312            TAG_INCR => {
313                let key = read_string(&mut cursor, "key")?;
314                Ok(AofRecord::Incr { key })
315            }
316            TAG_DECR => {
317                let key = read_string(&mut cursor, "key")?;
318                Ok(AofRecord::Decr { key })
319            }
320            TAG_HSET => {
321                let key = read_string(&mut cursor, "key")?;
322                let count = format::read_u32(&mut cursor)?;
323                let mut fields = Vec::with_capacity(count as usize);
324                for _ in 0..count {
325                    let field = read_string(&mut cursor, "field")?;
326                    let value = Bytes::from(format::read_bytes(&mut cursor)?);
327                    fields.push((field, value));
328                }
329                Ok(AofRecord::HSet { key, fields })
330            }
331            TAG_HDEL => {
332                let key = read_string(&mut cursor, "key")?;
333                let count = format::read_u32(&mut cursor)?;
334                let mut fields = Vec::with_capacity(count as usize);
335                for _ in 0..count {
336                    fields.push(read_string(&mut cursor, "field")?);
337                }
338                Ok(AofRecord::HDel { key, fields })
339            }
340            TAG_HINCRBY => {
341                let key = read_string(&mut cursor, "key")?;
342                let field = read_string(&mut cursor, "field")?;
343                let delta = format::read_i64(&mut cursor)?;
344                Ok(AofRecord::HIncrBy { key, field, delta })
345            }
346            TAG_SADD => {
347                let key = read_string(&mut cursor, "key")?;
348                let count = format::read_u32(&mut cursor)?;
349                let mut members = Vec::with_capacity(count as usize);
350                for _ in 0..count {
351                    members.push(read_string(&mut cursor, "member")?);
352                }
353                Ok(AofRecord::SAdd { key, members })
354            }
355            TAG_SREM => {
356                let key = read_string(&mut cursor, "key")?;
357                let count = format::read_u32(&mut cursor)?;
358                let mut members = Vec::with_capacity(count as usize);
359                for _ in 0..count {
360                    members.push(read_string(&mut cursor, "member")?);
361                }
362                Ok(AofRecord::SRem { key, members })
363            }
364            _ => Err(FormatError::UnknownTag(tag)),
365        }
366    }
367}
368
/// How aggressively the AOF writer forces data to disk.
///
/// The default is [`FsyncPolicy::EverySec`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum FsyncPolicy {
    /// Call fsync after each write: the most durable and the slowest option.
    Always,
    /// Call fsync about once a second, driven by the shard tick.
    #[default]
    EverySec,
    /// Never fsync explicitly and leave flushing to the OS: the fastest and
    /// the least durable option.
    No,
}
380
/// Appends AOF records to a file through an in-memory write buffer.
pub struct AofWriter {
    /// Buffered handle to the AOF file.
    writer: BufWriter<File>,
    /// Path of the AOF file; used to reopen it when truncating.
    path: PathBuf,
}
386
387impl AofWriter {
388    /// Opens (or creates) an AOF file. If the file is new, writes the header.
389    /// If the file already exists, appends to it.
390    pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
391        let path = path.into();
392        let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
393
394        let file = OpenOptions::new().create(true).append(true).open(&path)?;
395        let mut writer = BufWriter::new(file);
396
397        if !exists {
398            format::write_header(&mut writer, format::AOF_MAGIC)?;
399            writer.flush()?;
400        }
401
402        Ok(Self { writer, path })
403    }
404
405    /// Appends a record to the AOF. Writes tag+payload+crc32.
406    pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
407        let payload = record.to_bytes();
408        let checksum = format::crc32(&payload);
409        self.writer.write_all(&payload)?;
410        format::write_u32(&mut self.writer, checksum)?;
411        Ok(())
412    }
413
414    /// Flushes the internal buffer to the OS.
415    pub fn flush(&mut self) -> Result<(), FormatError> {
416        self.writer.flush()?;
417        Ok(())
418    }
419
420    /// Flushes and fsyncs the file to disk.
421    pub fn sync(&mut self) -> Result<(), FormatError> {
422        self.writer.flush()?;
423        self.writer.get_ref().sync_all()?;
424        Ok(())
425    }
426
427    /// Returns the file path.
428    pub fn path(&self) -> &Path {
429        &self.path
430    }
431
432    /// Truncates the AOF file back to just the header.
433    /// Used after a successful snapshot to reset the log.
434    pub fn truncate(&mut self) -> Result<(), FormatError> {
435        // flush and drop the old writer
436        self.writer.flush()?;
437
438        // reopen the file with truncation, write fresh header
439        let file = OpenOptions::new()
440            .create(true)
441            .write(true)
442            .truncate(true)
443            .open(&self.path)?;
444        let mut writer = BufWriter::new(file);
445        format::write_header(&mut writer, format::AOF_MAGIC)?;
446        writer.flush()?;
447        self.writer = writer;
448        Ok(())
449    }
450}
451
/// Reads AOF records back from a file, one at a time.
#[derive(Debug)]
pub struct AofReader {
    /// Buffered handle to the AOF file.
    reader: BufReader<File>,
}
457
458impl AofReader {
459    /// Opens an AOF file and validates the header.
460    pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
461        let file = File::open(path.as_ref())?;
462        let mut reader = BufReader::new(file);
463        let _version = format::read_header(&mut reader, format::AOF_MAGIC)?;
464        Ok(Self { reader })
465    }
466
467    /// Reads the next record from the AOF.
468    ///
469    /// Returns `Ok(None)` at end-of-file. On a truncated record (the
470    /// server crashed mid-write), returns `Ok(None)` rather than an error
471    /// — this is the expected recovery behavior.
472    pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
473        // peek for EOF — try reading the tag byte
474        let tag = match format::read_u8(&mut self.reader) {
475            Ok(t) => t,
476            Err(FormatError::UnexpectedEof) => return Ok(None),
477            Err(e) => return Err(e),
478        };
479
480        // read the rest of the payload based on tag, building the full
481        // record bytes for CRC verification
482        let record_result = self.read_payload_for_tag(tag);
483        match record_result {
484            Ok((payload, stored_crc)) => {
485                // prepend the tag to the payload for CRC check
486                let mut full = Vec::with_capacity(1 + payload.len());
487                full.push(tag);
488                full.extend_from_slice(&payload);
489                format::verify_crc32(&full, stored_crc)?;
490                AofRecord::from_bytes(&full).map(Some)
491            }
492            // truncated record — treat as end of usable data
493            Err(FormatError::UnexpectedEof) => Ok(None),
494            Err(e) => Err(e),
495        }
496    }
497
498    /// Reads the remaining payload bytes (after the tag) and the trailing CRC.
499    fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
500        let mut payload = Vec::new();
501        match tag {
502            TAG_SET => {
503                // key_len + key + value_len + value + expire_ms
504                let key = format::read_bytes(&mut self.reader)?;
505                format::write_bytes(&mut payload, &key).expect("vec write");
506                let value = format::read_bytes(&mut self.reader)?;
507                format::write_bytes(&mut payload, &value).expect("vec write");
508                let expire_ms = format::read_i64(&mut self.reader)?;
509                format::write_i64(&mut payload, expire_ms).expect("vec write");
510            }
511            TAG_DEL => {
512                let key = format::read_bytes(&mut self.reader)?;
513                format::write_bytes(&mut payload, &key).expect("vec write");
514            }
515            TAG_EXPIRE => {
516                let key = format::read_bytes(&mut self.reader)?;
517                format::write_bytes(&mut payload, &key).expect("vec write");
518                let seconds = format::read_i64(&mut self.reader)?;
519                format::write_i64(&mut payload, seconds).expect("vec write");
520            }
521            TAG_LPUSH | TAG_RPUSH => {
522                let key = format::read_bytes(&mut self.reader)?;
523                format::write_bytes(&mut payload, &key).expect("vec write");
524                let count = format::read_u32(&mut self.reader)?;
525                format::write_u32(&mut payload, count).expect("vec write");
526                for _ in 0..count {
527                    let val = format::read_bytes(&mut self.reader)?;
528                    format::write_bytes(&mut payload, &val).expect("vec write");
529                }
530            }
531            TAG_LPOP | TAG_RPOP => {
532                let key = format::read_bytes(&mut self.reader)?;
533                format::write_bytes(&mut payload, &key).expect("vec write");
534            }
535            TAG_ZADD => {
536                let key = format::read_bytes(&mut self.reader)?;
537                format::write_bytes(&mut payload, &key).expect("vec write");
538                let count = format::read_u32(&mut self.reader)?;
539                format::write_u32(&mut payload, count).expect("vec write");
540                for _ in 0..count {
541                    let score = format::read_f64(&mut self.reader)?;
542                    format::write_f64(&mut payload, score).expect("vec write");
543                    let member = format::read_bytes(&mut self.reader)?;
544                    format::write_bytes(&mut payload, &member).expect("vec write");
545                }
546            }
547            TAG_ZREM => {
548                let key = format::read_bytes(&mut self.reader)?;
549                format::write_bytes(&mut payload, &key).expect("vec write");
550                let count = format::read_u32(&mut self.reader)?;
551                format::write_u32(&mut payload, count).expect("vec write");
552                for _ in 0..count {
553                    let member = format::read_bytes(&mut self.reader)?;
554                    format::write_bytes(&mut payload, &member).expect("vec write");
555                }
556            }
557            TAG_PERSIST => {
558                let key = format::read_bytes(&mut self.reader)?;
559                format::write_bytes(&mut payload, &key).expect("vec write");
560            }
561            TAG_PEXPIRE => {
562                let key = format::read_bytes(&mut self.reader)?;
563                format::write_bytes(&mut payload, &key).expect("vec write");
564                let millis = format::read_i64(&mut self.reader)?;
565                format::write_i64(&mut payload, millis).expect("vec write");
566            }
567            TAG_INCR | TAG_DECR => {
568                let key = format::read_bytes(&mut self.reader)?;
569                format::write_bytes(&mut payload, &key).expect("vec write");
570            }
571            _ => return Err(FormatError::UnknownTag(tag)),
572        }
573        let stored_crc = format::read_u32(&mut self.reader)?;
574        Ok((payload, stored_crc))
575    }
576}
577
/// Builds the path of a shard's AOF file: `<data_dir>/shard-<shard_id>.aof`.
pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
    let file_name = format!("shard-{shard_id}.aof");
    let mut path = data_dir.to_path_buf();
    path.push(file_name);
    path
}
582
583#[cfg(test)]
584mod tests {
585    use super::*;
586    fn temp_dir() -> tempfile::TempDir {
587        tempfile::tempdir().expect("create temp dir")
588    }
589
590    #[test]
591    fn record_round_trip_set() {
592        let rec = AofRecord::Set {
593            key: "hello".into(),
594            value: Bytes::from("world"),
595            expire_ms: 5000,
596        };
597        let bytes = rec.to_bytes();
598        let decoded = AofRecord::from_bytes(&bytes).unwrap();
599        assert_eq!(rec, decoded);
600    }
601
602    #[test]
603    fn record_round_trip_del() {
604        let rec = AofRecord::Del { key: "gone".into() };
605        let bytes = rec.to_bytes();
606        let decoded = AofRecord::from_bytes(&bytes).unwrap();
607        assert_eq!(rec, decoded);
608    }
609
610    #[test]
611    fn record_round_trip_expire() {
612        let rec = AofRecord::Expire {
613            key: "ttl".into(),
614            seconds: 300,
615        };
616        let bytes = rec.to_bytes();
617        let decoded = AofRecord::from_bytes(&bytes).unwrap();
618        assert_eq!(rec, decoded);
619    }
620
621    #[test]
622    fn set_with_no_expiry() {
623        let rec = AofRecord::Set {
624            key: "k".into(),
625            value: Bytes::from("v"),
626            expire_ms: -1,
627        };
628        let bytes = rec.to_bytes();
629        let decoded = AofRecord::from_bytes(&bytes).unwrap();
630        assert_eq!(rec, decoded);
631    }
632
633    #[test]
634    fn writer_reader_round_trip() {
635        let dir = temp_dir();
636        let path = dir.path().join("test.aof");
637
638        let records = vec![
639            AofRecord::Set {
640                key: "a".into(),
641                value: Bytes::from("1"),
642                expire_ms: -1,
643            },
644            AofRecord::Set {
645                key: "b".into(),
646                value: Bytes::from("2"),
647                expire_ms: 10_000,
648            },
649            AofRecord::Del { key: "a".into() },
650            AofRecord::Expire {
651                key: "b".into(),
652                seconds: 60,
653            },
654        ];
655
656        // write
657        {
658            let mut writer = AofWriter::open(&path).unwrap();
659            for rec in &records {
660                writer.write_record(rec).unwrap();
661            }
662            writer.sync().unwrap();
663        }
664
665        // read back
666        let mut reader = AofReader::open(&path).unwrap();
667        let mut got = Vec::new();
668        while let Some(rec) = reader.read_record().unwrap() {
669            got.push(rec);
670        }
671        assert_eq!(records, got);
672    }
673
674    #[test]
675    fn empty_aof_returns_no_records() {
676        let dir = temp_dir();
677        let path = dir.path().join("empty.aof");
678
679        // just write the header
680        {
681            let _writer = AofWriter::open(&path).unwrap();
682        }
683
684        let mut reader = AofReader::open(&path).unwrap();
685        assert!(reader.read_record().unwrap().is_none());
686    }
687
688    #[test]
689    fn truncated_record_treated_as_eof() {
690        let dir = temp_dir();
691        let path = dir.path().join("trunc.aof");
692
693        // write one good record, then append garbage (simulating a crash)
694        {
695            let mut writer = AofWriter::open(&path).unwrap();
696            writer
697                .write_record(&AofRecord::Set {
698                    key: "ok".into(),
699                    value: Bytes::from("good"),
700                    expire_ms: -1,
701                })
702                .unwrap();
703            writer.flush().unwrap();
704        }
705
706        // append a partial tag with no payload
707        {
708            let mut file = OpenOptions::new().append(true).open(&path).unwrap();
709            file.write_all(&[TAG_SET]).unwrap();
710        }
711
712        let mut reader = AofReader::open(&path).unwrap();
713        // first record should be fine
714        let rec = reader.read_record().unwrap().unwrap();
715        assert!(matches!(rec, AofRecord::Set { .. }));
716        // second should be None (truncated)
717        assert!(reader.read_record().unwrap().is_none());
718    }
719
720    #[test]
721    fn corrupt_crc_detected() {
722        let dir = temp_dir();
723        let path = dir.path().join("corrupt.aof");
724
725        {
726            let mut writer = AofWriter::open(&path).unwrap();
727            writer
728                .write_record(&AofRecord::Set {
729                    key: "k".into(),
730                    value: Bytes::from("v"),
731                    expire_ms: -1,
732                })
733                .unwrap();
734            writer.flush().unwrap();
735        }
736
737        // corrupt the last byte (part of the CRC)
738        let mut data = fs::read(&path).unwrap();
739        let last = data.len() - 1;
740        data[last] ^= 0xFF;
741        fs::write(&path, &data).unwrap();
742
743        let mut reader = AofReader::open(&path).unwrap();
744        let err = reader.read_record().unwrap_err();
745        assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
746    }
747
748    #[test]
749    fn missing_magic_is_error() {
750        let dir = temp_dir();
751        let path = dir.path().join("bad.aof");
752        fs::write(&path, b"NOT_AOF_DATA").unwrap();
753
754        let err = AofReader::open(&path).unwrap_err();
755        assert!(matches!(err, FormatError::InvalidMagic));
756    }
757
758    #[test]
759    fn truncate_resets_aof() {
760        let dir = temp_dir();
761        let path = dir.path().join("reset.aof");
762
763        {
764            let mut writer = AofWriter::open(&path).unwrap();
765            writer
766                .write_record(&AofRecord::Set {
767                    key: "old".into(),
768                    value: Bytes::from("data"),
769                    expire_ms: -1,
770                })
771                .unwrap();
772            writer.truncate().unwrap();
773
774            // write a new record after truncation
775            writer
776                .write_record(&AofRecord::Set {
777                    key: "new".into(),
778                    value: Bytes::from("fresh"),
779                    expire_ms: -1,
780                })
781                .unwrap();
782            writer.sync().unwrap();
783        }
784
785        let mut reader = AofReader::open(&path).unwrap();
786        let rec = reader.read_record().unwrap().unwrap();
787        match rec {
788            AofRecord::Set { key, .. } => assert_eq!(key, "new"),
789            other => panic!("expected Set, got {other:?}"),
790        }
791        // only one record after truncation
792        assert!(reader.read_record().unwrap().is_none());
793    }
794
795    #[test]
796    fn record_round_trip_lpush() {
797        let rec = AofRecord::LPush {
798            key: "list".into(),
799            values: vec![Bytes::from("a"), Bytes::from("b")],
800        };
801        let bytes = rec.to_bytes();
802        let decoded = AofRecord::from_bytes(&bytes).unwrap();
803        assert_eq!(rec, decoded);
804    }
805
806    #[test]
807    fn record_round_trip_rpush() {
808        let rec = AofRecord::RPush {
809            key: "list".into(),
810            values: vec![Bytes::from("x")],
811        };
812        let bytes = rec.to_bytes();
813        let decoded = AofRecord::from_bytes(&bytes).unwrap();
814        assert_eq!(rec, decoded);
815    }
816
817    #[test]
818    fn record_round_trip_lpop() {
819        let rec = AofRecord::LPop { key: "list".into() };
820        let bytes = rec.to_bytes();
821        let decoded = AofRecord::from_bytes(&bytes).unwrap();
822        assert_eq!(rec, decoded);
823    }
824
825    #[test]
826    fn record_round_trip_rpop() {
827        let rec = AofRecord::RPop { key: "list".into() };
828        let bytes = rec.to_bytes();
829        let decoded = AofRecord::from_bytes(&bytes).unwrap();
830        assert_eq!(rec, decoded);
831    }
832
833    #[test]
834    fn writer_reader_round_trip_with_list_records() {
835        let dir = temp_dir();
836        let path = dir.path().join("list.aof");
837
838        let records = vec![
839            AofRecord::LPush {
840                key: "l".into(),
841                values: vec![Bytes::from("a"), Bytes::from("b")],
842            },
843            AofRecord::RPush {
844                key: "l".into(),
845                values: vec![Bytes::from("c")],
846            },
847            AofRecord::LPop { key: "l".into() },
848            AofRecord::RPop { key: "l".into() },
849        ];
850
851        {
852            let mut writer = AofWriter::open(&path).unwrap();
853            for rec in &records {
854                writer.write_record(rec).unwrap();
855            }
856            writer.sync().unwrap();
857        }
858
859        let mut reader = AofReader::open(&path).unwrap();
860        let mut got = Vec::new();
861        while let Some(rec) = reader.read_record().unwrap() {
862            got.push(rec);
863        }
864        assert_eq!(records, got);
865    }
866
867    #[test]
868    fn record_round_trip_zadd() {
869        let rec = AofRecord::ZAdd {
870            key: "board".into(),
871            members: vec![(100.0, "alice".into()), (200.5, "bob".into())],
872        };
873        let bytes = rec.to_bytes();
874        let decoded = AofRecord::from_bytes(&bytes).unwrap();
875        assert_eq!(rec, decoded);
876    }
877
878    #[test]
879    fn record_round_trip_zrem() {
880        let rec = AofRecord::ZRem {
881            key: "board".into(),
882            members: vec!["alice".into(), "bob".into()],
883        };
884        let bytes = rec.to_bytes();
885        let decoded = AofRecord::from_bytes(&bytes).unwrap();
886        assert_eq!(rec, decoded);
887    }
888
889    #[test]
890    fn writer_reader_round_trip_with_sorted_set_records() {
891        let dir = temp_dir();
892        let path = dir.path().join("zset.aof");
893
894        let records = vec![
895            AofRecord::ZAdd {
896                key: "board".into(),
897                members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
898            },
899            AofRecord::ZRem {
900                key: "board".into(),
901                members: vec!["alice".into()],
902            },
903        ];
904
905        {
906            let mut writer = AofWriter::open(&path).unwrap();
907            for rec in &records {
908                writer.write_record(rec).unwrap();
909            }
910            writer.sync().unwrap();
911        }
912
913        let mut reader = AofReader::open(&path).unwrap();
914        let mut got = Vec::new();
915        while let Some(rec) = reader.read_record().unwrap() {
916            got.push(rec);
917        }
918        assert_eq!(records, got);
919    }
920
921    #[test]
922    fn record_round_trip_persist() {
923        let rec = AofRecord::Persist {
924            key: "mykey".into(),
925        };
926        let bytes = rec.to_bytes();
927        let decoded = AofRecord::from_bytes(&bytes).unwrap();
928        assert_eq!(rec, decoded);
929    }
930
931    #[test]
932    fn record_round_trip_pexpire() {
933        let rec = AofRecord::Pexpire {
934            key: "mykey".into(),
935            milliseconds: 5000,
936        };
937        let bytes = rec.to_bytes();
938        let decoded = AofRecord::from_bytes(&bytes).unwrap();
939        assert_eq!(rec, decoded);
940    }
941
942    #[test]
943    fn record_round_trip_incr() {
944        let rec = AofRecord::Incr {
945            key: "counter".into(),
946        };
947        let bytes = rec.to_bytes();
948        let decoded = AofRecord::from_bytes(&bytes).unwrap();
949        assert_eq!(rec, decoded);
950    }
951
952    #[test]
953    fn record_round_trip_decr() {
954        let rec = AofRecord::Decr {
955            key: "counter".into(),
956        };
957        let bytes = rec.to_bytes();
958        let decoded = AofRecord::from_bytes(&bytes).unwrap();
959        assert_eq!(rec, decoded);
960    }
961
962    #[test]
963    fn writer_reader_round_trip_with_persist_pexpire() {
964        let dir = temp_dir();
965        let path = dir.path().join("persist_pexpire.aof");
966
967        let records = vec![
968            AofRecord::Set {
969                key: "k".into(),
970                value: Bytes::from("v"),
971                expire_ms: 5000,
972            },
973            AofRecord::Persist { key: "k".into() },
974            AofRecord::Pexpire {
975                key: "k".into(),
976                milliseconds: 3000,
977            },
978        ];
979
980        {
981            let mut writer = AofWriter::open(&path).unwrap();
982            for rec in &records {
983                writer.write_record(rec).unwrap();
984            }
985            writer.sync().unwrap();
986        }
987
988        let mut reader = AofReader::open(&path).unwrap();
989        let mut got = Vec::new();
990        while let Some(rec) = reader.read_record().unwrap() {
991            got.push(rec);
992        }
993        assert_eq!(records, got);
994    }
995
996    #[test]
997    fn aof_path_format() {
998        let p = aof_path(Path::new("/data"), 3);
999        assert_eq!(p, PathBuf::from("/data/shard-3.aof"));
1000    }
1001
1002    #[test]
1003    fn record_round_trip_hset() {
1004        let rec = AofRecord::HSet {
1005            key: "hash".into(),
1006            fields: vec![
1007                ("f1".into(), Bytes::from("v1")),
1008                ("f2".into(), Bytes::from("v2")),
1009            ],
1010        };
1011        let bytes = rec.to_bytes();
1012        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1013        assert_eq!(rec, decoded);
1014    }
1015
1016    #[test]
1017    fn record_round_trip_hdel() {
1018        let rec = AofRecord::HDel {
1019            key: "hash".into(),
1020            fields: vec!["f1".into(), "f2".into()],
1021        };
1022        let bytes = rec.to_bytes();
1023        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1024        assert_eq!(rec, decoded);
1025    }
1026
1027    #[test]
1028    fn record_round_trip_hincrby() {
1029        let rec = AofRecord::HIncrBy {
1030            key: "hash".into(),
1031            field: "counter".into(),
1032            delta: -42,
1033        };
1034        let bytes = rec.to_bytes();
1035        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1036        assert_eq!(rec, decoded);
1037    }
1038
1039    #[test]
1040    fn record_round_trip_sadd() {
1041        let rec = AofRecord::SAdd {
1042            key: "set".into(),
1043            members: vec!["m1".into(), "m2".into(), "m3".into()],
1044        };
1045        let bytes = rec.to_bytes();
1046        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1047        assert_eq!(rec, decoded);
1048    }
1049
1050    #[test]
1051    fn record_round_trip_srem() {
1052        let rec = AofRecord::SRem {
1053            key: "set".into(),
1054            members: vec!["m1".into()],
1055        };
1056        let bytes = rec.to_bytes();
1057        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1058        assert_eq!(rec, decoded);
1059    }
1060}