1use std::fs::{self, File, OpenOptions};
20use std::io::{self, BufReader, BufWriter, Write};
21use std::path::{Path, PathBuf};
22
23use bytes::Bytes;
24
25use crate::format::{self, FormatError};
26
27fn read_string(r: &mut impl io::Read, field: &str) -> Result<String, FormatError> {
29 let bytes = format::read_bytes(r)?;
30 String::from_utf8(bytes).map_err(|_| {
31 FormatError::Io(io::Error::new(
32 io::ErrorKind::InvalidData,
33 format!("{field} is not valid utf-8"),
34 ))
35 })
36}
37
// One-byte tags written at the start of every serialized record (see
// `AofRecord::to_bytes`) to identify the variant. The values are written to
// the log file and matched on when decoding, so existing numbers must not be
// reassigned.

// Plain string values and counters.
const TAG_SET: u8 = 1;
const TAG_INCR: u8 = 12;
const TAG_DECR: u8 = 13;
const TAG_INCRBY: u8 = 19;
const TAG_DECRBY: u8 = 20;
const TAG_APPEND: u8 = 21;

// Whole-key operations: deletion, renaming and expiry management.
const TAG_DEL: u8 = 2;
const TAG_EXPIRE: u8 = 3;
const TAG_PERSIST: u8 = 10;
const TAG_PEXPIRE: u8 = 11;
const TAG_RENAME: u8 = 22;

// Lists.
const TAG_LPUSH: u8 = 4;
const TAG_RPUSH: u8 = 5;
const TAG_LPOP: u8 = 6;
const TAG_RPOP: u8 = 7;

// Sorted sets.
const TAG_ZADD: u8 = 8;
const TAG_ZREM: u8 = 9;

// Hashes.
const TAG_HSET: u8 = 14;
const TAG_HDEL: u8 = 15;
const TAG_HINCRBY: u8 = 16;

