spatio/storage/backends/
aof.rs

1//! Persistent storage backend using AOF (Append-Only File).
2
3use super::MemoryBackend;
4use crate::config::{DbItem, SetOptions};
5use crate::error::Result;
6use crate::storage::persistence::AOFCommand;
7use crate::storage::{StorageBackend, StorageOp, StorageStats};
8use bytes::Bytes;
9use std::collections::BTreeMap;
10use std::time::SystemTime;
11
12/// Persistent storage backend using AOF (Append-Only File)
13pub struct AOFBackend {
14    memory: MemoryBackend,
15    aof_writer: crate::storage::AOFFile,
16}
17
18impl AOFBackend {
19    /// Create a new AOF storage backend
20    pub fn new<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
21        let aof_writer = crate::storage::AOFFile::open(path)?;
22        let memory = MemoryBackend::new();
23
24        Ok(Self { memory, aof_writer })
25    }
26
27    /// Load existing data from AOF file
28    pub fn load_from_aof(&mut self) -> Result<()> {
29        let commands = self.aof_writer.replay()?;
30        self.memory = MemoryBackend::new();
31
32        for command in commands {
33            match command {
34                AOFCommand::Set {
35                    key,
36                    value,
37                    created_at,
38                    expires_at,
39                } => {
40                    let item = DbItem {
41                        value,
42                        created_at,
43                        expires_at,
44                    };
45                    self.memory.put(key.as_ref(), &item)?;
46                }
47                AOFCommand::Delete { key } => {
48                    let _ = self.memory.delete(key.as_ref())?;
49                }
50            }
51        }
52
53        Ok(())
54    }
55}
56
57impl StorageBackend for AOFBackend {
58    fn put(&mut self, key: &[u8], item: &DbItem) -> Result<()> {
59        let opts = item.expires_at.map(SetOptions::with_expiration);
60        self.aof_writer.write_set(
61            &Bytes::copy_from_slice(key),
62            &item.value,
63            opts.as_ref(),
64            item.created_at,
65        )?;
66
67        self.memory.put(key, item)
68    }
69
70    fn get(&self, key: &[u8]) -> Result<Option<DbItem>> {
71        self.memory.get(key)
72    }
73
74    fn delete(&mut self, key: &[u8]) -> Result<Option<DbItem>> {
75        self.aof_writer.write_delete(&Bytes::copy_from_slice(key))?;
76
77        self.memory.delete(key)
78    }
79
80    fn contains_key(&self, key: &[u8]) -> Result<bool> {
81        self.memory.contains_key(key)
82    }
83
84    fn keys_with_prefix(&self, prefix: &[u8]) -> Result<Vec<Bytes>> {
85        self.memory.keys_with_prefix(prefix)
86    }
87
88    fn scan_prefix(&self, prefix: &[u8]) -> Result<BTreeMap<Bytes, DbItem>> {
89        self.memory.scan_prefix(prefix)
90    }
91
92    fn len(&self) -> Result<usize> {
93        self.memory.len()
94    }
95
96    fn is_empty(&self) -> Result<bool> {
97        self.memory.is_empty()
98    }
99
100    fn sync(&mut self) -> Result<()> {
101        self.aof_writer.sync()
102    }
103
104    fn close(&mut self) -> Result<()> {
105        self.aof_writer.sync()?;
106        self.memory.close()
107    }
108
109    fn stats(&self) -> Result<StorageStats> {
110        self.memory.stats()
111    }
112
113    fn batch(&mut self, ops: &[StorageOp]) -> Result<()> {
114        for op in ops {
115            match op {
116                StorageOp::Put { key, item } => {
117                    let opts = item.expires_at.map(SetOptions::with_expiration);
118                    self.aof_writer
119                        .write_set(key, &item.value, opts.as_ref(), item.created_at)?;
120                }
121                StorageOp::Delete { key } => {
122                    self.aof_writer.write_delete(key)?;
123                }
124            }
125        }
126
127        self.memory.batch(ops)
128    }
129
130    fn iter(&self) -> Result<Box<dyn Iterator<Item = (Bytes, DbItem)> + '_>> {
131        self.memory.iter()
132    }
133
134    fn cleanup_expired(&mut self, now: SystemTime) -> Result<usize> {
135        let expired_keys = {
136            let mut keys = Vec::new();
137            for (key, item) in self.memory.iter()? {
138                if let Some(expires_at) = item.expires_at
139                    && expires_at <= now
140                {
141                    keys.push(key);
142                }
143            }
144            keys
145        };
146
147        for key in &expired_keys {
148            self.aof_writer.write_delete(key)?;
149        }
150
151        self.memory.cleanup_expired(now)
152    }
153}