noetl_executor/tools_bridge.rs
1//! Bridge from the CLI's YAML-parsed [`crate::playbook::Tool`] enum
2//! onto the [`noetl_tools`] registry's dispatch API.
3//!
4//! Added in R-1.1 PR-2c-1 per § H.10.4 of Appendix H of the global
5//! hybrid cloud blueprint; fleshed out with adapter helpers in
6//! R-1.1 PR-2c-2. This module is the integration surface between
7//! the CLI's parsed playbook and the shared tool registry the
8//! worker (R-1.3) also uses.
9//!
10//! ## Strategy B rollout
11//!
12//! Replacement of the CLI's inline tool implementations happens
13//! incrementally — one tool kind per sub-PR (PR-2c-3 rhai, PR-2c-4
14//! shell, PR-2c-5 http, PR-2c-6 duckdb, PR-2c-7 playbook, PR-2c-8
15//! auth + sink). This module ships the adapter layer in PR-2c-2;
16//! each subsequent sub-PR fills in one [`dispatch_via_registry`]
17//! match arm and replaces the matching CLI call site in
18//! `repos/cli/src/playbook_runner.rs`.
19//!
20//! ## Why a bridge instead of converting the Tool enum directly
21//!
22//! The CLI's [`crate::playbook::Tool`] enum and the registry's
23//! [`noetl_tools::registry::ToolConfig`] carry different invariants:
24//!
25//! - The CLI's `Tool::Auth { provider, scopes, project }` resolves
26//! credentials inline during dispatch. The worker resolves them at
27//! credential-resolution time (before tool dispatch). The bridge
28//! needs to know which mode to use; it's not a trivial enum cast.
29//! - The CLI's `Tool::Sink { target, format }` writes outputs through
//! the runner's filesystem helpers. Routing sinks through the
//! `noetl-tools` registry would need a `sink` tool kind, which does not
//! exist on the worker side yet (PR-2c-8 may add it).
33//! - The CLI's `Tool::DuckDb { db, query, params }` opens a fresh
34//! DuckDB connection per call. `noetl-tools::tools::duckdb`
35//! manages a pool. Semantic difference; needs careful migration.
36//!
37//! Keeping the bridge explicit forces these decisions into one place
38//! instead of scattering them across each tool-kind sub-PR.
39//!
40//! ## GCS upload helper (R-3, noetl/ai-meta#31)
41//!
42//! [`gcs_upload`] wraps `object_store::gcp::GoogleCloudStorageBuilder`
43//! so the CLI's `SinkTarget::Gcs` arm no longer shells out to `gsutil`.
//! Auth draws on the same credential sources as
//! [`resolve_auth_to_bearer`] (workload identity on GKE, Application
//! Default Credentials on dev hosts), discovered by `object_store`'s own
//! GCP provider chain. The write itself lives in
//! [`gcs_upload_with_store`], which accepts a pluggable
//! `Arc<dyn ObjectStore>` so integration tests can substitute an
//! `object_store::memory::InMemory` store without real GCS. See
//! [`gcs_upload`] for the full credential-chain and error-shape notes.
50
51#![allow(dead_code)] // until PR-2c-4 onwards wires the call sites in.
52
53use std::collections::HashMap;
54use std::sync::Arc;
55use std::time::Instant;
56
57use anyhow::Result;
58use bytes::Bytes;
59use object_store::path::Path as StorePath;
60use object_store::ObjectStore;
61use object_store::PutPayload;
62use noetl_tools::auth::GcpAuth;
63use noetl_tools::context::ExecutionContext as ToolsExecutionContext;
64use noetl_tools::registry::{Tool as ToolsRegistryTool, ToolConfig};
65use noetl_tools::result::{ToolResult, ToolStatus};
66use noetl_tools::tools::{DuckdbTool, HttpTool, RhaiTool, ShellTool};
67use tracing::{info_span, Instrument};
68
69use crate::playbook::{AuthConfig as CliAuthConfig, CmdsList, SinkFormat, Tool};
70
71// ---------------------------------------------------------------------------
72// Bridge outcome — what the dispatch returns back to the caller.
73// ---------------------------------------------------------------------------
74
/// Result of running one tool through the bridge.
///
/// Mirrors the contract of the CLI's `PlaybookRunner::execute_tool`
/// (`Result<Option<String>>`): `result` is `Some(text)` when the tool
/// produced output the runner stores in `step_results[step].result`,
/// and `None` when the tool has no per-step string result (e.g.
/// fire-and-forget sinks).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BridgeOutcome {
    pub result: Option<String>,
}

