libblobd_kv/lib.rs

1#![allow(non_snake_case)]
2
3use crate::allocator::Allocator;
4use crate::backing_store::file::FileBackingStore;
5#[cfg(target_os = "linux")]
6use crate::backing_store::uring::UringBackingStore;
7#[cfg(target_os = "linux")]
8use crate::backing_store::uring::UringCfg;
9use crate::backing_store::BackingStore;
10use crate::object::LPAGE_SIZE_POW2;
11use crate::object::SPAGE_SIZE_POW2_MIN;
12use crate::pages::Pages;
13use crate::util::ceil_pow2;
14use crate::util::floor_pow2;
15use backing_store::BoundedStore;
16use ctx::Ctx;
17use log_buffer::LogBuffer;
18use metrics::BlobdMetrics;
19use object::format_device_for_tuples;
20use object::load_tuples_from_device;
21use op::delete_object::op_delete_object;
22use op::delete_object::OpDeleteObjectInput;
23use op::delete_object::OpDeleteObjectOutput;
24use op::read_object::op_read_object;
25use op::read_object::OpReadObjectInput;
26use op::read_object::OpReadObjectOutput;
27use op::write_object::op_write_object;
28use op::write_object::OpWriteObjectInput;
29use op::write_object::OpWriteObjectOutput;
30use op::OpResult;
31use parking_lot::Mutex;
32use std::fs::OpenOptions;
33#[cfg(target_os = "linux")]
34use std::os::unix::prelude::OpenOptionsExt;
35use std::path::PathBuf;
36use std::sync::Arc;
37use tokio::join;
38use tracing::info;
39
// Public modules that make up this crate.
pub mod allocator;
pub mod backing_store;
pub mod ctx;
pub mod log_buffer;
pub mod metrics;
pub mod object;
pub mod op;
pub mod pages;
pub mod util;
49
/// Selects the device I/O implementation used by the backing store.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum BlobdCfgBackingStore {
  /// io_uring-based backing store (Linux only); tuned via the `uring_*` fields on `BlobdCfg`.
  #[cfg(target_os = "linux")]
  Uring,
  /// Backing store over a plain `std::fs::File` handle.
  File,
}
56
/// Configuration for opening (or formatting) a blobd device via `BlobdLoader`.
#[derive(Clone, Debug)]
pub struct BlobdCfg {
  /// Which backing store implementation to use for device I/O.
  pub backing_store: BlobdCfgBackingStore,
  /// This file will be opened with O_RDWR | O_DIRECT.
  pub device_path: PathBuf,
  /// This must be a multiple of the lpage size.
  pub device_len: u64,
  /// Log buffer usage level at which a commit is triggered; must be strictly less than `log_buffer_size`.
  pub log_buffer_commit_threshold: u64,
  /// Size in bytes of the on-device log data region; must be greater than 64 MiB.
  pub log_buffer_size: u64,
  /// The amount of bytes to reserve for storing object tuples. This cannot be changed later on. This will be rounded up to the nearest multiple of the lpage size.
  pub object_tuples_area_reserved_space: u64,
  /// The device must support atomic writes of this size. It's recommended to use the physical sector size, instead of the logical sector size, for better performance. On Linux, use `blockdev --getpbsz /dev/my_device` to get the physical sector size.
  pub spage_size_pow2: u8,
  /// Advanced options, only change if you know what you're doing.
  #[cfg(target_os = "linux")]
  pub uring_coop_taskrun: bool,
  #[cfg(target_os = "linux")]
  pub uring_defer_taskrun: bool,
  #[cfg(target_os = "linux")]
  pub uring_iopoll: bool,
  #[cfg(target_os = "linux")]
  pub uring_sqpoll: Option<u32>,
}
80
81impl BlobdCfg {
82  pub fn lpage_size(&self) -> u64 {
83    1 << LPAGE_SIZE_POW2
84  }
85
86  pub fn spage_size(&self) -> u64 {
87    1 << self.spage_size_pow2
88  }
89}
90
/// Opens a device per `BlobdCfg` and computes its on-device layout; can then
/// `format` the device and/or `load_and_start` it into a running `Blobd`.
pub struct BlobdLoader {
  cfg: BlobdCfg,
  dev: Arc<dyn BackingStore>,
  pages: Pages,
  metrics: BlobdMetrics,
  // Device offset (bytes) where the heap begins, i.e. the end of the tuples area.
  heap_dev_offset: u64,
  heap_size: u64,
  // Log buffer usage level at which a commit is triggered (from cfg).
  log_commit_threshold: u64,
  // Device offset of the log data region, which sits just before the log state.
  log_data_dev_offset: u64,
  log_data_size: u64,
  // Device offset of the single-spage log state at the aligned end of the device.
  log_state_dev_offset: u64,
}
103
impl BlobdLoader {
  /// Validates the configuration, computes the on-device layout (tuples area,
  /// heap, log data, log state), opens the device file, and constructs the
  /// selected backing store. Panics on invalid configuration.
  pub fn new(cfg: BlobdCfg) -> Self {
    assert!(cfg.spage_size_pow2 >= SPAGE_SIZE_POW2_MIN);
    assert!(cfg.spage_size_pow2 <= LPAGE_SIZE_POW2);

    // Device layout by ascending offset: [tuples area][heap ...][log data][log state].
    let dev_end_aligned = floor_pow2(cfg.device_len, cfg.spage_size_pow2);
    let log_data_size = cfg.log_buffer_size;
    let log_commit_threshold = cfg.log_buffer_commit_threshold;
    assert!(log_data_size > 1024 * 1024 * 64); // Sanity check: ensure reasonable value and not misconfiguration.
    assert!(log_commit_threshold < log_data_size);

    // The tuples area occupies [0, heap_dev_offset); the heap begins immediately after.
    let tuples_area_size = ceil_pow2(cfg.object_tuples_area_reserved_space, LPAGE_SIZE_POW2);
    let heap_dev_offset = tuples_area_size;
    // The log state is the single spage at the aligned end of the device; the
    // log data region sits immediately before it.
    let log_state_dev_offset = dev_end_aligned.checked_sub(cfg.spage_size()).unwrap();
    let log_data_dev_offset = log_state_dev_offset.checked_sub(log_data_size).unwrap();
    // NOTE(review): heap_size is measured up to `log_state_dev_offset`, which
    // spans the log data region [log_data_dev_offset, log_state_dev_offset).
    // Confirm the allocator reserves that range, or whether this should
    // subtract from `log_data_dev_offset` instead.
    let heap_size = floor_pow2(
      log_state_dev_offset.checked_sub(heap_dev_offset).unwrap(),
      LPAGE_SIZE_POW2,
    );
    assert!(heap_size > 1024 * 1024 * 64); // Sanity check: ensure reasonable value and not misconfiguration.

    let metrics = BlobdMetrics::default();
    let pages = Pages::new(cfg.spage_size_pow2, LPAGE_SIZE_POW2);
    let file = {
      let mut opt = OpenOptions::new();
      opt.read(true).write(true);
      // O_DIRECT bypasses the page cache; I/O must then be aligned appropriately.
      #[cfg(target_os = "linux")]
      opt.custom_flags(libc::O_DIRECT);
      opt.open(&cfg.device_path).unwrap()
    };
    let dev: Arc<dyn BackingStore> = match cfg.backing_store {
      #[cfg(target_os = "linux")]
      BlobdCfgBackingStore::Uring => Arc::new(UringBackingStore::new(
        file,
        pages.clone(),
        metrics.clone(),
        UringCfg {
          coop_taskrun: cfg.uring_coop_taskrun,
          defer_taskrun: cfg.uring_defer_taskrun,
          iopoll: cfg.uring_iopoll,
          sqpoll: cfg.uring_sqpoll,
        },
      )),
      BlobdCfgBackingStore::File => Arc::new(FileBackingStore::new(file, pages.clone())),
    };

    Self {
      cfg,
      dev,
      heap_dev_offset,
      heap_size,
      log_commit_threshold,
      log_data_dev_offset,
      log_data_size,
      log_state_dev_offset,
      metrics,
      pages,
    }
  }

