Skip to main content

keel_sessions/
manager.rs

1use super::session_io;
2use super::types::{FeedItem, SessionStatus};
3use std::process::Stdio;
4use tokio::io::AsyncWriteExt;
5use tokio::process::Command;
6use tokio::sync::mpsc;
7use tokio::task::JoinSet;
8
9/// Handle to a running session — keeps the task alive via its JoinHandle.
10pub struct SessionHandle {
11    _task: tokio::task::JoinHandle<()>,
12}
13
/// Spawns `claude` CLI sessions and streams their parsed events back to the
/// caller as [`SessionUpdate`] values.
///
/// This is a stateless unit type: it only serves as a namespace for
/// [`SessionManager::spawn_session`].
// NOTE(review): an earlier comment mentioned "Claude/Codex", but the code in
// this file only ever launches the `claude` binary — confirm whether Codex
// support lives elsewhere.
pub struct SessionManager;
16
17impl SessionManager {
18    /// Start a new CLI session with the given prompt.
19    ///
20    /// Returns a receiver of session updates and a handle that keeps the task alive.
21    /// All sessions use --dangerously-skip-permissions because they're automated.
22    /// Safety is controlled via system prompts and --disallowedTools / --tools flags.
23    pub fn spawn_session(
24        prompt: String,
25        resume_session_id: Option<String>,
26        working_dir: Option<std::path::PathBuf>,
27        model: Option<String>,
28        effort: Option<String>,
29        system_prompt: Option<String>,
30        mcp_config: Option<std::path::PathBuf>,
31        disable_builtin_tools: bool,
32        // When true, disables ALL tools (--allowedTools ""). Use for pure conversation.
33        disable_all_tools: bool,
34    ) -> (mpsc::UnboundedReceiver<SessionUpdate>, SessionHandle) {
35        let (tx, rx) = mpsc::unbounded_channel();
36
37        let handle = tokio::spawn(async move {
38            let _ = tx.send(SessionUpdate::Status(SessionStatus::Starting));
39            eprintln!(
40                "[keel:session] spawning claude -p (resume={:?})",
41                resume_session_id
42            );
43
44            let mut cmd = Command::new("claude");
45            cmd.env("PATH", super::claude_path::shell_path());
46            cmd.arg("-p")
47                .arg("--output-format")
48                .arg("stream-json")
49                .arg("--verbose")
50                .arg("--include-partial-messages");
51
52            if disable_all_tools {
53                // Pure conversation mode: no tools, no permissions bypass.
54                cmd.arg("--allowedTools").arg("");
55            } else {
56                // Only grant full permissions when tools are enabled.
57                cmd.arg("--dangerously-skip-permissions");
58                if disable_builtin_tools {
59                    cmd.arg("--disallowedTools")
60                        .arg("Edit")
61                        .arg("Write")
62                        .arg("NotebookEdit");
63                }
64            }
65
66            if let Some(ref m) = model {
67                cmd.arg("--model").arg(m);
68            }
69            if let Some(ref e) = effort {
70                cmd.arg("--effort").arg(e);
71            }
72            if let Some(ref sp) = system_prompt {
73                cmd.arg("--system-prompt").arg(sp);
74            }
75            if let Some(ref mcp) = mcp_config {
76                cmd.arg("--mcp-config").arg(mcp);
77            }
78            if let Some(ref session_id) = resume_session_id {
79                cmd.arg("--resume").arg(session_id);
80            }
81
82            // Prevent nested Claude Code session detection.
83            cmd.env_remove("CLAUDE_CODE_ENTRYPOINT");
84            cmd.env_remove("CLAUDECODE");
85
86            if let Some(ref dir) = working_dir {
87                if !dir.is_dir() {
88                    let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(format!(
89                        "Working directory not found: {}. Was it deleted?",
90                        dir.display()
91                    ))));
92                    return;
93                }
94                cmd.current_dir(dir);
95            }
96
97            cmd.stdout(Stdio::piped());
98            cmd.stderr(Stdio::piped());
99            cmd.stdin(Stdio::piped());
100
101            let mut child = match cmd.spawn() {
102                Ok(c) => c,
103                Err(e) => {
104                    let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(format!(
105                        "Failed to spawn claude: {e}"
106                    ))));
107                    return;
108                }
109            };
110
111            if let Some(mut stdin) = child.stdin.take() {
112                if let Err(e) = stdin.write_all(prompt.as_bytes()).await {
113                    let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(format!(
114                        "Failed to write prompt to stdin: {e}"
115                    ))));
116                    return;
117                }
118                drop(stdin);
119            }
120
121            if let Some(pid) = child.id() {
122                let _ = tx.send(SessionUpdate::ProcessPid(pid));
123            }
124            let _ = tx.send(SessionUpdate::Status(SessionStatus::Running));
125            eprintln!("[keel:session] claude process started, reading output");
126
127            let stdout = child.stdout.take();
128            let stderr = child.stderr.take();
129
130            // Spawn stdout and stderr readers into a supervised JoinSet.
131            // `Option<Vec<String>>` is Some for stderr (collected lines) and None for stdout.
132            let mut io_set: JoinSet<Option<Vec<String>>> = JoinSet::new();
133
134            if let Some(stderr) = stderr {
135                let tx2 = tx.clone();
136                io_set.spawn(async move {
137                    Some(session_io::read_stderr_lines(stderr, tx2).await)
138                });
139            }
140            if let Some(stdout) = stdout {
141                let tx2 = tx.clone();
142                io_set.spawn(async move {
143                    session_io::read_stdout_events(stdout, tx2).await;
144                    None
145                });
146            }
147
148            // Drive I/O tasks; catch panics before they can corrupt shared state.
149            let mut stderr_lines = Vec::new();
150            while let Some(result) = io_set.join_next().await {
151                match result {
152                    Ok(Some(lines)) => stderr_lines = lines,
153                    Ok(None) => {}
154                    Err(e) => {
155                        let msg = format!("I/O reader panicked: {e:?}");
156                        eprintln!("[keel:session] {msg}");
157                        let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(msg)));
158                        let _ = child.kill().await;
159                        return;
160                    }
161                }
162            }
163
164            eprintln!("[keel:session] stdout closed, waiting for process exit");
165            match child.wait().await {
166                Ok(status) => {
167                    eprintln!("[keel:session] process exited with {status}");
168                    if status.success() {
169                        let _ = tx.send(SessionUpdate::Status(SessionStatus::Completed));
170                    } else {
171                        let stderr_summary = if stderr_lines.is_empty() {
172                            "no stderr output captured".to_string()
173                        } else {
174                            stderr_lines.join("\n")
175                        };
176                        let _ = tx.send(SessionUpdate::Feed(FeedItem::ToolResult {
177                            content: format!("Process stderr:\n{stderr_summary}"),
178                            is_error: true,
179                        }));
180                        let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(format!(
181                            "claude exited with {status}"
182                        ))));
183                    }
184                }
185                Err(e) => {
186                    let _ = tx.send(SessionUpdate::Status(SessionStatus::Error(format!(
187                        "Failed to wait for claude: {e}"
188                    ))));
189                }
190            }
191        });
192
193        let session_handle = SessionHandle { _task: handle };
194        (rx, session_handle)
195    }
196}
197
198/// Updates sent from the session manager to consumers (monitors, pumps).
199#[derive(Debug, Clone)]
200pub enum SessionUpdate {
201    Status(SessionStatus),
202    SessionId(String),
203    Feed(FeedItem),
204    ProcessPid(u32),
205}