impl BridgeOutcome {
    /// An outcome that carries no per-step result.
    pub fn empty() -> Self {
        BridgeOutcome {
            result: Option::None,
        }
    }
}
93
94// ---------------------------------------------------------------------------
95// Bridge context — what the dispatch needs from the caller.
96// ---------------------------------------------------------------------------
97
/// Per-call context handed from the CLI to the bridge.
///
/// Groups what would otherwise be many parameters threaded through
/// every dispatch site. It is deliberately narrower than
/// [`ToolsExecutionContext`]. The CLI's own `ExecutionContext`
/// (`repos/cli/src/playbook_runner.rs`) keeps variables as
/// `HashMap<String, String>` and tracks step results separately.
/// `noetl-tools` instead wants `HashMap<String, serde_json::Value>`
/// plus more execution-level fields (server_url, worker_id, command_id,
/// etc.).
///
/// Two functions expand this view into a full [`ToolsExecutionContext`]:
/// [`to_tools_context`] (flat variable keys) and
/// [`to_tools_context_for_rhai`] (nested variable keys).
///
/// `Debug` is derived so the context can be logged or printed in test
/// failures. `Clone` is derived because every field is itself `Clone`.
#[derive(Debug, Clone)]
pub struct BridgeContext<'a> {
    /// Execution id, required by [`ToolsExecutionContext`]. Per the
    /// original design notes, CLI local mode synthesises it from the
    /// start time / playbook path, while the worker uses the snowflake
    /// id from `noetl.command`.
    pub execution_id: i64,

    /// Name of the step the bridged tool is running under.
    pub step: &'a str,

    /// CLI variables map (`workload.*`, `vars.*`, `<step>.result`, etc.).
    pub variables: &'a HashMap<String, String>,

    /// Control-plane server URL. Empty string when running in CLI local
    /// mode without a server backend.
    pub server_url: String,

    /// Worker id — `None` in CLI local mode.
    pub worker_id: Option<String>,
    /// Command id — `None` in CLI local mode.
    pub command_id: Option<String>,
}
131
132// ---------------------------------------------------------------------------
133// Adapters
134// ---------------------------------------------------------------------------
135
136/// Convert a [`BridgeContext`] into the [`ToolsExecutionContext`]
137/// shape `noetl-tools` tools expect. String variables become
138/// [`serde_json::Value::String`] entries; secrets stay empty (CLI
139/// local mode resolves credentials at the credential-resolver layer,
140/// not at tool dispatch).
141///
142/// Variable shape: **flat**. Each CLI variable `workload.region`
143/// becomes a JSON value at the same flat key in the resulting map.
144/// This matches what most `noetl-tools` tools (http / postgres / etc.)
145/// expect from their template engine. The rhai tool needs a
146/// *nested* shape so `workload.region` is reachable as a Rhai field
147/// access on a `workload` map; see [`to_tools_context_for_rhai`] for
148/// the restructured variant used inside the rhai dispatch arm.
149pub fn to_tools_context(bridge: &BridgeContext) -> ToolsExecutionContext {
150 let variables: HashMap<String, serde_json::Value> = bridge
151 .variables
152 .iter()
153 .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
154 .collect();
155
156 ToolsExecutionContext {
157 execution_id: bridge.execution_id,
158 step: bridge.step.to_string(),
159 variables,
160 server_url: bridge.server_url.clone(),
161 worker_id: bridge.worker_id.clone(),
162 command_id: bridge.command_id.clone(),
163 ..ToolsExecutionContext::default()
164 }
165}
166
167/// Build a [`ToolsExecutionContext`] whose `variables` map matches the
168/// scope shape the CLI's inline `execute_rhai_script` produced — flat
169/// `workload.region` / `vars.x` / `<step>.<field>` keys grouped into
170/// nested objects so Rhai's `workload.region` / `vars.x` / `<step>.<field>`
171/// field-access syntax works.
172///
173/// PR-2c-3 introduces this for the rhai dispatch arm. Other tool
174/// kinds (http, postgres, duckdb, etc.) continue to consume the flat
175/// shape from [`to_tools_context`] because their template engines
176/// expect the `{{workload.region}}` lookup style, not Rhai-style
177/// field navigation.
178pub fn to_tools_context_for_rhai(bridge: &BridgeContext) -> ToolsExecutionContext {
179 let mut variables: HashMap<String, serde_json::Value> = HashMap::new();
180 let mut workload_map: serde_json::Map<String, serde_json::Value> = serde_json::Map::new();
181 let mut vars_map: serde_json::Map<String, serde_json::Value> = serde_json::Map::new();
182 let mut step_maps: HashMap<String, serde_json::Map<String, serde_json::Value>> =
183 HashMap::new();
184
185 for (key, value) in bridge.variables {
186 let val = serde_json::Value::String(value.clone());
187 if let Some(suffix) = key.strip_prefix("workload.") {
188 workload_map.insert(suffix.to_string(), val);
189 } else if let Some(suffix) = key.strip_prefix("vars.") {
190 vars_map.insert(suffix.to_string(), val);
191 } else if let Some((step, field)) = key.split_once('.') {
192 step_maps
193 .entry(step.to_string())
194 .or_default()
195 .insert(field.to_string(), val);
196 } else {
197 // Unprefixed keys land at the top level — same shape as
198 // [`to_tools_context`].
199 variables.insert(key.clone(), val);
200 }
201 }
202
203 if !workload_map.is_empty() {
204 variables.insert(
205 "workload".to_string(),
206 serde_json::Value::Object(workload_map),
207 );
208 }
209 if !vars_map.is_empty() {
210 variables.insert("vars".to_string(), serde_json::Value::Object(vars_map));
211 }
212 for (step, map) in step_maps {
213 variables.insert(step, serde_json::Value::Object(map));
214 }
215
216 ToolsExecutionContext {
217 execution_id: bridge.execution_id,
218 step: bridge.step.to_string(),
219 variables,
220 server_url: bridge.server_url.clone(),
221 worker_id: bridge.worker_id.clone(),
222 command_id: bridge.command_id.clone(),
223 ..ToolsExecutionContext::default()
224 }
225}
226
227/// Build a [`ToolConfig`] from a CLI [`Tool`] enum variant.
228///
229/// The `kind` string matches what [`noetl_tools::registry::ToolRegistry`]
230/// uses for dispatch. The `config` payload is the variant's fields
231/// serialized as JSON; the receiving tool deserializes its own
232/// expected schema from this value (e.g. `noetl_tools::tools::shell`
233/// expects `{"cmds": [...]}`).
234///
235/// `Tool::Unsupported` returns a `ToolConfig` with `kind: "unsupported"`
236/// — dispatch will fail at registry lookup, which matches the CLI's
237/// current behaviour of emitting an error.
238pub fn to_tools_config(tool: &Tool) -> ToolConfig {
239 let (kind, config) = match tool {
240 Tool::Shell { cmds } => {
241 // noetl-tools::ShellConfig expects a single `command`
242 // string. CLI's CmdsList::Multiple becomes a newline-
243 // joined block (one bash invocation with a multi-line
244 // script); CmdsList::Single becomes the string verbatim.
245 //
246 // Important: this is the per-call ToolConfig shape. The
247 // Tool::Shell arm of `dispatch_via_registry` does NOT use
248 // this helper because the CLI's runtime semantics require
249 // one bash invocation PER command (independent process,
250 // no shared cwd/env state) — the dispatch arm loops and
251 // builds per-command ToolConfigs via [`shell_command_config`].
252 (
253 "shell",
254 serde_json::json!({
255 "command": match cmds {
256 CmdsList::Single(s) => s.clone(),
257 CmdsList::Multiple(v) => v.join("\n"),
258 },
259 "shell": "bash",
260 "capture": true,
261 }),
262 )
263 }
264 Tool::Http {
265 method,
266 url,
267 headers,
268 params,
269 body,
270 auth: _, // resolved at dispatch time into a Bearer header; not threaded through ToolConfig.auth (see PR-2c-5)
271 } => (
272 "http",
273 // noetl-tools' HttpConfig deserializes the method via
274 // `#[serde(rename_all = "UPPERCASE")]`, so we emit the
275 // uppercased CLI string here. The body is wrapped as a
276 // JSON Value: if the CLI's body parses as JSON we pass the
277 // parsed Value (so reqwest serialises it as JSON with the
278 // right Content-Type); otherwise we pass it as a JSON
279 // string which noetl-tools sends verbatim as the body.
280 serde_json::json!({
281 "method": method.to_uppercase(),
282 "url": url,
283 "headers": headers,
284 "params": params,
285 "body": body.as_deref().map(http_body_value),
286 }),
287 ),
288 Tool::Playbook { path, args, input } => (
289 "playbook",
290 serde_json::json!({
291 "path": path,
292 "args": args,
293 "input": input,
294 }),
295 ),
296 Tool::DuckDb { db, query, params } => (
297 // noetl-tools' DuckdbConfig schema uses `db_path` (not
298 // `db`), `query` is required (so we substitute an empty
299 // string when the CLI doesn't carry one — the dispatch
300 // arm short-circuits in that case), and params are
301 // `Vec<serde_json::Value>` rather than `Vec<String>`.
302 // Conversion is faithful: a CLI string param becomes a
303 // JSON string value bound at the `?` placeholder by
304 // noetl-tools' DuckdbTool.
305 //
306 // Compatibility note: the CLI's pre-PR-2c-6
307 // `execute_duckdb_query` accepted but **ignored** the
308 // `params` field (signature was `_params: &[String]`).
309 // The bridge now binds them, which is a feature gain
310 // documented in the PR body and on the executor-crate-
311 // architecture wiki page.
312 "duckdb",
313 serde_json::json!({
314 "db_path": db,
315 "query": query.clone().unwrap_or_default(),
316 "params": params
317 .iter()
318 .map(|p| serde_json::Value::String(p.clone()))
319 .collect::<Vec<_>>(),
320 "as_objects": true,
321 }),
322 ),
323 Tool::Rhai { code, args } => (
324 "rhai",
325 serde_json::json!({
326 "code": code,
327 "args": args,
328 }),
329 ),
330 Tool::Auth { provider, scopes, project } => (
331 "auth",
332 serde_json::json!({
333 "provider": provider,
334 "scopes": scopes,
335 "project": project,
336 }),
337 ),
338 Tool::Sink { target, format } => (
339 "sink",
340 serde_json::json!({
341 "target": target_to_value(target),
342 "format": format!("{:?}", format).to_lowercase(),
343 }),
344 ),
345 Tool::Unsupported => ("unsupported", serde_json::json!({})),
346 };
347
348 ToolConfig {
349 kind: kind.to_string(),
350 config,
351 timeout: None,
352 retry: None,
353 auth: None,
354 }
355}
356
357/// Build a single-command ToolConfig for the shell tool. Used by
358/// the `Tool::Shell` dispatch arm to preserve the CLI's per-command
359/// bash-invocation semantics (independent process, no shared
360/// cwd/env state across commands).
361fn shell_command_config(command: &str) -> ToolConfig {
362 ToolConfig {
363 kind: "shell".to_string(),
364 config: serde_json::json!({
365 "command": command,
366 "shell": "bash",
367 "capture": true,
368 }),
369 timeout: None,
370 retry: None,
371 auth: None,
372 }
373}
374
375/// Convert a CLI HTTP body string into a JSON [`serde_json::Value`]
376/// suitable for noetl-tools' `HttpConfig.body` field. If the body
377/// parses as JSON, the parsed value is returned (and `reqwest` sends
378/// it with `Content-Type: application/json`). Otherwise the body
379/// is wrapped as a [`Value::String`] which `reqwest` writes
380/// verbatim as the request body.
381fn http_body_value(body: &str) -> serde_json::Value {
382 serde_json::from_str(body).unwrap_or_else(|_| serde_json::Value::String(body.to_string()))
383}
384
385/// Resolve a CLI [`AuthConfig`] to a Bearer token using noetl-tools'
386/// [`GcpAuth`] provider.
387///
388/// CLI providers `"gcp"`, `"google"`, and `"adc"` all map to GCP
389/// Application Default Credentials. Any other provider value
390/// returns an error matching the CLI's pre-PR-2c-5 behaviour.
391///
392/// This replaces the CLI's inline `get_auth_token` (which shelled
393/// out to `gcloud auth print-access-token`). See semantic
394/// divergence row on the executor-crate-architecture wiki page.
395pub async fn resolve_auth_to_bearer(cfg: &CliAuthConfig) -> Result<String> {
396 match cfg.provider.as_str() {
397 "gcp" | "google" | "adc" => {
398 let gcp = GcpAuth::new();
399 let scopes: Vec<&str> = cfg.scopes.iter().map(|s| s.as_str()).collect();
400 let token = if scopes.is_empty() {
401 gcp.get_default_token()
402 .await
403 .map_err(|e| anyhow::anyhow!("failed to get GCP access token: {}", e))?
404 } else {
405 gcp.get_token(&scopes)
406 .await
407 .map_err(|e| anyhow::anyhow!("failed to get GCP access token: {}", e))?
408 };
409 Ok(token)
410 }
411 other => anyhow::bail!(
412 "unsupported auth provider: {}. Supported: gcp, google, adc",
413 other
414 ),
415 }
416}
417
418/// Build the noetl-tools [`ToolConfig`] for an HTTP request.
419///
420/// Identical to the [`to_tools_config`] `Tool::Http` arm but pulled
421/// out so the dispatch arm can also inject an `Authorization:
422/// Bearer <token>` header when a CLI `AuthConfig` is present
423/// (resolved via [`resolve_auth_to_bearer`]).
424///
425/// CLI's `auth` is intentionally NOT mapped to noetl-tools'
426/// `ToolConfig.auth` field: that field expects an `AuthConfig` with
427/// `credential` / `token` lookup against `ExecutionContext.secrets`,
428/// which CLI local mode does not populate. Pre-resolving the
429/// token and injecting it as a header keeps the CLI's existing
430/// authority semantics (the CLI process's gcloud / ADC chain) and
431/// avoids reshaping the credential resolver path.
432fn http_tool_config(
433 method: &str,
434 url: &str,
435 headers: &HashMap<String, String>,
436 params: &HashMap<String, String>,
437 body: Option<&str>,
438 bearer: Option<&str>,
439) -> ToolConfig {
440 let mut merged_headers = headers.clone();
441 if let Some(token) = bearer {
442 merged_headers.insert(
443 "Authorization".to_string(),
444 format!("Bearer {}", token),
445 );
446 }
447 ToolConfig {
448 kind: "http".to_string(),
449 config: serde_json::json!({
450 "method": method.to_uppercase(),
451 "url": url,
452 "headers": merged_headers,
453 "params": params,
454 "body": body.map(http_body_value),
455 }),
456 timeout: None,
457 retry: None,
458 auth: None,
459 }
460}
461
462/// Reshape noetl-tools' HTTP result envelope back to the CLI's
463/// pre-PR-2c-5 shape.
464///
465/// noetl-tools' HttpTool always packs `data: {"status_code":
466/// u16, "headers": {...}, "body": <json>}` into the ToolResult,
467/// regardless of whether the HTTP response was 2xx (Success) or
468/// 4xx/5xx (Error). The CLI's `execute_http_request` returned the
469/// envelope `{"status": <int>, "body": <json>}` for ALL HTTP
470/// responses (including 4xx/5xx) so playbook steps could branch on
471/// the status code. We preserve that contract here: only network-
472/// transport failures bubble up as `anyhow::Error`; HTTP error
473/// statuses come back inside the JSON envelope.
474fn reshape_http_result(result: ToolResult) -> Result<BridgeOutcome> {
475 if let Some(data) = result.data {
476 let status_code = data
477 .get("status_code")
478 .and_then(|v| v.as_u64())
479 .unwrap_or(0) as i32;
480 let body = data
481 .get("body")
482 .cloned()
483 .unwrap_or(serde_json::Value::Null);
484 let envelope = serde_json::json!({
485 "status": status_code,
486 "body": body,
487 });
488 return Ok(BridgeOutcome {
489 result: Some(envelope.to_string()),
490 });
491 }
492 // No data — fall back to the generic from_tools_result path so
493 // we surface whatever error / stdout the tool emitted.
494 from_tools_result(result)
495}
496
497/// Build a [`ToolConfig`] for a DuckDB query.
498///
499/// Used by the `Tool::DuckDb` dispatch arm. Path resolution
500/// (playbook-relative vs absolute) and `mkdir -p` of the parent
501/// directory are handled at the CLI call site BEFORE the bridge is
502/// invoked, so this helper receives an already-resolved absolute
503/// path string (or `:memory:` for in-memory mode).
504fn duckdb_tool_config(
505 db_path: &str,
506 query: &str,
507 params: &[String],
508) -> ToolConfig {
509 ToolConfig {
510 kind: "duckdb".to_string(),
511 config: serde_json::json!({
512 "db_path": db_path,
513 "query": query,
514 "params": params
515 .iter()
516 .map(|p| serde_json::Value::String(p.clone()))
517 .collect::<Vec<_>>(),
518 // CLI's pre-PR-2c-6 SELECT result shape was an array of
519 // JSON objects keyed by column name; `as_objects: true`
520 // matches that. `reshape_duckdb_result` then unwraps
521 // the noetl-tools envelope back to the raw array.
522 "as_objects": true,
523 }),
524 timeout: None,
525 retry: None,
526 auth: None,
527 }
528}
529
530/// Reshape noetl-tools' DuckDB result envelope back to the CLI's
531/// pre-PR-2c-6 shape.
532///
533/// noetl-tools' DuckdbTool returns:
534/// - SELECT / WITH: `data: {"columns": [...], "rows": [{...}, ...],
535/// "row_count": N}`
536/// - non-SELECT: `data: {"affected_rows": N}`
537///
538/// The CLI's `execute_duckdb_query` returned:
539/// - SELECT / WITH: a JSON array of objects (pretty-printed)
540/// - non-SELECT: the literal string `{"status": "ok"}`
541///
542/// `reshape_duckdb_result` maps the former onto the latter so
543/// playbook steps that read `<step>.result[0].col_name` keep
544/// working. `affected_rows` from the noetl-tools envelope is
545/// dropped on purpose — the CLI never exposed it.
546fn reshape_duckdb_result(result: ToolResult) -> Result<BridgeOutcome> {
547 let data = match result.data {
548 Some(d) => d,
549 None => return from_tools_result(result),
550 };
551
552 if let Some(rows) = data.get("rows").and_then(|v| v.as_array()) {
553 // SELECT path. Return the rows array as a pretty-printed
554 // JSON string — matches the CLI's
555 // `serde_json::to_string_pretty(&results)`.
556 let pretty = serde_json::to_string_pretty(rows)?;
557 return Ok(BridgeOutcome { result: Some(pretty) });
558 }
559
560 if data.get("affected_rows").is_some() {
561 // Non-SELECT path. CLI emitted the literal `{"status":
562 // "ok"}` here; preserve that.
563 return Ok(BridgeOutcome {
564 result: Some(r#"{"status": "ok"}"#.to_string()),
565 });
566 }
567
568 // Unknown shape — fall back to the generic from_tools_result
569 // path so we still surface whatever the tool emitted.
570 from_tools_result(ToolResult {
571 status: result.status,
572 data: Some(data),
573 error: result.error,
574 stdout: result.stdout,
575 stderr: result.stderr,
576 exit_code: result.exit_code,
577 duration_ms: result.duration_ms,
578 })
579}
580
581/// Prepare the variable map for a sub-playbook invocation.
582///
583/// Used by the CLI's `Tool::Playbook` arm (which keeps owning the
584/// tree-walker recursion per § H.10). The helper merges the
585/// parent context's variables with the sub-playbook's
586/// `input:` (DSL v2) or `args:` (DSL v1 legacy), each rendered
587/// against the parent context via the caller-supplied
588/// `render_template` closure and prefixed with `workload.` to
589/// match the sub-playbook's expected variable shape.
590///
591/// `input` takes precedence over `args` when both are present —
592/// same precedence the CLI's pre-PR-2c-7 inline implementation
593/// applied.
594///
595/// `parent_vars`, `args`, and `input` correspond directly to the
596/// caller's `context.variables`, `Tool::Playbook.args`, and
597/// `Tool::Playbook.input` fields. The `render` closure receives
598/// each template string and is expected to return the rendered
599/// value (the CLI passes `|t| self.render_template(t, context)`).
600///
601/// Returning a fresh `HashMap` rather than mutating in place makes
602/// the helper easy to test and matches how the inline
603/// implementation operated.
604pub fn prepare_sub_playbook_vars<F>(
605 parent_vars: &HashMap<String, String>,
606 args: &HashMap<String, String>,
607 input: &HashMap<String, serde_yaml::Value>,
608 mut render: F,
609) -> Result<HashMap<String, String>>
610where
611 F: FnMut(&str) -> Result<String>,
612{
613 let mut sub_vars = parent_vars.clone();
614
615 if !input.is_empty() {
616 // DSL v2: tool.input takes precedence — render and prefix
617 // with `workload.`.
618 for (key, value_yaml) in input {
619 let template = match value_yaml {
620 serde_yaml::Value::String(s) => s.clone(),
621 serde_yaml::Value::Number(n) => n.to_string(),
622 serde_yaml::Value::Bool(b) => b.to_string(),
623 other => serde_yaml::to_string(other)?.trim().to_string(),
624 };
625 let value = render(&template)?;
626 sub_vars.insert(format!("workload.{}", key), value);
627 }
628 } else if !args.is_empty() {
629 // DSL v1 legacy: args field — prefix with `workload.`.
630 for (key, template) in args {
631 let value = render(template)?;
632 sub_vars.insert(format!("workload.{}", key), value);
633 }
634 }
635
636 Ok(sub_vars)
637}
638
/// Compute the variables a resolved `Tool::Auth` step publishes into the
/// CLI's execution context.
///
/// Returns `(key, value)` pairs for the caller to `set_variable` on its
/// `ExecutionContext`, so later steps can reference `{{ auth.token }}`
/// and the like. Keeping this in one helper means other call sites (the
/// worker, integration tests) need not re-derive the key set.
///
/// `project` must already be rendered (the CLI renders templates before
/// calling), or be `None` when the playbook supplied no project.
///
/// Entries, in order:
/// 1. `auth.project` — only when `project` is `Some` and non-empty
/// 2. `auth.token`
/// 3. `auth.provider`
///
/// This order mirrors the CLI's pre-PR-2c-8 inline arm. That arm set
/// `auth.project` first, then set the token and provider after
/// `resolve_auth_to_bearer` returned.
pub fn auth_context_updates(
    provider: &str,
    token: &str,
    project: Option<&str>,
) -> Vec<(String, String)> {
    let project_entry = project
        .filter(|name| !name.is_empty())
        .map(|name| (String::from("auth.project"), String::from(name)));

    let mut updates: Vec<(String, String)> = project_entry.into_iter().collect();
    updates.reserve(2);
    updates.push((String::from("auth.token"), String::from(token)));
    updates.push((String::from("auth.provider"), String::from(provider)));
    updates
}
675
676/// Format the payload a `Tool::Sink` writes to its target.
677///
678/// Pure transformation lifted from the CLI's inline
679/// `Tool::Sink` arm. The CLI passes the last step's result
680/// (already a JSON-serialized string in `ExecutionContext`) and
681/// the playbook's declared `format:` field; the helper returns
682/// the formatted string ready to write to file / DuckDB / GCS.
683///
684/// Format rules:
685/// - [`SinkFormat::Json`]: pass-through. Same as CLI's
686/// pre-PR-2c-8 behaviour (the raw step-result string).
687/// - [`SinkFormat::Yaml`]: parse the input as JSON, then dump as
688/// YAML. Falls back to pass-through if the input doesn't parse.
689/// - [`SinkFormat::Csv`]: see [`json_to_csv`] for the rules.
690pub fn format_sink_payload(format: &SinkFormat, raw: &str) -> Result<String> {
691 match format {
692 SinkFormat::Json => Ok(raw.to_string()),
693 SinkFormat::Yaml => {
694 if let Ok(json_val) = serde_json::from_str::<serde_json::Value>(raw) {
695 Ok(serde_yaml::to_string(&json_val).unwrap_or_else(|_| raw.to_string()))
696 } else {
697 Ok(raw.to_string())
698 }
699 }
700 SinkFormat::Csv => json_to_csv(raw),
701 }
702}
703
704/// Convert a JSON-array-of-objects string into CSV.
705///
706/// Pure helper lifted from the CLI's inline `json_to_csv`. Returns
707/// the input unchanged if:
708/// - it doesn't parse as JSON,
709/// - it parses as a non-array value, or
710/// - it's an empty array, or
711/// - the first element isn't a JSON object.
712///
713/// Otherwise: emits a header row from the first object's keys
714/// followed by one row per array element. Values are converted
715/// via `Display`; strings that contain `,` or `"` are
716/// double-quoted with embedded `"` doubled — minimal RFC 4180
717/// quoting, matching the CLI's pre-PR-2c-8 implementation.
718pub fn json_to_csv(json_str: &str) -> Result<String> {
719 let value: serde_json::Value =
720 serde_json::from_str(json_str).unwrap_or(serde_json::Value::String(json_str.to_string()));
721
722 match value {
723 serde_json::Value::Array(arr) if !arr.is_empty() => {
724 let headers: Vec<String> = if let Some(serde_json::Value::Object(obj)) = arr.first() {
725 obj.keys().cloned().collect()
726 } else {
727 return Ok(json_str.to_string());
728 };
729
730 let mut csv = headers.join(",") + "\n";
731
732 for item in &arr {
733 if let serde_json::Value::Object(obj) = item {
734 let row: Vec<String> = headers
735 .iter()
736 .map(|h| {
737 obj.get(h)
738 .map(|v| match v {
739 serde_json::Value::String(s) => {
740 if s.contains(',') || s.contains('"') {
741 format!("\"{}\"", s.replace('"', "\"\""))
742 } else {
743 s.clone()
744 }
745 }
746 _ => v.to_string(),
747 })
748 .unwrap_or_default()
749 })
750 .collect();
751 csv.push_str(&row.join(","));
752 csv.push('\n');
753 }
754 }
755 Ok(csv)
756 }
757 _ => Ok(json_str.to_string()),
758 }
759}
760
761// ---------------------------------------------------------------------------
762// GCS upload helper (R-3, noetl/ai-meta#31)
763// ---------------------------------------------------------------------------
764
765/// Upload `data` to `gs://<bucket>/<key>` using the `object_store` crate.
766///
767/// # Credential chain
768///
769/// Authentication defaults to the same Application Default Credentials
770/// (ADC) / workload-identity chain that [`resolve_auth_to_bearer`] uses
771/// via `gcp_auth`. Concretely: `GoogleCloudStorageBuilder::from_env()`
772/// reads (in priority order):
773///
774/// 1. `GOOGLE_SERVICE_ACCOUNT_KEY` env var (JSON service-account key
775/// inline — useful for CI / test containers).
776/// 2. `GOOGLE_SERVICE_ACCOUNT` env var (path to a JSON key file).
777/// 3. The ambient Application Default Credentials
778/// (`~/.config/gcloud/application_default_credentials.json` on dev
779/// hosts; the GKE metadata server on cluster pods).
780///
781/// This matches GKE workload-identity on cluster and `gcloud auth
782/// application-default login` on dev hosts — the same two paths the
783/// former `gsutil cp` subprocess relied on.
784///
785/// # Error shape
786///
787/// Returns `anyhow::Error` with a human-readable message on failure
788/// (instead of a gsutil exit-code string). The CLI's `sink_to_gcs`
789/// wrapper maps this through the usual `?` chain.
790///
791/// # Observability
792///
793/// Wraps the upload in a `gcs.upload` tracing span that carries
794/// `bucket`, `key`, and `bytes` fields so the span is grep-able in
795/// structured logs. Upload duration is emitted as a debug-level event
796/// (`gcs.upload.duration_ms`) so tooling can aggregate latency without
797/// a Prometheus registry in the executor crate. A future PR can
798/// promote this to a proper histogram once the executor crate grows a
799/// metrics registry.
800///
801/// # Pluggable store (testing)
802///
803/// The `store` parameter is `Arc<dyn ObjectStore>`. Production callers
804/// pass `None` (the default GCS store is built from env); integration
805/// tests inject `Arc<object_store::memory::InMemory::new()>` to avoid
806/// real GCS calls. See `gcs_upload_with_store` for the inner
807/// implementation that both paths share.
808pub async fn gcs_upload(bucket: &str, key: &str, data: &str) -> Result<()> {
809 use object_store::gcp::GoogleCloudStorageBuilder;
810
811 let store = GoogleCloudStorageBuilder::from_env()
812 .with_bucket_name(bucket)
813 .build()
814 .map_err(|e| anyhow::anyhow!("failed to build GCS store for bucket {:?}: {}", bucket, e))?;
815
816 gcs_upload_with_store(Arc::new(store), key, data).await
817}
818
819/// Inner upload path shared by production and test callers.
820///
821/// Production: called by [`gcs_upload`] with a real
822/// `GoogleCloudStorage` store.
823/// Tests: called directly with `Arc<InMemory>` — no GCS dependency.
824pub async fn gcs_upload_with_store(
825 store: Arc<dyn ObjectStore>,
826 key: &str,
827 data: &str,
828) -> Result<()> {
829 let bytes = Bytes::from(data.to_string());
830 let byte_len = bytes.len();
831 let path = StorePath::from(key);
832
833 let span = info_span!(
834 "gcs.upload",
835 key = key,
836 bytes = byte_len,
837 );
838
839 async move {
840 let start = Instant::now();
841
842 store
843 .put(&path, PutPayload::from_bytes(bytes))
844 .await
845 .map_err(|e| anyhow::anyhow!("GCS upload failed for key {:?}: {}", key, e))?;
846
847 let elapsed_ms = start.elapsed().as_millis();
848 tracing::debug!(
849 target: "noetl::gcs",
850 duration_ms = elapsed_ms,
851 key = key,
852 bytes = byte_len,
853 "gcs.upload complete"
854 );
855
856 Ok(())
857 }
858 .instrument(span)
859 .await
860}
861
862fn target_to_value(target: &crate::playbook::SinkTarget) -> serde_json::Value {
863 match target {
864 crate::playbook::SinkTarget::File { path } => {
865 serde_json::json!({"type": "file", "path": path})
866 }
867 crate::playbook::SinkTarget::DuckDb { db, table } => {
868 serde_json::json!({"type": "duckdb", "db": db, "table": table})
869 }
870 crate::playbook::SinkTarget::Gcs { bucket, path } => {
871 serde_json::json!({"type": "gcs", "bucket": bucket, "path": path})
872 }
873 }
874}
875
876/// Convert a [`ToolResult`] back into the bridge outcome shape the
877/// CLI consumes. Success results carry `data` (or `stdout` if no
878/// `data` was populated) as the result string; failures bubble up
879/// as `anyhow::Error` so the CLI's existing error-handling chain
880/// continues to work.
881pub fn from_tools_result(result: ToolResult) -> Result<BridgeOutcome> {
882 match result.status {
883 ToolStatus::Success => {
884 let payload = result
885 .data
886 .map(|v| match v {
887 serde_json::Value::String(s) => s,
888 other => other.to_string(),
889 })
890 .or(result.stdout);
891 Ok(BridgeOutcome { result: payload })
892 }
893 ToolStatus::Error => Err(anyhow::anyhow!(
894 "tool execution failed: {}",
895 result.error.unwrap_or_else(|| "unknown error".to_string())
896 )),
897 ToolStatus::Timeout => Err(anyhow::anyhow!(
898 "tool execution timed out after {} ms",
899 result.duration_ms.unwrap_or(0)
900 )),
901 }
902}
903
904// ---------------------------------------------------------------------------
905// Dispatch — per-tool-kind match scaffold.
906// ---------------------------------------------------------------------------
907
908/// Bridge dispatch entry point. Each tool kind is replaced
909/// incrementally in subsequent sub-PRs (PR-2c-3 onwards).
910///
911/// The function is async because every concrete `noetl-tools` tool
912/// implementation is async (`Tool::execute` is `async`). The CLI
913/// adapts via `tokio::runtime::Handle::current().block_on(...)` if
914/// the call site is sync — see PR-2c-3's wiring for the pattern.
915pub async fn dispatch_via_registry(
916 tool: &Tool,
917 bridge: &BridgeContext<'_>,
918) -> Result<BridgeOutcome> {
919 let _config = to_tools_config(tool);
920 let _ctx = to_tools_context(bridge);
921
922 match tool {
923 Tool::Rhai { .. } => {
924 // PR-2c-3: first real tool replacement. Builds a
925 // RhaiTool from noetl-tools, dispatches against the
926 // adapter-converted config + context, and converts the
927 // result back through `from_tools_result`.
928 //
929 // Semantic note documented in the PR body: noetl-tools'
930 // `timestamp()` returns the Unix epoch as a string
931 // (e.g. "1716847425"), whereas the CLI's inline
932 // implementation returned `chrono::Local::now()
933 // .format("%H:%M:%S")` (e.g. "14:23:45"). Other
934 // helpers (log, print, parse_json, contains, http_*,
935 // get_gcp_token, sleep, sleep_ms) match.
936 let rhai_tool = RhaiTool::new();
937 let config = to_tools_config(tool);
938 // rhai needs a nested variable shape so
939 // `workload.region` is a Rhai field-access expression.
940 let ctx = to_tools_context_for_rhai(bridge);
941 let result = rhai_tool
942 .execute(&config, &ctx)
943 .await
944 .map_err(|e| anyhow::anyhow!("rhai dispatch failed: {}", e))?;
945 from_tools_result(result)
946 }
947 Tool::Shell { cmds } => {
948 // PR-2c-4: dispatch through noetl_tools::ShellTool.
949 //
950 // CLI semantics preserved:
951 // - CmdsList::Single splits on newlines into individual
952 // commands; each runs in its own bash invocation.
953 // - CmdsList::Multiple runs each element in its own
954 // bash invocation in order.
955 // - Bails on first non-zero exit (CLI's existing
956 // `anyhow::bail!("Command failed ...")`).
957 // - Returns the last command's stdout as the step result.
958 //
959 // Note vs CLI: noetl-tools' ShellTool collects stdout +
960 // stderr and returns them in the ToolResult at the end
961 // of execution. The CLI's inline implementation
962 // streamed output to the terminal line-by-line as the
963 // command ran. For long-running shell steps users no
964 // longer see real-time output. Documented in the PR
965 // body and on the executor-crate-architecture wiki
966 // page's semantic-divergence table.
967 let commands: Vec<String> = match cmds {
968 CmdsList::Single(cmd) => cmd
969 .lines()
970 .map(|s| s.trim())
971 .filter(|s| !s.is_empty())
972 .map(|s| s.to_string())
973 .collect(),
974 CmdsList::Multiple(c) => c.clone(),
975 };
976
977 let shell_tool = ShellTool::new();
978 let ctx = to_tools_context(bridge);
979 let mut last_outcome = BridgeOutcome::empty();
980 for command in commands {
981 let config = shell_command_config(&command);
982 let result = shell_tool
983 .execute(&config, &ctx)
984 .await
985 .map_err(|e| anyhow::anyhow!("shell dispatch failed: {}", e))?;
986
987 // noetl-tools' shell tool packs the result into
988 // ToolResult.data as a typed JSON object:
989 // {"exit_code": i32, "stdout": String, "stderr": String}
990 // For the CLI's step-result contract (a single
991 // string = the command's stdout), we unwrap stdout
992 // directly here. `from_tools_result` would
993 // otherwise stringify the whole JSON dict.
994 if result.status != ToolStatus::Success {
995 let exit_code = result
996 .data
997 .as_ref()
998 .and_then(|d| d.get("exit_code"))
999 .and_then(|v| v.as_i64());
1000 anyhow::bail!(
1001 "Command failed with exit code: {:?}",
1002 exit_code
1003 );
1004 }
1005 let stdout = result
1006 .data
1007 .as_ref()
1008 .and_then(|d| d.get("stdout"))
1009 .and_then(|v| v.as_str())
1010 .map(|s| s.trim_end_matches('\n').to_string());
1011 last_outcome = BridgeOutcome { result: stdout };
1012 }
1013 Ok(last_outcome)
1014 }
1015 Tool::Http {
1016 method,
1017 url,
1018 headers,
1019 params,
1020 body,
1021 auth,
1022 } => {
1023 // PR-2c-5: dispatch through noetl_tools::HttpTool.
1024 //
1025 // CLI semantics preserved:
1026 // - Auth resolution via GCP ADC (gcp / google / adc).
1027 // - Step result is the JSON envelope
1028 // `{"status": <int>, "body": <json-or-string>}`
1029 // regardless of HTTP status code (so playbook steps
1030 // can branch on `<step>.body.status`).
1031 //
1032 // Semantic divergences (documented on the executor-crate-
1033 // architecture wiki page):
1034 // - HTTP transport: curl subprocess → reqwest direct.
1035 // - GCP token: `gcloud auth print-access-token` shellout
1036 // → `gcp_auth` crate (workload-identity aware on GKE).
1037 // - Body bytes: CLI sent the body string verbatim via
1038 // `curl -d`. noetl-tools serializes the body as JSON
1039 // when the string parses as JSON (adding Content-Type:
1040 // application/json automatically), otherwise sends it
1041 // verbatim. See `http_body_value`.
1042 let bearer = if let Some(auth_cfg) = auth {
1043 Some(resolve_auth_to_bearer(auth_cfg).await?)
1044 } else {
1045 None
1046 };
1047 let config = http_tool_config(
1048 method,
1049 url,
1050 headers,
1051 params,
1052 body.as_deref(),
1053 bearer.as_deref(),
1054 );
1055 let http_tool = HttpTool::new();
1056 let ctx = to_tools_context(bridge);
1057 let result = http_tool
1058 .execute(&config, &ctx)
1059 .await
1060 .map_err(|e| anyhow::anyhow!("http dispatch failed: {}", e))?;
1061 reshape_http_result(result)
1062 }
1063 Tool::DuckDb { db, query, params } => {
1064 // PR-2c-6: dispatch through noetl_tools::DuckdbTool.
1065 //
1066 // CLI semantics preserved:
1067 // - The CLI's call site already resolved playbook-
1068 // relative paths (`resolve_duckdb_path`) and ran
1069 // `mkdir -p` on the parent directory before invoking
1070 // the bridge, so `db` here is an absolute path
1071 // string ready to hand to DuckDB.
1072 // - SELECT / WITH queries return a JSON array of
1073 // objects (pretty-printed).
1074 // - Non-SELECT queries return the literal envelope
1075 // `{"status": "ok"}` (CLI never exposed
1076 // noetl-tools' `affected_rows`).
1077 // - Empty / missing query short-circuits to an empty
1078 // outcome, matching the CLI arm's
1079 // `if let Some(query_str) = query` guard.
1080 //
1081 // Feature gain: CLI's pre-PR-2c-6 inline impl took a
1082 // `_params: &[String]` and silently ignored it. The
1083 // bridge now binds those params as JSON values at
1084 // `?` placeholders. Playbooks that had a stale
1085 // `params:` list under a query without `?` placeholders
1086 // continue to work (DuckDB ignores extra params); any
1087 // playbook that *intended* the params would now see
1088 // them applied — documented in the PR body.
1089 let query = match query {
1090 Some(q) if !q.trim().is_empty() => q,
1091 _ => return Ok(BridgeOutcome::empty()),
1092 };
1093 let config = duckdb_tool_config(db, query, params);
1094 let duckdb_tool = DuckdbTool::new();
1095 let ctx = to_tools_context(bridge);
1096 let result = duckdb_tool
1097 .execute(&config, &ctx)
1098 .await
1099 .map_err(|e| anyhow::anyhow!("duckdb dispatch failed: {}", e))?;
1100 reshape_duckdb_result(result)
1101 }
1102 Tool::Playbook { .. } => {
1103 // PR-2c-7: encodes the § H.10 architectural finding.
1104 //
1105 // `Tool::Playbook` is the recursion case of the CLI's
1106 // tree walker — it loads a sub-playbook YAML and
1107 // dispatches it through the same `PlaybookRunner` the
1108 // top-level invocation uses. `PlaybookRunner` lives in
1109 // the CLI binary, not in `noetl-executor` or
1110 // `noetl-tools`, so routing this tool through the
1111 // bridge would require either:
1112 // - dragging the tree walker into `noetl-executor`,
1113 // re-opening the § H.10 question that re-scoped
1114 // the crate to a utilities-and-types crate; or
1115 // - adding a callback trait to `noetl-tools` that
1116 // delegates back to the CLI binary, an
1117 // infrastructure layer nothing else in the
1118 // registry uses.
1119 //
1120 // The architecturally honest answer is that this tool
1121 // kind is NOT bridgeable. The CLI's `Tool::Playbook`
1122 // arm stays inline by design. Bailing loudly here
1123 // ensures any future code that tries to dispatch
1124 // `Tool::Playbook` through the bridge gets an
1125 // immediate, descriptive error instead of a silent
1126 // empty outcome.
1127 //
1128 // Sub-playbook variable preparation (the input + args
1129 // merging logic the CLI's call site performs before
1130 // recursing) DOES move into the executor as
1131 // [`prepare_sub_playbook_vars`] — that part is reusable
1132 // and testable independent of the tree walker.
1133 anyhow::bail!(
1134 "Tool::Playbook is not bridgeable: sub-playbook \
1135 execution stays in the CLI's tree walker per \
1136 § H.10 of the Rust migration roadmap. Use \
1137 `PlaybookRunner::new(path).run()` directly from \
1138 the CLI."
1139 );
1140 }
1141 Tool::Auth { .. } => {
1142 // PR-2c-8: `Tool::Auth` does not dispatch through the
1143 // registry. Token resolution lives in
1144 // [`resolve_auth_to_bearer`] (added in PR-2c-5);
1145 // applying the resulting token to the CLI's
1146 // `ExecutionContext` lives in [`auth_context_updates`]
1147 // (added in PR-2c-8). Both are sync helpers the CLI
1148 // calls directly without going through dispatch. The
1149 // arm bails so any future code path that tries to
1150 // route a `Tool::Auth` through the registry gets a
1151 // clear, descriptive error instead of silently
1152 // returning an empty outcome.
1153 anyhow::bail!(
1154 "Tool::Auth is not bridge-dispatched: use \
1155 `resolve_auth_to_bearer` for token resolution and \
1156 `auth_context_updates` for applying the token to \
1157 the caller's execution context. See § H.10 of the \
1158 Rust migration roadmap."
1159 );
1160 }
1161 Tool::Sink { .. } => {
1162 // PR-2c-8: `Tool::Sink` does not dispatch through the
1163 // registry either. noetl-tools' `TransferTool` is
1164 // database-to-database only (snowflake / postgres /
1165 // duckdb / http source → snowflake / postgres /
1166 // duckdb target); it has no file / GCS / object-store
1167 // target. The CLI's three sink targets (File,
1168 // DuckDb, Gcs) each stay inline:
1169 //
1170 // - **File**: `fs::write` is a one-liner; the format
1171 // conversion (json / yaml / csv) DID extract into
1172 // [`format_sink_payload`] so it's reusable and
1173 // testable.
1174 // - **DuckDb**: complex `INSERT INTO ... SELECT FROM
1175 // read_json_auto(...)` with a single-object fallback;
1176 // no `noetl-tools` equivalent. Stays inline by
1177 // design (§ H.10-style finding).
1178 // - **Gcs**: gsutil shellout. A follow-up sub-PR
1179 // (tracked separately) will migrate this to the
1180 // `object_store` crate per § H.4 of Appendix H.
1181 //
1182 // The arm bails so misuse is loud.
1183 anyhow::bail!(
1184 "Tool::Sink is not bridge-dispatched: noetl-tools \
1185 has no file / GCS / object-store target. Use \
1186 `format_sink_payload` for format conversion; the \
1187 CLI's sink targets (file / duckdb / gcs) stay \
1188 inline per § H.10. GCS migration to `object_store` \
1189 is tracked as a separate follow-up."
1190 );
1191 }
1192 Tool::Unsupported => {
1193 anyhow::bail!("unsupported tool kind");
1194 }
1195 }
1196}
1197
1198// ---------------------------------------------------------------------------
1199// Tests
1200// ---------------------------------------------------------------------------
1201
1202#[cfg(test)]
1203mod tests {
1204 use super::*;
1205 use crate::playbook::{AuthConfig as CliAuthConfig, SinkFormat, SinkTarget};
1206
1207 fn empty_vars() -> HashMap<String, String> {
1208 HashMap::new()
1209 }
1210
1211 fn bridge_ctx<'a>(vars: &'a HashMap<String, String>) -> BridgeContext<'a> {
1212 BridgeContext {
1213 execution_id: 12345,
1214 step: "test_step",
1215 variables: vars,
1216 server_url: String::new(),
1217 worker_id: None,
1218 command_id: None,
1219 }
1220 }
1221
 // `to_tools_context` carries over the execution id and step, wraps
 // each string variable as a JSON string value, and leaves secrets empty.
 #[test]
 fn to_tools_context_wraps_string_variables_as_json_value() {
     let vars: HashMap<String, String> =
         [("workload.region".into(), "us-west-1".into())].into();
     let ctx = to_tools_context(&bridge_ctx(&vars));
     assert_eq!(ctx.execution_id, 12345);
     assert_eq!(ctx.step, "test_step");
     assert_eq!(
         ctx.variables.get("workload.region"),
         Some(&serde_json::Value::String("us-west-1".into()))
     );
     assert!(ctx.secrets.is_empty(), "secrets stay empty by default");
 }

 // A single shell command maps to the `shell` kind with bash, captured
 // output, and no timeout.
 #[test]
 fn to_tools_config_shell_single_cmd() {
     let tool = Tool::Shell {
         cmds: CmdsList::Single("ls -la".into()),
     };
     let cfg = to_tools_config(&tool);
     assert_eq!(cfg.kind, "shell");
     assert_eq!(cfg.config["command"], "ls -la");
     assert_eq!(cfg.config["shell"], "bash");
     assert_eq!(cfg.config["capture"], true);
     assert!(cfg.timeout.is_none());
 }

 #[test]
 fn to_tools_config_shell_multiple_cmds_joins_with_newlines() {
     // The to_tools_config helper produces a SINGLE-command shape
     // by joining; the dispatch arm instead loops per command to
     // preserve the CLI's "fresh bash per command" semantics.
     let tool = Tool::Shell {
         cmds: CmdsList::Multiple(vec!["echo one".into(), "echo two".into()]),
     };
     let cfg = to_tools_config(&tool);
     assert_eq!(cfg.kind, "shell");
     assert_eq!(cfg.config["command"], "echo one\necho two");
 }

 // The per-command config used by the dispatch loop has the same
 // shape (bash, captured output) for one command string.
 #[test]
 fn shell_command_config_emits_per_cmd_shape() {
     let cfg = shell_command_config("echo hi");
     assert_eq!(cfg.kind, "shell");
     assert_eq!(cfg.config["command"], "echo hi");
     assert_eq!(cfg.config["shell"], "bash");
     assert_eq!(cfg.config["capture"], true);
 }
