Skip to main content

noether_engine/executor/
nix.rs

1#![warn(clippy::unwrap_used)]
2#![cfg_attr(test, allow(clippy::unwrap_used))]
3
4//! Nix-based executor for synthesized stages.
5//!
6//! Runs stage implementations as subprocesses using `nix run nixpkgs#<runtime>`,
7//! giving a reproducible, Nix-pinned runtime for Python/JavaScript/Bash code
8//! without requiring any ambient language runtime on the host.
9//!
10//! **This is a reproducibility boundary, not an isolation boundary.** Stages
11//! run with the privileges of the host user — they can read/write the
12//! filesystem, make arbitrary network calls, and read environment variables.
13//! Do not execute untrusted stages on a host with credentials you are not
14//! willing to risk. See SECURITY.md for the full trust model.
15//!
16//! ## Execution protocol
17//!
//! - stdin  → JSON-encoded input value
//! - stdout → JSON-encoded output value (surrounding whitespace is trimmed before parsing)
//! - stderr → diagnostic text; it is reported in the error only when the exit status is non-zero
21//! - exit 0 → success; exit non-zero → `ExecutionError::StageFailed`
22//!
23//! ## Timeout
24//!
25//! Every execution is bounded by [`NixConfig::timeout_secs`] (default 30 s).
26//! When the child process exceeds the limit it is sent SIGKILL and the call
27//! returns [`ExecutionError::TimedOut`].
28//!
29//! ## Generated wrapper (Python example)
30//!
31//! ```python
32//! import sys, json as _json
33//!
34//! # ---- user code ----
35//! def execute(input_value):
36//!     ...
37//! # -------------------
38//!
39//! if __name__ == '__main__':
40//!     try:
41//!         _output = execute(_json.loads(sys.stdin.read()))
42//!         print(_json.dumps(_output))
43//!     except Exception as e:
44//!         print(str(e), file=sys.stderr)
45//!         sys.exit(1)
46//! ```
47
48use super::{ExecutionError, StageExecutor};
49use noether_core::stage::StageId;
50use serde_json::Value;
51use sha2::{Digest, Sha256};
52use std::collections::HashMap;
53use std::io::Write as IoWrite;
54use std::path::{Path, PathBuf};
55use std::process::{Command, Stdio};
56use std::sync::mpsc;
57use std::time::Duration;
58
59// ── Configuration ────────────────────────────────────────────────────────────
60
/// Tunable knobs for the [`NixExecutor`].
///
/// Start from [`NixConfig::default`] and override individual fields, or chain
/// [`NixConfig::with_isolation`] to pick a sandbox backend.
#[derive(Debug, Clone)]
pub struct NixConfig {
    /// Wall-clock timeout for a single stage execution in seconds.
    /// The child process is killed with SIGKILL when exceeded.
    /// Default: 30 s.
    pub timeout_secs: u64,
    /// Maximum number of stdout bytes accepted from a stage; a run whose
    /// output is larger fails instead of being parsed. The limit is checked
    /// after the child's output has been collected, so it bounds what gets
    /// parsed rather than what gets buffered.
    /// Default: 10 MiB.
    pub max_output_bytes: usize,
    /// Maximum number of bytes captured from stderr (for error messages).
    /// Longer stderr is truncated to this length before being reported.
    /// Default: 64 KiB.
    pub max_stderr_bytes: usize,
    /// Isolation backend to wrap each stage subprocess in. When set
    /// to [`super::isolation::IsolationBackend::None`] (the default
    /// for back-compat), stages run with full host-user privileges
    /// — see SECURITY.md. Set via
    /// [`NixConfig::with_isolation`] or the CLI `--isolate` flag.
    pub isolation: super::isolation::IsolationBackend,
}
82
83impl Default for NixConfig {
84    fn default() -> Self {
85        Self {
86            timeout_secs: 30,
87            max_output_bytes: 10 * 1024 * 1024,
88            max_stderr_bytes: 64 * 1024,
89            isolation: super::isolation::IsolationBackend::None,
90        }
91    }
92}
93
94impl NixConfig {
95    /// Set the isolation backend. Returns `self` for chaining.
96    pub fn with_isolation(mut self, backend: super::isolation::IsolationBackend) -> Self {
97        self.isolation = backend;
98        self
99    }
100}
101
102// ── Internal stage storage ───────────────────────────────────────────────────
103
/// Maps stage IDs to their implementation (source code + language tag +
/// declared effects — needed so the isolation layer can derive a
/// policy).
#[derive(Clone)]
struct StageImpl {
    /// Raw user source for the stage, before any language wrapper is applied.
    code: String,
    /// Language tag as stored with the stage (e.g. `"python"`, `"js"`, `"bash"`).
    language: String,
    /// Effects declared in the stage signature; fed to the isolation policy.
    effects: noether_core::effects::EffectSet,
}
113
114// ── NixExecutor ──────────────────────────────────────────────────────────────
115
/// Executor that runs synthesized stages through Nix-managed language runtimes.
///
/// When `nix` is available, each stage is executed as a subprocess with a
/// Nix-pinned runtime (e.g. `nix run nixpkgs#python3 -- stage.py`). The Nix
/// binary cache ensures the runtime is downloaded once and then reused from
/// the store. **This gives reproducibility, not isolation**: the subprocess
/// inherits the host user's privileges, filesystem, and network. See module
/// docs and SECURITY.md for the full trust model.
///
/// ## Resource limits
///
/// - **Timeout**: configured via [`NixConfig::timeout_secs`] (default 30 s).
///   The child is sent SIGKILL when the limit is exceeded.
/// - **Output cap**: configured via [`NixConfig::max_output_bytes`] (default 10 MiB).
pub struct NixExecutor {
    /// Path of the `nix` binary located by [`NixExecutor::find_nix`].
    nix_bin: PathBuf,
    /// Directory holding the generated wrapper scripts and pip venvs.
    cache_dir: PathBuf,
    /// Timeout, output caps and isolation backend applied to every run.
    config: NixConfig,
    /// Registered stage implementations, keyed by the stage ID string.
    implementations: HashMap<String, StageImpl>,
}
136
137impl NixExecutor {
138    /// Probe the system for a usable `nix` binary.
139    /// Returns the path if found, or `None` if Nix is not installed.
140    pub fn find_nix() -> Option<PathBuf> {
141        // Determinate Systems installer puts nix here:
142        let determinate = PathBuf::from("/nix/var/nix/profiles/default/bin/nix");
143        if determinate.exists() {
144            return Some(determinate);
145        }
146
147        // Walk $PATH directly rather than spawning `which`. Avoids a
148        // subprocess + the risk that `which` is missing or shadowed on
149        // minimal systems (e.g. some container base images).
150        let path_env = std::env::var_os("PATH")?;
151        for dir in std::env::split_paths(&path_env) {
152            let candidate = dir.join("nix");
153            if candidate.is_file() {
154                return Some(candidate);
155            }
156        }
157        None
158    }
159
160    /// Build an executor that can run synthesized stages found in `store`.
161    ///
162    /// Returns `None` when `nix` is not available — callers should fall back to
163    /// `InlineExecutor` exclusively in that case.
164    pub fn from_store(store: &dyn noether_store::StageStore) -> Option<Self> {
165        Self::from_store_with_config(store, NixConfig::default())
166    }
167
168    /// Like [`from_store`] but with a custom [`NixConfig`].
169    pub fn from_store_with_config(
170        store: &dyn noether_store::StageStore,
171        config: NixConfig,
172    ) -> Option<Self> {
173        let nix_bin = Self::find_nix()?;
174
175        let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".into());
176        let cache_dir = PathBuf::from(home).join(".noether").join("impl_cache");
177        let _ = std::fs::create_dir_all(&cache_dir);
178
179        let mut implementations = HashMap::new();
180        for stage in store.list(None) {
181            if let (Some(code), Some(lang)) =
182                (&stage.implementation_code, &stage.implementation_language)
183            {
184                implementations.insert(
185                    stage.id.0.clone(),
186                    StageImpl {
187                        code: code.clone(),
188                        language: lang.clone(),
189                        effects: stage.signature.effects.clone(),
190                    },
191                );
192            }
193        }
194
195        Some(Self {
196            nix_bin,
197            cache_dir,
198            config,
199            implementations,
200        })
201    }
202
203    /// Clone the current config (minus the implementations map) for
204    /// callers that want to rebuild with different knobs.
205    pub fn config_snapshot(&self) -> NixConfig {
206        self.config.clone()
207    }
208
209    /// Rebuild a NixExecutor with a replacement config, preserving
210    /// its registered implementations. Returns `Some(..)` or `None`
211    /// when reconstruction fails — today it can't fail, but the
212    /// Option keeps the API forward-compatible.
213    pub fn rebuild_with_config(mut self, config: NixConfig) -> Option<Self> {
214        self.config = config;
215        Some(self)
216    }
217
218    /// Register a stage with explicit declared effects. Used by tests
219    /// and by callers that want to drive the isolation policy without
220    /// going through the full StageStore.
221    pub fn register_with_effects(
222        &mut self,
223        stage_id: &StageId,
224        code: &str,
225        language: &str,
226        effects: noether_core::effects::EffectSet,
227    ) {
228        self.implementations.insert(
229            stage_id.0.clone(),
230            StageImpl {
231                code: code.into(),
232                language: language.into(),
233                effects,
234            },
235        );
236    }
237
238    /// True when we have a real implementation for this stage.
239    pub fn has_implementation(&self, stage_id: &StageId) -> bool {
240        self.implementations.contains_key(&stage_id.0)
241    }
242
243    /// Pre-fetch the Python 3 runtime into the Nix store in a background thread.
244    ///
245    /// The first time any Python stage runs, Nix may take several seconds to
246    /// download and verify the runtime closure.  Calling `warmup()` at startup
247    /// overlaps that latency with application boot time.
248    ///
249    /// The returned `JoinHandle` can be ignored — any error is logged to stderr
250    /// but does not affect correctness; the runtime will still be fetched on first
251    /// actual use.
252    pub fn warmup(&self) -> std::thread::JoinHandle<()> {
253        let nix_bin = self.nix_bin.clone();
254        std::thread::spawn(move || {
255            // `nix build` with `--dry-run` is enough to populate the binary cache
256            // without running any user code.
257            let status = Command::new(&nix_bin)
258                .args([
259                    "build",
260                    "--no-link",
261                    "--quiet",
262                    "--no-write-lock-file",
263                    "nixpkgs#python3",
264                ])
265                .stdout(Stdio::null())
266                .stderr(Stdio::null())
267                .status();
268            match status {
269                Ok(s) if s.success() => {
270                    eprintln!("[noether] nix warmup: python3 runtime cached");
271                }
272                Ok(s) => {
273                    eprintln!("[noether] nix warmup: exited with {s} (non-fatal)");
274                }
275                Err(e) => {
276                    eprintln!("[noether] nix warmup: failed to spawn ({e}) (non-fatal)");
277                }
278            }
279        })
280    }
281
282    // ── Internal helpers ─────────────────────────────────────────────────────
283
284    /// Hash the code string to get a stable cache key.
285    fn code_hash(code: &str) -> String {
286        hex::encode(Sha256::digest(code.as_bytes()))
287    }
288
289    /// Ensure the wrapped script for `impl_hash` exists on disk.
290    /// Returns the path to the file.
291    fn ensure_script(
292        &self,
293        impl_hash: &str,
294        code: &str,
295        language: &str,
296    ) -> Result<PathBuf, ExecutionError> {
297        let ext = match language {
298            "javascript" | "js" => "js",
299            "bash" | "sh" => "sh",
300            _ => "py",
301        };
302
303        let path = self.cache_dir.join(format!("{impl_hash}.{ext}"));
304        if path.exists() {
305            return Ok(path);
306        }
307
308        let wrapped = match language {
309            "javascript" | "js" => Self::wrap_javascript(code),
310            "bash" | "sh" => Self::wrap_bash(code),
311            _ => Self::wrap_python(code),
312        };
313
314        std::fs::write(&path, &wrapped).map_err(|e| ExecutionError::StageFailed {
315            stage_id: StageId(impl_hash.into()),
316            message: format!("failed to write stage script: {e}"),
317        })?;
318
319        Ok(path)
320    }
321
322    /// Run the stage script via Nix with JSON on stdin, enforcing timeout and
323    /// output-size limits.
324    fn run_script(
325        &self,
326        stage_id: &StageId,
327        script: &Path,
328        language: &str,
329        input: &Value,
330    ) -> Result<Value, ExecutionError> {
331        let input_json = serde_json::to_string(input).unwrap_or_default();
332
333        let code = self
334            .implementations
335            .get(&stage_id.0)
336            .map(|i| i.code.as_str())
337            .unwrap_or("");
338
339        let (nix_subcommand, args) = self.build_nix_command(language, script, code);
340
341        // Build the full argv — either raw (no isolation) or wrapped in
342        // `bwrap` when an isolation backend is configured. The wrapped
343        // command spawns bwrap which execs the inner command inside a
344        // fresh sandbox.
345        let raw_argv: Vec<String> = if nix_subcommand == "__direct__" {
346            args.clone()
347        } else {
348            let mut v = vec![self.nix_bin.display().to_string(), nix_subcommand.clone()];
349            v.push("--no-write-lock-file".into());
350            v.push("--quiet".into());
351            v.extend(args.iter().cloned());
352            v
353        };
354
355        let mut spawn = match &self.config.isolation {
356            super::isolation::IsolationBackend::None => {
357                // No sandbox — legacy behaviour.
358                let mut cmd = Command::new(&raw_argv[0]);
359                cmd.args(&raw_argv[1..]);
360                cmd
361            }
362            super::isolation::IsolationBackend::Bwrap { bwrap_path } => {
363                // /work is a sandbox-private tmpfs (set by
364                // `IsolationPolicy::from_effects` default) — no host-side
365                // tmpdir to manage, no cleanup, no race.
366                let mut policy = super::isolation::IsolationPolicy::from_effects(
367                    self.implementations
368                        .get(&stage_id.0)
369                        .map(|i| &i.effects)
370                        .unwrap_or(&noether_core::effects::EffectSet::pure()),
371                );
372                // Expose the stage-script cache (where this invocation's
373                // wrapped `.py` / `.sh` / `.js` file lives). Scoped to
374                // `cache_dir` so the sandbox sees noether's own
375                // workspace and nothing else from the host user's home.
376                policy.ro_binds.push(noether_isolation::RoBind::new(
377                    self.cache_dir.to_path_buf(),
378                    self.cache_dir.to_path_buf(),
379                ));
380                // Nix binary visibility inside the sandbox has three cases:
381                //
382                // 1. `nix_bin` is under `/nix/store` — covered by the
383                //    default `/nix/store` bind. Nothing to add.
384                // 2. `nix_bin` is under `cache_dir` — covered by the
385                //    `cache_dir` bind above. Nothing to add.
386                // 3. `nix_bin` is a distro-packaged install (e.g.
387                //    `/usr/bin/nix`, `/usr/local/bin/nix`). The
388                //    binary is dynamically linked against glibc,
389                //    libcrypto, and readline living in `/usr/lib*`.
390                //    Binding just the nix executable file would let
391                //    the sandbox exec it but immediately fail
392                //    resolving `ld-linux-x86-64.so.2` — the kernel
393                //    can't find the dynamic loader.
394                //
395                //    Widening the bind set to include `/usr/lib*`
396                //    re-exposes the full suid-binary surface the
397                //    hardening closed. Instead: refuse to run, with
398                //    a clear message pointing the operator at the
399                //    Nix-native install path. The trust model here
400                //    is "nix belongs to the same reproducibility
401                //    boundary as the stages it dispatches;" a
402                //    distro-packaged nix violates that boundary
403                //    anyway.
404                if !self.nix_bin.starts_with("/nix/store")
405                    && !self.nix_bin.starts_with(&self.cache_dir)
406                {
407                    return Err(ExecutionError::StageFailed {
408                        stage_id: stage_id.clone(),
409                        message: format!(
410                            "stage isolation is enabled but nix is installed at \
411                             {} (outside /nix/store). A distro-packaged nix is \
412                             dynamically linked against host libraries; binding \
413                             those into the sandbox would defeat isolation. \
414                             Install nix via the Determinate / upstream \
415                             installer (places nix under /nix/store) or pass \
416                             --isolate=none to run without the sandbox.",
417                            self.nix_bin.display()
418                        ),
419                    });
420                }
421                // `build_bwrap_command` emits `--setenv` args for
422                // the sandbox's env allowlist (HOME=/work,
423                // USER=nobody, + inherited). Nothing else to do here.
424                super::isolation::build_bwrap_command(bwrap_path, &policy, &raw_argv)
425            }
426        };
427
428        let mut child = spawn
429            .stdin(Stdio::piped())
430            .stdout(Stdio::piped())
431            .stderr(Stdio::piped())
432            .spawn()
433            .map_err(|e| ExecutionError::StageFailed {
434                stage_id: stage_id.clone(),
435                message: format!("failed to spawn process: {e}"),
436            })?;
437        let _ = raw_argv;
438
439        // Write stdin in a background thread so we don't deadlock when the
440        // child's stdin pipe fills before we start reading stdout.
441        if let Some(mut stdin) = child.stdin.take() {
442            let bytes = input_json.into_bytes();
443            std::thread::spawn(move || {
444                let _ = stdin.write_all(&bytes);
445            });
446        }
447
448        // Collect output with a wall-clock timeout.
449        let pid = child.id();
450        let timeout = Duration::from_secs(self.config.timeout_secs);
451        let (tx, rx) = mpsc::channel();
452        std::thread::spawn(move || {
453            let _ = tx.send(child.wait_with_output());
454        });
455
456        let out = match rx.recv_timeout(timeout) {
457            Ok(Ok(o)) => o,
458            Ok(Err(e)) => {
459                return Err(ExecutionError::StageFailed {
460                    stage_id: stage_id.clone(),
461                    message: format!("nix process error: {e}"),
462                });
463            }
464            Err(_elapsed) => {
465                // Best-effort kill — process may already have exited.
466                let _ = Command::new("kill").args(["-9", &pid.to_string()]).status();
467                return Err(ExecutionError::TimedOut {
468                    stage_id: stage_id.clone(),
469                    timeout_secs: self.config.timeout_secs,
470                });
471            }
472        };
473
474        // Truncate stderr to avoid huge allocations from noisy runtimes.
475        let stderr_raw = &out.stderr[..out.stderr.len().min(self.config.max_stderr_bytes)];
476        let stderr = String::from_utf8_lossy(stderr_raw);
477
478        if !out.status.success() {
479            return Err(ExecutionError::StageFailed {
480                stage_id: stage_id.clone(),
481                message: Self::classify_error(&stderr, out.status.code()),
482            });
483        }
484
485        // Truncate stdout to the configured limit.
486        let stdout_raw = &out.stdout[..out.stdout.len().min(self.config.max_output_bytes)];
487        let stdout = String::from_utf8_lossy(stdout_raw);
488
489        if stdout_raw.len() == self.config.max_output_bytes && !out.stdout.is_empty() {
490            return Err(ExecutionError::StageFailed {
491                stage_id: stage_id.clone(),
492                message: format!(
493                    "stage output exceeded {} bytes limit",
494                    self.config.max_output_bytes
495                ),
496            });
497        }
498
499        serde_json::from_str(stdout.trim()).map_err(|e| ExecutionError::StageFailed {
500            stage_id: stage_id.clone(),
501            message: format!("failed to parse stage output as JSON: {e} (got: {stdout:?})"),
502        })
503    }
504
505    /// Classify a non-zero exit into a human-readable message, distinguishing
506    /// Nix infrastructure errors from user code errors.
507    fn classify_error(stderr: &str, exit_code: Option<i32>) -> String {
508        // Nix daemon / networking errors.
509        if stderr.contains("cannot connect to nix daemon")
510            || stderr.contains("Cannot connect to the Nix daemon")
511        {
512            return "nix daemon is not running — start it with `sudo systemctl start nix-daemon` \
513                    or `nix daemon`"
514                .to_string();
515        }
516        if stderr.contains("error: flake") || stderr.contains("error: getting flake") {
517            return format!(
518                "nix flake error (check network / nixpkgs access): {}",
519                first_line(stderr)
520            );
521        }
522        if stderr.contains("error: downloading") || stderr.contains("error: fetching") {
523            return format!(
524                "nix failed to fetch runtime package (check network): {}",
525                first_line(stderr)
526            );
527        }
528        if stderr.contains("out of disk space") || stderr.contains("No space left on device") {
529            return "nix store out of disk space — run `nix-collect-garbage -d` to free space"
530                .to_string();
531        }
532        if stderr.contains("nix: command not found") || stderr.contains("No such file") {
533            return "nix binary not found — is Nix installed?".to_string();
534        }
535        // User code errors (exit 1 from the stage wrapper).
536        let code_str = exit_code
537            .map(|c| format!(" (exit {c})"))
538            .unwrap_or_default();
539        if stderr.trim().is_empty() {
540            format!("stage exited without output{code_str}")
541        } else {
542            format!("stage error{code_str}: {stderr}")
543        }
544    }
545
546    /// Build the nix subcommand + argument list for running a stage script.
547    ///
548    /// - Python with no third-party imports: `nix run nixpkgs#python3 -- script.py`
549    /// - Python with third-party imports:    `nix shell nixpkgs#python3Packages.X ... --command python3 script.py`
550    /// - JS/Bash: `nix run nixpkgs#<runtime> -- script`
551    fn build_nix_command(
552        &self,
553        language: &str,
554        script: &Path,
555        code: &str,
556    ) -> (String, Vec<String>) {
557        let script_path = script.to_str().unwrap_or("/dev/null").to_string();
558
559        match language {
560            "python" | "python3" | "" => {
561                // If the code has `# requires:` with pip packages, use a venv
562                // with system Python instead of Nix (Nix's python3Packages
563                // don't reliably work with `nix shell`).
564                if let Some(reqs) = Self::extract_pip_requirements(code) {
565                    let venv_hash = {
566                        use sha2::{Digest, Sha256};
567                        let h = Sha256::digest(reqs.as_bytes());
568                        hex::encode(&h[..8])
569                    };
570                    let venv_dir = self.cache_dir.join(format!("venv-{venv_hash}"));
571                    let venv_str = venv_dir.to_string_lossy().to_string();
572                    let python = venv_dir.join("bin").join("python3");
573                    let python_str = python.to_string_lossy().to_string();
574
575                    // Create venv + install deps if not cached
576                    if !python.exists() {
577                        let setup = std::process::Command::new("python3")
578                            .args(["-m", "venv", &venv_str])
579                            .output();
580                        if let Ok(out) = setup {
581                            if out.status.success() {
582                                let pip = venv_dir.join("bin").join("pip");
583                                let pkgs: Vec<&str> = reqs.split(", ").collect();
584                                let mut pip_args =
585                                    vec!["install", "--quiet", "--disable-pip-version-check"];
586                                pip_args.extend(pkgs);
587                                let _ = std::process::Command::new(pip.to_string_lossy().as_ref())
588                                    .args(&pip_args)
589                                    .output();
590                            }
591                        }
592                    }
593
594                    // Run with the venv Python directly (no nix)
595                    return ("__direct__".to_string(), vec![python_str, script_path]);
596                }
597
598                let extra_pkgs = Self::detect_python_packages(code);
599                if extra_pkgs.is_empty() {
600                    (
601                        "run".to_string(),
602                        vec!["nixpkgs#python3".into(), "--".into(), script_path],
603                    )
604                } else {
605                    let mut args: Vec<String> = extra_pkgs
606                        .iter()
607                        .map(|pkg| format!("nixpkgs#python3Packages.{pkg}"))
608                        .collect();
609                    args.extend_from_slice(&["--command".into(), "python3".into(), script_path]);
610                    ("shell".to_string(), args)
611                }
612            }
613            "javascript" | "js" => (
614                "run".to_string(),
615                vec!["nixpkgs#nodejs".into(), "--".into(), script_path],
616            ),
617            _ => (
618                "run".to_string(),
619                vec!["nixpkgs#bash".into(), "--".into(), script_path],
620            ),
621        }
622    }
623
624    /// Extract pip requirements from `# requires: pkg1==ver, pkg2==ver` comments.
625    ///
626    /// Each spec is validated to prevent typosquatting and shell-metacharacter
627    /// injection from LLM-authored stages. By default, every package must be
628    /// pinned to an exact version (`pkg==1.2.3`). Set
629    /// `NOETHER_ALLOW_UNPINNED_PIP=1` to lift the pinning requirement for
630    /// local development; the character-set validation always runs.
631    ///
632    /// Invalid specs are dropped with a warning; if no valid specs remain,
633    /// this returns `None` and the caller falls back to the default Nix
634    /// runtime (where the missing dependency will surface as an honest
635    /// runtime error instead of a silent pip-install of attacker-chosen
636    /// names).
637    fn extract_pip_requirements(code: &str) -> Option<String> {
638        for line in code.lines() {
639            let trimmed = line.trim();
640            let Some(reqs_raw) = trimmed.strip_prefix("# requires:") else {
641                continue;
642            };
643            let reqs = reqs_raw.trim();
644            if reqs.is_empty() {
645                continue;
646            }
647            let valid: Vec<String> = reqs
648                .split(',')
649                .map(|s| s.trim())
650                .filter(|s| !s.is_empty())
651                .filter(|s| match validate_pip_spec(s) {
652                    Ok(()) => true,
653                    Err(reason) => {
654                        eprintln!(
655                            "[noether] rejected `# requires:` entry {s:?} ({reason}); skipping"
656                        );
657                        false
658                    }
659                })
660                .map(|s| s.to_string())
661                .collect();
662
663            if valid.is_empty() {
664                eprintln!(
665                    "[noether] all `# requires:` entries rejected (raw={reqs:?}); falling back to default Nix runtime"
666                );
667                return None;
668            }
669            return Some(valid.join(", "));
670        }
671        None
672    }
673
674    /// Scan Python source for `import X` / `from X import` statements and return
675    /// the Nix package names for any recognised third-party libraries.
676    fn detect_python_packages(code: &str) -> Vec<&'static str> {
677        // Map of Python import name → nixpkgs python3Packages attribute name.
678        const KNOWN: &[(&str, &str)] = &[
679            ("requests", "requests"),
680            ("httpx", "httpx"),
681            ("aiohttp", "aiohttp"),
682            ("bs4", "beautifulsoup4"),
683            ("lxml", "lxml"),
684            ("pandas", "pandas"),
685            ("numpy", "numpy"),
686            ("scipy", "scipy"),
687            ("sklearn", "scikit-learn"),
688            ("PIL", "Pillow"),
689            ("cv2", "opencv4"),
690            ("yaml", "pyyaml"),
691            ("toml", "toml"),
692            ("dateutil", "python-dateutil"),
693            ("pytz", "pytz"),
694            ("boto3", "boto3"),
695            ("psycopg2", "psycopg2"),
696            ("pymongo", "pymongo"),
697            ("redis", "redis"),
698            ("celery", "celery"),
699            ("fastapi", "fastapi"),
700            ("pydantic", "pydantic"),
701            ("cryptography", "cryptography"),
702            ("jwt", "pyjwt"),
703            ("paramiko", "paramiko"),
704            ("dotenv", "python-dotenv"),
705            ("joblib", "joblib"),
706            ("torch", "pytorch"),
707            ("transformers", "transformers"),
708            ("datasets", "datasets"),
709            ("pyarrow", "pyarrow"),
710        ];
711
712        let mut found: Vec<&'static str> = Vec::new();
713        for (import_name, nix_name) in KNOWN {
714            let patterns = [
715                format!("import {import_name}"),
716                format!("import {import_name} "),
717                format!("from {import_name} "),
718                format!("from {import_name}."),
719            ];
720            if patterns.iter().any(|p| code.contains(p.as_str())) {
721                found.push(nix_name);
722            }
723        }
724        found
725    }
726
727    // ── Language wrappers ────────────────────────────────────────────────────
728
    /// Test-only shim that forwards to `extract_future_imports`, returning
    /// its `(hoisted_future_imports, remaining_code)` pair unchanged.
    #[cfg(test)]
    #[allow(dead_code)]
    fn _expose_extract_future_imports(code: &str) -> (String, String) {
        Self::extract_future_imports(code)
    }
