1use std::{
5 collections::{BTreeMap, HashMap, HashSet},
6 ops::{Bound, RangeBounds},
7};
8
9use reifydb_core::{
10 common::CommitVersion,
11 delta::Delta,
12 encoded::{
13 key::{EncodedKey, EncodedKeyRange},
14 row::EncodedRow,
15 },
16 event::metric::{StorageDelete, StorageStatsRecordedEvent, StorageWrite},
17 interface::store::{
18 MultiVersionBatch, MultiVersionCommit, MultiVersionContains, MultiVersionGet, MultiVersionGetPrevious,
19 MultiVersionRow, MultiVersionStore,
20 },
21};
22use reifydb_type::util::{cowvec::CowVec, hex};
23use tracing::{instrument, warn};
24
25use super::{
26 StandardMultiStore,
27 router::{classify_key, classify_range, is_single_version_semantics_key},
28 version::{VersionedGetResult, get_at_version},
29 worker::{DropMessage, DropRequest},
30};
31use crate::{
32 Result,
33 tier::{EntryKind, EntryKind::Multi, RangeCursor, TierBatch, TierStorage},
34};
35
/// Maximum number of entries requested from a single storage tier in one scan call while
/// assembling a range batch.
const TIER_SCAN_CHUNK_SIZE: usize = 4 * 1024;
39
40impl MultiVersionGet for StandardMultiStore {
41 #[instrument(name = "store::multi::get", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0))]
42 fn get(&self, key: &EncodedKey, version: CommitVersion) -> Result<Option<MultiVersionRow>> {
43 let table = classify_key(key);
44
45 if let Some(hot) = &self.hot {
47 match get_at_version(hot, table, key.as_ref(), version)? {
48 VersionedGetResult::Value {
49 value,
50 version: v,
51 } => {
52 return Ok(Some(MultiVersionRow {
53 key: key.clone(),
54 row: EncodedRow(value),
55 version: v,
56 }));
57 }
58 VersionedGetResult::Tombstone => return Ok(None),
59 VersionedGetResult::NotFound => {}
60 }
61 }
62
63 if let Some(warm) = &self.warm {
65 match get_at_version(warm, table, key.as_ref(), version)? {
66 VersionedGetResult::Value {
67 value,
68 version: v,
69 } => {
70 return Ok(Some(MultiVersionRow {
71 key: key.clone(),
72 row: EncodedRow(value),
73 version: v,
74 }));
75 }
76 VersionedGetResult::Tombstone => return Ok(None),
77 VersionedGetResult::NotFound => {}
78 }
79 }
80
81 if let Some(cold) = &self.cold {
83 match get_at_version(cold, table, key.as_ref(), version)? {
84 VersionedGetResult::Value {
85 value,
86 version: v,
87 } => {
88 return Ok(Some(MultiVersionRow {
89 key: key.clone(),
90 row: EncodedRow(value),
91 version: v,
92 }));
93 }
94 VersionedGetResult::Tombstone => return Ok(None),
95 VersionedGetResult::NotFound => {}
96 }
97 }
98
99 Ok(None)
100 }
101}
102
103impl MultiVersionContains for StandardMultiStore {
104 #[instrument(name = "store::multi::contains", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0), ret)]
105 fn contains(&self, key: &EncodedKey, version: CommitVersion) -> Result<bool> {
106 Ok(MultiVersionGet::get(self, key, version)?.is_some())
107 }
108}
109
110impl MultiVersionCommit for StandardMultiStore {
111 #[instrument(name = "store::multi::commit", level = "debug", skip(self, deltas), fields(delta_count = deltas.len(), version = version.0))]
112 fn commit(&self, deltas: CowVec<Delta>, version: CommitVersion) -> Result<()> {
113 let Some(storage) = &self.hot else {
115 return Ok(());
116 };
117
118 let mut pending_set_keys: HashSet<CowVec<u8>> = HashSet::new();
119 let mut writes: Vec<StorageWrite> = Vec::new();
120 let mut deletes: Vec<StorageDelete> = Vec::new();
121 let mut batches: TierBatch = HashMap::new();
122 let mut explicit_drops: Vec<(EntryKind, EncodedKey, Option<CommitVersion>, Option<usize>)> = Vec::new();
123
124 for delta in deltas.iter() {
125 let key = delta.key();
126
127 let table = classify_key(key);
128 let is_single_version = is_single_version_semantics_key(key);
129
130 match delta {
131 Delta::Set {
132 key,
133 row,
134 } => {
135 if is_single_version {
136 pending_set_keys.insert(key.0.clone());
137 }
138
139 writes.push(StorageWrite {
140 key: key.clone(),
141 value_bytes: row.len() as u64,
142 });
143
144 batches.entry(table).or_default().push((key.0.clone(), Some(row.0.clone())));
145 }
146 Delta::Unset {
147 key,
148 row,
149 } => {
150 deletes.push(StorageDelete {
151 key: key.clone(),
152 value_bytes: row.len() as u64,
153 });
154
155 batches.entry(table).or_default().push((key.0.clone(), None));
156 }
157 Delta::Remove {
158 key,
159 } => {
160 batches.entry(table).or_default().push((key.0.clone(), None));
161 }
162 Delta::Drop {
163 key,
164 up_to_version,
165 keep_last_versions,
166 } => {
167 explicit_drops.push((table, key.clone(), *up_to_version, *keep_last_versions));
168 }
169 }
170 }
171
172 let mut drop_batch = Vec::with_capacity(explicit_drops.len() + pending_set_keys.len());
174 for (table, key, up_to_version, keep_last_versions) in explicit_drops {
175 let pending_version = if pending_set_keys.contains(key.as_ref()) {
176 Some(version)
177 } else {
178 None
179 };
180
181 drop_batch.push(DropRequest {
182 table,
183 key: key.0.clone(),
184 up_to_version,
185 keep_last_versions,
186 commit_version: version,
187 pending_version,
188 });
189 }
190
191 for key in pending_set_keys.iter() {
193 let table = classify_key(&EncodedKey(key.clone()));
194 drop_batch.push(DropRequest {
195 table,
196 key: key.clone(),
197 up_to_version: None,
198 keep_last_versions: Some(1),
199 commit_version: version,
200 pending_version: Some(version),
201 });
202 }
203
204 if !drop_batch.is_empty() && self.drop_actor.send_blocking(DropMessage::Batch(drop_batch)).is_err() {
205 warn!("Failed to send drop batch");
206 }
207
208 storage.set(version, batches)?;
210
211 if !writes.is_empty() || !deletes.is_empty() {
213 self.event_bus.emit(StorageStatsRecordedEvent::new(writes, deletes, vec![], version));
214 }
215
216 Ok(())
217 }
218}
219
220#[derive(Debug, Clone, Default)]
225pub struct MultiVersionRangeCursor {
226 pub hot: RangeCursor,
228 pub warm: RangeCursor,
230 pub cold: RangeCursor,
232 pub exhausted: bool,
234}
235
236impl MultiVersionRangeCursor {
237 pub fn new() -> Self {
239 Self::default()
240 }
241
242 pub fn is_exhausted(&self) -> bool {
244 self.exhausted
245 }
246}
247
248struct TierScanQuery<'a> {
250 table: EntryKind,
251 start: &'a [u8],
252 end: &'a [u8],
253 version: CommitVersion,
254 range: &'a EncodedKeyRange,
255}
256
257impl StandardMultiStore {
258 pub fn range_next(
263 &self,
264 cursor: &mut MultiVersionRangeCursor,
265 range: EncodedKeyRange,
266 version: CommitVersion,
267 batch_size: u64,
268 ) -> Result<MultiVersionBatch> {
269 if cursor.exhausted {
270 return Ok(MultiVersionBatch {
271 items: Vec::new(),
272 has_more: false,
273 });
274 }
275
276 let table = classify_key_range(&range);
277 let (start, end) = make_range_bounds(&range);
278 let batch_size = batch_size as usize;
279 let scan = TierScanQuery {
280 table,
281 start: &start,
282 end: &end,
283 version,
284 range: &range,
285 };
286
287 let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
289
290 while collected.len() < batch_size {
292 let mut any_progress = false;
293
294 if let Some(hot) = &self.hot
296 && !cursor.hot.exhausted
297 {
298 let progress = Self::scan_tier_chunk(hot, &mut cursor.hot, &scan, &mut collected)?;
299 any_progress |= progress;
300 }
301
302 if let Some(warm) = &self.warm
304 && !cursor.warm.exhausted
305 {
306 let progress = Self::scan_tier_chunk(warm, &mut cursor.warm, &scan, &mut collected)?;
307 any_progress |= progress;
308 }
309
310 if let Some(cold) = &self.cold
312 && !cursor.cold.exhausted
313 {
314 let progress = Self::scan_tier_chunk(cold, &mut cursor.cold, &scan, &mut collected)?;
315 any_progress |= progress;
316 }
317
318 if !any_progress {
319 cursor.exhausted = true;
321 break;
322 }
323 }
324
325 let items: Vec<MultiVersionRow> = collected
327 .into_iter()
328 .take(batch_size)
329 .filter_map(|(key_bytes, (v, value))| {
330 value.map(|val| MultiVersionRow {
331 key: EncodedKey(CowVec::new(key_bytes)),
332 row: EncodedRow(val),
333 version: v,
334 })
335 })
336 .collect();
337
338 let has_more = items.len() >= batch_size || !cursor.exhausted;
339
340 Ok(MultiVersionBatch {
341 items,
342 has_more,
343 })
344 }
345
346 fn scan_tier_chunk<S: TierStorage>(
349 storage: &S,
350 cursor: &mut RangeCursor,
351 scan: &TierScanQuery,
352 collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
353 ) -> Result<bool> {
354 let batch = storage.range_next(
355 scan.table,
356 cursor,
357 Bound::Included(scan.start),
358 Bound::Included(scan.end),
359 scan.version,
360 TIER_SCAN_CHUNK_SIZE,
361 )?;
362
363 if batch.entries.is_empty() {
364 return Ok(false);
365 }
366
367 for entry in batch.entries {
368 let original_key = entry.key.as_slice().to_vec();
370 let entry_version = entry.version;
371
372 let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
374 if !scan.range.contains(&original_key_encoded) {
375 continue;
376 }
377
378 let should_update = match collected.get(&original_key) {
380 None => true,
381 Some((existing_version, _)) => entry_version > *existing_version,
382 };
383
384 if should_update {
385 collected.insert(original_key, (entry_version, entry.value));
386 }
387 }
388
389 Ok(true)
390 }
391
392 pub fn range(
398 &self,
399 range: EncodedKeyRange,
400 version: CommitVersion,
401 batch_size: usize,
402 ) -> MultiVersionRangeIter {
403 MultiVersionRangeIter {
404 store: self.clone(),
405 cursor: MultiVersionRangeCursor::new(),
406 range,
407 version,
408 batch_size,
409 current_batch: Vec::new(),
410 current_index: 0,
411 }
412 }
413
414 pub fn range_rev(
420 &self,
421 range: EncodedKeyRange,
422 version: CommitVersion,
423 batch_size: usize,
424 ) -> MultiVersionRangeRevIter {
425 MultiVersionRangeRevIter {
426 store: self.clone(),
427 cursor: MultiVersionRangeCursor::new(),
428 range,
429 version,
430 batch_size,
431 current_batch: Vec::new(),
432 current_index: 0,
433 }
434 }
435
436 fn range_rev_next(
441 &self,
442 cursor: &mut MultiVersionRangeCursor,
443 range: EncodedKeyRange,
444 version: CommitVersion,
445 batch_size: u64,
446 ) -> Result<MultiVersionBatch> {
447 if cursor.exhausted {
448 return Ok(MultiVersionBatch {
449 items: Vec::new(),
450 has_more: false,
451 });
452 }
453
454 let table = classify_key_range(&range);
455 let (start, end) = make_range_bounds(&range);
456 let batch_size = batch_size as usize;
457 let scan = TierScanQuery {
458 table,
459 start: &start,
460 end: &end,
461 version,
462 range: &range,
463 };
464
465 let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
467
468 while collected.len() < batch_size {
470 let mut any_progress = false;
471
472 if let Some(hot) = &self.hot
474 && !cursor.hot.exhausted
475 {
476 let progress = Self::scan_tier_chunk_rev(hot, &mut cursor.hot, &scan, &mut collected)?;
477 any_progress |= progress;
478 }
479
480 if let Some(warm) = &self.warm
482 && !cursor.warm.exhausted
483 {
484 let progress =
485 Self::scan_tier_chunk_rev(warm, &mut cursor.warm, &scan, &mut collected)?;
486 any_progress |= progress;
487 }
488
489 if let Some(cold) = &self.cold
491 && !cursor.cold.exhausted
492 {
493 let progress =
494 Self::scan_tier_chunk_rev(cold, &mut cursor.cold, &scan, &mut collected)?;
495 any_progress |= progress;
496 }
497
498 if !any_progress {
499 cursor.exhausted = true;
501 break;
502 }
503 }
504
505 let items: Vec<MultiVersionRow> = collected
507 .into_iter()
508 .rev()
509 .take(batch_size)
510 .filter_map(|(key_bytes, (v, value))| {
511 value.map(|val| MultiVersionRow {
512 key: EncodedKey(CowVec::new(key_bytes)),
513 row: EncodedRow(val),
514 version: v,
515 })
516 })
517 .collect();
518
519 let has_more = items.len() >= batch_size || !cursor.exhausted;
520
521 Ok(MultiVersionBatch {
522 items,
523 has_more,
524 })
525 }
526
527 fn scan_tier_chunk_rev<S: TierStorage>(
530 storage: &S,
531 cursor: &mut RangeCursor,
532 scan: &TierScanQuery,
533 collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
534 ) -> Result<bool> {
535 let batch = storage.range_rev_next(
536 scan.table,
537 cursor,
538 Bound::Included(scan.start),
539 Bound::Included(scan.end),
540 scan.version,
541 TIER_SCAN_CHUNK_SIZE,
542 )?;
543
544 if batch.entries.is_empty() {
545 return Ok(false);
546 }
547
548 for entry in batch.entries {
549 let original_key = entry.key.as_slice().to_vec();
551 let entry_version = entry.version;
552
553 let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
555 if !scan.range.contains(&original_key_encoded) {
556 continue;
557 }
558
559 let should_update = match collected.get(&original_key) {
561 None => true,
562 Some((existing_version, _)) => entry_version > *existing_version,
563 };
564
565 if should_update {
566 collected.insert(original_key, (entry_version, entry.value));
567 }
568 }
569
570 Ok(true)
571 }
572}
573
574impl MultiVersionGetPrevious for StandardMultiStore {
575 fn get_previous_version(
576 &self,
577 key: &EncodedKey,
578 before_version: CommitVersion,
579 ) -> Result<Option<MultiVersionRow>> {
580 if before_version.0 == 0 {
581 return Ok(None);
582 }
583
584 let storage = self.hot.as_ref().expect("hot storage required for version lookups");
586
587 let table = classify_key(key);
588 let prev_version = CommitVersion(before_version.0 - 1);
589
590 match get_at_version(storage, table, key.as_ref(), prev_version) {
591 Ok(VersionedGetResult::Value {
592 value,
593 version,
594 }) => Ok(Some(MultiVersionRow {
595 key: key.clone(),
596 row: EncodedRow(CowVec::new(value.to_vec())),
597 version,
598 })),
599 Ok(VersionedGetResult::Tombstone) | Ok(VersionedGetResult::NotFound) => Ok(None),
600 Err(e) => Err(e),
601 }
602 }
603}
604
// Marker impl: the trait body is empty here. Presumably `MultiVersionStore` is satisfied by
// the component trait impls in this file (get, contains, commit, get-previous) — confirm
// against the trait definition.
impl MultiVersionStore for StandardMultiStore {}
606
607pub struct MultiVersionRangeIter {
609 store: StandardMultiStore,
610 cursor: MultiVersionRangeCursor,
611 range: EncodedKeyRange,
612 version: CommitVersion,
613 batch_size: usize,
614 current_batch: Vec<MultiVersionRow>,
615 current_index: usize,
616}
617
618impl Iterator for MultiVersionRangeIter {
619 type Item = Result<MultiVersionRow>;
620
621 fn next(&mut self) -> Option<Self::Item> {
622 if self.current_index < self.current_batch.len() {
624 let item = self.current_batch[self.current_index].clone();
625 self.current_index += 1;
626 return Some(Ok(item));
627 }
628
629 if self.cursor.exhausted {
631 return None;
632 }
633
634 match self.store.range_next(&mut self.cursor, self.range.clone(), self.version, self.batch_size as u64)
636 {
637 Ok(batch) => {
638 if batch.items.is_empty() {
639 return None;
640 }
641 self.current_batch = batch.items;
642 self.current_index = 0;
643 self.next()
644 }
645 Err(e) => Some(Err(e)),
646 }
647 }
648}
649
650pub struct MultiVersionRangeRevIter {
652 store: StandardMultiStore,
653 cursor: MultiVersionRangeCursor,
654 range: EncodedKeyRange,
655 version: CommitVersion,
656 batch_size: usize,
657 current_batch: Vec<MultiVersionRow>,
658 current_index: usize,
659}
660
661impl Iterator for MultiVersionRangeRevIter {
662 type Item = Result<MultiVersionRow>;
663
664 fn next(&mut self) -> Option<Self::Item> {
665 if self.current_index < self.current_batch.len() {
667 let item = self.current_batch[self.current_index].clone();
668 self.current_index += 1;
669 return Some(Ok(item));
670 }
671
672 if self.cursor.exhausted {
674 return None;
675 }
676
677 match self.store.range_rev_next(
679 &mut self.cursor,
680 self.range.clone(),
681 self.version,
682 self.batch_size as u64,
683 ) {
684 Ok(batch) => {
685 if batch.items.is_empty() {
686 return None;
687 }
688 self.current_batch = batch.items;
689 self.current_index = 0;
690 self.next()
691 }
692 Err(e) => Some(Err(e)),
693 }
694 }
695}
696
697fn classify_key_range(range: &EncodedKeyRange) -> EntryKind {
699 classify_range(range).unwrap_or(Multi)
700}
701
702fn make_range_bounds(range: &EncodedKeyRange) -> (Vec<u8>, Vec<u8>) {
705 let start = match &range.start {
706 Bound::Included(key) => key.as_ref().to_vec(),
707 Bound::Excluded(key) => key.as_ref().to_vec(),
708 Bound::Unbounded => vec![],
709 };
710
711 let end = match &range.end {
712 Bound::Included(key) => key.as_ref().to_vec(),
713 Bound::Excluded(key) => key.as_ref().to_vec(),
714 Bound::Unbounded => vec![0xFFu8; 256],
715 };
716
717 (start, end)
718}