mqdb_core/storage/
fjall_backend.rs1use super::backend::{BatchOperations, StorageBackend};
5use crate::config::DurabilityMode;
6use crate::error::Result;
7use fjall::{Database, Keyspace, KeyspaceCreateOptions, PersistMode, Readable, Slice};
8use std::path::Path;
9
10pub struct FjallBackend {
11 db: Database,
12 keyspace: Keyspace,
13 durability: DurabilityMode,
14}
15
16impl FjallBackend {
17 pub fn open<P: AsRef<Path>>(path: P, durability: DurabilityMode) -> Result<Self> {
20 let db = Database::builder(path.as_ref()).open()?;
21 let keyspace = db.keyspace("main", KeyspaceCreateOptions::default)?;
22
23 Ok(Self {
24 db,
25 keyspace,
26 durability,
27 })
28 }
29
30 fn sync_if_needed(&self) -> Result<()> {
31 match self.durability {
32 DurabilityMode::Immediate => {
33 self.db.persist(PersistMode::SyncAll)?;
34 }
35 DurabilityMode::PeriodicMs(_) | DurabilityMode::None => {}
36 }
37 Ok(())
38 }
39}
40
41impl StorageBackend for FjallBackend {
42 fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
43 Ok(self.keyspace.get(key)?.map(|v| v.to_vec()))
44 }
45
46 fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
47 self.keyspace.insert(key, value)?;
48 self.sync_if_needed()?;
49 Ok(())
50 }
51
52 fn remove(&self, key: &[u8]) -> Result<()> {
53 self.keyspace.remove(key)?;
54 self.sync_if_needed()?;
55 Ok(())
56 }
57
58 fn prefix_scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
59 let snapshot = self.db.snapshot();
60 let mut results = Vec::new();
61 for guard in snapshot.prefix(&self.keyspace, prefix) {
62 let (k, v) = guard.into_inner()?;
63 results.push((k.to_vec(), v.to_vec()));
64 }
65 Ok(results)
66 }
67
68 fn prefix_count(&self, prefix: &[u8]) -> Result<usize> {
69 let snapshot = self.db.snapshot();
70 let mut count = 0;
71 for guard in snapshot.prefix(&self.keyspace, prefix) {
72 let _ = guard.key()?;
73 count += 1;
74 }
75 Ok(count)
76 }
77
78 fn prefix_scan_keys(&self, prefix: &[u8]) -> Result<Vec<Vec<u8>>> {
79 let snapshot = self.db.snapshot();
80 let mut results = Vec::new();
81 for guard in snapshot.prefix(&self.keyspace, prefix) {
82 results.push(guard.key()?.to_vec());
83 }
84 Ok(results)
85 }
86
87 fn prefix_scan_batch(
88 &self,
89 prefix: &[u8],
90 batch_size: usize,
91 after_key: Option<&[u8]>,
92 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
93 let snapshot = self.db.snapshot();
94 let mut results = Vec::with_capacity(batch_size);
95 let start: Vec<u8> = if let Some(after) = after_key {
96 let mut s = after.to_vec();
97 s.push(0);
98 s
99 } else {
100 prefix.to_vec()
101 };
102 for guard in snapshot.range(&self.keyspace, start..) {
103 let (k, v) = guard.into_inner()?;
104 if !k.starts_with(prefix) {
105 break;
106 }
107 results.push((k.to_vec(), v.to_vec()));
108 if results.len() >= batch_size {
109 break;
110 }
111 }
112 Ok(results)
113 }
114
115 fn range_scan(&self, start: &[u8], end: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
116 let snapshot = self.db.snapshot();
117 let mut results = Vec::new();
118 for guard in snapshot.range(&self.keyspace, start..end) {
119 let (k, v) = guard.into_inner()?;
120 results.push((k.to_vec(), v.to_vec()));
121 }
122 Ok(results)
123 }
124
125 fn batch(&self) -> Box<dyn BatchOperations> {
126 Box::new(FjallBatch {
127 db: self.db.clone(),
128 keyspace: self.keyspace.clone(),
129 durability: self.durability,
130 operations: Vec::new(),
131 preconditions: Vec::new(),
132 })
133 }
134
135 fn flush(&self) -> Result<()> {
136 self.sync_if_needed()
137 }
138}
139
/// A single write queued in a batch until the batch is committed.
enum BatchOp {
    /// Store the value (second field) under the key (first field).
    Insert(Vec<u8>, Vec<u8>),
    /// Delete the given key.
    Remove(Vec<u8>),
}
144
/// A condition that has to hold for a batch to be committed: the value
/// currently stored under `key` must equal `expected_value`.
struct Precondition {
    /// Key whose current value is inspected.
    key: Vec<u8>,
    /// Exact bytes the stored value must match.
    expected_value: Vec<u8>,
}
149
150pub struct FjallBatch {
151 db: Database,
152 keyspace: Keyspace,
153 durability: DurabilityMode,
154 operations: Vec<BatchOp>,
155 preconditions: Vec<Precondition>,
156}
157
158impl BatchOperations for FjallBatch {
159 fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
160 self.operations.push(BatchOp::Insert(key, value));
161 }
162
163 fn remove(&mut self, key: Vec<u8>) {
164 self.operations.push(BatchOp::Remove(key));
165 }
166
167 fn expect_value(&mut self, key: Vec<u8>, expected_value: Vec<u8>) {
168 self.preconditions.push(Precondition {
169 key,
170 expected_value,
171 });
172 }
173
174 fn commit(self: Box<Self>) -> Result<()> {
175 let snapshot = self.db.snapshot();
176
177 for precondition in &self.preconditions {
178 let actual: Option<Slice> = snapshot.get(&self.keyspace, &precondition.key)?;
179 match actual {
180 Some(val) if val.as_ref() == precondition.expected_value.as_slice() => {}
181 _ => {
182 return Err(crate::error::Error::Conflict(
183 "optimistic lock failed: value was modified".into(),
184 ));
185 }
186 }
187 }
188
189 let mut batch = self.db.batch();
190
191 for op in self.operations {
192 match op {
193 BatchOp::Insert(k, v) => batch.insert(&self.keyspace, k, v),
194 BatchOp::Remove(k) => batch.remove(&self.keyspace, k),
195 }
196 }
197
198 batch.commit()?;
199
200 match self.durability {
201 DurabilityMode::Immediate => {
202 self.db.persist(PersistMode::SyncAll)?;
203 }
204 DurabilityMode::PeriodicMs(_) | DurabilityMode::None => {}
205 }
206
207 Ok(())
208 }
209}