1#![allow(non_snake_case)]
2
3use crate::backing_store::file::FileBackingStore;
4#[cfg(target_os = "linux")]
5use crate::backing_store::uring::UringBackingStore;
6#[cfg(target_os = "linux")]
7use crate::backing_store::uring::UringCfg;
8use crate::backing_store::BackingStore;
9use crate::backing_store::PartitionStore;
10use crate::pages::Pages;
11use crate::partition::PartitionLoader;
12use ahash::HashMap;
13use chrono::DateTime;
14use chrono::Utc;
15use exporter::BlobdExporter;
16use exporter::BlobdExporterMarker;
17use futures::future::join_all;
18use futures::stream::iter;
19use futures::StreamExt;
20use itertools::Itertools;
21use metrics::BlobdMetrics;
22use objects::ClusterLoadProgress;
23use off64::usz;
24use op::commit_object::op_commit_object;
25use op::commit_object::OpCommitObjectInput;
26use op::commit_object::OpCommitObjectOutput;
27use op::create_object::op_create_object;
28use op::create_object::OpCreateObjectInput;
29use op::create_object::OpCreateObjectOutput;
30use op::delete_object::op_delete_object;
31use op::delete_object::OpDeleteObjectInput;
32use op::delete_object::OpDeleteObjectOutput;
33use op::inspect_object::op_inspect_object;
34use op::inspect_object::OpInspectObjectInput;
35use op::inspect_object::OpInspectObjectOutput;
36use op::read_object::op_read_object;
37use op::read_object::OpReadObjectInput;
38use op::read_object::OpReadObjectOutput;
39use op::write_object::op_write_object;
40use op::write_object::OpWriteObjectInput;
41use op::write_object::OpWriteObjectOutput;
42use op::OpResult;
43use partition::Partition;
44use std::error::Error;
45use std::fs::OpenOptions;
46#[cfg(target_os = "linux")]
47use std::os::unix::prelude::OpenOptionsExt;
48use std::path::PathBuf;
49use std::sync::atomic::Ordering;
50use std::sync::Arc;
51use tokio::spawn;
52use tokio::time::sleep;
53use tracing::info;
54use tracing::info_span;
55use tracing::Instrument;
56
57pub mod allocator;
58pub mod backing_store;
59pub mod ctx;
60pub mod exporter;
61pub mod incomplete_token;
62pub mod metrics;
63pub mod object;
64pub mod objects;
65pub mod op;
66pub mod pages;
67pub mod partition;
68pub mod tuples;
69pub mod util;
70
/// One partition of the store: a byte range on a device or file.
///
/// Partitions whose `path` values are equal share a single opened backing store
/// (see `BlobdLoader::new`), so several partitions may live on one device.
#[derive(Clone, Debug)]
pub struct BlobdCfgPartition {
    /// Device or file to open. It is opened read/write, with `O_DIRECT` on Linux.
    pub path: PathBuf,
    /// Start of this partition within `path`.
    /// NOTE(review): presumably a byte offset; passed unchanged to `PartitionStore::new`. Confirm.
    pub offset: u64,
    /// Length of this partition within `path`.
    /// NOTE(review): presumably in bytes; passed unchanged to `PartitionStore::new`. Confirm.
    pub len: u64,
}
80
/// Selects which `BackingStore` implementation wraps each opened partition device.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum BlobdCfgBackingStore {
    /// Use `UringBackingStore`, configured from the `uring_*` fields of `BlobdCfg`.
    /// Only available on Linux.
    #[cfg(target_os = "linux")]
    Uring,
    /// Use `FileBackingStore`.
    File,
}
87
/// Store-wide configuration. A clone is handed to every partition.
#[derive(Clone, Debug)]
pub struct BlobdCfg {
    /// I/O backend used for every partition device.
    pub backing_store: BlobdCfgBackingStore,
    /// Seconds after which incomplete objects expire. Must be non-zero; `BlobdLoader::new`
    /// asserts this.
    /// NOTE(review): "incomplete" presumably means created but not yet committed; confirm in
    /// the partition code.
    pub expire_incomplete_objects_after_secs: u64,
    /// Base-2 logarithm of the large page size; see `BlobdCfg::lpage_size`.
    pub lpage_size_pow2: u8,
    /// Not read in this file; it reaches partitions through the cloned config.
    /// NOTE(review): presumably the space reserved for the object tuples area. Confirm units.
    pub object_tuples_area_reserved_space: u64,
    /// Base-2 logarithm of the small page size; see `BlobdCfg::spage_size`.
    pub spage_size_pow2: u8,
    /// Forwarded to `UringCfg::coop_taskrun` when `backing_store` is `Uring`.
    #[cfg(target_os = "linux")]
    pub uring_coop_taskrun: bool,
    /// Forwarded to `UringCfg::defer_taskrun` when `backing_store` is `Uring`.
    #[cfg(target_os = "linux")]
    pub uring_defer_taskrun: bool,
    /// Forwarded to `UringCfg::iopoll` when `backing_store` is `Uring`.
    #[cfg(target_os = "linux")]
    pub uring_iopoll: bool,
    /// Forwarded to `UringCfg::sqpoll` when `backing_store` is `Uring`.
    /// NOTE(review): presumably enables SQ polling when `Some`; confirm meaning of the value.
    #[cfg(target_os = "linux")]
    pub uring_sqpoll: Option<u32>,
}
108
109impl BlobdCfg {
110 pub fn lpage_size(&self) -> u64 {
111 1 << self.lpage_size_pow2
112 }
113
114 pub fn spage_size(&self) -> u64 {
115 1 << self.spage_size_pow2
116 }
117}
118
/// A store that has been configured but not started yet.
///
/// Built by `BlobdLoader::new`. From here, `format` initializes partitions, `export` builds a
/// `BlobdExporter`, and `load_and_start` consumes the loader and yields a running `Blobd`.
pub struct BlobdLoader {
    cfg: BlobdCfg,
    pages: Pages,
    /// One loader per configured partition, in configuration order.
    partitions: Vec<PartitionLoader>,
    /// The same metrics handle is cloned into every partition loader.
    metrics: BlobdMetrics,
}
125
126impl BlobdLoader {
127 pub fn new(partition_cfg: Vec<BlobdCfgPartition>, cfg: BlobdCfg) -> Self {
128 assert!(cfg.expire_incomplete_objects_after_secs > 0);
129
130 let metrics = BlobdMetrics::default();
131 let pages = Pages::new(cfg.spage_size_pow2, cfg.lpage_size_pow2);
132 let mut devices = HashMap::<PathBuf, Arc<dyn BackingStore>>::default();
133 let partitions = partition_cfg
134 .into_iter()
135 .enumerate()
136 .map(|(i, part)| {
137 let dev = devices.entry(part.path.clone()).or_insert_with(|| {
138 let file = {
139 let mut opt = OpenOptions::new();
140 opt.read(true).write(true);
141 #[cfg(target_os = "linux")]
142 opt.custom_flags(libc::O_DIRECT);
143 opt.open(&part.path).unwrap()
144 };
145 match cfg.backing_store {
146 #[cfg(target_os = "linux")]
147 BlobdCfgBackingStore::Uring => {
148 Arc::new(UringBackingStore::new(file, pages.clone(), UringCfg {
149 coop_taskrun: cfg.uring_coop_taskrun,
150 defer_taskrun: cfg.uring_defer_taskrun,
151 iopoll: cfg.uring_iopoll,
152 sqpoll: cfg.uring_sqpoll,
153 }))
154 }
155 BlobdCfgBackingStore::File => Arc::new(FileBackingStore::new(file, pages.clone())),
156 }
157 });
158 PartitionLoader::new(
159 i,
160 PartitionStore::new(dev.clone(), part.offset, part.len),
161 cfg.clone(),
162 pages.clone(),
163 metrics.clone(),
164 )
165 })
166 .collect_vec();
167
168 Self {
169 cfg,
170 pages,
171 partitions,
172 metrics,
173 }
174 }
175
176 pub async fn format(&self) {
177 iter(&self.partitions)
178 .for_each_concurrent(None, |p| async move {
179 p.format().await;
180 })
181 .await;
182 }
183
184 pub async fn export(&self, offset: BlobdExporterMarker) -> BlobdExporter {
186 BlobdExporter::new(&self.partitions, &self.pages, offset).await
187 }
188
189 pub async fn load_and_start(self) -> Blobd {
190 let progress: Arc<ClusterLoadProgress> = Default::default();
191
192 spawn({
193 let partition_count = self.partitions.len();
194 let progress = progress.clone();
195 async move {
196 loop {
197 sleep(std::time::Duration::from_secs(3)).await;
198 if progress.partitions_completed.load(Ordering::Relaxed) >= partition_count {
199 break;
200 };
201 info!(
202 objects_loaded = progress.objects_loaded.load(Ordering::Relaxed),
203 objects_total = progress.objects_total.load(Ordering::Relaxed),
204 "initial loading progress"
205 );
206 }
207 }
208 });
209
210 let partitions = join_all(self.partitions.into_iter().map(|p| {
211 let progress = progress.clone();
212 async move { p.load_and_start(progress).await }
213 }))
214 .await;
215
216 Blobd {
217 cfg: self.cfg,
218 partitions: Arc::new(partitions),
219 metrics: self.metrics,
220 }
221 }
222}
223
/// Handle to a running store, produced by `BlobdLoader::load_and_start`.
///
/// Cloning shares the partition list, which is held in an `Arc`. Object operations are routed
/// either by a hash of the object key or by the partition index carried in an incomplete token.
#[derive(Clone)]
pub struct Blobd {
    cfg: BlobdCfg,
    partitions: Arc<Vec<Partition>>,
    metrics: BlobdMetrics,
}
230
/// One entry yielded by `Blobd::list_objects`, copied from a partition's committed objects.
pub struct BlobdListObjectsOutputObject {
    /// Object key (an owned copy).
    pub key: Vec<u8>,
    /// Creation timestamp of the committed object.
    pub created: DateTime<Utc>,
    /// Object size.
    /// NOTE(review): presumably in bytes; confirm.
    pub size: u64,
    /// Object ID, as returned by the committed object's `id()`.
    pub id: u128,
}
237
238impl Blobd {
239 pub fn cfg(&self) -> &BlobdCfg {
241 &self.cfg
242 }
243
244 pub fn metrics(&self) -> &BlobdMetrics {
245 &self.metrics
246 }
247
248 fn get_partition_index_by_object_key(&self, key: &[u8]) -> usize {
249 let hash = twox_hash::xxh3::hash64(key);
250 usz!(hash) % self.partitions.len()
252 }
253
254 pub async fn commit_object(&self, input: OpCommitObjectInput) -> OpResult<OpCommitObjectOutput> {
255 let partition_index = input.incomplete_token.partition_idx;
256 let span = info_span!("commit op", partition_index);
257 op_commit_object(self.partitions[partition_index].ctx.clone(), input)
258 .instrument(span)
259 .await
260 }
261
262 pub async fn create_object(&self, input: OpCreateObjectInput) -> OpResult<OpCreateObjectOutput> {
263 let partition_index = self.get_partition_index_by_object_key(&input.key);
264 let span = info_span!("create op", partition_index);
265 op_create_object(self.partitions[partition_index].ctx.clone(), input)
266 .instrument(span)
267 .await
268 }
269
270 pub async fn delete_object(&self, input: OpDeleteObjectInput) -> OpResult<OpDeleteObjectOutput> {
271 let partition_index = self.get_partition_index_by_object_key(&input.key);
272 let span = info_span!("delete op", partition_index);
273 op_delete_object(self.partitions[partition_index].ctx.clone(), input)
274 .instrument(span)
275 .await
276 }
277
278 pub async fn inspect_object(
279 &self,
280 input: OpInspectObjectInput,
281 ) -> OpResult<OpInspectObjectOutput> {
282 let partition_index = self.get_partition_index_by_object_key(&input.key);
283 let span = info_span!("inspect op", partition_index);
284 op_inspect_object(self.partitions[partition_index].ctx.clone(), input)
285 .instrument(span)
286 .await
287 }
288
289 pub fn list_objects(&self) -> impl Iterator<Item = BlobdListObjectsOutputObject> + '_ {
295 self.partitions.iter().flat_map(|partition| {
296 partition
297 .ctx
298 .committed_objects
299 .iter()
300 .map(|o| BlobdListObjectsOutputObject {
301 created: o.created,
302 id: o.id(),
303 key: o.key.clone(),
304 size: o.size,
305 })
306 })
307 }
308
309 pub async fn read_object(&self, input: OpReadObjectInput) -> OpResult<OpReadObjectOutput> {
310 let partition_index = self.get_partition_index_by_object_key(&input.key);
311 let span = info_span!("read op", partition_index);
312 op_read_object(self.partitions[partition_index].ctx.clone(), input)
313 .instrument(span)
314 .await
315 }
316
317 pub async fn write_object<
318 D: AsRef<[u8]>,
319 S: Unpin + futures::Stream<Item = Result<D, Box<dyn Error + Send + Sync>>>,
320 >(
321 &self,
322 input: OpWriteObjectInput<D, S>,
323 ) -> OpResult<OpWriteObjectOutput> {
324 let partition_index = input.incomplete_token.partition_idx;
325 let span = info_span!("write op", partition_index);
326 op_write_object(self.partitions[partition_index].ctx.clone(), input)
327 .instrument(span)
328 .await
329 }
330}