1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
//! SSH connection handling using portable-pty for interactive PTY sessions.
//!
//! Uses the system ssh command with PTY support for full terminal emulation.
use crate::service_error::ServiceError;
use portable_pty::{native_pty_system, CommandBuilder, PtySize};
use std::io::{Read, Write};
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
/// Active SSH connection with PTY.
///
/// This is the caller-side handle for a session whose PTY I/O is performed by
/// background threads started in `connect`. All interaction goes through the
/// channels and the shared `alive` flag stored here.
pub struct SshConnection {
/// Channel sender for writing to PTY: byte buffers queued here are received
/// by the session thread and written to the PTY writer.
write_tx: mpsc::UnboundedSender<Vec<u8>>,
/// Channel receiver for reading from PTY: each item is one chunk of bytes
/// (at most 4096, the reader's buffer size) read from the PTY.
read_rx: mpsc::UnboundedReceiver<Vec<u8>>,
/// Flag indicating if connection is alive. Shared with the background
/// threads, which clear it on EOF or I/O errors and stop once it is false.
alive: Arc<AtomicBool>,
/// Target host
pub host: String,
/// Target port
pub port: u16,
/// Target user
pub user: String,
/// Current terminal width in columns, as last requested through this handle.
cols: u16,
/// Current terminal height in rows, as last requested through this handle.
rows: u16,
/// Handle to resize the PTY (via channel); messages are `(cols, rows)`.
resize_tx: mpsc::UnboundedSender<(u16, u16)>,
}
impl SshConnection {
/// Connect to an SSH server and open a PTY session
pub async fn connect(
host: &str,
port: u16,
user: &str,
key_path: &Path,
_key_passphrase: Option<&str>,
cols: u16,
rows: u16,
shell: Option<&str>,
) -> Result<Self, ServiceError> {
// Verify key file exists
if !key_path.exists() {
return Err(ServiceError::InvalidParams(format!(
"SSH key not found: {:?}",
key_path
)));
}
// Create channels for communication
let (write_tx, mut write_rx) = mpsc::unbounded_channel::<Vec<u8>>();
let (read_tx, read_rx) = mpsc::unbounded_channel::<Vec<u8>>();
let (resize_tx, mut resize_rx) = mpsc::unbounded_channel::<(u16, u16)>();
let alive = Arc::new(AtomicBool::new(true));
let alive_clone = Arc::clone(&alive);
// Build command arguments
let key_path_str = key_path.to_string_lossy().to_string();
let target = format!("{}@{}", user, host);
let port_str = port.to_string();
let shell_cmd = shell.map(|s| s.to_string());
// Spawn a blocking thread to handle the PTY
thread::spawn(move || {
// Create PTY system
let pty_system = native_pty_system();
// Configure PTY size
let pty_size = PtySize {
rows,
cols,
pixel_width: 0,
pixel_height: 0,
};
// Open PTY pair
let pair = match pty_system.openpty(pty_size) {
Ok(p) => p,
Err(e) => {
warn!("Failed to open PTY: {}", e);
alive_clone.store(false, Ordering::SeqCst);
return;
}
};
// Build SSH command
let mut cmd = CommandBuilder::new("ssh");
// Set TERM for proper terminal emulation (enables alternate screen for TUI apps)
cmd.env("TERM", "xterm-256color");
cmd.arg("-i");
cmd.arg(&key_path_str);
cmd.arg("-p");
cmd.arg(&port_str);
cmd.arg("-o");
cmd.arg("StrictHostKeyChecking=accept-new");
cmd.arg("-o");
cmd.arg("UserKnownHostsFile=/dev/null");
// Request pseudo-terminal allocation explicitly
cmd.arg("-t");
cmd.arg(&target);
if let Some(ref shell_cmd) = shell_cmd {
cmd.arg(shell_cmd);
}
debug!("Spawning SSH command to {}", target);
// Spawn the SSH process in the PTY
let _child = match pair.slave.spawn_command(cmd) {
Ok(c) => c,
Err(e) => {
warn!("Failed to spawn SSH: {}", e);
alive_clone.store(false, Ordering::SeqCst);
return;
}
};
// Drop the slave to close our handle to it
drop(pair.slave);
// Get reader and writer from master
let mut reader = match pair.master.try_clone_reader() {
Ok(r) => r,
Err(e) => {
warn!("Failed to get PTY reader: {}", e);
alive_clone.store(false, Ordering::SeqCst);
return;
}
};
let mut writer = match pair.master.take_writer() {
Ok(w) => w,
Err(e) => {
warn!("Failed to get PTY writer: {}", e);
alive_clone.store(false, Ordering::SeqCst);
return;
}
};
info!("SSH PTY session established");
// Use a thread for reading
let read_alive = Arc::clone(&alive_clone);
let read_thread = thread::spawn(move || {
let mut buf = [0u8; 4096];
loop {
if !read_alive.load(Ordering::SeqCst) {
break;
}
match reader.read(&mut buf) {
Ok(0) => {
// EOF
read_alive.store(false, Ordering::SeqCst);
break;
}
Ok(n) => {
if read_tx.send(buf[..n].to_vec()).is_err() {
break;
}
}
Err(e) => {
warn!("PTY read error: {}", e);
read_alive.store(false, Ordering::SeqCst);
break;
}
}
}
});
// Handle writes and resizes in this thread
loop {
if !alive_clone.load(Ordering::SeqCst) {
break;
}
// Check for write data (non-blocking via try_recv simulation with timeout)
if let Ok(data) = write_rx.try_recv() {
if let Err(e) = writer.write_all(&data) {
warn!("PTY write error: {}", e);
alive_clone.store(false, Ordering::SeqCst);
break;
}
let _ = writer.flush();
}
// Check for resize
if let Ok((new_cols, new_rows)) = resize_rx.try_recv() {
let size = PtySize {
rows: new_rows,
cols: new_cols,
pixel_width: 0,
pixel_height: 0,
};
if let Err(e) = pair.master.resize(size) {
warn!("PTY resize error: {}", e);
}
}
// Small sleep to prevent busy loop
thread::sleep(std::time::Duration::from_millis(10));
}
// Wait for read thread
let _ = read_thread.join();
});
Ok(Self {
write_tx,
read_rx,
alive,
host: host.to_string(),
port,
user: user.to_string(),
cols,
rows,
resize_tx,
})
}
/// Queue `data` to be written to the PTY.
///
/// The bytes are copied into an owned buffer and pushed onto the write
/// channel; this does not wait for them to reach the PTY.
///
/// # Errors
/// Returns `ServiceError::Internal` when the write channel is closed.
pub async fn send(&self, data: &[u8]) -> Result<(), ServiceError> {
    let payload = data.to_vec();
    match self.write_tx.send(payload) {
        Ok(()) => Ok(()),
        Err(_) => Err(ServiceError::Internal(
            "Failed to send to PTY - channel closed".to_string(),
        )),
    }
}
/// Fetch the next chunk of PTY output without waiting.
///
/// Returns `None` when nothing is queued right now or the read channel is
/// closed.
pub fn try_recv(&mut self) -> Option<Vec<u8>> {
    match self.read_rx.try_recv() {
        Ok(chunk) => Some(chunk),
        Err(_) => None,
    }
}
/// Wait up to `timeout` for the next chunk of PTY output.
///
/// Returns `None` if the timeout elapses first or the read channel is closed.
pub async fn recv_timeout(&mut self, timeout: std::time::Duration) -> Option<Vec<u8>> {
    let next_chunk = self.read_rx.recv();
    match tokio::time::timeout(timeout, next_chunk).await {
        Ok(received) => received,
        // Timed out before anything arrived.
        Err(_) => None,
    }
}
/// Check if the connection is still alive
pub fn is_alive(&self) -> bool {
self.alive.load(Ordering::SeqCst)
}
/// Mark connection as dead
pub fn mark_dead(&self) {
self.alive.store(false, Ordering::SeqCst);
}
/// Request a new PTY size and remember it on this handle.
///
/// The `(cols, rows)` pair is sent over the resize channel; the cached size
/// is only updated when that send succeeds.
///
/// # Errors
/// Returns `ServiceError::Internal` when the resize channel is closed.
pub fn resize(&mut self, cols: u16, rows: u16) -> Result<(), ServiceError> {
    if self.resize_tx.send((cols, rows)).is_err() {
        return Err(ServiceError::Internal(
            "Failed to resize PTY - channel closed".to_string(),
        ));
    }

    self.rows = rows;
    self.cols = cols;
    Ok(())
}
/// Get current PTY size
pub fn size(&self) -> (u16, u16) {
(self.cols, self.rows)
}
/// Close the connection
pub async fn close(self) -> Result<(), ServiceError> {
self.alive.store(false, Ordering::SeqCst);
Ok(())
}
}