1#![allow(single_use_lifetimes, unused_lifetimes, reason = "False positive")]
2#![allow(clippy::mem_forget, reason = "Safe for now")]
3
4use core::num::NonZeroU64;
5use core::{fmt, slice};
6use std::borrow::Cow;
7use std::collections::{BTreeMap, HashMap};
8use std::io::{Cursor, Read};
9use std::mem::MaybeUninit;
10use std::time::{SystemTime, UNIX_EPOCH};
11use std::vec;
12
13use borsh::from_slice as from_borsh_slice;
14use calimero_node_primitives::client::NodeClient;
15use calimero_primitives::blobs::BlobId;
16use calimero_sys as sys;
17use futures_util::{StreamExt, TryStreamExt};
18use ouroboros::self_referencing;
19use rand::RngCore;
20use serde::Serialize;
21use tokio::sync::mpsc;
22use tokio_stream::wrappers::UnboundedReceiverStream;
23
24use crate::constraint::{Constrained, MaxU64};
25use crate::errors::{FunctionCallError, HostError, Location, PanicContext};
26use crate::store::Storage;
27use crate::Constraint;
28
29mod errors;
30mod imports;
31mod registers;
32
33pub use errors::VMLogicError;
34use registers::Registers;
35
36pub type VMLogicResult<T, E = VMLogicError> = Result<T, E>;
37
/// Per-call execution context handed to the VM.
#[derive(Debug)]
#[non_exhaustive]
pub struct VMContext<'a> {
    /// Raw input bytes; exposed to the guest through the `input` host function.
    pub input: Cow<'a, [u8]>,
    /// 32-byte context identifier; exposed through the `context_id` host function.
    pub context_id: [u8; 32],
    /// 32-byte executor public key; exposed through the `executor_id` host function.
    pub executor_public_key: [u8; 32],
}

