#[allow(deprecated)]
use actix_web::HttpRequest;
use actix_web::web::{Data, Json};
use actix_web::{HttpResponse, post};
use serde_json::{Value, json};
use sqlx::{Pool, Postgres};
use std::time::Instant;
use tracing::{error, info, warn};
use crate::AppState;
use crate::api::cache::check::{
CacheLookupOutcome, check_cache_control_and_get_response_v2_with_outcome,
};
use crate::api::cache::hydrate::hydrate_cache_and_return_json;
use crate::api::client_context::{logging_pool, optional_pool_for_client};
use crate::api::gateway::auth::{
GatewayAuthOptions, authorize_gateway_request_with_options, read_right_for_resource,
};
use crate::api::gateway::contracts::{
GATEWAY_DEFERRED_KIND_FETCH, GatewayDeferredRequest, GatewayOperationKind,
};
use crate::api::gateway::deferred::enqueue_gateway_deferred_request;
use crate::api::gateway::pool_resolver::{
gateway_client_name_or_direct_token, request_uses_direct_postgres_uri, resolve_postgres_pool,
};
use crate::api::gateway::response::{
GATEWAY_ERROR_CODE_DEFERRED_QUEUE_UNAVAILABLE, GATEWAY_ERROR_CODE_INVALID_FETCH_CONDITION,
GATEWAY_ERROR_CODE_INVALID_SCHEMA_TABLE_SELECTOR,
GATEWAY_ERROR_CODE_INVALID_STRUCTURED_SELECT_REQUEST, GATEWAY_ERROR_CODE_MISSING_REQUEST_BODY,
GATEWAY_ERROR_CODE_MISSING_TABLE_NAME, GATEWAY_ERROR_CODE_UNSUPPORTED_GATEWAY_FETCH_SHAPE,
gateway_bad_request_with_code, gateway_service_unavailable_with_code,
missing_client_header_response,
};
use crate::api::headers::request_context::resolved_athena_route_target_url;
use crate::api::headers::response_headers::set_cache_headers;
#[cfg(feature = "deadpool_experimental")]
use crate::api::headers::x_athena_deadpool_enable::x_athena_deadpool_enable;
use crate::api::headers::x_strip_nulls::get_x_strip_nulls;
use crate::api::headers::x_user_id::get_x_user_id;
use crate::api::response::api_accepted;
use crate::error::sqlx_parser::process_sqlx_error_with_context;
use crate::parser::query_builder::Condition;
use crate::utils::cache_key_token::cache_key_client_token;
use crate::utils::format::normalize_rows;
use crate::utils::request_logging::{
LoggedRequest, RequestCompletionLogContext, finalize_request_log, log_operation_event,
log_request,
};
use super::conditions::{
RequestCondition, merge_column_types_with_stats_fallback, resolve_where_column_types,
to_query_conditions_with_types,
};
use super::execution::execute_gateway_fetch_data;
use super::proxy::proxy_fetch_to_route_target;
use super::request::{build_fetch_hashed_cache_key, build_fetch_hashed_cache_key_legacy8};
use super::response::{respond_fetch_err, respond_fetch_ok};
use super::singleflight::{
SingleflightRole, acquire_fetch_singleflight, publish_fetch_singleflight_result,
wait_for_fetch_singleflight_result,
};
use super::types::{PostProcessingConfig, SortOptions};
use athena_gateway::{
GatewayFetchBodyPlanError, build_gateway_fetch_body_plan, build_structured_fetch_cache_key,
execute_structured_fetch_sql, render_structured_fetch_sql,
};
/// Reports whether `table_name` refers to one of the statistics rollup tables.
///
/// Only the segment after the last `.` is compared, so a schema-qualified name
/// such as `public.client_statistics` matches just like the bare table name.
fn is_stats_rollup_table_for_type_fallback(table_name: &str) -> bool {
    const STATS_ROLLUP_TABLES: [&str; 2] = ["client_statistics", "client_table_statistics"];

    // Drop any qualifier prefix; a name without a dot is compared as-is.
    let unqualified = table_name
        .rsplit_once('.')
        .map_or(table_name, |(_, tail)| tail);
    STATS_ROLLUP_TABLES.contains(&unqualified)
}
/// Translates a cache lookup outcome into the cache-source label used in
/// response headers and request logs.
///
/// Every miss variant maps to `"database"`, an explicit no-cache bypass to
/// `"bypass"`, a Redis hit to `"redis"`, and both local hit variants to `"local"`.
fn cache_source_from_outcome(outcome: CacheLookupOutcome) -> &'static str {
    // Deliberately no wildcard arm: a new outcome variant must be classified here.
    match outcome {
        CacheLookupOutcome::MissAfterRedisGetTimeout
        | CacheLookupOutcome::MissAfterRedisGetError
        | CacheLookupOutcome::MissAllTiers => "database",
        CacheLookupOutcome::BypassNoCacheHeader => "bypass",
        CacheLookupOutcome::HitRedis => "redis",
        CacheLookupOutcome::HitLocal | CacheLookupOutcome::HitLocalRaw => "local",
    }
}
/// Shared handler behind the `POST /gateway/data` and `POST /gateway/fetch` routes.
///
/// Processing order:
/// 1. Resolve the client name; an empty name returns the missing-client-header response.
/// 2. Unless a direct PostgreSQL URI was requested, proxy the request when a route
///    target URL resolves for it.
/// 3. Build the fetch body plan from the JSON body; plan errors become bad-request
///    responses.
/// 4. Authorize the request (read right for the table, or for every resource of a
///    structured plan) and log it.
/// 5. When auth asks for deferred queueing and no direct URI is in use, enqueue the
///    request body and return an accepted response.
/// 6. Resolve the PostgreSQL pool, render structured SQL when a structured plan is
///    present, and compute the hashed cache key plus its legacy variant.
/// 7. Return a cached response when either key hits.
/// 8. Otherwise execute the fetch under singleflight keyed by the cache key, hydrate
///    the cache on success, and build the final response.
///
/// # Panics
///
/// The structured path calls `expect` on the resolved pool and rendered SQL. Both
/// are guaranteed to be `Some` by the early returns made while rendering the SQL.
pub(crate) async fn handle_fetch_data_route(
req: HttpRequest,
body: Option<Json<Value>>,
app_state: Data<AppState>,
) -> HttpResponse {
// Two timers are captured back to back; `start_time` is only handed to
// `respond_fetch_err`, while `operation_start` feeds all other duration fields.
let operation_start: Instant = Instant::now();
let start_time: Instant = Instant::now();
let client_name: String = gateway_client_name_or_direct_token(&req);
if client_name.is_empty() {
return missing_client_header_response(GatewayOperationKind::Fetch);
}
// A resolvable route target short-circuits everything below, but only when the
// caller did not ask for a direct PostgreSQL URI.
let direct_pg_uri_requested: bool = request_uses_direct_postgres_uri(&req);
if !direct_pg_uri_requested {
if let Some(target_url) = resolved_athena_route_target_url(&req) {
return proxy_fetch_to_route_target(
&req,
app_state.get_ref(),
&target_url,
&client_name,
body.as_ref().map(|json_body| &json_body.0),
)
.await;
}
}
// The webhook route key is derived from the request path so both mounted routes
// can share this handler.
let gateway_webhook_route_key: &'static str = if req.path().contains("/gateway/fetch") {
crate::webhooks::ROUTE_GATEWAY_FETCH
} else {
crate::webhooks::ROUTE_GATEWAY_DATA
};
let force_camel_case_to_snake_case: bool = app_state.gateway_force_camel_case_to_snake_case;
let auto_cast_uuid_filter_values_to_text =
app_state.gateway_auto_cast_uuid_filter_values_to_text;
// Parse the body into a plan; each plan error variant maps to its own error code.
let fetch_body_plan =
match build_gateway_fetch_body_plan(body.as_deref(), force_camel_case_to_snake_case) {
Ok(plan) => plan,
Err(GatewayFetchBodyPlanError::InvalidStructuredSelect(err)) => {
return gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_INVALID_STRUCTURED_SELECT_REQUEST,
GatewayOperationKind::Fetch,
"Invalid structured select request",
err,
);
}
Err(GatewayFetchBodyPlanError::InvalidSchemaTableSelector(err)) => {
return gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_INVALID_SCHEMA_TABLE_SELECTOR,
GatewayOperationKind::Fetch,
"Invalid schema/table selector",
err,
);
}
Err(GatewayFetchBodyPlanError::InvalidCondition(err)) => {
return gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_INVALID_FETCH_CONDITION,
GatewayOperationKind::Fetch,
"Invalid fetch condition",
err.message(),
);
}
};
let table_name = fetch_body_plan.table_name.clone();
let mut conditions: Vec<RequestCondition> = fetch_body_plan.conditions.clone();
// The `x-athena-key` header is kept only when it equals the
// `SUITSBOOKS_API_ADMIN_KEY` environment variable; an unset variable compares as
// the empty string. The value is used solely as a fallback for `user_id`.
let apikey: Option<String> = req
.headers()
.get("x-athena-key")
.and_then(|value| value.to_str().ok())
.map(|s| s.to_string())
.filter(|key| key == &std::env::var("SUITSBOOKS_API_ADMIN_KEY").unwrap_or_default());
let user_id: String = get_x_user_id(&req)
.or_else(|| apikey.clone())
.unwrap_or_default();
let current_page: i64 = fetch_body_plan.current_page;
let page_size: i64 = fetch_body_plan.page_size;
let offset: i64 = fetch_body_plan.offset;
let limit: i64 = fetch_body_plan.limit;
let columns_vec: Vec<String> = fetch_body_plan.columns.clone();
// Null stripping is enabled only by a header value equal to "true" (case-insensitive).
let strip_nulls: bool = match get_x_strip_nulls(&req) {
Some(value) => value.to_lowercase() == "true",
None => false,
};
let post_processing_config: PostProcessingConfig =
fetch_body_plan.post_processing_config.clone();
let sort_options: Option<SortOptions> = fetch_body_plan.sort_options.clone();
let structured_fetch_plan = fetch_body_plan.structured_fetch_plan.clone();
let request_payload_for_webhook = fetch_body_plan.request_payload_for_webhook.clone();
// A body-less request is still authorized and logged before being rejected, so an
// auth failure response takes precedence over the missing-body error.
if body.is_none() {
let auth = authorize_gateway_request_with_options(
&req,
app_state.get_ref(),
Some(&client_name),
vec![read_right_for_resource(None)],
GatewayAuthOptions::default(),
)
.await;
let _logged_request: LoggedRequest = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
if let Some(resp) = auth.response {
return resp;
}
return gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_MISSING_REQUEST_BODY,
GatewayOperationKind::Fetch,
"Invalid request",
"table_name is required",
);
}
// Structured plans require a read right per referenced resource; plain fetches
// require one for the single table.
let required_read_rights: Vec<String> = if let Some(plan) = structured_fetch_plan.as_ref() {
plan.resource_names()
.into_iter()
.map(|resource| read_right_for_resource(Some(resource.as_str())))
.collect()
} else {
vec![read_right_for_resource(Some(&table_name))]
};
let auth = authorize_gateway_request_with_options(
&req,
app_state.get_ref(),
Some(&client_name),
required_read_rights,
GatewayAuthOptions::default(),
)
.await;
// The request is logged before the auth outcome is inspected.
let logged_request: LoggedRequest = log_request(
req.clone(),
Some(app_state.get_ref()),
Some(auth.request_id.clone()),
Some(&auth.log_context),
);
if let Some(resp) = auth.response {
return resp;
}
// The direct-URI pool is resolved only after authorization succeeded.
let direct_pg_pool: Option<Pool<Postgres>> = if direct_pg_uri_requested {
match resolve_postgres_pool(&req, app_state.get_ref()).await {
Ok(pool) => Some(pool),
Err(resp) => return resp,
}
} else {
None
};
// Deferred mode: queue the raw request body instead of executing inline. Direct
// PostgreSQL URI requests ignore the flag and continue inline with a warning.
if auth.force_deferred_queue {
if direct_pg_uri_requested {
warn!(
request_id = %auth.request_id,
"Auth fallback requested deferred queueing for direct PostgreSQL URI request; continuing with inline execution",
);
} else {
if table_name.is_empty() {
return gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_MISSING_TABLE_NAME,
GatewayOperationKind::Fetch,
"Invalid request",
"table_name is required",
);
}
let Some(request_body) = body.as_ref().map(|json_body| json_body.0.clone()) else {
return gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_MISSING_REQUEST_BODY,
GatewayOperationKind::Fetch,
"Invalid request",
"request body is required for deferred /gateway/fetch execution",
);
};
// `Content-Length`, when present and parseable, is passed to the enqueue call
// as `request_bytes`; otherwise `None` is passed.
let request_bytes: Option<u64> = req
.headers()
.get(actix_web::http::header::CONTENT_LENGTH)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<u64>().ok());
let deferred_request = GatewayDeferredRequest::for_request_body(
GATEWAY_DEFERRED_KIND_FETCH,
auth.request_id.clone(),
client_name.clone(),
request_body,
)
.with_reason(auth.force_deferred_reason.clone())
.with_requested_at_unix_ms(chrono::Utc::now().timestamp_millis());
if let Err(err) = enqueue_gateway_deferred_request(
app_state.get_ref(),
"POST",
req.path(),
request_bytes,
&deferred_request,
)
.await
{
return gateway_service_unavailable_with_code(
GATEWAY_ERROR_CODE_DEFERRED_QUEUE_UNAVAILABLE,
GatewayOperationKind::Fetch,
"Deferred queue unavailable",
format!("Failed to queue deferred fetch request: {err}"),
);
}
return api_accepted(
"Fetch request queued for deferred execution (auth fallback mode)",
json!({
"request_id": auth.request_id,
"status": "queued",
"route": req.path(),
}),
);
}
}
// Inline execution: use the direct-URI pool when requested, otherwise whatever
// pool (possibly none) is configured for the client.
let resolved_gateway_pg_pool: Option<Pool<Postgres>> = if direct_pg_uri_requested {
direct_pg_pool.clone()
} else {
match optional_pool_for_client(app_state.get_ref(), &client_name).await {
Ok(pool) => pool,
Err(resp) => return resp,
}
};
// The deadpool opt-in header is only consulted when the experimental feature is
// compiled in; otherwise the flag is always false.
#[cfg(feature = "deadpool_experimental")]
let deadpool_requested = x_athena_deadpool_enable(&req, Some(&auth.request_id));
#[cfg(not(feature = "deadpool_experimental"))]
let deadpool_requested = false;
if table_name.is_empty() {
return gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_MISSING_TABLE_NAME,
GatewayOperationKind::Fetch,
"Invalid request",
"table_name is required",
);
}
let resolved_structured_pg_pool: Option<Pool<Postgres>> = if structured_fetch_plan.is_some() {
resolved_gateway_pg_pool.clone()
} else {
None
};
// Render the structured SQL up front. After this match, a structured plan always
// has both a pool and rendered SQL; the `expect` calls further down rely on that.
let structured_sql: Option<String> = match (
structured_fetch_plan.as_ref(),
resolved_structured_pg_pool.as_ref(),
) {
(Some(plan), Some(pool)) => match render_structured_fetch_sql(pool, plan).await {
Ok(sql) => Some(sql),
Err(err) => {
return gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_INVALID_STRUCTURED_SELECT_REQUEST,
GatewayOperationKind::Fetch,
"Invalid structured select request",
err,
);
}
},
(Some(_), None) => {
return gateway_bad_request_with_code(
GATEWAY_ERROR_CODE_UNSUPPORTED_GATEWAY_FETCH_SHAPE,
GatewayOperationKind::Fetch,
"Unsupported gateway fetch shape",
"structured select fetch requires a PostgreSQL-backed Athena client or x-pg-uri",
);
}
(None, _) => None,
};
// Plain fetches sort their conditions by column before the cache key is built
// (presumably so the key does not depend on the order conditions were sent in).
if structured_fetch_plan.is_none() {
conditions.sort_by(|a, b| a.eq_column.cmp(&b.eq_column));
}
// NOTE(review): `current_page`, `page_size` and `offset` are not inputs to the
// plain-fetch key below. Confirm that requests differing only in pagination
// cannot share a cache entry or a singleflight result.
let hashed_cache_key: String = if let Some(plan) = structured_fetch_plan.as_ref() {
build_structured_fetch_cache_key(
plan,
structured_sql.as_deref().unwrap_or_default(),
strip_nulls,
&client_name,
)
} else {
build_fetch_hashed_cache_key(
&table_name,
&conditions,
&columns_vec,
limit,
strip_nulls,
&client_name,
sort_options.as_ref(),
)
};
// Structured plans have no separate legacy key: reusing the primary key makes the
// legacy lookup below a no-op for them.
let legacy_hashed_cache_key: String = if structured_fetch_plan.is_some() {
hashed_cache_key.clone()
} else {
build_fetch_hashed_cache_key_legacy8(
&table_name,
&conditions,
&columns_vec,
limit,
strip_nulls,
&client_name,
sort_options.as_ref(),
)
};
let (cache_result, cache_outcome): (Option<HttpResponse>, CacheLookupOutcome) =
check_cache_control_and_get_response_v2_with_outcome(
&req,
app_state.clone(),
&hashed_cache_key,
"gateway_fetch_cache_lookup",
)
.await;
match cache_result {
// Primary-key hit: stamp cache headers, log, optionally dispatch the webhook,
// and return the cached response as-is.
Some(mut cached_response) => {
let cache_source = cache_source_from_outcome(cache_outcome);
let cache_id = cache_key_client_token(&hashed_cache_key);
set_cache_headers(
cached_response.headers_mut(),
true,
Some(&cache_id),
Some(cache_outcome.as_str()),
Some(cache_source),
);
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"fetch",
Some(&table_name),
operation_start.elapsed().as_millis(),
cached_response.status(),
Some(json!({
"cache_key": hashed_cache_key,
"cached": true,
"cache_source": cache_source,
"cache_lookup_outcome": cache_outcome.as_str(),
})),
);
finalize_request_log(
Some(app_state.get_ref()),
&logged_request.request_id,
RequestCompletionLogContext {
status_code: Some(cached_response.status().as_u16()),
duration_ms: Some(operation_start.elapsed().as_millis()),
cached: Some(true),
cache_lookup_outcome: Some(cache_outcome.as_str().to_string()),
cache_source: Some(cache_source.to_string()),
operation: Some("fetch".to_string()),
table_name: Some(table_name.clone()),
},
);
// Webhooks fire only for cached responses with a success status.
if cached_response.status().is_success() {
crate::webhooks::spawn_gateway_webhook_dispatch(
app_state.clone(),
crate::webhooks::gateway_webhook_trigger_from_http(
&req,
&client_name,
gateway_webhook_route_key,
Some(table_name.clone()),
Some(logged_request.request_id.clone()),
request_payload_for_webhook.clone(),
Some(json!({
"cached": true,
"cache_key": cache_key_client_token(&hashed_cache_key),
})),
),
);
}
return cached_response;
}
None => {
// Primary key missed: retry with the legacy key when it differs. A legacy hit is
// handled like a primary hit but reports the legacy key and outcome.
if legacy_hashed_cache_key != hashed_cache_key {
let (legacy_cache_result, legacy_cache_outcome): (
Option<HttpResponse>,
CacheLookupOutcome,
) = check_cache_control_and_get_response_v2_with_outcome(
&req,
app_state.clone(),
&legacy_hashed_cache_key,
"gateway_fetch_cache_lookup",
)
.await;
if let Some(mut cached_response) = legacy_cache_result {
let cache_source = cache_source_from_outcome(legacy_cache_outcome);
let cache_id = cache_key_client_token(&legacy_hashed_cache_key);
set_cache_headers(
cached_response.headers_mut(),
true,
Some(&cache_id),
Some(legacy_cache_outcome.as_str()),
Some(cache_source),
);
info!(
cache_key = %hashed_cache_key,
legacy_cache_key = %legacy_hashed_cache_key,
outcome = %legacy_cache_outcome.as_str(),
"cache hit via legacy hashed key (POST gateway fetch)"
);
log_operation_event(
Some(app_state.get_ref()),
&logged_request,
"fetch",
Some(&table_name),
operation_start.elapsed().as_millis(),
cached_response.status(),
Some(json!({
"cache_key": legacy_hashed_cache_key,
"cached": true,
"cache_source": cache_source,
"cache_lookup_outcome": legacy_cache_outcome.as_str(),
})),
);
finalize_request_log(
Some(app_state.get_ref()),
&logged_request.request_id,
RequestCompletionLogContext {
status_code: Some(cached_response.status().as_u16()),
duration_ms: Some(operation_start.elapsed().as_millis()),
cached: Some(true),
cache_lookup_outcome: Some(legacy_cache_outcome.as_str().to_string()),
cache_source: Some(cache_source.to_string()),
operation: Some("fetch".to_string()),
table_name: Some(table_name.clone()),
},
);
if cached_response.status().is_success() {
crate::webhooks::spawn_gateway_webhook_dispatch(
app_state.clone(),
crate::webhooks::gateway_webhook_trigger_from_http(
&req,
&client_name,
gateway_webhook_route_key,
Some(table_name.clone()),
Some(logged_request.request_id.clone()),
request_payload_for_webhook.clone(),
Some(json!({
"cached": true,
"cache_key": cache_key_client_token(&legacy_hashed_cache_key),
})),
),
);
}
return cached_response;
}
}
// The miss log reports the outcome of the primary lookup, not the legacy one.
info!(
cache_key = %hashed_cache_key,
legacy_cache_key = %legacy_hashed_cache_key,
outcome = %cache_outcome.as_str(),
"cache miss (POST gateway fetch)"
);
}
}
// Cache miss: execute under singleflight keyed by the primary cache key. A leader
// runs the query and publishes its result; a follower waits for that result and
// runs the query itself only when the wait yields nothing.
let singleflight_started_at: Instant = Instant::now();
let singleflight_role: SingleflightRole = acquire_fetch_singleflight(&hashed_cache_key).await;
let data_result: Result<Vec<Value>, String> = if let Some(plan) = structured_fetch_plan.as_ref()
{
// Structured path: both values were established while rendering the SQL above.
let pool = resolved_structured_pg_pool
.as_ref()
.expect("structured pool resolved before execution");
let sql = structured_sql
.as_deref()
.expect("structured sql compiled before execution");
match singleflight_role {
SingleflightRole::Leader(flight) => {
app_state.metrics_state.record_management_mutation(
"gateway_fetch_singleflight",
"leader",
singleflight_started_at.elapsed().as_secs_f64(),
);
// SQL errors are converted to a JSON string so the result can be cloned and
// shared with followers.
let leader_result = match execute_structured_fetch_sql(pool, plan, sql).await {
Ok(result) => {
app_state
.metrics_state
.record_gateway_athena_backend("/gateway/fetch", "sqlx");
Ok(result.rows)
}
Err(err) => Err(process_sqlx_error_with_context(&err, Some(&table_name))
.to_json()
.to_string()),
};
publish_fetch_singleflight_result(&hashed_cache_key, flight, leader_result.clone())
.await;
leader_result
}
SingleflightRole::Follower(flight) => {
let follower_result: Option<Result<Vec<Value>, String>> =
wait_for_fetch_singleflight_result(flight).await;
if let Some(shared_result) = follower_result {
app_state.metrics_state.record_management_mutation(
"gateway_fetch_singleflight",
"follower_shared",
singleflight_started_at.elapsed().as_secs_f64(),
);
shared_result
} else {
// No shared result arrived (labelled a timeout fallback in metrics): execute
// the query independently without publishing.
app_state.metrics_state.record_management_mutation(
"gateway_fetch_singleflight",
"follower_timeout_fallback",
singleflight_started_at.elapsed().as_secs_f64(),
);
match execute_structured_fetch_sql(pool, plan, sql).await {
Ok(result) => {
app_state
.metrics_state
.record_gateway_athena_backend("/gateway/fetch", "sqlx");
Ok(result.rows)
}
Err(err) => Err(process_sqlx_error_with_context(&err, Some(&table_name))
.to_json()
.to_string()),
}
}
}
}
} else {
// Plain path: build the condition payloads and resolve column types before
// dispatching to the generic fetch executor.
let conditions_json: Vec<Value> = conditions
.iter()
.map(|c| {
json!({
"eq_column": c.eq_column,
"eq_value": c.eq_value.clone()
})
})
.collect();
let columns_refs: Vec<&str> = columns_vec.iter().map(|s| s.as_str()).collect();
let allow_schema: bool = app_state.gateway_allow_schema_names_prefixed_as_table_name;
// Column types come from the resolved pool first. For the statistics rollup
// tables, an empty or missing result falls back to the logging pool when that
// pool is available and yields a non-empty map.
let column_types: Option<std::collections::HashMap<String, String>> =
merge_column_types_with_stats_fallback(&table_name, {
let primary: Option<std::collections::HashMap<String, String>> =
if let Some(pool) = resolved_gateway_pg_pool.as_ref() {
resolve_where_column_types(pool, &table_name, allow_schema).await
} else {
None
};
let primary_empty: bool = primary.as_ref().map(|m| m.is_empty()).unwrap_or(true);
if primary_empty && is_stats_rollup_table_for_type_fallback(&table_name) {
if let Ok(pool) = logging_pool(app_state.get_ref()) {
let from_logging: Option<std::collections::HashMap<String, String>> =
resolve_where_column_types(&pool, &table_name, allow_schema).await;
if from_logging
.as_ref()
.map(|m| !m.is_empty())
.unwrap_or(false)
{
from_logging
} else {
primary
}
} else {
primary
}
} else {
primary
}
});
let pg_conditions: Vec<Condition> = to_query_conditions_with_types(
&conditions[..],
force_camel_case_to_snake_case,
auto_cast_uuid_filter_values_to_text,
column_types.as_ref(),
);
// Pages below 1 are treated as page 1 when computing the row offset.
// NOTE(review): this is unchecked i64 arithmetic on plan-supplied values; confirm
// the plan bounds `page_size`/`offset` so the multiplication cannot overflow.
let page_offset: i64 = if current_page < 1 { 1 } else { current_page };
let calculated_offset: i64 = (page_offset - 1) * page_size + offset;
match singleflight_role {
SingleflightRole::Leader(flight) => {
app_state.metrics_state.record_management_mutation(
"gateway_fetch_singleflight",
"leader",
singleflight_started_at.elapsed().as_secs_f64(),
);
let leader_result = execute_gateway_fetch_data(
app_state.get_ref(),
&req,
&auth.request_id,
&client_name,
resolved_gateway_pg_pool.clone(),
&table_name,
columns_refs.clone(),
&pg_conditions,
conditions_json,
limit,
current_page,
page_size,
offset,
calculated_offset,
sort_options.as_ref(),
deadpool_requested,
)
.await;
publish_fetch_singleflight_result(&hashed_cache_key, flight, leader_result.clone())
.await;
leader_result
}
SingleflightRole::Follower(flight) => {
let follower_result: Option<Result<Vec<Value>, String>> =
wait_for_fetch_singleflight_result(flight).await;
if let Some(shared_result) = follower_result {
app_state.metrics_state.record_management_mutation(
"gateway_fetch_singleflight",
"follower_shared",
singleflight_started_at.elapsed().as_secs_f64(),
);
shared_result
} else {
// Same fallback as the structured path: run the fetch without publishing.
app_state.metrics_state.record_management_mutation(
"gateway_fetch_singleflight",
"follower_timeout_fallback",
singleflight_started_at.elapsed().as_secs_f64(),
);
execute_gateway_fetch_data(
app_state.get_ref(),
&req,
&auth.request_id,
&client_name,
resolved_gateway_pg_pool.clone(),
&table_name,
columns_refs,
&pg_conditions,
conditions_json,
limit,
current_page,
page_size,
offset,
calculated_offset,
sort_options.as_ref(),
deadpool_requested,
)
.await
}
}
}
};
// On success the normalized rows are written to the cache under the primary key
// before the response is built; on failure only an error is logged here.
if let Ok(data) = &data_result {
let normalized_rows: Vec<Value> = normalize_rows(data, force_camel_case_to_snake_case);
hydrate_cache_and_return_json(
app_state.clone(),
hashed_cache_key.clone(),
vec![json!({"data": normalized_rows.clone()})],
)
.await;
} else {
error!("Failed to rehydrate cache due to data fetch error");
}
// Final response construction is delegated; both helpers receive the outcome of
// the primary cache lookup.
match data_result {
Ok(data) => {
respond_fetch_ok(
&req,
gateway_webhook_route_key,
request_payload_for_webhook.clone(),
app_state.clone(),
&data,
&hashed_cache_key,
strip_nulls,
&post_processing_config,
force_camel_case_to_snake_case,
&table_name,
&logged_request,
operation_start,
cache_outcome,
)
.await
}
Err(err) => respond_fetch_err(
err,
app_state.clone(),
&table_name,
&client_name,
&user_id,
&hashed_cache_key,
start_time,
operation_start,
&logged_request,
cache_outcome,
),
}
}
/// `POST /gateway/data` endpoint.
///
/// Forwards the request, optional JSON body, and application state unchanged to
/// `handle_fetch_data_route` and returns its response.
#[post("/gateway/data")]
pub async fn fetch_data_route(
req: HttpRequest,
body: Option<Json<Value>>,
app_state: Data<AppState>,
) -> HttpResponse {
handle_fetch_data_route(req, body, app_state).await
}
/// `POST /gateway/fetch` endpoint.
///
/// Forwards the request, optional JSON body, and application state unchanged to
/// `handle_fetch_data_route`, the same shared handler used by `/gateway/data`,
/// and returns its response.
#[post("/gateway/fetch")]
pub async fn proxy_fetch_data_route(
req: HttpRequest,
body: Option<Json<Value>>,
app_state: Data<AppState>,
) -> HttpResponse {
handle_fetch_data_route(req, body, app_state).await
}