Skip to main content

ember_persistence/
aof.rs

//! Append-only file for recording mutations.
//!
//! Each shard writes its own AOF file (`shard-{id}.aof`). A record is
//! appended only after its mutation has succeeded. The binary format frames
//! every record as a tag byte, a payload, and a CRC32 checksum.
//!
//! File layout:
//! ```text
//! [EAOF magic: 4B][version: 1B]
//! [record]*
//! ```
//!
//! Record layout:
//! ```text
//! [tag: 1B][payload...][crc32: 4B]
//! ```
//! The CRC32 covers the tag + payload bytes.

19use std::fmt;
20use std::fs::{self, File, OpenOptions};
21#[cfg(feature = "encryption")]
22use std::io::Read as _;
23use std::io::{self, BufReader, BufWriter, Write};
24use std::path::{Path, PathBuf};
25
26use bytes::Bytes;
27
28use crate::format::{self, FormatError};
29
30/// Reads a length-prefixed field and decodes it as UTF-8.
31fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
32    let bytes = format::read_bytes(r)?;
33    String::from_utf8(bytes).map_err(|_| {
34        FormatError::Io(io::Error::new(
35            io::ErrorKind::InvalidData,
36            format!("{field} is not valid utf-8"),
37        ))
38    })
39}
40
41/// Reads a count-prefixed list of strings: `[count: u32][string]*`.
42/// Used by SADD, SREM, HDEL, and ZREM deserialization.
43fn read_string_list(r: &mut impl io::Read, field: &str) -> Result<Vec<String>, FormatError> {
44    let count = format::read_u32(r)?;
45    format::validate_collection_count(count, field)?;
46    let mut items = Vec::with_capacity(format::capped_capacity(count));
47    for _ in 0..count {
48        items.push(read_string(r, field)?);
49    }
50    Ok(items)
51}
52
53/// Reads a count-prefixed list of raw byte blobs: `[count: u32][bytes]*`.
54/// Used by LPUSH and RPUSH deserialization.
55fn read_bytes_list(r: &mut impl io::Read, label: &str) -> Result<Vec<Bytes>, FormatError> {
56    let count = format::read_u32(r)?;
57    format::validate_collection_count(count, label)?;
58    let mut items = Vec::with_capacity(format::capped_capacity(count));
59    for _ in 0..count {
60        items.push(Bytes::from(format::read_bytes(r)?));
61    }
62    Ok(items)
63}
64
// -- record tags --
// Values are stable and must not change (on-disk format), and a tag value
// must never be reused for a different record type. Tags 1-35 are all
// assigned below (including the feature-gated ones), so the next new record
// type should take 36.

// string
const TAG_SET: u8 = 1;
const TAG_INCR: u8 = 12;
const TAG_DECR: u8 = 13;
const TAG_INCRBY: u8 = 19;
const TAG_DECRBY: u8 = 20;
const TAG_APPEND: u8 = 21;
const TAG_SETRANGE: u8 = 32;

// list
const TAG_LPUSH: u8 = 4;
const TAG_RPUSH: u8 = 5;
const TAG_LPOP: u8 = 6;
const TAG_RPOP: u8 = 7;
const TAG_LSET: u8 = 28;
const TAG_LTRIM: u8 = 29;
const TAG_LINSERT: u8 = 30;
const TAG_LREM: u8 = 31;

// sorted set
const TAG_ZADD: u8 = 8;
const TAG_ZREM: u8 = 9;

// hash
const TAG_HSET: u8 = 14;
const TAG_HDEL: u8 = 15;
const TAG_HINCRBY: u8 = 16;

// set
const TAG_SADD: u8 = 17;
const TAG_SREM: u8 = 18;

// key lifecycle (deletion, expiry, rename/copy)
const TAG_DEL: u8 = 2;
const TAG_EXPIRE: u8 = 3;
const TAG_PERSIST: u8 = 10;
const TAG_PEXPIRE: u8 = 11;
const TAG_PEXPIREAT: u8 = 35;
const TAG_RENAME: u8 = 22;
const TAG_COPY: u8 = 27;

// bitmap
const TAG_SETBIT: u8 = 33;
const TAG_BITOP: u8 = 34;

// vector
#[cfg(feature = "vector")]
const TAG_VADD: u8 = 25;
#[cfg(feature = "vector")]
const TAG_VREM: u8 = 26;

