fn0 0.2.22

FaaS platform powered by wasmtime
use crate::cache::Bundle;
use crate::measure_cpu_time::{Clock, SystemClock, TimeTracker};
use crate::self_invoke::{self, AccessorGuard, SELF_HOST, SelfInvokeHooks, call_service};
use crate::turso_hijack::TursoHijack;
use crate::{Request, Response, telemetry};
use anyhow::{Result, anyhow};
use futures::stream::{FuturesUnordered, StreamExt};
use http_body_util::BodyExt;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::{
    Arc,
    atomic::{AtomicBool, Ordering},
};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::AsyncWrite;
use tokio::sync::{mpsc, oneshot};
use wasmtime::{Engine, Store, component::Linker};
use wasmtime_wasi::cli::AsyncStdoutStream;
use wasmtime_wasi::*;
use wasmtime_wasi_http::{
    WasiHttpCtx,
    p3::{Request as P3Request, WasiHttpCtxView, WasiHttpView, bindings::http::types::ErrorCode},
};

/// Byte sink that splits incoming data into lines and forwards each line to
/// `tracing` (see the `AsyncWrite` implementation for this type).
struct TracingWriter {
    /// Value attached to every emitted event as the `code_id` field.
    code_id: String,
    /// When `true`, lines are logged at error level with `stream = "stderr"`;
    /// otherwise at info level with `stream = "stdout"`.
    is_stderr: bool,
    /// Bytes received so far that have not yet been emitted as a line.
    buf: Vec<u8>,
}

impl TracingWriter {
    /// Builds a writer tagged with `code_id`, starting with an empty 1 KiB
    /// line buffer. `is_stderr` selects which stream the lines are logged as.
    fn new(code_id: String, is_stderr: bool) -> Self {
        let buf = Vec::with_capacity(1024);
        Self {
            code_id,
            is_stderr,
            buf,
        }
    }

    /// Logs a single line through `tracing`: info level / `stream = "stdout"`
    /// for stdout writers, error level / `stream = "stderr"` for stderr ones.
    fn emit_line(&self, line: &str) {
        if !self.is_stderr {
            tracing::info!(code_id = %self.code_id, stream = "stdout", "{}", line);
            return;
        }
        tracing::error!(code_id = %self.code_id, stream = "stderr", "{}", line);
    }
}

/// Line-buffers everything written to it and forwards each complete,
/// non-empty line to `tracing` via `emit_line`.
///
/// Bytes are decoded lossily as UTF-8 and trailing `\r` characters are
/// stripped, so both `\n` and `\r\n` terminated output is handled.
impl AsyncWrite for TracingWriter {
    /// Appends `buf` to the pending line buffer and emits every line that is
    /// now complete. Always reports the whole input as written.
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        /// Upper bound on the bytes kept for a line that has not been
        /// terminated yet. Without it, output that never contains a newline
        /// would grow the buffer without limit.
        const MAX_PENDING_LINE_BYTES: usize = 64 * 1024;

        let this = Pin::get_mut(self);

        // Invariant: between calls `this.buf` never contains a `\n` (every
        // call drains through the last newline), so only the freshly written
        // bytes have to be searched.
        let pending_len = this.buf.len();
        this.buf.extend_from_slice(buf);

        if let Some(rel) = buf.iter().rposition(|&b| b == b'\n') {
            let last_newline = pending_len + rel;
            // Everything before the last newline consists of complete lines;
            // emit them straight from the buffer and drop them in one drain
            // instead of shifting the buffer once per line.
            for raw in this.buf[..last_newline].split(|&b| b == b'\n') {
                let text = String::from_utf8_lossy(raw);
                let text = text.trim_end_matches('\r');
                if !text.is_empty() {
                    this.emit_line(text);
                }
            }
            this.buf.drain(..=last_newline);
        }

        // Over-long unterminated line: flush what we have as its own event
        // rather than buffering indefinitely.
        if this.buf.len() >= MAX_PENDING_LINE_BYTES {
            let text = String::from_utf8_lossy(&this.buf);
            let text = text.trim_end_matches('\r');
            if !text.is_empty() {
                this.emit_line(text);
            }
            this.buf.clear();
        }

