// ember_persistence/aof.rs

1//! Append-only file for recording mutations.
2//!
3//! Each shard writes its own AOF file (`shard-{id}.aof`). Records are
4//! written after successful mutations. The binary format uses a simple
5//! tag + payload + CRC32 structure for each record.
6//!
7//! File layout:
8//! ```text
9//! [EAOF magic: 4B][version: 1B]
10//! [record]*
11//! ```
12//!
13//! Record layout:
14//! ```text
15//! [tag: 1B][payload...][crc32: 4B]
16//! ```
17//! The CRC32 covers the tag + payload bytes.
18
19use std::fmt;
20use std::fs::{self, File, OpenOptions};
21#[cfg(feature = "encryption")]
22use std::io::Read as _;
23use std::io::{self, BufReader, BufWriter, Write};
24use std::path::{Path, PathBuf};
25
26use bytes::Bytes;
27
28use crate::format::{self, FormatError};
29
30/// Reads a length-prefixed field and decodes it as UTF-8.
31fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
32    let bytes = format::read_bytes(r)?;
33    String::from_utf8(bytes).map_err(|_| {
34        FormatError::Io(io::Error::new(
35            io::ErrorKind::InvalidData,
36            format!("{field} is not valid utf-8"),
37        ))
38    })
39}
40
41/// Reads a count-prefixed list of strings: `[count: u32][string]*`.
42/// Used by SADD, SREM, HDEL, and ZREM deserialization.
43fn read_string_list(r: &mut impl io::Read, field: &str) -> Result<Vec<String>, FormatError> {
44    let count = format::read_u32(r)?;
45    format::validate_collection_count(count, field)?;
46    let mut items = Vec::with_capacity(format::capped_capacity(count));
47    for _ in 0..count {
48        items.push(read_string(r, field)?);
49    }
50    Ok(items)
51}
52
53/// Reads a count-prefixed list of raw byte blobs: `[count: u32][bytes]*`.
54/// Used by LPUSH and RPUSH deserialization.
55fn read_bytes_list(r: &mut impl io::Read, label: &str) -> Result<Vec<Bytes>, FormatError> {
56    let count = format::read_u32(r)?;
57    format::validate_collection_count(count, label)?;
58    let mut items = Vec::with_capacity(format::capped_capacity(count));
59    for _ in 0..count {
60        items.push(Bytes::from(format::read_bytes(r)?));
61    }
62    Ok(items)
63}
64
// -- record tags --
// These bytes are part of the on-disk format: never renumber or reuse one.
// Listed in numeric order so gaps and the highest value in use are easy to
// spot; the trailing comment names the command family.

const TAG_SET: u8 = 1; // string
const TAG_DEL: u8 = 2; // key lifecycle
const TAG_EXPIRE: u8 = 3; // key lifecycle
const TAG_LPUSH: u8 = 4; // list
const TAG_RPUSH: u8 = 5; // list
const TAG_LPOP: u8 = 6; // list
const TAG_RPOP: u8 = 7; // list
const TAG_ZADD: u8 = 8; // sorted set
const TAG_ZREM: u8 = 9; // sorted set
const TAG_PERSIST: u8 = 10; // key lifecycle
const TAG_PEXPIRE: u8 = 11; // key lifecycle
const TAG_INCR: u8 = 12; // string
const TAG_DECR: u8 = 13; // string
const TAG_HSET: u8 = 14; // hash
const TAG_HDEL: u8 = 15; // hash
const TAG_HINCRBY: u8 = 16; // hash
const TAG_SADD: u8 = 17; // set
const TAG_SREM: u8 = 18; // set
const TAG_INCRBY: u8 = 19; // string
const TAG_DECRBY: u8 = 20; // string
const TAG_APPEND: u8 = 21; // string
const TAG_RENAME: u8 = 22; // key lifecycle

// protobuf
#[cfg(feature = "protobuf")]
const TAG_PROTO_SET: u8 = 23;
#[cfg(feature = "protobuf")]
const TAG_PROTO_REGISTER: u8 = 24;

