// calimero_blobstore/lib.rs
1use core::fmt::{self, Debug, Formatter};
2use core::pin::{pin, Pin};
3use core::task::{Context, Poll};
4use std::io::ErrorKind as IoErrorKind;
5
6use async_stream::try_stream;
7use calimero_primitives::blobs::BlobId;
8use calimero_primitives::hash::Hash;
9use calimero_store::key::BlobMeta as BlobMetaKey;
10use calimero_store::types::BlobMeta as BlobMetaValue;
11use calimero_store::Store as DataStore;
12use camino::Utf8PathBuf;
13use eyre::{Report, Result as EyreResult};
14use futures_util::io::BufReader;
15use futures_util::{AsyncRead, AsyncReadExt, Stream, StreamExt, TryStreamExt};
16use sha2::{Digest, Sha256};
17use thiserror::Error as ThisError;
18use tokio::fs::{create_dir_all, read as async_read, try_exists, write as async_write};
19use tracing::{debug, error, trace};
20
21pub mod config;
22mod utils;
23
24use config::BlobStoreConfig;
25
/// Size of a single blob chunk: 1 MiB.
///
/// Streams stored via the blob manager are split into chunks of at most this
/// many bytes; each chunk is persisted as its own content-addressed blob.
pub const CHUNK_SIZE: usize = 1 << 20; // 1MiB

// Compile-time guard replacing the old zero/one-length-array trick:
// fail the build outright if `CHUNK_SIZE` cannot be represented in 32 bits.
const _: () = assert!(CHUNK_SIZE <= u32::MAX as usize, "CHUNK_SIZE must be a 32-bit number");
30
31// const MAX_LINKS_PER_BLOB: usize = 128;
32
/// Coordinates blob storage: chunk/file metadata lives in the key-value
/// `data_store`, while the raw chunk bytes live in the filesystem-backed
/// `blob_store`.
#[derive(Clone, Debug)]
pub struct BlobManager {
    // Key-value store holding `BlobMeta` records (hash, size, chunk links).
    data_store: DataStore,
    // On-disk chunk storage; may later be abstracted behind the trait below.
    blob_store: FileSystem, // Arc<dyn BlobRepository>
}
38
/// Internal messages yielded by the chunking stream inside `put_sized`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Value {
    /// Terminal item: hash and total byte count of the whole file.
    Full { hash: Hash, size: u64 },
    /// One persisted chunk: its blob ID and byte length.
    Part { id: BlobId, _size: u64 },
    /// Terminal item when the stream exceeded the declared size.
    Overflow { found: u64, expected: u64 },
}
45
/// Running SHA-256 digest and byte count, used both per-chunk and per-file.
#[derive(Clone, Debug, Default)]
struct State {
    // Incremental hash over the bytes accepted so far.
    digest: Sha256,
    // Number of bytes read so far (chunk-local or file-total, per instance).
    size: usize,
}
51
/// Caller-declared size of an incoming stream.
#[expect(clippy::exhaustive_enums, reason = "There are no more variants to add")]
#[derive(Eq, Ord, Copy, Clone, Debug, PartialEq, PartialOrd)]
pub enum Size {
    /// Advisory size; the stream may legitimately be shorter or longer.
    Hint(u64),
    /// Strict size; the stream must not exceed this many bytes.
    Exact(u64),
}

