1use std::collections::HashSet;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, Result};
6use tokio::io::{self, AsyncRead, AsyncWrite};
7use tokio::sync::Mutex;
8use tracing::{debug, info, warn};
9use aimux_protocol::{
10 read_frame, write_frame, ClientInput, LayoutInfo, PaneId, PaneLayout, Response,
11 ScreenCell, ScreenCellAttrs, ScreenColor, ServerPush, WindowBarInfo, WindowId,
12};
13
14use crate::screen;
15use crate::session::{Session, SessionManager, Window};
16
17impl From<&screen::Color> for ScreenColor {
22 fn from(c: &screen::Color) -> Self {
23 match c {
24 screen::Color::Default => ScreenColor::Default,
25 screen::Color::Indexed(n) => ScreenColor::Indexed(*n),
26 screen::Color::Rgb(r, g, b) => ScreenColor::Rgb(*r, *g, *b),
27 }
28 }
29}
30
31impl From<&screen::CellAttrs> for ScreenCellAttrs {
32 fn from(a: &screen::CellAttrs) -> Self {
33 ScreenCellAttrs {
34 bold: a.bold,
35 dim: a.dim,
36 italic: a.italic,
37 underline: a.underline,
38 fg: ScreenColor::from(&a.fg),
39 bg: ScreenColor::from(&a.bg),
40 }
41 }
42}
43
44impl From<&screen::Cell> for ScreenCell {
45 fn from(c: &screen::Cell) -> Self {
46 ScreenCell {
47 ch: c.ch,
48 attrs: ScreenCellAttrs::from(&c.attrs),
49 }
50 }
51}
52
53fn build_layout_info(
58 session: &Session,
59 window: &Window,
60 terminal_cols: u16,
61 terminal_rows: u16,
62) -> LayoutInfo {
63 let positions = window.compute_pane_positions(terminal_cols, terminal_rows);
64 let active_pane_id = window
65 .panes
66 .get(window.active_pane)
67 .map(|p| p.id.clone())
68 .unwrap_or_default();
69
70 let panes: Vec<PaneLayout> = positions
71 .into_iter()
72 .map(|(pane_id, pos)| PaneLayout {
73 is_active: pane_id == active_pane_id,
74 is_zoomed: false, pane_id,
76 x: pos.x,
77 y: pos.y,
78 width: pos.width,
79 height: pos.height,
80 })
81 .collect();
82
83 let windows: Vec<WindowBarInfo> = session
84 .windows
85 .iter()
86 .enumerate()
87 .map(|(idx, w)| WindowBarInfo {
88 id: w.id,
89 title: format!("{}:{}", w.id, w.panes.first().map(|p| p.title.as_str()).unwrap_or("")),
90 is_active: idx == session.active_window,
91 })
92 .collect();
93
94 LayoutInfo {
95 session: session.name.clone(),
96 window_id: window.id,
97 panes,
98 terminal_cols,
99 terminal_rows,
100 windows,
101 }
102}
103
104fn snapshot_pane_set(_session: &Session, window: &Window) -> (WindowId, Vec<PaneId>) {
106 let pane_ids: Vec<PaneId> = window.panes.iter().map(|p| p.id.clone()).collect();
107 (window.id, pane_ids)
108}
109
110async fn build_snapshot(
115 pane_id: &str,
116 screen_arc: &Arc<tokio::sync::RwLock<screen::Screen>>,
117) -> ServerPush {
118 let screen = screen_arc.read().await;
119 let cells: Vec<Vec<ScreenCell>> = screen
120 .grid()
121 .iter()
122 .map(|row| row.iter().map(ScreenCell::from).collect())
123 .collect();
124 let cursor = screen.cursor();
125 let size = screen.size();
126 ServerPush::ScreenSnapshot {
127 pane_id: pane_id.to_string(),
128 cells,
129 cursor: aimux_protocol::CursorPos {
130 row: cursor.row,
131 col: cursor.col,
132 },
133 size,
134 cursor_visible: screen.cursor_visible(),
135 title: screen.title().to_string(),
136 }
137}
138
139pub async fn handle_attach<S>(
144 pipe: S,
145 manager: Arc<Mutex<SessionManager>>,
146 session_name: &str,
147) -> Result<()>
148where
149 S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
150{
151 info!("attach: client connecting to session '{}'", session_name);
152 let default_cols: u16 = 80;
153 let default_rows: u16 = 24;
154
155 let layout = {
157 let mgr = manager.lock().await;
158 let session = mgr
159 .get_session(session_name)
160 .context("session not found")?;
161 let window = session
162 .windows
163 .get(session.active_window)
164 .context("no active window")?;
165 build_layout_info(session, window, default_cols, default_rows)
166 };
167
168 let (mut reader, mut writer) = io::split(pipe);
170 let response = Response::AttachAccepted {
171 layout: layout.clone(),
172 };
173 let bytes = rmp_serde::to_vec(&response).context("serialize AttachAccepted")?;
174 write_frame(&mut writer, &bytes).await.context("send AttachAccepted")?;
175
176 info!("attach: accepted for session '{}' ({} panes)", session_name, layout.panes.len());
177
178 let mut dirty: HashSet<PaneId> = HashSet::new();
180 let mut interval = tokio::time::interval(Duration::from_millis(16));
181 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
182
183 let (mut last_window_id, mut last_pane_ids) = {
185 let mgr = manager.lock().await;
186 let session = match mgr.get_session(session_name) {
187 Some(s) => s,
188 None => {
189 warn!("attach: session '{}' disappeared before streaming started", session_name);
190 let err = ServerPush::Error {
191 message: "session killed".into(),
192 };
193 let _ = write_frame(&mut writer, &rmp_serde::to_vec(&err)?).await;
194 return Ok(());
195 }
196 };
197 let window = &session.windows[session.active_window];
198 snapshot_pane_set(session, window)
199 };
200
201 let mut update_rxs = subscribe_pane_updates(&manager, session_name).await;
203
204 for pane_id in &last_pane_ids {
206 dirty.insert(pane_id.clone());
207 }
208
209 let mut terminal_cols = default_cols;
210 let mut terminal_rows = default_rows;
211
212 let (frame_tx, mut frame_rx) = tokio::sync::mpsc::channel::<Result<Vec<u8>>>(4);
216 tokio::spawn(async move {
217 loop {
218 let result = read_frame(&mut reader).await;
219 let is_err = result.is_err();
220 if frame_tx.send(result).await.is_err() {
221 break; }
223 if is_err {
224 break; }
226 }
227 });
228
229 loop {
231 tokio::select! {
232 _ = interval.tick() => {
233 for (pane_id, rx) in &mut update_rxs {
235 if rx.try_recv().is_ok() {
236 dirty.insert(pane_id.clone());
237 while rx.try_recv().is_ok() {}
239 }
240 }
241
242 let mutation_result = {
244 let mgr = manager.lock().await;
245 match mgr.get_session(session_name) {
246 Some(session) => {
247 if session.windows.is_empty() {
248 None
249 } else {
250 let window = &session.windows[session.active_window];
251 let (wid, pids) = snapshot_pane_set(session, window);
252 Some((wid, pids, session.active_window))
253 }
254 }
255 None => None,
256 }
257 };
258
259 match mutation_result {
260 None => {
261 warn!("attach: session '{}' was killed, disconnecting client", session_name);
263 let msg = ServerPush::Error { message: "session killed".into() };
264 let _ = write_frame(&mut writer, &rmp_serde::to_vec(&msg)?).await;
265 return Ok(());
266 }
267 Some((wid, pids, _active_window_idx)) => {
268 if wid != last_window_id || pids != last_pane_ids {
269 debug!("attach: pane set changed, rebuilding subscriptions");
271 update_rxs = subscribe_pane_updates(&manager, session_name).await;
272 last_window_id = wid;
273 last_pane_ids = pids.clone();
274
275 for pid in &pids {
277 dirty.insert(pid.clone());
278 }
279
280 let layout = {
282 let mgr = manager.lock().await;
283 let session = mgr.get_session(session_name).context("session gone")?;
284 let window = &session.windows[session.active_window];
285 build_layout_info(session, window, terminal_cols, terminal_rows)
286 };
287 let msg = ServerPush::LayoutChanged(layout);
288 let bytes = rmp_serde::to_vec(&msg)?;
289 if write_frame(&mut writer, &bytes).await.is_err() {
290 info!("attach: pipe broken while sending layout change for '{}'", session_name);
291 return Ok(());
292 }
293 }
294 }
295 }
296
297 if !dirty.is_empty() {
299 let pane_ids: Vec<PaneId> = dirty.drain().collect();
300 for pane_id in &pane_ids {
301 let screen_arc = {
302 let mgr = manager.lock().await;
303 match mgr.find_pane(pane_id) {
304 Some((_, _, pane)) => pane.screen.clone(),
305 None => continue,
306 }
307 };
308 let snapshot = build_snapshot(pane_id, &screen_arc).await;
309 let bytes = rmp_serde::to_vec(&snapshot)?;
310 if write_frame(&mut writer, &bytes).await.is_err() {
311 info!("attach: pipe broken while sending snapshot for '{}'", session_name);
312 return Ok(());
313 }
314 }
315 }
316 }
317
318 frame_result = frame_rx.recv() => {
319 let payload = match frame_result {
320 Some(Ok(p)) => p,
321 Some(Err(e)) => {
322 info!("attach: client disconnected from '{}': {}", session_name, e);
323 return Ok(());
324 }
325 None => {
326 info!("attach: client reader closed for '{}'", session_name);
327 return Ok(());
328 }
329 };
330 let input: ClientInput = rmp_serde::from_slice(&payload)
331 .context("deserialize ClientInput")?;
332
333 match input {
334 ClientInput::Keys { data } => {
335 let mgr = manager.lock().await;
336 let session = match mgr.get_session(session_name) {
337 Some(s) => s,
338 None => return Ok(()),
339 };
340 let window = &session.windows[session.active_window];
341 if let Some(pane) = window.panes.get(window.active_pane) {
342 let _ = pane.pty.write(&data);
343 }
344 }
345
346 ClientInput::Resize { cols, rows } => {
347 debug!("attach: client resize {}x{} for '{}'", cols, rows, session_name);
348 terminal_cols = cols;
349 terminal_rows = rows;
350
351 let resize_targets = {
353 let mgr = manager.lock().await;
354 let session = match mgr.get_session(session_name) {
355 Some(s) => s,
356 None => return Ok(()),
357 };
358 let window = &session.windows[session.active_window];
359 let positions = window.compute_pane_positions(cols, rows);
360 positions
361 .into_iter()
362 .map(|(pane_id, pos)| (pane_id, pos.width, pos.height))
363 .collect::<Vec<_>>()
364 };
365
366 for (pane_id, w, h) in &resize_targets {
368 let screen_arc = {
369 let mut mgr = manager.lock().await;
370 match mgr.resize_pane_pty(pane_id, *w, *h) {
371 Ok(s) => s,
372 Err(_) => continue,
373 }
374 };
375 let mut screen = screen_arc.write().await;
376 screen.resize(*w, *h);
377 dirty.insert(pane_id.clone());
378 }
379
380 let layout = {
382 let mgr = manager.lock().await;
383 let session = match mgr.get_session(session_name) {
384 Some(s) => s,
385 None => return Ok(()),
386 };
387 let window = &session.windows[session.active_window];
388 build_layout_info(session, window, cols, rows)
389 };
390 let msg = ServerPush::LayoutChanged(layout);
391 let bytes = rmp_serde::to_vec(&msg)?;
392 if write_frame(&mut writer, &bytes).await.is_err() {
393 info!("attach: pipe broken during resize for '{}'", session_name);
394 return Ok(());
395 }
396
397 let mgr = manager.lock().await;
399 if let Some(session) = mgr.get_session(session_name) {
400 let window = &session.windows[session.active_window];
401 let (wid, pids) = snapshot_pane_set(session, window);
402 last_window_id = wid;
403 last_pane_ids = pids;
404 }
405 }
406
407 ClientInput::Detach => {
408 info!("attach: client sent detach for session '{}'", session_name);
409 return Ok(());
410 }
411
412 ClientInput::Command(request) => {
413 debug!("attach: command from client in '{}': {:?}", session_name, request);
414 let response = crate::ipc_dispatch::dispatch(request, &manager).await;
415 let msg = rmp_serde::to_vec(&response)?;
416 if write_frame(&mut writer, &msg).await.is_err() {
417 info!("attach: pipe broken sending command response for '{}'", session_name);
418 return Ok(());
419 }
420 for pane_id in &last_pane_ids {
422 dirty.insert(pane_id.clone());
423 }
424 }
425 }
426 }
427 }
428 }
429}
430
431async fn subscribe_pane_updates(
434 manager: &Arc<Mutex<SessionManager>>,
435 session_name: &str,
436) -> Vec<(PaneId, tokio::sync::broadcast::Receiver<()>)> {
437 let mgr = manager.lock().await;
438 let Some(session) = mgr.get_session(session_name) else {
439 return vec![];
440 };
441 let window = &session.windows[session.active_window];
442 window
443 .panes
444 .iter()
445 .map(|pane| (pane.id.clone(), pane.update_tx.subscribe()))
446 .collect()
447}
448
449#[cfg(test)]
454mod tests {
455 use super::*;
456 use crate::screen::{Cell, CellAttrs, Color, Screen};
457
458 #[test]
459 fn color_default_converts() {
460 let c = Color::Default;
461 assert_eq!(ScreenColor::from(&c), ScreenColor::Default);
462 }
463
464 #[test]
465 fn color_indexed_converts() {
466 let c = Color::Indexed(42);
467 assert_eq!(ScreenColor::from(&c), ScreenColor::Indexed(42));
468 }
469
470 #[test]
471 fn color_rgb_converts() {
472 let c = Color::Rgb(10, 20, 30);
473 assert_eq!(ScreenColor::from(&c), ScreenColor::Rgb(10, 20, 30));
474 }
475
476 #[test]
477 fn cell_attrs_converts() {
478 let attrs = CellAttrs {
479 bold: true,
480 dim: false,
481 italic: true,
482 underline: false,
483 fg: Color::Indexed(1),
484 bg: Color::Rgb(255, 0, 128),
485 };
486 let proto = ScreenCellAttrs::from(&attrs);
487 assert!(proto.bold);
488 assert!(!proto.dim);
489 assert!(proto.italic);
490 assert!(!proto.underline);
491 assert_eq!(proto.fg, ScreenColor::Indexed(1));
492 assert_eq!(proto.bg, ScreenColor::Rgb(255, 0, 128));
493 }
494
495 #[test]
496 fn cell_converts() {
497 let cell = Cell {
498 ch: 'X',
499 attrs: CellAttrs {
500 bold: true,
501 ..CellAttrs::default()
502 },
503 };
504 let proto = ScreenCell::from(&cell);
505 assert_eq!(proto.ch, 'X');
506 assert!(proto.attrs.bold);
507 }
508
509 #[test]
510 fn default_cell_converts() {
511 let cell = Cell::default();
512 let proto = ScreenCell::from(&cell);
513 assert_eq!(proto.ch, '\0');
514 assert_eq!(proto.attrs, ScreenCellAttrs::default());
515 }
516
517 #[tokio::test]
518 async fn snapshot_from_screen() {
519 let mut screen = Screen::new(10, 3, 100);
520 screen.feed(b"Hello\r\nWorld");
521
522 let screen_arc = Arc::new(tokio::sync::RwLock::new(screen));
523 let snapshot = build_snapshot("%0", &screen_arc).await;
524
525 match snapshot {
526 ServerPush::ScreenSnapshot {
527 pane_id,
528 cells,
529 cursor,
530 size,
531 cursor_visible,
532 ..
533 } => {
534 assert_eq!(pane_id, "%0");
535 assert_eq!(size, (10, 3));
536 assert!(cursor_visible);
537 assert_eq!(cursor.row, 1);
539 assert_eq!(cursor.col, 5);
540 assert_eq!(cells.len(), 3);
542 let first_row_text: String = cells[0].iter().map(|c| c.ch).collect();
543 assert!(first_row_text.starts_with("Hello"));
544 let second_row_text: String = cells[1].iter().map(|c| c.ch).collect();
545 assert!(second_row_text.starts_with("World"));
546 }
547 _ => panic!("expected ScreenSnapshot"),
548 }
549 }
550
551 #[test]
552 fn screen_cursor_visible_getter() {
553 let screen = Screen::new(80, 24, 100);
554 assert!(screen.cursor_visible());
555 }
556
557 #[test]
558 fn screen_grid_getter() {
559 let screen = Screen::new(10, 5, 100);
560 let grid = screen.grid();
561 assert_eq!(grid.len(), 5);
562 assert_eq!(grid[0].len(), 10);
563 }
564
565 #[test]
566 fn screen_grid_after_feed() {
567 let mut screen = Screen::new(10, 3, 100);
568 screen.feed(b"AB");
569 let grid = screen.grid();
570 assert_eq!(grid[0][0].ch, 'A');
571 assert_eq!(grid[0][1].ch, 'B');
572 }
573}