devist 0.11.0

Project bootstrap CLI for AI-assisted development. Spin up new projects from templates, manage backends, and keep your codebase comprehensible.
#![allow(dead_code)]
// Supabase L2 push: batches local SQLite events into worker_events table
// via the PostgREST endpoint. Idempotent on (client_id, client_event_id).

use anyhow::{anyhow, Context, Result};
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::time::Duration;

use crate::worker::db::Event;

#[derive(Debug, Clone, Deserialize)]
pub struct RuleRow {
    pub scope: String,
    pub content: String,
    pub updated_at: String,
}

#[derive(Debug, Clone, Deserialize)]
pub struct JobRow {
    pub id: i64,
    pub kind: String,
    pub scope: String,
    pub input: Value,
    #[serde(default)]
    pub output: Option<Value>,
    pub status: String,
}

pub struct SupabaseClient {
    http: Client,
    base_url: String,
    key: String,
    client_id: String,
}

#[derive(Debug, Serialize)]
struct EventRow<'a> {
    client_id: &'a str,
    client_event_id: i64,
    project: &'a str,
    event_type: &'a str,
    path: Option<&'a str>,
    payload: Value,
    severity: &'a str,
    created_at: &'a str,
}

impl SupabaseClient {
    pub fn new(url: &str, key: &str, client_id: &str) -> Result<Self> {
        if url.is_empty() || key.is_empty() {
            return Err(anyhow!("supabase_url and supabase_key required"));
        }
        if client_id.is_empty() {
            return Err(anyhow!("client_id required"));
        }
        let http = Client::builder().timeout(Duration::from_secs(20)).build()?;
        Ok(Self {
            http,
            base_url: url.trim_end_matches('/').to_string(),
            key: key.to_string(),
            client_id: client_id.to_string(),
        })
    }

    /// Push events to `worker_events`. Returns the count actually accepted
    /// (best-effort — Supabase returns minimal on success).
    pub fn push_events(&self, events: &[Event]) -> Result<usize> {
        if events.is_empty() {
            return Ok(0);
        }

        let rows: Vec<EventRow<'_>> = events
            .iter()
            .filter_map(|e| {
                let id = e.id?;
                let payload = serde_json::from_str::<Value>(&e.payload)
                    .unwrap_or_else(|_| Value::Object(Default::default()));
                Some(EventRow {
                    client_id: &self.client_id,
                    client_event_id: id,
                    project: &e.project,
                    event_type: &e.event_type,
                    path: e.path.as_deref(),
                    payload,
                    severity: &e.severity,
                    created_at: &e.created_at,
                })
            })
            .collect();

        if rows.is_empty() {
            return Ok(0);
        }

        let endpoint = format!("{}/rest/v1/worker_events", self.base_url);
        let resp = self
            .http
            .post(&endpoint)
            .header("apikey", &self.key)
            .header("Authorization", format!("Bearer {}", self.key))
            .header("Content-Type", "application/json")
            .header("Prefer", "resolution=ignore-duplicates,return=minimal")
            .json(&rows)
            .send()
            .context("supabase: request failed")?;

        let status = resp.status();
        if !status.is_success() {
            let body = resp.text().unwrap_or_default();
            return Err(anyhow!(
                "supabase HTTP {}: {}",
                status,
                body.chars().take(400).collect::<String>()
            ));
        }
        Ok(rows.len())
    }

    /// Fetch all worker_rules rows.
    pub fn list_rules(&self) -> Result<Vec<RuleRow>> {
        let endpoint = format!(
            "{}/rest/v1/worker_rules?select=scope,content,updated_at",
            self.base_url
        );
        let resp = self
            .http
            .get(&endpoint)
            .header("apikey", &self.key)
            .header("Authorization", format!("Bearer {}", self.key))
            .send()
            .context("supabase list_rules: request failed")?;
        let status = resp.status();
        if !status.is_success() {
            let body = resp.text().unwrap_or_default();
            return Err(anyhow!(
                "list_rules HTTP {}: {}",
                status,
                body.chars().take(400).collect::<String>()
            ));
        }
        let rows: Vec<RuleRow> = resp.json().context("list_rules: parse")?;
        Ok(rows)
    }

    /// Upsert a heartbeat row for (client_id, thread). Idempotent.
    pub fn heartbeat(&self, thread: &str) -> Result<()> {
        let endpoint = format!("{}/rest/v1/worker_heartbeat", self.base_url);
        let body = json!([{
            "client_id": self.client_id,
            "thread": thread,
            "last_beat_at": chrono::Utc::now().to_rfc3339(),
        }]);
        let resp = self
            .http
            .post(&endpoint)
            .header("apikey", &self.key)
            .header("Authorization", format!("Bearer {}", self.key))
            .header("Content-Type", "application/json")
            .header("Prefer", "resolution=merge-duplicates,return=minimal")
            .json(&body)
            .send()
            .context("supabase heartbeat: request failed")?;
        if !resp.status().is_success() {
            let body = resp.text().unwrap_or_default();
            return Err(anyhow!("heartbeat HTTP error: {}", body));
        }
        Ok(())
    }