impl Size {
    /// Returns the declared size as a `usize`, for use as a capacity hint.
    ///
    /// Saturates at `usize::MAX` rather than silently truncating when the
    /// declared `u64` does not fit in `usize` (e.g. on 32-bit targets) —
    /// this resolves the old TODO about the incoming value being a `u64`.
    #[expect(
        clippy::cast_possible_truncation,
        reason = "Truncation is impossible: the value is capped at `usize::MAX` first"
    )]
    const fn hint(&self) -> usize {
        let size = match self {
            Self::Hint(size) | Self::Exact(size) => *size,
        };

        if size > usize::MAX as u64 {
            usize::MAX
        } else {
            size as usize
        }
    }

    /// Checks whether `size` bytes read so far violate the declared size.
    ///
    /// Returns `Some(expected)` on violation (with `u64::MAX` standing in
    /// when the byte count itself is unrepresentable), `None` otherwise.
    fn overflows(this: Option<&Self>, size: usize) -> Option<u64> {
        let size = u64::try_from(size);

        match this {
            // No declaration, or only a hint: anything goes, unless the byte
            // count cannot even be represented as a `u64`.
            None | Some(Self::Hint(_)) => size.err().map(|_| u64::MAX),
            // An exact size is violated as soon as we read past it.
            Some(Self::Exact(exact)) => {
                size.map_or_else(|_| Some(*exact), |s| (s > *exact).then_some(*exact))
            }
        }
    }
}
82
83impl BlobManager {
84    #[must_use]
85    pub const fn new(data_store: DataStore, blob_store: FileSystem) -> Self {
86        Self {
87            data_store,
88            blob_store,
89        }
90    }
91
92    /// Get the package directory path
93    pub fn package_path(&self, package: &str) -> Utf8PathBuf {
94        self.blob_store.package_path(package)
95    }
96
97    /// Get the version directory path
98    pub fn version_path(&self, package: &str, version: &str) -> Utf8PathBuf {
99        self.blob_store.version_path(package, version)
100    }
101
102    /// Get the root/base path of the blobstore
103    pub fn root_path(&self) -> Utf8PathBuf {
104        self.blob_store.root_path()
105    }
106
107    /// Get the path for a blob stored in a package/version directory
108    pub fn application_blob_path(&self, package: &str, version: &str, id: BlobId) -> Utf8PathBuf {
109        self.blob_store.application_blob_path(package, version, id)
110    }
111
112    pub fn has(&self, id: BlobId) -> EyreResult<bool> {
113        Ok(self.data_store.handle().has(&BlobMetaKey::new(id))?)
114    }
115
116    // return a concrete type that resolves to the content of the file
117    pub fn get(&self, id: BlobId) -> EyreResult<Option<Blob>> {
118        Blob::new(id, self.clone())
119    }
120
121    pub async fn delete(&self, id: BlobId) -> EyreResult<bool> {
122        self.blob_store.delete(id).await
123    }
124
125    pub async fn put<T>(&self, stream: T) -> EyreResult<(BlobId, Hash, u64)>
126    where
127        T: AsyncRead,
128    {
129        self.put_sized(None, stream).await
130    }
131
132    pub async fn put_sized<T>(
133        &self,
134        size: Option<Size>,
135        stream: T,
136    ) -> EyreResult<(BlobId, Hash, u64)>
137    where
138        T: AsyncRead,
139    {
140        debug!(
141            size_hint = size.as_ref().map(Size::hint),
142            "put_sized invoked"
143        );
144
145        let mut stream = pin!(BufReader::new(stream));
146
147        let blobs = try_stream!({
148            let mut buf = vec![0_u8; CHUNK_SIZE].into_boxed_slice();
149            let mut file = State::default();
150            let mut blob = State::default();
151            let mut chunk_index: u64 = 0;
152
153            let overflow_data = loop {
154                let bytes = stream.read(&mut buf[blob.size..]).await?;
155
156                let finished = bytes == 0;
157
158                if !finished {
159                    let new_blob_size = blob.size.saturating_add(bytes);
160                    let new_file_size = file.size.saturating_add(bytes);
161
162                    let chunk = &buf[blob.size..new_blob_size];
163
164                    blob.size = new_blob_size;
165                    file.size = new_file_size;
166
167                    trace!(
168                        chunk_index,
169                        chunk_bytes = chunk.len(),
170                        file_bytes = file.size,
171                        "read chunk data from stream"
172                    );
173
174                    if let Some(expected) = Size::overflows(size.as_ref(), new_file_size) {
175                        trace!(
176                            expected,
177                            file_bytes = file.size,
178                            "size overflow detected while chunking"
179                        );
180                        break Some(expected);
181                    }
182
183                    blob.digest.update(chunk);
184                    file.digest.update(chunk);
185
186                    if blob.size != buf.len() {
187                        continue;
188                    }
189                }
190
191                if blob.size == 0 {
192                    break None;
193                }
194
195                let id = BlobId::from(*AsRef::<[u8; 32]>::as_ref(&blob.digest.finalize()));
196
197                self.data_store.handle().put(
198                    &BlobMetaKey::new(id),
199                    &BlobMetaValue::new(blob.size as u64, *id, Box::default()),
200                )?;
201
202                self.blob_store.put(id, &buf[..blob.size]).await?;
203
204                trace!(
205                    ?id,
206                    chunk_index,
207                    chunk_size = blob.size,
208                    file_bytes = file.size,
209                    "blob chunk persisted"
210                );
211                chunk_index += 1;
212
213                yield Value::Part {
214                    id,
215                    _size: blob.size as u64,
216                };
217
218                if finished {
219                    break None;
220                }
221
222                blob = State::default();
223            };
224
225            if let Some(expected) = overflow_data {
226                yield Value::Overflow {
227                    found: file.size as u64,
228                    expected,
229                };
230            } else {
231                yield Value::Full {
232                    hash: Hash::from(*(AsRef::<[u8; 32]>::as_ref(&file.digest.finalize()))),
233                    size: file.size as u64,
234                };
235            }
236        });
237
238        let blobs = typed_stream::<EyreResult<_>>(blobs).peekable();
239        let mut blobs = pin!(blobs);
240
241        let mut links = Vec::with_capacity(
242            size.map(|s| s.hint().saturating_div(CHUNK_SIZE))
243                .unwrap_or_default(),
244        );
245
246        let mut digest = Sha256::new();
247
248        while let Some(Value::Part { id, _size }) = blobs
249            .as_mut()
250            .next_if(|v| matches!(v, Ok(Value::Part { .. })))
251            .await
252            .transpose()?
253        {
254            links.push(BlobMetaKey::new(id));
255            digest.update(id.as_ref());
256        }
257
258        let chunk_count = links.len();
259
260        let (hash, size) = match blobs.try_next().await? {
261            Some(Value::Full { hash, size }) => (hash, size),
262            Some(Value::Overflow { found, expected }) => {
263                error!(
264                    found,
265                    expected, "blob size overflow while finalising stream"
266                );
267                eyre::bail!("expected {} bytes in the stream, found {}", expected, found)
268            }
269            _ => {
270                unreachable!("the root should always be emitted");
271            }
272        };
273
274        let id = BlobId::from(*(AsRef::<[u8; 32]>::as_ref(&digest.finalize())));
275
276        self.data_store.handle().put(
277            &BlobMetaKey::new(id),
278            &BlobMetaValue::new(size, *hash, links.into_boxed_slice()),
279        )?;
280
281        debug!(
282            ?id,
283            total_size = size,
284            chunk_count,
285            "blob metadata persisted"
286        );
287
288        debug!(
289            ?id,
290            total_size = size,
291            chunk_count,
292            "blob stored successfully"
293        );
294
295        Ok((id, hash, size)) // todo!: Ok(Blob { id, size, hash }::{fn stream()})
296    }
297}
298
299fn typed_stream<T>(s: impl Stream<Item = T>) -> impl Stream<Item = T> {
300    s
301}
302
/// A readable handle to a stored blob: a stream of its chunks' bytes.
pub struct Blob {
    // id: BlobId,
    // meta: BlobMeta,

