Skip to main content

lsm_tree/
ingestion.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2024-present, fjall-rs
// Copyright (c) 2026-present, Structured World Foundation
4
5use crate::{
6    AnyTree, UserKey, UserValue, blob_tree::ingest::BlobIngestion, tree::ingest::Ingestion,
7};
8
9/// Unified ingestion builder over `AnyTree`
10// Keep zero allocations and direct dispatch; boxing introduces heap indirection and `dyn` adds virtual dispatch.
11// Ingestion calls use `&mut self` in tight loops; the active variant is stable and branch prediction makes the match cheap.
12// Allowing this lint preserves hot-path performance at the cost of a larger enum size.
13#[expect(clippy::large_enum_variant)]
14pub enum AnyIngestion<'a> {
15    /// Ingestion for a standard LSM-tree
16    Standard(Ingestion<'a>),
17
18    /// Ingestion for a [`BlobTree`](crate::BlobTree) with KV separation
19    Blob(BlobIngestion<'a>),
20}
21
22impl AnyIngestion<'_> {
23    /// Writes a key-value pair.
24    ///
25    /// # Errors
26    ///
27    /// Will return `Err` if an IO error occurs.
28    pub fn write<K: Into<UserKey>, V: Into<UserValue>>(
29        &mut self,
30        key: K,
31        value: V,
32    ) -> crate::Result<()> {
33        match self {
34            Self::Standard(i) => i.write(key.into(), value.into()),
35            Self::Blob(b) => b.write(key.into(), value.into()),
36        }
37    }
38
39    /// Writes a tombstone for a key.
40    ///
41    /// # Errors
42    ///
43    /// Will return `Err` if an IO error occurs.
44    pub fn write_tombstone<K: Into<UserKey>>(&mut self, key: K) -> crate::Result<()> {
45        match self {
46            Self::Standard(i) => i.write_tombstone(key.into()),
47            Self::Blob(b) => b.write_tombstone(key.into()),
48        }
49    }
50
51    /// Writes a weak tombstone for a key.
52    ///
53    /// # Examples
54    ///
55    /// ```
56    /// # use lsm_tree::Config;
57    /// # let folder = tempfile::tempdir()?;
58    /// # let tree = Config::new(folder, Default::default(), Default::default()).open()?;
59    /// #
60    /// let mut ingestion = tree.ingestion()?;
61    /// ingestion.write("a", "abc")?;
62    /// ingestion.write_weak_tombstone("b")?;
63    /// ingestion.finish()?;
64    /// #
65    /// # Ok::<(), lsm_tree::Error>(())
66    /// ```
67    ///
68    /// # Errors
69    ///
70    /// Will return `Err` if an IO error occurs.
71    pub fn write_weak_tombstone<K: Into<UserKey>>(&mut self, key: K) -> crate::Result<()> {
72        match self {
73            Self::Standard(i) => i.write_weak_tombstone(key.into()),
74            Self::Blob(b) => b.write_weak_tombstone(key.into()),
75        }
76    }
77
78    /// Writes a consumer-provided columnar batch (its value sub-columns) as one
79    /// columnar block, stored directly without re-transposing the value.
80    ///
81    /// The batch carries the three intrinsic columns (`[key, seqno, value-type]`)
82    /// plus one or more value sub-columns. Its keys must be strictly increasing
83    /// (by the tree comparator) within the batch and after any previously written
84    /// data, and every per-row seqno must be `0`: [`finish`](Self::finish) assigns
85    /// the atomic global sequence number. The columnar layout must be enabled
86    /// (`columnar` in the runtime config) on a standard tree; a row-mode or blob
87    /// tree rejects the batch.
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if the batch shape is invalid, the keys are not strictly
92    /// increasing, any row carries a non-zero seqno, the layout is not columnar,
93    /// a block write fails, or the tree is a blob tree (columnar ingest does not
94    /// support KV separation).
95    ///
96    /// # Examples
97    ///
98    /// ```
99    /// use lsm_tree::table::columnar::{Column, TypeTag, entries_to_column_batch};
100    /// use lsm_tree::{AnyTree, Config, InternalValue, ValueType};
101    ///
102    /// let folder = tempfile::tempdir()?;
103    /// let any = Config::new(folder, Default::default(), Default::default()).open()?;
104    /// if let AnyTree::Standard(tree) = &any {
105    ///     tree.update_runtime_config(|cfg| cfg.columnar = true)?;
106    /// }
107    ///
108    /// // One row whose value is a single fixed-4 sub-column (id 3).
109    /// let mut batch = entries_to_column_batch(&[InternalValue::from_components(
110    ///     b"k0".to_vec(),
111    ///     b"x".to_vec(),
112    ///     0,
113    ///     ValueType::Value,
114    /// )])?;
115    /// batch.columns.pop();
116    /// batch.columns.push(Column {
117    ///     column_id: 3,
118    ///     type_tag: TypeTag::Fixed(4),
119    ///     validity: None,
120    ///     data: vec![1, 0, 0, 0],
121    /// });
122    ///
123    /// let mut ingestion = any.ingestion()?;
124    /// ingestion.write_columnar_batch(&batch)?;
125    /// ingestion.finish()?;
126    /// # Ok::<(), lsm_tree::Error>(())
127    /// ```
128    #[cfg(feature = "columnar")]
129    pub fn write_columnar_batch(
130        &mut self,
131        batch: &crate::table::columnar::ColumnBatch,
132    ) -> crate::Result<()> {
133        match self {
134            Self::Standard(i) => i.write_columnar_batch(batch),
135            Self::Blob(_) => Err(crate::Error::FeatureUnsupported(
136                "columnar batch ingest is not supported for blob trees",
137            )),
138        }
139    }
140
141    /// Finalizes ingestion and registers created tables (and blob files if present).
142    ///
143    /// # Errors
144    ///
145    /// Will return `Err` if an IO error occurs.
146    pub fn finish(self) -> crate::Result<()> {
147        match self {
148            Self::Standard(i) => i.finish(),
149            Self::Blob(b) => b.finish(),
150        }
151    }
152}
153
154impl AnyTree {
155    /// Starts an ingestion for any tree type (standard or blob).
156    ///
157    /// # Errors
158    ///
159    /// Will return `Err` if an IO error occurs.
160    pub fn ingestion(&self) -> crate::Result<AnyIngestion<'_>> {
161        match self {
162            Self::Standard(t) => Ok(AnyIngestion::Standard(Ingestion::new(t)?)),
163            Self::Blob(b) => Ok(AnyIngestion::Blob(BlobIngestion::new(b)?)),
164        }
165    }
166}