1use super::{ExecutionError, StageExecutor};
46use noether_core::stage::StageId;
47use serde_json::Value;
48use sha2::{Digest, Sha256};
49use std::collections::HashMap;
50use std::io::Write as IoWrite;
51use std::path::{Path, PathBuf};
52use std::process::{Command, Stdio};
53use std::sync::mpsc;
54use std::time::Duration;
55
/// Resource limits applied to each stage run performed by the Nix executor.
#[derive(Debug, Clone)]
pub struct NixConfig {
    /// Wall-clock budget for a single stage run, in seconds.
    pub timeout_secs: u64,
    /// Largest stdout payload (in bytes) accepted from a stage.
    pub max_output_bytes: usize,
    /// Number of stderr bytes kept when building an error message.
    pub max_stderr_bytes: usize,
}

impl Default for NixConfig {
    /// Defaults: 30 second timeout, 10 MiB of stdout, 64 KiB of stderr.
    fn default() -> Self {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;

        NixConfig {
            timeout_secs: 30,
            max_output_bytes: 10 * MIB,
            max_stderr_bytes: 64 * KIB,
        }
    }
}
83
/// One registered stage implementation: its source text plus a language tag.
#[derive(Clone)]
struct StageImpl {
    /// User-supplied implementation source, stored before any wrapper is added.
    code: String,
    /// Language tag used to pick the wrapper and the runtime for `code`.
    language: String,
}
92
93pub struct NixExecutor {
110 nix_bin: PathBuf,
111 cache_dir: PathBuf,
112 config: NixConfig,
113 implementations: HashMap<String, StageImpl>,
114}
115
116impl NixExecutor {
/// Locates a `nix` binary.
///
/// The fixed profile path `/nix/var/nix/profiles/default/bin/nix` is tried
/// first; otherwise each `PATH` entry is searched for a regular file named
/// `nix`. Returns `None` when `PATH` is unset or no candidate is found.
pub fn find_nix() -> Option<PathBuf> {
    let profile_nix = Path::new("/nix/var/nix/profiles/default/bin/nix");
    if profile_nix.exists() {
        return Some(profile_nix.to_path_buf());
    }

    let search_path = std::env::var_os("PATH")?;
    std::env::split_paths(&search_path)
        .map(|dir| dir.join("nix"))
        .find(|candidate| candidate.is_file())
}
138
139 pub fn from_store(store: &dyn noether_store::StageStore) -> Option<Self> {
144 Self::from_store_with_config(store, NixConfig::default())
145 }
146
147 pub fn from_store_with_config(
149 store: &dyn noether_store::StageStore,
150 config: NixConfig,
151 ) -> Option<Self> {
152 let nix_bin = Self::find_nix()?;
153
154 let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".into());
155 let cache_dir = PathBuf::from(home).join(".noether").join("impl_cache");
156 let _ = std::fs::create_dir_all(&cache_dir);
157
158 let mut implementations = HashMap::new();
159 for stage in store.list(None) {
160 if let (Some(code), Some(lang)) =
161 (&stage.implementation_code, &stage.implementation_language)
162 {
163 implementations.insert(
164 stage.id.0.clone(),
165 StageImpl {
166 code: code.clone(),
167 language: lang.clone(),
168 },
169 );
170 }
171 }
172
173 Some(Self {
174 nix_bin,
175 cache_dir,
176 config,
177 implementations,
178 })
179 }
180
181 pub fn register(&mut self, stage_id: &StageId, code: &str, language: &str) {
183 self.implementations.insert(
184 stage_id.0.clone(),
185 StageImpl {
186 code: code.into(),
187 language: language.into(),
188 },
189 );
190 }
191
192 pub fn has_implementation(&self, stage_id: &StageId) -> bool {
194 self.implementations.contains_key(&stage_id.0)
195 }
196
197 pub fn warmup(&self) -> std::thread::JoinHandle<()> {
207 let nix_bin = self.nix_bin.clone();
208 std::thread::spawn(move || {
209 let status = Command::new(&nix_bin)
212 .args([
213 "build",
214 "--no-link",
215 "--quiet",
216 "--no-write-lock-file",
217 "nixpkgs#python3",
218 ])
219 .stdout(Stdio::null())
220 .stderr(Stdio::null())
221 .status();
222 match status {
223 Ok(s) if s.success() => {
224 eprintln!("[noether] nix warmup: python3 runtime cached");
225 }
226 Ok(s) => {
227 eprintln!("[noether] nix warmup: exited with {s} (non-fatal)");
228 }
229 Err(e) => {
230 eprintln!("[noether] nix warmup: failed to spawn ({e}) (non-fatal)");
231 }
232 }
233 })
234 }
235
236 fn code_hash(code: &str) -> String {
240 hex::encode(Sha256::digest(code.as_bytes()))
241 }
242
243 fn ensure_script(
246 &self,
247 impl_hash: &str,
248 code: &str,
249 language: &str,
250 ) -> Result<PathBuf, ExecutionError> {
251 let ext = match language {
252 "javascript" | "js" => "js",
253 "bash" | "sh" => "sh",
254 _ => "py",
255 };
256
257 let path = self.cache_dir.join(format!("{impl_hash}.{ext}"));
258 if path.exists() {
259 return Ok(path);
260 }
261
262 let wrapped = match language {
263 "javascript" | "js" => Self::wrap_javascript(code),
264 "bash" | "sh" => Self::wrap_bash(code),
265 _ => Self::wrap_python(code),
266 };
267
268 std::fs::write(&path, &wrapped).map_err(|e| ExecutionError::StageFailed {
269 stage_id: StageId(impl_hash.into()),
270 message: format!("failed to write stage script: {e}"),
271 })?;
272
273 Ok(path)
274 }
275
276 fn run_script(
279 &self,
280 stage_id: &StageId,
281 script: &Path,
282 language: &str,
283 input: &Value,
284 ) -> Result<Value, ExecutionError> {
285 let input_json = serde_json::to_string(input).unwrap_or_default();
286
287 let code = self
288 .implementations
289 .get(&stage_id.0)
290 .map(|i| i.code.as_str())
291 .unwrap_or("");
292
293 let (nix_subcommand, args) = self.build_nix_command(language, script, code);
294
295 let mut child = if nix_subcommand == "__direct__" {
297 Command::new(&args[0])
298 .args(&args[1..])
299 .stdin(Stdio::piped())
300 .stdout(Stdio::piped())
301 .stderr(Stdio::piped())
302 .spawn()
303 } else {
304 Command::new(&self.nix_bin)
305 .arg(&nix_subcommand)
306 .args(["--no-write-lock-file", "--quiet"])
307 .args(&args)
308 .stdin(Stdio::piped())
309 .stdout(Stdio::piped())
310 .stderr(Stdio::piped())
311 .spawn()
312 }
313 .map_err(|e| ExecutionError::StageFailed {
314 stage_id: stage_id.clone(),
315 message: format!("failed to spawn process: {e}"),
316 })?;
317
318 if let Some(mut stdin) = child.stdin.take() {
321 let bytes = input_json.into_bytes();
322 std::thread::spawn(move || {
323 let _ = stdin.write_all(&bytes);
324 });
325 }
326
327 let pid = child.id();
329 let timeout = Duration::from_secs(self.config.timeout_secs);
330 let (tx, rx) = mpsc::channel();
331 std::thread::spawn(move || {
332 let _ = tx.send(child.wait_with_output());
333 });
334
335 let out = match rx.recv_timeout(timeout) {
336 Ok(Ok(o)) => o,
337 Ok(Err(e)) => {
338 return Err(ExecutionError::StageFailed {
339 stage_id: stage_id.clone(),
340 message: format!("nix process error: {e}"),
341 });
342 }
343 Err(_elapsed) => {
344 let _ = Command::new("kill").args(["-9", &pid.to_string()]).status();
346 return Err(ExecutionError::TimedOut {
347 stage_id: stage_id.clone(),
348 timeout_secs: self.config.timeout_secs,
349 });
350 }
351 };
352
353 let stderr_raw = &out.stderr[..out.stderr.len().min(self.config.max_stderr_bytes)];
355 let stderr = String::from_utf8_lossy(stderr_raw);
356
357 if !out.status.success() {
358 return Err(ExecutionError::StageFailed {
359 stage_id: stage_id.clone(),
360 message: Self::classify_error(&stderr, out.status.code()),
361 });
362 }
363
364 let stdout_raw = &out.stdout[..out.stdout.len().min(self.config.max_output_bytes)];
366 let stdout = String::from_utf8_lossy(stdout_raw);
367
368 if stdout_raw.len() == self.config.max_output_bytes && !out.stdout.is_empty() {
369 return Err(ExecutionError::StageFailed {
370 stage_id: stage_id.clone(),
371 message: format!(
372 "stage output exceeded {} bytes limit",
373 self.config.max_output_bytes
374 ),
375 });
376 }
377
378 serde_json::from_str(stdout.trim()).map_err(|e| ExecutionError::StageFailed {
379 stage_id: stage_id.clone(),
380 message: format!("failed to parse stage output as JSON: {e} (got: {stdout:?})"),
381 })
382 }
383
384 fn classify_error(stderr: &str, exit_code: Option<i32>) -> String {
387 if stderr.contains("cannot connect to nix daemon")
389 || stderr.contains("Cannot connect to the Nix daemon")
390 {
391 return "nix daemon is not running — start it with `sudo systemctl start nix-daemon` \
392 or `nix daemon`"
393 .to_string();
394 }
395 if stderr.contains("error: flake") || stderr.contains("error: getting flake") {
396 return format!(
397 "nix flake error (check network / nixpkgs access): {}",
398 first_line(stderr)
399 );
400 }
401 if stderr.contains("error: downloading") || stderr.contains("error: fetching") {
402 return format!(
403 "nix failed to fetch runtime package (check network): {}",
404 first_line(stderr)
405 );
406 }
407 if stderr.contains("out of disk space") || stderr.contains("No space left on device") {
408 return "nix store out of disk space — run `nix-collect-garbage -d` to free space"
409 .to_string();
410 }
411 if stderr.contains("nix: command not found") || stderr.contains("No such file") {
412 return "nix binary not found — is Nix installed?".to_string();
413 }
414 let code_str = exit_code
416 .map(|c| format!(" (exit {c})"))
417 .unwrap_or_default();
418 if stderr.trim().is_empty() {
419 format!("stage exited without output{code_str}")
420 } else {
421 format!("stage error{code_str}: {stderr}")
422 }
423 }
424
425 fn build_nix_command(
431 &self,
432 language: &str,
433 script: &Path,
434 code: &str,
435 ) -> (String, Vec<String>) {
436 let script_path = script.to_str().unwrap_or("/dev/null").to_string();
437
438 match language {
439 "python" | "python3" | "" => {
440 if let Some(reqs) = Self::extract_pip_requirements(code) {
444 let venv_hash = {
445 use sha2::{Digest, Sha256};
446 let h = Sha256::digest(reqs.as_bytes());
447 hex::encode(&h[..8])
448 };
449 let venv_dir = self.cache_dir.join(format!("venv-{venv_hash}"));
450 let venv_str = venv_dir.to_string_lossy().to_string();
451 let python = venv_dir.join("bin").join("python3");
452 let python_str = python.to_string_lossy().to_string();
453
454 if !python.exists() {
456 let setup = std::process::Command::new("python3")
457 .args(["-m", "venv", &venv_str])
458 .output();
459 if let Ok(out) = setup {
460 if out.status.success() {
461 let pip = venv_dir.join("bin").join("pip");
462 let pkgs: Vec<&str> = reqs.split(", ").collect();
463 let mut pip_args =
464 vec!["install", "--quiet", "--disable-pip-version-check"];
465 pip_args.extend(pkgs);
466 let _ = std::process::Command::new(pip.to_string_lossy().as_ref())
467 .args(&pip_args)
468 .output();
469 }
470 }
471 }
472
473 return ("__direct__".to_string(), vec![python_str, script_path]);
475 }
476
477 let extra_pkgs = Self::detect_python_packages(code);
478 if extra_pkgs.is_empty() {
479 (
480 "run".to_string(),
481 vec!["nixpkgs#python3".into(), "--".into(), script_path],
482 )
483 } else {
484 let mut args: Vec<String> = extra_pkgs
485 .iter()
486 .map(|pkg| format!("nixpkgs#python3Packages.{pkg}"))
487 .collect();
488 args.extend_from_slice(&["--command".into(), "python3".into(), script_path]);
489 ("shell".to_string(), args)
490 }
491 }
492 "javascript" | "js" => (
493 "run".to_string(),
494 vec!["nixpkgs#nodejs".into(), "--".into(), script_path],
495 ),
496 _ => (
497 "run".to_string(),
498 vec!["nixpkgs#bash".into(), "--".into(), script_path],
499 ),
500 }
501 }
502
503 fn extract_pip_requirements(code: &str) -> Option<String> {
517 for line in code.lines() {
518 let trimmed = line.trim();
519 if trimmed.starts_with("# requires:") {
520 let reqs = trimmed.strip_prefix("# requires:").unwrap().trim();
521 if reqs.is_empty() {
522 continue;
523 }
524 let valid: Vec<String> = reqs
525 .split(',')
526 .map(|s| s.trim())
527 .filter(|s| !s.is_empty())
528 .filter(|s| match validate_pip_spec(s) {
529 Ok(()) => true,
530 Err(reason) => {
531 eprintln!(
532 "[noether] rejected `# requires:` entry {s:?} ({reason}); skipping"
533 );
534 false
535 }
536 })
537 .map(|s| s.to_string())
538 .collect();
539
540 if valid.is_empty() {
541 eprintln!(
542 "[noether] all `# requires:` entries rejected (raw={reqs:?}); falling back to default Nix runtime"
543 );
544 return None;
545 }
546 return Some(valid.join(", "));
547 }
548 }
549 None
550 }
551
/// Guesses which nixpkgs Python packages `code` needs from its imports.
///
/// Each line is split into simple statements (on `;` and `:`), and only
/// statements beginning with `import` or `from` are considered. The root
/// module of every imported name is compared, as a whole identifier, with a
/// fixed table of known third-party modules — so `import tomllib` no longer
/// matches `toml`, and `import os, requests` is recognised.
///
/// Returns the nixpkgs attribute names in table order, without duplicates.
fn detect_python_packages(code: &str) -> Vec<&'static str> {
    // (Python import root, nixpkgs `python3Packages` attribute).
    const KNOWN: &[(&str, &str)] = &[
        ("requests", "requests"),
        ("httpx", "httpx"),
        ("aiohttp", "aiohttp"),
        ("bs4", "beautifulsoup4"),
        ("lxml", "lxml"),
        ("pandas", "pandas"),
        ("numpy", "numpy"),
        ("scipy", "scipy"),
        ("sklearn", "scikit-learn"),
        ("PIL", "Pillow"),
        ("cv2", "opencv4"),
        ("yaml", "pyyaml"),
        ("toml", "toml"),
        ("dateutil", "python-dateutil"),
        ("pytz", "pytz"),
        ("boto3", "boto3"),
        ("psycopg2", "psycopg2"),
        ("pymongo", "pymongo"),
        ("redis", "redis"),
        ("celery", "celery"),
        ("fastapi", "fastapi"),
        ("pydantic", "pydantic"),
        ("cryptography", "cryptography"),
        ("jwt", "pyjwt"),
        ("paramiko", "paramiko"),
        ("dotenv", "python-dotenv"),
        ("joblib", "joblib"),
        ("torch", "pytorch"),
        ("transformers", "transformers"),
        ("datasets", "datasets"),
        ("pyarrow", "pyarrow"),
    ];

    // "urllib.request" -> "urllib"; a relative import such as ".x" yields "".
    fn root(module: &str) -> &str {
        module.split('.').next().unwrap_or(module)
    }

    let mut imported: Vec<&str> = Vec::new();
    let statements = code
        .lines()
        .flat_map(|line| line.split(|c: char| c == ';' || c == ':'));
    for stmt in statements {
        let mut words = stmt.split_whitespace();
        match words.next() {
            Some("import") => {
                // `import a.b as c, d` — every comma-separated item names a module.
                let rest = stmt.trim_start().strip_prefix("import").unwrap_or("");
                for item in rest.split(',') {
                    if let Some(module) = item.split_whitespace().next() {
                        imported.push(root(module));
                    }
                }
            }
            Some("from") => {
                // `from a.b import c` — only the source module matters.
                if let Some(module) = words.next() {
                    imported.push(root(module));
                }
            }
            _ => {}
        }
    }

    KNOWN
        .iter()
        .filter(|entry| imported.contains(&entry.0))
        .map(|entry| entry.1)
        .collect()
}
604
/// Test-only shim that forwards to `extract_future_imports`.
#[cfg(test)]
#[allow(dead_code)]
fn _expose_extract_future_imports(code: &str) -> (String, String) {
    let (hoisted, remaining) = Self::extract_future_imports(code);
    (hoisted, remaining)
}
612
/// Splits `code` into `(future_imports, everything_else)`.
///
/// A line is hoisted only when it is not indented with a space or tab and
/// starts with `from __future__ import`. Both halves keep their lines in
/// order, each terminated by `\n`.
fn extract_future_imports(code: &str) -> (String, String) {
    let is_top_level_future = |line: &str| {
        let indented = line.starts_with(' ') || line.starts_with('\t');
        !indented && line.trim_start().starts_with("from __future__ import")
    };

    let (mut future, mut body) = (String::new(), String::new());
    for line in code.lines() {
        let sink = if is_top_level_future(line) {
            &mut future
        } else {
            &mut body
        };
        sink.push_str(line);
        sink.push('\n');
    }
    (future, body)
}
636
637 fn wrap_python(user_code: &str) -> String {
638 let pip_install = String::new();
642
643 let (future_imports, user_code_clean) = Self::extract_future_imports(user_code);
651
652 format!(
653 r#"{future_imports}import sys, json as _json
654{pip_install}
655# ---- user implementation ----
656{user_code_clean}
657# ---- end implementation ----
658
659if __name__ == '__main__':
660 if 'execute' not in dir() or not callable(globals().get('execute')):
661 print(
662 "Noether stage error: implementation must define a top-level "
663 "function `def execute(input): ...` that takes the parsed input dict "
664 "and returns the output dict. Do not read from stdin or print to stdout — "
665 "the Noether runtime handles I/O for you.",
666 file=sys.stderr,
667 )
668 sys.exit(1)
669 try:
670 _raw = _json.loads(sys.stdin.read())
671 # If the runtime passed input as a JSON-encoded string, decode it once more.
672 # This happens when input arrives as null or a bare string from the CLI.
673 if isinstance(_raw, str):
674 try:
675 _raw = _json.loads(_raw)
676 except Exception:
677 pass
678 _output = execute(_raw if _raw is not None else {{}})
679 print(_json.dumps(_output))
680 except Exception as _e:
681 print(str(_e), file=sys.stderr)
682 sys.exit(1)
683"#
684 )
685 }
686
/// Wraps JavaScript `user_code` in the stage harness.
///
/// The generated script collects stdin, then calls the user's `execute` with
/// the parsed JSON and writes the JSON result followed by a newline.
fn wrap_javascript(user_code: &str) -> String {
    let implementation = user_code;
    format!(
        r#"const _readline = require('readline');
let _input = '';
process.stdin.on('data', d => _input += d);
process.stdin.on('end', () => {{
    try {{
        // ---- user implementation ----
        {user_code}
        // ---- end implementation ----
        const _result = execute(JSON.parse(_input));
        process.stdout.write(JSON.stringify(_result) + '\n');
    }} catch (e) {{
        process.stderr.write(String(e) + '\n');
        process.exit(1);
    }}
}});
"#,
        user_code = implementation,
    )
}
707
/// Wraps bash `user_code` in the stage harness.
///
/// The generated script reads all of stdin into `INPUT` and calls the user's
/// `execute` function with it as the single argument.
fn wrap_bash(user_code: &str) -> String {
    let implementation = user_code;
    format!(
        r#"#!/usr/bin/env bash
set -euo pipefail
INPUT=$(cat)

# ---- user implementation ----
{user_code}
# ---- end implementation ----

execute "$INPUT"
"#,
        user_code = implementation,
    )
}
722}
723
/// Returns the first line of `s` that is non-empty after trimming, itself
/// trimmed; falls back to `s` unchanged when there is no such line.
fn first_line(s: &str) -> &str {
    for line in s.lines() {
        let trimmed = line.trim();
        if !trimmed.is_empty() {
            return trimmed;
        }
    }
    s
}
733
/// Checks that `spec` is safe to pass to `pip install` as one argument.
///
/// Accepted forms are `name==version`, and bare `name` only when the
/// environment variable `NOETHER_ALLOW_UNPINNED_PIP` is `1`, `true`, `yes` or
/// `on`.
///
/// * `name` may contain ASCII letters, digits, `_`, `-` and `.`, and must
///   start and end with a letter or digit. This rejects anything pip would
///   parse as a command-line option, such as `--pre==1.0` or `-rfile`.
/// * `version` may contain ASCII letters, digits, `.`, `+`, `!` and `-`.
///
/// # Errors
/// Returns a short static description of the first rule that `spec` violates.
fn validate_pip_spec(spec: &str) -> Result<(), &'static str> {
    let allow_unpinned = matches!(
        std::env::var("NOETHER_ALLOW_UNPINNED_PIP").as_deref(),
        Ok("1" | "true" | "yes" | "on")
    );

    let (name, version) = match spec.split_once("==") {
        Some((n, v)) => (n.trim(), Some(v.trim())),
        None => {
            if !allow_unpinned {
                return Err("unpinned; use pkg==version or set NOETHER_ALLOW_UNPINNED_PIP=1");
            }
            (spec.trim(), None)
        }
    };

    if name.is_empty() {
        return Err("empty package name");
    }
    if !name
        .bytes()
        .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'_' | b'-' | b'.'))
    {
        return Err("package name contains disallowed characters");
    }
    // A leading `-` would make pip treat the whole argument as an option.
    let name_bytes = name.as_bytes();
    let is_alnum = |b: Option<&u8>| b.map_or(false, |b| b.is_ascii_alphanumeric());
    if !is_alnum(name_bytes.first()) || !is_alnum(name_bytes.last()) {
        return Err("package name must start and end with a letter or digit");
    }

    if let Some(v) = version {
        if v.is_empty() {
            return Err("empty version after `==`");
        }
        if !v
            .bytes()
            .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'.' | b'+' | b'!' | b'-'))
        {
            return Err("version contains disallowed characters");
        }
    }
    Ok(())
}
781
782impl StageExecutor for NixExecutor {
785 fn execute(&self, stage_id: &StageId, input: &Value) -> Result<Value, ExecutionError> {
786 let impl_ = self
787 .implementations
788 .get(&stage_id.0)
789 .ok_or_else(|| ExecutionError::StageNotFound(stage_id.clone()))?;
790
791 let code_hash = Self::code_hash(&impl_.code);
792 let script = self.ensure_script(&code_hash, &impl_.code, &impl_.language)?;
793 self.run_script(stage_id, &script, &impl_.language, input)
794 }
795}
796
#[cfg(test)]
mod tests {
    use super::*;

