1use std::sync::Arc;
2
3use calimero_blobstore::{Blob, Size};
4use calimero_network_primitives::blob_types::{BlobAuth, BlobAuthPayload};
5use calimero_primitives::{
6 blobs::{BlobId, BlobInfo, BlobMetadata},
7 common::DIGEST_SIZE,
8 context::ContextId,
9 hash::Hash,
10 identity::{PrivateKey, PublicKey},
11};
12use calimero_store::key;
13use calimero_store::layer::LayerExt;
14use eyre::bail;
15use futures_util::{AsyncRead, StreamExt};
16use libp2p::PeerId;
17use std::time::{SystemTime, UNIX_EPOCH};
18use tracing::{debug, error, trace};
19
20use super::NodeClient;
21use crate::messages::get_blob_bytes::GetBlobBytesRequest;
22use crate::messages::NodeMessage::GetBlobBytes;
23
24impl NodeClient {
25 pub async fn add_blob<S: AsyncRead>(
30 &self,
31 stream: S,
32 expected_size: Option<u64>,
33 expected_hash: Option<&Hash>,
34 ) -> eyre::Result<(BlobId, u64)> {
35 debug!(
36 expected_size,
37 has_expected_hash = expected_hash.is_some(),
38 "add_blob invoked"
39 );
40
41 let (blob_id, hash, size) = match self
42 .blobstore
43 .put_sized(expected_size.map(Size::Exact), stream)
44 .await
45 {
46 Ok(result) => {
47 trace!(
48 blob_id = %result.0,
49 stored_size = result.2,
50 hash = ?result.1,
51 "blobstore.put_sized completed"
52 );
53 result
54 }
55 Err(err) => {
56 error!(error = ?err, "blobstore.put_sized failed");
57 return Err(err);
58 }
59 };
60
61 if matches!(expected_hash, Some(expected_hash) if hash != *expected_hash) {
62 bail!("fatal: blob hash mismatch");
63 }
64
65 if matches!(expected_size, Some(expected_size) if size != expected_size) {
66 bail!("fatal: blob size mismatch");
67 }
68
69 debug!(
70 %blob_id,
71 stored_size = size,
72 "add_blob completed successfully"
73 );
74
75 Ok((blob_id, size))
76 }
77
78 pub async fn get_blob<'a>(
81 &'a self,
82 blob_id: &'a BlobId,
83 context_id: Option<&'a ContextId>,
84 ) -> eyre::Result<Option<Blob>> {
85 let Some(stream) = self.blobstore.get(*blob_id)? else {
87 if context_id.is_none() {
89 return Ok(None);
90 }
91
92 let context_id = context_id.unwrap();
94 tracing::info!(
95 blob_id = %blob_id,
96 context_id = %context_id,
97 "Blob not found locally, attempting network discovery"
98 );
99
100 const MAX_RETRIES: usize = 3;
101 const RETRY_DELAY: core::time::Duration = core::time::Duration::from_secs(2);
102
103 for attempt in 1..=MAX_RETRIES {
104 tracing::debug!(
105 blob_id = %blob_id,
106 context_id = %context_id,
107 attempt,
108 max_attempts = MAX_RETRIES,
109 "Attempting network discovery"
110 );
111
112 let peers = match self
113 .network_client
114 .query_blob(*blob_id, Some(*context_id))
115 .await
116 {
117 Ok(peers) => peers,
118 Err(e) => {
119 tracing::warn!(
120 blob_id = %blob_id,
121 context_id = %context_id,
122 attempt,
123 error = %e,
124 "Failed to query DHT for blob"
125 );
126 if attempt < MAX_RETRIES {
127 tokio::time::sleep(RETRY_DELAY).await;
128 continue;
129 }
130 return Err(e);
131 }
132 };
133
134 if peers.is_empty() {
135 tracing::info!(
136 blob_id = %blob_id,
137 context_id = %context_id,
138 attempt,
139 "No peers found with blob"
140 );
141 if attempt < MAX_RETRIES {
142 tokio::time::sleep(RETRY_DELAY).await;
143 continue;
144 }
145 return Ok(None);
146 }
147
148 tracing::info!(
149 blob_id = %blob_id,
150 context_id = %context_id,
151 peer_count = peers.len(),
152 attempt,
153 "Found {} peers with blob, attempting download", peers.len()
154 );
155
156 for (peer_index, peer_id) in peers.iter().enumerate() {
158 tracing::debug!(
159 peer_id = %peer_id,
160 peer_index = peer_index + 1,
161 total_peers = peers.len(),
162 attempt,
163 "Attempting to download blob from peer"
164 );
165
166 let auth = self.create_blob_auth_for_context(context_id, blob_id)?;
168
169 match self
170 .network_client
171 .request_blob(*blob_id, *context_id, *peer_id, auth)
172 .await
173 {
174 Ok(Some(data)) => {
175 tracing::info!(
176 blob_id = %blob_id,
177 peer_id = %peer_id,
178 size = data.len(),
179 attempt,
180 "Successfully downloaded blob from network"
181 );
182
183 let (blob_id_stored, _size) = self
185 .add_blob(data.as_slice(), Some(data.len() as u64), None)
186 .await?;
187
188 if blob_id_stored != *blob_id {
190 tracing::warn!(
191 expected = %blob_id,
192 actual = %blob_id_stored,
193 "Downloaded blob ID mismatch"
194 );
195 continue;
196 }
197
198 return self.blobstore.get(*blob_id);
200 }
201 Ok(None) => {
202 tracing::debug!(
203 peer_id = %peer_id,
204 attempt,
205 "Peer doesn't have the blob"
206 );
207 }
208 Err(e) => {
209 tracing::warn!(
210 peer_id = %peer_id,
211 error = %e,
212 attempt,
213 "Failed to download blob from peer"
214 );
215 }
216 }
217 }
218
219 if attempt < MAX_RETRIES {
221 tracing::info!(
222 blob_id = %blob_id,
223 context_id = %context_id,
224 attempt,
225 "All peers failed, retrying in {} seconds",
226 RETRY_DELAY.as_secs()
227 );
228 tokio::time::sleep(RETRY_DELAY).await;
229 }
230 }
231
232 tracing::debug!(
233 blob_id = %blob_id,
234 context_id = %context_id,
235 max_attempts = MAX_RETRIES,
236 "Failed to download blob from any peer after all retry attempts"
237 );
238 return Ok(None);
239 };
240
241 Ok(Some(stream))
242 }
243
    /// Loads a blob fully into memory, trying (in order): a fast in-process
    /// path via the node manager, the local blobstore, and finally — when a
    /// `context_id` is given — network discovery through [`Self::get_blob`].
    ///
    /// Returns `Ok(None)` for the all-zero sentinel id or when the blob
    /// cannot be found by any path.
    pub async fn get_blob_bytes(
        &self,
        blob_id: &BlobId,
        context_id: Option<&ContextId>,
    ) -> eyre::Result<Option<Arc<[u8]>>> {
        // All-zero id is treated as "no blob" and short-circuits every path.
        if **blob_id == [0; 32] {
            return Ok(None);
        }

        let blob_id = *blob_id;

        // Fast path: ask the node manager actor for the bytes directly.
        // NOTE(review): presumably this hits an in-memory copy/cache held by
        // the node — confirm against the GetBlobBytes handler.
        let request = GetBlobBytesRequest { blob_id };
        let (tx, rx) = tokio::sync::oneshot::channel();

        // Tight 10ms budget to enqueue the message; if the manager's mailbox
        // is busy we fall through rather than stall the caller.
        let send_result = tokio::time::timeout(
            tokio::time::Duration::from_millis(10),
            self.node_manager.send(GetBlobBytes {
                request,
                outcome: tx,
            }),
        )
        .await;

        if let Ok(Ok(())) = send_result {
            // 100ms budget for the reply; triple-nested Ok unwraps the
            // timeout, the oneshot recv, and the handler's own Result.
            match tokio::time::timeout(tokio::time::Duration::from_millis(100), rx).await {
                Ok(Ok(Ok(response))) if response.bytes.is_some() => {
                    return Ok(response.bytes);
                }
                Ok(Ok(Ok(_))) => {
                    // Manager answered but had no bytes — fall through.
                }
                _ => {
                    // Timeout, dropped channel, or handler error — fall through.
                }
            }
        }

        // Second path: stream the blob out of the local store and collect it.
        if let Some(mut stream) = self.blobstore.get(blob_id)? {
            let mut data = Vec::new();
            while let Some(chunk) = stream.next().await {
                data.extend_from_slice(&chunk?);
            }
            return Ok(Some(data.into()));
        }

        // Last resort: network discovery, only possible with a context.
        if let Some(context_id) = context_id {
            let Some(mut blob) = self.get_blob(&blob_id, Some(context_id)).await? else {
                return Ok(None);
            };

            let mut data = Vec::new();
            while let Some(chunk) = blob.next().await {
                data.extend_from_slice(&chunk?);
            }

            Ok(Some(data.into()))
        } else {
            Ok(None)
        }
    }
