use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
use tempfile::NamedTempFile;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio::time::timeout;
use crate::auth::GcpAuth;
use crate::context::ExecutionContext;
use crate::error::ToolError;
use crate::registry::{Tool, ToolConfig};
use crate::result::{ToolResult, ToolStatus};
use crate::template::TemplateEngine;
/// OAuth scope requested for the token used to download a script object from
/// Google Cloud Storage (read-only access).
const GCS_READ_SCOPES: &[&str] = &["https://www.googleapis.com/auth/devstorage.read_only"];
/// Request timeout in seconds for the GCS loader, and the fallback timeout for
/// the HTTP loader when `source.timeout` is not set.
const DEFAULT_LOADER_TIMEOUT_SECS: u64 = 30;
/// Prefix of the stdout line on which the Python wrapper emits the JSON-encoded
/// `result`; `extract_result_from_stdout` parses and removes such lines.
const NOETL_RESULT_MARKER: &str = "@@__NOETL_RESULT__@@";
/// Makes a script that uses a bare top-level `return` runnable.
///
/// Python rejects `return` outside a function, so when the first significant
/// top-level statement sequence reaches a `return` before any `def` /
/// `async def` / `class`, the whole script is indented and wrapped in
/// `def __noetl_step__(args, input_data, **kw)`, and the function's return
/// value is assigned to the `result` global. Otherwise `code` is returned
/// unchanged.
///
/// The scan is line based:
/// * lines starting with a space or tab are treated as nested and skipped;
/// * blank lines and `#` comments are skipped;
/// * scanning stops at the first top-level `def` / `async def` / `class`
///   (function-style scripts are left alone);
/// * a `return` counts when the keyword is followed by anything that cannot
///   continue an identifier, so `return`, `return x`, `return(x)` and
///   `return{'a': 1}` all match while `return_value = 1` does not.
fn wrap_top_level_return(code: &str) -> String {
    /// True when `stmt` begins with the `return` keyword (not merely an
    /// identifier that happens to start with those letters).
    fn is_return_statement(stmt: &str) -> bool {
        match stmt.strip_prefix("return") {
            Some(rest) => !rest
                .chars()
                .next()
                .map_or(false, |c| c.is_alphanumeric() || c == '_'),
            None => false,
        }
    }

    let has_top_level_return = code
        .lines()
        // Indented lines belong to a nested block, not to the top level.
        .filter(|line| !line.starts_with(' ') && !line.starts_with('\t'))
        .map(str::trim)
        .filter(|stmt| !stmt.is_empty() && !stmt.starts_with('#'))
        // A definition before any `return` means the script is function-style.
        .take_while(|stmt| {
            !(stmt.starts_with("def ")
                || stmt.starts_with("async def ")
                || stmt.starts_with("class "))
        })
        .any(is_return_statement);

    if !has_top_level_return {
        return code.to_string();
    }

    // Indent every line uniformly (four spaces, per PEP 8) so the user's own
    // relative indentation is preserved inside the wrapper function.
    let indented = code
        .lines()
        .map(|line| format!("    {line}"))
        .collect::<Vec<_>>()
        .join("\n");
    format!(
        "def __noetl_step__(args, input_data, **kw):\n{indented}\n\nresult = __noetl_step__(args, input_data)"
    )
}
/// Splits raw child stdout into the captured `result` and the user-visible text.
///
/// Every line containing [`NOETL_RESULT_MARKER`] is treated as a result line:
/// the text after the marker is parsed as JSON (a parse failure yields `None`)
/// and the marker plus payload are removed from the visible output. If several
/// marker lines are present, the last one decides the captured value.
///
/// The marker is searched for anywhere in the line rather than only at its
/// start: when the user's last write did not end with a newline, the wrapper's
/// marker is appended to that partial line. In that case the text before the
/// marker is kept as user output.
///
/// Returns `(captured, cleaned)`. `cleaned` is the remaining lines joined with
/// `\n`, with a trailing newline only if the raw output ended with one and
/// something remains.
fn extract_result_from_stdout(stdout: &str) -> (Option<serde_json::Value>, String) {
    let mut captured: Option<serde_json::Value> = None;
    let mut visible: Vec<&str> = Vec::new();

    for line in stdout.lines() {
        match line.split_once(NOETL_RESULT_MARKER) {
            Some((user_text, payload)) => {
                captured = serde_json::from_str(payload).ok();
                // Keep whatever the user wrote on the same line before the marker.
                if !user_text.is_empty() {
                    visible.push(user_text);
                }
            }
            None => visible.push(line),
        }
    }

    let mut cleaned = visible.join("\n");
    if stdout.ends_with('\n') && !cleaned.is_empty() {
        cleaned.push('\n');
    }
    (captured, cleaned)
}
/// The `source` section of a python `script` block: what kind of source the
/// code comes from and the parameters needed to fetch it.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PythonScriptSource {
    /// Source kind (serialized as `type`): `inline`, `file`, `gcs` or `http`.
    /// Treated as `inline` when absent.
    #[serde(rename = "type", default, skip_serializing_if = "Option::is_none")]
    pub source_type: Option<String>,
    /// Python code, read when the source kind is `inline`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub code: Option<String>,
    /// Credential reference.
    /// NOTE(review): nothing visible in this file reads this field — confirm
    /// where (or whether) it is consumed.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth: Option<String>,
    /// URL to request for `http` sources; `script.uri` is used when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub endpoint: Option<String>,
    /// HTTP method for `http` sources; defaults to `GET` and is upper-cased.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub method: Option<String>,
    /// Request timeout in seconds for `http` sources; defaults to
    /// `DEFAULT_LOADER_TIMEOUT_SECS`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout: Option<u64>,
}
/// The nested `script` form of a python step: a location plus a description of
/// how to load it.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PythonScript {
    /// Location of the script: the file path for `file` sources, the `gs://`
    /// URL for `gcs` sources, and the fallback URL for `http` sources.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub uri: Option<String>,
    /// Source kind and loader parameters.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<PythonScriptSource>,
}
/// Deserialized configuration of a `python` tool step.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PythonConfig {
    /// Flat inline code. When set it takes precedence over `script`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub code: Option<String>,
    /// Nested script description, used when `code` is not set.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub script: Option<PythonScript>,
    /// Arguments handed to the script as `args` in the JSON context on stdin.
    #[serde(default)]
    pub args: HashMap<String, serde_json::Value>,
    /// Interpreter executable; defaults to the value of `default_python()`.
    #[serde(default = "default_python")]
    pub python: String,
    /// Extra environment variables set on the interpreter process.
    #[serde(default)]
    pub env: HashMap<String, String>,
    /// Execution timeout in seconds; the tool-level timeout is used when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timeout_seconds: Option<u64>,
    /// Working directory for the interpreter process.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
}
/// Default interpreter for [`PythonConfig::python`]: the `PYTHON_PATH`
/// environment variable when it can be read, otherwise `python3`.
fn default_python() -> String {
    match std::env::var("PYTHON_PATH") {
        Ok(configured) => configured,
        Err(_) => String::from("python3"),
    }
}
/// Where the Python code of a step comes from, as classified by
/// `PythonConfig::resolve_source`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PythonSource {
    /// Code supplied directly in the configuration.
    Inline(String),
    /// Code read from a file at `path`.
    File { path: String },
    /// Code downloaded from the `gs://` object at `uri`.
    Gcs { uri: String },
    /// Code fetched with an HTTP request.
    Http {
        /// URL to request.
        endpoint: String,
        /// Upper-cased HTTP method.
        method: String,
        /// Request timeout in seconds.
        timeout_secs: u64,
    },
}
impl PythonConfig {
    /// Classifies where this step's code comes from without performing any I/O.
    ///
    /// Flat `code` always wins. Otherwise `script.source.type` (default
    /// `inline`) selects the variant and the fields it needs are validated.
    ///
    /// # Errors
    /// `ToolError::Configuration` when a required field for the selected kind
    /// is missing or the kind is not one of `inline | file | gcs | http`.
    pub fn resolve_source(&self) -> Result<PythonSource, ToolError> {
        if let Some(flat) = &self.code {
            return Ok(PythonSource::Inline(flat.clone()));
        }

        let script = self.script.as_ref();
        let source = script.and_then(|s| s.source.as_ref());
        let script_uri = script.and_then(|s| s.uri.as_deref());
        let kind = source
            .and_then(|s| s.source_type.as_deref())
            .unwrap_or("inline");

        match kind {
            "inline" => match source.and_then(|s| s.code.as_deref()) {
                Some(body) => Ok(PythonSource::Inline(body.to_owned())),
                None => Err(ToolError::Configuration(
                    "python script `source.type: inline` has no `code`".to_string(),
                )),
            },
            "file" => match script_uri {
                Some(path) => Ok(PythonSource::File {
                    path: path.to_owned(),
                }),
                None => Err(ToolError::Configuration(
                    "python script `source.type: file` requires `script.uri` (the file path)"
                        .to_string(),
                )),
            },
            "gcs" => match script_uri {
                Some(uri) => Ok(PythonSource::Gcs {
                    uri: uri.to_owned(),
                }),
                None => Err(ToolError::Configuration(
                    "python script `source.type: gcs` requires `script.uri` (a gs:// URL)"
                        .to_string(),
                )),
            },
            "http" => {
                // An explicit endpoint takes precedence over the script uri.
                let endpoint = match source.and_then(|s| s.endpoint.as_deref()).or(script_uri) {
                    Some(url) => url.to_owned(),
                    None => {
                        return Err(ToolError::Configuration(
                            "python script `source.type: http` requires `source.endpoint` \
                             (the URL to GET) or a `script.uri`"
                                .to_string(),
                        ))
                    }
                };
                let method = source
                    .and_then(|s| s.method.as_deref())
                    .map_or_else(|| "GET".to_string(), str::to_uppercase);
                let timeout_secs = source
                    .and_then(|s| s.timeout)
                    .unwrap_or(DEFAULT_LOADER_TIMEOUT_SECS);
                Ok(PythonSource::Http {
                    endpoint,
                    method,
                    timeout_secs,
                })
            }
            unknown => Err(ToolError::Configuration(format!(
                "python script `source.type: {unknown}` is unknown; expected one of \
                 inline | file | gcs | http"
            ))),
        }
    }

