agtop 2.1.6

Terminal UI for monitoring AI coding agents (Claude Code, Codex, Aider, Cursor, Gemini, Goose, ...) — like top, but for agents.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
// OpenAI Codex CLI session enricher.
//
// Codex stores rollouts (transcripts) at:
//
//   ~/.codex/sessions/<YYYY>/<MM>/<DD>/<rollout-id>.jsonl
//
// (older flat layout: ~/.codex/sessions/<rollout-id>.jsonl) — this module
// handles both via a shallow recursive walk (max depth 4).
//
// Each line is a JSON event. The schema has evolved; we probe defensively
// for the shapes that have actually shipped:
//
//   { "type": "session_meta", "payload": { "id": "...", "cwd": "...", ... } }
//   { "type": "response_item",
//     "payload": { "type": "function_call",
//                  "name": "shell", "arguments": "...", "call_id": "call_1" } }
//   { "type": "response_item",
//     "payload": { "type": "function_call_output", "call_id": "call_1", "output": "..." } }
//   { "type": "response_item",
//     "payload": { "type": "message", "role": "user|assistant",
//                  "content": [{"type":"input_text","text":"..."}] } }
//
// We also tolerate the flat shape (no `payload` nesting) and the older
// `function_call`/`tool_use` field names — anything that mentions a tool
// call_id we'll track as in-flight until a matching output arrives.

use crate::format::{project_basename, sanitize_control};
use crate::model::{RecentTask, Session, Sessions, Status};
use crate::sessions::{LiveAgentRef, SessionsResult};

use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::fs::{self, File};
use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};

const RECENT_WINDOW_MS: u64 = 24 * 60 * 60 * 1000;
const BUSY_WINDOW_MS: u64 = 30 * 1000;        // 30s — covers mid-turn tool waits
const ACTIVE_WINDOW_MS: u64 = 5 * 60 * 1000;  // 5 minutes
const TAIL_BYTES: u64 = 256 * 1024;
const HEAD_BYTES: u64 = 4 * 1024; // session_meta is at the top of the file

fn root() -> PathBuf {
    dirs::home_dir().unwrap_or_default().join(".codex").join("sessions")
}

fn read_tail(path: &Path, bytes: u64) -> String {
    let mut f = match File::open(path) { Ok(f) => f, Err(_) => return String::new() };
    let len = match f.metadata() { Ok(m) => m.len(), Err(_) => return String::new() };
    if len == 0 { return String::new(); }
    let take = bytes.min(len);
    if f.seek(SeekFrom::End(-(take as i64))).is_err() {
        return String::new();
    }
    let mut buf = String::with_capacity(take as usize);
    let _ = f.take(take).read_to_string(&mut buf);
    buf
}

fn read_head(path: &Path, bytes: u64) -> String {
    let f = match File::open(path) { Ok(f) => f, Err(_) => return String::new() };
    let mut buf = String::with_capacity(bytes as usize);
    let _ = f.take(bytes).read_to_string(&mut buf);
    buf
}

fn parse_lines(text: &str) -> Vec<Value> {
    text.split('\n')
        .filter(|l| !l.trim().is_empty())
        .filter_map(|l| serde_json::from_str::<Value>(l).ok())
        .collect()
}

/// Recursive walk capped at `max_depth` so we don't traverse the entire home
/// dir if the user has put junk under ~/.codex/sessions.
fn walk_jsonls(root: &Path, max_depth: usize) -> Vec<PathBuf> {
    let mut out = Vec::new();
    fn rec(dir: &Path, depth: usize, max: usize, out: &mut Vec<PathBuf>) {
        if depth > max { return; }
        let rd = match fs::read_dir(dir) { Ok(d) => d, Err(_) => return };
        for ent in rd.flatten() {
            let p = ent.path();
            let ft = match ent.file_type() { Ok(f) => f, Err(_) => continue };
            if ft.is_dir() {
                rec(&p, depth + 1, max, out);
            } else if ft.is_file() && p.extension().and_then(|s| s.to_str()) == Some("jsonl") {
                out.push(p);
            }
        }
    }
    rec(root, 0, max_depth, &mut out);
    out
}

fn extract_cwd_from_meta(text: &str) -> Option<String> {
    for r in parse_lines(text) {
        // Probe a handful of plausible field names across schema versions.
        let candidates = [
            r.get("payload").and_then(|p| p.get("cwd")).and_then(|v| v.as_str()),
            r.get("cwd").and_then(|v| v.as_str()),
            r.get("payload").and_then(|p| p.get("workspace")).and_then(|v| v.as_str()),
            r.get("workspace").and_then(|v| v.as_str()),
        ];
        for c in candidates {
            if let Some(s) = c {
                if !s.is_empty() { return Some(s.to_string()); }
            }
        }
    }
    None
}

