1pub mod item;
6
7use crate::{Keyspace, PartitionHandle, PersistMode};
8pub use item::*;
9use lsm_tree::{AbstractTree, UserKey, UserValue, ValueType};
10use std::collections::HashSet;
11
/// Key identifying a partition by name (a shared string view).
pub type PartitionKey = byteview::StrView;
14
/// A write batch.
///
/// Queues insert/remove operations across one or more partitions and applies
/// them to the keyspace with a single sequence number on commit, making the
/// batch atomically visible to readers.
#[doc(alias = "WriteBatch")]
pub struct Batch {
    /// Items queued in this batch, applied in order on commit.
    pub(crate) data: Vec<Item>,
    /// The keyspace this batch will be committed to.
    keyspace: Keyspace,
    /// Durability (persist) mode used on commit; `None` = no explicit persist.
    durability: Option<PersistMode>,
}
24
25impl Batch {
26 pub(crate) fn new(keyspace: Keyspace) -> Self {
30 Self {
31 data: Vec::new(),
32 keyspace,
33 durability: None,
34 }
35 }
36
37 #[must_use]
43 pub fn with_capacity(keyspace: Keyspace, capacity: usize) -> Self {
44 Self {
45 data: Vec::with_capacity(capacity),
46 keyspace,
47 durability: None,
48 }
49 }
50
51 #[must_use]
53 pub fn len(&self) -> usize {
54 self.data.len()
55 }
56
57 #[must_use]
59 pub fn is_empty(&self) -> bool {
60 self.len() == 0
61 }
62
63 #[must_use]
65 pub fn durability(mut self, mode: Option<PersistMode>) -> Self {
66 self.durability = mode;
67 self
68 }
69
70 pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
72 &mut self,
73 p: &PartitionHandle,
74 key: K,
75 value: V,
76 ) {
77 self.data
78 .push(Item::new(p.name.clone(), key, value, ValueType::Value));
79 }
80
81 pub fn remove<K: Into<UserKey>>(&mut self, p: &PartitionHandle, key: K) {
83 self.data
84 .push(Item::new(p.name.clone(), key, vec![], ValueType::Tombstone));
85 }
86
87 #[doc(hidden)]
98 pub fn remove_weak<K: Into<UserKey>>(&mut self, p: &PartitionHandle, key: K) {
99 self.data.push(Item::new(
100 p.name.clone(),
101 key,
102 vec![],
103 ValueType::WeakTombstone,
104 ));
105 }
106
107 pub fn commit_partitions(
111 self,
112 tuples: Vec<(&PartitionHandle, Vec<InnerItem>)>,
113 ) -> crate::Result<()> {
114 use std::sync::atomic::Ordering;
115
116 log::trace!("batch: Acquiring journal writer");
117 let mut journal_writer = self.keyspace.journal.get_writer();
118
119 if self.keyspace.is_poisoned.load(Ordering::Relaxed) {
121 return Err(crate::Error::Poisoned);
122 }
123
124 let batch_seqno = self.keyspace.seqno.next();
125
126 let batch_size = tuples.iter().map(|(_, items)| items.len()).sum::<usize>();
127
128 let _ = journal_writer.write_optimized_batch(&tuples, batch_size, batch_seqno);
129
130 if let Some(mode) = self.durability {
131 if let Err(e) = journal_writer.persist(mode) {
132 self.keyspace.is_poisoned.store(true, Ordering::Release);
133
134 log::error!(
135 "persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
136 );
137
138 return Err(crate::Error::Poisoned);
139 }
140 }
141
142 #[allow(clippy::mutable_key_type)]
143 let mut partitions_with_possible_stall = HashSet::new();
144 for (p, _) in &tuples {
145 partitions_with_possible_stall.insert(*p);
146 }
147
148 let mut batch_size = 0u64;
149
150 log::trace!("Applying {} batched items to memtable(s)", batch_size);
151
152 for (partition, items) in tuples {
153 for item in items {
154 let (item_size, _) = match item.value_type {
155 ValueType::Value => partition.tree.insert(item.key, item.value, batch_seqno),
156 ValueType::Tombstone => partition.tree.remove(item.key, batch_seqno),
157 ValueType::WeakTombstone => partition.tree.remove_weak(item.key, batch_seqno),
158 };
159
160 batch_size += u64::from(item_size);
161 }
162 }
163
164 self.keyspace
165 .visible_seqno
166 .fetch_max(batch_seqno + 1, Ordering::AcqRel);
167
168 drop(journal_writer);
169
170 log::trace!("batch: Freed journal writer");
171
172 self.keyspace.write_buffer_manager.allocate(batch_size);
175
176 for partition in partitions_with_possible_stall {
178 let memtable_size = partition.tree.active_memtable_size();
179
180 if let Err(e) = partition.check_memtable_overflow(memtable_size) {
181 log::error!("Failed memtable rotate check: {e:?}");
182 }
183
184 let write_buffer_size = self.keyspace.write_buffer_manager.get();
187 partition.check_write_buffer_size(write_buffer_size);
188 }
189
190 Ok(())
191 }
192
193 pub fn commit(mut self) -> crate::Result<()> {
199 use std::sync::atomic::Ordering;
200
201 log::trace!("batch: Acquiring journal writer");
202 let mut journal_writer = self.keyspace.journal.get_writer();
203
204 if self.keyspace.is_poisoned.load(Ordering::Relaxed) {
206 return Err(crate::Error::Poisoned);
207 }
208
209 let batch_seqno = self.keyspace.seqno.next();
210
211 let _ = journal_writer.write_batch(self.data.iter(), self.data.len(), batch_seqno);
212
213 if let Some(mode) = self.durability {
214 if let Err(e) = journal_writer.persist(mode) {
215 self.keyspace.is_poisoned.store(true, Ordering::Release);
216
217 log::error!(
218 "persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
219 );
220
221 return Err(crate::Error::Poisoned);
222 }
223 }
224
225 #[allow(clippy::mutable_key_type)]
226 let mut partitions_with_possible_stall = HashSet::new();
227 let partitions = self.keyspace.partitions.read().expect("lock is poisoned");
228
229 let mut batch_size = 0u64;
230
231 log::trace!("Applying {} batched items to memtable(s)", self.data.len());
232
233 for item in std::mem::take(&mut self.data) {
234 let Some(partition) = partitions.get(&item.partition) else {
235 continue;
236 };
237
238 let (item_size, _) = match item.value_type {
240 ValueType::Value => partition.tree.insert(item.key, item.value, batch_seqno),
241 ValueType::Tombstone => partition.tree.remove(item.key, batch_seqno),
242 ValueType::WeakTombstone => partition.tree.remove_weak(item.key, batch_seqno),
243 };
244
245 batch_size += u64::from(item_size);
246
247 partitions_with_possible_stall.insert(partition.clone());
249 }
250
251 self.keyspace
252 .visible_seqno
253 .fetch_max(batch_seqno + 1, Ordering::AcqRel);
254
255 drop(journal_writer);
256
257 log::trace!("batch: Freed journal writer");
258
259 drop(partitions);
260
261 self.keyspace.write_buffer_manager.allocate(batch_size);
264
265 for partition in partitions_with_possible_stall {
267 let memtable_size = partition.tree.active_memtable_size();
268
269 if let Err(e) = partition.check_memtable_overflow(memtable_size) {
270 log::error!("Failed memtable rotate check: {e:?}");
271 }
272
273 let write_buffer_size = self.keyspace.write_buffer_manager.get();
276 partition.check_write_buffer_size(write_buffer_size);
277 }
278
279 Ok(())
280 }
281}