Skip to main content

remotefs_ssh/ssh/backend/
russh.rs

1//! [russh](https://docs.rs/russh/latest/russh/) backend for `remotefs-ssh`.
2
3mod auth;
4mod scp;
5
6use std::borrow::Cow;
7use std::io::{Read, Seek, Write};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10
11use remotefs::fs::{Metadata, ReadStream, WriteStream};
12use remotefs::{File, RemoteError, RemoteErrorType, RemoteResult};
13use russh::client::{Handle, Handler};
14use russh::keys::{Algorithm, PublicKey};
15use russh::{Disconnect, client};
16use russh_sftp::client::SftpSession;
17use tokio::runtime::Runtime;
18
19use super::{SshSession, WriteMode};
20use crate::SshOpts;
21use crate::ssh::backend::Sftp;
22use crate::ssh::config::Config;
23use crate::ssh::key_method::MethodType;
24
25/// The default SSH client handler for russh.
26///
27/// Accepts all server host keys. Host key verification should be implemented
28/// by the caller if stricter security is required.
29///
30/// You can implement your own [`Handler`] and use it with [`RusshSession`] if you want a different behaviour.
31#[derive(Default)]
32pub struct NoCheckServerKey;
33
34impl Handler for NoCheckServerKey {
35    type Error = russh::Error;
36
37    async fn check_server_key(
38        &mut self,
39        _server_public_key: &PublicKey,
40    ) -> Result<bool, Self::Error> {
41        Ok(true)
42    }
43}
44
45/// [`russh`](https://docs.rs/russh/latest/russh) session.
46pub struct RusshSession<T>
47where
48    T: Handler + Default + Send + 'static,
49{
50    runtime: Arc<Runtime>,
51    session: Handle<T>,
52}
53
54/// SFTP handle for russh.
55pub struct RusshSftp {
56    runtime: Arc<Runtime>,
57    session: SftpSession,
58}
59
60impl<T> SshSession for RusshSession<T>
61where
62    T: Handler + Default + Send + 'static,
63{
64    type Sftp = RusshSftp;
65
66    fn connect(opts: &SshOpts) -> RemoteResult<Self> {
67        let runtime = opts.runtime.as_ref().cloned().ok_or_else(|| {
68            RemoteError::new_ex(
69                RemoteErrorType::UnsupportedFeature,
70                "RusshSession requires a Tokio runtime",
71            )
72        })?;
73
74        let ssh_config = Config::try_from(opts)?;
75        debug!("Connecting to '{}'", ssh_config.address);
76
77        let mut config = client::Config {
78            inactivity_timeout: Some(ssh_config.connection_timeout),
79            ..Default::default()
80        };
81
82        // Apply algorithm preferences from ssh config
83        apply_config_algo_prefs(&mut config, &ssh_config);
84
85        // Apply algorithm preferences from opts
86        apply_opts_algo_prefs(&mut config, opts);
87
88        let config = Arc::new(config);
89
90        let mut session = runtime
91            .block_on(async {
92                client::connect(config, ssh_config.address.as_str(), T::default()).await
93            })
94            .map_err(|err| {
95                let msg = format!("SSH connection failed: {err:?}");
96                error!("{msg}");
97                RemoteError::new_ex(RemoteErrorType::ConnectionError, msg)
98            })?;
99
100        // Authenticate
101        auth::authenticate(&mut session, &runtime, opts, &ssh_config)?;
102
103        Ok(Self { runtime, session })
104    }
105
106    fn disconnect(&self) -> RemoteResult<()> {
107        self.runtime
108            .block_on(async {
109                self.session
110                    .disconnect(Disconnect::ByApplication, "Closed by user", "en_US")
111                    .await
112            })
113            .map_err(|err| {
114                log::error!("failed to disconnect {err}");
115                RemoteError::new_ex(RemoteErrorType::ConnectionError, err.to_string())
116            })
117    }
118
119    fn banner(&self) -> RemoteResult<Option<String>> {
120        // russh delivers the auth banner via the Handler::auth_banner callback
121        // during authentication, but does not expose it from the Handle after the fact.
122        // <https://docs.rs/russh/latest/russh/client/struct.Handle.html>
123        // <https://docs.rs/russh/latest/russh/client/trait.Handler.html#method.auth_banner>
124        Ok(None)
125    }
126
127    fn authenticated(&self) -> RemoteResult<bool> {
128        Ok(!self.session.is_closed())
129    }
130
131    fn cmd<S>(&mut self, cmd: S) -> RemoteResult<(u32, String)>
132    where
133        S: AsRef<str>,
134    {
135        let cmd = cmd.as_ref();
136        trace!("Running command: {cmd}");
137
138        // Escape single quotes and wrap in sh -c for consistent shell behavior.
139        // Without this, commands like "cd /some/dir; somecommand" fail if the
140        // remote user's login shell is fish or another non-POSIX shell.
141        let escaped = cmd.replace('\'', r#"'\''"#);
142        let wrapped = format!("sh -c '{escaped}'");
143
144        self.runtime
145            .block_on(async { perform_shell_cmd(&self.session, &wrapped).await })
146    }
147
148    fn scp_recv(&self, path: &Path) -> RemoteResult<Box<dyn Read + Send>> {
149        self.runtime
150            .block_on(async { scp::recv(&self.session, path).await })
151    }
152
153    fn scp_send(
154        &self,
155        remote_path: &Path,
156        mode: i32,
157        size: u64,
158        _times: Option<(u64, u64)>,
159    ) -> RemoteResult<Box<dyn Write + Send>> {
160        let runtime = self.runtime.clone();
161        self.runtime
162            .block_on(async { scp::send(&self.session, remote_path, mode, size, runtime).await })
163    }
164
165    fn sftp(&self) -> RemoteResult<Self::Sftp> {
166        let channel = self
167            .runtime
168            .block_on(async {
169                let channel = self.session.channel_open_session().await?;
170                channel.request_subsystem(true, "sftp").await?;
171                Ok(channel)
172            })
173            .map_err(|err: russh::Error| {
174                error!("Failed to init SFTP session: {err}");
175                RemoteError::new_ex(RemoteErrorType::ProtocolError, err.to_string())
176            })?;
177
178        self.runtime
179            .block_on(async { SftpSession::new(channel.into_stream()).await })
180            .map(|session| RusshSftp {
181                runtime: self.runtime.clone(),
182                session,
183            })
184            .map_err(|err| {
185                error!("Failed to init SFTP session: {err}");
186                RemoteError::new_ex(RemoteErrorType::ProtocolError, err.to_string())
187            })
188    }
189}
190
191impl Sftp for RusshSftp {
192    fn mkdir(&self, path: &Path, mode: i32) -> RemoteResult<()> {
193        let path_str = path.to_string_lossy().to_string();
194        self.runtime.block_on(async {
195            self.session.create_dir(&path_str).await.map_err(|err| {
196                RemoteError::new_ex(
197                    RemoteErrorType::FileCreateDenied,
198                    format!("Could not create directory '{}': {err}", path.display()),
199                )
200            })?;
201            // create_dir does not set permissions; apply them separately
202            let mut attrs = russh_sftp::protocol::FileAttributes::empty();
203            attrs.permissions = Some(mode as u32 & 0o7777);
204            self.session
205                .set_metadata(&path_str, attrs)
206                .await
207                .map_err(|err| {
208                    RemoteError::new_ex(
209                        RemoteErrorType::ProtocolError,
210                        format!("Could not set permissions on '{}': {err}", path.display()),
211                    )
212                })
213        })
214    }
215
216    fn open_read(&self, path: &Path) -> RemoteResult<ReadStream> {
217        let path_str = path.to_string_lossy().to_string();
218        self.runtime.block_on(async {
219            let data = pipelined_sftp_read(&self.session, &path_str)
220                .await
221                .map_err(|err| {
222                    RemoteError::new_ex(
223                        RemoteErrorType::ProtocolError,
224                        format!("Could not read file at '{}': {err}", path.display()),
225                    )
226                })?;
227            Ok(ReadStream::from(
228                Box::new(std::io::Cursor::new(data)) as Box<dyn Read + Send>
229            ))
230        })
231    }
232
233    fn open_write(&self, path: &Path, flags: WriteMode, mode: i32) -> RemoteResult<WriteStream> {
234        let path_str = path.to_string_lossy().to_string();
235        self.runtime.block_on(async {
236            let open_flags = match flags {
237                WriteMode::Append => {
238                    russh_sftp::protocol::OpenFlags::WRITE
239                        | russh_sftp::protocol::OpenFlags::APPEND
240                        | russh_sftp::protocol::OpenFlags::CREATE
241                }
242                WriteMode::Truncate => {
243                    russh_sftp::protocol::OpenFlags::WRITE
244                        | russh_sftp::protocol::OpenFlags::CREATE
245                        | russh_sftp::protocol::OpenFlags::TRUNCATE
246                }
247            };
248
249            let mut attrs = russh_sftp::protocol::FileAttributes::empty();
250            attrs.permissions = Some(mode as u32 & 0o7777);
251
252            let file = self
253                .session
254                .open_with_flags_and_attributes(&path_str, open_flags, attrs)
255                .await
256                .map_err(|err| {
257                    RemoteError::new_ex(
258                        RemoteErrorType::ProtocolError,
259                        format!("Could not open file at '{}': {err}", path.display()),
260                    )
261                })?;
262
263            let writer = SftpFileWriter {
264                file,
265                runtime: self.runtime.clone(),
266            };
267            Ok(WriteStream::from(
268                Box::new(writer) as Box<dyn remotefs::fs::stream::WriteAndSeek>
269            ))
270        })
271    }
272
273    fn readdir<T>(&self, dirname: T) -> RemoteResult<Vec<File>>
274    where
275        T: AsRef<Path>,
276    {
277        let dirname = dirname.as_ref();
278        let dir_str = dirname.to_string_lossy().to_string();
279        self.runtime.block_on(async {
280            let entries = self.session.read_dir(&dir_str).await.map_err(|err| {
281                RemoteError::new_ex(
282                    RemoteErrorType::ProtocolError,
283                    format!("Could not read directory: {err}"),
284                )
285            })?;
286
287            let mut files = Vec::new();
288            for entry in entries {
289                let entry_path = dirname.join(entry.file_name());
290                let symlink = if entry.file_type().is_symlink() {
291                    match self
292                        .session
293                        .read_link(entry_path.to_string_lossy().as_ref())
294                        .await
295                    {
296                        Ok(target) => Some(PathBuf::from(target)),
297                        Err(err) => {
298                            error!(
299                                "Failed to read link of {} (even though it's a symlink): {err}",
300                                entry_path.display()
301                            );
302                            None
303                        }
304                    }
305                } else {
306                    None
307                };
308                files.push(make_fsentry(&entry_path, &entry.metadata(), symlink));
309            }
310
311            Ok(files)
312        })
313    }
314
315    fn realpath(&self, path: &Path) -> RemoteResult<PathBuf> {
316        let path_str = path.to_string_lossy().to_string();
317        self.runtime.block_on(async {
318            self.session
319                .canonicalize(&path_str)
320                .await
321                .map(PathBuf::from)
322                .map_err(|err| {
323                    RemoteError::new_ex(
324                        RemoteErrorType::ProtocolError,
325                        format!(
326                            "Could not resolve real path for '{}': {err}",
327                            path.display()
328                        ),
329                    )
330                })
331        })
332    }
333
334    fn rename(&self, src: &Path, dest: &Path) -> RemoteResult<()> {
335        let src_str = src.to_string_lossy().to_string();
336        let dest_str = dest.to_string_lossy().to_string();
337        self.runtime.block_on(async {
338            self.session
339                .rename(&src_str, &dest_str)
340                .await
341                .map_err(|err| {
342                    RemoteError::new_ex(
343                        RemoteErrorType::ProtocolError,
344                        format!("Could not rename file '{}': {err}", src.display()),
345                    )
346                })
347        })
348    }
349
350    fn rmdir(&self, path: &Path) -> RemoteResult<()> {
351        let path_str = path.to_string_lossy().to_string();
352        self.runtime.block_on(async {
353            self.session.remove_dir(&path_str).await.map_err(|err| {
354                RemoteError::new_ex(
355                    RemoteErrorType::CouldNotRemoveFile,
356                    format!("Could not remove directory '{}': {err}", path.display()),
357                )
358            })
359        })
360    }
361
362    fn setstat(&self, path: &Path, metadata: Metadata) -> RemoteResult<()> {
363        let path_str = path.to_string_lossy().to_string();
364        let attrs = metadata_to_file_attributes(metadata);
365        self.runtime.block_on(async {
366            self.session
367                .set_metadata(&path_str, attrs)
368                .await
369                .map_err(|err| {
370                    RemoteError::new_ex(
371                        RemoteErrorType::ProtocolError,
372                        format!(
373                            "Could not set file attributes for '{}': {err}",
374                            path.display()
375                        ),
376                    )
377                })
378        })
379    }
380
381    fn stat(&self, filename: &Path) -> RemoteResult<File> {
382        let path_str = filename.to_string_lossy().to_string();
383        self.runtime.block_on(async {
384            let attrs = self.session.metadata(&path_str).await.map_err(|err| {
385                RemoteError::new_ex(
386                    RemoteErrorType::ProtocolError,
387                    format!(
388                        "Could not get file attributes for '{}': {err}",
389                        filename.display()
390                    ),
391                )
392            })?;
393
394            let symlink = if attrs.is_symlink() {
395                match self.session.read_link(&path_str).await {
396                    Ok(target) => Some(PathBuf::from(target)),
397                    Err(err) => {
398                        error!(
399                            "Failed to read link of {} (even though it's a symlink): {err}",
400                            filename.display()
401                        );
402                        None
403                    }
404                }
405            } else {
406                None
407            };
408
409            Ok(make_fsentry(filename, &attrs, symlink))
410        })
411    }
412
413    fn symlink(&self, path: &Path, target: &Path) -> RemoteResult<()> {
414        let path_str = path.to_string_lossy().to_string();
415        let target_str = target.to_string_lossy().to_string();
416        self.runtime.block_on(async {
417            self.session
418                .symlink(&path_str, &target_str)
419                .await
420                .map_err(|err| {
421                    RemoteError::new_ex(
422                        RemoteErrorType::FileCreateDenied,
423                        format!("Could not create symlink '{}': {err}", path.display()),
424                    )
425                })
426        })
427    }
428
429    fn unlink(&self, path: &Path) -> RemoteResult<()> {
430        let path_str = path.to_string_lossy().to_string();
431        self.runtime.block_on(async {
432            self.session.remove_file(&path_str).await.map_err(|err| {
433                RemoteError::new_ex(
434                    RemoteErrorType::CouldNotRemoveFile,
435                    format!("Could not remove file '{}': {err}", path.display()),
436                )
437            })
438        })
439    }
440}
441
442/// Convert `remotefs::fs::Metadata` to `russh_sftp::protocol::FileAttributes`.
443fn metadata_to_file_attributes(metadata: Metadata) -> russh_sftp::protocol::FileAttributes {
444    let atime = metadata
445        .accessed
446        .and_then(|x| x.duration_since(std::time::UNIX_EPOCH).ok())
447        .map(|x| x.as_secs() as u32);
448    let mtime = metadata
449        .modified
450        .and_then(|x| x.duration_since(std::time::UNIX_EPOCH).ok())
451        .map(|x| x.as_secs() as u32);
452    russh_sftp::protocol::FileAttributes {
453        size: Some(metadata.size),
454        uid: metadata.uid,
455        user: None,
456        gid: metadata.gid,
457        group: None,
458        permissions: metadata.mode.map(u32::from),
459        atime,
460        mtime,
461    }
462}
463
464/// Build a `remotefs::File` from a path and russh-sftp `FileAttributes`.
465fn make_fsentry(
466    path: &Path,
467    attrs: &russh_sftp::protocol::FileAttributes,
468    symlink: Option<PathBuf>,
469) -> File {
470    let name = match path.file_name() {
471        None => "/".to_string(),
472        Some(name) => name.to_string_lossy().to_string(),
473    };
474    debug!("Found file {name}");
475
476    let uid = attrs.uid;
477    let gid = attrs.gid;
478    let mode = attrs.permissions.map(remotefs::fs::UnixPex::from);
479    let size = attrs.size.unwrap_or(0);
480    let accessed = attrs.atime.map(|x| {
481        std::time::UNIX_EPOCH
482            .checked_add(std::time::Duration::from_secs(u64::from(x)))
483            .unwrap_or(std::time::UNIX_EPOCH)
484    });
485    let modified = attrs.mtime.map(|x| {
486        std::time::UNIX_EPOCH
487            .checked_add(std::time::Duration::from_secs(u64::from(x)))
488            .unwrap_or(std::time::UNIX_EPOCH)
489    });
490
491    let file_type = if symlink.is_some() {
492        remotefs::fs::FileType::Symlink
493    } else if attrs.is_dir() {
494        remotefs::fs::FileType::Directory
495    } else {
496        remotefs::fs::FileType::File
497    };
498
499    let entry_metadata = Metadata {
500        accessed,
501        created: None,
502        file_type,
503        gid,
504        mode,
505        modified,
506        size,
507        symlink,
508        uid,
509    };
510    trace!("Metadata for {}: {:?}", path.display(), entry_metadata);
511    File {
512        path: path.to_path_buf(),
513        metadata: entry_metadata,
514    }
515}
516
517/// Synchronous writer wrapping a russh-sftp [`russh_sftp::client::fs::File`].
518///
519/// Stores the full `Arc<Runtime>` rather than just a `Handle` so that
520/// `Runtime::block_on` drives IO and background tasks on a current-thread
521/// runtime.
522struct SftpFileWriter {
523    file: russh_sftp::client::fs::File,
524    runtime: Arc<Runtime>,
525}
526
527impl Write for SftpFileWriter {
528    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
529        use tokio::io::AsyncWriteExt as _;
530        self.runtime.block_on(self.file.write(buf))
531    }
532
533    fn flush(&mut self) -> std::io::Result<()> {
534        use tokio::io::AsyncWriteExt as _;
535        self.runtime.block_on(self.file.flush())
536    }
537}
538
539impl Seek for SftpFileWriter {
540    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
541        use tokio::io::AsyncSeekExt as _;
542        self.runtime.block_on(self.file.seek(pos))
543    }
544}
545
546impl remotefs::fs::stream::WriteAndSeek for SftpFileWriter {}
547
548/// Number of concurrent SFTP file handles used for pipelined reads.
549const SFTP_READ_PIPELINE_DEPTH: usize = 4;
550
551/// Read a remote file using multiple concurrent SFTP file handles to pipeline
552/// reads.
553///
554/// Each task owns its chunk buffer and the final result is assembled only after
555/// all tasks complete, avoiding shared mutable state across tasks.
556async fn pipelined_sftp_read(
557    session: &russh_sftp::client::SftpSession,
558    path: &str,
559) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
560    use tokio::io::{AsyncReadExt as _, AsyncSeekExt as _};
561
562    let metadata = session.metadata(path).await?;
563    let file_size = metadata.size.unwrap_or(0) as usize;
564
565    if file_size == 0 {
566        return Ok(Vec::new());
567    }
568
569    let chunk_size = file_size.div_ceil(SFTP_READ_PIPELINE_DEPTH);
570    let mut tasks = Vec::with_capacity(SFTP_READ_PIPELINE_DEPTH);
571
572    for i in 0..SFTP_READ_PIPELINE_DEPTH {
573        let offset = i * chunk_size;
574        if offset >= file_size {
575            break;
576        }
577        let len = chunk_size.min(file_size - offset);
578        let mut file = session.open(path).await.map_err(std::io::Error::other)?;
579        file.seek(std::io::SeekFrom::Start(offset as u64)).await?;
580
581        tasks.push(tokio::spawn(async move {
582            let mut buf = vec![0_u8; len];
583            file.read_exact(&mut buf).await?;
584            Ok::<(usize, Vec<u8>), std::io::Error>((offset, buf))
585        }));
586    }
587
588    let mut result = vec![0_u8; file_size];
589    let mut first_err: Option<std::io::Error> = None;
590
591    for task in tasks {
592        match task.await {
593            Ok(Ok((offset, chunk))) => {
594                result[offset..offset + chunk.len()].copy_from_slice(&chunk);
595            }
596            Ok(Err(err)) => {
597                if first_err.is_none() {
598                    first_err = Some(err);
599                }
600            }
601            Err(err) => {
602                if first_err.is_none() {
603                    first_err = Some(std::io::Error::other(err));
604                }
605            }
606        }
607    }
608
609    if let Some(err) = first_err {
610        return Err(Box::new(err));
611    }
612
613    Ok(result)
614}
615
616/// Apply algorithm preferences from SSH config to the russh [`client::Config`].
617fn apply_config_algo_prefs(config: &mut client::Config, ssh_config: &Config) {
618    let params = &ssh_config.params;
619
620    // KEX algorithms
621    let kex: Vec<russh::kex::Name> = params
622        .kex_algorithms
623        .algorithms()
624        .iter()
625        .filter_map(|name| {
626            russh::kex::Name::try_from(name.as_str())
627                .map_err(|()| warn!("Unsupported KEX algorithm: {name}"))
628                .ok()
629        })
630        .collect();
631    if !kex.is_empty() {
632        config.preferred.kex = Cow::Owned(kex);
633    }
634
635    // Host key algorithms
636    let host_keys: Vec<Algorithm> = params
637        .host_key_algorithms
638        .algorithms()
639        .iter()
640        .filter_map(|name| {
641            name.parse::<Algorithm>()
642                .map_err(|err| warn!("Unsupported host key algorithm '{name}': {err}"))
643                .ok()
644        })
645        .collect();
646    if !host_keys.is_empty() {
647        config.preferred.key = Cow::Owned(host_keys);
648    }
649
650    // Cipher algorithms
651    let ciphers: Vec<russh::cipher::Name> = params
652        .ciphers
653        .algorithms()
654        .iter()
655        .filter_map(|name| {
656            russh::cipher::Name::try_from(name.as_str())
657                .map_err(|()| warn!("Unsupported cipher algorithm: {name}"))
658                .ok()
659        })
660        .collect();
661    if !ciphers.is_empty() {
662        config.preferred.cipher = Cow::Owned(ciphers);
663    }
664
665    // MAC algorithms
666    let macs: Vec<russh::mac::Name> = params
667        .mac
668        .algorithms()
669        .iter()
670        .filter_map(|name| {
671            russh::mac::Name::try_from(name.as_str())
672                .map_err(|()| warn!("Unsupported MAC algorithm: {name}"))
673                .ok()
674        })
675        .collect();
676    if !macs.is_empty() {
677        config.preferred.mac = Cow::Owned(macs);
678    }
679}
680
681/// Apply algorithm preferences from [`SshOpts`] methods to the russh [`client::Config`].
682///
683/// Options from `SshOpts::methods` override those from the SSH config file.
684fn apply_opts_algo_prefs(config: &mut client::Config, opts: &SshOpts) {
685    for method in opts.methods.iter() {
686        let algos = method.prefs();
687        let names: Vec<&str> = algos.split(',').collect();
688
689        match method.method_type {
690            MethodType::Kex => {
691                let kex: Vec<russh::kex::Name> = names
692                    .iter()
693                    .filter_map(|name| {
694                        russh::kex::Name::try_from(*name)
695                            .map_err(|()| warn!("Unsupported KEX algorithm: {name}"))
696                            .ok()
697                    })
698                    .collect();
699                if !kex.is_empty() {
700                    config.preferred.kex = Cow::Owned(kex);
701                }
702            }
703            MethodType::HostKey => {
704                let keys: Vec<Algorithm> = names
705                    .iter()
706                    .filter_map(|name| {
707                        name.parse::<Algorithm>()
708                            .map_err(|err| warn!("Unsupported host key algorithm '{name}': {err}"))
709                            .ok()
710                    })
711                    .collect();
712                if !keys.is_empty() {
713                    config.preferred.key = Cow::Owned(keys);
714                }
715            }
716            MethodType::CryptClientServer | MethodType::CryptServerClient => {
717                let ciphers: Vec<russh::cipher::Name> = names
718                    .iter()
719                    .filter_map(|name| {
720                        russh::cipher::Name::try_from(*name)
721                            .map_err(|()| warn!("Unsupported cipher algorithm: {name}"))
722                            .ok()
723                    })
724                    .collect();
725                if !ciphers.is_empty() {
726                    config.preferred.cipher = Cow::Owned(ciphers);
727                }
728            }
729            MethodType::MacClientServer | MethodType::MacServerClient => {
730                let macs: Vec<russh::mac::Name> = names
731                    .iter()
732                    .filter_map(|name| {
733                        russh::mac::Name::try_from(*name)
734                            .map_err(|()| warn!("Unsupported MAC algorithm: {name}"))
735                            .ok()
736                    })
737                    .collect();
738                if !macs.is_empty() {
739                    config.preferred.mac = Cow::Owned(macs);
740                }
741            }
742            _ => {
743                trace!(
744                    "Ignoring unsupported method type {:?} for russh backend",
745                    method.method_type
746                );
747            }
748        }
749    }
750}
751
752/// Execute a shell command on the remote server via a russh channel.
753///
754/// Opens a session channel, executes the command, collects stdout,
755/// and returns the exit code with the output.
756async fn perform_shell_cmd<T>(session: &Handle<T>, cmd: &str) -> RemoteResult<(u32, String)>
757where
758    T: Handler,
759{
760    let mut channel = open_channel(session).await?;
761
762    channel.exec(true, cmd).await.map_err(|err| {
763        RemoteError::new_ex(
764            RemoteErrorType::ProtocolError,
765            format!("Could not execute command \"{cmd}\": {err}"),
766        )
767    })?;
768
769    let mut output = String::new();
770    let mut exit_code: Option<u32> = None;
771
772    while let Some(msg) = channel.wait().await {
773        match msg {
774            russh::ChannelMsg::Data { data } => {
775                output.push_str(&String::from_utf8_lossy(&data));
776            }
777            russh::ChannelMsg::ExitStatus { exit_status } => {
778                exit_code = Some(exit_status);
779            }
780            russh::ChannelMsg::Close => break,
781            russh::ChannelMsg::Eof => {}
782            _ => {}
783        }
784    }
785
786    let rc = exit_code.unwrap_or_else(|| {
787        warn!("No exit status received for command \"{cmd}\", defaulting to 1");
788        1
789    });
790
791    trace!("Command output: {output}");
792    debug!(r#"Command output: "{output}"; exit code: {rc}"#);
793
794    Ok((rc, output))
795}
796
797/// Open a session channel on the given handle.
798async fn open_channel<T>(session: &Handle<T>) -> RemoteResult<russh::Channel<russh::client::Msg>>
799where
800    T: Handler,
801{
802    session.channel_open_session().await.map_err(|err| {
803        RemoteError::new_ex(
804            RemoteErrorType::ProtocolError,
805            format!("Could not open channel: {err}"),
806        )
807    })
808}
809
810#[cfg(test)]
811mod test {
812
813    use std::sync::Arc;
814
815    use ssh2_config::ParseRule;
816
817    use super::*;
818    use crate::mock::ssh as ssh_mock;
819
820    fn test_runtime() -> Arc<Runtime> {
821        Arc::new(
822            tokio::runtime::Builder::new_current_thread()
823                .enable_all()
824                .build()
825                .unwrap(),
826        )
827    }
828
829    #[test]
830    fn should_connect_to_ssh_server_auth_user_password() {
831        use crate::ssh::container::OpensshServer;
832
833        let container = OpensshServer::start();
834        let port = container.port();
835
836        crate::mock::logger();
837        let runtime = test_runtime();
838        let config_file = ssh_mock::create_ssh_config(port);
839        let opts = SshOpts::new("sftp")
840            .config_file(config_file.path(), ParseRule::ALLOW_UNKNOWN_FIELDS)
841            .password("password")
842            .runtime(runtime);
843
844        if let Err(err) = RusshSession::<NoCheckServerKey>::connect(&opts) {
845            panic!("Could not connect to server: {err}");
846        }
847        let session = RusshSession::<NoCheckServerKey>::connect(&opts).unwrap();
848        assert!(session.authenticated().unwrap());
849
850        drop(container);
851    }
852
853    #[test]
854    fn should_connect_to_ssh_server_auth_key() {
855        use crate::ssh::container::OpensshServer;
856
857        let container = OpensshServer::start();
858        let port = container.port();
859
860        crate::mock::logger();
861        let runtime = test_runtime();
862        let config_file = ssh_mock::create_ssh_config(port);
863        let opts = SshOpts::new("sftp")
864            .config_file(config_file.path(), ParseRule::ALLOW_UNKNOWN_FIELDS)
865            .key_storage(Box::new(ssh_mock::MockSshKeyStorage::default()))
866            .runtime(runtime);
867        let session = RusshSession::<NoCheckServerKey>::connect(&opts).unwrap();
868        assert!(session.authenticated().unwrap());
869    }
870
871    #[test]
872    fn should_perform_shell_command_on_server() {
873        crate::mock::logger();
874        let container = crate::ssh::container::OpensshServer::start();
875        let port = container.port();
876
877        let runtime = test_runtime();
878        let opts = SshOpts::new("127.0.0.1")
879            .port(port)
880            .username("sftp")
881            .password("password")
882            .runtime(runtime);
883        let mut session = RusshSession::<NoCheckServerKey>::connect(&opts).unwrap();
884        assert!(session.authenticated().unwrap());
885        assert!(session.cmd("pwd").is_ok());
886    }
887
888    #[test]
889    fn should_perform_shell_command_on_server_and_return_exit_code() {
890        crate::mock::logger();
891        let container = crate::ssh::container::OpensshServer::start();
892        let port = container.port();
893
894        let runtime = test_runtime();
895        let opts = SshOpts::new("127.0.0.1")
896            .port(port)
897            .username("sftp")
898            .password("password")
899            .runtime(runtime);
900        let mut session = RusshSession::<NoCheckServerKey>::connect(&opts).unwrap();
901        assert!(session.authenticated().unwrap());
902        assert_eq!(
903            session.cmd_at("pwd", Path::new("/tmp")).ok().unwrap(),
904            (0, String::from("/tmp\n"))
905        );
906        assert_eq!(
907            session
908                .cmd_at("pippopluto", Path::new("/tmp"))
909                .ok()
910                .unwrap()
911                .0,
912            127
913        );
914    }
915
916    #[test]
917    fn should_fail_authentication() {
918        crate::mock::logger();
919        let container = crate::ssh::container::OpensshServer::start();
920        let port = container.port();
921
922        let runtime = test_runtime();
923        let opts = SshOpts::new("127.0.0.1")
924            .port(port)
925            .username("sftp")
926            .password("ippopotamo")
927            .runtime(runtime);
928        assert!(RusshSession::<NoCheckServerKey>::connect(&opts).is_err());
929    }
930
931    #[test]
932    fn test_filetransfer_sftp_bad_server() {
933        crate::mock::logger();
934        let runtime = test_runtime();
935        let opts = SshOpts::new("myverybad.verybad.server")
936            .port(10022)
937            .username("sftp")
938            .password("ippopotamo")
939            .runtime(runtime);
940        assert!(RusshSession::<NoCheckServerKey>::connect(&opts).is_err());
941    }
942}