#[derive(Default)]
struct AnalysisOut {
    last_user_prompt: Option<String>,
    last_assistant: Option<String>,
    last_tool: Option<String>,
    current_tool: Option<String>,
    in_flight: u32,           // task / agent subagents only
    in_flight_tools: u32,     // any tool, used for busy-status decision
    last_ts: u64,
    finished: bool,
    tokens_input: u64,
    tokens_output: u64,
    model: Option<String>,
    /// Capped, prefix-tagged tail (`›` prose, `→` tool, `←` result) for
    /// the detail-popup live preview.
    recent_activity: Vec<String>,
}

fn push_recent(buf: &mut Vec<String>, line: String) {
    if buf.last().map(|s| s == &line).unwrap_or(false) { return; }
    buf.push(line);
    if buf.len() > 12 { buf.remove(0); }
}

fn analyse(records: &[Value]) -> AnalysisOut {
    let mut out = AnalysisOut::default();
    let mut tool_call_ids: Vec<String> = Vec::new();    // Task / Agent
    let mut all_tool_ids:  Vec<String> = Vec::new();    // any tool
    let mut completed: HashSet<String> = HashSet::new();

    for r in records {
        // Unwrap the optional `payload` envelope.
        let payload = r.get("payload").unwrap_or(r);
        let kind = payload.get("type").and_then(|v| v.as_str())
            .or_else(|| r.get("type").and_then(|v| v.as_str()))
            .unwrap_or("");

        match kind {
            "function_call" | "tool_use" | "local_shell_call" => {
                let name = payload.get("name").and_then(|v| v.as_str()).unwrap_or("tool");
                out.last_tool = Some(name.to_string());
                out.current_tool = Some(name.to_string());
                if let Some(id) = payload.get("call_id").and_then(|v| v.as_str())
                    .or_else(|| payload.get("id").and_then(|v| v.as_str())) {
                    all_tool_ids.push(id.to_string());
                    if name == "Task" || name == "Agent" {
                        tool_call_ids.push(id.to_string());
                    }
                }
                let arg_hint = payload.get("arguments").and_then(|v| v.as_str())
                    .or_else(|| payload.get("input").and_then(|i|
                        i.get("command").and_then(|v| v.as_str())
                            .or_else(|| i.get("file_path").and_then(|v| v.as_str()))
                            .or_else(|| i.get("subject").and_then(|v| v.as_str()))
                            .or_else(|| i.get("path").and_then(|v| v.as_str()))))
                    .map(|s| s.split_whitespace().collect::<Vec<_>>().join(" "))
                    .unwrap_or_default();
                let hint: String = arg_hint.chars().take(120).collect();
                let line = if hint.is_empty() { format!("{}", name) }
                           else { format!("{}: {}", name, hint) };
                push_recent(&mut out.recent_activity, line);
            }
            "function_call_output" | "tool_result" | "local_shell_call_output" => {
                if let Some(id) = payload.get("call_id").and_then(|v| v.as_str())
                    .or_else(|| payload.get("tool_use_id").and_then(|v| v.as_str())) {
                    completed.insert(id.to_string());
                }
                out.current_tool = None;
                let preview = payload.get("output").and_then(|v| v.as_str())
                    .or_else(|| payload.get("content").and_then(|v| v.as_str()))
                    .map(|s| s.split_whitespace().collect::<Vec<_>>().join(" "))
                    .unwrap_or_default();
                let hint: String = preview.chars().take(120).collect();
                let line = if hint.is_empty() { "← (ok)".to_string() }
                           else { format!("{}", hint) };
                push_recent(&mut out.recent_activity, line);
            }
            "message" | "response" => {
                let role = payload.get("role").and_then(|v| v.as_str()).unwrap_or("");
                let text = extract_text(payload);
                if !text.is_empty() {
                    let snippet: String = text.chars().take(120).collect();
                    if role == "user" || role == "human" {
                        out.last_user_prompt = Some(snippet.clone());
                        push_recent(&mut out.recent_activity, format!("{}", snippet));
                    } else if role == "assistant" || role == "model" {
                        out.last_assistant = Some(snippet.clone());
                        push_recent(&mut out.recent_activity, format!("{}", snippet));
                    }
                }
            }
            "session_end" | "stop" => {
                out.finished = true;
            }
            _ => {}
        }

        if let Some(t) = r.get("timestamp").and_then(|v| v.as_str()) {
            if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(t) {
                let ms = dt.timestamp_millis() as u64;
                if ms > out.last_ts { out.last_ts = ms; }
            }
        }

        // Token usage — Codex emits a usage block on response.completed and
        // a few other events. Probe both nested-payload and flat shapes.
        let usage = payload.get("usage")
            .or_else(|| r.get("usage"))
            .or_else(|| payload.get("response").and_then(|r| r.get("usage")));
        if let Some(u) = usage {
            // OpenAI uses input_tokens / output_tokens (sometimes prompt_tokens /
            // completion_tokens on older APIs). Accept either.
            let it = u.get("input_tokens").and_then(|v| v.as_u64())
                .or_else(|| u.get("prompt_tokens").and_then(|v| v.as_u64()))
                .unwrap_or(0);
            let ot = u.get("output_tokens").and_then(|v| v.as_u64())
                .or_else(|| u.get("completion_tokens").and_then(|v| v.as_u64()))
                .unwrap_or(0);
            // Cached input bonus, when reported.
            let cr = u.get("input_tokens_details")
                .and_then(|d| d.get("cached_tokens"))
                .and_then(|v| v.as_u64()).unwrap_or(0);
            out.tokens_input  += it + cr;
            out.tokens_output += ot;
        }

        // Model — try every place codex might mention it.
        let model_str = payload.get("model").and_then(|v| v.as_str())
            .or_else(|| r.get("model").and_then(|v| v.as_str()))
            .or_else(|| payload.get("response").and_then(|r| r.get("model")).and_then(|v| v.as_str()));
        if let Some(m) = model_str {
            out.model = Some(m.to_string());
        }
    }

    out.in_flight = tool_call_ids.iter().filter(|id| !completed.contains(*id)).count() as u32;
    out.in_flight_tools = all_tool_ids.iter().filter(|id| !completed.contains(*id)).count() as u32;
    out
}

