1use anyhow::anyhow;
2use crows_bindings::{HTTPError, HTTPMethod, HTTPRequest, HTTPResponse};
3use crows_utils::services::{IterationInfo, RequestInfo, RunId};
4use crows_utils::{InfoHandle, InfoMessage};
5use futures::Future;
6use reqwest::header::{HeaderName, HeaderValue};
7use reqwest::{Body, Request, Url};
8use serde::de::DeserializeOwned;
9use serde::Serialize;
10use serde_json::{from_slice, to_vec};
11use std::collections::VecDeque;
12use std::pin::Pin;
13use std::str::FromStr;
14use std::sync::Arc;
15use std::time::Duration;
16use std::{any::Any, collections::HashMap, io::IoSlice};
17use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
18use tokio::sync::oneshot;
19use tokio::sync::RwLock;
20use tokio::time::Instant;
21use wasi_common::file::{FdFlags, FileType};
22use wasi_common::WasiFile;
23use wasmtime::{Caller, Engine, Linker, Memory, MemoryType, Module, Store};
24use wasmtime_wasi::{StdoutStream, StreamResult};
25
26pub mod executors;
27
28use crows_shared::Config;
29use executors::Executors;
30
/// Errors reported by this module.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    // NOTE(review): the message talks about a "module with a given name" while the
    // variant is named `NoSuchRun` and carries a `RunId` — confirm the intended wording.
    /// Carries the [`RunId`] the error refers to.
    #[error("the module with a given name couldn't be found")]
    NoSuchRun(RunId),
}
36
/// Messages sent from an [`InstanceHandle`] to the task that drives a WASM instance.
enum RuntimeMessage {
    /// Request one scenario run; the enclosed one-shot sender is used to signal
    /// the requester once the run is over.
    RunTest(oneshot::Sender<()>),
}
40
/// Handle to a reserved WASM instance.
///
/// The `Drop` implementation takes `inner` out of the handle and checks a fresh
/// handle back into the runtime's pool, which is why `inner` is an `Option`.
#[derive(Clone)]
pub struct InstanceHandle {
    /// `Some` until the handle is dropped.
    inner: Option<InstanceHandleInner>,
}
45
/// State carried by an [`InstanceHandle`].
#[derive(Clone)]
pub struct InstanceHandleInner {
    /// Channel to the task that owns the WASM instance.
    sender: UnboundedSender<RuntimeMessage>,
    /// Shared pool state of the runtime that created the handle.
    runtime: Arc<RwLock<RuntimeInner>>,
}
51
52impl InstanceHandle {
53 pub async fn run_test(&self) -> anyhow::Result<()> {
54 let (sender, receiver) = oneshot::channel();
55 let inner = self
56 .inner
57 .iter()
58 .next()
59 .expect("Inner should be available before drop");
60 let instant = Instant::now();
61 inner.sender.send(RuntimeMessage::RunTest(sender))?;
62 receiver.await?;
63 let latency = instant.elapsed();
64 inner
65 .runtime
66 .write()
67 .await
68 .info_sender
69 .send(InfoMessage::IterationInfo(IterationInfo { latency }))?;
70 Ok(())
71 }
72}
73
/// Pool state shared between a [`Runtime`] and the handles it creates.
pub struct RuntimeInner {
    /// Handles that are currently checked in; new ones are pushed to the back
    /// and checkouts pop from the front.
    pub instances: VecDeque<InstanceHandle>,
    /// Channel used to publish pool events and iteration info.
    pub info_sender: UnboundedSender<InfoMessage>,
}
78
/// Owns a compiled module and the pool of instances reserved for it.
pub struct Runtime {
    /// Engine and linker used to instantiate `module`.
    pub environment: Environment,
    /// Module compiled from the binary passed to `Runtime::new`.
    pub module: Module,
    /// Pool state shared with every `InstanceHandle` created by this runtime.
    inner: Arc<RwLock<RuntimeInner>>,
    /// Sender for runtime-level info messages; a clone is stored in `RuntimeInner`.
    info_sender: UnboundedSender<InfoMessage>,
    /// Number of instances reserved so far (incremented by `reserve_instance`).
    length: usize,
}
86
87impl Drop for InstanceHandle {
88 fn drop(&mut self) {
91 if let Some(inner) = self.inner.take() {
92 tokio::spawn(async move {
93 let mut runtime = inner.runtime.write().await;
94 runtime
95 .checkin_instance(InstanceHandle {
96 inner: Some(inner.clone()),
97 })
98 .await;
99 });
100 }
101 }
102}
103
104impl Runtime {
105 pub fn send_update(&self, update: InfoMessage) -> anyhow::Result<()> {
106 Ok(self.info_sender.send(update)?)
107 }
108
109 pub fn capacity(&self) -> usize {
110 self.length
111 }
112
113 pub async fn active_count(&self) -> usize {
114 self.length - self.inner.read().await.instances.len()
115 }
116
117 pub fn new(content: &Vec<u8>) -> anyhow::Result<(Self, InfoHandle)> {
118 let environment = Environment::new()?;
119 let module = Module::from_binary(&environment.engine, content)?;
120
121 let (info_sender, info_receiver) = unbounded_channel();
122
123 let info_handle = InfoHandle {
124 receiver: info_receiver,
125 };
126
127 Ok((
128 Self {
129 module,
130 environment,
131 inner: Arc::new(RwLock::new(RuntimeInner {
132 instances: VecDeque::new(),
133 info_sender: info_sender.clone(),
134 })),
135 info_sender,
136 length: 0,
137 },
138 info_handle,
139 ))
140 }
141
142 pub async fn new_instance(&self) -> anyhow::Result<(Instance, InfoHandle, Store<WasiHostCtx>)> {
143 Instance::new(&self.environment, &self.module).await
144 }
145
146 pub async fn reserve_instance(&mut self) -> anyhow::Result<()> {
147 let (sender, receiver) = unbounded_channel();
148 let inner = InstanceHandleInner {
149 sender,
150 runtime: self.inner.clone(),
151 };
152 let handle = InstanceHandle { inner: Some(inner) };
153
154 let (instance, mut info_handle, store) =
155 Instance::new(&self.environment, &self.module).await?;
156
157 let info_sender = self.info_sender.clone();
158 tokio::spawn(async move {
159 while let Some(message) = info_handle.receiver.recv().await {
160 if let Err(_) = info_sender.send(message) {
161 break;
163 }
164 }
165 });
166
167 tokio::spawn(async move {
168 let mut store = store;
169 let mut receiver = receiver;
170 let mut instance = instance;
171
172 while let Some(message) = receiver.recv().await {
173 match message {
174 RuntimeMessage::RunTest(sender) => {
175 run_wasm(&mut instance, &mut store).await.unwrap();
177 sender.send(()).unwrap();
178 }
179 }
180 }
181 });
182
183 let mut inner = self.inner.write().await;
184 inner.instances.push_back(handle);
185 self.length += 1;
186 self.info_sender.send(InfoMessage::InstanceReserved);
187
188 Ok(())
189 }
190
191 pub async fn checkout_instance(&self) -> Option<InstanceHandle> {
192 let mut inner = self.inner.write().await;
193 inner.instances.pop_front()
194 }
195
196 pub async fn checkin_instance(&self, instance_handle: InstanceHandle) {
197 self.inner
198 .write()
199 .await
200 .checkin_instance(instance_handle)
201 .await;
202 }
203
204 pub async fn checkout_or_create_instance(&mut self) -> anyhow::Result<InstanceHandle> {
205 loop {
209 if let Some(handle) = self.checkout_instance().await {
210 self.info_sender.send(InfoMessage::InstanceCheckedOut);
211 return Ok(handle);
212 } else {
213 self.reserve_instance().await?;
214 }
215 }
216 }
217}
218
219impl RuntimeInner {
220 pub async fn checkin_instance(&mut self, instance_handle: InstanceHandle) {
221 self.instances.push_back(instance_handle);
222 self.info_sender.send(InfoMessage::InstanceCheckedIn);
223 }
224}
225
/// Per-store host state, reachable from host functions through `Caller`.
pub struct WasiHostCtx {
    /// WASI context handed out by the `WasiView` implementation.
    preview2_ctx: wasmtime_wasi::WasiCtx,
    /// Resource table handed out by the `WasiView` implementation.
    preview2_table: wasmtime::component::ResourceTable,
    /// Adapter handed out by the `WasiPreview1View` implementation.
    preview1_adapter: wasmtime_wasi::preview1::WasiPreview1Adapter,
    /// Memory registered through `instantiate`; `Instance::new_store` stores a
    /// freshly created memory here.
    memory: Option<Memory>,
    /// Byte buffers exchanged with the guest; host functions hand out the slab
    /// index and `consume_buffer` copies the bytes into guest memory.
    buffers: slab::Slab<Box<[u8]>>,
    /// HTTP client used by the `http` host function.
    client: reqwest::Client,
    /// Receives a `RequestInfo` for requests performed by the `http` host function.
    request_info_sender: UnboundedSender<RequestInfo>,
    /// Receives raw bytes that are reported as stderr output.
    stderr_sender: UnboundedSender<Vec<u8>>,
}
236
/// Packs the outcome of a host call into a single `u64`.
///
/// Layout, from the most significant bits down: 8 bits of `status`, 24 bits of
/// `length`, 32 bits of `ptr`.
///
/// # Panics
///
/// Panics when `length` does not fit into 24 bits.
fn create_return_value(status: u8, length: u32, ptr: u32) -> u64 {
    const MAX_LENGTH: u32 = 0x00FF_FFFF;
    const STATUS_SHIFT: u32 = 56;
    const LENGTH_SHIFT: u32 = 32;

    assert!(
        length <= MAX_LENGTH,
        "Length must be no larger than 3 bytes"
    );

    let status_bits = u64::from(status) << STATUS_SHIFT;
    let length_bits = u64::from(length) << LENGTH_SHIFT;
    let ptr_bits = u64::from(ptr);

    status_bits | length_bits | ptr_bits
}
244
245impl WasiHostCtx {
246 pub fn instantiate(&mut self, mem: Memory) {
247 self.memory = Some(mem);
248 }
249
250 pub async fn wrap_async<'a, T, U, F, E>(
251 mut caller: Caller<'a, Self>,
252 ptr: u32,
253 len: u32,
254 f: F,
255 ) -> anyhow::Result<u64>
256 where
257 F: for<'b> FnOnce(
258 &'b mut Caller<'_, Self>,
259 T,
260 ) -> Pin<Box<dyn Future<Output = Result<U, E>> + 'b + Send>>,
261 U: Serialize,
262 E: Serialize,
263 T: DeserializeOwned,
264 {
265 let memory = get_memory(&mut caller)?;
266
267 let slice = memory
268 .data(&caller)
269 .get(ptr as usize..(ptr + len) as usize)
270 .ok_or(anyhow!("Could not get memory slice"))?;
271
272 let arg = from_slice(slice)?;
273
274 let result = f(&mut caller, arg).await;
275
276 let (_, store) = { memory.data_and_store_mut(&mut caller) };
277
278 match result {
279 Ok(ret) => {
280 let encoded = to_vec(&ret)?;
281
282 let length = encoded.len();
283 let index = store.buffers.insert(encoded.into_boxed_slice());
284
285 Ok(create_return_value(0, length as u32, index as u32))
286 }
287 Err(err) => {
288 let encoded = to_vec(&err)?;
289
290 let length = encoded.len();
291 let index = store.buffers.insert(encoded.into_boxed_slice());
292
293 Ok(create_return_value(1, length as u32, index as u32))
294 }
295 }
296 }
297
298 pub fn http<'a>(
299 mut caller: &'a mut Caller<'_, Self>,
300 request: HTTPRequest,
301 ) -> Pin<Box<dyn Future<Output = Result<HTTPResponse, HTTPError>> + 'a + Send>> {
302 Box::pin(async move {
303 let memory = get_memory(&mut caller).unwrap();
305 let (_, store) = memory.data_and_store_mut(&mut caller);
306
307 let client = &store.client;
308
309 let method = match request.method {
310 HTTPMethod::HEAD => reqwest::Method::HEAD,
311 HTTPMethod::GET => reqwest::Method::GET,
312 HTTPMethod::POST => reqwest::Method::POST,
313 HTTPMethod::PUT => reqwest::Method::PUT,
314 HTTPMethod::DELETE => reqwest::Method::DELETE,
315 HTTPMethod::OPTIONS => reqwest::Method::OPTIONS,
316 };
317 let url = Url::parse(&request.url).map_err(|err| HTTPError {
318 message: format!("Error when parsing the URL: {err:?}"),
319 })?;
320
321 let mut reqw_req = Request::new(method, url);
322
323 for (key, value) in request.headers {
324 let name = HeaderName::from_str(&key).map_err(|err| HTTPError {
325 message: format!("Invalid header name: {key}: {err:?}"),
326 })?;
327 let value = HeaderValue::from_str(&value).map_err(|err| HTTPError {
328 message: format!("Invalid header value: {value}: {err:?}"),
329 })?;
330 reqw_req.headers_mut().insert(name, value);
331 }
332
333 *reqw_req.body_mut() = request.body.map(|b| Body::from(b));
334
335 let instant = Instant::now();
336 let response = client.execute(reqw_req).await.map_err(|err| {
337 store.request_info_sender.send(RequestInfo {
338 latency: instant.elapsed(),
339 successful: false,
340 });
341
342 HTTPError {
343 message: format!("Error when sending a request: {err:?}"),
344 }
345 })?;
346 let latency = instant.elapsed();
347
348 let mut headers = HashMap::new();
349 for (name, value) in response.headers().iter() {
350 let value = value.to_str().map_err(|err| HTTPError {
351 message: format!("Could not parse response header {value:?}: {err:?}"),
352 })?;
353 headers.insert(name.to_string(), value.to_string());
354 }
355
356 let status = response.status().as_u16();
357 let successful = response.status().is_success();
358 let body = response.text().await.map_err(|err| HTTPError {
359 message: format!("Problem with fetching the body: {err:?}"),
360 })?;
361
362 store.request_info_sender.send(RequestInfo {
363 latency,
364 successful,
365 });
366
367 Ok(HTTPResponse {
368 headers,
369 body,
370 status,
371 })
372 })
373 }
374
375 pub fn set_config(mut caller: Caller<'_, Self>, ptr: u32, len: u32) -> anyhow::Result<u32> {
376 let memory = get_memory(&mut caller)?;
377
378 let slice = memory
379 .data(&caller)
380 .get(ptr as usize..(ptr + len) as usize)
381 .ok_or(anyhow!("Could not get memory slice"))?
382 .to_owned()
383 .into_boxed_slice();
384
385 let (_, store) = memory.data_and_store_mut(&mut caller);
386
387 let index = store.buffers.insert(slice);
388
389 Ok(index as u32)
390 }
391
392 pub fn consume_buffer(
393 mut caller: Caller<'_, Self>,
394 index: u32,
395 ptr: u32,
396 len: u32,
397 ) -> anyhow::Result<()> {
398 let memory = get_memory(&mut caller)?;
399 let (slice, store) = memory.data_and_store_mut(&mut caller);
400
401 let buffer = store
402 .buffers
403 .try_remove(index as usize)
404 .ok_or(anyhow!("Could not remove slab buffer"))?;
405
406 anyhow::ensure!(
407 len as usize == buffer.len(),
408 "bad length passed to consume_buffer"
409 );
410
411 slice
412 .get_mut((ptr as usize)..((ptr + len) as usize))
413 .ok_or(anyhow!("Could not fetch slice from WASM memory"))?
414 .copy_from_slice(&buffer);
415
416 Ok(())
417 }
418}
419
420impl wasmtime_wasi::WasiView for WasiHostCtx {
421 fn table(&mut self) -> &mut wasmtime::component::ResourceTable {
422 &mut self.preview2_table
423 }
424
425 fn ctx(&mut self) -> &mut wasmtime_wasi::WasiCtx {
426 &mut self.preview2_ctx
427 }
428}
429
430impl wasmtime_wasi::preview1::WasiPreview1View for WasiHostCtx {
431 fn adapter(&self) -> &wasmtime_wasi::preview1::WasiPreview1Adapter {
432 &self.preview1_adapter
433 }
434
435 fn adapter_mut(&mut self) -> &mut wasmtime_wasi::preview1::WasiPreview1Adapter {
436 &mut self.preview1_adapter
437 }
438}
439
/// Engine plus the linker holding the host functions, as built by `Environment::new`.
#[derive(Clone)]
pub struct Environment {
    /// Engine configured with async support and fuel consumption.
    engine: Engine,
    /// Linker with the `crows` host functions and the WASI preview1 imports.
    linker: Linker<WasiHostCtx>,
}
445
/// An instantiated module; the store it lives in is returned separately by `Instance::new`.
pub struct Instance {
    /// The underlying wasmtime instance.
    instance: wasmtime::Instance,
}
449
450impl Environment {
451 pub fn new() -> anyhow::Result<Self> {
452 let mut config = wasmtime::Config::new();
453 config.async_support(true);
454 config.consume_fuel(true);
455
456 let engine = Engine::new(&config)?;
457
458 let mut linker = Linker::new(&engine);
459
460 linker
461 .func_wrap("crows", "consume_buffer", WasiHostCtx::consume_buffer)
462 .unwrap();
463 linker
464 .func_wrap2_async("crows", "http", |caller, ptr, len| {
465 Box::new(async move {
466 WasiHostCtx::wrap_async(caller, ptr, len, WasiHostCtx::http).await
467 })
468 })
469 .unwrap();
470 linker
471 .func_wrap("crows", "set_config", WasiHostCtx::set_config)
472 .unwrap();
473
474 wasmtime_wasi::preview1::add_to_linker_async(&mut linker, |t| t)?;
475
476 Ok(Self { engine, linker })
477 }
478}
479
480pub fn get_memory<T>(caller: &mut Caller<'_, T>) -> anyhow::Result<Memory> {
481 Ok(caller.get_export("memory").unwrap().into_memory().unwrap())
482}
483
484impl Instance {
485 pub fn new_store(
486 engine: &Engine,
487 ) -> anyhow::Result<(wasmtime::Store<WasiHostCtx>, InfoHandle)> {
488 let (stdout_sender, mut stdout_receiver) = tokio::sync::mpsc::unbounded_channel();
489 let (stderr_sender, mut stderr_receiver) = tokio::sync::mpsc::unbounded_channel();
490 let (request_info_sender, mut request_info_receiver) =
491 tokio::sync::mpsc::unbounded_channel();
492
493 let (info_sender, info_receiver) = tokio::sync::mpsc::unbounded_channel();
494
495 tokio::spawn(async move {
496 loop {
497 tokio::select! {
498 Some(message) = stdout_receiver.recv() => info_sender.send(InfoMessage::Stdout(message)),
499 Some(message) = stderr_receiver.recv() => info_sender.send(InfoMessage::Stderr(message)),
500 Some(message) = request_info_receiver.recv() => info_sender.send(InfoMessage::RequestInfo(message)),
501 else => break
502 };
503 }
504 });
505
506 let info_handle = InfoHandle {
507 receiver: info_receiver,
508 };
509
510 let stdout = RemoteIo {
511 sender: stdout_sender,
512 };
513 let stderr = RemoteIo {
514 sender: stderr_sender.clone(),
515 };
516
517 let wasi_ctx = wasmtime_wasi::WasiCtxBuilder::new()
518 .stdout(stdout)
519 .stderr(stderr)
520 .build();
522
523 let host_ctx = WasiHostCtx {
524 preview2_ctx: wasi_ctx,
525 preview2_table: wasmtime::component::ResourceTable::new(),
526 preview1_adapter: wasmtime_wasi::preview1::WasiPreview1Adapter::new(),
527 buffers: slab::Slab::default(),
528 memory: None,
529 client: reqwest::Client::new(),
530 request_info_sender,
531 stderr_sender,
532 };
533 let mut store: Store<WasiHostCtx> = Store::new(engine, host_ctx);
534
535 let memory = Memory::new(&mut store, MemoryType::new(1, None)).unwrap();
536 store.data_mut().memory = Some(memory);
537
538 store.fuel_async_yield_interval(Some(10000))?;
541 store.set_fuel(u64::MAX).unwrap();
542
543 Ok((store, info_handle))
544 }
545
546 pub async fn new(
547 env: &Environment,
548 module: &Module,
549 ) -> anyhow::Result<(Self, InfoHandle, Store<WasiHostCtx>)> {
550 let (mut store, info_handle) = Instance::new_store(&env.engine)?;
551 let instance = env.linker.instantiate_async(&mut store, module).await?;
552
553 let result = Self { instance };
554 Ok((result, info_handle, store))
555 }
556}
557
558pub async fn run_wasm(
559 instance: &mut Instance,
560 mut store: &mut Store<WasiHostCtx>,
561) -> anyhow::Result<()> {
562 let func = instance
563 .instance
564 .get_typed_func::<(), ()>(&mut store, "scenario")?;
565
566 if let Err(err) = func.call_async(&mut store, ()).await {
567 if let Err(e) = store.data().stderr_sender.send(format!("Encountered an error when running a scenario: {err:?}").as_bytes().to_vec()) {
568 eprintln!("Problem when sending logs to worker: {e:?}");
569 }
570 }
571
572 Ok(())
573}
574
575pub async fn fetch_config(
576 instance: Instance,
577 mut store: &mut Store<WasiHostCtx>,
578) -> anyhow::Result<crows_shared::Config> {
579 let func = instance
580 .instance
581 .get_typed_func::<(), u32>(&mut store, "__config")?;
582
583 let index = func.call_async(&mut store, ()).await?;
584 let buffer = store
585 .data_mut()
586 .buffers
587 .try_remove(index as usize)
588 .ok_or(anyhow!("Couldn't find slab"))?;
589
590 Ok(from_slice(&buffer)?)
591}
592
/// Output sink that forwards every written chunk of bytes over a channel.
#[derive(Clone)]
struct RemoteIo {
    /// Receives one `Vec<u8>` per write.
    sender: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
}
597
598#[wiggle::async_trait]
599impl WasiFile for RemoteIo {
600 fn as_any(&self) -> &dyn Any {
601 self
602 }
603 async fn get_filetype(&self) -> Result<FileType, wasi_common::Error> {
604 Ok(FileType::Pipe)
605 }
606 async fn get_fdflags(&self) -> Result<FdFlags, wasi_common::Error> {
607 Ok(FdFlags::APPEND)
608 }
609 async fn write_vectored<'a>(&self, bufs: &[IoSlice<'a>]) -> Result<u64, wasi_common::Error> {
610 let mut size: u64 = 0;
611 for slice in bufs {
612 let slice = slice.to_vec();
613 size += slice.len() as u64;
614 self.sender.send(slice).unwrap();
615 }
616 Ok(size)
617 }
618}
619
620impl wasmtime_wasi::HostOutputStream for RemoteIo {
621 fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
622 self.sender.send(bytes.to_vec()).unwrap();
623
624 Ok(())
625 }
626
627 fn flush(&mut self) -> StreamResult<()> {
628 Ok(())
629 }
630
631 fn check_write(&mut self) -> StreamResult<usize> {
632 Ok(1024 * 1024)
633 }
634}
635
636impl StdoutStream for RemoteIo {
637 fn stream(&self) -> Box<dyn wasmtime_wasi::HostOutputStream> {
638 Box::new(self.clone())
639 }
640
641 fn isatty(&self) -> bool {
642 false
643 }
644}
645
#[async_trait::async_trait]
impl wasmtime_wasi::Subscribe for RemoteIo {
    /// Completes immediately: the body is empty, so there is nothing to wait for.
    async fn ready(&mut self) {}
}
650
651pub async fn run_scenario(runtime: Runtime, scenario: Vec<u8>, config: Config) {
652 let mut executor = Executors::create_executor(config, runtime).await;
653
654 tokio::spawn(async move {
655 executor.prepare().await;
658 executor.run().await;
659 });
660}