use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
use tempfile::NamedTempFile;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio::time::timeout;
use crate::auth::GcpAuth;
use crate::context::ExecutionContext;
use crate::error::ToolError;
use crate::registry::{Tool, ToolConfig};
use crate::result::{ToolResult, ToolStatus};
use crate::template::TemplateEngine;
// OAuth scope list passed to `GcpAuth::get_token` when fetching a script object from GCS.
const GCS_READ_SCOPES: &[&str] = &["https://www.googleapis.com/auth/devstorage.read_only"];
// Timeout (seconds) for GCS script fetches, and for HTTP script fetches that set no `source.timeout`.
const DEFAULT_LOADER_TIMEOUT_SECS: u64 = 30;
// Marker the Python wrapper writes in front of the JSON-encoded `result` global on stdout.
const NOETL_RESULT_MARKER: &str = "@@__NOETL_RESULT__@@";
/// Separates the wrapper-emitted result from the rest of a script's stdout.
///
/// Every line containing [`NOETL_RESULT_MARKER`] is treated as a result line:
/// the text after the marker is parsed as JSON and the marker plus payload are
/// removed from the visible output. The last marker line wins; a payload that
/// is not valid JSON yields `None`.
///
/// Returns `(captured, cleaned)` where `captured` is the parsed result (if any)
/// and `cleaned` is the stdout without the marker payload. A trailing newline
/// is kept only when the input had one and the cleaned text is non-empty.
fn extract_result_from_stdout(stdout: &str) -> (Option<serde_json::Value>, String) {
    let mut kept_lines: Vec<&str> = Vec::new();
    let mut captured: Option<serde_json::Value> = None;
    for line in stdout.lines() {
        // The marker is looked up anywhere in the line, not just at its start:
        // when the user's last write has no trailing newline (for example
        // `print(x, end='')`), the wrapper's marker lands on the same line as
        // that output and a prefix-only match would lose the result.
        match line.split_once(NOETL_RESULT_MARKER) {
            Some((user_text, payload)) => {
                captured = serde_json::from_str(payload).ok();
                if !user_text.is_empty() {
                    kept_lines.push(user_text);
                }
            }
            None => kept_lines.push(line),
        }
    }
    let mut cleaned = kept_lines.join("\n");
    if stdout.ends_with('\n') && !cleaned.is_empty() {
        cleaned.push('\n');
    }
    (captured, cleaned)
}
/// The `script.source` section of a python tool config: where the code comes from.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PythonScriptSource {
/// Source kind, (de)serialized as `type`. `PythonConfig::resolve_source` accepts
/// `inline`, `file`, `gcs` and `http`, and treats a missing value as `inline`.
#[serde(rename = "type", default, skip_serializing_if = "Option::is_none")]
pub source_type: Option<String>,
/// Python code used when the source kind is `inline`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub code: Option<String>,
/// Accepted in the config; NOTE(review): not read by any code visible in this file.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub auth: Option<String>,
/// URL for `http` sources; preferred over `script.uri` when both are set.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub endpoint: Option<String>,
/// HTTP method for `http` sources; upper-cased on resolve, `GET` when absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub method: Option<String>,
/// Fetch timeout in seconds for `http` sources; `DEFAULT_LOADER_TIMEOUT_SECS` when absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub timeout: Option<u64>,
}
/// The `script` section of a python tool config.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PythonScript {
/// Script location: the file path for `file` sources, the `gs://` URL for `gcs`
/// sources, and the fallback URL for `http` sources without `source.endpoint`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub uri: Option<String>,
/// How the script should be obtained; see `PythonScriptSource`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub source: Option<PythonScriptSource>,
}
/// Configuration of a single python tool invocation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PythonConfig {
/// Flat inline code; when set it takes precedence over anything under `script`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub code: Option<String>,
/// Structured script description, consulted only when `code` is absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub script: Option<PythonScript>,
/// Arguments sent to the script under the `args` key of the stdin context; the
/// wrapper in `PythonTool::execute_code` also copies them into the script's globals.
#[serde(default)]
pub args: HashMap<String, serde_json::Value>,
/// Interpreter executable to spawn; defaults to `default_python()`.
#[serde(default = "default_python")]
pub python: String,
/// Extra environment variables set on the spawned interpreter.
#[serde(default)]
pub env: HashMap<String, String>,
/// Per-config timeout in seconds; `Tool::execute` falls back to `ToolConfig::timeout` when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub timeout_seconds: Option<u64>,
/// Working directory for the spawned interpreter, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub cwd: Option<String>,
}
/// Serde default for `PythonConfig::python`: the value of the `PYTHON_PATH`
/// environment variable when it can be read, otherwise `"python3"`.
fn default_python() -> String {
    match std::env::var("PYTHON_PATH") {
        Ok(interpreter) => interpreter,
        Err(_) => String::from("python3"),
    }
}
/// Resolved origin of the Python code, as produced by `PythonConfig::resolve_source`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PythonSource {
/// Code is embedded in the config; the payload is the code itself.
Inline(String),
/// Code is read from a file at `path`.
File { path: String },
/// Code is fetched from Google Cloud Storage; `uri` is the `gs://` URL.
Gcs { uri: String },
/// Code is fetched over HTTP.
Http {
/// URL to request.
endpoint: String,
/// Upper-cased HTTP method name.
method: String,
/// Request timeout in seconds.
timeout_secs: u64,
},
}
impl PythonConfig {
/// Classifies where the Python code comes from without loading it.
///
/// Flat `code` always wins. Otherwise `script.source.type` selects the
/// variant (`inline` when the type is omitted):
///
/// * `inline` uses `script.source.code`
/// * `file` uses `script.uri` as the path
/// * `gcs` uses `script.uri` as the `gs://` URL
/// * `http` uses `script.source.endpoint`, falling back to `script.uri`;
///   the method defaults to `GET` and is upper-cased, the timeout defaults
///   to `DEFAULT_LOADER_TIMEOUT_SECS`
///
/// # Errors
///
/// Returns `ToolError::Configuration` when the config carries no code and no
/// `script.source`, when the field a source kind requires is missing, or when
/// the source kind is unknown.
pub fn resolve_source(&self) -> Result<PythonSource, ToolError> {
    if let Some(code) = self.code.as_deref() {
        return Ok(PythonSource::Inline(code.to_string()));
    }
    let script = self.script.as_ref();
    let source = match script.and_then(|s| s.source.as_ref()) {
        Some(source) => source,
        // Nothing describes a script at all. Report that directly (same
        // wording as `resolve_code`) instead of complaining about a
        // `source.type: inline` the user never wrote.
        None => {
            return Err(ToolError::Configuration(
                "python config has no code: set `code:` or `script.source.code`".to_string(),
            ));
        }
    };
    let uri = script.and_then(|s| s.uri.as_deref());
    match source.source_type.as_deref().unwrap_or("inline") {
        "inline" => source
            .code
            .as_deref()
            .map(|c| PythonSource::Inline(c.to_string()))
            .ok_or_else(|| {
                ToolError::Configuration(
                    "python script `source.type: inline` has no `code`".to_string(),
                )
            }),
        "file" => uri
            .map(|path| PythonSource::File {
                path: path.to_string(),
            })
            .ok_or_else(|| {
                ToolError::Configuration(
                    "python script `source.type: file` requires `script.uri` (the file path)"
                        .to_string(),
                )
            }),
        "gcs" => uri
            .map(|uri| PythonSource::Gcs {
                uri: uri.to_string(),
            })
            .ok_or_else(|| {
                ToolError::Configuration(
                    "python script `source.type: gcs` requires `script.uri` (a gs:// URL)"
                        .to_string(),
                )
            }),
        "http" => {
            // An explicit endpoint takes precedence over the generic script uri.
            let endpoint = source.endpoint.as_deref().or(uri).ok_or_else(|| {
                ToolError::Configuration(
                    "python script `source.type: http` requires `source.endpoint` \
                     (the URL to GET) or a `script.uri`"
                        .to_string(),
                )
            })?;
            let method = source.method.as_deref().unwrap_or("GET").to_uppercase();
            let timeout_secs = source.timeout.unwrap_or(DEFAULT_LOADER_TIMEOUT_SECS);
            Ok(PythonSource::Http {
                endpoint: endpoint.to_string(),
                method,
                timeout_secs,
            })
        }
        other => Err(ToolError::Configuration(format!(
            "python script `source.type: {other}` is unknown; expected one of \
             inline | file | gcs | http"
        ))),
    }
}
/// Returns the inline code borrowed from the config, without any I/O.
///
/// Flat `code` is preferred; otherwise `script.source.code` is used when the
/// source kind is `inline` (or omitted).
///
/// # Errors
///
/// Returns `ToolError::Configuration` when no code is configured, when an
/// inline source has no `code`, or when the source kind is anything other
/// than `inline` (those kinds need `PythonTool::load_script_code`).
pub fn resolve_code(&self) -> Result<&str, ToolError> {
    if let Some(flat) = &self.code {
        return Ok(flat.as_str());
    }
    let nested = match self.script.as_ref().and_then(|s| s.source.as_ref()) {
        Some(nested) => nested,
        None => {
            return Err(ToolError::Configuration(
                "python config has no code: set `code:` or `script.source.code`".to_string(),
            ));
        }
    };
    match nested.source_type.as_deref().unwrap_or("inline") {
        "inline" => match nested.code.as_deref() {
            Some(body) => Ok(body),
            None => Err(ToolError::Configuration(
                "python script `source.type: inline` has no `code`".to_string(),
            )),
        },
        kind => Err(ToolError::Configuration(format!(
            "python script `source.type: {kind}` requires async loading; \
             call PythonTool::load_script_code"
        ))),
    }
}
}
/// Splits a `gs://bucket/object` URL into `(bucket, object)`.
///
/// The object is everything after the first `/` that follows the bucket, so
/// nested object paths keep their slashes.
///
/// # Errors
///
/// Returns `ToolError::Configuration` when the `gs://` prefix is missing, when
/// there is no `/` after the bucket, or when either part is empty.
fn parse_gcs_uri(uri: &str) -> Result<(String, String), ToolError> {
    let remainder = match uri.strip_prefix("gs://") {
        Some(remainder) => remainder,
        None => {
            return Err(ToolError::Configuration(format!(
                "gcs script uri must start with `gs://`, got `{uri}`"
            )));
        }
    };
    match remainder.split_once('/') {
        None => Err(ToolError::Configuration(format!(
            "gcs script uri `{uri}` has no object path after the bucket"
        ))),
        Some((bucket, object)) if bucket.is_empty() || object.is_empty() => {
            Err(ToolError::Configuration(format!(
                "gcs script uri `{uri}` has an empty bucket or object"
            )))
        }
        Some((bucket, object)) => Ok((bucket.to_owned(), object.to_owned())),
    }
}
/// Percent-encodes a GCS object name for use as a single URL path segment.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~` pass through unchanged; every
/// other byte (including `/`) becomes `%XX` with upper-case hex digits.
fn encode_gcs_object(object: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";
    let buffer = String::with_capacity(object.len() * 2);
    object.bytes().fold(buffer, |mut encoded, byte| {
        if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~') {
            encoded.push(char::from(byte));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
        }
        encoded
    })
}
/// Tool that loads Python code (inline, file, GCS or HTTP) and runs it in a child interpreter.
pub struct PythonTool {
// Renders templates in the tool config before it is deserialized (see `parse_config`).
template_engine: TemplateEngine,
// HTTP client used for GCS and HTTP script fetches.
http_client: reqwest::Client,
// Source of bearer tokens for GCS fetches.
gcp_auth: GcpAuth,
}
impl PythonTool {
/// Creates a tool with a fresh template engine, HTTP client and GCP auth helper.
pub fn new() -> Self {
    let template_engine = TemplateEngine::new();
    let http_client = reqwest::Client::new();
    let gcp_auth = GcpAuth::new();
    Self {
        template_engine,
        http_client,
        gcp_auth,
    }
}
/// Resolves the configured source and returns the Python code as an owned string.
///
/// Inline code is returned directly; `file`, `gcs` and `http` sources are
/// loaded through the matching `load_from_*` helper.
///
/// # Errors
///
/// Propagates the error from `PythonConfig::resolve_source` or from the loader.
pub async fn load_script_code(&self, cfg: &PythonConfig) -> Result<String, ToolError> {
    let source = cfg.resolve_source()?;
    let code = match source {
        PythonSource::Inline(inline) => inline,
        PythonSource::File { path } => self.load_from_file(&path).await?,
        PythonSource::Gcs { uri } => self.load_from_gcs(&uri).await?,
        PythonSource::Http {
            endpoint,
            method,
            timeout_secs,
        } => {
            self.load_from_http(&endpoint, &method, timeout_secs)
                .await?
        }
    };
    Ok(code)
}
/// Reads the script at `path` as UTF-8 text.
///
/// # Errors
///
/// Returns `ToolError::Io` naming the path when the file cannot be read.
async fn load_from_file(&self, path: &str) -> Result<String, ToolError> {
    match tokio::fs::read_to_string(path).await {
        Ok(contents) => Ok(contents),
        Err(e) => Err(ToolError::Io(format!(
            "python script file `{path}` could not be read: {e}"
        ))),
    }
}
/// Downloads a script from Google Cloud Storage.
///
/// Parses the `gs://` URL, obtains a token for `GCS_READ_SCOPES`, and issues
/// an authenticated GET against the JSON API's `alt=media` endpoint with a
/// timeout of `DEFAULT_LOADER_TIMEOUT_SECS` seconds.
///
/// # Errors
///
/// Propagates URL-parsing and token errors; returns `ToolError::Http` when
/// the request fails, the status is not a success (the message carries up to
/// 300 characters of the response body), or the body cannot be read.
async fn load_from_gcs(&self, uri: &str) -> Result<String, ToolError> {
    let (bucket, object) = parse_gcs_uri(uri)?;
    let token = self.gcp_auth.get_token(GCS_READ_SCOPES).await?;
    let encoded_object = encode_gcs_object(&object);
    let url = format!(
        "https://storage.googleapis.com/storage/v1/b/{bucket}/o/{encoded_object}?alt=media"
    );
    let request = self
        .http_client
        .get(&url)
        .bearer_auth(token)
        .timeout(Duration::from_secs(DEFAULT_LOADER_TIMEOUT_SECS));
    let response = match request.send().await {
        Ok(response) => response,
        Err(e) => return Err(ToolError::Http(format!("gcs fetch `{uri}` failed: {e}"))),
    };
    let status = response.status();
    if status.is_success() {
        return response
            .text()
            .await
            .map_err(|e| ToolError::Http(format!("gcs fetch `{uri}` body read failed: {e}")));
    }
    // Failure path: include a bounded excerpt of the body for diagnostics.
    let body = response.text().await.unwrap_or_default();
    let excerpt: String = body.chars().take(300).collect();
    Err(ToolError::Http(format!(
        "gcs fetch `{uri}` returned HTTP {status}: {excerpt}"
    )))
}
/// Downloads a script over HTTP using the given method and timeout (seconds).
///
/// # Errors
///
/// Returns `ToolError::Configuration` for an invalid method name, and
/// `ToolError::Http` when the request fails, the status is not a success
/// (the message carries up to 300 characters of the response body), or the
/// body cannot be read.
async fn load_from_http(
    &self,
    endpoint: &str,
    method: &str,
    timeout_secs: u64,
) -> Result<String, ToolError> {
    let verb = match reqwest::Method::from_bytes(method.as_bytes()) {
        Ok(verb) => verb,
        Err(_) => {
            return Err(ToolError::Configuration(format!(
                "python http script: invalid method `{method}`"
            )));
        }
    };
    let request = self
        .http_client
        .request(verb, endpoint)
        .timeout(Duration::from_secs(timeout_secs));
    let response = match request.send().await {
        Ok(response) => response,
        Err(e) => {
            return Err(ToolError::Http(format!(
                "http fetch `{endpoint}` failed: {e}"
            )));
        }
    };
    let status = response.status();
    if status.is_success() {
        return response.text().await.map_err(|e| {
            ToolError::Http(format!("http fetch `{endpoint}` body read failed: {e}"))
        });
    }
    // Failure path: include a bounded excerpt of the body for diagnostics.
    let body = response.text().await.unwrap_or_default();
    let excerpt: String = body.chars().take(300).collect();
    Err(ToolError::Http(format!(
        "http fetch `{endpoint}` returned HTTP {status}: {excerpt}"
    )))
}
#[allow(clippy::too_many_arguments)]
pub async fn execute_code(
&self,
code: &str,
args: &HashMap<String, serde_json::Value>,
env: &HashMap<String, String>,
python: &str,
cwd: Option<&str>,
timeout_duration: Option<Duration>,
ctx: &ExecutionContext,
) -> Result<ToolResult, ToolError> {
let start = std::time::Instant::now();
let wrapper_code = format!(
r#"
import sys
import json
# Read context from stdin
context = json.loads(sys.stdin.read())
args = context.get('args', {{}})
variables = context.get('variables', {{}})
execution_id = context.get('execution_id')
step = context.get('step')
# Make args available as globals for convenience
globals().update(args)
# User code
{}
# Emit the user-set `result` global as JSON on a single line
# prefixed with the noetl marker. Missing / non-JSON-serializable
# results fall back to `null` so the tool can still complete
# successfully.
try:
__noetl_captured = globals().get('result', None)
sys.stdout.write('{marker}' + json.dumps(__noetl_captured, default=str) + '\n')
sys.stdout.flush()
except Exception as __noetl_err:
sys.stdout.write('{marker}null\n')
sys.stderr.write('noetl result capture failed: ' + repr(__noetl_err) + '\n')
"#,
code,
marker = NOETL_RESULT_MARKER
);
let temp_file = NamedTempFile::new()
.map_err(|e| ToolError::Process(format!("Failed to create temp file: {}", e)))?;
tokio::fs::write(temp_file.path(), wrapper_code.as_bytes())
.await
.map_err(|e| ToolError::Process(format!("Failed to write script: {}", e)))?;
let mut cmd = Command::new(python);
cmd.arg(temp_file.path());
if let Some(dir) = cwd {
cmd.current_dir(dir);
}
for (k, v) in env {
cmd.env(k, v);
}
cmd.stdin(std::process::Stdio::piped());
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
let mut child = cmd
.spawn()
.map_err(|e| ToolError::Process(format!("Failed to spawn Python process: {}", e)))?;
let context_json = serde_json::json!({
"args": args,
"variables": ctx.variables,
"execution_id": ctx.execution_id,
"step": ctx.step,
"server_url": ctx.server_url,
});
let stdin = child.stdin.take();
if let Some(mut stdin) = stdin {
let _ = stdin.write_all(context_json.to_string().as_bytes()).await;
let _ = stdin.shutdown().await;
}
let output = if let Some(duration) = timeout_duration {
let child_id = child.id();
match timeout(duration, child.wait_with_output()).await {
Ok(result) => result.map_err(|e| {
ToolError::Process(format!("Failed to wait for process: {}", e))
})?,
Err(_) => {
if let Some(pid) = child_id {
#[cfg(unix)]
{
let _ = std::process::Command::new("kill")
.args(["-9", &pid.to_string()])
.spawn();
}
#[cfg(windows)]
{
let _ = std::process::Command::new("taskkill")
.args(["/F", "/PID", &pid.to_string()])
.spawn();
}
}
let duration_ms = start.elapsed().as_millis() as u64;
return Ok(ToolResult::timeout(duration.as_secs()).with_duration(duration_ms));
}
}
} else {
child
.wait_with_output()
.await
.map_err(|e| ToolError::Process(format!("Failed to wait for process: {}", e)))?
};
let exit_code = output.status.code().unwrap_or(-1);
let raw_stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
let (captured_result, stdout) = extract_result_from_stdout(&raw_stdout);
let duration_ms = start.elapsed().as_millis() as u64;
let data = if let Some(value) = captured_result {
if value.is_null() {
Some(serde_json::json!({
"stdout": stdout,
"stderr": stderr,
}))
} else {
Some(value)
}
} else if !stdout.trim().is_empty() {
serde_json::from_str(&stdout).ok()
} else {
None
};
let status = if exit_code == 0 {
ToolStatus::Success
} else {
ToolStatus::Error
};
Ok(ToolResult {
status,
data: data.or_else(|| {
Some(serde_json::json!({
"stdout": stdout,
"stderr": stderr,
}))
}),
error: if exit_code != 0 {
Some(format!("Python script exited with code {}", exit_code))
} else {
None
},
stdout: Some(stdout),
stderr: Some(stderr),
exit_code: Some(exit_code),
duration_ms: Some(duration_ms),
pending_callback: None,
})
}
/// Renders templates in the raw tool config against `ctx` and deserializes
/// the result into a `PythonConfig`.
///
/// # Errors
///
/// Propagates template rendering errors; returns `ToolError::Configuration`
/// when the rendered value does not match the `PythonConfig` shape.
fn parse_config(
    &self,
    config: &ToolConfig,
    ctx: &ExecutionContext,
) -> Result<PythonConfig, ToolError> {
    let template_ctx = ctx.to_template_context();
    let rendered = self
        .template_engine
        .render_value(&config.config, &template_ctx)?;
    match serde_json::from_value::<PythonConfig>(rendered) {
        Ok(parsed) => Ok(parsed),
        Err(e) => Err(ToolError::Configuration(format!(
            "Invalid python config: {}",
            e
        ))),
    }
}
}
impl Default for PythonTool {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Tool for PythonTool {
    /// Registry name of this tool.
    fn name(&self) -> &'static str {
        "python"
    }

