1use std::fs::{self, File, OpenOptions};
20use std::io::{self, BufReader, BufWriter, Write};
21use std::path::{Path, PathBuf};
22
23use bytes::Bytes;
24
25use crate::format::{self, FormatError};
26
27fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
29 let bytes = format::read_bytes(r)?;
30 String::from_utf8(bytes).map_err(|_| {
31 FormatError::Io(io::Error::new(
32 io::ErrorKind::InvalidData,
33 format!("{field} is not valid utf-8"),
34 ))
35 })
36}
37
38const TAG_SET: u8 = 1;
40const TAG_DEL: u8 = 2;
41const TAG_EXPIRE: u8 = 3;
42const TAG_LPUSH: u8 = 4;
43const TAG_RPUSH: u8 = 5;
44const TAG_LPOP: u8 = 6;
45const TAG_RPOP: u8 = 7;
46const TAG_ZADD: u8 = 8;
47const TAG_ZREM: u8 = 9;
48const TAG_PERSIST: u8 = 10;
49const TAG_PEXPIRE: u8 = 11;
50const TAG_INCR: u8 = 12;
51const TAG_DECR: u8 = 13;
52const TAG_HSET: u8 = 14;
53const TAG_HDEL: u8 = 15;
54const TAG_HINCRBY: u8 = 16;
55const TAG_SADD: u8 = 17;
56const TAG_SREM: u8 = 18;
57const TAG_INCRBY: u8 = 19;
58const TAG_DECRBY: u8 = 20;
59const TAG_APPEND: u8 = 21;
60const TAG_RENAME: u8 = 22;
61
62#[derive(Debug, Clone, PartialEq)]
64pub enum AofRecord {
65 Set {
67 key: String,
68 value: Bytes,
69 expire_ms: i64,
70 },
71 Del { key: String },
73 Expire { key: String, seconds: u64 },
75 LPush { key: String, values: Vec<Bytes> },
77 RPush { key: String, values: Vec<Bytes> },
79 LPop { key: String },
81 RPop { key: String },
83 ZAdd {
85 key: String,
86 members: Vec<(f64, String)>,
87 },
88 ZRem { key: String, members: Vec<String> },
90 Persist { key: String },
92 Pexpire { key: String, milliseconds: u64 },
94 Incr { key: String },
96 Decr { key: String },
98 HSet {
100 key: String,
101 fields: Vec<(String, Bytes)>,
102 },
103 HDel { key: String, fields: Vec<String> },
105 HIncrBy {
107 key: String,
108 field: String,
109 delta: i64,
110 },
111 SAdd { key: String, members: Vec<String> },
113 SRem { key: String, members: Vec<String> },
115 IncrBy { key: String, delta: i64 },
117 DecrBy { key: String, delta: i64 },
119 Append { key: String, value: Bytes },
121 Rename { key: String, newkey: String },
123}
124
125impl AofRecord {
126 fn to_bytes(&self) -> Result<Vec<u8>, FormatError> {
128 let mut buf = Vec::new();
129 match self {
130 AofRecord::Set {
131 key,
132 value,
133 expire_ms,
134 } => {
135 format::write_u8(&mut buf, TAG_SET)?;
136 format::write_bytes(&mut buf, key.as_bytes())?;
137 format::write_bytes(&mut buf, value)?;
138 format::write_i64(&mut buf, *expire_ms)?;
139 }
140 AofRecord::Del { key } => {
141 format::write_u8(&mut buf, TAG_DEL)?;
142 format::write_bytes(&mut buf, key.as_bytes())?;
143 }
144 AofRecord::Expire { key, seconds } => {
145 format::write_u8(&mut buf, TAG_EXPIRE)?;
146 format::write_bytes(&mut buf, key.as_bytes())?;
147 format::write_i64(&mut buf, *seconds as i64)?;
148 }
149 AofRecord::LPush { key, values } => {
150 format::write_u8(&mut buf, TAG_LPUSH)?;
151 format::write_bytes(&mut buf, key.as_bytes())?;
152 format::write_u32(&mut buf, values.len() as u32)?;
153 for v in values {
154 format::write_bytes(&mut buf, v)?;
155 }
156 }
157 AofRecord::RPush { key, values } => {
158 format::write_u8(&mut buf, TAG_RPUSH)?;
159 format::write_bytes(&mut buf, key.as_bytes())?;
160 format::write_u32(&mut buf, values.len() as u32)?;
161 for v in values {
162 format::write_bytes(&mut buf, v)?;
163 }
164 }
165 AofRecord::LPop { key } => {
166 format::write_u8(&mut buf, TAG_LPOP)?;
167 format::write_bytes(&mut buf, key.as_bytes())?;
168 }
169 AofRecord::RPop { key } => {
170 format::write_u8(&mut buf, TAG_RPOP)?;
171 format::write_bytes(&mut buf, key.as_bytes())?;
172 }
173 AofRecord::ZAdd { key, members } => {
174 format::write_u8(&mut buf, TAG_ZADD)?;
175 format::write_bytes(&mut buf, key.as_bytes())?;
176 format::write_u32(&mut buf, members.len() as u32)?;
177 for (score, member) in members {
178 format::write_f64(&mut buf, *score)?;
179 format::write_bytes(&mut buf, member.as_bytes())?;
180 }
181 }
182 AofRecord::ZRem { key, members } => {
183 format::write_u8(&mut buf, TAG_ZREM)?;
184 format::write_bytes(&mut buf, key.as_bytes())?;
185 format::write_u32(&mut buf, members.len() as u32)?;
186 for member in members {
187 format::write_bytes(&mut buf, member.as_bytes())?;
188 }
189 }
190 AofRecord::Persist { key } => {
191 format::write_u8(&mut buf, TAG_PERSIST)?;
192 format::write_bytes(&mut buf, key.as_bytes())?;
193 }
194 AofRecord::Pexpire { key, milliseconds } => {
195 format::write_u8(&mut buf, TAG_PEXPIRE)?;
196 format::write_bytes(&mut buf, key.as_bytes())?;
197 format::write_i64(&mut buf, *milliseconds as i64)?;
198 }
199 AofRecord::Incr { key } => {
200 format::write_u8(&mut buf, TAG_INCR)?;
201 format::write_bytes(&mut buf, key.as_bytes())?;
202 }
203 AofRecord::Decr { key } => {
204 format::write_u8(&mut buf, TAG_DECR)?;
205 format::write_bytes(&mut buf, key.as_bytes())?;
206 }
207 AofRecord::HSet { key, fields } => {
208 format::write_u8(&mut buf, TAG_HSET)?;
209 format::write_bytes(&mut buf, key.as_bytes())?;
210 format::write_u32(&mut buf, fields.len() as u32)?;
211 for (field, value) in fields {
212 format::write_bytes(&mut buf, field.as_bytes())?;
213 format::write_bytes(&mut buf, value)?;
214 }
215 }
216 AofRecord::HDel { key, fields } => {
217 format::write_u8(&mut buf, TAG_HDEL)?;
218 format::write_bytes(&mut buf, key.as_bytes())?;
219 format::write_u32(&mut buf, fields.len() as u32)?;
220 for field in fields {
221 format::write_bytes(&mut buf, field.as_bytes())?;
222 }
223 }
224 AofRecord::HIncrBy { key, field, delta } => {
225 format::write_u8(&mut buf, TAG_HINCRBY)?;
226 format::write_bytes(&mut buf, key.as_bytes())?;
227 format::write_bytes(&mut buf, field.as_bytes())?;
228 format::write_i64(&mut buf, *delta)?;
229 }
230 AofRecord::SAdd { key, members } => {
231 format::write_u8(&mut buf, TAG_SADD)?;
232 format::write_bytes(&mut buf, key.as_bytes())?;
233 format::write_u32(&mut buf, members.len() as u32)?;
234 for member in members {
235 format::write_bytes(&mut buf, member.as_bytes())?;
236 }
237 }
238 AofRecord::SRem { key, members } => {
239 format::write_u8(&mut buf, TAG_SREM)?;
240 format::write_bytes(&mut buf, key.as_bytes())?;
241 format::write_u32(&mut buf, members.len() as u32)?;
242 for member in members {
243 format::write_bytes(&mut buf, member.as_bytes())?;
244 }
245 }
246 AofRecord::IncrBy { key, delta } => {
247 format::write_u8(&mut buf, TAG_INCRBY)?;
248 format::write_bytes(&mut buf, key.as_bytes())?;
249 format::write_i64(&mut buf, *delta)?;
250 }
251 AofRecord::DecrBy { key, delta } => {
252 format::write_u8(&mut buf, TAG_DECRBY)?;
253 format::write_bytes(&mut buf, key.as_bytes())?;
254 format::write_i64(&mut buf, *delta)?;
255 }
256 AofRecord::Append { key, value } => {
257 format::write_u8(&mut buf, TAG_APPEND)?;
258 format::write_bytes(&mut buf, key.as_bytes())?;
259 format::write_bytes(&mut buf, value)?;
260 }
261 AofRecord::Rename { key, newkey } => {
262 format::write_u8(&mut buf, TAG_RENAME)?;
263 format::write_bytes(&mut buf, key.as_bytes())?;
264 format::write_bytes(&mut buf, newkey.as_bytes())?;
265 }
266 }
267 Ok(buf)
268 }
269
270 fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
272 let mut cursor = io::Cursor::new(data);
273 let tag = format::read_u8(&mut cursor)?;
274 match tag {
275 TAG_SET => {
276 let key = read_string(&mut cursor, "key")?;
277 let value = format::read_bytes(&mut cursor)?;
278 let expire_ms = format::read_i64(&mut cursor)?;
279 Ok(AofRecord::Set {
280 key,
281 value: Bytes::from(value),
282 expire_ms,
283 })
284 }
285 TAG_DEL => {
286 let key = read_string(&mut cursor, "key")?;
287 Ok(AofRecord::Del { key })
288 }
289 TAG_EXPIRE => {
290 let key = read_string(&mut cursor, "key")?;
291 let seconds = format::read_i64(&mut cursor)? as u64;
292 Ok(AofRecord::Expire { key, seconds })
293 }
294 TAG_LPUSH | TAG_RPUSH => {
295 let key = read_string(&mut cursor, "key")?;
296 let count = format::read_u32(&mut cursor)?;
297 let mut values = Vec::with_capacity(count as usize);
298 for _ in 0..count {
299 values.push(Bytes::from(format::read_bytes(&mut cursor)?));
300 }
301 if tag == TAG_LPUSH {
302 Ok(AofRecord::LPush { key, values })
303 } else {
304 Ok(AofRecord::RPush { key, values })
305 }
306 }
307 TAG_LPOP => {
308 let key = read_string(&mut cursor, "key")?;
309 Ok(AofRecord::LPop { key })
310 }
311 TAG_RPOP => {
312 let key = read_string(&mut cursor, "key")?;
313 Ok(AofRecord::RPop { key })
314 }
315 TAG_ZADD => {
316 let key = read_string(&mut cursor, "key")?;
317 let count = format::read_u32(&mut cursor)?;
318 let mut members = Vec::with_capacity(count as usize);
319 for _ in 0..count {
320 let score = format::read_f64(&mut cursor)?;
321 let member = read_string(&mut cursor, "member")?;
322 members.push((score, member));
323 }
324 Ok(AofRecord::ZAdd { key, members })
325 }
326 TAG_ZREM => {
327 let key = read_string(&mut cursor, "key")?;
328 let count = format::read_u32(&mut cursor)?;
329 let mut members = Vec::with_capacity(count as usize);
330 for _ in 0..count {
331 members.push(read_string(&mut cursor, "member")?);
332 }
333 Ok(AofRecord::ZRem { key, members })
334 }
335 TAG_PERSIST => {
336 let key = read_string(&mut cursor, "key")?;
337 Ok(AofRecord::Persist { key })
338 }
339 TAG_PEXPIRE => {
340 let key = read_string(&mut cursor, "key")?;
341 let milliseconds = format::read_i64(&mut cursor)? as u64;
342 Ok(AofRecord::Pexpire { key, milliseconds })
343 }
344 TAG_INCR => {
345 let key = read_string(&mut cursor, "key")?;
346 Ok(AofRecord::Incr { key })
347 }
348 TAG_DECR => {
349 let key = read_string(&mut cursor, "key")?;
350 Ok(AofRecord::Decr { key })
351 }
352 TAG_HSET => {
353 let key = read_string(&mut cursor, "key")?;
354 let count = format::read_u32(&mut cursor)?;
355 let mut fields = Vec::with_capacity(count as usize);
356 for _ in 0..count {
357 let field = read_string(&mut cursor, "field")?;
358 let value = Bytes::from(format::read_bytes(&mut cursor)?);
359 fields.push((field, value));
360 }
361 Ok(AofRecord::HSet { key, fields })
362 }
363 TAG_HDEL => {
364 let key = read_string(&mut cursor, "key")?;
365 let count = format::read_u32(&mut cursor)?;
366 let mut fields = Vec::with_capacity(count as usize);
367 for _ in 0..count {
368 fields.push(read_string(&mut cursor, "field")?);
369 }
370 Ok(AofRecord::HDel { key, fields })
371 }
372 TAG_HINCRBY => {
373 let key = read_string(&mut cursor, "key")?;
374 let field = read_string(&mut cursor, "field")?;
375 let delta = format::read_i64(&mut cursor)?;
376 Ok(AofRecord::HIncrBy { key, field, delta })
377 }
378 TAG_SADD => {
379 let key = read_string(&mut cursor, "key")?;
380 let count = format::read_u32(&mut cursor)?;
381 let mut members = Vec::with_capacity(count as usize);
382 for _ in 0..count {
383 members.push(read_string(&mut cursor, "member")?);
384 }
385 Ok(AofRecord::SAdd { key, members })
386 }
387 TAG_SREM => {
388 let key = read_string(&mut cursor, "key")?;
389 let count = format::read_u32(&mut cursor)?;
390 let mut members = Vec::with_capacity(count as usize);
391 for _ in 0..count {
392 members.push(read_string(&mut cursor, "member")?);
393 }
394 Ok(AofRecord::SRem { key, members })
395 }
396 TAG_INCRBY => {
397 let key = read_string(&mut cursor, "key")?;
398 let delta = format::read_i64(&mut cursor)?;
399 Ok(AofRecord::IncrBy { key, delta })
400 }
401 TAG_DECRBY => {
402 let key = read_string(&mut cursor, "key")?;
403 let delta = format::read_i64(&mut cursor)?;
404 Ok(AofRecord::DecrBy { key, delta })
405 }
406 TAG_APPEND => {
407 let key = read_string(&mut cursor, "key")?;
408 let value = Bytes::from(format::read_bytes(&mut cursor)?);
409 Ok(AofRecord::Append { key, value })
410 }
411 TAG_RENAME => {
412 let key = read_string(&mut cursor, "key")?;
413 let newkey = read_string(&mut cursor, "newkey")?;
414 Ok(AofRecord::Rename { key, newkey })
415 }
416 _ => Err(FormatError::UnknownTag(tag)),
417 }
418 }
419}
420
421#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
423pub enum FsyncPolicy {
424 Always,
426 #[default]
428 EverySec,
429 No,
431}
432
433pub struct AofWriter {
435 writer: BufWriter<File>,
436 path: PathBuf,
437}
438
439impl AofWriter {
440 pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
443 let path = path.into();
444 let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
445
446 let mut opts = OpenOptions::new();
447 opts.create(true).append(true);
448 #[cfg(unix)]
449 {
450 use std::os::unix::fs::OpenOptionsExt;
451 opts.mode(0o600);
452 }
453 let file = opts.open(&path)?;
454 let mut writer = BufWriter::new(file);
455
456 if !exists {
457 format::write_header(&mut writer, format::AOF_MAGIC)?;
458 writer.flush()?;
459 }
460
461 Ok(Self { writer, path })
462 }
463
464 pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
466 let payload = record.to_bytes()?;
467 let checksum = format::crc32(&payload);
468 self.writer.write_all(&payload)?;
469 format::write_u32(&mut self.writer, checksum)?;
470 Ok(())
471 }
472
473 pub fn flush(&mut self) -> Result<(), FormatError> {
475 self.writer.flush()?;
476 Ok(())
477 }
478
479 pub fn sync(&mut self) -> Result<(), FormatError> {
481 self.writer.flush()?;
482 self.writer.get_ref().sync_all()?;
483 Ok(())
484 }
485
486 pub fn path(&self) -> &Path {
488 &self.path
489 }
490
491 pub fn truncate(&mut self) -> Result<(), FormatError> {
494 self.writer.flush()?;
496
497 let mut opts = OpenOptions::new();
499 opts.create(true).write(true).truncate(true);
500 #[cfg(unix)]
501 {
502 use std::os::unix::fs::OpenOptionsExt;
503 opts.mode(0o600);
504 }
505 let file = opts.open(&self.path)?;
506 let mut writer = BufWriter::new(file);
507 format::write_header(&mut writer, format::AOF_MAGIC)?;
508 writer.flush()?;
509 writer.get_ref().sync_all()?;
511 self.writer = writer;
512 Ok(())
513 }
514}
515
516#[derive(Debug)]
518pub struct AofReader {
519 reader: BufReader<File>,
520}
521
522impl AofReader {
523 pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
525 let file = File::open(path.as_ref())?;
526 let mut reader = BufReader::new(file);
527 let _version = format::read_header(&mut reader, format::AOF_MAGIC)?;
528 Ok(Self { reader })
529 }
530
531 pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
537 let tag = match format::read_u8(&mut self.reader) {
539 Ok(t) => t,
540 Err(FormatError::UnexpectedEof) => return Ok(None),
541 Err(e) => return Err(e),
542 };
543
544 let record_result = self.read_payload_for_tag(tag);
547 match record_result {
548 Ok((payload, stored_crc)) => {
549 let mut full = Vec::with_capacity(1 + payload.len());
551 full.push(tag);
552 full.extend_from_slice(&payload);
553 format::verify_crc32(&full, stored_crc)?;
554 AofRecord::from_bytes(&full).map(Some)
555 }
556 Err(FormatError::UnexpectedEof) => Ok(None),
558 Err(e) => Err(e),
559 }
560 }
561
562 fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
564 let mut payload = Vec::new();
565 match tag {
566 TAG_SET => {
567 let key = format::read_bytes(&mut self.reader)?;
569 format::write_bytes(&mut payload, &key)?;
570 let value = format::read_bytes(&mut self.reader)?;
571 format::write_bytes(&mut payload, &value)?;
572 let expire_ms = format::read_i64(&mut self.reader)?;
573 format::write_i64(&mut payload, expire_ms)?;
574 }
575 TAG_DEL => {
576 let key = format::read_bytes(&mut self.reader)?;
577 format::write_bytes(&mut payload, &key)?;
578 }
579 TAG_EXPIRE => {
580 let key = format::read_bytes(&mut self.reader)?;
581 format::write_bytes(&mut payload, &key)?;
582 let seconds = format::read_i64(&mut self.reader)?;
583 format::write_i64(&mut payload, seconds)?;
584 }
585 TAG_LPUSH | TAG_RPUSH => {
586 let key = format::read_bytes(&mut self.reader)?;
587 format::write_bytes(&mut payload, &key)?;
588 let count = format::read_u32(&mut self.reader)?;
589 format::write_u32(&mut payload, count)?;
590 for _ in 0..count {
591 let val = format::read_bytes(&mut self.reader)?;
592 format::write_bytes(&mut payload, &val)?;
593 }
594 }
595 TAG_LPOP | TAG_RPOP => {
596 let key = format::read_bytes(&mut self.reader)?;
597 format::write_bytes(&mut payload, &key)?;
598 }
599 TAG_ZADD => {
600 let key = format::read_bytes(&mut self.reader)?;
601 format::write_bytes(&mut payload, &key)?;
602 let count = format::read_u32(&mut self.reader)?;
603 format::write_u32(&mut payload, count)?;
604 for _ in 0..count {
605 let score = format::read_f64(&mut self.reader)?;
606 format::write_f64(&mut payload, score)?;
607 let member = format::read_bytes(&mut self.reader)?;
608 format::write_bytes(&mut payload, &member)?;
609 }
610 }
611 TAG_ZREM => {
612 let key = format::read_bytes(&mut self.reader)?;
613 format::write_bytes(&mut payload, &key)?;
614 let count = format::read_u32(&mut self.reader)?;
615 format::write_u32(&mut payload, count)?;
616 for _ in 0..count {
617 let member = format::read_bytes(&mut self.reader)?;
618 format::write_bytes(&mut payload, &member)?;
619 }
620 }
621 TAG_PERSIST => {
622 let key = format::read_bytes(&mut self.reader)?;
623 format::write_bytes(&mut payload, &key)?;
624 }
625 TAG_PEXPIRE => {
626 let key = format::read_bytes(&mut self.reader)?;
627 format::write_bytes(&mut payload, &key)?;
628 let millis = format::read_i64(&mut self.reader)?;
629 format::write_i64(&mut payload, millis)?;
630 }
631 TAG_INCR | TAG_DECR => {
632 let key = format::read_bytes(&mut self.reader)?;
633 format::write_bytes(&mut payload, &key)?;
634 }
635 _ => return Err(FormatError::UnknownTag(tag)),
636 }
637 let stored_crc = format::read_u32(&mut self.reader)?;
638 Ok((payload, stored_crc))
639 }
640}
641
642pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
644 data_dir.join(format!("shard-{shard_id}.aof"))
645}
646
647#[cfg(test)]
648mod tests {
649 use super::*;
650 fn temp_dir() -> tempfile::TempDir {
651 tempfile::tempdir().expect("create temp dir")
652 }
653
654 #[test]
655 fn record_round_trip_set() {
656 let rec = AofRecord::Set {
657 key: "hello".into(),
658 value: Bytes::from("world"),
659 expire_ms: 5000,
660 };
661 let bytes = rec.to_bytes().unwrap();
662 let decoded = AofRecord::from_bytes(&bytes).unwrap();
663 assert_eq!(rec, decoded);
664 }
665
666 #[test]
667 fn record_round_trip_del() {
668 let rec = AofRecord::Del { key: "gone".into() };
669 let bytes = rec.to_bytes().unwrap();
670 let decoded = AofRecord::from_bytes(&bytes).unwrap();
671 assert_eq!(rec, decoded);
672 }
673
674 #[test]
675 fn record_round_trip_expire() {
676 let rec = AofRecord::Expire {
677 key: "ttl".into(),
678 seconds: 300,
679 };
680 let bytes = rec.to_bytes().unwrap();
681 let decoded = AofRecord::from_bytes(&bytes).unwrap();
682 assert_eq!(rec, decoded);
683 }
684
685 #[test]
686 fn set_with_no_expiry() {
687 let rec = AofRecord::Set {
688 key: "k".into(),
689 value: Bytes::from("v"),
690 expire_ms: -1,
691 };
692 let bytes = rec.to_bytes().unwrap();
693 let decoded = AofRecord::from_bytes(&bytes).unwrap();
694 assert_eq!(rec, decoded);
695 }
696
697 #[test]
698 fn writer_reader_round_trip() {
699 let dir = temp_dir();
700 let path = dir.path().join("test.aof");
701
702 let records = vec![
703 AofRecord::Set {
704 key: "a".into(),
705 value: Bytes::from("1"),
706 expire_ms: -1,
707 },
708 AofRecord::Set {
709 key: "b".into(),
710 value: Bytes::from("2"),
711 expire_ms: 10_000,
712 },
713 AofRecord::Del { key: "a".into() },
714 AofRecord::Expire {
715 key: "b".into(),
716 seconds: 60,
717 },
718 ];
719
720 {
722 let mut writer = AofWriter::open(&path).unwrap();
723 for rec in &records {
724 writer.write_record(rec).unwrap();
725 }
726 writer.sync().unwrap();
727 }
728
729 let mut reader = AofReader::open(&path).unwrap();
731 let mut got = Vec::new();
732 while let Some(rec) = reader.read_record().unwrap() {
733 got.push(rec);
734 }
735 assert_eq!(records, got);
736 }
737
738 #[test]
739 fn empty_aof_returns_no_records() {
740 let dir = temp_dir();
741 let path = dir.path().join("empty.aof");
742
743 {
745 let _writer = AofWriter::open(&path).unwrap();
746 }
747
748 let mut reader = AofReader::open(&path).unwrap();
749 assert!(reader.read_record().unwrap().is_none());
750 }
751
752 #[test]
753 fn truncated_record_treated_as_eof() {
754 let dir = temp_dir();
755 let path = dir.path().join("trunc.aof");
756
757 {
759 let mut writer = AofWriter::open(&path).unwrap();
760 writer
761 .write_record(&AofRecord::Set {
762 key: "ok".into(),
763 value: Bytes::from("good"),
764 expire_ms: -1,
765 })
766 .unwrap();
767 writer.flush().unwrap();
768 }
769
770 {
772 let mut file = OpenOptions::new().append(true).open(&path).unwrap();
773 file.write_all(&[TAG_SET]).unwrap();
774 }
775
776 let mut reader = AofReader::open(&path).unwrap();
777 let rec = reader.read_record().unwrap().unwrap();
779 assert!(matches!(rec, AofRecord::Set { .. }));
780 assert!(reader.read_record().unwrap().is_none());
782 }
783
784 #[test]
785 fn corrupt_crc_detected() {
786 let dir = temp_dir();
787 let path = dir.path().join("corrupt.aof");
788
789 {
790 let mut writer = AofWriter::open(&path).unwrap();
791 writer
792 .write_record(&AofRecord::Set {
793 key: "k".into(),
794 value: Bytes::from("v"),
795 expire_ms: -1,
796 })
797 .unwrap();
798 writer.flush().unwrap();
799 }
800
801 let mut data = fs::read(&path).unwrap();
803 let last = data.len() - 1;
804 data[last] ^= 0xFF;
805 fs::write(&path, &data).unwrap();
806
807 let mut reader = AofReader::open(&path).unwrap();
808 let err = reader.read_record().unwrap_err();
809 assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
810 }
811
812 #[test]
813 fn missing_magic_is_error() {
814 let dir = temp_dir();
815 let path = dir.path().join("bad.aof");
816 fs::write(&path, b"NOT_AOF_DATA").unwrap();
817
818 let err = AofReader::open(&path).unwrap_err();
819 assert!(matches!(err, FormatError::InvalidMagic));
820 }
821
822 #[test]
823 fn truncate_resets_aof() {
824 let dir = temp_dir();
825 let path = dir.path().join("reset.aof");
826
827 {
828 let mut writer = AofWriter::open(&path).unwrap();
829 writer
830 .write_record(&AofRecord::Set {
831 key: "old".into(),
832 value: Bytes::from("data"),
833 expire_ms: -1,
834 })
835 .unwrap();
836 writer.truncate().unwrap();
837
838 writer
840 .write_record(&AofRecord::Set {
841 key: "new".into(),
842 value: Bytes::from("fresh"),
843 expire_ms: -1,
844 })
845 .unwrap();
846 writer.sync().unwrap();
847 }
848
849 let mut reader = AofReader::open(&path).unwrap();
850 let rec = reader.read_record().unwrap().unwrap();
851 match rec {
852 AofRecord::Set { key, .. } => assert_eq!(key, "new"),
853 other => panic!("expected Set, got {other:?}"),
854 }
855 assert!(reader.read_record().unwrap().is_none());
857 }
858
859 #[test]
860 fn record_round_trip_lpush() {
861 let rec = AofRecord::LPush {
862 key: "list".into(),
863 values: vec![Bytes::from("a"), Bytes::from("b")],
864 };
865 let bytes = rec.to_bytes().unwrap();
866 let decoded = AofRecord::from_bytes(&bytes).unwrap();
867 assert_eq!(rec, decoded);
868 }
869
870 #[test]
871 fn record_round_trip_rpush() {
872 let rec = AofRecord::RPush {
873 key: "list".into(),
874 values: vec![Bytes::from("x")],
875 };
876 let bytes = rec.to_bytes().unwrap();
877 let decoded = AofRecord::from_bytes(&bytes).unwrap();
878 assert_eq!(rec, decoded);
879 }
880
881 #[test]
882 fn record_round_trip_lpop() {
883 let rec = AofRecord::LPop { key: "list".into() };
884 let bytes = rec.to_bytes().unwrap();
885 let decoded = AofRecord::from_bytes(&bytes).unwrap();
886 assert_eq!(rec, decoded);
887 }
888
889 #[test]
890 fn record_round_trip_rpop() {
891 let rec = AofRecord::RPop { key: "list".into() };
892 let bytes = rec.to_bytes().unwrap();
893 let decoded = AofRecord::from_bytes(&bytes).unwrap();
894 assert_eq!(rec, decoded);
895 }
896
897 #[test]
898 fn writer_reader_round_trip_with_list_records() {
899 let dir = temp_dir();
900 let path = dir.path().join("list.aof");
901
902 let records = vec![
903 AofRecord::LPush {
904 key: "l".into(),
905 values: vec![Bytes::from("a"), Bytes::from("b")],
906 },
907 AofRecord::RPush {
908 key: "l".into(),
909 values: vec![Bytes::from("c")],
910 },
911 AofRecord::LPop { key: "l".into() },
912 AofRecord::RPop { key: "l".into() },
913 ];
914
915 {
916 let mut writer = AofWriter::open(&path).unwrap();
917 for rec in &records {
918 writer.write_record(rec).unwrap();
919 }
920 writer.sync().unwrap();
921 }
922
923 let mut reader = AofReader::open(&path).unwrap();
924 let mut got = Vec::new();
925 while let Some(rec) = reader.read_record().unwrap() {
926 got.push(rec);
927 }
928 assert_eq!(records, got);
929 }
930
931 #[test]
932 fn record_round_trip_zadd() {
933 let rec = AofRecord::ZAdd {
934 key: "board".into(),
935 members: vec![(100.0, "alice".into()), (200.5, "bob".into())],
936 };
937 let bytes = rec.to_bytes().unwrap();
938 let decoded = AofRecord::from_bytes(&bytes).unwrap();
939 assert_eq!(rec, decoded);
940 }
941
942 #[test]
943 fn record_round_trip_zrem() {
944 let rec = AofRecord::ZRem {
945 key: "board".into(),
946 members: vec!["alice".into(), "bob".into()],
947 };
948 let bytes = rec.to_bytes().unwrap();
949 let decoded = AofRecord::from_bytes(&bytes).unwrap();
950 assert_eq!(rec, decoded);
951 }
952
953 #[test]
954 fn writer_reader_round_trip_with_sorted_set_records() {
955 let dir = temp_dir();
956 let path = dir.path().join("zset.aof");
957
958 let records = vec![
959 AofRecord::ZAdd {
960 key: "board".into(),
961 members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
962 },
963 AofRecord::ZRem {
964 key: "board".into(),
965 members: vec!["alice".into()],
966 },
967 ];
968
969 {
970 let mut writer = AofWriter::open(&path).unwrap();
971 for rec in &records {
972 writer.write_record(rec).unwrap();
973 }
974 writer.sync().unwrap();
975 }
976
977 let mut reader = AofReader::open(&path).unwrap();
978 let mut got = Vec::new();
979 while let Some(rec) = reader.read_record().unwrap() {
980 got.push(rec);
981 }
982 assert_eq!(records, got);
983 }
984
985 #[test]
986 fn record_round_trip_persist() {
987 let rec = AofRecord::Persist {
988 key: "mykey".into(),
989 };
990 let bytes = rec.to_bytes().unwrap();
991 let decoded = AofRecord::from_bytes(&bytes).unwrap();
992 assert_eq!(rec, decoded);
993 }
994
995 #[test]
996 fn record_round_trip_pexpire() {
997 let rec = AofRecord::Pexpire {
998 key: "mykey".into(),
999 milliseconds: 5000,
1000 };
1001 let bytes = rec.to_bytes().unwrap();
1002 let decoded = AofRecord::from_bytes(&bytes).unwrap();
1003 assert_eq!(rec, decoded);
1004 }
1005
1006 #[test]
1007 fn record_round_trip_incr() {
1008 let rec = AofRecord::Incr {
1009 key: "counter".into(),
1010 };
1011 let bytes = rec.to_bytes().unwrap();
1012 let decoded = AofRecord::from_bytes(&bytes).unwrap();
1013 assert_eq!(rec, decoded);
1014 }
1015
1016 #[test]
1017 fn record_round_trip_decr() {
1018 let rec = AofRecord::Decr {
1019 key: "counter".into(),
1020 };
1021 let bytes = rec.to_bytes().unwrap();
1022 let decoded = AofRecord::from_bytes(&bytes).unwrap();
1023 assert_eq!(rec, decoded);
1024 }
1025
1026 #[test]
1027 fn writer_reader_round_trip_with_persist_pexpire() {
1028 let dir = temp_dir();
1029 let path = dir.path().join("persist_pexpire.aof");
1030
1031 let records = vec![
1032 AofRecord::Set {
1033 key: "k".into(),
1034 value: Bytes::from("v"),
1035 expire_ms: 5000,
1036 },
1037 AofRecord::Persist { key: "k".into() },
1038 AofRecord::Pexpire {
1039 key: "k".into(),
1040 milliseconds: 3000,
1041 },
1042 ];
1043
1044 {
1045 let mut writer = AofWriter::open(&path).unwrap();
1046 for rec in &records {
1047 writer.write_record(rec).unwrap();
1048 }
1049 writer.sync().unwrap();
1050 }
1051
1052 let mut reader = AofReader::open(&path).unwrap();
1053 let mut got = Vec::new();
1054 while let Some(rec) = reader.read_record().unwrap() {
1055 got.push(rec);
1056 }
1057 assert_eq!(records, got);
1058 }
1059
1060 #[test]
1061 fn aof_path_format() {
1062 let p = aof_path(Path::new("/data"), 3);
1063 assert_eq!(p, PathBuf::from("/data/shard-3.aof"));
1064 }
1065
1066 #[test]
1067 fn record_round_trip_hset() {
1068 let rec = AofRecord::HSet {
1069 key: "hash".into(),
1070 fields: vec![
1071 ("f1".into(), Bytes::from("v1")),
1072 ("f2".into(), Bytes::from("v2")),
1073 ],
1074 };
1075 let bytes = rec.to_bytes().unwrap();
1076 let decoded = AofRecord::from_bytes(&bytes).unwrap();
1077 assert_eq!(rec, decoded);
1078 }
1079
1080 #[test]
1081 fn record_round_trip_hdel() {
1082 let rec = AofRecord::HDel {
1083 key: "hash".into(),
1084 fields: vec!["f1".into(), "f2".into()],
1085 };
1086 let bytes = rec.to_bytes().unwrap();
1087 let decoded = AofRecord::from_bytes(&bytes).unwrap();
1088 assert_eq!(rec, decoded);
1089 }
1090
1091 #[test]
1092 fn record_round_trip_hincrby() {
1093 let rec = AofRecord::HIncrBy {
1094 key: "hash".into(),
1095 field: "counter".into(),
1096 delta: -42,
1097 };
1098 let bytes = rec.to_bytes().unwrap();
1099 let decoded = AofRecord::from_bytes(&bytes).unwrap();
1100 assert_eq!(rec, decoded);
1101 }
1102
1103 #[test]
1104 fn record_round_trip_sadd() {
1105 let rec = AofRecord::SAdd {
1106 key: "set".into(),
1107 members: vec!["m1".into(), "m2".into(), "m3".into()],
1108 };
1109 let bytes = rec.to_bytes().unwrap();
1110 let decoded = AofRecord::from_bytes(&bytes).unwrap();
1111 assert_eq!(rec, decoded);
1112 }
1113
1114 #[test]
1115 fn record_round_trip_srem() {
1116 let rec = AofRecord::SRem {
1117 key: "set".into(),
1118 members: vec!["m1".into()],
1119 };
1120 let bytes = rec.to_bytes().unwrap();
1121 let decoded = AofRecord::from_bytes(&bytes).unwrap();
1122 assert_eq!(rec, decoded);
1123 }
1124}