Skip to main content

noether_engine/executor/
nix.rs

1//! Nix-based executor for synthesized stages.
2//!
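//! Runs stage implementations as subprocesses on Nix-provided language runtimes
//! (`nix run nixpkgs#<runtime>`, or `nix shell` when known third-party Python imports
//! are detected), so no ambient language runtime is required. Exception: Python stages
//! that declare `# requires:` pip packages run in a cached virtualenv built with the
//! system `python3`.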
5//!
6//! ## Execution protocol
7//!
8//! - stdin  → JSON-encoded input value followed by a newline
9//! - stdout → JSON-encoded output value followed by a newline
//! - stderr → diagnostics; used as the error message only when the process exits non-zero
11//! - exit 0 → success; exit non-zero → `ExecutionError::StageFailed`
12//!
13//! ## Timeout
14//!
15//! Every execution is bounded by [`NixConfig::timeout_secs`] (default 30 s).
16//! When the child process exceeds the limit it is sent SIGKILL and the call
17//! returns [`ExecutionError::TimedOut`].
18//!
19//! ## Generated wrapper (Python example)
20//!
21//! ```python
22//! import sys, json as _json
23//!
24//! # ---- user code ----
25//! def execute(input_value):
26//!     ...
27//! # -------------------
28//!
29//! if __name__ == '__main__':
30//!     try:
31//!         _output = execute(_json.loads(sys.stdin.read()))
32//!         print(_json.dumps(_output))
33//!     except Exception as e:
34//!         print(str(e), file=sys.stderr)
35//!         sys.exit(1)
36//! ```
37
38use super::{ExecutionError, StageExecutor};
39use noether_core::stage::StageId;
40use serde_json::Value;
41use sha2::{Digest, Sha256};
42use std::collections::HashMap;
43use std::io::Write as IoWrite;
44use std::path::{Path, PathBuf};
45use std::process::{Command, Stdio};
46use std::sync::mpsc;
47use std::time::Duration;
48
49// ── Configuration ────────────────────────────────────────────────────────────
50
/// Tunable limits for the [`NixExecutor`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NixConfig {
    /// Wall-clock timeout for a single stage execution, in seconds.
    /// A stage still running after this long is killed (SIGKILL) and the
    /// call fails with `ExecutionError::TimedOut`.
    /// Default: 30 s.
    pub timeout_secs: u64,
    /// Maximum size of a stage's stdout, in bytes. A stage whose stdout
    /// exceeds this limit fails with `ExecutionError::StageFailed`; its output
    /// is rejected, not truncated.
    /// Default: 10 MiB.
    pub max_output_bytes: usize,
    /// Maximum number of stderr bytes kept for error messages; anything
    /// beyond this is dropped.
    /// Default: 64 KiB.
    pub max_stderr_bytes: usize,
}

impl Default for NixConfig {
    fn default() -> Self {
        Self {
            timeout_secs: 30,
            max_output_bytes: 10 * 1024 * 1024,
            max_stderr_bytes: 64 * 1024,
        }
    }
}
76
77// ── Internal stage storage ───────────────────────────────────────────────────
78
/// Source code and language tag of one synthesized stage.
///
/// Used as the value type of `NixExecutor::implementations`, which is keyed
/// by the stage ID string.
#[derive(Clone)]
struct StageImpl {
    /// The stage's source code, embedded into a language-specific wrapper script.
    code: String,
    /// Language tag (e.g. `"python"`, `"js"`, `"bash"`) that selects the wrapper and runtime.
    language: String,
}
85
86// ── NixExecutor ──────────────────────────────────────────────────────────────
87
88/// Executor that runs synthesized stages through Nix-managed language runtimes.
89///
90/// When `nix` is available, each stage is executed inside a hermetically isolated
91/// subprocess (e.g. `nix run nixpkgs#python3 -- stage.py`).  The Nix binary cache
92/// ensures the runtime is downloaded once and then reused forever from the store.
93///
94/// ## Resource limits
95///
96/// - **Timeout**: configured via [`NixConfig::timeout_secs`] (default 30 s).
97///   The child is sent SIGKILL when the limit is exceeded.
98/// - **Output cap**: configured via [`NixConfig::max_output_bytes`] (default 10 MiB).
99pub struct NixExecutor {
100    nix_bin: PathBuf,
101    cache_dir: PathBuf,
102    config: NixConfig,
103    implementations: HashMap<String, StageImpl>,
104}
105
106impl NixExecutor {
107    /// Probe the system for a usable `nix` binary.
108    /// Returns the path if found, or `None` if Nix is not installed.
109    pub fn find_nix() -> Option<PathBuf> {
110        // Determinate Systems installer puts nix here:
111        let determinate = PathBuf::from("/nix/var/nix/profiles/default/bin/nix");
112        if determinate.exists() {
113            return Some(determinate);
114        }
115        // Fallback: check PATH
116        if let Ok(output) = Command::new("which").arg("nix").output() {
117            let p = std::str::from_utf8(&output.stdout)
118                .unwrap_or("")
119                .trim()
120                .to_string();
121            if !p.is_empty() {
122                return Some(PathBuf::from(p));
123            }
124        }
125        None
126    }
127
128    /// Build an executor that can run synthesized stages found in `store`.
129    ///
130    /// Returns `None` when `nix` is not available — callers should fall back to
131    /// `InlineExecutor` exclusively in that case.
132    pub fn from_store(store: &dyn noether_store::StageStore) -> Option<Self> {
133        Self::from_store_with_config(store, NixConfig::default())
134    }
135
136    /// Like [`from_store`] but with a custom [`NixConfig`].
137    pub fn from_store_with_config(
138        store: &dyn noether_store::StageStore,
139        config: NixConfig,
140    ) -> Option<Self> {
141        let nix_bin = Self::find_nix()?;
142
143        let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".into());
144        let cache_dir = PathBuf::from(home).join(".noether").join("impl_cache");
145        let _ = std::fs::create_dir_all(&cache_dir);
146
147        let mut implementations = HashMap::new();
148        for stage in store.list(None) {
149            if let (Some(code), Some(lang)) =
150                (&stage.implementation_code, &stage.implementation_language)
151            {
152                implementations.insert(
153                    stage.id.0.clone(),
154                    StageImpl {
155                        code: code.clone(),
156                        language: lang.clone(),
157                    },
158                );
159            }
160        }
161
162        Some(Self {
163            nix_bin,
164            cache_dir,
165            config,
166            implementations,
167        })
168    }
169
170    /// Register an in-memory synthesized stage without re-querying the store.
171    pub fn register(&mut self, stage_id: &StageId, code: &str, language: &str) {
172        self.implementations.insert(
173            stage_id.0.clone(),
174            StageImpl {
175                code: code.into(),
176                language: language.into(),
177            },
178        );
179    }
180
181    /// True when we have a real implementation for this stage.
182    pub fn has_implementation(&self, stage_id: &StageId) -> bool {
183        self.implementations.contains_key(&stage_id.0)
184    }
185
186    /// Pre-fetch the Python 3 runtime into the Nix store in a background thread.
187    ///
188    /// The first time any Python stage runs, Nix may take several seconds to
189    /// download and verify the runtime closure.  Calling `warmup()` at startup
190    /// overlaps that latency with application boot time.
191    ///
192    /// The returned `JoinHandle` can be ignored — any error is logged to stderr
193    /// but does not affect correctness; the runtime will still be fetched on first
194    /// actual use.
195    pub fn warmup(&self) -> std::thread::JoinHandle<()> {
196        let nix_bin = self.nix_bin.clone();
197        std::thread::spawn(move || {
198            // `nix build` with `--dry-run` is enough to populate the binary cache
199            // without running any user code.
200            let status = Command::new(&nix_bin)
201                .args([
202                    "build",
203                    "--no-link",
204                    "--quiet",
205                    "--no-write-lock-file",
206                    "nixpkgs#python3",
207                ])
208                .stdout(Stdio::null())
209                .stderr(Stdio::null())
210                .status();
211            match status {
212                Ok(s) if s.success() => {
213                    eprintln!("[noether] nix warmup: python3 runtime cached");
214                }
215                Ok(s) => {
216                    eprintln!("[noether] nix warmup: exited with {s} (non-fatal)");
217                }
218                Err(e) => {
219                    eprintln!("[noether] nix warmup: failed to spawn ({e}) (non-fatal)");
220                }
221            }
222        })
223    }
224
225    // ── Internal helpers ─────────────────────────────────────────────────────
226
227    /// Hash the code string to get a stable cache key.
228    fn code_hash(code: &str) -> String {
229        hex::encode(Sha256::digest(code.as_bytes()))
230    }
231
232    /// Ensure the wrapped script for `impl_hash` exists on disk.
233    /// Returns the path to the file.
234    fn ensure_script(
235        &self,
236        impl_hash: &str,
237        code: &str,
238        language: &str,
239    ) -> Result<PathBuf, ExecutionError> {
240        let ext = match language {
241            "javascript" | "js" => "js",
242            "bash" | "sh" => "sh",
243            _ => "py",
244        };
245
246        let path = self.cache_dir.join(format!("{impl_hash}.{ext}"));
247        if path.exists() {
248            return Ok(path);
249        }
250
251        let wrapped = match language {
252            "javascript" | "js" => Self::wrap_javascript(code),
253            "bash" | "sh" => Self::wrap_bash(code),
254            _ => Self::wrap_python(code),
255        };
256
257        std::fs::write(&path, &wrapped).map_err(|e| ExecutionError::StageFailed {
258            stage_id: StageId(impl_hash.into()),
259            message: format!("failed to write stage script: {e}"),
260        })?;
261
262        Ok(path)
263    }
264
265    /// Run the stage script via Nix with JSON on stdin, enforcing timeout and
266    /// output-size limits.
267    fn run_script(
268        &self,
269        stage_id: &StageId,
270        script: &Path,
271        language: &str,
272        input: &Value,
273    ) -> Result<Value, ExecutionError> {
274        let input_json = serde_json::to_string(input).unwrap_or_default();
275
276        let code = self
277            .implementations
278            .get(&stage_id.0)
279            .map(|i| i.code.as_str())
280            .unwrap_or("");
281
282        let (nix_subcommand, args) = self.build_nix_command(language, script, code);
283
284        // __direct__ means run the binary directly (venv Python), not via nix
285        let mut child = if nix_subcommand == "__direct__" {
286            Command::new(&args[0])
287                .args(&args[1..])
288                .stdin(Stdio::piped())
289                .stdout(Stdio::piped())
290                .stderr(Stdio::piped())
291                .spawn()
292        } else {
293            Command::new(&self.nix_bin)
294                .arg(&nix_subcommand)
295                .args(["--no-write-lock-file", "--quiet"])
296                .args(&args)
297                .stdin(Stdio::piped())
298                .stdout(Stdio::piped())
299                .stderr(Stdio::piped())
300                .spawn()
301        }
302        .map_err(|e| ExecutionError::StageFailed {
303            stage_id: stage_id.clone(),
304            message: format!("failed to spawn process: {e}"),
305        })?;
306
307        // Write stdin in a background thread so we don't deadlock when the
308        // child's stdin pipe fills before we start reading stdout.
309        if let Some(mut stdin) = child.stdin.take() {
310            let bytes = input_json.into_bytes();
311            std::thread::spawn(move || {
312                let _ = stdin.write_all(&bytes);
313            });
314        }
315
316        // Collect output with a wall-clock timeout.
317        let pid = child.id();
318        let timeout = Duration::from_secs(self.config.timeout_secs);
319        let (tx, rx) = mpsc::channel();
320        std::thread::spawn(move || {
321            let _ = tx.send(child.wait_with_output());
322        });
323
324        let out = match rx.recv_timeout(timeout) {
325            Ok(Ok(o)) => o,
326            Ok(Err(e)) => {
327                return Err(ExecutionError::StageFailed {
328                    stage_id: stage_id.clone(),
329                    message: format!("nix process error: {e}"),
330                });
331            }
332            Err(_elapsed) => {
333                // Best-effort kill — process may already have exited.
334                let _ = Command::new("kill").args(["-9", &pid.to_string()]).status();
335                return Err(ExecutionError::TimedOut {
336                    stage_id: stage_id.clone(),
337                    timeout_secs: self.config.timeout_secs,
338                });
339            }
340        };
341
342        // Truncate stderr to avoid huge allocations from noisy runtimes.
343        let stderr_raw = &out.stderr[..out.stderr.len().min(self.config.max_stderr_bytes)];
344        let stderr = String::from_utf8_lossy(stderr_raw);
345
346        if !out.status.success() {
347            return Err(ExecutionError::StageFailed {
348                stage_id: stage_id.clone(),
349                message: Self::classify_error(&stderr, out.status.code()),
350            });
351        }
352
353        // Truncate stdout to the configured limit.
354        let stdout_raw = &out.stdout[..out.stdout.len().min(self.config.max_output_bytes)];
355        let stdout = String::from_utf8_lossy(stdout_raw);
356
357        if stdout_raw.len() == self.config.max_output_bytes && !out.stdout.is_empty() {
358            return Err(ExecutionError::StageFailed {
359                stage_id: stage_id.clone(),
360                message: format!(
361                    "stage output exceeded {} bytes limit",
362                    self.config.max_output_bytes
363                ),
364            });
365        }
366
367        serde_json::from_str(stdout.trim()).map_err(|e| ExecutionError::StageFailed {
368            stage_id: stage_id.clone(),
369            message: format!("failed to parse stage output as JSON: {e} (got: {stdout:?})"),
370        })
371    }
372
373    /// Classify a non-zero exit into a human-readable message, distinguishing
374    /// Nix infrastructure errors from user code errors.
375    fn classify_error(stderr: &str, exit_code: Option<i32>) -> String {
376        // Nix daemon / networking errors.
377        if stderr.contains("cannot connect to nix daemon")
378            || stderr.contains("Cannot connect to the Nix daemon")
379        {
380            return "nix daemon is not running — start it with `sudo systemctl start nix-daemon` \
381                    or `nix daemon`"
382                .to_string();
383        }
384        if stderr.contains("error: flake") || stderr.contains("error: getting flake") {
385            return format!(
386                "nix flake error (check network / nixpkgs access): {}",
387                first_line(stderr)
388            );
389        }
390        if stderr.contains("error: downloading") || stderr.contains("error: fetching") {
391            return format!(
392                "nix failed to fetch runtime package (check network): {}",
393                first_line(stderr)
394            );
395        }
396        if stderr.contains("out of disk space") || stderr.contains("No space left on device") {
397            return "nix store out of disk space — run `nix-collect-garbage -d` to free space"
398                .to_string();
399        }
400        if stderr.contains("nix: command not found") || stderr.contains("No such file") {
401            return "nix binary not found — is Nix installed?".to_string();
402        }
403        // User code errors (exit 1 from the stage wrapper).
404        let code_str = exit_code
405            .map(|c| format!(" (exit {c})"))
406            .unwrap_or_default();
407        if stderr.trim().is_empty() {
408            format!("stage exited without output{code_str}")
409        } else {
410            format!("stage error{code_str}: {stderr}")
411        }
412    }
413
414    /// Build the nix subcommand + argument list for running a stage script.
415    ///
416    /// - Python with no third-party imports: `nix run nixpkgs#python3 -- script.py`
417    /// - Python with third-party imports:    `nix shell nixpkgs#python3Packages.X ... --command python3 script.py`
418    /// - JS/Bash: `nix run nixpkgs#<runtime> -- script`
419    fn build_nix_command(
420        &self,
421        language: &str,
422        script: &Path,
423        code: &str,
424    ) -> (String, Vec<String>) {
425        let script_path = script.to_str().unwrap_or("/dev/null").to_string();
426
427        match language {
428            "python" | "python3" | "" => {
429                // If the code has `# requires:` with pip packages, use a venv
430                // with system Python instead of Nix (Nix's python3Packages
431                // don't reliably work with `nix shell`).
432                if let Some(reqs) = Self::extract_pip_requirements(code) {
433                    let venv_hash = {
434                        use sha2::{Digest, Sha256};
435                        let h = Sha256::digest(reqs.as_bytes());
436                        hex::encode(&h[..8])
437                    };
438                    let venv_dir = self.cache_dir.join(format!("venv-{venv_hash}"));
439                    let venv_str = venv_dir.to_string_lossy().to_string();
440                    let python = venv_dir.join("bin").join("python3");
441                    let python_str = python.to_string_lossy().to_string();
442
443                    // Create venv + install deps if not cached
444                    if !python.exists() {
445                        let setup = std::process::Command::new("python3")
446                            .args(["-m", "venv", &venv_str])
447                            .output();
448                        if let Ok(out) = setup {
449                            if out.status.success() {
450                                let pip = venv_dir.join("bin").join("pip");
451                                let pkgs: Vec<&str> = reqs.split(", ").collect();
452                                let mut pip_args =
453                                    vec!["install", "--quiet", "--disable-pip-version-check"];
454                                pip_args.extend(pkgs);
455                                let _ = std::process::Command::new(pip.to_string_lossy().as_ref())
456                                    .args(&pip_args)
457                                    .output();
458                            }
459                        }
460                    }
461
462                    // Run with the venv Python directly (no nix)
463                    return ("__direct__".to_string(), vec![python_str, script_path]);
464                }
465
466                let extra_pkgs = Self::detect_python_packages(code);
467                if extra_pkgs.is_empty() {
468                    (
469                        "run".to_string(),
470                        vec!["nixpkgs#python3".into(), "--".into(), script_path],
471                    )
472                } else {
473                    let mut args: Vec<String> = extra_pkgs
474                        .iter()
475                        .map(|pkg| format!("nixpkgs#python3Packages.{pkg}"))
476                        .collect();
477                    args.extend_from_slice(&["--command".into(), "python3".into(), script_path]);
478                    ("shell".to_string(), args)
479                }
480            }
481            "javascript" | "js" => (
482                "run".to_string(),
483                vec!["nixpkgs#nodejs".into(), "--".into(), script_path],
484            ),
485            _ => (
486                "run".to_string(),
487                vec!["nixpkgs#bash".into(), "--".into(), script_path],
488            ),
489        }
490    }
491
492    /// Extract pip requirements from `# requires: pkg1==ver, pkg2==ver` comments.
493    fn extract_pip_requirements(code: &str) -> Option<String> {
494        for line in code.lines() {
495            let trimmed = line.trim();
496            if trimmed.starts_with("# requires:") {
497                let reqs = trimmed.strip_prefix("# requires:").unwrap().trim();
498                if !reqs.is_empty() {
499                    return Some(reqs.to_string());
500                }
501            }
502        }
503        None
504    }
505
506    /// Scan Python source for `import X` / `from X import` statements and return
507    /// the Nix package names for any recognised third-party libraries.
508    fn detect_python_packages(code: &str) -> Vec<&'static str> {
509        // Map of Python import name → nixpkgs python3Packages attribute name.
510        const KNOWN: &[(&str, &str)] = &[
511            ("requests", "requests"),
512            ("httpx", "httpx"),
513            ("aiohttp", "aiohttp"),
514            ("bs4", "beautifulsoup4"),
515            ("lxml", "lxml"),
516            ("pandas", "pandas"),
517            ("numpy", "numpy"),
518            ("scipy", "scipy"),
519            ("sklearn", "scikit-learn"),
520            ("PIL", "Pillow"),
521            ("cv2", "opencv4"),
522            ("yaml", "pyyaml"),
523            ("toml", "toml"),
524            ("dateutil", "python-dateutil"),
525            ("pytz", "pytz"),
526            ("boto3", "boto3"),
527            ("psycopg2", "psycopg2"),
528            ("pymongo", "pymongo"),
529            ("redis", "redis"),
530            ("celery", "celery"),
531            ("fastapi", "fastapi"),
532            ("pydantic", "pydantic"),
533            ("cryptography", "cryptography"),
534            ("jwt", "pyjwt"),
535            ("paramiko", "paramiko"),
536            ("dotenv", "python-dotenv"),
537            ("joblib", "joblib"),
538            ("torch", "pytorch"),
539            ("transformers", "transformers"),
540            ("datasets", "datasets"),
541            ("pyarrow", "pyarrow"),
542        ];
543
544        let mut found: Vec<&'static str> = Vec::new();
545        for (import_name, nix_name) in KNOWN {
546            let patterns = [
547                format!("import {import_name}"),
548                format!("import {import_name} "),
549                format!("from {import_name} "),
550                format!("from {import_name}."),
551            ];
552            if patterns.iter().any(|p| code.contains(p.as_str())) {
553                found.push(nix_name);
554            }
555        }
556        found
557    }
558
559    // ── Language wrappers ────────────────────────────────────────────────────
560
561    #[cfg(test)]
562    #[allow(dead_code)]
563    fn _expose_extract_future_imports(code: &str) -> (String, String) {
564        Self::extract_future_imports(code)
565    }
566
567    /// Pull every `from __future__ import ...` line out of `code` and return
568    /// `(joined_future_imports, code_without_them)`. The future imports are
569    /// returned with trailing newlines so the caller can embed them directly
570    /// at the top of the wrapper. Detection is line-based (no AST) — matches
571    /// any non-indented line starting with `from __future__ import`.
572    fn extract_future_imports(code: &str) -> (String, String) {
573        let mut hoisted = String::new();
574        let mut remaining = String::new();
575        for line in code.lines() {
576            let trimmed = line.trim_start();
577            if !line.starts_with(' ')
578                && !line.starts_with('\t')
579                && trimmed.starts_with("from __future__ import")
580            {
581                hoisted.push_str(line);
582                hoisted.push('\n');
583            } else {
584                remaining.push_str(line);
585                remaining.push('\n');
586            }
587        }
588        (hoisted, remaining)
589    }
590
591    fn wrap_python(user_code: &str) -> String {
592        // Skip pip install — dependencies are handled by the venv executor
593        // (build_nix_command creates a venv with pip packages pre-installed)
594        // or by Nix packages (for known imports like numpy, pandas, etc.).
595        let pip_install = String::new();
596
597        // Hoist any `from __future__ import ...` lines out of user code and
598        // emit them as the very first statements of the wrapper. Python
599        // requires `__future__` imports to be the first non-comment,
600        // non-docstring statement in a module — leaving them embedded in the
601        // user-code block (which is line ~17 of the wrapped file) raises
602        // `SyntaxError: from __future__ imports must occur at the
603        // beginning of the file`.
604        let (future_imports, user_code_clean) = Self::extract_future_imports(user_code);
605
606        format!(
607            r#"{future_imports}import sys, json as _json
608{pip_install}
609# ---- user implementation ----
610{user_code_clean}
611# ---- end implementation ----
612
613if __name__ == '__main__':
614    if 'execute' not in dir() or not callable(globals().get('execute')):
615        print(
616            "Noether stage error: implementation must define a top-level "
617            "function `def execute(input): ...` that takes the parsed input dict "
618            "and returns the output dict. Do not read from stdin or print to stdout — "
619            "the Noether runtime handles I/O for you.",
620            file=sys.stderr,
621        )
622        sys.exit(1)
623    try:
624        _raw = _json.loads(sys.stdin.read())
625        # If the runtime passed input as a JSON-encoded string, decode it once more.
626        # This happens when input arrives as null or a bare string from the CLI.
627        if isinstance(_raw, str):
628            try:
629                _raw = _json.loads(_raw)
630            except Exception:
631                pass
632        _output = execute(_raw if _raw is not None else {{}})
633        print(_json.dumps(_output))
634    except Exception as _e:
635        print(str(_e), file=sys.stderr)
636        sys.exit(1)
637"#
638        )
639    }
640
641    fn wrap_javascript(user_code: &str) -> String {
642        format!(
643            r#"const _readline = require('readline');
644let _input = '';
645process.stdin.on('data', d => _input += d);
646process.stdin.on('end', () => {{
647    try {{
648        // ---- user implementation ----
649        {user_code}
650        // ---- end implementation ----
651        const _result = execute(JSON.parse(_input));
652        process.stdout.write(JSON.stringify(_result) + '\n');
653    }} catch (e) {{
654        process.stderr.write(String(e) + '\n');
655        process.exit(1);
656    }}
657}});
658"#
659        )
660    }
661
662    fn wrap_bash(user_code: &str) -> String {
663        format!(
664            r#"#!/usr/bin/env bash
665set -euo pipefail
666INPUT=$(cat)
667
668# ---- user implementation ----
669{user_code}
670# ---- end implementation ----
671
672execute "$INPUT"
673"#
674        )
675    }
676}
677
678// ── Helpers ──────────────────────────────────────────────────────────────────
679
/// Return the first line of `s` that is not blank, with surrounding whitespace
/// trimmed. If every line is blank (or `s` is empty), `s` itself is returned
/// unchanged.
fn first_line(s: &str) -> &str {
    for line in s.lines() {
        let trimmed = line.trim();
        if !trimmed.is_empty() {
            return trimmed;
        }
    }
    s
}
687
688// ── StageExecutor impl ────────────────────────────────────────────────────────
689
690impl StageExecutor for NixExecutor {
691    fn execute(&self, stage_id: &StageId, input: &Value) -> Result<Value, ExecutionError> {
692        let impl_ = self
693            .implementations
694            .get(&stage_id.0)
695            .ok_or_else(|| ExecutionError::StageNotFound(stage_id.clone()))?;
696
697        let code_hash = Self::code_hash(&impl_.code);
698        let script = self.ensure_script(&code_hash, &impl_.code, &impl_.language)?;
699        self.run_script(stage_id, &script, &impl_.language, input)
700    }
701}
702
703#[cfg(test)]
704mod tests {
705    use super::*;
706
707    #[allow(dead_code)] // only used by the ignored integration tests
708    fn make_executor() -> NixExecutor {
709        let nix_bin = NixExecutor::find_nix().unwrap_or_else(|| PathBuf::from("/usr/bin/nix"));
710        let cache_dir = std::env::temp_dir().join("noether-test-impl-cache");
711        let _ = std::fs::create_dir_all(&cache_dir);
712        NixExecutor {
713            nix_bin,
714            cache_dir,
715            config: NixConfig::default(),
716            implementations: HashMap::new(),
717        }
718    }
719
720    #[test]
721    fn detect_python_packages_requests() {
722        let code = "import requests\ndef execute(v):\n    return requests.get(v).json()";
723        let pkgs = NixExecutor::detect_python_packages(code);
724        assert!(
725            pkgs.contains(&"requests"),
726            "expected 'requests' in {pkgs:?}"
727        );
728    }
729
730    #[test]
731    fn detect_python_packages_stdlib_only() {
732        let code = "import urllib.request, json\ndef execute(v):\n    return json.loads(v)";
733        let pkgs = NixExecutor::detect_python_packages(code);
734        assert!(
735            pkgs.is_empty(),
736            "stdlib imports should not trigger packages: {pkgs:?}"
737        );
738    }
739
740    #[test]
741    fn detect_python_packages_multiple() {
742        let code = "import pandas\nimport numpy as np\nfrom bs4 import BeautifulSoup\ndef execute(v): pass";
743        let pkgs = NixExecutor::detect_python_packages(code);
744        assert!(pkgs.contains(&"pandas"));
745        assert!(pkgs.contains(&"numpy"));
746        assert!(pkgs.contains(&"beautifulsoup4"));
747    }
748
749    fn test_executor() -> NixExecutor {
750        NixExecutor {
751            nix_bin: PathBuf::from("/usr/bin/nix"),
752            cache_dir: PathBuf::from("/tmp/noether-test-cache"),
753            config: NixConfig::default(),
754            implementations: HashMap::new(),
755        }
756    }
757
758    #[test]
759    fn build_nix_command_no_packages() {
760        let exec = test_executor();
761        let (sub, args) = exec.build_nix_command("python", Path::new("/tmp/x.py"), "import json");
762        assert_eq!(sub, "run");
763        assert!(args.iter().any(|a| a.contains("python3")));
764        assert!(!args.iter().any(|a| a.contains("shell")));
765    }
766
767    #[test]
768    fn build_nix_command_with_requests() {
769        let exec = test_executor();
770        let (sub, args) =
771            exec.build_nix_command("python", Path::new("/tmp/x.py"), "import requests");
772        assert_eq!(sub, "shell");
773        assert!(args.iter().any(|a| a.contains("python3Packages.requests")));
774        assert!(args.iter().any(|a| a == "--command"));
775        // Must NOT include bare nixpkgs#python3 — it conflicts with python3Packages.*
776        assert!(
777            !args.iter().any(|a| a == "nixpkgs#python3"),
778            "bare python3 conflicts: {args:?}"
779        );
780    }
781
782    #[test]
783    fn python_wrapper_contains_boilerplate() {
784        let wrapped = NixExecutor::wrap_python("def execute(x):\n    return x + 1");
785        assert!(wrapped.contains("sys.stdin.read()"));
786        assert!(wrapped.contains("_json.dumps(_output)"));
787        assert!(wrapped.contains("def execute(x)"));
788    }
789
790    #[test]
791    fn code_hash_is_stable() {
792        let h1 = NixExecutor::code_hash("hello world");
793        let h2 = NixExecutor::code_hash("hello world");
794        let h3 = NixExecutor::code_hash("different");
795        assert_eq!(h1, h2);
796        assert_ne!(h1, h3);
797    }
798
799    #[test]
800    fn classify_error_daemon_not_running() {
801        let msg = NixExecutor::classify_error("error: cannot connect to nix daemon", Some(1));
802        assert!(msg.contains("nix daemon is not running"), "got: {msg}");
803    }
804
805    #[test]
806    fn future_imports_are_hoisted_out_of_user_code() {
807        let user = "from __future__ import annotations\nimport json\n\ndef execute(input):\n    return input\n";
808        let wrapped = NixExecutor::wrap_python(user);
809        // The future import must come BEFORE `import sys, json as _json`.
810        let future_pos = wrapped
811            .find("from __future__ import annotations")
812            .expect("future import should be present in wrapper");
813        let stdlib_pos = wrapped
814            .find("import sys, json as _json")
815            .expect("stdlib imports should be present");
816        assert!(
817            future_pos < stdlib_pos,
818            "future import must precede stdlib imports in wrapped output"
819        );
820    }
821
822    #[test]
823    fn user_code_without_future_imports_is_unchanged() {
824        let user = "import json\n\ndef execute(input):\n    return input\n";
825        let (hoisted, remaining) = NixExecutor::extract_future_imports(user);
826        assert_eq!(hoisted, "");
827        assert_eq!(remaining.trim(), user.trim());
828    }
829
830    #[test]
831    fn nested_future_import_inside_function_is_not_hoisted() {
832        // Indented "from __future__" lines (inside a function) are not
833        // valid Python anyway, but the hoister must not promote them.
834        let user =
835            "def execute(input):\n    from __future__ import annotations\n    return input\n";
836        let (hoisted, _) = NixExecutor::extract_future_imports(user);
837        assert_eq!(hoisted, "");
838    }
839
840    #[test]
841    fn classify_error_user_code_exit1() {
842        let msg = NixExecutor::classify_error("ValueError: invalid input", Some(1));
843        assert!(msg.contains("ValueError"), "got: {msg}");
844        assert!(msg.contains("exit 1"), "got: {msg}");
845    }
846
847    #[test]
848    fn classify_error_disk_full() {
849        let msg = NixExecutor::classify_error("No space left on device", Some(1));
850        assert!(msg.contains("disk space"), "got: {msg}");
851    }
852
853    #[test]
854    fn classify_error_empty_stderr() {
855        let msg = NixExecutor::classify_error("", Some(137));
856        assert!(msg.contains("exit 137"), "got: {msg}");
857    }
858
859    #[test]
860    fn nix_config_defaults() {
861        let cfg = NixConfig::default();
862        assert_eq!(cfg.timeout_secs, 30);
863        assert_eq!(cfg.max_output_bytes, 10 * 1024 * 1024);
864        assert_eq!(cfg.max_stderr_bytes, 64 * 1024);
865    }
866
867    #[test]
868    fn first_line_extracts_correctly() {
869        assert_eq!(first_line("  \nfoo\nbar"), "foo");
870        assert_eq!(first_line("single"), "single");
871        assert_eq!(first_line(""), "");
872    }
873
874    /// Integration test — runs when nix is available (skips gracefully if not).
875    /// Requires a warm Nix binary cache; run with `cargo test -- --ignored` to include.
876    #[test]
877    #[ignore = "requires nix + warm binary cache; run manually with `cargo test -- --ignored`"]
878    fn nix_python_identity_stage() {
879        let nix_bin = match NixExecutor::find_nix() {
880            Some(p) => p,
881            None => {
882                eprintln!("nix not found, skipping");
883                return;
884            }
885        };
886
887        let cache_dir = std::env::temp_dir().join("noether-nix-integ");
888        let _ = std::fs::create_dir_all(&cache_dir);
889
890        let code = "def execute(x):\n    return x";
891        let executor = NixExecutor {
892            nix_bin,
893            cache_dir,
894            config: NixConfig::default(),
895            implementations: {
896                let mut m = HashMap::new();
897                let id = StageId("test_identity".into());
898                m.insert(
899                    id.0.clone(),
900                    StageImpl {
901                        code: code.into(),
902                        language: "python".into(),
903                    },
904                );
905                m
906            },
907        };
908
909        let id = StageId("test_identity".into());
910        let result = executor.execute(&id, &serde_json::json!({"hello": "world"}));
911        assert_eq!(result.unwrap(), serde_json::json!({"hello": "world"}));
912    }
913
914    /// Verify that a stage that hangs returns TimedOut, not a hang.
915    /// Requires nix + warm binary cache; run with `cargo test -- --ignored`.
916    #[test]
917    #[ignore = "requires nix + warm binary cache; run manually with `cargo test -- --ignored`"]
918    fn nix_timeout_kills_hanging_stage() {
919        let nix_bin = match NixExecutor::find_nix() {
920            Some(p) => p,
921            None => {
922                eprintln!("nix not found, skipping timeout test");
923                return;
924            }
925        };
926
927        let cache_dir = std::env::temp_dir().join("noether-nix-timeout");
928        let _ = std::fs::create_dir_all(&cache_dir);
929
930        let code = "import time\ndef execute(x):\n    time.sleep(9999)\n    return x";
931        let executor = NixExecutor {
932            nix_bin,
933            cache_dir,
934            config: NixConfig {
935                timeout_secs: 2,
936                ..NixConfig::default()
937            },
938            implementations: {
939                let mut m = HashMap::new();
940                let id = StageId("hanging".into());
941                m.insert(
942                    id.0.clone(),
943                    StageImpl {
944                        code: code.into(),
945                        language: "python".into(),
946                    },
947                );
948                m
949            },
950        };
951
952        let id = StageId("hanging".into());
953        let result = executor.execute(&id, &serde_json::json!(null));
954        assert!(
955            matches!(
956                result,
957                Err(ExecutionError::TimedOut {
958                    timeout_secs: 2,
959                    ..
960                })
961            ),
962            "expected TimedOut, got: {result:?}"
963        );
964    }
965}