Skip to main content

lsm_tree/
write_batch.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5//! Write batch for bulk memtable insertion with shared seqno.
6//!
7//! A [`WriteBatch`] collects multiple write operations (insert, remove, merge)
8//! and applies them to the active memtable with a single seqno.
9//! This reduces per-operation overhead:
10//!
11//! - **One version-history lock** acquisition instead of N
12//! - **Batch size accounting**: single `fetch_add` for total size
13//! - **Shared seqno**: all entries in a batch share the same sequence number
14//!
15//! **Visibility contract:** entries are inserted into the memtable one at a time
16//! and become individually visible to concurrent readers as they are written.
17//! Atomic batch visibility requires the **caller** to publish the batch seqno
18//! (via `visible_seqno.fetch_max(batch_seqno + 1)`) only **after**
19//! [`AbstractTree::apply_batch`] returns. This is the same pattern used by
20//! fjall's keyspace for single-writer batches.
21
22use crate::{UserKey, UserValue, ValueType, value::InternalValue};
23
24/// A single entry in a [`WriteBatch`].
25#[derive(Clone, Debug)]
26enum WriteBatchEntry {
27    /// Insert or update a key-value pair.
28    Insert { key: UserKey, value: UserValue },
29
30    /// Delete a key (standard tombstone).
31    Remove { key: UserKey },
32
33    /// Delete a key (weak/single-delete tombstone).
34    RemoveWeak { key: UserKey },
35
36    /// Write a merge operand for a key.
37    Merge { key: UserKey, value: UserValue },
38}
39
40/// Batch of write operations applied with a shared seqno.
41///
42/// **Duplicate keys:** all entries receive the same seqno. The memtable
43/// skiplist orders by `(user_key, Reverse(seqno))` — `value_type` does NOT
44/// break ties. Two entries with the same `(user_key, seqno)` compare equal
45/// regardless of operation type, so one may silently overwrite the other.
46///
47/// - **Repeated `merge()` on the same key:** safe. All merge operands are
48///   collected during reads regardless of skiplist position.
49/// - **Mixed ops on the same key** (e.g. `insert` + `remove`): not allowed.
50///   `materialize()` rejects these batches with `Error::MixedOperationBatch`
51///   in all builds. Callers must canonicalize mixed-op duplicates into a
52///   single final operation before batching.
53///
54/// # Examples
55///
56/// ```
57/// use lsm_tree::WriteBatch;
58///
59/// let mut batch = WriteBatch::new();
60/// batch.insert("key1", "value1");
61/// batch.insert("key2", "value2");
62/// batch.remove("key3");
63///
64/// assert_eq!(batch.len(), 3);
65/// assert!(!batch.is_empty());
66/// ```
67#[derive(Clone, Debug, Default)]
68pub struct WriteBatch {
69    entries: Vec<WriteBatchEntry>,
70}
71
72impl WriteBatch {
73    /// Creates an empty write batch.
74    #[must_use]
75    pub fn new() -> Self {
76        Self {
77            entries: Vec::new(),
78        }
79    }
80
81    /// Creates an empty write batch with the given capacity.
82    #[must_use]
83    pub fn with_capacity(capacity: usize) -> Self {
84        Self {
85            entries: Vec::with_capacity(capacity),
86        }
87    }
88
89    /// Inserts a key-value pair into the batch.
90    pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(&mut self, key: K, value: V) {
91        self.entries.push(WriteBatchEntry::Insert {
92            key: key.into(),
93            value: value.into(),
94        });
95    }
96
97    /// Adds a delete (tombstone) for a key.
98    pub fn remove<K: Into<UserKey>>(&mut self, key: K) {
99        self.entries
100            .push(WriteBatchEntry::Remove { key: key.into() });
101    }
102
103    /// Adds a weak delete (single-delete tombstone) for a key.
104    pub fn remove_weak<K: Into<UserKey>>(&mut self, key: K) {
105        self.entries
106            .push(WriteBatchEntry::RemoveWeak { key: key.into() });
107    }
108
109    /// Adds a merge operand for a key.
110    ///
111    /// Multiple `merge()` calls for the same key within one batch are supported:
112    /// they produce distinct merge operands that are resolved together during
113    /// reads (via the configured [`MergeOperator`](crate::MergeOperator)).
114    /// The duplicate-key warning in the struct doc applies to mixed operation
115    /// types (e.g. `insert` + `remove` on the same key), not to multiple merges.
116    pub fn merge<K: Into<UserKey>, V: Into<UserValue>>(&mut self, key: K, value: V) {
117        self.entries.push(WriteBatchEntry::Merge {
118            key: key.into(),
119            value: value.into(),
120        });
121    }
122
123    /// Returns the number of operations in the batch.
124    #[must_use]
125    pub fn len(&self) -> usize {
126        self.entries.len()
127    }
128
129    /// Returns `true` if the batch contains no operations.
130    #[must_use]
131    pub fn is_empty(&self) -> bool {
132        self.entries.is_empty()
133    }
134
135    /// Clears the batch, removing all entries.
136    pub fn clear(&mut self) {
137        self.entries.clear();
138    }
139
140    /// Materializes all entries into [`InternalValue`]s with the given seqno.
141    ///
142    /// # Errors
143    ///
144    /// Returns [`Error::MixedOperationBatch`](crate::Error::MixedOperationBatch)
145    /// if any user key appears with differing operation types (e.g. insert + remove),
146    /// which would make equal-key entries with different operation types ambiguous
147    /// to later reads and merges.
148    #[doc(hidden)]
149    pub(crate) fn materialize(self, seqno: crate::SeqNo) -> crate::Result<Vec<InternalValue>> {
150        // Reject mixed-op duplicates unconditionally — `InternalKey` ordering
151        // ties on `(user_key, seqno)` without `value_type` as tie-breaker,
152        // making the read/compaction outcome ambiguous.
153        {
154            let mut seen: std::collections::HashMap<&[u8], ValueType, rustc_hash::FxBuildHasher> =
155                std::collections::HashMap::with_capacity_and_hasher(
156                    self.entries.len(),
157                    rustc_hash::FxBuildHasher,
158                );
159            for entry in &self.entries {
160                let (key_bytes, vtype): (&[u8], _) = match entry {
161                    WriteBatchEntry::Insert { key, .. } => (key.as_ref(), ValueType::Value),
162                    WriteBatchEntry::Remove { key } => (key.as_ref(), ValueType::Tombstone),
163                    WriteBatchEntry::RemoveWeak { key } => (key.as_ref(), ValueType::WeakTombstone),
164                    WriteBatchEntry::Merge { key, .. } => (key.as_ref(), ValueType::MergeOperand),
165                };
166                if let Some(&prev_type) = seen.get(key_bytes) {
167                    if prev_type != vtype {
168                        return Err(crate::Error::MixedOperationBatch);
169                    }
170                } else {
171                    seen.insert(key_bytes, vtype);
172                }
173            }
174        }
175
176        Ok(self
177            .entries
178            .into_iter()
179            .map(|entry| match entry {
180                WriteBatchEntry::Insert { key, value } => {
181                    InternalValue::from_components(key, value, seqno, ValueType::Value)
182                }
183                WriteBatchEntry::Remove { key } => InternalValue::new_tombstone(key, seqno),
184                WriteBatchEntry::RemoveWeak { key } => {
185                    InternalValue::new_weak_tombstone(key, seqno)
186                }
187                WriteBatchEntry::Merge { key, value } => {
188                    InternalValue::new_merge_operand(key, value, seqno)
189                }
190            })
191            .collect())
192    }
193}