1use super::session_io;
2use super::types::{FeedItem, SessionStatus};
3use std::process::Stdio;
4use tokio::io::AsyncWriteExt;
5use tokio::process::Command;
6use tokio::sync::mpsc;
7use tokio::task::JoinSet;
8
9pub struct SessionHandle {
11 _task: tokio::task::JoinHandle<()>,
12}
13
14pub struct SessionManager;
16
17impl SessionManager {
18 pub fn spawn_session(
24 prompt: String,
25 resume_session_id: Option<String>,
26 working_dir: Option<std::path::PathBuf>,
27 model: Option<String>,
28 effort: Option<String>,
29 system_prompt: Option<String>,
30 mcp_config: Option<std::path::PathBuf>,
31 disable_builtin_tools: bool,
32 disable_all_tools: bool,
34 ) -> (mpsc::UnboundedReceiver<SessionUpdate>, SessionHandle) {
35 let (tx, rx) = mpsc::unbounded_channel();
36
37 let handle = tokio::spawn(async move {
38 let _ = tx.send(SessionUpdate::Status(SessionStatus::Starting));
39 eprintln!(
40 "[keel:session] spawning claude -p (resume={:?})",
41 resume_session_id
42 );
43
44 let mut cmd = Command::new("claude");
45 cmd.env("PATH", super::claude_path::shell_path());
46 cmd.arg("-p")
47 .arg("--output-format")
48 .arg("stream-json")
49 .arg("--verbose")
50 .arg("--include-partial-messages");
51
52 if disable_all_tools {
53 cmd.arg("--allowedTools").arg("");
55 } else {
56 cmd.arg("--dangerously-skip-permissions");
58 if disable_builtin_tools {
59 cmd.arg("--disallowedTools")
60 .arg("Edit")
61 .arg("Write")
62 .arg("NotebookEdit");
63 }
64 }
65
66 if let Some(ref m) = model {
67 cmd.arg("--model").arg(m);
68 }
69 if let Some(ref e) = effort {
70 cmd.arg("--effort").arg(e);
71 }
72 if let Some(ref sp) = system_prompt {
73 cmd.arg("--system-prompt").arg(sp);
74 }
75 if let Some(ref mcp) = mcp_config {
76 cmd.arg("--mcp-config").arg(mcp);
77 }
78 if let Some(ref session_id) = resume_session_id {
79 cmd.arg("--resume").arg(session_id);
80 }
81
82 cmd.env_remove("CLAUDE_CODE_ENTRYPOINT");
84 cmd.env_remove("CLAUDECODE");
85
86 if let Some(ref dir) = working_dir {
87 if !dir.is_dir() {
88 let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(format!(
89 "Working directory not found: {}. Was it deleted?",
90 dir.display()
91 ))));
92 return;
93 }
94 cmd.current_dir(dir);
95 }
96
97 cmd.stdout(Stdio::piped());
98 cmd.stderr(Stdio::piped());
99 cmd.stdin(Stdio::piped());
100
101 let mut child = match cmd.spawn() {
102 Ok(c) => c,
103 Err(e) => {
104 let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(format!(
105 "Failed to spawn claude: {e}"
106 ))));
107 return;
108 }
109 };
110
111 if let Some(mut stdin) = child.stdin.take() {
112 if let Err(e) = stdin.write_all(prompt.as_bytes()).await {
113 let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(format!(
114 "Failed to write prompt to stdin: {e}"
115 ))));
116 return;
117 }
118 drop(stdin);
119 }
120
121 if let Some(pid) = child.id() {
122 let _ = tx.send(SessionUpdate::ProcessPid(pid));
123 }
124 let _ = tx.send(SessionUpdate::Status(SessionStatus::Running));
125 eprintln!("[keel:session] claude process started, reading output");
126
127 let stdout = child.stdout.take();
128 let stderr = child.stderr.take();
129
130 let mut io_set: JoinSet<Option<Vec<String>>> = JoinSet::new();
133
134 if let Some(stderr) = stderr {
135 let tx2 = tx.clone();
136 io_set.spawn(async move {
137 Some(session_io::read_stderr_lines(stderr, tx2).await)
138 });
139 }
140 if let Some(stdout) = stdout {
141 let tx2 = tx.clone();
142 io_set.spawn(async move {
143 session_io::read_stdout_events(stdout, tx2).await;
144 None
145 });
146 }
147
148 let mut stderr_lines = Vec::new();
150 while let Some(result) = io_set.join_next().await {
151 match result {
152 Ok(Some(lines)) => stderr_lines = lines,
153 Ok(None) => {}
154 Err(e) => {
155 let msg = format!("I/O reader panicked: {e:?}");
156 eprintln!("[keel:session] {msg}");
157 let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(msg)));
158 let _ = child.kill().await;
159 return;
160 }
161 }
162 }
163
164 eprintln!("[keel:session] stdout closed, waiting for process exit");
165 match child.wait().await {
166 Ok(status) => {
167 eprintln!("[keel:session] process exited with {status}");
168 if status.success() {
169 let _ = tx.send(SessionUpdate::Status(SessionStatus::Completed));
170 } else {
171 let stderr_summary = if stderr_lines.is_empty() {
172 "no stderr output captured".to_string()
173 } else {
174 stderr_lines.join("\n")
175 };
176 let _ = tx.send(SessionUpdate::Feed(FeedItem::ToolResult {
177 content: format!("Process stderr:\n{stderr_summary}"),
178 is_error: true,
179 }));
180 let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(format!(
181 "claude exited with {status}"
182 ))));
183 }
184 }
185 Err(e) => {
186 let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(format!(
187 "Failed to wait for claude: {e}"
188 ))));
189 }
190 }
191 });
192
193 let session_handle = SessionHandle { _task: handle };
194 (rx, session_handle)
195 }
196}
197
198#[derive(Debug, Clone)]
200pub enum SessionUpdate {
201 Status(SessionStatus),
202 SessionId(String),
203 Feed(FeedItem),
204 ProcessPid(u32),
205}