Skip to main content

taskers_core/
pane_runtime.rs

1use std::{
2    collections::HashMap,
3    path::PathBuf,
4    sync::{Arc, Mutex},
5    thread,
6};
7
8use anyhow::{Context, Result, anyhow};
9use taskers_control::{ControlCommand, InMemoryController};
10use taskers_domain::{
11    AgentTarget, AppModel, AttentionState, PaneId, PaneKind, SignalKind, SurfaceId, WorkspaceId,
12};
13use taskers_runtime::{
14    CommandSpec, ParsedTerminalEvent, PtySession, ShellLaunchSpec, SignalStreamParser,
15};
16
/// Upper bound, counted in `char`s (not bytes), on the text kept in a pane's
/// output buffer. `append_output` drops the oldest characters once the buffer
/// grows past this limit.
const MAX_OUTPUT_CHARS: usize = 24_000;
18
/// Point-in-time copy of one surface's runtime state, as returned by
/// `RuntimeManager::snapshot`.
#[derive(Debug, Clone)]
pub struct PaneRuntimeSnapshot {
    /// Copy of the surface's output buffer (already sanitized by the reader
    /// thread) at the moment the snapshot was taken.
    pub output: String,
    /// Process id reported by the PTY session when the runtime was spawned,
    /// if one was available.
    pub process_id: Option<u32>,
}
24
/// Spawns and tracks one PTY-backed runtime per terminal surface in the app
/// model.
///
/// Clones share the same runtime table, because `inner` is an `Arc`.
#[derive(Clone)]
pub struct RuntimeManager {
    /// Master switch. When `false`, `sync_model` does nothing, `snapshot`
    /// returns `None`, and `send_input` returns an error.
    enabled: bool,
    /// Controller cloned into every reader thread so it can report signals,
    /// notifications and surface closure.
    controller: InMemoryController,
    /// Program, arguments and environment used for each spawned surface.
    shell_launch: ShellLaunchSpec,
    /// Table of live runtimes, shared between clones of this manager.
    inner: Arc<Mutex<RuntimeManagerInner>>,
}
32
/// State guarded by the mutex in `RuntimeManager::inner`.
struct RuntimeManagerInner {
    /// Live runtimes keyed by the surface they belong to.
    surfaces: HashMap<SurfaceId, PaneRuntime>,
}
36
/// Handles to one spawned PTY session and the output its reader thread
/// collects.
struct PaneRuntime {
    /// PTY session that input is written to. Wrapped in `Arc<Mutex<_>>` so
    /// `send_input` can clone the handle and write without holding the
    /// manager-wide lock.
    session: Arc<Mutex<PtySession>>,
    /// Sanitized output buffer; appended to by the reader thread and cloned by
    /// `RuntimeManager::snapshot`.
    output: Arc<Mutex<String>>,
    /// Process id reported by the session at spawn time, if any.
    process_id: Option<u32>,
}
42
43impl RuntimeManager {
44    pub fn new(
45        controller: InMemoryController,
46        enabled: bool,
47        shell_launch: ShellLaunchSpec,
48    ) -> Self {
49        Self {
50            enabled,
51            controller,
52            shell_launch,
53            inner: Arc::new(Mutex::new(RuntimeManagerInner {
54                surfaces: HashMap::new(),
55            })),
56        }
57    }
58
59    pub fn sync_model(&self, model: &AppModel) -> Result<()> {
60        if !self.enabled {
61            return Ok(());
62        }
63
64        let model_surface_ids: std::collections::HashSet<SurfaceId> = model
65            .workspaces
66            .values()
67            .flat_map(|ws| {
68                ws.panes
69                    .values()
70                    .flat_map(|pane| pane.surface_ids())
71                    .collect::<Vec<_>>()
72            })
73            .collect();
74
75        {
76            let mut inner = self.inner.lock().expect("runtime manager mutex poisoned");
77            inner
78                .surfaces
79                .retain(|id, _| model_surface_ids.contains(id));
80        }
81
82        for (workspace_id, workspace) in &model.workspaces {
83            for pane in workspace.panes.values() {
84                for surface in pane.surfaces.values() {
85                    if surface.kind != PaneKind::Terminal {
86                        continue;
87                    }
88
89                    let mut inner = self.inner.lock().expect("runtime manager mutex poisoned");
90                    if inner.surfaces.contains_key(&surface.id) {
91                        continue;
92                    }
93
94                    let runtime = spawn_surface_runtime(
95                        self.controller.clone(),
96                        self.shell_launch.clone(),
97                        *workspace_id,
98                        pane.id,
99                        surface.id,
100                        surface.metadata.cwd.as_deref().map(PathBuf::from),
101                    )
102                    .with_context(|| {
103                        format!("failed to spawn shell runtime for surface {}", surface.id)
104                    })?;
105                    inner.surfaces.insert(surface.id, runtime);
106                }
107            }
108        }
109
110        Ok(())
111    }
112
113    pub fn snapshot(&self, surface_id: SurfaceId) -> Option<PaneRuntimeSnapshot> {
114        if !self.enabled {
115            return None;
116        }
117
118        let inner = self.inner.lock().expect("runtime manager mutex poisoned");
119        let runtime = inner.surfaces.get(&surface_id)?;
120        let output = runtime
121            .output
122            .lock()
123            .expect("pane output mutex poisoned")
124            .clone();
125
126        Some(PaneRuntimeSnapshot {
127            output,
128            process_id: runtime.process_id,
129        })
130    }
131
132    pub fn send_input(&self, surface_id: SurfaceId, input: &str) -> Result<()> {
133        if !self.enabled {
134            return Err(anyhow!("surface {surface_id} is using the Ghostty backend"));
135        }
136
137        let session = {
138            let inner = self.inner.lock().expect("runtime manager mutex poisoned");
139            inner
140                .surfaces
141                .get(&surface_id)
142                .map(|runtime| Arc::clone(&runtime.session))
143                .ok_or_else(|| anyhow!("surface {surface_id} has no live runtime"))?
144        };
145
146        let mut session = session.lock().expect("pty session mutex poisoned");
147        session
148            .write_all(input.as_bytes())
149            .with_context(|| format!("failed to send input to surface {surface_id}"))?;
150        Ok(())
151    }
152}
153
154fn spawn_surface_runtime(
155    controller: InMemoryController,
156    shell_launch: ShellLaunchSpec,
157    workspace_id: WorkspaceId,
158    pane_id: PaneId,
159    surface_id: SurfaceId,
160    cwd: Option<PathBuf>,
161) -> Result<PaneRuntime> {
162    let mut spec = CommandSpec::new(shell_launch.program.display().to_string());
163    spec.args = shell_launch.args;
164    spec.cwd = cwd;
165    spec.env.extend(shell_launch.env);
166    spec.env
167        .entry("TERM".into())
168        .or_insert_with(|| "xterm-256color".into());
169    spec.env
170        .insert("TASKERS_PANE_ID".into(), pane_id.to_string());
171    spec.env
172        .insert("TASKERS_WORKSPACE_ID".into(), workspace_id.to_string());
173    spec.env
174        .insert("TASKERS_SURFACE_ID".into(), surface_id.to_string());
175
176    let spawned = PtySession::spawn(&spec)?;
177    let process_id = spawned.session.process_id();
178    let output = Arc::new(Mutex::new(String::new()));
179    let session = Arc::new(Mutex::new(spawned.session));
180
181    let reader_output = Arc::clone(&output);
182    thread::spawn(move || {
183        let mut reader = spawned.reader;
184        let mut buffer = [0u8; 4096];
185        let mut signal_parser = SignalStreamParser::default();
186
187        loop {
188            match reader.read_into(&mut buffer) {
189                Ok(0) => {
190                    let _ = controller.handle(ControlCommand::CloseSurface {
191                        workspace_id,
192                        pane_id,
193                        surface_id,
194                    });
195                    break;
196                }
197                Ok(bytes_read) => {
198                    let chunk = String::from_utf8_lossy(&buffer[..bytes_read]).to_string();
199                    let clean = sanitize_terminal_output(&chunk);
200                    if !clean.is_empty() {
201                        append_output(&reader_output, &clean);
202                    }
203
204                    for event in signal_parser.push_events(&chunk) {
205                        match event {
206                            ParsedTerminalEvent::Signal(signal) => {
207                                let _ = controller.handle(ControlCommand::EmitSignal {
208                                    workspace_id,
209                                    pane_id,
210                                    surface_id: Some(surface_id),
211                                    event: signal.into_event("pty"),
212                                });
213                            }
214                            ParsedTerminalEvent::Notification(notification) => {
215                                let _ =
216                                    controller.handle(ControlCommand::AgentCreateNotification {
217                                        target: AgentTarget::Surface {
218                                            workspace_id,
219                                            pane_id,
220                                            surface_id,
221                                        },
222                                        kind: SignalKind::Notification,
223                                        title: notification.title,
224                                        subtitle: notification.subtitle,
225                                        external_id: notification.external_id,
226                                        message: notification.body.unwrap_or_default(),
227                                        state: AttentionState::WaitingInput,
228                                    });
229                            }
230                        }
231                    }
232                }
233                Err(_) => {
234                    let _ = controller.handle(ControlCommand::CloseSurface {
235                        workspace_id,
236                        pane_id,
237                        surface_id,
238                    });
239                    break;
240                }
241            }
242        }
243    });
244
245    Ok(PaneRuntime {
246        session,
247        output,
248        process_id,
249    })
250}
251
252fn append_output(output: &Arc<Mutex<String>>, chunk: &str) {
253    let mut output = output.lock().expect("pane output mutex poisoned");
254    output.push_str(chunk);
255
256    if output.chars().count() > MAX_OUTPUT_CHARS {
257        let trimmed = output
258            .chars()
259            .rev()
260            .take(MAX_OUTPUT_CHARS)
261            .collect::<Vec<_>>();
262        *output = trimmed.into_iter().rev().collect();
263    }
264}
265
/// Strips terminal control data from `input`, leaving printable text plus
/// `\n` and `\t`.
///
/// Removed: every escape sequence recognised by `skip_escape_sequence`,
/// carriage returns, and all other ASCII control characters (including DEL).
///
/// The function keeps no state between calls, so an escape sequence that is
/// split across two inputs is only partly removed.
fn sanitize_terminal_output(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut result = String::with_capacity(input.len());
    let mut index = 0usize;

    // Invariant: `index` is always on a UTF-8 character boundary here, because
    // every branch below stops either after an ASCII byte, in front of a
    // character's lead byte, or at the end of the input.
    while index < bytes.len() {
        match bytes[index] {
            0x1b => index = skip_escape_sequence(bytes, index + 1),
            byte if is_stripped_control(byte) => index += 1,
            _ => {
                // Copy the whole run of ordinary text in one slice.
                let run_len = bytes[index..]
                    .iter()
                    .position(|&byte| is_stripped_control(byte))
                    .unwrap_or(bytes.len() - index);
                result.push_str(&input[index..index + run_len]);
                index += run_len;
            }
        }
    }

    result
}

