1use core::fmt::{self, Debug, Formatter};
2use core::pin::{pin, Pin};
3use core::task::{Context, Poll};
4use std::io::ErrorKind as IoErrorKind;
5
6use async_stream::try_stream;
7use calimero_primitives::blobs::BlobId;
8use calimero_primitives::hash::Hash;
9use calimero_store::key::BlobMeta as BlobMetaKey;
10use calimero_store::types::BlobMeta as BlobMetaValue;
11use calimero_store::Store as DataStore;
12use camino::Utf8PathBuf;
13use eyre::{Report, Result as EyreResult};
14use futures_util::io::BufReader;
15use futures_util::{AsyncRead, AsyncReadExt, Stream, StreamExt, TryStreamExt};
16use sha2::{Digest, Sha256};
17use thiserror::Error as ThisError;
18use tokio::fs::{create_dir_all, read as async_read, try_exists, write as async_write};
19use tracing::{debug, error, trace};
20
21pub mod config;
22mod utils;
23
24use config::BlobStoreConfig;
25
/// Size of a single blob chunk: 1 MiB.
pub const CHUNK_SIZE: usize = 1 << 20;

// Compile-time guard: reject a CHUNK_SIZE that needs more than 32 bits.
// (Equivalent to the original `[(); cond as usize] = []` trick, expressed
// as a const assertion.)
const _: () = assert!(usize::BITS - CHUNK_SIZE.leading_zeros() <= 32);
30
/// Coordinates blob storage: metadata records in the data store, raw bytes
/// in the filesystem-backed blob store.
#[derive(Clone, Debug)]
pub struct BlobManager {
    // Key-value store holding `BlobMeta` records (size, hash, chunk links).
    data_store: DataStore,
    // Filesystem store holding the raw blob bytes, keyed by `BlobId`.
    blob_store: FileSystem,
}
38
/// Internal events emitted by the chunking stream inside `put_sized`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Value {
    /// Terminal event: hash and total size of the fully consumed stream.
    Full { hash: Hash, size: u64 },
    /// One persisted chunk, identified by its content-derived blob id.
    Part { id: BlobId, _size: u64 },
    /// Terminal event: the stream exceeded the declared/representable size.
    Overflow { found: u64, expected: u64 },
}
45
/// Running hash and byte count, used both per-chunk and for the whole file.
#[derive(Clone, Debug, Default)]
struct State {
    // Incremental SHA-256 over the bytes accepted so far.
    digest: Sha256,
    // Number of bytes read so far.
    size: usize,
}
51
/// Caller-declared expectation about an incoming stream's length.
#[expect(clippy::exhaustive_enums, reason = "There are no more variants to add")]
#[derive(Eq, Ord, Copy, Clone, Debug, PartialEq, PartialOrd)]
pub enum Size {
    /// Advisory length; the stream may legitimately be longer or shorter.
    Hint(u64),
    /// Exact length; exceeding it is an error.
    Exact(u64),
}

impl Size {
    /// The declared length as a `usize`, regardless of variant.
    const fn hint(&self) -> usize {
        #[expect(
            clippy::cast_possible_truncation,
            reason = "This is never expected to overflow"
        )]
        let hinted = match *self {
            Self::Hint(size) | Self::Exact(size) => size as usize,
        };
        hinted
    }

    /// Checks whether `size` bytes overflow the declared expectation.
    ///
    /// Returns `Some(limit)` when the limit was breached: `u64::MAX` when
    /// `size` itself cannot be represented as a `u64` (no/hinted limit),
    /// or the exact limit when a `Size::Exact` was exceeded.
    fn overflows(this: Option<&Self>, size: usize) -> Option<u64> {
        match (this, u64::try_from(size)) {
            // No hard limit: only unrepresentable sizes count as overflow.
            (None | Some(Self::Hint(_)), Ok(_)) => None,
            (None | Some(Self::Hint(_)), Err(_)) => Some(u64::MAX),
            // Exact limit: within bounds is fine...
            (Some(Self::Exact(exact)), Ok(s)) if s <= *exact => None,
            // ...anything larger (or unrepresentable) breaches it.
            (Some(Self::Exact(exact)), _) => Some(*exact),
        }
    }
}
82
impl BlobManager {
    /// Creates a manager over the given metadata store and blob byte store.
    #[must_use]
    pub const fn new(data_store: DataStore, blob_store: FileSystem) -> Self {
        Self {
            data_store,
            blob_store,
        }
    }

