rusftp/client/
mod.rs

1// This file is part of the rusftp project
2//
3// Copyright (C) ANEO, 2024-2024. All rights reserved.
4//
5// Licensed under the Apache License, Version 2.0 (the "License")
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17//! SFTP client module.
18//!
19//! See [`SftpClient`]
20
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use russh::ChannelStream;
25use russh::{client::Msg, Channel};
26use tokio::io::AsyncWrite;
27use tokio::task::JoinHandle;
28use tokio::{io::AsyncRead, sync::mpsc};
29
30use crate::message::{Init, Message, StatusCode, Version};
31
32mod commands;
33mod dir;
34mod error;
35mod file;
36mod receiver;
37mod request;
38mod stop;
39
40pub use dir::{Dir, DIR_CLOSED};
41pub use error::Error;
42pub use file::{File, FILE_CLOSED};
43pub use request::{SftpFuture, SftpReply, SftpRequest};
44use stop::SftpClientStopping;
45
46/// SFTP client
47///
48/// # Example
49///
50/// ```no_run
51/// # use std::sync::Arc;
52/// # use async_trait::async_trait;
53/// # struct ClientHandler;
54/// #
55/// # #[async_trait]
56/// # impl russh::client::Handler for ClientHandler {
57/// #    type Error = russh::Error;
58/// #    // ...
59/// # }
60/// #
61/// # async fn dummy() -> Result<(), Box<dyn std::error::Error>> {
62/// /// Create ssh client
63/// let config = Arc::new(russh::client::Config::default());
64/// let mut ssh = russh::client::connect(config, ("localhost", 2222), ClientHandler).await?;
65/// ssh.authenticate_password("user", "pass").await?;
66///
67/// // Create SFTP client
68/// let sftp = rusftp::client::SftpClient::new(&ssh).await?;
69/// println!("stat '.': {:?}", sftp.stat(".").await?);
70/// # Ok(())
71/// # }
72/// ```
73#[derive(Default, Clone)]
74pub struct SftpClient {
75    commands: Option<mpsc::UnboundedSender<receiver::Request>>,
76    request_processor: Option<Arc<JoinHandle<()>>>,
77}
78
79pub static SFTP_CLIENT_STOPPED: SftpClient = SftpClient::new_stopped();
80
81impl SftpClient {
82    /// Creates a stopped client.
83    /// This client cannot be opened.
84    pub const fn new_stopped() -> Self {
85        Self {
86            commands: None,
87            request_processor: None,
88        }
89    }
90
91    /// Creates a new client from a ssh connection.
92    ///
93    /// `ssh` can be a [`russh::Channel<Msg>`]
94    /// or a [`russh::client::Handler`].
95    /// In case of the handler, it can be moved or borrowed.
96    pub async fn new<T: IntoSftpStream>(ssh: T) -> Result<Self, Error> {
97        Self::with_stream(ssh.into_sftp_stream().await?).await
98    }
99
100    /// Creates a new client from a stream ([`AsyncRead`] + [`AsyncWrite`]).
101    pub async fn with_stream(
102        mut stream: impl AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
103    ) -> Result<Self, Error> {
104        // Init SFTP handshake
105        receiver::write_msg(
106            &mut stream,
107            Message::Init(Init {
108                version: 3,
109                extensions: Default::default(),
110            }),
111            3,
112        )
113        .await?;
114
115        match receiver::read_msg(&mut stream).await? {
116            // Valid response: continue
117            (
118                _,
119                Message::Version(Version {
120                    version: 3,
121                    extensions: _,
122                }),
123            ) => (),
124
125            // Invalid responses: abort
126            (_, Message::Version(_)) => {
127                return Err(StatusCode::BadMessage
128                    .to_status("Invalid sftp version")
129                    .into());
130            }
131            _ => {
132                return Err(StatusCode::BadMessage.to_status("Bad SFTP init").into());
133            }
134        }
135
136        let (receiver, tx) = receiver::Receiver::new(stream);
137        let request_processor = tokio::spawn(receiver.run());
138
139        Ok(Self {
140            commands: Some(tx),
141            request_processor: Some(Arc::new(request_processor)),
142        })
143    }
144}
145
146impl std::fmt::Debug for SftpClient {
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        write!(f, "SftpClient")
149    }
150}
151
152/// Convert the object to a SSH channel
153#[async_trait]
154pub trait IntoSftpStream {
155    type Stream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static;
156    async fn into_sftp_stream(self) -> Result<Self::Stream, Error>;
157}
158
159#[async_trait]
160impl IntoSftpStream for ChannelStream<Msg> {
161    type Stream = ChannelStream<Msg>;
162    async fn into_sftp_stream(self) -> Result<Self::Stream, Error> {
163        Ok(self)
164    }
165}
166
167#[async_trait]
168impl IntoSftpStream for Channel<Msg> {
169    type Stream = ChannelStream<Msg>;
170    async fn into_sftp_stream(self) -> Result<Self::Stream, Error> {
171        // Start SFTP subsystem
172        self.request_subsystem(false, "sftp").await?;
173
174        Ok(self.into_stream())
175    }
176}
177
178#[async_trait]
179impl<H: russh::client::Handler> IntoSftpStream for &russh::client::Handle<H> {
180    type Stream = ChannelStream<Msg>;
181    async fn into_sftp_stream(self) -> Result<Self::Stream, Error> {
182        self.channel_open_session().await?.into_sftp_stream().await
183    }
184}
185
186#[async_trait]
187impl<H: russh::client::Handler> IntoSftpStream for russh::client::Handle<H> {
188    type Stream = ChannelStream<Msg>;
189    async fn into_sftp_stream(self) -> Result<Self::Stream, Error> {
190        (&self).into_sftp_stream().await
191    }
192}