Skip to main content

ember_persistence/
aof.rs

1//! Append-only file for recording mutations.
2//!
3//! Each shard writes its own AOF file (`shard-{id}.aof`). Records are
4//! written after successful mutations. The binary format uses a simple
5//! tag + payload + CRC32 structure for each record.
6//!
7//! File layout:
8//! ```text
9//! [EAOF magic: 4B][version: 1B]
10//! [record]*
11//! ```
12//!
13//! Record layout:
14//! ```text
15//! [tag: 1B][payload...][crc32: 4B]
16//! ```
17//! The CRC32 covers the tag + payload bytes.
18
19use std::fmt;
20use std::fs::{self, File, OpenOptions};
21#[cfg(feature = "encryption")]
22use std::io::Read as _;
23use std::io::{self, BufReader, BufWriter, Write};
24use std::path::{Path, PathBuf};
25
26use bytes::Bytes;
27
28use crate::format::{self, FormatError};
29
30/// Reads a length-prefixed field and decodes it as UTF-8.
31fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
32    let bytes = format::read_bytes(r)?;
33    String::from_utf8(bytes).map_err(|_| {
34        FormatError::Io(io::Error::new(
35            io::ErrorKind::InvalidData,
36            format!("{field} is not valid utf-8"),
37        ))
38    })
39}
40
41/// Reads a count-prefixed list of strings: `[count: u32][string]*`.
42/// Used by SADD, SREM, HDEL, and ZREM deserialization.
43fn read_string_list(r: &mut impl io::Read, field: &str) -> Result<Vec<String>, FormatError> {
44    let count = format::read_u32(r)?;
45    format::validate_collection_count(count, field)?;
46    let mut items = Vec::with_capacity(format::capped_capacity(count));
47    for _ in 0..count {
48        items.push(read_string(r, field)?);
49    }
50    Ok(items)
51}
52
53/// Reads a count-prefixed list of raw byte blobs: `[count: u32][bytes]*`.
54/// Used by LPUSH and RPUSH deserialization.
55fn read_bytes_list(r: &mut impl io::Read, label: &str) -> Result<Vec<Bytes>, FormatError> {
56    let count = format::read_u32(r)?;
57    format::validate_collection_count(count, label)?;
58    let mut items = Vec::with_capacity(format::capped_capacity(count));
59    for _ in 0..count {
60        items.push(Bytes::from(format::read_bytes(r)?));
61    }
62    Ok(items)
63}
64
// -- record tags --
// One byte per record variant. These values are part of the on-disk format:
// they are stable and must not change. Constants are grouped by the data
// type they operate on, so numeric order is not preserved within a group.

// string
const TAG_SET: u8 = 1;
const TAG_INCR: u8 = 12;
const TAG_DECR: u8 = 13;
const TAG_INCRBY: u8 = 19;
const TAG_DECRBY: u8 = 20;
const TAG_APPEND: u8 = 21;
const TAG_SETRANGE: u8 = 32;

// list
const TAG_LPUSH: u8 = 4;
const TAG_RPUSH: u8 = 5;
const TAG_LPOP: u8 = 6;
const TAG_RPOP: u8 = 7;
const TAG_LSET: u8 = 28;
const TAG_LTRIM: u8 = 29;
const TAG_LINSERT: u8 = 30;
const TAG_LREM: u8 = 31;

// sorted set
const TAG_ZADD: u8 = 8;
const TAG_ZREM: u8 = 9;

// hash
const TAG_HSET: u8 = 14;
const TAG_HDEL: u8 = 15;
const TAG_HINCRBY: u8 = 16;

// set
const TAG_SADD: u8 = 17;
const TAG_SREM: u8 = 18;

// key lifecycle
const TAG_DEL: u8 = 2;
const TAG_EXPIRE: u8 = 3;
const TAG_PERSIST: u8 = 10;
const TAG_PEXPIRE: u8 = 11;
const TAG_RENAME: u8 = 22;
const TAG_COPY: u8 = 27;

// protobuf (only compiled with the `protobuf` feature)
#[cfg(feature = "protobuf")]
const TAG_PROTO_SET: u8 = 23;
#[cfg(feature = "protobuf")]
const TAG_PROTO_REGISTER: u8 = 24;

