lsm_tree/scan_since.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2024-present, fjall-rs
3// Copyright (c) 2026-present, Structured World Foundation
4
5//! Change-data-capture event stream for [`Tree::scan_since_seqno`].
6//!
7//! [`Tree::scan_since_seqno`]: crate::Tree::scan_since_seqno
8
9use crate::{SeqNo, Slice};
10
11/// A single change event emitted by [`Tree::scan_since_seqno`](crate::Tree::scan_since_seqno).
12///
13/// Each event carries the sequence number at which the change was committed.
14/// Events are emitted in increasing seqno order, so a downstream consumer
15/// (replica, Kafka connector, Debezium-style pipeline) can replay them in
16/// order to reconstruct the source's history. Superseded versions are **not**
17/// collapsed: a key updated three times after the target seqno yields three
18/// events, mirroring the source's full change history rather than just its
19/// latest visible state.
20///
21/// # Replay semantics
22///
23/// Applying events in seqno order reconstructs the state delta. An
24/// `Insert(K, V1, s=150)` followed by a `PointTombstone(K, s=200)` means "K was
25/// inserted with V1 at 150, then deleted at 200" — the net effect on a replica
26/// starting before 150 is "create K with V1, then delete K", matching the
27/// source.
28///
29/// # Merge operands
30///
31/// A store using a [`MergeOperator`](crate::MergeOperator) records partial
32/// updates as [`MergeOperand`](Self::MergeOperand) events rather than resolved
33/// values: the consumer applies the same merge operator to reproduce the
34/// source's state. Emitting a merge as an `Insert` would make a replica
35/// overwrite instead of merge, diverging from the source; resolving the merge
36/// chain here would require reading the full base+operand history and defeat
37/// the block-skip optimization, so the raw operand is surfaced instead.
38///
39/// # KV-separated (blob) values
40///
41/// When a value is stored out-of-line in a blob file, the blob is resolved and
42/// the real value is carried in the [`Insert`](Self::Insert) event, so the
43/// consumer never needs access to the source's blob files to replicate.
44#[derive(Clone, Debug, Eq, PartialEq)]
45pub enum ScanSinceEvent {
46 /// A record was written (or overwritten) at `seqno`.
47 ///
48 /// Covers both inline values and values resolved from a blob file.
49 Insert {
50 /// User key that was written.
51 key: Slice,
52 /// Value written at `seqno` (resolved from a blob file if the entry
53 /// was KV-separated).
54 value: Slice,
55 /// Sequence number at which the write was committed.
56 seqno: SeqNo,
57 },
58
59 /// A merge operand was written at `seqno`.
60 ///
61 /// The consumer must apply the source's [`MergeOperator`](crate::MergeOperator)
62 /// to combine this operand with the prior value / operands, exactly as the
63 /// source does.
64 MergeOperand {
65 /// User key the operand applies to.
66 key: Slice,
67 /// Raw merge operand bytes, to be combined via the merge operator.
68 operand: Slice,
69 /// Sequence number at which the operand was committed.
70 seqno: SeqNo,
71 },
72
73 /// A single key was deleted at `seqno`.
74 ///
75 /// Covers both regular and weak (single-delete) tombstones; both reduce to
76 /// "this key is gone as of `seqno`" for replay purposes.
77 PointTombstone {
78 /// User key that was deleted.
79 key: Slice,
80 /// Sequence number at which the deletion was committed.
81 seqno: SeqNo,
82 },
83
84 /// A half-open key range `[start_key, end_key)` was deleted at `seqno`.
85 RangeTombstone {
86 /// Inclusive lower bound of the deleted range.
87 start_key: Slice,
88 /// Exclusive upper bound of the deleted range.
89 end_key: Slice,
90 /// Sequence number at which the range deletion was committed.
91 seqno: SeqNo,
92 },
93}
94
95impl ScanSinceEvent {
96 /// Sequence number at which this change was committed.
97 ///
98 /// Events from [`Tree::scan_since_seqno`](crate::Tree::scan_since_seqno)
99 /// arrive in increasing order of this value.
100 #[must_use]
101 pub fn seqno(&self) -> SeqNo {
102 match self {
103 Self::Insert { seqno, .. }
104 | Self::MergeOperand { seqno, .. }
105 | Self::PointTombstone { seqno, .. }
106 | Self::RangeTombstone { seqno, .. } => *seqno,
107 }
108 }
109}