1use crate::sftp_client::{
2 FileEntryType, RemoteFileEntry, format_permissions, format_unix_timestamp,
3};
4use anyhow::Result;
5use russh::*;
6use russh_keys::PublicKeyBase64;
7use russh_keys::*;
8use russh_sftp::client::SftpSession;
9use serde::{Deserialize, Serialize};
10use std::path::Path;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::io::{AsyncReadExt, AsyncWriteExt};
14use tokio::sync::mpsc;
15use tokio_util::sync::CancellationToken;
16
17pub mod host_keys;
18pub mod shell;
19pub use host_keys::{
20 HostKeyMismatch, HostKeyStore, HostKeyStoreAccessError, HostKeyVerificationFailure, Verdict,
21 VerificationFailureSlot,
22};
23
/// Size in bytes (32 KiB) of the buffer used by the SFTP upload/download
/// loops in this file, and the slice size used when uploading from memory.
pub const SFTP_CHUNK_SIZE: usize = 32 * 1024;
29
/// Host-key algorithms handed to russh as `Preferred::key` when connecting.
///
/// NOTE(review): the list is presumably in preference order (most preferred
/// first), as is conventional for russh `Preferred` lists — confirm against
/// the russh version in use. The legacy `SSH_RSA` entry sits last.
pub static PREFERRED_HOST_KEY_ALGOS: &[russh_keys::key::Name] = &[
    russh_keys::key::ED25519,
    russh_keys::key::ECDSA_SHA2_NISTP256,
    russh_keys::key::ECDSA_SHA2_NISTP521,
    russh_keys::key::RSA_SHA2_256,
    russh_keys::key::RSA_SHA2_512,
    russh_keys::key::SSH_RSA,
];
43
/// Key-exchange algorithms handed to russh as `Preferred::kex` when connecting.
///
/// NOTE(review): presumably in preference order (most preferred first) —
/// confirm against russh. The SHA-1 based Diffie-Hellman entries come after
/// the Curve25519 and SHA-2 ones. The two trailing `EXTENSION_*` names are
/// not key-exchange methods by name; they look like protocol-extension
/// markers advertised by the client — confirm against the russh docs.
pub static PREFERRED_KEX_ALGOS: &[russh::kex::Name] = &[
    russh::kex::CURVE25519,
    russh::kex::CURVE25519_PRE_RFC_8731,
    russh::kex::DH_G16_SHA512,
    russh::kex::DH_G14_SHA256,
    russh::kex::DH_G14_SHA1,
    russh::kex::DH_G1_SHA1,
    russh::kex::EXTENSION_SUPPORT_AS_CLIENT,
    russh::kex::EXTENSION_OPENSSH_STRICT_KEX_AS_CLIENT,
];
63
/// Connection parameters for one SSH endpoint.
///
/// `Debug` is implemented by hand rather than derived; the credential
/// redaction itself happens in `AuthMethod`'s `Debug` implementation.
#[derive(Clone, Serialize, Deserialize)]
pub struct SshConfig {
    /// Host name or address to connect to.
    pub host: String,
    /// TCP port of the SSH server.
    pub port: u16,
    /// Remote user name to authenticate as.
    pub username: String,
    /// How to authenticate; may carry secrets (password / passphrase).
    pub auth_method: AuthMethod,
}
71
72impl std::fmt::Debug for SshConfig {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 f.debug_struct("SshConfig")
75 .field("host", &self.host)
76 .field("port", &self.port)
77 .field("username", &self.username)
78 .field("auth_method", &self.auth_method)
79 .finish()
80 }
81}
82
/// Authentication strategy for an SSH connection.
///
/// Serialized with an internal `"type"` tag (see the serde attribute below).
/// `Debug` is implemented by hand so secrets never appear in debug output.
#[derive(Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum AuthMethod {
    /// Password authentication.
    Password {
        /// Plain-text password (redacted in `Debug`).
        password: String,
    },
    /// Authentication with a private key loaded from disk.
    PublicKey {
        /// Path to the private key file; a leading `~` is expanded when the key is loaded.
        key_path: String,
        /// Passphrase for an encrypted key, if any (redacted in `Debug`).
        passphrase: Option<String>,
    },
    /// Authentication through a running SSH agent.
    Agent {
        /// Optional text used to pick one of the agent's identities; when
        /// absent or blank the first identity is used.
        identity_hint: Option<String>,
    },
}
97
98impl std::fmt::Debug for AuthMethod {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 match self {
101 AuthMethod::Password { .. } => f
102 .debug_struct("AuthMethod::Password")
103 .field("password", &"<redacted>")
104 .finish(),
105 AuthMethod::PublicKey {
106 key_path,
107 passphrase,
108 } => f
109 .debug_struct("AuthMethod::PublicKey")
110 .field("key_path", key_path)
111 .field(
112 "passphrase",
113 &passphrase
114 .as_ref()
115 .map(|_| "<redacted>")
116 .unwrap_or("<none>"),
117 )
118 .finish(),
119 AuthMethod::Agent { identity_hint } => f
120 .debug_struct("AuthMethod::Agent")
121 .field("identity_hint", identity_hint)
122 .finish(),
123 }
124 }
125}
126
/// High-level SSH client: one authenticated session plus a lazily opened
/// SFTP subsystem on top of it.
pub struct SshClient {
    /// Authenticated connection handle; `None` until `connect` succeeds and
    /// again after `disconnect`.
    session: Option<Arc<client::Handle<Client>>>,
    /// Trusted host-key store passed to every connection attempt.
    host_keys: Arc<HostKeyStore>,
    /// SFTP session, created on first use by `sftp_session` and cleared by
    /// `disconnect`.
    sftp: tokio::sync::OnceCell<Arc<SftpSession>>,
}
135
/// Captured result of a remote command: both output streams and the exit
/// status, if the server reported one.
#[derive(Debug, Clone, Default)]
pub struct CommandOutput {
    /// Text received on the command's standard output.
    pub stdout: String,
    /// Text received on the command's standard error.
    pub stderr: String,
    /// Exit status reported by the server; `None` if none was received.
    pub exit_code: Option<u32>,
}

