use serde_json::Value;
use crate::drivers::postgresql::sqlx_driver::{PostgresInsertError, insert_row};
use crate::drivers::supabase::{client_router, client_router_health_aware};
use supabase_rs::SupabaseClient;
/// Failure modes reported by `insert_via_supabase`.
#[derive(Debug)]
pub enum SupabaseInsertError {
/// `client_router` failed with a message starting with "Unknown client name";
/// `client_name` is the name that was requested.
UnknownClient { client_name: String },
/// `client_router` failed for any other reason; `message` is its error text.
ClientInit { message: String },
/// The insert call itself returned an error; `message` is that error's text.
InsertFailed { message: String },
/// The health-aware insert failed with an error that downcasts to `HostOffline`;
/// `message` names the affected backend.
BackendUnavailable { message: String },
}
/// Error returned by the `*_insert_with_timeout` wrappers: either the backend's own
/// insert error or a timeout raised by the wrapper.
#[derive(Debug)]
pub(crate) enum InsertDriverError {
/// Error produced by the Postgres `insert_row` call.
Postgres(PostgresInsertError),
/// Error produced by `insert_via_supabase`.
Supabase(SupabaseInsertError),
/// The wrapped operation did not finish within the allotted time.
StageTimeout {
// Backend label set by the wrapper ("postgres" or "supabase" in this file).
backend: &'static str,
// Stage label set by the wrapper ("db_insert" in this file).
stage: &'static str,
// The timeout budget, in milliseconds, that was exceeded.
timeout_ms: u64,
},
}
/// Runs `insert_row` against `pool`, giving up after `timeout_ms` milliseconds.
///
/// # Errors
///
/// - `InsertDriverError::StageTimeout` (backend `"postgres"`, stage `"db_insert"`)
///   when the insert does not complete within the budget.
/// - `InsertDriverError::Postgres` when `insert_row` itself fails.
pub(crate) async fn postgres_insert_with_timeout(
    pool: &sqlx::postgres::PgPool,
    table_name: &str,
    insert_body: &Value,
    timeout_ms: u64,
) -> Result<Value, InsertDriverError> {
    let budget = tokio::time::Duration::from_millis(timeout_ms);

    // Outer layer: did the deadline elapse? Inner layer: did the insert succeed?
    tokio::time::timeout(budget, insert_row(pool, table_name, insert_body))
        .await
        .map_err(|_elapsed| InsertDriverError::StageTimeout {
            backend: "postgres",
            stage: "db_insert",
            timeout_ms,
        })?
        .map_err(InsertDriverError::Postgres)
}
/// Runs `insert_via_supabase` for `client_name`, giving up after `timeout_ms` milliseconds.
///
/// On success the `(value, message)` pair from `insert_via_supabase` is passed through
/// unchanged.
///
/// # Errors
///
/// - `InsertDriverError::StageTimeout` (backend `"supabase"`, stage `"db_insert"`)
///   when the insert does not complete within the budget.
/// - `InsertDriverError::Supabase` when `insert_via_supabase` itself fails.
pub(crate) async fn supabase_insert_with_timeout(
    table_name: String,
    insert_body: Value,
    client_name: &str,
    timeout_ms: u64,
) -> Result<(Value, String), InsertDriverError> {
    let budget = tokio::time::Duration::from_millis(timeout_ms);
    let pending_insert = insert_via_supabase(table_name, insert_body, client_name);

    // Outer layer: did the deadline elapse? Inner layer: did the insert succeed?
    tokio::time::timeout(budget, pending_insert)
        .await
        .map_err(|_elapsed| InsertDriverError::StageTimeout {
            backend: "supabase",
            stage: "db_insert",
            timeout_ms,
        })?
        .map_err(InsertDriverError::Supabase)
}
/// Inserts `data` into `table_name` using the Supabase client registered as `client_name`.
///
/// The health-aware client is tried first. When `client_router_health_aware` yields a
/// client, the result of its insert is final: a failed insert is not retried through the
/// plain client. Only when that lookup itself fails is a client obtained from
/// `client_router` and used instead.
///
/// On success the backend's response is discarded; the returned tuple holds the submitted
/// `data` and a confirmation message.
///
/// # Errors
///
/// - `SupabaseInsertError::BackendUnavailable` — the health-aware insert failed with an
///   error that downcasts to `HostOffline`.
/// - `SupabaseInsertError::UnknownClient` — `client_router` failed with a message starting
///   with `"Unknown client name"`.
/// - `SupabaseInsertError::ClientInit` — any other `client_router` failure.
/// - `SupabaseInsertError::InsertFailed` — the insert call itself returned an error.
pub async fn insert_via_supabase(
    table_name: String,
    data: Value,
    client_name: &str,
) -> Result<(Value, String), SupabaseInsertError> {
    // Shared by both code paths. `{:?}` renders the table name wrapped in quotes.
    fn confirmation(table: &str) -> String {
        format!("Successfully inserted data into {:?}", table)
    }

    if let Ok(health_client) = client_router_health_aware(client_name) {
        // Every outcome on this path returns; the plain client below is never reached.
        return match health_client.insert(&table_name, data.clone()).await {
            Ok(_) => Ok((data, confirmation(&table_name))),
            Err(err) => {
                let host_offline = err
                    .downcast_ref::<crate::drivers::scylla::health::HostOffline>()
                    .is_some();
                Err(if host_offline {
                    SupabaseInsertError::BackendUnavailable {
                        message: format!(
                            "Backend {} temporarily unavailable (circuit breaker)",
                            client_name
                        ),
                    }
                } else {
                    SupabaseInsertError::InsertFailed {
                        message: err.to_string(),
                    }
                })
            }
        };
    }

    // Health-aware lookup failed: fall back to the plain client router.
    let client: SupabaseClient = match client_router(client_name).await {
        Ok(resolved) => resolved,
        // The router reports errors as strings, so the unknown-name case is
        // recognised by its message prefix.
        Err(reason) if reason.starts_with("Unknown client name") => {
            return Err(SupabaseInsertError::UnknownClient {
                client_name: client_name.to_string(),
            });
        }
        Err(reason) => return Err(SupabaseInsertError::ClientInit { message: reason }),
    };

    client
        .insert(&table_name, data.clone())
        .await
        .map(|_| (data, confirmation(&table_name)))
        .map_err(|message| SupabaseInsertError::InsertFailed { message })
}