#![allow(dead_code)]
use anyhow::Result;
use chrono::Local;
use serde_json::json;
use std::collections::{HashMap, VecDeque};
use std::path::Path;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use crate::worker::claude::ClaudeCli;
use crate::worker::config::WorkerConfig;
use crate::worker::db::{Db, Event};
use crate::worker::mem0::Mem0Client;
use crate::worker::rules::Rules;
/// Message handed to the advice worker: one project together with the paths
/// that make up a burst of changes.
///
/// Derives `Debug` and `Clone` so the message can be logged and duplicated
/// like the other public types in this module.
#[derive(Debug, Clone)]
pub struct BurstReady {
    /// Name of the project the burst belongs to.
    pub project: String,
    /// Paths that were collected for this burst.
    pub paths: Vec<String>,
}
/// Accumulator for the paths touched during one burst of activity.
#[derive(Debug, Default)]
pub struct ProjectBurst {
    /// Distinct paths recorded so far, in first-seen order.
    pub paths: Vec<String>,
    /// Time of the most recent `record` call; `None` when nothing is pending.
    pub last_event: Option<Instant>,
    /// Time of the first `record` call since the last `drain`.
    pub started: Option<Instant>,
}

impl ProjectBurst {
    /// Notes that `path` changed.
    ///
    /// The first call after a drain stamps `started`; every call refreshes
    /// `last_event`. A path that is already present is not added twice.
    pub fn record(&mut self, path: String) {
        self.started = self.started.or_else(|| Some(Instant::now()));
        self.last_event = Some(Instant::now());

        let already_known = self.paths.iter().any(|known| *known == path);
        if !already_known {
            self.paths.push(path);
        }
    }

    /// Returns `true` when at least `idle` has elapsed since the last recorded
    /// event. A burst with no recorded event is never considered idle.
    pub fn is_idle(&self, idle: Duration) -> bool {
        match self.last_event {
            Some(last) => last.elapsed() >= idle,
            None => false,
        }
    }

    /// Hands back the collected paths and resets the burst to its empty state.
    pub fn drain(&mut self) -> Vec<String> {
        let collected = std::mem::take(&mut self.paths);
        self.last_event = None;
        self.started = None;
        collected
    }
}
/// Sliding-window rate limiter: allows at most `cap` hits within any
/// trailing one-hour window.
pub struct HourlyLimiter {
    /// Maximum number of hits allowed inside the window.
    cap: u32,
    /// Timestamps of accepted hits, oldest first.
    hits: Mutex<VecDeque<Instant>>,
}

impl HourlyLimiter {
    /// Length of the sliding window.
    const WINDOW: Duration = Duration::from_secs(3600);

    /// Creates a limiter that accepts up to `cap` hits per hour.
    pub fn new(cap: u32) -> Self {
        Self {
            cap,
            hits: Mutex::new(VecDeque::new()),
        }
    }

    /// Tries to record one hit.
    ///
    /// Returns `true` and stores the hit when fewer than `cap` hits are inside
    /// the window; returns `false` (recording nothing) otherwise.
    pub fn try_consume(&self) -> bool {
        let mut hits = self.lock_hits();
        let now = Instant::now();
        Self::prune(&mut hits, now);
        if Self::count(&hits) >= self.cap {
            return false;
        }
        hits.push_back(now);
        true
    }

    /// Number of hits currently inside the window.
    ///
    /// Expired hits are dropped first, so the value does not stay stale when
    /// `try_consume` has not been called for a while.
    pub fn current(&self) -> u32 {
        let mut hits = self.lock_hits();
        Self::prune(&mut hits, Instant::now());
        Self::count(&hits)
    }

    /// Locks the hit queue, recovering from poisoning.
    ///
    /// The queue is only ever changed by single `pop_front` / `push_back`
    /// calls, so it is still consistent after another thread panicked while
    /// holding the lock; there is no reason to propagate that panic.
    fn lock_hits(&self) -> std::sync::MutexGuard<'_, VecDeque<Instant>> {
        self.hits
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Removes hits from the front of the queue that are older than the window.
    fn prune(hits: &mut VecDeque<Instant>, now: Instant) {
        while let Some(&oldest) = hits.front() {
            if now.duration_since(oldest) > Self::WINDOW {
                hits.pop_front();
            } else {
                break;
            }
        }
    }