    /// Returns the code when it is available synchronously, i.e. flat `code`
    /// or an `inline` script source.
    ///
    /// # Errors
    /// `ToolError::Configuration` when an inline source has no `code`, when
    /// the source kind is not `inline` (those need
    /// `PythonTool::load_script_code`), or when no code is configured at all.
    pub fn resolve_code(&self) -> Result<&str, ToolError> {
        if let Some(flat) = self.code.as_deref() {
            return Ok(flat);
        }

        let source = match self.script.as_ref().and_then(|s| s.source.as_ref()) {
            Some(source) => source,
            None => {
                return Err(ToolError::Configuration(
                    "python config has no code: set `code:` or `script.source.code`".to_string(),
                ))
            }
        };

        match source.source_type.as_deref().unwrap_or("inline") {
            "inline" => match source.code.as_deref() {
                Some(body) => Ok(body),
                None => Err(ToolError::Configuration(
                    "python script `source.type: inline` has no `code`".to_string(),
                )),
            },
            kind => Err(ToolError::Configuration(format!(
                "python script `source.type: {kind}` requires async loading; \
                 call PythonTool::load_script_code"
            ))),
        }
    }
}
/// Splits a `gs://bucket/object/path` URI into `(bucket, object)`.
///
/// Only the first `/` after the bucket is a separator; the object keeps any
/// further slashes.
///
/// # Errors
/// `ToolError::Configuration` when the `gs://` prefix is missing, when there
/// is no `/` after the bucket, or when either part is empty.
fn parse_gcs_uri(uri: &str) -> Result<(String, String), ToolError> {
    let remainder = match uri.strip_prefix("gs://") {
        Some(remainder) => remainder,
        None => {
            return Err(ToolError::Configuration(format!(
                "gcs script uri must start with `gs://`, got `{uri}`"
            )))
        }
    };

    match remainder.split_once('/') {
        None => Err(ToolError::Configuration(format!(
            "gcs script uri `{uri}` has no object path after the bucket"
        ))),
        Some((bucket, object)) if bucket.is_empty() || object.is_empty() => {
            Err(ToolError::Configuration(format!(
                "gcs script uri `{uri}` has an empty bucket or object"
            )))
        }
        Some((bucket, object)) => Ok((bucket.to_owned(), object.to_owned())),
    }
}
/// Percent-encodes an object name for use as a single URL path segment.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~` pass through; every other
/// byte of the UTF-8 encoding (including `/`) becomes `%XX` with upper-case
/// hex digits.
fn encode_gcs_object(object: &str) -> String {
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(object.len() * 2);
    for &b in object.as_bytes() {
        let unreserved = b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.' | b'~');
        if unreserved {
            encoded.push(char::from(b));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX_UPPER[usize::from(b >> 4)]));
            encoded.push(char::from(HEX_UPPER[usize::from(b & 0x0F)]));
        }
    }
    encoded
}
/// Tool that loads a Python script (inline, file, GCS or HTTP) and runs it in
/// a child interpreter process.
pub struct PythonTool {
    /// Renders templated values in the step configuration before it is parsed.
    template_engine: TemplateEngine,
    /// HTTP client used by the GCS and HTTP script loaders.
    http_client: reqwest::Client,
    /// Supplies the bearer token for GCS downloads.
    gcp_auth: GcpAuth,
}
impl PythonTool {
    /// Creates a tool with a fresh template engine, HTTP client and GCP auth helper.
    pub fn new() -> Self {
        Self {
            template_engine: TemplateEngine::new(),
            http_client: reqwest::Client::new(),
            gcp_auth: GcpAuth::new(),
        }
    }