impl<'a> VMContext<'a> {
    /// Builds a context from its three components without any validation.
    #[must_use]
    pub const fn new(
        input: Cow<'a, [u8]>,
        context_id: [u8; 32],
        executor_public_key: [u8; 32],
    ) -> Self {
        VMContext {
            input,
            context_id,
            executor_public_key,
        }
    }
}
60
61#[derive(Debug, Clone, Copy)]
62pub struct VMLimits {
63 pub max_memory_pages: u32,
64 pub max_stack_size: usize,
65 pub max_registers: u64,
66 pub max_register_size: Constrained<u64, MaxU64<{ u64::MAX - 1 }>>,
69 pub max_registers_capacity: u64, pub max_logs: u64,
71 pub max_log_size: u64,
72 pub max_events: u64,
73 pub max_event_kind_size: u64,
74 pub max_event_data_size: u64,
75 pub max_storage_key_size: NonZeroU64,
76 pub max_storage_value_size: NonZeroU64,
77 pub max_blob_handles: u64,
78 pub max_blob_chunk_size: u64,
79}
80
81impl Default for VMLimits {
82 fn default() -> Self {
83 #[inline(always)]
84 fn is_valid<T, E: fmt::Debug>(t: Result<T, E>) -> T {
85 t.expect("is valid")
86 }
87
88 VMLimits {
89 max_memory_pages: 1 << 10, max_stack_size: 200 << 10, max_registers: 100, max_register_size: is_valid((100 << 20).validate()), max_registers_capacity: 1 << 30, max_logs: 100, max_log_size: 16 << 10, max_events: 100, max_event_kind_size: 100, max_event_data_size: 16 << 10, max_storage_key_size: is_valid((1 << 20).try_into()), max_storage_value_size: is_valid((10 << 20).try_into()), max_blob_handles: 100,
102 max_blob_chunk_size: 10 << 20, }
104 }
105}
106
107enum BlobHandle {
108 Write(BlobWriteHandle),
109 Read(BlobReadHandle),
110}
111
112#[derive(Debug)]
113struct BlobWriteHandle {
114 sender: mpsc::UnboundedSender<Vec<u8>>,
115 completion_handle: tokio::task::JoinHandle<eyre::Result<(BlobId, u64)>>,
116}
117
118struct BlobReadHandle {
119 blob_id: BlobId,
120 stream:
122 Option<Box<dyn futures_util::Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>,
123 current_chunk_cursor: Option<Cursor<Vec<u8>>>,
125 position: u64,
126}
127
128#[expect(
129 missing_debug_implementations,
130 reason = "storage and node_client can't impl Debug"
131)]
132pub struct VMLogic<'a> {
133 storage: &'a mut dyn Storage,
134 memory: Option<wasmer::Memory>,
135 context: VMContext<'a>,
136 limits: &'a VMLimits,
137 registers: Registers,
138 returns: Option<VMLogicResult<Vec<u8>, Vec<u8>>>,
139 logs: Vec<String>,
140 events: Vec<Event>,
141 root_hash: Option<[u8; 32]>,
142 artifact: Vec<u8>,
143 proposals: BTreeMap<[u8; 32], Vec<u8>>,
144 approvals: Vec<[u8; 32]>,
145
146 node_client: Option<NodeClient>,
148 blob_handles: HashMap<u64, BlobHandle>,
149 next_blob_fd: u64,
150}
151
152impl<'a> VMLogic<'a> {
153 pub fn new(
154 storage: &'a mut dyn Storage,
155 context: VMContext<'a>,
156 limits: &'a VMLimits,
157 node_client: Option<NodeClient>,
158 ) -> Self {
159 VMLogic {
160 storage,
161 memory: None,
162 context,
163 limits,
164 registers: Registers::default(),
165 returns: None,
166 logs: vec![],
167 events: vec![],
168 root_hash: None,
169 artifact: vec![],
170 proposals: BTreeMap::new(),
171 approvals: vec![],
172
173 node_client,
175 blob_handles: HashMap::new(),
176 next_blob_fd: 1,
177 }
178 }
179
180 pub fn with_memory(&mut self, memory: wasmer::Memory) -> &mut Self {
181 self.memory = Some(memory);
182 self
183 }
184
185 pub fn host_functions(&'a mut self, store: wasmer::StoreMut<'a>) -> VMHostFunctions<'a> {
186 let memory = self.memory.clone().expect("VM Memory not initialized");
187
188 VMHostFunctionsBuilder {
189 logic: self,
190 store,
191
192 memory_builder: |store| memory.view(store),
193 }
194 .build()
195 }
196}
197
198#[derive(Debug, Serialize)]
199#[non_exhaustive]
200pub struct Outcome {
201 pub returns: VMLogicResult<Option<Vec<u8>>, FunctionCallError>,
202 pub logs: Vec<String>,
203 pub events: Vec<Event>,
204 pub root_hash: Option<[u8; 32]>,
205 pub artifact: Vec<u8>,
206 pub proposals: BTreeMap<[u8; 32], Vec<u8>>,
207 pub approvals: Vec<[u8; 32]>,
209 }
212
213#[derive(Debug, Serialize)]
214#[non_exhaustive]
215pub struct Event {
216 pub kind: String,
217 pub data: Vec<u8>,
218}
219
220impl VMLogic<'_> {
221 #[must_use]
222 pub fn finish(self, err: Option<FunctionCallError>) -> Outcome {
223 let returns = match err {
224 Some(err) => Err(err),
225 None => self
226 .returns
227 .map(|t| t.map_err(FunctionCallError::ExecutionError))
228 .transpose(),
229 };
230
231 Outcome {
232 returns,
233 logs: self.logs,
234 events: self.events,
235 root_hash: self.root_hash,
236 artifact: self.artifact,
237 proposals: self.proposals,
238 approvals: self.approvals,
239 }
240 }
241}
242
243#[self_referencing]
244pub struct VMHostFunctions<'a> {
245 logic: &'a mut VMLogic<'a>,
246 store: wasmer::StoreMut<'a>,
247
248 #[covariant]
249 #[borrows(store)]
250 memory: wasmer::MemoryView<'this>,
251}
252
253impl VMHostFunctions<'_> {
254 fn read_slice(&mut self, slice: &sys::Buffer<'_>) -> &mut [u8] {
255 let ptr = slice.ptr().value().as_usize();
256 let len = slice.len() as usize;
257
258 unsafe { &mut self.borrow_memory().data_unchecked_mut()[ptr..ptr + len] }
259 }
260
261 fn read_guest_memory_sized<const N: usize>(
262 &self,
263 ptr: u64,
264 len: u64,
265 ) -> VMLogicResult<[u8; N]> {
266 let len = usize::try_from(len).map_err(|_| HostError::IntegerOverflow)?;
267
268 if len != N {
269 return Err(HostError::InvalidMemoryAccess.into());
270 }
271
272 let mut buf = [0; N];
273
274 self.borrow_memory().read(ptr, &mut buf)?;
275
276 Ok(buf)
277 }
278
279 unsafe fn read_typed<T>(&self, ptr: u64) -> VMLogicResult<T> {
281 let mut value = MaybeUninit::<T>::uninit();
282
283 let raw = slice::from_raw_parts_mut(value.as_mut_ptr().cast::<u8>(), size_of::<T>());
284
285 self.borrow_memory().read(ptr, raw)?;
286
287 Ok(value.assume_init())
288 }
289}
290
291impl VMHostFunctions<'_> {
292 pub fn panic(&mut self, location_ptr: u64) -> VMLogicResult<()> {
293 let location = unsafe { self.read_typed::<sys::Location<'_>>(location_ptr)? };
294
295 let file = String::from_utf8(self.read_slice(&location.file()).to_vec())
296 .map_err(|_| HostError::BadUTF8)?;
297 let line = location.line();
298 let column = location.column();
299
300 Err(HostError::Panic {
301 context: PanicContext::Guest,
302 message: "explicit panic".to_owned(),
303 location: Location::At { file, line, column },
304 }
305 .into())
306 }
307
308 pub fn panic_utf8(&mut self, msg_ptr: u64, location_ptr: u64) -> VMLogicResult<()> {
309 let message_buf = unsafe { self.read_typed::<sys::Buffer<'_>>(msg_ptr)? };
310 let location = unsafe { self.read_typed::<sys::Location<'_>>(location_ptr)? };
311
312 let message = String::from_utf8(self.read_slice(&message_buf).to_vec())
313 .map_err(|_| HostError::BadUTF8)?;
314 let file = String::from_utf8(self.read_slice(&location.file()).to_vec())
315 .map_err(|_| HostError::BadUTF8)?;
316 let line = location.line();
317 let column = location.column();
318
319 Err(HostError::Panic {
320 context: PanicContext::Guest,
321 message,
322 location: Location::At { file, line, column },
323 }
324 .into())
325 }
326
327 pub fn register_len(&self, register_id: u64) -> VMLogicResult<u64> {
328 Ok(self
329 .borrow_logic()
330 .registers
331 .get_len(register_id)
332 .unwrap_or(u64::MAX))
333 }
334
335 pub fn read_register(&self, register_id: u64, register_ptr: u64) -> VMLogicResult<u32> {
336 let register = unsafe { self.read_typed::<sys::BufferMut<'_>>(register_ptr)? };
337
338 let data = self.borrow_logic().registers.get(register_id)?;
339 if data.len() != usize::try_from(register.len()).map_err(|_| HostError::IntegerOverflow)? {
340 return Ok(0);
341 }
342 self.borrow_memory()
343 .write(register.ptr().value().as_usize() as u64, data)?;
344 Ok(1)
345 }
346
347 pub fn context_id(&mut self, register_id: u64) -> VMLogicResult<()> {
348 self.with_logic_mut(|logic| {
349 logic
350 .registers
351 .set(logic.limits, register_id, logic.context.context_id)
352 })
353 }
354
355 pub fn executor_id(&mut self, register_id: u64) -> VMLogicResult<()> {
356 self.with_logic_mut(|logic| {
357 logic
358 .registers
359 .set(logic.limits, register_id, logic.context.executor_public_key)
360 })
361 }
362
363 pub fn input(&mut self, register_id: u64) -> VMLogicResult<()> {
364 self.with_logic_mut(|logic| {
365 logic
366 .registers
367 .set(logic.limits, register_id, &*logic.context.input)
368 })?;
369
370 Ok(())
371 }
372
373 pub fn value_return(&mut self, value_ptr: u64) -> VMLogicResult<()> {
374 let result = unsafe { self.read_typed::<sys::ValueReturn<'_>>(value_ptr)? };
375
376 let result = match result {
377 sys::ValueReturn::Ok(value) => Ok(self.read_slice(&value).to_vec()),
378 sys::ValueReturn::Err(value) => Err(self.read_slice(&value).to_vec()),
379 };
380
381 self.with_logic_mut(|logic| logic.returns = Some(result));
382
383 Ok(())
384 }
385
386 pub fn log_utf8(&mut self, log_ptr: u64) -> VMLogicResult<()> {
387 let buf = unsafe { self.read_typed::<sys::Buffer<'_>>(log_ptr)? };
388
389 let message =
390 String::from_utf8(self.read_slice(&buf).to_vec()).map_err(|_| HostError::BadUTF8)?;
391
392 let logic = self.borrow_logic();
393
394 if logic.logs.len()
395 >= usize::try_from(logic.limits.max_logs).map_err(|_| HostError::IntegerOverflow)?
396 {
397 return Err(HostError::LogsOverflow.into());
398 }
399
400 self.with_logic_mut(|logic| logic.logs.push(message));
401
402 Ok(())
403 }
404
405 pub fn emit(&mut self, event_ptr: u64) -> VMLogicResult<()> {
406 let event = unsafe { self.read_typed::<sys::Event<'_>>(event_ptr)? };
407
408 let kind_len = event.kind().len();
409 let data_len = event.data().len();
410
411 let logic = self.borrow_logic();
412
413 if kind_len > logic.limits.max_event_kind_size {
414 return Err(HostError::EventKindSizeOverflow.into());
415 }
416
417 if data_len > logic.limits.max_event_data_size {
418 return Err(HostError::EventDataSizeOverflow.into());
419 }
420
421 if logic.events.len()
422 >= usize::try_from(logic.limits.max_events).map_err(|_| HostError::IntegerOverflow)?
423 {
424 return Err(HostError::EventsOverflow.into());
425 }
426
427 let kind = String::from_utf8(self.read_slice(event.kind()).to_vec())
428 .map_err(|_| HostError::BadUTF8)?;
429 let data = self.read_slice(event.data()).to_vec();
430
431 self.with_logic_mut(|logic| logic.events.push(Event { kind, data }));
432
433 Ok(())
434 }
435
436 pub fn commit(&mut self, root_hash_ptr: u64, artifact_ptr: u64) -> VMLogicResult<()> {
437 let root_hash = unsafe { self.read_typed::<sys::Buffer<'_>>(root_hash_ptr)? };
438 let artifact = unsafe { self.read_typed::<sys::Buffer<'_>>(artifact_ptr)? };
439
440 let root_hash = self
441 .read_slice(&root_hash)
442 .try_into()
443 .map_err(|_| HostError::InvalidMemoryAccess)?;
444 let artifact = self.read_slice(&artifact).to_vec();
445
446 self.with_logic_mut(|logic| {
447 if logic.root_hash.is_some() {
448 return Err(HostError::InvalidMemoryAccess);
449 }
450
451 logic.root_hash = Some(root_hash);
452 logic.artifact = artifact;
453
454 Ok(())
455 })?;
456
457 Ok(())
458 }
459
460 pub fn storage_read(&mut self, key_ptr: u64, register_id: u64) -> VMLogicResult<u32> {
461 let key = unsafe { self.read_typed::<sys::Buffer<'_>>(key_ptr)? };
462 let key_len = key.len();
463
464 let key = self
465 .read_slice(&key)
466 .try_into()
467 .map_err(|_| HostError::InvalidMemoryAccess)?;
468
469 let logic = self.borrow_logic();
470
471 if key_len > logic.limits.max_storage_key_size.get() {
472 return Err(HostError::KeyLengthOverflow.into());
473 }
474
475 if let Some(value) = logic.storage.get(&key) {
476 self.with_logic_mut(|logic| logic.registers.set(logic.limits, register_id, value))?;
477
478 return Ok(1);
479 }
480
481 Ok(0)
482 }
483
484 pub fn storage_remove(&mut self, key_ptr: u64, register_id: u64) -> VMLogicResult<u32> {
485 let key = unsafe { self.read_typed::<sys::Buffer<'_>>(key_ptr)? };
486 let key_len = key.len();
487
488 let key = self
489 .read_slice(&key)
490 .try_into()
491 .map_err(|_| HostError::InvalidMemoryAccess)?;
492
493 let logic = self.borrow_logic();
494
495 if key_len > logic.limits.max_storage_key_size.get() {
496 return Err(HostError::KeyLengthOverflow.into());
497 }
498
499 if let Some(value) = logic.storage.get(&key) {
500 self.with_logic_mut(|logic| {
501 drop(logic.storage.remove(&key));
502 logic.registers.set(logic.limits, register_id, value)
503 })?;
504
505 return Ok(1);
506 }
507
508 Ok(0)
509 }
510
511 pub fn storage_write(
512 &mut self,
513 key_ptr: u64,
514 value_ptr: u64,
515 register_id: u64,
516 ) -> VMLogicResult<u32> {
517 let key = unsafe { self.read_typed::<sys::Buffer<'_>>(key_ptr)? };
518 let key_len = key.len();
519
520 let value = unsafe { self.read_typed::<sys::Buffer<'_>>(value_ptr)? };
521 let value_len = value.len();
522
523 let logic = self.borrow_logic();
524
525 if key_len > logic.limits.max_storage_key_size.get() {
526 return Err(HostError::KeyLengthOverflow.into());
527 }
528
529 if value_len > logic.limits.max_storage_value_size.get() {
530 return Err(HostError::ValueLengthOverflow.into());
531 }
532
533 let key = self
534 .read_slice(&key)
535 .try_into()
536 .map_err(|_| HostError::InvalidMemoryAccess)?;
537 let value = self
538 .read_slice(&value)
539 .try_into()
540 .map_err(|_| HostError::InvalidMemoryAccess)?;
541
542 let evicted = self.with_logic_mut(|logic| logic.storage.set(key, value));
543
544 if let Some(evicted) = evicted {
545 self.with_logic_mut(|logic| logic.registers.set(logic.limits, register_id, evicted))?;
546
547 return Ok(1);
548 };
549
550 Ok(0)
551 }
552
553 #[expect(clippy::too_many_arguments, reason = "Acceptable here")]
554 pub fn fetch(
555 &mut self,
556 url_ptr: u64,
557 method_ptr: u64,
558 headers_ptr: u64,
559 body_ptr: u64,
560 register_id: u64,
561 ) -> VMLogicResult<u32> {
562 let url = unsafe { self.read_typed::<sys::Buffer<'_>>(url_ptr)? };
563 let method = unsafe { self.read_typed::<sys::Buffer<'_>>(method_ptr)? };
564 let headers = unsafe { self.read_typed::<sys::Buffer<'_>>(headers_ptr)? };
565 let body = unsafe { self.read_typed::<sys::Buffer<'_>>(body_ptr)? };
566
567 let url =
568 String::from_utf8(self.read_slice(&url).to_vec()).map_err(|_| HostError::BadUTF8)?;
569 let method =
570 String::from_utf8(self.read_slice(&method).to_vec()).map_err(|_| HostError::BadUTF8)?;
571
572 let headers = self.read_slice(&headers).to_vec();
573
574 let headers: Vec<(String, String)> =
578 from_borsh_slice(&headers).map_err(|_| HostError::DeserializationError)?;
579
580 let body = self.read_slice(&body).to_vec();
581 let mut request = ureq::request(&method, &url);
582
583 for (key, value) in &headers {
584 request = request.set(key, value);
585 }
586
587 let response = if body.is_empty() {
588 request.call()
589 } else {
590 request.send_bytes(&body)
591 };
592
593 let (status, data) = match response {
594 Ok(response) => {
595 let mut buffer = vec![];
596 match response.into_reader().read_to_end(&mut buffer) {
597 Ok(_) => (0, buffer),
598 Err(_) => (1, "Failed to read the response body.".into()),
599 }
600 }
601 Err(e) => (1, e.to_string().into_bytes()),
602 };
603
604 self.with_logic_mut(|logic| logic.registers.set(logic.limits, register_id, data))?;
605 Ok(status)
606 }
607
608 pub fn random_bytes(&mut self, ptr: u64) -> VMLogicResult<()> {
609 let byte_slice = unsafe { self.read_typed::<sys::BufferMut<'_>>(ptr)? };
610 let mut buf = self.read_slice(&byte_slice).to_vec();
611
612 rand::thread_rng().fill_bytes(&mut buf);
613 self.borrow_memory().write(ptr, &buf)?;
614
615 Ok(())
616 }
617
618 #[expect(
625 clippy::cast_possible_truncation,
626 reason = "Impossible to overflow in normal circumstances"
627 )]
628 #[expect(
629 clippy::expect_used,
630 clippy::unwrap_in_result,
631 reason = "Effectively infallible here"
632 )]
633 pub fn time_now(&mut self, ptr: u64) -> VMLogicResult<()> {
634 let time = unsafe { self.read_typed::<sys::BufferMut<'_>>(ptr)? };
635
636 if time.len() != 8 {
637 return Err(HostError::InvalidMemoryAccess.into());
638 }
639
640 let now = SystemTime::now()
641 .duration_since(UNIX_EPOCH)
642 .expect("Time went backwards to before the Unix epoch!")
643 .as_nanos() as u64;
644
645 self.borrow_memory()
646 .write(time.ptr().value().as_usize() as u64, &now.to_le_bytes())?;
647
648 Ok(())
649 }
650
651 pub fn send_proposal(&mut self, actions_ptr: u64, id_ptr: u64) -> VMLogicResult<()> {
668 let actions = unsafe { self.read_typed::<sys::Buffer<'_>>(actions_ptr)? };
669 let id = unsafe { self.read_typed::<sys::BufferMut<'_>>(id_ptr)? };
670
671 if id.len() != 32 {
672 return Err(HostError::InvalidMemoryAccess.into());
673 }
674
675 let actions_bytes = self.read_slice(&actions).to_vec();
676 let mut proposal_id = [0; 32];
677
678 rand::thread_rng().fill_bytes(&mut proposal_id);
679 drop(self.with_logic_mut(|logic| logic.proposals.insert(proposal_id, actions_bytes)));
680
681 self.borrow_memory()
682 .write(id.ptr().value().as_usize() as u64, &proposal_id)?;
683
684 Ok(())
685 }
686
687 pub fn approve_proposal(&mut self, approval_ptr: u64) -> VMLogicResult<()> {
688 let approval = unsafe { self.read_typed::<sys::Buffer<'_>>(approval_ptr)? };
689 let approval_len = approval.len();
690
691 if approval_len != 32 {
692 return Err(HostError::InvalidMemoryAccess.into());
693 }
694 let approval = self.read_guest_memory_sized::<32>(approval_ptr, approval_len)?;
695 let _ = self.with_logic_mut(|logic| logic.approvals.push(approval));
696
697 Ok(())
698 }
699
700 pub fn blob_create(&mut self) -> VMLogicResult<u64> {
705 if self.borrow_logic().node_client.is_none() {
706 return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
707 }
708
709 if self.borrow_logic().blob_handles.len()
710 >= self.borrow_logic().limits.max_blob_handles as usize
711 {
712 return Err(VMLogicError::HostError(HostError::TooManyBlobHandles {
713 max: self.borrow_logic().limits.max_blob_handles,
714 }));
715 }
716
717 let fd = self.with_logic_mut(|logic| -> VMLogicResult<u64> {
718 let Some(node_client) = logic.node_client.clone() else {
719 return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
720 };
721
722 let fd = logic.next_blob_fd;
723 logic.next_blob_fd += 1;
724
725 let (data_sender, data_receiver) = mpsc::unbounded_channel();
726
727 let completion_handle = tokio::spawn(async move {
728 let stream = UnboundedReceiverStream::new(data_receiver);
729
730 let byte_stream =
731 stream.map(|data: Vec<u8>| Ok::<bytes::Bytes, std::io::Error>(data.into()));
732 let reader = byte_stream.into_async_read();
733
734 node_client.add_blob(reader, None, None).await
735 });
736
737 let handle = BlobHandle::Write(BlobWriteHandle {
738 sender: data_sender,
739 completion_handle,
740 });
741
742 drop(logic.blob_handles.insert(fd, handle));
743 Ok(fd)
744 })?;
745
746 Ok(fd)
747 }
748
749 pub fn blob_write(&mut self, fd: u64, data_ptr: u64) -> VMLogicResult<u64> {
752 let data = unsafe { self.read_typed::<sys::Buffer<'_>>(data_ptr)? };
753 let data_len = data.len();
754
755 if self.borrow_logic().node_client.is_none() {
756 return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
757 }
758
759 if data_len > self.borrow_logic().limits.max_blob_chunk_size {
761 return Err(VMLogicError::HostError(HostError::BlobWriteTooLarge {
762 size: data_len,
763 max: self.borrow_logic().limits.max_blob_chunk_size,
764 }));
765 }
766
767 let data = self.read_slice(&data).to_vec();
768
769 self.with_logic_mut(|logic| {
770 let handle = logic
771 .blob_handles
772 .get(&fd)
773 .ok_or(VMLogicError::HostError(HostError::InvalidBlobHandle))?;
774
775 match handle {
776 BlobHandle::Write(_) => Ok(()),
777 BlobHandle::Read(_) => Err(VMLogicError::HostError(HostError::InvalidBlobHandle)),
778 }
779 })?;
780
781 self.with_logic_mut(|logic| {
782 let handle = logic
783 .blob_handles
784 .get_mut(&fd)
785 .ok_or(VMLogicError::HostError(HostError::InvalidBlobHandle))?;
786 match handle {
787 BlobHandle::Write(w) => {
788 w.sender
789 .send(data.clone())
790 .map_err(|_| VMLogicError::HostError(HostError::InvalidBlobHandle))?;
791 }
792 _ => return Err(VMLogicError::HostError(HostError::InvalidBlobHandle)),
793 }
794 Ok::<(), VMLogicError>(())
795 })?;
796
797 Ok(data_len)
798 }
799
800 pub fn blob_close(&mut self, fd: u64, blob_id_ptr: u64) -> VMLogicResult<u32> {
803 let blob_id = unsafe { self.read_typed::<sys::BufferMut<'_>>(blob_id_ptr)? };
804 let blob_id_ptr = blob_id.ptr().value().as_usize() as u64;
805
806 if self.borrow_logic().node_client.is_none() {
807 return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
808 }
809
810 if blob_id.len() != 32 {
811 return Err(HostError::InvalidMemoryAccess.into());
812 }
813
814 let handle = self.with_logic_mut(|logic| {
815 logic
816 .blob_handles
817 .remove(&fd)
818 .ok_or(VMLogicError::HostError(HostError::InvalidBlobHandle))
819 })?;
820
821 match handle {
822 BlobHandle::Write(write_handle) => {
823 drop(write_handle.sender);
824
825 let (blob_id, _size) = tokio::task::block_in_place(|| {
826 tokio::runtime::Handle::current().block_on(write_handle.completion_handle)
827 })
828 .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))?
829 .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))?;
830
831 self.borrow_memory().write(blob_id_ptr, blob_id.as_ref())?;
832 }
833 BlobHandle::Read(read_handle) => {
834 self.borrow_memory()
835 .write(blob_id_ptr, read_handle.blob_id.as_ref())?;
836 }
837 }
838
839 Ok(1)
840 }
841
842 pub fn blob_announce_to_context(
844 &mut self,
845 blob_id_ptr: u64,
846 context_id_ptr: u64,
847 ) -> VMLogicResult<u32> {
848 let node_client = match &self.borrow_logic().node_client {
850 Some(client) => client.clone(),
851 None => return Err(VMLogicError::HostError(HostError::BlobsNotSupported)),
852 };
853
854 let blob_id = unsafe { self.read_typed::<sys::Buffer<'_>>(blob_id_ptr)? };
855 let context_id = unsafe { self.read_typed::<sys::Buffer<'_>>(context_id_ptr)? };
856
857 let blob_id_len = blob_id.ptr().value().as_usize() as u64;
858 let context_id_len = context_id.ptr().value().as_usize() as u64;
859
860 if blob_id_len != 32 || context_id_len != 32 {
862 return Err(HostError::InvalidMemoryAccess.into());
863 }
864
865 let blob_id_bytes = self.read_guest_memory_sized::<32>(blob_id_ptr, blob_id_len)?;
867 let context_id_bytes =
868 self.read_guest_memory_sized::<32>(context_id_ptr, context_id_len)?;
869
870 let blob_id = BlobId::from(blob_id_bytes);
871 let context_id = calimero_primitives::context::ContextId::from(context_id_bytes);
872
873 let blob_info = tokio::task::block_in_place(|| {
875 tokio::runtime::Handle::current().block_on(async {
876 node_client
877 .get_blob_info(blob_id)
878 .await
879 .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))
880 })
881 })?;
882
883 let blob_info =
884 blob_info.ok_or_else(|| VMLogicError::HostError(HostError::BlobsNotSupported))?;
885
886 tokio::task::block_in_place(|| {
888 tokio::runtime::Handle::current().block_on(async {
889 node_client
890 .announce_blob_to_network(&blob_id, &context_id, blob_info.size)
891 .await
892 .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))
893 })
894 })?;
895
896 Ok(1)
897 }
898
899 pub fn blob_open(&mut self, blob_id_ptr: u64) -> VMLogicResult<u64> {
902 let blob_id = unsafe { self.read_typed::<sys::Buffer<'_>>(blob_id_ptr)? };
903 let blob_id_len = blob_id.len();
904
905 if self.borrow_logic().node_client.is_none() {
906 return Err(VMLogicError::HostError(HostError::BlobsNotSupported));
907 }
908
909 if self.borrow_logic().blob_handles.len()
910 >= self.borrow_logic().limits.max_blob_handles as usize
911 {
912 return Err(VMLogicError::HostError(HostError::TooManyBlobHandles {
913 max: self.borrow_logic().limits.max_blob_handles,
914 }));
915 }
916
917 if blob_id_len != 32 {
918 return Err(HostError::InvalidMemoryAccess.into());
919 }
920
921 let blob_id_bytes = self
922 .read_guest_memory_sized::<32>(blob_id.ptr().value().as_usize() as u64, blob_id_len)?;
923 let blob_id = BlobId::from(blob_id_bytes);
924
925 let fd = self.with_logic_mut(|logic| {
926 let fd = logic.next_blob_fd;
927 logic.next_blob_fd += 1;
928
929 let handle = BlobHandle::Read(BlobReadHandle {
930 blob_id,
931 stream: None,
932 current_chunk_cursor: None,
933 position: 0,
934 });
935 drop(logic.blob_handles.insert(fd, handle));
936 fd
937 });
938
939 Ok(fd)
940 }
941
942 pub fn blob_read(&mut self, fd: u64, data_ptr: u64) -> VMLogicResult<u64> {
945 let data = unsafe { self.read_typed::<sys::BufferMut<'_>>(data_ptr)? };
946 let data_len = data.len();
947
948 let node_client = match &self.borrow_logic().node_client {
950 Some(client) => client.clone(),
951 None => return Err(VMLogicError::HostError(HostError::BlobsNotSupported)),
952 };
953
954 if data_len > self.borrow_logic().limits.max_blob_chunk_size {
956 return Err(VMLogicError::HostError(HostError::BlobBufferTooLarge {
957 size: data_len,
958 max: self.borrow_logic().limits.max_blob_chunk_size,
959 }));
960 }
961
962 if data_len == 0 {
963 return Ok(0);
964 }
965
966 let mut output_buffer = Vec::with_capacity(data_len as usize);
967
968 let bytes_read = self.with_logic_mut(|logic| -> VMLogicResult<u64> {
969 let handle = logic
970 .blob_handles
971 .get_mut(&fd)
972 .ok_or(VMLogicError::HostError(HostError::InvalidBlobHandle))?;
973
974 let read_handle = match handle {
975 BlobHandle::Read(r) => r,
976 BlobHandle::Write(_) => {
977 return Err(VMLogicError::HostError(HostError::InvalidBlobHandle))
978 }
979 };
980
981 let needed = data_len as usize;
982
983 if let Some(cursor) = &mut read_handle.current_chunk_cursor {
985 let mut temp_buffer = vec![0u8; needed];
986 match cursor.read(&mut temp_buffer) {
987 Ok(bytes_from_cursor) => {
988 output_buffer.extend_from_slice(&temp_buffer[..bytes_from_cursor]);
989
990 if bytes_from_cursor == 0
992 || cursor.position() >= cursor.get_ref().len() as u64
993 {
994 read_handle.current_chunk_cursor = None;
995 }
996
997 if output_buffer.len() >= needed {
999 read_handle.position += output_buffer.len() as u64;
1000 return Ok(output_buffer.len() as u64);
1001 }
1002 }
1003 Err(_) => {
1004 read_handle.current_chunk_cursor = None;
1006 }
1007 }
1008 }
1009
1010 if read_handle.stream.is_none() {
1011 let blob_stream = tokio::task::block_in_place(|| {
1012 tokio::runtime::Handle::current().block_on(async {
1013 node_client
1014 .get_blob(&read_handle.blob_id, None)
1015 .await
1016 .map_err(|_| VMLogicError::HostError(HostError::BlobsNotSupported))
1017 })
1018 })?;
1019
1020 match blob_stream {
1021 Some(stream) => {
1022 let mapped_stream = stream.map(|result| match result {
1023 Ok(chunk) => Ok(bytes::Bytes::copy_from_slice(&chunk)),
1024 Err(_) => Err(std::io::Error::new(
1025 std::io::ErrorKind::Other,
1026 "blob read error",
1027 )),
1028 });
1029 read_handle.stream = Some(Box::new(mapped_stream));
1030 }
1031 None => {
1032 read_handle.position += output_buffer.len() as u64;
1033 return Ok(output_buffer.len() as u64);
1034 }
1035 }
1036 }
1037
1038 if let Some(stream) = &mut read_handle.stream {
1039 tokio::task::block_in_place(|| {
1040 tokio::runtime::Handle::current().block_on(async {
1041 while output_buffer.len() < needed {
1042 match stream.next().await {
1043 Some(Ok(chunk)) => {
1044 let chunk_bytes = chunk.as_ref();
1045 let remaining_needed = needed - output_buffer.len();
1046
1047 if chunk_bytes.len() <= remaining_needed {
1048 output_buffer.extend_from_slice(chunk_bytes);
1049 } else {
1050 output_buffer
1052 .extend_from_slice(&chunk_bytes[..remaining_needed]);
1053
1054 let remaining_data =
1056 chunk_bytes[remaining_needed..].to_vec();
1057 read_handle.current_chunk_cursor =
1058 Some(Cursor::new(remaining_data));
1059 break;
1060 }
1061 }
1062 Some(Err(_)) | None => {
1063 break;
1064 }
1065 }
1066 }
1067 Ok::<(), VMLogicError>(())
1068 })
1069 })?;
1070 }
1071
1072 read_handle.position += output_buffer.len() as u64;
1073 Ok(output_buffer.len() as u64)
1074 })?;
1075
1076 if bytes_read > 0 {
1077 self.borrow_memory()
1078 .write(data.ptr().value().as_usize() as u64, &output_buffer)?;
1079 }
1080
1081 Ok(bytes_read)
1082 }
1083}