libblobd_direct/
exporter.rs

1use crate::backing_store::PartitionStore;
2use crate::object::calc_object_layout;
3use crate::object::ObjectMetadata;
4use crate::object::ObjectState;
5use crate::object::ObjectTuple;
6use crate::pages::Pages;
7use crate::partition::PartitionLoader;
8use crate::tuples::load_tuples_from_raw_tuples_area;
9use bufpool::buf::Buf;
10use chrono::DateTime;
11use chrono::Utc;
12use futures::stream::iter;
13use futures::Stream;
14use futures::StreamExt;
15use off64::usz;
16use parking_lot::Mutex;
17use serde::Deserialize;
18use serde::Serialize;
19use std::collections::VecDeque;
20use std::pin::Pin;
21use std::sync::Arc;
22
/// Resumable cursor into an export run. It's serializable so callers can
/// persist progress and later construct a `BlobdExporter` starting from it
/// (entries with `marker >= offset` are re-included).
#[derive(PartialEq, Eq, Clone, Copy, Debug, Default, Serialize, Deserialize)]
pub struct BlobdExporterMarker {
  // Internal blobd object ID; the primary sort key (see the `Ord` impl below).
  object_id: u128,
  // Index of the partition that holds the object; the secondary sort key.
  partition: usize,
}
28
29impl Ord for BlobdExporterMarker {
30  fn cmp(&self, other: &Self) -> std::cmp::Ordering {
31    // WARNING: It's important for the order key to be `(object_id, partition)` and not the other way around as otherwise we'll keep hitting a single partition when iterating in order.
32    self
33      .object_id
34      .cmp(&other.object_id)
35      .then_with(|| self.partition.cmp(&other.partition))
36  }
37}
38
39impl PartialOrd for BlobdExporterMarker {
40  fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
41    Some(self.cmp(other))
42  }
43}
44
/// A single object read out of the store, with its data exposed lazily as a
/// stream of buffers (produced by `BlobdExporterEntry::read`).
pub struct BlobdExportedObject {
  // This is generally a blobd internal value and not that useful, but we'll export it anyway.
  pub id: u128,
  // Raw object key bytes.
  pub key: Vec<u8>,
  // Object size in bytes; the concatenated `data_stream` chunks total exactly this many bytes.
  pub size: u64,
  // Creation timestamp stored in the object's metadata.
  pub created: DateTime<Utc>,
  // Lazily-read object data: one buffer per device page, with the final chunk
  // truncated to the object size.
  pub data_stream: Pin<Box<dyn Stream<Item = Buf> + Send>>,
}
53
/// Handle to one committed object awaiting export: just enough state to read
/// its metadata and data pages from the owning partition on demand.
pub struct BlobdExporterEntry {
  // Object tuple (ID, state, metadata page location) loaded from the tuples area.
  tuple: ObjectTuple,
  // Store of the partition that owns this object; all reads go through it.
  partition_dev: PartitionStore,
  // Page-size configuration, needed to compute the object's data layout.
  pages: Pages,
}
59
60impl BlobdExporterEntry {
61  pub async fn read(&self) -> BlobdExportedObject {
62    let dev = self.partition_dev.clone();
63    let pages = &self.pages;
64    let tuple = &self.tuple;
65
66    let metadata_raw = dev
67      .read_at(
68        tuple.metadata_dev_offset,
69        1 << tuple.metadata_page_size_pow2,
70      )
71      .await;
72
73    // Yes, rmp_serde stops reading once fully deserialised, and doesn't error if there is extra junk afterwards.
74    let ObjectMetadata {
75      size: object_size,
76      created,
77      key,
78      lpage_dev_offsets,
79      tail_page_dev_offsets,
80    } = rmp_serde::from_slice(&metadata_raw).unwrap();
81    let layout = calc_object_layout(&pages, object_size);
82
83    let mut reads = Vec::new();
84    for dev_offset in lpage_dev_offsets {
85      reads.push((dev_offset, pages.lpage_size()));
86    }
87    for (i, sz) in layout.tail_page_sizes_pow2 {
88      reads.push((tail_page_dev_offsets[usz!(i)], 1 << sz));
89    }
90
91    let stream = async_stream::stream! {
92      let mut rem = object_size;
93      for (offset, len) in reads {
94        let mut chunk = dev.read_at(offset, len).await;
95        // Using `rem` and calling `truncate` on each iteration is the simplest way. An alternative is to do some subtle logic on the last read, but that's trickier. Be aware that the last page may be larger than a spage, perfectly full, or non-existent (empty object).
96        chunk.truncate(usz!(rem));
97        rem = rem.saturating_sub(len);
98        yield chunk;
99      };
100    };
101
102    BlobdExportedObject {
103      created,
104      id: tuple.id,
105      key,
106      size: object_size,
107      data_stream: Box::pin(stream),
108    }
109  }
110}
111
// TODO Document how to handle multiple committed objects with the same key.
/// Exporter over all committed objects, ordered by `BlobdExporterMarker`
/// (object ID first, then partition).
pub struct BlobdExporter {
  // Entries sorted ascending by marker; consumed from the front by `pop`.
  entries: VecDeque<(BlobdExporterMarker, BlobdExporterEntry)>,
}
116
117impl BlobdExporter {
118  pub(crate) async fn new(
119    partitions: &[PartitionLoader],
120    pages: &Pages,
121    offset: BlobdExporterMarker,
122  ) -> BlobdExporter {
123    let raw_tuple_areas: Arc<Mutex<Vec<(usize, Buf)>>> = Default::default();
124    iter(partitions.iter().enumerate())
125      .for_each_concurrent(None, |(id, p)| {
126        let raw_tuple_areas = raw_tuple_areas.clone();
127        async move {
128          // Don't inline this expression, we should not hold the lock until we've read this.
129          let raw = p.load_raw_tuples_area().await;
130          raw_tuple_areas.lock().push((id, raw));
131        }
132      })
133      .await;
134    let mut entries = Vec::new();
135    for (part_id, raw) in Arc::into_inner(raw_tuple_areas).unwrap().into_inner() {
136      load_tuples_from_raw_tuples_area(&raw, pages, |_, tuple| {
137        let marker = BlobdExporterMarker {
138          object_id: tuple.id,
139          partition: part_id,
140        };
141        if marker >= offset && tuple.state == ObjectState::Committed {
142          entries.push((marker, BlobdExporterEntry {
143            tuple,
144            pages: pages.clone(),
145            partition_dev: partitions[part_id].dev.clone(),
146          }));
147        };
148      });
149    }
150    entries.sort_unstable_by_key(|e| e.0);
151    Self {
152      entries: entries.into(),
153    }
154  }
155
156  pub fn pop(&mut self) -> Option<(BlobdExporterMarker, BlobdExporterEntry)> {
157    self.entries.pop_front()
158  }
159
160  pub fn take(self) -> VecDeque<(BlobdExporterMarker, BlobdExporterEntry)> {
161    self.entries
162  }
163}