//! `calimero_blobstore`: content-addressed, chunked blob storage (`lib.rs`).

use core::fmt::{self, Debug, Formatter};
use core::pin::{pin, Pin};
use core::task::{Context, Poll};
use std::io::ErrorKind as IoErrorKind;

use async_stream::try_stream;
use calimero_primitives::blobs::BlobId;
use calimero_primitives::hash::Hash;
use calimero_store::key::BlobMeta as BlobMetaKey;
use calimero_store::types::BlobMeta as BlobMetaValue;
use calimero_store::Store as DataStore;
use camino::Utf8PathBuf;
use eyre::{Report, Result as EyreResult};
use futures_util::io::BufReader;
use futures_util::{AsyncRead, AsyncReadExt, Stream, StreamExt, TryStreamExt};
use sha2::{Digest, Sha256};
use thiserror::Error as ThisError;
use tokio::fs::{create_dir_all, read as async_read, remove_file, try_exists, write as async_write};

pub mod config;

use config::BlobStoreConfig;
23
/// Size of a single content chunk: 1 MiB.
pub const CHUNK_SIZE: usize = 1 << 20;

// Compile-time guard: chunk sizes are cast to `u64`/stored alongside 32-bit
// friendly counters, so the constant must fit in a `u32`.
const _: () = assert!(CHUNK_SIZE <= u32::MAX as usize, "CHUNK_SIZE must be a 32-bit number");
28
29// const MAX_LINKS_PER_BLOB: usize = 128;
30
/// Coordinates blob storage: metadata (sizes, hashes, chunk links) lives in
/// the key-value data store, chunk contents in the filesystem-backed store.
#[derive(Clone, Debug)]
pub struct BlobManager {
    // Key-value store holding `BlobMeta` entries keyed by `BlobId`.
    data_store: DataStore,
    blob_store: FileSystem, // Arc<dyn BlobRepository>
}
36
/// Items yielded by the internal chunking stream in `BlobManager::put_sized`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Value {
    /// Terminal item: hash and total size of the complete content.
    Full { hash: Hash, size: u64 },
    /// One persisted chunk (its content-derived ID and byte length).
    Part { id: BlobId, _size: u64 },
    /// Terminal item: the stream produced more bytes than `expected`.
    Overflow { found: u64, expected: u64 },
}
43
/// Running hash and byte count for one region of the input — used both
/// per-chunk and for the stream as a whole.
#[derive(Clone, Debug, Default)]
struct State {
    // Incremental SHA-256 over the region's bytes.
    digest: Sha256,
    // Number of bytes read into this region so far.
    size: usize,
}
49
/// Caller-supplied expectation about a stream's total size, in bytes.
#[expect(clippy::exhaustive_enums, reason = "There are no more variants to add")]
#[derive(Eq, Ord, Copy, Clone, Debug, PartialEq, PartialOrd)]
pub enum Size {
    /// Best-effort estimate — used for pre-allocation only, never enforced.
    Hint(u64),
    /// Hard upper bound — exceeding it aborts the upload.
    Exact(u64),
}
56
57impl Size {
58    const fn hint(&self) -> usize {
59        // TODO: Check this, as the incoming int is a u64
60        #[expect(
61            clippy::cast_possible_truncation,
62            reason = "This is never expected to overflow"
63        )]
64        match self {
65            Self::Hint(size) | Self::Exact(size) => *size as usize,
66        }
67    }
68
69    fn overflows(this: Option<&Self>, size: usize) -> Option<u64> {
70        let size = u64::try_from(size);
71
72        match this {
73            None | Some(Self::Hint(_)) => size.err().map(|_| u64::MAX),
74            Some(Self::Exact(exact)) => {
75                size.map_or_else(|_| Some(*exact), |s| (s > *exact).then_some(*exact))
76            }
77        }
78    }
79}
80
impl BlobManager {
    /// Bundles the metadata store and the on-disk blob repository.
    #[must_use]
    pub const fn new(data_store: DataStore, blob_store: FileSystem) -> Self {
        Self {
            data_store,
            blob_store,
        }
    }

