Skip to main content

aimux_server/
attach.rs

1use std::collections::HashSet;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, Result};
6use tokio::io::{self, AsyncRead, AsyncWrite};
7use tokio::sync::Mutex;
8use tracing::{debug, info, warn};
9use aimux_protocol::{
10    read_frame, write_frame, ClientInput, LayoutInfo, PaneId, PaneLayout, Response,
11    ScreenCell, ScreenCellAttrs, ScreenColor, ServerPush, WindowBarInfo, WindowId,
12};
13
14use crate::screen;
15use crate::session::{Session, SessionManager, Window};
16
17// ---------------------------------------------------------------------------
18// From conversions: server screen types → protocol wire types
19// ---------------------------------------------------------------------------
20
21impl From<&screen::Color> for ScreenColor {
22    fn from(c: &screen::Color) -> Self {
23        match c {
24            screen::Color::Default => ScreenColor::Default,
25            screen::Color::Indexed(n) => ScreenColor::Indexed(*n),
26            screen::Color::Rgb(r, g, b) => ScreenColor::Rgb(*r, *g, *b),
27        }
28    }
29}
30
31impl From<&screen::CellAttrs> for ScreenCellAttrs {
32    fn from(a: &screen::CellAttrs) -> Self {
33        ScreenCellAttrs {
34            bold: a.bold,
35            dim: a.dim,
36            italic: a.italic,
37            underline: a.underline,
38            fg: ScreenColor::from(&a.fg),
39            bg: ScreenColor::from(&a.bg),
40        }
41    }
42}
43
44impl From<&screen::Cell> for ScreenCell {
45    fn from(c: &screen::Cell) -> Self {
46        ScreenCell {
47            ch: c.ch,
48            attrs: ScreenCellAttrs::from(&c.attrs),
49        }
50    }
51}
52
53// ---------------------------------------------------------------------------
54// Layout helpers
55// ---------------------------------------------------------------------------
56
57fn build_layout_info(
58    session: &Session,
59    window: &Window,
60    terminal_cols: u16,
61    terminal_rows: u16,
62) -> LayoutInfo {
63    let positions = window.compute_pane_positions(terminal_cols, terminal_rows);
64    let active_pane_id = window
65        .panes
66        .get(window.active_pane)
67        .map(|p| p.id.clone())
68        .unwrap_or_default();
69
70    let panes: Vec<PaneLayout> = positions
71        .into_iter()
72        .map(|(pane_id, pos)| PaneLayout {
73            is_active: pane_id == active_pane_id,
74            is_zoomed: false, // Phase 5 adds zoom
75            pane_id,
76            x: pos.x,
77            y: pos.y,
78            width: pos.width,
79            height: pos.height,
80        })
81        .collect();
82
83    let windows: Vec<WindowBarInfo> = session
84        .windows
85        .iter()
86        .enumerate()
87        .map(|(idx, w)| WindowBarInfo {
88            id: w.id,
89            title: format!("{}:{}", w.id, w.panes.first().map(|p| p.title.as_str()).unwrap_or("")),
90            is_active: idx == session.active_window,
91        })
92        .collect();
93
94    LayoutInfo {
95        session: session.name.clone(),
96        window_id: window.id,
97        panes,
98        terminal_cols,
99        terminal_rows,
100        windows,
101    }
102}
103
104/// Collect the current pane IDs and active window ID for mutation detection.
105fn snapshot_pane_set(_session: &Session, window: &Window) -> (WindowId, Vec<PaneId>) {
106    let pane_ids: Vec<PaneId> = window.panes.iter().map(|p| p.id.clone()).collect();
107    (window.id, pane_ids)
108}
109
110// ---------------------------------------------------------------------------
111// Snapshot serialization
112// ---------------------------------------------------------------------------
113
114async fn build_snapshot(
115    pane_id: &str,
116    screen_arc: &Arc<tokio::sync::RwLock<screen::Screen>>,
117) -> ServerPush {
118    let screen = screen_arc.read().await;
119    let cells: Vec<Vec<ScreenCell>> = screen
120        .grid()
121        .iter()
122        .map(|row| row.iter().map(ScreenCell::from).collect())
123        .collect();
124    let cursor = screen.cursor();
125    let size = screen.size();
126    ServerPush::ScreenSnapshot {
127        pane_id: pane_id.to_string(),
128        cells,
129        cursor: aimux_protocol::CursorPos {
130            row: cursor.row,
131            col: cursor.col,
132        },
133        size,
134        cursor_visible: screen.cursor_visible(),
135        title: screen.title().to_string(),
136    }
137}
138
139// ---------------------------------------------------------------------------
140// Main attach handler
141// ---------------------------------------------------------------------------
142
143pub async fn handle_attach<S>(
144    pipe: S,
145    manager: Arc<Mutex<SessionManager>>,
146    session_name: &str,
147) -> Result<()>
148where
149    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
150{
151    info!("attach: client connecting to session '{}'", session_name);
152    let default_cols: u16 = 80;
153    let default_rows: u16 = 24;
154
155    // 1. Validate session, build initial layout
156    let layout = {
157        let mgr = manager.lock().await;
158        let session = mgr
159            .get_session(session_name)
160            .context("session not found")?;
161        let window = session
162            .windows
163            .get(session.active_window)
164            .context("no active window")?;
165        build_layout_info(session, window, default_cols, default_rows)
166    };
167
168    // 2. Send AttachAccepted
169    let (mut reader, mut writer) = io::split(pipe);
170    let response = Response::AttachAccepted {
171        layout: layout.clone(),
172    };
173    let bytes = rmp_serde::to_vec(&response).context("serialize AttachAccepted")?;
174    write_frame(&mut writer, &bytes).await.context("send AttachAccepted")?;
175
176    info!("attach: accepted for session '{}' ({} panes)", session_name, layout.panes.len());
177
178    // 3. Set up coalescing state
179    let mut dirty: HashSet<PaneId> = HashSet::new();
180    let mut interval = tokio::time::interval(Duration::from_millis(16));
181    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
182
183    // Track last-known pane set for mutation detection
184    let (mut last_window_id, mut last_pane_ids) = {
185        let mgr = manager.lock().await;
186        let session = match mgr.get_session(session_name) {
187            Some(s) => s,
188            None => {
189                warn!("attach: session '{}' disappeared before streaming started", session_name);
190                let err = ServerPush::Error {
191                    message: "session killed".into(),
192                };
193                let _ = write_frame(&mut writer, &rmp_serde::to_vec(&err)?).await;
194                return Ok(());
195            }
196        };
197        let window = &session.windows[session.active_window];
198        snapshot_pane_set(session, window)
199    };
200
201    // Subscribe to update channels for all visible panes (tagged with pane ID)
202    let mut update_rxs = subscribe_pane_updates(&manager, session_name).await;
203
204    // Mark all panes dirty initially so client gets first full frame
205    for pane_id in &last_pane_ids {
206        dirty.insert(pane_id.clone());
207    }
208
209    let mut terminal_cols = default_cols;
210    let mut terminal_rows = default_rows;
211
212    // Spawn a reader task to make frame reading cancel-safe in select!.
213    // read_frame does multiple sequential reads (version, length, payload) and
214    // is NOT cancel-safe — if dropped mid-read, already-consumed bytes are lost.
215    let (frame_tx, mut frame_rx) = tokio::sync::mpsc::channel::<Result<Vec<u8>>>(4);
216    tokio::spawn(async move {
217        loop {
218            let result = read_frame(&mut reader).await;
219            let is_err = result.is_err();
220            if frame_tx.send(result).await.is_err() {
221                break; // Receiver dropped
222            }
223            if is_err {
224                break; // Stop reading after error
225            }
226        }
227    });
228
229    // 4. Enter select! loop
230    loop {
231        tokio::select! {
232            _ = interval.tick() => {
233                // Drain update notifications, marking specific panes dirty
234                for (pane_id, rx) in &mut update_rxs {
235                    if rx.try_recv().is_ok() {
236                        dirty.insert(pane_id.clone());
237                        // Drain any remaining
238                        while rx.try_recv().is_ok() {}
239                    }
240                }
241
242                // Check for session mutation (pane set changes)
243                let mutation_result = {
244                    let mgr = manager.lock().await;
245                    match mgr.get_session(session_name) {
246                        Some(session) => {
247                            if session.windows.is_empty() {
248                                None
249                            } else {
250                                let window = &session.windows[session.active_window];
251                                let (wid, pids) = snapshot_pane_set(session, window);
252                                Some((wid, pids, session.active_window))
253                            }
254                        }
255                        None => None,
256                    }
257                };
258
259                match mutation_result {
260                    None => {
261                        // Session killed
262                        warn!("attach: session '{}' was killed, disconnecting client", session_name);
263                        let msg = ServerPush::Error { message: "session killed".into() };
264                        let _ = write_frame(&mut writer, &rmp_serde::to_vec(&msg)?).await;
265                        return Ok(());
266                    }
267                    Some((wid, pids, _active_window_idx)) => {
268                        if wid != last_window_id || pids != last_pane_ids {
269                            // Pane set changed — rebuild subscriptions, push new layout
270                            debug!("attach: pane set changed, rebuilding subscriptions");
271                            update_rxs = subscribe_pane_updates(&manager, session_name).await;
272                            last_window_id = wid;
273                            last_pane_ids = pids.clone();
274
275                            // Mark all new panes dirty
276                            for pid in &pids {
277                                dirty.insert(pid.clone());
278                            }
279
280                            // Push LayoutChanged
281                            let layout = {
282                                let mgr = manager.lock().await;
283                                let session = mgr.get_session(session_name).context("session gone")?;
284                                let window = &session.windows[session.active_window];
285                                build_layout_info(session, window, terminal_cols, terminal_rows)
286                            };
287                            let msg = ServerPush::LayoutChanged(layout);
288                            let bytes = rmp_serde::to_vec(&msg)?;
289                            if write_frame(&mut writer, &bytes).await.is_err() {
290                                info!("attach: pipe broken while sending layout change for '{}'", session_name);
291                                return Ok(());
292                            }
293                        }
294                    }
295                }
296
297                // Send snapshots for dirty panes
298                if !dirty.is_empty() {
299                    let pane_ids: Vec<PaneId> = dirty.drain().collect();
300                    for pane_id in &pane_ids {
301                        let screen_arc = {
302                            let mgr = manager.lock().await;
303                            match mgr.find_pane(pane_id) {
304                                Some((_, _, pane)) => pane.screen.clone(),
305                                None => continue,
306                            }
307                        };
308                        let snapshot = build_snapshot(pane_id, &screen_arc).await;
309                        let bytes = rmp_serde::to_vec(&snapshot)?;
310                        if write_frame(&mut writer, &bytes).await.is_err() {
311                            info!("attach: pipe broken while sending snapshot for '{}'", session_name);
312                            return Ok(());
313                        }
314                    }
315                }
316            }
317
318            frame_result = frame_rx.recv() => {
319                let payload = match frame_result {
320                    Some(Ok(p)) => p,
321                    Some(Err(e)) => {
322                        info!("attach: client disconnected from '{}': {}", session_name, e);
323                        return Ok(());
324                    }
325                    None => {
326                        info!("attach: client reader closed for '{}'", session_name);
327                        return Ok(());
328                    }
329                };
330                let input: ClientInput = rmp_serde::from_slice(&payload)
331                    .context("deserialize ClientInput")?;
332
333                match input {
334                    ClientInput::Keys { data } => {
335                        let mgr = manager.lock().await;
336                        let session = match mgr.get_session(session_name) {
337                            Some(s) => s,
338                            None => return Ok(()),
339                        };
340                        let window = &session.windows[session.active_window];
341                        if let Some(pane) = window.panes.get(window.active_pane) {
342                            let _ = pane.pty.write(&data);
343                        }
344                    }
345
346                    ClientInput::Resize { cols, rows } => {
347                        debug!("attach: client resize {}x{} for '{}'", cols, rows, session_name);
348                        terminal_cols = cols;
349                        terminal_rows = rows;
350
351                        // Recompute layout and resize all panes
352                        let resize_targets = {
353                            let mgr = manager.lock().await;
354                            let session = match mgr.get_session(session_name) {
355                                Some(s) => s,
356                                None => return Ok(()),
357                            };
358                            let window = &session.windows[session.active_window];
359                            let positions = window.compute_pane_positions(cols, rows);
360                            positions
361                                .into_iter()
362                                .map(|(pane_id, pos)| (pane_id, pos.width, pos.height))
363                                .collect::<Vec<_>>()
364                        };
365
366                        // Resize each pane's PTY and screen
367                        for (pane_id, w, h) in &resize_targets {
368                            let screen_arc = {
369                                let mut mgr = manager.lock().await;
370                                match mgr.resize_pane_pty(pane_id, *w, *h) {
371                                    Ok(s) => s,
372                                    Err(_) => continue,
373                                }
374                            };
375                            let mut screen = screen_arc.write().await;
376                            screen.resize(*w, *h);
377                            dirty.insert(pane_id.clone());
378                        }
379
380                        // Push LayoutChanged
381                        let layout = {
382                            let mgr = manager.lock().await;
383                            let session = match mgr.get_session(session_name) {
384                                Some(s) => s,
385                                None => return Ok(()),
386                            };
387                            let window = &session.windows[session.active_window];
388                            build_layout_info(session, window, cols, rows)
389                        };
390                        let msg = ServerPush::LayoutChanged(layout);
391                        let bytes = rmp_serde::to_vec(&msg)?;
392                        if write_frame(&mut writer, &bytes).await.is_err() {
393                            info!("attach: pipe broken during resize for '{}'", session_name);
394                            return Ok(());
395                        }
396
397                        // Rebuild pane set tracking after resize
398                        let mgr = manager.lock().await;
399                        if let Some(session) = mgr.get_session(session_name) {
400                            let window = &session.windows[session.active_window];
401                            let (wid, pids) = snapshot_pane_set(session, window);
402                            last_window_id = wid;
403                            last_pane_ids = pids;
404                        }
405                    }
406
407                    ClientInput::Detach => {
408                        info!("attach: client sent detach for session '{}'", session_name);
409                        return Ok(());
410                    }
411
412                    ClientInput::Command(request) => {
413                        debug!("attach: command from client in '{}': {:?}", session_name, request);
414                        let response = crate::ipc_dispatch::dispatch(request, &manager).await;
415                        let msg = rmp_serde::to_vec(&response)?;
416                        if write_frame(&mut writer, &msg).await.is_err() {
417                            info!("attach: pipe broken sending command response for '{}'", session_name);
418                            return Ok(());
419                        }
420                        // After a command, mark all panes dirty to pick up changes
421                        for pane_id in &last_pane_ids {
422                            dirty.insert(pane_id.clone());
423                        }
424                    }
425                }
426            }
427        }
428    }
429}
430
431/// Subscribe to update_tx for all visible panes in the session's active window.
432/// Returns receivers tagged with their pane IDs for per-pane dirty tracking.
433async fn subscribe_pane_updates(
434    manager: &Arc<Mutex<SessionManager>>,
435    session_name: &str,
436) -> Vec<(PaneId, tokio::sync::broadcast::Receiver<()>)> {
437    let mgr = manager.lock().await;
438    let Some(session) = mgr.get_session(session_name) else {
439        return vec![];
440    };
441    let window = &session.windows[session.active_window];
442    window
443        .panes
444        .iter()
445        .map(|pane| (pane.id.clone(), pane.update_tx.subscribe()))
446        .collect()
447}
448
449// ---------------------------------------------------------------------------
450// Tests
451// ---------------------------------------------------------------------------
452
#[cfg(test)]
mod tests {
    use super::*;
    use crate::screen::{Cell, CellAttrs, Color, Screen};