// protobuf
#[cfg(feature = "protobuf")]
const TAG_PROTO_SET: u8 = 23;
#[cfg(feature = "protobuf")]
const TAG_PROTO_REGISTER: u8 = 24;
124
125/// A single mutation record stored in the AOF.
126#[derive(Debug, Clone, PartialEq)]
127pub enum AofRecord {
128    /// SET key value \[expire_ms\]. expire_ms is -1 for no expiration.
129    Set {
130        key: String,
131        value: Bytes,
132        expire_ms: i64,
133    },
134    /// DEL key.
135    Del { key: String },
136    /// EXPIRE key seconds.
137    Expire { key: String, seconds: u64 },
138    /// LPUSH key value [value ...].
139    LPush { key: String, values: Vec<Bytes> },
140    /// RPUSH key value [value ...].
141    RPush { key: String, values: Vec<Bytes> },
142    /// LPOP key.
143    LPop { key: String },
144    /// RPOP key.
145    RPop { key: String },
146    /// LSET key index element.
147    LSet {
148        key: String,
149        index: i64,
150        value: Bytes,
151    },
152    /// LTRIM key start stop.
153    LTrim { key: String, start: i64, stop: i64 },
154    /// LINSERT key BEFORE|AFTER pivot element.
155    LInsert {
156        key: String,
157        before: bool,
158        pivot: Bytes,
159        value: Bytes,
160    },
161    /// LREM key count element.
162    LRem {
163        key: String,
164        count: i64,
165        value: Bytes,
166    },
167    /// ZADD key score member [score member ...].
168    ZAdd {
169        key: String,
170        members: Vec<(f64, String)>,
171    },
172    /// ZREM key member [member ...].
173    ZRem { key: String, members: Vec<String> },
174    /// PERSIST key — remove expiration.
175    Persist { key: String },
176    /// PEXPIRE key milliseconds.
177    Pexpire { key: String, milliseconds: u64 },
178    /// PEXPIREAT key timestamp-ms — set expiry at an absolute Unix timestamp (ms).
179    ///
180    /// Used to persist EXPIREAT and PEXPIREAT commands so that after recovery
181    /// the expiry deadline is the same absolute point in time rather than
182    /// being re-anchored to the moment of replay.
183    Pexpireat { key: String, timestamp_ms: u64 },
184    /// INCR key.
185    Incr { key: String },
186    /// DECR key.
187    Decr { key: String },
188    /// HSET key field value [field value ...].
189    HSet {
190        key: String,
191        fields: Vec<(String, Bytes)>,
192    },
193    /// HDEL key field [field ...].
194    HDel { key: String, fields: Vec<String> },
195    /// HINCRBY key field delta.
196    HIncrBy {
197        key: String,
198        field: String,
199        delta: i64,
200    },
201    /// SADD key member [member ...].
202    SAdd { key: String, members: Vec<String> },
203    /// SREM key member [member ...].
204    SRem { key: String, members: Vec<String> },
205    /// INCRBY key delta.
206    IncrBy { key: String, delta: i64 },
207    /// DECRBY key delta.
208    DecrBy { key: String, delta: i64 },
209    /// APPEND key value.
210    Append { key: String, value: Bytes },
211    /// SETRANGE key offset value.
212    SetRange {
213        key: String,
214        offset: usize,
215        value: Bytes,
216    },
217    /// SETBIT key offset value. Replays the bit mutation verbatim.
218    SetBit { key: String, offset: u64, value: u8 },
219    /// BITOP op destkey key [key ...]. Replays the bitwise operation.
220    ///
221    /// `op` is stored as a raw byte: 0=AND, 1=OR, 2=XOR, 3=NOT.
222    BitOp {
223        op: u8,
224        dest: String,
225        keys: Vec<String>,
226    },
227    /// RENAME key newkey.
228    Rename { key: String, newkey: String },
229    /// COPY source destination [REPLACE].
230    Copy {
231        source: String,
232        destination: String,
233        replace: bool,
234    },
235    /// VADD key element vector [metric quant connectivity expansion_add].
236    /// Stores the full index config so recovery can recreate the set.
237    #[cfg(feature = "vector")]
238    VAdd {
239        key: String,
240        element: String,
241        vector: Vec<f32>,
242        /// 0 = cosine, 1 = l2, 2 = inner product
243        metric: u8,
244        /// 0 = f32, 1 = f16, 2 = i8
245        quantization: u8,
246        connectivity: u32,
247        expansion_add: u32,
248    },
249    /// VREM key element.
250    #[cfg(feature = "vector")]
251    VRem { key: String, element: String },
252    /// PROTO.SET key type_name data [expire_ms].
253    #[cfg(feature = "protobuf")]
254    ProtoSet {
255        key: String,
256        type_name: String,
257        data: Bytes,
258        expire_ms: i64,
259    },
260    /// PROTO.REGISTER name descriptor_bytes (for schema persistence).
261    #[cfg(feature = "protobuf")]
262    ProtoRegister { name: String, descriptor: Bytes },
263}
264
265impl AofRecord {
    // IMPORTANT: each variant has match arms in four places that must stay in sync:
    //   - `tag()`: the one-byte discriminant written to disk
    //   - `estimated_size()`: the capacity hint for the serialization buffer
    //   - `to_bytes()`: the actual serialized payload
    //   - `from_bytes()`: the decoder, which matches on the tag byte
    //
    // When adding a new variant, add its `TAG_*` constant and update all four
    // in that order. The binary format is stable — tag byte values must never
    // be reused.
273
274    /// Returns the on-disk tag byte for this record variant.
275    fn tag(&self) -> u8 {
276        match self {
277            AofRecord::Set { .. } => TAG_SET,
278            AofRecord::Del { .. } => TAG_DEL,
279            AofRecord::Expire { .. } => TAG_EXPIRE,
280            AofRecord::LPush { .. } => TAG_LPUSH,
281            AofRecord::RPush { .. } => TAG_RPUSH,
282            AofRecord::LPop { .. } => TAG_LPOP,
283            AofRecord::RPop { .. } => TAG_RPOP,
284            AofRecord::LSet { .. } => TAG_LSET,
285            AofRecord::LTrim { .. } => TAG_LTRIM,
286            AofRecord::LInsert { .. } => TAG_LINSERT,
287            AofRecord::LRem { .. } => TAG_LREM,
288            AofRecord::ZAdd { .. } => TAG_ZADD,
289            AofRecord::ZRem { .. } => TAG_ZREM,
290            AofRecord::Persist { .. } => TAG_PERSIST,
291            AofRecord::Pexpire { .. } => TAG_PEXPIRE,
292            AofRecord::Pexpireat { .. } => TAG_PEXPIREAT,
293            AofRecord::Incr { .. } => TAG_INCR,
294            AofRecord::Decr { .. } => TAG_DECR,
295            AofRecord::HSet { .. } => TAG_HSET,
296            AofRecord::HDel { .. } => TAG_HDEL,
297            AofRecord::HIncrBy { .. } => TAG_HINCRBY,
298            AofRecord::SAdd { .. } => TAG_SADD,
299            AofRecord::SRem { .. } => TAG_SREM,
300            AofRecord::IncrBy { .. } => TAG_INCRBY,
301            AofRecord::DecrBy { .. } => TAG_DECRBY,
302            AofRecord::Append { .. } => TAG_APPEND,
303            AofRecord::SetRange { .. } => TAG_SETRANGE,
304            AofRecord::SetBit { .. } => TAG_SETBIT,
305            AofRecord::BitOp { .. } => TAG_BITOP,
306            AofRecord::Rename { .. } => TAG_RENAME,
307            AofRecord::Copy { .. } => TAG_COPY,
308            #[cfg(feature = "vector")]
309            AofRecord::VAdd { .. } => TAG_VADD,
310            #[cfg(feature = "vector")]
311            AofRecord::VRem { .. } => TAG_VREM,
312            #[cfg(feature = "protobuf")]
313            AofRecord::ProtoSet { .. } => TAG_PROTO_SET,
314            #[cfg(feature = "protobuf")]
315            AofRecord::ProtoRegister { .. } => TAG_PROTO_REGISTER,
316        }
317    }
318
319    /// Estimates the serialized size of this record in bytes.
320    ///
321    /// Used as a capacity hint for `to_bytes()` to avoid intermediate
322    /// reallocations. The estimate includes the tag byte plus all
323    /// length-prefixed fields, erring slightly high to avoid growing.
324    fn estimated_size(&self) -> usize {
325        // overhead per length-prefixed field: 4 bytes for the u32 length
326        const LEN_PREFIX: usize = 4;
327
328        match self {
329            // 1 tag + 4 key-len + key + 4 value-len + value + 8 expire_ms
330            AofRecord::Set {
331                key,
332                value,
333                expire_ms: _,
334            } => 1 + LEN_PREFIX + key.len() + LEN_PREFIX + value.len() + 8,
335            // 1 tag + 4 key-len + key
336            AofRecord::Del { key }
337            | AofRecord::LPop { key }
338            | AofRecord::RPop { key }
339            | AofRecord::Persist { key }
340            | AofRecord::Incr { key }
341            | AofRecord::Decr { key } => 1 + LEN_PREFIX + key.len(),
342            // 1 tag + 4 key-len + key + 8 seconds/millis/timestamp
343            AofRecord::Expire { key, .. }
344            | AofRecord::Pexpire { key, .. }
345            | AofRecord::Pexpireat { key, .. } => 1 + LEN_PREFIX + key.len() + 8,
346            // 1 tag + 4 key-len + key + 4 count + (4 value-len + value) * n
347            AofRecord::LPush { key, values } | AofRecord::RPush { key, values } => {
348                let values_size: usize = values.iter().map(|v| LEN_PREFIX + v.len()).sum();
349                1 + LEN_PREFIX + key.len() + 4 + values_size
350            }
351            // 1 tag + 4 key-len + key + 8 index + 4 value-len + value
352            AofRecord::LSet { key, value, .. } => {
353                1 + LEN_PREFIX + key.len() + 8 + LEN_PREFIX + value.len()
354            }
355            // 1 tag + 4 key-len + key + 8 start + 8 stop
356            AofRecord::LTrim { key, .. } => 1 + LEN_PREFIX + key.len() + 8 + 8,
357            // 1 tag + 4 key-len + key + 1 before + 4 pivot-len + pivot + 4 value-len + value
358            AofRecord::LInsert {
359                key, pivot, value, ..
360            } => {
361                1 + LEN_PREFIX + key.len() + 1 + LEN_PREFIX + pivot.len() + LEN_PREFIX + value.len()
362            }
363            // 1 tag + 4 key-len + key + 8 count + 4 value-len + value
364            AofRecord::LRem { key, value, .. } => {
365                1 + LEN_PREFIX + key.len() + 8 + LEN_PREFIX + value.len()
366            }
367            AofRecord::ZAdd { key, members } => {
368                let members_size: usize =
369                    members.iter().map(|(_, m)| 8 + LEN_PREFIX + m.len()).sum();
370                1 + LEN_PREFIX + key.len() + 4 + members_size
371            }
372            AofRecord::ZRem { key, members }
373            | AofRecord::SAdd { key, members }
374            | AofRecord::SRem { key, members } => {
375                let members_size: usize = members.iter().map(|m| LEN_PREFIX + m.len()).sum();
376                1 + LEN_PREFIX + key.len() + 4 + members_size
377            }
378            AofRecord::HSet { key, fields } => {
379                let fields_size: usize = fields
380                    .iter()
381                    .map(|(f, v)| LEN_PREFIX + f.len() + LEN_PREFIX + v.len())
382                    .sum();
383                1 + LEN_PREFIX + key.len() + 4 + fields_size
384            }
385            AofRecord::HDel { key, fields } => {
386                let fields_size: usize = fields.iter().map(|f| LEN_PREFIX + f.len()).sum();
387                1 + LEN_PREFIX + key.len() + 4 + fields_size
388            }
389            AofRecord::HIncrBy { key, field, .. } => {
390                1 + LEN_PREFIX + key.len() + LEN_PREFIX + field.len() + 8
391            }
392            AofRecord::IncrBy { key, .. } | AofRecord::DecrBy { key, .. } => {
393                1 + LEN_PREFIX + key.len() + 8
394            }
395            AofRecord::Append { key, value } => {
396                1 + LEN_PREFIX + key.len() + LEN_PREFIX + value.len()
397            }
398            AofRecord::SetRange { key, value, .. } => {
399                1 + LEN_PREFIX + key.len() + 8 + LEN_PREFIX + value.len()
400            }
401            // 1 tag + 4 key-len + key + 8 offset + 1 value
402            AofRecord::SetBit { key, .. } => 1 + LEN_PREFIX + key.len() + 8 + 1,
403            // 1 tag + 1 op + 4 dest-len + dest + 4 count + (4 key-len + key) * n
404            AofRecord::BitOp { dest, keys, .. } => {
405                let keys_size: usize = keys.iter().map(|k| LEN_PREFIX + k.len()).sum();
406                1 + 1 + LEN_PREFIX + dest.len() + 4 + keys_size
407            }
408            AofRecord::Rename { key, newkey } => {
409                1 + LEN_PREFIX + key.len() + LEN_PREFIX + newkey.len()
410            }
411            AofRecord::Copy {
412                source,
413                destination,
414                ..
415            } => 1 + LEN_PREFIX + source.len() + LEN_PREFIX + destination.len() + 1,
416            #[cfg(feature = "vector")]
417            AofRecord::VAdd {
418                key,
419                element,
420                vector,
421                ..
422            } => {
423                1 + LEN_PREFIX + key.len() + LEN_PREFIX + element.len() + 4 + vector.len() * 4 + 10
424            }
425            #[cfg(feature = "vector")]
426            AofRecord::VRem { key, element } => {
427                1 + LEN_PREFIX + key.len() + LEN_PREFIX + element.len()
428            }
429            #[cfg(feature = "protobuf")]
430            AofRecord::ProtoSet {
431                key,
432                type_name,
433                data,
434                ..
435            } => {
436                1 + LEN_PREFIX
437                    + key.len()
438                    + LEN_PREFIX
439                    + type_name.len()
440                    + LEN_PREFIX
441                    + data.len()
442                    + 8
443            }
444            #[cfg(feature = "protobuf")]
445            AofRecord::ProtoRegister { name, descriptor } => {
446                1 + LEN_PREFIX + name.len() + LEN_PREFIX + descriptor.len()
447            }
448        }
449    }
450
451    /// Serializes this record into a byte vector (tag + payload, no CRC).
452    pub fn to_bytes(&self) -> Result<Vec<u8>, FormatError> {
453        let mut buf = Vec::with_capacity(self.estimated_size());
454        format::write_u8(&mut buf, self.tag())?;
455
456        match self {
457            // key-only: tag + key
458            AofRecord::Del { key }
459            | AofRecord::LPop { key }
460            | AofRecord::RPop { key }
461            | AofRecord::Persist { key }
462            | AofRecord::Incr { key }
463            | AofRecord::Decr { key } => {
464                format::write_bytes(&mut buf, key.as_bytes())?;
465            }
466
467            // key + bytes value + expire
468            AofRecord::Set {
469                key,
470                value,
471                expire_ms,
472            } => {
473                format::write_bytes(&mut buf, key.as_bytes())?;
474                format::write_bytes(&mut buf, value)?;
475                format::write_i64(&mut buf, *expire_ms)?;
476            }
477
478            // key + i64 (seconds/milliseconds are capped at i64::MAX on write
479            // so that deserialization can safely cast back to u64)
480            AofRecord::Expire { key, seconds } => {
481                format::write_bytes(&mut buf, key.as_bytes())?;
482                format::write_i64(&mut buf, (*seconds).min(i64::MAX as u64) as i64)?;
483            }
484            AofRecord::Pexpire { key, milliseconds } => {
485                format::write_bytes(&mut buf, key.as_bytes())?;
486                format::write_i64(&mut buf, (*milliseconds).min(i64::MAX as u64) as i64)?;
487            }
488            AofRecord::Pexpireat { key, timestamp_ms } => {
489                format::write_bytes(&mut buf, key.as_bytes())?;
490                format::write_i64(&mut buf, (*timestamp_ms).min(i64::MAX as u64) as i64)?;
491            }
492            AofRecord::IncrBy { key, delta } | AofRecord::DecrBy { key, delta } => {
493                format::write_bytes(&mut buf, key.as_bytes())?;
494                format::write_i64(&mut buf, *delta)?;
495            }
496
497            // key + byte list
498            AofRecord::LPush { key, values } | AofRecord::RPush { key, values } => {
499                format::write_bytes(&mut buf, key.as_bytes())?;
500                format::write_len(&mut buf, values.len())?;
501                for v in values {
502                    format::write_bytes(&mut buf, v)?;
503                }
504            }
505
506            // key + index + value
507            AofRecord::LSet { key, index, value } => {
508                format::write_bytes(&mut buf, key.as_bytes())?;
509                format::write_i64(&mut buf, *index)?;
510                format::write_bytes(&mut buf, value)?;
511            }
512
513            // key + start + stop
514            AofRecord::LTrim { key, start, stop } => {
515                format::write_bytes(&mut buf, key.as_bytes())?;
516                format::write_i64(&mut buf, *start)?;
517                format::write_i64(&mut buf, *stop)?;
518            }
519
520            // key + before(u8) + pivot + value
521            AofRecord::LInsert {
522                key,
523                before,
524                pivot,
525                value,
526            } => {
527                format::write_bytes(&mut buf, key.as_bytes())?;
528                format::write_u8(&mut buf, if *before { 1 } else { 0 })?;
529                format::write_bytes(&mut buf, pivot)?;
530                format::write_bytes(&mut buf, value)?;
531            }
532
533            // key + count + value
534            AofRecord::LRem { key, count, value } => {
535                format::write_bytes(&mut buf, key.as_bytes())?;
536                format::write_i64(&mut buf, *count)?;
537                format::write_bytes(&mut buf, value)?;
538            }
539
540            // key + string list
541            AofRecord::ZRem { key, members }
542            | AofRecord::SAdd { key, members }
543            | AofRecord::SRem { key, members } => {
544                format::write_bytes(&mut buf, key.as_bytes())?;
545                format::write_len(&mut buf, members.len())?;
546                for member in members {
547                    format::write_bytes(&mut buf, member.as_bytes())?;
548                }
549            }
550            AofRecord::HDel { key, fields } => {
551                format::write_bytes(&mut buf, key.as_bytes())?;
552                format::write_len(&mut buf, fields.len())?;
553                for field in fields {
554                    format::write_bytes(&mut buf, field.as_bytes())?;
555                }
556            }
557
558            // key + scored members
559            AofRecord::ZAdd { key, members } => {
560                format::write_bytes(&mut buf, key.as_bytes())?;
561                format::write_len(&mut buf, members.len())?;
562                for (score, member) in members {
563                    format::write_f64(&mut buf, *score)?;
564                    format::write_bytes(&mut buf, member.as_bytes())?;
565                }
566            }
567
568            // key + field-value pairs
569            AofRecord::HSet { key, fields } => {
570                format::write_bytes(&mut buf, key.as_bytes())?;
571                format::write_len(&mut buf, fields.len())?;
572                for (field, value) in fields {
573                    format::write_bytes(&mut buf, field.as_bytes())?;
574                    format::write_bytes(&mut buf, value)?;
575                }
576            }
577
578            // key + field + delta
579            AofRecord::HIncrBy { key, field, delta } => {
580                format::write_bytes(&mut buf, key.as_bytes())?;
581                format::write_bytes(&mut buf, field.as_bytes())?;
582                format::write_i64(&mut buf, *delta)?;
583            }
584
585            // key + bytes value (no expire)
586            AofRecord::Append { key, value } => {
587                format::write_bytes(&mut buf, key.as_bytes())?;
588                format::write_bytes(&mut buf, value)?;
589            }
590
591            // key + offset (as i64) + bytes value
592            AofRecord::SetRange { key, offset, value } => {
593                format::write_bytes(&mut buf, key.as_bytes())?;
594                format::write_i64(&mut buf, *offset as i64)?;
595                format::write_bytes(&mut buf, value)?;
596            }
597
598            // key + offset (as i64) + bit value (u8)
599            AofRecord::SetBit { key, offset, value } => {
600                format::write_bytes(&mut buf, key.as_bytes())?;
601                // offset is u64 but fits in i64 in practice (max bit offset < 2^32)
602                format::write_i64(&mut buf, *offset as i64)?;
603                format::write_u8(&mut buf, *value)?;
604            }
605
606            // op byte + dest + key list
607            AofRecord::BitOp { op, dest, keys } => {
608                format::write_u8(&mut buf, *op)?;
609                format::write_bytes(&mut buf, dest.as_bytes())?;
610                format::write_len(&mut buf, keys.len())?;
611                for key in keys {
612                    format::write_bytes(&mut buf, key.as_bytes())?;
613                }
614            }
615
616            // key + newkey
617            AofRecord::Rename { key, newkey } => {
618                format::write_bytes(&mut buf, key.as_bytes())?;
619                format::write_bytes(&mut buf, newkey.as_bytes())?;
620            }
621
622            // source + destination + replace flag
623            AofRecord::Copy {
624                source,
625                destination,
626                replace,
627            } => {
628                format::write_bytes(&mut buf, source.as_bytes())?;
629                format::write_bytes(&mut buf, destination.as_bytes())?;
630                buf.push(u8::from(*replace));
631            }
632
633            #[cfg(feature = "vector")]
634            AofRecord::VAdd {
635                key,
636                element,
637                vector,
638                metric,
639                quantization,
640                connectivity,
641                expansion_add,
642            } => {
643                format::write_bytes(&mut buf, key.as_bytes())?;
644                format::write_bytes(&mut buf, element.as_bytes())?;
645                format::write_len(&mut buf, vector.len())?;
646                for &v in vector {
647                    format::write_f32(&mut buf, v)?;
648                }
649                format::write_u8(&mut buf, *metric)?;
650                format::write_u8(&mut buf, *quantization)?;
651                format::write_u32(&mut buf, *connectivity)?;
652                format::write_u32(&mut buf, *expansion_add)?;
653            }
654            #[cfg(feature = "vector")]
655            AofRecord::VRem { key, element } => {
656                format::write_bytes(&mut buf, key.as_bytes())?;
657                format::write_bytes(&mut buf, element.as_bytes())?;
658            }
659
660            #[cfg(feature = "protobuf")]
661            AofRecord::ProtoSet {
662                key,
663                type_name,
664                data,
665                expire_ms,
666            } => {
667                format::write_bytes(&mut buf, key.as_bytes())?;
668                format::write_bytes(&mut buf, type_name.as_bytes())?;
669                format::write_bytes(&mut buf, data)?;
670                format::write_i64(&mut buf, *expire_ms)?;
671            }
672            #[cfg(feature = "protobuf")]
673            AofRecord::ProtoRegister { name, descriptor } => {
674                format::write_bytes(&mut buf, name.as_bytes())?;
675                format::write_bytes(&mut buf, descriptor)?;
676            }
677        }
678        Ok(buf)
679    }
680
681    /// Deserializes a record from its binary payload (tag byte + fields, no CRC).
682    ///
683    /// The format is the same as `to_bytes()`. CRC validation is the caller's
684    /// responsibility (done by the AOF recovery reader before this is called).
685    pub fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
686        let mut cursor = io::Cursor::new(data);
687        let tag = format::read_u8(&mut cursor)?;
688        match tag {
689            TAG_SET => {
690                let key = read_string(&mut cursor, "key")?;
691                let value = format::read_bytes(&mut cursor)?;
692                let expire_ms = format::read_i64(&mut cursor)?;
693                Ok(AofRecord::Set {
694                    key,
695                    value: Bytes::from(value),
696                    expire_ms,
697                })
698            }
699            TAG_DEL => {
700                let key = read_string(&mut cursor, "key")?;
701                Ok(AofRecord::Del { key })
702            }
703            TAG_EXPIRE => {
704                let key = read_string(&mut cursor, "key")?;
705                let raw = format::read_i64(&mut cursor)?;
706                let seconds = u64::try_from(raw).map_err(|_| {
707                    FormatError::InvalidData(format!(
708                        "EXPIRE seconds is negative ({raw}) in AOF record"
709                    ))
710                })?;
711                Ok(AofRecord::Expire { key, seconds })
712            }
713            TAG_LPUSH | TAG_RPUSH => {
714                let key = read_string(&mut cursor, "key")?;
715                let values = read_bytes_list(&mut cursor, "list")?;
716                if tag == TAG_LPUSH {
717                    Ok(AofRecord::LPush { key, values })
718                } else {
719                    Ok(AofRecord::RPush { key, values })
720                }
721            }
722            TAG_LPOP => {
723                let key = read_string(&mut cursor, "key")?;
724                Ok(AofRecord::LPop { key })
725            }
726            TAG_RPOP => {
727                let key = read_string(&mut cursor, "key")?;
728                Ok(AofRecord::RPop { key })
729            }
730            TAG_LSET => {
731                let key = read_string(&mut cursor, "key")?;
732                let index = format::read_i64(&mut cursor)?;
733                let value = Bytes::from(format::read_bytes(&mut cursor)?);
734                Ok(AofRecord::LSet { key, index, value })
735            }
736            TAG_LTRIM => {
737                let key = read_string(&mut cursor, "key")?;
738                let start = format::read_i64(&mut cursor)?;
739                let stop = format::read_i64(&mut cursor)?;
740                Ok(AofRecord::LTrim { key, start, stop })
741            }
742            TAG_LINSERT => {
743                let key = read_string(&mut cursor, "key")?;
744                let before_byte = format::read_u8(&mut cursor)?;
745                let before = before_byte != 0;
746                let pivot = Bytes::from(format::read_bytes(&mut cursor)?);
747                let value = Bytes::from(format::read_bytes(&mut cursor)?);
748                Ok(AofRecord::LInsert {
749                    key,
750                    before,
751                    pivot,
752                    value,
753                })
754            }
755            TAG_LREM => {
756                let key = read_string(&mut cursor, "key")?;
757                let count = format::read_i64(&mut cursor)?;
758                let value = Bytes::from(format::read_bytes(&mut cursor)?);
759                Ok(AofRecord::LRem { key, count, value })
760            }
761            TAG_ZADD => {
762                let key = read_string(&mut cursor, "key")?;
763                let count = format::read_u32(&mut cursor)?;
764                format::validate_collection_count(count, "sorted set")?;
765                let mut members = Vec::with_capacity(format::capped_capacity(count));
766                for _ in 0..count {
767                    let score = format::read_f64(&mut cursor)?;
768                    let member = read_string(&mut cursor, "member")?;
769                    members.push((score, member));
770                }
771                Ok(AofRecord::ZAdd { key, members })
772            }
773            TAG_ZREM => {
774                let key = read_string(&mut cursor, "key")?;
775                let members = read_string_list(&mut cursor, "member")?;
776                Ok(AofRecord::ZRem { key, members })
777            }
778            TAG_PERSIST => {
779                let key = read_string(&mut cursor, "key")?;
780                Ok(AofRecord::Persist { key })
781            }
782            TAG_PEXPIRE => {
783                let key = read_string(&mut cursor, "key")?;
784                let raw = format::read_i64(&mut cursor)?;
785                let milliseconds = u64::try_from(raw).map_err(|_| {
786                    FormatError::InvalidData(format!(
787                        "PEXPIRE milliseconds is negative ({raw}) in AOF record"
788                    ))
789                })?;
790                Ok(AofRecord::Pexpire { key, milliseconds })
791            }
792            TAG_PEXPIREAT => {
793                let key = read_string(&mut cursor, "key")?;
794                let raw = format::read_i64(&mut cursor)?;
795                let timestamp_ms = u64::try_from(raw).map_err(|_| {
796                    FormatError::InvalidData(format!(
797                        "PEXPIREAT timestamp_ms is negative ({raw}) in AOF record"
798                    ))
799                })?;
800                Ok(AofRecord::Pexpireat { key, timestamp_ms })
801            }
802            TAG_INCR => {
803                let key = read_string(&mut cursor, "key")?;
804                Ok(AofRecord::Incr { key })
805            }
806            TAG_DECR => {
807                let key = read_string(&mut cursor, "key")?;
808                Ok(AofRecord::Decr { key })
809            }
810            TAG_HSET => {
811                let key = read_string(&mut cursor, "key")?;
812                let count = format::read_u32(&mut cursor)?;
813                format::validate_collection_count(count, "hash")?;
814                let mut fields = Vec::with_capacity(format::capped_capacity(count));
815                for _ in 0..count {
816                    let field = read_string(&mut cursor, "field")?;
817                    let value = Bytes::from(format::read_bytes(&mut cursor)?);
818                    fields.push((field, value));
819                }
820                Ok(AofRecord::HSet { key, fields })
821            }
822            TAG_HDEL => {
823                let key = read_string(&mut cursor, "key")?;
824                let fields = read_string_list(&mut cursor, "field")?;
825                Ok(AofRecord::HDel { key, fields })
826            }
827            TAG_HINCRBY => {
828                let key = read_string(&mut cursor, "key")?;
829                let field = read_string(&mut cursor, "field")?;
830                let delta = format::read_i64(&mut cursor)?;
831                Ok(AofRecord::HIncrBy { key, field, delta })
832            }
833            TAG_SADD => {
834                let key = read_string(&mut cursor, "key")?;
835                let members = read_string_list(&mut cursor, "member")?;
836                Ok(AofRecord::SAdd { key, members })
837            }
838            TAG_SREM => {
839                let key = read_string(&mut cursor, "key")?;
840                let members = read_string_list(&mut cursor, "member")?;
841                Ok(AofRecord::SRem { key, members })
842            }
843            TAG_INCRBY => {
844                let key = read_string(&mut cursor, "key")?;
845                let delta = format::read_i64(&mut cursor)?;
846                Ok(AofRecord::IncrBy { key, delta })
847            }
848            TAG_DECRBY => {
849                let key = read_string(&mut cursor, "key")?;
850                let delta = format::read_i64(&mut cursor)?;
851                Ok(AofRecord::DecrBy { key, delta })
852            }
853            TAG_APPEND => {
854                let key = read_string(&mut cursor, "key")?;
855                let value = Bytes::from(format::read_bytes(&mut cursor)?);
856                Ok(AofRecord::Append { key, value })
857            }
858            TAG_SETRANGE => {
859                let key = read_string(&mut cursor, "key")?;
860                let offset = format::read_i64(&mut cursor)? as usize;
861                let value = Bytes::from(format::read_bytes(&mut cursor)?);
862                Ok(AofRecord::SetRange { key, offset, value })
863            }
864            TAG_RENAME => {
865                let key = read_string(&mut cursor, "key")?;
866                let newkey = read_string(&mut cursor, "newkey")?;
867                Ok(AofRecord::Rename { key, newkey })
868            }
869            TAG_COPY => {
870                let source = read_string(&mut cursor, "source")?;
871                let destination = read_string(&mut cursor, "destination")?;
872                let replace = format::read_u8(&mut cursor)? != 0;
873                Ok(AofRecord::Copy {
874                    source,
875                    destination,
876                    replace,
877                })
878            }
879            TAG_SETBIT => {
880                let key = read_string(&mut cursor, "key")?;
881                let offset = format::read_i64(&mut cursor)? as u64;
882                let value = format::read_u8(&mut cursor)?;
883                Ok(AofRecord::SetBit { key, offset, value })
884            }
885            TAG_BITOP => {
886                let op = format::read_u8(&mut cursor)?;
887                if op > 3 {
888                    return Err(FormatError::InvalidData(format!(
889                        "BITOP: unknown op byte {op} in AOF record"
890                    )));
891                }
892                let dest = read_string(&mut cursor, "dest")?;
893                let keys = read_string_list(&mut cursor, "key")?;
894                Ok(AofRecord::BitOp { op, dest, keys })
895            }
896            #[cfg(feature = "vector")]
897            TAG_VADD => {
898                let key = read_string(&mut cursor, "key")?;
899                let element = read_string(&mut cursor, "element")?;
900                let dim = format::read_u32(&mut cursor)?;
901                if dim > format::MAX_PERSISTED_VECTOR_DIMS {
902                    return Err(FormatError::InvalidData(format!(
903                        "AOF VADD dimension {dim} exceeds max {}",
904                        format::MAX_PERSISTED_VECTOR_DIMS
905                    )));
906                }
907                let mut vector = Vec::with_capacity(dim as usize);
908                for _ in 0..dim {
909                    vector.push(format::read_f32(&mut cursor)?);
910                }
911                let metric = format::read_u8(&mut cursor)?;
912                let quantization = format::read_u8(&mut cursor)?;
913                let connectivity = format::read_u32(&mut cursor)?;
914                let expansion_add = format::read_u32(&mut cursor)?;
915                Ok(AofRecord::VAdd {
916                    key,
917                    element,
918                    vector,
919                    metric,
920                    quantization,
921                    connectivity,
922                    expansion_add,
923                })
924            }
925            #[cfg(feature = "vector")]
926            TAG_VREM => {
927                let key = read_string(&mut cursor, "key")?;
928                let element = read_string(&mut cursor, "element")?;
929                Ok(AofRecord::VRem { key, element })
930            }
931            #[cfg(feature = "protobuf")]
932            TAG_PROTO_SET => {
933                let key = read_string(&mut cursor, "key")?;
934                let type_name = read_string(&mut cursor, "type_name")?;
935                let data = format::read_bytes(&mut cursor)?;
936                let expire_ms = format::read_i64(&mut cursor)?;
937                Ok(AofRecord::ProtoSet {
938                    key,
939                    type_name,
940                    data: Bytes::from(data),
941                    expire_ms,
942                })
943            }
944            #[cfg(feature = "protobuf")]
945            TAG_PROTO_REGISTER => {
946                let name = read_string(&mut cursor, "name")?;
947                let descriptor = format::read_bytes(&mut cursor)?;
948                Ok(AofRecord::ProtoRegister {
949                    name,
950                    descriptor: Bytes::from(descriptor),
951                })
952            }
953            _ => Err(FormatError::UnknownTag(tag)),
954        }
955    }
956}
957
/// How aggressively the AOF writer forces appended data onto stable storage.
///
/// Defaults to [`FsyncPolicy::EverySec`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum FsyncPolicy {
    /// Call fsync after every single write: the most durable and the slowest.
    Always,
    /// Call fsync about once per second, driven by the shard tick.
    #[default]
    EverySec,
    /// Never fsync explicitly; flushing is left to the OS. Fastest, least durable.
    No,
}
969
970/// Buffered writer for appending AOF records to a file.
971pub struct AofWriter {
972    writer: BufWriter<File>,
973    path: PathBuf,
974    #[cfg(feature = "encryption")]
975    encryption_key: Option<crate::encryption::EncryptionKey>,
976}
977
978impl AofWriter {
979    /// Opens (or creates) an AOF file. If the file is new, writes the header.
980    /// If the file already exists, appends to it.
981    pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
982        let path = path.into();
983        let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
984
985        let file = open_persistence_file(&path)?;
986        let mut writer = BufWriter::new(file);
987
988        if !exists {
989            format::write_header(&mut writer, format::AOF_MAGIC)?;
990            writer.flush()?;
991        }
992
993        Ok(Self {
994            writer,
995            path,
996            #[cfg(feature = "encryption")]
997            encryption_key: None,
998        })
999    }
1000
1001    /// Opens (or creates) an encrypted AOF file using AES-256-GCM.
1002    ///
1003    /// New files get a v3 header. Existing v2 files can be appended to —
1004    /// new records will be written unencrypted (use `BGREWRITEAOF` to
1005    /// migrate the full file to v3).
1006    #[cfg(feature = "encryption")]
1007    pub fn open_encrypted(
1008        path: impl Into<PathBuf>,
1009        key: crate::encryption::EncryptionKey,
1010    ) -> Result<Self, FormatError> {
1011        let path = path.into();
1012        let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
1013
1014        let file = open_persistence_file(&path)?;
1015        let mut writer = BufWriter::new(file);
1016
1017        if !exists {
1018            format::write_header_versioned(
1019                &mut writer,
1020                format::AOF_MAGIC,
1021                format::FORMAT_VERSION_ENCRYPTED,
1022            )?;
1023            writer.flush()?;
1024        }
1025
1026        Ok(Self {
1027            writer,
1028            path,
1029            encryption_key: Some(key),
1030        })
1031    }
1032
1033    /// Appends a record to the AOF.
1034    ///
1035    /// When an encryption key is set, writes: `[nonce: 12B][len: 4B][ciphertext]`.
1036    /// Otherwise writes the v2 format: `[tag+payload][crc32: 4B]`.
1037    pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
1038        let payload = record.to_bytes()?;
1039
1040        #[cfg(feature = "encryption")]
1041        if let Some(ref key) = self.encryption_key {
1042            let (nonce, ciphertext) = crate::encryption::encrypt_record(key, &payload)?;
1043            self.writer.write_all(&nonce)?;
1044            format::write_len(&mut self.writer, ciphertext.len())?;
1045            self.writer.write_all(&ciphertext)?;
1046            return Ok(());
1047        }
1048
1049        let checksum = format::crc32(&payload);
1050        self.writer.write_all(&payload)?;
1051        format::write_u32(&mut self.writer, checksum)?;
1052        Ok(())
1053    }
1054
1055    /// Flushes the internal buffer to the OS.
1056    pub fn flush(&mut self) -> Result<(), FormatError> {
1057        self.writer.flush()?;
1058        Ok(())
1059    }
1060
1061    /// Flushes and fsyncs the file to disk.
1062    pub fn sync(&mut self) -> Result<(), FormatError> {
1063        self.writer.flush()?;
1064        self.writer.get_ref().sync_all()?;
1065        Ok(())
1066    }
1067
1068    /// Returns the file path.
1069    pub fn path(&self) -> &Path {
1070        &self.path
1071    }
1072
1073    /// Truncates the AOF file back to just the header.
1074    ///
1075    /// Uses write-to-temp-then-rename for crash safety: the old AOF
1076    /// remains intact until the new file (with only a header) is fully
1077    /// synced and atomically renamed into place.
1078    pub fn truncate(&mut self) -> Result<(), FormatError> {
1079        // flush the old writer so no data is in the BufWriter
1080        self.writer.flush()?;
1081
1082        // write a fresh header to a temp file next to the real AOF
1083        let tmp_path = self.path.with_extension("aof.tmp");
1084        let mut opts = OpenOptions::new();
1085        opts.create(true).write(true).truncate(true);
1086        #[cfg(unix)]
1087        {
1088            use std::os::unix::fs::OpenOptionsExt;
1089            opts.mode(0o600);
1090        }
1091        let tmp_file = opts.open(&tmp_path)?;
1092        let mut tmp_writer = BufWriter::new(tmp_file);
1093
1094        #[cfg(feature = "encryption")]
1095        if self.encryption_key.is_some() {
1096            format::write_header_versioned(
1097                &mut tmp_writer,
1098                format::AOF_MAGIC,
1099                format::FORMAT_VERSION_ENCRYPTED,
1100            )?;
1101        } else {
1102            format::write_header(&mut tmp_writer, format::AOF_MAGIC)?;
1103        }
1104        #[cfg(not(feature = "encryption"))]
1105        format::write_header(&mut tmp_writer, format::AOF_MAGIC)?;
1106
1107        tmp_writer.flush()?;
1108        tmp_writer.get_ref().sync_all()?;
1109
1110        // atomic rename: old AOF is replaced only after new file is durable
1111        std::fs::rename(&tmp_path, &self.path)?;
1112
1113        // reopen for appending
1114        let file = OpenOptions::new().append(true).open(&self.path)?;
1115        self.writer = BufWriter::new(file);
1116        Ok(())
1117    }
1118}
1119
/// Sequential reader that yields AOF records from a file.
pub struct AofReader {
    /// Buffered handle, positioned just past the file header once opened.
    reader: BufReader<File>,
    /// Header format version: v2 means plaintext records, v3 encrypted ones.
    version: u8,
    /// Key used to decrypt v3 records, if one was supplied.
    #[cfg(feature = "encryption")]
    encryption_key: Option<crate::encryption::EncryptionKey>,
}

