use anyhow::{Context, Result};
use clap::{Parser, Subcommand};
use serde::Serialize;
use std::path::PathBuf;
use std::process::Command as PCommand;
mod tui;
/// Snapshot of the environment checks gathered by `run_doctor`.
///
/// Serialized as JSON for `doctor --json` and rendered as text by
/// `print_human`. Field order is the order keys appear in the JSON output.
#[derive(Serialize)]
struct DoctorReport {
/// Version of this binary (`CARGO_PKG_VERSION` at build time).
task_journal_version: &'static str,
/// Whether the `claude` CLI was detected when the report was built.
claude_in_path: bool,
/// Version string reported by `claude --version`, when one was obtained.
claude_version: Option<String>,
/// Directories resolved through `tj_core::paths`.
data_dir: PathBuf,
events_dir: PathBuf,
state_dir: PathBuf,
metrics_dir: PathBuf,
/// Result of the `dir_writable` probe for each directory above.
events_dir_writable: bool,
state_dir_writable: bool,
metrics_dir_writable: bool,
/// Projects returned by `tj_core::db::list_all_projects`; empty if that call fails.
known_projects: Vec<String>,
/// `schema_migrations.version` rows of the current directory's state
/// database; empty when the database is missing or cannot be read.
schema_versions_applied: Vec<i64>,
/// Problems found by the checks; `main` exits with status 1 when non-empty.
issues: Vec<String>,
/// Informational remarks that are not failures; omitted from JSON when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
notes: Vec<String>,
}
impl DoctorReport {
    /// Writes the report to stdout as a human-readable summary.
    ///
    /// Layout: a header, one line per check, the optional schema line,
    /// the optional notes list, and finally either a "all checks passed"
    /// line or the list of issues.
    fn print_human(&self) {
        // Label shown in parentheses after each directory path.
        let access = |ok: bool| if ok { "writable" } else { "NOT writable" };

        // The version is only shown when the binary was detected.
        let claude = match (self.claude_in_path, self.claude_version.as_deref()) {
            (false, _) => "NOT FOUND in PATH",
            (true, Some(version)) => version,
            (true, None) => "found (version unknown)",
        };

        println!("task-journal doctor");
        println!(" version {}", self.task_journal_version);
        println!(" claude binary {}", claude);
        println!(" data dir {}", self.data_dir.display());
        println!(
            " events dir {} ({})",
            self.events_dir.display(),
            access(self.events_dir_writable)
        );
        println!(
            " state dir {} ({})",
            self.state_dir.display(),
            access(self.state_dir_writable)
        );
        println!(
            " metrics dir {} ({})",
            self.metrics_dir.display(),
            access(self.metrics_dir_writable)
        );
        println!(" known projects {}", self.known_projects.len());

        // Schema versions are rendered as zero-padded tags: v001, v002, ...
        if !self.schema_versions_applied.is_empty() {
            let tags = self
                .schema_versions_applied
                .iter()
                .map(|n| format!("v{n:03}"))
                .collect::<Vec<String>>()
                .join(", ");
            println!(" schema (current) {}", tags);
        }

        if !self.notes.is_empty() {
            println!("\nℹ {} note(s):", self.notes.len());
            self.notes.iter().for_each(|note| println!(" - {note}"));
        }

        match self.issues.as_slice() {
            [] => println!("\n✓ all checks passed"),
            problems => {
                println!("\n✗ {} issue(s):", problems.len());
                for problem in problems {
                    println!(" - {problem}");
                }
            }
        }
    }
}
/// Reports whether `dir` can be created (if missing) and written to.
///
/// The check creates the directory tree, writes a small probe file into it
/// and removes the probe again. Removal is best-effort: a failure to delete
/// the probe does not change the result.
fn dir_writable(dir: &std::path::Path) -> bool {
    match std::fs::create_dir_all(dir) {
        Err(_) => false,
        Ok(()) => {
            let marker = dir.join(".tj-doctor-write-probe");
            let wrote = std::fs::write(&marker, b"ok").is_ok();
            let _ = std::fs::remove_file(&marker);
            wrote
        }
    }
}
/// Moves a project's on-disk data from the hash of `from` to the hash of `to`.
///
/// Three files are considered: the events log, the state database and the
/// metrics log. Files that do not exist under the old hash are skipped.
/// Without `force`, the call fails before touching anything if any
/// destination file already exists; with `force`, an existing destination is
/// removed just before the corresponding source is renamed over it.
///
/// After the move, rows in the `tasks` and `index_state` tables of the
/// destination state database are rewritten to carry the new hash.
///
/// # Errors
/// Fails when either hash cannot be computed, when both paths resolve to the
/// same hash, when a destination exists and `force` is false, or when any
/// filesystem or database operation fails.
fn run_migrate_project(from: &std::path::Path, to: &std::path::Path, force: bool) -> Result<()> {
    let from_hash = tj_core::project_hash::from_path(from)
        .with_context(|| format!("compute project_hash for --from {from:?}"))?;
    let to_hash = tj_core::project_hash::from_path(to)
        .with_context(|| format!("compute project_hash for --to {to:?}"))?;
    if from_hash == to_hash {
        anyhow::bail!(
            "--from and --to resolve to the same project_hash ({from_hash}) — nothing to migrate"
        );
    }

    let events_dir = tj_core::paths::events_dir()?;
    let state_dir = tj_core::paths::state_dir()?;
    let metrics_dir = tj_core::paths::metrics_dir()?;

    // (old path, new path) for every per-project file, in move order.
    let pairs = [
        (&events_dir, "jsonl"),
        (&state_dir, "sqlite"),
        (&metrics_dir, "jsonl"),
    ]
    .map(|(dir, ext)| {
        (
            dir.join(format!("{from_hash}.{ext}")),
            dir.join(format!("{to_hash}.{ext}")),
        )
    });

    // Refuse up front so a clash never leaves a half-migrated project.
    if !force {
        if let Some((_, clash)) = pairs.iter().find(|(_, dst)| dst.exists()) {
            anyhow::bail!(
                "destination already exists: {} — pass --force to overwrite",
                clash.display()
            );
        }
    }

    let mut moved: Vec<String> = Vec::new();
    for (src, dst) in pairs.iter().filter(|(src, _)| src.exists()) {
        if let Some(parent) = dst.parent() {
            std::fs::create_dir_all(parent)?;
        }
        if force && dst.exists() {
            std::fs::remove_file(dst).with_context(|| format!("remove existing {dst:?}"))?;
        }
        std::fs::rename(src, dst).with_context(|| format!("rename {src:?} -> {dst:?}"))?;
        moved.push(dst.display().to_string());
    }

    // Rows inside the database still reference the old hash; rewrite them.
    let new_state_path = state_dir.join(format!("{to_hash}.sqlite"));
    if new_state_path.exists() {
        let conn = tj_core::db::open(&new_state_path)?;
        for statement in [
            "UPDATE tasks SET project_hash = ?1 WHERE project_hash = ?2",
            "UPDATE index_state SET project_hash = ?1 WHERE project_hash = ?2",
        ] {
            conn.execute(statement, rusqlite::params![to_hash, from_hash])?;
        }
    }

    if moved.is_empty() {
        println!("no on-disk data found for project_hash {from_hash} — nothing to migrate");
        return Ok(());
    }
    println!("migrated {} file(s):", moved.len());
    for path in moved {
        println!(" {path}");
    }
    println!(" project_hash {from_hash} -> {to_hash}");
    Ok(())
}
/// Escapes the characters that are significant in HTML text and attribute
/// values so arbitrary event text can be embedded in the exported page.
///
/// `&`, `<`, `>`, `"` and `'` are replaced by their entity references;
/// every other character is copied through unchanged.
fn html_escape(s: &str) -> String {
    // Escaping only ever grows the string, so the input length is a lower bound.
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&#39;"),
            _ => out.push(c),
        }
    }
    out
}
// Stylesheet inlined into the `<style>` element of the page produced by
// `render_html_timeline`. It defines light/dark colour variables and styles
// the per-task `<article>` cards, the `ol.timeline` event list, the
// `.type-*` event badges and the trailing " ?" marker on `.suggested` items.
const HTML_TIMELINE_CSS: &str = r#"
:root { color-scheme: light dark; --fg:#222; --bg:#fafafa; --muted:#666; --accent:#0366d6; }
@media (prefers-color-scheme: dark) { :root { --fg:#eee; --bg:#1a1a1a; --muted:#999; --accent:#58a6ff; } }
* { box-sizing: border-box; }
body { font: 14px/1.5 -apple-system, BlinkMacSystemFont, "Segoe UI", system-ui, sans-serif;
color: var(--fg); background: var(--bg); margin: 0; padding: 1.5rem; }
header h1 { margin: 0 0 1.5rem; font-size: 1.4rem; }
article { margin-bottom: 2rem; padding: 1rem 1.25rem; background: rgba(127,127,127,0.07);
border-radius: 6px; }
article h2 { margin: 0; font-size: 1.05rem; font-weight: 600; }
.tid { font-family: ui-monospace, "SF Mono", Menlo, Consolas, monospace;
color: var(--accent); margin-right: 0.4em; }
.meta { color: var(--muted); font-size: 0.85rem; margin: 0.25rem 0 0.75rem; }
ol.timeline { list-style: none; margin: 0; padding-left: 0; }
ol.timeline li { padding: 0.4rem 0; border-top: 1px solid rgba(127,127,127,0.15); }
ol.timeline li:first-child { border-top: none; }
time { font-family: ui-monospace, monospace; color: var(--muted); margin-right: 0.6em; }
.type { display: inline-block; padding: 0 0.35em; margin-right: 0.4em; border-radius: 3px;
font-size: 0.75rem; text-transform: uppercase; letter-spacing: 0.05em;
background: rgba(127,127,127,0.15); }
.type-decision { background: rgba(3,102,214,0.18); color: var(--accent); }
.type-rejection { background: rgba(214,3,3,0.18); }
.type-evidence { background: rgba(40,167,69,0.18); }
.type-finding { background: rgba(255,166,0,0.20); }
.suggested::after { content: " ?"; color: var(--muted); }
"#;
/// Renders `events` as a self-contained HTML page, one `<article>` per task.
///
/// Events are grouped by `task_id` (tasks appear in sorted id order) and keep
/// their input order inside each group. For every task:
/// - the title comes from the first `Open` event (its `meta.title`, falling
///   back to its text) or is `(untitled)` when there is no `Open` event;
/// - the status is `closed` only when the last event of the group is `Close`;
/// - `created` is the timestamp of the first event in the group.
///
/// All event-supplied text is passed through `html_escape`.
fn render_html_timeline(events: &[&tj_core::event::Event]) -> String {
    use std::collections::BTreeMap;
    use tj_core::event::{Event, EventStatus, EventType};

    let mut by_task: BTreeMap<String, Vec<&Event>> = BTreeMap::new();
    for ev in events.iter().copied() {
        by_task.entry(ev.task_id.clone()).or_default().push(ev);
    }

    let mut html = String::new();
    html.push_str(concat!(
        "<!doctype html>\n",
        "<html lang=\"en\"><head>",
        "<meta charset=\"utf-8\">",
        "<meta name=\"viewport\" content=\"width=device-width,initial-scale=1\">",
        "<title>Task Journal — Export</title>",
        "<style>",
    ));
    html.push_str(HTML_TIMELINE_CSS);
    html.push_str(concat!(
        "</style>",
        "</head><body>",
        "<header><h1>Task Journal — Export</h1></header>",
        "<main>",
    ));

    for (task_id, timeline) in &by_task {
        let opener = timeline
            .iter()
            .find(|ev| ev.event_type == EventType::Open);
        let title = match opener {
            Some(ev) => match ev.meta.get("title").and_then(|v| v.as_str()) {
                Some(explicit) => explicit.to_owned(),
                None => ev.text.clone(),
            },
            None => "(untitled)".to_owned(),
        };
        let status = if matches!(timeline.last(), Some(ev) if ev.event_type == EventType::Close) {
            "closed"
        } else {
            "open"
        };
        let created = timeline.first().map_or("?", |ev| ev.timestamp.as_str());

        html.push_str("<article>");
        html += &format!(
            "<h2><span class=\"tid\">{}</span>{}</h2>",
            html_escape(task_id),
            html_escape(&title)
        );
        html += &format!(
            "<p class=\"meta\">status: {} · created: {}</p>",
            status,
            html_escape(created)
        );
        html.push_str("<ol class=\"timeline\">");

        for ev in timeline {
            // The serialized form of the event type doubles as label and CSS class suffix.
            let etype = match serde_json::to_value(ev.event_type) {
                Ok(serde_json::Value::String(name)) => name,
                _ => String::from("unknown"),
            };
            let suggested = match ev.status {
                EventStatus::Suggested => " suggested",
                _ => "",
            };
            let when = html_escape(&ev.timestamp);
            let kind = html_escape(&etype);
            let body = html_escape(&ev.text);
            html += &format!(
                "<li class=\"event{suggested}\"><time>{when}</time><span class=\"type type-{kind}\">{kind}</span>{body}</li>"
            );
        }

        html.push_str("</ol>");
        html.push_str("</article>");
    }

    html.push_str("</main></body></html>\n");
    html
}
/// Returns the directory that holds queued ("pending") entries.
///
/// The directory is `pending`, located next to the events directory (i.e.
/// under the events directory's parent). This is the same location the
/// `pending-gc` command derives. The path is returned whether or not the
/// directory exists; callers check for existence themselves.
///
/// The location does not depend on the current project, so no working
/// directory or project hash is consulted here.
///
/// # Errors
/// Fails when the events directory cannot be resolved or has no parent.
fn pending_dir() -> Result<std::path::PathBuf> {
    let events_dir = tj_core::paths::events_dir()?;
    let root = events_dir
        .parent()
        .ok_or_else(|| anyhow::anyhow!("events_dir has no parent"))?;
    Ok(root.join("pending"))
}
fn run_pending_list() -> Result<()> {
let dir = pending_dir()?;
if !dir.exists() {
println!("(no pending entries)");
return Ok(());
}
let mut entries: Vec<(String, String, String, u32)> = Vec::new();
for entry in std::fs::read_dir(&dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) != Some("json") {
continue;
}
let id = path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("?")
.to_string();
let body = std::fs::read_to_string(&path)?;
let v: serde_json::Value = serde_json::from_str(&body)?;
let queued_at = v
.get("queued_at")
.and_then(|x| x.as_str())
.unwrap_or("?")
.to_string();
let text_preview: String = v
.get("text")
.and_then(|x| x.as_str())
.unwrap_or("")
.chars()
.take(72)
.collect();
let attempts = v.get("attempts").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
let dead_marker = if id.ends_with(".dead") { " [DEAD]" } else { "" };
entries.push((id, queued_at, text_preview, attempts));
let _ = dead_marker;
}
if entries.is_empty() {
println!("(no pending entries)");
return Ok(());
}
println!("{:<26} {:<25} attempts text", "id", "queued_at");
for (id, qa, text, attempts) in &entries {
println!("{id:<26} {qa:<25} {attempts:<8} {text}");
}
Ok(())
}
fn run_pending_retry(
mock_etype: Option<&str>,
mock_tid: Option<&str>,
mock_conf: Option<f64>,
) -> Result<()> {
let dir = pending_dir()?;
if !dir.exists() {
println!("(no pending entries)");
return Ok(());
}
let cwd = std::env::current_dir()?;
let project_hash = tj_core::project_hash::from_path(&cwd)?;
let events_path = tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));
let mut succeeded = 0usize;
let mut died = 0usize;
let mut still_pending = 0usize;
for entry in std::fs::read_dir(&dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) != Some("json") {
continue;
}
if path
.file_stem()
.and_then(|s| s.to_str())
.map(|s| s.ends_with(".dead"))
.unwrap_or(false)
{
continue; }
let body = std::fs::read_to_string(&path)?;
let mut v: serde_json::Value = serde_json::from_str(&body)?;
if v.get("schema").and_then(|x| x.as_str()) == Some("v2") {
continue;
}
let attempts = v.get("attempts").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
let text = v
.get("text")
.and_then(|x| x.as_str())
.unwrap_or("")
.to_string();
let outcome: anyhow::Result<()> = match (mock_etype, mock_tid) {
(Some(etype), Some(tid)) => {
let mut event = tj_core::event::Event::new(
tid,
parse_event_type(etype)?,
tj_core::event::Author::Classifier,
tj_core::event::Source::Hook,
text,
);
event.confidence = mock_conf;
event.status = tj_core::classifier::decide_status(mock_conf.unwrap_or(1.0));
let mut writer = tj_core::storage::JsonlWriter::open(&events_path)?;
writer.append(&event)?;
writer.flush_durable()?;
Ok(())
}
_ => Err(anyhow::anyhow!(
"no real classifier wired in retry path yet — pass --mock-* for tests, or run install-hooks and let the hook drain the queue"
)),
};
match outcome {
Ok(()) => {
std::fs::remove_file(&path)?;
succeeded += 1;
}
Err(_) => {
let new_attempts = attempts + 1;
if new_attempts >= PENDING_MAX_ATTEMPTS {
let dead_path = path.with_file_name(format!(
"{}.dead.json",
path.file_stem().and_then(|s| s.to_str()).unwrap_or("dead")
));
std::fs::rename(&path, &dead_path)?;
died += 1;
} else {
if let Some(obj) = v.as_object_mut() {
obj.insert(
"attempts".into(),
serde_json::Value::Number(new_attempts.into()),
);
}
std::fs::write(&path, serde_json::to_string_pretty(&v)?)?;
still_pending += 1;
}
}
}
}
println!(
"pending retry: {succeeded} drained, {still_pending} still pending, {died} marked dead"
);
Ok(())
}
/// Runs the environment checks and collects them into a [`DoctorReport`].
///
/// Checks performed:
/// - `claude --version` is spawned to detect the Claude CLI. A spawn failure
///   means the binary is not available; a binary that runs but exits
///   non-zero (or prints nothing) is reported as present with an unknown
///   version. Neither case is an issue, only a note.
/// - The events, state and metrics directories are probed with
///   `dir_writable`; each failure becomes an issue.
/// - Known projects and the applied schema versions of the current
///   directory's state database are gathered best-effort: any failure there
///   yields an empty list rather than an error.
///
/// # Errors
/// Fails only when one of the `tj_core::paths` directories cannot be resolved.
fn run_doctor() -> Result<DoctorReport> {
    let mut issues: Vec<String> = Vec::new();
    let mut notes: Vec<String> = Vec::new();

    let (claude_in_path, claude_version) = match PCommand::new("claude").arg("--version").output()
    {
        Ok(out) if out.status.success() => {
            let v = String::from_utf8_lossy(&out.stdout).trim().to_string();
            // An empty answer carries no version information.
            (true, if v.is_empty() { None } else { Some(v) })
        }
        // The process was spawned, so the binary exists; only the version is unknown.
        Ok(out) => {
            notes.push(format!(
                "claude CLI found on PATH, but `claude --version` failed ({}) — version unknown",
                out.status
            ));
            (true, None)
        }
        Err(_) => {
            notes.push(
                "claude CLI not on PATH — that's fine if you use the API backend \
                 (set ANTHROPIC_API_KEY). For the CLI backend (free with Pro/Max), \
                 install Claude Code from https://claude.com/claude-code"
                    .into(),
            );
            (false, None)
        }
    };

    let data_dir = tj_core::paths::data_dir()?;
    let events_dir = tj_core::paths::events_dir()?;
    let state_dir = tj_core::paths::state_dir()?;
    let metrics_dir = tj_core::paths::metrics_dir()?;

    let events_dir_writable = dir_writable(&events_dir);
    let state_dir_writable = dir_writable(&state_dir);
    let metrics_dir_writable = dir_writable(&metrics_dir);
    for (label, dir, writable) in [
        ("events", &events_dir, events_dir_writable),
        ("state", &state_dir, state_dir_writable),
        ("metrics", &metrics_dir, metrics_dir_writable),
    ] {
        if !writable {
            issues.push(format!("{label} dir not writable: {}", dir.display()));
        }
    }

    // Best-effort: a missing or unreadable database is not a doctor failure.
    let known_projects = tj_core::db::list_all_projects(&state_dir).unwrap_or_default();
    let schema_versions_applied = (|| -> Result<Vec<i64>> {
        let cwd = std::env::current_dir()?;
        let project_hash = tj_core::project_hash::from_path(&cwd)?;
        let state_path = state_dir.join(format!("{project_hash}.sqlite"));
        if !state_path.exists() {
            return Ok(Vec::new());
        }
        let conn = tj_core::db::open(&state_path)?;
        let mut stmt = conn.prepare("SELECT version FROM schema_migrations ORDER BY version")?;
        let v: Vec<i64> = stmt
            .query_map([], |r| r.get::<_, i64>(0))?
            .collect::<Result<_, _>>()?;
        Ok(v)
    })()
    .unwrap_or_default();

    Ok(DoctorReport {
        task_journal_version: env!("CARGO_PKG_VERSION"),
        claude_in_path,
        claude_version,
        data_dir,
        events_dir,
        state_dir,
        metrics_dir,
        events_dir_writable,
        state_dir_writable,
        metrics_dir_writable,
        known_projects,
        schema_versions_applied,
        issues,
        notes,
    })
}
// Top-level command-line interface, parsed by clap in `main`.
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into help text, which would change the program's output.
#[derive(Parser)]
#[command(name = "task-journal", version, about = "Task Journal CLI", long_about = None)]
struct Cli {
// The subcommand selected on the command line; `main` dispatches on it.
#[command(subcommand)]
command: Commands,
}
// Every subcommand of the CLI. `main` matches on this enum.
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into help text, which would change the program's output.
// Unless noted otherwise, commands operate on the project identified by the
// hash of the current working directory.
#[derive(Subcommand)]
enum Commands {
// Opens a new task: appends an `Open` event (text = `context`, or the title
// when no context is given; `meta.title` = title) and prints the new task id.
// When `goal` is given it is stored on the task in the state database.
Create {
title: String,
#[arg(long)]
context: Option<String>,
#[arg(long)]
goal: Option<String>,
},
// Operations on the raw event log; see `EventsCmd`.
Events {
#[command(subcommand)]
action: EventsCmd,
},
// Rebuilds the state database from the events file; fails if there is no events file.
RebuildState,
// Prints the assembled pack for a task. `mode` must be "compact" or "full".
Pack {
task_id: String,
#[arg(long, default_value = "compact")]
mode: String,
},
// Appends a user-authored event of the given type to a task and prints the
// event id. `corrects` / `supersedes` are copied onto the event as given.
Event {
task_id: String,
#[arg(long, name = "type")]
r#type: String,
#[arg(long)]
text: String,
#[arg(long)]
corrects: Option<String>,
#[arg(long)]
supersedes: Option<String>,
},
// Appends a `Close` event to an existing task. `outcome_tag`, when given,
// must be one of done | abandoned | superseded. `outcome` is stored on the
// task (together with the tag) before the event is written.
Close {
task_id: String,
#[arg(long)]
reason: Option<String>,
#[arg(long)]
outcome: Option<String>,
#[arg(long)]
outcome_tag: Option<String>,
},
// Appends a `Reopen` event to an existing task.
Reopen {
task_id: String,
#[arg(long)]
reason: Option<String>,
},
// Lists tasks reported by `tj_core::db::stale_tasks` for the given idle threshold.
Stale {
#[arg(long, default_value_t = 7)]
days: i64,
},
// Deletes `*.json` files in the pending directory whose modification time
// is older than `days` days.
PendingGc {
#[arg(long, default_value_t = 7)]
days: i64,
},
// Sets the goal text of an existing task.
Goal {
task_id: String,
text: String,
},
// Attaches an external reference (the `--add` value) to an existing task.
External {
task_id: String,
#[arg(long = "add")]
add: String,
},
// Re-runs `tj_core::db::reclassify_task_artifacts` for an existing task and prints the count.
Reclassify { task_id: String },
// NOTE(review): handler not visible in this part of the file — presumably
// a text search over tasks/events, optionally across all projects.
Search {
query: String,
#[arg(long, default_value_t = 20)]
limit: usize,
#[arg(long)]
all_projects: bool,
},
// Appends a `Correction` event to task `task` whose `corrects` field is the given event id.
EventCorrect {
#[arg(long)]
corrects: String,
#[arg(long)]
task: String,
#[arg(long)]
text: String,
},
// Installs (or, with `uninstall`, removes) the `task-journal ingest-hook`
// entries in a Claude `settings.json`. `scope` is "user" ($HOME/.claude) or
// "project" (./.claude). `classifier_command` is stored as the
// `TJ_CLASSIFIER_CLI` env entry. `backfill` runs the `backfill` subcommand
// after a successful install.
InstallHooks {
#[arg(long, default_value = "user")]
scope: String,
#[arg(long)]
uninstall: bool,
#[arg(long)]
classifier_command: Option<String>,
#[arg(long)]
backfill: bool,
},
// Tallies the lines of every `*.jsonl` file in the metrics directory by `status`.
Stats,
// NOTE(review): handler not visible here — presumably launches the
// terminal UI from the `tui` module. Also reachable as `tui`.
#[command(alias = "tui")]
Ui {
#[arg(long)]
project: Option<String>,
#[arg(long)]
chats: bool,
},
// NOTE(review): handler not visible here. `install-hooks --backfill`
// invokes this subcommand without arguments.
Backfill {
#[arg(long)]
dry_run: bool,
#[arg(long)]
limit: Option<usize>,
#[arg(long)]
project: Option<String>,
},
// NOTE(review): handler not visible here — presumably exports events in
// the requested format (default "md"); confirm which formats are accepted.
Export {
#[arg(long, default_value = "md")]
format: String,
#[arg(long)]
task: Option<String>,
#[arg(long)]
project: Option<String>,
},
// Runs the environment checks (`run_doctor`). With `json` the report is
// printed as JSON; the process exits with status 1 when issues were found.
Doctor {
#[arg(long)]
json: bool,
},
// Operations on the pending queue; see `PendingCmd`.
Pending {
#[command(subcommand)]
action: PendingCmd,
},
// Moves a project's data from the hash of `from` to the hash of `to`
// (`run_migrate_project`). `force` allows overwriting existing destination files.
MigrateProject {
#[arg(long, value_name = "PATH")]
from: PathBuf,
#[arg(long, value_name = "PATH")]
to: PathBuf,
#[arg(long)]
force: bool,
},
// Entry point invoked by the installed hooks. When both `kind` and `text`
// are given they are used directly; otherwise the hook payload is parsed
// from stdin. Does nothing when `TJ_IN_CLASSIFIER` is set in the environment.
// The hidden `mock_*` flags are not shown in help output.
IngestHook {
#[arg(long)]
kind: Option<String>,
#[arg(long)]
text: Option<String>,
#[arg(long, default_value = "cli")]
backend: String,
#[arg(long, hide = true)]
mock_event_type: Option<String>,
#[arg(long, hide = true)]
mock_task_id: Option<String>,
#[arg(long, hide = true)]
mock_confidence: Option<f64>,
},
// Hidden from help. NOTE(review): handler not visible here.
#[command(hide = true)]
ClassifyWorker {
#[arg(long, default_value = "cli")]
backend: String,
},
// Hidden from help. NOTE(review): handler not visible here.
#[command(hide = true)]
Statusline,
// NOTE(review): handler not visible here — presumably looks up rejection
// events matching `topic`; the unit of `since` is not shown, confirm.
Rejected {
topic: String,
#[arg(long)]
all_projects: bool,
#[arg(long, default_value_t = 20)]
limit: usize,
#[arg(long)]
since: Option<i64>,
},
// NOTE(review): handler not visible here.
ExportPr {
task_id: String,
},
}
// Actions of the `events` subcommand.
// (`//` rather than `///`: clap would turn doc comments into help text.)
#[derive(Subcommand)]
enum EventsCmd {
// Prints up to `limit` events of the current project, newest first.
List {
#[arg(long, default_value_t = 20)]
limit: usize,
},
}
// Actions of the `pending` subcommand.
// (`//` rather than `///`: clap would turn doc comments into help text.)
#[derive(Subcommand)]
enum PendingCmd {
// Prints the entries in the pending directory (`run_pending_list`).
List,
// Re-processes the pending entries (`run_pending_retry`). The hidden
// `mock_*` flags are forwarded to it; an event is only written when both
// the mock event type and the mock task id are supplied.
Retry {
#[arg(long, hide = true)]
mock_event_type: Option<String>,
#[arg(long, hide = true)]
mock_task_id: Option<String>,
#[arg(long, hide = true)]
mock_confidence: Option<f64>,
},
}
// Attempt count at which `run_pending_retry` stops retrying a pending entry
// and renames it to `<id>.dead.json`.
const PENDING_MAX_ATTEMPTS: u32 = 3;
fn main() -> Result<()> {
let cli = Cli::parse();
match cli.command {
Commands::Create {
title,
context,
goal,
} => {
let cwd = std::env::current_dir()?;
let project_hash = tj_core::project_hash::from_path(&cwd)?;
let events_dir = tj_core::paths::events_dir()?;
let events_path = events_dir.join(format!("{project_hash}.jsonl"));
std::fs::create_dir_all(&events_dir)?;
let task_id = tj_core::new_task_id();
let mut event = tj_core::event::Event::new(
task_id.clone(),
tj_core::event::EventType::Open,
tj_core::event::Author::User,
tj_core::event::Source::Cli,
context.clone().unwrap_or_else(|| title.clone()),
);
event.meta = serde_json::json!({ "title": title });
let mut writer = tj_core::storage::JsonlWriter::open(&events_path)?;
writer.append(&event)?;
writer.flush_durable()?;
if let Some(g) = goal {
let state_path =
tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
let conn = tj_core::db::open(&state_path)?;
tj_core::db::ingest_new_events(&conn, &events_path, &project_hash)?;
tj_core::db::set_task_goal(&conn, &task_id, &g)?;
}
println!("{}", task_id);
}
Commands::Events { action } => match action {
EventsCmd::List { limit } => {
let cwd = std::env::current_dir()?;
let project_hash = tj_core::project_hash::from_path(&cwd)?;
let events_path =
tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));
if !events_path.exists() {
println!("(no events yet)");
return Ok(());
}
let body = std::fs::read_to_string(&events_path)?;
let mut events: Vec<tj_core::event::Event> = body
.lines()
.filter(|l| !l.trim().is_empty())
.map(serde_json::from_str)
.collect::<Result<_, _>>()?;
events.reverse();
for e in events.into_iter().take(limit) {
let title = e
.meta
.get("title")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| e.text.clone());
println!("{} [{:?}] {}", e.timestamp, e.event_type, title);
}
}
},
Commands::Pack { task_id, mode } => {
let cwd = std::env::current_dir()?;
let project_hash = tj_core::project_hash::from_path(&cwd)?;
let events_path = tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));
let state_path = tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
let conn = tj_core::db::open(&state_path)?;
if events_path.exists() {
tj_core::db::ingest_new_events(&conn, &events_path, &project_hash)?;
}
let pmode = match mode.as_str() {
"compact" => tj_core::pack::PackMode::Compact,
"full" => tj_core::pack::PackMode::Full,
other => anyhow::bail!("unknown mode: {other}"),
};
let pack = tj_core::pack::assemble(&conn, &task_id, pmode)?;
print!("{}", pack.text);
}
Commands::RebuildState => {
let cwd = std::env::current_dir()?;
let project_hash = tj_core::project_hash::from_path(&cwd)?;
let events_path = tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));
let state_path = tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
if !events_path.exists() {
anyhow::bail!("no events file at {events_path:?}");
}
let conn = tj_core::db::open(&state_path)?;
let n = tj_core::db::rebuild_state(&conn, &events_path, &project_hash)?;
println!("rebuilt {n} events into {state_path:?}");
}
Commands::Event {
task_id,
r#type,
text,
corrects,
supersedes,
} => {
let cwd = std::env::current_dir()?;
let project_hash = tj_core::project_hash::from_path(&cwd)?;
let events_path = tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));
std::fs::create_dir_all(events_path.parent().unwrap())?;
let event_type = parse_event_type(&r#type)?;
let mut event = tj_core::event::Event::new(
&task_id,
event_type,
tj_core::event::Author::User,
tj_core::event::Source::Cli,
text,
);
event.corrects = corrects;
event.supersedes = supersedes;
let mut writer = tj_core::storage::JsonlWriter::open(&events_path)?;
writer.append(&event)?;
writer.flush_durable()?;
println!("{}", event.event_id);
}
Commands::Close {
task_id,
reason,
outcome,
outcome_tag,
} => {
let cwd = std::env::current_dir()?;
let project_hash = tj_core::project_hash::from_path(&cwd)?;
let events_path = tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));
let state_path = tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
if let Some(tag) = outcome_tag.as_deref() {
match tag {
"done" | "abandoned" | "superseded" => {}
other => anyhow::bail!(
"invalid --outcome-tag `{other}` (expected: done | abandoned | superseded)"
),
}
}
let conn = tj_core::db::open(&state_path)?;
if events_path.exists() {
tj_core::db::ingest_new_events(&conn, &events_path, &project_hash)?;
}
if !tj_core::db::task_exists(&conn, &task_id)? {
anyhow::bail!("task not found: {task_id}");
}
if let Some(o) = outcome.as_deref() {
tj_core::db::set_task_outcome(&conn, &task_id, o, outcome_tag.as_deref())?;
}
drop(conn);
let mut event = tj_core::event::Event::new(
&task_id,
tj_core::event::EventType::Close,
tj_core::event::Author::User,
tj_core::event::Source::Cli,
reason.clone().unwrap_or_else(|| "(closed)".into()),
);
if let Some(r) = reason {
event.meta = serde_json::json!({"reason": r});
}
let mut writer = tj_core::storage::JsonlWriter::open(&events_path)?;
writer.append(&event)?;
writer.flush_durable()?;
println!("{}", event.event_id);
}
Commands::Stale { days } => {
let cwd = std::env::current_dir()?;
let project_hash = tj_core::project_hash::from_path(&cwd)?;
let events_path = tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));
let state_path = tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
let conn = tj_core::db::open(&state_path)?;
if events_path.exists() {
tj_core::db::ingest_new_events(&conn, &events_path, &project_hash)?;
}
let stale = tj_core::db::stale_tasks(&conn, days)?;
if stale.is_empty() {
println!("(no stale tasks — all open tasks active within {days} days)");
} else {
println!("# Stale tasks (idle ≥ {days} days)\n");
for t in stale {
println!(
"{} {} days idle {} {}",
t.task_id, t.days_idle, t.last_event_at, t.title
);
}
println!(
"\nClose abandoned ones with: task-journal close <id> --outcome-tag abandoned --reason <why>"
);
}
}
Commands::PendingGc { days } => {
let pending_dir = tj_core::paths::events_dir()?
.parent()
.ok_or_else(|| anyhow::anyhow!("events_dir has no parent"))?
.join("pending");
if !pending_dir.exists() {
println!("(no pending dir — nothing to gc)");
return Ok(());
}
let cutoff = chrono::Utc::now() - chrono::Duration::days(days);
let mut removed = 0usize;
for entry in std::fs::read_dir(&pending_dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) != Some("json") {
continue;
}
let mtime = entry
.metadata()
.and_then(|m| m.modified())
.ok()
.and_then(|t| {
chrono::DateTime::<chrono::Utc>::from(t)
.signed_duration_since(cutoff)
.num_seconds()
.into()
});
if let Some(secs) = mtime {
if secs < 0 && std::fs::remove_file(&path).is_ok() {
removed += 1;
}
}
}
println!(
"removed {} stale pending entries (older than {} days)",
removed, days
);
}
Commands::Reopen { task_id, reason } => {
let cwd = std::env::current_dir()?;
let project_hash = tj_core::project_hash::from_path(&cwd)?;
let events_path = tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));
let state_path = tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
let conn = tj_core::db::open(&state_path)?;
if events_path.exists() {
tj_core::db::ingest_new_events(&conn, &events_path, &project_hash)?;
}
if !tj_core::db::task_exists(&conn, &task_id)? {
anyhow::bail!("task not found: {task_id}");
}
drop(conn);
let mut event = tj_core::event::Event::new(
&task_id,
tj_core::event::EventType::Reopen,
tj_core::event::Author::User,
tj_core::event::Source::Cli,
reason.clone().unwrap_or_else(|| "(reopened)".into()),
);
if let Some(r) = reason {
event.meta = serde_json::json!({"reason": r});
}
let mut writer = tj_core::storage::JsonlWriter::open(&events_path)?;
writer.append(&event)?;
writer.flush_durable()?;
println!("{}", event.event_id);
}
Commands::Goal { task_id, text } => {
let cwd = std::env::current_dir()?;
let project_hash = tj_core::project_hash::from_path(&cwd)?;
let events_path = tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));
let state_path = tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
let conn = tj_core::db::open(&state_path)?;
if events_path.exists() {
tj_core::db::ingest_new_events(&conn, &events_path, &project_hash)?;
}
if !tj_core::db::task_exists(&conn, &task_id)? {
anyhow::bail!("task not found: {task_id}");
}
tj_core::db::set_task_goal(&conn, &task_id, &text)?;
println!("ok");
}
Commands::External { task_id, add } => {
let cwd = std::env::current_dir()?;
let project_hash = tj_core::project_hash::from_path(&cwd)?;
let events_path = tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));
let state_path = tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
let conn = tj_core::db::open(&state_path)?;
if events_path.exists() {
tj_core::db::ingest_new_events(&conn, &events_path, &project_hash)?;
}
if !tj_core::db::task_exists(&conn, &task_id)? {
anyhow::bail!("task not found: {task_id}");
}
tj_core::db::add_task_external(&conn, &task_id, &add)?;
println!("ok");
}
Commands::Reclassify { task_id } => {
let cwd = std::env::current_dir()?;
let project_hash = tj_core::project_hash::from_path(&cwd)?;
let events_path = tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));
let state_path = tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
let conn = tj_core::db::open(&state_path)?;
if events_path.exists() {
tj_core::db::ingest_new_events(&conn, &events_path, &project_hash)?;
}
if !tj_core::db::task_exists(&conn, &task_id)? {
anyhow::bail!("task not found: {task_id}");
}
let count = tj_core::db::reclassify_task_artifacts(&conn, &task_id)?;
println!("reclassified {} events", count);
}
Commands::EventCorrect {
corrects,
task,
text,
} => {
let cwd = std::env::current_dir()?;
let project_hash = tj_core::project_hash::from_path(&cwd)?;
let events_path = tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));
std::fs::create_dir_all(events_path.parent().unwrap())?;
let mut event = tj_core::event::Event::new(
&task,
tj_core::event::EventType::Correction,
tj_core::event::Author::User,
tj_core::event::Source::Cli,
text,
);
event.corrects = Some(corrects);
let mut writer = tj_core::storage::JsonlWriter::open(&events_path)?;
writer.append(&event)?;
writer.flush_durable()?;
println!("{}", event.event_id);
}
Commands::InstallHooks {
scope,
uninstall,
classifier_command,
backfill,
} => {
let settings_path = match scope.as_str() {
"user" => {
let home =
std::env::var_os("HOME").ok_or_else(|| anyhow::anyhow!("HOME not set"))?;
std::path::PathBuf::from(home)
.join(".claude")
.join("settings.json")
}
"project" => std::env::current_dir()?
.join(".claude")
.join("settings.json"),
other => anyhow::bail!("unknown scope: {other}"),
};
if let Some(p) = settings_path.parent() {
std::fs::create_dir_all(p)?;
}
let mut current: serde_json::Value = if settings_path.exists() {
serde_json::from_str(&std::fs::read_to_string(&settings_path)?)
.unwrap_or_else(|_| serde_json::json!({}))
} else {
serde_json::json!({})
};
let hooks_obj = current
.as_object_mut()
.ok_or_else(|| anyhow::anyhow!("settings is not a JSON object"))?;
if uninstall {
if let Some(hooks_block) =
hooks_obj.get_mut("hooks").and_then(|v| v.as_object_mut())
{
let kinds: Vec<String> = hooks_block.keys().cloned().collect();
for kind in kinds {
let Some(arr) = hooks_block.get_mut(&kind).and_then(|v| v.as_array_mut())
else {
continue;
};
for entry in arr.iter_mut() {
let Some(inner) = entry.get_mut("hooks").and_then(|v| v.as_array_mut())
else {
continue;
};
inner.retain(|h| {
h.get("command")
.and_then(|c| c.as_str())
.map(|c| !c.contains("task-journal ingest-hook"))
.unwrap_or(true)
});
}
arr.retain(|entry| {
entry
.get("hooks")
.and_then(|v| v.as_array())
.map(|a| !a.is_empty())
.unwrap_or(true)
});
if arr.is_empty() {
hooks_block.remove(&kind);
}
}
if hooks_block.is_empty() {
hooks_obj.remove("hooks");
}
}
if let Some(env) = hooks_obj.get_mut("env").and_then(|v| v.as_object_mut()) {
env.remove("TJ_CLASSIFIER_CLI");
if env.is_empty() {
hooks_obj.remove("env");
}
}
} else {
let cmd = "task-journal ingest-hook --backend=cli || true";
let entries = serde_json::json!({
"UserPromptSubmit": [{ "matcher": "", "hooks": [{ "type": "command", "command": cmd }] }],
"PostToolUse": [{ "matcher": "", "hooks": [{ "type": "command", "command": cmd }] }],
"Stop": [{ "matcher": "", "hooks": [{ "type": "command", "command": cmd }] }],
"SessionStart": [{ "matcher": "", "hooks": [{ "type": "command", "command": cmd }] }],
"PreCompact": [{ "matcher": "", "hooks": [{ "type": "command", "command": cmd }] }],
});
hooks_obj.insert("hooks".into(), entries);
if let Some(cmd) = classifier_command {
let env = hooks_obj
.entry("env".to_string())
.or_insert_with(|| serde_json::json!({}));
let env_obj = env
.as_object_mut()
.ok_or_else(|| anyhow::anyhow!("settings.env is not a JSON object"))?;
env_obj.insert(
"TJ_CLASSIFIER_CLI".to_string(),
serde_json::Value::String(cmd),
);
}
}
std::fs::write(&settings_path, serde_json::to_string_pretty(¤t)?)?;
println!("{}", settings_path.display());
if !uninstall && backfill {
let exe =
std::env::current_exe().context("locate task-journal binary for backfill")?;
let status = std::process::Command::new(&exe)
.arg("backfill")
.status()
.with_context(|| format!("spawn `{} backfill`", exe.display()))?;
if !status.success() {
eprintln!("backfill exited with {status}");
}
}
}
Commands::Stats => {
let metrics_dir = tj_core::paths::metrics_dir()?;
let mut total = 0usize;
let mut confirmed = 0usize;
let mut suggested = 0usize;
let mut errors = 0usize;
if metrics_dir.exists() {
for entry in std::fs::read_dir(&metrics_dir)? {
let path = entry?.path();
if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
continue;
}
let body = std::fs::read_to_string(&path)?;
for line in body.lines().filter(|l| !l.trim().is_empty()) {
total += 1;
let v: serde_json::Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => {
errors += 1;
continue;
}
};
match v.get("status").and_then(|s| s.as_str()) {
Some("confirmed") => confirmed += 1,
Some("suggested") => suggested += 1,
_ => {}
}
}
}
}
println!("classified: {total}");
println!(" confirmed: {confirmed}");
println!(" suggested: {suggested}");
println!(" parse errors: {errors}");
if total > 0 {
let ratio = confirmed as f64 / total as f64 * 100.0;
println!(" confirmed ratio: {ratio:.1}%");
}
}
Commands::Doctor { json } => {
let report = run_doctor()?;
if json {
println!("{}", serde_json::to_string_pretty(&report)?);
} else {
report.print_human();
}
if !report.issues.is_empty() {
std::process::exit(1);
}
}
Commands::MigrateProject { from, to, force } => {
run_migrate_project(&from, &to, force)?;
}
Commands::Pending { action } => match action {
PendingCmd::List => {
run_pending_list()?;
}
PendingCmd::Retry {
mock_event_type,
mock_task_id,
mock_confidence,
} => {
run_pending_retry(
mock_event_type.as_deref(),
mock_task_id.as_deref(),
mock_confidence,
)?;
}
},
Commands::IngestHook {
kind,
text,
backend,
mock_event_type,
mock_task_id,
mock_confidence,
} => {
if std::env::var("TJ_IN_CLASSIFIER").is_ok() {
return Ok(());
}
let (kind, text, payload) = match (kind, text) {
(Some(k), Some(t)) => (k, t, serde_json::Value::Null),
_ => parse_hook_stdin()?,
};
let cwd = std::env::current_dir()?;
let project_hash = tj_core::project_hash::from_path(&cwd)?;
let events_path = tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));
std::fs::create_dir_all(events_path.parent().unwrap())?;
if kind == "SessionStart" {
if !events_path.exists() {
return Ok(());
}
let state_path =
tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
let conn = tj_core::db::open(&state_path)?;
tj_core::db::ingest_new_events(&conn, &events_path, &project_hash)?;
let recent = recent_task_contexts(&conn, 3)?;
if recent.is_empty() {
return Ok(());
}
let mut bundle = String::new();
for tc in &recent {
let pack = tj_core::pack::assemble(
&conn,
&tc.task_id,
tj_core::pack::PackMode::Compact,
)?;
bundle.push_str(&pack.text);
bundle.push_str("\n\n");
}
let envelope = serde_json::json!({
"hookSpecificOutput": {
"hookEventName": "SessionStart",
"additionalContext": bundle.trim_end(),
}
});
println!("{}", serde_json::to_string(&envelope)?);
return Ok(());
}
if kind == "PreCompact" {
if !events_path.exists() {
return Ok(());
}
let state_path =
tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
let conn = tj_core::db::open(&state_path)?;
tj_core::db::ingest_new_events(&conn, &events_path, &project_hash)?;
let recent = recent_task_contexts(&conn, 1)?;
let Some(tc) = recent.into_iter().next() else {
return Ok(());
};
let last_event_ts: Option<String> = conn
.query_row(
"SELECT timestamp FROM events_index WHERE task_id=?1 \
ORDER BY timestamp DESC LIMIT 1",
rusqlite::params![&tc.task_id],
|r| r.get::<_, String>(0),
)
.ok();
let transcript_path = payload
.get("transcript_path")
.and_then(|x| x.as_str())
.map(std::path::PathBuf::from);
if let Some(tp) = transcript_path.as_ref() {
if tp.exists() {
let enq = precompact_enqueue_transcript_chunks(
tp,
&events_path,
&project_hash,
&backend,
last_event_ts.as_deref(),
)
.unwrap_or(0);
if enq > 0 && std::env::var("TJ_DISABLE_CLASSIFY_SPAWN").is_err() {
let _ = spawn_classify_worker(&backend);
}
}
}
let now = chrono::Utc::now()
.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
let marker_text = format!(
"Conversation compacted at {now}; preceding events should be treated as a single reasoning unit."
);
let mut event = tj_core::event::Event::new(
&tc.task_id,
tj_core::event::EventType::Decision,
tj_core::event::Author::Classifier,
tj_core::event::Source::Hook,
marker_text,
);
event.confidence = Some(1.0);
event.status = tj_core::event::EventStatus::Confirmed;
let mut writer = tj_core::storage::JsonlWriter::open(&events_path)?;
writer.append(&event)?;
writer.flush_durable()?;
let metrics_path =
tj_core::paths::metrics_dir()?.join(format!("{project_hash}.jsonl"));
let _ = tj_core::classifier::telemetry::append(
&metrics_path,
&tj_core::classifier::telemetry::TelemetryRecord {
timestamp: chrono::Utc::now()
.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
project_hash: project_hash.clone(),
task_id_guess: Some(tc.task_id.clone()),
event_type: "decision".into(),
confidence: 1.0,
status: "confirmed".into(),
error: None,
},
);
println!("{}", event.event_id);
return Ok(());
}
drain_pending(
&events_path,
mock_event_type.as_deref(),
mock_task_id.as_deref(),
mock_confidence,
)?;
let is_mock_pre = mock_event_type.is_some() && mock_task_id.is_some();
if !is_mock_pre && text.trim().is_empty() {
return Ok(());
}
if !is_mock_pre && kind == "UserPromptSubmit" && is_rewind_prompt(&text) {
if !events_path.exists() {
return Ok(());
}
let state_path =
tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
let conn = tj_core::db::open(&state_path)?;
tj_core::db::ingest_new_events(&conn, &events_path, &project_hash)?;
let recent = recent_task_contexts(&conn, 1)?;
let Some(tc) = recent.into_iter().next() else {
return Ok(());
};
let mut event = tj_core::event::Event::new(
&tc.task_id,
tj_core::event::EventType::Correction,
tj_core::event::Author::User,
tj_core::event::Source::Hook,
"User invoked /rewind — preceding events on this task should be reconsidered. They may have been part of a path the user explicitly rolled back.".to_string(),
);
event.confidence = Some(1.0);
event.status = tj_core::event::EventStatus::Confirmed;
let mut writer = tj_core::storage::JsonlWriter::open(&events_path)?;
writer.append(&event)?;
writer.flush_durable()?;
println!("{}", event.event_id);
return Ok(());
}
let force_sync = std::env::var("TJ_INGEST_SYNC")
.ok()
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
let is_mock = mock_event_type.is_some() && mock_task_id.is_some();
if !is_mock && !force_sync {
let _ = persist_pending_v2(
&events_path,
&kind,
&text,
&project_hash,
&backend,
)?;
let _ = spawn_classify_worker(&backend);
return Ok(());
}
let author_hint = if kind.contains("UserPrompt") {
"user"
} else {
"assistant"
};
let (etype, task_id, confidence, evidence_strength, suggested_text) =
if let (Some(t), Some(tid)) = (mock_event_type.as_deref(), mock_task_id.as_deref())
{
(
parse_event_type(t)?,
tid.to_string(),
mock_confidence.unwrap_or(1.0),
None,
None,
)
} else {
let state_path =
tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
let conn = tj_core::db::open(&state_path)?;
if events_path.exists() {
tj_core::db::ingest_new_events(&conn, &events_path, &project_hash)?;
}
let mut recent = recent_task_contexts(&conn, 5)?;
if recent.is_empty() {
let auto_open_disabled = std::env::var("TJ_AUTO_OPEN_TASKS")
.ok()
.map(|v| v == "0" || v.eq_ignore_ascii_case("false"))
.unwrap_or(false);
if auto_open_disabled || !kind.contains("UserPrompt") {
return Ok(());
}
let new_task =
auto_open_task_from_prompt(&events_path, &project_hash, &conn, &text)?;
recent.push(new_task);
}
use tj_core::classifier::Classifier;
let classifier: Box<dyn Classifier> = match backend.as_str() {
"cli" => Box::new(tj_core::classifier::cli::ClaudeCliClassifier::default()),
"api" => {
Box::new(tj_core::classifier::http::AnthropicClassifier::from_env()?)
}
other => {
anyhow::bail!("unknown backend: {other} (expected `cli` or `api`)")
}
};
let input = tj_core::classifier::ClassifyInput {
text: text.clone(),
author_hint: author_hint.into(),
recent_tasks: recent,
};
let out = match classifier.classify(&input) {
Ok(o) => o,
Err(e) => {
persist_pending(&events_path, &text, &e.to_string())?;
return Ok(());
}
};
let Some(tid) = out.task_id_guess else {
return Ok(());
};
use tj_core::event::EventType;
if matches!(out.event_type, EventType::Close) && kind == "Stop" {
return Ok(());
}
match tj_core::db::task_status(&conn, &tid)? {
None => {
persist_pending(
&events_path,
&text,
&format!("task_id_guess `{tid}` not found"),
)?;
return Ok(());
}
Some(s) if s == "closed" => {
persist_pending(
&events_path,
&text,
&format!("task_id_guess `{tid}` is closed"),
)?;
return Ok(());
}
_ => {}
}
(
out.event_type,
tid,
out.confidence,
out.evidence_strength,
Some(out.suggested_text),
)
};
let event_text = suggested_text.unwrap_or(text);
let mut event = tj_core::event::Event::new(
&task_id,
etype,
tj_core::event::Author::Classifier,
tj_core::event::Source::Hook,
event_text,
);
event.confidence = Some(confidence);
event.status = tj_core::classifier::decide_status(confidence);
event.evidence_strength = evidence_strength;
let mut writer = tj_core::storage::JsonlWriter::open(&events_path)?;
writer.append(&event)?;
writer.flush_durable()?;
let metrics_path = tj_core::paths::metrics_dir()?.join(format!("{project_hash}.jsonl"));
let etype_str = serde_json::to_value(etype)?
.as_str()
.unwrap_or("?")
.to_string();
let status_str = serde_json::to_value(event.status)?
.as_str()
.unwrap_or("?")
.to_string();
let _ = tj_core::classifier::telemetry::append(
&metrics_path,
&tj_core::classifier::telemetry::TelemetryRecord {
timestamp: chrono::Utc::now()
.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
project_hash: project_hash.clone(),
task_id_guess: Some(task_id.clone()),
event_type: etype_str,
confidence,
status: status_str,
error: None,
},
);
println!("{}", event.event_id);
}
Commands::ClassifyWorker { backend } => {
run_classify_worker(&backend)?;
}
Commands::Export {
format,
task,
project,
} => {
let cwd = match project {
Some(p) => std::path::PathBuf::from(p),
None => std::env::current_dir()?,
};
let project_hash = tj_core::project_hash::from_path(&cwd)?;
let events_path = tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));
if !events_path.exists() {
anyhow::bail!("no events file at {events_path:?}");
}
let body = std::fs::read_to_string(&events_path)?;
let all_events: Vec<tj_core::event::Event> = body
.lines()
.filter(|l| !l.trim().is_empty())
.map(serde_json::from_str)
.collect::<Result<_, _>>()?;
let events: Vec<&tj_core::event::Event> = if let Some(ref tid) = task {
all_events.iter().filter(|e| e.task_id == *tid).collect()
} else {
all_events.iter().collect()
};
if events.is_empty() {
if let Some(tid) = task {
anyhow::bail!("no events found for task {tid}");
} else {
anyhow::bail!("no events in project");
}
}
match format.as_str() {
"json" => {
let json = serde_json::to_string_pretty(&events)?;
println!("{json}");
}
"md" => {
println!("# Task Journal Export\n");
let mut tasks: std::collections::BTreeMap<String, Vec<&tj_core::event::Event>> =
std::collections::BTreeMap::new();
for e in &events {
tasks.entry(e.task_id.clone()).or_default().push(e);
}
for (task_id, task_events) in &tasks {
let title = task_events
.iter()
.find(|e| e.event_type == tj_core::event::EventType::Open)
.and_then(|e| {
e.meta
.get("title")
.and_then(|v| v.as_str())
.map(String::from)
.or_else(|| Some(e.text.clone()))
})
.unwrap_or_else(|| "(untitled)".into());
let status = if task_events
.last()
.map(|e| e.event_type == tj_core::event::EventType::Close)
.unwrap_or(false)
{
"closed"
} else {
"open"
};
let created = task_events
.first()
.map(|e| e.timestamp.as_str())
.unwrap_or("?");
println!("## [{task_id}] {title}");
println!("**Status**: {status} ");
println!("**Created**: {created}\n");
println!("### Timeline");
for e in task_events {
let etype = serde_json::to_value(e.event_type)
.ok()
.and_then(|v| v.as_str().map(String::from))
.unwrap_or_else(|| "?".into());
println!("- **[{}] {}**: {}", e.timestamp, etype, e.text);
}
println!();
}
}
"html" => {
print!("{}", render_html_timeline(&events));
}
"sqlite" => {
let state_path =
tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
let conn = tj_core::db::open(&state_path)?;
tj_core::db::rebuild_state(&conn, &events_path, &project_hash)?;
let tmp = tempfile::TempDir::new()?;
let out_path = tmp.path().join("export.sqlite");
conn.execute(
"VACUUM INTO ?1",
rusqlite::params![out_path.to_string_lossy().into_owned()],
)?;
drop(conn);
let bytes = std::fs::read(&out_path)?;
use std::io::Write;
std::io::stdout()
.lock()
.write_all(&bytes)
.context("write sqlite snapshot to stdout")?;
}
other => anyhow::bail!(
"unknown format: {other} (expected `md`, `json`, `html`, or `sqlite`)"
),
}
}
Commands::Search {
query,
limit,
all_projects,
} => {
if all_projects {
let state_dir = tj_core::paths::state_dir()?;
let hashes = tj_core::db::list_all_projects(&state_dir)?;
for hash in hashes {
let path = state_dir.join(format!("{hash}.sqlite"));
let conn = match rusqlite::Connection::open(&path) {
Ok(c) => c,
Err(_) => continue,
};
let mut stmt = match conn.prepare(
"SELECT DISTINCT task_id FROM search_fts WHERE search_fts MATCH ?1 LIMIT ?2"
) {
Ok(s) => s,
Err(_) => continue,
};
let rows = match stmt.query_map(rusqlite::params![&query, limit as i64], |r| {
r.get::<_, String>(0)
}) {
Ok(r) => r,
Err(_) => continue,
};
for id in rows.flatten() {
println!("{hash}\t{id}");
}
}
} else {
let cwd = std::env::current_dir()?;
let project_hash = tj_core::project_hash::from_path(&cwd)?;
let events_path =
tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));
let state_path =
tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
let conn = tj_core::db::open(&state_path)?;
if events_path.exists() {
tj_core::db::ingest_new_events(&conn, &events_path, &project_hash)?;
}
let mut stmt = conn.prepare(
"SELECT DISTINCT task_id FROM search_fts WHERE search_fts MATCH ?1 LIMIT ?2",
)?;
let ids: Vec<String> = stmt
.query_map(rusqlite::params![query, limit as i64], |r| {
r.get::<_, String>(0)
})?
.collect::<Result<_, _>>()?;
for id in ids {
println!("{id}");
}
}
}
Commands::Ui { project, chats } => {
let project_path = match project {
Some(p) => std::path::PathBuf::from(p),
None => std::env::current_dir()?,
};
if chats {
let mut app = tui::app::App::new_chats(&project_path)?;
let empty = app
.session_list
.as_ref()
.map(|sl| sl.sessions.is_empty())
.unwrap_or(true);
if empty {
eprintln!(
"No Claude Code sessions found for: {}",
project_path.display()
);
return Ok(());
}
app.run()?;
} else {
let mut app = tui::app::App::new(&project_path)?;
app.run()?;
}
}
Commands::Backfill {
dry_run,
limit,
project,
} => {
use tj_core::session::{discovery, extractor, parser};
let project_path = match project {
Some(p) => std::path::PathBuf::from(p),
None => std::env::current_dir()?,
};
let project_hash = tj_core::project_hash::from_path(&project_path)?;
let events_dir = tj_core::paths::events_dir()?;
let events_path = events_dir.join(format!("{project_hash}.jsonl"));
let proj_dir = discovery::find_project_dir(&project_path)?;
let proj_dir = match proj_dir {
Some(d) => d,
None => {
eprintln!(
"No Claude Code sessions found for: {}",
project_path.display()
);
eprintln!(
"Looked in: {}",
discovery::projects_dir()
.map(|p| p.display().to_string())
.unwrap_or_else(|_| "?".into())
);
return Ok(());
}
};
let mut sessions = discovery::list_sessions(&proj_dir)?;
if let Some(max) = limit {
sessions.truncate(max);
}
if sessions.is_empty() {
eprintln!("No session JSONL files found in: {}", proj_dir.display());
return Ok(());
}
eprintln!(
"Found {} session(s) for {}",
sessions.len(),
project_path.display()
);
let already_imported = if events_path.exists() {
let content = std::fs::read_to_string(&events_path).unwrap_or_default();
sessions
.iter()
.filter_map(|p| p.file_stem().and_then(|s| s.to_str()).map(String::from))
.filter(|sid| content.contains(sid))
.collect::<std::collections::HashSet<_>>()
} else {
std::collections::HashSet::new()
};
let mut total_tasks = 0;
let mut total_events = 0;
for session_path in &sessions {
let session_id = session_path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("?")
.to_string();
if already_imported.contains(&session_id) {
eprintln!(
" ⊘ {} — already imported, skipping",
&session_id[..8.min(session_id.len())]
);
continue;
}
let parsed = match parser::parse_session(session_path) {
Ok(p) => p,
Err(e) => {
eprintln!(
" ✗ {} — parse error: {}",
&session_id[..8.min(session_id.len())],
e
);
continue;
}
};
let task = match extractor::extract_from_session(&parsed) {
Some(t) => t,
None => {
eprintln!(
" ⊘ {} — too small ({} msgs), skipping",
&session_id[..8.min(session_id.len())],
parsed.user_message_count()
);
continue;
}
};
if dry_run {
eprintln!(
" ▸ {} → task {} \"{}\" ({} events)",
&session_id[..8.min(session_id.len())],
task.task_id,
task.title.chars().take(60).collect::<String>(),
task.events.len()
);
for ev in &task.events {
let etype = serde_json::to_value(ev.event_type)
.ok()
.and_then(|v| v.as_str().map(String::from))
.unwrap_or_else(|| "?".into());
eprintln!(
" {:12} {}",
etype,
ev.text.chars().take(80).collect::<String>()
);
}
} else {
std::fs::create_dir_all(&events_dir)?;
let mut writer = tj_core::storage::JsonlWriter::open(&events_path)?;
for event in &task.events {
writer.append(event)?;
}
writer.flush_durable()?;
eprintln!(
" ✓ {} → {} \"{}\" ({} events)",
&session_id[..8.min(session_id.len())],
task.task_id,
task.title.chars().take(60).collect::<String>(),
task.events.len()
);
}
total_tasks += 1;
total_events += task.events.len();
}
if dry_run {
eprintln!(
"\nDry run: would create {total_tasks} task(s) with {total_events} event(s)."
);
eprintln!("Run without --dry-run to import.");
} else {
eprintln!("\nImported {total_tasks} task(s) with {total_events} event(s).");
}
}
Commands::Statusline => {
print!("{}", run_statusline().unwrap_or_default());
}
Commands::Rejected {
topic,
all_projects,
limit,
since,
} => {
run_rejected(&topic, all_projects, limit, since)?;
}
Commands::ExportPr { task_id } => {
run_export_pr(&task_id)?;
}
}
Ok(())
}
/// Renders the compact status summary printed by the `statusline` subcommand.
///
/// The result has the form `[<task id> · open: N · pending: N · stale: N]`.
/// The leading task id is the open task with the latest `last_event_at`; it is
/// omitted when that lookup yields nothing. An empty string is returned when
/// the project has neither a state database nor an events file.
///
/// # Errors
/// Propagates failures from resolving the working directory, the project hash
/// or the data directories, from opening the database, from the initial
/// ingest, and from `tj_core::db::stale_tasks`. The open-task and open-count
/// queries are best-effort and degrade to "no task" / `0`.
fn run_statusline() -> anyhow::Result<String> {
    let project_hash = tj_core::project_hash::from_path(&std::env::current_dir()?)?;
    let state_path = tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
    let events_path = tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));

    if !state_path.exists() {
        if !events_path.exists() {
            return Ok(String::new());
        }
        // Events exist but there is no database file yet: go through
        // `tj_core::db::open` and ingest the events file once before querying.
        let bootstrap = tj_core::db::open(&state_path)?;
        tj_core::db::ingest_new_events(&bootstrap, &events_path, &project_hash)?;
    }

    let conn = rusqlite::Connection::open(&state_path)?;

    let recent_open: Option<String> = conn
        .query_row(
            "SELECT task_id FROM tasks WHERE project_hash = ?1 AND status = 'open'\n\
             ORDER BY last_event_at DESC LIMIT 1",
            rusqlite::params![project_hash],
            |row| row.get::<_, String>(0),
        )
        .ok();

    let open_count: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM tasks WHERE project_hash = ?1 AND status = 'open'",
            rusqlite::params![project_hash],
            |row| row.get(0),
        )
        .unwrap_or(0);

    // `stale_tasks` is not filtered by project here, so each candidate's owning
    // project is looked up and only matches are counted.
    // NOTE(review): the `7` is presumably a threshold in days — confirm against tj_core.
    let mut stale_count = 0usize;
    for task in tj_core::db::stale_tasks(&conn, 7)? {
        let owner = conn.query_row(
            "SELECT project_hash FROM tasks WHERE task_id = ?1",
            rusqlite::params![task.task_id],
            |row| row.get::<_, String>(0),
        );
        if let Ok(hash) = owner {
            if hash == project_hash {
                stale_count += 1;
            }
        }
    }

    // Number of `*.json` files in the pending directory; any failure counts as zero.
    let pending_count = match pending_dir().ok().and_then(|dir| std::fs::read_dir(&dir).ok()) {
        Some(listing) => listing
            .filter_map(Result::ok)
            .filter(|entry| entry.path().extension().and_then(|ext| ext.to_str()) == Some("json"))
            .count(),
        None => 0,
    };

    let inner = if let Some(id) = recent_open {
        format!("{id} · open: {open_count} · pending: {pending_count} · stale: {stale_count}")
    } else {
        format!("open: {open_count} · pending: {pending_count} · stale: {stale_count}")
    };
    Ok(format!("[{inner}]"))
}
/// Reports whether the first whitespace-delimited token of `text` is `/rewind`
/// (compared ASCII case-insensitively).
///
/// Leading whitespace is ignored; empty or whitespace-only input yields `false`.
fn is_rewind_prompt(text: &str) -> bool {
    // `split_whitespace` already skips leading whitespace, so no separate trim is needed.
    matches!(
        text.split_whitespace().next(),
        Some(first) if first.eq_ignore_ascii_case("/rewind")
    )
}
/// Decides whether `topic` can be bound verbatim to a full-text `MATCH` query.
///
/// Returns `true` only when every character is an ASCII letter or digit, `_`,
/// a non-ASCII character, or one of the separators space / tab / LF / CR.
/// Anything else (`-`, `"`, `*`, `:`, parentheses, but also `.`, `/`, `+`,
/// `'`, `^`, …) makes the topic unsafe, and the caller should fall back to a
/// substring (`LIKE`) search instead.
///
/// This is an allowlist rather than a denylist because an unquoted query term
/// containing any other ASCII character is a syntax error in FTS5, and the
/// caller swallows query errors, which would silently produce no results.
/// NOTE(review): assumes `search_fts` is an FTS5 table — confirm against the schema.
fn topic_is_fts_safe(topic: &str) -> bool {
    topic.chars().all(|c| {
        c.is_ascii_alphanumeric()
            || c == '_'
            || !c.is_ascii()
            || matches!(c, ' ' | '\t' | '\n' | '\r')
    })
}
/// Lists `rejection` events whose text matches `topic`, newest first.
///
/// * `topic` — search term. When `topic_is_fts_safe` accepts it, it is bound to
///   a full-text `MATCH`; otherwise a literal substring `LIKE` search is used.
/// * `all_projects` — search every project database under the state directory
///   instead of only the current directory's project.
/// * `limit` — maximum number of hits printed (also applied per database).
/// * `since` — when set, only events from the last `since` days are considered.
///
/// Each hit is printed as `task_id<TAB>date<TAB>"first line (≤120 chars)"`
/// followed by an indented `(in task: <title>)` line.
///
/// # Errors
/// Fails when the state/events directories or the current project cannot be
/// resolved, or when ingesting the current project's events fails. Databases
/// that cannot be opened or queried are skipped on purpose (best-effort).
fn run_rejected(
    topic: &str,
    all_projects: bool,
    limit: usize,
    since: Option<i64>,
) -> Result<()> {
    let cutoff: Option<String> = since.map(|days| {
        (chrono::Utc::now() - chrono::Duration::days(days))
            .to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
    });
    let state_dir = tj_core::paths::state_dir()?;

    let hashes: Vec<String> = if all_projects {
        tj_core::db::list_all_projects(&state_dir)?
    } else {
        let cwd = std::env::current_dir()?;
        let hash = tj_core::project_hash::from_path(&cwd)?;
        // Bring the current project's index up to date before searching it.
        let events_path = tj_core::paths::events_dir()?.join(format!("{hash}.jsonl"));
        if events_path.exists() {
            let state_path = state_dir.join(format!("{hash}.sqlite"));
            let conn = tj_core::db::open(&state_path)?;
            tj_core::db::ingest_new_events(&conn, &events_path, &hash)?;
        }
        vec![hash]
    };

    // The query text and bound search value do not depend on the project, so
    // they are computed once rather than per database.
    let use_fts = topic_is_fts_safe(topic);
    let sql = if use_fts {
        "SELECT ei.event_id, ei.task_id, ei.timestamp, sf.text, t.title\n\
         FROM events_index ei\n\
         JOIN search_fts sf ON sf.event_id = ei.event_id\n\
         JOIN tasks t ON t.task_id = ei.task_id\n\
         WHERE ei.type = 'rejection'\n\
         AND search_fts MATCH ?1\n\
         AND (?2 IS NULL OR ei.timestamp >= ?2)\n\
         ORDER BY ei.timestamp DESC LIMIT ?3"
    } else {
        "SELECT ei.event_id, ei.task_id, ei.timestamp, sf.text, t.title\n\
         FROM events_index ei\n\
         JOIN search_fts sf ON sf.event_id = ei.event_id\n\
         JOIN tasks t ON t.task_id = ei.task_id\n\
         WHERE ei.type = 'rejection'\n\
         AND sf.text LIKE ?1 ESCAPE '\\'\n\
         AND (?2 IS NULL OR ei.timestamp >= ?2)\n\
         ORDER BY ei.timestamp DESC LIMIT ?3"
    };
    let bind_q = if use_fts {
        topic.to_string()
    } else {
        // Escape LIKE wildcards so the topic is matched as a literal substring;
        // without this, `%` and `_` inside the topic would over-match.
        let mut pattern = String::with_capacity(topic.len() + 2);
        pattern.push('%');
        for c in topic.chars() {
            if matches!(c, '\\' | '%' | '_') {
                pattern.push('\\');
            }
            pattern.push(c);
        }
        pattern.push('%');
        pattern
    };

    // (event_id, task_id, timestamp, text, task title)
    let mut hits: Vec<(String, String, String, String, String)> = Vec::new();
    for hash in hashes {
        let path = state_dir.join(format!("{hash}.sqlite"));
        let Ok(conn) = rusqlite::Connection::open(&path) else {
            continue;
        };
        let Ok(mut stmt) = conn.prepare(sql) else {
            continue;
        };
        let Ok(rows) = stmt.query_map(
            rusqlite::params![bind_q, cutoff, limit as i64],
            |r| {
                Ok((
                    r.get::<_, String>(0)?,
                    r.get::<_, String>(1)?,
                    r.get::<_, String>(2)?,
                    r.get::<_, Option<String>>(3)?.unwrap_or_default(),
                    r.get::<_, String>(4)?,
                ))
            },
        ) else {
            continue;
        };
        // Rows that fail to decode are dropped rather than aborting the listing.
        hits.extend(rows.flatten());
    }

    // Merge across projects: newest first, then cap at the global limit.
    hits.sort_by(|a, b| b.2.cmp(&a.2));
    hits.truncate(limit);

    for (_eid, task_id, ts, text, title) in hits {
        // First 10 characters of the timestamp (the date part of an RFC 3339 string).
        let date = ts.get(..10).unwrap_or(&ts);
        let one_line: String = text
            .lines()
            .next()
            .unwrap_or("")
            .chars()
            .take(120)
            .collect();
        println!("{task_id}\t{date}\t\"{one_line}\"");
        println!("\t\t(in task: {title})");
    }
    Ok(())
}
/// Prints a Markdown pull-request description for `task_id` to stdout.
///
/// Sections, in order: `## Summary` (the task goal, or its title when no goal
/// is stored), `## Changes` (decision events), and — only when non-empty —
/// `## Why this approach (vs alternatives)` (rejections), `## Verification`
/// (evidence) and `## Affected` (files, commits, issues, branches, PR URLs).
/// Each event contributes the trimmed first line of its text.
///
/// Exits the process with status 1 when the task does not exist.
///
/// # Errors
/// Propagates path-resolution, database and ingest failures.
fn run_export_pr(task_id: &str) -> Result<()> {
    let cwd = std::env::current_dir()?;
    let project_hash = tj_core::project_hash::from_path(&cwd)?;
    let events_path = tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));
    let state_path = tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
    let conn = tj_core::db::open(&state_path)?;
    if events_path.exists() {
        tj_core::db::ingest_new_events(&conn, &events_path, &project_hash)?;
    }

    let title_lookup = conn.query_row(
        "SELECT title FROM tasks WHERE task_id = ?1",
        rusqlite::params![task_id],
        |row| row.get::<_, String>(0),
    );
    let title: String = match title_lookup {
        Err(rusqlite::Error::QueryReturnedNoRows) => {
            eprintln!("Error: task not found: {task_id}");
            std::process::exit(1);
        }
        other => other?,
    };

    let meta = tj_core::db::task_metadata(&conn, task_id)?.unwrap_or_default();
    let summary = meta.goal.unwrap_or(title);

    // Bucket the task's events by type, oldest first.
    let mut decisions: Vec<String> = Vec::new();
    let mut rejections: Vec<String> = Vec::new();
    let mut evidence: Vec<String> = Vec::new();
    let mut stmt = conn.prepare(
        "SELECT ei.type, sf.text FROM events_index ei\n\
         LEFT JOIN search_fts sf ON sf.event_id = ei.event_id\n\
         WHERE ei.task_id = ?1 ORDER BY ei.timestamp ASC",
    )?;
    let rows = stmt.query_map(rusqlite::params![task_id], |row| {
        let kind: String = row.get(0)?;
        let body: Option<String> = row.get(1)?;
        Ok((kind, body.unwrap_or_default()))
    })?;
    for row in rows {
        let (kind, body) = row?;
        let headline = body.lines().next().unwrap_or("").trim();
        if headline.is_empty() {
            continue;
        }
        let bucket = match kind.as_str() {
            "decision" => &mut decisions,
            "rejection" => &mut rejections,
            "evidence" => &mut evidence,
            _ => continue,
        };
        bucket.push(headline.to_string());
    }

    let arts = tj_core::db::task_artifacts(&conn, task_id)?;

    let mut out = String::new();
    out.push_str("## Summary\n");
    out.push_str(&summary);
    out.push_str("\n\n");

    out.push_str("## Changes\n");
    if decisions.is_empty() {
        out.push_str("- (no decision events recorded)\n");
    } else {
        out.extend(decisions.iter().map(|d| format!("- {d}\n")));
    }
    out.push('\n');

    if !rejections.is_empty() {
        out.push_str("## Why this approach (vs alternatives)\n");
        out.extend(rejections.iter().map(|r| format!("- {r}\n")));
        out.push('\n');
    }
    if !evidence.is_empty() {
        out.push_str("## Verification\n");
        out.extend(evidence.iter().map(|e| format!("- {e}\n")));
        out.push('\n');
    }

    // One line per non-empty artifact category; the section is emitted only
    // when at least one category has entries.
    let mut affected: Vec<String> = Vec::new();
    if !arts.files.is_empty() {
        affected.push(format!("- Files: {}\n", arts.files.join(", ")));
    }
    if !arts.commit_hashes.is_empty() {
        affected.push(format!("- Commits: {}\n", arts.commit_hashes.join(", ")));
    }
    if !arts.linked_issues.is_empty() {
        affected.push(format!("- Issues: {}\n", arts.linked_issues.join(", ")));
    }
    if !arts.branch_names.is_empty() {
        affected.push(format!("- Branches: {}\n", arts.branch_names.join(", ")));
    }
    if !arts.pr_urls.is_empty() {
        affected.push(format!("- PRs: {}\n", arts.pr_urls.join(", ")));
    }
    if !affected.is_empty() {
        out.push_str("## Affected\n");
        out.extend(affected);
        out.push('\n');
    }

    print!("{out}");
    Ok(())
}
/// Loads up to `limit` open tasks, ordered by `last_event_at` descending, as
/// classifier contexts.
///
/// Each context carries the task id, its title, and the task's three latest
/// events rendered as `[type] <first 80 characters of the text>` (an event
/// with no indexed text renders with an empty text part).
///
/// # Errors
/// Propagates any SQLite prepare/query/decode failure.
fn recent_task_contexts(
    conn: &rusqlite::Connection,
    limit: usize,
) -> anyhow::Result<Vec<tj_core::classifier::TaskContext>> {
    let mut tasks_stmt = conn.prepare(
        "SELECT task_id, title FROM tasks WHERE status='open' ORDER BY last_event_at DESC LIMIT ?1",
    )?;
    let task_rows = tasks_stmt.query_map(rusqlite::params![limit as i64], |row| {
        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
    })?;
    let open_tasks: Vec<(String, String)> = task_rows.collect::<Result<_, _>>()?;

    // Nothing open: return before touching the events query at all.
    if open_tasks.is_empty() {
        return Ok(Vec::new());
    }

    // The same statement serves every task; only the bound task id changes.
    let mut events_stmt = conn.prepare(
        "SELECT ei.type, sf.text FROM events_index ei\n\
         LEFT JOIN search_fts sf ON sf.event_id = ei.event_id\n\
         WHERE ei.task_id=?1 ORDER BY ei.timestamp DESC LIMIT 3",
    )?;

    open_tasks
        .into_iter()
        .map(|(task_id, title)| -> anyhow::Result<tj_core::classifier::TaskContext> {
            let event_rows = events_stmt.query_map(rusqlite::params![task_id], |row| {
                let ty: String = row.get(0)?;
                let snippet: String = row
                    .get::<_, Option<String>>(1)?
                    .unwrap_or_default()
                    .chars()
                    .take(80)
                    .collect();
                Ok(format!("[{ty}] {snippet}"))
            })?;
            let last_events: Vec<String> = event_rows.collect::<Result<_, _>>()?;
            Ok(tj_core::classifier::TaskContext {
                task_id,
                title,
                last_events,
            })
        })
        .collect()
}
/// Opens a new task derived from a user prompt and returns its classifier context.
///
/// The title is the first non-blank line of the prompt, truncated to 80
/// characters; the goal is the first 200 characters of the trimmed prompt. An
/// `Open` event tagged `auto_opened` is appended to `events_path` and ingested
/// into `conn`. Up to five related tasks found via artifacts extracted from
/// the prompt are linked (best-effort), and a one-time hint is printed to
/// stderr when one of them is closed.
///
/// # Errors
/// Propagates failures from writing the event, ingesting it, storing the goal,
/// or looking up related tasks. Failures while adding links are ignored.
fn auto_open_task_from_prompt(
    events_path: &std::path::Path,
    project_hash: &str,
    conn: &rusqlite::Connection,
    prompt: &str,
) -> anyhow::Result<tj_core::classifier::TaskContext> {
    let cleaned = prompt.trim();
    let first_line = cleaned
        .lines()
        .map(str::trim)
        .find(|line| !line.is_empty());
    let title: String = match first_line {
        Some(line) => line.chars().take(80).collect(),
        None => "(auto-opened: empty prompt)".to_string(),
    };
    let goal: String = cleaned.chars().take(200).collect();

    let task_id = tj_core::new_task_id();
    let mut open_event = tj_core::event::Event::new(
        task_id.clone(),
        tj_core::event::EventType::Open,
        tj_core::event::Author::User,
        tj_core::event::Source::Cli,
        title.clone(),
    );
    open_event.meta = serde_json::json!({ "title": title, "auto_opened": true });

    let mut writer = tj_core::storage::JsonlWriter::open(events_path)?;
    writer.append(&open_event)?;
    writer.flush_durable()?;

    // Ingest right away so the new task exists in the database for the calls below.
    tj_core::db::ingest_new_events(conn, events_path, project_hash)?;
    if !goal.is_empty() {
        tj_core::db::set_task_goal(conn, &task_id, &goal)?;
    }

    let prompt_arts = tj_core::artifacts::extract(prompt);
    if !prompt_arts.is_empty() {
        let related = tj_core::db::find_related_tasks(conn, &prompt_arts)?;
        let mut closed_hint_shown = false;
        for candidate in related.iter().take(5) {
            if candidate.task_id == task_id {
                continue;
            }
            // Linking is advisory; a failure here must not abort task creation.
            let link = format!("linked:{}", candidate.task_id);
            let _ = tj_core::db::add_task_external(conn, &task_id, &link);
            if closed_hint_shown || candidate.status != "closed" {
                continue;
            }
            eprintln!(
                "task-journal: this prompt looks like a continuation of closed task {} \
                 (score {:.1}) — run `task-journal reopen {}` if it is.",
                candidate.task_id, candidate.score, candidate.task_id
            );
            closed_hint_shown = true;
        }
    }

    Ok(tj_core::classifier::TaskContext {
        task_id,
        title,
        last_events: vec![],
    })
}
/// Queues a prompt that could not be classified, together with the failure reason.
///
/// Writes `<ulid>.json` into the `pending` directory that sits next to the
/// directory containing `events_path` (i.e. `<events_path>/../../pending`),
/// creating the directory if needed. The payload is pretty-printed JSON with
/// the keys `text`, `error` and `queued_at`.
///
/// # Errors
/// Returns an error when `events_path` has no grandparent directory (instead
/// of panicking), or when the directory or file cannot be written.
fn persist_pending(events_path: &std::path::Path, text: &str, err: &str) -> anyhow::Result<()> {
    let pending_dir = events_path
        .parent()
        .and_then(|p| p.parent())
        .ok_or_else(|| {
            anyhow::anyhow!(
                "events path {} has no grandparent directory",
                events_path.display()
            )
        })?
        .join("pending");
    std::fs::create_dir_all(&pending_dir)?;
    let id = ulid::Ulid::new().to_string();
    let payload = serde_json::json!({"text": text, "error": err, "queued_at": chrono::Utc::now().to_rfc3339()});
    std::fs::write(
        pending_dir.join(format!("{id}.json")),
        serde_json::to_string_pretty(&payload)?,
    )?;
    Ok(())
}
/// Queues a hook payload for the background classify worker (schema `v2`).
///
/// Writes `<ulid>.json` into the `pending` directory that sits next to the
/// directory containing `events_path`, creating the directory if needed. The
/// payload is pretty-printed JSON with the keys `schema` (`"v2"`), `kind`,
/// `text`, `project_hash`, `events_path`, `backend` and `queued_at`.
///
/// Returns the path of the file that was written.
///
/// # Errors
/// Returns an error when `events_path` has no grandparent directory (instead
/// of panicking), or when the directory or file cannot be written.
fn persist_pending_v2(
    events_path: &std::path::Path,
    kind: &str,
    text: &str,
    project_hash: &str,
    backend: &str,
) -> anyhow::Result<std::path::PathBuf> {
    let pending_dir = events_path
        .parent()
        .and_then(|p| p.parent())
        .ok_or_else(|| {
            anyhow::anyhow!(
                "events path {} has no grandparent directory",
                events_path.display()
            )
        })?
        .join("pending");
    std::fs::create_dir_all(&pending_dir)?;
    let id = ulid::Ulid::new().to_string();
    let payload = serde_json::json!({
        "schema": "v2",
        "kind": kind,
        "text": text,
        "project_hash": project_hash,
        "events_path": events_path.to_string_lossy(),
        "backend": backend,
        "queued_at": chrono::Utc::now().to_rfc3339(),
    });
    let path = pending_dir.join(format!("{id}.json"));
    std::fs::write(&path, serde_json::to_string_pretty(&payload)?)?;
    Ok(path)
}
/// Queues transcript entries for classification ahead of a compaction.
///
/// Parses the session transcript at `transcript_path` and writes one pending
/// v2 entry per user or assistant message that
/// * has at least 20 bytes of text after trimming, and
/// * has a timestamp that sorts strictly after `last_event_ts` (when given).
///
/// User messages are queued with kind `UserPromptSubmit`; assistant messages
/// have their text parts joined with newlines and are queued with kind
/// `PreCompactChunk`. Other entry kinds are ignored.
///
/// Returns the number of entries queued; a transcript that fails to parse
/// yields `Ok(0)`.
///
/// # Errors
/// Propagates failures from writing a pending entry.
fn precompact_enqueue_transcript_chunks(
    transcript_path: &std::path::Path,
    events_path: &std::path::Path,
    project_hash: &str,
    backend: &str,
    last_event_ts: Option<&str>,
) -> anyhow::Result<usize> {
    use tj_core::session::parser::{
        extract_assistant_texts, extract_user_text, parse_session, SessionEntry,
    };

    let Ok(parsed) = parse_session(transcript_path) else {
        return Ok(0);
    };

    let mut enqueued = 0usize;
    for entry in &parsed.entries {
        let (ts, text, kind) = match entry {
            SessionEntry::User(user) => (
                user.timestamp.clone(),
                extract_user_text(user).unwrap_or_default(),
                "UserPromptSubmit",
            ),
            SessionEntry::Assistant(assistant) => {
                let parts = extract_assistant_texts(assistant);
                if parts.is_empty() {
                    continue;
                }
                (assistant.timestamp.clone(), parts.join("\n"), "PreCompactChunk")
            }
            _ => continue,
        };

        let too_short = text.trim().len() < 20;
        // Timestamps are compared as plain strings.
        // NOTE(review): this presumes both sides use the same timestamp format — confirm.
        let already_covered = matches!(last_event_ts, Some(last) if ts.as_str() <= last);
        if too_short || already_covered {
            continue;
        }

        persist_pending_v2(events_path, kind, &text, project_hash, backend)?;
        enqueued += 1;
    }
    Ok(enqueued)
}
fn spawn_classify_worker(backend: &str) -> anyhow::Result<()> {
let exe = std::env::current_exe().context("locate current task-journal exe")?;
let mut cmd = std::process::Command::new(exe);
cmd.arg("classify-worker")
.arg("--backend")
.arg(backend)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.env("TJ_CLASSIFIER_BUMP", "1")
.env_remove("TJ_IN_CLASSIFIER");
let _child = cmd.spawn().context("spawn classify-worker")?;
Ok(())
}
struct WorkerLock {
path: std::path::PathBuf,
}
impl WorkerLock {
fn try_acquire(project_hash: &str) -> anyhow::Result<Option<Self>> {
let dir = tj_core::paths::state_dir()?;
std::fs::create_dir_all(&dir)?;
let path = dir.join(format!("classifier-{project_hash}.lock"));
loop {
match std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&path)
{
Ok(mut f) => {
use std::io::Write;
let _ = writeln!(f, "{}", std::process::id());
return Ok(Some(Self { path }));
}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
let body = std::fs::read_to_string(&path).unwrap_or_default();
let pid: Option<u32> = body.trim().parse().ok();
if let Some(pid) = pid {
if pid_is_alive(pid) {
return Ok(None);
}
}
let _ = std::fs::remove_file(&path);
continue;
}
Err(e) => return Err(e.into()),
}
}
}
}
impl Drop for WorkerLock {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.path);
}
}
#[cfg(unix)]
fn pid_is_alive(pid: u32) -> bool {
unsafe { libc::kill(pid as libc::pid_t, 0) == 0 }
}
#[cfg(not(unix))]
fn pid_is_alive(_pid: u32) -> bool {
true
}
/// Drains the pending queue for the current directory's project.
///
/// Takes the per-project `WorkerLock` (returning immediately when another
/// worker holds it), collects every `*.json` file in the `pending` directory
/// next to the events directory, and hands each one to
/// `process_pending_entry`. A failure on one entry is reported on stderr and
/// does not stop the remaining entries.
///
/// Entries are processed in file-name order. Pending files are named with
/// ULIDs, which sort by creation time, so this replays them oldest first
/// instead of in the unspecified order `read_dir` returns.
///
/// # Errors
/// Fails when the project or data directories cannot be resolved, when the
/// lock cannot be attempted, or when the pending directory cannot be listed.
fn run_classify_worker(backend: &str) -> Result<()> {
    let cwd = std::env::current_dir()?;
    let project_hash = tj_core::project_hash::from_path(&cwd)?;
    // Held until this function returns, on every exit path.
    let Some(_lock) = WorkerLock::try_acquire(&project_hash)? else {
        return Ok(());
    };
    let events_path = tj_core::paths::events_dir()?.join(format!("{project_hash}.jsonl"));
    let pending = events_path
        .parent()
        .and_then(|p| p.parent())
        .ok_or_else(|| anyhow::anyhow!("events_dir has no grandparent"))?
        .join("pending");
    if !pending.exists() {
        return Ok(());
    }
    let mut entries: Vec<std::path::PathBuf> = Vec::new();
    for entry in std::fs::read_dir(&pending)? {
        let path = entry?.path();
        if path.extension().and_then(|s| s.to_str()) == Some("json") {
            entries.push(path);
        }
    }
    // Oldest first: a queued user prompt is handled before the entries that
    // were queued after it.
    entries.sort();
    for path in entries {
        if let Err(err) = process_pending_entry(&path, &events_path, &project_hash, backend) {
            eprintln!("classify-worker: {} failed: {err:#}", path.display());
        }
    }
    Ok(())
}
/// Classifies one queued hook payload and records the outcome.
///
/// Reads the JSON file at `path`. Entries whose `schema` is not `"v2"` are left
/// untouched. For v2 entries the project's SQLite state is opened, new events
/// from `events_path` are ingested if that file exists, and the classifier
/// selected by `backend` (`"cli"` or `"api"`) is run on the entry's text. A
/// usable guess is appended to `events_path` as a classifier-authored event and
/// a telemetry record is appended to the project's metrics file.
///
/// The pending file is deleted whenever a verdict is reached: event written,
/// entry dropped, or failure handed to `persist_pending`. It is kept when the
/// schema is not v2 or when an error is propagated with `?`.
///
/// # Errors
/// Propagates I/O, JSON, database, and classifier-construction failures, and
/// fails with `unknown backend: …` for any other `backend` value.
fn process_pending_entry(
path: &std::path::Path,
events_path: &std::path::Path,
project_hash: &str,
backend: &str,
) -> anyhow::Result<()> {
let body = std::fs::read_to_string(path)?;
let v: serde_json::Value = serde_json::from_str(&body)?;
// A missing or non-string `schema` field is treated as "v1".
let schema = v.get("schema").and_then(|x| x.as_str()).unwrap_or("v1");
// Only v2 entries are handled here; anything else stays on disk.
if schema != "v2" {
return Ok(()); }
// `kind` defaults to "Stop" and `text` to the empty string when absent.
let kind = v
.get("kind")
.and_then(|x| x.as_str())
.unwrap_or("Stop")
.to_string();
let text = v
.get("text")
.and_then(|x| x.as_str())
.unwrap_or("")
.to_string();
let state_path = tj_core::paths::state_dir()?.join(format!("{project_hash}.sqlite"));
let conn = tj_core::db::open(&state_path)?;
// Bring the database up to date with the event log before querying it.
if events_path.exists() {
tj_core::db::ingest_new_events(&conn, events_path, project_hash)?;
}
// The second argument is presumably a limit on the number of contexts — confirm
// against `recent_task_contexts`.
let mut recent = recent_task_contexts(&conn, 5)?;
if recent.is_empty() {
// TJ_AUTO_OPEN_TASKS set to "0" or "false" (any case) disables auto-open.
let auto_open_disabled = std::env::var("TJ_AUTO_OPEN_TASKS")
.ok()
.map(|v| v == "0" || v.eq_ignore_ascii_case("false"))
.unwrap_or(false);
// With no task context, only a user prompt may open a task; every other
// entry is dropped.
if auto_open_disabled || !kind.contains("UserPrompt") {
std::fs::remove_file(path)?;
return Ok(());
}
let new_task = auto_open_task_from_prompt(events_path, project_hash, &conn, &text)?;
recent.push(new_task);
}
let author_hint = if kind.contains("UserPrompt") {
"user"
} else {
"assistant"
};
use tj_core::classifier::Classifier;
let classifier: Box<dyn Classifier> = match backend {
"cli" => Box::new(tj_core::classifier::cli::ClaudeCliClassifier::default()),
"api" => Box::new(tj_core::classifier::http::AnthropicClassifier::from_env()?),
other => anyhow::bail!("unknown backend: {other}"),
};
let input = tj_core::classifier::ClassifyInput {
text: text.clone(),
author_hint: author_hint.into(),
recent_tasks: recent,
};
let out = match classifier.classify(&input) {
Ok(o) => o,
Err(e) => {
// Classification failed: hand the text and the error message to
// `persist_pending`, then remove the queue entry.
persist_pending(events_path, &text, &e.to_string())?;
std::fs::remove_file(path)?;
return Ok(());
}
};
// Without a task id guess there is nothing to attach the event to.
let Some(tid) = out.task_id_guess else {
std::fs::remove_file(path)?;
return Ok(());
};
use tj_core::event::EventType;
// A `Close` verdict coming from a "Stop" hook is discarded.
if matches!(out.event_type, EventType::Close) && kind == "Stop" {
std::fs::remove_file(path)?;
return Ok(());
}
// The guessed task must exist and must not be closed.
match tj_core::db::task_status(&conn, &tid)? {
None => {
persist_pending(
events_path,
&text,
&format!("task_id_guess `{tid}` not found"),
)?;
std::fs::remove_file(path)?;
return Ok(());
}
Some(s) if s == "closed" => {
persist_pending(
events_path,
&text,
&format!("task_id_guess `{tid}` is closed"),
)?;
std::fs::remove_file(path)?;
return Ok(());
}
_ => {}
}
let confidence = out.confidence;
let evidence_strength = out.evidence_strength;
let etype = out.event_type;
let event_text = out.suggested_text;
let mut event = tj_core::event::Event::new(
&tid,
etype,
tj_core::event::Author::Classifier,
tj_core::event::Source::Hook,
event_text,
);
event.confidence = Some(confidence);
// The event status is derived from the classifier's confidence.
event.status = tj_core::classifier::decide_status(confidence);
event.evidence_strength = evidence_strength;
let mut writer = tj_core::storage::JsonlWriter::open(events_path)?;
writer.append(&event)?;
writer.flush_durable()?;
let metrics_path = tj_core::paths::metrics_dir()?.join(format!("{project_hash}.jsonl"));
// Use the serde string form of the enums for telemetry; "?" is the fallback
// when the serialized value is not a string.
let etype_str = serde_json::to_value(etype)?
.as_str()
.unwrap_or("?")
.to_string();
let status_str = serde_json::to_value(event.status)?
.as_str()
.unwrap_or("?")
.to_string();
// Telemetry is best-effort: a failed append is ignored.
let _ = tj_core::classifier::telemetry::append(
&metrics_path,
&tj_core::classifier::telemetry::TelemetryRecord {
timestamp: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
project_hash: project_hash.to_string(),
task_id_guess: Some(tid.clone()),
event_type: etype_str,
confidence,
status: status_str,
error: None,
},
);
// The event is durably written; the queue entry can go.
std::fs::remove_file(path)?;
Ok(())
}
/// Empties the `pending` directory of every `*.json` entry that is not
/// schema `"v2"`.
///
/// The directory is `pending` under the grandparent of `events_path`. Entries
/// with `schema == "v2"` and files without a `.json` extension are skipped and
/// stay on disk. Every other entry is deleted. Before deletion, an entry with
/// non-empty `text` is turned into a classifier-authored event appended to
/// `events_path`, but only when both `mock_etype` and `mock_tid` are given.
///
/// # Parameters
/// * `events_path` — the project's event log; also anchors the `pending` directory.
/// * `mock_etype` — event type name, parsed with `parse_event_type`.
/// * `mock_tid` — task id the event is attached to.
/// * `mock_conf` — stored as the event's confidence; `1.0` is used for the
///   status decision when it is `None`.
///
/// # Errors
/// Fails when `events_path` has no grandparent directory, on I/O or JSON
/// errors, and when `mock_etype` is not a known event type.
fn drain_pending(
    events_path: &std::path::Path,
    mock_etype: Option<&str>,
    mock_tid: Option<&str>,
    mock_conf: Option<f64>,
) -> anyhow::Result<()> {
    // A path such as a bare file name has no grandparent; report that as an
    // error rather than panicking.
    let pending_dir = events_path
        .parent()
        .and_then(|p| p.parent())
        .ok_or_else(|| {
            anyhow::anyhow!(
                "events path {} has no grandparent directory",
                events_path.display()
            )
        })?
        .join("pending");
    if !pending_dir.exists() {
        return Ok(());
    }
    for entry in std::fs::read_dir(&pending_dir)? {
        let entry_path = entry?.path();
        if entry_path.extension().and_then(|e| e.to_str()) != Some("json") {
            continue;
        }
        let body = std::fs::read_to_string(&entry_path)?;
        let v: serde_json::Value = serde_json::from_str(&body)?;
        // v2 entries are not handled here and stay on disk.
        if v.get("schema").and_then(|x| x.as_str()) == Some("v2") {
            continue;
        }
        let text = v
            .get("text")
            .and_then(|x| x.as_str())
            .unwrap_or("")
            .to_string();
        if !text.is_empty() {
            if let (Some(t), Some(tid)) = (mock_etype, mock_tid) {
                let mut event = tj_core::event::Event::new(
                    tid,
                    parse_event_type(t)?,
                    tj_core::event::Author::Classifier,
                    tj_core::event::Source::Hook,
                    text,
                );
                event.confidence = mock_conf;
                event.status = tj_core::classifier::decide_status(mock_conf.unwrap_or(1.0));
                let mut writer = tj_core::storage::JsonlWriter::open(events_path)?;
                writer.append(&event)?;
                writer.flush_durable()?;
            }
        }
        // The entry is removed whether or not an event was produced.
        std::fs::remove_file(&entry_path)?;
    }
    Ok(())
}
/// Reads a hook payload from stdin and extracts its kind and text.
///
/// Returns `(kind, text, payload)`:
/// * `kind` — the payload's `hook_event_name`, or `"Stop"` when absent.
/// * `text` — the `prompt` for `UserPromptSubmit`; a `tool: input` summary
///   (with ` → response` appended when a response is present) for
///   `PreToolUse`/`PostToolUse`; empty for every other kind.
/// * `payload` — the parsed JSON value.
///
/// Empty (or whitespace-only) stdin yields `("Stop", "", Null)`.
///
/// # Errors
/// Fails when stdin cannot be read or the payload is not valid JSON.
fn parse_hook_stdin() -> anyhow::Result<(String, String, serde_json::Value)> {
    use std::io::Read;

    /// String value stored under `key`, or `fallback` when missing or not a string.
    fn str_field(v: &serde_json::Value, key: &str, fallback: &str) -> String {
        match v.get(key).and_then(|s| s.as_str()) {
            Some(found) => found.to_owned(),
            None => fallback.to_owned(),
        }
    }

    /// JSON rendering of the value stored under `key`, or empty when missing.
    fn json_field(v: &serde_json::Value, key: &str) -> String {
        match v.get(key) {
            Some(x) => x.to_string(),
            None => String::new(),
        }
    }

    let mut raw = String::new();
    std::io::stdin()
        .read_to_string(&mut raw)
        .context("read hook payload from stdin")?;

    let buf = raw.trim();
    if buf.is_empty() {
        return Ok((String::from("Stop"), String::new(), serde_json::Value::Null));
    }

    let v: serde_json::Value =
        serde_json::from_str(buf).with_context(|| format!("parse hook payload JSON: {buf}"))?;

    let kind = str_field(&v, "hook_event_name", "Stop");
    let text = match kind.as_str() {
        "UserPromptSubmit" => str_field(&v, "prompt", ""),
        "PreToolUse" | "PostToolUse" => {
            let tool = str_field(&v, "tool_name", "tool");
            let input = json_field(&v, "tool_input");
            let response = json_field(&v, "tool_response");
            if response.is_empty() {
                format!("{tool}: {input}")
            } else {
                format!("{tool}: {input} → {response}")
            }
        }
        _ => String::new(),
    };
    Ok((kind, text, v))
}
fn parse_event_type(s: &str) -> anyhow::Result<tj_core::event::EventType> {
use tj_core::event::EventType::*;
Ok(match s {
"open" => Open,
"hypothesis" => Hypothesis,
"finding" => Finding,
"evidence" => Evidence,
"decision" => Decision,
"rejection" => Rejection,
"constraint" => Constraint,
"correction" => Correction,
"reopen" => Reopen,
"supersede" => Supersede,
"close" => Close,
"redirect" => Redirect,
other => anyhow::bail!("unknown event type: {other}"),
})
}
#[cfg(test)]
mod inline_tests {
    use super::*;