    /// Queue length as `u32`.
    ///
    /// `try_consume` only pushes while the length is below `cap` (a `u32`),
    /// so the cast cannot truncate for hits recorded through this type.
    fn count(hits: &VecDeque<Instant>) -> u32 {
        hits.len() as u32
    }
}
pub struct AdviceWorker {
cfg: WorkerConfig,
db: Db,
claude: ClaudeCli,
mem0: Option<Mem0Client>,
advice_limit: HashMap<String, Arc<HourlyLimiter>>,
mem0_limit: Arc<HourlyLimiter>,
}
impl AdviceWorker {
pub fn new(cfg: WorkerConfig) -> Result<Self> {
let db = Db::open(&cfg.db_path)?;
let claude = ClaudeCli::new(&cfg.claude_bin);
let mem0 = match (&cfg.mem0_api_key, &cfg.mem0_user_id) {
(Some(k), Some(u)) if !k.is_empty() && !u.is_empty() => {
Some(Mem0Client::new(k.clone(), u.clone())?)
}
_ => None,
};
let mem0_limit = Arc::new(HourlyLimiter::new(cfg.mem0_max_writes_per_hour));
Ok(Self {
cfg,
db,
claude,
mem0,
advice_limit: HashMap::new(),
mem0_limit,
})
}
pub fn run(mut self, rx: Receiver<BurstReady>) {
log_line("[advice] thread up");
for msg in rx {
if !self.cfg.advice_enabled {
log_line("[advice] disabled — skipping");
continue;
}
if msg.paths.len() < self.cfg.advice_min_batch {
continue;
}
let limiter = self
.advice_limit
.entry(msg.project.clone())
.or_insert_with(|| Arc::new(HourlyLimiter::new(self.cfg.advice_max_per_hour)))
.clone();
if !limiter.try_consume() {
log_line(&format!(
"[advice] rate-limited for project {} ({}/h)",
msg.project, self.cfg.advice_max_per_hour
));
continue;
}
if let Err(e) = self.process_burst(&msg) {
log_line(&format!("[advice-err] {}: {}", msg.project, e));
}
}
}
fn process_burst(&self, msg: &BurstReady) -> Result<()> {
let project = &msg.project;
let paths = &msg.paths;
log_line(&format!(
"[advice] processing {} ({} paths)",
project,
paths.len()
));
let mem0_query = build_mem0_query(project, paths);
let prior = match &self.mem0 {
Some(m) => match m.search(&mem0_query, 5) {
Ok(v) => v,
Err(e) => {
log_line(&format!("[mem0-search-err] {}", e));
Vec::new()
}
},
None => Vec::new(),
};
let monitor = self.cfg.monitor_dir.clone();
let snippets = collect_snippets(&monitor, project, paths, 8, 1500);
let rules = Rules::load(&monitor, project);
let prior_text: String = prior
.iter()
.map(|m| format!("- {}", m.memory))
.collect::<Vec<_>>()
.join("\n");
let prompt = build_prompt(project, &snippets, &prior_text, &rules.render());
let result = self
.claude
.ask_json(&prompt, Duration::from_secs(60))
.map_err(|e| {
let _ = self.db.insert(&Event {
id: None,
project: project.clone(),
event_type: "advice_error".into(),
path: None,
payload: json!({"error": e.to_string()}).to_string(),
severity: "warn".into(),
created_at: Local::now().to_rfc3339(),
synced_at: None,
acked_at: None,
});
e
})?;
if let Some(advice) = result.get("advice").and_then(|v| v.as_array()) {
for item in advice {
let text = item
.get("text")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let severity = item
.get("severity")
.and_then(|v| v.as_str())
.unwrap_or("info")
.to_string();
if text.is_empty() {
continue;
}
let _ = self.db.insert(&Event {
id: None,
project: project.clone(),
event_type: "advice".into(),
path: None,
payload: json!({"text": text, "raw": item}).to_string(),
severity,
created_at: Local::now().to_rfc3339(),
synced_at: None,
acked_at: None,
});
}
}
if let (Some(client), Some(facts)) =
(&self.mem0, result.get("facts").and_then(|v| v.as_array()))
{
for fact in facts {
let text = fact.get("text").and_then(|v| v.as_str()).unwrap_or("");
let conf = fact
.get("confidence")
.and_then(|v| v.as_f64())
.unwrap_or(0.0) as f32;
if text.is_empty() {
continue;
}
if conf < self.cfg.mem0_confidence_threshold {
continue;
}
if !self.mem0_limit.try_consume() {
log_line(&format!(
"[mem0] write cap hit ({}/h) — dropping fact",
self.cfg.mem0_max_writes_per_hour
));
break;
}
let meta = json!({
"project": project,
"confidence": conf,
"source": "devist-worker",
});
if let Err(e) = client.add(text, Some(meta)) {
log_line(&format!("[mem0-add-err] {}", e));
}
}
}
Ok(())
}
}
/// Assembles the prompt sent to the model for one burst.
///
/// * `project`  - project name quoted in the opening sentence.
/// * `snippets` - rendered file snippets for the changed paths.
/// * `prior`    - existing memories, already formatted as text.
/// * `rules`    - user-defined rules; when non-empty they are placed in front
///   of the prompt under a "follow these strictly" heading.
///
/// Returns the complete prompt, which asks for strict JSON with `facts` and
/// `advice` arrays.
fn build_prompt(project: &str, snippets: &str, prior: &str, rules: &str) -> String {
    let body = format!(
        r#"You are a code observer for the project "{project}".
The user just made these changes (paths + first lines shown):
{snippets}
Existing long-term memory about this user/project:
{prior}
Output STRICT JSON (no markdown fences, no commentary). Schema:
{{
"facts": [
{{"text": "<short factual statement worth remembering long-term>", "confidence": <0..1>}}
],
"advice": [
{{"text": "<actionable suggestion>", "severity": "info|suggest|warn|block"}}
]
}}
Defaults (overridden by user-defined rules above):
- "facts" should be DURABLE truths (preferences, conventions, architectural choices). Skip transient changes.
- "advice" should be high-signal: missing tests, security issues, dependency mismatches, conflicting patterns. Skip nitpicks.
- Empty arrays are fine. Prefer fewer, higher-quality items.
- Confidence reflects how sure you are this is worth long-term storage.
"#
    );

    // Without rules the prompt is just the body; otherwise the rules lead.
    if rules.is_empty() {
        return body;
    }
    format!("User-defined rules (follow these strictly):\n\n{rules}\n\n---\n\n{body}")
}
/// Renders a text excerpt for each changed path.
///
/// * `monitor` - directory the relative paths are joined onto.
/// * `project` - project name; a leading `"<project>/"` is stripped from the
///   path shown in each header.
/// * `rel_paths` - changed paths relative to `monitor`.
/// * `max_files` - only the first `max_files` paths are considered; a trailing
///   note reports how many were left out.
/// * `max_bytes_per_file` - upper bound on the bytes shown per file.
///
/// Paths flagged by `is_secret_path` are skipped without any output. Files
/// that look binary or cannot be read get a placeholder line instead of
/// content.
///
/// Only as many bytes as are needed for the excerpt and the binary check are
/// read, so a very large file is never loaded into memory in full.
fn collect_snippets(
    monitor: &Path,
    project: &str,
    rel_paths: &[String],
    max_files: usize,
    max_bytes_per_file: usize,
) -> String {
    use std::io::Read;

    // Enough bytes for the excerpt and for the 2048-byte binary sniff, plus
    // one extra byte so "file is longer than the excerpt" stays detectable.
    let read_limit = max_bytes_per_file.max(2048).saturating_add(1);
    let project_prefix = format!("{}/", project);

    let mut out = String::new();
    for rel in rel_paths.iter().take(max_files) {
        let abs = monitor.join(rel);
        if crate::worker::secrets::is_secret_path(&abs) {
            continue;
        }
        let display = rel.strip_prefix(project_prefix.as_str()).unwrap_or(rel);
        out.push_str("--- ");
        out.push_str(display);
        out.push_str(" ---\n");

        let mut bytes = Vec::new();
        // `usize` always fits in `u64` on supported targets.
        let read = std::fs::File::open(&abs)
            .and_then(|file| file.take(read_limit as u64).read_to_end(&mut bytes));
        if read.is_err() {
            out.push_str("(unreadable / deleted)\n\n");
            continue;
        }
        if looks_binary(&bytes) {
            out.push_str("(binary, skipped)\n\n");
            continue;
        }

        let cut = utf8_safe_cut(&bytes, max_bytes_per_file);
        out.push_str(&String::from_utf8_lossy(&bytes[..cut]));
        if bytes.len() > max_bytes_per_file {
            out.push_str("\n…(truncated)\n");
        }
        out.push_str("\n\n");
    }
    if rel_paths.len() > max_files {
        out.push_str(&format!(
            "(+ {} more files not shown)\n",
            rel_paths.len() - max_files
        ));
    }
    out
}

