//! libblobd_direct — crate root (`lib.rs`).
1#![allow(non_snake_case)]
2
3use crate::backing_store::file::FileBackingStore;
4#[cfg(target_os = "linux")]
5use crate::backing_store::uring::UringBackingStore;
6#[cfg(target_os = "linux")]
7use crate::backing_store::uring::UringCfg;
8use crate::backing_store::BackingStore;
9use crate::backing_store::PartitionStore;
10use crate::pages::Pages;
11use crate::partition::PartitionLoader;
12use ahash::HashMap;
13use chrono::DateTime;
14use chrono::Utc;
15use exporter::BlobdExporter;
16use exporter::BlobdExporterMarker;
17use futures::future::join_all;
18use futures::stream::iter;
19use futures::StreamExt;
20use itertools::Itertools;
21use metrics::BlobdMetrics;
22use objects::ClusterLoadProgress;
23use off64::usz;
24use op::commit_object::op_commit_object;
25use op::commit_object::OpCommitObjectInput;
26use op::commit_object::OpCommitObjectOutput;
27use op::create_object::op_create_object;
28use op::create_object::OpCreateObjectInput;
29use op::create_object::OpCreateObjectOutput;
30use op::delete_object::op_delete_object;
31use op::delete_object::OpDeleteObjectInput;
32use op::delete_object::OpDeleteObjectOutput;
33use op::inspect_object::op_inspect_object;
34use op::inspect_object::OpInspectObjectInput;
35use op::inspect_object::OpInspectObjectOutput;
36use op::read_object::op_read_object;
37use op::read_object::OpReadObjectInput;
38use op::read_object::OpReadObjectOutput;
39use op::write_object::op_write_object;
40use op::write_object::OpWriteObjectInput;
41use op::write_object::OpWriteObjectOutput;
42use op::OpResult;
43use partition::Partition;
44use std::error::Error;
45use std::fs::OpenOptions;
46#[cfg(target_os = "linux")]
47use std::os::unix::prelude::OpenOptionsExt;
48use std::path::PathBuf;
49use std::sync::atomic::Ordering;
50use std::sync::Arc;
51use tokio::spawn;
52use tokio::time::sleep;
53use tracing::info;
54use tracing::info_span;
55use tracing::Instrument;
56
57pub mod allocator;
58pub mod backing_store;
59pub mod ctx;
60pub mod exporter;
61pub mod incomplete_token;
62pub mod metrics;
63pub mod object;
64pub mod objects;
65pub mod op;
66pub mod pages;
67pub mod partition;
68pub mod tuples;
69pub mod util;
70
/// Configuration for a single partition: which file backs it and the byte
/// range within that file it occupies. Multiple partitions may share one
/// backing file (the loader deduplicates device handles by path).
#[derive(Clone, Debug)]
pub struct BlobdCfgPartition {
  /// Backing file for this partition. This file will be opened with O_RDWR | O_DIRECT.
  pub path: PathBuf,
  /// Byte offset within the backing file where this partition begins. This must be a multiple of the lpage size.
  pub offset: u64,
  /// Length of this partition in bytes. This must be a multiple of the lpage size.
  pub len: u64,
}
80
/// Selects the I/O backend used to read and write the backing device.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum BlobdCfgBackingStore {
  /// io_uring-based backend (Linux only); see `UringBackingStore`.
  #[cfg(target_os = "linux")]
  Uring,
  /// Portable backend using standard file I/O; see `FileBackingStore`.
  File,
}
87
/// Global configuration shared by all partitions of a blobd instance.
#[derive(Clone, Debug)]
pub struct BlobdCfg {
  /// Which I/O backend to use; see `BlobdCfgBackingStore`.
  pub backing_store: BlobdCfgBackingStore,
  /// Seconds after which an uncommitted (incomplete) object expires. This must be much greater than zero.
  pub expire_incomplete_objects_after_secs: u64,
  /// Large page size as a power of two (lpage size == 2^lpage_size_pow2 bytes).
  pub lpage_size_pow2: u8,
  /// The amount of bytes per partition to reserve for storing object tuples. This can be expanded online later on, but only up to the leftmost heap allocation, so it's worth setting this to a high value. This will be rounded up to the nearest multiple of the lpage size.
  pub object_tuples_area_reserved_space: u64,
  /// Small page size as a power of two (spage size == 2^spage_size_pow2 bytes). The device must support atomic writes of this size. It's recommended to use the physical sector size, instead of the logical sector size, for better performance. On Linux, use `blockdev --getpbsz /dev/my_device` to get the physical sector size.
  pub spage_size_pow2: u8,
  /// Advanced options, only change if you know what you're doing.
  #[cfg(target_os = "linux")]
  pub uring_coop_taskrun: bool,
  #[cfg(target_os = "linux")]
  pub uring_defer_taskrun: bool,
  #[cfg(target_os = "linux")]
  pub uring_iopoll: bool,
  /// When `Some(idle_ms)`, enables io_uring SQPOLL with the given idle timeout.
  #[cfg(target_os = "linux")]
  pub uring_sqpoll: Option<u32>,
}
108
109impl BlobdCfg {
110  pub fn lpage_size(&self) -> u64 {
111    1 << self.lpage_size_pow2
112  }
113
114  pub fn spage_size(&self) -> u64 {
115    1 << self.spage_size_pow2
116  }
117}
118
/// Stateful builder that opens the backing devices and prepares each
/// partition; consumed by `load_and_start` to produce a running `Blobd`.
pub struct BlobdLoader {
  // Global configuration, cloned into every partition loader.
  cfg: BlobdCfg,
  // Page-size helper shared by all partitions and backing stores.
  pages: Pages,
  // One loader per entry in the partition configuration, in input order.
  partitions: Vec<PartitionLoader>,
  // Metrics registry shared across all partitions.
  metrics: BlobdMetrics,
}
125
impl BlobdLoader {
  /// Opens the backing device for every configured partition and builds a
  /// `PartitionLoader` per partition. Devices are deduplicated by file path,
  /// so multiple partitions on the same file share one `BackingStore`.
  ///
  /// # Panics
  /// - If `cfg.expire_incomplete_objects_after_secs` is zero.
  /// - If any backing file cannot be opened (the open is `unwrap`ped).
  pub fn new(partition_cfg: Vec<BlobdCfgPartition>, cfg: BlobdCfg) -> Self {
    assert!(cfg.expire_incomplete_objects_after_secs > 0);

    let metrics = BlobdMetrics::default();
    let pages = Pages::new(cfg.spage_size_pow2, cfg.lpage_size_pow2);
    // Cache of opened devices keyed by path, so each file is opened once.
    let mut devices = HashMap::<PathBuf, Arc<dyn BackingStore>>::default();
    let partitions = partition_cfg
      .into_iter()
      .enumerate()
      .map(|(i, part)| {
        let dev = devices.entry(part.path.clone()).or_insert_with(|| {
          let file = {
            let mut opt = OpenOptions::new();
            opt.read(true).write(true);
            // O_DIRECT bypasses the page cache; only available on Linux.
            #[cfg(target_os = "linux")]
            opt.custom_flags(libc::O_DIRECT);
            opt.open(&part.path).unwrap()
          };
          // Both arms coerce to `Arc<dyn BackingStore>` via the map's value type.
          match cfg.backing_store {
            #[cfg(target_os = "linux")]
            BlobdCfgBackingStore::Uring => {
              Arc::new(UringBackingStore::new(file, pages.clone(), UringCfg {
                coop_taskrun: cfg.uring_coop_taskrun,
                defer_taskrun: cfg.uring_defer_taskrun,
                iopoll: cfg.uring_iopoll,
                sqpoll: cfg.uring_sqpoll,
              }))
            }
            BlobdCfgBackingStore::File => Arc::new(FileBackingStore::new(file, pages.clone())),
          }
        });
        PartitionLoader::new(
          i,
          // Each partition views only its own byte range of the shared device.
          PartitionStore::new(dev.clone(), part.offset, part.len),
          cfg.clone(),
          pages.clone(),
          metrics.clone(),
        )
      })
      .collect_vec();

    Self {
      cfg,
      pages,
      partitions,
      metrics,
    }
  }