fn extract_text(payload: &Value) -> String {
    // Codex content arrays use "input_text" / "output_text" / "text"; we also
    // accept a plain string for the simple shape.
    if let Some(s) = payload.get("content").and_then(|v| v.as_str()) {
        return s.split_whitespace().collect::<Vec<_>>().join(" ");
    }
    if let Some(arr) = payload.get("content").and_then(|v| v.as_array()) {
        let mut out = String::new();
        for c in arr {
            let t = c.get("type").and_then(|v| v.as_str()).unwrap_or("");
            if t == "input_text" || t == "output_text" || t == "text" {
                if let Some(s) = c.get("text").and_then(|v| v.as_str()) {
                    if !out.is_empty() { out.push(' '); }
                    out.push_str(s);
                }
            }
        }
        return out.split_whitespace().collect::<Vec<_>>().join(" ");
    }
    String::new()
}

fn classify_status(
    is_live: bool, age_ms: u64, finished: bool,
    has_in_flight_task: bool, has_in_flight_tool: bool,
) -> Status {
    if is_live && has_in_flight_task { return Status::Spawning; }
    if is_live && (age_ms < BUSY_WINDOW_MS || has_in_flight_tool) { return Status::Busy; }
    if is_live && age_ms < ACTIVE_WINDOW_MS { return Status::Active; }
    if is_live { return Status::Idle; }
    if finished { return Status::Completed; }
    if age_ms < RECENT_WINDOW_MS { return Status::Waiting; }
    Status::Stale
}

