// fjall/batch/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

5pub mod item;
6
7use crate::{Keyspace, PartitionHandle, PersistMode};
8use byteview::ByteView;
9pub use item::*;
10use lsm_tree::{AbstractTree, UserKey, UserValue, ValueType};
11use std::collections::HashSet;
12
/// Partition key (a.k.a. column family, locality group)
///
/// NOTE(review): presumably the type of `PartitionHandle::name`, which batch
/// items are tagged with — confirm against the partition module.
pub type PartitionKey = byteview::StrView;
15
/// An atomic write batch
///
/// Allows atomically writing across partitions inside the [`Keyspace`].
#[doc(alias = "WriteBatch")]
pub struct Batch {
    // Items queued so far; written to the journal and applied to the
    // memtables when the batch is committed.
    pub(crate) data: Vec<Item>,
    // Keyspace this batch will be committed into.
    keyspace: Keyspace,
    // Durability level used on commit; `None` means commit does not
    // explicitly persist the journal (see `commit`).
    durability: Option<PersistMode>,
}
25
26impl Batch {
27    /// Initializes a new write batch.
28    ///
29    /// This function is called by [`Keyspace::batch`].
30    pub(crate) fn new(keyspace: Keyspace) -> Self {
31        Self {
32            data: Vec::new(),
33            keyspace,
34            durability: None,
35        }
36    }
37
38    /// Initializes a new write batch with preallocated capacity.
39    ///
40    /// ### Note
41    ///
42    /// "Capacity" refers to the number of batch item slots, not their size in memory.
43    #[must_use]
44    pub fn with_capacity(keyspace: Keyspace, capacity: usize) -> Self {
45        Self {
46            data: Vec::with_capacity(capacity),
47            keyspace,
48            durability: None,
49        }
50    }
51
52    /// Gets the number of batched items.
53    #[must_use]
54    pub fn len(&self) -> usize {
55        self.data.len()
56    }
57
58    /// Returns `true` if there are no batches items (yet).
59    #[must_use]
60    pub fn is_empty(&self) -> bool {
61        self.len() == 0
62    }
63
64    /// Sets the durability level.
65    #[must_use]
66    pub fn durability(mut self, mode: Option<PersistMode>) -> Self {
67        self.durability = mode;
68        self
69    }
70
71    /// Inserts a key-value pair into the batch.
72    pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
73        &mut self,
74        p: &PartitionHandle,
75        key: K,
76        value: V,
77    ) {
78        self.data
79            .push(Item::new(p.name.clone(), key, value, ValueType::Value));
80    }
81
82    /// Removes a key-value pair.
83    pub fn remove<K: Into<UserKey>>(&mut self, p: &PartitionHandle, key: K) {
84        self.data
85            .push(Item::new(p.name.clone(), key, vec![], ValueType::Tombstone));
86    }
87
88    /// Adds a weak tombstone marker for a key.
89    ///
90    /// The tombstone marker of this delete operation will vanish when it
91    /// collides with its corresponding insertion.
92    /// This may cause older versions of the value to be resurrected, so it should
93    /// only be used and preferred in scenarios where a key is only ever written once.
94    ///
95    /// # Experimental
96    ///
97    /// This function is currently experimental.
98    #[doc(hidden)]
99    pub fn remove_weak<K: Into<UserKey>>(&mut self, p: &PartitionHandle, key: K) {
100        self.data.push(Item::new(
101            p.name.clone(),
102            key,
103            vec![],
104            ValueType::WeakTombstone,
105        ));
106    }
107
108    ///
109    /// Fast commit
110    ///
111    pub fn commit_partition<K, V>(
112        self,
113        partition: &PartitionHandle,
114        items: Vec<CompactItem<K, V>>,
115    ) -> crate::Result<()>
116    where
117        K: Into<ByteView>,
118        V: Into<ByteView>,
119    {
120        use std::sync::atomic::Ordering;
121
122        log::trace!("batch: Acquiring journal writer");
123        let mut journal_writer = self.keyspace.journal.get_writer();
124
125        // IMPORTANT: Check the poisoned flag after getting journal mutex, otherwise TOCTOU
126        if self.keyspace.is_poisoned.load(Ordering::Relaxed) {
127            return Err(crate::Error::Poisoned);
128        }
129
130        let batch_seqno = self.keyspace.seqno.next();
131
132        let _ = journal_writer.write_batch(self.data.iter(), self.data.len(), batch_seqno);
133
134        if let Some(mode) = self.durability {
135            if let Err(e) = journal_writer.persist(mode) {
136                self.keyspace.is_poisoned.store(true, Ordering::Release);
137
138                log::error!(
139                    "persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
140                );
141
142                return Err(crate::Error::Poisoned);
143            }
144        }
145
146        #[allow(clippy::mutable_key_type)]
147        let mut partitions_with_possible_stall = HashSet::new();
148        partitions_with_possible_stall.insert(partition.clone());
149
150        let mut batch_size = 0u64;
151
152        log::trace!("Applying {} batched items to memtable(s)", items.len());
153
154        for item in items {
155            let (item_size, _) = match item {
156                CompactItem::Value { key, value } => {
157                    partition.tree.insert(key.into(), value.into(), batch_seqno)
158                }
159                CompactItem::Tombstone(key) => partition.tree.remove(key.into(), batch_seqno),
160                CompactItem::WeakTombstone(key) => {
161                    partition.tree.remove_weak(key.into(), batch_seqno)
162                }
163            };
164
165            batch_size += u64::from(item_size);
166        }
167
168        self.keyspace
169            .visible_seqno
170            .fetch_max(batch_seqno + 1, Ordering::AcqRel);
171
172        drop(journal_writer);
173
174        log::trace!("batch: Freed journal writer");
175
176        // IMPORTANT: Add batch size to current write buffer size
177        // Otherwise write buffer growth is unbounded when using batches
178        self.keyspace.write_buffer_manager.allocate(batch_size);
179
180        // Check each affected partition for write stall/halt
181        for partition in partitions_with_possible_stall {
182            let memtable_size = partition.tree.active_memtable_size();
183
184            if let Err(e) = partition.check_memtable_overflow(memtable_size) {
185                log::error!("Failed memtable rotate check: {e:?}");
186            }
187
188            // IMPORTANT: Check write buffer as well
189            // Otherwise batch writes are never stalled/halted
190            let write_buffer_size = self.keyspace.write_buffer_manager.get();
191            partition.check_write_buffer_size(write_buffer_size);
192        }
193
194        Ok(())
195    }
196
197    /// Commits the batch to the [`Keyspace`] atomically.
198    ///
199    /// # Errors
200    ///
201    /// Will return `Err` if an IO error occurs.
202    pub fn commit(mut self) -> crate::Result<()> {
203        use std::sync::atomic::Ordering;
204
205        log::trace!("batch: Acquiring journal writer");
206        let mut journal_writer = self.keyspace.journal.get_writer();
207
208        // IMPORTANT: Check the poisoned flag after getting journal mutex, otherwise TOCTOU
209        if self.keyspace.is_poisoned.load(Ordering::Relaxed) {
210            return Err(crate::Error::Poisoned);
211        }
212
213        let batch_seqno = self.keyspace.seqno.next();
214
215        let _ = journal_writer.write_batch(self.data.iter(), self.data.len(), batch_seqno);
216
217        if let Some(mode) = self.durability {
218            if let Err(e) = journal_writer.persist(mode) {
219                self.keyspace.is_poisoned.store(true, Ordering::Release);
220
221                log::error!(
222                    "persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
223                );
224
225                return Err(crate::Error::Poisoned);
226            }
227        }
228
229        #[allow(clippy::mutable_key_type)]
230        let mut partitions_with_possible_stall = HashSet::new();
231        let partitions = self.keyspace.partitions.read().expect("lock is poisoned");
232
233        let mut batch_size = 0u64;
234
235        log::trace!("Applying {} batched items to memtable(s)", self.data.len());
236
237        for item in std::mem::take(&mut self.data) {
238            let Some(partition) = partitions.get(&item.partition) else {
239                continue;
240            };
241
242            // TODO: need a better, generic write op
243            let (item_size, _) = match item.value_type {
244                ValueType::Value => partition.tree.insert(item.key, item.value, batch_seqno),
245                ValueType::Tombstone => partition.tree.remove(item.key, batch_seqno),
246                ValueType::WeakTombstone => partition.tree.remove_weak(item.key, batch_seqno),
247            };
248
249            batch_size += u64::from(item_size);
250
251            // IMPORTANT: Clone the handle, because we don't want to keep the partitions lock open
252            partitions_with_possible_stall.insert(partition.clone());
253        }
254
255        self.keyspace
256            .visible_seqno
257            .fetch_max(batch_seqno + 1, Ordering::AcqRel);
258
259        drop(journal_writer);
260
261        log::trace!("batch: Freed journal writer");
262
263        drop(partitions);
264
265        // IMPORTANT: Add batch size to current write buffer size
266        // Otherwise write buffer growth is unbounded when using batches
267        self.keyspace.write_buffer_manager.allocate(batch_size);
268
269        // Check each affected partition for write stall/halt
270        for partition in partitions_with_possible_stall {
271            let memtable_size = partition.tree.active_memtable_size();
272
273            if let Err(e) = partition.check_memtable_overflow(memtable_size) {
274                log::error!("Failed memtable rotate check: {e:?}");
275            }
276
277            // IMPORTANT: Check write buffer as well
278            // Otherwise batch writes are never stalled/halted
279            let write_buffer_size = self.keyspace.write_buffer_manager.get();
280            partition.check_write_buffer_size(write_buffer_size);
281        }
282
283        Ok(())
284    }
285}