Skip to main content

ember_persistence/
aof.rs

//! Append-only file for recording mutations.
//!
//! Each shard writes its own AOF file (`shard-{id}.aof`). Records are
//! written after successful mutations. The binary format uses a simple
//! tag + payload + CRC32 structure for each record.
//!
//! File layout:
//! ```text
//! [EAOF magic: 4B][version: 1B]
//! [record]*
//! ```
//!
//! Record layout:
//! ```text
//! [tag: 1B][payload...][crc32: 4B]
//! ```
//! The CRC32 covers the tag + payload bytes.
18
19use std::fs::{self, File, OpenOptions};
20use std::io::{self, BufReader, BufWriter, Write};
21use std::path::{Path, PathBuf};
22
23use bytes::Bytes;
24
25use crate::format::{self, FormatError};
26
27/// Reads a length-prefixed field and decodes it as UTF-8.
28fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
29    let bytes = format::read_bytes(r)?;
30    String::from_utf8(bytes).map_err(|_| {
31        FormatError::Io(io::Error::new(
32            io::ErrorKind::InvalidData,
33            format!("{field} is not valid utf-8"),
34        ))
35    })
36}
37
// Record tags for the AOF format.
//
// The tag is the first byte of every serialized record and selects which
// payload layout follows. These values are written to disk, so renumbering
// one would make previously written files decode as a different record type.