    /// Returns whether metadata for blob `id` exists.
    ///
    /// NOTE(review): this checks the metadata store only; it does not verify
    /// the chunk files on disk — confirm that is intended.
    pub fn has(&self, id: BlobId) -> EyreResult<bool> {
        Ok(self.data_store.handle().has(&BlobMetaKey::new(id))?)
    }

    // return a concrete type that resolves to the content of the file
    /// Looks up blob `id`, returning a lazy [`Blob`] stream over its content,
    /// or `Ok(None)` when no such blob is recorded.
    pub fn get(&self, id: BlobId) -> EyreResult<Option<Blob>> {
        Blob::new(id, self.clone())
    }

    /// Removes the backing file for blob `id`, returning whether a file was
    /// actually deleted.
    ///
    /// NOTE(review): only the file named by `id` is removed; the metadata
    /// entry and any linked chunk files appear untouched — verify intended.
    pub async fn delete(&self, id: BlobId) -> EyreResult<bool> {
        self.blob_store.delete(id).await
    }

    /// Stores all bytes of `stream` with no size expectation.
    /// See [`Self::put_sized`] for details and the return value.
    pub async fn put<T>(&self, stream: T) -> EyreResult<(BlobId, Hash, u64)>
    where
        T: AsyncRead,
    {
        self.put_sized(None, stream).await
    }

