use crate::collector::{MultiCollector, read_rate_limits};
use crate::model::{AgentSession, OrphanPort, RateLimitInfo, SessionStatus};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::mpsc;
use std::time::Instant;
const GRAPH_HISTORY_LEN: usize = 200;
const MAX_SUMMARY_JOBS: usize = 3;
const MAX_SUMMARY_RETRIES: u32 = 2;
fn sanitize_fallback(prompt: &str, max_len: usize) -> String {
let cleaned: String = prompt.chars()
.filter(|c| !c.is_control() || *c == ' ')
.take(max_len)
.collect();
if prompt.chars().count() > max_len {
format!("{}…", cleaned)
} else {
cleaned
}
}
pub struct App {
pub sessions: Vec<AgentSession>,
pub selected: usize,
pub should_quit: bool,
pub token_rates: VecDeque<f64>,
pub rate_limits: Vec<RateLimitInfo>,
prev_tokens: HashMap<(String, String), u64>,
rate_limit_counter: u32,
collector: MultiCollector,
pub summaries: HashMap<String, String>,
pending_summaries: HashSet<String>,
summary_retries: HashMap<String, u32>,
summary_rx: mpsc::Receiver<(String, String, Option<String>)>,
summary_tx: mpsc::Sender<(String, String, Option<String>)>,
pub orphan_ports: Vec<OrphanPort>,
pub status_msg: Option<(String, Instant)>,
}
impl App {
pub fn new() -> Self {
let (tx, rx) = mpsc::channel();
let summaries = load_summary_cache();
Self {
sessions: Vec::new(),
selected: 0,
should_quit: false,
token_rates: VecDeque::with_capacity(GRAPH_HISTORY_LEN),
rate_limits: Vec::new(),
prev_tokens: HashMap::new(),
rate_limit_counter: 5, collector: MultiCollector::new(),
summaries,
pending_summaries: HashSet::new(),
summary_retries: HashMap::new(),
summary_rx: rx,
summary_tx: tx,
orphan_ports: Vec::new(),
status_msg: None,
}
}
pub fn set_status(&mut self, msg: String) {
self.status_msg = Some((msg, Instant::now()));
}
pub fn tick(&mut self) {
self.sessions = self.collector.collect();
self.orphan_ports = self.collector.orphan_ports.clone();
if self.selected >= self.sessions.len() && !self.sessions.is_empty() {
self.selected = self.sessions.len() - 1;
}
let mut rate: f64 = 0.0;
for s in &self.sessions {
let key = (s.agent_cli.to_string(), s.session_id.clone());
let total = s.active_tokens();
let prev = self.prev_tokens.get(&key).copied().unwrap_or(total);
rate += total.saturating_sub(prev) as f64;
self.prev_tokens.insert(key, total);
}
self.token_rates.push_back(rate);
if self.token_rates.len() > GRAPH_HISTORY_LEN {
self.token_rates.pop_front();
}
if self.rate_limits.is_empty() || self.rate_limit_counter >= 5 {
self.rate_limit_counter = 0;
self.rate_limits = read_rate_limits();
if let Some(codex_rl) = self.collector.codex_rate_limit() {
self.rate_limits.push(codex_rl.clone());
}
} else {
self.rate_limit_counter += 1;
}
self.drain_and_retry_summaries();
}
pub fn drain_and_retry_summaries(&mut self) {
while let Ok((sid, prompt, maybe_summary)) = self.summary_rx.try_recv() {
self.pending_summaries.remove(&sid);
match maybe_summary {
Some(summary) => {
self.summary_retries.remove(&sid);
self.summaries.insert(sid, summary);
save_summary_cache(&self.summaries);
}
None => {
let count = self.summary_retries.entry(sid.clone()).or_insert(0);
*count += 1;
if *count >= MAX_SUMMARY_RETRIES {
self.summaries.insert(sid, sanitize_fallback(&prompt, 28));
save_summary_cache(&self.summaries);
}
}
}
}
for s in &self.sessions {
let retries = self.summary_retries.get(&s.session_id).copied().unwrap_or(0);
if !s.initial_prompt.is_empty()
&& !self.summaries.contains_key(&s.session_id)
&& !self.pending_summaries.contains(&s.session_id)
&& self.pending_summaries.len() < MAX_SUMMARY_JOBS
&& retries < MAX_SUMMARY_RETRIES
{
self.pending_summaries.insert(s.session_id.clone());
let sid = s.session_id.clone();
let prompt = s.initial_prompt.clone();
let tx = self.summary_tx.clone();
std::thread::spawn(move || {
let result = generate_summary(&prompt);
let _ = tx.send((sid, prompt, result));
});
}
}
}
pub fn has_pending_summaries(&self) -> bool {
!self.pending_summaries.is_empty()
}
pub fn has_retryable_summaries(&self) -> bool {
self.sessions.iter().any(|s| {
!s.initial_prompt.is_empty()
&& !self.summaries.contains_key(&s.session_id)
&& !self.pending_summaries.contains(&s.session_id)
&& self.summary_retries.get(&s.session_id).copied().unwrap_or(0) < MAX_SUMMARY_RETRIES
})
}
pub fn select_next(&mut self) {
if !self.sessions.is_empty() {
self.selected = (self.selected + 1).min(self.sessions.len() - 1);
}
}
pub fn select_prev(&mut self) {
self.selected = self.selected.saturating_sub(1);
}
pub fn kill_selected(&mut self) {
if self.sessions.is_empty() {
return;
}
let session = &self.sessions[self.selected];
if session.status == SessionStatus::Done {
return;
}
let pid = session.pid;
let _ = std::process::Command::new("kill")
.args(["-9", &pid.to_string()])
.output();
self.tick();
}
pub fn kill_orphan_ports(&mut self) {
use crate::collector::process::get_listening_ports;
let fresh_ports = get_listening_ports();
for orphan in &self.orphan_ports {
let still_listening = fresh_ports.get(&orphan.pid)
.is_some_and(|ports| ports.contains(&orphan.port));
if !still_listening {
continue;
}
if let Ok(output) = std::process::Command::new("ps")
.args(["-p", &orphan.pid.to_string(), "-o", "command="])
.output()
{
let current_cmd = String::from_utf8_lossy(&output.stdout).trim().to_string();
if current_cmd == orphan.command {
let _ = std::process::Command::new("kill")
.args([&orphan.pid.to_string()])
.output();
}
}
}
self.tick();
}
pub fn quit(&mut self) {
self.should_quit = true;
}
pub fn jump_to_session(&mut self) -> Option<String> {
if self.sessions.is_empty() {
return None;
}
let session = &self.sessions[self.selected];
let target_pid = session.pid;
if std::env::var("TMUX").is_ok() {
return self.jump_via_tmux(target_pid);
}
None
}
fn jump_via_tmux(&self, target_pid: u32) -> Option<String> {
let output = std::process::Command::new("tmux")
.args(["list-panes", "-a", "-F", "#{pane_pid} #{session_name}:#{window_index}.#{pane_index}"])
.output()
.ok()?;
let stdout = String::from_utf8_lossy(&output.stdout);
for line in stdout.lines() {
let mut parts = line.splitn(2, ' ');
let pane_pid: u32 = match parts.next().and_then(|p| p.parse().ok()) {
Some(p) => p,
None => continue,
};
let pane_target = match parts.next() {
Some(t) => t,
None => continue,
};
if is_descendant_of(target_pid, pane_pid) {
let _ = std::process::Command::new("tmux")
.args(["select-pane", "-t", pane_target])
.status();
if let Some(window) = pane_target.split('.').next() {
let _ = std::process::Command::new("tmux")
.args(["select-window", "-t", window])
.status();
}
return None; }
}
Some("pane not found".to_string())
}
pub fn session_summary(&self, session: &AgentSession) -> String {
if let Some(summary) = self.summaries.get(&session.session_id) {
summary.clone()
} else if matches!(session.status, SessionStatus::Done) {
if !session.initial_prompt.is_empty() {
sanitize_fallback(&session.initial_prompt, 28)
} else {
"—".to_string()
}
} else if self.pending_summaries.contains(&session.session_id) {
let dots = match (std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() / 500) % 3 {
0 => ".",
1 => "..",
_ => "...",
};
dots.to_string()
} else if !session.initial_prompt.is_empty() {
sanitize_fallback(&session.initial_prompt, 28)
} else {
"—".to_string()
}
}
}
fn generate_summary(prompt: &str) -> Option<String> {
use std::io::Write;
use std::process::{Command, Stdio};
use std::time::Duration;
let input: String = prompt.chars().take(200).collect();
let request = format!(
"You are a conversation title generator. Given the user's first message, create a short title (3-5 words) that describes what they want to do. Be specific and actionable. Do NOT output generic titles like 'New conversation' or 'Initial setup'. Output ONLY the title, no quotes, no explanation.\n\nUser message: {}",
input
);
let mut child = match Command::new("claude")
.args(["--print", "-"])
.current_dir(std::env::temp_dir())
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::null())
.spawn()
{
Ok(c) => c,
Err(_) => return Some(sanitize_fallback(prompt, 28)),
};
if let Some(mut stdin) = child.stdin.take() {
let _ = stdin.write_all(request.as_bytes());
}
let child_pid = child.id();
let (wo_tx, wo_rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let _ = wo_tx.send(child.wait_with_output());
});
let result = match wo_rx.recv_timeout(Duration::from_secs(10)) {
Ok(r) => r,
Err(_) => {
let _ = std::process::Command::new("kill")
.args(["-9", &child_pid.to_string()])
.status();
return None;
}
};
let fallback = sanitize_fallback(prompt, 28);
match result {
Ok(output) if output.status.success() => {
let raw = String::from_utf8_lossy(&output.stdout)
.trim()
.to_string();
let lower = raw.to_lowercase();
if raw.is_empty()
|| raw.chars().count() > 40
|| raw.contains("Summarize")
|| raw.starts_with("- ")
|| lower.contains("new conversation")
|| lower.contains("initial setup")
|| lower.contains("initial project")
|| lower.contains("initial conversation")
|| lower.starts_with("greeting")
{
Some(fallback)
} else {
Some(raw.trim_matches('"').trim_matches('\'').to_string())
}
}
_ => Some(fallback),
}
}
fn cache_dir() -> std::path::PathBuf {
dirs::cache_dir()
.unwrap_or_else(|| dirs::home_dir().unwrap_or_default().join(".cache"))
.join("abtop")
}
fn cache_path() -> std::path::PathBuf {
cache_dir().join("summaries.json")
}
fn load_summary_cache() -> HashMap<String, String> {
let path = cache_path();
match std::fs::read_to_string(&path) {
Ok(content) => {
let mut cache: HashMap<String, String> =
serde_json::from_str(&content).unwrap_or_default();
let before = cache.len();
cache.retain(|_, v| !v.contains("You are a conversation tit"));
if cache.len() < before {
let _ = std::fs::create_dir_all(cache_dir());
let _ = std::fs::write(&path, serde_json::to_string(&cache).unwrap_or_default());
}
cache
}
Err(_) => HashMap::new(),
}
}
fn is_descendant_of(target: u32, ancestor: u32) -> bool {
if target == ancestor {
return true;
}
let output = match std::process::Command::new("ps")
.args(["-eo", "pid,ppid"])
.output()
{
Ok(o) => o,
Err(_) => return false,
};
let stdout = String::from_utf8_lossy(&output.stdout);
let mut ppid_map: HashMap<u32, u32> = HashMap::new();
for line in stdout.lines().skip(1) {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 2 {
if let (Ok(pid), Ok(ppid)) = (parts[0].parse::<u32>(), parts[1].parse::<u32>()) {
ppid_map.insert(pid, ppid);
}
}
}
let mut current = target;
let mut depth = 0;
while depth < 50 {
if let Some(&parent) = ppid_map.get(¤t) {
if parent == ancestor {
return true;
}
if parent == 0 || parent == 1 || parent == current {
return false;
}
current = parent;
depth += 1;
} else {
return false;
}
}
false
}
fn save_summary_cache(summaries: &HashMap<String, String>) {
let path = cache_path();
let _ = std::fs::create_dir_all(cache_dir());
if let Ok(json) = serde_json::to_string(summaries) {
let _ = std::fs::write(&path, json);
}
}