#![allow(dead_code)]
use anyhow::{anyhow, Context, Result};
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::sync::Mutex;
use std::time::{Duration, Instant};
use crate::worker::db::Event;
const LOCALE_CACHE_TTL: Duration = Duration::from_secs(300);
#[derive(Debug, Clone, Deserialize)]
pub struct RuleRow {
pub scope: String,
pub content: String,
pub updated_at: String,
}
#[derive(Debug, Clone)]
pub struct PendingAdvice {
pub id: i64,
pub text: String,
pub paths: Vec<String>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct JobRow {
pub id: i64,
pub kind: String,
pub scope: String,
pub input: Value,
#[serde(default)]
pub output: Option<Value>,
pub status: String,
}
pub struct SupabaseClient {
http: Client,
base_url: String,
key: String,
client_id: String,
locale_cache: Mutex<Option<(Option<String>, Instant)>>,
}
#[derive(Debug, Serialize)]
struct EventRow<'a> {
client_id: &'a str,
client_event_id: i64,
project: &'a str,
event_type: &'a str,
path: Option<&'a str>,
payload: Value,
severity: &'a str,
created_at: &'a str,
}
impl SupabaseClient {
pub fn new(url: &str, key: &str, client_id: &str) -> Result<Self> {
if url.is_empty() || key.is_empty() {
return Err(anyhow!("supabase_url and supabase_key required"));
}
if client_id.is_empty() {
return Err(anyhow!("client_id required"));
}
let http = Client::builder().timeout(Duration::from_secs(20)).build()?;
Ok(Self {
http,
base_url: url.trim_end_matches('/').to_string(),
key: key.to_string(),
client_id: client_id.to_string(),
locale_cache: Mutex::new(None),
})
}
pub fn push_events(&self, events: &[Event]) -> Result<usize> {
if events.is_empty() {
return Ok(0);
}
let rows: Vec<EventRow<'_>> = events
.iter()
.filter_map(|e| {
let id = e.id?;
let payload = serde_json::from_str::<Value>(&e.payload)
.unwrap_or_else(|_| Value::Object(Default::default()));
Some(EventRow {
client_id: &self.client_id,
client_event_id: id,
project: &e.project,
event_type: &e.event_type,
path: e.path.as_deref(),
payload,
severity: &e.severity,
created_at: &e.created_at,
})
})
.collect();
if rows.is_empty() {
return Ok(0);
}
let endpoint = format!("{}/rest/v1/worker_events", self.base_url);
let resp = self
.http
.post(&endpoint)
.header("apikey", &self.key)
.header("Authorization", format!("Bearer {}", self.key))
.header("Content-Type", "application/json")
.header("Prefer", "resolution=ignore-duplicates,return=minimal")
.json(&rows)
.send()
.context("supabase: request failed")?;
let status = resp.status();
if !status.is_success() {
let body = resp.text().unwrap_or_default();
return Err(anyhow!(
"supabase HTTP {}: {}",
status,
body.chars().take(400).collect::<String>()
));
}
Ok(rows.len())
}
pub fn list_rules(&self) -> Result<Vec<RuleRow>> {
let endpoint = format!(
"{}/rest/v1/worker_rules?select=scope,content,updated_at",
self.base_url
);
let resp = self
.http
.get(&endpoint)
.header("apikey", &self.key)
.header("Authorization", format!("Bearer {}", self.key))
.send()
.context("supabase list_rules: request failed")?;
let status = resp.status();
if !status.is_success() {
let body = resp.text().unwrap_or_default();
return Err(anyhow!(
"list_rules HTTP {}: {}",
status,
body.chars().take(400).collect::<String>()
));
}
let rows: Vec<RuleRow> = resp.json().context("list_rules: parse")?;
Ok(rows)
}
pub fn list_pending_verifiable_advice(&self, project: &str) -> Result<Vec<PendingAdvice>> {
let endpoint = format!(
"{}/rest/v1/worker_events?project=eq.{}&event_type=eq.advice&acked_at=is.null&payload->>verifiable=eq.true&select=id,payload,created_at&order=id.asc",
self.base_url,
urlencoding::encode(project)
);
let resp = self
.http
.get(&endpoint)
.header("apikey", &self.key)
.header("Authorization", format!("Bearer {}", self.key))
.send()
.context("supabase list_pending_verifiable_advice")?;
if !resp.status().is_success() {
let body = resp.text().unwrap_or_default();
return Err(anyhow!("list_pending HTTP error: {}", body));
}
let raw: Vec<Value> = resp.json().context("parse")?;
let mut out = Vec::new();
for v in raw {
let id = v.get("id").and_then(|x| x.as_i64()).unwrap_or(0);
let payload = v.get("payload").cloned().unwrap_or(json!({}));
let text = payload
.get("text")
.and_then(|x| x.as_str())
.unwrap_or("")
.to_string();
let paths: Vec<String> = payload
.get("paths")
.and_then(|x| x.as_array())
.map(|arr| {
arr.iter()
.filter_map(|p| p.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
out.push(PendingAdvice { id, text, paths });
}
Ok(out)
}
pub fn ack_event(&self, id: i64, source: &str) -> Result<()> {
let endpoint = format!("{}/rest/v1/worker_events?id=eq.{}", self.base_url, id);
let resp = self
.http
.patch(&endpoint)
.header("apikey", &self.key)
.header("Authorization", format!("Bearer {}", self.key))
.header("Content-Type", "application/json")
.header("Prefer", "return=minimal")
.json(&json!({
"acked_at": chrono::Utc::now().to_rfc3339(),
"acked_by": source,
}))
.send()
.context("supabase ack_event")?;
if !resp.status().is_success() {
let body = resp.text().unwrap_or_default();
return Err(anyhow!("ack_event HTTP error: {}", body));
}
Ok(())
}
pub fn get_user_locale(&self) -> Option<String> {
if let Ok(guard) = self.locale_cache.lock() {
if let Some((cached, fetched_at)) = guard.as_ref() {
if fetched_at.elapsed() < LOCALE_CACHE_TTL {
return cached.clone();
}
}
}
let endpoint = format!(
"{}/rest/v1/user_locale_pref?select=lang&limit=1",
self.base_url
);
let result: Option<String> = (|| -> Result<Option<String>> {
let resp = self
.http
.get(&endpoint)
.header("apikey", &self.key)
.header("Authorization", format!("Bearer {}", self.key))
.send()?;
if !resp.status().is_success() {
return Ok(None);
}
let rows: Vec<Value> = resp.json()?;
Ok(rows
.into_iter()
.next()
.and_then(|v| v.get("lang").and_then(|l| l.as_str().map(String::from))))
})()
.unwrap_or(None);
if let Ok(mut guard) = self.locale_cache.lock() {
*guard = Some((result.clone(), Instant::now()));
}
result
}
pub fn heartbeat(&self, thread: &str) -> Result<()> {
let endpoint = format!("{}/rest/v1/worker_heartbeat", self.base_url);
let body = json!([{
"client_id": self.client_id,
"thread": thread,
"last_beat_at": chrono::Utc::now().to_rfc3339(),
}]);
let resp = self
.http
.post(&endpoint)
.header("apikey", &self.key)
.header("Authorization", format!("Bearer {}", self.key))
.header("Content-Type", "application/json")
.header("Prefer", "resolution=merge-duplicates,return=minimal")
.json(&body)
.send()
.context("supabase heartbeat: request failed")?;
if !resp.status().is_success() {
let body = resp.text().unwrap_or_default();
return Err(anyhow!("heartbeat HTTP error: {}", body));
}
Ok(())
}
pub fn claim_next_pending_job(&self) -> Result<Option<JobRow>> {
let pending_endpoint = format!(
"{}/rest/v1/worker_jobs?status=eq.pending&order=created_at.asc&limit=1&select=*",
self.base_url
);
let pending_resp = self
.http
.get(&pending_endpoint)
.header("apikey", &self.key)
.header("Authorization", format!("Bearer {}", self.key))
.send()
.context("supabase claim_next: list request failed")?;
if !pending_resp.status().is_success() {
let body = pending_resp.text().unwrap_or_default();
return Err(anyhow!("claim_next list HTTP error: {}", body));
}
let pending_rows: Vec<JobRow> = pending_resp.json().context("claim_next: parse")?;
let candidate = match pending_rows.into_iter().next() {
Some(r) => r,
None => return Ok(None),
};
let claim_endpoint = format!(
"{}/rest/v1/worker_jobs?id=eq.{}&status=eq.pending",
self.base_url, candidate.id
);
let claim_resp = self
.http
.patch(&claim_endpoint)
.header("apikey", &self.key)
.header("Authorization", format!("Bearer {}", self.key))
.header("Content-Type", "application/json")
.header("Prefer", "return=representation")
.json(&json!({
"status": "running",
"client_id": self.client_id,
}))
.send()
.context("supabase claim_next: claim request failed")?;
if !claim_resp.status().is_success() {
let body = claim_resp.text().unwrap_or_default();
return Err(anyhow!("claim_next claim HTTP error: {}", body));
}
let claimed: Vec<JobRow> = claim_resp.json().context("claim_next: parse claim")?;
Ok(claimed.into_iter().next())
}
pub fn complete_job(&self, id: i64, output: Value) -> Result<()> {
self.update_job(
id,
json!({ "status": "done", "output": output, "error": null }),
)
}
pub fn fail_job(&self, id: i64, error: &str) -> Result<()> {
self.update_job(id, json!({ "status": "error", "error": error }))
}
fn update_job(&self, id: i64, body: Value) -> Result<()> {
let endpoint = format!("{}/rest/v1/worker_jobs?id=eq.{}", self.base_url, id);
let resp = self
.http
.patch(&endpoint)
.header("apikey", &self.key)
.header("Authorization", format!("Bearer {}", self.key))
.header("Content-Type", "application/json")
.header("Prefer", "return=minimal")
.json(&body)
.send()
.context("supabase update_job: request failed")?;
if !resp.status().is_success() {
let body = resp.text().unwrap_or_default();
return Err(anyhow!("update_job HTTP error: {}", body));
}
Ok(())
}
pub fn get_rule(&self, scope: &str) -> Result<Option<String>> {
let endpoint = format!(
"{}/rest/v1/worker_rules?scope=eq.{}&select=content",
self.base_url, scope
);
let resp = self
.http
.get(&endpoint)
.header("apikey", &self.key)
.header("Authorization", format!("Bearer {}", self.key))
.send()
.context("supabase get_rule: request failed")?;
if !resp.status().is_success() {
let body = resp.text().unwrap_or_default();
return Err(anyhow!("get_rule HTTP error: {}", body));
}
let rows: Vec<Value> = resp.json().context("get_rule: parse")?;
Ok(rows
.into_iter()
.next()
.and_then(|v| v.get("content").and_then(|c| c.as_str().map(String::from))))
}
pub fn upsert_rule(&self, scope: &str, content: &str) -> Result<()> {
let endpoint = format!("{}/rest/v1/worker_rules", self.base_url);
let body = json!([{ "scope": scope, "content": content }]);
let resp = self
.http
.post(&endpoint)
.header("apikey", &self.key)
.header("Authorization", format!("Bearer {}", self.key))
.header("Content-Type", "application/json")
.header("Prefer", "resolution=merge-duplicates,return=minimal")
.json(&body)
.send()
.context("supabase upsert_rule: request failed")?;
let status = resp.status();
if !status.is_success() {
let body = resp.text().unwrap_or_default();
return Err(anyhow!(
"upsert_rule HTTP {}: {}",
status,
body.chars().take(400).collect::<String>()
));
}
Ok(())
}
}