#![allow(dead_code)]
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use anyhow::Result;
use bytes::Bytes;
use object_store::path::Path as StorePath;
use object_store::ObjectStore;
use object_store::PutPayload;
use noetl_tools::auth::GcpAuth;
use noetl_tools::context::ExecutionContext as ToolsExecutionContext;
use noetl_tools::registry::{Tool as ToolsRegistryTool, ToolConfig};
use noetl_tools::result::{ToolResult, ToolStatus};
use noetl_tools::tools::{DuckdbTool, HttpTool, RhaiTool, ShellTool};
use tracing::{info_span, Instrument};
use crate::playbook::{AuthConfig as CliAuthConfig, CmdsList, SinkFormat, Tool};
/// Outcome of dispatching one tool through the bridge.
///
/// The only piece of information carried back to the caller is an optional
/// textual payload; `None` means the tool produced nothing to report.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BridgeOutcome {
    /// Textual payload produced by the tool, if any.
    pub result: Option<String>,
}

impl BridgeOutcome {
    /// Returns an outcome that carries no payload.
    pub fn empty() -> Self {
        BridgeOutcome { result: None }
    }
}
/// Caller-side execution state handed to the bridge for one tool dispatch.
///
/// The conversion helpers in this file (`to_tools_context` and
/// `to_tools_context_for_rhai`) copy these fields into a
/// `ToolsExecutionContext`.
pub struct BridgeContext<'a> {
/// Identifier of the execution this dispatch belongs to.
pub execution_id: i64,
/// Name of the step being dispatched.
pub step: &'a str,
/// Borrowed, string-valued variables. Keys may be dotted (for example
/// `workload.region`); `to_tools_context_for_rhai` groups such keys into
/// nested objects, while `to_tools_context` keeps them flat.
pub variables: &'a HashMap<String, String>,
/// Server URL, forwarded unchanged into the tools context.
pub server_url: String,
/// Optional worker identifier, forwarded unchanged.
pub worker_id: Option<String>,
/// Optional command identifier, forwarded unchanged.
pub command_id: Option<String>,
}
/// Converts a [`BridgeContext`] into the execution context used by `noetl-tools`.
///
/// Every bridge variable keeps its original (possibly dotted) key and is
/// wrapped as a JSON string. Fields that the bridge does not provide are left
/// at their `Default` values.
pub fn to_tools_context(bridge: &BridgeContext) -> ToolsExecutionContext {
    let mut variables: HashMap<String, serde_json::Value> =
        HashMap::with_capacity(bridge.variables.len());
    for (name, text) in bridge.variables {
        variables.insert(name.to_owned(), serde_json::Value::String(text.to_owned()));
    }

    ToolsExecutionContext {
        variables,
        execution_id: bridge.execution_id,
        step: bridge.step.to_owned(),
        server_url: bridge.server_url.clone(),
        worker_id: bridge.worker_id.clone(),
        command_id: bridge.command_id.clone(),
        ..ToolsExecutionContext::default()
    }
}
/// Builds the execution context handed to Rhai scripts.
///
/// Unlike [`to_tools_context`], dotted keys are folded into nested JSON
/// objects so a script can write `workload.region` or `some_step.field`:
///
/// * a key containing a dot is split at the *first* dot; the left part names
///   the object (`workload`, `vars`, or a step name) and the remainder becomes
///   the field name inside it;
/// * a key without a dot is stored as a plain JSON string.
///
/// All leaf values are JSON strings.
pub fn to_tools_context_for_rhai(bridge: &BridgeContext) -> ToolsExecutionContext {
    let mut variables: HashMap<String, serde_json::Value> = HashMap::new();
    // One JSON object per namespace. `workload.` and `vars.` need no special
    // casing: splitting at the first dot yields exactly those namespaces.
    let mut namespaces: HashMap<String, serde_json::Map<String, serde_json::Value>> =
        HashMap::new();

    for (key, value) in bridge.variables {
        let json_value = serde_json::Value::String(value.clone());
        match key.split_once('.') {
            Some((namespace, field)) => {
                namespaces
                    .entry(namespace.to_string())
                    .or_default()
                    .insert(field.to_string(), json_value);
            }
            None => {
                variables.insert(key.clone(), json_value);
            }
        }
    }

    // Namespaces are added last so that an object always replaces a bare key
    // that happens to share its name.
    variables.extend(
        namespaces
            .into_iter()
            .map(|(namespace, fields)| (namespace, serde_json::Value::Object(fields))),
    );

    ToolsExecutionContext {
        variables,
        execution_id: bridge.execution_id,
        step: bridge.step.to_string(),
        server_url: bridge.server_url.clone(),
        worker_id: bridge.worker_id.clone(),
        command_id: bridge.command_id.clone(),
        ..ToolsExecutionContext::default()
    }
}
pub fn to_tools_config(tool: &Tool) -> ToolConfig {
let (kind, config) = match tool {
Tool::Shell { cmds } => {
(
"shell",
serde_json::json!({
"command": match cmds {
CmdsList::Single(s) => s.clone(),
CmdsList::Multiple(v) => v.join("\n"),
},
"shell": "bash",
"capture": true,
}),
)
}
Tool::Http {
method,
url,
headers,
params,
body,
auth: _, } => (
"http",
serde_json::json!({
"method": method.to_uppercase(),
"url": url,
"headers": headers,
"params": params,
"body": body.as_deref().map(http_body_value),
}),
),
Tool::Playbook { path, args, input } => (
"playbook",
serde_json::json!({
"path": path,
"args": args,
"input": input,
}),
),
Tool::DuckDb { db, query, params } => (
"duckdb",
serde_json::json!({
"db_path": db,
"query": query.clone().unwrap_or_default(),
"params": params
.iter()
.map(|p| serde_json::Value::String(p.clone()))
.collect::<Vec<_>>(),
"as_objects": true,
}),
),
Tool::Rhai { code, args } => (
"rhai",
serde_json::json!({
"code": code,
"args": args,
}),
),
Tool::Auth { provider, scopes, project } => (
"auth",
serde_json::json!({
"provider": provider,
"scopes": scopes,
"project": project,
}),
),
Tool::Sink { target, format } => (
"sink",
serde_json::json!({
"target": target_to_value(target),
"format": format!("{:?}", format).to_lowercase(),
}),
),
Tool::Unsupported => ("unsupported", serde_json::json!({})),
};
ToolConfig {
kind: kind.to_string(),
config,
timeout: None,
retry: None,
auth: None,
}
}
/// Builds the `shell` tool config for a single command string.
///
/// The command is run through `bash` with output capture enabled; no timeout,
/// retry or auth overrides are set.
fn shell_command_config(command: &str) -> ToolConfig {
    let config = serde_json::json!({
        "command": command,
        "shell": "bash",
        "capture": true,
    });

    ToolConfig {
        kind: String::from("shell"),
        config,
        timeout: None,
        retry: None,
        auth: None,
    }
}
/// Interprets an HTTP body template as JSON when possible.
///
/// Text that parses as JSON is returned as the parsed value; anything else is
/// returned unchanged as a JSON string.
fn http_body_value(body: &str) -> serde_json::Value {
    match serde_json::from_str::<serde_json::Value>(body) {
        Ok(parsed) => parsed,
        Err(_) => serde_json::Value::String(body.to_owned()),
    }
}
/// Resolves an auth configuration to a bearer token string.
///
/// Only the providers `gcp`, `google` and `adc` are accepted; all three go
/// through [`GcpAuth`]. When no scopes are configured the default token is
/// requested, otherwise a token for the listed scopes.
///
/// # Errors
///
/// Fails for any other provider name, or when the token request fails.
pub async fn resolve_auth_to_bearer(cfg: &CliAuthConfig) -> Result<String> {
    let provider = cfg.provider.as_str();
    if !matches!(provider, "gcp" | "google" | "adc") {
        anyhow::bail!(
            "unsupported auth provider: {}. Supported: gcp, google, adc",
            provider
        );
    }

    let gcp = GcpAuth::new();
    if cfg.scopes.is_empty() {
        return gcp
            .get_default_token()
            .await
            .map_err(|e| anyhow::anyhow!("failed to get GCP access token: {}", e));
    }

    let requested: Vec<&str> = cfg.scopes.iter().map(|scope| scope.as_str()).collect();
    gcp.get_token(&requested)
        .await
        .map_err(|e| anyhow::anyhow!("failed to get GCP access token: {}", e))
}
/// Builds the `http` tool config for one request.
///
/// The method is upper-cased and the body goes through `http_body_value`.
/// When `bearer` is given, an `Authorization: Bearer <token>` header is
/// added, replacing any caller-supplied header with that exact name.
fn http_tool_config(
    method: &str,
    url: &str,
    headers: &HashMap<String, String>,
    params: &HashMap<String, String>,
    body: Option<&str>,
    bearer: Option<&str>,
) -> ToolConfig {
    // The bearer entry is chained last so it wins over a caller header with
    // the same key when collected into the map.
    let auth_header = bearer.map(|token| ("Authorization".to_string(), format!("Bearer {}", token)));
    let merged_headers: HashMap<String, String> = headers
        .iter()
        .map(|(name, value)| (name.clone(), value.clone()))
        .chain(auth_header)
        .collect();

    let config = serde_json::json!({
        "method": method.to_uppercase(),
        "url": url,
        "headers": merged_headers,
        "params": params,
        "body": body.map(http_body_value),
    });

    ToolConfig {
        kind: String::from("http"),
        config,
        timeout: None,
        retry: None,
        auth: None,
    }
}
/// Reshapes an HTTP tool result into a `{"status": .., "body": ..}` envelope.
///
/// Whenever the result carries data, the envelope is returned regardless of
/// the tool status, so non-2xx responses still reach the caller as data. A
/// missing or non-integer `status_code` becomes `0`; a missing `body` becomes
/// JSON `null`. Results without data fall through to `from_tools_result`.
fn reshape_http_result(result: ToolResult) -> Result<BridgeOutcome> {
    let envelope = result.data.as_ref().map(|payload| {
        let status = payload
            .get("status_code")
            .and_then(|code| code.as_u64())
            .unwrap_or(0) as i32;
        let body = match payload.get("body") {
            Some(found) => found.clone(),
            None => serde_json::Value::Null,
        };
        serde_json::json!({
            "status": status,
            "body": body,
        })
        .to_string()
    });

    match envelope {
        Some(text) => Ok(BridgeOutcome { result: Some(text) }),
        None => from_tools_result(result),
    }
}
/// Builds the `duckdb` tool config for one query.
///
/// Parameters are passed as JSON strings and rows are requested as objects
/// (`as_objects: true`).
fn duckdb_tool_config(
    db_path: &str,
    query: &str,
    params: &[String],
) -> ToolConfig {
    let bound_params: Vec<serde_json::Value> = params
        .iter()
        .cloned()
        .map(serde_json::Value::String)
        .collect();

    let config = serde_json::json!({
        "db_path": db_path,
        "query": query,
        "params": bound_params,
        "as_objects": true,
    });

    ToolConfig {
        kind: String::from("duckdb"),
        config,
        timeout: None,
        retry: None,
        auth: None,
    }
}
/// Reshapes a DuckDB tool result into the bridge's textual payload.
///
/// * data with a `rows` array — the array, pretty-printed as JSON;
/// * data with an `affected_rows` key — the fixed envelope `{"status": "ok"}`;
/// * anything else (including no data at all) — delegated unchanged to
///   `from_tools_result`.
fn reshape_duckdb_result(result: ToolResult) -> Result<BridgeOutcome> {
    // Inspect the payload by reference so the untouched result can still be
    // handed to `from_tools_result` when no reshaping applies.
    let reshaped: Option<String> = match result.data.as_ref() {
        Some(payload) => {
            if let Some(rows) = payload.get("rows").and_then(|v| v.as_array()) {
                Some(serde_json::to_string_pretty(rows)?)
            } else if payload.get("affected_rows").is_some() {
                Some(r#"{"status": "ok"}"#.to_string())
            } else {
                None
            }
        }
        None => None,
    };

    match reshaped {
        Some(text) => Ok(BridgeOutcome { result: Some(text) }),
        None => from_tools_result(result),
    }
}
/// Computes the variable map for a sub-playbook invocation.
///
/// Starts from a copy of `parent_vars` and adds `workload.<key>` entries:
///
/// * when `input` is non-empty, each value is turned into a template string
///   (strings as-is, numbers and booleans via `to_string`, anything else via
///   trimmed YAML serialization), rendered, and stored; `args` is ignored;
/// * otherwise each `args` value is rendered and stored.
///
/// # Errors
///
/// Propagates the first error returned by `render` or by YAML serialization.
pub fn prepare_sub_playbook_vars<F>(
    parent_vars: &HashMap<String, String>,
    args: &HashMap<String, String>,
    input: &HashMap<String, serde_yaml::Value>,
    mut render: F,
) -> Result<HashMap<String, String>>
where
    F: FnMut(&str) -> Result<String>,
{
    let mut sub_vars = parent_vars.clone();

    if input.is_empty() {
        // No `input`: fall back to `args` (a no-op when that is empty too).
        for (name, template) in args {
            let rendered = render(template)?;
            sub_vars.insert(format!("workload.{}", name), rendered);
        }
        return Ok(sub_vars);
    }

    for (name, raw) in input {
        let template = match raw {
            serde_yaml::Value::String(text) => text.clone(),
            serde_yaml::Value::Number(number) => number.to_string(),
            serde_yaml::Value::Bool(flag) => flag.to_string(),
            other => serde_yaml::to_string(other)?.trim().to_string(),
        };
        let rendered = render(&template)?;
        sub_vars.insert(format!("workload.{}", name), rendered);
    }
    Ok(sub_vars)
}
/// Lists the context-variable updates that result from resolving an auth token.
///
/// The pairs are returned in a fixed order: `auth.project` (only when a
/// non-empty project is given), then `auth.token`, then `auth.provider`.
pub fn auth_context_updates(
    provider: &str,
    token: &str,
    project: Option<&str>,
) -> Vec<(String, String)> {
    let project_entry = project
        .filter(|name| !name.is_empty())
        .map(|name| ("auth.project".to_string(), name.to_string()));

    let mut updates: Vec<(String, String)> = Vec::with_capacity(3);
    updates.extend(project_entry);
    updates.extend([
        ("auth.token".to_string(), token.to_string()),
        ("auth.provider".to_string(), provider.to_string()),
    ]);
    updates
}
/// Converts a raw (usually JSON) payload into the requested sink format.
///
/// * `Json` — returned unchanged;
/// * `Yaml` — JSON input is re-serialized as YAML; input that is not JSON, or
///   that cannot be serialized as YAML, is returned unchanged;
/// * `Csv` — delegated to `json_to_csv`.
pub fn format_sink_payload(format: &SinkFormat, raw: &str) -> Result<String> {
    let rendered = match format {
        SinkFormat::Csv => return json_to_csv(raw),
        SinkFormat::Json => raw.to_string(),
        SinkFormat::Yaml => serde_json::from_str::<serde_json::Value>(raw)
            .ok()
            .and_then(|parsed| serde_yaml::to_string(&parsed).ok())
            .unwrap_or_else(|| raw.to_string()),
    };
    Ok(rendered)
}
/// Converts a JSON array of objects into CSV text.
///
/// The header row is taken from the keys of the *first* array element; every
/// object in the array then contributes one row with those columns. A column
/// missing from a row is emitted as an empty field. String values are written
/// as-is, all other values as their JSON text (so `null` becomes `null`).
///
/// Header names and cells are quoted following RFC 4180: a field containing a
/// comma, a double quote, or a line break is wrapped in double quotes with
/// embedded quotes doubled. This also covers nested arrays/objects, whose
/// JSON text contains commas and quotes.
///
/// Inputs that cannot be tabulated are returned unchanged: text that is not
/// JSON, JSON that is not an array, an empty array, or an array whose first
/// element is not an object.
///
/// Known limitations (unchanged behavior): keys that only appear in later
/// objects are dropped, and array elements that are not objects are skipped.
pub fn json_to_csv(json_str: &str) -> Result<String> {
    // Quotes a single field when it contains a CSV-significant character.
    fn csv_field(text: &str) -> String {
        if text.contains(|c: char| matches!(c, ',' | '"' | '\n' | '\r')) {
            format!("\"{}\"", text.replace('"', "\"\""))
        } else {
            text.to_string()
        }
    }

    let rows = match serde_json::from_str::<serde_json::Value>(json_str) {
        Ok(serde_json::Value::Array(rows)) if !rows.is_empty() => rows,
        _ => return Ok(json_str.to_string()),
    };

    let headers: Vec<&str> = match rows.first() {
        Some(serde_json::Value::Object(first)) => first.keys().map(|k| k.as_str()).collect(),
        _ => return Ok(json_str.to_string()),
    };

    let mut csv = headers
        .iter()
        .map(|name| csv_field(name))
        .collect::<Vec<_>>()
        .join(",");
    csv.push('\n');

    for item in &rows {
        if let serde_json::Value::Object(obj) = item {
            let cells: Vec<String> = headers
                .iter()
                .map(|name| match obj.get(*name) {
                    Some(serde_json::Value::String(text)) => csv_field(text),
                    Some(other) => csv_field(&other.to_string()),
                    None => String::new(),
                })
                .collect();
            csv.push_str(&cells.join(","));
            csv.push('\n');
        }
    }
    Ok(csv)
}
pub async fn gcs_upload(bucket: &str, key: &str, data: &str) -> Result<()> {
use object_store::gcp::GoogleCloudStorageBuilder;
let store = GoogleCloudStorageBuilder::from_env()
.with_bucket_name(bucket)
.build()
.map_err(|e| anyhow::anyhow!("failed to build GCS store for bucket {:?}: {}", bucket, e))?;
gcs_upload_with_store(Arc::new(store), key, data).await
}
/// Writes `data` to `key` on the supplied object store.
///
/// The upload runs inside a `gcs.upload` tracing span (fields: `key`,
/// `bytes`) and emits a debug event with the elapsed milliseconds on success.
/// Accepting any `ObjectStore` lets callers substitute another backend.
///
/// # Errors
///
/// Fails when the store rejects the `put`.
pub async fn gcs_upload_with_store(
    store: Arc<dyn ObjectStore>,
    key: &str,
    data: &str,
) -> Result<()> {
    let payload = Bytes::from(data.to_owned());
    let byte_len = payload.len();
    let destination = StorePath::from(key);
    let span = info_span!(
        "gcs.upload",
        key = key,
        bytes = byte_len,
    );

    let upload = async move {
        let started = Instant::now();
        if let Err(e) = store
            .put(&destination, PutPayload::from_bytes(payload))
            .await
        {
            return Err(anyhow::anyhow!("GCS upload failed for key {:?}: {}", key, e));
        }
        let elapsed_ms = started.elapsed().as_millis();
        tracing::debug!(
            target: "noetl::gcs",
            duration_ms = elapsed_ms,
            key = key,
            bytes = byte_len,
            "gcs.upload complete"
        );
        Ok(())
    };

    upload.instrument(span).await
}
fn target_to_value(target: &crate::playbook::SinkTarget) -> serde_json::Value {
match target {
crate::playbook::SinkTarget::File { path } => {
serde_json::json!({"type": "file", "path": path})
}
crate::playbook::SinkTarget::DuckDb { db, table } => {
serde_json::json!({"type": "duckdb", "db": db, "table": table})
}
crate::playbook::SinkTarget::Gcs { bucket, path } => {
serde_json::json!({"type": "gcs", "bucket": bucket, "path": path})
}
}
}
pub fn from_tools_result(result: ToolResult) -> Result<BridgeOutcome> {
match result.status {
ToolStatus::Success => {
let payload = result
.data
.map(|v| match v {
serde_json::Value::String(s) => s,
other => other.to_string(),
})
.or(result.stdout);
Ok(BridgeOutcome { result: payload })
}
ToolStatus::Error => Err(anyhow::anyhow!(
"tool execution failed: {}",
result.error.unwrap_or_else(|| "unknown error".to_string())
)),
ToolStatus::Timeout => Err(anyhow::anyhow!(
"tool execution timed out after {} ms",
result.duration_ms.unwrap_or(0)
)),
}
}
/// Executes a playbook tool through the matching `noetl-tools` implementation.
///
/// Per-variant behavior:
///
/// * `Rhai` — runs the script with the nested context built by
///   `to_tools_context_for_rhai` and converts the result with
///   `from_tools_result`.
/// * `Shell` — a `Single` command string is split into trimmed, non-empty
///   lines; each command (or each `Multiple` entry) runs separately and in
///   order. The first command that does not succeed aborts the run. The
///   outcome is the stdout of the last command with trailing newlines
///   removed; an empty command list yields an empty outcome.
/// * `Http` — resolves a bearer token first when `auth` is configured, then
///   returns the `{"status", "body"}` envelope from `reshape_http_result`.
/// * `DuckDb` — a missing or blank query returns an empty outcome without
///   touching the database; otherwise the result goes through
///   `reshape_duckdb_result`.
/// * `Playbook`, `Auth`, `Sink`, `Unsupported` — not dispatched here; each
///   returns an error explaining what to use instead.
///
/// # Errors
///
/// Fails when the underlying tool fails to execute, when a shell command
/// does not succeed, when token resolution fails, or for the non-dispatched
/// variants listed above.
pub async fn dispatch_via_registry(
    tool: &Tool,
    bridge: &BridgeContext<'_>,
) -> Result<BridgeOutcome> {
    // Config and context are built per arm: each variant needs a different
    // shape, and the non-dispatched variants need neither.
    match tool {
        Tool::Rhai { .. } => {
            let rhai_tool = RhaiTool::new();
            let config = to_tools_config(tool);
            let ctx = to_tools_context_for_rhai(bridge);
            let result = rhai_tool
                .execute(&config, &ctx)
                .await
                .map_err(|e| anyhow::anyhow!("rhai dispatch failed: {}", e))?;
            from_tools_result(result)
        }
        Tool::Shell { cmds } => {
            let commands: Vec<String> = match cmds {
                CmdsList::Single(cmd) => cmd
                    .lines()
                    .map(|s| s.trim())
                    .filter(|s| !s.is_empty())
                    .map(|s| s.to_string())
                    .collect(),
                CmdsList::Multiple(c) => c.clone(),
            };
            let shell_tool = ShellTool::new();
            let ctx = to_tools_context(bridge);
            let mut last_outcome = BridgeOutcome::empty();
            for command in commands {
                let config = shell_command_config(&command);
                let result = shell_tool
                    .execute(&config, &ctx)
                    .await
                    .map_err(|e| anyhow::anyhow!("shell dispatch failed: {}", e))?;
                if result.status != ToolStatus::Success {
                    let exit_code = result
                        .data
                        .as_ref()
                        .and_then(|d| d.get("exit_code"))
                        .and_then(|v| v.as_i64());
                    anyhow::bail!(
                        "Command failed with exit code: {:?}",
                        exit_code
                    );
                }
                let stdout = result
                    .data
                    .as_ref()
                    .and_then(|d| d.get("stdout"))
                    .and_then(|v| v.as_str())
                    .map(|s| s.trim_end_matches('\n').to_string());
                // Only the most recent command's output is kept.
                last_outcome = BridgeOutcome { result: stdout };
            }
            Ok(last_outcome)
        }
        Tool::Http {
            method,
            url,
            headers,
            params,
            body,
            auth,
        } => {
            let bearer = if let Some(auth_cfg) = auth {
                Some(resolve_auth_to_bearer(auth_cfg).await?)
            } else {
                None
            };
            let config = http_tool_config(
                method,
                url,
                headers,
                params,
                body.as_deref(),
                bearer.as_deref(),
            );
            let http_tool = HttpTool::new();
            let ctx = to_tools_context(bridge);
            let result = http_tool
                .execute(&config, &ctx)
                .await
                .map_err(|e| anyhow::anyhow!("http dispatch failed: {}", e))?;
            reshape_http_result(result)
        }
        Tool::DuckDb { db, query, params } => {
            let query = match query {
                Some(q) if !q.trim().is_empty() => q,
                _ => return Ok(BridgeOutcome::empty()),
            };
            let config = duckdb_tool_config(db, query, params);
            let duckdb_tool = DuckdbTool::new();
            let ctx = to_tools_context(bridge);
            let result = duckdb_tool
                .execute(&config, &ctx)
                .await
                .map_err(|e| anyhow::anyhow!("duckdb dispatch failed: {}", e))?;
            reshape_duckdb_result(result)
        }
        Tool::Playbook { .. } => {
            anyhow::bail!(
                "Tool::Playbook is not bridgeable: sub-playbook \
                 execution stays in the CLI's tree walker per \
                 § H.10 of the Rust migration roadmap. Use \
                 `PlaybookRunner::new(path).run()` directly from \
                 the CLI."
            );
        }
        Tool::Auth { .. } => {
            anyhow::bail!(
                "Tool::Auth is not bridge-dispatched: use \
                 `resolve_auth_to_bearer` for token resolution and \
                 `auth_context_updates` for applying the token to \
                 the caller's execution context. See § H.10 of the \
                 Rust migration roadmap."
            );
        }
        Tool::Sink { .. } => {
            anyhow::bail!(
                "Tool::Sink is not bridge-dispatched: noetl-tools \
                 has no file / GCS / object-store target. Use \
                 `format_sink_payload` for format conversion; the \
                 CLI's sink targets (file / duckdb / gcs) stay \
                 inline per § H.10. GCS migration to `object_store` \
                 is tracked as a separate follow-up."
            );
        }
        Tool::Unsupported => {
            anyhow::bail!("unsupported tool kind");
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::playbook::{AuthConfig as CliAuthConfig, SinkFormat, SinkTarget};
fn empty_vars() -> HashMap<String, String> {
HashMap::new()
}
fn bridge_ctx<'a>(vars: &'a HashMap<String, String>) -> BridgeContext<'a> {
BridgeContext {
execution_id: 12345,
step: "test_step",
variables: vars,
server_url: String::new(),
worker_id: None,
command_id: None,
}
}
#[test]
fn to_tools_context_wraps_string_variables_as_json_value() {
let vars: HashMap<String, String> =
[("workload.region".into(), "us-west-1".into())].into();
let ctx = to_tools_context(&bridge_ctx(&vars));
assert_eq!(ctx.execution_id, 12345);
assert_eq!(ctx.step, "test_step");
assert_eq!(
ctx.variables.get("workload.region"),
Some(&serde_json::Value::String("us-west-1".into()))
);
assert!(ctx.secrets.is_empty(), "secrets stay empty by default");
}
#[test]
fn to_tools_config_shell_single_cmd() {
let tool = Tool::Shell {
cmds: CmdsList::Single("ls -la".into()),
};
let cfg = to_tools_config(&tool);
assert_eq!(cfg.kind, "shell");
assert_eq!(cfg.config["command"], "ls -la");
assert_eq!(cfg.config["shell"], "bash");
assert_eq!(cfg.config["capture"], true);
assert!(cfg.timeout.is_none());
}
#[test]
fn to_tools_config_shell_multiple_cmds_joins_with_newlines() {
let tool = Tool::Shell {
cmds: CmdsList::Multiple(vec!["echo one".into(), "echo two".into()]),
};
let cfg = to_tools_config(&tool);
assert_eq!(cfg.kind, "shell");
assert_eq!(cfg.config["command"], "echo one\necho two");
}
#[test]
fn shell_command_config_emits_per_cmd_shape() {
let cfg = shell_command_config("echo hi");
assert_eq!(cfg.kind, "shell");
assert_eq!(cfg.config["command"], "echo hi");
assert_eq!(cfg.config["shell"], "bash");
assert_eq!(cfg.config["capture"], true);
}
#[test]
fn to_tools_config_http_round_trips_essentials() {
let tool = Tool::Http {
method: "post".into(), url: "https://example.com/api".into(),
headers: HashMap::new(),
params: HashMap::new(),
body: Some(r#"{"k":"v"}"#.into()),
auth: None,
};
let cfg = to_tools_config(&tool);
assert_eq!(cfg.kind, "http");
assert_eq!(cfg.config["method"], "POST");
assert_eq!(cfg.config["url"], "https://example.com/api");
assert_eq!(cfg.config["body"], serde_json::json!({"k": "v"}));
}
#[test]
fn to_tools_config_http_keeps_non_json_body_as_string() {
let tool = Tool::Http {
method: "POST".into(),
url: "https://example.com".into(),
headers: HashMap::new(),
params: HashMap::new(),
body: Some("not json at all".into()),
auth: None,
};
let cfg = to_tools_config(&tool);
assert_eq!(cfg.config["body"], "not json at all");
}
#[test]
fn http_body_value_parses_json_strings() {
let v = http_body_value(r#"{"a":1}"#);
assert_eq!(v, serde_json::json!({"a": 1}));
}
#[test]
fn http_body_value_falls_back_to_string() {
let v = http_body_value("plain text body");
assert_eq!(v, serde_json::Value::String("plain text body".into()));
}
#[test]
fn http_tool_config_injects_bearer_header() {
let cfg = http_tool_config(
"GET",
"https://example.com",
&HashMap::new(),
&HashMap::new(),
None,
Some("test-token-123"),
);
assert_eq!(cfg.kind, "http");
assert_eq!(
cfg.config["headers"]["Authorization"],
"Bearer test-token-123"
);
}
#[test]
fn http_tool_config_preserves_caller_headers_with_bearer() {
let mut hdrs = HashMap::new();
hdrs.insert("X-Trace-Id".into(), "abc123".into());
let cfg = http_tool_config(
"POST",
"https://example.com",
&hdrs,
&HashMap::new(),
None,
Some("token"),
);
assert_eq!(cfg.config["headers"]["X-Trace-Id"], "abc123");
assert_eq!(cfg.config["headers"]["Authorization"], "Bearer token");
}
#[test]
fn http_tool_config_no_auth_omits_authorization_header() {
let cfg = http_tool_config(
"GET",
"https://example.com",
&HashMap::new(),
&HashMap::new(),
None,
None,
);
let hdrs = cfg.config["headers"].as_object().unwrap();
assert!(!hdrs.contains_key("Authorization"));
}
#[test]
fn reshape_http_result_extracts_envelope() {
let mut result = ToolResult::success(serde_json::json!({
"status_code": 200,
"headers": {},
"body": {"ok": true},
}));
result.exit_code = Some(0);
let outcome = reshape_http_result(result).unwrap();
let parsed: serde_json::Value =
serde_json::from_str(outcome.result.as_deref().unwrap()).unwrap();
assert_eq!(parsed["status"], 200);
assert_eq!(parsed["body"], serde_json::json!({"ok": true}));
}
#[test]
fn reshape_http_result_preserves_4xx_envelope_without_erroring() {
    // An `Error` status that still carries response data must come back as
    // an envelope rather than as an `Err`.
    let result = ToolResult {
        status: ToolStatus::Error,
        data: Some(serde_json::json!({
            "status_code": 404,
            "headers": {},
            "body": {"error": "not found"},
        })),
        error: Some("HTTP 404 response".into()),
        stdout: None,
        stderr: None,
        exit_code: Some(1),
        duration_ms: Some(5),
        pending_callback: None,
    };
    let outcome = reshape_http_result(result).unwrap();
    let parsed: serde_json::Value =
        serde_json::from_str(outcome.result.as_deref().unwrap()).unwrap();
    assert_eq!(parsed["status"], 404);
    assert_eq!(parsed["body"], serde_json::json!({"error": "not found"}));
}
#[tokio::test]
async fn resolve_auth_to_bearer_rejects_unknown_provider() {
let cfg = CliAuthConfig {
provider: "azure".into(),
scopes: vec![],
};
let err = resolve_auth_to_bearer(&cfg).await.unwrap_err();
assert!(err.to_string().contains("unsupported auth provider"));
}
#[test]
fn duckdb_tool_config_emits_noetl_tools_schema() {
let cfg = duckdb_tool_config(
":memory:",
"SELECT 1",
&["arg1".to_string()],
);
assert_eq!(cfg.kind, "duckdb");
assert_eq!(cfg.config["db_path"], ":memory:");
assert_eq!(cfg.config["query"], "SELECT 1");
assert_eq!(cfg.config["as_objects"], true);
assert_eq!(
cfg.config["params"],
serde_json::json!([serde_json::Value::String("arg1".into())])
);
}
#[test]
fn to_tools_config_duckdb_carries_path_and_query() {
let tool = Tool::DuckDb {
db: "warehouse.db".into(),
query: Some("SELECT count(*) FROM orders".into()),
params: vec![],
};
let cfg = to_tools_config(&tool);
assert_eq!(cfg.kind, "duckdb");
assert_eq!(cfg.config["db_path"], "warehouse.db");
assert_eq!(cfg.config["query"], "SELECT count(*) FROM orders");
assert_eq!(cfg.config["as_objects"], true);
}
#[test]
fn to_tools_config_duckdb_missing_query_becomes_empty_string() {
let tool = Tool::DuckDb {
db: ":memory:".into(),
query: None,
params: vec![],
};
let cfg = to_tools_config(&tool);
assert_eq!(cfg.config["query"], "");
}
#[test]
fn reshape_duckdb_result_select_returns_rows_array() {
let result = ToolResult::success(serde_json::json!({
"columns": ["id", "name"],
"rows": [
{"id": 1, "name": "alice"},
{"id": 2, "name": "bob"},
],
"row_count": 2
}));
let outcome = reshape_duckdb_result(result).unwrap();
let parsed: serde_json::Value =
serde_json::from_str(outcome.result.as_deref().unwrap()).unwrap();
let arr = parsed.as_array().expect("result is an array");
assert_eq!(arr.len(), 2);
assert_eq!(arr[0]["id"], 1);
assert_eq!(arr[0]["name"], "alice");
assert_eq!(arr[1]["name"], "bob");
}
#[test]
fn reshape_duckdb_result_select_empty_returns_empty_array() {
let result = ToolResult::success(serde_json::json!({
"columns": ["id"],
"rows": [],
"row_count": 0
}));
let outcome = reshape_duckdb_result(result).unwrap();
let parsed: serde_json::Value =
serde_json::from_str(outcome.result.as_deref().unwrap()).unwrap();
assert_eq!(parsed.as_array().unwrap().len(), 0);
}
#[test]
fn reshape_duckdb_result_non_select_returns_status_envelope() {
let result = ToolResult::success(serde_json::json!({
"affected_rows": 3
}));
let outcome = reshape_duckdb_result(result).unwrap();
assert_eq!(outcome.result.as_deref(), Some(r#"{"status": "ok"}"#));
}
#[tokio::test]
async fn dispatch_duckdb_select_returns_rows_array() {
let vars = empty_vars();
let bridge = bridge_ctx(&vars);
let tool = Tool::DuckDb {
db: ":memory:".into(),
query: Some("SELECT 1 AS num, 'hello' AS msg".into()),
params: vec![],
};
let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
let parsed: serde_json::Value =
serde_json::from_str(outcome.result.as_deref().unwrap()).unwrap();
let arr = parsed.as_array().expect("result is an array");
assert_eq!(arr.len(), 1);
assert_eq!(arr[0]["num"], 1);
assert_eq!(arr[0]["msg"], "hello");
}
#[tokio::test]
async fn dispatch_duckdb_missing_query_returns_empty_outcome() {
let vars = empty_vars();
let bridge = bridge_ctx(&vars);
let tool = Tool::DuckDb {
db: ":memory:".into(),
query: None,
params: vec![],
};
let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
assert!(outcome.result.is_none());
}
#[tokio::test]
async fn dispatch_duckdb_empty_query_returns_empty_outcome() {
let vars = empty_vars();
let bridge = bridge_ctx(&vars);
let tool = Tool::DuckDb {
db: ":memory:".into(),
query: Some(" ".into()), params: vec![],
};
let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
assert!(outcome.result.is_none());
}
#[test]
fn prepare_sub_playbook_vars_passes_parent_vars_through() {
let parent: HashMap<String, String> =
[("vars.timeout".into(), "30".into())].into();
let sub = prepare_sub_playbook_vars(
&parent,
&HashMap::new(),
&HashMap::new(),
|t| Ok(t.to_string()),
)
.unwrap();
assert_eq!(sub.get("vars.timeout"), Some(&"30".to_string()));
}
#[test]
fn prepare_sub_playbook_vars_v2_input_takes_precedence_over_v1_args() {
let parent: HashMap<String, String> = HashMap::new();
let mut input = HashMap::new();
input.insert(
"region".into(),
serde_yaml::Value::String("us-east-1".into()),
);
let mut args = HashMap::new();
args.insert("region".into(), "us-west-1".into());
let sub = prepare_sub_playbook_vars(&parent, &args, &input, |t| {
Ok(t.to_string())
})
.unwrap();
assert_eq!(sub.get("workload.region"), Some(&"us-east-1".to_string()));
}
#[test]
fn prepare_sub_playbook_vars_v1_args_used_when_input_empty() {
let parent: HashMap<String, String> = HashMap::new();
let mut args = HashMap::new();
args.insert("tier".into(), "prod".into());
let sub = prepare_sub_playbook_vars(
&parent,
&args,
&HashMap::new(),
|t| Ok(t.to_string()),
)
.unwrap();
assert_eq!(sub.get("workload.tier"), Some(&"prod".to_string()));
}
#[test]
fn prepare_sub_playbook_vars_renders_input_templates() {
let parent: HashMap<String, String> = HashMap::new();
let mut input = HashMap::new();
input.insert(
"url".into(),
serde_yaml::Value::String("{{base}}/api".into()),
);
let sub = prepare_sub_playbook_vars(
&parent,
&HashMap::new(),
&input,
|t| Ok(t.replace("{{base}}", "https://example.com")),
)
.unwrap();
assert_eq!(
sub.get("workload.url"),
Some(&"https://example.com/api".to_string())
);
}
#[test]
fn prepare_sub_playbook_vars_coerces_yaml_numbers_and_bools() {
let parent: HashMap<String, String> = HashMap::new();
let mut input = HashMap::new();
input.insert(
"timeout".into(),
serde_yaml::Value::Number(serde_yaml::Number::from(30)),
);
input.insert("verbose".into(), serde_yaml::Value::Bool(true));
let sub = prepare_sub_playbook_vars(
&parent,
&HashMap::new(),
&input,
|t| Ok(t.to_string()),
)
.unwrap();
assert_eq!(sub.get("workload.timeout"), Some(&"30".to_string()));
assert_eq!(sub.get("workload.verbose"), Some(&"true".to_string()));
}
#[test]
fn prepare_sub_playbook_vars_passes_through_when_both_empty() {
let parent: HashMap<String, String> = [(
"workload.region".into(),
"us-east-1".into(),
)]
.into();
let sub = prepare_sub_playbook_vars(
&parent,
&HashMap::new(),
&HashMap::new(),
|t| Ok(t.to_string()),
)
.unwrap();
assert_eq!(sub.len(), 1);
assert_eq!(
sub.get("workload.region"),
Some(&"us-east-1".to_string())
);
}
#[test]
fn prepare_sub_playbook_vars_render_error_propagates() {
let parent: HashMap<String, String> = HashMap::new();
let mut input = HashMap::new();
input.insert(
"bad".into(),
serde_yaml::Value::String("{{nope}}".into()),
);
let result = prepare_sub_playbook_vars(
&parent,
&HashMap::new(),
&input,
|_| Err(anyhow::anyhow!("render exploded")),
);
assert!(result.unwrap_err().to_string().contains("render exploded"));
}
#[test]
fn auth_context_updates_includes_token_and_provider() {
let updates = auth_context_updates("gcp", "tok-123", None);
let map: HashMap<String, String> = updates.into_iter().collect();
assert_eq!(map.get("auth.token"), Some(&"tok-123".to_string()));
assert_eq!(map.get("auth.provider"), Some(&"gcp".to_string()));
assert!(map.get("auth.project").is_none());
}
#[test]
fn auth_context_updates_includes_project_when_set() {
let updates = auth_context_updates("adc", "t", Some("my-project"));
let map: HashMap<String, String> = updates.into_iter().collect();
assert_eq!(
map.get("auth.project"),
Some(&"my-project".to_string())
);
assert_eq!(map.get("auth.token"), Some(&"t".to_string()));
assert_eq!(map.get("auth.provider"), Some(&"adc".to_string()));
}
#[test]
fn auth_context_updates_skips_empty_project() {
let updates = auth_context_updates("gcp", "t", Some(""));
let map: HashMap<String, String> = updates.into_iter().collect();
assert!(map.get("auth.project").is_none());
}
#[test]
fn auth_context_updates_orders_project_before_token() {
let updates = auth_context_updates("gcp", "t", Some("p"));
assert_eq!(updates[0].0, "auth.project");
assert_eq!(updates[1].0, "auth.token");
assert_eq!(updates[2].0, "auth.provider");
}
#[test]
fn format_sink_payload_json_passthrough() {
let raw = r#"{"k": "v"}"#;
let out = format_sink_payload(&SinkFormat::Json, raw).unwrap();
assert_eq!(out, raw);
}
#[test]
fn format_sink_payload_yaml_converts_json_object() {
let raw = r#"{"k": "v"}"#;
let out = format_sink_payload(&SinkFormat::Yaml, raw).unwrap();
let reparsed: serde_yaml::Value = serde_yaml::from_str(&out).unwrap();
assert_eq!(reparsed["k"].as_str(), Some("v"));
}
#[test]
fn format_sink_payload_yaml_falls_back_when_not_json() {
let raw = "not even close to json";
let out = format_sink_payload(&SinkFormat::Yaml, raw).unwrap();
assert_eq!(out, raw);
}
#[test]
fn format_sink_payload_csv_uses_json_to_csv() {
let raw = r#"[{"a":1,"b":2},{"a":3,"b":4}]"#;
let out = format_sink_payload(&SinkFormat::Csv, raw).unwrap();
assert!(out.contains("a,b\n") || out.contains("b,a\n"));
assert_eq!(out.lines().count(), 3);
}
#[test]
fn json_to_csv_returns_input_for_non_array() {
assert_eq!(json_to_csv("not json").unwrap(), "not json");
assert_eq!(json_to_csv(r#"{"k":"v"}"#).unwrap(), r#"{"k":"v"}"#);
}
#[test]
fn json_to_csv_returns_input_for_empty_array() {
assert_eq!(json_to_csv("[]").unwrap(), "[]");
}
#[test]
fn json_to_csv_emits_header_and_rows_for_object_array() {
let raw = r#"[{"name":"alice","age":30},{"name":"bob","age":25}]"#;
let csv = json_to_csv(raw).unwrap();
let lines: Vec<&str> = csv.lines().collect();
assert_eq!(lines.len(), 3);
assert!(lines[0] == "name,age" || lines[0] == "age,name");
assert!(lines[1].contains("alice") && lines[1].contains("30"));
assert!(lines[2].contains("bob") && lines[2].contains("25"));
}
#[test]
fn json_to_csv_quotes_strings_with_commas() {
let raw = r#"[{"label":"a, b","n":1}]"#;
let csv = json_to_csv(raw).unwrap();
assert!(csv.contains("\"a, b\""), "csv: {csv}");
}
#[test]
fn json_to_csv_doubles_embedded_quotes() {
let raw = r#"[{"q":"she said \"hi\""}]"#;
let csv = json_to_csv(raw).unwrap();
assert!(csv.contains("\"she said \"\"hi\"\"\""), "csv: {csv}");
}
#[test]
fn json_to_csv_missing_field_emits_empty() {
let raw = r#"[{"a":1,"b":2},{"a":3}]"#; let csv = json_to_csv(raw).unwrap();
let lines: Vec<&str> = csv.lines().collect();
assert!(
lines[2].ends_with(",") || lines[2].contains(",,"),
"csv: {csv}"
);
}
#[test]
fn to_tools_config_rhai_carries_code() {
let tool = Tool::Rhai {
code: "let x = 1; x + 1".into(),
args: HashMap::new(),
};
let cfg = to_tools_config(&tool);
assert_eq!(cfg.kind, "rhai");
assert_eq!(cfg.config["code"], "let x = 1; x + 1");
}
#[test]
fn to_tools_config_sink_emits_typed_target() {
let tool = Tool::Sink {
target: SinkTarget::File {
path: "/tmp/out.json".into(),
},
format: SinkFormat::Json,
};
let cfg = to_tools_config(&tool);
assert_eq!(cfg.kind, "sink");
assert_eq!(cfg.config["target"]["type"], "file");
assert_eq!(cfg.config["target"]["path"], "/tmp/out.json");
assert_eq!(cfg.config["format"], "json");
}
#[test]
fn from_tools_result_success_returns_data_string() {
let result = ToolResult::success(serde_json::Value::String("hello".into()));
let outcome = from_tools_result(result).unwrap();
assert_eq!(outcome.result, Some("hello".into()));
}
#[test]
fn from_tools_result_success_serialises_non_string_data() {
let result = ToolResult::success(serde_json::json!({"k": "v"}));
let outcome = from_tools_result(result).unwrap();
assert_eq!(outcome.result, Some(r#"{"k":"v"}"#.into()));
}
#[test]
fn from_tools_result_success_falls_back_to_stdout() {
let mut result = ToolResult::success(serde_json::Value::Null);
result.data = None;
result.stdout = Some("script output".into());
let outcome = from_tools_result(result).unwrap();
assert_eq!(outcome.result, Some("script output".into()));
}
/// An error result converts into an `Err` whose text still contains the
/// original error message.
#[test]
fn from_tools_result_error_propagates_message() {
    let failure = ToolResult::error("connection refused");
    let message = from_tools_result(failure).unwrap_err().to_string();
    assert!(message.contains("connection refused"));
}
/// Dispatching `Tool::Auth` through the registry must fail, and the error
/// text must name the tool and both auth helpers.
#[tokio::test]
async fn dispatch_auth_bails_pointing_at_helper() {
    let no_vars = empty_vars();
    let bridge = bridge_ctx(&no_vars);
    let auth_tool = Tool::Auth {
        provider: "adc".into(),
        scopes: Vec::new(),
        project: None,
    };
    let msg = dispatch_via_registry(&auth_tool, &bridge)
        .await
        .unwrap_err()
        .to_string();
    // Every fragment must be present; the first missing one fails the test.
    for fragment in ["Tool::Auth", "resolve_auth_to_bearer", "auth_context_updates"] {
        assert!(
            msg.contains(fragment),
            "error should point at the helpers: {msg}"
        );
    }
}
/// Dispatching `Tool::Sink` through the registry must fail, and the error
/// text must name the tool and the `format_sink_payload` helper.
#[tokio::test]
async fn dispatch_sink_bails_pointing_at_helper() {
    let no_vars = empty_vars();
    let bridge = bridge_ctx(&no_vars);
    let destination = crate::playbook::SinkTarget::File {
        path: "/tmp/out.json".into(),
    };
    let sink_tool = Tool::Sink {
        target: destination,
        format: SinkFormat::Json,
    };
    let msg = dispatch_via_registry(&sink_tool, &bridge)
        .await
        .unwrap_err()
        .to_string();
    let names_tool = msg.contains("Tool::Sink");
    assert!(
        names_tool && msg.contains("format_sink_payload"),
        "error should point at the helper: {msg}"
    );
}
/// Dispatching `Tool::Playbook` through the registry must fail with a message
/// that names the tool, says it is not bridgeable, and cites § H.10.
#[tokio::test]
async fn dispatch_playbook_bails_with_h10_finding() {
    let no_vars = empty_vars();
    let bridge = bridge_ctx(&no_vars);
    let playbook_tool = Tool::Playbook {
        path: "sub.yaml".into(),
        args: HashMap::new(),
        input: HashMap::new(),
    };
    let msg = dispatch_via_registry(&playbook_tool, &bridge)
        .await
        .unwrap_err()
        .to_string();
    // Every fragment must be present; the first missing one fails the test.
    for fragment in ["Tool::Playbook", "not bridgeable", "§ H.10"] {
        assert!(
            msg.contains(fragment),
            "error message should explain the § H.10 finding: {msg}"
        );
    }
}
/// A single `echo` command dispatched as a shell tool yields the echoed text
/// (without a trailing newline) as the outcome.
#[tokio::test]
async fn dispatch_shell_single_command_returns_stdout() {
    let no_vars = empty_vars();
    let bridge = bridge_ctx(&no_vars);
    let shell_tool = Tool::Shell {
        cmds: CmdsList::Single("echo bridged".into()),
    };
    let outcome = dispatch_via_registry(&shell_tool, &bridge).await.unwrap();
    assert_eq!(outcome.result, Some(String::from("bridged")));
}
/// With several shell commands, the outcome carries the output of the final
/// command only.
#[tokio::test]
async fn dispatch_shell_multiple_returns_last_command_stdout() {
    let no_vars = empty_vars();
    let bridge = bridge_ctx(&no_vars);
    let commands = vec![
        "echo first".into(),
        "echo second".into(),
        "echo third".into(),
    ];
    let shell_tool = Tool::Shell {
        cmds: CmdsList::Multiple(commands),
    };
    let outcome = dispatch_via_registry(&shell_tool, &bridge).await.unwrap();
    assert_eq!(outcome.result, Some("third".into()));
}
/// A shell command that exits non-zero must surface as an error whose text
/// mentions at least one of "shell", "exit" or "failed".
#[tokio::test]
async fn dispatch_shell_failure_propagates_error() {
    let no_vars = empty_vars();
    let bridge = bridge_ctx(&no_vars);
    let failing_tool = Tool::Shell {
        cmds: CmdsList::Single("exit 7".into()),
    };
    // Render the error once and reuse it for each check and for the message.
    let rendered = dispatch_via_registry(&failing_tool, &bridge)
        .await
        .unwrap_err()
        .to_string();
    let mentions_failure = ["shell", "exit", "failed"]
        .iter()
        .any(|keyword| rendered.contains(*keyword));
    assert!(mentions_failure, "error message: {}", rendered);
}
/// A single command string containing two newline-separated `echo` lines
/// yields the second line's output as the outcome.
#[tokio::test]
async fn dispatch_shell_single_with_newlines_runs_each_line_independently() {
    let no_vars = empty_vars();
    let bridge = bridge_ctx(&no_vars);
    let two_line_script = "echo first_line\necho second_line";
    let shell_tool = Tool::Shell {
        cmds: CmdsList::Single(two_line_script.into()),
    };
    let outcome = dispatch_via_registry(&shell_tool, &bridge).await.unwrap();
    assert_eq!(outcome.result, Some("second_line".into()));
}
/// `Tool::Unsupported` cannot be dispatched; the error text must contain
/// "unsupported".
#[tokio::test]
async fn dispatch_via_registry_unsupported_errors() {
    let no_vars = empty_vars();
    let bridge = bridge_ctx(&no_vars);
    let unsupported = Tool::Unsupported;
    let msg = dispatch_via_registry(&unsupported, &bridge)
        .await
        .unwrap_err()
        .to_string();
    assert!(msg.contains("unsupported"));
}
/// A Rhai script that adds two integers and stringifies the sum yields "42".
#[tokio::test]
async fn dispatch_rhai_evaluates_simple_arithmetic() {
    let no_vars = empty_vars();
    let bridge = bridge_ctx(&no_vars);
    let script = "let x = 40; let y = 2; (x + y).to_string()";
    let rhai_tool = Tool::Rhai {
        code: script.into(),
        args: HashMap::new(),
    };
    let outcome = dispatch_via_registry(&rhai_tool, &bridge).await.unwrap();
    assert_eq!(outcome.result, Some("42".into()));
}
/// A bridge variable keyed `workload.region` is readable from Rhai as the
/// field access `workload.region`.
#[tokio::test]
async fn dispatch_rhai_reads_workload_variable_via_scope() {
    let vars: HashMap<String, String> = HashMap::from([(
        String::from("workload.region"),
        String::from("us-west-1"),
    )]);
    let bridge = bridge_ctx(&vars);
    let rhai_tool = Tool::Rhai {
        code: r#"workload.region.to_string()"#.into(),
        args: HashMap::new(),
    };
    let outcome = dispatch_via_registry(&rhai_tool, &bridge).await.unwrap();
    assert_eq!(outcome.result, Some(String::from("us-west-1")));
}
/// Bridge variables keyed `<step>.<field>` are readable from Rhai as
/// `<step>.<field>`; here `check_health.result` evaluates to "ok".
#[tokio::test]
async fn dispatch_rhai_reads_step_result_via_field_access() {
    let vars: HashMap<String, String> = HashMap::from([
        (String::from("check_health.result"), String::from("ok")),
        (String::from("check_health.status"), String::from("200")),
    ]);
    let bridge = bridge_ctx(&vars);
    let rhai_tool = Tool::Rhai {
        code: r#"check_health.result.to_string()"#.into(),
        args: HashMap::new(),
    };
    let outcome = dispatch_via_registry(&rhai_tool, &bridge).await.unwrap();
    assert_eq!(outcome.result, Some(String::from("ok")));
}
/// Dotted bridge keys are grouped into nested objects under their prefix
/// (`workload`, `vars`, and a step name), while a key without a dot stays at
/// the root as a plain string.
#[test]
fn to_tools_context_for_rhai_groups_workload_prefix() {
    let flat_pairs = [
        ("workload.region", "us-west-1"),
        ("workload.tier", "prod"),
        ("vars.timeout", "30"),
        ("step_a.result", "done"),
        ("toplevel", "kept_at_root"),
    ];
    let vars: HashMap<String, String> = flat_pairs
        .iter()
        .map(|&(key, value)| (key.to_owned(), value.to_owned()))
        .collect();
    let bridge = bridge_ctx(&vars);
    let ctx = to_tools_context_for_rhai(&bridge);

    // `workload.*` keys land in one object.
    let workload_value = ctx
        .variables
        .get("workload")
        .expect("workload group should exist");
    let workload = workload_value
        .as_object()
        .expect("workload should be an object");
    let region = serde_json::Value::from("us-west-1");
    let tier = serde_json::Value::from("prod");
    assert_eq!(workload.get("region"), Some(&region));
    assert_eq!(workload.get("tier"), Some(&tier));

    // `vars.*` keys land in their own object.
    let vars_map = ctx
        .variables
        .get("vars")
        .and_then(|v| v.as_object())
        .unwrap();
    let timeout = serde_json::Value::from("30");
    assert_eq!(vars_map.get("timeout"), Some(&timeout));

    // Any other dotted prefix is grouped under that prefix.
    let step_a = ctx
        .variables
        .get("step_a")
        .and_then(|v| v.as_object())
        .unwrap();
    let done = serde_json::Value::from("done");
    assert_eq!(step_a.get("result"), Some(&done));

    // Keys without a dot are kept at the root.
    let root_value = serde_json::Value::from("kept_at_root");
    assert_eq!(ctx.variables.get("toplevel"), Some(&root_value));
}
/// A Rhai script that is just a string literal yields the string's contents
/// without surrounding quotes.
#[tokio::test]
async fn dispatch_rhai_string_literal_returns_unquoted() {
    let no_vars = empty_vars();
    let bridge = bridge_ctx(&no_vars);
    let script = r#""hello world""#;
    let rhai_tool = Tool::Rhai {
        code: script.into(),
        args: HashMap::new(),
    };
    let outcome = dispatch_via_registry(&rhai_tool, &bridge).await.unwrap();
    assert_eq!(outcome.result, Some("hello world".into()));
}
/// Compile-time smoke check: `CliAuthConfig` can be built from a provider
/// name and a list of scopes. Nothing is asserted at runtime.
#[test]
fn cli_auth_config_constructs() {
    let cloud_platform_scope = "https://www.googleapis.com/auth/cloud-platform";
    let _auth = CliAuthConfig {
        provider: "adc".into(),
        scopes: vec![cloud_platform_scope.into()],
    };
}
/// Uploading through an in-memory store makes the exact payload readable
/// back under the same key.
#[tokio::test]
async fn gcs_upload_with_store_writes_data_to_object_store() {
    use object_store::memory::InMemory;
    use object_store::ObjectStore;
    let key = "output/data.json";
    let payload = r#"{"k":"v"}"#;
    let store = Arc::new(InMemory::new());
    // Hand the uploader a trait-object handle that shares the same store.
    let handle: Arc<dyn ObjectStore> = store.clone();
    gcs_upload_with_store(handle, key, payload)
        .await
        .expect("upload should succeed");
    let retrieved = store
        .get(&StorePath::from(key))
        .await
        .expect("should read back uploaded object");
    let body = retrieved.bytes().await.expect("should get bytes");
    assert_eq!(body, bytes::Bytes::from(payload));
}
/// Uploading twice to the same key leaves only the second payload stored.
#[tokio::test]
async fn gcs_upload_with_store_overwrites_existing_key() {
    use object_store::memory::InMemory;
    use object_store::ObjectStore;
    let key = "data.csv";
    let store = Arc::new(InMemory::new());
    // Upload sequentially so the second write lands on top of the first.
    for payload in ["first", "second"] {
        let handle: Arc<dyn ObjectStore> = store.clone();
        gcs_upload_with_store(handle, key, payload).await.unwrap();
    }
    let stored = store.get(&StorePath::from(key)).await.unwrap();
    let body = stored.bytes().await.unwrap();
    assert_eq!(body, bytes::Bytes::from("second"));
}
/// A key with several `/`-separated segments round-trips through the
/// in-memory store.
#[tokio::test]
async fn gcs_upload_with_store_handles_nested_key_paths() {
    use object_store::memory::InMemory;
    use object_store::ObjectStore;
    let nested_key = "runs/2026-06-01/output/result.json";
    let store = Arc::new(InMemory::new());
    let handle: Arc<dyn ObjectStore> = store.clone();
    gcs_upload_with_store(handle, nested_key, "[]")
        .await
        .unwrap();
    let stored = store.get(&StorePath::from(nested_key)).await.unwrap();
    let body = stored.bytes().await.unwrap();
    assert_eq!(body, bytes::Bytes::from("[]"));
}
/// Uploading an empty payload succeeds and stores a zero-length object.
#[tokio::test]
async fn gcs_upload_with_store_uploads_empty_string() {
    use object_store::memory::InMemory;
    use object_store::ObjectStore;
    let key = "empty.txt";
    let store = Arc::new(InMemory::new());
    let handle: Arc<dyn ObjectStore> = store.clone();
    gcs_upload_with_store(handle, key, "").await.unwrap();
    let stored = store.get(&StorePath::from(key)).await.unwrap();
    let body = stored.bytes().await.unwrap();
    assert_eq!(body.len(), 0);
}
}