use actix_web::body::to_bytes;
use actix_web::http::StatusCode;
use actix_web::web::Data;
use actix_web::web::Json;
use actix_web::{HttpRequest, HttpResponse, Responder, get, post};
use athena_pipelines::{
PipelineExecutionResponse, PipelineFetchFailure, PipelineRunContext, PipelineRunOutcome,
PipelineRuntime, PipelineStepLogEntry, PipelineTemplateCatalogError,
PipelineTemplateListResponse, execute_pipeline_run, insert_pipeline_step_log,
load_pipeline_definition_map, load_pipeline_template_summary_merge, resolve_client_name,
resolve_definition,
};
use serde_json::{Value, json};
use std::collections::HashMap;
use std::time::Instant;
use tracing::{error, info, warn};
use crate::AppState;
use crate::api::gateway::auth::{
authorize_gateway_request, read_right_for_resource, write_right_for_resource,
};
use crate::api::gateway::contracts::GatewayFetchRequest;
use crate::api::gateway::fetch::{
execute_gateway_fetch_data, parse_gateway_fetch_conditions, parse_sort_options_from_body,
resolve_where_column_types, to_query_conditions_with_types,
};
use crate::api::gateway::insert::insert;
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::response::{bad_request, internal_error, service_unavailable};
use crate::drivers::postgresql::sqlx_driver::insert_row;
use crate::utils::request_logging::LoggedRequest;
use crate::utils::request_logging::{log_operation_event, log_request};
pub use athena_pipelines::{
ConditionEntry, PipelineDefinition, PipelineRequest, PipelineTemplateSummary, SinkConfig,
SourceConfig, TransformConfig, load_registry_from_path, pipeline_definition_from_value,
validate_pipeline_definition,
};
/// Persists one pipeline step log entry in the background.
///
/// Does nothing when no logging client is configured (`logging_client_name` is `None`)
/// or when `pg_registry` has no pool for that client. Otherwise the insert runs on a task
/// spawned with `actix_web::rt::spawn`; a failed insert is reported through
/// `tracing::error!` and not propagated, so step logging never affects the caller.
fn spawn_pipeline_step_log(state: &AppState, entry: PipelineStepLogEntry) {
    let Some(logging_client) = state.logging_client_name.as_ref() else {
        return;
    };
    // `get_pool` already returns an owned pool handle, so it can be moved into the task
    // directly; no extra clone is needed.
    let Some(pool) = state.pg_registry.get_pool(logging_client) else {
        return;
    };
    actix_web::rt::spawn(async move {
        if let Err(err) = insert_pipeline_step_log(&pool, entry).await {
            error!(error = %err, "failed to write pipeline_step_log");
        }
    });
}
/// Hands each entry in `entries`, in order, to `spawn_pipeline_step_log`; every entry
/// gets its own background insert.
fn spawn_pipeline_step_logs(state: &AppState, entries: Vec<PipelineStepLogEntry>) {
    entries
        .into_iter()
        .for_each(|step_entry| spawn_pipeline_step_log(state, step_entry));
}
/// Looks up the Postgres pool that backs the pipeline template store.
///
/// Returns:
/// - `Ok(None)` when no logging client is configured, meaning there is no database
///   catalog;
/// - `Ok(Some(pool))` when the configured logging client has a registered pool;
/// - `Err(..)` with a 503 response when the logging client is configured but not
///   connected.
fn template_catalog_pool(state: &AppState) -> Result<Option<sqlx::PgPool>, HttpResponse> {
    match state.logging_client_name.as_ref() {
        None => Ok(None),
        Some(client_name) => match state.pg_registry.get_pool(client_name) {
            Some(catalog_pool) => Ok(Some(catalog_pool)),
            None => Err(service_unavailable(
                "Pipeline template store unavailable",
                format!("Logging client '{}' is not connected.", client_name),
            )),
        },
    }
}
/// Async wrapper around `template_catalog_pool` for handlers holding `Data<AppState>`.
///
/// The result (no catalog, a pool, or a 503 response) is returned unchanged.
async fn load_template_catalog_pool(
    app_state: &Data<AppState>,
) -> Result<Option<sqlx::PgPool>, HttpResponse> {
    template_catalog_pool(app_state.get_ref())
}
/// Converts a template-catalog failure into a 500 response.
///
/// The summary names the stage that failed (initialization, loading, or definition
/// validation). The detail carries the underlying error text.
fn pipeline_template_catalog_error_response(err: PipelineTemplateCatalogError) -> HttpResponse {
    let (summary, detail) = match err {
        PipelineTemplateCatalogError::Initialize(cause) => (
            "Failed to initialize pipeline template store",
            cause.to_string(),
        ),
        PipelineTemplateCatalogError::Load(cause) => {
            ("Failed to load pipeline templates", cause.to_string())
        }
        PipelineTemplateCatalogError::InvalidDefinition(message) => {
            ("Invalid pipeline template definition", message.to_string())
        }
    };
    internal_error(summary, detail)
}
/// Builds the merged template catalog from two sources: `app_state.pipeline_registry`
/// (treated as empty when `None`) and the database template store, if one is
/// configured.
///
/// # Errors
/// - the 503 response from `load_template_catalog_pool` when the store's client is not
///   connected;
/// - a 500 response (via `pipeline_template_catalog_error_response`) when the merge
///   fails.
async fn load_merged_template_catalog(
    app_state: &Data<AppState>,
) -> Result<athena_pipelines::PipelineTemplateSummaryMerge, HttpResponse> {
    let empty_registry: HashMap<String, PipelineDefinition> = HashMap::new();
    let registry: &HashMap<String, PipelineDefinition> = match &app_state.pipeline_registry {
        Some(r) => r.as_ref(),
        None => &empty_registry,
    };
    let catalog_pool = load_template_catalog_pool(app_state).await?;
    let merged = load_pipeline_template_summary_merge(catalog_pool.as_ref(), registry).await;
    merged.map_err(pipeline_template_catalog_error_response)
}
async fn fetch_source_rows_via_gateway(
req: &HttpRequest,
app_state: &Data<AppState>,
source_client: &str,
fetch_body: &Value,
) -> Result<Vec<Value>, HttpResponse> {
let force_camel_case_to_snake_case: bool = app_state.gateway_force_camel_case_to_snake_case;
let auto_cast_uuid_filter_values_to_text =
app_state.gateway_auto_cast_uuid_filter_values_to_text;
let parsed_fetch_body =
GatewayFetchRequest::from_body(fetch_body, force_camel_case_to_snake_case);
if parsed_fetch_body.table_name.trim().is_empty() {
return Err(bad_request("Bad request", "table_name is required"));
}
let mut columns_vec = parsed_fetch_body.columns.clone();
if columns_vec.is_empty() {
columns_vec.push("*".to_string());
}
let mut current_page: i64 = parsed_fetch_body.current_page.unwrap_or(1);
if current_page < 1 {
current_page = 1;
}
let page_size: i64 = parsed_fetch_body.page_size.unwrap_or(100);
let offset: i64 = parsed_fetch_body.offset.unwrap_or(0);
let limit: i64 = parsed_fetch_body.limit.unwrap_or(page_size);
let calculated_offset: i64 = (current_page - 1) * page_size + offset;
let mut conditions =
match parse_gateway_fetch_conditions(fetch_body, force_camel_case_to_snake_case) {
Ok(parsed) => parsed,
Err(resp) => return Err(resp),
};
conditions.sort_by(|a, b| a.eq_column.cmp(&b.eq_column));
let conditions_json: Vec<Value> = conditions
.iter()
.map(|c| json!({ "eq_column": c.eq_column, "eq_value": c.eq_value.clone() }))
.collect();
let column_types: Option<std::collections::HashMap<String, String>> =
match app_state.pg_registry.get_pool(source_client) {
Some(pool) => {
resolve_where_column_types(
&pool,
&parsed_fetch_body.table_name,
app_state.gateway_allow_schema_names_prefixed_as_table_name,
)
.await
}
None => None,
};
let pg_conditions = to_query_conditions_with_types(
&conditions,
force_camel_case_to_snake_case,
auto_cast_uuid_filter_values_to_text,
column_types.as_ref(),
);
let sort_options =
parse_sort_options_from_body(Some(fetch_body), force_camel_case_to_snake_case);
let columns_refs: Vec<&str> = columns_vec.iter().map(String::as_str).collect();
execute_gateway_fetch_data(
app_state.get_ref(),
req,
"pipeline",
source_client,
None,
&parsed_fetch_body.table_name,
columns_refs,
&pg_conditions,
conditions_json,
limit,
current_page,
page_size,
offset,
calculated_offset,
sort_options.as_ref(),
false,
)
.await
.map_err(|err| HttpResponse::InternalServerError().json(json!({ "error": err })))
}
/// Inserts `row` into `table_name` on behalf of `client_name` and returns the value the
/// insert reports.
///
/// If `pg_registry` has a pool for the client, the row goes through `insert_row` on that
/// pool. Otherwise it falls back to the gateway `insert` path and keeps only the first
/// element of its result tuple. Errors from either path are flattened to their `Debug`
/// text.
async fn insert_one_row_via_runtime(
    app_state: &Data<AppState>,
    client_name: &str,
    table_name: &str,
    row: &Value,
) -> Result<Value, String> {
    match app_state.pg_registry.get_pool(client_name) {
        Some(pool) => insert_row(&pool, table_name, row)
            .await
            .map_err(|failure| format!("{:?}", failure)),
        None => {
            let (inserted, _) = insert(table_name.to_string(), row.clone(), client_name)
                .await
                .map_err(|failure| format!("{:?}", failure))?;
            Ok(inserted)
        }
    }
}
/// `PipelineRuntime` that performs pipeline I/O through the gateway.
///
/// Source reads use `fetch_source_rows_via_gateway`; sink writes use
/// `insert_one_row_via_runtime`.
struct GatewayPipelineRuntime {
    /// The incoming HTTP request, forwarded to gateway fetches.
    req: HttpRequest,
    /// Shared application state used by both fetch and insert paths.
    app_state: Data<AppState>,
}

