Skip to main content

noether_engine/executor/
nix.rs

1//! Nix-based executor for synthesized stages.
2//!
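//! Runs stage implementations as subprocesses using `nix run nixpkgs#<runtime>`,
//! giving a reproducible, Nix-pinned runtime for Python/JavaScript/Bash code
//! without requiring any ambient language runtime on the host. The one
//! exception is Python stages that declare pip packages in a `# requires:`
//! comment: those run from a cached virtualenv built with the host's `python3`.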
6//!
//! **By default this is a reproducibility boundary, not an isolation
//! boundary.** With [`NixConfig::isolation`] left at `IsolationBackend::None`,
//! stages run with the privileges of the host user — they can read/write the
//! filesystem, make arbitrary network calls, and read environment variables.
//! Do not execute untrusted stages that way on a host with credentials you are
//! not willing to risk; enable the bwrap isolation backend instead. See
//! SECURITY.md for the full trust model.
12//!
13//! ## Execution protocol
14//!
15//! - stdin  → JSON-encoded input value followed by a newline
16//! - stdout → JSON-encoded output value followed by a newline
//! - stderr → diagnostic text, reported in the error message when the process
//!   exits non-zero (ignored on success)
18//! - exit 0 → success; exit non-zero → `ExecutionError::StageFailed`
19//!
20//! ## Timeout
21//!
22//! Every execution is bounded by [`NixConfig::timeout_secs`] (default 30 s).
23//! When the child process exceeds the limit it is sent SIGKILL and the call
24//! returns [`ExecutionError::TimedOut`].
25//!
26//! ## Generated wrapper (Python example)
27//!
28//! ```python
29//! import sys, json as _json
30//!
31//! # ---- user code ----
32//! def execute(input_value):
33//!     ...
34//! # -------------------
35//!
36//! if __name__ == '__main__':
37//!     try:
38//!         _output = execute(_json.loads(sys.stdin.read()))
39//!         print(_json.dumps(_output))
40//!     except Exception as e:
41//!         print(str(e), file=sys.stderr)
42//!         sys.exit(1)
43//! ```
44
use super::{ExecutionError, StageExecutor};
use noether_core::stage::StageId;
use serde_json::Value;
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::io::Read;
use std::io::Write as IoWrite;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::sync::mpsc;
use std::time::Duration;
use std::time::Instant;
55
56// ── Configuration ────────────────────────────────────────────────────────────
57
58/// Tunable knobs for the [`NixExecutor`].
59#[derive(Debug, Clone)]
60pub struct NixConfig {
61    /// Wall-clock timeout for a single stage execution in seconds.
62    /// The child process is killed with SIGKILL when exceeded.
63    /// Default: 30 s.
64    pub timeout_secs: u64,
65    /// Maximum number of bytes read from a stage's stdout before truncation.
66    /// Prevents runaway allocations from stages that produce huge outputs.
67    /// Default: 10 MiB.
68    pub max_output_bytes: usize,
69    /// Maximum number of bytes captured from stderr (for error messages).
70    /// Default: 64 KiB.
71    pub max_stderr_bytes: usize,
72    /// Isolation backend to wrap each stage subprocess in. When set
73    /// to [`super::isolation::IsolationBackend::None`] (the default
74    /// for back-compat), stages run with full host-user privileges
75    /// — see SECURITY.md. Set via
76    /// [`NixConfig::with_isolation`] or the CLI `--isolate` flag.
77    pub isolation: super::isolation::IsolationBackend,
78}
79
80impl Default for NixConfig {
81    fn default() -> Self {
82        Self {
83            timeout_secs: 30,
84            max_output_bytes: 10 * 1024 * 1024,
85            max_stderr_bytes: 64 * 1024,
86            isolation: super::isolation::IsolationBackend::None,
87        }
88    }
89}
90
91impl NixConfig {
92    /// Set the isolation backend. Returns `self` for chaining.
93    pub fn with_isolation(mut self, backend: super::isolation::IsolationBackend) -> Self {
94        self.isolation = backend;
95        self
96    }
97}
98
99// ── Internal stage storage ───────────────────────────────────────────────────
100
101/// Maps stage IDs to their implementation (source code + language tag +
102/// declared effects — needed so the isolation layer can derive a
103/// policy).
104#[derive(Clone)]
105struct StageImpl {
106    code: String,
107    language: String,
108    effects: noether_core::effects::EffectSet,
109}
110
111// ── NixExecutor ──────────────────────────────────────────────────────────────
112
113/// Executor that runs synthesized stages through Nix-managed language runtimes.
114///
115/// When `nix` is available, each stage is executed as a subprocess with a
116/// Nix-pinned runtime (e.g. `nix run nixpkgs#python3 -- stage.py`). The Nix
117/// binary cache ensures the runtime is downloaded once and then reused from
118/// the store. **This gives reproducibility, not isolation**: the subprocess
119/// inherits the host user's privileges, filesystem, and network. See module
120/// docs and SECURITY.md for the full trust model.
121///
122/// ## Resource limits
123///
124/// - **Timeout**: configured via [`NixConfig::timeout_secs`] (default 30 s).
125///   The child is sent SIGKILL when the limit is exceeded.
126/// - **Output cap**: configured via [`NixConfig::max_output_bytes`] (default 10 MiB).
127pub struct NixExecutor {
128    nix_bin: PathBuf,
129    cache_dir: PathBuf,
130    config: NixConfig,
131    implementations: HashMap<String, StageImpl>,
132}
133
134impl NixExecutor {
135    /// Probe the system for a usable `nix` binary.
136    /// Returns the path if found, or `None` if Nix is not installed.
137    pub fn find_nix() -> Option<PathBuf> {
138        // Determinate Systems installer puts nix here:
139        let determinate = PathBuf::from("/nix/var/nix/profiles/default/bin/nix");
140        if determinate.exists() {
141            return Some(determinate);
142        }
143
144        // Walk $PATH directly rather than spawning `which`. Avoids a
145        // subprocess + the risk that `which` is missing or shadowed on
146        // minimal systems (e.g. some container base images).
147        let path_env = std::env::var_os("PATH")?;
148        for dir in std::env::split_paths(&path_env) {
149            let candidate = dir.join("nix");
150            if candidate.is_file() {
151                return Some(candidate);
152            }
153        }
154        None
155    }
156
157    /// Build an executor that can run synthesized stages found in `store`.
158    ///
159    /// Returns `None` when `nix` is not available — callers should fall back to
160    /// `InlineExecutor` exclusively in that case.
161    pub fn from_store(store: &dyn noether_store::StageStore) -> Option<Self> {
162        Self::from_store_with_config(store, NixConfig::default())
163    }
164
165    /// Like [`from_store`] but with a custom [`NixConfig`].
166    pub fn from_store_with_config(
167        store: &dyn noether_store::StageStore,
168        config: NixConfig,
169    ) -> Option<Self> {
170        let nix_bin = Self::find_nix()?;
171
172        let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".into());
173        let cache_dir = PathBuf::from(home).join(".noether").join("impl_cache");
174        let _ = std::fs::create_dir_all(&cache_dir);
175
176        let mut implementations = HashMap::new();
177        for stage in store.list(None) {
178            if let (Some(code), Some(lang)) =
179                (&stage.implementation_code, &stage.implementation_language)
180            {
181                implementations.insert(
182                    stage.id.0.clone(),
183                    StageImpl {
184                        code: code.clone(),
185                        language: lang.clone(),
186                        effects: stage.signature.effects.clone(),
187                    },
188                );
189            }
190        }
191
192        Some(Self {
193            nix_bin,
194            cache_dir,
195            config,
196            implementations,
197        })
198    }
199
200    /// Clone the current config (minus the implementations map) for
201    /// callers that want to rebuild with different knobs.
202    pub fn config_snapshot(&self) -> NixConfig {
203        self.config.clone()
204    }
205
206    /// Rebuild a NixExecutor with a replacement config, preserving
207    /// its registered implementations. Returns `Some(..)` or `None`
208    /// when reconstruction fails — today it can't fail, but the
209    /// Option keeps the API forward-compatible.
210    pub fn rebuild_with_config(mut self, config: NixConfig) -> Option<Self> {
211        self.config = config;
212        Some(self)
213    }
214
215    /// Register a stage with explicit declared effects. Used by tests
216    /// and by callers that want to drive the isolation policy without
217    /// going through the full StageStore.
218    pub fn register_with_effects(
219        &mut self,
220        stage_id: &StageId,
221        code: &str,
222        language: &str,
223        effects: noether_core::effects::EffectSet,
224    ) {
225        self.implementations.insert(
226            stage_id.0.clone(),
227            StageImpl {
228                code: code.into(),
229                language: language.into(),
230                effects,
231            },
232        );
233    }
234
235    /// True when we have a real implementation for this stage.
236    pub fn has_implementation(&self, stage_id: &StageId) -> bool {
237        self.implementations.contains_key(&stage_id.0)
238    }
239
240    /// Pre-fetch the Python 3 runtime into the Nix store in a background thread.
241    ///
242    /// The first time any Python stage runs, Nix may take several seconds to
243    /// download and verify the runtime closure.  Calling `warmup()` at startup
244    /// overlaps that latency with application boot time.
245    ///
246    /// The returned `JoinHandle` can be ignored — any error is logged to stderr
247    /// but does not affect correctness; the runtime will still be fetched on first
248    /// actual use.
249    pub fn warmup(&self) -> std::thread::JoinHandle<()> {
250        let nix_bin = self.nix_bin.clone();
251        std::thread::spawn(move || {
252            // `nix build` with `--dry-run` is enough to populate the binary cache
253            // without running any user code.
254            let status = Command::new(&nix_bin)
255                .args([
256                    "build",
257                    "--no-link",
258                    "--quiet",
259                    "--no-write-lock-file",
260                    "nixpkgs#python3",
261                ])
262                .stdout(Stdio::null())
263                .stderr(Stdio::null())
264                .status();
265            match status {
266                Ok(s) if s.success() => {
267                    eprintln!("[noether] nix warmup: python3 runtime cached");
268                }
269                Ok(s) => {
270                    eprintln!("[noether] nix warmup: exited with {s} (non-fatal)");
271                }
272                Err(e) => {
273                    eprintln!("[noether] nix warmup: failed to spawn ({e}) (non-fatal)");
274                }
275            }
276        })
277    }
278
279    // ── Internal helpers ─────────────────────────────────────────────────────
280
281    /// Hash the code string to get a stable cache key.
282    fn code_hash(code: &str) -> String {
283        hex::encode(Sha256::digest(code.as_bytes()))
284    }
285
286    /// Ensure the wrapped script for `impl_hash` exists on disk.
287    /// Returns the path to the file.
288    fn ensure_script(
289        &self,
290        impl_hash: &str,
291        code: &str,
292        language: &str,
293    ) -> Result<PathBuf, ExecutionError> {
294        let ext = match language {
295            "javascript" | "js" => "js",
296            "bash" | "sh" => "sh",
297            _ => "py",
298        };
299
300        let path = self.cache_dir.join(format!("{impl_hash}.{ext}"));
301        if path.exists() {
302            return Ok(path);
303        }
304
305        let wrapped = match language {
306            "javascript" | "js" => Self::wrap_javascript(code),
307            "bash" | "sh" => Self::wrap_bash(code),
308            _ => Self::wrap_python(code),
309        };
310
311        std::fs::write(&path, &wrapped).map_err(|e| ExecutionError::StageFailed {
312            stage_id: StageId(impl_hash.into()),
313            message: format!("failed to write stage script: {e}"),
314        })?;
315
316        Ok(path)
317    }
318
319    /// Run the stage script via Nix with JSON on stdin, enforcing timeout and
320    /// output-size limits.
321    fn run_script(
322        &self,
323        stage_id: &StageId,
324        script: &Path,
325        language: &str,
326        input: &Value,
327    ) -> Result<Value, ExecutionError> {
328        let input_json = serde_json::to_string(input).unwrap_or_default();
329
330        let code = self
331            .implementations
332            .get(&stage_id.0)
333            .map(|i| i.code.as_str())
334            .unwrap_or("");
335
336        let (nix_subcommand, args) = self.build_nix_command(language, script, code);
337
338        // Build the full argv — either raw (no isolation) or wrapped in
339        // `bwrap` when an isolation backend is configured. The wrapped
340        // command spawns bwrap which execs the inner command inside a
341        // fresh sandbox.
342        let raw_argv: Vec<String> = if nix_subcommand == "__direct__" {
343            args.clone()
344        } else {
345            let mut v = vec![self.nix_bin.display().to_string(), nix_subcommand.clone()];
346            v.push("--no-write-lock-file".into());
347            v.push("--quiet".into());
348            v.extend(args.iter().cloned());
349            v
350        };
351
352        let mut spawn = match &self.config.isolation {
353            super::isolation::IsolationBackend::None => {
354                // No sandbox — legacy behaviour.
355                let mut cmd = Command::new(&raw_argv[0]);
356                cmd.args(&raw_argv[1..]);
357                cmd
358            }
359            super::isolation::IsolationBackend::Bwrap { bwrap_path } => {
360                // /work is a sandbox-private tmpfs (set by
361                // `IsolationPolicy::from_effects` default) — no host-side
362                // tmpdir to manage, no cleanup, no race.
363                let mut policy = super::isolation::IsolationPolicy::from_effects(
364                    self.implementations
365                        .get(&stage_id.0)
366                        .map(|i| &i.effects)
367                        .unwrap_or(&noether_core::effects::EffectSet::pure()),
368                );
369                // Expose the stage-script cache (where this invocation's
370                // wrapped `.py` / `.sh` / `.js` file lives). Scoped to
371                // `cache_dir` so the sandbox sees noether's own
372                // workspace and nothing else from the host user's home.
373                policy
374                    .ro_binds
375                    .push((self.cache_dir.to_path_buf(), self.cache_dir.to_path_buf()));
376                // Nix binary visibility inside the sandbox has three cases:
377                //
378                // 1. `nix_bin` is under `/nix/store` — covered by the
379                //    default `/nix/store` bind. Nothing to add.
380                // 2. `nix_bin` is under `cache_dir` — covered by the
381                //    `cache_dir` bind above. Nothing to add.
382                // 3. `nix_bin` is a distro-packaged install (e.g.
383                //    `/usr/bin/nix`, `/usr/local/bin/nix`). The
384                //    binary is dynamically linked against glibc,
385                //    libcrypto, and readline living in `/usr/lib*`.
386                //    Binding just the nix executable file would let
387                //    the sandbox exec it but immediately fail
388                //    resolving `ld-linux-x86-64.so.2` — the kernel
389                //    can't find the dynamic loader.
390                //
391                //    Widening the bind set to include `/usr/lib*`
392                //    re-exposes the full suid-binary surface the
393                //    hardening closed. Instead: refuse to run, with
394                //    a clear message pointing the operator at the
395                //    Nix-native install path. The trust model here
396                //    is "nix belongs to the same reproducibility
397                //    boundary as the stages it dispatches;" a
398                //    distro-packaged nix violates that boundary
399                //    anyway.
400                if !self.nix_bin.starts_with("/nix/store")
401                    && !self.nix_bin.starts_with(&self.cache_dir)
402                {
403                    return Err(ExecutionError::StageFailed {
404                        stage_id: stage_id.clone(),
405                        message: format!(
406                            "stage isolation is enabled but nix is installed at \
407                             {} (outside /nix/store). A distro-packaged nix is \
408                             dynamically linked against host libraries; binding \
409                             those into the sandbox would defeat isolation. \
410                             Install nix via the Determinate / upstream \
411                             installer (places nix under /nix/store) or pass \
412                             --isolate=none to run without the sandbox.",
413                            self.nix_bin.display()
414                        ),
415                    });
416                }
417                // `build_bwrap_command` emits `--setenv` args for
418                // the sandbox's env allowlist (HOME=/work,
419                // USER=nobody, + inherited). Nothing else to do here.
420                super::isolation::build_bwrap_command(bwrap_path, &policy, &raw_argv)
421            }
422        };
423
424        let mut child = spawn
425            .stdin(Stdio::piped())
426            .stdout(Stdio::piped())
427            .stderr(Stdio::piped())
428            .spawn()
429            .map_err(|e| ExecutionError::StageFailed {
430                stage_id: stage_id.clone(),
431                message: format!("failed to spawn process: {e}"),
432            })?;
433        let _ = raw_argv;
434
435        // Write stdin in a background thread so we don't deadlock when the
436        // child's stdin pipe fills before we start reading stdout.
437        if let Some(mut stdin) = child.stdin.take() {
438            let bytes = input_json.into_bytes();
439            std::thread::spawn(move || {
440                let _ = stdin.write_all(&bytes);
441            });
442        }
443
444        // Collect output with a wall-clock timeout.
445        let pid = child.id();
446        let timeout = Duration::from_secs(self.config.timeout_secs);
447        let (tx, rx) = mpsc::channel();
448        std::thread::spawn(move || {
449            let _ = tx.send(child.wait_with_output());
450        });
451
452        let out = match rx.recv_timeout(timeout) {
453            Ok(Ok(o)) => o,
454            Ok(Err(e)) => {
455                return Err(ExecutionError::StageFailed {
456                    stage_id: stage_id.clone(),
457                    message: format!("nix process error: {e}"),
458                });
459            }
460            Err(_elapsed) => {
461                // Best-effort kill — process may already have exited.
462                let _ = Command::new("kill").args(["-9", &pid.to_string()]).status();
463                return Err(ExecutionError::TimedOut {
464                    stage_id: stage_id.clone(),
465                    timeout_secs: self.config.timeout_secs,
466                });
467            }
468        };
469
470        // Truncate stderr to avoid huge allocations from noisy runtimes.
471        let stderr_raw = &out.stderr[..out.stderr.len().min(self.config.max_stderr_bytes)];
472        let stderr = String::from_utf8_lossy(stderr_raw);
473
474        if !out.status.success() {
475            return Err(ExecutionError::StageFailed {
476                stage_id: stage_id.clone(),
477                message: Self::classify_error(&stderr, out.status.code()),
478            });
479        }
480
481        // Truncate stdout to the configured limit.
482        let stdout_raw = &out.stdout[..out.stdout.len().min(self.config.max_output_bytes)];
483        let stdout = String::from_utf8_lossy(stdout_raw);
484
485        if stdout_raw.len() == self.config.max_output_bytes && !out.stdout.is_empty() {
486            return Err(ExecutionError::StageFailed {
487                stage_id: stage_id.clone(),
488                message: format!(
489                    "stage output exceeded {} bytes limit",
490                    self.config.max_output_bytes
491                ),
492            });
493        }
494
495        serde_json::from_str(stdout.trim()).map_err(|e| ExecutionError::StageFailed {
496            stage_id: stage_id.clone(),
497            message: format!("failed to parse stage output as JSON: {e} (got: {stdout:?})"),
498        })
499    }
500
501    /// Classify a non-zero exit into a human-readable message, distinguishing
502    /// Nix infrastructure errors from user code errors.
503    fn classify_error(stderr: &str, exit_code: Option<i32>) -> String {
504        // Nix daemon / networking errors.
505        if stderr.contains("cannot connect to nix daemon")
506            || stderr.contains("Cannot connect to the Nix daemon")
507        {
508            return "nix daemon is not running — start it with `sudo systemctl start nix-daemon` \
509                    or `nix daemon`"
510                .to_string();
511        }
512        if stderr.contains("error: flake") || stderr.contains("error: getting flake") {
513            return format!(
514                "nix flake error (check network / nixpkgs access): {}",
515                first_line(stderr)
516            );
517        }
518        if stderr.contains("error: downloading") || stderr.contains("error: fetching") {
519            return format!(
520                "nix failed to fetch runtime package (check network): {}",
521                first_line(stderr)
522            );
523        }
524        if stderr.contains("out of disk space") || stderr.contains("No space left on device") {
525            return "nix store out of disk space — run `nix-collect-garbage -d` to free space"
526                .to_string();
527        }
528        if stderr.contains("nix: command not found") || stderr.contains("No such file") {
529            return "nix binary not found — is Nix installed?".to_string();
530        }
531        // User code errors (exit 1 from the stage wrapper).
532        let code_str = exit_code
533            .map(|c| format!(" (exit {c})"))
534            .unwrap_or_default();
535        if stderr.trim().is_empty() {
536            format!("stage exited without output{code_str}")
537        } else {
538            format!("stage error{code_str}: {stderr}")
539        }
540    }
541
542    /// Build the nix subcommand + argument list for running a stage script.
543    ///
544    /// - Python with no third-party imports: `nix run nixpkgs#python3 -- script.py`
545    /// - Python with third-party imports:    `nix shell nixpkgs#python3Packages.X ... --command python3 script.py`
546    /// - JS/Bash: `nix run nixpkgs#<runtime> -- script`
547    fn build_nix_command(
548        &self,
549        language: &str,
550        script: &Path,
551        code: &str,
552    ) -> (String, Vec<String>) {
553        let script_path = script.to_str().unwrap_or("/dev/null").to_string();
554
555        match language {
556            "python" | "python3" | "" => {
557                // If the code has `# requires:` with pip packages, use a venv
558                // with system Python instead of Nix (Nix's python3Packages
559                // don't reliably work with `nix shell`).
560                if let Some(reqs) = Self::extract_pip_requirements(code) {
561                    let venv_hash = {
562                        use sha2::{Digest, Sha256};
563                        let h = Sha256::digest(reqs.as_bytes());
564                        hex::encode(&h[..8])
565                    };
566                    let venv_dir = self.cache_dir.join(format!("venv-{venv_hash}"));
567                    let venv_str = venv_dir.to_string_lossy().to_string();
568                    let python = venv_dir.join("bin").join("python3");
569                    let python_str = python.to_string_lossy().to_string();
570
571                    // Create venv + install deps if not cached
572                    if !python.exists() {
573                        let setup = std::process::Command::new("python3")
574                            .args(["-m", "venv", &venv_str])
575                            .output();
576                        if let Ok(out) = setup {
577                            if out.status.success() {
578                                let pip = venv_dir.join("bin").join("pip");
579                                let pkgs: Vec<&str> = reqs.split(", ").collect();
580                                let mut pip_args =
581                                    vec!["install", "--quiet", "--disable-pip-version-check"];
582                                pip_args.extend(pkgs);
583                                let _ = std::process::Command::new(pip.to_string_lossy().as_ref())
584                                    .args(&pip_args)
585                                    .output();
586                            }
587                        }
588                    }
589
590                    // Run with the venv Python directly (no nix)
591                    return ("__direct__".to_string(), vec![python_str, script_path]);
592                }
593
594                let extra_pkgs = Self::detect_python_packages(code);
595                if extra_pkgs.is_empty() {
596                    (
597                        "run".to_string(),
598                        vec!["nixpkgs#python3".into(), "--".into(), script_path],
599                    )
600                } else {
601                    let mut args: Vec<String> = extra_pkgs
602                        .iter()
603                        .map(|pkg| format!("nixpkgs#python3Packages.{pkg}"))
604                        .collect();
605                    args.extend_from_slice(&["--command".into(), "python3".into(), script_path]);
606                    ("shell".to_string(), args)
607                }
608            }
609            "javascript" | "js" => (
610                "run".to_string(),
611                vec!["nixpkgs#nodejs".into(), "--".into(), script_path],
612            ),
613            _ => (
614                "run".to_string(),
615                vec!["nixpkgs#bash".into(), "--".into(), script_path],
616            ),
617        }
618    }
619
620    /// Extract pip requirements from `# requires: pkg1==ver, pkg2==ver` comments.
621    ///
622    /// Each spec is validated to prevent typosquatting and shell-metacharacter
623    /// injection from LLM-authored stages. By default, every package must be
624    /// pinned to an exact version (`pkg==1.2.3`). Set
625    /// `NOETHER_ALLOW_UNPINNED_PIP=1` to lift the pinning requirement for
626    /// local development; the character-set validation always runs.
627    ///
628    /// Invalid specs are dropped with a warning; if no valid specs remain,
629    /// this returns `None` and the caller falls back to the default Nix
630    /// runtime (where the missing dependency will surface as an honest
631    /// runtime error instead of a silent pip-install of attacker-chosen
632    /// names).
633    fn extract_pip_requirements(code: &str) -> Option<String> {
634        for line in code.lines() {
635            let trimmed = line.trim();
636            if trimmed.starts_with("# requires:") {
637                let reqs = trimmed.strip_prefix("# requires:").unwrap().trim();
638                if reqs.is_empty() {
639                    continue;
640                }
641                let valid: Vec<String> = reqs
642                    .split(',')
643                    .map(|s| s.trim())
644                    .filter(|s| !s.is_empty())
645                    .filter(|s| match validate_pip_spec(s) {
646                        Ok(()) => true,
647                        Err(reason) => {
648                            eprintln!(
649                                "[noether] rejected `# requires:` entry {s:?} ({reason}); skipping"
650                            );
651                            false
652                        }
653                    })
654                    .map(|s| s.to_string())
655                    .collect();
656
657                if valid.is_empty() {
658                    eprintln!(
659                        "[noether] all `# requires:` entries rejected (raw={reqs:?}); falling back to default Nix runtime"
660                    );
661                    return None;
662                }
663                return Some(valid.join(", "));
664            }
665        }
666        None
667    }
668
669    /// Scan Python source for `import X` / `from X import` statements and return
670    /// the Nix package names for any recognised third-party libraries.
671    fn detect_python_packages(code: &str) -> Vec<&'static str> {
672        // Map of Python import name → nixpkgs python3Packages attribute name.
673        const KNOWN: &[(&str, &str)] = &[
674            ("requests", "requests"),
675            ("httpx", "httpx"),
676            ("aiohttp", "aiohttp"),
677            ("bs4", "beautifulsoup4"),
678            ("lxml", "lxml"),
679            ("pandas", "pandas"),
680            ("numpy", "numpy"),
681            ("scipy", "scipy"),
682            ("sklearn", "scikit-learn"),
683            ("PIL", "Pillow"),
684            ("cv2", "opencv4"),
685            ("yaml", "pyyaml"),
686            ("toml", "toml"),
687            ("dateutil", "python-dateutil"),
688            ("pytz", "pytz"),
689            ("boto3", "boto3"),
690            ("psycopg2", "psycopg2"),
691            ("pymongo", "pymongo"),
692            ("redis", "redis"),
693            ("celery", "celery"),
694            ("fastapi", "fastapi"),
695            ("pydantic", "pydantic"),
696            ("cryptography", "cryptography"),
697            ("jwt", "pyjwt"),
698            ("paramiko", "paramiko"),
699            ("dotenv", "python-dotenv"),
700            ("joblib", "joblib"),
701            ("torch", "pytorch"),
702            ("transformers", "transformers"),
703            ("datasets", "datasets"),
704            ("pyarrow", "pyarrow"),
705        ];
706
707        let mut found: Vec<&'static str> = Vec::new();
708        for (import_name, nix_name) in KNOWN {
709            let patterns = [
710                format!("import {import_name}"),
711                format!("import {import_name} "),
712                format!("from {import_name} "),
713                format!("from {import_name}."),
714            ];
715            if patterns.iter().any(|p| code.contains(p.as_str())) {
716                found.push(nix_name);
717            }
718        }
719        found
720    }
721
722    // ── Language wrappers ────────────────────────────────────────────────────
723
724    #[cfg(test)]
725    #[allow(dead_code)]
726    fn _expose_extract_future_imports(code: &str) -> (String, String) {
727        Self::extract_future_imports(code)
728    }
729
730    /// Pull every `from __future__ import ...` line out of `code` and return
731    /// `(joined_future_imports, code_without_them)`. The future imports are
732    /// returned with trailing newlines so the caller can embed them directly
733    /// at the top of the wrapper. Detection is line-based (no AST) — matches
734    /// any non-indented line starting with `from __future__ import`.
735    fn extract_future_imports(code: &str) -> (String, String) {
736        let mut hoisted = String::new();
737        let mut remaining = String::new();
738        for line in code.lines() {
739            let trimmed = line.trim_start();
740            if !line.starts_with(' ')
741                && !line.starts_with('\t')
742                && trimmed.starts_with("from __future__ import")
743            {
744                hoisted.push_str(line);
745                hoisted.push('\n');
746            } else {
747                remaining.push_str(line);
748                remaining.push('\n');
749            }
750        }
751        (hoisted, remaining)
752    }
753
754    fn wrap_python(user_code: &str) -> String {
755        // Skip pip install — dependencies are handled by the venv executor
756        // (build_nix_command creates a venv with pip packages pre-installed)
757        // or by Nix packages (for known imports like numpy, pandas, etc.).
758        let pip_install = String::new();
759
760        // Hoist any `from __future__ import ...` lines out of user code and
761        // emit them as the very first statements of the wrapper. Python
762        // requires `__future__` imports to be the first non-comment,
763        // non-docstring statement in a module — leaving them embedded in the
764        // user-code block (which is line ~17 of the wrapped file) raises
765        // `SyntaxError: from __future__ imports must occur at the
766        // beginning of the file`.
767        let (future_imports, user_code_clean) = Self::extract_future_imports(user_code);
768
769        format!(
770            r#"{future_imports}import sys, json as _json
771{pip_install}
772# ---- user implementation ----
773{user_code_clean}
774# ---- end implementation ----
775
776if __name__ == '__main__':
777    if 'execute' not in dir() or not callable(globals().get('execute')):
778        print(
779            "Noether stage error: implementation must define a top-level "
780            "function `def execute(input): ...` that takes the parsed input dict "
781            "and returns the output dict. Do not read from stdin or print to stdout — "
782            "the Noether runtime handles I/O for you.",
783            file=sys.stderr,
784        )
785        sys.exit(1)
786    try:
787        _raw = _json.loads(sys.stdin.read())
788        # If the runtime passed input as a JSON-encoded string, decode it once more.
789        # This happens when input arrives as null or a bare string from the CLI.
790        if isinstance(_raw, str):
791            try:
792                _raw = _json.loads(_raw)
793            except Exception:
794                pass
795        _output = execute(_raw if _raw is not None else {{}})
796        print(_json.dumps(_output))
797    except Exception as _e:
798        print(str(_e), file=sys.stderr)
799        sys.exit(1)
800"#
801        )
802    }
803
804    fn wrap_javascript(user_code: &str) -> String {
805        format!(
806            r#"const _readline = require('readline');
807let _input = '';
808process.stdin.on('data', d => _input += d);
809process.stdin.on('end', () => {{
810    try {{
811        // ---- user implementation ----
812        {user_code}
813        // ---- end implementation ----
814        const _result = execute(JSON.parse(_input));
815        process.stdout.write(JSON.stringify(_result) + '\n');
816    }} catch (e) {{
817        process.stderr.write(String(e) + '\n');
818        process.exit(1);
819    }}
820}});
821"#
822        )
823    }
824
825    fn wrap_bash(user_code: &str) -> String {
826        format!(
827            r#"#!/usr/bin/env bash
828set -euo pipefail
829INPUT=$(cat)
830
831# ---- user implementation ----
832{user_code}
833# ---- end implementation ----
834
835execute "$INPUT"
836"#
837        )
838    }
839}
840
841// ── Helpers ──────────────────────────────────────────────────────────────────
842
/// Return the first non-blank line of `s`, with surrounding whitespace trimmed.
///
/// Whitespace-only lines are skipped. If `s` has no non-blank line at all
/// (empty or whitespace-only input), the result is `""`. Previously the raw,
/// untrimmed and possibly multi-line input was echoed back in that case,
/// which contradicted the "trimmed single line" contract.
fn first_line(s: &str) -> &str {
    s.lines()
        .map(str::trim)
        .find(|l| !l.is_empty())
        .unwrap_or("")
}
850
/// Validate a single pip requirement spec from a `# requires:` comment.
///
/// Accepts `pkg==version`.
///
/// - **Package name:** only ASCII letters, digits, `_`, `-` and `.`, and it
///   must start and end with a letter or digit (the PEP 508 name rule). This
///   also guarantees the spec never begins with `-`, so it cannot be
///   mistaken for a pip command-line option such as `--index-url` or `-e`.
/// - **Version:** a straightforward PEP 440-ish literal (letters, digits,
///   `.`, `+`, `!`, `-`) that starts with a letter or digit.
/// - **Pinning:** the `==` requirement can be lifted for local dev by setting
///   `NOETHER_ALLOW_UNPINNED_PIP` to `1`, `true`, `yes` or `on`. The
///   character-set validation always runs, so injected shell
///   metacharacters, quotes, or URL-form specs are rejected regardless.
///
/// # Errors
/// Returns a static description of the first rule the spec violates.
fn validate_pip_spec(spec: &str) -> Result<(), &'static str> {
    // Split at the first `==`. If absent, require the opt-in flag.
    let (name, version) = match spec.split_once("==") {
        Some((n, v)) => (n.trim(), Some(v.trim())),
        None => {
            let allow_unpinned = matches!(
                std::env::var("NOETHER_ALLOW_UNPINNED_PIP").as_deref(),
                Ok("1" | "true" | "yes" | "on")
            );
            if !allow_unpinned {
                return Err("unpinned; use pkg==version or set NOETHER_ALLOW_UNPINNED_PIP=1");
            }
            (spec.trim(), None)
        }
    };

    if name.is_empty() {
        return Err("empty package name");
    }
    if !name
        .bytes()
        .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'_' | b'-' | b'.'))
    {
        return Err("package name contains disallowed characters");
    }
    // PEP 508: a name begins and ends with a letter or digit. A leading `-`
    // would otherwise let the spec smuggle a pip option onto the command line.
    if !name.starts_with(|c: char| c.is_ascii_alphanumeric())
        || !name.ends_with(|c: char| c.is_ascii_alphanumeric())
    {
        return Err("package name must start and end with a letter or digit");
    }

    if let Some(v) = version {
        if v.is_empty() {
            return Err("empty version after `==`");
        }
        if !v
            .bytes()
            .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'.' | b'+' | b'!' | b'-'))
        {
            return Err("version contains disallowed characters");
        }
        if !v.starts_with(|c: char| c.is_ascii_alphanumeric()) {
            return Err("version must start with a letter or digit");
        }
    }
    Ok(())
}
898
899// ── StageExecutor impl ────────────────────────────────────────────────────────
900
901impl StageExecutor for NixExecutor {
902    fn execute(&self, stage_id: &StageId, input: &Value) -> Result<Value, ExecutionError> {
903        let impl_ = self
904            .implementations
905            .get(&stage_id.0)
906            .ok_or_else(|| ExecutionError::StageNotFound(stage_id.clone()))?;
907
908        let code_hash = Self::code_hash(&impl_.code);
909        let script = self.ensure_script(&code_hash, &impl_.code, &impl_.language)?;
910        self.run_script(stage_id, &script, &impl_.language, input)
911    }
912}
913
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an executor for unit tests.
    ///
    /// It uses the discovered `nix` binary (or `/usr/bin/nix` as a
    /// placeholder), a scratch cache dir under the system temp dir, the
    /// default config, and no registered stages.
    fn make_executor() -> NixExecutor {
        let nix_bin = NixExecutor::find_nix().unwrap_or_else(|| PathBuf::from("/usr/bin/nix"));
        let cache_dir = std::env::temp_dir().join("noether-test-impl-cache");
        let _ = std::fs::create_dir_all(&cache_dir);
        NixExecutor {
            nix_bin,
            cache_dir,
            config: NixConfig::default(),
            implementations: HashMap::new(),
        }
    }

    #[test]
    fn register_with_effects_preserves_network_effect() {
        // Regression guard on the synthesized-stage effects path.
        // Pre-hardening, `register_synthesized` → `NixExecutor::register`
        // dropped the declared effects and stamped `EffectSet::pure()`
        // onto the stored `StageImpl`. A Network-effect stage ended
        // up with a no-network sandbox and failed with DNS errors at
        // runtime. The `register()` shim is gone; this test locks in
        // that `register_with_effects` is the only registration path
        // and that it threads the effects through verbatim.
        use noether_core::effects::{Effect, EffectSet};
        let mut exec = make_executor();
        let id = StageId("sig_network".into());
        let effects = EffectSet::new([Effect::Pure, Effect::Network]);
        exec.register_with_effects(&id, "code", "python", effects.clone());
        let stored = exec
            .implementations
            .get(&id.0)
            .expect("stage should be registered");
        assert_eq!(
            stored.effects, effects,
            "declared effects must survive register_with_effects"
        );
        assert!(
            stored.effects.iter().any(|e| matches!(e, Effect::Network)),
            "Network must be preserved so the sandbox opens the net ns"
        );
    }

    #[test]
    fn validate_pip_spec_accepts_pinned() {
        assert!(validate_pip_spec("pandas==2.0.0").is_ok());
        assert!(validate_pip_spec("scikit-learn==1.5.1").is_ok());
        assert!(validate_pip_spec("urllib3==2.2.3").is_ok());
        assert!(validate_pip_spec("pydantic==2.5.0+cu121").is_ok());
    }

    #[test]
    fn validate_pip_spec_rejects_unpinned_by_default() {
        // Do not clear the opt-in flag with `remove_var`/`set_var`. The test
        // harness runs tests on parallel threads, and mutating the process
        // environment while other threads may read it is unsound on
        // non-Windows targets. When the developer has the flag enabled, the
        // default-reject path is not in effect for this process, so skip.
        let opted_in = matches!(
            std::env::var("NOETHER_ALLOW_UNPINNED_PIP").as_deref(),
            Ok("1" | "true" | "yes" | "on")
        );
        if opted_in {
            eprintln!("NOETHER_ALLOW_UNPINNED_PIP is enabled; skipping default-rejection check");
            return;
        }
        assert!(
            validate_pip_spec("pandas").is_err(),
            "bare name must be rejected without opt-in"
        );
    }

    #[test]
    fn validate_pip_spec_rejects_shell_metacharacters() {
        for bad in [
            "pandas; rm -rf /",
            "pandas==$(whoami)",
            "pandas==1.0.0; echo pwned",
            "pandas==`id`",
            "https://evil.example/wheel.whl",
            "git+https://example.com/repo.git",
            "pkg with space==1.0",
            "pkg==1.0 && echo",
        ] {
            assert!(validate_pip_spec(bad).is_err(), "should reject {bad:?}");
        }
    }

    #[test]
    fn validate_pip_spec_rejects_empty() {
        assert!(validate_pip_spec("==1.0").is_err());
        assert!(validate_pip_spec("pkg==").is_err());
    }

    #[test]
    fn detect_python_packages_requests() {
        let code = "import requests\ndef execute(v):\n    return requests.get(v).json()";
        let pkgs = NixExecutor::detect_python_packages(code);
        assert!(
            pkgs.contains(&"requests"),
            "expected 'requests' in {pkgs:?}"
        );
    }

    #[test]
    fn detect_python_packages_stdlib_only() {
        let code = "import urllib.request, json\ndef execute(v):\n    return json.loads(v)";
        let pkgs = NixExecutor::detect_python_packages(code);
        assert!(
            pkgs.is_empty(),
            "stdlib imports should not trigger packages: {pkgs:?}"
        );
    }

    #[test]
    fn detect_python_packages_multiple() {
        let code = "import pandas\nimport numpy as np\nfrom bs4 import BeautifulSoup\ndef execute(v): pass";
        let pkgs = NixExecutor::detect_python_packages(code);
        assert!(pkgs.contains(&"pandas"));
        assert!(pkgs.contains(&"numpy"));
        assert!(pkgs.contains(&"beautifulsoup4"));
    }

    fn test_executor() -> NixExecutor {
        NixExecutor {
            nix_bin: PathBuf::from("/usr/bin/nix"),
            cache_dir: PathBuf::from("/tmp/noether-test-cache"),
            config: NixConfig::default(),
            implementations: HashMap::new(),
        }
    }

    #[test]
    fn build_nix_command_no_packages() {
        let exec = test_executor();
        let (sub, args) = exec.build_nix_command("python", Path::new("/tmp/x.py"), "import json");
        assert_eq!(sub, "run");
        assert!(args.iter().any(|a| a.contains("python3")));
        assert!(!args.iter().any(|a| a.contains("shell")));
    }

    #[test]
    fn build_nix_command_with_requests() {
        let exec = test_executor();
        let (sub, args) =
            exec.build_nix_command("python", Path::new("/tmp/x.py"), "import requests");
        assert_eq!(sub, "shell");
        assert!(args.iter().any(|a| a.contains("python3Packages.requests")));
        assert!(args.iter().any(|a| a == "--command"));
        // Must NOT include bare nixpkgs#python3 — it conflicts with python3Packages.*
        assert!(
            !args.iter().any(|a| a == "nixpkgs#python3"),
            "bare python3 conflicts: {args:?}"
        );
    }

    #[test]
    fn python_wrapper_contains_boilerplate() {
        let wrapped = NixExecutor::wrap_python("def execute(x):\n    return x + 1");
        assert!(wrapped.contains("sys.stdin.read()"));
        assert!(wrapped.contains("_json.dumps(_output)"));
        assert!(wrapped.contains("def execute(x)"));
    }

    #[test]
    fn code_hash_is_stable() {
        let h1 = NixExecutor::code_hash("hello world");
        let h2 = NixExecutor::code_hash("hello world");
        let h3 = NixExecutor::code_hash("different");
        assert_eq!(h1, h2);
        assert_ne!(h1, h3);
    }

    #[test]
    fn classify_error_daemon_not_running() {
        let msg = NixExecutor::classify_error("error: cannot connect to nix daemon", Some(1));
        assert!(msg.contains("nix daemon is not running"), "got: {msg}");
    }

    #[test]
    fn future_imports_are_hoisted_out_of_user_code() {
        let user = "from __future__ import annotations\nimport json\n\ndef execute(input):\n    return input\n";
        let wrapped = NixExecutor::wrap_python(user);
        // The future import must come BEFORE `import sys, json as _json`.
        let future_pos = wrapped
            .find("from __future__ import annotations")
            .expect("future import should be present in wrapper");
        let stdlib_pos = wrapped
            .find("import sys, json as _json")
            .expect("stdlib imports should be present");
        assert!(
            future_pos < stdlib_pos,
            "future import must precede stdlib imports in wrapped output"
        );
    }

    #[test]
    fn user_code_without_future_imports_is_unchanged() {
        let user = "import json\n\ndef execute(input):\n    return input\n";
        let (hoisted, remaining) = NixExecutor::extract_future_imports(user);
        assert_eq!(hoisted, "");
        assert_eq!(remaining.trim(), user.trim());
    }

    #[test]
    fn nested_future_import_inside_function_is_not_hoisted() {
        // Indented "from __future__" lines (inside a function) are not
        // valid Python anyway, but the hoister must not promote them.
        let user =
            "def execute(input):\n    from __future__ import annotations\n    return input\n";
        let (hoisted, _) = NixExecutor::extract_future_imports(user);
        assert_eq!(hoisted, "");
    }

    #[test]
    fn classify_error_user_code_exit1() {
        let msg = NixExecutor::classify_error("ValueError: invalid input", Some(1));
        assert!(msg.contains("ValueError"), "got: {msg}");
        assert!(msg.contains("exit 1"), "got: {msg}");
    }

    #[test]
    fn classify_error_disk_full() {
        let msg = NixExecutor::classify_error("No space left on device", Some(1));
        assert!(msg.contains("disk space"), "got: {msg}");
    }

    #[test]
    fn classify_error_empty_stderr() {
        let msg = NixExecutor::classify_error("", Some(137));
        assert!(msg.contains("exit 137"), "got: {msg}");
    }

    #[test]
    fn nix_config_defaults() {
        let cfg = NixConfig::default();
        assert_eq!(cfg.timeout_secs, 30);
        assert_eq!(cfg.max_output_bytes, 10 * 1024 * 1024);
        assert_eq!(cfg.max_stderr_bytes, 64 * 1024);
    }

    #[test]
    fn first_line_extracts_correctly() {
        assert_eq!(first_line("  \nfoo\nbar"), "foo");
        assert_eq!(first_line("single"), "single");
        assert_eq!(first_line(""), "");
    }

    /// Integration test — runs when nix is available (skips gracefully if not).
    /// Requires a warm Nix binary cache; run with `cargo test -- --ignored` to include.
    #[test]
    #[ignore = "requires nix + warm binary cache; run manually with `cargo test -- --ignored`"]
    fn nix_python_identity_stage() {
        let nix_bin = match NixExecutor::find_nix() {
            Some(p) => p,
            None => {
                eprintln!("nix not found, skipping");
                return;
            }
        };

        let cache_dir = std::env::temp_dir().join("noether-nix-integ");
        let _ = std::fs::create_dir_all(&cache_dir);

        let code = "def execute(x):\n    return x";
        let executor = NixExecutor {
            nix_bin,
            cache_dir,
            config: NixConfig::default(),
            implementations: {
                let mut m = HashMap::new();
                let id = StageId("test_identity".into());
                m.insert(
                    id.0.clone(),
                    StageImpl {
                        code: code.into(),
                        language: "python".into(),
                        effects: noether_core::effects::EffectSet::pure(),
                    },
                );
                m
            },
        };

        let id = StageId("test_identity".into());
        let result = executor.execute(&id, &serde_json::json!({"hello": "world"}));
        assert_eq!(result.unwrap(), serde_json::json!({"hello": "world"}));
    }

    /// Verify that a stage that hangs returns TimedOut, not a hang.
    /// Requires nix + warm binary cache; run with `cargo test -- --ignored`.
    #[test]
    #[ignore = "requires nix + warm binary cache; run manually with `cargo test -- --ignored`"]
    fn nix_timeout_kills_hanging_stage() {
        let nix_bin = match NixExecutor::find_nix() {
            Some(p) => p,
            None => {
                eprintln!("nix not found, skipping timeout test");
                return;
            }
        };

        let cache_dir = std::env::temp_dir().join("noether-nix-timeout");
        let _ = std::fs::create_dir_all(&cache_dir);

        let code = "import time\ndef execute(x):\n    time.sleep(9999)\n    return x";
        let executor = NixExecutor {
            nix_bin,
            cache_dir,
            config: NixConfig {
                timeout_secs: 2,
                ..NixConfig::default()
            },
            implementations: {
                let mut m = HashMap::new();
                let id = StageId("hanging".into());
                m.insert(
                    id.0.clone(),
                    StageImpl {
                        code: code.into(),
                        language: "python".into(),
                        effects: noether_core::effects::EffectSet::pure(),
                    },
                );
                m
            },
        };

        let id = StageId("hanging".into());
        let result = executor.execute(&id, &serde_json::json!(null));
        assert!(
            matches!(
                result,
                Err(ExecutionError::TimedOut {
                    timeout_secs: 2,
                    ..
                })
            ),
            "expected TimedOut, got: {result:?}"
        );
    }
}