    /// Executor backed by a discoverable `nix` (or `/usr/bin/nix`) and a
    /// cache directory under the system temp dir.
    #[allow(dead_code)]
    fn make_executor() -> NixExecutor {
        let cache_dir = std::env::temp_dir().join("noether-test-impl-cache");
        let _ = std::fs::create_dir_all(&cache_dir);
        NixExecutor {
            nix_bin: NixExecutor::find_nix().unwrap_or_else(|| PathBuf::from("/usr/bin/nix")),
            cache_dir,
            config: NixConfig::default(),
            implementations: HashMap::new(),
        }
    }

    /// Executor with fixed paths; nothing is touched on disk.
    fn test_executor() -> NixExecutor {
        NixExecutor {
            nix_bin: PathBuf::from("/usr/bin/nix"),
            cache_dir: PathBuf::from("/tmp/noether-test-cache"),
            config: NixConfig::default(),
            implementations: HashMap::new(),
        }
    }

    /// Executor holding exactly one Python stage, for the integration tests.
    fn single_python_stage(
        nix_bin: PathBuf,
        cache_name: &str,
        stage_name: &str,
        code: &str,
        config: NixConfig,
    ) -> NixExecutor {
        let cache_dir = std::env::temp_dir().join(cache_name);
        let _ = std::fs::create_dir_all(&cache_dir);

        let mut implementations = HashMap::new();
        implementations.insert(
            stage_name.to_string(),
            StageImpl {
                code: code.into(),
                language: "python".into(),
            },
        );

        NixExecutor {
            nix_bin,
            cache_dir,
            config,
            implementations,
        }
    }

