Skip to main content

lsm_tree/
write_batch.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2024-present, fjall-rs
3// Copyright (c) 2026-present, Structured World Foundation
4
5//! Write batch for bulk memtable insertion with shared seqno.
6//!
7//! A [`WriteBatch`] collects multiple write operations (insert, remove, merge)
8//! and applies them to the active memtable with a single seqno.
9//! This reduces per-operation overhead:
10//!
11//! - **One version-history lock** acquisition instead of N
12//! - **Batch size accounting**: single `fetch_add` for total size
13//! - **Shared seqno**: all entries in a batch share the same sequence number
14//!
15//! **Visibility contract:** entries are inserted into the memtable one at a time
16//! and become individually visible to concurrent readers as they are written.
17//! Atomic batch visibility requires the **caller** to publish the batch seqno
18//! (via `visible_seqno.fetch_max(batch_seqno + 1)`) only **after**
19//! [`AbstractTree::apply_batch`](crate::AbstractTree::apply_batch) returns. This is the same pattern used by
20//! fjall's keyspace for single-writer batches.
21
22use crate::{UserKey, UserValue, ValueType, value::InternalValue};
23#[cfg(not(feature = "std"))]
24use alloc::vec::Vec;
25
26/// A single entry in a [`WriteBatch`].
27#[derive(Clone, Debug)]
28enum WriteBatchEntry {
29    /// Insert or update a key-value pair.
30    Insert { key: UserKey, value: UserValue },
31
32    /// Delete a key (standard tombstone).
33    Remove { key: UserKey },
34
35    /// Delete a key (weak/single-delete tombstone).
36    RemoveWeak { key: UserKey },
37
38    /// Write a merge operand for a key.
39    Merge { key: UserKey, value: UserValue },
40}
41
42/// Batch of write operations applied with a shared seqno.
43///
44/// **Duplicate keys:** all entries receive the same seqno. The memtable
45/// skiplist orders by `(user_key, Reverse(seqno))` — `value_type` does NOT
46/// break ties. Two entries with the same `(user_key, seqno)` compare equal
47/// regardless of operation type, so one may silently overwrite the other.
48///
49/// - **Repeated `merge()` on the same key:** safe. All merge operands are
50///   collected during reads regardless of skiplist position.
51/// - **Mixed ops on the same key** (e.g. `insert` + `remove`): not allowed.
52///   `materialize()` rejects these batches with `Error::MixedOperationBatch`
53///   in all builds. Callers must canonicalize mixed-op duplicates into a
54///   single final operation before batching.
55///
56/// # Examples
57///
58/// ```
59/// use lsm_tree::WriteBatch;
60///
61/// let mut batch = WriteBatch::new();
62/// batch.insert("key1", "value1");
63/// batch.insert("key2", "value2");
64/// batch.remove("key3");
65///
66/// assert_eq!(batch.len(), 3);
67/// assert!(!batch.is_empty());
68/// ```
69#[derive(Clone, Debug, Default)]
70pub struct WriteBatch {
71    entries: Vec<WriteBatchEntry>,
72}
73
74impl WriteBatch {
75    /// Creates an empty write batch.
76    #[must_use]
77    pub fn new() -> Self {
78        Self {
79            entries: Vec::new(),
80        }
81    }
82
83    /// Creates an empty write batch with the given capacity.
84    #[must_use]
85    pub fn with_capacity(capacity: usize) -> Self {
86        Self {
87            entries: Vec::with_capacity(capacity),
88        }
89    }
90
91    /// Inserts a key-value pair into the batch.
92    pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(&mut self, key: K, value: V) {
93        self.entries.push(WriteBatchEntry::Insert {
94            key: key.into(),
95            value: value.into(),
96        });
97    }
98
99    /// Adds a delete (tombstone) for a key.
100    pub fn remove<K: Into<UserKey>>(&mut self, key: K) {
101        self.entries
102            .push(WriteBatchEntry::Remove { key: key.into() });
103    }
104
105    /// Adds a weak delete (single-delete tombstone) for a key.
106    pub fn remove_weak<K: Into<UserKey>>(&mut self, key: K) {
107        self.entries
108            .push(WriteBatchEntry::RemoveWeak { key: key.into() });
109    }
110
111    /// Adds a merge operand for a key.
112    ///
113    /// Multiple `merge()` calls for the same key within one batch are supported:
114    /// they produce distinct merge operands that are resolved together during
115    /// reads (via the configured [`MergeOperator`](crate::MergeOperator)).
116    /// The duplicate-key warning in the struct doc applies to mixed operation
117    /// types (e.g. `insert` + `remove` on the same key), not to multiple merges.
118    pub fn merge<K: Into<UserKey>, V: Into<UserValue>>(&mut self, key: K, value: V) {
119        self.entries.push(WriteBatchEntry::Merge {
120            key: key.into(),
121            value: value.into(),
122        });
123    }
124
125    /// Returns the number of operations in the batch.
126    #[must_use]
127    pub fn len(&self) -> usize {
128        self.entries.len()
129    }
130
131    /// Returns `true` if the batch contains no operations.
132    #[must_use]
133    pub fn is_empty(&self) -> bool {
134        self.entries.is_empty()
135    }
136
137    /// Clears the batch, removing all entries.
138    pub fn clear(&mut self) {
139        self.entries.clear();
140    }
141
142    /// Materializes all entries into [`InternalValue`]s with the given seqno.
143    ///
144    /// # Errors
145    ///
146    /// Returns [`Error::MixedOperationBatch`](crate::Error::MixedOperationBatch)
147    /// if any user key appears with differing operation types (e.g. insert + remove),
148    /// which would make equal-key entries with different operation types ambiguous
149    /// to later reads and merges.
150    #[doc(hidden)]
151    pub(crate) fn materialize(self, seqno: crate::SeqNo) -> crate::Result<Vec<InternalValue>> {
152        // Reject mixed-op duplicates unconditionally — `InternalKey` ordering
153        // ties on `(user_key, seqno)` without `value_type` as tie-breaker,
154        // making the read/compaction outcome ambiguous.
155        {
156            let mut seen: crate::HashMap<&[u8], ValueType> =
157                crate::HashMap::with_capacity_and_hasher(
158                    self.entries.len(),
159                    rustc_hash::FxBuildHasher,
160                );
161            for entry in &self.entries {
162                let (key_bytes, vtype): (&[u8], _) = match entry {
163                    WriteBatchEntry::Insert { key, .. } => (key.as_ref(), ValueType::Value),
164                    WriteBatchEntry::Remove { key } => (key.as_ref(), ValueType::Tombstone),
165                    WriteBatchEntry::RemoveWeak { key } => (key.as_ref(), ValueType::WeakTombstone),
166                    WriteBatchEntry::Merge { key, .. } => (key.as_ref(), ValueType::MergeOperand),
167                };
168                if let Some(&prev_type) = seen.get(key_bytes) {
169                    if prev_type != vtype {
170                        return Err(crate::Error::MixedOperationBatch);
171                    }
172                } else {
173                    seen.insert(key_bytes, vtype);
174                }
175            }
176        }
177
178        Ok(self
179            .entries
180            .into_iter()
181            .map(|entry| match entry {
182                WriteBatchEntry::Insert { key, value } => {
183                    InternalValue::from_components(key, value, seqno, ValueType::Value)
184                }
185                WriteBatchEntry::Remove { key } => InternalValue::new_tombstone(key, seqno),
186                WriteBatchEntry::RemoveWeak { key } => {
187                    InternalValue::new_weak_tombstone(key, seqno)
188                }
189                WriteBatchEntry::Merge { key, value } => {
190                    InternalValue::new_merge_operand(key, value, seqno)
191                }
192            })
193            .collect())
194    }
195}