    /// Resolves the configured source and returns the script text, fetching it
    /// from disk, GCS or HTTP when it is not inline.
    ///
    /// # Errors
    /// Propagates configuration errors from `resolve_source` and I/O / HTTP
    /// errors from the individual loaders.
    pub async fn load_script_code(&self, cfg: &PythonConfig) -> Result<String, ToolError> {
        match cfg.resolve_source()? {
            PythonSource::Inline(code) => Ok(code),
            PythonSource::File { path } => self.load_from_file(&path).await,
            PythonSource::Gcs { uri } => self.load_from_gcs(&uri).await,
            PythonSource::Http {
                endpoint,
                method,
                timeout_secs,
            } => self.load_from_http(&endpoint, &method, timeout_secs).await,
        }
    }

    /// Reads the script from a local file as UTF-8 text.
    async fn load_from_file(&self, path: &str) -> Result<String, ToolError> {
        tokio::fs::read_to_string(path).await.map_err(|e| {
            ToolError::Io(format!(
                "python script file `{path}` could not be read: {e}"
            ))
        })
    }

    /// Downloads the script from a `gs://bucket/object` URI through the GCS
    /// JSON API (`alt=media`), authenticated with a read-only bearer token.
    async fn load_from_gcs(&self, uri: &str) -> Result<String, ToolError> {
        let (bucket, object) = parse_gcs_uri(uri)?;
        let token = self.gcp_auth.get_token(GCS_READ_SCOPES).await?;
        // The object name is one path segment, so its slashes must be encoded.
        let url = format!(
            "https://storage.googleapis.com/storage/v1/b/{bucket}/o/{}?alt=media",
            encode_gcs_object(&object)
        );
        let resp = self
            .http_client
            .get(&url)
            .bearer_auth(token)
            .timeout(Duration::from_secs(DEFAULT_LOADER_TIMEOUT_SECS))
            .send()
            .await
            .map_err(|e| ToolError::Http(format!("gcs fetch `{uri}` failed: {e}")))?;
        Self::read_script_body(resp, "gcs", uri).await
    }

    /// Fetches the script with an arbitrary HTTP method and timeout.
    ///
    /// # Errors
    /// `ToolError::Configuration` for an invalid method name, `ToolError::Http`
    /// for transport failures and non-success statuses.
    async fn load_from_http(
        &self,
        endpoint: &str,
        method: &str,
        timeout_secs: u64,
    ) -> Result<String, ToolError> {
        let req_method = reqwest::Method::from_bytes(method.as_bytes()).map_err(|_| {
            ToolError::Configuration(format!("python http script: invalid method `{method}`"))
        })?;
        let resp = self
            .http_client
            .request(req_method, endpoint)
            .timeout(Duration::from_secs(timeout_secs))
            .send()
            .await
            .map_err(|e| ToolError::Http(format!("http fetch `{endpoint}` failed: {e}")))?;
        Self::read_script_body(resp, "http", endpoint).await
    }

    /// Turns a loader response into the script text.
    ///
    /// `loader` (`gcs` / `http`) and `target` (the uri or endpoint) only label
    /// error messages. A non-success status becomes `ToolError::Http` carrying
    /// the first 300 characters of the response body.
    async fn read_script_body(
        resp: reqwest::Response,
        loader: &str,
        target: &str,
    ) -> Result<String, ToolError> {
        let status = resp.status();
        if !status.is_success() {
            // Best effort: an unreadable error body is reported as empty.
            let body = resp.text().await.unwrap_or_default();
            let snippet: String = body.chars().take(300).collect();
            return Err(ToolError::Http(format!(
                "{loader} fetch `{target}` returned HTTP {status}: {snippet}"
            )));
        }
        resp.text().await.map_err(|e| {
            ToolError::Http(format!("{loader} fetch `{target}` body read failed: {e}"))
        })
    }

