// calimero_node_primitives/client/blob.rs

use std::collections::HashSet;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use calimero_blobstore::{Blob, Size};
use calimero_network_primitives::blob_types::{BlobAuth, BlobAuthPayload};
use calimero_primitives::{
    blobs::{BlobId, BlobInfo, BlobMetadata},
    common::DIGEST_SIZE,
    context::ContextId,
    hash::Hash,
    identity::{PrivateKey, PublicKey},
};
use calimero_store::key;
use calimero_store::layer::LayerExt;
use eyre::bail;
use futures_util::{AsyncRead, StreamExt};
use libp2p::PeerId;
use tracing::{debug, error, trace};

use super::NodeClient;
use crate::messages::get_blob_bytes::GetBlobBytesRequest;
use crate::messages::NodeMessage::GetBlobBytes;
23
24impl NodeClient {
25    // todo! maybe this should be an actor method?
26    // todo! so we can cache the blob in case it's
27    // todo! to be immediately used? might require
28    // todo! refactoring the blobstore API
29    pub async fn add_blob<S: AsyncRead>(
30        &self,
31        stream: S,
32        expected_size: Option<u64>,
33        expected_hash: Option<&Hash>,
34    ) -> eyre::Result<(BlobId, u64)> {
35        debug!(
36            expected_size,
37            has_expected_hash = expected_hash.is_some(),
38            "add_blob invoked"
39        );
40
41        let (blob_id, hash, size) = match self
42            .blobstore
43            .put_sized(expected_size.map(Size::Exact), stream)
44            .await
45        {
46            Ok(result) => {
47                trace!(
48                    blob_id = %result.0,
49                    stored_size = result.2,
50                    hash = ?result.1,
51                    "blobstore.put_sized completed"
52                );
53                result
54            }
55            Err(err) => {
56                error!(error = ?err, "blobstore.put_sized failed");
57                return Err(err);
58            }
59        };
60
61        if matches!(expected_hash, Some(expected_hash) if hash != *expected_hash) {
62            bail!("fatal: blob hash mismatch");
63        }
64
65        if matches!(expected_size, Some(expected_size) if size != expected_size) {
66            bail!("fatal: blob size mismatch");
67        }
68
69        debug!(
70            %blob_id,
71            stored_size = size,
72            "add_blob completed successfully"
73        );
74
75        Ok((blob_id, size))
76    }
77
78    /// Get blob from local storage or network if context_id is provided
79    /// Returns a streaming Blob that can be used to read the data
80    pub async fn get_blob<'a>(
81        &'a self,
82        blob_id: &'a BlobId,
83        context_id: Option<&'a ContextId>,
84    ) -> eyre::Result<Option<Blob>> {
85        // First try to get locally
86        let Some(stream) = self.blobstore.get(*blob_id)? else {
87            // If no context provided or blob not found locally, return None
88            if context_id.is_none() {
89                return Ok(None);
90            }
91
92            // Try network discovery
93            let context_id = context_id.unwrap();
94            tracing::info!(
95                blob_id = %blob_id,
96                context_id = %context_id,
97                "Blob not found locally, attempting network discovery"
98            );
99
100            const MAX_RETRIES: usize = 3;
101            const RETRY_DELAY: core::time::Duration = core::time::Duration::from_secs(2);
102
103            for attempt in 1..=MAX_RETRIES {
104                tracing::debug!(
105                    blob_id = %blob_id,
106                    context_id = %context_id,
107                    attempt,
108                    max_attempts = MAX_RETRIES,
109                    "Attempting network discovery"
110                );
111
112                let peers = match self
113                    .network_client
114                    .query_blob(*blob_id, Some(*context_id))
115                    .await
116                {
117                    Ok(peers) => peers,
118                    Err(e) => {
119                        tracing::warn!(
120                            blob_id = %blob_id,
121                            context_id = %context_id,
122                            attempt,
123                            error = %e,
124                            "Failed to query DHT for blob"
125                        );
126                        if attempt < MAX_RETRIES {
127                            tokio::time::sleep(RETRY_DELAY).await;
128                            continue;
129                        }
130                        return Err(e);
131                    }
132                };
133
134                if peers.is_empty() {
135                    tracing::info!(
136                        blob_id = %blob_id,
137                        context_id = %context_id,
138                        attempt,
139                        "No peers found with blob"
140                    );
141                    if attempt < MAX_RETRIES {
142                        tokio::time::sleep(RETRY_DELAY).await;
143                        continue;
144                    }
145                    return Ok(None);
146                }
147
148                tracing::info!(
149                    blob_id = %blob_id,
150                    context_id = %context_id,
151                    peer_count = peers.len(),
152                    attempt,
153                    "Found {} peers with blob, attempting download", peers.len()
154                );
155
156                // Try to get the blob from each available peer
157                for (peer_index, peer_id) in peers.iter().enumerate() {
158                    tracing::debug!(
159                        peer_id = %peer_id,
160                        peer_index = peer_index + 1,
161                        total_peers = peers.len(),
162                        attempt,
163                        "Attempting to download blob from peer"
164                    );
165
166                    // Generate Authorization for the blob.
167                    let auth = self.create_blob_auth_for_context(context_id, blob_id)?;
168
169                    match self
170                        .network_client
171                        .request_blob(*blob_id, *context_id, *peer_id, auth)
172                        .await
173                    {
174                        Ok(Some(data)) => {
175                            tracing::info!(
176                                blob_id = %blob_id,
177                                peer_id = %peer_id,
178                                size = data.len(),
179                                attempt,
180                                "Successfully downloaded blob from network"
181                            );
182
183                            // Store the blob locally for future use
184                            let (blob_id_stored, _size) = self
185                                .add_blob(data.as_slice(), Some(data.len() as u64), None)
186                                .await?;
187
188                            // Verify we stored the correct blob
189                            if blob_id_stored != *blob_id {
190                                tracing::warn!(
191                                    expected = %blob_id,
192                                    actual = %blob_id_stored,
193                                    "Downloaded blob ID mismatch"
194                                );
195                                continue;
196                            }
197
198                            // Return the newly stored blob as a stream
199                            return self.blobstore.get(*blob_id);
200                        }
201                        Ok(None) => {
202                            tracing::debug!(
203                                peer_id = %peer_id,
204                                attempt,
205                                "Peer doesn't have the blob"
206                            );
207                        }
208                        Err(e) => {
209                            tracing::warn!(
210                                peer_id = %peer_id,
211                                error = %e,
212                                attempt,
213                                "Failed to download blob from peer"
214                            );
215                        }
216                    }
217                }
218
219                // If we reach here, all peers failed for this attempt
220                if attempt < MAX_RETRIES {
221                    tracing::info!(
222                        blob_id = %blob_id,
223                        context_id = %context_id,
224                        attempt,
225                        "All peers failed, retrying in {} seconds",
226                        RETRY_DELAY.as_secs()
227                    );
228                    tokio::time::sleep(RETRY_DELAY).await;
229                }
230            }
231
232            tracing::debug!(
233                blob_id = %blob_id,
234                context_id = %context_id,
235                max_attempts = MAX_RETRIES,
236                "Failed to download blob from any peer after all retry attempts"
237            );
238            return Ok(None);
239        };
240
241        Ok(Some(stream))
242    }
243
244    /// Get blob bytes from local storage with actor-based caching
245    /// Falls back to network download if context_id is provided and blob not found locally
246    pub async fn get_blob_bytes(
247        &self,
248        blob_id: &BlobId,
249        context_id: Option<&ContextId>,
250    ) -> eyre::Result<Option<Arc<[u8]>>> {
251        if **blob_id == [0; 32] {
252            return Ok(None);
253        }
254
255        let blob_id = *blob_id;
256
257        // Try NodeManager's cache first (checks cache, then blobstore if not cached, and updates cache)
258        // This ensures proper caching behavior and access tracking
259        let request = GetBlobBytesRequest { blob_id };
260        let (tx, rx) = tokio::sync::oneshot::channel();
261
262        // Use a short timeout to avoid hanging if NodeManager is unavailable
263        let send_result = tokio::time::timeout(
264            tokio::time::Duration::from_millis(10),
265            self.node_manager.send(GetBlobBytes {
266                request,
267                outcome: tx,
268            }),
269        )
270        .await;
271
272        if let Ok(Ok(())) = send_result {
273            // Node manager accepted the request, wait for response with timeout
274            match tokio::time::timeout(tokio::time::Duration::from_millis(100), rx).await {
275                Ok(Ok(Ok(response))) if response.bytes.is_some() => {
276                    return Ok(response.bytes);
277                }
278                Ok(Ok(Ok(_))) => {
279                    // NodeManager returned None (blob not found), fall through to direct blobstore
280                }
281                _ => {
282                    // Node manager didn't respond in time, fall through to direct blobstore
283                }
284            }
285        }
286
287        // Fallback to direct blobstore access if NodeManager is unavailable or blob not in cache
288        // This ensures we can still retrieve blobs even if NodeManager is down (e.g., in tests)
289        if let Some(mut stream) = self.blobstore.get(blob_id)? {
290            let mut data = Vec::new();
291            while let Some(chunk) = stream.next().await {
292                data.extend_from_slice(&chunk?);
293            }
294            return Ok(Some(data.into()));
295        }
296
297        // If not found locally and context_id provided, try network discovery
298        if let Some(context_id) = context_id {
299            let Some(mut blob) = self.get_blob(&blob_id, Some(context_id)).await? else {
300                return Ok(None);
301            };
302
303            let mut data = Vec::new();
304            while let Some(chunk) = blob.next().await {
305                data.extend_from_slice(&chunk?);
306            }
307
308            Ok(Some(data.into()))
309        } else {
310            // No context_id provided and blob not found locally
311            Ok(None)
312        }
313    }
314
315    /// Query the network for peers that have a specific blob
316    pub async fn find_blob_providers(
317        &self,
318        blob_id: &BlobId,
319        context_id: &ContextId,
320    ) -> eyre::Result<Vec<PeerId>> {
321        self.network_client
322            .query_blob(*blob_id, Some(*context_id))
323            .await
324    }
325
326    /// Announce a blob to the network for discovery
327    pub async fn announce_blob_to_network(
328        &self,
329        blob_id: &BlobId,
330        context_id: &ContextId,
331        size: u64,
332    ) -> eyre::Result<()> {
333        self.network_client
334            .announce_blob(*blob_id, *context_id, size)
335            .await
336    }
337
338    pub fn has_blob(&self, blob_id: &BlobId) -> eyre::Result<bool> {
339        self.blobstore.has(*blob_id)
340    }
341
342    /// List all root blobs
343    ///
344    /// Returns a list of all root blob IDs and their metadata. Root blobs are either:
345    /// - Blobs that contain links to chunks (segmented large files)
346    /// - Standalone blobs that aren't referenced as chunks by other blobs
347    /// This excludes individual chunk blobs to provide a cleaner user experience.
348    pub fn list_blobs(&self) -> eyre::Result<Vec<BlobInfo>> {
349        let handle = self.datastore.clone().handle();
350
351        let iter_result = handle.iter::<key::BlobMeta>();
352        let mut iter = match iter_result {
353            Ok(iter) => iter,
354            Err(err) => {
355                tracing::error!("Failed to create blob iterator: {:?}", err);
356                bail!("Failed to iterate blob entries");
357            }
358        };
359
360        let mut chunk_blob_ids = std::collections::HashSet::new();
361
362        tracing::debug!("Starting first pass: collecting chunk blob IDs");
363        for result in iter.entries() {
364            match result {
365                (Ok(_blob_key), Ok(blob_meta)) => {
366                    // Only collect chunk IDs, not full blob info
367                    for link in &blob_meta.links {
368                        let _ = chunk_blob_ids.insert(link.blob_id());
369                    }
370                }
371                (Err(err), _) | (_, Err(err)) => {
372                    tracing::error!(
373                        "Failed to read blob entry during chunk collection: {:?}",
374                        err
375                    );
376                    bail!("Failed to read blob entries");
377                }
378            }
379        }
380
381        let handle2 = self.datastore.clone().handle();
382        let iter_result2 = handle2.iter::<key::BlobMeta>();
383        let mut iter2 = match iter_result2 {
384            Ok(iter) => iter,
385            Err(err) => {
386                tracing::error!("Failed to create second blob iterator: {:?}", err);
387                bail!("Failed to iterate blob entries");
388            }
389        };
390
391        let mut root_blobs = Vec::new();
392
393        tracing::debug!(
394            "Starting second pass: collecting root blobs (filtering {} chunks)",
395            chunk_blob_ids.len()
396        );
397        for result in iter2.entries() {
398            match result {
399                (Ok(blob_key), Ok(blob_meta)) => {
400                    let blob_id = blob_key.blob_id();
401
402                    // Only include if it's not a chunk blob
403                    if !chunk_blob_ids.contains(&blob_id) {
404                        root_blobs.push(BlobInfo {
405                            blob_id,
406                            size: blob_meta.size,
407                        });
408                    }
409                }
410                (Err(err), _) | (_, Err(err)) => {
411                    tracing::error!(
412                        "Failed to read blob entry during root collection: {:?}",
413                        err
414                    );
415                    bail!("Failed to read blob entries");
416                }
417            }
418        }
419
420        tracing::debug!(
421            "Listing complete: found {} chunks, returning {} root/standalone blobs",
422            chunk_blob_ids.len(),
423            root_blobs.len()
424        );
425
426        Ok(root_blobs)
427    }
428
429    /// Delete a blob by its ID
430    ///
431    /// Removes blob metadata from database and deletes the actual blob files.
432    /// This includes all associated chunk files for large blobs.
433    pub async fn delete_blob(&self, blob_id: BlobId) -> eyre::Result<bool> {
434        let mut handle = self.datastore.clone().handle();
435        let blob_key = key::BlobMeta::new(blob_id);
436
437        let blob_meta = match handle.get(&blob_key) {
438            Ok(Some(meta)) => meta,
439            Ok(None) => {
440                bail!("Blob not found");
441            }
442            Err(err) => {
443                tracing::error!("Failed to get blob metadata {}: {:?}", blob_id, err);
444                bail!("Failed to access blob metadata: {}", err);
445            }
446        };
447
448        tracing::info!(
449            "Starting deletion for blob {} with {} linked chunks",
450            blob_id,
451            blob_meta.links.len()
452        );
453
454        let mut blobs_to_delete = vec![blob_id];
455        let mut deleted_metadata_count = 0;
456        let mut deleted_files_count = 0;
457
458        blobs_to_delete.extend(blob_meta.links.iter().map(key::BlobMeta::blob_id));
459
460        // Delete blob files first
461        for current_blob_id in &blobs_to_delete {
462            match self.blobstore.delete(*current_blob_id).await {
463                Ok(true) => {
464                    deleted_files_count += 1;
465                    tracing::debug!("Successfully deleted blob file {}", current_blob_id);
466                }
467                Ok(false) => {
468                    tracing::debug!("Blob file {} was already missing", current_blob_id);
469                }
470                Err(err) => {
471                    tracing::warn!("Failed to delete blob file {}: {}", current_blob_id, err);
472                    // Continue with metadata deletion even if file deletion fails
473                }
474            }
475        }
476
477        // Delete metadata
478        for current_blob_id in blobs_to_delete {
479            let current_key = key::BlobMeta::new(current_blob_id);
480
481            match handle.delete(&current_key) {
482                Ok(()) => {
483                    deleted_metadata_count += 1;
484                    tracing::debug!("Successfully deleted metadata for blob {}", current_blob_id);
485                }
486                Err(err) => {
487                    tracing::warn!(
488                        "Failed to delete metadata for blob {}: {}",
489                        current_blob_id,
490                        err
491                    );
492                }
493            }
494        }
495
496        if deleted_metadata_count > 0 {
497            tracing::info!(
498                "Successfully deleted {} blob metadata entries and {} blob files",
499                deleted_metadata_count,
500                deleted_files_count
501            );
502            Ok(true)
503        } else {
504            bail!("Failed to delete any blob metadata");
505        }
506    }
507
508    /// Get blob metadata
509    ///
510    /// Returns blob metadata including size, hash, and detected MIME type.
511    /// This is efficient for checking blob existence and getting metadata info.
512    pub async fn get_blob_info(&self, blob_id: BlobId) -> eyre::Result<Option<BlobMetadata>> {
513        let handle = self.datastore.clone().handle();
514        let blob_key = key::BlobMeta::new(blob_id);
515
516        match handle.get(&blob_key) {
517            Ok(Some(blob_meta)) => {
518                let mime_type = self
519                    .detect_blob_mime_type(blob_id)
520                    .await
521                    .unwrap_or_else(|| "application/octet-stream".to_owned());
522
523                Ok(Some(BlobMetadata {
524                    blob_id,
525                    size: blob_meta.size,
526                    hash: blob_meta.hash,
527                    mime_type,
528                }))
529            }
530            Ok(None) => Ok(None),
531            Err(err) => {
532                tracing::error!("Failed to get blob metadata: {:?}", err);
533                bail!("Failed to retrieve blob metadata: {}", err);
534            }
535        }
536    }
537
538    /// Detect MIME type by reading the first few bytes of a blob
539    pub async fn detect_blob_mime_type(&self, blob_id: BlobId) -> Option<String> {
540        match self.get_blob(&blob_id, None).await {
541            Ok(Some(mut blob_stream)) => {
542                if let Some(Ok(first_chunk)) = blob_stream.next().await {
543                    let bytes = first_chunk.as_ref();
544                    let sample_size = core::cmp::min(bytes.len(), 512);
545                    return Some(detect_mime_from_bytes(&bytes[..sample_size]).to_owned());
546                }
547            }
548            Ok(None) => {
549                tracing::warn!("Blob {} not found for MIME detection", blob_id);
550            }
551            Err(err) => {
552                tracing::warn!(
553                    "Failed to read blob {} for MIME detection: {:?}",
554                    blob_id,
555                    err
556                );
557            }
558        }
559
560        None
561    }
562
563    /// Helper to find an identity in the datastore for which the node possesses the private key.
564    pub fn find_owned_identity(
565        &self,
566        context_id: &ContextId,
567    ) -> eyre::Result<Option<(PublicKey, PrivateKey)>> {
568        let handle = self.datastore.clone().handle();
569        let start_key = key::ContextIdentity::new(*context_id, [0u8; DIGEST_SIZE].into());
570        let mut iter = handle.iter::<key::ContextIdentity>()?;
571        let first = iter.seek(start_key).transpose();
572
573        for key in first.into_iter().chain(iter.keys()) {
574            let key = key?;
575            if key.context_id() != *context_id {
576                break;
577            }
578
579            if let Some(val) = handle.get(&key)? {
580                if let Some(pk_bytes) = val.private_key {
581                    return Ok(Some((key.public_key(), PrivateKey::from(pk_bytes))));
582                }
583            }
584        }
585        Ok(None)
586    }
587
588    /// Generates the `BlobAuth` authentication structure by creating a payload envelope and signing it.
589    ///
590    /// # Arguments
591    /// * `blob_id` - The ID of the blob being requested.
592    /// * `context_id` - The context context the blob belongs to.
593    /// * `public_key` - The public key of the requester that is a member of the context.
594    /// * `private_key` - The private key used to sign the request.
595    pub fn create_blob_auth(
596        &self,
597        blob_id: &BlobId,
598        context_id: &ContextId,
599        public_key: PublicKey,
600        private_key: &PrivateKey,
601    ) -> eyre::Result<BlobAuth> {
602        let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
603
604        // Construct the Envelope Payload
605        let payload = BlobAuthPayload {
606            blob_id: *blob_id.digest(),
607            context_id: *context_id.digest(),
608            timestamp,
609        };
610
611        // Serialize the envelope using Borsh
612        let message = borsh::to_vec(&payload)?;
613
614        // Sign the serialized envelope
615        let signature = private_key
616            .sign(&message)
617            .map_err(|e| eyre::eyre!("Signing failed: {}", e))?;
618
619        Ok(BlobAuth {
620            public_key,
621            signature: signature.to_bytes(),
622            timestamp,
623        })
624    }
625
626    /// A helper function that finds identity from store and creates blob authentication struct.
627    ///
628    /// Attempts to find a local identity for the context. If found, generates a signature.
629    /// If not found, returns `None` (which implies a public access request).
630    /// # Returns
631    /// * `Ok(Some(blob_auth))` - if the local identity was found and blob authentication struct
632    ///   was successfully created.
633    /// * `Ok(None)` - if the node doesn't own any identity for the given context.
634    /// * `Err` - if some internal error occured (e.g. DB error, serialization, etc).
635    pub fn create_blob_auth_for_context(
636        &self,
637        context_id: &ContextId,
638        blob_id: &BlobId,
639    ) -> eyre::Result<Option<BlobAuth>> {
640        if let Some((public_key, private_key)) = self.find_owned_identity(context_id)? {
641            let auth = self.create_blob_auth(blob_id, context_id, public_key, &private_key)?;
642            Ok(Some(auth))
643        } else {
644            Ok(None)
645        }
646    }
647}
648
649/// Detect MIME type from file bytes using the infer crate
650fn detect_mime_from_bytes(bytes: &[u8]) -> &'static str {
651    if let Some(kind) = infer::get(bytes) {
652        return kind.mime_type();
653    }
654
655    "application/octet-stream"
656}