1pub mod item;
6
7use crate::{Keyspace, PartitionHandle, PersistMode};
8use byteview::ByteView;
9pub use item::*;
10use lsm_tree::{AbstractTree, UserKey, UserValue, ValueType};
11use std::collections::HashSet;
12
/// Shared, immutable string used as the name/key of a partition within a keyspace.
pub type PartitionKey = byteview::StrView;
15
/// An atomic write batch.
///
/// Items are buffered in memory and are only applied to the keyspace
/// (journal + memtables) when the batch is committed.
#[doc(alias = "WriteBatch")]
pub struct Batch {
    // Buffered items, applied in order on commit
    pub(crate) data: Vec<Item>,
    // The keyspace this batch writes into
    keyspace: Keyspace,
    // Persist mode used on commit; `None` means the journal is not
    // explicitly persisted when the batch is committed
    durability: Option<PersistMode>,
}
25
26impl Batch {
27 pub(crate) fn new(keyspace: Keyspace) -> Self {
31 Self {
32 data: Vec::new(),
33 keyspace,
34 durability: None,
35 }
36 }
37
38 #[must_use]
44 pub fn with_capacity(keyspace: Keyspace, capacity: usize) -> Self {
45 Self {
46 data: Vec::with_capacity(capacity),
47 keyspace,
48 durability: None,
49 }
50 }
51
52 #[must_use]
54 pub fn len(&self) -> usize {
55 self.data.len()
56 }
57
58 #[must_use]
60 pub fn is_empty(&self) -> bool {
61 self.len() == 0
62 }
63
64 #[must_use]
66 pub fn durability(mut self, mode: Option<PersistMode>) -> Self {
67 self.durability = mode;
68 self
69 }
70
71 pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
73 &mut self,
74 p: &PartitionHandle,
75 key: K,
76 value: V,
77 ) {
78 self.data
79 .push(Item::new(p.name.clone(), key, value, ValueType::Value));
80 }
81
82 pub fn remove<K: Into<UserKey>>(&mut self, p: &PartitionHandle, key: K) {
84 self.data
85 .push(Item::new(p.name.clone(), key, vec![], ValueType::Tombstone));
86 }
87
88 #[doc(hidden)]
99 pub fn remove_weak<K: Into<UserKey>>(&mut self, p: &PartitionHandle, key: K) {
100 self.data.push(Item::new(
101 p.name.clone(),
102 key,
103 vec![],
104 ValueType::WeakTombstone,
105 ));
106 }
107
108 pub fn commit_partition<K, V>(
112 self,
113 partition: &PartitionHandle,
114 items: Vec<CompactItem<K, V>>,
115 ) -> crate::Result<()>
116 where
117 K: Into<ByteView>,
118 V: Into<ByteView>,
119 {
120 use std::sync::atomic::Ordering;
121
122 log::trace!("batch: Acquiring journal writer");
123 let mut journal_writer = self.keyspace.journal.get_writer();
124
125 if self.keyspace.is_poisoned.load(Ordering::Relaxed) {
127 return Err(crate::Error::Poisoned);
128 }
129
130 let batch_seqno = self.keyspace.seqno.next();
131
132 let _ = journal_writer.write_batch(self.data.iter(), self.data.len(), batch_seqno);
133
134 if let Some(mode) = self.durability {
135 if let Err(e) = journal_writer.persist(mode) {
136 self.keyspace.is_poisoned.store(true, Ordering::Release);
137
138 log::error!(
139 "persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
140 );
141
142 return Err(crate::Error::Poisoned);
143 }
144 }
145
146 #[allow(clippy::mutable_key_type)]
147 let mut partitions_with_possible_stall = HashSet::new();
148 partitions_with_possible_stall.insert(partition.clone());
149
150 let mut batch_size = 0u64;
151
152 log::trace!("Applying {} batched items to memtable(s)", items.len());
153
154 for item in items {
155 let (item_size, _) = match item {
156 CompactItem::Value { key, value } => {
157 partition.tree.insert(key.into(), value.into(), batch_seqno)
158 }
159 CompactItem::Tombstone(key) => partition.tree.remove(key.into(), batch_seqno),
160 CompactItem::WeakTombstone(key) => {
161 partition.tree.remove_weak(key.into(), batch_seqno)
162 }
163 };
164
165 batch_size += u64::from(item_size);
166 }
167
168 self.keyspace
169 .visible_seqno
170 .fetch_max(batch_seqno + 1, Ordering::AcqRel);
171
172 drop(journal_writer);
173
174 log::trace!("batch: Freed journal writer");
175
176 self.keyspace.write_buffer_manager.allocate(batch_size);
179
180 for partition in partitions_with_possible_stall {
182 let memtable_size = partition.tree.active_memtable_size();
183
184 if let Err(e) = partition.check_memtable_overflow(memtable_size) {
185 log::error!("Failed memtable rotate check: {e:?}");
186 }
187
188 let write_buffer_size = self.keyspace.write_buffer_manager.get();
191 partition.check_write_buffer_size(write_buffer_size);
192 }
193
194 Ok(())
195 }
196
197 pub fn commit(mut self) -> crate::Result<()> {
203 use std::sync::atomic::Ordering;
204
205 log::trace!("batch: Acquiring journal writer");
206 let mut journal_writer = self.keyspace.journal.get_writer();
207
208 if self.keyspace.is_poisoned.load(Ordering::Relaxed) {
210 return Err(crate::Error::Poisoned);
211 }
212
213 let batch_seqno = self.keyspace.seqno.next();
214
215 let _ = journal_writer.write_batch(self.data.iter(), self.data.len(), batch_seqno);
216
217 if let Some(mode) = self.durability {
218 if let Err(e) = journal_writer.persist(mode) {
219 self.keyspace.is_poisoned.store(true, Ordering::Release);
220
221 log::error!(
222 "persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
223 );
224
225 return Err(crate::Error::Poisoned);
226 }
227 }
228
229 #[allow(clippy::mutable_key_type)]
230 let mut partitions_with_possible_stall = HashSet::new();
231 let partitions = self.keyspace.partitions.read().expect("lock is poisoned");
232
233 let mut batch_size = 0u64;
234
235 log::trace!("Applying {} batched items to memtable(s)", self.data.len());
236
237 for item in std::mem::take(&mut self.data) {
238 let Some(partition) = partitions.get(&item.partition) else {
239 continue;
240 };
241
242 let (item_size, _) = match item.value_type {
244 ValueType::Value => partition.tree.insert(item.key, item.value, batch_seqno),
245 ValueType::Tombstone => partition.tree.remove(item.key, batch_seqno),
246 ValueType::WeakTombstone => partition.tree.remove_weak(item.key, batch_seqno),
247 };
248
249 batch_size += u64::from(item_size);
250
251 partitions_with_possible_stall.insert(partition.clone());
253 }
254
255 self.keyspace
256 .visible_seqno
257 .fetch_max(batch_seqno + 1, Ordering::AcqRel);
258
259 drop(journal_writer);
260
261 log::trace!("batch: Freed journal writer");
262
263 drop(partitions);
264
265 self.keyspace.write_buffer_manager.allocate(batch_size);
268
269 for partition in partitions_with_possible_stall {
271 let memtable_size = partition.tree.active_memtable_size();
272
273 if let Err(e) = partition.check_memtable_overflow(memtable_size) {
274 log::error!("Failed memtable rotate check: {e:?}");
275 }
276
277 let write_buffer_size = self.keyspace.write_buffer_manager.get();
280 partition.check_write_buffer_size(write_buffer_size);
281 }
282
283 Ok(())
284 }
285}