1mod auth;
4mod scp;
5
6use std::borrow::Cow;
7use std::io::{Read, Seek, Write};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10
11use remotefs::fs::{Metadata, ReadStream, WriteStream};
12use remotefs::{File, RemoteError, RemoteErrorType, RemoteResult};
13use russh::client::{Handle, Handler};
14use russh::keys::{Algorithm, PublicKey};
15use russh::{Disconnect, client};
16use russh_sftp::client::SftpSession;
17use tokio::runtime::Runtime;
18
19use super::{SshSession, WriteMode};
20use crate::SshOpts;
21use crate::ssh::backend::Sftp;
22use crate::ssh::config::Config;
23use crate::ssh::key_method::MethodType;
24
25#[derive(Default)]
32pub struct NoCheckServerKey;
33
34impl Handler for NoCheckServerKey {
35 type Error = russh::Error;
36
37 async fn check_server_key(
38 &mut self,
39 _server_public_key: &PublicKey,
40 ) -> Result<bool, Self::Error> {
41 Ok(true)
42 }
43}
44
45pub struct RusshSession<T>
47where
48 T: Handler + Default + Send + 'static,
49{
50 runtime: Arc<Runtime>,
51 session: Handle<T>,
52}
53
54pub struct RusshSftp {
56 runtime: Arc<Runtime>,
57 session: Arc<SftpSession>,
58}
59
60impl<T> SshSession for RusshSession<T>
61where
62 T: Handler + Default + Send + 'static,
63{
64 type Sftp = RusshSftp;
65
66 fn connect(opts: &SshOpts) -> RemoteResult<Self> {
67 let runtime = opts.runtime.as_ref().cloned().ok_or_else(|| {
68 RemoteError::new_ex(
69 RemoteErrorType::UnsupportedFeature,
70 "RusshSession requires a Tokio runtime",
71 )
72 })?;
73
74 let ssh_config = Config::try_from(opts)?;
75 debug!("Connecting to '{}'", ssh_config.address);
76
77 let mut config = client::Config {
78 inactivity_timeout: Some(ssh_config.connection_timeout),
79 ..Default::default()
80 };
81
82 apply_config_algo_prefs(&mut config, &ssh_config);
84
85 apply_opts_algo_prefs(&mut config, opts);
87
88 let config = Arc::new(config);
89
90 let mut session = runtime
91 .block_on(async {
92 client::connect(config, ssh_config.address.as_str(), T::default()).await
93 })
94 .map_err(|err| {
95 let msg = format!("SSH connection failed: {err:?}");
96 error!("{msg}");
97 RemoteError::new_ex(RemoteErrorType::ConnectionError, msg)
98 })?;
99
100 auth::authenticate(&mut session, &runtime, opts, &ssh_config)?;
102
103 Ok(Self { runtime, session })
104 }
105
106 fn disconnect(&self) -> RemoteResult<()> {
107 self.runtime
108 .block_on(async {
109 self.session
110 .disconnect(Disconnect::ByApplication, "Closed by user", "en_US")
111 .await
112 })
113 .map_err(|err| {
114 log::error!("failed to disconnect {err}");
115 RemoteError::new_ex(RemoteErrorType::ConnectionError, err.to_string())
116 })
117 }
118
119 fn banner(&self) -> RemoteResult<Option<String>> {
120 Ok(None)
125 }
126
127 fn authenticated(&self) -> RemoteResult<bool> {
128 Ok(!self.session.is_closed())
129 }
130
131 fn cmd<S>(&mut self, cmd: S) -> RemoteResult<(u32, String)>
132 where
133 S: AsRef<str>,
134 {
135 let cmd = cmd.as_ref();
136 trace!("Running command: {cmd}");
137
138 let escaped = cmd.replace('\'', r#"'\''"#);
142 let wrapped = format!("sh -c '{escaped}'");
143
144 self.runtime
145 .block_on(async { perform_shell_cmd(&self.session, &wrapped).await })
146 }
147
148 fn scp_recv(&self, path: &Path) -> RemoteResult<Box<dyn Read + Send>> {
149 self.runtime
150 .block_on(async { scp::recv(&self.session, path).await })
151 }
152
153 fn scp_send(
154 &self,
155 remote_path: &Path,
156 mode: i32,
157 size: u64,
158 _times: Option<(u64, u64)>,
159 ) -> RemoteResult<Box<dyn Write + Send>> {
160 let runtime = self.runtime.clone();
161 self.runtime
162 .block_on(async { scp::send(&self.session, remote_path, mode, size, runtime).await })
163 }
164
165 fn sftp(&self) -> RemoteResult<Self::Sftp> {
166 let channel = self
167 .runtime
168 .block_on(async {
169 let channel = self.session.channel_open_session().await?;
170 channel.request_subsystem(true, "sftp").await?;
171 Ok(channel)
172 })
173 .map_err(|err: russh::Error| {
174 error!("Failed to init SFTP session: {err}");
175 RemoteError::new_ex(RemoteErrorType::ProtocolError, err.to_string())
176 })?;
177
178 self.runtime
179 .block_on(async { SftpSession::new(channel.into_stream()).await })
180 .map(|session| RusshSftp {
181 runtime: self.runtime.clone(),
182 session: Arc::new(session),
183 })
184 .map_err(|err| {
185 error!("Failed to init SFTP session: {err}");
186 RemoteError::new_ex(RemoteErrorType::ProtocolError, err.to_string())
187 })
188 }
189}
190
191impl Sftp for RusshSftp {
192 fn mkdir(&self, path: &Path, mode: i32) -> RemoteResult<()> {
193 let path_str = path.to_string_lossy().to_string();
194 self.runtime.block_on(async {
195 self.session.create_dir(&path_str).await.map_err(|err| {
196 RemoteError::new_ex(
197 RemoteErrorType::FileCreateDenied,
198 format!("Could not create directory '{}': {err}", path.display()),
199 )
200 })?;
201 let mut attrs = russh_sftp::protocol::FileAttributes::empty();
203 attrs.permissions = Some(mode as u32 & 0o7777);
204 self.session
205 .set_metadata(&path_str, attrs)
206 .await
207 .map_err(|err| {
208 RemoteError::new_ex(
209 RemoteErrorType::ProtocolError,
210 format!("Could not set permissions on '{}': {err}", path.display()),
211 )
212 })
213 })
214 }
215
216 fn open_read(&self, path: &Path) -> RemoteResult<ReadStream> {
217 let path_str = path.to_string_lossy().to_string();
218 let reader = PipelinedSftpReader::new(self.runtime.clone(), self.session.clone(), path_str)
219 .map_err(|err| {
220 RemoteError::new_ex(
221 RemoteErrorType::ProtocolError,
222 format!("Could not read file at '{}': {err}", path.display()),
223 )
224 })?;
225 Ok(ReadStream::from(Box::new(reader) as Box<dyn Read + Send>))
226 }
227
228 fn open_write(&self, path: &Path, flags: WriteMode, mode: i32) -> RemoteResult<WriteStream> {
229 let path_str = path.to_string_lossy().to_string();
230 self.runtime.block_on(async {
231 let open_flags = match flags {
232 WriteMode::Append => {
233 russh_sftp::protocol::OpenFlags::WRITE
234 | russh_sftp::protocol::OpenFlags::APPEND
235 | russh_sftp::protocol::OpenFlags::CREATE
236 }
237 WriteMode::Truncate => {
238 russh_sftp::protocol::OpenFlags::WRITE
239 | russh_sftp::protocol::OpenFlags::CREATE
240 | russh_sftp::protocol::OpenFlags::TRUNCATE
241 }
242 };
243
244 let mut attrs = russh_sftp::protocol::FileAttributes::empty();
245 attrs.permissions = Some(mode as u32 & 0o7777);
246
247 let file = self
248 .session
249 .open_with_flags_and_attributes(&path_str, open_flags, attrs)
250 .await
251 .map_err(|err| {
252 RemoteError::new_ex(
253 RemoteErrorType::ProtocolError,
254 format!("Could not open file at '{}': {err}", path.display()),
255 )
256 })?;
257
258 let writer = SftpFileWriter {
259 file,
260 runtime: self.runtime.clone(),
261 };
262 Ok(WriteStream::from(
263 Box::new(writer) as Box<dyn remotefs::fs::stream::WriteAndSeek>
264 ))
265 })
266 }
267
268 fn readdir<T>(&self, dirname: T) -> RemoteResult<Vec<File>>
269 where
270 T: AsRef<Path>,
271 {
272 let dirname = dirname.as_ref();
273 let dir_str = dirname.to_string_lossy().to_string();
274 self.runtime.block_on(async {
275 let entries = self.session.read_dir(&dir_str).await.map_err(|err| {
276 RemoteError::new_ex(
277 RemoteErrorType::ProtocolError,
278 format!("Could not read directory: {err}"),
279 )
280 })?;
281
282 let mut files = Vec::new();
283 for entry in entries {
284 let entry_path = dirname.join(entry.file_name());
285 let symlink = if entry.file_type().is_symlink() {
286 match self
287 .session
288 .read_link(entry_path.to_string_lossy().as_ref())
289 .await
290 {
291 Ok(target) => Some(PathBuf::from(target)),
292 Err(err) => {
293 error!(
294 "Failed to read link of {} (even though it's a symlink): {err}",
295 entry_path.display()
296 );
297 None
298 }
299 }
300 } else {
301 None
302 };
303 files.push(make_fsentry(&entry_path, &entry.metadata(), symlink));
304 }
305
306 Ok(files)
307 })
308 }
309
310 fn realpath(&self, path: &Path) -> RemoteResult<PathBuf> {
311 let path_str = path.to_string_lossy().to_string();
312 self.runtime.block_on(async {
313 self.session
314 .canonicalize(&path_str)
315 .await
316 .map(PathBuf::from)
317 .map_err(|err| {
318 RemoteError::new_ex(
319 RemoteErrorType::ProtocolError,
320 format!(
321 "Could not resolve real path for '{}': {err}",
322 path.display()
323 ),
324 )
325 })
326 })
327 }
328
329 fn rename(&self, src: &Path, dest: &Path) -> RemoteResult<()> {
330 let src_str = src.to_string_lossy().to_string();
331 let dest_str = dest.to_string_lossy().to_string();
332 self.runtime.block_on(async {
333 self.session
334 .rename(&src_str, &dest_str)
335 .await
336 .map_err(|err| {
337 RemoteError::new_ex(
338 RemoteErrorType::ProtocolError,
339 format!("Could not rename file '{}': {err}", src.display()),
340 )
341 })
342 })
343 }
344
345 fn rmdir(&self, path: &Path) -> RemoteResult<()> {
346 let path_str = path.to_string_lossy().to_string();
347 self.runtime.block_on(async {
348 self.session.remove_dir(&path_str).await.map_err(|err| {
349 RemoteError::new_ex(
350 RemoteErrorType::CouldNotRemoveFile,
351 format!("Could not remove directory '{}': {err}", path.display()),
352 )
353 })
354 })
355 }
356
357 fn setstat(&self, path: &Path, metadata: Metadata) -> RemoteResult<()> {
358 let path_str = path.to_string_lossy().to_string();
359 let attrs = metadata_to_file_attributes(metadata);
360 self.runtime.block_on(async {
361 self.session
362 .set_metadata(&path_str, attrs)
363 .await
364 .map_err(|err| {
365 RemoteError::new_ex(
366 RemoteErrorType::ProtocolError,
367 format!(
368 "Could not set file attributes for '{}': {err}",
369 path.display()
370 ),
371 )
372 })
373 })
374 }
375
376 fn stat(&self, filename: &Path) -> RemoteResult<File> {
377 let path_str = filename.to_string_lossy().to_string();
378 self.runtime.block_on(async {
379 let attrs = self.session.metadata(&path_str).await.map_err(|err| {
380 RemoteError::new_ex(
381 RemoteErrorType::ProtocolError,
382 format!(
383 "Could not get file attributes for '{}': {err}",
384 filename.display()
385 ),
386 )
387 })?;
388
389 let symlink = if attrs.is_symlink() {
390 match self.session.read_link(&path_str).await {
391 Ok(target) => Some(PathBuf::from(target)),
392 Err(err) => {
393 error!(
394 "Failed to read link of {} (even though it's a symlink): {err}",
395 filename.display()
396 );
397 None
398 }
399 }
400 } else {
401 None
402 };
403
404 Ok(make_fsentry(filename, &attrs, symlink))
405 })
406 }
407
408 fn symlink(&self, path: &Path, target: &Path) -> RemoteResult<()> {
409 let path_str = path.to_string_lossy().to_string();
410 let target_str = target.to_string_lossy().to_string();
411 self.runtime.block_on(async {
412 self.session
413 .symlink(&path_str, &target_str)
414 .await
415 .map_err(|err| {
416 RemoteError::new_ex(
417 RemoteErrorType::FileCreateDenied,
418 format!("Could not create symlink '{}': {err}", path.display()),
419 )
420 })
421 })
422 }
423
424 fn unlink(&self, path: &Path) -> RemoteResult<()> {
425 let path_str = path.to_string_lossy().to_string();
426 self.runtime.block_on(async {
427 self.session.remove_file(&path_str).await.map_err(|err| {
428 RemoteError::new_ex(
429 RemoteErrorType::CouldNotRemoveFile,
430 format!("Could not remove file '{}': {err}", path.display()),
431 )
432 })
433 })
434 }
435}
436
437fn metadata_to_file_attributes(metadata: Metadata) -> russh_sftp::protocol::FileAttributes {
439 let atime = metadata
440 .accessed
441 .and_then(|x| x.duration_since(std::time::UNIX_EPOCH).ok())
442 .map(|x| x.as_secs() as u32);
443 let mtime = metadata
444 .modified
445 .and_then(|x| x.duration_since(std::time::UNIX_EPOCH).ok())
446 .map(|x| x.as_secs() as u32);
447 russh_sftp::protocol::FileAttributes {
448 size: Some(metadata.size),
449 uid: metadata.uid,
450 user: None,
451 gid: metadata.gid,
452 group: None,
453 permissions: metadata.mode.map(u32::from),
454 atime,
455 mtime,
456 }
457}
458
459fn make_fsentry(
461 path: &Path,
462 attrs: &russh_sftp::protocol::FileAttributes,
463 symlink: Option<PathBuf>,
464) -> File {
465 let name = match path.file_name() {
466 None => "/".to_string(),
467 Some(name) => name.to_string_lossy().to_string(),
468 };
469 debug!("Found file {name}");
470
471 let uid = attrs.uid;
472 let gid = attrs.gid;
473 let mode = attrs.permissions.map(remotefs::fs::UnixPex::from);
474 let size = attrs.size.unwrap_or(0);
475 let accessed = attrs.atime.map(|x| {
476 std::time::UNIX_EPOCH
477 .checked_add(std::time::Duration::from_secs(u64::from(x)))
478 .unwrap_or(std::time::UNIX_EPOCH)
479 });
480 let modified = attrs.mtime.map(|x| {
481 std::time::UNIX_EPOCH
482 .checked_add(std::time::Duration::from_secs(u64::from(x)))
483 .unwrap_or(std::time::UNIX_EPOCH)
484 });
485
486 let file_type = if symlink.is_some() {
487 remotefs::fs::FileType::Symlink
488 } else if attrs.is_dir() {
489 remotefs::fs::FileType::Directory
490 } else {
491 remotefs::fs::FileType::File
492 };
493
494 let entry_metadata = Metadata {
495 accessed,
496 created: None,
497 file_type,
498 gid,
499 mode,
500 modified,
501 size,
502 symlink,
503 uid,
504 };
505 trace!("Metadata for {}: {:?}", path.display(), entry_metadata);
506 File {
507 path: path.to_path_buf(),
508 metadata: entry_metadata,
509 }
510}
511
512struct SftpFileWriter {
518 file: russh_sftp::client::fs::File,
519 runtime: Arc<Runtime>,
520}
521
522impl Write for SftpFileWriter {
523 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
524 use tokio::io::AsyncWriteExt as _;
525 self.runtime.block_on(self.file.write(buf))
526 }
527
528 fn flush(&mut self) -> std::io::Result<()> {
529 use tokio::io::AsyncWriteExt as _;
530 self.runtime.block_on(self.file.flush())
531 }
532}
533
534impl Seek for SftpFileWriter {
535 fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
536 use tokio::io::AsyncSeekExt as _;
537 self.runtime.block_on(self.file.seek(pos))
538 }
539}
540
541impl remotefs::fs::stream::WriteAndSeek for SftpFileWriter {}
542
543const SFTP_PIPELINE_DEPTH: usize = 4;
545
546const SFTP_CHUNK_SIZE: usize = 4 * 1024 * 1024;
548
549const MAX_PREFETCH: usize = 2;
552
553const BATCH_SIZE: usize = SFTP_PIPELINE_DEPTH * SFTP_CHUNK_SIZE;
555
556struct PipelinedSftpReader {
563 runtime: Arc<Runtime>,
564 session: Arc<SftpSession>,
565 path: String,
566 file_size: usize,
567 fetch_offset: usize,
569 batches: std::collections::VecDeque<Vec<u8>>,
571 buf_cursor: usize,
573 pending: Option<PrefetchTask>,
575}
576
577struct PrefetchTask {
579 batch_offset: usize,
582 handle: tokio::task::JoinHandle<Result<Vec<u8>, std::io::Error>>,
583}
584
585impl PipelinedSftpReader {
586 fn new(
591 runtime: Arc<Runtime>,
592 session: Arc<SftpSession>,
593 path: String,
594 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
595 let metadata = runtime.block_on(session.metadata(&path))?;
596 let file_size = metadata.size.unwrap_or(0) as usize;
597
598 let mut reader = Self {
599 runtime,
600 session,
601 path,
602 file_size,
603 fetch_offset: 0,
604 batches: std::collections::VecDeque::new(),
605 buf_cursor: 0,
606 pending: None,
607 };
608
609 if file_size == 0 {
610 return Ok(reader);
611 }
612
613 let first_batch = reader.fetch_batch_blocking()?;
615 reader.batches.push_back(first_batch);
616
617 reader.maybe_start_prefetch();
619
620 Ok(reader)
621 }
622
623 fn fetch_batch_blocking(
625 &mut self,
626 ) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
627 let remaining = self.file_size.saturating_sub(self.fetch_offset);
628 if remaining == 0 {
629 return Ok(Vec::new());
630 }
631
632 let batch_len = remaining.min(BATCH_SIZE);
633 let offset = self.fetch_offset;
634 let batch = self
635 .runtime
636 .block_on(Self::fetch_batch(
637 self.session.clone(),
638 self.path.clone(),
639 offset,
640 batch_len,
641 ))
642 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
643
644 self.fetch_offset += batch_len;
645 Ok(batch)
646 }
647
648 fn maybe_start_prefetch(&mut self) {
651 if self.pending.is_some() {
652 return;
653 }
654 if self.batches.len() > MAX_PREFETCH {
655 return;
656 }
657 let remaining = self.file_size.saturating_sub(self.fetch_offset);
658 if remaining == 0 {
659 return;
660 }
661
662 let batch_len = remaining.min(BATCH_SIZE);
663 let session = self.session.clone();
664 let path = self.path.clone();
665 let offset = self.fetch_offset;
666 self.fetch_offset += batch_len;
668
669 let handle = self
670 .runtime
671 .spawn(async move { Self::fetch_batch(session, path, offset, batch_len).await });
672
673 self.pending = Some(PrefetchTask {
674 batch_offset: offset,
675 handle,
676 });
677 }
678
679 fn collect_pending(&mut self) -> std::io::Result<Option<Vec<u8>>> {
683 let task = match self.pending.take() {
684 Some(t) => t,
685 None => return Ok(None),
686 };
687
688 match self
689 .runtime
690 .block_on(task.handle)
691 .map_err(std::io::Error::other)?
692 {
693 Ok(batch) if batch.is_empty() => Ok(None),
694 Ok(batch) => Ok(Some(batch)),
695 Err(err) => {
696 self.fetch_offset = task.batch_offset;
698 Err(std::io::Error::other(err))
699 }
700 }
701 }
702
703 async fn fetch_batch(
706 session: Arc<SftpSession>,
707 path: String,
708 batch_offset: usize,
709 batch_len: usize,
710 ) -> Result<Vec<u8>, std::io::Error> {
711 use tokio::io::{AsyncReadExt as _, AsyncSeekExt as _};
712
713 let chunk_count = batch_len.div_ceil(SFTP_CHUNK_SIZE);
714 let mut tasks = Vec::with_capacity(chunk_count);
715
716 for i in 0..chunk_count {
717 let chunk_offset = i * SFTP_CHUNK_SIZE;
718 let len = SFTP_CHUNK_SIZE.min(batch_len - chunk_offset);
719 let abs_offset = batch_offset + chunk_offset;
720
721 let mut file = session.open(&path).await.map_err(std::io::Error::other)?;
722 file.seek(std::io::SeekFrom::Start(abs_offset as u64))
723 .await?;
724
725 tasks.push(tokio::spawn(async move {
726 let mut buf = vec![0_u8; len];
727 file.read_exact(&mut buf).await?;
728 Ok::<(usize, Vec<u8>), std::io::Error>((chunk_offset, buf))
729 }));
730 }
731
732 let mut result = vec![0_u8; batch_len];
733 for task in tasks {
734 let (chunk_offset, chunk) = task
735 .await
736 .map_err(std::io::Error::other)?
737 .map_err(std::io::Error::other)?;
738 result[chunk_offset..chunk_offset + chunk.len()].copy_from_slice(&chunk);
739 }
740
741 Ok(result)
742 }
743}
744
745impl Read for PipelinedSftpReader {
746 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
747 loop {
748 if let Some(front) = self.batches.front() {
750 let available = &front[self.buf_cursor..];
751 if !available.is_empty() {
752 let to_copy = available.len().min(buf.len());
753 buf[..to_copy].copy_from_slice(&available[..to_copy]);
754 self.buf_cursor += to_copy;
755 return Ok(to_copy);
756 }
757
758 self.batches.pop_front();
760 self.buf_cursor = 0;
761
762 if let Some(batch) = self.collect_pending()? {
764 self.batches.push_back(batch);
765 }
766
767 self.maybe_start_prefetch();
769
770 continue;
771 }
772
773 if let Some(batch) = self.collect_pending()? {
775 self.batches.push_back(batch);
776 self.maybe_start_prefetch();
777 continue;
778 }
779
780 return Ok(0);
782 }
783 }
784}
785
786fn apply_config_algo_prefs(config: &mut client::Config, ssh_config: &Config) {
788 let params = &ssh_config.params;
789
790 let kex: Vec<russh::kex::Name> = params
792 .kex_algorithms
793 .algorithms()
794 .iter()
795 .filter_map(|name| {
796 russh::kex::Name::try_from(name.as_str())
797 .map_err(|()| warn!("Unsupported KEX algorithm: {name}"))
798 .ok()
799 })
800 .collect();
801 if !kex.is_empty() {
802 config.preferred.kex = Cow::Owned(kex);
803 }
804
805 let host_keys: Vec<Algorithm> = params
807 .host_key_algorithms
808 .algorithms()
809 .iter()
810 .filter_map(|name| {
811 name.parse::<Algorithm>()
812 .map_err(|err| warn!("Unsupported host key algorithm '{name}': {err}"))
813 .ok()
814 })
815 .collect();
816 if !host_keys.is_empty() {
817 config.preferred.key = Cow::Owned(host_keys);
818 }
819
820 let ciphers: Vec<russh::cipher::Name> = params
822 .ciphers
823 .algorithms()
824 .iter()
825 .filter_map(|name| {
826 russh::cipher::Name::try_from(name.as_str())
827 .map_err(|()| warn!("Unsupported cipher algorithm: {name}"))
828 .ok()
829 })
830 .collect();
831 if !ciphers.is_empty() {
832 config.preferred.cipher = Cow::Owned(ciphers);
833 }
834
835 let macs: Vec<russh::mac::Name> = params
837 .mac
838 .algorithms()
839 .iter()
840 .filter_map(|name| {
841 russh::mac::Name::try_from(name.as_str())
842 .map_err(|()| warn!("Unsupported MAC algorithm: {name}"))
843 .ok()
844 })
845 .collect();
846 if !macs.is_empty() {
847 config.preferred.mac = Cow::Owned(macs);
848 }
849}
850
851fn apply_opts_algo_prefs(config: &mut client::Config, opts: &SshOpts) {
855 for method in opts.methods.iter() {
856 let algos = method.prefs();
857 let names: Vec<&str> = algos.split(',').collect();
858
859 match method.method_type {
860 MethodType::Kex => {
861 let kex: Vec<russh::kex::Name> = names
862 .iter()
863 .filter_map(|name| {
864 russh::kex::Name::try_from(*name)
865 .map_err(|()| warn!("Unsupported KEX algorithm: {name}"))
866 .ok()
867 })
868 .collect();
869 if !kex.is_empty() {
870 config.preferred.kex = Cow::Owned(kex);
871 }
872 }
873 MethodType::HostKey => {
874 let keys: Vec<Algorithm> = names
875 .iter()
876 .filter_map(|name| {
877 name.parse::<Algorithm>()
878 .map_err(|err| warn!("Unsupported host key algorithm '{name}': {err}"))
879 .ok()
880 })
881 .collect();
882 if !keys.is_empty() {
883 config.preferred.key = Cow::Owned(keys);
884 }
885 }
886 MethodType::CryptClientServer | MethodType::CryptServerClient => {
887 let ciphers: Vec<russh::cipher::Name> = names
888 .iter()
889 .filter_map(|name| {
890 russh::cipher::Name::try_from(*name)
891 .map_err(|()| warn!("Unsupported cipher algorithm: {name}"))
892 .ok()
893 })
894 .collect();
895 if !ciphers.is_empty() {
896 config.preferred.cipher = Cow::Owned(ciphers);
897 }
898 }
899 MethodType::MacClientServer | MethodType::MacServerClient => {
900 let macs: Vec<russh::mac::Name> = names
901 .iter()
902 .filter_map(|name| {
903 russh::mac::Name::try_from(*name)
904 .map_err(|()| warn!("Unsupported MAC algorithm: {name}"))
905 .ok()
906 })
907 .collect();
908 if !macs.is_empty() {
909 config.preferred.mac = Cow::Owned(macs);
910 }
911 }
912 _ => {
913 trace!(
914 "Ignoring unsupported method type {:?} for russh backend",
915 method.method_type
916 );
917 }
918 }
919 }
920}
921
922async fn perform_shell_cmd<T>(session: &Handle<T>, cmd: &str) -> RemoteResult<(u32, String)>
927where
928 T: Handler,
929{
930 let mut channel = open_channel(session).await?;
931
932 channel.exec(true, cmd).await.map_err(|err| {
933 RemoteError::new_ex(
934 RemoteErrorType::ProtocolError,
935 format!("Could not execute command \"{cmd}\": {err}"),
936 )
937 })?;
938
939 let mut output = String::new();
940 let mut exit_code: Option<u32> = None;
941
942 while let Some(msg) = channel.wait().await {
943 match msg {
944 russh::ChannelMsg::Data { data } => {
945 output.push_str(&String::from_utf8_lossy(&data));
946 }
947 russh::ChannelMsg::ExitStatus { exit_status } => {
948 exit_code = Some(exit_status);
949 }
950 russh::ChannelMsg::Close => break,
951 russh::ChannelMsg::Eof => {}
952 _ => {}
953 }
954 }
955
956 let rc = exit_code.unwrap_or_else(|| {
957 warn!("No exit status received for command \"{cmd}\", defaulting to 1");
958 1
959 });
960
961 trace!("Command output: {output}");
962 debug!(r#"Command output: "{output}"; exit code: {rc}"#);
963
964 Ok((rc, output))
965}
966
967async fn open_channel<T>(session: &Handle<T>) -> RemoteResult<russh::Channel<russh::client::Msg>>
969where
970 T: Handler,
971{
972 session.channel_open_session().await.map_err(|err| {
973 RemoteError::new_ex(
974 RemoteErrorType::ProtocolError,
975 format!("Could not open channel: {err}"),
976 )
977 })
978}
979
980#[cfg(test)]
981mod test {
982
983 use std::sync::Arc;
984
985 use ssh2_config::ParseRule;
986
987 use super::*;
988 use crate::mock::ssh as ssh_mock;
989
990 fn test_runtime() -> Arc<Runtime> {
991 Arc::new(
992 tokio::runtime::Builder::new_current_thread()
993 .enable_all()
994 .build()
995 .unwrap(),
996 )
997 }
998
999 #[test]
1000 fn should_connect_to_ssh_server_auth_user_password() {
1001 use crate::ssh::container::OpensshServer;
1002
1003 let container = OpensshServer::start();
1004 let port = container.port();
1005
1006 crate::mock::logger();
1007 let runtime = test_runtime();
1008 let config_file = ssh_mock::create_ssh_config(port);
1009 let opts = SshOpts::new("sftp")
1010 .config_file(config_file.path(), ParseRule::ALLOW_UNKNOWN_FIELDS)
1011 .password("password")
1012 .runtime(runtime);
1013
1014 if let Err(err) = RusshSession::<NoCheckServerKey>::connect(&opts) {
1015 panic!("Could not connect to server: {err}");
1016 }
1017 let session = RusshSession::<NoCheckServerKey>::connect(&opts).unwrap();
1018 assert!(session.authenticated().unwrap());
1019
1020 drop(container);
1021 }
1022
1023 #[test]
1024 fn should_connect_to_ssh_server_auth_key() {
1025 use crate::ssh::container::OpensshServer;
1026
1027 let container = OpensshServer::start();
1028 let port = container.port();
1029
1030 crate::mock::logger();
1031 let runtime = test_runtime();
1032 let config_file = ssh_mock::create_ssh_config(port);
1033 let opts = SshOpts::new("sftp")
1034 .config_file(config_file.path(), ParseRule::ALLOW_UNKNOWN_FIELDS)
1035 .key_storage(Box::new(ssh_mock::MockSshKeyStorage::default()))
1036 .runtime(runtime);
1037 let session = RusshSession::<NoCheckServerKey>::connect(&opts).unwrap();
1038 assert!(session.authenticated().unwrap());
1039 }
1040
1041 #[test]
1042 fn should_perform_shell_command_on_server() {
1043 crate::mock::logger();
1044 let container = crate::ssh::container::OpensshServer::start();
1045 let port = container.port();
1046
1047 let runtime = test_runtime();
1048 let opts = SshOpts::new("127.0.0.1")
1049 .port(port)
1050 .username("sftp")
1051 .password("password")
1052 .runtime(runtime);
1053 let mut session = RusshSession::<NoCheckServerKey>::connect(&opts).unwrap();
1054 assert!(session.authenticated().unwrap());
1055 assert!(session.cmd("pwd").is_ok());
1056 }
1057
1058 #[test]
1059 fn should_perform_shell_command_on_server_and_return_exit_code() {
1060 crate::mock::logger();
1061 let container = crate::ssh::container::OpensshServer::start();
1062 let port = container.port();
1063
1064 let runtime = test_runtime();
1065 let opts = SshOpts::new("127.0.0.1")
1066 .port(port)
1067 .username("sftp")
1068 .password("password")
1069 .runtime(runtime);
1070 let mut session = RusshSession::<NoCheckServerKey>::connect(&opts).unwrap();
1071 assert!(session.authenticated().unwrap());
1072 assert_eq!(
1073 session.cmd_at("pwd", Path::new("/tmp")).ok().unwrap(),
1074 (0, String::from("/tmp\n"))
1075 );
1076 assert_eq!(
1077 session
1078 .cmd_at("pippopluto", Path::new("/tmp"))
1079 .ok()
1080 .unwrap()
1081 .0,
1082 127
1083 );
1084 }
1085
1086 #[test]
1087 fn should_fail_authentication() {
1088 crate::mock::logger();
1089 let container = crate::ssh::container::OpensshServer::start();
1090 let port = container.port();
1091
1092 let runtime = test_runtime();
1093 let opts = SshOpts::new("127.0.0.1")
1094 .port(port)
1095 .username("sftp")
1096 .password("ippopotamo")
1097 .runtime(runtime);
1098 assert!(RusshSession::<NoCheckServerKey>::connect(&opts).is_err());
1099 }
1100
1101 #[test]
1102 fn test_filetransfer_sftp_bad_server() {
1103 crate::mock::logger();
1104 let runtime = test_runtime();
1105 let opts = SshOpts::new("myverybad.verybad.server")
1106 .port(10022)
1107 .username("sftp")
1108 .password("ippopotamo")
1109 .runtime(runtime);
1110 assert!(RusshSession::<NoCheckServerKey>::connect(&opts).is_err());
1111 }
1112}