  /// Formats all partitions concurrently, erasing any existing state.
  pub async fn format(&self) {
    iter(&self.partitions)
      .for_each_concurrent(None, |p| async move {
        p.format().await;
      })
      .await;
  }

  /// Provide BlobdExporterEntry::default if no offset.
  pub async fn export(&self, offset: BlobdExporterMarker) -> BlobdExporter {
    BlobdExporter::new(&self.partitions, &self.pages, offset).await
  }

  /// Loads all partitions concurrently and starts the instance, consuming the
  /// loader. A background task logs loading progress every 3 seconds until
  /// every partition has completed.
  pub async fn load_and_start(self) -> Blobd {
    let progress: Arc<ClusterLoadProgress> = Default::default();

    // Progress reporter: exits once all partitions report completion.
    spawn({
      let partition_count = self.partitions.len();
      let progress = progress.clone();
      async move {
        loop {
          sleep(std::time::Duration::from_secs(3)).await;
          if progress.partitions_completed.load(Ordering::Relaxed) >= partition_count {
            break;
          };
          info!(
            objects_loaded = progress.objects_loaded.load(Ordering::Relaxed),
            objects_total = progress.objects_total.load(Ordering::Relaxed),
            "initial loading progress"
          );
        }
      }
    });

    // Load every partition concurrently; `join_all` preserves input order.
    let partitions = join_all(self.partitions.into_iter().map(|p| {
      let progress = progress.clone();
      async move { p.load_and_start(progress).await }
    }))
    .await;

    Blobd {
      cfg: self.cfg,
      partitions: Arc::new(partitions),
      metrics: self.metrics,
    }
  }
}
223
/// A running blobd instance. Cheap to clone: partitions are shared via `Arc`,
/// and cfg/metrics clones are per the respective types' `Clone` impls.
#[derive(Clone)]
pub struct Blobd {
  // Immutable after construction; exposed read-only via `cfg()`.
  cfg: BlobdCfg,
  // All loaded partitions; objects are routed to one by key hash.
  partitions: Arc<Vec<Partition>>,
  // Shared metrics registry; exposed via `metrics()`.
  metrics: BlobdMetrics,
}
230
/// One committed object's metadata, as yielded by `Blobd::list_objects`.
pub struct BlobdListObjectsOutputObject {
  /// Object key (raw bytes, as provided at creation).
  pub key: Vec<u8>,
  /// Creation timestamp of the object.
  pub created: DateTime<Utc>,
  /// Object size in bytes.
  pub size: u64,
  /// Unique object ID.
  pub id: u128,
}
237
impl Blobd {
  // Provide getter to prevent mutating BlobdCfg.
  pub fn cfg(&self) -> &BlobdCfg {
    &self.cfg
  }

