// spatio/storage/backends/aof.rs
use super::MemoryBackend;
use crate::config::{DbItem, SetOptions};
use crate::error::Result;
use crate::storage::persistence::AOFCommand;
use crate::storage::{StorageBackend, StorageOp, StorageStats};
use bytes::Bytes;
use std::collections::BTreeMap;
use std::time::SystemTime;

12pub struct AOFBackend {
14 memory: MemoryBackend,
15 aof_writer: crate::storage::AOFFile,
16}
17
18impl AOFBackend {
19 pub fn new<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
21 let aof_writer = crate::storage::AOFFile::open(path)?;
22 let memory = MemoryBackend::new();
23
24 Ok(Self { memory, aof_writer })
25 }
26
27 pub fn load_from_aof(&mut self) -> Result<()> {
29 let commands = self.aof_writer.replay()?;
30 self.memory = MemoryBackend::new();
31
32 for command in commands {
33 match command {
34 AOFCommand::Set {
35 key,
36 value,
37 created_at,
38 expires_at,
39 } => {
40 let item = DbItem {
41 value,
42 created_at,
43 expires_at,
44 };
45 self.memory.put(key.as_ref(), &item)?;
46 }
47 AOFCommand::Delete { key } => {
48 let _ = self.memory.delete(key.as_ref())?;
49 }
50 }
51 }
52
53 Ok(())
54 }
55}
56
57impl StorageBackend for AOFBackend {
58 fn put(&mut self, key: &[u8], item: &DbItem) -> Result<()> {
59 let opts = item.expires_at.map(SetOptions::with_expiration);
60 self.aof_writer.write_set(
61 &Bytes::copy_from_slice(key),
62 &item.value,
63 opts.as_ref(),
64 item.created_at,
65 )?;
66
67 self.memory.put(key, item)
68 }
69
70 fn get(&self, key: &[u8]) -> Result<Option<DbItem>> {
71 self.memory.get(key)
72 }
73
74 fn delete(&mut self, key: &[u8]) -> Result<Option<DbItem>> {
75 self.aof_writer.write_delete(&Bytes::copy_from_slice(key))?;
76
77 self.memory.delete(key)
78 }
79
80 fn contains_key(&self, key: &[u8]) -> Result<bool> {
81 self.memory.contains_key(key)
82 }
83
84 fn keys_with_prefix(&self, prefix: &[u8]) -> Result<Vec<Bytes>> {
85 self.memory.keys_with_prefix(prefix)
86 }
87
88 fn scan_prefix(&self, prefix: &[u8]) -> Result<BTreeMap<Bytes, DbItem>> {
89 self.memory.scan_prefix(prefix)
90 }
91
92 fn len(&self) -> Result<usize> {
93 self.memory.len()
94 }
95
96 fn is_empty(&self) -> Result<bool> {
97 self.memory.is_empty()
98 }
99
100 fn sync(&mut self) -> Result<()> {
101 self.aof_writer.sync()
102 }
103
104 fn close(&mut self) -> Result<()> {
105 self.aof_writer.sync()?;
106 self.memory.close()
107 }
108
109 fn stats(&self) -> Result<StorageStats> {
110 self.memory.stats()
111 }
112
113 fn batch(&mut self, ops: &[StorageOp]) -> Result<()> {
114 for op in ops {
115 match op {
116 StorageOp::Put { key, item } => {
117 let opts = item.expires_at.map(SetOptions::with_expiration);
118 self.aof_writer
119 .write_set(key, &item.value, opts.as_ref(), item.created_at)?;
120 }
121 StorageOp::Delete { key } => {
122 self.aof_writer.write_delete(key)?;
123 }
124 }
125 }
126
127 self.memory.batch(ops)
128 }
129
130 fn iter(&self) -> Result<Box<dyn Iterator<Item = (Bytes, DbItem)> + '_>> {
131 self.memory.iter()
132 }
133
134 fn cleanup_expired(&mut self, now: SystemTime) -> Result<usize> {
135 let expired_keys = {
136 let mut keys = Vec::new();
137 for (key, item) in self.memory.iter()? {
138 if let Some(expires_at) = item.expires_at
139 && expires_at <= now
140 {
141 keys.push(key);
142 }
143 }
144 keys
145 };
146
147 for key in &expired_keys {
148 self.aof_writer.write_delete(key)?;
149 }
150
151 self.memory.cleanup_expired(now)
152 }
153}