Skip to main content

gritty/
client.rs

1use crate::protocol::{Frame, FrameCodec};
2use bytes::Bytes;
3use futures_util::{SinkExt, StreamExt};
4use nix::sys::termios::{self, SetArg, Termios};
5use std::collections::HashMap;
6use std::io::{self, Read, Write};
7use std::os::fd::{AsFd, AsRawFd, BorrowedFd};
8use std::path::Path;
9use std::time::Duration;
10use tokio::io::unix::AsyncFd;
11use tokio::net::UnixStream;
12use tokio::signal::unix::{SignalKind, signal};
13use tokio::sync::mpsc;
14use tokio::time::Instant;
15use tokio_util::codec::Framed;
16use tracing::{debug, info};
17
18// --- Escape sequence processing (SSH-style ~. detach, ~^Z suspend, ~? help) ---
19
/// Help text written to the local terminal in response to the `~?` escape.
///
/// Each line ends in an explicit `\r\n`; the entries carry no leading
/// indentation.
const ESCAPE_HELP: &[u8] = concat!(
    "\r\nSupported escape sequences:\r\n",
    "~.  - detach from session\r\n",
    "~^Z - suspend client\r\n",
    "~?  - this message\r\n",
    "~~  - send the escape character by typing it twice\r\n",
    "(Note that escapes are only recognized immediately after newline.)\r\n",
)
.as_bytes();
26
/// Where the escape parser stands relative to the most recent line break.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EscapeState {
    /// Mid-line: a `~` is ordinary data.
    Normal,
    /// The previous byte was `\n` or `\r` (also the initial state): a `~` starts an escape.
    AfterNewline,
    /// A line-initial `~` is being withheld until the next byte decides what it means.
    AfterTilde,
}

/// One unit of output from [`EscapeProcessor::process`], in input order.
#[derive(Debug, PartialEq, Eq)]
enum EscapeAction {
    /// Bytes to pass through unchanged.
    Data(Vec<u8>),
    /// `~.` was typed.
    Detach,
    /// `~` followed by Ctrl-Z (0x1a) was typed.
    Suspend,
    /// `~?` was typed.
    Help,
}

/// Incremental recognizer for line-initial `~` escape sequences.
///
/// State is kept between calls, so a sequence split across two reads is still
/// recognized.
struct EscapeProcessor {
    state: EscapeState,
}

impl EscapeProcessor {
    /// Create a processor that treats the very first byte as being at line start.
    fn new() -> Self {
        let state = EscapeState::AfterNewline;
        Self { state }
    }

