use std::{
env,
fs,
mem,
path::PathBuf,
process,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use anyhow::{Context as _, Result, bail};
use nix_bindings::{Context, EvalState, EvalStateBuilder, Store, Value};
use tokio::io::{BufReader, BufWriter};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use tracing::{debug, trace, warn};
use crate::{
AutoArg,
Input,
remote_proto::{ClientMessage, ServerMessage, read_client, write_server},
worker_config::WorkerConfig,
worker_process::WorkerStatus,
};
#[allow(clippy::arc_with_non_send_sync)]
pub async fn run() -> Result<()> {
let stdin = tokio::io::stdin();
let stdout = tokio::io::stdout();
let mut reader = BufReader::new(stdin).compat();
let mut writer = BufWriter::new(stdout).compat_write();
let config = match read_client(&mut reader).await? {
ClientMessage::Setup(config) => config,
other => bail!("worker expected setup message, got {other:?}"),
};
debug!("worker initialized");
let _nix_options_file = apply_nix_options(&config.nix_options)?;
let ctx = Arc::new(Context::new().context("Nix context")?);
let store = Arc::new(Store::open(&ctx, None).context("Nix store")?);
let eval_options = crate::eval::EvalOptions::from(&config);
let state = build_eval_state(&ctx, &store, &config)?;
let auto_args = build_auto_args(&state, &config.auto_args)?;
let auto_ref = auto_args.as_ref();
let root = eval_root(&ctx, &state, &config, auto_ref)?;
write_server(&mut writer, &ServerMessage::Ready).await?;
loop {
let path = match read_client(&mut reader).await? {
ClientMessage::Work(path) => path,
ClientMessage::Shutdown => {
debug!("received shutdown command, worker exiting");
break;
},
ClientMessage::Setup(_) => bail!("worker setup sent twice"),
};
let attr = path.join(".");
trace!(attr = %attr, "evaluating attribute");
let response = crate::eval::process_attr(
&state,
&store,
&root,
&path,
auto_ref,
&eval_options,
);
write_server(&mut writer, &ServerMessage::Event(Box::new(response)))
.await?;
if should_restart(config.max_memory_size) {
warn!(
max_rss_kb = get_maxrss_kb(),
"memory limit exceeded, worker restarting"
);
write_server(&mut writer, &ServerMessage::Status(WorkerStatus::Restart))
.await?;
return Ok(());
}
write_server(&mut writer, &ServerMessage::Status(WorkerStatus::Ready))
.await?;
}
Ok(())
}
fn apply_nix_options(options: &[(String, String)]) -> Result<Option<PathBuf>> {
if options.is_empty() {
return Ok(None);
}
let path = env::temp_dir().join(format!(
"evix-nix-options-{}-{}.conf",
process::id(),
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0, |duration| duration.as_nanos())
));
let mut contents = String::new();
for (key, value) in options {
contents.push_str(key);
contents.push_str(" = ");
contents.push_str(value);
contents.push('\n');
}
fs::write(&path, contents).context("writing Nix options file")?;
unsafe {
env::set_var("NIX_USER_CONF_FILES", &path);
}
Ok(Some(path))
}
#[allow(clippy::arc_with_non_send_sync)]
fn build_eval_state(
_ctx: &Arc<Context>,
store: &Arc<Store>,
config: &WorkerConfig,
) -> Result<EvalState> {
let mut builder =
EvalStateBuilder::new(store).context("eval state builder")?;
#[cfg(feature = "flake")]
if matches!(config.input, Input::Flake(_)) {
let fs = nix_bindings::flake::FlakeSettings::new(_ctx)
.context("flake settings")?;
builder = builder
.with_flake_settings(&fs)
.context("applying flake settings")?;
}
builder.build().context("building eval state")
}
fn eval_root<'s>(
ctx: &Arc<Context>,
state: &'s EvalState,
config: &WorkerConfig,
auto_args: Option<&Value<'s>>,
) -> Result<Value<'s>> {
match &config.input {
Input::Flake(flake_ref) => {
eval_flake(ctx, state, flake_ref, &config.override_inputs)
},
Input::Expr(expr) => {
let v = state
.eval_from_string(expr, "<cmdline>")
.context("evaluating expression")?;
Ok(state.auto_call_function(auto_args, &v)?)
},
Input::File(file) => {
let v = state.eval_from_file(file).context("evaluating file")?;
Ok(state.auto_call_function(auto_args, &v)?)
},
}
}
#[cfg(feature = "flake")]
#[allow(clippy::arc_with_non_send_sync)]
fn eval_flake<'s>(
ctx: &Arc<Context>,
state: &'s EvalState,
flake_ref_str: &str,
override_inputs: &[(String, String)],
) -> Result<Value<'s>> {
use nix_bindings::flake::{
FetchersSettings,
FlakeReference,
FlakeReferenceParseFlags,
LockFlags,
LockMode,
LockedFlake,
};
let flake_settings = Arc::new(
nix_bindings::flake::FlakeSettings::new(ctx).context("flake settings")?,
);
let fetchers = FetchersSettings::new(ctx).context("fetcher settings")?;
let base_dir =
env::current_dir().context("resolving flake base directory")?;
let parse_flags = FlakeReferenceParseFlags::new(ctx, &flake_settings)
.context("parse flags")?
.set_base_directory(&base_dir.to_string_lossy())
.with_context(|| {
format!("setting flake base directory {}", base_dir.display())
})?;
let (flake_ref, fragment) = FlakeReference::parse(
ctx,
&fetchers,
&flake_settings,
&parse_flags,
flake_ref_str,
)
.context("parsing flake reference")?;
let mut lock_flags = LockFlags::new(ctx, &flake_settings)
.context("lock flags")?
.set_mode(LockMode::Virtual)
.context("setting lock mode")?;
for (name, value) in override_inputs {
let (override_ref, _fragment) = FlakeReference::parse(
ctx,
&fetchers,
&flake_settings,
&parse_flags,
value,
)
.with_context(|| {
format!("parsing --override-input {name} reference {value:?}")
})?;
lock_flags = lock_flags
.add_input_override(name, &override_ref)
.with_context(|| format!("applying --override-input {name}"))?;
}
let locked = LockedFlake::lock(
ctx,
&fetchers,
&flake_settings,
state,
&lock_flags,
&flake_ref,
)
.context("locking flake")?;
let outputs = locked
.output_attrs(&flake_settings, state)
.context("flake outputs")?;
if fragment.is_empty() {
return Ok(outputs);
}
let mut current: Value<'s> = outputs;
for part in fragment.split('.') {
let next = {
let raw = current
.get_attr(part)
.with_context(|| format!("fragment attr {part:?}"))?;
state
.auto_call_function(None, &raw)
.with_context(|| format!("auto-calling fragment {part:?}"))?
};
current = next;
}
Ok(current)
}
fn build_auto_args<'s>(
state: &'s EvalState,
args: &[(String, AutoArg)],
) -> Result<Option<Value<'s>>> {
if args.is_empty() {
return Ok(None);
}
let mut pairs: Vec<(String, Value<'s>)> = Vec::new();
for (name, arg) in args {
let val = match arg {
AutoArg::Expr(expr) => {
state
.eval_from_string(expr, "<arg>")
.with_context(|| format!("--arg {name}"))?
},
AutoArg::Str(s) => {
state
.make_string(s)
.with_context(|| format!("--argstr {name}"))?
},
};
pairs.push((name.clone(), val));
}
let pair_refs: Vec<(&str, &Value<'_>)> =
pairs.iter().map(|(k, v)| (k.as_str(), v)).collect();
let attrs = state
.make_attrs(&pair_refs)
.context("building auto-args attrset")?;
Ok(Some(attrs))
}
fn should_restart(max_memory_mb: usize) -> bool {
get_maxrss_kb() > max_memory_mb * 1024
}
fn get_maxrss_kb() -> usize {
let mut usage: libc::rusage = unsafe { mem::zeroed() };
unsafe { libc::getrusage(libc::RUSAGE_SELF, &mut usage) };
let rss = usage.ru_maxrss as usize;
if cfg!(target_os = "macos") {
rss / 1024
} else {
rss
}
}