    // ---- validate_pip_spec ----

    #[test]
    fn validate_pip_spec_accepts_pinned() {
        for good in [
            "pandas==2.0.0",
            "scikit-learn==1.5.1",
            "urllib3==2.2.3",
            "pydantic==2.5.0+cu121",
        ] {
            assert!(validate_pip_spec(good).is_ok());
        }
    }

    #[test]
    fn validate_pip_spec_rejects_unpinned_by_default() {
        let previous = std::env::var_os("NOETHER_ALLOW_UNPINNED_PIP");
        // SAFETY: mutating the process environment is only sound while no
        // other thread reads it; this test relies on that.
        unsafe {
            std::env::remove_var("NOETHER_ALLOW_UNPINNED_PIP");
        }
        let result = validate_pip_spec("pandas");
        if let Some(prev) = previous {
            // SAFETY: as above; restores the value seen on entry.
            unsafe {
                std::env::set_var("NOETHER_ALLOW_UNPINNED_PIP", prev);
            }
        }
        assert!(result.is_err(), "bare name must be rejected without opt-in");
    }

    #[test]
    fn validate_pip_spec_rejects_shell_metacharacters() {
        let hostile = [
            "pandas; rm -rf /",
            "pandas==$(whoami)",
            "pandas==1.0.0; echo pwned",
            "pandas==`id`",
            "https://evil.example/wheel.whl",
            "git+https://example.com/repo.git",
            "pkg with space==1.0",
            "pkg==1.0 && echo",
        ];
        for bad in hostile {
            assert!(validate_pip_spec(bad).is_err(), "should reject {bad:?}");
        }
    }

