pub mod claude;
pub mod codex;
pub mod mcp;
pub mod opencode;
pub mod process;
pub mod rate_limit;
pub use claude::ClaudeCollector;
pub use codex::CodexCollector;
pub use mcp::McpServer;
pub use opencode::OpenCodeCollector;
pub use rate_limit::read_rate_limits;
/// Renders `path` for display, replacing a leading home-directory prefix with `~/`.
///
/// When the home directory cannot be determined, or `path` does not live under
/// it, the path is returned unchanged (lossily converted to UTF-8).
pub(crate) fn abbrev_path(path: &std::path::Path) -> String {
    dirs::home_dir()
        .and_then(|home| {
            path.strip_prefix(&home)
                .ok()
                .map(|relative| format!("~/{}", relative.display()))
        })
        .unwrap_or_else(|| path.to_string_lossy().into_owned())
}
/// Replaces anything that looks like a credential in `s` with `[REDACTED]`.
///
/// A credential is recognised by one of the well-known prefixes in `PATTERNS`.
/// Everything from the start of the prefix up to (but not including) the next
/// whitespace character that follows the prefix — or the end of the string —
/// is replaced by the placeholder.
///
/// The token end is searched for *after* the matched prefix. This matters for
/// the `"Bearer "` pattern, which itself ends in a space: the token following
/// that space is what must be hidden, not just the word `Bearer`.
pub(crate) fn redact_secrets(s: &str) -> String {
    const PATTERNS: &[&str] = &[
        "sk-ant-",
        "sk-proj-",
        "sk-or-",
        "sk_live_",
        "sk_test_",
        "rk_live_",
        "rk_test_",
        "ghp_",
        "gho_",
        "ghs_",
        "ghr_",
        "ghu_",
        "github_pat_",
        "glpat-",
        "xoxb-",
        "xoxp-",
        "xoxa-",
        "xoxs-",
        "AKIA",
        "ASIA",
        "Bearer ",
    ];
    const PLACEHOLDER: &str = "[REDACTED]";

    let mut result = s.to_string();
    for pat in PATTERNS {
        // Resume each search just past the previous placeholder so the text is
        // scanned once per pattern instead of from the beginning every time.
        let mut search_from = 0;
        while let Some(offset) = result[search_from..].find(pat) {
            let start = search_from + offset;
            // Look for the end of the secret only after the prefix itself, so
            // whitespace that is part of the prefix does not end the token.
            let token_start = start + pat.len();
            let end = result[token_start..]
                .find(char::is_whitespace)
                .map_or(result.len(), |i| token_start + i);
            result.replace_range(start..end, PLACEHOLDER);
            search_from = start + PLACEHOLDER.len();
        }
    }
    result
}
/// Returns `s` with every character removed that could disturb terminal output.
///
/// Dropped characters are all Unicode control characters (as reported by
/// `char::is_control`, which includes ESC, tab and newline) and the
/// bidirectional formatting characters U+200E, U+200F, U+202A–U+202E and
/// U+2066–U+2069. Everything else is kept in its original order.
pub(crate) fn sanitize_terminal_text(s: &str) -> String {
    /// True for the bidirectional mark/embedding/override/isolate characters.
    fn is_bidi_format(c: char) -> bool {
        matches!(
            c,
            '\u{200E}' | '\u{200F}' | '\u{202A}'..='\u{202E}' | '\u{2066}'..='\u{2069}'
        )
    }

    let mut cleaned = String::new();
    for c in s.chars() {
        if c.is_control() || is_bidi_format(c) {
            continue;
        }
        cleaned.push(c);
    }
    cleaned
}
use crate::model::{AgentSession, OrphanPort, RateLimitInfo, SessionStatus};
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::{
atomic::{AtomicU32, Ordering},
mpsc::{self, Receiver, Sender},
Arc,
};
use std::time::{Duration, Instant};
/// A source of agent sessions that [`MultiCollector`] polls on every tick.
pub trait AgentCollector {
    /// Produces this collector's sessions for the current tick, using the
    /// process/port snapshot in `shared`.
    fn collect(&mut self, shared: &SharedProcessData) -> Vec<AgentSession>;

    /// Rate-limit information this collector currently knows about.
    ///
    /// The default implementation reports none.
    fn live_rate_limit(&self) -> Option<RateLimitInfo> {
        None
    }

