async_filedb/
lib.rs

1use async_channel as mpsc;
2pub use async_kvdb::*;
3use async_memorydb::MenoryDb;
4use std::collections::HashMap;
5use std::io::Write;
6use std::path::{Path, PathBuf};
7use std::{fs, io, thread};
8
9fn key2file(key: &Key) -> String {
10    form_urlencoded::Serializer::new(String::new()).append_key_only(key).finish()
11}
12fn file2key(name: &str) -> Option<Key> {
13    let mut parse = form_urlencoded::parse(name.as_bytes());
14    let data = parse.next()?;
15    Some(data.0.into())
16}
17
18fn fsdb_exec(path: &Path, op: DbOp) -> io::Result<()> {
19    match op {
20        DbOp::Insert { key, value } => {
21            let name = key2file(&key);
22            let path = path.join(name);
23            let mut f = fs::File::create(path)?;
24            let _ = f.write_all(&value);
25            #[cfg(feature = "auto_sync")]
26            {
27                let _ = f.sync_data();
28            }
29        }
30        DbOp::InsertMany { data } => {
31            for (key, value) in data {
32                let name = key2file(&key);
33                let path = path.join(name);
34                let mut f = fs::File::create(path)?;
35                let _ = f.write_all(&value);
36                #[cfg(feature = "auto_sync")]
37                {
38                    let _ = f.sync_data();
39                }
40            }
41        }
42        DbOp::Delete { key } => {
43            let name = key2file(&key);
44            let path = path.join(name);
45            let _ = fs::remove_file(path);
46        }
47        DbOp::DeleteMany { keys } => {
48            for entry in fs::read_dir(path)? {
49                let file = entry?.path();
50                if file.is_file() {
51                    if let Some(name) = file.file_name() {
52                        let name = name.to_string_lossy();
53                        if keys.iter().any(|k| key2file(k) == name) {
54                            let _ = fs::remove_file(file);
55                        }
56                    }
57                }
58            }
59        }
60        DbOp::DeleteAll => {
61            for entry in fs::read_dir(path)? {
62                let file = entry?.path();
63                if file.is_file() {
64                    let _ = fs::remove_file(file);
65                }
66            }
67        }
68    };
69    Ok(())
70}
71
72pub struct FileDb {
73    mem: MenoryDb,
74    write_ch: mpsc::Sender<DbOp>,
75}
76impl FileDb {
77    /// 加载本地文件数据库
78    /// path: 本地文件夹
79    /// interval_ms: 周期性存到本地硬盘
80    pub fn new(path: String, interval_ms: u64) -> io::Result<Self> {
81        let path = PathBuf::from(path);
82        let mut mem = HashMap::new();
83        fs::create_dir_all(&path)?;
84        for entry in fs::read_dir(&path)? {
85            let file = entry?.path();
86            if file.is_file() {
87                if let Some(name) = file.file_name() {
88                    let name = name.to_string_lossy().into_owned();
89                    let key = if let Some(key) = file2key(&name) {
90                        key
91                    } else {
92                        continue;
93                    };
94                    let value = fs::read(file)?;
95                    mem.insert(key, value.into());
96                }
97            }
98        }
99        let (tx, receiver) = mpsc::unbounded();
100        let thread_name = format!("db-{}", path.display());
101        thread::Builder::new().name(thread_name).spawn(move || {
102            let mut op_merger = DbOpMerger::new();
103            loop {
104                std::thread::sleep(std::time::Duration::from_millis(interval_ms));
105                while let Ok(op) = receiver.try_recv() {
106                    op_merger.merge(op);
107                }
108                if op_merger.is_empty() {
109                    continue;
110                }
111                let ops = op_merger.into_ops();
112                op_merger = DbOpMerger::new();
113                if ops.clear {
114                    let _ = fsdb_exec(&path, DbOp::DeleteAll);
115                }
116                if !ops.insert.is_empty() {
117                    let _ = fsdb_exec(&path, DbOp::InsertMany { data: ops.insert });
118                }
119                if !ops.delete.is_empty() {
120                    let _ = fsdb_exec(&path, DbOp::DeleteMany { keys: ops.delete });
121                }
122            }
123        })?;
124        Ok(Self {
125            mem: MenoryDb::new(mem),
126            write_ch: tx,
127        })
128    }
129}
130
131#[async_trait]
132impl Kvdb for FileDb {
133    async fn scan_keys(&self, filter: &Filter) -> Vec<Key> {
134        self.mem.scan_keys(filter).await
135    }
136    async fn get(&self, key: Key) -> Option<Value> {
137        self.mem.get(key).await
138    }
139    async fn get_many(&self, keys: Vec<Key>) -> HashMap<Key, Value> {
140        self.mem.get_many(keys).await
141    }
142    async fn set(&self, key: Key, value: Value) {
143        self.mem.set(key.clone(), value.clone()).await;
144        let _ = self.write_ch.send(DbOp::Insert { key, value }).await;
145    }
146    async fn set_many(&self, data: HashMap<Key, Value>) {
147        self.mem.set_many(data.clone()).await;
148        let _ = self.write_ch.send(DbOp::InsertMany { data }).await;
149    }
150    async fn delete(&self, key: Key) {
151        self.mem.delete(key.clone()).await;
152        let _ = self.write_ch.send(DbOp::Delete { key }).await;
153    }
154    async fn delete_many(&self, keys: Vec<Key>) {
155        self.mem.delete_many(keys.clone()).await;
156        let _ = self.write_ch.send(DbOp::DeleteMany { keys }).await;
157    }
158    async fn delete_all(&self) {
159        self.mem.delete_all().await;
160        let _ = self.write_ch.send(DbOp::DeleteAll).await;
161    }
162}