    // blob_mgr: BlobManager,
    /// Boxed stream yielding each chunk's bytes in order; failures surface
    /// as [`BlobError`].
    #[expect(clippy::type_complexity, reason = "Acceptable here")]
    stream: Pin<Box<dyn Stream<Item = Result<Box<[u8]>, BlobError>> + Send>>,
}
311
impl Blob {
    /// Opens the blob identified by `id`, returning `Ok(None)` when no
    /// metadata exists for it.
    ///
    /// Leaf blobs (no links) are served as a single chunk read directly from
    /// the blob store; root blobs recursively open each linked child and
    /// forward its chunks in order.
    fn new(id: BlobId, blob_mgr: BlobManager) -> EyreResult<Option<Self>> {
        let Some(blob_meta) = blob_mgr.data_store.handle().get(&BlobMetaKey::new(id))? else {
            trace!(?id, "blob metadata not found");
            return Ok(None);
        };

        #[expect(
            clippy::semicolon_if_nothing_returned,
            reason = "False positive; not possible with macro"
        )]
        let stream = Box::pin(try_stream!({
            let mut chunk_index: u64 = 0;
            trace!(
                ?id,
                link_count = blob_meta.links.len(),
                "initializing blob stream"
            );
            // Leaf blob: the bytes live directly under this ID — yield them
            // as one chunk and finish.
            if blob_meta.links.is_empty() {
                let maybe_blob = blob_mgr.blob_store.get(id).await;
                let maybe_blob = maybe_blob.map_err(BlobError::RepoError)?;
                let blob = maybe_blob.ok_or_else(|| BlobError::DanglingBlob { id })?;
                trace!(
                    ?id,
                    chunk_index,
                    chunk_size = blob.len(),
                    "serving single blob chunk"
                );
                chunk_index += 1;
                return yield blob;
            }

            // Root blob: open each linked child (recursively, via `Self::new`)
            // and forward every chunk it yields.
            for link_meta in blob_meta.links {
                let child_id = link_meta.blob_id();
                trace!(?id, child_id = %child_id, "resolving linked blob");
                let maybe_link = Self::new(child_id, blob_mgr.clone());
                let maybe_link = maybe_link.map_err(BlobError::RepoError)?;
                let mut link_stream = maybe_link.ok_or_else(|| {
                    error!(
                        ?id,
                        missing_child = %child_id,
                        "blob metadata missing referenced child"
                    );
                    BlobError::DanglingBlob { id: child_id }
                })?;
                while let Some(data) = link_stream.try_next().await? {
                    let current_index = chunk_index;
                    chunk_index += 1;
                    trace!(
                        ?id,
                        child_id = %child_id,
                        chunk_index = current_index,
                        chunk_size = data.len(),
                        "serving linked blob chunk"
                    );
                    yield data;
                }
            }
        }));

        Ok(Some(Self { stream }))
    }
}
375
376impl Debug for Blob {
377    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
378        // TODO: Add more details if/when additional fields are added to Blob
379        f.debug_struct("Blob").finish()
380    }
381}
382
/// Errors produced while streaming a [`Blob`]'s chunks.
#[derive(Debug, ThisError)]
#[expect(variant_size_differences, reason = "Doesn't matter here")]
#[non_exhaustive]
pub enum BlobError {
    /// Metadata referenced a blob ID that has no stored bytes or metadata.
    #[error("encountered a dangling Blob ID: `{id}`, the blob store may be corrupt")]
    DanglingBlob { id: BlobId },
    /// Underlying storage failure, passed through unchanged.
    #[error(transparent)]
    RepoError(Report),
}
392
/// Streaming a [`Blob`] yields each stored chunk as an owned byte buffer.
impl Stream for Blob {
    type Item = Result<Box<[u8]>, BlobError>;

