// nexir_mvcc_core/engine.rs

1use crate::backend::Backend;
2use crate::error::{
3    AbortError, BatchAbortError, BatchCommitError, BatchError, BatchPrewriteError, CommitError,
4    GcError, PrewriteError, ReadError,
5};
6use crate::types::{
7    CommittedVersion, Intent, Mutation, PhysicalWrite, ReadGuard, Timestamp, TxnId,
8};
9
/// Totals reported by a full [`MvccEngine::gc`] run.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GcStats {
    /// How many obsolete committed versions were physically deleted.
    pub versions_removed: usize,
    /// How many keys still carried an intent after collection finished
    /// (garbage collection never removes intents).
    pub intents_preserved: usize,
}
18
/// Per-step work limits for [`MvccEngine::gc_incremental`].
///
/// Both limits must be non-zero; `gc_incremental` rejects a zero value with
/// `GcError::InvalidGcBudget`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct GcBudget {
    /// Upper bound on how many keys are visited in a single step.
    pub max_keys: usize,
    /// Upper bound on how many committed versions are deleted in a single step.
    pub max_versions: usize,
}
27
28/// Options for garbage collection operations.
29#[derive(Debug, Clone, Copy, PartialEq, Eq)]
30pub struct GcOptions {
31    /// Budget parameters limiting the scope of one step.
32    pub budget: GcBudget,
33    /// Explicit opt-in retention policy: if true, a final unshadowed tombstone
34    /// will be physically collapsed. This is ONLY safe if the caller guarantees
35    /// a strict low-watermark where no future reads, prewrites, or guards will
36    /// ever be issued at or below the collapsed tombstone timestamp.
37    pub collapse_final_tombstones: bool,
38}
39
/// Resume point handed between successive [`MvccEngine::gc_incremental`] calls.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IncrementalGcCursor {
    /// Key the next step starts from; `None` means "start from the beginning"
    /// and is also what a finished (`done`) step returns.
    pub next_key: Option<Vec<u8>>,
}
46
47/// Result of an incremental garbage collection step.
48#[derive(Debug, Clone, PartialEq, Eq)]
49pub struct IncrementalGcResult {
50    /// The cursor to resume GC from.
51    pub cursor: IncrementalGcCursor,
52    /// True if a full pass over the keyspace has completed.
53    pub done: bool,
54    /// Number of keys scanned in this step.
55    pub keys_scanned: usize,
56    /// Number of versions scanned in this step.
57    pub versions_scanned: usize,
58    /// Number of versions physically removed in this step.
59    pub versions_removed: usize,
60    /// Number of active intents safely ignored and preserved.
61    pub intents_preserved: usize,
62}
63
64/// The central state machine for executing MVCC operations.
65///
66/// `MvccEngine` wraps a durable `Backend` and provides high-level APIs for
67/// single-key and multi-key reads, transactional intents, and direct batches.
68pub struct MvccEngine<B: Backend> {
69    backend: B,
70}
71
72impl<B: Backend> MvccEngine<B> {
73    /// Creates a new `MvccEngine` with the given backend.
74    pub fn new(backend: B) -> Self {
75        Self { backend }
76    }
77
78    /// Returns an immutable reference to the underlying backend.
79    pub fn backend(&self) -> &B {
80        &self.backend
81    }
82
83    /// Returns a mutable reference to the underlying backend.
84    pub fn backend_mut(&mut self) -> &mut B {
85        &mut self.backend
86    }
87
88    /// Reads the visible value for a key at a given logical `read_ts`.
89    ///
90    /// This method ignores uncommitted intents and finds the newest committed
91    /// version that has a `commit_ts` <= `read_ts`.
92    pub fn read(&self, key: &[u8], read_ts: Timestamp) -> Result<Option<Vec<u8>>, ReadError> {
93        let version = self
94            .backend
95            .get_visible_committed(key, read_ts)
96            .map_err(ReadError::Backend)?;
97        Ok(version.and_then(|v| v.value))
98    }
99
100    /// Reads the visible value for a key at `read_ts` and returns its commit timestamp and value.
101    ///
102    /// If no version is visible at or before `read_ts`, returns `Ok(None)`.
103    /// If a tombstone is visible, returns `Ok(Some((ts, None)))`.
104    /// If a value is visible, returns `Ok(Some((ts, Some(value))))`.
105    #[allow(clippy::type_complexity)]
106    pub fn read_with_version(
107        &self,
108        key: &[u8],
109        read_ts: Timestamp,
110    ) -> Result<Option<(Timestamp, Option<Vec<u8>>)>, ReadError> {
111        let version = self
112            .backend
113            .get_visible_committed(key, read_ts)
114            .map_err(ReadError::Backend)?;
115        Ok(version.map(|v| (v.commit_ts, v.value)))
116    }
117
118    /// Reads the visible value for a key, prioritizing the transaction's own active intent.
119    ///
120    /// If the transaction has an active intent on the key, its value is returned.
121    /// Otherwise, it falls back to a normal historical read at `read_ts`.
122    pub fn read_own_write(
123        &self,
124        key: &[u8],
125        txn_id: TxnId,
126        start_ts: Timestamp,
127        read_ts: Timestamp,
128    ) -> Result<Option<Vec<u8>>, ReadError> {
129        // First, check if this txn has an intent on the key.
130        if let Some(intent) = self.backend.get_intent(key).map_err(ReadError::Backend)? {
131            if intent.txn_id == txn_id && intent.start_ts == start_ts {
132                return Ok(intent.mutation.value());
133            }
134        }
135        // Fall back to committed versions.
136        self.read(key, read_ts)
137    }
138
139    /// Prewrites a single intent for a distributed transaction.
140    ///
141    /// Fails if another transaction holds an intent on the key, or if a newer
142    /// committed version exists (write conflict). It is idempotent for the same transaction
143    /// and `start_ts`.
144    pub fn prewrite(
145        &mut self,
146        txn_id: TxnId,
147        start_ts: Timestamp,
148        key: Vec<u8>,
149        mutation: Mutation,
150    ) -> Result<(), PrewriteError> {
151        // Check for an existing intent by another txn.
152        if let Some(intent) = self
153            .backend
154            .get_intent(&key)
155            .map_err(PrewriteError::Backend)?
156        {
157            if intent.txn_id != txn_id {
158                return Err(PrewriteError::KeyLocked {
159                    txn_id: intent.txn_id,
160                });
161            }
162            // Same txn: if start_ts matches, this is a re-prewrite (idempotent).
163            // If start_ts differs, it's a conflicting intent from the same txn
164            // (should not happen in correct usage).
165            if intent.start_ts != start_ts {
166                return Err(PrewriteError::IntentAlreadyExists);
167            }
168            // Same txn, same start_ts: already prewritten. Verify mutation matches.
169            if intent.mutation != mutation {
170                return Err(PrewriteError::IntentAlreadyExists);
171            }
172            return Ok(());
173        }
174
175        // Check for write conflict: any committed version with commit_ts > start_ts.
176        if let Some(latest_ts) = self
177            .backend
178            .get_latest_commit_ts(&key)
179            .map_err(PrewriteError::Backend)?
180        {
181            if latest_ts > start_ts {
182                return Err(PrewriteError::WriteConflict);
183            }
184        }
185
186        let intent = Intent {
187            key: key.clone(),
188            txn_id,
189            start_ts,
190            mutation,
191            min_commit_ts: None,
192        };
193        self.backend
194            .put_intent(intent)
195            .map_err(PrewriteError::Backend)?;
196        Ok(())
197    }
198
199    /// Commits a single intent, creating a durable version.
200    ///
201    /// Converts the intent at `start_ts` into a committed version at `commit_ts`.
202    /// The backend must execute this atomically (create version and remove intent).
203    pub fn commit(
204        &mut self,
205        txn_id: TxnId,
206        key: &[u8],
207        start_ts: Timestamp,
208        commit_ts: Timestamp,
209    ) -> Result<(), CommitError> {
210        let intent = self
211            .backend
212            .get_intent(key)
213            .map_err(CommitError::Backend)?
214            .ok_or(CommitError::IntentNotFound)?;
215
216        if intent.txn_id != txn_id {
217            return Err(CommitError::TxnIdMismatch);
218        }
219        if intent.start_ts != start_ts {
220            return Err(CommitError::StartTsMismatch);
221        }
222
223        if commit_ts <= start_ts {
224            return Err(CommitError::InvalidCommitTimestamp {
225                start_ts,
226                commit_ts,
227            });
228        }
229
230        if let Some(min_ts) = intent.min_commit_ts {
231            if commit_ts < min_ts {
232                return Err(CommitError::CommitTsTooEarly {
233                    commit_ts,
234                    min_commit_ts: min_ts,
235                });
236            }
237        }
238
239        // Check for retroactive commit
240        if let Some(latest_ts) = self
241            .backend
242            .get_latest_commit_ts(key)
243            .map_err(CommitError::Backend)?
244        {
245            if commit_ts <= latest_ts {
246                if commit_ts == latest_ts {
247                    return Err(CommitError::DuplicateCommitTimestamp { commit_ts });
248                } else {
249                    return Err(CommitError::CommitTsTooOld {
250                        commit_ts,
251                        latest_commit_ts: latest_ts,
252                    });
253                }
254            }
255        }
256
257        // Create committed version and remove the intent in one backend transition.
258        let version = CommittedVersion {
259            key: key.to_vec(),
260            commit_ts,
261            value: intent.mutation.value(),
262        };
263        self.backend
264            .commit_intents_batch(vec![version], vec![(key.to_vec(), txn_id, start_ts)])
265            .map_err(CommitError::Backend)?;
266        Ok(())
267    }
268
269    /// Aborts a single intent, removing it from the backend.
270    ///
271    /// This method is idempotent: if the intent is not found, it returns `Ok(())`.
272    pub fn abort(
273        &mut self,
274        txn_id: TxnId,
275        key: &[u8],
276        start_ts: Timestamp,
277    ) -> Result<(), AbortError> {
278        let removed = self
279            .backend
280            .remove_intent(key, txn_id, start_ts)
281            .map_err(AbortError::Backend)?;
282        // If the intent was not found or did not match, it's a no-op.
283        // This makes abort idempotent.
284        let _ = removed;
285        Ok(())
286    }
287
288    /// Prewrites multiple intents atomically for a transaction.
289    ///
290    /// Rejects empty batches with `EmptyBatch`. Identical replay semantics apply
291    /// as in single-key prewrite.
292    pub fn prewrite_batch(
293        &mut self,
294        txn_id: TxnId,
295        start_ts: Timestamp,
296        writes: Vec<PhysicalWrite>,
297    ) -> Result<(), BatchPrewriteError> {
298        if writes.is_empty() {
299            return Err(BatchPrewriteError::EmptyBatch);
300        }
301
302        let mut key_set = std::collections::HashSet::new();
303        for w in &writes {
304            if !key_set.insert(w.key.clone()) {
305                return Err(BatchPrewriteError::DuplicateKeyInBatch { key: w.key.clone() });
306            }
307        }
308
309        let mut existing_count = 0;
310        for w in &writes {
311            if let Some(intent) = self
312                .backend
313                .get_intent(&w.key)
314                .map_err(BatchPrewriteError::Backend)?
315            {
316                if intent.txn_id != txn_id {
317                    return Err(BatchPrewriteError::KeyLocked {
318                        key: w.key.clone(),
319                        txn_id: intent.txn_id,
320                    });
321                }
322                let expected_mutation = if let Some(v) = &w.value {
323                    Mutation::Put(v.clone())
324                } else {
325                    Mutation::Delete
326                };
327                if intent.start_ts != start_ts || intent.mutation != expected_mutation {
328                    return Err(BatchPrewriteError::IntentAlreadyExists { key: w.key.clone() });
329                }
330                existing_count += 1;
331            } else if let Some(latest_ts) = self
332                .backend
333                .get_latest_commit_ts(&w.key)
334                .map_err(BatchPrewriteError::Backend)?
335            {
336                if latest_ts > start_ts {
337                    return Err(BatchPrewriteError::WriteConflict { key: w.key.clone() });
338                }
339            }
340        }
341
342        if existing_count == writes.len() {
343            return Ok(());
344        } else if existing_count > 0 {
345            return Err(BatchPrewriteError::PartialBatchReplay);
346        }
347
348        let mut intents = Vec::with_capacity(writes.len());
349        for w in writes {
350            intents.push(Intent {
351                key: w.key,
352                txn_id,
353                start_ts,
354                mutation: if let Some(v) = w.value {
355                    Mutation::Put(v)
356                } else {
357                    Mutation::Delete
358                },
359                min_commit_ts: None,
360            });
361        }
362        self.backend
363            .put_intents_batch(intents)
364            .map_err(BatchPrewriteError::Backend)?;
365        Ok(())
366    }
367
368    /// Commits multiple intents atomically, creating durable versions.
369    ///
370    /// Rejects empty batches with `EmptyBatch`.
371    pub fn commit_batch(
372        &mut self,
373        txn_id: TxnId,
374        start_ts: Timestamp,
375        commit_ts: Timestamp,
376        keys: Vec<Vec<u8>>,
377    ) -> Result<(), BatchCommitError> {
378        if keys.is_empty() {
379            return Err(BatchCommitError::EmptyBatch);
380        }
381
382        let mut key_set = std::collections::HashSet::new();
383        for key in &keys {
384            if !key_set.insert(key.clone()) {
385                return Err(BatchCommitError::DuplicateKeyInBatch { key: key.clone() });
386            }
387        }
388
389        if commit_ts <= start_ts {
390            return Err(BatchCommitError::InvalidCommitTimestamp {
391                start_ts,
392                commit_ts,
393            });
394        }
395
396        let mut commits = Vec::with_capacity(keys.len());
397        let mut removed_intents = Vec::with_capacity(keys.len());
398
399        for key in &keys {
400            let intent = self
401                .backend
402                .get_intent(key)
403                .map_err(BatchCommitError::Backend)?
404                .ok_or_else(|| BatchCommitError::IntentNotFound { key: key.clone() })?;
405
406            if intent.txn_id != txn_id {
407                return Err(BatchCommitError::TxnIdMismatch { key: key.clone() });
408            }
409            if intent.start_ts != start_ts {
410                return Err(BatchCommitError::StartTsMismatch { key: key.clone() });
411            }
412            if let Some(min_ts) = intent.min_commit_ts {
413                if commit_ts < min_ts {
414                    return Err(BatchCommitError::CommitTsTooEarly {
415                        key: key.clone(),
416                        commit_ts,
417                        min_commit_ts: min_ts,
418                    });
419                }
420            }
421
422            if let Some(latest_ts) = self
423                .backend
424                .get_latest_commit_ts(key)
425                .map_err(BatchCommitError::Backend)?
426            {
427                if commit_ts <= latest_ts {
428                    return Err(BatchCommitError::CommitTsTooOld {
429                        key: key.clone(),
430                        commit_ts,
431                        latest_commit_ts: latest_ts,
432                    });
433                }
434            }
435
436            commits.push(CommittedVersion {
437                key: key.clone(),
438                commit_ts,
439                value: intent.mutation.value(),
440            });
441            removed_intents.push((key.clone(), txn_id, start_ts));
442        }
443
444        self.backend
445            .commit_intents_batch(commits, removed_intents)
446            .map_err(BatchCommitError::Backend)?;
447        Ok(())
448    }
449
450    /// Aborts multiple intents, removing them atomically.
451    ///
452    /// If the key list is empty, it returns `Ok(())` (idempotent).
453    pub fn abort_batch(
454        &mut self,
455        txn_id: TxnId,
456        start_ts: Timestamp,
457        keys: Vec<Vec<u8>>,
458    ) -> Result<(), BatchAbortError> {
459        if keys.is_empty() {
460            return Ok(());
461        }
462
463        let mut key_set = std::collections::HashSet::new();
464        for key in &keys {
465            if !key_set.insert(key.clone()) {
466                return Err(BatchAbortError::DuplicateKeyInBatch { key: key.clone() });
467            }
468        }
469
470        let mut removed_intents = Vec::with_capacity(keys.len());
471        for key in &keys {
472            if let Some(intent) = self
473                .backend
474                .get_intent(key)
475                .map_err(BatchAbortError::Backend)?
476            {
477                if intent.txn_id == txn_id && intent.start_ts == start_ts {
478                    removed_intents.push((key.clone(), txn_id, start_ts));
479                }
480            }
481        }
482
483        self.backend
484            .remove_intents_batch(removed_intents)
485            .map_err(BatchAbortError::Backend)?;
486        Ok(())
487    }
488
489    /// Applies a batch of writes directly, bypassing intents.
490    ///
491    /// Rejects empty batches with `EmptyBatch`. Fails if any key has an active intent
492    /// or if `commit_ts` is not strictly greater than the latest committed version.
493    pub fn apply_direct_batch(
494        &mut self,
495        commit_ts: Timestamp,
496        writes: Vec<PhysicalWrite>,
497    ) -> Result<(), BatchError> {
498        if writes.is_empty() {
499            return Err(BatchError::EmptyBatch);
500        }
501
502        let mut key_set = std::collections::HashSet::new();
503        for w in &writes {
504            if !key_set.insert(w.key.clone()) {
505                return Err(BatchError::DuplicateKeyInBatch { key: w.key.clone() });
506            }
507        }
508
509        // Validate all writes
510        for w in &writes {
511            // Reject active intents
512            if let Some(intent) = self
513                .backend
514                .get_intent(&w.key)
515                .map_err(BatchError::Backend)?
516            {
517                return Err(BatchError::KeyLocked {
518                    key: w.key.clone(),
519                    txn_id: intent.txn_id,
520                });
521            }
522
523            // Reject commit_ts <= latest_commit_ts
524            if let Some(latest_ts) = self
525                .backend
526                .get_latest_commit_ts(&w.key)
527                .map_err(BatchError::Backend)?
528            {
529                if commit_ts <= latest_ts {
530                    return Err(BatchError::CommitTsTooOld {
531                        key: w.key.clone(),
532                        commit_ts,
533                        latest_commit_ts: latest_ts,
534                    });
535                }
536            }
537        }
538
539        // Apply all-or-nothing
540        let mut commits = Vec::with_capacity(writes.len());
541        for w in writes {
542            commits.push(CommittedVersion {
543                key: w.key,
544                commit_ts,
545                value: w.value,
546            });
547        }
548
549        self.backend
550            .put_committed_batch(commits)
551            .map_err(BatchError::Backend)?;
552        Ok(())
553    }
554
555    /// Applies a batch of writes conditionally, validating read guards first.
556    ///
557    /// Useful for Read-Modify-Write operations (e.g. Compare-And-Swap) without
558    /// interactive two-phase commit transactions. Rejects empty writes or empty guards.
559    pub fn apply_guarded_batch(
560        &mut self,
561        commit_ts: Timestamp,
562        guards: Vec<ReadGuard>,
563        writes: Vec<PhysicalWrite>,
564    ) -> Result<(), BatchError> {
565        if writes.is_empty() {
566            return Err(BatchError::EmptyBatch);
567        }
568        if guards.is_empty() {
569            return Err(BatchError::NoReadGuards);
570        }
571
572        let mut write_keys = std::collections::HashSet::new();
573        for w in &writes {
574            if !write_keys.insert(w.key.clone()) {
575                return Err(BatchError::DuplicateKeyInBatch { key: w.key.clone() });
576            }
577        }
578
579        // Validate all guards
580        for guard in &guards {
581            let (guard_key, guard_read_ts) = match guard {
582                ReadGuard::ExpectedVersion { key, read_ts, .. } => (key, read_ts),
583                ReadGuard::ExpectedValue { key, read_ts, .. } => (key, read_ts),
584            };
585
586            if commit_ts <= *guard_read_ts {
587                return Err(BatchError::InvalidCommitTimestamp {
588                    read_ts: *guard_read_ts,
589                    commit_ts,
590                });
591            }
592
593            // Check active intents
594            if let Some(intent) = self
595                .backend
596                .get_intent(guard_key)
597                .map_err(BatchError::Backend)?
598            {
599                return Err(BatchError::KeyLocked {
600                    key: guard_key.clone(),
601                    txn_id: intent.txn_id,
602                });
603            }
604
605            if let Some(latest_ts) = self
606                .backend
607                .get_latest_commit_ts(guard_key)
608                .map_err(BatchError::Backend)?
609            {
610                if latest_ts > *guard_read_ts {
611                    return Err(BatchError::GuardFailedNewerVersion {
612                        key: guard_key.clone(),
613                        read_ts: *guard_read_ts,
614                        actual_commit_ts: latest_ts,
615                    });
616                }
617            }
618
619            let visible_version = self
620                .backend
621                .get_visible_committed(guard_key, *guard_read_ts)
622                .map_err(BatchError::Backend)?;
623
624            match guard {
625                ReadGuard::ExpectedVersion {
626                    expected_commit_ts, ..
627                } => {
628                    let actual_commit_ts = visible_version.as_ref().map(|v| v.commit_ts);
629                    if actual_commit_ts != *expected_commit_ts {
630                        return Err(BatchError::GuardFailedVersionMismatch {
631                            key: guard_key.clone(),
632                            expected: *expected_commit_ts,
633                            actual: actual_commit_ts,
634                        });
635                    }
636                }
637                ReadGuard::ExpectedValue { expected_value, .. } => {
638                    let actual_value = visible_version.as_ref().and_then(|v| v.value.as_ref());
639                    if actual_value != expected_value.as_ref() {
640                        return Err(BatchError::GuardFailedValueMismatch {
641                            key: guard_key.clone(),
642                        });
643                    }
644                }
645            }
646        }
647
648        // Validate write keys against intents and duplicate versions
649        for w in &writes {
650            if let Some(intent) = self
651                .backend
652                .get_intent(&w.key)
653                .map_err(BatchError::Backend)?
654            {
655                return Err(BatchError::KeyLocked {
656                    key: w.key.clone(),
657                    txn_id: intent.txn_id,
658                });
659            }
660
661            if let Some(latest_ts) = self
662                .backend
663                .get_latest_commit_ts(&w.key)
664                .map_err(BatchError::Backend)?
665            {
666                if commit_ts <= latest_ts {
667                    return Err(BatchError::CommitTsTooOld {
668                        key: w.key.clone(),
669                        commit_ts,
670                        latest_commit_ts: latest_ts,
671                    });
672                }
673            }
674        }
675
676        // Apply all-or-nothing
677        let mut commits = Vec::with_capacity(writes.len());
678        for w in writes {
679            commits.push(CommittedVersion {
680                key: w.key,
681                commit_ts,
682                value: w.value,
683            });
684        }
685
686        self.backend
687            .put_committed_batch(commits)
688            .map_err(BatchError::Backend)?;
689        Ok(())
690    }
691
692    /// Performs an incremental step of garbage collection.
693    ///
694    /// Obsolete versions older than `safe_point_ts` are removed up to the `budget`.
695    pub fn gc_incremental(
696        &mut self,
697        safe_point_ts: Timestamp,
698        cursor: Option<IncrementalGcCursor>,
699        options: GcOptions,
700    ) -> Result<IncrementalGcResult, GcError> {
701        if options.budget.max_keys == 0 || options.budget.max_versions == 0 {
702            return Err(GcError::InvalidGcBudget);
703        }
704
705        let start_key = cursor.and_then(|c| c.next_key);
706
707        let keys = self
708            .backend
709            .keys_from(start_key.as_deref(), options.budget.max_keys + 1)
710            .map_err(GcError::Backend)?;
711
712        let mut keys_scanned = 0;
713        let mut versions_scanned = 0;
714        let mut versions_removed = 0;
715        let mut intents_preserved = 0;
716
717        let mut next_cursor_key = None;
718        let mut done = false;
719        let mut exhausted_versions = false;
720
721        let num_keys_to_process = std::cmp::min(keys.len(), options.budget.max_keys);
722
723        for key in keys.iter().take(num_keys_to_process) {
724            keys_scanned += 1;
725
726            let has_intent = self
727                .backend
728                .get_intent(key)
729                .map_err(GcError::Backend)?
730                .is_some();
731            if has_intent {
732                intents_preserved += 1;
733            }
734
735            let keeper = self
736                .backend
737                .get_visible_committed(key, safe_point_ts)
738                .map_err(GcError::Backend)?;
739
740            if let Some(keeper_ver) = keeper {
741                versions_scanned += 1; // Count the keeper lookup
742
743                let limit = options.budget.max_versions - versions_removed;
744                if limit == 0 {
745                    // We check if there are actually any versions to remove before breaking
746                    let check_more = self
747                        .backend
748                        .get_committed_timestamps_before(key, keeper_ver.commit_ts, 1)
749                        .map_err(GcError::Backend)?;
750
751                    if !check_more.is_empty() {
752                        next_cursor_key = Some(key.clone());
753                        exhausted_versions = true;
754                        break;
755                    }
756
757                    // Out of budget, but maybe it's a final tombstone?
758                    // We need at least 1 budget unit to remove it, so we must revisit next time.
759                    if options.collapse_final_tombstones
760                        && keeper_ver.value.is_none()
761                        && !has_intent
762                    {
763                        if let Some(latest_ts) = self
764                            .backend
765                            .get_latest_commit_ts(key)
766                            .map_err(GcError::Backend)?
767                        {
768                            if latest_ts == keeper_ver.commit_ts {
769                                next_cursor_key = Some(key.clone());
770                                exhausted_versions = true;
771                                break;
772                            }
773                        }
774                    }
775                    continue;
776                }
777
778                // Check if this is a final tombstone collapse case
779                let mut is_final_tombstone = false;
780                if options.collapse_final_tombstones && keeper_ver.value.is_none() && !has_intent {
781                    if let Some(latest_ts) = self
782                        .backend
783                        .get_latest_commit_ts(key)
784                        .map_err(GcError::Backend)?
785                    {
786                        if latest_ts == keeper_ver.commit_ts {
787                            is_final_tombstone = true;
788                        }
789                    }
790                }
791
792                if is_final_tombstone {
793                    // Final tombstone case: we must remove the tombstone and all older versions atomically.
794                    // We query up to limit + 1 older versions to see if they all fit in the remaining budget.
795                    let mut older_versions = self
796                        .backend
797                        .get_committed_timestamps_before(key, keeper_ver.commit_ts, limit + 1)
798                        .map_err(GcError::Backend)?;
799
800                    versions_scanned += older_versions.len();
801
802                    let has_more = older_versions.len() > limit;
803                    if has_more {
804                        // There are more older versions than the remaining budget.
805                        // Pop the extra to only remove `limit` older versions.
806                        older_versions.pop();
807                        versions_scanned -= 1;
808
809                        // We cannot delete the tombstone yet, because the remaining older versions
810                        // exceed the budget. We only delete the older versions.
811                        for ts in older_versions {
812                            self.backend
813                                .remove_committed_version(key, ts)
814                                .map_err(GcError::Backend)?;
815                            versions_removed += 1;
816                        }
817
818                        next_cursor_key = Some(key.clone());
819                        exhausted_versions = true;
820                        break;
821                    } else {
822                        // older_versions.len() <= limit.
823                        // The total versions to remove (older versions + tombstone) is older_versions.len() + 1.
824                        let older_len = older_versions.len();
825                        if older_len < limit {
826                            // Fits in the remaining budget! Collapse them atomically.
827                            self.backend
828                                .collapse_tombstone(key, keeper_ver.commit_ts, older_versions)
829                                .map_err(GcError::Backend)?;
830
831                            versions_removed += older_len + 1;
832                        } else {
833                            // older_versions.len() == limit.
834                            // The total versions to remove (older_versions.len() + 1) exceeds the remaining budget (limit).
835                            // We can only remove the older versions in this pass, leaving the tombstone.
836                            for ts in older_versions {
837                                self.backend
838                                    .remove_committed_version(key, ts)
839                                    .map_err(GcError::Backend)?;
840                                versions_removed += 1;
841                            }
842
843                            next_cursor_key = Some(key.clone());
844                            exhausted_versions = true;
845                            break;
846                        }
847                    }
848                } else {
849                    // Normal GC case (not a final tombstone): we only remove older versions.
850                    let mut to_remove = self
851                        .backend
852                        .get_committed_timestamps_before(key, keeper_ver.commit_ts, limit + 1)
853                        .map_err(GcError::Backend)?;
854
855                    versions_scanned += to_remove.len();
856
857                    let has_more = to_remove.len() > limit;
858                    if has_more {
859                        to_remove.pop();
860                        versions_scanned -= 1;
861                    }
862
863                    for ts in to_remove {
864                        self.backend
865                            .remove_committed_version(key, ts)
866                            .map_err(GcError::Backend)?;
867                        versions_removed += 1;
868                    }
869
870                    if has_more {
871                        next_cursor_key = Some(key.clone());
872                        exhausted_versions = true;
873                        break;
874                    }
875                }
876            }
877        }
878
879        if !exhausted_versions {
880            if keys.len() > options.budget.max_keys {
881                next_cursor_key = Some(keys[options.budget.max_keys].clone());
882            } else {
883                done = true;
884            }
885        }
886
887        Ok(IncrementalGcResult {
888            cursor: IncrementalGcCursor {
889                next_key: next_cursor_key,
890            },
891            done,
892            keys_scanned,
893            versions_scanned,
894            versions_removed,
895            intents_preserved,
896        })
897    }
898
899    /// Performs a full garbage collection by repeatedly calling `gc_incremental`.
900    ///
901    /// This is maintained for compatibility.
902    pub fn gc(&mut self, safe_point_ts: Timestamp, options: GcOptions) -> Result<GcStats, GcError> {
903        let mut total_versions_removed = 0;
904
905        let mut cursor = None;
906
907        loop {
908            let res = self.gc_incremental(safe_point_ts, cursor.take(), options)?;
909            total_versions_removed += res.versions_removed;
910
911            if res.done {
912                break;
913            }
914            cursor = Some(res.cursor);
915        }
916
917        let mut total_intents_preserved = 0;
918        for key in self.backend.all_keys().map_err(GcError::Backend)? {
919            if self
920                .backend
921                .get_intent(&key)
922                .map_err(GcError::Backend)?
923                .is_some()
924            {
925                total_intents_preserved += 1;
926            }
927        }
928
929        Ok(GcStats {
930            versions_removed: total_versions_removed,
931            intents_preserved: total_intents_preserved,
932        })
933    }
934}