    /// Runs `code` in a child interpreter and collects its outcome.
    ///
    /// The user code is embedded in a wrapper script (written to a temporary
    /// file) that reads a JSON context from stdin, exposes `args`,
    /// `variables`, `execution_id`, `step` and `input_data`, supports the
    /// `main()` convention, and prints the `result` global on a marker line.
    ///
    /// * `python` – interpreter executable; `cwd` / `env` – process settings.
    /// * `timeout_duration` – when set, bounds feeding stdin plus waiting for
    ///   the process; on expiry a timeout result is returned and the child is
    ///   killed.
    ///
    /// The returned `data` is the captured non-null `result`; without a marker
    /// line, stdout parsed as JSON if possible; otherwise an object with
    /// `stdout` and `stderr`. A non-zero exit code yields `ToolStatus::Error`.
    ///
    /// # Errors
    /// `ToolError::Process` when the temp file cannot be created or written,
    /// the interpreter cannot be spawned, or waiting for it fails.
    #[allow(clippy::too_many_arguments)]
    pub async fn execute_code(
        &self,
        code: &str,
        args: &HashMap<String, serde_json::Value>,
        env: &HashMap<String, String>,
        python: &str,
        cwd: Option<&str>,
        timeout_duration: Option<Duration>,
        ctx: &ExecutionContext,
    ) -> Result<ToolResult, ToolError> {
        let start = std::time::Instant::now();
        let effective_code = wrap_top_level_return(code);
        // The raw string starts at column 0 on purpose: the user code is
        // spliced in at Python's top level, so nothing here may be indented
        // beyond what Python's own block structure requires.
        let wrapper_code = format!(
            r#"
import sys
import json
# Read context from stdin
context = json.loads(sys.stdin.read())
args = context.get('args', {{}})
variables = context.get('variables', {{}})
execution_id = context.get('execution_id')
step = context.get('step')
# Make args available as globals for convenience
globals().update(args)
# Expose args as `input_data` for fixture parity (noetl/ai-meta#71).
# Legacy Python executor injected args under both the individual
# key names AND as the dict `input_data` so fixtures that call
# `input_data.get('foo')` work regardless of which style they use.
input_data = dict(args)
# User code (possibly wrapped by wrap_top_level_return for Style C)
{}
# Legacy `main()` convention (noetl/ai-meta#65): the
# script_execution/* fixtures + the Python (legacy) tool support
# function-based scripts that define a `main(...)` callable instead
# of setting the `result` global directly. When the user code
# didn't set a non-None `result` AND defines a callable `main`,
# call it with the matching args and capture its return value.
# Mirrors noetl/tools/python/executor.py:_invoke_main — binds
# main's named params from `args` by name, forwards all args when
# main accepts `**kwargs`, and awaits async `main` via asyncio.run.
if globals().get('result', None) is None:
    __noetl_main = globals().get('main', None)
    if callable(__noetl_main):
        import inspect as __noetl_inspect
        __noetl_sig = __noetl_inspect.signature(__noetl_main)
        __noetl_kwargs = {{}}
        __noetl_var_kw = any(
            __p.kind == __p.VAR_KEYWORD
            for __p in __noetl_sig.parameters.values()
        )
        for __pname, __p in __noetl_sig.parameters.items():
            if __p.kind in (__p.VAR_POSITIONAL, __p.VAR_KEYWORD):
                continue
            if __pname in args:
                __noetl_kwargs[__pname] = args[__pname]
        if __noetl_var_kw:
            for __k, __v in args.items():
                __noetl_kwargs.setdefault(__k, __v)
        if __noetl_inspect.iscoroutinefunction(__noetl_main):
            import asyncio as __noetl_asyncio
            result = __noetl_asyncio.run(__noetl_main(**__noetl_kwargs))
        else:
            result = __noetl_main(**__noetl_kwargs)
# Emit the user-set `result` global as JSON on a single line
# prefixed with the noetl marker. Missing / non-JSON-serializable
# results fall back to `null` so the tool can still complete
# successfully.
try:
    __noetl_captured = globals().get('result', None)
    sys.stdout.write('{marker}' + json.dumps(__noetl_captured, default=str) + '\n')
    sys.stdout.flush()
except Exception as __noetl_err:
    sys.stdout.write('{marker}null\n')
    sys.stderr.write('noetl result capture failed: ' + repr(__noetl_err) + '\n')
"#,
            effective_code,
            marker = NOETL_RESULT_MARKER
        );

        // The temp file must outlive the child; it is removed when this
        // function returns.
        let temp_file = NamedTempFile::new()
            .map_err(|e| ToolError::Process(format!("Failed to create temp file: {}", e)))?;
        tokio::fs::write(temp_file.path(), wrapper_code.as_bytes())
            .await
            .map_err(|e| ToolError::Process(format!("Failed to write script: {}", e)))?;

        let mut cmd = Command::new(python);
        cmd.arg(temp_file.path());
        if let Some(dir) = cwd {
            cmd.current_dir(dir);
        }
        for (k, v) in env {
            cmd.env(k, v);
        }
        cmd.stdin(std::process::Stdio::piped());
        cmd.stdout(std::process::Stdio::piped());
        cmd.stderr(std::process::Stdio::piped());
        // Kill the interpreter whenever its handle is dropped (timeout or a
        // cancelled caller) instead of shelling out to `kill` / `taskkill`.
        cmd.kill_on_drop(true);

        let mut child = cmd
            .spawn()
            .map_err(|e| ToolError::Process(format!("Failed to spawn Python process: {}", e)))?;

        let context_json = serde_json::json!({
            "args": args,
            "variables": ctx.variables,
            "execution_id": ctx.execution_id,
            "step": ctx.step,
            "server_url": ctx.server_url,
        });
        let payload = context_json.to_string();
        let stdin = child.stdin.take();

        // Feeding stdin and waiting for exit form one future so that the
        // timeout also covers a child that never drains its stdin.
        let run = async move {
            if let Some(mut stdin) = stdin {
                // Best effort: the interpreter may exit (e.g. on a syntax
                // error) before reading stdin, which surfaces as a broken pipe.
                let _ = stdin.write_all(payload.as_bytes()).await;
                let _ = stdin.shutdown().await;
            }
            child.wait_with_output().await
        };

        let waited = match timeout_duration {
            Some(limit) => match timeout(limit, run).await {
                Ok(waited) => waited,
                Err(_) => {
                    // `run` (and with it the child handle) has been dropped
                    // here, which kills the process via `kill_on_drop`.
                    let duration_ms = start.elapsed().as_millis() as u64;
                    return Ok(ToolResult::timeout(limit.as_secs()).with_duration(duration_ms));
                }
            },
            None => run.await,
        };
        let output = waited
            .map_err(|e| ToolError::Process(format!("Failed to wait for process: {}", e)))?;

        // A missing exit code (e.g. killed by a signal) is reported as -1.
        let exit_code = output.status.code().unwrap_or(-1);
        let raw_stdout = String::from_utf8_lossy(&output.stdout).into_owned();
        let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
        let (captured_result, stdout) = extract_result_from_stdout(&raw_stdout);
        let duration_ms = start.elapsed().as_millis() as u64;

        let structured: Option<serde_json::Value> = match captured_result {
            Some(value) if !value.is_null() => Some(value),
            // A null `result` means the script produced no structured value.
            Some(_) => None,
            // No marker line at all: the script may have printed JSON itself.
            None if !stdout.trim().is_empty() => serde_json::from_str(&stdout).ok(),
            None => None,
        };
        let data = structured.unwrap_or_else(|| {
            serde_json::json!({
                "stdout": stdout,
                "stderr": stderr,
            })
        });

        let (status, error) = if exit_code == 0 {
            (ToolStatus::Success, None)
        } else {
            (
                ToolStatus::Error,
                Some(format!("Python script exited with code {}", exit_code)),
            )
        };

        Ok(ToolResult {
            status,
            data: Some(data),
            error,
            stdout: Some(stdout),
            stderr: Some(stderr),
            exit_code: Some(exit_code),
            duration_ms: Some(duration_ms),
            pending_callback: None,
        })
    }

