libblobd_kv/log_buffer.rs

use crate::allocator::Allocator;
use crate::backing_store::BackingStore;
use crate::backing_store::BoundedStore;
use crate::metrics::BlobdMetrics;
use crate::object::get_bundle_index_for_key;
use crate::object::serialise_bundle;
use crate::object::BundleDeserialiser;
use crate::object::ObjectTupleData;
use crate::object::ObjectTupleKey;
use crate::object::OBJECT_TUPLE_DATA_LEN_INLINE_THRESHOLD;
use crate::op::write_object::allocate_object_on_heap;
use crate::pages::Pages;
use crate::util::ceil_pow2;
use crate::util::mod_pow2;
use crate::util::ByteConsumer;
use ahash::AHashMap;
use crossbeam_channel::RecvTimeoutError;
use dashmap::DashMap;
use futures::stream::iter;
use futures::StreamExt;
use futures::TryFutureExt;
use itertools::Itertools;
use num_derive::FromPrimitive;
use num_traits::FromPrimitive;
use off64::int::Off64ReadInt;
use off64::int::Off64WriteMutInt;
use off64::u32;
use off64::u64;
use off64::usz;
use off64::Off64WriteMut;
use parking_lot::Mutex;
use parking_lot::RwLock;
use signal_future::SignalFuture;
use signal_future::SignalFutureController;
use std::mem::take;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use tinybuf::TinyBuf;
use tokio::runtime::Handle;
use tokio::spawn;
use tokio::task::spawn_blocking;
use tokio::time::sleep;
use tokio::time::timeout;
use tracing::info;
use tracing::warn;

/*

Data layout:

{
  ({
    u8 == 1 upsert
    u8[] key_serialised
    u8[] data_serialised
  } | {
    u8 == 2 delete
    u8[] key_serialised
  } | {
    u8 == 3 padding_marker
  } | {
    u8 == 4 wraparound_marker
  })[] persisted_entries
}[] flushes

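For example (illustrative only), a single flush containing one upsert and one
delete, followed by a padding marker, would be laid out as:

  01 <key_serialised> <data_serialised>
  02 <key_serialised>
  03 <junk up to the next spage boundary>

Virtual offsets grow monotonically; a virtual offset maps to the physical
offset `virtual_offset % data_dev.len()`, so the log is a ring over the data
region.
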
*/

const OFFSETOF_VIRTUAL_HEAD: u64 = 0;
const OFFSETOF_VIRTUAL_TAIL: u64 = OFFSETOF_VIRTUAL_HEAD + 8;

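// A single queued mutation. `signal` is completed once the entry has been durably flushed to the log buffer on the device (not necessarily committed to its bundle yet).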
pub(crate) enum BundleTask {
  Upsert {
    key: ObjectTupleKey,
    data: ObjectTupleData,
    signal: SignalFutureController<()>,
  },
  Delete {
    key: ObjectTupleKey,
    signal: SignalFutureController<()>,
  },
}

// We intentionally don't attach fields to these variants and use this enum purely as the raw u8 tag; carrying data would make it hard to serialise without owning (i.e. without a lot of memcpy) while still deserialising with ownership (deserialisation almost always reads from raw device buffers that will be discarded).
#[derive(Clone, Copy, PartialEq, Eq, Debug, FromPrimitive)]
#[repr(u8)]
enum LogBufferPersistedEntry {
  Upsert = 1,
  Delete,
  Padding, // When parsing, this means to skip to the next spage as the rest of the current spage is junk.
  Wraparound, // When parsing, this means to skip to the next virtual offset where the physical offset is zero as the rest of the physical log buffer is junk.
}

#[derive(Clone)]
enum LogBufferOverlayEntry {
  Deleted {},
  Upserted { data: ObjectTupleData },
}

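// A device flush that has finished writing, held in the completer's backlog until all earlier flushes (by flush ID) have also completed, so that the persisted virtual tail only ever advances in order.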
struct CompletedFlushesBacklogEntry {
  new_virtual_tail_to_write: u64,
  started: Instant,
  tasks: Vec<BundleTask>,
}

// If we used a standard HashMap behind a read lock, read performance would quickly become effectively single-threaded; DashMap is much faster for this reason. However, we still need a way to quickly switch between maps for commits, so we use a light RwLock and briefly clone the inner Arc<DashMap> to get the best of both worlds.
// WARNING: Do not use FxHasher, as it distributes these keys poorly and causes high CPU usage from many key comparisons.
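// `uncommitted` receives new entries as flushes complete. During a commit, the uncommitted map is snapshotted into `committing` and a fresh `uncommitted` map accumulates subsequent entries; readers check `uncommitted` first, then `committing`, then fall back to the bundle on the device (see `read_tuple`).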
#[derive(Clone, Default)]
struct Overlay {
  committing: Option<Arc<DashMap<ObjectTupleKey, LogBufferOverlayEntry, ahash::RandomState>>>,
  uncommitted: Arc<DashMap<ObjectTupleKey, LogBufferOverlayEntry, ahash::RandomState>>,
}

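// Virtual head and tail of the log ring. Persisted to the state spage via `flush` whenever either pointer advances.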
struct LogBufferState {
  head: u64,
  tail: u64,
}

impl LogBufferState {
  pub async fn flush(&self, state_dev: &BoundedStore, pages: &Pages, metrics: &BlobdMetrics) {
    let mut state_buf = pages.allocate_uninitialised(pages.spage_size());
    state_buf.write_u64_le_at(OFFSETOF_VIRTUAL_HEAD, self.head);
    state_buf.write_u64_le_at(OFFSETOF_VIRTUAL_TAIL, self.tail);
    state_dev.write_at(0, state_buf).await;
    metrics.0.log_buffer_virtual_head.store(self.head, Relaxed);
    metrics.0.log_buffer_virtual_tail.store(self.tail, Relaxed);
    metrics.0.log_buffer_flush_state_count.fetch_add(1, Relaxed);
  }
}

pub(crate) struct LogBuffer {
  bundle_count: u64,
  bundles_dev: BoundedStore,
  commit_threshold: u64,
  currently_committing: Arc<AtomicBool>,
  data_dev: BoundedStore,
  dev: Arc<dyn BackingStore>,
  heap_allocator: Arc<Mutex<Allocator>>,
  metrics: BlobdMetrics,
  overlay: Arc<RwLock<Overlay>>,
  pages: Pages,
  state_dev: BoundedStore,
  virtual_pointers: Arc<tokio::sync::RwLock<LogBufferState>>,
  // std::sync::mpsc::Sender is not Sync, so it cannot be shared across threads behind `&self`.
  sender: crossbeam_channel::Sender<BundleTask>,
  receiver: Mutex<Option<crossbeam_channel::Receiver<BundleTask>>>, // This will be taken and left as None after calling `start_background_threads`.
}

impl LogBuffer {
  pub async fn format_device(state_dev: &BoundedStore, pages: &Pages) {
    state_dev
      .write_at(0, pages.slow_allocate_with_zeros(pages.spage_size()))
      .await;
  }

  pub async fn load_from_device(
    dev: Arc<dyn BackingStore>,
    bundles_dev: BoundedStore,
    data_dev: BoundedStore,
    state_dev: BoundedStore,
    heap_allocator: Arc<Mutex<Allocator>>,
    pages: Pages,
    metrics: BlobdMetrics,
    bundle_count: u64,
    commit_threshold: u64,
  ) -> Self {
    let currently_committing = Arc::new(AtomicBool::new(false));

    let state_raw = state_dev.read_at(0, pages.spage_size()).await;
    let init_virtual_head = state_raw.read_u64_le_at(OFFSETOF_VIRTUAL_HEAD);
    let init_virtual_tail = state_raw.read_u64_le_at(OFFSETOF_VIRTUAL_TAIL);
    assert_eq!(mod_pow2(init_virtual_head, pages.spage_size_pow2), 0);
    assert_eq!(mod_pow2(init_virtual_tail, pages.spage_size_pow2), 0);
    assert!(init_virtual_head <= init_virtual_tail);
    metrics
      .0
      .log_buffer_virtual_head
      .store(init_virtual_head, Relaxed);
    metrics
      .0
      .log_buffer_virtual_tail
      .store(init_virtual_tail, Relaxed);
    info!(
      virtual_head = init_virtual_head,
      virtual_tail = init_virtual_tail,
      "loading log buffer"
    );

    // If this is async-locked, it means someone is writing to the log buffer state.
    let virtual_pointers = Arc::new(tokio::sync::RwLock::new(LogBufferState {
      head: init_virtual_head,
      tail: init_virtual_tail,
    }));

    // TODO Regularly shrink_to_fit if capacity is excessively high.
    let overlay: Arc<RwLock<Overlay>> = {
      let overlay = Overlay::default();
      // TODO Allow configuring.
      const BUFSIZE: u64 = 1024 * 1024 * 128;
      let mut buf_vbase = init_virtual_head;
      let mut buf = Vec::new();
      let mut buf_drained: usize = 0;
      let vend = init_virtual_tail;
      loop {
        let vnext = buf_vbase + u64!(buf_drained);
        if vnext == vend {
          break;
        }
        // Use a loop as we may be near the physical end and so will need more than one read.
        loop {
          assert!(buf_drained <= buf.len());
          let avail = u64!(buf.len() - buf_drained);
          let buf_vafter = vnext + avail;
          assert_eq!(mod_pow2(buf_vafter, pages.spage_size_pow2), 0);
          assert!(buf_vafter <= vend);
          if avail >= BUFSIZE || buf_vafter == vend {
            break;
          }
          // The buffer is running dry and there's still more to read. We don't know how long each serialised entry is, so we need enough buffered to ensure that any deserialisation error is due to corruption or bugs and not an unexpected EOF.
          let physical_offset = buf_vafter % data_dev.len();
          let to_read = BUFSIZE
            .min(vend - buf_vafter)
            .min(data_dev.len() - physical_offset);
          let rd = data_dev.read_at(physical_offset, to_read).await;
          buf_vbase += u64!(buf_drained);
          buf.drain(..buf_drained);
          buf_drained = 0;
          buf.extend_from_slice(&rd);
        }
        // Track `buf_drained` instead of draining a few bytes from `buf` on each iteration, which would memmove the remaining bytes every time.
        let mut rd = ByteConsumer::new(&buf[buf_drained..]);
        let to_skip = match LogBufferPersistedEntry::from_u8(rd.consume(1)[0]).unwrap() {
          LogBufferPersistedEntry::Upsert => {
            let key = ObjectTupleKey::deserialise(&mut rd);
            let data = ObjectTupleData::deserialise(&mut rd);
            metrics.0.log_buffer_write_entry_count.fetch_add(1, Relaxed);
            metrics
              .0
              .log_buffer_write_entry_data_bytes
              .fetch_add(u64!(data.len()), Relaxed);
            overlay
              .uncommitted
              .insert(key, LogBufferOverlayEntry::Upserted { data });
            rd.consumed()
          }
          LogBufferPersistedEntry::Delete => {
            let key = ObjectTupleKey::deserialise(&mut rd);
            metrics
              .0
              .log_buffer_delete_entry_count
              .fetch_add(1, Relaxed);
            overlay
              .uncommitted
              .insert(key, LogBufferOverlayEntry::Deleted {});
            rd.consumed()
          }
          LogBufferPersistedEntry::Padding => {
            let to_skip = pages.spage_size() - mod_pow2(vnext, pages.spage_size_pow2);
            assert!(to_skip > 0 && to_skip < pages.spage_size());
            usz!(to_skip)
          }
          LogBufferPersistedEntry::Wraparound => {
            let to_skip = data_dev.len() - (vnext % data_dev.len());
            assert!(to_skip > 0 && to_skip < data_dev.len());
            usz!(to_skip)
          }
        };
        buf_drained += to_skip;
      }
      info!(entries = overlay.uncommitted.len(), "loaded log buffer");
      Arc::new(RwLock::new(overlay))
    };

    let (sender, receiver) = crossbeam_channel::unbounded::<BundleTask>();

    Self {
      bundle_count,
      bundles_dev,
      commit_threshold,
      currently_committing,
      data_dev,
      dev,
      heap_allocator,
      metrics,
      overlay,
      pages,
      receiver: Mutex::new(Some(receiver)),
      sender,
      state_dev,
      virtual_pointers,
    }
  }

  /// WARNING: This must only be called once.
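  /// Spawns the commit/completer future on the current Tokio runtime and a dedicated OS thread that batches incoming `BundleTask`s and flushes them to the log.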
  pub async fn start_background_threads(&self) {
    let handle = Handle::current();
    let init_virtual_tail = self.virtual_pointers.try_read().unwrap().tail;
    let receiver = self.receiver.lock().take().unwrap();
    let bundle_count = self.bundle_count;
    let commit_threshold = self.commit_threshold;

    // This separate background future and async channel exist so that completed flushes can continue to be enqueued while the log state is being written asynchronously.
    let (completer_send, mut completer_recv) =
      tokio::sync::mpsc::unbounded_channel::<(u64, CompletedFlushesBacklogEntry)>();
    spawn({
      let bundles_dev = self.bundles_dev.clone();
      let currently_committing = self.currently_committing.clone();
      let dev = self.dev.clone();
      let heap_allocator = self.heap_allocator.clone();
      let metrics = self.metrics.clone();
      let overlay = self.overlay.clone();
      let pages = self.pages.clone();
      let state_dev = self.state_dev.clone();
      let virtual_pointers = self.virtual_pointers.clone();
      async move {
        let mut backlog: AHashMap<u64, CompletedFlushesBacklogEntry> = Default::default();
        let mut next_flush_id: u64 = 0;
        loop {
          // Check if we need to do a commit. We should always be able to read-lock `virtual_pointers` because the only other thread that could write-lock (other than us) is the commit future, which we checked isn't running.
          if !currently_committing.load(Relaxed)
            && virtual_pointers
              .try_read()
              .map(|v| v.tail.checked_sub(v.head).unwrap() >= commit_threshold)
              .unwrap()
          {
            currently_committing.store(true, Relaxed);
            // Only we (the current spawned future) can update the overlay entries and virtual head/tail, so even though virtual head/tail are not locked as part of `overlay`, they are always in sync and this is consistent and correct.
            // NOTE: To keep the previous consistency and correctness guarantees, do this outside of the following `spawn`.
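            // Commit sequence (sketch): snapshot `uncommitted` into `committing`, move oversized inline values to the heap, rewrite each affected bundle spage, advance and persist the virtual head, then drop the `committing` snapshot.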
            let log_entries_to_commit = {
              let mut overlay = overlay.write();
              assert!(overlay.committing.is_none());
              let entry_map = take(&mut overlay.uncommitted);
              overlay.committing = Some(entry_map.clone());
              entry_map
            };
            let commit_up_to_tail = virtual_pointers.try_read().unwrap().tail;
            info!(
              entries = log_entries_to_commit.len(),
              up_to_virtual_tail = commit_up_to_tail,
              "log buffer commit: starting"
            );
            metrics.0.log_buffer_commit_count.fetch_add(1, Relaxed);
            metrics
              .0
              .log_buffer_commit_entry_count
              .fetch_add(u64!(log_entries_to_commit.len()), Relaxed);
            spawn({
              let bundles_dev = bundles_dev.clone();
              let currently_committing = currently_committing.clone();
              let dev = dev.clone();
              let heap_allocator = heap_allocator.clone();
              let metrics = metrics.clone();
              let overlay = overlay.clone();
              let pages = pages.clone();
              let state_dev = state_dev.clone();
              let virtual_pointers = virtual_pointers.clone();
              async move {
                let grouping_started = Instant::now();
                let (heap_writes, by_bundle_idx) = spawn_blocking({
                  let heap_allocator = heap_allocator.clone();
                  let metrics = metrics.clone();
                  let pages = pages.clone();
                  move || {
                    // Lock once instead of every time we need it, which will probably be many given that almost all objects will need to be moved to the heap.
                    let mut allocator = heap_allocator.lock();
                    let mut heap_writes = Vec::new();
                    let by_bundle_idx = log_entries_to_commit
                      .iter()
                      .map(|e| {
                        let key = e.key().clone();
                        let entry = match e.value() {
                          LogBufferOverlayEntry::Upserted {
                            data: ObjectTupleData::Inline(data),
                          } if data.len() > OBJECT_TUPLE_DATA_LEN_INLINE_THRESHOLD => {
                            // We're now writing to the tuples area, which has a much smaller threshold for inline data than the log buffer, so we need to rewrite the object.
                            // We do this now in a spawn_blocking so we don't block async threads; each individual entry doesn't take too much CPU time to update the allocator but it adds up when there are potentially millions of entries.
                            // TODO Handle ENOSPC.
                            let size = u32!(data.len());
                            let (dev_offset, size_on_dev) =
                              allocate_object_on_heap(&mut allocator, &pages, &metrics, size)
                                .unwrap();
                            let mut buf = pages.allocate_uninitialised(size_on_dev);
                            buf.write_at(0, data);
                            heap_writes.push((dev_offset, buf));
                            metrics
                              .0
                              .log_buffer_commit_object_heap_move_bytes
                              .fetch_add(u64!(data.len()), Relaxed);
                            metrics
                              .0
                              .log_buffer_commit_object_heap_move_count
                              .fetch_add(1, Relaxed);
                            LogBufferOverlayEntry::Upserted {
                              data: ObjectTupleData::Heap { size, dev_offset },
                            }
                          }
                          e => e.clone(),
                        };
                        (key, entry)
                      })
                      .into_group_map_by(|(key, _data)| {
                        get_bundle_index_for_key(&key.hash(), bundle_count)
                      });
                    (heap_writes, by_bundle_idx)
                  }
                })
                .await
                .unwrap();
                metrics
                  .0
                  .log_buffer_commit_object_grouping_us
                  .fetch_add(u64!(grouping_started.elapsed().as_micros()), Relaxed);
                metrics
                  .0
                  .log_buffer_commit_bundle_count
                  .fetch_add(u64!(by_bundle_idx.len()), Relaxed);
                info!(
                  bundle_count = by_bundle_idx.len(),
                  "log buffer commit: entries grouped"
                );

                // Move objects to heap first before updating bundles.
                let heap_writes_started = Instant::now();
                // TODO Opportunities for coalescing?
                iter(heap_writes)
                  .for_each_concurrent(None, |(dev_offset, buf)| {
                    let dev = dev.clone();
                    async move {
                      dev.write_at(dev_offset, buf).await;
                    }
                  })
                  .await;
                metrics
                  .0
                  .log_buffer_commit_object_heap_move_write_us
                  .fetch_add(u64!(heap_writes_started.elapsed().as_micros()), Relaxed);
                info!("log buffer commit: objects moved to heap");

                // Read then update then write tuple bundles.
                let bundles_update_started = Instant::now();
                iter(by_bundle_idx)
                  .for_each_concurrent(None, |(bundle_idx, overlay_entries)| {
                    let bundles_dev = bundles_dev.clone();
                    let dev = dev.clone();
                    let heap_allocator = heap_allocator.clone();
                    let metrics = metrics.clone();
                    let pages = pages.clone();
                    // If we don't spawn, `for_each_concurrent` essentially becomes one future that processes every single bundle and entry; while each individual bundle doesn't take much CPU time, in aggregate it adds up to a lot and we'd block the async worker threads, slowing down the entire system.
                    spawn(async move {
                      let read_started = Instant::now();
                      let bundle_raw = bundles_dev
                        .read_at(bundle_idx * pages.spage_size(), pages.spage_size())
                        .await;
                      metrics
                        .0
                        .log_buffer_commit_bundle_read_us
                        .fetch_add(u64!(read_started.elapsed().as_micros()), Relaxed);

                      let mut bundle =
                        BundleDeserialiser::new(bundle_raw).collect::<AHashMap<_, _>>();
                      for (key, ent) in overlay_entries {
                        let existing_object_to_delete = match ent {
                          LogBufferOverlayEntry::Deleted {} => bundle.remove(&key),
                          LogBufferOverlayEntry::Upserted { data } => {
                            // We've already ensured that data is no larger than `OBJECT_TUPLE_DATA_LEN_INLINE_THRESHOLD` if inline.
                            // We'll subtract one again when handling `existing_object_to_delete` if there's an existing object.
                            metrics.0.object_count.fetch_add(1, Relaxed);
                            bundle.insert(key, data)
                          }
                        };
                        if let Some(deleted) = existing_object_to_delete {
                          metrics.0.object_count.fetch_sub(1, Relaxed);
                          // TODO There is still a race condition here: someone may be about to read this object, but we release its space, some new object is allocated that space and writes its own data there, and junk is read.
                          if let ObjectTupleData::Heap { size, dev_offset } = deleted {
                            heap_allocator.lock().release(dev_offset, size);
                            metrics
                              .0
                              .heap_object_data_bytes
                              .fetch_sub(size.into(), Relaxed);
                          };
                        };
                      }
                      // TODO Better error/panic message on overflow.
                      let new_bundle = serialise_bundle(&pages, bundle);
                      let write_started = Instant::now();
                      dev
                        .write_at(bundle_idx * pages.spage_size(), new_bundle)
                        .await;
                      metrics
                        .0
                        .log_buffer_commit_bundle_committed_count
                        .fetch_add(1, Relaxed);
                      metrics
                        .0
                        .log_buffer_commit_bundle_write_us
                        .fetch_add(u64!(write_started.elapsed().as_micros()), Relaxed);
                    })
                    .unwrap_or_else(|_| ())
                  })
                  .await;
                metrics
                  .0
                  .log_buffer_commit_bundles_update_us
                  .fetch_add(u64!(bundles_update_started.elapsed().as_micros()), Relaxed);
                info!("log buffer commit: bundles updated");
                {
                  let mut v = virtual_pointers.write().await;
                  v.head = commit_up_to_tail;
                  v.flush(&state_dev, &pages, &metrics).await;
                };
                assert!(overlay.write().committing.take().is_some());
                currently_committing.store(false, Relaxed);
                info!("log buffer commit: completed");
              }
            });
          }

          let Some((flush_id, ent)) = completer_recv.recv().await else {
            break;
          };
          assert!(backlog.insert(flush_id, ent).is_none());
          // TODO Tune and allow configuring hyperparameter.
          while let Ok(Some((flush_id, ent))) =
            timeout(Duration::from_micros(10), completer_recv.recv()).await
          {
            assert!(backlog.insert(flush_id, ent).is_none());
          }

          let mut flushed_entries = Vec::new();
          while let Some(ent) = backlog.remove(&next_flush_id) {
            flushed_entries.push(ent);
            next_flush_id += 1;
          }
          if !flushed_entries.is_empty() {
            let new_virtual_tail = flushed_entries.last().unwrap().new_virtual_tail_to_write;
            {
              let mut v = virtual_pointers.write().await;
              v.tail = new_virtual_tail;
              v.flush(&state_dev, &pages, &metrics).await;
            };
            let overlay = overlay.read().clone();
            let metrics = metrics.clone();
            spawn_blocking(move || {
              for ent in flushed_entries {
                for msg in ent.tasks {
                  match msg {
                    BundleTask::Delete { key, signal } => {
                      metrics
                        .0
                        .log_buffer_delete_entry_count
                        .fetch_add(1, Relaxed);
                      overlay
                        .uncommitted
                        .insert(key, LogBufferOverlayEntry::Deleted {});
                      signal.signal(());
                    }
                    BundleTask::Upsert { key, data, signal } => {
                      metrics.0.log_buffer_write_entry_count.fetch_add(1, Relaxed);
                      metrics
                        .0
                        .log_buffer_write_entry_data_bytes
                        .fetch_add(u64!(data.len()), Relaxed);
                      overlay
                        .uncommitted
                        .insert(key, LogBufferOverlayEntry::Upserted { data });
                      signal.signal(());
                    }
                  };
                }
                metrics
                  .0
                  .log_buffer_flush_total_us
                  .fetch_add(u64!(ent.started.elapsed().as_micros()), Relaxed);
              }
            })
            .await
            .unwrap();
          };
        }
      }
    });

    thread::spawn({
      let data_dev = self.data_dev.clone();
      let metrics = self.metrics.clone();
      let pages = self.pages.clone();
      let virtual_pointers = self.virtual_pointers.clone();
      move || {
        let mut virtual_tail = init_virtual_tail;
        // TODO Tune and allow configuring hyperparameter.
        const MAX_BUF_LEN: u64 = 128 * 1024 * 1024;
        let mut buf = Vec::new();
        let mut pending_log_flush = Vec::new();
        let mut next_flush_id = 0;
        let mut last_flush_time = Instant::now();
        let mut disconnected = false;
        while !disconnected {
          // TODO Tune and allow configuring hyperparameter.
          match receiver.recv_timeout(Duration::from_micros(100)) {
            Ok(task) => {
              match &task {
                BundleTask::Upsert { key, data, .. } => {
                  buf.push(LogBufferPersistedEntry::Upsert as u8);
                  key.serialise(&mut buf);
                  data.serialise(&mut buf);
                }
                BundleTask::Delete { key, .. } => {
                  buf.push(LogBufferPersistedEntry::Delete as u8);
                  key.serialise(&mut buf);
                }
              };
              pending_log_flush.push(task);
            }
            Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => {
              disconnected = true;
            }
          };
          if pending_log_flush.is_empty() {
            continue;
          };
          let now = Instant::now();
          let mut buf_len = u64!(buf.len());
          // TODO Tune and allow configuring duration hyperparameter. This doesn't have to match the `recv_timeout` as they handle different scenarios: the `recv_timeout` determines how often to check in again and see if a flush is necessary, while this determines when a flush is necessary.
          if disconnected
            || buf_len >= MAX_BUF_LEN
            || u64!(now.duration_since(last_flush_time).as_micros()) > 10_000
          {
            // We need padding:
            // - to avoid double writing the last spage between flushes
            // - to allow parallel flushes (without padding, flushes will likely overlap and clobber each other in the last spage)
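            // For example (illustrative): with a 4096-byte spage, a 5000-byte batch gets one Padding marker byte appended and is then written as 8192 bytes (two spages); on load, the parser skips from the marker to the next spage boundary.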
            if mod_pow2(buf_len, pages.spage_size_pow2) > 0 {
              buf.push(LogBufferPersistedEntry::Padding as u8);
              buf_len += 1;
            };
            metrics
              .0
              .log_buffer_flush_entry_count
              .fetch_add(u64!(pending_log_flush.len()), Relaxed);
            metrics
              .0
              .log_buffer_flush_data_bytes
              .fetch_add(buf_len, Relaxed);
            metrics.0.log_buffer_flush_count.fetch_add(1, Relaxed);
            if buf_len <= 1024 * 4 {
              metrics.0.log_buffer_flush_4k_count.fetch_add(1, Relaxed);
            } else if buf_len <= 1024 * 64 {
              metrics.0.log_buffer_flush_64k_count.fetch_add(1, Relaxed);
            } else if buf_len <= 1024 * 1024 * 1 {
              metrics.0.log_buffer_flush_1m_count.fetch_add(1, Relaxed);
            } else if buf_len <= 1024 * 1024 * 8 {
              metrics.0.log_buffer_flush_8m_count.fetch_add(1, Relaxed);
            };
            let mut buf_padded =
              pages.allocate_uninitialised(ceil_pow2(buf_len, pages.spage_size_pow2));

            metrics
              .0
              .log_buffer_flush_padding_bytes
              .fetch_add(u64!(buf_padded.len() - buf.len()), Relaxed);
            buf_padded[..buf.len()].copy_from_slice(&buf);
            buf.clear();

            let flush_id = next_flush_id;
            next_flush_id += 1;
            assert_eq!(mod_pow2(virtual_tail, pages.spage_size_pow2), 0);
            let mut physical_offset = virtual_tail % data_dev.len();
            let write_wraparound_at = if physical_offset + u64!(buf_padded.len()) > data_dev.len() {
              // We don't have enough room at the end, so wraparound.
              // TODO This is not efficient use of space if `buf_padded` is large, as some entries could probably still fit.
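              // For example (illustrative): if the next physical spage is the last one in the data region and the batch needs two spages, a spage whose first byte is the Wraparound marker is written at that last physical spage, the virtual tail jumps past the remainder of the region, and the batch itself is written starting at physical offset 0.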
674              warn!("log buffer wrapped");
675              let write_wraparound_at = physical_offset;
676              virtual_tail += data_dev.len() - physical_offset;
677              physical_offset = 0;
678              assert_eq!(virtual_tail % data_dev.len(), physical_offset);
679              Some(write_wraparound_at)
680            } else {
681              None
682            };
683            virtual_tail += u64!(buf_padded.len());
684            // Check this AFTER possible wraparound. This is always safe as `head` can only increase, so we can only have false positives, not false negatives.
685            if virtual_tail - virtual_pointers.blocking_read().head >= data_dev.len() {
686              // TODO Better handling.
687              panic!("out of log buffer space");
688            };
689
690            let new_virtual_tail_to_write = virtual_tail;
691            let pending_log_flush = pending_log_flush.drain(..).collect_vec();
692            last_flush_time = now;
693            handle.spawn({
694              let completer_send = completer_send.clone();
695              let data_dev = data_dev.clone();
696              let metrics = metrics.clone();
697              let pages = pages.clone();
698              async move {
699                let flush_write_started = Instant::now();
700                if let Some(write_wraparound_at) = write_wraparound_at {
701                  let mut raw = pages.allocate_uninitialised(pages.spage_size());
702                  raw[0] = LogBufferPersistedEntry::Wraparound as u8;
703                  data_dev.write_at(write_wraparound_at, raw).await;
704                }
705                data_dev.write_at(physical_offset, buf_padded).await;
706                metrics
707                  .0
708                  .log_buffer_flush_write_us
709                  .fetch_add(u64!(flush_write_started.elapsed().as_micros()), Relaxed);
710                assert!(completer_send
711                  .send((flush_id, CompletedFlushesBacklogEntry {
712                    new_virtual_tail_to_write,
713                    started: now,
714                    tasks: pending_log_flush,
715                  }))
716                  .is_ok());
717              }
718            });
719          };
720        }
721      }
722    });
723  }
724
725  pub async fn wait_for_any_current_commit(&self) {
726    while self.currently_committing.load(Relaxed) {
727      sleep(Duration::from_millis(1)).await;
728    }
729  }
730
731  pub async fn read_tuple(&self, key_raw: TinyBuf) -> Option<ObjectTupleData> {
732    let hash = blake3::hash(&key_raw);
733    let bundle_idx = get_bundle_index_for_key(hash.as_bytes(), self.bundle_count);
734    let key = ObjectTupleKey::from_raw_and_hash(key_raw, hash);
    // We're taking a bold step here and not using any in-memory bundle cache, because each read of a random spage is extremely fast on NVMe devices and (assuming good hashing and bucket load factor) we should very rarely re-read a bundle unless we re-read the same key (which we don't need to optimise for).
    // We don't even need to acquire a read lock, because even if the log commits just as we're about to read or are reading, the bundle spage read should still be atomic (i.e. reflect either the state before or the state after the commit), and either state is legal and correct. For a similar reason, it's safe to read any `self.overlay` map entry; all entries represent legal persisted state.
    // WARNING: We must never return a value that has not yet been persisted to the log or a bundle, even if it's in memory, as that gives misleading confirmation of durable persistence.
    let overlay = self.overlay.read().clone();
    match overlay
      .uncommitted
      .get(&key)
      .map(|e| e.clone())
      .or_else(|| {
        overlay
          .committing
          .as_ref()
          .and_then(|m| m.get(&key).map(|e| e.clone()))
      }) {
      Some(e) => match e {
        LogBufferOverlayEntry::Deleted {} => None,
        LogBufferOverlayEntry::Upserted { data } => Some(data),
      },
      None => BundleDeserialiser::new(
        self
          .bundles_dev
          .read_at(
            bundle_idx * self.pages.spage_size(),
            self.pages.spage_size(),
          )
          .await,
      )
      .find(|(tuple_key, _data)| tuple_key == &key)
      .map(|(_, data)| data),
    }
  }

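  /// Enqueues an upsert and resolves once the entry has been durably written to the log buffer (it may not yet have been committed to its bundle).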
  pub async fn upsert_tuple(&self, key: TinyBuf, data: ObjectTupleData) {
    let key = ObjectTupleKey::from_raw(key);
    let (fut, signal) = SignalFuture::new();
    self
      .sender
      .send(BundleTask::Upsert { key, data, signal })
      .unwrap();
    fut.await;
  }

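  /// Enqueues a delete and resolves once the delete entry has been durably written to the log buffer.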
  pub async fn delete_tuple(&self, key: TinyBuf) {
    let key = ObjectTupleKey::from_raw(key);
    let (fut, signal) = SignalFuture::new();
    self
      .sender
      .send(BundleTask::Delete { key, signal })
      .unwrap();
    fut.await;
  }
}