Skip to main content

ember_persistence/
aof.rs

1//! Append-only file for recording mutations.
2//!
3//! Each shard writes its own AOF file (`shard-{id}.aof`). Records are
4//! written after successful mutations. The binary format uses a simple
5//! tag + payload + CRC32 structure for each record.
6//!
7//! File layout:
8//! ```text
9//! [EAOF magic: 4B][version: 1B]
10//! [record]*
11//! ```
12//!
13//! Record layout:
14//! ```text
15//! [tag: 1B][payload...][crc32: 4B]
16//! ```
17//! The CRC32 covers the tag + payload bytes.
18
19use std::fs::{self, File, OpenOptions};
20use std::io::{self, BufReader, BufWriter, Write};
21use std::path::{Path, PathBuf};
22
23use bytes::Bytes;
24
25use crate::format::{self, FormatError};
26
27/// Reads a length-prefixed field and decodes it as UTF-8.
28fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
29    let bytes = format::read_bytes(r)?;
30    String::from_utf8(bytes).map_err(|_| {
31        FormatError::Io(io::Error::new(
32            io::ErrorKind::InvalidData,
33            format!("{field} is not valid utf-8"),
34        ))
35    })
36}
37
// Record tags for the AOF format.
//
// The tag is the first byte of every serialized record and selects which
// `AofRecord` variant the payload decodes to. These values are written to
// the AOF file, so renumbering one would make previously written records
// decode as a different variant.

