#![allow(dead_code)]
use anyhow::Result;
use chrono::Local;
use serde_json::json;
use std::collections::{HashMap, VecDeque};
use std::path::Path;
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use crate::worker::claude::ClaudeCli;
use crate::worker::config::WorkerConfig;
use crate::worker::db::{Db, Event};
use crate::worker::mem0::Mem0Client;
use crate::worker::rules::Rules;
use crate::worker::supabase::SupabaseClient;
/// Message handed to [`AdviceWorker::run`] over its channel: one finished burst of
/// file changes for a single project.
#[derive(Debug, Clone)]
pub struct BurstReady {
    /// Name of the project the burst belongs to; also the key for per-project rate limiting.
    pub project: String,
    /// Paths changed during the burst. They are joined onto the monitor directory when
    /// snippets are collected.
    pub paths: Vec<String>,
}
/// Accumulates the distinct paths touched during one burst of activity, together with
/// the instants of the first and the most recent recorded event.
#[derive(Debug, Default)]
pub struct ProjectBurst {
    /// Distinct paths seen so far, in first-seen order.
    pub paths: Vec<String>,
    /// When the most recent event was recorded; `None` while the burst is empty.
    pub last_event: Option<Instant>,
    /// When the first event of the current burst was recorded; `None` while empty.
    pub started: Option<Instant>,
}

impl ProjectBurst {
    /// Registers an event for `path`: stamps the burst start on the first event,
    /// refreshes the last-event time, and remembers the path unless already present.
    pub fn record(&mut self, path: String) {
        let _ = self.started.get_or_insert_with(Instant::now);
        self.last_event = Some(Instant::now());

        let already_known = self.paths.iter().any(|known| *known == path);
        if !already_known {
            self.paths.push(path);
        }
    }

    /// Returns `true` once at least `idle` has elapsed since the last recorded event.
    /// A burst with no recorded event is never idle.
    pub fn is_idle(&self, idle: Duration) -> bool {
        match self.last_event {
            Some(stamp) => stamp.elapsed() >= idle,
            None => false,
        }
    }

    /// Hands back the collected paths and resets the burst to its empty state.
    pub fn drain(&mut self) -> Vec<String> {
        let collected = std::mem::take(&mut self.paths);
        self.last_event = None;
        self.started = None;
        collected
    }
}
/// Sliding-window rate limiter: allows at most `cap` hits within any one-hour window.
///
/// Interior mutability through a `Mutex` lets a shared reference (for example behind an
/// `Arc`) consume tokens.
pub struct HourlyLimiter {
    /// Maximum number of hits allowed inside the window. A cap of 0 rejects everything.
    cap: u32,
    /// Timestamps of accepted hits, oldest first.
    hits: Mutex<VecDeque<Instant>>,
}

impl HourlyLimiter {
    /// Length of the sliding window.
    const WINDOW: Duration = Duration::from_secs(3600);

    /// Creates a limiter that accepts at most `cap` hits per hour.
    pub fn new(cap: u32) -> Self {
        Self {
            cap,
            hits: Mutex::new(VecDeque::new()),
        }
    }

    /// Tries to take one token. Returns `true` and records the hit when fewer than `cap`
    /// hits happened during the last hour, `false` otherwise.
    pub fn try_consume(&self) -> bool {
        self.try_consume_at(Instant::now())
    }

    /// Number of hits currently inside the window.
    ///
    /// Expired hits are discarded first, so the value does not stay stale when
    /// `try_consume` has not been called for a while.
    pub fn current(&self) -> u32 {
        self.current_at(Instant::now())
    }

    /// `try_consume` with an explicit clock reading.
    fn try_consume_at(&self, now: Instant) -> bool {
        let mut hits = self.lock_hits();
        Self::prune(&mut hits, now);
        // Widening u32 -> usize keeps the comparison exact without a narrowing cast.
        if hits.len() >= self.cap as usize {
            return false;
        }
        hits.push_back(now);
        true
    }

    /// `current` with an explicit clock reading.
    fn current_at(&self, now: Instant) -> u32 {
        let mut hits = self.lock_hits();
        Self::prune(&mut hits, now);
        // The queue never grows beyond `cap` (a u32); the clamp only documents that.
        hits.len().min(u32::MAX as usize) as u32
    }

