1use crate::env::{DynCacheOps, OcallAborted};
2use crate::run::{WasmEngine, WasmInstanceConfig};
3use crate::{ShortId, VmId};
4use anyhow::Result;
5use phala_scheduler::TaskScheduler;
6use serde::{Deserialize, Serialize};
7use sidevm_env::messages::{AccountId, HttpHead, HttpResponseHead};
8use std::future::Future;
9use tokio::io::DuplexStream;
10use tokio::{
11 sync::mpsc::{channel, Receiver, Sender},
12 sync::oneshot::Sender as OneshotSender,
13 sync::watch::Receiver as WatchReceiver,
14 task::JoinHandle,
15};
16use tracing::{debug, error, info, trace, warn, Instrument};
17
18pub use sidevm_env::messages::{Metric, SystemMessage};
/// Sending half of the per-instance command channel; this is the handle
/// `Spawner::start` returns for delivering [`Command`]s to a spawned instance.
pub type CommandSender = Sender<Command>;
20
/// Events delivered over the report channel to the handler passed to
/// `ServiceRun::run`.
#[derive(Debug)]
pub enum Report {
    /// The task hosting the VM `id` has finished; `reason` says why.
    VmTerminated { id: VmId, reason: ExitReason },
}
25
/// Why a sidevm instance task ended.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, derive_more::Display)]
pub enum ExitReason {
    /// The wasm program ran to completion and returned this value.
    Exited(i32),
    /// A `Command::Stop` was received.
    Stopped,
    /// The command channel was closed (every sender was dropped).
    InputClosed,
    /// The instance ended with an error other than `OcallAborted`, or the
    /// hosting task failed without being cancelled.
    Panicked,
    /// The hosting task was cancelled before it produced a result.
    Cancelled,
    /// The instance ended with an `OcallAborted` error.
    OcallAborted(OcallAborted),
    /// Not produced in this file; presumably set by callers when an instance
    /// is restored from saved state. TODO confirm.
    Restore,
    /// Not produced in this file; presumably set by callers while no code has
    /// been provided for the instance yet. TODO confirm.
    WaitingForCode,
    /// Not produced in this file; presumably set by callers that enforce a
    /// code size limit. TODO confirm.
    CodeTooLarge,
    /// Compiling the wasm module or creating the instance failed.
    FailedToStart,
}
49
/// Commands accepted by the task that hosts a sidevm instance.
pub enum Command {
    /// Terminate the instance task; it finishes with `ExitReason::Stopped`.
    Stop,
    /// Deliver a raw message to the instance (`env.push_message`).
    PushMessage(Vec<u8>),
    /// Deliver a system message to the instance (`env.push_system_message`).
    PushSystemMessage(SystemMessage),
    /// Deliver a query to the instance (`env.push_query`).
    PushQuery {
        /// Optional account the query originates from.
        origin: Option<AccountId>,
        /// Raw query payload.
        payload: Vec<u8>,
        /// One-shot channel handed to the instance together with the query;
        /// presumably used to send the reply bytes back. TODO confirm.
        reply_tx: OneshotSender<Vec<u8>>,
    },
    /// Change the scheduling weight of the instance (`env.set_weight`); if it
    /// arrives before the instance is created, it replaces the initial weight.
    UpdateWeight(u32),
    /// Deliver an incoming HTTP request to the instance
    /// (`env.push_http_request`).
    HttpRequest(IncomingHttpRequest),
}
68
/// An HTTP request handed to an instance through `Command::HttpRequest`.
pub struct IncomingHttpRequest {
    /// Head of the request.
    pub(crate) head: HttpHead,
    /// In-memory duplex pipe attached to the request; presumably carries the
    /// request/response body bytes. TODO confirm against the consumer.
    pub(crate) body_stream: DuplexStream,
    /// One-shot channel carrying either the response head or an error.
    pub(crate) response_tx: OneshotSender<anyhow::Result<HttpResponseHead>>,
}
74
/// The running side of the sidevm service: owns the tokio runtime and the
/// receiving end of the report channel.
pub struct ServiceRun {
    /// Runtime on which all instance tasks are spawned.
    runtime: tokio::runtime::Runtime,
    /// Receives `Report`s sent by the tasks created in `Spawner::start`.
    report_rx: Receiver<Report>,
}
79
/// Cloneable handle used to spawn sidevm instances onto the service runtime.
#[derive(Clone)]
pub struct Spawner {
    /// Handle to the runtime owned by the matching `ServiceRun`.
    runtime_handle: tokio::runtime::Handle,
    /// Sending end of the report channel drained by `ServiceRun::run`.
    report_tx: Sender<Report>,
    /// Outgoing request channel; a clone is given to every instance as its
    /// `event_tx`.
    out_tx: crate::OutgoingRequestChannel,
    /// Task scheduler shared (by clone) with every spawned instance.
    scheduler: TaskScheduler<VmId>,
}
87
88pub fn service(
89 worker_threads: usize,
90 out_tx: crate::OutgoingRequestChannel,
91) -> (ServiceRun, Spawner) {
92 let worker_threads = worker_threads.max(1);
93 let runtime = tokio::runtime::Builder::new_multi_thread()
94 .max_blocking_threads(16)
95 .worker_threads(worker_threads + 2)
99 .enable_all()
100 .build()
101 .unwrap();
102 let runtime_handle = runtime.handle().clone();
103 let (report_tx, report_rx) = channel(100);
104 let run = ServiceRun { runtime, report_rx };
105 let spawner = Spawner {
106 runtime_handle,
107 report_tx,
108 out_tx,
109 scheduler: TaskScheduler::new(worker_threads as _),
110 };
111 (run, spawner)
112}
113
114impl ServiceRun {
115 pub fn blocking_run(self, event_handler: impl FnMut(Report)) {
116 let handle = self.runtime.handle().clone();
117 handle.block_on(self.run(event_handler));
118 }
119
120 pub async fn run(mut self, mut event_handler: impl FnMut(Report)) {
121 loop {
122 match self.report_rx.recv().await {
123 None => {
124 info!(target: "sidevm", "The report channel is closed. Exiting service.");
125 break;
126 }
127 Some(report) => {
128 event_handler(report);
129 }
130 }
131 }
132
133 let handle = self.runtime.handle().clone();
135 handle.spawn_blocking(move || drop(self));
136 }
137}
138
139impl Spawner {
140 #[tracing::instrument(parent=None, name="sidevm", fields(id = %ShortId(id)), skip_all)]
141 #[allow(clippy::too_many_arguments)]
142 pub fn start(
143 &self,
144 wasm_bytes: &[u8],
145 max_memory_pages: u32,
146 id: VmId,
147 gas_per_breath: u64,
148 cache_ops: DynCacheOps,
149 weight: u32,
150 prev_stopped: Option<WatchReceiver<bool>>,
151 ) -> Result<(CommandSender, JoinHandle<ExitReason>)> {
152 let event_tx = self.out_tx.clone();
153 let (cmd_tx, mut cmd_rx) = channel(128);
154 let spawner = self.runtime_handle.clone();
155 let scheduler = self.scheduler.clone();
156 let wasm_bytes = wasm_bytes.to_vec();
157 let handle = self.spawn(async move {
158 macro_rules! push_msg {
159 ($expr: expr, $level: ident, $msg: expr) => {{
160 $level!(target: "sidevm", msg=%$msg, "Pushing message");
161 match $expr {
162 None => {
163 $level!(target: "sidevm", "Message rejected");
164 continue;
165 },
166 Some(v) => v,
167 }
168 }};
169 (@async: $expr: expr, $level: ident, $msg: expr) => {
170 let push = push_msg!($expr, $level, $msg);
171 spawner.spawn(async move {
172 if let Err(err) = push.await {
173 error!(target: "sidevm", msg=%$msg, ?err, "Push message failed");
174 }
175 }.in_current_span());
176 };
177 (@sync: $expr: expr, $level: ident, $msg: expr) => {
178 let push = push_msg!($expr, $level, $msg);
179 if let Err(err) = push {
180 error!(target: "sidevm", msg=%$msg, %err, "Push message failed");
181 }
182 };
183 }
184 let mut weight = weight;
185 if let Some(mut prev_stopped) = prev_stopped {
186 if !*prev_stopped.borrow() {
187 info!(target: "sidevm", "Waiting for the previous instance to be stopped...");
188 tokio::select! {
189 _ = prev_stopped.changed() => {},
190 cmd = cmd_rx.recv() => {
191 match cmd {
192 None => {
193 info!(target: "sidevm", "The command channel is closed. Exiting...");
194 return ExitReason::InputClosed;
195 }
196 Some(Command::Stop) => {
197 info!(target: "sidevm", "Received stop command. Exiting...");
198 return ExitReason::Stopped;
199 }
200 Some(Command::UpdateWeight(w)) => {
201 weight = w;
202 }
203 Some(
204 Command::PushMessage(_) |
205 Command::PushSystemMessage(_) |
206 Command::PushQuery { .. } |
207 Command::HttpRequest(_)
208 ) => {
209 info!(
210 target: "sidevm",
211 "Ignored command while waiting for the previous instance to be stopped"
212 );
213 }
214 }
215 },
216 }
217 }
218 }
219 info!(target: "sidevm", "Starting sidevm instance...");
220 let engine = WasmEngine::new();
221 let module = match engine.compile(&wasm_bytes) {
222 Ok(m) => m,
223 Err(err) => {
224 error!(target: "sidevm", ?err, "Failed to compile wasm module");
225 return ExitReason::FailedToStart;
226 }
227 };
228 info!(target: "sidevm", "Wasm module compiled");
229 let config = WasmInstanceConfig {
230 max_memory_pages,
231 id,
232 gas_per_breath,
233 cache_ops,
234 scheduler: Some(scheduler),
235 weight,
236 event_tx,
237 log_handler: None,
238 };
239 let (mut wasm_run, env) = match module.run(vec![], config) {
240 Ok(i) => i,
241 Err(err) => {
242 error!(target: "sidevm", "Failed to create sidevm instance: {err:?}");
243 return ExitReason::FailedToStart;
244 }
245 };
246 loop {
247 tokio::select! {
248 cmd = cmd_rx.recv() => {
249 match cmd {
250 None => {
251 info!(target: "sidevm", "The command channel is closed. Exiting...");
252 break ExitReason::InputClosed;
253 }
254 Some(Command::Stop) => {
255 info!(target: "sidevm", "Received stop command. Exiting...");
256 break ExitReason::Stopped;
257 }
258 Some(Command::PushMessage(msg)) => {
259 push_msg!(@sync: env.push_message(msg), debug, "message");
260 }
261 Some(Command::PushSystemMessage(msg)) => {
262 push_msg!(@sync: env.push_system_message(msg), trace, "system message");
263 }
264 Some(Command::PushQuery{ origin, payload, reply_tx }) => {
265 push_msg!(@async: env.push_query(origin, payload, reply_tx), debug, "query");
266 }
267 Some(Command::HttpRequest(request)) => {
268 push_msg!(@async: env.push_http_request(request), debug, "http request");
269 }
270 Some(Command::UpdateWeight(weight)) => {
271 env.set_weight(weight);
272 }
273 }
274 }
275 rv = &mut wasm_run => {
276 match rv {
277 Ok(ret) => {
278 info!(target: "sidevm", ret, "The sidevm instance exited normally.");
279 break ExitReason::Exited(ret);
280 }
281 Err(err) => {
282 info!(target: "sidevm", ?err, "The sidevm instance exited.");
283 match err.downcast::<crate::env::OcallAborted>() {
284 Ok(err) => {
285 break ExitReason::OcallAborted(err);
286 }
287 Err(_) => {
288 break ExitReason::Panicked;
289 }
290 }
291 }
292 }
293 }
294 }
295 }
296 });
297 let report_tx = self.report_tx.clone();
298 let handle = self.spawn(async move {
299 let reason = match handle.await {
300 Ok(r) => r,
301 Err(err) => {
302 warn!(target: "sidevm", ?err, "The sidevm instance exited with error");
303 if err.is_cancelled() {
304 ExitReason::Cancelled
305 } else {
306 ExitReason::Panicked
307 }
308 }
309 };
310 if let Err(err) = report_tx.send(Report::VmTerminated { id, reason }).await {
311 warn!(target: "sidevm", ?err, "Failed to send report to sidevm service");
312 }
313 reason
314 });
315 Ok((cmd_tx, handle))
316 }
317
318 pub fn spawn<O: Send + 'static>(
319 &self,
320 fut: impl Future<Output = O> + Send + 'static,
321 ) -> JoinHandle<O> {
322 self.runtime_handle.spawn(fut.in_current_span())
323 }
324
325 pub fn event_tx(&self) -> crate::OutgoingRequestChannel {
326 self.out_tx.clone()
327 }
328}