    #[test]
    fn validate_pip_spec_rejects_empty() {
        assert!(validate_pip_spec("==1.0").is_err());
        assert!(validate_pip_spec("pkg==").is_err());
    }

    // ---- detect_python_packages ----

    #[test]
    fn detect_python_packages_requests() {
        let code = "import requests\ndef execute(v):\n    return requests.get(v).json()";
        let pkgs = NixExecutor::detect_python_packages(code);
        assert!(
            pkgs.contains(&"requests"),
            "expected 'requests' in {pkgs:?}"
        );
    }

    #[test]
    fn detect_python_packages_stdlib_only() {
        let code = "import urllib.request, json\ndef execute(v):\n    return json.loads(v)";
        let pkgs = NixExecutor::detect_python_packages(code);
        assert!(
            pkgs.is_empty(),
            "stdlib imports should not trigger packages: {pkgs:?}"
        );
    }

    #[test]
    fn detect_python_packages_multiple() {
        let code =
            "import pandas\nimport numpy as np\nfrom bs4 import BeautifulSoup\ndef execute(v): pass";
        let pkgs = NixExecutor::detect_python_packages(code);
        for expected in ["pandas", "numpy", "beautifulsoup4"] {
            assert!(pkgs.contains(&expected));
        }
    }

    // ---- build_nix_command ----

