Skip to main content

mqdb_core/storage/
fjall_backend.rs

1// Copyright 2025-2026 LabOverWire. All rights reserved.
2// SPDX-License-Identifier: AGPL-3.0-only
3
4use super::backend::{BatchOperations, StorageBackend};
5use crate::config::DurabilityMode;
6use crate::error::Result;
7use fjall::{Database, Keyspace, KeyspaceCreateOptions, PersistMode, Readable, Slice};
8use std::path::Path;
9
10pub struct FjallBackend {
11    db: Database,
12    keyspace: Keyspace,
13    durability: DurabilityMode,
14}
15
16impl FjallBackend {
17    /// # Errors
18    /// Returns an error if the storage fails to open.
19    pub fn open<P: AsRef<Path>>(path: P, durability: DurabilityMode) -> Result<Self> {
20        let db = Database::builder(path.as_ref()).open()?;
21        let keyspace = db.keyspace("main", KeyspaceCreateOptions::default)?;
22
23        Ok(Self {
24            db,
25            keyspace,
26            durability,
27        })
28    }
29
30    fn sync_if_needed(&self) -> Result<()> {
31        match self.durability {
32            DurabilityMode::Immediate => {
33                self.db.persist(PersistMode::SyncAll)?;
34            }
35            DurabilityMode::PeriodicMs(_) | DurabilityMode::None => {}
36        }
37        Ok(())
38    }
39}
40
41impl StorageBackend for FjallBackend {
42    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
43        Ok(self.keyspace.get(key)?.map(|v| v.to_vec()))
44    }
45
46    fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
47        self.keyspace.insert(key, value)?;
48        self.sync_if_needed()?;
49        Ok(())
50    }
51
52    fn remove(&self, key: &[u8]) -> Result<()> {
53        self.keyspace.remove(key)?;
54        self.sync_if_needed()?;
55        Ok(())
56    }
57
58    fn prefix_scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
59        let snapshot = self.db.snapshot();
60        let mut results = Vec::new();
61        for guard in snapshot.prefix(&self.keyspace, prefix) {
62            let (k, v) = guard.into_inner()?;
63            results.push((k.to_vec(), v.to_vec()));
64        }
65        Ok(results)
66    }
67
68    fn prefix_count(&self, prefix: &[u8]) -> Result<usize> {
69        let snapshot = self.db.snapshot();
70        let mut count = 0;
71        for guard in snapshot.prefix(&self.keyspace, prefix) {
72            let _ = guard.key()?;
73            count += 1;
74        }
75        Ok(count)
76    }
77
78    fn prefix_scan_keys(&self, prefix: &[u8]) -> Result<Vec<Vec<u8>>> {
79        let snapshot = self.db.snapshot();
80        let mut results = Vec::new();
81        for guard in snapshot.prefix(&self.keyspace, prefix) {
82            results.push(guard.key()?.to_vec());
83        }
84        Ok(results)
85    }
86
87    fn prefix_scan_batch(
88        &self,
89        prefix: &[u8],
90        batch_size: usize,
91        after_key: Option<&[u8]>,
92    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
93        let snapshot = self.db.snapshot();
94        let mut results = Vec::with_capacity(batch_size);
95        let start: Vec<u8> = if let Some(after) = after_key {
96            let mut s = after.to_vec();
97            s.push(0);
98            s
99        } else {
100            prefix.to_vec()
101        };
102        for guard in snapshot.range(&self.keyspace, start..) {
103            let (k, v) = guard.into_inner()?;
104            if !k.starts_with(prefix) {
105                break;
106            }
107            results.push((k.to_vec(), v.to_vec()));
108            if results.len() >= batch_size {
109                break;
110            }
111        }
112        Ok(results)
113    }
114
115    fn range_scan(&self, start: &[u8], end: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
116        let snapshot = self.db.snapshot();
117        let mut results = Vec::new();
118        for guard in snapshot.range(&self.keyspace, start..end) {
119            let (k, v) = guard.into_inner()?;
120            results.push((k.to_vec(), v.to_vec()));
121        }
122        Ok(results)
123    }
124
125    fn batch(&self) -> Box<dyn BatchOperations> {
126        Box::new(FjallBatch {
127            db: self.db.clone(),
128            keyspace: self.keyspace.clone(),
129            durability: self.durability,
130            operations: Vec::new(),
131            preconditions: Vec::new(),
132        })
133    }
134
135    fn flush(&self) -> Result<()> {
136        self.sync_if_needed()
137    }
138}
139
140enum BatchOp {
141    Insert(Vec<u8>, Vec<u8>),
142    Remove(Vec<u8>),
143}
144
145struct Precondition {
146    key: Vec<u8>,
147    expected_value: Vec<u8>,
148}
149
150pub struct FjallBatch {
151    db: Database,
152    keyspace: Keyspace,
153    durability: DurabilityMode,
154    operations: Vec<BatchOp>,
155    preconditions: Vec<Precondition>,
156}
157
158impl BatchOperations for FjallBatch {
159    fn insert(&mut self, key: Vec<u8>, value: Vec<u8>) {
160        self.operations.push(BatchOp::Insert(key, value));
161    }
162
163    fn remove(&mut self, key: Vec<u8>) {
164        self.operations.push(BatchOp::Remove(key));
165    }
166
167    fn expect_value(&mut self, key: Vec<u8>, expected_value: Vec<u8>) {
168        self.preconditions.push(Precondition {
169            key,
170            expected_value,
171        });
172    }
173
174    fn commit(self: Box<Self>) -> Result<()> {
175        let snapshot = self.db.snapshot();
176
177        for precondition in &self.preconditions {
178            let actual: Option<Slice> = snapshot.get(&self.keyspace, &precondition.key)?;
179            match actual {
180                Some(val) if val.as_ref() == precondition.expected_value.as_slice() => {}
181                _ => {
182                    return Err(crate::error::Error::Conflict(
183                        "optimistic lock failed: value was modified".into(),
184                    ));
185                }
186            }
187        }
188
189        let mut batch = self.db.batch();
190
191        for op in self.operations {
192            match op {
193                BatchOp::Insert(k, v) => batch.insert(&self.keyspace, k, v),
194                BatchOp::Remove(k) => batch.remove(&self.keyspace, k),
195            }
196        }
197
198        batch.commit()?;
199
200        match self.durability {
201            DurabilityMode::Immediate => {
202                self.db.persist(PersistMode::SyncAll)?;
203            }
204            DurabilityMode::PeriodicMs(_) | DurabilityMode::None => {}
205        }
206
207        Ok(())
208    }
209}