    /// Renders templates in the raw step configuration against the execution
    /// context and deserializes the result into a [`PythonConfig`].
    ///
    /// # Errors
    /// Template rendering errors are propagated; a shape mismatch becomes
    /// `ToolError::Configuration`.
    fn parse_config(
        &self,
        config: &ToolConfig,
        ctx: &ExecutionContext,
    ) -> Result<PythonConfig, ToolError> {
        let template_ctx = ctx.to_template_context();
        let rendered_config = self
            .template_engine
            .render_value(&config.config, &template_ctx)?;
        serde_json::from_value(rendered_config)
            .map_err(|e| ToolError::Configuration(format!("Invalid python config: {}", e)))
    }
}
impl Default for PythonTool {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Tool for PythonTool {
fn name(&self) -> &'static str {
"python"
}
async fn execute(
&self,
config: &ToolConfig,
ctx: &ExecutionContext,
) -> Result<ToolResult, ToolError> {
let python_config = self.parse_config(config, ctx)?;
let code = self.load_script_code(&python_config).await?;
let timeout_duration = python_config
.timeout_seconds
.or(config.timeout)
.map(Duration::from_secs);
tracing::debug!(
code_len = code.len(),
python = %python_config.python,
timeout = ?timeout_duration,
"Executing Python script"
);
self.execute_code(
&code,
&python_config.args,
&python_config.env,
&python_config.python,
python_config.cwd.as_deref(),
timeout_duration,
ctx,
)
.await
}
}
/// Unit and integration tests for the Python tool. Tests that call
/// `execute_code` spawn a real `python3` interpreter.
#[cfg(test)]
mod tests {
    use super::*;

    // Flat `code` plus `args` deserialize and resolve synchronously.
    #[test]
    fn test_python_config_deserialization() {
        let json = serde_json::json!({
            "code": "print('hello')",
            "args": {"name": "world"},
            "python": "python3"
        });
        let config: PythonConfig = serde_json::from_value(json).unwrap();
        assert_eq!(config.resolve_code().unwrap(), "print('hello')");
        assert!(config.args.contains_key("name"));
    }

    // The nested `script.source` inline form resolves to its code.
    #[test]
    fn test_python_inline_script_shape_resolves_code() {
        let json = serde_json::json!({
            "script": {
                "uri": "inline",
                "source": { "type": "inline", "code": "result = {'ok': 1}" }
            }
        });
        let config: PythonConfig = serde_json::from_value(json).unwrap();
        assert_eq!(config.resolve_code().unwrap(), "result = {'ok': 1}");
    }

    // A source without `type` is treated as inline.
    #[test]
    fn test_python_source_without_type_defaults_inline() {
        let json = serde_json::json!({
            "script": { "source": { "code": "x = 1" } }
        });
        let config: PythonConfig = serde_json::from_value(json).unwrap();
        assert_eq!(config.resolve_code().unwrap(), "x = 1");
    }

    // Flat `code` takes precedence over a nested script.
    #[test]
    fn test_flat_code_wins_over_script() {
        let json = serde_json::json!({
            "code": "flat",
            "script": { "source": { "type": "inline", "code": "nested" } }
        });
        let config: PythonConfig = serde_json::from_value(json).unwrap();
        assert_eq!(config.resolve_code().unwrap(), "flat");
    }

    // `type: file` maps `script.uri` to a file path.
    #[test]
    fn test_resolve_source_classifies_file() {
        let json = serde_json::json!({
            "script": { "uri": "scripts/run.py", "source": { "type": "file" } }
        });
        let config: PythonConfig = serde_json::from_value(json).unwrap();
        assert_eq!(
            config.resolve_source().unwrap(),
            PythonSource::File {
                path: "scripts/run.py".to_string()
            },
        );
    }

    // `type: gcs` keeps the gs:// uri.
    #[test]
    fn test_resolve_source_classifies_gcs() {
        let json = serde_json::json!({
            "script": {
                "uri": "gs://my-bucket/scripts/run.py",
                "source": { "type": "gcs", "auth": "gcp_cred" }
            }
        });
        let config: PythonConfig = serde_json::from_value(json).unwrap();
        assert_eq!(
            config.resolve_source().unwrap(),
            PythonSource::Gcs {
                uri: "gs://my-bucket/scripts/run.py".to_string()
            },
        );
    }

    // For http, `source.endpoint` beats `script.uri` and the method is upper-cased.
    #[test]
    fn test_resolve_source_classifies_http_endpoint_over_uri() {
        let json = serde_json::json!({
            "script": {
                "uri": "hello.py",
                "source": {
                    "type": "http",
                    "endpoint": "https://example.com/hello.py",
                    "method": "get",
                    "timeout": 15
                }
            }
        });
        let config: PythonConfig = serde_json::from_value(json).unwrap();
        assert_eq!(
            config.resolve_source().unwrap(),
            PythonSource::Http {
                endpoint: "https://example.com/hello.py".to_string(),
                method: "GET".to_string(),
                timeout_secs: 15,
            },
        );
    }

