use crate::drivers::scylla::health::{self, HostHealthTracker, HostOffline, SystemClock};
use serde_json::Value;
use std::env::var;
use std::error::Error;
use std::time::Duration;
use std::time::Instant;
use supabase_rs::SupabaseClient;
use supabase_rs::query::QueryBuilder;
use tracing::{debug, error, info, warn};
/// Connection parameters for one Supabase project.
///
/// `Debug` is implemented by hand so that formatting this value with `{:?}`
/// (for example in a log line) never prints the API key.
#[derive(Clone)]
pub struct SupabaseConnectionInfo {
    /// Base URL of the Supabase project.
    pub url: String,
    /// API key sent with requests. Treated as a secret: redacted in `Debug` output.
    pub key: String,
    /// Host portion of `url`.
    pub host: String,
    /// Optional human-readable label for this connection.
    pub name: Option<String>,
}

impl std::fmt::Debug for SupabaseConnectionInfo {
    /// Formats every field except `key`, which is replaced by a placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SupabaseConnectionInfo")
            .field("url", &self.url)
            // Never emit the credential itself; the placeholder keeps the field visible.
            .field("key", &"<redacted>")
            .field("host", &self.host)
            .field("name", &self.name)
            .finish()
    }
}
impl SupabaseConnectionInfo {
    /// Creates unnamed connection info.
    ///
    /// `host` is computed from `url` with `extract_host_from_url`; `name` is `None`.
    pub fn new(url: String, key: String) -> Self {
        let host = extract_host_from_url(&url);
        Self {
            url,
            key,
            host,
            name: None,
        }
    }

    /// Creates connection info exactly like [`Self::new`], then labels it with `name`.
    pub fn with_name(url: String, key: String, name: String) -> Self {
        Self {
            name: Some(name),
            ..Self::new(url, key)
        }
    }

    /// Reads the URL and the key from the environment variables named
    /// `url_key` and `key_key`, in that order.
    ///
    /// # Errors
    /// Returns a message naming the first variable that could not be read.
    pub fn from_env(url_key: &str, key_key: &str) -> Result<Self, String> {
        // One reader for both lookups so the error text stays in a single place.
        let read = |variable: &str| -> Result<String, String> {
            var(variable)
                .map_err(|e| format!("Missing environment variable '{}': {}", variable, e))
        };
        let url = read(url_key)?;
        let key = read(key_key)?;
        Ok(Self::new(url, key))
    }
}
/// Returns the host (including any `:port`) of `url`.
///
/// Leading `https://` / `http://` prefixes are removed, then everything from
/// the first `/`, `?` or `#` onward is dropped. Stopping at `?` and `#` keeps
/// query strings and fragments out of the result, so a URL written without a
/// path (`https://host?x=1`) yields the same host as one with a path.
///
/// Input without a scheme is handled the same way; an empty string yields an
/// empty string.
#[doc(hidden)]
pub fn extract_host_from_url(url: &str) -> String {
    let rest = url
        .trim_start_matches("https://")
        .trim_start_matches("http://");
    // All three delimiters are ASCII, so the index is always a char boundary.
    let end = rest
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(rest.len());
    rest[..end].to_string()
}
/// Supabase client wrapper that asks a host health tracker whether the host
/// is offline before each operation and reports each operation's outcome
/// (success or failure) back to that tracker.
pub struct HealthAwareSupabaseClient {
/// Connection details; `info.host` is the key passed to the tracker.
info: SupabaseConnectionInfo,
/// Underlying client that performs the actual requests.
client: SupabaseClient,
/// Tracker obtained from `health::global_tracker()` when the wrapper is built.
tracker: &'static HostHealthTracker<SystemClock>,
}
impl HealthAwareSupabaseClient {
    /// Creates a client for `info` and attaches the tracker returned by
    /// `health::global_tracker()`.
    ///
    /// # Errors
    /// Returns a boxed message when `SupabaseClient::new` fails.
    pub fn new(info: SupabaseConnectionInfo) -> Result<Self, Box<dyn Error>> {
        let client: SupabaseClient = SupabaseClient::new(info.url.clone(), info.key.clone())
            .map_err(|e| format!("Failed to create SupabaseClient: {:?}", e))?;
        Ok(Self {
            info,
            client,
            tracker: health::global_tracker(),
        })
    }

    /// Builds the connection info from the environment variables named
    /// `url_key` and `key_key`, then delegates to [`Self::new`].
    ///
    /// # Errors
    /// Returns an error if either variable cannot be read or client creation fails.
    pub fn from_env(url_key: &str, key_key: &str) -> Result<Self, Box<dyn Error>> {
        let info: SupabaseConnectionInfo = SupabaseConnectionInfo::from_env(url_key, key_key)?;
        Self::new(info)
    }

    /// Returns the tracker's `offline_until` value for this host; `Some(deadline)`
    /// is treated by this type as "host currently offline".
    pub fn is_offline(&self) -> Option<Instant> {
        self.tracker.offline_until(&self.info.host)
    }