314
315 pub async fn find_blob_providers(
317 &self,
318 blob_id: &BlobId,
319 context_id: &ContextId,
320 ) -> eyre::Result<Vec<PeerId>> {
321 self.network_client
322 .query_blob(*blob_id, Some(*context_id))
323 .await
324 }
325
326 pub async fn announce_blob_to_network(
328 &self,
329 blob_id: &BlobId,
330 context_id: &ContextId,
331 size: u64,
332 ) -> eyre::Result<()> {
333 self.network_client
334 .announce_blob(*blob_id, *context_id, size)
335 .await
336 }
337
338 pub fn has_blob(&self, blob_id: &BlobId) -> eyre::Result<bool> {
339 self.blobstore.has(*blob_id)
340 }
341
342 pub fn list_blobs(&self) -> eyre::Result<Vec<BlobInfo>> {
349 let handle = self.datastore.clone().handle();
350
351 let iter_result = handle.iter::<key::BlobMeta>();
352 let mut iter = match iter_result {
353 Ok(iter) => iter,
354 Err(err) => {
355 tracing::error!("Failed to create blob iterator: {:?}", err);
356 bail!("Failed to iterate blob entries");
357 }
358 };
359
360 let mut chunk_blob_ids = std::collections::HashSet::new();
361
362 tracing::debug!("Starting first pass: collecting chunk blob IDs");
363 for result in iter.entries() {
364 match result {
365 (Ok(_blob_key), Ok(blob_meta)) => {
366 for link in &blob_meta.links {
368 let _ = chunk_blob_ids.insert(link.blob_id());
369 }
370 }
371 (Err(err), _) | (_, Err(err)) => {
372 tracing::error!(
373 "Failed to read blob entry during chunk collection: {:?}",
374 err
375 );
376 bail!("Failed to read blob entries");
377 }
378 }
379 }
380
381 let handle2 = self.datastore.clone().handle();
382 let iter_result2 = handle2.iter::<key::BlobMeta>();
383 let mut iter2 = match iter_result2 {
384 Ok(iter) => iter,
385 Err(err) => {
386 tracing::error!("Failed to create second blob iterator: {:?}", err);
387 bail!("Failed to iterate blob entries");
388 }
389 };
390
391 let mut root_blobs = Vec::new();
392
393 tracing::debug!(
394 "Starting second pass: collecting root blobs (filtering {} chunks)",
395 chunk_blob_ids.len()
396 );
397 for result in iter2.entries() {
398 match result {
399 (Ok(blob_key), Ok(blob_meta)) => {
400 let blob_id = blob_key.blob_id();
401
402 if !chunk_blob_ids.contains(&blob_id) {
404 root_blobs.push(BlobInfo {
405 blob_id,
406 size: blob_meta.size,
407 });
408 }
409 }
410 (Err(err), _) | (_, Err(err)) => {
411 tracing::error!(
412 "Failed to read blob entry during root collection: {:?}",
413 err
414 );
415 bail!("Failed to read blob entries");
416 }
417 }
418 }
419
420 tracing::debug!(
421 "Listing complete: found {} chunks, returning {} root/standalone blobs",
422 chunk_blob_ids.len(),
423 root_blobs.len()
424 );
425
426 Ok(root_blobs)
427 }
428
429 pub async fn delete_blob(&self, blob_id: BlobId) -> eyre::Result<bool> {
434 let mut handle = self.datastore.clone().handle();
435 let blob_key = key::BlobMeta::new(blob_id);
436
437 let blob_meta = match handle.get(&blob_key) {
438 Ok(Some(meta)) => meta,
439 Ok(None) => {
440 bail!("Blob not found");
441 }
442 Err(err) => {
443 tracing::error!("Failed to get blob metadata {}: {:?}", blob_id, err);
444 bail!("Failed to access blob metadata: {}", err);
445 }
446 };
447
448 tracing::info!(
449 "Starting deletion for blob {} with {} linked chunks",
450 blob_id,
451 blob_meta.links.len()
452 );
453
454 let mut blobs_to_delete = vec![blob_id];
455 let mut deleted_metadata_count = 0;
456 let mut deleted_files_count = 0;
457
458 blobs_to_delete.extend(blob_meta.links.iter().map(key::BlobMeta::blob_id));
459
460 for current_blob_id in &blobs_to_delete {
462 match self.blobstore.delete(*current_blob_id).await {
463 Ok(true) => {
464 deleted_files_count += 1;
465 tracing::debug!("Successfully deleted blob file {}", current_blob_id);
466 }
467 Ok(false) => {
468 tracing::debug!("Blob file {} was already missing", current_blob_id);
469 }
470 Err(err) => {
471 tracing::warn!("Failed to delete blob file {}: {}", current_blob_id, err);
472 }
474 }
475 }
476
477 for current_blob_id in blobs_to_delete {
479 let current_key = key::BlobMeta::new(current_blob_id);
480
481 match handle.delete(¤t_key) {
482 Ok(()) => {
483 deleted_metadata_count += 1;
484 tracing::debug!("Successfully deleted metadata for blob {}", current_blob_id);
485 }
486 Err(err) => {
487 tracing::warn!(
488 "Failed to delete metadata for blob {}: {}",
489 current_blob_id,
490 err
491 );
492 }
493 }
494 }
495
496 if deleted_metadata_count > 0 {
497 tracing::info!(
498 "Successfully deleted {} blob metadata entries and {} blob files",
499 deleted_metadata_count,
500 deleted_files_count
501 );
502 Ok(true)
503 } else {
504 bail!("Failed to delete any blob metadata");
505 }
506 }
507
508 pub async fn get_blob_info(&self, blob_id: BlobId) -> eyre::Result<Option<BlobMetadata>> {
513 let handle = self.datastore.clone().handle();
514 let blob_key = key::BlobMeta::new(blob_id);
515
516 match handle.get(&blob_key) {
517 Ok(Some(blob_meta)) => {
518 let mime_type = self
519 .detect_blob_mime_type(blob_id)
520 .await
521 .unwrap_or_else(|| "application/octet-stream".to_owned());
522
523 Ok(Some(BlobMetadata {
524 blob_id,
525 size: blob_meta.size,
526 hash: blob_meta.hash,
527 mime_type,
528 }))
529 }
530 Ok(None) => Ok(None),
531 Err(err) => {
532 tracing::error!("Failed to get blob metadata: {:?}", err);
533 bail!("Failed to retrieve blob metadata: {}", err);
534 }
535 }
536 }
537
538 pub async fn detect_blob_mime_type(&self, blob_id: BlobId) -> Option<String> {
540 match self.get_blob(&blob_id, None).await {
541 Ok(Some(mut blob_stream)) => {
542 if let Some(Ok(first_chunk)) = blob_stream.next().await {
543 let bytes = first_chunk.as_ref();
544 let sample_size = core::cmp::min(bytes.len(), 512);
545 return Some(detect_mime_from_bytes(&bytes[..sample_size]).to_owned());
546 }
547 }
548 Ok(None) => {
549 tracing::warn!("Blob {} not found for MIME detection", blob_id);
550 }
551 Err(err) => {
552 tracing::warn!(
553 "Failed to read blob {} for MIME detection: {:?}",
554 blob_id,
555 err
556 );
557 }
558 }
559
560 None
561 }
562
    /// Finds the first identity in `context_id` for which this node holds a
    /// private key, returning the (public, private) key pair.
    ///
    /// Returns `Ok(None)` when no owned identity exists in the context.
    pub fn find_owned_identity(
        &self,
        context_id: &ContextId,
    ) -> eyre::Result<Option<(PublicKey, PrivateKey)>> {
        let handle = self.datastore.clone().handle();
        // Seek to the lowest possible key for this context: (context_id, all-zero pk).
        let start_key = key::ContextIdentity::new(*context_id, [0u8; DIGEST_SIZE].into());
        let mut iter = handle.iter::<key::ContextIdentity>()?;
        // `seek` yields the first entry at/after `start_key`; `transpose`
        // turns its Result<Option<_>> into Option<Result<_>> so it can be
        // chained in front of the remaining keys below.
        let first = iter.seek(start_key).transpose();

        for key in first.into_iter().chain(iter.keys()) {
            let key = key?;
            // Keys are ordered by context first; once the context changes we
            // have left this context's range and can stop.
            if key.context_id() != *context_id {
                break;
            }

            // Only identities with a stored private key are "owned" by us.
            if let Some(val) = handle.get(&key)? {
                if let Some(pk_bytes) = val.private_key {
                    return Ok(Some((key.public_key(), PrivateKey::from(pk_bytes))));
                }
            }
        }
        Ok(None)
    }