    // Without an endpoint, http falls back to `script.uri` and the defaults.
    #[test]
    fn test_resolve_source_http_falls_back_to_uri() {
        let json = serde_json::json!({
            "script": {
                "uri": "https://example.com/from-uri.py",
                "source": { "type": "http" }
            }
        });
        let config: PythonConfig = serde_json::from_value(json).unwrap();
        assert_eq!(
            config.resolve_source().unwrap(),
            PythonSource::Http {
                endpoint: "https://example.com/from-uri.py".to_string(),
                method: "GET".to_string(),
                timeout_secs: DEFAULT_LOADER_TIMEOUT_SECS,
            },
        );
    }

    // Unknown source kinds are rejected with the list of valid kinds.
    #[test]
    fn test_resolve_source_unknown_type_errors() {
        let json = serde_json::json!({
            "script": { "uri": "x", "source": { "type": "ftp" } }
        });
        let config: PythonConfig = serde_json::from_value(json).unwrap();
        let err = config.resolve_source().unwrap_err();
        let msg = format!("{err}");
        assert!(msg.contains("ftp"), "got: {msg}");
        assert!(msg.contains("inline | file | gcs | http"), "got: {msg}");
    }

    // `type: file` without `script.uri` is a configuration error.
    #[test]
    fn test_resolve_source_file_requires_uri() {
        let json = serde_json::json!({
            "script": { "source": { "type": "file" } }
        });
        let config: PythonConfig = serde_json::from_value(json).unwrap();
        let err = config.resolve_source().unwrap_err();
        assert!(format!("{err}").contains("script.uri"), "got: {err}");
    }

    // gs:// parsing: happy path and the three malformed shapes.
    #[test]
    fn test_parse_gcs_uri() {
        assert_eq!(
            parse_gcs_uri("gs://bucket/a/b/c.py").unwrap(),
            ("bucket".to_string(), "a/b/c.py".to_string()),
        );
        assert!(parse_gcs_uri("https://x").is_err());
        assert!(parse_gcs_uri("gs://bucket-only").is_err());
        assert!(parse_gcs_uri("gs:///object").is_err());
    }

    // Object names are encoded as a single path segment.
    #[test]
    fn test_encode_gcs_object_percent_encodes_slashes() {
        assert_eq!(encode_gcs_object("a/b/c.py"), "a%2Fb%2Fc.py");
        assert_eq!(encode_gcs_object("plain.py"), "plain.py");
        assert_eq!(encode_gcs_object("with space.py"), "with%20space.py");
    }

    // The file loader returns the file content verbatim.
    #[tokio::test]
    async fn test_load_from_file_reads_body() {
        let mut tmp = NamedTempFile::new().unwrap();
        use std::io::Write;
        write!(tmp, "result = {{'loaded': 'from_file'}}").unwrap();
        let path = tmp.path().to_str().unwrap().to_string();
        let tool = PythonTool::new();
        let code = tool.load_from_file(&path).await.unwrap();
        assert_eq!(code, "result = {'loaded': 'from_file'}");
    }

    // A missing file is reported as a read error.
    #[tokio::test]
    async fn test_load_from_file_missing_path_errors() {
        let tool = PythonTool::new();
        let err = tool
            .load_from_file("/nonexistent/path/to/script.py")
            .await
            .unwrap_err();
        assert!(format!("{err}").contains("could not be read"), "got: {err}");
    }

    // Inline code needs no I/O to load.
    #[tokio::test]
    async fn test_load_script_code_inline_path() {
        let json = serde_json::json!({ "code": "x = 1" });
        let config: PythonConfig = serde_json::from_value(json).unwrap();
        let tool = PythonTool::new();
        assert_eq!(tool.load_script_code(&config).await.unwrap(), "x = 1");
    }

    // A config without any code is rejected.
    #[test]
    fn test_no_code_anywhere_errors() {
        let json = serde_json::json!({ "args": {} });
        let config: PythonConfig = serde_json::from_value(json).unwrap();
        let err = config.resolve_code().unwrap_err();
        assert!(format!("{err}").contains("no code"), "got: {err}");
    }

    // Omitted fields fall back to their serde defaults.
    #[test]
    fn test_python_config_defaults() {
        let json = serde_json::json!({
            "code": "print(1)"
        });
        let config: PythonConfig = serde_json::from_value(json).unwrap();
        assert!(config.args.is_empty());
        assert!(config.env.is_empty());
        assert_eq!(config.python, default_python());
    }

    // The marker line is parsed and removed from visible stdout.
    #[test]
    fn test_extract_result_strips_marker_line() {
        let raw = "hello from user\n@@__NOETL_RESULT__@@{\"is_hot\":true}\n";
        let (captured, cleaned) = extract_result_from_stdout(raw);
        assert_eq!(
            captured,
            Some(serde_json::json!({"is_hot": true})),
            "marker JSON should parse"
        );
        assert_eq!(cleaned, "hello from user\n", "marker line stripped");
    }

    // Output without a marker passes through unchanged.
    #[test]
    fn test_extract_result_no_marker_returns_none() {
        let raw = "just user output\n";
        let (captured, cleaned) = extract_result_from_stdout(raw);
        assert!(captured.is_none(), "no marker = no capture");
        assert_eq!(cleaned, "just user output\n", "stdout unchanged");
    }

    // A `null` payload is captured as JSON null, leaving empty stdout.
    #[test]
    fn test_extract_result_handles_null_capture() {
        let raw = "@@__NOETL_RESULT__@@null\n";
        let (captured, cleaned) = extract_result_from_stdout(raw);
        assert_eq!(captured, Some(serde_json::Value::Null));
        assert_eq!(cleaned, "");
    }

    // The `result` global becomes the tool's data.
    #[tokio::test]
    async fn test_python_captures_result_global() {
        let tool = PythonTool::new();
        let args = HashMap::new();
        let env = HashMap::new();
        let ctx = ExecutionContext::default();
        let result = tool
            .execute_code(
                "result = {'is_hot': True, 'message': 'hot'}",
                &args,
                &env,
                "python3",
                None,
                None,
                &ctx,
            )
            .await
            .unwrap();
        assert!(result.is_success());
        let data = result.data.expect("data should be the captured result");
        assert_eq!(
            data.get("is_hot").and_then(|v| v.as_bool()),
            Some(true),
            "captured result must expose `is_hot`"
        );
        assert_eq!(
            data.get("message").and_then(|v| v.as_str()),
            Some("hot"),
            "captured result must expose `message`"
        );
    }