        Poll::Ready(Ok(buf.len()))
    }

    /// No-op: a partial line stays buffered until it is terminated or the
    /// writer is shut down.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Emits any remaining unterminated line (if non-empty after trimming
    /// trailing `\r`/`\n`) and clears the buffer.
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let this = Pin::get_mut(self);
        if !this.buf.is_empty() {
            let line_str = String::from_utf8_lossy(&this.buf);
            let trimmed = line_str.trim_end_matches(['\r', '\n']);
            if !trimmed.is_empty() {
                this.emit_line(trimmed);
            }
            this.buf.clear();
        }
        Poll::Ready(Ok(()))
    }
}

/// Wraps a [`TracingWriter`] for `code_id` in an `AsyncStdoutStream`.
///
/// `is_stderr` is forwarded to the writer and decides whether lines are
/// logged as stderr or stdout output.
fn make_tracing_stream(code_id: String, is_stderr: bool) -> AsyncStdoutStream {
    let writer = TracingWriter::new(code_id, is_stderr);
    // NOTE(review): 4096 is presumably the stream's write budget in bytes —
    // confirm against the `AsyncStdoutStream::new` documentation.
    AsyncStdoutStream::new(4096, writer)
}

pub use fn0_wasmtime::engine_config;

/// Creates a component [`Linker`] for `engine` with the WASI p2 (async),
/// WASI p3 and WASI HTTP p3 host interfaces registered.
///
/// # Panics
///
/// Panics if any of the interface sets cannot be added to the linker. This
/// runs on a freshly created linker, so a failure indicates a broken build
/// or configuration rather than a recoverable runtime condition.
pub fn build_linker(engine: &Engine) -> Linker<ClientState<SystemClock>> {
    let mut linker = Linker::new(engine);
    wasmtime_wasi::p2::add_to_linker_async(&mut linker)
        .expect("failed to add WASI p2 interfaces to linker");
    wasmtime_wasi::p3::add_to_linker(&mut linker)
        .expect("failed to add WASI p3 interfaces to linker");
    wasmtime_wasi_http::p3::add_to_linker(&mut linker)
        .expect("failed to add WASI HTTP p3 interfaces to linker");
    linker
}

/// Starts a detached OS thread named `fn0-epoch-ticker` that increments
/// `engine`'s epoch every 3 ms, forever.
///
/// # Panics
///
/// Panics if the thread cannot be spawned.
pub fn spawn_epoch_ticker(engine: Engine) {
    /// Pause between two epoch increments.
    const TICK_INTERVAL: Duration = Duration::from_millis(3);

    let builder = std::thread::Builder::new().name("fn0-epoch-ticker".into());
    builder
        .spawn(move || loop {
            std::thread::sleep(TICK_INTERVAL);
            engine.increment_epoch();
        })
        .expect("failed to spawn epoch ticker thread");
}