    #[test]
    fn build_nix_command_no_packages() {
        let (sub, args) =
            test_executor().build_nix_command("python", Path::new("/tmp/x.py"), "import json");
        assert_eq!(sub, "run");
        assert!(args.iter().any(|a| a.contains("python3")));
        assert!(!args.iter().any(|a| a.contains("shell")));
    }

    #[test]
    fn build_nix_command_with_requests() {
        let (sub, args) =
            test_executor().build_nix_command("python", Path::new("/tmp/x.py"), "import requests");
        assert_eq!(sub, "shell");
        assert!(args.iter().any(|a| a.contains("python3Packages.requests")));
        assert!(args.iter().any(|a| a == "--command"));
        assert!(
            !args.iter().any(|a| a == "nixpkgs#python3"),
            "bare python3 conflicts: {args:?}"
        );
    }

    // ---- wrappers and hashing ----

    #[test]
    fn python_wrapper_contains_boilerplate() {
        let wrapped = NixExecutor::wrap_python("def execute(x):\n    return x + 1");
        for needle in ["sys.stdin.read()", "_json.dumps(_output)", "def execute(x)"] {
            assert!(wrapped.contains(needle));
        }
    }

    #[test]
    fn code_hash_is_stable() {
        let first = NixExecutor::code_hash("hello world");
        let again = NixExecutor::code_hash("hello world");
        let other = NixExecutor::code_hash("different");
        assert_eq!(first, again);
        assert_ne!(first, other);
    }

