#[allow(deprecated)]
use actix_web::HttpRequest;
use actix_web::web;
use actix_web::web::Data;
use actix_web::{HttpResponse, Responder, get, post, web::Json};
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use reqwest::{Client, RequestBuilder, Response};
use rust_decimal::Decimal;
use rust_decimal::prelude::FromPrimitive;
use serde::Deserialize;
use serde_json::{Value, json};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::str::FromStr;
use std::time::Instant;
use supabase_rs::SupabaseClient;
use tracing::{error, info};
mod conditions;
use crate::AppState;
use crate::api::cache::check::check_cache_control_and_get_response_v2;
use crate::api::cache::hydrate::hydrate_cache_and_return_json;
use crate::api::gateway::fetch::conditions::{RequestCondition, to_query_conditions};
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::headers::x_strip_nulls::get_x_strip_nulls;
use crate::api::headers::x_user_id::get_x_user_id;
use crate::data::parse::strip_nulls::strip_nulls_from_key;
use crate::drivers::postgresql::sqlx_driver::fetch_rows_with_columns;
use crate::drivers::supabase::{fetch_multiple_conditions, fetch_multiple_conditions_with_client};
use crate::parser::query_builder::Condition;
use crate::utils::format::{normalize_column_name, normalize_columns_csv, normalize_rows};
use crate::utils::request_logging::log_request;
/// Time bucket size used when grouping rows by a timestamp column.
#[derive(Debug)]
enum TimeGranularity {
    Day,
    Hour,
    Minute,
}
impl FromStr for TimeGranularity {
    type Err = String;

    /// Case-insensitive parse of `"day"` / `"hour"` / `"minute"`.
    fn from_str(value: &str) -> Result<Self, Self::Err> {
        let lowered = value.to_lowercase();
        if lowered == "day" {
            Ok(TimeGranularity::Day)
        } else if lowered == "hour" {
            Ok(TimeGranularity::Hour)
        } else if lowered == "minute" {
            Ok(TimeGranularity::Minute)
        } else {
            Err(format!("Unsupported time_granularity '{}'", lowered))
        }
    }
}
impl TimeGranularity {
    /// Renders `datetime` as a bucket label at this granularity.
    fn format_label(&self, datetime: DateTime<Utc>) -> String {
        // Pick the strftime pattern first, then format once.
        let pattern = match self {
            TimeGranularity::Day => "%Y-%m-%d",
            TimeGranularity::Hour => "%Y-%m-%d %H:00",
            TimeGranularity::Minute => "%Y-%m-%d %H:%M",
        };
        datetime.format(pattern).to_string()
    }
}
/// How per-bucket sums are combined across buckets.
#[derive(Debug, PartialEq, Eq)]
enum AggregationStrategy {
    // Each bucket's aggregation is the running total of all preceding buckets.
    CumulativeSum,
}
impl FromStr for AggregationStrategy {
    type Err = String;

