Skip to main content

noether_engine/executor/
nix.rs

//! Nix-based executor for synthesized stages.
//!
//! Runs stage implementations as subprocesses using `nix run nixpkgs#<runtime>`,
//! giving a reproducible, Nix-pinned runtime for Python/JavaScript/Bash code
//! without requiring any ambient language runtime on the host.
//!
//! **This is a reproducibility boundary, not an isolation boundary.** Stages
//! run with the privileges of the host user — they can read/write the
//! filesystem, make arbitrary network calls, and read environment variables.
//! Do not execute untrusted stages on a host with credentials you are not
//! willing to risk. See SECURITY.md for the full trust model.
//!
//! ## Execution protocol
//!
//! - stdin  → JSON-encoded input value; the pipe is closed once it is written
//! - stdout → JSON-encoded output value (surrounding whitespace is ignored)
//! - stderr → diagnostic text; used as the error message when the stage
//!   exits non-zero (stderr output alone does not mark a run as failed)
//! - exit 0 → success; exit non-zero → `ExecutionError::StageFailed`
//!
//! ## Timeout
//!
//! Every execution is bounded by [`NixConfig::timeout_secs`] (default 30 s).
//! When the child process exceeds the limit it is sent SIGKILL and the call
//! returns [`ExecutionError::TimedOut`].
//!
//! ## Generated wrapper (Python example)
//!
//! ```python
//! import sys, json as _json
//!
//! # ---- user code ----
//! def execute(input_value):
//!     ...
//! # -------------------
//!
//! if __name__ == '__main__':
//!     try:
//!         _output = execute(_json.loads(sys.stdin.read()))
//!         print(_json.dumps(_output))
//!     except Exception as e:
//!         print(str(e), file=sys.stderr)
//!         sys.exit(1)
//! ```
44
45use super::{ExecutionError, StageExecutor};
46use noether_core::stage::StageId;
47use serde_json::Value;
48use sha2::{Digest, Sha256};
49use std::collections::HashMap;
50use std::io::Write as IoWrite;
51use std::path::{Path, PathBuf};
52use std::process::{Command, Stdio};
53use std::sync::mpsc;
54use std::time::Duration;
55
56// ── Configuration ────────────────────────────────────────────────────────────
57
/// Runtime limits applied by the [`NixExecutor`] to each stage execution.
#[derive(Debug, Clone)]
pub struct NixConfig {
    /// How long, in wall-clock seconds, one stage run may take before the
    /// child process is killed with SIGKILL.
    /// Default: 30 s.
    pub timeout_secs: u64,
    /// Upper bound, in bytes, on the stdout a stage may produce; guards
    /// against stages that emit enormous outputs.
    /// Default: 10 MiB.
    pub max_output_bytes: usize,
    /// Upper bound, in bytes, on the stderr text kept for error messages.
    /// Default: 64 KiB.
    pub max_stderr_bytes: usize,
}

