use anyhow::{anyhow, Context, Result};
use chrono::Local;
use serde_json::Value;
use std::time::{Duration, Instant};
use crate::worker::claude::ClaudeCli;
use crate::worker::config::WorkerConfig;
use crate::worker::supabase::{MemoryRow, SupabaseClient};
/// Seconds the worker loop sleeps between ticks.
const TICK_INTERVAL_SECS: u64 = 30;
/// Minimum spacing between time-based ("hourly") consolidation runs.
const HOUR: Duration = Duration::from_secs(3600);
/// Growth in the memory count since the previous run that triggers an early
/// ("accumulation") consolidation run.
const NEW_MEMORY_TRIGGER: usize = 10;
/// Upper bound on the number of memories sent for review in a single cycle.
const MAX_PER_CYCLE: usize = 60;
/// Runs the memory-consolidation worker loop on the calling thread.
///
/// Returns `Ok(())` immediately when no Supabase client can be built from
/// `cfg`. Otherwise it loops without ever returning: every
/// `TICK_INTERVAL_SECS` it sends a heartbeat, counts the stored memories and
/// starts a consolidation cycle when either
///
/// * `HOUR` has passed since the previous cycle (or no cycle has run yet), or
/// * at least `NEW_MEMORY_TRIGGER` memories were added since the previous
///   cycle.
///
/// Errors from individual ticks and cycles are logged and do not stop the loop.
pub fn run(cfg: WorkerConfig) -> Result<()> {
    const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);

    let supabase = match make_supabase(&cfg) {
        Some(client) => client,
        None => {
            log_line("[consolidate] Supabase not configured — thread idle");
            return Ok(());
        }
    };
    let claude = ClaudeCli::new(cfg.claude_bin.clone());
    log_line("[consolidate] thread up");

    // `None` means "has not happened yet", which makes the first tick both
    // heartbeat and consolidate. Tracking this as an `Option` avoids
    // back-dating an `Instant` (`Instant::now() - d`), which panics when the
    // result is not representable on the platform's clock.
    let mut last_run: Option<Instant> = None;
    let mut last_heartbeat: Option<Instant> = None;
    let mut last_seen_count = 0usize;

    loop {
        std::thread::sleep(Duration::from_secs(TICK_INTERVAL_SECS));

        if last_heartbeat.map_or(true, |at| at.elapsed() >= HEARTBEAT_INTERVAL) {
            // Best-effort: a failed heartbeat must not stall consolidation.
            let _ = supabase.heartbeat("consolidate");
            last_heartbeat = Some(Instant::now());
        }

        let now = Instant::now();
        let current_count = match supabase.list_all_memories() {
            Ok(rows) => rows.len(),
            Err(e) => {
                log_line(&format!("[consolidate] count err: {}", e));
                continue;
            }
        };

        let new_since_last = current_count.saturating_sub(last_seen_count);
        let hourly_due = last_run.map_or(true, |at| now.duration_since(at) >= HOUR);
        let accumulation_due = new_since_last >= NEW_MEMORY_TRIGGER;
        if !hourly_due && !accumulation_due {
            continue;
        }

        let trigger = if hourly_due { "hourly" } else { "accumulation" };
        log_line(&format!(
            "[consolidate] {} trigger ({} new since last run, total {})",
            trigger, new_since_last, current_count
        ));

        match run_once(&supabase, &claude, &cfg.advice_locale) {
            Ok(summary) => log_line(&format!("[consolidate] {}", summary)),
            Err(e) => log_line(&format!("[consolidate] cycle err: {}", e)),
        }

        // The timer restarts even after a failed cycle, so a persistent
        // failure is retried on the normal schedule rather than every tick.
        last_run = Some(now);

        // Re-baseline after the cycle, which may have removed rows. If the
        // recount fails, keep the pre-cycle count: falling back to 0 would
        // make the whole store look "new" and fire a spurious accumulation
        // run on the next tick.
        last_seen_count = match supabase.list_all_memories() {
            Ok(rows) => rows.len(),
            Err(e) => {
                log_line(&format!("[consolidate] recount err: {}", e));
                current_count
            }
        };
    }
}
fn make_supabase(cfg: &WorkerConfig) -> Option<SupabaseClient> {
let (url, key) = match (&cfg.supabase_url, &cfg.supabase_key) {
(Some(u), Some(k)) if !u.is_empty() && !k.is_empty() => (u.as_str(), k.as_str()),
_ => return None,
};
let client_id = cfg
.client_id
.as_deref()
.filter(|s| !s.is_empty())
.unwrap_or("unknown");
SupabaseClient::new(url, key, client_id).ok()
}
/// Runs a single consolidation cycle and returns a one-line summary.
///
/// Steps:
/// 1. Load all memories; return early when there are none.
/// 2. Sort by id ascending and keep at most `MAX_PER_CYCLE` rows.
/// 3. Ask Claude for a JSON object with a `verdicts` array.
/// 4. Apply each verdict (`keep` / `update` / `merge` / `delete`), refusing
///    unsafe changes to protected rows.
///
/// # Errors
/// Fails when listing memories fails, when the Claude call fails, or when the
/// response has no `verdicts` array. Failures of individual row mutations are
/// logged and counted in the summary instead of aborting the cycle.
fn run_once(supabase: &SupabaseClient, claude: &ClaudeCli, locale: &str) -> Result<String> {
    let mut rows = supabase.list_all_memories()?;
    if rows.is_empty() {
        return Ok("nothing to consolidate".into());
    }

    // NOTE(review): the batch is always the lowest-id rows, so rows beyond the
    // first MAX_PER_CYCLE are not part of this cycle's review.
    rows.sort_by(|a, b| a.id.cmp(&b.id));
    rows.truncate(MAX_PER_CYCLE);

    let prompt = build_prompt(&rows, locale);
    let raw = claude
        .ask_json(&prompt, Duration::from_secs(120))
        .context("claude consolidate call")?;
    let verdicts = raw
        .get("verdicts")
        .and_then(|v| v.as_array())
        .ok_or_else(|| anyhow!("no verdicts[] in response"))?;

    let mut kept = 0usize;
    let mut updated = 0usize;
    let mut merged = 0usize;
    let mut deleted = 0usize;
    let mut skipped_protected = 0usize;
    let mut errors = 0usize;
    // Ids that already received a verdict in this cycle. The batch is capped
    // at MAX_PER_CYCLE, so a linear `contains` is cheap.
    let mut handled: Vec<i64> = Vec::with_capacity(rows.len());

    for v in verdicts {
        let id = match v.get("id").and_then(|x| x.as_i64()) {
            Some(id) => id,
            None => {
                errors += 1;
                continue;
            }
        };

        // Only rows that were actually sent for review may be touched; a
        // verdict for any other id is a malformed response entry.
        let row = match rows.iter().find(|r| r.id == id) {
            Some(r) => r,
            None => {
                log_line(&format!("[consolidate] verdict for unknown #{} ignored", id));
                errors += 1;
                continue;
            }
        };

        // The response is expected to contain each id once. Apply only the
        // first verdict so a repeated id cannot, for example, update and then
        // delete the same row.
        if handled.contains(&id) {
            log_line(&format!("[consolidate] duplicate verdict for #{} ignored", id));
            errors += 1;
            continue;
        }
        handled.push(id);

        let action = v.get("action").and_then(|x| x.as_str()).unwrap_or("keep");
        let mutating = matches!(action, "delete" | "merge" | "update");
        if mutating && is_protected(row) && !is_safe_protected_update(row, action, v) {
            skipped_protected += 1;
            continue;
        }

        match action {
            "update" => {
                let new_text = v.get("text").and_then(|x| x.as_str());
                let new_scope = v.get("scope").and_then(|x| x.as_str());
                let new_priority = v.get("priority").and_then(|x| x.as_str());
                // Outer `Option`: was the key present; inner: string vs. any
                // non-string JSON value (including null).
                let new_project = v.get("project").map(|x| x.as_str());
                match supabase.update_memory(id, new_text, new_scope, new_priority, new_project) {
                    Ok(_) => updated += 1,
                    Err(e) => {
                        log_line(&format!("[consolidate] update #{} err: {}", id, e));
                        errors += 1;
                    }
                }
            }
            // Both actions soft-delete the row; they differ only in how they
            // are counted and labelled in the log.
            // NOTE(review): the merge target named in `reason` is not checked.
            "merge" | "delete" => match supabase.soft_delete_memory(id) {
                Ok(_) => {
                    if action == "merge" {
                        merged += 1;
                    } else {
                        deleted += 1;
                    }
                }
                Err(e) => {
                    log_line(&format!("[consolidate] {} #{} err: {}", action, id, e));
                    errors += 1;
                }
            },
            // "keep" and any unrecognised action leave the row untouched.
            _ => kept += 1,
        }
    }

    Ok(format!(
        "{} reviewed → keep {}, update {}, merge {}, delete {} (protected skipped {}, errors {})",
        rows.len(),
        kept,
        updated,
        merged,
        deleted,
        skipped_protected,
        errors,
    ))
}
/// Reports whether a memory is protected from destructive consolidation:
/// either its source is `"user"` or its priority is `"constraint"`.
fn is_protected(r: &MemoryRow) -> bool {
    let user_authored = r.source == "user";
    let hard_rule = r.priority == "constraint";
    user_authored || hard_rule
}
/// Decides whether a mutating verdict may be applied to a protected row.
///
/// Only `update` is ever allowed. An update is rejected when it would lower
/// the row's priority. Priorities are ordered
/// `info < preference < strong < constraint`; keeping the same priority or
/// raising it is fine. When either the current or the proposed priority is not
/// one of these four values, the update is accepted only if the priority is
/// left unchanged. An update without a string `priority` field never changes
/// the priority and is always accepted.
fn is_safe_protected_update(row: &MemoryRow, action: &str, verdict: &Value) -> bool {
    /// Maps a priority label to its position in the ordering, lowest first.
    fn priority_rank(priority: &str) -> Option<u8> {
        match priority {
            "info" => Some(0),
            "preference" => Some(1),
            "strong" => Some(2),
            "constraint" => Some(3),
            _ => None,
        }
    }

    if action != "update" {
        return false;
    }

    let proposed = match verdict.get("priority").and_then(|x| x.as_str()) {
        Some(p) => p,
        None => return true,
    };
    let current = row.priority.as_str();

    match (priority_rank(current), priority_rank(proposed)) {
        // Compare levels rather than matching on specific labels, so that
        // re-stating the current priority is not treated as a demotion.
        (Some(cur), Some(new)) => new >= cur,
        // Unknown label on either side: only a no-op is provably safe.
        _ => proposed == current,
    }
}
/// Renders the consolidation prompt for the given memories.
///
/// Each row becomes a `#<id> [metadata]` header line followed by its text; a
/// missing project or an empty tech list is shown as `-`. `locale` is
/// interpolated into the output schema as the language for `reason`.
fn build_prompt(rows: &[MemoryRow], locale: &str) -> String {
    let entries: String = rows
        .iter()
        .map(|row| {
            let tech = if row.tech.is_empty() {
                String::from("-")
            } else {
                row.tech.join(",")
            };
            format!(
                "#{} [scope={} priority={} source={} project={} tech={}]\n {}\n\n",
                row.id,
                row.scope,
                row.priority,
                row.source,
                row.project.as_deref().unwrap_or("-"),
                tech,
                row.text
            )
        })
        .collect();

    format!(
        r#"You are the consolidation pass for the Reso memory store.
You will be given the full list of currently active memories. Evaluate
each one against a strict bar and return a JSON verdict per memory.
============================================================
THE BAR — what counts as a real, durable memory
============================================================
A memory should remain ONLY IF a future maintainer or AI assistant
WOULD WANT TO KNOW IT but CANNOT learn it by reading the code. Valid
categories:
1. Decision rationale (WHY a non-obvious choice was made)
2. Hard constraints / invariants (rules future code MUST follow)
3. Cross-file invariants & contracts
4. Past incidents / scars (bugs that motivated current shape)
DELETE memories that are:
- Restatements of file structure, dependencies, build flags
- "This project uses X" / "X is implemented with Y" observations
- Already covered by docs (CLAUDE.md, README, module docstrings)
- Stale references to removed features
- Snapshots of "current architecture" that go stale on next release
MERGE memories that say semantically the same thing — keep the
strongest (highest priority, most specific text), delete the rest.
UPDATE memories where:
- scope is wrong: `project` (specific repo), `tech` (Rust/React/...
ecosystem), `user` (cross-project user preference)
- priority is wrong: `constraint` (hard MUST), `strong` (well-
established), `preference` (semantic-search retrievable),
`info` (archival)
- text is verbose / repetitive — tighten it
KEEP if it already passes the bar with correct metadata.
============================================================
PROTECTED ROWS — be respectful
============================================================
Rows with `source=user` were created by a human deliberately.
Rows with `priority=constraint` are explicit hard rules.
For these:
- You may suggest scope/project corrections
- You may NOT delete or merge them
- You may NOT demote priority below its current level
- You may shorten text only if the meaning is preserved exactly
============================================================
INPUT — current memories
============================================================
{listing}
============================================================
OUTPUT — STRICT JSON, no markdown fences
============================================================
{{
"verdicts": [
{{"id": <int>, "action": "keep|update|merge|delete", "reason": "<short, in {locale}>", "text": "<new text if action=update>", "scope": "project|tech|user", "priority": "constraint|strong|preference|info", "project": "<canonical name or null>"}}
]
}}
Every memory in the input MUST appear exactly once in `verdicts`.
Fields beyond `id`, `action`, `reason` are only required when action=update.
For action=merge, set `reason` to "merged into #<other_id>".
Reasons should be brief — ~12 Korean characters or 8 English words.
"#,
        listing = entries,
    )
}
/// Prints `msg` to stdout, prefixed with the current local time formatted as
/// `YYYY-MM-DD HH:MM:SS`.
fn log_line(msg: &str) {
    let stamp = Local::now().format("%Y-%m-%d %H:%M:%S");
    println!("{stamp} {msg}");
}