1use crate::backend::Backend;
2use crate::error::{
3 AbortError, BatchAbortError, BatchCommitError, BatchError, BatchPrewriteError, CommitError,
4 GcError, PrewriteError, ReadError,
5};
6use crate::types::{
7 CommittedVersion, Intent, Mutation, PhysicalWrite, ReadGuard, Timestamp, TxnId,
8};
9
/// Totals reported by a complete [`MvccEngine::gc`] run.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GcStats {
    /// Sum of `versions_removed` over every incremental pass of the run.
    pub versions_removed: usize,
    /// Number of keys that still carry a pending intent, counted in one pass over all
    /// backend keys after collection finishes.
    pub intents_preserved: usize,
}
18
/// Per-call work limits for [`MvccEngine::gc_incremental`].
///
/// Both limits must be non-zero; a zero limit is rejected with `GcError::InvalidGcBudget`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct GcBudget {
    /// Maximum number of keys examined in one call.
    pub max_keys: usize,
    /// Maximum number of committed versions removed in one call (a collapsed tombstone
    /// counts itself as one removed version).
    pub max_versions: usize,
}
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq)]
30pub struct GcOptions {
31 pub budget: GcBudget,
33 pub collapse_final_tombstones: bool,
38}
39
/// Resume position handed between successive [`MvccEngine::gc_incremental`] calls.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IncrementalGcCursor {
    /// Key passed as the start of the next call's key scan. `None` passes no start key;
    /// a finished scan reports `None` here.
    pub next_key: Option<Vec<u8>>,
}
46
47#[derive(Debug, Clone, PartialEq, Eq)]
49pub struct IncrementalGcResult {
50 pub cursor: IncrementalGcCursor,
52 pub done: bool,
54 pub keys_scanned: usize,
56 pub versions_scanned: usize,
58 pub versions_removed: usize,
60 pub intents_preserved: usize,
62}
63
64pub struct MvccEngine<B: Backend> {
69 backend: B,
70}
71
72impl<B: Backend> MvccEngine<B> {
73 pub fn new(backend: B) -> Self {
75 Self { backend }
76 }
77
78 pub fn backend(&self) -> &B {
80 &self.backend
81 }
82
83 pub fn backend_mut(&mut self) -> &mut B {
85 &mut self.backend
86 }
87
88 pub fn read(&self, key: &[u8], read_ts: Timestamp) -> Result<Option<Vec<u8>>, ReadError> {
93 let version = self
94 .backend
95 .get_visible_committed(key, read_ts)
96 .map_err(ReadError::Backend)?;
97 Ok(version.and_then(|v| v.value))
98 }
99
100 #[allow(clippy::type_complexity)]
106 pub fn read_with_version(
107 &self,
108 key: &[u8],
109 read_ts: Timestamp,
110 ) -> Result<Option<(Timestamp, Option<Vec<u8>>)>, ReadError> {
111 let version = self
112 .backend
113 .get_visible_committed(key, read_ts)
114 .map_err(ReadError::Backend)?;
115 Ok(version.map(|v| (v.commit_ts, v.value)))
116 }
117
118 pub fn read_own_write(
123 &self,
124 key: &[u8],
125 txn_id: TxnId,
126 start_ts: Timestamp,
127 read_ts: Timestamp,
128 ) -> Result<Option<Vec<u8>>, ReadError> {
129 if let Some(intent) = self.backend.get_intent(key).map_err(ReadError::Backend)? {
131 if intent.txn_id == txn_id && intent.start_ts == start_ts {
132 return Ok(intent.mutation.value());
133 }
134 }
135 self.read(key, read_ts)
137 }
138
139 pub fn prewrite(
145 &mut self,
146 txn_id: TxnId,
147 start_ts: Timestamp,
148 key: Vec<u8>,
149 mutation: Mutation,
150 ) -> Result<(), PrewriteError> {
151 if let Some(intent) = self
153 .backend
154 .get_intent(&key)
155 .map_err(PrewriteError::Backend)?
156 {
157 if intent.txn_id != txn_id {
158 return Err(PrewriteError::KeyLocked {
159 txn_id: intent.txn_id,
160 });
161 }
162 if intent.start_ts != start_ts {
166 return Err(PrewriteError::IntentAlreadyExists);
167 }
168 if intent.mutation != mutation {
170 return Err(PrewriteError::IntentAlreadyExists);
171 }
172 return Ok(());
173 }
174
175 if let Some(latest_ts) = self
177 .backend
178 .get_latest_commit_ts(&key)
179 .map_err(PrewriteError::Backend)?
180 {
181 if latest_ts > start_ts {
182 return Err(PrewriteError::WriteConflict);
183 }
184 }
185
186 let intent = Intent {
187 key: key.clone(),
188 txn_id,
189 start_ts,
190 mutation,
191 min_commit_ts: None,
192 };
193 self.backend
194 .put_intent(intent)
195 .map_err(PrewriteError::Backend)?;
196 Ok(())
197 }
198
199 pub fn commit(
204 &mut self,
205 txn_id: TxnId,
206 key: &[u8],
207 start_ts: Timestamp,
208 commit_ts: Timestamp,
209 ) -> Result<(), CommitError> {
210 let intent = self
211 .backend
212 .get_intent(key)
213 .map_err(CommitError::Backend)?
214 .ok_or(CommitError::IntentNotFound)?;
215
216 if intent.txn_id != txn_id {
217 return Err(CommitError::TxnIdMismatch);
218 }
219 if intent.start_ts != start_ts {
220 return Err(CommitError::StartTsMismatch);
221 }
222
223 if commit_ts <= start_ts {
224 return Err(CommitError::InvalidCommitTimestamp {
225 start_ts,
226 commit_ts,
227 });
228 }
229
230 if let Some(min_ts) = intent.min_commit_ts {
231 if commit_ts < min_ts {
232 return Err(CommitError::CommitTsTooEarly {
233 commit_ts,
234 min_commit_ts: min_ts,
235 });
236 }
237 }
238
239 if let Some(latest_ts) = self
241 .backend
242 .get_latest_commit_ts(key)
243 .map_err(CommitError::Backend)?
244 {
245 if commit_ts <= latest_ts {
246 if commit_ts == latest_ts {
247 return Err(CommitError::DuplicateCommitTimestamp { commit_ts });
248 } else {
249 return Err(CommitError::CommitTsTooOld {
250 commit_ts,
251 latest_commit_ts: latest_ts,
252 });
253 }
254 }
255 }
256
257 let version = CommittedVersion {
259 key: key.to_vec(),
260 commit_ts,
261 value: intent.mutation.value(),
262 };
263 self.backend
264 .commit_intents_batch(vec![version], vec![(key.to_vec(), txn_id, start_ts)])
265 .map_err(CommitError::Backend)?;
266 Ok(())
267 }
268
269 pub fn abort(
273 &mut self,
274 txn_id: TxnId,
275 key: &[u8],
276 start_ts: Timestamp,
277 ) -> Result<(), AbortError> {
278 let removed = self
279 .backend
280 .remove_intent(key, txn_id, start_ts)
281 .map_err(AbortError::Backend)?;
282 let _ = removed;
285 Ok(())
286 }
287
288 pub fn prewrite_batch(
293 &mut self,
294 txn_id: TxnId,
295 start_ts: Timestamp,
296 writes: Vec<PhysicalWrite>,
297 ) -> Result<(), BatchPrewriteError> {
298 if writes.is_empty() {
299 return Err(BatchPrewriteError::EmptyBatch);
300 }
301
302 let mut key_set = std::collections::HashSet::new();
303 for w in &writes {
304 if !key_set.insert(w.key.clone()) {
305 return Err(BatchPrewriteError::DuplicateKeyInBatch { key: w.key.clone() });
306 }
307 }
308
309 let mut existing_count = 0;
310 for w in &writes {
311 if let Some(intent) = self
312 .backend
313 .get_intent(&w.key)
314 .map_err(BatchPrewriteError::Backend)?
315 {
316 if intent.txn_id != txn_id {
317 return Err(BatchPrewriteError::KeyLocked {
318 key: w.key.clone(),
319 txn_id: intent.txn_id,
320 });
321 }
322 let expected_mutation = if let Some(v) = &w.value {
323 Mutation::Put(v.clone())
324 } else {
325 Mutation::Delete
326 };
327 if intent.start_ts != start_ts || intent.mutation != expected_mutation {
328 return Err(BatchPrewriteError::IntentAlreadyExists { key: w.key.clone() });
329 }
330 existing_count += 1;
331 } else if let Some(latest_ts) = self
332 .backend
333 .get_latest_commit_ts(&w.key)
334 .map_err(BatchPrewriteError::Backend)?
335 {
336 if latest_ts > start_ts {
337 return Err(BatchPrewriteError::WriteConflict { key: w.key.clone() });
338 }
339 }
340 }
341
342 if existing_count == writes.len() {
343 return Ok(());
344 } else if existing_count > 0 {
345 return Err(BatchPrewriteError::PartialBatchReplay);
346 }
347
348 let mut intents = Vec::with_capacity(writes.len());
349 for w in writes {
350 intents.push(Intent {
351 key: w.key,
352 txn_id,
353 start_ts,
354 mutation: if let Some(v) = w.value {
355 Mutation::Put(v)
356 } else {
357 Mutation::Delete
358 },
359 min_commit_ts: None,
360 });
361 }
362 self.backend
363 .put_intents_batch(intents)
364 .map_err(BatchPrewriteError::Backend)?;
365 Ok(())
366 }
367
368 pub fn commit_batch(
372 &mut self,
373 txn_id: TxnId,
374 start_ts: Timestamp,
375 commit_ts: Timestamp,
376 keys: Vec<Vec<u8>>,
377 ) -> Result<(), BatchCommitError> {
378 if keys.is_empty() {
379 return Err(BatchCommitError::EmptyBatch);
380 }
381
382 let mut key_set = std::collections::HashSet::new();
383 for key in &keys {
384 if !key_set.insert(key.clone()) {
385 return Err(BatchCommitError::DuplicateKeyInBatch { key: key.clone() });
386 }
387 }
388
389 if commit_ts <= start_ts {
390 return Err(BatchCommitError::InvalidCommitTimestamp {
391 start_ts,
392 commit_ts,
393 });
394 }
395
396 let mut commits = Vec::with_capacity(keys.len());
397 let mut removed_intents = Vec::with_capacity(keys.len());
398
399 for key in &keys {
400 let intent = self
401 .backend
402 .get_intent(key)
403 .map_err(BatchCommitError::Backend)?
404 .ok_or_else(|| BatchCommitError::IntentNotFound { key: key.clone() })?;
405
406 if intent.txn_id != txn_id {
407 return Err(BatchCommitError::TxnIdMismatch { key: key.clone() });
408 }
409 if intent.start_ts != start_ts {
410 return Err(BatchCommitError::StartTsMismatch { key: key.clone() });
411 }
412 if let Some(min_ts) = intent.min_commit_ts {
413 if commit_ts < min_ts {
414 return Err(BatchCommitError::CommitTsTooEarly {
415 key: key.clone(),
416 commit_ts,
417 min_commit_ts: min_ts,
418 });
419 }
420 }
421
422 if let Some(latest_ts) = self
423 .backend
424 .get_latest_commit_ts(key)
425 .map_err(BatchCommitError::Backend)?
426 {
427 if commit_ts <= latest_ts {
428 return Err(BatchCommitError::CommitTsTooOld {
429 key: key.clone(),
430 commit_ts,
431 latest_commit_ts: latest_ts,
432 });
433 }
434 }
435
436 commits.push(CommittedVersion {
437 key: key.clone(),
438 commit_ts,
439 value: intent.mutation.value(),
440 });
441 removed_intents.push((key.clone(), txn_id, start_ts));
442 }
443
444 self.backend
445 .commit_intents_batch(commits, removed_intents)
446 .map_err(BatchCommitError::Backend)?;
447 Ok(())
448 }
449
450 pub fn abort_batch(
454 &mut self,
455 txn_id: TxnId,
456 start_ts: Timestamp,
457 keys: Vec<Vec<u8>>,
458 ) -> Result<(), BatchAbortError> {
459 if keys.is_empty() {
460 return Ok(());
461 }
462
463 let mut key_set = std::collections::HashSet::new();
464 for key in &keys {
465 if !key_set.insert(key.clone()) {
466 return Err(BatchAbortError::DuplicateKeyInBatch { key: key.clone() });
467 }
468 }
469
470 let mut removed_intents = Vec::with_capacity(keys.len());
471 for key in &keys {
472 if let Some(intent) = self
473 .backend
474 .get_intent(key)
475 .map_err(BatchAbortError::Backend)?
476 {
477 if intent.txn_id == txn_id && intent.start_ts == start_ts {
478 removed_intents.push((key.clone(), txn_id, start_ts));
479 }
480 }
481 }
482
483 self.backend
484 .remove_intents_batch(removed_intents)
485 .map_err(BatchAbortError::Backend)?;
486 Ok(())
487 }
488
489 pub fn apply_direct_batch(
494 &mut self,
495 commit_ts: Timestamp,
496 writes: Vec<PhysicalWrite>,
497 ) -> Result<(), BatchError> {
498 if writes.is_empty() {
499 return Err(BatchError::EmptyBatch);
500 }
501
502 let mut key_set = std::collections::HashSet::new();
503 for w in &writes {
504 if !key_set.insert(w.key.clone()) {
505 return Err(BatchError::DuplicateKeyInBatch { key: w.key.clone() });
506 }
507 }
508
509 for w in &writes {
511 if let Some(intent) = self
513 .backend
514 .get_intent(&w.key)
515 .map_err(BatchError::Backend)?
516 {
517 return Err(BatchError::KeyLocked {
518 key: w.key.clone(),
519 txn_id: intent.txn_id,
520 });
521 }
522
523 if let Some(latest_ts) = self
525 .backend
526 .get_latest_commit_ts(&w.key)
527 .map_err(BatchError::Backend)?
528 {
529 if commit_ts <= latest_ts {
530 return Err(BatchError::CommitTsTooOld {
531 key: w.key.clone(),
532 commit_ts,
533 latest_commit_ts: latest_ts,
534 });
535 }
536 }
537 }
538
539 let mut commits = Vec::with_capacity(writes.len());
541 for w in writes {
542 commits.push(CommittedVersion {
543 key: w.key,
544 commit_ts,
545 value: w.value,
546 });
547 }
548
549 self.backend
550 .put_committed_batch(commits)
551 .map_err(BatchError::Backend)?;
552 Ok(())
553 }
554
555 pub fn apply_guarded_batch(
560 &mut self,
561 commit_ts: Timestamp,
562 guards: Vec<ReadGuard>,
563 writes: Vec<PhysicalWrite>,
564 ) -> Result<(), BatchError> {
565 if writes.is_empty() {
566 return Err(BatchError::EmptyBatch);
567 }
568 if guards.is_empty() {
569 return Err(BatchError::NoReadGuards);
570 }
571
572 let mut write_keys = std::collections::HashSet::new();
573 for w in &writes {
574 if !write_keys.insert(w.key.clone()) {
575 return Err(BatchError::DuplicateKeyInBatch { key: w.key.clone() });
576 }
577 }
578
579 for guard in &guards {
581 let (guard_key, guard_read_ts) = match guard {
582 ReadGuard::ExpectedVersion { key, read_ts, .. } => (key, read_ts),
583 ReadGuard::ExpectedValue { key, read_ts, .. } => (key, read_ts),
584 };
585
586 if commit_ts <= *guard_read_ts {
587 return Err(BatchError::InvalidCommitTimestamp {
588 read_ts: *guard_read_ts,
589 commit_ts,
590 });
591 }
592
593 if let Some(intent) = self
595 .backend
596 .get_intent(guard_key)
597 .map_err(BatchError::Backend)?
598 {
599 return Err(BatchError::KeyLocked {
600 key: guard_key.clone(),
601 txn_id: intent.txn_id,
602 });
603 }
604
605 if let Some(latest_ts) = self
606 .backend
607 .get_latest_commit_ts(guard_key)
608 .map_err(BatchError::Backend)?
609 {
610 if latest_ts > *guard_read_ts {
611 return Err(BatchError::GuardFailedNewerVersion {
612 key: guard_key.clone(),
613 read_ts: *guard_read_ts,
614 actual_commit_ts: latest_ts,
615 });
616 }
617 }
618
619 let visible_version = self
620 .backend
621 .get_visible_committed(guard_key, *guard_read_ts)
622 .map_err(BatchError::Backend)?;
623
624 match guard {
625 ReadGuard::ExpectedVersion {
626 expected_commit_ts, ..
627 } => {
628 let actual_commit_ts = visible_version.as_ref().map(|v| v.commit_ts);
629 if actual_commit_ts != *expected_commit_ts {
630 return Err(BatchError::GuardFailedVersionMismatch {
631 key: guard_key.clone(),
632 expected: *expected_commit_ts,
633 actual: actual_commit_ts,
634 });
635 }
636 }
637 ReadGuard::ExpectedValue { expected_value, .. } => {
638 let actual_value = visible_version.as_ref().and_then(|v| v.value.as_ref());
639 if actual_value != expected_value.as_ref() {
640 return Err(BatchError::GuardFailedValueMismatch {
641 key: guard_key.clone(),
642 });
643 }
644 }
645 }
646 }
647
648 for w in &writes {
650 if let Some(intent) = self
651 .backend
652 .get_intent(&w.key)
653 .map_err(BatchError::Backend)?
654 {
655 return Err(BatchError::KeyLocked {
656 key: w.key.clone(),
657 txn_id: intent.txn_id,
658 });
659 }
660
661 if let Some(latest_ts) = self
662 .backend
663 .get_latest_commit_ts(&w.key)
664 .map_err(BatchError::Backend)?
665 {
666 if commit_ts <= latest_ts {
667 return Err(BatchError::CommitTsTooOld {
668 key: w.key.clone(),
669 commit_ts,
670 latest_commit_ts: latest_ts,
671 });
672 }
673 }
674 }
675
676 let mut commits = Vec::with_capacity(writes.len());
678 for w in writes {
679 commits.push(CommittedVersion {
680 key: w.key,
681 commit_ts,
682 value: w.value,
683 });
684 }
685
686 self.backend
687 .put_committed_batch(commits)
688 .map_err(BatchError::Backend)?;
689 Ok(())
690 }
691
692 pub fn gc_incremental(
696 &mut self,
697 safe_point_ts: Timestamp,
698 cursor: Option<IncrementalGcCursor>,
699 options: GcOptions,
700 ) -> Result<IncrementalGcResult, GcError> {
701 if options.budget.max_keys == 0 || options.budget.max_versions == 0 {
702 return Err(GcError::InvalidGcBudget);
703 }
704
705 let start_key = cursor.and_then(|c| c.next_key);
706
707 let keys = self
708 .backend
709 .keys_from(start_key.as_deref(), options.budget.max_keys + 1)
710 .map_err(GcError::Backend)?;
711
712 let mut keys_scanned = 0;
713 let mut versions_scanned = 0;
714 let mut versions_removed = 0;
715 let mut intents_preserved = 0;
716
717 let mut next_cursor_key = None;
718 let mut done = false;
719 let mut exhausted_versions = false;
720
721 let num_keys_to_process = std::cmp::min(keys.len(), options.budget.max_keys);
722
723 for key in keys.iter().take(num_keys_to_process) {
724 keys_scanned += 1;
725
726 let has_intent = self
727 .backend
728 .get_intent(key)
729 .map_err(GcError::Backend)?
730 .is_some();
731 if has_intent {
732 intents_preserved += 1;
733 }
734
735 let keeper = self
736 .backend
737 .get_visible_committed(key, safe_point_ts)
738 .map_err(GcError::Backend)?;
739
740 if let Some(keeper_ver) = keeper {
741 versions_scanned += 1; let limit = options.budget.max_versions - versions_removed;
744 if limit == 0 {
745 let check_more = self
747 .backend
748 .get_committed_timestamps_before(key, keeper_ver.commit_ts, 1)
749 .map_err(GcError::Backend)?;
750
751 if !check_more.is_empty() {
752 next_cursor_key = Some(key.clone());
753 exhausted_versions = true;
754 break;
755 }
756
757 if options.collapse_final_tombstones
760 && keeper_ver.value.is_none()
761 && !has_intent
762 {
763 if let Some(latest_ts) = self
764 .backend
765 .get_latest_commit_ts(key)
766 .map_err(GcError::Backend)?
767 {
768 if latest_ts == keeper_ver.commit_ts {
769 next_cursor_key = Some(key.clone());
770 exhausted_versions = true;
771 break;
772 }
773 }
774 }
775 continue;
776 }
777
778 let mut is_final_tombstone = false;
780 if options.collapse_final_tombstones && keeper_ver.value.is_none() && !has_intent {
781 if let Some(latest_ts) = self
782 .backend
783 .get_latest_commit_ts(key)
784 .map_err(GcError::Backend)?
785 {
786 if latest_ts == keeper_ver.commit_ts {
787 is_final_tombstone = true;
788 }
789 }
790 }
791
792 if is_final_tombstone {
793 let mut older_versions = self
796 .backend
797 .get_committed_timestamps_before(key, keeper_ver.commit_ts, limit + 1)
798 .map_err(GcError::Backend)?;
799
800 versions_scanned += older_versions.len();
801
802 let has_more = older_versions.len() > limit;
803 if has_more {
804 older_versions.pop();
807 versions_scanned -= 1;
808
809 for ts in older_versions {
812 self.backend
813 .remove_committed_version(key, ts)
814 .map_err(GcError::Backend)?;
815 versions_removed += 1;
816 }
817
818 next_cursor_key = Some(key.clone());
819 exhausted_versions = true;
820 break;
821 } else {
822 let older_len = older_versions.len();
825 if older_len < limit {
826 self.backend
828 .collapse_tombstone(key, keeper_ver.commit_ts, older_versions)
829 .map_err(GcError::Backend)?;
830
831 versions_removed += older_len + 1;
832 } else {
833 for ts in older_versions {
837 self.backend
838 .remove_committed_version(key, ts)
839 .map_err(GcError::Backend)?;
840 versions_removed += 1;
841 }
842
843 next_cursor_key = Some(key.clone());
844 exhausted_versions = true;
845 break;
846 }
847 }
848 } else {
849 let mut to_remove = self
851 .backend
852 .get_committed_timestamps_before(key, keeper_ver.commit_ts, limit + 1)
853 .map_err(GcError::Backend)?;
854
855 versions_scanned += to_remove.len();
856
857 let has_more = to_remove.len() > limit;
858 if has_more {
859 to_remove.pop();
860 versions_scanned -= 1;
861 }
862
863 for ts in to_remove {
864 self.backend
865 .remove_committed_version(key, ts)
866 .map_err(GcError::Backend)?;
867 versions_removed += 1;
868 }
869
870 if has_more {
871 next_cursor_key = Some(key.clone());
872 exhausted_versions = true;
873 break;
874 }
875 }
876 }
877 }
878
879 if !exhausted_versions {
880 if keys.len() > options.budget.max_keys {
881 next_cursor_key = Some(keys[options.budget.max_keys].clone());
882 } else {
883 done = true;
884 }
885 }
886
887 Ok(IncrementalGcResult {
888 cursor: IncrementalGcCursor {
889 next_key: next_cursor_key,
890 },
891 done,
892 keys_scanned,
893 versions_scanned,
894 versions_removed,
895 intents_preserved,
896 })
897 }
898
899 pub fn gc(&mut self, safe_point_ts: Timestamp, options: GcOptions) -> Result<GcStats, GcError> {
903 let mut total_versions_removed = 0;
904
905 let mut cursor = None;
906
907 loop {
908 let res = self.gc_incremental(safe_point_ts, cursor.take(), options)?;
909 total_versions_removed += res.versions_removed;
910
911 if res.done {
912 break;
913 }
914 cursor = Some(res.cursor);
915 }
916
917 let mut total_intents_preserved = 0;
918 for key in self.backend.all_keys().map_err(GcError::Backend)? {
919 if self
920 .backend
921 .get_intent(&key)
922 .map_err(GcError::Backend)?
923 .is_some()
924 {
925 total_intents_preserved += 1;
926 }
927 }
928
929 Ok(GcStats {
930 versions_removed: total_versions_removed,
931 intents_preserved: total_intents_preserved,
932 })
933 }
934}