Skip to main content

taskers_core/
pane_runtime.rs

1use std::{
2    collections::HashMap,
3    path::PathBuf,
4    sync::{Arc, Mutex},
5    thread,
6};
7
8use anyhow::{Context, Result, anyhow};
9use taskers_control::{ControlCommand, InMemoryController};
10use taskers_domain::{AppModel, PaneId, PaneKind, SurfaceId, WorkspaceId};
11use taskers_runtime::{CommandSpec, PtySession, ShellLaunchSpec, SignalStreamParser};
12
13const MAX_OUTPUT_CHARS: usize = 24_000;
14
15#[derive(Debug, Clone)]
16pub struct PaneRuntimeSnapshot {
17    pub output: String,
18    pub process_id: Option<u32>,
19}
20
21#[derive(Clone)]
22pub struct RuntimeManager {
23    enabled: bool,
24    controller: InMemoryController,
25    shell_launch: ShellLaunchSpec,
26    inner: Arc<Mutex<RuntimeManagerInner>>,
27}
28
29struct RuntimeManagerInner {
30    surfaces: HashMap<SurfaceId, PaneRuntime>,
31}
32
33struct PaneRuntime {
34    session: Arc<Mutex<PtySession>>,
35    output: Arc<Mutex<String>>,
36    process_id: Option<u32>,
37}
38
39impl RuntimeManager {
40    pub fn new(
41        controller: InMemoryController,
42        enabled: bool,
43        shell_launch: ShellLaunchSpec,
44    ) -> Self {
45        Self {
46            enabled,
47            controller,
48            shell_launch,
49            inner: Arc::new(Mutex::new(RuntimeManagerInner {
50                surfaces: HashMap::new(),
51            })),
52        }
53    }
54
55    pub fn sync_model(&self, model: &AppModel) -> Result<()> {
56        if !self.enabled {
57            return Ok(());
58        }
59
60        let model_surface_ids: std::collections::HashSet<SurfaceId> = model
61            .workspaces
62            .values()
63            .flat_map(|ws| {
64                ws.panes
65                    .values()
66                    .flat_map(|pane| pane.surface_ids())
67                    .collect::<Vec<_>>()
68            })
69            .collect();
70
71        {
72            let mut inner = self.inner.lock().expect("runtime manager mutex poisoned");
73            inner
74                .surfaces
75                .retain(|id, _| model_surface_ids.contains(id));
76        }
77
78        for (workspace_id, workspace) in &model.workspaces {
79            for pane in workspace.panes.values() {
80                for surface in pane.surfaces.values() {
81                    if surface.kind != PaneKind::Terminal {
82                        continue;
83                    }
84
85                    let mut inner = self.inner.lock().expect("runtime manager mutex poisoned");
86                    if inner.surfaces.contains_key(&surface.id) {
87                        continue;
88                    }
89
90                    let runtime = spawn_surface_runtime(
91                        self.controller.clone(),
92                        self.shell_launch.clone(),
93                        *workspace_id,
94                        pane.id,
95                        surface.id,
96                        surface.metadata.cwd.as_deref().map(PathBuf::from),
97                    )
98                    .with_context(|| {
99                        format!("failed to spawn shell runtime for surface {}", surface.id)
100                    })?;
101                    inner.surfaces.insert(surface.id, runtime);
102                }
103            }
104        }
105
106        Ok(())
107    }
108
109    pub fn snapshot(&self, surface_id: SurfaceId) -> Option<PaneRuntimeSnapshot> {
110        if !self.enabled {
111            return None;
112        }
113
114        let inner = self.inner.lock().expect("runtime manager mutex poisoned");
115        let runtime = inner.surfaces.get(&surface_id)?;
116        let output = runtime
117            .output
118            .lock()
119            .expect("pane output mutex poisoned")
120            .clone();
121
122        Some(PaneRuntimeSnapshot {
123            output,
124            process_id: runtime.process_id,
125        })
126    }
127
128    pub fn send_input(&self, surface_id: SurfaceId, input: &str) -> Result<()> {
129        if !self.enabled {
130            return Err(anyhow!("surface {surface_id} is using the Ghostty backend"));
131        }
132
133        let session = {
134            let inner = self.inner.lock().expect("runtime manager mutex poisoned");
135            inner
136                .surfaces
137                .get(&surface_id)
138                .map(|runtime| Arc::clone(&runtime.session))
139                .ok_or_else(|| anyhow!("surface {surface_id} has no live runtime"))?
140        };
141
142        let mut session = session.lock().expect("pty session mutex poisoned");
143        session
144            .write_all(input.as_bytes())
145            .with_context(|| format!("failed to send input to surface {surface_id}"))?;
146        Ok(())
147    }
148}
149
150fn spawn_surface_runtime(
151    controller: InMemoryController,
152    shell_launch: ShellLaunchSpec,
153    workspace_id: WorkspaceId,
154    pane_id: PaneId,
155    surface_id: SurfaceId,
156    cwd: Option<PathBuf>,
157) -> Result<PaneRuntime> {
158    let mut spec = CommandSpec::new(shell_launch.program.display().to_string());
159    spec.args = shell_launch.args;
160    spec.cwd = cwd;
161    spec.env.extend(shell_launch.env);
162    spec.env
163        .entry("TERM".into())
164        .or_insert_with(|| "xterm-256color".into());
165    spec.env
166        .insert("TASKERS_PANE_ID".into(), pane_id.to_string());
167    spec.env
168        .insert("TASKERS_WORKSPACE_ID".into(), workspace_id.to_string());
169    spec.env
170        .insert("TASKERS_SURFACE_ID".into(), surface_id.to_string());
171
172    let spawned = PtySession::spawn(&spec)?;
173    let process_id = spawned.session.process_id();
174    let output = Arc::new(Mutex::new(String::new()));
175    let session = Arc::new(Mutex::new(spawned.session));
176
177    let reader_output = Arc::clone(&output);
178    thread::spawn(move || {
179        let mut reader = spawned.reader;
180        let mut buffer = [0u8; 4096];
181        let mut signal_parser = SignalStreamParser::default();
182
183        loop {
184            match reader.read_into(&mut buffer) {
185                Ok(0) => {
186                    let _ = controller.handle(ControlCommand::CloseSurface {
187                        workspace_id,
188                        pane_id,
189                        surface_id,
190                    });
191                    break;
192                }
193                Ok(bytes_read) => {
194                    let chunk = String::from_utf8_lossy(&buffer[..bytes_read]).to_string();
195                    let clean = sanitize_terminal_output(&chunk);
196                    if !clean.is_empty() {
197                        append_output(&reader_output, &clean);
198                    }
199
200                    for signal in signal_parser.push(&chunk) {
201                        let _ = controller.handle(ControlCommand::EmitSignal {
202                            workspace_id,
203                            pane_id,
204                            surface_id: Some(surface_id),
205                            event: signal.clone().into_event("pty"),
206                        });
207                    }
208                }
209                Err(_) => {
210                    let _ = controller.handle(ControlCommand::CloseSurface {
211                        workspace_id,
212                        pane_id,
213                        surface_id,
214                    });
215                    break;
216                }
217            }
218        }
219    });
220
221    Ok(PaneRuntime {
222        session,
223        output,
224        process_id,
225    })
226}
227
228fn append_output(output: &Arc<Mutex<String>>, chunk: &str) {
229    let mut output = output.lock().expect("pane output mutex poisoned");
230    output.push_str(chunk);
231
232    if output.chars().count() > MAX_OUTPUT_CHARS {
233        let trimmed = output
234            .chars()
235            .rev()
236            .take(MAX_OUTPUT_CHARS)
237            .collect::<Vec<_>>();
238        *output = trimmed.into_iter().rev().collect();
239    }
240}
241
242fn sanitize_terminal_output(input: &str) -> String {
243    let bytes = input.as_bytes();
244    let mut result = String::with_capacity(input.len());
245    let mut index = 0usize;
246
247    while index < bytes.len() {
248        match bytes[index] {
249            b'\r' => {
250                index += 1;
251            }
252            0x1b => {
253                index += 1;
254                if index >= bytes.len() {
255                    break;
256                }
257
258                match bytes[index] {
259                    b'[' => {
260                        index += 1;
261                        while index < bytes.len() {
262                            let byte = bytes[index];
263                            index += 1;
264                            if (0x40..=0x7e).contains(&byte) {
265                                break;
266                            }
267                        }
268                    }
269                    b']' => {
270                        index += 1;
271                        while index < bytes.len() {
272                            let byte = bytes[index];
273                            index += 1;
274                            if byte == 0x07 {
275                                break;
276                            }
277                            if byte == 0x1b && bytes.get(index) == Some(&b'\\') {
278                                index += 1;
279                                break;
280                            }
281                        }
282                    }
283                    _ => {
284                        index += 1;
285                    }
286                }
287            }
288            byte if byte.is_ascii_control() && byte != b'\n' && byte != b'\t' => {
289                index += 1;
290            }
291            _ => {
292                let start = index;
293                index += 1;
294                while index < bytes.len()
295                    && bytes[index] != 0x1b
296                    && bytes[index] != b'\r'
297                    && (!bytes[index].is_ascii_control()
298                        || bytes[index] == b'\n'
299                        || bytes[index] == b'\t')
300                {
301                    index += 1;
302                }
303                result.push_str(&input[start..index]);
304            }
305        }
306    }
307
308    result
309}
310
311#[cfg(test)]
312mod tests {
313    use super::sanitize_terminal_output;
314
315    #[test]
316    fn strips_control_sequences_from_terminal_output() {
317        let input = "\u{1b}[31mhello\u{1b}[0m\r\nworld\u{1b}]2;title\u{7}";
318        assert_eq!(sanitize_terminal_output(input), "hello\nworld");
319    }
320}