1use serde::{Deserialize, Serialize};
13use serde_json::Value;
14use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
15use std::fs;
16use std::path::{Path, PathBuf};
17use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
18
/// Agent identifier used for Claude session logs.
pub const AGENT_CLAUDE: &str = "claude";
/// Agent identifier used for Codex session logs.
pub const AGENT_CODEX: &str = "codex";
/// Agent identifier used for Gemini session logs.
pub const AGENT_GEMINI: &str = "gemini";

/// Evidence label for a session path taken from the `observed_path_by_process` map.
pub const TRACE_EBPF_FILE: &str = "ebpf_file";
/// Evidence label for a session path taken from the `fd_paths_by_process` map.
pub const TRACE_PROC_FD: &str = "proc_fd";
/// Evidence label returned when a previously remembered process/session binding is reused.
pub const TRACE_STICKY_BINDING: &str = "sticky";
/// Evidence label for matches made by comparing working directories.
pub const TRACE_RECENT_CWD: &str = "cwd_recent";
/// Value stored in [`SessionProcessMatch::source`] for every match produced in this file.
pub const SOURCE_SESSION_PROCESS_MATCH: &str = "agent_session.process_match";

/// Tolerance, in milliseconds, added to a session's end time before it is
/// compared against a process start time.
const SESSION_PROCESS_START_SKEW_MS: u64 = 30_000;
30
/// Token counters accumulated for a session or for a single model.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct TokenUsage {
    /// Input tokens.
    pub input_tokens: i64,
    /// Output tokens.
    pub output_tokens: i64,
    /// Tokens recorded as cache creation.
    pub cache_creation_tokens: i64,
    /// Tokens recorded as cache reads.
    pub cache_read_tokens: i64,
    /// Reported total, or the sum of the four counters above when no
    /// positive total was reported for a sample.
    pub total_tokens: i64,
}
39
40impl TokenUsage {
41 fn add(&mut self, input: i64, output: i64, cache_creation: i64, cache_read: i64, total: i64) {
42 self.input_tokens += input;
43 self.output_tokens += output;
44 self.cache_creation_tokens += cache_creation;
45 self.cache_read_tokens += cache_read;
46 self.total_tokens += if total > 0 {
47 total
48 } else {
49 input + output + cache_creation + cache_read
50 };
51 }
52}
53
/// Summary of one parsed agent session log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentSession {
    /// Agent identifier the log was parsed as.
    pub agent: String,
    /// Session id found in the log, or the file stem when the log carries none.
    pub session_id: String,
    /// `"<agent>:<shortened session id>"`.
    pub display_id: String,
    /// Normalized path of the log file.
    pub path: PathBuf,
    /// Modification time supplied by the caller when parsing.
    pub updated: SystemTime,
    /// Session start in epoch milliseconds; when the log gives none it is
    /// derived as `updated - duration_ms`.
    pub start_timestamp_ms: Option<u64>,
    /// Session end in epoch milliseconds.
    pub end_timestamp_ms: Option<u64>,
    /// Model name reported by the log, if any.
    pub model: Option<String>,
    /// Sum of all entries in `model_usage`.
    pub token_usage: TokenUsage,
    /// Token usage keyed by model name.
    pub model_usage: BTreeMap<String, TokenUsage>,
    /// Tool name mapped to the number of recorded calls.
    pub tools: BTreeMap<String, usize>,
    /// File path mapped to the number of tool calls that referenced it.
    pub files: BTreeMap<String, usize>,
    /// Whitespace-normalized prompt text extracted from the log.
    pub prompt_preview: Option<String>,
    /// Session duration in milliseconds (0 when unknown).
    pub duration_ms: u64,
    /// Working directory recorded in the log, if any.
    pub cwd: Option<String>,
    /// Raw timestamp string of the last record that carried one.
    pub last_message_at: Option<String>,
}
73
/// A session log file found on disk that has not been parsed yet.
#[derive(Debug, Clone)]
pub struct SessionCandidate {
    /// Agent whose directory tree the file was found in.
    pub agent: &'static str,
    /// Path of the log file.
    pub path: PathBuf,
    /// File modification time (`UNIX_EPOCH` when it could not be read).
    pub updated: SystemTime,
}
80
/// Aggregate size information for one agent's session directory.
#[derive(Debug, Clone)]
pub struct SessionDirStat {
    /// Agent the directory belongs to.
    pub agent: &'static str,
    /// Root directory that was walked.
    pub dir: PathBuf,
    /// Number of session files found under `dir`.
    pub sessions: usize,
    /// Combined length in bytes of those files.
    pub bytes: u64,
}
88
/// Cache of parsed sessions that avoids re-parsing unchanged log files.
#[derive(Default)]
pub struct SessionCache {
    /// Parse results keyed by log path, each tagged with the mtime it was parsed at.
    entries: HashMap<PathBuf, CacheEntry>,
    /// Sessions selected by the most recent refresh.
    cached_sessions: Vec<AgentSession>,
    /// When the last refresh finished; `None` before the first refresh.
    last_refresh: Option<Instant>,
    /// Clamped limit used by the last refresh.
    last_limit: usize,
}
96
/// One cached parse result.
struct CacheEntry {
    /// Modification time of the file when it was parsed.
    mtime: SystemTime,
    /// Parsed session, or `None` when parsing produced nothing.
    session: Option<AgentSession>,
}
101
102impl SessionCache {
103 pub fn new() -> Self {
104 Self::default()
105 }
106
107 pub fn discover_cached(&mut self, limit: usize, max_age: Duration) -> Vec<AgentSession> {
108 let target = limit.clamp(1, 25);
109 if self.last_limit < target
110 || self
111 .last_refresh
112 .is_none_or(|last| last.elapsed() >= max_age)
113 {
114 self.refresh(target);
115 }
116 self.cached_sessions.iter().take(target).cloned().collect()
117 }
118
119 fn refresh(&mut self, limit: usize) {
120 let mut candidates = discover_session_files();
121 candidates.sort_by_key(|candidate| std::cmp::Reverse(candidate.updated));
122 let target = limit.clamp(1, 25);
123 let mut live_paths = HashSet::new();
124 let mut sessions = Vec::new();
125 let mut seen = HashSet::new();
126
127 for candidate in candidates
128 .into_iter()
129 .take(target.saturating_mul(3).clamp(10, 75))
130 {
131 live_paths.insert(candidate.path.clone());
132 let session = match self.entries.get(&candidate.path) {
133 Some(entry) if entry.mtime == candidate.updated => entry.session.clone(),
134 _ => {
135 let parsed = parse_session_file(&candidate);
136 self.entries.insert(
137 candidate.path.clone(),
138 CacheEntry {
139 mtime: candidate.updated,
140 session: parsed.clone(),
141 },
142 );
143 parsed
144 }
145 };
146 if let Some(session) = session
147 && seen.insert(session.display_id.clone())
148 {
149 sessions.push(session);
150 if sessions.len() >= target {
151 break;
152 }
153 }
154 }
155 self.entries.retain(|path, _| live_paths.contains(path));
156 self.cached_sessions = sessions;
157 self.last_refresh = Some(Instant::now());
158 self.last_limit = target;
159 }
160}
161
162pub fn discover_session_files() -> Vec<SessionCandidate> {
163 user_home_dir()
164 .as_deref()
165 .map(discover_session_files_in_home)
166 .unwrap_or_default()
167}
168
169pub fn discover_session_files_in_home(home: &Path) -> Vec<SessionCandidate> {
170 let roots = [
171 (AGENT_CLAUDE, home.join(".claude/projects")),
172 (AGENT_CODEX, home.join(".codex/sessions")),
173 (AGENT_GEMINI, home.join(".gemini/tmp")),
174 ];
175 let mut out = Vec::new();
176 for (agent, dir) in roots {
177 walk_agent_files(agent, &dir, &mut |path, meta| {
178 out.push(SessionCandidate {
179 agent,
180 path: path.to_path_buf(),
181 updated: meta.modified().unwrap_or(UNIX_EPOCH),
182 });
183 });
184 }
185 out
186}
187
188pub fn count_session_dirs() -> Vec<SessionDirStat> {
189 let Some(home) = user_home_dir() else {
190 return Vec::new();
191 };
192 [
193 (AGENT_CLAUDE, home.join(".claude/projects")),
194 (AGENT_CODEX, home.join(".codex/sessions")),
195 (AGENT_GEMINI, home.join(".gemini/tmp")),
196 ]
197 .into_iter()
198 .filter_map(|(agent, dir)| {
199 let (mut sessions, mut bytes) = (0usize, 0u64);
200 walk_agent_files(agent, &dir, &mut |_, meta| {
201 sessions += 1;
202 bytes += meta.len();
203 });
204 (sessions > 0).then_some(SessionDirStat {
205 agent,
206 dir,
207 sessions,
208 bytes,
209 })
210 })
211 .collect()
212}
213
214pub fn parse_session_file(candidate: &SessionCandidate) -> Option<AgentSession> {
215 let content = fs::read_to_string(&candidate.path).ok()?;
216 parse_session_content(
217 candidate.agent,
218 &candidate.path,
219 candidate.updated,
220 &content,
221 )
222}
223
224pub fn parse_session_path(path: &Path) -> Option<AgentSession> {
225 let agent = agent_source_for_path(path)?;
226 let updated = fs::metadata(path)
227 .and_then(|metadata| metadata.modified())
228 .unwrap_or(UNIX_EPOCH);
229 parse_session_file(&SessionCandidate {
230 agent,
231 path: path.to_path_buf(),
232 updated,
233 })
234}
235
236pub fn parse_session_content(
237 agent: &str,
238 path: &Path,
239 updated: SystemTime,
240 content: &str,
241) -> Option<AgentSession> {
242 if agent == AGENT_GEMINI {
243 parse_gemini_json(path, updated, content)
244 } else {
245 parse_jsonl(agent, path, updated, content)
246 }
247}
248
249pub fn session_log_path_from_str(raw: &str) -> Option<PathBuf> {
250 let trimmed = raw.trim().trim_end_matches(" (deleted)");
251 if trimmed.is_empty() {
252 return None;
253 }
254 let path = Path::new(trimmed);
255 if !path.is_absolute() || !is_agent_session_file(path) {
256 return None;
257 }
258 agent_source_for_path(path).map(|_| normalize_session_log_path(path))
259}
260
/// Canonicalizes `path`, returning it unchanged when canonicalization fails
/// (for example because the file does not exist).
pub fn normalize_session_log_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
264
265pub fn agent_source_for_path(path: &Path) -> Option<&'static str> {
266 let value = path.to_string_lossy();
267 if value.contains("/.claude/") && path.extension().and_then(|ext| ext.to_str()) == Some("jsonl")
268 {
269 Some(AGENT_CLAUDE)
270 } else if value.contains("/.codex/")
271 && path.extension().and_then(|ext| ext.to_str()) == Some("jsonl")
272 {
273 Some(AGENT_CODEX)
274 } else if value.contains("/.gemini/")
275 && path.extension().and_then(|ext| ext.to_str()) == Some("json")
276 {
277 Some(AGENT_GEMINI)
278 } else {
279 None
280 }
281}
282
/// Returns the fixed fixture log location for `agent` under `home`, or `None`
/// for an unknown agent.
pub fn fixture_session_path(agent: &str, home: &Path) -> Option<PathBuf> {
    let relative = match agent {
        AGENT_CLAUDE => ".claude/projects/test/session.jsonl",
        AGENT_CODEX => ".codex/sessions/2026/06/02/session.jsonl",
        AGENT_GEMINI => ".gemini/tmp/test/chats/session-test.json",
        _ => return None,
    };
    Some(home.join(relative))
}
291
/// Reports whether `target` names a `codex` executable that does not live
/// under a `node_modules` directory.
pub fn is_codex_cli_entrypoint(target: Option<&str>) -> bool {
    let Some(target) = target else {
        return false;
    };
    if target.contains("/node_modules/") {
        return false;
    }
    let file_name = Path::new(target).file_name().and_then(|name| name.to_str());
    matches!(file_name, Some("codex"))
}
298
299pub fn codex_exec_prompt(command: &str) -> Option<String> {
300 let mut args = command.split_once(" exec ")?.1.trim();
301 while let Some(rest) = strip_codex_exec_option(args) {
302 args = rest.trim_start();
303 }
304 (!args.starts_with('-'))
305 .then(|| args.trim_matches(['"', '\'']))
306 .and_then(clean_prompt_text)
307}
308
/// Parses a line-delimited JSON session log into an [`AgentSession`].
///
/// Each line is decoded independently; lines that are not valid JSON are
/// skipped. Records are dispatched on the `(agent, "type")` pair, so the same
/// record type can be handled differently for Claude and Codex logs. Returns
/// whatever [`SessionAccumulator::finish`] yields for the collected data.
fn parse_jsonl(
    agent: &str,
    path: &Path,
    updated: SystemTime,
    content: &str,
) -> Option<AgentSession> {
    let mut acc = SessionAccumulator::new(agent, path, updated);
    // Most recent Codex model name; token counts are attributed to it.
    let mut codex_model = String::new();
    // Usage gathered from Claude assistant messages; adopted at the end only
    // if nothing else populated `acc.model_usage`.
    let mut claude_message_models = BTreeMap::<String, TokenUsage>::new();
    // Usage keys already counted, so a repeated key is counted once.
    let mut claude_seen_usage = HashSet::new();

    for line in content.lines() {
        let Ok(obj) = serde_json::from_str::<Value>(line) else {
            continue;
        };
        // The last record carrying a session id wins.
        if let Some(id) = local_session_id(&obj) {
            acc.session_id = id;
        }
        // Retried on every record until a non-empty cwd is found.
        if acc.cwd.is_none() {
            acc.cwd = obj
                .get("cwd")
                .and_then(Value::as_str)
                .or_else(|| obj.pointer("/payload/cwd").and_then(Value::as_str))
                .filter(|s| !s.is_empty())
                .map(ToString::to_string);
        }
        if let Some(ts) = obj.get("timestamp").and_then(Value::as_str) {
            acc.last_message_at = Some(ts.to_string());
            // Keep the previous end timestamp when this one does not parse.
            acc.end_timestamp_ms = iso_ms(ts).or(acc.end_timestamp_ms);
        }
        let typ = obj.get("type").and_then(Value::as_str).unwrap_or("");
        match (agent, typ) {
            // Claude summary record: duration plus per-model usage totals.
            (AGENT_CLAUDE, "result") => {
                acc.duration_ms = json_u64(&obj, "duration_ms");
                if let Some(model_usage) = obj.get("modelUsage").and_then(Value::as_object) {
                    for (name, usage) in model_usage {
                        acc.model.get_or_insert_with(|| name.clone());
                        acc.add_usage(
                            name,
                            json_i64(usage, "inputTokens"),
                            json_i64(usage, "outputTokens"),
                            json_i64(usage, "cacheCreationInputTokens"),
                            json_i64(usage, "cacheReadInputTokens"),
                            0,
                        );
                    }
                }
            }
            // Claude assistant message: model, usage, and tool calls.
            (AGENT_CLAUDE, "assistant") => {
                if let Some(name) = obj.pointer("/message/model").and_then(Value::as_str) {
                    acc.model.get_or_insert_with(|| name.to_string());
                }
                // `insert` returns false for a key seen before, which skips
                // the usage of that record.
                if let Some(usage) = obj.pointer("/message/usage")
                    && claude_seen_usage.insert(claude_usage_key(&obj))
                {
                    let name = obj
                        .pointer("/message/model")
                        .and_then(Value::as_str)
                        .unwrap_or("unknown");
                    add_usage(
                        &mut claude_message_models,
                        name,
                        json_i64(usage, "input_tokens"),
                        json_i64(usage, "output_tokens"),
                        json_i64(usage, "cache_creation_input_tokens"),
                        json_i64(usage, "cache_read_input_tokens"),
                        0,
                    );
                }
                if let Some(items) = obj.pointer("/message/content").and_then(Value::as_array) {
                    for item in items
                        .iter()
                        .filter(|item| item.get("type").and_then(Value::as_str) == Some("tool_use"))
                    {
                        let name = item.get("name").and_then(Value::as_str).unwrap_or("?");
                        acc.add_tool(name);
                        if let Some(fp) = item
                            .pointer("/input/file_path")
                            .and_then(Value::as_str)
                            .filter(|s| !is_noise_path(s))
                        {
                            acc.add_file(fp);
                        }
                    }
                }
            }
            // The Claude prompt sources below only fill an empty preview.
            (AGENT_CLAUDE, "queue-operation") if acc.prompt_preview.is_none() => {
                if obj.get("operation").and_then(Value::as_str) == Some("enqueue")
                    && let Some(text) = obj.get("content").and_then(Value::as_str)
                    && let Some(text) = clean_prompt_text(text)
                {
                    acc.prompt_preview = Some(text);
                }
            }
            (AGENT_CLAUDE, "last-prompt") if acc.prompt_preview.is_none() => {
                if let Some(text) = obj.get("lastPrompt").and_then(Value::as_str)
                    && let Some(text) = clean_prompt_text(text)
                {
                    acc.prompt_preview = Some(text);
                }
            }
            (AGENT_CLAUDE, "user") => {
                // Records recognized as tool results are not used as prompts.
                if acc.prompt_preview.is_none()
                    && !is_claude_tool_result(&obj)
                    && let Some(text) =
                        local_message_preview(obj.pointer("/message/content").unwrap_or(&obj))
                {
                    acc.prompt_preview = Some(text);
                }
            }
            (AGENT_CODEX, "turn_context") => {
                if let Some(name) = obj.pointer("/payload/model").and_then(Value::as_str) {
                    codex_model = name.to_string();
                    acc.model = Some(name.to_string());
                }
            }
            (AGENT_CODEX, "event_msg") => {
                if obj.pointer("/payload/type").and_then(Value::as_str) == Some("token_count")
                    && let Some(usage) = obj.pointer("/payload/info/total_token_usage")
                {
                    let name = if codex_model.is_empty() {
                        "unknown"
                    } else {
                        &codex_model
                    };
                    // `set_usage` replaces the model's entry instead of
                    // adding to it.
                    acc.set_usage(
                        name,
                        json_i64(usage, "input_tokens"),
                        json_i64(usage, "output_tokens"),
                        0,
                        0,
                        json_i64(usage, "total_tokens"),
                    );
                }
            }
            (AGENT_CODEX, "response_item")
                if obj.pointer("/payload/type").and_then(Value::as_str)
                    == Some("function_call") =>
            {
                let name = obj
                    .pointer("/payload/name")
                    .and_then(Value::as_str)
                    .unwrap_or("?");
                acc.add_tool(name);
            }
            // NOTE(review): unlike the Claude arms this overwrites an existing
            // preview, so the last matching Codex record wins — confirm intended.
            (AGENT_CODEX, "message" | "input" | "user") => {
                if let Some(text) = local_message_preview(&obj) {
                    acc.prompt_preview = Some(text);
                }
            }
            // Fallback: any other record whose type mentions "user".
            _ if acc.prompt_preview.is_none() && typ.contains("user") => {
                if let Some(text) = local_message_preview(&obj) {
                    acc.prompt_preview = Some(text);
                }
            }
            _ => {}
        }
    }

    if acc.model_usage.is_empty() {
        acc.model_usage = claude_message_models;
    }
    acc.finish()
}
473
474fn parse_gemini_json(path: &Path, updated: SystemTime, content: &str) -> Option<AgentSession> {
475 let root: Value = serde_json::from_str(content).ok()?;
476 let mut acc = SessionAccumulator::new(AGENT_GEMINI, path, updated);
477 if let Some(id) = root.get("sessionId").and_then(Value::as_str) {
478 acc.session_id = id.to_string();
479 }
480 acc.start_timestamp_ms = root
481 .get("startTime")
482 .and_then(Value::as_str)
483 .and_then(iso_ms);
484 acc.end_timestamp_ms = root
485 .get("lastUpdated")
486 .and_then(Value::as_str)
487 .and_then(iso_ms)
488 .or(acc.start_timestamp_ms);
489 acc.duration_ms = acc
490 .start_timestamp_ms
491 .zip(acc.end_timestamp_ms)
492 .map(|(start, end)| end.saturating_sub(start))
493 .unwrap_or_default();
494
495 let Some(messages) = root.get("messages").and_then(Value::as_array) else {
496 return acc.finish();
497 };
498 for msg in messages {
499 if let Some(ts) = msg.get("timestamp").and_then(Value::as_str) {
500 acc.last_message_at = Some(ts.to_string());
501 }
502 match msg.get("type").and_then(Value::as_str) {
503 Some("user") if acc.prompt_preview.is_none() => {
504 if let Some(text) = local_message_preview(msg.get("content").unwrap_or(msg)) {
505 acc.prompt_preview = Some(text);
506 }
507 }
508 Some("gemini") | Some("assistant") | Some("model") => {
509 if let Some(model) = msg.get("model").and_then(Value::as_str) {
510 acc.model.get_or_insert_with(|| model.to_string());
511 if let Some(tokens) = msg.get("tokens") {
512 acc.add_usage(
513 model,
514 json_i64(tokens, "input"),
515 json_i64(tokens, "output"),
516 0,
517 json_i64(tokens, "cached"),
518 json_i64(tokens, "total"),
519 );
520 }
521 }
522 if let Some(tool_calls) = msg.get("toolCalls").and_then(Value::as_array) {
523 for call in tool_calls {
524 let name = call.get("name").and_then(Value::as_str).unwrap_or("?");
525 acc.add_tool(name);
526 if let Some(path) = find_file_arg(call).filter(|path| !is_noise_path(path))
527 {
528 acc.add_file(path);
529 }
530 }
531 }
532 }
533 _ => {}
534 }
535 }
536 acc.finish()
537}
538
/// Mutable scratch state filled in while a session log is parsed; turned into
/// an [`AgentSession`] by `finish`.
struct SessionAccumulator {
    /// Agent identifier the log is being parsed as.
    agent: String,
    /// Session id; starts as the log's file stem.
    session_id: String,
    /// Normalized log path.
    path: PathBuf,
    /// Modification time supplied by the caller.
    updated: SystemTime,
    /// Session start in epoch milliseconds, when known.
    start_timestamp_ms: Option<u64>,
    /// Session end in epoch milliseconds; starts as `updated`.
    end_timestamp_ms: Option<u64>,
    /// Model name, when one has been seen.
    model: Option<String>,
    /// Token usage keyed by model name.
    model_usage: BTreeMap<String, TokenUsage>,
    /// Tool name mapped to call count.
    tools: BTreeMap<String, usize>,
    /// File path mapped to reference count.
    files: BTreeMap<String, usize>,
    /// Prompt preview text, when one has been found.
    prompt_preview: Option<String>,
    /// Session duration in milliseconds.
    duration_ms: u64,
    /// Working directory recorded in the log.
    cwd: Option<String>,
    /// Raw timestamp string of the last record that carried one.
    last_message_at: Option<String>,
}
555
556impl SessionAccumulator {
557 fn new(agent: &str, path: &Path, updated: SystemTime) -> Self {
558 let normalized = normalize_session_log_path(path);
559 let session_id = path
560 .file_stem()
561 .and_then(|stem| stem.to_str())
562 .unwrap_or("session")
563 .to_string();
564 Self {
565 agent: agent.to_string(),
566 session_id,
567 path: normalized.clone(),
568 updated,
569 start_timestamp_ms: None,
570 end_timestamp_ms: Some(system_time_ms(updated)),
571 model: None,
572 model_usage: BTreeMap::new(),
573 tools: BTreeMap::new(),
574 files: BTreeMap::new(),
575 prompt_preview: None,
576 duration_ms: 0,
577 cwd: None,
578 last_message_at: None,
579 }
580 }
581
582 fn add_usage(
583 &mut self,
584 model: &str,
585 input: i64,
586 output: i64,
587 cache_creation: i64,
588 cache_read: i64,
589 total: i64,
590 ) {
591 add_usage(
592 &mut self.model_usage,
593 model,
594 input,
595 output,
596 cache_creation,
597 cache_read,
598 total,
599 );
600 }
601
602 fn set_usage(
603 &mut self,
604 model: &str,
605 input: i64,
606 output: i64,
607 cache_creation: i64,
608 cache_read: i64,
609 total: i64,
610 ) {
611 let mut usage = TokenUsage::default();
612 usage.add(input, output, cache_creation, cache_read, total);
613 self.model_usage.insert(model.to_string(), usage);
614 }
615
616 fn add_tool(&mut self, name: &str) {
617 *self.tools.entry(name.to_string()).or_default() += 1;
618 }
619
620 fn add_file(&mut self, path: &str) {
621 *self.files.entry(path.to_string()).or_default() += 1;
622 }
623
624 fn finish(self) -> Option<AgentSession> {
625 let token_usage =
626 self.model_usage
627 .values()
628 .fold(TokenUsage::default(), |mut total, usage| {
629 total.input_tokens += usage.input_tokens;
630 total.output_tokens += usage.output_tokens;
631 total.cache_creation_tokens += usage.cache_creation_tokens;
632 total.cache_read_tokens += usage.cache_read_tokens;
633 total.total_tokens += usage.total_tokens;
634 total
635 });
636 if token_usage.total_tokens == 0
637 && self.tools.is_empty()
638 && self.prompt_preview.is_none()
639 && self.model.is_none()
640 {
641 return None;
642 }
643 let display_id = format!("{}:{}", self.agent, short_session_id(&self.session_id));
644 Some(AgentSession {
645 agent: self.agent,
646 session_id: self.session_id,
647 display_id,
648 path: self.path,
649 updated: self.updated,
650 start_timestamp_ms: self
651 .start_timestamp_ms
652 .or_else(|| Some(system_time_ms(self.updated).saturating_sub(self.duration_ms))),
653 end_timestamp_ms: self.end_timestamp_ms,
654 model: self.model,
655 token_usage,
656 model_usage: self.model_usage,
657 tools: self.tools,
658 files: self.files,
659 prompt_preview: self.prompt_preview,
660 duration_ms: self.duration_ms,
661 cwd: self.cwd,
662 last_message_at: self.last_message_at,
663 })
664 }
665}
666
/// Identifies a process by pid together with its start time in ticks.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProcessKey {
    /// Process id.
    pub pid: u32,
    /// Process start time in ticks (unit defined by the producer of this value).
    pub starttime_ticks: u64,
}
672
/// A root process together with the processes grouped under it.
#[derive(Debug, Clone, Default)]
pub struct ProcessTree {
    /// Root process of the tree.
    pub root: ProcessKey,
    /// Processes whose pids and paths are attributed to this tree.
    pub members: Vec<ProcessKey>,
}
678
/// A running agent process tree that may be matched to a session.
#[derive(Debug, Clone, Default)]
pub struct LiveProcessCandidate {
    /// Process tree for this candidate.
    pub tree: ProcessTree,
    /// Agent identifier; compared for equality with the session's agent.
    pub agent: String,
    /// Process age in seconds, when known.
    pub age_s: Option<f64>,
    /// Working directory of the process, when known.
    pub cwd: Option<String>,
}
686
/// The session fields the process matcher needs.
#[derive(Debug, Clone)]
pub struct SessionProcessInput {
    /// Identifier used as the key of the resulting match.
    pub id: String,
    /// Agent identifier; compared for equality with the process's agent.
    pub agent: String,
    /// Path of the session log.
    pub path: PathBuf,
    /// Session start in epoch milliseconds, when known.
    pub start_timestamp_ms: Option<u64>,
    /// Session end in epoch milliseconds, when known.
    pub end_timestamp_ms: Option<u64>,
    /// Working directory recorded for the session, when known.
    pub cwd: Option<String>,
}
696
/// Result of matching one session to one process tree.
#[derive(Debug, Clone)]
pub struct SessionProcessMatch {
    /// Id of the matched session.
    pub session_id: String,
    /// Pid of the matched tree's root process.
    pub root_pid: u32,
    /// Pids of all members of the matched tree.
    pub matched_pids: Vec<u32>,
    /// Start time (ticks) of the root process.
    pub pid_starttime_ticks: u64,
    /// Always [`SOURCE_SESSION_PROCESS_MATCH`] for matches recorded in this file.
    pub source: &'static str,
    /// Confidence score derived from `evidence`.
    pub confidence: f32,
    /// Evidence label (one of the `TRACE_*` constants).
    pub evidence: &'static str,
}
707
/// All matches produced by one matching pass, indexed for lookup.
#[derive(Debug, Default)]
pub struct SessionProcessMatches {
    /// Match details keyed by session id.
    pub by_session_id: HashMap<String, SessionProcessMatch>,
    /// Session id keyed by each matched member pid.
    pub by_pid: HashMap<u32, String>,
    /// Root pids that have already been assigned to a session.
    pub used_root_pids: HashSet<u32>,
}
714
715impl SessionProcessMatches {
716 pub fn session_for_pid(&self, pid: u32) -> Option<&SessionProcessMatch> {
717 self.by_pid
718 .get(&pid)
719 .and_then(|session_id| self.by_session_id.get(session_id))
720 }
721}
722
/// Matches sessions to live processes and remembers path-based bindings
/// between calls.
#[derive(Default)]
pub struct SessionProcessMatcher {
    /// Remembered session binding keyed by root pid.
    bindings: HashMap<u32, LiveSessionBinding>,
}
727
/// A remembered link between a root process and a session log path.
struct LiveSessionBinding {
    /// Start time of the bound root process; guards against pid reuse.
    starttime_ticks: u64,
    /// Normalized path of the bound session log.
    session_path: PathBuf,
}
732
impl SessionProcessMatcher {
    /// Matches each session to at most one process tree.
    ///
    /// Two passes are made. The first assigns, in session order, the first
    /// unused process of the same agent that is fresh enough and for which
    /// `link_trace` reports path evidence or a remembered binding. The second
    /// pass considers the remaining sessions and processes whose working
    /// directories are equal, and assigns them greedily in order of
    /// increasing distance between session end and process start.
    ///
    /// Bindings for processes absent from `processes` are dropped first.
    pub fn match_sessions(
        &mut self,
        sessions: &[SessionProcessInput],
        processes: &[LiveProcessCandidate],
        fd_paths_by_process: &HashMap<ProcessKey, BTreeSet<PathBuf>>,
        observed_path_by_process: &HashMap<ProcessKey, PathBuf>,
        now_ms: u64,
    ) -> SessionProcessMatches {
        let path_evidence =
            collect_path_evidence(processes, fd_paths_by_process, observed_path_by_process);
        self.retain_live(processes);

        let mut out = SessionProcessMatches::default();
        // Pass 1: path evidence and sticky bindings.
        for session in sessions {
            let Some((process, evidence)) = processes.iter().find_map(|process| {
                if out.used_root_pids.contains(&process.tree.root.pid)
                    || process.agent != session.agent
                    || !session_is_fresh_enough_for_process(session, process, now_ms)
                {
                    return None;
                }
                self.link_trace(&session.path, process, &path_evidence)
                    .map(|evidence| (process, evidence))
            }) else {
                continue;
            };
            record_match(&mut out, session, process, evidence);
        }

        // Pass 2: working-directory candidates for still-unmatched sessions.
        let mut cwd_candidates = Vec::new();
        for (session_index, session) in sessions.iter().enumerate() {
            if out.by_session_id.contains_key(&session.id) {
                continue;
            }
            for (process_index, process) in processes.iter().enumerate() {
                if out.used_root_pids.contains(&process.tree.root.pid)
                    || process.agent != session.agent
                    || !self.can_use_cwd_trace(&session.path, process, &path_evidence)
                {
                    continue;
                }
                let Some(distance_ms) = recent_cwd_distance_ms(session, process, now_ms) else {
                    continue;
                };
                // Sort key: smallest distance first, then latest session end,
                // then input order.
                cwd_candidates.push((
                    distance_ms,
                    std::cmp::Reverse(session_end_ms(session)),
                    session_index,
                    process_index,
                ));
            }
        }
        cwd_candidates.sort_unstable();
        for (_, _, session_index, process_index) in cwd_candidates {
            let session = &sessions[session_index];
            let process = &processes[process_index];
            // Either side may have been claimed by a better candidate already.
            if out.by_session_id.contains_key(&session.id)
                || out.used_root_pids.contains(&process.tree.root.pid)
            {
                continue;
            }
            record_match(&mut out, session, process, TRACE_RECENT_CWD);
        }
        out
    }

