1use std::io::{self, Read, Write};
2use std::os::unix::io::RawFd;
3use std::os::unix::net::{UnixListener, UnixStream};
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7use std::thread;
8use std::time::{Duration, Instant};
9
10use nix::fcntl::{fcntl, FcntlArg, OFlag};
11use nix::sys::signal::{kill, Signal};
12use nix::sys::termios::Termios;
13use nix::unistd::{close, dup2, execvp, fork, setsid, ForkResult, Pid};
14
15use super::client::ClientInfo;
16use super::health_monitor::{attempt_recovery, HealthMonitor, RecoveryStrategy};
17use super::io_handler::{
18 spawn_resize_monitor_thread, spawn_socket_to_stdout_thread, PtyIoHandler, ScrollbackHandler,
19 DEFAULT_BUFFER_SIZE,
20};
21use super::session_switcher::{SessionSwitcher, SwitchResult};
22use super::socket::{create_listener, get_command_end, parse_nds_command, send_resize_command};
23use super::terminal::{
24 capture_terminal_state, get_terminal_size, restore_terminal, save_terminal_state, send_refresh,
25 send_terminal_refresh_sequences, set_raw_mode, set_stdin_blocking, set_terminal_size,
26};
27use crate::error::{NdsError, Result};
28use crate::pty_buffer::PtyBuffer;
29use crate::scrollback::ScrollbackViewer;
30use crate::session::Session;
31
32#[derive(Debug, Clone)]
33struct TerminalModeTracker {
34 cursor_visible: bool,
35 application_cursor_keys: bool,
36 alternate_screen: bool,
37 bracketed_paste: bool,
38 tail: Vec<u8>,
39}
40
41impl Default for TerminalModeTracker {
42 fn default() -> Self {
43 TerminalModeTracker {
44 cursor_visible: true,
45 application_cursor_keys: false,
46 alternate_screen: false,
47 bracketed_paste: false,
48 tail: Vec::with_capacity(16),
49 }
50 }
51}
52
53impl TerminalModeTracker {
54 fn observe(&mut self, chunk: &[u8]) {
55 if chunk.is_empty() {
56 return;
57 }
58
59 let mut changes = Vec::new();
60 self.scan(chunk, &mut changes);
61
62 if !self.tail.is_empty() {
63 let mut combined = Vec::with_capacity(self.tail.len() + chunk.len());
64 combined.extend_from_slice(&self.tail);
65 combined.extend_from_slice(chunk);
66 self.scan(&combined, &mut changes);
67 }
68
69 const MAX_TAIL: usize = 7; self.tail.clear();
71 let take = chunk.len().min(MAX_TAIL);
72 self.tail.extend_from_slice(&chunk[chunk.len() - take..]);
73
74 if trace_enabled() && !changes.is_empty() {
75 trace(|| format!("observed sequences: {}", changes.join(", ")));
76 }
77 }
78
79 fn scan(&mut self, data: &[u8], changes: &mut Vec<&'static str>) {
80 if contains_sequence(data, b"\x1b[?25l") {
81 if self.cursor_visible {
82 self.cursor_visible = false;
83 changes.push("?25l");
84 }
85 }
86 if contains_sequence(data, b"\x1b[?25h") {
87 if !self.cursor_visible {
88 self.cursor_visible = true;
89 changes.push("?25h");
90 }
91 }
92 if contains_sequence(data, b"\x1b[?1h") {
93 if !self.application_cursor_keys {
94 self.application_cursor_keys = true;
95 changes.push("?1h");
96 }
97 }
98 if contains_sequence(data, b"\x1b[?1l") {
99 if self.application_cursor_keys {
100 self.application_cursor_keys = false;
101 changes.push("?1l");
102 }
103 }
104 if contains_sequence(data, b"\x1b[?1049h") || contains_sequence(data, b"\x1b[?47h") {
105 if !self.alternate_screen {
106 self.alternate_screen = true;
107 changes.push("?1049h");
108 }
109 }
110 if contains_sequence(data, b"\x1b[?1049l") || contains_sequence(data, b"\x1b[?47l") {
111 if self.alternate_screen {
112 self.alternate_screen = false;
113 changes.push("?1049l");
114 }
115 }
116 if contains_sequence(data, b"\x1b[?2004h") {
117 if !self.bracketed_paste {
118 self.bracketed_paste = true;
119 changes.push("?2004h");
120 }
121 }
122 if contains_sequence(data, b"\x1b[?2004l") {
123 if self.bracketed_paste {
124 self.bracketed_paste = false;
125 changes.push("?2004l");
126 }
127 }
128 }
129
130 fn apply_to_client(&self, client: &mut ClientInfo) -> io::Result<()> {
131 let mut seq = Vec::new();
132 let mut applied = Vec::new();
133
134 if self.alternate_screen {
135 push_sequence(&mut seq, &mut applied, b"\x1b[?1049h", "?1049h");
136 } else {
137 push_sequence(&mut seq, &mut applied, b"\x1b[?1049l", "?1049l");
138 }
139
140 if self.bracketed_paste {
141 push_sequence(&mut seq, &mut applied, b"\x1b[?2004h", "?2004h");
142 } else {
143 push_sequence(&mut seq, &mut applied, b"\x1b[?2004l", "?2004l");
144 }
145
146 if self.application_cursor_keys {
147 push_sequence(&mut seq, &mut applied, b"\x1b[?1h", "?1h");
148 } else {
149 push_sequence(&mut seq, &mut applied, b"\x1b[?1l", "?1l");
150 }
151
152 if self.cursor_visible {
153 push_sequence(&mut seq, &mut applied, b"\x1b[?25h", "?25h");
154 } else {
155 push_sequence(&mut seq, &mut applied, b"\x1b[?25l", "?25l");
156 }
157
158 if !seq.is_empty() {
159 client.send_data(&seq)?;
160 client.flush_pending()?;
161 }
162
163 if trace_enabled() && !applied.is_empty() {
164 trace(|| format!("reapplied to client {}: {}", client.id, applied.join(", ")));
165 }
166
167 Ok(())
168 }
169}
170
171fn contains_sequence(haystack: &[u8], needle: &[u8]) -> bool {
172 if needle.is_empty() || needle.len() > haystack.len() {
173 return false;
174 }
175
176 haystack
177 .windows(needle.len())
178 .any(|window| window == needle)
179}
180
181fn send_buffered_output_to_client(
182 client: &mut ClientInfo,
183 output_buffer: &PtyBuffer,
184 io_handler: &PtyIoHandler,
185) -> io::Result<()> {
186 if !output_buffer.is_empty() {
187 let mut buffered_data = Vec::new();
188 output_buffer.drain_to(&mut buffered_data);
189
190 if !buffered_data.is_empty() {
191 client.send_data(&buffered_data)?;
192 client.flush_pending()?;
193
194 if trace_enabled() {
195 trace(|| {
196 format!(
197 "replayed {} bytes of scrollback to client {}",
198 buffered_data.len(),
199 client.id
200 )
201 });
202 }
203 }
204
205 io_handler.send_refresh()?;
207 } else {
208 io_handler.send_refresh()?;
209 }
210
211 Ok(())
212}
213
214fn push_sequence(
215 seq: &mut Vec<u8>,
216 applied: &mut Vec<&'static str>,
217 bytes: &[u8],
218 label: &'static str,
219) {
220 seq.extend_from_slice(bytes);
221 applied.push(label);
222}
223
224fn trace_enabled() -> bool {
225 static TRACE: OnceLock<bool> = OnceLock::new();
226 *TRACE.get_or_init(|| {
227 std::env::var("NDS_TRACE_TERMINAL")
228 .map(|v| !v.is_empty() && v != "0")
229 .unwrap_or(false)
230 })
231}
232
233fn trace<F>(msg: F)
234where
235 F: FnOnce() -> String,
236{
237 if trace_enabled() {
238 eprintln!("[NDS trace] {}", msg());
239 }
240}
241
242pub struct PtyProcess {
243 pub master_fd: RawFd,
244 pub pid: Pid,
245 pub socket_path: PathBuf,
246 listener: Option<UnixListener>,
247 output_buffer: Option<PtyBuffer>,
248 #[allow(dead_code)]
249 shell_pid: Option<Pid>, #[allow(dead_code)]
251 session_id: String, }
253
254impl PtyProcess {
255 fn open_pty() -> Result<(RawFd, RawFd)> {
257 unsafe {
258 let master_fd = libc::posix_openpt(libc::O_RDWR | libc::O_NOCTTY);
260 if master_fd < 0 {
261 return Err(NdsError::PtyError("Failed to open PTY master".to_string()));
262 }
263
264 if libc::grantpt(master_fd) < 0 {
266 let _ = libc::close(master_fd);
267 return Err(NdsError::PtyError("Failed to grant PTY access".to_string()));
268 }
269
270 if libc::unlockpt(master_fd) < 0 {
272 let _ = libc::close(master_fd);
273 return Err(NdsError::PtyError("Failed to unlock PTY".to_string()));
274 }
275
276 let slave_name = libc::ptsname(master_fd);
278 if slave_name.is_null() {
279 let _ = libc::close(master_fd);
280 return Err(NdsError::PtyError(
281 "Failed to get PTY slave name".to_string(),
282 ));
283 }
284
285 let slave_cstr = std::ffi::CStr::from_ptr(slave_name);
287 let slave_fd = libc::open(slave_cstr.as_ptr(), libc::O_RDWR);
288 if slave_fd < 0 {
289 let _ = libc::close(master_fd);
290 return Err(NdsError::PtyError("Failed to open PTY slave".to_string()));
291 }
292
293 Ok((master_fd, slave_fd))
294 }
295 }
296
297 pub fn spawn_new_detached(session_id: &str) -> Result<Session> {
299 Self::spawn_new_detached_with_name(session_id, None)
300 }
301
302 pub fn spawn_new_detached_with_name(session_id: &str, name: Option<String>) -> Result<Session> {
304 let (cols, rows) = get_terminal_size().unwrap_or((80, 24));
306
307 match unsafe { fork() }
309 .map_err(|e| NdsError::ForkError(format!("First fork failed: {}", e)))?
310 {
311 ForkResult::Parent { child: _ } => {
312 thread::sleep(Duration::from_millis(200));
314
315 Session::load(session_id)
317 }
318 ForkResult::Child => {
319 setsid().map_err(|e| NdsError::ProcessError(format!("setsid failed: {}", e)))?;
322
323 match unsafe { fork() }
325 .map_err(|e| NdsError::ForkError(format!("Second fork failed: {}", e)))?
326 {
327 ForkResult::Parent { child: _ } => {
328 std::process::exit(0);
330 }
331 ForkResult::Child => {
332 unsafe {
335 libc::close(0);
336 libc::close(1);
337 libc::close(2);
338
339 let dev_null = libc::open(
341 b"/dev/null\0".as_ptr() as *const libc::c_char,
342 libc::O_RDWR,
343 );
344 if dev_null >= 0 {
345 libc::dup2(dev_null, 0);
346 libc::dup2(dev_null, 1);
347 libc::dup2(dev_null, 2);
348 if dev_null > 2 {
349 libc::close(dev_null);
350 }
351 }
352 }
353
354 let daemon_pid = std::process::id() as i32;
356
357 let (pty_process, _session) = Self::spawn_new_internal_with_size(
359 session_id, name, cols, rows, daemon_pid,
360 )?;
361
362 if let Err(_e) = pty_process.run_detached() {
364 }
366
367 Session::cleanup(session_id).ok();
369 std::process::exit(0);
370 }
371 }
372 }
373 }
374 }
375
376 fn spawn_new_internal_with_size(
377 session_id: &str,
378 name: Option<String>,
379 cols: u16,
380 rows: u16,
381 daemon_pid: i32,
382 ) -> Result<(Self, Session)> {
383 let (master_fd, slave_fd) = Self::open_pty()?;
385
386 set_terminal_size(slave_fd, cols, rows)?;
388
389 let flags = fcntl(master_fd, FcntlArg::F_GETFL)
391 .map_err(|e| NdsError::PtyError(format!("Failed to get flags: {}", e)))?;
392 fcntl(
393 master_fd,
394 FcntlArg::F_SETFL(OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK),
395 )
396 .map_err(|e| NdsError::PtyError(format!("Failed to set non-blocking: {}", e)))?;
397
398 let (listener, socket_path) = create_listener(session_id)?;
400
401 match unsafe { fork() }.map_err(|e| NdsError::ForkError(e.to_string()))? {
403 ForkResult::Parent { child } => {
404 let _ = close(slave_fd);
406
407 let session = Session::with_name(
410 session_id.to_string(),
411 name,
412 daemon_pid, socket_path.clone(),
414 );
415 session.save().map_err(|e| {
416 eprintln!("Failed to save session: {}", e);
417 e
418 })?;
419
420 let pty_process = PtyProcess {
421 master_fd,
422 pid: child,
423 socket_path,
424 listener: Some(listener),
425 output_buffer: Some(PtyBuffer::new(2 * 1024 * 1024)), shell_pid: Some(child), session_id: session_id.to_string(),
428 };
429
430 Ok((pty_process, session))
431 }
432 ForkResult::Child => {
433 let _ = close(master_fd);
435
436 setsid().map_err(|e| NdsError::ProcessError(format!("setsid failed: {}", e)))?;
438
439 unsafe {
441 if libc::ioctl(slave_fd, libc::TIOCSCTTY as u64, 0) < 0 {
442 eprintln!("Failed to set controlling terminal");
443 std::process::exit(1);
444 }
445 }
446
447 dup2(slave_fd, 0)
449 .map_err(|e| NdsError::ProcessError(format!("dup2 stdin failed: {}", e)))?;
450 dup2(slave_fd, 1)
451 .map_err(|e| NdsError::ProcessError(format!("dup2 stdout failed: {}", e)))?;
452 dup2(slave_fd, 2)
453 .map_err(|e| NdsError::ProcessError(format!("dup2 stderr failed: {}", e)))?;
454
455 if slave_fd > 2 {
457 let _ = close(slave_fd);
458 }
459
460 std::env::set_var("NDS_SESSION_ID", session_id);
462 if let Some(ref session_name) = name {
463 std::env::set_var("NDS_SESSION_NAME", session_name);
464 } else {
465 std::env::set_var("NDS_SESSION_NAME", session_id);
466 }
467
468 unsafe {
470 libc::umask(0o077); }
472
473 let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_string());
475
476 let shell_cstr = std::ffi::CString::new(shell.as_str()).unwrap();
478 let args = vec![shell_cstr.clone()];
479
480 execvp(&shell_cstr, &args)
481 .map_err(|e| NdsError::ProcessError(format!("execvp failed: {}", e)))?;
482
483 unreachable!()
485 }
486 }
487 }
488
489 pub fn attach_to_session(session: &Session) -> Result<Option<String>> {
491 std::env::set_var("NDS_SESSION_ID", &session.id);
493 std::env::set_var(
494 "NDS_SESSION_NAME",
495 session.name.as_ref().unwrap_or(&session.id),
496 );
497
498 let stdin_fd = 0;
500 let original_termios = save_terminal_state(stdin_fd)?;
501
502 let _terminal_state = capture_terminal_state(stdin_fd)?;
504
505 let mut socket = session.connect_socket()?;
507
508 let (cols, rows) = get_terminal_size()?;
510 send_resize_command(&mut socket, cols, rows)?;
511 thread::sleep(Duration::from_millis(50));
512
513 let running = Arc::new(AtomicBool::new(true));
519 let r1 = running.clone();
520 let r2 = running.clone();
521
522 let paused = Arc::new(AtomicBool::new(false));
524 let paused_clone = paused.clone();
525
526 ctrlc::set_handler(move || {
528 r1.store(false, Ordering::SeqCst);
529 })
530 .map_err(|e| NdsError::SignalError(format!("Failed to set signal handler: {}", e)))?;
531
532 set_raw_mode(stdin_fd, &original_termios)?;
534
535 let scrollback = ScrollbackHandler::new(10 * 1024 * 1024); let socket_for_resize = socket
545 .try_clone()
546 .map_err(|e| NdsError::SocketError(format!("Failed to clone socket: {}", e)))?;
547 let resize_running = running.clone();
548 let _resize_monitor =
549 spawn_resize_monitor_thread(socket_for_resize, resize_running, (cols, rows));
550
551 let socket_clone = socket
553 .try_clone()
554 .map_err(|e| NdsError::SocketError(format!("Failed to clone socket: {}", e)))?;
555 let socket_to_stdout = spawn_socket_to_stdout_thread(
556 socket_clone,
557 r2,
558 scrollback.get_shared_buffer(),
559 paused_clone,
560 );
561
562 let result = Self::handle_input_loop(
567 &mut socket,
568 session,
569 &original_termios,
570 &running,
571 &scrollback,
572 &paused,
573 );
574
575 running.store(false, Ordering::SeqCst);
577 let _ = socket.shutdown(std::net::Shutdown::Both);
578 drop(socket);
579 thread::sleep(Duration::from_millis(50));
580 let _ = socket_to_stdout.join();
581
582 restore_terminal(stdin_fd, &original_termios)?;
584
585 set_stdin_blocking(stdin_fd)?;
587
588 std::env::remove_var("NDS_SESSION_ID");
590 std::env::remove_var("NDS_SESSION_NAME");
591
592 println!("\n[Detached from session {}]", session.id);
593 let _ = io::stdout().flush();
594
595 result
596 }
597
598 fn handle_input_loop(
599 socket: &mut UnixStream,
600 session: &Session,
601 original_termios: &Termios,
602 running: &Arc<AtomicBool>,
603 scrollback: &ScrollbackHandler,
604 paused: &Arc<AtomicBool>,
605 ) -> Result<Option<String>> {
606 let stdin_fd = 0i32;
607 let mut buffer = [0u8; 1024]; let mut at_line_start = true;
611 let mut escape_state = 0; let mut escape_time = Instant::now();
613
614 use nix::poll::{poll, PollFd, PollFlags, PollTimeout};
616
617 loop {
618 if !running.load(Ordering::SeqCst) {
619 break;
620 }
621
622 use std::os::unix::io::BorrowedFd;
624 let stdin_borrowed = unsafe { BorrowedFd::borrow_raw(stdin_fd) };
625 let mut poll_fds = [PollFd::new(stdin_borrowed, PollFlags::POLLIN)];
626 let poll_result = poll(&mut poll_fds, PollTimeout::try_from(10).unwrap()); match poll_result {
629 Ok(0) => {
630 continue;
632 }
633 Ok(_) => {
634 let read_result = unsafe {
636 libc::read(
637 stdin_fd,
638 buffer.as_mut_ptr() as *mut libc::c_void,
639 buffer.len(),
640 )
641 };
642
643 match read_result {
644 0 => {
645 running.store(false, Ordering::SeqCst);
648 break;
649 }
650 n if n > 0 => {
651 let n = n as usize;
652 let (should_detach, should_switch, should_scroll, data_to_forward) =
653 Self::process_input(
654 &buffer[..n],
655 &mut at_line_start,
656 &mut escape_state,
657 &mut escape_time,
658 );
659
660 if should_detach {
661 running.store(false, Ordering::SeqCst);
663 break;
664 }
665
666 if should_switch {
667 paused.store(true, Ordering::SeqCst);
669
670 thread::sleep(Duration::from_millis(50));
672
673 let switcher =
675 SessionSwitcher::new(session, stdin_fd, original_termios);
676
677 restore_terminal(stdin_fd, original_termios)?;
679
680 let switch_result = switcher.show_switcher()?;
681
682 set_raw_mode(stdin_fd, original_termios)?;
684
685 paused.store(false, Ordering::SeqCst);
687
688 match switch_result {
689 SwitchResult::SwitchTo(target_id) => {
690 return Ok(Some(target_id));
691 }
692 SwitchResult::Continue => {
693 escape_state = 0;
694 at_line_start = true;
695 send_terminal_refresh_sequences(socket)?;
697 }
698 }
699 }
700
701 if should_scroll {
702 Self::show_scrollback_viewer(original_termios, socket, scrollback)?;
703 escape_state = 0;
704 at_line_start = true;
705 }
706
707 if !data_to_forward.is_empty() {
709 if let Err(e) = socket.write_all(&data_to_forward) {
710 if e.kind() == io::ErrorKind::BrokenPipe {
711 break;
712 } else {
713 eprintln!("\r\nError writing to socket: {}\r", e);
714 break;
715 }
716 }
717 }
718 }
719 _ => {
720 let err = io::Error::last_os_error();
722 if err.kind() != io::ErrorKind::Interrupted {
723 return Err(NdsError::Io(err));
724 }
725 }
726 }
727 }
728 Err(e) => {
729 eprintln!("Poll error: {:?}", e);
731 return Err(NdsError::Io(io::Error::new(
732 io::ErrorKind::Other,
733 format!("Poll error: {:?}", e),
734 )));
735 }
736 }
737 }
738
739 Ok(None)
740 }
741
742 fn process_input(
743 buffer: &[u8],
744 at_line_start: &mut bool,
745 escape_state: &mut u8,
746 escape_time: &mut Instant,
747 ) -> (bool, bool, bool, Vec<u8>) {
748 let mut should_detach = false;
749 let mut should_switch = false;
750 let mut should_scroll = false;
751 let mut data_to_forward = Vec::new();
752
753 if *escape_state == 1 && escape_time.elapsed() > Duration::from_secs(1) {
755 data_to_forward.push(b'~');
757 *escape_state = 0;
758 }
759
760 for &byte in buffer {
762 if byte == 0x04 {
764 should_detach = true;
765 break;
766 }
767
768 match *escape_state {
769 0 => {
770 if *at_line_start && byte == b'~' {
772 *escape_state = 1;
774 *escape_time = Instant::now();
775 } else {
777 data_to_forward.push(byte);
779 *at_line_start = byte == b'\r' || byte == b'\n';
781 }
782 }
783 1 => {
784 match byte {
786 b'd' => {
787 should_detach = true;
788 break;
789 }
790 b's' => {
791 should_switch = true;
792 break;
793 }
794 b'h' => {
795 should_scroll = true;
796 break;
797 }
798 b'~' => {
799 data_to_forward.push(b'~');
801 *escape_state = 0;
802 *at_line_start = false;
803 }
804 _ => {
805 data_to_forward.push(b'~');
807 data_to_forward.push(byte);
808 *escape_state = 0;
809 *at_line_start =
810 byte == b'\r' || byte == b'\n' || byte == 10 || byte == 13;
811 }
812 }
813 }
814 _ => {
815 *escape_state = 0;
816 }
817 }
818 }
819
820 (should_detach, should_switch, should_scroll, data_to_forward)
821 }
822
823 fn show_scrollback_viewer(
824 original_termios: &Termios,
825 socket: &mut UnixStream,
826 scrollback: &ScrollbackHandler,
827 ) -> Result<()> {
828 use nix::sys::termios::{tcsetattr, SetArg};
829 use std::os::unix::io::BorrowedFd;
830
831 println!("\r\n[Opening scrollback viewer...]\r");
832
833 let content = scrollback.get_buffer();
835
836 let stdin_fd = 0;
838 let stdin = unsafe { BorrowedFd::borrow_raw(stdin_fd) };
839
840 let raw_termios = nix::sys::termios::tcgetattr(&stdin)?;
842
843 tcsetattr(&stdin, SetArg::TCSANOW, original_termios)?;
845
846 let mut viewer = ScrollbackViewer::new(&content);
848 let _ = viewer.run(); tcsetattr(&stdin, SetArg::TCSANOW, &raw_termios)?;
852
853 send_refresh(socket)?;
855 println!("\r\n[Returned to session]\r");
856
857 Ok(())
858 }
859
860 pub fn run_detached(mut self) -> Result<()> {
862 let listener = self
863 .listener
864 .take()
865 .ok_or_else(|| NdsError::PtyError("No listener available".to_string()))?;
866
867 listener.set_nonblocking(true)?;
869
870 let running = Arc::new(AtomicBool::new(true));
871 let r = running.clone();
872
873 ctrlc::set_handler(move || {
875 r.store(false, Ordering::SeqCst);
876 })
877 .map_err(|e| NdsError::SignalError(format!("Failed to set signal handler: {}", e)))?;
878
879 let output_buffer = self
880 .output_buffer
881 .take()
882 .ok_or_else(|| NdsError::PtyError("No output buffer available".to_string()))?;
883
884 let mut active_clients: Vec<ClientInfo> = Vec::new();
886 let mut buffer = [0u8; DEFAULT_BUFFER_SIZE]; let mut terminal_modes = TerminalModeTracker::default();
888
889 let session_id = self
891 .socket_path
892 .file_stem()
893 .and_then(|s| s.to_str())
894 .unwrap_or("unknown")
895 .to_string();
896
897 let io_handler = PtyIoHandler::new(self.master_fd);
899
900 let health_monitor = HealthMonitor::new();
902 let _monitor_thread = health_monitor.start_monitoring(300); let mut consecutive_pty_errors = 0;
906 let max_consecutive_errors = 10;
907 let mut last_recovery_attempt = Instant::now();
908 let mut last_client_health_check = Instant::now();
909
910 while running.load(Ordering::SeqCst) {
911 let _ = self.handle_new_connections(
913 &listener,
914 &mut active_clients,
915 &output_buffer,
916 &io_handler,
917 &session_id,
918 &terminal_modes,
919 );
920
921 match self.read_from_pty(&io_handler, &mut buffer) {
923 Ok(Some(data)) => {
924 consecutive_pty_errors = 0; health_monitor.update_activity(); terminal_modes.observe(&data);
927 let _ = self.broadcast_to_clients(
928 &mut active_clients,
929 &data,
930 &output_buffer,
931 &session_id,
932 );
933 }
934 Ok(None) => {
935 }
937 Err(e) => {
938 consecutive_pty_errors += 1;
940
941 if last_recovery_attempt.elapsed() > Duration::from_secs(5) {
943 let _ = attempt_recovery(RecoveryStrategy::RefreshTerminal, self.master_fd);
945 let _ = attempt_recovery(RecoveryStrategy::ResetBuffers, self.master_fd);
946 last_recovery_attempt = Instant::now();
947 }
948
949 if consecutive_pty_errors >= max_consecutive_errors {
950 eprintln!(
952 "PTY appears to be dead after {} errors: {}",
953 consecutive_pty_errors, e
954 );
955
956 if !health_monitor.is_healthy() {
958 eprintln!("Health monitor confirms session is unhealthy, terminating");
959 return Err(e);
960 }
961
962 consecutive_pty_errors = max_consecutive_errors - 1;
964 }
965
966 thread::sleep(Duration::from_millis(100));
968 }
969 }
970
971 let _ = self.handle_client_input(&mut active_clients, &io_handler, &session_id);
973
974 let _ = self.flush_pending_clients(&mut active_clients, &session_id);
976
977 if last_client_health_check.elapsed() > Duration::from_secs(10) {
979 self.check_client_health(&mut active_clients, &session_id);
980 last_client_health_check = Instant::now();
981 }
982
983 thread::sleep(Duration::from_millis(10));
985 }
986
987 health_monitor.stop_monitoring();
989
990 Ok(())
991 }
992
993 fn handle_new_connections(
994 &self,
995 listener: &UnixListener,
996 active_clients: &mut Vec<ClientInfo>,
997 output_buffer: &PtyBuffer,
998 io_handler: &PtyIoHandler,
999 session_id: &str,
1000 terminal_modes: &TerminalModeTracker,
1001 ) -> Result<()> {
1002 match listener.accept() {
1003 Ok((stream, _)) => {
1004 stream.set_nonblocking(true)?;
1006
1007 let mut client = ClientInfo::new(stream);
1008
1009 if let Err(e) = terminal_modes.apply_to_client(&mut client) {
1010 eprintln!(
1011 "Warning: failed to reapply terminal modes for client {}: {}",
1012 client.id, e
1013 );
1014 }
1015
1016 if let Err(e) =
1018 send_buffered_output_to_client(&mut client, output_buffer, io_handler)
1019 {
1020 eprintln!(
1021 "Warning: failed to send buffered output to new client {}: {}",
1022 client.id, e
1023 );
1024 }
1025
1026 let _ = client.flush_pending();
1027
1028 active_clients.push(client);
1029
1030 let _ = Session::update_client_count(session_id, active_clients.len());
1032 }
1033 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1034 }
1036 Err(_) => {
1037 }
1039 }
1040 Ok(())
1041 }
1042
1043 fn read_from_pty(
1044 &self,
1045 io_handler: &PtyIoHandler,
1046 buffer: &mut [u8],
1047 ) -> Result<Option<Vec<u8>>> {
1048 match io_handler.read_from_pty(buffer) {
1049 Ok(0) => {
1050 eprintln!("Shell process exited, session remains alive for restart");
1053 Ok(None)
1054 }
1055 Ok(n) => Ok(Some(buffer[..n].to_vec())),
1056 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(None),
1057 Err(e) => Err(NdsError::Io(e)),
1058 }
1059 }
1060
1061 fn broadcast_to_clients(
1062 &self,
1063 active_clients: &mut Vec<ClientInfo>,
1064 data: &[u8],
1065 output_buffer: &PtyBuffer,
1066 session_id: &str,
1067 ) -> Result<()> {
1068 if !active_clients.is_empty() {
1069 let mut disconnected_indices = Vec::new();
1070
1071 for (i, client) in active_clients.iter_mut().enumerate() {
1072 let write_result = client.flush_pending().and_then(|_| client.send_data(data));
1073
1074 if let Err(e) = write_result {
1075 match e.kind() {
1076 io::ErrorKind::BrokenPipe
1077 | io::ErrorKind::ConnectionAborted
1078 | io::ErrorKind::ConnectionReset
1079 | io::ErrorKind::WriteZero => {
1080 disconnected_indices.push(i);
1081 }
1082 _ => {
1083 eprintln!("Warning: failed to write to client {}: {}", client.id, e);
1084 disconnected_indices.push(i);
1085 }
1086 }
1087 }
1088 }
1089
1090 if !disconnected_indices.is_empty() {
1092 self.handle_client_disconnections(
1093 active_clients,
1094 disconnected_indices,
1095 session_id,
1096 )?;
1097 }
1098
1099 if active_clients.is_empty() {
1101 output_buffer.push(data);
1102 }
1103 } else {
1104 output_buffer.push(data);
1106 }
1107 Ok(())
1108 }
1109
1110 fn handle_client_disconnections(
1111 &self,
1112 active_clients: &mut Vec<ClientInfo>,
1113 disconnected_indices: Vec<usize>,
1114 session_id: &str,
1115 ) -> Result<()> {
1116 for i in disconnected_indices.iter().rev() {
1117 active_clients.remove(*i);
1118 }
1119
1120 let _ = Session::update_client_count(session_id, active_clients.len());
1122
1123 if !active_clients.is_empty() {
1125 for client in active_clients.iter_mut() {
1127 let _ = send_terminal_refresh_sequences(&mut client.stream);
1128 let _ = client.stream.flush();
1129 }
1130
1131 self.resize_to_smallest(active_clients)?;
1133 }
1134 Ok(())
1135 }
1136
1137 fn flush_pending_clients(
1138 &self,
1139 active_clients: &mut Vec<ClientInfo>,
1140 session_id: &str,
1141 ) -> Result<()> {
1142 if active_clients.is_empty() {
1143 return Ok(());
1144 }
1145
1146 let mut disconnected_indices = Vec::new();
1147
1148 for (i, client) in active_clients.iter_mut().enumerate() {
1149 if let Err(e) = client.flush_pending() {
1150 match e.kind() {
1151 io::ErrorKind::BrokenPipe
1152 | io::ErrorKind::ConnectionAborted
1153 | io::ErrorKind::ConnectionReset
1154 | io::ErrorKind::WriteZero => disconnected_indices.push(i),
1155 _ => {
1156 eprintln!("Warning: failed to flush client {}: {}", client.id, e);
1157 disconnected_indices.push(i);
1158 }
1159 }
1160 }
1161 }
1162
1163 if !disconnected_indices.is_empty() {
1164 self.handle_client_disconnections(active_clients, disconnected_indices, session_id)?;
1165 }
1166
1167 Ok(())
1168 }
1169
1170 fn resize_to_smallest(&self, active_clients: &[ClientInfo]) -> Result<()> {
1171 let mut min_cols = u16::MAX;
1172 let mut min_rows = u16::MAX;
1173
1174 for client in active_clients {
1175 min_cols = min_cols.min(client.cols);
1176 min_rows = min_rows.min(client.rows);
1177 }
1178
1179 if min_cols != u16::MAX && min_rows != u16::MAX {
1180 set_terminal_size(self.master_fd, min_cols, min_rows)?;
1181 let _ = kill(self.pid, Signal::SIGWINCH);
1182
1183 let io_handler = PtyIoHandler::new(self.master_fd);
1185 let _ = io_handler.send_refresh();
1186 }
1187 Ok(())
1188 }
1189
1190 fn check_client_health(&self, active_clients: &mut Vec<ClientInfo>, session_id: &str) {
1192 let mut dead_clients = Vec::new();
1193
1194 for (i, client) in active_clients.iter_mut().enumerate() {
1195 if client.stream.write(&[]).is_err() {
1197 dead_clients.push(i);
1198 }
1199 }
1200
1201 if !dead_clients.is_empty() {
1202 for i in dead_clients.iter().rev() {
1204 active_clients.remove(*i);
1205 }
1206
1207 let _ = Session::update_client_count(session_id, active_clients.len());
1209
1210 if !active_clients.is_empty() {
1212 let notification = format!(
1213 "\r\n[Cleaned up {} dead connection(s), {} client(s) remaining]\r\n",
1214 dead_clients.len(),
1215 active_clients.len()
1216 );
1217 for client in active_clients.iter_mut() {
1218 let _ = client.stream.write_all(notification.as_bytes());
1219 let _ = client.stream.flush();
1220 }
1221 }
1222 }
1223 }
1224
1225 fn handle_client_input(
1226 &self,
1227 active_clients: &mut Vec<ClientInfo>,
1228 io_handler: &PtyIoHandler,
1229 session_id: &str,
1230 ) -> Result<()> {
1231 let mut disconnected_indices = Vec::new();
1232 let mut client_buffer = [0u8; DEFAULT_BUFFER_SIZE]; let mut pending_disconnects = Vec::new(); let client_count = active_clients.len();
1237
1238 for (i, client) in active_clients.iter_mut().enumerate() {
1239 match client.stream.read(&mut client_buffer) {
1240 Ok(0) => {
1241 disconnected_indices.push(i);
1242 }
1243 Ok(n) => {
1244 let data = &client_buffer[..n];
1245
1246 if let Some((cmd, args)) = parse_nds_command(data) {
1248 if cmd == "resize" && args.len() == 2 {
1249 if let (Ok(cols), Ok(rows)) =
1250 (args[0].parse::<u16>(), args[1].parse::<u16>())
1251 {
1252 client.cols = cols;
1253 client.rows = rows;
1254 set_terminal_size(self.master_fd, cols, rows)?;
1255 let _ = kill(self.pid, Signal::SIGWINCH);
1256
1257 if let Some(end_idx) = get_command_end(data) {
1259 if end_idx < n {
1260 io_handler.write_to_pty(&data[end_idx..])?;
1261 }
1262 }
1263 continue;
1264 }
1265 } else if cmd == "list_clients" {
1266 let response = format!("Connected clients: {}\r\n", client_count);
1269 let _ = client.stream.write_all(response.as_bytes());
1270 let _ = client.stream.flush();
1271 continue; } else if cmd == "disconnect_client" && !args.is_empty() {
1273 let target_id = args[0].to_string();
1275 let current_id = client.id.clone();
1276
1277 let response = if current_id == target_id {
1278 "Cannot disconnect yourself. Use ~d to detach.\r\n".to_string()
1279 } else {
1280 pending_disconnects.push(target_id.clone());
1282 format!("Client {} will be disconnected\r\n", target_id)
1283 };
1284
1285 let _ = client.stream.write_all(response.as_bytes());
1286 let _ = client.stream.flush();
1287 continue; }
1289 }
1290
1291 if let Err(e) = io_handler.write_to_pty(data) {
1294 eprintln!("Warning: Failed to write to PTY: {}", e);
1295 }
1297 }
1298 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1299 }
1301 Err(_) => {
1302 disconnected_indices.push(i);
1303 }
1304 }
1305 }
1306
1307 for target_id in pending_disconnects {
1309 if let Some(idx) = active_clients.iter().position(|c| c.id == target_id) {
1310 let _ = active_clients[idx]
1312 .stream
1313 .write_all(b"\r\n[You have been disconnected by another client]\r\n");
1314 let _ = active_clients[idx].stream.flush();
1315 let _ = active_clients[idx]
1316 .stream
1317 .shutdown(std::net::Shutdown::Both);
1318 disconnected_indices.push(idx);
1319 }
1320 }
1321
1322 if !disconnected_indices.is_empty() {
1324 self.handle_client_disconnections(active_clients, disconnected_indices, session_id)?;
1325 }
1326
1327 Ok(())
1328 }
1329
1330 #[allow(dead_code)]
1332 fn format_client_list(&self, clients: &[ClientInfo]) -> String {
1333 if clients.is_empty() {
1334 return "No clients connected\r\n".to_string();
1335 }
1336
1337 let mut output = format!("Connected clients ({}):\r\n\r\n", clients.len());
1338 output.push_str("ID | Size | Connected | Duration\r\n");
1339 output.push_str("---------|---------|--------------|----------\r\n");
1340
1341 for client in clients {
1342 output.push_str(&format!(
1343 "{:<8} | {}x{:<3} | {} | ",
1344 client.id,
1345 client.cols,
1346 client.rows,
1347 client.connected_at.format("%H:%M:%S")
1348 ));
1349
1350 let duration = chrono::Utc::now().signed_duration_since(client.connected_at);
1351 let hours = duration.num_hours();
1352 let minutes = (duration.num_minutes() % 60) as u32;
1353 let seconds = (duration.num_seconds() % 60) as u32;
1354
1355 if hours > 0 {
1356 output.push_str(&format!("{}h{}m\r\n", hours, minutes));
1357 } else if minutes > 0 {
1358 output.push_str(&format!("{}m{}s\r\n", minutes, seconds));
1359 } else {
1360 output.push_str(&format!("{}s\r\n", seconds));
1361 }
1362 }
1363
1364 output
1365 }
1366
1367 #[allow(dead_code)]
1369 fn disconnect_client_by_id(
1370 &self,
1371 clients: &mut Vec<ClientInfo>,
1372 target_id: &str,
1373 requester_index: usize,
1374 ) -> String {
1375 if let Some(target_index) = clients.iter().position(|c| c.id == target_id) {
1377 if target_index == requester_index {
1378 return "Cannot disconnect yourself. Use ~d to detach.\r\n".to_string();
1379 }
1380
1381 let _ = clients[target_index]
1383 .stream
1384 .write_all(b"\r\n[You have been disconnected by another client]\r\n");
1385 let _ = clients[target_index].stream.flush();
1386 let _ = clients[target_index]
1387 .stream
1388 .shutdown(std::net::Shutdown::Both);
1389
1390 format!("Client {} disconnected successfully\r\n", target_id)
1391 } else {
1392 format!("Client {} not found\r\n", target_id)
1393 }
1394 }
1395
1396 pub fn kill_session(session_id: &str) -> Result<()> {
1398 let session = Session::load(session_id)?;
1399
1400 kill(Pid::from_raw(session.pid), Signal::SIGTERM)
1402 .map_err(|e| NdsError::ProcessError(format!("Failed to kill process: {}", e)))?;
1403
1404 thread::sleep(Duration::from_millis(500));
1406
1407 if Session::is_process_alive(session.pid) {
1409 kill(Pid::from_raw(session.pid), Signal::SIGKILL).map_err(|e| {
1410 NdsError::ProcessError(format!("Failed to force kill process: {}", e))
1411 })?;
1412 }
1413
1414 Session::cleanup(session_id)?;
1416
1417 Ok(())
1418 }
1419}
1420
1421#[allow(dead_code)]
1423fn format_client_list_static(
1424 client_infos: &[(String, u16, u16, chrono::DateTime<chrono::Utc>)],
1425) -> String {
1426 if client_infos.is_empty() {
1427 return "No clients connected\r\n".to_string();
1428 }
1429
1430 let mut output = format!("Connected clients ({}):\r\n\r\n", client_infos.len());
1431 output.push_str("ID | Size | Connected | Duration\r\n");
1432 output.push_str("---------|---------|--------------|----------\r\n");
1433
1434 for (id, cols, rows, connected_at) in client_infos {
1435 output.push_str(&format!(
1436 "{:<8} | {}x{:<3} | {} | ",
1437 id,
1438 cols,
1439 rows,
1440 connected_at.format("%H:%M:%S")
1441 ));
1442
1443 let duration = chrono::Utc::now().signed_duration_since(*connected_at);
1444 let hours = duration.num_hours();
1445 let minutes = (duration.num_minutes() % 60) as u32;
1446 let seconds = (duration.num_seconds() % 60) as u32;
1447
1448 if hours > 0 {
1449 output.push_str(&format!("{}h{}m\r\n", hours, minutes));
1450 } else if minutes > 0 {
1451 output.push_str(&format!("{}m{}s\r\n", minutes, seconds));
1452 } else {
1453 output.push_str(&format!("{}s\r\n", seconds));
1454 }
1455 }
1456
1457 output
1458}
1459
1460impl Drop for PtyProcess {
1461 fn drop(&mut self) {
1462 let _ = close(self.master_fd);
1463 if let Some(listener) = self.listener.take() {
1464 drop(listener);
1465 }
1466 }
1467}
1468
1469#[allow(dead_code)]
1471pub fn spawn_new_detached(session_id: &str) -> Result<Session> {
1472 PtyProcess::spawn_new_detached(session_id)
1473}
1474
1475#[allow(dead_code)]
1476pub fn spawn_new_detached_with_name(session_id: &str, name: Option<String>) -> Result<Session> {
1477 PtyProcess::spawn_new_detached_with_name(session_id, name)
1478}
1479
1480#[allow(dead_code)]
1481pub fn kill_session(session_id: &str) -> Result<()> {
1482 PtyProcess::kill_session(session_id)
1483}