calimero_runtime/logic/host_functions/
blobs.rs

1use std::io::{Cursor, Error, Read};
2
3use calimero_primitives::{blobs::BlobId, context::ContextId};
4
5use futures_util::{StreamExt, TryStreamExt};
6use tokio::sync::mpsc;
7use tokio_stream::wrappers::UnboundedReceiverStream;
8
9use crate::{
10    errors::HostError,
11    logic::{sys, VMHostFunctions, VMLogicError, VMLogicResult, DIGEST_SIZE},
12};
13
14/// An enum representing a handle to a blob, which can be for reading or writing.
15#[derive(Debug)]
16pub enum BlobHandle {
17    /// A handle for writing data to a blob.
18    Write(BlobWriteHandle),
19    /// A handle for reading data from an existing blob.
20    Read(BlobReadHandle),
21}
22
23/// A handle for managing an asynchronous blob write operation.
24#[derive(Debug)]
25pub struct BlobWriteHandle {
26    /// The sender part of a channel to stream data chunks to the writer task.
27    sender: mpsc::UnboundedSender<Vec<u8>>,
28    /// A handle to the spawned task that performs the blob writing,
29    /// which will eventually yield the `BlobId` and total size of the data written.
30    completion_handle: tokio::task::JoinHandle<eyre::Result<(BlobId, u64)>>,
31}
32
33/// A handle for managing a blob read operation.
34pub struct BlobReadHandle {
35    /// The ID of the blob being read.
36    blob_id: BlobId,
37    /// The asynchronous stream of data chunks from the blob storage.
38    stream: Option<Box<dyn futures_util::Stream<Item = Result<bytes::Bytes, Error>> + Unpin>>,
39    /// A cursor for the current data chunk to handle partial reads efficiently.
40    /// Cursor for current storage chunk - automatic position tracking!
41    // TODO: clarify the "automatic position tracking".
42    current_chunk_cursor: Option<Cursor<Vec<u8>>>,
43    /// The current reading position within the blob.
44    position: u64,
45}
46
47impl std::fmt::Debug for BlobReadHandle {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        f.debug_struct("BlobReadHandle")
50            .field("blob_id", &self.blob_id)
51            .field("stream", &"<stream>")
52            .field("current_chunk_cursor", &self.current_chunk_cursor)
53            .field("position", &self.position)
54            .finish()
55    }
56}
57
58impl VMHostFunctions<'_> {
59    /// Creates a new blob for writing.
60    ///
61    /// This initializes a blob upload stream and returns a file descriptor (`fd`) that
62    /// can be used with `blob_write` and `blob_close`.
63    ///
64    /// # Returns
65    ///
66    /// A `u64` file descriptor for the new blob write handle.
67    ///
68    /// # Errors
69    ///
70    /// * `HostError::BlobsNotSupported` if the node client is not configured.
71    /// * `HostError::TooManyBlobHandles` if the maximum number of handles is exceeded.
72    /// * `HostError::IntegerOverflow` on `u64` overflow.
73    pub fn blob_create(&mut self) -> VMLogicResult<u64> {
74        if self.borrow_logic().node_client.is_none() {
75            return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
76        }
77
78        // The error should never happen as unlikely we have limits set with a value >= u32::MAX.
79        // Still, the check is essential as downcasting on 32-bit systems might lead to
80        // undefined behavior.
81        let Ok(limits_max_blob_handles) =
82            usize::try_from(self.borrow_logic().limits.max_blob_handles)
83        else {
84            return Err(VMLogicError::HostError(HostError::IntegerOverflow));
85        };
86
87        if self.borrow_logic().blob_handles.len() >= limits_max_blob_handles {
88            return Err(VMLogicError::HostError(HostError::TooManyBlobHandles {
89                max: self.borrow_logic().limits.max_blob_handles,
90            }));
91        }
92
93        let fd = self.with_logic_mut(|logic| -> VMLogicResult<u64> {
94            let Some(node_client) = logic.node_client.clone() else {
95                return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
96            };
97
98            let fd = logic.next_blob_fd;
99            logic.next_blob_fd = logic
100                .next_blob_fd
101                .checked_add(1)
102                .ok_or(VMLogicError::HostError(HostError::IntegerOverflow))?;
103
104            let (data_sender, data_receiver) = mpsc::unbounded_channel();
105
106            let completion_handle = tokio::spawn(async move {
107                let stream = UnboundedReceiverStream::new(data_receiver);
108
109                let byte_stream =
110                    stream.map(|data: Vec<u8>| Ok::<bytes::Bytes, Error>(data.into()));
111                let reader = byte_stream.into_async_read();
112
113                node_client.add_blob(reader, None, None).await
114            });
115
116            //TODO: add assert that no bytes were written during the creation of an empty blob.
117
118            let handle = BlobHandle::Write(BlobWriteHandle {
119                sender: data_sender,
120                completion_handle,
121            });
122
123            drop(logic.blob_handles.insert(fd, handle));
124            Ok(fd)
125        })?;
126
127        Ok(fd)
128    }
129
130    /// Writes a chunk of data to a blob.
131    ///
132    /// # Arguments
133    ///
134    /// * `fd` - The file descriptor obtained from `blob_create()` operation.
135    /// * `src_data_ptr` - A pointer to a source-buffer `sys::Buffer` in guest memory
136    /// containing the data chunk to write.
137    ///
138    /// # Returns
139    ///
140    /// The number of bytes written as `u64`, which is equal to the length of the input data buffer.
141    ///
142    /// # Errors
143    ///
144    /// * `HostError::BlobsNotSupported` if the node client is not configured.
145    /// * `HostError::InvalidBlobHandle` if the `fd` is invalid or not a write handle.
146    /// * `HostError::BlobWriteTooLarge` if the data chunk exceeds `max_blob_chunk_size`.
147    pub fn blob_write(&mut self, fd: u64, src_data_ptr: u64) -> VMLogicResult<u64> {
148        let data = unsafe { self.read_guest_memory_typed::<sys::Buffer<'_>>(src_data_ptr)? };
149        let data_len = data.len();
150
151        if self.borrow_logic().node_client.is_none() {
152            return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
153        }
154
155        // Validate chunk size
156        if data_len > self.borrow_logic().limits.max_blob_chunk_size {
157            return Err(VMLogicError::HostError(HostError::BlobWriteTooLarge {
158                size: data_len,
159                max: self.borrow_logic().limits.max_blob_chunk_size,
160            }));
161        }
162
163        let data = self.read_guest_memory_slice(&data).to_vec();
164
165        self.with_logic_mut(|logic| {
166            let handle = logic
167                .blob_handles
168                .get(&fd)
169                .ok_or(VMLogicError::HostError(HostError::InvalidBlobHandle))?;
170
171            match handle {
172                BlobHandle::Write(_) => Ok(()),
173                BlobHandle::Read(_) => Err(VMLogicError::HostError(HostError::InvalidBlobHandle)),
174            }
175        })?;
176
177        self.with_logic_mut(|logic| {
178            let handle = logic
179                .blob_handles
180                .get_mut(&fd)
181                .ok_or(VMLogicError::HostError(HostError::InvalidBlobHandle))?;
182            match handle {
183                BlobHandle::Write(w) => {
184                    w.sender
185                        .send(data.clone())
186                        .map_err(|_| VMLogicError::HostError(HostError::InvalidBlobHandle))?;
187                }
188                BlobHandle::Read(_) => {
189                    return Err(VMLogicError::HostError(HostError::InvalidBlobHandle))
190                }
191            }
192            Ok::<(), VMLogicError>(())
193        })?;
194
195        Ok(data_len)
196    }
197
198    /// Closes a blob handle and gets the resulting blob ID.
199    ///
200    /// For a write handle, this finalizes the upload and writes the resulting `BlobId`
201    /// into the guest's memory buffer. For a read handle, it simply closes it.
202    ///
203    /// # Arguments
204    ///
205    /// * `fd` - The file descriptor to close.
206    /// * `dest_blob_id_ptr` - A pointer to a 32-byte destination buffer `sys::BufferMut`
207    /// in guest memory where the final `BlobId` will be written (for write handles).
208    ///
209    /// # Returns
210    ///
211    /// Returns `1` on success.
212    ///
213    /// # Errors
214    ///
215    /// * `HostError::InvalidMemoryAccess` if the `blob_id_ptr` buffer is not 32 bytes
216    /// or if memory access fails for a descriptor buffer.
217    /// * `HostError::InvalidBlobHandle` if the `fd` is invalid.
218    /// * `HostError::BlobsNotSupported` if the node client is not supported or upload operation fails.
219    pub fn blob_close(&mut self, fd: u64, dest_blob_id_ptr: u64) -> VMLogicResult<u32> {
220        let guest_blob_id_ptr =
221            unsafe { self.read_guest_memory_typed::<sys::BufferMut<'_>>(dest_blob_id_ptr)? };
222
223        if guest_blob_id_ptr.len() != DIGEST_SIZE as u64 {
224            return Err(HostError::InvalidMemoryAccess.into());
225        }
226
227        if self.borrow_logic().node_client.is_none() {
228            return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
229        }
230
231        let handle = self.with_logic_mut(|logic| {
232            logic
233                .blob_handles
234                .remove(&fd)
235                .ok_or(VMLogicError::HostError(HostError::InvalidBlobHandle))
236        })?;
237
238        let guest_blob_id_out_buf: &mut [u8] = self.read_guest_memory_slice_mut(&guest_blob_id_ptr);
239
240        match handle {
241            BlobHandle::Write(write_handle) => {
242                let _ignored = write_handle.sender;
243
244                let (blob_id_, _size) = tokio::runtime::Handle::current()
245                    .block_on(write_handle.completion_handle)
246                    .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))?
247                    .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))?;
248
249                // Record the Blob ID into the guest memory buffer
250                guest_blob_id_out_buf.copy_from_slice(blob_id_.as_ref());
251            }
252            // Record the Blob ID into the guest memory buffer
253            BlobHandle::Read(read_handle) => {
254                guest_blob_id_out_buf.copy_from_slice(read_handle.blob_id.as_ref())
255            }
256        }
257
258        Ok(1)
259    }
260
261    /// Announces a blob to a specific context for network discovery.
262    ///
263    /// # Arguments
264    ///
265    /// * `src_blob_id_ptr` - pointer to a 32-byte source-buffer `sys::Buffer` in guest memory,
266    /// containing the 32-byte `BlobId`.
267    /// * `src_context_id_ptr` - pointer to a 32-byte source-buffer `sys::Buffer` in guest memory,
268    /// containing the 32-byte `ContextId`.
269    ///
270    /// # Returns
271    ///
272    /// Returns `1` on successful announcement.
273    ///
274    /// # Errors
275    ///
276    /// * `HostError::BlobsNotSupported` if blob functionality is disabled or a network
277    ///   error occurs.
278    /// * `HostError::InvalidMemoryAccess` if memory access fails for descriptor buffers.
279    pub fn blob_announce_to_context(
280        &mut self,
281        src_blob_id_ptr: u64,
282        src_context_id_ptr: u64,
283    ) -> VMLogicResult<u32> {
284        // Check if blob functionality is available
285        let node_client = match &self.borrow_logic().node_client {
286            Some(client) => client.clone(),
287            None => return Err(VMLogicError::HostError(HostError::BlobsNotSupported)),
288        };
289
290        let blob_id = unsafe { self.read_guest_memory_typed::<sys::Buffer<'_>>(src_blob_id_ptr)? };
291        let context_id =
292            unsafe { self.read_guest_memory_typed::<sys::Buffer<'_>>(src_context_id_ptr)? };
293
294        let blob_id = BlobId::from(*self.read_guest_memory_sized::<DIGEST_SIZE>(&blob_id)?);
295        let context_id =
296            ContextId::from(*self.read_guest_memory_sized::<DIGEST_SIZE>(&context_id)?);
297
298        // Get blob metadata to get size
299        let blob_info = tokio::runtime::Handle::current()
300            .block_on(node_client.get_blob_info(blob_id))
301            .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))?
302            .ok_or_else(|| VMLogicError::HostError(HostError::BlobsNotSupported))?;
303
304        // Announce blob to network
305        tokio::task::block_in_place(|| {
306            tokio::runtime::Handle::current().block_on(async {
307                node_client
308                    .announce_blob_to_network(&blob_id, &context_id, blob_info.size)
309                    .await
310                    .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))
311            })
312        })?;
313
314        Ok(1)
315    }
316
317    /// Opens an existing blob for reading.
318    ///
319    /// # Arguments
320    ///
321    /// * `src_blob_id_ptr` - pointer to a 32-byte source-buffer `sys::Buffer` in guest memory,
322    /// containing the 32-byte `BlobId`.
323    ///
324    /// # Returns
325    ///
326    /// A `u64` file descriptor for the new blob read handle.
327    ///
328    /// # Errors
329    ///
330    /// * `HostError::BlobsNotSupported` if the node client is not configured.
331    /// * `HostError::TooManyBlobHandles` if the maximum number of handles is exceeded.
332    /// * `HostError::InvalidMemoryAccess` if memory access fails for a descriptor buffer.
333    pub fn blob_open(&mut self, src_blob_id_ptr: u64) -> VMLogicResult<u64> {
334        let blob_id = unsafe { self.read_guest_memory_typed::<sys::Buffer<'_>>(src_blob_id_ptr)? };
335
336        if self.borrow_logic().node_client.is_none() {
337            return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
338        }
339
340        // The error should never happen as unlikely we have limits set with a value >= u32::MAX.
341        // Still, the check is essential as downcasting on 32-bit systems might lead to
342        // undefined behavior.
343        let Ok(limits_max_blob_handles) =
344            usize::try_from(self.borrow_logic().limits.max_blob_handles)
345        else {
346            return Err(VMLogicError::HostError(HostError::IntegerOverflow));
347        };
348
349        if self.borrow_logic().blob_handles.len() >= limits_max_blob_handles {
350            return Err(VMLogicError::HostError(HostError::TooManyBlobHandles {
351                max: self.borrow_logic().limits.max_blob_handles,
352            }));
353        }
354
355        let blob_id = BlobId::from(*self.read_guest_memory_sized::<DIGEST_SIZE>(&blob_id)?);
356
357        let fd = self.with_logic_mut(|logic| -> VMLogicResult<u64> {
358            let fd = logic.next_blob_fd;
359            logic.next_blob_fd = logic
360                .next_blob_fd
361                .checked_add(1)
362                .ok_or(VMLogicError::HostError(HostError::IntegerOverflow))?;
363
364            let handle = BlobHandle::Read(BlobReadHandle {
365                blob_id,
366                stream: None,
367                current_chunk_cursor: None,
368                position: 0,
369            });
370
371            // TODO: verify if we need to drop it here or just ignore the value:
372            // `let _ignored = logic.blob_handles.insert(fd, handle));`
373            drop(logic.blob_handles.insert(fd, handle));
374            Ok(fd)
375        })?;
376
377        Ok(fd)
378    }
379
380    /// Reads a chunk of data from an open blob.
381    ///
382    /// Data is read from the blob and copied into the provided guest memory buffer.
383    ///
384    /// # Arguments
385    ///
386    /// * `fd` - The file descriptor obtained from `blob_open`.
387    /// * `dest_data_ptr` - A pointer to a destination buffer `sys::BufferMut` in guest memory where
388    /// the read data will be stored
389    ///
390    /// # Returns
391    ///
392    /// The number of bytes actually read as `u64`. This can be less than the buffer size if the
393    /// end of the blob is reached.
394    ///
395    /// # Errors
396    ///
397    /// * `HostError::BlobsNotSupported` if blob functionality is unavailable.
398    /// * `HostError::InvalidBlobHandle` if the `fd` is invalid or not a read handle.
399    /// * `HostError::BlobBufferTooLarge` if the guest buffer exceeds `max_blob_chunk_size`.
400    /// * `HostError::InvalidMemoryAccess` if memory access fails for a descriptor buffer.
401    pub fn blob_read(&mut self, fd: u64, dest_data_ptr: u64) -> VMLogicResult<u64> {
402        let dest_data =
403            unsafe { self.read_guest_memory_typed::<sys::BufferMut<'_>>(dest_data_ptr)? };
404        let data_len = dest_data.len();
405
406        // Check if blob functionality is available
407        let node_client = match &self.borrow_logic().node_client {
408            Some(client) => client.clone(),
409            None => return Err(VMLogicError::HostError(HostError::BlobsNotSupported)),
410        };
411
412        // Validate buffer size
413        if data_len > self.borrow_logic().limits.max_blob_chunk_size {
414            return Err(VMLogicError::HostError(HostError::BlobBufferTooLarge {
415                size: data_len,
416                max: self.borrow_logic().limits.max_blob_chunk_size,
417            }));
418        }
419
420        if data_len == 0 {
421            return Ok(0);
422        }
423
424        // The error should never happen as we already validated the buffer size before.
425        // Still, the check is essential as downcasting on 32-bit systems might lead to
426        // undefined behavior.
427        let Ok(data_len) = usize::try_from(data_len) else {
428            return Err(VMLogicError::HostError(HostError::IntegerOverflow));
429        };
430        // Local output buffer.
431        let mut output_buffer = Vec::with_capacity(data_len);
432
433        let bytes_read = self.with_logic_mut(|logic| -> VMLogicResult<u64> {
434            let handle = logic
435                .blob_handles
436                .get_mut(&fd)
437                .ok_or(VMLogicError::HostError(HostError::InvalidBlobHandle))?;
438
439            let read_handle = match handle {
440                BlobHandle::Read(r) => r,
441                BlobHandle::Write(_) => {
442                    return Err(VMLogicError::HostError(HostError::InvalidBlobHandle))
443                }
444            };
445
446            // First, try to read from current chunk cursor if available
447            if let Some(cursor) = &mut read_handle.current_chunk_cursor {
448                let mut temp_buffer = vec![0_u8; data_len];
449                match cursor.read(&mut temp_buffer) {
450                    Ok(bytes_from_cursor) => {
451                        output_buffer.extend_from_slice(&temp_buffer[..bytes_from_cursor]);
452
453                        // If cursor is exhausted, remove it
454                        if bytes_from_cursor == 0
455                            || cursor.position() >= cursor.get_ref().len() as u64
456                        {
457                            read_handle.current_chunk_cursor = None;
458                        }
459
460                        // If we satisfied the request entirely from cursor, we're done
461                        if output_buffer.len() >= data_len {
462                            read_handle.position = read_handle
463                                .position
464                                .checked_add(output_buffer.len() as u64)
465                                .ok_or(VMLogicError::HostError(HostError::IntegerOverflow))?;
466
467                            return Ok(output_buffer.len() as u64);
468                        }
469                    }
470                    Err(_) => {
471                        // Cursor error, remove it
472                        read_handle.current_chunk_cursor = None;
473                    }
474                }
475            }
476
477            if read_handle.stream.is_none() {
478                let blob_stream = tokio::runtime::Handle::current()
479                    .block_on(node_client.get_blob(&read_handle.blob_id, None))
480                    .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))?;
481
482                if let Some(stream) = blob_stream {
483                    let mapped_stream = stream.map(|result| {
484                        result
485                            .map(|chunk| bytes::Bytes::copy_from_slice(&chunk))
486                            .map_err(|_| Error::other("blob read error"))
487                    });
488                    read_handle.stream = Some(Box::new(mapped_stream));
489                } else {
490                    read_handle.position = read_handle
491                        .position
492                        .checked_add(output_buffer.len() as u64)
493                        .ok_or(VMLogicError::HostError(HostError::IntegerOverflow))?;
494
495                    return Ok(output_buffer.len() as u64);
496                }
497            }
498
499            if let Some(stream) = &mut read_handle.stream {
500                tokio::runtime::Handle::current().block_on(async {
501                    while output_buffer.len() < data_len {
502                        match stream.next().await {
503                            Some(Ok(chunk)) => {
504                                let chunk_bytes = chunk.as_ref();
505
506                                let remaining_needed = data_len
507                                    .checked_sub(output_buffer.len())
508                                    .ok_or(VMLogicError::HostError(HostError::IntegerOverflow))?;
509
510                                if chunk_bytes.len() <= remaining_needed {
511                                    output_buffer.extend_from_slice(chunk_bytes);
512                                } else {
513                                    // Use part of chunk, save rest in cursor for next time
514                                    output_buffer
515                                        .extend_from_slice(&chunk_bytes[..remaining_needed]);
516
517                                    // Create cursor with remaining data
518                                    let remaining_data = chunk_bytes[remaining_needed..].to_vec();
519                                    read_handle.current_chunk_cursor =
520                                        Some(Cursor::new(remaining_data));
521
522                                    break;
523                                }
524                            }
525                            Some(Err(_)) | None => {
526                                break;
527                            }
528                        }
529                    }
530                    Ok::<(), VMLogicError>(())
531                })?;
532            }
533
534            read_handle.position = read_handle
535                .position
536                .checked_add(output_buffer.len() as u64)
537                .ok_or(VMLogicError::HostError(HostError::IntegerOverflow))?;
538            Ok(output_buffer.len() as u64)
539        })?;
540
541        if bytes_read > 0 {
542            // Copy data from the local output buffer to destination buffer located in guest
543            // memory.
544            self.read_guest_memory_slice_mut(&dest_data)
545                .copy_from_slice(&output_buffer);
546        }
547
548        Ok(bytes_read)
549    }
550}
551
#[cfg(test)]
mod tests {
    use super::*;

    use crate::logic::{
        tests::{setup_vm, SimpleMockStorage},
        Cow, VMContext, VMLimits, VMLogic,
    };
    use wasmer::{AsStoreMut, Store};

    /// `blob_create` must fail with `BlobsNotSupported` when the VM was set up
    /// without a node client.
    #[test]
    fn test_blob_create_without_client_returns_an_error() {
        let mut mock_storage = SimpleMockStorage::new();
        let vm_limits = VMLimits::default();
        let (mut logic, mut store) = setup_vm!(&mut mock_storage, &vm_limits, vec![]);
        let mut host_functions = logic.host_functions(store.as_store_mut());

        assert!(matches!(
            host_functions.blob_create().unwrap_err(),
            VMLogicError::HostError(HostError::BlobsNotSupported)
        ));
    }
}