use std::path::{Path, PathBuf};
use std::sync::Arc;
use chrono::Utc;
use thiserror::Error;
use tokio::sync::RwLock;
use tracing::{info, warn};
use crate::core::names::{name_from_dir, name_from_uuid};
use super::record::{ManagedSessionId, ManagedSessionState, SessionRecord};
use super::store::{SessionStore, StoreError};
/// Errors returned by the managed-session layer.
#[derive(Debug, Error)]
pub enum ManagedError {
/// A tmux driver call failed. Also returned when input is sent to a session
/// whose state is Stopped or Decommissioned.
#[error("tmux error: {0}")]
TmuxUnavailable(String),
/// No record exists for the requested session; the payload is the key
/// reported by the store's `NotFound` error.
#[error("session not found: {0}")]
SessionNotFound(String),
/// Any other failure reported by the session store.
#[error("store error: {0}")]
Store(#[from] StoreError),
/// A filesystem operation failed (for example removing a workspace directory).
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
/// A tmux session with the derived name already exists; the payload is that name.
#[error("name already in use: {0} — use `tm session ls` to find it")]
NameCollision(String),
/// The requested operation is not allowed in the session's current state.
/// Fields are the session id and a human-readable explanation.
#[error("invalid state transition for session {0}: {1}")]
InvalidState(String, String),
}
/// Abstraction over the tmux operations the session manager needs.
///
/// Implementations must be shareable across threads (`Send + Sync`) because
/// the manager holds the driver behind an `Arc<dyn ManagedTmuxDriver>`.
pub trait ManagedTmuxDriver: Send + Sync {
    /// Create a tmux session called `name` with `workdir` as its working directory.
    fn create_session(&self, name: &str, workdir: &str) -> Result<(), ManagedError>;

    /// Kill the tmux session called `name`.
    fn kill_session(&self, name: &str) -> Result<(), ManagedError>;

    /// Send `text` as a line of input to the session called `name`.
    fn send_line(&self, name: &str, text: &str) -> Result<(), ManagedError>;

    /// Capture up to `lines` lines of output from the session called `name`.
    fn capture(&self, name: &str, lines: u32) -> Result<String, ManagedError>;

    /// List the names of the sessions the driver can currently see.
    fn list_sessions(&self) -> Result<Vec<String>, ManagedError>;