/// Tag byte for [`AofRecord::Set`].
const TAG_SET: u8 = 1;
/// Tag byte for [`AofRecord::Del`].
const TAG_DEL: u8 = 2;
/// Tag byte for [`AofRecord::Expire`].
const TAG_EXPIRE: u8 = 3;
/// Tag byte for [`AofRecord::LPush`].
const TAG_LPUSH: u8 = 4;
/// Tag byte for [`AofRecord::RPush`].
const TAG_RPUSH: u8 = 5;
/// Tag byte for [`AofRecord::LPop`].
const TAG_LPOP: u8 = 6;
/// Tag byte for [`AofRecord::RPop`].
const TAG_RPOP: u8 = 7;
/// Tag byte for [`AofRecord::ZAdd`].
const TAG_ZADD: u8 = 8;
/// Tag byte for [`AofRecord::ZRem`].
const TAG_ZREM: u8 = 9;
/// Tag byte for [`AofRecord::Persist`].
const TAG_PERSIST: u8 = 10;
/// Tag byte for [`AofRecord::Pexpire`].
const TAG_PEXPIRE: u8 = 11;
/// Tag byte for [`AofRecord::Incr`].
const TAG_INCR: u8 = 12;
/// Tag byte for [`AofRecord::Decr`].
const TAG_DECR: u8 = 13;
/// Tag byte for [`AofRecord::HSet`].
const TAG_HSET: u8 = 14;
/// Tag byte for [`AofRecord::HDel`].
const TAG_HDEL: u8 = 15;
/// Tag byte for [`AofRecord::HIncrBy`].
const TAG_HINCRBY: u8 = 16;
/// Tag byte for [`AofRecord::SAdd`].
const TAG_SADD: u8 = 17;
/// Tag byte for [`AofRecord::SRem`].
const TAG_SREM: u8 = 18;
/// Tag byte for [`AofRecord::IncrBy`].
const TAG_INCRBY: u8 = 19;
/// Tag byte for [`AofRecord::DecrBy`].
const TAG_DECRBY: u8 = 20;
/// Tag byte for [`AofRecord::Append`].
const TAG_APPEND: u8 = 21;
/// Tag byte for [`AofRecord::Rename`].
const TAG_RENAME: u8 = 22;
61
/// A single mutation record stored in the AOF.
///
/// Each variant corresponds to exactly one `TAG_*` byte in the serialized
/// form. Keys, hash fields and set/sorted-set members are `String`s (they
/// must be valid UTF-8 to decode), while stored values are raw `Bytes`.
///
/// Only `PartialEq` is derived, not `Eq`, because `ZAdd` carries `f64`
/// scores.
#[derive(Debug, Clone, PartialEq)]
pub enum AofRecord {
    /// `SET key value [expire_ms]`. `expire_ms` is -1 for no expiration.
    Set {
        key: String,
        value: Bytes,
        expire_ms: i64,
    },
    /// `DEL key`.
    Del { key: String },
    /// `EXPIRE key seconds`. Serialized as a 64-bit integer field.
    Expire { key: String, seconds: u64 },
    /// `LPUSH key value [value ...]`.
    LPush { key: String, values: Vec<Bytes> },
    /// `RPUSH key value [value ...]`.
    RPush { key: String, values: Vec<Bytes> },
    /// `LPOP key`.
    LPop { key: String },
    /// `RPOP key`.
    RPop { key: String },
    /// `ZADD key score member [score member ...]`.
    ///
    /// Each tuple is `(score, member)`.
    ZAdd {
        key: String,
        members: Vec<(f64, String)>,
    },
    /// `ZREM key member [member ...]`.
    ZRem { key: String, members: Vec<String> },
    /// `PERSIST key` — remove expiration.
    Persist { key: String },
    /// `PEXPIRE key milliseconds`. Serialized as a 64-bit integer field.
    Pexpire { key: String, milliseconds: u64 },
    /// `INCR key`.
    Incr { key: String },
    /// `DECR key`.
    Decr { key: String },
    /// `HSET key field value [field value ...]`.
    ///
    /// Each tuple is `(field, value)`.
    HSet {
        key: String,
        fields: Vec<(String, Bytes)>,
    },
    /// `HDEL key field [field ...]`.
    HDel { key: String, fields: Vec<String> },
    /// `HINCRBY key field delta`.
    HIncrBy {
        key: String,
        field: String,
        delta: i64,
    },
    /// `SADD key member [member ...]`.
    SAdd { key: String, members: Vec<String> },
    /// `SREM key member [member ...]`.
    SRem { key: String, members: Vec<String> },
    /// `INCRBY key delta`.
    IncrBy { key: String, delta: i64 },
    /// `DECRBY key delta`.
    DecrBy { key: String, delta: i64 },
    /// `APPEND key value`.
    Append { key: String, value: Bytes },
    /// `RENAME key newkey`.
    Rename { key: String, newkey: String },
}
124
125impl AofRecord {
126    /// Serializes this record into a byte vector (tag + payload, no CRC).
127    fn to_bytes(&self) -> Result<Vec<u8>, FormatError> {
128        let mut buf = Vec::new();
129        match self {
130            AofRecord::Set {
131                key,
132                value,
133                expire_ms,
134            } => {
135                format::write_u8(&mut buf, TAG_SET)?;
136                format::write_bytes(&mut buf, key.as_bytes())?;
137                format::write_bytes(&mut buf, value)?;
138                format::write_i64(&mut buf, *expire_ms)?;
139            }
140            AofRecord::Del { key } => {
141                format::write_u8(&mut buf, TAG_DEL)?;
142                format::write_bytes(&mut buf, key.as_bytes())?;
143            }
144            AofRecord::Expire { key, seconds } => {
145                format::write_u8(&mut buf, TAG_EXPIRE)?;
146                format::write_bytes(&mut buf, key.as_bytes())?;
147                format::write_i64(&mut buf, *seconds as i64)?;
148            }
149            AofRecord::LPush { key, values } => {
150                format::write_u8(&mut buf, TAG_LPUSH)?;
151                format::write_bytes(&mut buf, key.as_bytes())?;
152                format::write_u32(&mut buf, values.len() as u32)?;
153                for v in values {
154                    format::write_bytes(&mut buf, v)?;
155                }
156            }
157            AofRecord::RPush { key, values } => {
158                format::write_u8(&mut buf, TAG_RPUSH)?;
159                format::write_bytes(&mut buf, key.as_bytes())?;
160                format::write_u32(&mut buf, values.len() as u32)?;
161                for v in values {
162                    format::write_bytes(&mut buf, v)?;
163                }
164            }
165            AofRecord::LPop { key } => {
166                format::write_u8(&mut buf, TAG_LPOP)?;
167                format::write_bytes(&mut buf, key.as_bytes())?;
168            }
169            AofRecord::RPop { key } => {
170                format::write_u8(&mut buf, TAG_RPOP)?;
171                format::write_bytes(&mut buf, key.as_bytes())?;
172            }
173            AofRecord::ZAdd { key, members } => {
174                format::write_u8(&mut buf, TAG_ZADD)?;
175                format::write_bytes(&mut buf, key.as_bytes())?;
176                format::write_u32(&mut buf, members.len() as u32)?;
177                for (score, member) in members {
178                    format::write_f64(&mut buf, *score)?;
179                    format::write_bytes(&mut buf, member.as_bytes())?;
180                }
181            }
182            AofRecord::ZRem { key, members } => {
183                format::write_u8(&mut buf, TAG_ZREM)?;
184                format::write_bytes(&mut buf, key.as_bytes())?;
185                format::write_u32(&mut buf, members.len() as u32)?;
186                for member in members {
187                    format::write_bytes(&mut buf, member.as_bytes())?;
188                }
189            }
190            AofRecord::Persist { key } => {
191                format::write_u8(&mut buf, TAG_PERSIST)?;
192                format::write_bytes(&mut buf, key.as_bytes())?;
193            }
194            AofRecord::Pexpire { key, milliseconds } => {
195                format::write_u8(&mut buf, TAG_PEXPIRE)?;
196                format::write_bytes(&mut buf, key.as_bytes())?;
197                format::write_i64(&mut buf, *milliseconds as i64)?;
198            }
199            AofRecord::Incr { key } => {
200                format::write_u8(&mut buf, TAG_INCR)?;
201                format::write_bytes(&mut buf, key.as_bytes())?;
202            }
203            AofRecord::Decr { key } => {
204                format::write_u8(&mut buf, TAG_DECR)?;
205                format::write_bytes(&mut buf, key.as_bytes())?;
206            }
207            AofRecord::HSet { key, fields } => {
208                format::write_u8(&mut buf, TAG_HSET)?;
209                format::write_bytes(&mut buf, key.as_bytes())?;
210                format::write_u32(&mut buf, fields.len() as u32)?;
211                for (field, value) in fields {
212                    format::write_bytes(&mut buf, field.as_bytes())?;
213                    format::write_bytes(&mut buf, value)?;
214                }
215            }
216            AofRecord::HDel { key, fields } => {
217                format::write_u8(&mut buf, TAG_HDEL)?;
218                format::write_bytes(&mut buf, key.as_bytes())?;
219                format::write_u32(&mut buf, fields.len() as u32)?;
220                for field in fields {
221                    format::write_bytes(&mut buf, field.as_bytes())?;
222                }
223            }
224            AofRecord::HIncrBy { key, field, delta } => {
225                format::write_u8(&mut buf, TAG_HINCRBY)?;
226                format::write_bytes(&mut buf, key.as_bytes())?;
227                format::write_bytes(&mut buf, field.as_bytes())?;
228                format::write_i64(&mut buf, *delta)?;
229            }
230            AofRecord::SAdd { key, members } => {
231                format::write_u8(&mut buf, TAG_SADD)?;
232                format::write_bytes(&mut buf, key.as_bytes())?;
233                format::write_u32(&mut buf, members.len() as u32)?;
234                for member in members {
235                    format::write_bytes(&mut buf, member.as_bytes())?;
236                }
237            }
238            AofRecord::SRem { key, members } => {
239                format::write_u8(&mut buf, TAG_SREM)?;
240                format::write_bytes(&mut buf, key.as_bytes())?;
241                format::write_u32(&mut buf, members.len() as u32)?;
242                for member in members {
243                    format::write_bytes(&mut buf, member.as_bytes())?;
244                }
245            }
246            AofRecord::IncrBy { key, delta } => {
247                format::write_u8(&mut buf, TAG_INCRBY)?;
248                format::write_bytes(&mut buf, key.as_bytes())?;
249                format::write_i64(&mut buf, *delta)?;
250            }
251            AofRecord::DecrBy { key, delta } => {
252                format::write_u8(&mut buf, TAG_DECRBY)?;
253                format::write_bytes(&mut buf, key.as_bytes())?;
254                format::write_i64(&mut buf, *delta)?;
255            }
256            AofRecord::Append { key, value } => {
257                format::write_u8(&mut buf, TAG_APPEND)?;
258                format::write_bytes(&mut buf, key.as_bytes())?;
259                format::write_bytes(&mut buf, value)?;
260            }
261            AofRecord::Rename { key, newkey } => {
262                format::write_u8(&mut buf, TAG_RENAME)?;
263                format::write_bytes(&mut buf, key.as_bytes())?;
264                format::write_bytes(&mut buf, newkey.as_bytes())?;
265            }
266        }
267        Ok(buf)
268    }
269
270    /// Deserializes a record from a byte slice (tag + payload, no CRC).
271    fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
272        let mut cursor = io::Cursor::new(data);
273        let tag = format::read_u8(&mut cursor)?;
274        match tag {
275            TAG_SET => {
276                let key = read_string(&mut cursor, "key")?;
277                let value = format::read_bytes(&mut cursor)?;
278                let expire_ms = format::read_i64(&mut cursor)?;
279                Ok(AofRecord::Set {
280                    key,
281                    value: Bytes::from(value),
282                    expire_ms,
283                })
284            }
285            TAG_DEL => {
286                let key = read_string(&mut cursor, "key")?;
287                Ok(AofRecord::Del { key })
288            }
289            TAG_EXPIRE => {
290                let key = read_string(&mut cursor, "key")?;
291                let seconds = format::read_i64(&mut cursor)? as u64;
292                Ok(AofRecord::Expire { key, seconds })
293            }
294            TAG_LPUSH | TAG_RPUSH => {
295                let key = read_string(&mut cursor, "key")?;
296                let count = format::read_u32(&mut cursor)?;
297                let mut values = Vec::with_capacity(count as usize);
298                for _ in 0..count {
299                    values.push(Bytes::from(format::read_bytes(&mut cursor)?));
300                }
301                if tag == TAG_LPUSH {
302                    Ok(AofRecord::LPush { key, values })
303                } else {
304                    Ok(AofRecord::RPush { key, values })
305                }
306            }
307            TAG_LPOP => {
308                let key = read_string(&mut cursor, "key")?;
309                Ok(AofRecord::LPop { key })
310            }
311            TAG_RPOP => {
312                let key = read_string(&mut cursor, "key")?;
313                Ok(AofRecord::RPop { key })
314            }
315            TAG_ZADD => {
316                let key = read_string(&mut cursor, "key")?;
317                let count = format::read_u32(&mut cursor)?;
318                let mut members = Vec::with_capacity(count as usize);
319                for _ in 0..count {
320                    let score = format::read_f64(&mut cursor)?;
321                    let member = read_string(&mut cursor, "member")?;
322                    members.push((score, member));
323                }
324                Ok(AofRecord::ZAdd { key, members })
325            }
326            TAG_ZREM => {
327                let key = read_string(&mut cursor, "key")?;
328                let count = format::read_u32(&mut cursor)?;
329                let mut members = Vec::with_capacity(count as usize);
330                for _ in 0..count {
331                    members.push(read_string(&mut cursor, "member")?);
332                }
333                Ok(AofRecord::ZRem { key, members })
334            }
335            TAG_PERSIST => {
336                let key = read_string(&mut cursor, "key")?;
337                Ok(AofRecord::Persist { key })
338            }
339            TAG_PEXPIRE => {
340                let key = read_string(&mut cursor, "key")?;
341                let milliseconds = format::read_i64(&mut cursor)? as u64;
342                Ok(AofRecord::Pexpire { key, milliseconds })
343            }
344            TAG_INCR => {
345                let key = read_string(&mut cursor, "key")?;
346                Ok(AofRecord::Incr { key })
347            }
348            TAG_DECR => {
349                let key = read_string(&mut cursor, "key")?;
350                Ok(AofRecord::Decr { key })
351            }
352            TAG_HSET => {
353                let key = read_string(&mut cursor, "key")?;
354                let count = format::read_u32(&mut cursor)?;
355                let mut fields = Vec::with_capacity(count as usize);
356                for _ in 0..count {
357                    let field = read_string(&mut cursor, "field")?;
358                    let value = Bytes::from(format::read_bytes(&mut cursor)?);
359                    fields.push((field, value));
360                }
361                Ok(AofRecord::HSet { key, fields })
362            }
363            TAG_HDEL => {
364                let key = read_string(&mut cursor, "key")?;
365                let count = format::read_u32(&mut cursor)?;
366                let mut fields = Vec::with_capacity(count as usize);
367                for _ in 0..count {
368                    fields.push(read_string(&mut cursor, "field")?);
369                }
370                Ok(AofRecord::HDel { key, fields })
371            }
372            TAG_HINCRBY => {
373                let key = read_string(&mut cursor, "key")?;
374                let field = read_string(&mut cursor, "field")?;
375                let delta = format::read_i64(&mut cursor)?;
376                Ok(AofRecord::HIncrBy { key, field, delta })
377            }
378            TAG_SADD => {
379                let key = read_string(&mut cursor, "key")?;
380                let count = format::read_u32(&mut cursor)?;
381                let mut members = Vec::with_capacity(count as usize);
382                for _ in 0..count {
383                    members.push(read_string(&mut cursor, "member")?);
384                }
385                Ok(AofRecord::SAdd { key, members })
386            }
387            TAG_SREM => {
388                let key = read_string(&mut cursor, "key")?;
389                let count = format::read_u32(&mut cursor)?;
390                let mut members = Vec::with_capacity(count as usize);
391                for _ in 0..count {
392                    members.push(read_string(&mut cursor, "member")?);
393                }
394                Ok(AofRecord::SRem { key, members })
395            }
396            TAG_INCRBY => {
397                let key = read_string(&mut cursor, "key")?;
398                let delta = format::read_i64(&mut cursor)?;
399                Ok(AofRecord::IncrBy { key, delta })
400            }
401            TAG_DECRBY => {
402                let key = read_string(&mut cursor, "key")?;
403                let delta = format::read_i64(&mut cursor)?;
404                Ok(AofRecord::DecrBy { key, delta })
405            }
406            TAG_APPEND => {
407                let key = read_string(&mut cursor, "key")?;
408                let value = Bytes::from(format::read_bytes(&mut cursor)?);
409                Ok(AofRecord::Append { key, value })
410            }
411            TAG_RENAME => {
412                let key = read_string(&mut cursor, "key")?;
413                let newkey = read_string(&mut cursor, "newkey")?;
414                Ok(AofRecord::Rename { key, newkey })
415            }
416            _ => Err(FormatError::UnknownTag(tag)),
417        }
418    }
419}
420
/// Configurable fsync policy for the AOF writer.
///
/// The default is [`FsyncPolicy::EverySec`]. The `AofWriter` in this file
/// does not store a policy itself; it exposes `flush` and `sync`, so the
/// code that owns the writer is presumably what applies the chosen policy.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum FsyncPolicy {
    /// Fsync after every write. Safest, slowest.
    Always,
    /// Fsync once per second. The shard tick drives this.
    #[default]
    EverySec,
    /// Let the OS decide when to flush. Fastest, least durable.
    No,
}
432
/// Buffered writer for appending AOF records to a file.
///
/// Records are staged in an in-memory `BufWriter`; they reach the OS on
/// `flush` and the disk on `sync`.
pub struct AofWriter {
    /// Buffered handle to the open AOF file.
    writer: BufWriter<File>,
    /// Path the file was opened from; reused by `truncate` to reopen it.
    path: PathBuf,
}
438
439impl AofWriter {
440    /// Opens (or creates) an AOF file. If the file is new, writes the header.
441    /// If the file already exists, appends to it.
442    pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
443        let path = path.into();
444        let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
445
446        let file = OpenOptions::new().create(true).append(true).open(&path)?;
447        let mut writer = BufWriter::new(file);
448
449        if !exists {
450            format::write_header(&mut writer, format::AOF_MAGIC)?;
451            writer.flush()?;
452        }
453
454        Ok(Self { writer, path })
455    }
456
457    /// Appends a record to the AOF. Writes tag+payload+crc32.
458    pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
459        let payload = record.to_bytes()?;
460        let checksum = format::crc32(&payload);
461        self.writer.write_all(&payload)?;
462        format::write_u32(&mut self.writer, checksum)?;
463        Ok(())
464    }
465
466    /// Flushes the internal buffer to the OS.
467    pub fn flush(&mut self) -> Result<(), FormatError> {
468        self.writer.flush()?;
469        Ok(())
470    }
471
472    /// Flushes and fsyncs the file to disk.
473    pub fn sync(&mut self) -> Result<(), FormatError> {
474        self.writer.flush()?;
475        self.writer.get_ref().sync_all()?;
476        Ok(())
477    }
478
479    /// Returns the file path.
480    pub fn path(&self) -> &Path {
481        &self.path
482    }
483
484    /// Truncates the AOF file back to just the header.
485    /// Used after a successful snapshot to reset the log.
486    pub fn truncate(&mut self) -> Result<(), FormatError> {
487        // flush and drop the old writer
488        self.writer.flush()?;
489
490        // reopen the file with truncation, write fresh header
491        let file = OpenOptions::new()
492            .create(true)
493            .write(true)
494            .truncate(true)
495            .open(&self.path)?;
496        let mut writer = BufWriter::new(file);
497        format::write_header(&mut writer, format::AOF_MAGIC)?;
498        writer.flush()?;
499        // ensure the fresh header is durable before we start appending
500        writer.get_ref().sync_all()?;
501        self.writer = writer;
502        Ok(())
503    }
504}
505
/// Reader for iterating over AOF records.
///
/// Created with `AofReader::open`, which consumes and validates the file
/// header; records are then pulled one at a time with `read_record`.
#[derive(Debug)]
pub struct AofReader {
    /// Buffered handle positioned at the next unread record.
    reader: BufReader<File>,
}
511
512impl AofReader {
513    /// Opens an AOF file and validates the header.
514    pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
515        let file = File::open(path.as_ref())?;
516        let mut reader = BufReader::new(file);
517        let _version = format::read_header(&mut reader, format::AOF_MAGIC)?;
518        Ok(Self { reader })
519    }
520
521    /// Reads the next record from the AOF.
522    ///
523    /// Returns `Ok(None)` at end-of-file. On a truncated record (the
524    /// server crashed mid-write), returns `Ok(None)` rather than an error
525    /// — this is the expected recovery behavior.
526    pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
527        // peek for EOF — try reading the tag byte
528        let tag = match format::read_u8(&mut self.reader) {
529            Ok(t) => t,
530            Err(FormatError::UnexpectedEof) => return Ok(None),
531            Err(e) => return Err(e),
532        };
533
534        // read the rest of the payload based on tag, building the full
535        // record bytes for CRC verification
536        let record_result = self.read_payload_for_tag(tag);
537        match record_result {
538            Ok((payload, stored_crc)) => {
539                // prepend the tag to the payload for CRC check
540                let mut full = Vec::with_capacity(1 + payload.len());
541                full.push(tag);
542                full.extend_from_slice(&payload);
543                format::verify_crc32(&full, stored_crc)?;
544                AofRecord::from_bytes(&full).map(Some)
545            }
546            // truncated record — treat as end of usable data
547            Err(FormatError::UnexpectedEof) => Ok(None),
548            Err(e) => Err(e),
549        }
550    }
551
552    /// Reads the remaining payload bytes (after the tag) and the trailing CRC.
553    fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
554        let mut payload = Vec::new();
555        match tag {
556            TAG_SET => {
557                // key_len + key + value_len + value + expire_ms
558                let key = format::read_bytes(&mut self.reader)?;
559                format::write_bytes(&mut payload, &key)?;
560                let value = format::read_bytes(&mut self.reader)?;
561                format::write_bytes(&mut payload, &value)?;
562                let expire_ms = format::read_i64(&mut self.reader)?;
563                format::write_i64(&mut payload, expire_ms)?;
564            }
565            TAG_DEL => {
566                let key = format::read_bytes(&mut self.reader)?;
567                format::write_bytes(&mut payload, &key)?;
568            }
569            TAG_EXPIRE => {
570                let key = format::read_bytes(&mut self.reader)?;
571                format::write_bytes(&mut payload, &key)?;
572                let seconds = format::read_i64(&mut self.reader)?;
573                format::write_i64(&mut payload, seconds)?;
574            }
575            TAG_LPUSH | TAG_RPUSH => {
576                let key = format::read_bytes(&mut self.reader)?;
577                format::write_bytes(&mut payload, &key)?;
578                let count = format::read_u32(&mut self.reader)?;
579                format::write_u32(&mut payload, count)?;
580                for _ in 0..count {
581                    let val = format::read_bytes(&mut self.reader)?;
582                    format::write_bytes(&mut payload, &val)?;
583                }
584            }
585            TAG_LPOP | TAG_RPOP => {
586                let key = format::read_bytes(&mut self.reader)?;
587                format::write_bytes(&mut payload, &key)?;
588            }
589            TAG_ZADD => {
590                let key = format::read_bytes(&mut self.reader)?;
591                format::write_bytes(&mut payload, &key)?;
592                let count = format::read_u32(&mut self.reader)?;
593                format::write_u32(&mut payload, count)?;
594                for _ in 0..count {
595                    let score = format::read_f64(&mut self.reader)?;
596                    format::write_f64(&mut payload, score)?;
597                    let member = format::read_bytes(&mut self.reader)?;
598                    format::write_bytes(&mut payload, &member)?;
599                }
600            }
601            TAG_ZREM => {
602                let key = format::read_bytes(&mut self.reader)?;
603                format::write_bytes(&mut payload, &key)?;
604                let count = format::read_u32(&mut self.reader)?;
605                format::write_u32(&mut payload, count)?;
606                for _ in 0..count {
607                    let member = format::read_bytes(&mut self.reader)?;
608                    format::write_bytes(&mut payload, &member)?;
609                }
610            }
611            TAG_PERSIST => {
612                let key = format::read_bytes(&mut self.reader)?;
613                format::write_bytes(&mut payload, &key)?;
614            }
615            TAG_PEXPIRE => {
616                let key = format::read_bytes(&mut self.reader)?;
617                format::write_bytes(&mut payload, &key)?;
618                let millis = format::read_i64(&mut self.reader)?;
619                format::write_i64(&mut payload, millis)?;
620            }
621            TAG_INCR | TAG_DECR => {
622                let key = format::read_bytes(&mut self.reader)?;
623                format::write_bytes(&mut payload, &key)?;
624            }
625            _ => return Err(FormatError::UnknownTag(tag)),
626        }
627        let stored_crc = format::read_u32(&mut self.reader)?;
628        Ok((payload, stored_crc))
629    }
630}
631
/// Builds the path of a shard's AOF file inside `data_dir`.
///
/// The file name is `shard-{shard_id}.aof`.
pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
    let file_name = format!("shard-{shard_id}.aof");
    let mut full_path = data_dir.to_path_buf();
    full_path.push(file_name);
    full_path
}
636
637#[cfg(test)]
638mod tests {
639    use super::*;
640    fn temp_dir() -> tempfile::TempDir {
641        tempfile::tempdir().expect("create temp dir")
642    }
643
644    #[test]
645    fn record_round_trip_set() {
646        let rec = AofRecord::Set {
647            key: "hello".into(),
648            value: Bytes::from("world"),
649            expire_ms: 5000,
650        };
651        let bytes = rec.to_bytes().unwrap();
652        let decoded = AofRecord::from_bytes(&bytes).unwrap();
653        assert_eq!(rec, decoded);
654    }
655
656    #[test]
657    fn record_round_trip_del() {
658        let rec = AofRecord::Del { key: "gone".into() };
659        let bytes = rec.to_bytes().unwrap();
660        let decoded = AofRecord::from_bytes(&bytes).unwrap();
661        assert_eq!(rec, decoded);
662    }
663
664    #[test]
665    fn record_round_trip_expire() {
666        let rec = AofRecord::Expire {
667            key: "ttl".into(),
668            seconds: 300,
669        };
670        let bytes = rec.to_bytes().unwrap();
671        let decoded = AofRecord::from_bytes(&bytes).unwrap();
672        assert_eq!(rec, decoded);
673    }
674
675    #[test]
676    fn set_with_no_expiry() {
677        let rec = AofRecord::Set {
678            key: "k".into(),
679            value: Bytes::from("v"),
680            expire_ms: -1,
681        };
682        let bytes = rec.to_bytes().unwrap();
683        let decoded = AofRecord::from_bytes(&bytes).unwrap();
684        assert_eq!(rec, decoded);
685    }
686
687    #[test]
688    fn writer_reader_round_trip() {
689        let dir = temp_dir();
690        let path = dir.path().join("test.aof");
691
692        let records = vec![
693            AofRecord::Set {
694                key: "a".into(),
695                value: Bytes::from("1"),
696                expire_ms: -1,
697            },
698            AofRecord::Set {
699                key: "b".into(),
700                value: Bytes::from("2"),
701                expire_ms: 10_000,
702            },
703            AofRecord::Del { key: "a".into() },
704            AofRecord::Expire {
705                key: "b".into(),
706                seconds: 60,
707            },
708        ];
709
710        // write
711        {
712            let mut writer = AofWriter::open(&path).unwrap();
713            for rec in &records {
714                writer.write_record(rec).unwrap();
715            }
716            writer.sync().unwrap();
717        }
718
719        // read back
720        let mut reader = AofReader::open(&path).unwrap();
721        let mut got = Vec::new();
722        while let Some(rec) = reader.read_record().unwrap() {
723            got.push(rec);
724        }
725        assert_eq!(records, got);
726    }
727
728    #[test]
729    fn empty_aof_returns_no_records() {
730        let dir = temp_dir();
731        let path = dir.path().join("empty.aof");
732
733        // just write the header
734        {
735            let _writer = AofWriter::open(&path).unwrap();
736        }
737
738        let mut reader = AofReader::open(&path).unwrap();
739        assert!(reader.read_record().unwrap().is_none());
740    }
741
742    #[test]
743    fn truncated_record_treated_as_eof() {
744        let dir = temp_dir();
745        let path = dir.path().join("trunc.aof");
746
747        // write one good record, then append garbage (simulating a crash)
748        {
749            let mut writer = AofWriter::open(&path).unwrap();
750            writer
751                .write_record(&AofRecord::Set {
752                    key: "ok".into(),
753                    value: Bytes::from("good"),
754                    expire_ms: -1,
755                })
756                .unwrap();
757            writer.flush().unwrap();
758        }
759
760        // append a partial tag with no payload
761        {
762            let mut file = OpenOptions::new().append(true).open(&path).unwrap();
763            file.write_all(&[TAG_SET]).unwrap();
764        }
765
766        let mut reader = AofReader::open(&path).unwrap();
767        // first record should be fine
768        let rec = reader.read_record().unwrap().unwrap();
769        assert!(matches!(rec, AofRecord::Set { .. }));
770        // second should be None (truncated)
771        assert!(reader.read_record().unwrap().is_none());
772    }
773
774    #[test]
775    fn corrupt_crc_detected() {
776        let dir = temp_dir();
777        let path = dir.path().join("corrupt.aof");
778
779        {
780            let mut writer = AofWriter::open(&path).unwrap();
781            writer
782                .write_record(&AofRecord::Set {
783                    key: "k".into(),
784                    value: Bytes::from("v"),
785                    expire_ms: -1,
786                })
787                .unwrap();
788            writer.flush().unwrap();
789        }
790
791        // corrupt the last byte (part of the CRC)
792        let mut data = fs::read(&path).unwrap();
793        let last = data.len() - 1;
794        data[last] ^= 0xFF;
795        fs::write(&path, &data).unwrap();
796
797        let mut reader = AofReader::open(&path).unwrap();
798        let err = reader.read_record().unwrap_err();
799        assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
800    }
801
802    #[test]
803    fn missing_magic_is_error() {
804        let dir = temp_dir();
805        let path = dir.path().join("bad.aof");
806        fs::write(&path, b"NOT_AOF_DATA").unwrap();
807
808        let err = AofReader::open(&path).unwrap_err();
809        assert!(matches!(err, FormatError::InvalidMagic));
810    }
811
812    #[test]
813    fn truncate_resets_aof() {
814        let dir = temp_dir();
815        let path = dir.path().join("reset.aof");
816
817        {
818            let mut writer = AofWriter::open(&path).unwrap();
819            writer
820                .write_record(&AofRecord::Set {
821                    key: "old".into(),
822                    value: Bytes::from("data"),
823                    expire_ms: -1,
824                })
825                .unwrap();
826            writer.truncate().unwrap();
827
828            // write a new record after truncation
829            writer
830                .write_record(&AofRecord::Set {
831                    key: "new".into(),
832                    value: Bytes::from("fresh"),
833                    expire_ms: -1,
834                })
835                .unwrap();
836            writer.sync().unwrap();
837        }
838
839        let mut reader = AofReader::open(&path).unwrap();
840        let rec = reader.read_record().unwrap().unwrap();
841        match rec {
842            AofRecord::Set { key, .. } => assert_eq!(key, "new"),
843            other => panic!("expected Set, got {other:?}"),
844        }
845        // only one record after truncation
846        assert!(reader.read_record().unwrap().is_none());
847    }
848
849    #[test]
850    fn record_round_trip_lpush() {
851        let rec = AofRecord::LPush {
852            key: "list".into(),
853            values: vec![Bytes::from("a"), Bytes::from("b")],
854        };
855        let bytes = rec.to_bytes().unwrap();
856        let decoded = AofRecord::from_bytes(&bytes).unwrap();
857        assert_eq!(rec, decoded);
858    }
859
860    #[test]
861    fn record_round_trip_rpush() {
862        let rec = AofRecord::RPush {
863            key: "list".into(),
864            values: vec![Bytes::from("x")],
865        };
866        let bytes = rec.to_bytes().unwrap();
867        let decoded = AofRecord::from_bytes(&bytes).unwrap();
868        assert_eq!(rec, decoded);
869    }
870
871    #[test]
872    fn record_round_trip_lpop() {
873        let rec = AofRecord::LPop { key: "list".into() };
874        let bytes = rec.to_bytes().unwrap();
875        let decoded = AofRecord::from_bytes(&bytes).unwrap();
876        assert_eq!(rec, decoded);
877    }
878
879    #[test]
880    fn record_round_trip_rpop() {
881        let rec = AofRecord::RPop { key: "list".into() };
882        let bytes = rec.to_bytes().unwrap();
883        let decoded = AofRecord::from_bytes(&bytes).unwrap();
884        assert_eq!(rec, decoded);
885    }
886
887    #[test]
888    fn writer_reader_round_trip_with_list_records() {
889        let dir = temp_dir();
890        let path = dir.path().join("list.aof");
891
892        let records = vec![
893            AofRecord::LPush {
894                key: "l".into(),
895                values: vec![Bytes::from("a"), Bytes::from("b")],
896            },
897            AofRecord::RPush {
898                key: "l".into(),
899                values: vec![Bytes::from("c")],
900            },
901            AofRecord::LPop { key: "l".into() },
902            AofRecord::RPop { key: "l".into() },
903        ];
904
905        {
906            let mut writer = AofWriter::open(&path).unwrap();
907            for rec in &records {
908                writer.write_record(rec).unwrap();
909            }
910            writer.sync().unwrap();
911        }
912
913        let mut reader = AofReader::open(&path).unwrap();
914        let mut got = Vec::new();
915        while let Some(rec) = reader.read_record().unwrap() {
916            got.push(rec);
917        }
918        assert_eq!(records, got);
919    }
920
921    #[test]
922    fn record_round_trip_zadd() {
923        let rec = AofRecord::ZAdd {
924            key: "board".into(),
925            members: vec![(100.0, "alice".into()), (200.5, "bob".into())],
926        };
927        let bytes = rec.to_bytes().unwrap();
928        let decoded = AofRecord::from_bytes(&bytes).unwrap();
929        assert_eq!(rec, decoded);
930    }
931
932    #[test]
933    fn record_round_trip_zrem() {
934        let rec = AofRecord::ZRem {
935            key: "board".into(),
936            members: vec!["alice".into(), "bob".into()],
937        };
938        let bytes = rec.to_bytes().unwrap();
939        let decoded = AofRecord::from_bytes(&bytes).unwrap();
940        assert_eq!(rec, decoded);
941    }
942
943    #[test]
944    fn writer_reader_round_trip_with_sorted_set_records() {
945        let dir = temp_dir();
946        let path = dir.path().join("zset.aof");
947
948        let records = vec![
949            AofRecord::ZAdd {
950                key: "board".into(),
951                members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
952            },
953            AofRecord::ZRem {
954                key: "board".into(),
955                members: vec!["alice".into()],
956            },
957        ];
958
959        {
960            let mut writer = AofWriter::open(&path).unwrap();
961            for rec in &records {
962                writer.write_record(rec).unwrap();
963            }
964            writer.sync().unwrap();
965        }
966
967        let mut reader = AofReader::open(&path).unwrap();
968        let mut got = Vec::new();
969        while let Some(rec) = reader.read_record().unwrap() {
970            got.push(rec);
971        }
972        assert_eq!(records, got);
973    }
974
975    #[test]
976    fn record_round_trip_persist() {
977        let rec = AofRecord::Persist {
978            key: "mykey".into(),
979        };
980        let bytes = rec.to_bytes().unwrap();
981        let decoded = AofRecord::from_bytes(&bytes).unwrap();
982        assert_eq!(rec, decoded);
983    }
984
985    #[test]
986    fn record_round_trip_pexpire() {
987        let rec = AofRecord::Pexpire {
988            key: "mykey".into(),
989            milliseconds: 5000,
990        };
991        let bytes = rec.to_bytes().unwrap();
992        let decoded = AofRecord::from_bytes(&bytes).unwrap();
993        assert_eq!(rec, decoded);
994    }
995
996    #[test]
997    fn record_round_trip_incr() {
998        let rec = AofRecord::Incr {
999            key: "counter".into(),
1000        };
1001        let bytes = rec.to_bytes().unwrap();
1002        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1003        assert_eq!(rec, decoded);
1004    }
1005
1006    #[test]
1007    fn record_round_trip_decr() {
1008        let rec = AofRecord::Decr {
1009            key: "counter".into(),
1010        };
1011        let bytes = rec.to_bytes().unwrap();
1012        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1013        assert_eq!(rec, decoded);
1014    }
1015
1016    #[test]
1017    fn writer_reader_round_trip_with_persist_pexpire() {
1018        let dir = temp_dir();
1019        let path = dir.path().join("persist_pexpire.aof");
1020
1021        let records = vec![
1022            AofRecord::Set {
1023                key: "k".into(),
1024                value: Bytes::from("v"),
1025                expire_ms: 5000,
1026            },
1027            AofRecord::Persist { key: "k".into() },
1028            AofRecord::Pexpire {
1029                key: "k".into(),
1030                milliseconds: 3000,
1031            },
1032        ];
1033
1034        {
1035            let mut writer = AofWriter::open(&path).unwrap();
1036            for rec in &records {
1037                writer.write_record(rec).unwrap();
1038            }
1039            writer.sync().unwrap();
1040        }
1041
1042        let mut reader = AofReader::open(&path).unwrap();
1043        let mut got = Vec::new();
1044        while let Some(rec) = reader.read_record().unwrap() {
1045            got.push(rec);
1046        }
1047        assert_eq!(records, got);
1048    }
1049
1050    #[test]
1051    fn aof_path_format() {
1052        let p = aof_path(Path::new("/data"), 3);
1053        assert_eq!(p, PathBuf::from("/data/shard-3.aof"));
1054    }
1055
1056    #[test]
1057    fn record_round_trip_hset() {
1058        let rec = AofRecord::HSet {
1059            key: "hash".into(),
1060            fields: vec![
1061                ("f1".into(), Bytes::from("v1")),
1062                ("f2".into(), Bytes::from("v2")),
1063            ],
1064        };
1065        let bytes = rec.to_bytes().unwrap();
1066        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1067        assert_eq!(rec, decoded);
1068    }
1069
1070    #[test]
1071    fn record_round_trip_hdel() {
1072        let rec = AofRecord::HDel {
1073            key: "hash".into(),
1074            fields: vec!["f1".into(), "f2".into()],
1075        };
1076        let bytes = rec.to_bytes().unwrap();
1077        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1078        assert_eq!(rec, decoded);
1079    }
1080
1081    #[test]
1082    fn record_round_trip_hincrby() {
1083        let rec = AofRecord::HIncrBy {
1084            key: "hash".into(),
1085            field: "counter".into(),
1086            delta: -42,
1087        };
1088        let bytes = rec.to_bytes().unwrap();
1089        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1090        assert_eq!(rec, decoded);
1091    }
1092
1093    #[test]
1094    fn record_round_trip_sadd() {
1095        let rec = AofRecord::SAdd {
1096            key: "set".into(),
1097            members: vec!["m1".into(), "m2".into(), "m3".into()],
1098        };
1099        let bytes = rec.to_bytes().unwrap();
1100        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1101        assert_eq!(rec, decoded);
1102    }
1103
1104    #[test]
1105    fn record_round_trip_srem() {
1106        let rec = AofRecord::SRem {
1107            key: "set".into(),
1108            members: vec!["m1".into()],
1109        };
1110        let bytes = rec.to_bytes().unwrap();
1111        let decoded = AofRecord::from_bytes(&bytes).unwrap();
1112        assert_eq!(rec, decoded);
1113    }
1114}