    /// Atomically claim the next pending job for this client.
    /// Returns None if no pending job exists.
    pub fn claim_next_pending_job(&self) -> Result<Option<JobRow>> {
        // 1. Find the oldest pending job
        let pending_endpoint = format!(
            "{}/rest/v1/worker_jobs?status=eq.pending&order=created_at.asc&limit=1&select=*",
            self.base_url
        );
        let pending_resp = self
            .http
            .get(&pending_endpoint)
            .header("apikey", &self.key)
            .header("Authorization", format!("Bearer {}", self.key))
            .send()
            .context("supabase claim_next: list request failed")?;
        if !pending_resp.status().is_success() {
            let body = pending_resp.text().unwrap_or_default();
            return Err(anyhow!("claim_next list HTTP error: {}", body));
        }
        let pending_rows: Vec<JobRow> = pending_resp.json().context("claim_next: parse")?;
        let candidate = match pending_rows.into_iter().next() {
            Some(r) => r,
            None => return Ok(None),
        };

        // 2. UPDATE WHERE id=X AND status='pending' to claim it.
        // Returns the updated row(s); if another client already claimed,
        // we get an empty array.
        let claim_endpoint = format!(
            "{}/rest/v1/worker_jobs?id=eq.{}&status=eq.pending",
            self.base_url, candidate.id
        );
        let claim_resp = self
            .http
            .patch(&claim_endpoint)
            .header("apikey", &self.key)
            .header("Authorization", format!("Bearer {}", self.key))
            .header("Content-Type", "application/json")
            .header("Prefer", "return=representation")
            .json(&json!({
                "status": "running",
                "client_id": self.client_id,
            }))
            .send()
            .context("supabase claim_next: claim request failed")?;
        if !claim_resp.status().is_success() {
            let body = claim_resp.text().unwrap_or_default();
            return Err(anyhow!("claim_next claim HTTP error: {}", body));
        }
        let claimed: Vec<JobRow> = claim_resp.json().context("claim_next: parse claim")?;
        Ok(claimed.into_iter().next())
    }

    /// Mark a job done with an output payload.
    pub fn complete_job(&self, id: i64, output: Value) -> Result<()> {
        self.update_job(
            id,
            json!({ "status": "done", "output": output, "error": null }),
        )
    }

    /// Mark a job errored.
    pub fn fail_job(&self, id: i64, error: &str) -> Result<()> {
        self.update_job(id, json!({ "status": "error", "error": error }))
    }

    fn update_job(&self, id: i64, body: Value) -> Result<()> {
        let endpoint = format!("{}/rest/v1/worker_jobs?id=eq.{}", self.base_url, id);
        let resp = self
            .http
            .patch(&endpoint)
            .header("apikey", &self.key)
            .header("Authorization", format!("Bearer {}", self.key))
            .header("Content-Type", "application/json")
            .header("Prefer", "return=minimal")
            .json(&body)
            .send()
            .context("supabase update_job: request failed")?;
        if !resp.status().is_success() {
            let body = resp.text().unwrap_or_default();
            return Err(anyhow!("update_job HTTP error: {}", body));
        }
        Ok(())
    }

    /// Get a single rule's content by scope (returns None if missing).
    pub fn get_rule(&self, scope: &str) -> Result<Option<String>> {
        let endpoint = format!(
            "{}/rest/v1/worker_rules?scope=eq.{}&select=content",
            self.base_url, scope
        );
        let resp = self
            .http
            .get(&endpoint)
            .header("apikey", &self.key)
            .header("Authorization", format!("Bearer {}", self.key))
            .send()
            .context("supabase get_rule: request failed")?;
        if !resp.status().is_success() {
            let body = resp.text().unwrap_or_default();
            return Err(anyhow!("get_rule HTTP error: {}", body));
        }
        let rows: Vec<Value> = resp.json().context("get_rule: parse")?;
        Ok(rows
            .into_iter()
            .next()
            .and_then(|v| v.get("content").and_then(|c| c.as_str().map(String::from))))
    }

    /// Upsert a single worker_rules row by scope.
    pub fn upsert_rule(&self, scope: &str, content: &str) -> Result<()> {
        let endpoint = format!("{}/rest/v1/worker_rules", self.base_url);
        let body = json!([{ "scope": scope, "content": content }]);
        let resp = self
            .http
            .post(&endpoint)
            .header("apikey", &self.key)
            .header("Authorization", format!("Bearer {}", self.key))
            .header("Content-Type", "application/json")
            .header("Prefer", "resolution=merge-duplicates,return=minimal")
            .json(&body)
            .send()
            .context("supabase upsert_rule: request failed")?;
        let status = resp.status();
        if !status.is_success() {
            let body = resp.text().unwrap_or_default();
            return Err(anyhow!(
                "upsert_rule HTTP {}: {}",
                status,
                body.chars().take(400).collect::<String>()
            ));
        }
        Ok(())
    }
}