impl fmt::Debug for AofReader {
    /// Prints only the format version; the file handle and any key are not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("AofReader");
        builder.field("version", &self.version);
        builder.finish()
    }
}
1136
1137impl AofReader {
1138    /// Opens an AOF file and validates the header.
1139    pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
1140        let file = File::open(path.as_ref())?;
1141        let mut reader = BufReader::new(file);
1142        let version = format::read_header(&mut reader, format::AOF_MAGIC)?;
1143
1144        if version == format::FORMAT_VERSION_ENCRYPTED {
1145            return Err(FormatError::EncryptionRequired);
1146        }
1147
1148        Ok(Self {
1149            reader,
1150            version,
1151            #[cfg(feature = "encryption")]
1152            encryption_key: None,
1153        })
1154    }
1155
1156    /// Opens an AOF file with an encryption key for decrypting v3 records.
1157    ///
1158    /// Also handles v2 (plaintext) files — the key is simply unused,
1159    /// allowing transparent migration.
1160    #[cfg(feature = "encryption")]
1161    pub fn open_encrypted(
1162        path: impl AsRef<Path>,
1163        key: crate::encryption::EncryptionKey,
1164    ) -> Result<Self, FormatError> {
1165        let file = File::open(path.as_ref())?;
1166        let mut reader = BufReader::new(file);
1167        let version = format::read_header(&mut reader, format::AOF_MAGIC)?;
1168
1169        Ok(Self {
1170            reader,
1171            version,
1172            encryption_key: Some(key),
1173        })
1174    }
1175
1176    /// Reads the next record from the AOF.
1177    ///
1178    /// Returns `Ok(None)` at end-of-file. On a truncated record (the
1179    /// server crashed mid-write), returns `Ok(None)` rather than an error
1180    /// — this is the expected recovery behavior.
1181    pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
1182        #[cfg(feature = "encryption")]
1183        if self.version == format::FORMAT_VERSION_ENCRYPTED {
1184            return self.read_encrypted_record();
1185        }
1186
1187        self.read_v2_record()
1188    }
1189
1190    /// Reads a v2 (plaintext) record: tag + payload + crc32.
1191    fn read_v2_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
1192        // peek for EOF — try reading the tag byte
1193        let tag = match format::read_u8(&mut self.reader) {
1194            Ok(t) => t,
1195            Err(FormatError::UnexpectedEof) => return Ok(None),
1196            Err(e) => return Err(e),
1197        };
1198
1199        // read the rest of the payload based on tag, building the full
1200        // record bytes for CRC verification
1201        let record_result = self.read_payload_for_tag(tag);
1202        match record_result {
1203            Ok((payload, stored_crc)) => {
1204                // prepend the tag to the payload for CRC check
1205                let mut full = Vec::with_capacity(1 + payload.len());
1206                full.push(tag);
1207                full.extend_from_slice(&payload);
1208                format::verify_crc32(&full, stored_crc)?;
1209                AofRecord::from_bytes(&full).map(Some)
1210            }
1211            // truncated record — treat as end of usable data
1212            Err(FormatError::UnexpectedEof) => Ok(None),
1213            Err(e) => Err(e),
1214        }
1215    }
1216
1217    /// Reads a v3 (encrypted) record: nonce + len + ciphertext.
1218    #[cfg(feature = "encryption")]
1219    fn read_encrypted_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
1220        let key = self
1221            .encryption_key
1222            .as_ref()
1223            .ok_or(FormatError::EncryptionRequired)?;
1224
1225        // read the 12-byte nonce
1226        let mut nonce = [0u8; crate::encryption::NONCE_SIZE];
1227        if let Err(e) = self.reader.read_exact(&mut nonce) {
1228            return if e.kind() == io::ErrorKind::UnexpectedEof {
1229                Ok(None)
1230            } else {
1231                Err(FormatError::Io(e))
1232            };
1233        }
1234
1235        // read ciphertext length and ciphertext
1236        let ct_len = match format::read_u32(&mut self.reader) {
1237            Ok(n) => n as usize,
1238            Err(FormatError::UnexpectedEof) => return Ok(None),
1239            Err(e) => return Err(e),
1240        };
1241
1242        if ct_len > format::MAX_FIELD_LEN {
1243            return Err(FormatError::Io(io::Error::new(
1244                io::ErrorKind::InvalidData,
1245                format!("encrypted record length {ct_len} exceeds maximum"),
1246            )));
1247        }
1248
1249        let mut ciphertext = vec![0u8; ct_len];
1250        if let Err(e) = self.reader.read_exact(&mut ciphertext) {
1251            return if e.kind() == io::ErrorKind::UnexpectedEof {
1252                Ok(None)
1253            } else {
1254                Err(FormatError::Io(e))
1255            };
1256        }
1257
1258        let plaintext = crate::encryption::decrypt_record(key, &nonce, &ciphertext)?;
1259        AofRecord::from_bytes(&plaintext).map(Some)
1260    }
1261
1262    /// Reads the remaining payload bytes (after the tag) and the trailing CRC.
1263    fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
1264        let mut payload = Vec::new();
1265        match tag {
1266            TAG_SET => {
1267                // key_len + key + value_len + value + expire_ms
1268                let key = format::read_bytes(&mut self.reader)?;
1269                format::write_bytes(&mut payload, &key)?;
1270                let value = format::read_bytes(&mut self.reader)?;
1271                format::write_bytes(&mut payload, &value)?;
1272                let expire_ms = format::read_i64(&mut self.reader)?;
1273                format::write_i64(&mut payload, expire_ms)?;
1274            }
1275            TAG_DEL => {
1276                let key = format::read_bytes(&mut self.reader)?;
1277                format::write_bytes(&mut payload, &key)?;
1278            }
1279            TAG_EXPIRE => {
1280                let key = format::read_bytes(&mut self.reader)?;
1281                format::write_bytes(&mut payload, &key)?;
1282                let seconds = format::read_i64(&mut self.reader)?;
1283                format::write_i64(&mut payload, seconds)?;
1284            }
1285            TAG_LPUSH | TAG_RPUSH => {
1286                let key = format::read_bytes(&mut self.reader)?;
1287                format::write_bytes(&mut payload, &key)?;
1288                let count = format::read_u32(&mut self.reader)?;
1289                format::validate_collection_count(count, "list")?;
1290                format::write_u32(&mut payload, count)?;
1291                for _ in 0..count {
1292                    let val = format::read_bytes(&mut self.reader)?;
1293                    format::write_bytes(&mut payload, &val)?;
1294                }
1295            }
1296            TAG_LPOP | TAG_RPOP => {
1297                let key = format::read_bytes(&mut self.reader)?;
1298                format::write_bytes(&mut payload, &key)?;
1299            }
1300            TAG_ZADD => {
1301                let key = format::read_bytes(&mut self.reader)?;
1302                format::write_bytes(&mut payload, &key)?;
1303                let count = format::read_u32(&mut self.reader)?;
1304                format::validate_collection_count(count, "sorted set")?;
1305                format::write_u32(&mut payload, count)?;
1306                for _ in 0..count {
1307                    let score = format::read_f64(&mut self.reader)?;
1308                    format::write_f64(&mut payload, score)?;
1309                    let member = format::read_bytes(&mut self.reader)?;
1310                    format::write_bytes(&mut payload, &member)?;
1311                }
1312            }
1313            TAG_ZREM => {
1314                let key = format::read_bytes(&mut self.reader)?;
1315                format::write_bytes(&mut payload, &key)?;
1316                let count = format::read_u32(&mut self.reader)?;
1317                format::validate_collection_count(count, "sorted set")?;
1318                format::write_u32(&mut payload, count)?;
1319                for _ in 0..count {
1320                    let member = format::read_bytes(&mut self.reader)?;
1321                    format::write_bytes(&mut payload, &member)?;
1322                }
1323            }
1324            TAG_PERSIST => {
1325                let key = format::read_bytes(&mut self.reader)?;
1326                format::write_bytes(&mut payload, &key)?;
1327            }
1328            TAG_PEXPIRE => {
1329                let key = format::read_bytes(&mut self.reader)?;
1330                format::write_bytes(&mut payload, &key)?;
1331                let millis = format::read_i64(&mut self.reader)?;
1332                format::write_i64(&mut payload, millis)?;
1333            }
1334            TAG_INCR | TAG_DECR => {
1335                let key = format::read_bytes(&mut self.reader)?;
1336                format::write_bytes(&mut payload, &key)?;
1337            }
1338            TAG_HSET => {
1339                let key = format::read_bytes(&mut self.reader)?;
1340                format::write_bytes(&mut payload, &key)?;
1341                let count = format::read_u32(&mut self.reader)?;
1342                format::validate_collection_count(count, "hash")?;
1343                format::write_u32(&mut payload, count)?;
1344                for _ in 0..count {
1345                    let field = format::read_bytes(&mut self.reader)?;
1346                    format::write_bytes(&mut payload, &field)?;
1347                    let value = format::read_bytes(&mut self.reader)?;
1348                    format::write_bytes(&mut payload, &value)?;
1349                }
1350            }
1351            TAG_HDEL | TAG_SADD | TAG_SREM => {
1352                let key = format::read_bytes(&mut self.reader)?;
1353                format::write_bytes(&mut payload, &key)?;
1354                let count = format::read_u32(&mut self.reader)?;
1355                format::validate_collection_count(count, "set")?;
1356                format::write_u32(&mut payload, count)?;
1357                for _ in 0..count {
1358                    let item = format::read_bytes(&mut self.reader)?;
1359                    format::write_bytes(&mut payload, &item)?;
1360                }
1361            }
1362            TAG_HINCRBY => {
1363                let key = format::read_bytes(&mut self.reader)?;
1364                format::write_bytes(&mut payload, &key)?;
1365                let field = format::read_bytes(&mut self.reader)?;
1366                format::write_bytes(&mut payload, &field)?;
1367                let delta = format::read_i64(&mut self.reader)?;
1368                format::write_i64(&mut payload, delta)?;
1369            }
1370            TAG_INCRBY | TAG_DECRBY => {
1371                let key = format::read_bytes(&mut self.reader)?;
1372                format::write_bytes(&mut payload, &key)?;
1373                let delta = format::read_i64(&mut self.reader)?;
1374                format::write_i64(&mut payload, delta)?;
1375            }
1376            TAG_APPEND => {
1377                let key = format::read_bytes(&mut self.reader)?;
1378                format::write_bytes(&mut payload, &key)?;
1379                let value = format::read_bytes(&mut self.reader)?;
1380                format::write_bytes(&mut payload, &value)?;
1381            }
1382            TAG_RENAME => {
1383                let key = format::read_bytes(&mut self.reader)?;
1384                format::write_bytes(&mut payload, &key)?;
1385                let newkey = format::read_bytes(&mut self.reader)?;
1386                format::write_bytes(&mut payload, &newkey)?;
1387            }
1388            TAG_COPY => {
1389                let source = format::read_bytes(&mut self.reader)?;
1390                format::write_bytes(&mut payload, &source)?;
1391                let dest = format::read_bytes(&mut self.reader)?;
1392                format::write_bytes(&mut payload, &dest)?;
1393                let replace = format::read_u8(&mut self.reader)?;
1394                payload.push(replace);
1395            }
1396            #[cfg(feature = "vector")]
1397            TAG_VADD => {
1398                let key = format::read_bytes(&mut self.reader)?;
1399                format::write_bytes(&mut payload, &key)?;
1400                let element = format::read_bytes(&mut self.reader)?;
1401                format::write_bytes(&mut payload, &element)?;
1402                let dim = format::read_u32(&mut self.reader)?;
1403                if dim > format::MAX_PERSISTED_VECTOR_DIMS {
1404                    return Err(FormatError::InvalidData(format!(
1405                        "AOF VADD dimension {dim} exceeds max {}",
1406                        format::MAX_PERSISTED_VECTOR_DIMS
1407                    )));
1408                }
1409                format::write_u32(&mut payload, dim)?;
1410                for _ in 0..dim {
1411                    let v = format::read_f32(&mut self.reader)?;
1412                    format::write_f32(&mut payload, v)?;
1413                }
1414                let metric = format::read_u8(&mut self.reader)?;
1415                format::write_u8(&mut payload, metric)?;
1416                let quantization = format::read_u8(&mut self.reader)?;
1417                format::write_u8(&mut payload, quantization)?;
1418                let connectivity = format::read_u32(&mut self.reader)?;
1419                format::write_u32(&mut payload, connectivity)?;
1420                let expansion_add = format::read_u32(&mut self.reader)?;
1421                format::write_u32(&mut payload, expansion_add)?;
1422            }
1423            #[cfg(feature = "vector")]
1424            TAG_VREM => {
1425                let key = format::read_bytes(&mut self.reader)?;
1426                format::write_bytes(&mut payload, &key)?;
1427                let element = format::read_bytes(&mut self.reader)?;
1428                format::write_bytes(&mut payload, &element)?;
1429            }
1430            #[cfg(feature = "protobuf")]
1431            TAG_PROTO_SET => {
1432                let key = format::read_bytes(&mut self.reader)?;
1433                format::write_bytes(&mut payload, &key)?;
1434                let type_name = format::read_bytes(&mut self.reader)?;
1435                format::write_bytes(&mut payload, &type_name)?;
1436                let data = format::read_bytes(&mut self.reader)?;
1437                format::write_bytes(&mut payload, &data)?;
1438                let expire_ms = format::read_i64(&mut self.reader)?;
1439                format::write_i64(&mut payload, expire_ms)?;
1440            }
1441            #[cfg(feature = "protobuf")]
1442            TAG_PROTO_REGISTER => {
1443                let name = format::read_bytes(&mut self.reader)?;
1444                format::write_bytes(&mut payload, &name)?;
1445                let descriptor = format::read_bytes(&mut self.reader)?;
1446                format::write_bytes(&mut payload, &descriptor)?;
1447            }
1448            _ => return Err(FormatError::UnknownTag(tag)),
1449        }
1450        let stored_crc = format::read_u32(&mut self.reader)?;
1451        Ok((payload, stored_crc))
1452    }
1453}
1454
1455/// Opens a persistence file with create+append and restrictive permissions.
1456fn open_persistence_file(path: &Path) -> Result<File, FormatError> {
1457    let mut opts = OpenOptions::new();
1458    opts.create(true).append(true);
1459    #[cfg(unix)]
1460    {
1461        use std::os::unix::fs::OpenOptionsExt;
1462        opts.mode(0o600);
1463    }
1464    Ok(opts.open(path)?)
1465}
1466
/// Builds the per-shard AOF location: `<data_dir>/shard-<shard_id>.aof`.
pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
    let file_name = format!("shard-{shard_id}.aof");
    data_dir.join(file_name)
}
1471
#[cfg(test)]
mod tests {
    use super::*;