impl CommandOutput {
    /// Returns `true` only when an exit status was received and it is zero.
    pub fn is_success(&self) -> bool {
        self.exit_code == Some(0)
    }

    /// Returns stdout followed by stderr as one string.
    ///
    /// When both streams are non-empty, a single `'\n'` is inserted between
    /// them unless stdout already ends with one. When either stream is
    /// empty, the other is returned unchanged.
    pub fn combined(&self) -> String {
        match (self.stdout.is_empty(), self.stderr.is_empty()) {
            // Nothing on stderr: stdout alone (possibly empty).
            (_, true) => self.stdout.clone(),
            // Nothing on stdout: stderr alone.
            (true, false) => self.stderr.clone(),
            (false, false) => {
                let separator = if self.stdout.ends_with('\n') { "" } else { "\n" };
                [self.stdout.as_str(), separator, self.stderr.as_str()].concat()
            }
        }
    }
}
171
/// Handles for talking to an interactive shell opened by
/// `SshClient::create_pty_session`.
pub struct PtySession {
    /// Bytes sent here are written to the remote shell by the input task.
    pub input_tx: mpsc::Sender<Vec<u8>>,
    /// Receives the payload of every data / extended-data message from the
    /// remote channel.
    pub output_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Vec<u8>>>>,
    /// `(cols, rows)` pairs sent here are forwarded as window-change requests.
    pub resize_tx: mpsc::Sender<(u32, u32)>,
    /// Cancelling this token stops both background tasks of the session.
    pub cancel: CancellationToken,
}
182
/// russh client handler that verifies the server's host key against a
/// `HostKeyStore` and records why verification failed, if it did.
pub struct Client {
    /// Host being connected to; used for store lookups and messages.
    host: String,
    /// Port being connected to; used for store lookups and messages.
    port: u16,
    /// Trusted host-key store consulted (and updated) during verification.
    store: Arc<HostKeyStore>,
    /// Shared slot the handler fills with the failure details so the caller
    /// of `Client::new` can read them after the connection attempt.
    verification_failure_slot: VerificationFailureSlot,
}
195
196impl Client {
197 pub fn new(
198 host: impl Into<String>,
199 port: u16,
200 store: Arc<HostKeyStore>,
201 ) -> (Self, VerificationFailureSlot) {
202 let slot: VerificationFailureSlot = Arc::new(std::sync::Mutex::new(None));
203 let client = Self {
204 host: host.into(),
205 port,
206 store,
207 verification_failure_slot: slot.clone(),
208 };
209 (client, slot)
210 }
211}
212
/// russh handler implementation: the only callback overridden here is the
/// host-key check.
#[async_trait::async_trait]
impl client::Handler for Client {
    type Error = russh::Error;