    #[test]
    fn future_imports_are_hoisted_out_of_user_code() {
        let user =
            "from __future__ import annotations\nimport json\n\ndef execute(input):\n    return input\n";
        let wrapped = NixExecutor::wrap_python(user);
        let future_pos = wrapped
            .find("from __future__ import annotations")
            .expect("future import should be present in wrapper");
        let stdlib_pos = wrapped
            .find("import sys, json as _json")
            .expect("stdlib imports should be present");
        assert!(
            future_pos < stdlib_pos,
            "future import must precede stdlib imports in wrapped output"
        );
    }

    #[test]
    fn user_code_without_future_imports_is_unchanged() {
        let user = "import json\n\ndef execute(input):\n    return input\n";
        let (hoisted, remaining) = NixExecutor::extract_future_imports(user);
        assert_eq!(hoisted, "");
        assert_eq!(remaining.trim(), user.trim());
    }

    #[test]
    fn nested_future_import_inside_function_is_not_hoisted() {
        let user =
            "def execute(input):\n    from __future__ import annotations\n    return input\n";
        let (hoisted, _) = NixExecutor::extract_future_imports(user);
        assert_eq!(hoisted, "");
    }

    // ---- classify_error ----

    #[test]
    fn classify_error_daemon_not_running() {
        let msg = NixExecutor::classify_error("error: cannot connect to nix daemon", Some(1));
        assert!(msg.contains("nix daemon is not running"), "got: {msg}");
    }

