1#![allow(non_snake_case)]
2
3use crate::allocator::Allocator;
4use crate::backing_store::file::FileBackingStore;
5#[cfg(target_os = "linux")]
6use crate::backing_store::uring::UringBackingStore;
7#[cfg(target_os = "linux")]
8use crate::backing_store::uring::UringCfg;
9use crate::backing_store::BackingStore;
10use crate::object::LPAGE_SIZE_POW2;
11use crate::object::SPAGE_SIZE_POW2_MIN;
12use crate::pages::Pages;
13use crate::util::ceil_pow2;
14use crate::util::floor_pow2;
15use backing_store::BoundedStore;
16use ctx::Ctx;
17use log_buffer::LogBuffer;
18use metrics::BlobdMetrics;
19use object::format_device_for_tuples;
20use object::load_tuples_from_device;
21use op::delete_object::op_delete_object;
22use op::delete_object::OpDeleteObjectInput;
23use op::delete_object::OpDeleteObjectOutput;
24use op::read_object::op_read_object;
25use op::read_object::OpReadObjectInput;
26use op::read_object::OpReadObjectOutput;
27use op::write_object::op_write_object;
28use op::write_object::OpWriteObjectInput;
29use op::write_object::OpWriteObjectOutput;
30use op::OpResult;
31use parking_lot::Mutex;
32use std::fs::OpenOptions;
33#[cfg(target_os = "linux")]
34use std::os::unix::prelude::OpenOptionsExt;
35use std::path::PathBuf;
36use std::sync::Arc;
37use tokio::join;
38use tracing::info;
39
40pub mod allocator;
41pub mod backing_store;
42pub mod ctx;
43pub mod log_buffer;
44pub mod metrics;
45pub mod object;
46pub mod op;
47pub mod pages;
48pub mod util;
49
/// Which I/O backend is used to read and write the underlying device.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum BlobdCfgBackingStore {
  /// io_uring-based backend (Linux only); tuned via the `uring_*` fields on `BlobdCfg`.
  #[cfg(target_os = "linux")]
  Uring,
  /// Plain file I/O backend.
  File,
}
56
/// Static configuration for a blobd instance; consumed by `BlobdLoader::new`.
#[derive(Clone, Debug)]
pub struct BlobdCfg {
  /// Which I/O backend to use for the device.
  pub backing_store: BlobdCfgBackingStore,
  /// Path to the block device or regular file backing the store.
  /// Opened read+write (with `O_DIRECT` on Linux) by `BlobdLoader::new`.
  pub device_path: PathBuf,
  /// Usable length of the device in bytes. Rounded down to a spage boundary
  /// internally; any trailing partial spage is ignored.
  pub device_len: u64,
  /// Commit trigger: must be strictly less than `log_buffer_size`
  /// (asserted in `BlobdLoader::new`).
  pub log_buffer_commit_threshold: u64,
  /// Bytes reserved for the log data region at the end of the device.
  /// Must be greater than 64 MiB (asserted in `BlobdLoader::new`).
  pub log_buffer_size: u64,
  /// Bytes reserved at the start of the device for object tuples; rounded up
  /// to a whole lpage internally.
  pub object_tuples_area_reserved_space: u64,
  /// log2 of the small-page size; must lie within
  /// `[SPAGE_SIZE_POW2_MIN, LPAGE_SIZE_POW2]` (asserted in `BlobdLoader::new`).
  pub spage_size_pow2: u8,
  /// Sets `IORING_SETUP_COOP_TASKRUN` on the uring backend — TODO confirm flag mapping in `UringBackingStore`.
  #[cfg(target_os = "linux")]
  pub uring_coop_taskrun: bool,
  /// Sets `IORING_SETUP_DEFER_TASKRUN` on the uring backend — TODO confirm flag mapping in `UringBackingStore`.
  #[cfg(target_os = "linux")]
  pub uring_defer_taskrun: bool,
  /// Enables io_uring I/O polling — TODO confirm flag mapping in `UringBackingStore`.
  #[cfg(target_os = "linux")]
  pub uring_iopoll: bool,
  /// When `Some`, enables io_uring submission-queue polling; value is
  /// presumably the SQPOLL idle time — verify against `UringBackingStore`.
  #[cfg(target_os = "linux")]
  pub uring_sqpoll: Option<u32>,
}
80
81impl BlobdCfg {
82 pub fn lpage_size(&self) -> u64 {
83 1 << LPAGE_SIZE_POW2
84 }
85
86 pub fn spage_size(&self) -> u64 {
87 1 << self.spage_size_pow2
88 }
89}
90
/// Opens the device and computes the on-device region layout; produced by
/// `BlobdLoader::new`, then used via `format` and/or consumed by
/// `load_and_start`.
pub struct BlobdLoader {
  // Original configuration; handed to the final `Blobd`.
  cfg: BlobdCfg,
  // Opened backing store (uring or plain file, per `cfg.backing_store`).
  dev: Arc<dyn BackingStore>,
  // Page-size helper built from `cfg.spage_size_pow2` and `LPAGE_SIZE_POW2`.
  pages: Pages,
  // Shared metrics registry, cloned into the backing store and allocator.
  metrics: BlobdMetrics,
  // Byte offset where the heap begins (i.e. the end of the lpage-aligned
  // tuples area at the start of the device).
  heap_dev_offset: u64,
  // Length of the heap region in bytes (floored to a whole lpage).
  heap_size: u64,
  // Uncommitted log bytes that trigger a commit (from `cfg`).
  log_commit_threshold: u64,
  // Byte offset of the log data region.
  log_data_dev_offset: u64,
  // Length of the log data region in bytes (`cfg.log_buffer_size`).
  log_data_size: u64,
  // Byte offset of the single-spage log state region at the end of the device.
  log_state_dev_offset: u64,
}
103
104impl BlobdLoader {
105 pub fn new(cfg: BlobdCfg) -> Self {
106 assert!(cfg.spage_size_pow2 >= SPAGE_SIZE_POW2_MIN);
107 assert!(cfg.spage_size_pow2 <= LPAGE_SIZE_POW2);
108
109 let dev_end_aligned = floor_pow2(cfg.device_len, cfg.spage_size_pow2);
110 let log_data_size = cfg.log_buffer_size;
111 let log_commit_threshold = cfg.log_buffer_commit_threshold;
112 assert!(log_data_size > 1024 * 1024 * 64); assert!(log_commit_threshold < log_data_size);
114
115 let tuples_area_size = ceil_pow2(cfg.object_tuples_area_reserved_space, LPAGE_SIZE_POW2);
116 let heap_dev_offset = tuples_area_size;
117 let log_state_dev_offset = dev_end_aligned.checked_sub(cfg.spage_size()).unwrap();
118 let log_data_dev_offset = log_state_dev_offset.checked_sub(log_data_size).unwrap();
119 let heap_size = floor_pow2(
120 log_state_dev_offset.checked_sub(heap_dev_offset).unwrap(),
121 LPAGE_SIZE_POW2,
122 );
123 assert!(heap_size > 1024 * 1024 * 64); let metrics = BlobdMetrics::default();
126 let pages = Pages::new(cfg.spage_size_pow2, LPAGE_SIZE_POW2);
127 let file = {
128 let mut opt = OpenOptions::new();
129 opt.read(true).write(true);
130 #[cfg(target_os = "linux")]
131 opt.custom_flags(libc::O_DIRECT);
132 opt.open(&cfg.device_path).unwrap()
133 };
134 let dev: Arc<dyn BackingStore> = match cfg.backing_store {
135 #[cfg(target_os = "linux")]
136 BlobdCfgBackingStore::Uring => Arc::new(UringBackingStore::new(
137 file,
138 pages.clone(),
139 metrics.clone(),
140 UringCfg {
141 coop_taskrun: cfg.uring_coop_taskrun,
142 defer_taskrun: cfg.uring_defer_taskrun,
143 iopoll: cfg.uring_iopoll,
144 sqpoll: cfg.uring_sqpoll,
145 },
146 )),
147 BlobdCfgBackingStore::File => Arc::new(FileBackingStore::new(file, pages.clone())),
148 };
149
150 Self {
151 cfg,
152 dev,
153 heap_dev_offset,
154 heap_size,
155 log_commit_threshold,
156 log_data_dev_offset,
157 log_data_size,
158 log_state_dev_offset,
159 metrics,
160 pages,
161 }
162 }
163
164 pub async fn format(&self) {
165 format_device_for_tuples(&self.dev, &self.pages, self.heap_dev_offset).await;
166 LogBuffer::format_device(
167 &BoundedStore::new(
168 self.dev.clone(),
169 self.log_state_dev_offset,
170 self.pages.spage_size(),
171 ),
172 &self.pages,
173 )
174 .await;
175 self.dev.sync().await;
176 }
177
178 pub async fn load_and_start(self) -> Blobd {
179 info!(
180 heap_dev_offset = self.heap_dev_offset,
181 heap_size = self.heap_size,
182 log_data_dev_offset = self.log_data_dev_offset,
183 log_data_size = self.log_data_size,
184 log_state_dev_offset = self.log_state_dev_offset,
185 "loading blobd",
186 );
187
188 let heap_allocator = Arc::new(Mutex::new(Allocator::new(
189 self.heap_dev_offset,
190 self.heap_size,
191 self.pages.clone(),
192 self.metrics.clone(),
193 )));
194
195 let (_, log_buffer) = join! {
196 load_tuples_from_device(
197 &self.dev,
198 &self.pages,
199 &self.metrics,
200 heap_allocator.clone(),
201 self.heap_dev_offset,
202 ),
203 LogBuffer::load_from_device(
204 self.dev.clone(),
205 BoundedStore::new(self.dev.clone(), 0, self.heap_dev_offset),
206 BoundedStore::new(
207 self.dev.clone(),
208 self.log_data_dev_offset,
209 self.log_data_size,
210 ),
211 BoundedStore::new(
212 self.dev.clone(),
213 self.log_state_dev_offset,
214 self.pages.spage_size(),
215 ),
216 heap_allocator.clone(),
217 self.pages.clone(),
218 self.metrics.clone(),
219 self.heap_dev_offset / self.pages.spage_size(),
220 self.log_commit_threshold,
221 ),
222 };
223
224 log_buffer.start_background_threads().await;
226
227 let ctx = Arc::new(Ctx {
228 log_buffer,
229 device: self.dev,
230 heap_allocator,
231 metrics: self.metrics.clone(),
232 pages: self.pages,
233 });
234
235 Blobd {
236 cfg: self.cfg,
237 ctx,
238 metrics: self.metrics,
239 }
240 }
241}
242
/// Handle to a loaded blobd instance, returned by `BlobdLoader::load_and_start`.
/// Cloning shares the underlying `Ctx` via `Arc`.
#[derive(Clone)]
pub struct Blobd {
  // Configuration the instance was created with; exposed via `cfg()`.
  cfg: BlobdCfg,
  // Shared runtime state (device, log buffer, heap allocator, pages, metrics).
  ctx: Arc<Ctx>,
  // Metrics registry; exposed via `metrics()`.
  metrics: BlobdMetrics,
}
249
250impl Blobd {
251 pub fn cfg(&self) -> &BlobdCfg {
253 &self.cfg
254 }
255
256 pub fn metrics(&self) -> &BlobdMetrics {
257 &self.metrics
258 }
259
260 pub async fn wait_for_any_current_log_buffer_commit(&self) {
261 self.ctx.log_buffer.wait_for_any_current_commit().await;
262 }
263
264 pub async fn delete_object(&self, input: OpDeleteObjectInput) -> OpResult<OpDeleteObjectOutput> {
265 op_delete_object(self.ctx.clone(), input).await
266 }
267
268 pub async fn read_object(&self, input: OpReadObjectInput) -> OpResult<OpReadObjectOutput> {
269 op_read_object(self.ctx.clone(), input).await
270 }
271
272 pub async fn write_object(&self, input: OpWriteObjectInput) -> OpResult<OpWriteObjectOutput> {
273 op_write_object(self.ctx.clone(), input).await
274 }
275}