use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use crate::context::ExecutionContext;
use crate::error::ToolError;
use crate::registry::{Tool, ToolConfig};
use crate::result::ToolResult;
use crate::template::TemplateEngine;
/// Configuration for the `result_fetch` tool, deserialized from the rendered
/// tool config.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResultFetchConfig {
/// Result reference to resolve. Sent as the `ref` query parameter on the HTTP
/// path and handed to the Flight resolver on the Flight path.
pub r#ref: String,
/// Backend to try first; falls back to `default_prefer()` when the key is
/// omitted from the config.
#[serde(default = "default_prefer")]
pub prefer: BackendPreference,
/// Explicit Flight endpoint. When absent, the tool derives one from the
/// execution context's server URL. Left out of serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub flight_endpoint: Option<String>,
}
/// Which backend `result_fetch` should try first.
///
/// Serialized in lowercase (`"flight"` / `"http"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum BackendPreference {
/// Try Flight first; the tool falls back to HTTP on non-tabular or transport
/// failures.
Flight,
/// Use the HTTP resolve endpoint only.
Http,
}
/// Serde default for `ResultFetchConfig::prefer`: Flight is tried first when
/// the config does not specify a preference.
fn default_prefer() -> BackendPreference {
BackendPreference::Flight
}
/// Tool that resolves a result reference, either through the Flight resolver
/// or through the server's `/api/result/resolve` HTTP endpoint.
pub struct ResultFetchTool {
// Client used for the `/api/result/resolve` requests.
http_client: reqwest::Client,
// Renders templated values in the tool config before it is deserialized.
template_engine: TemplateEngine,
}
impl ResultFetchTool {
/// Builds a tool with a fresh template engine and an HTTP client that uses a
/// 30-second request timeout.
///
/// If the client builder fails, the `Default` reqwest client is used instead.
pub fn new() -> Self {
    let request_timeout = Duration::from_secs(30);
    let http_client = reqwest::Client::builder()
        .timeout(request_timeout)
        .build()
        .unwrap_or_default();
    let template_engine = TemplateEngine::new();

    Self {
        http_client,
        template_engine,
    }
}
/// Renders the raw tool config against the execution context and deserializes
/// the result into a [`ResultFetchConfig`].
///
/// # Errors
///
/// Propagates template rendering failures, and returns
/// `ToolError::Configuration` when the rendered value does not match the
/// expected config shape.
fn parse_config(
    &self,
    config: &ToolConfig,
    ctx: &ExecutionContext,
) -> Result<ResultFetchConfig, ToolError> {
    let render_scope = ctx.to_template_context();
    let rendered_config = self
        .template_engine
        .render_value(&config.config, &render_scope)?;

    match serde_json::from_value::<ResultFetchConfig>(rendered_config) {
        Ok(parsed) => Ok(parsed),
        Err(e) => Err(ToolError::Configuration(format!(
            "Invalid result_fetch config: {}",
            e
        ))),
    }
}
/// Derives the Flight (gRPC) endpoint from the HTTP `server_url`.
///
/// The `https://` / `http://` scheme is dropped, trailing `/` characters are
/// trimmed, a `:8082` port suffix is rewritten to `:8083`, and the result is
/// prefixed with `grpc://`. Any other host/port is passed through unchanged.
/// (Presumably 8082/8083 are the server's default HTTP and Flight ports —
/// confirm against deployment config.)
fn derive_flight_endpoint(server_url: &str) -> String {
    let without_scheme = server_url
        .strip_prefix("https://")
        .or_else(|| server_url.strip_prefix("http://"))
        .unwrap_or(server_url);

    // `fetch_via_http` tolerates a trailing slash on the server URL, so this
    // must too: without trimming, `http://host:8082/` would miss the `:8082`
    // suffix check and the derived endpoint would target the HTTP port.
    let authority = without_scheme.trim_end_matches('/');

    match authority.strip_suffix(":8082") {
        Some(host) => format!("grpc://{host}:8083"),
        None => format!("grpc://{authority}"),
    }
}
/// Resolves `cfg.ref` through the server's `/api/result/resolve` HTTP endpoint
/// and returns the JSON body as-is.
///
/// Trailing slashes on `ctx.server_url` are trimmed before the path is
/// appended.
///
/// # Errors
///
/// Returns `ToolError::Http` when the request cannot be sent, when the server
/// answers with a non-success status (the status code and response body are
/// included in the message), or when the body is not valid JSON.
async fn fetch_via_http(
    &self,
    cfg: &ResultFetchConfig,
    ctx: &ExecutionContext,
) -> Result<JsonValue, ToolError> {
    let base_url = ctx.server_url.trim_end_matches('/');
    let url = format!("{}/api/result/resolve", base_url);

    let request = self
        .http_client
        .get(&url)
        .query(&[("ref", cfg.r#ref.as_str())]);
    let response = match request.send().await {
        Ok(resp) => resp,
        Err(e) => return Err(ToolError::Http(format!("HTTP fetch failed: {e}"))),
    };

    let status = response.status();
    if status.is_success() {
        return response
            .json::<JsonValue>()
            .await
            .map_err(|e| ToolError::Http(format!("Failed to parse JSON response: {e}")));
    }

    // Best effort: an unreadable error body is reported as an empty string.
    let body = response.text().await.unwrap_or_default();
    Err(ToolError::Http(format!(
        "/api/result/resolve returned {}: {}",
        status.as_u16(),
        body
    )))
}
/// Resolves `cfg.ref` through the Flight resolver and wraps the rows in a
/// `{"data": {"rows", "columns", "row_count"}, "status": "success"}` JSON
/// envelope.
///
/// The endpoint is `cfg.flight_endpoint` when set, otherwise it is derived
/// from `ctx.server_url`. Column names are taken from the keys of the first
/// row only; an empty result (or a first row that is not a JSON object)
/// yields an empty column list.
///
/// # Errors
///
/// Connection failures are reported as `FlightFetchError::Transport`; errors
/// from the resolver are translated variant-for-variant.
async fn fetch_via_flight(
    &self,
    cfg: &ResultFetchConfig,
    ctx: &ExecutionContext,
) -> Result<JsonValue, FlightFetchError> {
    let endpoint = match &cfg.flight_endpoint {
        Some(explicit) => explicit.clone(),
        None => Self::derive_flight_endpoint(&ctx.server_url),
    };

    let resolver = match noetl_arrow_flight_client::FlightResolver::connect(&endpoint).await {
        Ok(connected) => connected,
        Err(e) => {
            return Err(FlightFetchError::Transport(format!(
                "connect to Flight endpoint {endpoint}: {e}"
            )));
        }
    };

    // Translate the client crate's error into this module's own error type.
    let rows = resolver
        .resolve_rows(&cfg.r#ref)
        .await
        .map_err(|err| match err {
            noetl_arrow_flight_client::FlightError::NonTabular { ref_uri, message } => {
                FlightFetchError::NonTabular { ref_uri, message }
            }
            noetl_arrow_flight_client::FlightError::Server(msg) => FlightFetchError::Server(msg),
            noetl_arrow_flight_client::FlightError::Transport(msg) => {
                FlightFetchError::Transport(msg)
            }
        })?;

    let columns: Vec<String> = match rows.first().and_then(|row| row.as_object()) {
        Some(fields) => fields.keys().cloned().collect(),
        None => Vec::new(),
    };

    Ok(serde_json::json!({
        "data": {
            "rows": rows,
            "columns": columns,
            "row_count": rows.len(),
        },
        "status": "success",
    }))
}
}
/// Failure modes of the Flight fetch path, kept distinct so `execute` can
/// decide per variant whether to fall back to HTTP.
#[derive(Debug)]
enum FlightFetchError {
// Flight reported the referenced result as non-tabular; `execute` falls back to HTTP.
NonTabular { ref_uri: String, message: String },
// Connecting or talking to the Flight endpoint failed; `execute` falls back to HTTP.
Transport(String),
// The Flight server returned an error; `execute` surfaces it without falling back.
Server(String),
}
impl Default for ResultFetchTool {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Tool for ResultFetchTool {
    /// Registry name of this tool.
    fn name(&self) -> &'static str {
        "result_fetch"
    }

    /// Resolves the configured result reference and returns it as a successful
    /// `ToolResult` annotated with the elapsed time in milliseconds.
    ///
    /// With `prefer: http` only the HTTP endpoint is used. With `prefer: flight`
    /// the Flight path is tried first; non-tabular and transport failures fall
    /// back to HTTP, while a Flight server error is returned to the caller.
    ///
    /// # Errors
    ///
    /// Returns config parsing errors, HTTP fetch errors, or `ToolError::Http`
    /// wrapping a Flight server error.
    async fn execute(
        &self,
        config: &ToolConfig,
        ctx: &ExecutionContext,
    ) -> Result<ToolResult, ToolError> {
        let cfg = self.parse_config(config, ctx)?;
        // The timer starts after config parsing, so it covers the fetch only.
        let started_at = std::time::Instant::now();

        tracing::debug!(
            ref_uri = %cfg.r#ref,
            prefer = ?cfg.prefer,
            server_url = %ctx.server_url,
            "Executing result_fetch",
        );

        // `None` means Flight was not requested and HTTP is used directly.
        let flight_attempt = match cfg.prefer {
            BackendPreference::Http => None,
            BackendPreference::Flight => Some(self.fetch_via_flight(&cfg, ctx).await),
        };

        let data = match flight_attempt {
            None => self.fetch_via_http(&cfg, ctx).await?,
            Some(Ok(payload)) => payload,
            Some(Err(FlightFetchError::Server(msg))) => {
                return Err(ToolError::Http(format!(
                    "Flight server error for ref {}: {}",
                    cfg.r#ref, msg
                )));
            }
            Some(Err(FlightFetchError::NonTabular { ref_uri, message })) => {
                tracing::debug!(
                    ref_uri = %ref_uri,
                    message = %message,
                    "Flight signalled non-tabular; falling back to HTTP",
                );
                self.fetch_via_http(&cfg, ctx).await?
            }
            Some(Err(FlightFetchError::Transport(msg))) => {
                tracing::warn!(
                    ref_uri = %cfg.r#ref,
                    error = %msg,
                    "Flight transport failed; falling back to HTTP",
                );
                self.fetch_via_http(&cfg, ctx).await?
            }
        };

        let elapsed_ms = started_at.elapsed().as_millis() as u64;
        Ok(ToolResult::success(data).with_duration(elapsed_ms))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn derive_flight_endpoint_swaps_port_8082_to_8083() {
        // (server URL, expected Flight endpoint)
        let cases = [
            (
                "http://noetl.noetl.svc.cluster.local:8082",
                "grpc://noetl.noetl.svc.cluster.local:8083",
            ),
            ("http://localhost:8082", "grpc://localhost:8083"),
            (
                "https://noetl.example.com:8082",
                "grpc://noetl.example.com:8083",
            ),
        ];
        for (server_url, expected) in cases {
            assert_eq!(ResultFetchTool::derive_flight_endpoint(server_url), expected);
        }
    }

    #[test]
    fn derive_flight_endpoint_passes_through_non_8082() {
        // (server URL, expected Flight endpoint)
        let cases = [
            ("http://noetl.example.com", "grpc://noetl.example.com"),
            (
                "http://noetl.example.com:9000",
                "grpc://noetl.example.com:9000",
            ),
        ];
        for (server_url, expected) in cases {
            assert_eq!(ResultFetchTool::derive_flight_endpoint(server_url), expected);
        }
    }

    #[test]
    fn default_prefer_is_flight() {
        let raw = serde_json::json!({
            "ref": "noetl://execution/12345/result/big_select/abcd1234",
        });
        let cfg = serde_json::from_value::<ResultFetchConfig>(raw).unwrap();

        assert_eq!(cfg.prefer, BackendPreference::Flight);
        assert_eq!(
            cfg.r#ref,
            "noetl://execution/12345/result/big_select/abcd1234"
        );
        assert!(cfg.flight_endpoint.is_none());
    }

    #[test]
    fn config_round_trips_http_preference() {
        let raw = serde_json::json!({
            "ref": "noetl://execution/1/result/x/y",
            "prefer": "http",
        });
        let cfg = serde_json::from_value::<ResultFetchConfig>(raw).unwrap();

        assert_eq!(cfg.prefer, BackendPreference::Http);
    }

    #[test]
    fn config_round_trips_explicit_flight_endpoint() {
        let raw = serde_json::json!({
            "ref": "noetl://execution/1/result/x/y",
            "flight_endpoint": "grpc://other-server.example.com:9999",
        });
        let cfg = serde_json::from_value::<ResultFetchConfig>(raw).unwrap();

        assert_eq!(
            cfg.flight_endpoint.as_deref(),
            Some("grpc://other-server.example.com:9999"),
        );
    }

    #[test]
    fn tool_name_is_result_fetch() {
        assert_eq!(ResultFetchTool::new().name(), "result_fetch");
    }

    #[test]
    fn fetch_via_http_normalises_server_url_trailing_slash() {
        // NOTE(review): despite its name this test asserts nothing about URL
        // normalisation; it only checks that the tool can be constructed.
        // Covering the trailing-slash behaviour needs a mock HTTP server or an
        // extracted URL-building helper.
        let tool = ResultFetchTool::new();
        let _http_client = &tool.http_client;
    }
}