1use std::{
5 collections::{BTreeMap, HashMap, HashSet},
6 ops::{Bound, RangeBounds},
7};
8
9use reifydb_core::{
10 common::CommitVersion,
11 delta::Delta,
12 encoded::{
13 encoded::EncodedValues,
14 key::{EncodedKey, EncodedKeyRange},
15 },
16 event::metric::{StorageDelete, StorageStatsRecordedEvent, StorageWrite},
17 interface::store::{
18 MultiVersionBatch, MultiVersionCommit, MultiVersionContains, MultiVersionGet, MultiVersionGetPrevious,
19 MultiVersionStore, MultiVersionValues,
20 },
21};
22use reifydb_type::util::{cowvec::CowVec, hex};
23use tracing::instrument;
24
25use super::{
26 StandardMultiStore,
27 router::{classify_key, is_single_version_semantics_key},
28 version::{VersionedGetResult, get_at_version},
29 worker::{DropMessage, DropRequest},
30};
31use crate::tier::{EntryKind, EntryKind::Multi, RangeCursor, TierStorage};
32
/// Maximum number of entries requested from a single tier per chunk scan
/// (passed as the limit argument of the tier's `range_next` / `range_rev_next`).
const TIER_SCAN_CHUNK_SIZE: usize = 4096;
36
37impl MultiVersionGet for StandardMultiStore {
38 #[instrument(name = "store::multi::get", level = "trace", skip(self), fields(key_hex = %hex::encode(key.as_ref()), version = version.0))]
39 fn get(&self, key: &EncodedKey, version: CommitVersion) -> crate::Result<Option<MultiVersionValues>> {
40 let table = classify_key(key);
41
42 if let Some(hot) = &self.hot {
44 match get_at_version(hot, table, key.as_ref(), version)? {
45 VersionedGetResult::Value {
46 value,
47 version: v,
48 } => {
49 return Ok(Some(MultiVersionValues {
50 key: key.clone(),
51 values: EncodedValues(value),
52 version: v,
53 }));
54 }
55 VersionedGetResult::Tombstone => return Ok(None),
56 VersionedGetResult::NotFound => {}
57 }
58 }
59
60 if let Some(warm) = &self.warm {
62 match get_at_version(warm, table, key.as_ref(), version)? {
63 VersionedGetResult::Value {
64 value,
65 version: v,
66 } => {
67 return Ok(Some(MultiVersionValues {
68 key: key.clone(),
69 values: EncodedValues(value),
70 version: v,
71 }));
72 }
73 VersionedGetResult::Tombstone => return Ok(None),
74 VersionedGetResult::NotFound => {}
75 }
76 }
77
78 if let Some(cold) = &self.cold {
80 match get_at_version(cold, table, key.as_ref(), version)? {
81 VersionedGetResult::Value {
82 value,
83 version: v,
84 } => {
85 return Ok(Some(MultiVersionValues {
86 key: key.clone(),
87 values: EncodedValues(value),
88 version: v,
89 }));
90 }
91 VersionedGetResult::Tombstone => return Ok(None),
92 VersionedGetResult::NotFound => {}
93 }
94 }
95
96 Ok(None)
97 }
98}
99
100impl MultiVersionContains for StandardMultiStore {
101 #[instrument(name = "store::multi::contains", level = "trace", skip(self), fields(key_hex = %hex::encode(key.as_ref()), version = version.0), ret)]
102 fn contains(&self, key: &EncodedKey, version: CommitVersion) -> crate::Result<bool> {
103 Ok(MultiVersionGet::get(self, key, version)?.is_some())
104 }
105}
106
107impl MultiVersionCommit for StandardMultiStore {
108 #[instrument(name = "store::multi::commit", level = "debug", skip(self, deltas), fields(delta_count = deltas.len(), version = version.0))]
109 fn commit(&self, deltas: CowVec<Delta>, version: CommitVersion) -> crate::Result<()> {
110 let Some(storage) = &self.hot else {
112 return Ok(());
113 };
114
115 let mut pending_set_keys: HashSet<Vec<u8>> = HashSet::new();
116 let mut writes: Vec<StorageWrite> = Vec::new();
117 let mut deletes: Vec<StorageDelete> = Vec::new();
118 let mut batches: HashMap<EntryKind, Vec<(CowVec<u8>, Option<CowVec<u8>>)>> = HashMap::new();
119 let mut explicit_drops: Vec<(EntryKind, EncodedKey, Option<CommitVersion>, Option<usize>)> = Vec::new();
120
121 for delta in deltas.iter() {
122 let key = delta.key();
123
124 let table = classify_key(key);
125 let is_single_version = is_single_version_semantics_key(key);
126
127 match delta {
128 Delta::Set {
129 key,
130 values,
131 } => {
132 if is_single_version {
133 pending_set_keys.insert(key.as_ref().to_vec());
134 }
135
136 writes.push(StorageWrite {
137 key: key.clone(),
138 value_bytes: values.len() as u64,
139 });
140
141 let logical_key = CowVec::new(key.as_ref().to_vec());
143 batches.entry(table)
144 .or_default()
145 .push((logical_key, Some(CowVec::new(values.as_ref().to_vec()))));
146 }
147 Delta::Unset {
148 key,
149 values,
150 } => {
151 deletes.push(StorageDelete {
152 key: key.clone(),
153 value_bytes: values.len() as u64,
154 });
155
156 let logical_key = CowVec::new(key.as_ref().to_vec());
158 batches.entry(table).or_default().push((logical_key, None));
159 }
160 Delta::Remove {
161 key,
162 } => {
163 let logical_key = CowVec::new(key.as_ref().to_vec());
165 batches.entry(table).or_default().push((logical_key, None));
166 }
167 Delta::Drop {
168 key,
169 up_to_version,
170 keep_last_versions,
171 } => {
172 explicit_drops.push((table, key.clone(), *up_to_version, *keep_last_versions));
173 }
174 }
175 }
176
177 let mut drop_batch = Vec::with_capacity(explicit_drops.len() + pending_set_keys.len());
179 for (table, key, up_to_version, keep_last_versions) in explicit_drops {
180 let pending_version = if pending_set_keys.contains(key.as_ref()) {
181 Some(version)
182 } else {
183 None
184 };
185
186 drop_batch.push(DropRequest {
187 table,
188 key: key.0.clone(),
189 up_to_version,
190 keep_last_versions,
191 commit_version: version,
192 pending_version,
193 });
194 }
195
196 for key_bytes in pending_set_keys.iter() {
198 let key = CowVec::new(key_bytes.clone());
199 let table = classify_key(&EncodedKey(key.clone()));
200 drop_batch.push(DropRequest {
201 table,
202 key,
203 up_to_version: None,
204 keep_last_versions: Some(1),
205 commit_version: version,
206 pending_version: Some(version),
207 });
208 }
209
210 if !drop_batch.is_empty() {
211 let _ = self.drop_actor.send(DropMessage::Batch(drop_batch));
212 }
213
214 storage.set(version, batches)?;
216
217 if !writes.is_empty() || !deletes.is_empty() {
219 self.event_bus.emit(StorageStatsRecordedEvent::new(writes, deletes, vec![], version));
220 }
221
222 Ok(())
223 }
224}
225
226#[derive(Debug, Clone, Default)]
231pub struct MultiVersionRangeCursor {
232 pub hot: RangeCursor,
234 pub warm: RangeCursor,
236 pub cold: RangeCursor,
238 pub exhausted: bool,
240}
241
242impl MultiVersionRangeCursor {
243 pub fn new() -> Self {
245 Self::default()
246 }
247
248 pub fn is_exhausted(&self) -> bool {
250 self.exhausted
251 }
252}
253
254impl StandardMultiStore {
255 pub fn range_next(
260 &self,
261 cursor: &mut MultiVersionRangeCursor,
262 range: EncodedKeyRange,
263 version: CommitVersion,
264 batch_size: u64,
265 ) -> crate::Result<MultiVersionBatch> {
266 if cursor.exhausted {
267 return Ok(MultiVersionBatch {
268 items: Vec::new(),
269 has_more: false,
270 });
271 }
272
273 let table = classify_key_range(&range);
274 let (start, end) = make_range_bounds(&range);
275 let batch_size = batch_size as usize;
276
277 let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
279
280 while collected.len() < batch_size {
282 let mut any_progress = false;
283
284 if let Some(hot) = &self.hot {
286 if !cursor.hot.exhausted {
287 let progress = Self::scan_tier_chunk(
288 hot,
289 table,
290 &mut cursor.hot,
291 &start,
292 &end,
293 version,
294 &range,
295 &mut collected,
296 )?;
297 any_progress |= progress;
298 }
299 }
300
301 if let Some(warm) = &self.warm {
303 if !cursor.warm.exhausted {
304 let progress = Self::scan_tier_chunk(
305 warm,
306 table,
307 &mut cursor.warm,
308 &start,
309 &end,
310 version,
311 &range,
312 &mut collected,
313 )?;
314 any_progress |= progress;
315 }
316 }
317
318 if let Some(cold) = &self.cold {
320 if !cursor.cold.exhausted {
321 let progress = Self::scan_tier_chunk(
322 cold,
323 table,
324 &mut cursor.cold,
325 &start,
326 &end,
327 version,
328 &range,
329 &mut collected,
330 )?;
331 any_progress |= progress;
332 }
333 }
334
335 if !any_progress {
336 cursor.exhausted = true;
338 break;
339 }
340 }
341
342 let items: Vec<MultiVersionValues> = collected
344 .into_iter()
345 .take(batch_size)
346 .filter_map(|(key_bytes, (v, value))| {
347 value.map(|val| MultiVersionValues {
348 key: EncodedKey(CowVec::new(key_bytes)),
349 values: EncodedValues(val),
350 version: v,
351 })
352 })
353 .collect();
354
355 let has_more = items.len() >= batch_size || !cursor.exhausted;
356
357 Ok(MultiVersionBatch {
358 items,
359 has_more,
360 })
361 }
362
363 fn scan_tier_chunk<S: TierStorage>(
366 storage: &S,
367 table: EntryKind,
368 cursor: &mut RangeCursor,
369 start: &[u8],
370 end: &[u8],
371 version: CommitVersion,
372 range: &EncodedKeyRange,
373 collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
374 ) -> crate::Result<bool> {
375 let batch = storage.range_next(
376 table,
377 cursor,
378 Bound::Included(start),
379 Bound::Included(end),
380 version,
381 TIER_SCAN_CHUNK_SIZE,
382 )?;
383
384 if batch.entries.is_empty() {
385 return Ok(false);
386 }
387
388 for entry in batch.entries {
389 let original_key = entry.key.as_slice().to_vec();
391 let entry_version = entry.version;
392
393 let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
395 if !range.contains(&original_key_encoded) {
396 continue;
397 }
398
399 let should_update = match collected.get(&original_key) {
401 None => true,
402 Some((existing_version, _)) => entry_version > *existing_version,
403 };
404
405 if should_update {
406 collected.insert(original_key, (entry_version, entry.value));
407 }
408 }
409
410 Ok(true)
411 }
412
413 pub fn range(
419 &self,
420 range: EncodedKeyRange,
421 version: CommitVersion,
422 batch_size: usize,
423 ) -> MultiVersionRangeIter {
424 MultiVersionRangeIter {
425 store: self.clone(),
426 cursor: MultiVersionRangeCursor::new(),
427 range,
428 version,
429 batch_size,
430 current_batch: Vec::new(),
431 current_index: 0,
432 }
433 }
434
435 pub fn range_rev(
441 &self,
442 range: EncodedKeyRange,
443 version: CommitVersion,
444 batch_size: usize,
445 ) -> MultiVersionRangeRevIter {
446 MultiVersionRangeRevIter {
447 store: self.clone(),
448 cursor: MultiVersionRangeCursor::new(),
449 range,
450 version,
451 batch_size,
452 current_batch: Vec::new(),
453 current_index: 0,
454 }
455 }
456
457 fn range_rev_next(
462 &self,
463 cursor: &mut MultiVersionRangeCursor,
464 range: EncodedKeyRange,
465 version: CommitVersion,
466 batch_size: u64,
467 ) -> crate::Result<MultiVersionBatch> {
468 if cursor.exhausted {
469 return Ok(MultiVersionBatch {
470 items: Vec::new(),
471 has_more: false,
472 });
473 }
474
475 let table = classify_key_range(&range);
476 let (start, end) = make_range_bounds(&range);
477 let batch_size = batch_size as usize;
478
479 let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
481
482 while collected.len() < batch_size {
484 let mut any_progress = false;
485
486 if let Some(hot) = &self.hot {
488 if !cursor.hot.exhausted {
489 let progress = Self::scan_tier_chunk_rev(
490 hot,
491 table,
492 &mut cursor.hot,
493 &start,
494 &end,
495 version,
496 &range,
497 &mut collected,
498 )?;
499 any_progress |= progress;
500 }
501 }
502
503 if let Some(warm) = &self.warm {
505 if !cursor.warm.exhausted {
506 let progress = Self::scan_tier_chunk_rev(
507 warm,
508 table,
509 &mut cursor.warm,
510 &start,
511 &end,
512 version,
513 &range,
514 &mut collected,
515 )?;
516 any_progress |= progress;
517 }
518 }
519
520 if let Some(cold) = &self.cold {
522 if !cursor.cold.exhausted {
523 let progress = Self::scan_tier_chunk_rev(
524 cold,
525 table,
526 &mut cursor.cold,
527 &start,
528 &end,
529 version,
530 &range,
531 &mut collected,
532 )?;
533 any_progress |= progress;
534 }
535 }
536
537 if !any_progress {
538 cursor.exhausted = true;
540 break;
541 }
542 }
543
544 let items: Vec<MultiVersionValues> = collected
546 .into_iter()
547 .rev()
548 .take(batch_size)
549 .filter_map(|(key_bytes, (v, value))| {
550 value.map(|val| MultiVersionValues {
551 key: EncodedKey(CowVec::new(key_bytes)),
552 values: EncodedValues(val),
553 version: v,
554 })
555 })
556 .collect();
557
558 let has_more = items.len() >= batch_size || !cursor.exhausted;
559
560 Ok(MultiVersionBatch {
561 items,
562 has_more,
563 })
564 }
565
566 fn scan_tier_chunk_rev<S: TierStorage>(
569 storage: &S,
570 table: EntryKind,
571 cursor: &mut RangeCursor,
572 start: &[u8],
573 end: &[u8],
574 version: CommitVersion,
575 range: &EncodedKeyRange,
576 collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
577 ) -> crate::Result<bool> {
578 let batch = storage.range_rev_next(
579 table,
580 cursor,
581 Bound::Included(start),
582 Bound::Included(end),
583 version,
584 TIER_SCAN_CHUNK_SIZE,
585 )?;
586
587 if batch.entries.is_empty() {
588 return Ok(false);
589 }
590
591 for entry in batch.entries {
592 let original_key = entry.key.as_slice().to_vec();
594 let entry_version = entry.version;
595
596 let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
598 if !range.contains(&original_key_encoded) {
599 continue;
600 }
601
602 let should_update = match collected.get(&original_key) {
604 None => true,
605 Some((existing_version, _)) => entry_version > *existing_version,
606 };
607
608 if should_update {
609 collected.insert(original_key, (entry_version, entry.value));
610 }
611 }
612
613 Ok(true)
614 }
615}
616
617impl MultiVersionGetPrevious for StandardMultiStore {
618 fn get_previous_version(
619 &self,
620 key: &EncodedKey,
621 before_version: CommitVersion,
622 ) -> crate::Result<Option<MultiVersionValues>> {
623 if before_version.0 == 0 {
624 return Ok(None);
625 }
626
627 let storage = self.hot.as_ref().expect("hot storage required for version lookups");
629
630 let table = classify_key(key);
631 let prev_version = CommitVersion(before_version.0 - 1);
632
633 match get_at_version(storage, table, key.as_ref(), prev_version) {
634 Ok(VersionedGetResult::Value {
635 value,
636 version,
637 }) => Ok(Some(MultiVersionValues {
638 key: key.clone(),
639 values: EncodedValues(CowVec::new(value.to_vec())),
640 version,
641 })),
642 Ok(VersionedGetResult::Tombstone) | Ok(VersionedGetResult::NotFound) => Ok(None),
643 Err(e) => Err(e),
644 }
645 }
646}
647
// No items are provided here; the trait's requirements are presumably met by
// defaults and/or the other `MultiVersion*` impls in this file — confirm
// against the trait definition.
impl MultiVersionStore for StandardMultiStore {}
649
650pub struct MultiVersionRangeIter {
652 store: StandardMultiStore,
653 cursor: MultiVersionRangeCursor,
654 range: EncodedKeyRange,
655 version: CommitVersion,
656 batch_size: usize,
657 current_batch: Vec<MultiVersionValues>,
658 current_index: usize,
659}
660
661impl Iterator for MultiVersionRangeIter {
662 type Item = crate::Result<MultiVersionValues>;
663
664 fn next(&mut self) -> Option<Self::Item> {
665 if self.current_index < self.current_batch.len() {
667 let item = self.current_batch[self.current_index].clone();
668 self.current_index += 1;
669 return Some(Ok(item));
670 }
671
672 if self.cursor.exhausted {
674 return None;
675 }
676
677 match self.store.range_next(&mut self.cursor, self.range.clone(), self.version, self.batch_size as u64)
679 {
680 Ok(batch) => {
681 if batch.items.is_empty() {
682 return None;
683 }
684 self.current_batch = batch.items;
685 self.current_index = 0;
686 self.next()
687 }
688 Err(e) => Some(Err(e)),
689 }
690 }
691}
692
693pub struct MultiVersionRangeRevIter {
695 store: StandardMultiStore,
696 cursor: MultiVersionRangeCursor,
697 range: EncodedKeyRange,
698 version: CommitVersion,
699 batch_size: usize,
700 current_batch: Vec<MultiVersionValues>,
701 current_index: usize,
702}
703
704impl Iterator for MultiVersionRangeRevIter {
705 type Item = crate::Result<MultiVersionValues>;
706
707 fn next(&mut self) -> Option<Self::Item> {
708 if self.current_index < self.current_batch.len() {
710 let item = self.current_batch[self.current_index].clone();
711 self.current_index += 1;
712 return Some(Ok(item));
713 }
714
715 if self.cursor.exhausted {
717 return None;
718 }
719
720 match self.store.range_rev_next(
722 &mut self.cursor,
723 self.range.clone(),
724 self.version,
725 self.batch_size as u64,
726 ) {
727 Ok(batch) => {
728 if batch.items.is_empty() {
729 return None;
730 }
731 self.current_batch = batch.items;
732 self.current_index = 0;
733 self.next()
734 }
735 Err(e) => Some(Err(e)),
736 }
737 }
738}
739
740fn classify_key_range(range: &EncodedKeyRange) -> EntryKind {
742 use super::router::classify_range;
743
744 classify_range(range).unwrap_or(Multi)
745}
746
747fn make_range_bounds(range: &EncodedKeyRange) -> (Vec<u8>, Vec<u8>) {
750 let start = match &range.start {
751 Bound::Included(key) => key.as_ref().to_vec(),
752 Bound::Excluded(key) => key.as_ref().to_vec(),
753 Bound::Unbounded => vec![],
754 };
755
756 let end = match &range.end {
757 Bound::Included(key) => key.as_ref().to_vec(),
758 Bound::Excluded(key) => key.as_ref().to_vec(),
759 Bound::Unbounded => vec![0xFFu8; 256],
760 };
761
762 (start, end)
763}