/// Creates a [`Store`] holding a fresh [`ClientState`] for `code_id`.
///
/// * Guest stdout/stderr are routed through `make_tracing_stream`, tagged
///   with `code_id`.
/// * Every entry of `env_vars` is exposed to the guest. When `turso_hijack`
///   is `Some`, `TURSO_URL` and `TURSO_AUTH_TOKEN` from `env_vars` are
///   dropped and replaced by `http://<placeholder_host>` and an empty token.
/// * An epoch-deadline callback checks `time_tracker`: once the tracked
///   duration exceeds 1000 ms it reports `telemetry::cpu_timeout`, sets
///   `is_timeout` and interrupts execution; otherwise it extends the
///   deadline by one epoch tick.
pub(crate) fn build_store<C>(
    engine: &Engine,
    code_id: &str,
    env_vars: &[(String, String)],
    time_tracker: TimeTracker<C>,
    is_timeout: Arc<AtomicBool>,
    hooks: SelfInvokeHooks,
    turso_hijack: Option<&TursoHijack>,
) -> Store<ClientState<C>>
where
    C: Clock,
{
    /// Tracked CPU time after which the guest is interrupted.
    const CPU_TIME_LIMIT: Duration = Duration::from_millis(1000);

    let mut wasi_builder = WasiCtx::builder();
    wasi_builder.stdout(make_tracing_stream(code_id.to_string(), false));
    wasi_builder.stderr(make_tracing_stream(code_id.to_string(), true));

    let hijacking = turso_hijack.is_some();
    for (key, value) in env_vars {
        // The Turso variables are supplied below when the hijack is active.
        let overridden = hijacking && matches!(key.as_str(), "TURSO_URL" | "TURSO_AUTH_TOKEN");
        if !overridden {
            wasi_builder.env(key, value);
        }
    }
    if let Some(hijack) = turso_hijack {
        wasi_builder.env("TURSO_URL", format!("http://{}", hijack.placeholder_host));
        wasi_builder.env("TURSO_AUTH_TOKEN", "");
    }
    let wasi = wasi_builder.build();

    let state = ClientState {
        table: ResourceTable::new(),
        wasi,
        http: WasiHttpCtx::new(),
        time_tracker,
        code_id: code_id.to_string(),
        is_timeout,
        hooks,
    };
    let mut store = Store::new(engine, state);

    // NOTE(review): the callback installed last presumably supersedes the
    // trap / async-yield behaviours configured just before it — confirm
    // against the wasmtime `Store` docs before removing either call.
    store.epoch_deadline_trap();
    store.set_epoch_deadline(1);
    store.epoch_deadline_async_yield_and_update(1);
    store.epoch_deadline_callback(|context| {
        let state = context.data();
        let elapsed = state.time_tracker.duration();
        if elapsed <= CPU_TIME_LIMIT {
            return Ok(wasmtime::UpdateDeadline::Continue(1));
        }
        telemetry::cpu_timeout(&state.code_id, elapsed);
        state.is_timeout.store(true, Ordering::Relaxed);
        Ok(wasmtime::UpdateDeadline::Interrupt)
    });

    store
}

/// A request paired with the one-shot sender on which its outcome (the
/// response, or the error produced while handling it) is delivered.
pub type WasmInjectEnvelope = (Request, oneshot::Sender<Result<Response>>);

