1#![allow(non_snake_case)]
2
3use crate::allocator::Allocator;
4use crate::allocator::ALLOCSTATE_SIZE;
5use crate::bucket::Buckets;
6use crate::deleted_list::start_deleted_list_reaper_background_loop;
7use crate::deleted_list::DeletedList;
8use crate::deleted_list::DELETED_LIST_STATE_SIZE;
9use crate::incomplete_list::start_incomplete_list_reaper_background_loop;
10use crate::incomplete_list::IncompleteList;
11use crate::incomplete_list::INCOMPLETE_LIST_STATE_SIZE;
12use crate::metrics::METRICS_STATE_SIZE;
13use crate::object_id::ObjectIdSerial;
14use crate::stream::Stream;
15use crate::stream::STREAM_SIZE;
16#[cfg(test)]
17use crate::test_util::device::TestSeekableAsyncFile as SeekableAsyncFile;
18#[cfg(test)]
19use crate::test_util::journal::TestWriteJournal as WriteJournal;
20use crate::util::ceil_pow2;
21use bucket::BUCKETS_SIZE;
22use ctx::Ctx;
23use ctx::State;
24use futures::join;
25use metrics::BlobdMetrics;
26use op::commit_object::op_commit_object;
27use op::commit_object::OpCommitObjectInput;
28use op::commit_object::OpCommitObjectOutput;
29use op::create_object::op_create_object;
30use op::create_object::OpCreateObjectInput;
31use op::create_object::OpCreateObjectOutput;
32use op::delete_object::op_delete_object;
33use op::delete_object::OpDeleteObjectInput;
34use op::delete_object::OpDeleteObjectOutput;
35use op::inspect_object::op_inspect_object;
36use op::inspect_object::OpInspectObjectInput;
37use op::inspect_object::OpInspectObjectOutput;
38use op::read_object::op_read_object;
39use op::read_object::OpReadObjectInput;
40use op::read_object::OpReadObjectOutput;
41use op::write_object::op_write_object;
42use op::write_object::OpWriteObjectInput;
43use op::write_object::OpWriteObjectOutput;
44use op::OpResult;
45use page::Pages;
46#[cfg(not(test))]
47use seekable_async_file::SeekableAsyncFile;
48use std::error::Error;
49use std::sync::atomic::AtomicU64;
50use std::sync::Arc;
51use stream::StreamEvent;
52use stream::StreamEventExpiredError;
53use tokio::sync::Mutex;
54use tracing::info;
55#[cfg(not(test))]
56use write_journal::WriteJournal;
57
58pub mod allocator;
59pub mod bucket;
60pub mod ctx;
61pub mod deleted_list;
62pub mod incomplete_list;
63pub mod incomplete_token;
64pub mod metrics;
65pub mod object;
66pub mod object_id;
67pub mod op;
68pub mod page;
69pub mod stream;
70#[cfg(test)]
71pub mod test_util;
72pub mod util;
73
/// Tunable parameters for a blobd instance.
///
/// Counts and sizes are stored as base-2 exponents; the accessor methods expand them into
/// absolute `u64` values.
#[derive(Clone, Debug)]
pub struct BlobdCfg {
    /// Base-2 logarithm of the number of buckets.
    pub bucket_count_log2: u8,
    /// Base-2 logarithm of the number of bucket locks.
    pub bucket_lock_count_log2: u8,
    /// Passed to the incomplete-list and deleted-list loaders; presumably the age (in seconds)
    /// after which their objects are reaped. Must be non-zero.
    pub reap_objects_after_secs: u64,
    /// Base-2 logarithm of the large page size (the heap start is aligned using this exponent).
    pub lpage_size_pow2: u8,
    /// Base-2 logarithm of the small page size.
    pub spage_size_pow2: u8,
    /// Whether object versioning is enabled (forwarded into the shared context).
    pub versioning: bool,
}

impl BlobdCfg {
    /// Expands a base-2 exponent into the corresponding `u64` power of two.
    fn pow2(exp: u8) -> u64 {
        1u64 << exp
    }

    /// Number of buckets: `2^bucket_count_log2`.
    pub fn bucket_count(&self) -> u64 {
        Self::pow2(self.bucket_count_log2)
    }

    /// Number of bucket locks: `2^bucket_lock_count_log2`.
    pub fn bucket_lock_count(&self) -> u64 {
        Self::pow2(self.bucket_lock_count_log2)
    }