734
735    /// Pull every `from __future__ import ...` line out of `code` and return
736    /// `(joined_future_imports, code_without_them)`. The future imports are
737    /// returned with trailing newlines so the caller can embed them directly
738    /// at the top of the wrapper. Detection is line-based (no AST) — matches
739    /// any non-indented line starting with `from __future__ import`.
740    fn extract_future_imports(code: &str) -> (String, String) {
741        let mut hoisted = String::new();
742        let mut remaining = String::new();
743        for line in code.lines() {
744            let trimmed = line.trim_start();
745            if !line.starts_with(' ')
746                && !line.starts_with('\t')
747                && trimmed.starts_with("from __future__ import")
748            {
749                hoisted.push_str(line);
750                hoisted.push('\n');
751            } else {
752                remaining.push_str(line);
753                remaining.push('\n');
754            }
755        }
756        (hoisted, remaining)
757    }
758
759    fn wrap_python(user_code: &str) -> String {
760        // Skip pip install — dependencies are handled by the venv executor
761        // (build_nix_command creates a venv with pip packages pre-installed)
762        // or by Nix packages (for known imports like numpy, pandas, etc.).
763        let pip_install = String::new();
764
765        // Hoist any `from __future__ import ...` lines out of user code and
766        // emit them as the very first statements of the wrapper. Python
767        // requires `__future__` imports to be the first non-comment,
768        // non-docstring statement in a module — leaving them embedded in the
769        // user-code block (which is line ~17 of the wrapped file) raises
770        // `SyntaxError: from __future__ imports must occur at the
771        // beginning of the file`.
772        let (future_imports, user_code_clean) = Self::extract_future_imports(user_code);
773
774        format!(
775            r#"{future_imports}import sys, json as _json
776{pip_install}
777# ---- user implementation ----
778{user_code_clean}
779# ---- end implementation ----
780
781if __name__ == '__main__':
782    if 'execute' not in dir() or not callable(globals().get('execute')):
783        print(
784            "Noether stage error: implementation must define a top-level "
785            "function `def execute(input): ...` that takes the parsed input dict "
786            "and returns the output dict. Do not read from stdin or print to stdout — "
787            "the Noether runtime handles I/O for you.",
788            file=sys.stderr,
789        )
790        sys.exit(1)
791    try:
792        _raw = _json.loads(sys.stdin.read())
793        # If the runtime passed input as a JSON-encoded string, decode it once more.
794        # This happens when input arrives as null or a bare string from the CLI.
795        if isinstance(_raw, str):
796            try:
797                _raw = _json.loads(_raw)
798            except Exception:
799                pass
800        _output = execute(_raw if _raw is not None else {{}})
801        print(_json.dumps(_output))
802    except Exception as _e:
803        print(str(_e), file=sys.stderr)
804        sys.exit(1)
805"#
806        )
807    }
808
809    fn wrap_javascript(user_code: &str) -> String {
810        format!(
811            r#"const _readline = require('readline');
812let _input = '';
813process.stdin.on('data', d => _input += d);
814process.stdin.on('end', () => {{
815    try {{
816        // ---- user implementation ----
817        {user_code}
818        // ---- end implementation ----
819        const _result = execute(JSON.parse(_input));
820        process.stdout.write(JSON.stringify(_result) + '\n');
821    }} catch (e) {{
822        process.stderr.write(String(e) + '\n');
823        process.exit(1);
824    }}
825}});
826"#
827        )
828    }
829
830    fn wrap_bash(user_code: &str) -> String {
831        format!(
832            r#"#!/usr/bin/env bash
833set -euo pipefail
834INPUT=$(cat)
835
836# ---- user implementation ----
837{user_code}
838# ---- end implementation ----
839
840execute "$INPUT"
841"#
842        )
843    }
844}
845
846// ── Helpers ──────────────────────────────────────────────────────────────────
847
/// Return the first non-empty line of a multi-line string, trimmed.
///
/// Lines consisting only of whitespace count as empty. When `s` has no
/// non-empty line at all (empty or whitespace-only input) the result is `""`,
/// so the return value never contains a line break or surrounding whitespace.
fn first_line(s: &str) -> &str {
    s.lines()
        .map(str::trim)
        .find(|line| !line.is_empty())
        .unwrap_or("")
}
855
/// Validate a single pip requirement spec from a `# requires:` comment.
///
/// Accepts `pkg==version`. The package name may contain only ASCII letters,
/// digits, `_`, `-` and `.`, and — as in PEP 508 — must start and end with a
/// letter or digit. The edge rule matters for safety: without it a "name"
/// such as `--pre` or `-e` passes the character-set check and would be read
/// by pip as a command-line option instead of a requirement. The version must
/// be a straightforward PEP 440-ish literal (letters, digits, `.`, `+`, `!`,
/// `-`).
///
/// The pinning requirement (`==`) can be lifted by setting
/// `NOETHER_ALLOW_UNPINNED_PIP` to `1`, `true`, `yes` or `on` for local dev,
/// but the name and version validation always runs, so injected shell
/// metacharacters, quotes, option flags, and URL-form specs are rejected.
///
/// # Errors
///
/// Returns a static, human-readable reason when the spec is unpinned without
/// the opt-in, when the name or version is empty, or when either contains
/// disallowed characters.
fn validate_pip_spec(spec: &str) -> Result<(), &'static str> {
    let allow_unpinned = matches!(
        std::env::var("NOETHER_ALLOW_UNPINNED_PIP").as_deref(),
        Ok("1" | "true" | "yes" | "on")
    );

    // Split at the first `==`. If absent, require the opt-in flag.
    let (name, version) = match spec.split_once("==") {
        Some((n, v)) => (n.trim(), Some(v.trim())),
        None => {
            if !allow_unpinned {
                return Err("unpinned; use pkg==version or set NOETHER_ALLOW_UNPINNED_PIP=1");
            }
            (spec.trim(), None)
        }
    };

    if name.is_empty() {
        return Err("empty package name");
    }
    if !name
        .bytes()
        .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'_' | b'-' | b'.'))
    {
        return Err("package name contains disallowed characters");
    }
    // Checked after the character-set test so existing rejections keep their
    // original message; this only adds the option-like names (`-e`, `--pre`)
    // and other names with punctuation at either edge.
    let is_alnum_edge = |b: Option<&u8>| matches!(b, Some(c) if c.is_ascii_alphanumeric());
    if !is_alnum_edge(name.as_bytes().first()) || !is_alnum_edge(name.as_bytes().last()) {
        return Err("package name must start and end with a letter or digit");
    }
    if let Some(v) = version {
        if v.is_empty() {
            return Err("empty version after `==`");
        }
        if !v
            .bytes()
            .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'.' | b'+' | b'!' | b'-'))
        {
            return Err("version contains disallowed characters");
        }
    }
    Ok(())
}
903
904// ── StageExecutor impl ────────────────────────────────────────────────────────
905
906impl StageExecutor for NixExecutor {
907    fn execute(&self, stage_id: &StageId, input: &Value) -> Result<Value, ExecutionError> {
908        let impl_ = self
909            .implementations
910            .get(&stage_id.0)
911            .ok_or_else(|| ExecutionError::StageNotFound(stage_id.clone()))?;
912
913        let code_hash = Self::code_hash(&impl_.code);
914        let script = self.ensure_script(&code_hash, &impl_.code, &impl_.language)?;
915        self.run_script(stage_id, &script, &impl_.language, input)
916    }
917}
918
// Unit tests for the wrappers, import detection, pip-spec validation and
// error classification, plus two `#[ignore]`d integration tests that spawn a
// real `nix` process.
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an executor with a discovered `nix` binary (falling back to
    /// `/usr/bin/nix`), a cache directory under the system temp dir, default
    /// config and no registered implementations.
    // NOTE(review): `register_with_effects_preserves_network_effect` below also
    // calls this helper, so the `dead_code` allowance may no longer be needed.
    #[allow(dead_code)] // only used by the ignored integration tests
    fn make_executor() -> NixExecutor {
        let nix_bin = NixExecutor::find_nix().unwrap_or_else(|| PathBuf::from("/usr/bin/nix"));
        let cache_dir = std::env::temp_dir().join("noether-test-impl-cache");
        // Best effort: a failure to create the directory is deliberately ignored.
        let _ = std::fs::create_dir_all(&cache_dir);
        NixExecutor {
            nix_bin,
            cache_dir,
            config: NixConfig::default(),
            implementations: HashMap::new(),
        }
    }

    #[test]
    fn register_with_effects_preserves_network_effect() {
        // Regression guard on the synthesized-stage effects path.
        // Pre-hardening, `register_synthesized` → `NixExecutor::register`
        // dropped the declared effects and stamped `EffectSet::pure()`
        // onto the stored `StageImpl`. A Network-effect stage ended
        // up with a no-network sandbox and failed with DNS errors at
        // runtime. The `register()` shim is gone; this test locks in
        // that `register_with_effects` is the only registration path
        // and that it threads the effects through verbatim.
        use noether_core::effects::{Effect, EffectSet};
        let mut exec = make_executor();
        let id = StageId("sig_network".into());
        let effects = EffectSet::new([Effect::Pure, Effect::Network]);
        exec.register_with_effects(&id, "code", "python", effects.clone());
        let stored = exec
            .implementations
            .get(&id.0)
            .expect("stage should be registered");
        assert_eq!(
            stored.effects, effects,
            "declared effects must survive register_with_effects"
        );
        assert!(
            stored.effects.iter().any(|e| matches!(e, Effect::Network)),
            "Network must be preserved so the sandbox opens the net ns"
        );
    }

    // Pinned `name==version` specs, including a local version label (`+cu121`),
    // must be accepted.
    #[test]
    fn validate_pip_spec_accepts_pinned() {
        assert!(validate_pip_spec("pandas==2.0.0").is_ok());
        assert!(validate_pip_spec("scikit-learn==1.5.1").is_ok());
        assert!(validate_pip_spec("urllib3==2.2.3").is_ok());
        assert!(validate_pip_spec("pydantic==2.5.0+cu121").is_ok());
    }

    #[test]
    fn validate_pip_spec_rejects_unpinned_by_default() {
        // Ensure the opt-in flag is not accidentally set in the test env.
        // The previous value is kept in a one-element tuple so it can be
        // restored afterwards.
        let guard = (std::env::var_os("NOETHER_ALLOW_UNPINNED_PIP"),);
        // SAFETY: single-threaded test — no other test reads this var at the same time.
        // NOTE(review): the other `validate_pip_spec_*` tests also read this
        // variable through `validate_pip_spec`; confirm the harness really runs
        // them on one thread, or serialise access.
        unsafe {
            std::env::remove_var("NOETHER_ALLOW_UNPINNED_PIP");
        }
        let result = validate_pip_spec("pandas");
        // Restore prior state before asserting.
        if let (Some(prev),) = guard {
            unsafe {
                std::env::set_var("NOETHER_ALLOW_UNPINNED_PIP", prev);
            }
        }
        assert!(result.is_err(), "bare name must be rejected without opt-in");
    }

    // Every entry carries shell syntax, a URL/VCS form, or whitespace and must
    // be rejected.
    #[test]
    fn validate_pip_spec_rejects_shell_metacharacters() {
        for bad in [
            "pandas; rm -rf /",
            "pandas==$(whoami)",
            "pandas==1.0.0; echo pwned",
            "pandas==`id`",
            "https://evil.example/wheel.whl",
            "git+https://example.com/repo.git",
            "pkg with space==1.0",
            "pkg==1.0 && echo",
        ] {
            assert!(validate_pip_spec(bad).is_err(), "should reject {bad:?}");
        }
    }

    // A missing name or a missing version on either side of `==` is an error.
    #[test]
    fn validate_pip_spec_rejects_empty() {
        assert!(validate_pip_spec("==1.0").is_err());
        assert!(validate_pip_spec("pkg==").is_err());
    }

    #[test]
    fn detect_python_packages_requests() {
        let code = "import requests\ndef execute(v):\n    return requests.get(v).json()";
        let pkgs = NixExecutor::detect_python_packages(code);
        assert!(
            pkgs.contains(&"requests"),
            "expected 'requests' in {pkgs:?}"
        );
    }

    #[test]
    fn detect_python_packages_stdlib_only() {
        let code = "import urllib.request, json\ndef execute(v):\n    return json.loads(v)";
        let pkgs = NixExecutor::detect_python_packages(code);
        assert!(
            pkgs.is_empty(),
            "stdlib imports should not trigger packages: {pkgs:?}"
        );
    }

    // Covers `import x`, `import x as y` and `from x import y`; `bs4` is
    // expected to map to the `beautifulsoup4` package name.
    #[test]
    fn detect_python_packages_multiple() {
        let code = "import pandas\nimport numpy as np\nfrom bs4 import BeautifulSoup\ndef execute(v): pass";
        let pkgs = NixExecutor::detect_python_packages(code);
        assert!(pkgs.contains(&"pandas"));
        assert!(pkgs.contains(&"numpy"));
        assert!(pkgs.contains(&"beautifulsoup4"));
    }

    /// Build an executor from fixed paths without touching the filesystem or
    /// looking for a real `nix` binary.
    fn test_executor() -> NixExecutor {
        NixExecutor {
            nix_bin: PathBuf::from("/usr/bin/nix"),
            cache_dir: PathBuf::from("/tmp/noether-test-cache"),
            config: NixConfig::default(),
            implementations: HashMap::new(),
        }
    }

    // Code with no detected third-party imports must use the `run` subcommand
    // with a `python3` argument and no `shell` argument.
    #[test]
    fn build_nix_command_no_packages() {
        let exec = test_executor();
        let (sub, args) = exec.build_nix_command("python", Path::new("/tmp/x.py"), "import json");
        assert_eq!(sub, "run");
        assert!(args.iter().any(|a| a.contains("python3")));
        assert!(!args.iter().any(|a| a.contains("shell")));
    }

    // Code importing `requests` must switch to the `shell` subcommand with the
    // `python3Packages.requests` package and a `--command` argument.
    #[test]
    fn build_nix_command_with_requests() {
        let exec = test_executor();
        let (sub, args) =
            exec.build_nix_command("python", Path::new("/tmp/x.py"), "import requests");
        assert_eq!(sub, "shell");
        assert!(args.iter().any(|a| a.contains("python3Packages.requests")));
        assert!(args.iter().any(|a| a == "--command"));
        // Must NOT include bare nixpkgs#python3 — it conflicts with python3Packages.*
        assert!(
            !args.iter().any(|a| a == "nixpkgs#python3"),
            "bare python3 conflicts: {args:?}"
        );
    }

    // The wrapped script must contain the stdin read, the JSON dump of the
    // result, and the user code itself.
    #[test]
    fn python_wrapper_contains_boilerplate() {
        let wrapped = NixExecutor::wrap_python("def execute(x):\n    return x + 1");
        assert!(wrapped.contains("sys.stdin.read()"));
        assert!(wrapped.contains("_json.dumps(_output)"));
        assert!(wrapped.contains("def execute(x)"));
    }

    // Equal input gives an equal hash; different input gives a different one.
    #[test]
    fn code_hash_is_stable() {
        let h1 = NixExecutor::code_hash("hello world");
        let h2 = NixExecutor::code_hash("hello world");
        let h3 = NixExecutor::code_hash("different");
        assert_eq!(h1, h2);
        assert_ne!(h1, h3);
    }

    #[test]
    fn classify_error_daemon_not_running() {
        let msg = NixExecutor::classify_error("error: cannot connect to nix daemon", Some(1));
        assert!(msg.contains("nix daemon is not running"), "got: {msg}");
    }

    #[test]
    fn future_imports_are_hoisted_out_of_user_code() {
        let user = "from __future__ import annotations\nimport json\n\ndef execute(input):\n    return input\n";
        let wrapped = NixExecutor::wrap_python(user);
        // The future import must come BEFORE `import sys, json as _json`.
        let future_pos = wrapped
            .find("from __future__ import annotations")
            .expect("future import should be present in wrapper");
        let stdlib_pos = wrapped
            .find("import sys, json as _json")
            .expect("stdlib imports should be present");
        assert!(
            future_pos < stdlib_pos,
            "future import must precede stdlib imports in wrapped output"
        );
    }

    // Compared after trimming: the extractor's output may differ from the
    // input in surrounding whitespace.
    #[test]
    fn user_code_without_future_imports_is_unchanged() {
        let user = "import json\n\ndef execute(input):\n    return input\n";
        let (hoisted, remaining) = NixExecutor::extract_future_imports(user);
        assert_eq!(hoisted, "");
        assert_eq!(remaining.trim(), user.trim());
    }

    #[test]
    fn nested_future_import_inside_function_is_not_hoisted() {
        // Indented "from __future__" lines (inside a function) are not
        // valid Python anyway, but the hoister must not promote them.
        let user =
            "def execute(input):\n    from __future__ import annotations\n    return input\n";
        let (hoisted, _) = NixExecutor::extract_future_imports(user);
        assert_eq!(hoisted, "");
    }

    // A user-code failure must surface both the stderr text and the exit code.
    #[test]
    fn classify_error_user_code_exit1() {
        let msg = NixExecutor::classify_error("ValueError: invalid input", Some(1));
        assert!(msg.contains("ValueError"), "got: {msg}");
        assert!(msg.contains("exit 1"), "got: {msg}");
    }

    #[test]
    fn classify_error_disk_full() {
        let msg = NixExecutor::classify_error("No space left on device", Some(1));
        assert!(msg.contains("disk space"), "got: {msg}");
    }

    // With nothing on stderr the message must still mention the exit code.
    #[test]
    fn classify_error_empty_stderr() {
        let msg = NixExecutor::classify_error("", Some(137));
        assert!(msg.contains("exit 137"), "got: {msg}");
    }

    // Pins the default limits: 30 s timeout, 10 MiB of stdout, 64 KiB of stderr.
    #[test]
    fn nix_config_defaults() {
        let cfg = NixConfig::default();
        assert_eq!(cfg.timeout_secs, 30);
        assert_eq!(cfg.max_output_bytes, 10 * 1024 * 1024);
        assert_eq!(cfg.max_stderr_bytes, 64 * 1024);
    }

    #[test]
    fn first_line_extracts_correctly() {
        assert_eq!(first_line("  \nfoo\nbar"), "foo");
        assert_eq!(first_line("single"), "single");
        assert_eq!(first_line(""), "");
    }

    /// Integration test — runs when nix is available (skips gracefully if not).
    /// Requires a warm Nix binary cache; run with `cargo test -- --ignored` to include.
    #[test]
    #[ignore = "requires nix + warm binary cache; run manually with `cargo test -- --ignored`"]
    fn nix_python_identity_stage() {
        let nix_bin = match NixExecutor::find_nix() {
            Some(p) => p,
            None => {
                eprintln!("nix not found, skipping");
                return;
            }
        };

        let cache_dir = std::env::temp_dir().join("noether-nix-integ");
        let _ = std::fs::create_dir_all(&cache_dir);

        // Identity stage: the output must equal the input.
        let code = "def execute(x):\n    return x";
        let executor = NixExecutor {
            nix_bin,
            cache_dir,
            config: NixConfig::default(),
            implementations: {
                let mut m = HashMap::new();
                let id = StageId("test_identity".into());
                m.insert(
                    id.0.clone(),
                    StageImpl {
                        code: code.into(),
                        language: "python".into(),
                        effects: noether_core::effects::EffectSet::pure(),
                    },
                );
                m
            },
        };

        let id = StageId("test_identity".into());
        let result = executor.execute(&id, &serde_json::json!({"hello": "world"}));
        assert_eq!(result.unwrap(), serde_json::json!({"hello": "world"}));
    }

    /// Verify that a stage that hangs returns TimedOut, not a hang.
    /// Requires nix + warm binary cache; run with `cargo test -- --ignored`.
    #[test]
    #[ignore = "requires nix + warm binary cache; run manually with `cargo test -- --ignored`"]
    fn nix_timeout_kills_hanging_stage() {
        let nix_bin = match NixExecutor::find_nix() {
            Some(p) => p,
            None => {
                eprintln!("nix not found, skipping timeout test");
                return;
            }
        };

        let cache_dir = std::env::temp_dir().join("noether-nix-timeout");
        let _ = std::fs::create_dir_all(&cache_dir);

        // The stage sleeps far longer than the 2-second timeout configured below.
        let code = "import time\ndef execute(x):\n    time.sleep(9999)\n    return x";
        let executor = NixExecutor {
            nix_bin,
            cache_dir,
            config: NixConfig {
                timeout_secs: 2,
                ..NixConfig::default()
            },
            implementations: {
                let mut m = HashMap::new();
                let id = StageId("hanging".into());
                m.insert(
                    id.0.clone(),
                    StageImpl {
                        code: code.into(),
                        language: "python".into(),
                        effects: noether_core::effects::EffectSet::pure(),
                    },
                );
                m
            },
        };

        let id = StageId("hanging".into());
        let result = executor.execute(&id, &serde_json::json!(null));
        assert!(
            matches!(
                result,
                Err(ExecutionError::TimedOut {
                    timeout_secs: 2,
                    ..
                })
            ),
            "expected TimedOut, got: {result:?}"
        );
    }
}