    /// Joins the characters of one snapshot row into a `String`.
    fn row_text(row: &[ScreenCell]) -> String {
        row.iter().map(|cell| cell.ch).collect()
    }

    // --- Color / attribute / cell conversions ---

    #[test]
    fn color_default_converts() {
        assert_eq!(ScreenColor::from(&Color::Default), ScreenColor::Default);
    }

    #[test]
    fn color_indexed_converts() {
        assert_eq!(ScreenColor::from(&Color::Indexed(42)), ScreenColor::Indexed(42));
    }

    #[test]
    fn color_rgb_converts() {
        assert_eq!(
            ScreenColor::from(&Color::Rgb(10, 20, 30)),
            ScreenColor::Rgb(10, 20, 30)
        );
    }

    #[test]
    fn cell_attrs_converts() {
        let source = CellAttrs {
            bold: true,
            dim: false,
            italic: true,
            underline: false,
            fg: Color::Indexed(1),
            bg: Color::Rgb(255, 0, 128),
        };
        let expected = ScreenCellAttrs {
            bold: true,
            dim: false,
            italic: true,
            underline: false,
            fg: ScreenColor::Indexed(1),
            bg: ScreenColor::Rgb(255, 0, 128),
        };
        assert_eq!(ScreenCellAttrs::from(&source), expected);
    }