    // `main(...)` is called with its named parameters bound from args.
    #[tokio::test]
    async fn test_python_main_function_convention() {
        let tool = PythonTool::new();
        let mut args = HashMap::new();
        args.insert("name".to_string(), serde_json::json!("NoETL"));
        args.insert("count".to_string(), serde_json::json!(3));
        let env = HashMap::new();
        let ctx = ExecutionContext::default();
        let code = r#"
def main(name="World", count=1):
    messages = [f"Hello, {name}! (#{i+1})" for i in range(count)]
    return {"status": "success", "messages": messages, "total": count}
"#;
        let result = tool
            .execute_code(code, &args, &env, "python3", None, None, &ctx)
            .await
            .unwrap();
        assert!(result.is_success(), "stderr: {:?}", result.stderr);
        let data = result.data.expect("main() return becomes data");
        assert_eq!(data.get("status").and_then(|v| v.as_str()), Some("success"));
        assert_eq!(data.get("total").and_then(|v| v.as_i64()), Some(3));
        assert_eq!(
            data.get("messages")
                .and_then(|v| v.as_array())
                .map(|a| a.len()),
            Some(3),
        );
    }

    // An explicitly set `result` suppresses the `main()` call.
    #[tokio::test]
    async fn test_python_explicit_result_wins_over_main() {
        let tool = PythonTool::new();
        let args = HashMap::new();
        let env = HashMap::new();
        let ctx = ExecutionContext::default();
        let code = r#"
def main():
    return {"from": "main"}
result = {"from": "explicit"}
"#;
        let result = tool
            .execute_code(code, &args, &env, "python3", None, None, &ctx)
            .await
            .unwrap();
        assert!(result.is_success());
        let data = result.data.expect("explicit result");
        assert_eq!(data.get("from").and_then(|v| v.as_str()), Some("explicit"));
    }

    // An `async def main` is awaited.
    #[tokio::test]
    async fn test_python_async_main_function() {
        let tool = PythonTool::new();
        let args = HashMap::new();
        let env = HashMap::new();
        let ctx = ExecutionContext::default();
        let code = r#"
async def main():
    return {"async": True, "value": 42}
"#;
        let result = tool
            .execute_code(code, &args, &env, "python3", None, None, &ctx)
            .await
            .unwrap();
        assert!(result.is_success(), "stderr: {:?}", result.stderr);
        let data = result.data.expect("async main() return becomes data");
        assert_eq!(data.get("async").and_then(|v| v.as_bool()), Some(true));
        assert_eq!(data.get("value").and_then(|v| v.as_i64()), Some(42));
    }

    // A `**kwargs` main receives every arg.
    #[tokio::test]
    async fn test_python_main_with_var_kwargs() {
        let tool = PythonTool::new();
        let mut args = HashMap::new();
        args.insert("a".to_string(), serde_json::json!(1));
        args.insert("b".to_string(), serde_json::json!(2));
        let env = HashMap::new();
        let ctx = ExecutionContext::default();
        let code = r#"
def main(**kwargs):
    return {"sum": sum(v for v in kwargs.values() if isinstance(v, int))}
"#;
        let result = tool
            .execute_code(code, &args, &env, "python3", None, None, &ctx)
            .await
            .unwrap();
        assert!(result.is_success(), "stderr: {:?}", result.stderr);
        assert_eq!(
            result.data.unwrap().get("sum").and_then(|v| v.as_i64()),
            Some(3),
        );
    }

    // User prints stay in stdout; the marker line does not.
    #[tokio::test]
    async fn test_python_capture_preserves_user_stdout() {
        let tool = PythonTool::new();
        let args = HashMap::new();
        let env = HashMap::new();
        let ctx = ExecutionContext::default();
        let result = tool
            .execute_code(
                "print('debug: starting'); result = {'ok': True}",
                &args,
                &env,
                "python3",
                None,
                None,
                &ctx,
            )
            .await
            .unwrap();
        assert!(result.is_success());
        let stdout = result.stdout.as_ref().unwrap();
        assert!(stdout.contains("debug: starting"), "user print preserved");
        assert!(
            !stdout.contains(NOETL_RESULT_MARKER),
            "marker line must be stripped from visible stdout"
        );
        let data = result.data.expect("captured result");
        assert_eq!(data.get("ok").and_then(|v| v.as_bool()), Some(true));
    }

    // A plain print-only script succeeds and its output is visible.
    #[tokio::test]
    async fn test_python_simple_script() {
        let tool = PythonTool::new();
        let args = HashMap::new();
        let env = HashMap::new();
        let ctx = ExecutionContext::default();
        let result = tool
            .execute_code(
                "print('hello from python')",
                &args,
                &env,
                "python3",
                None,
                None,
                &ctx,
            )
            .await
            .unwrap();
        assert!(result.is_success());
        assert!(result
            .stdout
            .as_ref()
            .unwrap()
            .contains("hello from python"));
    }

    // JSON printed by the script is reachable through the result data.
    #[tokio::test]
    async fn test_python_json_output() {
        let tool = PythonTool::new();
        let args = HashMap::new();
        let env = HashMap::new();
        let ctx = ExecutionContext::default();
        let result = tool
            .execute_code(
                r#"import json; print(json.dumps({"result": 42}))"#,
                &args,
                &env,
                "python3",
                None,
                None,
                &ctx,
            )
            .await
            .unwrap();
        assert!(result.is_success());
        if let Some(data) = result.data {
            assert!(data.to_string().contains("42"));
        }
    }

    // `args` is available to the script as a dict.
    #[tokio::test]
    async fn test_python_with_args() {
        let tool = PythonTool::new();
        let mut args = HashMap::new();
        args.insert("x".to_string(), serde_json::json!(10));
        let env = HashMap::new();
        let ctx = ExecutionContext::default();
        let result = tool
            .execute_code(
                "print(args.get('x', 0) * 2)",
                &args,
                &env,
                "python3",
                None,
                None,
                &ctx,
            )
            .await
            .unwrap();
        assert!(result.is_success());
        assert!(result.stdout.as_ref().unwrap().contains("20"));
    }