    /// Drops every hit older than the window, relative to `now`.
    fn prune(hits: &mut VecDeque<Instant>, now: Instant) {
        while let Some(&front) = hits.front() {
            // Saturating: a hit stamped after `now` counts as age zero.
            if now.saturating_duration_since(front) > Self::WINDOW {
                hits.pop_front();
            } else {
                break;
            }
        }
    }

    /// Locks the hit queue, recovering the data if another thread panicked while
    /// holding the lock: the queue holds only timestamps, so it is usable as it stands.
    fn lock_hits(&self) -> std::sync::MutexGuard<'_, VecDeque<Instant>> {
        self.hits.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
    }
}
/// Background worker that turns bursts of file changes into stored advice events and,
/// when mem0 is configured, long-term memory facts.
pub struct AdviceWorker {
/// Worker configuration as passed to `AdviceWorker::new`.
cfg: WorkerConfig,
/// Event database opened from `cfg.db_path`; advice and advice-error events are inserted here.
db: Db,
/// Wrapper around the Claude CLI binary named by `cfg.claude_bin`.
claude: ClaudeCli,
/// mem0 client; `Some` only when both the API key and the user id are configured and non-empty.
mem0: Option<Mem0Client>,
/// Supabase client used for heartbeats and the user-locale lookup; `None` when it is
/// not configured or could not be constructed.
supabase: Option<Arc<SupabaseClient>>,
/// Per-project advice limiters keyed by project name, created on first use in `run`.
advice_limit: HashMap<String, Arc<HourlyLimiter>>,
/// Single limiter shared by all projects, capping mem0 writes per hour.
mem0_limit: Arc<HourlyLimiter>,
}
impl AdviceWorker {
pub fn new(cfg: WorkerConfig) -> Result<Self> {
let db = Db::open(&cfg.db_path)?;
let claude = ClaudeCli::new(&cfg.claude_bin);
let mem0 = match (&cfg.mem0_api_key, &cfg.mem0_user_id) {
(Some(k), Some(u)) if !k.is_empty() && !u.is_empty() => {
Some(Mem0Client::new(k.clone(), u.clone())?)
}
_ => None,
};
let supabase = make_heartbeat_client(&cfg).map(Arc::new);
let mem0_limit = Arc::new(HourlyLimiter::new(cfg.mem0_max_writes_per_hour));
Ok(Self {
cfg,
db,
claude,
mem0,
supabase,
advice_limit: HashMap::new(),
mem0_limit,
})
}
fn resolve_locale(&self) -> String {
if let Some(sb) = &self.supabase {
if let Some(user_lang) = sb.get_user_locale() {
return user_lang;
}
}
self.cfg.advice_locale.clone()
}
pub fn run(mut self, rx: Receiver<BurstReady>) {
log_line("[advice] thread up");
let heartbeat = self.supabase.clone();
let mut last_beat = Instant::now() - Duration::from_secs(60);
loop {
if last_beat.elapsed() >= Duration::from_secs(10) {
if let Some(c) = heartbeat.as_ref() {
let _ = c.heartbeat("advice");
}
last_beat = Instant::now();
}
let msg = match rx.recv_timeout(Duration::from_secs(5)) {
Ok(m) => m,
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => break,
};
if !self.cfg.advice_enabled {
log_line("[advice] disabled — skipping");
continue;
}
if msg.paths.len() < self.cfg.advice_min_batch {
continue;
}
let limiter = self
.advice_limit
.entry(msg.project.clone())
.or_insert_with(|| Arc::new(HourlyLimiter::new(self.cfg.advice_max_per_hour)))
.clone();
if !limiter.try_consume() {
log_line(&format!(
"[advice] rate-limited for project {} ({}/h)",
msg.project, self.cfg.advice_max_per_hour
));
continue;
}
if let Err(e) = self.process_burst(&msg) {
log_line(&format!("[advice-err] {}: {}", msg.project, e));
}
}
}
fn process_burst(&self, msg: &BurstReady) -> Result<()> {
let project = &msg.project;
let paths = &msg.paths;
log_line(&format!(
"[advice] processing {} ({} paths)",
project,
paths.len()
));
let mem0_query = build_mem0_query(project, paths);
let prior = match &self.mem0 {
Some(m) => match m.search(&mem0_query, 5) {
Ok(v) => v,
Err(e) => {
log_line(&format!("[mem0-search-err] {}", e));
Vec::new()
}
},
None => Vec::new(),
};
let monitor = self.cfg.monitor_dir.clone();
let snippets = collect_snippets(&monitor, project, paths, 8, 1500);
let rules = Rules::load_global();
let prior_text: String = prior
.iter()
.map(|m| format!("- {}", m.memory))
.collect::<Vec<_>>()
.join("\n");
let locale = self.resolve_locale();
log_line(&format!("[advice] locale={}", locale));
let prompt = build_prompt(project, &snippets, &prior_text, &rules.render(), &locale);
let result = self
.claude
.ask_json(&prompt, Duration::from_secs(60))
.map_err(|e| {
let _ = self.db.insert(&Event {
id: None,
project: project.clone(),
event_type: "advice_error".into(),
path: None,
payload: json!({"error": e.to_string()}).to_string(),
severity: "warn".into(),
created_at: Local::now().to_rfc3339(),
synced_at: None,
acked_at: None,
});
e
})?;
if let Some(advice) = result.get("advice").and_then(|v| v.as_array()) {
for item in advice {
let text = item
.get("text")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let severity = item
.get("severity")
.and_then(|v| v.as_str())
.unwrap_or("info")
.to_string();
let verifiable = item
.get("verifiable")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let paths: Vec<String> = item
.get("paths")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|x| x.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
if text.is_empty() {
continue;
}
let _ = self.db.insert(&Event {
id: None,
project: project.clone(),
event_type: "advice".into(),
path: None,
payload: json!({
"text": text,
"verifiable": verifiable,
"paths": paths,
})
.to_string(),
severity,
created_at: Local::now().to_rfc3339(),
synced_at: None,
acked_at: None,
});
}
}
if let (Some(client), Some(facts)) =
(&self.mem0, result.get("facts").and_then(|v| v.as_array()))
{
for fact in facts {
let text = fact.get("text").and_then(|v| v.as_str()).unwrap_or("");
let conf = fact
.get("confidence")
.and_then(|v| v.as_f64())
.unwrap_or(0.0) as f32;
if text.is_empty() {
continue;
}
if conf < self.cfg.mem0_confidence_threshold {
continue;
}
if !self.mem0_limit.try_consume() {
log_line(&format!(
"[mem0] write cap hit ({}/h) — dropping fact",
self.cfg.mem0_max_writes_per_hour
));
break;
}
let meta = json!({
"project": project,
"confidence": conf,
"source": "devist-worker",
});
if let Err(e) = client.add(text, Some(meta)) {
log_line(&format!("[mem0-add-err] {}", e));
}
}
}
Ok(())
}
}
/// Assembles the prompt sent to Claude for one burst.
///
/// Layout: optional user-rules preamble (omitted when `rules` is empty), a directive to
/// write all natural-language output in `locale`, then the fixed instructions with
/// `project`, `snippets` and `prior` spliced in.
fn build_prompt(project: &str, snippets: &str, prior: &str, rules: &str, locale: &str) -> String {
    let mut rules_block = String::new();
    if !rules.is_empty() {
        rules_block.push_str("User-defined rules (follow these strictly):\n\n");
        rules_block.push_str(rules);
        rules_block.push_str("\n\n---\n\n");
    }

    let language_block = [
        "All natural-language strings in your output (every \"text\" field, every \"explain\" field) MUST be written in language code `",
        locale,
        "`. Field names and JSON structure stay English.\n\n",
    ]
    .concat();

    // The raw string below is runtime text: its lines are deliberately unindented.
    format!(
        r#"{rules_block}{language_block}You are a code observer for the project "{project}".
The user just made these changes (paths + first lines shown):
{snippets}
Existing long-term memory about this user/project:
{prior}
Output STRICT JSON (no markdown fences, no commentary). Schema:
{{
"facts": [
{{"text": "<short factual statement worth remembering long-term>", "confidence": <0..1>}}
],
"advice": [
{{"text": "<actionable suggestion>", "severity": "info|suggest|warn|block", "verifiable": true|false, "paths": ["<file paths the advice applies to>"]}}
]
}}
`verifiable`:
- true if the issue is OBJECTIVELY checkable in current file content (e.g., "missing input validation in src/auth.tsx", "secret literal in .env.example", "TODO left at line N")
- false if the advice is SUBJECTIVE / opinion (e.g., "consider refactoring", "naming could be clearer", "add more tests")
`paths`: list of repo-relative file paths the advice references. Empty array if not applicable.
Defaults (overridden by user-defined rules above):
- "facts" should be DURABLE truths (preferences, conventions, architectural choices). Skip transient changes.
- "advice" should be high-signal: missing tests, security issues, dependency mismatches, conflicting patterns. Skip nitpicks.
- Empty arrays are fine. Prefer fewer, higher-quality items.
- Confidence reflects how sure you are this is worth long-term storage.
"#
    )
}
/// Renders the first bytes of up to `max_files` changed files as prompt text.
///
/// Each path in `rel_paths` is joined onto `monitor`. Paths flagged by
/// `is_secret_path` are skipped without any output. Every other file gets a
/// `--- <path> ---` header (with a leading `<project>/` removed) followed by one of:
/// - up to `max_bytes_per_file` bytes of content, cut on a UTF-8 character boundary and
///   marked as truncated when the file is longer;
/// - `(binary, skipped)` when `looks_binary` flags it;
/// - `(unreadable / deleted)` when it cannot be read.
///
/// A trailing line reports how many paths beyond `max_files` were not shown.
fn collect_snippets(
    monitor: &Path,
    project: &str,
    rel_paths: &[String],
    max_files: usize,
    max_bytes_per_file: usize,
) -> String {
    // Read only what is needed: 2048 bytes for the binary check (assumes `looks_binary`
    // inspects at most the first 2048 bytes — keep in sync), plus one byte past the
    // snippet budget so truncation can still be detected.
    let read_limit = max_bytes_per_file.max(2048).saturating_add(1);
    let project_prefix = format!("{}/", project);

    let mut out = String::new();
    for rel in rel_paths.iter().take(max_files) {
        let abs = monitor.join(rel);
        if crate::worker::secrets::is_secret_path(&abs) {
            continue;
        }
        let display = rel.strip_prefix(project_prefix.as_str()).unwrap_or(rel);
        out.push_str(&format!("--- {} ---\n", display));
        match read_prefix(&abs, read_limit) {
            Ok(bytes) => {
                if looks_binary(&bytes) {
                    out.push_str("(binary, skipped)\n\n");
                    continue;
                }
                let shown = utf8_prefix(&bytes, max_bytes_per_file);
                out.push_str(&String::from_utf8_lossy(shown));
                if shown.len() < bytes.len() {
                    out.push_str("\n…(truncated)\n");
                }
                out.push_str("\n\n");
            }
            Err(_) => out.push_str("(unreadable / deleted)\n\n"),
        }
    }
    if rel_paths.len() > max_files {
        out.push_str(&format!(
            "(+ {} more files not shown)\n",
            rel_paths.len() - max_files
        ));
    }
    out
}

