Skip to main content

agent_session/
lib.rs

1// SPDX-License-Identifier: MIT
2// Copyright (c) 2026 eunomia-bpf org.
3
4//! Portable session IR, parsers, discovery, and process matching for local AI
5//! coding-agent transcripts.
6//!
7//! The crate currently normalizes Claude Code, Codex, and Gemini CLI sessions.
8//! It intentionally stops at session data and process/session correlation; UI,
9//! database storage, eBPF collection, and OpenTelemetry export belong in
10//! applications that consume this crate.
11
12use serde::{Deserialize, Serialize};
13use serde_json::Value;
14use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
15use std::fs;
16use std::path::{Path, PathBuf};
17use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
18
/// Agent identifier for Claude Code transcripts (discovered under `.claude/projects`).
pub const AGENT_CLAUDE: &str = "claude";
/// Agent identifier for Codex transcripts (discovered under `.codex/sessions`).
pub const AGENT_CODEX: &str = "codex";
/// Agent identifier for Gemini CLI transcripts (discovered under `.gemini/tmp`).
pub const AGENT_GEMINI: &str = "gemini";

/// Evidence label used when a session path came from the observed-path map.
pub const TRACE_EBPF_FILE: &str = "ebpf_file";
/// Evidence label used when a session path came from a process's fd path set.
pub const TRACE_PROC_FD: &str = "proc_fd";
/// Evidence label used when a remembered binding, not fresh path evidence, links a process.
pub const TRACE_STICKY_BINDING: &str = "sticky";
/// Evidence label used when a session is linked through the recent-cwd fallback pass.
pub const TRACE_RECENT_CWD: &str = "cwd_recent";
/// Value written to `SessionProcessMatch::source` for every match produced here.
pub const SOURCE_SESSION_PROCESS_MATCH: &str = "agent_session.process_match";