    #[test]
    fn cell_converts() {
        let attrs = CellAttrs {
            bold: true,
            ..CellAttrs::default()
        };
        let converted = ScreenCell::from(&Cell { ch: 'X', attrs });
        assert_eq!(converted.ch, 'X');
        assert!(converted.attrs.bold);
    }

    #[test]
    fn default_cell_converts() {
        let converted = ScreenCell::from(&Cell::default());
        assert_eq!(converted.ch, '\0');
        assert_eq!(converted.attrs, ScreenCellAttrs::default());
    }

    // --- Snapshot serialization ---

    #[tokio::test]
    async fn snapshot_from_screen() {
        let mut screen = Screen::new(10, 3, 100);
        screen.feed(b"Hello\r\nWorld");
        let shared = Arc::new(tokio::sync::RwLock::new(screen));

        let ServerPush::ScreenSnapshot {
            pane_id,
            cells,
            cursor,
            size,
            cursor_visible,
            ..
        } = build_snapshot("%0", &shared).await
        else {
            panic!("expected ScreenSnapshot");
        };

        assert_eq!(pane_id, "%0");
        assert_eq!(size, (10, 3));
        assert!(cursor_visible);
        // The cursor sits right after "World": row 1, col 5.
        assert_eq!(cursor.row, 1);
        assert_eq!(cursor.col, 5);
        // One row of cells per screen row; the first two hold the fed text.
        assert_eq!(cells.len(), 3);
        assert!(row_text(&cells[0]).starts_with("Hello"));
        assert!(row_text(&cells[1]).starts_with("World"));
    }

    // --- Screen getters used by the snapshot builder ---

    #[test]
    fn screen_cursor_visible_getter() {
        assert!(Screen::new(80, 24, 100).cursor_visible());
    }

    #[test]
    fn screen_grid_getter() {
        let screen = Screen::new(10, 5, 100);
        let rows = screen.grid();
        assert_eq!(rows.len(), 5);
        assert_eq!(rows[0].len(), 10);
    }

    #[test]
    fn screen_grid_after_feed() {
        let mut screen = Screen::new(10, 3, 100);
        screen.feed(b"AB");
        let top_row = &screen.grid()[0];
        assert_eq!(top_row[0].ch, 'A');
        assert_eq!(top_row[1].ch, 'B');
    }
}