evix 0.3.2

Evaluate a Nix expression and stream derivation info as JSON lines
use std::{
  env,
  io,
  io::{BufRead, BufReader, Write},
  process::{Child, Command, Stdio},
  sync::{
    Arc,
    Condvar,
    Mutex,
    atomic::{AtomicBool, Ordering},
  },
  thread,
  time::Duration,
};

use anyhow::{Context as _, Result, bail};
use tracing::{debug, error, info, trace};

use crate::{Config, EvalError, Event, WORKER_ENV};

/// How often a collector parked waiting for work re-checks the cancellation
/// flag.
///
/// This is the timeout passed to the condition-variable wait in `collector`,
/// so a parked collector wakes at least this often even when no peer
/// notifies it.
const CANCEL_POLL_INTERVAL: Duration = Duration::from_millis(200);

/// Work-queue state shared by every collector thread.
///
/// It is stored behind the `Mutex` of the `(Mutex, Condvar)` pair created in
/// `run`; the `Condvar` is signalled after a collector has updated it.
struct SharedState {
  /// Attribute paths waiting to be evaluated. Seeded with the empty path;
  /// collectors take from the front and append the children of expanded
  /// attrsets to the back.
  todo:   Vec<Vec<String>>,
  /// Number of paths that have been handed to a worker and whose event has
  /// not been processed yet. Collectors treat "`todo` empty and `active`
  /// zero" as "all work is done".
  active: usize,
  /// Message of a fatal error once one has been recorded. A collector that
  /// finds this set stops taking work and returns an error.
  error:  Option<String>,
}

/// Spawn worker threads, each running a collector loop, and block until all
/// work is drained or a fatal error occurs.
///
/// The shared work-queue is seeded with the empty path `[]` (the root of the
/// expression). As attrsets are expanded their children are pushed back onto
/// the queue and dispatched to idle workers.
pub fn run<F>(
  config: &Config,
  cancel: &Arc<AtomicBool>,
  sink: Arc<Mutex<F>>,
) -> Result<()>
where
  F: FnMut(&Event) -> Result<()> + Send + 'static,
{
  let shared = Arc::new((
    Mutex::new(SharedState {
      todo:   vec![vec![]],
      active: 0,
      error:  None,
    }),
    Condvar::new(),
  ));

  let n = config.workers.max(1);
  let mut handles = Vec::with_capacity(n);
  for _ in 0..n {
    let shared = Arc::clone(&shared);
    let sink = Arc::clone(&sink);
    let cancel = Arc::clone(cancel);
    let config = config.clone();
    handles.push(thread::spawn(move || {
      collector(&config, &cancel, shared, sink)
    }));
  }

  for h in handles {
    h.join()
      .map_err(|_| anyhow::anyhow!("collector thread panicked"))??;
  }

  let (lock, _) = &*shared;
  if let Some(e) = &lock.lock().unwrap().error {
    bail!("{e}");
  }

  Ok(())
}

