// ember_persistence/aof.rs
1//! Append-only file for recording mutations.
2//!
3//! Each shard writes its own AOF file (`shard-{id}.aof`). Records are
4//! written after successful mutations. The binary format uses a simple
5//! tag + payload + CRC32 structure for each record.
6//!
7//! File layout:
8//! ```text
9//! [EAOF magic: 4B][version: 1B]
10//! [record]*
11//! ```
12//!
13//! Record layout:
14//! ```text
15//! [tag: 1B][payload...][crc32: 4B]
16//! ```
17//! The CRC32 covers the tag + payload bytes.
18
19use std::fs::{self, File, OpenOptions};
20use std::io::{self, BufReader, BufWriter, Write};
21use std::path::{Path, PathBuf};
22
23use bytes::Bytes;
24
25use crate::format::{self, FormatError};
26
27/// Reads a length-prefixed field and decodes it as UTF-8.
28fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
29    let bytes = format::read_bytes(r)?;
30    String::from_utf8(bytes).map_err(|_| {
31        FormatError::Io(io::Error::new(
32            io::ErrorKind::InvalidData,
33            format!("{field} is not valid utf-8"),
34        ))
35    })
36}
37
// Record tags for the AOF format. The tag is the first byte of every record
// and selects the payload layout that follows it (see `AofRecord::to_bytes`).
// These values are written into AOF files on disk, so an existing tag must
// never be renumbered or reused for a different layout; a new record kind
// should take the next unused value.
const TAG_SET: u8 = 1;
const TAG_DEL: u8 = 2;
const TAG_EXPIRE: u8 = 3;
const TAG_LPUSH: u8 = 4;
const TAG_RPUSH: u8 = 5;
const TAG_LPOP: u8 = 6;
const TAG_RPOP: u8 = 7;
const TAG_ZADD: u8 = 8;
const TAG_ZREM: u8 = 9;
const TAG_PERSIST: u8 = 10;
const TAG_PEXPIRE: u8 = 11;
const TAG_INCR: u8 = 12;
const TAG_DECR: u8 = 13;
const TAG_HSET: u8 = 14;
const TAG_HDEL: u8 = 15;
const TAG_HINCRBY: u8 = 16;
const TAG_SADD: u8 = 17;
const TAG_SREM: u8 = 18;
const TAG_INCRBY: u8 = 19;
const TAG_DECRBY: u8 = 20;
const TAG_APPEND: u8 = 21;
const TAG_RENAME: u8 = 22;
61
/// A single mutation record stored in the AOF.
///
/// `AofRecord::to_bytes` encodes each variant as a one-byte tag followed by
/// its fields in declaration order. Keys, members, fields and values are
/// length-prefixed byte fields; variants holding a `Vec` first write a `u32`
/// element count.
#[derive(Debug, Clone, PartialEq)]
pub enum AofRecord {
    /// SET key value \[expire_ms\]. expire_ms is -1 for no expiration.
    Set {
        key: String,
        value: Bytes,
        expire_ms: i64,
    },
    /// DEL key.
    Del { key: String },
    /// EXPIRE key seconds. `seconds` is stored on disk as an `i64` (cast
    /// from `u64`) and cast back when decoded.
    Expire { key: String, seconds: u64 },
    /// LPUSH key value [value ...].
    LPush { key: String, values: Vec<Bytes> },
    /// RPUSH key value [value ...].
    RPush { key: String, values: Vec<Bytes> },
    /// LPOP key.
    LPop { key: String },
    /// RPOP key.
    RPop { key: String },
    /// ZADD key score member [score member ...]. Each pair is encoded as an
    /// `f64` score followed by the member bytes.
    ZAdd {
        key: String,
        members: Vec<(f64, String)>,
    },
    /// ZREM key member [member ...].
    ZRem { key: String, members: Vec<String> },
    /// PERSIST key — remove expiration.
    Persist { key: String },
    /// PEXPIRE key milliseconds. `milliseconds` is stored on disk as an
    /// `i64` (cast from `u64`) and cast back when decoded.
    Pexpire { key: String, milliseconds: u64 },
    /// INCR key.
    Incr { key: String },
    /// DECR key.
    Decr { key: String },
    /// HSET key field value [field value ...]. Each pair is encoded as the
    /// field bytes followed by the value bytes.
    HSet {
        key: String,
        fields: Vec<(String, Bytes)>,
    },
    /// HDEL key field [field ...].
    HDel { key: String, fields: Vec<String> },
    /// HINCRBY key field delta.
    HIncrBy {
        key: String,
        field: String,
        delta: i64,
    },
    /// SADD key member [member ...].
    SAdd { key: String, members: Vec<String> },
    /// SREM key member [member ...].
    SRem { key: String, members: Vec<String> },
    /// INCRBY key delta.
    IncrBy { key: String, delta: i64 },
    /// DECRBY key delta.
    DecrBy { key: String, delta: i64 },
    /// APPEND key value.
    Append { key: String, value: Bytes },
    /// RENAME key newkey.
    Rename { key: String, newkey: String },
}
124
125impl AofRecord {
126    /// Serializes this record into a byte vector (tag + payload, no CRC).
127    fn to_bytes(&self) -> Result<Vec<u8>, FormatError> {
128        let mut buf = Vec::new();
129        match self {
130            AofRecord::Set {
131                key,
132                value,
133                expire_ms,
134            } => {
135                format::write_u8(&mut buf, TAG_SET)?;
136                format::write_bytes(&mut buf, key.as_bytes())?;
137                format::write_bytes(&mut buf, value)?;
138                format::write_i64(&mut buf, *expire_ms)?;
139            }
140            AofRecord::Del { key } => {
141                format::write_u8(&mut buf, TAG_DEL)?;
142                format::write_bytes(&mut buf, key.as_bytes())?;
143            }
144            AofRecord::Expire { key, seconds } => {
145                format::write_u8(&mut buf, TAG_EXPIRE)?;
146                format::write_bytes(&mut buf, key.as_bytes())?;
147                format::write_i64(&mut buf, *seconds as i64)?;
148            }
149            AofRecord::LPush { key, values } => {
150                format::write_u8(&mut buf, TAG_LPUSH)?;
151                format::write_bytes(&mut buf, key.as_bytes())?;
152                format::write_u32(&mut buf, values.len() as u32)?;
153                for v in values {
154                    format::write_bytes(&mut buf, v)?;
155                }
156            }
157            AofRecord::RPush { key, values } => {
158                format::write_u8(&mut buf, TAG_RPUSH)?;
159                format::write_bytes(&mut buf, key.as_bytes())?;
160                format::write_u32(&mut buf, values.len() as u32)?;
161                for v in values {
162                    format::write_bytes(&mut buf, v)?;
163                }
164            }
165            AofRecord::LPop { key } => {
166                format::write_u8(&mut buf, TAG_LPOP)?;
167                format::write_bytes(&mut buf, key.as_bytes())?;
168            }
169            AofRecord::RPop { key } => {
170                format::write_u8(&mut buf, TAG_RPOP)?;
171                format::write_bytes(&mut buf, key.as_bytes())?;
172            }
173            AofRecord::ZAdd { key, members } => {
174                format::write_u8(&mut buf, TAG_ZADD)?;
175                format::write_bytes(&mut buf, key.as_bytes())?;
176                format::write_u32(&mut buf, members.len() as u32)?;
177                for (score, member) in members {
178                    format::write_f64(&mut buf, *score)?;
179                    format::write_bytes(&mut buf, member.as_bytes())?;
180                }
181            }
182            AofRecord::ZRem { key, members } => {
183                format::write_u8(&mut buf, TAG_ZREM)?;
184                format::write_bytes(&mut buf, key.as_bytes())?;
185                format::write_u32(&mut buf, members.len() as u32)?;
186                for member in members {
187                    format::write_bytes(&mut buf, member.as_bytes())?;
188                }
189            }
190            AofRecord::Persist { key } => {
191                format::write_u8(&mut buf, TAG_PERSIST)?;
192                format::write_bytes(&mut buf, key.as_bytes())?;
193            }
194            AofRecord::Pexpire { key, milliseconds } => {
195                format::write_u8(&mut buf, TAG_PEXPIRE)?;
196                format::write_bytes(&mut buf, key.as_bytes())?;
197                format::write_i64(&mut buf, *milliseconds as i64)?;
198            }
199            AofRecord::Incr { key } => {
200                format::write_u8(&mut buf, TAG_INCR)?;
201                format::write_bytes(&mut buf, key.as_bytes())?;
202            }
203            AofRecord::Decr { key } => {
204                format::write_u8(&mut buf, TAG_DECR)?;
205                format::write_bytes(&mut buf, key.as_bytes())?;
206            }
207            AofRecord::HSet { key, fields } => {
208                format::write_u8(&mut buf, TAG_HSET)?;
209                format::write_bytes(&mut buf, key.as_bytes())?;
210                format::write_u32(&mut buf, fields.len() as u32)?;
211                for (field, value) in fields {
212                    format::write_bytes(&mut buf, field.as_bytes())?;
213                    format::write_bytes(&mut buf, value)?;
214                }
215            }
216            AofRecord::HDel { key, fields } => {
217                format::write_u8(&mut buf, TAG_HDEL)?;
218                format::write_bytes(&mut buf, key.as_bytes())?;
219                format::write_u32(&mut buf, fields.len() as u32)?;
220                for field in fields {
221                    format::write_bytes(&mut buf, field.as_bytes())?;
222                }
223            }
224            AofRecord::HIncrBy { key, field, delta } => {
225                format::write_u8(&mut buf, TAG_HINCRBY)?;
226                format::write_bytes(&mut buf, key.as_bytes())?;
227                format::write_bytes(&mut buf, field.as_bytes())?;
228                format::write_i64(&mut buf, *delta)?;
229            }
230            AofRecord::SAdd { key, members } => {
231                format::write_u8(&mut buf, TAG_SADD)?;
232                format::write_bytes(&mut buf, key.as_bytes())?;
233                format::write_u32(&mut buf, members.len() as u32)?;
234                for member in members {
235                    format::write_bytes(&mut buf, member.as_bytes())?;
236                }
237            }
238            AofRecord::SRem { key, members } => {
239                format::write_u8(&mut buf, TAG_SREM)?;
240                format::write_bytes(&mut buf, key.as_bytes())?;
241                format::write_u32(&mut buf, members.len() as u32)?;
242                for member in members {
243                    format::write_bytes(&mut buf, member.as_bytes())?;
244                }
245            }
246            AofRecord::IncrBy { key, delta } => {
247                format::write_u8(&mut buf, TAG_INCRBY)?;
248                format::write_bytes(&mut buf, key.as_bytes())?;
249                format::write_i64(&mut buf, *delta)?;
250            }
251            AofRecord::DecrBy { key, delta } => {
252                format::write_u8(&mut buf, TAG_DECRBY)?;
253                format::write_bytes(&mut buf, key.as_bytes())?;
254                format::write_i64(&mut buf, *delta)?;
255            }
256            AofRecord::Append { key, value } => {
257                format::write_u8(&mut buf, TAG_APPEND)?;
258                format::write_bytes(&mut buf, key.as_bytes())?;
259                format::write_bytes(&mut buf, value)?;
260            }
261            AofRecord::Rename { key, newkey } => {
262                format::write_u8(&mut buf, TAG_RENAME)?;
263                format::write_bytes(&mut buf, key.as_bytes())?;
264                format::write_bytes(&mut buf, newkey.as_bytes())?;
265            }
266        }
267        Ok(buf)
268    }
269
270    /// Deserializes a record from a byte slice (tag + payload, no CRC).
271    fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
272        let mut cursor = io::Cursor::new(data);
273        let tag = format::read_u8(&mut cursor)?;
274        match tag {
275            TAG_SET => {
276                let key = read_string(&mut cursor, "key")?;
277                let value = format::read_bytes(&mut cursor)?;
278                let expire_ms = format::read_i64(&mut cursor)?;
279                Ok(AofRecord::Set {
280                    key,
281                    value: Bytes::from(value),
282                    expire_ms,
283                })
284            }
285            TAG_DEL => {
286                let key = read_string(&mut cursor, "key")?;
287                Ok(AofRecord::Del { key })
288            }
289            TAG_EXPIRE => {
290                let key = read_string(&mut cursor, "key")?;
291                let seconds = format::read_i64(&mut cursor)? as u64;
292                Ok(AofRecord::Expire { key, seconds })
293            }
294            TAG_LPUSH | TAG_RPUSH => {
295                let key = read_string(&mut cursor, "key")?;
296                let count = format::read_u32(&mut cursor)?;
297                let mut values = Vec::with_capacity(count as usize);
298                for _ in 0..count {
299                    values.push(Bytes::from(format::read_bytes(&mut cursor)?));
300                }
301                if tag == TAG_LPUSH {
302                    Ok(AofRecord::LPush { key, values })
303                } else {
304                    Ok(AofRecord::RPush { key, values })
305                }
306            }
307            TAG_LPOP => {
308                let key = read_string(&mut cursor, "key")?;
309                Ok(AofRecord::LPop { key })
310            }
311            TAG_RPOP => {
312                let key = read_string(&mut cursor, "key")?;
313                Ok(AofRecord::RPop { key })
314            }
315            TAG_ZADD => {
316                let key = read_string(&mut cursor, "key")?;
317                let count = format::read_u32(&mut cursor)?;
318                let mut members = Vec::with_capacity(count as usize);
319                for _ in 0..count {
320                    let score = format::read_f64(&mut cursor)?;
321                    let member = read_string(&mut cursor, "member")?;
322                    members.push((score, member));
323                }
324                Ok(AofRecord::ZAdd { key, members })
325            }
326            TAG_ZREM => {
327                let key = read_string(&mut cursor, "key")?;
328                let count = format::read_u32(&mut cursor)?;
329                let mut members = Vec::with_capacity(count as usize);
330                for _ in 0..count {
331                    members.push(read_string(&mut cursor, "member")?);
332                }
333                Ok(AofRecord::ZRem { key, members })
334            }
335            TAG_PERSIST => {
336                let key = read_string(&mut cursor, "key")?;
337                Ok(AofRecord::Persist { key })
338            }
339            TAG_PEXPIRE => {
340                let key = read_string(&mut cursor, "key")?;
341                let milliseconds = format::read_i64(&mut cursor)? as u64;
342                Ok(AofRecord::Pexpire { key, milliseconds })
343            }
344            TAG_INCR => {
345                let key = read_string(&mut cursor, "key")?;
346                Ok(AofRecord::Incr { key })
347            }
348            TAG_DECR => {
349                let key = read_string(&mut cursor, "key")?;
350                Ok(AofRecord::Decr { key })
351            }
352            TAG_HSET => {
353                let key = read_string(&mut cursor, "key")?;
354                let count = format::read_u32(&mut cursor)?;
355                let mut fields = Vec::with_capacity(count as usize);
356                for _ in 0..count {
357                    let field = read_string(&mut cursor, "field")?;
358                    let value = Bytes::from(format::read_bytes(&mut cursor)?);
359                    fields.push((field, value));
360                }
361                Ok(AofRecord::HSet { key, fields })
362            }
363            TAG_HDEL => {
364                let key = read_string(&mut cursor, "key")?;
365                let count = format::read_u32(&mut cursor)?;
366                let mut fields = Vec::with_capacity(count as usize);
367                for _ in 0..count {
368                    fields.push(read_string(&mut cursor, "field")?);
369                }
370                Ok(AofRecord::HDel { key, fields })
371            }
372            TAG_HINCRBY => {
373                let key = read_string(&mut cursor, "key")?;
374                let field = read_string(&mut cursor, "field")?;
375                let delta = format::read_i64(&mut cursor)?;
376                Ok(AofRecord::HIncrBy { key, field, delta })
377            }
378            TAG_SADD => {
379                let key = read_string(&mut cursor, "key")?;
380                let count = format::read_u32(&mut cursor)?;
381                let mut members = Vec::with_capacity(count as usize);
382                for _ in 0..count {
383                    members.push(read_string(&mut cursor, "member")?);
384                }
385                Ok(AofRecord::SAdd { key, members })
386            }
387            TAG_SREM => {
388                let key = read_string(&mut cursor, "key")?;
389                let count = format::read_u32(&mut cursor)?;
390                let mut members = Vec::with_capacity(count as usize);
391                for _ in 0..count {
392                    members.push(read_string(&mut cursor, "member")?);
393                }
394                Ok(AofRecord::SRem { key, members })
395            }
396            TAG_INCRBY => {
397                let key = read_string(&mut cursor, "key")?;
398                let delta = format::read_i64(&mut cursor)?;
399                Ok(AofRecord::IncrBy { key, delta })
400            }
401            TAG_DECRBY => {
402                let key = read_string(&mut cursor, "key")?;
403                let delta = format::read_i64(&mut cursor)?;
404                Ok(AofRecord::DecrBy { key, delta })
405            }
406            TAG_APPEND => {
407                let key = read_string(&mut cursor, "key")?;
408                let value = Bytes::from(format::read_bytes(&mut cursor)?);
409                Ok(AofRecord::Append { key, value })
410            }
411            TAG_RENAME => {
412                let key = read_string(&mut cursor, "key")?;
413                let newkey = read_string(&mut cursor, "newkey")?;
414                Ok(AofRecord::Rename { key, newkey })
415            }
416            _ => Err(FormatError::UnknownTag(tag)),
417        }
418    }
419}
420
/// Configurable fsync policy for the AOF writer.
///
/// `EverySec` is the `Default`. Nothing in this file consults the policy:
/// `AofWriter` only exposes `flush` and `sync`. Presumably the owner of the
/// writer decides when to call them based on this value — confirm against
/// the shard code.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum FsyncPolicy {
    /// fsync after every write. safest, slowest.
    Always,
    /// fsync once per second. the shard tick drives this.
    #[default]
    EverySec,
    /// let the OS decide when to flush. fastest, least durable.
    No,
}
432
433/// Buffered writer for appending AOF records to a file.
434pub struct AofWriter {
435    writer: BufWriter<File>,
436    path: PathBuf,
437}
438
439impl AofWriter {
440    /// Opens (or creates) an AOF file. If the file is new, writes the header.
441    /// If the file already exists, appends to it.
442    pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
443        let path = path.into();
444        let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
445
446        let mut opts = OpenOptions::new();
447        opts.create(true).append(true);
448        #[cfg(unix)]
449        {
450            use std::os::unix::fs::OpenOptionsExt;
451            opts.mode(0o600);
452        }
453        let file = opts.open(&path)?;
454        let mut writer = BufWriter::new(file);
455
456        if !exists {
457            format::write_header(&mut writer, format::AOF_MAGIC)?;
458            writer.flush()?;
459        }
460
461        Ok(Self { writer, path })
462    }
463
464    /// Appends a record to the AOF. Writes tag+payload+crc32.
465    pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
466        let payload = record.to_bytes()?;
467        let checksum = format::crc32(&payload);
468        self.writer.write_all(&payload)?;
469        format::write_u32(&mut self.writer, checksum)?;
470        Ok(())
471    }
472
473    /// Flushes the internal buffer to the OS.
474    pub fn flush(&mut self) -> Result<(), FormatError> {
475        self.writer.flush()?;
476        Ok(())
477    }
478
479    /// Flushes and fsyncs the file to disk.
480    pub fn sync(&mut self) -> Result<(), FormatError> {
481        self.writer.flush()?;
482        self.writer.get_ref().sync_all()?;
483        Ok(())
484    }
485
486    /// Returns the file path.
487    pub fn path(&self) -> &Path {
488        &self.path
489    }
490
491    /// Truncates the AOF file back to just the header.
492    /// Used after a successful snapshot to reset the log.
493    pub fn truncate(&mut self) -> Result<(), FormatError> {
494        // flush and drop the old writer
495        self.writer.flush()?;
496
497        // reopen the file with truncation, write fresh header
498        let mut opts = OpenOptions::new();
499        opts.create(true).write(true).truncate(true);
500        #[cfg(unix)]
501        {
502            use std::os::unix::fs::OpenOptionsExt;
503            opts.mode(0o600);
504        }
505        let file = opts.open(&self.path)?;
506        let mut writer = BufWriter::new(file);
507        format::write_header(&mut writer, format::AOF_MAGIC)?;
508        writer.flush()?;
509        // ensure the fresh header is durable before we start appending
510        writer.get_ref().sync_all()?;
511        self.writer = writer;
512        Ok(())
513    }
514}
515
516/// Reader for iterating over AOF records.
517#[derive(Debug)]
518pub struct AofReader {
519    reader: BufReader<File>,
520}
521
522impl AofReader {
523    /// Opens an AOF file and validates the header.
524    pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
525        let file = File::open(path.as_ref())?;
526        let mut reader = BufReader::new(file);
527        let _version = format::read_header(&mut reader, format::AOF_MAGIC)?;
528        Ok(Self { reader })
529    }
530
531    /// Reads the next record from the AOF.
532    ///
533    /// Returns `Ok(None)` at end-of-file. On a truncated record (the
534    /// server crashed mid-write), returns `Ok(None)` rather than an error
535    /// — this is the expected recovery behavior.
536    pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
537        // peek for EOF — try reading the tag byte
538        let tag = match format::read_u8(&mut self.reader) {
539            Ok(t) => t,
540            Err(FormatError::UnexpectedEof) => return Ok(None),
541            Err(e) => return Err(e),
542        };
543
544        // read the rest of the payload based on tag, building the full
545        // record bytes for CRC verification
546        let record_result = self.read_payload_for_tag(tag);
547        match record_result {
548            Ok((payload, stored_crc)) => {
549                // prepend the tag to the payload for CRC check
550                let mut full = Vec::with_capacity(1 + payload.len());
551                full.push(tag);
552                full.extend_from_slice(&payload);
553                format::verify_crc32(&full, stored_crc)?;
554                AofRecord::from_bytes(&full).map(Some)
555            }
556            // truncated record — treat as end of usable data
557            Err(FormatError::UnexpectedEof) => Ok(None),
558            Err(e) => Err(e),
559        }
560    }
561
562    /// Reads the remaining payload bytes (after the tag) and the trailing CRC.
563    fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
564        let mut payload = Vec::new();
565        match tag {
566            TAG_SET => {
567                // key_len + key + value_len + value + expire_ms
568                let key = format::read_bytes(&mut self.reader)?;
569                format::write_bytes(&mut payload, &key)?;
570                let value = format::read_bytes(&mut self.reader)?;
571                format::write_bytes(&mut payload, &value)?;
572                let expire_ms = format::read_i64(&mut self.reader)?;
573                format::write_i64(&mut payload, expire_ms)?;
574            }
575            TAG_DEL => {
576                let key = format::read_bytes(&mut self.reader)?;
577                format::write_bytes(&mut payload, &key)?;
578            }
579            TAG_EXPIRE => {
580                let key = format::read_bytes(&mut self.reader)?;
581                format::write_bytes(&mut payload, &key)?;
582                let seconds = format::read_i64(&mut self.reader)?;
583                format::write_i64(&mut payload, seconds)?;
584            }
585            TAG_LPUSH | TAG_RPUSH => {
586                let key = format::read_bytes(&mut self.reader)?;
587                format::write_bytes(&mut payload, &key)?;
588                let count = format::read_u32(&mut self.reader)?;
589                format::write_u32(&mut payload, count)?;
590                for _ in 0..count {
591                    let val = format::read_bytes(&mut self.reader)?;
592                    format::write_bytes(&mut payload, &val)?;
593                }
594            }
595            TAG_LPOP | TAG_RPOP => {
596                let key = format::read_bytes(&mut self.reader)?;
597                format::write_bytes(&mut payload, &key)?;
598            }
599            TAG_ZADD => {
600                let key = format::read_bytes(&mut self.reader)?;
601                format::write_bytes(&mut payload, &key)?;
602                let count = format::read_u32(&mut self.reader)?;
603                format::write_u32(&mut payload, count)?;
604                for _ in 0..count {
605                    let score = format::read_f64(&mut self.reader)?;
606                    format::write_f64(&mut payload, score)?;
607                    let member = format::read_bytes(&mut self.reader)?;
608                    format::write_bytes(&mut payload, &member)?;
609                }
610            }
611            TAG_ZREM => {
612                let key = format::read_bytes(&mut self.reader)?;
613                format::write_bytes(&mut payload, &key)?;
614                let count = format::read_u32(&mut self.reader)?;
615                format::write_u32(&mut payload, count)?;
616                for _ in 0..count {
617                    let member = format::read_bytes(&mut self.reader)?;
618                    format::write_bytes(&mut payload, &member)?;
619                }
620            }
621            TAG_PERSIST => {
622                let key = format::read_bytes(&mut self.reader)?;
623                format::write_bytes(&mut payload, &key)?;
624            }
625            TAG_PEXPIRE => {
626                let key = format::read_bytes(&mut self.reader)?;
627                format::write_bytes(&mut payload, &key)?;
628                let millis = format::read_i64(&mut self.reader)?;
629                format::write_i64(&mut payload, millis)?;
630            }
631            TAG_INCR | TAG_DECR => {
632                let key = format::read_bytes(&mut self.reader)?;
633                format::write_bytes(&mut payload, &key)?;
634            }
635            _ => return Err(FormatError::UnknownTag(tag)),
636        }
637        let stored_crc = format::read_u32(&mut self.reader)?;
638        Ok((payload, stored_crc))
639    }
640}
641
/// Builds the location of shard `shard_id`'s append-only file inside
/// `data_dir`, i.e. `<data_dir>/shard-<shard_id>.aof`.
pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
    let mut location = data_dir.to_path_buf();
    location.push(format!("shard-{shard_id}.aof"));
    location
}
646
647#[cfg(test)]
648mod tests {
649    use super::*;
650    fn temp_dir() -> tempfile::TempDir {
651        tempfile::tempdir().expect("create temp dir")
652    }
653
654    #[test]
655    fn record_round_trip_set() {
656        let rec = AofRecord::Set {
657            key: "hello".into(),
658            value: Bytes::from("world"),
659            expire_ms: 5000,
660        };
661        let bytes = rec.to_bytes().unwrap();
662        let decoded = AofRecord::from_bytes(&bytes).unwrap();
663        assert_eq!(rec, decoded);
664    }
665
666    #[test]
667    fn record_round_trip_del() {
668        let rec = AofRecord::Del { key: "gone".into() };
669        let bytes = rec.to_bytes().unwrap();
670        let decoded = AofRecord::from_bytes(&bytes).unwrap();
671        assert_eq!(rec, decoded);
672    }
673
674    #[test]
675    fn record_round_trip_expire() {
676        let rec = AofRecord::Expire {
677            key: "ttl".into(),
678            seconds: 300,
679        };
680        let bytes = rec.to_bytes().unwrap();
681        let decoded = AofRecord::from_bytes(&bytes).unwrap();
682        assert_eq!(rec, decoded);
683    }
684
685    #[test]
686    fn set_with_no_expiry() {
687        let rec = AofRecord::Set {
688            key: "k".into(),
689            value: Bytes::from("v"),
690            expire_ms: -1,
691        };
692        let bytes = rec.to_bytes().unwrap();
693        let decoded = AofRecord::from_bytes(&bytes).unwrap();
694        assert_eq!(rec, decoded);
695    }
696
697    #[test]
698    fn writer_reader_round_trip() {
699        let dir = temp_dir();
700        let path = dir.path().join("test.aof");
701
702        let records = vec![
703            AofRecord::Set {
704                key: "a".into(),
705                value: Bytes::from("1"),
706                expire_ms: -1,
707            },
708            AofRecord::Set {
709                key: "b".into(),
710                value: Bytes::from("2"),
711                expire_ms: 10_000,
712            },
713            AofRecord::Del { key: "a".into() },
714            AofRecord::Expire {
715                key: "b".into(),
716                seconds: 60,
717            },
718        ];
719
720        // write
721        {
722            let mut writer = AofWriter::open(&path).unwrap();
723            for rec in &records {
724                writer.write_record(rec).unwrap();
725            }
726            writer.sync().unwrap();
727        }
728
729        // read back
730        let mut reader = AofReader::open(&path).unwrap();
731        let mut got = Vec::new();
732        while let Some(rec) = reader.read_record().unwrap() {
733            got.push(rec);
734        }
735        assert_eq!(records, got);
736    }
737
738    #[test]
739    fn empty_aof_returns_no_records() {
740        let dir = temp_dir();
741        let path = dir.path().join("empty.aof");
742
743        // just write the header
744        {
745            let _writer = AofWriter::open(&path).unwrap();
746        }
747
748        let mut reader = AofReader::open(&path).unwrap();
749        assert!(reader.read_record().unwrap().is_none());
750    }
751
752    #[test]
753    fn truncated_record_treated_as_eof() {
754        let dir = temp_dir();
755        let path = dir.path().join("trunc.aof");
756
757        // write one good record, then append garbage (simulating a crash)
758        {
759            let mut writer = AofWriter::open(&path).unwrap();
760            writer
761                .write_record(&AofRecord::Set {
762                    key: "ok".into(),
763                    value: Bytes::from("good"),
764                    expire_ms: -1,
765                })
766                .unwrap();
767            writer.flush().unwrap();
768        }
769
770        // append a partial tag with no payload
771        {
772            let mut file = OpenOptions::new().append(true).open(&path).unwrap();
773            file.write_all(&[TAG_SET]).unwrap();
774        }
775
776        let mut reader = AofReader::open(&path).unwrap();
777        // first record should be fine
778        let rec = reader.read_record().unwrap().unwrap();
779        assert!(matches!(rec, AofRecord::Set { .. }));
780        // second should be None (truncated)
781        assert!(reader.read_record().unwrap().is_none());
782    }
783
784    #[test]
785    fn corrupt_crc_detected() {
786        let dir = temp_dir();
787        let path = dir.path().join("corrupt.aof");
788
789        {
790            let mut writer = AofWriter::open(&path).unwrap();
791            writer
792                .write_record(&AofRecord::Set {
793                    key: "k".into(),
794                    value: Bytes::from("v"),
795                    expire_ms: -1,
796                })
797                .unwrap();
798            writer.flush().unwrap();
799        }
800
801        // corrupt the last byte (part of the CRC)
802        let mut data = fs::read(&path).unwrap();
803        let last = data.len() - 1;
804        data[last] ^= 0xFF;
805        fs::write(&path, &data).unwrap();
806
807        let mut reader = AofReader::open(&path).unwrap();
808        let err = reader.read_record().unwrap_err();
809        assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
810    }
811
812    #[test]
813    fn missing_magic_is_error() {
814        let dir = temp_dir();
815        let path = dir.path().join("bad.aof");
816        fs::write(&path, b"NOT_AOF_DATA").unwrap();
817
818        let err = AofReader::open(&path).unwrap_err();
819        assert!(matches!(err, FormatError::InvalidMagic));
820    }
821
822    #[test]
823    fn truncate_resets_aof() {
824        let dir = temp_dir();
825        let path = dir.path().join("reset.aof");
826
827        {
828            let mut writer = AofWriter::open(&path).unwrap();
829            writer
830                .write_record(&AofRecord::Set {
831                    key: "old".into(),
832                    value: Bytes::from("data"),
833                    expire_ms: -1,
834                })
835                .unwrap();
836            writer.truncate().unwrap();
837
838            // write a new record after truncation
839            writer
840                .write_record(&AofRecord::Set {
841                    key: "new".into(),
842                    value: Bytes::from("fresh"),
843                    expire_ms: -1,
844                })
845                .unwrap();
846            writer.sync().unwrap();
847        }
848
849        let mut reader = AofReader::open(&path).unwrap();
850        let rec = reader.read_record().unwrap().unwrap();
851        match rec {
852            AofRecord::Set { key, .. } => assert_eq!(key, "new"),
853            other => panic!("expected Set, got {other:?}"),
854        }
855        // only one record after truncation
856        assert!(reader.read_record().unwrap().is_none());
857    }
858
859    #[test]
860    fn record_round_trip_lpush() {
861        let rec = AofRecord::LPush {
862            key: "list".into(),
863            values: vec![Bytes::from("a"), Bytes::from("b")],
864        };
865        let bytes = rec.to_bytes().unwrap();
866        let decoded = AofRecord::from_bytes(&bytes).unwrap();
867        assert_eq!(rec, decoded);
868    }
869
870    #[test]
871    fn record_round_trip_rpush() {
872        let rec = AofRecord::RPush {
873            key: "list".into(),
874            values: vec![Bytes::from("x")],
875        };
876        let bytes = rec.to_bytes().unwrap();
877        let decoded = AofRecord::from_bytes(&bytes).unwrap();
878        assert_eq!(rec, decoded);
879    }
880
881    #[test]
882    fn record_round_trip_lpop() {
883        let rec = AofRecord::LPop { key: "list".into() };
884        let bytes = rec.to_bytes().unwrap();
885        let decoded = AofRecord::from_bytes(&bytes).unwrap();
886        assert_eq!(rec, decoded);
887    }
888
889    #[test]
890    fn record_round_trip_rpop() {
891        let rec = AofRecord::RPop { key: "list".into() };
892        let bytes = rec.to_bytes().unwrap();
893        let decoded = AofRecord::from_bytes(&bytes).unwrap();
894        assert_eq!(rec, decoded);
895    }
896
897    #[test]
898    fn writer_reader_round_trip_with_list_records() {
899        let dir = temp_dir();
900        let path = dir.path().join("list.aof");
901
902        let records = vec![
903            AofRecord::LPush {
904                key: "l".into(),
905                values: vec![Bytes::from("a"), Bytes::from("b")],
906            },
907            AofRecord::RPush {
908                key: "l".into(),
909                values: vec![Bytes::from("c")],
910            },
911            AofRecord::LPop { key: "l".into() },
912            AofRecord::RPop { key: "l".into() },
913        ];
914
915        {
916            let mut writer = AofWriter::open(&path).unwrap();
917            for rec in &records {
918                writer.write_record(rec).unwrap();
919            }
920            writer.sync().unwrap();
921        }
922
923        let mut reader = AofReader::open(&path).unwrap();
924        let mut got = Vec::new();
925        while let Some(rec) = reader.read_record().unwrap() {
926            got.push(rec);
927        }
928        assert_eq!(records, got);
929    }
930
931    #[test]
932    fn record_round_trip_zadd() {
933        let rec = AofRecord::ZAdd {
934            key: "board".into(),
935            members: vec![(100.0, "alice".into()), (200.5, "bob".into())],
936        };
937        let bytes = rec.to_bytes().unwrap();
938        let decoded = AofRecord::from_bytes(&bytes).unwrap();
939        assert_eq!(rec, decoded);
940    }
941
942    #[test]
943    fn record_round_trip_zrem() {
944        let rec = AofRecord::ZRem {
945            key: "board".into(),
946            members: vec!["alice".into(), "bob".into()],
947        };
948        let bytes = rec.to_bytes().unwrap();
949        let decoded = AofRecord::from_bytes(&bytes).unwrap();
950        assert_eq!(rec, decoded);
951    }
952
953    #[test]
954    fn writer_reader_round_trip_with_sorted_set_records() {
955        let dir = temp_dir();
956        let path = dir.path().join("zset.aof");
957
958        let records = vec![
959            AofRecord::ZAdd {
960                key: "board".into(),
961                members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
962            },
963            AofRecord::ZRem {
964                key: "board".into(),
965                members: vec!["alice".into()],
966            },
967        ];
968
969        {
970            let mut writer = AofWriter::open(&path).unwrap();
971            for rec in &records {
972                writer.write_record(rec).unwrap();
973            }
974            writer.sync().unwrap();
975        }
976
977        let mut reader = AofReader::open(&path).unwrap();
978        let mut got = Vec::new();
979        while let Some(rec) = reader.read_record().unwrap() {
980            got.push(rec);
981        }
982        assert_eq!(records, got);
983    }
984
985    #[test]
986    fn record_round_trip_persist() {
987        let rec = AofRecord::Persist {
988            key: "mykey".into(),
989        };
990        let bytes = rec.to_bytes().unwrap();
991        let decoded = AofRecord::from_bytes(&bytes).unwrap();
992        assert_eq!(rec, decoded);
993    }
994
995    #[test]
996    fn record_round_trip_pexpire() {
997        let rec = AofRecord::Pexpire {
998            key: "mykey".into(),
999            milliseconds: 5000,
1000        };
1001        let bytes = rec.to_bytes().unwrap();
1002        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1003        assert_eq!(rec, decoded);
1004    }
1005
1006    #[test]
1007    fn record_round_trip_incr() {
1008        let rec = AofRecord::Incr {
1009            key: "counter".into(),
1010        };
1011        let bytes = rec.to_bytes().unwrap();
1012        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1013        assert_eq!(rec, decoded);
1014    }
1015
1016    #[test]
1017    fn record_round_trip_decr() {
1018        let rec = AofRecord::Decr {
1019            key: "counter".into(),
1020        };
1021        let bytes = rec.to_bytes().unwrap();
1022        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1023        assert_eq!(rec, decoded);
1024    }
1025
1026    #[test]
1027    fn writer_reader_round_trip_with_persist_pexpire() {
1028        let dir = temp_dir();
1029        let path = dir.path().join("persist_pexpire.aof");
1030
1031        let records = vec![
1032            AofRecord::Set {
1033                key: "k".into(),
1034                value: Bytes::from("v"),
1035                expire_ms: 5000,
1036            },
1037            AofRecord::Persist { key: "k".into() },
1038            AofRecord::Pexpire {
1039                key: "k".into(),
1040                milliseconds: 3000,
1041            },
1042        ];
1043
1044        {
1045            let mut writer = AofWriter::open(&path).unwrap();
1046            for rec in &records {
1047                writer.write_record(rec).unwrap();
1048            }
1049            writer.sync().unwrap();
1050        }
1051
1052        let mut reader = AofReader::open(&path).unwrap();
1053        let mut got = Vec::new();
1054        while let Some(rec) = reader.read_record().unwrap() {
1055            got.push(rec);
1056        }
1057        assert_eq!(records, got);
1058    }
1059
1060    #[test]
1061    fn aof_path_format() {
1062        let p = aof_path(Path::new("/data"), 3);
1063        assert_eq!(p, PathBuf::from("/data/shard-3.aof"));
1064    }
1065
1066    #[test]
1067    fn record_round_trip_hset() {
1068        let rec = AofRecord::HSet {
1069            key: "hash".into(),
1070            fields: vec![
1071                ("f1".into(), Bytes::from("v1")),
1072                ("f2".into(), Bytes::from("v2")),
1073            ],
1074        };
1075        let bytes = rec.to_bytes().unwrap();
1076        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1077        assert_eq!(rec, decoded);
1078    }
1079
1080    #[test]
1081    fn record_round_trip_hdel() {
1082        let rec = AofRecord::HDel {
1083            key: "hash".into(),
1084            fields: vec!["f1".into(), "f2".into()],
1085        };
1086        let bytes = rec.to_bytes().unwrap();
1087        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1088        assert_eq!(rec, decoded);
1089    }
1090
1091    #[test]
1092    fn record_round_trip_hincrby() {
1093        let rec = AofRecord::HIncrBy {
1094            key: "hash".into(),
1095            field: "counter".into(),
1096            delta: -42,
1097        };
1098        let bytes = rec.to_bytes().unwrap();
1099        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1100        assert_eq!(rec, decoded);
1101    }
1102
1103    #[test]
1104    fn record_round_trip_sadd() {
1105        let rec = AofRecord::SAdd {
1106            key: "set".into(),
1107            members: vec!["m1".into(), "m2".into(), "m3".into()],
1108        };
1109        let bytes = rec.to_bytes().unwrap();
1110        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1111        assert_eq!(rec, decoded);
1112    }
1113
1114    #[test]
1115    fn record_round_trip_srem() {
1116        let rec = AofRecord::SRem {
1117            key: "set".into(),
1118            members: vec!["m1".into()],
1119        };
1120        let bytes = rec.to_bytes().unwrap();
1121        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1122        assert_eq!(rec, decoded);
1123    }
1124}