    /// Filesystem location for a package (delegates to the blob store).
    pub fn package_path(&self, package: &str) -> Utf8PathBuf {
        self.blob_store.package_path(package)
    }

    /// Filesystem location for a specific package version.
    pub fn version_path(&self, package: &str, version: &str) -> Utf8PathBuf {
        self.blob_store.version_path(package, version)
    }

    /// Root directory of the blob store.
    pub fn root_path(&self) -> Utf8PathBuf {
        self.blob_store.root_path()
    }

    /// Path where an application blob for `package`/`version` would live.
    pub fn application_blob_path(&self, package: &str, version: &str, id: BlobId) -> Utf8PathBuf {
        self.blob_store.application_blob_path(package, version, id)
    }

    /// Returns whether blob *metadata* exists for `id`.
    ///
    /// # Errors
    /// Propagates data-store read failures.
    pub fn has(&self, id: BlobId) -> EyreResult<bool> {
        Ok(self.data_store.handle().has(&BlobMetaKey::new(id))?)
    }

    /// Opens a streaming reader over the blob, or `None` if unknown.
    pub fn get(&self, id: BlobId) -> EyreResult<Option<Blob>> {
        Blob::new(id, self.clone())
    }

    /// Deletes the blob's bytes from the filesystem store.
    ///
    /// NOTE(review): this removes only the root file from the blob store; the
    /// metadata record and any linked chunk blobs are left in place — confirm
    /// whether that is intentional.
    pub async fn delete(&self, id: BlobId) -> EyreResult<bool> {
        self.blob_store.delete(id).await
    }

    /// Stores a blob of unknown size; see [`Self::put_sized`].
    pub async fn put<T>(&self, stream: T) -> EyreResult<(BlobId, Hash, u64)>
    where
        T: AsyncRead,
    {
        self.put_sized(None, stream).await
    }