    /// Split `input` into pass-through data and recognized escape actions.
    ///
    /// Consecutive pass-through bytes are coalesced into one `Data` action; a
    /// pending run is emitted before a line-initial `~` is withheld and at the
    /// end of the call. After `Detach` the rest of `input` is discarded and the
    /// function returns immediately.
    fn process(&mut self, input: &[u8]) -> Vec<EscapeAction> {
        /// Move any accumulated bytes into `out` as a single `Data` action.
        fn flush(pending: &mut Vec<u8>, out: &mut Vec<EscapeAction>) {
            if !pending.is_empty() {
                out.push(EscapeAction::Data(std::mem::take(pending)));
            }
        }

        let mut out = Vec::new();
        let mut pending: Vec<u8> = Vec::new();

        for &byte in input {
            let is_eol = matches!(byte, b'\n' | b'\r');
            match (self.state, byte) {
                (EscapeState::Normal, _) => {
                    pending.push(byte);
                    if is_eol {
                        self.state = EscapeState::AfterNewline;
                    }
                }
                (EscapeState::AfterNewline, b'~') => {
                    // Withhold the tilde; whatever came before it is complete.
                    flush(&mut pending, &mut out);
                    self.state = EscapeState::AfterTilde;
                }
                (EscapeState::AfterNewline, _) => {
                    pending.push(byte);
                    if !is_eol {
                        self.state = EscapeState::Normal;
                    }
                }
                (EscapeState::AfterTilde, b'.') => {
                    flush(&mut pending, &mut out);
                    out.push(EscapeAction::Detach);
                    return out;
                }
                (EscapeState::AfterTilde, 0x1a) => {
                    flush(&mut pending, &mut out);
                    out.push(EscapeAction::Suspend);
                    self.state = EscapeState::Normal;
                }
                (EscapeState::AfterTilde, b'?') => {
                    flush(&mut pending, &mut out);
                    out.push(EscapeAction::Help);
                    self.state = EscapeState::Normal;
                }
                (EscapeState::AfterTilde, b'~') => {
                    // `~~` collapses to one literal tilde.
                    pending.push(b'~');
                    self.state = EscapeState::Normal;
                }
                (EscapeState::AfterTilde, _) => {
                    // Not an escape after all: release the withheld tilde with this byte.
                    pending.extend_from_slice(&[b'~', byte]);
                    self.state =
                        if is_eol { EscapeState::AfterNewline } else { EscapeState::Normal };
                }
            }
        }

        flush(&mut pending, &mut out);
        out
    }
}
130
131fn suspend(raw_guard: &RawModeGuard, nb_guard: &NonBlockGuard) -> anyhow::Result<()> {
132    // Restore cooked mode and blocking stdin so the parent shell works normally
133    termios::tcsetattr(raw_guard.fd, SetArg::TCSAFLUSH, &raw_guard.original)?;
134    let _ = nix::fcntl::fcntl(nb_guard.fd, nix::fcntl::FcntlArg::F_SETFL(nb_guard.original_flags));
135
136    nix::sys::signal::kill(nix::unistd::Pid::from_raw(0), nix::sys::signal::Signal::SIGTSTP)?;
137
138    // After resume (fg): re-enter raw mode and non-blocking stdin
139    let _ = nix::fcntl::fcntl(
140        nb_guard.fd,
141        nix::fcntl::FcntlArg::F_SETFL(nb_guard.original_flags | nix::fcntl::OFlag::O_NONBLOCK),
142    );
143    let mut raw = raw_guard.original.clone();
144    termios::cfmakeraw(&mut raw);
145    termios::tcsetattr(raw_guard.fd, SetArg::TCSAFLUSH, &raw)?;
146    Ok(())
147}
148
/// Upper bound on how long a single frame send may take before `timed_send` gives up.
const SEND_TIMEOUT: Duration = Duration::from_secs(5);
150
151struct NonBlockGuard {
152    fd: BorrowedFd<'static>,
153    original_flags: nix::fcntl::OFlag,
154}
155
156impl NonBlockGuard {
157    fn set(fd: BorrowedFd<'static>) -> nix::Result<Self> {
158        let flags = nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_GETFL)?;
159        let original_flags = nix::fcntl::OFlag::from_bits_truncate(flags);
160        nix::fcntl::fcntl(
161            fd,
162            nix::fcntl::FcntlArg::F_SETFL(original_flags | nix::fcntl::OFlag::O_NONBLOCK),
163        )?;
164        Ok(Self { fd, original_flags })
165    }
166}
167
168impl Drop for NonBlockGuard {
169    fn drop(&mut self) {
170        let _ = nix::fcntl::fcntl(self.fd, nix::fcntl::FcntlArg::F_SETFL(self.original_flags));
171    }
172}
173
174struct RawModeGuard {
175    fd: BorrowedFd<'static>,
176    original: Termios,
177}
178
179impl RawModeGuard {
180    fn enter(fd: BorrowedFd<'static>) -> nix::Result<Self> {
181        let original = termios::tcgetattr(fd)?;
182        let mut raw = original.clone();
183        termios::cfmakeraw(&mut raw);
184        termios::tcsetattr(fd, SetArg::TCSAFLUSH, &raw)?;
185        Ok(Self { fd, original })
186    }
187}
188
189impl Drop for RawModeGuard {
190    fn drop(&mut self) {
191        let _ = termios::tcsetattr(self.fd, SetArg::TCSAFLUSH, &self.original);
192    }
193}
194
/// Write all bytes to stdout and flush, retrying on `WouldBlock` and `Interrupted`.
///
/// Needed because setting O_NONBLOCK on stdin also affects stdout
/// when they share the same terminal file description.
///
/// The retry is a yield-and-try-again loop on the calling thread, so this call
/// does not return until everything has been written.
///
/// # Errors
/// Returns `ErrorKind::WriteZero` if stdout accepts zero bytes for a non-empty
/// buffer, or the first error that is neither `WouldBlock` nor `Interrupted`.
fn write_stdout(data: &[u8]) -> io::Result<()> {
    // Lock once instead of re-locking on every write/flush attempt.
    let mut out = io::stdout().lock();

    let mut remaining = data;
    while !remaining.is_empty() {
        match out.write(remaining) {
            // A zero-length write makes no progress; retrying would spin forever.
            Ok(0) => return Err(io::Error::from(io::ErrorKind::WriteZero)),
            Ok(n) => remaining = &remaining[n..],
            Err(e) => match e.kind() {
                io::ErrorKind::WouldBlock => std::thread::yield_now(),
                io::ErrorKind::Interrupted => {}
                _ => return Err(e),
            },
        }
    }

    loop {
        match out.flush() {
            Ok(()) => return Ok(()),
            Err(e) => match e.kind() {
                io::ErrorKind::WouldBlock => std::thread::yield_now(),
                io::ErrorKind::Interrupted => {}
                _ => return Err(e),
            },
        }
    }
}
222
223fn get_terminal_size() -> (u16, u16) {
224    let mut ws: libc::winsize = unsafe { std::mem::zeroed() };
225    unsafe { libc::ioctl(libc::STDIN_FILENO, libc::TIOCGWINSZ, &mut ws) };
226    (ws.ws_col, ws.ws_row)
227}
228
229/// Send a frame with a timeout. Returns false if the send failed or timed out.
230async fn timed_send(framed: &mut Framed<UnixStream, FrameCodec>, frame: Frame) -> bool {
231    match tokio::time::timeout(SEND_TIMEOUT, framed.send(frame)).await {
232        Ok(Ok(())) => true,
233        Ok(Err(e)) => {
234            debug!("send error: {e}");
235            false
236        }
237        Err(_) => {
238            debug!("send timed out");
239            false
240        }
241    }
242}
243
/// Period of the heartbeat timer; a `Ping` frame is sent on each tick.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Maximum time since the last `Pong` before the connection is treated as dead.
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(15);
246
/// Events from local agent connection tasks to the relay loop.
enum AgentEvent {
    /// Payload reported for `channel_id`; the relay loop forwards it as
    /// `Frame::AgentData` while the channel is still registered.
    Data { channel_id: u32, data: Bytes },
    /// `channel_id` has been closed on the local side; the relay loop
    /// unregisters it and, if it was registered, sends `Frame::AgentClose`.
    Closed { channel_id: u32 },
}
252
253/// Send session setup frames (env, agent/open forwarding, resize, redraw).
254/// Returns false if the connection dropped during setup.
255async fn send_init_frames(
256    framed: &mut Framed<UnixStream, FrameCodec>,
257    env_vars: &[(String, String)],
258    forward_agent: bool,
259    agent_socket: Option<&str>,
260    forward_open: bool,
261    redraw: bool,
262) -> bool {
263    if !env_vars.is_empty() && !timed_send(framed, Frame::Env { vars: env_vars.to_vec() }).await {
264        return false;
265    }
266    if forward_agent && agent_socket.is_some() && !timed_send(framed, Frame::AgentForward).await {
267        return false;
268    }
269    if forward_open && !timed_send(framed, Frame::OpenForward).await {
270        return false;
271    }
272    let (cols, rows) = get_terminal_size();
273    if !timed_send(framed, Frame::Resize { cols, rows }).await {
274        return false;
275    }
276    if redraw && !timed_send(framed, Frame::Data(Bytes::from_static(b"\x0c"))).await {
277        return false;
278    }
279    true
280}
281
282/// Relay between stdin/stdout and the framed socket.
283/// Returns `Some(code)` on clean shell exit or detach, `None` on server disconnect / heartbeat timeout.
284#[allow(clippy::too_many_arguments)]
285async fn relay(
286    framed: &mut Framed<UnixStream, FrameCodec>,
287    async_stdin: &AsyncFd<io::Stdin>,
288    sigwinch: &mut tokio::signal::unix::Signal,
289    buf: &mut [u8],
290    mut escape: Option<&mut EscapeProcessor>,
291    raw_guard: &RawModeGuard,
292    nb_guard: &NonBlockGuard,
293    agent_socket: Option<&str>,
294) -> anyhow::Result<Option<i32>> {
295    let mut sigterm = signal(SignalKind::terminate())?;
296    let mut sighup = signal(SignalKind::hangup())?;
297
298    let mut heartbeat_interval = tokio::time::interval(HEARTBEAT_INTERVAL);
299    heartbeat_interval.reset(); // first tick is immediate otherwise; delay it
300    let mut last_pong = Instant::now();
301
302    // Agent channel management
303    let mut agent_channels: HashMap<u32, mpsc::UnboundedSender<Bytes>> = HashMap::new();
304    let (agent_event_tx, mut agent_event_rx) = mpsc::unbounded_channel::<AgentEvent>();
305
306    loop {
307        tokio::select! {
308            ready = async_stdin.readable() => {
309                let mut guard = ready?;
310                match guard.try_io(|inner| inner.get_ref().read(buf)) {
311                    Ok(Ok(0)) => {
312                        debug!("stdin EOF");
313                        return Ok(Some(0));
314                    }
315                    Ok(Ok(n)) => {
316                        debug!(len = n, "stdin → socket");
317                        if let Some(ref mut esc) = escape {
318                            for action in esc.process(&buf[..n]) {
319                                match action {
320                                    EscapeAction::Data(data) => {
321                                        if !timed_send(framed, Frame::Data(Bytes::from(data))).await {
322                                            return Ok(None);
323                                        }
324                                    }
325                                    EscapeAction::Detach => {
326                                        write_stdout(b"\r\n[detached]\r\n")?;
327                                        return Ok(Some(0));
328                                    }
329                                    EscapeAction::Suspend => {
330                                        suspend(raw_guard, nb_guard)?;
331                                        // Re-sync terminal size after resume
332                                        let (cols, rows) = get_terminal_size();
333                                        if !timed_send(framed, Frame::Resize { cols, rows }).await {
334                                            return Ok(None);
335                                        }
336                                    }
337                                    EscapeAction::Help => {
338                                        write_stdout(ESCAPE_HELP)?;
339                                    }
340                                }
341                            }
342                        } else if !timed_send(framed, Frame::Data(Bytes::copy_from_slice(&buf[..n]))).await {
343                            return Ok(None);
344                        }
345                    }
346                    Ok(Err(e)) => return Err(e.into()),
347                    Err(_would_block) => continue,
348                }
349            }
350
351            frame = framed.next() => {
352                match frame {
353                    Some(Ok(Frame::Data(data))) => {
354                        debug!(len = data.len(), "socket → stdout");
355                        write_stdout(&data)?;
356                    }
357                    Some(Ok(Frame::Pong)) => {
358                        debug!("pong received");
359                        last_pong = Instant::now();
360                    }
361                    Some(Ok(Frame::Exit { code })) => {
362                        info!(code, "server sent exit");
363                        return Ok(Some(code));
364                    }
365                    Some(Ok(Frame::Detached)) => {
366                        info!("detached by another client");
367                        write_stdout(b"[detached]\r\n")?;
368                        return Ok(Some(0));
369                    }
370                    Some(Ok(Frame::AgentOpen { channel_id })) => {
371                        if let Some(sock_path) = agent_socket {
372                            match tokio::net::UnixStream::connect(sock_path).await {
373                                Ok(stream) => {
374                                    let (read_half, write_half) = stream.into_split();
375                                    let data_tx = agent_event_tx.clone();
376                                    let close_tx = agent_event_tx.clone();
377                                    let writer_tx = crate::spawn_channel_relay(
378                                        channel_id,
379                                        read_half,
380                                        write_half,
381                                        move |id, data| data_tx.send(AgentEvent::Data { channel_id: id, data }).is_ok(),
382                                        move |id| { let _ = close_tx.send(AgentEvent::Closed { channel_id: id }); },
383                                    );
384                                    agent_channels.insert(channel_id, writer_tx);
385                                }
386                                Err(e) => {
387                                    debug!("failed to connect to local agent: {e}");
388                                    let _ = timed_send(framed, Frame::AgentClose { channel_id }).await;
389                                }
390                            }
391                        } else {
392                            let _ = timed_send(framed, Frame::AgentClose { channel_id }).await;
393                        }
394                    }
395                    Some(Ok(Frame::AgentData { channel_id, data })) => {
396                        if let Some(tx) = agent_channels.get(&channel_id) {
397                            let _ = tx.send(data);
398                        }
399                    }
400                    Some(Ok(Frame::AgentClose { channel_id })) => {
401                        agent_channels.remove(&channel_id);
402                    }
403                    Some(Ok(Frame::OpenUrl { url })) => {
404                        debug!("opening URL locally: {url}");
405                        let cmd = if cfg!(target_os = "macos") { "open" } else { "xdg-open" };
406                        let _ = std::process::Command::new(cmd)
407                            .arg(&url)
408                            .stdin(std::process::Stdio::null())
409                            .stdout(std::process::Stdio::null())
410                            .stderr(std::process::Stdio::null())
411                            .spawn();
412                    }
413                    Some(Ok(_)) => {} // ignore control/resize frames
414                    Some(Err(e)) => {
415                        debug!("server connection error: {e}");
416                        return Ok(None);
417                    }
418                    None => {
419                        debug!("server disconnected");
420                        return Ok(None);
421                    }
422                }
423            }
424
425            // Agent events from local agent connections
426            event = agent_event_rx.recv() => {
427                match event {
428                    Some(AgentEvent::Data { channel_id, data }) => {
429                        if agent_channels.contains_key(&channel_id)
430                            && !timed_send(framed, Frame::AgentData { channel_id, data }).await
431                        {
432                            return Ok(None);
433                        }
434                    }
435                    Some(AgentEvent::Closed { channel_id }) => {
436                        if agent_channels.remove(&channel_id).is_some()
437                            && !timed_send(framed, Frame::AgentClose { channel_id }).await
438                        {
439                            return Ok(None);
440                        }
441                    }
442                    None => {} // no more agent tasks
443                }
444            }
445
446            _ = sigwinch.recv() => {
447                let (cols, rows) = get_terminal_size();
448                debug!(cols, rows, "SIGWINCH → resize");
449                if !timed_send(framed, Frame::Resize { cols, rows }).await {
450                    return Ok(None);
451                }
452            }
453
454            _ = heartbeat_interval.tick() => {
455                if last_pong.elapsed() > HEARTBEAT_TIMEOUT {
456                    debug!("heartbeat timeout");
457                    return Ok(None);
458                }
459                if !timed_send(framed, Frame::Ping).await {
460                    return Ok(None);
461                }
462            }
463
464            _ = sigterm.recv() => {
465                debug!("SIGTERM received, exiting");
466                return Ok(Some(1));
467            }
468
469            _ = sighup.recv() => {
470                debug!("SIGHUP received, exiting");
471                return Ok(Some(1));
472            }
473        }
474    }
475}
476
477#[allow(clippy::too_many_arguments)]
478pub async fn run(
479    session: &str,
480    mut framed: Framed<UnixStream, FrameCodec>,
481    redraw: bool,
482    ctl_path: &Path,
483    env_vars: Vec<(String, String)>,
484    no_escape: bool,
485    forward_agent: bool,
486    forward_open: bool,
487) -> anyhow::Result<i32> {
488    let stdin = io::stdin();
489    let stdin_fd = stdin.as_fd();
490    // Safety: stdin lives for the duration of the program
491    let stdin_borrowed: BorrowedFd<'static> =
492        unsafe { BorrowedFd::borrow_raw(stdin_fd.as_raw_fd()) };
493    let raw_guard = RawModeGuard::enter(stdin_borrowed)?;
494
495    // Set stdin to non-blocking for AsyncFd — guard restores on drop.
496    // Declared BEFORE async_stdin so it drops AFTER AsyncFd (reverse drop order).
497    let nb_guard = NonBlockGuard::set(stdin_borrowed)?;
498    let async_stdin = AsyncFd::new(io::stdin())?;
499    let mut sigwinch = signal(SignalKind::window_change())?;
500    let mut buf = vec![0u8; 4096];
501    let mut current_redraw = redraw;
502    let mut current_env = env_vars;
503    let mut escape = if no_escape { None } else { Some(EscapeProcessor::new()) };
504    let agent_socket = if forward_agent { std::env::var("SSH_AUTH_SOCK").ok() } else { None };
505
506    loop {
507        let result = if send_init_frames(
508            &mut framed,
509            &current_env,
510            forward_agent,
511            agent_socket.as_deref(),
512            forward_open,
513            current_redraw,
514        )
515        .await
516        {
517            relay(
518                &mut framed,
519                &async_stdin,
520                &mut sigwinch,
521                &mut buf,
522                escape.as_mut(),
523                &raw_guard,
524                &nb_guard,
525                agent_socket.as_deref(),
526            )
527            .await?
528        } else {
529            None
530        };
531        match result {
532            Some(code) => return Ok(code),
533            None => {
534                // Env vars only sent on first connection; clear for reconnect
535                current_env.clear();
536                // Disconnected — try to reconnect
537                write_stdout(b"[reconnecting...]\r\n")?;
538
539                loop {
540                    tokio::time::sleep(Duration::from_secs(1)).await;
541
542                    // Check for Ctrl-C (0x03) in raw mode
543                    {
544                        let mut peek = [0u8; 1];
545                        match io::stdin().read(&mut peek) {
546                            Ok(1) if peek[0] == 0x03 => {
547                                write_stdout(b"\r\n")?;
548                                return Ok(1);
549                            }
550                            _ => {}
551                        }
552                    }
553
554                    let stream = match UnixStream::connect(ctl_path).await {
555                        Ok(s) => s,
556                        Err(_) => continue,
557                    };
558
559                    let mut new_framed = Framed::new(stream, FrameCodec);
560                    if new_framed
561                        .send(Frame::Attach { session: session.to_string() })
562                        .await
563                        .is_err()
564                    {
565                        continue;
566                    }
567
568                    match new_framed.next().await {
569                        Some(Ok(Frame::Ok)) => {
570                            write_stdout(b"[reconnected]\r\n")?;
571                            framed = new_framed;
572                            current_redraw = true;
573                            break;
574                        }
575                        Some(Ok(Frame::Error { message })) => {
576                            let msg = format!("[session gone: {message}]\r\n");
577                            write_stdout(msg.as_bytes())?;
578                            return Ok(1);
579                        }
580                        _ => continue,
581                    }
582                }
583            }
584        }
585    }
586}
587
588/// Read-only tail of a session's PTY output.
589/// No raw mode, no stdin, no escape processing, no forwarding.
590/// Ctrl-C exits normally (default SIGINT handler).
591pub async fn tail(
592    session: &str,
593    mut framed: Framed<UnixStream, FrameCodec>,
594    ctl_path: &Path,
595) -> anyhow::Result<i32> {
596    let mut heartbeat_interval = tokio::time::interval(HEARTBEAT_INTERVAL);
597    heartbeat_interval.reset();
598    let mut last_pong = Instant::now();
599    let mut sigterm = signal(SignalKind::terminate())?;
600    let mut sighup = signal(SignalKind::hangup())?;
601
602    loop {
603        let result = 'relay: loop {
604            tokio::select! {
605                frame = framed.next() => {
606                    match frame {
607                        Some(Ok(Frame::Data(data))) => {
608                            write_stdout(&data)?;
609                        }
610                        Some(Ok(Frame::Pong)) => {
611                            last_pong = Instant::now();
612                        }
613                        Some(Ok(Frame::Exit { code })) => {
614                            break 'relay Some(code);
615                        }
616                        Some(Ok(_)) => {}
617                        Some(Err(e)) => {
618                            debug!("tail connection error: {e}");
619                            break 'relay None;
620                        }
621                        None => {
622                            debug!("tail server disconnected");
623                            break 'relay None;
624                        }
625                    }
626                }
627                _ = heartbeat_interval.tick() => {
628                    if last_pong.elapsed() > HEARTBEAT_TIMEOUT {
629                        debug!("tail heartbeat timeout");
630                        break 'relay None;
631                    }
632                    if framed.send(Frame::Ping).await.is_err() {
633                        break 'relay None;
634                    }
635                }
636                _ = sigterm.recv() => {
637                    break 'relay Some(1);
638                }
639                _ = sighup.recv() => {
640                    break 'relay Some(1);
641                }
642            }
643        };
644
645        match result {
646            Some(code) => return Ok(code),
647            None => {
648                eprintln!("[reconnecting...]");
649                loop {
650                    tokio::time::sleep(Duration::from_secs(1)).await;
651
652                    let stream = match UnixStream::connect(ctl_path).await {
653                        Ok(s) => s,
654                        Err(_) => continue,
655                    };
656
657                    let mut new_framed = Framed::new(stream, FrameCodec);
658                    if new_framed.send(Frame::Tail { session: session.to_string() }).await.is_err()
659                    {
660                        continue;
661                    }
662
663                    match new_framed.next().await {
664                        Some(Ok(Frame::Ok)) => {
665                            eprintln!("[reconnected]");
666                            framed = new_framed;
667                            heartbeat_interval.reset();
668                            last_pong = Instant::now();
669                            break;
670                        }
671                        Some(Ok(Frame::Error { message })) => {
672                            eprintln!("[session gone: {message}]");
673                            return Ok(1);
674                        }
675                        _ => continue,
676                    }
677                }
678            }
679        }
680    }
681}
682
#[cfg(test)]
mod tests {
    use super::*;