/// Returns a cut position of at most `max` bytes that does not split a
/// multi-byte UTF-8 character.
///
/// When the byte at the cut is a continuation byte (`10xxxxxx`), the cut is
/// moved back to the start of that character. The back-off is limited to three
/// bytes (the most a UTF-8 character can need) so non-UTF-8 data loses at most
/// a few bytes.
fn utf8_safe_cut(bytes: &[u8], max: usize) -> usize {
    let mut cut = bytes.len().min(max);
    let mut backed_off = 0;
    while cut > 0 && cut < bytes.len() && backed_off < 3 && (bytes[cut] & 0xC0) == 0x80 {
        cut -= 1;
        backed_off += 1;
    }
    cut
}
/// Heuristic binary check: reports `true` when a NUL byte occurs within the
/// first 2048 bytes of `bytes`.
fn looks_binary(bytes: &[u8]) -> bool {
    let probe_len = bytes.len().min(2048);
    bytes[..probe_len].contains(&0)
}
/// Builds the natural-language search query used to recall prior memories
/// that are relevant to the changed paths.
///
/// Only the first 20 paths are inspected. The query mentions up to 8 distinct
/// parent directories (the project root itself is left out), up to 6 distinct
/// file names in first-seen order, and every distinct lower-cased extension.
///
/// A clause is emitted only when it has content, so an empty `paths` slice or
/// edits limited to the project root do not leave dangling fragments such as
/// "in  affecting files like  ()" in the query.
fn build_mem0_query(project: &str, paths: &[String]) -> String {
    use std::collections::BTreeSet;

    let mut dirs: BTreeSet<&str> = BTreeSet::new();
    let mut exts: BTreeSet<String> = BTreeSet::new();
    let mut filenames: Vec<&str> = Vec::new();
    for p in paths.iter().take(20) {
        let pp = Path::new(p);
        if let Some(parent) = pp.parent().and_then(|x| x.to_str()) {
            if !parent.is_empty() && parent != project {
                dirs.insert(parent);
            }
        }
        if let Some(ext) = pp.extension().and_then(|e| e.to_str()) {
            exts.insert(ext.to_lowercase());
        }
        if let Some(name) = pp.file_name().and_then(|n| n.to_str()) {
            // The same name in several directories should not use up more
            // than one of the few slots shown.
            if !filenames.contains(&name) {
                filenames.push(name);
            }
        }
    }

    let dir_list = dirs.iter().take(8).copied().collect::<Vec<_>>().join(", ");
    let ext_list = exts.into_iter().collect::<Vec<_>>().join(", ");
    let file_list = filenames
        .iter()
        .take(6)
        .copied()
        .collect::<Vec<_>>()
        .join(", ");

    let mut query = format!("Project '{project}': recent edits");
    if !dir_list.is_empty() {
        query.push_str(" in ");
        query.push_str(&dir_list);
    }
    if !file_list.is_empty() {
        query.push_str(" affecting files like ");
        query.push_str(&file_list);
    }
    if !ext_list.is_empty() {
        query.push_str(" (");
        query.push_str(&ext_list);
        query.push(')');
    }
    query.push_str(
        ". Find prior conventions, preferences, and architectural decisions \
         relevant to these areas of the codebase.",
    );
    query
}
/// Writes `msg` to stdout, prefixed with the current local time formatted as
/// `YYYY-MM-DD HH:MM:SS`.
fn log_line(msg: &str) {
    let stamp = Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
    println!("{stamp} {msg}");
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The query names the project, at least one file, at least one extension
    /// and the nested directory.
    #[test]
    fn mem0_query_includes_dirs_and_extensions() {
        let paths: Vec<String> = [
            "myapp/src/auth/login.tsx",
            "myapp/src/auth/session.ts",
            "myapp/package.json",
        ]
        .iter()
        .map(|p| p.to_string())
        .collect();
        let query = build_mem0_query("myapp", &paths);

        assert!(query.contains("myapp"));
        let names_a_file = ["login.tsx", "session.ts"]
            .iter()
            .any(|file| query.contains(*file));
        assert!(names_a_file);
        let names_an_ext = ["tsx", "ts", "json"]
            .iter()
            .any(|ext| query.contains(*ext));
        assert!(names_an_ext);
        assert!(query.contains("myapp/src/auth"));
    }

    /// An empty path list still yields a query that names the project.
    #[test]
    fn mem0_query_handles_empty_paths() {
        let no_paths: Vec<String> = Vec::new();
        let query = build_mem0_query("x", &no_paths);
        assert!(query.contains("x"));
    }

    /// The limiter accepts exactly `cap` hits and then refuses.
    #[test]
    fn limiter_caps_to_window() {
        let limiter = HourlyLimiter::new(3);
        for _ in 0..3 {
            assert!(limiter.try_consume());
        }
        assert!(!limiter.try_consume());
        assert_eq!(limiter.current(), 3);
    }
}