calimero_runtime/logic/host_functions/
blobs.rs1use std::io::{Cursor, Error, Read};
2
3use calimero_primitives::{blobs::BlobId, context::ContextId};
4
5use futures_util::{StreamExt, TryStreamExt};
6use tokio::sync::mpsc;
7use tokio_stream::wrappers::UnboundedReceiverStream;
8
9use crate::{
10 errors::HostError,
11 logic::{sys, VMHostFunctions, VMLogicError, VMLogicResult, DIGEST_SIZE},
12};
13
14#[derive(Debug)]
16pub enum BlobHandle {
17 Write(BlobWriteHandle),
19 Read(BlobReadHandle),
21}
22
23#[derive(Debug)]
25pub struct BlobWriteHandle {
26 sender: mpsc::UnboundedSender<Vec<u8>>,
28 completion_handle: tokio::task::JoinHandle<eyre::Result<(BlobId, u64)>>,
31}
32
33pub struct BlobReadHandle {
35 blob_id: BlobId,
37 stream: Option<Box<dyn futures_util::Stream<Item = Result<bytes::Bytes, Error>> + Unpin>>,
39 current_chunk_cursor: Option<Cursor<Vec<u8>>>,
43 position: u64,
45}
46
47impl std::fmt::Debug for BlobReadHandle {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("BlobReadHandle")
50 .field("blob_id", &self.blob_id)
51 .field("stream", &"<stream>")
52 .field("current_chunk_cursor", &self.current_chunk_cursor)
53 .field("position", &self.position)
54 .finish()
55 }
56}
57
58impl VMHostFunctions<'_> {
59 pub fn blob_create(&mut self) -> VMLogicResult<u64> {
74 if self.borrow_logic().node_client.is_none() {
75 return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
76 }
77
78 let Ok(limits_max_blob_handles) =
82 usize::try_from(self.borrow_logic().limits.max_blob_handles)
83 else {
84 return Err(VMLogicError::HostError(HostError::IntegerOverflow));
85 };
86
87 if self.borrow_logic().blob_handles.len() >= limits_max_blob_handles {
88 return Err(VMLogicError::HostError(HostError::TooManyBlobHandles {
89 max: self.borrow_logic().limits.max_blob_handles,
90 }));
91 }
92
93 let fd = self.with_logic_mut(|logic| -> VMLogicResult<u64> {
94 let Some(node_client) = logic.node_client.clone() else {
95 return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
96 };
97
98 let fd = logic.next_blob_fd;
99 logic.next_blob_fd = logic
100 .next_blob_fd
101 .checked_add(1)
102 .ok_or(VMLogicError::HostError(HostError::IntegerOverflow))?;
103
104 let (data_sender, data_receiver) = mpsc::unbounded_channel();
105
106 let completion_handle = tokio::spawn(async move {
107 let stream = UnboundedReceiverStream::new(data_receiver);
108
109 let byte_stream =
110 stream.map(|data: Vec<u8>| Ok::<bytes::Bytes, Error>(data.into()));
111 let reader = byte_stream.into_async_read();
112
113 node_client.add_blob(reader, None, None).await
114 });
115
116 let handle = BlobHandle::Write(BlobWriteHandle {
119 sender: data_sender,
120 completion_handle,
121 });
122
123 drop(logic.blob_handles.insert(fd, handle));
124 Ok(fd)
125 })?;
126
127 Ok(fd)
128 }
129
130 pub fn blob_write(&mut self, fd: u64, src_data_ptr: u64) -> VMLogicResult<u64> {
148 let data = unsafe { self.read_guest_memory_typed::<sys::Buffer<'_>>(src_data_ptr)? };
149 let data_len = data.len();
150
151 if self.borrow_logic().node_client.is_none() {
152 return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
153 }
154
155 if data_len > self.borrow_logic().limits.max_blob_chunk_size {
157 return Err(VMLogicError::HostError(HostError::BlobWriteTooLarge {
158 size: data_len,
159 max: self.borrow_logic().limits.max_blob_chunk_size,
160 }));
161 }
162
163 let data = self.read_guest_memory_slice(&data).to_vec();
164
165 self.with_logic_mut(|logic| {
166 let handle = logic
167 .blob_handles
168 .get(&fd)
169 .ok_or(VMLogicError::HostError(HostError::InvalidBlobHandle))?;
170
171 match handle {
172 BlobHandle::Write(_) => Ok(()),
173 BlobHandle::Read(_) => Err(VMLogicError::HostError(HostError::InvalidBlobHandle)),
174 }
175 })?;
176
177 self.with_logic_mut(|logic| {
178 let handle = logic
179 .blob_handles
180 .get_mut(&fd)
181 .ok_or(VMLogicError::HostError(HostError::InvalidBlobHandle))?;
182 match handle {
183 BlobHandle::Write(w) => {
184 w.sender
185 .send(data.clone())
186 .map_err(|_| VMLogicError::HostError(HostError::InvalidBlobHandle))?;
187 }
188 BlobHandle::Read(_) => {
189 return Err(VMLogicError::HostError(HostError::InvalidBlobHandle))
190 }
191 }
192 Ok::<(), VMLogicError>(())
193 })?;
194
195 Ok(data_len)
196 }
197
198 pub fn blob_close(&mut self, fd: u64, dest_blob_id_ptr: u64) -> VMLogicResult<u32> {
220 let guest_blob_id_ptr =
221 unsafe { self.read_guest_memory_typed::<sys::BufferMut<'_>>(dest_blob_id_ptr)? };
222
223 if guest_blob_id_ptr.len() != DIGEST_SIZE as u64 {
224 return Err(HostError::InvalidMemoryAccess.into());
225 }
226
227 if self.borrow_logic().node_client.is_none() {
228 return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
229 }
230
231 let handle = self.with_logic_mut(|logic| {
232 logic
233 .blob_handles
234 .remove(&fd)
235 .ok_or(VMLogicError::HostError(HostError::InvalidBlobHandle))
236 })?;
237
238 let guest_blob_id_out_buf: &mut [u8] = self.read_guest_memory_slice_mut(&guest_blob_id_ptr);
239
240 match handle {
241 BlobHandle::Write(write_handle) => {
242 let _ignored = write_handle.sender;
243
244 let (blob_id_, _size) = tokio::runtime::Handle::current()
245 .block_on(write_handle.completion_handle)
246 .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))?
247 .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))?;
248
249 guest_blob_id_out_buf.copy_from_slice(blob_id_.as_ref());
251 }
252 BlobHandle::Read(read_handle) => {
254 guest_blob_id_out_buf.copy_from_slice(read_handle.blob_id.as_ref())
255 }
256 }
257
258 Ok(1)
259 }
260
261 pub fn blob_announce_to_context(
280 &mut self,
281 src_blob_id_ptr: u64,
282 src_context_id_ptr: u64,
283 ) -> VMLogicResult<u32> {
284 let node_client = match &self.borrow_logic().node_client {
286 Some(client) => client.clone(),
287 None => return Err(VMLogicError::HostError(HostError::BlobsNotSupported)),
288 };
289
290 let blob_id = unsafe { self.read_guest_memory_typed::<sys::Buffer<'_>>(src_blob_id_ptr)? };
291 let context_id =
292 unsafe { self.read_guest_memory_typed::<sys::Buffer<'_>>(src_context_id_ptr)? };
293
294 let blob_id = BlobId::from(*self.read_guest_memory_sized::<DIGEST_SIZE>(&blob_id)?);
295 let context_id =
296 ContextId::from(*self.read_guest_memory_sized::<DIGEST_SIZE>(&context_id)?);
297
298 let blob_info = tokio::runtime::Handle::current()
300 .block_on(node_client.get_blob_info(blob_id))
301 .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))?
302 .ok_or_else(|| VMLogicError::HostError(HostError::BlobsNotSupported))?;
303
304 tokio::task::block_in_place(|| {
306 tokio::runtime::Handle::current().block_on(async {
307 node_client
308 .announce_blob_to_network(&blob_id, &context_id, blob_info.size)
309 .await
310 .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))
311 })
312 })?;
313
314 Ok(1)
315 }
316
317 pub fn blob_open(&mut self, src_blob_id_ptr: u64) -> VMLogicResult<u64> {
334 let blob_id = unsafe { self.read_guest_memory_typed::<sys::Buffer<'_>>(src_blob_id_ptr)? };
335
336 if self.borrow_logic().node_client.is_none() {
337 return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
338 }
339
340 let Ok(limits_max_blob_handles) =
344 usize::try_from(self.borrow_logic().limits.max_blob_handles)
345 else {
346 return Err(VMLogicError::HostError(HostError::IntegerOverflow));
347 };
348
349 if self.borrow_logic().blob_handles.len() >= limits_max_blob_handles {
350 return Err(VMLogicError::HostError(HostError::TooManyBlobHandles {
351 max: self.borrow_logic().limits.max_blob_handles,
352 }));
353 }
354
355 let blob_id = BlobId::from(*self.read_guest_memory_sized::<DIGEST_SIZE>(&blob_id)?);
356
357 let fd = self.with_logic_mut(|logic| -> VMLogicResult<u64> {
358 let fd = logic.next_blob_fd;
359 logic.next_blob_fd = logic
360 .next_blob_fd
361 .checked_add(1)
362 .ok_or(VMLogicError::HostError(HostError::IntegerOverflow))?;
363
364 let handle = BlobHandle::Read(BlobReadHandle {
365 blob_id,
366 stream: None,
367 current_chunk_cursor: None,
368 position: 0,
369 });
370
371 drop(logic.blob_handles.insert(fd, handle));
374 Ok(fd)
375 })?;
376
377 Ok(fd)
378 }
379
380 pub fn blob_read(&mut self, fd: u64, dest_data_ptr: u64) -> VMLogicResult<u64> {
402 let dest_data =
403 unsafe { self.read_guest_memory_typed::<sys::BufferMut<'_>>(dest_data_ptr)? };
404 let data_len = dest_data.len();
405
406 let node_client = match &self.borrow_logic().node_client {
408 Some(client) => client.clone(),
409 None => return Err(VMLogicError::HostError(HostError::BlobsNotSupported)),
410 };
411
412 if data_len > self.borrow_logic().limits.max_blob_chunk_size {
414 return Err(VMLogicError::HostError(HostError::BlobBufferTooLarge {
415 size: data_len,
416 max: self.borrow_logic().limits.max_blob_chunk_size,
417 }));
418 }
419
420 if data_len == 0 {
421 return Ok(0);
422 }
423
424 let Ok(data_len) = usize::try_from(data_len) else {
428 return Err(VMLogicError::HostError(HostError::IntegerOverflow));
429 };
430 let mut output_buffer = Vec::with_capacity(data_len);
432
433 let bytes_read = self.with_logic_mut(|logic| -> VMLogicResult<u64> {
434 let handle = logic
435 .blob_handles
436 .get_mut(&fd)
437 .ok_or(VMLogicError::HostError(HostError::InvalidBlobHandle))?;
438
439 let read_handle = match handle {
440 BlobHandle::Read(r) => r,
441 BlobHandle::Write(_) => {
442 return Err(VMLogicError::HostError(HostError::InvalidBlobHandle))
443 }
444 };
445
446 if let Some(cursor) = &mut read_handle.current_chunk_cursor {
448 let mut temp_buffer = vec![0_u8; data_len];
449 match cursor.read(&mut temp_buffer) {
450 Ok(bytes_from_cursor) => {
451 output_buffer.extend_from_slice(&temp_buffer[..bytes_from_cursor]);
452
453 if bytes_from_cursor == 0
455 || cursor.position() >= cursor.get_ref().len() as u64
456 {
457 read_handle.current_chunk_cursor = None;
458 }
459
460 if output_buffer.len() >= data_len {
462 read_handle.position = read_handle
463 .position
464 .checked_add(output_buffer.len() as u64)
465 .ok_or(VMLogicError::HostError(HostError::IntegerOverflow))?;
466
467 return Ok(output_buffer.len() as u64);
468 }
469 }
470 Err(_) => {
471 read_handle.current_chunk_cursor = None;
473 }
474 }
475 }
476
477 if read_handle.stream.is_none() {
478 let blob_stream = tokio::runtime::Handle::current()
479 .block_on(node_client.get_blob(&read_handle.blob_id, None))
480 .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))?;
481
482 if let Some(stream) = blob_stream {
483 let mapped_stream = stream.map(|result| {
484 result
485 .map(|chunk| bytes::Bytes::copy_from_slice(&chunk))
486 .map_err(|_| Error::other("blob read error"))
487 });
488 read_handle.stream = Some(Box::new(mapped_stream));
489 } else {
490 read_handle.position = read_handle
491 .position
492 .checked_add(output_buffer.len() as u64)
493 .ok_or(VMLogicError::HostError(HostError::IntegerOverflow))?;
494
495 return Ok(output_buffer.len() as u64);
496 }
497 }
498
499 if let Some(stream) = &mut read_handle.stream {
500 tokio::runtime::Handle::current().block_on(async {
501 while output_buffer.len() < data_len {
502 match stream.next().await {
503 Some(Ok(chunk)) => {
504 let chunk_bytes = chunk.as_ref();
505
506 let remaining_needed = data_len
507 .checked_sub(output_buffer.len())
508 .ok_or(VMLogicError::HostError(HostError::IntegerOverflow))?;
509
510 if chunk_bytes.len() <= remaining_needed {
511 output_buffer.extend_from_slice(chunk_bytes);
512 } else {
513 output_buffer
515 .extend_from_slice(&chunk_bytes[..remaining_needed]);
516
517 let remaining_data = chunk_bytes[remaining_needed..].to_vec();
519 read_handle.current_chunk_cursor =
520 Some(Cursor::new(remaining_data));
521
522 break;
523 }
524 }
525 Some(Err(_)) | None => {
526 break;
527 }
528 }
529 }
530 Ok::<(), VMLogicError>(())
531 })?;
532 }
533
534 read_handle.position = read_handle
535 .position
536 .checked_add(output_buffer.len() as u64)
537 .ok_or(VMLogicError::HostError(HostError::IntegerOverflow))?;
538 Ok(output_buffer.len() as u64)
539 })?;
540
541 if bytes_read > 0 {
542 self.read_guest_memory_slice_mut(&dest_data)
545 .copy_from_slice(&output_buffer);
546 }
547
548 Ok(bytes_read)
549 }
550}
551
552#[cfg(test)]
553mod tests {
554 use super::*;
555
556 use crate::logic::{
557 tests::{setup_vm, SimpleMockStorage},
558 Cow, VMContext, VMLimits, VMLogic,
559 };
560 use wasmer::{AsStoreMut, Store};
561
562 #[test]
565 fn test_blob_create_without_client_returns_an_error() {
566 let mut storage = SimpleMockStorage::new();
567 let limits = VMLimits::default();
568 let (mut logic, mut store) = setup_vm!(&mut storage, &limits, vec![]);
569 let mut host = logic.host_functions(store.as_store_mut());
570 let err = host.blob_create().unwrap_err();
571 assert!(matches!(
572 err,
573 VMLogicError::HostError(HostError::BlobsNotSupported)
574 ));
575 }
576}