    /// Processor positioned mid-line, so a leading `~` is not an escape.
    fn mid_line() -> EscapeProcessor {
        EscapeProcessor { state: EscapeState::Normal }
    }

    /// Shorthand for a pass-through action carrying `bytes`.
    fn data(bytes: &[u8]) -> EscapeAction {
        EscapeAction::Data(bytes.to_vec())
    }

    #[test]
    fn normal_passthrough() {
        // No newlines — after initial AfterNewline, 'h' transitions to Normal
        let mut ep = EscapeProcessor::new();
        assert_eq!(ep.process(b"hello"), vec![data(b"hello")]);
    }

    #[test]
    fn tilde_after_newline_detach() {
        let mut ep = mid_line();
        assert_eq!(ep.process(b"\n~."), vec![data(b"\n"), EscapeAction::Detach]);
    }

    #[test]
    fn tilde_after_cr_detach() {
        let mut ep = mid_line();
        assert_eq!(ep.process(b"\r~."), vec![data(b"\r"), EscapeAction::Detach]);
    }

    #[test]
    fn tilde_not_after_newline() {
        let mut ep = mid_line();
        assert_eq!(ep.process(b"a~."), vec![data(b"a~.")]);
    }

    #[test]
    fn initial_state_detach() {
        let mut ep = EscapeProcessor::new();
        assert_eq!(ep.process(b"~."), vec![EscapeAction::Detach]);
    }

