1use crate::protocol::{Frame, FrameCodec};
2use bytes::Bytes;
3use futures_util::{SinkExt, StreamExt};
4use nix::sys::termios::{self, FlushArg, LocalFlags, SetArg, SpecialCharacterIndices, Termios};
5use std::collections::HashMap;
6use std::io::{self, Read, Write};
7use std::os::fd::{AsFd, AsRawFd, BorrowedFd};
8use std::path::Path;
9use std::time::Duration;
10use tokio::io::unix::AsyncFd;
11use tokio::net::UnixStream;
12use tokio::signal::unix::{SignalKind, signal};
13use tokio::sync::mpsc;
14use tokio::time::Instant;
15use tokio_util::codec::Framed;
16use tracing::{debug, info};
17
18const ESCAPE_HELP: &[u8] = b"\r\nSupported escape sequences:\r\n\
21 ~. - detach from session\r\n\
22 ~^Z - suspend client\r\n\
23 ~? - this message\r\n\
24 ~~ - send the escape character by typing it twice\r\n\
25(Note that escapes are only recognized immediately after newline.)\r\n";
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28enum EscapeState {
29 Normal,
30 AfterNewline,
31 AfterTilde,
32}
33
34#[derive(Debug, PartialEq, Eq)]
35enum EscapeAction {
36 Data(Vec<u8>),
37 Detach,
38 Suspend,
39 Help,
40}
41
42struct EscapeProcessor {
43 state: EscapeState,
44}
45
46impl EscapeProcessor {
47 fn new() -> Self {
48 Self { state: EscapeState::AfterNewline }
49 }
50
51 fn process(&mut self, input: &[u8]) -> Vec<EscapeAction> {
52 let mut actions = Vec::new();
53 let mut data_buf = Vec::new();
54
55 for &b in input {
56 match self.state {
57 EscapeState::Normal => {
58 if b == b'\n' || b == b'\r' {
59 self.state = EscapeState::AfterNewline;
60 }
61 data_buf.push(b);
62 }
63 EscapeState::AfterNewline => {
64 if b == b'~' {
65 self.state = EscapeState::AfterTilde;
66 if !data_buf.is_empty() {
68 actions.push(EscapeAction::Data(std::mem::take(&mut data_buf)));
69 }
70 } else if b == b'\n' || b == b'\r' {
71 data_buf.push(b);
73 } else {
74 self.state = EscapeState::Normal;
75 data_buf.push(b);
76 }
77 }
78 EscapeState::AfterTilde => {
79 match b {
80 b'.' => {
81 if !data_buf.is_empty() {
82 actions.push(EscapeAction::Data(std::mem::take(&mut data_buf)));
83 }
84 actions.push(EscapeAction::Detach);
85 return actions; }
87 0x1a => {
88 if !data_buf.is_empty() {
90 actions.push(EscapeAction::Data(std::mem::take(&mut data_buf)));
91 }
92 actions.push(EscapeAction::Suspend);
93 self.state = EscapeState::Normal;
94 }
95 b'?' => {
96 if !data_buf.is_empty() {
97 actions.push(EscapeAction::Data(std::mem::take(&mut data_buf)));
98 }
99 actions.push(EscapeAction::Help);
100 self.state = EscapeState::Normal;
101 }
102 b'~' => {
103 data_buf.push(b'~');
105 self.state = EscapeState::Normal;
106 }
107 b'\n' | b'\r' => {
108 data_buf.push(b'~');
110 data_buf.push(b);
111 self.state = EscapeState::AfterNewline;
112 }
113 _ => {
114 data_buf.push(b'~');
116 data_buf.push(b);
117 self.state = EscapeState::Normal;
118 }
119 }
120 }
121 }
122 }
123
124 if !data_buf.is_empty() {
125 actions.push(EscapeAction::Data(data_buf));
126 }
127 actions
128 }
129}
130
131fn suspend(raw_guard: &RawModeGuard, nb_guard: &NonBlockGuard) -> anyhow::Result<()> {
132 termios::tcsetattr(raw_guard.fd, SetArg::TCSAFLUSH, &raw_guard.original)?;
134 let _ = nix::fcntl::fcntl(nb_guard.fd, nix::fcntl::FcntlArg::F_SETFL(nb_guard.original_flags));
135
136 nix::sys::signal::kill(nix::unistd::Pid::from_raw(0), nix::sys::signal::Signal::SIGTSTP)?;
137
138 let _ = nix::fcntl::fcntl(
140 nb_guard.fd,
141 nix::fcntl::FcntlArg::F_SETFL(nb_guard.original_flags | nix::fcntl::OFlag::O_NONBLOCK),
142 );
143 let mut raw = raw_guard.original.clone();
144 termios::cfmakeraw(&mut raw);
145 termios::tcsetattr(raw_guard.fd, SetArg::TCSAFLUSH, &raw)?;
146 Ok(())
147}
148
149const SEND_TIMEOUT: Duration = Duration::from_secs(5);
150
151struct NonBlockGuard {
152 fd: BorrowedFd<'static>,
153 original_flags: nix::fcntl::OFlag,
154}
155
156impl NonBlockGuard {
157 fn set(fd: BorrowedFd<'static>) -> nix::Result<Self> {
158 let flags = nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_GETFL)?;
159 let original_flags = nix::fcntl::OFlag::from_bits_truncate(flags);
160 nix::fcntl::fcntl(
161 fd,
162 nix::fcntl::FcntlArg::F_SETFL(original_flags | nix::fcntl::OFlag::O_NONBLOCK),
163 )?;
164 Ok(Self { fd, original_flags })
165 }
166}
167
168impl Drop for NonBlockGuard {
169 fn drop(&mut self) {
170 let _ = nix::fcntl::fcntl(self.fd, nix::fcntl::FcntlArg::F_SETFL(self.original_flags));
171 }
172}
173
174struct RawModeGuard {
175 fd: BorrowedFd<'static>,
176 original: Termios,
177}
178
179impl RawModeGuard {
180 fn enter(fd: BorrowedFd<'static>) -> nix::Result<Self> {
181 let original = termios::tcgetattr(fd)?;
182 let mut raw = original.clone();
183 termios::cfmakeraw(&mut raw);
184 termios::tcsetattr(fd, SetArg::TCSAFLUSH, &raw)?;
185 Ok(Self { fd, original })
186 }
187}
188
189impl Drop for RawModeGuard {
190 fn drop(&mut self) {
191 let _ = termios::tcsetattr(self.fd, SetArg::TCSAFLUSH, &self.original);
192 }
193}
194
195struct SuppressInputGuard {
198 fd: BorrowedFd<'static>,
199 original: Termios,
200}
201
202impl SuppressInputGuard {
203 fn enter(fd: BorrowedFd<'static>) -> nix::Result<Self> {
204 let original = termios::tcgetattr(fd)?;
205 let mut modified = original.clone();
206 modified.local_flags.remove(LocalFlags::ECHO | LocalFlags::ICANON);
207 modified.control_chars[SpecialCharacterIndices::VMIN as usize] = 1;
208 modified.control_chars[SpecialCharacterIndices::VTIME as usize] = 0;
209 termios::tcsetattr(fd, SetArg::TCSAFLUSH, &modified)?;
210 Ok(Self { fd, original })
211 }
212}
213
214impl Drop for SuppressInputGuard {
215 fn drop(&mut self) {
216 let _ = termios::tcflush(self.fd, FlushArg::TCIFLUSH);
217 let _ = termios::tcsetattr(self.fd, SetArg::TCSAFLUSH, &self.original);
218 }
219}
220
221fn write_stdout(data: &[u8]) -> io::Result<()> {
225 let mut stdout = io::stdout();
226 let mut written = 0;
227 while written < data.len() {
228 match stdout.write(&data[written..]) {
229 Ok(n) => written += n,
230 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
231 std::thread::sleep(Duration::from_millis(1));
232 }
233 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
234 Err(e) => return Err(e),
235 }
236 }
237 loop {
238 match stdout.flush() {
239 Ok(()) => return Ok(()),
240 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
241 std::thread::sleep(Duration::from_millis(1));
242 }
243 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
244 Err(e) => return Err(e),
245 }
246 }
247}
248
249fn get_terminal_size() -> (u16, u16) {
250 let mut ws: libc::winsize = unsafe { std::mem::zeroed() };
251 if unsafe { libc::ioctl(libc::STDIN_FILENO, libc::TIOCGWINSZ, &mut ws) } != 0
252 || ws.ws_col == 0
253 || ws.ws_row == 0
254 {
255 return (80, 24);
256 }
257 (ws.ws_col, ws.ws_row)
258}
259
260async fn timed_send(framed: &mut Framed<UnixStream, FrameCodec>, frame: Frame) -> bool {
262 match tokio::time::timeout(SEND_TIMEOUT, framed.send(frame)).await {
263 Ok(Ok(())) => true,
264 Ok(Err(e)) => {
265 debug!("send error: {e}");
266 false
267 }
268 Err(_) => {
269 debug!("send timed out");
270 false
271 }
272 }
273}
274
275const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
276const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(15);
277
278enum AgentEvent {
280 Data { channel_id: u32, data: Bytes },
281 Closed { channel_id: u32 },
282}
283
284async fn send_init_frames(
287 framed: &mut Framed<UnixStream, FrameCodec>,
288 env_vars: &[(String, String)],
289 forward_agent: bool,
290 agent_socket: Option<&str>,
291 forward_open: bool,
292 redraw: bool,
293) -> bool {
294 if !env_vars.is_empty() && !timed_send(framed, Frame::Env { vars: env_vars.to_vec() }).await {
295 return false;
296 }
297 if forward_agent && agent_socket.is_some() && !timed_send(framed, Frame::AgentForward).await {
298 return false;
299 }
300 if forward_open && !timed_send(framed, Frame::OpenForward).await {
301 return false;
302 }
303 let (cols, rows) = get_terminal_size();
304 if !timed_send(framed, Frame::Resize { cols, rows }).await {
305 return false;
306 }
307 if redraw && !timed_send(framed, Frame::Data(Bytes::from_static(b"\x0c"))).await {
308 return false;
309 }
310 true
311}
312
313#[allow(clippy::too_many_arguments)]
316async fn relay(
317 framed: &mut Framed<UnixStream, FrameCodec>,
318 async_stdin: &AsyncFd<io::Stdin>,
319 sigwinch: &mut tokio::signal::unix::Signal,
320 buf: &mut [u8],
321 mut escape: Option<&mut EscapeProcessor>,
322 raw_guard: &RawModeGuard,
323 nb_guard: &NonBlockGuard,
324 agent_socket: Option<&str>,
325) -> anyhow::Result<Option<i32>> {
326 let mut sigterm = signal(SignalKind::terminate())?;
327 let mut sighup = signal(SignalKind::hangup())?;
328
329 let mut heartbeat_interval = tokio::time::interval(HEARTBEAT_INTERVAL);
330 heartbeat_interval.reset(); let mut last_pong = Instant::now();
332
333 let mut agent_channels: HashMap<u32, mpsc::UnboundedSender<Bytes>> = HashMap::new();
335 let (agent_event_tx, mut agent_event_rx) = mpsc::unbounded_channel::<AgentEvent>();
336
337 loop {
338 tokio::select! {
339 ready = async_stdin.readable() => {
340 let mut guard = ready?;
341 match guard.try_io(|inner| inner.get_ref().read(buf)) {
342 Ok(Ok(0)) => {
343 debug!("stdin EOF");
344 return Ok(Some(0));
345 }
346 Ok(Ok(n)) => {
347 debug!(len = n, "stdin → socket");
348 if let Some(ref mut esc) = escape {
349 for action in esc.process(&buf[..n]) {
350 match action {
351 EscapeAction::Data(data) => {
352 if !timed_send(framed, Frame::Data(Bytes::from(data))).await {
353 return Ok(None);
354 }
355 }
356 EscapeAction::Detach => {
357 write_stdout(b"\r\n[detached]\r\n")?;
358 return Ok(Some(0));
359 }
360 EscapeAction::Suspend => {
361 suspend(raw_guard, nb_guard)?;
362 let (cols, rows) = get_terminal_size();
364 if !timed_send(framed, Frame::Resize { cols, rows }).await {
365 return Ok(None);
366 }
367 }
368 EscapeAction::Help => {
369 write_stdout(ESCAPE_HELP)?;
370 }
371 }
372 }
373 } else if !timed_send(framed, Frame::Data(Bytes::copy_from_slice(&buf[..n]))).await {
374 return Ok(None);
375 }
376 }
377 Ok(Err(e)) => return Err(e.into()),
378 Err(_would_block) => continue,
379 }
380 }
381
382 frame = framed.next() => {
383 match frame {
384 Some(Ok(Frame::Data(data))) => {
385 debug!(len = data.len(), "socket → stdout");
386 write_stdout(&data)?;
387 }
388 Some(Ok(Frame::Pong)) => {
389 debug!("pong received");
390 last_pong = Instant::now();
391 }
392 Some(Ok(Frame::Exit { code })) => {
393 debug!(code, "server sent exit");
394 return Ok(Some(code));
395 }
396 Some(Ok(Frame::Detached)) => {
397 info!("detached by another client");
398 write_stdout(b"[detached]\r\n")?;
399 return Ok(Some(0));
400 }
401 Some(Ok(Frame::AgentOpen { channel_id })) => {
402 if let Some(sock_path) = agent_socket {
403 match tokio::net::UnixStream::connect(sock_path).await {
404 Ok(stream) => {
405 let (read_half, write_half) = stream.into_split();
406 let data_tx = agent_event_tx.clone();
407 let close_tx = agent_event_tx.clone();
408 let writer_tx = crate::spawn_channel_relay(
409 channel_id,
410 read_half,
411 write_half,
412 move |id, data| data_tx.send(AgentEvent::Data { channel_id: id, data }).is_ok(),
413 move |id| { let _ = close_tx.send(AgentEvent::Closed { channel_id: id }); },
414 );
415 agent_channels.insert(channel_id, writer_tx);
416 }
417 Err(e) => {
418 debug!("failed to connect to local agent: {e}");
419 let _ = timed_send(framed, Frame::AgentClose { channel_id }).await;
420 }
421 }
422 } else {
423 let _ = timed_send(framed, Frame::AgentClose { channel_id }).await;
424 }
425 }
426 Some(Ok(Frame::AgentData { channel_id, data })) => {
427 if let Some(tx) = agent_channels.get(&channel_id) {
428 let _ = tx.send(data);
429 }
430 }
431 Some(Ok(Frame::AgentClose { channel_id })) => {
432 agent_channels.remove(&channel_id);
433 }
434 Some(Ok(Frame::OpenUrl { url })) => {
435 if url.starts_with("http://") || url.starts_with("https://") {
436 debug!("opening URL locally: {url}");
437 let cmd = if cfg!(target_os = "macos") { "open" } else { "xdg-open" };
438 let _ = std::process::Command::new(cmd)
439 .arg("--")
440 .arg(&url)
441 .stdin(std::process::Stdio::null())
442 .stdout(std::process::Stdio::null())
443 .stderr(std::process::Stdio::null())
444 .spawn();
445 } else {
446 debug!("rejected non-http(s) URL: {url}");
447 }
448 }
449 Some(Ok(_)) => {} Some(Err(e)) => {
451 debug!("server connection error: {e}");
452 return Ok(None);
453 }
454 None => {
455 debug!("server disconnected");
456 return Ok(None);
457 }
458 }
459 }
460
461 event = agent_event_rx.recv() => {
463 match event {
464 Some(AgentEvent::Data { channel_id, data }) => {
465 if agent_channels.contains_key(&channel_id)
466 && !timed_send(framed, Frame::AgentData { channel_id, data }).await
467 {
468 return Ok(None);
469 }
470 }
471 Some(AgentEvent::Closed { channel_id }) => {
472 if agent_channels.remove(&channel_id).is_some()
473 && !timed_send(framed, Frame::AgentClose { channel_id }).await
474 {
475 return Ok(None);
476 }
477 }
478 None => {} }
480 }
481
482 _ = sigwinch.recv() => {
483 let (cols, rows) = get_terminal_size();
484 debug!(cols, rows, "SIGWINCH → resize");
485 if !timed_send(framed, Frame::Resize { cols, rows }).await {
486 return Ok(None);
487 }
488 }
489
490 _ = heartbeat_interval.tick() => {
491 if last_pong.elapsed() > HEARTBEAT_TIMEOUT {
492 debug!("heartbeat timeout");
493 return Ok(None);
494 }
495 if !timed_send(framed, Frame::Ping).await {
496 return Ok(None);
497 }
498 }
499
500 _ = sigterm.recv() => {
501 debug!("SIGTERM received, exiting");
502 return Ok(Some(1));
503 }
504
505 _ = sighup.recv() => {
506 debug!("SIGHUP received, exiting");
507 return Ok(Some(1));
508 }
509 }
510 }
511}
512
513#[allow(clippy::too_many_arguments)]
514pub async fn run(
515 session: &str,
516 mut framed: Framed<UnixStream, FrameCodec>,
517 redraw: bool,
518 ctl_path: &Path,
519 env_vars: Vec<(String, String)>,
520 no_escape: bool,
521 forward_agent: bool,
522 forward_open: bool,
523) -> anyhow::Result<i32> {
524 let stdin = io::stdin();
525 let stdin_fd = stdin.as_fd();
526 let stdin_borrowed: BorrowedFd<'static> =
528 unsafe { BorrowedFd::borrow_raw(stdin_fd.as_raw_fd()) };
529 let raw_guard = RawModeGuard::enter(stdin_borrowed)?;
530
531 let nb_guard = NonBlockGuard::set(stdin_borrowed)?;
534 let async_stdin = AsyncFd::new(io::stdin())?;
535 let mut sigwinch = signal(SignalKind::window_change())?;
536 let mut buf = vec![0u8; 4096];
537 let mut current_redraw = redraw;
538 let mut current_env = env_vars;
539 let mut escape = if no_escape { None } else { Some(EscapeProcessor::new()) };
540 let agent_socket = if forward_agent { std::env::var("SSH_AUTH_SOCK").ok() } else { None };
541
542 loop {
543 let result = if send_init_frames(
544 &mut framed,
545 ¤t_env,
546 forward_agent,
547 agent_socket.as_deref(),
548 forward_open,
549 current_redraw,
550 )
551 .await
552 {
553 relay(
554 &mut framed,
555 &async_stdin,
556 &mut sigwinch,
557 &mut buf,
558 escape.as_mut(),
559 &raw_guard,
560 &nb_guard,
561 agent_socket.as_deref(),
562 )
563 .await?
564 } else {
565 None
566 };
567 match result {
568 Some(code) => return Ok(code),
569 None => {
570 current_env.clear();
572 write_stdout(b"[reconnecting...]\r\n")?;
574
575 loop {
576 tokio::time::sleep(Duration::from_secs(1)).await;
577
578 {
580 let mut peek = [0u8; 1];
581 match io::stdin().read(&mut peek) {
582 Ok(1) if peek[0] == 0x03 => {
583 write_stdout(b"\r\n")?;
584 return Ok(1);
585 }
586 _ => {}
587 }
588 }
589
590 let stream = match UnixStream::connect(ctl_path).await {
591 Ok(s) => s,
592 Err(_) => continue,
593 };
594
595 let mut new_framed = Framed::new(stream, FrameCodec);
596 if crate::handshake(&mut new_framed).await.is_err() {
597 continue;
598 }
599 if new_framed
600 .send(Frame::Attach { session: session.to_string() })
601 .await
602 .is_err()
603 {
604 continue;
605 }
606
607 match new_framed.next().await {
608 Some(Ok(Frame::Ok)) => {
609 write_stdout(b"[reconnected]\r\n")?;
610 framed = new_framed;
611 current_redraw = true;
612 break;
613 }
614 Some(Ok(Frame::Error { message })) => {
615 let msg = format!("[session gone: {message}]\r\n");
616 write_stdout(msg.as_bytes())?;
617 return Ok(1);
618 }
619 _ => continue,
620 }
621 }
622 }
623 }
624 }
625}
626
627pub async fn tail(
631 session: &str,
632 mut framed: Framed<UnixStream, FrameCodec>,
633 ctl_path: &Path,
634) -> anyhow::Result<i32> {
635 let stdin_fd = unsafe { BorrowedFd::borrow_raw(libc::STDIN_FILENO) };
637 let _input_guard = SuppressInputGuard::enter(stdin_fd).ok();
638
639 tokio::task::spawn_blocking(|| {
641 let mut buf = [0u8; 64];
642 let mut belled = false;
643 loop {
644 match io::stdin().read(&mut buf) {
645 Ok(0) | Err(_) => break,
646 Ok(_) if !belled => {
647 let _ = io::stderr().write_all(b"\x07");
648 let _ = io::stderr().flush();
649 belled = true;
650 }
651 _ => {}
652 }
653 }
654 });
655
656 let mut heartbeat_interval = tokio::time::interval(HEARTBEAT_INTERVAL);
657 heartbeat_interval.reset();
658 let mut last_pong = Instant::now();
659 let mut sigint = signal(SignalKind::interrupt())?;
660 let mut sigterm = signal(SignalKind::terminate())?;
661 let mut sighup = signal(SignalKind::hangup())?;
662
663 let code = 'outer: loop {
664 let result = 'relay: loop {
665 tokio::select! {
666 frame = framed.next() => {
667 match frame {
668 Some(Ok(Frame::Data(data))) => {
669 write_stdout(&data)?;
670 }
671 Some(Ok(Frame::Pong)) => {
672 last_pong = Instant::now();
673 }
674 Some(Ok(Frame::Exit { code })) => {
675 break 'relay Some(code);
676 }
677 Some(Ok(_)) => {}
678 Some(Err(e)) => {
679 debug!("tail connection error: {e}");
680 break 'relay None;
681 }
682 None => {
683 debug!("tail server disconnected");
684 break 'relay None;
685 }
686 }
687 }
688 _ = heartbeat_interval.tick() => {
689 if last_pong.elapsed() > HEARTBEAT_TIMEOUT {
690 debug!("tail heartbeat timeout");
691 break 'relay None;
692 }
693 if framed.send(Frame::Ping).await.is_err() {
694 break 'relay None;
695 }
696 }
697 _ = sigint.recv() => {
698 break 'outer 0;
699 }
700 _ = sigterm.recv() => {
701 break 'outer 1;
702 }
703 _ = sighup.recv() => {
704 break 'outer 1;
705 }
706 }
707 };
708
709 match result {
710 Some(code) => break code,
711 None => {
712 eprintln!("[reconnecting...]");
713 loop {
714 tokio::time::sleep(Duration::from_secs(1)).await;
715
716 let stream = match UnixStream::connect(ctl_path).await {
717 Ok(s) => s,
718 Err(_) => continue,
719 };
720
721 let mut new_framed = Framed::new(stream, FrameCodec);
722 if crate::handshake(&mut new_framed).await.is_err() {
723 continue;
724 }
725 if new_framed.send(Frame::Tail { session: session.to_string() }).await.is_err()
726 {
727 continue;
728 }
729
730 match new_framed.next().await {
731 Some(Ok(Frame::Ok)) => {
732 eprintln!("[reconnected]");
733 framed = new_framed;
734 heartbeat_interval.reset();
735 last_pong = Instant::now();
736 break;
737 }
738 Some(Ok(Frame::Error { message })) => {
739 eprintln!("[session gone: {message}]");
740 break 'outer 1;
741 }
742 _ => continue,
743 }
744 }
745 }
746 }
747 };
748
749 let _ = write_stdout(b"\x1b[0m\x1b[?25h");
752 Ok(code)
753}
754
755#[cfg(test)]
756mod tests {
757 use super::*;
758
759 #[test]
760 fn normal_passthrough() {
761 let mut ep = EscapeProcessor::new();
762 let actions = ep.process(b"hello");
764 assert_eq!(actions, vec![EscapeAction::Data(b"hello".to_vec())]);
765 }
766
767 #[test]
768 fn tilde_after_newline_detach() {
769 let mut ep = EscapeProcessor { state: EscapeState::Normal };
770 let actions = ep.process(b"\n~.");
771 assert_eq!(actions, vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Detach,]);
772 }
773
774 #[test]
775 fn tilde_after_cr_detach() {
776 let mut ep = EscapeProcessor { state: EscapeState::Normal };
777 let actions = ep.process(b"\r~.");
778 assert_eq!(actions, vec![EscapeAction::Data(b"\r".to_vec()), EscapeAction::Detach,]);
779 }
780
781 #[test]
782 fn tilde_not_after_newline() {
783 let mut ep = EscapeProcessor { state: EscapeState::Normal };
784 let actions = ep.process(b"a~.");
785 assert_eq!(actions, vec![EscapeAction::Data(b"a~.".to_vec())]);
786 }
787
788 #[test]
789 fn initial_state_detach() {
790 let mut ep = EscapeProcessor::new();
791 let actions = ep.process(b"~.");
792 assert_eq!(actions, vec![EscapeAction::Detach]);
793 }
794
795 #[test]
796 fn tilde_suspend() {
797 let mut ep = EscapeProcessor { state: EscapeState::Normal };
798 let actions = ep.process(b"\n~\x1a");
799 assert_eq!(actions, vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Suspend,]);
800 }
801
802 #[test]
803 fn tilde_help() {
804 let mut ep = EscapeProcessor { state: EscapeState::Normal };
805 let actions = ep.process(b"\n~?");
806 assert_eq!(actions, vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Help,]);
807 }
808
809 #[test]
810 fn double_tilde() {
811 let mut ep = EscapeProcessor { state: EscapeState::Normal };
812 let actions = ep.process(b"\n~~");
813 assert_eq!(
814 actions,
815 vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Data(b"~".to_vec()),]
816 );
817 assert_eq!(ep.state, EscapeState::Normal);
818 }
819
820 #[test]
821 fn tilde_unknown_char() {
822 let mut ep = EscapeProcessor { state: EscapeState::Normal };
823 let actions = ep.process(b"\n~x");
824 assert_eq!(
825 actions,
826 vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Data(b"~x".to_vec()),]
827 );
828 }
829
830 #[test]
831 fn split_across_reads() {
832 let mut ep = EscapeProcessor { state: EscapeState::Normal };
833 let a1 = ep.process(b"\n");
834 assert_eq!(a1, vec![EscapeAction::Data(b"\n".to_vec())]);
835 let a2 = ep.process(b"~");
836 assert_eq!(a2, vec![]); let a3 = ep.process(b".");
838 assert_eq!(a3, vec![EscapeAction::Detach]);
839 }
840
841 #[test]
842 fn split_tilde_then_normal() {
843 let mut ep = EscapeProcessor { state: EscapeState::Normal };
844 let a1 = ep.process(b"\n");
845 assert_eq!(a1, vec![EscapeAction::Data(b"\n".to_vec())]);
846 let a2 = ep.process(b"~");
847 assert_eq!(a2, vec![]);
848 let a3 = ep.process(b"a");
849 assert_eq!(a3, vec![EscapeAction::Data(b"~a".to_vec())]);
850 }
851
852 #[test]
853 fn multiple_escapes_one_buffer() {
854 let mut ep = EscapeProcessor { state: EscapeState::Normal };
855 let actions = ep.process(b"\n~?\n~.");
856 assert_eq!(
857 actions,
858 vec![
859 EscapeAction::Data(b"\n".to_vec()),
860 EscapeAction::Help,
861 EscapeAction::Data(b"\n".to_vec()),
862 EscapeAction::Detach,
863 ]
864 );
865 }
866
867 #[test]
868 fn consecutive_newlines() {
869 let mut ep = EscapeProcessor { state: EscapeState::Normal };
870 let actions = ep.process(b"\n\n\n~.");
871 assert_eq!(actions, vec![EscapeAction::Data(b"\n\n\n".to_vec()), EscapeAction::Detach,]);
872 }
873
874 #[test]
875 fn detach_stops_processing() {
876 let mut ep = EscapeProcessor { state: EscapeState::Normal };
877 let actions = ep.process(b"\n~.remaining");
878 assert_eq!(actions, vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Detach,]);
879 }
880
881 #[test]
882 fn tilde_then_newline() {
883 let mut ep = EscapeProcessor { state: EscapeState::Normal };
884 let actions = ep.process(b"\n~\n");
885 assert_eq!(
886 actions,
887 vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Data(b"~\n".to_vec()),]
888 );
889 assert_eq!(ep.state, EscapeState::AfterNewline);
890 }
891
892 #[test]
893 fn empty_input() {
894 let mut ep = EscapeProcessor::new();
895 let actions = ep.process(b"");
896 assert_eq!(actions, vec![]);
897 }
898
899 #[test]
900 fn only_tilde_buffered() {
901 let mut ep = EscapeProcessor { state: EscapeState::Normal };
902 let a1 = ep.process(b"\n~");
903 assert_eq!(a1, vec![EscapeAction::Data(b"\n".to_vec())]);
904 assert_eq!(ep.state, EscapeState::AfterTilde);
905 let a2 = ep.process(b".");
906 assert_eq!(a2, vec![EscapeAction::Detach]);
907 }
908}