    /// Configuration directories this collector has discovered.
    ///
    /// The default implementation reports an empty list.
    fn discovered_config_dirs(&self) -> Vec<std::path::PathBuf> {
        vec![]
    }
}
/// Per-tick snapshot of process state handed to every [`AgentCollector`].
pub struct SharedProcessData {
/// Process table as returned by `process::get_process_info`, keyed by PID.
pub process_info: HashMap<u32, process::ProcInfo>,
/// Child lookup derived from `process_info` via `process::get_children_map`
/// (presumably parent PID -> child PIDs; confirm against that helper).
pub children_map: HashMap<u32, Vec<u32>>,
/// Listening ports per PID; either freshly scanned or a copy of a cached scan.
pub ports: HashMap<u32, Vec<u16>>,
/// The `slow_tick` flag passed to `fetch` for this tick.
pub slow_tick: bool,
/// PIDs of detected MCP servers; left empty unless MCP suppression is enabled.
pub mcp_server_pids: HashSet<u32>,
/// Rollout files owned by detected MCP servers; left empty unless MCP
/// suppression is enabled.
pub mcp_owned_rollouts: HashSet<PathBuf>,
/// Mirrors the owning `MultiCollector`'s `mcp_suppress` setting.
pub mcp_suppress: bool,
/// Rollout files per Codex desktop PID, filled from the desktop rollout scanner.
pub desktop_rollout_fd_map: HashMap<u32, Vec<PathBuf>>,
}
impl SharedProcessData {
    /// Takes a fresh process snapshot.
    ///
    /// The process table and children map are always re-read. Listening ports
    /// are copied from `cached_ports` when it is `Some`, and scanned via
    /// `process::get_listening_ports` otherwise. All MCP / desktop-rollout
    /// fields start empty, with `mcp_suppress` set to `true`; the caller fills
    /// them in afterwards.
    pub fn fetch(cached_ports: Option<&HashMap<u32, Vec<u16>>>, slow_tick: bool) -> Self {
        let process_info = process::get_process_info();
        let children_map = process::get_children_map(&process_info);
        let ports = cached_ports
            .cloned()
            .unwrap_or_else(process::get_listening_ports);

        Self {
            process_info,
            children_map,
            ports,
            slow_tick,
            mcp_suppress: true,
            mcp_server_pids: HashSet::new(),
            mcp_owned_rollouts: HashSet::new(),
            desktop_rollout_fd_map: HashMap::new(),
        }
    }
}
/// What was last known about a session child process that held a port.
///
/// Kept by `MultiCollector` (keyed by the child's PID) so the process can be
/// reported as an orphan port once it no longer appears under a live session.
#[derive(Clone)]
struct TrackedPortChild {
/// Port the child was seen with.
port: u16,
/// Command string copied from the child entry.
command: String,
/// Project name of the session the child belonged to.
project_name: String,
}
/// Message sent from a background rollout scan back to `DesktopRolloutScanner`.
struct DesktopRolloutScanResult {
/// The PID list the scan was started with.
pids: Vec<u32>,
/// Rollout files per PID, or `None` when the scan produced no result
/// (in which case the scanner keeps its previous cache).
rollouts: Option<HashMap<u32, Vec<PathBuf>>>,
}
/// Runs desktop rollout scans on a background thread and serves the most
/// recent successful result from a cache.
struct DesktopRolloutScanner {
/// Rollout files per PID from the last scan that returned `Some`.
cached: HashMap<u32, Vec<PathBuf>>,
/// PID list that `cached` was produced for.
cached_pids: Vec<u32>,
/// PID list of the scan currently running, or `None` when idle.
in_flight_pids: Option<Vec<u32>>,
/// Slot shared with the scan thread; swapped to 0 and passed to
/// `mcp::kill_rollout_scan_child` on drop (presumably the PID of the scan's
/// child process, with 0 meaning "none" — confirm against `mcp`).
child_pid: Arc<AtomicU32>,
/// When the most recent scan was started, if any.
last_started: Option<Instant>,
/// Sender cloned into each scan thread.
tx: Sender<DesktopRolloutScanResult>,
/// Receiver polled for finished scans.
rx: Receiver<DesktopRolloutScanResult>,
}
/// Timeout handed to `mcp::map_pid_to_rollouts_with_timeout_and_pid_slot` for one scan.
const DESKTOP_ROLLOUT_SCAN_TIMEOUT: Duration = Duration::from_secs(90);
/// Minimum time since the last scan start before rescanning an unchanged PID list.
const DESKTOP_ROLLOUT_RESCAN_INTERVAL: Duration = Duration::from_secs(60);
impl DesktopRolloutScanner {
    /// Creates an idle scanner with an empty cache and a fresh result channel.
    fn new() -> Self {
        let (tx, rx) = mpsc::channel();
        Self {
            tx,
            rx,
            cached: HashMap::new(),
            cached_pids: Vec::new(),
            in_flight_pids: None,
            last_started: None,
            child_pid: Arc::new(AtomicU32::new(0)),
        }
    }