    /// Decides whether the key offered by the server is acceptable.
    ///
    /// * Known key: accepted.
    /// * Unknown host: trust-on-first-use — the key is persisted to the store
    ///   and accepted; if persisting fails the connection is refused with an
    ///   error.
    /// * Mismatch: rejected with `Ok(false)`.
    /// * Store cannot be read: refused with an error.
    ///
    /// Every failure path first tries to record the details in
    /// `verification_failure_slot`. If the slot's mutex is poisoned the
    /// details are silently not recorded, but the return value is unchanged.
    async fn check_server_key(
        &mut self,
        server_public_key: &key::PublicKey,
    ) -> Result<bool, Self::Error> {
        match self
            .store
            .verify(&self.host, self.port, server_public_key)
            .await
        {
            Ok(Verdict::Known) => {
                tracing::debug!(
                    "host key for {}:{} matches known_hosts",
                    self.host,
                    self.port
                );
                Ok(true)
            }
            Ok(Verdict::Unknown) => {
                // First contact with this host: trust the key, but only if it
                // can be written to the store.
                tracing::warn!(
                    "TOFU: trusting new host key for {}:{} (fingerprint SHA256:{})",
                    self.host,
                    self.port,
                    server_public_key.fingerprint()
                );
                if let Err(e) = self
                    .store
                    .trust(&self.host, self.port, server_public_key)
                    .await
                {
                    tracing::error!("failed to persist host key: {}", e);
                    if let Ok(mut slot) = self.verification_failure_slot.lock() {
                        *slot = Some(HostKeyVerificationFailure::StoreAccess(
                            HostKeyStoreAccessError {
                                host: self.host.clone(),
                                port: self.port,
                                store_path: self.store.path().to_path_buf(),
                                operation: "write",
                                source: e.to_string(),
                            },
                        ));
                    }
                    // Refuse rather than trust a key that could not be stored.
                    return Err(
                        std::io::Error::other("failed to persist trusted SSH host key").into(),
                    );
                }
                Ok(true)
            }
            Ok(Verdict::Mismatch {
                expected_fingerprint,
                got_fingerprint,
            }) => {
                tracing::error!(
                    "host key mismatch for {}:{} — expected SHA256:{}, got SHA256:{}",
                    self.host,
                    self.port,
                    expected_fingerprint,
                    got_fingerprint
                );
                if let Ok(mut slot) = self.verification_failure_slot.lock() {
                    *slot = Some(HostKeyVerificationFailure::Mismatch(HostKeyMismatch {
                        host: self.host.clone(),
                        port: self.port,
                        expected_fingerprint,
                        got_fingerprint,
                        store_path: self.store.path().to_path_buf(),
                    }));
                }
                // `false` tells russh to reject the server key.
                Ok(false)
            }
            Err(e) => {
                // The store itself could not be consulted.
                tracing::error!("failed to access host-key store: {}", e);
                if let Ok(mut slot) = self.verification_failure_slot.lock() {
                    *slot = Some(HostKeyVerificationFailure::StoreAccess(
                        HostKeyStoreAccessError {
                            host: self.host.clone(),
                            port: self.port,
                            store_path: self.store.path().to_path_buf(),
                            operation: "read",
                            source: e.to_string(),
                        },
                    ));
                }
                Err(std::io::Error::other("failed to access SSH host-key store").into())
            }
        }
    }
}
304
/// Authentication material that is ready to use, borrowed from the caller
/// for the duration of one connection attempt.
pub(crate) enum ResolvedAuth<'a> {
    /// Authenticate with a password.
    Password {
        /// The password to send.
        password: &'a str,
    },
    /// Authenticate with an already loaded key pair.
    Key {
        /// The loaded private key.
        key: Box<key::KeyPair>,
        /// Where the key came from; only used in error messages.
        key_path_hint: Option<&'a str>,
    },
    /// Authenticate through the SSH agent.
    Agent {
        /// Optional text used to select one of the agent's identities.
        identity_hint: Option<&'a str>,
    },
}
322
323pub(crate) async fn connect_authenticated(
327 host: &str,
328 port: u16,
329 username: &str,
330 auth: ResolvedAuth<'_>,
331 timeout: Duration,
332 host_keys: Arc<HostKeyStore>,
333) -> Result<client::Handle<Client>> {
334 let ssh_config = client::Config {
335 preferred: russh::Preferred {
336 key: PREFERRED_HOST_KEY_ALGOS,
337 kex: PREFERRED_KEX_ALGOS,
338 ..russh::Preferred::DEFAULT
339 },
340 keepalive_interval: Some(Duration::from_secs(60)),
341 keepalive_max: 3,
342 ..client::Config::default()
343 };
344
345 let (handler, verification_failure_slot) = Client::new(host, port, host_keys);
346
347 let mut session = tokio::time::timeout(
348 timeout,
349 client::connect(Arc::new(ssh_config), (host, port), handler),
350 )
351 .await
352 .map_err(|_| {
353 anyhow::anyhow!(
354 "Connection timed out after {}s. Please check the host address and network.",
355 timeout.as_secs()
356 )
357 })?
358 .map_err(|e| {
359 if let Ok(mut guard) = verification_failure_slot.lock()
360 && let Some(failure) = guard.take()
361 {
362 return anyhow::anyhow!(format_verification_failure(&failure));
363 }
364
365 let msg = e.to_string();
372 let looks_like_reset = msg.contains("reset by peer")
373 || msg.contains("ConnectionReset")
374 || msg.contains("kex_exchange_identification");
375 if looks_like_reset {
376 return anyhow::anyhow!(
377 "The SSH server at {}:{} accepted the TCP connection but then \
378 reset it during the handshake ({}).\n\n\
379 This usually means the server is rejecting your source IP \
380 or SSH client via a firewall / access list. Try:\n\
381 - Confirm your public IP is on the server's allowlist (ask \
382 the service operator).\n\
383 - Connect over a VPN that terminates inside the allowed \
384 network.\n\
385 - Verify the host and port are correct for external access \
386 (some services publish a different SFTP endpoint).",
387 host,
388 port,
389 e
390 );
391 }
392
393 anyhow::anyhow!("Failed to connect to {}:{}: {}", host, port, e)
394 })?;
395
396 let key_hint_for_error = match &auth {
399 ResolvedAuth::Password { .. } => None,
400 ResolvedAuth::Key { key_path_hint, .. } => key_path_hint.map(String::from),
401 ResolvedAuth::Agent { identity_hint } => Some(
402 identity_hint
403 .filter(|hint| !hint.is_empty())
404 .unwrap_or("SSH agent")
405 .to_string(),
406 ),
407 };
408
409 let authenticated = match auth {
410 ResolvedAuth::Password { password } => session
411 .authenticate_password(username, password)
412 .await
413 .map_err(|e| anyhow::anyhow!("Password authentication failed: {}", e))?,
414 ResolvedAuth::Key { key, .. } => session
415 .authenticate_publickey(username, Arc::new(*key))
416 .await
417 .map_err(|e| {
418 anyhow::anyhow!(
419 "Public key authentication failed: {}. The key may not be authorized on the server.",
420 e
421 )
422 })?,
423 ResolvedAuth::Agent { identity_hint } => {
424 let mut agent = russh_keys::agent::client::AgentClient::connect_env()
425 .await
426 .map_err(|e| {
427 anyhow::anyhow!(
428 "SSH agent authentication is enabled, but r-shell could not connect to SSH_AUTH_SOCK: {}",
429 e
430 )
431 })?;
432 let identities = agent.request_identities().await.map_err(|e| {
433 anyhow::anyhow!("SSH agent did not return identities: {}", e)
434 })?;
435 let key = select_agent_identity(identities, identity_hint).ok_or_else(|| {
436 if let Some(hint) = identity_hint.filter(|hint| !hint.is_empty()) {
437 anyhow::anyhow!(
438 "SSH agent has no identity matching '{}'. Add the key to your agent or clear the identity hint.",
439 hint
440 )
441 } else {
442 anyhow::anyhow!("SSH agent has no identities. Add a key to your agent and try again.")
443 }
444 })?;
445 let (_agent, result) = session.authenticate_future(username.to_string(), key, agent).await;
446 result.map_err(|e| anyhow::anyhow!("SSH agent authentication failed: {}", e))?
447 }
448 };
449
450 if !authenticated {
451 return Err(match key_hint_for_error {
452 None => anyhow::anyhow!(
453 "Authentication failed for {}@{} with password authentication.",
454 username,
455 host
456 ),
457 Some(path) => anyhow::anyhow!(
458 "Authentication failed for {}@{} using public key {}.",
459 username,
460 host,
461 path
462 ),
463 });
464 }
465
466 Ok(session)
467}
468
469fn select_agent_identity(
470 identities: Vec<key::PublicKey>,
471 identity_hint: Option<&str>,
472) -> Option<key::PublicKey> {
473 let hint = identity_hint.map(str::trim).filter(|hint| !hint.is_empty());
474
475 match hint {
476 None => identities.into_iter().next(),
477 Some(hint) => identities.into_iter().find(|identity| {
478 let encoded = identity.public_key_base64();
479 encoded.contains(hint) || hint.contains(&encoded)
480 }),
481 }
482}
483
484pub fn format_mismatch(m: &HostKeyMismatch) -> String {
486 format!(
487 "Host key verification failed for {}:{}.\n\
488 Expected fingerprint (stored): SHA256:{}\n\
489 Offered fingerprint (server): SHA256:{}\n\
490 If the remote host legitimately rotated its key, remove the entry from:\n {}",
491 m.host,
492 m.port,
493 m.expected_fingerprint,
494 m.got_fingerprint,
495 m.store_path.display()
496 )
497}
498
499fn format_store_access_error(err: &HostKeyStoreAccessError) -> String {
500 format!(
501 "Host key verification could not complete for {}:{}.\n\
502 r-shell could not {} the trusted host-key store at:\n {}\n\
503 Underlying error: {}\n\
504 Connection refused to avoid trusting a host key without a durable trust store.",
505 err.host,
506 err.port,
507 err.operation,
508 err.store_path.display(),
509 err.source
510 )
511}
512
513pub fn format_verification_failure(failure: &HostKeyVerificationFailure) -> String {
514 match failure {
515 HostKeyVerificationFailure::Mismatch(mismatch) => format_mismatch(mismatch),
516 HostKeyVerificationFailure::StoreAccess(err) => format_store_access_error(err),
517 }
518}
519
520pub(crate) fn expand_home_path(path: &str) -> Option<String> {
525 if let Some(rest) = path.strip_prefix("~/") {
526 let home = dirs::home_dir()?;
527 Some(home.join(rest).to_string_lossy().into_owned())
528 } else if path == "~" {
529 dirs::home_dir().map(|h| h.to_string_lossy().into_owned())
530 } else {
531 Some(path.to_string())
532 }
533}
534
535pub(crate) fn load_private_key(key_path: &str, passphrase: Option<&str>) -> Result<key::KeyPair> {
536 let expanded = expand_home_path(key_path).ok_or_else(|| {
537 anyhow::anyhow!(
538 "Cannot resolve '~' in SSH key path '{}': home directory unknown.",
539 key_path
540 )
541 })?;
542 let path = Path::new(&expanded);
543
544 let location = if expanded != key_path {
547 format!("{} (expanded from {})", expanded, key_path)
548 } else {
549 expanded.clone()
550 };
551
552 match path.metadata() {
556 Ok(_) => {}
557 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
558 return Err(anyhow::anyhow!(
559 "SSH key file not found: {}. Please check the file path and try again.",
560 location
561 ));
562 }
563 Err(e) if e.kind() == std::io::ErrorKind::PermissionDenied => {
564 return Err(anyhow::anyhow!(
565 "Permission denied reading SSH key at {}.\n\
566 On macOS this usually means r-shell hasn't been granted access to this file. \
567 Open System Settings → Privacy & Security → Full Disk Access (or App Management / Files and Folders), \
568 add r-shell to the list, then try again.",
569 location
570 ));
571 }
572 Err(e) => {
573 return Err(anyhow::anyhow!(
574 "Cannot access SSH key at {}: {}",
575 location,
576 e
577 ));
578 }
579 }
580
581 load_secret_key(path, passphrase).map_err(|e| {
582 let msg = e.to_string();
583 if msg.contains("encrypted") || msg.contains("passphrase") {
584 anyhow::anyhow!(
585 "Failed to decrypt SSH key at {}. The key may be encrypted. Please provide the correct passphrase.",
586 expanded
587 )
588 } else {
589 anyhow::anyhow!(
590 "Failed to load SSH key from {}: {}. Ensure the file is a valid SSH private key (RSA, Ed25519, or ECDSA).",
591 expanded, e
592 )
593 }
594 })
595}
596
597impl SshClient {
598 pub fn new(host_keys: Arc<HostKeyStore>) -> Self {
599 Self {
600 session: None,
601 host_keys,
602 sftp: tokio::sync::OnceCell::new(),
603 }
604 }
605
606 async fn sftp_session(&self) -> Result<Arc<SftpSession>> {
610 let session = self
611 .session
612 .as_ref()
613 .ok_or_else(|| anyhow::anyhow!("Not connected"))?
614 .clone();
615 let sftp = self
616 .sftp
617 .get_or_try_init(|| async move {
618 let channel = session.channel_open_session().await?;
619 channel.request_subsystem(true, "sftp").await?;
620 let session = SftpSession::new(channel.into_stream()).await?;
621 Ok::<_, anyhow::Error>(Arc::new(session))
622 })
623 .await?;
624 Ok(sftp.clone())
625 }
626
627 pub async fn connect(&mut self, config: &SshConfig) -> Result<()> {
628 let auth = match &config.auth_method {
629 AuthMethod::Password { password } => ResolvedAuth::Password { password },
630 AuthMethod::PublicKey {
631 key_path,
632 passphrase,
633 } => ResolvedAuth::Key {
634 key: Box::new(load_private_key(key_path, passphrase.as_deref())?),
635 key_path_hint: Some(key_path),
636 },
637 AuthMethod::Agent { identity_hint } => ResolvedAuth::Agent {
638 identity_hint: identity_hint.as_deref(),
639 },
640 };
641
642 let session = connect_authenticated(
643 &config.host,
644 config.port,
645 &config.username,
646 auth,
647 Duration::from_secs(10),
648 self.host_keys.clone(),
649 )
650 .await?;
651
652 self.session = Some(Arc::new(session));
653 Ok(())
654 }
655
656 pub async fn execute_command(&self, command: &str) -> Result<String> {
665 let out = self.execute_command_full(command).await?;
666 Ok(out.combined())
667 }
668
669 pub async fn execute_command_full(&self, command: &str) -> Result<CommandOutput> {
671 let Some(session) = &self.session else {
672 return Err(anyhow::anyhow!("Not connected"));
673 };
674
675 let mut channel = session.channel_open_session().await?;
676 channel.exec(true, command).await?;
677
678 let mut stdout = String::new();
679 let mut stderr = String::new();
680 let mut exit_code: Option<u32> = None;
681 let mut eof_received = false;
682
683 loop {
684 let msg = channel.wait().await;
685 match msg {
686 Some(ChannelMsg::Data { ref data }) => {
687 stdout.push_str(&String::from_utf8_lossy(data));
688 }
689 Some(ChannelMsg::ExtendedData { ref data, .. }) => {
690 stderr.push_str(&String::from_utf8_lossy(data));
695 }
696 Some(ChannelMsg::ExitStatus { exit_status }) => {
697 exit_code = Some(exit_status);
698 if eof_received {
699 break;
700 }
701 }
702 Some(ChannelMsg::Eof) => {
703 eof_received = true;
704 if exit_code.is_some() {
705 break;
706 }
707 }
708 Some(ChannelMsg::Close) | None => {
709 break;
710 }
711 _ => {}
712 }
713 }
714
715 Ok(CommandOutput {
716 stdout,
717 stderr,
718 exit_code,
719 })
720 }
721
722 pub async fn execute_command_streaming(
733 &self,
734 command: &str,
735 ) -> Result<(mpsc::Receiver<String>, CancellationToken)> {
736 let Some(session) = &self.session else {
737 return Err(anyhow::anyhow!("Not connected"));
738 };
739
740 let mut channel = session.channel_open_session().await?;
741 channel.exec(true, command).await?;
742
743 let (tx, rx) = mpsc::channel::<String>(256);
744 let cancel = CancellationToken::new();
745 let cancel_task = cancel.clone();
746
747 tokio::spawn(async move {
748 let mut stdout_buf = String::new();
750 let mut stderr_buf = String::new();
751 loop {
752 tokio::select! {
753 _ = cancel_task.cancelled() => {
754 let _ = channel.eof().await;
755 let _ = channel.close().await;
756 break;
757 }
758 msg = channel.wait() => {
759 match msg {
760 Some(ChannelMsg::Data { ref data }) => {
761 stdout_buf.push_str(&String::from_utf8_lossy(data));
762 while let Some(idx) = stdout_buf.find('\n') {
763 let line: String = stdout_buf.drain(..=idx).collect();
764 let trimmed = line.trim_end_matches(['\r', '\n']).to_string();
765 if tx.send(trimmed).await.is_err() {
766 cancel_task.cancel();
767 break;
768 }
769 }
770 }
771 Some(ChannelMsg::ExtendedData { ref data, .. }) => {
772 stderr_buf.push_str(&String::from_utf8_lossy(data));
773 while let Some(idx) = stderr_buf.find('\n') {
774 let line: String = stderr_buf.drain(..=idx).collect();
775 let trimmed = line.trim_end_matches(['\r', '\n']).to_string();
776 if tx.send(format!("!{}", trimmed)).await.is_err() {
777 cancel_task.cancel();
778 break;
779 }
780 }
781 }
782 Some(ChannelMsg::Eof) | Some(ChannelMsg::Close) | None => {
783 if !stdout_buf.is_empty() {
784 let _ = tx.send(stdout_buf.trim_end_matches(['\r', '\n']).to_string()).await;
785 }
786 if !stderr_buf.is_empty() {
787 let _ = tx.send(format!("!{}", stderr_buf.trim_end_matches(['\r', '\n']))).await;
788 }
789 break;
790 }
791 _ => {}
792 }
793 }
794 }
795 }
796 });
797
798 Ok((rx, cancel))
799 }
800
801 pub async fn disconnect(&mut self) -> Result<()> {
802 self.sftp.take();
805
806 if let Some(session) = self.session.take() {
807 match Arc::try_unwrap(session) {
808 Ok(session) => {
809 if let Err(e) = session.disconnect(Disconnect::ByApplication, "", "").await {
810 tracing::warn!("SSH disconnect failed cleanly: {}", e);
811 }
812 }
813 Err(arc_session) => {
814 tracing::debug!("SSH disconnect: other refs still alive, dropping handle");
818 drop(arc_session);
819 }
820 }
821 }
822 Ok(())
823 }
824
825 pub async fn open_direct_tcpip(
834 &self,
835 host: &str,
836 port: u16,
837 ) -> Result<russh::Channel<russh::client::Msg>> {
838 let Some(session) = &self.session else {
839 return Err(anyhow::anyhow!("Not connected"));
840 };
841 let channel = session
842 .channel_open_direct_tcpip(host.to_string(), port as u32, "127.0.0.1", 0)
843 .await?;
844 Ok(channel)
845 }
846
    /// Opens an interactive shell on a PTY of `cols` x `rows` and returns the
    /// handles needed to drive it.
    ///
    /// Two background tasks are spawned:
    /// * an input task that writes everything received on `input_tx` to the
    ///   remote shell;
    /// * an output task that forwards remote data to `output_rx` and applies
    ///   resize requests received on `resize_tx`.
    ///
    /// Both tasks stop when the returned `cancel` token is cancelled.
    ///
    /// # Errors
    ///
    /// Fails with "Not connected" when there is no session, or with the
    /// underlying error when the channel, PTY or shell request fails.
    pub async fn create_pty_session(&self, cols: u32, rows: u32) -> Result<PtySession> {
        if let Some(session) = &self.session {
            let mut channel = session.channel_open_session().await?;

            // NOTE(review): argument order presumably follows russh
            // `request_pty` (want_reply, terminal type, columns, rows, pixel
            // width, pixel height, terminal modes) — confirm against the
            // russh version in use.
            channel
                .request_pty(
                    true,
                    "xterm-256color",
                    cols,
                    rows,
                    0,
                    0,
                    &[],
                )
                .await?;

            channel.request_shell(true).await?;

            // Bounded queues between the caller and the background tasks.
            let (input_tx, mut input_rx) = mpsc::channel::<Vec<u8>>(1000);
            let (output_tx, output_rx) = mpsc::channel::<Vec<u8>>(2000);
            // A separate writer lets the input task send data while the
            // output task owns `channel` for reading.
            let input_channel = channel.make_writer();

            let (resize_tx, mut resize_rx) = mpsc::channel::<(u32, u32)>(16);

            let cancel = CancellationToken::new();

            let input_cancel = cancel.clone();
            // Input task: caller -> remote shell.
            tokio::spawn(async move {
                let mut writer = input_channel;
                loop {
                    tokio::select! {
                        // `biased` polls the branches in order, so
                        // cancellation is always checked first.
                        biased;
                        _ = input_cancel.cancelled() => {
                            tracing::debug!("[PTY] input task cancelled");
                            break;
                        }
                        maybe_data = input_rx.recv() => {
                            // `None` means every sender has been dropped.
                            let Some(data) = maybe_data else {
                                break;
                            };
                            if let Err(e) = writer.write_all(&data).await {
                                tracing::error!("[PTY] failed to send data to SSH: {}", e);
                                break;
                            }
                            if let Err(e) = writer.flush().await {
                                tracing::error!("[PTY] failed to flush data to SSH: {}", e);
                                break;
                            }
                        }
                    }
                }
            });

            let output_cancel = cancel.clone();
            // Output task: remote shell -> caller, plus window resizes.
            tokio::spawn(async move {
                loop {
                    tokio::select! {
                        biased;
                        _ = output_cancel.cancelled() => {
                            tracing::debug!("[PTY] output task cancelled");
                            break;
                        }
                        msg = channel.wait() => {
                            match msg {
                                // The guards perform the send. These arms are
                                // taken (and the loop ends) only when the send
                                // fails, i.e. the receiver is gone; a
                                // successful send falls through to `_ => {}`.
                                Some(ChannelMsg::Data { data })
                                    if output_tx.send(data.to_vec()).await.is_err() =>
                                {
                                    break;
                                }
                                Some(ChannelMsg::ExtendedData { data, .. })
                                    if output_tx.send(data.to_vec()).await.is_err() =>
                                {
                                    break;
                                }
                                Some(ChannelMsg::Eof) | Some(ChannelMsg::Close) | None => {
                                    tracing::debug!("[PTY] channel closed");
                                    break;
                                }
                                // The exit status is only logged; the loop
                                // keeps running until EOF/close.
                                Some(ChannelMsg::ExitStatus { exit_status }) => {
                                    tracing::info!("[PTY] process exited with status: {}", exit_status);
                                }
                                _ => {}
                            }
                        }
                        resize = resize_rx.recv() => {
                            match resize {
                                Some((cols, rows)) => {
                                    // A failed resize is logged but does not
                                    // end the session.
                                    if let Err(e) = channel.window_change(cols, rows, 0, 0).await {
                                        tracing::error!("[PTY] failed to send window change: {}", e);
                                    } else {
                                        tracing::debug!("[PTY] window changed to {}x{}", cols, rows);
                                    }
                                }
                                // Every resize sender has been dropped.
                                None => {
                                    break;
                                }
                            }
                        }
                    }
                }
            });

            Ok(PtySession {
                input_tx,
                output_rx: Arc::new(tokio::sync::Mutex::new(output_rx)),
                resize_tx,
                cancel,
            })
        } else {
            Err(anyhow::anyhow!("Not connected"))
        }
    }
986
987 pub async fn list_dir(&self, path: &str) -> Result<Vec<RemoteFileEntry>> {
988 let sftp = self.sftp_session().await?;
989 let entries = sftp
990 .read_dir(path)
991 .await
992 .map_err(|e| anyhow::anyhow!("Failed to list directory '{}': {}", path, e))?;
993
994 let mut result = Vec::new();
995 for entry in entries {
996 let name = entry.file_name();
997 if name == "." || name == ".." {
998 continue;
999 }
1000
1001 let attrs = entry.metadata();
1002 let size = attrs.size.unwrap_or(0);
1003 let mtime_secs = attrs.mtime.map(|t| t as i64);
1004 let modified = mtime_secs.map(format_unix_timestamp);
1005 let permissions = attrs.permissions.map(format_permissions);
1006 let owner = attrs.uid.map(|u| u.to_string());
1007 let group = attrs.gid.map(|g| g.to_string());
1008
1009 let file_type = if attrs.is_dir() {
1010 FileEntryType::Directory
1011 } else if attrs.is_symlink() {
1012 FileEntryType::Symlink
1013 } else {
1014 FileEntryType::File
1015 };
1016
1017 result.push(RemoteFileEntry {
1018 name,
1019 size,
1020 modified,
1021 modified_unix: mtime_secs,
1022 permissions,
1023 owner,
1024 group,
1025 file_type,
1026 });
1027 }
1028
1029 result.sort_by(|a, b| {
1030 let a_is_dir = matches!(a.file_type, FileEntryType::Directory);
1031 let b_is_dir = matches!(b.file_type, FileEntryType::Directory);
1032 b_is_dir
1033 .cmp(&a_is_dir)
1034 .then_with(|| a.name.to_lowercase().cmp(&b.name.to_lowercase()))
1035 });
1036
1037 Ok(result)
1038 }
1039
1040 pub async fn download_file(&self, remote_path: &str, local_path: &str) -> Result<u64> {
1041 self.download_file_with_progress(remote_path, local_path, |_| {}, None)
1042 .await
1043 }
1044
1045 pub async fn download_file_with_progress(
1061 &self,
1062 remote_path: &str,
1063 local_path: &str,
1064 mut progress: impl FnMut(u64),
1065 cancel: Option<&CancellationToken>,
1066 ) -> Result<u64> {
1067 let sftp = self.sftp_session().await?;
1068 let mut remote_file = sftp.open(remote_path).await?;
1069 let mut local_file = tokio::fs::File::create(local_path).await?;
1070
1071 let mut buf = vec![0u8; SFTP_CHUNK_SIZE];
1072 let mut total_bytes = 0u64;
1073 loop {
1074 if let Some(token) = cancel
1075 && token.is_cancelled()
1076 {
1077 return Err(anyhow::anyhow!("Transfer cancelled"));
1078 }
1079 let n = remote_file.read(&mut buf).await?;
1080 if n == 0 {
1081 break;
1082 }
1083 local_file.write_all(&buf[..n]).await?;
1084 total_bytes += n as u64;
1085 progress(total_bytes);
1086 }
1087 local_file.flush().await?;
1088
1089 Ok(total_bytes)
1090 }
1091
1092 pub async fn download_file_to_memory(&self, remote_path: &str) -> Result<Vec<u8>> {
1093 let sftp = self.sftp_session().await?;
1094 let mut remote_file = sftp.open(remote_path).await?;
1095
1096 let mut buffer = Vec::new();
1097 let mut temp_buf = vec![0u8; SFTP_CHUNK_SIZE];
1098 loop {
1099 let n = remote_file.read(&mut temp_buf).await?;
1100 if n == 0 {
1101 break;
1102 }
1103 buffer.extend_from_slice(&temp_buf[..n]);
1104 }
1105 Ok(buffer)
1106 }
1107
1108 pub async fn upload_file(&self, local_path: &str, remote_path: &str) -> Result<u64> {
1109 self.upload_file_with_progress(local_path, remote_path, |_| {}, None)
1110 .await
1111 }
1112
1113 pub async fn upload_file_with_progress(
1121 &self,
1122 local_path: &str,
1123 remote_path: &str,
1124 mut progress: impl FnMut(u64),
1125 cancel: Option<&CancellationToken>,
1126 ) -> Result<u64> {
1127 let sftp = self.sftp_session().await?;
1128 let mut local_file = tokio::fs::File::open(local_path).await?;
1129 let mut remote_file = sftp.create(remote_path).await?;
1130
1131 let mut buf = vec![0u8; SFTP_CHUNK_SIZE];
1132 let mut total_bytes = 0u64;
1133 loop {
1134 if let Some(token) = cancel
1135 && token.is_cancelled()
1136 {
1137 return Err(anyhow::anyhow!("Transfer cancelled"));
1138 }
1139 let n = local_file.read(&mut buf).await?;
1140 if n == 0 {
1141 break;
1142 }
1143 remote_file.write_all(&buf[..n]).await?;
1144 total_bytes += n as u64;
1145 progress(total_bytes);
1146 }
1147 remote_file.flush().await?;
1148
1149 Ok(total_bytes)
1150 }
1151
1152 pub async fn upload_file_from_bytes(&self, data: &[u8], remote_path: &str) -> Result<u64> {
1153 let sftp = self.sftp_session().await?;
1154 let mut remote_file = sftp.create(remote_path).await?;
1155
1156 for chunk in data.chunks(SFTP_CHUNK_SIZE) {
1157 remote_file.write_all(chunk).await?;
1158 }
1159 remote_file.flush().await?;
1160
1161 Ok(data.len() as u64)
1162 }
1163
1164 pub async fn create_dir(&self, path: &str) -> Result<()> {
1167 let sftp = self.sftp_session().await?;
1168 sftp.create_dir(path)
1169 .await
1170 .map_err(|e| anyhow::anyhow!("Failed to create directory '{}': {}", path, e))?;
1171 Ok(())
1172 }
1173
1174 pub async fn rename(&self, old_path: &str, new_path: &str) -> Result<()> {
1178 let sftp = self.sftp_session().await?;
1179 sftp.rename(old_path, new_path).await.map_err(|e| {
1180 anyhow::anyhow!("Failed to rename '{}' to '{}': {}", old_path, new_path, e)
1181 })?;
1182 Ok(())
1183 }
1184
1185 pub async fn delete_file(&self, path: &str) -> Result<()> {
1187 let sftp = self.sftp_session().await?;
1188 sftp.remove_file(path)
1189 .await
1190 .map_err(|e| anyhow::anyhow!("Failed to delete file '{}': {}", path, e))?;
1191 Ok(())
1192 }
1193
1194 pub async fn delete_dir(&self, path: &str) -> Result<()> {
1198 let sftp = self.sftp_session().await?;
1199 sftp.remove_dir(path)
1200 .await
1201 .map_err(|e| anyhow::anyhow!("Failed to delete directory '{}': {}", path, e))?;
1202 Ok(())
1203 }
1204}
1205
#[cfg(test)]
mod expand_home_tests {
    use super::expand_home_path;

