use anyhow::Result;
use notify::{RecursiveMode, Watcher};
use std::collections::{HashMap, HashSet};
use std::io::Write;
use std::os::unix::fs::PermissionsExt;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use crate::cmd::Cmd;
use crate::config::Config;
use crate::git::GitStatus;
use crate::multiplexer::{Multiplexer, create_backend, detect_backend};
use crate::state::StateStore;
use super::app::SidebarLayoutMode;
use super::snapshot::build_snapshot;
/// Returns the Unix socket path used by the sidebar daemon for `instance_id`.
///
/// Path separators (`/` and `\`) in the id are replaced with `-`, so the id
/// always ends up as a single file name inside the system temp directory.
pub fn socket_path(instance_id: &str) -> PathBuf {
    let sanitized: String = instance_id
        .chars()
        .map(|ch| if ch == '/' || ch == '\\' { '-' } else { ch })
        .collect();
    let mut location = std::env::temp_dir();
    location.push(format!("workmux-sidebar-{}.sock", sanitized));
    location
}
/// Pane and window facts indexed from the output of `tmux list-panes -a`
/// (built by `query_tmux_state`).
struct TmuxState {
/// Pane id -> value of `@workmux_pane_status`; `None` when the option is empty.
/// Despite the name, the map is keyed by pane id.
window_statuses: HashMap<String, Option<String>>,
/// `(session name, window id)` pairs for windows that are active in an attached session.
active_windows: HashSet<(String, String)>,
/// Pane id -> id of the window containing that pane.
pane_window_ids: HashMap<String, String>,
/// Ids of panes for which tmux reported `pane_active` as `1`.
active_pane_ids: HashSet<String>,
/// Window id -> number of panes listed for that window.
window_pane_counts: HashMap<String, usize>,
}
/// Lists every tmux pane once and indexes the answer into a [`TmuxState`].
///
/// Runs `tmux list-panes -a` with a tab-separated format string. A failed
/// command is treated as empty output, which yields an empty state. Lines with
/// fewer than seven tab-separated fields are skipped; extra fields are ignored.
fn query_tmux_state() -> TmuxState {
    let format = "#{pane_id}\t#{session_name}\t#{window_id}\t#{@workmux_pane_status}\t#{window_active}\t#{session_attached}\t#{pane_active}";
    let listing = Cmd::new("tmux")
        .args(&["list-panes", "-a", "-F", format])
        .run_and_capture_stdout()
        .unwrap_or_default();

    let mut state = TmuxState {
        window_statuses: HashMap::new(),
        active_windows: HashSet::new(),
        pane_window_ids: HashMap::new(),
        active_pane_ids: HashSet::new(),
        window_pane_counts: HashMap::new(),
    };

    for row in listing.lines() {
        let fields: Vec<&str> = row.split('\t').collect();
        let &[pane_id, session, window_id, status, win_active, sess_attached, pane_active, ..] =
            fields.as_slice()
        else {
            continue;
        };

        // An empty status option means the pane has no workmux status at all.
        let status = (!status.is_empty()).then(|| status.to_string());
        state.window_statuses.insert(pane_id.to_string(), status);
        state
            .pane_window_ids
            .insert(pane_id.to_string(), window_id.to_string());
        *state
            .window_pane_counts
            .entry(window_id.to_string())
            .or_default() += 1;

        // tmux prints boolean format variables as "1" when set.
        if win_active == "1" && sess_attached == "1" {
            state
                .active_windows
                .insert((session.to_string(), window_id.to_string()));
        }
        if pane_active == "1" {
            state.active_pane_ids.insert(pane_id.to_string());
        }
    }

    state
}
/// Unix-socket server that pushes sidebar snapshots to connected clients.
struct SocketServer {
/// Currently connected client streams; shared with the accept thread, which
/// appends new connections.
clients: Arc<Mutex<Vec<UnixStream>>>,
}
impl SocketServer {
fn bind(path: &Path, dirty_flag: Arc<AtomicBool>) -> std::io::Result<Self> {
let listener = UnixListener::bind(path)?;
std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
listener.set_nonblocking(true)?;
let clients: Arc<Mutex<Vec<UnixStream>>> = Arc::new(Mutex::new(Vec::new()));
let clients_clone = clients.clone();
thread::spawn(move || {
loop {
match listener.accept() {
Ok((stream, _)) => {
let _ = stream.set_write_timeout(Some(Duration::from_millis(1)));
clients_clone.lock().unwrap().push(stream);
dirty_flag.store(true, Ordering::Relaxed);
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(50));
}
Err(_) => break,
}
}
});
Ok(Self { clients })
}
fn broadcast(&self, snapshot: &super::snapshot::SidebarSnapshot) {
let data = serde_json::to_vec(snapshot).unwrap_or_default();
let len = (data.len() as u32).to_be_bytes();
let mut clients = std::mem::take(&mut *self.clients.lock().unwrap());
clients
.retain_mut(|stream| stream.write_all(&len).is_ok() && stream.write_all(&data).is_ok());
self.clients.lock().unwrap().append(&mut clients);
}
fn client_count(&self) -> usize {
self.clients.lock().unwrap().len()
}
}
/// Resolves the sidebar layout mode from the first source that names one.
///
/// Sources are consulted in order: the tmux global option
/// `@workmux_sidebar_layout`, the persisted settings in the state store, and
/// finally `config.sidebar.layout`. Only the values `tiles` and `compact` are
/// recognised; anything else falls through to the next source. Returns `None`
/// when no source yields a recognised value.
fn read_sidebar_layout_mode(config: &Config) -> Option<SidebarLayoutMode> {
    fn parse_mode(name: &str) -> Option<SidebarLayoutMode> {
        match name {
            "tiles" => Some(SidebarLayoutMode::Tiles),
            "compact" => Some(SidebarLayoutMode::Compact),
            _ => None,
        }
    }

    let from_tmux = || -> Option<SidebarLayoutMode> {
        let output = Cmd::new("tmux")
            .args(&["show-option", "-gqv", "@workmux_sidebar_layout"])
            .run_and_capture_stdout()
            .ok()?;
        parse_mode(output.trim())
    };
    let from_settings = || -> Option<SidebarLayoutMode> {
        let store = StateStore::new().ok()?;
        let settings = store.load_settings().ok()?;
        settings.sidebar_layout.as_deref().and_then(parse_mode)
    };

    // `or_else` keeps the later sources lazy: they are only read when the
    // earlier ones produced nothing usable.
    from_tmux()
        .or_else(from_settings)
        .or_else(|| config.sidebar.layout.as_deref().and_then(parse_mode))
}
/// Shared map from worktree path to its most recently computed git status.
type GitCache = Arc<Mutex<HashMap<PathBuf, GitStatus>>>;
/// Locates the git directory that belongs to `worktree_path`.
///
/// * `.git` is a directory: that directory is returned.
/// * `.git` is a file: it must start with `gitdir: `; the rest (trimmed) is
///   the git directory, resolved against `worktree_path` when relative.
/// * Otherwise, or if the file is unreadable or malformed: `None`.
fn resolve_git_dir(worktree_path: &Path) -> Option<PathBuf> {
    let marker = worktree_path.join(".git");
    if marker.is_dir() {
        return Some(marker);
    }
    if !marker.is_file() {
        return None;
    }
    let pointer = std::fs::read_to_string(&marker).ok()?;
    let target = pointer.strip_prefix("gitdir: ")?.trim();
    // `join` replaces the base when `target` is absolute, so this covers both
    // the absolute and the relative form.
    Some(worktree_path.join(target))
}
/// Resolves the common git directory named by the `commondir` file in `gitdir`.
///
/// The file content is trimmed and, when relative, resolved against `gitdir`.
/// The canonicalized path is returned when canonicalization succeeds, and the
/// un-canonicalized path otherwise. Returns `None` only when `commondir`
/// cannot be read.
fn resolve_common_git_dir(gitdir: &Path) -> Option<PathBuf> {
    let pointer = std::fs::read_to_string(gitdir.join("commondir")).ok()?;
    let target = Path::new(pointer.trim());
    let resolved = if target.is_absolute() {
        target.to_path_buf()
    } else {
        gitdir.join(target)
    };
    match resolved.canonicalize() {
        Ok(canonical) => Some(canonical),
        Err(_) => Some(resolved),
    }
}
/// Reports whether two git statuses agree on every field compared here:
/// ahead/behind counts, upstream presence, conflict and dirty flags, the four
/// line counters, and the branch and base-branch names.
fn git_status_semantically_equal(a: &GitStatus, b: &GitStatus) -> bool {
    let same_sync =
        a.ahead == b.ahead && a.behind == b.behind && a.has_upstream == b.has_upstream;
    let same_tree_state = a.has_conflict == b.has_conflict && a.is_dirty == b.is_dirty;
    let same_line_counts = a.lines_added == b.lines_added
        && a.lines_removed == b.lines_removed
        && a.uncommitted_added == b.uncommitted_added
        && a.uncommitted_removed == b.uncommitted_removed;
    let same_branches = a.base_branch == b.base_branch && a.branch == b.branch;

    same_sync && same_tree_state && same_line_counts && same_branches
}
/// Collects the worktrees registered under every watched directory that is a
/// (component-wise) prefix of `event_path`.
///
/// A worktree appears once per matching watched directory, so the result may
/// contain duplicates; its order follows the map's iteration order.
fn find_worktrees_for_path(
    event_path: &Path,
    watch_to_worktrees: &HashMap<PathBuf, HashSet<PathBuf>>,
) -> Vec<PathBuf> {
    watch_to_worktrees
        .iter()
        .filter(|(watched_dir, _)| event_path.starts_with(watched_dir))
        .flat_map(|(_, subscribers)| subscribers.iter().cloned())
        .collect()
}
/// Registers `worktree` as interested in filesystem events under `path`.
///
/// The watcher is only asked to watch `path` when no worktree is registered
/// for it yet. If that call fails, a warning is logged and nothing is
/// recorded. On success `path` is appended to `watched_for_worktree`.
fn add_watch(
    watcher: &mut notify::RecommendedWatcher,
    path: &Path,
    mode: RecursiveMode,
    worktree: &Path,
    watch_to_worktrees: &mut HashMap<PathBuf, HashSet<PathBuf>>,
    watched_for_worktree: &mut Vec<PathBuf>,
) {
    let has_subscribers = match watch_to_worktrees.get(path) {
        Some(subscribers) => !subscribers.is_empty(),
        None => false,
    };
    if !has_subscribers {
        if let Err(e) = watcher.watch(path, mode) {
            tracing::warn!("failed to watch {}: {}", path.display(), e);
            return;
        }
    }

    let watch_path = path.to_path_buf();
    watch_to_worktrees
        .entry(watch_path.clone())
        .or_default()
        .insert(worktree.to_path_buf());
    watched_for_worktree.push(watch_path);
}
/// Drops `worktree`'s interest in `watch_path`.
///
/// When no worktree remains registered for `watch_path`, the map entry is
/// removed and the watcher is asked to stop watching it (errors ignored).
/// Does nothing if `watch_path` is not in the map.
fn remove_worktree_watch(
    watcher: &mut notify::RecommendedWatcher,
    watch_path: &Path,
    worktree: &Path,
    watch_to_worktrees: &mut HashMap<PathBuf, HashSet<PathBuf>>,
) {
    let Some(subscribers) = watch_to_worktrees.get_mut(watch_path) else {
        return;
    };
    subscribers.remove(worktree);
    if !subscribers.is_empty() {
        return;
    }
    watch_to_worktrees.remove(watch_path);
    let _ = watcher.unwatch(watch_path);
}
fn setup_worktree_watches(
watcher: &mut notify::RecommendedWatcher,
worktree: &Path,
watch_to_worktrees: &mut HashMap<PathBuf, HashSet<PathBuf>>,
) -> Vec<PathBuf> {
let mut watched = Vec::new();
let dot_git = worktree.join(".git");
let is_linked = dot_git.is_file();
if is_linked {
if let Some(git_dir) = resolve_git_dir(worktree) {
add_watch(
watcher,
&git_dir,
RecursiveMode::Recursive,
worktree,
watch_to_worktrees,
&mut watched,
);
if let Some(common_dir) = resolve_common_git_dir(&git_dir) {
let refs_dir = common_dir.join("refs");
if refs_dir.is_dir() {
add_watch(
watcher,
&refs_dir,
RecursiveMode::Recursive,
worktree,
watch_to_worktrees,
&mut watched,
);
}
add_watch(
watcher,
&common_dir,
RecursiveMode::NonRecursive,
worktree,
watch_to_worktrees,
&mut watched,
);
}
}
add_watch(
watcher,
worktree,
RecursiveMode::Recursive,
worktree,
watch_to_worktrees,
&mut watched,
);
} else {
add_watch(
watcher,
worktree,
RecursiveMode::Recursive,
worktree,
watch_to_worktrees,
&mut watched,
);
}
watched
}
/// Computes how long the git worker may block waiting for filesystem events.
///
/// * If any pending worktree has already passed its debounce window, returns
///   1 ms so it is handled almost immediately.
/// * Otherwise returns the smallest of: the time until the next full sweep,
///   the time until the earliest pending worktree becomes ready, and 1 s.
fn next_worker_timeout(
    pending: &HashMap<PathBuf, Instant>,
    debounce: Duration,
    last_sweep: Instant,
    sweep_interval: Duration,
) -> Duration {
    let now = Instant::now();
    let until_sweep = sweep_interval.saturating_sub(last_sweep.elapsed());

    // Remaining debounce time per pending worktree; zero means "ready now".
    let soonest_pending = pending
        .values()
        .map(|last_event| (*last_event + debounce).saturating_duration_since(now))
        .min();

    let wait = match soonest_pending {
        Some(remaining) if remaining.is_zero() => return Duration::from_millis(1),
        Some(remaining) => remaining.min(until_sweep),
        None => until_sweep,
    };
    wait.min(Duration::from_secs(1))
}
/// Recomputes the git status for `path`, stores it in `cache`, and reports
/// whether it differs from the previously cached value.
///
/// Returns `true` when there was no cached value, when the new status is not
/// semantically equal to the old one, or when the cache lock cannot be taken
/// (in which case nothing is stored).
fn refresh_git_status(path: &Path, cache: &GitCache) -> bool {
    // Compute the status before locking so the cache is never held across the
    // git call.
    let fresh = crate::git::get_git_status(path, None);
    let Ok(mut statuses) = cache.lock() else {
        return true;
    };
    let changed = match statuses.get(path) {
        Some(previous) => !git_status_semantically_equal(previous, &fresh),
        None => true,
    };
    statuses.insert(path.to_path_buf(), fresh);
    changed
}
/// One worktree path sent to the git worker, with a staleness hint.
struct GitWorkerPath {
/// Worktree path whose git status should be tracked.
path: PathBuf,
/// Staleness hint; the worker skips debounced refreshes for a path only when
/// every entry sent for that path is stale.
is_stale: bool,
}
/// Spawns the background thread that keeps the git status cache up to date.
///
/// Returns the shared cache and a sender for the list of worktree paths to
/// track. The worker uses a filesystem watcher when one can be created and
/// refreshes a worktree 300 ms after its last event; a periodic full sweep
/// runs every 30 s as well. Without a watcher it polls with a 2 s sweep.
/// `dirty_flag` is set whenever a status changes or cached entries are
/// dropped. The loop ends when `term` is set or the watcher channel
/// disconnects.
fn spawn_git_worker(
term: Arc<AtomicBool>,
dirty_flag: Arc<AtomicBool>,
) -> (GitCache, std::sync::mpsc::Sender<Vec<GitWorkerPath>>) {
let cache: GitCache = Arc::new(Mutex::new(HashMap::new()));
let cache_clone = cache.clone();
let (tx, rx) = std::sync::mpsc::channel::<Vec<GitWorkerPath>>();
thread::spawn(move || {
let (fs_tx, fs_rx) = std::sync::mpsc::channel();
// `None` here switches the whole loop into polling mode.
let mut watcher: Option<notify::RecommendedWatcher> =
match notify::RecommendedWatcher::new(fs_tx, notify::Config::default()) {
Ok(w) => Some(w),
Err(e) => {
tracing::warn!(
"filesystem watcher unavailable, falling back to polling: {}",
e
);
None
}
};
// Latest path list received from the main loop.
let mut active_entries: Vec<GitWorkerPath> = Vec::new();
// Watched directory -> worktrees interested in events below it.
let mut watch_to_worktrees: HashMap<PathBuf, HashSet<PathBuf>> = HashMap::new();
// Worktree -> directories watched on its behalf (needed to undo the watches).
let mut worktree_watches: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
// Worktree -> time of its most recent event; refreshed once the debounce elapses.
let mut pending_worktrees: HashMap<PathBuf, Instant> = HashMap::new();
// Worktree -> true only when every entry for that path was marked stale.
let mut path_stale: HashMap<PathBuf, bool> = HashMap::new();
// Deduplicated, sorted list of tracked worktree paths.
let mut unique_active: Vec<PathBuf> = Vec::new();
let mut last_full_sweep = Instant::now();
let full_sweep_interval = if watcher.is_some() {
Duration::from_secs(30)
} else {
Duration::from_secs(2)
};
let debounce_duration = Duration::from_millis(300);
while !term.load(Ordering::Relaxed) {
if watcher.is_some() {
// Block for events until the next debounce expiry or sweep is due.
let timeout = next_worker_timeout(
&pending_worktrees,
debounce_duration,
last_full_sweep,
full_sweep_interval,
);
match fs_rx.recv_timeout(timeout) {
Ok(Ok(event)) => {
// Each event (re)starts the debounce window of every affected worktree.
for path in &event.paths {
for wt in find_worktrees_for_path(path, &watch_to_worktrees) {
pending_worktrees
.entry(wt)
.and_modify(|t| *t = Instant::now())
.or_insert_with(Instant::now);
}
}
}
Ok(Err(e)) => {
tracing::warn!("filesystem watch error: {}", e);
}
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
}
// Drain everything else that is already queued; watch errors are ignored here.
while let Ok(event_result) = fs_rx.try_recv() {
if let Ok(event) = event_result {
for path in &event.paths {
for wt in find_worktrees_for_path(path, &watch_to_worktrees) {
pending_worktrees
.entry(wt)
.and_modify(|t| *t = Instant::now())
.or_insert_with(Instant::now);
}
}
}
}
} else {
// Polling mode: sleep until the next sweep, at most one second at a time
// so `term` is still checked regularly.
let sleep = full_sweep_interval
.saturating_sub(last_full_sweep.elapsed())
.min(Duration::from_secs(1));
thread::sleep(sleep);
}
// Pick up the newest path list; older queued lists are overwritten.
let mut paths_changed = false;
while let Ok(entries) = rx.try_recv() {
active_entries = entries;
paths_changed = true;
}
if paths_changed {
path_stale.clear();
// A path starts out stale and is cleared by any non-stale entry for it.
for entry in &active_entries {
let e = path_stale.entry(entry.path.clone()).or_insert(true);
if !entry.is_stale {
*e = false;
}
}
unique_active = path_stale.keys().cloned().collect();
unique_active.sort();
let unique_set: HashSet<PathBuf> = unique_active.iter().cloned().collect();
if let Some(ref mut w) = watcher {
// Tear down watches for worktrees that are no longer tracked.
let removed: Vec<PathBuf> = worktree_watches
.keys()
.filter(|p| !unique_set.contains(*p))
.cloned()
.collect();
for path in &removed {
if let Some(watched_paths) = worktree_watches.remove(path) {
for wp in &watched_paths {
remove_worktree_watch(w, wp, path, &mut watch_to_worktrees);
}
}
pending_worktrees.remove(path);
}
// Install watches for new worktrees and schedule an immediate refresh:
// back-dating the event by the debounce makes it ready right away.
for path in &unique_active {
if worktree_watches.contains_key(path) {
continue;
}
let watched = setup_worktree_watches(w, path, &mut watch_to_worktrees);
worktree_watches.insert(path.clone(), watched);
pending_worktrees.insert(path.clone(), Instant::now() - debounce_duration);
}
if !removed.is_empty() {
if let Ok(mut c) = cache_clone.lock() {
c.retain(|p, _| unique_set.contains(p));
}
dirty_flag.store(true, Ordering::Relaxed);
}
} else {
// Polling mode: prune the cache and flag a change only if something was dropped.
if let Ok(mut c) = cache_clone.lock() {
let before = c.len();
c.retain(|p, _| unique_set.contains(p));
if c.len() != before {
dirty_flag.store(true, Ordering::Relaxed);
}
}
// Paths without a cached status get an immediate refresh.
for path in &unique_active {
if !cache_clone
.lock()
.ok()
.is_some_and(|c| c.contains_key(path))
{
pending_worktrees
.insert(path.clone(), Instant::now() - debounce_duration);
}
}
}
}
// Refresh every worktree whose debounce window has elapsed.
let now = Instant::now();
let ready: Vec<PathBuf> = pending_worktrees
.iter()
.filter(|(_, last_event)| now.duration_since(**last_event) >= debounce_duration)
.map(|(path, _)| path.clone())
.collect();
let mut any_changed = false;
for path in &ready {
pending_worktrees.remove(path);
// Stale paths are dropped from the pending set without being refreshed.
let is_stale = path_stale.get(path).copied().unwrap_or(false);
if is_stale {
continue;
}
if refresh_git_status(path, &cache_clone) {
any_changed = true;
}
}
// Periodic full sweep. Paths with a pending debounced refresh are skipped.
// NOTE(review): unlike the debounced path above, the sweep does not check
// `path_stale`, so stale paths are refreshed here — confirm this is intended.
if last_full_sweep.elapsed() >= full_sweep_interval {
last_full_sweep = Instant::now();
let sweep_paths: Vec<PathBuf> = if watcher.is_some() {
worktree_watches.keys().cloned().collect()
} else {
unique_active.clone()
};
for path in &sweep_paths {
if pending_worktrees.contains_key(path) {
continue;
}
if refresh_git_status(path, &cache_clone) {
any_changed = true;
}
}
}
if any_changed {
dirty_flag.store(true, Ordering::Relaxed);
}
}
});
(cache, tx)
}
/// Detects panes whose agent is marked as working but whose captured output
/// has stopped changing.
struct InactivityTracker {
/// Pane id -> (hash of the last captured output, time that hash was first seen).
entries: HashMap<String, (u64, Instant)>,
/// Pane id -> (agent `updated_ts` at confirmation, Unix seconds at confirmation)
/// for panes confirmed as inactive.
confirmed: HashMap<String, (u64, u64)>,
/// How long the output must stay unchanged before a pane is confirmed.
timeout: Duration,
}
impl InactivityTracker {
fn new(timeout: Duration) -> Self {
Self {
entries: HashMap::new(),
confirmed: HashMap::new(),
timeout,
}
}
fn check(
&mut self,
agents: &[crate::multiplexer::AgentPane],
mux: &dyn crate::multiplexer::Multiplexer,
) -> (HashMap<String, u64>, Vec<String>) {
use std::hash::{Hash, Hasher};
let now = Instant::now();
let working: HashMap<&str, &crate::multiplexer::AgentPane> = agents
.iter()
.filter(|a| a.status == Some(crate::multiplexer::AgentStatus::Working))
.map(|a| (a.pane_id.as_str(), a))
.collect();
self.entries
.retain(|id, _| working.contains_key(id.as_str()));
self.confirmed
.retain(|id, _| working.contains_key(id.as_str()));
let mut resumed = Vec::new();
let prev_confirmed: HashSet<String> = self.confirmed.keys().cloned().collect();
self.confirmed.retain(|id, (confirmed_ts, _)| {
if let Some(agent) = working.get(id.as_str()) {
agent.updated_ts.unwrap_or(0) <= *confirmed_ts
} else {
false
}
});
for id in &prev_confirmed {
if !self.confirmed.contains_key(id) {
resumed.push(id.clone());
}
}
for (pane_id, agent) in &working {
if self.confirmed.contains_key(*pane_id) {
continue;
}
let Some(raw) = mux.capture_pane(pane_id, 5) else {
continue;
};
let stripped = console::strip_ansi_codes(&raw);
let normalized = stripped.trim();
let mut hasher = std::hash::DefaultHasher::new();
normalized.hash(&mut hasher);
let hash = hasher.finish();
match self.entries.get(*pane_id) {
Some(&(prev_hash, first_seen)) if prev_hash == hash => {
if now.duration_since(first_seen) >= self.timeout {
let agent_ts = agent.updated_ts.unwrap_or(0);
let now_ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.confirmed
.insert(pane_id.to_string(), (agent_ts, now_ts));
}
}
_ => {
self.entries.insert(pane_id.to_string(), (hash, now));
}
}
}
let interrupted = self
.confirmed
.iter()
.map(|(k, (_, ts))| (k.clone(), *ts))
.collect();
(interrupted, resumed)
}
}
/// Runs the sidebar daemon until it is terminated or left without clients.
///
/// Sets up signal flags, the client socket, and the git worker, then loops:
/// whenever a refresh is due it builds a snapshot, updates the
/// interrupted/resumed bookkeeping, sends the worktree paths to the git
/// worker, broadcasts the snapshot, and keeps the tmux option
/// `@workmux_sidebar_agents` in sync. On exit it removes the socket file,
/// the runtime state, and the tmux options it set.
///
/// # Errors
/// Fails if the config cannot be loaded, a signal handler cannot be
/// registered, the socket cannot be bound, or the daemon pid cannot be
/// stored in tmux.
pub fn run() -> Result<()> {
let mux = create_backend(detect_backend());
let instance_id = mux.instance_id();
let config = Config::load(None)?;
let status_icons = config.status_icons.clone();
let term = Arc::new(AtomicBool::new(false));
let dirty_flag = Arc::new(AtomicBool::new(false));
// SIGTERM requests shutdown; SIGUSR1 requests a refresh.
signal_hook::flag::register(signal_hook::consts::SIGTERM, term.clone())?;
signal_hook::flag::register(signal_hook::consts::SIGUSR1, dirty_flag.clone())?;
let sock_path = socket_path(&instance_id);
// Remove any existing socket file first; the result is ignored.
let _ = std::fs::remove_file(&sock_path); let server = SocketServer::bind(&sock_path, dirty_flag.clone())?;
let (git_cache, git_path_tx) = spawn_git_worker(term.clone(), dirty_flag.clone());
// Store this process's pid in a tmux global option.
Cmd::new("tmux")
.args(&[
"set-option",
"-g",
"@workmux_sidebar_daemon_pid",
&std::process::id().to_string(),
])
.run()?;
let mut inactivity_tracker = InactivityTracker::new(Duration::from_secs(10));
let mut last_interrupted: HashMap<String, u64> = HashMap::new();
let mut last_runtime_write = Instant::now();
let backend_name = mux.name().to_string();
let mut last_refresh = Instant::now();
let mut last_client_seen = Instant::now();
let mut dirty_pending = false;
let mut last_agent_list = String::new();
let refresh_interval = Duration::from_secs(2);
let debounce_interval = Duration::from_millis(50);
while !term.load(Ordering::Relaxed) {
if dirty_flag.swap(false, Ordering::Relaxed) {
dirty_pending = true;
}
// Refresh when dirty (at most once per debounce interval) or when the
// periodic timer expires.
let time_since_refresh = last_refresh.elapsed();
let debounce_cleared = dirty_pending && time_since_refresh >= debounce_interval;
let timer_expired = time_since_refresh >= refresh_interval;
if debounce_cleared || timer_expired {
dirty_pending = false;
last_refresh = Instant::now();
if let Some(mut snapshot) = try_build_snapshot(&mux, &status_icons, &config, &git_cache)
{
let (interrupted, resumed) =
inactivity_tracker.check(&snapshot.agents, mux.as_ref());
snapshot.interrupted_pane_ids = interrupted.clone();
// Resumed panes get their stored `status_ts` bumped to the current time.
if !resumed.is_empty()
&& let Ok(store) = StateStore::new()
{
let now_ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
for pane_id in &resumed {
let pane_key = crate::state::PaneKey {
backend: backend_name.clone(),
instance: instance_id.clone(),
pane_id: pane_id.clone(),
};
if let Ok(Some(mut state)) = store.get_agent(&pane_key) {
state.status_ts = Some(now_ts);
let _ = store.upsert_agent(&state);
}
}
}
// Persist the interrupted set when it changes, and at least every 10 s.
let set_changed = interrupted != last_interrupted;
let heartbeat_due = last_runtime_write.elapsed() >= Duration::from_secs(10);
if set_changed || heartbeat_due {
last_interrupted = interrupted;
last_runtime_write = Instant::now();
if let Ok(store) = StateStore::new() {
let now_ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let runtime = crate::state::RuntimeState {
interrupted_pane_ids: last_interrupted.clone(),
updated_ts: now_ts,
};
let _ = store.write_runtime(&backend_name, &instance_id, &runtime);
}
}
let now_secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
// An agent is stale when its `status_ts` is more than one hour (in seconds)
// old; agents without a timestamp are not stale.
let stale_threshold = 60 * 60; let entries: Vec<GitWorkerPath> = snapshot
.agents
.iter()
.map(|a| GitWorkerPath {
path: a.path.clone(),
is_stale: a
.status_ts
.map(|ts| now_secs.saturating_sub(ts) > stale_threshold)
.unwrap_or(false),
})
.collect();
let _ = git_path_tx.send(entries);
server.broadcast(&snapshot);
// Keep the space-separated pane id list in tmux in sync; the option is
// unset when there are no agents.
let agent_list: String = snapshot
.agents
.iter()
.map(|a| a.pane_id.as_str())
.collect::<Vec<_>>()
.join(" ");
if agent_list != last_agent_list {
if !agent_list.is_empty() {
let _ = Cmd::new("tmux")
.args(&["set-option", "-g", "@workmux_sidebar_agents", &agent_list])
.run();
} else {
let _ = Cmd::new("tmux")
.args(&["set-option", "-gu", "@workmux_sidebar_agents"])
.run();
}
last_agent_list = agent_list;
}
}
}
// Exit once no client has been connected for more than 10 seconds.
if server.client_count() > 0 {
last_client_seen = Instant::now();
} else if last_client_seen.elapsed() > Duration::from_secs(10) {
break;
}
thread::sleep(Duration::from_millis(10));
}
// Best-effort cleanup of everything the daemon created.
let _ = std::fs::remove_file(&sock_path);
if let Ok(store) = StateStore::new() {
store.delete_runtime(&backend_name, &instance_id);
}
let _ = Cmd::new("tmux")
.args(&["set-option", "-gu", "@workmux_sidebar_daemon_pid"])
.run();
let _ = Cmd::new("tmux")
.args(&["set-option", "-gu", "@workmux_sidebar_agents"])
.run();
Ok(())
}
fn try_build_snapshot(
mux: &Arc<dyn Multiplexer>,
status_icons: &crate::config::StatusIcons,
config: &Config,
git_cache: &GitCache,
) -> Option<super::snapshot::SidebarSnapshot> {
let tmux_state = query_tmux_state();
let agents = StateStore::new()
.and_then(|store| store.load_reconciled_agents(mux.as_ref()))
.ok()?;
let layout_mode = read_sidebar_layout_mode(config).unwrap_or_default();
let git_statuses = git_cache.lock().ok().map(|c| c.clone()).unwrap_or_default();
Some(build_snapshot(
agents,
&tmux_state.window_statuses,
&tmux_state.pane_window_ids,
tmux_state.active_windows,
tmux_state.active_pane_ids,
tmux_state.window_pane_counts,
layout_mode,
status_icons,
git_statuses,
))
}