1use std::collections::HashMap;
4use std::path::Path;
5use std::sync::Arc;
6use tokio::fs::{File, OpenOptions};
7use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
8use tokio::sync::RwLock;
9use tracing::{debug, error, warn};
10
11use crate::{
12 KVError, KVResult, Key, Entry, DatabaseId, Storage, StorageStats,
13};
14
15#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
17enum AOFOperation {
18 Set { key: Key, entry: Entry },
19 Delete { key: Key },
20 Clear,
21}
22
23pub struct AOFStorage {
25 cache: Arc<RwLock<HashMap<DatabaseId, HashMap<Key, Entry>>>>,
27 aof_path: String,
29 writer: Arc<RwLock<Option<BufWriter<File>>>>,
31 sync_interval: u64,
33 sync_handle: Option<tokio::task::JoinHandle<()>>,
35}
36
37impl AOFStorage {
38 pub async fn new<P: AsRef<Path>>(aof_path: P) -> KVResult<Self> {
43 let aof_path = aof_path.as_ref().to_string_lossy().to_string();
44
45 if let Some(parent) = Path::new(&aof_path).parent() {
47 tokio::fs::create_dir_all(parent).await
48 .map_err(|e| KVError::Storage(format!("Failed to create AOF directory: {e}")))?;
49 }
50
51 let mut storage = Self {
52 cache: Arc::new(RwLock::new(HashMap::new())),
53 aof_path: aof_path.clone(),
54 writer: Arc::new(RwLock::new(None)),
55 sync_interval: 1, sync_handle: None,
57 };
58
59 storage.load_from_aof().await?;
61
62 storage.open_writer().await?;
64
65 storage.start_sync_task();
67
68 Ok(storage)
69 }
70
71 async fn load_from_aof(&self) -> KVResult<()> {
73 if !Path::new(&self.aof_path).exists() {
74 debug!("AOF file does not exist, starting fresh");
75 return Ok(());
76 }
77
78 debug!("Loading data from AOF file: {}", self.aof_path);
79
80 let file = File::open(&self.aof_path).await
81 .map_err(|e| KVError::Storage(format!("Failed to open AOF file: {e}")))?;
82
83 let reader = BufReader::new(file);
84 let mut lines = reader.lines();
85 let mut cache = self.cache.write().await;
86
87 let mut line_count = 0;
88 while let Some(line) = lines.next_line().await
89 .map_err(|e| KVError::Storage(format!("Failed to read AOF line: {e}")))?
90 {
91 line_count += 1;
92
93 if line.trim().is_empty() {
94 continue;
95 }
96
97 match serde_json::from_str::<AOFOperation>(&line) {
98 Ok(op) => {
99 match op {
100 AOFOperation::Set { key, entry } => {
101 let db = cache.entry(0).or_insert_with(HashMap::new);
105 db.insert(key, entry);
106 }
107 AOFOperation::Delete { key } => {
108 if let Some(db) = cache.get_mut(&0) {
109 db.remove(&key);
110 }
111 }
112 AOFOperation::Clear => {
113 if let Some(db) = cache.get_mut(&0) {
114 db.clear();
115 }
116 }
117 }
118 }
119 Err(e) => {
120 warn!("Failed to parse AOF line {}: {}", line_count, e);
121 }
123 }
124 }
125
126 debug!("Loaded {} lines from AOF file", line_count);
127 Ok(())
128 }
129
130 #[allow(clippy::significant_drop_tightening)]
132 async fn open_writer(&self) -> KVResult<()> {
133 let file = OpenOptions::new()
134 .create(true)
135 .append(true)
136 .open(&self.aof_path)
137 .await
138 .map_err(|e| KVError::Storage(format!("Failed to open AOF file for writing: {e}")))?;
139
140 let writer = BufWriter::new(file);
141 let mut writer_guard = self.writer.write().await;
142 *writer_guard = Some(writer);
143
144 Ok(())
145 }
146
147 #[allow(clippy::significant_drop_tightening)]
149 async fn write_operation(&self, operation: AOFOperation) -> KVResult<()> {
150 let operation_json = serde_json::to_string(&operation)
151 .map_err(KVError::Serialization)?;
152
153 let mut writer_guard = self.writer.write().await;
154 if let Some(writer) = writer_guard.as_mut() {
155 writer.write_all(operation_json.as_bytes()).await
156 .map_err(|e| KVError::Storage(format!("Failed to write to AOF: {e}")))?;
157 writer.write_all(b"\n").await
158 .map_err(|e| KVError::Storage(format!("Failed to write newline to AOF: {e}")))?;
159 } else {
160 return Err(KVError::Storage("AOF writer not available".to_string()));
161 }
162
163 Ok(())
164 }
165
166 fn start_sync_task(&mut self) {
168 let writer = Arc::clone(&self.writer);
169 let sync_interval = self.sync_interval;
170
171 let handle = tokio::spawn(async move {
172 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(sync_interval));
173
174 loop {
175 interval.tick().await;
176
177 let mut writer_guard = writer.write().await;
178 if let Some(writer) = writer_guard.as_mut()
179 && let Err(e) = writer.flush().await {
180 error!("Failed to flush AOF: {}", e);
181 }
182 }
183 });
184
185 self.sync_handle = Some(handle);
186 }
187
188 fn _stop_sync_task(&mut self) {
190 if let Some(handle) = self.sync_handle.take() {
191 handle.abort();
192 }
193 }
194
195 #[allow(clippy::significant_drop_tightening)]
197 async fn force_sync(&self) -> KVResult<()> {
198 let mut writer_guard = self.writer.write().await;
199 if let Some(writer) = writer_guard.as_mut() {
200 writer.flush().await
201 .map_err(|e| KVError::Storage(format!("Failed to flush AOF: {e}")))?;
202 }
203 Ok(())
204 }
205}
206
207impl Drop for AOFStorage {
208 fn drop(&mut self) {
209 }
212}
213
214#[async_trait::async_trait]
215impl Storage for AOFStorage {
216 #[allow(clippy::significant_drop_tightening, clippy::option_if_let_else)]
217 async fn get(&self, database_id: DatabaseId, key: &Key) -> KVResult<Option<Entry>> {
218 let cache = self.cache.read().await;
219 if let Some(db) = cache.get(&database_id) {
220 Ok(db.get(key).cloned())
221 } else {
222 Ok(None)
223 }
224 }
225
226 #[allow(clippy::significant_drop_tightening)]
227 async fn set(&self, database_id: DatabaseId, key: Key, entry: Entry) -> KVResult<()> {
228 {
230 let mut cache = self.cache.write().await;
231 let db = cache.entry(database_id).or_insert_with(HashMap::new);
232 db.insert(key.clone(), entry.clone());
233 }
234
235 let operation = AOFOperation::Set { key, entry };
237 self.write_operation(operation).await?;
238
239 Ok(())
240 }
241
242 #[allow(clippy::significant_drop_tightening, clippy::option_if_let_else)]
243 async fn delete(&self, database_id: DatabaseId, key: &Key) -> KVResult<bool> {
244 let existed = {
246 let mut cache = self.cache.write().await;
247 if let Some(db) = cache.get_mut(&database_id) {
248 db.remove(key).is_some()
249 } else {
250 false
251 }
252 };
253
254 if existed {
255 let operation = AOFOperation::Delete { key: key.clone() };
257 self.write_operation(operation).await?;
258 }
259
260 Ok(existed)
261 }
262
263 #[allow(clippy::significant_drop_tightening, clippy::option_if_let_else)]
264 async fn exists(&self, database_id: DatabaseId, key: &Key) -> KVResult<bool> {
265 let cache = self.cache.read().await;
266 if let Some(db) = cache.get(&database_id) {
267 Ok(db.contains_key(key))
268 } else {
269 Ok(false)
270 }
271 }
272
273 #[allow(clippy::significant_drop_tightening, clippy::option_if_let_else)]
274 async fn keys(&self, database_id: DatabaseId) -> KVResult<Vec<Key>> {
275 let cache = self.cache.read().await;
276 if let Some(db) = cache.get(&database_id) {
277 Ok(db.keys().cloned().collect())
278 } else {
279 Ok(Vec::new())
280 }
281 }
282
283 #[allow(clippy::significant_drop_tightening, clippy::option_if_let_else)]
284 async fn keys_pattern(&self, database_id: DatabaseId, pattern: &str) -> KVResult<Vec<Key>> {
285 let cache = self.cache.read().await;
286 if let Some(db) = cache.get(&database_id) {
287 let keys: Vec<Key> = db.keys()
288 .filter(|key| matches_pattern(key, pattern))
289 .cloned()
290 .collect();
291 Ok(keys)
292 } else {
293 Ok(Vec::new())
294 }
295 }
296
297 async fn clear_database(&self, database_id: DatabaseId) -> KVResult<()> {
298 {
300 let mut cache = self.cache.write().await;
301 if let Some(db) = cache.get_mut(&database_id) {
302 db.clear();
303 }
304 }
305
306 let operation = AOFOperation::Clear;
308 self.write_operation(operation).await?;
309
310 Ok(())
311 }
312
313 async fn get_stats(&self, database_id: DatabaseId) -> KVResult<StorageStats> {
314 let cache = self.cache.read().await;
315 if let Some(db) = cache.get(&database_id) {
316 let total_keys = db.len() as u64;
317 let memory_usage = std::mem::size_of_val(db) as u64;
318
319 let disk_usage = tokio::fs::metadata(&self.aof_path).await
321 .map(|m| m.len())
322 .unwrap_or(0);
323
324 Ok(StorageStats {
325 total_keys,
326 memory_usage,
327 disk_usage: Some(disk_usage),
328 last_flush: None, })
330 } else {
331 Ok(StorageStats {
332 total_keys: 0,
333 memory_usage: 0,
334 disk_usage: Some(0),
335 last_flush: None,
336 })
337 }
338 }
339
340 async fn flush(&self) -> KVResult<()> {
341 self.force_sync().await?;
342 debug!("AOF storage flushed");
343 Ok(())
344 }
345
346 async fn close(&self) -> KVResult<()> {
347 self.force_sync().await?;
348 debug!("AOF storage closed");
349 Ok(())
350 }
351}
352
/// Tests `key` against a glob-style `pattern` in which `*` matches any run of
/// characters (including none). No other wildcard is interpreted; a pattern
/// without `*` must equal the key exactly.
///
/// The text before the first `*` must be a prefix of the key, the text after
/// the last `*` must be a suffix of what remains (so prefix and suffix may not
/// overlap), and every piece between two `*` must occur, in order, in between.
fn matches_pattern(key: &str, pattern: &str) -> bool {
    // No wildcard at all: plain equality.
    let (prefix, rest) = match pattern.split_once('*') {
        Some(parts) => parts,
        None => return key == pattern,
    };

    // Anchor the leading literal.
    let remaining = match key.strip_prefix(prefix) {
        Some(remaining) => remaining,
        None => return false,
    };

    // Split what follows the first `*` into interior pieces and the trailing
    // literal after the last `*`.
    let (middle, suffix) = match rest.rsplit_once('*') {
        Some(parts) => parts,
        None => ("", rest),
    };

    // Anchor the trailing literal on what is left after the prefix, which
    // rejects keys too short to hold both.
    let mut remaining = match remaining.strip_suffix(suffix) {
        Some(remaining) => remaining,
        None => return false,
    };

    // Interior pieces must appear left to right; taking the leftmost match
    // each time leaves the most room for the pieces that follow.
    for piece in middle.split('*').filter(|piece| !piece.is_empty()) {
        match remaining.find(piece) {
            Some(at) => remaining = &remaining[at + piece.len()..],
            None => return false,
        }
    }

    true
}
381
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Value, Entry};
    use tempfile::TempDir;

    /// Builds an entry holding `text` as a string value.
    fn text_entry(text: &str) -> Entry {
        Entry::new(Value::String(text.to_string()), None)
    }

    /// Set, get, exists and delete round-trip on a fresh storage.
    #[tokio::test]
    async fn test_aof_storage_basic_operations() {
        let scratch = TempDir::new().unwrap();
        let log_path = scratch.path().join("test.aof");
        let storage = AOFStorage::new(&log_path).await.unwrap();
        let db = 0;
        let key = "test_key".to_string();

        storage.set(db, key.clone(), text_entry("test_value")).await.unwrap();

        let fetched = storage.get(db, &key).await.unwrap();
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().value.as_string().unwrap(), "test_value");

        let present = storage.exists(db, &key).await.unwrap();
        assert!(present);

        let removed = storage.delete(db, &key).await.unwrap();
        assert!(removed);

        let present_after = storage.exists(db, &key).await.unwrap();
        assert!(!present_after);
    }

    /// Data written and flushed by one instance is visible to a later
    /// instance opened on the same file.
    #[tokio::test]
    async fn test_aof_storage_persistence() {
        let scratch = TempDir::new().unwrap();
        let log_path = scratch.path().join("persistent.aof");
        let key = "persistent_key".to_string();

        // First instance: write, flush, then drop at the end of the scope.
        {
            let writer_side = AOFStorage::new(&log_path).await.unwrap();
            writer_side
                .set(0, key.clone(), text_entry("persistent_value"))
                .await
                .unwrap();
            writer_side.flush().await.unwrap();
        }

        // Second instance: the value must come back from the log.
        {
            let reader_side = AOFStorage::new(&log_path).await.unwrap();
            let fetched = reader_side.get(0, &key).await.unwrap();
            assert!(fetched.is_some());
            assert_eq!(fetched.unwrap().value.as_string().unwrap(), "persistent_value");
        }
    }

    /// Stats reflect the number of keys and a non-empty log after a flush.
    #[tokio::test]
    async fn test_aof_storage_stats() {
        let scratch = TempDir::new().unwrap();
        let log_path = scratch.path().join("stats.aof");
        let storage = AOFStorage::new(&log_path).await.unwrap();

        for name in ["key1", "key2"] {
            storage.set(0, name.to_string(), text_entry("value")).await.unwrap();
        }

        storage.flush().await.unwrap();

        let stats = storage.get_stats(0).await.unwrap();
        assert_eq!(stats.total_keys, 2);
        assert!(stats.memory_usage > 0);
        assert!(stats.disk_usage.is_some());
        assert!(stats.disk_usage.unwrap() > 0);
    }
}