sidevm/
channel.rs

1//! Multi-producer, single-consumer channel implementation.
2use sidevm_env::{
3    messages::{AccountId, QueryRequest, SystemMessage},
4    InputChannel, OcallError,
5};
6
7use super::{ocall, ResourceId};
8use scale::{Decode, Error as CodecError};
9use std::{
10    future::Future,
11    pin::Pin,
12    task::{Context, Poll},
13};
14
15use lazy_static::lazy_static;
16
17/// A query from an external RPC request.
18pub struct Query {
19    /// The account sending the query.
20    pub origin: Option<AccountId>,
21    /// The query payload.
22    pub payload: Vec<u8>,
23    /// The reply channel. Invoke `send` on this channel to send the reply.
24    pub reply_tx: OneshotSender,
25}
26
27/// A message from ink! to the side VM.
28pub type GeneralMessage = Vec<u8>;
29
30/// Sender end of a oneshot channel connected to host-side.
31pub struct OneshotSender {
32    res_id: ResourceId,
33}
34
35impl OneshotSender {
36    const fn new(res_id: ResourceId) -> Self {
37        Self { res_id }
38    }
39
40    /// Send a message to the host-side.
41    pub fn send(self, data: &[u8]) -> Result<(), OcallError> {
42        ocall::oneshot_send(self.res_id.0, data)
43    }
44}
45
46/// Receiver end of a channel connected to host-side.
47pub struct Receiver<M> {
48    res_id: ResourceId,
49    _marker: std::marker::PhantomData<M>,
50}
51
52/// The future to get the next message from the channel.
53pub struct Next<'a, M> {
54    ch: &'a Receiver<M>,
55}
56
57impl<T> Receiver<T> {
58    /// Create a new `Receiver` from a `ResourceId`.
59    pub const fn new(res_id: ResourceId) -> Self {
60        Self {
61            res_id,
62            _marker: std::marker::PhantomData,
63        }
64    }
65
66    /// Get the next message from the channel.
67    pub fn next(&self) -> Next<T> {
68        Next { ch: self }
69    }
70}
71
72impl Future for Next<'_, GeneralMessage> {
73    type Output = Option<GeneralMessage>;
74
75    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
76        let waker_id = crate::env::tasks::intern_waker(cx.waker().clone());
77        match ocall::poll(waker_id, self.ch.res_id.0) {
78            Ok(msg) => Poll::Ready(Some(msg)),
79            Err(OcallError::EndOfFile) => Poll::Ready(None), // tx dropped
80            Err(OcallError::Pending) => Poll::Pending,
81            Err(err) => panic!("unexpected error: {:?}", err),
82        }
83    }
84}
85
86impl Future for Next<'_, SystemMessage> {
87    type Output = Option<Result<SystemMessage, CodecError>>;
88
89    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
90        let waker_id = crate::env::tasks::intern_waker(cx.waker().clone());
91        match ocall::poll(waker_id, self.ch.res_id.0) {
92            Ok(msg) => Poll::Ready(Some(SystemMessage::decode(&mut &msg[..]))),
93            Err(OcallError::EndOfFile) => Poll::Ready(None), // The tx dropped
94            Err(OcallError::Pending) => Poll::Pending,
95            Err(err) => panic!("unexpected error: {:?}", err),
96        }
97    }
98}
99
100impl Future for Next<'_, Query> {
101    type Output = Option<Query>;
102
103    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
104        let waker_id = crate::env::tasks::intern_waker(cx.waker().clone());
105        match ocall::poll(waker_id, self.ch.res_id.0) {
106            Ok(msg) => {
107                let request =
108                    QueryRequest::decode(&mut &msg[..]).expect("Failed to decode QueryRequest");
109                let reply_tx = OneshotSender::new(ResourceId(request.reply_tx));
110                Poll::Ready(Some(Query {
111                    origin: request.origin,
112                    payload: request.payload,
113                    reply_tx,
114                }))
115            }
116            Err(OcallError::EndOfFile) => Poll::Ready(None), // The tx dropped
117            Err(OcallError::Pending) => Poll::Pending,
118            Err(err) => panic!("unexpected error: {:?}", err),
119        }
120    }
121}
122
123macro_rules! singleton_channel {
124    ($ch: ident) => {{
125        lazy_static! {
126            static ref RX: Receiver<$ch> = {
127                let res_id = ocall::create_input_channel(InputChannel::$ch)
128                    .expect("Failed to create input channel");
129                Receiver::new(ResourceId(res_id))
130            };
131        }
132        &*RX
133    }};
134}
135
136/// The Pink standard input messages channel. Think of it as a stdin of a normal process.
137///
138/// When the sidevm instance is being killed, the tx in the runtime is droped while the instance is
139/// running. At this time the rx-end might receive a None which indicate the tx-end has been closed.
140pub fn input_messages() -> &'static Receiver<GeneralMessage> {
141    singleton_channel!(GeneralMessage)
142}
143
144/// Receive system messages such as log messages from other contracts if this contract has been set
145/// as log handler in the cluster.
146pub fn incoming_system_messages() -> &'static Receiver<SystemMessage> {
147    singleton_channel!(SystemMessage)
148}
149
150/// Queries from RPC channel.
151pub fn incoming_queries() -> &'static Receiver<Query> {
152    singleton_channel!(Query)
153}