1use crate::protocol::{Frame, FrameCodec};
2use bytes::Bytes;
3use futures_util::{SinkExt, StreamExt};
4use nix::sys::termios::{self, SetArg, Termios};
5use std::collections::HashMap;
6use std::io::{self, Read, Write};
7use std::os::fd::{AsFd, AsRawFd, BorrowedFd};
8use std::path::Path;
9use std::time::Duration;
10use tokio::io::unix::AsyncFd;
11use tokio::net::UnixStream;
12use tokio::signal::unix::{SignalKind, signal};
13use tokio::sync::mpsc;
14use tokio::time::Instant;
15use tokio_util::codec::Framed;
16use tracing::{debug, info};
17
18const ESCAPE_HELP: &[u8] = b"\r\nSupported escape sequences:\r\n\
21 ~. - detach from session\r\n\
22 ~^Z - suspend client\r\n\
23 ~? - this message\r\n\
24 ~~ - send the escape character by typing it twice\r\n\
25(Note that escapes are only recognized immediately after newline.)\r\n";
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28enum EscapeState {
29 Normal,
30 AfterNewline,
31 AfterTilde,
32}
33
34#[derive(Debug, PartialEq, Eq)]
35enum EscapeAction {
36 Data(Vec<u8>),
37 Detach,
38 Suspend,
39 Help,
40}
41
42struct EscapeProcessor {
43 state: EscapeState,
44}
45
46impl EscapeProcessor {
47 fn new() -> Self {
48 Self { state: EscapeState::AfterNewline }
49 }
50
51 fn process(&mut self, input: &[u8]) -> Vec<EscapeAction> {
52 let mut actions = Vec::new();
53 let mut data_buf = Vec::new();
54
55 for &b in input {
56 match self.state {
57 EscapeState::Normal => {
58 if b == b'\n' || b == b'\r' {
59 self.state = EscapeState::AfterNewline;
60 }
61 data_buf.push(b);
62 }
63 EscapeState::AfterNewline => {
64 if b == b'~' {
65 self.state = EscapeState::AfterTilde;
66 if !data_buf.is_empty() {
68 actions.push(EscapeAction::Data(std::mem::take(&mut data_buf)));
69 }
70 } else if b == b'\n' || b == b'\r' {
71 data_buf.push(b);
73 } else {
74 self.state = EscapeState::Normal;
75 data_buf.push(b);
76 }
77 }
78 EscapeState::AfterTilde => {
79 match b {
80 b'.' => {
81 if !data_buf.is_empty() {
82 actions.push(EscapeAction::Data(std::mem::take(&mut data_buf)));
83 }
84 actions.push(EscapeAction::Detach);
85 return actions; }
87 0x1a => {
88 if !data_buf.is_empty() {
90 actions.push(EscapeAction::Data(std::mem::take(&mut data_buf)));
91 }
92 actions.push(EscapeAction::Suspend);
93 self.state = EscapeState::Normal;
94 }
95 b'?' => {
96 if !data_buf.is_empty() {
97 actions.push(EscapeAction::Data(std::mem::take(&mut data_buf)));
98 }
99 actions.push(EscapeAction::Help);
100 self.state = EscapeState::Normal;
101 }
102 b'~' => {
103 data_buf.push(b'~');
105 self.state = EscapeState::Normal;
106 }
107 b'\n' | b'\r' => {
108 data_buf.push(b'~');
110 data_buf.push(b);
111 self.state = EscapeState::AfterNewline;
112 }
113 _ => {
114 data_buf.push(b'~');
116 data_buf.push(b);
117 self.state = EscapeState::Normal;
118 }
119 }
120 }
121 }
122 }
123
124 if !data_buf.is_empty() {
125 actions.push(EscapeAction::Data(data_buf));
126 }
127 actions
128 }
129}
130
131fn suspend(raw_guard: &RawModeGuard, nb_guard: &NonBlockGuard) -> anyhow::Result<()> {
132 termios::tcsetattr(raw_guard.fd, SetArg::TCSAFLUSH, &raw_guard.original)?;
134 let _ = nix::fcntl::fcntl(nb_guard.fd, nix::fcntl::FcntlArg::F_SETFL(nb_guard.original_flags));
135
136 nix::sys::signal::kill(nix::unistd::Pid::from_raw(0), nix::sys::signal::Signal::SIGTSTP)?;
137
138 let _ = nix::fcntl::fcntl(
140 nb_guard.fd,
141 nix::fcntl::FcntlArg::F_SETFL(nb_guard.original_flags | nix::fcntl::OFlag::O_NONBLOCK),
142 );
143 let mut raw = raw_guard.original.clone();
144 termios::cfmakeraw(&mut raw);
145 termios::tcsetattr(raw_guard.fd, SetArg::TCSAFLUSH, &raw)?;
146 Ok(())
147}
148
149const SEND_TIMEOUT: Duration = Duration::from_secs(5);
150
151struct NonBlockGuard {
152 fd: BorrowedFd<'static>,
153 original_flags: nix::fcntl::OFlag,
154}
155
156impl NonBlockGuard {
157 fn set(fd: BorrowedFd<'static>) -> nix::Result<Self> {
158 let flags = nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_GETFL)?;
159 let original_flags = nix::fcntl::OFlag::from_bits_truncate(flags);
160 nix::fcntl::fcntl(
161 fd,
162 nix::fcntl::FcntlArg::F_SETFL(original_flags | nix::fcntl::OFlag::O_NONBLOCK),
163 )?;
164 Ok(Self { fd, original_flags })
165 }
166}
167
168impl Drop for NonBlockGuard {
169 fn drop(&mut self) {
170 let _ = nix::fcntl::fcntl(self.fd, nix::fcntl::FcntlArg::F_SETFL(self.original_flags));
171 }
172}
173
174struct RawModeGuard {
175 fd: BorrowedFd<'static>,
176 original: Termios,
177}
178
179impl RawModeGuard {
180 fn enter(fd: BorrowedFd<'static>) -> nix::Result<Self> {
181 let original = termios::tcgetattr(fd)?;
182 let mut raw = original.clone();
183 termios::cfmakeraw(&mut raw);
184 termios::tcsetattr(fd, SetArg::TCSAFLUSH, &raw)?;
185 Ok(Self { fd, original })
186 }
187}
188
189impl Drop for RawModeGuard {
190 fn drop(&mut self) {
191 let _ = termios::tcsetattr(self.fd, SetArg::TCSAFLUSH, &self.original);
192 }
193}
194
195fn write_stdout(data: &[u8]) -> io::Result<()> {
199 let mut stdout = io::stdout();
200 let mut written = 0;
201 while written < data.len() {
202 match stdout.write(&data[written..]) {
203 Ok(n) => written += n,
204 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
205 std::thread::yield_now();
206 }
207 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
208 Err(e) => return Err(e),
209 }
210 }
211 loop {
212 match stdout.flush() {
213 Ok(()) => return Ok(()),
214 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
215 std::thread::yield_now();
216 }
217 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
218 Err(e) => return Err(e),
219 }
220 }
221}
222
223fn get_terminal_size() -> (u16, u16) {
224 let mut ws: libc::winsize = unsafe { std::mem::zeroed() };
225 unsafe { libc::ioctl(libc::STDIN_FILENO, libc::TIOCGWINSZ, &mut ws) };
226 (ws.ws_col, ws.ws_row)
227}
228
229async fn timed_send(framed: &mut Framed<UnixStream, FrameCodec>, frame: Frame) -> bool {
231 match tokio::time::timeout(SEND_TIMEOUT, framed.send(frame)).await {
232 Ok(Ok(())) => true,
233 Ok(Err(e)) => {
234 debug!("send error: {e}");
235 false
236 }
237 Err(_) => {
238 debug!("send timed out");
239 false
240 }
241 }
242}
243
244const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
245const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(15);
246
247enum AgentEvent {
249 Data { channel_id: u32, data: Bytes },
250 Closed { channel_id: u32 },
251}
252
253async fn send_init_frames(
256 framed: &mut Framed<UnixStream, FrameCodec>,
257 env_vars: &[(String, String)],
258 forward_agent: bool,
259 agent_socket: Option<&str>,
260 forward_open: bool,
261 redraw: bool,
262) -> bool {
263 if !env_vars.is_empty() && !timed_send(framed, Frame::Env { vars: env_vars.to_vec() }).await {
264 return false;
265 }
266 if forward_agent && agent_socket.is_some() && !timed_send(framed, Frame::AgentForward).await {
267 return false;
268 }
269 if forward_open && !timed_send(framed, Frame::OpenForward).await {
270 return false;
271 }
272 let (cols, rows) = get_terminal_size();
273 if !timed_send(framed, Frame::Resize { cols, rows }).await {
274 return false;
275 }
276 if redraw && !timed_send(framed, Frame::Data(Bytes::from_static(b"\x0c"))).await {
277 return false;
278 }
279 true
280}
281
282#[allow(clippy::too_many_arguments)]
285async fn relay(
286 framed: &mut Framed<UnixStream, FrameCodec>,
287 async_stdin: &AsyncFd<io::Stdin>,
288 sigwinch: &mut tokio::signal::unix::Signal,
289 buf: &mut [u8],
290 mut escape: Option<&mut EscapeProcessor>,
291 raw_guard: &RawModeGuard,
292 nb_guard: &NonBlockGuard,
293 agent_socket: Option<&str>,
294) -> anyhow::Result<Option<i32>> {
295 let mut sigterm = signal(SignalKind::terminate())?;
296 let mut sighup = signal(SignalKind::hangup())?;
297
298 let mut heartbeat_interval = tokio::time::interval(HEARTBEAT_INTERVAL);
299 heartbeat_interval.reset(); let mut last_pong = Instant::now();
301
302 let mut agent_channels: HashMap<u32, mpsc::UnboundedSender<Bytes>> = HashMap::new();
304 let (agent_event_tx, mut agent_event_rx) = mpsc::unbounded_channel::<AgentEvent>();
305
306 loop {
307 tokio::select! {
308 ready = async_stdin.readable() => {
309 let mut guard = ready?;
310 match guard.try_io(|inner| inner.get_ref().read(buf)) {
311 Ok(Ok(0)) => {
312 debug!("stdin EOF");
313 return Ok(Some(0));
314 }
315 Ok(Ok(n)) => {
316 debug!(len = n, "stdin → socket");
317 if let Some(ref mut esc) = escape {
318 for action in esc.process(&buf[..n]) {
319 match action {
320 EscapeAction::Data(data) => {
321 if !timed_send(framed, Frame::Data(Bytes::from(data))).await {
322 return Ok(None);
323 }
324 }
325 EscapeAction::Detach => {
326 write_stdout(b"\r\n[detached]\r\n")?;
327 return Ok(Some(0));
328 }
329 EscapeAction::Suspend => {
330 suspend(raw_guard, nb_guard)?;
331 let (cols, rows) = get_terminal_size();
333 if !timed_send(framed, Frame::Resize { cols, rows }).await {
334 return Ok(None);
335 }
336 }
337 EscapeAction::Help => {
338 write_stdout(ESCAPE_HELP)?;
339 }
340 }
341 }
342 } else if !timed_send(framed, Frame::Data(Bytes::copy_from_slice(&buf[..n]))).await {
343 return Ok(None);
344 }
345 }
346 Ok(Err(e)) => return Err(e.into()),
347 Err(_would_block) => continue,
348 }
349 }
350
351 frame = framed.next() => {
352 match frame {
353 Some(Ok(Frame::Data(data))) => {
354 debug!(len = data.len(), "socket → stdout");
355 write_stdout(&data)?;
356 }
357 Some(Ok(Frame::Pong)) => {
358 debug!("pong received");
359 last_pong = Instant::now();
360 }
361 Some(Ok(Frame::Exit { code })) => {
362 info!(code, "server sent exit");
363 return Ok(Some(code));
364 }
365 Some(Ok(Frame::Detached)) => {
366 info!("detached by another client");
367 write_stdout(b"[detached]\r\n")?;
368 return Ok(Some(0));
369 }
370 Some(Ok(Frame::AgentOpen { channel_id })) => {
371 if let Some(sock_path) = agent_socket {
372 match tokio::net::UnixStream::connect(sock_path).await {
373 Ok(stream) => {
374 let (read_half, write_half) = stream.into_split();
375 let data_tx = agent_event_tx.clone();
376 let close_tx = agent_event_tx.clone();
377 let writer_tx = crate::spawn_channel_relay(
378 channel_id,
379 read_half,
380 write_half,
381 move |id, data| data_tx.send(AgentEvent::Data { channel_id: id, data }).is_ok(),
382 move |id| { let _ = close_tx.send(AgentEvent::Closed { channel_id: id }); },
383 );
384 agent_channels.insert(channel_id, writer_tx);
385 }
386 Err(e) => {
387 debug!("failed to connect to local agent: {e}");
388 let _ = timed_send(framed, Frame::AgentClose { channel_id }).await;
389 }
390 }
391 } else {
392 let _ = timed_send(framed, Frame::AgentClose { channel_id }).await;
393 }
394 }
395 Some(Ok(Frame::AgentData { channel_id, data })) => {
396 if let Some(tx) = agent_channels.get(&channel_id) {
397 let _ = tx.send(data);
398 }
399 }
400 Some(Ok(Frame::AgentClose { channel_id })) => {
401 agent_channels.remove(&channel_id);
402 }
403 Some(Ok(Frame::OpenUrl { url })) => {
404 debug!("opening URL locally: {url}");
405 let cmd = if cfg!(target_os = "macos") { "open" } else { "xdg-open" };
406 let _ = std::process::Command::new(cmd)
407 .arg(&url)
408 .stdin(std::process::Stdio::null())
409 .stdout(std::process::Stdio::null())
410 .stderr(std::process::Stdio::null())
411 .spawn();
412 }
413 Some(Ok(_)) => {} Some(Err(e)) => {
415 debug!("server connection error: {e}");
416 return Ok(None);
417 }
418 None => {
419 debug!("server disconnected");
420 return Ok(None);
421 }
422 }
423 }
424
425 event = agent_event_rx.recv() => {
427 match event {
428 Some(AgentEvent::Data { channel_id, data }) => {
429 if agent_channels.contains_key(&channel_id)
430 && !timed_send(framed, Frame::AgentData { channel_id, data }).await
431 {
432 return Ok(None);
433 }
434 }
435 Some(AgentEvent::Closed { channel_id }) => {
436 if agent_channels.remove(&channel_id).is_some()
437 && !timed_send(framed, Frame::AgentClose { channel_id }).await
438 {
439 return Ok(None);
440 }
441 }
442 None => {} }
444 }
445
446 _ = sigwinch.recv() => {
447 let (cols, rows) = get_terminal_size();
448 debug!(cols, rows, "SIGWINCH → resize");
449 if !timed_send(framed, Frame::Resize { cols, rows }).await {
450 return Ok(None);
451 }
452 }
453
454 _ = heartbeat_interval.tick() => {
455 if last_pong.elapsed() > HEARTBEAT_TIMEOUT {
456 debug!("heartbeat timeout");
457 return Ok(None);
458 }
459 if !timed_send(framed, Frame::Ping).await {
460 return Ok(None);
461 }
462 }
463
464 _ = sigterm.recv() => {
465 debug!("SIGTERM received, exiting");
466 return Ok(Some(1));
467 }
468
469 _ = sighup.recv() => {
470 debug!("SIGHUP received, exiting");
471 return Ok(Some(1));
472 }
473 }
474 }
475}
476
477#[allow(clippy::too_many_arguments)]
478pub async fn run(
479 session: &str,
480 mut framed: Framed<UnixStream, FrameCodec>,
481 redraw: bool,
482 ctl_path: &Path,
483 env_vars: Vec<(String, String)>,
484 no_escape: bool,
485 forward_agent: bool,
486 forward_open: bool,
487) -> anyhow::Result<i32> {
488 let stdin = io::stdin();
489 let stdin_fd = stdin.as_fd();
490 let stdin_borrowed: BorrowedFd<'static> =
492 unsafe { BorrowedFd::borrow_raw(stdin_fd.as_raw_fd()) };
493 let raw_guard = RawModeGuard::enter(stdin_borrowed)?;
494
495 let nb_guard = NonBlockGuard::set(stdin_borrowed)?;
498 let async_stdin = AsyncFd::new(io::stdin())?;
499 let mut sigwinch = signal(SignalKind::window_change())?;
500 let mut buf = vec![0u8; 4096];
501 let mut current_redraw = redraw;
502 let mut current_env = env_vars;
503 let mut escape = if no_escape { None } else { Some(EscapeProcessor::new()) };
504 let agent_socket = if forward_agent { std::env::var("SSH_AUTH_SOCK").ok() } else { None };
505
506 loop {
507 let result = if send_init_frames(
508 &mut framed,
509 ¤t_env,
510 forward_agent,
511 agent_socket.as_deref(),
512 forward_open,
513 current_redraw,
514 )
515 .await
516 {
517 relay(
518 &mut framed,
519 &async_stdin,
520 &mut sigwinch,
521 &mut buf,
522 escape.as_mut(),
523 &raw_guard,
524 &nb_guard,
525 agent_socket.as_deref(),
526 )
527 .await?
528 } else {
529 None
530 };
531 match result {
532 Some(code) => return Ok(code),
533 None => {
534 current_env.clear();
536 write_stdout(b"[reconnecting...]\r\n")?;
538
539 loop {
540 tokio::time::sleep(Duration::from_secs(1)).await;
541
542 {
544 let mut peek = [0u8; 1];
545 match io::stdin().read(&mut peek) {
546 Ok(1) if peek[0] == 0x03 => {
547 write_stdout(b"\r\n")?;
548 return Ok(1);
549 }
550 _ => {}
551 }
552 }
553
554 let stream = match UnixStream::connect(ctl_path).await {
555 Ok(s) => s,
556 Err(_) => continue,
557 };
558
559 let mut new_framed = Framed::new(stream, FrameCodec);
560 if new_framed
561 .send(Frame::Attach { session: session.to_string() })
562 .await
563 .is_err()
564 {
565 continue;
566 }
567
568 match new_framed.next().await {
569 Some(Ok(Frame::Ok)) => {
570 write_stdout(b"[reconnected]\r\n")?;
571 framed = new_framed;
572 current_redraw = true;
573 break;
574 }
575 Some(Ok(Frame::Error { message })) => {
576 let msg = format!("[session gone: {message}]\r\n");
577 write_stdout(msg.as_bytes())?;
578 return Ok(1);
579 }
580 _ => continue,
581 }
582 }
583 }
584 }
585 }
586}
587
588pub async fn tail(
592 session: &str,
593 mut framed: Framed<UnixStream, FrameCodec>,
594 ctl_path: &Path,
595) -> anyhow::Result<i32> {
596 let mut heartbeat_interval = tokio::time::interval(HEARTBEAT_INTERVAL);
597 heartbeat_interval.reset();
598 let mut last_pong = Instant::now();
599 let mut sigterm = signal(SignalKind::terminate())?;
600 let mut sighup = signal(SignalKind::hangup())?;
601
602 loop {
603 let result = 'relay: loop {
604 tokio::select! {
605 frame = framed.next() => {
606 match frame {
607 Some(Ok(Frame::Data(data))) => {
608 write_stdout(&data)?;
609 }
610 Some(Ok(Frame::Pong)) => {
611 last_pong = Instant::now();
612 }
613 Some(Ok(Frame::Exit { code })) => {
614 break 'relay Some(code);
615 }
616 Some(Ok(_)) => {}
617 Some(Err(e)) => {
618 debug!("tail connection error: {e}");
619 break 'relay None;
620 }
621 None => {
622 debug!("tail server disconnected");
623 break 'relay None;
624 }
625 }
626 }
627 _ = heartbeat_interval.tick() => {
628 if last_pong.elapsed() > HEARTBEAT_TIMEOUT {
629 debug!("tail heartbeat timeout");
630 break 'relay None;
631 }
632 if framed.send(Frame::Ping).await.is_err() {
633 break 'relay None;
634 }
635 }
636 _ = sigterm.recv() => {
637 break 'relay Some(1);
638 }
639 _ = sighup.recv() => {
640 break 'relay Some(1);
641 }
642 }
643 };
644
645 match result {
646 Some(code) => return Ok(code),
647 None => {
648 eprintln!("[reconnecting...]");
649 loop {
650 tokio::time::sleep(Duration::from_secs(1)).await;
651
652 let stream = match UnixStream::connect(ctl_path).await {
653 Ok(s) => s,
654 Err(_) => continue,
655 };
656
657 let mut new_framed = Framed::new(stream, FrameCodec);
658 if new_framed.send(Frame::Tail { session: session.to_string() }).await.is_err()
659 {
660 continue;
661 }
662
663 match new_framed.next().await {
664 Some(Ok(Frame::Ok)) => {
665 eprintln!("[reconnected]");
666 framed = new_framed;
667 heartbeat_interval.reset();
668 last_pong = Instant::now();
669 break;
670 }
671 Some(Ok(Frame::Error { message })) => {
672 eprintln!("[session gone: {message}]");
673 return Ok(1);
674 }
675 _ => continue,
676 }
677 }
678 }
679 }
680 }
681}
682
683#[cfg(test)]
684mod tests {
685 use super::*;
686
687 #[test]
688 fn normal_passthrough() {
689 let mut ep = EscapeProcessor::new();
690 let actions = ep.process(b"hello");
692 assert_eq!(actions, vec![EscapeAction::Data(b"hello".to_vec())]);
693 }
694
695 #[test]
696 fn tilde_after_newline_detach() {
697 let mut ep = EscapeProcessor { state: EscapeState::Normal };
698 let actions = ep.process(b"\n~.");
699 assert_eq!(actions, vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Detach,]);
700 }
701
702 #[test]
703 fn tilde_after_cr_detach() {
704 let mut ep = EscapeProcessor { state: EscapeState::Normal };
705 let actions = ep.process(b"\r~.");
706 assert_eq!(actions, vec![EscapeAction::Data(b"\r".to_vec()), EscapeAction::Detach,]);
707 }
708
709 #[test]
710 fn tilde_not_after_newline() {
711 let mut ep = EscapeProcessor { state: EscapeState::Normal };
712 let actions = ep.process(b"a~.");
713 assert_eq!(actions, vec![EscapeAction::Data(b"a~.".to_vec())]);
714 }
715
716 #[test]
717 fn initial_state_detach() {
718 let mut ep = EscapeProcessor::new();
719 let actions = ep.process(b"~.");
720 assert_eq!(actions, vec![EscapeAction::Detach]);
721 }
722
723 #[test]
724 fn tilde_suspend() {
725 let mut ep = EscapeProcessor { state: EscapeState::Normal };
726 let actions = ep.process(b"\n~\x1a");
727 assert_eq!(actions, vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Suspend,]);
728 }
729
730 #[test]
731 fn tilde_help() {
732 let mut ep = EscapeProcessor { state: EscapeState::Normal };
733 let actions = ep.process(b"\n~?");
734 assert_eq!(actions, vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Help,]);
735 }
736
737 #[test]
738 fn double_tilde() {
739 let mut ep = EscapeProcessor { state: EscapeState::Normal };
740 let actions = ep.process(b"\n~~");
741 assert_eq!(
742 actions,
743 vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Data(b"~".to_vec()),]
744 );
745 assert_eq!(ep.state, EscapeState::Normal);
746 }
747
748 #[test]
749 fn tilde_unknown_char() {
750 let mut ep = EscapeProcessor { state: EscapeState::Normal };
751 let actions = ep.process(b"\n~x");
752 assert_eq!(
753 actions,
754 vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Data(b"~x".to_vec()),]
755 );
756 }
757
758 #[test]
759 fn split_across_reads() {
760 let mut ep = EscapeProcessor { state: EscapeState::Normal };
761 let a1 = ep.process(b"\n");
762 assert_eq!(a1, vec![EscapeAction::Data(b"\n".to_vec())]);
763 let a2 = ep.process(b"~");
764 assert_eq!(a2, vec![]); let a3 = ep.process(b".");
766 assert_eq!(a3, vec![EscapeAction::Detach]);
767 }
768
769 #[test]
770 fn split_tilde_then_normal() {
771 let mut ep = EscapeProcessor { state: EscapeState::Normal };
772 let a1 = ep.process(b"\n");
773 assert_eq!(a1, vec![EscapeAction::Data(b"\n".to_vec())]);
774 let a2 = ep.process(b"~");
775 assert_eq!(a2, vec![]);
776 let a3 = ep.process(b"a");
777 assert_eq!(a3, vec![EscapeAction::Data(b"~a".to_vec())]);
778 }
779
780 #[test]
781 fn multiple_escapes_one_buffer() {
782 let mut ep = EscapeProcessor { state: EscapeState::Normal };
783 let actions = ep.process(b"\n~?\n~.");
784 assert_eq!(
785 actions,
786 vec![
787 EscapeAction::Data(b"\n".to_vec()),
788 EscapeAction::Help,
789 EscapeAction::Data(b"\n".to_vec()),
790 EscapeAction::Detach,
791 ]
792 );
793 }
794
795 #[test]
796 fn consecutive_newlines() {
797 let mut ep = EscapeProcessor { state: EscapeState::Normal };
798 let actions = ep.process(b"\n\n\n~.");
799 assert_eq!(actions, vec![EscapeAction::Data(b"\n\n\n".to_vec()), EscapeAction::Detach,]);
800 }
801
802 #[test]
803 fn detach_stops_processing() {
804 let mut ep = EscapeProcessor { state: EscapeState::Normal };
805 let actions = ep.process(b"\n~.remaining");
806 assert_eq!(actions, vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Detach,]);
807 }
808
809 #[test]
810 fn tilde_then_newline() {
811 let mut ep = EscapeProcessor { state: EscapeState::Normal };
812 let actions = ep.process(b"\n~\n");
813 assert_eq!(
814 actions,
815 vec![EscapeAction::Data(b"\n".to_vec()), EscapeAction::Data(b"~\n".to_vec()),]
816 );
817 assert_eq!(ep.state, EscapeState::AfterNewline);
818 }
819
820 #[test]
821 fn empty_input() {
822 let mut ep = EscapeProcessor::new();
823 let actions = ep.process(b"");
824 assert_eq!(actions, vec![]);
825 }
826
827 #[test]
828 fn only_tilde_buffered() {
829 let mut ep = EscapeProcessor { state: EscapeState::Normal };
830 let a1 = ep.process(b"\n~");
831 assert_eq!(a1, vec![EscapeAction::Data(b"\n".to_vec())]);
832 assert_eq!(ep.state, EscapeState::AfterTilde);
833 let a2 = ep.process(b".");
834 assert_eq!(a2, vec![EscapeAction::Detach]);
835 }
836}