587
588 pub fn create_blob_auth(
596 &self,
597 blob_id: &BlobId,
598 context_id: &ContextId,
599 public_key: PublicKey,
600 private_key: &PrivateKey,
601 ) -> eyre::Result<BlobAuth> {
602 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
603
604 let payload = BlobAuthPayload {
606 blob_id: *blob_id.digest(),
607 context_id: *context_id.digest(),
608 timestamp,
609 };
610
611 let message = borsh::to_vec(&payload)?;
613
614 let signature = private_key
616 .sign(&message)
617 .map_err(|e| eyre::eyre!("Signing failed: {}", e))?;
618
619 Ok(BlobAuth {
620 public_key,
621 signature: signature.to_bytes(),
622 timestamp,
623 })
624 }
625
626 pub fn create_blob_auth_for_context(
636 &self,
637 context_id: &ContextId,
638 blob_id: &BlobId,
639 ) -> eyre::Result<Option<BlobAuth>> {
640 if let Some((public_key, private_key)) = self.find_owned_identity(context_id)? {
641 let auth = self.create_blob_auth(blob_id, context_id, public_key, &private_key)?;
642 Ok(Some(auth))
643 } else {
644 Ok(None)
645 }
646 }
647}
648
649fn detect_mime_from_bytes(bytes: &[u8]) -> &'static str {
651 if let Some(kind) = infer::get(bytes) {
652 return kind.mime_type();
653 }
654
655 "application/octet-stream"
656}