    /// Paths that do not start with `~` come back untouched.
    #[test]
    fn returns_non_tilde_paths_unchanged() {
        for untouched in ["/absolute/path", "relative/dir", ""] {
            assert_eq!(expand_home_path(untouched).as_deref(), Some(untouched));
        }
    }

    /// `~/...` is replaced by the home directory when one is known; when it
    /// is not, there is nothing to check.
    #[test]
    fn expands_tilde_slash_prefix_when_home_is_known() {
        let Some(result) = expand_home_path("~/.ssh/id_rsa") else {
            return;
        };
        assert!(
            !result.starts_with("~/"),
            "tilde must be expanded: {}",
            result
        );
        assert!(
            result.ends_with("/.ssh/id_rsa"),
            "suffix preserved: {}",
            result
        );
    }
}
1244
#[cfg(test)]
mod command_output_tests {
    use super::CommandOutput;

    /// Shorthand constructor for the fixtures below.
    fn output(stdout: &str, stderr: &str, exit_code: Option<u32>) -> CommandOutput {
        CommandOutput {
            stdout: stdout.into(),
            stderr: stderr.into(),
            exit_code,
        }
    }

    #[test]
    fn is_success_requires_zero_exit() {
        assert!(output("x", "", Some(0)).is_success());
        assert!(!output("x", "", Some(1)).is_success());
        assert!(!output("x", "", None).is_success());
    }

