1use async_channel as mpsc;
2pub use async_kvdb::*;
3use async_memorydb::MenoryDb;
4use core::sync::atomic::{AtomicBool, Ordering::Relaxed};
5use std::collections::HashMap;
6use std::io::Write;
7use std::path::{Path, PathBuf};
8use std::{fs, io, thread};
9
10fn key2file(key: &Key) -> String {
11 form_urlencoded::Serializer::new(String::new()).append_key_only(key).finish()
12}
13fn file2key(name: &str) -> Option<Key> {
14 let mut parse = form_urlencoded::parse(name.as_bytes());
15 let data = parse.next()?;
16 Some(data.0.into())
17}
18
19fn load_item(path: &Path, key: &Key) -> Option<Value> {
20 let name = key2file(key);
21 let path = path.join(name);
22 let value: Value = fs::read(path).ok()?.into();
23 Some(value)
24}
25fn load_all(path: &Path) -> HashMap<Key, Value> {
26 let Ok(entrys) = fs::read_dir(path) else {
27 return HashMap::new();
28 };
29 entrys
30 .filter_map(|entry| {
31 let file = entry.ok()?.path();
32 if !file.is_file() {
33 return None;
34 }
35 let name = file.file_name()?;
36 let name = name.to_string_lossy().into_owned();
37 let key = file2key(&name)?;
38 let value = fs::read(file).ok()?;
39 Some((key, value.into()))
40 })
41 .collect()
42}
43
44fn fsdb_exec(path: &Path, op: DbOp) -> io::Result<()> {
45 match op {
46 DbOp::Get { key, ch } => {
47 let value = load_item(path, &key);
48 let _ = ch.send(value);
49 }
50 DbOp::GetMany { keys, ch } => {
51 let values = keys
52 .into_iter()
53 .filter_map(|key| {
54 let value = load_item(path, &key)?;
55 Some((key, value))
56 })
57 .collect();
58 let _ = ch.send(values);
59 }
60 DbOp::GetAll { ch } => {
61 let value = load_all(path);
62 let _ = ch.send(value);
63 }
64 DbOp::Insert { key, value } => {
65 let name = key2file(&key);
66 let path = path.join(name);
67 let mut f = fs::File::create(path)?;
68 let _ = f.write_all(&value);
69 #[cfg(feature = "auto_sync")]
70 {
71 let _ = f.sync_data();
72 }
73 }
74 DbOp::InsertMany { data } => {
75 for (key, value) in data {
76 let name = key2file(&key);
77 let path = path.join(name);
78 let mut f = fs::File::create(path)?;
79 let _ = f.write_all(&value);
80 #[cfg(feature = "auto_sync")]
81 {
82 let _ = f.sync_data();
83 }
84 }
85 }
86 DbOp::Delete { key } => {
87 let name = key2file(&key);
88 let path = path.join(name);
89 let _ = fs::remove_file(path);
90 }
91 DbOp::DeleteMany { keys } => {
92 for entry in fs::read_dir(path)? {
93 let file = entry?.path();
94 if file.is_file() {
95 if let Some(name) = file.file_name() {
96 let name = name.to_string_lossy();
97 if keys.iter().any(|k| key2file(k) == name) {
98 let _ = fs::remove_file(file);
99 }
100 }
101 }
102 }
103 }
104 DbOp::DeleteAll => {
105 for entry in fs::read_dir(path)? {
106 let file = entry?.path();
107 if file.is_file() {
108 let _ = fs::remove_file(file);
109 }
110 }
111 }
112 };
113 Ok(())
114}
115
116pub struct FileDb {
117 cached_all: AtomicBool,
118 mem: MenoryDb,
119 write_ch: mpsc::Sender<DbOp>,
120}
121impl FileDb {
122 pub fn new(path: String, interval_ms: u64) -> io::Result<Self> {
126 let thread_name = format!("db-{}", &path[path.len().saturating_sub(12)..]);
127 let path = PathBuf::from(path);
128 let mem = HashMap::new();
129 fs::create_dir_all(&path)?;
130 let (tx, receiver) = mpsc::unbounded();
131 thread::Builder::new().name(thread_name).spawn(move || {
132 let mut op_merger = DbOpMerger::new();
133 loop {
134 std::thread::sleep(std::time::Duration::from_millis(interval_ms));
135 if let Ok(op) = receiver.recv_blocking() {
136 op_merger.merge(op);
137 }
138 while let Ok(op) = receiver.try_recv() {
139 op_merger.merge(op);
140 }
141 if op_merger.is_empty() {
142 continue;
143 }
144 let ops = op_merger.into_ops();
145 op_merger = DbOpMerger::new();
146 if ops.clear {
147 let _ = fsdb_exec(&path, DbOp::DeleteAll);
148 }
149 if !ops.insert.is_empty() {
151 let _ = fsdb_exec(&path, DbOp::InsertMany { data: ops.insert });
152 }
153 if !ops.delete.is_empty() {
154 let _ = fsdb_exec(&path, DbOp::DeleteMany { keys: ops.delete });
155 }
156 for (key, chs) in ops.get_one {
158 let value = load_item(&path, &key);
159 for ch in chs {
160 let _ = ch.send(value.clone());
161 }
162 }
163 for op in ops.get_many {
164 let _ = fsdb_exec(&path, op);
165 }
166 for ch in ops.get_all {
167 let value = load_all(&path);
168 let _ = ch.send(value.clone());
169 }
170 }
171 })?;
172 Ok(Self {
173 cached_all: AtomicBool::new(false),
174 mem: MenoryDb::new(mem),
175 write_ch: tx,
176 })
177 }
178 pub async fn clear_cache(&self) {
179 self.cached_all.store(false, Relaxed);
180 self.mem.delete_all().await;
181 }
182 pub async fn load_all(&self) {
183 let (tx, rx) = oneshot::async_channel();
184 let _ = self.write_ch.send(DbOp::GetAll { ch: tx }).await;
185 let Ok(data) = rx.await else { return };
186 self.mem.set_many(data).await;
187 self.cached_all.store(true, Relaxed);
188 }
189}
190
191#[async_trait]
192impl Kvdb for FileDb {
193 async fn scan_keys(&self, filter: &Filter) -> Vec<Key> {
194 if !self.cached_all.load(Relaxed) {
195 self.load_all().await;
196 }
197 self.mem.scan_keys(filter).await
198 }
199 async fn get(&self, key: Key) -> Option<Value> {
200 let ret = self.mem.get(key.clone()).await;
201 if ret.is_some() {
202 return ret;
203 }
204 let (tx, rx) = oneshot::async_channel();
205 let _ = self.write_ch.send(DbOp::Get { key: key.clone(), ch: tx }).await;
206 let value = rx.await.ok()?;
207 if let Some(value) = &value {
208 self.mem.set(key, value.clone()).await;
209 } else {
210 self.mem.delete(key).await;
211 }
212 value
213 }
214 async fn get_many(&self, keys: Vec<Key>) -> HashMap<Key, Value> {
215 if !self.cached_all.load(Relaxed) {
216 self.load_all().await;
217 }
218 self.mem.get_many(keys).await
219 }
220 async fn set(&self, key: Key, value: Value) {
221 self.mem.set(key.clone(), value.clone()).await;
222 let _ = self.write_ch.send(DbOp::Insert { key, value }).await;
223 }
224 async fn set_many(&self, data: HashMap<Key, Value>) {
225 self.mem.set_many(data.clone()).await;
226 let _ = self.write_ch.send(DbOp::InsertMany { data }).await;
227 }
228 async fn delete(&self, key: Key) {
229 self.mem.delete(key.clone()).await;
230 let _ = self.write_ch.send(DbOp::Delete { key }).await;
231 }
232 async fn delete_many(&self, keys: Vec<Key>) {
233 self.mem.delete_many(keys.clone()).await;
234 let _ = self.write_ch.send(DbOp::DeleteMany { keys }).await;
235 }
236 async fn delete_all(&self) {
237 self.mem.delete_all().await;
238 let _ = self.write_ch.send(DbOp::DeleteAll).await;
239 }
240}