evix 0.3.0

Evaluate a Nix expression and stream derivation info as JSON lines
use std::{
    env, io,
    io::{BufRead, BufReader, Write},
    process::{Child, Command, Stdio},
    sync::{
        Arc, Condvar, Mutex,
        atomic::{AtomicBool, Ordering},
    },
    thread,
    time::Duration,
};

use anyhow::{Context as _, Result, bail};
use tracing::{debug, error, info, trace};

use crate::{Config, EvalError, Event, WORKER_ENV};

/// How often a collector parked waiting for work re-checks the cancellation
/// flag.
const CANCEL_POLL_INTERVAL: Duration = Duration::from_millis(200);

struct SharedState {
    todo: Vec<Vec<String>>,
    active: usize,
    error: Option<String>,
}

pub fn run<F>(config: &Config, cancel: &Arc<AtomicBool>, sink: Arc<Mutex<F>>) -> Result<()>
where
    F: FnMut(&Event) -> Result<()> + Send + 'static,
{
    let shared = Arc::new((
        Mutex::new(SharedState {
            todo: vec![vec![]],
            active: 0,
            error: None,
        }),
        Condvar::new(),
    ));

    let n = config.workers.max(1);
    let mut handles = Vec::with_capacity(n);
    for _ in 0..n {
        let shared = Arc::clone(&shared);
        let sink = Arc::clone(&sink);
        let cancel = Arc::clone(cancel);
        let config = config.clone();
        handles.push(thread::spawn(move || {
            collector(&config, &cancel, shared, sink)
        }));
    }

    for h in handles {
        h.join()
            .map_err(|_| anyhow::anyhow!("collector thread panicked"))??;
    }

    let (lock, _) = &*shared;
    if let Some(e) = &lock.lock().unwrap().error {
        bail!("{e}");
    }

    Ok(())
}

fn collector<F>(
    config: &Config,
    cancel: &Arc<AtomicBool>,
    shared: Arc<(Mutex<SharedState>, Condvar)>,
    sink: Arc<Mutex<F>>,
) -> Result<()>
where
    F: FnMut(&Event) -> Result<()>,
{
    let (lock, cvar) = &*shared;

    let (mut proc, mut child_stdin, mut reader) = spawn_worker_pair(config)?;

    loop {
        let path = {
            let mut s = lock.lock().unwrap();
            loop {
                if cancel.load(Ordering::Relaxed) {
                    writeln!(child_stdin, "exit")?;
                    child_stdin.flush()?;
                    let _ = proc.wait();
                    info!("cancellation requested, collector exiting");
                    return Ok(());
                }
                if let Some(ref e) = s.error {
                    let msg = e.clone();
                    writeln!(child_stdin, "exit")?;
                    child_stdin.flush()?;
                    error!(error = %msg, "stopping collector due to fatal error");
                    bail!("{msg}");
                }
                if !s.todo.is_empty() {
                    let p = s.todo.remove(0);
                    s.active += 1;
                    debug!(attr = %p.join("."), active = s.active, pending = s.todo.len(), "dispatched attribute");
                    break Some(p);
                }
                if s.active == 0 {
                    writeln!(child_stdin, "exit")?;
                    child_stdin.flush()?;
                    let _ = proc.wait();
                    info!("evaluation queue empty, exiting worker");
                    return Ok(());
                }
                // Park until new work arrives, but wake periodically so a
                // cancellation request is observed even when no events flow.
                let (guard, _timeout) = cvar.wait_timeout(s, CANCEL_POLL_INTERVAL).unwrap();
                s = guard;
            }
        };

        let Some(path) = path else {
            continue;
        };

        let attr = path.join(".");
        trace!(attr = %attr, "sending work to worker");
        writeln!(child_stdin, "do {}", serde_json::to_string(&path)?)?;
        child_stdin.flush()?;

        let event = read_event(&mut reader, &path).with_context(|| {
            format!(
                "reading event for {attr}; worker status: {}",
                worker_status(&mut proc)
            )
        })?;

        {
            let mut s = lock.lock().unwrap();
            s.active -= 1;

            if let Event::AttrSet { attrs, .. } = &event {
                debug!(attr = %attr, new_attrs = attrs.len(), "expanded attrset");
                for name in attrs {
                    let mut child = path.clone();
                    child.push(name.clone());
                    s.todo.push(child);
                }
            } else {
                if let Event::Error(EvalError {
                    fatal: true, error, ..
                }) = &event
                {
                    error!(attr = %attr, error = %error, "fatal evaluation error");
                    s.error = Some(error.clone());
                }
                sink.lock().unwrap()(&event).context("event sink returned an error")?;
            }

            cvar.notify_all();
        }

        let status = read_line(&mut reader).with_context(|| {
            format!(
                "reading worker status for {attr}; worker status: {}",
                worker_status(&mut proc)
            )
        })?;
        trace!(attr = %attr, status = %status, "received worker status");
        match status.as_str() {
            "ready" => {}
            "restart" => {
                info!("restarting worker due to memory limit");
                if let Ok(status) = proc.wait() {
                    debug!(?status, "previous worker exited");
                }
                (proc, child_stdin, reader) = spawn_worker_pair(config)?;
            }
            other => {
                if let Ok(Event::Error(EvalError { error, .. })) =
                    serde_json::from_str::<Event>(other)
                {
                    error!(error = %error, "worker reported error");
                    let mut s = lock.lock().unwrap();
                    s.error = Some(error.clone());
                    cvar.notify_all();
                    bail!("{error}");
                }
                bail!("unexpected worker message: {other}");
            }
        }
    }
}

fn read_event(reader: &mut BufReader<impl io::Read>, path: &[String]) -> Result<Event> {
    let resp = read_line(reader)?;
    serde_json::from_str(&resp)
        .with_context(|| format!("parsing worker response for {path:?}: {resp}"))
}

fn read_line(reader: &mut BufReader<impl io::Read>) -> Result<String> {
    let mut line = String::new();
    if reader.read_line(&mut line)? == 0 {
        bail!("worker closed stdout unexpectedly");
    }
    Ok(line.trim_matches(['\n', '\r', ' ']).to_string())
}

fn spawn_worker_pair(config: &Config) -> Result<(Child, impl Write, BufReader<impl io::Read>)> {
    let exe = env::current_exe().context("resolving current exe")?;
    debug!("spawning worker process");

    let mut child = Command::new(&exe)
        .env(WORKER_ENV, "1")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::inherit())
        .spawn()
        .with_context(|| format!("spawning worker process from {exe:?}"))?;

    let mut stdin = child.stdin.take().context("worker stdin")?;
    writeln!(stdin, "{}", serde_json::to_string(config)?)?;
    stdin.flush()?;

    let mut reader = BufReader::new(child.stdout.take().context("worker stdout")?);
    read_ready(&mut reader)?;
    info!("worker ready");

    Ok((child, stdin, reader))
}

fn read_ready(reader: &mut BufReader<impl io::Read>) -> Result<()> {
    let line = read_line(reader)?;
    if line != "ready" {
        bail!("unexpected worker handshake: {line:?}");
    }
    Ok(())
}

fn worker_status(proc: &mut Child) -> String {
    proc.try_wait()
        .ok()
        .flatten()
        .map_or_else(|| "unknown".to_string(), |s| s.to_string())
}