pub fn summarise(live_agents: &[LiveAgentRef], now_ms: u64) -> SessionsResult {
    let root = root();
    if !root.exists() {
        return SessionsResult::empty();
    }
    // Map cwd -> live codex pid.
    let mut cwd_to_pid: HashMap<String, u32> = HashMap::new();
    for a in live_agents {
        if a.label == "codex" || a.label == "openai-codex" {
            cwd_to_pid.insert(a.cwd.to_string(), a.pid);
        }
    }

    let mut by_pid: HashMap<u32, Session> = HashMap::new();
    let mut sessions: Vec<Session> = Vec::new();
    let mut recent_tasks: Vec<RecentTask> = Vec::new();

    let files = walk_jsonls(&root, 4);
    // Group files by their session_meta cwd. Some users have many rollouts
    // for the same project; the most recently modified one is "the" session
    // for that cwd.
    let mut by_cwd: HashMap<String, Vec<(PathBuf, u64, u64)>> = HashMap::new();
    let mut orphan: Vec<(PathBuf, u64, u64)> = Vec::new();

    for path in files {
        let md = match fs::metadata(&path) { Ok(m) => m, Err(_) => continue };
        let mtime = md.modified().ok()
            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
            .map(|d| d.as_millis() as u64).unwrap_or(0);
        let size = md.len();
        // Cheap-only: don't bother scanning sessions older than the recent
        // window unless they belong to a live process.
        let age_ms = now_ms.saturating_sub(mtime);
        if age_ms > RECENT_WINDOW_MS && live_agents.is_empty() { continue; }
        let head = read_head(&path, HEAD_BYTES);
        if let Some(cwd) = extract_cwd_from_meta(&head) {
            by_cwd.entry(cwd).or_default().push((path, mtime, size));
        } else {
            orphan.push((path, mtime, size));
        }
    }

    // Sort each cwd group by mtime desc and process.
    for (cwd, mut files) in by_cwd {
        files.sort_by(|a, b| b.1.cmp(&a.1));
        let live_pid = cwd_to_pid.get(&cwd).copied();
        for (i, (path, mtime, size)) in files.iter().enumerate() {
            let is_most_recent = i == 0;
            let age_ms = now_ms.saturating_sub(*mtime);
            let id = path.file_stem().map(|s| s.to_string_lossy().into_owned()).unwrap_or_default();

            // Only do the expensive tail+parse for live or recently-active.
            let info = if (is_most_recent && live_pid.is_some()) || age_ms < RECENT_WINDOW_MS {
                analyse(&parse_lines(&read_tail(path, TAIL_BYTES)))
            } else {
                AnalysisOut::default()
            };

            let status = classify_status(
                is_most_recent && live_pid.is_some(),
                age_ms,
                info.finished,
                info.in_flight > 0,
                info.in_flight_tools > 0,
            );

            let last_task = info.last_assistant.clone()
                .or(info.last_user_prompt.clone());

            let proj_short = project_basename(&cwd);
            let sess = Session {
                id: id.clone(),
                project: cwd.clone(),
                project_short: proj_short.clone(),
                file: path.to_string_lossy().into_owned(),
                size_bytes: *size,
                mtime_ms: *mtime,
                age_ms,
                status,
                stop_reason: if info.finished { Some("session_end".to_string()) } else { None },
                last_task:    last_task.as_deref().map(sanitize_control),
                last_tool:    info.last_tool.as_deref().map(sanitize_control),
                current_tool: info.current_tool.as_deref().map(sanitize_control),
                in_flight_subagents: Vec::new(),
                recent_activity: info.recent_activity.iter()
                    .map(|s| sanitize_control(s)).collect(),
                in_flight_tasks: info.in_flight,
                live_pid: if is_most_recent { live_pid } else { None },
                is_most_recent,
                tokens_input: info.tokens_input,
                tokens_output: info.tokens_output,
                tokens_total: info.tokens_input + info.tokens_output,
                cost_usd: 0.0,
                model: info.model.as_deref().map(sanitize_control),
            };

            if is_most_recent {
                if let Some(pid) = live_pid {
                    by_pid.entry(pid).or_insert_with(|| sess.clone());
                }
            }

            if let Some(t) = &last_task {
                if age_ms < RECENT_WINDOW_MS {
                    let task = t.split_whitespace().collect::<Vec<_>>().join(" ");
                    recent_tasks.push(RecentTask {
                        project: cwd.clone(),
                        project_short: proj_short.clone(),
                        task: task.chars().take(120).collect(),
                        mtime_ms: *mtime,
                        status,
                    });
                }
            }

            sessions.push(sess);
        }
    }

    // Sessions whose meta line we couldn't parse — still surface their mtime
    // as "waiting" so they show up in the panel.
    for (path, mtime, size) in orphan {
        let age_ms = now_ms.saturating_sub(mtime);
        if age_ms > RECENT_WINDOW_MS { continue; }
        let id = path.file_stem().map(|s| s.to_string_lossy().into_owned()).unwrap_or_default();
        sessions.push(Session {
            id, project: "?".into(), project_short: "?".into(),
            file: path.to_string_lossy().into_owned(),
            size_bytes: size, mtime_ms: mtime, age_ms,
            status: Status::Waiting,
            stop_reason: None, last_task: None, last_tool: None,
            current_tool: None, in_flight_tasks: 0,
            in_flight_subagents: Vec::new(),
            recent_activity: Vec::new(),
            live_pid: None,
            is_most_recent: false,
            tokens_input: 0, tokens_output: 0, tokens_total: 0,
            cost_usd: 0.0, model: None,
        });
    }

    sessions.sort_by(|a, b| b.mtime_ms.cmp(&a.mtime_ms));
    recent_tasks.sort_by(|a, b| b.mtime_ms.cmp(&a.mtime_ms));
    if recent_tasks.len() > 20 { recent_tasks.truncate(20); }

    let waiting   = sessions.iter().filter(|s| s.status == Status::Waiting).count() as u32;
    let completed = sessions.iter().filter(|s| s.status == Status::Completed).count() as u32;
    let active    = sessions.iter().filter(|s| matches!(s.status, Status::Active | Status::Busy | Status::Spawning | Status::Idle)).count() as u32;
    let busy      = sessions.iter().filter(|s| matches!(s.status, Status::Busy | Status::Spawning)).count() as u32;

    SessionsResult {
        sessions: Sessions { sessions, recent_tasks, active, busy, waiting, completed },
        by_pid,
    }
}