1use super::{ExecutionError, StageExecutor};
39use noether_core::stage::StageId;
40use serde_json::Value;
41use sha2::{Digest, Sha256};
42use std::collections::HashMap;
43use std::io::Write as IoWrite;
44use std::path::{Path, PathBuf};
45use std::process::{Command, Stdio};
46use std::sync::mpsc;
47use std::time::Duration;
48
49#[derive(Debug, Clone)]
53pub struct NixConfig {
54 pub timeout_secs: u64,
58 pub max_output_bytes: usize,
62 pub max_stderr_bytes: usize,
65}
66
67impl Default for NixConfig {
68 fn default() -> Self {
69 Self {
70 timeout_secs: 30,
71 max_output_bytes: 10 * 1024 * 1024,
72 max_stderr_bytes: 64 * 1024,
73 }
74 }
75}
76
77#[derive(Clone)]
81struct StageImpl {
82 code: String,
83 language: String,
84}
85
86pub struct NixExecutor {
100 nix_bin: PathBuf,
101 cache_dir: PathBuf,
102 config: NixConfig,
103 implementations: HashMap<String, StageImpl>,
104}
105
106impl NixExecutor {
107 pub fn find_nix() -> Option<PathBuf> {
110 let determinate = PathBuf::from("/nix/var/nix/profiles/default/bin/nix");
112 if determinate.exists() {
113 return Some(determinate);
114 }
115 if let Ok(output) = Command::new("which").arg("nix").output() {
117 let p = std::str::from_utf8(&output.stdout)
118 .unwrap_or("")
119 .trim()
120 .to_string();
121 if !p.is_empty() {
122 return Some(PathBuf::from(p));
123 }
124 }
125 None
126 }
127
128 pub fn from_store(store: &dyn noether_store::StageStore) -> Option<Self> {
133 Self::from_store_with_config(store, NixConfig::default())
134 }
135
136 pub fn from_store_with_config(
138 store: &dyn noether_store::StageStore,
139 config: NixConfig,
140 ) -> Option<Self> {
141 let nix_bin = Self::find_nix()?;
142
143 let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".into());
144 let cache_dir = PathBuf::from(home).join(".noether").join("impl_cache");
145 let _ = std::fs::create_dir_all(&cache_dir);
146
147 let mut implementations = HashMap::new();
148 for stage in store.list(None) {
149 if let (Some(code), Some(lang)) =
150 (&stage.implementation_code, &stage.implementation_language)
151 {
152 implementations.insert(
153 stage.id.0.clone(),
154 StageImpl {
155 code: code.clone(),
156 language: lang.clone(),
157 },
158 );
159 }
160 }
161
162 Some(Self {
163 nix_bin,
164 cache_dir,
165 config,
166 implementations,
167 })
168 }
169
170 pub fn register(&mut self, stage_id: &StageId, code: &str, language: &str) {
172 self.implementations.insert(
173 stage_id.0.clone(),
174 StageImpl {
175 code: code.into(),
176 language: language.into(),
177 },
178 );
179 }
180
181 pub fn has_implementation(&self, stage_id: &StageId) -> bool {
183 self.implementations.contains_key(&stage_id.0)
184 }
185
186 pub fn warmup(&self) -> std::thread::JoinHandle<()> {
196 let nix_bin = self.nix_bin.clone();
197 std::thread::spawn(move || {
198 let status = Command::new(&nix_bin)
201 .args([
202 "build",
203 "--no-link",
204 "--quiet",
205 "--no-write-lock-file",
206 "nixpkgs#python3",
207 ])
208 .stdout(Stdio::null())
209 .stderr(Stdio::null())
210 .status();
211 match status {
212 Ok(s) if s.success() => {
213 eprintln!("[noether] nix warmup: python3 runtime cached");
214 }
215 Ok(s) => {
216 eprintln!("[noether] nix warmup: exited with {s} (non-fatal)");
217 }
218 Err(e) => {
219 eprintln!("[noether] nix warmup: failed to spawn ({e}) (non-fatal)");
220 }
221 }
222 })
223 }
224
225 fn code_hash(code: &str) -> String {
229 hex::encode(Sha256::digest(code.as_bytes()))
230 }
231
232 fn ensure_script(
235 &self,
236 impl_hash: &str,
237 code: &str,
238 language: &str,
239 ) -> Result<PathBuf, ExecutionError> {
240 let ext = match language {
241 "javascript" | "js" => "js",
242 "bash" | "sh" => "sh",
243 _ => "py",
244 };
245
246 let path = self.cache_dir.join(format!("{impl_hash}.{ext}"));
247 if path.exists() {
248 return Ok(path);
249 }
250
251 let wrapped = match language {
252 "javascript" | "js" => Self::wrap_javascript(code),
253 "bash" | "sh" => Self::wrap_bash(code),
254 _ => Self::wrap_python(code),
255 };
256
257 std::fs::write(&path, &wrapped).map_err(|e| ExecutionError::StageFailed {
258 stage_id: StageId(impl_hash.into()),
259 message: format!("failed to write stage script: {e}"),
260 })?;
261
262 Ok(path)
263 }
264
265 fn run_script(
268 &self,
269 stage_id: &StageId,
270 script: &Path,
271 language: &str,
272 input: &Value,
273 ) -> Result<Value, ExecutionError> {
274 let input_json = serde_json::to_string(input).unwrap_or_default();
275
276 let code = self
277 .implementations
278 .get(&stage_id.0)
279 .map(|i| i.code.as_str())
280 .unwrap_or("");
281
282 let (nix_subcommand, args) = self.build_nix_command(language, script, code);
283
284 let mut child = if nix_subcommand == "__direct__" {
286 Command::new(&args[0])
287 .args(&args[1..])
288 .stdin(Stdio::piped())
289 .stdout(Stdio::piped())
290 .stderr(Stdio::piped())
291 .spawn()
292 } else {
293 Command::new(&self.nix_bin)
294 .arg(&nix_subcommand)
295 .args(["--no-write-lock-file", "--quiet"])
296 .args(&args)
297 .stdin(Stdio::piped())
298 .stdout(Stdio::piped())
299 .stderr(Stdio::piped())
300 .spawn()
301 }
302 .map_err(|e| ExecutionError::StageFailed {
303 stage_id: stage_id.clone(),
304 message: format!("failed to spawn process: {e}"),
305 })?;
306
307 if let Some(mut stdin) = child.stdin.take() {
310 let bytes = input_json.into_bytes();
311 std::thread::spawn(move || {
312 let _ = stdin.write_all(&bytes);
313 });
314 }
315
316 let pid = child.id();
318 let timeout = Duration::from_secs(self.config.timeout_secs);
319 let (tx, rx) = mpsc::channel();
320 std::thread::spawn(move || {
321 let _ = tx.send(child.wait_with_output());
322 });
323
324 let out = match rx.recv_timeout(timeout) {
325 Ok(Ok(o)) => o,
326 Ok(Err(e)) => {
327 return Err(ExecutionError::StageFailed {
328 stage_id: stage_id.clone(),
329 message: format!("nix process error: {e}"),
330 });
331 }
332 Err(_elapsed) => {
333 let _ = Command::new("kill").args(["-9", &pid.to_string()]).status();
335 return Err(ExecutionError::TimedOut {
336 stage_id: stage_id.clone(),
337 timeout_secs: self.config.timeout_secs,
338 });
339 }
340 };
341
342 let stderr_raw = &out.stderr[..out.stderr.len().min(self.config.max_stderr_bytes)];
344 let stderr = String::from_utf8_lossy(stderr_raw);
345
346 if !out.status.success() {
347 return Err(ExecutionError::StageFailed {
348 stage_id: stage_id.clone(),
349 message: Self::classify_error(&stderr, out.status.code()),
350 });
351 }
352
353 let stdout_raw = &out.stdout[..out.stdout.len().min(self.config.max_output_bytes)];
355 let stdout = String::from_utf8_lossy(stdout_raw);
356
357 if stdout_raw.len() == self.config.max_output_bytes && !out.stdout.is_empty() {
358 return Err(ExecutionError::StageFailed {
359 stage_id: stage_id.clone(),
360 message: format!(
361 "stage output exceeded {} bytes limit",
362 self.config.max_output_bytes
363 ),
364 });
365 }
366
367 serde_json::from_str(stdout.trim()).map_err(|e| ExecutionError::StageFailed {
368 stage_id: stage_id.clone(),
369 message: format!("failed to parse stage output as JSON: {e} (got: {stdout:?})"),
370 })
371 }
372
373 fn classify_error(stderr: &str, exit_code: Option<i32>) -> String {
376 if stderr.contains("cannot connect to nix daemon")
378 || stderr.contains("Cannot connect to the Nix daemon")
379 {
380 return "nix daemon is not running — start it with `sudo systemctl start nix-daemon` \
381 or `nix daemon`"
382 .to_string();
383 }
384 if stderr.contains("error: flake") || stderr.contains("error: getting flake") {
385 return format!(
386 "nix flake error (check network / nixpkgs access): {}",
387 first_line(stderr)
388 );
389 }
390 if stderr.contains("error: downloading") || stderr.contains("error: fetching") {
391 return format!(
392 "nix failed to fetch runtime package (check network): {}",
393 first_line(stderr)
394 );
395 }
396 if stderr.contains("out of disk space") || stderr.contains("No space left on device") {
397 return "nix store out of disk space — run `nix-collect-garbage -d` to free space"
398 .to_string();
399 }
400 if stderr.contains("nix: command not found") || stderr.contains("No such file") {
401 return "nix binary not found — is Nix installed?".to_string();
402 }
403 let code_str = exit_code
405 .map(|c| format!(" (exit {c})"))
406 .unwrap_or_default();
407 if stderr.trim().is_empty() {
408 format!("stage exited without output{code_str}")
409 } else {
410 format!("stage error{code_str}: {stderr}")
411 }
412 }
413
414 fn build_nix_command(
420 &self,
421 language: &str,
422 script: &Path,
423 code: &str,
424 ) -> (String, Vec<String>) {
425 let script_path = script.to_str().unwrap_or("/dev/null").to_string();
426
427 match language {
428 "python" | "python3" | "" => {
429 if let Some(reqs) = Self::extract_pip_requirements(code) {
433 let venv_hash = {
434 use sha2::{Digest, Sha256};
435 let h = Sha256::digest(reqs.as_bytes());
436 hex::encode(&h[..8])
437 };
438 let venv_dir = self.cache_dir.join(format!("venv-{venv_hash}"));
439 let venv_str = venv_dir.to_string_lossy().to_string();
440 let python = venv_dir.join("bin").join("python3");
441 let python_str = python.to_string_lossy().to_string();
442
443 if !python.exists() {
445 let setup = std::process::Command::new("python3")
446 .args(["-m", "venv", &venv_str])
447 .output();
448 if let Ok(out) = setup {
449 if out.status.success() {
450 let pip = venv_dir.join("bin").join("pip");
451 let pkgs: Vec<&str> = reqs.split(", ").collect();
452 let mut pip_args =
453 vec!["install", "--quiet", "--disable-pip-version-check"];
454 pip_args.extend(pkgs);
455 let _ = std::process::Command::new(pip.to_string_lossy().as_ref())
456 .args(&pip_args)
457 .output();
458 }
459 }
460 }
461
462 return ("__direct__".to_string(), vec![python_str, script_path]);
464 }
465
466 let extra_pkgs = Self::detect_python_packages(code);
467 if extra_pkgs.is_empty() {
468 (
469 "run".to_string(),
470 vec!["nixpkgs#python3".into(), "--".into(), script_path],
471 )
472 } else {
473 let mut args: Vec<String> = extra_pkgs
474 .iter()
475 .map(|pkg| format!("nixpkgs#python3Packages.{pkg}"))
476 .collect();
477 args.extend_from_slice(&["--command".into(), "python3".into(), script_path]);
478 ("shell".to_string(), args)
479 }
480 }
481 "javascript" | "js" => (
482 "run".to_string(),
483 vec!["nixpkgs#nodejs".into(), "--".into(), script_path],
484 ),
485 _ => (
486 "run".to_string(),
487 vec!["nixpkgs#bash".into(), "--".into(), script_path],
488 ),
489 }
490 }
491
492 fn extract_pip_requirements(code: &str) -> Option<String> {
494 for line in code.lines() {
495 let trimmed = line.trim();
496 if trimmed.starts_with("# requires:") {
497 let reqs = trimmed.strip_prefix("# requires:").unwrap().trim();
498 if !reqs.is_empty() {
499 return Some(reqs.to_string());
500 }
501 }
502 }
503 None
504 }
505
506 fn detect_python_packages(code: &str) -> Vec<&'static str> {
509 const KNOWN: &[(&str, &str)] = &[
511 ("requests", "requests"),
512 ("httpx", "httpx"),
513 ("aiohttp", "aiohttp"),
514 ("bs4", "beautifulsoup4"),
515 ("lxml", "lxml"),
516 ("pandas", "pandas"),
517 ("numpy", "numpy"),
518 ("scipy", "scipy"),
519 ("sklearn", "scikit-learn"),
520 ("PIL", "Pillow"),
521 ("cv2", "opencv4"),
522 ("yaml", "pyyaml"),
523 ("toml", "toml"),
524 ("dateutil", "python-dateutil"),
525 ("pytz", "pytz"),
526 ("boto3", "boto3"),
527 ("psycopg2", "psycopg2"),
528 ("pymongo", "pymongo"),
529 ("redis", "redis"),
530 ("celery", "celery"),
531 ("fastapi", "fastapi"),
532 ("pydantic", "pydantic"),
533 ("cryptography", "cryptography"),
534 ("jwt", "pyjwt"),
535 ("paramiko", "paramiko"),
536 ("dotenv", "python-dotenv"),
537 ("joblib", "joblib"),
538 ("torch", "pytorch"),
539 ("transformers", "transformers"),
540 ("datasets", "datasets"),
541 ("pyarrow", "pyarrow"),
542 ];
543
544 let mut found: Vec<&'static str> = Vec::new();
545 for (import_name, nix_name) in KNOWN {
546 let patterns = [
547 format!("import {import_name}"),
548 format!("import {import_name} "),
549 format!("from {import_name} "),
550 format!("from {import_name}."),
551 ];
552 if patterns.iter().any(|p| code.contains(p.as_str())) {
553 found.push(nix_name);
554 }
555 }
556 found
557 }
558
559 #[cfg(test)]
562 #[allow(dead_code)]
563 fn _expose_extract_future_imports(code: &str) -> (String, String) {
564 Self::extract_future_imports(code)
565 }
566
567 fn extract_future_imports(code: &str) -> (String, String) {
573 let mut hoisted = String::new();
574 let mut remaining = String::new();
575 for line in code.lines() {
576 let trimmed = line.trim_start();
577 if !line.starts_with(' ')
578 && !line.starts_with('\t')
579 && trimmed.starts_with("from __future__ import")
580 {
581 hoisted.push_str(line);
582 hoisted.push('\n');
583 } else {
584 remaining.push_str(line);
585 remaining.push('\n');
586 }
587 }
588 (hoisted, remaining)
589 }
590
591 fn wrap_python(user_code: &str) -> String {
592 let pip_install = String::new();
596
597 let (future_imports, user_code_clean) = Self::extract_future_imports(user_code);
605
606 format!(
607 r#"{future_imports}import sys, json as _json
608{pip_install}
609# ---- user implementation ----
610{user_code_clean}
611# ---- end implementation ----
612
613if __name__ == '__main__':
614 if 'execute' not in dir() or not callable(globals().get('execute')):
615 print(
616 "Noether stage error: implementation must define a top-level "
617 "function `def execute(input): ...` that takes the parsed input dict "
618 "and returns the output dict. Do not read from stdin or print to stdout — "
619 "the Noether runtime handles I/O for you.",
620 file=sys.stderr,
621 )
622 sys.exit(1)
623 try:
624 _raw = _json.loads(sys.stdin.read())
625 # If the runtime passed input as a JSON-encoded string, decode it once more.
626 # This happens when input arrives as null or a bare string from the CLI.
627 if isinstance(_raw, str):
628 try:
629 _raw = _json.loads(_raw)
630 except Exception:
631 pass
632 _output = execute(_raw if _raw is not None else {{}})
633 print(_json.dumps(_output))
634 except Exception as _e:
635 print(str(_e), file=sys.stderr)
636 sys.exit(1)
637"#
638 )
639 }
640
641 fn wrap_javascript(user_code: &str) -> String {
642 format!(
643 r#"const _readline = require('readline');
644let _input = '';
645process.stdin.on('data', d => _input += d);
646process.stdin.on('end', () => {{
647 try {{
648 // ---- user implementation ----
649 {user_code}
650 // ---- end implementation ----
651 const _result = execute(JSON.parse(_input));
652 process.stdout.write(JSON.stringify(_result) + '\n');
653 }} catch (e) {{
654 process.stderr.write(String(e) + '\n');
655 process.exit(1);
656 }}
657}});
658"#
659 )
660 }
661
662 fn wrap_bash(user_code: &str) -> String {
663 format!(
664 r#"#!/usr/bin/env bash
665set -euo pipefail
666INPUT=$(cat)
667
668# ---- user implementation ----
669{user_code}
670# ---- end implementation ----
671
672execute "$INPUT"
673"#
674 )
675 }
676}
677
678fn first_line(s: &str) -> &str {
682 s.lines()
683 .map(str::trim)
684 .find(|l| !l.is_empty())
685 .unwrap_or(s)
686}
687
688impl StageExecutor for NixExecutor {
691 fn execute(&self, stage_id: &StageId, input: &Value) -> Result<Value, ExecutionError> {
692 let impl_ = self
693 .implementations
694 .get(&stage_id.0)
695 .ok_or_else(|| ExecutionError::StageNotFound(stage_id.clone()))?;
696
697 let code_hash = Self::code_hash(&impl_.code);
698 let script = self.ensure_script(&code_hash, &impl_.code, &impl_.language)?;
699 self.run_script(stage_id, &script, &impl_.language, input)
700 }
701}
702
703#[cfg(test)]
704mod tests {
705 use super::*;
706
707 #[allow(dead_code)] fn make_executor() -> NixExecutor {
709 let nix_bin = NixExecutor::find_nix().unwrap_or_else(|| PathBuf::from("/usr/bin/nix"));
710 let cache_dir = std::env::temp_dir().join("noether-test-impl-cache");
711 let _ = std::fs::create_dir_all(&cache_dir);
712 NixExecutor {
713 nix_bin,
714 cache_dir,
715 config: NixConfig::default(),
716 implementations: HashMap::new(),
717 }
718 }
719
720 #[test]
721 fn detect_python_packages_requests() {
722 let code = "import requests\ndef execute(v):\n return requests.get(v).json()";
723 let pkgs = NixExecutor::detect_python_packages(code);
724 assert!(
725 pkgs.contains(&"requests"),
726 "expected 'requests' in {pkgs:?}"
727 );
728 }
729
730 #[test]
731 fn detect_python_packages_stdlib_only() {
732 let code = "import urllib.request, json\ndef execute(v):\n return json.loads(v)";
733 let pkgs = NixExecutor::detect_python_packages(code);
734 assert!(
735 pkgs.is_empty(),
736 "stdlib imports should not trigger packages: {pkgs:?}"
737 );
738 }
739
740 #[test]
741 fn detect_python_packages_multiple() {
742 let code = "import pandas\nimport numpy as np\nfrom bs4 import BeautifulSoup\ndef execute(v): pass";
743 let pkgs = NixExecutor::detect_python_packages(code);
744 assert!(pkgs.contains(&"pandas"));
745 assert!(pkgs.contains(&"numpy"));
746 assert!(pkgs.contains(&"beautifulsoup4"));
747 }
748
749 fn test_executor() -> NixExecutor {
750 NixExecutor {
751 nix_bin: PathBuf::from("/usr/bin/nix"),
752 cache_dir: PathBuf::from("/tmp/noether-test-cache"),
753 config: NixConfig::default(),
754 implementations: HashMap::new(),
755 }
756 }
757
758 #[test]
759 fn build_nix_command_no_packages() {
760 let exec = test_executor();
761 let (sub, args) = exec.build_nix_command("python", Path::new("/tmp/x.py"), "import json");
762 assert_eq!(sub, "run");
763 assert!(args.iter().any(|a| a.contains("python3")));
764 assert!(!args.iter().any(|a| a.contains("shell")));
765 }
766
767 #[test]
768 fn build_nix_command_with_requests() {
769 let exec = test_executor();
770 let (sub, args) =
771 exec.build_nix_command("python", Path::new("/tmp/x.py"), "import requests");
772 assert_eq!(sub, "shell");
773 assert!(args.iter().any(|a| a.contains("python3Packages.requests")));
774 assert!(args.iter().any(|a| a == "--command"));
775 assert!(
777 !args.iter().any(|a| a == "nixpkgs#python3"),
778 "bare python3 conflicts: {args:?}"
779 );
780 }
781
782 #[test]
783 fn python_wrapper_contains_boilerplate() {
784 let wrapped = NixExecutor::wrap_python("def execute(x):\n return x + 1");
785 assert!(wrapped.contains("sys.stdin.read()"));
786 assert!(wrapped.contains("_json.dumps(_output)"));
787 assert!(wrapped.contains("def execute(x)"));
788 }
789
790 #[test]
791 fn code_hash_is_stable() {
792 let h1 = NixExecutor::code_hash("hello world");
793 let h2 = NixExecutor::code_hash("hello world");
794 let h3 = NixExecutor::code_hash("different");
795 assert_eq!(h1, h2);
796 assert_ne!(h1, h3);
797 }
798
799 #[test]
800 fn classify_error_daemon_not_running() {
801 let msg = NixExecutor::classify_error("error: cannot connect to nix daemon", Some(1));
802 assert!(msg.contains("nix daemon is not running"), "got: {msg}");
803 }
804
805 #[test]
806 fn future_imports_are_hoisted_out_of_user_code() {
807 let user = "from __future__ import annotations\nimport json\n\ndef execute(input):\n return input\n";
808 let wrapped = NixExecutor::wrap_python(user);
809 let future_pos = wrapped
811 .find("from __future__ import annotations")
812 .expect("future import should be present in wrapper");
813 let stdlib_pos = wrapped
814 .find("import sys, json as _json")
815 .expect("stdlib imports should be present");
816 assert!(
817 future_pos < stdlib_pos,
818 "future import must precede stdlib imports in wrapped output"
819 );
820 }
821
822 #[test]
823 fn user_code_without_future_imports_is_unchanged() {
824 let user = "import json\n\ndef execute(input):\n return input\n";
825 let (hoisted, remaining) = NixExecutor::extract_future_imports(user);
826 assert_eq!(hoisted, "");
827 assert_eq!(remaining.trim(), user.trim());
828 }
829
830 #[test]
831 fn nested_future_import_inside_function_is_not_hoisted() {
832 let user =
835 "def execute(input):\n from __future__ import annotations\n return input\n";
836 let (hoisted, _) = NixExecutor::extract_future_imports(user);
837 assert_eq!(hoisted, "");
838 }
839
840 #[test]
841 fn classify_error_user_code_exit1() {
842 let msg = NixExecutor::classify_error("ValueError: invalid input", Some(1));
843 assert!(msg.contains("ValueError"), "got: {msg}");
844 assert!(msg.contains("exit 1"), "got: {msg}");
845 }
846
847 #[test]
848 fn classify_error_disk_full() {
849 let msg = NixExecutor::classify_error("No space left on device", Some(1));
850 assert!(msg.contains("disk space"), "got: {msg}");
851 }
852
853 #[test]
854 fn classify_error_empty_stderr() {
855 let msg = NixExecutor::classify_error("", Some(137));
856 assert!(msg.contains("exit 137"), "got: {msg}");
857 }
858
859 #[test]
860 fn nix_config_defaults() {
861 let cfg = NixConfig::default();
862 assert_eq!(cfg.timeout_secs, 30);
863 assert_eq!(cfg.max_output_bytes, 10 * 1024 * 1024);
864 assert_eq!(cfg.max_stderr_bytes, 64 * 1024);
865 }
866
867 #[test]
868 fn first_line_extracts_correctly() {
869 assert_eq!(first_line(" \nfoo\nbar"), "foo");
870 assert_eq!(first_line("single"), "single");
871 assert_eq!(first_line(""), "");
872 }
873
874 #[test]
877 #[ignore = "requires nix + warm binary cache; run manually with `cargo test -- --ignored`"]
878 fn nix_python_identity_stage() {
879 let nix_bin = match NixExecutor::find_nix() {
880 Some(p) => p,
881 None => {
882 eprintln!("nix not found, skipping");
883 return;
884 }
885 };
886
887 let cache_dir = std::env::temp_dir().join("noether-nix-integ");
888 let _ = std::fs::create_dir_all(&cache_dir);
889
890 let code = "def execute(x):\n return x";
891 let executor = NixExecutor {
892 nix_bin,
893 cache_dir,
894 config: NixConfig::default(),
895 implementations: {
896 let mut m = HashMap::new();
897 let id = StageId("test_identity".into());
898 m.insert(
899 id.0.clone(),
900 StageImpl {
901 code: code.into(),
902 language: "python".into(),
903 },
904 );
905 m
906 },
907 };
908
909 let id = StageId("test_identity".into());
910 let result = executor.execute(&id, &serde_json::json!({"hello": "world"}));
911 assert_eq!(result.unwrap(), serde_json::json!({"hello": "world"}));
912 }
913
914 #[test]
917 #[ignore = "requires nix + warm binary cache; run manually with `cargo test -- --ignored`"]
918 fn nix_timeout_kills_hanging_stage() {
919 let nix_bin = match NixExecutor::find_nix() {
920 Some(p) => p,
921 None => {
922 eprintln!("nix not found, skipping timeout test");
923 return;
924 }
925 };
926
927 let cache_dir = std::env::temp_dir().join("noether-nix-timeout");
928 let _ = std::fs::create_dir_all(&cache_dir);
929
930 let code = "import time\ndef execute(x):\n time.sleep(9999)\n return x";
931 let executor = NixExecutor {
932 nix_bin,
933 cache_dir,
934 config: NixConfig {
935 timeout_secs: 2,
936 ..NixConfig::default()
937 },
938 implementations: {
939 let mut m = HashMap::new();
940 let id = StageId("hanging".into());
941 m.insert(
942 id.0.clone(),
943 StageImpl {
944 code: code.into(),
945 language: "python".into(),
946 },
947 );
948 m
949 },
950 };
951
952 let id = StageId("hanging".into());
953 let result = executor.execute(&id, &serde_json::json!(null));
954 assert!(
955 matches!(
956 result,
957 Err(ExecutionError::TimedOut {
958 timeout_secs: 2,
959 ..
960 })
961 ),
962 "expected TimedOut, got: {result:?}"
963 );
964 }
965}