    #[test]
    fn combined_merges_streams_with_separator() {
        let c = output("out", "err", Some(0));
        assert_eq!(c.combined(), "out\nerr");
    }

    #[test]
    fn combined_preserves_trailing_newline() {
        let c = output("out\n", "err", Some(0));
        assert_eq!(c.combined(), "out\nerr");
    }

    #[test]
    fn combined_returns_single_stream_when_other_empty() {
        assert_eq!(output("only", "", Some(0)).combined(), "only");
        assert_eq!(output("", "only-err", Some(1)).combined(), "only-err");
    }
}
1319
#[cfg(test)]
mod redaction_tests {
    use super::{AuthMethod, SshConfig};

    /// Builds a public-key auth method for `/tmp/id` with the given passphrase.
    fn public_key(passphrase: Option<&str>) -> AuthMethod {
        AuthMethod::PublicKey {
            key_path: "/tmp/id".into(),
            passphrase: passphrase.map(String::from),
        }
    }

    #[test]
    fn debug_redacts_password() {
        let secret = "super-secret-123";
        let cfg = SshConfig {
            host: "h".into(),
            port: 22,
            username: "u".into(),
            auth_method: AuthMethod::Password {
                password: secret.into(),
            },
        };
        let rendered = format!("{:?}", cfg);
        assert!(
            !rendered.contains(secret),
            "password must not appear in Debug output: {}",
            rendered
        );
        assert!(rendered.contains("<redacted>"), "expected redaction marker");
    }

