mqdb_core/storage/
fjall_backend.rs1use super::backend::{BatchOperations, StorageBackend};
5use crate::config::DurabilityMode;
6use crate::error::Result;
7use fjall::{Database, Keyspace, KeyspaceCreateOptions, PersistMode, Readable, Slice};
8use std::path::Path;
9use std::sync::{Arc, Mutex};
10
11pub struct FjallBackend {
12 db: Database,
13 keyspace: Keyspace,
14 durability: DurabilityMode,
15 commit_lock: Arc<Mutex<()>>,
16}
17
18impl FjallBackend {
19 pub fn open<P: AsRef<Path>>(path: P, durability: DurabilityMode) -> Result<Self> {
22 std::fs::create_dir_all(path.as_ref()).map_err(fjall::Error::Io)?;
23 let db = Database::builder(path.as_ref()).open()?;
24 let keyspace = db.keyspace("main", KeyspaceCreateOptions::default)?;
25
26 Ok(Self {
27 db,
28 keyspace,
29 durability,
30 commit_lock: Arc::new(Mutex::new(())),
31 })
32 }
33
34 fn sync_if_needed(&self) -> Result<()> {
35 match self.durability {
36 DurabilityMode::Immediate => {
37 self.db.persist(PersistMode::SyncAll)?;
38 }
39 DurabilityMode::PeriodicMs(_) | DurabilityMode::None => {}
40 }
41 Ok(())
42 }
43}
44
45impl StorageBackend for FjallBackend {
46 fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
47 Ok(self.keyspace.get(key)?.map(|v| v.to_vec()))
48 }
49
50 fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
51 self.keyspace.insert(key, value)?;
52 self.sync_if_needed()?;
53 Ok(())
54 }
55
56 fn remove(&self, key: &[u8]) -> Result<()> {
57 self.keyspace.remove(key)?;
58 self.sync_if_needed()?;
59 Ok(())
60 }
61
62 fn prefix_scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
63 let snapshot = self.db.snapshot();
64 let mut results = Vec::new();
65 for guard in snapshot.prefix(&self.keyspace, prefix) {
66 let (k, v) = guard.into_inner()?;
67 results.push((k.to_vec(), v.to_vec()));
68 }
69 Ok(results)
70 }
71
72 fn prefix_count(&self, prefix: &[u8]) -> Result<usize> {
73 let snapshot = self.db.snapshot();
74 let mut count = 0;
75 for guard in snapshot.prefix(&self.keyspace, prefix) {
76 let _ = guard.key()?;
77 count += 1;
78 }
79 Ok(count)
80 }
81
82 fn prefix_scan_keys(&self, prefix: &[u8]) -> Result<Vec<Vec<u8>>> {
83 let snapshot = self.db.snapshot();
84 let mut results = Vec::new();
85 for guard in snapshot.prefix(&self.keyspace, prefix) {
86 results.push(guard.key()?.to_vec());
87 }
88 Ok(results)
89 }
90
91 fn prefix_scan_batch(
92 &self,
93 prefix: &[u8],
94 batch_size: usize,
95 after_key: Option<&[u8]>,
96 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
97 let snapshot = self.db.snapshot();
98 let mut results = Vec::with_capacity(batch_size);
99 let start: Vec<u8> = if let Some(after) = after_key {
100 let mut s = after.to_vec();
101 s.push(0);
102 s
103 } else {
104 prefix.to_vec()
105 };
106 for guard in snapshot.range(&self.keyspace, start..) {
107 let (k, v) = guard.into_inner()?;
108 if !k.starts_with(prefix) {
109 break;
110 }
111 results.push((k.to_vec(), v.to_vec()));
112 if results.len() >= batch_size {
113 break;
114 }
115 }
116 Ok(results)
117 }
118
119 fn range_scan(&self, start: &[u8], end: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
120 let snapshot = self.db.snapshot();
121 let mut results = Vec::new();
122 for guard in snapshot.range(&self.keyspace, start..end) {
123 let (k, v) = guard.into_inner()?;
124 results.push((k.to_vec(), v.to_vec()));
125 }
126 Ok(results)
127 }
128
129 fn batch(&self) -> Box<dyn BatchOperations> {
130 Box::new(FjallBatch {
131 db: self.db.clone(),
132 keyspace: self.keyspace.clone(),
133 durability: self.durability,
134 operations: Vec::new(),
135 preconditions: Vec::new(),
136 commit_lock: Arc::clone(&self.commit_lock),
137 })
138 }
139
140 fn flush(&self) -> Result<()> {
141 self.sync_if_needed()
142 }
143}
144
145enum BatchOp {
146 Insert(Vec<u8>, Vec<u8>),
147 Remove(Vec<u8>),
148}
149
150struct Precondition {
151 key: Vec<u8>,
152 expected_value: Vec<u8>,
153}
154
155pub struct FjallBatch {
156 db: Database,
157 keyspace: Keyspace,
158 durability: DurabilityMode,
159 operations: Vec<BatchOp>,
160 preconditions: Vec<Precondition>,
161 commit_lock: Arc<Mutex<()>>,
162}
163
164impl BatchOperations for FjallBatch {
165 fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
166 self.operations.push(BatchOp::Insert(key, value));
167 }
168
169 fn remove(&mut self, key: Vec<u8>) {
170 self.operations.push(BatchOp::Remove(key));
171 }
172
173 fn expect_value(&mut self, key: Vec<u8>, expected_value: Vec<u8>) {
174 self.preconditions.push(Precondition {
175 key,
176 expected_value,
177 });
178 }
179
180 fn commit(self: Box<Self>) -> Result<()> {
181 {
182 let _guard = self
183 .commit_lock
184 .lock()
185 .map_err(|e| crate::error::Error::Internal(e.to_string()))?;
186
187 let snapshot = self.db.snapshot();
188 for precondition in &self.preconditions {
189 let actual: Option<Slice> = snapshot.get(&self.keyspace, &precondition.key)?;
190 match actual {
191 Some(val) if val.as_ref() == precondition.expected_value.as_slice() => {}
192 _ => {
193 return Err(crate::error::Error::Conflict(
194 "optimistic lock failed: value was modified".into(),
195 ));
196 }
197 }
198 }
199
200 let mut batch = self.db.batch();
201 for op in self.operations {
202 match op {
203 BatchOp::Insert(k, v) => batch.insert(&self.keyspace, k, v),
204 BatchOp::Remove(k) => batch.remove(&self.keyspace, k),
205 }
206 }
207 batch.commit()?;
208 }
209
210 match self.durability {
211 DurabilityMode::Immediate => {
212 self.db.persist(PersistMode::SyncAll)?;
213 }
214 DurabilityMode::PeriodicMs(_) | DurabilityMode::None => {}
215 }
216
217 Ok(())
218 }
219}