// Not referenced in this part of the file; presumably the start-time tolerance (in ms) used
// by the session/process freshness checks further down — TODO confirm.
const SESSION_PROCESS_START_SKEW_MS: u64 = 30_000;
30
31#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
32pub struct TokenUsage {
33    pub input_tokens: i64,
34    pub output_tokens: i64,
35    pub cache_creation_tokens: i64,
36    pub cache_read_tokens: i64,
37    pub total_tokens: i64,
38}
39
40impl TokenUsage {
41    fn add(&mut self, input: i64, output: i64, cache_creation: i64, cache_read: i64, total: i64) {
42        self.input_tokens += input;
43        self.output_tokens += output;
44        self.cache_creation_tokens += cache_creation;
45        self.cache_read_tokens += cache_read;
46        self.total_tokens += if total > 0 {
47            total
48        } else {
49            input + output + cache_creation + cache_read
50        };
51    }
52}
53
/// Normalized summary of one agent transcript, as produced by the parsers in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentSession {
    /// Agent identifier the transcript was parsed as.
    pub agent: String,
    /// Session id found in the transcript, or the file stem when none was found.
    pub session_id: String,
    /// `"{agent}:{short id}"`, where the short id comes from `short_session_id`.
    pub display_id: String,
    /// Transcript path after `normalize_session_log_path`.
    pub path: PathBuf,
    /// Timestamp supplied by the caller; the discovery code passes the file's mtime.
    pub updated: SystemTime,
    /// Parsed start time, or `updated` minus `duration_ms` when the transcript had none.
    pub start_timestamp_ms: Option<u64>,
    /// End time in milliseconds, when one could be determined.
    pub end_timestamp_ms: Option<u64>,
    /// Model name reported by the transcript, if any.
    pub model: Option<String>,
    /// Field-wise sum of every entry in `model_usage`.
    pub token_usage: TokenUsage,
    /// Token counters keyed by model name.
    pub model_usage: BTreeMap<String, TokenUsage>,
    /// Number of calls recorded per tool name.
    pub tools: BTreeMap<String, usize>,
    /// Number of tool calls that referenced each file path.
    pub files: BTreeMap<String, usize>,
    /// Preview text of a user prompt, if one was found.
    pub prompt_preview: Option<String>,
    /// Session duration in milliseconds; 0 when the transcript did not provide one.
    pub duration_ms: u64,
    /// Working directory recorded in the transcript, if any.
    pub cwd: Option<String>,
    /// Raw timestamp string of the last record that carried one.
    pub last_message_at: Option<String>,
}
73
/// A transcript file found on disk that has not been parsed yet.
#[derive(Debug, Clone)]
pub struct SessionCandidate {
    /// Agent identifier of the directory tree the file was found in.
    pub agent: &'static str,
    /// Location of the transcript file.
    pub path: PathBuf,
    /// File modification time, or `UNIX_EPOCH` when it could not be read.
    pub updated: SystemTime,
}
80
/// Aggregate size information for one agent's session directory.
#[derive(Debug, Clone)]
pub struct SessionDirStat {
    /// Agent identifier the directory belongs to.
    pub agent: &'static str,
    /// Root directory that was walked.
    pub dir: PathBuf,
    /// Number of files reported by the walk.
    pub sessions: usize,
    /// Sum of the reported files' lengths in bytes.
    pub bytes: u64,
}
88
89#[derive(Default)]
90pub struct SessionCache {
91    entries: HashMap<PathBuf, CacheEntry>,
92    cached_sessions: Vec<AgentSession>,
93    last_refresh: Option<Instant>,
94    last_limit: usize,
95}
96
97struct CacheEntry {
98    mtime: SystemTime,
99    session: Option<AgentSession>,
100}
101
102impl SessionCache {
103    pub fn new() -> Self {
104        Self::default()
105    }
106
107    pub fn discover_cached(&mut self, limit: usize, max_age: Duration) -> Vec<AgentSession> {
108        let target = limit.clamp(1, 25);
109        if self.last_limit < target
110            || self
111                .last_refresh
112                .is_none_or(|last| last.elapsed() >= max_age)
113        {
114            self.refresh(target);
115        }
116        self.cached_sessions.iter().take(target).cloned().collect()
117    }
118
119    fn refresh(&mut self, limit: usize) {
120        let mut candidates = discover_session_files();
121        candidates.sort_by_key(|candidate| std::cmp::Reverse(candidate.updated));
122        let target = limit.clamp(1, 25);
123        let mut live_paths = HashSet::new();
124        let mut sessions = Vec::new();
125        let mut seen = HashSet::new();
126
127        for candidate in candidates
128            .into_iter()
129            .take(target.saturating_mul(3).clamp(10, 75))
130        {
131            live_paths.insert(candidate.path.clone());
132            let session = match self.entries.get(&candidate.path) {
133                Some(entry) if entry.mtime == candidate.updated => entry.session.clone(),
134                _ => {
135                    let parsed = parse_session_file(&candidate);
136                    self.entries.insert(
137                        candidate.path.clone(),
138                        CacheEntry {
139                            mtime: candidate.updated,
140                            session: parsed.clone(),
141                        },
142                    );
143                    parsed
144                }
145            };
146            if let Some(session) = session
147                && seen.insert(session.display_id.clone())
148            {
149                sessions.push(session);
150                if sessions.len() >= target {
151                    break;
152                }
153            }
154        }
155        self.entries.retain(|path, _| live_paths.contains(path));
156        self.cached_sessions = sessions;
157        self.last_refresh = Some(Instant::now());
158        self.last_limit = target;
159    }
160}
161
162pub fn discover_session_files() -> Vec<SessionCandidate> {
163    user_home_dir()
164        .as_deref()
165        .map(discover_session_files_in_home)
166        .unwrap_or_default()
167}
168
169pub fn discover_session_files_in_home(home: &Path) -> Vec<SessionCandidate> {
170    let roots = [
171        (AGENT_CLAUDE, home.join(".claude/projects")),
172        (AGENT_CODEX, home.join(".codex/sessions")),
173        (AGENT_GEMINI, home.join(".gemini/tmp")),
174    ];
175    let mut out = Vec::new();
176    for (agent, dir) in roots {
177        walk_agent_files(agent, &dir, &mut |path, meta| {
178            out.push(SessionCandidate {
179                agent,
180                path: path.to_path_buf(),
181                updated: meta.modified().unwrap_or(UNIX_EPOCH),
182            });
183        });
184    }
185    out
186}
187
188pub fn count_session_dirs() -> Vec<SessionDirStat> {
189    let Some(home) = user_home_dir() else {
190        return Vec::new();
191    };
192    [
193        (AGENT_CLAUDE, home.join(".claude/projects")),
194        (AGENT_CODEX, home.join(".codex/sessions")),
195        (AGENT_GEMINI, home.join(".gemini/tmp")),
196    ]
197    .into_iter()
198    .filter_map(|(agent, dir)| {
199        let (mut sessions, mut bytes) = (0usize, 0u64);
200        walk_agent_files(agent, &dir, &mut |_, meta| {
201            sessions += 1;
202            bytes += meta.len();
203        });
204        (sessions > 0).then_some(SessionDirStat {
205            agent,
206            dir,
207            sessions,
208            bytes,
209        })
210    })
211    .collect()
212}
213
214pub fn parse_session_file(candidate: &SessionCandidate) -> Option<AgentSession> {
215    let content = fs::read_to_string(&candidate.path).ok()?;
216    parse_session_content(
217        candidate.agent,
218        &candidate.path,
219        candidate.updated,
220        &content,
221    )
222}
223
224pub fn parse_session_path(path: &Path) -> Option<AgentSession> {
225    let agent = agent_source_for_path(path)?;
226    let updated = fs::metadata(path)
227        .and_then(|metadata| metadata.modified())
228        .unwrap_or(UNIX_EPOCH);
229    parse_session_file(&SessionCandidate {
230        agent,
231        path: path.to_path_buf(),
232        updated,
233    })
234}
235
236pub fn parse_session_content(
237    agent: &str,
238    path: &Path,
239    updated: SystemTime,
240    content: &str,
241) -> Option<AgentSession> {
242    if agent == AGENT_GEMINI {
243        parse_gemini_json(path, updated, content)
244    } else {
245        parse_jsonl(agent, path, updated, content)
246    }
247}
248
249pub fn session_log_path_from_str(raw: &str) -> Option<PathBuf> {
250    let trimmed = raw.trim().trim_end_matches(" (deleted)");
251    if trimmed.is_empty() {
252        return None;
253    }
254    let path = Path::new(trimmed);
255    if !path.is_absolute() || !is_agent_session_file(path) {
256        return None;
257    }
258    agent_source_for_path(path).map(|_| normalize_session_log_path(path))
259}
260
/// Canonicalizes `path`, returning it unchanged when canonicalization fails
/// (for example because the file no longer exists).
pub fn normalize_session_log_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
264
265pub fn agent_source_for_path(path: &Path) -> Option<&'static str> {
266    let value = path.to_string_lossy();
267    if value.contains("/.claude/") && path.extension().and_then(|ext| ext.to_str()) == Some("jsonl")
268    {
269        Some(AGENT_CLAUDE)
270    } else if value.contains("/.codex/")
271        && path.extension().and_then(|ext| ext.to_str()) == Some("jsonl")
272    {
273        Some(AGENT_CODEX)
274    } else if value.contains("/.gemini/")
275        && path.extension().and_then(|ext| ext.to_str()) == Some("json")
276    {
277        Some(AGENT_GEMINI)
278    } else {
279        None
280    }
281}
282
/// Returns a fixed transcript path under `home` for the given agent, or `None` for an
/// unknown agent. The path is only built, never created.
pub fn fixture_session_path(agent: &str, home: &Path) -> Option<PathBuf> {
    let relative = match agent {
        AGENT_CLAUDE => ".claude/projects/test/session.jsonl",
        AGENT_CODEX => ".codex/sessions/2026/06/02/session.jsonl",
        AGENT_GEMINI => ".gemini/tmp/test/chats/session-test.json",
        _ => return None,
    };
    Some(home.join(relative))
}
291
/// Reports whether `target` names a `codex` executable outside any `node_modules` tree.
///
/// The final path component must be exactly `codex`, and the full string must not contain
/// `/node_modules/`. `None` is never an entrypoint.
pub fn is_codex_cli_entrypoint(target: Option<&str>) -> bool {
    match target {
        None => false,
        Some(raw) => {
            let basename = Path::new(raw).file_name().and_then(|name| name.to_str());
            matches!(basename, Some("codex")) && !raw.contains("/node_modules/")
        }
    }
}
298
299pub fn codex_exec_prompt(command: &str) -> Option<String> {
300    let mut args = command.split_once(" exec ")?.1.trim();
301    while let Some(rest) = strip_codex_exec_option(args) {
302        args = rest.trim_start();
303    }
304    (!args.starts_with('-'))
305        .then(|| args.trim_matches(['"', '\'']))
306        .and_then(clean_prompt_text)
307}
308
/// Parses a line-delimited JSON transcript (Claude Code or Codex) into an `AgentSession`.
///
/// Each line is decoded independently and lines that are not valid JSON are skipped.
/// Session id, working directory and timestamps are collected from every record; the rest
/// is dispatched on the `(agent, record type)` pair. Returns whatever
/// `SessionAccumulator::finish` returns.
fn parse_jsonl(
    agent: &str,
    path: &Path,
    updated: SystemTime,
    content: &str,
) -> Option<AgentSession> {
    let mut acc = SessionAccumulator::new(agent, path, updated);
    // Model named by the most recent Codex `turn_context`; token counts are filed under it.
    let mut codex_model = String::new();
    // Usage gathered from Claude assistant messages; used only if no `result` record
    // supplied `modelUsage` (see the end of this function).
    let mut claude_message_models = BTreeMap::<String, TokenUsage>::new();
    // Usage keys already counted, so a repeated key is not added twice.
    let mut claude_seen_usage = HashSet::new();

    for line in content.lines() {
        let Ok(obj) = serde_json::from_str::<Value>(line) else {
            continue;
        };
        // A later record's id replaces an earlier one.
        if let Some(id) = local_session_id(&obj) {
            acc.session_id = id;
        }
        // First non-empty cwd wins; checked at the top level, then under `payload`.
        if acc.cwd.is_none() {
            acc.cwd = obj
                .get("cwd")
                .and_then(Value::as_str)
                .or_else(|| obj.pointer("/payload/cwd").and_then(Value::as_str))
                .filter(|s| !s.is_empty())
                .map(ToString::to_string);
        }
        if let Some(ts) = obj.get("timestamp").and_then(Value::as_str) {
            acc.last_message_at = Some(ts.to_string());
            // Keep the previous end time when this timestamp does not parse.
            acc.end_timestamp_ms = iso_ms(ts).or(acc.end_timestamp_ms);
        }
        let typ = obj.get("type").and_then(Value::as_str).unwrap_or("");
        match (agent, typ) {
            // Claude summary record: duration plus per-model usage totals.
            (AGENT_CLAUDE, "result") => {
                acc.duration_ms = json_u64(&obj, "duration_ms");
                if let Some(model_usage) = obj.get("modelUsage").and_then(Value::as_object) {
                    for (name, usage) in model_usage {
                        acc.model.get_or_insert_with(|| name.clone());
                        acc.add_usage(
                            name,
                            json_i64(usage, "inputTokens"),
                            json_i64(usage, "outputTokens"),
                            json_i64(usage, "cacheCreationInputTokens"),
                            json_i64(usage, "cacheReadInputTokens"),
                            0,
                        );
                    }
                }
            }
            // Claude assistant message: model, per-message usage, tool calls and files.
            (AGENT_CLAUDE, "assistant") => {
                if let Some(name) = obj.pointer("/message/model").and_then(Value::as_str) {
                    acc.model.get_or_insert_with(|| name.to_string());
                }
                if let Some(usage) = obj.pointer("/message/usage")
                    && claude_seen_usage.insert(claude_usage_key(&obj))
                {
                    let name = obj
                        .pointer("/message/model")
                        .and_then(Value::as_str)
                        .unwrap_or("unknown");
                    add_usage(
                        &mut claude_message_models,
                        name,
                        json_i64(usage, "input_tokens"),
                        json_i64(usage, "output_tokens"),
                        json_i64(usage, "cache_creation_input_tokens"),
                        json_i64(usage, "cache_read_input_tokens"),
                        0,
                    );
                }
                if let Some(items) = obj.pointer("/message/content").and_then(Value::as_array) {
                    for item in items
                        .iter()
                        .filter(|item| item.get("type").and_then(Value::as_str) == Some("tool_use"))
                    {
                        let name = item.get("name").and_then(Value::as_str).unwrap_or("?");
                        acc.add_tool(name);
                        if let Some(fp) = item
                            .pointer("/input/file_path")
                            .and_then(Value::as_str)
                            .filter(|s| !is_noise_path(s))
                        {
                            acc.add_file(fp);
                        }
                    }
                }
            }
            // The Claude prompt arms below only fill the preview while it is still empty.
            (AGENT_CLAUDE, "queue-operation") if acc.prompt_preview.is_none() => {
                if obj.get("operation").and_then(Value::as_str) == Some("enqueue")
                    && let Some(text) = obj.get("content").and_then(Value::as_str)
                    && let Some(text) = clean_prompt_text(text)
                {
                    acc.prompt_preview = Some(text);
                }
            }
            (AGENT_CLAUDE, "last-prompt") if acc.prompt_preview.is_none() => {
                if let Some(text) = obj.get("lastPrompt").and_then(Value::as_str)
                    && let Some(text) = clean_prompt_text(text)
                {
                    acc.prompt_preview = Some(text);
                }
            }
            (AGENT_CLAUDE, "user") => {
                if acc.prompt_preview.is_none()
                    && !is_claude_tool_result(&obj)
                    && let Some(text) =
                        local_message_preview(obj.pointer("/message/content").unwrap_or(&obj))
                {
                    acc.prompt_preview = Some(text);
                }
            }
            // Codex announces the active model here; the latest announcement wins.
            (AGENT_CODEX, "turn_context") => {
                if let Some(name) = obj.pointer("/payload/model").and_then(Value::as_str) {
                    codex_model = name.to_string();
                    acc.model = Some(name.to_string());
                }
            }
            // Codex token counts are read from `total_token_usage`, so each record replaces
            // the stored usage for the model (`set_usage`) instead of adding to it.
            (AGENT_CODEX, "event_msg") => {
                if obj.pointer("/payload/type").and_then(Value::as_str) == Some("token_count")
                    && let Some(usage) = obj.pointer("/payload/info/total_token_usage")
                {
                    let name = if codex_model.is_empty() {
                        "unknown"
                    } else {
                        &codex_model
                    };
                    acc.set_usage(
                        name,
                        json_i64(usage, "input_tokens"),
                        json_i64(usage, "output_tokens"),
                        0,
                        0,
                        json_i64(usage, "total_tokens"),
                    );
                }
            }
            (AGENT_CODEX, "response_item")
                if obj.pointer("/payload/type").and_then(Value::as_str)
                    == Some("function_call") =>
            {
                let name = obj
                    .pointer("/payload/name")
                    .and_then(Value::as_str)
                    .unwrap_or("?");
                acc.add_tool(name);
            }
            // NOTE(review): unlike every other preview arm, this one is not guarded by
            // `prompt_preview.is_none()`, so the last matching record wins — confirm intent.
            (AGENT_CODEX, "message" | "input" | "user") => {
                if let Some(text) = local_message_preview(&obj) {
                    acc.prompt_preview = Some(text);
                }
            }
            // Fallback for any other record whose type mentions "user".
            _ if acc.prompt_preview.is_none() && typ.contains("user") => {
                if let Some(text) = local_message_preview(&obj) {
                    acc.prompt_preview = Some(text);
                }
            }
            _ => {}
        }
    }

    // Fall back to per-message usage only when nothing else recorded any usage.
    if acc.model_usage.is_empty() {
        acc.model_usage = claude_message_models;
    }
    acc.finish()
}
473
474fn parse_gemini_json(path: &Path, updated: SystemTime, content: &str) -> Option<AgentSession> {
475    let root: Value = serde_json::from_str(content).ok()?;
476    let mut acc = SessionAccumulator::new(AGENT_GEMINI, path, updated);
477    if let Some(id) = root.get("sessionId").and_then(Value::as_str) {
478        acc.session_id = id.to_string();
479    }
480    acc.start_timestamp_ms = root
481        .get("startTime")
482        .and_then(Value::as_str)
483        .and_then(iso_ms);
484    acc.end_timestamp_ms = root
485        .get("lastUpdated")
486        .and_then(Value::as_str)
487        .and_then(iso_ms)
488        .or(acc.start_timestamp_ms);
489    acc.duration_ms = acc
490        .start_timestamp_ms
491        .zip(acc.end_timestamp_ms)
492        .map(|(start, end)| end.saturating_sub(start))
493        .unwrap_or_default();
494
495    let Some(messages) = root.get("messages").and_then(Value::as_array) else {
496        return acc.finish();
497    };
498    for msg in messages {
499        if let Some(ts) = msg.get("timestamp").and_then(Value::as_str) {
500            acc.last_message_at = Some(ts.to_string());
501        }
502        match msg.get("type").and_then(Value::as_str) {
503            Some("user") if acc.prompt_preview.is_none() => {
504                if let Some(text) = local_message_preview(msg.get("content").unwrap_or(msg)) {
505                    acc.prompt_preview = Some(text);
506                }
507            }
508            Some("gemini") | Some("assistant") | Some("model") => {
509                if let Some(model) = msg.get("model").and_then(Value::as_str) {
510                    acc.model.get_or_insert_with(|| model.to_string());
511                    if let Some(tokens) = msg.get("tokens") {
512                        acc.add_usage(
513                            model,
514                            json_i64(tokens, "input"),
515                            json_i64(tokens, "output"),
516                            0,
517                            json_i64(tokens, "cached"),
518                            json_i64(tokens, "total"),
519                        );
520                    }
521                }
522                if let Some(tool_calls) = msg.get("toolCalls").and_then(Value::as_array) {
523                    for call in tool_calls {
524                        let name = call.get("name").and_then(Value::as_str).unwrap_or("?");
525                        acc.add_tool(name);
526                        if let Some(path) = find_file_arg(call).filter(|path| !is_noise_path(path))
527                        {
528                            acc.add_file(path);
529                        }
530                    }
531                }
532            }
533            _ => {}
534        }
535    }
536    acc.finish()
537}
538
539struct SessionAccumulator {
540    agent: String,
541    session_id: String,
542    path: PathBuf,
543    updated: SystemTime,
544    start_timestamp_ms: Option<u64>,
545    end_timestamp_ms: Option<u64>,
546    model: Option<String>,
547    model_usage: BTreeMap<String, TokenUsage>,
548    tools: BTreeMap<String, usize>,
549    files: BTreeMap<String, usize>,
550    prompt_preview: Option<String>,
551    duration_ms: u64,
552    cwd: Option<String>,
553    last_message_at: Option<String>,
554}
555
556impl SessionAccumulator {
557    fn new(agent: &str, path: &Path, updated: SystemTime) -> Self {
558        let normalized = normalize_session_log_path(path);
559        let session_id = path
560            .file_stem()
561            .and_then(|stem| stem.to_str())
562            .unwrap_or("session")
563            .to_string();
564        Self {
565            agent: agent.to_string(),
566            session_id,
567            path: normalized.clone(),
568            updated,
569            start_timestamp_ms: None,
570            end_timestamp_ms: Some(system_time_ms(updated)),
571            model: None,
572            model_usage: BTreeMap::new(),
573            tools: BTreeMap::new(),
574            files: BTreeMap::new(),
575            prompt_preview: None,
576            duration_ms: 0,
577            cwd: None,
578            last_message_at: None,
579        }
580    }
581
582    fn add_usage(
583        &mut self,
584        model: &str,
585        input: i64,
586        output: i64,
587        cache_creation: i64,
588        cache_read: i64,
589        total: i64,
590    ) {
591        add_usage(
592            &mut self.model_usage,
593            model,
594            input,
595            output,
596            cache_creation,
597            cache_read,
598            total,
599        );
600    }
601
602    fn set_usage(
603        &mut self,
604        model: &str,
605        input: i64,
606        output: i64,
607        cache_creation: i64,
608        cache_read: i64,
609        total: i64,
610    ) {
611        let mut usage = TokenUsage::default();
612        usage.add(input, output, cache_creation, cache_read, total);
613        self.model_usage.insert(model.to_string(), usage);
614    }
615
616    fn add_tool(&mut self, name: &str) {
617        *self.tools.entry(name.to_string()).or_default() += 1;
618    }
619
620    fn add_file(&mut self, path: &str) {
621        *self.files.entry(path.to_string()).or_default() += 1;
622    }
623
624    fn finish(self) -> Option<AgentSession> {
625        let token_usage =
626            self.model_usage
627                .values()
628                .fold(TokenUsage::default(), |mut total, usage| {
629                    total.input_tokens += usage.input_tokens;
630                    total.output_tokens += usage.output_tokens;
631                    total.cache_creation_tokens += usage.cache_creation_tokens;
632                    total.cache_read_tokens += usage.cache_read_tokens;
633                    total.total_tokens += usage.total_tokens;
634                    total
635                });
636        if token_usage.total_tokens == 0
637            && self.tools.is_empty()
638            && self.prompt_preview.is_none()
639            && self.model.is_none()
640        {
641            return None;
642        }
643        let display_id = format!("{}:{}", self.agent, short_session_id(&self.session_id));
644        Some(AgentSession {
645            agent: self.agent,
646            session_id: self.session_id,
647            display_id,
648            path: self.path,
649            updated: self.updated,
650            start_timestamp_ms: self
651                .start_timestamp_ms
652                .or_else(|| Some(system_time_ms(self.updated).saturating_sub(self.duration_ms))),
653            end_timestamp_ms: self.end_timestamp_ms,
654            model: self.model,
655            token_usage,
656            model_usage: self.model_usage,
657            tools: self.tools,
658            files: self.files,
659            prompt_preview: self.prompt_preview,
660            duration_ms: self.duration_ms,
661            cwd: self.cwd,
662            last_message_at: self.last_message_at,
663        })
664    }
665}
666
/// Identifies one process by pid plus start time.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProcessKey {
    pub pid: u32,
    // Compared together with `pid` wherever a remembered process is looked up again.
    // NOTE(review): presumably the process start time in clock ticks — confirm the source.
    pub starttime_ticks: u64,
}
672
/// A root process together with the processes grouped under it.
#[derive(Debug, Clone, Default)]
pub struct ProcessTree {
    /// Process the tree is keyed by; its pid identifies the tree during matching.
    pub root: ProcessKey,
    /// Processes whose paths are inspected for evidence and whose pids are mapped to the
    /// matched session.
    // NOTE(review): whether `root` is expected to appear in `members` is not visible here.
    pub members: Vec<ProcessKey>,
}
678
/// A running process tree that may belong to an agent session.
#[derive(Debug, Clone, Default)]
pub struct LiveProcessCandidate {
    pub tree: ProcessTree,
    /// Agent identifier; a session is only matched to a process with the same agent.
    pub agent: String,
    // NOTE(review): `age_s` and `cwd` are not read in this part of the file; presumably they
    // feed the start-time and recent-cwd helpers defined further down — confirm.
    pub age_s: Option<f64>,
    pub cwd: Option<String>,
}
686
/// The subset of session data needed to match a session against live processes.
#[derive(Debug, Clone)]
pub struct SessionProcessInput {
    /// Key under which a match for this session is stored.
    pub id: String,
    /// Agent identifier; must equal the process's agent for a match.
    pub agent: String,
    /// Transcript path, normalized before it is compared with path evidence.
    pub path: PathBuf,
    pub start_timestamp_ms: Option<u64>,
    pub end_timestamp_ms: Option<u64>,
    pub cwd: Option<String>,
}
696
/// One session linked to one live process tree.
#[derive(Debug, Clone)]
pub struct SessionProcessMatch {
    pub session_id: String,
    /// Pid of the matched tree's root process.
    pub root_pid: u32,
    /// Pids of all members of the matched tree.
    pub matched_pids: Vec<u32>,
    /// `starttime_ticks` of the root process.
    pub pid_starttime_ticks: u64,
    /// Always `SOURCE_SESSION_PROCESS_MATCH` for matches recorded in this file.
    pub source: &'static str,
    /// Score derived from `evidence` by `confidence_for_evidence`.
    pub confidence: f32,
    /// One of the `TRACE_*` labels describing how the link was established.
    pub evidence: &'static str,
}
707
708#[derive(Debug, Default)]
709pub struct SessionProcessMatches {
710    pub by_session_id: HashMap<String, SessionProcessMatch>,
711    pub by_pid: HashMap<u32, String>,
712    pub used_root_pids: HashSet<u32>,
713}
714
715impl SessionProcessMatches {
716    pub fn session_for_pid(&self, pid: u32) -> Option<&SessionProcessMatch> {
717        self.by_pid
718            .get(&pid)
719            .and_then(|session_id| self.by_session_id.get(session_id))
720    }
721}
722
723#[derive(Default)]
724pub struct SessionProcessMatcher {
725    bindings: HashMap<u32, LiveSessionBinding>,
726}
727
728struct LiveSessionBinding {
729    starttime_ticks: u64,
730    session_path: PathBuf,
731}
732
733impl SessionProcessMatcher {
734    pub fn match_sessions(
735        &mut self,
736        sessions: &[SessionProcessInput],
737        processes: &[LiveProcessCandidate],
738        fd_paths_by_process: &HashMap<ProcessKey, BTreeSet<PathBuf>>,
739        observed_path_by_process: &HashMap<ProcessKey, PathBuf>,
740        now_ms: u64,
741    ) -> SessionProcessMatches {
742        let path_evidence =
743            collect_path_evidence(processes, fd_paths_by_process, observed_path_by_process);
744        self.retain_live(processes);
745
746        let mut out = SessionProcessMatches::default();
747        for session in sessions {
748            let Some((process, evidence)) = processes.iter().find_map(|process| {
749                if out.used_root_pids.contains(&process.tree.root.pid)
750                    || process.agent != session.agent
751                    || !session_is_fresh_enough_for_process(session, process, now_ms)
752                {
753                    return None;
754                }
755                self.link_trace(&session.path, process, &path_evidence)
756                    .map(|evidence| (process, evidence))
757            }) else {
758                continue;
759            };
760            record_match(&mut out, session, process, evidence);
761        }
762
763        let mut cwd_candidates = Vec::new();
764        for (session_index, session) in sessions.iter().enumerate() {
765            if out.by_session_id.contains_key(&session.id) {
766                continue;
767            }
768            for (process_index, process) in processes.iter().enumerate() {
769                if out.used_root_pids.contains(&process.tree.root.pid)
770                    || process.agent != session.agent
771                    || !self.can_use_cwd_trace(&session.path, process, &path_evidence)
772                {
773                    continue;
774                }
775                let Some(distance_ms) = recent_cwd_distance_ms(session, process, now_ms) else {
776                    continue;
777                };
778                cwd_candidates.push((
779                    distance_ms,
780                    std::cmp::Reverse(session_end_ms(session)),
781                    session_index,
782                    process_index,
783                ));
784            }
785        }
786        cwd_candidates.sort_unstable();
787        for (_, _, session_index, process_index) in cwd_candidates {
788            let session = &sessions[session_index];
789            let process = &processes[process_index];
790            if out.by_session_id.contains_key(&session.id)
791                || out.used_root_pids.contains(&process.tree.root.pid)
792            {
793                continue;
794            }
795            record_match(&mut out, session, process, TRACE_RECENT_CWD);
796        }
797        out
798    }
799
800    fn retain_live(&mut self, processes: &[LiveProcessCandidate]) {
801        self.bindings.retain(|pid, binding| {
802            processes.iter().any(|process| {
803                process.tree.root.pid == *pid
804                    && process.tree.root.starttime_ticks == binding.starttime_ticks
805            })
806        });
807    }
808
809    fn link_trace(
810        &mut self,
811        session_path: &Path,
812        process: &LiveProcessCandidate,
813        path_evidence: &HashMap<u32, BTreeMap<PathBuf, &'static str>>,
814    ) -> Option<&'static str> {
815        let pid = process.tree.root.pid;
816        let path = normalize_session_log_path(session_path);
817        if let Some(evidence) = path_evidence.get(&pid) {
818            if let Some(trace) = evidence.get(&path).copied() {
819                self.bindings.insert(
820                    pid,
821                    LiveSessionBinding {
822                        starttime_ticks: process.tree.root.starttime_ticks,
823                        session_path: path,
824                    },
825                );
826                return Some(trace);
827            }
828            self.bindings.remove(&pid);
829            return None;
830        }
831        self.bindings
832            .get(&pid)
833            .filter(|binding| {
834                binding.starttime_ticks == process.tree.root.starttime_ticks
835                    && binding.session_path == path
836            })
837            .map(|_| TRACE_STICKY_BINDING)
838    }
839
840    fn can_use_cwd_trace(
841        &self,
842        session_path: &Path,
843        process: &LiveProcessCandidate,
844        path_evidence: &HashMap<u32, BTreeMap<PathBuf, &'static str>>,
845    ) -> bool {
846        let pid = process.tree.root.pid;
847        if path_evidence.contains_key(&pid) {
848            return false;
849        }
850        let path = normalize_session_log_path(session_path);
851        !self.bindings.get(&pid).is_some_and(|binding| {
852            binding.starttime_ticks == process.tree.root.starttime_ticks
853                && binding.session_path != path
854        })
855    }
856}
857
858fn record_match(
859    out: &mut SessionProcessMatches,
860    session: &SessionProcessInput,
861    process: &LiveProcessCandidate,
862    evidence: &'static str,
863) {
864    let matched_pids = process
865        .tree
866        .members
867        .iter()
868        .map(|key| key.pid)
869        .collect::<Vec<_>>();
870    out.used_root_pids.insert(process.tree.root.pid);
871    for pid in &matched_pids {
872        out.by_pid.insert(*pid, session.id.clone());
873    }
874    out.by_session_id.insert(
875        session.id.clone(),
876        SessionProcessMatch {
877            session_id: session.id.clone(),
878            root_pid: process.tree.root.pid,
879            matched_pids,
880            pid_starttime_ticks: process.tree.root.starttime_ticks,
881            source: SOURCE_SESSION_PROCESS_MATCH,
882            confidence: confidence_for_evidence(evidence),
883            evidence,
884        },
885    );
886}
887
888fn collect_path_evidence(
889    processes: &[LiveProcessCandidate],
890    fd_paths_by_process: &HashMap<ProcessKey, BTreeSet<PathBuf>>,
891    observed_path_by_process: &HashMap<ProcessKey, PathBuf>,
892) -> HashMap<u32, BTreeMap<PathBuf, &'static str>> {
893    let mut out = HashMap::new();
894    for process in processes {
895        let mut evidence = BTreeMap::new();
896        for key in &process.tree.members {
897            if let Some(paths) = fd_paths_by_process.get(key) {
898                for path in paths {
899                    if let Some(session_path) = session_log_path_from_str(&path.to_string_lossy()) {
900                        evidence.entry(session_path).or_insert(TRACE_PROC_FD);
901                    }
902                }
903            }
904            if let Some(path) = observed_path_by_process.get(key)
905                && let Some(session_path) = session_log_path_from_str(&path.to_string_lossy())
906            {
907                evidence.insert(session_path, TRACE_EBPF_FILE);
908            }
909        }
910        if !evidence.is_empty() {
911            out.insert(process.tree.root.pid, evidence);
912        }
913    }
914    out
915}
916
917fn session_is_fresh_enough_for_process(
918    session: &SessionProcessInput,
919    process: &LiveProcessCandidate,
920    now_ms: u64,
921) -> bool {
922    let Some(process_start_ms) = process_start_ms(process, now_ms) else {
923        return true;
924    };
925    session_end_ms(session).saturating_add(SESSION_PROCESS_START_SKEW_MS) >= process_start_ms
926}
927
928fn recent_cwd_distance_ms(
929    session: &SessionProcessInput,
930    process: &LiveProcessCandidate,
931    now_ms: u64,
932) -> Option<u64> {
933    let session_cwd = session.cwd.as_deref().filter(|value| !value.is_empty())?;
934    let process_cwd = process.cwd.as_deref().filter(|value| !value.is_empty())?;
935    if session_cwd != process_cwd {
936        return None;
937    }
938    let process_start_ms = process_start_ms(process, now_ms)?;
939    let session_end_ms = session_end_ms(session);
940    (session_end_ms.saturating_add(SESSION_PROCESS_START_SKEW_MS) >= process_start_ms)
941        .then_some(session_end_ms.abs_diff(process_start_ms))
942}
943
944fn process_start_ms(process: &LiveProcessCandidate, now_ms: u64) -> Option<u64> {
945    process
946        .age_s
947        .map(|age_s| now_ms.saturating_sub((age_s.max(0.0) * 1000.0).round() as u64))
948}
949
950fn session_end_ms(session: &SessionProcessInput) -> u64 {
951    session
952        .end_timestamp_ms
953        .or(session.start_timestamp_ms)
954        .unwrap_or_default()
955}
956
/// Maps an evidence trace label to the confidence score attached to a match.
///
/// The recognized labels are the `TRACE_*` constants; any other label gets
/// the lowest score.
fn confidence_for_evidence(evidence: &str) -> f32 {
    const EBPF_FILE_CONFIDENCE: f32 = 0.95;
    const PROC_FD_CONFIDENCE: f32 = 0.90;
    const STICKY_BINDING_CONFIDENCE: f32 = 0.70;
    const RECENT_CWD_CONFIDENCE: f32 = 0.55;
    const UNKNOWN_CONFIDENCE: f32 = 0.50;

    match evidence {
        TRACE_EBPF_FILE => EBPF_FILE_CONFIDENCE,
        TRACE_PROC_FD => PROC_FD_CONFIDENCE,
        TRACE_STICKY_BINDING => STICKY_BINDING_CONFIDENCE,
        TRACE_RECENT_CWD => RECENT_CWD_CONFIDENCE,
        _ => UNKNOWN_CONFIDENCE,
    }
}
966
967fn walk_agent_files(agent: &'static str, dir: &Path, f: &mut dyn FnMut(&Path, &fs::Metadata)) {
968    let Ok(entries) = fs::read_dir(dir) else {
969        return;
970    };
971    for entry in entries.flatten() {
972        let path = entry.path();
973        if path.is_dir() {
974            walk_agent_files(agent, &path, f);
975        } else if is_agent_file_for(agent, &path)
976            && let Ok(meta) = path.metadata()
977        {
978            f(&path, &meta);
979        }
980    }
981}
982
983fn is_agent_session_file(path: &Path) -> bool {
984    agent_source_for_path(path).is_some()
985}
986
987fn is_agent_file_for(agent: &str, path: &Path) -> bool {
988    match agent {
989        AGENT_CLAUDE | AGENT_CODEX => {
990            path.extension().and_then(|ext| ext.to_str()) == Some("jsonl")
991        }
992        AGENT_GEMINI => {
993            path.extension().and_then(|ext| ext.to_str()) == Some("json")
994                && path
995                    .file_name()
996                    .and_then(|name| name.to_str())
997                    .is_some_and(|name| name.starts_with("session-"))
998                && path.to_string_lossy().contains("/chats/")
999        }
1000        _ => false,
1001    }
1002}
1003
1004fn user_home_dir() -> Option<PathBuf> {
1005    std::env::var("SUDO_USER")
1006        .ok()
1007        .and_then(|user| {
1008            fs::read_to_string("/etc/passwd").ok().and_then(|passwd| {
1009                passwd
1010                    .lines()
1011                    .find(|line| line.starts_with(&format!("{user}:")))
1012                    .and_then(|line| line.split(':').nth(5))
1013                    .map(PathBuf::from)
1014            })
1015        })
1016        .or_else(dirs::home_dir)
1017}
1018
1019fn add_usage(
1020    models: &mut BTreeMap<String, TokenUsage>,
1021    model: &str,
1022    input: i64,
1023    output: i64,
1024    cache_creation: i64,
1025    cache_read: i64,
1026    total: i64,
1027) {
1028    models.entry(model.to_string()).or_default().add(
1029        input,
1030        output,
1031        cache_creation,
1032        cache_read,
1033        total,
1034    );
1035}
1036
1037fn local_session_id(obj: &Value) -> Option<String> {
1038    for key in ["sessionId", "session_id", "conversation_id"] {
1039        if let Some(value) = obj.get(key).and_then(Value::as_str)
1040            && !value.is_empty()
1041        {
1042            return Some(value.to_string());
1043        }
1044    }
1045    for pointer in ["/payload/session_id", "/payload/sessionId"] {
1046        if let Some(value) = obj.pointer(pointer).and_then(Value::as_str)
1047            && !value.is_empty()
1048        {
1049            return Some(value.to_string());
1050        }
1051    }
1052    None
1053}
1054
/// Strips one leading option from a `codex exec` argument string.
///
/// The first whitespace-delimited token decides the outcome:
/// - `--json`, `--skip-git-repo-check`, `--ephemeral`: returns what follows
///   the flag (an empty string if nothing follows);
/// - `-C`, `-a`, `-s`, `-m`, `-c`, `-p`: also skips the flag's value and
///   returns what follows it, or `None` when nothing follows the value;
/// - anything else: returns `None`.
fn strip_codex_exec_option(args: &str) -> Option<&str> {
    let (flag, tail) = match args.split_once(char::is_whitespace) {
        Some(parts) => parts,
        None => (args, ""),
    };
    if matches!(flag, "--json" | "--skip-git-repo-check" | "--ephemeral") {
        return Some(tail);
    }
    if !matches!(flag, "-C" | "-a" | "-s" | "-m" | "-c" | "-p") {
        return None;
    }
    let (_value, remainder) = tail.trim_start().split_once(char::is_whitespace)?;
    Some(remainder)
}
1066
1067fn claude_usage_key(obj: &Value) -> String {
1068    obj.get("requestId")
1069        .or_else(|| obj.pointer("/message/id"))
1070        .or_else(|| obj.get("uuid"))
1071        .and_then(Value::as_str)
1072        .unwrap_or("usage")
1073        .to_string()
1074}
1075
1076fn local_message_preview(value: &Value) -> Option<String> {
1077    let mut parts = Vec::new();
1078    collect_local_text(value, &mut parts);
1079    clean_prompt_text(&parts.join(" "))
1080}
1081
1082fn collect_local_text(value: &Value, out: &mut Vec<String>) {
1083    match value {
1084        Value::String(text) => out.push(text.clone()),
1085        Value::Array(items) => {
1086            for item in items {
1087                collect_local_text(item, out);
1088            }
1089        }
1090        Value::Object(obj) => {
1091            if obj.get("type").and_then(Value::as_str).is_some_and(|typ| {
1092                typ == "tool_use" || typ == "function_call" || typ == "tool_result"
1093            }) {
1094                return;
1095            }
1096            for key in ["text", "content", "message", "input", "prompt"] {
1097                if let Some(value) = obj.get(key) {
1098                    collect_local_text(value, out);
1099                }
1100            }
1101        }
1102        _ => {}
1103    }
1104}
1105
1106fn is_claude_tool_result(obj: &Value) -> bool {
1107    obj.get("toolUseResult").is_some()
1108        || obj.get("tool_use_result").is_some()
1109        || obj
1110            .pointer("/message/content")
1111            .and_then(Value::as_array)
1112            .is_some_and(|items| {
1113                items
1114                    .iter()
1115                    .any(|item| item.get("type").and_then(Value::as_str) == Some("tool_result"))
1116            })
1117}
1118
1119fn find_file_arg(value: &Value) -> Option<&str> {
1120    match value {
1121        Value::Object(obj) => {
1122            for key in ["file_path", "path", "filepath"] {
1123                if let Some(path) = obj.get(key).and_then(Value::as_str) {
1124                    return Some(path);
1125                }
1126            }
1127            obj.values().find_map(find_file_arg)
1128        }
1129        Value::Array(items) => items.iter().find_map(find_file_arg),
1130        _ => None,
1131    }
1132}
1133
/// Reports whether `path` contains any of the noise markers listed below:
/// a set of directory fragments plus the `CLAUDE.md` and `AGENTS.md` file
/// names.
///
/// Matching is a plain substring test, so a marker matches anywhere in the
/// path.
fn is_noise_path(path: &str) -> bool {
    const NOISE_DIRS: [&str; 7] = [
        "/.claude/",
        "/.codex/",
        "/.gemini/",
        "/.git/",
        "/node_modules/",
        "/.npm/",
        "/.cache/",
    ];
    const NOISE_FILES: [&str; 2] = ["CLAUDE.md", "AGENTS.md"];

    for marker in NOISE_DIRS.iter().chain(NOISE_FILES.iter()) {
        if path.contains(*marker) {
            return true;
        }
    }
    false
}
1148
/// Normalizes prompt text for display.
///
/// Runs of whitespace are collapsed to single spaces and the ends trimmed.
/// If the collapsed text is wrapped in both `<session>` and `</session>`,
/// the wrapper is removed. Returns `None` when nothing is left.
fn clean_prompt_text(text: &str) -> Option<String> {
    let mut collapsed = String::with_capacity(text.len());
    for word in text.split_whitespace() {
        if !collapsed.is_empty() {
            collapsed.push(' ');
        }
        collapsed.push_str(word);
    }

    // The wrapper is only removed when both the opening and closing tag are present.
    let unwrapped = match collapsed.strip_prefix("<session>") {
        Some(rest) => rest.strip_suffix("</session>").unwrap_or(collapsed.as_str()),
        None => collapsed.as_str(),
    };
    let cleaned = unwrapped.trim();
    if cleaned.is_empty() {
        None
    } else {
        Some(cleaned.to_owned())
    }
}
1158
/// Produces a short display label for a session id.
///
/// The id is trimmed, reduced to its last `/`- or `\`-separated component,
/// and stripped of trailing `.jsonl` suffixes. Labels of at most 12
/// characters are returned unchanged; longer ones are abbreviated to the
/// first 6 characters, a `.`, and the last 5 characters. Lengths are counted
/// in `char`s, so multi-byte text is never split inside a character.
///
/// Returns the placeholder `"session"` whenever nothing usable remains: for
/// an empty or whitespace-only id, and also for ids such as `"dir/"` or
/// `".jsonl"` that become empty after stripping.
fn short_session_id(id: &str) -> String {
    const FALLBACK_LABEL: &str = "session";
    const MAX_SESSION_ID_CHARS: usize = 12;
    const HEAD_CHARS: usize = 6;
    const TAIL_CHARS: usize = 5;

    let id = id.trim();
    let compact = id
        .rsplit(['/', '\\'])
        .next()
        .unwrap_or(id)
        .trim_end_matches(".jsonl");
    // Covers the empty id as well as ids that reduce to nothing once the
    // directory part and the suffix are removed.
    if compact.is_empty() {
        return FALLBACK_LABEL.to_string();
    }

    let chars: Vec<char> = compact.chars().collect();
    if chars.len() <= MAX_SESSION_ID_CHARS {
        return compact.to_string();
    }
    // More than MAX_SESSION_ID_CHARS characters exist here, so both slices
    // are in bounds.
    let head: String = chars[..HEAD_CHARS].iter().collect();
    let tail: String = chars[chars.len() - TAIL_CHARS..].iter().collect();
    format!("{head}.{tail}")
}
1184
1185fn json_i64(value: &Value, key: &str) -> i64 {
1186    value.get(key).and_then(Value::as_i64).unwrap_or(0)
1187}
1188
1189fn json_u64(value: &Value, key: &str) -> u64 {
1190    value.get(key).and_then(Value::as_u64).unwrap_or(0)
1191}
1192
1193fn iso_ms(value: &str) -> Option<u64> {
1194    chrono::DateTime::parse_from_rfc3339(value)
1195        .ok()
1196        .and_then(|ts| u64::try_from(ts.timestamp_millis()).ok())
1197}
1198
/// Converts a `SystemTime` to whole milliseconds since the Unix epoch.
///
/// Times before the epoch map to `0`.
fn system_time_ms(value: SystemTime) -> u64 {
    match value.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}