Skip to main content

noetl_executor/
tools_bridge.rs

1//! Bridge from the CLI's YAML-parsed [`crate::playbook::Tool`] enum
2//! onto the [`noetl_tools`] registry's dispatch API.
3//!
4//! Added in R-1.1 PR-2c-1 per § H.10.4 of Appendix H of the global
5//! hybrid cloud blueprint; fleshed out with adapter helpers in
6//! R-1.1 PR-2c-2.  This module is the integration surface between
7//! the CLI's parsed playbook and the shared tool registry the
8//! worker (R-1.3) also uses.
9//!
10//! ## Strategy B rollout
11//!
12//! Replacement of the CLI's inline tool implementations happens
13//! incrementally — one tool kind per sub-PR (PR-2c-3 rhai, PR-2c-4
14//! shell, PR-2c-5 http, PR-2c-6 duckdb, PR-2c-7 playbook, PR-2c-8
15//! auth + sink).  This module ships the adapter layer in PR-2c-2;
16//! each subsequent sub-PR fills in one [`dispatch_via_registry`]
17//! match arm and replaces the matching CLI call site in
18//! `repos/cli/src/playbook_runner.rs`.
19//!
20//! ## Why a bridge instead of converting the Tool enum directly
21//!
22//! The CLI's [`crate::playbook::Tool`] enum and the registry's
23//! [`noetl_tools::registry::ToolConfig`] carry different invariants:
24//!
25//! - The CLI's `Tool::Auth { provider, scopes, project }` resolves
26//!   credentials inline during dispatch.  The worker resolves them at
27//!   credential-resolution time (before tool dispatch).  The bridge
28//!   needs to know which mode to use; it's not a trivial enum cast.
29//! - The CLI's `Tool::Sink { target, format }` writes outputs through
30//!   the runner's filesystem helpers.  The registry would dispatch
31//!   sinks through the same `noetl-tools` registry, but the tool kind
32//!   doesn't exist on the worker side yet (PR-2c-8 may add it).
33//! - The CLI's `Tool::DuckDb { db, query, params }` opens a fresh
34//!   DuckDB connection per call.  `noetl-tools::tools::duckdb`
35//!   manages a pool.  Semantic difference; needs careful migration.
36//!
37//! Keeping the bridge explicit forces these decisions into one place
38//! instead of scattering them across each tool-kind sub-PR.
39//!
40//! ## GCS upload helper (R-3, noetl/ai-meta#31)
41//!
42//! [`gcs_upload`] wraps `object_store::gcp::GoogleCloudStorageBuilder`
43//! so the CLI's `SinkTarget::Gcs` arm no longer shells out to `gsutil`.
44//! Auth flows through the same provider chain as
45//! [`resolve_auth_to_bearer`]: workload identity on GKE, Application
46//! Default Credentials on dev hosts.  The helper accepts a pluggable
47//! `Arc<dyn ObjectStore>` so integration tests substitute an
48//! `object_store::memory::InMemory` store without real GCS.  See
49//! [`gcs_upload`] for the full credential-chain and error-shape notes.
50
51#![allow(dead_code)] // until PR-2c-4 onwards wires the call sites in.
52
53use std::collections::HashMap;
54use std::sync::Arc;
55use std::time::Instant;
56
57use anyhow::Result;
58use bytes::Bytes;
59use object_store::path::Path as StorePath;
60use object_store::ObjectStore;
61use object_store::PutPayload;
62use noetl_tools::auth::GcpAuth;
63use noetl_tools::context::ExecutionContext as ToolsExecutionContext;
64use noetl_tools::registry::{Tool as ToolsRegistryTool, ToolConfig};
65use noetl_tools::result::{ToolResult, ToolStatus};
66use noetl_tools::tools::{DuckdbTool, HttpTool, RhaiTool, ShellTool};
67use tracing::{info_span, Instrument};
68
69use crate::playbook::{AuthConfig as CliAuthConfig, CmdsList, SinkFormat, Tool};
70
71// ---------------------------------------------------------------------------
72// Bridge outcome — what the dispatch returns back to the caller.
73// ---------------------------------------------------------------------------
74
75/// Outcome of a bridged tool dispatch.
76///
77/// The shape matches the existing CLI surface where
78/// `PlaybookRunner::execute_tool` returns `Result<Option<String>>`:
79/// `result == Some(s)` for a successful tool execution that produced
80/// output the runner stores in `step_results[step].result`; `None`
81/// for tools that do not produce a per-step string result (e.g.
82/// fire-and-forget sinks).
83#[derive(Debug, Clone, PartialEq, Eq)]
84pub struct BridgeOutcome {
85    pub result: Option<String>,
86}
87
88impl BridgeOutcome {
89    pub fn empty() -> Self {
90        Self { result: None }
91    }
92}
93
94// ---------------------------------------------------------------------------
95// Bridge context — what the dispatch needs from the caller.
96// ---------------------------------------------------------------------------
97
98/// Per-call context for the bridge.  Groups together what would
99/// otherwise be many parameters threaded through every dispatch site.
100///
101/// The CLI's `ExecutionContext` (`repos/cli/src/playbook_runner.rs`)
102/// has a different shape than [`ToolsExecutionContext`] — the CLI
103/// uses `HashMap<String, String>` for variables and tracks step
104/// results separately; `noetl-tools` uses `HashMap<String,
105/// serde_json::Value>` and bundles many more execution-level fields
106/// (server_url, worker_id, command_id, etc.).
107///
108/// `BridgeContext` is the narrow view the CLI hands to the bridge;
109/// [`to_tools_context`] expands it into the full
110/// [`ToolsExecutionContext`] shape.
111pub struct BridgeContext<'a> {
112    /// Execution id — required by [`ToolsExecutionContext`].  CLI
113    /// local mode synthesises this from the start time / playbook
114    /// path; the worker uses the snowflake id from `noetl.command`.
115    pub execution_id: i64,
116
117    /// Step name the bridged tool is running under.
118    pub step: &'a str,
119
120    /// CLI variables map (workload.*, vars.*, <step>.result, etc.).
121    pub variables: &'a HashMap<String, String>,
122
123    /// Control-plane server URL.  Empty string when running in
124    /// CLI local mode without a server backend.
125    pub server_url: String,
126
127    /// Worker id / command id — `None` in CLI local mode.
128    pub worker_id: Option<String>,
129    pub command_id: Option<String>,
130}
131
132// ---------------------------------------------------------------------------
133// Adapters
134// ---------------------------------------------------------------------------
135
136/// Convert a [`BridgeContext`] into the [`ToolsExecutionContext`]
137/// shape `noetl-tools` tools expect.  String variables become
138/// [`serde_json::Value::String`] entries; secrets stay empty (CLI
139/// local mode resolves credentials at the credential-resolver layer,
140/// not at tool dispatch).
141///
142/// Variable shape: **flat**.  Each CLI variable `workload.region`
143/// becomes a JSON value at the same flat key in the resulting map.
144/// This matches what most `noetl-tools` tools (http / postgres / etc.)
145/// expect from their template engine.  The rhai tool needs a
146/// *nested* shape so `workload.region` is reachable as a Rhai field
147/// access on a `workload` map; see [`to_tools_context_for_rhai`] for
148/// the restructured variant used inside the rhai dispatch arm.
149pub fn to_tools_context(bridge: &BridgeContext) -> ToolsExecutionContext {
150    let variables: HashMap<String, serde_json::Value> = bridge
151        .variables
152        .iter()
153        .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone())))
154        .collect();
155
156    ToolsExecutionContext {
157        execution_id: bridge.execution_id,
158        step: bridge.step.to_string(),
159        variables,
160        server_url: bridge.server_url.clone(),
161        worker_id: bridge.worker_id.clone(),
162        command_id: bridge.command_id.clone(),
163        ..ToolsExecutionContext::default()
164    }
165}
166
167/// Build a [`ToolsExecutionContext`] whose `variables` map matches the
168/// scope shape the CLI's inline `execute_rhai_script` produced — flat
169/// `workload.region` / `vars.x` / `<step>.<field>` keys grouped into
170/// nested objects so Rhai's `workload.region` / `vars.x` / `<step>.<field>`
171/// field-access syntax works.
172///
173/// PR-2c-3 introduces this for the rhai dispatch arm.  Other tool
174/// kinds (http, postgres, duckdb, etc.) continue to consume the flat
175/// shape from [`to_tools_context`] because their template engines
176/// expect the `{{workload.region}}` lookup style, not Rhai-style
177/// field navigation.
178pub fn to_tools_context_for_rhai(bridge: &BridgeContext) -> ToolsExecutionContext {
179    let mut variables: HashMap<String, serde_json::Value> = HashMap::new();
180    let mut workload_map: serde_json::Map<String, serde_json::Value> = serde_json::Map::new();
181    let mut vars_map: serde_json::Map<String, serde_json::Value> = serde_json::Map::new();
182    let mut step_maps: HashMap<String, serde_json::Map<String, serde_json::Value>> =
183        HashMap::new();
184
185    for (key, value) in bridge.variables {
186        let val = serde_json::Value::String(value.clone());
187        if let Some(suffix) = key.strip_prefix("workload.") {
188            workload_map.insert(suffix.to_string(), val);
189        } else if let Some(suffix) = key.strip_prefix("vars.") {
190            vars_map.insert(suffix.to_string(), val);
191        } else if let Some((step, field)) = key.split_once('.') {
192            step_maps
193                .entry(step.to_string())
194                .or_default()
195                .insert(field.to_string(), val);
196        } else {
197            // Unprefixed keys land at the top level — same shape as
198            // [`to_tools_context`].
199            variables.insert(key.clone(), val);
200        }
201    }
202
203    if !workload_map.is_empty() {
204        variables.insert(
205            "workload".to_string(),
206            serde_json::Value::Object(workload_map),
207        );
208    }
209    if !vars_map.is_empty() {
210        variables.insert("vars".to_string(), serde_json::Value::Object(vars_map));
211    }
212    for (step, map) in step_maps {
213        variables.insert(step, serde_json::Value::Object(map));
214    }
215
216    ToolsExecutionContext {
217        execution_id: bridge.execution_id,
218        step: bridge.step.to_string(),
219        variables,
220        server_url: bridge.server_url.clone(),
221        worker_id: bridge.worker_id.clone(),
222        command_id: bridge.command_id.clone(),
223        ..ToolsExecutionContext::default()
224    }
225}
226
227/// Build a [`ToolConfig`] from a CLI [`Tool`] enum variant.
228///
229/// The `kind` string matches what [`noetl_tools::registry::ToolRegistry`]
230/// uses for dispatch.  The `config` payload is the variant's fields
231/// serialized as JSON; the receiving tool deserializes its own
232/// expected schema from this value (e.g. `noetl_tools::tools::shell`
233/// expects `{"cmds": [...]}`).
234///
235/// `Tool::Unsupported` returns a `ToolConfig` with `kind: "unsupported"`
236/// — dispatch will fail at registry lookup, which matches the CLI's
237/// current behaviour of emitting an error.
238pub fn to_tools_config(tool: &Tool) -> ToolConfig {
239    let (kind, config) = match tool {
240        Tool::Shell { cmds } => {
241            // noetl-tools::ShellConfig expects a single `command`
242            // string.  CLI's CmdsList::Multiple becomes a newline-
243            // joined block (one bash invocation with a multi-line
244            // script); CmdsList::Single becomes the string verbatim.
245            //
246            // Important: this is the per-call ToolConfig shape.  The
247            // Tool::Shell arm of `dispatch_via_registry` does NOT use
248            // this helper because the CLI's runtime semantics require
249            // one bash invocation PER command (independent process,
250            // no shared cwd/env state) — the dispatch arm loops and
251            // builds per-command ToolConfigs via [`shell_command_config`].
252            (
253                "shell",
254                serde_json::json!({
255                    "command": match cmds {
256                        CmdsList::Single(s) => s.clone(),
257                        CmdsList::Multiple(v) => v.join("\n"),
258                    },
259                    "shell": "bash",
260                    "capture": true,
261                }),
262            )
263        }
264        Tool::Http {
265            method,
266            url,
267            headers,
268            params,
269            body,
270            auth: _, // resolved at dispatch time into a Bearer header; not threaded through ToolConfig.auth (see PR-2c-5)
271        } => (
272            "http",
273            // noetl-tools' HttpConfig deserializes the method via
274            // `#[serde(rename_all = "UPPERCASE")]`, so we emit the
275            // uppercased CLI string here.  The body is wrapped as a
276            // JSON Value: if the CLI's body parses as JSON we pass the
277            // parsed Value (so reqwest serialises it as JSON with the
278            // right Content-Type); otherwise we pass it as a JSON
279            // string which noetl-tools sends verbatim as the body.
280            serde_json::json!({
281                "method": method.to_uppercase(),
282                "url": url,
283                "headers": headers,
284                "params": params,
285                "body": body.as_deref().map(http_body_value),
286            }),
287        ),
288        Tool::Playbook { path, args, input } => (
289            "playbook",
290            serde_json::json!({
291                "path": path,
292                "args": args,
293                "input": input,
294            }),
295        ),
296        Tool::DuckDb { db, query, params } => (
297            // noetl-tools' DuckdbConfig schema uses `db_path` (not
298            // `db`), `query` is required (so we substitute an empty
299            // string when the CLI doesn't carry one — the dispatch
300            // arm short-circuits in that case), and params are
301            // `Vec<serde_json::Value>` rather than `Vec<String>`.
302            // Conversion is faithful: a CLI string param becomes a
303            // JSON string value bound at the `?` placeholder by
304            // noetl-tools' DuckdbTool.
305            //
306            // Compatibility note: the CLI's pre-PR-2c-6
307            // `execute_duckdb_query` accepted but **ignored** the
308            // `params` field (signature was `_params: &[String]`).
309            // The bridge now binds them, which is a feature gain
310            // documented in the PR body and on the executor-crate-
311            // architecture wiki page.
312            "duckdb",
313            serde_json::json!({
314                "db_path": db,
315                "query": query.clone().unwrap_or_default(),
316                "params": params
317                    .iter()
318                    .map(|p| serde_json::Value::String(p.clone()))
319                    .collect::<Vec<_>>(),
320                "as_objects": true,
321            }),
322        ),
323        Tool::Rhai { code, args } => (
324            "rhai",
325            serde_json::json!({
326                "code": code,
327                "args": args,
328            }),
329        ),
330        Tool::Auth { provider, scopes, project } => (
331            "auth",
332            serde_json::json!({
333                "provider": provider,
334                "scopes": scopes,
335                "project": project,
336            }),
337        ),
338        Tool::Sink { target, format } => (
339            "sink",
340            serde_json::json!({
341                "target": target_to_value(target),
342                "format": format!("{:?}", format).to_lowercase(),
343            }),
344        ),
345        Tool::Unsupported => ("unsupported", serde_json::json!({})),
346    };
347
348    ToolConfig {
349        kind: kind.to_string(),
350        config,
351        timeout: None,
352        retry: None,
353        auth: None,
354    }
355}
356
357/// Build a single-command ToolConfig for the shell tool.  Used by
358/// the `Tool::Shell` dispatch arm to preserve the CLI's per-command
359/// bash-invocation semantics (independent process, no shared
360/// cwd/env state across commands).
361fn shell_command_config(command: &str) -> ToolConfig {
362    ToolConfig {
363        kind: "shell".to_string(),
364        config: serde_json::json!({
365            "command": command,
366            "shell": "bash",
367            "capture": true,
368        }),
369        timeout: None,
370        retry: None,
371        auth: None,
372    }
373}
374
375/// Convert a CLI HTTP body string into a JSON [`serde_json::Value`]
376/// suitable for noetl-tools' `HttpConfig.body` field.  If the body
377/// parses as JSON, the parsed value is returned (and `reqwest` sends
378/// it with `Content-Type: application/json`).  Otherwise the body
379/// is wrapped as a [`Value::String`] which `reqwest` writes
380/// verbatim as the request body.
381fn http_body_value(body: &str) -> serde_json::Value {
382    serde_json::from_str(body).unwrap_or_else(|_| serde_json::Value::String(body.to_string()))
383}
384
385/// Resolve a CLI [`AuthConfig`] to a Bearer token using noetl-tools'
386/// [`GcpAuth`] provider.
387///
388/// CLI providers `"gcp"`, `"google"`, and `"adc"` all map to GCP
389/// Application Default Credentials.  Any other provider value
390/// returns an error matching the CLI's pre-PR-2c-5 behaviour.
391///
392/// This replaces the CLI's inline `get_auth_token` (which shelled
393/// out to `gcloud auth print-access-token`).  See semantic
394/// divergence row on the executor-crate-architecture wiki page.
395pub async fn resolve_auth_to_bearer(cfg: &CliAuthConfig) -> Result<String> {
396    match cfg.provider.as_str() {
397        "gcp" | "google" | "adc" => {
398            let gcp = GcpAuth::new();
399            let scopes: Vec<&str> = cfg.scopes.iter().map(|s| s.as_str()).collect();
400            let token = if scopes.is_empty() {
401                gcp.get_default_token()
402                    .await
403                    .map_err(|e| anyhow::anyhow!("failed to get GCP access token: {}", e))?
404            } else {
405                gcp.get_token(&scopes)
406                    .await
407                    .map_err(|e| anyhow::anyhow!("failed to get GCP access token: {}", e))?
408            };
409            Ok(token)
410        }
411        other => anyhow::bail!(
412            "unsupported auth provider: {}. Supported: gcp, google, adc",
413            other
414        ),
415    }
416}
417
418/// Build the noetl-tools [`ToolConfig`] for an HTTP request.
419///
420/// Identical to the [`to_tools_config`] `Tool::Http` arm but pulled
421/// out so the dispatch arm can also inject an `Authorization:
422/// Bearer <token>` header when a CLI `AuthConfig` is present
423/// (resolved via [`resolve_auth_to_bearer`]).
424///
425/// CLI's `auth` is intentionally NOT mapped to noetl-tools'
426/// `ToolConfig.auth` field: that field expects an `AuthConfig` with
427/// `credential` / `token` lookup against `ExecutionContext.secrets`,
428/// which CLI local mode does not populate.  Pre-resolving the
429/// token and injecting it as a header keeps the CLI's existing
430/// authority semantics (the CLI process's gcloud / ADC chain) and
431/// avoids reshaping the credential resolver path.
432fn http_tool_config(
433    method: &str,
434    url: &str,
435    headers: &HashMap<String, String>,
436    params: &HashMap<String, String>,
437    body: Option<&str>,
438    bearer: Option<&str>,
439) -> ToolConfig {
440    let mut merged_headers = headers.clone();
441    if let Some(token) = bearer {
442        merged_headers.insert(
443            "Authorization".to_string(),
444            format!("Bearer {}", token),
445        );
446    }
447    ToolConfig {
448        kind: "http".to_string(),
449        config: serde_json::json!({
450            "method": method.to_uppercase(),
451            "url": url,
452            "headers": merged_headers,
453            "params": params,
454            "body": body.map(http_body_value),
455        }),
456        timeout: None,
457        retry: None,
458        auth: None,
459    }
460}
461
462/// Reshape noetl-tools' HTTP result envelope back to the CLI's
463/// pre-PR-2c-5 shape.
464///
465/// noetl-tools' HttpTool always packs `data: {"status_code":
466/// u16, "headers": {...}, "body": <json>}` into the ToolResult,
467/// regardless of whether the HTTP response was 2xx (Success) or
468/// 4xx/5xx (Error).  The CLI's `execute_http_request` returned the
469/// envelope `{"status": <int>, "body": <json>}` for ALL HTTP
470/// responses (including 4xx/5xx) so playbook steps could branch on
471/// the status code.  We preserve that contract here: only network-
472/// transport failures bubble up as `anyhow::Error`; HTTP error
473/// statuses come back inside the JSON envelope.
474fn reshape_http_result(result: ToolResult) -> Result<BridgeOutcome> {
475    if let Some(data) = result.data {
476        let status_code = data
477            .get("status_code")
478            .and_then(|v| v.as_u64())
479            .unwrap_or(0) as i32;
480        let body = data
481            .get("body")
482            .cloned()
483            .unwrap_or(serde_json::Value::Null);
484        let envelope = serde_json::json!({
485            "status": status_code,
486            "body": body,
487        });
488        return Ok(BridgeOutcome {
489            result: Some(envelope.to_string()),
490        });
491    }
492    // No data — fall back to the generic from_tools_result path so
493    // we surface whatever error / stdout the tool emitted.
494    from_tools_result(result)
495}
496
497/// Build a [`ToolConfig`] for a DuckDB query.
498///
499/// Used by the `Tool::DuckDb` dispatch arm.  Path resolution
500/// (playbook-relative vs absolute) and `mkdir -p` of the parent
501/// directory are handled at the CLI call site BEFORE the bridge is
502/// invoked, so this helper receives an already-resolved absolute
503/// path string (or `:memory:` for in-memory mode).
504fn duckdb_tool_config(
505    db_path: &str,
506    query: &str,
507    params: &[String],
508) -> ToolConfig {
509    ToolConfig {
510        kind: "duckdb".to_string(),
511        config: serde_json::json!({
512            "db_path": db_path,
513            "query": query,
514            "params": params
515                .iter()
516                .map(|p| serde_json::Value::String(p.clone()))
517                .collect::<Vec<_>>(),
518            // CLI's pre-PR-2c-6 SELECT result shape was an array of
519            // JSON objects keyed by column name; `as_objects: true`
520            // matches that.  `reshape_duckdb_result` then unwraps
521            // the noetl-tools envelope back to the raw array.
522            "as_objects": true,
523        }),
524        timeout: None,
525        retry: None,
526        auth: None,
527    }
528}
529
530/// Reshape noetl-tools' DuckDB result envelope back to the CLI's
531/// pre-PR-2c-6 shape.
532///
533/// noetl-tools' DuckdbTool returns:
534/// - SELECT / WITH: `data: {"columns": [...], "rows": [{...}, ...],
535///   "row_count": N}`
536/// - non-SELECT:    `data: {"affected_rows": N}`
537///
538/// The CLI's `execute_duckdb_query` returned:
539/// - SELECT / WITH: a JSON array of objects (pretty-printed)
540/// - non-SELECT:    the literal string `{"status": "ok"}`
541///
542/// `reshape_duckdb_result` maps the former onto the latter so
543/// playbook steps that read `<step>.result[0].col_name` keep
544/// working.  `affected_rows` from the noetl-tools envelope is
545/// dropped on purpose — the CLI never exposed it.
546fn reshape_duckdb_result(result: ToolResult) -> Result<BridgeOutcome> {
547    let data = match result.data {
548        Some(d) => d,
549        None => return from_tools_result(result),
550    };
551
552    if let Some(rows) = data.get("rows").and_then(|v| v.as_array()) {
553        // SELECT path.  Return the rows array as a pretty-printed
554        // JSON string — matches the CLI's
555        // `serde_json::to_string_pretty(&results)`.
556        let pretty = serde_json::to_string_pretty(rows)?;
557        return Ok(BridgeOutcome { result: Some(pretty) });
558    }
559
560    if data.get("affected_rows").is_some() {
561        // Non-SELECT path.  CLI emitted the literal `{"status":
562        // "ok"}` here; preserve that.
563        return Ok(BridgeOutcome {
564            result: Some(r#"{"status": "ok"}"#.to_string()),
565        });
566    }
567
568    // Unknown shape — fall back to the generic from_tools_result
569    // path so we still surface whatever the tool emitted.
570    from_tools_result(ToolResult {
571        status: result.status,
572        data: Some(data),
573        error: result.error,
574        stdout: result.stdout,
575        stderr: result.stderr,
576        exit_code: result.exit_code,
577        duration_ms: result.duration_ms,
578        // noetl-tools 2.21 added this marker field; the executor
579        // bridge has nothing to attach here (DuckDB doesn't dispatch
580        // async work), so it always falls through as `None`.
581        pending_callback: result.pending_callback,
582    })
583}
584
585/// Prepare the variable map for a sub-playbook invocation.
586///
587/// Used by the CLI's `Tool::Playbook` arm (which keeps owning the
588/// tree-walker recursion per § H.10).  The helper merges the
589/// parent context's variables with the sub-playbook's
590/// `input:` (DSL v2) or `args:` (DSL v1 legacy), each rendered
591/// against the parent context via the caller-supplied
592/// `render_template` closure and prefixed with `workload.` to
593/// match the sub-playbook's expected variable shape.
594///
595/// `input` takes precedence over `args` when both are present —
596/// same precedence the CLI's pre-PR-2c-7 inline implementation
597/// applied.
598///
599/// `parent_vars`, `args`, and `input` correspond directly to the
600/// caller's `context.variables`, `Tool::Playbook.args`, and
601/// `Tool::Playbook.input` fields.  The `render` closure receives
602/// each template string and is expected to return the rendered
603/// value (the CLI passes `|t| self.render_template(t, context)`).
604///
605/// Returning a fresh `HashMap` rather than mutating in place makes
606/// the helper easy to test and matches how the inline
607/// implementation operated.
608pub fn prepare_sub_playbook_vars<F>(
609    parent_vars: &HashMap<String, String>,
610    args: &HashMap<String, String>,
611    input: &HashMap<String, serde_yaml::Value>,
612    mut render: F,
613) -> Result<HashMap<String, String>>
614where
615    F: FnMut(&str) -> Result<String>,
616{
617    let mut sub_vars = parent_vars.clone();
618
619    if !input.is_empty() {
620        // DSL v2: tool.input takes precedence — render and prefix
621        // with `workload.`.
622        for (key, value_yaml) in input {
623            let template = match value_yaml {
624                serde_yaml::Value::String(s) => s.clone(),
625                serde_yaml::Value::Number(n) => n.to_string(),
626                serde_yaml::Value::Bool(b) => b.to_string(),
627                other => serde_yaml::to_string(other)?.trim().to_string(),
628            };
629            let value = render(&template)?;
630            sub_vars.insert(format!("workload.{}", key), value);
631        }
632    } else if !args.is_empty() {
633        // DSL v1 legacy: args field — prefix with `workload.`.
634        for (key, template) in args {
635            let value = render(template)?;
636            sub_vars.insert(format!("workload.{}", key), value);
637        }
638    }
639
640    Ok(sub_vars)
641}
642
643/// Apply post-resolution `Tool::Auth` side-effects to the CLI's
644/// execution context.
645///
646/// Returns the (key, value) pairs the caller should
647/// `set_variable` on its `ExecutionContext` so subsequent steps
648/// can reference `{{ auth.token }}` etc.  Wrapping this in a
649/// helper means future call sites (the worker, integration tests)
650/// don't have to re-derive which keys to set.
651///
652/// `project` is the **already-rendered** project string (the CLI
653/// renders templates against its own context before calling this
654/// helper), or `None` if the playbook didn't supply one.
655///
656/// Output order:
657///  - `auth.project` (only if `project` is `Some` and non-empty)
658///  - `auth.token`
659///  - `auth.provider`
660///
661/// Matching the CLI's pre-PR-2c-8 ordering — `auth.project` set
662/// first by the inline arm, then the token + provider after the
663/// `resolve_auth_to_bearer` call.
664pub fn auth_context_updates(
665    provider: &str,
666    token: &str,
667    project: Option<&str>,
668) -> Vec<(String, String)> {
669    let mut updates: Vec<(String, String)> = Vec::with_capacity(3);
670    if let Some(p) = project {
671        if !p.is_empty() {
672            updates.push(("auth.project".to_string(), p.to_string()));
673        }
674    }
675    updates.push(("auth.token".to_string(), token.to_string()));
676    updates.push(("auth.provider".to_string(), provider.to_string()));
677    updates
678}
679
680/// Format the payload a `Tool::Sink` writes to its target.
681///
682/// Pure transformation lifted from the CLI's inline
683/// `Tool::Sink` arm.  The CLI passes the last step's result
684/// (already a JSON-serialized string in `ExecutionContext`) and
685/// the playbook's declared `format:` field; the helper returns
686/// the formatted string ready to write to file / DuckDB / GCS.
687///
688/// Format rules:
689/// - [`SinkFormat::Json`]: pass-through.  Same as CLI's
690///   pre-PR-2c-8 behaviour (the raw step-result string).
691/// - [`SinkFormat::Yaml`]: parse the input as JSON, then dump as
692///   YAML.  Falls back to pass-through if the input doesn't parse.
693/// - [`SinkFormat::Csv`]: see [`json_to_csv`] for the rules.
694pub fn format_sink_payload(format: &SinkFormat, raw: &str) -> Result<String> {
695    match format {
696        SinkFormat::Json => Ok(raw.to_string()),
697        SinkFormat::Yaml => {
698            if let Ok(json_val) = serde_json::from_str::<serde_json::Value>(raw) {
699                Ok(serde_yaml::to_string(&json_val).unwrap_or_else(|_| raw.to_string()))
700            } else {
701                Ok(raw.to_string())
702            }
703        }
704        SinkFormat::Csv => json_to_csv(raw),
705    }
706}
707
708/// Convert a JSON-array-of-objects string into CSV.
709///
710/// Pure helper lifted from the CLI's inline `json_to_csv`.  Returns
711/// the input unchanged if:
712/// - it doesn't parse as JSON,
713/// - it parses as a non-array value, or
714/// - it's an empty array, or
715/// - the first element isn't a JSON object.
716///
717/// Otherwise: emits a header row from the first object's keys
718/// followed by one row per array element.  Values are converted
719/// via `Display`; strings that contain `,` or `"` are
720/// double-quoted with embedded `"` doubled — minimal RFC 4180
721/// quoting, matching the CLI's pre-PR-2c-8 implementation.
722pub fn json_to_csv(json_str: &str) -> Result<String> {
723    let value: serde_json::Value =
724        serde_json::from_str(json_str).unwrap_or(serde_json::Value::String(json_str.to_string()));
725
726    match value {
727        serde_json::Value::Array(arr) if !arr.is_empty() => {
728            let headers: Vec<String> = if let Some(serde_json::Value::Object(obj)) = arr.first() {
729                obj.keys().cloned().collect()
730            } else {
731                return Ok(json_str.to_string());
732            };
733
734            let mut csv = headers.join(",") + "\n";
735
736            for item in &arr {
737                if let serde_json::Value::Object(obj) = item {
738                    let row: Vec<String> = headers
739                        .iter()
740                        .map(|h| {
741                            obj.get(h)
742                                .map(|v| match v {
743                                    serde_json::Value::String(s) => {
744                                        if s.contains(',') || s.contains('"') {
745                                            format!("\"{}\"", s.replace('"', "\"\""))
746                                        } else {
747                                            s.clone()
748                                        }
749                                    }
750                                    _ => v.to_string(),
751                                })
752                                .unwrap_or_default()
753                        })
754                        .collect();
755                    csv.push_str(&row.join(","));
756                    csv.push('\n');
757                }
758            }
759            Ok(csv)
760        }
761        _ => Ok(json_str.to_string()),
762    }
763}
764
765// ---------------------------------------------------------------------------
766// GCS upload helper (R-3, noetl/ai-meta#31)
767// ---------------------------------------------------------------------------
768
769/// Upload `data` to `gs://<bucket>/<key>` using the `object_store` crate.
770///
771/// # Credential chain
772///
773/// Authentication defaults to the same Application Default Credentials
774/// (ADC) / workload-identity chain that [`resolve_auth_to_bearer`] uses
775/// via `gcp_auth`.  Concretely: `GoogleCloudStorageBuilder::from_env()`
776/// reads (in priority order):
777///
778/// 1. `GOOGLE_SERVICE_ACCOUNT_KEY` env var (JSON service-account key
779///    inline — useful for CI / test containers).
780/// 2. `GOOGLE_SERVICE_ACCOUNT` env var (path to a JSON key file).
781/// 3. The ambient Application Default Credentials
782///    (`~/.config/gcloud/application_default_credentials.json` on dev
783///    hosts; the GKE metadata server on cluster pods).
784///
785/// This matches GKE workload-identity on cluster and `gcloud auth
786/// application-default login` on dev hosts — the same two paths the
787/// former `gsutil cp` subprocess relied on.
788///
789/// # Error shape
790///
791/// Returns `anyhow::Error` with a human-readable message on failure
792/// (instead of a gsutil exit-code string).  The CLI's `sink_to_gcs`
793/// wrapper maps this through the usual `?` chain.
794///
795/// # Observability
796///
797/// Wraps the upload in a `gcs.upload` tracing span that carries
798/// `bucket`, `key`, and `bytes` fields so the span is grep-able in
799/// structured logs.  Upload duration is emitted as a debug-level event
800/// (`gcs.upload.duration_ms`) so tooling can aggregate latency without
801/// a Prometheus registry in the executor crate.  A future PR can
802/// promote this to a proper histogram once the executor crate grows a
803/// metrics registry.
804///
805/// # Pluggable store (testing)
806///
807/// The `store` parameter is `Arc<dyn ObjectStore>`.  Production callers
808/// pass `None` (the default GCS store is built from env); integration
809/// tests inject `Arc<object_store::memory::InMemory::new()>` to avoid
810/// real GCS calls.  See `gcs_upload_with_store` for the inner
811/// implementation that both paths share.
812pub async fn gcs_upload(bucket: &str, key: &str, data: &str) -> Result<()> {
813    use object_store::gcp::GoogleCloudStorageBuilder;
814
815    let store = GoogleCloudStorageBuilder::from_env()
816        .with_bucket_name(bucket)
817        .build()
818        .map_err(|e| anyhow::anyhow!("failed to build GCS store for bucket {:?}: {}", bucket, e))?;
819
820    gcs_upload_with_store(Arc::new(store), key, data).await
821}
822
823/// Inner upload path shared by production and test callers.
824///
825/// Production: called by [`gcs_upload`] with a real
826/// `GoogleCloudStorage` store.
827/// Tests: called directly with `Arc<InMemory>` — no GCS dependency.
828pub async fn gcs_upload_with_store(
829    store: Arc<dyn ObjectStore>,
830    key: &str,
831    data: &str,
832) -> Result<()> {
833    let bytes = Bytes::from(data.to_string());
834    let byte_len = bytes.len();
835    let path = StorePath::from(key);
836
837    let span = info_span!(
838        "gcs.upload",
839        key = key,
840        bytes = byte_len,
841    );
842
843    async move {
844        let start = Instant::now();
845
846        store
847            .put(&path, PutPayload::from_bytes(bytes))
848            .await
849            .map_err(|e| anyhow::anyhow!("GCS upload failed for key {:?}: {}", key, e))?;
850
851        let elapsed_ms = start.elapsed().as_millis();
852        tracing::debug!(
853            target: "noetl::gcs",
854            duration_ms = elapsed_ms,
855            key = key,
856            bytes = byte_len,
857            "gcs.upload complete"
858        );
859
860        Ok(())
861    }
862    .instrument(span)
863    .await
864}
865
866fn target_to_value(target: &crate::playbook::SinkTarget) -> serde_json::Value {
867    match target {
868        crate::playbook::SinkTarget::File { path } => {
869            serde_json::json!({"type": "file", "path": path})
870        }
871        crate::playbook::SinkTarget::DuckDb { db, table } => {
872            serde_json::json!({"type": "duckdb", "db": db, "table": table})
873        }
874        crate::playbook::SinkTarget::Gcs { bucket, path } => {
875            serde_json::json!({"type": "gcs", "bucket": bucket, "path": path})
876        }
877    }
878}
879
880/// Convert a [`ToolResult`] back into the bridge outcome shape the
881/// CLI consumes.  Success results carry `data` (or `stdout` if no
882/// `data` was populated) as the result string; failures bubble up
883/// as `anyhow::Error` so the CLI's existing error-handling chain
884/// continues to work.
885pub fn from_tools_result(result: ToolResult) -> Result<BridgeOutcome> {
886    match result.status {
887        ToolStatus::Success => {
888            let payload = result
889                .data
890                .map(|v| match v {
891                    serde_json::Value::String(s) => s,
892                    other => other.to_string(),
893                })
894                .or(result.stdout);
895            Ok(BridgeOutcome { result: payload })
896        }
897        ToolStatus::Error => Err(anyhow::anyhow!(
898            "tool execution failed: {}",
899            result.error.unwrap_or_else(|| "unknown error".to_string())
900        )),
901        ToolStatus::Timeout => Err(anyhow::anyhow!(
902            "tool execution timed out after {} ms",
903            result.duration_ms.unwrap_or(0)
904        )),
905    }
906}
907
908// ---------------------------------------------------------------------------
909// Dispatch — per-tool-kind match scaffold.
910// ---------------------------------------------------------------------------
911
912/// Bridge dispatch entry point.  Each tool kind is replaced
913/// incrementally in subsequent sub-PRs (PR-2c-3 onwards).
914///
915/// The function is async because every concrete `noetl-tools` tool
916/// implementation is async (`Tool::execute` is `async`).  The CLI
917/// adapts via `tokio::runtime::Handle::current().block_on(...)` if
918/// the call site is sync — see PR-2c-3's wiring for the pattern.
919pub async fn dispatch_via_registry(
920    tool: &Tool,
921    bridge: &BridgeContext<'_>,
922) -> Result<BridgeOutcome> {
923    let _config = to_tools_config(tool);
924    let _ctx = to_tools_context(bridge);
925
926    match tool {
927        Tool::Rhai { .. } => {
928            // PR-2c-3: first real tool replacement.  Builds a
929            // RhaiTool from noetl-tools, dispatches against the
930            // adapter-converted config + context, and converts the
931            // result back through `from_tools_result`.
932            //
933            // Semantic note documented in the PR body: noetl-tools'
934            // `timestamp()` returns the Unix epoch as a string
935            // (e.g. "1716847425"), whereas the CLI's inline
936            // implementation returned `chrono::Local::now()
937            // .format("%H:%M:%S")` (e.g. "14:23:45").  Other
938            // helpers (log, print, parse_json, contains, http_*,
939            // get_gcp_token, sleep, sleep_ms) match.
940            let rhai_tool = RhaiTool::new();
941            let config = to_tools_config(tool);
942            // rhai needs a nested variable shape so
943            // `workload.region` is a Rhai field-access expression.
944            let ctx = to_tools_context_for_rhai(bridge);
945            let result = rhai_tool
946                .execute(&config, &ctx)
947                .await
948                .map_err(|e| anyhow::anyhow!("rhai dispatch failed: {}", e))?;
949            from_tools_result(result)
950        }
951        Tool::Shell { cmds } => {
952            // PR-2c-4: dispatch through noetl_tools::ShellTool.
953            //
954            // CLI semantics preserved:
955            // - CmdsList::Single splits on newlines into individual
956            //   commands; each runs in its own bash invocation.
957            // - CmdsList::Multiple runs each element in its own
958            //   bash invocation in order.
959            // - Bails on first non-zero exit (CLI's existing
960            //   `anyhow::bail!("Command failed ...")`).
961            // - Returns the last command's stdout as the step result.
962            //
963            // Note vs CLI: noetl-tools' ShellTool collects stdout +
964            // stderr and returns them in the ToolResult at the end
965            // of execution.  The CLI's inline implementation
966            // streamed output to the terminal line-by-line as the
967            // command ran.  For long-running shell steps users no
968            // longer see real-time output.  Documented in the PR
969            // body and on the executor-crate-architecture wiki
970            // page's semantic-divergence table.
971            let commands: Vec<String> = match cmds {
972                CmdsList::Single(cmd) => cmd
973                    .lines()
974                    .map(|s| s.trim())
975                    .filter(|s| !s.is_empty())
976                    .map(|s| s.to_string())
977                    .collect(),
978                CmdsList::Multiple(c) => c.clone(),
979            };
980
981            let shell_tool = ShellTool::new();
982            let ctx = to_tools_context(bridge);
983            let mut last_outcome = BridgeOutcome::empty();
984            for command in commands {
985                let config = shell_command_config(&command);
986                let result = shell_tool
987                    .execute(&config, &ctx)
988                    .await
989                    .map_err(|e| anyhow::anyhow!("shell dispatch failed: {}", e))?;
990
991                // noetl-tools' shell tool packs the result into
992                // ToolResult.data as a typed JSON object:
993                //   {"exit_code": i32, "stdout": String, "stderr": String}
994                // For the CLI's step-result contract (a single
995                // string = the command's stdout), we unwrap stdout
996                // directly here.  `from_tools_result` would
997                // otherwise stringify the whole JSON dict.
998                if result.status != ToolStatus::Success {
999                    let exit_code = result
1000                        .data
1001                        .as_ref()
1002                        .and_then(|d| d.get("exit_code"))
1003                        .and_then(|v| v.as_i64());
1004                    anyhow::bail!(
1005                        "Command failed with exit code: {:?}",
1006                        exit_code
1007                    );
1008                }
1009                let stdout = result
1010                    .data
1011                    .as_ref()
1012                    .and_then(|d| d.get("stdout"))
1013                    .and_then(|v| v.as_str())
1014                    .map(|s| s.trim_end_matches('\n').to_string());
1015                last_outcome = BridgeOutcome { result: stdout };
1016            }
1017            Ok(last_outcome)
1018        }
1019        Tool::Http {
1020            method,
1021            url,
1022            headers,
1023            params,
1024            body,
1025            auth,
1026        } => {
1027            // PR-2c-5: dispatch through noetl_tools::HttpTool.
1028            //
1029            // CLI semantics preserved:
1030            // - Auth resolution via GCP ADC (gcp / google / adc).
1031            // - Step result is the JSON envelope
1032            //     `{"status": <int>, "body": <json-or-string>}`
1033            //   regardless of HTTP status code (so playbook steps
1034            //   can branch on `<step>.body.status`).
1035            //
1036            // Semantic divergences (documented on the executor-crate-
1037            // architecture wiki page):
1038            // - HTTP transport: curl subprocess → reqwest direct.
1039            // - GCP token: `gcloud auth print-access-token` shellout
1040            //   → `gcp_auth` crate (workload-identity aware on GKE).
1041            // - Body bytes: CLI sent the body string verbatim via
1042            //   `curl -d`.  noetl-tools serializes the body as JSON
1043            //   when the string parses as JSON (adding Content-Type:
1044            //   application/json automatically), otherwise sends it
1045            //   verbatim.  See `http_body_value`.
1046            let bearer = if let Some(auth_cfg) = auth {
1047                Some(resolve_auth_to_bearer(auth_cfg).await?)
1048            } else {
1049                None
1050            };
1051            let config = http_tool_config(
1052                method,
1053                url,
1054                headers,
1055                params,
1056                body.as_deref(),
1057                bearer.as_deref(),
1058            );
1059            let http_tool = HttpTool::new();
1060            let ctx = to_tools_context(bridge);
1061            let result = http_tool
1062                .execute(&config, &ctx)
1063                .await
1064                .map_err(|e| anyhow::anyhow!("http dispatch failed: {}", e))?;
1065            reshape_http_result(result)
1066        }
1067        Tool::DuckDb { db, query, params } => {
1068            // PR-2c-6: dispatch through noetl_tools::DuckdbTool.
1069            //
1070            // CLI semantics preserved:
1071            // - The CLI's call site already resolved playbook-
1072            //   relative paths (`resolve_duckdb_path`) and ran
1073            //   `mkdir -p` on the parent directory before invoking
1074            //   the bridge, so `db` here is an absolute path
1075            //   string ready to hand to DuckDB.
1076            // - SELECT / WITH queries return a JSON array of
1077            //   objects (pretty-printed).
1078            // - Non-SELECT queries return the literal envelope
1079            //   `{"status": "ok"}` (CLI never exposed
1080            //   noetl-tools' `affected_rows`).
1081            // - Empty / missing query short-circuits to an empty
1082            //   outcome, matching the CLI arm's
1083            //   `if let Some(query_str) = query` guard.
1084            //
1085            // Feature gain: CLI's pre-PR-2c-6 inline impl took a
1086            // `_params: &[String]` and silently ignored it.  The
1087            // bridge now binds those params as JSON values at
1088            // `?` placeholders.  Playbooks that had a stale
1089            // `params:` list under a query without `?` placeholders
1090            // continue to work (DuckDB ignores extra params); any
1091            // playbook that *intended* the params would now see
1092            // them applied — documented in the PR body.
1093            let query = match query {
1094                Some(q) if !q.trim().is_empty() => q,
1095                _ => return Ok(BridgeOutcome::empty()),
1096            };
1097            let config = duckdb_tool_config(db, query, params);
1098            let duckdb_tool = DuckdbTool::new();
1099            let ctx = to_tools_context(bridge);
1100            let result = duckdb_tool
1101                .execute(&config, &ctx)
1102                .await
1103                .map_err(|e| anyhow::anyhow!("duckdb dispatch failed: {}", e))?;
1104            reshape_duckdb_result(result)
1105        }
1106        Tool::Playbook { .. } => {
1107            // PR-2c-7: encodes the § H.10 architectural finding.
1108            //
1109            // `Tool::Playbook` is the recursion case of the CLI's
1110            // tree walker — it loads a sub-playbook YAML and
1111            // dispatches it through the same `PlaybookRunner` the
1112            // top-level invocation uses.  `PlaybookRunner` lives in
1113            // the CLI binary, not in `noetl-executor` or
1114            // `noetl-tools`, so routing this tool through the
1115            // bridge would require either:
1116            //   - dragging the tree walker into `noetl-executor`,
1117            //     re-opening the § H.10 question that re-scoped
1118            //     the crate to a utilities-and-types crate; or
1119            //   - adding a callback trait to `noetl-tools` that
1120            //     delegates back to the CLI binary, an
1121            //     infrastructure layer nothing else in the
1122            //     registry uses.
1123            //
1124            // The architecturally honest answer is that this tool
1125            // kind is NOT bridgeable.  The CLI's `Tool::Playbook`
1126            // arm stays inline by design.  Bailing loudly here
1127            // ensures any future code that tries to dispatch
1128            // `Tool::Playbook` through the bridge gets an
1129            // immediate, descriptive error instead of a silent
1130            // empty outcome.
1131            //
1132            // Sub-playbook variable preparation (the input + args
1133            // merging logic the CLI's call site performs before
1134            // recursing) DOES move into the executor as
1135            // [`prepare_sub_playbook_vars`] — that part is reusable
1136            // and testable independent of the tree walker.
1137            anyhow::bail!(
1138                "Tool::Playbook is not bridgeable: sub-playbook \
1139                 execution stays in the CLI's tree walker per \
1140                 § H.10 of the Rust migration roadmap. Use \
1141                 `PlaybookRunner::new(path).run()` directly from \
1142                 the CLI."
1143            );
1144        }
1145        Tool::Auth { .. } => {
1146            // PR-2c-8: `Tool::Auth` does not dispatch through the
1147            // registry.  Token resolution lives in
1148            // [`resolve_auth_to_bearer`] (added in PR-2c-5);
1149            // applying the resulting token to the CLI's
1150            // `ExecutionContext` lives in [`auth_context_updates`]
1151            // (added in PR-2c-8).  Both are sync helpers the CLI
1152            // calls directly without going through dispatch.  The
1153            // arm bails so any future code path that tries to
1154            // route a `Tool::Auth` through the registry gets a
1155            // clear, descriptive error instead of silently
1156            // returning an empty outcome.
1157            anyhow::bail!(
1158                "Tool::Auth is not bridge-dispatched: use \
1159                 `resolve_auth_to_bearer` for token resolution and \
1160                 `auth_context_updates` for applying the token to \
1161                 the caller's execution context. See § H.10 of the \
1162                 Rust migration roadmap."
1163            );
1164        }
1165        Tool::Sink { .. } => {
1166            // PR-2c-8: `Tool::Sink` does not dispatch through the
1167            // registry either.  noetl-tools' `TransferTool` is
1168            // database-to-database only (snowflake / postgres /
1169            // duckdb / http source → snowflake / postgres /
1170            // duckdb target); it has no file / GCS / object-store
1171            // target.  The CLI's three sink targets (File,
1172            // DuckDb, Gcs) each stay inline:
1173            //
1174            // - **File**: `fs::write` is a one-liner; the format
1175            //   conversion (json / yaml / csv) DID extract into
1176            //   [`format_sink_payload`] so it's reusable and
1177            //   testable.
1178            // - **DuckDb**: complex `INSERT INTO ... SELECT FROM
1179            //   read_json_auto(...)` with a single-object fallback;
1180            //   no `noetl-tools` equivalent.  Stays inline by
1181            //   design (§ H.10-style finding).
1182            // - **Gcs**: gsutil shellout.  A follow-up sub-PR
1183            //   (tracked separately) will migrate this to the
1184            //   `object_store` crate per § H.4 of Appendix H.
1185            //
1186            // The arm bails so misuse is loud.
1187            anyhow::bail!(
1188                "Tool::Sink is not bridge-dispatched: noetl-tools \
1189                 has no file / GCS / object-store target. Use \
1190                 `format_sink_payload` for format conversion; the \
1191                 CLI's sink targets (file / duckdb / gcs) stay \
1192                 inline per § H.10. GCS migration to `object_store` \
1193                 is tracked as a separate follow-up."
1194            );
1195        }
1196        Tool::Unsupported => {
1197            anyhow::bail!("unsupported tool kind");
1198        }
1199    }
1200}
1201
1202// ---------------------------------------------------------------------------
1203// Tests
1204// ---------------------------------------------------------------------------
1205
1206#[cfg(test)]
1207mod tests {
1208    use super::*;
1209    use crate::playbook::{AuthConfig as CliAuthConfig, SinkFormat, SinkTarget};
1210
1211    fn empty_vars() -> HashMap<String, String> {
1212        HashMap::new()
1213    }
1214
1215    fn bridge_ctx<'a>(vars: &'a HashMap<String, String>) -> BridgeContext<'a> {
1216        BridgeContext {
1217            execution_id: 12345,
1218            step: "test_step",
1219            variables: vars,
1220            server_url: String::new(),
1221            worker_id: None,
1222            command_id: None,
1223        }
1224    }
1225
1226    #[test]
1227    fn to_tools_context_wraps_string_variables_as_json_value() {
1228        let vars: HashMap<String, String> =
1229            [("workload.region".into(), "us-west-1".into())].into();
1230        let ctx = to_tools_context(&bridge_ctx(&vars));
1231        assert_eq!(ctx.execution_id, 12345);
1232        assert_eq!(ctx.step, "test_step");
1233        assert_eq!(
1234            ctx.variables.get("workload.region"),
1235            Some(&serde_json::Value::String("us-west-1".into()))
1236        );
1237        assert!(ctx.secrets.is_empty(), "secrets stay empty by default");
1238    }
1239
1240    #[test]
1241    fn to_tools_config_shell_single_cmd() {
1242        let tool = Tool::Shell {
1243            cmds: CmdsList::Single("ls -la".into()),
1244        };
1245        let cfg = to_tools_config(&tool);
1246        assert_eq!(cfg.kind, "shell");
1247        assert_eq!(cfg.config["command"], "ls -la");
1248        assert_eq!(cfg.config["shell"], "bash");
1249        assert_eq!(cfg.config["capture"], true);
1250        assert!(cfg.timeout.is_none());
1251    }
1252
1253    #[test]
1254    fn to_tools_config_shell_multiple_cmds_joins_with_newlines() {
1255        // The to_tools_config helper produces a SINGLE-command shape
1256        // by joining; the dispatch arm instead loops per command to
1257        // preserve the CLI's "fresh bash per command" semantics.
1258        let tool = Tool::Shell {
1259            cmds: CmdsList::Multiple(vec!["echo one".into(), "echo two".into()]),
1260        };
1261        let cfg = to_tools_config(&tool);
1262        assert_eq!(cfg.kind, "shell");
1263        assert_eq!(cfg.config["command"], "echo one\necho two");
1264    }
1265
1266    #[test]
1267    fn shell_command_config_emits_per_cmd_shape() {
1268        let cfg = shell_command_config("echo hi");
1269        assert_eq!(cfg.kind, "shell");
1270        assert_eq!(cfg.config["command"], "echo hi");
1271        assert_eq!(cfg.config["shell"], "bash");
1272        assert_eq!(cfg.config["capture"], true);
1273    }
1274
1275    #[test]
1276    fn to_tools_config_http_round_trips_essentials() {
1277        let tool = Tool::Http {
1278            method: "post".into(), // lowercase to verify uppercasing
1279            url: "https://example.com/api".into(),
1280            headers: HashMap::new(),
1281            params: HashMap::new(),
1282            body: Some(r#"{"k":"v"}"#.into()),
1283            auth: None,
1284        };
1285        let cfg = to_tools_config(&tool);
1286        assert_eq!(cfg.kind, "http");
1287        // noetl-tools' HttpConfig.method deserializes via
1288        // #[serde(rename_all = "UPPERCASE")] so the bridge always
1289        // uppercases the CLI's method string.
1290        assert_eq!(cfg.config["method"], "POST");
1291        assert_eq!(cfg.config["url"], "https://example.com/api");
1292        // JSON bodies are parsed into a JSON Value so reqwest
1293        // serialises them with Content-Type: application/json.
1294        assert_eq!(cfg.config["body"], serde_json::json!({"k": "v"}));
1295    }
1296
1297    #[test]
1298    fn to_tools_config_http_keeps_non_json_body_as_string() {
1299        let tool = Tool::Http {
1300            method: "POST".into(),
1301            url: "https://example.com".into(),
1302            headers: HashMap::new(),
1303            params: HashMap::new(),
1304            body: Some("not json at all".into()),
1305            auth: None,
1306        };
1307        let cfg = to_tools_config(&tool);
1308        assert_eq!(cfg.config["body"], "not json at all");
1309    }
1310
1311    #[test]
1312    fn http_body_value_parses_json_strings() {
1313        let v = http_body_value(r#"{"a":1}"#);
1314        assert_eq!(v, serde_json::json!({"a": 1}));
1315    }
1316
1317    #[test]
1318    fn http_body_value_falls_back_to_string() {
1319        let v = http_body_value("plain text body");
1320        assert_eq!(v, serde_json::Value::String("plain text body".into()));
1321    }
1322
1323    #[test]
1324    fn http_tool_config_injects_bearer_header() {
1325        let cfg = http_tool_config(
1326            "GET",
1327            "https://example.com",
1328            &HashMap::new(),
1329            &HashMap::new(),
1330            None,
1331            Some("test-token-123"),
1332        );
1333        assert_eq!(cfg.kind, "http");
1334        assert_eq!(
1335            cfg.config["headers"]["Authorization"],
1336            "Bearer test-token-123"
1337        );
1338    }
1339
1340    #[test]
1341    fn http_tool_config_preserves_caller_headers_with_bearer() {
1342        let mut hdrs = HashMap::new();
1343        hdrs.insert("X-Trace-Id".into(), "abc123".into());
1344        let cfg = http_tool_config(
1345            "POST",
1346            "https://example.com",
1347            &hdrs,
1348            &HashMap::new(),
1349            None,
1350            Some("token"),
1351        );
1352        assert_eq!(cfg.config["headers"]["X-Trace-Id"], "abc123");
1353        assert_eq!(cfg.config["headers"]["Authorization"], "Bearer token");
1354    }
1355
1356    #[test]
1357    fn http_tool_config_no_auth_omits_authorization_header() {
1358        let cfg = http_tool_config(
1359            "GET",
1360            "https://example.com",
1361            &HashMap::new(),
1362            &HashMap::new(),
1363            None,
1364            None,
1365        );
1366        let hdrs = cfg.config["headers"].as_object().unwrap();
1367        assert!(!hdrs.contains_key("Authorization"));
1368    }
1369
1370    #[test]
1371    fn reshape_http_result_extracts_envelope() {
1372        let mut result = ToolResult::success(serde_json::json!({
1373            "status_code": 200,
1374            "headers": {},
1375            "body": {"ok": true},
1376        }));
1377        result.exit_code = Some(0);
1378        let outcome = reshape_http_result(result).unwrap();
1379        let parsed: serde_json::Value =
1380            serde_json::from_str(outcome.result.as_deref().unwrap()).unwrap();
1381        assert_eq!(parsed["status"], 200);
1382        assert_eq!(parsed["body"], serde_json::json!({"ok": true}));
1383    }
1384
1385    #[test]
1386    fn reshape_http_result_preserves_4xx_envelope_without_erroring() {
1387        // CLI contract: HTTP error statuses come back inside the
1388        // `{status, body}` envelope, NOT as anyhow::Error.  Only
1389        // network-transport failures bubble up.
1390        let mut result = ToolResult {
1391            status: ToolStatus::Error,
1392            data: Some(serde_json::json!({
1393                "status_code": 404,
1394                "headers": {},
1395                "body": {"error": "not found"},
1396            })),
1397            error: Some("HTTP 404 response".into()),
1398            stdout: None,
1399            stderr: None,
1400            exit_code: Some(1),
1401            duration_ms: Some(5),
1402            pending_callback: None,
1403        };
1404        result.exit_code = Some(1);
1405        let outcome = reshape_http_result(result).unwrap();
1406        let parsed: serde_json::Value =
1407            serde_json::from_str(outcome.result.as_deref().unwrap()).unwrap();
1408        assert_eq!(parsed["status"], 404);
1409        assert_eq!(parsed["body"], serde_json::json!({"error": "not found"}));
1410    }
1411
1412    #[tokio::test]
1413    async fn resolve_auth_to_bearer_rejects_unknown_provider() {
1414        let cfg = CliAuthConfig {
1415            provider: "azure".into(),
1416            scopes: vec![],
1417        };
1418        let err = resolve_auth_to_bearer(&cfg).await.unwrap_err();
1419        assert!(err.to_string().contains("unsupported auth provider"));
1420    }
1421
1422    // ---- PR-2c-6 — Tool::DuckDb bridge integration -------------------
1423
1424    #[test]
1425    fn duckdb_tool_config_emits_noetl_tools_schema() {
1426        let cfg = duckdb_tool_config(
1427            ":memory:",
1428            "SELECT 1",
1429            &["arg1".to_string()],
1430        );
1431        assert_eq!(cfg.kind, "duckdb");
1432        assert_eq!(cfg.config["db_path"], ":memory:");
1433        assert_eq!(cfg.config["query"], "SELECT 1");
1434        assert_eq!(cfg.config["as_objects"], true);
1435        assert_eq!(
1436            cfg.config["params"],
1437            serde_json::json!([serde_json::Value::String("arg1".into())])
1438        );
1439    }
1440
1441    #[test]
1442    fn to_tools_config_duckdb_carries_path_and_query() {
1443        let tool = Tool::DuckDb {
1444            db: "warehouse.db".into(),
1445            query: Some("SELECT count(*) FROM orders".into()),
1446            params: vec![],
1447        };
1448        let cfg = to_tools_config(&tool);
1449        assert_eq!(cfg.kind, "duckdb");
1450        assert_eq!(cfg.config["db_path"], "warehouse.db");
1451        assert_eq!(cfg.config["query"], "SELECT count(*) FROM orders");
1452        assert_eq!(cfg.config["as_objects"], true);
1453    }
1454
1455    #[test]
1456    fn to_tools_config_duckdb_missing_query_becomes_empty_string() {
1457        let tool = Tool::DuckDb {
1458            db: ":memory:".into(),
1459            query: None,
1460            params: vec![],
1461        };
1462        let cfg = to_tools_config(&tool);
1463        assert_eq!(cfg.config["query"], "");
1464    }
1465
1466    #[test]
1467    fn reshape_duckdb_result_select_returns_rows_array() {
1468        let result = ToolResult::success(serde_json::json!({
1469            "columns": ["id", "name"],
1470            "rows": [
1471                {"id": 1, "name": "alice"},
1472                {"id": 2, "name": "bob"},
1473            ],
1474            "row_count": 2
1475        }));
1476        let outcome = reshape_duckdb_result(result).unwrap();
1477        let parsed: serde_json::Value =
1478            serde_json::from_str(outcome.result.as_deref().unwrap()).unwrap();
1479        let arr = parsed.as_array().expect("result is an array");
1480        assert_eq!(arr.len(), 2);
1481        assert_eq!(arr[0]["id"], 1);
1482        assert_eq!(arr[0]["name"], "alice");
1483        assert_eq!(arr[1]["name"], "bob");
1484    }
1485
1486    #[test]
1487    fn reshape_duckdb_result_select_empty_returns_empty_array() {
1488        let result = ToolResult::success(serde_json::json!({
1489            "columns": ["id"],
1490            "rows": [],
1491            "row_count": 0
1492        }));
1493        let outcome = reshape_duckdb_result(result).unwrap();
1494        let parsed: serde_json::Value =
1495            serde_json::from_str(outcome.result.as_deref().unwrap()).unwrap();
1496        assert_eq!(parsed.as_array().unwrap().len(), 0);
1497    }
1498
1499    #[test]
1500    fn reshape_duckdb_result_non_select_returns_status_envelope() {
1501        let result = ToolResult::success(serde_json::json!({
1502            "affected_rows": 3
1503        }));
1504        let outcome = reshape_duckdb_result(result).unwrap();
1505        // CLI returned the literal `{"status": "ok"}` string for
1506        // non-SELECT queries; `affected_rows` is intentionally
1507        // dropped (CLI never exposed it, so playbooks can't depend
1508        // on it).
1509        assert_eq!(outcome.result.as_deref(), Some(r#"{"status": "ok"}"#));
1510    }
1511
1512    #[tokio::test]
1513    async fn dispatch_duckdb_select_returns_rows_array() {
1514        let vars = empty_vars();
1515        let bridge = bridge_ctx(&vars);
1516        let tool = Tool::DuckDb {
1517            db: ":memory:".into(),
1518            query: Some("SELECT 1 AS num, 'hello' AS msg".into()),
1519            params: vec![],
1520        };
1521        let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
1522        let parsed: serde_json::Value =
1523            serde_json::from_str(outcome.result.as_deref().unwrap()).unwrap();
1524        let arr = parsed.as_array().expect("result is an array");
1525        assert_eq!(arr.len(), 1);
1526        assert_eq!(arr[0]["num"], 1);
1527        assert_eq!(arr[0]["msg"], "hello");
1528    }
1529
1530    #[tokio::test]
1531    async fn dispatch_duckdb_missing_query_returns_empty_outcome() {
1532        // Mirrors the CLI arm's `if let Some(query_str) = query` guard:
1533        // a Tool::DuckDb with no query falls through to None.
1534        let vars = empty_vars();
1535        let bridge = bridge_ctx(&vars);
1536        let tool = Tool::DuckDb {
1537            db: ":memory:".into(),
1538            query: None,
1539            params: vec![],
1540        };
1541        let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
1542        assert!(outcome.result.is_none());
1543    }
1544
1545    #[tokio::test]
1546    async fn dispatch_duckdb_empty_query_returns_empty_outcome() {
1547        let vars = empty_vars();
1548        let bridge = bridge_ctx(&vars);
1549        let tool = Tool::DuckDb {
1550            db: ":memory:".into(),
1551            query: Some("   ".into()), // whitespace only
1552            params: vec![],
1553        };
1554        let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
1555        assert!(outcome.result.is_none());
1556    }
1557
1558    // ---- PR-2c-7 — sub-playbook variable preparation ------------------
1559
1560    #[test]
1561    fn prepare_sub_playbook_vars_passes_parent_vars_through() {
1562        let parent: HashMap<String, String> =
1563            [("vars.timeout".into(), "30".into())].into();
1564        let sub = prepare_sub_playbook_vars(
1565            &parent,
1566            &HashMap::new(),
1567            &HashMap::new(),
1568            |t| Ok(t.to_string()),
1569        )
1570        .unwrap();
1571        assert_eq!(sub.get("vars.timeout"), Some(&"30".to_string()));
1572    }
1573
1574    #[test]
1575    fn prepare_sub_playbook_vars_v2_input_takes_precedence_over_v1_args() {
1576        let parent: HashMap<String, String> = HashMap::new();
1577        let mut input = HashMap::new();
1578        input.insert(
1579            "region".into(),
1580            serde_yaml::Value::String("us-east-1".into()),
1581        );
1582        let mut args = HashMap::new();
1583        args.insert("region".into(), "us-west-1".into());
1584
1585        let sub = prepare_sub_playbook_vars(&parent, &args, &input, |t| {
1586            Ok(t.to_string())
1587        })
1588        .unwrap();
1589        // input wins; args ignored when input is non-empty.
1590        assert_eq!(sub.get("workload.region"), Some(&"us-east-1".to_string()));
1591    }
1592
1593    #[test]
1594    fn prepare_sub_playbook_vars_v1_args_used_when_input_empty() {
1595        let parent: HashMap<String, String> = HashMap::new();
1596        let mut args = HashMap::new();
1597        args.insert("tier".into(), "prod".into());
1598        let sub = prepare_sub_playbook_vars(
1599            &parent,
1600            &args,
1601            &HashMap::new(),
1602            |t| Ok(t.to_string()),
1603        )
1604        .unwrap();
1605        assert_eq!(sub.get("workload.tier"), Some(&"prod".to_string()));
1606    }
1607
1608    #[test]
1609    fn prepare_sub_playbook_vars_renders_input_templates() {
1610        let parent: HashMap<String, String> = HashMap::new();
1611        let mut input = HashMap::new();
1612        input.insert(
1613            "url".into(),
1614            serde_yaml::Value::String("{{base}}/api".into()),
1615        );
1616        let sub = prepare_sub_playbook_vars(
1617            &parent,
1618            &HashMap::new(),
1619            &input,
1620            |t| Ok(t.replace("{{base}}", "https://example.com")),
1621        )
1622        .unwrap();
1623        assert_eq!(
1624            sub.get("workload.url"),
1625            Some(&"https://example.com/api".to_string())
1626        );
1627    }
1628
1629    #[test]
1630    fn prepare_sub_playbook_vars_coerces_yaml_numbers_and_bools() {
1631        let parent: HashMap<String, String> = HashMap::new();
1632        let mut input = HashMap::new();
1633        input.insert(
1634            "timeout".into(),
1635            serde_yaml::Value::Number(serde_yaml::Number::from(30)),
1636        );
1637        input.insert("verbose".into(), serde_yaml::Value::Bool(true));
1638        let sub = prepare_sub_playbook_vars(
1639            &parent,
1640            &HashMap::new(),
1641            &input,
1642            |t| Ok(t.to_string()),
1643        )
1644        .unwrap();
1645        assert_eq!(sub.get("workload.timeout"), Some(&"30".to_string()));
1646        assert_eq!(sub.get("workload.verbose"), Some(&"true".to_string()));
1647    }
1648
1649    #[test]
1650    fn prepare_sub_playbook_vars_passes_through_when_both_empty() {
1651        let parent: HashMap<String, String> = [(
1652            "workload.region".into(),
1653            "us-east-1".into(),
1654        )]
1655        .into();
1656        let sub = prepare_sub_playbook_vars(
1657            &parent,
1658            &HashMap::new(),
1659            &HashMap::new(),
1660            |t| Ok(t.to_string()),
1661        )
1662        .unwrap();
1663        // No input or args; parent vars come through unchanged.
1664        assert_eq!(sub.len(), 1);
1665        assert_eq!(
1666            sub.get("workload.region"),
1667            Some(&"us-east-1".to_string())
1668        );
1669    }
1670
1671    #[test]
1672    fn prepare_sub_playbook_vars_render_error_propagates() {
1673        let parent: HashMap<String, String> = HashMap::new();
1674        let mut input = HashMap::new();
1675        input.insert(
1676            "bad".into(),
1677            serde_yaml::Value::String("{{nope}}".into()),
1678        );
1679        let result = prepare_sub_playbook_vars(
1680            &parent,
1681            &HashMap::new(),
1682            &input,
1683            |_| Err(anyhow::anyhow!("render exploded")),
1684        );
1685        assert!(result.unwrap_err().to_string().contains("render exploded"));
1686    }
1687
1688    // ---- PR-2c-8 — Tool::Auth context updates -------------------------
1689
1690    #[test]
1691    fn auth_context_updates_includes_token_and_provider() {
1692        let updates = auth_context_updates("gcp", "tok-123", None);
1693        let map: HashMap<String, String> = updates.into_iter().collect();
1694        assert_eq!(map.get("auth.token"), Some(&"tok-123".to_string()));
1695        assert_eq!(map.get("auth.provider"), Some(&"gcp".to_string()));
1696        assert!(map.get("auth.project").is_none());
1697    }
1698
1699    #[test]
1700    fn auth_context_updates_includes_project_when_set() {
1701        let updates = auth_context_updates("adc", "t", Some("my-project"));
1702        let map: HashMap<String, String> = updates.into_iter().collect();
1703        assert_eq!(
1704            map.get("auth.project"),
1705            Some(&"my-project".to_string())
1706        );
1707        assert_eq!(map.get("auth.token"), Some(&"t".to_string()));
1708        assert_eq!(map.get("auth.provider"), Some(&"adc".to_string()));
1709    }
1710
1711    #[test]
1712    fn auth_context_updates_skips_empty_project() {
1713        let updates = auth_context_updates("gcp", "t", Some(""));
1714        let map: HashMap<String, String> = updates.into_iter().collect();
1715        assert!(map.get("auth.project").is_none());
1716    }
1717
1718    #[test]
1719    fn auth_context_updates_orders_project_before_token() {
1720        // The CLI's pre-PR-2c-8 inline arm set `auth.project` first,
1721        // then the token + provider after the auth call.  Preserve
1722        // that ordering so observable side-effects (logs, traces)
1723        // match.
1724        let updates = auth_context_updates("gcp", "t", Some("p"));
1725        assert_eq!(updates[0].0, "auth.project");
1726        assert_eq!(updates[1].0, "auth.token");
1727        assert_eq!(updates[2].0, "auth.provider");
1728    }
1729
1730    // ---- PR-2c-8 — Sink payload formatting + CSV ----------------------
1731
1732    #[test]
1733    fn format_sink_payload_json_passthrough() {
1734        let raw = r#"{"k": "v"}"#;
1735        let out = format_sink_payload(&SinkFormat::Json, raw).unwrap();
1736        assert_eq!(out, raw);
1737    }
1738
1739    #[test]
1740    fn format_sink_payload_yaml_converts_json_object() {
1741        let raw = r#"{"k": "v"}"#;
1742        let out = format_sink_payload(&SinkFormat::Yaml, raw).unwrap();
1743        let reparsed: serde_yaml::Value = serde_yaml::from_str(&out).unwrap();
1744        assert_eq!(reparsed["k"].as_str(), Some("v"));
1745    }
1746
1747    #[test]
1748    fn format_sink_payload_yaml_falls_back_when_not_json() {
1749        let raw = "not even close to json";
1750        let out = format_sink_payload(&SinkFormat::Yaml, raw).unwrap();
1751        assert_eq!(out, raw);
1752    }
1753
1754    #[test]
1755    fn format_sink_payload_csv_uses_json_to_csv() {
1756        let raw = r#"[{"a":1,"b":2},{"a":3,"b":4}]"#;
1757        let out = format_sink_payload(&SinkFormat::Csv, raw).unwrap();
1758        assert!(out.contains("a,b\n") || out.contains("b,a\n"));
1759        // Two data rows + header.
1760        assert_eq!(out.lines().count(), 3);
1761    }
1762
1763    #[test]
1764    fn json_to_csv_returns_input_for_non_array() {
1765        assert_eq!(json_to_csv("not json").unwrap(), "not json");
1766        assert_eq!(json_to_csv(r#"{"k":"v"}"#).unwrap(), r#"{"k":"v"}"#);
1767    }
1768
1769    #[test]
1770    fn json_to_csv_returns_input_for_empty_array() {
1771        assert_eq!(json_to_csv("[]").unwrap(), "[]");
1772    }
1773
1774    #[test]
1775    fn json_to_csv_emits_header_and_rows_for_object_array() {
1776        let raw = r#"[{"name":"alice","age":30},{"name":"bob","age":25}]"#;
1777        let csv = json_to_csv(raw).unwrap();
1778        let lines: Vec<&str> = csv.lines().collect();
1779        assert_eq!(lines.len(), 3);
1780        // Header derived from first object's keys (order
1781        // preserved by serde_json::Map).
1782        assert!(lines[0] == "name,age" || lines[0] == "age,name");
1783        // Each subsequent line should contain both values.
1784        assert!(lines[1].contains("alice") && lines[1].contains("30"));
1785        assert!(lines[2].contains("bob") && lines[2].contains("25"));
1786    }
1787
1788    #[test]
1789    fn json_to_csv_quotes_strings_with_commas() {
1790        let raw = r#"[{"label":"a, b","n":1}]"#;
1791        let csv = json_to_csv(raw).unwrap();
1792        // Quoted field with the comma preserved inside.
1793        assert!(csv.contains("\"a, b\""), "csv: {csv}");
1794    }
1795
1796    #[test]
1797    fn json_to_csv_doubles_embedded_quotes() {
1798        let raw = r#"[{"q":"she said \"hi\""}]"#;
1799        let csv = json_to_csv(raw).unwrap();
1800        // RFC-4180-style: embedded `"` doubled, whole field quoted.
1801        assert!(csv.contains("\"she said \"\"hi\"\"\""), "csv: {csv}");
1802    }
1803
1804    #[test]
1805    fn json_to_csv_missing_field_emits_empty() {
1806        let raw = r#"[{"a":1,"b":2},{"a":3}]"#; // second row missing `b`
1807        let csv = json_to_csv(raw).unwrap();
1808        let lines: Vec<&str> = csv.lines().collect();
1809        // The second data row should end with a trailing comma or
1810        // have an empty field for `b`.
1811        assert!(
1812            lines[2].ends_with(",") || lines[2].contains(",,"),
1813            "csv: {csv}"
1814        );
1815    }
1816
1817    #[test]
1818    fn to_tools_config_rhai_carries_code() {
1819        let tool = Tool::Rhai {
1820            code: "let x = 1; x + 1".into(),
1821            args: HashMap::new(),
1822        };
1823        let cfg = to_tools_config(&tool);
1824        assert_eq!(cfg.kind, "rhai");
1825        assert_eq!(cfg.config["code"], "let x = 1; x + 1");
1826    }
1827
1828    #[test]
1829    fn to_tools_config_sink_emits_typed_target() {
1830        let tool = Tool::Sink {
1831            target: SinkTarget::File {
1832                path: "/tmp/out.json".into(),
1833            },
1834            format: SinkFormat::Json,
1835        };
1836        let cfg = to_tools_config(&tool);
1837        assert_eq!(cfg.kind, "sink");
1838        assert_eq!(cfg.config["target"]["type"], "file");
1839        assert_eq!(cfg.config["target"]["path"], "/tmp/out.json");
1840        assert_eq!(cfg.config["format"], "json");
1841    }
1842
1843    #[test]
1844    fn from_tools_result_success_returns_data_string() {
1845        let result = ToolResult::success(serde_json::Value::String("hello".into()));
1846        let outcome = from_tools_result(result).unwrap();
1847        assert_eq!(outcome.result, Some("hello".into()));
1848    }
1849
1850    #[test]
1851    fn from_tools_result_success_serialises_non_string_data() {
1852        let result = ToolResult::success(serde_json::json!({"k": "v"}));
1853        let outcome = from_tools_result(result).unwrap();
1854        assert_eq!(outcome.result, Some(r#"{"k":"v"}"#.into()));
1855    }
1856
1857    #[test]
1858    fn from_tools_result_success_falls_back_to_stdout() {
1859        let mut result = ToolResult::success(serde_json::Value::Null);
1860        result.data = None;
1861        result.stdout = Some("script output".into());
1862        let outcome = from_tools_result(result).unwrap();
1863        assert_eq!(outcome.result, Some("script output".into()));
1864    }
1865
1866    #[test]
1867    fn from_tools_result_error_propagates_message() {
1868        let result = ToolResult::error("connection refused");
1869        let err = from_tools_result(result).unwrap_err();
1870        assert!(err.to_string().contains("connection refused"));
1871    }
1872
1873    // PR-2c-8 removed the
1874    // `dispatch_via_registry_returns_empty_for_unwired_kind` test:
1875    // every Tool variant now either dispatches through the registry
1876    // (Rhai/Shell/Http/DuckDb), bails with a § H.10 finding
1877    // (Playbook/Auth/Sink), or bails as unsupported.  See the
1878    // per-variant dispatch tests for the wired kinds and the bail
1879    // tests for Playbook/Auth/Sink/Unsupported.
1880
1881    #[tokio::test]
1882    async fn dispatch_auth_bails_pointing_at_helper() {
1883        // PR-2c-8: Tool::Auth has no bridge dispatch path.  The
1884        // bridge bails with a message pointing at
1885        // `resolve_auth_to_bearer` + `auth_context_updates` so
1886        // misuse is loud rather than silent.
1887        let vars = empty_vars();
1888        let bridge = bridge_ctx(&vars);
1889        let tool = Tool::Auth {
1890            provider: "adc".into(),
1891            scopes: vec![],
1892            project: None,
1893        };
1894        let err = dispatch_via_registry(&tool, &bridge).await.unwrap_err();
1895        let msg = err.to_string();
1896        assert!(
1897            msg.contains("Tool::Auth")
1898                && msg.contains("resolve_auth_to_bearer")
1899                && msg.contains("auth_context_updates"),
1900            "error should point at the helpers: {msg}"
1901        );
1902    }
1903
1904    #[tokio::test]
1905    async fn dispatch_sink_bails_pointing_at_helper() {
1906        // PR-2c-8: Tool::Sink has no bridge dispatch path either —
1907        // noetl-tools' TransferTool is database-to-database only.
1908        // The bridge bails with a message pointing at
1909        // `format_sink_payload` for format conversion.
1910        let vars = empty_vars();
1911        let bridge = bridge_ctx(&vars);
1912        let tool = Tool::Sink {
1913            target: crate::playbook::SinkTarget::File {
1914                path: "/tmp/out.json".into(),
1915            },
1916            format: SinkFormat::Json,
1917        };
1918        let err = dispatch_via_registry(&tool, &bridge).await.unwrap_err();
1919        let msg = err.to_string();
1920        assert!(
1921            msg.contains("Tool::Sink") && msg.contains("format_sink_payload"),
1922            "error should point at the helper: {msg}"
1923        );
1924    }
1925
1926    #[tokio::test]
1927    async fn dispatch_playbook_bails_with_h10_finding() {
1928        // PR-2c-7: `Tool::Playbook` is not bridgeable.  Make sure
1929        // the dispatch arm bails with a descriptive error rather
1930        // than silently returning an empty outcome.
1931        let vars = empty_vars();
1932        let bridge = bridge_ctx(&vars);
1933        let tool = Tool::Playbook {
1934            path: "sub.yaml".into(),
1935            args: HashMap::new(),
1936            input: HashMap::new(),
1937        };
1938        let err = dispatch_via_registry(&tool, &bridge).await.unwrap_err();
1939        let msg = err.to_string();
1940        assert!(
1941            msg.contains("Tool::Playbook")
1942                && msg.contains("not bridgeable")
1943                && msg.contains("§ H.10"),
1944            "error message should explain the § H.10 finding: {msg}"
1945        );
1946    }
1947
1948    // ---- PR-2c-4 — Tool::Shell bridge integration --------------------
1949
1950    #[tokio::test]
1951    async fn dispatch_shell_single_command_returns_stdout() {
1952        let vars = empty_vars();
1953        let bridge = bridge_ctx(&vars);
1954        let tool = Tool::Shell {
1955            cmds: CmdsList::Single("echo bridged".into()),
1956        };
1957        let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
1958        // The bridge trims the trailing newline that `echo` adds so
1959        // the step result matches the CLI's pre-PR-2c-4 contract
1960        // (per-line stdout joined without trailing whitespace).
1961        assert_eq!(outcome.result, Some("bridged".into()));
1962    }
1963
1964    #[tokio::test]
1965    async fn dispatch_shell_multiple_returns_last_command_stdout() {
1966        // CLI semantic: with CmdsList::Multiple, each command runs
1967        // in its own bash invocation; the step result is the last
1968        // command's stdout.
1969        let vars = empty_vars();
1970        let bridge = bridge_ctx(&vars);
1971        let tool = Tool::Shell {
1972            cmds: CmdsList::Multiple(vec![
1973                "echo first".into(),
1974                "echo second".into(),
1975                "echo third".into(),
1976            ]),
1977        };
1978        let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
1979        assert_eq!(outcome.result, Some("third".into()));
1980    }
1981
1982    #[tokio::test]
1983    async fn dispatch_shell_failure_propagates_error() {
1984        let vars = empty_vars();
1985        let bridge = bridge_ctx(&vars);
1986        let tool = Tool::Shell {
1987            cmds: CmdsList::Single("exit 7".into()),
1988        };
1989        let err = dispatch_via_registry(&tool, &bridge).await.unwrap_err();
1990        // noetl-tools' shell tool reports non-zero exit codes by
1991        // surfacing ToolResult.status == Error or by returning
1992        // result with exit_code set; either way the bridge's
1993        // from_tools_result converts that into an anyhow::Error.
1994        assert!(
1995            err.to_string().contains("shell")
1996                || err.to_string().contains("exit")
1997                || err.to_string().contains("failed"),
1998            "error message: {}",
1999            err
2000        );
2001    }
2002
2003    #[tokio::test]
2004    async fn dispatch_shell_single_with_newlines_runs_each_line_independently() {
2005        // CLI semantic: CmdsList::Single splits on newlines into
2006        // separate bash invocations.  This means `cd /tmp` on one
2007        // line doesn't change the cwd of the next line.
2008        let vars = empty_vars();
2009        let bridge = bridge_ctx(&vars);
2010        let tool = Tool::Shell {
2011            cmds: CmdsList::Single("echo first_line\necho second_line".into()),
2012        };
2013        let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
2014        assert_eq!(outcome.result, Some("second_line".into()));
2015    }
2016
2017    #[tokio::test]
2018    async fn dispatch_via_registry_unsupported_errors() {
2019        let vars = empty_vars();
2020        let bridge = bridge_ctx(&vars);
2021        let tool = Tool::Unsupported;
2022        let err = dispatch_via_registry(&tool, &bridge).await.unwrap_err();
2023        assert!(err.to_string().contains("unsupported"));
2024    }
2025
2026    // ---- PR-2c-3 — Tool::Rhai bridge integration ---------------------
2027
2028    #[tokio::test]
2029    async fn dispatch_rhai_evaluates_simple_arithmetic() {
2030        let vars = empty_vars();
2031        let bridge = bridge_ctx(&vars);
2032        let tool = Tool::Rhai {
2033            code: "let x = 40; let y = 2; (x + y).to_string()".into(),
2034            args: HashMap::new(),
2035        };
2036        let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
2037        assert_eq!(outcome.result, Some("42".into()));
2038    }
2039
2040    #[tokio::test]
2041    async fn dispatch_rhai_reads_workload_variable_via_scope() {
2042        // `to_tools_context_for_rhai` groups the CLI's flat
2043        // `workload.region` key into a nested `workload` Map.
2044        // Rhai's `workload.region` then resolves as field access.
2045        let vars: HashMap<String, String> =
2046            [("workload.region".into(), "us-west-1".into())].into();
2047        let bridge = bridge_ctx(&vars);
2048        let tool = Tool::Rhai {
2049            code: r#"workload.region.to_string()"#.into(),
2050            args: HashMap::new(),
2051        };
2052        let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
2053        assert_eq!(outcome.result, Some("us-west-1".into()));
2054    }
2055
2056    #[tokio::test]
2057    async fn dispatch_rhai_reads_step_result_via_field_access() {
2058        // Step results in the CLI surface as `<step>.result` keys.
2059        // The nested-shape adapter groups them under a step-named map.
2060        let vars: HashMap<String, String> = [
2061            ("check_health.result".into(), "ok".into()),
2062            ("check_health.status".into(), "200".into()),
2063        ]
2064        .into();
2065        let bridge = bridge_ctx(&vars);
2066        let tool = Tool::Rhai {
2067            code: r#"check_health.result.to_string()"#.into(),
2068            args: HashMap::new(),
2069        };
2070        let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
2071        assert_eq!(outcome.result, Some("ok".into()));
2072    }
2073
2074    #[test]
2075    fn to_tools_context_for_rhai_groups_workload_prefix() {
2076        let vars: HashMap<String, String> = [
2077            ("workload.region".into(), "us-west-1".into()),
2078            ("workload.tier".into(), "prod".into()),
2079            ("vars.timeout".into(), "30".into()),
2080            ("step_a.result".into(), "done".into()),
2081            ("toplevel".into(), "kept_at_root".into()),
2082        ]
2083        .into();
2084        let bridge = bridge_ctx(&vars);
2085        let ctx = to_tools_context_for_rhai(&bridge);
2086
2087        let workload = ctx
2088            .variables
2089            .get("workload")
2090            .expect("workload group should exist")
2091            .as_object()
2092            .expect("workload should be an object");
2093        assert_eq!(workload.get("region"), Some(&serde_json::json!("us-west-1")));
2094        assert_eq!(workload.get("tier"), Some(&serde_json::json!("prod")));
2095
2096        let vars_map = ctx.variables.get("vars").and_then(|v| v.as_object()).unwrap();
2097        assert_eq!(vars_map.get("timeout"), Some(&serde_json::json!("30")));
2098
2099        let step_a = ctx.variables.get("step_a").and_then(|v| v.as_object()).unwrap();
2100        assert_eq!(step_a.get("result"), Some(&serde_json::json!("done")));
2101
2102        assert_eq!(
2103            ctx.variables.get("toplevel"),
2104            Some(&serde_json::json!("kept_at_root"))
2105        );
2106    }
2107
2108    #[tokio::test]
2109    async fn dispatch_rhai_string_literal_returns_unquoted() {
2110        let vars = empty_vars();
2111        let bridge = bridge_ctx(&vars);
2112        let tool = Tool::Rhai {
2113            code: r#""hello world""#.into(),
2114            args: HashMap::new(),
2115        };
2116        let outcome = dispatch_via_registry(&tool, &bridge).await.unwrap();
2117        // noetl-tools' RhaiTool returns the result through ToolResult.data
2118        // as a JSON value; for string results that means a JSON-quoted
2119        // string.  from_tools_result strips the JSON quotes when data
2120        // is a Value::String.
2121        assert_eq!(outcome.result, Some("hello world".into()));
2122    }
2123
2124    // ---- Compiler proof: AuthConfig from playbook is still constructable
2125    // even though we don't pass it through to the bridge yet.  Locks in
2126    // the field surface so PR-2c-5 / PR-2c-8 see a deliberate gap, not
2127    // a missing type.
2128    #[test]
2129    fn cli_auth_config_constructs() {
2130        let _auth = CliAuthConfig {
2131            provider: "adc".into(),
2132            scopes: vec!["https://www.googleapis.com/auth/cloud-platform".into()],
2133        };
2134    }
2135
2136    // ---- gcs_upload helper (R-3, noetl/ai-meta#31) ------------------
2137    //
2138    // These tests exercise `gcs_upload_with_store` — the inner path
2139    // shared by production (real GCS) and test (InMemory) callers.
2140    // The `gcs_upload` function (which builds the real GCS store from
2141    // env) is NOT tested here — real GCS credentials are not available
2142    // in CI.  The call shape (bucket → builder → store → put) is the
2143    // same in both paths; the InMemory tests lock in the object_store
2144    // API surface and the helper's error-handling contract.
2145
2146    #[tokio::test]
2147    async fn gcs_upload_with_store_writes_data_to_object_store() {
2148        // Verifies the happy path: data is uploaded and can be read
2149        // back from the same InMemory store — proving gcs_upload_with_store
2150        // calls ObjectStore::put with the correct path + payload.
2151        use object_store::memory::InMemory;
2152        use object_store::ObjectStore;
2153
2154        let store = Arc::new(InMemory::new());
2155        gcs_upload_with_store(Arc::clone(&store) as Arc<dyn ObjectStore>, "output/data.json", r#"{"k":"v"}"#)
2156            .await
2157            .expect("upload should succeed");
2158
2159        let path = StorePath::from("output/data.json");
2160        let retrieved = store.get(&path).await.expect("should read back uploaded object");
2161        let body = retrieved.bytes().await.expect("should get bytes");
2162        assert_eq!(body, bytes::Bytes::from(r#"{"k":"v"}"#));
2163    }
2164
2165    #[tokio::test]
2166    async fn gcs_upload_with_store_overwrites_existing_key() {
2167        // Second upload to the same key must overwrite the first — the
2168        // InMemory store's put is idempotent on the key, which is the
2169        // same contract the real GCS object-level PUT provides.
2170        use object_store::memory::InMemory;
2171        use object_store::ObjectStore;
2172
2173        let store = Arc::new(InMemory::new());
2174        gcs_upload_with_store(Arc::clone(&store) as Arc<dyn ObjectStore>, "data.csv", "first").await.unwrap();
2175        gcs_upload_with_store(Arc::clone(&store) as Arc<dyn ObjectStore>, "data.csv", "second").await.unwrap();
2176
2177        let path = StorePath::from("data.csv");
2178        let body = store.get(&path).await.unwrap().bytes().await.unwrap();
2179        assert_eq!(body, bytes::Bytes::from("second"));
2180    }
2181
2182    #[tokio::test]
2183    async fn gcs_upload_with_store_handles_nested_key_paths() {
2184        // GCS object keys can contain slashes (they are logical paths
2185        // within a bucket, not filesystem paths).  StorePath should
2186        // preserve the full slash-separated key.
2187        use object_store::memory::InMemory;
2188        use object_store::ObjectStore;
2189
2190        let store = Arc::new(InMemory::new());
2191        gcs_upload_with_store(
2192            Arc::clone(&store) as Arc<dyn ObjectStore>,
2193            "runs/2026-06-01/output/result.json",
2194            "[]",
2195        )
2196        .await
2197        .unwrap();
2198
2199        let path = StorePath::from("runs/2026-06-01/output/result.json");
2200        let body = store.get(&path).await.unwrap().bytes().await.unwrap();
2201        assert_eq!(body, bytes::Bytes::from("[]"));
2202    }
2203
2204    #[tokio::test]
2205    async fn gcs_upload_with_store_uploads_empty_string() {
2206        // An empty payload is a valid GCS object — the helper must not
2207        // short-circuit or error on empty data.
2208        use object_store::memory::InMemory;
2209        use object_store::ObjectStore;
2210
2211        let store = Arc::new(InMemory::new());
2212        gcs_upload_with_store(Arc::clone(&store) as Arc<dyn ObjectStore>, "empty.txt", "").await.unwrap();
2213
2214        let path = StorePath::from("empty.txt");
2215        let body = store.get(&path).await.unwrap().bytes().await.unwrap();
2216        assert_eq!(body.len(), 0);
2217    }
2218}