#[async_trait::async_trait(?Send)]
impl PipelineRuntime for GatewayPipelineRuntime {
    /// Fetches source rows.
    ///
    /// If the gateway produces an error response, its status code and JSON body are
    /// captured in a `PipelineFetchFailure`. The body becomes `{"error": "fetch failed"}`
    /// when it cannot be read or is not valid JSON.
    async fn fetch_source_rows(
        &self,
        source_client: &str,
        fetch_body: &Value,
    ) -> Result<Vec<Value>, PipelineFetchFailure> {
        let failed_response =
            match fetch_source_rows_via_gateway(&self.req, &self.app_state, source_client, fetch_body)
                .await
            {
                Ok(rows) => return Ok(rows),
                Err(response) => response,
            };
        let status_code = failed_response.status().as_u16();
        let raw_body = to_bytes(failed_response.into_body())
            .await
            .unwrap_or_default();
        let response_body: Value = serde_json::from_slice(&raw_body)
            .unwrap_or_else(|_| json!({ "error": "fetch failed" }));
        Err(PipelineFetchFailure {
            status_code,
            response_body,
        })
    }

    /// Writes a single sink row via `insert_one_row_via_runtime`.
    async fn insert_sink_row(
        &self,
        sink_client: &str,
        sink_table: &str,
        row: &Value,
    ) -> Result<Value, String> {
        insert_one_row_via_runtime(&self.app_state, sink_client, sink_table, row).await
    }
}
/// Shared implementation behind `POST /pipelines` and `POST /pipelines/simulate`.
///
/// Flow:
/// 1. Require a non-empty `X-Athena-Client` header and a JSON body.
/// 2. Deserialize the body into a `PipelineRequest`.
/// 3. Resolve the definition against `app_state.pipeline_registry` (empty when `None`)
///    and the database template store.
/// 4. Authorize read access to the source table and write access to the sink table.
/// 5. Run the pipeline with a `GatewayPipelineRuntime`.
/// 6. Spawn the step-log writes and record an operation event.
///
/// Responses:
/// - 400 for a missing header or body, an undeserializable request, or an unresolvable
///   definition;
/// - the template-store error responses (503 / 500);
/// - the authorization response when access is denied; the request is still logged
///   first;
/// - on fetch failure, the fetch's status (500 if that status code is invalid) with its
///   body;
/// - 200 with the `PipelineExecutionResponse` on success.
async fn execute_pipeline(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    let operation_start: Instant = Instant::now();
    let client_name: String = x_athena_client(&req);
    if client_name.is_empty() {
        return bad_request(
            "Missing required header",
            "X-Athena-Client header is required",
        );
    }
    let body_value: Value = match body {
        Some(b) => b.into_inner(),
        None => {
            return bad_request("Bad request", "Request body is required");
        }
    };
    // The raw body is not used after deserialization, so move it instead of deep-cloning
    // the whole JSON tree.
    let pipeline_req: PipelineRequest = match serde_json::from_value(body_value) {
        Ok(r) => r,
        Err(e) => {
            return bad_request("Invalid pipeline request", e.to_string());
        }
    };
    let registry: &HashMap<String, PipelineDefinition> = match &app_state.pipeline_registry {
        Some(r) => r.as_ref(),
        None => &HashMap::new(),
    };
    let template_catalog_pool = match load_template_catalog_pool(&app_state).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };
    let database_templates =
        match load_pipeline_definition_map(template_catalog_pool.as_ref()).await {
            Ok(value) => value,
            Err(err) => return pipeline_template_catalog_error_response(err),
        };
    let def = match resolve_definition(&pipeline_req, registry, &database_templates) {
        Ok(d) => d,
        Err(e) => return bad_request("Invalid pipeline definition", e),
    };
    let source_client: String = resolve_client_name(def.source.client.as_deref(), &client_name);
    let sink_client: String = resolve_client_name(def.sink.client.as_deref(), &client_name);
    let dry_run: bool = pipeline_req.dry_run.unwrap_or(false);
    // Authorization has to wait until the definition is resolved, because the required
    // rights depend on the source and sink table names.
    let auth = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![
            read_right_for_resource(Some(&def.source.table_name)),
            write_right_for_resource(Some(&def.sink.table_name)),
        ],
    )
    .await;
    // Log the request before acting on the auth result so rejected calls are recorded too.
    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(resp) = auth.response {
        return resp;
    }
    let runtime = GatewayPipelineRuntime {
        req: req.clone(),
        app_state: app_state.clone(),
    };
    match execute_pipeline_run(
        &runtime,
        &def,
        PipelineRunContext {
            request_id: Some(logged_request.request_id.clone()),
            pipeline_name: pipeline_req.pipeline.clone(),
            request_client: client_name.clone(),
            source_client,
            sink_client,
            dry_run,
            force_camel_case_to_snake_case: app_state.gateway_force_camel_case_to_snake_case,
        },
    )
    .await
    {
        PipelineRunOutcome::FetchFailure(failure) => {
            // A status code outside the valid HTTP range falls back to 500.
            let status = StatusCode::from_u16(failure.status_code)
                .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
            spawn_pipeline_step_logs(app_state.get_ref(), failure.step_logs);
            // NOTE(review): this fetch-failure event is tagged with the sink table, not
            // the source table that was being fetched. Confirm this is intended.
            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "pipeline_fetch",
                Some(&def.sink.table_name),
                operation_start.elapsed().as_millis(),
                status,
                Some(json!({
                    "status": failure.status_code,
                    "error": failure.response_body.clone(),
                })),
            );
            HttpResponse::build(status).json(failure.response_body)
        }
        PipelineRunOutcome::Success(success) => {
            let response: PipelineExecutionResponse = success.response;
            spawn_pipeline_step_logs(app_state.get_ref(), success.step_logs);
            info!(
                client = %client_name,
                source_client = %response.source_client,
                sink_client = %response.sink_client,
                source = %def.source.table_name,
                sink = %def.sink.table_name,
                rows_fetched = response.rows_fetched,
                rows_inserted = response.rows_inserted,
                errors = response.errors.len(),
                dry_run = response.dry_run,
                "pipeline run finished"
            );
            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "pipeline",
                Some(&def.sink.table_name),
                operation_start.elapsed().as_millis(),
                StatusCode::OK,
                Some(json!({
                    "rows_fetched": response.rows_fetched,
                    "rows_inserted": response.rows_inserted,
                    "source_client": response.source_client.clone(),
                    "sink_client": response.sink_client.clone(),
                    "dry_run": response.dry_run,
                })),
            );
            HttpResponse::Ok().json(response)
        }
    }
}
/// `POST /pipelines`: executes a pipeline.
///
/// Header and body validation, definition resolution, authorization, execution and
/// logging are all delegated to `execute_pipeline`.
#[post("/pipelines")]
pub async fn run_pipeline(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> impl Responder {
    execute_pipeline(req, body, app_state).await
}
#[post("/pipelines/simulate")]
pub async fn simulate_pipeline(
req: HttpRequest,
body: Option<Json<Value>>,
app_state: Data<AppState>,
) -> impl Responder {
let mut payload = body.map(|b| b.into_inner()).unwrap_or_else(|| json!({}));
payload["dry_run"] = json!(true);
execute_pipeline(req, Some(Json(payload)), app_state).await
}
/// `GET /pipelines/templates`: returns the merged pipeline template catalog.
///
/// Each name the merge reports in `shadowed_template_names` is logged as a warning: a
/// database template that conflicts with a boot YAML template and is ignored. Catalog
/// load errors are returned as the response.
#[get("/pipelines/templates")]
pub async fn list_pipeline_templates(app_state: Data<AppState>) -> impl Responder {
    let merge = match load_merged_template_catalog(&app_state).await {
        Ok(merged_catalog) => merged_catalog,
        Err(error_response) => return error_response,
    };
    for shadowed_name in &merge.shadowed_template_names {
        warn!(
            template = %shadowed_name,
            "database pipeline template conflicts with a boot YAML template; ignoring runtime shadow"
        );
    }
    HttpResponse::Ok().json(PipelineTemplateListResponse::new(merge.templates))
}