Skip to main content

nexir_mvcc_core/
backend.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap};
2
3use crate::types::{CommittedVersion, Intent, Key, Timestamp, TxnId, Value};
4
5/// The storage backend contract.
6/// - `get_committed_versions` must return versions for the key sorted by ascending `commit_ts`.
7/// - A durable backend must make commit atomic with respect to committed-version creation and intent removal.
8/// - `put_committed_batch` must be strictly all-or-nothing for durable backends.
9pub trait Backend {
10    /// Returns all committed versions for a key, ordered ascending by `commit_ts`.
11    fn get_committed_versions(&self, key: &[u8]) -> Result<Vec<CommittedVersion>, String>;
12    /// Returns the most recent committed version for a key, if any.
13    fn get_latest_committed(&self, key: &[u8]) -> Result<Option<CommittedVersion>, String>;
14    /// Returns the most recent committed version for a key that is visible at or before `read_ts`.
15    fn get_visible_committed(
16        &self,
17        key: &[u8],
18        read_ts: Timestamp,
19    ) -> Result<Option<CommittedVersion>, String>;
20    /// Returns the timestamp of the most recent committed version for a key, if any.
21    fn get_latest_commit_ts(&self, key: &[u8]) -> Result<Option<Timestamp>, String>;
22    /// Fetches the active intent for a given key, if any exists.
23    fn get_intent(&self, key: &[u8]) -> Result<Option<Intent>, String>;
24    /// Writes a single intent to the backend.
25    fn put_intent(&mut self, intent: Intent) -> Result<(), String>;
26    /// Removes an intent from the backend if it matches the given `txn_id` and `start_ts`.
27    /// Returns `true` if removed, `false` otherwise.
28    fn remove_intent(
29        &mut self,
30        key: &[u8],
31        txn_id: TxnId,
32        start_ts: Timestamp,
33    ) -> Result<bool, String>;
34    /// Writes a single committed version to the backend.
35    fn put_committed(&mut self, version: CommittedVersion) -> Result<(), String>;
36    /// Writes multiple committed versions atomically. Must be all-or-nothing.
37    fn put_committed_batch(&mut self, versions: Vec<CommittedVersion>) -> Result<(), String>;
38    /// Removes a specific committed version during garbage collection.
39    fn remove_committed_version(&mut self, key: &[u8], commit_ts: Timestamp) -> Result<(), String>;
40    /// Returns a deduplicated, sorted list of all keys currently managed by the backend.
41    fn all_keys(&self) -> Result<Vec<Key>, String>;
42    /// Writes multiple intents atomically. Must be all-or-nothing.
43    fn put_intents_batch(&mut self, intents: Vec<Intent>) -> Result<(), String>;
44    /// Converts multiple intents into committed versions atomically.
45    /// Must create the versions and remove the intents in a single durable transaction.
46    fn commit_intents_batch(
47        &mut self,
48        commits: Vec<CommittedVersion>,
49        removed_intents: Vec<(Key, TxnId, Timestamp)>,
50    ) -> Result<(), String>;
51    /// Removes multiple intents atomically. Must be all-or-nothing.
52    fn remove_intents_batch(&mut self, intents: Vec<(Key, TxnId, Timestamp)>)
53        -> Result<(), String>;
54    /// Returns up to `limit` keys strictly ordered, starting from `start` (inclusive if provided).
55    fn keys_from(&self, start: Option<&[u8]>, limit: usize) -> Result<Vec<Key>, String>;
56    /// Returns up to `limit` keys starting with `prefix`, strictly ordered, starting from `start` if provided.
57    /// Excludes intents and returns committed keys only.
58    fn keys_from_prefix(
59        &self,
60        prefix: &[u8],
61        start: Option<&[u8]>,
62        limit: usize,
63    ) -> Result<Vec<Key>, String>;
64    /// Returns the `limit` newest commit timestamps strictly before `before_ts` for the given key.
65    /// Ordered descending (newest first).
66    fn get_committed_timestamps_before(
67        &self,
68        key: &[u8],
69        before_ts: Timestamp,
70        limit: usize,
71    ) -> Result<Vec<Timestamp>, String>;
72    /// Atomically removes a tombstone version and every supplied older version.
73    /// Callers must pass the complete older version set for a final tombstone collapse.
74    fn collapse_tombstone(
75        &mut self,
76        key: &[u8],
77        tombstone_ts: Timestamp,
78        older_ts: Vec<Timestamp>,
79    ) -> Result<(), String>;
80}
81
82/// A simple, non-durable in-memory implementation of the `Backend` trait.
83/// Intended for testing, examples, and rapid prototyping.
84#[derive(Debug, Clone, Default)]
85pub struct InMemoryBackend {
86    committed: BTreeMap<(Key, Timestamp), Option<Value>>,
87    intents: HashMap<Key, Intent>,
88    all_keys_set: BTreeSet<Key>,
89}
90
91impl InMemoryBackend {
92    /// Creates a new, empty in-memory backend.
93    pub fn new() -> Self {
94        Self::default()
95    }
96}
97
98impl Backend for InMemoryBackend {
99    fn get_committed_versions(&self, key: &[u8]) -> Result<Vec<CommittedVersion>, String> {
100        let mut result = Vec::new();
101        let start = (key.to_vec(), Timestamp(0));
102        for ((k, ts), value) in self.committed.range(start..) {
103            if k.as_slice() != key {
104                break;
105            }
106            result.push(CommittedVersion {
107                key: k.clone(),
108                commit_ts: *ts,
109                value: value.clone(),
110            });
111        }
112        Ok(result)
113    }
114
115    fn get_latest_committed(&self, key: &[u8]) -> Result<Option<CommittedVersion>, String> {
116        let range = (key.to_vec(), Timestamp(0))..=(key.to_vec(), Timestamp(u64::MAX));
117        if let Some(((k, ts), value)) = self.committed.range(range).next_back() {
118            if k.as_slice() == key {
119                return Ok(Some(CommittedVersion {
120                    key: k.clone(),
121                    commit_ts: *ts,
122                    value: value.clone(),
123                }));
124            }
125        }
126        Ok(None)
127    }
128
129    fn get_visible_committed(
130        &self,
131        key: &[u8],
132        read_ts: Timestamp,
133    ) -> Result<Option<CommittedVersion>, String> {
134        let range = (key.to_vec(), Timestamp(0))..=(key.to_vec(), read_ts);
135        if let Some(((k, ts), value)) = self.committed.range(range).next_back() {
136            if k.as_slice() == key {
137                return Ok(Some(CommittedVersion {
138                    key: k.clone(),
139                    commit_ts: *ts,
140                    value: value.clone(),
141                }));
142            }
143        }
144        Ok(None)
145    }
146
147    fn get_latest_commit_ts(&self, key: &[u8]) -> Result<Option<Timestamp>, String> {
148        let range = (key.to_vec(), Timestamp(0))..=(key.to_vec(), Timestamp(u64::MAX));
149        if let Some(((k, ts), _)) = self.committed.range(range).next_back() {
150            if k.as_slice() == key {
151                return Ok(Some(*ts));
152            }
153        }
154        Ok(None)
155    }
156
157    fn get_intent(&self, key: &[u8]) -> Result<Option<Intent>, String> {
158        Ok(self.intents.get(key).cloned())
159    }
160
161    fn put_intent(&mut self, intent: Intent) -> Result<(), String> {
162        self.all_keys_set.insert(intent.key.clone());
163        self.intents.insert(intent.key.clone(), intent);
164        Ok(())
165    }
166
167    fn remove_intent(
168        &mut self,
169        key: &[u8],
170        txn_id: TxnId,
171        start_ts: Timestamp,
172    ) -> Result<bool, String> {
173        if let Some(intent) = self.intents.get(key) {
174            if intent.txn_id == txn_id && intent.start_ts == start_ts {
175                self.intents.remove(key);
176                self.maybe_remove_from_keys(key);
177                return Ok(true);
178            }
179        }
180        Ok(false)
181    }
182
183    fn put_committed(&mut self, version: CommittedVersion) -> Result<(), String> {
184        self.all_keys_set.insert(version.key.clone());
185        self.committed.insert(
186            (version.key.clone(), version.commit_ts),
187            version.value.clone(),
188        );
189        Ok(())
190    }
191
192    fn put_committed_batch(&mut self, versions: Vec<CommittedVersion>) -> Result<(), String> {
193        for version in versions {
194            self.all_keys_set.insert(version.key.clone());
195            self.committed.insert(
196                (version.key.clone(), version.commit_ts),
197                version.value.clone(),
198            );
199        }
200        Ok(())
201    }
202
203    fn remove_committed_version(&mut self, key: &[u8], commit_ts: Timestamp) -> Result<(), String> {
204        self.committed.remove(&(key.to_vec(), commit_ts));
205        self.maybe_remove_from_keys(key);
206        Ok(())
207    }
208
209    fn all_keys(&self) -> Result<Vec<Key>, String> {
210        Ok(self.all_keys_set.iter().cloned().collect())
211    }
212
213    fn put_intents_batch(&mut self, intents: Vec<Intent>) -> Result<(), String> {
214        for intent in intents {
215            self.all_keys_set.insert(intent.key.clone());
216            self.intents.insert(intent.key.clone(), intent);
217        }
218        Ok(())
219    }
220
221    fn commit_intents_batch(
222        &mut self,
223        commits: Vec<CommittedVersion>,
224        removed_intents: Vec<(Key, TxnId, Timestamp)>,
225    ) -> Result<(), String> {
226        for version in commits {
227            self.all_keys_set.insert(version.key.clone());
228            self.committed.insert(
229                (version.key.clone(), version.commit_ts),
230                version.value.clone(),
231            );
232        }
233        for (key, txn_id, start_ts) in removed_intents {
234            if let Some(intent) = self.intents.get(&key) {
235                if intent.txn_id == txn_id && intent.start_ts == start_ts {
236                    self.intents.remove(&key);
237                    self.maybe_remove_from_keys(&key);
238                }
239            }
240        }
241        Ok(())
242    }
243
244    fn remove_intents_batch(
245        &mut self,
246        intents: Vec<(Key, TxnId, Timestamp)>,
247    ) -> Result<(), String> {
248        for (key, txn_id, start_ts) in intents {
249            if let Some(intent) = self.intents.get(&key) {
250                if intent.txn_id == txn_id && intent.start_ts == start_ts {
251                    self.intents.remove(&key);
252                    self.maybe_remove_from_keys(&key);
253                }
254            }
255        }
256        Ok(())
257    }
258
259    fn keys_from(&self, start: Option<&[u8]>, limit: usize) -> Result<Vec<Key>, String> {
260        if let Some(s) = start {
261            Ok(self
262                .all_keys_set
263                .range(s.to_vec()..)
264                .take(limit)
265                .cloned()
266                .collect())
267        } else {
268            Ok(self.all_keys_set.iter().take(limit).cloned().collect())
269        }
270    }
271
272    fn keys_from_prefix(
273        &self,
274        prefix: &[u8],
275        start: Option<&[u8]>,
276        limit: usize,
277    ) -> Result<Vec<Key>, String> {
278        if limit == 0 {
279            return Ok(Vec::new());
280        }
281        if prefix.is_empty() {
282            return Err("Prefix cannot be empty".to_string());
283        }
284        if let Some(s) = start {
285            if !s.starts_with(prefix) {
286                return Err("Start cursor must start with the prefix".to_string());
287            }
288        }
289        let scan_start = start.unwrap_or(prefix);
290        let mut result = Vec::new();
291        let range_start = (scan_start.to_vec(), Timestamp(0));
292        for ((k, _), _) in self.committed.range(range_start..) {
293            if !k.starts_with(prefix) {
294                break;
295            }
296            if result.last() != Some(k) {
297                result.push(k.clone());
298                if result.len() == limit {
299                    break;
300                }
301            }
302        }
303        Ok(result)
304    }
305
306    fn get_committed_timestamps_before(
307        &self,
308        key: &[u8],
309        before_ts: Timestamp,
310        limit: usize,
311    ) -> Result<Vec<Timestamp>, String> {
312        let range = (key.to_vec(), Timestamp(0))..(key.to_vec(), before_ts);
313        let mut result = Vec::new();
314        for ((k, ts), _) in self.committed.range(range).rev().take(limit) {
315            if k.as_slice() == key {
316                result.push(*ts);
317            }
318        }
319        Ok(result)
320    }
321
322    fn collapse_tombstone(
323        &mut self,
324        key: &[u8],
325        tombstone_ts: Timestamp,
326        older_ts: Vec<Timestamp>,
327    ) -> Result<(), String> {
328        self.committed.remove(&(key.to_vec(), tombstone_ts));
329        for ts in older_ts {
330            self.committed.remove(&(key.to_vec(), ts));
331        }
332        self.maybe_remove_from_keys(key);
333        Ok(())
334    }
335}
336
337impl InMemoryBackend {
338    fn maybe_remove_from_keys(&mut self, key: &[u8]) {
339        if !self.intents.contains_key(key) {
340            let range = (key.to_vec(), Timestamp(0))..=(key.to_vec(), Timestamp(u64::MAX));
341            if self.committed.range(range).next().is_none() {
342                self.all_keys_set.remove(key);
343            }
344        }
345    }
346}