1use std::{
2 path::{Path, PathBuf},
3 sync::Arc,
4};
5
6use ssh2::{File, FileStat, OpenFlags, OpenType, RenameFlags, Session, Sftp};
7
8use crate::{error::Error, session_stream::AsyncSessionStream};
9
10pub struct AsyncSftp<S> {
12 inner: Sftp,
13 sess: Session,
14 stream: Arc<S>,
15}
16
17impl<S> AsyncSftp<S> {
18 pub(crate) fn from_parts(inner: Sftp, sess: Session, stream: Arc<S>) -> Self {
19 Self {
20 inner,
21 sess,
22 stream,
23 }
24 }
25}
26
27impl<S> AsyncSftp<S>
28where
29 S: AsyncSessionStream + Send + Sync + 'static,
30{
31 pub async fn open_mode(
32 &self,
33 filename: &Path,
34 flags: OpenFlags,
35 mode: i32,
36 open_type: OpenType,
37 ) -> Result<AsyncFile<S>, Error> {
38 let file = self
39 .stream
40 .rw_with(
41 || self.inner.open_mode(filename, flags, mode, open_type),
42 &self.sess,
43 )
44 .await?;
45
46 Ok(AsyncFile::from_parts(
47 file,
48 self.sess.clone(),
49 self.stream.clone(),
50 ))
51 }
52
53 pub async fn open(&self, filename: &Path) -> Result<AsyncFile<S>, Error> {
54 let file = self
55 .stream
56 .rw_with(|| self.inner.open(filename), &self.sess)
57 .await?;
58
59 Ok(AsyncFile::from_parts(
60 file,
61 self.sess.clone(),
62 self.stream.clone(),
63 ))
64 }
65
66 pub async fn create(&self, filename: &Path) -> Result<AsyncFile<S>, Error> {
67 let file = self
68 .stream
69 .rw_with(|| self.inner.create(filename), &self.sess)
70 .await?;
71
72 Ok(AsyncFile::from_parts(
73 file,
74 self.sess.clone(),
75 self.stream.clone(),
76 ))
77 }
78
79 pub async fn opendir(&self, dirname: &Path) -> Result<AsyncFile<S>, Error> {
80 let file = self
81 .stream
82 .rw_with(|| self.inner.opendir(dirname), &self.sess)
83 .await?;
84
85 Ok(AsyncFile::from_parts(
86 file,
87 self.sess.clone(),
88 self.stream.clone(),
89 ))
90 }
91
92 pub async fn readdir(&self, dirname: &Path) -> Result<Vec<(PathBuf, FileStat)>, Error> {
93 let mut dir = self.opendir(dirname).await?;
95 let mut ret = Vec::new();
96 loop {
97 match dir.readdir().await {
98 Ok((filename, stat)) => {
99 if &*filename == Path::new(".") || &*filename == Path::new("..") {
100 continue;
101 }
102
103 ret.push((dirname.join(&filename), stat))
104 }
105 Err(Error::Ssh2(ref e))
106 if e.code() == ssh2::ErrorCode::Session(libssh2_sys::LIBSSH2_ERROR_FILE) =>
107 {
108 break
109 }
110 Err(e) => return Err(e),
111 }
112 }
113 Ok(ret)
114 }
115
116 pub async fn mkdir(&self, filename: &Path, mode: i32) -> Result<(), Error> {
117 self.stream
118 .rw_with(|| self.inner.mkdir(filename, mode), &self.sess)
119 .await
120 }
121
122 pub async fn rmdir(&self, filename: &Path) -> Result<(), Error> {
123 self.stream
124 .rw_with(|| self.inner.rmdir(filename), &self.sess)
125 .await
126 }
127
128 pub async fn stat(&self, filename: &Path) -> Result<FileStat, Error> {
129 self.stream
130 .rw_with(|| self.inner.stat(filename), &self.sess)
131 .await
132 }
133
134 pub async fn lstat(&self, filename: &Path) -> Result<FileStat, Error> {
135 self.stream
136 .rw_with(|| self.inner.lstat(filename), &self.sess)
137 .await
138 }
139
140 pub async fn setstat(&self, filename: &Path, stat: FileStat) -> Result<(), Error> {
141 self.stream
142 .rw_with(|| self.inner.setstat(filename, stat.clone()), &self.sess)
143 .await
144 }
145
146 pub async fn symlink(&self, path: &Path, target: &Path) -> Result<(), Error> {
147 self.stream
148 .rw_with(|| self.inner.symlink(path, target), &self.sess)
149 .await
150 }
151
152 pub async fn readlink(&self, path: &Path) -> Result<PathBuf, Error> {
153 self.stream
154 .rw_with(|| self.inner.readlink(path), &self.sess)
155 .await
156 }
157
158 pub async fn realpath(&self, path: &Path) -> Result<PathBuf, Error> {
159 self.stream
160 .rw_with(|| self.inner.realpath(path), &self.sess)
161 .await
162 }
163
164 pub async fn rename(
165 &self,
166 src: &Path,
167 dst: &Path,
168 flags: Option<RenameFlags>,
169 ) -> Result<(), Error> {
170 self.stream
171 .rw_with(|| self.inner.rename(src, dst, flags), &self.sess)
172 .await
173 }
174
175 pub async fn unlink(&self, file: &Path) -> Result<(), Error> {
176 self.stream
177 .rw_with(|| self.inner.unlink(file), &self.sess)
178 .await
179 }
180
181 pub async fn shutdown(&mut self) -> Result<(), Error> {
182 self.stream
183 .rw_with(|| self.inner.shutdown(), &self.sess)
184 .await
185 }
186}
187
188pub struct AsyncFile<S> {
190 inner: File,
191 sess: Session,
192 stream: Arc<S>,
193}
194
195impl<S> AsyncFile<S> {
196 pub(crate) fn from_parts(inner: File, sess: Session, stream: Arc<S>) -> Self {
197 Self {
198 inner,
199 sess,
200 stream,
201 }
202 }
203}
204
205impl<S> AsyncFile<S>
206where
207 S: AsyncSessionStream + Send + Sync + 'static,
208{
209 pub async fn setstat(&mut self, stat: FileStat) -> Result<(), Error> {
210 self.stream
211 .rw_with(|| self.inner.setstat(stat.clone()), &self.sess)
212 .await
213 }
214
215 pub async fn stat(&mut self) -> Result<FileStat, Error> {
216 self.stream.rw_with(|| self.inner.stat(), &self.sess).await
217 }
218
219 pub async fn statvfs(&mut self) -> Result<libssh2_sys::LIBSSH2_SFTP_STATVFS, Error> {
220 self.stream
221 .rw_with(|| self.inner.statvfs(), &self.sess)
222 .await
223 }
224
225 pub async fn readdir(&mut self) -> Result<(PathBuf, FileStat), Error> {
226 self.stream
227 .rw_with(|| self.inner.readdir(), &self.sess)
228 .await
229 }
230
231 pub async fn fsync(&mut self) -> Result<(), Error> {
232 self.stream.rw_with(|| self.inner.fsync(), &self.sess).await
233 }
234
235 #[doc(hidden)]
236 pub async fn close(&mut self) -> Result<(), Error> {
237 self.stream.rw_with(|| self.inner.close(), &self.sess).await
238 }
239}
240
241mod impl_futures_util {
242 use core::{
243 pin::Pin,
244 task::{Context, Poll},
245 };
246 use std::io::{Error as IoError, Read as _, Seek, SeekFrom, Write as _};
247
248 use futures_util::io::{AsyncRead, AsyncSeek, AsyncWrite};
249
250 use super::AsyncFile;
251 use crate::session_stream::AsyncSessionStream;
252
253 impl<S> AsyncRead for AsyncFile<S>
255 where
256 S: AsyncSessionStream + Send + Sync + 'static,
257 {
258 fn poll_read(
259 self: Pin<&mut Self>,
260 cx: &mut Context<'_>,
261 buf: &mut [u8],
262 ) -> Poll<Result<usize, IoError>> {
263 let this = self.get_mut();
264 let sess = this.sess.clone();
265 let inner = &mut this.inner;
266
267 this.stream.poll_read_with(cx, || inner.read(buf), &sess)
268 }
269 }
270
271 impl<S> AsyncWrite for AsyncFile<S>
272 where
273 S: AsyncSessionStream + Send + Sync + 'static,
274 {
275 fn poll_write(
276 self: Pin<&mut Self>,
277 cx: &mut Context,
278 buf: &[u8],
279 ) -> Poll<Result<usize, IoError>> {
280 let this = self.get_mut();
281 let sess = this.sess.clone();
282 let inner = &mut this.inner;
283
284 this.stream.poll_write_with(cx, || inner.write(buf), &sess)
285 }
286
287 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
288 let this = self.get_mut();
289 let sess = this.sess.clone();
290 let inner = &mut this.inner;
291
292 this.stream.poll_write_with(cx, || inner.flush(), &sess)
293 }
294
295 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
296 self.poll_flush(cx)
297 }
298 }
299
300 impl<S> AsyncSeek for AsyncFile<S>
301 where
302 S: AsyncSessionStream + Send + Sync + 'static,
303 {
304 fn poll_seek(
305 self: Pin<&mut Self>,
306 cx: &mut Context<'_>,
307 pos: SeekFrom,
308 ) -> Poll<Result<u64, IoError>> {
309 let this = self.get_mut();
310 let sess = this.sess.clone();
311 let inner = &mut this.inner;
312
313 this.stream.poll_read_with(cx, || inner.seek(pos), &sess)
314 }
315 }
316}