impl Default for NixConfig {
    /// 30-second timeout, 10 MiB of stdout, 64 KiB of stderr.
    fn default() -> Self {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;

        NixConfig {
            timeout_secs: 30,
            max_output_bytes: 10 * MIB,
            max_stderr_bytes: 64 * KIB,
        }
    }
}
83
84// ── Internal stage storage ───────────────────────────────────────────────────
85
/// One stage's implementation: its source code plus a language tag.
///
/// Values of this type are stored in `NixExecutor::implementations`, keyed
/// by the stage ID string.
#[derive(Clone)]
struct StageImpl {
    // Raw user-authored source; it is wrapped with I/O glue before execution.
    code: String,
    // Language tag such as "python", "javascript"/"js" or "bash"/"sh".
    language: String,
}
92
93// ── NixExecutor ──────────────────────────────────────────────────────────────
94
/// Executor that runs synthesized stages through Nix-managed language runtimes.
///
/// When `nix` is available, each stage is executed as a subprocess with a
/// Nix-pinned runtime (e.g. `nix run nixpkgs#python3 -- stage.py`). The Nix
/// binary cache ensures the runtime is downloaded once and then reused from
/// the store. **This gives reproducibility, not isolation**: the subprocess
/// inherits the host user's privileges, filesystem, and network. See module
/// docs and SECURITY.md for the full trust model.
///
/// ## Resource limits
///
/// - **Timeout**: configured via [`NixConfig::timeout_secs`] (default 30 s).
///   The child is sent SIGKILL when the limit is exceeded.
/// - **Output cap**: configured via [`NixConfig::max_output_bytes`] (default 10 MiB).
pub struct NixExecutor {
    // Path to the `nix` binary used to launch stage runtimes.
    nix_bin: PathBuf,
    // Directory holding the generated wrapper scripts (named by code hash)
    // and any pip virtualenvs created for `# requires:` stages.
    cache_dir: PathBuf,
    // Timeout and output-size limits applied to every run.
    config: NixConfig,
    // Known stage implementations, keyed by the stage ID string (`StageId.0`).
    implementations: HashMap<String, StageImpl>,
}
115
116impl NixExecutor {
117    /// Probe the system for a usable `nix` binary.
118    /// Returns the path if found, or `None` if Nix is not installed.
119    pub fn find_nix() -> Option<PathBuf> {
120        // Determinate Systems installer puts nix here:
121        let determinate = PathBuf::from("/nix/var/nix/profiles/default/bin/nix");
122        if determinate.exists() {
123            return Some(determinate);
124        }
125
126        // Walk $PATH directly rather than spawning `which`. Avoids a
127        // subprocess + the risk that `which` is missing or shadowed on
128        // minimal systems (e.g. some container base images).
129        let path_env = std::env::var_os("PATH")?;
130        for dir in std::env::split_paths(&path_env) {
131            let candidate = dir.join("nix");
132            if candidate.is_file() {
133                return Some(candidate);
134            }
135        }
136        None
137    }
138
139    /// Build an executor that can run synthesized stages found in `store`.
140    ///
141    /// Returns `None` when `nix` is not available — callers should fall back to
142    /// `InlineExecutor` exclusively in that case.
143    pub fn from_store(store: &dyn noether_store::StageStore) -> Option<Self> {
144        Self::from_store_with_config(store, NixConfig::default())
145    }
146
147    /// Like [`from_store`] but with a custom [`NixConfig`].
148    pub fn from_store_with_config(
149        store: &dyn noether_store::StageStore,
150        config: NixConfig,
151    ) -> Option<Self> {
152        let nix_bin = Self::find_nix()?;
153
154        let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".into());
155        let cache_dir = PathBuf::from(home).join(".noether").join("impl_cache");
156        let _ = std::fs::create_dir_all(&cache_dir);
157
158        let mut implementations = HashMap::new();
159        for stage in store.list(None) {
160            if let (Some(code), Some(lang)) =
161                (&stage.implementation_code, &stage.implementation_language)
162            {
163                implementations.insert(
164                    stage.id.0.clone(),
165                    StageImpl {
166                        code: code.clone(),
167                        language: lang.clone(),
168                    },
169                );
170            }
171        }
172
173        Some(Self {
174            nix_bin,
175            cache_dir,
176            config,
177            implementations,
178        })
179    }
180
181    /// Register an in-memory synthesized stage without re-querying the store.
182    pub fn register(&mut self, stage_id: &StageId, code: &str, language: &str) {
183        self.implementations.insert(
184            stage_id.0.clone(),
185            StageImpl {
186                code: code.into(),
187                language: language.into(),
188            },
189        );
190    }
191
192    /// True when we have a real implementation for this stage.
193    pub fn has_implementation(&self, stage_id: &StageId) -> bool {
194        self.implementations.contains_key(&stage_id.0)
195    }
196
197    /// Pre-fetch the Python 3 runtime into the Nix store in a background thread.
198    ///
199    /// The first time any Python stage runs, Nix may take several seconds to
200    /// download and verify the runtime closure.  Calling `warmup()` at startup
201    /// overlaps that latency with application boot time.
202    ///
203    /// The returned `JoinHandle` can be ignored — any error is logged to stderr
204    /// but does not affect correctness; the runtime will still be fetched on first
205    /// actual use.
206    pub fn warmup(&self) -> std::thread::JoinHandle<()> {
207        let nix_bin = self.nix_bin.clone();
208        std::thread::spawn(move || {
209            // `nix build` with `--dry-run` is enough to populate the binary cache
210            // without running any user code.
211            let status = Command::new(&nix_bin)
212                .args([
213                    "build",
214                    "--no-link",
215                    "--quiet",
216                    "--no-write-lock-file",
217                    "nixpkgs#python3",
218                ])
219                .stdout(Stdio::null())
220                .stderr(Stdio::null())
221                .status();
222            match status {
223                Ok(s) if s.success() => {
224                    eprintln!("[noether] nix warmup: python3 runtime cached");
225                }
226                Ok(s) => {
227                    eprintln!("[noether] nix warmup: exited with {s} (non-fatal)");
228                }
229                Err(e) => {
230                    eprintln!("[noether] nix warmup: failed to spawn ({e}) (non-fatal)");
231                }
232            }
233        })
234    }
235
236    // ── Internal helpers ─────────────────────────────────────────────────────
237
238    /// Hash the code string to get a stable cache key.
239    fn code_hash(code: &str) -> String {
240        hex::encode(Sha256::digest(code.as_bytes()))
241    }
242
243    /// Ensure the wrapped script for `impl_hash` exists on disk.
244    /// Returns the path to the file.
245    fn ensure_script(
246        &self,
247        impl_hash: &str,
248        code: &str,
249        language: &str,
250    ) -> Result<PathBuf, ExecutionError> {
251        let ext = match language {
252            "javascript" | "js" => "js",
253            "bash" | "sh" => "sh",
254            _ => "py",
255        };
256
257        let path = self.cache_dir.join(format!("{impl_hash}.{ext}"));
258        if path.exists() {
259            return Ok(path);
260        }
261
262        let wrapped = match language {
263            "javascript" | "js" => Self::wrap_javascript(code),
264            "bash" | "sh" => Self::wrap_bash(code),
265            _ => Self::wrap_python(code),
266        };
267
268        std::fs::write(&path, &wrapped).map_err(|e| ExecutionError::StageFailed {
269            stage_id: StageId(impl_hash.into()),
270            message: format!("failed to write stage script: {e}"),
271        })?;
272
273        Ok(path)
274    }
275
276    /// Run the stage script via Nix with JSON on stdin, enforcing timeout and
277    /// output-size limits.
278    fn run_script(
279        &self,
280        stage_id: &StageId,
281        script: &Path,
282        language: &str,
283        input: &Value,
284    ) -> Result<Value, ExecutionError> {
285        let input_json = serde_json::to_string(input).unwrap_or_default();
286
287        let code = self
288            .implementations
289            .get(&stage_id.0)
290            .map(|i| i.code.as_str())
291            .unwrap_or("");
292
293        let (nix_subcommand, args) = self.build_nix_command(language, script, code);
294
295        // __direct__ means run the binary directly (venv Python), not via nix
296        let mut child = if nix_subcommand == "__direct__" {
297            Command::new(&args[0])
298                .args(&args[1..])
299                .stdin(Stdio::piped())
300                .stdout(Stdio::piped())
301                .stderr(Stdio::piped())
302                .spawn()
303        } else {
304            Command::new(&self.nix_bin)
305                .arg(&nix_subcommand)
306                .args(["--no-write-lock-file", "--quiet"])
307                .args(&args)
308                .stdin(Stdio::piped())
309                .stdout(Stdio::piped())
310                .stderr(Stdio::piped())
311                .spawn()
312        }
313        .map_err(|e| ExecutionError::StageFailed {
314            stage_id: stage_id.clone(),
315            message: format!("failed to spawn process: {e}"),
316        })?;
317
318        // Write stdin in a background thread so we don't deadlock when the
319        // child's stdin pipe fills before we start reading stdout.
320        if let Some(mut stdin) = child.stdin.take() {
321            let bytes = input_json.into_bytes();
322            std::thread::spawn(move || {
323                let _ = stdin.write_all(&bytes);
324            });
325        }
326
327        // Collect output with a wall-clock timeout.
328        let pid = child.id();
329        let timeout = Duration::from_secs(self.config.timeout_secs);
330        let (tx, rx) = mpsc::channel();
331        std::thread::spawn(move || {
332            let _ = tx.send(child.wait_with_output());
333        });
334
335        let out = match rx.recv_timeout(timeout) {
336            Ok(Ok(o)) => o,
337            Ok(Err(e)) => {
338                return Err(ExecutionError::StageFailed {
339                    stage_id: stage_id.clone(),
340                    message: format!("nix process error: {e}"),
341                });
342            }
343            Err(_elapsed) => {
344                // Best-effort kill — process may already have exited.
345                let _ = Command::new("kill").args(["-9", &pid.to_string()]).status();
346                return Err(ExecutionError::TimedOut {
347                    stage_id: stage_id.clone(),
348                    timeout_secs: self.config.timeout_secs,
349                });
350            }
351        };
352
353        // Truncate stderr to avoid huge allocations from noisy runtimes.
354        let stderr_raw = &out.stderr[..out.stderr.len().min(self.config.max_stderr_bytes)];
355        let stderr = String::from_utf8_lossy(stderr_raw);
356
357        if !out.status.success() {
358            return Err(ExecutionError::StageFailed {
359                stage_id: stage_id.clone(),
360                message: Self::classify_error(&stderr, out.status.code()),
361            });
362        }
363
364        // Truncate stdout to the configured limit.
365        let stdout_raw = &out.stdout[..out.stdout.len().min(self.config.max_output_bytes)];
366        let stdout = String::from_utf8_lossy(stdout_raw);
367
368        if stdout_raw.len() == self.config.max_output_bytes && !out.stdout.is_empty() {
369            return Err(ExecutionError::StageFailed {
370                stage_id: stage_id.clone(),
371                message: format!(
372                    "stage output exceeded {} bytes limit",
373                    self.config.max_output_bytes
374                ),
375            });
376        }
377
378        serde_json::from_str(stdout.trim()).map_err(|e| ExecutionError::StageFailed {
379            stage_id: stage_id.clone(),
380            message: format!("failed to parse stage output as JSON: {e} (got: {stdout:?})"),
381        })
382    }
383
384    /// Classify a non-zero exit into a human-readable message, distinguishing
385    /// Nix infrastructure errors from user code errors.
386    fn classify_error(stderr: &str, exit_code: Option<i32>) -> String {
387        // Nix daemon / networking errors.
388        if stderr.contains("cannot connect to nix daemon")
389            || stderr.contains("Cannot connect to the Nix daemon")
390        {
391            return "nix daemon is not running — start it with `sudo systemctl start nix-daemon` \
392                    or `nix daemon`"
393                .to_string();
394        }
395        if stderr.contains("error: flake") || stderr.contains("error: getting flake") {
396            return format!(
397                "nix flake error (check network / nixpkgs access): {}",
398                first_line(stderr)
399            );
400        }
401        if stderr.contains("error: downloading") || stderr.contains("error: fetching") {
402            return format!(
403                "nix failed to fetch runtime package (check network): {}",
404                first_line(stderr)
405            );
406        }
407        if stderr.contains("out of disk space") || stderr.contains("No space left on device") {
408            return "nix store out of disk space — run `nix-collect-garbage -d` to free space"
409                .to_string();
410        }
411        if stderr.contains("nix: command not found") || stderr.contains("No such file") {
412            return "nix binary not found — is Nix installed?".to_string();
413        }
414        // User code errors (exit 1 from the stage wrapper).
415        let code_str = exit_code
416            .map(|c| format!(" (exit {c})"))
417            .unwrap_or_default();
418        if stderr.trim().is_empty() {
419            format!("stage exited without output{code_str}")
420        } else {
421            format!("stage error{code_str}: {stderr}")
422        }
423    }
424
425    /// Build the nix subcommand + argument list for running a stage script.
426    ///
427    /// - Python with no third-party imports: `nix run nixpkgs#python3 -- script.py`
428    /// - Python with third-party imports:    `nix shell nixpkgs#python3Packages.X ... --command python3 script.py`
429    /// - JS/Bash: `nix run nixpkgs#<runtime> -- script`
430    fn build_nix_command(
431        &self,
432        language: &str,
433        script: &Path,
434        code: &str,
435    ) -> (String, Vec<String>) {
436        let script_path = script.to_str().unwrap_or("/dev/null").to_string();
437
438        match language {
439            "python" | "python3" | "" => {
440                // If the code has `# requires:` with pip packages, use a venv
441                // with system Python instead of Nix (Nix's python3Packages
442                // don't reliably work with `nix shell`).
443                if let Some(reqs) = Self::extract_pip_requirements(code) {
444                    let venv_hash = {
445                        use sha2::{Digest, Sha256};
446                        let h = Sha256::digest(reqs.as_bytes());
447                        hex::encode(&h[..8])
448                    };
449                    let venv_dir = self.cache_dir.join(format!("venv-{venv_hash}"));
450                    let venv_str = venv_dir.to_string_lossy().to_string();
451                    let python = venv_dir.join("bin").join("python3");
452                    let python_str = python.to_string_lossy().to_string();
453
454                    // Create venv + install deps if not cached
455                    if !python.exists() {
456                        let setup = std::process::Command::new("python3")
457                            .args(["-m", "venv", &venv_str])
458                            .output();
459                        if let Ok(out) = setup {
460                            if out.status.success() {
461                                let pip = venv_dir.join("bin").join("pip");
462                                let pkgs: Vec<&str> = reqs.split(", ").collect();
463                                let mut pip_args =
464                                    vec!["install", "--quiet", "--disable-pip-version-check"];
465                                pip_args.extend(pkgs);
466                                let _ = std::process::Command::new(pip.to_string_lossy().as_ref())
467                                    .args(&pip_args)
468                                    .output();
469                            }
470                        }
471                    }
472
473                    // Run with the venv Python directly (no nix)
474                    return ("__direct__".to_string(), vec![python_str, script_path]);
475                }
476
477                let extra_pkgs = Self::detect_python_packages(code);
478                if extra_pkgs.is_empty() {
479                    (
480                        "run".to_string(),
481                        vec!["nixpkgs#python3".into(), "--".into(), script_path],
482                    )
483                } else {
484                    let mut args: Vec<String> = extra_pkgs
485                        .iter()
486                        .map(|pkg| format!("nixpkgs#python3Packages.{pkg}"))
487                        .collect();
488                    args.extend_from_slice(&["--command".into(), "python3".into(), script_path]);
489                    ("shell".to_string(), args)
490                }
491            }
492            "javascript" | "js" => (
493                "run".to_string(),
494                vec!["nixpkgs#nodejs".into(), "--".into(), script_path],
495            ),
496            _ => (
497                "run".to_string(),
498                vec!["nixpkgs#bash".into(), "--".into(), script_path],
499            ),
500        }
501    }
502
503    /// Extract pip requirements from `# requires: pkg1==ver, pkg2==ver` comments.
504    ///
505    /// Each spec is validated to prevent typosquatting and shell-metacharacter
506    /// injection from LLM-authored stages. By default, every package must be
507    /// pinned to an exact version (`pkg==1.2.3`). Set
508    /// `NOETHER_ALLOW_UNPINNED_PIP=1` to lift the pinning requirement for
509    /// local development; the character-set validation always runs.
510    ///
511    /// Invalid specs are dropped with a warning; if no valid specs remain,
512    /// this returns `None` and the caller falls back to the default Nix
513    /// runtime (where the missing dependency will surface as an honest
514    /// runtime error instead of a silent pip-install of attacker-chosen
515    /// names).
516    fn extract_pip_requirements(code: &str) -> Option<String> {
517        for line in code.lines() {
518            let trimmed = line.trim();
519            if trimmed.starts_with("# requires:") {
520                let reqs = trimmed.strip_prefix("# requires:").unwrap().trim();
521                if reqs.is_empty() {
522                    continue;
523                }
524                let valid: Vec<String> = reqs
525                    .split(',')
526                    .map(|s| s.trim())
527                    .filter(|s| !s.is_empty())
528                    .filter(|s| match validate_pip_spec(s) {
529                        Ok(()) => true,
530                        Err(reason) => {
531                            eprintln!(
532                                "[noether] rejected `# requires:` entry {s:?} ({reason}); skipping"
533                            );
534                            false
535                        }
536                    })
537                    .map(|s| s.to_string())
538                    .collect();
539
540                if valid.is_empty() {
541                    eprintln!(
542                        "[noether] all `# requires:` entries rejected (raw={reqs:?}); falling back to default Nix runtime"
543                    );
544                    return None;
545                }
546                return Some(valid.join(", "));
547            }
548        }
549        None
550    }
551
552    /// Scan Python source for `import X` / `from X import` statements and return
553    /// the Nix package names for any recognised third-party libraries.
554    fn detect_python_packages(code: &str) -> Vec<&'static str> {
555        // Map of Python import name → nixpkgs python3Packages attribute name.
556        const KNOWN: &[(&str, &str)] = &[
557            ("requests", "requests"),
558            ("httpx", "httpx"),
559            ("aiohttp", "aiohttp"),
560            ("bs4", "beautifulsoup4"),
561            ("lxml", "lxml"),
562            ("pandas", "pandas"),
563            ("numpy", "numpy"),
564            ("scipy", "scipy"),
565            ("sklearn", "scikit-learn"),
566            ("PIL", "Pillow"),
567            ("cv2", "opencv4"),
568            ("yaml", "pyyaml"),
569            ("toml", "toml"),
570            ("dateutil", "python-dateutil"),
571            ("pytz", "pytz"),
572            ("boto3", "boto3"),
573            ("psycopg2", "psycopg2"),
574            ("pymongo", "pymongo"),
575            ("redis", "redis"),
576            ("celery", "celery"),
577            ("fastapi", "fastapi"),
578            ("pydantic", "pydantic"),
579            ("cryptography", "cryptography"),
580            ("jwt", "pyjwt"),
581            ("paramiko", "paramiko"),
582            ("dotenv", "python-dotenv"),
583            ("joblib", "joblib"),
584            ("torch", "pytorch"),
585            ("transformers", "transformers"),
586            ("datasets", "datasets"),
587            ("pyarrow", "pyarrow"),
588        ];
589
590        let mut found: Vec<&'static str> = Vec::new();
591        for (import_name, nix_name) in KNOWN {
592            let patterns = [
593                format!("import {import_name}"),
594                format!("import {import_name} "),
595                format!("from {import_name} "),
596                format!("from {import_name}."),
597            ];
598            if patterns.iter().any(|p| code.contains(p.as_str())) {
599                found.push(nix_name);
600            }
601        }
602        found
603    }
604
605    // ── Language wrappers ────────────────────────────────────────────────────
606
607    #[cfg(test)]
608    #[allow(dead_code)]
609    fn _expose_extract_future_imports(code: &str) -> (String, String) {
610        Self::extract_future_imports(code)
611    }
612
613    /// Pull every `from __future__ import ...` line out of `code` and return
614    /// `(joined_future_imports, code_without_them)`. The future imports are
615    /// returned with trailing newlines so the caller can embed them directly
616    /// at the top of the wrapper. Detection is line-based (no AST) — matches
617    /// any non-indented line starting with `from __future__ import`.
618    fn extract_future_imports(code: &str) -> (String, String) {
619        let mut hoisted = String::new();
620        let mut remaining = String::new();
621        for line in code.lines() {
622            let trimmed = line.trim_start();
623            if !line.starts_with(' ')
624                && !line.starts_with('\t')
625                && trimmed.starts_with("from __future__ import")
626            {
627                hoisted.push_str(line);
628                hoisted.push('\n');
629            } else {
630                remaining.push_str(line);
631                remaining.push('\n');
632            }
633        }
634        (hoisted, remaining)
635    }
636
637    fn wrap_python(user_code: &str) -> String {
638        // Skip pip install — dependencies are handled by the venv executor
639        // (build_nix_command creates a venv with pip packages pre-installed)
640        // or by Nix packages (for known imports like numpy, pandas, etc.).
641        let pip_install = String::new();
642
643        // Hoist any `from __future__ import ...` lines out of user code and
644        // emit them as the very first statements of the wrapper. Python
645        // requires `__future__` imports to be the first non-comment,
646        // non-docstring statement in a module — leaving them embedded in the
647        // user-code block (which is line ~17 of the wrapped file) raises
648        // `SyntaxError: from __future__ imports must occur at the
649        // beginning of the file`.
650        let (future_imports, user_code_clean) = Self::extract_future_imports(user_code);
651
652        format!(
653            r#"{future_imports}import sys, json as _json
654{pip_install}
655# ---- user implementation ----
656{user_code_clean}
657# ---- end implementation ----
658
659if __name__ == '__main__':
660    if 'execute' not in dir() or not callable(globals().get('execute')):
661        print(
662            "Noether stage error: implementation must define a top-level "
663            "function `def execute(input): ...` that takes the parsed input dict "
664            "and returns the output dict. Do not read from stdin or print to stdout — "
665            "the Noether runtime handles I/O for you.",
666            file=sys.stderr,
667        )
668        sys.exit(1)
669    try:
670        _raw = _json.loads(sys.stdin.read())
671        # If the runtime passed input as a JSON-encoded string, decode it once more.
672        # This happens when input arrives as null or a bare string from the CLI.
673        if isinstance(_raw, str):
674            try:
675                _raw = _json.loads(_raw)
676            except Exception:
677                pass
678        _output = execute(_raw if _raw is not None else {{}})
679        print(_json.dumps(_output))
680    except Exception as _e:
681        print(str(_e), file=sys.stderr)
682        sys.exit(1)
683"#
684        )
685    }
686
687    fn wrap_javascript(user_code: &str) -> String {
688        format!(
689            r#"const _readline = require('readline');
690let _input = '';
691process.stdin.on('data', d => _input += d);
692process.stdin.on('end', () => {{
693    try {{
694        // ---- user implementation ----
695        {user_code}
696        // ---- end implementation ----
697        const _result = execute(JSON.parse(_input));
698        process.stdout.write(JSON.stringify(_result) + '\n');
699    }} catch (e) {{
700        process.stderr.write(String(e) + '\n');
701        process.exit(1);
702    }}
703}});
704"#
705        )
706    }
707
708    fn wrap_bash(user_code: &str) -> String {
709        format!(
710            r#"#!/usr/bin/env bash
711set -euo pipefail
712INPUT=$(cat)
713
714# ---- user implementation ----
715{user_code}
716# ---- end implementation ----
717
718execute "$INPUT"
719"#
720        )
721    }
722}
723
724// ── Helpers ──────────────────────────────────────────────────────────────────
725
/// Trimmed first line of `s` that is not blank; `s` itself, untouched, when
/// every line is blank.
fn first_line(s: &str) -> &str {
    for line in s.lines() {
        let line = line.trim();
        if !line.is_empty() {
            return line;
        }
    }
    s
}
733
/// Validate a single pip requirement spec from a `# requires:` comment.
///
/// Accepts `pkg==version`. The package name may contain only ASCII letters,
/// digits, `_`, `-` and `.`, and must start and end with a letter or digit
/// (the PEP 508 name rule). The leading-character rule matters for safety: a
/// "name" such as `--pre` would otherwise pass the character check and reach
/// `pip install` as a command-line option instead of a package. The version
/// must be a plain PEP 440-style literal (letters, digits, `.`, `+`, `!`,
/// `-`).
///
/// The pinning requirement (`==`) can be lifted by setting
/// `NOETHER_ALLOW_UNPINNED_PIP` to `1`, `true`, `yes` or `on` for local
/// development. The character checks always run, so injected shell
/// metacharacters, quotes, or URL-form specs are rejected either way.
///
/// # Errors
///
/// Returns a static description of the first rule the spec violates.
fn validate_pip_spec(spec: &str) -> Result<(), &'static str> {
    let allow_unpinned = matches!(
        std::env::var("NOETHER_ALLOW_UNPINNED_PIP").as_deref(),
        Ok("1" | "true" | "yes" | "on")
    );

    // Split at the first `==`. If absent, require the opt-in flag.
    let (name, version) = match spec.split_once("==") {
        Some((n, v)) => (n.trim(), Some(v.trim())),
        None => {
            if !allow_unpinned {
                return Err("unpinned; use pkg==version or set NOETHER_ALLOW_UNPINNED_PIP=1");
            }
            (spec.trim(), None)
        }
    };

    if name.is_empty() {
        return Err("empty package name");
    }
    if !name
        .bytes()
        .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'_' | b'-' | b'.'))
    {
        return Err("package name contains disallowed characters");
    }
    // All bytes are ASCII at this point, so checking the first and last byte
    // is enough to enforce the start/end rule.
    let edges_ok = name
        .bytes()
        .next()
        .zip(name.bytes().last())
        .is_some_and(|(first, last)| first.is_ascii_alphanumeric() && last.is_ascii_alphanumeric());
    if !edges_ok {
        return Err("package name must start and end with a letter or digit");
    }
    if let Some(v) = version {
        if v.is_empty() {
            return Err("empty version after `==`");
        }
        if !v
            .bytes()
            .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'.' | b'+' | b'!' | b'-'))
        {
            return Err("version contains disallowed characters");
        }
    }
    Ok(())
}
781
782// ── StageExecutor impl ────────────────────────────────────────────────────────
783
784impl StageExecutor for NixExecutor {
785    fn execute(&self, stage_id: &StageId, input: &Value) -> Result<Value, ExecutionError> {
786        let impl_ = self
787            .implementations
788            .get(&stage_id.0)
789            .ok_or_else(|| ExecutionError::StageNotFound(stage_id.clone()))?;
790
791        let code_hash = Self::code_hash(&impl_.code);
792        let script = self.ensure_script(&code_hash, &impl_.code, &impl_.language)?;
793        self.run_script(stage_id, &script, &impl_.language, input)
794    }
795}
796
797#[cfg(test)]
798mod tests {
799    use super::*;
800
801    #[allow(dead_code)] // only used by the ignored integration tests
802    fn make_executor() -> NixExecutor {
803        let nix_bin = NixExecutor::find_nix().unwrap_or_else(|| PathBuf::from("/usr/bin/nix"));
804        let cache_dir = std::env::temp_dir().join("noether-test-impl-cache");
805        let _ = std::fs::create_dir_all(&cache_dir);
806        NixExecutor {
807            nix_bin,
808            cache_dir,
809            config: NixConfig::default(),
810            implementations: HashMap::new(),
811        }
812    }
813
814    #[test]
815    fn validate_pip_spec_accepts_pinned() {
816        assert!(validate_pip_spec("pandas==2.0.0").is_ok());
817        assert!(validate_pip_spec("scikit-learn==1.5.1").is_ok());
818        assert!(validate_pip_spec("urllib3==2.2.3").is_ok());
819        assert!(validate_pip_spec("pydantic==2.5.0+cu121").is_ok());
820    }
821
    /// A bare package name (no `==version`) must be rejected while
    /// `NOETHER_ALLOW_UNPINNED_PIP` is unset.
    ///
    /// The test removes the variable from the process environment for the
    /// duration of the call and restores any previous value before asserting.
    #[test]
    fn validate_pip_spec_rejects_unpinned_by_default() {
        // Ensure the opt-in flag is not accidentally set in the test env.
        // The current value (if any) is captured first so it can be restored.
        let guard = (std::env::var_os("NOETHER_ALLOW_UNPINNED_PIP"),);
        // SAFETY: relies on no other thread reading or writing the process
        // environment while the variable is removed and restored.
        // NOTE(review): the default test harness runs tests on several threads,
        // and other tests in this module call `validate_pip_spec`, which reads
        // this variable — confirm this is serialised (e.g. `--test-threads=1`).
        unsafe {
            std::env::remove_var("NOETHER_ALLOW_UNPINNED_PIP");
        }
        let result = validate_pip_spec("pandas");
        // Restore prior state before asserting.
        if let (Some(prev),) = guard {
            // SAFETY: same assumption as for the removal above.
            unsafe {
                std::env::set_var("NOETHER_ALLOW_UNPINNED_PIP", prev);
            }
        }
        assert!(result.is_err(), "bare name must be rejected without opt-in");
    }
839
840    #[test]
841    fn validate_pip_spec_rejects_shell_metacharacters() {
842        for bad in [
843            "pandas; rm -rf /",
844            "pandas==$(whoami)",
845            "pandas==1.0.0; echo pwned",
846            "pandas==`id`",
847            "https://evil.example/wheel.whl",
848            "git+https://example.com/repo.git",
849            "pkg with space==1.0",
850            "pkg==1.0 && echo",
851        ] {
852            assert!(validate_pip_spec(bad).is_err(), "should reject {bad:?}");
853        }
854    }
855
856    #[test]
857    fn validate_pip_spec_rejects_empty() {
858        assert!(validate_pip_spec("==1.0").is_err());
859        assert!(validate_pip_spec("pkg==").is_err());
860    }
861
862    #[test]
863    fn detect_python_packages_requests() {
864        let code = "import requests\ndef execute(v):\n    return requests.get(v).json()";
865        let pkgs = NixExecutor::detect_python_packages(code);
866        assert!(
867            pkgs.contains(&"requests"),
868            "expected 'requests' in {pkgs:?}"
869        );
870    }
871
872    #[test]
873    fn detect_python_packages_stdlib_only() {
874        let code = "import urllib.request, json\ndef execute(v):\n    return json.loads(v)";
875        let pkgs = NixExecutor::detect_python_packages(code);
876        assert!(
877            pkgs.is_empty(),
878            "stdlib imports should not trigger packages: {pkgs:?}"
879        );
880    }
881
882    #[test]
883    fn detect_python_packages_multiple() {
884        let code = "import pandas\nimport numpy as np\nfrom bs4 import BeautifulSoup\ndef execute(v): pass";
885        let pkgs = NixExecutor::detect_python_packages(code);
886        assert!(pkgs.contains(&"pandas"));
887        assert!(pkgs.contains(&"numpy"));
888        assert!(pkgs.contains(&"beautifulsoup4"));
889    }
890
891    fn test_executor() -> NixExecutor {
892        NixExecutor {
893            nix_bin: PathBuf::from("/usr/bin/nix"),
894            cache_dir: PathBuf::from("/tmp/noether-test-cache"),
895            config: NixConfig::default(),
896            implementations: HashMap::new(),
897        }
898    }
899
900    #[test]
901    fn build_nix_command_no_packages() {
902        let exec = test_executor();
903        let (sub, args) = exec.build_nix_command("python", Path::new("/tmp/x.py"), "import json");
904        assert_eq!(sub, "run");
905        assert!(args.iter().any(|a| a.contains("python3")));
906        assert!(!args.iter().any(|a| a.contains("shell")));
907    }
908
909    #[test]
910    fn build_nix_command_with_requests() {
911        let exec = test_executor();
912        let (sub, args) =
913            exec.build_nix_command("python", Path::new("/tmp/x.py"), "import requests");
914        assert_eq!(sub, "shell");
915        assert!(args.iter().any(|a| a.contains("python3Packages.requests")));
916        assert!(args.iter().any(|a| a == "--command"));
917        // Must NOT include bare nixpkgs#python3 — it conflicts with python3Packages.*
918        assert!(
919            !args.iter().any(|a| a == "nixpkgs#python3"),
920            "bare python3 conflicts: {args:?}"
921        );
922    }
923
924    #[test]
925    fn python_wrapper_contains_boilerplate() {
926        let wrapped = NixExecutor::wrap_python("def execute(x):\n    return x + 1");
927        assert!(wrapped.contains("sys.stdin.read()"));
928        assert!(wrapped.contains("_json.dumps(_output)"));
929        assert!(wrapped.contains("def execute(x)"));
930    }
931
932    #[test]
933    fn code_hash_is_stable() {
934        let h1 = NixExecutor::code_hash("hello world");
935        let h2 = NixExecutor::code_hash("hello world");
936        let h3 = NixExecutor::code_hash("different");
937        assert_eq!(h1, h2);
938        assert_ne!(h1, h3);
939    }
940
941    #[test]
942    fn classify_error_daemon_not_running() {
943        let msg = NixExecutor::classify_error("error: cannot connect to nix daemon", Some(1));
944        assert!(msg.contains("nix daemon is not running"), "got: {msg}");
945    }
946
947    #[test]
948    fn future_imports_are_hoisted_out_of_user_code() {
949        let user = "from __future__ import annotations\nimport json\n\ndef execute(input):\n    return input\n";
950        let wrapped = NixExecutor::wrap_python(user);
951        // The future import must come BEFORE `import sys, json as _json`.
952        let future_pos = wrapped
953            .find("from __future__ import annotations")
954            .expect("future import should be present in wrapper");
955        let stdlib_pos = wrapped
956            .find("import sys, json as _json")
957            .expect("stdlib imports should be present");
958        assert!(
959            future_pos < stdlib_pos,
960            "future import must precede stdlib imports in wrapped output"
961        );
962    }
963
964    #[test]
965    fn user_code_without_future_imports_is_unchanged() {
966        let user = "import json\n\ndef execute(input):\n    return input\n";
967        let (hoisted, remaining) = NixExecutor::extract_future_imports(user);
968        assert_eq!(hoisted, "");
969        assert_eq!(remaining.trim(), user.trim());
970    }
971
972    #[test]
973    fn nested_future_import_inside_function_is_not_hoisted() {
974        // Indented "from __future__" lines (inside a function) are not
975        // valid Python anyway, but the hoister must not promote them.
976        let user =
977            "def execute(input):\n    from __future__ import annotations\n    return input\n";
978        let (hoisted, _) = NixExecutor::extract_future_imports(user);
979        assert_eq!(hoisted, "");
980    }
981
982    #[test]
983    fn classify_error_user_code_exit1() {
984        let msg = NixExecutor::classify_error("ValueError: invalid input", Some(1));
985        assert!(msg.contains("ValueError"), "got: {msg}");
986        assert!(msg.contains("exit 1"), "got: {msg}");
987    }
988
989    #[test]
990    fn classify_error_disk_full() {
991        let msg = NixExecutor::classify_error("No space left on device", Some(1));
992        assert!(msg.contains("disk space"), "got: {msg}");
993    }
994
995    #[test]
996    fn classify_error_empty_stderr() {
997        let msg = NixExecutor::classify_error("", Some(137));
998        assert!(msg.contains("exit 137"), "got: {msg}");
999    }
1000
1001    #[test]
1002    fn nix_config_defaults() {
1003        let cfg = NixConfig::default();
1004        assert_eq!(cfg.timeout_secs, 30);
1005        assert_eq!(cfg.max_output_bytes, 10 * 1024 * 1024);
1006        assert_eq!(cfg.max_stderr_bytes, 64 * 1024);
1007    }
1008
1009    #[test]
1010    fn first_line_extracts_correctly() {
1011        assert_eq!(first_line("  \nfoo\nbar"), "foo");
1012        assert_eq!(first_line("single"), "single");
1013        assert_eq!(first_line(""), "");
1014    }
1015
1016    /// Integration test — runs when nix is available (skips gracefully if not).
1017    /// Requires a warm Nix binary cache; run with `cargo test -- --ignored` to include.
1018    #[test]
1019    #[ignore = "requires nix + warm binary cache; run manually with `cargo test -- --ignored`"]
1020    fn nix_python_identity_stage() {
1021        let nix_bin = match NixExecutor::find_nix() {
1022            Some(p) => p,
1023            None => {
1024                eprintln!("nix not found, skipping");
1025                return;
1026            }
1027        };
1028
1029        let cache_dir = std::env::temp_dir().join("noether-nix-integ");
1030        let _ = std::fs::create_dir_all(&cache_dir);
1031
1032        let code = "def execute(x):\n    return x";
1033        let executor = NixExecutor {
1034            nix_bin,
1035            cache_dir,
1036            config: NixConfig::default(),
1037            implementations: {
1038                let mut m = HashMap::new();
1039                let id = StageId("test_identity".into());
1040                m.insert(
1041                    id.0.clone(),
1042                    StageImpl {
1043                        code: code.into(),
1044                        language: "python".into(),
1045                    },
1046                );
1047                m
1048            },
1049        };
1050
1051        let id = StageId("test_identity".into());
1052        let result = executor.execute(&id, &serde_json::json!({"hello": "world"}));
1053        assert_eq!(result.unwrap(), serde_json::json!({"hello": "world"}));
1054    }
1055
1056    /// Verify that a stage that hangs returns TimedOut, not a hang.
1057    /// Requires nix + warm binary cache; run with `cargo test -- --ignored`.
1058    #[test]
1059    #[ignore = "requires nix + warm binary cache; run manually with `cargo test -- --ignored`"]
1060    fn nix_timeout_kills_hanging_stage() {
1061        let nix_bin = match NixExecutor::find_nix() {
1062            Some(p) => p,
1063            None => {
1064                eprintln!("nix not found, skipping timeout test");
1065                return;
1066            }
1067        };
1068
1069        let cache_dir = std::env::temp_dir().join("noether-nix-timeout");
1070        let _ = std::fs::create_dir_all(&cache_dir);
1071
1072        let code = "import time\ndef execute(x):\n    time.sleep(9999)\n    return x";
1073        let executor = NixExecutor {
1074            nix_bin,
1075            cache_dir,
1076            config: NixConfig {
1077                timeout_secs: 2,
1078                ..NixConfig::default()
1079            },
1080            implementations: {
1081                let mut m = HashMap::new();
1082                let id = StageId("hanging".into());
1083                m.insert(
1084                    id.0.clone(),
1085                    StageImpl {
1086                        code: code.into(),
1087                        language: "python".into(),
1088                    },
1089                );
1090                m
1091            },
1092        };
1093
1094        let id = StageId("hanging".into());
1095        let result = executor.execute(&id, &serde_json::json!(null));
1096        assert!(
1097            matches!(
1098                result,
1099                Err(ExecutionError::TimedOut {
1100                    timeout_secs: 2,
1101                    ..
1102                })
1103            ),
1104            "expected TimedOut, got: {result:?}"
1105        );
1106    }
1107}