Skip to main content

gritty/
client.rs

1use crate::protocol::{Frame, FrameCodec};
2use bytes::Bytes;
3use futures_util::{SinkExt, StreamExt};
4use nix::sys::termios::{self, FlushArg, LocalFlags, SetArg, SpecialCharacterIndices, Termios};
5use std::collections::HashMap;
6use std::io::{self, Read, Write};
7use std::os::fd::{AsFd, AsRawFd, BorrowedFd};
8use std::path::Path;
9use std::time::Duration;
10use tokio::io::unix::AsyncFd;
11use tokio::net::UnixStream;
12use tokio::signal::unix::{SignalKind, signal};
13use tokio::sync::mpsc;
14use tokio::time::Instant;
15use tokio_util::codec::Framed;
16use tracing::{debug, info};
17
18// --- Escape sequence processing (SSH-style ~. detach, ~^Z suspend, ~? help) ---
19
20const ESCAPE_HELP: &[u8] = b"\r\nSupported escape sequences:\r\n\
21    ~.  - detach from session\r\n\
22    ~^Z - suspend client\r\n\
23    ~?  - this message\r\n\
24    ~~  - send the escape character by typing it twice\r\n\
25(Note that escapes are only recognized immediately after newline.)\r\n";
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28enum EscapeState {
29    Normal,
30    AfterNewline,
31    AfterTilde,
32}
33
34#[derive(Debug, PartialEq, Eq)]
35enum EscapeAction {
36    Data(Vec<u8>),
37    Detach,
38    Suspend,
39    Help,
40}
41
42struct EscapeProcessor {
43    state: EscapeState,
44}
45
46impl EscapeProcessor {
47    fn new() -> Self {
48        Self { state: EscapeState::AfterNewline }
49    }
50
51    fn process(&mut self, input: &[u8]) -> Vec<EscapeAction> {
52        let mut actions = Vec::new();
53        let mut data_buf = Vec::new();
54
55        for &b in input {
56            match self.state {
57                EscapeState::Normal => {
58                    if b == b'\n' || b == b'\r' {
59                        self.state = EscapeState::AfterNewline;
60                    }
61                    data_buf.push(b);
62                }
63                EscapeState::AfterNewline => {
64                    if b == b'~' {
65                        self.state = EscapeState::AfterTilde;
66                        // Buffer the tilde — don't send yet
67                        if !data_buf.is_empty() {
68                            actions.push(EscapeAction::Data(std::mem::take(&mut data_buf)));
69                        }
70                    } else if b == b'\n' || b == b'\r' {
71                        // Stay in AfterNewline
72                        data_buf.push(b);
73                    } else {
74                        self.state = EscapeState::Normal;
75                        data_buf.push(b);
76                    }
77                }
78                EscapeState::AfterTilde => {
79                    match b {
80                        b'.' => {
81                            if !data_buf.is_empty() {
82                                actions.push(EscapeAction::Data(std::mem::take(&mut data_buf)));
83                            }
84                            actions.push(EscapeAction::Detach);
85                            return actions; // Stop processing
86                        }
87                        0x1a => {
88                            // Ctrl-Z
89                            if !data_buf.is_empty() {
90                                actions.push(EscapeAction::Data(std::mem::take(&mut data_buf)));
91                            }
92                            actions.push(EscapeAction::Suspend);
93                            self.state = EscapeState::Normal;
94                        }
95                        b'?' => {
96                            if !data_buf.is_empty() {
97                                actions.push(EscapeAction::Data(std::mem::take(&mut data_buf)));
98                            }
99                            actions.push(EscapeAction::Help);
100                            self.state = EscapeState::Normal;
101                        }
102                        b'~' => {
103                            // Literal tilde
104                            data_buf.push(b'~');
105                            self.state = EscapeState::Normal;
106                        }
107                        b'\n' | b'\r' => {
108                            // Flush buffered tilde + this byte
109                            data_buf.push(b'~');
110                            data_buf.push(b);
111                            self.state = EscapeState::AfterNewline;
112                        }
113                        _ => {
114                            // Unknown — flush tilde + byte
115                            data_buf.push(b'~');
116                            data_buf.push(b);
117                            self.state = EscapeState::Normal;
118                        }
119                    }
120                }
121            }
122        }
123
124        if !data_buf.is_empty() {
125            actions.push(EscapeAction::Data(data_buf));
126        }
127        actions
128    }
129}
130
131fn suspend(raw_guard: &RawModeGuard, nb_guard: &NonBlockGuard) -> anyhow::Result<()> {
132    // Restore cooked mode and blocking stdin so the parent shell works normally
133    termios::tcsetattr(raw_guard.fd, SetArg::TCSAFLUSH, &raw_guard.original)?;
134    let _ = nix::fcntl::fcntl(nb_guard.fd, nix::fcntl::FcntlArg::F_SETFL(nb_guard.original_flags));
135
136    nix::sys::signal::kill(nix::unistd::Pid::from_raw(0), nix::sys::signal::Signal::SIGTSTP)?;
137
138    // After resume (fg): re-enter raw mode and non-blocking stdin
139    let _ = nix::fcntl::fcntl(
140        nb_guard.fd,
141        nix::fcntl::FcntlArg::F_SETFL(nb_guard.original_flags | nix::fcntl::OFlag::O_NONBLOCK),
142    );
143    let mut raw = raw_guard.original.clone();
144    termios::cfmakeraw(&mut raw);
145    termios::tcsetattr(raw_guard.fd, SetArg::TCSAFLUSH, &raw)?;
146    Ok(())
147}
148
149const SEND_TIMEOUT: Duration = Duration::from_secs(5);
150
151struct NonBlockGuard {
152    fd: BorrowedFd<'static>,
153    original_flags: nix::fcntl::OFlag,
154}
155
156impl NonBlockGuard {
157    fn set(fd: BorrowedFd<'static>) -> nix::Result<Self> {
158        let flags = nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_GETFL)?;
159        let original_flags = nix::fcntl::OFlag::from_bits_truncate(flags);
160        nix::fcntl::fcntl(
161            fd,
162            nix::fcntl::FcntlArg::F_SETFL(original_flags | nix::fcntl::OFlag::O_NONBLOCK),
163        )?;
164        Ok(Self { fd, original_flags })
165    }
166}
167
168impl Drop for NonBlockGuard {
169    fn drop(&mut self) {
170        let _ = nix::fcntl::fcntl(self.fd, nix::fcntl::FcntlArg::F_SETFL(self.original_flags));
171    }
172}
173
174struct RawModeGuard {
175    fd: BorrowedFd<'static>,
176    original: Termios,
177}
178
179impl RawModeGuard {
180    fn enter(fd: BorrowedFd<'static>) -> nix::Result<Self> {
181        let original = termios::tcgetattr(fd)?;
182        let mut raw = original.clone();
183        termios::cfmakeraw(&mut raw);
184        termios::tcsetattr(fd, SetArg::TCSAFLUSH, &raw)?;
185        Ok(Self { fd, original })
186    }
187}
188
189impl Drop for RawModeGuard {
190    fn drop(&mut self) {
191        let _ = termios::tcsetattr(self.fd, SetArg::TCSAFLUSH, &self.original);
192    }
193}
194
195/// Suppresses stdin echo for tail mode: disables ECHO and ICANON but keeps
196/// ISIG so Ctrl-C still generates SIGINT. Flushes pending input on drop.
197struct SuppressInputGuard {
198    fd: BorrowedFd<'static>,
199    original: Termios,
200}
201
202impl SuppressInputGuard {
203    fn enter(fd: BorrowedFd<'static>) -> nix::Result<Self> {
204        let original = termios::tcgetattr(fd)?;
205        let mut modified = original.clone();
206        modified.local_flags.remove(LocalFlags::ECHO | LocalFlags::ICANON);
207        modified.control_chars[SpecialCharacterIndices::VMIN as usize] = 1;
208        modified.control_chars[SpecialCharacterIndices::VTIME as usize] = 0;
209        termios::tcsetattr(fd, SetArg::TCSAFLUSH, &modified)?;
210        Ok(Self { fd, original })
211    }
212}
213
214impl Drop for SuppressInputGuard {
215    fn drop(&mut self) {
216        let _ = termios::tcflush(self.fd, FlushArg::TCIFLUSH);
217        let _ = termios::tcsetattr(self.fd, SetArg::TCSAFLUSH, &self.original);
218    }
219}
220
221/// Write all bytes to stdout, retrying on WouldBlock.
222/// Needed because setting O_NONBLOCK on stdin also affects stdout
223/// when they share the same terminal file description.
224fn write_stdout(data: &[u8]) -> io::Result<()> {
225    let mut stdout = io::stdout();
226    let mut written = 0;
227    while written < data.len() {
228        match stdout.write(&data[written..]) {
229            Ok(n) => written += n,
230            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
231                std::thread::sleep(Duration::from_millis(1));
232            }
233            Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
234            Err(e) => return Err(e),
235        }
236    }
237    loop {
238        match stdout.flush() {
239            Ok(()) => return Ok(()),
240            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
241                std::thread::sleep(Duration::from_millis(1));
242            }
243            Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
244            Err(e) => return Err(e),
245        }
246    }
247}
248
249fn get_terminal_size() -> (u16, u16) {
250    let mut ws: libc::winsize = unsafe { std::mem::zeroed() };
251    if unsafe { libc::ioctl(libc::STDIN_FILENO, libc::TIOCGWINSZ, &mut ws) } != 0
252        || ws.ws_col == 0
253        || ws.ws_row == 0
254    {
255        return (80, 24);
256    }
257    (ws.ws_col, ws.ws_row)
258}
259
260/// Send a frame with a timeout. Returns false if the send failed or timed out.
261async fn timed_send(framed: &mut Framed<UnixStream, FrameCodec>, frame: Frame) -> bool {
262    match tokio::time::timeout(SEND_TIMEOUT, framed.send(frame)).await {
263        Ok(Ok(())) => true,
264        Ok(Err(e)) => {
265            debug!("send error: {e}");
266            false
267        }
268        Err(_) => {
269            debug!("send timed out");
270            false
271        }
272    }
273}
274
275const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
276const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(15);
277
278/// Events from local agent connection tasks to the relay loop.
279enum AgentEvent {
280    Data { channel_id: u32, data: Bytes },
281    Closed { channel_id: u32 },
282}
283
284/// Send session setup frames (env, agent/open forwarding, resize, redraw).
285/// Returns false if the connection dropped during setup.
286async fn send_init_frames(
287    framed: &mut Framed<UnixStream, FrameCodec>,
288    env_vars: &[(String, String)],
289    forward_agent: bool,
290    agent_socket: Option<&str>,
291    forward_open: bool,
292    redraw: bool,
293) -> bool {
294    if !env_vars.is_empty() && !timed_send(framed, Frame::Env { vars: env_vars.to_vec() }).await {
295        return false;
296    }
297    if forward_agent && agent_socket.is_some() && !timed_send(framed, Frame::AgentForward).await {
298        return false;
299    }
300    if forward_open && !timed_send(framed, Frame::OpenForward).await {
301        return false;
302    }
303    let (cols, rows) = get_terminal_size();
304    if !timed_send(framed, Frame::Resize { cols, rows }).await {
305        return false;
306    }
307    if redraw && !timed_send(framed, Frame::Data(Bytes::from_static(b"\x0c"))).await {
308        return false;
309    }
310    true
311}
312
313/// Relay between stdin/stdout and the framed socket.
314/// Returns `Some(code)` on clean shell exit or detach, `None` on server disconnect / heartbeat timeout.
315#[allow(clippy::too_many_arguments)]
316async fn relay(
317    framed: &mut Framed<UnixStream, FrameCodec>,
318    async_stdin: &AsyncFd<io::Stdin>,
319    sigwinch: &mut tokio::signal::unix::Signal,
320    buf: &mut [u8],
321    mut escape: Option<&mut EscapeProcessor>,
322    raw_guard: &RawModeGuard,
323    nb_guard: &NonBlockGuard,
324    agent_socket: Option<&str>,
325) -> anyhow::Result<Option<i32>> {
326    let mut sigterm = signal(SignalKind::terminate())?;
327    let mut sighup = signal(SignalKind::hangup())?;
328
329    let mut heartbeat_interval = tokio::time::interval(HEARTBEAT_INTERVAL);
330    heartbeat_interval.reset(); // first tick is immediate otherwise; delay it
331    let mut last_pong = Instant::now();
332
333    // Agent channel management
334    let mut agent_channels: HashMap<u32, mpsc::UnboundedSender<Bytes>> = HashMap::new();
335    let (agent_event_tx, mut agent_event_rx) = mpsc::unbounded_channel::<AgentEvent>();
336
337    loop {
338        tokio::select! {
339            ready = async_stdin.readable() => {
340                let mut guard = ready?;
341                match guard.try_io(|inner| inner.get_ref().read(buf)) {
342                    Ok(Ok(0)) => {
343                        debug!("stdin EOF");
344                        return Ok(Some(0));
345                    }
346                    Ok(Ok(n)) => {
347                        debug!(len = n, "stdin → socket");
348                        if let Some(ref mut esc) = escape {
349                            for action in esc.process(&buf[..n]) {
350                                match action {
351                                    EscapeAction::Data(data) => {
352                                        if !timed_send(framed, Frame::Data(Bytes::from(data))).await {
353                                            return Ok(None);
354                                        }
355                                    }
356                                    EscapeAction::Detach => {
357                                        write_stdout(b"\r\n[detached]\r\n")?;
358                                        return Ok(Some(0));
359                                    }
360                                    EscapeAction::Suspend => {
361                                        suspend(raw_guard, nb_guard)?;
362                                        // Re-sync terminal size after resume
363                                        let (cols, rows) = get_terminal_size();
364                                        if !timed_send(framed, Frame::Resize { cols, rows }).await {
365                                            return Ok(None);
366                                        }
367                                    }
368                                    EscapeAction::Help => {
369                                        write_stdout(ESCAPE_HELP)?;
370                                    }
371                                }
372                            }
373                        } else if !timed_send(framed, Frame::Data(Bytes::copy_from_slice(&buf[..n]))).await {
374                            return Ok(None);
375                        }
376                    }
377                    Ok(Err(e)) => return Err(e.into()),
378                    Err(_would_block) => continue,
379                }
380            }
381
382            frame = framed.next() => {
383                match frame {
384                    Some(Ok(Frame::Data(data))) => {
385                        debug!(len = data.len(), "socket → stdout");
386                        write_stdout(&data)?;
387                    }
388                    Some(Ok(Frame::Pong)) => {
389                        debug!("pong received");
390                        last_pong = Instant::now();
391                    }
392                    Some(Ok(Frame::Exit { code })) => {
393                        debug!(code, "server sent exit");
394                        return Ok(Some(code));
395                    }
396                    Some(Ok(Frame::Detached)) => {
397                        info!("detached by another client");
398                        write_stdout(b"[detached]\r\n")?;
399                        return Ok(Some(0));
400                    }
401                    Some(Ok(Frame::AgentOpen { channel_id })) => {
402                        if let Some(sock_path) = agent_socket {
403                            match tokio::net::UnixStream::connect(sock_path).await {
404                                Ok(stream) => {
405                                    let (read_half, write_half) = stream.into_split();
406                                    let data_tx = agent_event_tx.clone();
407                                    let close_tx = agent_event_tx.clone();
408                                    let writer_tx = crate::spawn_channel_relay(
409                                        channel_id,
410                                        read_half,
411                                        write_half,
412                                        move |id, data| data_tx.send(AgentEvent::Data { channel_id: id, data }).is_ok(),
413                                        move |id| { let _ = close_tx.send(AgentEvent::Closed { channel_id: id }); },
414                                    );
415                                    agent_channels.insert(channel_id, writer_tx);
416                                }
417                                Err(e) => {
418                                    debug!("failed to connect to local agent: {e}");
419                                    let _ = timed_send(framed, Frame::AgentClose { channel_id }).await;
420                                }
421                            }
422                        } else {
423                            let _ = timed_send(framed, Frame::AgentClose { channel_id }).await;
424                        }
425                    }
426                    Some(Ok(Frame::AgentData { channel_id, data })) => {
427                        if let Some(tx) = agent_channels.get(&channel_id) {
428                            let _ = tx.send(data);
429                        }
430                    }
431                    Some(Ok(Frame::AgentClose { channel_id })) => {
432                        agent_channels.remove(&channel_id);
433                    }
434                    Some(Ok(Frame::OpenUrl { url })) => {
435                        if url.starts_with("http://") || url.starts_with("https://") {
436                            debug!("opening URL locally: {url}");
437                            let cmd = if cfg!(target_os = "macos") { "open" } else { "xdg-open" };
438                            let _ = std::process::Command::new(cmd)
439                                .arg("--")
440                                .arg(&url)
441                                .stdin(std::process::Stdio::null())
442                                .stdout(std::process::Stdio::null())
443                                .stderr(std::process::Stdio::null())
444                                .spawn();
445                        } else {
446                            debug!("rejected non-http(s) URL: {url}");
447                        }
448                    }
449                    Some(Ok(_)) => {} // ignore control/resize frames
450                    Some(Err(e)) => {
451                        debug!("server connection error: {e}");
452                        return Ok(None);
453                    }
454                    None => {
455                        debug!("server disconnected");
456                        return Ok(None);
457                    }
458                }
459            }
460
461            // Agent events from local agent connections
462            event = agent_event_rx.recv() => {
463                match event {
464                    Some(AgentEvent::Data { channel_id, data }) => {
465                        if agent_channels.contains_key(&channel_id)
466                            && !timed_send(framed, Frame::AgentData { channel_id, data }).await
467                        {
468                            return Ok(None);
469                        }
470                    }
471                    Some(AgentEvent::Closed { channel_id }) => {
472                        if agent_channels.remove(&channel_id).is_some()
473                            && !timed_send(framed, Frame::AgentClose { channel_id }).await
474                        {
475                            return Ok(None);
476                        }
477                    }
478                    None => {} // no more agent tasks
479                }
480            }
481
482            _ = sigwinch.recv() => {
483                let (cols, rows) = get_terminal_size();
484                debug!(cols, rows, "SIGWINCH → resize");
485                if !timed_send(framed, Frame::Resize { cols, rows }).await {
486                    return Ok(None);
487                }
488            }
489
490            _ = heartbeat_interval.tick() => {
491                if last_pong.elapsed() > HEARTBEAT_TIMEOUT {
492                    debug!("heartbeat timeout");
493                    return Ok(None);
494                }
495                if !timed_send(framed, Frame::Ping).await {
496                    return Ok(None);
497                }
498            }
499
500            _ = sigterm.recv() => {
501                debug!("SIGTERM received, exiting");
502                return Ok(Some(1));
503            }
504
505            _ = sighup.recv() => {
506                debug!("SIGHUP received, exiting");
507                return Ok(Some(1));
508            }
509        }
510    }
511}
512
513#[allow(clippy::too_many_arguments)]
514pub async fn run(
515    session: &str,
516    mut framed: Framed<UnixStream, FrameCodec>,
517    redraw: bool,
518    ctl_path: &Path,
519    env_vars: Vec<(String, String)>,
520    no_escape: bool,
521    forward_agent: bool,
522    forward_open: bool,
523) -> anyhow::Result<i32> {
524    let stdin = io::stdin();
525    let stdin_fd = stdin.as_fd();
526    // Safety: stdin lives for the duration of the program
527    let stdin_borrowed: BorrowedFd<'static> =
528        unsafe { BorrowedFd::borrow_raw(stdin_fd.as_raw_fd()) };
529    let raw_guard = RawModeGuard::enter(stdin_borrowed)?;
530
531    // Set stdin to non-blocking for AsyncFd — guard restores on drop.
532    // Declared BEFORE async_stdin so it drops AFTER AsyncFd (reverse drop order).
533    let nb_guard = NonBlockGuard::set(stdin_borrowed)?;
534    let async_stdin = AsyncFd::new(io::stdin())?;
535    let mut sigwinch = signal(SignalKind::window_change())?;
536    let mut buf = vec![0u8; 4096];
537    let mut current_redraw = redraw;
538    let mut current_env = env_vars;
539    let mut escape = if no_escape { None } else { Some(EscapeProcessor::new()) };
540    let agent_socket = if forward_agent { std::env::var("SSH_AUTH_SOCK").ok() } else { None };
541
542    loop {
543        let result = if send_init_frames(
544            &mut framed,
545            &current_env,
546            forward_agent,
547            agent_socket.as_deref(),
548            forward_open,
549            current_redraw,
550        )
551        .await
552        {
553            relay(
554                &mut framed,
555                &async_stdin,
556                &mut sigwinch,
557                &mut buf,
558                escape.as_mut(),
559                &raw_guard,
560                &nb_guard,
561                agent_socket.as_deref(),
562            )
563            .await?
564        } else {
565            None
566        };
567        match result {
568            Some(code) => return Ok(code),
569            None => {
570                // Env vars only sent on first connection; clear for reconnect
571                current_env.clear();
572                // Disconnected — try to reconnect
573                write_stdout(b"[reconnecting...]\r\n")?;
574
575                loop {
576                    tokio::time::sleep(Duration::from_secs(1)).await;
577
578                    // Check for Ctrl-C (0x03) in raw mode
579                    {
580                        let mut peek = [0u8; 1];
581                        match io::stdin().read(&mut peek) {
582                            Ok(1) if peek[0] == 0x03 => {
583                                write_stdout(b"\r\n")?;
584                                return Ok(1);
585                            }
586                            _ => {}
587                        }
588                    }
589
590                    let stream = match UnixStream::connect(ctl_path).await {
591                        Ok(s) => s,
592                        Err(_) => continue,
593                    };
594
595                    let mut new_framed = Framed::new(stream, FrameCodec);
596                    if crate::handshake(&mut new_framed).await.is_err() {
597                        continue;
598                    }
599                    if new_framed
600                        .send(Frame::Attach { session: session.to_string() })
601                        .await
602                        .is_err()
603                    {
604                        continue;
605                    }
606
607                    match new_framed.next().await {
608                        Some(Ok(Frame::Ok)) => {
609                            write_stdout(b"[reconnected]\r\n")?;
610                            framed = new_framed;
611                            current_redraw = true;
612                            break;
613                        }
614                        Some(Ok(Frame::Error { message })) => {
615                            let msg = format!("[session gone: {message}]\r\n");
616                            write_stdout(msg.as_bytes())?;
617                            return Ok(1);
618                        }
619                        _ => continue,
620                    }
621                }
622            }
623        }
624    }
625}
626
627/// Read-only tail of a session's PTY output.
628/// No raw mode, no stdin, no escape processing, no forwarding.
629/// Ctrl-C triggers clean exit with terminal reset.
630pub async fn tail(
631    session: &str,
632    mut framed: Framed<UnixStream, FrameCodec>,
633    ctl_path: &Path,
634) -> anyhow::Result<i32> {
635    // Suppress stdin echo — tail is read-only. Guard restores on drop.
636    let stdin_fd = unsafe { BorrowedFd::borrow_raw(libc::STDIN_FILENO) };
637    let _input_guard = SuppressInputGuard::enter(stdin_fd).ok();
638
639    // Drain stdin in background, ring bell on first keystroke
640    tokio::task::spawn_blocking(|| {
641        let mut buf = [0u8; 64];
642        let mut belled = false;
643        loop {
644            match io::stdin().read(&mut buf) {
645                Ok(0) | Err(_) => break,
646                Ok(_) if !belled => {
647                    let _ = io::stderr().write_all(b"\x07");
648                    let _ = io::stderr().flush();
649                    belled = true;
650                }
651                _ => {}
652            }
653        }
654    });
655
656    let mut heartbeat_interval = tokio::time::interval(HEARTBEAT_INTERVAL);
657    heartbeat_interval.reset();
658    let mut last_pong = Instant::now();
659    let mut sigint = signal(SignalKind::interrupt())?;
660    let mut sigterm = signal(SignalKind::terminate())?;
661    let mut sighup = signal(SignalKind::hangup())?;
662
663    let code = 'outer: loop {
664        let result = 'relay: loop {
665            tokio::select! {
666                frame = framed.next() => {
667                    match frame {
668                        Some(Ok(Frame::Data(data))) => {
669                            write_stdout(&data)?;
670                        }
671                        Some(Ok(Frame::Pong)) => {
672                            last_pong = Instant::now();
673                        }
674                        Some(Ok(Frame::Exit { code })) => {
675                            break 'relay Some(code);
676                        }
677                        Some(Ok(_)) => {}
678                        Some(Err(e)) => {
679                            debug!("tail connection error: {e}");
680                            break 'relay None;
681                        }
682                        None => {
683                            debug!("tail server disconnected");
684                            break 'relay None;
685                        }
686                    }
687                }
688                _ = heartbeat_interval.tick() => {
689                    if last_pong.elapsed() > HEARTBEAT_TIMEOUT {
690                        debug!("tail heartbeat timeout");
691                        break 'relay None;
692                    }
693                    if framed.send(Frame::Ping).await.is_err() {
694                        break 'relay None;
695                    }
696                }
697                _ = sigint.recv() => {
698                    break 'outer 0;
699                }
700                _ = sigterm.recv() => {
701                    break 'outer 1;
702                }
703                _ = sighup.recv() => {
704                    break 'outer 1;
705                }
706            }
707        };
708
709        match result {
710            Some(code) => break code,
711            None => {
712                eprintln!("[reconnecting...]");
713                loop {
714                    tokio::time::sleep(Duration::from_secs(1)).await;
715
716                    let stream = match UnixStream::connect(ctl_path).await {
717                        Ok(s) => s,
718                        Err(_) => continue,
719                    };
720
721                    let mut new_framed = Framed::new(stream, FrameCodec);
722                    if crate::handshake(&mut new_framed).await.is_err() {
723                        continue;
724                    }
725                    if new_framed.send(Frame::Tail { session: session.to_string() }).await.is_err()
726                    {
727                        continue;
728                    }
729
730                    match new_framed.next().await {
731                        Some(Ok(Frame::Ok)) => {
732                            eprintln!("[reconnected]");
733                            framed = new_framed;
734                            heartbeat_interval.reset();
735                            last_pong = Instant::now();
736                            break;
737                        }
738                        Some(Ok(Frame::Error { message })) => {
739                            eprintln!("[session gone: {message}]");
740                            break 'outer 1;
741                        }
742                        _ => continue,
743                    }
744                }
745            }
746        }
747    };
748
749    // Reset terminal state: clear attributes and show cursor.
750    // PTY output may have left colors/bold set or cursor hidden.
751    let _ = write_stdout(b"\x1b[0m\x1b[?25h");
752    Ok(code)
753}
754
755#[cfg(test)]
756mod tests {
757    use super::*;
758
759    #[test]
760    fn normal_passthrough() {
761        let mut ep = EscapeProcessor::new();
762        // No newlines — after initial AfterNewline, 'h' transitions to Normal
763        let actions = ep.process(b"hello");
764        assert_eq!(actions, vec![EscapeAction::Data(b"hello".to_vec())]);
765    }
766
767    #[test]
768    fn tilde_after_newline_detach() {
769        let mut ep = EscapeProcessor { state: EscapeState::Normal };
770        let actions = ep.process(b"\n~.");
771        assert_eq!(actions, vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Detach,]);
772    }
773
774    #[test]
775    fn tilde_after_cr_detach() {
776        let mut ep = EscapeProcessor { state: EscapeState::Normal };
777        let actions = ep.process(b"\r~.");
778        assert_eq!(actions, vec![EscapeAction::Data(b"\r".to_vec()), EscapeAction::Detach,]);
779    }
780
781    #[test]
782    fn tilde_not_after_newline() {
783        let mut ep = EscapeProcessor { state: EscapeState::Normal };
784        let actions = ep.process(b"a~.");
785        assert_eq!(actions, vec![EscapeAction::Data(b"a~.".to_vec())]);
786    }
787
788    #[test]
789    fn initial_state_detach() {
790        let mut ep = EscapeProcessor::new();
791        let actions = ep.process(b"~.");
792        assert_eq!(actions, vec![EscapeAction::Detach]);
793    }
794
795    #[test]
796    fn tilde_suspend() {
797        let mut ep = EscapeProcessor { state: EscapeState::Normal };
798        let actions = ep.process(b"\n~\x1a");
799        assert_eq!(actions, vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Suspend,]);
800    }
801
802    #[test]
803    fn tilde_help() {
804        let mut ep = EscapeProcessor { state: EscapeState::Normal };
805        let actions = ep.process(b"\n~?");
806        assert_eq!(actions, vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Help,]);
807    }
808
809    #[test]
810    fn double_tilde() {
811        let mut ep = EscapeProcessor { state: EscapeState::Normal };
812        let actions = ep.process(b"\n~~");
813        assert_eq!(
814            actions,
815            vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Data(b"~".to_vec()),]
816        );
817        assert_eq!(ep.state, EscapeState::Normal);
818    }
819
820    #[test]
821    fn tilde_unknown_char() {
822        let mut ep = EscapeProcessor { state: EscapeState::Normal };
823        let actions = ep.process(b"\n~x");
824        assert_eq!(
825            actions,
826            vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Data(b"~x".to_vec()),]
827        );
828    }
829
830    #[test]
831    fn split_across_reads() {
832        let mut ep = EscapeProcessor { state: EscapeState::Normal };
833        let a1 = ep.process(b"\n");
834        assert_eq!(a1, vec![EscapeAction::Data(b"\n".to_vec())]);
835        let a2 = ep.process(b"~");
836        assert_eq!(a2, vec![]); // tilde buffered
837        let a3 = ep.process(b".");
838        assert_eq!(a3, vec![EscapeAction::Detach]);
839    }
840
841    #[test]
842    fn split_tilde_then_normal() {
843        let mut ep = EscapeProcessor { state: EscapeState::Normal };
844        let a1 = ep.process(b"\n");
845        assert_eq!(a1, vec![EscapeAction::Data(b"\n".to_vec())]);
846        let a2 = ep.process(b"~");
847        assert_eq!(a2, vec![]);
848        let a3 = ep.process(b"a");
849        assert_eq!(a3, vec![EscapeAction::Data(b"~a".to_vec())]);
850    }
851
852    #[test]
853    fn multiple_escapes_one_buffer() {
854        let mut ep = EscapeProcessor { state: EscapeState::Normal };
855        let actions = ep.process(b"\n~?\n~.");
856        assert_eq!(
857            actions,
858            vec![
859                EscapeAction::Data(b"\n".to_vec()),
860                EscapeAction::Help,
861                EscapeAction::Data(b"\n".to_vec()),
862                EscapeAction::Detach,
863            ]
864        );
865    }
866
867    #[test]
868    fn consecutive_newlines() {
869        let mut ep = EscapeProcessor { state: EscapeState::Normal };
870        let actions = ep.process(b"\n\n\n~.");
871        assert_eq!(actions, vec![EscapeAction::Data(b"\n\n\n".to_vec()), EscapeAction::Detach,]);
872    }
873
874    #[test]
875    fn detach_stops_processing() {
876        let mut ep = EscapeProcessor { state: EscapeState::Normal };
877        let actions = ep.process(b"\n~.remaining");
878        assert_eq!(actions, vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Detach,]);
879    }
880
881    #[test]
882    fn tilde_then_newline() {
883        let mut ep = EscapeProcessor { state: EscapeState::Normal };
884        let actions = ep.process(b"\n~\n");
885        assert_eq!(
886            actions,
887            vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Data(b"~\n".to_vec()),]
888        );
889        assert_eq!(ep.state, EscapeState::AfterNewline);
890    }
891
892    #[test]
893    fn empty_input() {
894        let mut ep = EscapeProcessor::new();
895        let actions = ep.process(b"");
896        assert_eq!(actions, vec![]);
897    }
898
899    #[test]
900    fn only_tilde_buffered() {
901        let mut ep = EscapeProcessor { state: EscapeState::Normal };
902        let a1 = ep.process(b"\n~");
903        assert_eq!(a1, vec![EscapeAction::Data(b"\n".to_vec())]);
904        assert_eq!(ep.state, EscapeState::AfterTilde);
905        let a2 = ep.process(b".");
906        assert_eq!(a2, vec![EscapeAction::Detach]);
907    }
908}