// vector
#[cfg(feature = "vector")]
const TAG_VADD: u8 = 25;
#[cfg(feature = "vector")]
const TAG_VREM: u8 = 26;
113
114/// A single mutation record stored in the AOF.
115#[derive(Debug, Clone, PartialEq)]
116pub enum AofRecord {
117    /// SET key value \[expire_ms\]. expire_ms is -1 for no expiration.
118    Set {
119        key: String,
120        value: Bytes,
121        expire_ms: i64,
122    },
123    /// DEL key.
124    Del { key: String },
125    /// EXPIRE key seconds.
126    Expire { key: String, seconds: u64 },
127    /// LPUSH key value [value ...].
128    LPush { key: String, values: Vec<Bytes> },
129    /// RPUSH key value [value ...].
130    RPush { key: String, values: Vec<Bytes> },
131    /// LPOP key.
132    LPop { key: String },
133    /// RPOP key.
134    RPop { key: String },
135    /// ZADD key score member [score member ...].
136    ZAdd {
137        key: String,
138        members: Vec<(f64, String)>,
139    },
140    /// ZREM key member [member ...].
141    ZRem { key: String, members: Vec<String> },
142    /// PERSIST key — remove expiration.
143    Persist { key: String },
144    /// PEXPIRE key milliseconds.
145    Pexpire { key: String, milliseconds: u64 },
146    /// INCR key.
147    Incr { key: String },
148    /// DECR key.
149    Decr { key: String },
150    /// HSET key field value [field value ...].
151    HSet {
152        key: String,
153        fields: Vec<(String, Bytes)>,
154    },
155    /// HDEL key field [field ...].
156    HDel { key: String, fields: Vec<String> },
157    /// HINCRBY key field delta.
158    HIncrBy {
159        key: String,
160        field: String,
161        delta: i64,
162    },
163    /// SADD key member [member ...].
164    SAdd { key: String, members: Vec<String> },
165    /// SREM key member [member ...].
166    SRem { key: String, members: Vec<String> },
167    /// INCRBY key delta.
168    IncrBy { key: String, delta: i64 },
169    /// DECRBY key delta.
170    DecrBy { key: String, delta: i64 },
171    /// APPEND key value.
172    Append { key: String, value: Bytes },
173    /// RENAME key newkey.
174    Rename { key: String, newkey: String },
175    /// VADD key element vector [metric quant connectivity expansion_add].
176    /// Stores the full index config so recovery can recreate the set.
177    #[cfg(feature = "vector")]
178    VAdd {
179        key: String,
180        element: String,
181        vector: Vec<f32>,
182        /// 0 = cosine, 1 = l2, 2 = inner product
183        metric: u8,
184        /// 0 = f32, 1 = f16, 2 = i8
185        quantization: u8,
186        connectivity: u32,
187        expansion_add: u32,
188    },
189    /// VREM key element.
190    #[cfg(feature = "vector")]
191    VRem { key: String, element: String },
192    /// PROTO.SET key type_name data [expire_ms].
193    #[cfg(feature = "protobuf")]
194    ProtoSet {
195        key: String,
196        type_name: String,
197        data: Bytes,
198        expire_ms: i64,
199    },
200    /// PROTO.REGISTER name descriptor_bytes (for schema persistence).
201    #[cfg(feature = "protobuf")]
202    ProtoRegister { name: String, descriptor: Bytes },
203}
204
205impl AofRecord {
206    // IMPORTANT: each variant has three match arms that must stay in sync:
207    //   - `tag()`: the one-byte discriminant written to disk
208    //   - `estimated_size()`: the capacity hint for the serialization buffer
209    //   - `to_bytes()`: the actual serialized payload
210    //
211    // When adding a new variant, update all three in that order.
212    // The binary format is stable — tag byte values must never be reused.
213
214    /// Returns the on-disk tag byte for this record variant.
215    fn tag(&self) -> u8 {
216        match self {
217            AofRecord::Set { .. } => TAG_SET,
218            AofRecord::Del { .. } => TAG_DEL,
219            AofRecord::Expire { .. } => TAG_EXPIRE,
220            AofRecord::LPush { .. } => TAG_LPUSH,
221            AofRecord::RPush { .. } => TAG_RPUSH,
222            AofRecord::LPop { .. } => TAG_LPOP,
223            AofRecord::RPop { .. } => TAG_RPOP,
224            AofRecord::ZAdd { .. } => TAG_ZADD,
225            AofRecord::ZRem { .. } => TAG_ZREM,
226            AofRecord::Persist { .. } => TAG_PERSIST,
227            AofRecord::Pexpire { .. } => TAG_PEXPIRE,
228            AofRecord::Incr { .. } => TAG_INCR,
229            AofRecord::Decr { .. } => TAG_DECR,
230            AofRecord::HSet { .. } => TAG_HSET,
231            AofRecord::HDel { .. } => TAG_HDEL,
232            AofRecord::HIncrBy { .. } => TAG_HINCRBY,
233            AofRecord::SAdd { .. } => TAG_SADD,
234            AofRecord::SRem { .. } => TAG_SREM,
235            AofRecord::IncrBy { .. } => TAG_INCRBY,
236            AofRecord::DecrBy { .. } => TAG_DECRBY,
237            AofRecord::Append { .. } => TAG_APPEND,
238            AofRecord::Rename { .. } => TAG_RENAME,
239            #[cfg(feature = "vector")]
240            AofRecord::VAdd { .. } => TAG_VADD,
241            #[cfg(feature = "vector")]
242            AofRecord::VRem { .. } => TAG_VREM,
243            #[cfg(feature = "protobuf")]
244            AofRecord::ProtoSet { .. } => TAG_PROTO_SET,
245            #[cfg(feature = "protobuf")]
246            AofRecord::ProtoRegister { .. } => TAG_PROTO_REGISTER,
247        }
248    }
249
250    /// Estimates the serialized size of this record in bytes.
251    ///
252    /// Used as a capacity hint for `to_bytes()` to avoid intermediate
253    /// reallocations. The estimate includes the tag byte plus all
254    /// length-prefixed fields, erring slightly high to avoid growing.
255    fn estimated_size(&self) -> usize {
256        // overhead per length-prefixed field: 4 bytes for the u32 length
257        const LEN_PREFIX: usize = 4;
258
259        match self {
260            // 1 tag + 4 key-len + key + 4 value-len + value + 8 expire_ms
261            AofRecord::Set {
262                key,
263                value,
264                expire_ms: _,
265            } => 1 + LEN_PREFIX + key.len() + LEN_PREFIX + value.len() + 8,
266            // 1 tag + 4 key-len + key
267            AofRecord::Del { key }
268            | AofRecord::LPop { key }
269            | AofRecord::RPop { key }
270            | AofRecord::Persist { key }
271            | AofRecord::Incr { key }
272            | AofRecord::Decr { key } => 1 + LEN_PREFIX + key.len(),
273            // 1 tag + 4 key-len + key + 8 seconds/millis
274            AofRecord::Expire { key, .. } | AofRecord::Pexpire { key, .. } => {
275                1 + LEN_PREFIX + key.len() + 8
276            }
277            // 1 tag + 4 key-len + key + 4 count + (4 value-len + value) * n
278            AofRecord::LPush { key, values } | AofRecord::RPush { key, values } => {
279                let values_size: usize = values.iter().map(|v| LEN_PREFIX + v.len()).sum();
280                1 + LEN_PREFIX + key.len() + 4 + values_size
281            }
282            AofRecord::ZAdd { key, members } => {
283                let members_size: usize =
284                    members.iter().map(|(_, m)| 8 + LEN_PREFIX + m.len()).sum();
285                1 + LEN_PREFIX + key.len() + 4 + members_size
286            }
287            AofRecord::ZRem { key, members }
288            | AofRecord::SAdd { key, members }
289            | AofRecord::SRem { key, members } => {
290                let members_size: usize = members.iter().map(|m| LEN_PREFIX + m.len()).sum();
291                1 + LEN_PREFIX + key.len() + 4 + members_size
292            }
293            AofRecord::HSet { key, fields } => {
294                let fields_size: usize = fields
295                    .iter()
296                    .map(|(f, v)| LEN_PREFIX + f.len() + LEN_PREFIX + v.len())
297                    .sum();
298                1 + LEN_PREFIX + key.len() + 4 + fields_size
299            }
300            AofRecord::HDel { key, fields } => {
301                let fields_size: usize = fields.iter().map(|f| LEN_PREFIX + f.len()).sum();
302                1 + LEN_PREFIX + key.len() + 4 + fields_size
303            }
304            AofRecord::HIncrBy { key, field, .. } => {
305                1 + LEN_PREFIX + key.len() + LEN_PREFIX + field.len() + 8
306            }
307            AofRecord::IncrBy { key, .. } | AofRecord::DecrBy { key, .. } => {
308                1 + LEN_PREFIX + key.len() + 8
309            }
310            AofRecord::Append { key, value } => {
311                1 + LEN_PREFIX + key.len() + LEN_PREFIX + value.len()
312            }
313            AofRecord::Rename { key, newkey } => {
314                1 + LEN_PREFIX + key.len() + LEN_PREFIX + newkey.len()
315            }
316            #[cfg(feature = "vector")]
317            AofRecord::VAdd {
318                key,
319                element,
320                vector,
321                ..
322            } => {
323                1 + LEN_PREFIX + key.len() + LEN_PREFIX + element.len() + 4 + vector.len() * 4 + 10
324            }
325            #[cfg(feature = "vector")]
326            AofRecord::VRem { key, element } => {
327                1 + LEN_PREFIX + key.len() + LEN_PREFIX + element.len()
328            }
329            #[cfg(feature = "protobuf")]
330            AofRecord::ProtoSet {
331                key,
332                type_name,
333                data,
334                ..
335            } => {
336                1 + LEN_PREFIX
337                    + key.len()
338                    + LEN_PREFIX
339                    + type_name.len()
340                    + LEN_PREFIX
341                    + data.len()
342                    + 8
343            }
344            #[cfg(feature = "protobuf")]
345            AofRecord::ProtoRegister { name, descriptor } => {
346                1 + LEN_PREFIX + name.len() + LEN_PREFIX + descriptor.len()
347            }
348        }
349    }
350
351    /// Serializes this record into a byte vector (tag + payload, no CRC).
352    pub fn to_bytes(&self) -> Result<Vec<u8>, FormatError> {
353        let mut buf = Vec::with_capacity(self.estimated_size());
354        format::write_u8(&mut buf, self.tag())?;
355
356        match self {
357            // key-only: tag + key
358            AofRecord::Del { key }
359            | AofRecord::LPop { key }
360            | AofRecord::RPop { key }
361            | AofRecord::Persist { key }
362            | AofRecord::Incr { key }
363            | AofRecord::Decr { key } => {
364                format::write_bytes(&mut buf, key.as_bytes())?;
365            }
366
367            // key + bytes value + expire
368            AofRecord::Set {
369                key,
370                value,
371                expire_ms,
372            } => {
373                format::write_bytes(&mut buf, key.as_bytes())?;
374                format::write_bytes(&mut buf, value)?;
375                format::write_i64(&mut buf, *expire_ms)?;
376            }
377
378            // key + i64 (seconds/milliseconds are capped at i64::MAX on write
379            // so that deserialization can safely cast back to u64)
380            AofRecord::Expire { key, seconds } => {
381                format::write_bytes(&mut buf, key.as_bytes())?;
382                format::write_i64(&mut buf, (*seconds).min(i64::MAX as u64) as i64)?;
383            }
384            AofRecord::Pexpire { key, milliseconds } => {
385                format::write_bytes(&mut buf, key.as_bytes())?;
386                format::write_i64(&mut buf, (*milliseconds).min(i64::MAX as u64) as i64)?;
387            }
388            AofRecord::IncrBy { key, delta } | AofRecord::DecrBy { key, delta } => {
389                format::write_bytes(&mut buf, key.as_bytes())?;
390                format::write_i64(&mut buf, *delta)?;
391            }
392
393            // key + byte list
394            AofRecord::LPush { key, values } | AofRecord::RPush { key, values } => {
395                format::write_bytes(&mut buf, key.as_bytes())?;
396                format::write_len(&mut buf, values.len())?;
397                for v in values {
398                    format::write_bytes(&mut buf, v)?;
399                }
400            }
401
402            // key + string list
403            AofRecord::ZRem { key, members }
404            | AofRecord::SAdd { key, members }
405            | AofRecord::SRem { key, members } => {
406                format::write_bytes(&mut buf, key.as_bytes())?;
407                format::write_len(&mut buf, members.len())?;
408                for member in members {
409                    format::write_bytes(&mut buf, member.as_bytes())?;
410                }
411            }
412            AofRecord::HDel { key, fields } => {
413                format::write_bytes(&mut buf, key.as_bytes())?;
414                format::write_len(&mut buf, fields.len())?;
415                for field in fields {
416                    format::write_bytes(&mut buf, field.as_bytes())?;
417                }
418            }
419
420            // key + scored members
421            AofRecord::ZAdd { key, members } => {
422                format::write_bytes(&mut buf, key.as_bytes())?;
423                format::write_len(&mut buf, members.len())?;
424                for (score, member) in members {
425                    format::write_f64(&mut buf, *score)?;
426                    format::write_bytes(&mut buf, member.as_bytes())?;
427                }
428            }
429
430            // key + field-value pairs
431            AofRecord::HSet { key, fields } => {
432                format::write_bytes(&mut buf, key.as_bytes())?;
433                format::write_len(&mut buf, fields.len())?;
434                for (field, value) in fields {
435                    format::write_bytes(&mut buf, field.as_bytes())?;
436                    format::write_bytes(&mut buf, value)?;
437                }
438            }
439
440            // key + field + delta
441            AofRecord::HIncrBy { key, field, delta } => {
442                format::write_bytes(&mut buf, key.as_bytes())?;
443                format::write_bytes(&mut buf, field.as_bytes())?;
444                format::write_i64(&mut buf, *delta)?;
445            }
446
447            // key + bytes value (no expire)
448            AofRecord::Append { key, value } => {
449                format::write_bytes(&mut buf, key.as_bytes())?;
450                format::write_bytes(&mut buf, value)?;
451            }
452
453            // key + newkey
454            AofRecord::Rename { key, newkey } => {
455                format::write_bytes(&mut buf, key.as_bytes())?;
456                format::write_bytes(&mut buf, newkey.as_bytes())?;
457            }
458
459            #[cfg(feature = "vector")]
460            AofRecord::VAdd {
461                key,
462                element,
463                vector,
464                metric,
465                quantization,
466                connectivity,
467                expansion_add,
468            } => {
469                format::write_bytes(&mut buf, key.as_bytes())?;
470                format::write_bytes(&mut buf, element.as_bytes())?;
471                format::write_len(&mut buf, vector.len())?;
472                for &v in vector {
473                    format::write_f32(&mut buf, v)?;
474                }
475                format::write_u8(&mut buf, *metric)?;
476                format::write_u8(&mut buf, *quantization)?;
477                format::write_u32(&mut buf, *connectivity)?;
478                format::write_u32(&mut buf, *expansion_add)?;
479            }
480            #[cfg(feature = "vector")]
481            AofRecord::VRem { key, element } => {
482                format::write_bytes(&mut buf, key.as_bytes())?;
483                format::write_bytes(&mut buf, element.as_bytes())?;
484            }
485
486            #[cfg(feature = "protobuf")]
487            AofRecord::ProtoSet {
488                key,
489                type_name,
490                data,
491                expire_ms,
492            } => {
493                format::write_bytes(&mut buf, key.as_bytes())?;
494                format::write_bytes(&mut buf, type_name.as_bytes())?;
495                format::write_bytes(&mut buf, data)?;
496                format::write_i64(&mut buf, *expire_ms)?;
497            }
498            #[cfg(feature = "protobuf")]
499            AofRecord::ProtoRegister { name, descriptor } => {
500                format::write_bytes(&mut buf, name.as_bytes())?;
501                format::write_bytes(&mut buf, descriptor)?;
502            }
503        }
504        Ok(buf)
505    }
506
507    /// Deserializes a record from its binary payload (tag byte + fields, no CRC).
508    ///
509    /// The format is the same as `to_bytes()`. CRC validation is the caller's
510    /// responsibility (done by the AOF recovery reader before this is called).
511    pub fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
512        let mut cursor = io::Cursor::new(data);
513        let tag = format::read_u8(&mut cursor)?;
514        match tag {
515            TAG_SET => {
516                let key = read_string(&mut cursor, "key")?;
517                let value = format::read_bytes(&mut cursor)?;
518                let expire_ms = format::read_i64(&mut cursor)?;
519                Ok(AofRecord::Set {
520                    key,
521                    value: Bytes::from(value),
522                    expire_ms,
523                })
524            }
525            TAG_DEL => {
526                let key = read_string(&mut cursor, "key")?;
527                Ok(AofRecord::Del { key })
528            }
529            TAG_EXPIRE => {
530                let key = read_string(&mut cursor, "key")?;
531                let raw = format::read_i64(&mut cursor)?;
532                let seconds = u64::try_from(raw).map_err(|_| {
533                    FormatError::InvalidData(format!(
534                        "EXPIRE seconds is negative ({raw}) in AOF record"
535                    ))
536                })?;
537                Ok(AofRecord::Expire { key, seconds })
538            }
539            TAG_LPUSH | TAG_RPUSH => {
540                let key = read_string(&mut cursor, "key")?;
541                let values = read_bytes_list(&mut cursor, "list")?;
542                if tag == TAG_LPUSH {
543                    Ok(AofRecord::LPush { key, values })
544                } else {
545                    Ok(AofRecord::RPush { key, values })
546                }
547            }
548            TAG_LPOP => {
549                let key = read_string(&mut cursor, "key")?;
550                Ok(AofRecord::LPop { key })
551            }
552            TAG_RPOP => {
553                let key = read_string(&mut cursor, "key")?;
554                Ok(AofRecord::RPop { key })
555            }
556            TAG_ZADD => {
557                let key = read_string(&mut cursor, "key")?;
558                let count = format::read_u32(&mut cursor)?;
559                format::validate_collection_count(count, "sorted set")?;
560                let mut members = Vec::with_capacity(format::capped_capacity(count));
561                for _ in 0..count {
562                    let score = format::read_f64(&mut cursor)?;
563                    let member = read_string(&mut cursor, "member")?;
564                    members.push((score, member));
565                }
566                Ok(AofRecord::ZAdd { key, members })
567            }
568            TAG_ZREM => {
569                let key = read_string(&mut cursor, "key")?;
570                let members = read_string_list(&mut cursor, "member")?;
571                Ok(AofRecord::ZRem { key, members })
572            }
573            TAG_PERSIST => {
574                let key = read_string(&mut cursor, "key")?;
575                Ok(AofRecord::Persist { key })
576            }
577            TAG_PEXPIRE => {
578                let key = read_string(&mut cursor, "key")?;
579                let raw = format::read_i64(&mut cursor)?;
580                let milliseconds = u64::try_from(raw).map_err(|_| {
581                    FormatError::InvalidData(format!(
582                        "PEXPIRE milliseconds is negative ({raw}) in AOF record"
583                    ))
584                })?;
585                Ok(AofRecord::Pexpire { key, milliseconds })
586            }
587            TAG_INCR => {
588                let key = read_string(&mut cursor, "key")?;
589                Ok(AofRecord::Incr { key })
590            }
591            TAG_DECR => {
592                let key = read_string(&mut cursor, "key")?;
593                Ok(AofRecord::Decr { key })
594            }
595            TAG_HSET => {
596                let key = read_string(&mut cursor, "key")?;
597                let count = format::read_u32(&mut cursor)?;
598                format::validate_collection_count(count, "hash")?;
599                let mut fields = Vec::with_capacity(format::capped_capacity(count));
600                for _ in 0..count {
601                    let field = read_string(&mut cursor, "field")?;
602                    let value = Bytes::from(format::read_bytes(&mut cursor)?);
603                    fields.push((field, value));
604                }
605                Ok(AofRecord::HSet { key, fields })
606            }
607            TAG_HDEL => {
608                let key = read_string(&mut cursor, "key")?;
609                let fields = read_string_list(&mut cursor, "field")?;
610                Ok(AofRecord::HDel { key, fields })
611            }
612            TAG_HINCRBY => {
613                let key = read_string(&mut cursor, "key")?;
614                let field = read_string(&mut cursor, "field")?;
615                let delta = format::read_i64(&mut cursor)?;
616                Ok(AofRecord::HIncrBy { key, field, delta })
617            }
618            TAG_SADD => {
619                let key = read_string(&mut cursor, "key")?;
620                let members = read_string_list(&mut cursor, "member")?;
621                Ok(AofRecord::SAdd { key, members })
622            }
623            TAG_SREM => {
624                let key = read_string(&mut cursor, "key")?;
625                let members = read_string_list(&mut cursor, "member")?;
626                Ok(AofRecord::SRem { key, members })
627            }
628            TAG_INCRBY => {
629                let key = read_string(&mut cursor, "key")?;
630                let delta = format::read_i64(&mut cursor)?;
631                Ok(AofRecord::IncrBy { key, delta })
632            }
633            TAG_DECRBY => {
634                let key = read_string(&mut cursor, "key")?;
635                let delta = format::read_i64(&mut cursor)?;
636                Ok(AofRecord::DecrBy { key, delta })
637            }
638            TAG_APPEND => {
639                let key = read_string(&mut cursor, "key")?;
640                let value = Bytes::from(format::read_bytes(&mut cursor)?);
641                Ok(AofRecord::Append { key, value })
642            }
643            TAG_RENAME => {
644                let key = read_string(&mut cursor, "key")?;
645                let newkey = read_string(&mut cursor, "newkey")?;
646                Ok(AofRecord::Rename { key, newkey })
647            }
648            #[cfg(feature = "vector")]
649            TAG_VADD => {
650                let key = read_string(&mut cursor, "key")?;
651                let element = read_string(&mut cursor, "element")?;
652                let dim = format::read_u32(&mut cursor)?;
653                if dim > format::MAX_PERSISTED_VECTOR_DIMS {
654                    return Err(FormatError::InvalidData(format!(
655                        "AOF VADD dimension {dim} exceeds max {}",
656                        format::MAX_PERSISTED_VECTOR_DIMS
657                    )));
658                }
659                let mut vector = Vec::with_capacity(dim as usize);
660                for _ in 0..dim {
661                    vector.push(format::read_f32(&mut cursor)?);
662                }
663                let metric = format::read_u8(&mut cursor)?;
664                let quantization = format::read_u8(&mut cursor)?;
665                let connectivity = format::read_u32(&mut cursor)?;
666                let expansion_add = format::read_u32(&mut cursor)?;
667                Ok(AofRecord::VAdd {
668                    key,
669                    element,
670                    vector,
671                    metric,
672                    quantization,
673                    connectivity,
674                    expansion_add,
675                })
676            }
677            #[cfg(feature = "vector")]
678            TAG_VREM => {
679                let key = read_string(&mut cursor, "key")?;
680                let element = read_string(&mut cursor, "element")?;
681                Ok(AofRecord::VRem { key, element })
682            }
683            #[cfg(feature = "protobuf")]
684            TAG_PROTO_SET => {
685                let key = read_string(&mut cursor, "key")?;
686                let type_name = read_string(&mut cursor, "type_name")?;
687                let data = format::read_bytes(&mut cursor)?;
688                let expire_ms = format::read_i64(&mut cursor)?;
689                Ok(AofRecord::ProtoSet {
690                    key,
691                    type_name,
692                    data: Bytes::from(data),
693                    expire_ms,
694                })
695            }
696            #[cfg(feature = "protobuf")]
697            TAG_PROTO_REGISTER => {
698                let name = read_string(&mut cursor, "name")?;
699                let descriptor = format::read_bytes(&mut cursor)?;
700                Ok(AofRecord::ProtoRegister {
701                    name,
702                    descriptor: Bytes::from(descriptor),
703                })
704            }
705            _ => Err(FormatError::UnknownTag(tag)),
706        }
707    }
708}
709
/// How aggressively the AOF writer forces written data to disk.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum FsyncPolicy {
    /// Fsync after every write: the safest choice and the slowest.
    Always,
    /// Fsync once per second, driven by the shard tick. This is the default.
    #[default]
    EverySec,
    /// Never fsync explicitly and leave flushing to the OS: the fastest
    /// choice and the least durable.
    No,
}
721
/// Appends AOF records to a file through an in-memory write buffer.
pub struct AofWriter {
    /// Buffered handle to the open AOF file.
    writer: BufWriter<File>,
    /// Location of the AOF file on disk.
    path: PathBuf,
    /// When set, records are encrypted with this key before being written.
    #[cfg(feature = "encryption")]
    encryption_key: Option<crate::encryption::EncryptionKey>,
}
729
730impl AofWriter {
731    /// Opens (or creates) an AOF file. If the file is new, writes the header.
732    /// If the file already exists, appends to it.
733    pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
734        let path = path.into();
735        let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
736
737        let file = open_persistence_file(&path)?;
738        let mut writer = BufWriter::new(file);
739
740        if !exists {
741            format::write_header(&mut writer, format::AOF_MAGIC)?;
742            writer.flush()?;
743        }
744
745        Ok(Self {
746            writer,
747            path,
748            #[cfg(feature = "encryption")]
749            encryption_key: None,
750        })
751    }
752
753    /// Opens (or creates) an encrypted AOF file using AES-256-GCM.
754    ///
755    /// New files get a v3 header. Existing v2 files can be appended to —
756    /// new records will be written unencrypted (use `BGREWRITEAOF` to
757    /// migrate the full file to v3).
758    #[cfg(feature = "encryption")]
759    pub fn open_encrypted(
760        path: impl Into<PathBuf>,
761        key: crate::encryption::EncryptionKey,
762    ) -> Result<Self, FormatError> {
763        let path = path.into();
764        let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
765
766        let file = open_persistence_file(&path)?;
767        let mut writer = BufWriter::new(file);
768
769        if !exists {
770            format::write_header_versioned(
771                &mut writer,
772                format::AOF_MAGIC,
773                format::FORMAT_VERSION_ENCRYPTED,
774            )?;
775            writer.flush()?;
776        }
777
778        Ok(Self {
779            writer,
780            path,
781            encryption_key: Some(key),
782        })
783    }
784
785    /// Appends a record to the AOF.
786    ///
787    /// When an encryption key is set, writes: `[nonce: 12B][len: 4B][ciphertext]`.
788    /// Otherwise writes the v2 format: `[tag+payload][crc32: 4B]`.
789    pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
790        let payload = record.to_bytes()?;
791
792        #[cfg(feature = "encryption")]
793        if let Some(ref key) = self.encryption_key {
794            let (nonce, ciphertext) = crate::encryption::encrypt_record(key, &payload)?;
795            self.writer.write_all(&nonce)?;
796            format::write_len(&mut self.writer, ciphertext.len())?;
797            self.writer.write_all(&ciphertext)?;
798            return Ok(());
799        }
800
801        let checksum = format::crc32(&payload);
802        self.writer.write_all(&payload)?;
803        format::write_u32(&mut self.writer, checksum)?;
804        Ok(())
805    }
806
807    /// Flushes the internal buffer to the OS.
808    pub fn flush(&mut self) -> Result<(), FormatError> {
809        self.writer.flush()?;
810        Ok(())
811    }
812
813    /// Flushes and fsyncs the file to disk.
814    pub fn sync(&mut self) -> Result<(), FormatError> {
815        self.writer.flush()?;
816        self.writer.get_ref().sync_all()?;
817        Ok(())
818    }
819
820    /// Returns the file path.
821    pub fn path(&self) -> &Path {
822        &self.path
823    }
824
825    /// Truncates the AOF file back to just the header.
826    ///
827    /// Uses write-to-temp-then-rename for crash safety: the old AOF
828    /// remains intact until the new file (with only a header) is fully
829    /// synced and atomically renamed into place.
830    pub fn truncate(&mut self) -> Result<(), FormatError> {
831        // flush the old writer so no data is in the BufWriter
832        self.writer.flush()?;
833
834        // write a fresh header to a temp file next to the real AOF
835        let tmp_path = self.path.with_extension("aof.tmp");
836        let mut opts = OpenOptions::new();
837        opts.create(true).write(true).truncate(true);
838        #[cfg(unix)]
839        {
840            use std::os::unix::fs::OpenOptionsExt;
841            opts.mode(0o600);
842        }
843        let tmp_file = opts.open(&tmp_path)?;
844        let mut tmp_writer = BufWriter::new(tmp_file);
845
846        #[cfg(feature = "encryption")]
847        if self.encryption_key.is_some() {
848            format::write_header_versioned(
849                &mut tmp_writer,
850                format::AOF_MAGIC,
851                format::FORMAT_VERSION_ENCRYPTED,
852            )?;
853        } else {
854            format::write_header(&mut tmp_writer, format::AOF_MAGIC)?;
855        }
856        #[cfg(not(feature = "encryption"))]
857        format::write_header(&mut tmp_writer, format::AOF_MAGIC)?;
858
859        tmp_writer.flush()?;
860        tmp_writer.get_ref().sync_all()?;
861
862        // atomic rename: old AOF is replaced only after new file is durable
863        std::fs::rename(&tmp_path, &self.path)?;
864
865        // reopen for appending
866        let file = OpenOptions::new().append(true).open(&self.path)?;
867        self.writer = BufWriter::new(file);
868        Ok(())
869    }
870}
871
/// Sequential reader over the records of a single AOF file.
pub struct AofReader {
    /// Buffered file handle; the constructors consume the header before
    /// storing it here.
    reader: BufReader<File>,
    /// Version byte taken from the file header. v2 = plaintext, v3 = encrypted.
    version: u8,
    /// Key used for v3 records; `None` when the reader was opened without one.
    #[cfg(feature = "encryption")]
    encryption_key: Option<crate::encryption::EncryptionKey>,
}
880
881impl fmt::Debug for AofReader {
882    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
883        f.debug_struct("AofReader")
884            .field("version", &self.version)
885            .finish()
886    }
887}
888
889impl AofReader {
890    /// Opens an AOF file and validates the header.
891    pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
892        let file = File::open(path.as_ref())?;
893        let mut reader = BufReader::new(file);
894        let version = format::read_header(&mut reader, format::AOF_MAGIC)?;
895
896        if version == format::FORMAT_VERSION_ENCRYPTED {
897            return Err(FormatError::EncryptionRequired);
898        }
899
900        Ok(Self {
901            reader,
902            version,
903            #[cfg(feature = "encryption")]
904            encryption_key: None,
905        })
906    }
907
908    /// Opens an AOF file with an encryption key for decrypting v3 records.
909    ///
910    /// Also handles v2 (plaintext) files — the key is simply unused,
911    /// allowing transparent migration.
912    #[cfg(feature = "encryption")]
913    pub fn open_encrypted(
914        path: impl AsRef<Path>,
915        key: crate::encryption::EncryptionKey,
916    ) -> Result<Self, FormatError> {
917        let file = File::open(path.as_ref())?;
918        let mut reader = BufReader::new(file);
919        let version = format::read_header(&mut reader, format::AOF_MAGIC)?;
920
921        Ok(Self {
922            reader,
923            version,
924            encryption_key: Some(key),
925        })
926    }
927
928    /// Reads the next record from the AOF.
929    ///
930    /// Returns `Ok(None)` at end-of-file. On a truncated record (the
931    /// server crashed mid-write), returns `Ok(None)` rather than an error
932    /// — this is the expected recovery behavior.
933    pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
934        #[cfg(feature = "encryption")]
935        if self.version == format::FORMAT_VERSION_ENCRYPTED {
936            return self.read_encrypted_record();
937        }
938
939        self.read_v2_record()
940    }
941
942    /// Reads a v2 (plaintext) record: tag + payload + crc32.
943    fn read_v2_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
944        // peek for EOF — try reading the tag byte
945        let tag = match format::read_u8(&mut self.reader) {
946            Ok(t) => t,
947            Err(FormatError::UnexpectedEof) => return Ok(None),
948            Err(e) => return Err(e),
949        };
950
951        // read the rest of the payload based on tag, building the full
952        // record bytes for CRC verification
953        let record_result = self.read_payload_for_tag(tag);
954        match record_result {
955            Ok((payload, stored_crc)) => {
956                // prepend the tag to the payload for CRC check
957                let mut full = Vec::with_capacity(1 + payload.len());
958                full.push(tag);
959                full.extend_from_slice(&payload);
960                format::verify_crc32(&full, stored_crc)?;
961                AofRecord::from_bytes(&full).map(Some)
962            }
963            // truncated record — treat as end of usable data
964            Err(FormatError::UnexpectedEof) => Ok(None),
965            Err(e) => Err(e),
966        }
967    }
968
969    /// Reads a v3 (encrypted) record: nonce + len + ciphertext.
970    #[cfg(feature = "encryption")]
971    fn read_encrypted_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
972        let key = self
973            .encryption_key
974            .as_ref()
975            .ok_or(FormatError::EncryptionRequired)?;
976
977        // read the 12-byte nonce
978        let mut nonce = [0u8; crate::encryption::NONCE_SIZE];
979        if let Err(e) = self.reader.read_exact(&mut nonce) {
980            return if e.kind() == io::ErrorKind::UnexpectedEof {
981                Ok(None)
982            } else {
983                Err(FormatError::Io(e))
984            };
985        }
986
987        // read ciphertext length and ciphertext
988        let ct_len = match format::read_u32(&mut self.reader) {
989            Ok(n) => n as usize,
990            Err(FormatError::UnexpectedEof) => return Ok(None),
991            Err(e) => return Err(e),
992        };
993
994        if ct_len > format::MAX_FIELD_LEN {
995            return Err(FormatError::Io(io::Error::new(
996                io::ErrorKind::InvalidData,
997                format!("encrypted record length {ct_len} exceeds maximum"),
998            )));
999        }
1000
1001        let mut ciphertext = vec![0u8; ct_len];
1002        if let Err(e) = self.reader.read_exact(&mut ciphertext) {
1003            return if e.kind() == io::ErrorKind::UnexpectedEof {
1004                Ok(None)
1005            } else {
1006                Err(FormatError::Io(e))
1007            };
1008        }
1009
1010        let plaintext = crate::encryption::decrypt_record(key, &nonce, &ciphertext)?;
1011        AofRecord::from_bytes(&plaintext).map(Some)
1012    }
1013
1014    /// Reads the remaining payload bytes (after the tag) and the trailing CRC.
1015    fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
1016        let mut payload = Vec::new();
1017        match tag {
1018            TAG_SET => {
1019                // key_len + key + value_len + value + expire_ms
1020                let key = format::read_bytes(&mut self.reader)?;
1021                format::write_bytes(&mut payload, &key)?;
1022                let value = format::read_bytes(&mut self.reader)?;
1023                format::write_bytes(&mut payload, &value)?;
1024                let expire_ms = format::read_i64(&mut self.reader)?;
1025                format::write_i64(&mut payload, expire_ms)?;
1026            }
1027            TAG_DEL => {
1028                let key = format::read_bytes(&mut self.reader)?;
1029                format::write_bytes(&mut payload, &key)?;
1030            }
1031            TAG_EXPIRE => {
1032                let key = format::read_bytes(&mut self.reader)?;
1033                format::write_bytes(&mut payload, &key)?;
1034                let seconds = format::read_i64(&mut self.reader)?;
1035                format::write_i64(&mut payload, seconds)?;
1036            }
1037            TAG_LPUSH | TAG_RPUSH => {
1038                let key = format::read_bytes(&mut self.reader)?;
1039                format::write_bytes(&mut payload, &key)?;
1040                let count = format::read_u32(&mut self.reader)?;
1041                format::validate_collection_count(count, "list")?;
1042                format::write_u32(&mut payload, count)?;
1043                for _ in 0..count {
1044                    let val = format::read_bytes(&mut self.reader)?;
1045                    format::write_bytes(&mut payload, &val)?;
1046                }
1047            }
1048            TAG_LPOP | TAG_RPOP => {
1049                let key = format::read_bytes(&mut self.reader)?;
1050                format::write_bytes(&mut payload, &key)?;
1051            }
1052            TAG_ZADD => {
1053                let key = format::read_bytes(&mut self.reader)?;
1054                format::write_bytes(&mut payload, &key)?;
1055                let count = format::read_u32(&mut self.reader)?;
1056                format::validate_collection_count(count, "sorted set")?;
1057                format::write_u32(&mut payload, count)?;
1058                for _ in 0..count {
1059                    let score = format::read_f64(&mut self.reader)?;
1060                    format::write_f64(&mut payload, score)?;
1061                    let member = format::read_bytes(&mut self.reader)?;
1062                    format::write_bytes(&mut payload, &member)?;
1063                }
1064            }
1065            TAG_ZREM => {
1066                let key = format::read_bytes(&mut self.reader)?;
1067                format::write_bytes(&mut payload, &key)?;
1068                let count = format::read_u32(&mut self.reader)?;
1069                format::validate_collection_count(count, "sorted set")?;
1070                format::write_u32(&mut payload, count)?;
1071                for _ in 0..count {
1072                    let member = format::read_bytes(&mut self.reader)?;
1073                    format::write_bytes(&mut payload, &member)?;
1074                }
1075            }
1076            TAG_PERSIST => {
1077                let key = format::read_bytes(&mut self.reader)?;
1078                format::write_bytes(&mut payload, &key)?;
1079            }
1080            TAG_PEXPIRE => {
1081                let key = format::read_bytes(&mut self.reader)?;
1082                format::write_bytes(&mut payload, &key)?;
1083                let millis = format::read_i64(&mut self.reader)?;
1084                format::write_i64(&mut payload, millis)?;
1085            }
1086            TAG_INCR | TAG_DECR => {
1087                let key = format::read_bytes(&mut self.reader)?;
1088                format::write_bytes(&mut payload, &key)?;
1089            }
1090            TAG_HSET => {
1091                let key = format::read_bytes(&mut self.reader)?;
1092                format::write_bytes(&mut payload, &key)?;
1093                let count = format::read_u32(&mut self.reader)?;
1094                format::validate_collection_count(count, "hash")?;
1095                format::write_u32(&mut payload, count)?;
1096                for _ in 0..count {
1097                    let field = format::read_bytes(&mut self.reader)?;
1098                    format::write_bytes(&mut payload, &field)?;
1099                    let value = format::read_bytes(&mut self.reader)?;
1100                    format::write_bytes(&mut payload, &value)?;
1101                }
1102            }
1103            TAG_HDEL | TAG_SADD | TAG_SREM => {
1104                let key = format::read_bytes(&mut self.reader)?;
1105                format::write_bytes(&mut payload, &key)?;
1106                let count = format::read_u32(&mut self.reader)?;
1107                format::validate_collection_count(count, "set")?;
1108                format::write_u32(&mut payload, count)?;
1109                for _ in 0..count {
1110                    let item = format::read_bytes(&mut self.reader)?;
1111                    format::write_bytes(&mut payload, &item)?;
1112                }
1113            }
1114            TAG_HINCRBY => {
1115                let key = format::read_bytes(&mut self.reader)?;
1116                format::write_bytes(&mut payload, &key)?;
1117                let field = format::read_bytes(&mut self.reader)?;
1118                format::write_bytes(&mut payload, &field)?;
1119                let delta = format::read_i64(&mut self.reader)?;
1120                format::write_i64(&mut payload, delta)?;
1121            }
1122            TAG_INCRBY | TAG_DECRBY => {
1123                let key = format::read_bytes(&mut self.reader)?;
1124                format::write_bytes(&mut payload, &key)?;
1125                let delta = format::read_i64(&mut self.reader)?;
1126                format::write_i64(&mut payload, delta)?;
1127            }
1128            TAG_APPEND => {
1129                let key = format::read_bytes(&mut self.reader)?;
1130                format::write_bytes(&mut payload, &key)?;
1131                let value = format::read_bytes(&mut self.reader)?;
1132                format::write_bytes(&mut payload, &value)?;
1133            }
1134            TAG_RENAME => {
1135                let key = format::read_bytes(&mut self.reader)?;
1136                format::write_bytes(&mut payload, &key)?;
1137                let newkey = format::read_bytes(&mut self.reader)?;
1138                format::write_bytes(&mut payload, &newkey)?;
1139            }
1140            #[cfg(feature = "vector")]
1141            TAG_VADD => {
1142                let key = format::read_bytes(&mut self.reader)?;
1143                format::write_bytes(&mut payload, &key)?;
1144                let element = format::read_bytes(&mut self.reader)?;
1145                format::write_bytes(&mut payload, &element)?;
1146                let dim = format::read_u32(&mut self.reader)?;
1147                if dim > format::MAX_PERSISTED_VECTOR_DIMS {
1148                    return Err(FormatError::InvalidData(format!(
1149                        "AOF VADD dimension {dim} exceeds max {}",
1150                        format::MAX_PERSISTED_VECTOR_DIMS
1151                    )));
1152                }
1153                format::write_u32(&mut payload, dim)?;
1154                for _ in 0..dim {
1155                    let v = format::read_f32(&mut self.reader)?;
1156                    format::write_f32(&mut payload, v)?;
1157                }
1158                let metric = format::read_u8(&mut self.reader)?;
1159                format::write_u8(&mut payload, metric)?;
1160                let quantization = format::read_u8(&mut self.reader)?;
1161                format::write_u8(&mut payload, quantization)?;
1162                let connectivity = format::read_u32(&mut self.reader)?;
1163                format::write_u32(&mut payload, connectivity)?;
1164                let expansion_add = format::read_u32(&mut self.reader)?;
1165                format::write_u32(&mut payload, expansion_add)?;
1166            }
1167            #[cfg(feature = "vector")]
1168            TAG_VREM => {
1169                let key = format::read_bytes(&mut self.reader)?;
1170                format::write_bytes(&mut payload, &key)?;
1171                let element = format::read_bytes(&mut self.reader)?;
1172                format::write_bytes(&mut payload, &element)?;
1173            }
1174            #[cfg(feature = "protobuf")]
1175            TAG_PROTO_SET => {
1176                let key = format::read_bytes(&mut self.reader)?;
1177                format::write_bytes(&mut payload, &key)?;
1178                let type_name = format::read_bytes(&mut self.reader)?;
1179                format::write_bytes(&mut payload, &type_name)?;
1180                let data = format::read_bytes(&mut self.reader)?;
1181                format::write_bytes(&mut payload, &data)?;
1182                let expire_ms = format::read_i64(&mut self.reader)?;
1183                format::write_i64(&mut payload, expire_ms)?;
1184            }
1185            #[cfg(feature = "protobuf")]
1186            TAG_PROTO_REGISTER => {
1187                let name = format::read_bytes(&mut self.reader)?;
1188                format::write_bytes(&mut payload, &name)?;
1189                let descriptor = format::read_bytes(&mut self.reader)?;
1190                format::write_bytes(&mut payload, &descriptor)?;
1191            }
1192            _ => return Err(FormatError::UnknownTag(tag)),
1193        }
1194        let stored_crc = format::read_u32(&mut self.reader)?;
1195        Ok((payload, stored_crc))
1196    }
1197}
1198
1199/// Opens a persistence file with create+append and restrictive permissions.
1200fn open_persistence_file(path: &Path) -> Result<File, FormatError> {
1201    let mut opts = OpenOptions::new();
1202    opts.create(true).append(true);
1203    #[cfg(unix)]
1204    {
1205        use std::os::unix::fs::OpenOptionsExt;
1206        opts.mode(0o600);
1207    }
1208    Ok(opts.open(path)?)
1209}
1210
/// Builds the per-shard AOF location: `<data_dir>/shard-<shard_id>.aof`.
pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
    let file_name = format!("shard-{shard_id}.aof");
    data_dir.join(file_name)
}
1215
1216#[cfg(test)]
1217mod tests {
1218    use super::*;
1219
1220    type Result = std::result::Result<(), Box<dyn std::error::Error>>;
1221
1222    fn temp_dir() -> tempfile::TempDir {
1223        tempfile::tempdir().expect("create temp dir")
1224    }
1225
1226    #[test]
1227    fn record_round_trip_set() -> Result {
1228        let rec = AofRecord::Set {
1229            key: "hello".into(),
1230            value: Bytes::from("world"),
1231            expire_ms: 5000,
1232        };
1233        let bytes = rec.to_bytes()?;
1234        let decoded = AofRecord::from_bytes(&bytes)?;
1235        assert_eq!(rec, decoded);
1236        Ok(())
1237    }
1238
1239    #[test]
1240    fn record_round_trip_del() -> Result {
1241        let rec = AofRecord::Del { key: "gone".into() };
1242        let bytes = rec.to_bytes()?;
1243        let decoded = AofRecord::from_bytes(&bytes)?;
1244        assert_eq!(rec, decoded);
1245        Ok(())
1246    }
1247
1248    #[test]
1249    fn record_round_trip_expire() -> Result {
1250        let rec = AofRecord::Expire {
1251            key: "ttl".into(),
1252            seconds: 300,
1253        };
1254        let bytes = rec.to_bytes()?;
1255        let decoded = AofRecord::from_bytes(&bytes)?;
1256        assert_eq!(rec, decoded);
1257        Ok(())
1258    }
1259
1260    #[test]
1261    fn set_with_no_expiry() -> Result {
1262        let rec = AofRecord::Set {
1263            key: "k".into(),
1264            value: Bytes::from("v"),
1265            expire_ms: -1,
1266        };
1267        let bytes = rec.to_bytes()?;
1268        let decoded = AofRecord::from_bytes(&bytes)?;
1269        assert_eq!(rec, decoded);
1270        Ok(())
1271    }
1272
1273    #[test]
1274    fn writer_reader_round_trip() -> Result {
1275        let dir = temp_dir();
1276        let path = dir.path().join("test.aof");
1277
1278        let records = vec![
1279            AofRecord::Set {
1280                key: "a".into(),
1281                value: Bytes::from("1"),
1282                expire_ms: -1,
1283            },
1284            AofRecord::Set {
1285                key: "b".into(),
1286                value: Bytes::from("2"),
1287                expire_ms: 10_000,
1288            },
1289            AofRecord::Del { key: "a".into() },
1290            AofRecord::Expire {
1291                key: "b".into(),
1292                seconds: 60,
1293            },
1294        ];
1295
1296        // write
1297        {
1298            let mut writer = AofWriter::open(&path)?;
1299            for rec in &records {
1300                writer.write_record(rec)?;
1301            }
1302            writer.sync()?;
1303        }
1304
1305        // read back
1306        let mut reader = AofReader::open(&path)?;
1307        let mut got = Vec::new();
1308        while let Some(rec) = reader.read_record()? {
1309            got.push(rec);
1310        }
1311        assert_eq!(records, got);
1312        Ok(())
1313    }
1314
1315    #[test]
1316    fn empty_aof_returns_no_records() -> Result {
1317        let dir = temp_dir();
1318        let path = dir.path().join("empty.aof");
1319
1320        // just write the header
1321        {
1322            let _writer = AofWriter::open(&path)?;
1323        }
1324
1325        let mut reader = AofReader::open(&path)?;
1326        assert!(reader.read_record()?.is_none());
1327        Ok(())
1328    }
1329
1330    #[test]
1331    fn truncated_record_treated_as_eof() -> Result {
1332        let dir = temp_dir();
1333        let path = dir.path().join("trunc.aof");
1334
1335        // write one good record, then append garbage (simulating a crash)
1336        {
1337            let mut writer = AofWriter::open(&path)?;
1338            writer.write_record(&AofRecord::Set {
1339                key: "ok".into(),
1340                value: Bytes::from("good"),
1341                expire_ms: -1,
1342            })?;
1343            writer.flush()?;
1344        }
1345
1346        // append a partial tag with no payload
1347        {
1348            let mut file = OpenOptions::new().append(true).open(&path)?;
1349            file.write_all(&[TAG_SET])?;
1350        }
1351
1352        let mut reader = AofReader::open(&path)?;
1353        // first record should be fine
1354        let rec = reader.read_record()?.unwrap();
1355        assert!(matches!(rec, AofRecord::Set { .. }));
1356        // second should be None (truncated)
1357        assert!(reader.read_record()?.is_none());
1358        Ok(())
1359    }
1360
1361    #[test]
1362    fn corrupt_crc_detected() -> Result {
1363        let dir = temp_dir();
1364        let path = dir.path().join("corrupt.aof");
1365
1366        {
1367            let mut writer = AofWriter::open(&path)?;
1368            writer.write_record(&AofRecord::Set {
1369                key: "k".into(),
1370                value: Bytes::from("v"),
1371                expire_ms: -1,
1372            })?;
1373            writer.flush()?;
1374        }
1375
1376        // corrupt the last byte (part of the CRC)
1377        let mut data = fs::read(&path)?;
1378        let last = data.len() - 1;
1379        data[last] ^= 0xFF;
1380        fs::write(&path, &data)?;
1381
1382        let mut reader = AofReader::open(&path)?;
1383        let err = reader.read_record().unwrap_err();
1384        assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
1385        Ok(())
1386    }
1387
1388    #[test]
1389    fn missing_magic_is_error() {
1390        let dir = temp_dir();
1391        let path = dir.path().join("bad.aof");
1392        fs::write(&path, b"NOT_AOF_DATA").unwrap();
1393
1394        let err = AofReader::open(&path).unwrap_err();
1395        assert!(matches!(err, FormatError::InvalidMagic));
1396    }
1397
1398    #[test]
1399    fn truncate_resets_aof() -> Result {
1400        let dir = temp_dir();
1401        let path = dir.path().join("reset.aof");
1402
1403        {
1404            let mut writer = AofWriter::open(&path)?;
1405            writer.write_record(&AofRecord::Set {
1406                key: "old".into(),
1407                value: Bytes::from("data"),
1408                expire_ms: -1,
1409            })?;
1410            writer.truncate()?;
1411
1412            // write a new record after truncation
1413            writer.write_record(&AofRecord::Set {
1414                key: "new".into(),
1415                value: Bytes::from("fresh"),
1416                expire_ms: -1,
1417            })?;
1418            writer.sync()?;
1419        }
1420
1421        let mut reader = AofReader::open(&path)?;
1422        let rec = reader.read_record()?.unwrap();
1423        match rec {
1424            AofRecord::Set { key, .. } => assert_eq!(key, "new"),
1425            other => panic!("expected Set, got {other:?}"),
1426        }
1427        // only one record after truncation
1428        assert!(reader.read_record()?.is_none());
1429        Ok(())
1430    }
1431
1432    #[test]
1433    fn record_round_trip_lpush() -> Result {
1434        let rec = AofRecord::LPush {
1435            key: "list".into(),
1436            values: vec![Bytes::from("a"), Bytes::from("b")],
1437        };
1438        let bytes = rec.to_bytes()?;
1439        let decoded = AofRecord::from_bytes(&bytes)?;
1440        assert_eq!(rec, decoded);
1441        Ok(())
1442    }
1443
1444    #[test]
1445    fn record_round_trip_rpush() -> Result {
1446        let rec = AofRecord::RPush {
1447            key: "list".into(),
1448            values: vec![Bytes::from("x")],
1449        };
1450        let bytes = rec.to_bytes()?;
1451        let decoded = AofRecord::from_bytes(&bytes)?;
1452        assert_eq!(rec, decoded);
1453        Ok(())
1454    }
1455
1456    #[test]
1457    fn record_round_trip_lpop() -> Result {
1458        let rec = AofRecord::LPop { key: "list".into() };
1459        let bytes = rec.to_bytes()?;
1460        let decoded = AofRecord::from_bytes(&bytes)?;
1461        assert_eq!(rec, decoded);
1462        Ok(())
1463    }
1464
1465    #[test]
1466    fn record_round_trip_rpop() -> Result {
1467        let rec = AofRecord::RPop { key: "list".into() };
1468        let bytes = rec.to_bytes()?;
1469        let decoded = AofRecord::from_bytes(&bytes)?;
1470        assert_eq!(rec, decoded);
1471        Ok(())
1472    }
1473
1474    #[test]
1475    fn writer_reader_round_trip_with_list_records() -> Result {
1476        let dir = temp_dir();
1477        let path = dir.path().join("list.aof");
1478
1479        let records = vec![
1480            AofRecord::LPush {
1481                key: "l".into(),
1482                values: vec![Bytes::from("a"), Bytes::from("b")],
1483            },
1484            AofRecord::RPush {
1485                key: "l".into(),
1486                values: vec![Bytes::from("c")],
1487            },
1488            AofRecord::LPop { key: "l".into() },
1489            AofRecord::RPop { key: "l".into() },
1490        ];
1491
1492        {
1493            let mut writer = AofWriter::open(&path)?;
1494            for rec in &records {
1495                writer.write_record(rec)?;
1496            }
1497            writer.sync()?;
1498        }
1499
1500        let mut reader = AofReader::open(&path)?;
1501        let mut got = Vec::new();
1502        while let Some(rec) = reader.read_record()? {
1503            got.push(rec);
1504        }
1505        assert_eq!(records, got);
1506        Ok(())
1507    }
1508
1509    #[test]
1510    fn record_round_trip_zadd() -> Result {
1511        let rec = AofRecord::ZAdd {
1512            key: "board".into(),
1513            members: vec![(100.0, "alice".into()), (200.5, "bob".into())],
1514        };
1515        let bytes = rec.to_bytes()?;
1516        let decoded = AofRecord::from_bytes(&bytes)?;
1517        assert_eq!(rec, decoded);
1518        Ok(())
1519    }
1520
1521    #[test]
1522    fn record_round_trip_zrem() -> Result {
1523        let rec = AofRecord::ZRem {
1524            key: "board".into(),
1525            members: vec!["alice".into(), "bob".into()],
1526        };
1527        let bytes = rec.to_bytes()?;
1528        let decoded = AofRecord::from_bytes(&bytes)?;
1529        assert_eq!(rec, decoded);
1530        Ok(())
1531    }
1532
1533    #[test]
1534    fn writer_reader_round_trip_with_sorted_set_records() -> Result {
1535        let dir = temp_dir();
1536        let path = dir.path().join("zset.aof");
1537
1538        let records = vec![
1539            AofRecord::ZAdd {
1540                key: "board".into(),
1541                members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
1542            },
1543            AofRecord::ZRem {
1544                key: "board".into(),
1545                members: vec!["alice".into()],
1546            },
1547        ];
1548
1549        {
1550            let mut writer = AofWriter::open(&path)?;
1551            for rec in &records {
1552                writer.write_record(rec)?;
1553            }
1554            writer.sync()?;
1555        }
1556
1557        let mut reader = AofReader::open(&path)?;
1558        let mut got = Vec::new();
1559        while let Some(rec) = reader.read_record()? {
1560            got.push(rec);
1561        }
1562        assert_eq!(records, got);
1563        Ok(())
1564    }
1565
1566    #[test]
1567    fn record_round_trip_persist() -> Result {
1568        let rec = AofRecord::Persist {
1569            key: "mykey".into(),
1570        };
1571        let bytes = rec.to_bytes()?;
1572        let decoded = AofRecord::from_bytes(&bytes)?;
1573        assert_eq!(rec, decoded);
1574        Ok(())
1575    }
1576
1577    #[test]
1578    fn record_round_trip_pexpire() -> Result {
1579        let rec = AofRecord::Pexpire {
1580            key: "mykey".into(),
1581            milliseconds: 5000,
1582        };
1583        let bytes = rec.to_bytes()?;
1584        let decoded = AofRecord::from_bytes(&bytes)?;
1585        assert_eq!(rec, decoded);
1586        Ok(())
1587    }
1588
1589    #[test]
1590    fn record_round_trip_incr() -> Result {
1591        let rec = AofRecord::Incr {
1592            key: "counter".into(),
1593        };
1594        let bytes = rec.to_bytes()?;
1595        let decoded = AofRecord::from_bytes(&bytes)?;
1596        assert_eq!(rec, decoded);
1597        Ok(())
1598    }
1599
1600    #[test]
1601    fn record_round_trip_decr() -> Result {
1602        let rec = AofRecord::Decr {
1603            key: "counter".into(),
1604        };
1605        let bytes = rec.to_bytes()?;
1606        let decoded = AofRecord::from_bytes(&bytes)?;
1607        assert_eq!(rec, decoded);
1608        Ok(())
1609    }
1610
1611    #[test]
1612    fn writer_reader_round_trip_with_persist_pexpire() -> Result {
1613        let dir = temp_dir();
1614        let path = dir.path().join("persist_pexpire.aof");
1615
1616        let records = vec![
1617            AofRecord::Set {
1618                key: "k".into(),
1619                value: Bytes::from("v"),
1620                expire_ms: 5000,
1621            },
1622            AofRecord::Persist { key: "k".into() },
1623            AofRecord::Pexpire {
1624                key: "k".into(),
1625                milliseconds: 3000,
1626            },
1627        ];
1628
1629        {
1630            let mut writer = AofWriter::open(&path)?;
1631            for rec in &records {
1632                writer.write_record(rec)?;
1633            }
1634            writer.sync()?;
1635        }
1636
1637        let mut reader = AofReader::open(&path)?;
1638        let mut got = Vec::new();
1639        while let Some(rec) = reader.read_record()? {
1640            got.push(rec);
1641        }
1642        assert_eq!(records, got);
1643        Ok(())
1644    }
1645
1646    #[test]
1647    fn aof_path_format() {
1648        let p = aof_path(Path::new("/data"), 3);
1649        assert_eq!(p, PathBuf::from("/data/shard-3.aof"));
1650    }
1651
1652    #[test]
1653    fn record_round_trip_hset() -> Result {
1654        let rec = AofRecord::HSet {
1655            key: "hash".into(),
1656            fields: vec![
1657                ("f1".into(), Bytes::from("v1")),
1658                ("f2".into(), Bytes::from("v2")),
1659            ],
1660        };
1661        let bytes = rec.to_bytes()?;
1662        let decoded = AofRecord::from_bytes(&bytes)?;
1663        assert_eq!(rec, decoded);
1664        Ok(())
1665    }
1666
1667    #[test]
1668    fn record_round_trip_hdel() -> Result {
1669        let rec = AofRecord::HDel {
1670            key: "hash".into(),
1671            fields: vec!["f1".into(), "f2".into()],
1672        };
1673        let bytes = rec.to_bytes()?;
1674        let decoded = AofRecord::from_bytes(&bytes)?;
1675        assert_eq!(rec, decoded);
1676        Ok(())
1677    }
1678
1679    #[test]
1680    fn record_round_trip_hincrby() -> Result {
1681        let rec = AofRecord::HIncrBy {
1682            key: "hash".into(),
1683            field: "counter".into(),
1684            delta: -42,
1685        };
1686        let bytes = rec.to_bytes()?;
1687        let decoded = AofRecord::from_bytes(&bytes)?;
1688        assert_eq!(rec, decoded);
1689        Ok(())
1690    }
1691
1692    #[test]
1693    fn record_round_trip_sadd() -> Result {
1694        let rec = AofRecord::SAdd {
1695            key: "set".into(),
1696            members: vec!["m1".into(), "m2".into(), "m3".into()],
1697        };
1698        let bytes = rec.to_bytes()?;
1699        let decoded = AofRecord::from_bytes(&bytes)?;
1700        assert_eq!(rec, decoded);
1701        Ok(())
1702    }
1703
1704    #[test]
1705    fn record_round_trip_srem() -> Result {
1706        let rec = AofRecord::SRem {
1707            key: "set".into(),
1708            members: vec!["m1".into()],
1709        };
1710        let bytes = rec.to_bytes()?;
1711        let decoded = AofRecord::from_bytes(&bytes)?;
1712        assert_eq!(rec, decoded);
1713        Ok(())
1714    }
1715
1716    #[cfg(feature = "vector")]
1717    #[test]
1718    fn record_round_trip_vadd() -> Result {
1719        let rec = AofRecord::VAdd {
1720            key: "embeddings".into(),
1721            element: "doc1".into(),
1722            vector: vec![0.1, 0.2, 0.3],
1723            metric: 0,       // cosine
1724            quantization: 0, // f32
1725            connectivity: 16,
1726            expansion_add: 64,
1727        };
1728        let bytes = rec.to_bytes()?;
1729        let decoded = AofRecord::from_bytes(&bytes)?;
1730        assert_eq!(rec, decoded);
1731        Ok(())
1732    }
1733
1734    #[cfg(feature = "vector")]
1735    #[test]
1736    fn record_round_trip_vadd_high_dim() -> Result {
1737        let rec = AofRecord::VAdd {
1738            key: "vecs".into(),
1739            element: "e".into(),
1740            vector: vec![0.0; 1536], // typical embedding dimension
1741            metric: 1,               // l2
1742            quantization: 1,         // f16
1743            connectivity: 32,
1744            expansion_add: 128,
1745        };
1746        let bytes = rec.to_bytes()?;
1747        let decoded = AofRecord::from_bytes(&bytes)?;
1748        assert_eq!(rec, decoded);
1749        Ok(())
1750    }
1751
1752    #[cfg(feature = "vector")]
1753    #[test]
1754    fn record_round_trip_vrem() -> Result {
1755        let rec = AofRecord::VRem {
1756            key: "embeddings".into(),
1757            element: "doc1".into(),
1758        };
1759        let bytes = rec.to_bytes()?;
1760        let decoded = AofRecord::from_bytes(&bytes)?;
1761        assert_eq!(rec, decoded);
1762        Ok(())
1763    }
1764
1765    #[cfg(feature = "encryption")]
1766    mod encrypted {
1767        use super::*;
1768        use crate::encryption::EncryptionKey;
1769
1770        type Result = std::result::Result<(), Box<dyn std::error::Error>>;
1771
1772        fn test_key() -> EncryptionKey {
1773            EncryptionKey::from_bytes([0x42; 32])
1774        }
1775
1776        #[test]
1777        fn encrypted_writer_reader_round_trip() -> Result {
1778            let dir = temp_dir();
1779            let path = dir.path().join("enc.aof");
1780            let key = test_key();
1781
1782            let records = vec![
1783                AofRecord::Set {
1784                    key: "a".into(),
1785                    value: Bytes::from("1"),
1786                    expire_ms: -1,
1787                },
1788                AofRecord::Del { key: "a".into() },
1789                AofRecord::LPush {
1790                    key: "list".into(),
1791                    values: vec![Bytes::from("x"), Bytes::from("y")],
1792                },
1793                AofRecord::ZAdd {
1794                    key: "zs".into(),
1795                    members: vec![(1.0, "m".into())],
1796                },
1797            ];
1798
1799            {
1800                let mut writer = AofWriter::open_encrypted(&path, key.clone())?;
1801                for rec in &records {
1802                    writer.write_record(rec)?;
1803                }
1804                writer.sync()?;
1805            }
1806
1807            let mut reader = AofReader::open_encrypted(&path, key)?;
1808            let mut got = Vec::new();
1809            while let Some(rec) = reader.read_record()? {
1810                got.push(rec);
1811            }
1812            assert_eq!(records, got);
1813            Ok(())
1814        }
1815
1816        #[test]
1817        fn encrypted_aof_wrong_key_fails() -> Result {
1818            let dir = temp_dir();
1819            let path = dir.path().join("enc_bad.aof");
1820            let key = test_key();
1821            let wrong_key = EncryptionKey::from_bytes([0xFF; 32]);
1822
1823            {
1824                let mut writer = AofWriter::open_encrypted(&path, key)?;
1825                writer.write_record(&AofRecord::Set {
1826                    key: "k".into(),
1827                    value: Bytes::from("v"),
1828                    expire_ms: -1,
1829                })?;
1830                writer.sync()?;
1831            }
1832
1833            let mut reader = AofReader::open_encrypted(&path, wrong_key)?;
1834            let err = reader.read_record().unwrap_err();
1835            assert!(matches!(err, FormatError::DecryptionFailed));
1836            Ok(())
1837        }
1838
1839        #[test]
1840        fn v2_file_readable_with_encryption_key() -> Result {
1841            let dir = temp_dir();
1842            let path = dir.path().join("v2.aof");
1843            let key = test_key();
1844
1845            // write a plaintext v2 file
1846            {
1847                let mut writer = AofWriter::open(&path)?;
1848                writer.write_record(&AofRecord::Set {
1849                    key: "k".into(),
1850                    value: Bytes::from("v"),
1851                    expire_ms: -1,
1852                })?;
1853                writer.sync()?;
1854            }
1855
1856            // read with encryption key — should work (v2 is plaintext)
1857            let mut reader = AofReader::open_encrypted(&path, key)?;
1858            let rec = reader.read_record()?.unwrap();
1859            assert!(matches!(rec, AofRecord::Set { .. }));
1860            Ok(())
1861        }
1862
1863        #[test]
1864        fn v3_file_without_key_returns_error() -> Result {
1865            let dir = temp_dir();
1866            let path = dir.path().join("v3_nokey.aof");
1867            let key = test_key();
1868
1869            // write an encrypted v3 file
1870            {
1871                let mut writer = AofWriter::open_encrypted(&path, key)?;
1872                writer.write_record(&AofRecord::Set {
1873                    key: "k".into(),
1874                    value: Bytes::from("v"),
1875                    expire_ms: -1,
1876                })?;
1877                writer.sync()?;
1878            }
1879
1880            // try to open without a key
1881            let err = AofReader::open(&path).unwrap_err();
1882            assert!(matches!(err, FormatError::EncryptionRequired));
1883            Ok(())
1884        }
1885
1886        #[test]
1887        fn encrypted_truncate_preserves_encryption() -> Result {
1888            let dir = temp_dir();
1889            let path = dir.path().join("enc_trunc.aof");
1890            let key = test_key();
1891
1892            {
1893                let mut writer = AofWriter::open_encrypted(&path, key.clone())?;
1894                writer.write_record(&AofRecord::Set {
1895                    key: "old".into(),
1896                    value: Bytes::from("data"),
1897                    expire_ms: -1,
1898                })?;
1899                writer.truncate()?;
1900
1901                writer.write_record(&AofRecord::Set {
1902                    key: "new".into(),
1903                    value: Bytes::from("fresh"),
1904                    expire_ms: -1,
1905                })?;
1906                writer.sync()?;
1907            }
1908
1909            let mut reader = AofReader::open_encrypted(&path, key)?;
1910            let rec = reader.read_record()?.unwrap();
1911            match rec {
1912                AofRecord::Set { key, .. } => assert_eq!(key, "new"),
1913                other => panic!("expected Set, got {other:?}"),
1914            }
1915            assert!(reader.read_record()?.is_none());
1916            Ok(())
1917        }
1918    }
1919}