    /// Logs a warning and asks the tracker to force this host offline for
    /// `duration`. Returns the `Instant` produced by the tracker.
    pub fn force_offline(&self, duration: Duration) -> Instant {
        warn!(
            host = %self.info.host,
            name = ?self.info.name,
            duration_secs = duration.as_secs(),
            "Forcing Supabase host offline"
        );
        self.tracker.force_offline(&self.info.host, duration)
    }

    /// Logs the reset and clears the tracker's state for this host.
    pub fn reset_health(&self) {
        info!(
            host = %self.info.host,
            name = ?self.info.name,
            "Resetting Supabase host health tracking"
        );
        self.tracker.reset_host(&self.info.host);
    }

    /// Reports a successful operation for this host to the tracker.
    fn record_success(&self) {
        self.tracker.record_success(&self.info.host);
    }

    /// Reports a failed operation for this host to the tracker and returns
    /// whatever deadline the tracker hands back.
    fn record_failure(&self) -> Option<Instant> {
        self.tracker.record_failure(&self.info.host)
    }

    /// Fails fast with `HostOffline` when the tracker reports an offline deadline.
    fn check_availability(&self) -> Result<(), Box<dyn Error>> {
        match self.is_offline() {
            Some(deadline) => Err(Box::new(HostOffline::new(self.info.host.clone(), deadline))),
            None => Ok(()),
        }
    }

    /// Records a failure and converts it into the error returned to the caller.
    ///
    /// If recording the failure yields a deadline, the result is `HostOffline`
    /// and `err` is dropped (callers log it beforehand). Otherwise `err` is
    /// wrapped in an `io::Error` of kind `Other`.
    fn failure_to_error<E>(&self, err: E) -> Box<dyn Error>
    where
        E: Into<Box<dyn Error + Send + Sync>>,
    {
        match self.record_failure() {
            Some(deadline) => Box::new(HostOffline::new(self.info.host.clone(), deadline)),
            None => Box::new(std::io::Error::new(std::io::ErrorKind::Other, err)),
        }
    }

    /// Starts a select on `table_name`. The returned builder borrows this
    /// client so that its `execute` can use the same health tracking.
    pub fn select(&self, table_name: &str) -> HealthAwareQueryBuilder<'_> {
        HealthAwareQueryBuilder {
            client: self,
            builder: self.client.select(table_name),
            table_name: table_name.to_string(),
        }
    }

    /// Runs `query_string` against `table_name` and returns the resulting rows.
    ///
    /// # Errors
    /// `HostOffline` if the host is offline before the call or the failure
    /// produces an offline deadline; otherwise the client error wrapped in
    /// `io::Error`.
    pub async fn execute(
        &self,
        table_name: &str,
        query_string: &str,
    ) -> Result<Vec<Value>, Box<dyn Error>> {
        self.check_availability()?;
        debug!(
            host = %self.info.host,
            table = %table_name,
            query = %query_string,
            "Executing Supabase query"
        );
        let outcome = self.client.execute(table_name, query_string).await;
        match outcome {
            Ok(fetched) => {
                self.record_success();
                info!(
                    host = %self.info.host,
                    table = %table_name,
                    rows = fetched.len(),
                    "Supabase query succeeded"
                );
                Ok(fetched)
            }
            Err(err) => {
                // Log first: the helper may discard `err` in favour of `HostOffline`.
                error!(
                    host = %self.info.host,
                    table = %table_name,
                    error = %err,
                    "Supabase query failed"
                );
                Err(self.failure_to_error(err))
            }
        }
    }

    /// Inserts `data` into `table_name` and returns the id reported by the client.
    ///
    /// # Errors
    /// Same contract as [`Self::execute`].
    pub async fn insert(&self, table_name: &str, data: Value) -> Result<String, Box<dyn Error>> {
        self.check_availability()?;
        debug!(
            host = %self.info.host,
            table = %table_name,
            "Inserting into Supabase table"
        );
        let outcome = self.client.insert(table_name, data).await;
        match outcome {
            Ok(id) => {
                self.record_success();
                info!(
                    host = %self.info.host,
                    table = %table_name,
                    id = %id,
                    "Supabase insert succeeded"
                );
                Ok(id)
            }
            Err(err) => {
                error!(
                    host = %self.info.host,
                    table = %table_name,
                    error = %err,
                    "Supabase insert failed"
                );
                Err(self.failure_to_error(err))
            }
        }
    }

    /// Updates row `row_id` of `table_name` with `data`. The client's success
    /// value is discarded.
    ///
    /// # Errors
    /// Same contract as [`Self::execute`].
    pub async fn update(
        &self,
        table_name: &str,
        row_id: &str,
        data: Value,
    ) -> Result<(), Box<dyn Error>> {
        self.check_availability()?;
        debug!(
            host = %self.info.host,
            table = %table_name,
            row_id = %row_id,
            "Updating Supabase table"
        );
        let outcome = self.client.update(table_name, row_id, data).await;
        match outcome {
            Ok(_) => {
                self.record_success();
                info!(
                    host = %self.info.host,
                    table = %table_name,
                    row_id = %row_id,
                    "Supabase update succeeded"
                );
                Ok(())
            }
            Err(err) => {
                error!(
                    host = %self.info.host,
                    table = %table_name,
                    row_id = %row_id,
                    error = %err,
                    "Supabase update failed"
                );
                Err(self.failure_to_error(err))
            }
        }
    }

    /// Deletes row `row_id` from `table_name`.
    ///
    /// # Errors
    /// Same contract as [`Self::execute`].
    pub async fn delete(&self, table_name: &str, row_id: &str) -> Result<(), Box<dyn Error>> {
        self.check_availability()?;
        debug!(
            host = %self.info.host,
            table = %table_name,
            row_id = %row_id,
            "Deleting from Supabase table"
        );
        let outcome = self.client.delete(table_name, row_id).await;
        match outcome {
            Ok(_) => {
                self.record_success();
                info!(
                    host = %self.info.host,
                    table = %table_name,
                    row_id = %row_id,
                    "Supabase delete succeeded"
                );
                Ok(())
            }
            Err(err) => {
                error!(
                    host = %self.info.host,
                    table = %table_name,
                    row_id = %row_id,
                    error = %err,
                    "Supabase delete failed"
                );
                Err(self.failure_to_error(err))
            }
        }
    }
}
/// Query builder handed out by `HealthAwareSupabaseClient::select`.
///
/// Wraps a `QueryBuilder` and keeps a borrow of the client so that running
/// the query goes through the client's availability check and outcome
/// recording.
pub struct HealthAwareQueryBuilder<'a> {
/// Client whose health tracking is used when the query is executed.
client: &'a HealthAwareSupabaseClient,
/// Wrapped builder that accumulates the query and performs the request.
builder: QueryBuilder,
/// Table name, kept only for log output.
table_name: String,
}
impl<'a> HealthAwareQueryBuilder<'a> {
    /// Forwards the column selection to the wrapped builder.
    pub fn columns(self, columns: Vec<&str>) -> Self {
        Self {
            builder: self.builder.columns(columns),
            ..self
        }
    }

