calimero_runtime/
logic.rs

1#![allow(single_use_lifetimes, unused_lifetimes, reason = "False positive")]
2#![allow(clippy::mem_forget, reason = "Safe for now")]
3
4use core::num::NonZeroU64;
5use core::{fmt, slice};
6use std::borrow::Cow;
7use std::collections::{BTreeMap, HashMap};
8use std::io::{Cursor, Read};
9use std::mem::MaybeUninit;
10use std::time::{SystemTime, UNIX_EPOCH};
11use std::vec;
12
13use borsh::from_slice as from_borsh_slice;
14use calimero_node_primitives::client::NodeClient;
15use calimero_primitives::blobs::BlobId;
16use calimero_sys as sys;
17use futures_util::{StreamExt, TryStreamExt};
18use ouroboros::self_referencing;
19use rand::RngCore;
20use serde::Serialize;
21use tokio::sync::mpsc;
22use tokio_stream::wrappers::UnboundedReceiverStream;
23
24use crate::constraint::{Constrained, MaxU64};
25use crate::errors::{FunctionCallError, HostError, Location, PanicContext};
26use crate::store::Storage;
27use crate::Constraint;
28
29mod errors;
30mod imports;
31mod registers;
32
33pub use errors::VMLogicError;
34use registers::Registers;
35
36pub type VMLogicResult<T, E = VMLogicError> = Result<T, E>;
37
38#[derive(Debug)]
39#[non_exhaustive]
40pub struct VMContext<'a> {
41    pub input: Cow<'a, [u8]>,
42    pub context_id: [u8; 32],
43    pub executor_public_key: [u8; 32],
44}
45
46impl<'a> VMContext<'a> {
47    #[must_use]
48    pub const fn new(
49        input: Cow<'a, [u8]>,
50        context_id: [u8; 32],
51        executor_public_key: [u8; 32],
52    ) -> Self {
53        Self {
54            input,
55            context_id,
56            executor_public_key,
57        }
58    }
59}
60
61#[derive(Debug, Clone, Copy)]
62pub struct VMLimits {
63    pub max_memory_pages: u32,
64    pub max_stack_size: usize,
65    pub max_registers: u64,
66    // constrained to be less than u64::MAX
67    // because register_len returns u64::MAX if the register is not found
68    pub max_register_size: Constrained<u64, MaxU64<{ u64::MAX - 1 }>>,
69    pub max_registers_capacity: u64, // todo! must not be less than max_register_size
70    pub max_logs: u64,
71    pub max_log_size: u64,
72    pub max_events: u64,
73    pub max_event_kind_size: u64,
74    pub max_event_data_size: u64,
75    pub max_storage_key_size: NonZeroU64,
76    pub max_storage_value_size: NonZeroU64,
77    pub max_blob_handles: u64,
78    pub max_blob_chunk_size: u64,
79}
80
81impl Default for VMLimits {
82    fn default() -> Self {
83        #[inline(always)]
84        fn is_valid<T, E: fmt::Debug>(t: Result<T, E>) -> T {
85            t.expect("is valid")
86        }
87
88        VMLimits {
89            max_memory_pages: 1 << 10,                               // 1 KiB (64 KiB?)
90            max_stack_size: 200 << 10,                               // 200 KiB
91            max_registers: 100,                                      //
92            max_register_size: is_valid((100 << 20).validate()),     // 100 MiB
93            max_registers_capacity: 1 << 30,                         // 1 GiB
94            max_logs: 100,                                           //
95            max_log_size: 16 << 10,                                  // 16 KiB
96            max_events: 100,                                         //
97            max_event_kind_size: 100,                                //
98            max_event_data_size: 16 << 10,                           // 16 KiB
99            max_storage_key_size: is_valid((1 << 20).try_into()),    // 1 MiB
100            max_storage_value_size: is_valid((10 << 20).try_into()), // 10 MiB
101            max_blob_handles: 100,
102            max_blob_chunk_size: 10 << 20, // 10 MiB
103        }
104    }
105}
106
107enum BlobHandle {
108    Write(BlobWriteHandle),
109    Read(BlobReadHandle),
110}
111
112#[derive(Debug)]
113struct BlobWriteHandle {
114    sender: mpsc::UnboundedSender<Vec<u8>>,
115    completion_handle: tokio::task::JoinHandle<eyre::Result<(BlobId, u64)>>,
116}
117
118struct BlobReadHandle {
119    blob_id: BlobId,
120    // Stream state
121    stream:
122        Option<Box<dyn futures_util::Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>,
123    // Cursor for current storage chunk - automatic position tracking!
124    current_chunk_cursor: Option<Cursor<Vec<u8>>>,
125    position: u64,
126}
127
128#[expect(
129    missing_debug_implementations,
130    reason = "storage and node_client can't impl Debug"
131)]
132pub struct VMLogic<'a> {
133    storage: &'a mut dyn Storage,
134    memory: Option<wasmer::Memory>,
135    context: VMContext<'a>,
136    limits: &'a VMLimits,
137    registers: Registers,
138    returns: Option<VMLogicResult<Vec<u8>, Vec<u8>>>,
139    logs: Vec<String>,
140    events: Vec<Event>,
141    root_hash: Option<[u8; 32]>,
142    artifact: Vec<u8>,
143    proposals: BTreeMap<[u8; 32], Vec<u8>>,
144    approvals: Vec<[u8; 32]>,
145
146    // Blob functionality
147    node_client: Option<NodeClient>,
148    blob_handles: HashMap<u64, BlobHandle>,
149    next_blob_fd: u64,
150}
151
152impl<'a> VMLogic<'a> {
153    pub fn new(
154        storage: &'a mut dyn Storage,
155        context: VMContext<'a>,
156        limits: &'a VMLimits,
157        node_client: Option<NodeClient>,
158    ) -> Self {
159        VMLogic {
160            storage,
161            memory: None,
162            context,
163            limits,
164            registers: Registers::default(),
165            returns: None,
166            logs: vec![],
167            events: vec![],
168            root_hash: None,
169            artifact: vec![],
170            proposals: BTreeMap::new(),
171            approvals: vec![],
172
173            // Blob functionality
174            node_client,
175            blob_handles: HashMap::new(),
176            next_blob_fd: 1,
177        }
178    }
179
180    pub fn with_memory(&mut self, memory: wasmer::Memory) -> &mut Self {
181        self.memory = Some(memory);
182        self
183    }
184
185    pub fn host_functions(&'a mut self, store: wasmer::StoreMut<'a>) -> VMHostFunctions<'a> {
186        let memory = self.memory.clone().expect("VM Memory not initialized");
187
188        VMHostFunctionsBuilder {
189            logic: self,
190            store,
191
192            memory_builder: |store| memory.view(store),
193        }
194        .build()
195    }
196}
197
198#[derive(Debug, Serialize)]
199#[non_exhaustive]
200pub struct Outcome {
201    pub returns: VMLogicResult<Option<Vec<u8>>, FunctionCallError>,
202    pub logs: Vec<String>,
203    pub events: Vec<Event>,
204    pub root_hash: Option<[u8; 32]>,
205    pub artifact: Vec<u8>,
206    pub proposals: BTreeMap<[u8; 32], Vec<u8>>,
207    //list of ids for approved proposals
208    pub approvals: Vec<[u8; 32]>,
209    // execution runtime
210    // current storage usage of the app
211}
212
213#[derive(Debug, Serialize)]
214#[non_exhaustive]
215pub struct Event {
216    pub kind: String,
217    pub data: Vec<u8>,
218}
219
220impl VMLogic<'_> {
221    #[must_use]
222    pub fn finish(self, err: Option<FunctionCallError>) -> Outcome {
223        let returns = match err {
224            Some(err) => Err(err),
225            None => self
226                .returns
227                .map(|t| t.map_err(FunctionCallError::ExecutionError))
228                .transpose(),
229        };
230
231        Outcome {
232            returns,
233            logs: self.logs,
234            events: self.events,
235            root_hash: self.root_hash,
236            artifact: self.artifact,
237            proposals: self.proposals,
238            approvals: self.approvals,
239        }
240    }
241}
242
243#[self_referencing]
244pub struct VMHostFunctions<'a> {
245    logic: &'a mut VMLogic<'a>,
246    store: wasmer::StoreMut<'a>,
247
248    #[covariant]
249    #[borrows(store)]
250    memory: wasmer::MemoryView<'this>,
251}
252
253impl VMHostFunctions<'_> {
254    fn read_slice(&mut self, slice: &sys::Buffer<'_>) -> &mut [u8] {
255        let ptr = slice.ptr().value().as_usize();
256        let len = slice.len() as usize;
257
258        unsafe { &mut self.borrow_memory().data_unchecked_mut()[ptr..ptr + len] }
259    }
260
261    fn read_guest_memory_sized<const N: usize>(
262        &self,
263        ptr: u64,
264        len: u64,
265    ) -> VMLogicResult<[u8; N]> {
266        let len = usize::try_from(len).map_err(|_| HostError::IntegerOverflow)?;
267
268        if len != N {
269            return Err(HostError::InvalidMemoryAccess.into());
270        }
271
272        let mut buf = [0; N];
273
274        self.borrow_memory().read(ptr, &mut buf)?;
275
276        Ok(buf)
277    }
278
279    /// Reads a sized type from guest memory.
280    unsafe fn read_typed<T>(&self, ptr: u64) -> VMLogicResult<T> {
281        let mut value = MaybeUninit::<T>::uninit();
282
283        let raw = slice::from_raw_parts_mut(value.as_mut_ptr().cast::<u8>(), size_of::<T>());
284
285        self.borrow_memory().read(ptr, raw)?;
286
287        Ok(value.assume_init())
288    }
289}
290
291impl VMHostFunctions<'_> {
292    pub fn panic(&mut self, location_ptr: u64) -> VMLogicResult<()> {
293        let location = unsafe { self.read_typed::<sys::Location<'_>>(location_ptr)? };
294
295        let file = String::from_utf8(self.read_slice(&location.file()).to_vec())
296            .map_err(|_| HostError::BadUTF8)?;
297        let line = location.line();
298        let column = location.column();
299
300        Err(HostError::Panic {
301            context: PanicContext::Guest,
302            message: "explicit panic".to_owned(),
303            location: Location::At { file, line, column },
304        }
305        .into())
306    }
307
308    pub fn panic_utf8(&mut self, msg_ptr: u64, location_ptr: u64) -> VMLogicResult<()> {
309        let message_buf = unsafe { self.read_typed::<sys::Buffer<'_>>(msg_ptr)? };
310        let location = unsafe { self.read_typed::<sys::Location<'_>>(location_ptr)? };
311
312        let message = String::from_utf8(self.read_slice(&message_buf).to_vec())
313            .map_err(|_| HostError::BadUTF8)?;
314        let file = String::from_utf8(self.read_slice(&location.file()).to_vec())
315            .map_err(|_| HostError::BadUTF8)?;
316        let line = location.line();
317        let column = location.column();
318
319        Err(HostError::Panic {
320            context: PanicContext::Guest,
321            message,
322            location: Location::At { file, line, column },
323        }
324        .into())
325    }
326
327    pub fn register_len(&self, register_id: u64) -> VMLogicResult<u64> {
328        Ok(self
329            .borrow_logic()
330            .registers
331            .get_len(register_id)
332            .unwrap_or(u64::MAX))
333    }
334
335    pub fn read_register(&self, register_id: u64, register_ptr: u64) -> VMLogicResult<u32> {
336        let register = unsafe { self.read_typed::<sys::BufferMut<'_>>(register_ptr)? };
337
338        let data = self.borrow_logic().registers.get(register_id)?;
339        if data.len() != usize::try_from(register.len()).map_err(|_| HostError::IntegerOverflow)? {
340            return Ok(0);
341        }
342        self.borrow_memory()
343            .write(register.ptr().value().as_usize() as u64, data)?;
344        Ok(1)
345    }
346
347    pub fn context_id(&mut self, register_id: u64) -> VMLogicResult<()> {
348        self.with_logic_mut(|logic| {
349            logic
350                .registers
351                .set(logic.limits, register_id, logic.context.context_id)
352        })
353    }
354
355    pub fn executor_id(&mut self, register_id: u64) -> VMLogicResult<()> {
356        self.with_logic_mut(|logic| {
357            logic
358                .registers
359                .set(logic.limits, register_id, logic.context.executor_public_key)
360        })
361    }
362
363    pub fn input(&mut self, register_id: u64) -> VMLogicResult<()> {
364        self.with_logic_mut(|logic| {
365            logic
366                .registers
367                .set(logic.limits, register_id, &*logic.context.input)
368        })?;
369
370        Ok(())
371    }
372
373    pub fn value_return(&mut self, value_ptr: u64) -> VMLogicResult<()> {
374        let result = unsafe { self.read_typed::<sys::ValueReturn<'_>>(value_ptr)? };
375
376        let result = match result {
377            sys::ValueReturn::Ok(value) => Ok(self.read_slice(&value).to_vec()),
378            sys::ValueReturn::Err(value) => Err(self.read_slice(&value).to_vec()),
379        };
380
381        self.with_logic_mut(|logic| logic.returns = Some(result));
382
383        Ok(())
384    }
385
386    pub fn log_utf8(&mut self, log_ptr: u64) -> VMLogicResult<()> {
387        let buf = unsafe { self.read_typed::<sys::Buffer<'_>>(log_ptr)? };
388
389        let message =
390            String::from_utf8(self.read_slice(&buf).to_vec()).map_err(|_| HostError::BadUTF8)?;
391
392        let logic = self.borrow_logic();
393
394        if logic.logs.len()
395            >= usize::try_from(logic.limits.max_logs).map_err(|_| HostError::IntegerOverflow)?
396        {
397            return Err(HostError::LogsOverflow.into());
398        }
399
400        self.with_logic_mut(|logic| logic.logs.push(message));
401
402        Ok(())
403    }
404
405    pub fn emit(&mut self, event_ptr: u64) -> VMLogicResult<()> {
406        let event = unsafe { self.read_typed::<sys::Event<'_>>(event_ptr)? };
407
408        let kind_len = event.kind().len();
409        let data_len = event.data().len();
410
411        let logic = self.borrow_logic();
412
413        if kind_len > logic.limits.max_event_kind_size {
414            return Err(HostError::EventKindSizeOverflow.into());
415        }
416
417        if data_len > logic.limits.max_event_data_size {
418            return Err(HostError::EventDataSizeOverflow.into());
419        }
420
421        if logic.events.len()
422            >= usize::try_from(logic.limits.max_events).map_err(|_| HostError::IntegerOverflow)?
423        {
424            return Err(HostError::EventsOverflow.into());
425        }
426
427        let kind = String::from_utf8(self.read_slice(event.kind()).to_vec())
428            .map_err(|_| HostError::BadUTF8)?;
429        let data = self.read_slice(event.data()).to_vec();
430
431        self.with_logic_mut(|logic| logic.events.push(Event { kind, data }));
432
433        Ok(())
434    }
435
436    pub fn commit(&mut self, root_hash_ptr: u64, artifact_ptr: u64) -> VMLogicResult<()> {
437        let root_hash = unsafe { self.read_typed::<sys::Buffer<'_>>(root_hash_ptr)? };
438        let artifact = unsafe { self.read_typed::<sys::Buffer<'_>>(artifact_ptr)? };
439
440        let root_hash = self
441            .read_slice(&root_hash)
442            .try_into()
443            .map_err(|_| HostError::InvalidMemoryAccess)?;
444        let artifact = self.read_slice(&artifact).to_vec();
445
446        self.with_logic_mut(|logic| {
447            if logic.root_hash.is_some() {
448                return Err(HostError::InvalidMemoryAccess);
449            }
450
451            logic.root_hash = Some(root_hash);
452            logic.artifact = artifact;
453
454            Ok(())
455        })?;
456
457        Ok(())
458    }
459
460    pub fn storage_read(&mut self, key_ptr: u64, register_id: u64) -> VMLogicResult<u32> {
461        let key = unsafe { self.read_typed::<sys::Buffer<'_>>(key_ptr)? };
462        let key_len = key.len();
463
464        let key = self
465            .read_slice(&key)
466            .try_into()
467            .map_err(|_| HostError::InvalidMemoryAccess)?;
468
469        let logic = self.borrow_logic();
470
471        if key_len > logic.limits.max_storage_key_size.get() {
472            return Err(HostError::KeyLengthOverflow.into());
473        }
474
475        if let Some(value) = logic.storage.get(&key) {
476            self.with_logic_mut(|logic| logic.registers.set(logic.limits, register_id, value))?;
477
478            return Ok(1);
479        }
480
481        Ok(0)
482    }
483
484    pub fn storage_remove(&mut self, key_ptr: u64, register_id: u64) -> VMLogicResult<u32> {
485        let key = unsafe { self.read_typed::<sys::Buffer<'_>>(key_ptr)? };
486        let key_len = key.len();
487
488        let key = self
489            .read_slice(&key)
490            .try_into()
491            .map_err(|_| HostError::InvalidMemoryAccess)?;
492
493        let logic = self.borrow_logic();
494
495        if key_len > logic.limits.max_storage_key_size.get() {
496            return Err(HostError::KeyLengthOverflow.into());
497        }
498
499        if let Some(value) = logic.storage.get(&key) {
500            self.with_logic_mut(|logic| {
501                drop(logic.storage.remove(&key));
502                logic.registers.set(logic.limits, register_id, value)
503            })?;
504
505            return Ok(1);
506        }
507
508        Ok(0)
509    }
510
511    pub fn storage_write(
512        &mut self,
513        key_ptr: u64,
514        value_ptr: u64,
515        register_id: u64,
516    ) -> VMLogicResult<u32> {
517        let key = unsafe { self.read_typed::<sys::Buffer<'_>>(key_ptr)? };
518        let key_len = key.len();
519
520        let value = unsafe { self.read_typed::<sys::Buffer<'_>>(value_ptr)? };
521        let value_len = value.len();
522
523        let logic = self.borrow_logic();
524
525        if key_len > logic.limits.max_storage_key_size.get() {
526            return Err(HostError::KeyLengthOverflow.into());
527        }
528
529        if value_len > logic.limits.max_storage_value_size.get() {
530            return Err(HostError::ValueLengthOverflow.into());
531        }
532
533        let key = self
534            .read_slice(&key)
535            .try_into()
536            .map_err(|_| HostError::InvalidMemoryAccess)?;
537        let value = self
538            .read_slice(&value)
539            .try_into()
540            .map_err(|_| HostError::InvalidMemoryAccess)?;
541
542        let evicted = self.with_logic_mut(|logic| logic.storage.set(key, value));
543
544        if let Some(evicted) = evicted {
545            self.with_logic_mut(|logic| logic.registers.set(logic.limits, register_id, evicted))?;
546
547            return Ok(1);
548        };
549
550        Ok(0)
551    }
552
553    #[expect(clippy::too_many_arguments, reason = "Acceptable here")]
554    pub fn fetch(
555        &mut self,
556        url_ptr: u64,
557        method_ptr: u64,
558        headers_ptr: u64,
559        body_ptr: u64,
560        register_id: u64,
561    ) -> VMLogicResult<u32> {
562        let url = unsafe { self.read_typed::<sys::Buffer<'_>>(url_ptr)? };
563        let method = unsafe { self.read_typed::<sys::Buffer<'_>>(method_ptr)? };
564        let headers = unsafe { self.read_typed::<sys::Buffer<'_>>(headers_ptr)? };
565        let body = unsafe { self.read_typed::<sys::Buffer<'_>>(body_ptr)? };
566
567        let url =
568            String::from_utf8(self.read_slice(&url).to_vec()).map_err(|_| HostError::BadUTF8)?;
569        let method =
570            String::from_utf8(self.read_slice(&method).to_vec()).map_err(|_| HostError::BadUTF8)?;
571
572        let headers = self.read_slice(&headers).to_vec();
573
574        // Note: The `fetch` function cannot be directly called by applications.
575        // Therefore, the headers are generated exclusively by our code, ensuring
576        // that it is safe to deserialize them.
577        let headers: Vec<(String, String)> =
578            from_borsh_slice(&headers).map_err(|_| HostError::DeserializationError)?;
579
580        let body = self.read_slice(&body).to_vec();
581        let mut request = ureq::request(&method, &url);
582
583        for (key, value) in &headers {
584            request = request.set(key, value);
585        }
586
587        let response = if body.is_empty() {
588            request.call()
589        } else {
590            request.send_bytes(&body)
591        };
592
593        let (status, data) = match response {
594            Ok(response) => {
595                let mut buffer = vec![];
596                match response.into_reader().read_to_end(&mut buffer) {
597                    Ok(_) => (0, buffer),
598                    Err(_) => (1, "Failed to read the response body.".into()),
599                }
600            }
601            Err(e) => (1, e.to_string().into_bytes()),
602        };
603
604        self.with_logic_mut(|logic| logic.registers.set(logic.limits, register_id, data))?;
605        Ok(status)
606    }
607
608    pub fn random_bytes(&mut self, ptr: u64) -> VMLogicResult<()> {
609        let byte_slice = unsafe { self.read_typed::<sys::BufferMut<'_>>(ptr)? };
610        let mut buf = self.read_slice(&byte_slice).to_vec();
611
612        rand::thread_rng().fill_bytes(&mut buf);
613        self.borrow_memory().write(ptr, &buf)?;
614
615        Ok(())
616    }
617
618    /// Gets the current time.
619    ///
620    /// This function obtains the current time as a nanosecond timestamp, as
621    /// [`SystemTime`] is not available inside the guest runtime. Therefore the
622    /// guest needs to request this from the host.
623    ///
624    #[expect(
625        clippy::cast_possible_truncation,
626        reason = "Impossible to overflow in normal circumstances"
627    )]
628    #[expect(
629        clippy::expect_used,
630        clippy::unwrap_in_result,
631        reason = "Effectively infallible here"
632    )]
633    pub fn time_now(&mut self, ptr: u64) -> VMLogicResult<()> {
634        let time = unsafe { self.read_typed::<sys::BufferMut<'_>>(ptr)? };
635
636        if time.len() != 8 {
637            return Err(HostError::InvalidMemoryAccess.into());
638        }
639
640        let now = SystemTime::now()
641            .duration_since(UNIX_EPOCH)
642            .expect("Time went backwards to before the Unix epoch!")
643            .as_nanos() as u64;
644
645        self.borrow_memory()
646            .write(time.ptr().value().as_usize() as u64, &now.to_le_bytes())?;
647
648        Ok(())
649    }
650
651    /// Call the contract's `send_proposal()` function through the bridge.
652    ///
653    /// The proposal actions are obtained as raw data and pushed onto a list of
654    /// proposals to be sent to the host.
655    ///
656    /// Note that multiple actions are received, and the entire batch is pushed
657    /// onto the proposal list to represent one proposal.
658    ///
659    /// # Parameters
660    ///
661    /// * `actions_ptr` - Pointer to the start of the action data in WASM
662    ///                   memory.
663    /// * `actions_len` - Length of the action data.
664    /// * `id_ptr`      - Pointer to the start of the id data in WASM memory.
665    /// * `id_len`      - Length of the action data. This should be 32 bytes.
666    ///
667    pub fn send_proposal(&mut self, actions_ptr: u64, id_ptr: u64) -> VMLogicResult<()> {
668        let actions = unsafe { self.read_typed::<sys::Buffer<'_>>(actions_ptr)? };
669        let id = unsafe { self.read_typed::<sys::BufferMut<'_>>(id_ptr)? };
670
671        if id.len() != 32 {
672            return Err(HostError::InvalidMemoryAccess.into());
673        }
674
675        let actions_bytes = self.read_slice(&actions).to_vec();
676        let mut proposal_id = [0; 32];
677
678        rand::thread_rng().fill_bytes(&mut proposal_id);
679        drop(self.with_logic_mut(|logic| logic.proposals.insert(proposal_id, actions_bytes)));
680
681        self.borrow_memory()
682            .write(id.ptr().value().as_usize() as u64, &proposal_id)?;
683
684        Ok(())
685    }
686
687    pub fn approve_proposal(&mut self, approval_ptr: u64) -> VMLogicResult<()> {
688        let approval = unsafe { self.read_typed::<sys::Buffer<'_>>(approval_ptr)? };
689        let approval_len = approval.len();
690
691        if approval_len != 32 {
692            return Err(HostError::InvalidMemoryAccess.into());
693        }
694        let approval = self.read_guest_memory_sized::<32>(approval_ptr, approval_len)?;
695        let _ = self.with_logic_mut(|logic| logic.approvals.push(approval));
696
697        Ok(())
698    }
699
700    // ========== BLOB FUNCTIONS ==========
701
702    /// Create a new blob for writing
703    /// Returns: file descriptor (u64) for writing operations
704    pub fn blob_create(&mut self) -> VMLogicResult<u64> {
705        if self.borrow_logic().node_client.is_none() {
706            return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
707        }
708
709        if self.borrow_logic().blob_handles.len()
710            >= self.borrow_logic().limits.max_blob_handles as usize
711        {
712            return Err(VMLogicError::HostError(HostError::TooManyBlobHandles {
713                max: self.borrow_logic().limits.max_blob_handles,
714            }));
715        }
716
717        let fd = self.with_logic_mut(|logic| -> VMLogicResult<u64> {
718            let Some(node_client) = logic.node_client.clone() else {
719                return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
720            };
721
722            let fd = logic.next_blob_fd;
723            logic.next_blob_fd += 1;
724
725            let (data_sender, data_receiver) = mpsc::unbounded_channel();
726
727            let completion_handle = tokio::spawn(async move {
728                let stream = UnboundedReceiverStream::new(data_receiver);
729
730                let byte_stream =
731                    stream.map(|data: Vec<u8>| Ok::<bytes::Bytes, std::io::Error>(data.into()));
732                let reader = byte_stream.into_async_read();
733
734                node_client.add_blob(reader, None, None).await
735            });
736
737            let handle = BlobHandle::Write(BlobWriteHandle {
738                sender: data_sender,
739                completion_handle,
740            });
741
742            drop(logic.blob_handles.insert(fd, handle));
743            Ok(fd)
744        })?;
745
746        Ok(fd)
747    }
748
749    /// Write a chunk of data to a blob
750    /// Returns: number of bytes written (u64)
751    pub fn blob_write(&mut self, fd: u64, data_ptr: u64) -> VMLogicResult<u64> {
752        let data = unsafe { self.read_typed::<sys::Buffer<'_>>(data_ptr)? };
753        let data_len = data.len();
754
755        if self.borrow_logic().node_client.is_none() {
756            return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
757        }
758
759        // Validate chunk size
760        if data_len > self.borrow_logic().limits.max_blob_chunk_size {
761            return Err(VMLogicError::HostError(HostError::BlobWriteTooLarge {
762                size: data_len,
763                max: self.borrow_logic().limits.max_blob_chunk_size,
764            }));
765        }
766
767        let data = self.read_slice(&data).to_vec();
768
769        self.with_logic_mut(|logic| {
770            let handle = logic
771                .blob_handles
772                .get(&fd)
773                .ok_or(VMLogicError::HostError(HostError::InvalidBlobHandle))?;
774
775            match handle {
776                BlobHandle::Write(_) => Ok(()),
777                BlobHandle::Read(_) => Err(VMLogicError::HostError(HostError::InvalidBlobHandle)),
778            }
779        })?;
780
781        self.with_logic_mut(|logic| {
782            let handle = logic
783                .blob_handles
784                .get_mut(&fd)
785                .ok_or(VMLogicError::HostError(HostError::InvalidBlobHandle))?;
786            match handle {
787                BlobHandle::Write(w) => {
788                    w.sender
789                        .send(data.clone())
790                        .map_err(|_| VMLogicError::HostError(HostError::InvalidBlobHandle))?;
791                }
792                _ => return Err(VMLogicError::HostError(HostError::InvalidBlobHandle)),
793            }
794            Ok::<(), VMLogicError>(())
795        })?;
796
797        Ok(data_len)
798    }
799
800    /// Close a blob handle and get the resulting blob ID
801    /// Returns: 1 on success
802    pub fn blob_close(&mut self, fd: u64, blob_id_ptr: u64) -> VMLogicResult<u32> {
803        let blob_id = unsafe { self.read_typed::<sys::BufferMut<'_>>(blob_id_ptr)? };
804        let blob_id_ptr = blob_id.ptr().value().as_usize() as u64;
805
806        if self.borrow_logic().node_client.is_none() {
807            return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
808        }
809
810        if blob_id.len() != 32 {
811            return Err(HostError::InvalidMemoryAccess.into());
812        }
813
814        let handle = self.with_logic_mut(|logic| {
815            logic
816                .blob_handles
817                .remove(&fd)
818                .ok_or(VMLogicError::HostError(HostError::InvalidBlobHandle))
819        })?;
820
821        match handle {
822            BlobHandle::Write(write_handle) => {
823                drop(write_handle.sender);
824
825                let (blob_id, _size) = tokio::task::block_in_place(|| {
826                    tokio::runtime::Handle::current().block_on(write_handle.completion_handle)
827                })
828                .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))?
829                .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))?;
830
831                self.borrow_memory().write(blob_id_ptr, blob_id.as_ref())?;
832            }
833            BlobHandle::Read(read_handle) => {
834                self.borrow_memory()
835                    .write(blob_id_ptr, read_handle.blob_id.as_ref())?;
836            }
837        }
838
839        Ok(1)
840    }
841
842    /// Announce a blob to a specific context for network discovery
843    pub fn blob_announce_to_context(
844        &mut self,
845        blob_id_ptr: u64,
846        context_id_ptr: u64,
847    ) -> VMLogicResult<u32> {
848        // Check if blob functionality is available
849        let node_client = match &self.borrow_logic().node_client {
850            Some(client) => client.clone(),
851            None => return Err(VMLogicError::HostError(HostError::BlobsNotSupported)),
852        };
853
854        let blob_id = unsafe { self.read_typed::<sys::Buffer<'_>>(blob_id_ptr)? };
855        let context_id = unsafe { self.read_typed::<sys::Buffer<'_>>(context_id_ptr)? };
856
857        let blob_id_len = blob_id.ptr().value().as_usize() as u64;
858        let context_id_len = context_id.ptr().value().as_usize() as u64;
859
860        // Validate input lengths
861        if blob_id_len != 32 || context_id_len != 32 {
862            return Err(HostError::InvalidMemoryAccess.into());
863        }
864
865        // Read blob_id and context_id from memory
866        let blob_id_bytes = self.read_guest_memory_sized::<32>(blob_id_ptr, blob_id_len)?;
867        let context_id_bytes =
868            self.read_guest_memory_sized::<32>(context_id_ptr, context_id_len)?;
869
870        let blob_id = BlobId::from(blob_id_bytes);
871        let context_id = calimero_primitives::context::ContextId::from(context_id_bytes);
872
873        // Get blob metadata to get size
874        let blob_info = tokio::task::block_in_place(|| {
875            tokio::runtime::Handle::current().block_on(async {
876                node_client
877                    .get_blob_info(blob_id)
878                    .await
879                    .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))
880            })
881        })?;
882
883        let blob_info =
884            blob_info.ok_or_else(|| VMLogicError::HostError(HostError::BlobsNotSupported))?;
885
886        // Announce blob to network
887        tokio::task::block_in_place(|| {
888            tokio::runtime::Handle::current().block_on(async {
889                node_client
890                    .announce_blob_to_network(&blob_id, &context_id, blob_info.size)
891                    .await
892                    .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))
893            })
894        })?;
895
896        Ok(1)
897    }
898
899    /// Open a blob for reading
900    /// Returns: file descriptor (u64) for reading operations  
901    pub fn blob_open(&mut self, blob_id_ptr: u64) -> VMLogicResult<u64> {
902        let blob_id = unsafe { self.read_typed::<sys::Buffer<'_>>(blob_id_ptr)? };
903        let blob_id_len = blob_id.len();
904
905        if self.borrow_logic().node_client.is_none() {
906            return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
907        }
908
909        if self.borrow_logic().blob_handles.len()
910            >= self.borrow_logic().limits.max_blob_handles as usize
911        {
912            return Err(VMLogicError::HostError(HostError::TooManyBlobHandles {
913                max: self.borrow_logic().limits.max_blob_handles,
914            }));
915        }
916
917        if blob_id_len != 32 {
918            return Err(HostError::InvalidMemoryAccess.into());
919        }
920
921        let blob_id_bytes = self
922            .read_guest_memory_sized::<32>(blob_id.ptr().value().as_usize() as u64, blob_id_len)?;
923        let blob_id = BlobId::from(blob_id_bytes);
924
925        let fd = self.with_logic_mut(|logic| {
926            let fd = logic.next_blob_fd;
927            logic.next_blob_fd += 1;
928
929            let handle = BlobHandle::Read(BlobReadHandle {
930                blob_id,
931                stream: None,
932                current_chunk_cursor: None,
933                position: 0,
934            });
935            drop(logic.blob_handles.insert(fd, handle));
936            fd
937        });
938
939        Ok(fd)
940    }
941
942    /// Read a chunk of data from a blob
943    /// Returns: number of bytes read (u64)  
944    pub fn blob_read(&mut self, fd: u64, data_ptr: u64) -> VMLogicResult<u64> {
945        let data = unsafe { self.read_typed::<sys::BufferMut<'_>>(data_ptr)? };
946        let data_len = data.len();
947
948        // Check if blob functionality is available
949        let node_client = match &self.borrow_logic().node_client {
950            Some(client) => client.clone(),
951            None => return Err(VMLogicError::HostError(HostError::BlobsNotSupported)),
952        };
953
954        // Validate buffer size
955        if data_len > self.borrow_logic().limits.max_blob_chunk_size {
956            return Err(VMLogicError::HostError(HostError::BlobBufferTooLarge {
957                size: data_len,
958                max: self.borrow_logic().limits.max_blob_chunk_size,
959            }));
960        }
961
962        if data_len == 0 {
963            return Ok(0);
964        }
965
966        let mut output_buffer = Vec::with_capacity(data_len as usize);
967
968        let bytes_read = self.with_logic_mut(|logic| -> VMLogicResult<u64> {
969            let handle = logic
970                .blob_handles
971                .get_mut(&fd)
972                .ok_or(VMLogicError::HostError(HostError::InvalidBlobHandle))?;
973
974            let read_handle = match handle {
975                BlobHandle::Read(r) => r,
976                BlobHandle::Write(_) => {
977                    return Err(VMLogicError::HostError(HostError::InvalidBlobHandle))
978                }
979            };
980
981            let needed = data_len as usize;
982
983            // First, try to read from current chunk cursor if available
984            if let Some(cursor) = &mut read_handle.current_chunk_cursor {
985                let mut temp_buffer = vec![0u8; needed];
986                match cursor.read(&mut temp_buffer) {
987                    Ok(bytes_from_cursor) => {
988                        output_buffer.extend_from_slice(&temp_buffer[..bytes_from_cursor]);
989
990                        // If cursor is exhausted, remove it
991                        if bytes_from_cursor == 0
992                            || cursor.position() >= cursor.get_ref().len() as u64
993                        {
994                            read_handle.current_chunk_cursor = None;
995                        }
996
997                        // If we satisfied the request entirely from cursor, we're done
998                        if output_buffer.len() >= needed {
999                            read_handle.position += output_buffer.len() as u64;
1000                            return Ok(output_buffer.len() as u64);
1001                        }
1002                    }
1003                    Err(_) => {
1004                        // Cursor error, remove it
1005                        read_handle.current_chunk_cursor = None;
1006                    }
1007                }
1008            }
1009
1010            if read_handle.stream.is_none() {
1011                let blob_stream = tokio::task::block_in_place(|| {
1012                    tokio::runtime::Handle::current().block_on(async {
1013                        node_client
1014                            .get_blob(&read_handle.blob_id, None)
1015                            .await
1016                            .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))
1017                    })
1018                })?;
1019
1020                match blob_stream {
1021                    Some(stream) => {
1022                        let mapped_stream = stream.map(|result| match result {
1023                            Ok(chunk) => Ok(bytes::Bytes::copy_from_slice(&chunk)),
1024                            Err(_) => Err(std::io::Error::new(
1025                                std::io::ErrorKind::Other,
1026                                "blob read error",
1027                            )),
1028                        });
1029                        read_handle.stream = Some(Box::new(mapped_stream));
1030                    }
1031                    None => {
1032                        read_handle.position += output_buffer.len() as u64;
1033                        return Ok(output_buffer.len() as u64);
1034                    }
1035                }
1036            }
1037
1038            if let Some(stream) = &mut read_handle.stream {
1039                tokio::task::block_in_place(|| {
1040                    tokio::runtime::Handle::current().block_on(async {
1041                        while output_buffer.len() < needed {
1042                            match stream.next().await {
1043                                Some(Ok(chunk)) => {
1044                                    let chunk_bytes = chunk.as_ref();
1045                                    let remaining_needed = needed - output_buffer.len();
1046
1047                                    if chunk_bytes.len() <= remaining_needed {
1048                                        output_buffer.extend_from_slice(chunk_bytes);
1049                                    } else {
1050                                        // Use part of chunk, save rest in cursor for next time
1051                                        output_buffer
1052                                            .extend_from_slice(&chunk_bytes[..remaining_needed]);
1053
1054                                        // Create cursor with remaining data
1055                                        let remaining_data =
1056                                            chunk_bytes[remaining_needed..].to_vec();
1057                                        read_handle.current_chunk_cursor =
1058                                            Some(Cursor::new(remaining_data));
1059                                        break;
1060                                    }
1061                                }
1062                                Some(Err(_)) | None => {
1063                                    break;
1064                                }
1065                            }
1066                        }
1067                        Ok::<(), VMLogicError>(())
1068                    })
1069                })?;
1070            }
1071
1072            read_handle.position += output_buffer.len() as u64;
1073            Ok(output_buffer.len() as u64)
1074        })?;
1075
1076        if bytes_read > 0 {
1077            self.borrow_memory()
1078                .write(data.ptr().value().as_usize() as u64, &output_buffer)?;
1079        }
1080
1081        Ok(bytes_read)
1082    }
1083}