// Sets.
const TAG_SADD: u8 = 17;
const TAG_SREM: u8 = 18;
61
62#[derive(Debug, Clone, PartialEq)]
64pub enum AofRecord {
65 Set {
67 key: String,
68 value: Bytes,
69 expire_ms: i64,
70 },
71 Del { key: String },
73 Expire { key: String, seconds: u64 },
75 LPush { key: String, values: Vec<Bytes> },
77 RPush { key: String, values: Vec<Bytes> },
79 LPop { key: String },
81 RPop { key: String },
83 ZAdd {
85 key: String,
86 members: Vec<(f64, String)>,
87 },
88 ZRem { key: String, members: Vec<String> },
90 Persist { key: String },
92 Pexpire { key: String, milliseconds: u64 },
94 Incr { key: String },
96 Decr { key: String },
98 HSet {
100 key: String,
101 fields: Vec<(String, Bytes)>,
102 },
103 HDel { key: String, fields: Vec<String> },
105 HIncrBy {
107 key: String,
108 field: String,
109 delta: i64,
110 },
111 SAdd { key: String, members: Vec<String> },
113 SRem { key: String, members: Vec<String> },
115 IncrBy { key: String, delta: i64 },
117 DecrBy { key: String, delta: i64 },
119 Append { key: String, value: Bytes },
121 Rename { key: String, newkey: String },
123}
124
125impl AofRecord {
126 fn to_bytes(&self) -> Result<Vec<u8>, FormatError> {
128 let mut buf = Vec::new();
129 match self {
130 AofRecord::Set {
131 key,
132 value,
133 expire_ms,
134 } => {
135 format::write_u8(&mut buf, TAG_SET)?;
136 format::write_bytes(&mut buf, key.as_bytes())?;
137 format::write_bytes(&mut buf, value)?;
138 format::write_i64(&mut buf, *expire_ms)?;
139 }
140 AofRecord::Del { key } => {
141 format::write_u8(&mut buf, TAG_DEL)?;
142 format::write_bytes(&mut buf, key.as_bytes())?;
143 }
144 AofRecord::Expire { key, seconds } => {
145 format::write_u8(&mut buf, TAG_EXPIRE)?;
146 format::write_bytes(&mut buf, key.as_bytes())?;
147 format::write_i64(&mut buf, *seconds as i64)?;
148 }
149 AofRecord::LPush { key, values } => {
150 format::write_u8(&mut buf, TAG_LPUSH)?;
151 format::write_bytes(&mut buf, key.as_bytes())?;
152 format::write_u32(&mut buf, values.len() as u32)?;
153 for v in values {
154 format::write_bytes(&mut buf, v)?;
155 }
156 }
157 AofRecord::RPush { key, values } => {
158 format::write_u8(&mut buf, TAG_RPUSH)?;
159 format::write_bytes(&mut buf, key.as_bytes())?;
160 format::write_u32(&mut buf, values.len() as u32)?;
161 for v in values {
162 format::write_bytes(&mut buf, v)?;
163 }
164 }
165 AofRecord::LPop { key } => {
166 format::write_u8(&mut buf, TAG_LPOP)?;
167 format::write_bytes(&mut buf, key.as_bytes())?;
168 }
169 AofRecord::RPop { key } => {
170 format::write_u8(&mut buf, TAG_RPOP)?;
171 format::write_bytes(&mut buf, key.as_bytes())?;
172 }
173 AofRecord::ZAdd { key, members } => {
174 format::write_u8(&mut buf, TAG_ZADD)?;
175 format::write_bytes(&mut buf, key.as_bytes())?;
176 format::write_u32(&mut buf, members.len() as u32)?;
177 for (score, member) in members {
178 format::write_f64(&mut buf, *score)?;
179 format::write_bytes(&mut buf, member.as_bytes())?;
180 }
181 }
182 AofRecord::ZRem { key, members } => {
183 format::write_u8(&mut buf, TAG_ZREM)?;
184 format::write_bytes(&mut buf, key.as_bytes())?;
185 format::write_u32(&mut buf, members.len() as u32)?;
186 for member in members {
187 format::write_bytes(&mut buf, member.as_bytes())?;
188 }
189 }
190 AofRecord::Persist { key } => {
191 format::write_u8(&mut buf, TAG_PERSIST)?;
192 format::write_bytes(&mut buf, key.as_bytes())?;
193 }
194 AofRecord::Pexpire { key, milliseconds } => {
195 format::write_u8(&mut buf, TAG_PEXPIRE)?;
196 format::write_bytes(&mut buf, key.as_bytes())?;
197 format::write_i64(&mut buf, *milliseconds as i64)?;
198 }
199 AofRecord::Incr { key } => {
200 format::write_u8(&mut buf, TAG_INCR)?;
201 format::write_bytes(&mut buf, key.as_bytes())?;
202 }
203 AofRecord::Decr { key } => {
204 format::write_u8(&mut buf, TAG_DECR)?;
205 format::write_bytes(&mut buf, key.as_bytes())?;
206 }
207 AofRecord::HSet { key, fields } => {
208 format::write_u8(&mut buf, TAG_HSET)?;
209 format::write_bytes(&mut buf, key.as_bytes())?;
210 format::write_u32(&mut buf, fields.len() as u32)?;
211 for (field, value) in fields {
212 format::write_bytes(&mut buf, field.as_bytes())?;
213 format::write_bytes(&mut buf, value)?;
214 }
215 }
216 AofRecord::HDel { key, fields } => {
217 format::write_u8(&mut buf, TAG_HDEL)?;
218 format::write_bytes(&mut buf, key.as_bytes())?;
219 format::write_u32(&mut buf, fields.len() as u32)?;
220 for field in fields {
221 format::write_bytes(&mut buf, field.as_bytes())?;
222 }
223 }
224 AofRecord::HIncrBy { key, field, delta } => {
225 format::write_u8(&mut buf, TAG_HINCRBY)?;
226 format::write_bytes(&mut buf, key.as_bytes())?;
227 format::write_bytes(&mut buf, field.as_bytes())?;
228 format::write_i64(&mut buf, *delta)?;
229 }
230 AofRecord::SAdd { key, members } => {
231 format::write_u8(&mut buf, TAG_SADD)?;
232 format::write_bytes(&mut buf, key.as_bytes())?;
233 format::write_u32(&mut buf, members.len() as u32)?;
234 for member in members {
235 format::write_bytes(&mut buf, member.as_bytes())?;
236 }
237 }
238 AofRecord::SRem { key, members } => {
239 format::write_u8(&mut buf, TAG_SREM)?;
240 format::write_bytes(&mut buf, key.as_bytes())?;
241 format::write_u32(&mut buf, members.len() as u32)?;
242 for member in members {
243 format::write_bytes(&mut buf, member.as_bytes())?;
244 }
245 }
246 AofRecord::IncrBy { key, delta } => {
247 format::write_u8(&mut buf, TAG_INCRBY)?;
248 format::write_bytes(&mut buf, key.as_bytes())?;
249 format::write_i64(&mut buf, *delta)?;
250 }
251 AofRecord::DecrBy { key, delta } => {
252 format::write_u8(&mut buf, TAG_DECRBY)?;
253 format::write_bytes(&mut buf, key.as_bytes())?;
254 format::write_i64(&mut buf, *delta)?;
255 }
256 AofRecord::Append { key, value } => {
257 format::write_u8(&mut buf, TAG_APPEND)?;
258 format::write_bytes(&mut buf, key.as_bytes())?;
259 format::write_bytes(&mut buf, value)?;
260 }
261 AofRecord::Rename { key, newkey } => {
262 format::write_u8(&mut buf, TAG_RENAME)?;
263 format::write_bytes(&mut buf, key.as_bytes())?;
264 format::write_bytes(&mut buf, newkey.as_bytes())?;
265 }
266 }
267 Ok(buf)
268 }
269
270 fn from_bytes(data: &[u8]) -> Result<Self, FormatError> {
272 let mut cursor = io::Cursor::new(data);
273 let tag = format::read_u8(&mut cursor)?;
274 match tag {
275 TAG_SET => {
276 let key = read_string(&mut cursor, "key")?;
277 let value = format::read_bytes(&mut cursor)?;
278 let expire_ms = format::read_i64(&mut cursor)?;
279 Ok(AofRecord::Set {
280 key,
281 value: Bytes::from(value),
282 expire_ms,
283 })
284 }
285 TAG_DEL => {
286 let key = read_string(&mut cursor, "key")?;
287 Ok(AofRecord::Del { key })
288 }
289 TAG_EXPIRE => {
290 let key = read_string(&mut cursor, "key")?;
291 let seconds = format::read_i64(&mut cursor)? as u64;
292 Ok(AofRecord::Expire { key, seconds })
293 }
294 TAG_LPUSH | TAG_RPUSH => {
295 let key = read_string(&mut cursor, "key")?;
296 let count = format::read_u32(&mut cursor)?;
297 let mut values = Vec::with_capacity(count as usize);
298 for _ in 0..count {
299 values.push(Bytes::from(format::read_bytes(&mut cursor)?));
300 }
301 if tag == TAG_LPUSH {
302 Ok(AofRecord::LPush { key, values })
303 } else {
304 Ok(AofRecord::RPush { key, values })
305 }
306 }
307 TAG_LPOP => {
308 let key = read_string(&mut cursor, "key")?;
309 Ok(AofRecord::LPop { key })
310 }
311 TAG_RPOP => {
312 let key = read_string(&mut cursor, "key")?;
313 Ok(AofRecord::RPop { key })
314 }
315 TAG_ZADD => {
316 let key = read_string(&mut cursor, "key")?;
317 let count = format::read_u32(&mut cursor)?;
318 let mut members = Vec::with_capacity(count as usize);
319 for _ in 0..count {
320 let score = format::read_f64(&mut cursor)?;
321 let member = read_string(&mut cursor, "member")?;
322 members.push((score, member));
323 }
324 Ok(AofRecord::ZAdd { key, members })
325 }
326 TAG_ZREM => {
327 let key = read_string(&mut cursor, "key")?;
328 let count = format::read_u32(&mut cursor)?;
329 let mut members = Vec::with_capacity(count as usize);
330 for _ in 0..count {
331 members.push(read_string(&mut cursor, "member")?);
332 }
333 Ok(AofRecord::ZRem { key, members })
334 }
335 TAG_PERSIST => {
336 let key = read_string(&mut cursor, "key")?;
337 Ok(AofRecord::Persist { key })
338 }
339 TAG_PEXPIRE => {
340 let key = read_string(&mut cursor, "key")?;
341 let milliseconds = format::read_i64(&mut cursor)? as u64;
342 Ok(AofRecord::Pexpire { key, milliseconds })
343 }
344 TAG_INCR => {
345 let key = read_string(&mut cursor, "key")?;
346 Ok(AofRecord::Incr { key })
347 }
348 TAG_DECR => {
349 let key = read_string(&mut cursor, "key")?;
350 Ok(AofRecord::Decr { key })
351 }
352 TAG_HSET => {
353 let key = read_string(&mut cursor, "key")?;
354 let count = format::read_u32(&mut cursor)?;
355 let mut fields = Vec::with_capacity(count as usize);
356 for _ in 0..count {
357 let field = read_string(&mut cursor, "field")?;
358 let value = Bytes::from(format::read_bytes(&mut cursor)?);
359 fields.push((field, value));
360 }
361 Ok(AofRecord::HSet { key, fields })
362 }
363 TAG_HDEL => {
364 let key = read_string(&mut cursor, "key")?;
365 let count = format::read_u32(&mut cursor)?;
366 let mut fields = Vec::with_capacity(count as usize);
367 for _ in 0..count {
368 fields.push(read_string(&mut cursor, "field")?);
369 }
370 Ok(AofRecord::HDel { key, fields })
371 }
372 TAG_HINCRBY => {
373 let key = read_string(&mut cursor, "key")?;
374 let field = read_string(&mut cursor, "field")?;
375 let delta = format::read_i64(&mut cursor)?;
376 Ok(AofRecord::HIncrBy { key, field, delta })
377 }
378 TAG_SADD => {
379 let key = read_string(&mut cursor, "key")?;
380 let count = format::read_u32(&mut cursor)?;
381 let mut members = Vec::with_capacity(count as usize);
382 for _ in 0..count {
383 members.push(read_string(&mut cursor, "member")?);
384 }
385 Ok(AofRecord::SAdd { key, members })
386 }
387 TAG_SREM => {
388 let key = read_string(&mut cursor, "key")?;
389 let count = format::read_u32(&mut cursor)?;
390 let mut members = Vec::with_capacity(count as usize);
391 for _ in 0..count {
392 members.push(read_string(&mut cursor, "member")?);
393 }
394 Ok(AofRecord::SRem { key, members })
395 }
396 TAG_INCRBY => {
397 let key = read_string(&mut cursor, "key")?;
398 let delta = format::read_i64(&mut cursor)?;
399 Ok(AofRecord::IncrBy { key, delta })
400 }
401 TAG_DECRBY => {
402 let key = read_string(&mut cursor, "key")?;
403 let delta = format::read_i64(&mut cursor)?;
404 Ok(AofRecord::DecrBy { key, delta })
405 }
406 TAG_APPEND => {
407 let key = read_string(&mut cursor, "key")?;
408 let value = Bytes::from(format::read_bytes(&mut cursor)?);
409 Ok(AofRecord::Append { key, value })
410 }
411 TAG_RENAME => {
412 let key = read_string(&mut cursor, "key")?;
413 let newkey = read_string(&mut cursor, "newkey")?;
414 Ok(AofRecord::Rename { key, newkey })
415 }
416 _ => Err(FormatError::UnknownTag(tag)),
417 }
418 }
419}
420
/// Selects how often the append-only file should be fsynced.
///
/// NOTE(review): nothing in this file reads the policy, so the per-variant
/// descriptions below are inferred from the names — confirm against the code
/// that drives `AofWriter::sync`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum FsyncPolicy {
    /// Presumably: sync after every write.
    Always,
    /// Presumably: sync about once per second. This is the `Default` value.
    #[default]
    EverySec,
    /// Presumably: never sync explicitly and leave flushing to the OS.
    No,
}
432
433pub struct AofWriter {
435 writer: BufWriter<File>,
436 path: PathBuf,
437}
438
439impl AofWriter {
440 pub fn open(path: impl Into<PathBuf>) -> Result<Self, FormatError> {
443 let path = path.into();
444 let exists = path.exists() && fs::metadata(&path).map(|m| m.len() > 0).unwrap_or(false);
445
446 let file = OpenOptions::new().create(true).append(true).open(&path)?;
447 let mut writer = BufWriter::new(file);
448
449 if !exists {
450 format::write_header(&mut writer, format::AOF_MAGIC)?;
451 writer.flush()?;
452 }
453
454 Ok(Self { writer, path })
455 }
456
457 pub fn write_record(&mut self, record: &AofRecord) -> Result<(), FormatError> {
459 let payload = record.to_bytes()?;
460 let checksum = format::crc32(&payload);
461 self.writer.write_all(&payload)?;
462 format::write_u32(&mut self.writer, checksum)?;
463 Ok(())
464 }
465
466 pub fn flush(&mut self) -> Result<(), FormatError> {
468 self.writer.flush()?;
469 Ok(())
470 }
471
472 pub fn sync(&mut self) -> Result<(), FormatError> {
474 self.writer.flush()?;
475 self.writer.get_ref().sync_all()?;
476 Ok(())
477 }
478
479 pub fn path(&self) -> &Path {
481 &self.path
482 }
483
484 pub fn truncate(&mut self) -> Result<(), FormatError> {
487 self.writer.flush()?;
489
490 let file = OpenOptions::new()
492 .create(true)
493 .write(true)
494 .truncate(true)
495 .open(&self.path)?;
496 let mut writer = BufWriter::new(file);
497 format::write_header(&mut writer, format::AOF_MAGIC)?;
498 writer.flush()?;
499 writer.get_ref().sync_all()?;
501 self.writer = writer;
502 Ok(())
503 }
504}
505
506#[derive(Debug)]
508pub struct AofReader {
509 reader: BufReader<File>,
510}
511
512impl AofReader {
513 pub fn open(path: impl AsRef<Path>) -> Result<Self, FormatError> {
515 let file = File::open(path.as_ref())?;
516 let mut reader = BufReader::new(file);
517 let _version = format::read_header(&mut reader, format::AOF_MAGIC)?;
518 Ok(Self { reader })
519 }
520
521 pub fn read_record(&mut self) -> Result<Option<AofRecord>, FormatError> {
527 let tag = match format::read_u8(&mut self.reader) {
529 Ok(t) => t,
530 Err(FormatError::UnexpectedEof) => return Ok(None),
531 Err(e) => return Err(e),
532 };
533
534 let record_result = self.read_payload_for_tag(tag);
537 match record_result {
538 Ok((payload, stored_crc)) => {
539 let mut full = Vec::with_capacity(1 + payload.len());
541 full.push(tag);
542 full.extend_from_slice(&payload);
543 format::verify_crc32(&full, stored_crc)?;
544 AofRecord::from_bytes(&full).map(Some)
545 }
546 Err(FormatError::UnexpectedEof) => Ok(None),
548 Err(e) => Err(e),
549 }
550 }
551
552 fn read_payload_for_tag(&mut self, tag: u8) -> Result<(Vec<u8>, u32), FormatError> {
554 let mut payload = Vec::new();
555 match tag {
556 TAG_SET => {
557 let key = format::read_bytes(&mut self.reader)?;
559 format::write_bytes(&mut payload, &key)?;
560 let value = format::read_bytes(&mut self.reader)?;
561 format::write_bytes(&mut payload, &value)?;
562 let expire_ms = format::read_i64(&mut self.reader)?;
563 format::write_i64(&mut payload, expire_ms)?;
564 }
565 TAG_DEL => {
566 let key = format::read_bytes(&mut self.reader)?;
567 format::write_bytes(&mut payload, &key)?;
568 }
569 TAG_EXPIRE => {
570 let key = format::read_bytes(&mut self.reader)?;
571 format::write_bytes(&mut payload, &key)?;
572 let seconds = format::read_i64(&mut self.reader)?;
573 format::write_i64(&mut payload, seconds)?;
574 }
575 TAG_LPUSH | TAG_RPUSH => {
576 let key = format::read_bytes(&mut self.reader)?;
577 format::write_bytes(&mut payload, &key)?;
578 let count = format::read_u32(&mut self.reader)?;
579 format::write_u32(&mut payload, count)?;
580 for _ in 0..count {
581 let val = format::read_bytes(&mut self.reader)?;
582 format::write_bytes(&mut payload, &val)?;
583 }
584 }
585 TAG_LPOP | TAG_RPOP => {
586 let key = format::read_bytes(&mut self.reader)?;
587 format::write_bytes(&mut payload, &key)?;
588 }
589 TAG_ZADD => {
590 let key = format::read_bytes(&mut self.reader)?;
591 format::write_bytes(&mut payload, &key)?;
592 let count = format::read_u32(&mut self.reader)?;
593 format::write_u32(&mut payload, count)?;
594 for _ in 0..count {
595 let score = format::read_f64(&mut self.reader)?;
596 format::write_f64(&mut payload, score)?;
597 let member = format::read_bytes(&mut self.reader)?;
598 format::write_bytes(&mut payload, &member)?;
599 }
600 }
601 TAG_ZREM => {
602 let key = format::read_bytes(&mut self.reader)?;
603 format::write_bytes(&mut payload, &key)?;
604 let count = format::read_u32(&mut self.reader)?;
605 format::write_u32(&mut payload, count)?;
606 for _ in 0..count {
607 let member = format::read_bytes(&mut self.reader)?;
608 format::write_bytes(&mut payload, &member)?;
609 }
610 }
611 TAG_PERSIST => {
612 let key = format::read_bytes(&mut self.reader)?;
613 format::write_bytes(&mut payload, &key)?;
614 }
615 TAG_PEXPIRE => {
616 let key = format::read_bytes(&mut self.reader)?;
617 format::write_bytes(&mut payload, &key)?;
618 let millis = format::read_i64(&mut self.reader)?;
619 format::write_i64(&mut payload, millis)?;
620 }
621 TAG_INCR | TAG_DECR => {
622 let key = format::read_bytes(&mut self.reader)?;
623 format::write_bytes(&mut payload, &key)?;
624 }
625 _ => return Err(FormatError::UnknownTag(tag)),
626 }
627 let stored_crc = format::read_u32(&mut self.reader)?;
628 Ok((payload, stored_crc))
629 }
630}
631
/// Returns the location of the AOF file for shard `shard_id` inside
/// `data_dir`, i.e. `<data_dir>/shard-<shard_id>.aof`.
pub fn aof_path(data_dir: &Path, shard_id: u16) -> PathBuf {
    let file_name = format!("shard-{shard_id}.aof");
    let mut path = data_dir.to_path_buf();
    path.push(file_name);
    path
}
636
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a scratch directory for a single test.
    fn temp_dir() -> tempfile::TempDir {
        tempfile::tempdir().expect("create temp dir")
    }

    /// Asserts that `rec` survives `to_bytes` followed by `from_bytes`.
    fn assert_round_trip(rec: AofRecord) {
        let bytes = rec.to_bytes().unwrap();
        let decoded = AofRecord::from_bytes(&bytes).unwrap();
        assert_eq!(rec, decoded);
    }

    /// Writes `records` to a new AOF at `path` and syncs it.
    fn write_records(path: &Path, records: &[AofRecord]) {
        let mut writer = AofWriter::open(path).unwrap();
        for rec in records {
            writer.write_record(rec).unwrap();
        }
        writer.sync().unwrap();
    }

    /// Reads every record of the AOF at `path`, panicking on any error.
    fn read_records(path: &Path) -> Vec<AofRecord> {
        let mut reader = AofReader::open(path).unwrap();
        let mut got = Vec::new();
        while let Some(rec) = reader.read_record().unwrap() {
            got.push(rec);
        }
        got
    }

    /// Builds a `Set` record without expiry.
    fn set_record(key: &str, value: &'static str) -> AofRecord {
        AofRecord::Set {
            key: key.into(),
            value: Bytes::from(value),
            expire_ms: -1,
        }
    }

    #[test]
    fn record_round_trip_set() {
        assert_round_trip(AofRecord::Set {
            key: "hello".into(),
            value: Bytes::from("world"),
            expire_ms: 5000,
        });
    }

    #[test]
    fn record_round_trip_del() {
        assert_round_trip(AofRecord::Del { key: "gone".into() });
    }

    #[test]
    fn record_round_trip_expire() {
        assert_round_trip(AofRecord::Expire {
            key: "ttl".into(),
            seconds: 300,
        });
    }

    #[test]
    fn set_with_no_expiry() {
        assert_round_trip(set_record("k", "v"));
    }

    #[test]
    fn writer_reader_round_trip() {
        let dir = temp_dir();
        let path = dir.path().join("test.aof");

        let records = vec![
            set_record("a", "1"),
            AofRecord::Set {
                key: "b".into(),
                value: Bytes::from("2"),
                expire_ms: 10_000,
            },
            AofRecord::Del { key: "a".into() },
            AofRecord::Expire {
                key: "b".into(),
                seconds: 60,
            },
        ];

        write_records(&path, &records);
        assert_eq!(records, read_records(&path));
    }

    #[test]
    fn empty_aof_returns_no_records() {
        let dir = temp_dir();
        let path = dir.path().join("empty.aof");

        {
            // Opening and dropping the writer leaves a header-only file.
            let _writer = AofWriter::open(&path).unwrap();
        }

        let mut reader = AofReader::open(&path).unwrap();
        assert!(reader.read_record().unwrap().is_none());
    }

    #[test]
    fn truncated_record_treated_as_eof() {
        let dir = temp_dir();
        let path = dir.path().join("trunc.aof");

        {
            let mut writer = AofWriter::open(&path).unwrap();
            writer.write_record(&set_record("ok", "good")).unwrap();
            writer.flush().unwrap();
        }

        {
            // Simulate a write that stopped right after the tag byte.
            let mut file = OpenOptions::new().append(true).open(&path).unwrap();
            file.write_all(&[TAG_SET]).unwrap();
        }

        let mut reader = AofReader::open(&path).unwrap();
        let rec = reader.read_record().unwrap().unwrap();
        assert!(matches!(rec, AofRecord::Set { .. }));
        assert!(reader.read_record().unwrap().is_none());
    }

    #[test]
    fn corrupt_crc_detected() {
        let dir = temp_dir();
        let path = dir.path().join("corrupt.aof");

        {
            let mut writer = AofWriter::open(&path).unwrap();
            writer.write_record(&set_record("k", "v")).unwrap();
            writer.flush().unwrap();
        }

        // Flip the last byte of the file, which belongs to the stored CRC.
        let mut data = fs::read(&path).unwrap();
        let last = data.len() - 1;
        data[last] ^= 0xFF;
        fs::write(&path, &data).unwrap();

        let mut reader = AofReader::open(&path).unwrap();
        let err = reader.read_record().unwrap_err();
        assert!(matches!(err, FormatError::ChecksumMismatch { .. }));
    }

    #[test]
    fn missing_magic_is_error() {
        let dir = temp_dir();
        let path = dir.path().join("bad.aof");
        fs::write(&path, b"NOT_AOF_DATA").unwrap();

        let err = AofReader::open(&path).unwrap_err();
        assert!(matches!(err, FormatError::InvalidMagic));
    }

    #[test]
    fn truncate_resets_aof() {
        let dir = temp_dir();
        let path = dir.path().join("reset.aof");

        {
            let mut writer = AofWriter::open(&path).unwrap();
            writer.write_record(&set_record("old", "data")).unwrap();
            writer.truncate().unwrap();

            writer.write_record(&set_record("new", "fresh")).unwrap();
            writer.sync().unwrap();
        }

        let mut reader = AofReader::open(&path).unwrap();
        let rec = reader.read_record().unwrap().unwrap();
        match rec {
            AofRecord::Set { key, .. } => assert_eq!(key, "new"),
            other => panic!("expected Set, got {other:?}"),
        }
        assert!(reader.read_record().unwrap().is_none());
    }

    #[test]
    fn record_round_trip_lpush() {
        assert_round_trip(AofRecord::LPush {
            key: "list".into(),
            values: vec![Bytes::from("a"), Bytes::from("b")],
        });
    }

    #[test]
    fn record_round_trip_rpush() {
        assert_round_trip(AofRecord::RPush {
            key: "list".into(),
            values: vec![Bytes::from("x")],
        });
    }

    #[test]
    fn record_round_trip_lpop() {
        assert_round_trip(AofRecord::LPop { key: "list".into() });
    }

    #[test]
    fn record_round_trip_rpop() {
        assert_round_trip(AofRecord::RPop { key: "list".into() });
    }

    #[test]
    fn writer_reader_round_trip_with_list_records() {
        let dir = temp_dir();
        let path = dir.path().join("list.aof");

        let records = vec![
            AofRecord::LPush {
                key: "l".into(),
                values: vec![Bytes::from("a"), Bytes::from("b")],
            },
            AofRecord::RPush {
                key: "l".into(),
                values: vec![Bytes::from("c")],
            },
            AofRecord::LPop { key: "l".into() },
            AofRecord::RPop { key: "l".into() },
        ];

        write_records(&path, &records);
        assert_eq!(records, read_records(&path));
    }

    #[test]
    fn record_round_trip_zadd() {
        assert_round_trip(AofRecord::ZAdd {
            key: "board".into(),
            members: vec![(100.0, "alice".into()), (200.5, "bob".into())],
        });
    }

    #[test]
    fn record_round_trip_zrem() {
        assert_round_trip(AofRecord::ZRem {
            key: "board".into(),
            members: vec!["alice".into(), "bob".into()],
        });
    }

    #[test]
    fn writer_reader_round_trip_with_sorted_set_records() {
        let dir = temp_dir();
        let path = dir.path().join("zset.aof");

        let records = vec![
            AofRecord::ZAdd {
                key: "board".into(),
                members: vec![(100.0, "alice".into()), (200.0, "bob".into())],
            },
            AofRecord::ZRem {
                key: "board".into(),
                members: vec!["alice".into()],
            },
        ];

        write_records(&path, &records);
        assert_eq!(records, read_records(&path));
    }

    #[test]
    fn record_round_trip_persist() {
        assert_round_trip(AofRecord::Persist {
            key: "mykey".into(),
        });
    }

    #[test]
    fn record_round_trip_pexpire() {
        assert_round_trip(AofRecord::Pexpire {
            key: "mykey".into(),
            milliseconds: 5000,
        });
    }

    #[test]
    fn record_round_trip_incr() {
        assert_round_trip(AofRecord::Incr {
            key: "counter".into(),
        });
    }

    #[test]
    fn record_round_trip_decr() {
        assert_round_trip(AofRecord::Decr {
            key: "counter".into(),
        });
    }

    #[test]
    fn writer_reader_round_trip_with_persist_pexpire() {
        let dir = temp_dir();
        let path = dir.path().join("persist_pexpire.aof");

        let records = vec![
            AofRecord::Set {
                key: "k".into(),
                value: Bytes::from("v"),
                expire_ms: 5000,
            },
            AofRecord::Persist { key: "k".into() },
            AofRecord::Pexpire {
                key: "k".into(),
                milliseconds: 3000,
            },
        ];

        write_records(&path, &records);
        assert_eq!(records, read_records(&path));
    }

    #[test]
    fn aof_path_format() {
        let p = aof_path(Path::new("/data"), 3);
        assert_eq!(p, PathBuf::from("/data/shard-3.aof"));
    }

    #[test]
    fn record_round_trip_hset() {
        assert_round_trip(AofRecord::HSet {
            key: "hash".into(),
            fields: vec![
                ("f1".into(), Bytes::from("v1")),
                ("f2".into(), Bytes::from("v2")),
            ],
        });
    }

    #[test]
    fn record_round_trip_hdel() {
        assert_round_trip(AofRecord::HDel {
            key: "hash".into(),
            fields: vec!["f1".into(), "f2".into()],
        });
    }

    #[test]
    fn record_round_trip_hincrby() {
        assert_round_trip(AofRecord::HIncrBy {
            key: "hash".into(),
            field: "counter".into(),
            delta: -42,
        });
    }

    #[test]
    fn record_round_trip_sadd() {
        assert_round_trip(AofRecord::SAdd {
            key: "set".into(),
            members: vec!["m1".into(), "m2".into(), "m3".into()],
        });
    }

    #[test]
    fn record_round_trip_srem() {
        assert_round_trip(AofRecord::SRem {
            key: "set".into(),
            members: vec!["m1".into()],
        });
    }

    #[test]
    fn record_round_trip_incrby() {
        assert_round_trip(AofRecord::IncrBy {
            key: "counter".into(),
            delta: 7,
        });
    }

    #[test]
    fn record_round_trip_decrby() {
        assert_round_trip(AofRecord::DecrBy {
            key: "counter".into(),
            delta: -3,
        });
    }

    #[test]
    fn record_round_trip_append() {
        assert_round_trip(AofRecord::Append {
            key: "log".into(),
            value: Bytes::from("tail"),
        });
    }

    #[test]
    fn record_round_trip_rename() {
        assert_round_trip(AofRecord::Rename {
            key: "old".into(),
            newkey: "new".into(),
        });
    }

    #[test]
    fn unknown_tag_is_error() {
        // Tag 0 is not assigned to any record variant.
        let err = AofRecord::from_bytes(&[0]).unwrap_err();
        assert!(matches!(err, FormatError::UnknownTag(0)));
    }
}