calimero_node_primitives/client/
blob.rs1use std::sync::Arc;
2
3use calimero_blobstore::{Blob, Size};
4use calimero_primitives::blobs::{BlobId, BlobInfo, BlobMetadata};
5use calimero_primitives::context::ContextId;
6use calimero_primitives::hash::Hash;
7use calimero_store::key;
8use calimero_store::layer::LayerExt;
9use eyre::bail;
10use futures_util::{AsyncRead, StreamExt};
11use libp2p::PeerId;
12
13use super::NodeClient;
14use crate::messages::get_blob_bytes::GetBlobBytesRequest;
15use crate::messages::NodeMessage::GetBlobBytes;
16
17impl NodeClient {
18 pub async fn add_blob<S: AsyncRead>(
23 &self,
24 stream: S,
25 expected_size: Option<u64>,
26 expected_hash: Option<&Hash>,
27 ) -> eyre::Result<(BlobId, u64)> {
28 let (blob_id, hash, size) = self
29 .blobstore
30 .put_sized(expected_size.map(Size::Exact), stream)
31 .await?;
32
33 if matches!(expected_hash, Some(expected_hash) if hash != *expected_hash) {
34 bail!("fatal: blob hash mismatch");
35 }
36
37 if matches!(expected_size, Some(expected_size) if size != expected_size) {
38 bail!("fatal: blob size mismatch");
39 }
40
41 Ok((blob_id, size))
42 }
43
44 pub fn get_blob<'a>(
47 &'a self,
48 blob_id: &'a BlobId,
49 context_id: Option<&'a ContextId>,
50 ) -> impl std::future::Future<Output = eyre::Result<Option<Blob>>> + 'a {
51 async move {
52 let Some(stream) = self.blobstore.get(*blob_id)? else {
54 if context_id.is_none() {
56 return Ok(None);
57 }
58
59 let context_id = context_id.unwrap();
61 tracing::info!(
62 blob_id = %blob_id,
63 context_id = %context_id,
64 "Blob not found locally, attempting network discovery"
65 );
66
67 const MAX_RETRIES: usize = 3;
68 const RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(2);
69
70 for attempt in 1..=MAX_RETRIES {
71 tracing::debug!(
72 blob_id = %blob_id,
73 context_id = %context_id,
74 attempt,
75 max_attempts = MAX_RETRIES,
76 "Attempting network discovery"
77 );
78
79 let peers = match self
80 .network_client
81 .query_blob(*blob_id, Some(*context_id))
82 .await
83 {
84 Ok(peers) => peers,
85 Err(e) => {
86 tracing::warn!(
87 blob_id = %blob_id,
88 context_id = %context_id,
89 attempt,
90 error = %e,
91 "Failed to query DHT for blob"
92 );
93 if attempt < MAX_RETRIES {
94 tokio::time::sleep(RETRY_DELAY).await;
95 continue;
96 }
97 return Err(e);
98 }
99 };
100
101 if peers.is_empty() {
102 tracing::info!(
103 blob_id = %blob_id,
104 context_id = %context_id,
105 attempt,
106 "No peers found with blob"
107 );
108 if attempt < MAX_RETRIES {
109 tokio::time::sleep(RETRY_DELAY).await;
110 continue;
111 }
112 return Ok(None);
113 }
114
115 tracing::info!(
116 blob_id = %blob_id,
117 context_id = %context_id,
118 peer_count = peers.len(),
119 attempt,
120 "Found {} peers with blob, attempting download", peers.len()
121 );
122
123 for (peer_index, peer_id) in peers.iter().enumerate() {
125 tracing::debug!(
126 peer_id = %peer_id,
127 peer_index = peer_index + 1,
128 total_peers = peers.len(),
129 attempt,
130 "Attempting to download blob from peer"
131 );
132
133 match self
134 .network_client
135 .request_blob(*blob_id, *context_id, *peer_id)
136 .await
137 {
138 Ok(Some(data)) => {
139 tracing::info!(
140 blob_id = %blob_id,
141 peer_id = %peer_id,
142 size = data.len(),
143 attempt,
144 "Successfully downloaded blob from network"
145 );
146
147 let (blob_id_stored, _size) = self
149 .add_blob(data.as_slice(), Some(data.len() as u64), None)
150 .await?;
151
152 if blob_id_stored != *blob_id {
154 tracing::warn!(
155 expected = %blob_id,
156 actual = %blob_id_stored,
157 "Downloaded blob ID mismatch"
158 );
159 continue;
160 }
161
162 return Ok(self.blobstore.get(*blob_id)?);
164 }
165 Ok(None) => {
166 tracing::debug!(
167 peer_id = %peer_id,
168 attempt,
169 "Peer doesn't have the blob"
170 );
171 }
172 Err(e) => {
173 tracing::warn!(
174 peer_id = %peer_id,
175 error = %e,
176 attempt,
177 "Failed to download blob from peer"
178 );
179 }
180 }
181 }
182
183 if attempt < MAX_RETRIES {
185 tracing::info!(
186 blob_id = %blob_id,
187 context_id = %context_id,
188 attempt,
189 "All peers failed, retrying in {} seconds",
190 RETRY_DELAY.as_secs()
191 );
192 tokio::time::sleep(RETRY_DELAY).await;
193 }
194 }
195
196 tracing::debug!(
197 blob_id = %blob_id,
198 context_id = %context_id,
199 max_attempts = MAX_RETRIES,
200 "Failed to download blob from any peer after all retry attempts"
201 );
202 return Ok(None);
203 };
204
205 Ok(Some(stream))
206 }
207 }
208
209 pub async fn get_blob_bytes(
212 &self,
213 blob_id: &BlobId,
214 context_id: Option<&ContextId>,
215 ) -> eyre::Result<Option<Arc<[u8]>>> {
216 if **blob_id == [0; 32] {
217 return Ok(None);
218 }
219
220 let request = GetBlobBytesRequest { blob_id: *blob_id };
222
223 let (tx, rx) = tokio::sync::oneshot::channel();
224
225 match self
226 .node_manager
227 .send(GetBlobBytes {
228 request,
229 outcome: tx,
230 })
231 .await
232 {
233 Ok(_) => {
234 if let Ok(response) = rx.await {
235 if let Ok(response) = response {
236 if response.bytes.is_some() {
237 return Ok(response.bytes);
238 }
239 }
240 }
241 }
242 Err(_) => {
243 }
245 }
246
247 if let Some(context_id) = context_id {
248 let Some(mut blob) = self.get_blob(blob_id, Some(context_id)).await? else {
249 return Ok(None);
250 };
251
252 let mut data = Vec::new();
253 while let Some(chunk) = blob.next().await {
254 data.extend_from_slice(&chunk?);
255 }
256
257 Ok(Some(data.into()))
258 } else {
259 Ok(None)
261 }
262 }
263
264 pub async fn find_blob_providers(
266 &self,
267 blob_id: &BlobId,
268 context_id: &ContextId,
269 ) -> eyre::Result<Vec<PeerId>> {
270 self.network_client
271 .query_blob(*blob_id, Some(*context_id))
272 .await
273 }
274
275 pub async fn announce_blob_to_network(
277 &self,
278 blob_id: &BlobId,
279 context_id: &ContextId,
280 size: u64,
281 ) -> eyre::Result<()> {
282 self.network_client
283 .announce_blob(*blob_id, *context_id, size)
284 .await
285 }
286
287 pub fn has_blob(&self, blob_id: &BlobId) -> eyre::Result<bool> {
288 self.blobstore.has(*blob_id)
289 }
290
291 pub fn list_blobs(&self) -> eyre::Result<Vec<BlobInfo>> {
298 let handle = self.datastore.clone().handle();
299
300 let iter_result = handle.iter::<key::BlobMeta>();
301 let mut iter = match iter_result {
302 Ok(iter) => iter,
303 Err(err) => {
304 tracing::error!("Failed to create blob iterator: {:?}", err);
305 bail!("Failed to iterate blob entries");
306 }
307 };
308
309 let mut chunk_blob_ids = std::collections::HashSet::new();
310
311 tracing::debug!("Starting first pass: collecting chunk blob IDs");
312 for result in iter.entries() {
313 match result {
314 (Ok(_blob_key), Ok(blob_meta)) => {
315 for link in blob_meta.links.iter() {
317 let _ = chunk_blob_ids.insert(link.blob_id());
318 }
319 }
320 (Err(err), _) | (_, Err(err)) => {
321 tracing::error!(
322 "Failed to read blob entry during chunk collection: {:?}",
323 err
324 );
325 bail!("Failed to read blob entries");
326 }
327 }
328 }
329
330 let handle2 = self.datastore.clone().handle();
331 let iter_result2 = handle2.iter::<key::BlobMeta>();
332 let mut iter2 = match iter_result2 {
333 Ok(iter) => iter,
334 Err(err) => {
335 tracing::error!("Failed to create second blob iterator: {:?}", err);
336 bail!("Failed to iterate blob entries");
337 }
338 };
339
340 let mut root_blobs = Vec::new();
341
342 tracing::debug!(
343 "Starting second pass: collecting root blobs (filtering {} chunks)",
344 chunk_blob_ids.len()
345 );
346 for result in iter2.entries() {
347 match result {
348 (Ok(blob_key), Ok(blob_meta)) => {
349 let blob_id = blob_key.blob_id();
350
351 if !chunk_blob_ids.contains(&blob_id) {
353 root_blobs.push(BlobInfo {
354 blob_id,
355 size: blob_meta.size,
356 });
357 }
358 }
359 (Err(err), _) | (_, Err(err)) => {
360 tracing::error!(
361 "Failed to read blob entry during root collection: {:?}",
362 err
363 );
364 bail!("Failed to read blob entries");
365 }
366 }
367 }
368
369 tracing::debug!(
370 "Listing complete: found {} chunks, returning {} root/standalone blobs",
371 chunk_blob_ids.len(),
372 root_blobs.len()
373 );
374
375 Ok(root_blobs)
376 }
377
378 pub async fn delete_blob(&self, blob_id: BlobId) -> eyre::Result<bool> {
383 let mut handle = self.datastore.clone().handle();
384 let blob_key = key::BlobMeta::new(blob_id);
385
386 let blob_meta = match handle.get(&blob_key) {
387 Ok(Some(meta)) => meta,
388 Ok(None) => {
389 bail!("Blob not found");
390 }
391 Err(err) => {
392 tracing::error!("Failed to get blob metadata {}: {:?}", blob_id, err);
393 bail!("Failed to access blob metadata: {}", err);
394 }
395 };
396
397 tracing::info!(
398 "Starting deletion for blob {} with {} linked chunks",
399 blob_id,
400 blob_meta.links.len()
401 );
402
403 let mut blobs_to_delete = vec![blob_id];
404 let mut deleted_metadata_count = 0;
405 let mut deleted_files_count = 0;
406
407 blobs_to_delete.extend(blob_meta.links.iter().map(|link| link.blob_id()));
408
409 for current_blob_id in &blobs_to_delete {
411 match self.blobstore.delete(*current_blob_id).await {
412 Ok(true) => {
413 deleted_files_count += 1;
414 tracing::debug!("Successfully deleted blob file {}", current_blob_id);
415 }
416 Ok(false) => {
417 tracing::debug!("Blob file {} was already missing", current_blob_id);
418 }
419 Err(err) => {
420 tracing::warn!("Failed to delete blob file {}: {}", current_blob_id, err);
421 }
423 }
424 }
425
426 for current_blob_id in blobs_to_delete {
428 let current_key = key::BlobMeta::new(current_blob_id);
429
430 match handle.delete(¤t_key) {
431 Ok(()) => {
432 deleted_metadata_count += 1;
433 tracing::debug!("Successfully deleted metadata for blob {}", current_blob_id);
434 }
435 Err(err) => {
436 tracing::warn!(
437 "Failed to delete metadata for blob {}: {}",
438 current_blob_id,
439 err
440 );
441 }
442 }
443 }
444
445 if deleted_metadata_count > 0 {
446 tracing::info!(
447 "Successfully deleted {} blob metadata entries and {} blob files",
448 deleted_metadata_count,
449 deleted_files_count
450 );
451 Ok(true)
452 } else {
453 bail!("Failed to delete any blob metadata");
454 }
455 }
456
457 pub async fn get_blob_info(&self, blob_id: BlobId) -> eyre::Result<Option<BlobMetadata>> {
462 let handle = self.datastore.clone().handle();
463 let blob_key = key::BlobMeta::new(blob_id);
464
465 match handle.get(&blob_key) {
466 Ok(Some(blob_meta)) => {
467 let mime_type = self
468 .detect_blob_mime_type(blob_id)
469 .await
470 .unwrap_or_else(|| "application/octet-stream".to_owned());
471
472 Ok(Some(BlobMetadata {
473 blob_id,
474 size: blob_meta.size,
475 hash: blob_meta.hash,
476 mime_type,
477 }))
478 }
479 Ok(None) => Ok(None),
480 Err(err) => {
481 tracing::error!("Failed to get blob metadata: {:?}", err);
482 bail!("Failed to retrieve blob metadata: {}", err);
483 }
484 }
485 }
486
487 pub async fn detect_blob_mime_type(&self, blob_id: BlobId) -> Option<String> {
489 match self.get_blob(&blob_id, None).await {
490 Ok(Some(mut blob_stream)) => {
491 if let Some(Ok(first_chunk)) = blob_stream.next().await {
492 let bytes = first_chunk.as_ref();
493 let sample_size = std::cmp::min(bytes.len(), 512);
494 return Some(detect_mime_from_bytes(&bytes[..sample_size]).to_owned());
495 }
496 }
497 Ok(None) => {
498 tracing::warn!("Blob {} not found for MIME detection", blob_id);
499 }
500 Err(err) => {
501 tracing::warn!(
502 "Failed to read blob {} for MIME detection: {:?}",
503 blob_id,
504 err
505 );
506 }
507 }
508
509 None
510 }
511}
512
513fn detect_mime_from_bytes(bytes: &[u8]) -> &'static str {
515 if let Some(kind) = infer::get(bytes) {
516 return kind.mime_type();
517 }
518
519 "application/octet-stream"
520}