use crate::allocator::Allocator;
use crate::backing_store::BackingStore;
use crate::backing_store::BoundedStore;
use crate::metrics::BlobdMetrics;
use crate::object::get_bundle_index_for_key;
use crate::object::serialise_bundle;
use crate::object::BundleDeserialiser;
use crate::object::ObjectTupleData;
use crate::object::ObjectTupleKey;
use crate::object::OBJECT_TUPLE_DATA_LEN_INLINE_THRESHOLD;
use crate::op::write_object::allocate_object_on_heap;
use crate::pages::Pages;
use crate::util::ceil_pow2;
use crate::util::mod_pow2;
use crate::util::ByteConsumer;
use ahash::AHashMap;
use crossbeam_channel::RecvTimeoutError;
use dashmap::DashMap;
use futures::stream::iter;
use futures::StreamExt;
use futures::TryFutureExt;
use itertools::Itertools;
use num_derive::FromPrimitive;
use num_traits::FromPrimitive;
use off64::int::Off64ReadInt;
use off64::int::Off64WriteMutInt;
use off64::u32;
use off64::u64;
use off64::usz;
use off64::Off64WriteMut;
use parking_lot::Mutex;
use parking_lot::RwLock;
use signal_future::SignalFuture;
use signal_future::SignalFutureController;
use std::mem::take;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use tinybuf::TinyBuf;
use tokio::runtime::Handle;
use tokio::spawn;
use tokio::task::spawn_blocking;
use tokio::time::sleep;
use tokio::time::timeout;
use tracing::info;
use tracing::warn;

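// Byte offsets within the log buffer's persisted state spage (written by
// `LogBufferState::flush`); both pointers are stored as little-endian u64.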
const OFFSETOF_VIRTUAL_HEAD: u64 = 0;
const OFFSETOF_VIRTUAL_TAIL: u64 = OFFSETOF_VIRTUAL_HEAD + 8;

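/// A queued mutation from `upsert_tuple` or `delete_tuple`. `signal` is completed
/// once the entry has been written to the on-device log and the new virtual tail
/// has been persisted.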
pub(crate) enum BundleTask {
  Upsert {
    key: ObjectTupleKey,
    data: ObjectTupleData,
    signal: SignalFutureController<()>,
  },
  Delete {
    key: ObjectTupleKey,
    signal: SignalFutureController<()>,
  },
}

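/// Tag byte that prefixes each entry in the on-device log. `Upsert` and `Delete`
/// are followed by the serialised tuple; `Padding` skips to the next spage
/// boundary; `Wraparound` skips to the end of the data region (i.e. back to its
/// physical start). The skipped bytes are ignored by the replay loop in
/// `load_from_device`.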
#[derive(Clone, Copy, PartialEq, Eq, Debug, FromPrimitive)]
#[repr(u8)]
enum LogBufferPersistedEntry {
  Upsert = 1,
  Delete,
  Padding,
  Wraparound,
}

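/// Latest in-memory state of a key whose log entries have not yet been merged
/// into its bundle.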
#[derive(Clone)]
enum LogBufferOverlayEntry {
  Deleted {},
  Upserted { data: ObjectTupleData },
}

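/// A completed log write, held back by the completer until all earlier flushes
/// have also completed, so the persisted virtual tail never advances past data
/// that has not been fully written.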
struct CompletedFlushesBacklogEntry {
  new_virtual_tail_to_write: u64,
  started: Instant,
  tasks: Vec<BundleTask>,
}

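/// Keys currently represented in the log. `uncommitted` covers entries between
/// the virtual head and tail; `committing` holds the previous generation while a
/// commit is merging it into the bundles.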
#[derive(Clone, Default)]
struct Overlay {
  committing: Option<Arc<DashMap<ObjectTupleKey, LogBufferOverlayEntry, ahash::RandomState>>>,
  uncommitted: Arc<DashMap<ObjectTupleKey, LogBufferOverlayEntry, ahash::RandomState>>,
}

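/// Virtual (monotonically increasing) head and tail of the circular log; the
/// physical position is the virtual offset modulo the size of the data region.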
struct LogBufferState {
  head: u64,
  tail: u64,
}

impl LogBufferState {
  pub async fn flush(&self, state_dev: &BoundedStore, pages: &Pages, metrics: &BlobdMetrics) {
    let mut state_buf = pages.allocate_uninitialised(pages.spage_size());
    state_buf.write_u64_le_at(OFFSETOF_VIRTUAL_HEAD, self.head);
    state_buf.write_u64_le_at(OFFSETOF_VIRTUAL_TAIL, self.tail);
    state_dev.write_at(0, state_buf).await;
    metrics.0.log_buffer_virtual_head.store(self.head, Relaxed);
    metrics.0.log_buffer_virtual_tail.store(self.tail, Relaxed);
    metrics.0.log_buffer_flush_state_count.fetch_add(1, Relaxed);
  }
}

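/// Buffers tuple mutations in a circular on-device log plus an in-memory overlay,
/// and periodically commits them by rewriting the affected bundle spages and
/// advancing the virtual head.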
pub(crate) struct LogBuffer {
  bundle_count: u64,
  bundles_dev: BoundedStore,
  commit_threshold: u64,
  currently_committing: Arc<AtomicBool>,
  data_dev: BoundedStore,
  dev: Arc<dyn BackingStore>,
  heap_allocator: Arc<Mutex<Allocator>>,
  metrics: BlobdMetrics,
  overlay: Arc<RwLock<Overlay>>,
  pages: Pages,
  state_dev: BoundedStore,
  virtual_pointers: Arc<tokio::sync::RwLock<LogBufferState>>,
  sender: crossbeam_channel::Sender<BundleTask>,
  receiver: Mutex<Option<crossbeam_channel::Receiver<BundleTask>>>,
}

impl LogBuffer {
  pub async fn format_device(state_dev: &BoundedStore, pages: &Pages) {
    state_dev
      .write_at(0, pages.slow_allocate_with_zeros(pages.spage_size()))
      .await;
  }

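  /// Reads the persisted virtual pointers and replays all log entries between
  /// head and tail (buffered in chunks of up to 128 MiB) to rebuild the overlay.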
  pub async fn load_from_device(
    dev: Arc<dyn BackingStore>,
    bundles_dev: BoundedStore,
    data_dev: BoundedStore,
    state_dev: BoundedStore,
    heap_allocator: Arc<Mutex<Allocator>>,
    pages: Pages,
    metrics: BlobdMetrics,
    bundle_count: u64,
    commit_threshold: u64,
  ) -> Self {
    let currently_committing = Arc::new(AtomicBool::new(false));

    let state_raw = state_dev.read_at(0, pages.spage_size()).await;
    let init_virtual_head = state_raw.read_u64_le_at(OFFSETOF_VIRTUAL_HEAD);
    let init_virtual_tail = state_raw.read_u64_le_at(OFFSETOF_VIRTUAL_TAIL);
    assert_eq!(mod_pow2(init_virtual_head, pages.spage_size_pow2), 0);
    assert_eq!(mod_pow2(init_virtual_tail, pages.spage_size_pow2), 0);
    assert!(init_virtual_head <= init_virtual_tail);
    metrics
      .0
      .log_buffer_virtual_head
      .store(init_virtual_head, Relaxed);
    metrics
      .0
      .log_buffer_virtual_tail
      .store(init_virtual_tail, Relaxed);
    info!(
      virtual_head = init_virtual_head,
      virtual_tail = init_virtual_tail,
      "loading log buffer"
    );

    let virtual_pointers = Arc::new(tokio::sync::RwLock::new(LogBufferState {
      head: init_virtual_head,
      tail: init_virtual_tail,
    }));

    let overlay: Arc<RwLock<Overlay>> = {
      let overlay = Overlay::default();
      const BUFSIZE: u64 = 1024 * 1024 * 128;
      let mut buf_vbase = init_virtual_head;
      let mut buf = Vec::new();
      let mut buf_drained: usize = 0;
      let vend = init_virtual_tail;
      loop {
        let vnext = buf_vbase + u64!(buf_drained);
        if vnext == vend {
          break;
        }
        loop {
          assert!(buf_drained <= buf.len());
          let avail = u64!(buf.len() - buf_drained);
          let buf_vafter = vnext + avail;
          assert_eq!(mod_pow2(buf_vafter, pages.spage_size_pow2), 0);
          assert!(buf_vafter <= vend);
          if avail >= BUFSIZE || buf_vafter == vend {
            break;
          }
          let physical_offset = buf_vafter % data_dev.len();
          let to_read = BUFSIZE
            .min(vend - buf_vafter)
            .min(data_dev.len() - physical_offset);
          let rd = data_dev.read_at(physical_offset, to_read).await;
          buf_vbase += u64!(buf_drained);
          buf.drain(..buf_drained);
          buf_drained = 0;
          buf.extend_from_slice(&rd);
        }
        let mut rd = ByteConsumer::new(&buf[buf_drained..]);
        let to_skip = match LogBufferPersistedEntry::from_u8(rd.consume(1)[0]).unwrap() {
          LogBufferPersistedEntry::Upsert => {
            let key = ObjectTupleKey::deserialise(&mut rd);
            let data = ObjectTupleData::deserialise(&mut rd);
            metrics.0.log_buffer_write_entry_count.fetch_add(1, Relaxed);
            metrics
              .0
              .log_buffer_write_entry_data_bytes
              .fetch_add(u64!(data.len()), Relaxed);
            overlay
              .uncommitted
              .insert(key, LogBufferOverlayEntry::Upserted { data });
            rd.consumed()
          }
          LogBufferPersistedEntry::Delete => {
            let key = ObjectTupleKey::deserialise(&mut rd);
            metrics
              .0
              .log_buffer_delete_entry_count
              .fetch_add(1, Relaxed);
            overlay
              .uncommitted
              .insert(key, LogBufferOverlayEntry::Deleted {});
            rd.consumed()
          }
          LogBufferPersistedEntry::Padding => {
            let to_skip = pages.spage_size() - mod_pow2(vnext, pages.spage_size_pow2);
            assert!(to_skip > 0 && to_skip < pages.spage_size());
            usz!(to_skip)
          }
          LogBufferPersistedEntry::Wraparound => {
            let to_skip = data_dev.len() - (vnext % data_dev.len());
            assert!(to_skip > 0 && to_skip < data_dev.len());
            usz!(to_skip)
          }
        };
        buf_drained += to_skip;
      }
      info!(entries = overlay.uncommitted.len(), "loaded log buffer");
      Arc::new(RwLock::new(overlay))
    };

    let (sender, receiver) = crossbeam_channel::unbounded::<BundleTask>();

    Self {
      bundle_count,
      bundles_dev,
      commit_threshold,
      currently_committing,
      data_dev,
      dev,
      heap_allocator,
      metrics,
      overlay,
      pages,
      receiver: Mutex::new(Some(receiver)),
      sender,
      state_dev,
      virtual_pointers,
    }
  }

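  /// Spawns the two background workers:
  /// - a dedicated OS thread that drains queued `BundleTask`s, serialises them and
  ///   writes spage-aligned batches to the log data region;
  /// - a Tokio task that reorders completed flushes by flush ID, persists the new
  ///   virtual tail, applies the entries to the overlay, signals waiters, and
  ///   starts a commit once `tail - head` reaches `commit_threshold`.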
  pub async fn start_background_threads(&self) {
    let handle = Handle::current();
    let init_virtual_tail = self.virtual_pointers.try_read().unwrap().tail;
    let receiver = self.receiver.lock().take().unwrap();
    let bundle_count = self.bundle_count;
    let commit_threshold = self.commit_threshold;

    let (completer_send, mut completer_recv) =
      tokio::sync::mpsc::unbounded_channel::<(u64, CompletedFlushesBacklogEntry)>();
    spawn({
      let bundles_dev = self.bundles_dev.clone();
      let currently_committing = self.currently_committing.clone();
      let dev = self.dev.clone();
      let heap_allocator = self.heap_allocator.clone();
      let metrics = self.metrics.clone();
      let overlay = self.overlay.clone();
      let pages = self.pages.clone();
      let state_dev = self.state_dev.clone();
      let virtual_pointers = self.virtual_pointers.clone();
      async move {
        let mut backlog: AHashMap<u64, CompletedFlushesBacklogEntry> = Default::default();
        let mut next_flush_id: u64 = 0;
        loop {
          if !currently_committing.load(Relaxed)
            && virtual_pointers
              .try_read()
              .map(|v| v.tail.checked_sub(v.head).unwrap() >= commit_threshold)
              .unwrap()
          {
            currently_committing.store(true, Relaxed);
            let log_entries_to_commit = {
              let mut overlay = overlay.write();
              assert!(overlay.committing.is_none());
              let entry_map = take(&mut overlay.uncommitted);
              overlay.committing = Some(entry_map.clone());
              entry_map
            };
            let commit_up_to_tail = virtual_pointers.try_read().unwrap().tail;
            info!(
              entries = log_entries_to_commit.len(),
              up_to_virtual_tail = commit_up_to_tail,
              "log buffer commit: starting"
            );
            metrics.0.log_buffer_commit_count.fetch_add(1, Relaxed);
            metrics
              .0
              .log_buffer_commit_entry_count
              .fetch_add(u64!(log_entries_to_commit.len()), Relaxed);
            spawn({
              let bundles_dev = bundles_dev.clone();
              let currently_committing = currently_committing.clone();
              let dev = dev.clone();
              let heap_allocator = heap_allocator.clone();
              let metrics = metrics.clone();
              let overlay = overlay.clone();
              let pages = pages.clone();
              let state_dev = state_dev.clone();
              let virtual_pointers = virtual_pointers.clone();
              async move {
                let grouping_started = Instant::now();
                let (heap_writes, by_bundle_idx) = spawn_blocking({
                  let heap_allocator = heap_allocator.clone();
                  let metrics = metrics.clone();
                  let pages = pages.clone();
                  move || {
                    let mut allocator = heap_allocator.lock();
                    let mut heap_writes = Vec::new();
                    let by_bundle_idx = log_entries_to_commit
                      .iter()
                      .map(|e| {
                        let key = e.key().clone();
                        let entry = match e.value() {
                          LogBufferOverlayEntry::Upserted {
                            data: ObjectTupleData::Inline(data),
                          } if data.len() > OBJECT_TUPLE_DATA_LEN_INLINE_THRESHOLD => {
                            let size = u32!(data.len());
                            let (dev_offset, size_on_dev) =
                              allocate_object_on_heap(&mut allocator, &pages, &metrics, size)
                                .unwrap();
                            let mut buf = pages.allocate_uninitialised(size_on_dev);
                            buf.write_at(0, data);
                            heap_writes.push((dev_offset, buf));
                            metrics
                              .0
                              .log_buffer_commit_object_heap_move_bytes
                              .fetch_add(u64!(data.len()), Relaxed);
                            metrics
                              .0
                              .log_buffer_commit_object_heap_move_count
                              .fetch_add(1, Relaxed);
                            LogBufferOverlayEntry::Upserted {
                              data: ObjectTupleData::Heap { size, dev_offset },
                            }
                          }
                          e => e.clone(),
                        };
                        (key, entry)
                      })
                      .into_group_map_by(|(key, _data)| {
                        get_bundle_index_for_key(&key.hash(), bundle_count)
                      });
                    (heap_writes, by_bundle_idx)
                  }
                })
                .await
                .unwrap();
                metrics
                  .0
                  .log_buffer_commit_object_grouping_us
                  .fetch_add(u64!(grouping_started.elapsed().as_micros()), Relaxed);
                metrics
                  .0
                  .log_buffer_commit_bundle_count
                  .fetch_add(u64!(by_bundle_idx.len()), Relaxed);
                info!(
                  bundle_count = by_bundle_idx.len(),
                  "log buffer commit: entries grouped"
                );

                let heap_writes_started = Instant::now();
                iter(heap_writes)
                  .for_each_concurrent(None, |(dev_offset, buf)| {
                    let dev = dev.clone();
                    async move {
                      dev.write_at(dev_offset, buf).await;
                    }
                  })
                  .await;
                metrics
                  .0
                  .log_buffer_commit_object_heap_move_write_us
                  .fetch_add(u64!(heap_writes_started.elapsed().as_micros()), Relaxed);
                info!("log buffer commit: objects moved to heap");

                let bundles_update_started = Instant::now();
                iter(by_bundle_idx)
                  .for_each_concurrent(None, |(bundle_idx, overlay_entries)| {
                    let bundles_dev = bundles_dev.clone();
                    let dev = dev.clone();
                    let heap_allocator = heap_allocator.clone();
                    let metrics = metrics.clone();
                    let pages = pages.clone();
                    spawn(async move {
                      let read_started = Instant::now();
                      let bundle_raw = bundles_dev
                        .read_at(bundle_idx * pages.spage_size(), pages.spage_size())
                        .await;
                      metrics
                        .0
                        .log_buffer_commit_bundle_read_us
                        .fetch_add(u64!(read_started.elapsed().as_micros()), Relaxed);

                      let mut bundle =
                        BundleDeserialiser::new(bundle_raw).collect::<AHashMap<_, _>>();
                      for (key, ent) in overlay_entries {
                        let existing_object_to_delete = match ent {
                          LogBufferOverlayEntry::Deleted {} => bundle.remove(&key),
                          LogBufferOverlayEntry::Upserted { data } => {
                            metrics.0.object_count.fetch_add(1, Relaxed);
                            bundle.insert(key, data)
                          }
                        };
                        if let Some(deleted) = existing_object_to_delete {
                          metrics.0.object_count.fetch_sub(1, Relaxed);
                          if let ObjectTupleData::Heap { size, dev_offset } = deleted {
                            heap_allocator.lock().release(dev_offset, size);
                            metrics
                              .0
                              .heap_object_data_bytes
                              .fetch_sub(size.into(), Relaxed);
                          };
                        };
                      }
                      let new_bundle = serialise_bundle(&pages, bundle);
                      let write_started = Instant::now();
                      dev
                        .write_at(bundle_idx * pages.spage_size(), new_bundle)
                        .await;
                      metrics
                        .0
                        .log_buffer_commit_bundle_committed_count
                        .fetch_add(1, Relaxed);
                      metrics
                        .0
                        .log_buffer_commit_bundle_write_us
                        .fetch_add(u64!(write_started.elapsed().as_micros()), Relaxed);
                    })
                    .unwrap_or_else(|_| ())
                  })
                  .await;
                metrics
                  .0
                  .log_buffer_commit_bundles_update_us
                  .fetch_add(u64!(bundles_update_started.elapsed().as_micros()), Relaxed);
                info!("log buffer commit: bundles updated");
                {
                  let mut v = virtual_pointers.write().await;
                  v.head = commit_up_to_tail;
                  v.flush(&state_dev, &pages, &metrics).await;
                };
                assert!(overlay.write().committing.take().is_some());
                currently_committing.store(false, Relaxed);
                info!("log buffer commit: completed");
              }
            });
          }

          let Some((flush_id, ent)) = completer_recv.recv().await else {
            break;
          };
          assert!(backlog.insert(flush_id, ent).is_none());
          while let Ok(Some((flush_id, ent))) =
            timeout(Duration::from_micros(10), completer_recv.recv()).await
          {
            assert!(backlog.insert(flush_id, ent).is_none());
          }

          let mut flushed_entries = Vec::new();
          while let Some(ent) = backlog.remove(&next_flush_id) {
            flushed_entries.push(ent);
            next_flush_id += 1;
          }
          if !flushed_entries.is_empty() {
            let new_virtual_tail = flushed_entries.last().unwrap().new_virtual_tail_to_write;
            {
              let mut v = virtual_pointers.write().await;
              v.tail = new_virtual_tail;
              v.flush(&state_dev, &pages, &metrics).await;
            };
            let overlay = overlay.read().clone();
            let metrics = metrics.clone();
            spawn_blocking(move || {
              for ent in flushed_entries {
                for msg in ent.tasks {
                  match msg {
                    BundleTask::Delete { key, signal } => {
                      metrics
                        .0
                        .log_buffer_delete_entry_count
                        .fetch_add(1, Relaxed);
                      overlay
                        .uncommitted
                        .insert(key, LogBufferOverlayEntry::Deleted {});
                      signal.signal(());
                    }
                    BundleTask::Upsert { key, data, signal } => {
                      metrics.0.log_buffer_write_entry_count.fetch_add(1, Relaxed);
                      metrics
                        .0
                        .log_buffer_write_entry_data_bytes
                        .fetch_add(u64!(data.len()), Relaxed);
                      overlay
                        .uncommitted
                        .insert(key, LogBufferOverlayEntry::Upserted { data });
                      signal.signal(());
                    }
                  };
                }
                metrics
                  .0
                  .log_buffer_flush_total_us
                  .fetch_add(u64!(ent.started.elapsed().as_micros()), Relaxed);
              }
            })
            .await
            .unwrap();
          };
        }
      }
    });

    thread::spawn({
      let data_dev = self.data_dev.clone();
      let metrics = self.metrics.clone();
      let pages = self.pages.clone();
      let virtual_pointers = self.virtual_pointers.clone();
      move || {
        let mut virtual_tail = init_virtual_tail;
        const MAX_BUF_LEN: u64 = 128 * 1024 * 1024;
        let mut buf = Vec::new();
        let mut pending_log_flush = Vec::new();
        let mut next_flush_id = 0;
        let mut last_flush_time = Instant::now();
        let mut disconnected = false;
        while !disconnected {
          match receiver.recv_timeout(Duration::from_micros(100)) {
            Ok(task) => {
              match &task {
                BundleTask::Upsert { key, data, .. } => {
                  buf.push(LogBufferPersistedEntry::Upsert as u8);
                  key.serialise(&mut buf);
                  data.serialise(&mut buf);
                }
                BundleTask::Delete { key, .. } => {
                  buf.push(LogBufferPersistedEntry::Delete as u8);
                  key.serialise(&mut buf);
                }
              };
              pending_log_flush.push(task);
            }
            Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => {
              disconnected = true;
            }
          };
          if pending_log_flush.is_empty() {
            continue;
          };
          let now = Instant::now();
          let mut buf_len = u64!(buf.len());
          if disconnected
            || buf_len >= MAX_BUF_LEN
            || u64!(now.duration_since(last_flush_time).as_micros()) > 10_000
          {
            if mod_pow2(buf_len, pages.spage_size_pow2) > 0 {
              buf.push(LogBufferPersistedEntry::Padding as u8);
              buf_len += 1;
            };
            metrics
              .0
              .log_buffer_flush_entry_count
              .fetch_add(u64!(pending_log_flush.len()), Relaxed);
            metrics
              .0
              .log_buffer_flush_data_bytes
              .fetch_add(buf_len, Relaxed);
            metrics.0.log_buffer_flush_count.fetch_add(1, Relaxed);
            if buf_len <= 1024 * 4 {
              metrics.0.log_buffer_flush_4k_count.fetch_add(1, Relaxed);
            } else if buf_len <= 1024 * 64 {
              metrics.0.log_buffer_flush_64k_count.fetch_add(1, Relaxed);
            } else if buf_len <= 1024 * 1024 {
              metrics.0.log_buffer_flush_1m_count.fetch_add(1, Relaxed);
            } else if buf_len <= 1024 * 1024 * 8 {
              metrics.0.log_buffer_flush_8m_count.fetch_add(1, Relaxed);
            };
            let mut buf_padded =
              pages.allocate_uninitialised(ceil_pow2(buf_len, pages.spage_size_pow2));

            metrics
              .0
              .log_buffer_flush_padding_bytes
              .fetch_add(u64!(buf_padded.len() - buf.len()), Relaxed);
            buf_padded[..buf.len()].copy_from_slice(&buf);
            buf.clear();

            let flush_id = next_flush_id;
            next_flush_id += 1;
            assert_eq!(mod_pow2(virtual_tail, pages.spage_size_pow2), 0);
            let mut physical_offset = virtual_tail % data_dev.len();
            let write_wraparound_at = if physical_offset + u64!(buf_padded.len()) > data_dev.len() {
              warn!("log buffer wrapped");
              let write_wraparound_at = physical_offset;
              virtual_tail += data_dev.len() - physical_offset;
              physical_offset = 0;
              assert_eq!(virtual_tail % data_dev.len(), physical_offset);
              Some(write_wraparound_at)
            } else {
              None
            };
            virtual_tail += u64!(buf_padded.len());
            if virtual_tail - virtual_pointers.blocking_read().head >= data_dev.len() {
              panic!("out of log buffer space");
            };

            let new_virtual_tail_to_write = virtual_tail;
            let pending_log_flush = pending_log_flush.drain(..).collect_vec();
            last_flush_time = now;
            handle.spawn({
              let completer_send = completer_send.clone();
              let data_dev = data_dev.clone();
              let metrics = metrics.clone();
              let pages = pages.clone();
              async move {
                let flush_write_started = Instant::now();
                if let Some(write_wraparound_at) = write_wraparound_at {
                  let mut raw = pages.allocate_uninitialised(pages.spage_size());
                  raw[0] = LogBufferPersistedEntry::Wraparound as u8;
                  data_dev.write_at(write_wraparound_at, raw).await;
                }
                data_dev.write_at(physical_offset, buf_padded).await;
                metrics
                  .0
                  .log_buffer_flush_write_us
                  .fetch_add(u64!(flush_write_started.elapsed().as_micros()), Relaxed);
                assert!(completer_send
                  .send((flush_id, CompletedFlushesBacklogEntry {
                    new_virtual_tail_to_write,
                    started: now,
                    tasks: pending_log_flush,
                  }))
                  .is_ok());
              }
            });
          };
        }
      }
    });
  }

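  /// Polls until any in-progress commit has finished.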
  pub async fn wait_for_any_current_commit(&self) {
    while self.currently_committing.load(Relaxed) {
      sleep(Duration::from_millis(1)).await;
    }
  }

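  /// Looks up a key: the uncommitted overlay first, then the committing overlay,
  /// then falls back to reading the key's bundle from the device.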
  pub async fn read_tuple(&self, key_raw: TinyBuf) -> Option<ObjectTupleData> {
    let hash = blake3::hash(&key_raw);
    let bundle_idx = get_bundle_index_for_key(hash.as_bytes(), self.bundle_count);
    let key = ObjectTupleKey::from_raw_and_hash(key_raw, hash);
    let overlay = self.overlay.read().clone();
    match overlay
      .uncommitted
      .get(&key)
      .map(|e| e.clone())
      .or_else(|| {
        overlay
          .committing
          .as_ref()
          .and_then(|m| m.get(&key).map(|e| e.clone()))
      }) {
      Some(e) => match e {
        LogBufferOverlayEntry::Deleted {} => None,
        LogBufferOverlayEntry::Upserted { data } => Some(data),
      },
      None => BundleDeserialiser::new(
        self
          .bundles_dev
          .read_at(
            bundle_idx * self.pages.spage_size(),
            self.pages.spage_size(),
          )
          .await,
      )
      .find(|(tuple_key, _data)| tuple_key == &key)
      .map(|(_, data)| data),
    }
  }

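  /// Queues an upsert; resolves once the entry has been persisted in the log.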
  pub async fn upsert_tuple(&self, key: TinyBuf, data: ObjectTupleData) {
    let key = ObjectTupleKey::from_raw(key);
    let (fut, signal) = SignalFuture::new();
    self
      .sender
      .send(BundleTask::Upsert { key, data, signal })
      .unwrap();
    fut.await;
  }

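  /// Queues a delete; resolves once the entry has been persisted in the log.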
  pub async fn delete_tuple(&self, key: TinyBuf) {
    let key = ObjectTupleKey::from_raw(key);
    let (fut, signal) = SignalFuture::new();
    self
      .sender
      .send(BundleTask::Delete { key, signal })
      .unwrap();
    fut.await;
  }
}