lsm_tree/write_batch.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2024-present, fjall-rs
3// Copyright (c) 2026-present, Structured World Foundation
4
//! Write batch for bulk memtable insertion with shared seqno.
//!
//! A [`WriteBatch`] collects multiple write operations (insert, remove,
//! weak remove, merge) and applies them to the active memtable with a single
//! seqno. This reduces per-operation overhead:
//!
//! - **One version-history lock** acquisition instead of N
//! - **Batch size accounting**: single `fetch_add` for total size
//! - **Shared seqno**: all entries in a batch share the same sequence number
//!
//! **Visibility contract:** entries are inserted into the memtable one at a time
//! and become individually visible to concurrent readers as they are written.
//! Atomic batch visibility requires the **caller** to publish the batch seqno
//! (via `visible_seqno.fetch_max(batch_seqno + 1)`) only **after**
//! [`AbstractTree::apply_batch`](crate::AbstractTree::apply_batch) returns.
//! This is the same pattern used by fjall's keyspace for single-writer batches.
21
22use crate::{UserKey, UserValue, ValueType, value::InternalValue};
23#[cfg(not(feature = "std"))]
24use alloc::vec::Vec;
25
26/// A single entry in a [`WriteBatch`].
27#[derive(Clone, Debug)]
28enum WriteBatchEntry {
29 /// Insert or update a key-value pair.
30 Insert { key: UserKey, value: UserValue },
31
32 /// Delete a key (standard tombstone).
33 Remove { key: UserKey },
34
35 /// Delete a key (weak/single-delete tombstone).
36 RemoveWeak { key: UserKey },
37
38 /// Write a merge operand for a key.
39 Merge { key: UserKey, value: UserValue },
40}
41
42/// Batch of write operations applied with a shared seqno.
43///
44/// **Duplicate keys:** all entries receive the same seqno. The memtable
45/// skiplist orders by `(user_key, Reverse(seqno))` — `value_type` does NOT
46/// break ties. Two entries with the same `(user_key, seqno)` compare equal
47/// regardless of operation type, so one may silently overwrite the other.
48///
49/// - **Repeated `merge()` on the same key:** safe. All merge operands are
50/// collected during reads regardless of skiplist position.
51/// - **Mixed ops on the same key** (e.g. `insert` + `remove`): not allowed.
52/// `materialize()` rejects these batches with `Error::MixedOperationBatch`
53/// in all builds. Callers must canonicalize mixed-op duplicates into a
54/// single final operation before batching.
55///
56/// # Examples
57///
58/// ```
59/// use lsm_tree::WriteBatch;
60///
61/// let mut batch = WriteBatch::new();
62/// batch.insert("key1", "value1");
63/// batch.insert("key2", "value2");
64/// batch.remove("key3");
65///
66/// assert_eq!(batch.len(), 3);
67/// assert!(!batch.is_empty());
68/// ```
69#[derive(Clone, Debug, Default)]
70pub struct WriteBatch {
71 entries: Vec<WriteBatchEntry>,
72}
73
74impl WriteBatch {
75 /// Creates an empty write batch.
76 #[must_use]
77 pub fn new() -> Self {
78 Self {
79 entries: Vec::new(),
80 }
81 }
82
83 /// Creates an empty write batch with the given capacity.
84 #[must_use]
85 pub fn with_capacity(capacity: usize) -> Self {
86 Self {
87 entries: Vec::with_capacity(capacity),
88 }
89 }
90
91 /// Inserts a key-value pair into the batch.
92 pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(&mut self, key: K, value: V) {
93 self.entries.push(WriteBatchEntry::Insert {
94 key: key.into(),
95 value: value.into(),
96 });
97 }
98
99 /// Adds a delete (tombstone) for a key.
100 pub fn remove<K: Into<UserKey>>(&mut self, key: K) {
101 self.entries
102 .push(WriteBatchEntry::Remove { key: key.into() });
103 }
104
105 /// Adds a weak delete (single-delete tombstone) for a key.
106 pub fn remove_weak<K: Into<UserKey>>(&mut self, key: K) {
107 self.entries
108 .push(WriteBatchEntry::RemoveWeak { key: key.into() });
109 }
110
111 /// Adds a merge operand for a key.
112 ///
113 /// Multiple `merge()` calls for the same key within one batch are supported:
114 /// they produce distinct merge operands that are resolved together during
115 /// reads (via the configured [`MergeOperator`](crate::MergeOperator)).
116 /// The duplicate-key warning in the struct doc applies to mixed operation
117 /// types (e.g. `insert` + `remove` on the same key), not to multiple merges.
118 pub fn merge<K: Into<UserKey>, V: Into<UserValue>>(&mut self, key: K, value: V) {
119 self.entries.push(WriteBatchEntry::Merge {
120 key: key.into(),
121 value: value.into(),
122 });
123 }
124
125 /// Returns the number of operations in the batch.
126 #[must_use]
127 pub fn len(&self) -> usize {
128 self.entries.len()
129 }
130
131 /// Returns `true` if the batch contains no operations.
132 #[must_use]
133 pub fn is_empty(&self) -> bool {
134 self.entries.is_empty()
135 }
136
137 /// Clears the batch, removing all entries.
138 pub fn clear(&mut self) {
139 self.entries.clear();
140 }
141
142 /// Materializes all entries into [`InternalValue`]s with the given seqno.
143 ///
144 /// # Errors
145 ///
146 /// Returns [`Error::MixedOperationBatch`](crate::Error::MixedOperationBatch)
147 /// if any user key appears with differing operation types (e.g. insert + remove),
148 /// which would make equal-key entries with different operation types ambiguous
149 /// to later reads and merges.
150 #[doc(hidden)]
151 pub(crate) fn materialize(self, seqno: crate::SeqNo) -> crate::Result<Vec<InternalValue>> {
152 // Reject mixed-op duplicates unconditionally — `InternalKey` ordering
153 // ties on `(user_key, seqno)` without `value_type` as tie-breaker,
154 // making the read/compaction outcome ambiguous.
155 {
156 let mut seen: crate::HashMap<&[u8], ValueType> =
157 crate::HashMap::with_capacity_and_hasher(
158 self.entries.len(),
159 rustc_hash::FxBuildHasher,
160 );
161 for entry in &self.entries {
162 let (key_bytes, vtype): (&[u8], _) = match entry {
163 WriteBatchEntry::Insert { key, .. } => (key.as_ref(), ValueType::Value),
164 WriteBatchEntry::Remove { key } => (key.as_ref(), ValueType::Tombstone),
165 WriteBatchEntry::RemoveWeak { key } => (key.as_ref(), ValueType::WeakTombstone),
166 WriteBatchEntry::Merge { key, .. } => (key.as_ref(), ValueType::MergeOperand),
167 };
168 if let Some(&prev_type) = seen.get(key_bytes) {
169 if prev_type != vtype {
170 return Err(crate::Error::MixedOperationBatch);
171 }
172 } else {
173 seen.insert(key_bytes, vtype);
174 }
175 }
176 }
177
178 Ok(self
179 .entries
180 .into_iter()
181 .map(|entry| match entry {
182 WriteBatchEntry::Insert { key, value } => {
183 InternalValue::from_components(key, value, seqno, ValueType::Value)
184 }
185 WriteBatchEntry::Remove { key } => InternalValue::new_tombstone(key, seqno),
186 WriteBatchEntry::RemoveWeak { key } => {
187 InternalValue::new_weak_tombstone(key, seqno)
188 }
189 WriteBatchEntry::Merge { key, value } => {
190 InternalValue::new_merge_operand(key, value, seqno)
191 }
192 })
193 .collect())
194 }
195}