    /// `/rewind` is accepted alone, with trailing text, and after leading whitespace.
    #[test]
    fn is_rewind_prompt_simple() {
        for prompt in ["/rewind", "/rewind back to plan A", " /rewind", "\t/rewind"] {
            assert!(is_rewind_prompt(prompt), "should match: {prompt:?}");
        }
    }

    /// The command is matched regardless of letter case.
    #[test]
    fn is_rewind_prompt_case_insensitive() {
        for prompt in ["/Rewind", "/REWIND"] {
            assert!(is_rewind_prompt(prompt), "should match: {prompt:?}");
        }
    }

    /// A missing slash, a non-leading command, empty input, and a longer word are rejected.
    #[test]
    fn is_rewind_prompt_rejects_non_match() {
        for prompt in ["rewind", "hello /rewind", "", "/rewinder"] {
            assert!(!is_rewind_prompt(prompt), "should not match: {prompt:?}");
        }
    }

    /// Plain words and spaces are safe; hyphens, quotes, and colons are not.
    #[test]
    fn topic_is_fts_safe_basic() {
        let cases = [
            ("oauth", true),
            ("foo bar", true),
            ("foo-bar", false),
            ("\"quote\"", false),
            ("col:name", false),
        ];
        for (topic, expected) in cases {
            assert_eq!(topic_is_fts_safe(topic), expected, "topic: {topic:?}");
        }
    }
}