1use std::{
2 borrow::Cow,
3 cell::Cell,
4 collections::VecDeque,
5 fmt,
6 future::Future,
7 io,
8 ops::{Deref, DerefMut},
9 sync::{Arc, Mutex},
10 task::Poll::{Pending, Ready},
11 time::Duration,
12};
13
14use anyhow::Context;
15use serde::{Deserialize, Serialize};
16use tokio::{
17 net::{TcpListener, TcpStream},
18 sync::{
19 mpsc::{error::TrySendError, Sender},
20 oneshot,
21 },
22 sync::{oneshot::Sender as OneshotSender, Semaphore},
23};
24use tracing::{error, info, warn, Instrument, Span};
25use wasmer::{
26 self, imports, AsStoreMut, Function, FunctionEnv, FunctionEnvMut, Imports, Instance, Memory,
27 Store, StoreMut,
28};
29
30use env::{
31 messages::{AccountId, HttpRequest, HttpResponseHead, QueryRequest, SystemMessage},
32 tls::{TlsClientConfig, TlsServerConfig},
33 IntPtr, IntRet, OcallError, Result, RetEncode,
34};
35use scale::{Decode, Encode};
36use sidevm_env as env;
37use thread_local::ThreadLocal;
38use wasmer_middlewares::metering;
39
40use crate::{
41 async_context::{get_task_cx, set_task_env, GuestWaker},
42 resource::{Resource, ResourceKeeper, TcpListenerResource},
43 tls::{load_tls_config, TlsStream},
44 IncomingHttpRequest, VmId,
45};
46
47mod wasi_env;
48
49pub struct FnEnvMut<'a, T> {
50 store: StoreMut<'a>,
51 inner: T,
52}
53
54impl<'a, T> Deref for FnEnvMut<'a, T> {
55 type Target = T;
56
57 fn deref(&self) -> &Self::Target {
58 &self.inner
59 }
60}
61
62impl<'a, T> DerefMut for FnEnvMut<'a, T> {
63 fn deref_mut(&mut self) -> &mut Self::Target {
64 &mut self.inner
65 }
66}
67
68impl<'a, T> FnEnvMut<'a, T> {
69 pub fn new(store: &'a mut impl AsStoreMut, value: T) -> Self {
70 Self {
71 store: store.as_store_mut(),
72 inner: value,
73 }
74 }
75}
76
77pub struct ShortId<T>(pub T);
78
79impl<T: AsRef<[u8]>> fmt::Display for ShortId<T> {
80 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81 let len = self.0.as_ref().len();
82 hex_fmt::HexFmt(&self.0.as_ref()[..len.min(6)]).fmt(f)
83 }
84}
85
86fn _sizeof_i32_must_eq_to_intptr() {
88 let _ = core::mem::transmute::<i32, IntPtr>;
89}
90
91pub fn create_env(
92 id: VmId,
93 store: &mut Store,
94 cache_ops: DynCacheOps,
95 out_tx: OutgoingRequestChannel,
96 log_handler: Option<LogHandler>,
97 args: Vec<String>,
98) -> (Env, Imports) {
99 let raw_env = Env::new(id, cache_ops, out_tx, log_handler, args);
100 let env = FunctionEnv::new(store, raw_env.clone());
101 let wasi_imports = wasi_env::wasi_imports(store, &env);
102 (
103 raw_env,
104 imports! {
105 "env" => {
106 "sidevm_ocall" => Function::new_typed_with_env(
107 store,
108 &env,
109 sidevm_ocall,
110 ),
111 "sidevm_ocall_fast_return" => Function::new_typed_with_env(
112 store,
113 &env,
114 sidevm_ocall_fast_return,
115 ),
116 },
117 "wasi_snapshot_preview1" => wasi_imports,
118 },
119 )
120}
121
122pub(crate) struct TaskSet {
123 awake_tasks: dashmap::DashSet<i32>,
124 pub(crate) awake_wakers: Mutex<VecDeque<i32>>,
126}
127
128impl TaskSet {
129 fn with_task0() -> Self {
130 let awake_tasks = dashmap::DashSet::new();
131 awake_tasks.insert(0);
132 Self {
133 awake_tasks,
134 awake_wakers: Default::default(),
135 }
136 }
137
138 pub(crate) fn push_task(&self, task_id: i32) {
139 self.awake_tasks.insert(task_id);
140 }
141
142 pub(crate) fn pop_task(&self) -> Option<i32> {
143 let item = self.awake_tasks.iter().next().map(|task_id| *task_id);
144 match item {
145 Some(task_id) => {
146 self.awake_tasks.remove(&task_id);
147 Some(task_id)
148 }
149 None => None,
150 }
151 }
152
153 pub(crate) fn is_empty(&self) -> bool {
154 self.awake_tasks.is_empty() && self.awake_wakers.lock().unwrap().is_empty()
155 }
156}
157
158pub trait CacheOps {
159 fn get(&self, contract: &[u8], key: &[u8]) -> Result<Option<Vec<u8>>>;
160 fn set(&self, contract: &[u8], key: &[u8], value: &[u8]) -> Result<()>;
161 fn set_expiration(&self, contract: &[u8], key: &[u8], expire_after_secs: u64) -> Result<()>;
162 fn remove(&self, contract: &[u8], key: &[u8]) -> Result<Option<Vec<u8>>>;
163}
164
165pub type DynCacheOps = &'static (dyn CacheOps + Send + Sync);
166pub type LogHandler = Box<dyn Fn(VmId, u8, &str) + Send + Sync>;
167
168pub type OutgoingRequestChannel = Sender<(VmId, OutgoingRequest)>;
169
170pub enum OutgoingRequest {
171 Query {
172 contract_id: [u8; 32],
173 payload: Vec<u8>,
174 reply_tx: OneshotSender<Vec<u8>>,
175 },
176 Output(Vec<u8>),
178}
179
180struct VmMemory(Option<Memory>);
181
182pub(crate) struct EnvInner {
183 memory: VmMemory,
184 id: VmId,
185 gas_per_breath: u64,
186 resources: ResourceKeeper,
187 temp_return_value: ThreadLocal<Cell<Option<Vec<u8>>>>,
188 ocall_trace_enabled: bool,
189 message_tx: Option<Sender<Vec<u8>>>,
190 query_tx: Option<Sender<Vec<u8>>>,
191 sys_message_tx: Option<Sender<Vec<u8>>>,
192 http_connect_tx: Option<Sender<Vec<u8>>>,
193 awake_tasks: Arc<TaskSet>,
194 current_task: i32,
195 cache_ops: DynCacheOps,
196 weight: u32,
197 instance: Option<Instance>,
198 outgoing_query_guard: Arc<Semaphore>,
199 outgoing_request_tx: OutgoingRequestChannel,
200 log_handler: Option<LogHandler>,
201 _counter: vm_counter::Counter,
202 args: Vec<String>,
203}
204
205impl VmMemory {
206 pub(crate) fn unwrap_ref(&self) -> &Memory {
207 self.0.as_ref().expect("memory is not initialized")
208 }
209}
210
211struct MemoryView<'a>(wasmer::MemoryView<'a>);
212
213impl<'a> Deref for MemoryView<'a> {
214 type Target = wasmer::MemoryView<'a>;
215
216 fn deref(&self) -> &Self::Target {
217 &self.0
218 }
219}
220
221impl<'a> MemoryView<'a> {
222 fn check_addr(&self, offset: usize, len: usize) -> Result<(usize, usize)> {
223 let end = offset.checked_add(len).ok_or(OcallError::InvalidAddress)?;
224 if end > self.size().bytes().0 {
225 return Err(OcallError::InvalidAddress);
226 }
227 Ok((offset, end))
228 }
229}
230
231impl<'a> env::VmMemory for MemoryView<'a> {
232 fn copy_to_vm(&self, data: &[u8], ptr: IntPtr) -> Result<()> {
233 if data.len() > u32::MAX as usize {
234 return Err(OcallError::NoMemory);
235 }
236 self.write(ptr as _, data)
237 .or(Err(OcallError::InvalidAddress))?;
238 Ok(())
239 }
240
241 fn slice_from_vm(&self, ptr: IntPtr, len: IntPtr) -> Result<&[u8]> {
242 let (offset, end) = self.check_addr(ptr as _, len as _)?;
243 let slice = unsafe { &self.data_unchecked()[offset..end] };
244 Ok(slice)
245 }
246
247 fn slice_from_vm_mut(&self, ptr: IntPtr, len: IntPtr) -> Result<&mut [u8]> {
248 let (offset, end) = self.check_addr(ptr as _, len as _)?;
249 let slice = unsafe { &mut self.data_unchecked_mut()[offset..end] };
250 Ok(slice)
251 }
252}
253
254#[derive(Clone)]
255pub struct Env {
256 pub(crate) inner: Arc<Mutex<EnvInner>>,
257}
258
259impl Env {
260 fn new(
261 id: VmId,
262 cache_ops: DynCacheOps,
263 outgoing_request_tx: OutgoingRequestChannel,
264 log_handler: Option<LogHandler>,
265 args: Vec<String>,
266 ) -> Self {
267 Self {
268 inner: Arc::new(Mutex::new(EnvInner {
269 memory: VmMemory(None),
270 id,
271 gas_per_breath: 0,
272 resources: Default::default(),
273 temp_return_value: Default::default(),
274 ocall_trace_enabled: false,
275 message_tx: None,
276 sys_message_tx: None,
277 query_tx: None,
278 http_connect_tx: None,
279 awake_tasks: Arc::new(TaskSet::with_task0()),
280 current_task: 0,
281 cache_ops,
282 weight: 1,
283 instance: None,
284 outgoing_query_guard: Arc::new(Semaphore::new(1)),
285 outgoing_request_tx,
286 log_handler,
287 _counter: Default::default(),
288 args,
289 })),
290 }
291 }
292
293 pub fn set_memory(&self, memory: Memory) {
294 self.inner.lock().unwrap().memory.0 = Some(memory);
295 }
296
297 pub fn cleanup(&self) {
298 self.inner.lock().unwrap().memory.0 = None;
300 }
301
302 pub fn push_message(&self, message: Vec<u8>) -> Option<Result<(), TrySendError<Vec<u8>>>> {
304 let tx = self.inner.lock().unwrap().message_tx.clone()?;
305 Some(tx.try_send(message))
306 }
307
308 pub fn push_system_message(
310 &self,
311 message: SystemMessage,
312 ) -> Option<Result<(), TrySendError<Vec<u8>>>> {
313 let tx = self.inner.lock().unwrap().sys_message_tx.clone()?;
314 Some(tx.try_send(message.encode()))
315 }
316
317 pub fn push_query(
319 &self,
320 origin: Option<AccountId>,
321 payload: Vec<u8>,
322 reply_tx: OneshotSender<Vec<u8>>,
323 ) -> Option<impl Future<Output = anyhow::Result<()>>> {
324 let mut env_guard = self.inner.lock().unwrap();
325 let tx = env_guard.query_tx.clone()?;
326 let reply_tx = env_guard
327 .resources
328 .push(Resource::OneshotTx(Some(reply_tx)));
329 let inner = Arc::downgrade(&self.inner);
330 Some(async move {
331 let reply_tx = reply_tx?;
332 let query = QueryRequest {
333 origin,
334 payload,
335 reply_tx,
336 };
337 let result = tx.send(query.encode()).await;
338 if result.is_err() {
339 if let Some(inner) = inner.upgrade() {
340 let mut env_guard = inner.lock().unwrap();
341 let _ = env_guard.close(reply_tx);
342 }
343 }
344 result?;
345 Ok(())
346 })
347 }
348
349 pub fn set_gas_per_breath(&self, gas: u64) {
350 self.inner.lock().unwrap().gas_per_breath = gas;
351 }
352
353 pub fn reset_gas_to_breath(&self, store: &mut impl AsStoreMut) {
354 let guard = self.inner.lock().unwrap();
355 let instance = guard
356 .instance
357 .as_ref()
358 .expect("BUG: missing instance in env");
359 metering::set_remaining_points(store, instance, guard.gas_per_breath);
360 }
361
362 pub fn has_more_ready(&self) -> bool {
363 !self.inner.lock().unwrap().awake_tasks.is_empty()
364 }
365
366 pub fn weight(&self) -> u32 {
367 self.inner.lock().unwrap().weight
368 }
369
370 pub fn set_weight(&self, weight: u32) {
371 let mut inner = self.inner.lock().unwrap();
372 inner.weight = weight;
373 tracing::debug!(target: "sidevm", weight, "Weight updated");
374 }
375
376 pub fn set_instance(&self, instance: Instance) {
377 self.inner.lock().unwrap().instance = Some(instance);
378 }
379
380 pub fn is_stifled(&self, store: &mut impl AsStoreMut) -> bool {
381 self.inner.lock().unwrap().is_stifled(store)
382 }
383
384 pub fn memory(&self) -> Memory {
385 self.inner
386 .lock()
387 .unwrap()
388 .memory
389 .0
390 .as_ref()
391 .expect("BUG: missing memory in env")
392 .clone()
393 }
394
395 pub fn push_http_request(
397 &self,
398 request: IncomingHttpRequest,
399 ) -> Option<impl Future<Output = anyhow::Result<()>>> {
400 let IncomingHttpRequest {
401 head,
402 body_stream,
403 response_tx,
404 } = request;
405 let mut env_guard = self.inner.lock().unwrap();
406 let connect_tx = env_guard.http_connect_tx.clone()?;
407 let (reply_tx, reply_rx) = oneshot::channel();
408 let reply_tx = env_guard
409 .resources
410 .push(Resource::OneshotTx(Some(reply_tx)));
411 tokio::spawn(
412 async move {
413 let reply = reply_rx.await;
414 let reply = reply
415 .context("Failed to receive http response")
416 .and_then(|bytes| {
417 let response = HttpResponseHead::decode(&mut &bytes[..])?;
418 Ok(response)
419 });
420 if response_tx.send(reply).is_err() {
421 info!(target: "sidevm", "Failed to send http response");
422 }
423 }
424 .instrument(Span::current()),
425 );
426 let body_stream = env_guard
427 .resources
428 .push(Resource::DuplexStream(body_stream));
429 let inner = Arc::downgrade(&self.inner);
430 Some(async move {
431 let response_tx = reply_tx?;
432 let body_stream = body_stream?;
433 let query = HttpRequest {
434 head,
435 response_tx,
436 io_stream: body_stream,
437 };
438 let result = connect_tx.send(query.encode()).await;
439 if result.is_err() {
440 if let Some(inner) = inner.upgrade() {
441 let mut env_guard = inner.lock().unwrap();
442 let _ = env_guard.close(response_tx);
443 let _ = env_guard.close(body_stream);
444 }
445 }
446 result?;
447 Ok(())
448 })
449 }
450
451 pub fn with_args<T>(&self, f: impl FnOnce(&[String]) -> T) -> T {
452 f(&self.inner.lock().unwrap().args)
453 }
454}
455
456impl<'a, 'b> env::OcallEnv for FnEnvMut<'a, &'b mut EnvInner> {
457 fn put_return(&mut self, rv: Vec<u8>) -> usize {
458 let len = rv.len();
459 self.temp_return_value.get_or_default().set(Some(rv));
460 len
461 }
462
463 fn take_return(&mut self) -> Option<Vec<u8>> {
464 self.temp_return_value.get_or_default().take()
465 }
466}
467
468impl<'a, 'b> env::OcallFuncs for FnEnvMut<'a, &'b mut EnvInner> {
469 fn close(&mut self, resource_id: i32) -> Result<()> {
470 self.inner.close(resource_id)
471 }
472
473 fn poll(&mut self, waker_id: i32, resource_id: i32) -> Result<Vec<u8>> {
474 self.resources.get_mut(resource_id)?.poll(waker_id)
475 }
476
477 fn poll_read(&mut self, waker_id: i32, resource_id: i32, data: &mut [u8]) -> Result<u32> {
478 self.resources
479 .get_mut(resource_id)?
480 .poll_read(waker_id, data)
481 }
482
483 fn poll_write(&mut self, waker_id: i32, resource_id: i32, data: &[u8]) -> Result<u32> {
484 self.resources
485 .get_mut(resource_id)?
486 .poll_write(waker_id, data)
487 }
488
489 fn poll_shutdown(&mut self, waker_id: i32, resource_id: i32) -> Result<()> {
490 self.resources.get_mut(resource_id)?.poll_shutdown(waker_id)
491 }
492
493 fn poll_res(&mut self, waker_id: i32, resource_id: i32) -> Result<i32> {
494 let res = self.resources.get_mut(resource_id)?.poll_res(waker_id)?;
495 self.resources.push(res)
496 }
497
498 fn mark_task_ready(&mut self, task_id: i32) -> Result<()> {
499 self.awake_tasks.push_task(task_id);
500 Ok(())
501 }
502
503 fn next_ready_task(&mut self) -> Result<i32> {
504 self.awake_tasks.pop_task().ok_or(OcallError::NotFound)
505 }
506
507 fn create_timer(&mut self, timeout: i32) -> Result<i32> {
508 let sleep = tokio::time::sleep(Duration::from_millis(timeout as u64));
509 self.resources.push(Resource::Sleep(Box::pin(sleep)))
510 }
511
512 fn enable_ocall_trace(&mut self, enable: bool) -> Result<()> {
513 self.ocall_trace_enabled = enable;
514 Ok(())
515 }
516
517 fn tcp_listen(&mut self, addr: Cow<str>, tls_config: Option<TlsServerConfig>) -> Result<i32> {
518 let std_listener = std::net::TcpListener::bind(&*addr).or(Err(OcallError::IoError))?;
519 std_listener
520 .set_nonblocking(true)
521 .or(Err(OcallError::IoError))?;
522 let listener = TcpListener::from_std(std_listener).or(Err(OcallError::IoError))?;
523 let tls_config = tls_config.map(load_tls_config).transpose()?.map(Arc::new);
524 self.resources
525 .push(Resource::TcpListener(Box::new(TcpListenerResource {
526 listener,
527 tls_config,
528 })))
529 }
530
531 fn tcp_accept(&mut self, waker_id: i32, tcp_res_id: i32) -> Result<(i32, String)> {
532 let waker = GuestWaker::from_id(waker_id);
533 let (res, remote_addr) = {
534 let res = self.resources.get_mut(tcp_res_id)?;
535 let res = match res {
536 Resource::TcpListener(res) => res,
537 _ => return Err(OcallError::UnsupportedOperation),
538 };
539 let (stream, addr) = match get_task_cx(waker, |ct| res.listener.poll_accept(ct)) {
540 Pending => return Err(OcallError::Pending),
541 Ready(result) => result.or(Err(OcallError::IoError))?,
542 };
543 let res = match &res.tls_config {
544 Some(tls_config) => {
545 Resource::TlsStream(Box::new(TlsStream::accept(stream, tls_config.clone())))
546 }
547 None => Resource::TcpStream(Box::new(stream)),
548 };
549 (res, addr)
550 };
551 self.resources
552 .push(res)
553 .map(|res_id| (res_id, remote_addr.to_string()))
554 }
555
556 fn tcp_accept_no_addr(&mut self, waker_id: i32, resource_id: i32) -> Result<i32> {
557 self.tcp_accept(waker_id, resource_id)
558 .map(|(res_id, _)| res_id)
559 }
560
561 fn tcp_connect(&mut self, host: &str, port: u16) -> Result<i32> {
562 if host.len() > 253 {
563 return Err(OcallError::InvalidParameter);
564 }
565 let host = host.to_owned();
566 let fut = async move { tcp_connect(&host, port).await };
567 self.resources.push(Resource::TcpConnect(Box::pin(fut)))
568 }
569
570 fn tcp_connect_tls(&mut self, host: String, port: u16, config: TlsClientConfig) -> Result<i32> {
571 if host.len() > 253 {
572 return Err(OcallError::InvalidParameter);
573 }
574 let TlsClientConfig::V0 = config;
575 let domain = host
576 .as_str()
577 .try_into()
578 .or(Err(OcallError::InvalidParameter))?;
579 let fut = async move {
580 tcp_connect(&host, port)
581 .await
582 .map(move |stream| TlsStream::connect(domain, stream))
583 };
584 self.resources.push(Resource::TlsConnect(Box::pin(fut)))
585 }
586
587 fn log(&mut self, level: log::Level, message: &str) -> Result<()> {
588 log::log!(target: "sidevm", level, "{message}");
589 if let Some(log_handler) = &self.log_handler {
590 log_handler(self.id, level as u8, message);
591 }
592 Ok(())
593 }
594
595 fn local_cache_get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
596 self.cache_ops.get(&self.id[..], key)
597 }
598
599 fn local_cache_set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
600 self.cache_ops.set(&self.id[..], key, value)
601 }
602
603 fn local_cache_set_expiration(&mut self, key: &[u8], expire_after_secs: u64) -> Result<()> {
604 self.cache_ops
605 .set_expiration(&self.id[..], key, expire_after_secs)
606 }
607
608 fn local_cache_remove(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
609 self.cache_ops.remove(&self.id[..], key)
610 }
611
612 fn awake_wakers(&mut self) -> Result<Vec<i32>> {
613 Ok(self
614 .awake_tasks
615 .awake_wakers
616 .lock()
617 .unwrap()
618 .drain(..)
619 .collect())
620 }
621
622 fn getrandom(&mut self, buf: &mut [u8]) -> Result<()> {
623 use rand::RngCore;
624 const RANDOM_BYTE_WEIGHT: usize = 1_000_000;
625
626 self.inner
627 .pay(&mut self.store, (RANDOM_BYTE_WEIGHT * buf.len()) as _)?;
628 rand::thread_rng().fill_bytes(buf);
629 Ok(())
630 }
631
632 fn oneshot_send(&mut self, resource_id: i32, data: &[u8]) -> Result<()> {
633 let res = self.resources.get_mut(resource_id)?;
634 match res {
635 Resource::OneshotTx(sender) => match sender.take() {
636 Some(sender) => sender.send(data.to_vec()).or(Err(OcallError::IoError))?,
637 None => return Err(OcallError::IoError),
638 },
639 _ => return Err(OcallError::UnsupportedOperation),
640 }
641 Ok(())
642 }
643
644 fn create_input_channel(&mut self, ch: env::InputChannel) -> Result<i32> {
645 use env::InputChannel::*;
646 macro_rules! create_channel {
647 ($field: expr) => {{
648 if $field.is_some() {
649 return Err(OcallError::AlreadyExists);
650 }
651 let (tx, rx) = tokio::sync::mpsc::channel(20);
652 let res = self.resources.push(Resource::ChannelRx(rx))?;
653 $field = Some(tx);
654 Ok(res)
655 }};
656 }
657 match ch {
658 GeneralMessage => create_channel!(self.message_tx),
659 SystemMessage => create_channel!(self.sys_message_tx),
660 Query => create_channel!(self.query_tx),
661 HttpRequest => create_channel!(self.http_connect_tx),
662 }
663 }
664
665 fn gas_remaining(&mut self) -> Result<u8> {
666 self.inner.pay(&mut self.store, 1_000_000)?;
667 Ok(if self.gas_per_breath == 0 {
668 100
669 } else {
670 (self.inner.gas_to_breath(&mut self.store) * 100 / self.gas_per_breath) as u8
671 })
672 }
673
674 fn query_local_contract(&mut self, contract_id: [u8; 32], payload: Vec<u8>) -> Result<i32> {
675 let sem = self
676 .inner
677 .outgoing_query_guard
678 .clone()
679 .try_acquire_owned()
680 .or(Err(OcallError::ResourceLimited))?;
681 let (res_tx, res_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(1);
682 let res_id = self.resources.push(Resource::ChannelRx(res_rx))?;
683 let (reply_tx, reply_rx) = oneshot::channel();
684 let request = OutgoingRequest::Query {
685 contract_id,
686 payload,
687 reply_tx,
688 };
689 let from = self.inner.id;
690 self.inner
691 .outgoing_request_tx
692 .try_send((from, request))
693 .or(Err(OcallError::IoError))?;
694 tokio::spawn(async move {
695 let _sem = sem;
696 let result = match reply_rx.await {
697 Ok(reply) => res_tx.send(reply).await,
698 Err(_) => {
699 warn!(target: "sidevm", "Failed to receive query result");
700 res_tx.send(Vec::new()).await
701 }
702 };
703 if result.is_err() {
704 error!(target: "sidevm", "Failed to send query result");
705 }
706 });
707 Ok(res_id)
708 }
709
710 fn vmid(&mut self) -> Result<[u8; 32]> {
712 Ok(self.id)
713 }
714
715 fn emit_program_output(&mut self, output: &[u8]) -> Result<()> {
716 let from = self.inner.id;
717 let request = OutgoingRequest::Output(output.to_vec());
718 self.inner
719 .outgoing_request_tx
720 .try_send((from, request))
721 .or(Err(OcallError::IoError))
722 }
723}
724
725impl EnvInner {
726 pub(crate) fn make_mut<'a, 'b>(
727 &'a mut self,
728 store: &'b mut impl AsStoreMut,
729 ) -> FnEnvMut<'b, &'a mut Self> {
730 FnEnvMut::new(store, self)
731 }
732
733 pub(crate) fn close(&mut self, resource_id: i32) -> Result<()> {
734 match self.resources.take(resource_id) {
735 None => Err(OcallError::NotFound),
736 Some(_res) => Ok(()),
737 }
738 }
739
740 fn is_stifled(&mut self, store: &mut impl AsStoreMut) -> bool {
741 let instance = self.instance.as_ref().expect("BUG: instance is not set");
742 match metering::get_remaining_points(store, instance) {
743 metering::MeteringPoints::Remaining(_) => false,
744 metering::MeteringPoints::Exhausted => true,
745 }
746 }
747
748 fn gas_to_breath(&self, store: &mut impl AsStoreMut) -> u64 {
749 let instance = self.instance.as_ref().expect("BUG: instance is not set");
750 match metering::get_remaining_points(store, instance) {
751 metering::MeteringPoints::Remaining(v) => v,
752 metering::MeteringPoints::Exhausted => 0,
753 }
754 }
755
756 fn set_gas_to_breath(&self, store: &mut impl AsStoreMut, gas: u64) {
757 let instance = self.instance.as_ref().expect("BUG: instance is not set");
758 metering::set_remaining_points(store, instance, gas);
759 }
760
761 fn pay(&mut self, store: &mut impl AsStoreMut, cost: u64) -> Result<(), OcallAborted> {
762 let gas = self.gas_to_breath(store);
763 if cost > gas {
764 return Err(OcallAborted::Stifled);
765 }
766 self.set_gas_to_breath(store, gas - cost);
767 Ok(())
768 }
769}
770
771fn is_ip(host: &str) -> bool {
772 host.parse::<std::net::IpAddr>().is_ok()
773}
774
775async fn tcp_connect(host: &str, port: u16) -> io::Result<TcpStream> {
776 fn get_proxy(key: &str) -> Option<String> {
777 std::env::var(key).ok().and_then(|uri| {
778 if uri.trim().is_empty() {
779 None
780 } else {
781 Some(uri)
782 }
783 })
784 }
785
786 let proxy_url = if host.ends_with(".i2p") {
787 get_proxy("i2p_proxy")
788 } else {
789 None
790 };
791
792 if let Some(proxy_url) = proxy_url.or_else(|| get_proxy("all_proxy")) {
793 phala_tokio_proxy::connect((host, port), proxy_url).await
794 } else if is_ip(host) {
795 TcpStream::connect((host, port)).await
796 } else {
797 let resolver = trust_dns_resolver::TokioAsyncResolver::tokio_from_system_conf()
801 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
802 let ips = resolver
803 .lookup_ip(host)
804 .await
805 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
806 let mut last_err = None;
807 for ip in ips {
808 match TcpStream::connect((ip, port)).await {
809 Ok(stream) => return Ok(stream),
810 Err(e) => last_err = Some(e),
811 }
812 }
813 match last_err {
814 Some(e) => Err(e),
815 None => Err(io::Error::new(
816 io::ErrorKind::Other,
817 "DNS: No address found",
818 )),
819 }
820 }
821}
822
823fn sidevm_ocall_fast_return(
824 func_env: FunctionEnvMut<Env>,
825 task_id: i32,
826 func_id: i32,
827 p0: IntPtr,
828 p1: IntPtr,
829 p2: IntPtr,
830 p3: IntPtr,
831) -> Result<IntRet, OcallAborted> {
832 do_ocall(func_env, task_id, func_id, p0, p1, p2, p3, true)
833}
834
835#[allow(clippy::too_many_arguments)]
837fn sidevm_ocall(
838 func_env: FunctionEnvMut<Env>,
839 task_id: i32,
840 func_id: i32,
841 p0: IntPtr,
842 p1: IntPtr,
843 p2: IntPtr,
844 p3: IntPtr,
845) -> Result<IntRet, OcallAborted> {
846 do_ocall(func_env, task_id, func_id, p0, p1, p2, p3, false)
847}
848
849#[allow(clippy::too_many_arguments)]
850#[tracing::instrument(name="ocall", fields(tid=task_id), skip_all)]
851fn do_ocall(
852 mut func_env: FunctionEnvMut<Env>,
853 task_id: i32,
854 func_id: i32,
855 p0: IntPtr,
856 p1: IntPtr,
857 p2: IntPtr,
858 p3: IntPtr,
859 fast_return: bool,
860) -> Result<IntRet, OcallAborted> {
861 let inner = func_env.data().inner.clone();
862 let mut guard = inner.lock().unwrap();
863 let env = &mut *guard;
864
865 env.current_task = task_id;
866 let result = set_task_env(env.awake_tasks.clone(), task_id, || {
867 let memory = env.memory.unwrap_ref().clone();
868 let vm = MemoryView(memory.view(&func_env));
869
870 unsafe fn translife<'b, T>(m: MemoryView<'_>, _l: &'b T) -> MemoryView<'b> {
891 std::mem::transmute(m)
892 }
893 let vm = unsafe { translife(vm, &memory) };
894 let mut state = env.make_mut(&mut func_env);
895 env::dispatch_ocall(fast_return, &mut state, &vm, func_id, p0, p1, p2, p3)
896 });
897
898 if env.ocall_trace_enabled {
899 let func_name = env::ocall_id2name(func_id);
900 tracing::trace!(target: "sidevm", "{func_name}({p0}, {p1}, {p2}, {p3}) = {result:?}");
901 }
902 convert(result)
903}
904
905fn convert(result: Result<i32, OcallError>) -> Result<IntRet, OcallAborted> {
906 match result {
907 Err(OcallError::GasExhausted) => Err(OcallAborted::GasExhausted),
908 Err(OcallError::Stifled) => Err(OcallAborted::Stifled),
909 _ => Ok(result.encode_ret()),
910 }
911}
912
913#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
914pub enum OcallAborted {
915 GasExhausted,
916 Stifled,
917}
918
919impl From<OcallAborted> for OcallError {
920 fn from(aborted: OcallAborted) -> Self {
921 match aborted {
922 OcallAborted::GasExhausted => OcallError::GasExhausted,
923 OcallAborted::Stifled => OcallError::Stifled,
924 }
925 }
926}
927
928impl fmt::Display for OcallAborted {
929 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
930 match self {
931 OcallAborted::GasExhausted => write!(f, "Gas exhausted"),
932 OcallAborted::Stifled => write!(f, "Stifled"),
933 }
934 }
935}
936
937impl std::error::Error for OcallAborted {}
938
939pub use vm_counter::vm_count;
940mod vm_counter {
941 use std::sync::atomic::{AtomicUsize, Ordering};
942
943 pub fn vm_count() -> usize {
944 Counter::current()
945 }
946
947 static COUNTER: AtomicUsize = AtomicUsize::new(0);
948 pub struct Counter(());
949 impl Counter {
950 pub fn current() -> usize {
951 COUNTER.load(Ordering::Relaxed)
952 }
953 }
954 impl Default for Counter {
955 fn default() -> Self {
956 COUNTER.fetch_add(1, Ordering::Relaxed);
957 Self(())
958 }
959 }
960 impl Drop for Counter {
961 fn drop(&mut self) {
962 COUNTER.fetch_sub(1, Ordering::Relaxed);
963 }
964 }
965}