/// Returns `true` for ASCII control bytes that are dropped from the output,
/// i.e. every control byte except `\n` and `\t`. ESC and `\r` are included.
fn is_stripped_control(byte: u8) -> bool {
    byte.is_ascii_control() && byte != b'\n' && byte != b'\t'
}

/// Returns the index just past the escape sequence whose ESC byte sits
/// immediately before `index`.
///
/// Recognised forms:
/// - CSI (`ESC [`): consumed through the first final byte in `0x40..=0x7e`.
/// - OSC (`ESC ]`): consumed through BEL or the `ESC \` string terminator.
/// - `ESC` + intermediate bytes `0x20..=0x2f` + one final byte, e.g. the
///   charset designation `ESC ( B`.
/// - `ESC` + any other single ASCII byte.
///
/// A non-ASCII character after ESC is not part of any of these, so it is left
/// in place. Skipping only its first byte would leave the caller inside a
/// multi-byte character and make its string slice panic.
fn skip_escape_sequence(bytes: &[u8], mut index: usize) -> usize {
    match bytes.get(index).copied() {
        // Lone ESC at the very end of the input.
        None => index,
        Some(b'[') => {
            index += 1;
            while index < bytes.len() {
                let byte = bytes[index];
                index += 1;
                if (0x40..=0x7e).contains(&byte) {
                    break;
                }
            }
            index
        }
        Some(b']') => {
            index += 1;
            while index < bytes.len() {
                let byte = bytes[index];
                index += 1;
                if byte == 0x07 {
                    break;
                }
                if byte == 0x1b && bytes.get(index) == Some(&b'\\') {
                    index += 1;
                    break;
                }
            }
            index
        }
        Some(0x20..=0x2f) => {
            index += 1;
            while index < bytes.len() && (0x20..=0x2f).contains(&bytes[index]) {
                index += 1;
            }
            if index < bytes.len() && (0x30..=0x7e).contains(&bytes[index]) {
                index += 1;
            }
            index
        }
        Some(byte) if byte.is_ascii() => index + 1,
        Some(_) => index,
    }
}
334
#[cfg(test)]
mod tests {
    use super::sanitize_terminal_output;

    /// SGR colour codes, a carriage return and an OSC title sequence are all
    /// removed, while the newline and the visible text survive.
    #[test]
    fn strips_control_sequences_from_terminal_output() {
        let raw = "\u{1b}[31mhello\u{1b}[0m\r\nworld\u{1b}]2;title\u{7}";
        let cleaned = sanitize_terminal_output(raw);
        assert_eq!(cleaned, "hello\nworld");
    }
}
344}