taskers_core/
pane_runtime.rs1use std::{
2 collections::HashMap,
3 path::PathBuf,
4 sync::{Arc, Mutex},
5 thread,
6};
7
8use anyhow::{Context, Result, anyhow};
9use taskers_control::{ControlCommand, InMemoryController};
10use taskers_domain::{AppModel, PaneId, PaneKind, SurfaceId, WorkspaceId};
11use taskers_runtime::{CommandSpec, PtySession, ShellLaunchSpec, SignalStreamParser};
12
/// Maximum number of `char`s of sanitized output retained per surface.
///
/// `append_output` discards the oldest text once a buffer grows past this limit.
const MAX_OUTPUT_CHARS: usize = 24_000;
14
/// Point-in-time copy of what a surface's runtime has produced so far.
#[derive(Clone, Debug)]
pub struct PaneRuntimeSnapshot {
    /// Sanitized terminal text accumulated for the surface at the moment of the snapshot.
    pub output: String,
    /// Process id captured from the PTY session at spawn time, when available.
    pub process_id: Option<u32>,
}
20
21#[derive(Clone)]
22pub struct RuntimeManager {
23 enabled: bool,
24 controller: InMemoryController,
25 shell_launch: ShellLaunchSpec,
26 inner: Arc<Mutex<RuntimeManagerInner>>,
27}
28
29struct RuntimeManagerInner {
30 surfaces: HashMap<SurfaceId, PaneRuntime>,
31}
32
33struct PaneRuntime {
34 session: Arc<Mutex<PtySession>>,
35 output: Arc<Mutex<String>>,
36 process_id: Option<u32>,
37}
38
39impl RuntimeManager {
40 pub fn new(
41 controller: InMemoryController,
42 enabled: bool,
43 shell_launch: ShellLaunchSpec,
44 ) -> Self {
45 Self {
46 enabled,
47 controller,
48 shell_launch,
49 inner: Arc::new(Mutex::new(RuntimeManagerInner {
50 surfaces: HashMap::new(),
51 })),
52 }
53 }
54
55 pub fn sync_model(&self, model: &AppModel) -> Result<()> {
56 if !self.enabled {
57 return Ok(());
58 }
59
60 let model_surface_ids: std::collections::HashSet<SurfaceId> = model
61 .workspaces
62 .values()
63 .flat_map(|ws| {
64 ws.panes
65 .values()
66 .flat_map(|pane| pane.surface_ids())
67 .collect::<Vec<_>>()
68 })
69 .collect();
70
71 {
72 let mut inner = self.inner.lock().expect("runtime manager mutex poisoned");
73 inner
74 .surfaces
75 .retain(|id, _| model_surface_ids.contains(id));
76 }
77
78 for (workspace_id, workspace) in &model.workspaces {
79 for pane in workspace.panes.values() {
80 for surface in pane.surfaces.values() {
81 if surface.kind != PaneKind::Terminal {
82 continue;
83 }
84
85 let mut inner = self.inner.lock().expect("runtime manager mutex poisoned");
86 if inner.surfaces.contains_key(&surface.id) {
87 continue;
88 }
89
90 let runtime = spawn_surface_runtime(
91 self.controller.clone(),
92 self.shell_launch.clone(),
93 *workspace_id,
94 pane.id,
95 surface.id,
96 surface.metadata.cwd.as_deref().map(PathBuf::from),
97 )
98 .with_context(|| {
99 format!("failed to spawn shell runtime for surface {}", surface.id)
100 })?;
101 inner.surfaces.insert(surface.id, runtime);
102 }
103 }
104 }
105
106 Ok(())
107 }
108
109 pub fn snapshot(&self, surface_id: SurfaceId) -> Option<PaneRuntimeSnapshot> {
110 if !self.enabled {
111 return None;
112 }
113
114 let inner = self.inner.lock().expect("runtime manager mutex poisoned");
115 let runtime = inner.surfaces.get(&surface_id)?;
116 let output = runtime
117 .output
118 .lock()
119 .expect("pane output mutex poisoned")
120 .clone();
121
122 Some(PaneRuntimeSnapshot {
123 output,
124 process_id: runtime.process_id,
125 })
126 }
127
128 pub fn send_input(&self, surface_id: SurfaceId, input: &str) -> Result<()> {
129 if !self.enabled {
130 return Err(anyhow!("surface {surface_id} is using the Ghostty backend"));
131 }
132
133 let session = {
134 let inner = self.inner.lock().expect("runtime manager mutex poisoned");
135 inner
136 .surfaces
137 .get(&surface_id)
138 .map(|runtime| Arc::clone(&runtime.session))
139 .ok_or_else(|| anyhow!("surface {surface_id} has no live runtime"))?
140 };
141
142 let mut session = session.lock().expect("pty session mutex poisoned");
143 session
144 .write_all(input.as_bytes())
145 .with_context(|| format!("failed to send input to surface {surface_id}"))?;
146 Ok(())
147 }
148}
149
150fn spawn_surface_runtime(
151 controller: InMemoryController,
152 shell_launch: ShellLaunchSpec,
153 workspace_id: WorkspaceId,
154 pane_id: PaneId,
155 surface_id: SurfaceId,
156 cwd: Option<PathBuf>,
157) -> Result<PaneRuntime> {
158 let mut spec = CommandSpec::new(shell_launch.program.display().to_string());
159 spec.args = shell_launch.args;
160 spec.cwd = cwd;
161 spec.env.extend(shell_launch.env);
162 spec.env
163 .entry("TERM".into())
164 .or_insert_with(|| "xterm-256color".into());
165 spec.env
166 .insert("TASKERS_PANE_ID".into(), pane_id.to_string());
167 spec.env
168 .insert("TASKERS_WORKSPACE_ID".into(), workspace_id.to_string());
169 spec.env
170 .insert("TASKERS_SURFACE_ID".into(), surface_id.to_string());
171
172 let spawned = PtySession::spawn(&spec)?;
173 let process_id = spawned.session.process_id();
174 let output = Arc::new(Mutex::new(String::new()));
175 let session = Arc::new(Mutex::new(spawned.session));
176
177 let reader_output = Arc::clone(&output);
178 thread::spawn(move || {
179 let mut reader = spawned.reader;
180 let mut buffer = [0u8; 4096];
181 let mut signal_parser = SignalStreamParser::default();
182
183 loop {
184 match reader.read_into(&mut buffer) {
185 Ok(0) => {
186 let _ = controller.handle(ControlCommand::CloseSurface {
187 workspace_id,
188 pane_id,
189 surface_id,
190 });
191 break;
192 }
193 Ok(bytes_read) => {
194 let chunk = String::from_utf8_lossy(&buffer[..bytes_read]).to_string();
195 let clean = sanitize_terminal_output(&chunk);
196 if !clean.is_empty() {
197 append_output(&reader_output, &clean);
198 }
199
200 for signal in signal_parser.push(&chunk) {
201 let _ = controller.handle(ControlCommand::EmitSignal {
202 workspace_id,
203 pane_id,
204 surface_id: Some(surface_id),
205 event: signal.clone().into_event("pty"),
206 });
207 }
208 }
209 Err(_) => {
210 let _ = controller.handle(ControlCommand::CloseSurface {
211 workspace_id,
212 pane_id,
213 surface_id,
214 });
215 break;
216 }
217 }
218 }
219 });
220
221 Ok(PaneRuntime {
222 session,
223 output,
224 process_id,
225 })
226}
227
228fn append_output(output: &Arc<Mutex<String>>, chunk: &str) {
229 let mut output = output.lock().expect("pane output mutex poisoned");
230 output.push_str(chunk);
231
232 if output.chars().count() > MAX_OUTPUT_CHARS {
233 let trimmed = output
234 .chars()
235 .rev()
236 .take(MAX_OUTPUT_CHARS)
237 .collect::<Vec<_>>();
238 *output = trimmed.into_iter().rev().collect();
239 }
240}
241
/// Strips terminal control traffic from `input`, leaving only displayable text.
///
/// Removed:
/// - carriage returns;
/// - CSI sequences: `ESC [` up to and including the first byte in `0x40..=0x7e`;
/// - OSC sequences: `ESC ]` up to and including a BEL or an `ESC \` terminator;
/// - nF escape sequences: `ESC`, one or more intermediate bytes in `0x20..=0x2f`, then one
///   final byte in `0x30..=0x7e` (for example the charset selection `ESC ( B`);
/// - any other escape: `ESC` together with the single character that follows it;
/// - every remaining ASCII control character except `\n` and `\t`.
///
/// The function keeps no state between calls. A sequence that is cut off by the end of
/// `input` is removed as far as it is present.
fn sanitize_terminal_output(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut result = String::with_capacity(input.len());
    let mut index = 0usize;

    while index < bytes.len() {
        match bytes[index] {
            b'\r' => {
                index += 1;
            }
            0x1b => {
                index += 1;
                if index >= bytes.len() {
                    // Lone ESC at the very end: nothing left to inspect.
                    break;
                }

                match bytes[index] {
                    b'[' => {
                        index += 1;
                        while index < bytes.len() {
                            let byte = bytes[index];
                            index += 1;
                            if (0x40..=0x7e).contains(&byte) {
                                break;
                            }
                        }
                    }
                    b']' => {
                        index += 1;
                        while index < bytes.len() {
                            let byte = bytes[index];
                            index += 1;
                            if byte == 0x07 {
                                break;
                            }
                            if byte == 0x1b && bytes.get(index) == Some(&b'\\') {
                                index += 1;
                                break;
                            }
                        }
                    }
                    0x20..=0x2f => {
                        // nF escape: skip all intermediates, then the final byte if there is
                        // one. Stopping after the first intermediate would leak the final
                        // byte (e.g. the `B` of `ESC ( B`) into the output.
                        while index < bytes.len() && (0x20..=0x2f).contains(&bytes[index]) {
                            index += 1;
                        }
                        if index < bytes.len() && (0x30..=0x7e).contains(&bytes[index]) {
                            index += 1;
                        }
                    }
                    _ => {
                        // Skip the whole character after ESC. It may be multi-byte; stepping
                        // over only its first byte would leave `index` inside the character
                        // and make the slice in the text arm below panic. `index` sits right
                        // after the ASCII ESC byte, so it is a valid char boundary here.
                        index += input[index..].chars().next().map_or(1, char::len_utf8);
                    }
                }
            }
            byte if byte.is_ascii_control() && byte != b'\n' && byte != b'\t' => {
                index += 1;
            }
            _ => {
                // Copy a maximal run of text in one go. The run only ever stops in front of
                // an ASCII byte (or at the end), so `index` stays on a char boundary.
                let start = index;
                index += 1;
                while index < bytes.len()
                    && bytes[index] != 0x1b
                    && bytes[index] != b'\r'
                    && (!bytes[index].is_ascii_control()
                        || bytes[index] == b'\n'
                        || bytes[index] == b'\t')
                {
                    index += 1;
                }
                result.push_str(&input[start..index]);
            }
        }
    }

    result
}
310
#[cfg(test)]
mod tests {
    use super::sanitize_terminal_output;

    /// Colour codes, the carriage return and the title-setting OSC must all vanish, leaving
    /// just the two words and the newline between them.
    #[test]
    fn strips_control_sequences_from_terminal_output() {
        let raw = "\u{1b}[31mhello\u{1b}[0m\r\nworld\u{1b}]2;title\u{7}";
        let cleaned = sanitize_terminal_output(raw);
        assert_eq!(cleaned, "hello\nworld");
    }
}
320}