Skip to main content

async_filedb/
lib.rs

1use async_channel as mpsc;
2pub use async_kvdb::*;
3use async_memorydb::MenoryDb;
4use core::sync::atomic::{AtomicBool, Ordering::Relaxed};
5use std::collections::HashMap;
6use std::io::Write;
7use std::path::{Path, PathBuf};
8use std::{fs, io, thread};
9
10fn key2file(key: &Key) -> String {
11    form_urlencoded::Serializer::new(String::new()).append_key_only(key).finish()
12}
13fn file2key(name: &str) -> Option<Key> {
14    let mut parse = form_urlencoded::parse(name.as_bytes());
15    let data = parse.next()?;
16    Some(data.0.into())
17}
18
19fn load_item(path: &Path, key: &Key) -> Option<Value> {
20    let name = key2file(key);
21    let path = path.join(name);
22    let value: Value = fs::read(path).ok()?.into();
23    Some(value)
24}
25fn load_all(path: &Path) -> HashMap<Key, Value> {
26    let Ok(entrys) = fs::read_dir(path) else {
27        return HashMap::new();
28    };
29    entrys
30        .filter_map(|entry| {
31            let file = entry.ok()?.path();
32            if !file.is_file() {
33                return None;
34            }
35            let name = file.file_name()?;
36            let name = name.to_string_lossy().into_owned();
37            let key = file2key(&name)?;
38            let value = fs::read(file).ok()?;
39            Some((key, value.into()))
40        })
41        .collect()
42}
43
44fn fsdb_exec(path: &Path, op: DbOp) -> io::Result<()> {
45    match op {
46        DbOp::Get { key, ch } => {
47            let value = load_item(path, &key);
48            let _ = ch.send(value);
49        }
50        DbOp::GetMany { keys, ch } => {
51            let values = keys
52                .into_iter()
53                .filter_map(|key| {
54                    let value = load_item(path, &key)?;
55                    Some((key, value))
56                })
57                .collect();
58            let _ = ch.send(values);
59        }
60        DbOp::GetAll { ch } => {
61            let value = load_all(path);
62            let _ = ch.send(value);
63        }
64        DbOp::Insert { key, value } => {
65            let name = key2file(&key);
66            let path = path.join(name);
67            let mut f = fs::File::create(path)?;
68            let _ = f.write_all(&value);
69            #[cfg(feature = "auto_sync")]
70            {
71                let _ = f.sync_data();
72            }
73        }
74        DbOp::InsertMany { data } => {
75            for (key, value) in data {
76                let name = key2file(&key);
77                let path = path.join(name);
78                let mut f = fs::File::create(path)?;
79                let _ = f.write_all(&value);
80                #[cfg(feature = "auto_sync")]
81                {
82                    let _ = f.sync_data();
83                }
84            }
85        }
86        DbOp::Delete { key } => {
87            let name = key2file(&key);
88            let path = path.join(name);
89            let _ = fs::remove_file(path);
90        }
91        DbOp::DeleteMany { keys } => {
92            for entry in fs::read_dir(path)? {
93                let file = entry?.path();
94                if file.is_file() {
95                    if let Some(name) = file.file_name() {
96                        let name = name.to_string_lossy();
97                        if keys.iter().any(|k| key2file(k) == name) {
98                            let _ = fs::remove_file(file);
99                        }
100                    }
101                }
102            }
103        }
104        DbOp::DeleteAll => {
105            for entry in fs::read_dir(path)? {
106                let file = entry?.path();
107                if file.is_file() {
108                    let _ = fs::remove_file(file);
109                }
110            }
111        }
112    };
113    Ok(())
114}
115
116pub struct FileDb {
117    cached_all: AtomicBool,
118    mem: MenoryDb,
119    write_ch: mpsc::Sender<DbOp>,
120}
121impl FileDb {
122    /// 加载本地文件数据库
123    /// path: 本地文件夹
124    /// interval_ms: 周期性存到本地硬盘
125    pub fn new(path: String, interval_ms: u64) -> io::Result<Self> {
126        let thread_name = format!("db-{}", &path[path.len().saturating_sub(12)..]);
127        let path = PathBuf::from(path);
128        let mem = HashMap::new();
129        fs::create_dir_all(&path)?;
130        let (tx, receiver) = mpsc::unbounded();
131        thread::Builder::new().name(thread_name).spawn(move || {
132            let mut op_merger = DbOpMerger::new();
133            loop {
134                std::thread::sleep(std::time::Duration::from_millis(interval_ms));
135                if let Ok(op) = receiver.recv_blocking() {
136                    op_merger.merge(op);
137                }
138                while let Ok(op) = receiver.try_recv() {
139                    op_merger.merge(op);
140                }
141                if op_merger.is_empty() {
142                    continue;
143                }
144                let ops = op_merger.into_ops();
145                op_merger = DbOpMerger::new();
146                if ops.clear {
147                    let _ = fsdb_exec(&path, DbOp::DeleteAll);
148                }
149                // 增删 全删后的数据
150                if !ops.insert.is_empty() {
151                    let _ = fsdb_exec(&path, DbOp::InsertMany { data: ops.insert });
152                }
153                if !ops.delete.is_empty() {
154                    let _ = fsdb_exec(&path, DbOp::DeleteMany { keys: ops.delete });
155                }
156                // 读取最新数据
157                for (key, chs) in ops.get_one {
158                    let value = load_item(&path, &key);
159                    for ch in chs {
160                        let _ = ch.send(value.clone());
161                    }
162                }
163                for op in ops.get_many {
164                    let _ = fsdb_exec(&path, op);
165                }
166                for ch in ops.get_all {
167                    let value = load_all(&path);
168                    let _ = ch.send(value.clone());
169                }
170            }
171        })?;
172        Ok(Self {
173            cached_all: AtomicBool::new(false),
174            mem: MenoryDb::new(mem),
175            write_ch: tx,
176        })
177    }
178    pub async fn clear_cache(&self) {
179        self.cached_all.store(false, Relaxed);
180        self.mem.delete_all().await;
181    }
182    pub async fn load_all(&self) {
183        let (tx, rx) = oneshot::async_channel();
184        let _ = self.write_ch.send(DbOp::GetAll { ch: tx }).await;
185        let Ok(data) = rx.await else { return };
186        self.mem.set_many(data).await;
187        self.cached_all.store(true, Relaxed);
188    }
189}
190
191#[async_trait]
192impl Kvdb for FileDb {
193    async fn scan_keys(&self, filter: &Filter) -> Vec<Key> {
194        if !self.cached_all.load(Relaxed) {
195            self.load_all().await;
196        }
197        self.mem.scan_keys(filter).await
198    }
199    async fn get(&self, key: Key) -> Option<Value> {
200        let ret = self.mem.get(key.clone()).await;
201        if ret.is_some() {
202            return ret;
203        }
204        let (tx, rx) = oneshot::async_channel();
205        let _ = self.write_ch.send(DbOp::Get { key: key.clone(), ch: tx }).await;
206        let value = rx.await.ok()?;
207        if let Some(value) = &value {
208            self.mem.set(key, value.clone()).await;
209        } else {
210            self.mem.delete(key).await;
211        }
212        value
213    }
214    async fn get_many(&self, keys: Vec<Key>) -> HashMap<Key, Value> {
215        if !self.cached_all.load(Relaxed) {
216            self.load_all().await;
217        }
218        self.mem.get_many(keys).await
219    }
220    async fn set(&self, key: Key, value: Value) {
221        self.mem.set(key.clone(), value.clone()).await;
222        let _ = self.write_ch.send(DbOp::Insert { key, value }).await;
223    }
224    async fn set_many(&self, data: HashMap<Key, Value>) {
225        self.mem.set_many(data.clone()).await;
226        let _ = self.write_ch.send(DbOp::InsertMany { data }).await;
227    }
228    async fn delete(&self, key: Key) {
229        self.mem.delete(key.clone()).await;
230        let _ = self.write_ch.send(DbOp::Delete { key }).await;
231    }
232    async fn delete_many(&self, keys: Vec<Key>) {
233        self.mem.delete_many(keys.clone()).await;
234        let _ = self.write_ch.send(DbOp::DeleteMany { keys }).await;
235    }
236    async fn delete_all(&self) {
237        self.mem.delete_all().await;
238        let _ = self.write_ch.send(DbOp::DeleteAll).await;
239    }
240}