/// One collector thread: owns a worker child process and feeds it attribute
/// paths drawn from the shared queue. Incoming [`Event`]s are handed to `sink`;
/// attrset expansions push new paths back onto the queue.
///
/// The collector parks when the queue is empty (but work is still in-flight)
/// and periodically wakes to check the cancellation flag. On cancellation or
/// empty-queue, it sends `exit` to the worker, waits for the process, and
/// returns.
fn collector<F>(
  config: &Config,
  cancel: &Arc<AtomicBool>,
  shared: Arc<(Mutex<SharedState>, Condvar)>,
  sink: Arc<Mutex<F>>,
) -> Result<()>
where
  F: FnMut(&Event) -> Result<()>,
{
  let (lock, cvar) = &*shared;

  let (mut proc, mut child_stdin, mut reader) = spawn_worker_pair(config)?;

  loop {
    let path = {
      let mut s = lock.lock().unwrap();
      loop {
        if cancel.load(Ordering::Relaxed) {
          writeln!(child_stdin, "exit")?;
          child_stdin.flush()?;
          let _ = proc.wait();
          info!("cancellation requested, collector exiting");
          return Ok(());
        }
        if let Some(ref e) = s.error {
          let msg = e.clone();
          writeln!(child_stdin, "exit")?;
          child_stdin.flush()?;
          error!(error = %msg, "stopping collector due to fatal error");
          bail!("{msg}");
        }
        if !s.todo.is_empty() {
          let p = s.todo.remove(0);
          s.active += 1;
          debug!(attr = %p.join("."), active = s.active, pending = s.todo.len(), "dispatched attribute");
          break Some(p);
        }
        if s.active == 0 {
          writeln!(child_stdin, "exit")?;
          child_stdin.flush()?;
          let _ = proc.wait();
          info!("evaluation queue empty, exiting worker");
          return Ok(());
        }
        // Park until new work arrives, but wake periodically so a
        // cancellation request is observed even when no events flow.
        let (guard, _timeout) =
          cvar.wait_timeout(s, CANCEL_POLL_INTERVAL).unwrap();
        s = guard;
      }
    };

    let Some(path) = path else {
      continue;
    };

    let attr = path.join(".");
    trace!(attr = %attr, "sending work to worker");
    writeln!(child_stdin, "do {}", serde_json::to_string(&path)?)?;
    child_stdin.flush()?;

    let event = read_event(&mut reader, &path).with_context(|| {
      format!(
        "reading event for {attr}; worker status: {}",
        worker_status(&mut proc)
      )
    })?;

    {
      let mut s = lock.lock().unwrap();
      s.active -= 1;

      if let Event::AttrSet { attrs, .. } = &event {
        debug!(attr = %attr, new_attrs = attrs.len(), "expanded attrset");
        for name in attrs {
          let mut child = path.clone();
          child.push(name.clone());
          s.todo.push(child);
        }
      } else {
        if let Event::Error(EvalError {
          fatal: true, error, ..
        }) = &event
        {
          error!(attr = %attr, error = %error, "fatal evaluation error");
          s.error = Some(error.clone());
        }
        sink.lock().unwrap()(&event).context("event sink returned an error")?;
      }

      cvar.notify_all();
    }

    let status = read_line(&mut reader).with_context(|| {
      format!(
        "reading worker status for {attr}; worker status: {}",
        worker_status(&mut proc)
      )
    })?;
    trace!(attr = %attr, status = %status, "received worker status");
    match status.as_str() {
      "ready" => {},
      "restart" => {
        info!("restarting worker due to memory limit");
        if let Ok(status) = proc.wait() {
          debug!(?status, "previous worker exited");
        }
        (proc, child_stdin, reader) = spawn_worker_pair(config)?;
      },
      other => {
        if let Ok(Event::Error(EvalError { error, .. })) =
          serde_json::from_str::<Event>(other)
        {
          error!(error = %error, "worker reported error");
          let mut s = lock.lock().unwrap();
          s.error = Some(error.clone());
          cvar.notify_all();
          bail!("{error}");
        }
        bail!("unexpected worker message: {other}");
      },
    }
  }
}

fn read_event(
  reader: &mut BufReader<impl io::Read>,
  path: &[String],
) -> Result<Event> {
  let resp = read_line(reader)?;
  serde_json::from_str(&resp)
    .with_context(|| format!("parsing worker response for {path:?}: {resp}"))
}

/// Read a single line from `reader` and return it with newlines, carriage
/// returns and spaces stripped from both ends.
///
/// Reaching end-of-input before any byte is read is reported as an error.
fn read_line(reader: &mut BufReader<impl io::Read>) -> Result<String> {
  let mut buf = String::new();
  let bytes_read = reader.read_line(&mut buf)?;
  if bytes_read == 0 {
    bail!("worker closed stdout unexpectedly");
  }

  let trimmed = buf.trim_matches(|c: char| matches!(c, '\n' | '\r' | ' '));
  Ok(trimmed.to_owned())
}

/// Spawn a worker subprocess, handshake, and return the child handle together
/// with its stdin writer and stdout reader.
///
/// The worker is the same binary re-executed with [`WORKER_ENV`] set.
fn spawn_worker_pair(
  config: &Config,
) -> Result<(Child, impl Write, BufReader<impl io::Read>)> {
  let exe = env::current_exe().context("resolving current exe")?;
  debug!("spawning worker process");

  let mut child = Command::new(&exe)
    .env(WORKER_ENV, "1")
    .stdin(Stdio::piped())
    .stdout(Stdio::piped())
    .stderr(Stdio::inherit())
    .spawn()
    .with_context(|| format!("spawning worker process from {exe:?}"))?;

  let mut stdin = child.stdin.take().context("worker stdin")?;
  writeln!(stdin, "{}", serde_json::to_string(config)?)?;
  stdin.flush()?;

  let mut reader =
    BufReader::new(child.stdout.take().context("worker stdout")?);
  read_ready(&mut reader)?;
  info!("worker ready");

  Ok((child, stdin, reader))
}

fn read_ready(reader: &mut BufReader<impl io::Read>) -> Result<()> {
  let line = read_line(reader)?;
  if line != "ready" {
    bail!("unexpected worker handshake: {line:?}");
  }
  Ok(())
}

/// Describe the worker's exit status for use in error messages.
///
/// Returns the textual exit status when the process has already exited, and
/// `"unknown"` when it is still running or its status cannot be queried.
/// Never blocks: the status is polled, not waited for.
fn worker_status(proc: &mut Child) -> String {
  match proc.try_wait() {
    Ok(Some(exit)) => exit.to_string(),
    Ok(None) | Err(_) => String::from("unknown"),
  }
}