1use std::collections::{BTreeMap, BTreeSet, HashMap};
2
3use crate::types::{CommittedVersion, Intent, Key, Timestamp, TxnId, Value};
4
5pub trait Backend {
10 fn get_committed_versions(&self, key: &[u8]) -> Result<Vec<CommittedVersion>, String>;
12 fn get_latest_committed(&self, key: &[u8]) -> Result<Option<CommittedVersion>, String>;
14 fn get_visible_committed(
16 &self,
17 key: &[u8],
18 read_ts: Timestamp,
19 ) -> Result<Option<CommittedVersion>, String>;
20 fn get_latest_commit_ts(&self, key: &[u8]) -> Result<Option<Timestamp>, String>;
22 fn get_intent(&self, key: &[u8]) -> Result<Option<Intent>, String>;
24 fn put_intent(&mut self, intent: Intent) -> Result<(), String>;
26 fn remove_intent(
29 &mut self,
30 key: &[u8],
31 txn_id: TxnId,
32 start_ts: Timestamp,
33 ) -> Result<bool, String>;
34 fn put_committed(&mut self, version: CommittedVersion) -> Result<(), String>;
36 fn put_committed_batch(&mut self, versions: Vec<CommittedVersion>) -> Result<(), String>;
38 fn remove_committed_version(&mut self, key: &[u8], commit_ts: Timestamp) -> Result<(), String>;
40 fn all_keys(&self) -> Result<Vec<Key>, String>;
42 fn put_intents_batch(&mut self, intents: Vec<Intent>) -> Result<(), String>;
44 fn commit_intents_batch(
47 &mut self,
48 commits: Vec<CommittedVersion>,
49 removed_intents: Vec<(Key, TxnId, Timestamp)>,
50 ) -> Result<(), String>;
51 fn remove_intents_batch(&mut self, intents: Vec<(Key, TxnId, Timestamp)>)
53 -> Result<(), String>;
54 fn keys_from(&self, start: Option<&[u8]>, limit: usize) -> Result<Vec<Key>, String>;
56 fn keys_from_prefix(
59 &self,
60 prefix: &[u8],
61 start: Option<&[u8]>,
62 limit: usize,
63 ) -> Result<Vec<Key>, String>;
64 fn get_committed_timestamps_before(
67 &self,
68 key: &[u8],
69 before_ts: Timestamp,
70 limit: usize,
71 ) -> Result<Vec<Timestamp>, String>;
72 fn collapse_tombstone(
75 &mut self,
76 key: &[u8],
77 tombstone_ts: Timestamp,
78 older_ts: Vec<Timestamp>,
79 ) -> Result<(), String>;
80}
81
82#[derive(Debug, Clone, Default)]
85pub struct InMemoryBackend {
86 committed: BTreeMap<(Key, Timestamp), Option<Value>>,
87 intents: HashMap<Key, Intent>,
88 all_keys_set: BTreeSet<Key>,
89}
90
91impl InMemoryBackend {
92 pub fn new() -> Self {
94 Self::default()
95 }
96}
97
98impl Backend for InMemoryBackend {
99 fn get_committed_versions(&self, key: &[u8]) -> Result<Vec<CommittedVersion>, String> {
100 let mut result = Vec::new();
101 let start = (key.to_vec(), Timestamp(0));
102 for ((k, ts), value) in self.committed.range(start..) {
103 if k.as_slice() != key {
104 break;
105 }
106 result.push(CommittedVersion {
107 key: k.clone(),
108 commit_ts: *ts,
109 value: value.clone(),
110 });
111 }
112 Ok(result)
113 }
114
115 fn get_latest_committed(&self, key: &[u8]) -> Result<Option<CommittedVersion>, String> {
116 let range = (key.to_vec(), Timestamp(0))..=(key.to_vec(), Timestamp(u64::MAX));
117 if let Some(((k, ts), value)) = self.committed.range(range).next_back() {
118 if k.as_slice() == key {
119 return Ok(Some(CommittedVersion {
120 key: k.clone(),
121 commit_ts: *ts,
122 value: value.clone(),
123 }));
124 }
125 }
126 Ok(None)
127 }
128
129 fn get_visible_committed(
130 &self,
131 key: &[u8],
132 read_ts: Timestamp,
133 ) -> Result<Option<CommittedVersion>, String> {
134 let range = (key.to_vec(), Timestamp(0))..=(key.to_vec(), read_ts);
135 if let Some(((k, ts), value)) = self.committed.range(range).next_back() {
136 if k.as_slice() == key {
137 return Ok(Some(CommittedVersion {
138 key: k.clone(),
139 commit_ts: *ts,
140 value: value.clone(),
141 }));
142 }
143 }
144 Ok(None)
145 }
146
147 fn get_latest_commit_ts(&self, key: &[u8]) -> Result<Option<Timestamp>, String> {
148 let range = (key.to_vec(), Timestamp(0))..=(key.to_vec(), Timestamp(u64::MAX));
149 if let Some(((k, ts), _)) = self.committed.range(range).next_back() {
150 if k.as_slice() == key {
151 return Ok(Some(*ts));
152 }
153 }
154 Ok(None)
155 }
156
157 fn get_intent(&self, key: &[u8]) -> Result<Option<Intent>, String> {
158 Ok(self.intents.get(key).cloned())
159 }
160
161 fn put_intent(&mut self, intent: Intent) -> Result<(), String> {
162 self.all_keys_set.insert(intent.key.clone());
163 self.intents.insert(intent.key.clone(), intent);
164 Ok(())
165 }
166
167 fn remove_intent(
168 &mut self,
169 key: &[u8],
170 txn_id: TxnId,
171 start_ts: Timestamp,
172 ) -> Result<bool, String> {
173 if let Some(intent) = self.intents.get(key) {
174 if intent.txn_id == txn_id && intent.start_ts == start_ts {
175 self.intents.remove(key);
176 self.maybe_remove_from_keys(key);
177 return Ok(true);
178 }
179 }
180 Ok(false)
181 }
182
183 fn put_committed(&mut self, version: CommittedVersion) -> Result<(), String> {
184 self.all_keys_set.insert(version.key.clone());
185 self.committed.insert(
186 (version.key.clone(), version.commit_ts),
187 version.value.clone(),
188 );
189 Ok(())
190 }
191
192 fn put_committed_batch(&mut self, versions: Vec<CommittedVersion>) -> Result<(), String> {
193 for version in versions {
194 self.all_keys_set.insert(version.key.clone());
195 self.committed.insert(
196 (version.key.clone(), version.commit_ts),
197 version.value.clone(),
198 );
199 }
200 Ok(())
201 }
202
203 fn remove_committed_version(&mut self, key: &[u8], commit_ts: Timestamp) -> Result<(), String> {
204 self.committed.remove(&(key.to_vec(), commit_ts));
205 self.maybe_remove_from_keys(key);
206 Ok(())
207 }
208
209 fn all_keys(&self) -> Result<Vec<Key>, String> {
210 Ok(self.all_keys_set.iter().cloned().collect())
211 }
212
213 fn put_intents_batch(&mut self, intents: Vec<Intent>) -> Result<(), String> {
214 for intent in intents {
215 self.all_keys_set.insert(intent.key.clone());
216 self.intents.insert(intent.key.clone(), intent);
217 }
218 Ok(())
219 }
220
221 fn commit_intents_batch(
222 &mut self,
223 commits: Vec<CommittedVersion>,
224 removed_intents: Vec<(Key, TxnId, Timestamp)>,
225 ) -> Result<(), String> {
226 for version in commits {
227 self.all_keys_set.insert(version.key.clone());
228 self.committed.insert(
229 (version.key.clone(), version.commit_ts),
230 version.value.clone(),
231 );
232 }
233 for (key, txn_id, start_ts) in removed_intents {
234 if let Some(intent) = self.intents.get(&key) {
235 if intent.txn_id == txn_id && intent.start_ts == start_ts {
236 self.intents.remove(&key);
237 self.maybe_remove_from_keys(&key);
238 }
239 }
240 }
241 Ok(())
242 }
243
244 fn remove_intents_batch(
245 &mut self,
246 intents: Vec<(Key, TxnId, Timestamp)>,
247 ) -> Result<(), String> {
248 for (key, txn_id, start_ts) in intents {
249 if let Some(intent) = self.intents.get(&key) {
250 if intent.txn_id == txn_id && intent.start_ts == start_ts {
251 self.intents.remove(&key);
252 self.maybe_remove_from_keys(&key);
253 }
254 }
255 }
256 Ok(())
257 }
258
259 fn keys_from(&self, start: Option<&[u8]>, limit: usize) -> Result<Vec<Key>, String> {
260 if let Some(s) = start {
261 Ok(self
262 .all_keys_set
263 .range(s.to_vec()..)
264 .take(limit)
265 .cloned()
266 .collect())
267 } else {
268 Ok(self.all_keys_set.iter().take(limit).cloned().collect())
269 }
270 }
271
272 fn keys_from_prefix(
273 &self,
274 prefix: &[u8],
275 start: Option<&[u8]>,
276 limit: usize,
277 ) -> Result<Vec<Key>, String> {
278 if limit == 0 {
279 return Ok(Vec::new());
280 }
281 if prefix.is_empty() {
282 return Err("Prefix cannot be empty".to_string());
283 }
284 if let Some(s) = start {
285 if !s.starts_with(prefix) {
286 return Err("Start cursor must start with the prefix".to_string());
287 }
288 }
289 let scan_start = start.unwrap_or(prefix);
290 let mut result = Vec::new();
291 let range_start = (scan_start.to_vec(), Timestamp(0));
292 for ((k, _), _) in self.committed.range(range_start..) {
293 if !k.starts_with(prefix) {
294 break;
295 }
296 if result.last() != Some(k) {
297 result.push(k.clone());
298 if result.len() == limit {
299 break;
300 }
301 }
302 }
303 Ok(result)
304 }
305
306 fn get_committed_timestamps_before(
307 &self,
308 key: &[u8],
309 before_ts: Timestamp,
310 limit: usize,
311 ) -> Result<Vec<Timestamp>, String> {
312 let range = (key.to_vec(), Timestamp(0))..(key.to_vec(), before_ts);
313 let mut result = Vec::new();
314 for ((k, ts), _) in self.committed.range(range).rev().take(limit) {
315 if k.as_slice() == key {
316 result.push(*ts);
317 }
318 }
319 Ok(result)
320 }
321
322 fn collapse_tombstone(
323 &mut self,
324 key: &[u8],
325 tombstone_ts: Timestamp,
326 older_ts: Vec<Timestamp>,
327 ) -> Result<(), String> {
328 self.committed.remove(&(key.to_vec(), tombstone_ts));
329 for ts in older_ts {
330 self.committed.remove(&(key.to_vec(), ts));
331 }
332 self.maybe_remove_from_keys(key);
333 Ok(())
334 }
335}
336
337impl InMemoryBackend {
338 fn maybe_remove_from_keys(&mut self, key: &[u8]) {
339 if !self.intents.contains_key(key) {
340 let range = (key.to_vec(), Timestamp(0))..=(key.to_vec(), Timestamp(u64::MAX));
341 if self.committed.range(range).next().is_none() {
342 self.all_keys_set.remove(key);
343 }
344 }
345 }
346}