use std::path::Path;
use tmux_interface::{KillSession, NewSession, PipePane, SendKeys, Tmux};
pub struct TmuxSession {
pub name: String, }
impl TmuxSession {
pub fn new(name: String) -> Self {
Self::create(name.clone()).unwrap_or(Self { name })
}
pub fn create(name: String) -> anyhow::Result<Self> {
let output = Tmux::new()
.add_command(NewSession::new().detached().session_name(&name))
.output()
.map_err(|e| anyhow::anyhow!("Failed to create tmux session '{}': {}", name, e))?;
if !output.success() {
let stderr = String::from_utf8_lossy(&output.stderr()).trim().to_string();
anyhow::bail!(
"Failed to create tmux session '{}': {}",
name,
if stderr.is_empty() {
"tmux returned a non-zero exit status"
} else {
&stderr
}
);
}
std::thread::sleep(std::time::Duration::from_secs(1));
Ok(Self { name })
}
pub fn send_command(&self, command: &str) {
self.try_send_command(command).ok();
}
pub fn try_send_command(&self, command: &str) -> anyhow::Result<()> {
let output = Tmux::new()
.add_command(SendKeys::new().target_pane(&self.name).key(command))
.add_command(SendKeys::new().target_pane(&self.name).key("Enter"))
.output()
.map_err(|e| {
anyhow::anyhow!(
"Failed to send command to tmux session '{}': {}",
self.name,
e
)
})?;
if !output.success() {
let stderr = String::from_utf8_lossy(&output.stderr()).trim().to_string();
anyhow::bail!(
"Failed to send command to tmux session '{}': {}",
self.name,
if stderr.is_empty() {
"tmux returned a non-zero exit status"
} else {
&stderr
}
);
}
Ok(())
}
pub fn enable_pipe_pane(&self, log_path: &Path) -> anyhow::Result<()> {
let log_path_str = log_path
.to_str()
.ok_or_else(|| anyhow::anyhow!("Invalid log path"))?;
Tmux::with_command(
tmux_interface::PipePane::new()
.target_pane(&self.name)
.open()
.shell_command(format!("cat >> {}", log_path_str)),
)
.output()
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Failed to enable pipe-pane: {}", e))
}
pub fn disable_pipe_pane(&self) -> anyhow::Result<()> {
Tmux::with_command(tmux_interface::PipePane::new().target_pane(&self.name))
.output()
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Failed to disable pipe-pane: {}", e))
}
pub fn is_pipe_pane_active(&self) -> bool {
Tmux::with_command(
tmux_interface::DisplayMessage::new()
.target_pane(&self.name)
.print()
.message("#{pane_pipe}"),
)
.output()
.map(|output| output.success())
.unwrap_or(false)
}
}
pub fn normalize_session_name(name: &str) -> String {
let mut normalized = String::with_capacity(name.len());
let mut last_was_separator = false;
for ch in name.trim().chars() {
if ch.is_alphanumeric() || matches!(ch, '-' | '_') {
normalized.push(ch);
last_was_separator = false;
} else if !last_was_separator {
normalized.push('_');
last_was_separator = true;
}
}
normalized.trim_matches('_').to_string()
}
pub fn is_session_exist(name: &str) -> bool {
Tmux::with_command(tmux_interface::HasSession::new().target_session(name))
.output()
.map(|output| output.success())
.unwrap_or(false)
}
pub fn get_all_session_names() -> std::collections::HashSet<String> {
Tmux::with_command(tmux_interface::ListSessions::new().format("#{session_name}"))
.output()
.map(|output| {
if output.success() {
let stdout_bytes = output.stdout();
let stdout_str = String::from_utf8_lossy(&stdout_bytes);
stdout_str.lines().map(|line| line.to_string()).collect()
} else {
std::collections::HashSet::new()
}
})
.unwrap_or_else(|_| std::collections::HashSet::new())
}
pub fn send_ctrl_c(name: &str) -> anyhow::Result<()> {
Tmux::with_command(SendKeys::new().target_pane(name).key("C-c"))
.output()
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Failed to send C-c to tmux session: {}", e))
}
pub fn disable_pipe_pane(name: &str) -> anyhow::Result<()> {
Tmux::with_command(tmux_interface::PipePane::new().target_pane(name))
.output()
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Failed to disable pipe-pane: {}", e))
}
pub fn disable_pipe_pane_for_job(job_id: u32, session_name: &str, expect_failure: bool) {
tracing::info!(
"Disabling pipe-pane for job {} (session: {})",
job_id,
session_name
);
if let Err(e) = disable_pipe_pane(session_name) {
if expect_failure {
tracing::debug!(
"Could not disable pipe-pane for session '{}' (may already be gone): {}",
session_name,
e
);
} else {
tracing::warn!(
"Failed to disable pipe-pane for session '{}': {}",
session_name,
e
);
}
}
}
pub fn kill_session(name: &str) -> anyhow::Result<()> {
Tmux::with_command(tmux_interface::PipePane::new().target_pane(name))
.output()
.ok();
std::thread::sleep(std::time::Duration::from_secs(1));
Tmux::with_command(tmux_interface::KillSession::new().target_session(name))
.output()
.map(|_| ())
.map_err(|e| anyhow::anyhow!("Failed to kill tmux session: {}", e))
}
pub fn kill_sessions_batch(names: &[String]) -> Vec<(String, anyhow::Result<()>)> {
if names.is_empty() {
return Vec::new();
}
let existing_sessions = get_all_session_names();
let (existing, non_existing): (Vec<_>, Vec<_>) = names
.iter()
.partition(|name| existing_sessions.contains(*name));
let mut results = Vec::new();
for name in non_existing {
results.push((
name.clone(),
Err(anyhow::anyhow!("Session '{}' does not exist", name)),
));
}
if existing.is_empty() {
return results;
}
let mut tmux = Tmux::new();
for name in &existing {
tmux = tmux
.add_command(PipePane::new().target_pane(name.as_str()))
.add_command(KillSession::new().target_session(name.as_str()));
}
let batch_result = tmux.output();
match batch_result {
Ok(_) => {
for name in existing {
results.push((name.clone(), Ok(())));
}
}
Err(_) => {
for name in existing {
let result = kill_session(name);
results.push((name.clone(), result));
}
}
}
results
}
pub fn attach_to_session(name: &str) -> anyhow::Result<()> {
Tmux::with_command(tmux_interface::AttachSession::new().target_session(name))
.output()
.map_err(|e| anyhow::anyhow!("Failed to attach to tmux session: {}", e))?;
Ok(())
}
#[cfg(test)]
mod tests {
use std::process::Command;
use std::time::{SystemTime, UNIX_EPOCH};
use tmux_interface::{HasSession, KillSession, Tmux};
use super::*;
#[test]
fn test_tmux_session() {
let tmux_usable = Command::new("tmux")
.arg("start-server")
.output()
.map(|output| output.status.success())
.unwrap_or(false);
if !tmux_usable {
eprintln!(
"Skipping test_tmux_session: tmux not usable (not installed or can't connect)"
);
return;
}
let session_name = format!(
"gflow-test-{}-{}",
std::process::id(),
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis()
);
TmuxSession::new(session_name.clone());
let has_session = Tmux::with_command(HasSession::new().target_session(&session_name))
.output()
.unwrap();
assert!(has_session.success());
Tmux::with_command(KillSession::new().target_session(&session_name))
.output()
.unwrap();
}
#[test]
fn test_tmux_session_create_reports_duplicate_session() {
let tmux_usable = Command::new("tmux")
.arg("start-server")
.output()
.map(|output| output.status.success())
.unwrap_or(false);
if !tmux_usable {
eprintln!(
"Skipping test_tmux_session_create_reports_duplicate_session: tmux not usable"
);
return;
}
let session_name = format!(
"gflow-test-dup-{}-{}",
std::process::id(),
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis()
);
let _session = TmuxSession::create(session_name.clone()).unwrap();
let error = TmuxSession::create(session_name.clone()).err().unwrap();
assert!(error.to_string().contains("Failed to create tmux session"));
Tmux::with_command(KillSession::new().target_session(&session_name))
.output()
.unwrap();
}
#[test]
fn normalize_session_name_replaces_tmux_target_delimiters() {
assert_eq!(
normalize_session_name(" train:v1.2 / gpu#0 "),
"train_v1_2_gpu_0"
);
assert_eq!(normalize_session_name("中文:实验.1"), "中文_实验_1");
assert_eq!(normalize_session_name("___"), "");
}
}