// vector (only compiled with the `vector` feature)
#[cfg(feature = "vector")]
const TAG_VADD: u8 = 25;
#[cfg(feature = "vector")]
const TAG_VREM: u8 = 26;
119
120/// A single mutation record stored in the AOF.
121#[derive(Debug, Clone, PartialEq)]
122pub enum AofRecord {
123    /// SET key value \[expire_ms\]. expire_ms is -1 for no expiration.
124    Set {
125        key: String,
126        value: Bytes,
127        expire_ms: i64,
128    },
129    /// DEL key.
130    Del { key: String },
131    /// EXPIRE key seconds.
132    Expire { key: String, seconds: u64 },
133    /// LPUSH key value [value ...].
134    LPush { key: String, values: Vec<Bytes> },
135    /// RPUSH key value [value ...].
136    RPush { key: String, values: Vec<Bytes> },
137    /// LPOP key.
138    LPop { key: String },
139    /// RPOP key.
140    RPop { key: String },
141    /// LSET key index element.
142    LSet {
143        key: String,
144        index: i64,
145        value: Bytes,
146    },
147    /// LTRIM key start stop.
148    LTrim { key: String, start: i64, stop: i64 },
149    /// LINSERT key BEFORE|AFTER pivot element.
150    LInsert {
151        key: String,
152        before: bool,
153        pivot: Bytes,
154        value: Bytes,
155    },
156    /// LREM key count element.
157    LRem {
158        key: String,
159        count: i64,
160        value: Bytes,
161    },
162    /// ZADD key score member [score member ...].
163    ZAdd {
164        key: String,
165        members: Vec<(f64, String)>,
166    },
167    /// ZREM key member [member ...].
168    ZRem { key: String, members: Vec<String> },
169    /// PERSIST key — remove expiration.
170    Persist { key: String },
171    /// PEXPIRE key milliseconds.
172    Pexpire { key: String, milliseconds: u64 },
173    /// INCR key.
174    Incr { key: String },
175    /// DECR key.
176    Decr { key: String },
177    /// HSET key field value [field value ...].
178    HSet {
179        key: String,
180        fields: Vec<(String, Bytes)>,
181    },
182    /// HDEL key field [field ...].
183    HDel { key: String, fields: Vec<String> },
184    /// HINCRBY key field delta.
185    HIncrBy {
186        key: String,
187        field: String,
188        delta: i64,
189    },
190    /// SADD key member [member ...].
191    SAdd { key: String, members: Vec<String> },
192    /// SREM key member [member ...].
193    SRem { key: String, members: Vec<String> },
194    /// INCRBY key delta.
195    IncrBy { key: String, delta: i64 },
196    /// DECRBY key delta.
197    DecrBy { key: String, delta: i64 },
198    /// APPEND key value.
199    Append { key: String, value: Bytes },
200    /// SETRANGE key offset value.
201    SetRange {
202        key: String,
203        offset: usize,
204        value: Bytes,
205    },
206    /// RENAME key newkey.
207    Rename { key: String, newkey: String },
208    /// COPY source destination [REPLACE].
209    Copy {
210        source: String,
211        destination: String,
212        replace: bool,
213    },
214    /// VADD key element vector [metric quant connectivity expansion_add].
215    /// Stores the full index config so recovery can recreate the set.
216    #[cfg(feature = "vector")]
217    VAdd {
218        key: String,
219        element: String,
220        vector: Vec<f32>,
221        /// 0 = cosine, 1 = l2, 2 = inner product
222        metric: u8,
223        /// 0 = f32, 1 = f16, 2 = i8
224        quantization: u8,
225        connectivity: u32,
226        expansion_add: u32,
227    },
228    /// VREM key element.
229    #[cfg(feature = "vector")]
230    VRem { key: String, element: String },
231    /// PROTO.SET key type_name data [expire_ms].
232    #[cfg(feature = "protobuf")]
233    ProtoSet {
234        key: String,
235        type_name: String,
236        data: Bytes,
237        expire_ms: i64,
238    },
239    /// PROTO.REGISTER name descriptor_bytes (for schema persistence).
240    #[cfg(feature = "protobuf")]
241    ProtoRegister { name: String, descriptor: Bytes },
242}
243
244impl AofRecord {
245    // IMPORTANT: each variant has three match arms that must stay in sync:
246    //   - `tag()`: the one-byte discriminant written to disk
247    //   - `estimated_size()`: the capacity hint for the serialization buffer
248    //   - `to_bytes()`: the actual serialized payload
249    //
250    // When adding a new variant, update all three in that order.
251    // The binary format is stable — tag byte values must never be reused.
252
253    /// Returns the on-disk tag byte for this record variant.
254    fn tag(&self) -> u8 {
255        match self {
256            AofRecord::Set { .. } => TAG_SET,
257            AofRecord::Del { .. } => TAG_DEL,
258            AofRecord::Expire { .. } => TAG_EXPIRE,
259            AofRecord::LPush { .. } => TAG_LPUSH,
260            AofRecord::RPush { .. } => TAG_RPUSH,
261            AofRecord::LPop { .. } => TAG_LPOP,
262            AofRecord::RPop { .. } => TAG_RPOP,
263            AofRecord::LSet { .. } => TAG_LSET,
264            AofRecord::LTrim { .. } => TAG_LTRIM,
265            AofRecord::LInsert { .. } => TAG_LINSERT,
266            AofRecord::LRem { .. } => TAG_LREM,
267            AofRecord::ZAdd { .. } => TAG_ZADD,
268            AofRecord::ZRem { .. } => TAG_ZREM,
269            AofRecord::Persist { .. } => TAG_PERSIST,
270            AofRecord::Pexpire { .. } => TAG_PEXPIRE,
271            AofRecord::Incr { .. } => TAG_INCR,
272            AofRecord::Decr { .. } => TAG_DECR,
273            AofRecord::HSet { .. } => TAG_HSET,
274            AofRecord::HDel { .. } => TAG_HDEL,
275            AofRecord::HIncrBy { .. } => TAG_HINCRBY,
276            AofRecord::SAdd { .. } => TAG_SADD,
277            AofRecord::SRem { .. } => TAG_SREM,
278            AofRecord::IncrBy { .. } => TAG_INCRBY,
279            AofRecord::DecrBy { .. } => TAG_DECRBY,
280            AofRecord::Append { .. } => TAG_APPEND,
281            AofRecord::SetRange { .. } => TAG_SETRANGE,
282            AofRecord::Rename { .. } => TAG_RENAME,
283            AofRecord::Copy { .. } => TAG_COPY,
284            #[cfg(feature = "vector")]
285            AofRecord::VAdd { .. } => TAG_VADD,
286            #[cfg(feature = "vector")]
287            AofRecord::VRem { .. } => TAG_VREM,
288            #[cfg(feature = "protobuf")]
289            AofRecord::ProtoSet { .. } => TAG_PROTO_SET,
290            #[cfg(feature = "protobuf")]
291            AofRecord::ProtoRegister { .. } => TAG_PROTO_REGISTER,
292        }
293    }
294
295    /// Estimates the serialized size of this record in bytes.
296    ///
297    /// Used as a capacity hint for `to_bytes()` to avoid intermediate
298    /// reallocations. The estimate includes the tag byte plus all
299    /// length-prefixed fields, erring slightly high to avoid growing.
300    fn estimated_size(&self) -> usize {
301        // overhead per length-prefixed field: 4 bytes for the u32 length
302        const LEN_PREFIX: usize = 4;
303
304        match self {
305            // 1 tag + 4 key-len + key + 4 value-len + value + 8 expire_ms
306            AofRecord::Set {
307                key,
308                value,
309                expire_ms: _,
310            } => 1 + LEN_PREFIX + key.len() + LEN_PREFIX + value.len() + 8,
311            // 1 tag + 4 key-len + key
312            AofRecord::Del { key }
313            | AofRecord::LPop { key }
314            | AofRecord::RPop { key }
315            | AofRecord::Persist { key }
316            | AofRecord::Incr { key }
317            | AofRecord::Decr { key } => 1 + LEN_PREFIX + key.len(),
318            // 1 tag + 4 key-len + key + 8 seconds/millis
319            AofRecord::Expire { key, .. } | AofRecord::Pexpire { key, .. } => {
320                1 + LEN_PREFIX + key.len() + 8
321            }
322            // 1 tag + 4 key-len + key + 4 count + (4 value-len + value) * n
323            AofRecord::LPush { key, values } | AofRecord::RPush { key, values } => {
324                let values_size: usize = values.iter().map(|v| LEN_PREFIX + v.len()).sum();
325                1 + LEN_PREFIX + key.len() + 4 + values_size
326            }
327            // 1 tag + 4 key-len + key + 8 index + 4 value-len + value
328            AofRecord::LSet { key, value, .. } => {
329                1 + LEN_PREFIX + key.len() + 8 + LEN_PREFIX + value.len()
330            }
331            // 1 tag + 4 key-len + key + 8 start + 8 stop
332            AofRecord::LTrim { key, .. } => 1 + LEN_PREFIX + key.len() + 8 + 8,
333            // 1 tag + 4 key-len + key + 1 before + 4 pivot-len + pivot + 4 value-len + value
334            AofRecord::LInsert {
335                key, pivot, value, ..
336            } => {
337                1 + LEN_PREFIX + key.len() + 1 + LEN_PREFIX + pivot.len() + LEN_PREFIX + value.len()
338            }
339            // 1 tag + 4 key-len + key + 8 count + 4 value-len + value
340            AofRecord::LRem { key, value, .. } => {
341                1 + LEN_PREFIX + key.len() + 8 + LEN_PREFIX + value.len()
342            }
343            AofRecord::ZAdd { key, members } => {
344                let members_size: usize =
345                    members.iter().map(|(_, m)| 8 + LEN_PREFIX + m.len()).sum();
346                1 + LEN_PREFIX + key.len() + 4 + members_size
347            }
348            AofRecord::ZRem { key, members }
349            | AofRecord::SAdd { key, members }
350            | AofRecord::SRem { key, members } => {
351                let members_size: usize = members.iter().map(|m| LEN_PREFIX + m.len()).sum();
352                1 + LEN_PREFIX + key.len() + 4 + members_size
353            }
354            AofRecord::HSet { key, fields } => {
355                let fields_size: usize = fields
356                    .iter()
357                    .map(|(f, v)| LEN_PREFIX + f.len() + LEN_PREFIX + v.len())
358                    .sum();
359                1 + LEN_PREFIX + key.len() + 4 + fields_size
360            }
361            AofRecord::HDel { key, fields } => {
362                let fields_size: usize = fields.iter().map(|f| LEN_PREFIX + f.len()).sum();
363                1 + LEN_PREFIX + key.len() + 4 + fields_size
364            }
365            AofRecord::HIncrBy { key, field, .. } => {
366                1 + LEN_PREFIX + key.len() + LEN_PREFIX + field.len() + 8
367            }
368            AofRecord::IncrBy { key, .. } | AofRecord::DecrBy { key, .. } => {
369                1 + LEN_PREFIX + key.len() + 8
370            }
371            AofRecord::Append { key, value } => {
372                1 + LEN_PREFIX + key.len() + LEN_PREFIX + value.len()
373            }
374            AofRecord::SetRange { key, value, .. } => {
375                1 + LEN_PREFIX + key.len() + 8 + LEN_PREFIX + value.len()
376            }
377            AofRecord::Rename { key, newkey } => {
378                1 + LEN_PREFIX + key.len() + LEN_PREFIX + newkey.len()
379            }
380            AofRecord::Copy {
381                source,
382                destination,
383                ..
384            } => 1 + LEN_PREFIX + source.len() + LEN_PREFIX + destination.len() + 1,
385            #[cfg(feature = "vector")]
386            AofRecord::VAdd {
387                key,
388                element,
389                vector,
390                ..
391            } => {
392                1 + LEN_PREFIX + key.len() + LEN_PREFIX + element.len() + 4 + vector.len() * 4 + 10
393            }
394            #[cfg(feature = "vector")]
395            AofRecord::VRem { key, element } => {
396                1 + LEN_PREFIX + key.len() + LEN_PREFIX + element.len()
397            }
398            #[cfg(feature = "protobuf")]
399            AofRecord::ProtoSet {
400                key,
401                type_name,
402                data,
403                ..
404            } => {
405                1 + LEN_PREFIX
406                    + key.len()
407                    + LEN_PREFIX
408                    + type_name.len()
409                    + LEN_PREFIX
410                    + data.len()
411                    + 8
412            }
413            #[cfg(feature = "protobuf")]
414            AofRecord::ProtoRegister { name, descriptor } => {
415                1 + LEN_PREFIX + name.len() + LEN_PREFIX + descriptor.len()
416            }
417        }
418    }
419
420    /// Serializes this record into a byte vector (tag + payload, no CRC).
421    pub fn to_bytes(&self) -> Result<Vec<u8>, FormatError> {
422        let mut buf = Vec::with_capacity(self.estimated_size());
423        format::write_u8(&mut buf, self.tag())?;
424
425        match self {
426            // key-only: tag + key
427            AofRecord::Del { key }
428            | AofRecord::LPop { key }
429            | AofRecord::RPop { key }
430            | AofRecord::Persist { key }
431            | AofRecord::Incr { key }
432            | AofRecord::Decr { key } => {
433                format::write_bytes(&mut buf, key.as_bytes())?;
434            }
435
436            // key + bytes value + expire
437            AofRecord::Set {
438                key,
439                value,
440                expire_ms,
441            } => {
442                format::write_bytes(&mut buf, key.as_bytes())?;
443                format::write_bytes(&mut buf, value)?;
444                format::write_i64(&mut buf, *expire_ms)?;
445            }
446
447            // key + i64 (seconds/milliseconds are capped at i64::MAX on write
448            // so that deserialization can safely cast back to u64)
449            AofRecord::Expire { key, seconds } => {
450                format::write_bytes(&mut buf, key.as_bytes())?;
451                format::write_i64(&mut buf, (*seconds).min(i64::MAX as u64) as i64)?;
452            }
453            AofRecord::Pexpire { key, milliseconds } => {
454                format::write_bytes(&mut buf, key.as_bytes())?;
455                format::write_i64(&mut buf, (*milliseconds).min(i64::MAX as u64) as i64)?;
456            }
457            AofRecord::IncrBy { key, delta } | AofRecord::DecrBy { key, delta } => {
458                format::write_bytes(&mut buf, key.as_bytes())?;
459                format::write_i64(&mut buf, *delta)?;
460            }
461
462            // key + byte list
463            AofRecord::LPush { key, values } | AofRecord::RPush { key, values } => {
464                format::write_bytes(&mut buf, key.as_bytes())?;
465                format::write_len(&mut buf, values.len())?;
466                for v in values {
467                    format::write_bytes(&mut buf, v)?;
468                }
469            }
470
471            // key + index + value
472            AofRecord::LSet { key, index, value } => {
473                format::write_bytes(&mut buf, key.as_bytes())?;
474                format::write_i64(&mut buf, *index)?;
475                format::write_bytes(&mut buf, value)?;
476            }
477
478            // key + start + stop
479            AofRecord::LTrim { key, start, stop } => {
480                format::write_bytes(&mut buf, key.as_bytes())?;
481                format::write_i64(&mut buf, *start)?;
482                format::write_i64(&mut buf, *stop)?;
483            }
484
485            // key + before(u8) + pivot + value
486            AofRecord::LInsert {
487                key,
488                before,
489                pivot,
490                value,
491            } => {
492                format::write_bytes(&mut buf, key.as_bytes())?;
493                format::write_u8(&mut buf, if *before { 1 } else { 0 })?;
494                format::write_bytes(&mut buf, pivot)?;
495                format::write_bytes(&mut buf, value)?;
496            }
497
498            // key + count + value
499            AofRecord::LRem { key, count, value } => {
500                format::write_bytes(&mut buf, key.as_bytes())?;
501                format::write_i64(&mut buf, *count)?;
502                format::write_bytes(&mut buf, value)?;
503            }
504
505            // key + string list
506            AofRecord::ZRem { key, members }
507            | AofRecord::SAdd { key, members }
508            | AofRecord::SRem { key, members } => {
509                format::write_bytes(&mut buf, key.as_bytes())?;
510                format::write_len(&mut buf, members.len())?;
511                for member in members {
512                    format::write_bytes(&mut buf, member.as_bytes())?;
513                }
514            }
515            AofRecord::HDel { key, fields } => {
516                format::write_bytes(&mut buf, key.as_bytes())?;
517                format::write_len(&mut buf, fields.len())?;
518                for field in fields {
519                    format::write_bytes(&mut buf, field.as_bytes())?;
520                }
521            }
522
523            // key + scored members
524            AofRecord::ZAdd { key, members } => {
525                format::write_bytes(&mut buf, key.as_bytes())?;
526                format::write_len(&mut buf, members.len())?;
527                for (score, member) in members {
528                    format::write_f64(&mut buf, *score)?;
529                    format::write_bytes(&mut buf, member.as_bytes())?;
530                }
531            }
532
533            // key + field-value pairs
534            AofRecord::HSet { key, fields } => {
535                format::write_bytes(&mut buf, key.as_bytes())?;
536                format::write_len(&mut buf, fields.len())?;
537                for (field, value) in fields {
538                    format::write_bytes(&mut buf, field.as_bytes())?;
539                    format::write_bytes(&mut buf, value)?;
540                }
541            }
542
543            // key + field + delta
544            AofRecord::HIncrBy { key, field, delta } => {
545                format::write_bytes(&mut buf, key.as_bytes())?;
546                format::write_bytes(&mut buf, field.as_bytes())?;
547                format::write_i64(&mut buf, *delta)?;
548            }
549
550            // key + bytes value (no expire)
551            AofRecord::Append { key, value } => {
552                format::write_bytes(&mut buf, key.as_bytes())?;
553                format::write_bytes(&mut buf, value)?;
554            }
555
556            // key + offset (as i64) + bytes value
557            AofRecord::SetRange { key, offset, value } => {
558                format::write_bytes(&mut buf, key.as_bytes())?;
559                format::write_i64(&mut buf, *offset as i64)?;
560                format::write_bytes(&mut buf, value)?;
561            }
562
563            // key + newkey
564            AofRecord::Rename { key, newkey } => {
565                format::write_bytes(&mut buf, key.as_bytes())?;
566                format::write_bytes(&mut buf, newkey.as_bytes())?;
567            }
568
569            // source + destination + replace flag
570            AofRecord::Copy {
571                source,
572                destination,
573                replace,
574            } => {
575                format::write_bytes(&mut buf, source.as_bytes())?;
576                format::write_bytes(&mut buf, destination.as_bytes())?;
577                buf.push(u8::from(*replace));
578            }
579
580            #[cfg(feature = "vector")]
581            AofRecord::VAdd {
582                key,
583                element,
584                vector,
585                metric,
586                quantization,
587                connectivity,
588                expansion_add,
589            } => {
590                format::write_bytes(&mut buf, key.as_bytes())?;
591                format::write_bytes(&mut buf, element.as_bytes())?;
592                format::write_len(&mut buf, vector.len())?;
593                for &v in vector {
594                    format::write_f32(&mut buf, v)?;
595                }
596                format::write_u8(&mut buf, *metric)?;
597                format::write_u8(&mut buf, *quantization)?;
598                format::write_u32(&mut buf, *connectivity)?;
599                format::write_u32(&mut buf, *expansion_add)?;
600            }
601            #[cfg(feature = "vector")]
602            AofRecord::VRem { key, element } => {
603                format::write_bytes(&mut buf, key.as_bytes())?;
604                format::write_bytes(&mut buf, element.as_bytes())?;
605            }
606
607            #[cfg(feature = "protobuf")]
608            AofRecord::ProtoSet {
609                key,
610                type_name,
611                data,
612                expire_ms,
613            } => {
614                format::write_bytes(&mut buf, key.as_bytes())?;
615                format::write_bytes(&mut buf, type_name.as_bytes())?;
616                format::write_bytes(&mut buf, data)?;
617                format::write_i64(&mut buf, *expire_ms)?;
618            }
619            #[cfg(feature = "protobuf")]
620            AofRecord::ProtoRegister { name, descriptor } => {
621                format::write_bytes(&mut buf, name.as_bytes())?;
622                format::write_bytes(&mut buf, descriptor)?;
623            }
624        }
625        Ok(buf)
626    }
627
628    /// Deserializes a record from its binary payload (tag byte + fields, no CRC).
629    ///
630    /// The format is the same as `to_bytes()`. CRC validation is the caller's
631    /// responsibility (done by the AOF recovery reader before this is called).
632    pub fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
633        let mut cursor = io::Cursor::new(data);
634        let tag = format::read_u8(&mut cursor)?;
635        match tag {
636            TAG_SET => {
637                let key = read_string(&mut cursor, "key")?;
638                let value = format::read_bytes(&mut cursor)?;
639                let expire_ms = format::read_i64(&mut cursor)?;
640                Ok(AofRecord::Set {
641                    key,
642                    value: Bytes::from(value),
643                    expire_ms,
644                })
645            }
646            TAG_DEL => {
647                let key = read_string(&mut cursor, "key")?;
648                Ok(AofRecord::Del { key })
649            }
650            TAG_EXPIRE => {
651                let key = read_string(&mut cursor, "key")?;
652                let raw = format::read_i64(&mut cursor)?;
653                let seconds = u64::try_from(raw).map_err(|_| {
654                    FormatError::InvalidData(format!(
655                        "EXPIRE seconds is negative ({raw}) in AOF record"
656                    ))
657                })?;
658                Ok(AofRecord::Expire { key, seconds })
659            }
660            TAG_LPUSH | TAG_RPUSH => {
661                let key = read_string(&mut cursor, "key")?;
662                let values = read_bytes_list(&mut cursor, "list")?;
663                if tag == TAG_LPUSH {
664                    Ok(AofRecord::LPush { key, values })
665                } else {
666                    Ok(AofRecord::RPush { key, values })
667                }
668            }
669            TAG_LPOP => {
670                let key = read_string(&mut cursor, "key")?;
671                Ok(AofRecord::LPop { key })
672            }
673            TAG_RPOP => {
674                let key = read_string(&mut cursor, "key")?;
675                Ok(AofRecord::RPop { key })
676            }
677            TAG_LSET => {
678                let key = read_string(&mut cursor, "key")?;
679                let index = format::read_i64(&mut cursor)?;
680                let value = Bytes::from(format::read_bytes(&mut cursor)?);
681                Ok(AofRecord::LSet { key, index, value })
682            }
683            TAG_LTRIM => {
684                let key = read_string(&mut cursor, "key")?;
685                let start = format::read_i64(&mut cursor)?;
686                let stop = format::read_i64(&mut cursor)?;
687                Ok(AofRecord::LTrim { key, start, stop })
688            }
689            TAG_LINSERT => {
690                let key = read_string(&mut cursor, "key")?;
691                let before_byte = format::read_u8(&mut cursor)?;
692                let before = before_byte != 0;
693                let pivot = Bytes::from(format::read_bytes(&mut cursor)?);
694                let value = Bytes::from(format::read_bytes(&mut cursor)?);
695                Ok(AofRecord::LInsert {
696                    key,
697                    before,
698                    pivot,
699                    value,
700                })
701            }
702            TAG_LREM => {
703                let key = read_string(&mut cursor, "key")?;
704                let count = format::read_i64(&mut cursor)?;
705                let value = Bytes::from(format::read_bytes(&mut cursor)?);
706                Ok(AofRecord::LRem { key, count, value })
707            }
708            TAG_ZADD => {
709                let key = read_string(&mut cursor, "key")?;
710                let count = format::read_u32(&mut cursor)?;
711                format::validate_collection_count(count, "sorted set")?;
712                let mut members = Vec::with_capacity(format::capped_capacity(count));
713                for _ in 0..count {
714                    let score = format::read_f64(&mut cursor)?;
715                    let member = read_string(&mut cursor, "member")?;
716                    members.push((score, member));
717                }
718                Ok(AofRecord::ZAdd { key, members })
719            }
720            TAG_ZREM => {
721                let key = read_string(&mut cursor, "key")?;
722                let members = read_string_list(&mut cursor, "member")?;
723                Ok(AofRecord::ZRem { key, members })
724            }
725            TAG_PERSIST => {
726                let key = read_string(&mut cursor, "key")?;
727                Ok(AofRecord::Persist { key })
728            }
729            TAG_PEXPIRE => {
730                let key = read_string(&mut cursor, "key")?;
731                let raw = format::read_i64(&mut cursor)?;
732                let milliseconds = u64::try_from(raw).map_err(|_| {
733                    FormatError::InvalidData(format!(
734                        "PEXPIRE milliseconds is negative ({raw}) in AOF record"
735                    ))
736                })?;
737                Ok(AofRecord::Pexpire { key, milliseconds })
738            }
739            TAG_INCR => {
740                let key = read_string(&mut cursor, "key")?;
741                Ok(AofRecord::Incr { key })
742            }
743            TAG_DECR => {
744                let key = read_string(&mut cursor, "key")?;
745                Ok(AofRecord::Decr { key })
746            }
747            TAG_HSET => {
748                let key = read_string(&mut cursor, "key")?;
749                let count = format::read_u32(&mut cursor)?;
750                format::validate_collection_count(count, "hash")?;
751                let mut fields = Vec::with_capacity(format::capped_capacity(count));
752                for _ in 0..count {
753                    let field = read_string(&mut cursor, "field")?;
754                    let value = Bytes::from(format::read_bytes(&mut cursor)?);
755                    fields.push((field, value));
756                }
757                Ok(AofRecord::HSet { key, fields })
758            }
759            TAG_HDEL => {
760                let key = read_string(&mut cursor, "key")?;
761                let fields = read_string_list(&mut cursor, "field")?;
762                Ok(AofRecord::HDel { key, fields })
763            }
764            TAG_HINCRBY => {
765                let key = read_string(&mut cursor, "key")?;
766                let field = read_string(&mut cursor, "field")?;
767                let delta = format::read_i64(&mut cursor)?;
768                Ok(AofRecord::HIncrBy { key, field, delta })
769            }
770            TAG_SADD => {
771                let key = read_string(&mut cursor, "key")?;
772                let members = read_string_list(&mut cursor, "member")?;
773                Ok(AofRecord::SAdd { key, members })
774            }
775            TAG_SREM => {
776                let key = read_string(&mut cursor, "key")?;
777                let members = read_string_list(&mut cursor, "member")?;
778                Ok(AofRecord::SRem { key, members })
779            }
780            TAG_INCRBY => {
781                let key = read_string(&mut cursor, "key")?;
782                let delta = format::read_i64(&mut cursor)?;
783                Ok(AofRecord::IncrBy { key, delta })
784            }
785            TAG_DECRBY => {
786                let key = read_string(&mut cursor, "key")?;
787                let delta = format::read_i64(&mut cursor)?;
788                Ok(AofRecord::DecrBy { key, delta })
789            }
790            TAG_APPEND => {
791                let key = read_string(&mut cursor, "key")?;
792                let value = Bytes::from(format::read_bytes(&mut cursor)?);
793                Ok(AofRecord::Append { key, value })
794            }
795            TAG_SETRANGE => {
796                let key = read_string(&mut cursor, "key")?;
797                let offset = format::read_i64(&mut cursor)? as usize;
798                let value = Bytes::from(format::read_bytes(&mut cursor)?);
799                Ok(AofRecord::SetRange { key, offset, value })
800            }
801            TAG_RENAME => {
802                let key = read_string(&mut cursor, "key")?;
803                let newkey = read_string(&mut cursor, "newkey")?;
804                Ok(AofRecord::Rename { key, newkey })
805            }
806            TAG_COPY => {
807                let source = read_string(&mut cursor, "source")?;
808                let destination = read_string(&mut cursor, "destination")?;
809                let replace = format::read_u8(&mut cursor)? != 0;
810                Ok(AofRecord::Copy {
811                    source,
812                    destination,
813                    replace,
814                })
815            }
816            #[cfg(feature = "vector")]
817            TAG_VADD => {
818                let key = read_string(&mut cursor, "key")?;
819                let element = read_string(&mut cursor, "element")?;
820                let dim = format::read_u32(&mut cursor)?;
821                if dim > format::MAX_PERSISTED_VECTOR_DIMS {
822                    return Err(FormatError::InvalidData(format!(
823                        "AOF VADD dimension {dim} exceeds max {}",
824                        format::MAX_PERSISTED_VECTOR_DIMS
825                    )));
826                }
827                let mut vector = Vec::with_capacity(dim as usize);
828                for _ in 0..dim {
829                    vector.push(format::read_f32(&mut cursor)?);
830                }
831                let metric = format::read_u8(&mut cursor)?;
832                let quantization = format::read_u8(&mut cursor)?;
833                let connectivity = format::read_u32(&mut cursor)?;
834                let expansion_add = format::read_u32(&mut cursor)?;
835                Ok(AofRecord::VAdd {
836                    key,
837                    element,
838                    vector,
839                    metric,
840                    quantization,
841                    connectivity,
842                    expansion_add,
843                })
844            }
845            #[cfg(feature = "vector")]
846            TAG_VREM => {
847                let key = read_string(&mut cursor, "key")?;
848                let element = read_string(&mut cursor, "element")?;
849                Ok(AofRecord::VRem { key, element })
850            }
851            #[cfg(feature = "protobuf")]
852            TAG_PROTO_SET => {
853                let key = read_string(&mut cursor, "key")?;
854                let type_name = read_string(&mut cursor, "type_name")?;
855                let data = format::read_bytes(&mut cursor)?;
856                let expire_ms = format::read_i64(&mut cursor)?;
857                Ok(AofRecord::ProtoSet {
858                    key,
859                    type_name,
860                    data: Bytes::from(data),
861                    expire_ms,
862                })
863            }
864            #[cfg(feature = "protobuf")]
865            TAG_PROTO_REGISTER => {
866                let name = read_string(&mut cursor, "name")?;
867                let descriptor = format::read_bytes(&mut cursor)?;
868                Ok(AofRecord::ProtoRegister {
869                    name,
870                    descriptor: Bytes::from(descriptor),
871                })
872            }
873            _ => Err(FormatError::UnknownTag(tag)),
874        }
875    }
876}
877
/// How aggressively the AOF writer forces data to stable storage.
///
/// The default is [`FsyncPolicy::EverySec`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum FsyncPolicy {
    /// fsync after each write. the most durable and the slowest option.
    Always,
    /// fsync roughly once a second, driven by the shard tick.
    #[default]
    EverySec,
    /// never fsync explicitly; the OS flushes when it chooses. fastest,
    /// least durable.
    No,
}
889
/// Append-side handle for a single AOF file.
///
/// Records are staged in a [`BufWriter`] and only reach the OS once the
/// writer is flushed or synced.
pub struct AofWriter {
    /// Buffered handle to the open AOF file.
    writer: BufWriter<File>,
    /// Location of the AOF on disk; reused when the file is truncated.
    path: PathBuf,
    /// When set, records are written in the encrypted layout.
    #[cfg(feature = "encryption")]
    encryption_key: Option<crate::encryption::EncryptionKey>,
}
897
898impl AofWriter {
899    /// Opens (or creates) an AOF file. If the file is new, writes the header.
900    /// If the file already exists, appends to it.
901    pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
902        let path = path.into();
903        let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
904
905        let file = open_persistence_file(&path)?;
906        let mut writer = BufWriter::new(file);
907
908        if !exists {
909            format::write_header(&mut writer, format::AOF_MAGIC)?;
910            writer.flush()?;
911        }
912
913        Ok(Self {
914            writer,
915            path,
916            #[cfg(feature = "encryption")]
917            encryption_key: None,
918        })
919    }
920
921    /// Opens (or creates) an encrypted AOF file using AES-256-GCM.
922    ///
923    /// New files get a v3 header. Existing v2 files can be appended to —
924    /// new records will be written unencrypted (use `BGREWRITEAOF` to
925    /// migrate the full file to v3).
926    #[cfg(feature = "encryption")]
927    pub fn open_encrypted(
928        path: impl Into<PathBuf>,
929        key: crate::encryption::EncryptionKey,
930    ) -> Result<Self, FormatError> {
931        let path = path.into();
932        let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
933
934        let file = open_persistence_file(&path)?;
935        let mut writer = BufWriter::new(file);
936
937        if !exists {
938            format::write_header_versioned(
939                &mut writer,
940                format::AOF_MAGIC,
941                format::FORMAT_VERSION_ENCRYPTED,
942            )?;
943            writer.flush()?;
944        }
945
946        Ok(Self {
947            writer,
948            path,
949            encryption_key: Some(key),
950        })
951    }
952
953    /// Appends a record to the AOF.
954    ///
955    /// When an encryption key is set, writes: `[nonce: 12B][len: 4B][ciphertext]`.
956    /// Otherwise writes the v2 format: `[tag+payload][crc32: 4B]`.
957    pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
958        let payload = record.to_bytes()?;
959
960        #[cfg(feature = "encryption")]
961        if let Some(ref key) = self.encryption_key {
962            let (nonce, ciphertext) = crate::encryption::encrypt_record(key, &payload)?;
963            self.writer.write_all(&nonce)?;
964            format::write_len(&mut self.writer, ciphertext.len())?;
965            self.writer.write_all(&ciphertext)?;
966            return Ok(());
967        }
968
969        let checksum = format::crc32(&payload);
970        self.writer.write_all(&payload)?;
971        format::write_u32(&mut self.writer, checksum)?;
972        Ok(())
973    }
974
975    /// Flushes the internal buffer to the OS.
976    pub fn flush(&mut self) -> Result<(), FormatError> {
977        self.writer.flush()?;
978        Ok(())
979    }
980
981    /// Flushes and fsyncs the file to disk.
982    pub fn sync(&mut self) -> Result<(), FormatError> {
983        self.writer.flush()?;
984        self.writer.get_ref().sync_all()?;
985        Ok(())
986    }
987
988    /// Returns the file path.
989    pub fn path(&self) -> &Path {
990        &self.path
991    }
992
993    /// Truncates the AOF file back to just the header.
994    ///
995    /// Uses write-to-temp-then-rename for crash safety: the old AOF
996    /// remains intact until the new file (with only a header) is fully
997    /// synced and atomically renamed into place.
998    pub fn truncate(&mut self) -> Result<(), FormatError> {
999        // flush the old writer so no data is in the BufWriter
1000        self.writer.flush()?;
1001
1002        // write a fresh header to a temp file next to the real AOF
1003        let tmp_path = self.path.with_extension("aof.tmp");
1004        let mut opts = OpenOptions::new();
1005        opts.create(true).write(true).truncate(true);
1006        #[cfg(unix)]
1007        {
1008            use std::os::unix::fs::OpenOptionsExt;
1009            opts.mode(0o600);
1010        }
1011        let tmp_file = opts.open(&tmp_path)?;
1012        let mut tmp_writer = BufWriter::new(tmp_file);
1013
1014        #[cfg(feature = "encryption")]
1015        if self.encryption_key.is_some() {
1016            format::write_header_versioned(
1017                &mut tmp_writer,
1018                format::AOF_MAGIC,
1019                format::FORMAT_VERSION_ENCRYPTED,
1020            )?;
1021        } else {
1022            format::write_header(&mut tmp_writer, format::AOF_MAGIC)?;
1023        }
1024        #[cfg(not(feature = "encryption"))]
1025        format::write_header(&mut tmp_writer, format::AOF_MAGIC)?;
1026
1027        tmp_writer.flush()?;
1028        tmp_writer.get_ref().sync_all()?;
1029
1030        // atomic rename: old AOF is replaced only after new file is durable
1031        std::fs::rename(&tmp_path, &self.path)?;
1032
1033        // reopen for appending
1034        let file = OpenOptions::new().append(true).open(&self.path)?;
1035        self.writer = BufWriter::new(file);
1036        Ok(())
1037    }
1038}
1039
/// Sequential reader over the records stored in an AOF file.
pub struct AofReader {
    /// Buffered handle to the file; the constructors consume the header
    /// before storing it.
    reader: BufReader<File>,
    /// Version byte taken from the header: v2 is plaintext, v3 is encrypted.
    version: u8,
    /// Key for decrypting v3 records, if one was supplied.
    #[cfg(feature = "encryption")]
    encryption_key: Option<crate::encryption::EncryptionKey>,
}
1048
1049impl fmt::Debug for AofReader {
1050    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1051        f.debug_struct("AofReader")
1052            .field("version", &self.version)
1053            .finish()
1054    }
1055}
1056
1057impl AofReader {
1058    /// Opens an AOF file and validates the header.
1059    pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
1060        let file = File::open(path.as_ref())?;
1061        let mut reader = BufReader::new(file);
1062        let version = format::read_header(&mut reader, format::AOF_MAGIC)?;
1063
1064        if version == format::FORMAT_VERSION_ENCRYPTED {
1065            return Err(FormatError::EncryptionRequired);
1066        }
1067
1068        Ok(Self {
1069            reader,
1070            version,
1071            #[cfg(feature = "encryption")]
1072            encryption_key: None,
1073        })
1074    }
1075
1076    /// Opens an AOF file with an encryption key for decrypting v3 records.
1077    ///
1078    /// Also handles v2 (plaintext) files — the key is simply unused,
1079    /// allowing transparent migration.
1080    #[cfg(feature = "encryption")]
1081    pub fn open_encrypted(
1082        path: impl AsRef<Path>,
1083        key: crate::encryption::EncryptionKey,
1084    ) -> Result<Self, FormatError> {
1085        let file = File::open(path.as_ref())?;
1086        let mut reader = BufReader::new(file);
1087        let version = format::read_header(&mut reader, format::AOF_MAGIC)?;
1088
1089        Ok(Self {
1090            reader,
1091            version,
1092            encryption_key: Some(key),
1093        })
1094    }
1095
1096    /// Reads the next record from the AOF.
1097    ///
1098    /// Returns `Ok(None)` at end-of-file. On a truncated record (the
1099    /// server crashed mid-write), returns `Ok(None)` rather than an error
1100    /// — this is the expected recovery behavior.
1101    pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
1102        #[cfg(feature = "encryption")]
1103        if self.version == format::FORMAT_VERSION_ENCRYPTED {
1104            return self.read_encrypted_record();
1105        }
1106
1107        self.read_v2_record()
1108    }
1109
1110    /// Reads a v2 (plaintext) record: tag + payload + crc32.
1111    fn read_v2_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
1112        // peek for EOF — try reading the tag byte
1113        let tag = match format::read_u8(&mut self.reader) {
1114            Ok(t) => t,
1115            Err(FormatError::UnexpectedEof) => return Ok(None),
1116            Err(e) => return Err(e),
1117        };
1118
1119        // read the rest of the payload based on tag, building the full
1120        // record bytes for CRC verification
1121        let record_result = self.read_payload_for_tag(tag);
1122        match record_result {
1123            Ok((payload, stored_crc)) => {
1124                // prepend the tag to the payload for CRC check
1125                let mut full = Vec::with_capacity(1 + payload.len());
1126                full.push(tag);
1127                full.extend_from_slice(&payload);
1128                format::verify_crc32(&full, stored_crc)?;
1129                AofRecord::from_bytes(&full).map(Some)
1130            }
1131            // truncated record — treat as end of usable data
1132            Err(FormatError::UnexpectedEof) => Ok(None),
1133            Err(e) => Err(e),
1134        }
1135    }
1136
1137    /// Reads a v3 (encrypted) record: nonce + len + ciphertext.
1138    #[cfg(feature = "encryption")]
1139    fn read_encrypted_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
1140        let key = self
1141            .encryption_key
1142            .as_ref()
1143            .ok_or(FormatError::EncryptionRequired)?;
1144
1145        // read the 12-byte nonce
1146        let mut nonce = [0u8; crate::encryption::NONCE_SIZE];
1147        if let Err(e) = self.reader.read_exact(&mut nonce) {
1148            return if e.kind() == io::ErrorKind::UnexpectedEof {
1149                Ok(None)
1150            } else {
1151                Err(FormatError::Io(e))
1152            };
1153        }
1154
1155        // read ciphertext length and ciphertext
1156        let ct_len = match format::read_u32(&mut self.reader) {
1157            Ok(n) => n as usize,
1158            Err(FormatError::UnexpectedEof) => return Ok(None),
1159            Err(e) => return Err(e),
1160        };
1161
1162        if ct_len > format::MAX_FIELD_LEN {
1163            return Err(FormatError::Io(io::Error::new(
1164                io::ErrorKind::InvalidData,
1165                format!("encrypted record length {ct_len} exceeds maximum"),
1166            )));
1167        }
1168
1169        let mut ciphertext = vec![0u8; ct_len];
1170        if let Err(e) = self.reader.read_exact(&mut ciphertext) {
1171            return if e.kind() == io::ErrorKind::UnexpectedEof {
1172                Ok(None)
1173            } else {
1174                Err(FormatError::Io(e))
1175            };
1176        }
1177
1178        let plaintext = crate::encryption::decrypt_record(key, &nonce, &ciphertext)?;
1179        AofRecord::from_bytes(&plaintext).map(Some)
1180    }
1181
1182    /// Reads the remaining payload bytes (after the tag) and the trailing CRC.
1183    fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
1184        let mut payload = Vec::new();
1185        match tag {
1186            TAG_SET => {
1187                // key_len + key + value_len + value + expire_ms
1188                let key = format::read_bytes(&mut self.reader)?;
1189                format::write_bytes(&mut payload, &key)?;
1190                let value = format::read_bytes(&mut self.reader)?;
1191                format::write_bytes(&mut payload, &value)?;
1192                let expire_ms = format::read_i64(&mut self.reader)?;
1193                format::write_i64(&mut payload, expire_ms)?;
1194            }
1195            TAG_DEL => {
1196                let key = format::read_bytes(&mut self.reader)?;
1197                format::write_bytes(&mut payload, &key)?;
1198            }
1199            TAG_EXPIRE => {
1200                let key = format::read_bytes(&mut self.reader)?;
1201                format::write_bytes(&mut payload, &key)?;
1202                let seconds = format::read_i64(&mut self.reader)?;
1203                format::write_i64(&mut payload, seconds)?;
1204            }
1205            TAG_LPUSH | TAG_RPUSH => {
1206                let key = format::read_bytes(&mut self.reader)?;
1207                format::write_bytes(&mut payload, &key)?;
1208                let count = format::read_u32(&mut self.reader)?;
1209                format::validate_collection_count(count, "list")?;
1210                format::write_u32(&mut payload, count)?;
1211                for _ in 0..count {
1212                    let val = format::read_bytes(&mut self.reader)?;
1213                    format::write_bytes(&mut payload, &val)?;
1214                }
1215            }
1216            TAG_LPOP | TAG_RPOP => {
1217                let key = format::read_bytes(&mut self.reader)?;
1218                format::write_bytes(&mut payload, &key)?;
1219            }
1220            TAG_ZADD => {
1221                let key = format::read_bytes(&mut self.reader)?;
1222                format::write_bytes(&mut payload, &key)?;
1223                let count = format::read_u32(&mut self.reader)?;
1224                format::validate_collection_count(count, "sorted set")?;
1225                format::write_u32(&mut payload, count)?;
1226                for _ in 0..count {
1227                    let score = format::read_f64(&mut self.reader)?;
1228                    format::write_f64(&mut payload, score)?;
1229                    let member = format::read_bytes(&mut self.reader)?;
1230                    format::write_bytes(&mut payload, &member)?;
1231                }
1232            }
1233            TAG_ZREM => {
1234                let key = format::read_bytes(&mut self.reader)?;
1235                format::write_bytes(&mut payload, &key)?;
1236                let count = format::read_u32(&mut self.reader)?;
1237                format::validate_collection_count(count, "sorted set")?;
1238                format::write_u32(&mut payload, count)?;
1239                for _ in 0..count {
1240                    let member = format::read_bytes(&mut self.reader)?;
1241                    format::write_bytes(&mut payload, &member)?;
1242                }
1243            }
1244            TAG_PERSIST => {
1245                let key = format::read_bytes(&mut self.reader)?;
1246                format::write_bytes(&mut payload, &key)?;
1247            }
1248            TAG_PEXPIRE => {
1249                let key = format::read_bytes(&mut self.reader)?;
1250                format::write_bytes(&mut payload, &key)?;
1251                let millis = format::read_i64(&mut self.reader)?;
1252                format::write_i64(&mut payload, millis)?;
1253            }
1254            TAG_INCR | TAG_DECR => {
1255                let key = format::read_bytes(&mut self.reader)?;
1256                format::write_bytes(&mut payload, &key)?;
1257            }
1258            TAG_HSET => {
1259                let key = format::read_bytes(&mut self.reader)?;
1260                format::write_bytes(&mut payload, &key)?;
1261                let count = format::read_u32(&mut self.reader)?;
1262                format::validate_collection_count(count, "hash")?;
1263                format::write_u32(&mut payload, count)?;
1264                for _ in 0..count {
1265                    let field = format::read_bytes(&mut self.reader)?;
1266                    format::write_bytes(&mut payload, &field)?;
1267                    let value = format::read_bytes(&mut self.reader)?;
1268                    format::write_bytes(&mut payload, &value)?;
1269                }
1270            }
1271            TAG_HDEL | TAG_SADD | TAG_SREM => {
1272                let key = format::read_bytes(&mut self.reader)?;
1273                format::write_bytes(&mut payload, &key)?;
1274                let count = format::read_u32(&mut self.reader)?;
1275                format::validate_collection_count(count, "set")?;
1276                format::write_u32(&mut payload, count)?;
1277                for _ in 0..count {
1278                    let item = format::read_bytes(&mut self.reader)?;
1279                    format::write_bytes(&mut payload, &item)?;
1280                }
1281            }
1282            TAG_HINCRBY => {
1283                let key = format::read_bytes(&mut self.reader)?;
1284                format::write_bytes(&mut payload, &key)?;
1285                let field = format::read_bytes(&mut self.reader)?;
1286                format::write_bytes(&mut payload, &field)?;
1287                let delta = format::read_i64(&mut self.reader)?;
1288                format::write_i64(&mut payload, delta)?;
1289            }
1290            TAG_INCRBY | TAG_DECRBY => {
1291                let key = format::read_bytes(&mut self.reader)?;
1292                format::write_bytes(&mut payload, &key)?;
1293                let delta = format::read_i64(&mut self.reader)?;
1294                format::write_i64(&mut payload, delta)?;
1295            }
1296            TAG_APPEND => {
1297                let key = format::read_bytes(&mut self.reader)?;
1298                format::write_bytes(&mut payload, &key)?;
1299                let value = format::read_bytes(&mut self.reader)?;
1300                format::write_bytes(&mut payload, &value)?;
1301            }
1302            TAG_RENAME => {
1303                let key = format::read_bytes(&mut self.reader)?;
1304                format::write_bytes(&mut payload, &key)?;
1305                let newkey = format::read_bytes(&mut self.reader)?;
1306                format::write_bytes(&mut payload, &newkey)?;
1307            }
1308            TAG_COPY => {
1309                let source = format::read_bytes(&mut self.reader)?;
1310                format::write_bytes(&mut payload, &source)?;
1311                let dest = format::read_bytes(&mut self.reader)?;
1312                format::write_bytes(&mut payload, &dest)?;
1313                let replace = format::read_u8(&mut self.reader)?;
1314                payload.push(replace);
1315            }
1316            #[cfg(feature = "vector")]
1317            TAG_VADD => {
1318                let key = format::read_bytes(&mut self.reader)?;
1319                format::write_bytes(&mut payload, &key)?;
1320                let element = format::read_bytes(&mut self.reader)?;
1321                format::write_bytes(&mut payload, &element)?;
1322                let dim = format::read_u32(&mut self.reader)?;
1323                if dim > format::MAX_PERSISTED_VECTOR_DIMS {
1324                    return Err(FormatError::InvalidData(format!(
1325                        "AOF VADD dimension {dim} exceeds max {}",
1326                        format::MAX_PERSISTED_VECTOR_DIMS
1327                    )));
1328                }
1329                format::write_u32(&mut payload, dim)?;
1330                for _ in 0..dim {
1331                    let v = format::read_f32(&mut self.reader)?;
1332                    format::write_f32(&mut payload, v)?;
1333                }
1334                let metric = format::read_u8(&mut self.reader)?;
1335                format::write_u8(&mut payload, metric)?;
1336                let quantization = format::read_u8(&mut self.reader)?;
1337                format::write_u8(&mut payload, quantization)?;
1338                let connectivity = format::read_u32(&mut self.reader)?;
1339                format::write_u32(&mut payload, connectivity)?;
1340                let expansion_add = format::read_u32(&mut self.reader)?;
1341                format::write_u32(&mut payload, expansion_add)?;
1342            }
1343            #[cfg(feature = "vector")]
1344            TAG_VREM => {
1345                let key = format::read_bytes(&mut self.reader)?;
1346                format::write_bytes(&mut payload, &key)?;
1347                let element = format::read_bytes(&mut self.reader)?;
1348                format::write_bytes(&mut payload, &element)?;
1349            }
1350            #[cfg(feature = "protobuf")]
1351            TAG_PROTO_SET => {
1352                let key = format::read_bytes(&mut self.reader)?;
1353                format::write_bytes(&mut payload, &key)?;
1354                let type_name = format::read_bytes(&mut self.reader)?;
1355                format::write_bytes(&mut payload, &type_name)?;
1356                let data = format::read_bytes(&mut self.reader)?;
1357                format::write_bytes(&mut payload, &data)?;
1358                let expire_ms = format::read_i64(&mut self.reader)?;
1359                format::write_i64(&mut payload, expire_ms)?;
1360            }
1361            #[cfg(feature = "protobuf")]
1362            TAG_PROTO_REGISTER => {
1363                let name = format::read_bytes(&mut self.reader)?;
1364                format::write_bytes(&mut payload, &name)?;
1365                let descriptor = format::read_bytes(&mut self.reader)?;
1366                format::write_bytes(&mut payload, &descriptor)?;
1367            }
1368            _ => return Err(FormatError::UnknownTag(tag)),
1369        }
1370        let stored_crc = format::read_u32(&mut self.reader)?;
1371        Ok((payload, stored_crc))
1372    }
1373}
1374
1375/// Opens a persistence file with create+append and restrictive permissions.
1376fn open_persistence_file(path: &Path) -> Result<File, FormatError> {
1377    let mut opts = OpenOptions::new();
1378    opts.create(true).append(true);
1379    #[cfg(unix)]
1380    {
1381        use std::os::unix::fs::OpenOptionsExt;
1382        opts.mode(0o600);
1383    }
1384    Ok(opts.open(path)?)
1385}
1386
/// Builds the per-shard AOF location: `<data_dir>/shard-<shard_id>.aof`.
pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
    let mut path = data_dir.to_path_buf();
    path.push(format!("shard-{shard_id}.aof"));
    path
}
1391
1392#[cfg(test)]
1393mod tests {
1394    use super::*;
1395
1396    type Result = std::result::Result<(), Box<dyn std::error::Error>>;
1397
1398    fn temp_dir() -> tempfile::TempDir {
1399        tempfile::tempdir().expect("create temp dir")
1400    }
1401
1402    #[test]
1403    fn record_round_trip_set() -> Result {
1404        let rec = AofRecord::Set {
1405            key: "hello".into(),
1406            value: Bytes::from("world"),
1407            expire_ms: 5000,
1408        };
1409        let bytes = rec.to_bytes()?;
1410        let decoded = AofRecord::from_bytes(&bytes)?;
1411        assert_eq!(rec, decoded);
1412        Ok(())
1413    }
1414
1415    #[test]
1416    fn record_round_trip_del() -> Result {
1417        let rec = AofRecord::Del { key: "gone".into() };
1418        let bytes = rec.to_bytes()?;
1419        let decoded = AofRecord::from_bytes(&bytes)?;
1420        assert_eq!(rec, decoded);
1421        Ok(())
1422    }
1423
1424    #[test]
1425    fn record_round_trip_expire() -> Result {
1426        let rec = AofRecord::Expire {
1427            key: "ttl".into(),
1428            seconds: 300,
1429        };
1430        let bytes = rec.to_bytes()?;
1431        let decoded = AofRecord::from_bytes(&bytes)?;
1432        assert_eq!(rec, decoded);
1433        Ok(())
1434    }
1435
1436    #[test]
1437    fn set_with_no_expiry() -> Result {
1438        let rec = AofRecord::Set {
1439            key: "k".into(),
1440            value: Bytes::from("v"),
1441            expire_ms: -1,
1442        };
1443        let bytes = rec.to_bytes()?;
1444        let decoded = AofRecord::from_bytes(&bytes)?;
1445        assert_eq!(rec, decoded);
1446        Ok(())
1447    }
1448
1449    #[test]
1450    fn writer_reader_round_trip() -> Result {
1451        let dir = temp_dir();
1452        let path = dir.path().join("test.aof");
1453
1454        let records = vec![
1455            AofRecord::Set {
1456                key: "a".into(),
1457                value: Bytes::from("1"),
1458                expire_ms: -1,
1459            },
1460            AofRecord::Set {
1461                key: "b".into(),
1462                value: Bytes::from("2"),
1463                expire_ms: 10_000,
1464            },
1465            AofRecord::Del { key: "a".into() },
1466            AofRecord::Expire {
1467                key: "b".into(),
1468                seconds: 60,
1469            },
1470        ];
1471
1472        // write
1473        {
1474            let mut writer = AofWriter::open(&path)?;
1475            for rec in &records {
1476                writer.write_record(rec)?;
1477            }
1478            writer.sync()?;
1479        }
1480
1481        // read back
1482        let mut reader = AofReader::open(&path)?;
1483        let mut got = Vec::new();
1484        while let Some(rec) = reader.read_record()? {
1485            got.push(rec);
1486        }
1487        assert_eq!(records, got);
1488        Ok(())
1489    }
1490
1491    #[test]
1492    fn empty_aof_returns_no_records() -> Result {
1493        let dir = temp_dir();
1494        let path = dir.path().join("empty.aof");
1495
1496        // just write the header
1497        {
1498            let _writer = AofWriter::open(&path)?;
1499        }
1500
1501        let mut reader = AofReader::open(&path)?;
1502        assert!(reader.read_record()?.is_none());
1503        Ok(())
1504    }
1505
1506    #[test]
1507    fn truncated_record_treated_as_eof() -> Result {
1508        let dir = temp_dir();
1509        let path = dir.path().join("trunc.aof");
1510
1511        // write one good record, then append garbage (simulating a crash)
1512        {
1513            let mut writer = AofWriter::open(&path)?;
1514            writer.write_record(&AofRecord::Set {
1515                key: "ok".into(),
1516                value: Bytes::from("good"),
1517                expire_ms: -1,
1518            })?;
1519            writer.flush()?;
1520        }
1521
1522        // append a partial tag with no payload
1523        {
1524            let mut file = OpenOptions::new().append(true).open(&path)?;
1525            file.write_all(&[TAG_SET])?;
1526        }
1527
1528        let mut reader = AofReader::open(&path)?;
1529        // first record should be fine
1530        let rec = reader.read_record()?.unwrap();
1531        assert!(matches!(rec, AofRecord::Set { .. }));
1532        // second should be None (truncated)
1533        assert!(reader.read_record()?.is_none());
1534        Ok(())
1535    }
1536
1537    #[test]
1538    fn corrupt_crc_detected() -> Result {
1539        let dir = temp_dir();
1540        let path = dir.path().join("corrupt.aof");
1541
1542        {
1543            let mut writer = AofWriter::open(&path)?;
1544            writer.write_record(&AofRecord::Set {
1545                key: "k".into(),
1546                value: Bytes::from("v"),
1547                expire_ms: -1,
1548            })?;
1549            writer.flush()?;
1550        }
1551
1552        // corrupt the last byte (part of the CRC)
1553        let mut data = fs::read(&path)?;
1554        let last = data.len() - 1;
1555        data[last] ^= 0xFF;
1556        fs::write(&path, &data)?;
1557
1558        let mut reader = AofReader::open(&path)?;
1559        let err = reader.read_record().unwrap_err();
1560        assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
1561        Ok(())
1562    }
1563
1564    #[test]
1565    fn missing_magic_is_error() {
1566        let dir = temp_dir();
1567        let path = dir.path().join("bad.aof");
1568        fs::write(&path, b"NOT_AOF_DATA").unwrap();
1569
1570        let err = AofReader::open(&path).unwrap_err();
1571        assert!(matches!(err, FormatError::InvalidMagic));
1572    }
1573
1574    #[test]
1575    fn truncate_resets_aof() -> Result {
1576        let dir = temp_dir();
1577        let path = dir.path().join("reset.aof");
1578
1579        {
1580            let mut writer = AofWriter::open(&path)?;
1581            writer.write_record(&AofRecord::Set {
1582                key: "old".into(),
1583                value: Bytes::from("data"),
1584                expire_ms: -1,
1585            })?;
1586            writer.truncate()?;
1587
1588            // write a new record after truncation
1589            writer.write_record(&AofRecord::Set {
1590                key: "new".into(),
1591                value: Bytes::from("fresh"),
1592                expire_ms: -1,
1593            })?;
1594            writer.sync()?;
1595        }
1596
1597        let mut reader = AofReader::open(&path)?;
1598        let rec = reader.read_record()?.unwrap();
1599        match rec {
1600            AofRecord::Set { key, .. } => assert_eq!(key, "new"),
1601            other => panic!("expected Set, got {other:?}"),
1602        }
1603        // only one record after truncation
1604        assert!(reader.read_record()?.is_none());
1605        Ok(())
1606    }
1607
1608    #[test]
1609    fn record_round_trip_lpush() -> Result {
1610        let rec = AofRecord::LPush {
1611            key: "list".into(),
1612            values: vec![Bytes::from("a"), Bytes::from("b")],
1613        };
1614        let bytes = rec.to_bytes()?;
1615        let decoded = AofRecord::from_bytes(&bytes)?;
1616        assert_eq!(rec, decoded);
1617        Ok(())
1618    }
1619
1620    #[test]
1621    fn record_round_trip_rpush() -> Result {
1622        let rec = AofRecord::RPush {
1623            key: "list".into(),
1624            values: vec![Bytes::from("x")],
1625        };
1626        let bytes = rec.to_bytes()?;
1627        let decoded = AofRecord::from_bytes(&bytes)?;
1628        assert_eq!(rec, decoded);
1629        Ok(())
1630    }
1631
1632    #[test]
1633    fn record_round_trip_lpop() -> Result {
1634        let rec = AofRecord::LPop { key: "list".into() };
1635        let bytes = rec.to_bytes()?;
1636        let decoded = AofRecord::from_bytes(&bytes)?;
1637        assert_eq!(rec, decoded);
1638        Ok(())
1639    }
1640
1641    #[test]
1642    fn record_round_trip_rpop() -> Result {
1643        let rec = AofRecord::RPop { key: "list".into() };
1644        let bytes = rec.to_bytes()?;
1645        let decoded = AofRecord::from_bytes(&bytes)?;
1646        assert_eq!(rec, decoded);
1647        Ok(())
1648    }
1649
1650    #[test]
1651    fn writer_reader_round_trip_with_list_records() -> Result {
1652        let dir = temp_dir();
1653        let path = dir.path().join("list.aof");
1654
1655        let records = vec![
1656            AofRecord::LPush {
1657                key: "l".into(),
1658                values: vec![Bytes::from("a"), Bytes::from("b")],
1659            },
1660            AofRecord::RPush {
1661                key: "l".into(),
1662                values: vec![Bytes::from("c")],
1663            },
1664            AofRecord::LPop { key: "l".into() },
1665            AofRecord::RPop { key: "l".into() },
1666        ];
1667
1668        {
1669            let mut writer = AofWriter::open(&path)?;
1670            for rec in &records {
1671                writer.write_record(rec)?;
1672            }
1673            writer.sync()?;
1674        }
1675
1676        let mut reader = AofReader::open(&path)?;
1677        let mut got = Vec::new();
1678        while let Some(rec) = reader.read_record()? {
1679            got.push(rec);
1680        }
1681        assert_eq!(records, got);
1682        Ok(())
1683    }
1684
1685    #[test]
1686    fn record_round_trip_zadd() -> Result {
1687        let rec = AofRecord::ZAdd {
1688            key: "board".into(),
1689            members: vec![(100.0, "alice".into()), (200.5, "bob".into())],
1690        };
1691        let bytes = rec.to_bytes()?;
1692        let decoded = AofRecord::from_bytes(&bytes)?;
1693        assert_eq!(rec, decoded);
1694        Ok(())
1695    }
1696
1697    #[test]
1698    fn record_round_trip_zrem() -> Result {
1699        let rec = AofRecord::ZRem {
1700            key: "board".into(),
1701            members: vec!["alice".into(), "bob".into()],
1702        };
1703        let bytes = rec.to_bytes()?;
1704        let decoded = AofRecord::from_bytes(&bytes)?;
1705        assert_eq!(rec, decoded);
1706        Ok(())
1707    }
1708
1709    #[test]
1710    fn writer_reader_round_trip_with_sorted_set_records() -> Result {
1711        let dir = temp_dir();
1712        let path = dir.path().join("zset.aof");
1713
1714        let records = vec![
1715            AofRecord::ZAdd {
1716                key: "board".into(),
1717                members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
1718            },
1719            AofRecord::ZRem {
1720                key: "board".into(),
1721                members: vec!["alice".into()],
1722            },
1723        ];
1724
1725        {
1726            let mut writer = AofWriter::open(&path)?;
1727            for rec in &records {
1728                writer.write_record(rec)?;
1729            }
1730            writer.sync()?;
1731        }
1732
1733        let mut reader = AofReader::open(&path)?;
1734        let mut got = Vec::new();
1735        while let Some(rec) = reader.read_record()? {
1736            got.push(rec);
1737        }
1738        assert_eq!(records, got);
1739        Ok(())
1740    }
1741
1742    #[test]
1743    fn record_round_trip_persist() -> Result {
1744        let rec = AofRecord::Persist {
1745            key: "mykey".into(),
1746        };
1747        let bytes = rec.to_bytes()?;
1748        let decoded = AofRecord::from_bytes(&bytes)?;
1749        assert_eq!(rec, decoded);
1750        Ok(())
1751    }
1752
1753    #[test]
1754    fn record_round_trip_pexpire() -> Result {
1755        let rec = AofRecord::Pexpire {
1756            key: "mykey".into(),
1757            milliseconds: 5000,
1758        };
1759        let bytes = rec.to_bytes()?;
1760        let decoded = AofRecord::from_bytes(&bytes)?;
1761        assert_eq!(rec, decoded);
1762        Ok(())
1763    }
1764
1765    #[test]
1766    fn record_round_trip_incr() -> Result {
1767        let rec = AofRecord::Incr {
1768            key: "counter".into(),
1769        };
1770        let bytes = rec.to_bytes()?;
1771        let decoded = AofRecord::from_bytes(&bytes)?;
1772        assert_eq!(rec, decoded);
1773        Ok(())
1774    }
1775
1776    #[test]
1777    fn record_round_trip_decr() -> Result {
1778        let rec = AofRecord::Decr {
1779            key: "counter".into(),
1780        };
1781        let bytes = rec.to_bytes()?;
1782        let decoded = AofRecord::from_bytes(&bytes)?;
1783        assert_eq!(rec, decoded);
1784        Ok(())
1785    }
1786
1787    #[test]
1788    fn writer_reader_round_trip_with_persist_pexpire() -> Result {
1789        let dir = temp_dir();
1790        let path = dir.path().join("persist_pexpire.aof");
1791
1792        let records = vec![
1793            AofRecord::Set {
1794                key: "k".into(),
1795                value: Bytes::from("v"),
1796                expire_ms: 5000,
1797            },
1798            AofRecord::Persist { key: "k".into() },
1799            AofRecord::Pexpire {
1800                key: "k".into(),
1801                milliseconds: 3000,
1802            },
1803        ];
1804
1805        {
1806            let mut writer = AofWriter::open(&path)?;
1807            for rec in &records {
1808                writer.write_record(rec)?;
1809            }
1810            writer.sync()?;
1811        }
1812
1813        let mut reader = AofReader::open(&path)?;
1814        let mut got = Vec::new();
1815        while let Some(rec) = reader.read_record()? {
1816            got.push(rec);
1817        }
1818        assert_eq!(records, got);
1819        Ok(())
1820    }
1821
1822    #[test]
1823    fn aof_path_format() {
1824        let p = aof_path(Path::new("/data"), 3);
1825        assert_eq!(p, PathBuf::from("/data/shard-3.aof"));
1826    }
1827
1828    #[test]
1829    fn record_round_trip_hset() -> Result {
1830        let rec = AofRecord::HSet {
1831            key: "hash".into(),
1832            fields: vec![
1833                ("f1".into(), Bytes::from("v1")),
1834                ("f2".into(), Bytes::from("v2")),
1835            ],
1836        };
1837        let bytes = rec.to_bytes()?;
1838        let decoded = AofRecord::from_bytes(&bytes)?;
1839        assert_eq!(rec, decoded);
1840        Ok(())
1841    }
1842
1843    #[test]
1844    fn record_round_trip_hdel() -> Result {
1845        let rec = AofRecord::HDel {
1846            key: "hash".into(),
1847            fields: vec!["f1".into(), "f2".into()],
1848        };
1849        let bytes = rec.to_bytes()?;
1850        let decoded = AofRecord::from_bytes(&bytes)?;
1851        assert_eq!(rec, decoded);
1852        Ok(())
1853    }
1854
1855    #[test]
1856    fn record_round_trip_hincrby() -> Result {
1857        let rec = AofRecord::HIncrBy {
1858            key: "hash".into(),
1859            field: "counter".into(),
1860            delta: -42,
1861        };
1862        let bytes = rec.to_bytes()?;
1863        let decoded = AofRecord::from_bytes(&bytes)?;
1864        assert_eq!(rec, decoded);
1865        Ok(())
1866    }
1867
1868    #[test]
1869    fn record_round_trip_sadd() -> Result {
1870        let rec = AofRecord::SAdd {
1871            key: "set".into(),
1872            members: vec!["m1".into(), "m2".into(), "m3".into()],
1873        };
1874        let bytes = rec.to_bytes()?;
1875        let decoded = AofRecord::from_bytes(&bytes)?;
1876        assert_eq!(rec, decoded);
1877        Ok(())
1878    }
1879
1880    #[test]
1881    fn record_round_trip_srem() -> Result {
1882        let rec = AofRecord::SRem {
1883            key: "set".into(),
1884            members: vec!["m1".into()],
1885        };
1886        let bytes = rec.to_bytes()?;
1887        let decoded = AofRecord::from_bytes(&bytes)?;
1888        assert_eq!(rec, decoded);
1889        Ok(())
1890    }
1891
1892    #[cfg(feature = "vector")]
1893    #[test]
1894    fn record_round_trip_vadd() -> Result {
1895        let rec = AofRecord::VAdd {
1896            key: "embeddings".into(),
1897            element: "doc1".into(),
1898            vector: vec![0.1, 0.2, 0.3],
1899            metric: 0,       // cosine
1900            quantization: 0, // f32
1901            connectivity: 16,
1902            expansion_add: 64,
1903        };
1904        let bytes = rec.to_bytes()?;
1905        let decoded = AofRecord::from_bytes(&bytes)?;
1906        assert_eq!(rec, decoded);
1907        Ok(())
1908    }
1909
1910    #[cfg(feature = "vector")]
1911    #[test]
1912    fn record_round_trip_vadd_high_dim() -> Result {
1913        let rec = AofRecord::VAdd {
1914            key: "vecs".into(),
1915            element: "e".into(),
1916            vector: vec![0.0; 1536], // typical embedding dimension
1917            metric: 1,               // l2
1918            quantization: 1,         // f16
1919            connectivity: 32,
1920            expansion_add: 128,
1921        };
1922        let bytes = rec.to_bytes()?;
1923        let decoded = AofRecord::from_bytes(&bytes)?;
1924        assert_eq!(rec, decoded);
1925        Ok(())
1926    }
1927
1928    #[cfg(feature = "vector")]
1929    #[test]
1930    fn record_round_trip_vrem() -> Result {
1931        let rec = AofRecord::VRem {
1932            key: "embeddings".into(),
1933            element: "doc1".into(),
1934        };
1935        let bytes = rec.to_bytes()?;
1936        let decoded = AofRecord::from_bytes(&bytes)?;
1937        assert_eq!(rec, decoded);
1938        Ok(())
1939    }
1940
1941    #[cfg(feature = "encryption")]
1942    mod encrypted {
1943        use super::*;
1944        use crate::encryption::EncryptionKey;
1945
1946        type Result = std::result::Result<(), Box<dyn std::error::Error>>;
1947
1948        fn test_key() -> EncryptionKey {
1949            EncryptionKey::from_bytes([0x42; 32])
1950        }
1951
1952        #[test]
1953        fn encrypted_writer_reader_round_trip() -> Result {
1954            let dir = temp_dir();
1955            let path = dir.path().join("enc.aof");
1956            let key = test_key();
1957
1958            let records = vec![
1959                AofRecord::Set {
1960                    key: "a".into(),
1961                    value: Bytes::from("1"),
1962                    expire_ms: -1,
1963                },
1964                AofRecord::Del { key: "a".into() },
1965                AofRecord::LPush {
1966                    key: "list".into(),
1967                    values: vec![Bytes::from("x"), Bytes::from("y")],
1968                },
1969                AofRecord::ZAdd {
1970                    key: "zs".into(),
1971                    members: vec![(1.0, "m".into())],
1972                },
1973            ];
1974
1975            {
1976                let mut writer = AofWriter::open_encrypted(&path, key.clone())?;
1977                for rec in &records {
1978                    writer.write_record(rec)?;
1979                }
1980                writer.sync()?;
1981            }
1982
1983            let mut reader = AofReader::open_encrypted(&path, key)?;
1984            let mut got = Vec::new();
1985            while let Some(rec) = reader.read_record()? {
1986                got.push(rec);
1987            }
1988            assert_eq!(records, got);
1989            Ok(())
1990        }
1991
1992        #[test]
1993        fn encrypted_aof_wrong_key_fails() -> Result {
1994            let dir = temp_dir();
1995            let path = dir.path().join("enc_bad.aof");
1996            let key = test_key();
1997            let wrong_key = EncryptionKey::from_bytes([0xFF; 32]);
1998
1999            {
2000                let mut writer = AofWriter::open_encrypted(&path, key)?;
2001                writer.write_record(&AofRecord::Set {
2002                    key: "k".into(),
2003                    value: Bytes::from("v"),
2004                    expire_ms: -1,
2005                })?;
2006                writer.sync()?;
2007            }
2008
2009            let mut reader = AofReader::open_encrypted(&path, wrong_key)?;
2010            let err = reader.read_record().unwrap_err();
2011            assert!(matches!(err, FormatError::DecryptionFailed));
2012            Ok(())
2013        }
2014
2015        #[test]
2016        fn v2_file_readable_with_encryption_key() -> Result {
2017            let dir = temp_dir();
2018            let path = dir.path().join("v2.aof");
2019            let key = test_key();
2020
2021            // write a plaintext v2 file
2022            {
2023                let mut writer = AofWriter::open(&path)?;
2024                writer.write_record(&AofRecord::Set {
2025                    key: "k".into(),
2026                    value: Bytes::from("v"),
2027                    expire_ms: -1,
2028                })?;
2029                writer.sync()?;
2030            }
2031
2032            // read with encryption key — should work (v2 is plaintext)
2033            let mut reader = AofReader::open_encrypted(&path, key)?;
2034            let rec = reader.read_record()?.unwrap();
2035            assert!(matches!(rec, AofRecord::Set { .. }));
2036            Ok(())
2037        }
2038
2039        #[test]
2040        fn v3_file_without_key_returns_error() -> Result {
2041            let dir = temp_dir();
2042            let path = dir.path().join("v3_nokey.aof");
2043            let key = test_key();
2044
2045            // write an encrypted v3 file
2046            {
2047                let mut writer = AofWriter::open_encrypted(&path, key)?;
2048                writer.write_record(&AofRecord::Set {
2049                    key: "k".into(),
2050                    value: Bytes::from("v"),
2051                    expire_ms: -1,
2052                })?;
2053                writer.sync()?;
2054            }
2055
2056            // try to open without a key
2057            let err = AofReader::open(&path).unwrap_err();
2058            assert!(matches!(err, FormatError::EncryptionRequired));
2059            Ok(())
2060        }
2061
2062        #[test]
2063        fn encrypted_truncate_preserves_encryption() -> Result {
2064            let dir = temp_dir();
2065            let path = dir.path().join("enc_trunc.aof");
2066            let key = test_key();
2067
2068            {
2069                let mut writer = AofWriter::open_encrypted(&path, key.clone())?;
2070                writer.write_record(&AofRecord::Set {
2071                    key: "old".into(),
2072                    value: Bytes::from("data"),
2073                    expire_ms: -1,
2074                })?;
2075                writer.truncate()?;
2076
2077                writer.write_record(&AofRecord::Set {
2078                    key: "new".into(),
2079                    value: Bytes::from("fresh"),
2080                    expire_ms: -1,
2081                })?;
2082                writer.sync()?;
2083            }
2084
2085            let mut reader = AofReader::open_encrypted(&path, key)?;
2086            let rec = reader.read_record()?.unwrap();
2087            match rec {
2088                AofRecord::Set { key, .. } => assert_eq!(key, "new"),
2089                other => panic!("expected Set, got {other:?}"),
2090            }
2091            assert!(reader.read_record()?.is_none());
2092            Ok(())
2093        }
2094    }
2095}