    /// Parses the config, loads the script and runs it.
    ///
    /// The timeout is `timeout_seconds` from the python config when set,
    /// otherwise the tool-level `config.timeout`; with neither, the run is
    /// unbounded.
    async fn execute(
        &self,
        config: &ToolConfig,
        ctx: &ExecutionContext,
    ) -> Result<ToolResult, ToolError> {
        let python_config = self.parse_config(config, ctx)?;
        let code = self.load_script_code(&python_config).await?;
        let timeout_secs = match python_config.timeout_seconds {
            Some(secs) => Some(secs),
            None => config.timeout,
        };
        let timeout_duration = timeout_secs.map(Duration::from_secs);
        tracing::debug!(
            code_len = code.len(),
            python = %python_config.python,
            timeout = ?timeout_duration,
            "Executing Python script"
        );
        let outcome = self
            .execute_code(
                &code,
                &python_config.args,
                &python_config.env,
                &python_config.python,
                python_config.cwd.as_deref(),
                timeout_duration,
                ctx,
            )
            .await;
        outcome
    }
}
// Unit tests for config resolution, GCS URI helpers, stdout result extraction,
// and end-to-end script execution (the execution tests spawn `python3`).
#[cfg(test)]
mod tests {
use super::*;
// Flat `code` plus `args` deserialize and resolve.
#[test]
fn test_python_config_deserialization() {
let json = serde_json::json!({
"code": "print('hello')",
"args": {"name": "world"},
"python": "python3"
});
let config: PythonConfig = serde_json::from_value(json).unwrap();
assert_eq!(config.resolve_code().unwrap(), "print('hello')");
assert!(config.args.contains_key("name"));
}
// Nested `script.source` with an explicit `inline` type resolves to its code.
#[test]
fn test_python_inline_script_shape_resolves_code() {
let json = serde_json::json!({
"script": {
"uri": "inline",
"source": { "type": "inline", "code": "result = {'ok': 1}" }
}
});
let config: PythonConfig = serde_json::from_value(json).unwrap();
assert_eq!(config.resolve_code().unwrap(), "result = {'ok': 1}");
}
// A source without `type` is treated as inline.
#[test]
fn test_python_source_without_type_defaults_inline() {
let json = serde_json::json!({
"script": { "source": { "code": "x = 1" } }
});
let config: PythonConfig = serde_json::from_value(json).unwrap();
assert_eq!(config.resolve_code().unwrap(), "x = 1");
}
// Flat `code` takes precedence over `script.source.code`.
#[test]
fn test_flat_code_wins_over_script() {
let json = serde_json::json!({
"code": "flat",
"script": { "source": { "type": "inline", "code": "nested" } }
});
let config: PythonConfig = serde_json::from_value(json).unwrap();
assert_eq!(config.resolve_code().unwrap(), "flat");
}
// `type: file` maps `script.uri` to the file path.
#[test]
fn test_resolve_source_classifies_file() {
let json = serde_json::json!({
"script": { "uri": "scripts/run.py", "source": { "type": "file" } }
});
let config: PythonConfig = serde_json::from_value(json).unwrap();
assert_eq!(
config.resolve_source().unwrap(),
PythonSource::File {
path: "scripts/run.py".to_string()
},
);
}
// `type: gcs` maps `script.uri` to the gs:// URL.
#[test]
fn test_resolve_source_classifies_gcs() {
let json = serde_json::json!({
"script": {
"uri": "gs://my-bucket/scripts/run.py",
"source": { "type": "gcs", "auth": "gcp_cred" }
}
});
let config: PythonConfig = serde_json::from_value(json).unwrap();
assert_eq!(
config.resolve_source().unwrap(),
PythonSource::Gcs {
uri: "gs://my-bucket/scripts/run.py".to_string()
},
);
}
// For `type: http`, `source.endpoint` beats `script.uri`; the method is upper-cased.
#[test]
fn test_resolve_source_classifies_http_endpoint_over_uri() {
let json = serde_json::json!({
"script": {
"uri": "hello.py",
"source": {
"type": "http",
"endpoint": "https://example.com/hello.py",
"method": "get",
"timeout": 15
}
}
});
let config: PythonConfig = serde_json::from_value(json).unwrap();
assert_eq!(
config.resolve_source().unwrap(),
PythonSource::Http {
endpoint: "https://example.com/hello.py".to_string(),
method: "GET".to_string(),
timeout_secs: 15,
},
);
}
// Without an endpoint, `type: http` falls back to `script.uri` and the default method/timeout.
#[test]
fn test_resolve_source_http_falls_back_to_uri() {
let json = serde_json::json!({
"script": {
"uri": "https://example.com/from-uri.py",
"source": { "type": "http" }
}
});
let config: PythonConfig = serde_json::from_value(json).unwrap();
assert_eq!(
config.resolve_source().unwrap(),
PythonSource::Http {
endpoint: "https://example.com/from-uri.py".to_string(),
method: "GET".to_string(),
timeout_secs: DEFAULT_LOADER_TIMEOUT_SECS,
},
);
}
// An unknown source type is rejected and the message lists the accepted kinds.
#[test]
fn test_resolve_source_unknown_type_errors() {
let json = serde_json::json!({
"script": { "uri": "x", "source": { "type": "ftp" } }
});
let config: PythonConfig = serde_json::from_value(json).unwrap();
let err = config.resolve_source().unwrap_err();
let msg = format!("{err}");
assert!(msg.contains("ftp"), "got: {msg}");
assert!(msg.contains("inline | file | gcs | http"), "got: {msg}");
}
// `type: file` without `script.uri` is a configuration error.
#[test]
fn test_resolve_source_file_requires_uri() {
let json = serde_json::json!({
"script": { "source": { "type": "file" } }
});
let config: PythonConfig = serde_json::from_value(json).unwrap();
let err = config.resolve_source().unwrap_err();
assert!(format!("{err}").contains("script.uri"), "got: {err}");
}
// Valid gs:// URLs split into bucket and object; malformed ones are rejected.
#[test]
fn test_parse_gcs_uri() {
assert_eq!(
parse_gcs_uri("gs://bucket/a/b/c.py").unwrap(),
("bucket".to_string(), "a/b/c.py".to_string()),
);
assert!(parse_gcs_uri("https://x").is_err());
assert!(parse_gcs_uri("gs://bucket-only").is_err());
assert!(parse_gcs_uri("gs:///object").is_err());
}
// Slashes and spaces in object names are percent-encoded.
#[test]
fn test_encode_gcs_object_percent_encodes_slashes() {
assert_eq!(encode_gcs_object("a/b/c.py"), "a%2Fb%2Fc.py");
assert_eq!(encode_gcs_object("plain.py"), "plain.py");
assert_eq!(encode_gcs_object("with space.py"), "with%20space.py");
}
// The file loader returns the file's contents.
#[tokio::test]
async fn test_load_from_file_reads_body() {
let mut tmp = NamedTempFile::new().unwrap();
use std::io::Write;
write!(tmp, "result = {{'loaded': 'from_file'}}").unwrap();
let path = tmp.path().to_str().unwrap().to_string();
let tool = PythonTool::new();
let code = tool.load_from_file(&path).await.unwrap();
assert_eq!(code, "result = {'loaded': 'from_file'}");
}
// A missing file produces a "could not be read" error.
#[tokio::test]
async fn test_load_from_file_missing_path_errors() {
let tool = PythonTool::new();
let err = tool
.load_from_file("/nonexistent/path/to/script.py")
.await
.unwrap_err();
assert!(
format!("{err}").contains("could not be read"),
"got: {err}"
);
}
// `load_script_code` returns flat inline code unchanged.
#[tokio::test]
async fn test_load_script_code_inline_path() {
let json = serde_json::json!({ "code": "x = 1" });
let config: PythonConfig = serde_json::from_value(json).unwrap();
let tool = PythonTool::new();
assert_eq!(tool.load_script_code(&config).await.unwrap(), "x = 1");
}
// A config with neither `code` nor `script` fails `resolve_code` with a "no code" message.
#[test]
fn test_no_code_anywhere_errors() {
let json = serde_json::json!({ "args": {} });
let config: PythonConfig = serde_json::from_value(json).unwrap();
let err = config.resolve_code().unwrap_err();
assert!(format!("{err}").contains("no code"), "got: {err}");
}
// Omitted `args`, `env` and `python` take their defaults.
#[test]
fn test_python_config_defaults() {
let json = serde_json::json!({
"code": "print(1)"
});
let config: PythonConfig = serde_json::from_value(json).unwrap();
assert!(config.args.is_empty());
assert!(config.env.is_empty());
assert_eq!(config.python, default_python());
}
// The marker line is parsed as JSON and removed from the visible stdout.
#[test]
fn test_extract_result_strips_marker_line() {
let raw = "hello from user\n@@__NOETL_RESULT__@@{\"is_hot\":true}\n";
let (captured, cleaned) = extract_result_from_stdout(raw);
assert_eq!(
captured,
Some(serde_json::json!({"is_hot": true})),
"marker JSON should parse"
);
assert_eq!(cleaned, "hello from user\n", "marker line stripped");
}
// Without a marker line nothing is captured and stdout is returned as-is.
#[test]
fn test_extract_result_no_marker_returns_none() {
let raw = "just user output\n";
let (captured, cleaned) = extract_result_from_stdout(raw);
assert!(captured.is_none(), "no marker = no capture");
assert_eq!(cleaned, "just user output\n", "stdout unchanged");
}
// A `null` payload is captured as JSON null and leaves an empty cleaned stdout.
#[test]
fn test_extract_result_handles_null_capture() {
let raw = "@@__NOETL_RESULT__@@null\n";
let (captured, cleaned) = extract_result_from_stdout(raw);
assert_eq!(captured, Some(serde_json::Value::Null));
assert_eq!(cleaned, "");
}
// A dict assigned to `result` in the script becomes the tool's `data`.
#[tokio::test]
async fn test_python_captures_result_global() {
let tool = PythonTool::new();
let args = HashMap::new();
let env = HashMap::new();
let ctx = ExecutionContext::default();
let result = tool
.execute_code(
"result = {'is_hot': True, 'message': 'hot'}",
&args,
&env,
"python3",
None,
None,
&ctx,
)
.await
.unwrap();
assert!(result.is_success());
let data = result.data.expect("data should be the captured result");
assert_eq!(
data.get("is_hot").and_then(|v| v.as_bool()),
Some(true),
"captured result must expose `is_hot`"
);
assert_eq!(
data.get("message").and_then(|v| v.as_str()),
Some("hot"),
"captured result must expose `message`"
);
}
// User prints stay in stdout while the marker line is hidden and the result is captured.
#[tokio::test]
async fn test_python_capture_preserves_user_stdout() {
let tool = PythonTool::new();
let args = HashMap::new();
let env = HashMap::new();
let ctx = ExecutionContext::default();
let result = tool
.execute_code(
"print('debug: starting'); result = {'ok': True}",
&args,
&env,
"python3",
None,
None,
&ctx,
)
.await
.unwrap();
assert!(result.is_success());
let stdout = result.stdout.as_ref().unwrap();
assert!(stdout.contains("debug: starting"), "user print preserved");
assert!(
!stdout.contains(NOETL_RESULT_MARKER),
"marker line must be stripped from visible stdout"
);
let data = result.data.expect("captured result");
assert_eq!(data.get("ok").and_then(|v| v.as_bool()), Some(true));
}
// A plain print succeeds and shows up in stdout.
#[tokio::test]
async fn test_python_simple_script() {
let tool = PythonTool::new();
let args = HashMap::new();
let env = HashMap::new();
let ctx = ExecutionContext::default();
let result = tool
.execute_code(
"print('hello from python')",
&args,
&env,
"python3",
None,
None,
&ctx,
)
.await
.unwrap();
assert!(result.is_success());
assert!(result
.stdout
.as_ref()
.unwrap()
.contains("hello from python"));
}
// JSON printed by the script is reflected in `data` (checked only when `data` is present).
#[tokio::test]
async fn test_python_json_output() {
let tool = PythonTool::new();
let args = HashMap::new();
let env = HashMap::new();
let ctx = ExecutionContext::default();
let result = tool
.execute_code(
r#"import json; print(json.dumps({"result": 42}))"#,
&args,
&env,
"python3",
None,
None,
&ctx,
)
.await
.unwrap();
assert!(result.is_success());
if let Some(data) = result.data {
assert!(data.to_string().contains("42"));
}
}
// Values passed in `args` are readable from the script's `args` dict.
#[tokio::test]
async fn test_python_with_args() {
let tool = PythonTool::new();
let mut args = HashMap::new();
args.insert("x".to_string(), serde_json::json!(10));
let env = HashMap::new();
let ctx = ExecutionContext::default();
let result = tool
.execute_code(
"print(args.get('x', 0) * 2)",
&args,
&env,
"python3",
None,
None,
&ctx,
)
.await
.unwrap();
assert!(result.is_success());
assert!(result.stdout.as_ref().unwrap().contains("20"));
}
// An uncaught exception yields a non-success result with a non-zero exit code.
#[tokio::test]
async fn test_python_error() {
let tool = PythonTool::new();
let args = HashMap::new();
let env = HashMap::new();
let ctx = ExecutionContext::default();
let result = tool
.execute_code(
"raise ValueError('test error')",
&args,
&env,
"python3",
None,
None,
&ctx,
)
.await
.unwrap();
assert!(!result.is_success());
assert!(result.exit_code.unwrap() != 0);
}
// A script that outlives the timeout is reported with `ToolStatus::Timeout`.
#[tokio::test]
async fn test_python_timeout() {
let tool = PythonTool::new();
let args = HashMap::new();
let env = HashMap::new();
let ctx = ExecutionContext::default();
let result = tool
.execute_code(
"import time; time.sleep(10)",
&args,
&env,
"python3",
None,
Some(Duration::from_millis(100)),
&ctx,
)
.await
.unwrap();
assert_eq!(result.status, ToolStatus::Timeout);
}
// The `Tool` trait surface: the name is "python" and `execute` runs inline code.
#[tokio::test]
async fn test_python_tool_interface() {
let tool = PythonTool::new();
assert_eq!(tool.name(), "python");
let config = ToolConfig {
kind: "python".to_string(),
config: serde_json::json!({
"code": "print('test')"
}),
timeout: None,
retry: None,
auth: None,
};
let ctx = ExecutionContext::default();
let result = tool.execute(&config, &ctx).await.unwrap();
assert!(result.is_success());
}
}