    /// Report whether a session called `name` appears in `list_sessions`.
    ///
    /// A failing `list_sessions` call is treated as "does not exist".
    fn session_exists(&self, name: &str) -> bool {
        match self.list_sessions() {
            Ok(listed) => listed.iter().any(|candidate| candidate == name),
            Err(_) => false,
        }
    }
}
/// Summary of what `SessionManager::reconcile_on_boot` did.
#[derive(Debug, Default)]
pub struct ReconcileReport {
/// tmux names of known records whose session was found live and marked Active.
pub adopted: Vec<String>,
/// Record ids (stringified) of known records whose tmux session was not
/// found live and which were marked Stopped.
pub stopped: Vec<String>,
/// Names of live `tmpm-` sessions that had no record and were given a new one.
pub external_adopted: Vec<String>,
}
/// Coordinates persisted session records with the tmux sessions backing them.
pub struct SessionManager {
/// Session record store, shared behind an async read/write lock.
pub(crate) store: Arc<RwLock<SessionStore>>,
/// Driver used for every tmux operation.
tmux: Arc<dyn ManagedTmuxDriver>,
/// Directory the store was loaded from at construction time.
data_dir: PathBuf,
}
/// Debug output shows only `data_dir`; the remaining fields are elided and
/// the output is marked non-exhaustive.
impl std::fmt::Debug for SessionManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut rendered = f.debug_struct("SessionManager");
        rendered.field("data_dir", &self.data_dir);
        rendered.finish_non_exhaustive()
    }
}
impl SessionManager {
/// Build a manager by loading the session store from `data_dir`.
///
/// # Errors
/// Returns the store's load error, converted into `ManagedError`.
pub async fn new(
    data_dir: &Path,
    tmux: Arc<dyn ManagedTmuxDriver>,
) -> Result<Self, ManagedError> {
    let loaded = SessionStore::load(data_dir).await?;
    let manager = Self {
        store: Arc::new(RwLock::new(loaded)),
        tmux,
        data_dir: data_dir.to_path_buf(),
    };
    Ok(manager)
}
/// Create a managed session with a freshly generated id and the default runtime.
///
/// All other arguments are forwarded unchanged to `create_with_id`, which
/// also defines the error cases.
pub async fn create(
    &self,
    task: String,
    cwd: Option<PathBuf>,
    name_hint: Option<String>,
    workspace_path: Option<PathBuf>,
    repo_url: Option<String>,
    branch: Option<String>,
) -> Result<SessionRecord, ManagedError> {
    let fresh_id = ManagedSessionId::new();
    let default_runtime = crate::runtime::RuntimeKind::default();
    self.create_with_id(
        fresh_id,
        task,
        cwd,
        name_hint,
        workspace_path,
        repo_url,
        branch,
        default_runtime,
    )
    .await
}
#[allow(clippy::too_many_arguments)]
pub async fn create_with_id(
&self,
id: ManagedSessionId,
task: String,
cwd: Option<PathBuf>,
name_hint: Option<String>,
workspace_path: Option<PathBuf>,
repo_url: Option<String>,
branch: Option<String>,
runtime: crate::runtime::RuntimeKind,
) -> Result<SessionRecord, ManagedError> {
let cwd = cwd.unwrap_or_else(|| dirs::home_dir().unwrap_or_else(|| PathBuf::from("/tmp")));
let tmux_name = if let Some(hint) = name_hint {
name_from_dir(Path::new(&hint))
} else if cwd != dirs::home_dir().unwrap_or_else(|| PathBuf::from("/tmp")) {
name_from_dir(&cwd)
} else {
name_from_uuid(id.as_uuid())
};
if self.tmux.session_exists(&tmux_name) {
return Err(ManagedError::NameCollision(tmux_name));
}
let workdir = cwd.to_string_lossy().to_string();
self.tmux
.create_session(&tmux_name, &workdir)
.map_err(|e| ManagedError::TmuxUnavailable(e.to_string()))?;
let mut correlation = crate::driver::SessionCorrelation::new();
if let Some(ref ws) = workspace_path {
correlation = correlation.with_worktree(ws.clone());
}
if let Some(ref b) = branch {
correlation = correlation.with_branch(b.clone());
}
let record = SessionRecord {
id,
tmux_name: tmux_name.clone(),
cwd,
task,
state: ManagedSessionState::Provisioning,
created_at: Utc::now(),
last_activity_at: None,
workspace_path,
repo_url,
branch,
pending_decision: None,
proposed_default: None,
correlation,
runtime,
};
self.store.write().await.upsert(record.clone()).await?;
info!(id = %id, name = %tmux_name, runtime = %runtime.as_str(), "managed session created");
Ok(record)
}
pub async fn answer_decision(
&self,
id: &ManagedSessionId,
answer: &str,
) -> Result<(), ManagedError> {
let mut record = self.get(id).await?;
record.pending_decision = None;
record.proposed_default = None;
self.tmux
.send_line(&record.tmux_name, answer)
.map_err(|e| ManagedError::TmuxUnavailable(e.to_string()))?;
record.last_activity_at = Some(Utc::now());
self.store.write().await.upsert(record).await?;
Ok(())
}
/// Fetch the record for `id`, first asking the store to reload if it changed.
///
/// A failed reload is logged and the last-known cached record is used.
///
/// # Errors
/// * `SessionNotFound` when the store reports `NotFound`.
/// * `Store` for any other store error.
pub async fn get(&self, id: &ManagedSessionId) -> Result<SessionRecord, ManagedError> {
    let mut store = self.store.write().await;
    match store.reload_if_changed().await {
        Ok(_) => {}
        Err(e) => {
            warn!(id = %id, "session get: reload failed: {e}; using last-known record");
        }
    }
    let lookup = store.cached_get(id);
    match lookup {
        Ok(found) => Ok(found),
        Err(StoreError::NotFound(key)) => Err(ManagedError::SessionNotFound(key)),
        Err(other) => Err(ManagedError::Store(other)),
    }
}
/// Return every session record known to the store.
///
/// Never fails: if the store's `all()` call errors, the failure is logged
/// and the cached in-memory set is returned instead.
pub async fn list(&self) -> Vec<SessionRecord> {
    let mut store = self.store.write().await;
    let e = match store.all().await {
        Ok(fresh) => return fresh,
        Err(reload_err) => reload_err,
    };
    let last_known = store.cached_all();
    warn!(
        count = last_known.len(),
        "session list: reload failed: {e}; returning last-known in-memory set"
    );
    last_known
}
/// Send `text` as a line of input to the session's tmux pane.
///
/// On success the record's `last_activity_at` is set to the current time.
///
/// # Errors
/// * Whatever `get` returns when the record cannot be fetched.
/// * `TmuxUnavailable` if the session is Stopped or Decommissioned, or if
///   the driver fails to send the line.
/// * A store error if the updated record cannot be persisted.
pub async fn send_input(&self, id: &ManagedSessionId, text: &str) -> Result<(), ManagedError> {
    let mut session = self.get(id).await?;

    let not_running = matches!(
        session.state,
        ManagedSessionState::Stopped | ManagedSessionState::Decommissioned
    );
    if not_running {
        let reason = format!(
            "session {} is {}; cannot inject input",
            session.tmux_name, session.state
        );
        return Err(ManagedError::TmuxUnavailable(reason));
    }

    if let Err(send_err) = self.tmux.send_line(&session.tmux_name, text) {
        return Err(ManagedError::TmuxUnavailable(send_err.to_string()));
    }

    session.last_activity_at = Some(Utc::now());
    self.store.write().await.upsert(session).await?;
    Ok(())
}
pub async fn stop(&self, id: &ManagedSessionId) -> Result<SessionRecord, ManagedError> {
let mut record = self.get(id).await?;
if let Err(e) = self.tmux.kill_session(&record.tmux_name) {
warn!(name = %record.tmux_name, "kill_session failed (may already be gone): {e}");
}
record.state = ManagedSessionState::Stopped;
self.store.write().await.upsert(record.clone()).await?;
info!(id = %id, name = %record.tmux_name, "managed session stopped (workspace intact)");
Ok(record)
}
/// Recreate the tmux session for a Stopped or Errored record and mark it Active.
///
/// The new session starts in the record's workspace path when one is set,
/// otherwise in its `cwd`. Any tmux session still using the record's name is
/// killed first; a failure to kill it is only logged.
///
/// # Errors
/// * Whatever `get` returns when the record cannot be fetched.
/// * `InvalidState` if the record is in any state other than Stopped or Errored.
/// * `TmuxUnavailable` if the driver fails to create the session.
/// * A store error if the updated record cannot be persisted.
pub async fn resume(&self, id: &ManagedSessionId) -> Result<SessionRecord, ManagedError> {
    let mut record = self.get(id).await?;

    let resumable = matches!(
        record.state,
        ManagedSessionState::Stopped | ManagedSessionState::Errored
    );
    if !resumable {
        let s = &record.state;
        let detail = format!(
            "cannot resume a session in state '{s}'; only Stopped or Errored sessions can be resumed"
        );
        return Err(ManagedError::InvalidState(id.to_string(), detail));
    }

    let launch_dir = match &record.workspace_path {
        Some(workspace) => workspace,
        None => &record.cwd,
    };
    let workdir = launch_dir.to_string_lossy().into_owned();

    // Clear out a leftover session with the same name before recreating it.
    if self.tmux.session_exists(&record.tmux_name)
        && let Err(e) = self.tmux.kill_session(&record.tmux_name)
    {
        warn!(name = %record.tmux_name, "resume: kill stale session failed: {e}");
    }

    if let Err(create_err) = self.tmux.create_session(&record.tmux_name, &workdir) {
        return Err(ManagedError::TmuxUnavailable(create_err.to_string()));
    }

    record.state = ManagedSessionState::Active;
    record.last_activity_at = Some(Utc::now());
    self.store.write().await.upsert(record.clone()).await?;
    info!(id = %id, name = %record.tmux_name, workdir = %workdir, "managed session resumed");
    Ok(record)
}
pub async fn decommission(&self, id: &ManagedSessionId) -> Result<SessionRecord, ManagedError> {
let mut record = self.get(id).await?;
if self.tmux.session_exists(&record.tmux_name)
&& let Err(e) = self.tmux.kill_session(&record.tmux_name)
{
warn!(name = %record.tmux_name, "decommission: kill_session failed: {e}");
}
if let Some(ref ws) = record.workspace_path {
if ws.exists() {
std::fs::remove_dir_all(ws).map_err(|e| {
ManagedError::Io(std::io::Error::new(
e.kind(),
format!("remove workspace {:?}: {e}", ws),
))
})?;
info!(id = %id, workspace = %ws.display(), "decommission: workspace removed from disk");
} else {
warn!(id = %id, workspace = %ws.display(), "decommission: workspace path absent (already removed?)");
}
}
record.workspace_path = None;
record.state = ManagedSessionState::Decommissioned;
self.store.write().await.upsert(record.clone()).await?;
info!(id = %id, name = %record.tmux_name, "managed session decommissioned");
Ok(record)
}
pub async fn reconcile_on_boot(
&self,
auto_resume: bool,
) -> Result<ReconcileReport, ManagedError> {
let live_names: std::collections::HashSet<String> = self
.tmux
.list_sessions()
.unwrap_or_else(|e| {
warn!("reconcile: list_sessions failed: {e}; assuming no live sessions");
Vec::new()
})
.into_iter()
.filter(|n| n.starts_with("tmpm-"))
.collect();
let mut report = ReconcileReport::default();
let mut guard = self.store.write().await;
let all_records = guard.all().await?;
let known_names: std::collections::HashSet<String> =
all_records.iter().map(|r| r.tmux_name.clone()).collect();
let mut to_resume: Vec<ManagedSessionId> = Vec::new();
for mut record in all_records {
if matches!(record.state, ManagedSessionState::Decommissioned) {
continue;
}
if live_names.contains(&record.tmux_name) {
record.state = ManagedSessionState::Active;
report.adopted.push(record.tmux_name.clone());
info!(name = %record.tmux_name, "reconcile: re-adopted live session");
} else {
record.state = ManagedSessionState::Stopped;
report.stopped.push(record.id.to_string());
warn!(name = %record.tmux_name, "reconcile: tmux session gone, marked Stopped (workspace intact, resumable)");
if auto_resume {
to_resume.push(record.id);
}
}
guard.upsert(record).await?;
}
for name in &live_names {
if !known_names.contains(name) {
let external = SessionRecord {
id: ManagedSessionId::new(),
tmux_name: name.clone(),
cwd: PathBuf::from("/unknown"),
task: "externally created".into(),
state: ManagedSessionState::Active,
created_at: Utc::now(),
last_activity_at: None,
workspace_path: None,
repo_url: None,
branch: None,
pending_decision: None,
proposed_default: None,
correlation: Default::default(),
runtime: crate::runtime::RuntimeKind::default(),
};
guard.upsert(external).await?;
report.external_adopted.push(name.clone());
info!(name = %name, "reconcile: adopted external tmpm- session");
}
}
drop(guard);
if auto_resume && !to_resume.is_empty() {
info!(
"reconcile: auto_resume=true, resuming {} stopped sessions",
to_resume.len()
);
for sid in to_resume {
if let Err(e) = self.resume(&sid).await {
warn!(id = %sid, "reconcile: auto_resume failed: {e}");
}
}
}
Ok(report)
}
/// Directory this manager's store was loaded from.
pub fn data_dir(&self) -> &Path {
    self.data_dir.as_path()
}
/// Another handle to the tmux driver this manager uses.
pub fn tmux_driver(&self) -> Arc<dyn ManagedTmuxDriver> {
    Arc::clone(&self.tmux)
}
/// Capture up to `lines` lines of output from the session's tmux pane.
///
/// # Errors
/// * Whatever `get` returns when the record cannot be fetched.
/// * `TmuxUnavailable` if the driver's capture call fails.
pub async fn capture_pane(
    &self,
    id: &ManagedSessionId,
    lines: u32,
) -> Result<String, ManagedError> {
    let session = self.get(id).await?;
    match self.tmux.capture(&session.tmux_name, lines) {
        Ok(pane_text) => Ok(pane_text),
        Err(capture_err) => Err(ManagedError::TmuxUnavailable(capture_err.to_string())),
    }
}
pub async fn mark_errored(
&self,
id: &ManagedSessionId,
error_msg: &str,
) -> Result<(), ManagedError> {
let mut record = self.get(id).await?;
record.state = ManagedSessionState::Errored;
record.task = format!("{} [error: {}]", record.task, error_msg);
self.store.write().await.upsert(record).await?;
Ok(())
}
pub async fn set_workspace(
&self,
id: &ManagedSessionId,
workspace_path: std::path::PathBuf,
new_state: ManagedSessionState,
) -> Result<(), ManagedError> {
let mut record = self.get(id).await?;
record.workspace_path = Some(workspace_path);
record.state = new_state;
self.store.write().await.upsert(record).await?;
Ok(())
}
}