// fjall/batch/mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5pub mod item;
6
7use crate::{Keyspace, PartitionHandle, PersistMode};
8pub use item::*;
9use lsm_tree::{AbstractTree, UserKey, UserValue, ValueType};
10use std::collections::HashSet;
11
/// Partition key (a.k.a. column family, locality group)
///
/// Used to address a partition inside a [`Keyspace`].
pub type PartitionKey = byteview::StrView;
14
/// An atomic write batch
///
/// Allows atomically writing across partitions inside the [`Keyspace`].
#[doc(alias = "WriteBatch")]
pub struct Batch {
    // Queued items; applied to the journal and memtables on commit
    pub(crate) data: Vec<Item>,

    // Keyspace this batch will be committed into
    keyspace: Keyspace,

    // Durability level applied on commit; `None` skips explicit journal persistence
    durability: Option<PersistMode>,
}
24
25impl Batch {
26    /// Initializes a new write batch.
27    ///
28    /// This function is called by [`Keyspace::batch`].
29    pub(crate) fn new(keyspace: Keyspace) -> Self {
30        Self {
31            data: Vec::new(),
32            keyspace,
33            durability: None,
34        }
35    }
36
37    /// Initializes a new write batch with preallocated capacity.
38    ///
39    /// ### Note
40    ///
41    /// "Capacity" refers to the number of batch item slots, not their size in memory.
42    #[must_use]
43    pub fn with_capacity(keyspace: Keyspace, capacity: usize) -> Self {
44        Self {
45            data: Vec::with_capacity(capacity),
46            keyspace,
47            durability: None,
48        }
49    }
50
    /// Gets the number of batched items.
    ///
    /// This is the count of queued operations, not their size in bytes.
    #[must_use]
    pub fn len(&self) -> usize {
        self.data.len()
    }
56
57    /// Returns `true` if there are no batches items (yet).
58    #[must_use]
59    pub fn is_empty(&self) -> bool {
60        self.len() == 0
61    }
62
    /// Sets the durability level.
    ///
    /// If `mode` is `None`, the journal is not explicitly persisted
    /// when the batch is committed.
    #[must_use]
    pub fn durability(mut self, mode: Option<PersistMode>) -> Self {
        self.durability = mode;
        self
    }
69
70    /// Inserts a key-value pair into the batch.
71    pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
72        &mut self,
73        p: &PartitionHandle,
74        key: K,
75        value: V,
76    ) {
77        self.data
78            .push(Item::new(p.name.clone(), key, value, ValueType::Value));
79    }
80
81    /// Removes a key-value pair.
82    pub fn remove<K: Into<UserKey>>(&mut self, p: &PartitionHandle, key: K) {
83        self.data
84            .push(Item::new(p.name.clone(), key, vec![], ValueType::Tombstone));
85    }
86
87    /// Adds a weak tombstone marker for a key.
88    ///
89    /// The tombstone marker of this delete operation will vanish when it
90    /// collides with its corresponding insertion.
91    /// This may cause older versions of the value to be resurrected, so it should
92    /// only be used and preferred in scenarios where a key is only ever written once.
93    ///
94    /// # Experimental
95    ///
96    /// This function is currently experimental.
97    #[doc(hidden)]
98    pub fn remove_weak<K: Into<UserKey>>(&mut self, p: &PartitionHandle, key: K) {
99        self.data.push(Item::new(
100            p.name.clone(),
101            key,
102            vec![],
103            ValueType::WeakTombstone,
104        ));
105    }
106
107    ///
108    /// Commit partition
109    ///
110    pub fn commit_partitions(
111        self,
112        tuples: Vec<(&PartitionHandle, Vec<InnerItem>)>,
113    ) -> crate::Result<()> {
114        use std::sync::atomic::Ordering;
115
116        log::trace!("batch: Acquiring journal writer");
117        let mut journal_writer = self.keyspace.journal.get_writer();
118
119        // IMPORTANT: Check the poisoned flag after getting journal mutex, otherwise TOCTOU
120        if self.keyspace.is_poisoned.load(Ordering::Relaxed) {
121            return Err(crate::Error::Poisoned);
122        }
123
124        let batch_seqno = self.keyspace.seqno.next();
125
126        let batch_size = tuples.iter().map(|(_, items)| items.len()).sum::<usize>();
127
128        let _ = journal_writer.write_optimized_batch(&tuples, batch_size, batch_seqno);
129
130        if let Some(mode) = self.durability {
131            if let Err(e) = journal_writer.persist(mode) {
132                self.keyspace.is_poisoned.store(true, Ordering::Release);
133
134                log::error!(
135                    "persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
136                );
137
138                return Err(crate::Error::Poisoned);
139            }
140        }
141
142        #[allow(clippy::mutable_key_type)]
143        let mut partitions_with_possible_stall = HashSet::new();
144        for (p, _) in &tuples {
145            partitions_with_possible_stall.insert(*p);
146        }
147
148        let mut batch_size = 0u64;
149
150        log::trace!("Applying {} batched items to memtable(s)", batch_size);
151
152        for (partition, items) in tuples {
153            for item in items {
154                let (item_size, _) = match item.value_type {
155                    ValueType::Value => partition.tree.insert(item.key, item.value, batch_seqno),
156                    ValueType::Tombstone => partition.tree.remove(item.key, batch_seqno),
157                    ValueType::WeakTombstone => partition.tree.remove_weak(item.key, batch_seqno),
158                };
159
160                batch_size += u64::from(item_size);
161            }
162        }
163
164        self.keyspace
165            .visible_seqno
166            .fetch_max(batch_seqno + 1, Ordering::AcqRel);
167
168        drop(journal_writer);
169
170        log::trace!("batch: Freed journal writer");
171
172        // IMPORTANT: Add batch size to current write buffer size
173        // Otherwise write buffer growth is unbounded when using batches
174        self.keyspace.write_buffer_manager.allocate(batch_size);
175
176        // Check each affected partition for write stall/halt
177        for partition in partitions_with_possible_stall {
178            let memtable_size = partition.tree.active_memtable_size();
179
180            if let Err(e) = partition.check_memtable_overflow(memtable_size) {
181                log::error!("Failed memtable rotate check: {e:?}");
182            }
183
184            // IMPORTANT: Check write buffer as well
185            // Otherwise batch writes are never stalled/halted
186            let write_buffer_size = self.keyspace.write_buffer_manager.get();
187            partition.check_write_buffer_size(write_buffer_size);
188        }
189
190        Ok(())
191    }
192
193    /// Commits the batch to the [`Keyspace`] atomically.
194    ///
195    /// # Errors
196    ///
197    /// Will return `Err` if an IO error occurs.
198    pub fn commit(mut self) -> crate::Result<()> {
199        use std::sync::atomic::Ordering;
200
201        log::trace!("batch: Acquiring journal writer");
202        let mut journal_writer = self.keyspace.journal.get_writer();
203
204        // IMPORTANT: Check the poisoned flag after getting journal mutex, otherwise TOCTOU
205        if self.keyspace.is_poisoned.load(Ordering::Relaxed) {
206            return Err(crate::Error::Poisoned);
207        }
208
209        let batch_seqno = self.keyspace.seqno.next();
210
211        let _ = journal_writer.write_batch(self.data.iter(), self.data.len(), batch_seqno);
212
213        if let Some(mode) = self.durability {
214            if let Err(e) = journal_writer.persist(mode) {
215                self.keyspace.is_poisoned.store(true, Ordering::Release);
216
217                log::error!(
218                    "persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
219                );
220
221                return Err(crate::Error::Poisoned);
222            }
223        }
224
225        #[allow(clippy::mutable_key_type)]
226        let mut partitions_with_possible_stall = HashSet::new();
227        let partitions = self.keyspace.partitions.read().expect("lock is poisoned");
228
229        let mut batch_size = 0u64;
230
231        log::trace!("Applying {} batched items to memtable(s)", self.data.len());
232
233        for item in std::mem::take(&mut self.data) {
234            let Some(partition) = partitions.get(&item.partition) else {
235                continue;
236            };
237
238            // TODO: need a better, generic write op
239            let (item_size, _) = match item.value_type {
240                ValueType::Value => partition.tree.insert(item.key, item.value, batch_seqno),
241                ValueType::Tombstone => partition.tree.remove(item.key, batch_seqno),
242                ValueType::WeakTombstone => partition.tree.remove_weak(item.key, batch_seqno),
243            };
244
245            batch_size += u64::from(item_size);
246
247            // IMPORTANT: Clone the handle, because we don't want to keep the partitions lock open
248            partitions_with_possible_stall.insert(partition.clone());
249        }
250
251        self.keyspace
252            .visible_seqno
253            .fetch_max(batch_seqno + 1, Ordering::AcqRel);
254
255        drop(journal_writer);
256
257        log::trace!("batch: Freed journal writer");
258
259        drop(partitions);
260
261        // IMPORTANT: Add batch size to current write buffer size
262        // Otherwise write buffer growth is unbounded when using batches
263        self.keyspace.write_buffer_manager.allocate(batch_size);
264
265        // Check each affected partition for write stall/halt
266        for partition in partitions_with_possible_stall {
267            let memtable_size = partition.tree.active_memtable_size();
268
269            if let Err(e) = partition.check_memtable_overflow(memtable_size) {
270                log::error!("Failed memtable rotate check: {e:?}");
271            }
272
273            // IMPORTANT: Check write buffer as well
274            // Otherwise batch writes are never stalled/halted
275            let write_buffer_size = self.keyspace.write_buffer_manager.get();
276            partition.check_write_buffer_size(write_buffer_size);
277        }
278
279        Ok(())
280    }
281}