lsm_tree/write_batch.rs
1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5//! Write batch for bulk memtable insertion with shared seqno.
6//!
7//! A [`WriteBatch`] collects multiple write operations (insert, remove, merge)
8//! and applies them to the active memtable with a single seqno.
9//! This reduces per-operation overhead:
10//!
11//! - **One version-history lock** acquisition instead of N
12//! - **Batch size accounting**: single `fetch_add` for total size
13//! - **Shared seqno**: all entries in a batch share the same sequence number
14//!
15//! **Visibility contract:** entries are inserted into the memtable one at a time
16//! and become individually visible to concurrent readers as they are written.
17//! Atomic batch visibility requires the **caller** to publish the batch seqno
18//! (via `visible_seqno.fetch_max(batch_seqno + 1)`) only **after**
19//! [`AbstractTree::apply_batch`] returns. This is the same pattern used by
20//! fjall's keyspace for single-writer batches.
21
22use crate::{UserKey, UserValue, ValueType, value::InternalValue};
23
24/// A single entry in a [`WriteBatch`].
25#[derive(Clone, Debug)]
26enum WriteBatchEntry {
27 /// Insert or update a key-value pair.
28 Insert { key: UserKey, value: UserValue },
29
30 /// Delete a key (standard tombstone).
31 Remove { key: UserKey },
32
33 /// Delete a key (weak/single-delete tombstone).
34 RemoveWeak { key: UserKey },
35
36 /// Write a merge operand for a key.
37 Merge { key: UserKey, value: UserValue },
38}
39
40/// Batch of write operations applied with a shared seqno.
41///
42/// **Duplicate keys:** all entries receive the same seqno. The memtable
43/// skiplist orders by `(user_key, Reverse(seqno))` — `value_type` does NOT
44/// break ties. Two entries with the same `(user_key, seqno)` compare equal
45/// regardless of operation type, so one may silently overwrite the other.
46///
47/// - **Repeated `merge()` on the same key:** safe. All merge operands are
48/// collected during reads regardless of skiplist position.
49/// - **Mixed ops on the same key** (e.g. `insert` + `remove`): not allowed.
50/// `materialize()` rejects these batches with `Error::MixedOperationBatch`
51/// in all builds. Callers must canonicalize mixed-op duplicates into a
52/// single final operation before batching.
53///
54/// # Examples
55///
56/// ```
57/// use lsm_tree::WriteBatch;
58///
59/// let mut batch = WriteBatch::new();
60/// batch.insert("key1", "value1");
61/// batch.insert("key2", "value2");
62/// batch.remove("key3");
63///
64/// assert_eq!(batch.len(), 3);
65/// assert!(!batch.is_empty());
66/// ```
67#[derive(Clone, Debug, Default)]
68pub struct WriteBatch {
69 entries: Vec<WriteBatchEntry>,
70}
71
72impl WriteBatch {
73 /// Creates an empty write batch.
74 #[must_use]
75 pub fn new() -> Self {
76 Self {
77 entries: Vec::new(),
78 }
79 }
80
81 /// Creates an empty write batch with the given capacity.
82 #[must_use]
83 pub fn with_capacity(capacity: usize) -> Self {
84 Self {
85 entries: Vec::with_capacity(capacity),
86 }
87 }
88
89 /// Inserts a key-value pair into the batch.
90 pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(&mut self, key: K, value: V) {
91 self.entries.push(WriteBatchEntry::Insert {
92 key: key.into(),
93 value: value.into(),
94 });
95 }
96
97 /// Adds a delete (tombstone) for a key.
98 pub fn remove<K: Into<UserKey>>(&mut self, key: K) {
99 self.entries
100 .push(WriteBatchEntry::Remove { key: key.into() });
101 }
102
103 /// Adds a weak delete (single-delete tombstone) for a key.
104 pub fn remove_weak<K: Into<UserKey>>(&mut self, key: K) {
105 self.entries
106 .push(WriteBatchEntry::RemoveWeak { key: key.into() });
107 }
108
109 /// Adds a merge operand for a key.
110 ///
111 /// Multiple `merge()` calls for the same key within one batch are supported:
112 /// they produce distinct merge operands that are resolved together during
113 /// reads (via the configured [`MergeOperator`](crate::MergeOperator)).
114 /// The duplicate-key warning in the struct doc applies to mixed operation
115 /// types (e.g. `insert` + `remove` on the same key), not to multiple merges.
116 pub fn merge<K: Into<UserKey>, V: Into<UserValue>>(&mut self, key: K, value: V) {
117 self.entries.push(WriteBatchEntry::Merge {
118 key: key.into(),
119 value: value.into(),
120 });
121 }
122
123 /// Returns the number of operations in the batch.
124 #[must_use]
125 pub fn len(&self) -> usize {
126 self.entries.len()
127 }
128
129 /// Returns `true` if the batch contains no operations.
130 #[must_use]
131 pub fn is_empty(&self) -> bool {
132 self.entries.is_empty()
133 }
134
135 /// Clears the batch, removing all entries.
136 pub fn clear(&mut self) {
137 self.entries.clear();
138 }
139
140 /// Materializes all entries into [`InternalValue`]s with the given seqno.
141 ///
142 /// # Errors
143 ///
144 /// Returns [`Error::MixedOperationBatch`](crate::Error::MixedOperationBatch)
145 /// if any user key appears with differing operation types (e.g. insert + remove),
146 /// which would make equal-key entries with different operation types ambiguous
147 /// to later reads and merges.
148 #[doc(hidden)]
149 pub(crate) fn materialize(self, seqno: crate::SeqNo) -> crate::Result<Vec<InternalValue>> {
150 // Reject mixed-op duplicates unconditionally — `InternalKey` ordering
151 // ties on `(user_key, seqno)` without `value_type` as tie-breaker,
152 // making the read/compaction outcome ambiguous.
153 {
154 let mut seen: std::collections::HashMap<&[u8], ValueType, rustc_hash::FxBuildHasher> =
155 std::collections::HashMap::with_capacity_and_hasher(
156 self.entries.len(),
157 rustc_hash::FxBuildHasher,
158 );
159 for entry in &self.entries {
160 let (key_bytes, vtype): (&[u8], _) = match entry {
161 WriteBatchEntry::Insert { key, .. } => (key.as_ref(), ValueType::Value),
162 WriteBatchEntry::Remove { key } => (key.as_ref(), ValueType::Tombstone),
163 WriteBatchEntry::RemoveWeak { key } => (key.as_ref(), ValueType::WeakTombstone),
164 WriteBatchEntry::Merge { key, .. } => (key.as_ref(), ValueType::MergeOperand),
165 };
166 if let Some(&prev_type) = seen.get(key_bytes) {
167 if prev_type != vtype {
168 return Err(crate::Error::MixedOperationBatch);
169 }
170 } else {
171 seen.insert(key_bytes, vtype);
172 }
173 }
174 }
175
176 Ok(self
177 .entries
178 .into_iter()
179 .map(|entry| match entry {
180 WriteBatchEntry::Insert { key, value } => {
181 InternalValue::from_components(key, value, seqno, ValueType::Value)
182 }
183 WriteBatchEntry::Remove { key } => InternalValue::new_tombstone(key, seqno),
184 WriteBatchEntry::RemoveWeak { key } => {
185 InternalValue::new_weak_tombstone(key, seqno)
186 }
187 WriteBatchEntry::Merge { key, value } => {
188 InternalValue::new_merge_operand(key, value, seqno)
189 }
190 })
191 .collect())
192 }
193}