use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc, oneshot};
use crate::config::PullStrategy;
use crate::error::SessionError;
use crate::github::GitHubClient;
use crate::session::command::{AutoInputStep, SessionCommand};
use crate::session::event::SessionEvent;
use crate::session::osc::OscParser;
use crate::session::pty::{PtyChild, PtyConfig, PtyHandle};
use crate::session::{Session, SessionId, SessionStatus};
use crate::worktree::WorktreeManager;
struct SessionState<W> {
session: Session,
writer: Option<W>,
child: Arc<Mutex<Option<Box<dyn PtyChild>>>>,
shutdown_tx: Option<oneshot::Sender<()>>,
}
enum CwdSource {
Explicit(PathBuf),
CreateWorktree(Option<String>),
ExistingWorktree(PathBuf),
}
pub struct SessionManager<
P: PtyHandle,
W: WorktreeManager,
G: GitHubClient = crate::github::NativeGitHubClient,
> {
sessions: HashMap<SessionId, SessionState<P::Writer>>,
max_sessions: usize,
pty: Arc<P>,
worktree: Option<Arc<W>>,
auto_cleanup: bool,
pull_strategy: PullStrategy,
event_tx: mpsc::Sender<SessionEvent>,
socket_path: Option<PathBuf>,
github_client: Arc<G>,
}
impl<P: PtyHandle + 'static, W: WorktreeManager + 'static, G: GitHubClient + 'static>
SessionManager<P, W, G>
{
#[must_use]
pub fn new(
max_sessions: usize,
pty: P,
worktree: Option<W>,
auto_cleanup: bool,
pull_strategy: PullStrategy,
event_tx: mpsc::Sender<SessionEvent>,
github_client: G,
) -> Self {
Self {
sessions: HashMap::new(),
max_sessions,
pty: Arc::new(pty),
worktree: worktree.map(Arc::new),
auto_cleanup,
pull_strategy,
event_tx,
socket_path: None,
github_client: Arc::new(github_client),
}
}
pub fn set_socket_path(&mut self, path: PathBuf) {
self.socket_path = Some(path);
}
pub async fn run(mut self, mut command_rx: mpsc::Receiver<SessionCommand>) {
while let Some(cmd) = command_rx.recv().await {
self.dispatch_command(cmd).await;
}
}
async fn dispatch_command(&mut self, cmd: SessionCommand) {
match cmd {
SessionCommand::Create {
cmd,
args,
cwd,
branch_name,
rows,
cols,
response_tx,
} => {
let _ = response_tx.send(
self.handle_create(cmd, args, cwd, branch_name, rows, cols)
.await,
);
}
SessionCommand::Terminate { id, response_tx } => {
let _ = response_tx.send(self.handle_terminate(id));
}
SessionCommand::SendInput { id, data } => self.handle_input(id, &data),
SessionCommand::Resize { id, rows, cols } => self.handle_resize(id, rows, cols),
SessionCommand::List { response_tx } => {
let _ =
response_tx.send(self.sessions.values().map(|s| s.session.clone()).collect());
}
SessionCommand::CreateFromWorktree {
worktree_path,
cmd,
args,
rows,
cols,
response_tx,
} => {
let _ = response_tx.send(
self.handle_create_from_worktree(worktree_path, cmd, args, rows, cols)
.await,
);
}
SessionCommand::ListWorktrees { response_tx } => {
let _ = response_tx.send(self.handle_list_worktrees());
}
SessionCommand::DeleteWorktree { path, response_tx } => {
let _ = response_tx.send(self.handle_delete_worktree(&path));
}
SessionCommand::PullWorktree { path, response_tx } => {
let _ = response_tx.send(self.handle_pull_worktree(&path));
}
SessionCommand::FetchWorktree { path, response_tx } => {
let _ = response_tx.send(self.handle_fetch_worktree(&path));
}
SessionCommand::GetWorktreeStatus { path, response_tx } => {
let _ = response_tx.send(self.handle_get_worktree_status(&path));
}
SessionCommand::RefreshWorktreesAsync => self.handle_refresh_worktrees_async(),
SessionCommand::DeleteWorktreeAsync { path } => {
self.handle_delete_worktree_async(&path);
}
SessionCommand::PullWorktreeAsync { path } => self.handle_pull_worktree_async(&path),
SessionCommand::CloseSession { id, response_tx } => {
let _ = response_tx.send(self.handle_close_session(id));
}
SessionCommand::CreateWithAutoInput {
cmd,
args,
cwd,
branch_name,
rows,
cols,
auto_input,
response_tx,
} => {
let _ = response_tx.send(
self.handle_create_with_auto_input(
cmd,
args,
cwd,
branch_name,
rows,
cols,
auto_input,
)
.await,
);
}
SessionCommand::FetchIssuesAsync => self.handle_fetch_issues_async(),
SessionCommand::GenerateIssueActions { issue_number } => {
self.handle_generate_issue_actions(issue_number);
}
SessionCommand::FetchCostAsync => self.handle_fetch_cost_async(),
}
}
fn handle_fetch_cost_async(&self) {
let event_tx = self.event_tx.clone();
tokio::spawn(async move {
let cost = fetch_today_cost().await;
let _ = event_tx.send(SessionEvent::CostFetched { cost }).await;
});
}
fn handle_fetch_issues_async(&self) {
let event_tx = self.event_tx.clone();
let client = self.github_client.clone();
tokio::spawn(async move {
let result = client.fetch_issues().await.map_err(|e| e.to_string());
let _ = event_tx.send(SessionEvent::IssuesFetched { result }).await;
});
}
fn handle_generate_issue_actions(&self, issue_number: u32) {
let event_tx = self.event_tx.clone();
let client = self.github_client.clone();
tokio::spawn(async move {
let issue = match client.fetch_issue(issue_number).await {
Ok(issue) => issue,
Err(e) => {
let _ = event_tx
.send(SessionEvent::IssueActionsFetched {
issue_number,
result: Err(e.to_string()),
})
.await;
return;
}
};
let _ = event_tx
.send(SessionEvent::IssueFetched { issue_number })
.await;
let result = client
.generate_choices(&issue)
.await
.map_err(|e| e.to_string());
let _ = event_tx
.send(SessionEvent::IssueActionsFetched {
issue_number,
result,
})
.await;
});
}
fn handle_list_worktrees(&self) -> Vec<crate::worktree::WorktreeInfo> {
if let Some(ref wt) = self.worktree {
wt.list_worktrees().unwrap_or_default()
} else {
Vec::new()
}
}
fn handle_delete_worktree(&self, path: &Path) -> Result<(), SessionError> {
let wt = self
.worktree
.as_ref()
.ok_or(SessionError::WorktreeNotConfigured)?;
wt.remove_worktree(path).map_err(SessionError::Worktree)
}
fn handle_pull_worktree(&self, path: &Path) -> Result<(), SessionError> {
let wt = self
.worktree
.as_ref()
.ok_or(SessionError::WorktreeNotConfigured)?;
wt.git_pull(path, self.pull_strategy)
.map_err(SessionError::Worktree)
}
fn handle_fetch_worktree(&self, path: &Path) -> Result<(), SessionError> {
let wt = self
.worktree
.as_ref()
.ok_or(SessionError::WorktreeNotConfigured)?;
wt.git_fetch(path).map_err(SessionError::Worktree)
}
fn handle_get_worktree_status(
&self,
path: &Path,
) -> Result<crate::worktree::GitWorktreeStatus, SessionError> {
let wt = self
.worktree
.as_ref()
.ok_or(SessionError::WorktreeNotConfigured)?;
wt.get_status(path).map_err(SessionError::Worktree)
}
fn handle_refresh_worktrees_async(&self) {
let worktree = self.worktree.clone();
let event_tx = self.event_tx.clone();
tokio::spawn(async move {
let Some(wt) = worktree else {
let _ = event_tx
.send(SessionEvent::WorktreesRefreshed {
worktrees: Vec::new(),
fetch_pending: false,
})
.await;
return;
};
let initial = tokio::task::spawn_blocking({
let wt = wt.clone();
move || {
wt.list_worktrees()
.unwrap_or_default()
.into_iter()
.map(|mut info| {
if let Ok(status) = wt.get_status(&info.path) {
info.status = status;
}
info
})
.collect::<Vec<_>>()
}
})
.await
.unwrap_or_default();
let _ = event_tx
.send(SessionEvent::WorktreesRefreshed {
worktrees: initial.clone(),
fetch_pending: true,
})
.await;
let updated = tokio::task::spawn_blocking({
let wt = wt.clone();
move || {
if let Some(first) = initial.first() {
let _ = wt.git_fetch(&first.path);
}
initial
.into_iter()
.map(|mut info| {
if let Ok(status) = wt.get_status(&info.path) {
info.status = status;
}
info
})
.collect()
}
})
.await
.unwrap_or_default();
let _ = event_tx
.send(SessionEvent::WorktreesRefreshed {
worktrees: updated,
fetch_pending: false,
})
.await;
});
}
fn handle_delete_worktree_async(&self, path: &Path) {
let result = self.handle_delete_worktree(path).map_err(|e| e.to_string());
let _ = self.event_tx.try_send(SessionEvent::WorktreeDeleted {
path: path.to_path_buf(),
result,
});
}
fn handle_pull_worktree_async(&self, path: &Path) {
let result = self.handle_pull_worktree(path).and_then(|()| {
self.worktree
.as_ref()
.ok_or(SessionError::WorktreeNotConfigured)?
.get_status(path)
.map_err(SessionError::Worktree)
});
let _ = self.event_tx.try_send(SessionEvent::WorktreePulled {
path: path.to_path_buf(),
result: result.map_err(|e| e.to_string()),
});
}
fn spawn_session(
&mut self,
cmd: String,
args: Vec<String>,
rows: u16,
cols: u16,
cwd_source: CwdSource,
) -> Result<(SessionId, Option<String>), SessionError> {
if self.sessions.len() >= self.max_sessions {
return Err(SessionError::LimitReached {
max: self.max_sessions,
});
}
let (cwd, worktree_path, branch) = match cwd_source {
CwdSource::Explicit(path) => (Some(path), None, None),
CwdSource::CreateWorktree(branch_name) => {
if let Some(ref wt) = self.worktree {
let (path, branch) = wt
.create_worktree(branch_name.as_deref())
.map_err(SessionError::Worktree)?;
(Some(path.clone()), Some(path), Some(branch))
} else {
(std::env::current_dir().ok(), None, None)
}
}
CwdSource::ExistingWorktree(path) => {
if !path.exists() {
return Err(SessionError::WorktreeNotFound { path });
}
let branch = self.worktree.as_ref().and_then(|wt| {
wt.list_worktrees().ok().and_then(|list| {
list.into_iter()
.find(|info| info.path == path)
.map(|info| info.branch)
})
});
(Some(path.clone()), Some(path), branch)
}
};
let mut session = Session::new(&cmd);
let id = session.id;
session.worktree_path.clone_from(&worktree_path);
let mut env = HashMap::new();
env.insert("TAZUNA_SESSION_ID".to_string(), id.to_string());
if let Some(ref socket_path) = self.socket_path {
env.insert(
"TAZUNA_SOCKET_PATH".to_string(),
socket_path.display().to_string(),
);
}
let config = PtyConfig {
rows,
cols,
cmd,
args,
cwd,
env,
};
let (reader, writer, child) = self.pty.spawn(&config).map_err(SessionError::PtyCreation)?;
session.status = SessionStatus::Running;
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let child = Arc::new(Mutex::new(Some(child)));
self.sessions.insert(
id,
SessionState {
session,
writer: Some(writer),
child: Arc::clone(&child),
shutdown_tx: Some(shutdown_tx),
},
);
let event_tx = self.event_tx.clone();
Self::reader_task(id, reader, event_tx, shutdown_rx, child);
Ok((id, branch))
}
async fn handle_create(
&mut self,
cmd: String,
args: Vec<String>,
cwd: Option<PathBuf>,
branch_name: Option<String>,
rows: u16,
cols: u16,
) -> Result<SessionId, SessionError> {
let cwd_source = cwd.map_or(CwdSource::CreateWorktree(branch_name), CwdSource::Explicit);
let (id, branch) = self.spawn_session(cmd, args, rows, cols, cwd_source)?;
let _ = self
.event_tx
.send(SessionEvent::Created {
id,
branch,
auto_input: Vec::new(),
})
.await;
Ok(id)
}
#[allow(clippy::too_many_arguments)]
async fn handle_create_with_auto_input(
&mut self,
cmd: String,
args: Vec<String>,
cwd: Option<PathBuf>,
branch_name: Option<String>,
rows: u16,
cols: u16,
auto_input: Vec<AutoInputStep>,
) -> Result<SessionId, SessionError> {
let cwd_source = cwd.map_or(CwdSource::CreateWorktree(branch_name), CwdSource::Explicit);
let (id, branch) = self.spawn_session(cmd, args, rows, cols, cwd_source)?;
let _ = self
.event_tx
.send(SessionEvent::Created {
id,
branch,
auto_input,
})
.await;
Ok(id)
}
async fn handle_create_from_worktree(
&mut self,
worktree_path: PathBuf,
cmd: String,
args: Vec<String>,
rows: u16,
cols: u16,
) -> Result<SessionId, SessionError> {
let (id, branch) = self.spawn_session(
cmd,
args,
rows,
cols,
CwdSource::ExistingWorktree(worktree_path),
)?;
let _ = self
.event_tx
.send(SessionEvent::Created {
id,
branch,
auto_input: Vec::new(),
})
.await;
Ok(id)
}
fn handle_terminate(&mut self, id: SessionId) -> Result<(), SessionError> {
let state = self
.sessions
.get_mut(&id)
.ok_or_else(|| SessionError::NotFound { id: id.to_string() })?;
if let Ok(mut guard) = state.child.lock()
&& let Some(ref mut child) = *guard
{
let _ = child.kill();
}
if let Some(shutdown_tx) = state.shutdown_tx.take() {
let _ = shutdown_tx.send(());
}
if self.auto_cleanup
&& let Some(ref wt) = self.worktree
&& let Some(ref path) = state.session.worktree_path
&& let Err(e) = wt.remove_worktree(path)
{
tracing::warn!("Failed to remove worktree: {e}");
}
self.sessions.remove(&id);
Ok(())
}
fn handle_close_session(&mut self, id: SessionId) -> Result<Option<PathBuf>, SessionError> {
let state = self
.sessions
.get(&id)
.ok_or_else(|| SessionError::NotFound { id: id.to_string() })?;
let worktree_path = state.session.worktree_path.clone();
self.sessions.remove(&id);
Ok(worktree_path)
}
fn handle_input(&mut self, id: SessionId, data: &[u8]) {
if let Some(state) = self.sessions.get_mut(&id)
&& let Some(ref mut writer) = state.writer
{
use std::io::Write;
let _ = writer.write_all(data);
let _ = writer.flush();
}
}
fn handle_resize(&mut self, id: SessionId, rows: u16, cols: u16) {
if let Some(state) = self.sessions.get(&id)
&& let Ok(mut guard) = state.child.lock()
&& let Some(ref mut child) = *guard
{
let _ = child.resize(rows, cols);
}
}
fn reader_task<R: std::io::Read + Send + 'static>(
id: SessionId,
reader: R,
event_tx: mpsc::Sender<SessionEvent>,
shutdown_rx: oneshot::Receiver<()>,
child: Arc<Mutex<Option<Box<dyn PtyChild>>>>,
) {
std::thread::spawn(move || {
Self::reader_thread(id, reader, event_tx, shutdown_rx, child);
});
}
#[allow(clippy::needless_pass_by_value)]
fn reader_thread<R: std::io::Read>(
id: SessionId,
mut reader: R,
event_tx: mpsc::Sender<SessionEvent>,
mut shutdown_rx: oneshot::Receiver<()>,
child: Arc<Mutex<Option<Box<dyn PtyChild>>>>,
) {
let mut buf = [0u8; 4096];
let mut osc_parser = OscParser::new();
loop {
if shutdown_rx.try_recv().is_ok() {
break;
}
match reader.read(&mut buf) {
Ok(0) => {
let exit_code = child
.lock()
.ok()
.and_then(|mut guard| guard.as_mut()?.try_wait().ok().flatten());
let _ = event_tx.blocking_send(SessionEvent::Terminated { id, exit_code });
break;
}
Ok(n) => {
let data = buf[..n].to_vec();
for result in osc_parser.feed_slice(&data) {
let crate::session::osc::OscResult::Title(title) = result;
let _ = event_tx.blocking_send(SessionEvent::TitleChanged { id, title });
}
let _ = event_tx.blocking_send(SessionEvent::Output { id, data });
}
Err(_) => {
let exit_code = child
.lock()
.ok()
.and_then(|mut guard| guard.as_mut()?.try_wait().ok().flatten());
let _ = event_tx.blocking_send(SessionEvent::Terminated { id, exit_code });
break;
}
}
}
}
#[must_use]
pub fn session_count(&self) -> usize {
self.sessions.len()
}
}
async fn fetch_today_cost() -> Option<f64> {
use chrono::Local;
use tokio::process::Command;
#[derive(serde::Deserialize)]
struct DailyResponse {
daily: Vec<DailyEntry>,
}
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct DailyEntry {
total_cost: f64,
}
let today = Local::now().format("%Y%m%d").to_string();
let output = Command::new("bunx")
.args(["ccusage", "daily", "--since", &today, "--json"])
.output()
.await
.ok()?;
if !output.status.success() {
return None;
}
let response: DailyResponse = serde_json::from_slice(&output.stdout).ok()?;
response.daily.first().map(|e| e.total_cost)
}
#[cfg(test)]
#[path = "manager_tests.rs"]
mod tests;