    /// Absorbs any finished scans, starts a new one when warranted, and
    /// returns the cached rollouts for `pids`. Never blocks on a scan.
    fn update(&mut self, pids: &[u32]) -> HashMap<u32, Vec<PathBuf>> {
        self.poll_completed();
        if self.should_start(pids) {
            self.start(pids.to_vec());
        }
        self.cached_for(pids)
    }

    /// Drains every result currently waiting on the channel.
    fn poll_completed(&mut self) {
        loop {
            match self.rx.try_recv() {
                Ok(finished) => self.apply_result(finished),
                Err(_) => break,
            }
        }
    }

    /// Marks the scanner idle and, when the scan returned data, replaces the
    /// cache. A `None` result leaves the previous cache in place.
    fn apply_result(&mut self, result: DesktopRolloutScanResult) {
        let DesktopRolloutScanResult { pids, rollouts } = result;
        self.in_flight_pids = None;
        if let Some(fresh) = rollouts {
            self.cached = fresh;
            self.cached_pids = pids;
        }
    }

    /// Decides whether a new scan should be launched for `pids`.
    ///
    /// Never while a scan is in flight or when `pids` is empty. Otherwise a
    /// scan starts if the PID list differs from the cached one, if no scan has
    /// ever been started, or if the rescan interval has elapsed.
    fn should_start(&self, pids: &[u32]) -> bool {
        let busy = self.in_flight_pids.is_some();
        if busy || pids.is_empty() {
            return false;
        }
        let pid_list_changed = self.cached_pids != pids;
        pid_list_changed
            || match self.last_started {
                None => true,
                Some(started) => started.elapsed() >= DESKTOP_ROLLOUT_RESCAN_INTERVAL,
            }
    }

    /// Spawns a detached thread that scans `pids` and sends the outcome back
    /// over the channel; records the scan as in flight.
    fn start(&mut self, pids: Vec<u32>) {
        self.last_started = Some(Instant::now());
        self.in_flight_pids = Some(pids.clone());

        let result_tx = self.tx.clone();
        let pid_slot = Arc::clone(&self.child_pid);
        std::thread::spawn(move || {
            let rollouts = mcp::map_pid_to_rollouts_with_timeout_and_pid_slot(
                &pids,
                DESKTOP_ROLLOUT_SCAN_TIMEOUT,
                Some(pid_slot),
            );
            // The receiver may already be gone if the scanner was dropped.
            let _ = result_tx.send(DesktopRolloutScanResult { pids, rollouts });
        });
    }

