sidevm_host_runtime/
env.rs

1use std::{
2    borrow::Cow,
3    cell::Cell,
4    collections::VecDeque,
5    fmt,
6    future::Future,
7    io,
8    ops::{Deref, DerefMut},
9    sync::{Arc, Mutex},
10    task::Poll::{Pending, Ready},
11    time::Duration,
12};
13
14use anyhow::Context;
15use serde::{Deserialize, Serialize};
16use tokio::{
17    net::{TcpListener, TcpStream},
18    sync::{
19        mpsc::{error::TrySendError, Sender},
20        oneshot,
21    },
22    sync::{oneshot::Sender as OneshotSender, Semaphore},
23};
24use tracing::{error, info, warn, Instrument, Span};
25use wasmer::{
26    self, imports, AsStoreMut, Function, FunctionEnv, FunctionEnvMut, Imports, Instance, Memory,
27    Store, StoreMut,
28};
29
30use env::{
31    messages::{AccountId, HttpRequest, HttpResponseHead, QueryRequest, SystemMessage},
32    tls::{TlsClientConfig, TlsServerConfig},
33    IntPtr, IntRet, OcallError, Result, RetEncode,
34};
35use scale::{Decode, Encode};
36use sidevm_env as env;
37use thread_local::ThreadLocal;
38use wasmer_middlewares::metering;
39
40use crate::{
41    async_context::{get_task_cx, set_task_env, GuestWaker},
42    resource::{Resource, ResourceKeeper, TcpListenerResource},
43    tls::{load_tls_config, TlsStream},
44    IncomingHttpRequest, VmId,
45};
46
47mod wasi_env;
48
49pub struct FnEnvMut<'a, T> {
50    store: StoreMut<'a>,
51    inner: T,
52}
53
54impl<'a, T> Deref for FnEnvMut<'a, T> {
55    type Target = T;
56
57    fn deref(&self) -> &Self::Target {
58        &self.inner
59    }
60}
61
62impl<'a, T> DerefMut for FnEnvMut<'a, T> {
63    fn deref_mut(&mut self) -> &mut Self::Target {
64        &mut self.inner
65    }
66}
67
68impl<'a, T> FnEnvMut<'a, T> {
69    pub fn new(store: &'a mut impl AsStoreMut, value: T) -> Self {
70        Self {
71            store: store.as_store_mut(),
72            inner: value,
73        }
74    }
75}
76
77pub struct ShortId<T>(pub T);
78
79impl<T: AsRef<[u8]>> fmt::Display for ShortId<T> {
80    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81        let len = self.0.as_ref().len();
82        hex_fmt::HexFmt(&self.0.as_ref()[..len.min(6)]).fmt(f)
83    }
84}
85
86// Let the compiler check IntPtr is 32bit sized.
87fn _sizeof_i32_must_eq_to_intptr() {
88    let _ = core::mem::transmute::<i32, IntPtr>;
89}
90
91pub fn create_env(
92    id: VmId,
93    store: &mut Store,
94    cache_ops: DynCacheOps,
95    out_tx: OutgoingRequestChannel,
96    log_handler: Option<LogHandler>,
97    args: Vec<String>,
98) -> (Env, Imports) {
99    let raw_env = Env::new(id, cache_ops, out_tx, log_handler, args);
100    let env = FunctionEnv::new(store, raw_env.clone());
101    let wasi_imports = wasi_env::wasi_imports(store, &env);
102    (
103        raw_env,
104        imports! {
105            "env" => {
106                "sidevm_ocall" => Function::new_typed_with_env(
107                    store,
108                    &env,
109                    sidevm_ocall,
110                ),
111                "sidevm_ocall_fast_return" => Function::new_typed_with_env(
112                    store,
113                    &env,
114                    sidevm_ocall_fast_return,
115                ),
116            },
117            "wasi_snapshot_preview1" => wasi_imports,
118        },
119    )
120}
121
122pub(crate) struct TaskSet {
123    awake_tasks: dashmap::DashSet<i32>,
124    /// Guest waker ids that are ready to be woken up, or to be dropped if negative.
125    pub(crate) awake_wakers: Mutex<VecDeque<i32>>,
126}
127
128impl TaskSet {
129    fn with_task0() -> Self {
130        let awake_tasks = dashmap::DashSet::new();
131        awake_tasks.insert(0);
132        Self {
133            awake_tasks,
134            awake_wakers: Default::default(),
135        }
136    }
137
138    pub(crate) fn push_task(&self, task_id: i32) {
139        self.awake_tasks.insert(task_id);
140    }
141
142    pub(crate) fn pop_task(&self) -> Option<i32> {
143        let item = self.awake_tasks.iter().next().map(|task_id| *task_id);
144        match item {
145            Some(task_id) => {
146                self.awake_tasks.remove(&task_id);
147                Some(task_id)
148            }
149            None => None,
150        }
151    }
152
153    pub(crate) fn is_empty(&self) -> bool {
154        self.awake_tasks.is_empty() && self.awake_wakers.lock().unwrap().is_empty()
155    }
156}
157
158pub trait CacheOps {
159    fn get(&self, contract: &[u8], key: &[u8]) -> Result<Option<Vec<u8>>>;
160    fn set(&self, contract: &[u8], key: &[u8], value: &[u8]) -> Result<()>;
161    fn set_expiration(&self, contract: &[u8], key: &[u8], expire_after_secs: u64) -> Result<()>;
162    fn remove(&self, contract: &[u8], key: &[u8]) -> Result<Option<Vec<u8>>>;
163}
164
165pub type DynCacheOps = &'static (dyn CacheOps + Send + Sync);
166pub type LogHandler = Box<dyn Fn(VmId, u8, &str) + Send + Sync>;
167
168pub type OutgoingRequestChannel = Sender<(VmId, OutgoingRequest)>;
169
170pub enum OutgoingRequest {
171    Query {
172        contract_id: [u8; 32],
173        payload: Vec<u8>,
174        reply_tx: OneshotSender<Vec<u8>>,
175    },
176    // Used by Js Engine to send js eval result
177    Output(Vec<u8>),
178}
179
180struct VmMemory(Option<Memory>);
181
182pub(crate) struct EnvInner {
183    memory: VmMemory,
184    id: VmId,
185    gas_per_breath: u64,
186    resources: ResourceKeeper,
187    temp_return_value: ThreadLocal<Cell<Option<Vec<u8>>>>,
188    ocall_trace_enabled: bool,
189    message_tx: Option<Sender<Vec<u8>>>,
190    query_tx: Option<Sender<Vec<u8>>>,
191    sys_message_tx: Option<Sender<Vec<u8>>>,
192    http_connect_tx: Option<Sender<Vec<u8>>>,
193    awake_tasks: Arc<TaskSet>,
194    current_task: i32,
195    cache_ops: DynCacheOps,
196    weight: u32,
197    instance: Option<Instance>,
198    outgoing_query_guard: Arc<Semaphore>,
199    outgoing_request_tx: OutgoingRequestChannel,
200    log_handler: Option<LogHandler>,
201    _counter: vm_counter::Counter,
202    args: Vec<String>,
203}
204
205impl VmMemory {
206    pub(crate) fn unwrap_ref(&self) -> &Memory {
207        self.0.as_ref().expect("memory is not initialized")
208    }
209}
210
211struct MemoryView<'a>(wasmer::MemoryView<'a>);
212
213impl<'a> Deref for MemoryView<'a> {
214    type Target = wasmer::MemoryView<'a>;
215
216    fn deref(&self) -> &Self::Target {
217        &self.0
218    }
219}
220
221impl<'a> MemoryView<'a> {
222    fn check_addr(&self, offset: usize, len: usize) -> Result<(usize, usize)> {
223        let end = offset.checked_add(len).ok_or(OcallError::InvalidAddress)?;
224        if end > self.size().bytes().0 {
225            return Err(OcallError::InvalidAddress);
226        }
227        Ok((offset, end))
228    }
229}
230
231impl<'a> env::VmMemory for MemoryView<'a> {
232    fn copy_to_vm(&self, data: &[u8], ptr: IntPtr) -> Result<()> {
233        if data.len() > u32::MAX as usize {
234            return Err(OcallError::NoMemory);
235        }
236        self.write(ptr as _, data)
237            .or(Err(OcallError::InvalidAddress))?;
238        Ok(())
239    }
240
241    fn slice_from_vm(&self, ptr: IntPtr, len: IntPtr) -> Result<&[u8]> {
242        let (offset, end) = self.check_addr(ptr as _, len as _)?;
243        let slice = unsafe { &self.data_unchecked()[offset..end] };
244        Ok(slice)
245    }
246
247    fn slice_from_vm_mut(&self, ptr: IntPtr, len: IntPtr) -> Result<&mut [u8]> {
248        let (offset, end) = self.check_addr(ptr as _, len as _)?;
249        let slice = unsafe { &mut self.data_unchecked_mut()[offset..end] };
250        Ok(slice)
251    }
252}
253
254#[derive(Clone)]
255pub struct Env {
256    pub(crate) inner: Arc<Mutex<EnvInner>>,
257}
258
259impl Env {
260    fn new(
261        id: VmId,
262        cache_ops: DynCacheOps,
263        outgoing_request_tx: OutgoingRequestChannel,
264        log_handler: Option<LogHandler>,
265        args: Vec<String>,
266    ) -> Self {
267        Self {
268            inner: Arc::new(Mutex::new(EnvInner {
269                memory: VmMemory(None),
270                id,
271                gas_per_breath: 0,
272                resources: Default::default(),
273                temp_return_value: Default::default(),
274                ocall_trace_enabled: false,
275                message_tx: None,
276                sys_message_tx: None,
277                query_tx: None,
278                http_connect_tx: None,
279                awake_tasks: Arc::new(TaskSet::with_task0()),
280                current_task: 0,
281                cache_ops,
282                weight: 1,
283                instance: None,
284                outgoing_query_guard: Arc::new(Semaphore::new(1)),
285                outgoing_request_tx,
286                log_handler,
287                _counter: Default::default(),
288                args,
289            })),
290        }
291    }
292
293    pub fn set_memory(&self, memory: Memory) {
294        self.inner.lock().unwrap().memory.0 = Some(memory);
295    }
296
297    pub fn cleanup(&self) {
298        // Cut up the reference cycle to avoid leaks.
299        self.inner.lock().unwrap().memory.0 = None;
300    }
301
302    /// Push a pink message into the Sidevm instance.
303    pub fn push_message(&self, message: Vec<u8>) -> Option<Result<(), TrySendError<Vec<u8>>>> {
304        let tx = self.inner.lock().unwrap().message_tx.clone()?;
305        Some(tx.try_send(message))
306    }
307
308    /// Push a pink system message into the Sidevm instance.
309    pub fn push_system_message(
310        &self,
311        message: SystemMessage,
312    ) -> Option<Result<(), TrySendError<Vec<u8>>>> {
313        let tx = self.inner.lock().unwrap().sys_message_tx.clone()?;
314        Some(tx.try_send(message.encode()))
315    }
316
317    /// Push a contract query to the Sidevm instance.
318    pub fn push_query(
319        &self,
320        origin: Option<AccountId>,
321        payload: Vec<u8>,
322        reply_tx: OneshotSender<Vec<u8>>,
323    ) -> Option<impl Future<Output = anyhow::Result<()>>> {
324        let mut env_guard = self.inner.lock().unwrap();
325        let tx = env_guard.query_tx.clone()?;
326        let reply_tx = env_guard
327            .resources
328            .push(Resource::OneshotTx(Some(reply_tx)));
329        let inner = Arc::downgrade(&self.inner);
330        Some(async move {
331            let reply_tx = reply_tx?;
332            let query = QueryRequest {
333                origin,
334                payload,
335                reply_tx,
336            };
337            let result = tx.send(query.encode()).await;
338            if result.is_err() {
339                if let Some(inner) = inner.upgrade() {
340                    let mut env_guard = inner.lock().unwrap();
341                    let _ = env_guard.close(reply_tx);
342                }
343            }
344            result?;
345            Ok(())
346        })
347    }
348
349    pub fn set_gas_per_breath(&self, gas: u64) {
350        self.inner.lock().unwrap().gas_per_breath = gas;
351    }
352
353    pub fn reset_gas_to_breath(&self, store: &mut impl AsStoreMut) {
354        let guard = self.inner.lock().unwrap();
355        let instance = guard
356            .instance
357            .as_ref()
358            .expect("BUG: missing instance in env");
359        metering::set_remaining_points(store, instance, guard.gas_per_breath);
360    }
361
362    pub fn has_more_ready(&self) -> bool {
363        !self.inner.lock().unwrap().awake_tasks.is_empty()
364    }
365
366    pub fn weight(&self) -> u32 {
367        self.inner.lock().unwrap().weight
368    }
369
370    pub fn set_weight(&self, weight: u32) {
371        let mut inner = self.inner.lock().unwrap();
372        inner.weight = weight;
373        tracing::debug!(target: "sidevm", weight, "Weight updated");
374    }
375
376    pub fn set_instance(&self, instance: Instance) {
377        self.inner.lock().unwrap().instance = Some(instance);
378    }
379
380    pub fn is_stifled(&self, store: &mut impl AsStoreMut) -> bool {
381        self.inner.lock().unwrap().is_stifled(store)
382    }
383
384    pub fn memory(&self) -> Memory {
385        self.inner
386            .lock()
387            .unwrap()
388            .memory
389            .0
390            .as_ref()
391            .expect("BUG: missing memory in env")
392            .clone()
393    }
394
395    /// Establish a incoming HTTP connection.
396    pub fn push_http_request(
397        &self,
398        request: IncomingHttpRequest,
399    ) -> Option<impl Future<Output = anyhow::Result<()>>> {
400        let IncomingHttpRequest {
401            head,
402            body_stream,
403            response_tx,
404        } = request;
405        let mut env_guard = self.inner.lock().unwrap();
406        let connect_tx = env_guard.http_connect_tx.clone()?;
407        let (reply_tx, reply_rx) = oneshot::channel();
408        let reply_tx = env_guard
409            .resources
410            .push(Resource::OneshotTx(Some(reply_tx)));
411        tokio::spawn(
412            async move {
413                let reply = reply_rx.await;
414                let reply = reply
415                    .context("Failed to receive http response")
416                    .and_then(|bytes| {
417                        let response = HttpResponseHead::decode(&mut &bytes[..])?;
418                        Ok(response)
419                    });
420                if response_tx.send(reply).is_err() {
421                    info!(target: "sidevm", "Failed to send http response");
422                }
423            }
424            .instrument(Span::current()),
425        );
426        let body_stream = env_guard
427            .resources
428            .push(Resource::DuplexStream(body_stream));
429        let inner = Arc::downgrade(&self.inner);
430        Some(async move {
431            let response_tx = reply_tx?;
432            let body_stream = body_stream?;
433            let query = HttpRequest {
434                head,
435                response_tx,
436                io_stream: body_stream,
437            };
438            let result = connect_tx.send(query.encode()).await;
439            if result.is_err() {
440                if let Some(inner) = inner.upgrade() {
441                    let mut env_guard = inner.lock().unwrap();
442                    let _ = env_guard.close(response_tx);
443                    let _ = env_guard.close(body_stream);
444                }
445            }
446            result?;
447            Ok(())
448        })
449    }
450
451    pub fn with_args<T>(&self, f: impl FnOnce(&[String]) -> T) -> T {
452        f(&self.inner.lock().unwrap().args)
453    }
454}
455
456impl<'a, 'b> env::OcallEnv for FnEnvMut<'a, &'b mut EnvInner> {
457    fn put_return(&mut self, rv: Vec<u8>) -> usize {
458        let len = rv.len();
459        self.temp_return_value.get_or_default().set(Some(rv));
460        len
461    }
462
463    fn take_return(&mut self) -> Option<Vec<u8>> {
464        self.temp_return_value.get_or_default().take()
465    }
466}
467
468impl<'a, 'b> env::OcallFuncs for FnEnvMut<'a, &'b mut EnvInner> {
469    fn close(&mut self, resource_id: i32) -> Result<()> {
470        self.inner.close(resource_id)
471    }
472
473    fn poll(&mut self, waker_id: i32, resource_id: i32) -> Result<Vec<u8>> {
474        self.resources.get_mut(resource_id)?.poll(waker_id)
475    }
476
477    fn poll_read(&mut self, waker_id: i32, resource_id: i32, data: &mut [u8]) -> Result<u32> {
478        self.resources
479            .get_mut(resource_id)?
480            .poll_read(waker_id, data)
481    }
482
483    fn poll_write(&mut self, waker_id: i32, resource_id: i32, data: &[u8]) -> Result<u32> {
484        self.resources
485            .get_mut(resource_id)?
486            .poll_write(waker_id, data)
487    }
488
489    fn poll_shutdown(&mut self, waker_id: i32, resource_id: i32) -> Result<()> {
490        self.resources.get_mut(resource_id)?.poll_shutdown(waker_id)
491    }
492
493    fn poll_res(&mut self, waker_id: i32, resource_id: i32) -> Result<i32> {
494        let res = self.resources.get_mut(resource_id)?.poll_res(waker_id)?;
495        self.resources.push(res)
496    }
497
498    fn mark_task_ready(&mut self, task_id: i32) -> Result<()> {
499        self.awake_tasks.push_task(task_id);
500        Ok(())
501    }
502
503    fn next_ready_task(&mut self) -> Result<i32> {
504        self.awake_tasks.pop_task().ok_or(OcallError::NotFound)
505    }
506
507    fn create_timer(&mut self, timeout: i32) -> Result<i32> {
508        let sleep = tokio::time::sleep(Duration::from_millis(timeout as u64));
509        self.resources.push(Resource::Sleep(Box::pin(sleep)))
510    }
511
512    fn enable_ocall_trace(&mut self, enable: bool) -> Result<()> {
513        self.ocall_trace_enabled = enable;
514        Ok(())
515    }
516
517    fn tcp_listen(&mut self, addr: Cow<str>, tls_config: Option<TlsServerConfig>) -> Result<i32> {
518        let std_listener = std::net::TcpListener::bind(&*addr).or(Err(OcallError::IoError))?;
519        std_listener
520            .set_nonblocking(true)
521            .or(Err(OcallError::IoError))?;
522        let listener = TcpListener::from_std(std_listener).or(Err(OcallError::IoError))?;
523        let tls_config = tls_config.map(load_tls_config).transpose()?.map(Arc::new);
524        self.resources
525            .push(Resource::TcpListener(Box::new(TcpListenerResource {
526                listener,
527                tls_config,
528            })))
529    }
530
531    fn tcp_accept(&mut self, waker_id: i32, tcp_res_id: i32) -> Result<(i32, String)> {
532        let waker = GuestWaker::from_id(waker_id);
533        let (res, remote_addr) = {
534            let res = self.resources.get_mut(tcp_res_id)?;
535            let res = match res {
536                Resource::TcpListener(res) => res,
537                _ => return Err(OcallError::UnsupportedOperation),
538            };
539            let (stream, addr) = match get_task_cx(waker, |ct| res.listener.poll_accept(ct)) {
540                Pending => return Err(OcallError::Pending),
541                Ready(result) => result.or(Err(OcallError::IoError))?,
542            };
543            let res = match &res.tls_config {
544                Some(tls_config) => {
545                    Resource::TlsStream(Box::new(TlsStream::accept(stream, tls_config.clone())))
546                }
547                None => Resource::TcpStream(Box::new(stream)),
548            };
549            (res, addr)
550        };
551        self.resources
552            .push(res)
553            .map(|res_id| (res_id, remote_addr.to_string()))
554    }
555
556    fn tcp_accept_no_addr(&mut self, waker_id: i32, resource_id: i32) -> Result<i32> {
557        self.tcp_accept(waker_id, resource_id)
558            .map(|(res_id, _)| res_id)
559    }
560
561    fn tcp_connect(&mut self, host: &str, port: u16) -> Result<i32> {
562        if host.len() > 253 {
563            return Err(OcallError::InvalidParameter);
564        }
565        let host = host.to_owned();
566        let fut = async move { tcp_connect(&host, port).await };
567        self.resources.push(Resource::TcpConnect(Box::pin(fut)))
568    }
569
570    fn tcp_connect_tls(&mut self, host: String, port: u16, config: TlsClientConfig) -> Result<i32> {
571        if host.len() > 253 {
572            return Err(OcallError::InvalidParameter);
573        }
574        let TlsClientConfig::V0 = config;
575        let domain = host
576            .as_str()
577            .try_into()
578            .or(Err(OcallError::InvalidParameter))?;
579        let fut = async move {
580            tcp_connect(&host, port)
581                .await
582                .map(move |stream| TlsStream::connect(domain, stream))
583        };
584        self.resources.push(Resource::TlsConnect(Box::pin(fut)))
585    }
586
587    fn log(&mut self, level: log::Level, message: &str) -> Result<()> {
588        log::log!(target: "sidevm", level, "{message}");
589        if let Some(log_handler) = &self.log_handler {
590            log_handler(self.id, level as u8, message);
591        }
592        Ok(())
593    }
594
595    fn local_cache_get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
596        self.cache_ops.get(&self.id[..], key)
597    }
598
599    fn local_cache_set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
600        self.cache_ops.set(&self.id[..], key, value)
601    }
602
603    fn local_cache_set_expiration(&mut self, key: &[u8], expire_after_secs: u64) -> Result<()> {
604        self.cache_ops
605            .set_expiration(&self.id[..], key, expire_after_secs)
606    }
607
608    fn local_cache_remove(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
609        self.cache_ops.remove(&self.id[..], key)
610    }
611
612    fn awake_wakers(&mut self) -> Result<Vec<i32>> {
613        Ok(self
614            .awake_tasks
615            .awake_wakers
616            .lock()
617            .unwrap()
618            .drain(..)
619            .collect())
620    }
621
622    fn getrandom(&mut self, buf: &mut [u8]) -> Result<()> {
623        use rand::RngCore;
624        const RANDOM_BYTE_WEIGHT: usize = 1_000_000;
625
626        self.inner
627            .pay(&mut self.store, (RANDOM_BYTE_WEIGHT * buf.len()) as _)?;
628        rand::thread_rng().fill_bytes(buf);
629        Ok(())
630    }
631
632    fn oneshot_send(&mut self, resource_id: i32, data: &[u8]) -> Result<()> {
633        let res = self.resources.get_mut(resource_id)?;
634        match res {
635            Resource::OneshotTx(sender) => match sender.take() {
636                Some(sender) => sender.send(data.to_vec()).or(Err(OcallError::IoError))?,
637                None => return Err(OcallError::IoError),
638            },
639            _ => return Err(OcallError::UnsupportedOperation),
640        }
641        Ok(())
642    }
643
644    fn create_input_channel(&mut self, ch: env::InputChannel) -> Result<i32> {
645        use env::InputChannel::*;
646        macro_rules! create_channel {
647            ($field: expr) => {{
648                if $field.is_some() {
649                    return Err(OcallError::AlreadyExists);
650                }
651                let (tx, rx) = tokio::sync::mpsc::channel(20);
652                let res = self.resources.push(Resource::ChannelRx(rx))?;
653                $field = Some(tx);
654                Ok(res)
655            }};
656        }
657        match ch {
658            GeneralMessage => create_channel!(self.message_tx),
659            SystemMessage => create_channel!(self.sys_message_tx),
660            Query => create_channel!(self.query_tx),
661            HttpRequest => create_channel!(self.http_connect_tx),
662        }
663    }
664
665    fn gas_remaining(&mut self) -> Result<u8> {
666        self.inner.pay(&mut self.store, 1_000_000)?;
667        Ok(if self.gas_per_breath == 0 {
668            100
669        } else {
670            (self.inner.gas_to_breath(&mut self.store) * 100 / self.gas_per_breath) as u8
671        })
672    }
673
674    fn query_local_contract(&mut self, contract_id: [u8; 32], payload: Vec<u8>) -> Result<i32> {
675        let sem = self
676            .inner
677            .outgoing_query_guard
678            .clone()
679            .try_acquire_owned()
680            .or(Err(OcallError::ResourceLimited))?;
681        let (res_tx, res_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(1);
682        let res_id = self.resources.push(Resource::ChannelRx(res_rx))?;
683        let (reply_tx, reply_rx) = oneshot::channel();
684        let request = OutgoingRequest::Query {
685            contract_id,
686            payload,
687            reply_tx,
688        };
689        let from = self.inner.id;
690        self.inner
691            .outgoing_request_tx
692            .try_send((from, request))
693            .or(Err(OcallError::IoError))?;
694        tokio::spawn(async move {
695            let _sem = sem;
696            let result = match reply_rx.await {
697                Ok(reply) => res_tx.send(reply).await,
698                Err(_) => {
699                    warn!(target: "sidevm", "Failed to receive query result");
700                    res_tx.send(Vec::new()).await
701                }
702            };
703            if result.is_err() {
704                error!(target: "sidevm", "Failed to send query result");
705            }
706        });
707        Ok(res_id)
708    }
709
710    /// Returns the vmid of the current instance.
711    fn vmid(&mut self) -> Result<[u8; 32]> {
712        Ok(self.id)
713    }
714
715    fn emit_program_output(&mut self, output: &[u8]) -> Result<()> {
716        let from = self.inner.id;
717        let request = OutgoingRequest::Output(output.to_vec());
718        self.inner
719            .outgoing_request_tx
720            .try_send((from, request))
721            .or(Err(OcallError::IoError))
722    }
723}
724
725impl EnvInner {
726    pub(crate) fn make_mut<'a, 'b>(
727        &'a mut self,
728        store: &'b mut impl AsStoreMut,
729    ) -> FnEnvMut<'b, &'a mut Self> {
730        FnEnvMut::new(store, self)
731    }
732
733    pub(crate) fn close(&mut self, resource_id: i32) -> Result<()> {
734        match self.resources.take(resource_id) {
735            None => Err(OcallError::NotFound),
736            Some(_res) => Ok(()),
737        }
738    }
739
740    fn is_stifled(&mut self, store: &mut impl AsStoreMut) -> bool {
741        let instance = self.instance.as_ref().expect("BUG: instance is not set");
742        match metering::get_remaining_points(store, instance) {
743            metering::MeteringPoints::Remaining(_) => false,
744            metering::MeteringPoints::Exhausted => true,
745        }
746    }
747
748    fn gas_to_breath(&self, store: &mut impl AsStoreMut) -> u64 {
749        let instance = self.instance.as_ref().expect("BUG: instance is not set");
750        match metering::get_remaining_points(store, instance) {
751            metering::MeteringPoints::Remaining(v) => v,
752            metering::MeteringPoints::Exhausted => 0,
753        }
754    }
755
756    fn set_gas_to_breath(&self, store: &mut impl AsStoreMut, gas: u64) {
757        let instance = self.instance.as_ref().expect("BUG: instance is not set");
758        metering::set_remaining_points(store, instance, gas);
759    }
760
761    fn pay(&mut self, store: &mut impl AsStoreMut, cost: u64) -> Result<(), OcallAborted> {
762        let gas = self.gas_to_breath(store);
763        if cost > gas {
764            return Err(OcallAborted::Stifled);
765        }
766        self.set_gas_to_breath(store, gas - cost);
767        Ok(())
768    }
769}
770
/// Returns `true` when `host` is a literal IPv4 or IPv6 address.
fn is_ip(host: &str) -> bool {
    let parsed: Result<std::net::IpAddr, _> = host.parse();
    parsed.is_ok()
}
774
775async fn tcp_connect(host: &str, port: u16) -> io::Result<TcpStream> {
776    fn get_proxy(key: &str) -> Option<String> {
777        std::env::var(key).ok().and_then(|uri| {
778            if uri.trim().is_empty() {
779                None
780            } else {
781                Some(uri)
782            }
783        })
784    }
785
786    let proxy_url = if host.ends_with(".i2p") {
787        get_proxy("i2p_proxy")
788    } else {
789        None
790    };
791
792    if let Some(proxy_url) = proxy_url.or_else(|| get_proxy("all_proxy")) {
793        phala_tokio_proxy::connect((host, port), proxy_url).await
794    } else if is_ip(host) {
795        TcpStream::connect((host, port)).await
796    } else {
797        // By default, tokio uses the blocking DNS resovler from libc and run them in a thread pool.
798        // That would cause problem such as run out of thread-pool in some poor network situation.
799        // So, we use trust-dns async resolver here.
800        let resolver = trust_dns_resolver::TokioAsyncResolver::tokio_from_system_conf()
801            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
802        let ips = resolver
803            .lookup_ip(host)
804            .await
805            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
806        let mut last_err = None;
807        for ip in ips {
808            match TcpStream::connect((ip, port)).await {
809                Ok(stream) => return Ok(stream),
810                Err(e) => last_err = Some(e),
811            }
812        }
813        match last_err {
814            Some(e) => Err(e),
815            None => Err(io::Error::new(
816                io::ErrorKind::Other,
817                "DNS: No address found",
818            )),
819        }
820    }
821}
822
823fn sidevm_ocall_fast_return(
824    func_env: FunctionEnvMut<Env>,
825    task_id: i32,
826    func_id: i32,
827    p0: IntPtr,
828    p1: IntPtr,
829    p2: IntPtr,
830    p3: IntPtr,
831) -> Result<IntRet, OcallAborted> {
832    do_ocall(func_env, task_id, func_id, p0, p1, p2, p3, true)
833}
834
835// Support all ocalls. Put the result into a temporary vec and wait for next fetch_result ocall to fetch the result.
836#[allow(clippy::too_many_arguments)]
837fn sidevm_ocall(
838    func_env: FunctionEnvMut<Env>,
839    task_id: i32,
840    func_id: i32,
841    p0: IntPtr,
842    p1: IntPtr,
843    p2: IntPtr,
844    p3: IntPtr,
845) -> Result<IntRet, OcallAborted> {
846    do_ocall(func_env, task_id, func_id, p0, p1, p2, p3, false)
847}
848
849#[allow(clippy::too_many_arguments)]
850#[tracing::instrument(name="ocall", fields(tid=task_id), skip_all)]
851fn do_ocall(
852    mut func_env: FunctionEnvMut<Env>,
853    task_id: i32,
854    func_id: i32,
855    p0: IntPtr,
856    p1: IntPtr,
857    p2: IntPtr,
858    p3: IntPtr,
859    fast_return: bool,
860) -> Result<IntRet, OcallAborted> {
861    let inner = func_env.data().inner.clone();
862    let mut guard = inner.lock().unwrap();
863    let env = &mut *guard;
864
865    env.current_task = task_id;
866    let result = set_task_env(env.awake_tasks.clone(), task_id, || {
867        let memory = env.memory.unwrap_ref().clone();
868        let vm = MemoryView(memory.view(&func_env));
869
870        // Safety:
871        //
872        // Started from wasmer 3.3, the lifetime of MemoryView is bound to the lifetime of the Store
873        // rather than bound to the Memory as before. The behavior before was unsound because the
874        // Memory contents could be changed. Especially when a grow operation happens, the Memory
875        // could be reallocated and the old MemoryView would be invalid.
876        //
877        // Why we need to transmute the lifetime here?
878        // To make the larger chunks of data passings between the host and the guest efficient, we
879        // decided to directly pass the data chunks to ocall functions by reference instead of
880        // copying them. For example, given a ocall defined as `fn tcp_read(fd: i32, data: &mut [u8])`,
881        // the parameter `data` is a reference to the guest memory rather than copied to a owned vec.
882        // Obviously, the lifetime of the reference is bound to the guest memory which stored in the
883        // instance of Store.
884        //
885        // It would be safe only when the following conditions are met:
886        //   1. The referred slice of guest memory is not changed by other codes during the ocall.
887        //   2. The entire guest memory is not reallocated during the ocall function call.
888        // Currently, we can guarantee both conditions are met by carefully implementing the ocall
889        // functions and make sure no guest reentrancy happens during the ocall function call.
890        unsafe fn translife<'b, T>(m: MemoryView<'_>, _l: &'b T) -> MemoryView<'b> {
891            std::mem::transmute(m)
892        }
893        let vm = unsafe { translife(vm, &memory) };
894        let mut state = env.make_mut(&mut func_env);
895        env::dispatch_ocall(fast_return, &mut state, &vm, func_id, p0, p1, p2, p3)
896    });
897
898    if env.ocall_trace_enabled {
899        let func_name = env::ocall_id2name(func_id);
900        tracing::trace!(target: "sidevm", "{func_name}({p0}, {p1}, {p2}, {p3}) = {result:?}");
901    }
902    convert(result)
903}
904
905fn convert(result: Result<i32, OcallError>) -> Result<IntRet, OcallAborted> {
906    match result {
907        Err(OcallError::GasExhausted) => Err(OcallAborted::GasExhausted),
908        Err(OcallError::Stifled) => Err(OcallAborted::Stifled),
909        _ => Ok(result.encode_ret()),
910    }
911}
912
913#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
914pub enum OcallAborted {
915    GasExhausted,
916    Stifled,
917}
918
919impl From<OcallAborted> for OcallError {
920    fn from(aborted: OcallAborted) -> Self {
921        match aborted {
922            OcallAborted::GasExhausted => OcallError::GasExhausted,
923            OcallAborted::Stifled => OcallError::Stifled,
924        }
925    }
926}
927
928impl fmt::Display for OcallAborted {
929    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
930        match self {
931            OcallAborted::GasExhausted => write!(f, "Gas exhausted"),
932            OcallAborted::Stifled => write!(f, "Stifled"),
933        }
934    }
935}
936
937impl std::error::Error for OcallAborted {}
938
pub use vm_counter::vm_count;
/// Process-wide count of live [`vm_counter::Counter`] values.
mod vm_counter {
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Number of `Counter` values currently alive.
    static COUNTER: AtomicUsize = AtomicUsize::new(0);

    /// Returns the number of `Counter` values currently alive.
    pub fn vm_count() -> usize {
        Counter::current()
    }

    /// Guard that counts itself: creating one through `Default` increments
    /// the global count, dropping it decrements the count again.
    pub struct Counter(());

    impl Counter {
        /// Returns the number of `Counter` values currently alive.
        pub fn current() -> usize {
            COUNTER.load(Ordering::Relaxed)
        }
    }

    impl Default for Counter {
        fn default() -> Self {
            let _previous = COUNTER.fetch_add(1, Ordering::Relaxed);
            Counter(())
        }
    }

    impl Drop for Counter {
        fn drop(&mut self) {
            let _previous = COUNTER.fetch_sub(1, Ordering::Relaxed);
        }
    }
}