    #[test]
    fn tilde_suspend() {
        let mut ep = mid_line();
        assert_eq!(ep.process(b"\n~\x1a"), vec![data(b"\n"), EscapeAction::Suspend]);
    }

    #[test]
    fn tilde_help() {
        let mut ep = mid_line();
        assert_eq!(ep.process(b"\n~?"), vec![data(b"\n"), EscapeAction::Help]);
    }

    #[test]
    fn double_tilde() {
        let mut ep = mid_line();
        let got = ep.process(b"\n~~");
        assert_eq!(got, vec![data(b"\n"), data(b"~")]);
        assert_eq!(ep.state, EscapeState::Normal);
    }

    #[test]
    fn tilde_unknown_char() {
        let mut ep = mid_line();
        assert_eq!(ep.process(b"\n~x"), vec![data(b"\n"), data(b"~x")]);
    }

    #[test]
    fn split_across_reads() {
        let mut ep = mid_line();
        assert_eq!(ep.process(b"\n"), vec![data(b"\n")]);
        // The tilde is withheld until the next byte arrives.
        assert!(ep.process(b"~").is_empty());
        assert_eq!(ep.process(b"."), vec![EscapeAction::Detach]);
    }

    #[test]
    fn split_tilde_then_normal() {
        let mut ep = mid_line();
        assert_eq!(ep.process(b"\n"), vec![data(b"\n")]);
        assert!(ep.process(b"~").is_empty());
        assert_eq!(ep.process(b"a"), vec![data(b"~a")]);
    }

