1#![warn(clippy::unwrap_used)]
2#![cfg_attr(test, allow(clippy::unwrap_used))]
3
4use super::{ExecutionError, StageExecutor};
49use noether_core::stage::StageId;
50use serde_json::Value;
51use sha2::{Digest, Sha256};
52use std::collections::HashMap;
53use std::io::Write as IoWrite;
54use std::path::{Path, PathBuf};
55use std::process::{Command, Stdio};
56use std::sync::mpsc;
57use std::time::Duration;
58
/// Resource limits and sandboxing options applied to every stage run by
/// [`NixExecutor`].
#[derive(Debug, Clone)]
pub struct NixConfig {
    /// Wall-clock limit in seconds for one stage process (default: 30). When it
    /// elapses the process is killed and `ExecutionError::TimedOut` is returned.
    pub timeout_secs: u64,
    /// Stdout size limit in bytes (default: 10 MiB); oversize output is
    /// reported as `ExecutionError::StageFailed`.
    pub max_output_bytes: usize,
    /// Number of stderr bytes kept when building an error message
    /// (default: 64 KiB); the rest is discarded.
    pub max_stderr_bytes: usize,
    /// Sandbox backend wrapped around the stage command (default: `None`, i.e.
    /// run directly; `Bwrap` runs it under bubblewrap).
    pub isolation: super::isolation::IsolationBackend,
}
82
83impl Default for NixConfig {
84 fn default() -> Self {
85 Self {
86 timeout_secs: 30,
87 max_output_bytes: 10 * 1024 * 1024,
88 max_stderr_bytes: 64 * 1024,
89 isolation: super::isolation::IsolationBackend::None,
90 }
91 }
92}
93
94impl NixConfig {
95 pub fn with_isolation(mut self, backend: super::isolation::IsolationBackend) -> Self {
97 self.isolation = backend;
98 self
99 }
100}
101
/// What the executor keeps for one registered stage implementation.
#[derive(Clone)]
struct StageImpl {
    /// User source code, before it is wrapped into a runnable script.
    code: String,
    /// Language tag such as "python", "javascript"/"js" or "bash"/"sh";
    /// `ensure_script` wraps any unrecognised tag as Python.
    language: String,
    /// Declared effects; the bwrap isolation policy is derived from these.
    effects: noether_core::effects::EffectSet,
}
113
/// Runs stage implementations (Python, JavaScript, Bash) as child processes,
/// normally with runtimes supplied by `nix run` / `nix shell`, optionally inside
/// a bubblewrap sandbox.
pub struct NixExecutor {
    /// Path to the `nix` binary (see [`NixExecutor::find_nix`]).
    nix_bin: PathBuf,
    /// Directory holding wrapped stage scripts (named by code hash) and pip
    /// virtualenvs.
    cache_dir: PathBuf,
    /// Timeout, output limits and isolation settings.
    config: NixConfig,
    /// Registered implementations keyed by the stage id string (`StageId.0`).
    implementations: HashMap<String, StageImpl>,
}
136
137impl NixExecutor {
138 pub fn find_nix() -> Option<PathBuf> {
141 let determinate = PathBuf::from("/nix/var/nix/profiles/default/bin/nix");
143 if determinate.exists() {
144 return Some(determinate);
145 }
146
147 let path_env = std::env::var_os("PATH")?;
151 for dir in std::env::split_paths(&path_env) {
152 let candidate = dir.join("nix");
153 if candidate.is_file() {
154 return Some(candidate);
155 }
156 }
157 None
158 }
159
160 pub fn from_store(store: &dyn noether_store::StageStore) -> Option<Self> {
165 Self::from_store_with_config(store, NixConfig::default())
166 }
167
168 pub fn from_store_with_config(
170 store: &dyn noether_store::StageStore,
171 config: NixConfig,
172 ) -> Option<Self> {
173 let nix_bin = Self::find_nix()?;
174
175 let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".into());
176 let cache_dir = PathBuf::from(home).join(".noether").join("impl_cache");
177 let _ = std::fs::create_dir_all(&cache_dir);
178
179 let mut implementations = HashMap::new();
180 for stage in store.list(None) {
181 if let (Some(code), Some(lang)) =
182 (&stage.implementation_code, &stage.implementation_language)
183 {
184 implementations.insert(
185 stage.id.0.clone(),
186 StageImpl {
187 code: code.clone(),
188 language: lang.clone(),
189 effects: stage.signature.effects.clone(),
190 },
191 );
192 }
193 }
194
195 Some(Self {
196 nix_bin,
197 cache_dir,
198 config,
199 implementations,
200 })
201 }
202
203 pub fn config_snapshot(&self) -> NixConfig {
206 self.config.clone()
207 }
208
209 pub fn rebuild_with_config(mut self, config: NixConfig) -> Option<Self> {
214 self.config = config;
215 Some(self)
216 }
217
218 pub fn register_with_effects(
222 &mut self,
223 stage_id: &StageId,
224 code: &str,
225 language: &str,
226 effects: noether_core::effects::EffectSet,
227 ) {
228 self.implementations.insert(
229 stage_id.0.clone(),
230 StageImpl {
231 code: code.into(),
232 language: language.into(),
233 effects,
234 },
235 );
236 }
237
238 pub fn has_implementation(&self, stage_id: &StageId) -> bool {
240 self.implementations.contains_key(&stage_id.0)
241 }
242
243 pub fn warmup(&self) -> std::thread::JoinHandle<()> {
253 let nix_bin = self.nix_bin.clone();
254 std::thread::spawn(move || {
255 let status = Command::new(&nix_bin)
258 .args([
259 "build",
260 "--no-link",
261 "--quiet",
262 "--no-write-lock-file",
263 "nixpkgs#python3",
264 ])
265 .stdout(Stdio::null())
266 .stderr(Stdio::null())
267 .status();
268 match status {
269 Ok(s) if s.success() => {
270 eprintln!("[noether] nix warmup: python3 runtime cached");
271 }
272 Ok(s) => {
273 eprintln!("[noether] nix warmup: exited with {s} (non-fatal)");
274 }
275 Err(e) => {
276 eprintln!("[noether] nix warmup: failed to spawn ({e}) (non-fatal)");
277 }
278 }
279 })
280 }
281
282 fn code_hash(code: &str) -> String {
286 hex::encode(Sha256::digest(code.as_bytes()))
287 }
288
289 fn ensure_script(
292 &self,
293 impl_hash: &str,
294 code: &str,
295 language: &str,
296 ) -> Result<PathBuf, ExecutionError> {
297 let ext = match language {
298 "javascript" | "js" => "js",
299 "bash" | "sh" => "sh",
300 _ => "py",
301 };
302
303 let path = self.cache_dir.join(format!("{impl_hash}.{ext}"));
304 if path.exists() {
305 return Ok(path);
306 }
307
308 let wrapped = match language {
309 "javascript" | "js" => Self::wrap_javascript(code),
310 "bash" | "sh" => Self::wrap_bash(code),
311 _ => Self::wrap_python(code),
312 };
313
314 std::fs::write(&path, &wrapped).map_err(|e| ExecutionError::StageFailed {
315 stage_id: StageId(impl_hash.into()),
316 message: format!("failed to write stage script: {e}"),
317 })?;
318
319 Ok(path)
320 }
321
322 fn run_script(
325 &self,
326 stage_id: &StageId,
327 script: &Path,
328 language: &str,
329 input: &Value,
330 ) -> Result<Value, ExecutionError> {
331 let input_json = serde_json::to_string(input).unwrap_or_default();
332
333 let code = self
334 .implementations
335 .get(&stage_id.0)
336 .map(|i| i.code.as_str())
337 .unwrap_or("");
338
339 let (nix_subcommand, args) = self.build_nix_command(language, script, code);
340
341 let raw_argv: Vec<String> = if nix_subcommand == "__direct__" {
346 args.clone()
347 } else {
348 let mut v = vec![self.nix_bin.display().to_string(), nix_subcommand.clone()];
349 v.push("--no-write-lock-file".into());
350 v.push("--quiet".into());
351 v.extend(args.iter().cloned());
352 v
353 };
354
355 let mut spawn = match &self.config.isolation {
356 super::isolation::IsolationBackend::None => {
357 let mut cmd = Command::new(&raw_argv[0]);
359 cmd.args(&raw_argv[1..]);
360 cmd
361 }
362 super::isolation::IsolationBackend::Bwrap { bwrap_path } => {
363 let mut policy = super::isolation::IsolationPolicy::from_effects(
367 self.implementations
368 .get(&stage_id.0)
369 .map(|i| &i.effects)
370 .unwrap_or(&noether_core::effects::EffectSet::pure()),
371 );
372 policy.ro_binds.push(noether_isolation::RoBind::new(
377 self.cache_dir.to_path_buf(),
378 self.cache_dir.to_path_buf(),
379 ));
380 if !self.nix_bin.starts_with("/nix/store")
405 && !self.nix_bin.starts_with(&self.cache_dir)
406 {
407 return Err(ExecutionError::StageFailed {
408 stage_id: stage_id.clone(),
409 message: format!(
410 "stage isolation is enabled but nix is installed at \
411 {} (outside /nix/store). A distro-packaged nix is \
412 dynamically linked against host libraries; binding \
413 those into the sandbox would defeat isolation. \
414 Install nix via the Determinate / upstream \
415 installer (places nix under /nix/store) or pass \
416 --isolate=none to run without the sandbox.",
417 self.nix_bin.display()
418 ),
419 });
420 }
421 super::isolation::build_bwrap_command(bwrap_path, &policy, &raw_argv)
425 }
426 };
427
428 let mut child = spawn
429 .stdin(Stdio::piped())
430 .stdout(Stdio::piped())
431 .stderr(Stdio::piped())
432 .spawn()
433 .map_err(|e| ExecutionError::StageFailed {
434 stage_id: stage_id.clone(),
435 message: format!("failed to spawn process: {e}"),
436 })?;
437 let _ = raw_argv;
438
439 if let Some(mut stdin) = child.stdin.take() {
442 let bytes = input_json.into_bytes();
443 std::thread::spawn(move || {
444 let _ = stdin.write_all(&bytes);
445 });
446 }
447
448 let pid = child.id();
450 let timeout = Duration::from_secs(self.config.timeout_secs);
451 let (tx, rx) = mpsc::channel();
452 std::thread::spawn(move || {
453 let _ = tx.send(child.wait_with_output());
454 });
455
456 let out = match rx.recv_timeout(timeout) {
457 Ok(Ok(o)) => o,
458 Ok(Err(e)) => {
459 return Err(ExecutionError::StageFailed {
460 stage_id: stage_id.clone(),
461 message: format!("nix process error: {e}"),
462 });
463 }
464 Err(_elapsed) => {
465 let _ = Command::new("kill").args(["-9", &pid.to_string()]).status();
467 return Err(ExecutionError::TimedOut {
468 stage_id: stage_id.clone(),
469 timeout_secs: self.config.timeout_secs,
470 });
471 }
472 };
473
474 let stderr_raw = &out.stderr[..out.stderr.len().min(self.config.max_stderr_bytes)];
476 let stderr = String::from_utf8_lossy(stderr_raw);
477
478 if !out.status.success() {
479 return Err(ExecutionError::StageFailed {
480 stage_id: stage_id.clone(),
481 message: Self::classify_error(&stderr, out.status.code()),
482 });
483 }
484
485 let stdout_raw = &out.stdout[..out.stdout.len().min(self.config.max_output_bytes)];
487 let stdout = String::from_utf8_lossy(stdout_raw);
488
489 if stdout_raw.len() == self.config.max_output_bytes && !out.stdout.is_empty() {
490 return Err(ExecutionError::StageFailed {
491 stage_id: stage_id.clone(),
492 message: format!(
493 "stage output exceeded {} bytes limit",
494 self.config.max_output_bytes
495 ),
496 });
497 }
498
499 serde_json::from_str(stdout.trim()).map_err(|e| ExecutionError::StageFailed {
500 stage_id: stage_id.clone(),
501 message: format!("failed to parse stage output as JSON: {e} (got: {stdout:?})"),
502 })
503 }
504
505 fn classify_error(stderr: &str, exit_code: Option<i32>) -> String {
508 if stderr.contains("cannot connect to nix daemon")
510 || stderr.contains("Cannot connect to the Nix daemon")
511 {
512 return "nix daemon is not running — start it with `sudo systemctl start nix-daemon` \
513 or `nix daemon`"
514 .to_string();
515 }
516 if stderr.contains("error: flake") || stderr.contains("error: getting flake") {
517 return format!(
518 "nix flake error (check network / nixpkgs access): {}",
519 first_line(stderr)
520 );
521 }
522 if stderr.contains("error: downloading") || stderr.contains("error: fetching") {
523 return format!(
524 "nix failed to fetch runtime package (check network): {}",
525 first_line(stderr)
526 );
527 }
528 if stderr.contains("out of disk space") || stderr.contains("No space left on device") {
529 return "nix store out of disk space — run `nix-collect-garbage -d` to free space"
530 .to_string();
531 }
532 if stderr.contains("nix: command not found") || stderr.contains("No such file") {
533 return "nix binary not found — is Nix installed?".to_string();
534 }
535 let code_str = exit_code
537 .map(|c| format!(" (exit {c})"))
538 .unwrap_or_default();
539 if stderr.trim().is_empty() {
540 format!("stage exited without output{code_str}")
541 } else {
542 format!("stage error{code_str}: {stderr}")
543 }
544 }
545
546 fn build_nix_command(
552 &self,
553 language: &str,
554 script: &Path,
555 code: &str,
556 ) -> (String, Vec<String>) {
557 let script_path = script.to_str().unwrap_or("/dev/null").to_string();
558
559 match language {
560 "python" | "python3" | "" => {
561 if let Some(reqs) = Self::extract_pip_requirements(code) {
565 let venv_hash = {
566 use sha2::{Digest, Sha256};
567 let h = Sha256::digest(reqs.as_bytes());
568 hex::encode(&h[..8])
569 };
570 let venv_dir = self.cache_dir.join(format!("venv-{venv_hash}"));
571 let venv_str = venv_dir.to_string_lossy().to_string();
572 let python = venv_dir.join("bin").join("python3");
573 let python_str = python.to_string_lossy().to_string();
574
575 if !python.exists() {
577 let setup = std::process::Command::new("python3")
578 .args(["-m", "venv", &venv_str])
579 .output();
580 if let Ok(out) = setup {
581 if out.status.success() {
582 let pip = venv_dir.join("bin").join("pip");
583 let pkgs: Vec<&str> = reqs.split(", ").collect();
584 let mut pip_args =
585 vec!["install", "--quiet", "--disable-pip-version-check"];
586 pip_args.extend(pkgs);
587 let _ = std::process::Command::new(pip.to_string_lossy().as_ref())
588 .args(&pip_args)
589 .output();
590 }
591 }
592 }
593
594 return ("__direct__".to_string(), vec![python_str, script_path]);
596 }
597
598 let extra_pkgs = Self::detect_python_packages(code);
599 if extra_pkgs.is_empty() {
600 (
601 "run".to_string(),
602 vec!["nixpkgs#python3".into(), "--".into(), script_path],
603 )
604 } else {
605 let mut args: Vec<String> = extra_pkgs
606 .iter()
607 .map(|pkg| format!("nixpkgs#python3Packages.{pkg}"))
608 .collect();
609 args.extend_from_slice(&["--command".into(), "python3".into(), script_path]);
610 ("shell".to_string(), args)
611 }
612 }
613 "javascript" | "js" => (
614 "run".to_string(),
615 vec!["nixpkgs#nodejs".into(), "--".into(), script_path],
616 ),
617 _ => (
618 "run".to_string(),
619 vec!["nixpkgs#bash".into(), "--".into(), script_path],
620 ),
621 }
622 }
623
624 fn extract_pip_requirements(code: &str) -> Option<String> {
638 for line in code.lines() {
639 let trimmed = line.trim();
640 let Some(reqs_raw) = trimmed.strip_prefix("# requires:") else {
641 continue;
642 };
643 let reqs = reqs_raw.trim();
644 if reqs.is_empty() {
645 continue;
646 }
647 let valid: Vec<String> = reqs
648 .split(',')
649 .map(|s| s.trim())
650 .filter(|s| !s.is_empty())
651 .filter(|s| match validate_pip_spec(s) {
652 Ok(()) => true,
653 Err(reason) => {
654 eprintln!(
655 "[noether] rejected `# requires:` entry {s:?} ({reason}); skipping"
656 );
657 false
658 }
659 })
660 .map(|s| s.to_string())
661 .collect();
662
663 if valid.is_empty() {
664 eprintln!(
665 "[noether] all `# requires:` entries rejected (raw={reqs:?}); falling back to default Nix runtime"
666 );
667 return None;
668 }
669 return Some(valid.join(", "));
670 }
671 None
672 }
673
674 fn detect_python_packages(code: &str) -> Vec<&'static str> {
677 const KNOWN: &[(&str, &str)] = &[
679 ("requests", "requests"),
680 ("httpx", "httpx"),
681 ("aiohttp", "aiohttp"),
682 ("bs4", "beautifulsoup4"),
683 ("lxml", "lxml"),
684 ("pandas", "pandas"),
685 ("numpy", "numpy"),
686 ("scipy", "scipy"),
687 ("sklearn", "scikit-learn"),
688 ("PIL", "Pillow"),
689 ("cv2", "opencv4"),
690 ("yaml", "pyyaml"),
691 ("toml", "toml"),
692 ("dateutil", "python-dateutil"),
693 ("pytz", "pytz"),
694 ("boto3", "boto3"),
695 ("psycopg2", "psycopg2"),
696 ("pymongo", "pymongo"),
697 ("redis", "redis"),
698 ("celery", "celery"),
699 ("fastapi", "fastapi"),
700 ("pydantic", "pydantic"),
701 ("cryptography", "cryptography"),
702 ("jwt", "pyjwt"),
703 ("paramiko", "paramiko"),
704 ("dotenv", "python-dotenv"),
705 ("joblib", "joblib"),
706 ("torch", "pytorch"),
707 ("transformers", "transformers"),
708 ("datasets", "datasets"),
709 ("pyarrow", "pyarrow"),
710 ];
711
712 let mut found: Vec<&'static str> = Vec::new();
713 for (import_name, nix_name) in KNOWN {
714 let patterns = [
715 format!("import {import_name}"),
716 format!("import {import_name} "),
717 format!("from {import_name} "),
718 format!("from {import_name}."),
719 ];
720 if patterns.iter().any(|p| code.contains(p.as_str())) {
721 found.push(nix_name);
722 }
723 }
724 found
725 }
726
727 #[cfg(test)]
730 #[allow(dead_code)]
731 fn _expose_extract_future_imports(code: &str) -> (String, String) {
732 Self::extract_future_imports(code)
733 }
734
735 fn extract_future_imports(code: &str) -> (String, String) {
741 let mut hoisted = String::new();
742 let mut remaining = String::new();
743 for line in code.lines() {
744 let trimmed = line.trim_start();
745 if !line.starts_with(' ')
746 && !line.starts_with('\t')
747 && trimmed.starts_with("from __future__ import")
748 {
749 hoisted.push_str(line);
750 hoisted.push('\n');
751 } else {
752 remaining.push_str(line);
753 remaining.push('\n');
754 }
755 }
756 (hoisted, remaining)
757 }
758
759 fn wrap_python(user_code: &str) -> String {
760 let pip_install = String::new();
764
765 let (future_imports, user_code_clean) = Self::extract_future_imports(user_code);
773
774 format!(
775 r#"{future_imports}import sys, json as _json
776{pip_install}
777# ---- user implementation ----
778{user_code_clean}
779# ---- end implementation ----
780
781if __name__ == '__main__':
782 if 'execute' not in dir() or not callable(globals().get('execute')):
783 print(
784 "Noether stage error: implementation must define a top-level "
785 "function `def execute(input): ...` that takes the parsed input dict "
786 "and returns the output dict. Do not read from stdin or print to stdout — "
787 "the Noether runtime handles I/O for you.",
788 file=sys.stderr,
789 )
790 sys.exit(1)
791 try:
792 _raw = _json.loads(sys.stdin.read())
793 # If the runtime passed input as a JSON-encoded string, decode it once more.
794 # This happens when input arrives as null or a bare string from the CLI.
795 if isinstance(_raw, str):
796 try:
797 _raw = _json.loads(_raw)
798 except Exception:
799 pass
800 _output = execute(_raw if _raw is not None else {{}})
801 print(_json.dumps(_output))
802 except Exception as _e:
803 print(str(_e), file=sys.stderr)
804 sys.exit(1)
805"#
806 )
807 }
808
809 fn wrap_javascript(user_code: &str) -> String {
810 format!(
811 r#"const _readline = require('readline');
812let _input = '';
813process.stdin.on('data', d => _input += d);
814process.stdin.on('end', () => {{
815 try {{
816 // ---- user implementation ----
817 {user_code}
818 // ---- end implementation ----
819 const _result = execute(JSON.parse(_input));
820 process.stdout.write(JSON.stringify(_result) + '\n');
821 }} catch (e) {{
822 process.stderr.write(String(e) + '\n');
823 process.exit(1);
824 }}
825}});
826"#
827 )
828 }
829
830 fn wrap_bash(user_code: &str) -> String {
831 format!(
832 r#"#!/usr/bin/env bash
833set -euo pipefail
834INPUT=$(cat)
835
836# ---- user implementation ----
837{user_code}
838# ---- end implementation ----
839
840execute "$INPUT"
841"#
842 )
843 }
844}
845
/// First non-blank line of `s`, trimmed; returns `s` unchanged when every line
/// is blank (including the empty string).
fn first_line(s: &str) -> &str {
    for line in s.lines() {
        let line = line.trim();
        if !line.is_empty() {
            return line;
        }
    }
    s
}
855
/// Check that one `# requires:` entry is a safe pip requirement.
///
/// Accepted forms are `name==version`, or a bare `name` when
/// `NOETHER_ALLOW_UNPINNED_PIP` is `1`/`true`/`yes`/`on`.
///
/// The whitelist is deliberately narrow because each entry becomes a separate
/// `pip install` argument:
/// - names may only contain `[A-Za-z0-9._-]` and must start and end with a
///   letter or digit (the PEP 508 name rule);
/// - versions may only contain `[A-Za-z0-9.+!-]` and must start with a letter
///   or digit.
///
/// Requiring an alphanumeric first character stops entries such as
/// `-ievil.example==1` or `--pre==1` from being parsed by pip as command-line
/// options.
///
/// # Errors
/// A short static reason describing why `spec` was rejected.
fn validate_pip_spec(spec: &str) -> Result<(), &'static str> {
    let allow_unpinned = matches!(
        std::env::var("NOETHER_ALLOW_UNPINNED_PIP").as_deref(),
        Ok("1" | "true" | "yes" | "on")
    );

    let (name, version) = match spec.split_once("==") {
        Some((n, v)) => (n.trim(), Some(v.trim())),
        None => {
            if !allow_unpinned {
                return Err("unpinned; use pkg==version or set NOETHER_ALLOW_UNPINNED_PIP=1");
            }
            (spec.trim(), None)
        }
    };

    if name.is_empty() {
        return Err("empty package name");
    }
    if !name
        .bytes()
        .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'_' | b'-' | b'.'))
    {
        return Err("package name contains disallowed characters");
    }
    let name_bytes = name.as_bytes();
    let edges_ok = matches!(
        (name_bytes.first(), name_bytes.last()),
        (Some(first), Some(last)) if first.is_ascii_alphanumeric() && last.is_ascii_alphanumeric()
    );
    if !edges_ok {
        return Err("package name must start and end with a letter or digit");
    }
    if let Some(v) = version {
        if v.is_empty() {
            return Err("empty version after `==`");
        }
        if !v
            .bytes()
            .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'.' | b'+' | b'!' | b'-'))
        {
            return Err("version contains disallowed characters");
        }
        if !v.as_bytes()[0].is_ascii_alphanumeric() {
            return Err("version must start with a letter or digit");
        }
    }
    Ok(())
}
903
904impl StageExecutor for NixExecutor {
907 fn execute(&self, stage_id: &StageId, input: &Value) -> Result<Value, ExecutionError> {
908 let impl_ = self
909 .implementations
910 .get(&stage_id.0)
911 .ok_or_else(|| ExecutionError::StageNotFound(stage_id.clone()))?;
912
913 let code_hash = Self::code_hash(&impl_.code);
914 let script = self.ensure_script(&code_hash, &impl_.code, &impl_.language)?;
915 self.run_script(stage_id, &script, &impl_.language, input)
916 }
917}
918
#[cfg(test)]
mod tests {
    use super::*;