    /// Large page size: `2^lpage_size_pow2`.
    pub fn lpage_size(&self) -> u64 {
        Self::pow2(self.lpage_size_pow2)
    }

    /// Small page size: `2^spage_size_pow2`.
    pub fn spage_size(&self) -> u64 {
        Self::pow2(self.spage_size_pow2)
    }
}
121
122pub struct BlobdLoader {
123 device: SeekableAsyncFile,
124 device_size: Arc<AtomicU64>, journal: Arc<WriteJournal>,
126 cfg: BlobdCfg,
127
128 metrics_dev_offset: u64,
129 object_id_serial_dev_offset: u64,
130 stream_dev_offset: u64,
131 incomplete_list_dev_offset: u64,
132 deleted_list_dev_offset: u64,
133 allocator_dev_offset: u64,
134 buckets_dev_offset: u64,
135
136 heap_dev_offset: u64,
137}
138
139impl BlobdLoader {
140 pub fn new(device: SeekableAsyncFile, device_size: u64, cfg: BlobdCfg) -> Self {
141 assert!(cfg.bucket_count_log2 >= 12 && cfg.bucket_count_log2 <= 48);
142 let bucket_count = 1u64 << cfg.bucket_count_log2;
143
144 assert!(cfg.reap_objects_after_secs > 0);
145
146 const JOURNAL_SIZE_MIN: u64 = 1024 * 1024 * 32;
147
148 let metrics_dev_offset = 0;
149 let object_id_serial_dev_offset = metrics_dev_offset + METRICS_STATE_SIZE;
150 let stream_dev_offset = object_id_serial_dev_offset + 8;
151 let incomplete_list_dev_offset = stream_dev_offset + STREAM_SIZE;
152 let deleted_list_dev_offset = incomplete_list_dev_offset + INCOMPLETE_LIST_STATE_SIZE;
153 let allocator_dev_offset = deleted_list_dev_offset + DELETED_LIST_STATE_SIZE;
154 let buckets_dev_offset = allocator_dev_offset + ALLOCSTATE_SIZE;
155 let buckets_size = BUCKETS_SIZE(bucket_count);
156 let journal_dev_offset = buckets_dev_offset + buckets_size;
157 let min_reserved_space = journal_dev_offset + JOURNAL_SIZE_MIN;
158
159 let heap_dev_offset = ceil_pow2(min_reserved_space, cfg.lpage_size_pow2);
161 let journal_size = heap_dev_offset - journal_dev_offset;
162
163 info!(
164 device_size,
165 buckets_size,
166 journal_size,
167 reserved_size = heap_dev_offset,
168 lpage_size = 1 << cfg.lpage_size_pow2,
169 spage_size = 1 << cfg.spage_size_pow2,
170 "init",
171 );
172
173 #[cfg(not(test))]
174 let journal = Arc::new(WriteJournal::new(
175 device.clone(),
176 journal_dev_offset,
177 journal_size,
178 std::time::Duration::from_micros(200),
179 ));
180 #[cfg(test)]
181 let journal = Arc::new(WriteJournal::new(device.clone()));
182
183 Self {
184 allocator_dev_offset,
185 buckets_dev_offset,
186 cfg,
187 deleted_list_dev_offset,
188 device_size: Arc::new(AtomicU64::new(device_size)),
189 device,
190 heap_dev_offset,
191 incomplete_list_dev_offset,
192 journal,
193 metrics_dev_offset,
194 object_id_serial_dev_offset,
195 stream_dev_offset,
196 }
197 }
198
199 pub async fn format(&self) {
200 let dev = &self.device;
201 join! {
202 BlobdMetrics::format_device(dev, self.metrics_dev_offset),
203 ObjectIdSerial::format_device(dev, self.object_id_serial_dev_offset),
204 Stream::format_device(dev, self.stream_dev_offset),
205 IncompleteList::format_device(dev, self.incomplete_list_dev_offset),
206 DeletedList::format_device(dev, self.deleted_list_dev_offset),
207 Allocator::format_device(dev, self.allocator_dev_offset, self.heap_dev_offset),
208 Buckets::format_device(dev, self.buckets_dev_offset, self.cfg.bucket_count_log2),
209 self.journal.format_device(),
210 };
211 dev.sync_data().await;
212 }
213
214 pub async fn load(self) -> Blobd {
215 self.journal.recover().await;
216
217 let dev = &self.device;
218
219 let pages = Arc::new(Pages::new(
220 self.journal.clone(),
221 self.heap_dev_offset,
222 self.cfg.spage_size_pow2,
223 self.cfg.lpage_size_pow2,
224 ));
225
226 let metrics = Arc::new(BlobdMetrics::load_from_device(dev, self.metrics_dev_offset).await);
228 let (
229 object_id_serial,
230 (stream, stream_in_memory),
231 incomplete_list,
232 deleted_list,
233 allocator,
234 buckets,
235 ) = join! {
236 ObjectIdSerial::load_from_device(dev, self.object_id_serial_dev_offset),
237 Stream::load_from_device(dev, self.stream_dev_offset),
238 IncompleteList::load_from_device(dev.clone(), self.incomplete_list_dev_offset, pages.clone(), metrics.clone(), self.cfg.reap_objects_after_secs),
239 DeletedList::load_from_device(dev.clone(), self.deleted_list_dev_offset, pages.clone(), metrics.clone(), self.cfg.reap_objects_after_secs),
240 Allocator::load_from_device(dev, self.device_size.clone(), self.allocator_dev_offset, pages.clone(), metrics.clone(), self.heap_dev_offset),
241 Buckets::load_from_device(dev.clone(), self.journal.clone(), pages.clone(), self.buckets_dev_offset, self.cfg.bucket_lock_count_log2),
242 };
243
244 let ctx = Arc::new(Ctx {
245 buckets,
246 device: dev.clone(),
247 journal: self.journal.clone(),
248 metrics: metrics.clone(),
249 pages: pages.clone(),
250 reap_objects_after_secs: self.cfg.reap_objects_after_secs,
251 stream_in_memory,
252 versioning: self.cfg.versioning,
253 state: Mutex::new(State {
254 allocator,
255 deleted_list,
256 incomplete_list,
257 object_id_serial,
258 stream,
259 }),
260 });
261
262 Blobd { cfg: self.cfg, ctx }
263 }
264}
265
266#[derive(Clone)]
267pub struct Blobd {
268 cfg: BlobdCfg,
269 ctx: Arc<Ctx>,
270}
271
272impl Blobd {
273 pub fn cfg(&self) -> &BlobdCfg {
275 &self.cfg
276 }
277
278 pub fn metrics(&self) -> &Arc<BlobdMetrics> {
279 &self.ctx.metrics
280 }
281
282 pub async fn start(&self) {
284 join! {
285 self.ctx.journal.start_commit_background_loop(),
286 start_incomplete_list_reaper_background_loop(self.ctx.clone()),
287 start_deleted_list_reaper_background_loop(self.ctx.clone()),
288 };
289 }
290
291 pub async fn get_stream_event(
292 &self,
293 id: u64,
294 ) -> Result<Option<StreamEvent>, StreamEventExpiredError> {
295 self.ctx.stream_in_memory.get_event(id)
296 }
297
298 pub async fn commit_object(&self, input: OpCommitObjectInput) -> OpResult<OpCommitObjectOutput> {
299 op_commit_object(self.ctx.clone(), input).await
300 }
301
302 pub async fn create_object(&self, input: OpCreateObjectInput) -> OpResult<OpCreateObjectOutput> {
303 op_create_object(self.ctx.clone(), input).await
304 }
305
306 pub async fn delete_object(&self, input: OpDeleteObjectInput) -> OpResult<OpDeleteObjectOutput> {
307 op_delete_object(self.ctx.clone(), input).await
308 }
309
310 pub async fn inspect_object(
311 &self,
312 input: OpInspectObjectInput,
313 ) -> OpResult<OpInspectObjectOutput> {
314 op_inspect_object(self.ctx.clone(), input).await
315 }
316
317 pub async fn read_object(&self, input: OpReadObjectInput) -> OpResult<OpReadObjectOutput> {
318 op_read_object(self.ctx.clone(), input).await
319 }
320
321 pub async fn write_object<
322 D: AsRef<[u8]>,
323 S: Unpin + futures::Stream<Item = Result<D, Box<dyn Error + Send + Sync>>>,
324 >(
325 &self,
326 input: OpWriteObjectInput<D, S>,
327 ) -> OpResult<OpWriteObjectOutput> {
328 op_write_object(self.ctx.clone(), input).await
329 }
330}