/// Instantiates `bundle`'s service once and serves requests from `rx` on
/// that single instance until the channel is closed.
///
/// Each received envelope is turned into a future that calls
/// `call_service` and sends the result back over the envelope's one-shot
/// sender. Those futures are driven concurrently inside
/// `Store::run_concurrent`. When `rx` is closed, the requests still in
/// flight are awaited, `telemetry::cpu_time` is reported and the function
/// returns.
///
/// `subdomain` is used as the code id for the store, telemetry and logging.
///
/// # Errors
///
/// Returns an error (after reporting `telemetry::wasmtime_error`) when
/// `instantiate_async` or `run_concurrent` fails.
pub async fn run_wasm_instance_loop(
    engine: &Engine,
    bundle: Arc<Bundle>,
    subdomain: String,
    mut rx: mpsc::UnboundedReceiver<WasmInjectEnvelope>,
    turso_hijack: Option<Arc<TursoHijack>>,
    otlp_hijack: Option<Arc<crate::OtlpHijack>>,
) -> Result<()> {
    // Both handles are shared between the store (epoch callback) and the
    // per-request futures below.
    let time_tracker = TimeTracker::new(SystemClock);
    let is_timeout = Arc::new(AtomicBool::new(false));

    let mut store = build_store(
        engine,
        &subdomain,
        &bundle.env_vars,
        time_tracker.clone(),
        is_timeout.clone(),
        SelfInvokeHooks::new(turso_hijack.clone(), otlp_hijack.clone()),
        turso_hijack.as_deref(),
    );

    let service = bundle
        .service_pre
        .instantiate_async(&mut store)
        .await
        .map_err(|error| {
            telemetry::wasmtime_error("instantiate_async", &subdomain, &format!("{error:?}"));
            anyhow!("instantiate_async failed: {error:?}")
        })?;

    let code_id_for_closure = subdomain.clone();
    let run_result = store
        .run_concurrent(async move |accessor| -> Result<()> {
            // NOTE(review): presumably makes `accessor`/`service` reachable for
            // self-invocation while the guard is alive — confirm in `self_invoke`.
            let _guard = AccessorGuard::install(accessor, &service);

            // In-flight request handlers; each one resolves after it has sent
            // its result back to the requester.
            let mut pending: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>> =
                FuturesUnordered::new();

            loop {
                tokio::select! {
                    // `biased` polls the branches in source order, so newly
                    // queued requests are accepted before in-flight ones are
                    // driven further.
                    biased;
                    maybe = rx.recv() => {
                        match maybe {
                            Some((req, resp_tx)) => {
                                // A missing host falls back to the default value.
                                let self_host = self_invoke::extract_host(req.headers())
                                    .unwrap_or_default();
                                let service_ref = &service;
                                let code_id = code_id_for_closure.clone();
                                let time_tracker = time_tracker.clone();
                                let is_timeout = is_timeout.clone();
                                pending.push(Box::pin(async move {
                                    // NOTE(review): `SELF_HOST` looks like a task-local
                                    // scoped to this request — confirm in `self_invoke`.
                                    let result = SELF_HOST
                                        .scope(self_host, async move {
                                            // Convert body errors into the WASI HTTP
                                            // error type expected by `P3Request`.
                                            let req_http = req.map(|body| {
                                                body.map_err(|err| {
                                                    ErrorCode::InternalError(Some(err.to_string()))
                                                })
                                                .boxed_unsync()
                                            });
                                            let (p3_req, req_io) = P3Request::from_http(req_http);
                                            call_service(
                                                accessor,
                                                service_ref,
                                                p3_req,
                                                req_io,
                                                &code_id,
                                                time_tracker,
                                                &is_timeout,
                                            )
                                            .await
                                        })
                                        .await;
                                    // The requester may have gone away; a failed
                                    // send is deliberately ignored.
                                    let _ = resp_tx.send(result);
                                }));
                            }
                            None => {
                                // Channel closed: finish the requests already
                                // in flight, then stop.
                                while pending.next().await.is_some() {}
                                break;
                            }
                        }
                    }
                    // When `pending` is empty `next()` yields `None`, the pattern
                    // does not match and only `rx.recv()` is awaited.
                    Some(()) = pending.next() => {}
                }
            }

            telemetry::cpu_time(&code_id_for_closure, time_tracker.duration());
            Ok(())
        })
        .await;

    // Outer `Err` comes from `run_concurrent` itself; `Ok(inner)` carries the
    // closure's own result.
    match run_result {
        Ok(inner) => inner,
        Err(error) => {
            telemetry::wasmtime_error("run_concurrent", &subdomain, &format!("{error:?}"));
            Err(anyhow!("run_concurrent failed: {error:?}"))
        }
    }
}

/// Per-store host state: the WASI / WASI HTTP contexts plus the bookkeeping
/// used for CPU-time limiting and telemetry.
pub struct ClientState<C: Clock> {
    /// WASI context handed out through `WasiView::ctx`.
    wasi: WasiCtx,
    /// WASI HTTP context handed out through `WasiHttpView::http`.
    http: WasiHttpCtx,
    /// Resource table shared by both the WASI and WASI HTTP views.
    table: ResourceTable,
    /// Source of the duration that the epoch-deadline callback in
    /// `build_store` compares against the CPU limit.
    pub(crate) time_tracker: TimeTracker<C>,
    /// Identifier used when reporting telemetry for this store.
    pub(crate) code_id: String,
    /// Set to `true` by the epoch-deadline callback when the CPU limit is hit.
    pub(crate) is_timeout: Arc<AtomicBool>,
    /// Hooks handed to the WASI HTTP view.
    hooks: SelfInvokeHooks,
}

impl<C: Clock> WasiView for ClientState<C> {
    /// Borrows the WASI context together with the shared resource table.
    fn ctx(&mut self) -> WasiCtxView<'_> {
        let Self { wasi, table, .. } = self;
        WasiCtxView { ctx: wasi, table }
    }
}

impl<C: Clock> WasiHttpView for ClientState<C> {
    /// Borrows the WASI HTTP context, the shared resource table and the
    /// self-invoke hooks as one view.
    fn http(&mut self) -> WasiHttpCtxView<'_> {
        let Self {
            http, table, hooks, ..
        } = self;
        WasiHttpCtxView {
            ctx: http,
            table,
            hooks,
        }
    }
}