    /// Test result alias: plain `Result` means "unit or any boxed error";
    /// helpers that hand data back use `Result<T>`.
    type Result<T = ()> = std::result::Result<T, Box<dyn std::error::Error>>;

    /// Creates a fresh temporary directory, removed when the guard drops.
    fn temp_dir() -> tempfile::TempDir {
        tempfile::tempdir().expect("create temp dir")
    }

    /// Builds a `Set` record with no expiry (`expire_ms: -1`).
    fn set_record(key: &'static str, value: &'static str) -> AofRecord {
        AofRecord::Set {
            key: key.into(),
            value: Bytes::from(value),
            expire_ms: -1,
        }
    }

    /// Encodes `rec` with `to_bytes`, decodes the bytes with `from_bytes`,
    /// and asserts the decoded record equals the original.
    fn assert_record_round_trip(rec: &AofRecord) -> Result {
        let bytes = rec.to_bytes()?;
        let decoded = AofRecord::from_bytes(&bytes)?;
        assert_eq!(rec, &decoded);
        Ok(())
    }

    /// Writes `records` to a plaintext AOF at `path`, syncs and drops the
    /// writer, then reads the file back until `read_record` returns `None`.
    fn write_then_read_all(path: &Path, records: &[AofRecord]) -> Result<Vec<AofRecord>> {
        {
            let mut writer = AofWriter::open(path)?;
            for rec in records {
                writer.write_record(rec)?;
            }
            writer.sync()?;
        }

        let mut reader = AofReader::open(path)?;
        let mut got = Vec::new();
        while let Some(rec) = reader.read_record()? {
            got.push(rec);
        }
        Ok(got)
    }

