1use super::{ExecutionError, StageExecutor};
46use noether_core::stage::StageId;
47use serde_json::Value;
48use sha2::{Digest, Sha256};
49use std::collections::HashMap;
50use std::io::Write as IoWrite;
51use std::path::{Path, PathBuf};
52use std::process::{Command, Stdio};
53use std::sync::mpsc;
54use std::time::Duration;
55
/// Tunable limits and sandbox selection for [`NixExecutor`].
#[derive(Debug, Clone)]
pub struct NixConfig {
    /// Wall-clock budget, in seconds, that `run_script` waits for the stage
    /// process before killing it and reporting `ExecutionError::TimedOut`.
    pub timeout_secs: u64,
    /// Upper bound on the number of stdout bytes accepted from a stage;
    /// larger output is reported as a stage failure.
    pub max_output_bytes: usize,
    /// Number of leading stderr bytes kept when building error messages;
    /// anything beyond this is dropped.
    pub max_stderr_bytes: usize,
    /// Which isolation backend (none, or a bwrap sandbox) wraps the stage
    /// process when it is spawned.
    pub isolation: super::isolation::IsolationBackend,
}
79
80impl Default for NixConfig {
81 fn default() -> Self {
82 Self {
83 timeout_secs: 30,
84 max_output_bytes: 10 * 1024 * 1024,
85 max_stderr_bytes: 64 * 1024,
86 isolation: super::isolation::IsolationBackend::None,
87 }
88 }
89}
90
91impl NixConfig {
92 pub fn with_isolation(mut self, backend: super::isolation::IsolationBackend) -> Self {
94 self.isolation = backend;
95 self
96 }
97}
98
/// Source code and metadata for one registered stage implementation.
#[derive(Clone)]
struct StageImpl {
    /// The user-supplied implementation source, before any wrapper is added.
    code: String,
    /// Language tag used to pick the wrapper, file extension and interpreter.
    language: String,
    /// Declared effects; consulted when building the sandbox policy.
    effects: noether_core::effects::EffectSet,
}
110
/// Executes stage implementations as subprocesses, normally via `nix run` /
/// `nix shell`, optionally inside a bwrap sandbox.
pub struct NixExecutor {
    /// Path to the `nix` executable used to launch stage interpreters.
    nix_bin: PathBuf,
    /// Directory holding the wrapped stage scripts and pip virtualenvs.
    cache_dir: PathBuf,
    /// Timeout, output limits and isolation backend.
    config: NixConfig,
    /// Registered implementations keyed by the stage id string.
    implementations: HashMap<String, StageImpl>,
}
133
134impl NixExecutor {
135 pub fn find_nix() -> Option<PathBuf> {
138 let determinate = PathBuf::from("/nix/var/nix/profiles/default/bin/nix");
140 if determinate.exists() {
141 return Some(determinate);
142 }
143
144 let path_env = std::env::var_os("PATH")?;
148 for dir in std::env::split_paths(&path_env) {
149 let candidate = dir.join("nix");
150 if candidate.is_file() {
151 return Some(candidate);
152 }
153 }
154 None
155 }
156
157 pub fn from_store(store: &dyn noether_store::StageStore) -> Option<Self> {
162 Self::from_store_with_config(store, NixConfig::default())
163 }
164
165 pub fn from_store_with_config(
167 store: &dyn noether_store::StageStore,
168 config: NixConfig,
169 ) -> Option<Self> {
170 let nix_bin = Self::find_nix()?;
171
172 let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".into());
173 let cache_dir = PathBuf::from(home).join(".noether").join("impl_cache");
174 let _ = std::fs::create_dir_all(&cache_dir);
175
176 let mut implementations = HashMap::new();
177 for stage in store.list(None) {
178 if let (Some(code), Some(lang)) =
179 (&stage.implementation_code, &stage.implementation_language)
180 {
181 implementations.insert(
182 stage.id.0.clone(),
183 StageImpl {
184 code: code.clone(),
185 language: lang.clone(),
186 effects: stage.signature.effects.clone(),
187 },
188 );
189 }
190 }
191
192 Some(Self {
193 nix_bin,
194 cache_dir,
195 config,
196 implementations,
197 })
198 }
199
200 pub fn config_snapshot(&self) -> NixConfig {
203 self.config.clone()
204 }
205
206 pub fn rebuild_with_config(mut self, config: NixConfig) -> Option<Self> {
211 self.config = config;
212 Some(self)
213 }
214
215 pub fn register_with_effects(
219 &mut self,
220 stage_id: &StageId,
221 code: &str,
222 language: &str,
223 effects: noether_core::effects::EffectSet,
224 ) {
225 self.implementations.insert(
226 stage_id.0.clone(),
227 StageImpl {
228 code: code.into(),
229 language: language.into(),
230 effects,
231 },
232 );
233 }
234
235 pub fn has_implementation(&self, stage_id: &StageId) -> bool {
237 self.implementations.contains_key(&stage_id.0)
238 }
239
240 pub fn warmup(&self) -> std::thread::JoinHandle<()> {
250 let nix_bin = self.nix_bin.clone();
251 std::thread::spawn(move || {
252 let status = Command::new(&nix_bin)
255 .args([
256 "build",
257 "--no-link",
258 "--quiet",
259 "--no-write-lock-file",
260 "nixpkgs#python3",
261 ])
262 .stdout(Stdio::null())
263 .stderr(Stdio::null())
264 .status();
265 match status {
266 Ok(s) if s.success() => {
267 eprintln!("[noether] nix warmup: python3 runtime cached");
268 }
269 Ok(s) => {
270 eprintln!("[noether] nix warmup: exited with {s} (non-fatal)");
271 }
272 Err(e) => {
273 eprintln!("[noether] nix warmup: failed to spawn ({e}) (non-fatal)");
274 }
275 }
276 })
277 }
278
279 fn code_hash(code: &str) -> String {
283 hex::encode(Sha256::digest(code.as_bytes()))
284 }
285
286 fn ensure_script(
289 &self,
290 impl_hash: &str,
291 code: &str,
292 language: &str,
293 ) -> Result<PathBuf, ExecutionError> {
294 let ext = match language {
295 "javascript" | "js" => "js",
296 "bash" | "sh" => "sh",
297 _ => "py",
298 };
299
300 let path = self.cache_dir.join(format!("{impl_hash}.{ext}"));
301 if path.exists() {
302 return Ok(path);
303 }
304
305 let wrapped = match language {
306 "javascript" | "js" => Self::wrap_javascript(code),
307 "bash" | "sh" => Self::wrap_bash(code),
308 _ => Self::wrap_python(code),
309 };
310
311 std::fs::write(&path, &wrapped).map_err(|e| ExecutionError::StageFailed {
312 stage_id: StageId(impl_hash.into()),
313 message: format!("failed to write stage script: {e}"),
314 })?;
315
316 Ok(path)
317 }
318
319 fn run_script(
322 &self,
323 stage_id: &StageId,
324 script: &Path,
325 language: &str,
326 input: &Value,
327 ) -> Result<Value, ExecutionError> {
328 let input_json = serde_json::to_string(input).unwrap_or_default();
329
330 let code = self
331 .implementations
332 .get(&stage_id.0)
333 .map(|i| i.code.as_str())
334 .unwrap_or("");
335
336 let (nix_subcommand, args) = self.build_nix_command(language, script, code);
337
338 let raw_argv: Vec<String> = if nix_subcommand == "__direct__" {
343 args.clone()
344 } else {
345 let mut v = vec![self.nix_bin.display().to_string(), nix_subcommand.clone()];
346 v.push("--no-write-lock-file".into());
347 v.push("--quiet".into());
348 v.extend(args.iter().cloned());
349 v
350 };
351
352 let mut spawn = match &self.config.isolation {
353 super::isolation::IsolationBackend::None => {
354 let mut cmd = Command::new(&raw_argv[0]);
356 cmd.args(&raw_argv[1..]);
357 cmd
358 }
359 super::isolation::IsolationBackend::Bwrap { bwrap_path } => {
360 let mut policy = super::isolation::IsolationPolicy::from_effects(
364 self.implementations
365 .get(&stage_id.0)
366 .map(|i| &i.effects)
367 .unwrap_or(&noether_core::effects::EffectSet::pure()),
368 );
369 policy
374 .ro_binds
375 .push((self.cache_dir.to_path_buf(), self.cache_dir.to_path_buf()));
376 if !self.nix_bin.starts_with("/nix/store")
401 && !self.nix_bin.starts_with(&self.cache_dir)
402 {
403 return Err(ExecutionError::StageFailed {
404 stage_id: stage_id.clone(),
405 message: format!(
406 "stage isolation is enabled but nix is installed at \
407 {} (outside /nix/store). A distro-packaged nix is \
408 dynamically linked against host libraries; binding \
409 those into the sandbox would defeat isolation. \
410 Install nix via the Determinate / upstream \
411 installer (places nix under /nix/store) or pass \
412 --isolate=none to run without the sandbox.",
413 self.nix_bin.display()
414 ),
415 });
416 }
417 super::isolation::build_bwrap_command(bwrap_path, &policy, &raw_argv)
421 }
422 };
423
424 let mut child = spawn
425 .stdin(Stdio::piped())
426 .stdout(Stdio::piped())
427 .stderr(Stdio::piped())
428 .spawn()
429 .map_err(|e| ExecutionError::StageFailed {
430 stage_id: stage_id.clone(),
431 message: format!("failed to spawn process: {e}"),
432 })?;
433 let _ = raw_argv;
434
435 if let Some(mut stdin) = child.stdin.take() {
438 let bytes = input_json.into_bytes();
439 std::thread::spawn(move || {
440 let _ = stdin.write_all(&bytes);
441 });
442 }
443
444 let pid = child.id();
446 let timeout = Duration::from_secs(self.config.timeout_secs);
447 let (tx, rx) = mpsc::channel();
448 std::thread::spawn(move || {
449 let _ = tx.send(child.wait_with_output());
450 });
451
452 let out = match rx.recv_timeout(timeout) {
453 Ok(Ok(o)) => o,
454 Ok(Err(e)) => {
455 return Err(ExecutionError::StageFailed {
456 stage_id: stage_id.clone(),
457 message: format!("nix process error: {e}"),
458 });
459 }
460 Err(_elapsed) => {
461 let _ = Command::new("kill").args(["-9", &pid.to_string()]).status();
463 return Err(ExecutionError::TimedOut {
464 stage_id: stage_id.clone(),
465 timeout_secs: self.config.timeout_secs,
466 });
467 }
468 };
469
470 let stderr_raw = &out.stderr[..out.stderr.len().min(self.config.max_stderr_bytes)];
472 let stderr = String::from_utf8_lossy(stderr_raw);
473
474 if !out.status.success() {
475 return Err(ExecutionError::StageFailed {
476 stage_id: stage_id.clone(),
477 message: Self::classify_error(&stderr, out.status.code()),
478 });
479 }
480
481 let stdout_raw = &out.stdout[..out.stdout.len().min(self.config.max_output_bytes)];
483 let stdout = String::from_utf8_lossy(stdout_raw);
484
485 if stdout_raw.len() == self.config.max_output_bytes && !out.stdout.is_empty() {
486 return Err(ExecutionError::StageFailed {
487 stage_id: stage_id.clone(),
488 message: format!(
489 "stage output exceeded {} bytes limit",
490 self.config.max_output_bytes
491 ),
492 });
493 }
494
495 serde_json::from_str(stdout.trim()).map_err(|e| ExecutionError::StageFailed {
496 stage_id: stage_id.clone(),
497 message: format!("failed to parse stage output as JSON: {e} (got: {stdout:?})"),
498 })
499 }
500
501 fn classify_error(stderr: &str, exit_code: Option<i32>) -> String {
504 if stderr.contains("cannot connect to nix daemon")
506 || stderr.contains("Cannot connect to the Nix daemon")
507 {
508 return "nix daemon is not running — start it with `sudo systemctl start nix-daemon` \
509 or `nix daemon`"
510 .to_string();
511 }
512 if stderr.contains("error: flake") || stderr.contains("error: getting flake") {
513 return format!(
514 "nix flake error (check network / nixpkgs access): {}",
515 first_line(stderr)
516 );
517 }
518 if stderr.contains("error: downloading") || stderr.contains("error: fetching") {
519 return format!(
520 "nix failed to fetch runtime package (check network): {}",
521 first_line(stderr)
522 );
523 }
524 if stderr.contains("out of disk space") || stderr.contains("No space left on device") {
525 return "nix store out of disk space — run `nix-collect-garbage -d` to free space"
526 .to_string();
527 }
528 if stderr.contains("nix: command not found") || stderr.contains("No such file") {
529 return "nix binary not found — is Nix installed?".to_string();
530 }
531 let code_str = exit_code
533 .map(|c| format!(" (exit {c})"))
534 .unwrap_or_default();
535 if stderr.trim().is_empty() {
536 format!("stage exited without output{code_str}")
537 } else {
538 format!("stage error{code_str}: {stderr}")
539 }
540 }
541
542 fn build_nix_command(
548 &self,
549 language: &str,
550 script: &Path,
551 code: &str,
552 ) -> (String, Vec<String>) {
553 let script_path = script.to_str().unwrap_or("/dev/null").to_string();
554
555 match language {
556 "python" | "python3" | "" => {
557 if let Some(reqs) = Self::extract_pip_requirements(code) {
561 let venv_hash = {
562 use sha2::{Digest, Sha256};
563 let h = Sha256::digest(reqs.as_bytes());
564 hex::encode(&h[..8])
565 };
566 let venv_dir = self.cache_dir.join(format!("venv-{venv_hash}"));
567 let venv_str = venv_dir.to_string_lossy().to_string();
568 let python = venv_dir.join("bin").join("python3");
569 let python_str = python.to_string_lossy().to_string();
570
571 if !python.exists() {
573 let setup = std::process::Command::new("python3")
574 .args(["-m", "venv", &venv_str])
575 .output();
576 if let Ok(out) = setup {
577 if out.status.success() {
578 let pip = venv_dir.join("bin").join("pip");
579 let pkgs: Vec<&str> = reqs.split(", ").collect();
580 let mut pip_args =
581 vec!["install", "--quiet", "--disable-pip-version-check"];
582 pip_args.extend(pkgs);
583 let _ = std::process::Command::new(pip.to_string_lossy().as_ref())
584 .args(&pip_args)
585 .output();
586 }
587 }
588 }
589
590 return ("__direct__".to_string(), vec![python_str, script_path]);
592 }
593
594 let extra_pkgs = Self::detect_python_packages(code);
595 if extra_pkgs.is_empty() {
596 (
597 "run".to_string(),
598 vec!["nixpkgs#python3".into(), "--".into(), script_path],
599 )
600 } else {
601 let mut args: Vec<String> = extra_pkgs
602 .iter()
603 .map(|pkg| format!("nixpkgs#python3Packages.{pkg}"))
604 .collect();
605 args.extend_from_slice(&["--command".into(), "python3".into(), script_path]);
606 ("shell".to_string(), args)
607 }
608 }
609 "javascript" | "js" => (
610 "run".to_string(),
611 vec!["nixpkgs#nodejs".into(), "--".into(), script_path],
612 ),
613 _ => (
614 "run".to_string(),
615 vec!["nixpkgs#bash".into(), "--".into(), script_path],
616 ),
617 }
618 }
619
620 fn extract_pip_requirements(code: &str) -> Option<String> {
634 for line in code.lines() {
635 let trimmed = line.trim();
636 if trimmed.starts_with("# requires:") {
637 let reqs = trimmed.strip_prefix("# requires:").unwrap().trim();
638 if reqs.is_empty() {
639 continue;
640 }
641 let valid: Vec<String> = reqs
642 .split(',')
643 .map(|s| s.trim())
644 .filter(|s| !s.is_empty())
645 .filter(|s| match validate_pip_spec(s) {
646 Ok(()) => true,
647 Err(reason) => {
648 eprintln!(
649 "[noether] rejected `# requires:` entry {s:?} ({reason}); skipping"
650 );
651 false
652 }
653 })
654 .map(|s| s.to_string())
655 .collect();
656
657 if valid.is_empty() {
658 eprintln!(
659 "[noether] all `# requires:` entries rejected (raw={reqs:?}); falling back to default Nix runtime"
660 );
661 return None;
662 }
663 return Some(valid.join(", "));
664 }
665 }
666 None
667 }
668
669 fn detect_python_packages(code: &str) -> Vec<&'static str> {
672 const KNOWN: &[(&str, &str)] = &[
674 ("requests", "requests"),
675 ("httpx", "httpx"),
676 ("aiohttp", "aiohttp"),
677 ("bs4", "beautifulsoup4"),
678 ("lxml", "lxml"),
679 ("pandas", "pandas"),
680 ("numpy", "numpy"),
681 ("scipy", "scipy"),
682 ("sklearn", "scikit-learn"),
683 ("PIL", "Pillow"),
684 ("cv2", "opencv4"),
685 ("yaml", "pyyaml"),
686 ("toml", "toml"),
687 ("dateutil", "python-dateutil"),
688 ("pytz", "pytz"),
689 ("boto3", "boto3"),
690 ("psycopg2", "psycopg2"),
691 ("pymongo", "pymongo"),
692 ("redis", "redis"),
693 ("celery", "celery"),
694 ("fastapi", "fastapi"),
695 ("pydantic", "pydantic"),
696 ("cryptography", "cryptography"),
697 ("jwt", "pyjwt"),
698 ("paramiko", "paramiko"),
699 ("dotenv", "python-dotenv"),
700 ("joblib", "joblib"),
701 ("torch", "pytorch"),
702 ("transformers", "transformers"),
703 ("datasets", "datasets"),
704 ("pyarrow", "pyarrow"),
705 ];
706
707 let mut found: Vec<&'static str> = Vec::new();
708 for (import_name, nix_name) in KNOWN {
709 let patterns = [
710 format!("import {import_name}"),
711 format!("import {import_name} "),
712 format!("from {import_name} "),
713 format!("from {import_name}."),
714 ];
715 if patterns.iter().any(|p| code.contains(p.as_str())) {
716 found.push(nix_name);
717 }
718 }
719 found
720 }
721
722 #[cfg(test)]
725 #[allow(dead_code)]
726 fn _expose_extract_future_imports(code: &str) -> (String, String) {
727 Self::extract_future_imports(code)
728 }
729
730 fn extract_future_imports(code: &str) -> (String, String) {
736 let mut hoisted = String::new();
737 let mut remaining = String::new();
738 for line in code.lines() {
739 let trimmed = line.trim_start();
740 if !line.starts_with(' ')
741 && !line.starts_with('\t')
742 && trimmed.starts_with("from __future__ import")
743 {
744 hoisted.push_str(line);
745 hoisted.push('\n');
746 } else {
747 remaining.push_str(line);
748 remaining.push('\n');
749 }
750 }
751 (hoisted, remaining)
752 }
753
    /// Wraps a user Python implementation in the runner script.
    ///
    /// Top-level `from __future__` imports are moved ahead of the wrapper's
    /// own `import` line. The generated script requires a callable top-level
    /// `execute`, reads JSON from stdin, decodes it a second time if it is a
    /// string, substitutes `{}` for `null`, calls `execute`, and prints the
    /// result as JSON. Any exception is written to stderr and the script
    /// exits with status 1.
    fn wrap_python(user_code: &str) -> String {
        // Always empty here; it only contributes a blank line to the template.
        let pip_install = String::new();

        let (future_imports, user_code_clean) = Self::extract_future_imports(user_code);

        // The template text is the script's runtime content; `{{}}` is an
        // escaped `{}` (an empty Python dict literal).
        format!(
            r#"{future_imports}import sys, json as _json
{pip_install}
# ---- user implementation ----
{user_code_clean}
# ---- end implementation ----

if __name__ == '__main__':
    if 'execute' not in dir() or not callable(globals().get('execute')):
        print(
            "Noether stage error: implementation must define a top-level "
            "function `def execute(input): ...` that takes the parsed input dict "
            "and returns the output dict. Do not read from stdin or print to stdout — "
            "the Noether runtime handles I/O for you.",
            file=sys.stderr,
        )
        sys.exit(1)
    try:
        _raw = _json.loads(sys.stdin.read())
        # If the runtime passed input as a JSON-encoded string, decode it once more.
        # This happens when input arrives as null or a bare string from the CLI.
        if isinstance(_raw, str):
            try:
                _raw = _json.loads(_raw)
            except Exception:
                pass
        _output = execute(_raw if _raw is not None else {{}})
        print(_json.dumps(_output))
    except Exception as _e:
        print(str(_e), file=sys.stderr)
        sys.exit(1)
"#
        )
    }
