use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::mpsc;
use std::time::{Duration as StdDuration, Instant};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use quiver_core::usage::{Outcome, UsageEvent};
use quiver_recommender::embed::Embedder;
use quiver_storage::{open, suggestions, tools as tools_store, usage};
use rusqlite::Connection;
use crate::AgentConfig;
use crate::hint;
use crate::recommend::top_k;
use crate::tail::{TailEvent, TailReader, walk_jsonl};
#[derive(Debug, Clone)]
struct PendingUse {
tool_id: String,
session_id: String,
task_text: Option<String>,
occurred_at: DateTime<Utc>,
}
type LastTexts = HashMap<String, String>;
pub async fn run(cfg: AgentConfig) -> Result<()> {
if let Some(parent) = cfg.db_path.parent() {
std::fs::create_dir_all(parent)?;
}
if !cfg.sessions_dir.exists() {
anyhow::bail!(
"sessions_dir does not exist: {}",
cfg.sessions_dir.display()
);
}
std::fs::create_dir_all(&cfg.hints_dir)
.with_context(|| format!("create hints_dir {}", cfg.hints_dir.display()))?;
if let Err(e) = hint::cleanup_stale(&cfg.hints_dir, 7) {
tracing::warn!("hints cleanup failed: {e}");
}
let mut conn = open(&cfg.db_path)?;
let catalogue = load_catalogue(&conn)?;
tracing::info!(
catalogued = catalogue.len(),
sessions_dir = %cfg.sessions_dir.display(),
hints_dir = %cfg.hints_dir.display(),
"agent: indexing existing sessions"
);
let mut readers: HashMap<PathBuf, TailReader> = HashMap::new();
for p in walk_jsonl(&cfg.sessions_dir) {
match TailReader::at_eof(&p) {
Ok(r) => {
readers.insert(p, r);
},
Err(e) => tracing::warn!("tail seed {} failed: {e}", p.display()),
}
}
let embedder = Embedder::new().context("init fastembed")?;
let (tx, rx) = mpsc::channel::<notify::Result<Event>>();
let mut watcher: RecommendedWatcher = notify::recommended_watcher(tx)?;
watcher
.watch(&cfg.sessions_dir, RecursiveMode::Recursive)
.with_context(|| format!("watch {}", cfg.sessions_dir.display()))?;
tracing::info!("agent: watching for new prompts. Ctrl-C to exit.");
let mut last_texts: LastTexts = HashMap::new();
let mut pending: HashMap<String, PendingUse> = HashMap::new();
let mut events_since_recompute = 0usize;
let mut last_recompute = Instant::now();
loop {
let next = rx.recv_timeout(StdDuration::from_millis(500));
match next {
Ok(Ok(ev)) => {
handle_fs_event(&ev, &cfg, &mut readers);
let mut all_events = Vec::new();
for r in readers.values_mut() {
match r.poll() {
Ok(mut evs) => all_events.append(&mut evs),
Err(e) => tracing::warn!("poll {} failed: {e}", r.path.display()),
}
}
for tev in all_events {
if let Err(e) = dispatch_event(
tev,
&cfg,
&conn,
&embedder,
&mut last_texts,
&mut pending,
&catalogue,
)
.await
{
tracing::warn!("dispatch failed: {e}");
} else {
events_since_recompute += 1;
}
}
},
Ok(Err(e)) => tracing::warn!("notify error: {e}"),
Err(mpsc::RecvTimeoutError::Timeout) => {},
Err(mpsc::RecvTimeoutError::Disconnected) => {
tracing::error!("notify channel disconnected, exiting");
break;
},
}
let elapsed = last_recompute.elapsed();
let interval = StdDuration::from_secs(cfg.score_recompute_interval_secs);
if events_since_recompute >= 50 || (elapsed >= interval && events_since_recompute > 0) {
match usage::recompute_scores(&mut conn) {
Ok(n) => tracing::info!("recomputed scores for {n} tool(s)"),
Err(e) => tracing::warn!("recompute_scores failed: {e}"),
}
events_since_recompute = 0;
last_recompute = Instant::now();
}
}
Ok(())
}
fn handle_fs_event(ev: &Event, cfg: &AgentConfig, readers: &mut HashMap<PathBuf, TailReader>) {
match ev.kind {
EventKind::Create(_) | EventKind::Modify(_) => {
for p in &ev.paths {
if p.extension().and_then(|s| s.to_str()) != Some("jsonl") {
continue;
}
if !readers.contains_key(p) {
readers.insert(p.clone(), TailReader::at_start(p));
tracing::debug!("agent: new file {}", p.display());
}
}
},
EventKind::Remove(_) => {
for p in &ev.paths {
readers.remove(p);
}
},
_ => {},
}
for p in walk_jsonl(&cfg.sessions_dir) {
if !readers.contains_key(&p)
&& let Ok(r) = TailReader::at_eof(&p)
{
readers.insert(p, r);
}
}
}
async fn dispatch_event(
ev: TailEvent,
cfg: &AgentConfig,
conn: &Connection,
embedder: &Embedder,
last_texts: &mut LastTexts,
pending: &mut HashMap<String, PendingUse>,
catalogue: &HashSet<String>,
) -> Result<()> {
match ev {
TailEvent::UserText {
session_id,
text,
ts,
} => {
last_texts.insert(session_id.clone(), text.clone());
let classified = match &cfg.classifier {
Some(c) => c.classify(&text).await,
None => crate::classify::ClassifiedTask::passthrough(&text),
};
if !classified.is_task {
tracing::debug!(
session = %session_id,
"classifier dropped non-task message"
);
return Ok(());
}
let hits = top_k(conn, embedder, &classified.query, cfg.top_k)?;
if let Err(e) = hint::write_hint(&cfg.hints_dir, &session_id, Some(&text), ts, &hits) {
tracing::warn!("write_hint {session_id} failed: {e}");
}
if let Some(top) = hits.first()
&& catalogue.contains(&top.tool_id)
{
suggestions::record(
conn,
&session_id,
&top.tool_id,
Some(&text),
Some(top.score as f64),
ts,
)?;
tracing::info!(
session = %session_id,
tool = %top.tool_id,
"suggested top-1"
);
}
},
TailEvent::ToolUse {
session_id,
uuid,
tool_id,
ts,
} => {
if catalogue.contains(&tool_id) {
let n = suggestions::mark_accepted(
conn,
&session_id,
&tool_id,
ts,
cfg.acceptance_window_minutes,
)?;
if n > 0 {
tracing::info!(
session = %session_id,
tool = %tool_id,
accepted = n,
"suggestion accepted"
);
}
}
let task_text = last_texts.get(&session_id).cloned();
pending.insert(
uuid,
PendingUse {
tool_id,
session_id,
task_text,
occurred_at: ts,
},
);
},
TailEvent::ToolResult {
session_id: _,
uuid,
is_error,
..
} => {
if let Some(p) = pending.remove(&uuid)
&& catalogue.contains(&p.tool_id)
{
let outcome = match is_error {
Some(true) => Outcome::Failure,
_ => Outcome::Success,
};
let evt = UsageEvent {
uuid: Some(uuid),
tool_id: p.tool_id,
session_id: Some(p.session_id),
project: None,
task_text: p.task_text,
outcome,
duration_ms: None,
cost_usd: None,
occurred_at: p.occurred_at,
};
if let Err(e) = usage::insert_event(conn, &evt) {
tracing::warn!("insert_event failed: {e}");
}
}
},
}
Ok(())
}
fn load_catalogue(conn: &Connection) -> Result<HashSet<String>> {
Ok(tools_store::list_all(conn)?
.into_iter()
.map(|m| m.id)
.collect())
}