    /// Executor with no registered stages, the default config and a cache
    /// directory under the system temp dir. Uses the discovered nix binary,
    /// falling back to `/usr/bin/nix` so the struct can be built without nix.
    #[allow(dead_code)] fn make_executor() -> NixExecutor {
        let nix_bin = NixExecutor::find_nix().unwrap_or_else(|| PathBuf::from("/usr/bin/nix"));
        let cache_dir = std::env::temp_dir().join("noether-test-impl-cache");
        let _ = std::fs::create_dir_all(&cache_dir);
        NixExecutor {
            nix_bin,
            cache_dir,
            config: NixConfig::default(),
            implementations: HashMap::new(),
        }
    }

    // Effects given to register_with_effects must be stored verbatim, because
    // run_script derives the bwrap policy from them.
    #[test]
    fn register_with_effects_preserves_network_effect() {
        use noether_core::effects::{Effect, EffectSet};
        let mut exec = make_executor();
        let id = StageId("sig_network".into());
        let effects = EffectSet::new([Effect::Pure, Effect::Network]);
        exec.register_with_effects(&id, "code", "python", effects.clone());
        let stored = exec
            .implementations
            .get(&id.0)
            .expect("stage should be registered");
        assert_eq!(
            stored.effects, effects,
            "declared effects must survive register_with_effects"
        );
        assert!(
            stored.effects.iter().any(|e| matches!(e, Effect::Network)),
            "Network must be preserved so the sandbox opens the net ns"
        );
    }

    // Well-formed `name==version` pins, including local `+` versions, pass.
    #[test]
    fn validate_pip_spec_accepts_pinned() {
        assert!(validate_pip_spec("pandas==2.0.0").is_ok());
        assert!(validate_pip_spec("scikit-learn==1.5.1").is_ok());
        assert!(validate_pip_spec("urllib3==2.2.3").is_ok());
        assert!(validate_pip_spec("pydantic==2.5.0+cu121").is_ok());
    }

    // With NOETHER_ALLOW_UNPINNED_PIP cleared, a bare package name is rejected.
    // The previous value is restored before asserting, so a failing assertion
    // does not leave the environment modified.
    #[test]
    fn validate_pip_spec_rejects_unpinned_by_default() {
        // Saved value of the opt-in variable (None if it was unset).
        let guard = (std::env::var_os("NOETHER_ALLOW_UNPINNED_PIP"),);
        // SAFETY: NOTE(review): env mutation is only sound if no other thread
        // touches the environment concurrently; other tests in this module call
        // validate_pip_spec, which reads this variable — confirm acceptable.
        unsafe {
            std::env::remove_var("NOETHER_ALLOW_UNPINNED_PIP");
        }
        let result = validate_pip_spec("pandas");
        if let (Some(prev),) = guard {
            // SAFETY: same caveat as above; restores the saved value.
            unsafe {
                std::env::set_var("NOETHER_ALLOW_UNPINNED_PIP", prev);
            }
        }
        assert!(result.is_err(), "bare name must be rejected without opt-in");
    }

    // Shell metacharacters, URLs, VCS specs and embedded spaces never pass.
    #[test]
    fn validate_pip_spec_rejects_shell_metacharacters() {
        for bad in [
            "pandas; rm -rf /",
            "pandas==$(whoami)",
            "pandas==1.0.0; echo pwned",
            "pandas==`id`",
            "https://evil.example/wheel.whl",
            "git+https://example.com/repo.git",
            "pkg with space==1.0",
            "pkg==1.0 && echo",
        ] {
            assert!(validate_pip_spec(bad).is_err(), "should reject {bad:?}");
        }
    }

    // An empty name or an empty version around `==` is rejected.
    #[test]
    fn validate_pip_spec_rejects_empty() {
        assert!(validate_pip_spec("==1.0").is_err());
        assert!(validate_pip_spec("pkg==").is_err());
    }

    #[test]
    fn detect_python_packages_requests() {
        let code = "import requests\ndef execute(v):\n return requests.get(v).json()";
        let pkgs = NixExecutor::detect_python_packages(code);
        assert!(
            pkgs.contains(&"requests"),
            "expected 'requests' in {pkgs:?}"
        );
    }

    // Stdlib-only imports must not add any nixpkgs packages.
    #[test]
    fn detect_python_packages_stdlib_only() {
        let code = "import urllib.request, json\ndef execute(v):\n return json.loads(v)";
        let pkgs = NixExecutor::detect_python_packages(code);
        assert!(
            pkgs.is_empty(),
            "stdlib imports should not trigger packages: {pkgs:?}"
        );
    }

    // `import x`, `import x as y` and `from x import y` forms are all detected;
    // `bs4` maps to the `beautifulsoup4` attribute.
    #[test]
    fn detect_python_packages_multiple() {
        let code = "import pandas\nimport numpy as np\nfrom bs4 import BeautifulSoup\ndef execute(v): pass";
        let pkgs = NixExecutor::detect_python_packages(code);
        assert!(pkgs.contains(&"pandas"));
        assert!(pkgs.contains(&"numpy"));
        assert!(pkgs.contains(&"beautifulsoup4"));
    }

    /// Executor with fixed, possibly non-existent paths; used by tests that
    /// only inspect generated commands and never run anything.
    fn test_executor() -> NixExecutor {
        NixExecutor {
            nix_bin: PathBuf::from("/usr/bin/nix"),
            cache_dir: PathBuf::from("/tmp/noether-test-cache"),
            config: NixConfig::default(),
            implementations: HashMap::new(),
        }
    }

    // No third-party imports: plain `nix run nixpkgs#python3`.
    #[test]
    fn build_nix_command_no_packages() {
        let exec = test_executor();
        let (sub, args) = exec.build_nix_command("python", Path::new("/tmp/x.py"), "import json");
        assert_eq!(sub, "run");
        assert!(args.iter().any(|a| a.contains("python3")));
        assert!(!args.iter().any(|a| a.contains("shell")));
    }

    // A known import switches to `nix shell python3Packages.<pkg> --command`.
    #[test]
    fn build_nix_command_with_requests() {
        let exec = test_executor();
        let (sub, args) =
            exec.build_nix_command("python", Path::new("/tmp/x.py"), "import requests");
        assert_eq!(sub, "shell");
        assert!(args.iter().any(|a| a.contains("python3Packages.requests")));
        assert!(args.iter().any(|a| a == "--command"));
        assert!(
            !args.iter().any(|a| a == "nixpkgs#python3"),
            "bare python3 conflicts: {args:?}"
        );
    }

    #[test]
    fn python_wrapper_contains_boilerplate() {
        let wrapped = NixExecutor::wrap_python("def execute(x):\n return x + 1");
        assert!(wrapped.contains("sys.stdin.read()"));
        assert!(wrapped.contains("_json.dumps(_output)"));
        assert!(wrapped.contains("def execute(x)"));
    }

    #[test]
    fn code_hash_is_stable() {
        let h1 = NixExecutor::code_hash("hello world");
        let h2 = NixExecutor::code_hash("hello world");
        let h3 = NixExecutor::code_hash("different");
        assert_eq!(h1, h2);
        assert_ne!(h1, h3);
    }

    #[test]
    fn classify_error_daemon_not_running() {
        let msg = NixExecutor::classify_error("error: cannot connect to nix daemon", Some(1));
        assert!(msg.contains("nix daemon is not running"), "got: {msg}");
    }

    // Python requires `from __future__` imports before other statements, so the
    // wrapper must place them ahead of its own `import sys, json` line.
    #[test]
    fn future_imports_are_hoisted_out_of_user_code() {
        let user = "from __future__ import annotations\nimport json\n\ndef execute(input):\n return input\n";
        let wrapped = NixExecutor::wrap_python(user);
        let future_pos = wrapped
            .find("from __future__ import annotations")
            .expect("future import should be present in wrapper");
        let stdlib_pos = wrapped
            .find("import sys, json as _json")
            .expect("stdlib imports should be present");
        assert!(
            future_pos < stdlib_pos,
            "future import must precede stdlib imports in wrapped output"
        );
    }

    #[test]
    fn user_code_without_future_imports_is_unchanged() {
        let user = "import json\n\ndef execute(input):\n return input\n";
        let (hoisted, remaining) = NixExecutor::extract_future_imports(user);
        assert_eq!(hoisted, "");
        assert_eq!(remaining.trim(), user.trim());
    }

    // Only unindented future imports are hoisted; one inside a function body
    // stays where it is.
    #[test]
    fn nested_future_import_inside_function_is_not_hoisted() {
        let user =
            "def execute(input):\n from __future__ import annotations\n return input\n";
        let (hoisted, _) = NixExecutor::extract_future_imports(user);
        assert_eq!(hoisted, "");
    }

    // Unrecognised stderr is passed through together with the exit code.
    #[test]
    fn classify_error_user_code_exit1() {
        let msg = NixExecutor::classify_error("ValueError: invalid input", Some(1));
        assert!(msg.contains("ValueError"), "got: {msg}");
        assert!(msg.contains("exit 1"), "got: {msg}");
    }

    #[test]
    fn classify_error_disk_full() {
        let msg = NixExecutor::classify_error("No space left on device", Some(1));
        assert!(msg.contains("disk space"), "got: {msg}");
    }

    // Blank stderr still reports the exit code (137 here).
    #[test]
    fn classify_error_empty_stderr() {
        let msg = NixExecutor::classify_error("", Some(137));
        assert!(msg.contains("exit 137"), "got: {msg}");
    }

    #[test]
    fn nix_config_defaults() {
        let cfg = NixConfig::default();
        assert_eq!(cfg.timeout_secs, 30);
        assert_eq!(cfg.max_output_bytes, 10 * 1024 * 1024);
        assert_eq!(cfg.max_stderr_bytes, 64 * 1024);
    }

    #[test]
    fn first_line_extracts_correctly() {
        assert_eq!(first_line(" \nfoo\nbar"), "foo");
        assert_eq!(first_line("single"), "single");
        assert_eq!(first_line(""), "");
    }

    // End-to-end: an identity Python stage run through real nix echoes its input.
    #[test]
    #[ignore = "requires nix + warm binary cache; run manually with `cargo test -- --ignored`"]
    fn nix_python_identity_stage() {
        let nix_bin = match NixExecutor::find_nix() {
            Some(p) => p,
            None => {
                eprintln!("nix not found, skipping");
                return;
            }
        };

        let cache_dir = std::env::temp_dir().join("noether-nix-integ");
        let _ = std::fs::create_dir_all(&cache_dir);

        let code = "def execute(x):\n return x";
        let executor = NixExecutor {
            nix_bin,
            cache_dir,
            config: NixConfig::default(),
            implementations: {
                let mut m = HashMap::new();
                let id = StageId("test_identity".into());
                m.insert(
                    id.0.clone(),
                    StageImpl {
                        code: code.into(),
                        language: "python".into(),
                        effects: noether_core::effects::EffectSet::pure(),
                    },
                );
                m
            },
        };

        let id = StageId("test_identity".into());
        let result = executor.execute(&id, &serde_json::json!({"hello": "world"}));
        assert_eq!(result.unwrap(), serde_json::json!({"hello": "world"}));
    }

    // End-to-end: a stage that sleeps past a 2 s timeout yields TimedOut
    // carrying that timeout value.
    #[test]
    #[ignore = "requires nix + warm binary cache; run manually with `cargo test -- --ignored`"]
    fn nix_timeout_kills_hanging_stage() {
        let nix_bin = match NixExecutor::find_nix() {
            Some(p) => p,
            None => {
                eprintln!("nix not found, skipping timeout test");
                return;
            }
        };

        let cache_dir = std::env::temp_dir().join("noether-nix-timeout");
        let _ = std::fs::create_dir_all(&cache_dir);

        let code = "import time\ndef execute(x):\n time.sleep(9999)\n return x";
        let executor = NixExecutor {
            nix_bin,
            cache_dir,
            config: NixConfig {
                timeout_secs: 2,
                ..NixConfig::default()
            },
            implementations: {
                let mut m = HashMap::new();
                let id = StageId("hanging".into());
                m.insert(
                    id.0.clone(),
                    StageImpl {
                        code: code.into(),
                        language: "python".into(),
                        effects: noether_core::effects::EffectSet::pure(),
                    },
                );
                m
            },
        };

        let id = StageId("hanging".into());
        let result = executor.execute(&id, &serde_json::json!(null));
        assert!(
            matches!(
                result,
                Err(ExecutionError::TimedOut {
                    timeout_secs: 2,
                    ..
                })
            ),
            "expected TimedOut, got: {result:?}"
        );
    }
}