libblobd_direct/
exporter.rs1use crate::backing_store::PartitionStore;
2use crate::object::calc_object_layout;
3use crate::object::ObjectMetadata;
4use crate::object::ObjectState;
5use crate::object::ObjectTuple;
6use crate::pages::Pages;
7use crate::partition::PartitionLoader;
8use crate::tuples::load_tuples_from_raw_tuples_area;
9use bufpool::buf::Buf;
10use chrono::DateTime;
11use chrono::Utc;
12use futures::stream::iter;
13use futures::Stream;
14use futures::StreamExt;
15use off64::usz;
16use parking_lot::Mutex;
17use serde::Deserialize;
18use serde::Serialize;
19use std::collections::VecDeque;
20use std::pin::Pin;
21use std::sync::Arc;
22
/// Resumable cursor into an export. Identifies an object by its ID and the
/// partition it lives in; callers can persist it (it is `Serialize`/
/// `Deserialize`) and later construct a new exporter starting from it.
#[derive(PartialEq, Eq, Clone, Copy, Debug, Default, Serialize, Deserialize)]
pub struct BlobdExporterMarker {
  // Object ID; primary key of the ordering (see the manual `Ord` impl).
  object_id: u128,
  // Index of the owning partition; secondary key of the ordering.
  partition: usize,
}
28
29impl Ord for BlobdExporterMarker {
30 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
31 self
33 .object_id
34 .cmp(&other.object_id)
35 .then_with(|| self.partition.cmp(&other.partition))
36 }
37}
38
39impl PartialOrd for BlobdExporterMarker {
40 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
41 Some(self.cmp(other))
42 }
43}
44
/// One object as yielded by an export: its identity, metadata, and a lazy
/// stream over its data.
pub struct BlobdExportedObject {
  // Globally unique object ID.
  pub id: u128,
  // Object key, as raw bytes.
  pub key: Vec<u8>,
  // Object data size in bytes; the stream yields exactly this many bytes.
  pub size: u64,
  // Creation timestamp recorded in the object's metadata.
  pub created: DateTime<Utc>,
  // Lazy stream of data chunks (one per stored page); pages are only read
  // from the device as the stream is polled.
  pub data_stream: Pin<Box<dyn Stream<Item = Buf> + Send>>,
}
53
/// Handle to a single exportable object: enough state to read its metadata
/// and data from the partition device on demand (see `read`).
pub struct BlobdExporterEntry {
  // The object's tuple (ID, state, metadata page location).
  tuple: ObjectTuple,
  // Device of the partition holding this object.
  partition_dev: PartitionStore,
  // Page-size configuration, needed to compute the object's data layout.
  pages: Pages,
}
59
impl BlobdExporterEntry {
  /// Read the object's metadata page and build a `BlobdExportedObject` whose
  /// `data_stream` lazily reads the object's data pages.
  ///
  /// Only the metadata page is read here; data pages are read one at a time
  /// as the returned stream is polled.
  pub async fn read(&self) -> BlobdExportedObject {
    let dev = self.partition_dev.clone();
    let pages = &self.pages;
    let tuple = &self.tuple;

    // The metadata page holds the msgpack-serialized ObjectMetadata.
    let metadata_raw = dev
      .read_at(
        tuple.metadata_dev_offset,
        1 << tuple.metadata_page_size_pow2,
      )
      .await;

    // NOTE(review): panics if the metadata page is corrupt or fails to
    // deserialize — confirm this is the intended failure mode for exports.
    let ObjectMetadata {
      size: object_size,
      created,
      key,
      lpage_dev_offsets,
      tail_page_dev_offsets,
    } = rmp_serde::from_slice(&metadata_raw).unwrap();
    let layout = calc_object_layout(&pages, object_size);

    // Build the list of (device offset, length) reads covering the object's
    // data: all full-size lpages first, then the variably-sized tail pages.
    let mut reads = Vec::new();
    for dev_offset in lpage_dev_offsets {
      reads.push((dev_offset, pages.lpage_size()));
    }
    for (i, sz) in layout.tail_page_sizes_pow2 {
      reads.push((tail_page_dev_offsets[usz!(i)], 1 << sz));
    }

    // Lazily read each page in order. The final page may be only partially
    // filled, so each chunk is truncated to the bytes remaining; for all
    // earlier pages `rem >= len` and the truncate is a no-op.
    let stream = async_stream::stream! {
      let mut rem = object_size;
      for (offset, len) in reads {
        let mut chunk = dev.read_at(offset, len).await;
        chunk.truncate(usz!(rem));
        // saturating_sub: on the last page `len` may exceed `rem`.
        rem = rem.saturating_sub(len);
        yield chunk;
      };
    };

    BlobdExportedObject {
      created,
      id: tuple.id,
      key,
      size: object_size,
      data_stream: Box::pin(stream),
    }
  }
}
111
/// Snapshot of all committed objects at construction time, ordered by marker,
/// ready to be drained via `pop` (or taken wholesale via `take`).
pub struct BlobdExporter {
  // Entries sorted ascending by marker; popped from the front.
  entries: VecDeque<(BlobdExporterMarker, BlobdExporterEntry)>,
}
116
117impl BlobdExporter {
118 pub(crate) async fn new(
119 partitions: &[PartitionLoader],
120 pages: &Pages,
121 offset: BlobdExporterMarker,
122 ) -> BlobdExporter {
123 let raw_tuple_areas: Arc<Mutex<Vec<(usize, Buf)>>> = Default::default();
124 iter(partitions.iter().enumerate())
125 .for_each_concurrent(None, |(id, p)| {
126 let raw_tuple_areas = raw_tuple_areas.clone();
127 async move {
128 let raw = p.load_raw_tuples_area().await;
130 raw_tuple_areas.lock().push((id, raw));
131 }
132 })
133 .await;
134 let mut entries = Vec::new();
135 for (part_id, raw) in Arc::into_inner(raw_tuple_areas).unwrap().into_inner() {
136 load_tuples_from_raw_tuples_area(&raw, pages, |_, tuple| {
137 let marker = BlobdExporterMarker {
138 object_id: tuple.id,
139 partition: part_id,
140 };
141 if marker >= offset && tuple.state == ObjectState::Committed {
142 entries.push((marker, BlobdExporterEntry {
143 tuple,
144 pages: pages.clone(),
145 partition_dev: partitions[part_id].dev.clone(),
146 }));
147 };
148 });
149 }
150 entries.sort_unstable_by_key(|e| e.0);
151 Self {
152 entries: entries.into(),
153 }
154 }
155
156 pub fn pop(&mut self) -> Option<(BlobdExporterMarker, BlobdExporterEntry)> {
157 self.entries.pop_front()
158 }
159
160 pub fn take(self) -> VecDeque<(BlobdExporterMarker, BlobdExporterEntry)> {
161 self.entries
162 }
163}