803
    /// Wraps a user JavaScript implementation in the runner script.
    ///
    /// The generated script buffers all of stdin, then, inside a `try`
    /// block, evaluates the user code, calls `execute` with the parsed JSON
    /// input and writes the JSON-encoded result plus a newline to stdout.
    /// Any thrown value is written to stderr and the process exits with
    /// status 1.
    fn wrap_javascript(user_code: &str) -> String {
        // `{{` / `}}` are escaped braces; the template text is the script's
        // runtime content. `_readline` is required but not otherwise used.
        format!(
            r#"const _readline = require('readline');
let _input = '';
process.stdin.on('data', d => _input += d);
process.stdin.on('end', () => {{
    try {{
        // ---- user implementation ----
        {user_code}
        // ---- end implementation ----
        const _result = execute(JSON.parse(_input));
        process.stdout.write(JSON.stringify(_result) + '\n');
    }} catch (e) {{
        process.stderr.write(String(e) + '\n');
        process.exit(1);
    }}
}});
"#
        )
    }
824
    /// Wraps a user bash implementation in the runner script.
    ///
    /// The generated script enables `set -euo pipefail`, reads all of stdin
    /// into `INPUT`, sources the user code inline, and then calls the
    /// user-defined `execute` with the raw input text as its only argument.
    fn wrap_bash(user_code: &str) -> String {
        // The template text is the script's runtime content.
        format!(
            r#"#!/usr/bin/env bash
set -euo pipefail
INPUT=$(cat)

# ---- user implementation ----
{user_code}
# ---- end implementation ----

execute "$INPUT"
"#
        )
    }
