crows_wasm/
lib.rs

1use anyhow::anyhow;
2use crows_bindings::{HTTPError, HTTPMethod, HTTPRequest, HTTPResponse};
3use crows_utils::services::{IterationInfo, RequestInfo, RunId};
4use crows_utils::{InfoHandle, InfoMessage};
5use futures::Future;
6use reqwest::header::{HeaderName, HeaderValue};
7use reqwest::{Body, Request, Url};
8use serde::de::DeserializeOwned;
9use serde::Serialize;
10use serde_json::{from_slice, to_vec};
11use std::collections::VecDeque;
12use std::pin::Pin;
13use std::str::FromStr;
14use std::sync::Arc;
15use std::time::Duration;
16use std::{any::Any, collections::HashMap, io::IoSlice};
17use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
18use tokio::sync::oneshot;
19use tokio::sync::RwLock;
20use tokio::time::Instant;
21use wasi_common::file::{FdFlags, FileType};
22use wasi_common::WasiFile;
23use wasmtime::{Caller, Engine, Linker, Memory, MemoryType, Module, Store};
24use wasmtime_wasi::{StdoutStream, StreamResult};
25
26pub mod executors;
27
28use crows_shared::Config;
29use executors::Executors;
30
31#[derive(thiserror::Error, Debug)]
32pub enum Error {
33    #[error("the module with a given name couldn't be found")]
34    NoSuchRun(RunId),
35}
36
37enum RuntimeMessage {
38    RunTest(oneshot::Sender<()>),
39}
40
41#[derive(Clone)]
42pub struct InstanceHandle {
43    inner: Option<InstanceHandleInner>,
44}
45
46#[derive(Clone)]
47pub struct InstanceHandleInner {
48    sender: UnboundedSender<RuntimeMessage>,
49    runtime: Arc<RwLock<RuntimeInner>>,
50}
51
52impl InstanceHandle {
53    pub async fn run_test(&self) -> anyhow::Result<()> {
54        let (sender, receiver) = oneshot::channel();
55        let inner = self
56            .inner
57            .iter()
58            .next()
59            .expect("Inner should be available before drop");
60        let instant = Instant::now();
61        inner.sender.send(RuntimeMessage::RunTest(sender))?;
62        receiver.await?;
63        let latency = instant.elapsed();
64        inner
65            .runtime
66            .write()
67            .await
68            .info_sender
69            .send(InfoMessage::IterationInfo(IterationInfo { latency }))?;
70        Ok(())
71    }
72}
73
74pub struct RuntimeInner {
75    pub instances: VecDeque<InstanceHandle>,
76    pub info_sender: UnboundedSender<InfoMessage>,
77}
78
79pub struct Runtime {
80    pub environment: Environment,
81    pub module: Module,
82    inner: Arc<RwLock<RuntimeInner>>,
83    info_sender: UnboundedSender<InfoMessage>,
84    length: usize,
85}
86
87impl Drop for InstanceHandle {
88    // once we drop the instance handle we want it to get back to the queue
89    // not sure if I like it, but I don't have time and motivation to rewrite it for now
90    fn drop(&mut self) {
91        if let Some(inner) = self.inner.take() {
92            tokio::spawn(async move {
93                let mut runtime = inner.runtime.write().await;
94                runtime
95                    .checkin_instance(InstanceHandle {
96                        inner: Some(inner.clone()),
97                    })
98                    .await;
99            });
100        }
101    }
102}
103
104impl Runtime {
105    pub fn send_update(&self, update: InfoMessage) -> anyhow::Result<()> {
106        Ok(self.info_sender.send(update)?)
107    }
108
109    pub fn capacity(&self) -> usize {
110        self.length
111    }
112
113    pub async fn active_count(&self) -> usize {
114        self.length - self.inner.read().await.instances.len()
115    }
116
117    pub fn new(content: &Vec<u8>) -> anyhow::Result<(Self, InfoHandle)> {
118        let environment = Environment::new()?;
119        let module = Module::from_binary(&environment.engine, content)?;
120
121        let (info_sender, info_receiver) = unbounded_channel();
122
123        let info_handle = InfoHandle {
124            receiver: info_receiver,
125        };
126
127        Ok((
128            Self {
129                module,
130                environment,
131                inner: Arc::new(RwLock::new(RuntimeInner {
132                    instances: VecDeque::new(),
133                    info_sender: info_sender.clone(),
134                })),
135                info_sender,
136                length: 0,
137            },
138            info_handle,
139        ))
140    }
141
142    pub async fn new_instance(&self) -> anyhow::Result<(Instance, InfoHandle, Store<WasiHostCtx>)> {
143        Instance::new(&self.environment, &self.module).await
144    }
145
146    pub async fn reserve_instance(&mut self) -> anyhow::Result<()> {
147        let (sender, receiver) = unbounded_channel();
148        let inner = InstanceHandleInner {
149            sender,
150            runtime: self.inner.clone(),
151        };
152        let handle = InstanceHandle { inner: Some(inner) };
153
154        let (instance, mut info_handle, store) =
155            Instance::new(&self.environment, &self.module).await?;
156
157        let info_sender = self.info_sender.clone();
158        tokio::spawn(async move {
159            while let Some(message) = info_handle.receiver.recv().await {
160                if let Err(_) = info_sender.send(message) {
161                    // TODO: should we report this error? it probably is just a closed channel
162                    break;
163                }
164            }
165        });
166
167        tokio::spawn(async move {
168            let mut store = store;
169            let mut receiver = receiver;
170            let mut instance = instance;
171
172            while let Some(message) = receiver.recv().await {
173                match message {
174                    RuntimeMessage::RunTest(sender) => {
175                        // TODO: remove this unwrap
176                        run_wasm(&mut instance, &mut store).await.unwrap();
177                        sender.send(()).unwrap();
178                    }
179                }
180            }
181        });
182
183        let mut inner = self.inner.write().await;
184        inner.instances.push_back(handle);
185        self.length += 1;
186        self.info_sender.send(InfoMessage::InstanceReserved);
187
188        Ok(())
189    }
190
191    pub async fn checkout_instance(&self) -> Option<InstanceHandle> {
192        let mut inner = self.inner.write().await;
193        inner.instances.pop_front()
194    }
195
196    pub async fn checkin_instance(&self, instance_handle: InstanceHandle) {
197        self.inner
198            .write()
199            .await
200            .checkin_instance(instance_handle)
201            .await;
202    }
203
204    pub async fn checkout_or_create_instance(&mut self) -> anyhow::Result<InstanceHandle> {
205        // TODO: I don't have time to refactor this code, but I don't really like trying in a loop
206        // the loop is needed because if we create instance and other task fetches it, we might
207        // still end up with None
208        loop {
209            if let Some(handle) = self.checkout_instance().await {
210                self.info_sender.send(InfoMessage::InstanceCheckedOut);
211                return Ok(handle);
212            } else {
213                self.reserve_instance().await?;
214            }
215        }
216    }
217}
218
219impl RuntimeInner {
220    pub async fn checkin_instance(&mut self, instance_handle: InstanceHandle) {
221        self.instances.push_back(instance_handle);
222        self.info_sender.send(InfoMessage::InstanceCheckedIn);
223    }
224}
225
226pub struct WasiHostCtx {
227    preview2_ctx: wasmtime_wasi::WasiCtx,
228    preview2_table: wasmtime::component::ResourceTable,
229    preview1_adapter: wasmtime_wasi::preview1::WasiPreview1Adapter,
230    memory: Option<Memory>,
231    buffers: slab::Slab<Box<[u8]>>,
232    client: reqwest::Client,
233    request_info_sender: UnboundedSender<RequestInfo>,
234    stderr_sender: UnboundedSender<Vec<u8>>,
235}
236
237fn create_return_value(status: u8, length: u32, ptr: u32) -> u64 {
238    assert!(
239        length <= 0x00FFFFFF,
240        "Length must be no larger than 3 bytes"
241    );
242    ((status as u64) << 56) | ((length as u64) << 32) | (ptr as u64)
243}
244
245impl WasiHostCtx {
246    pub fn instantiate(&mut self, mem: Memory) {
247        self.memory = Some(mem);
248    }
249
250    pub async fn wrap_async<'a, T, U, F, E>(
251        mut caller: Caller<'a, Self>,
252        ptr: u32,
253        len: u32,
254        f: F,
255    ) -> anyhow::Result<u64>
256    where
257        F: for<'b> FnOnce(
258            &'b mut Caller<'_, Self>,
259            T,
260        ) -> Pin<Box<dyn Future<Output = Result<U, E>> + 'b + Send>>,
261        U: Serialize,
262        E: Serialize,
263        T: DeserializeOwned,
264    {
265        let memory = get_memory(&mut caller)?;
266
267        let slice = memory
268            .data(&caller)
269            .get(ptr as usize..(ptr + len) as usize)
270            .ok_or(anyhow!("Could not get memory slice"))?;
271
272        let arg = from_slice(slice)?;
273
274        let result = f(&mut caller, arg).await;
275
276        let (_, store) = { memory.data_and_store_mut(&mut caller) };
277
278        match result {
279            Ok(ret) => {
280                let encoded = to_vec(&ret)?;
281
282                let length = encoded.len();
283                let index = store.buffers.insert(encoded.into_boxed_slice());
284
285                Ok(create_return_value(0, length as u32, index as u32))
286            }
287            Err(err) => {
288                let encoded = to_vec(&err)?;
289
290                let length = encoded.len();
291                let index = store.buffers.insert(encoded.into_boxed_slice());
292
293                Ok(create_return_value(1, length as u32, index as u32))
294            }
295        }
296    }
297
298    pub fn http<'a>(
299        mut caller: &'a mut Caller<'_, Self>,
300        request: HTTPRequest,
301    ) -> Pin<Box<dyn Future<Output = Result<HTTPResponse, HTTPError>> + 'a + Send>> {
302        Box::pin(async move {
303            // TODO remove this unwrap
304            let memory = get_memory(&mut caller).unwrap();
305            let (_, store) = memory.data_and_store_mut(&mut caller);
306
307            let client = &store.client;
308
309            let method = match request.method {
310                HTTPMethod::HEAD => reqwest::Method::HEAD,
311                HTTPMethod::GET => reqwest::Method::GET,
312                HTTPMethod::POST => reqwest::Method::POST,
313                HTTPMethod::PUT => reqwest::Method::PUT,
314                HTTPMethod::DELETE => reqwest::Method::DELETE,
315                HTTPMethod::OPTIONS => reqwest::Method::OPTIONS,
316            };
317            let url = Url::parse(&request.url).map_err(|err| HTTPError {
318                message: format!("Error when parsing the URL: {err:?}"),
319            })?;
320
321            let mut reqw_req = Request::new(method, url);
322
323            for (key, value) in request.headers {
324                let name = HeaderName::from_str(&key).map_err(|err| HTTPError {
325                    message: format!("Invalid header name: {key}: {err:?}"),
326                })?;
327                let value = HeaderValue::from_str(&value).map_err(|err| HTTPError {
328                    message: format!("Invalid header value: {value}: {err:?}"),
329                })?;
330                reqw_req.headers_mut().insert(name, value);
331            }
332
333            *reqw_req.body_mut() = request.body.map(|b| Body::from(b));
334
335            let instant = Instant::now();
336            let response = client.execute(reqw_req).await.map_err(|err| {
337                store.request_info_sender.send(RequestInfo {
338                    latency: instant.elapsed(),
339                    successful: false,
340                });
341
342                HTTPError {
343                    message: format!("Error when sending a request: {err:?}"),
344                }
345            })?;
346            let latency = instant.elapsed();
347
348            let mut headers = HashMap::new();
349            for (name, value) in response.headers().iter() {
350                let value = value.to_str().map_err(|err| HTTPError {
351                    message: format!("Could not parse response header {value:?}: {err:?}"),
352                })?;
353                headers.insert(name.to_string(), value.to_string());
354            }
355
356            let status = response.status().as_u16();
357            let successful = response.status().is_success();
358            let body = response.text().await.map_err(|err| HTTPError {
359                message: format!("Problem with fetching the body: {err:?}"),
360            })?;
361
362            store.request_info_sender.send(RequestInfo {
363                latency,
364                successful,
365            });
366
367            Ok(HTTPResponse {
368                headers,
369                body,
370                status,
371            })
372        })
373    }
374
375    pub fn set_config(mut caller: Caller<'_, Self>, ptr: u32, len: u32) -> anyhow::Result<u32> {
376        let memory = get_memory(&mut caller)?;
377
378        let slice = memory
379            .data(&caller)
380            .get(ptr as usize..(ptr + len) as usize)
381            .ok_or(anyhow!("Could not get memory slice"))?
382            .to_owned()
383            .into_boxed_slice();
384
385        let (_, store) = memory.data_and_store_mut(&mut caller);
386
387        let index = store.buffers.insert(slice);
388
389        Ok(index as u32)
390    }
391
392    pub fn consume_buffer(
393        mut caller: Caller<'_, Self>,
394        index: u32,
395        ptr: u32,
396        len: u32,
397    ) -> anyhow::Result<()> {
398        let memory = get_memory(&mut caller)?;
399        let (slice, store) = memory.data_and_store_mut(&mut caller);
400
401        let buffer = store
402            .buffers
403            .try_remove(index as usize)
404            .ok_or(anyhow!("Could not remove slab buffer"))?;
405
406        anyhow::ensure!(
407            len as usize == buffer.len(),
408            "bad length passed to consume_buffer"
409        );
410
411        slice
412            .get_mut((ptr as usize)..((ptr + len) as usize))
413            .ok_or(anyhow!("Could not fetch slice from WASM memory"))?
414            .copy_from_slice(&buffer);
415
416        Ok(())
417    }
418}
419
420impl wasmtime_wasi::WasiView for WasiHostCtx {
421    fn table(&mut self) -> &mut wasmtime::component::ResourceTable {
422        &mut self.preview2_table
423    }
424
425    fn ctx(&mut self) -> &mut wasmtime_wasi::WasiCtx {
426        &mut self.preview2_ctx
427    }
428}
429
430impl wasmtime_wasi::preview1::WasiPreview1View for WasiHostCtx {
431    fn adapter(&self) -> &wasmtime_wasi::preview1::WasiPreview1Adapter {
432        &self.preview1_adapter
433    }
434
435    fn adapter_mut(&mut self) -> &mut wasmtime_wasi::preview1::WasiPreview1Adapter {
436        &mut self.preview1_adapter
437    }
438}
439
440#[derive(Clone)]
441pub struct Environment {
442    engine: Engine,
443    linker: Linker<WasiHostCtx>,
444}
445
446pub struct Instance {
447    instance: wasmtime::Instance,
448}
449
450impl Environment {
451    pub fn new() -> anyhow::Result<Self> {
452        let mut config = wasmtime::Config::new();
453        config.async_support(true);
454        config.consume_fuel(true);
455
456        let engine = Engine::new(&config)?;
457
458        let mut linker = Linker::new(&engine);
459
460        linker
461            .func_wrap("crows", "consume_buffer", WasiHostCtx::consume_buffer)
462            .unwrap();
463        linker
464            .func_wrap2_async("crows", "http", |caller, ptr, len| {
465                Box::new(async move {
466                    WasiHostCtx::wrap_async(caller, ptr, len, WasiHostCtx::http).await
467                })
468            })
469            .unwrap();
470        linker
471            .func_wrap("crows", "set_config", WasiHostCtx::set_config)
472            .unwrap();
473
474        wasmtime_wasi::preview1::add_to_linker_async(&mut linker, |t| t)?;
475
476        Ok(Self { engine, linker })
477    }
478}
479
480pub fn get_memory<T>(caller: &mut Caller<'_, T>) -> anyhow::Result<Memory> {
481    Ok(caller.get_export("memory").unwrap().into_memory().unwrap())
482}
483
484impl Instance {
485    pub fn new_store(
486        engine: &Engine,
487    ) -> anyhow::Result<(wasmtime::Store<WasiHostCtx>, InfoHandle)> {
488        let (stdout_sender, mut stdout_receiver) = tokio::sync::mpsc::unbounded_channel();
489        let (stderr_sender, mut stderr_receiver) = tokio::sync::mpsc::unbounded_channel();
490        let (request_info_sender, mut request_info_receiver) =
491            tokio::sync::mpsc::unbounded_channel();
492
493        let (info_sender, info_receiver) = tokio::sync::mpsc::unbounded_channel();
494
495        tokio::spawn(async move {
496            loop {
497                tokio::select! {
498                    Some(message) = stdout_receiver.recv() => info_sender.send(InfoMessage::Stdout(message)),
499                    Some(message) = stderr_receiver.recv() => info_sender.send(InfoMessage::Stderr(message)),
500                    Some(message) = request_info_receiver.recv() => info_sender.send(InfoMessage::RequestInfo(message)),
501                    else => break
502                };
503            }
504        });
505
506        let info_handle = InfoHandle {
507            receiver: info_receiver,
508        };
509
510        let stdout = RemoteIo {
511            sender: stdout_sender,
512        };
513        let stderr = RemoteIo {
514            sender: stderr_sender.clone(),
515        };
516
517        let wasi_ctx = wasmtime_wasi::WasiCtxBuilder::new()
518            .stdout(stdout)
519            .stderr(stderr)
520            // .inherit_stdio()
521            .build();
522
523        let host_ctx = WasiHostCtx {
524            preview2_ctx: wasi_ctx,
525            preview2_table: wasmtime::component::ResourceTable::new(),
526            preview1_adapter: wasmtime_wasi::preview1::WasiPreview1Adapter::new(),
527            buffers: slab::Slab::default(),
528            memory: None,
529            client: reqwest::Client::new(),
530            request_info_sender,
531            stderr_sender,
532        };
533        let mut store: Store<WasiHostCtx> = Store::new(engine, host_ctx);
534
535        let memory = Memory::new(&mut store, MemoryType::new(1, None)).unwrap();
536        store.data_mut().memory = Some(memory);
537
538        // WebAssembly execution will be paused for an async yield every time it
539        // consumes 10000 fuel. Fuel will be refilled u64::MAX times.
540        store.fuel_async_yield_interval(Some(10000))?;
541        store.set_fuel(u64::MAX).unwrap();
542
543        Ok((store, info_handle))
544    }
545
546    pub async fn new(
547        env: &Environment,
548        module: &Module,
549    ) -> anyhow::Result<(Self, InfoHandle, Store<WasiHostCtx>)> {
550        let (mut store, info_handle) = Instance::new_store(&env.engine)?;
551        let instance = env.linker.instantiate_async(&mut store, module).await?;
552
553        let result = Self { instance };
554        Ok((result, info_handle, store))
555    }
556}
557
558pub async fn run_wasm(
559    instance: &mut Instance,
560    mut store: &mut Store<WasiHostCtx>,
561) -> anyhow::Result<()> {
562    let func = instance
563        .instance
564        .get_typed_func::<(), ()>(&mut store, "scenario")?;
565
566    if let Err(err) = func.call_async(&mut store, ()).await {
567        if let Err(e) = store.data().stderr_sender.send(format!("Encountered an error when running a scenario: {err:?}").as_bytes().to_vec()) {
568            eprintln!("Problem when sending logs to worker: {e:?}");
569        }
570    }
571
572    Ok(())
573}
574
575pub async fn fetch_config(
576    instance: Instance,
577    mut store: &mut Store<WasiHostCtx>,
578) -> anyhow::Result<crows_shared::Config> {
579    let func = instance
580        .instance
581        .get_typed_func::<(), u32>(&mut store, "__config")?;
582
583    let index = func.call_async(&mut store, ()).await?;
584    let buffer = store
585        .data_mut()
586        .buffers
587        .try_remove(index as usize)
588        .ok_or(anyhow!("Couldn't find slab"))?;
589
590    Ok(from_slice(&buffer)?)
591}
592
593#[derive(Clone)]
594struct RemoteIo {
595    sender: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
596}
597
598#[wiggle::async_trait]
599impl WasiFile for RemoteIo {
600    fn as_any(&self) -> &dyn Any {
601        self
602    }
603    async fn get_filetype(&self) -> Result<FileType, wasi_common::Error> {
604        Ok(FileType::Pipe)
605    }
606    async fn get_fdflags(&self) -> Result<FdFlags, wasi_common::Error> {
607        Ok(FdFlags::APPEND)
608    }
609    async fn write_vectored<'a>(&self, bufs: &[IoSlice<'a>]) -> Result<u64, wasi_common::Error> {
610        let mut size: u64 = 0;
611        for slice in bufs {
612            let slice = slice.to_vec();
613            size += slice.len() as u64;
614            self.sender.send(slice).unwrap();
615        }
616        Ok(size)
617    }
618}
619
620impl wasmtime_wasi::HostOutputStream for RemoteIo {
621    fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
622        self.sender.send(bytes.to_vec()).unwrap();
623
624        Ok(())
625    }
626
627    fn flush(&mut self) -> StreamResult<()> {
628        Ok(())
629    }
630
631    fn check_write(&mut self) -> StreamResult<usize> {
632        Ok(1024 * 1024)
633    }
634}
635
636impl StdoutStream for RemoteIo {
637    fn stream(&self) -> Box<dyn wasmtime_wasi::HostOutputStream> {
638        Box::new(self.clone())
639    }
640
641    fn isatty(&self) -> bool {
642        false
643    }
644}
645
646#[async_trait::async_trait]
647impl wasmtime_wasi::Subscribe for RemoteIo {
648    async fn ready(&mut self) {}
649}
650
651pub async fn run_scenario(runtime: Runtime, scenario: Vec<u8>, config: Config) {
652    let mut executor = Executors::create_executor(config, runtime).await;
653
654    tokio::spawn(async move {
655        // TODO: prepare should be an entirely separate step and coordinator should wait for
656        // prepare from all of the workers
657        executor.prepare().await;
658        executor.run().await;
659    });
660}