1use async_channel as mpsc;
2pub use async_kvdb::*;
3use async_memorydb::MenoryDb;
4use std::collections::HashMap;
5use std::io::Write;
6use std::path::{Path, PathBuf};
7use std::{fs, io, thread};
8
9fn key2file(key: &Key) -> String {
10 form_urlencoded::Serializer::new(String::new()).append_key_only(key).finish()
11}
12fn file2key(name: &str) -> Option<Key> {
13 let mut parse = form_urlencoded::parse(name.as_bytes());
14 let data = parse.next()?;
15 Some(data.0.into())
16}
17
18fn fsdb_exec(path: &Path, op: DbOp) -> io::Result<()> {
19 match op {
20 DbOp::Insert { key, value } => {
21 let name = key2file(&key);
22 let path = path.join(name);
23 let mut f = fs::File::create(path)?;
24 let _ = f.write_all(&value);
25 #[cfg(feature = "auto_sync")]
26 {
27 let _ = f.sync_data();
28 }
29 }
30 DbOp::InsertMany { data } => {
31 for (key, value) in data {
32 let name = key2file(&key);
33 let path = path.join(name);
34 let mut f = fs::File::create(path)?;
35 let _ = f.write_all(&value);
36 #[cfg(feature = "auto_sync")]
37 {
38 let _ = f.sync_data();
39 }
40 }
41 }
42 DbOp::Delete { key } => {
43 let name = key2file(&key);
44 let path = path.join(name);
45 let _ = fs::remove_file(path);
46 }
47 DbOp::DeleteMany { keys } => {
48 for entry in fs::read_dir(path)? {
49 let file = entry?.path();
50 if file.is_file() {
51 if let Some(name) = file.file_name() {
52 let name = name.to_string_lossy();
53 if keys.iter().any(|k| key2file(k) == name) {
54 let _ = fs::remove_file(file);
55 }
56 }
57 }
58 }
59 }
60 DbOp::DeleteAll => {
61 for entry in fs::read_dir(path)? {
62 let file = entry?.path();
63 if file.is_file() {
64 let _ = fs::remove_file(file);
65 }
66 }
67 }
68 };
69 Ok(())
70}
71
/// A key-value store that keeps all data in memory and mirrors every write to
/// a directory on disk, one file per key.
pub struct FileDb {
    /// In-memory copy of the data; all reads are answered from here.
    mem: MenoryDb,
    /// Queue of pending write operations consumed by the background writer
    /// thread started in `FileDb::new`.
    write_ch: mpsc::Sender<DbOp>,
}
76impl FileDb {
77 pub fn new(path: String, interval_ms: u64) -> io::Result<Self> {
81 let path = PathBuf::from(path);
82 let mut mem = HashMap::new();
83 fs::create_dir_all(&path)?;
84 for entry in fs::read_dir(&path)? {
85 let file = entry?.path();
86 if file.is_file() {
87 if let Some(name) = file.file_name() {
88 let name = name.to_string_lossy().into_owned();
89 let key = if let Some(key) = file2key(&name) {
90 key
91 } else {
92 continue;
93 };
94 let value = fs::read(file)?;
95 mem.insert(key, value.into());
96 }
97 }
98 }
99 let (tx, receiver) = mpsc::unbounded();
100 let thread_name = format!("db-{}", path.display());
101 thread::Builder::new().name(thread_name).spawn(move || {
102 let mut op_merger = DbOpMerger::new();
103 loop {
104 std::thread::sleep(std::time::Duration::from_millis(interval_ms));
105 while let Ok(op) = receiver.try_recv() {
106 op_merger.merge(op);
107 }
108 if op_merger.is_empty() {
109 continue;
110 }
111 let ops = op_merger.into_ops();
112 op_merger = DbOpMerger::new();
113 if ops.clear {
114 let _ = fsdb_exec(&path, DbOp::DeleteAll);
115 }
116 if !ops.insert.is_empty() {
117 let _ = fsdb_exec(&path, DbOp::InsertMany { data: ops.insert });
118 }
119 if !ops.delete.is_empty() {
120 let _ = fsdb_exec(&path, DbOp::DeleteMany { keys: ops.delete });
121 }
122 }
123 })?;
124 Ok(Self {
125 mem: MenoryDb::new(mem),
126 write_ch: tx,
127 })
128 }
129}
130
131#[async_trait]
132impl Kvdb for FileDb {
133 async fn scan_keys(&self, filter: &Filter) -> Vec<Key> {
134 self.mem.scan_keys(filter).await
135 }
136 async fn get(&self, key: Key) -> Option<Value> {
137 self.mem.get(key).await
138 }
139 async fn get_many(&self, keys: Vec<Key>) -> HashMap<Key, Value> {
140 self.mem.get_many(keys).await
141 }
142 async fn set(&self, key: Key, value: Value) {
143 self.mem.set(key.clone(), value.clone()).await;
144 let _ = self.write_ch.send(DbOp::Insert { key, value }).await;
145 }
146 async fn set_many(&self, data: HashMap<Key, Value>) {
147 self.mem.set_many(data.clone()).await;
148 let _ = self.write_ch.send(DbOp::InsertMany { data }).await;
149 }
150 async fn delete(&self, key: Key) {
151 self.mem.delete(key.clone()).await;
152 let _ = self.write_ch.send(DbOp::Delete { key }).await;
153 }
154 async fn delete_many(&self, keys: Vec<Key>) {
155 self.mem.delete_many(keys.clone()).await;
156 let _ = self.write_ch.send(DbOp::DeleteMany { keys }).await;
157 }
158 async fn delete_all(&self) {
159 self.mem.delete_all().await;
160 let _ = self.write_ch.send(DbOp::DeleteAll).await;
161 }
162}