libblobd-direct 0.6.0

Library for blobd, direct variant
Documentation
use crate::backing_store::PartitionStore;
use crate::object::calc_object_layout;
use crate::object::ObjectMetadata;
use crate::object::ObjectState;
use crate::object::ObjectTuple;
use crate::pages::Pages;
use crate::partition::PartitionLoader;
use crate::tuples::load_tuples_from_raw_tuples_area;
use bufpool::buf::Buf;
use chrono::DateTime;
use chrono::Utc;
use futures::stream::iter;
use futures::Stream;
use futures::StreamExt;
use off64::usz;
use parking_lot::Mutex;
use serde::Deserialize;
use serde::Serialize;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;

#[derive(PartialEq, Eq, Clone, Copy, Debug, Default, Serialize, Deserialize)]
pub struct BlobdExporterMarker {
  object_id: u128,
  partition: usize,
}

impl Ord for BlobdExporterMarker {
  fn cmp(&self, other: &Self) -> std::cmp::Ordering {
    // WARNING: It's important for the order key to be `(object_id, partition)` and not the other way around as otherwise we'll keep hitting a single partition when iterating in order.
    self
      .object_id
      .cmp(&other.object_id)
      .then_with(|| self.partition.cmp(&other.partition))
  }
}

impl PartialOrd for BlobdExporterMarker {
  fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
    Some(self.cmp(other))
  }
}

pub struct BlobdExportedObject {
  // This is generally a blobd internal value and not that useful, but we'll export it anyway.
  pub id: u128,
  pub key: Vec<u8>,
  pub size: u64,
  pub created: DateTime<Utc>,
  pub data_stream: Pin<Box<dyn Stream<Item = Buf> + Send>>,
}

pub struct BlobdExporterEntry {
  tuple: ObjectTuple,
  partition_dev: PartitionStore,
  pages: Pages,
}

impl BlobdExporterEntry {
  pub async fn read(&self) -> BlobdExportedObject {
    let dev = self.partition_dev.clone();
    let pages = &self.pages;
    let tuple = &self.tuple;

    let metadata_raw = dev
      .read_at(
        tuple.metadata_dev_offset,
        1 << tuple.metadata_page_size_pow2,
      )
      .await;

    // Yes, rmp_serde stops reading once fully deserialised, and doesn't error if there is extra junk afterwards.
    let ObjectMetadata {
      size: object_size,
      created,
      key,
      lpage_dev_offsets,
      tail_page_dev_offsets,
    } = rmp_serde::from_slice(&metadata_raw).unwrap();
    let layout = calc_object_layout(&pages, object_size);

    let mut reads = Vec::new();
    for dev_offset in lpage_dev_offsets {
      reads.push((dev_offset, pages.lpage_size()));
    }
    for (i, sz) in layout.tail_page_sizes_pow2 {
      reads.push((tail_page_dev_offsets[usz!(i)], 1 << sz));
    }

    let stream = async_stream::stream! {
      let mut rem = object_size;
      for (offset, len) in reads {
        let mut chunk = dev.read_at(offset, len).await;
        // Using `rem` and calling `truncate` on each iteration is the simplest way. An alternative is to do some subtle logic on the last read, but that's trickier. Be aware that the last page may be larger than a spage, perfectly full, or non-existent (empty object).
        chunk.truncate(usz!(rem));
        rem = rem.saturating_sub(len);
        yield chunk;
      };
    };

    BlobdExportedObject {
      created,
      id: tuple.id,
      key,
      size: object_size,
      data_stream: Box::pin(stream),
    }
  }
}

// TODO Document how to handle multiple committed objects with the same key.
pub struct BlobdExporter {
  entries: VecDeque<(BlobdExporterMarker, BlobdExporterEntry)>,
}

impl BlobdExporter {
  pub(crate) async fn new(
    partitions: &[PartitionLoader],
    pages: &Pages,
    offset: BlobdExporterMarker,
  ) -> BlobdExporter {
    let raw_tuple_areas: Arc<Mutex<Vec<(usize, Buf)>>> = Default::default();
    iter(partitions.iter().enumerate())
      .for_each_concurrent(None, |(id, p)| {
        let raw_tuple_areas = raw_tuple_areas.clone();
        async move {
          // Don't inline this expression, we should not hold the lock until we've read this.
          let raw = p.load_raw_tuples_area().await;
          raw_tuple_areas.lock().push((id, raw));
        }
      })
      .await;
    let mut entries = Vec::new();
    for (part_id, raw) in Arc::into_inner(raw_tuple_areas).unwrap().into_inner() {
      load_tuples_from_raw_tuples_area(&raw, pages, |_, tuple| {
        let marker = BlobdExporterMarker {
          object_id: tuple.id,
          partition: part_id,
        };
        if marker >= offset && tuple.state == ObjectState::Committed {
          entries.push((marker, BlobdExporterEntry {
            tuple,
            pages: pages.clone(),
            partition_dev: partitions[part_id].dev.clone(),
          }));
        };
      });
    }
    entries.sort_unstable_by_key(|e| e.0);
    Self {
      entries: entries.into(),
    }
  }

  pub fn pop(&mut self) -> Option<(BlobdExporterMarker, BlobdExporterEntry)> {
    self.entries.pop_front()
  }

  pub fn take(self) -> VecDeque<(BlobdExporterMarker, BlobdExporterEntry)> {
    self.entries
  }
}