  /// Read-only access to the shared metrics registry.
  pub fn metrics(&self) -> &BlobdMetrics {
    &self.metrics
  }

  /// Routes an object key to a partition by hashing the key (xxh3) modulo the
  /// partition count. All key-based ops below use this, so a given key always
  /// maps to the same partition for a fixed partition count.
  fn get_partition_index_by_object_key(&self, key: &[u8]) -> usize {
    let hash = twox_hash::xxh3::hash64(key);
    // Plain modulo rather than bitmask: requiring power-of-two partition counts would be too inflexible and costly.
    usz!(hash) % self.partitions.len()
  }

  /// Commits a previously created+written object, making it visible to reads.
  /// The partition comes from the incomplete token, not the key hash.
  pub async fn commit_object(&self, input: OpCommitObjectInput) -> OpResult<OpCommitObjectOutput> {
    let partition_index = input.incomplete_token.partition_idx;
    // NOTE: `partition_index` is captured by name as a span field; do not rename.
    let span = info_span!("commit op", partition_index);
    op_commit_object(self.partitions[partition_index].ctx.clone(), input)
      .instrument(span)
      .await
  }

  /// Creates a new incomplete object on the partition chosen by the key hash.
  pub async fn create_object(&self, input: OpCreateObjectInput) -> OpResult<OpCreateObjectOutput> {
    let partition_index = self.get_partition_index_by_object_key(&input.key);
    let span = info_span!("create op", partition_index);
    op_create_object(self.partitions[partition_index].ctx.clone(), input)
      .instrument(span)
      .await
  }

  /// Deletes the object with the given key from its partition.
  pub async fn delete_object(&self, input: OpDeleteObjectInput) -> OpResult<OpDeleteObjectOutput> {
    let partition_index = self.get_partition_index_by_object_key(&input.key);
    let span = info_span!("delete op", partition_index);
    op_delete_object(self.partitions[partition_index].ctx.clone(), input)
      .instrument(span)
      .await
  }

  /// Returns metadata for the object with the given key.
  pub async fn inspect_object(
    &self,
    input: OpInspectObjectInput,
  ) -> OpResult<OpInspectObjectOutput> {
    let partition_index = self.get_partition_index_by_object_key(&input.key);
    let span = info_span!("inspect op", partition_index);
    op_inspect_object(self.partitions[partition_index].ctx.clone(), input)
      .instrument(span)
      .await
  }

  /// WARNING: Use this method sparingly and with awareness of its behaviour:
  /// - Deadlocks could occur if iteration occurs across threads, locks, or await points.
  /// - There could be significant performance drops; some or all state operations (e.g. create, commit, delete) may be locked for the entirety of the iteration.
  /// - There is no guarantee of consistency; object entries could be duplicated or skipped, and how entries for objects that are created, committed, or deleted during iteration are iterated is undefined.
  /// - There is definitely no defined order.
  pub fn list_objects(&self) -> impl Iterator<Item = BlobdListObjectsOutputObject> + '_ {
    self.partitions.iter().flat_map(|partition| {
      partition
        .ctx
        .committed_objects
        .iter()
        .map(|o| BlobdListObjectsOutputObject {
          created: o.created,
          id: o.id(),
          key: o.key.clone(),
          size: o.size,
        })
    })
  }

  /// Reads (part of) a committed object by key.
  pub async fn read_object(&self, input: OpReadObjectInput) -> OpResult<OpReadObjectOutput> {
    let partition_index = self.get_partition_index_by_object_key(&input.key);
    let span = info_span!("read op", partition_index);
    op_read_object(self.partitions[partition_index].ctx.clone(), input)
      .instrument(span)
      .await
  }

  /// Streams data into an incomplete object. Like `commit_object`, the
  /// partition comes from the incomplete token rather than the key hash.
  pub async fn write_object<
    D: AsRef<[u8]>,
    S: Unpin + futures::Stream<Item = Result<D, Box<dyn Error + Send + Sync>>>,
  >(
    &self,
    input: OpWriteObjectInput<D, S>,
  ) -> OpResult<OpWriteObjectOutput> {
    let partition_index = input.incomplete_token.partition_idx;
    let span = info_span!("write op", partition_index);
    op_write_object(self.partitions[partition_index].ctx.clone(), input)
      .instrument(span)
      .await
  }
}