1use std::{
2 collections::HashMap,
3 path::PathBuf,
4 sync::{Arc, Mutex},
5 thread,
6};
7
8use anyhow::{Context, Result, anyhow};
9use taskers_control::{ControlCommand, InMemoryController};
10use taskers_domain::{
11 AgentTarget, AppModel, AttentionState, PaneId, PaneKind, SignalKind, SurfaceId, WorkspaceId,
12};
13use taskers_runtime::{
14 CommandSpec, ParsedTerminalEvent, PtySession, ShellLaunchSpec, SignalStreamParser,
15};
16
17const MAX_OUTPUT_CHARS: usize = 24_000;
18
19#[derive(Debug, Clone)]
20pub struct PaneRuntimeSnapshot {
21 pub output: String,
22 pub process_id: Option<u32>,
23}
24
25#[derive(Clone)]
26pub struct RuntimeManager {
27 enabled: bool,
28 controller: InMemoryController,
29 shell_launch: ShellLaunchSpec,
30 inner: Arc<Mutex<RuntimeManagerInner>>,
31}
32
33struct RuntimeManagerInner {
34 surfaces: HashMap<SurfaceId, PaneRuntime>,
35}
36
37struct PaneRuntime {
38 session: Arc<Mutex<PtySession>>,
39 output: Arc<Mutex<String>>,
40 process_id: Option<u32>,
41}
42
43impl RuntimeManager {
44 pub fn new(
45 controller: InMemoryController,
46 enabled: bool,
47 shell_launch: ShellLaunchSpec,
48 ) -> Self {
49 Self {
50 enabled,
51 controller,
52 shell_launch,
53 inner: Arc::new(Mutex::new(RuntimeManagerInner {
54 surfaces: HashMap::new(),
55 })),
56 }
57 }
58
59 pub fn sync_model(&self, model: &AppModel) -> Result<()> {
60 if !self.enabled {
61 return Ok(());
62 }
63
64 let model_surface_ids: std::collections::HashSet<SurfaceId> = model
65 .workspaces
66 .values()
67 .flat_map(|ws| {
68 ws.panes
69 .values()
70 .flat_map(|pane| pane.surface_ids())
71 .collect::<Vec<_>>()
72 })
73 .collect();
74
75 {
76 let mut inner = self.inner.lock().expect("runtime manager mutex poisoned");
77 inner
78 .surfaces
79 .retain(|id, _| model_surface_ids.contains(id));
80 }
81
82 for (workspace_id, workspace) in &model.workspaces {
83 for pane in workspace.panes.values() {
84 for surface in pane.surfaces.values() {
85 if surface.kind != PaneKind::Terminal {
86 continue;
87 }
88
89 let mut inner = self.inner.lock().expect("runtime manager mutex poisoned");
90 if inner.surfaces.contains_key(&surface.id) {
91 continue;
92 }
93
94 let runtime = spawn_surface_runtime(
95 self.controller.clone(),
96 self.shell_launch.clone(),
97 *workspace_id,
98 pane.id,
99 surface.id,
100 surface.metadata.cwd.as_deref().map(PathBuf::from),
101 )
102 .with_context(|| {
103 format!("failed to spawn shell runtime for surface {}", surface.id)
104 })?;
105 inner.surfaces.insert(surface.id, runtime);
106 }
107 }
108 }
109
110 Ok(())
111 }
112
113 pub fn snapshot(&self, surface_id: SurfaceId) -> Option<PaneRuntimeSnapshot> {
114 if !self.enabled {
115 return None;
116 }
117
118 let inner = self.inner.lock().expect("runtime manager mutex poisoned");
119 let runtime = inner.surfaces.get(&surface_id)?;
120 let output = runtime
121 .output
122 .lock()
123 .expect("pane output mutex poisoned")
124 .clone();
125
126 Some(PaneRuntimeSnapshot {
127 output,
128 process_id: runtime.process_id,
129 })
130 }
131
132 pub fn send_input(&self, surface_id: SurfaceId, input: &str) -> Result<()> {
133 if !self.enabled {
134 return Err(anyhow!("surface {surface_id} is using the Ghostty backend"));
135 }
136
137 let session = {
138 let inner = self.inner.lock().expect("runtime manager mutex poisoned");
139 inner
140 .surfaces
141 .get(&surface_id)
142 .map(|runtime| Arc::clone(&runtime.session))
143 .ok_or_else(|| anyhow!("surface {surface_id} has no live runtime"))?
144 };
145
146 let mut session = session.lock().expect("pty session mutex poisoned");
147 session
148 .write_all(input.as_bytes())
149 .with_context(|| format!("failed to send input to surface {surface_id}"))?;
150 Ok(())
151 }
152}
153
154fn spawn_surface_runtime(
155 controller: InMemoryController,
156 shell_launch: ShellLaunchSpec,
157 workspace_id: WorkspaceId,
158 pane_id: PaneId,
159 surface_id: SurfaceId,
160 cwd: Option<PathBuf>,
161) -> Result<PaneRuntime> {
162 let mut spec = CommandSpec::new(shell_launch.program.display().to_string());
163 spec.args = shell_launch.args;
164 spec.cwd = cwd;
165 spec.env.extend(shell_launch.env);
166 spec.env
167 .entry("TERM".into())
168 .or_insert_with(|| "xterm-256color".into());
169 spec.env
170 .insert("TASKERS_PANE_ID".into(), pane_id.to_string());
171 spec.env
172 .insert("TASKERS_WORKSPACE_ID".into(), workspace_id.to_string());
173 spec.env
174 .insert("TASKERS_SURFACE_ID".into(), surface_id.to_string());
175
176 let spawned = PtySession::spawn(&spec)?;
177 let process_id = spawned.session.process_id();
178 let output = Arc::new(Mutex::new(String::new()));
179 let session = Arc::new(Mutex::new(spawned.session));
180
181 let reader_output = Arc::clone(&output);
182 thread::spawn(move || {
183 let mut reader = spawned.reader;
184 let mut buffer = [0u8; 4096];
185 let mut signal_parser = SignalStreamParser::default();
186
187 loop {
188 match reader.read_into(&mut buffer) {
189 Ok(0) => {
190 let _ = controller.handle(ControlCommand::CloseSurface {
191 workspace_id,
192 pane_id,
193 surface_id,
194 });
195 break;
196 }
197 Ok(bytes_read) => {
198 let chunk = String::from_utf8_lossy(&buffer[..bytes_read]).to_string();
199 let clean = sanitize_terminal_output(&chunk);
200 if !clean.is_empty() {
201 append_output(&reader_output, &clean);
202 }
203
204 for event in signal_parser.push_events(&chunk) {
205 match event {
206 ParsedTerminalEvent::Signal(signal) => {
207 let _ = controller.handle(ControlCommand::EmitSignal {
208 workspace_id,
209 pane_id,
210 surface_id: Some(surface_id),
211 event: signal.into_event("pty"),
212 });
213 }
214 ParsedTerminalEvent::Notification(notification) => {
215 let _ =
216 controller.handle(ControlCommand::AgentCreateNotification {
217 target: AgentTarget::Surface {
218 workspace_id,
219 pane_id,
220 surface_id,
221 },
222 kind: SignalKind::Notification,
223 title: notification.title,
224 subtitle: notification.subtitle,
225 external_id: notification.external_id,
226 message: notification.body.unwrap_or_default(),
227 state: AttentionState::WaitingInput,
228 });
229 }
230 }
231 }
232 }
233 Err(_) => {
234 let _ = controller.handle(ControlCommand::CloseSurface {
235 workspace_id,
236 pane_id,
237 surface_id,
238 });
239 break;
240 }
241 }
242 }
243 });
244
245 Ok(PaneRuntime {
246 session,
247 output,
248 process_id,
249 })
250}
251
252fn append_output(output: &Arc<Mutex<String>>, chunk: &str) {
253 let mut output = output.lock().expect("pane output mutex poisoned");
254 output.push_str(chunk);
255
256 if output.chars().count() > MAX_OUTPUT_CHARS {
257 let trimmed = output
258 .chars()
259 .rev()
260 .take(MAX_OUTPUT_CHARS)
261 .collect::<Vec<_>>();
262 *output = trimmed.into_iter().rev().collect();
263 }
264}
265
266fn sanitize_terminal_output(input: &str) -> String {
267 let bytes = input.as_bytes();
268 let mut result = String::with_capacity(input.len());
269 let mut index = 0usize;
270
271 while index < bytes.len() {
272 match bytes[index] {
273 b'\r' => {
274 index += 1;
275 }
276 0x1b => {
277 index += 1;
278 if index >= bytes.len() {
279 break;
280 }
281
282 match bytes[index] {
283 b'[' => {
284 index += 1;
285 while index < bytes.len() {
286 let byte = bytes[index];
287 index += 1;
288 if (0x40..=0x7e).contains(&byte) {
289 break;
290 }
291 }
292 }
293 b']' => {
294 index += 1;
295 while index < bytes.len() {
296 let byte = bytes[index];
297 index += 1;
298 if byte == 0x07 {
299 break;
300 }
301 if byte == 0x1b && bytes.get(index) == Some(&b'\\') {
302 index += 1;
303 break;
304 }
305 }
306 }
307 _ => {
308 index += 1;
309 }
310 }
311 }
312 byte if byte.is_ascii_control() && byte != b'\n' && byte != b'\t' => {
313 index += 1;
314 }
315 _ => {
316 let start = index;
317 index += 1;
318 while index < bytes.len()
319 && bytes[index] != 0x1b
320 && bytes[index] != b'\r'
321 && (!bytes[index].is_ascii_control()
322 || bytes[index] == b'\n'
323 || bytes[index] == b'\t')
324 {
325 index += 1;
326 }
327 result.push_str(&input[start..index]);
328 }
329 }
330 }
331
332 result
333}
334
335#[cfg(test)]
336mod tests {
337 use super::sanitize_terminal_output;
338
339 #[test]
340 fn strips_control_sequences_from_terminal_output() {
341 let input = "\u{1b}[31mhello\u{1b}[0m\r\nworld\u{1b}]2;title\u{7}";
342 assert_eq!(sanitize_terminal_output(input), "hello\nworld");
343 }
344}