    /// Forwards an `eq` filter on `column` / `value` to the wrapped builder.
    pub fn eq(self, column: &str, value: &str) -> Self {
        Self {
            builder: self.builder.eq(column, value),
            ..self
        }
    }

    /// Forwards a `neq` filter on `column` / `value` to the wrapped builder.
    pub fn neq(self, column: &str, value: &str) -> Self {
        Self {
            builder: self.builder.neq(column, value),
            ..self
        }
    }

    /// Forwards a `gt` filter on `column` / `value` to the wrapped builder.
    pub fn gt(self, column: &str, value: &str) -> Self {
        Self {
            builder: self.builder.gt(column, value),
            ..self
        }
    }

    /// Forwards an `lt` filter on `column` / `value` to the wrapped builder.
    pub fn lt(self, column: &str, value: &str) -> Self {
        Self {
            builder: self.builder.lt(column, value),
            ..self
        }
    }

    /// Forwards the ordering request (`column`, `ascending`) to the wrapped builder.
    pub fn order(self, column: &str, ascending: bool) -> Self {
        Self {
            builder: self.builder.order(column, ascending),
            ..self
        }
    }

    /// Forwards the row limit to the wrapped builder.
    pub fn limit(self, count: usize) -> Self {
        Self {
            builder: self.builder.limit(count),
            ..self
        }
    }

    /// Forwards the row offset to the wrapped builder.
    pub fn offset(self, count: usize) -> Self {
        Self {
            builder: self.builder.offset(count),
            ..self
        }
    }

    /// Runs the accumulated query through the client's health tracking.
    ///
    /// The availability check happens first; afterwards the outcome is
    /// recorded on the client as a success or a failure.
    ///
    /// # Errors
    /// `HostOffline` if the host is offline before the call or recording the
    /// failure yields an offline deadline; otherwise the query error wrapped
    /// in an `io::Error` of kind `Other`.
    pub async fn execute(self) -> Result<Vec<Value>, Box<dyn Error>> {
        let Self {
            client,
            builder,
            table_name,
        } = self;
        client.check_availability()?;
        debug!(
            host = %client.info.host,
            table = %table_name,
            "Executing health-aware Supabase query"
        );
        let outcome = builder.execute().await;
        match outcome {
            Ok(fetched) => {
                client.record_success();
                info!(
                    host = %client.info.host,
                    table = %table_name,
                    rows = fetched.len(),
                    "Health-aware query succeeded"
                );
                Ok(fetched)
            }
            Err(err) => {
                // Log before converting: the original error is dropped when
                // the failure results in an offline deadline.
                error!(
                    host = %client.info.host,
                    table = %table_name,
                    error = %err,
                    "Health-aware query failed"
                );
                let failure: Box<dyn Error> = match client.record_failure() {
                    Some(deadline) => {
                        Box::new(HostOffline::new(client.info.host.clone(), deadline))
                    }
                    None => Box::new(std::io::Error::new(std::io::ErrorKind::Other, err)),
                };
                Err(failure)
            }
        }
    }
}