1270
 // HTTP config: the method is uppercased, the URL is copied, and a
 // JSON body string becomes a JSON value.
 #[test]
 fn to_tools_config_http_round_trips_essentials() {
     let tool = Tool::Http {
         method: "post".into(), // lowercase to verify uppercasing
         url: "https://example.com/api".into(),
         headers: HashMap::new(),
         params: HashMap::new(),
         body: Some(r#"{"k":"v"}"#.into()),
         auth: None,
     };
     let cfg = to_tools_config(&tool);
     assert_eq!(cfg.kind, "http");
     // noetl-tools' HttpConfig.method deserializes via
     // #[serde(rename_all = "UPPERCASE")] so the bridge always
     // uppercases the CLI's method string.
     assert_eq!(cfg.config["method"], "POST");
     assert_eq!(cfg.config["url"], "https://example.com/api");
     // JSON bodies are parsed into a JSON Value so reqwest
     // serialises them with Content-Type: application/json.
     assert_eq!(cfg.config["body"], serde_json::json!({"k": "v"}));
 }

 // A body that does not parse as JSON is kept as a JSON string.
 #[test]
 fn to_tools_config_http_keeps_non_json_body_as_string() {
     let tool = Tool::Http {
         method: "POST".into(),
         url: "https://example.com".into(),
         headers: HashMap::new(),
         params: HashMap::new(),
         body: Some("not json at all".into()),
         auth: None,
     };
     let cfg = to_tools_config(&tool);
     assert_eq!(cfg.config["body"], "not json at all");
 }

 #[test]
 fn http_body_value_parses_json_strings() {
     let v = http_body_value(r#"{"a":1}"#);
     assert_eq!(v, serde_json::json!({"a": 1}));
 }

 #[test]
 fn http_body_value_falls_back_to_string() {
     let v = http_body_value("plain text body");
     assert_eq!(v, serde_json::Value::String("plain text body".into()));
 }

 // A resolved bearer token becomes an `Authorization: Bearer <token>`
 // header.
 #[test]
 fn http_tool_config_injects_bearer_header() {
     let cfg = http_tool_config(
         "GET",
         "https://example.com",
         &HashMap::new(),
         &HashMap::new(),
         None,
         Some("test-token-123"),
     );
     assert_eq!(cfg.kind, "http");
     assert_eq!(
         cfg.config["headers"]["Authorization"],
         "Bearer test-token-123"
     );
 }

 // Caller-supplied headers survive alongside the injected
 // Authorization header.
 #[test]
 fn http_tool_config_preserves_caller_headers_with_bearer() {
     let mut hdrs = HashMap::new();
     hdrs.insert("X-Trace-Id".into(), "abc123".into());
     let cfg = http_tool_config(
         "POST",
         "https://example.com",
         &hdrs,
         &HashMap::new(),
         None,
         Some("token"),
     );
     assert_eq!(cfg.config["headers"]["X-Trace-Id"], "abc123");
     assert_eq!(cfg.config["headers"]["Authorization"], "Bearer token");
 }

 // Without a token, `headers` is still an object but has no
 // Authorization key.
 #[test]
 fn http_tool_config_no_auth_omits_authorization_header() {
     let cfg = http_tool_config(
         "GET",
         "https://example.com",
         &HashMap::new(),
         &HashMap::new(),
         None,
         None,
     );
     let hdrs = cfg.config["headers"].as_object().unwrap();
     assert!(!hdrs.contains_key("Authorization"));
 }
1365
1366 #[test]
1367 fn reshape_http_result_extracts_envelope() {
1368 let mut result = ToolResult::success(serde_json::json!({
1369 "status_code": 200,
1370 "headers": {},
1371 "body": {"ok": true},
1372 }));
1373 result.exit_code = Some(0);
1374 let outcome = reshape_http_result(result).unwrap();
1375 let parsed: serde_json::Value =
1376 serde_json::from_str(outcome.result.as_deref().unwrap()).unwrap();
1377 assert_eq!(parsed["status"], 200);
1378 assert_eq!(parsed["body"], serde_json::json!({"ok": true}));
1379 }
1380
1381 #[test]
1382 fn reshape_http_result_preserves_4xx_envelope_without_erroring() {
1383 // CLI contract: HTTP error statuses come back inside the
1384 // `{status, body}` envelope, NOT as anyhow::Error. Only
1385 // network-transport failures bubble up.
1386 let mut result = ToolResult {
1387 status: ToolStatus::Error,
1388 data: Some(serde_json::json!({
1389 "status_code": 404,
1390 "headers": {},
1391 "body": {"error": "not found"},
1392 })),
1393 error: Some("HTTP 404 response".into()),
1394 stdout: None,
1395 stderr: None,
1396 exit_code: Some(1),
1397 duration_ms: Some(5),
1398 };
1399 result.exit_code = Some(1);
1400 let outcome = reshape_http_result(result).unwrap();
1401 let parsed: serde_json::Value =
1402 serde_json::from_str(outcome.result.as_deref().unwrap()).unwrap();
1403 assert_eq!(parsed["status"], 404);
1404 assert_eq!(parsed["body"], serde_json::json!({"error": "not found"}));
1405 }
1406
1407 #[tokio::test]
1408 async fn resolve_auth_to_bearer_rejects_unknown_provider() {
1409 let cfg = CliAuthConfig {
1410 provider: "azure".into(),
1411 scopes: vec![],
1412 };
1413 let err = resolve_auth_to_bearer(&cfg).await.unwrap_err();
1414 assert!(err.to_string().contains("unsupported auth provider"));
1415 }
1416
1417 // ---- PR-2c-6 — Tool::DuckDb bridge integration -------------------
1418
 // `duckdb_tool_config` emits noetl-tools' schema: `db_path`, `query`,
 // `as_objects: true`, and params as JSON strings.
 #[test]
 fn duckdb_tool_config_emits_noetl_tools_schema() {
     let cfg = duckdb_tool_config(
         ":memory:",
         "SELECT 1",
         &["arg1".to_string()],
     );
     assert_eq!(cfg.kind, "duckdb");
     assert_eq!(cfg.config["db_path"], ":memory:");
     assert_eq!(cfg.config["query"], "SELECT 1");
     assert_eq!(cfg.config["as_objects"], true);
     assert_eq!(
         cfg.config["params"],
         serde_json::json!([serde_json::Value::String("arg1".into())])
     );
 }

 #[test]
 fn to_tools_config_duckdb_carries_path_and_query() {
     let tool = Tool::DuckDb {
         db: "warehouse.db".into(),
         query: Some("SELECT count(*) FROM orders".into()),
         params: vec![],
     };
     let cfg = to_tools_config(&tool);
     assert_eq!(cfg.kind, "duckdb");
     assert_eq!(cfg.config["db_path"], "warehouse.db");
     assert_eq!(cfg.config["query"], "SELECT count(*) FROM orders");
     assert_eq!(cfg.config["as_objects"], true);
 }

 // `to_tools_config` maps a missing query to `""`; the dispatch arm
 // short-circuits before ever using it.
 #[test]
 fn to_tools_config_duckdb_missing_query_becomes_empty_string() {
     let tool = Tool::DuckDb {
         db: ":memory:".into(),
         query: None,
         params: vec![],
     };
     let cfg = to_tools_config(&tool);
     assert_eq!(cfg.config["query"], "");
 }

 // A SELECT-shaped result (`columns` / `rows` / `row_count`) is
 // reshaped into just the `rows` array.
 #[test]
 fn reshape_duckdb_result_select_returns_rows_array() {
     let result = ToolResult::success(serde_json::json!({
         "columns": ["id", "name"],
         "rows": [
             {"id": 1, "name": "alice"},
             {"id": 2, "name": "bob"},
         ],
         "row_count": 2
     }));
     let outcome = reshape_duckdb_result(result).unwrap();
     let parsed: serde_json::Value =
         serde_json::from_str(outcome.result.as_deref().unwrap()).unwrap();
     let arr = parsed.as_array().expect("result is an array");
     assert_eq!(arr.len(), 2);
     assert_eq!(arr[0]["id"], 1);
     assert_eq!(arr[0]["name"], "alice");
     assert_eq!(arr[1]["name"], "bob");
 }

 #[test]
 fn reshape_duckdb_result_select_empty_returns_empty_array() {
     let result = ToolResult::success(serde_json::json!({
         "columns": ["id"],
         "rows": [],
         "row_count": 0
     }));
     let outcome = reshape_duckdb_result(result).unwrap();
     let parsed: serde_json::Value =
         serde_json::from_str(outcome.result.as_deref().unwrap()).unwrap();
     assert_eq!(parsed.as_array().unwrap().len(), 0);
 }

 #[test]
 fn reshape_duckdb_result_non_select_returns_status_envelope() {
     let result = ToolResult::success(serde_json::json!({
         "affected_rows": 3
     }));
     let outcome = reshape_duckdb_result(result).unwrap();
     // CLI returned the literal `{"status": "ok"}` string for
     // non-SELECT queries; `affected_rows` is intentionally
     // dropped (CLI never exposed it, so playbooks can't depend
     // on it).
     assert_eq!(outcome.result.as_deref(), Some(r#"{"status": "ok"}"#));
 }

 // End-to-end: dispatch an in-memory DuckDB SELECT through the bridge
 // and get the rows back as a JSON array of objects.
 #[tokio::test]
 async fn dispatch_duckdb_select_returns_rows_array() {
     let vars = empty_vars();
     let bridge = bridge_ctx(&vars);
     let tool = Tool::DuckDb {
         db: ":memory:".into(),
         query: Some("SELECT 1 AS num, 'hello' AS msg".into()),
         params: vec![],
     };
     let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
     let parsed: serde_json::Value =
         serde_json::from_str(outcome.result.as_deref().unwrap()).unwrap();
     let arr = parsed.as_array().expect("result is an array");
     assert_eq!(arr.len(), 1);
     assert_eq!(arr[0]["num"], 1);
     assert_eq!(arr[0]["msg"], "hello");
 }

 #[tokio::test]
 async fn dispatch_duckdb_missing_query_returns_empty_outcome() {
     // Mirrors the CLI arm's `if let Some(query_str) = query` guard:
     // a Tool::DuckDb with no query falls through to None.
     let vars = empty_vars();
     let bridge = bridge_ctx(&vars);
     let tool = Tool::DuckDb {
         db: ":memory:".into(),
         query: None,
         params: vec![],
     };
     let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
     assert!(outcome.result.is_none());
 }

 // Whitespace-only queries are treated the same as a missing query.
 #[tokio::test]
 async fn dispatch_duckdb_empty_query_returns_empty_outcome() {
     let vars = empty_vars();
     let bridge = bridge_ctx(&vars);
     let tool = Tool::DuckDb {
         db: ":memory:".into(),
         query: Some("   ".into()), // whitespace only
         params: vec![],
     };
     let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
     assert!(outcome.result.is_none());
 }
1552
1553 // ---- PR-2c-7 — sub-playbook variable preparation ------------------
1554
 // Parent variables are copied into the sub-playbook's variables.
 #[test]
 fn prepare_sub_playbook_vars_passes_parent_vars_through() {
     let parent: HashMap<String, String> =
         [("vars.timeout".into(), "30".into())].into();
     let sub = prepare_sub_playbook_vars(
         &parent,
         &HashMap::new(),
         &HashMap::new(),
         |t| Ok(t.to_string()),
     )
     .unwrap();
     assert_eq!(sub.get("vars.timeout"), Some(&"30".to_string()));
 }

 // When both are provided, the v2 `input` map wins over v1 `args`;
 // the winning key lands under the `workload.` prefix.
 #[test]
 fn prepare_sub_playbook_vars_v2_input_takes_precedence_over_v1_args() {
     let parent: HashMap<String, String> = HashMap::new();
     let mut input = HashMap::new();
     input.insert(
         "region".into(),
         serde_yaml::Value::String("us-east-1".into()),
     );
     let mut args = HashMap::new();
     args.insert("region".into(), "us-west-1".into());

     let sub = prepare_sub_playbook_vars(&parent, &args, &input, |t| {
         Ok(t.to_string())
     })
     .unwrap();
     // input wins; args ignored when input is non-empty.
     assert_eq!(sub.get("workload.region"), Some(&"us-east-1".to_string()));
 }

 #[test]
 fn prepare_sub_playbook_vars_v1_args_used_when_input_empty() {
     let parent: HashMap<String, String> = HashMap::new();
     let mut args = HashMap::new();
     args.insert("tier".into(), "prod".into());
     let sub = prepare_sub_playbook_vars(
         &parent,
         &args,
         &HashMap::new(),
         |t| Ok(t.to_string()),
     )
     .unwrap();
     assert_eq!(sub.get("workload.tier"), Some(&"prod".to_string()));
 }

 // String `input` values go through the caller-supplied render closure.
 #[test]
 fn prepare_sub_playbook_vars_renders_input_templates() {
     let parent: HashMap<String, String> = HashMap::new();
     let mut input = HashMap::new();
     input.insert(
         "url".into(),
         serde_yaml::Value::String("{{base}}/api".into()),
     );
     let sub = prepare_sub_playbook_vars(
         &parent,
         &HashMap::new(),
         &input,
         |t| Ok(t.replace("{{base}}", "https://example.com")),
     )
     .unwrap();
     assert_eq!(
         sub.get("workload.url"),
         Some(&"https://example.com/api".to_string())
     );
 }

 // Non-string YAML scalars are stringified ("30", "true").
 #[test]
 fn prepare_sub_playbook_vars_coerces_yaml_numbers_and_bools() {
     let parent: HashMap<String, String> = HashMap::new();
     let mut input = HashMap::new();
     input.insert(
         "timeout".into(),
         serde_yaml::Value::Number(serde_yaml::Number::from(30)),
     );
     input.insert("verbose".into(), serde_yaml::Value::Bool(true));
     let sub = prepare_sub_playbook_vars(
         &parent,
         &HashMap::new(),
         &input,
         |t| Ok(t.to_string()),
     )
     .unwrap();
     assert_eq!(sub.get("workload.timeout"), Some(&"30".to_string()));
     assert_eq!(sub.get("workload.verbose"), Some(&"true".to_string()));
 }

 #[test]
 fn prepare_sub_playbook_vars_passes_through_when_both_empty() {
     let parent: HashMap<String, String> = [(
         "workload.region".into(),
         "us-east-1".into(),
     )]
     .into();
     let sub = prepare_sub_playbook_vars(
         &parent,
         &HashMap::new(),
         &HashMap::new(),
         |t| Ok(t.to_string()),
     )
     .unwrap();
     // No input or args; parent vars come through unchanged.
     assert_eq!(sub.len(), 1);
     assert_eq!(
         sub.get("workload.region"),
         Some(&"us-east-1".to_string())
     );
 }

 // A render-closure error is propagated to the caller, not swallowed.
 #[test]
 fn prepare_sub_playbook_vars_render_error_propagates() {
     let parent: HashMap<String, String> = HashMap::new();
     let mut input = HashMap::new();
     input.insert(
         "bad".into(),
         serde_yaml::Value::String("{{nope}}".into()),
     );
     let result = prepare_sub_playbook_vars(
         &parent,
         &HashMap::new(),
         &input,
         |_| Err(anyhow::anyhow!("render exploded")),
     );
     assert!(result.unwrap_err().to_string().contains("render exploded"));
 }
1682
1683 // ---- PR-2c-8 — Tool::Auth context updates -------------------------
1684
1685 #[test]
1686 fn auth_context_updates_includes_token_and_provider() {
1687 let updates = auth_context_updates("gcp", "tok-123", None);
1688 let map: HashMap<String, String> = updates.into_iter().collect();
1689 assert_eq!(map.get("auth.token"), Some(&"tok-123".to_string()));
1690 assert_eq!(map.get("auth.provider"), Some(&"gcp".to_string()));
1691 assert!(map.get("auth.project").is_none());
1692 }
1693
1694 #[test]
1695 fn auth_context_updates_includes_project_when_set() {
1696 let updates = auth_context_updates("adc", "t", Some("my-project"));
1697 let map: HashMap<String, String> = updates.into_iter().collect();
1698 assert_eq!(
1699 map.get("auth.project"),
1700 Some(&"my-project".to_string())
1701 );
1702 assert_eq!(map.get("auth.token"), Some(&"t".to_string()));
1703 assert_eq!(map.get("auth.provider"), Some(&"adc".to_string()));
1704 }
1705
1706 #[test]
1707 fn auth_context_updates_skips_empty_project() {
1708 let updates = auth_context_updates("gcp", "t", Some(""));
1709 let map: HashMap<String, String> = updates.into_iter().collect();
1710 assert!(map.get("auth.project").is_none());
1711 }
1712
1713 #[test]
1714 fn auth_context_updates_orders_project_before_token() {
1715 // The CLI's pre-PR-2c-8 inline arm set `auth.project` first,
1716 // then the token + provider after the auth call. Preserve
1717 // that ordering so observable side-effects (logs, traces)
1718 // match.
1719 let updates = auth_context_updates("gcp", "t", Some("p"));
1720 assert_eq!(updates[0].0, "auth.project");
1721 assert_eq!(updates[1].0, "auth.token");
1722 assert_eq!(updates[2].0, "auth.provider");
1723 }
1724
1725 // ---- PR-2c-8 — Sink payload formatting + CSV ----------------------
1726
 // JSON format: the raw payload is returned unchanged.
 #[test]
 fn format_sink_payload_json_passthrough() {
     let raw = r#"{"k": "v"}"#;
     let out = format_sink_payload(&SinkFormat::Json, raw).unwrap();
     assert_eq!(out, raw);
 }

 // YAML format: a JSON payload is converted to YAML that re-parses to
 // the same data.
 #[test]
 fn format_sink_payload_yaml_converts_json_object() {
     let raw = r#"{"k": "v"}"#;
     let out = format_sink_payload(&SinkFormat::Yaml, raw).unwrap();
     let reparsed: serde_yaml::Value = serde_yaml::from_str(&out).unwrap();
     assert_eq!(reparsed["k"].as_str(), Some("v"));
 }

 // YAML format: a non-JSON payload is returned verbatim, not an error.
 #[test]
 fn format_sink_payload_yaml_falls_back_when_not_json() {
     let raw = "not even close to json";
     let out = format_sink_payload(&SinkFormat::Yaml, raw).unwrap();
     assert_eq!(out, raw);
 }

 // CSV format: an array of objects becomes a header line plus one
 // line per object. Column order depends on serde_json's map ordering,
 // so either order is accepted.
 #[test]
 fn format_sink_payload_csv_uses_json_to_csv() {
     let raw = r#"[{"a":1,"b":2},{"a":3,"b":4}]"#;
     let out = format_sink_payload(&SinkFormat::Csv, raw).unwrap();
     assert!(out.contains("a,b\n") || out.contains("b,a\n"));
     // Two data rows + header.
     assert_eq!(out.lines().count(), 3);
 }
1757
#[test]
fn json_to_csv_returns_input_for_non_array() {
    // Neither non-JSON text nor a JSON object is an array of rows,
    // so both are echoed back unchanged.
    for input in ["not json", r#"{"k":"v"}"#] {
        assert_eq!(json_to_csv(input).unwrap(), input);
    }
}
1763
#[test]
fn json_to_csv_returns_input_for_empty_array() {
    // An empty array produces no CSV; the original text is echoed back.
    let empty = "[]";
    assert_eq!(json_to_csv(empty).unwrap(), empty);
}
1768
#[test]
fn json_to_csv_emits_header_and_rows_for_object_array() {
    let raw = r#"[{"name":"alice","age":30},{"name":"bob","age":25}]"#;
    let csv = json_to_csv(raw).unwrap();
    let lines: Vec<&str> = csv.lines().collect();
    // Header + two data rows; checked before indexing so a short
    // output fails with the CSV text rather than an index panic.
    assert_eq!(lines.len(), 3, "csv: {csv}");
    // Header derived from the first object's keys. Column order depends
    // on serde_json's map backend: keys are sorted by default (BTreeMap)
    // and kept in insertion order only with the `preserve_order`
    // feature, so accept either ordering.
    assert!(lines[0] == "name,age" || lines[0] == "age,name", "csv: {csv}");
    // Each subsequent line should contain both values.
    assert!(lines[1].contains("alice") && lines[1].contains("30"), "csv: {csv}");
    assert!(lines[2].contains("bob") && lines[2].contains("25"), "csv: {csv}");
}
1782
#[test]
fn json_to_csv_quotes_strings_with_commas() {
    // A comma inside a value must not split the field: the whole
    // value is emitted wrapped in double quotes.
    let csv = json_to_csv(r#"[{"label":"a, b","n":1}]"#).unwrap();
    let quoted = "\"a, b\"";
    assert!(csv.contains(quoted), "csv: {csv}");
}
1790
#[test]
fn json_to_csv_doubles_embedded_quotes() {
    // RFC 4180 style: a field containing `"` is wrapped in quotes
    // and every embedded `"` is doubled.
    let csv = json_to_csv(r#"[{"q":"she said \"hi\""}]"#).unwrap();
    let expected_field = r#""she said ""hi""""#;
    assert!(csv.contains(expected_field), "csv: {csv}");
}
1798
#[test]
fn json_to_csv_missing_field_emits_empty() {
    let raw = r#"[{"a":1,"b":2},{"a":3}]"#; // second row missing `b`
    let csv = json_to_csv(raw).unwrap();
    let lines: Vec<&str> = csv.lines().collect();
    // Header + two data rows; checked first so a short output fails
    // with the CSV text instead of an index-out-of-bounds panic.
    assert_eq!(lines.len(), 3, "csv: {csv}");
    // The second data row should end with a trailing comma or
    // have an empty field for `b`.
    assert!(
        lines[2].ends_with(',') || lines[2].contains(",,"),
        "csv: {csv}"
    );
}
1811
#[test]
fn to_tools_config_rhai_carries_code() {
    let script = "let x = 1; x + 1";
    let cfg = to_tools_config(&Tool::Rhai {
        code: script.into(),
        args: HashMap::new(),
    });
    // The registry config carries the `rhai` kind with the script under `code`.
    assert_eq!(cfg.kind, "rhai");
    assert_eq!(cfg.config["code"], script);
}
1822
#[test]
fn to_tools_config_sink_emits_typed_target() {
    let sink = Tool::Sink {
        format: SinkFormat::Json,
        target: SinkTarget::File {
            path: "/tmp/out.json".into(),
        },
    };
    let cfg = to_tools_config(&sink);
    assert_eq!(cfg.kind, "sink");
    // The target is emitted as an object whose `type` field names the variant.
    let target = &cfg.config["target"];
    assert_eq!(target["type"], "file");
    assert_eq!(target["path"], "/tmp/out.json");
    assert_eq!(cfg.config["format"], "json");
}
1837
#[test]
fn from_tools_result_success_returns_data_string() {
    // A string payload comes back as its raw text, not JSON-encoded.
    let payload = serde_json::Value::String("hello".into());
    let outcome = from_tools_result(ToolResult::success(payload)).unwrap();
    assert_eq!(outcome.result, Some("hello".into()));
}
1844
#[test]
fn from_tools_result_success_serialises_non_string_data() {
    // Non-string payloads are rendered as compact JSON text.
    let data = serde_json::json!({"k": "v"});
    let outcome = from_tools_result(ToolResult::success(data)).unwrap();
    assert_eq!(outcome.result, Some(r#"{"k":"v"}"#.into()));
}
1851
#[test]
fn from_tools_result_success_falls_back_to_stdout() {
    // With no `data` payload, the captured stdout becomes the result.
    let mut tool_result = ToolResult::success(serde_json::Value::Null);
    tool_result.stdout = Some("script output".into());
    tool_result.data = None;
    let outcome = from_tools_result(tool_result).unwrap();
    assert_eq!(outcome.result, Some("script output".into()));
}
1860
#[test]
fn from_tools_result_error_propagates_message() {
    // An error ToolResult turns into an Err whose text carries the message.
    let err_text = from_tools_result(ToolResult::error("connection refused"))
        .unwrap_err()
        .to_string();
    assert!(err_text.contains("connection refused"));
}
1867
1868 // PR-2c-8 removed the
1869 // `dispatch_via_registry_returns_empty_for_unwired_kind` test:
1870 // every Tool variant now either dispatches through the registry
1871 // (Rhai/Shell/Http/DuckDb), bails with a § H.10 finding
1872 // (Playbook/Auth/Sink), or bails as unsupported. See the
1873 // per-variant dispatch tests for the wired kinds and the bail
1874 // tests for Playbook/Auth/Sink/Unsupported.
1875
1876 #[tokio::test]
1877 async fn dispatch_auth_bails_pointing_at_helper() {
1878 // PR-2c-8: Tool::Auth has no bridge dispatch path. The
1879 // bridge bails with a message pointing at
1880 // `resolve_auth_to_bearer` + `auth_context_updates` so
1881 // misuse is loud rather than silent.
1882 let vars = empty_vars();
1883 let bridge = bridge_ctx(&vars);
1884 let tool = Tool::Auth {
1885 provider: "adc".into(),
1886 scopes: vec![],
1887 project: None,
1888 };
1889 let err = dispatch_via_registry(&tool, &bridge).await.unwrap_err();
1890 let msg = err.to_string();
1891 assert!(
1892 msg.contains("Tool::Auth")
1893 && msg.contains("resolve_auth_to_bearer")
1894 && msg.contains("auth_context_updates"),
1895 "error should point at the helpers: {msg}"
1896 );
1897 }
1898
1899 #[tokio::test]
1900 async fn dispatch_sink_bails_pointing_at_helper() {
1901 // PR-2c-8: Tool::Sink has no bridge dispatch path either —
1902 // noetl-tools' TransferTool is database-to-database only.
1903 // The bridge bails with a message pointing at
1904 // `format_sink_payload` for format conversion.
1905 let vars = empty_vars();
1906 let bridge = bridge_ctx(&vars);
1907 let tool = Tool::Sink {
1908 target: crate::playbook::SinkTarget::File {
1909 path: "/tmp/out.json".into(),
1910 },
1911 format: SinkFormat::Json,
1912 };
1913 let err = dispatch_via_registry(&tool, &bridge).await.unwrap_err();
1914 let msg = err.to_string();
1915 assert!(
1916 msg.contains("Tool::Sink") && msg.contains("format_sink_payload"),
1917 "error should point at the helper: {msg}"
1918 );
1919 }
1920
1921 #[tokio::test]
1922 async fn dispatch_playbook_bails_with_h10_finding() {
1923 // PR-2c-7: `Tool::Playbook` is not bridgeable. Make sure
1924 // the dispatch arm bails with a descriptive error rather
1925 // than silently returning an empty outcome.
1926 let vars = empty_vars();
1927 let bridge = bridge_ctx(&vars);
1928 let tool = Tool::Playbook {
1929 path: "sub.yaml".into(),
1930 args: HashMap::new(),
1931 input: HashMap::new(),
1932 };
1933 let err = dispatch_via_registry(&tool, &bridge).await.unwrap_err();
1934 let msg = err.to_string();
1935 assert!(
1936 msg.contains("Tool::Playbook")
1937 && msg.contains("not bridgeable")
1938 && msg.contains("§ H.10"),
1939 "error message should explain the § H.10 finding: {msg}"
1940 );
1941 }
1942
1943 // ---- PR-2c-4 — Tool::Shell bridge integration --------------------
1944
1945 #[tokio::test]
1946 async fn dispatch_shell_single_command_returns_stdout() {
1947 let vars = empty_vars();
1948 let bridge = bridge_ctx(&vars);
1949 let tool = Tool::Shell {
1950 cmds: CmdsList::Single("echo bridged".into()),
1951 };
1952 let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
1953 // The bridge trims the trailing newline that `echo` adds so
1954 // the step result matches the CLI's pre-PR-2c-4 contract
1955 // (per-line stdout joined without trailing whitespace).
1956 assert_eq!(outcome.result, Some("bridged".into()));
1957 }
1958
1959 #[tokio::test]
1960 async fn dispatch_shell_multiple_returns_last_command_stdout() {
1961 // CLI semantic: with CmdsList::Multiple, each command runs
1962 // in its own bash invocation; the step result is the last
1963 // command's stdout.
1964 let vars = empty_vars();
1965 let bridge = bridge_ctx(&vars);
1966 let tool = Tool::Shell {
1967 cmds: CmdsList::Multiple(vec![
1968 "echo first".into(),
1969 "echo second".into(),
1970 "echo third".into(),
1971 ]),
1972 };
1973 let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
1974 assert_eq!(outcome.result, Some("third".into()));
1975 }
1976
1977 #[tokio::test]
1978 async fn dispatch_shell_failure_propagates_error() {
1979 let vars = empty_vars();
1980 let bridge = bridge_ctx(&vars);
1981 let tool = Tool::Shell {
1982 cmds: CmdsList::Single("exit 7".into()),
1983 };
1984 let err = dispatch_via_registry(&tool, &bridge).await.unwrap_err();
1985 // noetl-tools' shell tool reports non-zero exit codes by
1986 // surfacing ToolResult.status == Error or by returning
1987 // result with exit_code set; either way the bridge's
1988 // from_tools_result converts that into an anyhow::Error.
1989 assert!(
1990 err.to_string().contains("shell")
1991 || err.to_string().contains("exit")
1992 || err.to_string().contains("failed"),
1993 "error message: {}",
1994 err
1995 );
1996 }
1997
1998 #[tokio::test]
1999 async fn dispatch_shell_single_with_newlines_runs_each_line_independently() {
2000 // CLI semantic: CmdsList::Single splits on newlines into
2001 // separate bash invocations. This means `cd /tmp` on one
2002 // line doesn't change the cwd of the next line.
2003 let vars = empty_vars();
2004 let bridge = bridge_ctx(&vars);
2005 let tool = Tool::Shell {
2006 cmds: CmdsList::Single("echo first_line\necho second_line".into()),
2007 };
2008 let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
2009 assert_eq!(outcome.result, Some("second_line".into()));
2010 }
2011
2012 #[tokio::test]
2013 async fn dispatch_via_registry_unsupported_errors() {
2014 let vars = empty_vars();
2015 let bridge = bridge_ctx(&vars);
2016 let tool = Tool::Unsupported;
2017 let err = dispatch_via_registry(&tool, &bridge).await.unwrap_err();
2018 assert!(err.to_string().contains("unsupported"));
2019 }
2020
2021 // ---- PR-2c-3 — Tool::Rhai bridge integration ---------------------
2022
2023 #[tokio::test]
2024 async fn dispatch_rhai_evaluates_simple_arithmetic() {
2025 let vars = empty_vars();
2026 let bridge = bridge_ctx(&vars);
2027 let tool = Tool::Rhai {
2028 code: "let x = 40; let y = 2; (x + y).to_string()".into(),
2029 args: HashMap::new(),
2030 };
2031 let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
2032 assert_eq!(outcome.result, Some("42".into()));
2033 }
2034
2035 #[tokio::test]
2036 async fn dispatch_rhai_reads_workload_variable_via_scope() {
2037 // `to_tools_context_for_rhai` groups the CLI's flat
2038 // `workload.region` key into a nested `workload` Map.
2039 // Rhai's `workload.region` then resolves as field access.
2040 let vars: HashMap<String, String> =
2041 [("workload.region".into(), "us-west-1".into())].into();
2042 let bridge = bridge_ctx(&vars);
2043 let tool = Tool::Rhai {
2044 code: r#"workload.region.to_string()"#.into(),
2045 args: HashMap::new(),
2046 };
2047 let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
2048 assert_eq!(outcome.result, Some("us-west-1".into()));
2049 }
2050
2051 #[tokio::test]
2052 async fn dispatch_rhai_reads_step_result_via_field_access() {
2053 // Step results in the CLI surface as `<step>.result` keys.
2054 // The nested-shape adapter groups them under a step-named map.
2055 let vars: HashMap<String, String> = [
2056 ("check_health.result".into(), "ok".into()),
2057 ("check_health.status".into(), "200".into()),
2058 ]
2059 .into();
2060 let bridge = bridge_ctx(&vars);
2061 let tool = Tool::Rhai {
2062 code: r#"check_health.result.to_string()"#.into(),
2063 args: HashMap::new(),
2064 };
2065 let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
2066 assert_eq!(outcome.result, Some("ok".into()));
2067 }
2068
#[test]
fn to_tools_context_for_rhai_groups_workload_prefix() {
    // Dotted keys are grouped by their prefix into nested maps; a key
    // without a dot stays at the root of the variable scope.
    let vars: HashMap<String, String> = [
        ("workload.region", "us-west-1"),
        ("workload.tier", "prod"),
        ("vars.timeout", "30"),
        ("step_a.result", "done"),
        ("toplevel", "kept_at_root"),
    ]
    .into_iter()
    .map(|(key, value)| (key.to_string(), value.to_string()))
    .collect();
    let bridged = bridge_ctx(&vars);
    let ctx = to_tools_context_for_rhai(&bridged);

    let workload = ctx
        .variables
        .get("workload")
        .expect("workload group should exist")
        .as_object()
        .expect("workload should be an object");
    assert_eq!(workload.get("region"), Some(&serde_json::json!("us-west-1")));
    assert_eq!(workload.get("tier"), Some(&serde_json::json!("prod")));

    let vars_group = ctx
        .variables
        .get("vars")
        .and_then(serde_json::Value::as_object)
        .unwrap();
    assert_eq!(vars_group.get("timeout"), Some(&serde_json::json!("30")));

    let step_group = ctx
        .variables
        .get("step_a")
        .and_then(serde_json::Value::as_object)
        .unwrap();
    assert_eq!(step_group.get("result"), Some(&serde_json::json!("done")));

    let root_value = serde_json::json!("kept_at_root");
    assert_eq!(ctx.variables.get("toplevel"), Some(&root_value));
}
2102
2103 #[tokio::test]
2104 async fn dispatch_rhai_string_literal_returns_unquoted() {
2105 let vars = empty_vars();
2106 let bridge = bridge_ctx(&vars);
2107 let tool = Tool::Rhai {
2108 code: r#""hello world""#.into(),
2109 args: HashMap::new(),
2110 };
2111 let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
2112 // noetl-tools' RhaiTool returns the result through ToolResult.data
2113 // as a JSON value; for string results that means a JSON-quoted
2114 // string. from_tools_result strips the JSON quotes when data
2115 // is a Value::String.
2116 assert_eq!(outcome.result, Some("hello world".into()));
2117 }
2118
// ---- Compiler proof: the CLI's AuthConfig stays constructible even
// though it is not yet passed through to the bridge. This pins its
// field surface so PR-2c-5 / PR-2c-8 see a deliberate gap, not a
// missing type.
#[test]
fn cli_auth_config_constructs() {
    let scopes = vec!["https://www.googleapis.com/auth/cloud-platform".into()];
    let _auth = CliAuthConfig {
        provider: "adc".into(),
        scopes,
    };
}
2130
2131 // ---- gcs_upload helper (R-3, noetl/ai-meta#31) ------------------
2132 //
2133 // These tests exercise `gcs_upload_with_store` — the inner path
2134 // shared by production (real GCS) and test (InMemory) callers.
2135 // The `gcs_upload` function (which builds the real GCS store from
2136 // env) is NOT tested here — real GCS credentials are not available
2137 // in CI. The call shape (bucket → builder → store → put) is the
2138 // same in both paths; the InMemory tests lock in the object_store
2139 // API surface and the helper's error-handling contract.
2140
2141 #[tokio::test]
2142 async fn gcs_upload_with_store_writes_data_to_object_store() {
2143 // Verifies the happy path: data is uploaded and can be read
2144 // back from the same InMemory store — proving gcs_upload_with_store
2145 // calls ObjectStore::put with the correct path + payload.
2146 use object_store::memory::InMemory;
2147 use object_store::ObjectStore;
2148
2149 let store = Arc::new(InMemory::new());
2150 gcs_upload_with_store(Arc::clone(&store) as Arc<dyn ObjectStore>, "output/data.json", r#"{"k":"v"}"#)
2151 .await
2152 .expect("upload should succeed");
2153
2154 let path = StorePath::from("output/data.json");
2155 let retrieved = store.get(&path).await.expect("should read back uploaded object");
2156 let body = retrieved.bytes().await.expect("should get bytes");
2157 assert_eq!(body, bytes::Bytes::from(r#"{"k":"v"}"#));
2158 }
2159
2160 #[tokio::test]
2161 async fn gcs_upload_with_store_overwrites_existing_key() {
2162 // Second upload to the same key must overwrite the first — the
2163 // InMemory store's put is idempotent on the key, which is the
2164 // same contract the real GCS object-level PUT provides.
2165 use object_store::memory::InMemory;
2166 use object_store::ObjectStore;
2167
2168 let store = Arc::new(InMemory::new());
2169 gcs_upload_with_store(Arc::clone(&store) as Arc<dyn ObjectStore>, "data.csv", "first").await.unwrap();
2170 gcs_upload_with_store(Arc::clone(&store) as Arc<dyn ObjectStore>, "data.csv", "second").await.unwrap();
2171
2172 let path = StorePath::from("data.csv");
2173 let body = store.get(&path).await.unwrap().bytes().await.unwrap();
2174 assert_eq!(body, bytes::Bytes::from("second"));
2175 }
2176
2177 #[tokio::test]
2178 async fn gcs_upload_with_store_handles_nested_key_paths() {
2179 // GCS object keys can contain slashes (they are logical paths
2180 // within a bucket, not filesystem paths). StorePath should
2181 // preserve the full slash-separated key.
2182 use object_store::memory::InMemory;
2183 use object_store::ObjectStore;
2184
2185 let store = Arc::new(InMemory::new());
2186 gcs_upload_with_store(
2187 Arc::clone(&store) as Arc<dyn ObjectStore>,
2188 "runs/2026-06-01/output/result.json",
2189 "[]",
2190 )
2191 .await
2192 .unwrap();
2193
2194 let path = StorePath::from("runs/2026-06-01/output/result.json");
2195 let body = store.get(&path).await.unwrap().bytes().await.unwrap();
2196 assert_eq!(body, bytes::Bytes::from("[]"));
2197 }
2198
2199 #[tokio::test]
2200 async fn gcs_upload_with_store_uploads_empty_string() {
2201 // An empty payload is a valid GCS object — the helper must not
2202 // short-circuit or error on empty data.
2203 use object_store::memory::InMemory;
2204 use object_store::ObjectStore;
2205
2206 let store = Arc::new(InMemory::new());
2207 gcs_upload_with_store(Arc::clone(&store) as Arc<dyn ObjectStore>, "empty.txt", "").await.unwrap();
2208
2209 let path = StorePath::from("empty.txt");
2210 let body = store.get(&path).await.unwrap().bytes().await.unwrap();
2211 assert_eq!(body.len(), 0);
2212 }
2213}