fn0 0.2.12

FaaS platform powered by wasmtime
use bytes::Bytes;
use opentelemetry::KeyValue;
use opentelemetry::metrics::Counter;
use std::io::Read;
use std::sync::OnceLock;
use struson::reader::{JsonReader, JsonStreamReader, ValueType};
use tokio::runtime::Handle;
use tokio::sync::mpsc::UnboundedReceiver;

pub struct ParseJob {
    pub code_id: String,
    pub bytes_rx: UnboundedReceiver<Bytes>,
}

static PARSER: OnceLock<crossbeam_channel::Sender<ParseJob>> = OnceLock::new();

pub fn submit(job: ParseJob) {
    let tx = PARSER.get_or_init(|| {
        let handle = Handle::current();
        let (tx, rx) = crossbeam_channel::unbounded::<ParseJob>();
        std::thread::Builder::new()
            .name("turso-parse".into())
            .spawn(move || {
                let _guard = handle.enter();
                parser_loop(rx);
            })
            .expect("failed to spawn turso-parse thread");
        tx
    });
    if let Err(err) = tx.send(job) {
        tracing::warn!(%err, "turso parse thread queue closed");
    }
}

fn parser_loop(rx: crossbeam_channel::Receiver<ParseJob>) {
    while let Ok(job) = rx.recv() {
        let code_id = job.code_id.clone();
        let reader = BlockingBytesReader::new(job.bytes_rx);
        match parse_pipeline_response(reader) {
            Ok(totals) => emit_metrics(&code_id, totals),
            Err(err) => tracing::debug!(%err, %code_id, "turso response parse error"),
        }
    }
}

fn emit_metrics(code_id: &str, totals: Totals) {
    let subdomain = code_id.split("::").next().unwrap_or(code_id).to_string();
    let attrs = [KeyValue::new("code_id", subdomain)];
    if totals.rows_read > 0 {
        rows_read_counter().add(totals.rows_read, &attrs);
    }
    if totals.rows_written > 0 {
        rows_written_counter().add(totals.rows_written, &attrs);
    }
}

fn rows_read_counter() -> &'static Counter<u64> {
    static COUNTER: OnceLock<Counter<u64>> = OnceLock::new();
    COUNTER.get_or_init(|| {
        opentelemetry::global::meter("fn0-worker")
            .u64_counter("turso.rows_read")
            .with_description("Rows read reported by Turso hrana responses")
            .build()
    })
}

fn rows_written_counter() -> &'static Counter<u64> {
    static COUNTER: OnceLock<Counter<u64>> = OnceLock::new();
    COUNTER.get_or_init(|| {
        opentelemetry::global::meter("fn0-worker")
            .u64_counter("turso.rows_written")
            .with_description("Rows written reported by Turso hrana responses")
            .build()
    })
}

#[derive(Default)]
struct Totals {
    rows_read: u64,
    rows_written: u64,
}

fn parse_pipeline_response<R: Read>(reader: R) -> Result<Totals, struson::reader::ReaderError> {
    let mut json = JsonStreamReader::new(reader);
    let mut totals = Totals::default();

    json.begin_object()?;
    while json.has_next()? {
        let key = json.next_name_owned()?;
        if key == "results" {
            json.begin_array()?;
            while json.has_next()? {
                walk_stream_result(&mut json, &mut totals)?;
            }
            json.end_array()?;
        } else {
            json.skip_value()?;
        }
    }
    json.end_object()?;
    Ok(totals)
}

fn walk_stream_result<R: Read>(
    json: &mut JsonStreamReader<R>,
    totals: &mut Totals,
) -> Result<(), struson::reader::ReaderError> {
    if json.peek()? != ValueType::Object {
        json.skip_value()?;
        return Ok(());
    }
    json.begin_object()?;
    while json.has_next()? {
        let key = json.next_name_owned()?;
        if key == "response" {
            walk_response(json, totals)?;
        } else {
            json.skip_value()?;
        }
    }
    json.end_object()?;
    Ok(())
}

fn walk_response<R: Read>(
    json: &mut JsonStreamReader<R>,
    totals: &mut Totals,
) -> Result<(), struson::reader::ReaderError> {
    if json.peek()? != ValueType::Object {
        json.skip_value()?;
        return Ok(());
    }
    json.begin_object()?;
    while json.has_next()? {
        let key = json.next_name_owned()?;
        if key == "result" {
            walk_result_payload(json, totals)?;
        } else {
            json.skip_value()?;
        }
    }
    json.end_object()?;
    Ok(())
}

fn walk_result_payload<R: Read>(
    json: &mut JsonStreamReader<R>,
    totals: &mut Totals,
) -> Result<(), struson::reader::ReaderError> {
    if json.peek()? != ValueType::Object {
        json.skip_value()?;
        return Ok(());
    }
    json.begin_object()?;
    while json.has_next()? {
        let key = json.next_name_owned()?;
        match key.as_str() {
            "rows_read" => {
                if let Ok(Ok(n)) = json.next_number::<u64>() {
                    totals.rows_read += n;
                }
            }
            "rows_written" => {
                if let Ok(Ok(n)) = json.next_number::<u64>() {
                    totals.rows_written += n;
                }
            }
            "step_results" => {
                json.begin_array()?;
                while json.has_next()? {
                    walk_result_payload(json, totals)?;
                }
                json.end_array()?;
            }
            _ => json.skip_value()?,
        }
    }
    json.end_object()?;
    Ok(())
}

struct BlockingBytesReader {
    rx: UnboundedReceiver<Bytes>,
    leftover: Option<Bytes>,
}

impl BlockingBytesReader {
    fn new(rx: UnboundedReceiver<Bytes>) -> Self {
        Self {
            rx,
            leftover: None,
        }
    }
}

impl Read for BlockingBytesReader {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        loop {
            if let Some(mut chunk) = self.leftover.take() {
                let n = chunk.len().min(buf.len());
                buf[..n].copy_from_slice(&chunk[..n]);
                if n < chunk.len() {
                    self.leftover = Some(chunk.split_off(n));
                }
                return Ok(n);
            }
            match self.rx.blocking_recv() {
                Some(chunk) if chunk.is_empty() => continue,
                Some(chunk) => self.leftover = Some(chunk),
                None => return Ok(0),
            }
        }
    }
}