Skip to main content

lsm_tree/
scan_since.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2024-present, fjall-rs
// Copyright (c) 2026-present, Structured World Foundation

//! Change-data-capture event stream for [`Tree::scan_since_seqno`].
//!
//! [`Tree::scan_since_seqno`]: crate::Tree::scan_since_seqno
8
9use crate::{SeqNo, Slice};
10
11/// A single change event emitted by [`Tree::scan_since_seqno`](crate::Tree::scan_since_seqno).
12///
13/// Each event carries the sequence number at which the change was committed.
14/// Events are emitted in increasing seqno order, so a downstream consumer
15/// (replica, Kafka connector, Debezium-style pipeline) can replay them in
16/// order to reconstruct the source's history. Superseded versions are **not**
17/// collapsed: a key updated three times after the target seqno yields three
18/// events, mirroring the source's full change history rather than just its
19/// latest visible state.
20///
21/// # Replay semantics
22///
23/// Applying events in seqno order reconstructs the state delta. An
24/// `Insert(K, V1, s=150)` followed by a `PointTombstone(K, s=200)` means "K was
25/// inserted with V1 at 150, then deleted at 200" — the net effect on a replica
26/// starting before 150 is "create K with V1, then delete K", matching the
27/// source.
28///
29/// # Merge operands
30///
31/// A store using a [`MergeOperator`](crate::MergeOperator) records partial
32/// updates as [`MergeOperand`](Self::MergeOperand) events rather than resolved
33/// values: the consumer applies the same merge operator to reproduce the
34/// source's state. Emitting a merge as an `Insert` would make a replica
35/// overwrite instead of merge, diverging from the source; resolving the merge
36/// chain here would require reading the full base+operand history and defeat
37/// the block-skip optimization, so the raw operand is surfaced instead.
38///
39/// # KV-separated (blob) values
40///
41/// When a value is stored out-of-line in a blob file, the blob is resolved and
42/// the real value is carried in the [`Insert`](Self::Insert) event, so the
43/// consumer never needs access to the source's blob files to replicate.
44#[derive(Clone, Debug, Eq, PartialEq)]
45pub enum ScanSinceEvent {
46    /// A record was written (or overwritten) at `seqno`.
47    ///
48    /// Covers both inline values and values resolved from a blob file.
49    Insert {
50        /// User key that was written.
51        key: Slice,
52        /// Value written at `seqno` (resolved from a blob file if the entry
53        /// was KV-separated).
54        value: Slice,
55        /// Sequence number at which the write was committed.
56        seqno: SeqNo,
57    },
58
59    /// A merge operand was written at `seqno`.
60    ///
61    /// The consumer must apply the source's [`MergeOperator`](crate::MergeOperator)
62    /// to combine this operand with the prior value / operands, exactly as the
63    /// source does.
64    MergeOperand {
65        /// User key the operand applies to.
66        key: Slice,
67        /// Raw merge operand bytes, to be combined via the merge operator.
68        operand: Slice,
69        /// Sequence number at which the operand was committed.
70        seqno: SeqNo,
71    },
72
73    /// A single key was deleted at `seqno`.
74    ///
75    /// Covers both regular and weak (single-delete) tombstones; both reduce to
76    /// "this key is gone as of `seqno`" for replay purposes.
77    PointTombstone {
78        /// User key that was deleted.
79        key: Slice,
80        /// Sequence number at which the deletion was committed.
81        seqno: SeqNo,
82    },
83
84    /// A half-open key range `[start_key, end_key)` was deleted at `seqno`.
85    RangeTombstone {
86        /// Inclusive lower bound of the deleted range.
87        start_key: Slice,
88        /// Exclusive upper bound of the deleted range.
89        end_key: Slice,
90        /// Sequence number at which the range deletion was committed.
91        seqno: SeqNo,
92    },
93}
94
95impl ScanSinceEvent {
96    /// Sequence number at which this change was committed.
97    ///
98    /// Events from [`Tree::scan_since_seqno`](crate::Tree::scan_since_seqno)
99    /// arrive in increasing order of this value.
100    #[must_use]
101    pub fn seqno(&self) -> SeqNo {
102        match self {
103            Self::Insert { seqno, .. }
104            | Self::MergeOperand { seqno, .. }
105            | Self::PointTombstone { seqno, .. }
106            | Self::RangeTombstone { seqno, .. } => *seqno,
107        }
108    }
109}