lsm_tree/ingestion.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2024-present, fjall-rs
3// Copyright (c) 2026-present, Structured World Foundation
4
5use crate::{
6 AnyTree, UserKey, UserValue, blob_tree::ingest::BlobIngestion, tree::ingest::Ingestion,
7};
8
9/// Unified ingestion builder over `AnyTree`
10// Keep zero allocations and direct dispatch; boxing introduces heap indirection and `dyn` adds virtual dispatch.
11// Ingestion calls use `&mut self` in tight loops; the active variant is stable and branch prediction makes the match cheap.
12// Allowing this lint preserves hot-path performance at the cost of a larger enum size.
13#[expect(clippy::large_enum_variant)]
14pub enum AnyIngestion<'a> {
15 /// Ingestion for a standard LSM-tree
16 Standard(Ingestion<'a>),
17
18 /// Ingestion for a [`BlobTree`](crate::BlobTree) with KV separation
19 Blob(BlobIngestion<'a>),
20}
21
22impl AnyIngestion<'_> {
23 /// Writes a key-value pair.
24 ///
25 /// # Errors
26 ///
27 /// Will return `Err` if an IO error occurs.
28 pub fn write<K: Into<UserKey>, V: Into<UserValue>>(
29 &mut self,
30 key: K,
31 value: V,
32 ) -> crate::Result<()> {
33 match self {
34 Self::Standard(i) => i.write(key.into(), value.into()),
35 Self::Blob(b) => b.write(key.into(), value.into()),
36 }
37 }
38
39 /// Writes a tombstone for a key.
40 ///
41 /// # Errors
42 ///
43 /// Will return `Err` if an IO error occurs.
44 pub fn write_tombstone<K: Into<UserKey>>(&mut self, key: K) -> crate::Result<()> {
45 match self {
46 Self::Standard(i) => i.write_tombstone(key.into()),
47 Self::Blob(b) => b.write_tombstone(key.into()),
48 }
49 }
50
51 /// Writes a weak tombstone for a key.
52 ///
53 /// # Examples
54 ///
55 /// ```
56 /// # use lsm_tree::Config;
57 /// # let folder = tempfile::tempdir()?;
58 /// # let tree = Config::new(folder, Default::default(), Default::default()).open()?;
59 /// #
60 /// let mut ingestion = tree.ingestion()?;
61 /// ingestion.write("a", "abc")?;
62 /// ingestion.write_weak_tombstone("b")?;
63 /// ingestion.finish()?;
64 /// #
65 /// # Ok::<(), lsm_tree::Error>(())
66 /// ```
67 ///
68 /// # Errors
69 ///
70 /// Will return `Err` if an IO error occurs.
71 pub fn write_weak_tombstone<K: Into<UserKey>>(&mut self, key: K) -> crate::Result<()> {
72 match self {
73 Self::Standard(i) => i.write_weak_tombstone(key.into()),
74 Self::Blob(b) => b.write_weak_tombstone(key.into()),
75 }
76 }
77
78 /// Writes a consumer-provided columnar batch (its value sub-columns) as one
79 /// columnar block, stored directly without re-transposing the value.
80 ///
81 /// The batch carries the three intrinsic columns (`[key, seqno, value-type]`)
82 /// plus one or more value sub-columns. Its keys must be strictly increasing
83 /// (by the tree comparator) within the batch and after any previously written
84 /// data, and every per-row seqno must be `0`: [`finish`](Self::finish) assigns
85 /// the atomic global sequence number. The columnar layout must be enabled
86 /// (`columnar` in the runtime config) on a standard tree; a row-mode or blob
87 /// tree rejects the batch.
88 ///
89 /// # Errors
90 ///
91 /// Returns an error if the batch shape is invalid, the keys are not strictly
92 /// increasing, any row carries a non-zero seqno, the layout is not columnar,
93 /// a block write fails, or the tree is a blob tree (columnar ingest does not
94 /// support KV separation).
95 ///
96 /// # Examples
97 ///
98 /// ```
99 /// use lsm_tree::table::columnar::{Column, TypeTag, entries_to_column_batch};
100 /// use lsm_tree::{AnyTree, Config, InternalValue, ValueType};
101 ///
102 /// let folder = tempfile::tempdir()?;
103 /// let any = Config::new(folder, Default::default(), Default::default()).open()?;
104 /// if let AnyTree::Standard(tree) = &any {
105 /// tree.update_runtime_config(|cfg| cfg.columnar = true)?;
106 /// }
107 ///
108 /// // One row whose value is a single fixed-4 sub-column (id 3).
109 /// let mut batch = entries_to_column_batch(&[InternalValue::from_components(
110 /// b"k0".to_vec(),
111 /// b"x".to_vec(),
112 /// 0,
113 /// ValueType::Value,
114 /// )])?;
115 /// batch.columns.pop();
116 /// batch.columns.push(Column {
117 /// column_id: 3,
118 /// type_tag: TypeTag::Fixed(4),
119 /// validity: None,
120 /// data: vec![1, 0, 0, 0],
121 /// });
122 ///
123 /// let mut ingestion = any.ingestion()?;
124 /// ingestion.write_columnar_batch(&batch)?;
125 /// ingestion.finish()?;
126 /// # Ok::<(), lsm_tree::Error>(())
127 /// ```
128 #[cfg(feature = "columnar")]
129 pub fn write_columnar_batch(
130 &mut self,
131 batch: &crate::table::columnar::ColumnBatch,
132 ) -> crate::Result<()> {
133 match self {
134 Self::Standard(i) => i.write_columnar_batch(batch),
135 Self::Blob(_) => Err(crate::Error::FeatureUnsupported(
136 "columnar batch ingest is not supported for blob trees",
137 )),
138 }
139 }
140
141 /// Finalizes ingestion and registers created tables (and blob files if present).
142 ///
143 /// # Errors
144 ///
145 /// Will return `Err` if an IO error occurs.
146 pub fn finish(self) -> crate::Result<()> {
147 match self {
148 Self::Standard(i) => i.finish(),
149 Self::Blob(b) => b.finish(),
150 }
151 }
152}
153
154impl AnyTree {
155 /// Starts an ingestion for any tree type (standard or blob).
156 ///
157 /// # Errors
158 ///
159 /// Will return `Err` if an IO error occurs.
160 pub fn ingestion(&self) -> crate::Result<AnyIngestion<'_>> {
161 match self {
162 Self::Standard(t) => Ok(AnyIngestion::Standard(Ingestion::new(t)?)),
163 Self::Blob(b) => Ok(AnyIngestion::Blob(BlobIngestion::new(b)?)),
164 }
165 }
166}