    #[test]
    fn debug_redacts_passphrase() {
        let rendered = format!("{:?}", public_key(Some("xyz-passphrase")));
        assert!(!rendered.contains("xyz-passphrase"));
        assert!(rendered.contains("<redacted>"));
        assert!(rendered.contains("/tmp/id"));
    }

    #[test]
    fn debug_shows_none_when_no_passphrase() {
        let rendered = format!("{:?}", public_key(None));
        assert!(rendered.contains("<none>"));
    }
}
1365
#[cfg(test)]
mod key_loading_tests {
    use super::load_private_key;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Unencrypted Ed25519 key used only as a parsing fixture.
    const TEST_OPENSSH_PRIVATE_KEY: &str = "\
-----BEGIN OPENSSH PRIVATE KEY-----\n\
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW\n\
QyNTUxOQAAACCzPq7zfqLffKoBDe/eo04kH2XxtSmk9D7RQyf1xUqrYgAAAJgAIAxdACAM\n\
XQAAAAtzc2gtZWQyNTUxOQAAACCzPq7zfqLffKoBDe/eo04kH2XxtSmk9D7RQyf1xUqrYg\n\
AAAEC2BsIi0QwW2uFscKTUUXNHLsYX4FxlaSDSblbAj7WR7bM+rvN+ot98qgEN796jTiQf\n\
ZfG1KaT0PtFDJ/XFSqtiAAAAEHVzZXJAZXhhbXBsZS5jb20BAgMEBQ==\n\
-----END OPENSSH PRIVATE KEY-----\n";

    /// A key written to a temporary file loads and reports its algorithm.
    #[test]
    fn load_private_key_reads_key_file_contents() {
        let mut key_file = NamedTempFile::new().expect("failed to create temp key file");
        let fixture = TEST_OPENSSH_PRIVATE_KEY.as_bytes();
        key_file
            .write_all(fixture)
            .expect("failed to write temp key file");

        let key_path = key_file
            .path()
            .to_str()
            .expect("temp key path must be valid UTF-8");
        let key =
            load_private_key(key_path, None).expect("expected key file to load successfully");

        assert_eq!(key.name(), "ssh-ed25519");
    }
}
1400
1401#[cfg(test)]
1402mod tests;