    /// Returns a copy of the cached entries for exactly the PIDs in `pids`;
    /// PIDs without a cached entry are omitted.
    fn cached_for(&self, pids: &[u32]) -> HashMap<u32, Vec<PathBuf>> {
        pids.iter()
            .filter_map(|pid| self.cached.get(pid).map(|paths| (*pid, paths.clone())))
            .collect()
    }
}
impl Drop for DesktopRolloutScanner {
    /// Clears the shared PID slot and hands whatever it held to
    /// `mcp::kill_rollout_scan_child`.
    fn drop(&mut self) {
        // NOTE(review): the slot is 0 when no scan child is recorded, so 0 can
        // be passed here — confirm the helper treats 0 as "nothing to kill".
        mcp::kill_rollout_scan_child(self.child_pid.swap(0, Ordering::SeqCst));
    }
}
/// Aggregates all enabled agent collectors and the state shared between ticks.
pub struct MultiCollector {
/// Enabled collectors, polled in order on every `collect`.
collectors: Vec<Box<dyn AgentCollector>>,
/// Whether the Codex collector is enabled; gates the desktop rollout scan.
codex_enabled: bool,
/// Ticks counted since the last slow tick.
tick_count: u32,
/// Listening ports per PID from the most recent port scan.
cached_ports: HashMap<u32, Vec<u16>>,
/// Sorted PID list the port cache was taken for.
cached_port_pids: Vec<u32>,
/// Git stats per session working directory, stored as `(added, modified)`.
cached_git: HashMap<String, (u32, u32)>,
/// Session children that were seen holding a port, keyed by PID.
tracked_port_children: HashMap<u32, TrackedPortChild>,
/// Tracked port holders that are no longer under a live session but are
/// still alive and listening; rebuilt on every `collect`, sorted by port.
pub orphan_ports: Vec<OrphanPort>,
/// MCP servers reported by the most recent `mcp::detect` call.
pub mcp_servers: Vec<McpServer>,
/// When true, detected MCP server PIDs and owned rollouts are passed on to
/// the collectors through `SharedProcessData`.
pub mcp_suppress: bool,
/// Background scanner supplying rollout files for Codex desktop processes.
desktop_rollout_scanner: DesktopRolloutScanner,
}
/// Number of `MultiCollector::collect` calls per slow tick (the first call is always slow).
const SLOW_POLL_INTERVAL: u32 = 5;
impl MultiCollector {
#[cfg(test)]
pub fn with_hidden(hidden: &[String]) -> Self {
Self::with_hidden_and_claude_config_dirs(hidden, &[])
}
pub fn with_hidden_and_claude_config_dirs(
hidden: &[String],
claude_config_dirs: &[PathBuf],
) -> Self {
let is_hidden = |name: &str| hidden.iter().any(|h| h.eq_ignore_ascii_case(name));
let mut collectors: Vec<Box<dyn AgentCollector>> = Vec::new();
if !is_hidden("claude") {
collectors.push(Box::new(ClaudeCollector::with_configured_dirs(
claude_config_dirs.to_vec(),
)));
}
if !is_hidden("codex") {
collectors.push(Box::new(CodexCollector::new()));
}
if !is_hidden("opencode") {
collectors.push(Box::new(OpenCodeCollector::new()));
}
let codex_enabled = !is_hidden("codex");
Self {
collectors,
codex_enabled,
tick_count: SLOW_POLL_INTERVAL, cached_ports: HashMap::new(),
cached_port_pids: Vec::new(),
cached_git: HashMap::new(),
tracked_port_children: HashMap::new(),
orphan_ports: Vec::new(),
mcp_servers: Vec::new(),
mcp_suppress: true,
desktop_rollout_scanner: DesktopRolloutScanner::new(),
}
}
pub fn set_mcp_suppress(&mut self, on: bool) {
self.mcp_suppress = on;
}
pub fn agent_rate_limits(&self) -> Vec<RateLimitInfo> {
self.collectors
.iter()
.filter_map(|c| c.live_rate_limit())
.collect()
}
pub fn all_config_dirs(&self) -> Vec<std::path::PathBuf> {
self.collectors
.iter()
.flat_map(|c| c.discovered_config_dirs())
.collect()
}
pub fn collect(&mut self) -> Vec<AgentSession> {
let slow_tick = self.tick_count >= SLOW_POLL_INTERVAL;
if slow_tick {
self.tick_count = 0;
}
self.tick_count += 1;
let fresh_process = SharedProcessData::fetch(Some(&self.cached_ports), slow_tick);
let mut current_pids: Vec<u32> = fresh_process.process_info.keys().copied().collect();
current_pids.sort_unstable();
let pids_changed = current_pids != self.cached_port_pids;
let mut shared = if slow_tick || pids_changed {
let s = SharedProcessData::fetch(None, slow_tick);
self.cached_ports = s.ports.clone();
self.cached_port_pids = current_pids;
s
} else {
fresh_process
};
let detection = mcp::detect(&shared.process_info);
self.mcp_servers = detection.servers;
shared.mcp_suppress = self.mcp_suppress;
if self.mcp_suppress {
shared.mcp_server_pids = detection.server_pids;
shared.mcp_owned_rollouts = detection.owned_rollouts;
}
if self.codex_enabled {
let desktop_pids = CodexCollector::find_codex_desktop_pids_from_shared(
&shared.process_info,
&shared.mcp_server_pids,
);
shared.desktop_rollout_fd_map = self.desktop_rollout_scanner.update(&desktop_pids);
}
let mut all = Vec::new();
for collector in &mut self.collectors {
all.extend(collector.collect(&shared));
}
if slow_tick {
self.cached_git.clear();
for s in &mut all {
let stats = process::collect_git_stats(&s.cwd);
self.cached_git.insert(s.cwd.clone(), stats);
s.git_added = stats.0;
s.git_modified = stats.1;
}
} else {
for s in &mut all {
if let Some(&(added, modified)) = self.cached_git.get(&s.cwd) {
s.git_added = added;
s.git_modified = modified;
} else {
let stats = process::collect_git_stats(&s.cwd);
self.cached_git.insert(s.cwd.clone(), stats);
s.git_added = stats.0;
s.git_modified = stats.1;
}
}
}
all.retain(|s| !matches!(s.status, SessionStatus::Done));
all.sort_by_key(|s| std::cmp::Reverse(s.started_at));
let mut live_child_pids = std::collections::HashSet::new();
for s in &all {
if !matches!(s.status, SessionStatus::Done) {
for child in &s.children {
live_child_pids.insert(child.pid);
if let Some(port) = child.port {
self.tracked_port_children.insert(
child.pid,
TrackedPortChild {
port,
command: child.command.clone(),
project_name: s.project_name.clone(),
},
);
}
}
}
}
self.orphan_ports.clear();
let mut stale_pids = Vec::new();
for (pid, tracked) in &self.tracked_port_children {
if live_child_pids.contains(pid) {
continue; }
let still_listening = shared
.ports
.get(pid)
.is_some_and(|ports| ports.contains(&tracked.port));
let still_alive = shared.process_info.contains_key(pid);
if still_alive && still_listening {
self.orphan_ports.push(OrphanPort {
port: tracked.port,
pid: *pid,
command: tracked.command.clone(),
project_name: tracked.project_name.clone(),
});
} else {
stale_pids.push(*pid);
}
}
for pid in stale_pids {
self.tracked_port_children.remove(&pid);
}
self.orphan_ports.sort_by_key(|o| o.port);
all
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Turns a list of agent names into the owned form `with_hidden` expects.
    fn hidden(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// Number of collectors left after hiding `names`.
    fn collector_count(names: &[&str]) -> usize {
        MultiCollector::with_hidden(&hidden(names)).collectors.len()
    }

    #[test]
    fn with_hidden_empty_keeps_all_collectors() {
        assert_eq!(collector_count(&[]), 3);
    }

    #[test]
    fn with_hidden_codex_drops_codex_only() {
        assert_eq!(collector_count(&["codex"]), 2);
    }

    #[test]
    fn with_hidden_is_case_insensitive() {
        assert_eq!(collector_count(&["CODEX"]), 2);
        assert_eq!(collector_count(&["Claude"]), 2);
    }

    #[test]
    fn with_hidden_unknown_names_are_ignored() {
        assert_eq!(collector_count(&["kiro", "gemini"]), 3);
    }

    #[test]
    fn with_hidden_all_agents_yields_empty() {
        let mc = MultiCollector::with_hidden(&hidden(&["claude", "codex", "opencode"]));
        assert!(mc.collectors.is_empty());
    }

    #[test]
    fn desktop_rollout_scanner_keeps_cache_on_failed_result() {
        let expected: HashMap<u32, Vec<PathBuf>> =
            HashMap::from([(42, vec![PathBuf::from("/tmp/rollout-live.jsonl")])]);
        let mut scanner = DesktopRolloutScanner::new();

        // A successful result followed by a failed one for the same PID list.
        for rollouts in [Some(expected.clone()), None] {
            scanner.apply_result(DesktopRolloutScanResult {
                pids: vec![42],
                rollouts,
            });
        }

        assert_eq!(scanner.cached_for(&[42]), expected);
    }

    #[test]
    fn desktop_rollout_scanner_in_flight_guard_blocks_duplicate_scan() {
        let mut scanner = DesktopRolloutScanner::new();
        scanner.in_flight_pids = Some(vec![42]);
        assert!(!scanner.should_start(&[42]));
    }
}