1use std::fmt;
20use std::fs::{self, File, OpenOptions};
21#[cfg(feature = "encryption")]
22use std::io::Read as _;
23use std::io::{self, BufReader, BufWriter, Write};
24use std::path::{Path, PathBuf};
25
26use bytes::Bytes;
27
28use crate::format::{self, FormatError};
29
30fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
32 let bytes = format::read_bytes(r)?;
33 String::from_utf8(bytes).map_err(|_| {
34 FormatError::Io(io::Error::new(
35 io::ErrorKind::InvalidData,
36 format!("{field} is not valid utf-8"),
37 ))
38 })
39}
40
41fn read_string_list(r: &mut impl io::Read, field: &str) -> Result<Vec<String>, FormatError> {
44 let count = format::read_u32(r)?;
45 format::validate_collection_count(count, field)?;
46 let mut items = Vec::with_capacity(format::capped_capacity(count));
47 for _ in 0..count {
48 items.push(read_string(r, field)?);
49 }
50 Ok(items)
51}
52
53const TAG_SET: u8 = 1;
58const TAG_INCR: u8 = 12;
59const TAG_DECR: u8 = 13;
60const TAG_INCRBY: u8 = 19;
61const TAG_DECRBY: u8 = 20;
62const TAG_APPEND: u8 = 21;
63
64const TAG_LPUSH: u8 = 4;
66const TAG_RPUSH: u8 = 5;
67const TAG_LPOP: u8 = 6;
68const TAG_RPOP: u8 = 7;
69
70const TAG_ZADD: u8 = 8;
72const TAG_ZREM: u8 = 9;
73
74const TAG_HSET: u8 = 14;
76const TAG_HDEL: u8 = 15;
77const TAG_HINCRBY: u8 = 16;
78
79const TAG_SADD: u8 = 17;
81const TAG_SREM: u8 = 18;
82
83const TAG_DEL: u8 = 2;
85const TAG_EXPIRE: u8 = 3;
86const TAG_PERSIST: u8 = 10;
87const TAG_PEXPIRE: u8 = 11;
88const TAG_RENAME: u8 = 22;
89
90#[cfg(feature = "vector")]
92const TAG_VADD: u8 = 25;
93#[cfg(feature = "vector")]
94const TAG_VREM: u8 = 26;
95
96#[cfg(feature = "protobuf")]
98const TAG_PROTO_SET: u8 = 23;
99#[cfg(feature = "protobuf")]
100const TAG_PROTO_REGISTER: u8 = 24;
101
102#[derive(Debug, Clone, PartialEq)]
104pub enum AofRecord {
105 Set {
107 key: String,
108 value: Bytes,
109 expire_ms: i64,
110 },
111 Del { key: String },
113 Expire { key: String, seconds: u64 },
115 LPush { key: String, values: Vec<Bytes> },
117 RPush { key: String, values: Vec<Bytes> },
119 LPop { key: String },
121 RPop { key: String },
123 ZAdd {
125 key: String,
126 members: Vec<(f64, String)>,
127 },
128 ZRem { key: String, members: Vec<String> },
130 Persist { key: String },
132 Pexpire { key: String, milliseconds: u64 },
134 Incr { key: String },
136 Decr { key: String },
138 HSet {
140 key: String,
141 fields: Vec<(String, Bytes)>,
142 },
143 HDel { key: String, fields: Vec<String> },
145 HIncrBy {
147 key: String,
148 field: String,
149 delta: i64,
150 },
151 SAdd { key: String, members: Vec<String> },
153 SRem { key: String, members: Vec<String> },
155 IncrBy { key: String, delta: i64 },
157 DecrBy { key: String, delta: i64 },
159 Append { key: String, value: Bytes },
161 Rename { key: String, newkey: String },
163 #[cfg(feature = "vector")]
166 VAdd {
167 key: String,
168 element: String,
169 vector: Vec<f32>,
170 metric: u8,
172 quantization: u8,
174 connectivity: u32,
175 expansion_add: u32,
176 },
177 #[cfg(feature = "vector")]
179 VRem { key: String, element: String },
180 #[cfg(feature = "protobuf")]
182 ProtoSet {
183 key: String,
184 type_name: String,
185 data: Bytes,
186 expire_ms: i64,
187 },
188 #[cfg(feature = "protobuf")]
190 ProtoRegister { name: String, descriptor: Bytes },
191}
192
193impl AofRecord {
194 fn tag(&self) -> u8 {
196 match self {
197 AofRecord::Set { .. } => TAG_SET,
198 AofRecord::Del { .. } => TAG_DEL,
199 AofRecord::Expire { .. } => TAG_EXPIRE,
200 AofRecord::LPush { .. } => TAG_LPUSH,
201 AofRecord::RPush { .. } => TAG_RPUSH,
202 AofRecord::LPop { .. } => TAG_LPOP,
203 AofRecord::RPop { .. } => TAG_RPOP,
204 AofRecord::ZAdd { .. } => TAG_ZADD,
205 AofRecord::ZRem { .. } => TAG_ZREM,
206 AofRecord::Persist { .. } => TAG_PERSIST,
207 AofRecord::Pexpire { .. } => TAG_PEXPIRE,
208 AofRecord::Incr { .. } => TAG_INCR,
209 AofRecord::Decr { .. } => TAG_DECR,
210 AofRecord::HSet { .. } => TAG_HSET,
211 AofRecord::HDel { .. } => TAG_HDEL,
212 AofRecord::HIncrBy { .. } => TAG_HINCRBY,
213 AofRecord::SAdd { .. } => TAG_SADD,
214 AofRecord::SRem { .. } => TAG_SREM,
215 AofRecord::IncrBy { .. } => TAG_INCRBY,
216 AofRecord::DecrBy { .. } => TAG_DECRBY,
217 AofRecord::Append { .. } => TAG_APPEND,
218 AofRecord::Rename { .. } => TAG_RENAME,
219 #[cfg(feature = "vector")]
220 AofRecord::VAdd { .. } => TAG_VADD,
221 #[cfg(feature = "vector")]
222 AofRecord::VRem { .. } => TAG_VREM,
223 #[cfg(feature = "protobuf")]
224 AofRecord::ProtoSet { .. } => TAG_PROTO_SET,
225 #[cfg(feature = "protobuf")]
226 AofRecord::ProtoRegister { .. } => TAG_PROTO_REGISTER,
227 }
228 }
229
230 fn estimated_size(&self) -> usize {
236 const LEN_PREFIX: usize = 4;
238
239 match self {
240 AofRecord::Set {
241 key,
242 value,
243 expire_ms: _,
244 } => 1 + LEN_PREFIX + key.len() + LEN_PREFIX + value.len() + 8,
245 AofRecord::Del { key }
246 | AofRecord::LPop { key }
247 | AofRecord::RPop { key }
248 | AofRecord::Persist { key }
249 | AofRecord::Incr { key }
250 | AofRecord::Decr { key } => 1 + LEN_PREFIX + key.len(),
251 AofRecord::Expire { key, .. } | AofRecord::Pexpire { key, .. } => {
252 1 + LEN_PREFIX + key.len() + 8
253 }
254 AofRecord::LPush { key, values } | AofRecord::RPush { key, values } => {
255 let values_size: usize = values.iter().map(|v| LEN_PREFIX + v.len()).sum();
256 1 + LEN_PREFIX + key.len() + 4 + values_size
257 }
258 AofRecord::ZAdd { key, members } => {
259 let members_size: usize =
260 members.iter().map(|(_, m)| 8 + LEN_PREFIX + m.len()).sum();
261 1 + LEN_PREFIX + key.len() + 4 + members_size
262 }
263 AofRecord::ZRem { key, members }
264 | AofRecord::SAdd { key, members }
265 | AofRecord::SRem { key, members } => {
266 let members_size: usize = members.iter().map(|m| LEN_PREFIX + m.len()).sum();
267 1 + LEN_PREFIX + key.len() + 4 + members_size
268 }
269 AofRecord::HSet { key, fields } => {
270 let fields_size: usize = fields
271 .iter()
272 .map(|(f, v)| LEN_PREFIX + f.len() + LEN_PREFIX + v.len())
273 .sum();
274 1 + LEN_PREFIX + key.len() + 4 + fields_size
275 }
276 AofRecord::HDel { key, fields } => {
277 let fields_size: usize = fields.iter().map(|f| LEN_PREFIX + f.len()).sum();
278 1 + LEN_PREFIX + key.len() + 4 + fields_size
279 }
280 AofRecord::HIncrBy { key, field, .. } => {
281 1 + LEN_PREFIX + key.len() + LEN_PREFIX + field.len() + 8
282 }
283 AofRecord::IncrBy { key, .. } | AofRecord::DecrBy { key, .. } => {
284 1 + LEN_PREFIX + key.len() + 8
285 }
286 AofRecord::Append { key, value } => {
287 1 + LEN_PREFIX + key.len() + LEN_PREFIX + value.len()
288 }
289 AofRecord::Rename { key, newkey } => {
290 1 + LEN_PREFIX + key.len() + LEN_PREFIX + newkey.len()
291 }
292 #[cfg(feature = "vector")]
293 AofRecord::VAdd {
294 key,
295 element,
296 vector,
297 ..
298 } => {
299 1 + LEN_PREFIX + key.len() + LEN_PREFIX + element.len() + 4 + vector.len() * 4 + 10
300 }
301 #[cfg(feature = "vector")]
302 AofRecord::VRem { key, element } => {
303 1 + LEN_PREFIX + key.len() + LEN_PREFIX + element.len()
304 }
305 #[cfg(feature = "protobuf")]
306 AofRecord::ProtoSet {
307 key,
308 type_name,
309 data,
310 ..
311 } => {
312 1 + LEN_PREFIX
313 + key.len()
314 + LEN_PREFIX
315 + type_name.len()
316 + LEN_PREFIX
317 + data.len()
318 + 8
319 }
320 #[cfg(feature = "protobuf")]
321 AofRecord::ProtoRegister { name, descriptor } => {
322 1 + LEN_PREFIX + name.len() + LEN_PREFIX + descriptor.len()
323 }
324 }
325 }
326
327 fn to_bytes(&self) -> Result<Vec<u8>, FormatError> {
329 let mut buf = Vec::with_capacity(self.estimated_size());
330 format::write_u8(&mut buf, self.tag())?;
331
332 match self {
333 AofRecord::Del { key }
335 | AofRecord::LPop { key }
336 | AofRecord::RPop { key }
337 | AofRecord::Persist { key }
338 | AofRecord::Incr { key }
339 | AofRecord::Decr { key } => {
340 format::write_bytes(&mut buf, key.as_bytes())?;
341 }
342
343 AofRecord::Set {
345 key,
346 value,
347 expire_ms,
348 } => {
349 format::write_bytes(&mut buf, key.as_bytes())?;
350 format::write_bytes(&mut buf, value)?;
351 format::write_i64(&mut buf, *expire_ms)?;
352 }
353
354 AofRecord::Expire { key, seconds } => {
356 format::write_bytes(&mut buf, key.as_bytes())?;
357 format::write_i64(&mut buf, *seconds as i64)?;
358 }
359 AofRecord::Pexpire { key, milliseconds } => {
360 format::write_bytes(&mut buf, key.as_bytes())?;
361 format::write_i64(&mut buf, *milliseconds as i64)?;
362 }
363 AofRecord::IncrBy { key, delta } | AofRecord::DecrBy { key, delta } => {
364 format::write_bytes(&mut buf, key.as_bytes())?;
365 format::write_i64(&mut buf, *delta)?;
366 }
367
368 AofRecord::LPush { key, values } | AofRecord::RPush { key, values } => {
370 format::write_bytes(&mut buf, key.as_bytes())?;
371 format::write_len(&mut buf, values.len())?;
372 for v in values {
373 format::write_bytes(&mut buf, v)?;
374 }
375 }
376
377 AofRecord::ZRem { key, members }
379 | AofRecord::SAdd { key, members }
380 | AofRecord::SRem { key, members } => {
381 format::write_bytes(&mut buf, key.as_bytes())?;
382 format::write_len(&mut buf, members.len())?;
383 for member in members {
384 format::write_bytes(&mut buf, member.as_bytes())?;
385 }
386 }
387 AofRecord::HDel { key, fields } => {
388 format::write_bytes(&mut buf, key.as_bytes())?;
389 format::write_len(&mut buf, fields.len())?;
390 for field in fields {
391 format::write_bytes(&mut buf, field.as_bytes())?;
392 }
393 }
394
395 AofRecord::ZAdd { key, members } => {
397 format::write_bytes(&mut buf, key.as_bytes())?;
398 format::write_len(&mut buf, members.len())?;
399 for (score, member) in members {
400 format::write_f64(&mut buf, *score)?;
401 format::write_bytes(&mut buf, member.as_bytes())?;
402 }
403 }
404
405 AofRecord::HSet { key, fields } => {
407 format::write_bytes(&mut buf, key.as_bytes())?;
408 format::write_len(&mut buf, fields.len())?;
409 for (field, value) in fields {
410 format::write_bytes(&mut buf, field.as_bytes())?;
411 format::write_bytes(&mut buf, value)?;
412 }
413 }
414
415 AofRecord::HIncrBy { key, field, delta } => {
417 format::write_bytes(&mut buf, key.as_bytes())?;
418 format::write_bytes(&mut buf, field.as_bytes())?;
419 format::write_i64(&mut buf, *delta)?;
420 }
421
422 AofRecord::Append { key, value } => {
424 format::write_bytes(&mut buf, key.as_bytes())?;
425 format::write_bytes(&mut buf, value)?;
426 }
427
428 AofRecord::Rename { key, newkey } => {
430 format::write_bytes(&mut buf, key.as_bytes())?;
431 format::write_bytes(&mut buf, newkey.as_bytes())?;
432 }
433
434 #[cfg(feature = "vector")]
435 AofRecord::VAdd {
436 key,
437 element,
438 vector,
439 metric,
440 quantization,
441 connectivity,
442 expansion_add,
443 } => {
444 format::write_bytes(&mut buf, key.as_bytes())?;
445 format::write_bytes(&mut buf, element.as_bytes())?;
446 format::write_len(&mut buf, vector.len())?;
447 for &v in vector {
448 format::write_f32(&mut buf, v)?;
449 }
450 format::write_u8(&mut buf, *metric)?;
451 format::write_u8(&mut buf, *quantization)?;
452 format::write_u32(&mut buf, *connectivity)?;
453 format::write_u32(&mut buf, *expansion_add)?;
454 }
455 #[cfg(feature = "vector")]
456 AofRecord::VRem { key, element } => {
457 format::write_bytes(&mut buf, key.as_bytes())?;
458 format::write_bytes(&mut buf, element.as_bytes())?;
459 }
460
461 #[cfg(feature = "protobuf")]
462 AofRecord::ProtoSet {
463 key,
464 type_name,
465 data,
466 expire_ms,
467 } => {
468 format::write_bytes(&mut buf, key.as_bytes())?;
469 format::write_bytes(&mut buf, type_name.as_bytes())?;
470 format::write_bytes(&mut buf, data)?;
471 format::write_i64(&mut buf, *expire_ms)?;
472 }
473 #[cfg(feature = "protobuf")]
474 AofRecord::ProtoRegister { name, descriptor } => {
475 format::write_bytes(&mut buf, name.as_bytes())?;
476 format::write_bytes(&mut buf, descriptor)?;
477 }
478 }
479 Ok(buf)
480 }
481
482 fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
484 let mut cursor = io::Cursor::new(data);
485 let tag = format::read_u8(&mut cursor)?;
486 match tag {
487 TAG_SET => {
488 let key = read_string(&mut cursor, "key")?;
489 let value = format::read_bytes(&mut cursor)?;
490 let expire_ms = format::read_i64(&mut cursor)?;
491 Ok(AofRecord::Set {
492 key,
493 value: Bytes::from(value),
494 expire_ms,
495 })
496 }
497 TAG_DEL => {
498 let key = read_string(&mut cursor, "key")?;
499 Ok(AofRecord::Del { key })
500 }
501 TAG_EXPIRE => {
502 let key = read_string(&mut cursor, "key")?;
503 let seconds = format::read_i64(&mut cursor)? as u64;
504 Ok(AofRecord::Expire { key, seconds })
505 }
506 TAG_LPUSH | TAG_RPUSH => {
507 let key = read_string(&mut cursor, "key")?;
508 let count = format::read_u32(&mut cursor)?;
509 format::validate_collection_count(count, "list")?;
510 let mut values = Vec::with_capacity(format::capped_capacity(count));
511 for _ in 0..count {
512 values.push(Bytes::from(format::read_bytes(&mut cursor)?));
513 }
514 if tag == TAG_LPUSH {
515 Ok(AofRecord::LPush { key, values })
516 } else {
517 Ok(AofRecord::RPush { key, values })
518 }
519 }
520 TAG_LPOP => {
521 let key = read_string(&mut cursor, "key")?;
522 Ok(AofRecord::LPop { key })
523 }
524 TAG_RPOP => {
525 let key = read_string(&mut cursor, "key")?;
526 Ok(AofRecord::RPop { key })
527 }
528 TAG_ZADD => {
529 let key = read_string(&mut cursor, "key")?;
530 let count = format::read_u32(&mut cursor)?;
531 format::validate_collection_count(count, "sorted set")?;
532 let mut members = Vec::with_capacity(format::capped_capacity(count));
533 for _ in 0..count {
534 let score = format::read_f64(&mut cursor)?;
535 let member = read_string(&mut cursor, "member")?;
536 members.push((score, member));
537 }
538 Ok(AofRecord::ZAdd { key, members })
539 }
540 TAG_ZREM => {
541 let key = read_string(&mut cursor, "key")?;
542 let members = read_string_list(&mut cursor, "member")?;
543 Ok(AofRecord::ZRem { key, members })
544 }
545 TAG_PERSIST => {
546 let key = read_string(&mut cursor, "key")?;
547 Ok(AofRecord::Persist { key })
548 }
549 TAG_PEXPIRE => {
550 let key = read_string(&mut cursor, "key")?;
551 let milliseconds = format::read_i64(&mut cursor)? as u64;
552 Ok(AofRecord::Pexpire { key, milliseconds })
553 }
554 TAG_INCR => {
555 let key = read_string(&mut cursor, "key")?;
556 Ok(AofRecord::Incr { key })
557 }
558 TAG_DECR => {
559 let key = read_string(&mut cursor, "key")?;
560 Ok(AofRecord::Decr { key })
561 }
562 TAG_HSET => {
563 let key = read_string(&mut cursor, "key")?;
564 let count = format::read_u32(&mut cursor)?;
565 format::validate_collection_count(count, "hash")?;
566 let mut fields = Vec::with_capacity(format::capped_capacity(count));
567 for _ in 0..count {
568 let field = read_string(&mut cursor, "field")?;
569 let value = Bytes::from(format::read_bytes(&mut cursor)?);
570 fields.push((field, value));
571 }
572 Ok(AofRecord::HSet { key, fields })
573 }
574 TAG_HDEL => {
575 let key = read_string(&mut cursor, "key")?;
576 let fields = read_string_list(&mut cursor, "field")?;
577 Ok(AofRecord::HDel { key, fields })
578 }
579 TAG_HINCRBY => {
580 let key = read_string(&mut cursor, "key")?;
581 let field = read_string(&mut cursor, "field")?;
582 let delta = format::read_i64(&mut cursor)?;
583 Ok(AofRecord::HIncrBy { key, field, delta })
584 }
585 TAG_SADD => {
586 let key = read_string(&mut cursor, "key")?;
587 let members = read_string_list(&mut cursor, "member")?;
588 Ok(AofRecord::SAdd { key, members })
589 }
590 TAG_SREM => {
591 let key = read_string(&mut cursor, "key")?;
592 let members = read_string_list(&mut cursor, "member")?;
593 Ok(AofRecord::SRem { key, members })
594 }
595 TAG_INCRBY => {
596 let key = read_string(&mut cursor, "key")?;
597 let delta = format::read_i64(&mut cursor)?;
598 Ok(AofRecord::IncrBy { key, delta })
599 }
600 TAG_DECRBY => {
601 let key = read_string(&mut cursor, "key")?;
602 let delta = format::read_i64(&mut cursor)?;
603 Ok(AofRecord::DecrBy { key, delta })
604 }
605 TAG_APPEND => {
606 let key = read_string(&mut cursor, "key")?;
607 let value = Bytes::from(format::read_bytes(&mut cursor)?);
608 Ok(AofRecord::Append { key, value })
609 }
610 TAG_RENAME => {
611 let key = read_string(&mut cursor, "key")?;
612 let newkey = read_string(&mut cursor, "newkey")?;
613 Ok(AofRecord::Rename { key, newkey })
614 }
615 #[cfg(feature = "vector")]
616 TAG_VADD => {
617 let key = read_string(&mut cursor, "key")?;
618 let element = read_string(&mut cursor, "element")?;
619 let dim = format::read_u32(&mut cursor)?;
620 if dim > format::MAX_PERSISTED_VECTOR_DIMS {
621 return Err(FormatError::InvalidData(format!(
622 "AOF VADD dimension {dim} exceeds max {}",
623 format::MAX_PERSISTED_VECTOR_DIMS
624 )));
625 }
626 let mut vector = Vec::with_capacity(dim as usize);
627 for _ in 0..dim {
628 vector.push(format::read_f32(&mut cursor)?);
629 }
630 let metric = format::read_u8(&mut cursor)?;
631 let quantization = format::read_u8(&mut cursor)?;
632 let connectivity = format::read_u32(&mut cursor)?;
633 let expansion_add = format::read_u32(&mut cursor)?;
634 Ok(AofRecord::VAdd {
635 key,
636 element,
637 vector,
638 metric,
639 quantization,
640 connectivity,
641 expansion_add,
642 })
643 }
644 #[cfg(feature = "vector")]
645 TAG_VREM => {
646 let key = read_string(&mut cursor, "key")?;
647 let element = read_string(&mut cursor, "element")?;
648 Ok(AofRecord::VRem { key, element })
649 }
650 #[cfg(feature = "protobuf")]
651 TAG_PROTO_SET => {
652 let key = read_string(&mut cursor, "key")?;
653 let type_name = read_string(&mut cursor, "type_name")?;
654 let data = format::read_bytes(&mut cursor)?;
655 let expire_ms = format::read_i64(&mut cursor)?;
656 Ok(AofRecord::ProtoSet {
657 key,
658 type_name,
659 data: Bytes::from(data),
660 expire_ms,
661 })
662 }
663 #[cfg(feature = "protobuf")]
664 TAG_PROTO_REGISTER => {
665 let name = read_string(&mut cursor, "name")?;
666 let descriptor = format::read_bytes(&mut cursor)?;
667 Ok(AofRecord::ProtoRegister {
668 name,
669 descriptor: Bytes::from(descriptor),
670 })
671 }
672 _ => Err(FormatError::UnknownTag(tag)),
673 }
674 }
675}
676
677#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
679pub enum FsyncPolicy {
680 Always,
682 #[default]
684 EverySec,
685 No,
687}
688
689pub struct AofWriter {
691 writer: BufWriter<File>,
692 path: PathBuf,
693 #[cfg(feature = "encryption")]
694 encryption_key: Option<crate::encryption::EncryptionKey>,
695}
696
697impl AofWriter {
698 pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
701 let path = path.into();
702 let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
703
704 let file = open_persistence_file(&path)?;
705 let mut writer = BufWriter::new(file);
706
707 if !exists {
708 format::write_header(&mut writer, format::AOF_MAGIC)?;
709 writer.flush()?;
710 }
711
712 Ok(Self {
713 writer,
714 path,
715 #[cfg(feature = "encryption")]
716 encryption_key: None,
717 })
718 }
719
720 #[cfg(feature = "encryption")]
726 pub fn open_encrypted(
727 path: impl Into<PathBuf>,
728 key: crate::encryption::EncryptionKey,
729 ) -> Result<Self, FormatError> {
730 let path = path.into();
731 let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
732
733 let file = open_persistence_file(&path)?;
734 let mut writer = BufWriter::new(file);
735
736 if !exists {
737 format::write_header_versioned(
738 &mut writer,
739 format::AOF_MAGIC,
740 format::FORMAT_VERSION_ENCRYPTED,
741 )?;
742 writer.flush()?;
743 }
744
745 Ok(Self {
746 writer,
747 path,
748 encryption_key: Some(key),
749 })
750 }
751
752 pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
757 let payload = record.to_bytes()?;
758
759 #[cfg(feature = "encryption")]
760 if let Some(ref key) = self.encryption_key {
761 let (nonce, ciphertext) = crate::encryption::encrypt_record(key, &payload)?;
762 self.writer.write_all(&nonce)?;
763 format::write_len(&mut self.writer, ciphertext.len())?;
764 self.writer.write_all(&ciphertext)?;
765 return Ok(());
766 }
767
768 let checksum = format::crc32(&payload);
769 self.writer.write_all(&payload)?;
770 format::write_u32(&mut self.writer, checksum)?;
771 Ok(())
772 }
773
774 pub fn flush(&mut self) -> Result<(), FormatError> {
776 self.writer.flush()?;
777 Ok(())
778 }
779
780 pub fn sync(&mut self) -> Result<(), FormatError> {
782 self.writer.flush()?;
783 self.writer.get_ref().sync_all()?;
784 Ok(())
785 }
786
787 pub fn path(&self) -> &Path {
789 &self.path
790 }
791
792 pub fn truncate(&mut self) -> Result<(), FormatError> {
798 self.writer.flush()?;
800
801 let tmp_path = self.path.with_extension("aof.tmp");
803 let mut opts = OpenOptions::new();
804 opts.create(true).write(true).truncate(true);
805 #[cfg(unix)]
806 {
807 use std::os::unix::fs::OpenOptionsExt;
808 opts.mode(0o600);
809 }
810 let tmp_file = opts.open(&tmp_path)?;
811 let mut tmp_writer = BufWriter::new(tmp_file);
812
813 #[cfg(feature = "encryption")]
814 if self.encryption_key.is_some() {
815 format::write_header_versioned(
816 &mut tmp_writer,
817 format::AOF_MAGIC,
818 format::FORMAT_VERSION_ENCRYPTED,
819 )?;
820 } else {
821 format::write_header(&mut tmp_writer, format::AOF_MAGIC)?;
822 }
823 #[cfg(not(feature = "encryption"))]
824 format::write_header(&mut tmp_writer, format::AOF_MAGIC)?;
825
826 tmp_writer.flush()?;
827 tmp_writer.get_ref().sync_all()?;
828
829 std::fs::rename(&tmp_path, &self.path)?;
831
832 let file = OpenOptions::new().append(true).open(&self.path)?;
834 self.writer = BufWriter::new(file);
835 Ok(())
836 }
837}
838
839pub struct AofReader {
841 reader: BufReader<File>,
842 version: u8,
844 #[cfg(feature = "encryption")]
845 encryption_key: Option<crate::encryption::EncryptionKey>,
846}
847
848impl fmt::Debug for AofReader {
849 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
850 f.debug_struct("AofReader")
851 .field("version", &self.version)
852 .finish()
853 }
854}
855
856impl AofReader {
857 pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
859 let file = File::open(path.as_ref())?;
860 let mut reader = BufReader::new(file);
861 let version = format::read_header(&mut reader, format::AOF_MAGIC)?;
862
863 if version == format::FORMAT_VERSION_ENCRYPTED {
864 return Err(FormatError::EncryptionRequired);
865 }
866
867 Ok(Self {
868 reader,
869 version,
870 #[cfg(feature = "encryption")]
871 encryption_key: None,
872 })
873 }
874
875 #[cfg(feature = "encryption")]
880 pub fn open_encrypted(
881 path: impl AsRef<Path>,
882 key: crate::encryption::EncryptionKey,
883 ) -> Result<Self, FormatError> {
884 let file = File::open(path.as_ref())?;
885 let mut reader = BufReader::new(file);
886 let version = format::read_header(&mut reader, format::AOF_MAGIC)?;
887
888 Ok(Self {
889 reader,
890 version,
891 encryption_key: Some(key),
892 })
893 }
894
895 pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
901 #[cfg(feature = "encryption")]
902 if self.version == format::FORMAT_VERSION_ENCRYPTED {
903 return self.read_encrypted_record();
904 }
905
906 self.read_v2_record()
907 }
908
909 fn read_v2_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
911 let tag = match format::read_u8(&mut self.reader) {
913 Ok(t) => t,
914 Err(FormatError::UnexpectedEof) => return Ok(None),
915 Err(e) => return Err(e),
916 };
917
918 let record_result = self.read_payload_for_tag(tag);
921 match record_result {
922 Ok((payload, stored_crc)) => {
923 let mut full = Vec::with_capacity(1 + payload.len());
925 full.push(tag);
926 full.extend_from_slice(&payload);
927 format::verify_crc32(&full, stored_crc)?;
928 AofRecord::from_bytes(&full).map(Some)
929 }
930 Err(FormatError::UnexpectedEof) => Ok(None),
932 Err(e) => Err(e),
933 }
934 }
935
936 #[cfg(feature = "encryption")]
938 fn read_encrypted_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
939 let key = self
940 .encryption_key
941 .as_ref()
942 .ok_or(FormatError::EncryptionRequired)?;
943
944 let mut nonce = [0u8; crate::encryption::NONCE_SIZE];
946 if let Err(e) = self.reader.read_exact(&mut nonce) {
947 return if e.kind() == io::ErrorKind::UnexpectedEof {
948 Ok(None)
949 } else {
950 Err(FormatError::Io(e))
951 };
952 }
953
954 let ct_len = match format::read_u32(&mut self.reader) {
956 Ok(n) => n as usize,
957 Err(FormatError::UnexpectedEof) => return Ok(None),
958 Err(e) => return Err(e),
959 };
960
961 if ct_len > format::MAX_FIELD_LEN {
962 return Err(FormatError::Io(io::Error::new(
963 io::ErrorKind::InvalidData,
964 format!("encrypted record length {ct_len} exceeds maximum"),
965 )));
966 }
967
968 let mut ciphertext = vec![0u8; ct_len];
969 if let Err(e) = self.reader.read_exact(&mut ciphertext) {
970 return if e.kind() == io::ErrorKind::UnexpectedEof {
971 Ok(None)
972 } else {
973 Err(FormatError::Io(e))
974 };
975 }
976
977 let plaintext = crate::encryption::decrypt_record(key, &nonce, &ciphertext)?;
978 AofRecord::from_bytes(&plaintext).map(Some)
979 }
980
981 fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
983 let mut payload = Vec::new();
984 match tag {
985 TAG_SET => {
986 let key = format::read_bytes(&mut self.reader)?;
988 format::write_bytes(&mut payload, &key)?;
989 let value = format::read_bytes(&mut self.reader)?;
990 format::write_bytes(&mut payload, &value)?;
991 let expire_ms = format::read_i64(&mut self.reader)?;
992 format::write_i64(&mut payload, expire_ms)?;
993 }
994 TAG_DEL => {
995 let key = format::read_bytes(&mut self.reader)?;
996 format::write_bytes(&mut payload, &key)?;
997 }
998 TAG_EXPIRE => {
999 let key = format::read_bytes(&mut self.reader)?;
1000 format::write_bytes(&mut payload, &key)?;
1001 let seconds = format::read_i64(&mut self.reader)?;
1002 format::write_i64(&mut payload, seconds)?;
1003 }
1004 TAG_LPUSH | TAG_RPUSH => {
1005 let key = format::read_bytes(&mut self.reader)?;
1006 format::write_bytes(&mut payload, &key)?;
1007 let count = format::read_u32(&mut self.reader)?;
1008 format::validate_collection_count(count, "list")?;
1009 format::write_u32(&mut payload, count)?;
1010 for _ in 0..count {
1011 let val = format::read_bytes(&mut self.reader)?;
1012 format::write_bytes(&mut payload, &val)?;
1013 }
1014 }
1015 TAG_LPOP | TAG_RPOP => {
1016 let key = format::read_bytes(&mut self.reader)?;
1017 format::write_bytes(&mut payload, &key)?;
1018 }
1019 TAG_ZADD => {
1020 let key = format::read_bytes(&mut self.reader)?;
1021 format::write_bytes(&mut payload, &key)?;
1022 let count = format::read_u32(&mut self.reader)?;
1023 format::validate_collection_count(count, "sorted set")?;
1024 format::write_u32(&mut payload, count)?;
1025 for _ in 0..count {
1026 let score = format::read_f64(&mut self.reader)?;
1027 format::write_f64(&mut payload, score)?;
1028 let member = format::read_bytes(&mut self.reader)?;
1029 format::write_bytes(&mut payload, &member)?;
1030 }
1031 }
1032 TAG_ZREM => {
1033 let key = format::read_bytes(&mut self.reader)?;
1034 format::write_bytes(&mut payload, &key)?;
1035 let count = format::read_u32(&mut self.reader)?;
1036 format::validate_collection_count(count, "sorted set")?;
1037 format::write_u32(&mut payload, count)?;
1038 for _ in 0..count {
1039 let member = format::read_bytes(&mut self.reader)?;
1040 format::write_bytes(&mut payload, &member)?;
1041 }
1042 }
1043 TAG_PERSIST => {
1044 let key = format::read_bytes(&mut self.reader)?;
1045 format::write_bytes(&mut payload, &key)?;
1046 }
1047 TAG_PEXPIRE => {
1048 let key = format::read_bytes(&mut self.reader)?;
1049 format::write_bytes(&mut payload, &key)?;
1050 let millis = format::read_i64(&mut self.reader)?;
1051 format::write_i64(&mut payload, millis)?;
1052 }
1053 TAG_INCR | TAG_DECR => {
1054 let key = format::read_bytes(&mut self.reader)?;
1055 format::write_bytes(&mut payload, &key)?;
1056 }
1057 TAG_HSET => {
1058 let key = format::read_bytes(&mut self.reader)?;
1059 format::write_bytes(&mut payload, &key)?;
1060 let count = format::read_u32(&mut self.reader)?;
1061 format::validate_collection_count(count, "hash")?;
1062 format::write_u32(&mut payload, count)?;
1063 for _ in 0..count {
1064 let field = format::read_bytes(&mut self.reader)?;
1065 format::write_bytes(&mut payload, &field)?;
1066 let value = format::read_bytes(&mut self.reader)?;
1067 format::write_bytes(&mut payload, &value)?;
1068 }
1069 }
1070 TAG_HDEL | TAG_SADD | TAG_SREM => {
1071 let key = format::read_bytes(&mut self.reader)?;
1072 format::write_bytes(&mut payload, &key)?;
1073 let count = format::read_u32(&mut self.reader)?;
1074 format::validate_collection_count(count, "set")?;
1075 format::write_u32(&mut payload, count)?;
1076 for _ in 0..count {
1077 let item = format::read_bytes(&mut self.reader)?;
1078 format::write_bytes(&mut payload, &item)?;
1079 }
1080 }
1081 TAG_HINCRBY => {
1082 let key = format::read_bytes(&mut self.reader)?;
1083 format::write_bytes(&mut payload, &key)?;
1084 let field = format::read_bytes(&mut self.reader)?;
1085 format::write_bytes(&mut payload, &field)?;
1086 let delta = format::read_i64(&mut self.reader)?;
1087 format::write_i64(&mut payload, delta)?;
1088 }
1089 TAG_INCRBY | TAG_DECRBY => {
1090 let key = format::read_bytes(&mut self.reader)?;
1091 format::write_bytes(&mut payload, &key)?;
1092 let delta = format::read_i64(&mut self.reader)?;
1093 format::write_i64(&mut payload, delta)?;
1094 }
1095 TAG_APPEND => {
1096 let key = format::read_bytes(&mut self.reader)?;
1097 format::write_bytes(&mut payload, &key)?;
1098 let value = format::read_bytes(&mut self.reader)?;
1099 format::write_bytes(&mut payload, &value)?;
1100 }
1101 TAG_RENAME => {
1102 let key = format::read_bytes(&mut self.reader)?;
1103 format::write_bytes(&mut payload, &key)?;
1104 let newkey = format::read_bytes(&mut self.reader)?;
1105 format::write_bytes(&mut payload, &newkey)?;
1106 }
1107 #[cfg(feature = "vector")]
1108 TAG_VADD => {
1109 let key = format::read_bytes(&mut self.reader)?;
1110 format::write_bytes(&mut payload, &key)?;
1111 let element = format::read_bytes(&mut self.reader)?;
1112 format::write_bytes(&mut payload, &element)?;
1113 let dim = format::read_u32(&mut self.reader)?;
1114 if dim > format::MAX_PERSISTED_VECTOR_DIMS {
1115 return Err(FormatError::InvalidData(format!(
1116 "AOF VADD dimension {dim} exceeds max {}",
1117 format::MAX_PERSISTED_VECTOR_DIMS
1118 )));
1119 }
1120 format::write_u32(&mut payload, dim)?;
1121 for _ in 0..dim {
1122 let v = format::read_f32(&mut self.reader)?;
1123 format::write_f32(&mut payload, v)?;
1124 }
1125 let metric = format::read_u8(&mut self.reader)?;
1126 format::write_u8(&mut payload, metric)?;
1127 let quantization = format::read_u8(&mut self.reader)?;
1128 format::write_u8(&mut payload, quantization)?;
1129 let connectivity = format::read_u32(&mut self.reader)?;
1130 format::write_u32(&mut payload, connectivity)?;
1131 let expansion_add = format::read_u32(&mut self.reader)?;
1132 format::write_u32(&mut payload, expansion_add)?;
1133 }
1134 #[cfg(feature = "vector")]
1135 TAG_VREM => {
1136 let key = format::read_bytes(&mut self.reader)?;
1137 format::write_bytes(&mut payload, &key)?;
1138 let element = format::read_bytes(&mut self.reader)?;
1139 format::write_bytes(&mut payload, &element)?;
1140 }
1141 #[cfg(feature = "protobuf")]
1142 TAG_PROTO_SET => {
1143 let key = format::read_bytes(&mut self.reader)?;
1144 format::write_bytes(&mut payload, &key)?;
1145 let type_name = format::read_bytes(&mut self.reader)?;
1146 format::write_bytes(&mut payload, &type_name)?;
1147 let data = format::read_bytes(&mut self.reader)?;
1148 format::write_bytes(&mut payload, &data)?;
1149 let expire_ms = format::read_i64(&mut self.reader)?;
1150 format::write_i64(&mut payload, expire_ms)?;
1151 }
1152 #[cfg(feature = "protobuf")]
1153 TAG_PROTO_REGISTER => {
1154 let name = format::read_bytes(&mut self.reader)?;
1155 format::write_bytes(&mut payload, &name)?;
1156 let descriptor = format::read_bytes(&mut self.reader)?;
1157 format::write_bytes(&mut payload, &descriptor)?;
1158 }
1159 _ => return Err(FormatError::UnknownTag(tag)),
1160 }
1161 let stored_crc = format::read_u32(&mut self.reader)?;
1162 Ok((payload, stored_crc))
1163 }
1164}
1165
1166fn open_persistence_file(path: &Path) -> Result<File, FormatError> {
1168 let mut opts = OpenOptions::new();
1169 opts.create(true).append(true);
1170 #[cfg(unix)]
1171 {
1172 use std::os::unix::fs::OpenOptionsExt;
1173 opts.mode(0o600);
1174 }
1175 Ok(opts.open(path)?)
1176}
1177
1178pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
1180 data_dir.join(format!("shard-{shard_id}.aof"))
1181}
1182
1183#[cfg(test)]
1184mod tests {
1185 use super::*;
1186
1187 type Result = std::result::Result<(), Box<dyn std::error::Error>>;
1188
1189 fn temp_dir() -> tempfile::TempDir {
1190 tempfile::tempdir().expect("create temp dir")
1191 }
1192
1193 #[test]
1194 fn record_round_trip_set() -> Result {
1195 let rec = AofRecord::Set {
1196 key: "hello".into(),
1197 value: Bytes::from("world"),
1198 expire_ms: 5000,
1199 };
1200 let bytes = rec.to_bytes()?;
1201 let decoded = AofRecord::from_bytes(&bytes)?;
1202 assert_eq!(rec, decoded);
1203 Ok(())
1204 }
1205
1206 #[test]
1207 fn record_round_trip_del() -> Result {
1208 let rec = AofRecord::Del { key: "gone".into() };
1209 let bytes = rec.to_bytes()?;
1210 let decoded = AofRecord::from_bytes(&bytes)?;
1211 assert_eq!(rec, decoded);
1212 Ok(())
1213 }
1214
1215 #[test]
1216 fn record_round_trip_expire() -> Result {
1217 let rec = AofRecord::Expire {
1218 key: "ttl".into(),
1219 seconds: 300,
1220 };
1221 let bytes = rec.to_bytes()?;
1222 let decoded = AofRecord::from_bytes(&bytes)?;
1223 assert_eq!(rec, decoded);
1224 Ok(())
1225 }
1226
1227 #[test]
1228 fn set_with_no_expiry() -> Result {
1229 let rec = AofRecord::Set {
1230 key: "k".into(),
1231 value: Bytes::from("v"),
1232 expire_ms: -1,
1233 };
1234 let bytes = rec.to_bytes()?;
1235 let decoded = AofRecord::from_bytes(&bytes)?;
1236 assert_eq!(rec, decoded);
1237 Ok(())
1238 }
1239
1240 #[test]
1241 fn writer_reader_round_trip() -> Result {
1242 let dir = temp_dir();
1243 let path = dir.path().join("test.aof");
1244
1245 let records = vec![
1246 AofRecord::Set {
1247 key: "a".into(),
1248 value: Bytes::from("1"),
1249 expire_ms: -1,
1250 },
1251 AofRecord::Set {
1252 key: "b".into(),
1253 value: Bytes::from("2"),
1254 expire_ms: 10_000,
1255 },
1256 AofRecord::Del { key: "a".into() },
1257 AofRecord::Expire {
1258 key: "b".into(),
1259 seconds: 60,
1260 },
1261 ];
1262
1263 {
1265 let mut writer = AofWriter::open(&path)?;
1266 for rec in &records {
1267 writer.write_record(rec)?;
1268 }
1269 writer.sync()?;
1270 }
1271
1272 let mut reader = AofReader::open(&path)?;
1274 let mut got = Vec::new();
1275 while let Some(rec) = reader.read_record()? {
1276 got.push(rec);
1277 }
1278 assert_eq!(records, got);
1279 Ok(())
1280 }
1281
1282 #[test]
1283 fn empty_aof_returns_no_records() -> Result {
1284 let dir = temp_dir();
1285 let path = dir.path().join("empty.aof");
1286
1287 {
1289 let _writer = AofWriter::open(&path)?;
1290 }
1291
1292 let mut reader = AofReader::open(&path)?;
1293 assert!(reader.read_record()?.is_none());
1294 Ok(())
1295 }
1296
1297 #[test]
1298 fn truncated_record_treated_as_eof() -> Result {
1299 let dir = temp_dir();
1300 let path = dir.path().join("trunc.aof");
1301
1302 {
1304 let mut writer = AofWriter::open(&path)?;
1305 writer.write_record(&AofRecord::Set {
1306 key: "ok".into(),
1307 value: Bytes::from("good"),
1308 expire_ms: -1,
1309 })?;
1310 writer.flush()?;
1311 }
1312
1313 {
1315 let mut file = OpenOptions::new().append(true).open(&path)?;
1316 file.write_all(&[TAG_SET])?;
1317 }
1318
1319 let mut reader = AofReader::open(&path)?;
1320 let rec = reader.read_record()?.unwrap();
1322 assert!(matches!(rec, AofRecord::Set { .. }));
1323 assert!(reader.read_record()?.is_none());
1325 Ok(())
1326 }
1327
1328 #[test]
1329 fn corrupt_crc_detected() -> Result {
1330 let dir = temp_dir();
1331 let path = dir.path().join("corrupt.aof");
1332
1333 {
1334 let mut writer = AofWriter::open(&path)?;
1335 writer.write_record(&AofRecord::Set {
1336 key: "k".into(),
1337 value: Bytes::from("v"),
1338 expire_ms: -1,
1339 })?;
1340 writer.flush()?;
1341 }
1342
1343 let mut data = fs::read(&path)?;
1345 let last = data.len() - 1;
1346 data[last] ^= 0xFF;
1347 fs::write(&path, &data)?;
1348
1349 let mut reader = AofReader::open(&path)?;
1350 let err = reader.read_record().unwrap_err();
1351 assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
1352 Ok(())
1353 }
1354
1355 #[test]
1356 fn missing_magic_is_error() {
1357 let dir = temp_dir();
1358 let path = dir.path().join("bad.aof");
1359 fs::write(&path, b"NOT_AOF_DATA").unwrap();
1360
1361 let err = AofReader::open(&path).unwrap_err();
1362 assert!(matches!(err, FormatError::InvalidMagic));
1363 }
1364
1365 #[test]
1366 fn truncate_resets_aof() -> Result {
1367 let dir = temp_dir();
1368 let path = dir.path().join("reset.aof");
1369
1370 {
1371 let mut writer = AofWriter::open(&path)?;
1372 writer.write_record(&AofRecord::Set {
1373 key: "old".into(),
1374 value: Bytes::from("data"),
1375 expire_ms: -1,
1376 })?;
1377 writer.truncate()?;
1378
1379 writer.write_record(&AofRecord::Set {
1381 key: "new".into(),
1382 value: Bytes::from("fresh"),
1383 expire_ms: -1,
1384 })?;
1385 writer.sync()?;
1386 }
1387
1388 let mut reader = AofReader::open(&path)?;
1389 let rec = reader.read_record()?.unwrap();
1390 match rec {
1391 AofRecord::Set { key, .. } => assert_eq!(key, "new"),
1392 other => panic!("expected Set, got {other:?}"),
1393 }
1394 assert!(reader.read_record()?.is_none());
1396 Ok(())
1397 }
1398
1399 #[test]
1400 fn record_round_trip_lpush() -> Result {
1401 let rec = AofRecord::LPush {
1402 key: "list".into(),
1403 values: vec![Bytes::from("a"), Bytes::from("b")],
1404 };
1405 let bytes = rec.to_bytes()?;
1406 let decoded = AofRecord::from_bytes(&bytes)?;
1407 assert_eq!(rec, decoded);
1408 Ok(())
1409 }
1410
1411 #[test]
1412 fn record_round_trip_rpush() -> Result {
1413 let rec = AofRecord::RPush {
1414 key: "list".into(),
1415 values: vec![Bytes::from("x")],
1416 };
1417 let bytes = rec.to_bytes()?;
1418 let decoded = AofRecord::from_bytes(&bytes)?;
1419 assert_eq!(rec, decoded);
1420 Ok(())
1421 }
1422
1423 #[test]
1424 fn record_round_trip_lpop() -> Result {
1425 let rec = AofRecord::LPop { key: "list".into() };
1426 let bytes = rec.to_bytes()?;
1427 let decoded = AofRecord::from_bytes(&bytes)?;
1428 assert_eq!(rec, decoded);
1429 Ok(())
1430 }
1431
1432 #[test]
1433 fn record_round_trip_rpop() -> Result {
1434 let rec = AofRecord::RPop { key: "list".into() };
1435 let bytes = rec.to_bytes()?;
1436 let decoded = AofRecord::from_bytes(&bytes)?;
1437 assert_eq!(rec, decoded);
1438 Ok(())
1439 }
1440
1441 #[test]
1442 fn writer_reader_round_trip_with_list_records() -> Result {
1443 let dir = temp_dir();
1444 let path = dir.path().join("list.aof");
1445
1446 let records = vec![
1447 AofRecord::LPush {
1448 key: "l".into(),
1449 values: vec![Bytes::from("a"), Bytes::from("b")],
1450 },
1451 AofRecord::RPush {
1452 key: "l".into(),
1453 values: vec![Bytes::from("c")],
1454 },
1455 AofRecord::LPop { key: "l".into() },
1456 AofRecord::RPop { key: "l".into() },
1457 ];
1458
1459 {
1460 let mut writer = AofWriter::open(&path)?;
1461 for rec in &records {
1462 writer.write_record(rec)?;
1463 }
1464 writer.sync()?;
1465 }
1466
1467 let mut reader = AofReader::open(&path)?;
1468 let mut got = Vec::new();
1469 while let Some(rec) = reader.read_record()? {
1470 got.push(rec);
1471 }
1472 assert_eq!(records, got);
1473 Ok(())
1474 }
1475
1476 #[test]
1477 fn record_round_trip_zadd() -> Result {
1478 let rec = AofRecord::ZAdd {
1479 key: "board".into(),
1480 members: vec![(100.0, "alice".into()), (200.5, "bob".into())],
1481 };
1482 let bytes = rec.to_bytes()?;
1483 let decoded = AofRecord::from_bytes(&bytes)?;
1484 assert_eq!(rec, decoded);
1485 Ok(())
1486 }
1487
1488 #[test]
1489 fn record_round_trip_zrem() -> Result {
1490 let rec = AofRecord::ZRem {
1491 key: "board".into(),
1492 members: vec!["alice".into(), "bob".into()],
1493 };
1494 let bytes = rec.to_bytes()?;
1495 let decoded = AofRecord::from_bytes(&bytes)?;
1496 assert_eq!(rec, decoded);
1497 Ok(())
1498 }
1499
1500 #[test]
1501 fn writer_reader_round_trip_with_sorted_set_records() -> Result {
1502 let dir = temp_dir();
1503 let path = dir.path().join("zset.aof");
1504
1505 let records = vec![
1506 AofRecord::ZAdd {
1507 key: "board".into(),
1508 members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
1509 },
1510 AofRecord::ZRem {
1511 key: "board".into(),
1512 members: vec!["alice".into()],
1513 },
1514 ];
1515
1516 {
1517 let mut writer = AofWriter::open(&path)?;
1518 for rec in &records {
1519 writer.write_record(rec)?;
1520 }
1521 writer.sync()?;
1522 }
1523
1524 let mut reader = AofReader::open(&path)?;
1525 let mut got = Vec::new();
1526 while let Some(rec) = reader.read_record()? {
1527 got.push(rec);
1528 }
1529 assert_eq!(records, got);
1530 Ok(())
1531 }
1532
1533 #[test]
1534 fn record_round_trip_persist() -> Result {
1535 let rec = AofRecord::Persist {
1536 key: "mykey".into(),
1537 };
1538 let bytes = rec.to_bytes()?;
1539 let decoded = AofRecord::from_bytes(&bytes)?;
1540 assert_eq!(rec, decoded);
1541 Ok(())
1542 }
1543
1544 #[test]
1545 fn record_round_trip_pexpire() -> Result {
1546 let rec = AofRecord::Pexpire {
1547 key: "mykey".into(),
1548 milliseconds: 5000,
1549 };
1550 let bytes = rec.to_bytes()?;
1551 let decoded = AofRecord::from_bytes(&bytes)?;
1552 assert_eq!(rec, decoded);
1553 Ok(())
1554 }
1555
1556 #[test]
1557 fn record_round_trip_incr() -> Result {
1558 let rec = AofRecord::Incr {
1559 key: "counter".into(),
1560 };
1561 let bytes = rec.to_bytes()?;
1562 let decoded = AofRecord::from_bytes(&bytes)?;
1563 assert_eq!(rec, decoded);
1564 Ok(())
1565 }
1566
1567 #[test]
1568 fn record_round_trip_decr() -> Result {
1569 let rec = AofRecord::Decr {
1570 key: "counter".into(),
1571 };
1572 let bytes = rec.to_bytes()?;
1573 let decoded = AofRecord::from_bytes(&bytes)?;
1574 assert_eq!(rec, decoded);
1575 Ok(())
1576 }
1577
1578 #[test]
1579 fn writer_reader_round_trip_with_persist_pexpire() -> Result {
1580 let dir = temp_dir();
1581 let path = dir.path().join("persist_pexpire.aof");
1582
1583 let records = vec![
1584 AofRecord::Set {
1585 key: "k".into(),
1586 value: Bytes::from("v"),
1587 expire_ms: 5000,
1588 },
1589 AofRecord::Persist { key: "k".into() },
1590 AofRecord::Pexpire {
1591 key: "k".into(),
1592 milliseconds: 3000,
1593 },
1594 ];
1595
1596 {
1597 let mut writer = AofWriter::open(&path)?;
1598 for rec in &records {
1599 writer.write_record(rec)?;
1600 }
1601 writer.sync()?;
1602 }
1603
1604 let mut reader = AofReader::open(&path)?;
1605 let mut got = Vec::new();
1606 while let Some(rec) = reader.read_record()? {
1607 got.push(rec);
1608 }
1609 assert_eq!(records, got);
1610 Ok(())
1611 }
1612
1613 #[test]
1614 fn aof_path_format() {
1615 let p = aof_path(Path::new("/data"), 3);
1616 assert_eq!(p, PathBuf::from("/data/shard-3.aof"));
1617 }
1618
1619 #[test]
1620 fn record_round_trip_hset() -> Result {
1621 let rec = AofRecord::HSet {
1622 key: "hash".into(),
1623 fields: vec![
1624 ("f1".into(), Bytes::from("v1")),
1625 ("f2".into(), Bytes::from("v2")),
1626 ],
1627 };
1628 let bytes = rec.to_bytes()?;
1629 let decoded = AofRecord::from_bytes(&bytes)?;
1630 assert_eq!(rec, decoded);
1631 Ok(())
1632 }
1633
1634 #[test]
1635 fn record_round_trip_hdel() -> Result {
1636 let rec = AofRecord::HDel {
1637 key: "hash".into(),
1638 fields: vec!["f1".into(), "f2".into()],
1639 };
1640 let bytes = rec.to_bytes()?;
1641 let decoded = AofRecord::from_bytes(&bytes)?;
1642 assert_eq!(rec, decoded);
1643 Ok(())
1644 }
1645
1646 #[test]
1647 fn record_round_trip_hincrby() -> Result {
1648 let rec = AofRecord::HIncrBy {
1649 key: "hash".into(),
1650 field: "counter".into(),
1651 delta: -42,
1652 };
1653 let bytes = rec.to_bytes()?;
1654 let decoded = AofRecord::from_bytes(&bytes)?;
1655 assert_eq!(rec, decoded);
1656 Ok(())
1657 }
1658
1659 #[test]
1660 fn record_round_trip_sadd() -> Result {
1661 let rec = AofRecord::SAdd {
1662 key: "set".into(),
1663 members: vec!["m1".into(), "m2".into(), "m3".into()],
1664 };
1665 let bytes = rec.to_bytes()?;
1666 let decoded = AofRecord::from_bytes(&bytes)?;
1667 assert_eq!(rec, decoded);
1668 Ok(())
1669 }
1670
1671 #[test]
1672 fn record_round_trip_srem() -> Result {
1673 let rec = AofRecord::SRem {
1674 key: "set".into(),
1675 members: vec!["m1".into()],
1676 };
1677 let bytes = rec.to_bytes()?;
1678 let decoded = AofRecord::from_bytes(&bytes)?;
1679 assert_eq!(rec, decoded);
1680 Ok(())
1681 }
1682
1683 #[cfg(feature = "vector")]
1684 #[test]
1685 fn record_round_trip_vadd() -> Result {
1686 let rec = AofRecord::VAdd {
1687 key: "embeddings".into(),
1688 element: "doc1".into(),
1689 vector: vec![0.1, 0.2, 0.3],
1690 metric: 0, quantization: 0, connectivity: 16,
1693 expansion_add: 64,
1694 };
1695 let bytes = rec.to_bytes()?;
1696 let decoded = AofRecord::from_bytes(&bytes)?;
1697 assert_eq!(rec, decoded);
1698 Ok(())
1699 }
1700
1701 #[cfg(feature = "vector")]
1702 #[test]
1703 fn record_round_trip_vadd_high_dim() -> Result {
1704 let rec = AofRecord::VAdd {
1705 key: "vecs".into(),
1706 element: "e".into(),
1707 vector: vec![0.0; 1536], metric: 1, quantization: 1, connectivity: 32,
1711 expansion_add: 128,
1712 };
1713 let bytes = rec.to_bytes()?;
1714 let decoded = AofRecord::from_bytes(&bytes)?;
1715 assert_eq!(rec, decoded);
1716 Ok(())
1717 }
1718
1719 #[cfg(feature = "vector")]
1720 #[test]
1721 fn record_round_trip_vrem() -> Result {
1722 let rec = AofRecord::VRem {
1723 key: "embeddings".into(),
1724 element: "doc1".into(),
1725 };
1726 let bytes = rec.to_bytes()?;
1727 let decoded = AofRecord::from_bytes(&bytes)?;
1728 assert_eq!(rec, decoded);
1729 Ok(())
1730 }
1731
1732 #[cfg(feature = "encryption")]
1733 mod encrypted {
1734 use super::*;
1735 use crate::encryption::EncryptionKey;
1736
1737 type Result = std::result::Result<(), Box<dyn std::error::Error>>;
1738
1739 fn test_key() -> EncryptionKey {
1740 EncryptionKey::from_bytes([0x42; 32])
1741 }
1742
1743 #[test]
1744 fn encrypted_writer_reader_round_trip() -> Result {
1745 let dir = temp_dir();
1746 let path = dir.path().join("enc.aof");
1747 let key = test_key();
1748
1749 let records = vec![
1750 AofRecord::Set {
1751 key: "a".into(),
1752 value: Bytes::from("1"),
1753 expire_ms: -1,
1754 },
1755 AofRecord::Del { key: "a".into() },
1756 AofRecord::LPush {
1757 key: "list".into(),
1758 values: vec![Bytes::from("x"), Bytes::from("y")],
1759 },
1760 AofRecord::ZAdd {
1761 key: "zs".into(),
1762 members: vec![(1.0, "m".into())],
1763 },
1764 ];
1765
1766 {
1767 let mut writer = AofWriter::open_encrypted(&path, key.clone())?;
1768 for rec in &records {
1769 writer.write_record(rec)?;
1770 }
1771 writer.sync()?;
1772 }
1773
1774 let mut reader = AofReader::open_encrypted(&path, key)?;
1775 let mut got = Vec::new();
1776 while let Some(rec) = reader.read_record()? {
1777 got.push(rec);
1778 }
1779 assert_eq!(records, got);
1780 Ok(())
1781 }
1782
1783 #[test]
1784 fn encrypted_aof_wrong_key_fails() -> Result {
1785 let dir = temp_dir();
1786 let path = dir.path().join("enc_bad.aof");
1787 let key = test_key();
1788 let wrong_key = EncryptionKey::from_bytes([0xFF; 32]);
1789
1790 {
1791 let mut writer = AofWriter::open_encrypted(&path, key)?;
1792 writer.write_record(&AofRecord::Set {
1793 key: "k".into(),
1794 value: Bytes::from("v"),
1795 expire_ms: -1,
1796 })?;
1797 writer.sync()?;
1798 }
1799
1800 let mut reader = AofReader::open_encrypted(&path, wrong_key)?;
1801 let err = reader.read_record().unwrap_err();
1802 assert!(matches!(err, FormatError::DecryptionFailed));
1803 Ok(())
1804 }
1805
1806 #[test]
1807 fn v2_file_readable_with_encryption_key() -> Result {
1808 let dir = temp_dir();
1809 let path = dir.path().join("v2.aof");
1810 let key = test_key();
1811
1812 {
1814 let mut writer = AofWriter::open(&path)?;
1815 writer.write_record(&AofRecord::Set {
1816 key: "k".into(),
1817 value: Bytes::from("v"),
1818 expire_ms: -1,
1819 })?;
1820 writer.sync()?;
1821 }
1822
1823 let mut reader = AofReader::open_encrypted(&path, key)?;
1825 let rec = reader.read_record()?.unwrap();
1826 assert!(matches!(rec, AofRecord::Set { .. }));
1827 Ok(())
1828 }
1829
1830 #[test]
1831 fn v3_file_without_key_returns_error() -> Result {
1832 let dir = temp_dir();
1833 let path = dir.path().join("v3_nokey.aof");
1834 let key = test_key();
1835
1836 {
1838 let mut writer = AofWriter::open_encrypted(&path, key)?;
1839 writer.write_record(&AofRecord::Set {
1840 key: "k".into(),
1841 value: Bytes::from("v"),
1842 expire_ms: -1,
1843 })?;
1844 writer.sync()?;
1845 }
1846
1847 let err = AofReader::open(&path).unwrap_err();
1849 assert!(matches!(err, FormatError::EncryptionRequired));
1850 Ok(())
1851 }
1852
1853 #[test]
1854 fn encrypted_truncate_preserves_encryption() -> Result {
1855 let dir = temp_dir();
1856 let path = dir.path().join("enc_trunc.aof");
1857 let key = test_key();
1858
1859 {
1860 let mut writer = AofWriter::open_encrypted(&path, key.clone())?;
1861 writer.write_record(&AofRecord::Set {
1862 key: "old".into(),
1863 value: Bytes::from("data"),
1864 expire_ms: -1,
1865 })?;
1866 writer.truncate()?;
1867
1868 writer.write_record(&AofRecord::Set {
1869 key: "new".into(),
1870 value: Bytes::from("fresh"),
1871 expire_ms: -1,
1872 })?;
1873 writer.sync()?;
1874 }
1875
1876 let mut reader = AofReader::open_encrypted(&path, key)?;
1877 let rec = reader.read_record()?.unwrap();
1878 match rec {
1879 AofRecord::Set { key, .. } => assert_eq!(key, "new"),
1880 other => panic!("expected Set, got {other:?}"),
1881 }
1882 assert!(reader.read_record()?.is_none());
1883 Ok(())
1884 }
1885 }
1886}