839}
840
/// Returns the first line of `s` that is non-empty after trimming (in its
/// trimmed form), or `s` itself when every line is blank.
fn first_line(s: &str) -> &str {
    for candidate in s.lines() {
        let candidate = candidate.trim();
        if !candidate.is_empty() {
            return candidate;
        }
    }
    s
}
850
/// Validates one `# requires:` entry before it is handed to `pip install`.
///
/// Accepted forms are `name==version` and, only when the environment
/// variable `NOETHER_ALLOW_UNPINNED_PIP` is `1`/`true`/`yes`/`on`, a bare
/// `name`.
///
/// * `name` may contain ASCII letters, digits, `_`, `-` and `.`, and must
///   start and end with a letter or digit. The boundary rule matters for
///   safety: the entry becomes a pip command-line argument, so a name such as
///   `--index-url` would otherwise be interpreted by pip as an option.
/// * `version` may contain ASCII letters, digits, `.`, `+`, `!` and `-`.
///
/// # Errors
/// Returns a short static reason describing why the entry was rejected.
fn validate_pip_spec(spec: &str) -> Result<(), &'static str> {
    let allow_unpinned = matches!(
        std::env::var("NOETHER_ALLOW_UNPINNED_PIP").as_deref(),
        Ok("1" | "true" | "yes" | "on")
    );

    let (name, version) = match spec.split_once("==") {
        Some((n, v)) => (n.trim(), Some(v.trim())),
        None => {
            if !allow_unpinned {
                return Err("unpinned; use pkg==version or set NOETHER_ALLOW_UNPINNED_PIP=1");
            }
            (spec.trim(), None)
        }
    };

    let name_bytes = name.as_bytes();
    let (Some(first), Some(last)) = (name_bytes.first(), name_bytes.last()) else {
        return Err("empty package name");
    };
    if !name_bytes
        .iter()
        .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'_' | b'-' | b'.'))
    {
        return Err("package name contains disallowed characters");
    }
    // Reject option-like names (`-r`, `--index-url`, ...) and other names
    // that cannot be valid distribution names.
    if !first.is_ascii_alphanumeric() || !last.is_ascii_alphanumeric() {
        return Err("package name must start and end with a letter or digit");
    }

    if let Some(v) = version {
        if v.is_empty() {
            return Err("empty version after `==`");
        }
        if !v
            .bytes()
            .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'.' | b'+' | b'!' | b'-'))
        {
            return Err("version contains disallowed characters");
        }
    }
    Ok(())
}
898
899impl StageExecutor for NixExecutor {
902 fn execute(&self, stage_id: &StageId, input: &Value) -> Result<Value, ExecutionError> {
903 let impl_ = self
904 .implementations
905 .get(&stage_id.0)
906 .ok_or_else(|| ExecutionError::StageNotFound(stage_id.clone()))?;
907
908 let code_hash = Self::code_hash(&impl_.code);
909 let script = self.ensure_script(&code_hash, &impl_.code, &impl_.language)?;
910 self.run_script(stage_id, &script, &impl_.language, input)
911 }
912}
913
// Unit tests for the Nix executor. Tests marked `#[ignore]` need a working
// nix installation and are run manually.
#[cfg(test)]
mod tests {
    use super::*;