/// Tag byte for `AofRecord::Set`.
const TAG_SET: u8 = 1;
/// Tag byte for `AofRecord::Del`.
const TAG_DEL: u8 = 2;
/// Tag byte for `AofRecord::Expire`.
const TAG_EXPIRE: u8 = 3;
/// Tag byte for `AofRecord::LPush`.
const TAG_LPUSH: u8 = 4;
/// Tag byte for `AofRecord::RPush`.
const TAG_RPUSH: u8 = 5;
/// Tag byte for `AofRecord::LPop`.
const TAG_LPOP: u8 = 6;
/// Tag byte for `AofRecord::RPop`.
const TAG_RPOP: u8 = 7;
/// Tag byte for `AofRecord::ZAdd`.
const TAG_ZADD: u8 = 8;
/// Tag byte for `AofRecord::ZRem`.
const TAG_ZREM: u8 = 9;
/// Tag byte for `AofRecord::Persist`.
const TAG_PERSIST: u8 = 10;
/// Tag byte for `AofRecord::Pexpire`.
const TAG_PEXPIRE: u8 = 11;
/// Tag byte for `AofRecord::Incr`.
const TAG_INCR: u8 = 12;
/// Tag byte for `AofRecord::Decr`.
const TAG_DECR: u8 = 13;
/// Tag byte for `AofRecord::HSet`.
const TAG_HSET: u8 = 14;
/// Tag byte for `AofRecord::HDel`.
const TAG_HDEL: u8 = 15;
/// Tag byte for `AofRecord::HIncrBy`.
const TAG_HINCRBY: u8 = 16;
/// Tag byte for `AofRecord::SAdd`.
const TAG_SADD: u8 = 17;
/// Tag byte for `AofRecord::SRem`.
const TAG_SREM: u8 = 18;
57
58/// A single mutation record stored in the AOF.
59#[derive(Debug, Clone, PartialEq)]
60pub enum AofRecord {
61    /// SET key value \[expire_ms\]. expire_ms is -1 for no expiration.
62    Set {
63        key: String,
64        value: Bytes,
65        expire_ms: i64,
66    },
67    /// DEL key.
68    Del { key: String },
69    /// EXPIRE key seconds.
70    Expire { key: String, seconds: u64 },
71    /// LPUSH key value [value ...].
72    LPush { key: String, values: Vec<Bytes> },
73    /// RPUSH key value [value ...].
74    RPush { key: String, values: Vec<Bytes> },
75    /// LPOP key.
76    LPop { key: String },
77    /// RPOP key.
78    RPop { key: String },
79    /// ZADD key score member [score member ...].
80    ZAdd {
81        key: String,
82        members: Vec<(f64, String)>,
83    },
84    /// ZREM key member [member ...].
85    ZRem { key: String, members: Vec<String> },
86    /// PERSIST key — remove expiration.
87    Persist { key: String },
88    /// PEXPIRE key milliseconds.
89    Pexpire { key: String, milliseconds: u64 },
90    /// INCR key.
91    Incr { key: String },
92    /// DECR key.
93    Decr { key: String },
94    /// HSET key field value [field value ...].
95    HSet {
96        key: String,
97        fields: Vec<(String, Bytes)>,
98    },
99    /// HDEL key field [field ...].
100    HDel { key: String, fields: Vec<String> },
101    /// HINCRBY key field delta.
102    HIncrBy {
103        key: String,
104        field: String,
105        delta: i64,
106    },
107    /// SADD key member [member ...].
108    SAdd { key: String, members: Vec<String> },
109    /// SREM key member [member ...].
110    SRem { key: String, members: Vec<String> },
111}
112
113impl AofRecord {
114    /// Serializes this record into a byte vector (tag + payload, no CRC).
115    fn to_bytes(&self) -> Vec<u8> {
116        let mut buf = Vec::new();
117        match self {
118            AofRecord::Set {
119                key,
120                value,
121                expire_ms,
122            } => {
123                format::write_u8(&mut buf, TAG_SET).expect("vec write");
124                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
125                format::write_bytes(&mut buf, value).expect("vec write");
126                format::write_i64(&mut buf, *expire_ms).expect("vec write");
127            }
128            AofRecord::Del { key } => {
129                format::write_u8(&mut buf, TAG_DEL).expect("vec write");
130                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
131            }
132            AofRecord::Expire { key, seconds } => {
133                format::write_u8(&mut buf, TAG_EXPIRE).expect("vec write");
134                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
135                format::write_i64(&mut buf, *seconds as i64).expect("vec write");
136            }
137            AofRecord::LPush { key, values } => {
138                format::write_u8(&mut buf, TAG_LPUSH).expect("vec write");
139                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
140                format::write_u32(&mut buf, values.len() as u32).expect("vec write");
141                for v in values {
142                    format::write_bytes(&mut buf, v).expect("vec write");
143                }
144            }
145            AofRecord::RPush { key, values } => {
146                format::write_u8(&mut buf, TAG_RPUSH).expect("vec write");
147                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
148                format::write_u32(&mut buf, values.len() as u32).expect("vec write");
149                for v in values {
150                    format::write_bytes(&mut buf, v).expect("vec write");
151                }
152            }
153            AofRecord::LPop { key } => {
154                format::write_u8(&mut buf, TAG_LPOP).expect("vec write");
155                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
156            }
157            AofRecord::RPop { key } => {
158                format::write_u8(&mut buf, TAG_RPOP).expect("vec write");
159                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
160            }
161            AofRecord::ZAdd { key, members } => {
162                format::write_u8(&mut buf, TAG_ZADD).expect("vec write");
163                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
164                format::write_u32(&mut buf, members.len() as u32).expect("vec write");
165                for (score, member) in members {
166                    format::write_f64(&mut buf, *score).expect("vec write");
167                    format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
168                }
169            }
170            AofRecord::ZRem { key, members } => {
171                format::write_u8(&mut buf, TAG_ZREM).expect("vec write");
172                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
173                format::write_u32(&mut buf, members.len() as u32).expect("vec write");
174                for member in members {
175                    format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
176                }
177            }
178            AofRecord::Persist { key } => {
179                format::write_u8(&mut buf, TAG_PERSIST).expect("vec write");
180                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
181            }
182            AofRecord::Pexpire { key, milliseconds } => {
183                format::write_u8(&mut buf, TAG_PEXPIRE).expect("vec write");
184                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
185                format::write_i64(&mut buf, *milliseconds as i64).expect("vec write");
186            }
187            AofRecord::Incr { key } => {
188                format::write_u8(&mut buf, TAG_INCR).expect("vec write");
189                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
190            }
191            AofRecord::Decr { key } => {
192                format::write_u8(&mut buf, TAG_DECR).expect("vec write");
193                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
194            }
195            AofRecord::HSet { key, fields } => {
196                format::write_u8(&mut buf, TAG_HSET).expect("vec write");
197                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
198                format::write_u32(&mut buf, fields.len() as u32).expect("vec write");
199                for (field, value) in fields {
200                    format::write_bytes(&mut buf, field.as_bytes()).expect("vec write");
201                    format::write_bytes(&mut buf, value).expect("vec write");
202                }
203            }
204            AofRecord::HDel { key, fields } => {
205                format::write_u8(&mut buf, TAG_HDEL).expect("vec write");
206                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
207                format::write_u32(&mut buf, fields.len() as u32).expect("vec write");
208                for field in fields {
209                    format::write_bytes(&mut buf, field.as_bytes()).expect("vec write");
210                }
211            }
212            AofRecord::HIncrBy { key, field, delta } => {
213                format::write_u8(&mut buf, TAG_HINCRBY).expect("vec write");
214                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
215                format::write_bytes(&mut buf, field.as_bytes()).expect("vec write");
216                format::write_i64(&mut buf, *delta).expect("vec write");
217            }
218            AofRecord::SAdd { key, members } => {
219                format::write_u8(&mut buf, TAG_SADD).expect("vec write");
220                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
221                format::write_u32(&mut buf, members.len() as u32).expect("vec write");
222                for member in members {
223                    format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
224                }
225            }
226            AofRecord::SRem { key, members } => {
227                format::write_u8(&mut buf, TAG_SREM).expect("vec write");
228                format::write_bytes(&mut buf, key.as_bytes()).expect("vec write");
229                format::write_u32(&mut buf, members.len() as u32).expect("vec write");
230                for member in members {
231                    format::write_bytes(&mut buf, member.as_bytes()).expect("vec write");
232                }
233            }
234        }
235        buf
236    }
237
238    /// Deserializes a record from a byte slice (tag + payload, no CRC).
239    fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
240        let mut cursor = io::Cursor::new(data);
241        let tag = format::read_u8(&mut cursor)?;
242        match tag {
243            TAG_SET => {
244                let key = read_string(&mut cursor, "key")?;
245                let value = format::read_bytes(&mut cursor)?;
246                let expire_ms = format::read_i64(&mut cursor)?;
247                Ok(AofRecord::Set {
248                    key,
249                    value: Bytes::from(value),
250                    expire_ms,
251                })
252            }
253            TAG_DEL => {
254                let key = read_string(&mut cursor, "key")?;
255                Ok(AofRecord::Del { key })
256            }
257            TAG_EXPIRE => {
258                let key = read_string(&mut cursor, "key")?;
259                let seconds = format::read_i64(&mut cursor)? as u64;
260                Ok(AofRecord::Expire { key, seconds })
261            }
262            TAG_LPUSH | TAG_RPUSH => {
263                let key = read_string(&mut cursor, "key")?;
264                let count = format::read_u32(&mut cursor)?;
265                let mut values = Vec::with_capacity(count as usize);
266                for _ in 0..count {
267                    values.push(Bytes::from(format::read_bytes(&mut cursor)?));
268                }
269                if tag == TAG_LPUSH {
270                    Ok(AofRecord::LPush { key, values })
271                } else {
272                    Ok(AofRecord::RPush { key, values })
273                }
274            }
275            TAG_LPOP => {
276                let key = read_string(&mut cursor, "key")?;
277                Ok(AofRecord::LPop { key })
278            }
279            TAG_RPOP => {
280                let key = read_string(&mut cursor, "key")?;
281                Ok(AofRecord::RPop { key })
282            }
283            TAG_ZADD => {
284                let key = read_string(&mut cursor, "key")?;
285                let count = format::read_u32(&mut cursor)?;
286                let mut members = Vec::with_capacity(count as usize);
287                for _ in 0..count {
288                    let score = format::read_f64(&mut cursor)?;
289                    let member = read_string(&mut cursor, "member")?;
290                    members.push((score, member));
291                }
292                Ok(AofRecord::ZAdd { key, members })
293            }
294            TAG_ZREM => {
295                let key = read_string(&mut cursor, "key")?;
296                let count = format::read_u32(&mut cursor)?;
297                let mut members = Vec::with_capacity(count as usize);
298                for _ in 0..count {
299                    members.push(read_string(&mut cursor, "member")?);
300                }
301                Ok(AofRecord::ZRem { key, members })
302            }
303            TAG_PERSIST => {
304                let key = read_string(&mut cursor, "key")?;
305                Ok(AofRecord::Persist { key })
306            }
307            TAG_PEXPIRE => {
308                let key = read_string(&mut cursor, "key")?;
309                let milliseconds = format::read_i64(&mut cursor)? as u64;
310                Ok(AofRecord::Pexpire { key, milliseconds })
311            }
312            TAG_INCR => {
313                let key = read_string(&mut cursor, "key")?;
314                Ok(AofRecord::Incr { key })
315            }
316            TAG_DECR => {
317                let key = read_string(&mut cursor, "key")?;
318                Ok(AofRecord::Decr { key })
319            }
320            TAG_HSET => {
321                let key = read_string(&mut cursor, "key")?;
322                let count = format::read_u32(&mut cursor)?;
323                let mut fields = Vec::with_capacity(count as usize);
324                for _ in 0..count {
325                    let field = read_string(&mut cursor, "field")?;
326                    let value = Bytes::from(format::read_bytes(&mut cursor)?);
327                    fields.push((field, value));
328                }
329                Ok(AofRecord::HSet { key, fields })
330            }
331            TAG_HDEL => {
332                let key = read_string(&mut cursor, "key")?;
333                let count = format::read_u32(&mut cursor)?;
334                let mut fields = Vec::with_capacity(count as usize);
335                for _ in 0..count {
336                    fields.push(read_string(&mut cursor, "field")?);
337                }
338                Ok(AofRecord::HDel { key, fields })
339            }
340            TAG_HINCRBY => {
341                let key = read_string(&mut cursor, "key")?;
342                let field = read_string(&mut cursor, "field")?;
343                let delta = format::read_i64(&mut cursor)?;
344                Ok(AofRecord::HIncrBy { key, field, delta })
345            }
346            TAG_SADD => {
347                let key = read_string(&mut cursor, "key")?;
348                let count = format::read_u32(&mut cursor)?;
349                let mut members = Vec::with_capacity(count as usize);
350                for _ in 0..count {
351                    members.push(read_string(&mut cursor, "member")?);
352                }
353                Ok(AofRecord::SAdd { key, members })
354            }
355            TAG_SREM => {
356                let key = read_string(&mut cursor, "key")?;
357                let count = format::read_u32(&mut cursor)?;
358                let mut members = Vec::with_capacity(count as usize);
359                for _ in 0..count {
360                    members.push(read_string(&mut cursor, "member")?);
361                }
362                Ok(AofRecord::SRem { key, members })
363            }
364            _ => Err(FormatError::UnknownTag(tag)),
365        }
366    }
367}
368
/// Configurable fsync policy for the AOF writer.
///
/// This type only names the policy. `AofWriter` as shown in this file does
/// not store or read it; presumably the caller decides when to call
/// `AofWriter::flush` or `AofWriter::sync` based on it (confirm against the
/// shard code).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum FsyncPolicy {
    /// fsync after every write. Safest, slowest.
    Always,
    /// fsync once per second; the shard tick drives this. This is the
    /// `Default` variant.
    #[default]
    EverySec,
    /// Let the OS decide when to flush. Fastest, least durable.
    No,
}
380
381/// Buffered writer for appending AOF records to a file.
382pub struct AofWriter {
383    writer: BufWriter<File>,
384    path: PathBuf,
385}
386
387impl AofWriter {
388    /// Opens (or creates) an AOF file. If the file is new, writes the header.
389    /// If the file already exists, appends to it.
390    pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
391        let path = path.into();
392        let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
393
394        let file = OpenOptions::new().create(true).append(true).open(&path)?;
395        let mut writer = BufWriter::new(file);
396
397        if !exists {
398            format::write_header(&mut writer, format::AOF_MAGIC)?;
399            writer.flush()?;
400        }
401
402        Ok(Self { writer, path })
403    }
404
405    /// Appends a record to the AOF. Writes tag+payload+crc32.
406    pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
407        let payload = record.to_bytes();
408        let checksum = format::crc32(&payload);
409        self.writer.write_all(&payload)?;
410        format::write_u32(&mut self.writer, checksum)?;
411        Ok(())
412    }
413
414    /// Flushes the internal buffer to the OS.
415    pub fn flush(&mut self) -> Result<(), FormatError> {
416        self.writer.flush()?;
417        Ok(())
418    }
419
420    /// Flushes and fsyncs the file to disk.
421    pub fn sync(&mut self) -> Result<(), FormatError> {
422        self.writer.flush()?;
423        self.writer.get_ref().sync_all()?;
424        Ok(())
425    }
426
427    /// Returns the file path.
428    pub fn path(&self) -> &Path {
429        &self.path
430    }
431
432    /// Truncates the AOF file back to just the header.
433    /// Used after a successful snapshot to reset the log.
434    pub fn truncate(&mut self) -> Result<(), FormatError> {
435        // flush and drop the old writer
436        self.writer.flush()?;
437
438        // reopen the file with truncation, write fresh header
439        let file = OpenOptions::new()
440            .create(true)
441            .write(true)
442            .truncate(true)
443            .open(&self.path)?;
444        let mut writer = BufWriter::new(file);
445        format::write_header(&mut writer, format::AOF_MAGIC)?;
446        writer.flush()?;
447        // ensure the fresh header is durable before we start appending
448        writer.get_ref().sync_all()?;
449        self.writer = writer;
450        Ok(())
451    }
452}
453
454/// Reader for iterating over AOF records.
455#[derive(Debug)]
456pub struct AofReader {
457    reader: BufReader<File>,
458}
459
460impl AofReader {
461    /// Opens an AOF file and validates the header.
462    pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
463        let file = File::open(path.as_ref())?;
464        let mut reader = BufReader::new(file);
465        let _version = format::read_header(&mut reader, format::AOF_MAGIC)?;
466        Ok(Self { reader })
467    }
468
469    /// Reads the next record from the AOF.
470    ///
471    /// Returns `Ok(None)` at end-of-file. On a truncated record (the
472    /// server crashed mid-write), returns `Ok(None)` rather than an error
473    /// — this is the expected recovery behavior.
474    pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
475        // peek for EOF — try reading the tag byte
476        let tag = match format::read_u8(&mut self.reader) {
477            Ok(t) => t,
478            Err(FormatError::UnexpectedEof) => return Ok(None),
479            Err(e) => return Err(e),
480        };
481
482        // read the rest of the payload based on tag, building the full
483        // record bytes for CRC verification
484        let record_result = self.read_payload_for_tag(tag);
485        match record_result {
486            Ok((payload, stored_crc)) => {
487                // prepend the tag to the payload for CRC check
488                let mut full = Vec::with_capacity(1 + payload.len());
489                full.push(tag);
490                full.extend_from_slice(&payload);
491                format::verify_crc32(&full, stored_crc)?;
492                AofRecord::from_bytes(&full).map(Some)
493            }
494            // truncated record — treat as end of usable data
495            Err(FormatError::UnexpectedEof) => Ok(None),
496            Err(e) => Err(e),
497        }
498    }
499
500    /// Reads the remaining payload bytes (after the tag) and the trailing CRC.
501    fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
502        let mut payload = Vec::new();
503        match tag {
504            TAG_SET => {
505                // key_len + key + value_len + value + expire_ms
506                let key = format::read_bytes(&mut self.reader)?;
507                format::write_bytes(&mut payload, &key).expect("vec write");
508                let value = format::read_bytes(&mut self.reader)?;
509                format::write_bytes(&mut payload, &value).expect("vec write");
510                let expire_ms = format::read_i64(&mut self.reader)?;
511                format::write_i64(&mut payload, expire_ms).expect("vec write");
512            }
513            TAG_DEL => {
514                let key = format::read_bytes(&mut self.reader)?;
515                format::write_bytes(&mut payload, &key).expect("vec write");
516            }
517            TAG_EXPIRE => {
518                let key = format::read_bytes(&mut self.reader)?;
519                format::write_bytes(&mut payload, &key).expect("vec write");
520                let seconds = format::read_i64(&mut self.reader)?;
521                format::write_i64(&mut payload, seconds).expect("vec write");
522            }
523            TAG_LPUSH | TAG_RPUSH => {
524                let key = format::read_bytes(&mut self.reader)?;
525                format::write_bytes(&mut payload, &key).expect("vec write");
526                let count = format::read_u32(&mut self.reader)?;
527                format::write_u32(&mut payload, count).expect("vec write");
528                for _ in 0..count {
529                    let val = format::read_bytes(&mut self.reader)?;
530                    format::write_bytes(&mut payload, &val).expect("vec write");
531                }
532            }
533            TAG_LPOP | TAG_RPOP => {
534                let key = format::read_bytes(&mut self.reader)?;
535                format::write_bytes(&mut payload, &key).expect("vec write");
536            }
537            TAG_ZADD => {
538                let key = format::read_bytes(&mut self.reader)?;
539                format::write_bytes(&mut payload, &key).expect("vec write");
540                let count = format::read_u32(&mut self.reader)?;
541                format::write_u32(&mut payload, count).expect("vec write");
542                for _ in 0..count {
543                    let score = format::read_f64(&mut self.reader)?;
544                    format::write_f64(&mut payload, score).expect("vec write");
545                    let member = format::read_bytes(&mut self.reader)?;
546                    format::write_bytes(&mut payload, &member).expect("vec write");
547                }
548            }
549            TAG_ZREM => {
550                let key = format::read_bytes(&mut self.reader)?;
551                format::write_bytes(&mut payload, &key).expect("vec write");
552                let count = format::read_u32(&mut self.reader)?;
553                format::write_u32(&mut payload, count).expect("vec write");
554                for _ in 0..count {
555                    let member = format::read_bytes(&mut self.reader)?;
556                    format::write_bytes(&mut payload, &member).expect("vec write");
557                }
558            }
559            TAG_PERSIST => {
560                let key = format::read_bytes(&mut self.reader)?;
561                format::write_bytes(&mut payload, &key).expect("vec write");
562            }
563            TAG_PEXPIRE => {
564                let key = format::read_bytes(&mut self.reader)?;
565                format::write_bytes(&mut payload, &key).expect("vec write");
566                let millis = format::read_i64(&mut self.reader)?;
567                format::write_i64(&mut payload, millis).expect("vec write");
568            }
569            TAG_INCR | TAG_DECR => {
570                let key = format::read_bytes(&mut self.reader)?;
571                format::write_bytes(&mut payload, &key).expect("vec write");
572            }
573            _ => return Err(FormatError::UnknownTag(tag)),
574        }
575        let stored_crc = format::read_u32(&mut self.reader)?;
576        Ok((payload, stored_crc))
577    }
578}
579
/// Builds the path of a shard's AOF file: `shard-{shard_id}.aof` inside
/// `data_dir`.
///
/// Nothing is created or checked on disk; this only assembles the path.
pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
    let file_name = format!("shard-{shard_id}.aof");
    data_dir.join(file_name)
}
584
585#[cfg(test)]
586mod tests {
587    use super::*;
588    fn temp_dir() -> tempfile::TempDir {
589        tempfile::tempdir().expect("create temp dir")
590    }
591
592    #[test]
593    fn record_round_trip_set() {
594        let rec = AofRecord::Set {
595            key: "hello".into(),
596            value: Bytes::from("world"),
597            expire_ms: 5000,
598        };
599        let bytes = rec.to_bytes();
600        let decoded = AofRecord::from_bytes(&bytes).unwrap();
601        assert_eq!(rec, decoded);
602    }
603
604    #[test]
605    fn record_round_trip_del() {
606        let rec = AofRecord::Del { key: "gone".into() };
607        let bytes = rec.to_bytes();
608        let decoded = AofRecord::from_bytes(&bytes).unwrap();
609        assert_eq!(rec, decoded);
610    }
611
612    #[test]
613    fn record_round_trip_expire() {
614        let rec = AofRecord::Expire {
615            key: "ttl".into(),
616            seconds: 300,
617        };
618        let bytes = rec.to_bytes();
619        let decoded = AofRecord::from_bytes(&bytes).unwrap();
620        assert_eq!(rec, decoded);
621    }
622
623    #[test]
624    fn set_with_no_expiry() {
625        let rec = AofRecord::Set {
626            key: "k".into(),
627            value: Bytes::from("v"),
628            expire_ms: -1,
629        };
630        let bytes = rec.to_bytes();
631        let decoded = AofRecord::from_bytes(&bytes).unwrap();
632        assert_eq!(rec, decoded);
633    }
634
635    #[test]
636    fn writer_reader_round_trip() {
637        let dir = temp_dir();
638        let path = dir.path().join("test.aof");
639
640        let records = vec![
641            AofRecord::Set {
642                key: "a".into(),
643                value: Bytes::from("1"),
644                expire_ms: -1,
645            },
646            AofRecord::Set {
647                key: "b".into(),
648                value: Bytes::from("2"),
649                expire_ms: 10_000,
650            },
651            AofRecord::Del { key: "a".into() },
652            AofRecord::Expire {
653                key: "b".into(),
654                seconds: 60,
655            },
656        ];
657
658        // write
659        {
660            let mut writer = AofWriter::open(&path).unwrap();
661            for rec in &records {
662                writer.write_record(rec).unwrap();
663            }
664            writer.sync().unwrap();
665        }
666
667        // read back
668        let mut reader = AofReader::open(&path).unwrap();
669        let mut got = Vec::new();
670        while let Some(rec) = reader.read_record().unwrap() {
671            got.push(rec);
672        }
673        assert_eq!(records, got);
674    }
675
676    #[test]
677    fn empty_aof_returns_no_records() {
678        let dir = temp_dir();
679        let path = dir.path().join("empty.aof");
680
681        // just write the header
682        {
683            let _writer = AofWriter::open(&path).unwrap();
684        }
685
686        let mut reader = AofReader::open(&path).unwrap();
687        assert!(reader.read_record().unwrap().is_none());
688    }
689
690    #[test]
691    fn truncated_record_treated_as_eof() {
692        let dir = temp_dir();
693        let path = dir.path().join("trunc.aof");
694
695        // write one good record, then append garbage (simulating a crash)
696        {
697            let mut writer = AofWriter::open(&path).unwrap();
698            writer
699                .write_record(&AofRecord::Set {
700                    key: "ok".into(),
701                    value: Bytes::from("good"),
702                    expire_ms: -1,
703                })
704                .unwrap();
705            writer.flush().unwrap();
706        }
707
708        // append a partial tag with no payload
709        {
710            let mut file = OpenOptions::new().append(true).open(&path).unwrap();
711            file.write_all(&[TAG_SET]).unwrap();
712        }
713
714        let mut reader = AofReader::open(&path).unwrap();
715        // first record should be fine
716        let rec = reader.read_record().unwrap().unwrap();
717        assert!(matches!(rec, AofRecord::Set { .. }));
718        // second should be None (truncated)
719        assert!(reader.read_record().unwrap().is_none());
720    }
721
722    #[test]
723    fn corrupt_crc_detected() {
724        let dir = temp_dir();
725        let path = dir.path().join("corrupt.aof");
726
727        {
728            let mut writer = AofWriter::open(&path).unwrap();
729            writer
730                .write_record(&AofRecord::Set {
731                    key: "k".into(),
732                    value: Bytes::from("v"),
733                    expire_ms: -1,
734                })
735                .unwrap();
736            writer.flush().unwrap();
737        }
738
739        // corrupt the last byte (part of the CRC)
740        let mut data = fs::read(&path).unwrap();
741        let last = data.len() - 1;
742        data[last] ^= 0xFF;
743        fs::write(&path, &data).unwrap();
744
745        let mut reader = AofReader::open(&path).unwrap();
746        let err = reader.read_record().unwrap_err();
747        assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
748    }
749
750    #[test]
751    fn missing_magic_is_error() {
752        let dir = temp_dir();
753        let path = dir.path().join("bad.aof");
754        fs::write(&path, b"NOT_AOF_DATA").unwrap();
755
756        let err = AofReader::open(&path).unwrap_err();
757        assert!(matches!(err, FormatError::InvalidMagic));
758    }
759
/// After `truncate`, only records written afterwards are visible to a reader:
/// the "old" record is gone and exactly one record ("new") remains.
#[test]
fn truncate_resets_aof() {
    let tmp = temp_dir();
    let aof_file = tmp.path().join("reset.aof");

    // Both records are non-expiring SETs that differ only in key and value.
    let set_record = |key: &str, value: &'static str| AofRecord::Set {
        key: key.into(),
        value: Bytes::from(value),
        expire_ms: -1,
    };

    let mut out = AofWriter::open(&aof_file).unwrap();
    out.write_record(&set_record("old", "data")).unwrap();
    out.truncate().unwrap();
    // This record is written after the truncation.
    out.write_record(&set_record("new", "fresh")).unwrap();
    out.sync().unwrap();
    drop(out);

    let mut reader = AofReader::open(&aof_file).unwrap();
    match reader.read_record().unwrap().unwrap() {
        AofRecord::Set { key, .. } => assert_eq!(key, "new"),
        unexpected => panic!("expected Set, got {other:?}", other = unexpected),
    }
    // Nothing else may follow the post-truncation record.
    assert!(reader.read_record().unwrap().is_none());
}
796
/// An `LPush` record carrying two values is unchanged after `to_bytes`
/// followed by `from_bytes`.
#[test]
fn record_round_trip_lpush() {
    let values = vec![Bytes::from("a"), Bytes::from("b")];
    let original = AofRecord::LPush {
        key: "list".into(),
        values,
    };

    let restored = AofRecord::from_bytes(&original.to_bytes()).unwrap();
    assert_eq!(original, restored);
}
807
/// An `RPush` record carrying a single value is unchanged after `to_bytes`
/// followed by `from_bytes`.
#[test]
fn record_round_trip_rpush() {
    let values = vec![Bytes::from("x")];
    let original = AofRecord::RPush {
        key: "list".into(),
        values,
    };

    let encoded = original.to_bytes();
    let restored = AofRecord::from_bytes(&encoded).unwrap();
    assert_eq!(original, restored);
}
818
/// An `LPop` record is unchanged after `to_bytes` followed by `from_bytes`.
#[test]
fn record_round_trip_lpop() {
    let original = AofRecord::LPop { key: "list".into() };

    let restored = AofRecord::from_bytes(&original.to_bytes()).unwrap();
    assert_eq!(original, restored);
}
826
/// An `RPop` record is unchanged after `to_bytes` followed by `from_bytes`.
#[test]
fn record_round_trip_rpop() {
    let original = AofRecord::RPop { key: "list".into() };

    let encoded = original.to_bytes();
    let restored = AofRecord::from_bytes(&encoded).unwrap();
    assert_eq!(original, restored);
}
834
/// List records (`LPush`, `RPush`, `LPop`, `RPop`) written through an
/// `AofWriter` are read back by an `AofReader` in the same order and with
/// identical contents.
#[test]
fn writer_reader_round_trip_with_list_records() {
    let tmp = temp_dir();
    let aof_file = tmp.path().join("list.aof");

    let expected = vec![
        AofRecord::LPush {
            key: "l".into(),
            values: vec![Bytes::from("a"), Bytes::from("b")],
        },
        AofRecord::RPush {
            key: "l".into(),
            values: vec![Bytes::from("c")],
        },
        AofRecord::LPop { key: "l".into() },
        AofRecord::RPop { key: "l".into() },
    ];

    // Write everything, sync, and close the writer before reading.
    let mut out = AofWriter::open(&aof_file).unwrap();
    expected.iter().for_each(|entry| {
        out.write_record(entry).unwrap();
    });
    out.sync().unwrap();
    drop(out);

    // Pull records until the reader reports the end of the file.
    let mut reader = AofReader::open(&aof_file).unwrap();
    let replayed: Vec<AofRecord> =
        std::iter::from_fn(|| reader.read_record().unwrap()).collect();
    assert_eq!(expected, replayed);
}
868
/// A `ZAdd` record with two scored members (one with a fractional score) is
/// unchanged after `to_bytes` followed by `from_bytes`.
#[test]
fn record_round_trip_zadd() {
    let members = vec![(100.0, "alice".into()), (200.5, "bob".into())];
    let original = AofRecord::ZAdd {
        key: "board".into(),
        members,
    };

    let restored = AofRecord::from_bytes(&original.to_bytes()).unwrap();
    assert_eq!(original, restored);
}
879
/// A `ZRem` record naming two members is unchanged after `to_bytes` followed
/// by `from_bytes`.
#[test]
fn record_round_trip_zrem() {
    let members = vec!["alice".into(), "bob".into()];
    let original = AofRecord::ZRem {
        key: "board".into(),
        members,
    };

    let encoded = original.to_bytes();
    let restored = AofRecord::from_bytes(&encoded).unwrap();
    assert_eq!(original, restored);
}
890
/// Sorted-set records (`ZAdd`, `ZRem`) written through an `AofWriter` are
/// read back by an `AofReader` in the same order and with identical contents.
#[test]
fn writer_reader_round_trip_with_sorted_set_records() {
    let tmp = temp_dir();
    let aof_file = tmp.path().join("zset.aof");

    let add = AofRecord::ZAdd {
        key: "board".into(),
        members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
    };
    let remove = AofRecord::ZRem {
        key: "board".into(),
        members: vec!["alice".into()],
    };
    let expected = vec![add, remove];

    // Write everything, sync, and close the writer before reading.
    let mut out = AofWriter::open(&aof_file).unwrap();
    for entry in expected.iter() {
        out.write_record(entry).unwrap();
    }
    out.sync().unwrap();
    drop(out);

    // Pull records until the reader reports the end of the file.
    let mut reader = AofReader::open(&aof_file).unwrap();
    let replayed: Vec<_> = std::iter::from_fn(|| reader.read_record().unwrap()).collect();
    assert_eq!(expected, replayed);
}
922
/// A `Persist` record is unchanged after `to_bytes` followed by `from_bytes`.
#[test]
fn record_round_trip_persist() {
    let key = "mykey".into();
    let original = AofRecord::Persist { key };

    let restored = AofRecord::from_bytes(&original.to_bytes()).unwrap();
    assert_eq!(original, restored);
}
932
/// A `Pexpire` record with a 5000 ms timeout is unchanged after `to_bytes`
/// followed by `from_bytes`.
#[test]
fn record_round_trip_pexpire() {
    let key = "mykey".into();
    let original = AofRecord::Pexpire {
        key,
        milliseconds: 5000,
    };

    let encoded = original.to_bytes();
    let restored = AofRecord::from_bytes(&encoded).unwrap();
    assert_eq!(original, restored);
}
943
/// An `Incr` record is unchanged after `to_bytes` followed by `from_bytes`.
#[test]
fn record_round_trip_incr() {
    let key = "counter".into();
    let original = AofRecord::Incr { key };

    let restored = AofRecord::from_bytes(&original.to_bytes()).unwrap();
    assert_eq!(original, restored);
}
953
/// A `Decr` record is unchanged after `to_bytes` followed by `from_bytes`.
#[test]
fn record_round_trip_decr() {
    let key = "counter".into();
    let original = AofRecord::Decr { key };

    let encoded = original.to_bytes();
    let restored = AofRecord::from_bytes(&encoded).unwrap();
    assert_eq!(original, restored);
}
963
/// A `Set` with an expiry followed by `Persist` and `Pexpire` on the same key
/// is read back from disk in the same order and with identical contents.
#[test]
fn writer_reader_round_trip_with_persist_pexpire() {
    let tmp = temp_dir();
    let aof_file = tmp.path().join("persist_pexpire.aof");

    let expected = vec![
        AofRecord::Set {
            key: String::from("k"),
            value: Bytes::from("v"),
            expire_ms: 5000,
        },
        AofRecord::Persist { key: "k".into() },
        AofRecord::Pexpire {
            key: "k".into(),
            milliseconds: 3000,
        },
    ];

    // Write everything, sync, and close the writer before reading.
    let mut out = AofWriter::open(&aof_file).unwrap();
    expected.iter().for_each(|entry| {
        out.write_record(entry).unwrap();
    });
    out.sync().unwrap();
    drop(out);

    // Pull records until the reader reports the end of the file.
    let mut reader = AofReader::open(&aof_file).unwrap();
    let mut replayed = Vec::new();
    replayed.extend(std::iter::from_fn(|| reader.read_record().unwrap()));
    assert_eq!(expected, replayed);
}
997
/// `aof_path` for shard 3 under `/data` resolves to `/data/shard-3.aof`.
#[test]
fn aof_path_format() {
    let data_dir = Path::new("/data");
    let expected = PathBuf::from("/data/shard-3.aof");

    let actual = aof_path(data_dir, 3);
    assert_eq!(actual, expected);
}
1003
/// An `HSet` record with two field/value pairs is unchanged after `to_bytes`
/// followed by `from_bytes`.
#[test]
fn record_round_trip_hset() {
    let first = ("f1".into(), Bytes::from("v1"));
    let second = ("f2".into(), Bytes::from("v2"));
    let original = AofRecord::HSet {
        key: "hash".into(),
        fields: vec![first, second],
    };

    let restored = AofRecord::from_bytes(&original.to_bytes()).unwrap();
    assert_eq!(original, restored);
}
1017
/// An `HDel` record naming two fields is unchanged after `to_bytes` followed
/// by `from_bytes`.
#[test]
fn record_round_trip_hdel() {
    let fields = ["f1", "f2"].iter().map(|&name| name.into()).collect();
    let original = AofRecord::HDel {
        key: "hash".into(),
        fields,
    };

    let encoded = original.to_bytes();
    let restored = AofRecord::from_bytes(&encoded).unwrap();
    assert_eq!(original, restored);
}
1028
/// An `HIncrBy` record with a negative delta is unchanged after `to_bytes`
/// followed by `from_bytes`.
#[test]
fn record_round_trip_hincrby() {
    let key = "hash".into();
    let field = "counter".into();
    let original = AofRecord::HIncrBy {
        key,
        field,
        delta: -42,
    };

    let restored = AofRecord::from_bytes(&original.to_bytes()).unwrap();
    assert_eq!(original, restored);
}
1040
/// An `SAdd` record with three members is unchanged after `to_bytes` followed
/// by `from_bytes`.
#[test]
fn record_round_trip_sadd() {
    let members = ["m1", "m2", "m3"]
        .iter()
        .map(|&member| member.into())
        .collect();
    let original = AofRecord::SAdd {
        key: "set".into(),
        members,
    };

    let restored = AofRecord::from_bytes(&original.to_bytes()).unwrap();
    assert_eq!(original, restored);
}
1051
/// An `SRem` record with a single member is unchanged after `to_bytes`
/// followed by `from_bytes`.
#[test]
fn record_round_trip_srem() {
    let members = vec!["m1".into()];
    let original = AofRecord::SRem {
        key: "set".into(),
        members,
    };

    let encoded = original.to_bytes();
    let restored = AofRecord::from_bytes(&encoded).unwrap();
    assert_eq!(original, restored);
}
1062}