use crate::cmd::Cmd;
use crate::config::SplitDirection;
use anyhow::{Context, Result, anyhow};
use serde::Deserialize;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::thread;
use std::time::Duration;
use super::agent;
use super::handshake::UnixPipeHandshake;
use super::types::*;
use super::util;
use super::{Multiplexer, PaneHandshake};
#[derive(Debug, Deserialize)]
struct KittyProcess {
pid: u32,
#[allow(dead_code)]
cwd: String,
cmdline: Vec<String>,
}
#[derive(Debug, Deserialize)]
struct KittyWindow {
id: u64,
title: String,
cwd: String,
pid: u32,
is_focused: bool,
#[allow(dead_code)]
is_active: bool,
#[serde(default)]
foreground_processes: Vec<KittyProcess>,
}
#[derive(Debug, Deserialize)]
struct KittyTab {
id: u64,
title: String,
is_active: bool,
#[allow(dead_code)]
is_focused: bool,
windows: Vec<KittyWindow>,
}
#[derive(Debug, Deserialize)]
struct KittyOsWindow {
id: u64,
is_focused: bool,
tabs: Vec<KittyTab>,
}
#[derive(Debug, Clone)]
struct FlatPane {
os_window_id: u64,
tab_id: u64,
tab_title: String,
window_id: u64,
is_focused: bool,
#[allow(dead_code)]
is_tab_active: bool,
cwd: PathBuf,
pid: u32,
title: String,
foreground_command: Option<String>,
foreground_pid: Option<u32>,
}
#[derive(Debug)]
pub struct KittyBackend;
impl Default for KittyBackend {
fn default() -> Self {
Self::new()
}
}
impl KittyBackend {
pub fn new() -> Self {
Self
}
fn kitten_cmd(&self) -> Cmd<'static> {
Cmd::new("kitten").arg("@")
}
fn list_panes(&self) -> Result<Vec<FlatPane>> {
let output = self
.kitten_cmd()
.arg("ls")
.run_and_capture_stdout()
.context("Failed to list kitty panes")?;
let os_windows: Vec<KittyOsWindow> =
serde_json::from_str(&output).context("Failed to parse kitty ls output")?;
let mut panes = Vec::new();
for os_win in os_windows {
for tab in os_win.tabs {
for win in tab.windows {
let fg = win.foreground_processes.iter().min_by_key(|p| p.pid);
let foreground_command = fg.and_then(|p| {
p.cmdline.first().map(|c| {
Path::new(c)
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| c.clone())
})
});
let foreground_pid = fg.map(|p| p.pid);
panes.push(FlatPane {
os_window_id: os_win.id,
tab_id: tab.id,
tab_title: tab.title.clone(),
window_id: win.id,
is_focused: win.is_focused && tab.is_focused && os_win.is_focused,
is_tab_active: tab.is_active,
cwd: PathBuf::from(&win.cwd),
pid: win.pid,
title: win.title,
foreground_command,
foreground_pid,
});
}
}
}
Ok(panes)
}
fn current_window_id(&self) -> Option<u64> {
std::env::var("KITTY_WINDOW_ID").ok()?.parse().ok()
}
fn current_os_window_id(&self) -> Option<u64> {
let window_id = self.current_window_id()?;
let panes = self.list_panes().ok()?;
panes
.iter()
.find(|p| p.window_id == window_id)
.map(|p| p.os_window_id)
}
fn panes_in_current_scope<'a>(&self, panes: &'a [FlatPane]) -> Vec<&'a FlatPane> {
let current_os = self.current_os_window_id();
panes
.iter()
.filter(|p| current_os.is_none() || Some(p.os_window_id) == current_os)
.collect()
}
#[allow(dead_code)]
fn set_tab_title(&self, window_id: &str, title: &str) -> Result<()> {
self.kitten_cmd()
.args(&[
"set-tab-title",
"--match",
&format!("id:{}", window_id),
title,
])
.run()
.context("Failed to set tab title")?;
Ok(())
}
fn split_pane_internal(
&self,
target_pane_id: &str,
direction: SplitDirection,
cwd: &Path,
_size: Option<u16>,
_percentage: Option<u8>,
command: Option<&str>,
) -> Result<String> {
let location_arg = match direction {
SplitDirection::Horizontal => "vsplit",
SplitDirection::Vertical => "hsplit",
};
let cwd_str = cwd.to_string_lossy();
let match_arg = format!("id:{}", target_pane_id);
let mut args = vec![
"launch",
"--location",
location_arg,
"--match",
&match_arg,
"--cwd",
&*cwd_str,
];
if let Some(cmd) = command {
args.push("sh");
args.push("-c");
args.push(cmd);
}
let output = self
.kitten_cmd()
.args(&args)
.run_and_capture_stdout()
.context("Failed to split kitty pane")?;
Ok(output.trim().to_string())
}
}
impl Multiplexer for KittyBackend {
fn name(&self) -> &'static str {
"kitty"
}
fn is_running(&self) -> Result<bool> {
self.kitten_cmd().arg("ls").run_as_check()
}
fn current_pane_id(&self) -> Option<String> {
std::env::var("KITTY_WINDOW_ID").ok()
}
fn active_pane_id(&self) -> Option<String> {
self.list_panes().ok().and_then(|panes| {
panes
.into_iter()
.find(|p| p.is_focused)
.map(|p| p.window_id.to_string())
})
}
fn get_client_active_pane_path(&self) -> Result<PathBuf> {
let window_id = self
.current_window_id()
.ok_or_else(|| anyhow!("KITTY_WINDOW_ID not set or invalid"))?;
let panes = self.list_panes()?;
let current = panes
.iter()
.find(|p| p.window_id == window_id)
.ok_or_else(|| anyhow!("Current window {} not found", window_id))?;
if current.cwd.as_os_str().is_empty() {
return Err(anyhow!("Empty path returned from kitty"));
}
Ok(current.cwd.clone())
}
fn create_session(&self, _params: CreateSessionParams) -> Result<String> {
Err(anyhow!(
"Session mode (--session) is not supported in Kitty.\n\
Kitty does not have a session concept like tmux.\n\
Use the default window mode instead (omit --session flag)."
))
}
fn switch_to_session(&self, _prefix: &str, _name: &str) -> Result<()> {
Err(anyhow!(
"Session mode is not supported in Kitty.\n\
Use the default window mode instead."
))
}
fn session_exists(&self, _full_name: &str) -> Result<bool> {
Ok(false)
}
fn kill_session(&self, _full_name: &str) -> Result<()> {
Ok(())
}
fn schedule_session_close(&self, _full_name: &str, _delay: Duration) -> Result<()> {
Err(anyhow!(
"Session mode is not supported in Kitty. Use window mode instead."
))
}
fn get_all_session_names(&self) -> Result<HashSet<String>> {
Ok(HashSet::new())
}
fn wait_until_session_closed(&self, _full_session_name: &str) -> Result<()> {
Err(anyhow!(
"Session mode is not supported in Kitty. Use window mode instead."
))
}
fn create_window(&self, params: CreateWindowParams) -> Result<String> {
let full_name = util::prefixed(params.prefix, params.name);
let cwd_str = params.cwd.to_string_lossy();
let output = self
.kitten_cmd()
.args(&[
"launch",
"--type=tab",
"--tab-title",
&full_name,
"--cwd",
&*cwd_str,
"--dont-take-focus",
])
.run_and_capture_stdout()
.context("Failed to create kitty tab")?;
let window_id = output.trim().to_string();
let _ = self
.kitten_cmd()
.args(&[
"set-tab-title",
"--match",
&format!("id:{}", window_id),
&full_name,
])
.run();
Ok(window_id)
}
fn kill_window(&self, full_name: &str) -> Result<()> {
let panes = self.list_panes()?;
let scoped_panes = self.panes_in_current_scope(&panes);
let mut seen_tabs = HashSet::new();
let window_ids: Vec<u64> = scoped_panes
.iter()
.filter(|p| p.tab_title == full_name)
.filter(|p| seen_tabs.insert(p.tab_id))
.map(|p| p.window_id)
.collect();
if window_ids.is_empty() {
return Ok(()); }
for window_id in window_ids {
let _ = self
.kitten_cmd()
.args(&["close-tab", "--match", &format!("id:{}", window_id)])
.run();
}
Ok(())
}
fn schedule_window_close(&self, full_name: &str, delay: Duration) -> Result<()> {
let panes = self.list_panes()?;
let scoped_panes = self.panes_in_current_scope(&panes);
let mut seen_tabs = HashSet::new();
let window_ids: Vec<u64> = scoped_panes
.iter()
.filter(|p| p.tab_title == full_name)
.filter(|p| seen_tabs.insert(p.tab_id))
.map(|p| p.window_id)
.collect();
if window_ids.is_empty() {
return Ok(());
}
let close_cmds: String = window_ids
.iter()
.map(|id| format!("kitten @ close-tab --match 'id:{}'", id))
.collect::<Vec<_>>()
.join("; ");
let script = format!(
"nohup sh -c 'sleep {}; {}' >/dev/null 2>&1 &",
delay.as_secs_f64(),
close_cmds
);
Cmd::new("sh").args(&["-c", &script]).run()?;
Ok(())
}
fn run_deferred_script(&self, script: &str) -> Result<()> {
let bg_script = format!("nohup sh -c '{}' >/dev/null 2>&1 &", script);
Cmd::new("sh").args(&["-c", &bg_script]).run()?;
Ok(())
}
fn shell_select_window_cmd(&self, full_name: &str) -> Result<String> {
let panes = self.list_panes()?;
let scoped_panes = self.panes_in_current_scope(&panes);
let target = scoped_panes
.iter()
.find(|p| p.tab_title == full_name)
.ok_or_else(|| anyhow!("Window '{}' not found", full_name))?;
Ok(format!(
"kitten @ focus-tab --match 'id:{}' >/dev/null 2>&1",
target.window_id
))
}
fn shell_kill_window_cmd(&self, full_name: &str) -> Result<String> {
let panes = self.list_panes()?;
let scoped_panes = self.panes_in_current_scope(&panes);
let target = scoped_panes
.iter()
.find(|p| p.tab_title == full_name)
.ok_or_else(|| anyhow!("Window '{}' not found", full_name))?;
Ok(format!(
"kitten @ close-tab --match 'id:{}' >/dev/null 2>&1",
target.window_id
))
}
fn shell_switch_session_cmd(&self, _full_name: &str) -> Result<String> {
Err(anyhow!(
"Session mode is not supported in Kitty. Use window mode instead."
))
}
fn shell_kill_session_cmd(&self, _full_name: &str) -> Result<String> {
Err(anyhow!(
"Session mode is not supported in Kitty. Use window mode instead."
))
}
fn select_window(&self, prefix: &str, name: &str) -> Result<()> {
let full_name = util::prefixed(prefix, name);
let panes = self.list_panes()?;
let scoped_panes = self.panes_in_current_scope(&panes);
let target = scoped_panes
.iter()
.find(|p| p.tab_title == full_name)
.ok_or_else(|| anyhow!("Window '{}' not found", full_name))?;
self.kitten_cmd()
.args(&["focus-tab", "--match", &format!("id:{}", target.window_id)])
.run()
.context("Failed to focus tab")?;
Ok(())
}
fn window_exists(&self, prefix: &str, name: &str) -> Result<bool> {
let full_name = util::prefixed(prefix, name);
self.window_exists_by_full_name(&full_name)
}
fn window_exists_by_full_name(&self, full_name: &str) -> Result<bool> {
let names = self.get_all_window_names()?;
Ok(names.contains(full_name))
}
fn current_window_name(&self) -> Result<Option<String>> {
let window_id = match self.current_window_id() {
Some(id) => id,
None => return Ok(None),
};
let panes = self.list_panes()?;
let current = panes.iter().find(|p| p.window_id == window_id);
Ok(current.map(|p| p.tab_title.clone()))
}
fn get_all_window_names(&self) -> Result<HashSet<String>> {
let panes = self.list_panes()?;
let scoped_panes = self.panes_in_current_scope(&panes);
let names: HashSet<String> = scoped_panes.iter().map(|p| p.tab_title.clone()).collect();
Ok(names)
}
fn filter_active_windows(&self, windows: &[String]) -> Result<Vec<String>> {
let all_current = self.get_all_window_names()?;
Ok(windows
.iter()
.filter(|w| all_current.contains(*w))
.cloned()
.collect())
}
fn find_last_window_with_prefix(&self, _prefix: &str) -> Result<Option<String>> {
Ok(None)
}
fn find_last_window_with_base_handle(
&self,
_prefix: &str,
_base_handle: &str,
) -> Result<Option<String>> {
Ok(None)
}
fn wait_until_windows_closed(&self, full_window_names: &[String]) -> Result<()> {
if full_window_names.is_empty() {
return Ok(());
}
let targets: HashSet<String> = full_window_names.iter().cloned().collect();
if targets.len() == 1 {
println!("Waiting for window '{}' to close...", full_window_names[0]);
} else {
println!("Waiting for {} windows to close...", targets.len());
}
loop {
if !self.is_running()? {
return Ok(());
}
let current_windows = self.get_all_window_names()?;
let any_exists = targets
.iter()
.any(|target| current_windows.contains(target));
if !any_exists {
return Ok(());
}
thread::sleep(Duration::from_millis(500));
}
}
fn select_pane(&self, pane_id: &str) -> Result<()> {
self.kitten_cmd()
.args(&["focus-window", "--match", &format!("id:{}", pane_id)])
.run()
.context("Failed to focus window")?;
Ok(())
}
fn switch_to_pane(&self, pane_id: &str, _window_hint: Option<&str>) -> Result<()> {
self.select_pane(pane_id)
}
fn kill_pane(&self, pane_id: &str) -> Result<()> {
self.kitten_cmd()
.args(&["close-window", "--match", &format!("id:{}", pane_id)])
.run()?;
Ok(())
}
fn respawn_pane(&self, pane_id: &str, cwd: &Path, cmd: Option<&str>) -> Result<String> {
let new_pane_id =
self.split_pane_internal(pane_id, SplitDirection::Vertical, cwd, None, None, cmd)?;
let _ = self.kill_pane(pane_id);
Ok(new_pane_id)
}
fn capture_pane(&self, pane_id: &str, lines: u16) -> Option<String> {
let output = self
.kitten_cmd()
.args(&["get-text", "--match", &format!("id:{}", pane_id), "--ansi"])
.run_and_capture_stdout()
.ok()?;
let all_lines: Vec<&str> = output.lines().collect();
let start = all_lines.len().saturating_sub(lines as usize);
Some(all_lines[start..].join("\n"))
}
fn send_keys(&self, pane_id: &str, command: &str) -> Result<()> {
self.kitten_cmd()
.args(&["send-text", "--match", &format!("id:{}", pane_id), command])
.run()
.context("Failed to send text to pane")?;
self.kitten_cmd()
.args(&["send-text", "--match", &format!("id:{}", pane_id), "\r"])
.run()
.context("Failed to send Enter key to pane")?;
Ok(())
}
fn send_keys_to_agent(&self, pane_id: &str, command: &str, agent: Option<&str>) -> Result<()> {
if agent::resolve_profile(agent).needs_bang_delay() && command.starts_with('!') {
self.kitten_cmd()
.args(&["send-text", "--match", &format!("id:{}", pane_id), "!"])
.run()
.context("Failed to send ! to pane")?;
thread::sleep(Duration::from_millis(50));
self.kitten_cmd()
.args(&[
"send-text",
"--match",
&format!("id:{}", pane_id),
&command[1..],
])
.run()
.context("Failed to send keys to pane")?;
self.kitten_cmd()
.args(&["send-text", "--match", &format!("id:{}", pane_id), "\r"])
.run()
.context("Failed to send Enter key to pane")?;
Ok(())
} else {
self.send_keys(pane_id, command)
}
}
fn send_key(&self, pane_id: &str, key: &str) -> Result<()> {
let translated = match key {
"BSpace" => "\x7f",
"Enter" => "\r",
"Tab" => "\t",
"Up" => "\x1b[A",
"Down" => "\x1b[B",
"Right" => "\x1b[C",
"Left" => "\x1b[D",
"Escape" => "\x1b",
_ => key,
};
self.kitten_cmd()
.args(&[
"send-text",
"--match",
&format!("id:{}", pane_id),
translated,
])
.run()
.context("Failed to send key to pane")?;
Ok(())
}
fn paste_multiline(&self, pane_id: &str, content: &str) -> Result<()> {
self.kitten_cmd()
.args(&[
"send-text",
"--match",
&format!("id:{}", pane_id),
"--bracketed-paste",
content,
])
.run()
.context("Failed to paste content to pane")?;
thread::sleep(Duration::from_millis(100));
self.kitten_cmd()
.args(&["send-text", "--match", &format!("id:{}", pane_id), "\r"])
.run()
.context("Failed to send Enter after paste")?;
Ok(())
}
fn get_default_shell(&self) -> Result<String> {
std::env::var("SHELL").or_else(|_| Ok("/bin/bash".to_string()))
}
fn create_handshake(&self) -> Result<Box<dyn PaneHandshake>> {
Ok(Box::new(UnixPipeHandshake::new()?))
}
fn set_status(&self, pane_id: &str, icon: &str, auto_clear_on_focus: bool) -> Result<()> {
let match_arg = format!("id:{}", pane_id);
let _ = self
.kitten_cmd()
.args(&[
"set-user-vars",
"--match",
&match_arg,
&format!("workmux_status={}", icon),
])
.run();
let auto_clear_val = if auto_clear_on_focus { "1" } else { "" };
let _ = self
.kitten_cmd()
.args(&[
"set-user-vars",
"--match",
&match_arg,
&format!("workmux_auto_clear={}", auto_clear_val),
])
.run();
Ok(())
}
fn clear_status(&self, pane_id: &str) -> Result<()> {
let _ = self
.kitten_cmd()
.args(&[
"set-user-vars",
"--match",
&format!("id:{}", pane_id),
"workmux_status=",
])
.run();
Ok(())
}
fn ensure_status_format(&self, _pane_id: &str) -> Result<()> {
Ok(())
}
fn current_session(&self) -> Option<String> {
self.current_os_window_id()
.map(|id| format!("os-window-{}", id))
}
fn get_all_window_names_all_sessions(&self) -> Result<HashSet<String>> {
let panes = self.list_panes()?;
let names: HashSet<String> = panes.iter().map(|p| p.tab_title.clone()).collect();
Ok(names)
}
fn instance_id(&self) -> String {
std::env::var("KITTY_LISTEN_ON").unwrap_or_else(|_| "default".to_string())
}
fn get_live_pane_info(&self, pane_id: &str) -> Result<Option<LivePaneInfo>> {
let pane_id_num: u64 = match pane_id.parse() {
Ok(id) => id,
Err(_) => return Ok(None),
};
let panes = self.list_panes()?;
let pane = panes.into_iter().find(|p| p.window_id == pane_id_num);
match pane {
Some(p) => Ok(Some(LivePaneInfo {
pid: Some(p.foreground_pid.unwrap_or(p.pid)),
current_command: p.foreground_command.or_else(|| Some("unknown".to_string())),
working_dir: p.cwd,
title: if p.title.is_empty() {
None
} else {
Some(p.title)
},
session: Some(format!("os-window-{}", p.os_window_id)),
window: Some(p.tab_title),
})),
None => Ok(None),
}
}
fn get_all_live_pane_info(&self) -> Result<HashMap<String, LivePaneInfo>> {
let mut result = HashMap::new();
for p in self.list_panes()? {
let pane_id = p.window_id.to_string();
result.insert(
pane_id,
LivePaneInfo {
pid: Some(p.foreground_pid.unwrap_or(p.pid)),
current_command: p.foreground_command.or_else(|| Some("unknown".to_string())),
working_dir: p.cwd,
title: if p.title.is_empty() {
None
} else {
Some(p.title)
},
session: Some(format!("os-window-{}", p.os_window_id)),
window: Some(p.tab_title),
},
);
}
Ok(result)
}
fn split_pane(
&self,
target_pane_id: &str,
direction: &SplitDirection,
cwd: &Path,
size: Option<u16>,
percentage: Option<u8>,
command: Option<&str>,
) -> Result<String> {
self.split_pane_internal(
target_pane_id,
direction.clone(),
cwd,
size,
percentage,
command,
)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_kitty_backend_name() {
let backend = KittyBackend::new();
assert_eq!(backend.name(), "kitty");
}
}