1use std::{
5 collections::{BTreeMap, HashMap, HashSet},
6 ops::{Bound, RangeBounds},
7};
8
9use reifydb_core::{
10 common::CommitVersion,
11 delta::Delta,
12 encoded::{
13 encoded::EncodedValues,
14 key::{EncodedKey, EncodedKeyRange},
15 },
16 event::metric::{StorageDelete, StorageStatsRecordedEvent, StorageWrite},
17 interface::store::{
18 MultiVersionBatch, MultiVersionCommit, MultiVersionContains, MultiVersionGet, MultiVersionGetPrevious,
19 MultiVersionStore, MultiVersionValues,
20 },
21};
22use reifydb_type::util::{cowvec::CowVec, hex};
23use tracing::{instrument, warn};
24
25use super::{
26 StandardMultiStore,
27 router::{classify_key, classify_range, is_single_version_semantics_key},
28 version::{VersionedGetResult, get_at_version},
29 worker::{DropMessage, DropRequest},
30};
31use crate::{
32 Result,
33 tier::{EntryKind, EntryKind::Multi, RangeCursor, TierStorage},
34};
35
/// Maximum number of entries requested from a single tier in one scan step.
///
/// Range scans fetch tier data in chunks of this size regardless of the caller's page size.
const TIER_SCAN_CHUNK_SIZE: usize = 1 << 12;
39
40impl MultiVersionGet for StandardMultiStore {
41 #[instrument(name = "store::multi::get", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0))]
42 fn get(&self, key: &EncodedKey, version: CommitVersion) -> Result<Option<MultiVersionValues>> {
43 let table = classify_key(key);
44
45 if let Some(hot) = &self.hot {
47 match get_at_version(hot, table, key.as_ref(), version)? {
48 VersionedGetResult::Value {
49 value,
50 version: v,
51 } => {
52 return Ok(Some(MultiVersionValues {
53 key: key.clone(),
54 values: EncodedValues(value),
55 version: v,
56 }));
57 }
58 VersionedGetResult::Tombstone => return Ok(None),
59 VersionedGetResult::NotFound => {}
60 }
61 }
62
63 if let Some(warm) = &self.warm {
65 match get_at_version(warm, table, key.as_ref(), version)? {
66 VersionedGetResult::Value {
67 value,
68 version: v,
69 } => {
70 return Ok(Some(MultiVersionValues {
71 key: key.clone(),
72 values: EncodedValues(value),
73 version: v,
74 }));
75 }
76 VersionedGetResult::Tombstone => return Ok(None),
77 VersionedGetResult::NotFound => {}
78 }
79 }
80
81 if let Some(cold) = &self.cold {
83 match get_at_version(cold, table, key.as_ref(), version)? {
84 VersionedGetResult::Value {
85 value,
86 version: v,
87 } => {
88 return Ok(Some(MultiVersionValues {
89 key: key.clone(),
90 values: EncodedValues(value),
91 version: v,
92 }));
93 }
94 VersionedGetResult::Tombstone => return Ok(None),
95 VersionedGetResult::NotFound => {}
96 }
97 }
98
99 Ok(None)
100 }
101}
102
103impl MultiVersionContains for StandardMultiStore {
104 #[instrument(name = "store::multi::contains", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0), ret)]
105 fn contains(&self, key: &EncodedKey, version: CommitVersion) -> Result<bool> {
106 Ok(MultiVersionGet::get(self, key, version)?.is_some())
107 }
108}
109
110impl MultiVersionCommit for StandardMultiStore {
111 #[instrument(name = "store::multi::commit", level = "debug", skip(self, deltas), fields(delta_count = deltas.len(), version = version.0))]
112 fn commit(&self, deltas: CowVec<Delta>, version: CommitVersion) -> Result<()> {
113 let Some(storage) = &self.hot else {
115 return Ok(());
116 };
117
118 let mut pending_set_keys: HashSet<CowVec<u8>> = HashSet::new();
119 let mut writes: Vec<StorageWrite> = Vec::new();
120 let mut deletes: Vec<StorageDelete> = Vec::new();
121 let mut batches: HashMap<EntryKind, Vec<(CowVec<u8>, Option<CowVec<u8>>)>> = HashMap::new();
122 let mut explicit_drops: Vec<(EntryKind, EncodedKey, Option<CommitVersion>, Option<usize>)> = Vec::new();
123
124 for delta in deltas.iter() {
125 let key = delta.key();
126
127 let table = classify_key(key);
128 let is_single_version = is_single_version_semantics_key(key);
129
130 match delta {
131 Delta::Set {
132 key,
133 values,
134 } => {
135 if is_single_version {
136 pending_set_keys.insert(key.0.clone());
137 }
138
139 writes.push(StorageWrite {
140 key: key.clone(),
141 value_bytes: values.len() as u64,
142 });
143
144 batches.entry(table).or_default().push((key.0.clone(), Some(values.0.clone())));
145 }
146 Delta::Unset {
147 key,
148 values,
149 } => {
150 deletes.push(StorageDelete {
151 key: key.clone(),
152 value_bytes: values.len() as u64,
153 });
154
155 batches.entry(table).or_default().push((key.0.clone(), None));
156 }
157 Delta::Remove {
158 key,
159 } => {
160 batches.entry(table).or_default().push((key.0.clone(), None));
161 }
162 Delta::Drop {
163 key,
164 up_to_version,
165 keep_last_versions,
166 } => {
167 explicit_drops.push((table, key.clone(), *up_to_version, *keep_last_versions));
168 }
169 }
170 }
171
172 let mut drop_batch = Vec::with_capacity(explicit_drops.len() + pending_set_keys.len());
174 for (table, key, up_to_version, keep_last_versions) in explicit_drops {
175 let pending_version = if pending_set_keys.contains(key.as_ref()) {
176 Some(version)
177 } else {
178 None
179 };
180
181 drop_batch.push(DropRequest {
182 table,
183 key: key.0.clone(),
184 up_to_version,
185 keep_last_versions,
186 commit_version: version,
187 pending_version,
188 });
189 }
190
191 for key in pending_set_keys.iter() {
193 let table = classify_key(&EncodedKey(key.clone()));
194 drop_batch.push(DropRequest {
195 table,
196 key: key.clone(),
197 up_to_version: None,
198 keep_last_versions: Some(1),
199 commit_version: version,
200 pending_version: Some(version),
201 });
202 }
203
204 if !drop_batch.is_empty() {
205 if self.drop_actor.send_blocking(DropMessage::Batch(drop_batch)).is_err() {
206 warn!("Failed to send drop batch");
207 }
208 }
209
210 storage.set(version, batches)?;
212
213 if !writes.is_empty() || !deletes.is_empty() {
215 self.event_bus.emit(StorageStatsRecordedEvent::new(writes, deletes, vec![], version));
216 }
217
218 Ok(())
219 }
220}
221
222#[derive(Debug, Clone, Default)]
227pub struct MultiVersionRangeCursor {
228 pub hot: RangeCursor,
230 pub warm: RangeCursor,
232 pub cold: RangeCursor,
234 pub exhausted: bool,
236}
237
238impl MultiVersionRangeCursor {
239 pub fn new() -> Self {
241 Self::default()
242 }
243
244 pub fn is_exhausted(&self) -> bool {
246 self.exhausted
247 }
248}
249
250impl StandardMultiStore {
251 pub fn range_next(
256 &self,
257 cursor: &mut MultiVersionRangeCursor,
258 range: EncodedKeyRange,
259 version: CommitVersion,
260 batch_size: u64,
261 ) -> Result<MultiVersionBatch> {
262 if cursor.exhausted {
263 return Ok(MultiVersionBatch {
264 items: Vec::new(),
265 has_more: false,
266 });
267 }
268
269 let table = classify_key_range(&range);
270 let (start, end) = make_range_bounds(&range);
271 let batch_size = batch_size as usize;
272
273 let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
275
276 while collected.len() < batch_size {
278 let mut any_progress = false;
279
280 if let Some(hot) = &self.hot {
282 if !cursor.hot.exhausted {
283 let progress = Self::scan_tier_chunk(
284 hot,
285 table,
286 &mut cursor.hot,
287 &start,
288 &end,
289 version,
290 &range,
291 &mut collected,
292 )?;
293 any_progress |= progress;
294 }
295 }
296
297 if let Some(warm) = &self.warm {
299 if !cursor.warm.exhausted {
300 let progress = Self::scan_tier_chunk(
301 warm,
302 table,
303 &mut cursor.warm,
304 &start,
305 &end,
306 version,
307 &range,
308 &mut collected,
309 )?;
310 any_progress |= progress;
311 }
312 }
313
314 if let Some(cold) = &self.cold {
316 if !cursor.cold.exhausted {
317 let progress = Self::scan_tier_chunk(
318 cold,
319 table,
320 &mut cursor.cold,
321 &start,
322 &end,
323 version,
324 &range,
325 &mut collected,
326 )?;
327 any_progress |= progress;
328 }
329 }
330
331 if !any_progress {
332 cursor.exhausted = true;
334 break;
335 }
336 }
337
338 let items: Vec<MultiVersionValues> = collected
340 .into_iter()
341 .take(batch_size)
342 .filter_map(|(key_bytes, (v, value))| {
343 value.map(|val| MultiVersionValues {
344 key: EncodedKey(CowVec::new(key_bytes)),
345 values: EncodedValues(val),
346 version: v,
347 })
348 })
349 .collect();
350
351 let has_more = items.len() >= batch_size || !cursor.exhausted;
352
353 Ok(MultiVersionBatch {
354 items,
355 has_more,
356 })
357 }
358
359 fn scan_tier_chunk<S: TierStorage>(
362 storage: &S,
363 table: EntryKind,
364 cursor: &mut RangeCursor,
365 start: &[u8],
366 end: &[u8],
367 version: CommitVersion,
368 range: &EncodedKeyRange,
369 collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
370 ) -> Result<bool> {
371 let batch = storage.range_next(
372 table,
373 cursor,
374 Bound::Included(start),
375 Bound::Included(end),
376 version,
377 TIER_SCAN_CHUNK_SIZE,
378 )?;
379
380 if batch.entries.is_empty() {
381 return Ok(false);
382 }
383
384 for entry in batch.entries {
385 let original_key = entry.key.as_slice().to_vec();
387 let entry_version = entry.version;
388
389 let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
391 if !range.contains(&original_key_encoded) {
392 continue;
393 }
394
395 let should_update = match collected.get(&original_key) {
397 None => true,
398 Some((existing_version, _)) => entry_version > *existing_version,
399 };
400
401 if should_update {
402 collected.insert(original_key, (entry_version, entry.value));
403 }
404 }
405
406 Ok(true)
407 }
408
409 pub fn range(
415 &self,
416 range: EncodedKeyRange,
417 version: CommitVersion,
418 batch_size: usize,
419 ) -> MultiVersionRangeIter {
420 MultiVersionRangeIter {
421 store: self.clone(),
422 cursor: MultiVersionRangeCursor::new(),
423 range,
424 version,
425 batch_size,
426 current_batch: Vec::new(),
427 current_index: 0,
428 }
429 }
430
431 pub fn range_rev(
437 &self,
438 range: EncodedKeyRange,
439 version: CommitVersion,
440 batch_size: usize,
441 ) -> MultiVersionRangeRevIter {
442 MultiVersionRangeRevIter {
443 store: self.clone(),
444 cursor: MultiVersionRangeCursor::new(),
445 range,
446 version,
447 batch_size,
448 current_batch: Vec::new(),
449 current_index: 0,
450 }
451 }
452
453 fn range_rev_next(
458 &self,
459 cursor: &mut MultiVersionRangeCursor,
460 range: EncodedKeyRange,
461 version: CommitVersion,
462 batch_size: u64,
463 ) -> Result<MultiVersionBatch> {
464 if cursor.exhausted {
465 return Ok(MultiVersionBatch {
466 items: Vec::new(),
467 has_more: false,
468 });
469 }
470
471 let table = classify_key_range(&range);
472 let (start, end) = make_range_bounds(&range);
473 let batch_size = batch_size as usize;
474
475 let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
477
478 while collected.len() < batch_size {
480 let mut any_progress = false;
481
482 if let Some(hot) = &self.hot {
484 if !cursor.hot.exhausted {
485 let progress = Self::scan_tier_chunk_rev(
486 hot,
487 table,
488 &mut cursor.hot,
489 &start,
490 &end,
491 version,
492 &range,
493 &mut collected,
494 )?;
495 any_progress |= progress;
496 }
497 }
498
499 if let Some(warm) = &self.warm {
501 if !cursor.warm.exhausted {
502 let progress = Self::scan_tier_chunk_rev(
503 warm,
504 table,
505 &mut cursor.warm,
506 &start,
507 &end,
508 version,
509 &range,
510 &mut collected,
511 )?;
512 any_progress |= progress;
513 }
514 }
515
516 if let Some(cold) = &self.cold {
518 if !cursor.cold.exhausted {
519 let progress = Self::scan_tier_chunk_rev(
520 cold,
521 table,
522 &mut cursor.cold,
523 &start,
524 &end,
525 version,
526 &range,
527 &mut collected,
528 )?;
529 any_progress |= progress;
530 }
531 }
532
533 if !any_progress {
534 cursor.exhausted = true;
536 break;
537 }
538 }
539
540 let items: Vec<MultiVersionValues> = collected
542 .into_iter()
543 .rev()
544 .take(batch_size)
545 .filter_map(|(key_bytes, (v, value))| {
546 value.map(|val| MultiVersionValues {
547 key: EncodedKey(CowVec::new(key_bytes)),
548 values: EncodedValues(val),
549 version: v,
550 })
551 })
552 .collect();
553
554 let has_more = items.len() >= batch_size || !cursor.exhausted;
555
556 Ok(MultiVersionBatch {
557 items,
558 has_more,
559 })
560 }
561
562 fn scan_tier_chunk_rev<S: TierStorage>(
565 storage: &S,
566 table: EntryKind,
567 cursor: &mut RangeCursor,
568 start: &[u8],
569 end: &[u8],
570 version: CommitVersion,
571 range: &EncodedKeyRange,
572 collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
573 ) -> Result<bool> {
574 let batch = storage.range_rev_next(
575 table,
576 cursor,
577 Bound::Included(start),
578 Bound::Included(end),
579 version,
580 TIER_SCAN_CHUNK_SIZE,
581 )?;
582
583 if batch.entries.is_empty() {
584 return Ok(false);
585 }
586
587 for entry in batch.entries {
588 let original_key = entry.key.as_slice().to_vec();
590 let entry_version = entry.version;
591
592 let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
594 if !range.contains(&original_key_encoded) {
595 continue;
596 }
597
598 let should_update = match collected.get(&original_key) {
600 None => true,
601 Some((existing_version, _)) => entry_version > *existing_version,
602 };
603
604 if should_update {
605 collected.insert(original_key, (entry_version, entry.value));
606 }
607 }
608
609 Ok(true)
610 }
611}
612
613impl MultiVersionGetPrevious for StandardMultiStore {
614 fn get_previous_version(
615 &self,
616 key: &EncodedKey,
617 before_version: CommitVersion,
618 ) -> Result<Option<MultiVersionValues>> {
619 if before_version.0 == 0 {
620 return Ok(None);
621 }
622
623 let storage = self.hot.as_ref().expect("hot storage required for version lookups");
625
626 let table = classify_key(key);
627 let prev_version = CommitVersion(before_version.0 - 1);
628
629 match get_at_version(storage, table, key.as_ref(), prev_version) {
630 Ok(VersionedGetResult::Value {
631 value,
632 version,
633 }) => Ok(Some(MultiVersionValues {
634 key: key.clone(),
635 values: EncodedValues(CowVec::new(value.to_vec())),
636 version,
637 })),
638 Ok(VersionedGetResult::Tombstone) | Ok(VersionedGetResult::NotFound) => Ok(None),
639 Err(e) => Err(e),
640 }
641 }
642}
643
// `MultiVersionStore` needs no items of its own here. Its behaviour presumably comes from the
// `MultiVersion*` trait impls in this module (likely its supertraits) — confirm against the trait.
impl MultiVersionStore for StandardMultiStore {}
645
/// Ascending-key iterator over the live values of a key range at a fixed version.
///
/// Created by `StandardMultiStore::range`. It pulls pages of up to `batch_size` items from
/// `StandardMultiStore::range_next` as it is consumed.
pub struct MultiVersionRangeIter {
	/// Store the pages are read from (a clone of the store that created the iterator).
	store: StandardMultiStore,
	/// Resumable scan position across the storage tiers.
	cursor: MultiVersionRangeCursor,
	/// Key range being iterated.
	range: EncodedKeyRange,
	/// Snapshot version the values are read at.
	version: CommitVersion,
	/// Page size requested from the store on each fetch.
	batch_size: usize,
	/// Current page of results.
	current_batch: Vec<MultiVersionValues>,
	/// Index of the next item to yield from `current_batch`.
	current_index: usize,
}
656
657impl Iterator for MultiVersionRangeIter {
658 type Item = Result<MultiVersionValues>;
659
660 fn next(&mut self) -> Option<Self::Item> {
661 if self.current_index < self.current_batch.len() {
663 let item = self.current_batch[self.current_index].clone();
664 self.current_index += 1;
665 return Some(Ok(item));
666 }
667
668 if self.cursor.exhausted {
670 return None;
671 }
672
673 match self.store.range_next(&mut self.cursor, self.range.clone(), self.version, self.batch_size as u64)
675 {
676 Ok(batch) => {
677 if batch.items.is_empty() {
678 return None;
679 }
680 self.current_batch = batch.items;
681 self.current_index = 0;
682 self.next()
683 }
684 Err(e) => Some(Err(e)),
685 }
686 }
687}
688
/// Descending-key iterator over the live values of a key range at a fixed version.
///
/// Created by `StandardMultiStore::range_rev`. It pulls pages of up to `batch_size` items from
/// `StandardMultiStore::range_rev_next` as it is consumed.
pub struct MultiVersionRangeRevIter {
	/// Store the pages are read from (a clone of the store that created the iterator).
	store: StandardMultiStore,
	/// Resumable scan position across the storage tiers.
	cursor: MultiVersionRangeCursor,
	/// Key range being iterated.
	range: EncodedKeyRange,
	/// Snapshot version the values are read at.
	version: CommitVersion,
	/// Page size requested from the store on each fetch.
	batch_size: usize,
	/// Current page of results.
	current_batch: Vec<MultiVersionValues>,
	/// Index of the next item to yield from `current_batch`.
	current_index: usize,
}
699
700impl Iterator for MultiVersionRangeRevIter {
701 type Item = Result<MultiVersionValues>;
702
703 fn next(&mut self) -> Option<Self::Item> {
704 if self.current_index < self.current_batch.len() {
706 let item = self.current_batch[self.current_index].clone();
707 self.current_index += 1;
708 return Some(Ok(item));
709 }
710
711 if self.cursor.exhausted {
713 return None;
714 }
715
716 match self.store.range_rev_next(
718 &mut self.cursor,
719 self.range.clone(),
720 self.version,
721 self.batch_size as u64,
722 ) {
723 Ok(batch) => {
724 if batch.items.is_empty() {
725 return None;
726 }
727 self.current_batch = batch.items;
728 self.current_index = 0;
729 self.next()
730 }
731 Err(e) => Some(Err(e)),
732 }
733 }
734}
735
736fn classify_key_range(range: &EncodedKeyRange) -> EntryKind {
738 classify_range(range).unwrap_or(Multi)
739}
740
741fn make_range_bounds(range: &EncodedKeyRange) -> (Vec<u8>, Vec<u8>) {
744 let start = match &range.start {
745 Bound::Included(key) => key.as_ref().to_vec(),
746 Bound::Excluded(key) => key.as_ref().to_vec(),
747 Bound::Unbounded => vec![],
748 };
749
750 let end = match &range.end {
751 Bound::Included(key) => key.as_ref().to_vec(),
752 Bound::Excluded(key) => key.as_ref().to_vec(),
753 Bound::Unbounded => vec![0xFFu8; 256],
754 };
755
756 (start, end)
757}