// mqdb_core/storage/fjall_backend.rs

1// Copyright 2025-2026 LabOverWire. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::backend::{BatchOperations, StorageBackend};
5use crate::config::DurabilityMode;
6use crate::error::Result;
7use fjall::{Database, Keyspace, KeyspaceCreateOptions, PersistMode, Readable, Slice};
8use std::path::Path;
9use std::sync::{Arc, Mutex};
10
11pub struct FjallBackend {
12    db: Database,
13    keyspace: Keyspace,
14    durability: DurabilityMode,
15    commit_lock: Arc<Mutex<()>>,
16}
17
18impl FjallBackend {
19    /// # Errors
20    /// Returns an error if the storage fails to open.
21    pub fn open<P: AsRef<Path>>(path: P, durability: DurabilityMode) -> Result<Self> {
22        std::fs::create_dir_all(path.as_ref()).map_err(fjall::Error::Io)?;
23        let db = Database::builder(path.as_ref()).open()?;
24        let keyspace = db.keyspace("main", KeyspaceCreateOptions::default)?;
25
26        Ok(Self {
27            db,
28            keyspace,
29            durability,
30            commit_lock: Arc::new(Mutex::new(())),
31        })
32    }
33
34    fn sync_if_needed(&self) -> Result<()> {
35        match self.durability {
36            DurabilityMode::Immediate => {
37                self.db.persist(PersistMode::SyncAll)?;
38            }
39            DurabilityMode::PeriodicMs(_) | DurabilityMode::None => {}
40        }
41        Ok(())
42    }
43}
44
45impl StorageBackend for FjallBackend {
46    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
47        Ok(self.keyspace.get(key)?.map(|v| v.to_vec()))
48    }
49
50    fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
51        self.keyspace.insert(key, value)?;
52        self.sync_if_needed()?;
53        Ok(())
54    }
55
56    fn remove(&self, key: &[u8]) -> Result<()> {
57        self.keyspace.remove(key)?;
58        self.sync_if_needed()?;
59        Ok(())
60    }
61
62    fn prefix_scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
63        let snapshot = self.db.snapshot();
64        let mut results = Vec::new();
65        for guard in snapshot.prefix(&self.keyspace, prefix) {
66            let (k, v) = guard.into_inner()?;
67            results.push((k.to_vec(), v.to_vec()));
68        }
69        Ok(results)
70    }
71
72    fn prefix_count(&self, prefix: &[u8]) -> Result<usize> {
73        let snapshot = self.db.snapshot();
74        let mut count = 0;
75        for guard in snapshot.prefix(&self.keyspace, prefix) {
76            let _ = guard.key()?;
77            count += 1;
78        }
79        Ok(count)
80    }
81
82    fn prefix_scan_keys(&self, prefix: &[u8]) -> Result<Vec<Vec<u8>>> {
83        let snapshot = self.db.snapshot();
84        let mut results = Vec::new();
85        for guard in snapshot.prefix(&self.keyspace, prefix) {
86            results.push(guard.key()?.to_vec());
87        }
88        Ok(results)
89    }
90
91    fn prefix_scan_batch(
92        &self,
93        prefix: &[u8],
94        batch_size: usize,
95        after_key: Option<&[u8]>,
96    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
97        let snapshot = self.db.snapshot();
98        let mut results = Vec::with_capacity(batch_size);
99        let start: Vec<u8> = if let Some(after) = after_key {
100            let mut s = after.to_vec();
101            s.push(0);
102            s
103        } else {
104            prefix.to_vec()
105        };
106        for guard in snapshot.range(&self.keyspace, start..) {
107            let (k, v) = guard.into_inner()?;
108            if !k.starts_with(prefix) {
109                break;
110            }
111            results.push((k.to_vec(), v.to_vec()));
112            if results.len() >= batch_size {
113                break;
114            }
115        }
116        Ok(results)
117    }
118
119    fn range_scan(&self, start: &[u8], end: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
120        let snapshot = self.db.snapshot();
121        let mut results = Vec::new();
122        for guard in snapshot.range(&self.keyspace, start..end) {
123            let (k, v) = guard.into_inner()?;
124            results.push((k.to_vec(), v.to_vec()));
125        }
126        Ok(results)
127    }
128
129    fn batch(&self) -> Box<dyn BatchOperations> {
130        Box::new(FjallBatch {
131            db: self.db.clone(),
132            keyspace: self.keyspace.clone(),
133            durability: self.durability,
134            operations: Vec::new(),
135            preconditions: Vec::new(),
136            commit_lock: Arc::clone(&self.commit_lock),
137        })
138    }
139
140    fn flush(&self) -> Result<()> {
141        self.sync_if_needed()
142    }
143}
144
/// A single write queued in a batch and applied when the batch commits.
enum BatchOp {
    /// Store the value (second field) under the key (first field).
    Insert(Vec<u8>, Vec<u8>),
    /// Delete the given key.
    Remove(Vec<u8>),
}
149
/// A condition a batch requires to hold at commit time: `key` must currently
/// be stored with exactly `expected_value`.
struct Precondition {
    /// Key whose current value is checked.
    key: Vec<u8>,
    /// Value the key must hold for the batch to be applied.
    expected_value: Vec<u8>,
}
154
155pub struct FjallBatch {
156    db: Database,
157    keyspace: Keyspace,
158    durability: DurabilityMode,
159    operations: Vec<BatchOp>,
160    preconditions: Vec<Precondition>,
161    commit_lock: Arc<Mutex<()>>,
162}
163
164impl BatchOperations for FjallBatch {
165    fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
166        self.operations.push(BatchOp::Insert(key, value));
167    }
168
169    fn remove(&mut self, key: Vec<u8>) {
170        self.operations.push(BatchOp::Remove(key));
171    }
172
173    fn expect_value(&mut self, key: Vec<u8>, expected_value: Vec<u8>) {
174        self.preconditions.push(Precondition {
175            key,
176            expected_value,
177        });
178    }
179
180    fn commit(self: Box<Self>) -> Result<()> {
181        {
182            let _guard = self
183                .commit_lock
184                .lock()
185                .map_err(|e| crate::error::Error::Internal(e.to_string()))?;
186
187            let snapshot = self.db.snapshot();
188            for precondition in &self.preconditions {
189                let actual: Option<Slice> = snapshot.get(&self.keyspace, &precondition.key)?;
190                match actual {
191                    Some(val) if val.as_ref() == precondition.expected_value.as_slice() => {}
192                    _ => {
193                        return Err(crate::error::Error::Conflict(
194                            "optimistic lock failed: value was modified".into(),
195                        ));
196                    }
197                }
198            }
199
200            let mut batch = self.db.batch();
201            for op in self.operations {
202                match op {
203                    BatchOp::Insert(k, v) => batch.insert(&self.keyspace, k, v),
204                    BatchOp::Remove(k) => batch.remove(&self.keyspace, k),
205                }
206            }
207            batch.commit()?;
208        }
209
210        match self.durability {
211            DurabilityMode::Immediate => {
212                self.db.persist(PersistMode::SyncAll)?;
213            }
214            DurabilityMode::PeriodicMs(_) | DurabilityMode::None => {}
215        }
216
217        Ok(())
218    }
219}