1use std::sync::Arc;
22
23use async_trait::async_trait;
24use russh::ChannelStream;
25use russh::{client::Msg, Channel};
26use tokio::io::AsyncWrite;
27use tokio::task::JoinHandle;
28use tokio::{io::AsyncRead, sync::mpsc};
29
30use crate::message::{Init, Message, StatusCode, Version};
31
32mod commands;
33mod dir;
34mod error;
35mod file;
36mod receiver;
37mod request;
38mod stop;
39
40pub use dir::{Dir, DIR_CLOSED};
41pub use error::Error;
42pub use file::{File, FILE_CLOSED};
43pub use request::{SftpFuture, SftpReply, SftpRequest};
44use stop::SftpClientStopping;
45
/// Handle to an SFTP session.
///
/// Both fields are `None` for a stopped client (see [`SftpClient::new_stopped`],
/// which is also what the derived `Default` produces). Cloning is cheap: clones
/// share the same command sender and the same reference-counted task handle.
#[derive(Default, Clone)]
pub struct SftpClient {
    /// Sending half of the unbounded channel carrying `receiver::Request`
    /// values; `None` when the client is stopped.
    commands: Option<mpsc::UnboundedSender<receiver::Request>>,
    /// Join handle of the spawned task running the receiver loop, shared
    /// between clones through `Arc`; `None` when the client is stopped.
    request_processor: Option<Arc<JoinHandle<()>>>,
}
78
/// Shared client value built with [`SftpClient::new_stopped`]: it has no
/// command channel and no request-processor task.
pub static SFTP_CLIENT_STOPPED: SftpClient = SftpClient::new_stopped();
80
81impl SftpClient {
82 pub const fn new_stopped() -> Self {
85 Self {
86 commands: None,
87 request_processor: None,
88 }
89 }
90
91 pub async fn new<T: IntoSftpStream>(ssh: T) -> Result<Self, Error> {
97 Self::with_stream(ssh.into_sftp_stream().await?).await
98 }
99
100 pub async fn with_stream(
102 mut stream: impl AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
103 ) -> Result<Self, Error> {
104 receiver::write_msg(
106 &mut stream,
107 Message::Init(Init {
108 version: 3,
109 extensions: Default::default(),
110 }),
111 3,
112 )
113 .await?;
114
115 match receiver::read_msg(&mut stream).await? {
116 (
118 _,
119 Message::Version(Version {
120 version: 3,
121 extensions: _,
122 }),
123 ) => (),
124
125 (_, Message::Version(_)) => {
127 return Err(StatusCode::BadMessage
128 .to_status("Invalid sftp version")
129 .into());
130 }
131 _ => {
132 return Err(StatusCode::BadMessage.to_status("Bad SFTP init").into());
133 }
134 }
135
136 let (receiver, tx) = receiver::Receiver::new(stream);
137 let request_processor = tokio::spawn(receiver.run());
138
139 Ok(Self {
140 commands: Some(tx),
141 request_processor: Some(Arc::new(request_processor)),
142 })
143 }
144}
145
146impl std::fmt::Debug for SftpClient {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 write!(f, "SftpClient")
149 }
150}
151
/// Conversion of a value into a byte stream that an SFTP session can run over.
///
/// [`SftpClient::new`] accepts any implementor and passes the resulting stream
/// to [`SftpClient::with_stream`].
#[async_trait]
pub trait IntoSftpStream {
    /// Stream type produced by the conversion.
    type Stream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static;
    /// Consumes `self` and yields the stream, or an [`Error`] if the
    /// conversion fails.
    async fn into_sftp_stream(self) -> Result<Self::Stream, Error>;
}
158
159#[async_trait]
160impl IntoSftpStream for ChannelStream<Msg> {
161 type Stream = ChannelStream<Msg>;
162 async fn into_sftp_stream(self) -> Result<Self::Stream, Error> {
163 Ok(self)
164 }
165}
166
167#[async_trait]
168impl IntoSftpStream for Channel<Msg> {
169 type Stream = ChannelStream<Msg>;
170 async fn into_sftp_stream(self) -> Result<Self::Stream, Error> {
171 self.request_subsystem(false, "sftp").await?;
173
174 Ok(self.into_stream())
175 }
176}
177
178#[async_trait]
179impl<H: russh::client::Handler> IntoSftpStream for &russh::client::Handle<H> {
180 type Stream = ChannelStream<Msg>;
181 async fn into_sftp_stream(self) -> Result<Self::Stream, Error> {
182 self.channel_open_session().await?.into_sftp_stream().await
183 }
184}
185
186#[async_trait]
187impl<H: russh::client::Handler> IntoSftpStream for russh::client::Handle<H> {
188 type Stream = ChannelStream<Msg>;
189 async fn into_sftp_stream(self) -> Result<Self::Stream, Error> {
190 (&self).into_sftp_stream().await
191 }
192}