use std::time::{Duration, SystemTime, UNIX_EPOCH};
use calimero_context_primitives::client::ContextClient;
use calimero_network_primitives::{
blob_types::{BlobAuthPayload, BlobChunk, BlobRequest, BlobResponse},
stream::{Message as StreamMessage, Stream},
};
use calimero_node_primitives::client::NodeClient;
use calimero_primitives::blobs::BlobId;
use futures_util::{SinkExt, StreamExt};
use libp2p::PeerId;
use tokio::time::{sleep, timeout};
use tracing::{debug, error, info, warn};
const BLOB_SERVE_TIMEOUT: Duration = Duration::from_secs(300); const CHUNK_SEND_TIMEOUT: Duration = Duration::from_secs(30); const FLOW_CONTROL_DELAY: Duration = Duration::from_millis(10);
const MAX_REQUEST_AGE_SECS: u64 = 30;
const MAX_REQUEST_FUTURE_AGE_SECS: u64 = 10;
pub async fn handle_blob_protocol_stream(
node_client: NodeClient,
context_client: ContextClient,
peer_id: PeerId,
mut stream: Box<Stream>,
) -> eyre::Result<()> {
info!(%peer_id, "Starting blob protocol stream handler");
let first_message = match stream.next().await {
Some(Ok(msg)) => msg,
Some(Err(e)) => {
debug!(%peer_id, error = %e, "Error reading blob request from stream");
return Err(e.into());
}
None => {
debug!(%peer_id, "Blob protocol stream closed immediately");
return Ok(());
}
};
let blob_request = serde_json::from_slice::<BlobRequest>(&first_message.data)
.map_err(|e| eyre::eyre!("Failed to parse blob request: {}", e))?;
if !is_blob_access_authorized(&context_client, &blob_request).await? {
let response = BlobResponse {
found: false,
size: None,
};
let response_data = serde_json::to_vec(&response)?;
timeout(
CHUNK_SEND_TIMEOUT,
stream.send(StreamMessage::new(response_data)),
)
.await
.map_err(|_| eyre::eyre!("Timeout sending auth rejection"))??;
return Ok(());
}
handle_blob_request_stream(node_client, peer_id, blob_request, stream).await
}
async fn handle_blob_request_stream(
node_client: NodeClient,
peer_id: PeerId,
blob_request: BlobRequest,
mut stream: Box<Stream>,
) -> eyre::Result<()> {
info!(
%peer_id,
blob_id = blob_request.blob_id.as_str(),
context_id = blob_request.context_id.as_str(),
"Processing blob request stream using binary chunk protocol"
);
let serve_result = timeout(BLOB_SERVE_TIMEOUT, async {
info!(%peer_id, blob_id = %blob_request.blob_id, "Attempting to get blob from local storage");
let blob_stream = node_client
.get_blob(&BlobId::from(blob_request.blob_id), None)
.await?;
let (response, blob_stream) = if let Some(blob_stream) = blob_stream {
info!(%peer_id, "Blob found, will stream chunks");
let blob_metadata = node_client
.get_blob_info(BlobId::from(blob_request.blob_id))
.await?;
let total_size = blob_metadata.map(|meta| meta.size).unwrap_or(0);
let response = BlobResponse {
found: true,
size: Some(total_size),
};
(response, Some(blob_stream))
} else {
info!(%peer_id, "Blob not found");
let response = BlobResponse {
found: false,
size: None,
};
(response, None)
};
let response_data = serde_json::to_vec(&response)
.map_err(|e| eyre::eyre!("Failed to serialize blob response: {}", e))?;
timeout(
CHUNK_SEND_TIMEOUT,
stream.send(StreamMessage::new(response_data)),
)
.await
.map_err(|_| eyre::eyre!("Timeout sending response"))?
.map_err(|e| eyre::eyre!("Failed to send blob response: {}", e))?;
if response.found {
let mut blob_stream = blob_stream.expect("Blob stream should exist since response.found is true");
debug!(%peer_id, "Starting to stream blob chunks");
let mut chunk_count = 0;
let mut total_bytes_sent = 0;
while let Some(chunk_result) = blob_stream.next().await {
match chunk_result {
Ok(chunk) => {
chunk_count += 1;
total_bytes_sent += chunk.len();
debug!(
%peer_id,
chunk_number = chunk_count,
chunk_size = chunk.len(),
total_sent = total_bytes_sent,
"Sending blob chunk"
);
let blob_chunk = BlobChunk {
data: chunk.to_vec(),
};
let chunk_data = borsh::to_vec(&blob_chunk)
.map_err(|e| eyre::eyre!("Failed to serialize blob chunk: {}", e))?;
debug!(
%peer_id,
chunk_number = chunk_count,
original_chunk_size = chunk.len(),
binary_message_size = chunk_data.len(),
"Sending binary chunk data"
);
timeout(
CHUNK_SEND_TIMEOUT,
stream.send(StreamMessage::new(chunk_data)),
)
.await
.map_err(|_| eyre::eyre!("Timeout sending chunk {}", chunk_count))?
.map_err(|e| eyre::eyre!("Failed to send blob chunk: {}", e))?;
if chunk_count % 10 == 0 {
sleep(FLOW_CONTROL_DELAY).await;
}
}
Err(e) => {
warn!(%peer_id, error = %e, "Failed to read blob chunk");
return Err(eyre::eyre!("Failed to read blob chunk: {}", e));
}
}
}
let final_chunk = BlobChunk {
data: Vec::new(),
};
let final_chunk_data = borsh::to_vec(&final_chunk)
.map_err(|e| eyre::eyre!("Failed to serialize final chunk: {}", e))?;
timeout(
CHUNK_SEND_TIMEOUT,
stream.send(StreamMessage::new(final_chunk_data)),
)
.await
.map_err(|_| eyre::eyre!("Timeout sending final chunk"))?
.map_err(|e| eyre::eyre!("Failed to send final blob chunk: {}", e))?;
debug!(
%peer_id,
total_chunks = chunk_count + 1, total_bytes = total_bytes_sent,
"Successfully streamed all blob chunks"
);
}
debug!(%peer_id, "Blob request stream handled successfully");
Ok(())
})
.await;
match serve_result {
Ok(result) => result,
Err(_) => {
warn!(
%peer_id,
blob_id = blob_request.blob_id.as_str(),
timeout_secs = BLOB_SERVE_TIMEOUT.as_secs(),
"Blob serving timed out"
);
Err(eyre::eyre!("Blob serving timed out"))
}
}
}
async fn is_blob_access_authorized(
context_client: &ContextClient,
request: &BlobRequest,
) -> eyre::Result<bool> {
let context_config = match context_client.context_config(&request.context_id) {
Ok(Some(cfg)) => cfg,
Ok(None) => {
warn!(context_id=%request.context_id, "Context config not found locally. Denying blob access.");
return Ok(false);
}
Err(e) => return Err(e),
};
let external_client = context_client.external_client(&request.context_id, &context_config)?;
let app_config = external_client.config().application().await;
if let Ok(app) = app_config {
let requested_blob = BlobId::from(request.blob_id);
if requested_blob == app.blob.bytecode || requested_blob == app.blob.compiled {
debug!(blob_id=%request.blob_id, "Access granted: Blob is public Application Bundle");
return Ok(true);
}
} else {
warn!("Failed to fetch application config to verify public blob.");
}
let auth = match &request.auth {
Some(auth_struct) => auth_struct,
None => return Ok(false),
};
let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
if auth.timestamp < now.saturating_sub(MAX_REQUEST_AGE_SECS)
|| auth.timestamp > now.saturating_add(MAX_REQUEST_FUTURE_AGE_SECS)
{
return Ok(false);
}
let payload = BlobAuthPayload {
blob_id: *request.blob_id,
context_id: *request.context_id,
timestamp: auth.timestamp,
};
let message = borsh::to_vec(&payload)?;
if auth
.public_key
.verify_raw_signature(&message, &auth.signature)
.is_err()
{
error!(blob_id=%request.blob_id, "The blob request had an auth header, but the signature is incorrect.");
return Ok(false);
}
let is_member = context_client.has_member(&request.context_id, &auth.public_key)?;
if !is_member {
error!(
blob_id=%request.blob_id,
%request.context_id,
%auth.public_key,
"The blob request had an auth header, but the identity is not a member of the context."
);
}
Ok(is_member)
}