#![allow(dead_code)]
use anyhow::{Context, Result};
use chrono::{Local, Utc};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use crate::worker::advice::HourlyLimiter;
use crate::worker::claude::ClaudeCli;
use crate::worker::config::WorkerConfig;
use crate::worker::db::Db;
use crate::worker::supabase::{PendingAdvice, SupabaseClient};
const TICK_INTERVAL_SECS: u64 = 20;
const PROJECT_COOLDOWN_SECS: u64 = 90; const VERIFY_BUDGET_PER_HOUR: u32 = 8;
const CLAUDE_TIMEOUT_SECS: u64 = 60;
const MAX_FILE_BYTES: usize = 4000;
pub fn run(cfg: WorkerConfig) -> Result<()> {
let client_id = cfg
.client_id
.clone()
.filter(|s| !s.is_empty())
.unwrap_or_else(|| "unknown".into());
let (url, key) = match (&cfg.supabase_url, &cfg.supabase_key) {
(Some(u), Some(k)) if !u.is_empty() && !k.is_empty() => (u.clone(), k.clone()),
_ => {
log_line("[verify] disabled (Supabase not configured)");
return Ok(());
}
};
let supabase = Arc::new(SupabaseClient::new(&url, &key, &client_id)?);
let db = Db::open(&cfg.db_path)?;
let claude = ClaudeCli::new(&cfg.claude_bin);
log_line("[verify] thread up");
let mut last_check: HashMap<String, Instant> = HashMap::new();
let mut last_verified_ts: HashMap<String, String> = HashMap::new();
let mut limiters: HashMap<String, Arc<HourlyLimiter>> = HashMap::new();
loop {
let _ = supabase.heartbeat("verify");
let projects = match db.distinct_projects() {
Ok(p) => p,
Err(e) => {
log_line(&format!("[verify] distinct_projects err: {}", e));
Vec::new()
}
};
for project in projects {
if let Some(t) = last_check.get(&project) {
if t.elapsed() < Duration::from_secs(PROJECT_COOLDOWN_SECS) {
continue;
}
}
let since = last_verified_ts
.get(&project)
.cloned()
.unwrap_or_else(|| "1970-01-01T00:00:00Z".to_string());
let has_changes = db
.project_has_changes_since(&project, &since)
.unwrap_or(false);
if !has_changes {
continue;
}
let pending = match supabase.list_pending_verifiable_advice(&project) {
Ok(p) => p,
Err(e) => {
log_line(&format!("[verify] list err for {}: {}", project, e));
continue;
}
};
log_line(&format!(
"[verify] {} change_since={} pending={}",
project,
since,
pending.len()
));
if pending.is_empty() {
last_check.insert(project.clone(), Instant::now());
last_verified_ts.insert(project.clone(), Utc::now().to_rfc3339());
continue;
}
let limiter = limiters
.entry(project.clone())
.or_insert_with(|| Arc::new(HourlyLimiter::new(VERIFY_BUDGET_PER_HOUR)))
.clone();
if !limiter.try_consume() {
log_line(&format!(
"[verify] rate-limited for {} ({}/h)",
project, VERIFY_BUDGET_PER_HOUR
));
continue;
}
let monitor = cfg.monitor_dir.clone();
match verify_batch(&claude, &monitor, &project, &pending, &cfg.advice_locale) {
Ok(decisions) => {
let mut acked = 0;
for d in decisions {
if !d.resolved {
continue;
}
match supabase.ack_event(d.id, "verify") {
Ok(()) => {
acked += 1;
log_line(&format!(
"[verify] auto-acked #{} ({}): {}",
d.id, project, d.reason
));
}
Err(e) => log_line(&format!("[verify] ack err #{}: {}", d.id, e)),
}
}
log_line(&format!(
"[verify] {} processed {} pending, acked {}",
project,
pending.len(),
acked
));
}
Err(e) => log_line(&format!("[verify] batch err for {}: {}", project, e)),
}
last_check.insert(project.clone(), Instant::now());
last_verified_ts.insert(project.clone(), Utc::now().to_rfc3339());
}
thread::sleep(Duration::from_secs(TICK_INTERVAL_SECS));
}
}
#[derive(Debug)]
struct Decision {
id: i64,
resolved: bool,
reason: String,
}
fn verify_batch(
claude: &ClaudeCli,
monitor: &Path,
project: &str,
pending: &[PendingAdvice],
locale: &str,
) -> Result<Vec<Decision>> {
let mut snippet_block = String::new();
let mut seen = std::collections::BTreeSet::new();
for adv in pending {
for p in &adv.paths {
if !seen.insert(p.clone()) {
continue;
}
let abs = monitor.join(project).join(p);
if crate::worker::secrets::is_secret_path(&abs) {
continue;
}
snippet_block.push_str(&format!("--- {} ---\n", p));
match std::fs::read(&abs) {
Ok(bytes) => {
let n = bytes.len().min(MAX_FILE_BYTES);
let snippet = String::from_utf8_lossy(&bytes[..n]);
snippet_block.push_str(&snippet);
if bytes.len() > n {
snippet_block.push_str("\n…(truncated)\n");
}
snippet_block.push_str("\n\n");
}
Err(_) => snippet_block.push_str("(unreadable / deleted)\n\n"),
}
}
}
let advice_json: Vec<Value> = pending
.iter()
.map(|a| {
json!({
"id": a.id,
"text": a.text,
"paths": a.paths,
})
})
.collect();
let advice_block = serde_json::to_string_pretty(&advice_json)?;
let prompt = format!(
r#"You previously generated the following advice items for project "{project}".
Given the CURRENT content of the referenced files below, decide which
items are now resolved.
BE CONSERVATIVE: only mark resolved=true if you can VERIFY in the
content that the specific issue described is no longer present. If you
cannot tell, or the file is missing, mark resolved=false. Do not
speculate.
The "reason" field MUST be written in language code `{locale}`.
PENDING ADVICE:
{advice_block}
CURRENT FILE STATE:
{snippet_block}
Output STRICT JSON, no markdown fences. Schema:
[
{{"id": <number>, "resolved": true|false, "reason": "<short, in {locale}>"}}
]
"#
);
let result = claude.ask_json(&prompt, Duration::from_secs(CLAUDE_TIMEOUT_SECS))?;
let arr = result
.as_array()
.context("verify: expected JSON array at top level")?;
let mut out = Vec::new();
for v in arr {
let id = v.get("id").and_then(|x| x.as_i64()).unwrap_or(0);
let resolved = v.get("resolved").and_then(|x| x.as_bool()).unwrap_or(false);
let reason = v
.get("reason")
.and_then(|x| x.as_str())
.unwrap_or("")
.to_string();
if id > 0 {
out.push(Decision {
id,
resolved,
reason,
});
}
}
Ok(out)
}
fn log_line(msg: &str) {
let now = Local::now().format("%Y-%m-%d %H:%M:%S");
println!("{} {}", now, msg);
}