    // An uncaught exception yields a non-zero exit code and an error status.
    #[tokio::test]
    async fn test_python_error() {
        let tool = PythonTool::new();
        let args = HashMap::new();
        let env = HashMap::new();
        let ctx = ExecutionContext::default();
        let result = tool
            .execute_code(
                "raise ValueError('test error')",
                &args,
                &env,
                "python3",
                None,
                None,
                &ctx,
            )
            .await
            .unwrap();
        assert!(!result.is_success());
        assert!(result.exit_code.unwrap() != 0);
    }

    // A script that outlives the timeout is reported as timed out.
    #[tokio::test]
    async fn test_python_timeout() {
        let tool = PythonTool::new();
        let args = HashMap::new();
        let env = HashMap::new();
        let ctx = ExecutionContext::default();
        let result = tool
            .execute_code(
                "import time; time.sleep(10)",
                &args,
                &env,
                "python3",
                None,
                Some(Duration::from_millis(100)),
                &ctx,
            )
            .await
            .unwrap();
        assert_eq!(result.status, ToolStatus::Timeout);
    }

    // End-to-end run through the `Tool` trait.
    #[tokio::test]
    async fn test_python_tool_interface() {
        let tool = PythonTool::new();
        assert_eq!(tool.name(), "python");
        let config = ToolConfig {
            kind: "python".to_string(),
            config: serde_json::json!({
                "code": "print('test')"
            }),
            timeout: None,
            retry: None,
            auth: None,
        };
        let ctx = ExecutionContext::default();
        let result = tool.execute(&config, &ctx).await.unwrap();
        assert!(result.is_success());
    }

    // Code without a `return` is left alone.
    #[test]
    fn test_wrap_top_level_return_noop_when_no_return() {
        let code = "result = {'x': 1}";
        assert_eq!(wrap_top_level_return(code), code);
    }

    // A `return` nested in a `def` is not a top-level return.
    #[test]
    fn test_wrap_top_level_return_noop_inside_def() {
        let code = "def main():\n return {'x': 1}";
        assert_eq!(wrap_top_level_return(code), code);
    }

    // Same for `async def`.
    #[test]
    fn test_wrap_top_level_return_noop_inside_async_def() {
        let code = "async def main():\n return {'x': 1}";
        assert_eq!(wrap_top_level_return(code), code);
    }

    // A bare top-level `return` is wrapped in `__noetl_step__`.
    #[test]
    fn test_wrap_top_level_return_wraps_bare_return() {
        let code = "return {'x': 1}";
        let wrapped = wrap_top_level_return(code);
        assert!(
            wrapped.contains("def __noetl_step__(args, input_data, **kw):"),
            "expected wrapper function, got:\n{wrapped}"
        );
        assert!(
            wrapped.contains("result = __noetl_step__(args, input_data)"),
            "expected call line, got:\n{wrapped}"
        );
        assert!(
            wrapped.contains(" return {'x': 1}"),
            "expected indented user code, got:\n{wrapped}"
        );
    }

    // `input_data` mirrors `args` inside the script.
    #[tokio::test]
    async fn test_input_data_global_is_injected() {
        let tool = PythonTool::new();
        let mut args = HashMap::new();
        args.insert("foo".to_string(), serde_json::json!("bar"));
        let env = HashMap::new();
        let ctx = ExecutionContext::default();
        let result = tool
            .execute_code(
                "result = {\"got\": input_data.get(\"foo\")}",
                &args,
                &env,
                "python3",
                None,
                None,
                &ctx,
            )
            .await
            .unwrap();
        assert!(result.is_success(), "stderr: {:?}", result.stderr);
        let data = result.data.expect("data should be the captured result");
        assert_eq!(
            data.get("got").and_then(|v| v.as_str()),
            Some("bar"),
            "input_data.get('foo') must return 'bar'"
        );
    }

    // A top-level `return` value becomes the result.
    #[tokio::test]
    async fn test_top_level_return_wraps_user_code() {
        let tool = PythonTool::new();
        let mut args = HashMap::new();
        args.insert("n".to_string(), serde_json::json!(5));
        let env = HashMap::new();
        let ctx = ExecutionContext::default();
        let result = tool
            .execute_code(
                "return {\"echoed\": input_data.get(\"n\", 0) * 2}",
                &args,
                &env,
                "python3",
                None,
                None,
                &ctx,
            )
            .await
            .unwrap();
        assert!(result.is_success(), "stderr: {:?}", result.stderr);
        let data = result.data.expect("return value becomes data");
        assert_eq!(
            data.get("echoed").and_then(|v| v.as_i64()),
            Some(10),
            "top-level return must yield echoed: 10"
        );
    }

    // Top-level `return` also works with empty args.
    #[tokio::test]
    async fn test_top_level_return_with_no_input_data() {
        let tool = PythonTool::new();
        let args = HashMap::new();
        let env = HashMap::new();
        let ctx = ExecutionContext::default();
        let result = tool
            .execute_code(
                "return {\"ok\": True}",
                &args,
                &env,
                "python3",
                None,
                None,
                &ctx,
            )
            .await
            .unwrap();
        assert!(result.is_success(), "stderr: {:?}", result.stderr);
        let data = result.data.expect("return value becomes data");
        assert_eq!(
            data.get("ok").and_then(|v| v.as_bool()),
            Some(true),
            "top-level return with no args must yield ok: true"
        );
    }

    // `main()` can read the `input_data` global as well as its parameters.
    #[tokio::test]
    async fn test_main_function_convention_still_works_with_input_data_global() {
        let tool = PythonTool::new();
        let mut args = HashMap::new();
        args.insert("value".to_string(), serde_json::json!(7));
        let env = HashMap::new();
        let ctx = ExecutionContext::default();
        let code = r#"
def main(value=0):
    return {"doubled": value * 2, "from_input_data": input_data.get("value", -1)}
"#;
        let result = tool
            .execute_code(code, &args, &env, "python3", None, None, &ctx)
            .await
            .unwrap();
        assert!(result.is_success(), "stderr: {:?}", result.stderr);
        let data = result.data.expect("main() return becomes data");
        assert_eq!(
            data.get("doubled").and_then(|v| v.as_i64()),
            Some(14),
            "main() called with value=7"
        );
        assert_eq!(
            data.get("from_input_data").and_then(|v| v.as_i64()),
            Some(7),
            "input_data accessible inside main() body via global"
        );
    }
}