    // Delegates directly to the boxed inner stream built in `Blob::new`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.stream.poll_next_unpin(cx)
    }
}
400
/// Minimal async key-value interface for raw blob bytes, keyed by [`BlobId`].
trait BlobRepository {
    /// Returns `true` if bytes for `id` are present.
    #[expect(dead_code, reason = "Will be used in future")]
    async fn has(&self, id: BlobId) -> EyreResult<bool>;
    /// Fetches the stored bytes for `id`, or `None` if absent.
    async fn get(&self, id: BlobId) -> EyreResult<Option<Box<[u8]>>>;
    /// Stores `data` under `id`, replacing any existing value.
    async fn put(&self, id: BlobId, data: &[u8]) -> EyreResult<()>;
    /// Removes `id`; returns whether anything was actually removed.
    async fn delete(&self, id: BlobId) -> EyreResult<bool>;
}
408
/// Filesystem-backed [`BlobRepository`]: each blob is a single file named by
/// its ID directly under `root`.
#[derive(Clone, Debug)]
pub struct FileSystem {
    // Base directory for blob files and application subdirectories.
    root: Utf8PathBuf,
    // strategy: ShardingStrategy,
}
414
415// enum ShardingStrategy {
416//     NextToLast(Tolerance)
417// }
418
419impl FileSystem {
420    pub async fn new(config: &BlobStoreConfig) -> EyreResult<Self> {
421        create_dir_all(&config.path).await?;
422
423        Ok(Self {
424            root: config.path.clone(),
425        })
426    }
427
428    fn path(&self, id: BlobId) -> Utf8PathBuf {
429        self.root.join(id.as_str())
430    }
431
432    /// Get the path for a blob stored in a package/version directory
433    pub fn application_blob_path(&self, package: &str, version: &str, id: BlobId) -> Utf8PathBuf {
434        utils::validate_path_component(package, Some("package"));
435        utils::validate_path_component(version, Some("version"));
436
437        self.root
438            .join("applications")
439            .join(package)
440            .join(version)
441            .join("blobs")
442            .join(id.as_str())
443    }
444
445    /// Get the package directory path
446    pub fn package_path(&self, package: &str) -> Utf8PathBuf {
447        utils::validate_path_component(package, Some("package"));
448
449        self.root.join("applications").join(package)
450    }
451
452    /// Get the version directory path
453    pub fn version_path(&self, package: &str, version: &str) -> Utf8PathBuf {
454        utils::validate_path_component(version, Some("version"));
455
456        self.package_path(package).join(version)
457    }
458
459    /// Get the root/base path of the blobstore
460    pub fn root_path(&self) -> Utf8PathBuf {
461        self.root.clone()
462    }
463}
464
465impl BlobRepository for FileSystem {
466    async fn has(&self, id: BlobId) -> EyreResult<bool> {
467        try_exists(self.path(id)).await.map_err(Into::into)
468    }
469
470    async fn get(&self, id: BlobId) -> EyreResult<Option<Box<[u8]>>> {
471        match async_read(self.path(id)).await {
472            Ok(file) => Ok(Some(file.into_boxed_slice())),
473            Err(err) if err.kind() == IoErrorKind::NotFound => Ok(None),
474            Err(err) => Err(err.into()),
475        }
476    }
477
478    async fn put(&self, id: BlobId, data: &[u8]) -> EyreResult<()> {
479        async_write(self.path(id), data).await.map_err(Into::into)
480    }
481
482    async fn delete(&self, id: BlobId) -> EyreResult<bool> {
483        let path = self.path(id);
484        match tokio::fs::remove_file(&path).await {
485            Ok(()) => Ok(true),
486            Err(err) if err.kind() == IoErrorKind::NotFound => Ok(false),
487            Err(err) => Err(err.into()),
488        }
489    }
490}
491
#[cfg(test)]
mod integration_tests_package_usage {
    // Anchors the `tokio-util` dev-dependency so unused-dependency lints
    // don't flag it; presumably exercised by integration tests elsewhere.
    use tokio_util as _;
}