    /// Reads `stream` to completion, persisting it in `CHUNK_SIZE` pieces.
    ///
    /// Each full (or final partial) chunk is stored as its own
    /// content-addressed blob; a root entry linking all chunks is then
    /// written. Returns `(root_id, content_hash, total_size)`.
    ///
    /// # Errors
    /// Fails on read or store errors, or when `size` is
    /// `Some(Size::Exact(_))` and the stream yields more bytes than expected.
    pub async fn put_sized<T>(
        &self,
        size: Option<Size>,
        stream: T,
    ) -> EyreResult<(BlobId, Hash, u64)>
    where
        T: AsyncRead,
    {
        let mut stream = pin!(BufReader::new(stream));

        // Producer: chunk the input, persist each chunk, yield `Value::Part`
        // per chunk, and end with exactly one `Value::Full` (summary) or
        // `Value::Overflow` (size bound exceeded).
        let blobs = try_stream!({
            let mut buf = vec![0_u8; CHUNK_SIZE].into_boxed_slice();
            // `file` accumulates over the whole stream; `blob` is per-chunk.
            let mut file = State::default();
            let mut blob = State::default();

            let overflow_data = loop {
                let bytes = stream.read(&mut buf[blob.size..]).await?;

                let finished = bytes == 0;

                if !finished {
                    let new_blob_size = blob.size.saturating_add(bytes);
                    let new_file_size = file.size.saturating_add(bytes);

                    let chunk = &buf[blob.size..new_blob_size];

                    blob.size = new_blob_size;
                    file.size = new_file_size;

                    // Bail out (before hashing) as soon as an exact bound is
                    // exceeded or the running count can't be represented.
                    if let Some(expected) = Size::overflows(size.as_ref(), new_file_size) {
                        break Some(expected);
                    }

                    blob.digest.update(chunk);
                    file.digest.update(chunk);

                    // Keep reading until the chunk buffer is completely full.
                    if blob.size != buf.len() {
                        continue;
                    }
                }

                // EOF with nothing pending: no partial chunk to flush.
                if blob.size == 0 {
                    break None;
                }

                // A chunk's ID is the SHA-256 of its own content.
                let id = BlobId::from(*AsRef::<[u8; 32]>::as_ref(&blob.digest.finalize()));

                self.data_store.handle().put(
                    &BlobMetaKey::new(id),
                    // Leaf chunk: no links; its hash field is its own ID.
                    &BlobMetaValue::new(blob.size as u64, *id, Box::default()),
                )?;

                self.blob_store.put(id, &buf[..blob.size]).await?;

                yield Value::Part {
                    id,
                    _size: blob.size as u64,
                };

                if finished {
                    break None;
                }

                // Start the next chunk with a fresh digest and zero size.
                blob = State::default();
            };

            if let Some(expected) = overflow_data {
                yield Value::Overflow {
                    found: file.size as u64,
                    expected,
                };
            } else {
                // Terminal summary: hash and size of the full content.
                yield Value::Full {
                    hash: Hash::from(*(AsRef::<[u8; 32]>::as_ref(&file.digest.finalize()))),
                    size: file.size as u64,
                };
            }
        });

        // Peekable lets the consumer below stop draining `Part`s exactly at
        // the terminal `Full`/`Overflow` item.
        let blobs = typed_stream::<EyreResult<_>>(blobs).peekable();
        let mut blobs = pin!(blobs);

        // Pre-size the link list from the size hint, when one was given.
        let mut links = Vec::with_capacity(
            size.map(|s| s.hint().saturating_div(CHUNK_SIZE))
                .unwrap_or_default(),
        );

        // The root blob's ID is the SHA-256 over its chunk IDs, in order.
        let mut digest = Sha256::new();

        while let Some(Value::Part { id, _size }) = blobs
            .as_mut()
            .next_if(|v| matches!(v, Ok(Value::Part { .. })))
            .await
            .transpose()?
        {
            links.push(BlobMetaKey::new(id));
            digest.update(id.as_ref());
        }

        // The next (and final) item must be the terminal summary.
        let (hash, size) = match blobs.try_next().await? {
            Some(Value::Full { hash, size }) => (hash, size),
            Some(Value::Overflow { found, expected }) => {
                eyre::bail!("expected {} bytes in the stream, found {}", expected, found)
            }
            _ => {
                unreachable!("the root should always be emitted");
            }
        };

        let id = BlobId::from(*(AsRef::<[u8; 32]>::as_ref(&digest.finalize())));

        // Root metadata: total size, full-content hash, links to all chunks.
        self.data_store.handle().put(
            &BlobMetaKey::new(id),
            &BlobMetaValue::new(size, *hash, links.into_boxed_slice()),
        )?;

        Ok((id, hash, size)) // todo!: Ok(Blob { id, size, hash }::{fn stream()})
    }
}
229
230fn typed_stream<T>(s: impl Stream<Item = T>) -> impl Stream<Item = T> {
231    s
232}
233
/// Handle to a stored blob's content, exposed as an async stream of chunks
/// (`Box<[u8]>`); see the [`Stream`] impl below.
pub struct Blob {
    // id: BlobId,
    // meta: BlobMeta,

