use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Mutex;
use chrono::Utc;
use tempfile::TempDir;
use super::manager::{ManagedError, ManagedTmuxDriver, SessionManager};
use super::record::{ManagedSessionId, ManagedSessionState, SessionRecord};
use std::sync::Arc;
pub struct FakeTmuxDriver {
sessions: Mutex<HashMap<String, String>>,
pub send_calls: Mutex<Vec<(String, String)>>,
pub kill_calls: Mutex<Vec<String>>,
pub capture_responses: Mutex<HashMap<String, String>>,
pub seeded_names: Mutex<Vec<String>>,
pub create_cwd_calls: Mutex<Vec<(String, String)>>,
}
impl FakeTmuxDriver {
pub fn new() -> Arc<Self> {
Arc::new(Self {
sessions: Mutex::new(HashMap::new()),
send_calls: Mutex::new(Vec::new()),
kill_calls: Mutex::new(Vec::new()),
capture_responses: Mutex::new(HashMap::new()),
seeded_names: Mutex::new(Vec::new()),
create_cwd_calls: Mutex::new(Vec::new()),
})
}
}
impl ManagedTmuxDriver for FakeTmuxDriver {
fn create_session(&self, name: &str, workdir: &str) -> Result<(), ManagedError> {
self.sessions
.lock()
.unwrap()
.insert(name.to_owned(), workdir.to_owned());
self.create_cwd_calls
.lock()
.unwrap()
.push((name.to_owned(), workdir.to_owned()));
Ok(())
}
fn kill_session(&self, name: &str) -> Result<(), ManagedError> {
self.kill_calls.lock().unwrap().push(name.to_owned());
self.sessions.lock().unwrap().remove(name);
Ok(())
}
fn send_line(&self, name: &str, text: &str) -> Result<(), ManagedError> {
self.send_calls
.lock()
.unwrap()
.push((name.to_owned(), text.to_owned()));
Ok(())
}
fn capture(&self, name: &str, _lines: u32) -> Result<String, ManagedError> {
Ok(self
.capture_responses
.lock()
.unwrap()
.get(name)
.cloned()
.unwrap_or_default())
}
fn list_sessions(&self) -> Result<Vec<String>, ManagedError> {
let mut names: Vec<String> = self.sessions.lock().unwrap().keys().cloned().collect();
names.extend(self.seeded_names.lock().unwrap().iter().cloned());
Ok(names)
}
}
async fn make_manager(dir: &TempDir) -> (SessionManager, Arc<FakeTmuxDriver>) {
let fake = FakeTmuxDriver::new();
let mgr = SessionManager::new(dir.path(), fake.clone())
.await
.expect("manager");
(mgr, fake)
}
#[tokio::test]
async fn manager_create_record() {
let dir = TempDir::new().unwrap();
let (mgr, _fake) = make_manager(&dir).await;
let record = mgr
.create(
"implement OAuth2".into(),
Some(PathBuf::from("/tmp/wt1")),
None,
None,
None,
None,
)
.await
.expect("create");
assert!(
record.tmux_name.starts_with("tmpm-"),
"tmux_name has prefix: {}",
record.tmux_name
);
assert_eq!(record.state, ManagedSessionState::Provisioning);
assert_eq!(record.task, "implement OAuth2");
let listed = mgr.list().await;
assert_eq!(listed.len(), 1);
assert_eq!(listed[0].id, record.id);
}
#[tokio::test]
async fn manager_naming_convention() {
let dir = TempDir::new().unwrap();
let (mgr, _fake) = make_manager(&dir).await;
let record = mgr
.create(
"task".into(),
Some(PathBuf::from("/tmp/wt1")),
None,
None,
None,
None,
)
.await
.expect("create");
assert!(record.tmux_name.starts_with("tmpm-"), "has tmpm- prefix");
}
#[tokio::test]
async fn manager_name_hint_overrides() {
let dir = TempDir::new().unwrap();
let (mgr, _fake) = make_manager(&dir).await;
let record = mgr
.create(
"task".into(),
None,
Some("ticket-1234".into()),
None,
None,
None,
)
.await
.expect("create");
assert_eq!(record.tmux_name, "tmpm-ticket-1234");
}
#[tokio::test]
async fn manager_stop_keeps_workspace() {
let dir = TempDir::new().unwrap();
let workspace_dir = TempDir::new().unwrap();
let (mgr, fake) = make_manager(&dir).await;
let record = mgr
.create(
"task".into(),
Some(workspace_dir.path().to_owned()),
None,
Some(workspace_dir.path().to_owned()),
None,
None,
)
.await
.expect("create");
let stopped = mgr.stop(&record.id).await.expect("stop");
assert_eq!(stopped.state, ManagedSessionState::Stopped);
assert!(fake.kill_calls.lock().unwrap().contains(&record.tmux_name));
assert!(
workspace_dir.path().exists(),
"workspace dir must survive a stop; it is still on disk for resume"
);
let after = mgr.get(&record.id).await.unwrap();
assert_eq!(after.state, ManagedSessionState::Stopped);
assert!(
after.workspace_path.is_some(),
"workspace_path must be preserved in the record after stop"
);
}
#[tokio::test]
async fn manager_resume_respawns_in_existing_workspace() {
let dir = TempDir::new().unwrap();
let workspace_dir = TempDir::new().unwrap();
let (mgr, fake) = make_manager(&dir).await;
let workspace_path = workspace_dir.path().to_owned();
let record = mgr
.create(
"task".into(),
Some(workspace_path.clone()),
Some("my-session".into()),
Some(workspace_path.clone()),
Some("https://github.com/owner/repo".into()),
Some("main".into()),
)
.await
.expect("create");
mgr.stop(&record.id).await.expect("stop");
let creates_before = fake.create_cwd_calls.lock().unwrap().len();
let resumed = mgr.resume(&record.id).await.expect("resume");
assert_eq!(resumed.state, ManagedSessionState::Active);
let (create_len, resume_cwd) = {
let create_calls = fake.create_cwd_calls.lock().unwrap();
let len = create_calls.len();
let cwd = create_calls
.get(creates_before)
.map(|c| c.1.clone())
.unwrap_or_default();
(len, cwd)
};
assert!(
create_len > creates_before,
"resume must issue a new tmux create_session call"
);
assert_eq!(
resume_cwd,
workspace_path.to_string_lossy().to_string(),
"resume must use the EXISTING workspace path as cwd, not re-clone"
);
let after = mgr.get(&record.id).await.unwrap();
assert_eq!(
after.workspace_path.as_deref(),
Some(workspace_path.as_path()),
"workspace_path must be preserved after resume (no re-clone)"
);
}
#[tokio::test]
async fn manager_decommission_removes_workspace() {
let dir = TempDir::new().unwrap();
let (mgr, _fake) = make_manager(&dir).await;
let workspace_dir = TempDir::new().unwrap();
let workspace_path = workspace_dir.path().to_owned();
std::fs::write(workspace_path.join("sentinel.txt"), "exists").unwrap();
let record = mgr
.create(
"task".into(),
Some(workspace_path.clone()),
None,
Some(workspace_path.clone()),
None,
None,
)
.await
.expect("create");
let tombstone = mgr.decommission(&record.id).await.expect("decommission");
assert_eq!(tombstone.state, ManagedSessionState::Decommissioned);
assert!(
tombstone.workspace_path.is_none(),
"workspace_path must be None after decommission (workspace was deleted)"
);
assert!(
!workspace_path.exists(),
"workspace directory must be removed from disk after decommission"
);
let after = mgr.get(&record.id).await.unwrap();
assert_eq!(after.state, ManagedSessionState::Decommissioned);
assert!(after.workspace_path.is_none());
}
#[tokio::test]
async fn manager_reconcile_gone_tmux_yields_stopped() {
let dir = TempDir::new().unwrap();
let fake = FakeTmuxDriver::new();
fake.seeded_names
.lock()
.unwrap()
.push("tmpm-live-session".into());
let mgr = SessionManager::new(dir.path(), fake.clone()).await.unwrap();
let live_record = SessionRecord {
id: ManagedSessionId::new(),
tmux_name: "tmpm-live-session".into(),
cwd: PathBuf::from("/tmp"),
task: "live task".into(),
state: ManagedSessionState::Active,
created_at: Utc::now(),
last_activity_at: None,
workspace_path: Some(PathBuf::from("/tmp/live-ws")),
repo_url: None,
branch: None,
pending_decision: None,
proposed_default: None,
correlation: Default::default(),
runtime: Default::default(),
};
let rebooted_record = SessionRecord {
id: ManagedSessionId::new(),
tmux_name: "tmpm-rebooted-session".into(),
cwd: PathBuf::from("/tmp"),
task: "rebooted task".into(),
state: ManagedSessionState::Active,
created_at: Utc::now(),
last_activity_at: None,
workspace_path: Some(PathBuf::from("/tmp/rebooted-ws")),
repo_url: None,
branch: None,
pending_decision: None,
proposed_default: None,
correlation: Default::default(),
runtime: Default::default(),
};
{
let mut store = mgr.store.write().await;
store.upsert(live_record.clone()).await.unwrap();
store.upsert(rebooted_record.clone()).await.unwrap();
}
let report = mgr.reconcile_on_boot(false).await.expect("reconcile");
assert!(report.adopted.contains(&"tmpm-live-session".to_string()));
assert!(
report.stopped.contains(&rebooted_record.id.to_string()),
"gone session must be in report.stopped; report: {:?}",
report
);
let live = mgr.get(&live_record.id).await.unwrap();
assert_eq!(live.state, ManagedSessionState::Active);
let rebooted = mgr.get(&rebooted_record.id).await.unwrap();
assert_eq!(
rebooted.state,
ManagedSessionState::Stopped,
"gone-tmux session must be Stopped (resumable), not Orphaned or Dead"
);
assert!(
rebooted.workspace_path.is_some(),
"workspace_path must be preserved after reconcile→Stopped"
);
}
#[tokio::test]
async fn manager_reconcile_skips_decommissioned() {
let dir = TempDir::new().unwrap();
let fake = FakeTmuxDriver::new();
let mgr = SessionManager::new(dir.path(), fake.clone()).await.unwrap();
let tombstone = SessionRecord {
id: ManagedSessionId::new(),
tmux_name: "tmpm-decomm".into(),
cwd: PathBuf::from("/tmp"),
task: "done task".into(),
state: ManagedSessionState::Decommissioned,
created_at: Utc::now(),
last_activity_at: None,
workspace_path: None,
repo_url: None,
branch: None,
pending_decision: None,
proposed_default: None,
correlation: Default::default(),
runtime: Default::default(),
};
{
let mut store = mgr.store.write().await;
store.upsert(tombstone.clone()).await.unwrap();
}
let report = mgr.reconcile_on_boot(false).await.expect("reconcile");
assert!(!report.adopted.contains(&tombstone.tmux_name));
assert!(!report.stopped.contains(&tombstone.id.to_string()));
let after = mgr.get(&tombstone.id).await.unwrap();
assert_eq!(after.state, ManagedSessionState::Decommissioned);
}
#[tokio::test]
async fn manager_send_input() {
let dir = TempDir::new().unwrap();
let (mgr, fake) = make_manager(&dir).await;
let record = mgr
.create(
"task".into(),
Some(PathBuf::from("/tmp/x")),
None,
None,
None,
None,
)
.await
.expect("create");
{
let mut store = mgr.store.write().await;
let mut r = store.get(&record.id).await.unwrap();
r.state = ManagedSessionState::Active;
store.upsert(r).await.unwrap();
}
mgr.send_input(&record.id, "hello from test")
.await
.expect("send");
let calls = fake.send_calls.lock().unwrap();
assert!(calls.iter().any(|(_, text)| text == "hello from test"));
}
#[tokio::test]
async fn manager_send_input_rejected_for_stopped_and_decommissioned() {
let dir = TempDir::new().unwrap();
let (mgr, _fake) = make_manager(&dir).await;
let record = mgr
.create(
"task".into(),
Some(PathBuf::from("/tmp/x")),
None,
None,
None,
None,
)
.await
.expect("create");
{
let mut store = mgr.store.write().await;
let mut r = store.get(&record.id).await.unwrap();
r.state = ManagedSessionState::Stopped;
store.upsert(r).await.unwrap();
}
let result = mgr.send_input(&record.id, "test").await;
assert!(result.is_err(), "send_input must fail for Stopped sessions");
{
let mut store = mgr.store.write().await;
let mut r = store.get(&record.id).await.unwrap();
r.state = ManagedSessionState::Decommissioned;
store.upsert(r).await.unwrap();
}
let result = mgr.send_input(&record.id, "test").await;
assert!(
result.is_err(),
"send_input must fail for Decommissioned sessions"
);
}
#[tokio::test]
async fn manager_env_scrub_command_sent() {
let dir = TempDir::new().unwrap();
let fake = FakeTmuxDriver::new();
let mgr = SessionManager::new(dir.path(), fake.clone()).await.unwrap();
let record = mgr
.create(
"task".into(),
Some(PathBuf::from("/tmp/x")),
None,
None,
None,
None,
)
.await
.expect("create");
let scrubbed_cmd = "env -u ANTHROPIC_API_KEY claude";
mgr.send_input(
&{
let mut store = mgr.store.write().await;
let mut r = store.get(&record.id).await.unwrap();
r.state = ManagedSessionState::Active;
store.upsert(r).await.unwrap();
record.id
},
scrubbed_cmd,
)
.await
.expect("send");
let calls = fake.send_calls.lock().unwrap();
let found = calls
.iter()
.any(|(_, cmd)| cmd.contains("env -u ANTHROPIC_API_KEY claude"));
assert!(found, "env scrub command must be sent; calls: {calls:?}");
}
#[tokio::test]
async fn manager_answer_decision() {
let dir = TempDir::new().unwrap();
let (mgr, fake) = make_manager(&dir).await;
let record = mgr
.create(
"task".into(),
Some(PathBuf::from("/tmp/x")),
None,
None,
None,
None,
)
.await
.expect("create");
{
let mut store = mgr.store.write().await;
let mut r = store.get(&record.id).await.unwrap();
r.pending_decision = Some("merge or rebase?".into());
r.proposed_default = Some("rebase".into());
store.upsert(r).await.unwrap();
}
mgr.answer_decision(&record.id, "rebase")
.await
.expect("answer");
let injected = {
let calls = fake.send_calls.lock().unwrap();
calls.iter().any(|(_, text)| text == "rebase")
};
assert!(injected);
let after = mgr.get(&record.id).await.unwrap();
assert!(after.pending_decision.is_none());
assert!(after.proposed_default.is_none());
}
#[tokio::test]
async fn spawn_session_tmux_cwd_is_workspace() {
use crate::session_manager::record::ManagedSessionId;
use tempfile::TempDir;
let store_dir = TempDir::new().unwrap();
let workspace_root = TempDir::new().unwrap();
let fake = FakeTmuxDriver::new();
let mgr = SessionManager::new(store_dir.path(), fake.clone())
.await
.expect("manager");
let session_id = ManagedSessionId::new();
let provisioner = crate::provisioner::WorkspaceProvisioner::without_prepare(
crate::provisioner::FakeGitBackend::new(),
workspace_root.path().to_owned(),
);
let prepared = provisioner
.provision(&session_id, "https://github.com/owner/repo", "main", "task")
.expect("provision");
let workspace_path = prepared.path.clone();
let record = mgr
.create_with_id(
session_id,
"task".into(),
Some(workspace_path.clone()),
None,
Some(workspace_path.clone()),
Some("https://github.com/owner/repo".into()),
Some("main".into()),
crate::runtime::RuntimeKind::default(),
)
.await
.expect("create_with_id");
let cwd_calls = fake.create_cwd_calls.lock().unwrap();
assert_eq!(
cwd_calls.len(),
1,
"exactly one tmux session must be created"
);
let (session_name, cwd) = &cwd_calls[0];
assert_eq!(
session_name, &record.tmux_name,
"session name must match the record"
);
assert_eq!(
cwd,
&workspace_path.to_string_lossy().to_string(),
"tmux session cwd must equal the provisioned workspace path"
);
let home = dirs::home_dir()
.map(|h| h.to_string_lossy().to_string())
.unwrap_or_default();
assert_ne!(
cwd, &home,
"tmux session cwd must NOT be $HOME (workspace-isolation regression)"
);
assert_ne!(
cwd, "/tmp",
"tmux session cwd must NOT be /tmp (workspace-isolation regression)"
);
assert!(
workspace_path.starts_with(workspace_root.path()),
"workspace must be under the mpm workspace root"
);
}
#[tokio::test]
async fn manager_create_defaults_runtime_to_claude_code() {
let dir = TempDir::new().unwrap();
let (mgr, _fake) = make_manager(&dir).await;
let record = mgr
.create(
"task".into(),
Some(PathBuf::from("/tmp/wt-d")),
None,
None,
None,
None,
)
.await
.expect("create");
assert_eq!(record.runtime, crate::runtime::RuntimeKind::ClaudeCode);
let reloaded = mgr.get(&record.id).await.expect("get");
assert_eq!(reloaded.runtime, crate::runtime::RuntimeKind::ClaudeCode);
}
#[tokio::test]
async fn manager_create_persists_runtime() {
let dir = TempDir::new().unwrap();
let (mgr, _fake) = make_manager(&dir).await;
let record = mgr
.create_with_id(
ManagedSessionId::new(),
"task".into(),
Some(PathBuf::from("/tmp/wt-t")),
None,
None,
None,
None,
crate::runtime::RuntimeKind::Tcode,
)
.await
.expect("create_with_id");
assert_eq!(record.runtime, crate::runtime::RuntimeKind::Tcode);
let reloaded = mgr.get(&record.id).await.expect("get");
assert_eq!(
reloaded.runtime,
crate::runtime::RuntimeKind::Tcode,
"runtime must survive persistence so resume re-spawns the same backend"
);
}
#[tokio::test]
async fn manager_get_reflects_out_of_process_write() {
let dir = TempDir::new().unwrap();
let (mgr_a, _fake_a) = make_manager(&dir).await;
let (mgr_b, fake_b) = make_manager(&dir).await;
let record = mgr_a
.create(
"shared-state task".into(),
Some(PathBuf::from("/tmp/wt-shared")),
None,
None,
None,
None,
)
.await
.expect("create");
let id = record.id;
mgr_a.stop(&id).await.expect("stop");
let before = mgr_a.get(&id).await.expect("get before");
assert_eq!(
before.state,
ManagedSessionState::Stopped,
"precondition: manager A sees the session as Stopped"
);
fake_b.seeded_names.lock().unwrap().clear();
mgr_b.resume(&id).await.expect("supervisor resume");
let after = mgr_a.get(&id).await.expect("get after");
assert_eq!(
after.state,
ManagedSessionState::Active,
"manager A must reflect the out-of-process resume written by manager B"
);
let listed = mgr_a.list().await;
let found = listed
.iter()
.find(|r| r.id == id)
.expect("session present in list");
assert_eq!(
found.state,
ManagedSessionState::Active,
"manager A's list must also reflect the out-of-process write"
);
}
fn corrupt_store_file(mgr: &SessionManager) {
let path = mgr.data_dir().join("sessions.json");
std::fs::write(&path, b"{ this is not valid json ]").expect("corrupt store file");
}
#[tokio::test]
async fn manager_list_returns_last_known_on_reload_error() {
let dir = TempDir::new().unwrap();
let (mgr, _fake) = make_manager(&dir).await;
let record = mgr
.create(
"fleet-visibility task".into(),
Some(PathBuf::from("/tmp/wt-lastknown")),
None,
None,
None,
None,
)
.await
.expect("create");
let id = record.id;
assert_eq!(
mgr.list().await.len(),
1,
"precondition: one session listed"
);
corrupt_store_file(&mgr);
let listed = mgr.list().await;
assert_eq!(
listed.len(),
1,
"list() must return the last-known set on reload error, not empty: {listed:?}"
);
assert_eq!(
listed[0].id, id,
"the last-known record must be the one we created"
);
}
#[tokio::test]
async fn manager_get_returns_last_known_on_reload_error() {
let dir = TempDir::new().unwrap();
let (mgr, _fake) = make_manager(&dir).await;
let record = mgr
.create(
"single-session task".into(),
Some(PathBuf::from("/tmp/wt-getlastknown")),
None,
None,
None,
None,
)
.await
.expect("create");
let id = record.id;
corrupt_store_file(&mgr);
let got = mgr
.get(&id)
.await
.expect("get must return last-known record on reload error");
assert_eq!(got.id, id, "get() returned the last-known record");
let missing = ManagedSessionId::new();
assert!(
matches!(
mgr.get(&missing).await,
Err(ManagedError::SessionNotFound(_))
),
"an unknown id must still yield SessionNotFound"
);
}