use super::process::{self, ProcInfo};
use crate::model::{AgentSession, ChildProcess, SessionFile, SessionStatus, SubAgent};
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
#[cfg(unix)]
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::process::Command;
pub struct ClaudeCollector {
sessions_dir: PathBuf,
projects_dir: PathBuf,
transcript_cache: HashMap<String, TranscriptResult>,
}
impl ClaudeCollector {
pub fn new() -> Self {
let home = dirs::home_dir().unwrap_or_default();
Self {
sessions_dir: home.join(".claude").join("sessions"),
projects_dir: home.join(".claude").join("projects"),
transcript_cache: HashMap::new(),
}
}
pub fn collect(&mut self, shared: &super::SharedProcessData) -> Vec<AgentSession> {
let session_files = match fs::read_dir(&self.sessions_dir) {
Ok(entries) => entries,
Err(_) => return vec![],
};
let mut sessions = Vec::new();
for entry in session_files.flatten() {
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) != Some("json") {
continue;
}
if let Some(session) = self.load_session(&path, &shared.process_info, &shared.children_map, &shared.ports) {
sessions.push(session);
}
}
let active_ids: std::collections::HashSet<&str> =
sessions.iter().map(|s| s.session_id.as_str()).collect();
self.transcript_cache.retain(|sid, _| active_ids.contains(sid.as_str()));
sessions.sort_by_key(|s| std::cmp::Reverse(s.started_at));
sessions
}
fn load_session(
&mut self,
path: &Path,
process_info: &HashMap<u32, ProcInfo>,
children_map: &HashMap<u32, Vec<u32>>,
ports: &HashMap<u32, Vec<u16>>,
) -> Option<AgentSession> {
let content = fs::read_to_string(path).ok()?;
let sf: SessionFile = serde_json::from_str(&content).ok()?;
let proc_cmd = process_info.get(&sf.pid).map(|p| p.command.as_str());
let pid_alive = proc_cmd
.map(|c| c.contains("/claude") && c.contains("--session-id"))
.unwrap_or(false);
if proc_cmd.map(|c| c.contains("--print")).unwrap_or(false) {
return None;
}
let project_name = sf
.cwd
.rsplit('/')
.next()
.unwrap_or("?")
.to_string();
let proc = process_info.get(&sf.pid);
let mem_mb = proc.map(|p| p.rss_kb / 1024).unwrap_or(0);
let transcript_path = self.find_transcript(&sf.cwd, &sf.session_id);
if let Some(ref tp) = transcript_path {
let cached = self.transcript_cache.remove(&sf.session_id);
let identity_changed = cached.as_ref()
.map(|c| c.file_identity != file_identity(tp))
.unwrap_or(false);
let from_offset = if identity_changed {
0
} else {
cached.as_ref().map(|c| c.new_offset).unwrap_or(0)
};
let delta = parse_transcript(tp, from_offset);
if let Some(mut prev) = cached {
if identity_changed || from_offset == 0 || delta.new_offset < from_offset {
self.transcript_cache.insert(sf.session_id.clone(), delta);
} else {
if delta.model != "-" {
prev.model = delta.model;
}
prev.total_input += delta.total_input;
prev.total_output += delta.total_output;
prev.total_cache_read += delta.total_cache_read;
prev.total_cache_create += delta.total_cache_create;
if delta.last_context_tokens > 0 {
prev.last_context_tokens = delta.last_context_tokens;
}
prev.turn_count += delta.turn_count;
if delta.turn_count > 0 {
prev.current_task = delta.current_task;
}
if !delta.version.is_empty() {
prev.version = delta.version;
}
if !delta.git_branch.is_empty() {
prev.git_branch = delta.git_branch;
}
if delta.last_activity > prev.last_activity {
prev.last_activity = delta.last_activity;
}
prev.token_history.extend(delta.token_history);
if prev.initial_prompt.is_empty() && !delta.initial_prompt.is_empty() {
prev.initial_prompt = delta.initial_prompt;
}
prev.new_offset = delta.new_offset;
self.transcript_cache.insert(sf.session_id.clone(), prev);
}
} else {
self.transcript_cache.insert(sf.session_id.clone(), delta);
}
}
let empty_result = TranscriptResult {
model: "-".to_string(),
total_input: 0, total_output: 0, total_cache_read: 0, total_cache_create: 0,
last_context_tokens: 0, turn_count: 0, current_task: String::new(),
version: String::new(), git_branch: String::new(),
last_activity: std::time::UNIX_EPOCH, new_offset: 0,
file_identity: (0, 0),
token_history: Vec::new(), initial_prompt: String::new(),
};
let cached = self.transcript_cache.get(&sf.session_id).unwrap_or(&empty_result);
let model = cached.model.clone();
let total_input = cached.total_input;
let total_output = cached.total_output;
let total_cache_read = cached.total_cache_read;
let total_cache_create = cached.total_cache_create;
let last_context_tokens = cached.last_context_tokens;
let turn_count = cached.turn_count;
let current_task = cached.current_task.clone();
let version = cached.version.clone();
let git_branch = cached.git_branch.clone();
let last_activity = cached.last_activity;
let token_history = cached.token_history.clone();
let initial_prompt = cached.initial_prompt.clone();
let status = if !pid_alive {
SessionStatus::Done
} else {
let since_activity = std::time::SystemTime::now()
.duration_since(last_activity)
.unwrap_or_default();
if since_activity.as_secs() < 30 {
SessionStatus::Working
} else {
let claude_cpu_active = proc.is_some_and(|p| p.cpu_pct > 1.0);
let has_active_descendant = process::has_active_descendant(
sf.pid, children_map, process_info, 5.0,
);
if claude_cpu_active || has_active_descendant {
SessionStatus::Working
} else {
SessionStatus::Waiting
}
}
};
let context_window = context_window_for_model(&model);
let context_percent = if context_window > 0 {
(last_context_tokens as f64 / context_window as f64) * 100.0
} else {
0.0
};
let current_tasks = if !current_task.is_empty() {
vec![current_task]
} else if !pid_alive {
vec!["finished".to_string()]
} else if matches!(status, SessionStatus::Waiting) {
vec!["waiting for input".to_string()]
} else {
vec!["thinking...".to_string()]
};
let mut children = Vec::new();
let mut stack: Vec<u32> = children_map
.get(&sf.pid)
.cloned()
.unwrap_or_default();
while let Some(cpid) = stack.pop() {
if let Some(cproc) = process_info.get(&cpid) {
let port = ports.get(&cpid).and_then(|v| v.first().copied());
children.push(ChildProcess {
pid: cpid,
command: cproc.command.clone(),
mem_kb: cproc.rss_kb,
port,
});
}
if let Some(grandchildren) = children_map.get(&cpid) {
stack.extend(grandchildren);
}
}
let (git_added, git_modified) = (0, 0);
let encoded_path = sf.cwd.replace('/', "-");
let subagents_dir = self.projects_dir.join(&encoded_path).join(&sf.session_id).join("subagents");
let subagents = Self::collect_subagents(&subagents_dir);
let memory_dir = self.projects_dir.join(&encoded_path).join("memory");
let (mem_file_count, mem_line_count) = Self::collect_memory_status(&memory_dir);
Some(AgentSession {
agent_cli: "claude",
pid: sf.pid,
session_id: sf.session_id,
cwd: sf.cwd,
project_name,
started_at: sf.started_at,
status,
model,
context_percent,
total_input_tokens: total_input,
total_output_tokens: total_output,
total_cache_read,
total_cache_create,
turn_count,
current_tasks,
mem_mb,
version,
git_branch,
git_added,
git_modified,
token_history,
subagents,
mem_file_count,
mem_line_count,
children,
transcript_offset: 0,
initial_prompt,
})
}
fn find_transcript(&self, cwd: &str, session_id: &str) -> Option<PathBuf> {
let encoded = cwd.replace('/', "-");
let dir = self.projects_dir.join(&encoded);
let path = dir.join(format!("{}.jsonl", session_id));
if path.exists() {
Some(path)
} else {
None
}
}
fn collect_subagents(subagents_dir: &Path) -> Vec<SubAgent> {
let mut subagents = Vec::new();
let entries = match fs::read_dir(subagents_dir) {
Ok(e) => e,
Err(_) => return subagents,
};
let mut meta_files: Vec<PathBuf> = Vec::new();
for entry in entries.flatten() {
let path = entry.path();
if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
if name.ends_with(".meta.json") {
meta_files.push(path);
}
}
}
for meta_path in meta_files {
let meta_name = match meta_path.file_name().and_then(|n| n.to_str()) {
Some(n) => n.to_string(),
None => continue,
};
let meta_content = match fs::read_to_string(&meta_path) {
Ok(c) => c,
Err(_) => continue,
};
let meta_val: Value = match serde_json::from_str(&meta_content) {
Ok(v) => v,
Err(_) => continue,
};
let agent_type = meta_val.get("agentType")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string();
let description = meta_val.get("description")
.and_then(|v| v.as_str())
.unwrap_or("agent")
.to_string();
let jsonl_name = meta_name.replace(".meta.json", ".jsonl");
let jsonl_path = meta_path.with_file_name(&jsonl_name);
let mut tokens = 0u64;
let mut last_activity = std::time::UNIX_EPOCH;
if jsonl_path.exists() {
if let Ok(metadata) = fs::metadata(&jsonl_path) {
if let Ok(mtime) = metadata.modified() {
last_activity = mtime;
}
}
let transcript = parse_transcript(&jsonl_path, 0);
tokens = transcript.total_input + transcript.total_output
+ transcript.total_cache_read + transcript.total_cache_create;
}
let status = {
let since = std::time::SystemTime::now()
.duration_since(last_activity)
.unwrap_or_default();
if since.as_secs() < 30 {
"working".to_string()
} else {
"done".to_string()
}
};
let name = truncate(&description, 30);
subagents.push(SubAgent {
name,
agent_type,
status,
tokens,
});
}
subagents
}
fn collect_memory_status(memory_dir: &Path) -> (u32, u32) {
let mut file_count = 0u32;
let mut line_count = 0u32;
if let Ok(entries) = fs::read_dir(memory_dir) {
for entry in entries.flatten() {
let path = entry.path();
if path.is_file() {
file_count += 1;
}
}
}
let memory_md = memory_dir.join("MEMORY.md");
if let Ok(content) = fs::read_to_string(&memory_md) {
line_count = content.lines().count() as u32;
}
(file_count, line_count)
}
}
struct TranscriptResult {
model: String,
total_input: u64,
total_output: u64,
total_cache_read: u64,
total_cache_create: u64,
last_context_tokens: u64,
turn_count: u32,
current_task: String,
version: String,
git_branch: String,
last_activity: std::time::SystemTime,
new_offset: u64,
file_identity: (u64, u64),
token_history: Vec<u64>,
initial_prompt: String,
}
fn file_identity(path: &Path) -> (u64, u64) {
fs::metadata(path)
.ok()
.map(|m| {
let ino = m.ino();
let mtime_ns = m.modified()
.ok()
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
(ino, mtime_ns)
})
.unwrap_or((0, 0))
}
fn parse_transcript(path: &Path, from_offset: u64) -> TranscriptResult {
let identity = file_identity(path);
let mut result = TranscriptResult {
model: "-".to_string(),
total_input: 0,
total_output: 0,
total_cache_read: 0,
total_cache_create: 0,
last_context_tokens: 0,
turn_count: 0,
current_task: String::new(),
version: String::new(),
git_branch: String::new(),
last_activity: std::time::UNIX_EPOCH,
new_offset: from_offset,
file_identity: identity,
token_history: Vec::new(),
initial_prompt: String::new(),
};
let file = match fs::File::open(path) {
Ok(f) => f,
Err(_) => return result,
};
let file_len = file.metadata().map(|m| m.len()).unwrap_or(0);
if file_len == from_offset {
result.new_offset = file_len;
return result;
}
let effective_offset = if file_len < from_offset { 0 } else { from_offset };
let from_offset = effective_offset;
let mut reader = BufReader::new(file);
if from_offset > 0 {
let _ = reader.seek(SeekFrom::Start(from_offset));
}
let mtime = fs::metadata(path)
.ok()
.and_then(|m| m.modified().ok())
.unwrap_or(std::time::UNIX_EPOCH);
result.last_activity = mtime;
let mut bytes_read = from_offset;
let mut line_buf = String::new();
loop {
line_buf.clear();
match reader.read_line(&mut line_buf) {
Ok(0) => break,
Ok(n) => {
let has_newline = line_buf.ends_with('\n');
let line = line_buf.trim();
if line.is_empty() {
if has_newline {
bytes_read += n as u64;
}
continue;
}
let val = match serde_json::from_str::<Value>(line) {
Ok(v) => v,
Err(_) => {
if has_newline {
bytes_read += n as u64;
}
if !has_newline { break; }
continue;
}
};
bytes_read += n as u64;
{
match val.get("type").and_then(|t| t.as_str()) {
Some("assistant") => {
result.turn_count += 1;
result.current_task = String::new();
if let Some(msg) = val.get("message") {
if let Some(m) = msg.get("model").and_then(|m| m.as_str()) {
result.model = m.to_string();
}
if let Some(usage) = msg.get("usage") {
let inp = usage.get("input_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
let out = usage.get("output_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
let cr = usage.get("cache_read_input_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
let cc = usage.get("cache_creation_input_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
result.total_input += inp;
result.total_output += out;
result.total_cache_read += cr;
result.total_cache_create += cc;
result.last_context_tokens = inp + cr + cc;
result.token_history.push(inp + out + cr + cc);
}
if let Some(content) = msg.get("content").and_then(|c| c.as_array()) {
for item in content.iter().rev() {
if item.get("type").and_then(|t| t.as_str()) == Some("tool_use") {
let tool = item.get("name").and_then(|n| n.as_str()).unwrap_or("?");
let arg = extract_tool_arg(item);
result.current_task = format!("{} {}", tool, arg);
break;
}
}
}
}
}
Some("user") => {
if let Some(v) = val.get("version").and_then(|v| v.as_str()) {
result.version = v.to_string();
}
if let Some(b) = val.get("gitBranch").and_then(|b| b.as_str()) {
result.git_branch = b.to_string();
}
if result.initial_prompt.is_empty() {
if let Some(msg) = val.get("message") {
result.initial_prompt = extract_prompt_text(msg);
}
}
}
_ => {}
}
}
}
Err(_) => break,
}
}
result.new_offset = bytes_read;
result
}
fn extract_prompt_text(message: &Value) -> String {
let raw = match message.get("content") {
Some(Value::String(s)) => s.clone(),
Some(Value::Array(arr)) => {
arr.iter()
.filter_map(|block| {
if block.get("type").and_then(|t| t.as_str()) == Some("text") {
block.get("text").and_then(|t| t.as_str()).map(|s| s.to_string())
} else {
None
}
})
.next()
.unwrap_or_default()
}
_ => return String::new(),
};
let cleaned: String = raw
.lines()
.map(|l| l.trim())
.filter(|l| !l.is_empty() && !l.starts_with('#') && !l.starts_with("```"))
.collect::<Vec<_>>()
.join(" ");
let mut result = cleaned;
while let Some(start) = result.find("[Image") {
if let Some(end) = result[start..].find(']') {
result = format!("{}{}", &result[..start], result[start + end + 1..].trim_start());
} else {
break;
}
}
let clean = result.trim().to_string();
if clean.is_empty() {
return String::new();
}
if clean.contains("You are a conversation title generator") {
return String::new();
}
truncate(&clean, 50)
}
fn extract_tool_arg(tool_use: &Value) -> String {
if let Some(input) = tool_use.get("input") {
if let Some(fp) = input.get("file_path").and_then(|f| f.as_str()) {
return shorten_path(fp);
}
if let Some(cmd) = input.get("command").and_then(|c| c.as_str()) {
let short = cmd.lines().next().unwrap_or(cmd);
return truncate(short, 40);
}
if let Some(pat) = input.get("pattern").and_then(|p| p.as_str()) {
return truncate(pat, 40);
}
}
String::new()
}
fn shorten_path(path: &str) -> String {
let parts: Vec<&str> = path.rsplit('/').collect();
if parts.len() <= 2 {
path.to_string()
} else {
format!("{}/{}", parts[1], parts[0])
}
}
fn truncate(s: &str, max: usize) -> String {
if max == 0 {
return String::new();
}
if s.chars().count() <= max {
s.to_string()
} else {
let truncated: String = s.chars().take(max - 1).collect();
format!("{}…", truncated)
}
}
#[allow(dead_code)]
fn is_claude_process(pid: u32) -> bool {
let output = Command::new("ps")
.args(["-p", &pid.to_string(), "-o", "command="])
.output()
.ok();
match output {
Some(out) => {
let cmd = String::from_utf8_lossy(&out.stdout);
cmd.contains("/claude") && cmd.contains("--session-id")
}
None => false,
}
}
fn context_window_for_model(model: &str) -> u64 {
if model.contains("[1m]") {
1_000_000
} else {
200_000
}
}