/// Reads at most `limit` bytes from the start of the file at `path`.
fn read_prefix(path: &Path, limit: usize) -> std::io::Result<Vec<u8>> {
    use std::io::Read;
    let mut buf = Vec::new();
    // usize -> u64 is a widening conversion on every supported target.
    std::fs::File::open(path)?
        .take(limit as u64)
        .read_to_end(&mut buf)?;
    Ok(buf)
}

/// Longest prefix of `bytes`, at most `max` long, that does not end inside a UTF-8
/// sequence.
fn utf8_prefix(bytes: &[u8], max: usize) -> &[u8] {
    if max >= bytes.len() {
        return bytes;
    }
    // `bytes[end]` is the first excluded byte. If it is a continuation byte
    // (0b10xx_xxxx) the cut is mid-character, so step back. A code point has at most
    // three continuation bytes, which bounds the back-off on malformed input.
    let floor = max.saturating_sub(3);
    let mut end = max;
    while end > floor && (bytes[end] & 0xC0) == 0x80 {
        end -= 1;
    }
    &bytes[..end]
}
/// Creates the Supabase client from `cfg`.
///
/// Returns `None` when the URL or key is missing or empty, or when the client
/// constructor fails. An absent or empty `client_id` is replaced by `"unknown"`.
fn make_heartbeat_client(cfg: &WorkerConfig) -> Option<SupabaseClient> {
    let url = cfg.supabase_url.as_deref().filter(|u| !u.is_empty())?;
    let key = cfg.supabase_key.as_deref().filter(|k| !k.is_empty())?;

    let client_id = match cfg.client_id.as_deref() {
        Some(id) if !id.is_empty() => id,
        _ => "unknown",
    };

    // Construction errors are intentionally collapsed into `None`.
    SupabaseClient::new(url, key, client_id).ok()
}
/// Heuristic binary check: reports `true` when a NUL byte occurs within the first
/// 2048 bytes of `bytes`.
fn looks_binary(bytes: &[u8]) -> bool {
    let window = &bytes[..bytes.len().min(2048)];
    window.contains(&0)
}
/// Builds the natural-language mem0 search query for a burst.
///
/// Looks at the first 20 `paths` and mentions up to 8 distinct parent directories
/// (excluding the bare project directory), up to 6 distinct file names, and every
/// distinct lower-cased extension. Clauses with nothing to list are left out, so a burst
/// touching only project-root files, or an empty burst, still yields a well-formed
/// sentence.
fn build_mem0_query(project: &str, paths: &[String]) -> String {
    use std::collections::BTreeSet;

    let mut dirs: BTreeSet<String> = BTreeSet::new();
    let mut exts: BTreeSet<String> = BTreeSet::new();
    let mut filenames: Vec<String> = Vec::new();
    for p in paths.iter().take(20) {
        let pp = Path::new(p);
        if let Some(parent) = pp.parent().and_then(|x| x.to_str()) {
            if !parent.is_empty() && parent != project {
                dirs.insert(parent.to_string());
            }
        }
        if let Some(ext) = pp.extension().and_then(|e| e.to_str()) {
            exts.insert(ext.to_lowercase());
        }
        if let Some(name) = pp.file_name().and_then(|n| n.to_str()) {
            // Keep first-seen order but do not spend sample slots on repeated names
            // (e.g. several `mod.rs`). At most 20 entries, so the scan is trivial.
            if !filenames.iter().any(|seen| seen == name) {
                filenames.push(name.to_string());
            }
        }
    }

    let dir_list = dirs.iter().take(8).cloned().collect::<Vec<_>>().join(", ");
    let ext_list = exts.iter().cloned().collect::<Vec<_>>().join(", ");
    let file_list = filenames
        .iter()
        .take(6)
        .cloned()
        .collect::<Vec<_>>()
        .join(", ");

    let mut query = format!("Project '{project}': recent edits");
    if !dir_list.is_empty() {
        query.push_str(" in ");
        query.push_str(&dir_list);
    }
    if !file_list.is_empty() {
        query.push_str(" affecting files like ");
        query.push_str(&file_list);
        if !ext_list.is_empty() {
            query.push_str(" (");
            query.push_str(&ext_list);
            query.push(')');
        }
    }
    query.push_str(
        ". Find prior conventions, preferences, and architectural decisions \
         relevant to these areas of the codebase.",
    );
    query
}
/// Writes `msg` to stdout, prefixed with the local time as `YYYY-MM-DD HH:MM:SS`.
///
/// Logging is best effort: `println!` panics when stdout cannot be written (for
/// example a closed pipe), which would take the worker thread down, so the write
/// error is deliberately ignored here.
fn log_line(msg: &str) {
    use std::io::Write;

    let now = Local::now().format("%Y-%m-%d %H:%M:%S");
    let _ = writeln!(std::io::stdout().lock(), "{} {}", now, msg);
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The query names the project, at least one file, an extension and the shared directory.
    #[test]
    fn mem0_query_includes_dirs_and_extensions() {
        let paths: Vec<String> = [
            "myapp/src/auth/login.tsx",
            "myapp/src/auth/session.ts",
            "myapp/package.json",
        ]
        .iter()
        .map(|p| p.to_string())
        .collect();

        let q = build_mem0_query("myapp", &paths);

        assert!(q.contains("myapp"));
        assert!(["login.tsx", "session.ts"].iter().any(|name| q.contains(*name)));
        assert!(["tsx", "ts", "json"].iter().any(|ext| q.contains(*ext)));
        assert!(q.contains("myapp/src/auth"));
    }

    /// An empty burst still yields a query that mentions the project.
    #[test]
    fn mem0_query_handles_empty_paths() {
        let no_paths: Vec<String> = Vec::new();
        let q = build_mem0_query("x", &no_paths);
        assert!(q.contains("x"));
    }

    /// The limiter accepts exactly `cap` hits and then refuses further ones.
    #[test]
    fn limiter_caps_to_window() {
        let limiter = HourlyLimiter::new(3);
        for _ in 0..3 {
            assert!(limiter.try_consume());
        }
        assert!(!limiter.try_consume());
        assert_eq!(limiter.current(), 3);
    }
}