calimero_node_primitives/client/
blob.rs

1use std::sync::Arc;
2
3use calimero_blobstore::{Blob, Size};
4use calimero_primitives::blobs::{BlobId, BlobInfo, BlobMetadata};
5use calimero_primitives::context::ContextId;
6use calimero_primitives::hash::Hash;
7use calimero_store::key;
8use calimero_store::layer::LayerExt;
9use eyre::bail;
10use futures_util::{AsyncRead, StreamExt};
11use libp2p::PeerId;
12
13use super::NodeClient;
14use crate::messages::get_blob_bytes::GetBlobBytesRequest;
15use crate::messages::NodeMessage::GetBlobBytes;
16
17impl NodeClient {
18    // todo! maybe this should be an actor method?
19    // todo! so we can cache the blob in case it's
20    // todo! to be immediately used? might require
21    // todo! refactoring the blobstore API
22    pub async fn add_blob<S: AsyncRead>(
23        &self,
24        stream: S,
25        expected_size: Option<u64>,
26        expected_hash: Option<&Hash>,
27    ) -> eyre::Result<(BlobId, u64)> {
28        let (blob_id, hash, size) = self
29            .blobstore
30            .put_sized(expected_size.map(Size::Exact), stream)
31            .await?;
32
33        if matches!(expected_hash, Some(expected_hash) if hash != *expected_hash) {
34            bail!("fatal: blob hash mismatch");
35        }
36
37        if matches!(expected_size, Some(expected_size) if size != expected_size) {
38            bail!("fatal: blob size mismatch");
39        }
40
41        Ok((blob_id, size))
42    }
43
44    /// Get blob from local storage or network if context_id is provided
45    /// Returns a streaming Blob that can be used to read the data
46    pub fn get_blob<'a>(
47        &'a self,
48        blob_id: &'a BlobId,
49        context_id: Option<&'a ContextId>,
50    ) -> impl std::future::Future<Output = eyre::Result<Option<Blob>>> + 'a {
51        async move {
52            // First try to get locally
53            let Some(stream) = self.blobstore.get(*blob_id)? else {
54                // If no context provided or blob not found locally, return None
55                if context_id.is_none() {
56                    return Ok(None);
57                }
58
59                // Try network discovery
60                let context_id = context_id.unwrap();
61                tracing::info!(
62                    blob_id = %blob_id,
63                    context_id = %context_id,
64                    "Blob not found locally, attempting network discovery"
65                );
66
67                const MAX_RETRIES: usize = 3;
68                const RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(2);
69
70                for attempt in 1..=MAX_RETRIES {
71                    tracing::debug!(
72                        blob_id = %blob_id,
73                        context_id = %context_id,
74                        attempt,
75                        max_attempts = MAX_RETRIES,
76                        "Attempting network discovery"
77                    );
78
79                    let peers = match self
80                        .network_client
81                        .query_blob(*blob_id, Some(*context_id))
82                        .await
83                    {
84                        Ok(peers) => peers,
85                        Err(e) => {
86                            tracing::warn!(
87                                blob_id = %blob_id,
88                                context_id = %context_id,
89                                attempt,
90                                error = %e,
91                                "Failed to query DHT for blob"
92                            );
93                            if attempt < MAX_RETRIES {
94                                tokio::time::sleep(RETRY_DELAY).await;
95                                continue;
96                            }
97                            return Err(e);
98                        }
99                    };
100
101                    if peers.is_empty() {
102                        tracing::info!(
103                            blob_id = %blob_id,
104                            context_id = %context_id,
105                            attempt,
106                            "No peers found with blob"
107                        );
108                        if attempt < MAX_RETRIES {
109                            tokio::time::sleep(RETRY_DELAY).await;
110                            continue;
111                        }
112                        return Ok(None);
113                    }
114
115                    tracing::info!(
116                        blob_id = %blob_id,
117                        context_id = %context_id,
118                        peer_count = peers.len(),
119                        attempt,
120                        "Found {} peers with blob, attempting download", peers.len()
121                    );
122
123                    // Try to get the blob from each available peer
124                    for (peer_index, peer_id) in peers.iter().enumerate() {
125                        tracing::debug!(
126                            peer_id = %peer_id,
127                            peer_index = peer_index + 1,
128                            total_peers = peers.len(),
129                            attempt,
130                            "Attempting to download blob from peer"
131                        );
132
133                        match self
134                            .network_client
135                            .request_blob(*blob_id, *context_id, *peer_id)
136                            .await
137                        {
138                            Ok(Some(data)) => {
139                                tracing::info!(
140                                    blob_id = %blob_id,
141                                    peer_id = %peer_id,
142                                    size = data.len(),
143                                    attempt,
144                                    "Successfully downloaded blob from network"
145                                );
146
147                                // Store the blob locally for future use
148                                let (blob_id_stored, _size) = self
149                                    .add_blob(data.as_slice(), Some(data.len() as u64), None)
150                                    .await?;
151
152                                // Verify we stored the correct blob
153                                if blob_id_stored != *blob_id {
154                                    tracing::warn!(
155                                        expected = %blob_id,
156                                        actual = %blob_id_stored,
157                                        "Downloaded blob ID mismatch"
158                                    );
159                                    continue;
160                                }
161
162                                // Return the newly stored blob as a stream
163                                return Ok(self.blobstore.get(*blob_id)?);
164                            }
165                            Ok(None) => {
166                                tracing::debug!(
167                                    peer_id = %peer_id,
168                                    attempt,
169                                    "Peer doesn't have the blob"
170                                );
171                            }
172                            Err(e) => {
173                                tracing::warn!(
174                                    peer_id = %peer_id,
175                                    error = %e,
176                                    attempt,
177                                    "Failed to download blob from peer"
178                                );
179                            }
180                        }
181                    }
182
183                    // If we reach here, all peers failed for this attempt
184                    if attempt < MAX_RETRIES {
185                        tracing::info!(
186                            blob_id = %blob_id,
187                            context_id = %context_id,
188                            attempt,
189                            "All peers failed, retrying in {} seconds",
190                            RETRY_DELAY.as_secs()
191                        );
192                        tokio::time::sleep(RETRY_DELAY).await;
193                    }
194                }
195
196                tracing::debug!(
197                    blob_id = %blob_id,
198                    context_id = %context_id,
199                    max_attempts = MAX_RETRIES,
200                    "Failed to download blob from any peer after all retry attempts"
201                );
202                return Ok(None);
203            };
204
205            Ok(Some(stream))
206        }
207    }
208
209    /// Get blob bytes from local storage with actor-based caching
210    /// Falls back to network download if context_id is provided and blob not found locally
211    pub async fn get_blob_bytes(
212        &self,
213        blob_id: &BlobId,
214        context_id: Option<&ContextId>,
215    ) -> eyre::Result<Option<Arc<[u8]>>> {
216        if **blob_id == [0; 32] {
217            return Ok(None);
218        }
219
220        // First try to get from NodeManager's cache (for locally stored blobs)
221        let request = GetBlobBytesRequest { blob_id: *blob_id };
222
223        let (tx, rx) = tokio::sync::oneshot::channel();
224
225        match self
226            .node_manager
227            .send(GetBlobBytes {
228                request,
229                outcome: tx,
230            })
231            .await
232        {
233            Ok(_) => {
234                if let Ok(response) = rx.await {
235                    if let Ok(response) = response {
236                        if response.bytes.is_some() {
237                            return Ok(response.bytes);
238                        }
239                    }
240                }
241            }
242            Err(_) => {
243                // NodeManager not available, fallback to direct access
244            }
245        }
246
247        if let Some(context_id) = context_id {
248            let Some(mut blob) = self.get_blob(blob_id, Some(context_id)).await? else {
249                return Ok(None);
250            };
251
252            let mut data = Vec::new();
253            while let Some(chunk) = blob.next().await {
254                data.extend_from_slice(&chunk?);
255            }
256
257            Ok(Some(data.into()))
258        } else {
259            // No context_id provided and not in local cache
260            Ok(None)
261        }
262    }
263
264    /// Query the network for peers that have a specific blob
265    pub async fn find_blob_providers(
266        &self,
267        blob_id: &BlobId,
268        context_id: &ContextId,
269    ) -> eyre::Result<Vec<PeerId>> {
270        self.network_client
271            .query_blob(*blob_id, Some(*context_id))
272            .await
273    }
274
275    /// Announce a blob to the network for discovery
276    pub async fn announce_blob_to_network(
277        &self,
278        blob_id: &BlobId,
279        context_id: &ContextId,
280        size: u64,
281    ) -> eyre::Result<()> {
282        self.network_client
283            .announce_blob(*blob_id, *context_id, size)
284            .await
285    }
286
287    pub fn has_blob(&self, blob_id: &BlobId) -> eyre::Result<bool> {
288        self.blobstore.has(*blob_id)
289    }
290
291    /// List all root blobs
292    ///
293    /// Returns a list of all root blob IDs and their metadata. Root blobs are either:
294    /// - Blobs that contain links to chunks (segmented large files)
295    /// - Standalone blobs that aren't referenced as chunks by other blobs
296    /// This excludes individual chunk blobs to provide a cleaner user experience.
297    pub fn list_blobs(&self) -> eyre::Result<Vec<BlobInfo>> {
298        let handle = self.datastore.clone().handle();
299
300        let iter_result = handle.iter::<key::BlobMeta>();
301        let mut iter = match iter_result {
302            Ok(iter) => iter,
303            Err(err) => {
304                tracing::error!("Failed to create blob iterator: {:?}", err);
305                bail!("Failed to iterate blob entries");
306            }
307        };
308
309        let mut chunk_blob_ids = std::collections::HashSet::new();
310
311        tracing::debug!("Starting first pass: collecting chunk blob IDs");
312        for result in iter.entries() {
313            match result {
314                (Ok(_blob_key), Ok(blob_meta)) => {
315                    // Only collect chunk IDs, not full blob info
316                    for link in blob_meta.links.iter() {
317                        let _ = chunk_blob_ids.insert(link.blob_id());
318                    }
319                }
320                (Err(err), _) | (_, Err(err)) => {
321                    tracing::error!(
322                        "Failed to read blob entry during chunk collection: {:?}",
323                        err
324                    );
325                    bail!("Failed to read blob entries");
326                }
327            }
328        }
329
330        let handle2 = self.datastore.clone().handle();
331        let iter_result2 = handle2.iter::<key::BlobMeta>();
332        let mut iter2 = match iter_result2 {
333            Ok(iter) => iter,
334            Err(err) => {
335                tracing::error!("Failed to create second blob iterator: {:?}", err);
336                bail!("Failed to iterate blob entries");
337            }
338        };
339
340        let mut root_blobs = Vec::new();
341
342        tracing::debug!(
343            "Starting second pass: collecting root blobs (filtering {} chunks)",
344            chunk_blob_ids.len()
345        );
346        for result in iter2.entries() {
347            match result {
348                (Ok(blob_key), Ok(blob_meta)) => {
349                    let blob_id = blob_key.blob_id();
350
351                    // Only include if it's not a chunk blob
352                    if !chunk_blob_ids.contains(&blob_id) {
353                        root_blobs.push(BlobInfo {
354                            blob_id,
355                            size: blob_meta.size,
356                        });
357                    }
358                }
359                (Err(err), _) | (_, Err(err)) => {
360                    tracing::error!(
361                        "Failed to read blob entry during root collection: {:?}",
362                        err
363                    );
364                    bail!("Failed to read blob entries");
365                }
366            }
367        }
368
369        tracing::debug!(
370            "Listing complete: found {} chunks, returning {} root/standalone blobs",
371            chunk_blob_ids.len(),
372            root_blobs.len()
373        );
374
375        Ok(root_blobs)
376    }
377
378    /// Delete a blob by its ID
379    ///
380    /// Removes blob metadata from database and deletes the actual blob files.
381    /// This includes all associated chunk files for large blobs.
382    pub async fn delete_blob(&self, blob_id: BlobId) -> eyre::Result<bool> {
383        let mut handle = self.datastore.clone().handle();
384        let blob_key = key::BlobMeta::new(blob_id);
385
386        let blob_meta = match handle.get(&blob_key) {
387            Ok(Some(meta)) => meta,
388            Ok(None) => {
389                bail!("Blob not found");
390            }
391            Err(err) => {
392                tracing::error!("Failed to get blob metadata {}: {:?}", blob_id, err);
393                bail!("Failed to access blob metadata: {}", err);
394            }
395        };
396
397        tracing::info!(
398            "Starting deletion for blob {} with {} linked chunks",
399            blob_id,
400            blob_meta.links.len()
401        );
402
403        let mut blobs_to_delete = vec![blob_id];
404        let mut deleted_metadata_count = 0;
405        let mut deleted_files_count = 0;
406
407        blobs_to_delete.extend(blob_meta.links.iter().map(|link| link.blob_id()));
408
409        // Delete blob files first
410        for current_blob_id in &blobs_to_delete {
411            match self.blobstore.delete(*current_blob_id).await {
412                Ok(true) => {
413                    deleted_files_count += 1;
414                    tracing::debug!("Successfully deleted blob file {}", current_blob_id);
415                }
416                Ok(false) => {
417                    tracing::debug!("Blob file {} was already missing", current_blob_id);
418                }
419                Err(err) => {
420                    tracing::warn!("Failed to delete blob file {}: {}", current_blob_id, err);
421                    // Continue with metadata deletion even if file deletion fails
422                }
423            }
424        }
425
426        // Delete metadata
427        for current_blob_id in blobs_to_delete {
428            let current_key = key::BlobMeta::new(current_blob_id);
429
430            match handle.delete(&current_key) {
431                Ok(()) => {
432                    deleted_metadata_count += 1;
433                    tracing::debug!("Successfully deleted metadata for blob {}", current_blob_id);
434                }
435                Err(err) => {
436                    tracing::warn!(
437                        "Failed to delete metadata for blob {}: {}",
438                        current_blob_id,
439                        err
440                    );
441                }
442            }
443        }
444
445        if deleted_metadata_count > 0 {
446            tracing::info!(
447                "Successfully deleted {} blob metadata entries and {} blob files",
448                deleted_metadata_count,
449                deleted_files_count
450            );
451            Ok(true)
452        } else {
453            bail!("Failed to delete any blob metadata");
454        }
455    }
456
457    /// Get blob metadata
458    ///
459    /// Returns blob metadata including size, hash, and detected MIME type.
460    /// This is efficient for checking blob existence and getting metadata info.
461    pub async fn get_blob_info(&self, blob_id: BlobId) -> eyre::Result<Option<BlobMetadata>> {
462        let handle = self.datastore.clone().handle();
463        let blob_key = key::BlobMeta::new(blob_id);
464
465        match handle.get(&blob_key) {
466            Ok(Some(blob_meta)) => {
467                let mime_type = self
468                    .detect_blob_mime_type(blob_id)
469                    .await
470                    .unwrap_or_else(|| "application/octet-stream".to_owned());
471
472                Ok(Some(BlobMetadata {
473                    blob_id,
474                    size: blob_meta.size,
475                    hash: blob_meta.hash,
476                    mime_type,
477                }))
478            }
479            Ok(None) => Ok(None),
480            Err(err) => {
481                tracing::error!("Failed to get blob metadata: {:?}", err);
482                bail!("Failed to retrieve blob metadata: {}", err);
483            }
484        }
485    }
486
487    /// Detect MIME type by reading the first few bytes of a blob
488    pub async fn detect_blob_mime_type(&self, blob_id: BlobId) -> Option<String> {
489        match self.get_blob(&blob_id, None).await {
490            Ok(Some(mut blob_stream)) => {
491                if let Some(Ok(first_chunk)) = blob_stream.next().await {
492                    let bytes = first_chunk.as_ref();
493                    let sample_size = std::cmp::min(bytes.len(), 512);
494                    return Some(detect_mime_from_bytes(&bytes[..sample_size]).to_owned());
495                }
496            }
497            Ok(None) => {
498                tracing::warn!("Blob {} not found for MIME detection", blob_id);
499            }
500            Err(err) => {
501                tracing::warn!(
502                    "Failed to read blob {} for MIME detection: {:?}",
503                    blob_id,
504                    err
505                );
506            }
507        }
508
509        None
510    }
511}
512
513/// Detect MIME type from file bytes using the infer crate
514fn detect_mime_from_bytes(bytes: &[u8]) -> &'static str {
515    if let Some(kind) = infer::get(bytes) {
516        return kind.mime_type();
517    }
518
519    "application/octet-stream"
520}