1use std::{
5 collections::{BTreeMap, HashMap, HashSet},
6 ops::{Bound, RangeBounds},
7};
8
9use reifydb_core::{
10 actors::drop::{DropMessage, DropRequest},
11 common::CommitVersion,
12 delta::Delta,
13 encoded::{
14 key::{EncodedKey, EncodedKeyRange},
15 row::EncodedRow,
16 },
17 event::metric::{MultiCommittedEvent, MultiDelete, MultiWrite},
18 interface::store::{
19 EntryKind, MultiVersionBatch, MultiVersionCommit, MultiVersionContains, MultiVersionGet,
20 MultiVersionGetPrevious, MultiVersionRow, MultiVersionStore,
21 },
22};
23use reifydb_type::util::{cowvec::CowVec, hex};
24use tracing::{instrument, warn};
25
26use super::{
27 StandardMultiStore,
28 router::{classify_key, classify_range, is_single_version_semantics_key},
29 version::{VersionedGetResult, get_at_version},
30};
31use crate::{
32 Result,
33 tier::{RangeCursor, TierBatch, TierStorage},
34};
35
/// Chunk size handed as the final argument to every per-tier `range_next` /
/// `range_rev_next` call issued while scanning a key range.
// NOTE(review): presumably the maximum number of entries a tier returns per call —
// confirm against the `TierStorage` contract.
const TIER_SCAN_CHUNK_SIZE: usize = 4096;
39
40impl MultiVersionGet for StandardMultiStore {
41 #[instrument(name = "store::multi::get", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0))]
42 fn get(&self, key: &EncodedKey, version: CommitVersion) -> Result<Option<MultiVersionRow>> {
43 let table = classify_key(key);
44
45 if let Some(hot) = &self.hot {
47 match get_at_version(hot, table, key.as_ref(), version)? {
48 VersionedGetResult::Value {
49 value,
50 version: v,
51 } => {
52 return Ok(Some(MultiVersionRow {
53 key: key.clone(),
54 row: EncodedRow(value),
55 version: v,
56 }));
57 }
58 VersionedGetResult::Tombstone => return Ok(None),
59 VersionedGetResult::NotFound => {}
60 }
61 }
62
63 if let Some(warm) = &self.warm {
65 match get_at_version(warm, table, key.as_ref(), version)? {
66 VersionedGetResult::Value {
67 value,
68 version: v,
69 } => {
70 return Ok(Some(MultiVersionRow {
71 key: key.clone(),
72 row: EncodedRow(value),
73 version: v,
74 }));
75 }
76 VersionedGetResult::Tombstone => return Ok(None),
77 VersionedGetResult::NotFound => {}
78 }
79 }
80
81 if let Some(cold) = &self.cold {
83 match get_at_version(cold, table, key.as_ref(), version)? {
84 VersionedGetResult::Value {
85 value,
86 version: v,
87 } => {
88 return Ok(Some(MultiVersionRow {
89 key: key.clone(),
90 row: EncodedRow(value),
91 version: v,
92 }));
93 }
94 VersionedGetResult::Tombstone => return Ok(None),
95 VersionedGetResult::NotFound => {}
96 }
97 }
98
99 Ok(None)
100 }
101}
102
103impl MultiVersionContains for StandardMultiStore {
104 #[instrument(name = "store::multi::contains", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0), ret)]
105 fn contains(&self, key: &EncodedKey, version: CommitVersion) -> Result<bool> {
106 Ok(MultiVersionGet::get(self, key, version)?.is_some())
107 }
108}
109
110impl MultiVersionCommit for StandardMultiStore {
111 #[instrument(name = "store::multi::commit", level = "debug", skip(self, deltas), fields(delta_count = deltas.len(), version = version.0))]
112 fn commit(&self, deltas: CowVec<Delta>, version: CommitVersion) -> Result<()> {
113 let Some(storage) = &self.hot else {
115 return Ok(());
116 };
117
118 let classified = classify_deltas(&deltas);
119 let drop_batch = build_drop_batch(classified.explicit_drops, &classified.pending_set_keys, version);
120 self.dispatch_drops(drop_batch);
121
122 storage.set(version, classified.batches)?;
123 self.emit_commit_metrics(classified.writes, classified.deletes, version);
124 Ok(())
125 }
126}
127
/// Result of sorting a commit's deltas into the pieces `commit` needs
/// (produced by `classify_deltas`).
struct ClassifiedDeltas {
    /// Raw keys of `Delta::Set` entries whose key has single-version semantics.
    pending_set_keys: HashSet<CowVec<u8>>,
    /// One metrics record per `Delta::Set`.
    writes: Vec<MultiWrite>,
    /// One metrics record per `Delta::Unset`.
    deletes: Vec<MultiDelete>,
    /// Per-table writes: `Some(row)` for sets, `None` for unsets and removes.
    batches: TierBatch,
    /// Table and key of every `Delta::Drop`.
    explicit_drops: Vec<(EntryKind, EncodedKey)>,
}
139
140#[inline]
141fn classify_deltas(deltas: &CowVec<Delta>) -> ClassifiedDeltas {
142 let mut pending_set_keys: HashSet<CowVec<u8>> = HashSet::new();
143 let mut writes: Vec<MultiWrite> = Vec::new();
144 let mut deletes: Vec<MultiDelete> = Vec::new();
145 let mut batches: TierBatch = HashMap::new();
146 let mut explicit_drops: Vec<(EntryKind, EncodedKey)> = Vec::new();
147
148 for delta in deltas.iter() {
149 let key = delta.key();
150 let table = classify_key(key);
151 let is_single_version = is_single_version_semantics_key(key);
152
153 match delta {
154 Delta::Set {
155 key,
156 row,
157 } => {
158 if is_single_version {
159 pending_set_keys.insert(key.0.clone());
160 }
161 writes.push(MultiWrite {
162 key: key.clone(),
163 value_bytes: row.len() as u64,
164 });
165 batches.entry(table).or_default().push((key.0.clone(), Some(row.0.clone())));
166 }
167 Delta::Unset {
168 key,
169 row,
170 } => {
171 deletes.push(MultiDelete {
172 key: key.clone(),
173 value_bytes: row.len() as u64,
174 });
175 batches.entry(table).or_default().push((key.0.clone(), None));
176 }
177 Delta::Remove {
178 key,
179 } => {
180 batches.entry(table).or_default().push((key.0.clone(), None));
181 }
182 Delta::Drop {
183 key,
184 } => {
185 explicit_drops.push((table, key.clone()));
186 }
187 }
188 }
189
190 ClassifiedDeltas {
191 pending_set_keys,
192 writes,
193 deletes,
194 batches,
195 explicit_drops,
196 }
197}
198
199#[inline]
204fn build_drop_batch(
205 explicit_drops: Vec<(EntryKind, EncodedKey)>,
206 pending_set_keys: &HashSet<CowVec<u8>>,
207 version: CommitVersion,
208) -> Vec<DropRequest> {
209 let mut drop_batch = Vec::with_capacity(explicit_drops.len() + pending_set_keys.len());
210 for (table, key) in explicit_drops {
211 let pending_version = if pending_set_keys.contains(key.as_ref()) {
212 Some(version)
213 } else {
214 None
215 };
216 drop_batch.push(DropRequest {
217 table,
218 key: key.0.clone(),
219 commit_version: version,
220 pending_version,
221 });
222 }
223 for key in pending_set_keys.iter() {
224 let table = classify_key(&EncodedKey(key.clone()));
225 drop_batch.push(DropRequest {
226 table,
227 key: key.clone(),
228 commit_version: version,
229 pending_version: Some(version),
230 });
231 }
232 drop_batch
233}
234
235impl StandardMultiStore {
236 #[inline]
237 fn dispatch_drops(&self, drop_batch: Vec<DropRequest>) {
238 if !drop_batch.is_empty() && self.drop_actor.send_blocking(DropMessage::Batch(drop_batch)).is_err() {
239 warn!("Failed to send drop batch");
240 }
241 }
242
243 #[inline]
244 fn emit_commit_metrics(&self, writes: Vec<MultiWrite>, deletes: Vec<MultiDelete>, version: CommitVersion) {
245 if writes.is_empty() && deletes.is_empty() {
246 return;
247 }
248 self.event_bus.emit(MultiCommittedEvent::new(writes, deletes, vec![], version));
249 }
250}
251
/// Resumable position of a range scan across the hot, warm and cold tiers.
///
/// The same cursor is passed to successive `range_next` calls, which advance the
/// per-tier cursors and set `exhausted` when a scan round makes no progress in any tier.
#[derive(Debug, Clone, Default)]
pub struct MultiVersionRangeCursor {
    /// Scan position within the hot tier.
    pub hot: RangeCursor,
    /// Scan position within the warm tier.
    pub warm: RangeCursor,
    /// Scan position within the cold tier.
    pub cold: RangeCursor,
    /// `true` once a scan round made no progress in any tier; further calls return nothing.
    pub exhausted: bool,
}
267
268impl MultiVersionRangeCursor {
269 pub fn new() -> Self {
271 Self::default()
272 }
273
274 pub fn is_exhausted(&self) -> bool {
276 self.exhausted
277 }
278}
279
/// Parameters shared by every per-tier chunk scan of one `range_next` / `range_rev_next`
/// call.
struct TierScanQuery<'a> {
    /// Table the scanned range was classified into.
    table: EntryKind,
    /// Lower bound bytes, passed to the tier as an inclusive bound.
    start: &'a [u8],
    /// Upper bound bytes, passed to the tier as an inclusive bound.
    end: &'a [u8],
    /// Version the scan reads at.
    version: CommitVersion,
    /// Original range; entries returned by a tier are re-checked against it, which is
    /// what enforces exclusive bounds.
    range: &'a EncodedKeyRange,
}
288
289impl StandardMultiStore {
290 pub fn range_next(
295 &self,
296 cursor: &mut MultiVersionRangeCursor,
297 range: EncodedKeyRange,
298 version: CommitVersion,
299 batch_size: u64,
300 ) -> Result<MultiVersionBatch> {
301 if cursor.exhausted {
302 return Ok(MultiVersionBatch {
303 items: Vec::new(),
304 has_more: false,
305 });
306 }
307
308 let table = classify_key_range(&range);
309 let (start, end) = make_range_bounds(&range);
310 let batch_size = batch_size as usize;
311 let scan = TierScanQuery {
312 table,
313 start: &start,
314 end: &end,
315 version,
316 range: &range,
317 };
318
319 let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
321
322 while collected.len() < batch_size {
324 let mut any_progress = false;
325
326 if let Some(hot) = &self.hot
328 && !cursor.hot.exhausted
329 {
330 let progress = Self::scan_tier_chunk(hot, &mut cursor.hot, &scan, &mut collected)?;
331 any_progress |= progress;
332 }
333
334 if let Some(warm) = &self.warm
336 && !cursor.warm.exhausted
337 {
338 let progress = Self::scan_tier_chunk(warm, &mut cursor.warm, &scan, &mut collected)?;
339 any_progress |= progress;
340 }
341
342 if let Some(cold) = &self.cold
344 && !cursor.cold.exhausted
345 {
346 let progress = Self::scan_tier_chunk(cold, &mut cursor.cold, &scan, &mut collected)?;
347 any_progress |= progress;
348 }
349
350 if !any_progress {
351 cursor.exhausted = true;
353 break;
354 }
355 }
356
357 let items: Vec<MultiVersionRow> = collected
359 .into_iter()
360 .take(batch_size)
361 .filter_map(|(key_bytes, (v, value))| {
362 value.map(|val| MultiVersionRow {
363 key: EncodedKey(CowVec::new(key_bytes)),
364 row: EncodedRow(val),
365 version: v,
366 })
367 })
368 .collect();
369
370 let has_more = items.len() >= batch_size || !cursor.exhausted;
371
372 Ok(MultiVersionBatch {
373 items,
374 has_more,
375 })
376 }
377
378 fn scan_tier_chunk<S: TierStorage>(
381 storage: &S,
382 cursor: &mut RangeCursor,
383 scan: &TierScanQuery,
384 collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
385 ) -> Result<bool> {
386 let batch = storage.range_next(
387 scan.table,
388 cursor,
389 Bound::Included(scan.start),
390 Bound::Included(scan.end),
391 scan.version,
392 TIER_SCAN_CHUNK_SIZE,
393 )?;
394
395 if batch.entries.is_empty() {
396 return Ok(false);
397 }
398
399 for entry in batch.entries {
400 let original_key = entry.key.as_slice().to_vec();
402 let entry_version = entry.version;
403
404 let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
406 if !scan.range.contains(&original_key_encoded) {
407 continue;
408 }
409
410 let should_update = match collected.get(&original_key) {
412 None => true,
413 Some((existing_version, _)) => entry_version > *existing_version,
414 };
415
416 if should_update {
417 collected.insert(original_key, (entry_version, entry.value));
418 }
419 }
420
421 Ok(true)
422 }
423
424 pub fn range(
430 &self,
431 range: EncodedKeyRange,
432 version: CommitVersion,
433 batch_size: usize,
434 ) -> MultiVersionRangeIter {
435 MultiVersionRangeIter {
436 store: self.clone(),
437 cursor: MultiVersionRangeCursor::new(),
438 range,
439 version,
440 batch_size,
441 current_batch: Vec::new(),
442 current_index: 0,
443 }
444 }
445
446 pub fn range_rev(
452 &self,
453 range: EncodedKeyRange,
454 version: CommitVersion,
455 batch_size: usize,
456 ) -> MultiVersionRangeRevIter {
457 MultiVersionRangeRevIter {
458 store: self.clone(),
459 cursor: MultiVersionRangeCursor::new(),
460 range,
461 version,
462 batch_size,
463 current_batch: Vec::new(),
464 current_index: 0,
465 }
466 }
467
468 fn range_rev_next(
473 &self,
474 cursor: &mut MultiVersionRangeCursor,
475 range: EncodedKeyRange,
476 version: CommitVersion,
477 batch_size: u64,
478 ) -> Result<MultiVersionBatch> {
479 if cursor.exhausted {
480 return Ok(MultiVersionBatch {
481 items: Vec::new(),
482 has_more: false,
483 });
484 }
485
486 let table = classify_key_range(&range);
487 let (start, end) = make_range_bounds(&range);
488 let batch_size = batch_size as usize;
489 let scan = TierScanQuery {
490 table,
491 start: &start,
492 end: &end,
493 version,
494 range: &range,
495 };
496
497 let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
499
500 while collected.len() < batch_size {
502 let mut any_progress = false;
503
504 if let Some(hot) = &self.hot
506 && !cursor.hot.exhausted
507 {
508 let progress = Self::scan_tier_chunk_rev(hot, &mut cursor.hot, &scan, &mut collected)?;
509 any_progress |= progress;
510 }
511
512 if let Some(warm) = &self.warm
514 && !cursor.warm.exhausted
515 {
516 let progress =
517 Self::scan_tier_chunk_rev(warm, &mut cursor.warm, &scan, &mut collected)?;
518 any_progress |= progress;
519 }
520
521 if let Some(cold) = &self.cold
523 && !cursor.cold.exhausted
524 {
525 let progress =
526 Self::scan_tier_chunk_rev(cold, &mut cursor.cold, &scan, &mut collected)?;
527 any_progress |= progress;
528 }
529
530 if !any_progress {
531 cursor.exhausted = true;
533 break;
534 }
535 }
536
537 let items: Vec<MultiVersionRow> = collected
539 .into_iter()
540 .rev()
541 .take(batch_size)
542 .filter_map(|(key_bytes, (v, value))| {
543 value.map(|val| MultiVersionRow {
544 key: EncodedKey(CowVec::new(key_bytes)),
545 row: EncodedRow(val),
546 version: v,
547 })
548 })
549 .collect();
550
551 let has_more = items.len() >= batch_size || !cursor.exhausted;
552
553 Ok(MultiVersionBatch {
554 items,
555 has_more,
556 })
557 }
558
559 fn scan_tier_chunk_rev<S: TierStorage>(
562 storage: &S,
563 cursor: &mut RangeCursor,
564 scan: &TierScanQuery,
565 collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
566 ) -> Result<bool> {
567 let batch = storage.range_rev_next(
568 scan.table,
569 cursor,
570 Bound::Included(scan.start),
571 Bound::Included(scan.end),
572 scan.version,
573 TIER_SCAN_CHUNK_SIZE,
574 )?;
575
576 if batch.entries.is_empty() {
577 return Ok(false);
578 }
579
580 for entry in batch.entries {
581 let original_key = entry.key.as_slice().to_vec();
583 let entry_version = entry.version;
584
585 let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
587 if !scan.range.contains(&original_key_encoded) {
588 continue;
589 }
590
591 let should_update = match collected.get(&original_key) {
593 None => true,
594 Some((existing_version, _)) => entry_version > *existing_version,
595 };
596
597 if should_update {
598 collected.insert(original_key, (entry_version, entry.value));
599 }
600 }
601
602 Ok(true)
603 }
604}
605
606impl MultiVersionGetPrevious for StandardMultiStore {
607 fn get_previous_version(
608 &self,
609 key: &EncodedKey,
610 before_version: CommitVersion,
611 ) -> Result<Option<MultiVersionRow>> {
612 if before_version.0 == 0 {
613 return Ok(None);
614 }
615
616 let storage = self.hot.as_ref().expect("hot storage required for version lookups");
618
619 let table = classify_key(key);
620 let prev_version = CommitVersion(before_version.0 - 1);
621
622 match get_at_version(storage, table, key.as_ref(), prev_version) {
623 Ok(VersionedGetResult::Value {
624 value,
625 version,
626 }) => Ok(Some(MultiVersionRow {
627 key: key.clone(),
628 row: EncodedRow(CowVec::new(value.to_vec())),
629 version,
630 })),
631 Ok(VersionedGetResult::Tombstone) | Ok(VersionedGetResult::NotFound) => Ok(None),
632 Err(e) => Err(e),
633 }
634 }
635}
636
// Empty impl: no methods are defined here.
// NOTE(review): presumably `MultiVersionStore` is a marker over the get / contains /
// commit / get-previous traits implemented above — confirm against the trait definition.
impl MultiVersionStore for StandardMultiStore {}
638
/// Ascending iterator over a key range, created by `StandardMultiStore::range`.
///
/// Rows are fetched from the store in batches and handed out one at a time.
pub struct MultiVersionRangeIter {
    /// Store the batches are fetched from (a clone of the store that created the iterator).
    store: StandardMultiStore,
    /// Scan position carried across batch fetches.
    cursor: MultiVersionRangeCursor,
    /// Key range being iterated.
    range: EncodedKeyRange,
    /// Version the scan reads at.
    version: CommitVersion,
    /// Batch size passed to each `range_next` call.
    batch_size: usize,
    /// Rows of the most recently fetched batch.
    current_batch: Vec<MultiVersionRow>,
    /// Index into `current_batch` of the next row to yield.
    current_index: usize,
}
649
650impl Iterator for MultiVersionRangeIter {
651 type Item = Result<MultiVersionRow>;
652
653 fn next(&mut self) -> Option<Self::Item> {
654 if self.current_index < self.current_batch.len() {
656 let item = self.current_batch[self.current_index].clone();
657 self.current_index += 1;
658 return Some(Ok(item));
659 }
660
661 if self.cursor.exhausted {
663 return None;
664 }
665
666 match self.store.range_next(&mut self.cursor, self.range.clone(), self.version, self.batch_size as u64)
668 {
669 Ok(batch) => {
670 if batch.items.is_empty() {
671 return None;
672 }
673 self.current_batch = batch.items;
674 self.current_index = 0;
675 self.next()
676 }
677 Err(e) => Some(Err(e)),
678 }
679 }
680}
681
/// Descending iterator over a key range, created by `StandardMultiStore::range_rev`.
///
/// Rows are fetched from the store in batches and handed out one at a time.
pub struct MultiVersionRangeRevIter {
    /// Store the batches are fetched from (a clone of the store that created the iterator).
    store: StandardMultiStore,
    /// Scan position carried across batch fetches.
    cursor: MultiVersionRangeCursor,
    /// Key range being iterated.
    range: EncodedKeyRange,
    /// Version the scan reads at.
    version: CommitVersion,
    /// Batch size passed to each `range_rev_next` call.
    batch_size: usize,
    /// Rows of the most recently fetched batch.
    current_batch: Vec<MultiVersionRow>,
    /// Index into `current_batch` of the next row to yield.
    current_index: usize,
}
692
693impl Iterator for MultiVersionRangeRevIter {
694 type Item = Result<MultiVersionRow>;
695
696 fn next(&mut self) -> Option<Self::Item> {
697 if self.current_index < self.current_batch.len() {
699 let item = self.current_batch[self.current_index].clone();
700 self.current_index += 1;
701 return Some(Ok(item));
702 }
703
704 if self.cursor.exhausted {
706 return None;
707 }
708
709 match self.store.range_rev_next(
711 &mut self.cursor,
712 self.range.clone(),
713 self.version,
714 self.batch_size as u64,
715 ) {
716 Ok(batch) => {
717 if batch.items.is_empty() {
718 return None;
719 }
720 self.current_batch = batch.items;
721 self.current_index = 0;
722 self.next()
723 }
724 Err(e) => Some(Err(e)),
725 }
726 }
727}
728
729fn classify_key_range(range: &EncodedKeyRange) -> EntryKind {
731 classify_range(range).unwrap_or(EntryKind::Multi)
732}
733
734fn make_range_bounds(range: &EncodedKeyRange) -> (Vec<u8>, Vec<u8>) {
737 let start = match &range.start {
738 Bound::Included(key) => key.as_ref().to_vec(),
739 Bound::Excluded(key) => key.as_ref().to_vec(),
740 Bound::Unbounded => vec![],
741 };
742
743 let end = match &range.end {
744 Bound::Included(key) => key.as_ref().to_vec(),
745 Bound::Excluded(key) => key.as_ref().to_vec(),
746 Bound::Unbounded => vec![0xFFu8; 256],
747 };
748
749 (start, end)
750}