    // Builds an executor with no registered stages, using the discovered nix
    // binary (or `/usr/bin/nix` as a placeholder) and a temp-dir cache.
    #[allow(dead_code)] fn make_executor() -> NixExecutor {
        let nix_bin = NixExecutor::find_nix().unwrap_or_else(|| PathBuf::from("/usr/bin/nix"));
        let cache_dir = std::env::temp_dir().join("noether-test-impl-cache");
        let _ = std::fs::create_dir_all(&cache_dir);
        NixExecutor {
            nix_bin,
            cache_dir,
            config: NixConfig::default(),
            implementations: HashMap::new(),
        }
    }

    // Effects passed to `register_with_effects` must be stored unchanged.
    #[test]
    fn register_with_effects_preserves_network_effect() {
        use noether_core::effects::{Effect, EffectSet};
        let mut exec = make_executor();
        let id = StageId("sig_network".into());
        let effects = EffectSet::new([Effect::Pure, Effect::Network]);
        exec.register_with_effects(&id, "code", "python", effects.clone());
        let stored = exec
            .implementations
            .get(&id.0)
            .expect("stage should be registered");
        assert_eq!(
            stored.effects, effects,
            "declared effects must survive register_with_effects"
        );
        assert!(
            stored.effects.iter().any(|e| matches!(e, Effect::Network)),
            "Network must be preserved so the sandbox opens the net ns"
        );
    }

    // Pinned `name==version` specs, including a local version suffix, pass.
    #[test]
    fn validate_pip_spec_accepts_pinned() {
        assert!(validate_pip_spec("pandas==2.0.0").is_ok());
        assert!(validate_pip_spec("scikit-learn==1.5.1").is_ok());
        assert!(validate_pip_spec("urllib3==2.2.3").is_ok());
        assert!(validate_pip_spec("pydantic==2.5.0+cu121").is_ok());
    }

    // A bare package name is rejected unless the opt-in variable is set.
    // NOTE(review): this mutates process-wide environment state; tests that
    // read NOETHER_ALLOW_UNPINNED_PIP concurrently could observe the change.
    #[test]
    fn validate_pip_spec_rejects_unpinned_by_default() {
        // Remember the previous value so it can be restored afterwards.
        let guard = (std::env::var_os("NOETHER_ALLOW_UNPINNED_PIP"),);
        unsafe {
            std::env::remove_var("NOETHER_ALLOW_UNPINNED_PIP");
        }
        let result = validate_pip_spec("pandas");
        if let (Some(prev),) = guard {
            unsafe {
                std::env::set_var("NOETHER_ALLOW_UNPINNED_PIP", prev);
            }
        }
        assert!(result.is_err(), "bare name must be rejected without opt-in");
    }

    // Shell metacharacters, URLs and VCS references must never validate.
    #[test]
    fn validate_pip_spec_rejects_shell_metacharacters() {
        for bad in [
            "pandas; rm -rf /",
            "pandas==$(whoami)",
            "pandas==1.0.0; echo pwned",
            "pandas==`id`",
            "https://evil.example/wheel.whl",
            "git+https://example.com/repo.git",
            "pkg with space==1.0",
            "pkg==1.0 && echo",
        ] {
            assert!(validate_pip_spec(bad).is_err(), "should reject {bad:?}");
        }
    }

    // An empty name or an empty version is rejected.
    #[test]
    fn validate_pip_spec_rejects_empty() {
        assert!(validate_pip_spec("==1.0").is_err());
        assert!(validate_pip_spec("pkg==").is_err());
    }

    // A plain `import requests` maps to the `requests` package.
    #[test]
    fn detect_python_packages_requests() {
        let code = "import requests\ndef execute(v):\n return requests.get(v).json()";
        let pkgs = NixExecutor::detect_python_packages(code);
        assert!(
            pkgs.contains(&"requests"),
            "expected 'requests' in {pkgs:?}"
        );
    }

    // Standard-library imports must not request extra packages.
    #[test]
    fn detect_python_packages_stdlib_only() {
        let code = "import urllib.request, json\ndef execute(v):\n return json.loads(v)";
        let pkgs = NixExecutor::detect_python_packages(code);
        assert!(
            pkgs.is_empty(),
            "stdlib imports should not trigger packages: {pkgs:?}"
        );
    }

    // Several imports, including an aliased one and a `from` import, are all found.
    #[test]
    fn detect_python_packages_multiple() {
        let code = "import pandas\nimport numpy as np\nfrom bs4 import BeautifulSoup\ndef execute(v): pass";
        let pkgs = NixExecutor::detect_python_packages(code);
        assert!(pkgs.contains(&"pandas"));
        assert!(pkgs.contains(&"numpy"));
        assert!(pkgs.contains(&"beautifulsoup4"));
    }

    // Executor with fixed placeholder paths; touches neither nix nor the filesystem.
    fn test_executor() -> NixExecutor {
        NixExecutor {
            nix_bin: PathBuf::from("/usr/bin/nix"),
            cache_dir: PathBuf::from("/tmp/noether-test-cache"),
            config: NixConfig::default(),
            implementations: HashMap::new(),
        }
    }

    // Python with only stdlib imports uses `nix run nixpkgs#python3`.
    #[test]
    fn build_nix_command_no_packages() {
        let exec = test_executor();
        let (sub, args) = exec.build_nix_command("python", Path::new("/tmp/x.py"), "import json");
        assert_eq!(sub, "run");
        assert!(args.iter().any(|a| a.contains("python3")));
        assert!(!args.iter().any(|a| a.contains("shell")));
    }

    // Python importing `requests` uses `nix shell` with the package and `--command`.
    #[test]
    fn build_nix_command_with_requests() {
        let exec = test_executor();
        let (sub, args) =
            exec.build_nix_command("python", Path::new("/tmp/x.py"), "import requests");
        assert_eq!(sub, "shell");
        assert!(args.iter().any(|a| a.contains("python3Packages.requests")));
        assert!(args.iter().any(|a| a == "--command"));
        assert!(
            !args.iter().any(|a| a == "nixpkgs#python3"),
            "bare python3 conflicts: {args:?}"
        );
    }

    // The Python wrapper contains the stdin/stdout plumbing and the user code.
    #[test]
    fn python_wrapper_contains_boilerplate() {
        let wrapped = NixExecutor::wrap_python("def execute(x):\n return x + 1");
        assert!(wrapped.contains("sys.stdin.read()"));
        assert!(wrapped.contains("_json.dumps(_output)"));
        assert!(wrapped.contains("def execute(x)"));
    }

    // Equal inputs hash equally; different inputs hash differently.
    #[test]
    fn code_hash_is_stable() {
        let h1 = NixExecutor::code_hash("hello world");
        let h2 = NixExecutor::code_hash("hello world");
        let h3 = NixExecutor::code_hash("different");
        assert_eq!(h1, h2);
        assert_ne!(h1, h3);
    }

    // A daemon connection error yields the "daemon is not running" hint.
    #[test]
    fn classify_error_daemon_not_running() {
        let msg = NixExecutor::classify_error("error: cannot connect to nix daemon", Some(1));
        assert!(msg.contains("nix daemon is not running"), "got: {msg}");
    }

    // `from __future__` imports must come before the wrapper's own imports.
    #[test]
    fn future_imports_are_hoisted_out_of_user_code() {
        let user = "from __future__ import annotations\nimport json\n\ndef execute(input):\n return input\n";
        let wrapped = NixExecutor::wrap_python(user);
        let future_pos = wrapped
            .find("from __future__ import annotations")
            .expect("future import should be present in wrapper");
        let stdlib_pos = wrapped
            .find("import sys, json as _json")
            .expect("stdlib imports should be present");
        assert!(
            future_pos < stdlib_pos,
            "future import must precede stdlib imports in wrapped output"
        );
    }

    // Code without future imports passes through with nothing hoisted.
    #[test]
    fn user_code_without_future_imports_is_unchanged() {
        let user = "import json\n\ndef execute(input):\n return input\n";
        let (hoisted, remaining) = NixExecutor::extract_future_imports(user);
        assert_eq!(hoisted, "");
        assert_eq!(remaining.trim(), user.trim());
    }

    // An indented future import (inside a function body) is left in place.
    #[test]
    fn nested_future_import_inside_function_is_not_hoisted() {
        let user =
            "def execute(input):\n from __future__ import annotations\n return input\n";
        let (hoisted, _) = NixExecutor::extract_future_imports(user);
        assert_eq!(hoisted, "");
    }

    // Unrecognized stderr is passed through together with the exit code.
    #[test]
    fn classify_error_user_code_exit1() {
        let msg = NixExecutor::classify_error("ValueError: invalid input", Some(1));
        assert!(msg.contains("ValueError"), "got: {msg}");
        assert!(msg.contains("exit 1"), "got: {msg}");
    }

    // A full-disk message yields the disk-space hint.
    #[test]
    fn classify_error_disk_full() {
        let msg = NixExecutor::classify_error("No space left on device", Some(1));
        assert!(msg.contains("disk space"), "got: {msg}");
    }

    // With empty stderr the message still reports the exit code.
    #[test]
    fn classify_error_empty_stderr() {
        let msg = NixExecutor::classify_error("", Some(137));
        assert!(msg.contains("exit 137"), "got: {msg}");
    }

    // Pins the documented default limits.
    #[test]
    fn nix_config_defaults() {
        let cfg = NixConfig::default();
        assert_eq!(cfg.timeout_secs, 30);
        assert_eq!(cfg.max_output_bytes, 10 * 1024 * 1024);
        assert_eq!(cfg.max_stderr_bytes, 64 * 1024);
    }

    // `first_line` skips blank lines and trims the line it returns.
    #[test]
    fn first_line_extracts_correctly() {
        assert_eq!(first_line(" \nfoo\nbar"), "foo");
        assert_eq!(first_line("single"), "single");
        assert_eq!(first_line(""), "");
    }

    // End-to-end: an identity stage returns its input unchanged.
    #[test]
    #[ignore = "requires nix + warm binary cache; run manually with `cargo test -- --ignored`"]
    fn nix_python_identity_stage() {
        let nix_bin = match NixExecutor::find_nix() {
            Some(p) => p,
            None => {
                eprintln!("nix not found, skipping");
                return;
            }
        };

        let cache_dir = std::env::temp_dir().join("noether-nix-integ");
        let _ = std::fs::create_dir_all(&cache_dir);

        let code = "def execute(x):\n return x";
        let executor = NixExecutor {
            nix_bin,
            cache_dir,
            config: NixConfig::default(),
            implementations: {
                let mut m = HashMap::new();
                let id = StageId("test_identity".into());
                m.insert(
                    id.0.clone(),
                    StageImpl {
                        code: code.into(),
                        language: "python".into(),
                        effects: noether_core::effects::EffectSet::pure(),
                    },
                );
                m
            },
        };

        let id = StageId("test_identity".into());
        let result = executor.execute(&id, &serde_json::json!({"hello": "world"}));
        assert_eq!(result.unwrap(), serde_json::json!({"hello": "world"}));
    }

    // End-to-end: a stage that sleeps past the 2 second timeout reports `TimedOut`.
    #[test]
    #[ignore = "requires nix + warm binary cache; run manually with `cargo test -- --ignored`"]
    fn nix_timeout_kills_hanging_stage() {
        let nix_bin = match NixExecutor::find_nix() {
            Some(p) => p,
            None => {
                eprintln!("nix not found, skipping timeout test");
                return;
            }
        };

        let cache_dir = std::env::temp_dir().join("noether-nix-timeout");
        let _ = std::fs::create_dir_all(&cache_dir);

        let code = "import time\ndef execute(x):\n time.sleep(9999)\n return x";
        let executor = NixExecutor {
            nix_bin,
            cache_dir,
            config: NixConfig {
                timeout_secs: 2,
                ..NixConfig::default()
            },
            implementations: {
                let mut m = HashMap::new();
                let id = StageId("hanging".into());
                m.insert(
                    id.0.clone(),
                    StageImpl {
                        code: code.into(),
                        language: "python".into(),
                        effects: noether_core::effects::EffectSet::pure(),
                    },
                );
                m
            },
        };

        let id = StageId("hanging".into());
        let result = executor.execute(&id, &serde_json::json!(null));
        assert!(
            matches!(
                result,
                Err(ExecutionError::TimedOut {
                    timeout_secs: 2,
                    ..
                })
            ),
            "expected TimedOut, got: {result:?}"
        );
    }
}