Skip to main content

ember_persistence/
aof.rs

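//! Append-only file for recording mutations.
//!
//! Each shard writes its own AOF file (`shard-{id}.aof`). Records are
//! written after successful mutations. Plaintext (v2) files use a simple
//! tag + payload + CRC32 structure for each record; encrypted (v3) files
//! wrap each record with AES-256-GCM instead.
//!
//! File layout:
//! ```text
//! [EAOF magic: 4B][version: 1B]
//! [record]*
//! ```
//!
//! Plaintext record layout (v2):
//! ```text
//! [tag: 1B][payload...][crc32: 4B]
//! ```
//! The CRC32 covers the tag + payload bytes.
//!
//! Encrypted record layout (v3):
//! ```text
//! [nonce: 12B][len: 4B][ciphertext]
//! ```
//! The ciphertext is the encrypted tag + payload bytes; no separate CRC32
//! is written for encrypted records.
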
19use std::fmt;
20use std::fs::{self, File, OpenOptions};
21#[cfg(feature = "encryption")]
22use std::io::Read as _;
23use std::io::{self, BufReader, BufWriter, Write};
24use std::path::{Path, PathBuf};
25
26use bytes::Bytes;
27
28use crate::format::{self, FormatError};
29
30/// Reads a length-prefixed field and decodes it as UTF-8.
31fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
32    let bytes = format::read_bytes(r)?;
33    String::from_utf8(bytes).map_err(|_| {
34        FormatError::Io(io::Error::new(
35            io::ErrorKind::InvalidData,
36            format!("{field} is not valid utf-8"),
37        ))
38    })
39}
40
41/// Reads a count-prefixed list of strings: `[count: u32][string]*`.
42/// Used by SADD, SREM, HDEL, and ZREM deserialization.
43fn read_string_list(r: &mut impl io::Read, field: &str) -> Result<Vec<String>, FormatError> {
44    let count = format::read_u32(r)?;
45    format::validate_collection_count(count, field)?;
46    let mut items = Vec::with_capacity(format::capped_capacity(count));
47    for _ in 0..count {
48        items.push(read_string(r, field)?);
49    }
50    Ok(items)
51}
52
// -- record tags --
//
// Every AOF record begins with one of these tag bytes. The numbers are part
// of the on-disk format: never renumber or reuse a tag, only add new ones.
// Listed in numeric order with the command family noted; 1-26 are all taken.

const TAG_SET: u8 = 1; // string
const TAG_DEL: u8 = 2; // key lifecycle
const TAG_EXPIRE: u8 = 3; // key lifecycle
const TAG_LPUSH: u8 = 4; // list
const TAG_RPUSH: u8 = 5; // list
const TAG_LPOP: u8 = 6; // list
const TAG_RPOP: u8 = 7; // list
const TAG_ZADD: u8 = 8; // sorted set
const TAG_ZREM: u8 = 9; // sorted set
const TAG_PERSIST: u8 = 10; // key lifecycle
const TAG_PEXPIRE: u8 = 11; // key lifecycle
const TAG_INCR: u8 = 12; // string
const TAG_DECR: u8 = 13; // string
const TAG_HSET: u8 = 14; // hash
const TAG_HDEL: u8 = 15; // hash
const TAG_HINCRBY: u8 = 16; // hash
const TAG_SADD: u8 = 17; // set
const TAG_SREM: u8 = 18; // set
const TAG_INCRBY: u8 = 19; // string
const TAG_DECRBY: u8 = 20; // string
const TAG_APPEND: u8 = 21; // string
const TAG_RENAME: u8 = 22; // key lifecycle
#[cfg(feature = "protobuf")]
const TAG_PROTO_SET: u8 = 23; // protobuf
#[cfg(feature = "protobuf")]
const TAG_PROTO_REGISTER: u8 = 24; // protobuf
#[cfg(feature = "vector")]
const TAG_VADD: u8 = 25; // vector
#[cfg(feature = "vector")]
const TAG_VREM: u8 = 26; // vector

102/// A single mutation record stored in the AOF.
103#[derive(Debug, Clone, PartialEq)]
104pub enum AofRecord {
105    /// SET key value \[expire_ms\]. expire_ms is -1 for no expiration.
106    Set {
107        key: String,
108        value: Bytes,
109        expire_ms: i64,
110    },
111    /// DEL key.
112    Del { key: String },
113    /// EXPIRE key seconds.
114    Expire { key: String, seconds: u64 },
115    /// LPUSH key value [value ...].
116    LPush { key: String, values: Vec<Bytes> },
117    /// RPUSH key value [value ...].
118    RPush { key: String, values: Vec<Bytes> },
119    /// LPOP key.
120    LPop { key: String },
121    /// RPOP key.
122    RPop { key: String },
123    /// ZADD key score member [score member ...].
124    ZAdd {
125        key: String,
126        members: Vec<(f64, String)>,
127    },
128    /// ZREM key member [member ...].
129    ZRem { key: String, members: Vec<String> },
130    /// PERSIST key — remove expiration.
131    Persist { key: String },
132    /// PEXPIRE key milliseconds.
133    Pexpire { key: String, milliseconds: u64 },
134    /// INCR key.
135    Incr { key: String },
136    /// DECR key.
137    Decr { key: String },
138    /// HSET key field value [field value ...].
139    HSet {
140        key: String,
141        fields: Vec<(String, Bytes)>,
142    },
143    /// HDEL key field [field ...].
144    HDel { key: String, fields: Vec<String> },
145    /// HINCRBY key field delta.
146    HIncrBy {
147        key: String,
148        field: String,
149        delta: i64,
150    },
151    /// SADD key member [member ...].
152    SAdd { key: String, members: Vec<String> },
153    /// SREM key member [member ...].
154    SRem { key: String, members: Vec<String> },
155    /// INCRBY key delta.
156    IncrBy { key: String, delta: i64 },
157    /// DECRBY key delta.
158    DecrBy { key: String, delta: i64 },
159    /// APPEND key value.
160    Append { key: String, value: Bytes },
161    /// RENAME key newkey.
162    Rename { key: String, newkey: String },
163    /// VADD key element vector [metric quant connectivity expansion_add].
164    /// Stores the full index config so recovery can recreate the set.
165    #[cfg(feature = "vector")]
166    VAdd {
167        key: String,
168        element: String,
169        vector: Vec<f32>,
170        /// 0 = cosine, 1 = l2, 2 = inner product
171        metric: u8,
172        /// 0 = f32, 1 = f16, 2 = i8
173        quantization: u8,
174        connectivity: u32,
175        expansion_add: u32,
176    },
177    /// VREM key element.
178    #[cfg(feature = "vector")]
179    VRem { key: String, element: String },
180    /// PROTO.SET key type_name data [expire_ms].
181    #[cfg(feature = "protobuf")]
182    ProtoSet {
183        key: String,
184        type_name: String,
185        data: Bytes,
186        expire_ms: i64,
187    },
188    /// PROTO.REGISTER name descriptor_bytes (for schema persistence).
189    #[cfg(feature = "protobuf")]
190    ProtoRegister { name: String, descriptor: Bytes },
191}
192
impl AofRecord {
    /// Returns the on-disk tag byte for this record variant.
    ///
    /// The match has no wildcard arm, so a newly added variant will not
    /// compile until it is assigned a tag here.
    fn tag(&self) -> u8 {
        match self {
            AofRecord::Set { .. } => TAG_SET,
            AofRecord::Del { .. } => TAG_DEL,
            AofRecord::Expire { .. } => TAG_EXPIRE,
            AofRecord::LPush { .. } => TAG_LPUSH,
            AofRecord::RPush { .. } => TAG_RPUSH,
            AofRecord::LPop { .. } => TAG_LPOP,
            AofRecord::RPop { .. } => TAG_RPOP,
            AofRecord::ZAdd { .. } => TAG_ZADD,
            AofRecord::ZRem { .. } => TAG_ZREM,
            AofRecord::Persist { .. } => TAG_PERSIST,
            AofRecord::Pexpire { .. } => TAG_PEXPIRE,
            AofRecord::Incr { .. } => TAG_INCR,
            AofRecord::Decr { .. } => TAG_DECR,
            AofRecord::HSet { .. } => TAG_HSET,
            AofRecord::HDel { .. } => TAG_HDEL,
            AofRecord::HIncrBy { .. } => TAG_HINCRBY,
            AofRecord::SAdd { .. } => TAG_SADD,
            AofRecord::SRem { .. } => TAG_SREM,
            AofRecord::IncrBy { .. } => TAG_INCRBY,
            AofRecord::DecrBy { .. } => TAG_DECRBY,
            AofRecord::Append { .. } => TAG_APPEND,
            AofRecord::Rename { .. } => TAG_RENAME,
            #[cfg(feature = "vector")]
            AofRecord::VAdd { .. } => TAG_VADD,
            #[cfg(feature = "vector")]
            AofRecord::VRem { .. } => TAG_VREM,
            #[cfg(feature = "protobuf")]
            AofRecord::ProtoSet { .. } => TAG_PROTO_SET,
            #[cfg(feature = "protobuf")]
            AofRecord::ProtoRegister { .. } => TAG_PROTO_REGISTER,
        }
    }

    /// Estimates the serialized size of this record in bytes.
    ///
    /// Used as a capacity hint for `to_bytes()` to avoid intermediate
    /// reallocations. Each arm mirrors, field for field, what `to_bytes()`
    /// writes for that variant:
    /// - the 1-byte tag;
    /// - a `LEN_PREFIX` per length-prefixed field;
    /// - 4 bytes per element count;
    /// - the fixed width of each numeric field.
    ///
    /// As long as `LEN_PREFIX` matches the prefix width used by
    /// `format::write_bytes`, the result equals the encoded length rather
    /// than over-approximating it. It is only a hint: a wrong estimate costs
    /// a reallocation, not correctness.
    fn estimated_size(&self) -> usize {
        // overhead per length-prefixed field: 4 bytes for the u32 length
        const LEN_PREFIX: usize = 4;

        match self {
            // tag + key + value + 8-byte i64 expire_ms
            AofRecord::Set {
                key,
                value,
                expire_ms: _,
            } => 1 + LEN_PREFIX + key.len() + LEN_PREFIX + value.len() + 8,
            // tag + key only
            AofRecord::Del { key }
            | AofRecord::LPop { key }
            | AofRecord::RPop { key }
            | AofRecord::Persist { key }
            | AofRecord::Incr { key }
            | AofRecord::Decr { key } => 1 + LEN_PREFIX + key.len(),
            // tag + key + 8-byte duration (written as an i64)
            AofRecord::Expire { key, .. } | AofRecord::Pexpire { key, .. } => {
                1 + LEN_PREFIX + key.len() + 8
            }
            // in the collection arms below, the bare `4` is the u32 element count
            AofRecord::LPush { key, values } | AofRecord::RPush { key, values } => {
                let values_size: usize = values.iter().map(|v| LEN_PREFIX + v.len()).sum();
                1 + LEN_PREFIX + key.len() + 4 + values_size
            }
            AofRecord::ZAdd { key, members } => {
                // each member: 8-byte f64 score + length-prefixed member name
                let members_size: usize =
                    members.iter().map(|(_, m)| 8 + LEN_PREFIX + m.len()).sum();
                1 + LEN_PREFIX + key.len() + 4 + members_size
            }
            AofRecord::ZRem { key, members }
            | AofRecord::SAdd { key, members }
            | AofRecord::SRem { key, members } => {
                let members_size: usize = members.iter().map(|m| LEN_PREFIX + m.len()).sum();
                1 + LEN_PREFIX + key.len() + 4 + members_size
            }
            AofRecord::HSet { key, fields } => {
                // each pair: length-prefixed field name + length-prefixed value
                let fields_size: usize = fields
                    .iter()
                    .map(|(f, v)| LEN_PREFIX + f.len() + LEN_PREFIX + v.len())
                    .sum();
                1 + LEN_PREFIX + key.len() + 4 + fields_size
            }
            AofRecord::HDel { key, fields } => {
                let fields_size: usize = fields.iter().map(|f| LEN_PREFIX + f.len()).sum();
                1 + LEN_PREFIX + key.len() + 4 + fields_size
            }
            // tag + key + field + 8-byte i64 delta
            AofRecord::HIncrBy { key, field, .. } => {
                1 + LEN_PREFIX + key.len() + LEN_PREFIX + field.len() + 8
            }
            // tag + key + 8-byte i64 delta
            AofRecord::IncrBy { key, .. } | AofRecord::DecrBy { key, .. } => {
                1 + LEN_PREFIX + key.len() + 8
            }
            AofRecord::Append { key, value } => {
                1 + LEN_PREFIX + key.len() + LEN_PREFIX + value.len()
            }
            AofRecord::Rename { key, newkey } => {
                1 + LEN_PREFIX + key.len() + LEN_PREFIX + newkey.len()
            }
            // 4 = dimension count, 4 bytes per f32 component,
            // 10 = metric (1) + quantization (1) + connectivity (4) + expansion_add (4)
            #[cfg(feature = "vector")]
            AofRecord::VAdd {
                key,
                element,
                vector,
                ..
            } => {
                1 + LEN_PREFIX + key.len() + LEN_PREFIX + element.len() + 4 + vector.len() * 4 + 10
            }
            #[cfg(feature = "vector")]
            AofRecord::VRem { key, element } => {
                1 + LEN_PREFIX + key.len() + LEN_PREFIX + element.len()
            }
            // tag + key + type_name + data + 8-byte i64 expire_ms
            #[cfg(feature = "protobuf")]
            AofRecord::ProtoSet {
                key,
                type_name,
                data,
                ..
            } => {
                1 + LEN_PREFIX
                    + key.len()
                    + LEN_PREFIX
                    + type_name.len()
                    + LEN_PREFIX
                    + data.len()
                    + 8
            }
            #[cfg(feature = "protobuf")]
            AofRecord::ProtoRegister { name, descriptor } => {
                1 + LEN_PREFIX + name.len() + LEN_PREFIX + descriptor.len()
            }
        }
    }

    /// Serializes this record into a byte vector (tag + payload, no CRC).
    ///
    /// The field order written here *is* the on-disk format and must stay in
    /// lockstep with `from_bytes`. Collection lengths go through
    /// `format::write_len`; `from_bytes` reads them back with
    /// `format::read_u32`. Any error comes from the `format::write_*` helpers.
    fn to_bytes(&self) -> Result<Vec<u8>, FormatError> {
        let mut buf = Vec::with_capacity(self.estimated_size());
        format::write_u8(&mut buf, self.tag())?;

        match self {
            // key-only: tag + key
            AofRecord::Del { key }
            | AofRecord::LPop { key }
            | AofRecord::RPop { key }
            | AofRecord::Persist { key }
            | AofRecord::Incr { key }
            | AofRecord::Decr { key } => {
                format::write_bytes(&mut buf, key.as_bytes())?;
            }

            // key + bytes value + expire
            AofRecord::Set {
                key,
                value,
                expire_ms,
            } => {
                format::write_bytes(&mut buf, key.as_bytes())?;
                format::write_bytes(&mut buf, value)?;
                format::write_i64(&mut buf, *expire_ms)?;
            }

            // key + i64
            // The u64 durations are stored through a bit-preserving `as i64`
            // cast; `from_bytes` casts back with `as u64`, so they round-trip.
            AofRecord::Expire { key, seconds } => {
                format::write_bytes(&mut buf, key.as_bytes())?;
                format::write_i64(&mut buf, *seconds as i64)?;
            }
            AofRecord::Pexpire { key, milliseconds } => {
                format::write_bytes(&mut buf, key.as_bytes())?;
                format::write_i64(&mut buf, *milliseconds as i64)?;
            }
            AofRecord::IncrBy { key, delta } | AofRecord::DecrBy { key, delta } => {
                format::write_bytes(&mut buf, key.as_bytes())?;
                format::write_i64(&mut buf, *delta)?;
            }

            // key + byte list
            AofRecord::LPush { key, values } | AofRecord::RPush { key, values } => {
                format::write_bytes(&mut buf, key.as_bytes())?;
                format::write_len(&mut buf, values.len())?;
                for v in values {
                    format::write_bytes(&mut buf, v)?;
                }
            }

            // key + string list
            AofRecord::ZRem { key, members }
            | AofRecord::SAdd { key, members }
            | AofRecord::SRem { key, members } => {
                format::write_bytes(&mut buf, key.as_bytes())?;
                format::write_len(&mut buf, members.len())?;
                for member in members {
                    format::write_bytes(&mut buf, member.as_bytes())?;
                }
            }
            AofRecord::HDel { key, fields } => {
                format::write_bytes(&mut buf, key.as_bytes())?;
                format::write_len(&mut buf, fields.len())?;
                for field in fields {
                    format::write_bytes(&mut buf, field.as_bytes())?;
                }
            }

            // key + scored members (score first, then member, per pair)
            AofRecord::ZAdd { key, members } => {
                format::write_bytes(&mut buf, key.as_bytes())?;
                format::write_len(&mut buf, members.len())?;
                for (score, member) in members {
                    format::write_f64(&mut buf, *score)?;
                    format::write_bytes(&mut buf, member.as_bytes())?;
                }
            }

            // key + field-value pairs
            AofRecord::HSet { key, fields } => {
                format::write_bytes(&mut buf, key.as_bytes())?;
                format::write_len(&mut buf, fields.len())?;
                for (field, value) in fields {
                    format::write_bytes(&mut buf, field.as_bytes())?;
                    format::write_bytes(&mut buf, value)?;
                }
            }

            // key + field + delta
            AofRecord::HIncrBy { key, field, delta } => {
                format::write_bytes(&mut buf, key.as_bytes())?;
                format::write_bytes(&mut buf, field.as_bytes())?;
                format::write_i64(&mut buf, *delta)?;
            }

            // key + bytes value (no expire)
            AofRecord::Append { key, value } => {
                format::write_bytes(&mut buf, key.as_bytes())?;
                format::write_bytes(&mut buf, value)?;
            }

            // key + newkey
            AofRecord::Rename { key, newkey } => {
                format::write_bytes(&mut buf, key.as_bytes())?;
                format::write_bytes(&mut buf, newkey.as_bytes())?;
            }

            // key + element + dimension count + f32 components + index config
            #[cfg(feature = "vector")]
            AofRecord::VAdd {
                key,
                element,
                vector,
                metric,
                quantization,
                connectivity,
                expansion_add,
            } => {
                format::write_bytes(&mut buf, key.as_bytes())?;
                format::write_bytes(&mut buf, element.as_bytes())?;
                format::write_len(&mut buf, vector.len())?;
                for &v in vector {
                    format::write_f32(&mut buf, v)?;
                }
                format::write_u8(&mut buf, *metric)?;
                format::write_u8(&mut buf, *quantization)?;
                format::write_u32(&mut buf, *connectivity)?;
                format::write_u32(&mut buf, *expansion_add)?;
            }
            #[cfg(feature = "vector")]
            AofRecord::VRem { key, element } => {
                format::write_bytes(&mut buf, key.as_bytes())?;
                format::write_bytes(&mut buf, element.as_bytes())?;
            }

            // key + type name + encoded message + expire
            #[cfg(feature = "protobuf")]
            AofRecord::ProtoSet {
                key,
                type_name,
                data,
                expire_ms,
            } => {
                format::write_bytes(&mut buf, key.as_bytes())?;
                format::write_bytes(&mut buf, type_name.as_bytes())?;
                format::write_bytes(&mut buf, data)?;
                format::write_i64(&mut buf, *expire_ms)?;
            }
            #[cfg(feature = "protobuf")]
            AofRecord::ProtoRegister { name, descriptor } => {
                format::write_bytes(&mut buf, name.as_bytes())?;
                format::write_bytes(&mut buf, descriptor)?;
            }
        }
        Ok(buf)
    }

    /// Deserializes a record from a byte slice (tag + payload, no CRC).
    ///
    /// Fields are read in exactly the order `to_bytes` writes them.
    ///
    /// # Errors
    ///
    /// - `FormatError::UnknownTag` for an unrecognized tag byte. Tags of
    ///   feature-gated variants (vector, protobuf) also land here when the
    ///   corresponding feature is disabled in this build, because their
    ///   match arms are compiled out.
    /// - `FormatError::InvalidData` when a VADD dimension exceeds
    ///   `format::MAX_PERSISTED_VECTOR_DIMS`.
    /// - Errors from the `format::read_*` helpers, `read_string`, and
    ///   `format::validate_collection_count` for truncated or malformed
    ///   payloads.
    ///
    /// Bytes left in `data` after the record's last field are not inspected.
    fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
        let mut cursor = io::Cursor::new(data);
        let tag = format::read_u8(&mut cursor)?;
        match tag {
            TAG_SET => {
                let key = read_string(&mut cursor, "key")?;
                let value = format::read_bytes(&mut cursor)?;
                let expire_ms = format::read_i64(&mut cursor)?;
                Ok(AofRecord::Set {
                    key,
                    value: Bytes::from(value),
                    expire_ms,
                })
            }
            TAG_DEL => {
                let key = read_string(&mut cursor, "key")?;
                Ok(AofRecord::Del { key })
            }
            TAG_EXPIRE => {
                let key = read_string(&mut cursor, "key")?;
                // inverse of the `as i64` cast in `to_bytes`
                let seconds = format::read_i64(&mut cursor)? as u64;
                Ok(AofRecord::Expire { key, seconds })
            }
            // one decoder for both push directions; the tag picks the variant
            TAG_LPUSH | TAG_RPUSH => {
                let key = read_string(&mut cursor, "key")?;
                let count = format::read_u32(&mut cursor)?;
                format::validate_collection_count(count, "list")?;
                let mut values = Vec::with_capacity(format::capped_capacity(count));
                for _ in 0..count {
                    values.push(Bytes::from(format::read_bytes(&mut cursor)?));
                }
                if tag == TAG_LPUSH {
                    Ok(AofRecord::LPush { key, values })
                } else {
                    Ok(AofRecord::RPush { key, values })
                }
            }
            TAG_LPOP => {
                let key = read_string(&mut cursor, "key")?;
                Ok(AofRecord::LPop { key })
            }
            TAG_RPOP => {
                let key = read_string(&mut cursor, "key")?;
                Ok(AofRecord::RPop { key })
            }
            TAG_ZADD => {
                let key = read_string(&mut cursor, "key")?;
                let count = format::read_u32(&mut cursor)?;
                format::validate_collection_count(count, "sorted set")?;
                let mut members = Vec::with_capacity(format::capped_capacity(count));
                for _ in 0..count {
                    let score = format::read_f64(&mut cursor)?;
                    let member = read_string(&mut cursor, "member")?;
                    members.push((score, member));
                }
                Ok(AofRecord::ZAdd { key, members })
            }
            TAG_ZREM => {
                let key = read_string(&mut cursor, "key")?;
                let members = read_string_list(&mut cursor, "member")?;
                Ok(AofRecord::ZRem { key, members })
            }
            TAG_PERSIST => {
                let key = read_string(&mut cursor, "key")?;
                Ok(AofRecord::Persist { key })
            }
            TAG_PEXPIRE => {
                let key = read_string(&mut cursor, "key")?;
                // inverse of the `as i64` cast in `to_bytes`
                let milliseconds = format::read_i64(&mut cursor)? as u64;
                Ok(AofRecord::Pexpire { key, milliseconds })
            }
            TAG_INCR => {
                let key = read_string(&mut cursor, "key")?;
                Ok(AofRecord::Incr { key })
            }
            TAG_DECR => {
                let key = read_string(&mut cursor, "key")?;
                Ok(AofRecord::Decr { key })
            }
            TAG_HSET => {
                let key = read_string(&mut cursor, "key")?;
                let count = format::read_u32(&mut cursor)?;
                format::validate_collection_count(count, "hash")?;
                let mut fields = Vec::with_capacity(format::capped_capacity(count));
                for _ in 0..count {
                    let field = read_string(&mut cursor, "field")?;
                    let value = Bytes::from(format::read_bytes(&mut cursor)?);
                    fields.push((field, value));
                }
                Ok(AofRecord::HSet { key, fields })
            }
            TAG_HDEL => {
                let key = read_string(&mut cursor, "key")?;
                let fields = read_string_list(&mut cursor, "field")?;
                Ok(AofRecord::HDel { key, fields })
            }
            TAG_HINCRBY => {
                let key = read_string(&mut cursor, "key")?;
                let field = read_string(&mut cursor, "field")?;
                let delta = format::read_i64(&mut cursor)?;
                Ok(AofRecord::HIncrBy { key, field, delta })
            }
            TAG_SADD => {
                let key = read_string(&mut cursor, "key")?;
                let members = read_string_list(&mut cursor, "member")?;
                Ok(AofRecord::SAdd { key, members })
            }
            TAG_SREM => {
                let key = read_string(&mut cursor, "key")?;
                let members = read_string_list(&mut cursor, "member")?;
                Ok(AofRecord::SRem { key, members })
            }
            TAG_INCRBY => {
                let key = read_string(&mut cursor, "key")?;
                let delta = format::read_i64(&mut cursor)?;
                Ok(AofRecord::IncrBy { key, delta })
            }
            TAG_DECRBY => {
                let key = read_string(&mut cursor, "key")?;
                let delta = format::read_i64(&mut cursor)?;
                Ok(AofRecord::DecrBy { key, delta })
            }
            TAG_APPEND => {
                let key = read_string(&mut cursor, "key")?;
                let value = Bytes::from(format::read_bytes(&mut cursor)?);
                Ok(AofRecord::Append { key, value })
            }
            TAG_RENAME => {
                let key = read_string(&mut cursor, "key")?;
                let newkey = read_string(&mut cursor, "newkey")?;
                Ok(AofRecord::Rename { key, newkey })
            }
            #[cfg(feature = "vector")]
            TAG_VADD => {
                let key = read_string(&mut cursor, "key")?;
                let element = read_string(&mut cursor, "element")?;
                let dim = format::read_u32(&mut cursor)?;
                // reject oversized dimensions before `with_capacity` below,
                // so a corrupt count cannot drive a huge allocation
                if dim > format::MAX_PERSISTED_VECTOR_DIMS {
                    return Err(FormatError::InvalidData(format!(
                        "AOF VADD dimension {dim} exceeds max {}",
                        format::MAX_PERSISTED_VECTOR_DIMS
                    )));
                }
                let mut vector = Vec::with_capacity(dim as usize);
                for _ in 0..dim {
                    vector.push(format::read_f32(&mut cursor)?);
                }
                let metric = format::read_u8(&mut cursor)?;
                let quantization = format::read_u8(&mut cursor)?;
                let connectivity = format::read_u32(&mut cursor)?;
                let expansion_add = format::read_u32(&mut cursor)?;
                Ok(AofRecord::VAdd {
                    key,
                    element,
                    vector,
                    metric,
                    quantization,
                    connectivity,
                    expansion_add,
                })
            }
            #[cfg(feature = "vector")]
            TAG_VREM => {
                let key = read_string(&mut cursor, "key")?;
                let element = read_string(&mut cursor, "element")?;
                Ok(AofRecord::VRem { key, element })
            }
            #[cfg(feature = "protobuf")]
            TAG_PROTO_SET => {
                let key = read_string(&mut cursor, "key")?;
                let type_name = read_string(&mut cursor, "type_name")?;
                let data = format::read_bytes(&mut cursor)?;
                let expire_ms = format::read_i64(&mut cursor)?;
                Ok(AofRecord::ProtoSet {
                    key,
                    type_name,
                    data: Bytes::from(data),
                    expire_ms,
                })
            }
            #[cfg(feature = "protobuf")]
            TAG_PROTO_REGISTER => {
                let name = read_string(&mut cursor, "name")?;
                let descriptor = format::read_bytes(&mut cursor)?;
                Ok(AofRecord::ProtoRegister {
                    name,
                    descriptor: Bytes::from(descriptor),
                })
            }
            _ => Err(FormatError::UnknownTag(tag)),
        }
    }
}

/// How aggressively the AOF writer pushes data to stable storage.
///
/// The default is [`FsyncPolicy::EverySec`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum FsyncPolicy {
    /// fsync after every single write: the safest and the slowest option.
    Always,
    /// fsync roughly once per second, driven by the shard tick.
    #[default]
    EverySec,
    /// never fsync explicitly and leave flushing to the OS: fastest, least durable.
    No,
}

689/// Buffered writer for appending AOF records to a file.
690pub struct AofWriter {
691    writer: BufWriter<File>,
692    path: PathBuf,
693    #[cfg(feature = "encryption")]
694    encryption_key: Option<crate::encryption::EncryptionKey>,
695}
696
697impl AofWriter {
698    /// Opens (or creates) an AOF file. If the file is new, writes the header.
699    /// If the file already exists, appends to it.
700    pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
701        let path = path.into();
702        let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
703
704        let file = open_persistence_file(&path)?;
705        let mut writer = BufWriter::new(file);
706
707        if !exists {
708            format::write_header(&mut writer, format::AOF_MAGIC)?;
709            writer.flush()?;
710        }
711
712        Ok(Self {
713            writer,
714            path,
715            #[cfg(feature = "encryption")]
716            encryption_key: None,
717        })
718    }
719
720    /// Opens (or creates) an encrypted AOF file using AES-256-GCM.
721    ///
722    /// New files get a v3 header. Existing v2 files can be appended to —
723    /// new records will be written unencrypted (use `BGREWRITEAOF` to
724    /// migrate the full file to v3).
725    #[cfg(feature = "encryption")]
726    pub fn open_encrypted(
727        path: impl Into<PathBuf>,
728        key: crate::encryption::EncryptionKey,
729    ) -> Result<Self, FormatError> {
730        let path = path.into();
731        let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
732
733        let file = open_persistence_file(&path)?;
734        let mut writer = BufWriter::new(file);
735
736        if !exists {
737            format::write_header_versioned(
738                &mut writer,
739                format::AOF_MAGIC,
740                format::FORMAT_VERSION_ENCRYPTED,
741            )?;
742            writer.flush()?;
743        }
744
745        Ok(Self {
746            writer,
747            path,
748            encryption_key: Some(key),
749        })
750    }
751
752    /// Appends a record to the AOF.
753    ///
754    /// When an encryption key is set, writes: `[nonce: 12B][len: 4B][ciphertext]`.
755    /// Otherwise writes the v2 format: `[tag+payload][crc32: 4B]`.
756    pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
757        let payload = record.to_bytes()?;
758
759        #[cfg(feature = "encryption")]
760        if let Some(ref key) = self.encryption_key {
761            let (nonce, ciphertext) = crate::encryption::encrypt_record(key, &payload)?;
762            self.writer.write_all(&nonce)?;
763            format::write_len(&mut self.writer, ciphertext.len())?;
764            self.writer.write_all(&ciphertext)?;
765            return Ok(());
766        }
767
768        let checksum = format::crc32(&payload);
769        self.writer.write_all(&payload)?;
770        format::write_u32(&mut self.writer, checksum)?;
771        Ok(())
772    }
773
774    /// Flushes the internal buffer to the OS.
775    pub fn flush(&mut self) -> Result<(), FormatError> {
776        self.writer.flush()?;
777        Ok(())
778    }
779
780    /// Flushes and fsyncs the file to disk.
781    pub fn sync(&mut self) -> Result<(), FormatError> {
782        self.writer.flush()?;
783        self.writer.get_ref().sync_all()?;
784        Ok(())
785    }
786
787    /// Returns the file path.
788    pub fn path(&self) -> &Path {
789        &self.path
790    }
791
792    /// Truncates the AOF file back to just the header.
793    ///
794    /// Uses write-to-temp-then-rename for crash safety: the old AOF
795    /// remains intact until the new file (with only a header) is fully
796    /// synced and atomically renamed into place.
797    pub fn truncate(&mut self) -> Result<(), FormatError> {
798        // flush the old writer so no data is in the BufWriter
799        self.writer.flush()?;
800
801        // write a fresh header to a temp file next to the real AOF
802        let tmp_path = self.path.with_extension("aof.tmp");
803        let mut opts = OpenOptions::new();
804        opts.create(true).write(true).truncate(true);
805        #[cfg(unix)]
806        {
807            use std::os::unix::fs::OpenOptionsExt;
808            opts.mode(0o600);
809        }
810        let tmp_file = opts.open(&tmp_path)?;
811        let mut tmp_writer = BufWriter::new(tmp_file);
812
813        #[cfg(feature = "encryption")]
814        if self.encryption_key.is_some() {
815            format::write_header_versioned(
816                &mut tmp_writer,
817                format::AOF_MAGIC,
818                format::FORMAT_VERSION_ENCRYPTED,
819            )?;
820        } else {
821            format::write_header(&mut tmp_writer, format::AOF_MAGIC)?;
822        }
823        #[cfg(not(feature = "encryption"))]
824        format::write_header(&mut tmp_writer, format::AOF_MAGIC)?;
825
826        tmp_writer.flush()?;
827        tmp_writer.get_ref().sync_all()?;
828
829        // atomic rename: old AOF is replaced only after new file is durable
830        std::fs::rename(&tmp_path, &self.path)?;
831
832        // reopen for appending
833        let file = OpenOptions::new().append(true).open(&self.path)?;
834        self.writer = BufWriter::new(file);
835        Ok(())
836    }
837}
838
/// Reader for iterating over AOF records.
///
/// Construct it with `AofReader::open` (plaintext files only) or, with the
/// `encryption` feature, `AofReader::open_encrypted`. Then call
/// `read_record` until it returns `Ok(None)`.
pub struct AofReader {
    /// Buffered file handle, positioned just past the validated header
    /// once construction succeeds.
    reader: BufReader<File>,
    /// Format version from the file header. v2 = plaintext, v3 = encrypted.
    version: u8,
    /// Key used to decrypt v3 records; `None` when opened via `open`.
    #[cfg(feature = "encryption")]
    encryption_key: Option<crate::encryption::EncryptionKey>,
}
847
848impl fmt::Debug for AofReader {
849    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
850        f.debug_struct("AofReader")
851            .field("version", &self.version)
852            .finish()
853    }
854}
855
856impl AofReader {
857    /// Opens an AOF file and validates the header.
858    pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
859        let file = File::open(path.as_ref())?;
860        let mut reader = BufReader::new(file);
861        let version = format::read_header(&mut reader, format::AOF_MAGIC)?;
862
863        if version == format::FORMAT_VERSION_ENCRYPTED {
864            return Err(FormatError::EncryptionRequired);
865        }
866
867        Ok(Self {
868            reader,
869            version,
870            #[cfg(feature = "encryption")]
871            encryption_key: None,
872        })
873    }
874
875    /// Opens an AOF file with an encryption key for decrypting v3 records.
876    ///
877    /// Also handles v2 (plaintext) files — the key is simply unused,
878    /// allowing transparent migration.
879    #[cfg(feature = "encryption")]
880    pub fn open_encrypted(
881        path: impl AsRef<Path>,
882        key: crate::encryption::EncryptionKey,
883    ) -> Result<Self, FormatError> {
884        let file = File::open(path.as_ref())?;
885        let mut reader = BufReader::new(file);
886        let version = format::read_header(&mut reader, format::AOF_MAGIC)?;
887
888        Ok(Self {
889            reader,
890            version,
891            encryption_key: Some(key),
892        })
893    }
894
895    /// Reads the next record from the AOF.
896    ///
897    /// Returns `Ok(None)` at end-of-file. On a truncated record (the
898    /// server crashed mid-write), returns `Ok(None)` rather than an error
899    /// — this is the expected recovery behavior.
900    pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
901        #[cfg(feature = "encryption")]
902        if self.version == format::FORMAT_VERSION_ENCRYPTED {
903            return self.read_encrypted_record();
904        }
905
906        self.read_v2_record()
907    }
908
909    /// Reads a v2 (plaintext) record: tag + payload + crc32.
910    fn read_v2_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
911        // peek for EOF — try reading the tag byte
912        let tag = match format::read_u8(&mut self.reader) {
913            Ok(t) => t,
914            Err(FormatError::UnexpectedEof) => return Ok(None),
915            Err(e) => return Err(e),
916        };
917
918        // read the rest of the payload based on tag, building the full
919        // record bytes for CRC verification
920        let record_result = self.read_payload_for_tag(tag);
921        match record_result {
922            Ok((payload, stored_crc)) => {
923                // prepend the tag to the payload for CRC check
924                let mut full = Vec::with_capacity(1 + payload.len());
925                full.push(tag);
926                full.extend_from_slice(&payload);
927                format::verify_crc32(&full, stored_crc)?;
928                AofRecord::from_bytes(&full).map(Some)
929            }
930            // truncated record — treat as end of usable data
931            Err(FormatError::UnexpectedEof) => Ok(None),
932            Err(e) => Err(e),
933        }
934    }
935
936    /// Reads a v3 (encrypted) record: nonce + len + ciphertext.
937    #[cfg(feature = "encryption")]
938    fn read_encrypted_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
939        let key = self
940            .encryption_key
941            .as_ref()
942            .ok_or(FormatError::EncryptionRequired)?;
943
944        // read the 12-byte nonce
945        let mut nonce = [0u8; crate::encryption::NONCE_SIZE];
946        if let Err(e) = self.reader.read_exact(&mut nonce) {
947            return if e.kind() == io::ErrorKind::UnexpectedEof {
948                Ok(None)
949            } else {
950                Err(FormatError::Io(e))
951            };
952        }
953
954        // read ciphertext length and ciphertext
955        let ct_len = match format::read_u32(&mut self.reader) {
956            Ok(n) => n as usize,
957            Err(FormatError::UnexpectedEof) => return Ok(None),
958            Err(e) => return Err(e),
959        };
960
961        if ct_len > format::MAX_FIELD_LEN {
962            return Err(FormatError::Io(io::Error::new(
963                io::ErrorKind::InvalidData,
964                format!("encrypted record length {ct_len} exceeds maximum"),
965            )));
966        }
967
968        let mut ciphertext = vec![0u8; ct_len];
969        if let Err(e) = self.reader.read_exact(&mut ciphertext) {
970            return if e.kind() == io::ErrorKind::UnexpectedEof {
971                Ok(None)
972            } else {
973                Err(FormatError::Io(e))
974            };
975        }
976
977        let plaintext = crate::encryption::decrypt_record(key, &nonce, &ciphertext)?;
978        AofRecord::from_bytes(&plaintext).map(Some)
979    }
980
981    /// Reads the remaining payload bytes (after the tag) and the trailing CRC.
982    fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
983        let mut payload = Vec::new();
984        match tag {
985            TAG_SET => {
986                // key_len + key + value_len + value + expire_ms
987                let key = format::read_bytes(&mut self.reader)?;
988                format::write_bytes(&mut payload, &key)?;
989                let value = format::read_bytes(&mut self.reader)?;
990                format::write_bytes(&mut payload, &value)?;
991                let expire_ms = format::read_i64(&mut self.reader)?;
992                format::write_i64(&mut payload, expire_ms)?;
993            }
994            TAG_DEL => {
995                let key = format::read_bytes(&mut self.reader)?;
996                format::write_bytes(&mut payload, &key)?;
997            }
998            TAG_EXPIRE => {
999                let key = format::read_bytes(&mut self.reader)?;
1000                format::write_bytes(&mut payload, &key)?;
1001                let seconds = format::read_i64(&mut self.reader)?;
1002                format::write_i64(&mut payload, seconds)?;
1003            }
1004            TAG_LPUSH | TAG_RPUSH => {
1005                let key = format::read_bytes(&mut self.reader)?;
1006                format::write_bytes(&mut payload, &key)?;
1007                let count = format::read_u32(&mut self.reader)?;
1008                format::validate_collection_count(count, "list")?;
1009                format::write_u32(&mut payload, count)?;
1010                for _ in 0..count {
1011                    let val = format::read_bytes(&mut self.reader)?;
1012                    format::write_bytes(&mut payload, &val)?;
1013                }
1014            }
1015            TAG_LPOP | TAG_RPOP => {
1016                let key = format::read_bytes(&mut self.reader)?;
1017                format::write_bytes(&mut payload, &key)?;
1018            }
1019            TAG_ZADD => {
1020                let key = format::read_bytes(&mut self.reader)?;
1021                format::write_bytes(&mut payload, &key)?;
1022                let count = format::read_u32(&mut self.reader)?;
1023                format::validate_collection_count(count, "sorted set")?;
1024                format::write_u32(&mut payload, count)?;
1025                for _ in 0..count {
1026                    let score = format::read_f64(&mut self.reader)?;
1027                    format::write_f64(&mut payload, score)?;
1028                    let member = format::read_bytes(&mut self.reader)?;
1029                    format::write_bytes(&mut payload, &member)?;
1030                }
1031            }
1032            TAG_ZREM => {
1033                let key = format::read_bytes(&mut self.reader)?;
1034                format::write_bytes(&mut payload, &key)?;
1035                let count = format::read_u32(&mut self.reader)?;
1036                format::validate_collection_count(count, "sorted set")?;
1037                format::write_u32(&mut payload, count)?;
1038                for _ in 0..count {
1039                    let member = format::read_bytes(&mut self.reader)?;
1040                    format::write_bytes(&mut payload, &member)?;
1041                }
1042            }
1043            TAG_PERSIST => {
1044                let key = format::read_bytes(&mut self.reader)?;
1045                format::write_bytes(&mut payload, &key)?;
1046            }
1047            TAG_PEXPIRE => {
1048                let key = format::read_bytes(&mut self.reader)?;
1049                format::write_bytes(&mut payload, &key)?;
1050                let millis = format::read_i64(&mut self.reader)?;
1051                format::write_i64(&mut payload, millis)?;
1052            }
1053            TAG_INCR | TAG_DECR => {
1054                let key = format::read_bytes(&mut self.reader)?;
1055                format::write_bytes(&mut payload, &key)?;
1056            }
1057            TAG_HSET => {
1058                let key = format::read_bytes(&mut self.reader)?;
1059                format::write_bytes(&mut payload, &key)?;
1060                let count = format::read_u32(&mut self.reader)?;
1061                format::validate_collection_count(count, "hash")?;
1062                format::write_u32(&mut payload, count)?;
1063                for _ in 0..count {
1064                    let field = format::read_bytes(&mut self.reader)?;
1065                    format::write_bytes(&mut payload, &field)?;
1066                    let value = format::read_bytes(&mut self.reader)?;
1067                    format::write_bytes(&mut payload, &value)?;
1068                }
1069            }
1070            TAG_HDEL | TAG_SADD | TAG_SREM => {
1071                let key = format::read_bytes(&mut self.reader)?;
1072                format::write_bytes(&mut payload, &key)?;
1073                let count = format::read_u32(&mut self.reader)?;
1074                format::validate_collection_count(count, "set")?;
1075                format::write_u32(&mut payload, count)?;
1076                for _ in 0..count {
1077                    let item = format::read_bytes(&mut self.reader)?;
1078                    format::write_bytes(&mut payload, &item)?;
1079                }
1080            }
1081            TAG_HINCRBY => {
1082                let key = format::read_bytes(&mut self.reader)?;
1083                format::write_bytes(&mut payload, &key)?;
1084                let field = format::read_bytes(&mut self.reader)?;
1085                format::write_bytes(&mut payload, &field)?;
1086                let delta = format::read_i64(&mut self.reader)?;
1087                format::write_i64(&mut payload, delta)?;
1088            }
1089            TAG_INCRBY | TAG_DECRBY => {
1090                let key = format::read_bytes(&mut self.reader)?;
1091                format::write_bytes(&mut payload, &key)?;
1092                let delta = format::read_i64(&mut self.reader)?;
1093                format::write_i64(&mut payload, delta)?;
1094            }
1095            TAG_APPEND => {
1096                let key = format::read_bytes(&mut self.reader)?;
1097                format::write_bytes(&mut payload, &key)?;
1098                let value = format::read_bytes(&mut self.reader)?;
1099                format::write_bytes(&mut payload, &value)?;
1100            }
1101            TAG_RENAME => {
1102                let key = format::read_bytes(&mut self.reader)?;
1103                format::write_bytes(&mut payload, &key)?;
1104                let newkey = format::read_bytes(&mut self.reader)?;
1105                format::write_bytes(&mut payload, &newkey)?;
1106            }
1107            #[cfg(feature = "vector")]
1108            TAG_VADD => {
1109                let key = format::read_bytes(&mut self.reader)?;
1110                format::write_bytes(&mut payload, &key)?;
1111                let element = format::read_bytes(&mut self.reader)?;
1112                format::write_bytes(&mut payload, &element)?;
1113                let dim = format::read_u32(&mut self.reader)?;
1114                if dim > format::MAX_PERSISTED_VECTOR_DIMS {
1115                    return Err(FormatError::InvalidData(format!(
1116                        "AOF VADD dimension {dim} exceeds max {}",
1117                        format::MAX_PERSISTED_VECTOR_DIMS
1118                    )));
1119                }
1120                format::write_u32(&mut payload, dim)?;
1121                for _ in 0..dim {
1122                    let v = format::read_f32(&mut self.reader)?;
1123                    format::write_f32(&mut payload, v)?;
1124                }
1125                let metric = format::read_u8(&mut self.reader)?;
1126                format::write_u8(&mut payload, metric)?;
1127                let quantization = format::read_u8(&mut self.reader)?;
1128                format::write_u8(&mut payload, quantization)?;
1129                let connectivity = format::read_u32(&mut self.reader)?;
1130                format::write_u32(&mut payload, connectivity)?;
1131                let expansion_add = format::read_u32(&mut self.reader)?;
1132                format::write_u32(&mut payload, expansion_add)?;
1133            }
1134            #[cfg(feature = "vector")]
1135            TAG_VREM => {
1136                let key = format::read_bytes(&mut self.reader)?;
1137                format::write_bytes(&mut payload, &key)?;
1138                let element = format::read_bytes(&mut self.reader)?;
1139                format::write_bytes(&mut payload, &element)?;
1140            }
1141            #[cfg(feature = "protobuf")]
1142            TAG_PROTO_SET => {
1143                let key = format::read_bytes(&mut self.reader)?;
1144                format::write_bytes(&mut payload, &key)?;
1145                let type_name = format::read_bytes(&mut self.reader)?;
1146                format::write_bytes(&mut payload, &type_name)?;
1147                let data = format::read_bytes(&mut self.reader)?;
1148                format::write_bytes(&mut payload, &data)?;
1149                let expire_ms = format::read_i64(&mut self.reader)?;
1150                format::write_i64(&mut payload, expire_ms)?;
1151            }
1152            #[cfg(feature = "protobuf")]
1153            TAG_PROTO_REGISTER => {
1154                let name = format::read_bytes(&mut self.reader)?;
1155                format::write_bytes(&mut payload, &name)?;
1156                let descriptor = format::read_bytes(&mut self.reader)?;
1157                format::write_bytes(&mut payload, &descriptor)?;
1158            }
1159            _ => return Err(FormatError::UnknownTag(tag)),
1160        }
1161        let stored_crc = format::read_u32(&mut self.reader)?;
1162        Ok((payload, stored_crc))
1163    }
1164}
1165
1166/// Opens a persistence file with create+append and restrictive permissions.
1167fn open_persistence_file(path: &Path) -> Result<File, FormatError> {
1168    let mut opts = OpenOptions::new();
1169    opts.create(true).append(true);
1170    #[cfg(unix)]
1171    {
1172        use std::os::unix::fs::OpenOptionsExt;
1173        opts.mode(0o600);
1174    }
1175    Ok(opts.open(path)?)
1176}
1177
/// Builds the per-shard AOF location `<data_dir>/shard-<shard_id>.aof`.
pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
    let file_name = format!("shard-{shard_id}.aof");
    data_dir.join(file_name)
}
1182
1183#[cfg(test)]
1184mod tests {
1185    use super::*;
1186
1187    type Result = std::result::Result<(), Box<dyn std::error::Error>>;
1188
1189    fn temp_dir() -> tempfile::TempDir {
1190        tempfile::tempdir().expect("create temp dir")
1191    }
1192
1193    #[test]
1194    fn record_round_trip_set() -> Result {
1195        let rec = AofRecord::Set {
1196            key: "hello".into(),
1197            value: Bytes::from("world"),
1198            expire_ms: 5000,
1199        };
1200        let bytes = rec.to_bytes()?;
1201        let decoded = AofRecord::from_bytes(&bytes)?;
1202        assert_eq!(rec, decoded);
1203        Ok(())
1204    }
1205
1206    #[test]
1207    fn record_round_trip_del() -> Result {
1208        let rec = AofRecord::Del { key: "gone".into() };
1209        let bytes = rec.to_bytes()?;
1210        let decoded = AofRecord::from_bytes(&bytes)?;
1211        assert_eq!(rec, decoded);
1212        Ok(())
1213    }
1214
1215    #[test]
1216    fn record_round_trip_expire() -> Result {
1217        let rec = AofRecord::Expire {
1218            key: "ttl".into(),
1219            seconds: 300,
1220        };
1221        let bytes = rec.to_bytes()?;
1222        let decoded = AofRecord::from_bytes(&bytes)?;
1223        assert_eq!(rec, decoded);
1224        Ok(())
1225    }
1226
1227    #[test]
1228    fn set_with_no_expiry() -> Result {
1229        let rec = AofRecord::Set {
1230            key: "k".into(),
1231            value: Bytes::from("v"),
1232            expire_ms: -1,
1233        };
1234        let bytes = rec.to_bytes()?;
1235        let decoded = AofRecord::from_bytes(&bytes)?;
1236        assert_eq!(rec, decoded);
1237        Ok(())
1238    }
1239
1240    #[test]
1241    fn writer_reader_round_trip() -> Result {
1242        let dir = temp_dir();
1243        let path = dir.path().join("test.aof");
1244
1245        let records = vec![
1246            AofRecord::Set {
1247                key: "a".into(),
1248                value: Bytes::from("1"),
1249                expire_ms: -1,
1250            },
1251            AofRecord::Set {
1252                key: "b".into(),
1253                value: Bytes::from("2"),
1254                expire_ms: 10_000,
1255            },
1256            AofRecord::Del { key: "a".into() },
1257            AofRecord::Expire {
1258                key: "b".into(),
1259                seconds: 60,
1260            },
1261        ];
1262
1263        // write
1264        {
1265            let mut writer = AofWriter::open(&path)?;
1266            for rec in &records {
1267                writer.write_record(rec)?;
1268            }
1269            writer.sync()?;
1270        }
1271
1272        // read back
1273        let mut reader = AofReader::open(&path)?;
1274        let mut got = Vec::new();
1275        while let Some(rec) = reader.read_record()? {
1276            got.push(rec);
1277        }
1278        assert_eq!(records, got);
1279        Ok(())
1280    }
1281
1282    #[test]
1283    fn empty_aof_returns_no_records() -> Result {
1284        let dir = temp_dir();
1285        let path = dir.path().join("empty.aof");
1286
1287        // just write the header
1288        {
1289            let _writer = AofWriter::open(&path)?;
1290        }
1291
1292        let mut reader = AofReader::open(&path)?;
1293        assert!(reader.read_record()?.is_none());
1294        Ok(())
1295    }
1296
1297    #[test]
1298    fn truncated_record_treated_as_eof() -> Result {
1299        let dir = temp_dir();
1300        let path = dir.path().join("trunc.aof");
1301
1302        // write one good record, then append garbage (simulating a crash)
1303        {
1304            let mut writer = AofWriter::open(&path)?;
1305            writer.write_record(&AofRecord::Set {
1306                key: "ok".into(),
1307                value: Bytes::from("good"),
1308                expire_ms: -1,
1309            })?;
1310            writer.flush()?;
1311        }
1312
1313        // append a partial tag with no payload
1314        {
1315            let mut file = OpenOptions::new().append(true).open(&path)?;
1316            file.write_all(&[TAG_SET])?;
1317        }
1318
1319        let mut reader = AofReader::open(&path)?;
1320        // first record should be fine
1321        let rec = reader.read_record()?.unwrap();
1322        assert!(matches!(rec, AofRecord::Set { .. }));
1323        // second should be None (truncated)
1324        assert!(reader.read_record()?.is_none());
1325        Ok(())
1326    }
1327
1328    #[test]
1329    fn corrupt_crc_detected() -> Result {
1330        let dir = temp_dir();
1331        let path = dir.path().join("corrupt.aof");
1332
1333        {
1334            let mut writer = AofWriter::open(&path)?;
1335            writer.write_record(&AofRecord::Set {
1336                key: "k".into(),
1337                value: Bytes::from("v"),
1338                expire_ms: -1,
1339            })?;
1340            writer.flush()?;
1341        }
1342
1343        // corrupt the last byte (part of the CRC)
1344        let mut data = fs::read(&path)?;
1345        let last = data.len() - 1;
1346        data[last] ^= 0xFF;
1347        fs::write(&path, &data)?;
1348
1349        let mut reader = AofReader::open(&path)?;
1350        let err = reader.read_record().unwrap_err();
1351        assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
1352        Ok(())
1353    }
1354
1355    #[test]
1356    fn missing_magic_is_error() {
1357        let dir = temp_dir();
1358        let path = dir.path().join("bad.aof");
1359        fs::write(&path, b"NOT_AOF_DATA").unwrap();
1360
1361        let err = AofReader::open(&path).unwrap_err();
1362        assert!(matches!(err, FormatError::InvalidMagic));
1363    }
1364
1365    #[test]
1366    fn truncate_resets_aof() -> Result {
1367        let dir = temp_dir();
1368        let path = dir.path().join("reset.aof");
1369
1370        {
1371            let mut writer = AofWriter::open(&path)?;
1372            writer.write_record(&AofRecord::Set {
1373                key: "old".into(),
1374                value: Bytes::from("data"),
1375                expire_ms: -1,
1376            })?;
1377            writer.truncate()?;
1378
1379            // write a new record after truncation
1380            writer.write_record(&AofRecord::Set {
1381                key: "new".into(),
1382                value: Bytes::from("fresh"),
1383                expire_ms: -1,
1384            })?;
1385            writer.sync()?;
1386        }
1387
1388        let mut reader = AofReader::open(&path)?;
1389        let rec = reader.read_record()?.unwrap();
1390        match rec {
1391            AofRecord::Set { key, .. } => assert_eq!(key, "new"),
1392            other => panic!("expected Set, got {other:?}"),
1393        }
1394        // only one record after truncation
1395        assert!(reader.read_record()?.is_none());
1396        Ok(())
1397    }
1398
1399    #[test]
1400    fn record_round_trip_lpush() -> Result {
1401        let rec = AofRecord::LPush {
1402            key: "list".into(),
1403            values: vec![Bytes::from("a"), Bytes::from("b")],
1404        };
1405        let bytes = rec.to_bytes()?;
1406        let decoded = AofRecord::from_bytes(&bytes)?;
1407        assert_eq!(rec, decoded);
1408        Ok(())
1409    }
1410
1411    #[test]
1412    fn record_round_trip_rpush() -> Result {
1413        let rec = AofRecord::RPush {
1414            key: "list".into(),
1415            values: vec![Bytes::from("x")],
1416        };
1417        let bytes = rec.to_bytes()?;
1418        let decoded = AofRecord::from_bytes(&bytes)?;
1419        assert_eq!(rec, decoded);
1420        Ok(())
1421    }
1422
1423    #[test]
1424    fn record_round_trip_lpop() -> Result {
1425        let rec = AofRecord::LPop { key: "list".into() };
1426        let bytes = rec.to_bytes()?;
1427        let decoded = AofRecord::from_bytes(&bytes)?;
1428        assert_eq!(rec, decoded);
1429        Ok(())
1430    }
1431
1432    #[test]
1433    fn record_round_trip_rpop() -> Result {
1434        let rec = AofRecord::RPop { key: "list".into() };
1435        let bytes = rec.to_bytes()?;
1436        let decoded = AofRecord::from_bytes(&bytes)?;
1437        assert_eq!(rec, decoded);
1438        Ok(())
1439    }
1440
1441    #[test]
1442    fn writer_reader_round_trip_with_list_records() -> Result {
1443        let dir = temp_dir();
1444        let path = dir.path().join("list.aof");
1445
1446        let records = vec![
1447            AofRecord::LPush {
1448                key: "l".into(),
1449                values: vec![Bytes::from("a"), Bytes::from("b")],
1450            },
1451            AofRecord::RPush {
1452                key: "l".into(),
1453                values: vec![Bytes::from("c")],
1454            },
1455            AofRecord::LPop { key: "l".into() },
1456            AofRecord::RPop { key: "l".into() },
1457        ];
1458
1459        {
1460            let mut writer = AofWriter::open(&path)?;
1461            for rec in &records {
1462                writer.write_record(rec)?;
1463            }
1464            writer.sync()?;
1465        }
1466
1467        let mut reader = AofReader::open(&path)?;
1468        let mut got = Vec::new();
1469        while let Some(rec) = reader.read_record()? {
1470            got.push(rec);
1471        }
1472        assert_eq!(records, got);
1473        Ok(())
1474    }
1475
1476    #[test]
1477    fn record_round_trip_zadd() -> Result {
1478        let rec = AofRecord::ZAdd {
1479            key: "board".into(),
1480            members: vec![(100.0, "alice".into()), (200.5, "bob".into())],
1481        };
1482        let bytes = rec.to_bytes()?;
1483        let decoded = AofRecord::from_bytes(&bytes)?;
1484        assert_eq!(rec, decoded);
1485        Ok(())
1486    }
1487
1488    #[test]
1489    fn record_round_trip_zrem() -> Result {
1490        let rec = AofRecord::ZRem {
1491            key: "board".into(),
1492            members: vec!["alice".into(), "bob".into()],
1493        };
1494        let bytes = rec.to_bytes()?;
1495        let decoded = AofRecord::from_bytes(&bytes)?;
1496        assert_eq!(rec, decoded);
1497        Ok(())
1498    }
1499
1500    #[test]
1501    fn writer_reader_round_trip_with_sorted_set_records() -> Result {
1502        let dir = temp_dir();
1503        let path = dir.path().join("zset.aof");
1504
1505        let records = vec![
1506            AofRecord::ZAdd {
1507                key: "board".into(),
1508                members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
1509            },
1510            AofRecord::ZRem {
1511                key: "board".into(),
1512                members: vec!["alice".into()],
1513            },
1514        ];
1515
1516        {
1517            let mut writer = AofWriter::open(&path)?;
1518            for rec in &records {
1519                writer.write_record(rec)?;
1520            }
1521            writer.sync()?;
1522        }
1523
1524        let mut reader = AofReader::open(&path)?;
1525        let mut got = Vec::new();
1526        while let Some(rec) = reader.read_record()? {
1527            got.push(rec);
1528        }
1529        assert_eq!(records, got);
1530        Ok(())
1531    }
1532
1533    #[test]
1534    fn record_round_trip_persist() -> Result {
1535        let rec = AofRecord::Persist {
1536            key: "mykey".into(),
1537        };
1538        let bytes = rec.to_bytes()?;
1539        let decoded = AofRecord::from_bytes(&bytes)?;
1540        assert_eq!(rec, decoded);
1541        Ok(())
1542    }
1543
1544    #[test]
1545    fn record_round_trip_pexpire() -> Result {
1546        let rec = AofRecord::Pexpire {
1547            key: "mykey".into(),
1548            milliseconds: 5000,
1549        };
1550        let bytes = rec.to_bytes()?;
1551        let decoded = AofRecord::from_bytes(&bytes)?;
1552        assert_eq!(rec, decoded);
1553        Ok(())
1554    }
1555
1556    #[test]
1557    fn record_round_trip_incr() -> Result {
1558        let rec = AofRecord::Incr {
1559            key: "counter".into(),
1560        };
1561        let bytes = rec.to_bytes()?;
1562        let decoded = AofRecord::from_bytes(&bytes)?;
1563        assert_eq!(rec, decoded);
1564        Ok(())
1565    }
1566
1567    #[test]
1568    fn record_round_trip_decr() -> Result {
1569        let rec = AofRecord::Decr {
1570            key: "counter".into(),
1571        };
1572        let bytes = rec.to_bytes()?;
1573        let decoded = AofRecord::from_bytes(&bytes)?;
1574        assert_eq!(rec, decoded);
1575        Ok(())
1576    }
1577
1578    #[test]
1579    fn writer_reader_round_trip_with_persist_pexpire() -> Result {
1580        let dir = temp_dir();
1581        let path = dir.path().join("persist_pexpire.aof");
1582
1583        let records = vec![
1584            AofRecord::Set {
1585                key: "k".into(),
1586                value: Bytes::from("v"),
1587                expire_ms: 5000,
1588            },
1589            AofRecord::Persist { key: "k".into() },
1590            AofRecord::Pexpire {
1591                key: "k".into(),
1592                milliseconds: 3000,
1593            },
1594        ];
1595
1596        {
1597            let mut writer = AofWriter::open(&path)?;
1598            for rec in &records {
1599                writer.write_record(rec)?;
1600            }
1601            writer.sync()?;
1602        }
1603
1604        let mut reader = AofReader::open(&path)?;
1605        let mut got = Vec::new();
1606        while let Some(rec) = reader.read_record()? {
1607            got.push(rec);
1608        }
1609        assert_eq!(records, got);
1610        Ok(())
1611    }
1612
1613    #[test]
1614    fn aof_path_format() {
1615        let p = aof_path(Path::new("/data"), 3);
1616        assert_eq!(p, PathBuf::from("/data/shard-3.aof"));
1617    }
1618
1619    #[test]
1620    fn record_round_trip_hset() -> Result {
1621        let rec = AofRecord::HSet {
1622            key: "hash".into(),
1623            fields: vec![
1624                ("f1".into(), Bytes::from("v1")),
1625                ("f2".into(), Bytes::from("v2")),
1626            ],
1627        };
1628        let bytes = rec.to_bytes()?;
1629        let decoded = AofRecord::from_bytes(&bytes)?;
1630        assert_eq!(rec, decoded);
1631        Ok(())
1632    }
1633
1634    #[test]
1635    fn record_round_trip_hdel() -> Result {
1636        let rec = AofRecord::HDel {
1637            key: "hash".into(),
1638            fields: vec!["f1".into(), "f2".into()],
1639        };
1640        let bytes = rec.to_bytes()?;
1641        let decoded = AofRecord::from_bytes(&bytes)?;
1642        assert_eq!(rec, decoded);
1643        Ok(())
1644    }
1645
#[test]
fn record_round_trip_hincrby() -> Result {
    // A negative delta checks that the sign survives serialization.
    let original = AofRecord::HIncrBy {
        key: "hash".into(),
        field: "counter".into(),
        delta: -42,
    };
    let encoded = original.to_bytes()?;
    let round_tripped = AofRecord::from_bytes(&encoded)?;
    assert_eq!(original, round_tripped);
    Ok(())
}
1658
#[test]
fn record_round_trip_sadd() -> Result {
    // Three members: the decoded count-prefixed member list must match
    // element for element, in order.
    let original = AofRecord::SAdd {
        key: "set".into(),
        members: vec!["m1".into(), "m2".into(), "m3".into()],
    };
    let encoded = original.to_bytes()?;
    let round_tripped = AofRecord::from_bytes(&encoded)?;
    assert_eq!(original, round_tripped);
    Ok(())
}
1670
#[test]
fn record_round_trip_srem() -> Result {
    // Single-member case of the count-prefixed member list.
    let original = AofRecord::SRem {
        key: "set".into(),
        members: vec!["m1".into()],
    };
    let encoded = original.to_bytes()?;
    let round_tripped = AofRecord::from_bytes(&encoded)?;
    assert_eq!(original, round_tripped);
    Ok(())
}
1682
#[cfg(feature = "vector")]
#[test]
fn record_round_trip_vadd() -> Result {
    // Small 3-component vector plus every index parameter VAdd carries.
    let original = AofRecord::VAdd {
        key: "embeddings".into(),
        element: "doc1".into(),
        vector: vec![0.1, 0.2, 0.3],
        metric: 0,       // cosine
        quantization: 0, // f32
        connectivity: 16,
        expansion_add: 64,
    };
    let encoded = original.to_bytes()?;
    let round_tripped = AofRecord::from_bytes(&encoded)?;
    assert_eq!(original, round_tripped);
    Ok(())
}
1700
#[cfg(feature = "vector")]
#[test]
fn record_round_trip_vadd_high_dim() -> Result {
    // 1536 is a typical embedding dimension. Components are distinct,
    // non-zero and alternate in sign. An all-zero vector would still compare
    // equal if the decoder reordered or zero-filled components, so it could
    // not detect those bugs. Every value is a multiple of 0.25 well below
    // 2^24, so it is exactly representable in any binary float type.
    let mut vector = vec![0.0; 1536];
    let mut magnitude = 0.0;
    for (i, slot) in vector.iter_mut().enumerate() {
        magnitude += 0.25;
        *slot = if i % 2 == 0 { magnitude } else { -magnitude };
    }

    let rec = AofRecord::VAdd {
        key: "vecs".into(),
        element: "e".into(),
        vector,
        metric: 1,       // l2
        quantization: 1, // f16
        connectivity: 32,
        expansion_add: 128,
    };
    let bytes = rec.to_bytes()?;
    let decoded = AofRecord::from_bytes(&bytes)?;
    assert_eq!(rec, decoded);
    Ok(())
}
1718
#[cfg(feature = "vector")]
#[test]
fn record_round_trip_vrem() -> Result {
    // VRem is just key + element name; both must decode unchanged.
    let original = AofRecord::VRem {
        key: "embeddings".into(),
        element: "doc1".into(),
    };
    let encoded = original.to_bytes()?;
    let round_tripped = AofRecord::from_bytes(&encoded)?;
    assert_eq!(original, round_tripped);
    Ok(())
}
1731
#[cfg(feature = "encryption")]
mod encrypted {
    use super::*;
    use crate::encryption::EncryptionKey;

    type Result = std::result::Result<(), Box<dyn std::error::Error>>;

    /// Fixed 32-byte key shared by the tests that write encrypted AOFs.
    fn test_key() -> EncryptionKey {
        EncryptionKey::from_bytes([0x42; 32])
    }

    /// Records of several kinds written through an encrypted writer must be
    /// read back, in order, by an encrypted reader using the same key.
    #[test]
    fn encrypted_writer_reader_round_trip() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("enc.aof");
        let key = test_key();

        let records = vec![
            AofRecord::Set {
                key: "a".into(),
                value: Bytes::from("1"),
                expire_ms: -1,
            },
            AofRecord::Del { key: "a".into() },
            AofRecord::LPush {
                key: "list".into(),
                values: vec![Bytes::from("x"), Bytes::from("y")],
            },
            AofRecord::ZAdd {
                key: "zs".into(),
                members: vec![(1.0, "m".into())],
            },
        ];

        {
            let mut writer = AofWriter::open_encrypted(&path, key.clone())?;
            for rec in &records {
                writer.write_record(rec)?;
            }
            writer.sync()?;
        }

        let mut reader = AofReader::open_encrypted(&path, key)?;
        let mut got = Vec::new();
        while let Some(rec) = reader.read_record()? {
            got.push(rec);
        }
        assert_eq!(records, got);
        Ok(())
    }

    /// Opening with a different key succeeds, but reading the first record
    /// must fail with `DecryptionFailed`.
    #[test]
    fn encrypted_aof_wrong_key_fails() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("enc_bad.aof");
        let key = test_key();
        let wrong_key = EncryptionKey::from_bytes([0xFF; 32]);

        {
            let mut writer = AofWriter::open_encrypted(&path, key)?;
            writer.write_record(&AofRecord::Set {
                key: "k".into(),
                value: Bytes::from("v"),
                expire_ms: -1,
            })?;
            writer.sync()?;
        }

        let mut reader = AofReader::open_encrypted(&path, wrong_key)?;
        let err = reader.read_record().unwrap_err();
        assert!(matches!(err, FormatError::DecryptionFailed));
        Ok(())
    }

    /// A plaintext v2 file written without a key must still be fully
    /// readable by a reader that was given an encryption key.
    #[test]
    fn v2_file_readable_with_encryption_key() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("v2.aof");
        let key = test_key();
        let record = AofRecord::Set {
            key: "k".into(),
            value: Bytes::from("v"),
            expire_ms: -1,
        };

        // write a plaintext v2 file
        {
            let mut writer = AofWriter::open(&path)?;
            writer.write_record(&record)?;
            writer.sync()?;
        }

        // read with encryption key — should work (v2 is plaintext)
        let mut reader = AofReader::open_encrypted(&path, key)?;
        let rec = reader
            .read_record()?
            .ok_or("expected one record, found end of file")?;
        assert_eq!(rec, record);
        assert!(reader.read_record()?.is_none());
        Ok(())
    }

    /// An encrypted v3 file must be refused up front by a keyless reader.
    #[test]
    fn v3_file_without_key_returns_error() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("v3_nokey.aof");
        let key = test_key();

        // write an encrypted v3 file
        {
            let mut writer = AofWriter::open_encrypted(&path, key)?;
            writer.write_record(&AofRecord::Set {
                key: "k".into(),
                value: Bytes::from("v"),
                expire_ms: -1,
            })?;
            writer.sync()?;
        }

        // try to open without a key
        let err = AofReader::open(&path).unwrap_err();
        assert!(matches!(err, FormatError::EncryptionRequired));
        Ok(())
    }

    /// After `truncate`, only records written afterwards remain, and the
    /// file must still be encrypted.
    #[test]
    fn encrypted_truncate_preserves_encryption() -> Result {
        let dir = temp_dir();
        let path = dir.path().join("enc_trunc.aof");
        let key = test_key();

        {
            let mut writer = AofWriter::open_encrypted(&path, key.clone())?;
            writer.write_record(&AofRecord::Set {
                key: "old".into(),
                value: Bytes::from("data"),
                expire_ms: -1,
            })?;
            writer.truncate()?;

            writer.write_record(&AofRecord::Set {
                key: "new".into(),
                value: Bytes::from("fresh"),
                expire_ms: -1,
            })?;
            writer.sync()?;
        }

        {
            let mut reader = AofReader::open_encrypted(&path, key)?;
            let rec = reader
                .read_record()?
                .ok_or("expected a record after truncate, found end of file")?;
            match rec {
                AofRecord::Set { key, .. } => assert_eq!(key, "new"),
                other => panic!("expected Set, got {other:?}"),
            }
            assert!(reader.read_record()?.is_none());
        }

        // `open_encrypted` also accepts plaintext v2 files (see
        // `v2_file_readable_with_encryption_key`). The successful read above
        // therefore does not prove the truncated file is still encrypted.
        // A reader without a key must still be refused.
        let err = AofReader::open(&path).unwrap_err();
        assert!(matches!(err, FormatError::EncryptionRequired));
        Ok(())
    }
}
1886}