1use core::fmt::{self, Debug, Formatter};
2use core::pin::{pin, Pin};
3use core::task::{Context, Poll};
4use std::io::ErrorKind as IoErrorKind;
5
6use async_stream::try_stream;
7use calimero_primitives::blobs::BlobId;
8use calimero_primitives::hash::Hash;
9use calimero_store::key::BlobMeta as BlobMetaKey;
10use calimero_store::types::BlobMeta as BlobMetaValue;
11use calimero_store::Store as DataStore;
12use camino::Utf8PathBuf;
13use eyre::{Report, Result as EyreResult};
14use futures_util::io::BufReader;
15use futures_util::{AsyncRead, AsyncReadExt, Stream, StreamExt, TryStreamExt};
16use sha2::{Digest, Sha256};
17use thiserror::Error as ThisError;
18use tokio::fs::{create_dir_all, read as async_read, try_exists, write as async_write};
19
20pub mod config;
21
22use config::BlobStoreConfig;
23
/// Size of a single blob chunk: 1 MiB. Streams written through
/// [`BlobManager::put_sized`] are split into pieces of at most this size.
pub const CHUNK_SIZE: usize = 1 << 20;

// Compile-time guard: the chunk size must fit in a `u32` (bit width <= 32).
// This replaces the old `const _: [(); cond as usize] = []` array-length
// trick with the readable const-`assert!` form; both fail the build when the
// condition is violated.
const _: () = assert!(CHUNK_SIZE <= u32::MAX as usize);
28
/// Content-addressed blob storage facade: blob *metadata* (size, hash,
/// links) lives in the key-value data store, while the raw chunk bytes live
/// in the file-system backed blob store.
#[derive(Clone, Debug)]
pub struct BlobManager {
    // Key-value store holding `BlobMeta` records keyed by `BlobId`.
    data_store: DataStore,
    // Backing store for raw chunk bytes, one entry per `BlobId`.
    blob_store: FileSystem,
}
36
/// Events emitted by the internal chunking stream in
/// `BlobManager::put_sized`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Value {
    /// Terminal event: the whole input was consumed; carries the hash and
    /// total byte count of the full content.
    Full { hash: Hash, size: u64 },
    /// One stored chunk. `_size` is underscore-prefixed because consumers
    /// currently only use the id.
    Part { id: BlobId, _size: u64 },
    /// Terminal event: the stream produced more bytes than allowed
    /// (`found` seen vs `expected` limit).
    Overflow { found: u64, expected: u64 },
}
43
/// Running SHA-256 digest plus byte count, used both per-chunk and for the
/// whole file during `put_sized`.
#[derive(Clone, Debug, Default)]
struct State {
    // Incremental SHA-256 over the bytes accounted for so far.
    digest: Sha256,
    // Number of bytes accounted for so far.
    size: usize,
}
49
/// Caller-supplied expectation about a stream's total length in bytes.
#[expect(clippy::exhaustive_enums, reason = "There are no more variants to add")]
#[derive(Eq, Ord, Copy, Clone, Debug, PartialEq, PartialOrd)]
pub enum Size {
    /// Best-effort estimate; used only to pre-size allocations, never
    /// enforced against the stream.
    Hint(u64),
    /// Hard limit; exceeding this size aborts the upload with an error.
    Exact(u64),
}
56
57impl Size {
58 const fn hint(&self) -> usize {
59 #[expect(
61 clippy::cast_possible_truncation,
62 reason = "This is never expected to overflow"
63 )]
64 match self {
65 Self::Hint(size) | Self::Exact(size) => *size as usize,
66 }
67 }
68
69 fn overflows(this: Option<&Self>, size: usize) -> Option<u64> {
70 let size = u64::try_from(size);
71
72 match this {
73 None | Some(Self::Hint(_)) => size.err().map(|_| u64::MAX),
74 Some(Self::Exact(exact)) => {
75 size.map_or_else(|_| Some(*exact), |s| (s > *exact).then_some(*exact))
76 }
77 }
78 }
79}
80
impl BlobManager {
    /// Builds a manager over the given metadata store and chunk store.
    #[must_use]
    pub const fn new(data_store: DataStore, blob_store: FileSystem) -> Self {
        Self {
            data_store,
            blob_store,
        }
    }