    #[test]
    fn multiple_escapes_one_buffer() {
        let mut ep = mid_line();
        let expected =
            vec![data(b"\n"), EscapeAction::Help, data(b"\n"), EscapeAction::Detach];
        assert_eq!(ep.process(b"\n~?\n~."), expected);
    }

    #[test]
    fn consecutive_newlines() {
        let mut ep = mid_line();
        assert_eq!(ep.process(b"\n\n\n~."), vec![data(b"\n\n\n"), EscapeAction::Detach]);
    }

    #[test]
    fn detach_stops_processing() {
        let mut ep = mid_line();
        assert_eq!(ep.process(b"\n~.remaining"), vec![data(b"\n"), EscapeAction::Detach]);
    }

    #[test]
    fn tilde_then_newline() {
        let mut ep = mid_line();
        let got = ep.process(b"\n~\n");
        assert_eq!(got, vec![data(b"\n"), data(b"~\n")]);
        assert_eq!(ep.state, EscapeState::AfterNewline);
    }

    #[test]
    fn empty_input() {
        let mut ep = EscapeProcessor::new();
        assert!(ep.process(b"").is_empty());
    }

    #[test]
    fn only_tilde_buffered() {
        let mut ep = mid_line();
        assert_eq!(ep.process(b"\n~"), vec![data(b"\n")]);
        assert_eq!(ep.state, EscapeState::AfterTilde);
        assert_eq!(ep.process(b"."), vec![EscapeAction::Detach]);
    }
}
836}