    #[test]
    fn classify_error_user_code_exit1() {
        let msg = NixExecutor::classify_error("ValueError: invalid input", Some(1));
        assert!(msg.contains("ValueError"), "got: {msg}");
        assert!(msg.contains("exit 1"), "got: {msg}");
    }

    #[test]
    fn classify_error_disk_full() {
        let msg = NixExecutor::classify_error("No space left on device", Some(1));
        assert!(msg.contains("disk space"), "got: {msg}");
    }

    #[test]
    fn classify_error_empty_stderr() {
        let msg = NixExecutor::classify_error("", Some(137));
        assert!(msg.contains("exit 137"), "got: {msg}");
    }

    // ---- small helpers ----

    #[test]
    fn nix_config_defaults() {
        let NixConfig {
            timeout_secs,
            max_output_bytes,
            max_stderr_bytes,
        } = NixConfig::default();
        assert_eq!(timeout_secs, 30);
        assert_eq!(max_output_bytes, 10 * 1024 * 1024);
        assert_eq!(max_stderr_bytes, 64 * 1024);
    }

    #[test]
    fn first_line_extracts_correctly() {
        assert_eq!(first_line("  \nfoo\nbar"), "foo");
        assert_eq!(first_line("single"), "single");
        assert_eq!(first_line(""), "");
    }