    /// Whether metadata for blob `id` exists. Checks the data store only,
    /// not the on-disk chunk files.
    pub fn has(&self, id: BlobId) -> EyreResult<bool> {
        Ok(self.data_store.handle().has(&BlobMetaKey::new(id))?)
    }

    /// Opens blob `id` as a lazily-resolved stream of chunk buffers;
    /// `Ok(None)` when no metadata exists for it.
    pub fn get(&self, id: BlobId) -> EyreResult<Option<Blob>> {
        Blob::new(id, self.clone())
    }

    /// Removes blob `id`'s bytes from the chunk store; `Ok(false)` when the
    /// entry was already absent.
    // NOTE(review): this does not remove the metadata record from
    // `data_store` — confirm that is intentional.
    pub async fn delete(&self, id: BlobId) -> EyreResult<bool> {
        self.blob_store.delete(id).await
    }

    /// Stores everything read from `stream` with no size expectation.
    /// Returns `(root blob id, content hash, total size in bytes)`.
    pub async fn put<T>(&self, stream: T) -> EyreResult<(BlobId, Hash, u64)>
    where
        T: AsyncRead,
    {
        self.put_sized(None, stream).await
    }

    /// Splits `stream` into `CHUNK_SIZE` pieces, stores each piece as its
    /// own blob, then records a root blob whose links reference the pieces.
    ///
    /// `size` semantics: `Size::Exact` makes the upload fail once more bytes
    /// than expected arrive; `Size::Hint` only pre-sizes the link list.
    ///
    /// Returns `(root blob id, SHA-256 of the full content, total bytes)`.
    /// The root id is the SHA-256 over the concatenated chunk ids, so the
    /// same content always produces the same root id.
    pub async fn put_sized<T>(
        &self,
        size: Option<Size>,
        stream: T,
    ) -> EyreResult<(BlobId, Hash, u64)>
    where
        T: AsyncRead,
    {
        let mut stream = pin!(BufReader::new(stream));

        // Producer: yields one `Value::Part` per stored chunk, terminated by
        // exactly one `Value::Full` (success) or `Value::Overflow` (limit hit).
        let blobs = try_stream!({
            let mut buf = vec![0_u8; CHUNK_SIZE].into_boxed_slice();
            // `file` accumulates hash/size over the whole stream; `blob`
            // over the current chunk only.
            let mut file = State::default();
            let mut blob = State::default();

            let overflow_data = loop {
                // Read into the unfilled tail of the chunk buffer.
                let bytes = stream.read(&mut buf[blob.size..]).await?;

                // A zero-byte read marks end of stream.
                let finished = bytes == 0;

                if !finished {
                    let new_blob_size = blob.size.saturating_add(bytes);
                    let new_file_size = file.size.saturating_add(bytes);

                    let chunk = &buf[blob.size..new_blob_size];

                    blob.size = new_blob_size;
                    file.size = new_file_size;

                    // Enforce `Size::Exact` before hashing the new bytes.
                    if let Some(expected) = Size::overflows(size.as_ref(), new_file_size) {
                        break Some(expected);
                    }

                    blob.digest.update(chunk);
                    file.digest.update(chunk);

                    // Keep reading until the chunk buffer is full.
                    if blob.size != buf.len() {
                        continue;
                    }
                }

                // Nothing buffered (empty stream, or a chunk boundary
                // coincided with EOF): no chunk to flush.
                if blob.size == 0 {
                    break None;
                }

                // A chunk's id is the SHA-256 of its bytes.
                let id = BlobId::from(*AsRef::<[u8; 32]>::as_ref(&blob.digest.finalize()));

                // Chunk metadata: its "hash" is its own id, and it has no links.
                self.data_store.handle().put(
                    &BlobMetaKey::new(id),
                    &BlobMetaValue::new(blob.size as u64, *id, Box::default()),
                )?;

                self.blob_store.put(id, &buf[..blob.size]).await?;

                yield Value::Part {
                    id,
                    _size: blob.size as u64,
                };

                if finished {
                    break None;
                }

                // Start a fresh chunk; the file-wide state keeps accumulating.
                blob = State::default();
            };

            if let Some(expected) = overflow_data {
                yield Value::Overflow {
                    found: file.size as u64,
                    expected,
                };
            } else {
                yield Value::Full {
                    hash: Hash::from(*(AsRef::<[u8; 32]>::as_ref(&file.digest.finalize()))),
                    size: file.size as u64,
                };
            }
        });

        // Pin the item type (`typed_stream` is an inference helper) and make
        // the stream peekable so we can stop at the terminal event.
        let blobs = typed_stream::<EyreResult<_>>(blobs).peekable();
        let mut blobs = pin!(blobs);

        // Pre-size the link list from the caller's size hint, if any.
        let mut links = Vec::with_capacity(
            size.map(|s| s.hint().saturating_div(CHUNK_SIZE))
                .unwrap_or_default(),
        );

        // Root digest: SHA-256 over the concatenated chunk ids.
        let mut digest = Sha256::new();

        // Consume every `Part`, leaving the terminal event in the stream.
        while let Some(Value::Part { id, _size }) = blobs
            .as_mut()
            .next_if(|v| matches!(v, Ok(Value::Part { .. })))
            .await
            .transpose()?
        {
            links.push(BlobMetaKey::new(id));
            digest.update(id.as_ref());
        }

        let (hash, size) = match blobs.try_next().await? {
            Some(Value::Full { hash, size }) => (hash, size),
            Some(Value::Overflow { found, expected }) => {
                eyre::bail!("expected {} bytes in the stream, found {}", expected, found)
            }
            _ => {
                // The producer always ends with `Full` or `Overflow`.
                unreachable!("the root should always be emitted");
            }
        };

        let id = BlobId::from(*(AsRef::<[u8; 32]>::as_ref(&digest.finalize())));

        // Root metadata: total size, content hash, and links to every chunk.
        self.data_store.handle().put(
            &BlobMetaKey::new(id),
            &BlobMetaValue::new(size, *hash, links.into_boxed_slice()),
        )?;

        Ok((id, hash, size))
    }
}
229
/// Identity function used purely as a type-inference anchor: the call site
/// names the item type via turbofish (`typed_stream::<EyreResult<_>>(..)`),
/// pinning the otherwise-ambiguous `Item` of the `try_stream!` output.
fn typed_stream<T>(s: impl Stream<Item = T>) -> impl Stream<Item = T> {
    s
}
233
/// A readable blob: a stream of chunk byte buffers, resolved lazily from the
/// blob store as it is polled.
pub struct Blob {
    // Boxed stream of chunk payloads; repository/corruption failures surface
    // as `BlobError` items.
    #[expect(clippy::type_complexity, reason = "Acceptable here")]
    stream: Pin<Box<dyn Stream<Item = Result<Box<[u8]>, BlobError>> + Send>>,
}
242
impl Blob {
    /// Resolves blob `id` into a chunk stream; `Ok(None)` when no metadata
    /// exists for it.
    ///
    /// A leaf blob (no links) yields its stored bytes as a single item; a
    /// root blob recursively opens each linked blob and forwards its chunks
    /// in order. Resolution happens lazily, as the stream is polled.
    fn new(id: BlobId, blob_mgr: BlobManager) -> EyreResult<Option<Self>> {
        let Some(blob_meta) = blob_mgr.data_store.handle().get(&BlobMetaKey::new(id))? else {
            return Ok(None);
        };

        #[expect(
            clippy::semicolon_if_nothing_returned,
            reason = "False positive; not possible with macro"
        )]
        let stream = Box::pin(try_stream!({
            // Leaf blob: its bytes live directly in the blob store.
            if blob_meta.links.is_empty() {
                let maybe_blob = blob_mgr.blob_store.get(id).await;
                let maybe_blob = maybe_blob.map_err(BlobError::RepoError)?;
                // Metadata without stored bytes means the store is corrupt.
                let blob = maybe_blob.ok_or_else(|| BlobError::DanglingBlob { id })?;
                // `return yield`: emit the single payload and end the stream.
                return yield blob;
            }

            // Root blob: forward every linked blob's chunks, in link order.
            for link in blob_meta.links {
                let maybe_link = Self::new(link.blob_id(), blob_mgr.clone());
                let maybe_link = maybe_link.map_err(BlobError::RepoError)?;
                let link = maybe_link.ok_or_else(|| BlobError::DanglingBlob { id })?;
                for await blob in link {
                    yield blob?;
                }
            }
        }));

        Ok(Some(Self { stream }))
    }
}
274
275impl Debug for Blob {
276 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
277 f.debug_struct("Blob").finish()
279 }
280}
281
/// Errors surfaced while streaming a blob's contents.
#[derive(Debug, ThisError)]
#[expect(variant_size_differences, reason = "Doesn't matter here")]
#[non_exhaustive]
pub enum BlobError {
    /// Metadata referenced a blob whose bytes are missing from the store.
    #[error("encountered a dangling Blob ID: `{id}`, the blob store may be corrupt")]
    DanglingBlob { id: BlobId },
    /// Underlying repository (data store / filesystem) failure.
    #[error(transparent)]
    RepoError(Report),
}
291
292impl Stream for Blob {
293 type Item = Result<Box<[u8]>, BlobError>;
294
295 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
296 self.stream.poll_next_unpin(cx)
297 }
298}
299
/// Abstraction over the physical store that holds raw blob bytes.
trait BlobRepository {
    /// Whether an entry for `id` exists.
    #[expect(dead_code, reason = "Will be used in future")]
    async fn has(&self, id: BlobId) -> EyreResult<bool>;
    /// Fetches the raw bytes for `id`; `Ok(None)` when absent.
    async fn get(&self, id: BlobId) -> EyreResult<Option<Box<[u8]>>>;
    /// Stores `data` under `id`, replacing any existing entry.
    async fn put(&self, id: BlobId, data: &[u8]) -> EyreResult<()>;
    /// Removes the entry for `id`; `Ok(false)` when it was already absent.
    async fn delete(&self, id: BlobId) -> EyreResult<bool>;
}
307
/// Blob repository backed by a flat directory: one file per blob, named by
/// the blob id.
#[derive(Clone, Debug)]
pub struct FileSystem {
    // Directory under which every blob file lives.
    root: Utf8PathBuf,
}
313
314impl FileSystem {
319 pub async fn new(config: &BlobStoreConfig) -> EyreResult<Self> {
320 create_dir_all(&config.path).await?;
321
322 Ok(Self {
323 root: config.path.clone(),
324 })
325 }
326
327 fn path(&self, id: BlobId) -> Utf8PathBuf {
328 self.root.join(id.as_str())
329 }
330}
331
332impl BlobRepository for FileSystem {
333 async fn has(&self, id: BlobId) -> EyreResult<bool> {
334 try_exists(self.path(id)).await.map_err(Into::into)
335 }
336
337 async fn get(&self, id: BlobId) -> EyreResult<Option<Box<[u8]>>> {
338 match async_read(self.path(id)).await {
339 Ok(file) => Ok(Some(file.into_boxed_slice())),
340 Err(err) if err.kind() == IoErrorKind::NotFound => Ok(None),
341 Err(err) => Err(err.into()),
342 }
343 }
344
345 async fn put(&self, id: BlobId, data: &[u8]) -> EyreResult<()> {
346 async_write(self.path(id), data).await.map_err(Into::into)
347 }
348
349 async fn delete(&self, id: BlobId) -> EyreResult<bool> {
350 let path = self.path(id);
351 match tokio::fs::remove_file(&path).await {
352 Ok(()) => Ok(true),
353 Err(err) if err.kind() == IoErrorKind::NotFound => Ok(false),
354 Err(err) => Err(err.into()),
355 }
356 }
357}
358
#[cfg(test)]
mod integration_tests_package_usage {
    // `use … as _` marks the crate as intentionally referenced.
    // NOTE(review): presumably `tokio_util` is a dev-dependency consumed only
    // by integration tests, and this silences the unused-dependency lint for
    // unit-test builds — confirm against Cargo.toml.
    use tokio_util as _;
}