    /// Reads `stream` to completion, splitting it into `CHUNK_SIZE` chunks.
    ///
    /// Each full chunk is persisted as its own blob (keyed by the SHA-256 of
    /// the chunk's bytes), and a root metadata record linking all chunks is
    /// written last. Returns the root blob id, the SHA-256 of the whole
    /// stream, and the total byte count.
    ///
    /// # Errors
    /// Fails on read/storage errors, or when `size` is `Size::Exact` and the
    /// stream yields more bytes than declared.
    pub async fn put_sized<T>(
        &self,
        size: Option<Size>,
        stream: T,
    ) -> EyreResult<(BlobId, Hash, u64)>
    where
        T: AsyncRead,
    {
        debug!(
            size_hint = size.as_ref().map(Size::hint),
            "put_sized invoked"
        );

        let mut stream = pin!(BufReader::new(stream));

        // Inner stream: yields one `Value::Part` per persisted chunk, then a
        // single terminal `Value::Full` (success) or `Value::Overflow`.
        let blobs = try_stream!({
            let mut buf = vec![0_u8; CHUNK_SIZE].into_boxed_slice();
            // `file` tracks the whole stream; `blob` tracks the current chunk.
            let mut file = State::default();
            let mut blob = State::default();
            let mut chunk_index: u64 = 0;

            let overflow_data = loop {
                let bytes = stream.read(&mut buf[blob.size..]).await?;

                // A zero-byte read means the stream is exhausted.
                let finished = bytes == 0;

                if !finished {
                    let new_blob_size = blob.size.saturating_add(bytes);
                    let new_file_size = file.size.saturating_add(bytes);

                    let chunk = &buf[blob.size..new_blob_size];

                    blob.size = new_blob_size;
                    file.size = new_file_size;

                    trace!(
                        chunk_index,
                        chunk_bytes = chunk.len(),
                        file_bytes = file.size,
                        "read chunk data from stream"
                    );

                    // Bail out (before hashing this read) if the declared
                    // exact size has been exceeded.
                    if let Some(expected) = Size::overflows(size.as_ref(), new_file_size) {
                        trace!(
                            expected,
                            file_bytes = file.size,
                            "size overflow detected while chunking"
                        );
                        break Some(expected);
                    }

                    blob.digest.update(chunk);
                    file.digest.update(chunk);

                    // Keep filling the buffer until it holds a full chunk.
                    if blob.size != buf.len() {
                        continue;
                    }
                }

                // Stream ended exactly on a chunk boundary (or was empty).
                if blob.size == 0 {
                    break None;
                }

                // Chunk id is the SHA-256 of the chunk's contents.
                let id = BlobId::from(*AsRef::<[u8; 32]>::as_ref(&blob.digest.finalize()));

                // Record per-chunk metadata; leaf chunks carry no links.
                self.data_store.handle().put(
                    &BlobMetaKey::new(id),
                    &BlobMetaValue::new(blob.size as u64, *id, Box::default()),
                )?;

                self.blob_store.put(id, &buf[..blob.size]).await?;

                trace!(
                    ?id,
                    chunk_index,
                    chunk_size = blob.size,
                    file_bytes = file.size,
                    "blob chunk persisted"
                );
                chunk_index += 1;

                yield Value::Part {
                    id,
                    _size: blob.size as u64,
                };

                if finished {
                    break None;
                }

                // Start accumulating the next chunk.
                blob = State::default();
            };

            if let Some(expected) = overflow_data {
                yield Value::Overflow {
                    found: file.size as u64,
                    expected,
                };
            } else {
                yield Value::Full {
                    hash: Hash::from(*(AsRef::<[u8; 32]>::as_ref(&file.digest.finalize()))),
                    size: file.size as u64,
                };
            }
        });

        // Pin the stream and make it peekable so all `Part` items can be
        // drained before the single terminal item.
        let blobs = typed_stream::<EyreResult<_>>(blobs).peekable();
        let mut blobs = pin!(blobs);

        // Pre-size the link list from the caller's size hint, if any.
        let mut links = Vec::with_capacity(
            size.map(|s| s.hint().saturating_div(CHUNK_SIZE))
                .unwrap_or_default(),
        );

        // The root blob's id is the SHA-256 over the chunk ids, in order.
        let mut digest = Sha256::new();

        while let Some(Value::Part { id, _size }) = blobs
            .as_mut()
            .next_if(|v| matches!(v, Ok(Value::Part { .. })))
            .await
            .transpose()?
        {
            links.push(BlobMetaKey::new(id));
            digest.update(id.as_ref());
        }

        let chunk_count = links.len();

        // The terminal item is either the file hash/size or an overflow report.
        let (hash, size) = match blobs.try_next().await? {
            Some(Value::Full { hash, size }) => (hash, size),
            Some(Value::Overflow { found, expected }) => {
                error!(
                    found,
                    expected, "blob size overflow while finalising stream"
                );
                eyre::bail!("expected {} bytes in the stream, found {}", expected, found)
            }
            _ => {
                unreachable!("the root should always be emitted");
            }
        };

        let id = BlobId::from(*(AsRef::<[u8; 32]>::as_ref(&digest.finalize())));

        // Persist the root record: total size, whole-file hash, chunk links.
        self.data_store.handle().put(
            &BlobMetaKey::new(id),
            &BlobMetaValue::new(size, *hash, links.into_boxed_slice()),
        )?;

        debug!(
            ?id,
            total_size = size,
            chunk_count,
            "blob metadata persisted"
        );

        debug!(
            ?id,
            total_size = size,
            chunk_count,
            "blob stored successfully"
        );

        Ok((id, hash, size))
    }
}
298
299fn typed_stream<T>(s: impl Stream<Item = T>) -> impl Stream<Item = T> {
300 s
301}
302
/// Lazily-read blob contents, exposed as a stream of byte chunks.
pub struct Blob {
    // Boxed stream yielding each chunk of blob data in order; errors surface
    // dangling links or underlying storage failures.
    #[expect(clippy::type_complexity, reason = "Acceptable here")]
    stream: Pin<Box<dyn Stream<Item = Result<Box<[u8]>, BlobError>> + Send>>,
}
311
impl Blob {
    /// Builds a chunk stream for the blob identified by `id`.
    ///
    /// Returns `Ok(None)` when no metadata exists for `id`. A leaf blob
    /// (no links) yields its bytes as a single chunk; a root blob
    /// recursively streams every linked child in order.
    fn new(id: BlobId, blob_mgr: BlobManager) -> EyreResult<Option<Self>> {
        let Some(blob_meta) = blob_mgr.data_store.handle().get(&BlobMetaKey::new(id))? else {
            trace!(?id, "blob metadata not found");
            return Ok(None);
        };

        #[expect(
            clippy::semicolon_if_nothing_returned,
            reason = "False positive; not possible with macro"
        )]
        let stream = Box::pin(try_stream!({
            let mut chunk_index: u64 = 0;
            trace!(
                ?id,
                link_count = blob_meta.links.len(),
                "initializing blob stream"
            );
            // Leaf blob: no links, the bytes live directly in the store.
            if blob_meta.links.is_empty() {
                let maybe_blob = blob_mgr.blob_store.get(id).await;
                let maybe_blob = maybe_blob.map_err(BlobError::RepoError)?;
                // Metadata exists but the backing file is gone: store corrupt.
                let blob = maybe_blob.ok_or_else(|| BlobError::DanglingBlob { id })?;
                trace!(
                    ?id,
                    chunk_index,
                    chunk_size = blob.len(),
                    "serving single blob chunk"
                );
                chunk_index += 1;
                return yield blob;
            }

            // Root blob: resolve and stream each linked child blob in order.
            for link_meta in blob_meta.links {
                let child_id = link_meta.blob_id();
                trace!(?id, child_id = %child_id, "resolving linked blob");
                let maybe_link = Self::new(child_id, blob_mgr.clone());
                let maybe_link = maybe_link.map_err(BlobError::RepoError)?;
                let mut link_stream = maybe_link.ok_or_else(|| {
                    error!(
                        ?id,
                        missing_child = %child_id,
                        "blob metadata missing referenced child"
                    );
                    BlobError::DanglingBlob { id: child_id }
                })?;
                // Forward every chunk the child produces.
                while let Some(data) = link_stream.try_next().await? {
                    let current_index = chunk_index;
                    chunk_index += 1;
                    trace!(
                        ?id,
                        child_id = %child_id,
                        chunk_index = current_index,
                        chunk_size = data.len(),
                        "serving linked blob chunk"
                    );
                    yield data;
                }
            }
        }));

        Ok(Some(Self { stream }))
    }
}
375
376impl Debug for Blob {
377 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
378 f.debug_struct("Blob").finish()
380 }
381}
382
/// Errors produced while streaming a blob's contents.
#[derive(Debug, ThisError)]
#[expect(variant_size_differences, reason = "Doesn't matter here")]
#[non_exhaustive]
pub enum BlobError {
    /// A referenced blob id had no backing data in the store.
    #[error("encountered a dangling Blob ID: `{id}`, the blob store may be corrupt")]
    DanglingBlob { id: BlobId },
    /// Underlying metadata-store or filesystem failure.
    #[error(transparent)]
    RepoError(Report),
}
392
393impl Stream for Blob {
394 type Item = Result<Box<[u8]>, BlobError>;
395
396 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
397 self.stream.poll_next_unpin(cx)
398 }
399}
400
/// Minimal persistence interface for raw blob bytes, keyed by `BlobId`.
trait BlobRepository {
    /// Returns whether a blob with `id` exists.
    #[expect(dead_code, reason = "Will be used in future")]
    async fn has(&self, id: BlobId) -> EyreResult<bool>;
    /// Reads the whole blob, or `None` if absent.
    async fn get(&self, id: BlobId) -> EyreResult<Option<Box<[u8]>>>;
    /// Writes (or overwrites) the blob's bytes.
    async fn put(&self, id: BlobId, data: &[u8]) -> EyreResult<()>;
    /// Removes the blob; returns whether it existed.
    async fn delete(&self, id: BlobId) -> EyreResult<bool>;
}
408
/// Filesystem-backed blob store: each blob is one file under `root`, named
/// by its `BlobId`.
#[derive(Clone, Debug)]
pub struct FileSystem {
    // Base directory under which all blobs and application paths live.
    root: Utf8PathBuf,
}
414
415impl FileSystem {
420 pub async fn new(config: &BlobStoreConfig) -> EyreResult<Self> {
421 create_dir_all(&config.path).await?;
422
423 Ok(Self {
424 root: config.path.clone(),
425 })
426 }
427
428 fn path(&self, id: BlobId) -> Utf8PathBuf {
429 self.root.join(id.as_str())
430 }
431
432 pub fn application_blob_path(&self, package: &str, version: &str, id: BlobId) -> Utf8PathBuf {
434 utils::validate_path_component(package, Some("package"));
435 utils::validate_path_component(version, Some("version"));
436
437 self.root
438 .join("applications")
439 .join(package)
440 .join(version)
441 .join("blobs")
442 .join(id.as_str())
443 }
444
445 pub fn package_path(&self, package: &str) -> Utf8PathBuf {
447 utils::validate_path_component(package, Some("package"));
448
449 self.root.join("applications").join(package)
450 }
451
452 pub fn version_path(&self, package: &str, version: &str) -> Utf8PathBuf {
454 utils::validate_path_component(version, Some("version"));
455
456 self.package_path(package).join(version)
457 }
458
459 pub fn root_path(&self) -> Utf8PathBuf {
461 self.root.clone()
462 }
463}
464
465impl BlobRepository for FileSystem {
466 async fn has(&self, id: BlobId) -> EyreResult<bool> {
467 try_exists(self.path(id)).await.map_err(Into::into)
468 }
469
470 async fn get(&self, id: BlobId) -> EyreResult<Option<Box<[u8]>>> {
471 match async_read(self.path(id)).await {
472 Ok(file) => Ok(Some(file.into_boxed_slice())),
473 Err(err) if err.kind() == IoErrorKind::NotFound => Ok(None),
474 Err(err) => Err(err.into()),
475 }
476 }
477
478 async fn put(&self, id: BlobId, data: &[u8]) -> EyreResult<()> {
479 async_write(self.path(id), data).await.map_err(Into::into)
480 }
481
482 async fn delete(&self, id: BlobId) -> EyreResult<bool> {
483 let path = self.path(id);
484 match tokio::fs::remove_file(&path).await {
485 Ok(()) => Ok(true),
486 Err(err) if err.kind() == IoErrorKind::NotFound => Ok(false),
487 Err(err) => Err(err.into()),
488 }
489 }
490}
491
#[cfg(test)]
mod integration_tests_package_usage {
    // Anchors the `tokio-util` dev-dependency so unused-dependency lints
    // don't flag it. NOTE(review): presumably it is exercised by integration
    // tests outside this file — confirm before removing.
    use tokio_util as _;
}