Skip to main content

remotefs_ssh/ssh/backend/
russh.rs

1//! [russh](https://docs.rs/russh/latest/russh/) backend for `remotefs-ssh`.
2
3mod auth;
4mod scp;
5
6use std::borrow::Cow;
7use std::io::{Read, Seek, Write};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10
11use remotefs::fs::{Metadata, ReadStream, WriteStream};
12use remotefs::{File, RemoteError, RemoteErrorType, RemoteResult};
13use russh::client::{Handle, Handler};
14use russh::keys::{Algorithm, PublicKey};
15use russh::{Disconnect, client};
16use russh_sftp::client::SftpSession;
17use tokio::runtime::Runtime;
18
19use super::{SshSession, WriteMode};
20use crate::SshOpts;
21use crate::ssh::backend::Sftp;
22use crate::ssh::config::Config;
23use crate::ssh::key_method::MethodType;
24
25/// The default SSH client handler for russh.
26///
27/// Accepts all server host keys. Host key verification should be implemented
28/// by the caller if stricter security is required.
29///
30/// You can implement your own [`Handler`] and use it with [`RusshSession`] if you want a different behaviour.
31#[derive(Default)]
32pub struct NoCheckServerKey;
33
34impl Handler for NoCheckServerKey {
35    type Error = russh::Error;
36
37    async fn check_server_key(
38        &mut self,
39        _server_public_key: &PublicKey,
40    ) -> Result<bool, Self::Error> {
41        Ok(true)
42    }
43}
44
45/// [`russh`](https://docs.rs/russh/latest/russh) session.
46pub struct RusshSession<T>
47where
48    T: Handler + Default + Send + 'static,
49{
50    runtime: Arc<Runtime>,
51    session: Handle<T>,
52}
53
54/// SFTP handle for russh.
55pub struct RusshSftp {
56    runtime: Arc<Runtime>,
57    session: Arc<SftpSession>,
58}
59
60impl<T> SshSession for RusshSession<T>
61where
62    T: Handler + Default + Send + 'static,
63{
64    type Sftp = RusshSftp;
65
66    fn connect(opts: &SshOpts) -> RemoteResult<Self> {
67        let runtime = opts.runtime.as_ref().cloned().ok_or_else(|| {
68            RemoteError::new_ex(
69                RemoteErrorType::UnsupportedFeature,
70                "RusshSession requires a Tokio runtime",
71            )
72        })?;
73
74        let ssh_config = Config::try_from(opts)?;
75        debug!("Connecting to '{}'", ssh_config.address);
76
77        let mut config = client::Config {
78            inactivity_timeout: Some(ssh_config.connection_timeout),
79            ..Default::default()
80        };
81
82        // Apply algorithm preferences from ssh config
83        apply_config_algo_prefs(&mut config, &ssh_config);
84
85        // Apply algorithm preferences from opts
86        apply_opts_algo_prefs(&mut config, opts);
87
88        let config = Arc::new(config);
89
90        let mut session = runtime
91            .block_on(async {
92                client::connect(config, ssh_config.address.as_str(), T::default()).await
93            })
94            .map_err(|err| {
95                let msg = format!("SSH connection failed: {err:?}");
96                error!("{msg}");
97                RemoteError::new_ex(RemoteErrorType::ConnectionError, msg)
98            })?;
99
100        // Authenticate
101        auth::authenticate(&mut session, &runtime, opts, &ssh_config)?;
102
103        Ok(Self { runtime, session })
104    }
105
106    fn disconnect(&self) -> RemoteResult<()> {
107        self.runtime
108            .block_on(async {
109                self.session
110                    .disconnect(Disconnect::ByApplication, "Closed by user", "en_US")
111                    .await
112            })
113            .map_err(|err| {
114                log::error!("failed to disconnect {err}");
115                RemoteError::new_ex(RemoteErrorType::ConnectionError, err.to_string())
116            })
117    }
118
119    fn banner(&self) -> RemoteResult<Option<String>> {
120        // russh delivers the auth banner via the Handler::auth_banner callback
121        // during authentication, but does not expose it from the Handle after the fact.
122        // <https://docs.rs/russh/latest/russh/client/struct.Handle.html>
123        // <https://docs.rs/russh/latest/russh/client/trait.Handler.html#method.auth_banner>
124        Ok(None)
125    }
126
127    fn authenticated(&self) -> RemoteResult<bool> {
128        Ok(!self.session.is_closed())
129    }
130
131    fn cmd<S>(&mut self, cmd: S) -> RemoteResult<(u32, String)>
132    where
133        S: AsRef<str>,
134    {
135        let cmd = cmd.as_ref();
136        trace!("Running command: {cmd}");
137
138        // Escape single quotes and wrap in sh -c for consistent shell behavior.
139        // Without this, commands like "cd /some/dir; somecommand" fail if the
140        // remote user's login shell is fish or another non-POSIX shell.
141        let escaped = cmd.replace('\'', r#"'\''"#);
142        let wrapped = format!("sh -c '{escaped}'");
143
144        self.runtime
145            .block_on(async { perform_shell_cmd(&self.session, &wrapped).await })
146    }
147
148    fn scp_recv(&self, path: &Path) -> RemoteResult<Box<dyn Read + Send>> {
149        self.runtime
150            .block_on(async { scp::recv(&self.session, path).await })
151    }
152
153    fn scp_send(
154        &self,
155        remote_path: &Path,
156        mode: i32,
157        size: u64,
158        _times: Option<(u64, u64)>,
159    ) -> RemoteResult<Box<dyn Write + Send>> {
160        let runtime = self.runtime.clone();
161        self.runtime
162            .block_on(async { scp::send(&self.session, remote_path, mode, size, runtime).await })
163    }
164
165    fn sftp(&self) -> RemoteResult<Self::Sftp> {
166        let channel = self
167            .runtime
168            .block_on(async {
169                let channel = self.session.channel_open_session().await?;
170                channel.request_subsystem(true, "sftp").await?;
171                Ok(channel)
172            })
173            .map_err(|err: russh::Error| {
174                error!("Failed to init SFTP session: {err}");
175                RemoteError::new_ex(RemoteErrorType::ProtocolError, err.to_string())
176            })?;
177
178        self.runtime
179            .block_on(async { SftpSession::new(channel.into_stream()).await })
180            .map(|session| RusshSftp {
181                runtime: self.runtime.clone(),
182                session: Arc::new(session),
183            })
184            .map_err(|err| {
185                error!("Failed to init SFTP session: {err}");
186                RemoteError::new_ex(RemoteErrorType::ProtocolError, err.to_string())
187            })
188    }
189}
190
191impl Sftp for RusshSftp {
192    fn mkdir(&self, path: &Path, mode: i32) -> RemoteResult<()> {
193        let path_str = path.to_string_lossy().to_string();
194        self.runtime.block_on(async {
195            self.session.create_dir(&path_str).await.map_err(|err| {
196                RemoteError::new_ex(
197                    RemoteErrorType::FileCreateDenied,
198                    format!("Could not create directory '{}': {err}", path.display()),
199                )
200            })?;
201            // create_dir does not set permissions; apply them separately
202            let mut attrs = russh_sftp::protocol::FileAttributes::empty();
203            attrs.permissions = Some(mode as u32 & 0o7777);
204            self.session
205                .set_metadata(&path_str, attrs)
206                .await
207                .map_err(|err| {
208                    RemoteError::new_ex(
209                        RemoteErrorType::ProtocolError,
210                        format!("Could not set permissions on '{}': {err}", path.display()),
211                    )
212                })
213        })
214    }
215
216    fn open_read(&self, path: &Path) -> RemoteResult<ReadStream> {
217        let path_str = path.to_string_lossy().to_string();
218        let reader = PipelinedSftpReader::new(self.runtime.clone(), self.session.clone(), path_str)
219            .map_err(|err| {
220                RemoteError::new_ex(
221                    RemoteErrorType::ProtocolError,
222                    format!("Could not read file at '{}': {err}", path.display()),
223                )
224            })?;
225        Ok(ReadStream::from(Box::new(reader) as Box<dyn Read + Send>))
226    }
227
228    fn open_write(&self, path: &Path, flags: WriteMode, mode: i32) -> RemoteResult<WriteStream> {
229        let path_str = path.to_string_lossy().to_string();
230        self.runtime.block_on(async {
231            let open_flags = match flags {
232                WriteMode::Append => {
233                    russh_sftp::protocol::OpenFlags::WRITE
234                        | russh_sftp::protocol::OpenFlags::APPEND
235                        | russh_sftp::protocol::OpenFlags::CREATE
236                }
237                WriteMode::Truncate => {
238                    russh_sftp::protocol::OpenFlags::WRITE
239                        | russh_sftp::protocol::OpenFlags::CREATE
240                        | russh_sftp::protocol::OpenFlags::TRUNCATE
241                }
242            };
243
244            let mut attrs = russh_sftp::protocol::FileAttributes::empty();
245            attrs.permissions = Some(mode as u32 & 0o7777);
246
247            let file = self
248                .session
249                .open_with_flags_and_attributes(&path_str, open_flags, attrs)
250                .await
251                .map_err(|err| {
252                    RemoteError::new_ex(
253                        RemoteErrorType::ProtocolError,
254                        format!("Could not open file at '{}': {err}", path.display()),
255                    )
256                })?;
257
258            let writer = SftpFileWriter {
259                file,
260                runtime: self.runtime.clone(),
261            };
262            Ok(WriteStream::from(
263                Box::new(writer) as Box<dyn remotefs::fs::stream::WriteAndSeek>
264            ))
265        })
266    }
267
268    fn readdir<T>(&self, dirname: T) -> RemoteResult<Vec<File>>
269    where
270        T: AsRef<Path>,
271    {
272        let dirname = dirname.as_ref();
273        let dir_str = dirname.to_string_lossy().to_string();
274        self.runtime.block_on(async {
275            let entries = self.session.read_dir(&dir_str).await.map_err(|err| {
276                RemoteError::new_ex(
277                    RemoteErrorType::ProtocolError,
278                    format!("Could not read directory: {err}"),
279                )
280            })?;
281
282            let mut files = Vec::new();
283            for entry in entries {
284                let entry_path = dirname.join(entry.file_name());
285                let symlink = if entry.file_type().is_symlink() {
286                    match self
287                        .session
288                        .read_link(entry_path.to_string_lossy().as_ref())
289                        .await
290                    {
291                        Ok(target) => Some(PathBuf::from(target)),
292                        Err(err) => {
293                            error!(
294                                "Failed to read link of {} (even though it's a symlink): {err}",
295                                entry_path.display()
296                            );
297                            None
298                        }
299                    }
300                } else {
301                    None
302                };
303                files.push(make_fsentry(&entry_path, &entry.metadata(), symlink));
304            }
305
306            Ok(files)
307        })
308    }
309
310    fn realpath(&self, path: &Path) -> RemoteResult<PathBuf> {
311        let path_str = path.to_string_lossy().to_string();
312        self.runtime.block_on(async {
313            self.session
314                .canonicalize(&path_str)
315                .await
316                .map(PathBuf::from)
317                .map_err(|err| {
318                    RemoteError::new_ex(
319                        RemoteErrorType::ProtocolError,
320                        format!(
321                            "Could not resolve real path for '{}': {err}",
322                            path.display()
323                        ),
324                    )
325                })
326        })
327    }
328
329    fn rename(&self, src: &Path, dest: &Path) -> RemoteResult<()> {
330        let src_str = src.to_string_lossy().to_string();
331        let dest_str = dest.to_string_lossy().to_string();
332        self.runtime.block_on(async {
333            self.session
334                .rename(&src_str, &dest_str)
335                .await
336                .map_err(|err| {
337                    RemoteError::new_ex(
338                        RemoteErrorType::ProtocolError,
339                        format!("Could not rename file '{}': {err}", src.display()),
340                    )
341                })
342        })
343    }
344
345    fn rmdir(&self, path: &Path) -> RemoteResult<()> {
346        let path_str = path.to_string_lossy().to_string();
347        self.runtime.block_on(async {
348            self.session.remove_dir(&path_str).await.map_err(|err| {
349                RemoteError::new_ex(
350                    RemoteErrorType::CouldNotRemoveFile,
351                    format!("Could not remove directory '{}': {err}", path.display()),
352                )
353            })
354        })
355    }
356
357    fn setstat(&self, path: &Path, metadata: Metadata) -> RemoteResult<()> {
358        let path_str = path.to_string_lossy().to_string();
359        let attrs = metadata_to_file_attributes(metadata);
360        self.runtime.block_on(async {
361            self.session
362                .set_metadata(&path_str, attrs)
363                .await
364                .map_err(|err| {
365                    RemoteError::new_ex(
366                        RemoteErrorType::ProtocolError,
367                        format!(
368                            "Could not set file attributes for '{}': {err}",
369                            path.display()
370                        ),
371                    )
372                })
373        })
374    }
375
376    fn stat(&self, filename: &Path) -> RemoteResult<File> {
377        let path_str = filename.to_string_lossy().to_string();
378        self.runtime.block_on(async {
379            let attrs = self.session.metadata(&path_str).await.map_err(|err| {
380                RemoteError::new_ex(
381                    RemoteErrorType::ProtocolError,
382                    format!(
383                        "Could not get file attributes for '{}': {err}",
384                        filename.display()
385                    ),
386                )
387            })?;
388
389            let symlink = if attrs.is_symlink() {
390                match self.session.read_link(&path_str).await {
391                    Ok(target) => Some(PathBuf::from(target)),
392                    Err(err) => {
393                        error!(
394                            "Failed to read link of {} (even though it's a symlink): {err}",
395                            filename.display()
396                        );
397                        None
398                    }
399                }
400            } else {
401                None
402            };
403
404            Ok(make_fsentry(filename, &attrs, symlink))
405        })
406    }
407
408    fn symlink(&self, path: &Path, target: &Path) -> RemoteResult<()> {
409        let path_str = path.to_string_lossy().to_string();
410        let target_str = target.to_string_lossy().to_string();
411        self.runtime.block_on(async {
412            self.session
413                .symlink(&path_str, &target_str)
414                .await
415                .map_err(|err| {
416                    RemoteError::new_ex(
417                        RemoteErrorType::FileCreateDenied,
418                        format!("Could not create symlink '{}': {err}", path.display()),
419                    )
420                })
421        })
422    }
423
424    fn unlink(&self, path: &Path) -> RemoteResult<()> {
425        let path_str = path.to_string_lossy().to_string();
426        self.runtime.block_on(async {
427            self.session.remove_file(&path_str).await.map_err(|err| {
428                RemoteError::new_ex(
429                    RemoteErrorType::CouldNotRemoveFile,
430                    format!("Could not remove file '{}': {err}", path.display()),
431                )
432            })
433        })
434    }
435}
436
437/// Convert `remotefs::fs::Metadata` to `russh_sftp::protocol::FileAttributes`.
438fn metadata_to_file_attributes(metadata: Metadata) -> russh_sftp::protocol::FileAttributes {
439    let atime = metadata
440        .accessed
441        .and_then(|x| x.duration_since(std::time::UNIX_EPOCH).ok())
442        .map(|x| x.as_secs() as u32);
443    let mtime = metadata
444        .modified
445        .and_then(|x| x.duration_since(std::time::UNIX_EPOCH).ok())
446        .map(|x| x.as_secs() as u32);
447    russh_sftp::protocol::FileAttributes {
448        size: Some(metadata.size),
449        uid: metadata.uid,
450        user: None,
451        gid: metadata.gid,
452        group: None,
453        permissions: metadata.mode.map(u32::from),
454        atime,
455        mtime,
456    }
457}
458
459/// Build a `remotefs::File` from a path and russh-sftp `FileAttributes`.
460fn make_fsentry(
461    path: &Path,
462    attrs: &russh_sftp::protocol::FileAttributes,
463    symlink: Option<PathBuf>,
464) -> File {
465    let name = match path.file_name() {
466        None => "/".to_string(),
467        Some(name) => name.to_string_lossy().to_string(),
468    };
469    debug!("Found file {name}");
470
471    let uid = attrs.uid;
472    let gid = attrs.gid;
473    let mode = attrs.permissions.map(remotefs::fs::UnixPex::from);
474    let size = attrs.size.unwrap_or(0);
475    let accessed = attrs.atime.map(|x| {
476        std::time::UNIX_EPOCH
477            .checked_add(std::time::Duration::from_secs(u64::from(x)))
478            .unwrap_or(std::time::UNIX_EPOCH)
479    });
480    let modified = attrs.mtime.map(|x| {
481        std::time::UNIX_EPOCH
482            .checked_add(std::time::Duration::from_secs(u64::from(x)))
483            .unwrap_or(std::time::UNIX_EPOCH)
484    });
485
486    let file_type = if symlink.is_some() {
487        remotefs::fs::FileType::Symlink
488    } else if attrs.is_dir() {
489        remotefs::fs::FileType::Directory
490    } else {
491        remotefs::fs::FileType::File
492    };
493
494    let entry_metadata = Metadata {
495        accessed,
496        created: None,
497        file_type,
498        gid,
499        mode,
500        modified,
501        size,
502        symlink,
503        uid,
504    };
505    trace!("Metadata for {}: {:?}", path.display(), entry_metadata);
506    File {
507        path: path.to_path_buf(),
508        metadata: entry_metadata,
509    }
510}
511
512/// Synchronous writer wrapping a russh-sftp [`russh_sftp::client::fs::File`].
513///
514/// Stores the full `Arc<Runtime>` rather than just a `Handle` so that
515/// `Runtime::block_on` drives IO and background tasks on a current-thread
516/// runtime.
517struct SftpFileWriter {
518    file: russh_sftp::client::fs::File,
519    runtime: Arc<Runtime>,
520}
521
522impl Write for SftpFileWriter {
523    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
524        use tokio::io::AsyncWriteExt as _;
525        self.runtime.block_on(self.file.write(buf))
526    }
527
528    fn flush(&mut self) -> std::io::Result<()> {
529        use tokio::io::AsyncWriteExt as _;
530        self.runtime.block_on(self.file.flush())
531    }
532}
533
534impl Seek for SftpFileWriter {
535    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
536        use tokio::io::AsyncSeekExt as _;
537        self.runtime.block_on(self.file.seek(pos))
538    }
539}
540
541impl remotefs::fs::stream::WriteAndSeek for SftpFileWriter {}
542
543/// Number of concurrent SFTP file handles used per batch in pipelined reads.
544const SFTP_PIPELINE_DEPTH: usize = 4;
545
546/// Size of each chunk read by a single pipeline task (4 MiB).
547const SFTP_CHUNK_SIZE: usize = 4 * 1024 * 1024;
548
549/// Maximum number of completed batches to buffer ahead of the current read
550/// position. Caps memory usage to roughly `(MAX_PREFETCH + 1) * BATCH_SIZE`.
551const MAX_PREFETCH: usize = 2;
552
553/// Batch size: [`SFTP_PIPELINE_DEPTH`] * [`SFTP_CHUNK_SIZE`] = 16 MiB.
554const BATCH_SIZE: usize = SFTP_PIPELINE_DEPTH * SFTP_CHUNK_SIZE;
555
556/// A streaming SFTP reader that pipelines reads in batches.
557///
558/// Each batch spawns [`SFTP_PIPELINE_DEPTH`] concurrent SFTP read tasks of
559/// [`SFTP_CHUNK_SIZE`] bytes. Up to [`MAX_PREFETCH`] batches are fetched ahead
560/// of the current read position so the caller receives data immediately while
561/// keeping memory bounded.
562struct PipelinedSftpReader {
563    runtime: Arc<Runtime>,
564    session: Arc<SftpSession>,
565    path: String,
566    file_size: usize,
567    /// Next byte offset to start fetching from the remote file.
568    fetch_offset: usize,
569    /// Completed batches ready for consumption, front = current.
570    batches: std::collections::VecDeque<Vec<u8>>,
571    /// Read cursor within `batches[0]`.
572    buf_cursor: usize,
573    /// Background pre-fetch task, if any.
574    pending: Option<PrefetchTask>,
575}
576
577/// In-flight background batch fetch.
578struct PrefetchTask {
579    /// The byte offset this batch starts at — used to roll back
580    /// `fetch_offset` on failure.
581    batch_offset: usize,
582    handle: tokio::task::JoinHandle<Result<Vec<u8>, std::io::Error>>,
583}
584
585impl PipelinedSftpReader {
586    /// Creates a new streaming reader.
587    ///
588    /// Eagerly fetches the first batch and starts a background pre-fetch for
589    /// the second batch so the caller can start reading immediately.
590    fn new(
591        runtime: Arc<Runtime>,
592        session: Arc<SftpSession>,
593        path: String,
594    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
595        let metadata = runtime.block_on(session.metadata(&path))?;
596        let file_size = metadata.size.unwrap_or(0) as usize;
597
598        let mut reader = Self {
599            runtime,
600            session,
601            path,
602            file_size,
603            fetch_offset: 0,
604            batches: std::collections::VecDeque::new(),
605            buf_cursor: 0,
606            pending: None,
607        };
608
609        if file_size == 0 {
610            return Ok(reader);
611        }
612
613        // Eagerly fetch the first batch so data is available immediately.
614        let first_batch = reader.fetch_batch_blocking()?;
615        reader.batches.push_back(first_batch);
616
617        // Start background pre-fetch for the next batch.
618        reader.maybe_start_prefetch();
619
620        Ok(reader)
621    }
622
623    /// Fetches the next batch synchronously by blocking on the runtime.
624    fn fetch_batch_blocking(
625        &mut self,
626    ) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
627        let remaining = self.file_size.saturating_sub(self.fetch_offset);
628        if remaining == 0 {
629            return Ok(Vec::new());
630        }
631
632        let batch_len = remaining.min(BATCH_SIZE);
633        let offset = self.fetch_offset;
634        let batch = self
635            .runtime
636            .block_on(Self::fetch_batch(
637                self.session.clone(),
638                self.path.clone(),
639                offset,
640                batch_len,
641            ))
642            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
643
644        self.fetch_offset += batch_len;
645        Ok(batch)
646    }
647
648    /// Spawns a background batch fetch if there is more data and the prefetch
649    /// queue is not full.
650    fn maybe_start_prefetch(&mut self) {
651        if self.pending.is_some() {
652            return;
653        }
654        if self.batches.len() > MAX_PREFETCH {
655            return;
656        }
657        let remaining = self.file_size.saturating_sub(self.fetch_offset);
658        if remaining == 0 {
659            return;
660        }
661
662        let batch_len = remaining.min(BATCH_SIZE);
663        let session = self.session.clone();
664        let path = self.path.clone();
665        let offset = self.fetch_offset;
666        // Speculatively advance; rolled back in collect_pending on failure.
667        self.fetch_offset += batch_len;
668
669        let handle = self
670            .runtime
671            .spawn(async move { Self::fetch_batch(session, path, offset, batch_len).await });
672
673        self.pending = Some(PrefetchTask {
674            batch_offset: offset,
675            handle,
676        });
677    }
678
679    /// Collects the result of a pending pre-fetch task.
680    ///
681    /// On failure, rolls back `fetch_offset` so the batch can be retried.
682    fn collect_pending(&mut self) -> std::io::Result<Option<Vec<u8>>> {
683        let task = match self.pending.take() {
684            Some(t) => t,
685            None => return Ok(None),
686        };
687
688        match self
689            .runtime
690            .block_on(task.handle)
691            .map_err(std::io::Error::other)?
692        {
693            Ok(batch) if batch.is_empty() => Ok(None),
694            Ok(batch) => Ok(Some(batch)),
695            Err(err) => {
696                // Roll back so the caller (or a retry) can re-fetch this range.
697                self.fetch_offset = task.batch_offset;
698                Err(std::io::Error::other(err))
699            }
700        }
701    }
702
703    /// Fetches a single batch: spawns [`SFTP_PIPELINE_DEPTH`] concurrent reads
704    /// and assembles the result into a contiguous buffer.
705    async fn fetch_batch(
706        session: Arc<SftpSession>,
707        path: String,
708        batch_offset: usize,
709        batch_len: usize,
710    ) -> Result<Vec<u8>, std::io::Error> {
711        use tokio::io::{AsyncReadExt as _, AsyncSeekExt as _};
712
713        let chunk_count = batch_len.div_ceil(SFTP_CHUNK_SIZE);
714        let mut tasks = Vec::with_capacity(chunk_count);
715
716        for i in 0..chunk_count {
717            let chunk_offset = i * SFTP_CHUNK_SIZE;
718            let len = SFTP_CHUNK_SIZE.min(batch_len - chunk_offset);
719            let abs_offset = batch_offset + chunk_offset;
720
721            let mut file = session.open(&path).await.map_err(std::io::Error::other)?;
722            file.seek(std::io::SeekFrom::Start(abs_offset as u64))
723                .await?;
724
725            tasks.push(tokio::spawn(async move {
726                let mut buf = vec![0_u8; len];
727                file.read_exact(&mut buf).await?;
728                Ok::<(usize, Vec<u8>), std::io::Error>((chunk_offset, buf))
729            }));
730        }
731
732        let mut result = vec![0_u8; batch_len];
733        for task in tasks {
734            let (chunk_offset, chunk) = task
735                .await
736                .map_err(std::io::Error::other)?
737                .map_err(std::io::Error::other)?;
738            result[chunk_offset..chunk_offset + chunk.len()].copy_from_slice(&chunk);
739        }
740
741        Ok(result)
742    }
743}
744
745impl Read for PipelinedSftpReader {
746    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
747        loop {
748            // Try to serve from the current front batch.
749            if let Some(front) = self.batches.front() {
750                let available = &front[self.buf_cursor..];
751                if !available.is_empty() {
752                    let to_copy = available.len().min(buf.len());
753                    buf[..to_copy].copy_from_slice(&available[..to_copy]);
754                    self.buf_cursor += to_copy;
755                    return Ok(to_copy);
756                }
757
758                // Current batch fully consumed — pop it.
759                self.batches.pop_front();
760                self.buf_cursor = 0;
761
762                // Collect the pending pre-fetch if any.
763                if let Some(batch) = self.collect_pending()? {
764                    self.batches.push_back(batch);
765                }
766
767                // Kick off next pre-fetch.
768                self.maybe_start_prefetch();
769
770                continue;
771            }
772
773            // No batches buffered — try to collect pending.
774            if let Some(batch) = self.collect_pending()? {
775                self.batches.push_back(batch);
776                self.maybe_start_prefetch();
777                continue;
778            }
779
780            // Nothing left — EOF.
781            return Ok(0);
782        }
783    }
784}
785
786/// Apply algorithm preferences from SSH config to the russh [`client::Config`].
787fn apply_config_algo_prefs(config: &mut client::Config, ssh_config: &Config) {
788    let params = &ssh_config.params;
789
790    // KEX algorithms
791    let kex: Vec<russh::kex::Name> = params
792        .kex_algorithms
793        .algorithms()
794        .iter()
795        .filter_map(|name| {
796            russh::kex::Name::try_from(name.as_str())
797                .map_err(|()| warn!("Unsupported KEX algorithm: {name}"))
798                .ok()
799        })
800        .collect();
801    if !kex.is_empty() {
802        config.preferred.kex = Cow::Owned(kex);
803    }
804
805    // Host key algorithms
806    let host_keys: Vec<Algorithm> = params
807        .host_key_algorithms
808        .algorithms()
809        .iter()
810        .filter_map(|name| {
811            name.parse::<Algorithm>()
812                .map_err(|err| warn!("Unsupported host key algorithm '{name}': {err}"))
813                .ok()
814        })
815        .collect();
816    if !host_keys.is_empty() {
817        config.preferred.key = Cow::Owned(host_keys);
818    }
819
820    // Cipher algorithms
821    let ciphers: Vec<russh::cipher::Name> = params
822        .ciphers
823        .algorithms()
824        .iter()
825        .filter_map(|name| {
826            russh::cipher::Name::try_from(name.as_str())
827                .map_err(|()| warn!("Unsupported cipher algorithm: {name}"))
828                .ok()
829        })
830        .collect();
831    if !ciphers.is_empty() {
832        config.preferred.cipher = Cow::Owned(ciphers);
833    }
834
835    // MAC algorithms
836    let macs: Vec<russh::mac::Name> = params
837        .mac
838        .algorithms()
839        .iter()
840        .filter_map(|name| {
841            russh::mac::Name::try_from(name.as_str())
842                .map_err(|()| warn!("Unsupported MAC algorithm: {name}"))
843                .ok()
844        })
845        .collect();
846    if !macs.is_empty() {
847        config.preferred.mac = Cow::Owned(macs);
848    }
849}
850
851/// Apply algorithm preferences from [`SshOpts`] methods to the russh [`client::Config`].
852///
853/// Options from `SshOpts::methods` override those from the SSH config file.
854fn apply_opts_algo_prefs(config: &mut client::Config, opts: &SshOpts) {
855    for method in opts.methods.iter() {
856        let algos = method.prefs();
857        let names: Vec<&str> = algos.split(',').collect();
858
859        match method.method_type {
860            MethodType::Kex => {
861                let kex: Vec<russh::kex::Name> = names
862                    .iter()
863                    .filter_map(|name| {
864                        russh::kex::Name::try_from(*name)
865                            .map_err(|()| warn!("Unsupported KEX algorithm: {name}"))
866                            .ok()
867                    })
868                    .collect();
869                if !kex.is_empty() {
870                    config.preferred.kex = Cow::Owned(kex);
871                }
872            }
873            MethodType::HostKey => {
874                let keys: Vec<Algorithm> = names
875                    .iter()
876                    .filter_map(|name| {
877                        name.parse::<Algorithm>()
878                            .map_err(|err| warn!("Unsupported host key algorithm '{name}': {err}"))
879                            .ok()
880                    })
881                    .collect();
882                if !keys.is_empty() {
883                    config.preferred.key = Cow::Owned(keys);
884                }
885            }
886            MethodType::CryptClientServer | MethodType::CryptServerClient => {
887                let ciphers: Vec<russh::cipher::Name> = names
888                    .iter()
889                    .filter_map(|name| {
890                        russh::cipher::Name::try_from(*name)
891                            .map_err(|()| warn!("Unsupported cipher algorithm: {name}"))
892                            .ok()
893                    })
894                    .collect();
895                if !ciphers.is_empty() {
896                    config.preferred.cipher = Cow::Owned(ciphers);
897                }
898            }
899            MethodType::MacClientServer | MethodType::MacServerClient => {
900                let macs: Vec<russh::mac::Name> = names
901                    .iter()
902                    .filter_map(|name| {
903                        russh::mac::Name::try_from(*name)
904                            .map_err(|()| warn!("Unsupported MAC algorithm: {name}"))
905                            .ok()
906                    })
907                    .collect();
908                if !macs.is_empty() {
909                    config.preferred.mac = Cow::Owned(macs);
910                }
911            }
912            _ => {
913                trace!(
914                    "Ignoring unsupported method type {:?} for russh backend",
915                    method.method_type
916                );
917            }
918        }
919    }
920}
921
922/// Execute a shell command on the remote server via a russh channel.
923///
924/// Opens a session channel, executes the command, collects stdout,
925/// and returns the exit code with the output.
926async fn perform_shell_cmd<T>(session: &Handle<T>, cmd: &str) -> RemoteResult<(u32, String)>
927where
928    T: Handler,
929{
930    let mut channel = open_channel(session).await?;
931
932    channel.exec(true, cmd).await.map_err(|err| {
933        RemoteError::new_ex(
934            RemoteErrorType::ProtocolError,
935            format!("Could not execute command \"{cmd}\": {err}"),
936        )
937    })?;
938
939    let mut output = String::new();
940    let mut exit_code: Option<u32> = None;
941
942    while let Some(msg) = channel.wait().await {
943        match msg {
944            russh::ChannelMsg::Data { data } => {
945                output.push_str(&String::from_utf8_lossy(&data));
946            }
947            russh::ChannelMsg::ExitStatus { exit_status } => {
948                exit_code = Some(exit_status);
949            }
950            russh::ChannelMsg::Close => break,
951            russh::ChannelMsg::Eof => {}
952            _ => {}
953        }
954    }
955
956    let rc = exit_code.unwrap_or_else(|| {
957        warn!("No exit status received for command \"{cmd}\", defaulting to 1");
958        1
959    });
960
961    trace!("Command output: {output}");
962    debug!(r#"Command output: "{output}"; exit code: {rc}"#);
963
964    Ok((rc, output))
965}
966
967/// Open a session channel on the given handle.
968async fn open_channel<T>(session: &Handle<T>) -> RemoteResult<russh::Channel<russh::client::Msg>>
969where
970    T: Handler,
971{
972    session.channel_open_session().await.map_err(|err| {
973        RemoteError::new_ex(
974            RemoteErrorType::ProtocolError,
975            format!("Could not open channel: {err}"),
976        )
977    })
978}
979
980#[cfg(test)]
981mod test {
982
983    use std::sync::Arc;
984
985    use ssh2_config::ParseRule;
986
987    use super::*;
988    use crate::mock::ssh as ssh_mock;
989
990    fn test_runtime() -> Arc<Runtime> {
991        Arc::new(
992            tokio::runtime::Builder::new_current_thread()
993                .enable_all()
994                .build()
995                .unwrap(),
996        )
997    }
998
999    #[test]
1000    fn should_connect_to_ssh_server_auth_user_password() {
1001        use crate::ssh::container::OpensshServer;
1002
1003        let container = OpensshServer::start();
1004        let port = container.port();
1005
1006        crate::mock::logger();
1007        let runtime = test_runtime();
1008        let config_file = ssh_mock::create_ssh_config(port);
1009        let opts = SshOpts::new("sftp")
1010            .config_file(config_file.path(), ParseRule::ALLOW_UNKNOWN_FIELDS)
1011            .password("password")
1012            .runtime(runtime);
1013
1014        if let Err(err) = RusshSession::<NoCheckServerKey>::connect(&opts) {
1015            panic!("Could not connect to server: {err}");
1016        }
1017        let session = RusshSession::<NoCheckServerKey>::connect(&opts).unwrap();
1018        assert!(session.authenticated().unwrap());
1019
1020        drop(container);
1021    }
1022
1023    #[test]
1024    fn should_connect_to_ssh_server_auth_key() {
1025        use crate::ssh::container::OpensshServer;
1026
1027        let container = OpensshServer::start();
1028        let port = container.port();
1029
1030        crate::mock::logger();
1031        let runtime = test_runtime();
1032        let config_file = ssh_mock::create_ssh_config(port);
1033        let opts = SshOpts::new("sftp")
1034            .config_file(config_file.path(), ParseRule::ALLOW_UNKNOWN_FIELDS)
1035            .key_storage(Box::new(ssh_mock::MockSshKeyStorage::default()))
1036            .runtime(runtime);
1037        let session = RusshSession::<NoCheckServerKey>::connect(&opts).unwrap();
1038        assert!(session.authenticated().unwrap());
1039    }
1040
1041    #[test]
1042    fn should_perform_shell_command_on_server() {
1043        crate::mock::logger();
1044        let container = crate::ssh::container::OpensshServer::start();
1045        let port = container.port();
1046
1047        let runtime = test_runtime();
1048        let opts = SshOpts::new("127.0.0.1")
1049            .port(port)
1050            .username("sftp")
1051            .password("password")
1052            .runtime(runtime);
1053        let mut session = RusshSession::<NoCheckServerKey>::connect(&opts).unwrap();
1054        assert!(session.authenticated().unwrap());
1055        assert!(session.cmd("pwd").is_ok());
1056    }
1057
1058    #[test]
1059    fn should_perform_shell_command_on_server_and_return_exit_code() {
1060        crate::mock::logger();
1061        let container = crate::ssh::container::OpensshServer::start();
1062        let port = container.port();
1063
1064        let runtime = test_runtime();
1065        let opts = SshOpts::new("127.0.0.1")
1066            .port(port)
1067            .username("sftp")
1068            .password("password")
1069            .runtime(runtime);
1070        let mut session = RusshSession::<NoCheckServerKey>::connect(&opts).unwrap();
1071        assert!(session.authenticated().unwrap());
1072        assert_eq!(
1073            session.cmd_at("pwd", Path::new("/tmp")).ok().unwrap(),
1074            (0, String::from("/tmp\n"))
1075        );
1076        assert_eq!(
1077            session
1078                .cmd_at("pippopluto", Path::new("/tmp"))
1079                .ok()
1080                .unwrap()
1081                .0,
1082            127
1083        );
1084    }
1085
1086    #[test]
1087    fn should_fail_authentication() {
1088        crate::mock::logger();
1089        let container = crate::ssh::container::OpensshServer::start();
1090        let port = container.port();
1091
1092        let runtime = test_runtime();
1093        let opts = SshOpts::new("127.0.0.1")
1094            .port(port)
1095            .username("sftp")
1096            .password("ippopotamo")
1097            .runtime(runtime);
1098        assert!(RusshSession::<NoCheckServerKey>::connect(&opts).is_err());
1099    }
1100
1101    #[test]
1102    fn test_filetransfer_sftp_bad_server() {
1103        crate::mock::logger();
1104        let runtime = test_runtime();
1105        let opts = SshOpts::new("myverybad.verybad.server")
1106            .port(10022)
1107            .username("sftp")
1108            .password("ippopotamo")
1109            .runtime(runtime);
1110        assert!(RusshSession::<NoCheckServerKey>::connect(&opts).is_err());
1111    }
1112}