use super::session_io;
use super::types::{FeedItem, SessionStatus};
use std::process::Stdio;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio::sync::mpsc;
use tokio::task::JoinSet;
pub struct SessionHandle {
_task: tokio::task::JoinHandle<()>,
}
pub struct SessionManager;
impl SessionManager {
pub fn spawn_session(
prompt: String,
resume_session_id: Option<String>,
working_dir: Option<std::path::PathBuf>,
model: Option<String>,
effort: Option<String>,
system_prompt: Option<String>,
mcp_config: Option<std::path::PathBuf>,
disable_builtin_tools: bool,
disable_all_tools: bool,
) -> (mpsc::UnboundedReceiver<SessionUpdate>, SessionHandle) {
let (tx, rx) = mpsc::unbounded_channel();
let handle = tokio::spawn(async move {
let _ = tx.send(SessionUpdate::Status(SessionStatus::Starting));
eprintln!(
"[keel:session] spawning claude -p (resume={:?})",
resume_session_id
);
let mut cmd = Command::new("claude");
cmd.env("PATH", super::claude_path::shell_path());
cmd.arg("-p")
.arg("--output-format")
.arg("stream-json")
.arg("--verbose")
.arg("--include-partial-messages");
if disable_all_tools {
cmd.arg("--allowedTools").arg("");
} else {
cmd.arg("--dangerously-skip-permissions");
if disable_builtin_tools {
cmd.arg("--disallowedTools")
.arg("Edit")
.arg("Write")
.arg("NotebookEdit");
}
}
if let Some(ref m) = model {
cmd.arg("--model").arg(m);
}
if let Some(ref e) = effort {
cmd.arg("--effort").arg(e);
}
if let Some(ref sp) = system_prompt {
cmd.arg("--system-prompt").arg(sp);
}
if let Some(ref mcp) = mcp_config {
cmd.arg("--mcp-config").arg(mcp);
}
if let Some(ref session_id) = resume_session_id {
cmd.arg("--resume").arg(session_id);
}
cmd.env_remove("CLAUDE_CODE_ENTRYPOINT");
cmd.env_remove("CLAUDECODE");
if let Some(ref dir) = working_dir {
if !dir.is_dir() {
let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(format!(
"Working directory not found: {}. Was it deleted?",
dir.display()
))));
return;
}
cmd.current_dir(dir);
}
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
cmd.stdin(Stdio::piped());
let mut child = match cmd.spawn() {
Ok(c) => c,
Err(e) => {
let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(format!(
"Failed to spawn claude: {e}"
))));
return;
}
};
if let Some(mut stdin) = child.stdin.take() {
if let Err(e) = stdin.write_all(prompt.as_bytes()).await {
let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(format!(
"Failed to write prompt to stdin: {e}"
))));
return;
}
drop(stdin);
}
if let Some(pid) = child.id() {
let _ = tx.send(SessionUpdate::ProcessPid(pid));
}
let _ = tx.send(SessionUpdate::Status(SessionStatus::Running));
eprintln!("[keel:session] claude process started, reading output");
let stdout = child.stdout.take();
let stderr = child.stderr.take();
let mut io_set: JoinSet<Option<Vec<String>>> = JoinSet::new();
if let Some(stderr) = stderr {
let tx2 = tx.clone();
io_set.spawn(async move {
Some(session_io::read_stderr_lines(stderr, tx2).await)
});
}
if let Some(stdout) = stdout {
let tx2 = tx.clone();
io_set.spawn(async move {
session_io::read_stdout_events(stdout, tx2).await;
None
});
}
let mut stderr_lines = Vec::new();
while let Some(result) = io_set.join_next().await {
match result {
Ok(Some(lines)) => stderr_lines = lines,
Ok(None) => {}
Err(e) => {
let msg = format!("I/O reader panicked: {e:?}");
eprintln!("[keel:session] {msg}");
let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(msg)));
let _ = child.kill().await;
return;
}
}
}
eprintln!("[keel:session] stdout closed, waiting for process exit");
match child.wait().await {
Ok(status) => {
eprintln!("[keel:session] process exited with {status}");
if status.success() {
let _ = tx.send(SessionUpdate::Status(SessionStatus::Completed));
} else {
let stderr_summary = if stderr_lines.is_empty() {
"no stderr output captured".to_string()
} else {
stderr_lines.join("\n")
};
let _ = tx.send(SessionUpdate::Feed(FeedItem::ToolResult {
content: format!("Process stderr:\n{stderr_summary}"),
is_error: true,
}));
let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(format!(
"claude exited with {status}"
))));
}
}
Err(e) => {
let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(format!(
"Failed to wait for claude: {e}"
))));
}
}
});
let session_handle = SessionHandle { _task: handle };
(rx, session_handle)
}
}
#[derive(Debug, Clone)]
pub enum SessionUpdate {
Status(SessionStatus),
SessionId(String),
Feed(FeedItem),
ProcessPid(u32),
}