    /// Drops bindings whose root process (same pid and start time) is not in
    /// `processes`.
    fn retain_live(&mut self, processes: &[LiveProcessCandidate]) {
        self.bindings.retain(|pid, binding| {
            processes.iter().any(|process| {
                process.tree.root.pid == *pid
                    && process.tree.root.starttime_ticks == binding.starttime_ticks
            })
        });
    }

    /// Returns the evidence label linking `session_path` to `process`, if any.
    ///
    /// When path evidence exists for the process it is authoritative: a hit
    /// records or refreshes the binding and returns its trace label, a miss
    /// removes any binding and returns `None`. Without path evidence, a
    /// remembered binding with the same start time and path yields
    /// [`TRACE_STICKY_BINDING`].
    fn link_trace(
        &mut self,
        session_path: &Path,
        process: &LiveProcessCandidate,
        path_evidence: &HashMap<u32, BTreeMap<PathBuf, &'static str>>,
    ) -> Option<&'static str> {
        let pid = process.tree.root.pid;
        let path = normalize_session_log_path(session_path);
        if let Some(evidence) = path_evidence.get(&pid) {
            if let Some(trace) = evidence.get(&path).copied() {
                self.bindings.insert(
                    pid,
                    LiveSessionBinding {
                        starttime_ticks: process.tree.root.starttime_ticks,
                        session_path: path,
                    },
                );
                return Some(trace);
            }
            // The process has path evidence, but not for this session.
            self.bindings.remove(&pid);
            return None;
        }
        self.bindings
            .get(&pid)
            .filter(|binding| {
                binding.starttime_ticks == process.tree.root.starttime_ticks
                    && binding.session_path == path
            })
            .map(|_| TRACE_STICKY_BINDING)
    }

    /// Reports whether a cwd-based match between `session_path` and `process`
    /// is allowed.
    ///
    /// It is refused when the process has any path evidence, or when a binding
    /// with the same start time ties the process to a different session path.
    fn can_use_cwd_trace(
        &self,
        session_path: &Path,
        process: &LiveProcessCandidate,
        path_evidence: &HashMap<u32, BTreeMap<PathBuf, &'static str>>,
    ) -> bool {
        let pid = process.tree.root.pid;
        if path_evidence.contains_key(&pid) {
            return false;
        }
        let path = normalize_session_log_path(session_path);
        !self.bindings.get(&pid).is_some_and(|binding| {
            binding.starttime_ticks == process.tree.root.starttime_ticks
                && binding.session_path != path
        })
    }
}
857
858fn record_match(
859 out: &mut SessionProcessMatches,
860 session: &SessionProcessInput,
861 process: &LiveProcessCandidate,
862 evidence: &'static str,
863) {
864 let matched_pids = process
865 .tree
866 .members
867 .iter()
868 .map(|key| key.pid)
869 .collect::<Vec<_>>();
870 out.used_root_pids.insert(process.tree.root.pid);
871 for pid in &matched_pids {
872 out.by_pid.insert(*pid, session.id.clone());
873 }
874 out.by_session_id.insert(
875 session.id.clone(),
876 SessionProcessMatch {
877 session_id: session.id.clone(),
878 root_pid: process.tree.root.pid,
879 matched_pids,
880 pid_starttime_ticks: process.tree.root.starttime_ticks,
881 source: SOURCE_SESSION_PROCESS_MATCH,
882 confidence: confidence_for_evidence(evidence),
883 evidence,
884 },
885 );
886}
887
888fn collect_path_evidence(
889 processes: &[LiveProcessCandidate],
890 fd_paths_by_process: &HashMap<ProcessKey, BTreeSet<PathBuf>>,
891 observed_path_by_process: &HashMap<ProcessKey, PathBuf>,
892) -> HashMap<u32, BTreeMap<PathBuf, &'static str>> {
893 let mut out = HashMap::new();
894 for process in processes {
895 let mut evidence = BTreeMap::new();
896 for key in &process.tree.members {
897 if let Some(paths) = fd_paths_by_process.get(key) {
898 for path in paths {
899 if let Some(session_path) = session_log_path_from_str(&path.to_string_lossy()) {
900 evidence.entry(session_path).or_insert(TRACE_PROC_FD);
901 }
902 }
903 }
904 if let Some(path) = observed_path_by_process.get(key)
905 && let Some(session_path) = session_log_path_from_str(&path.to_string_lossy())
906 {
907 evidence.insert(session_path, TRACE_EBPF_FILE);
908 }
909 }
910 if !evidence.is_empty() {
911 out.insert(process.tree.root.pid, evidence);
912 }
913 }
914 out
915}
916
917fn session_is_fresh_enough_for_process(
918 session: &SessionProcessInput,
919 process: &LiveProcessCandidate,
920 now_ms: u64,
921) -> bool {
922 let Some(process_start_ms) = process_start_ms(process, now_ms) else {
923 return true;
924 };
925 session_end_ms(session).saturating_add(SESSION_PROCESS_START_SKEW_MS) >= process_start_ms
926}
927
928fn recent_cwd_distance_ms(
929 session: &SessionProcessInput,
930 process: &LiveProcessCandidate,
931 now_ms: u64,
932) -> Option<u64> {
933 let session_cwd = session.cwd.as_deref().filter(|value| !value.is_empty())?;
934 let process_cwd = process.cwd.as_deref().filter(|value| !value.is_empty())?;
935 if session_cwd != process_cwd {
936 return None;
937 }
938 let process_start_ms = process_start_ms(process, now_ms)?;
939 let session_end_ms = session_end_ms(session);
940 (session_end_ms.saturating_add(SESSION_PROCESS_START_SKEW_MS) >= process_start_ms)
941 .then_some(session_end_ms.abs_diff(process_start_ms))
942}
943
944fn process_start_ms(process: &LiveProcessCandidate, now_ms: u64) -> Option<u64> {
945 process
946 .age_s
947 .map(|age_s| now_ms.saturating_sub((age_s.max(0.0) * 1000.0).round() as u64))
948}
949
950fn session_end_ms(session: &SessionProcessInput) -> u64 {
951 session
952 .end_timestamp_ms
953 .or(session.start_timestamp_ms)
954 .unwrap_or_default()
955}
956
957fn confidence_for_evidence(evidence: &str) -> f32 {
958 match evidence {
959 TRACE_EBPF_FILE => 0.95,
960 TRACE_PROC_FD => 0.90,
961 TRACE_STICKY_BINDING => 0.70,
962 TRACE_RECENT_CWD => 0.55,
963 _ => 0.50,
964 }
965}
966
967fn walk_agent_files(agent: &'static str, dir: &Path, f: &mut dyn FnMut(&Path, &fs::Metadata)) {
968 let Ok(entries) = fs::read_dir(dir) else {
969 return;
970 };
971 for entry in entries.flatten() {
972 let path = entry.path();
973 if path.is_dir() {
974 walk_agent_files(agent, &path, f);
975 } else if is_agent_file_for(agent, &path)
976 && let Ok(meta) = path.metadata()
977 {
978 f(&path, &meta);
979 }
980 }
981}
982
983fn is_agent_session_file(path: &Path) -> bool {
984 agent_source_for_path(path).is_some()
985}
986
987fn is_agent_file_for(agent: &str, path: &Path) -> bool {
988 match agent {
989 AGENT_CLAUDE | AGENT_CODEX => {
990 path.extension().and_then(|ext| ext.to_str()) == Some("jsonl")
991 }
992 AGENT_GEMINI => {
993 path.extension().and_then(|ext| ext.to_str()) == Some("json")
994 && path
995 .file_name()
996 .and_then(|name| name.to_str())
997 .is_some_and(|name| name.starts_with("session-"))
998 && path.to_string_lossy().contains("/chats/")
999 }
1000 _ => false,
1001 }
1002}
1003
1004fn user_home_dir() -> Option<PathBuf> {
1005 std::env::var("SUDO_USER")
1006 .ok()
1007 .and_then(|user| {
1008 fs::read_to_string("/etc/passwd").ok().and_then(|passwd| {
1009 passwd
1010 .lines()
1011 .find(|line| line.starts_with(&format!("{user}:")))
1012 .and_then(|line| line.split(':').nth(5))
1013 .map(PathBuf::from)
1014 })
1015 })
1016 .or_else(dirs::home_dir)
1017}
1018
1019fn add_usage(
1020 models: &mut BTreeMap<String, TokenUsage>,
1021 model: &str,
1022 input: i64,
1023 output: i64,
1024 cache_creation: i64,
1025 cache_read: i64,
1026 total: i64,
1027) {
1028 models.entry(model.to_string()).or_default().add(
1029 input,
1030 output,
1031 cache_creation,
1032 cache_read,
1033 total,
1034 );
1035}
1036
1037fn local_session_id(obj: &Value) -> Option<String> {
1038 for key in ["sessionId", "session_id", "conversation_id"] {
1039 if let Some(value) = obj.get(key).and_then(Value::as_str)
1040 && !value.is_empty()
1041 {
1042 return Some(value.to_string());
1043 }
1044 }
1045 for pointer in ["/payload/session_id", "/payload/sessionId"] {
1046 if let Some(value) = obj.pointer(pointer).and_then(Value::as_str)
1047 && !value.is_empty()
1048 {
1049 return Some(value.to_string());
1050 }
1051 }
1052 None
1053}
1054
/// Strips one recognized leading option from `args`.
///
/// Returns the text after a bare flag, or after a value-taking option and its
/// value. Returns `None` when the first token is not a recognized option, or
/// when a value-taking option's value is not followed by whitespace.
fn strip_codex_exec_option(args: &str) -> Option<&str> {
    const BARE_FLAGS: [&str; 3] = ["--json", "--skip-git-repo-check", "--ephemeral"];
    const VALUE_OPTIONS: [&str; 6] = ["-C", "-a", "-s", "-m", "-c", "-p"];

    let (head, rest) = match args.split_once(char::is_whitespace) {
        Some(parts) => parts,
        None => (args, ""),
    };
    if BARE_FLAGS.contains(&head) {
        return Some(rest);
    }
    if !VALUE_OPTIONS.contains(&head) {
        return None;
    }
    let (_value, tail) = rest.trim_start().split_once(char::is_whitespace)?;
    Some(tail)
}
1066
1067fn claude_usage_key(obj: &Value) -> String {
1068 obj.get("requestId")
1069 .or_else(|| obj.pointer("/message/id"))
1070 .or_else(|| obj.get("uuid"))
1071 .and_then(Value::as_str)
1072 .unwrap_or("usage")
1073 .to_string()
1074}
1075
1076fn local_message_preview(value: &Value) -> Option<String> {
1077 let mut parts = Vec::new();
1078 collect_local_text(value, &mut parts);
1079 clean_prompt_text(&parts.join(" "))
1080}
1081
1082fn collect_local_text(value: &Value, out: &mut Vec<String>) {
1083 match value {
1084 Value::String(text) => out.push(text.clone()),
1085 Value::Array(items) => {
1086 for item in items {
1087 collect_local_text(item, out);
1088 }
1089 }
1090 Value::Object(obj) => {
1091 if obj.get("type").and_then(Value::as_str).is_some_and(|typ| {
1092 typ == "tool_use" || typ == "function_call" || typ == "tool_result"
1093 }) {
1094 return;
1095 }
1096 for key in ["text", "content", "message", "input", "prompt"] {
1097 if let Some(value) = obj.get(key) {
1098 collect_local_text(value, out);
1099 }
1100 }
1101 }
1102 _ => {}
1103 }
1104}
1105
1106fn is_claude_tool_result(obj: &Value) -> bool {
1107 obj.get("toolUseResult").is_some()
1108 || obj.get("tool_use_result").is_some()
1109 || obj
1110 .pointer("/message/content")
1111 .and_then(Value::as_array)
1112 .is_some_and(|items| {
1113 items
1114 .iter()
1115 .any(|item| item.get("type").and_then(Value::as_str) == Some("tool_result"))
1116 })
1117}
1118
1119fn find_file_arg(value: &Value) -> Option<&str> {
1120 match value {
1121 Value::Object(obj) => {
1122 for key in ["file_path", "path", "filepath"] {
1123 if let Some(path) = obj.get(key).and_then(Value::as_str) {
1124 return Some(path);
1125 }
1126 }
1127 obj.values().find_map(find_file_arg)
1128 }
1129 Value::Array(items) => items.iter().find_map(find_file_arg),
1130 _ => None,
1131 }
1132}
1133
/// Reports whether `path` contains any marker of a location that should not
/// be listed among a session's touched files.
fn is_noise_path(path: &str) -> bool {
    for marker in [
        "/.claude/",
        "/.codex/",
        "/.gemini/",
        "/.git/",
        "/node_modules/",
        "/.npm/",
        "/.cache/",
        "CLAUDE.md",
        "AGENTS.md",
    ] {
        if path.contains(marker) {
            return true;
        }
    }
    false
}
1148
/// Normalizes prompt text for display.
///
/// Runs of whitespace collapse to single spaces. When the collapsed text is
/// wrapped in both `<session>` and `</session>`, the wrapper is removed. The
/// result is trimmed; `None` is returned when nothing remains.
fn clean_prompt_text(text: &str) -> Option<String> {
    let mut collapsed = String::with_capacity(text.len());
    for word in text.split_whitespace() {
        if !collapsed.is_empty() {
            collapsed.push(' ');
        }
        collapsed.push_str(word);
    }

    // The wrapper is removed only when both the opening and closing tag match.
    let unwrapped = match collapsed.strip_prefix("<session>") {
        Some(after_open) => after_open
            .strip_suffix("</session>")
            .unwrap_or(collapsed.as_str()),
        None => collapsed.as_str(),
    };

    let cleaned = unwrapped.trim();
    if cleaned.is_empty() {
        None
    } else {
        Some(cleaned.to_string())
    }
}
1158
/// Shortens a session id for display.
///
/// The id is trimmed, reduced to its last `/`- or `\`-separated segment, and
/// stripped of trailing `.jsonl`. Results of up to 12 characters are returned
/// as is; longer ones become the first 6 characters, a dot, and the last 5.
/// A blank id yields `"session"`.
fn short_session_id(id: &str) -> String {
    const MAX_SESSION_ID_CHARS: usize = 12;
    const HEAD_CHARS: usize = 6;
    const TAIL_CHARS: usize = 5;

    let trimmed = id.trim();
    if trimmed.is_empty() {
        return "session".to_string();
    }
    let last_segment = trimmed.rsplit(['/', '\\']).next().unwrap_or(trimmed);
    let compact = last_segment.trim_end_matches(".jsonl");

    // Work on characters rather than bytes so multi-byte ids are never split.
    let chars: Vec<char> = compact.chars().collect();
    if chars.len() <= MAX_SESSION_ID_CHARS {
        return compact.to_string();
    }
    let head: String = chars[..HEAD_CHARS].iter().collect();
    let tail: String = chars[chars.len() - TAIL_CHARS..].iter().collect();
    format!("{head}.{tail}")
}
1184
1185fn json_i64(value: &Value, key: &str) -> i64 {
1186 value.get(key).and_then(Value::as_i64).unwrap_or(0)
1187}
1188
1189fn json_u64(value: &Value, key: &str) -> u64 {
1190 value.get(key).and_then(Value::as_u64).unwrap_or(0)
1191}
1192
1193fn iso_ms(value: &str) -> Option<u64> {
1194 chrono::DateTime::parse_from_rfc3339(value)
1195 .ok()
1196 .and_then(|ts| u64::try_from(ts.timestamp_millis()).ok())
1197}
1198
/// Converts a `SystemTime` to milliseconds since the Unix epoch.
///
/// Times before the epoch map to `0`. Millisecond counts that do not fit in
/// a `u64` saturate at `u64::MAX` instead of being silently truncated by an
/// `as` cast.
fn system_time_ms(value: SystemTime) -> u64 {
    let millis = value
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    u64::try_from(millis).unwrap_or(u64::MAX)
}