// sidevm_host_runtime/service.rs

use crate::env::{DynCacheOps, OcallAborted};
use crate::run::{WasmEngine, WasmInstanceConfig};
use crate::{ShortId, VmId};
use anyhow::Result;
use phala_scheduler::TaskScheduler;
use serde::{Deserialize, Serialize};
use sidevm_env::messages::{AccountId, HttpHead, HttpResponseHead};
use std::future::Future;
use tokio::io::DuplexStream;
use tokio::{
    sync::mpsc::{channel, Receiver, Sender},
    sync::oneshot::Sender as OneshotSender,
    sync::watch::Receiver as WatchReceiver,
    task::JoinHandle,
};
use tracing::{debug, error, info, trace, warn, Instrument};

pub use sidevm_env::messages::{Metric, SystemMessage};
19pub type CommandSender = Sender<Command>;
20
21#[derive(Debug)]
22pub enum Report {
23    VmTerminated { id: VmId, reason: ExitReason },
24}
25
26#[derive(Debug, Clone, Copy, Serialize, Deserialize, derive_more::Display)]
27pub enum ExitReason {
28    /// The program returned from `fn main`.
29    Exited(i32),
30    /// Stopped by an external Stop command.
31    Stopped,
32    /// The input channel has been closed, likely caused by a Stop command.
33    InputClosed,
34    /// The program panicked.
35    Panicked,
36    /// The task future has beed dropped, likely caused by a Stop command.
37    Cancelled,
38    /// Terminated due to gas checking.
39    OcallAborted(OcallAborted),
40    /// When a previous running instance restored from a checkpoint.
41    Restore,
42    /// The sidevm was deployed without code, so it it waiting to a custom code uploading.
43    WaitingForCode,
44    /// The Code of the sidevm is too large.
45    CodeTooLarge,
46    /// Failed to create the sidevm instance.
47    FailedToStart,
48}
49
50pub enum Command {
51    // Stop the side VM instance.
52    Stop,
53    // Send a sidevm message to the instance.
54    PushMessage(Vec<u8>),
55    // Send a sidevm system message to the instance.
56    PushSystemMessage(SystemMessage),
57    // Push a query from RPC to the instance.
58    PushQuery {
59        origin: Option<AccountId>,
60        payload: Vec<u8>,
61        reply_tx: OneshotSender<Vec<u8>>,
62    },
63    // Update the task scheduling weight
64    UpdateWeight(u32),
65    // An incoming HTTP request
66    HttpRequest(IncomingHttpRequest),
67}
68
69pub struct IncomingHttpRequest {
70    pub(crate) head: HttpHead,
71    pub(crate) body_stream: DuplexStream,
72    pub(crate) response_tx: OneshotSender<anyhow::Result<HttpResponseHead>>,
73}
74
75pub struct ServiceRun {
76    runtime: tokio::runtime::Runtime,
77    report_rx: Receiver<Report>,
78}
79
80#[derive(Clone)]
81pub struct Spawner {
82    runtime_handle: tokio::runtime::Handle,
83    report_tx: Sender<Report>,
84    out_tx: crate::OutgoingRequestChannel,
85    scheduler: TaskScheduler<VmId>,
86}
87
88pub fn service(
89    worker_threads: usize,
90    out_tx: crate::OutgoingRequestChannel,
91) -> (ServiceRun, Spawner) {
92    let worker_threads = worker_threads.max(1);
93    let runtime = tokio::runtime::Builder::new_multi_thread()
94        .max_blocking_threads(16)
95        // Reason for the additional 2 threads:
96        // One for the blocking reactor thread, another one for receiving channel messages
97        // from the pink system
98        .worker_threads(worker_threads + 2)
99        .enable_all()
100        .build()
101        .unwrap();
102    let runtime_handle = runtime.handle().clone();
103    let (report_tx, report_rx) = channel(100);
104    let run = ServiceRun { runtime, report_rx };
105    let spawner = Spawner {
106        runtime_handle,
107        report_tx,
108        out_tx,
109        scheduler: TaskScheduler::new(worker_threads as _),
110    };
111    (run, spawner)
112}
113
114impl ServiceRun {
115    pub fn blocking_run(self, event_handler: impl FnMut(Report)) {
116        let handle = self.runtime.handle().clone();
117        handle.block_on(self.run(event_handler));
118    }
119
120    pub async fn run(mut self, mut event_handler: impl FnMut(Report)) {
121        loop {
122            match self.report_rx.recv().await {
123                None => {
124                    info!(target: "sidevm", "The report channel is closed. Exiting service.");
125                    break;
126                }
127                Some(report) => {
128                    event_handler(report);
129                }
130            }
131        }
132
133        // To avoid: panicked at 'Cannot drop a runtime in a context where blocking is not allowed.'
134        let handle = self.runtime.handle().clone();
135        handle.spawn_blocking(move || drop(self));
136    }
137}
138
139impl Spawner {
140    #[tracing::instrument(parent=None, name="sidevm", fields(id = %ShortId(id)), skip_all)]
141    #[allow(clippy::too_many_arguments)]
142    pub fn start(
143        &self,
144        wasm_bytes: &[u8],
145        max_memory_pages: u32,
146        id: VmId,
147        gas_per_breath: u64,
148        cache_ops: DynCacheOps,
149        weight: u32,
150        prev_stopped: Option<WatchReceiver<bool>>,
151    ) -> Result<(CommandSender, JoinHandle<ExitReason>)> {
152        let event_tx = self.out_tx.clone();
153        let (cmd_tx, mut cmd_rx) = channel(128);
154        let spawner = self.runtime_handle.clone();
155        let scheduler = self.scheduler.clone();
156        let wasm_bytes = wasm_bytes.to_vec();
157        let handle = self.spawn(async move {
158            macro_rules! push_msg {
159                ($expr: expr, $level: ident, $msg: expr) => {{
160                    $level!(target: "sidevm", msg=%$msg, "Pushing message");
161                    match $expr {
162                        None => {
163                            $level!(target: "sidevm", "Message rejected");
164                            continue;
165                        },
166                        Some(v) => v,
167                    }
168                }};
169                (@async: $expr: expr, $level: ident, $msg: expr) => {
170                    let push = push_msg!($expr, $level, $msg);
171                    spawner.spawn(async move {
172                        if let Err(err) = push.await {
173                            error!(target: "sidevm", msg=%$msg, ?err, "Push message failed");
174                        }
175                    }.in_current_span());
176                };
177                (@sync: $expr: expr, $level: ident, $msg: expr) => {
178                    let push = push_msg!($expr, $level, $msg);
179                    if let Err(err) = push {
180                        error!(target: "sidevm", msg=%$msg, %err, "Push message failed");
181                    }
182                };
183            }
184            let mut weight = weight;
185            if let Some(mut prev_stopped) = prev_stopped {
186                if !*prev_stopped.borrow() {
187                    info!(target: "sidevm", "Waiting for the previous instance to be stopped...");
188                    tokio::select! {
189                        _ = prev_stopped.changed() => {},
190                        cmd = cmd_rx.recv() => {
191                            match cmd {
192                                None => {
193                                    info!(target: "sidevm", "The command channel is closed. Exiting...");
194                                    return ExitReason::InputClosed;
195                                }
196                                Some(Command::Stop) => {
197                                    info!(target: "sidevm", "Received stop command. Exiting...");
198                                    return ExitReason::Stopped;
199                                }
200                                Some(Command::UpdateWeight(w)) => {
201                                    weight = w;
202                                }
203                                Some(
204                                    Command::PushMessage(_) |
205                                    Command::PushSystemMessage(_) |
206                                    Command::PushQuery { .. } |
207                                    Command::HttpRequest(_)
208                                ) => {
209                                    info!(
210                                        target: "sidevm",
211                                        "Ignored command while waiting for the previous instance to be stopped"
212                                    );
213                                }
214                            }
215                        },
216                    }
217                }
218            }
219            info!(target: "sidevm", "Starting sidevm instance...");
220            let engine = WasmEngine::new();
221            let module = match engine.compile(&wasm_bytes) {
222                Ok(m) => m,
223                Err(err) => {
224                    error!(target: "sidevm", ?err, "Failed to compile wasm module");
225                    return ExitReason::FailedToStart;
226                }
227            };
228            info!(target: "sidevm", "Wasm module compiled");
229            let config = WasmInstanceConfig {
230                max_memory_pages,
231                id,
232                gas_per_breath,
233                cache_ops,
234                scheduler: Some(scheduler),
235                weight,
236                event_tx,
237                log_handler: None,
238            };
239            let (mut wasm_run, env) = match module.run(vec![], config) {
240                Ok(i) => i,
241                Err(err) => {
242                    error!(target: "sidevm", "Failed to create sidevm instance: {err:?}");
243                    return ExitReason::FailedToStart;
244                }
245            };
246            loop {
247                tokio::select! {
248                    cmd = cmd_rx.recv() => {
249                        match cmd {
250                            None => {
251                                info!(target: "sidevm", "The command channel is closed. Exiting...");
252                                break ExitReason::InputClosed;
253                            }
254                            Some(Command::Stop) => {
255                                info!(target: "sidevm", "Received stop command. Exiting...");
256                                break ExitReason::Stopped;
257                            }
258                            Some(Command::PushMessage(msg)) => {
259                                push_msg!(@sync: env.push_message(msg), debug, "message");
260                            }
261                            Some(Command::PushSystemMessage(msg)) => {
262                                push_msg!(@sync: env.push_system_message(msg), trace, "system message");
263                            }
264                            Some(Command::PushQuery{ origin, payload, reply_tx }) => {
265                                push_msg!(@async: env.push_query(origin, payload, reply_tx), debug, "query");
266                            }
267                            Some(Command::HttpRequest(request)) => {
268                                push_msg!(@async: env.push_http_request(request), debug, "http request");
269                            }
270                            Some(Command::UpdateWeight(weight)) => {
271                                env.set_weight(weight);
272                            }
273                        }
274                    }
275                    rv = &mut wasm_run => {
276                        match rv {
277                            Ok(ret) => {
278                                info!(target: "sidevm", ret, "The sidevm instance exited normally.");
279                                break ExitReason::Exited(ret);
280                            }
281                            Err(err) => {
282                                info!(target: "sidevm", ?err, "The sidevm instance exited.");
283                                match err.downcast::<crate::env::OcallAborted>() {
284                                    Ok(err) => {
285                                        break ExitReason::OcallAborted(err);
286                                    }
287                                    Err(_) => {
288                                        break ExitReason::Panicked;
289                                    }
290                                }
291                            }
292                        }
293                    }
294                }
295            }
296        });
297        let report_tx = self.report_tx.clone();
298        let handle = self.spawn(async move {
299            let reason = match handle.await {
300                Ok(r) => r,
301                Err(err) => {
302                    warn!(target: "sidevm", ?err, "The sidevm instance exited with error");
303                    if err.is_cancelled() {
304                        ExitReason::Cancelled
305                    } else {
306                        ExitReason::Panicked
307                    }
308                }
309            };
310            if let Err(err) = report_tx.send(Report::VmTerminated { id, reason }).await {
311                warn!(target: "sidevm", ?err, "Failed to send report to sidevm service");
312            }
313            reason
314        });
315        Ok((cmd_tx, handle))
316    }
317
318    pub fn spawn<O: Send + 'static>(
319        &self,
320        fut: impl Future<Output = O> + Send + 'static,
321    ) -> JoinHandle<O> {
322        self.runtime_handle.spawn(fut.in_current_span())
323    }
324
325    pub fn event_tx(&self) -> crate::OutgoingRequestChannel {
326        self.out_tx.clone()
327    }
328}