    // ---- integration tests (need a working nix) ----

    #[test]
    #[ignore = "requires nix + warm binary cache; run manually with `cargo test -- --ignored`"]
    fn nix_python_identity_stage() {
        let nix_bin = match NixExecutor::find_nix() {
            Some(p) => p,
            None => {
                eprintln!("nix not found, skipping");
                return;
            }
        };

        let executor = single_python_stage(
            nix_bin,
            "noether-nix-integ",
            "test_identity",
            "def execute(x):\n    return x",
            NixConfig::default(),
        );

        let id = StageId("test_identity".into());
        let payload = serde_json::json!({"hello": "world"});
        let result = executor.execute(&id, &payload);
        assert_eq!(result.unwrap(), serde_json::json!({"hello": "world"}));
    }

    #[test]
    #[ignore = "requires nix + warm binary cache; run manually with `cargo test -- --ignored`"]
    fn nix_timeout_kills_hanging_stage() {
        let nix_bin = match NixExecutor::find_nix() {
            Some(p) => p,
            None => {
                eprintln!("nix not found, skipping timeout test");
                return;
            }
        };

        let executor = single_python_stage(
            nix_bin,
            "noether-nix-timeout",
            "hanging",
            "import time\ndef execute(x):\n    time.sleep(9999)\n    return x",
            NixConfig {
                timeout_secs: 2,
                ..NixConfig::default()
            },
        );

        let id = StageId("hanging".into());
        let result = executor.execute(&id, &serde_json::json!(null));
        assert!(
            matches!(
                result,
                Err(ExecutionError::TimedOut {
                    timeout_secs: 2,
                    ..
                })
            ),
            "expected TimedOut, got: {result:?}"
        );
    }
}