    /// Case-insensitive parse; only `"cumulative_sum"` is currently supported.
    fn from_str(value: &str) -> Result<Self, Self::Err> {
        let lowered = value.to_lowercase();
        if lowered == "cumulative_sum" {
            Ok(AggregationStrategy::CumulativeSum)
        } else {
            Err(format!("Unsupported aggregation_strategy '{}'", lowered))
        }
    }
}
/// Optional grouping/aggregation settings parsed from the request body.
#[derive(Debug)]
struct PostProcessingConfig {
    // Column to group rows by; post-processing is skipped when absent.
    group_by: Option<String>,
    // Time bucket size applied to the group_by column values.
    time_granularity: Option<TimeGranularity>,
    // Column whose values are summed per bucket.
    aggregation_column: Option<String>,
    // Strategy applied on top of the per-bucket sums.
    aggregation_strategy: Option<AggregationStrategy>,
    // When true, duplicate values inside a bucket are counted once.
    dedup_aggregation: bool,
}
impl PostProcessingConfig {
    /// Builds the post-processing configuration from an optional JSON body.
    ///
    /// Column names are normalized (camelCase -> snake_case when
    /// `force_snake` is set). Unknown `time_granularity` /
    /// `aggregation_strategy` strings are silently ignored, and
    /// `aggregation_dedup` defaults to `false`.
    fn from_body(body: Option<&Value>, force_snake: bool) -> Self {
        // Shared accessor for the string-typed body fields.
        let str_field = |key| body.and_then(|b| b.get(key)).and_then(Value::as_str);
        PostProcessingConfig {
            group_by: str_field("group_by").map(|s| normalize_column_name(s, force_snake)),
            time_granularity: str_field("time_granularity")
                .and_then(|s| s.parse::<TimeGranularity>().ok()),
            aggregation_column: str_field("aggregation_column")
                .map(|s| normalize_column_name(s, force_snake)),
            aggregation_strategy: str_field("aggregation_strategy")
                .and_then(|s| s.parse::<AggregationStrategy>().ok()),
            dedup_aggregation: body
                .and_then(|b| b.get("aggregation_dedup"))
                .and_then(Value::as_bool)
                .unwrap_or(false),
        }
    }
}
pub(crate) async fn handle_fetch_data_route(
req: HttpRequest,
body: Option<Json<Value>>,
app_state: Data<AppState>,
) -> HttpResponse {
log_request(req.clone());
let start_time: Instant = Instant::now();
let client_name: String = x_athena_client(&req.clone());
let force_camel_case_to_snake_case: bool = app_state.gateway_force_camel_case_to_snake_case;
let mut table_name: String = String::new();
let mut requested_view_name: Option<String> = None;
let mut current_page: i64 = 1;
let mut page_size: i64 = 100;
let mut offset: i64 = 0;
let mut total_pages: i64 = 1;
let mut conditions: Vec<RequestCondition> = vec![];
let apikey: Option<String> = req
.headers()
.get("apikey")
.or_else(|| req.headers().get("x-api-key"))
.and_then(|value| value.to_str().ok())
.map(|s| s.to_string())
.filter(|key| key == &std::env::var("SUITSBOOKS_API_ADMIN_KEY").unwrap_or_default());
let user_id: String = get_x_user_id(&req)
.map(|id| id)
.or_else(|| apikey.clone())
.unwrap_or_default();
let limit: i64 = match &body {
Some(json_body) => json_body
.get("limit")
.and_then(Value::as_u64)
.unwrap_or(page_size as u64) as i64,
None => page_size,
};
let mut columns_vec: Vec<String> = vec![];
if let Some(ref json_body) = body {
if let Some(cols_val) = json_body.get("columns") {
if let Some(arr) = cols_val.as_array() {
columns_vec = arr
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
} else if let Some(s) = cols_val.as_str() {
columns_vec = s
.split(',')
.map(|p| p.trim().to_string())
.filter(|s| !s.is_empty())
.collect();
}
}
}
if columns_vec.is_empty() {
columns_vec.push("*".to_string());
}
if force_camel_case_to_snake_case {
columns_vec = columns_vec
.into_iter()
.map(|col| normalize_column_name(&col, true))
.collect();
}
let strip_nulls: bool = match get_x_strip_nulls(&req) {
Some(value) => value.to_lowercase() == "true",
None => false,
};
let post_processing_config: PostProcessingConfig =
PostProcessingConfig::from_body(body.as_deref(), force_camel_case_to_snake_case);
if let Some(ref json_body) = body {
if let Some(page) = json_body.get("current_page").and_then(Value::as_u64) {
current_page = page as i64;
}
if let Some(size) = json_body.get("page_size").and_then(Value::as_u64) {
page_size = size as i64;
}
if let Some(off) = json_body.get("offset").and_then(Value::as_u64) {
offset = off as i64;
}
if let Some(pages) = json_body.get("total_pages").and_then(Value::as_u64) {
total_pages = pages as i64;
}
}
if let Some(ref json_body) = body {
if let Some(name) = json_body.get("table_name").and_then(Value::as_str) {
table_name = name.to_string();
}
if let Some(view_name) = json_body.get("view_name").and_then(Value::as_str) {
requested_view_name = Some(view_name.to_string());
table_name = view_name.to_string();
}
if table_name.is_empty() {
return HttpResponse::BadRequest().json(json!({
"error": "table_name is required"
}));
}
if let Some(additional_conditions) = json_body.get("conditions").and_then(|c| c.as_array())
{
for condition in additional_conditions {
if let Some(eq_column) = condition.get("eq_column").and_then(Value::as_str) {
let eq_value: String = match condition.get("eq_value") {
Some(Value::Bool(b)) => {
if *b {
"true".to_string()
} else {
"false".to_string()
}
}
Some(Value::String(s)) => s.clone(),
Some(other) => other.to_string(),
None => continue,
};
let normalized_eq_column =
normalize_column_name(eq_column, force_camel_case_to_snake_case);
conditions.push(RequestCondition::new(
normalized_eq_column.clone(),
eq_value.clone(),
));
}
}
}
} else {
return HttpResponse::BadRequest().json(json!({
"error": "table_name is required"
}));
}
conditions.sort_by(|a, b| a.eq_column.cmp(&b.eq_column));
let first_eq_column: &str = conditions.first().map_or("_", |c| &c.eq_column);
let hash_input: Value = json!({
"columns": columns_vec,
"conditions": conditions.iter().map(|c| json!({
"eq_column": c.eq_column,
"eq_value": c.eq_value
})).collect::<Vec<_>>(),
"limit": limit,
"strip_nulls": strip_nulls,
"client": client_name,
});
let hash_str: String = sha256::digest(serde_json::to_string(&hash_input).unwrap_or_default());
let short_hash: &str = &hash_str[..8];
let hashed_cache_key: String = format!(
"{}:{}:{}:{}:{}:{}",
table_name,
first_eq_column,
columns_vec.join(","),
limit,
strip_nulls,
short_hash
);
let cache_result: Option<HttpResponse> =
check_cache_control_and_get_response_v2(&req, app_state.clone(), &hashed_cache_key).await;
match cache_result {
Some(cached_response) => {
return cached_response;
}
None => {
info!(cache_key = %hashed_cache_key, "cache miss (POST gateway fetch)");
}
}
let conditions_json: Vec<Value> = conditions
.iter()
.map(|c| {
json!({
"eq_column": c.eq_column,
"eq_value": c.eq_value
})
})
.collect();
let columns_refs: Vec<&str> = columns_vec.iter().map(|s| s.as_str()).collect();
let pg_conditions: Vec<Condition> = to_query_conditions(&conditions[..]);
let page_offset: i64 = if current_page < 1 { 1 } else { current_page };
let calculated_offset: i64 = (page_offset - 1) * page_size + offset;
let data_result: Result<Vec<Value>, String>;
if let Some(pool) = app_state.pg_registry.get_pool(&client_name) {
data_result = fetch_rows_with_columns(
&pool,
&table_name,
&columns_refs,
&pg_conditions,
limit,
calculated_offset,
)
.await
.map_err(|err| err.to_string());
match &data_result {
Ok(rows) => info!(
backend = "postgres",
row_count = rows.len(),
"fetch_rows_with_columns finished"
),
Err(e) => info!(
backend = "postgres",
error = %e,
"fetch_rows_with_columns failed"
),
}
} else if client_name == "custom_supabase" {
let supabase_url: Option<String> = req
.headers()
.get("x-supabase-url")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
let supabase_key: Option<String> = req
.headers()
.get("x-supabase-key")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
match (supabase_url, supabase_key) {
(Some(url), Some(key)) => match SupabaseClient::new(url, key) {
Ok(client) => {
data_result = fetch_multiple_conditions_with_client(
client,
table_name.clone(),
columns_refs,
Some(conditions_json),
limit,
current_page,
page_size,
offset,
total_pages,
)
.await;
match &data_result {
Ok(rows) => info!(
backend = "custom_supabase",
row_count = rows.len(),
"custom Supabase fetch finished"
),
Err(e) => info!(
backend = "custom_supabase",
error = %e,
"custom Supabase fetch failed"
),
}
}
Err(e) => {
error!(
backend = "custom_supabase",
error = %e,
"failed to create Supabase client with provided headers"
);
data_result = Err(format!("Failed to construct Supabase client: {:?}", e));
}
},
_ => {
error!(
backend = "custom_supabase",
"missing supabase credentials in headers"
);
data_result = Err("Missing x-supabase-url or x-supabase-key headers".to_string());
}
}
} else {
data_result = fetch_multiple_conditions(
table_name.clone(),
columns_refs,
Some(conditions_json),
limit,
&client_name,
current_page,
page_size,
offset,
total_pages,
)
.await;
match &data_result {
Ok(rows) => info!(
backend = "supabase",
row_count = rows.len(),
"Supabase fetch_multiple_conditions finished"
),
Err(e) => info!(
backend = "supabase",
error = %e,
"Supabase fetch_multiple_conditions failed"
),
}
}
if let Ok(data) = &data_result {
let normalized_rows: Vec<Value> = normalize_rows(data, force_camel_case_to_snake_case);
hydrate_cache_and_return_json(
app_state.clone(),
hashed_cache_key.clone(),
vec![json!({"data": normalized_rows.clone()})],
)
.await;
} else {
error!("Failed to rehydrate cache due to data fetch error");
}
match data_result {
Ok(data) => {
let normalized_rows: Vec<Value> = normalize_rows(&data, force_camel_case_to_snake_case);
let mut data: Value = json!({ "data": normalized_rows, "cache_key": hashed_cache_key });
if strip_nulls {
let data_stripped_result: Option<Value> =
strip_nulls_from_key(&mut data, "data").await;
if let Some(data_stripped) = data_stripped_result {
data = data_stripped.clone();
} else {
error!("Failed to strip nulls from data");
return HttpResponse::InternalServerError().json(json!({
"error": "Failed to strip nulls from data"
}
));
}
}
let row_snapshot: Vec<Value> = data
.get("data")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
match apply_post_processing(&row_snapshot, &post_processing_config) {
Ok(Some(post_processing)) => {
data["post_processing"] = post_processing;
}
Ok(None) => {}
Err(err) => {
error!("Post-processing error: {}", err);
return HttpResponse::BadRequest().json(json!({
"error": format!("Post-processing failure: {}", err),
"cache_key": hashed_cache_key,
}));
}
}
let post_processing_applied: bool = data.get("post_processing").is_some();
HttpResponse::Ok().json(data)
}
Err(err) => {
error!(
table = %table_name,
client = %client_name,
user_id = %user_id,
cache_key = %hashed_cache_key,
duration_ms = %start_time.elapsed().as_millis(),
error = %err,
"fetch POST failed"
);
HttpResponse::InternalServerError().json(
json!({"error": "Failed to fetch data with conditions", "cache_key": hashed_cache_key, "error": err}),
)
}
}
}
/// Groups `rows` by the configured column (optionally bucketed by time
/// granularity) and attaches an optional aggregation value per bucket.
///
/// Returns `Ok(None)` when no `group_by` column is configured, and an error
/// when an aggregation strategy is requested without an aggregation column
/// or when a row cannot be labelled or summed.
fn apply_post_processing(
    rows: &[Value],
    config: &PostProcessingConfig,
) -> Result<Option<Value>, String> {
    let group_column = match config.group_by.as_deref() {
        Some(column) => column,
        None => return Ok(None),
    };
    if config.aggregation_column.is_none() && config.aggregation_strategy.is_some() {
        return Err("aggregation_strategy requires aggregation_column".to_string());
    }
    // Bucket rows by label; BTreeMap keeps bucket iteration sorted by label.
    let mut buckets: BTreeMap<String, Vec<Value>> = BTreeMap::new();
    for row in rows {
        let label = extract_group_label(row, group_column, config.time_granularity.as_ref())?;
        buckets.entry(label).or_default().push(row.clone());
    }
    let mut running_total = Decimal::ZERO;
    let mut grouped: Vec<Value> = Vec::with_capacity(buckets.len());
    for (label, bucket_rows) in buckets {
        // Per-bucket sum, only when an aggregation column is configured.
        let bucket_sum = match &config.aggregation_column {
            Some(column) => Some(sum_bucket(&bucket_rows, column, config.dedup_aggregation)?),
            None => None,
        };
        let aggregation_payload = bucket_sum.map(|sum| {
            let value = match config.aggregation_strategy {
                Some(AggregationStrategy::CumulativeSum) => {
                    running_total += sum;
                    running_total
                }
                None => sum,
            };
            Value::String(value.to_string())
        });
        grouped.push(json!({
            "label": label,
            "rows": bucket_rows,
            "aggregation": aggregation_payload
        }));
    }
    Ok(Some(json!({ "grouped": grouped })))
}
/// Derives the bucket label for `row` from `column`.
///
/// With a granularity, the cell is parsed as a datetime and formatted at
/// that granularity; otherwise strings are used verbatim and any other JSON
/// value is rendered through its `Display` form. Errors when the column is
/// missing or the datetime cannot be parsed.
fn extract_group_label(
    row: &Value,
    column: &str,
    granularity: Option<&TimeGranularity>,
) -> Result<String, String> {
    let cell = match row.get(column) {
        Some(value) => value,
        None => return Err(format!("Missing group_by column '{}'", column)),
    };
    match granularity {
        Some(gran) => parse_datetime_value(cell).map(|datetime| gran.format_label(datetime)),
        None => match cell.as_str() {
            Some(text) => Ok(text.to_string()),
            None => Ok(cell.to_string()),
        },
    }
}
/// Parses a JSON value into a UTC timestamp.
///
/// Accepts RFC 3339 strings, `"%Y-%m-%d %H:%M:%S"` strings (interpreted as
/// UTC), and numeric unix timestamps (integer seconds, or fractional seconds
/// split into seconds + nanoseconds). Returns a descriptive error for
/// anything else.
fn parse_datetime_value(value: &Value) -> Result<DateTime<Utc>, String> {
    match value {
        Value::String(text) => DateTime::parse_from_rfc3339(text)
            .map(|dt| dt.with_timezone(&Utc))
            .or_else(|_| {
                NaiveDateTime::parse_from_str(text, "%Y-%m-%d %H:%M:%S")
                    // `TimeZone::from_utc_datetime` replaces the deprecated
                    // `DateTime::<Utc>::from_utc`, so `#[allow(deprecated)]`
                    // is no longer needed on this function.
                    .map(|naive| Utc.from_utc_datetime(&naive))
            })
            .map_err(|err| format!("Failed to parse datetime '{}': {}", text, err)),
        Value::Number(number) => {
            if let Some(timestamp) = number.as_i64() {
                Utc.timestamp_opt(timestamp, 0)
                    .single()
                    .ok_or_else(|| format!("Invalid unix timestamp {}", timestamp))
            } else if let Some(float) = number.as_f64() {
                let secs: i64 = float.trunc() as i64;
                let nanos: u32 = ((float.fract()) * 1_000_000_000f64) as u32;
                Utc.timestamp_opt(secs, nanos)
                    .single()
                    .ok_or_else(|| format!("Invalid unix timestamp {}", float))
            } else {
                Err("Unsupported numeric timestamp".to_string())
            }
        }
        _ => Err("Unsupported value type for time granularity".to_string()),
    }
}
/// Sums the numeric values of `column` across `rows`.
///
/// When `dedup` is set, each distinct value is counted once. Errors when the
/// column is missing from a row or holds non-numeric data.
fn sum_bucket(rows: &[Value], column: &str, dedup: bool) -> Result<Decimal, String> {
    let mut total: Decimal = Decimal::ZERO;
    let mut seen: HashSet<Decimal> = HashSet::new();
    for row in rows {
        let value: &Value = row
            .get(column)
            .ok_or_else(|| format!("Missing aggregation column '{}'", column))?;
        let decimal: Decimal = value_to_decimal(value)
            .ok_or_else(|| format!("Aggregation column '{}' contains non-numeric data", column))?;
        // `insert` reports whether the value was new, so one call replaces
        // the former contains-then-insert double lookup (and the needless
        // clone of a `Copy` value).
        if dedup && !seen.insert(decimal) {
            continue;
        }
        total += decimal;
    }
    Ok(total)
}
/// Converts a JSON number or numeric string into a `Decimal`.
///
/// Integer values convert exactly; floats go through `Decimal::from_f64`.
/// Returns `None` for non-numeric values or unparsable strings.
fn value_to_decimal(value: &Value) -> Option<Decimal> {
    match value {
        Value::Number(num) => num
            .as_i64()
            .map(Decimal::from)
            .or_else(|| num.as_u64().map(Decimal::from))
            .or_else(|| num.as_f64().and_then(Decimal::from_f64)),
        Value::String(text) => text.parse::<Decimal>().ok(),
        _ => None,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Camel-case column names from the body are normalized to snake_case
    // when the force flag is set.
    #[test]
    fn post_processing_config_from_body_normalizes_columns() {
        let body = json!({
            "group_by": "CreatedAt",
            "aggregation_column": "TotalValue",
            "aggregation_strategy": "cumulative_sum"
        });
        let config = PostProcessingConfig::from_body(Some(&body), true);
        assert_eq!(config.group_by.as_deref(), Some("created_at"));
        assert_eq!(config.aggregation_column.as_deref(), Some("total_value"));
        assert_eq!(
            config.aggregation_strategy,
            Some(AggregationStrategy::CumulativeSum)
        );
    }

    // Requesting a strategy without an aggregation column is rejected with a
    // descriptive error.
    #[test]
    fn apply_post_processing_requires_aggregation_column() {
        let rows = vec![json!({"created_at": "2024-01-01T00:00:00Z", "value": 1})];
        let config = PostProcessingConfig {
            group_by: Some("created_at".to_string()),
            time_granularity: None,
            aggregation_column: None,
            aggregation_strategy: Some(AggregationStrategy::CumulativeSum),
            dedup_aggregation: false,
        };
        let result = apply_post_processing(&rows, &config);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .contains("aggregation_strategy requires aggregation_column")
        );
    }

    // A valid grouping configuration yields a payload with a "grouped" key.
    #[test]
    fn apply_post_processing_groups_rows() {
        let rows = vec![
            json!({"created_at": "2024-01-01T00:00:00Z", "value": 2}),
            json!({"created_at": "2024-01-01T00:00:00Z", "value": 3}),
        ];
        let config = PostProcessingConfig {
            group_by: Some("created_at".to_string()),
            time_granularity: None,
            aggregation_column: Some("value".to_string()),
            aggregation_strategy: Some(AggregationStrategy::CumulativeSum),
            dedup_aggregation: false,
        };
        let grouped = apply_post_processing(&rows, &config)
            .expect("Grouping should succeed")
            .expect("Grouping payload should exist");
        assert!(grouped.get("grouped").is_some());
    }
}
/// POST /gateway/data — delegates to the shared gateway fetch handler.
#[post("/gateway/data")]
pub async fn fetch_data_route(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    handle_fetch_data_route(req, body, app_state).await
}
/// POST /gateway/fetch — alias of /gateway/data; this is also the endpoint
/// `get_data_route` proxies its GET requests to.
#[post("/gateway/fetch")]
pub async fn proxy_fetch_data_route(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    handle_fetch_data_route(req, body, app_state).await
}
/// POST /gateway/update.
///
/// NOTE(review): despite the route name, this delegates to the *fetch*
/// handler and performs no update — confirm whether this is a placeholder or
/// a wiring mistake.
#[post("/gateway/update")]
pub async fn gateway_update_route(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    handle_fetch_data_route(req, body, app_state).await
}
/// GET /data — legacy query-parameter interface over the POST gateway.
///
/// Requires `view`, `eq_column`, and `eq_value` query parameters; optional
/// `columns`, `limit`, pagination, and `strip_nulls` parameters. Checks two
/// cache keys (a hashed key and a legacy key), then proxies the request as a
/// JSON POST to the local `/gateway/fetch` endpoint, forwarding auth and
/// Supabase headers, and hydrates both cache keys with the result.
///
/// NOTE(review): the loopback URL and port (http://localhost:4884) are
/// hard-coded — confirm this matches the bound server address in all
/// deployments.
#[get("/data")]
pub async fn get_data_route(
    req: HttpRequest,
    query: web::Query<HashMap<String, String>>,
    app_state: Data<AppState>,
) -> impl Responder {
    log_request(req.clone());
    let start_time: Instant = Instant::now();
    let client_name: String = x_athena_client(&req.clone());
    let force_camel_case_to_snake_case: bool = app_state.gateway_force_camel_case_to_snake_case;
    // Required query parameters: view, eq_column, eq_value.
    let view: String = match query.get("view") {
        Some(v) => v.clone(),
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": "Missing required parameter: view"
            }));
        }
    };
    let eq_column_raw: String = match query.get("eq_column") {
        Some(c) => c.clone(),
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": "Missing required parameter: eq_column"
            }));
        }
    };
    let eq_column: String = normalize_column_name(&eq_column_raw, force_camel_case_to_snake_case);
    let eq_value: String = match query.get("eq_value") {
        Some(v) => v.clone(),
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": "Missing required parameter: eq_value"
            }));
        }
    };
    // Optional parameters; unparsable values are silently treated as absent.
    let columns: Option<String> = query.get("columns").cloned();
    let limit: Option<i32> = query.get("limit").and_then(|l| l.parse::<i32>().ok());
    let current_page: Option<i32> = query
        .get("current_page")
        .and_then(|p| p.parse::<i32>().ok());
    let page_size: Option<i32> = query.get("page_size").and_then(|s| s.parse::<i32>().ok());
    let offset: Option<i32> = query.get("offset").and_then(|o| o.parse::<i32>().ok());
    let total_pages: Option<i32> = query.get("total_pages").and_then(|t| t.parse::<i32>().ok());
    let strip_nulls: bool = query
        .get("strip_nulls")
        .and_then(|s| s.parse::<bool>().ok())
        .unwrap_or(false);
    let normalized_columns_param: Option<String> = columns
        .as_ref()
        .map(|cols| normalize_columns_csv(cols, force_camel_case_to_snake_case));
    // Two cache keys are maintained: the legacy positional key and a newer
    // key that appends a short hash of the full parameter set.
    let legacy_cache_key: String = format!(
        "get_data_route:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}",
        view,
        eq_column,
        eq_value,
        normalized_columns_param.as_deref().unwrap_or(""),
        limit.unwrap_or(0),
        current_page.unwrap_or(1),
        page_size.unwrap_or(100),
        offset.unwrap_or(0),
        total_pages.unwrap_or(1),
        strip_nulls,
        client_name
    );
    let get_hash_input: Value = json!({
        "view": view,
        "eq_column": eq_column,
        "eq_value": eq_value,
        "columns": normalized_columns_param.clone(),
        "limit": limit,
        "current_page": current_page,
        "page_size": page_size,
        "offset": offset,
        "total_pages": total_pages,
        "strip_nulls": strip_nulls,
        "client": client_name,
    });
    let get_hash_str: String =
        sha256::digest(serde_json::to_string(&get_hash_input).unwrap_or_default());
    let get_short_hash: &str = &get_hash_str[..8];
    let hashed_cache_key: String = format!("{}-h{}", legacy_cache_key, get_short_hash);
    // Hashed key takes precedence; fall back to the legacy key.
    let cache_result_hashed: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, app_state.clone(), &hashed_cache_key).await;
    if let Some(cached_response) = cache_result_hashed {
        return cached_response;
    }
    let cache_result_legacy: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, app_state.clone(), &legacy_cache_key).await;
    if let Some(cached_response) = cache_result_legacy {
        info!(cache_key = %legacy_cache_key, duration_ms = %start_time.elapsed().as_millis(), "cache hit (GET, legacy)");
        return cached_response;
    }
    // Translate the query parameters into a POST body for /gateway/fetch.
    let condition: Value = json!({
        "eq_column": eq_column,
        "eq_value": eq_value
    });
    let mut request_body: Value = json!({
        "view_name": view,
        "conditions": [condition]
    });
    if let Some(cols) = normalized_columns_param.clone() {
        request_body["columns"] = json!(cols);
    }
    if let Some(l) = limit {
        request_body["limit"] = json!(l);
    }
    if let Some(cp) = current_page {
        request_body["current_page"] = json!(cp);
    }
    if let Some(ps) = page_size {
        request_body["page_size"] = json!(ps);
    }
    if let Some(o) = offset {
        request_body["offset"] = json!(o);
    }
    if let Some(tp) = total_pages {
        request_body["total_pages"] = json!(tp);
    }
    if strip_nulls {
        request_body["strip_nulls"] = json!(strip_nulls);
    }
    // Forward identity/auth and Supabase credential headers to the POST route.
    let client: Client = Client::new();
    let mut req_builder: RequestBuilder = client
        .post("http://localhost:4884/gateway/fetch")
        .header("Content-Type", "application/json")
        .header(
            "X-User-Id",
            req.headers()
                .get("X-User-Id")
                .and_then(|h| h.to_str().ok())
                .unwrap_or(""),
        )
        .header(
            "X-Athena-Client",
            if client_name.is_empty() {
                "supabase"
            } else {
                &client_name
            },
        );
    if let Some(api_key) = req.headers().get("apikey").and_then(|h| h.to_str().ok()) {
        req_builder = req_builder.header("apikey", api_key);
    }
    if let Some(api_key) = req.headers().get("x-api-key").and_then(|h| h.to_str().ok()) {
        req_builder = req_builder.header("x-api-key", api_key);
    }
    if let Some(url) = req
        .headers()
        .get("x-supabase-url")
        .and_then(|h| h.to_str().ok())
    {
        req_builder = req_builder.header("x-supabase-url", url);
    }
    if let Some(key) = req
        .headers()
        .get("x-supabase-key")
        .and_then(|h| h.to_str().ok())
    {
        req_builder = req_builder.header("x-supabase-key", key);
    }
    let response: Response = match req_builder.json(&request_body).send().await {
        Ok(resp) => resp,
        Err(e) => {
            error!("Failed to send POST request: {:#?}", e);
            return HttpResponse::InternalServerError().json(json!({
                "error": "Failed to process request"
            }));
        }
    };
    match response.json::<Value>().await {
        Ok(data) => {
            // Hydrate both cache keys so either lookup path hits next time.
            hydrate_cache_and_return_json(
                app_state.clone(),
                hashed_cache_key.clone(),
                vec![json!({"data": data.clone()})],
            )
            .await;
            hydrate_cache_and_return_json(
                app_state.clone(),
                legacy_cache_key.clone(),
                vec![json!({"data": data.clone()})],
            )
            .await;
            HttpResponse::Ok().json(data)
        }
        Err(e) => {
            error!(duration_ms = %start_time.elapsed().as_millis(), error = %e, "fetch GET failed to parse response");
            HttpResponse::InternalServerError().json(json!({
                "error": "Failed to parse response"
            }))
        }
    }
}