    #[test]
    fn record_round_trip_set() -> Result {
        assert_record_round_trip(&AofRecord::Set {
            key: "hello".into(),
            value: Bytes::from("world"),
            expire_ms: 5000,
        })
    }

    #[test]
    fn record_round_trip_del() -> Result {
        assert_record_round_trip(&AofRecord::Del { key: "gone".into() })
    }

    #[test]
    fn record_round_trip_expire() -> Result {
        assert_record_round_trip(&AofRecord::Expire {
            key: "ttl".into(),
            seconds: 300,
        })
    }

    #[test]
    fn set_with_no_expiry() -> Result {
        assert_record_round_trip(&set_record("k", "v"))
    }

    #[test]
    fn writer_reader_round_trip() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("test.aof");

        let records = vec![
            set_record("a", "1"),
            AofRecord::Set {
                key: "b".into(),
                value: Bytes::from("2"),
                expire_ms: 10_000,
            },
            AofRecord::Del { key: "a".into() },
            AofRecord::Expire {
                key: "b".into(),
                seconds: 60,
            },
        ];

        assert_eq!(records, write_then_read_all(&path, &records)?);
        Ok(())
    }

    #[test]
    fn empty_aof_returns_no_records() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("empty.aof");

        // just write the header
        {
            let _writer = AofWriter::open(&path)?;
        }

        let mut reader = AofReader::open(&path)?;
        assert!(reader.read_record()?.is_none());
        Ok(())
    }

    #[test]
    fn truncated_record_treated_as_eof() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("trunc.aof");

        // write one good record, then append garbage (simulating a crash)
        {
            let mut writer = AofWriter::open(&path)?;
            writer.write_record(&set_record("ok", "good"))?;
            writer.flush()?;
        }

        // append a partial tag with no payload
        {
            let mut file = OpenOptions::new().append(true).open(&path)?;
            file.write_all(&[TAG_SET])?;
        }

        let mut reader = AofReader::open(&path)?;
        // first record should be fine
        let rec = reader.read_record()?.unwrap();
        assert!(matches!(rec, AofRecord::Set { .. }));
        // second should be None (truncated)
        assert!(reader.read_record()?.is_none());
        Ok(())
    }

    #[test]
    fn corrupt_crc_detected() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("corrupt.aof");

        {
            let mut writer = AofWriter::open(&path)?;
            writer.write_record(&set_record("k", "v"))?;
            writer.flush()?;
        }

        // corrupt the last byte (part of the CRC)
        let mut data = fs::read(&path)?;
        let last = data.len() - 1;
        data[last] ^= 0xFF;
        fs::write(&path, &data)?;

        let mut reader = AofReader::open(&path)?;
        let err = reader.read_record().unwrap_err();
        assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
        Ok(())
    }

    #[test]
    fn missing_magic_is_error() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("bad.aof");
        fs::write(&path, b"NOT_AOF_DATA")?;

        let err = AofReader::open(&path).unwrap_err();
        assert!(matches!(err, FormatError::InvalidMagic));
        Ok(())
    }

    #[test]
    fn truncate_resets_aof() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("reset.aof");

        {
            let mut writer = AofWriter::open(&path)?;
            writer.write_record(&set_record("old", "data"))?;
            writer.truncate()?;

            // write a new record after truncation
            writer.write_record(&set_record("new", "fresh"))?;
            writer.sync()?;
        }

        let mut reader = AofReader::open(&path)?;
        let rec = reader.read_record()?.unwrap();
        match rec {
            AofRecord::Set { key, .. } => assert_eq!(key, "new"),
            other => panic!("expected Set, got {other:?}"),
        }
        // only one record after truncation
        assert!(reader.read_record()?.is_none());
        Ok(())
    }

    #[test]
    fn record_round_trip_lpush() -> Result {
        assert_record_round_trip(&AofRecord::LPush {
            key: "list".into(),
            values: vec![Bytes::from("a"), Bytes::from("b")],
        })
    }

    #[test]
    fn record_round_trip_rpush() -> Result {
        assert_record_round_trip(&AofRecord::RPush {
            key: "list".into(),
            values: vec![Bytes::from("x")],
        })
    }

    #[test]
    fn record_round_trip_lpop() -> Result {
        assert_record_round_trip(&AofRecord::LPop { key: "list".into() })
    }

    #[test]
    fn record_round_trip_rpop() -> Result {
        assert_record_round_trip(&AofRecord::RPop { key: "list".into() })
    }

    #[test]
    fn writer_reader_round_trip_with_list_records() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("list.aof");

        let records = vec![
            AofRecord::LPush {
                key: "l".into(),
                values: vec![Bytes::from("a"), Bytes::from("b")],
            },
            AofRecord::RPush {
                key: "l".into(),
                values: vec![Bytes::from("c")],
            },
            AofRecord::LPop { key: "l".into() },
            AofRecord::RPop { key: "l".into() },
        ];

        assert_eq!(records, write_then_read_all(&path, &records)?);
        Ok(())
    }

    #[test]
    fn record_round_trip_zadd() -> Result {
        assert_record_round_trip(&AofRecord::ZAdd {
            key: "board".into(),
            members: vec![(100.0, "alice".into()), (200.5, "bob".into())],
        })
    }

    #[test]
    fn record_round_trip_zrem() -> Result {
        assert_record_round_trip(&AofRecord::ZRem {
            key: "board".into(),
            members: vec!["alice".into(), "bob".into()],
        })
    }

    #[test]
    fn writer_reader_round_trip_with_sorted_set_records() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("zset.aof");

        let records = vec![
            AofRecord::ZAdd {
                key: "board".into(),
                members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
            },
            AofRecord::ZRem {
                key: "board".into(),
                members: vec!["alice".into()],
            },
        ];

        assert_eq!(records, write_then_read_all(&path, &records)?);
        Ok(())
    }

    #[test]
    fn record_round_trip_persist() -> Result {
        assert_record_round_trip(&AofRecord::Persist {
            key: "mykey".into(),
        })
    }

    #[test]
    fn record_round_trip_pexpire() -> Result {
        assert_record_round_trip(&AofRecord::Pexpire {
            key: "mykey".into(),
            milliseconds: 5000,
        })
    }

    #[test]
    fn record_round_trip_incr() -> Result {
        assert_record_round_trip(&AofRecord::Incr {
            key: "counter".into(),
        })
    }

    #[test]
    fn record_round_trip_decr() -> Result {
        assert_record_round_trip(&AofRecord::Decr {
            key: "counter".into(),
        })
    }

    #[test]
    fn writer_reader_round_trip_with_persist_pexpire() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("persist_pexpire.aof");

        let records = vec![
            AofRecord::Set {
                key: "k".into(),
                value: Bytes::from("v"),
                expire_ms: 5000,
            },
            AofRecord::Persist { key: "k".into() },
            AofRecord::Pexpire {
                key: "k".into(),
                milliseconds: 3000,
            },
        ];

        assert_eq!(records, write_then_read_all(&path, &records)?);
        Ok(())
    }

    #[test]
    fn aof_path_format() {
        let p = aof_path(Path::new("/data"), 3);
        assert_eq!(p, PathBuf::from("/data/shard-3.aof"));
    }

    #[test]
    fn record_round_trip_hset() -> Result {
        assert_record_round_trip(&AofRecord::HSet {
            key: "hash".into(),
            fields: vec![
                ("f1".into(), Bytes::from("v1")),
                ("f2".into(), Bytes::from("v2")),
            ],
        })
    }

    #[test]
    fn record_round_trip_hdel() -> Result {
        assert_record_round_trip(&AofRecord::HDel {
            key: "hash".into(),
            fields: vec!["f1".into(), "f2".into()],
        })
    }

    #[test]
    fn record_round_trip_hincrby() -> Result {
        assert_record_round_trip(&AofRecord::HIncrBy {
            key: "hash".into(),
            field: "counter".into(),
            delta: -42,
        })
    }

    #[test]
    fn record_round_trip_sadd() -> Result {
        assert_record_round_trip(&AofRecord::SAdd {
            key: "set".into(),
            members: vec!["m1".into(), "m2".into(), "m3".into()],
        })
    }

    #[test]
    fn record_round_trip_srem() -> Result {
        assert_record_round_trip(&AofRecord::SRem {
            key: "set".into(),
            members: vec!["m1".into()],
        })
    }

    #[cfg(feature = "vector")]
    #[test]
    fn record_round_trip_vadd() -> Result {
        assert_record_round_trip(&AofRecord::VAdd {
            key: "embeddings".into(),
            element: "doc1".into(),
            vector: vec![0.1, 0.2, 0.3],
            metric: 0,       // cosine
            quantization: 0, // f32
            connectivity: 16,
            expansion_add: 64,
        })
    }

    #[cfg(feature = "vector")]
    #[test]
    fn record_round_trip_vadd_high_dim() -> Result {
        assert_record_round_trip(&AofRecord::VAdd {
            key: "vecs".into(),
            element: "e".into(),
            vector: vec![0.0; 1536], // typical embedding dimension
            metric: 1,               // l2
            quantization: 1,         // f16
            connectivity: 32,
            expansion_add: 128,
        })
    }

    #[cfg(feature = "vector")]
    #[test]
    fn record_round_trip_vrem() -> Result {
        assert_record_round_trip(&AofRecord::VRem {
            key: "embeddings".into(),
            element: "doc1".into(),
        })
    }

    #[cfg(feature = "encryption")]
    mod encrypted {
        // Brings in the parent's `Result` alias, `temp_dir`, and the helpers.
        use super::*;
        use crate::encryption::EncryptionKey;

        /// Deterministic key used by every encrypted test.
        fn test_key() -> EncryptionKey {
            EncryptionKey::from_bytes([0x42; 32])
        }

        #[test]
        fn encrypted_writer_reader_round_trip() -> Result {
            let dir = temp_dir();
            let path = dir.path().join("enc.aof");
            let key = test_key();

            let records = vec![
                set_record("a", "1"),
                AofRecord::Del { key: "a".into() },
                AofRecord::LPush {
                    key: "list".into(),
                    values: vec![Bytes::from("x"), Bytes::from("y")],
                },
                AofRecord::ZAdd {
                    key: "zs".into(),
                    members: vec![(1.0, "m".into())],
                },
            ];

            {
                let mut writer = AofWriter::open_encrypted(&path, key.clone())?;
                for rec in &records {
                    writer.write_record(rec)?;
                }
                writer.sync()?;
            }

            let mut reader = AofReader::open_encrypted(&path, key)?;
            let mut got = Vec::new();
            while let Some(rec) = reader.read_record()? {
                got.push(rec);
            }
            assert_eq!(records, got);
            Ok(())
        }

        #[test]
        fn encrypted_aof_wrong_key_fails() -> Result {
            let dir = temp_dir();
            let path = dir.path().join("enc_bad.aof");
            let key = test_key();
            let wrong_key = EncryptionKey::from_bytes([0xFF; 32]);

            {
                let mut writer = AofWriter::open_encrypted(&path, key)?;
                writer.write_record(&set_record("k", "v"))?;
                writer.sync()?;
            }

            let mut reader = AofReader::open_encrypted(&path, wrong_key)?;
            let err = reader.read_record().unwrap_err();
            assert!(matches!(err, FormatError::DecryptionFailed));
            Ok(())
        }

        #[test]
        fn v2_file_readable_with_encryption_key() -> Result {
            let dir = temp_dir();
            let path = dir.path().join("v2.aof");
            let key = test_key();

            // write a plaintext v2 file
            {
                let mut writer = AofWriter::open(&path)?;
                writer.write_record(&set_record("k", "v"))?;
                writer.sync()?;
            }

            // read with encryption key — should work (v2 is plaintext)
            let mut reader = AofReader::open_encrypted(&path, key)?;
            let rec = reader.read_record()?.unwrap();
            assert!(matches!(rec, AofRecord::Set { .. }));
            Ok(())
        }

        #[test]
        fn v3_file_without_key_returns_error() -> Result {
            let dir = temp_dir();
            let path = dir.path().join("v3_nokey.aof");
            let key = test_key();

            // write an encrypted v3 file
            {
                let mut writer = AofWriter::open_encrypted(&path, key)?;
                writer.write_record(&set_record("k", "v"))?;
                writer.sync()?;
            }

            // try to open without a key
            let err = AofReader::open(&path).unwrap_err();
            assert!(matches!(err, FormatError::EncryptionRequired));
            Ok(())
        }

        #[test]
        fn encrypted_truncate_preserves_encryption() -> Result {
            let dir = temp_dir();
            let path = dir.path().join("enc_trunc.aof");
            let key = test_key();

            {
                let mut writer = AofWriter::open_encrypted(&path, key.clone())?;
                writer.write_record(&set_record("old", "data"))?;
                writer.truncate()?;

                writer.write_record(&set_record("new", "fresh"))?;
                writer.sync()?;
            }

            let mut reader = AofReader::open_encrypted(&path, key)?;
            let rec = reader.read_record()?.unwrap();
            match rec {
                AofRecord::Set { key, .. } => assert_eq!(key, "new"),
                other => panic!("expected Set, got {other:?}"),
            }
            assert!(reader.read_record()?.is_none());
            Ok(())
        }
    }
}