    // blob_mgr: BlobManager,
    #[expect(clippy::type_complexity, reason = "Acceptable here")]
    stream: Pin<Box<dyn Stream<Item = Result<Box<[u8]>, BlobError>> + Send>>,
}
242
243impl Blob {
244    fn new(id: BlobId, blob_mgr: BlobManager) -> EyreResult<Option<Self>> {
245        let Some(blob_meta) = blob_mgr.data_store.handle().get(&BlobMetaKey::new(id))? else {
246            return Ok(None);
247        };
248
249        #[expect(
250            clippy::semicolon_if_nothing_returned,
251            reason = "False positive; not possible with macro"
252        )]
253        let stream = Box::pin(try_stream!({
254            if blob_meta.links.is_empty() {
255                let maybe_blob = blob_mgr.blob_store.get(id).await;
256                let maybe_blob = maybe_blob.map_err(BlobError::RepoError)?;
257                let blob = maybe_blob.ok_or_else(|| BlobError::DanglingBlob { id })?;
258                return yield blob;
259            }
260
261            for link in blob_meta.links {
262                let maybe_link = Self::new(link.blob_id(), blob_mgr.clone());
263                let maybe_link = maybe_link.map_err(BlobError::RepoError)?;
264                let link = maybe_link.ok_or_else(|| BlobError::DanglingBlob { id })?;
265                for await blob in link {
266                    yield blob?;
267                }
268            }
269        }));
270
271        Ok(Some(Self { stream }))
272    }
273}
274
275impl Debug for Blob {
276    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
277        // TODO: Add more details if/when additional fields are added to Blob
278        f.debug_struct("Blob").finish()
279    }
280}
281
/// Errors produced while streaming a blob's content.
#[derive(Debug, ThisError)]
#[expect(variant_size_differences, reason = "Doesn't matter here")]
#[non_exhaustive]
pub enum BlobError {
    /// Metadata referenced a blob whose backing data could not be found.
    #[error("encountered a dangling Blob ID: `{id}`, the blob store may be corrupt")]
    DanglingBlob { id: BlobId },
    /// Any underlying store/repository failure, passed through verbatim.
    #[error(transparent)]
    RepoError(Report),
}
291
292impl Stream for Blob {
293    type Item = Result<Box<[u8]>, BlobError>;
294
295    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
296        self.stream.poll_next_unpin(cx)
297    }
298}
299
/// Minimal persistence interface for content-addressed blobs keyed by
/// [`BlobId`].
trait BlobRepository {
    /// Reports whether a blob with `id` exists.
    #[expect(dead_code, reason = "Will be used in future")]
    async fn has(&self, id: BlobId) -> EyreResult<bool>;
    /// Reads the full content of blob `id`; `None` when absent.
    async fn get(&self, id: BlobId) -> EyreResult<Option<Box<[u8]>>>;
    /// Writes `data` as the content of blob `id`.
    async fn put(&self, id: BlobId, data: &[u8]) -> EyreResult<()>;
    /// Removes blob `id`, returning whether it existed.
    async fn delete(&self, id: BlobId) -> EyreResult<bool>;
}
307
/// Blob repository storing each blob as a single file, named by its ID,
/// directly under `root`.
#[derive(Clone, Debug)]
pub struct FileSystem {
    // Directory holding one file per blob.
    root: Utf8PathBuf,
    // strategy: ShardingStrategy,
}
313
314// enum ShardingStrategy {
315//     NextToLast(Tolerance)
316// }
317
318impl FileSystem {
319    pub async fn new(config: &BlobStoreConfig) -> EyreResult<Self> {
320        create_dir_all(&config.path).await?;
321
322        Ok(Self {
323            root: config.path.clone(),
324        })
325    }
326
327    fn path(&self, id: BlobId) -> Utf8PathBuf {
328        self.root.join(id.as_str())
329    }
330}
331
332impl BlobRepository for FileSystem {
333    async fn has(&self, id: BlobId) -> EyreResult<bool> {
334        try_exists(self.path(id)).await.map_err(Into::into)
335    }
336
337    async fn get(&self, id: BlobId) -> EyreResult<Option<Box<[u8]>>> {
338        match async_read(self.path(id)).await {
339            Ok(file) => Ok(Some(file.into_boxed_slice())),
340            Err(err) if err.kind() == IoErrorKind::NotFound => Ok(None),
341            Err(err) => Err(err.into()),
342        }
343    }
344
345    async fn put(&self, id: BlobId, data: &[u8]) -> EyreResult<()> {
346        async_write(self.path(id), data).await.map_err(Into::into)
347    }
348
349    async fn delete(&self, id: BlobId) -> EyreResult<bool> {
350        let path = self.path(id);
351        match tokio::fs::remove_file(&path).await {
352            Ok(()) => Ok(true),
353            Err(err) if err.kind() == IoErrorKind::NotFound => Ok(false),
354            Err(err) => Err(err.into()),
355        }
356    }
357}
358
#[cfg(test)]
mod integration_tests_package_usage {
    //! Anchors the `tokio_util` dev-dependency (used by integration tests,
    //! per the module name) so unused-dependency lints don't flag it.
    use tokio_util as _;
}