  /// Formats the tuples area and the log state spage on the device, then syncs.
  /// This initialises (and effectively erases) the device's blobd structures.
  pub async fn format(&self) {
    format_device_for_tuples(&self.dev, &self.pages, self.heap_dev_offset).await;
    LogBuffer::format_device(
      &BoundedStore::new(
        self.dev.clone(),
        self.log_state_dev_offset,
        self.pages.spage_size(),
      ),
      &self.pages,
    )
    .await;
    self.dev.sync().await;
  }

  /// Loads object tuples and the log buffer from the device (concurrently),
  /// starts the log buffer's background threads, and returns a running `Blobd`.
  pub async fn load_and_start(self) -> Blobd {
    info!(
      heap_dev_offset = self.heap_dev_offset,
      heap_size = self.heap_size,
      log_data_dev_offset = self.log_data_dev_offset,
      log_data_size = self.log_data_size,
      log_state_dev_offset = self.log_state_dev_offset,
      "loading blobd",
    );

    // The heap allocator is shared between tuple loading and the log buffer.
    let heap_allocator = Arc::new(Mutex::new(Allocator::new(
      self.heap_dev_offset,
      self.heap_size,
      self.pages.clone(),
      self.metrics.clone(),
    )));

    let (_, log_buffer) = join! {
      load_tuples_from_device(
        &self.dev,
        &self.pages,
        &self.metrics,
        heap_allocator.clone(),
        self.heap_dev_offset,
      ),
      LogBuffer::load_from_device(
        self.dev.clone(),
        // Tuples area: [0, heap_dev_offset).
        BoundedStore::new(self.dev.clone(), 0, self.heap_dev_offset),
        // Log data region.
        BoundedStore::new(
          self.dev.clone(),
          self.log_data_dev_offset,
          self.log_data_size,
        ),
        // Log state: one spage at the end of the device.
        BoundedStore::new(
          self.dev.clone(),
          self.log_state_dev_offset,
          self.pages.spage_size(),
        ),
        heap_allocator.clone(),
        self.pages.clone(),
        self.metrics.clone(),
        self.heap_dev_offset / self.pages.spage_size(),
        self.log_commit_threshold,
      ),
    };

    // WARNING: Only start after `load_tuples_from_device` has been completed; we cannot simply call this immediately after `LogBuffer::load_from_device`.
    log_buffer.start_background_threads().await;

    let ctx = Arc::new(Ctx {
      log_buffer,
      device: self.dev,
      heap_allocator,
      metrics: self.metrics.clone(),
      pages: self.pages,
    });

    Blobd {
      cfg: self.cfg,
      ctx,
      metrics: self.metrics,
    }
  }
}
242
/// Handle to a loaded, running blobd instance, produced by
/// `BlobdLoader::load_and_start`. Clones share the same underlying `Ctx`.
#[derive(Clone)]
pub struct Blobd {
  cfg: BlobdCfg,
  ctx: Arc<Ctx>,
  metrics: BlobdMetrics,
}
249
250impl Blobd {
251  // Provide getter to prevent mutating BlobdCfg.
252  pub fn cfg(&self) -> &BlobdCfg {
253    &self.cfg
254  }
255
256  pub fn metrics(&self) -> &BlobdMetrics {
257    &self.metrics
258  }
259
260  pub async fn wait_for_any_current_log_buffer_commit(&self) {
261    self.ctx.log_buffer.wait_for_any_current_commit().await;
262  }
263
264  pub async fn delete_object(&self, input: OpDeleteObjectInput) -> OpResult<OpDeleteObjectOutput> {
265    op_delete_object(self.ctx.clone(), input).await
266  }
267
268  pub async fn read_object(&self, input: OpReadObjectInput) -> OpResult<OpReadObjectOutput> {
269    op_read_object(self.ctx.clone(), input).await
270  }
271
272  pub async fn write_object(&self, input: OpWriteObjectInput) -> OpResult<OpWriteObjectOutput> {
273    op_write_object(self.ctx.clone(), input).await
274  }
275}