async_ssh2_lite/
sftp.rs

1use std::{
2    path::{Path, PathBuf},
3    sync::Arc,
4};
5
6use ssh2::{File, FileStat, OpenFlags, OpenType, RenameFlags, Session, Sftp};
7
8use crate::{error::Error, session_stream::AsyncSessionStream};
9
10//
11pub struct AsyncSftp<S> {
12    inner: Sftp,
13    sess: Session,
14    stream: Arc<S>,
15}
16
17impl<S> AsyncSftp<S> {
18    pub(crate) fn from_parts(inner: Sftp, sess: Session, stream: Arc<S>) -> Self {
19        Self {
20            inner,
21            sess,
22            stream,
23        }
24    }
25}
26
27impl<S> AsyncSftp<S>
28where
29    S: AsyncSessionStream + Send + Sync + 'static,
30{
31    pub async fn open_mode(
32        &self,
33        filename: &Path,
34        flags: OpenFlags,
35        mode: i32,
36        open_type: OpenType,
37    ) -> Result<AsyncFile<S>, Error> {
38        let file = self
39            .stream
40            .rw_with(
41                || self.inner.open_mode(filename, flags, mode, open_type),
42                &self.sess,
43            )
44            .await?;
45
46        Ok(AsyncFile::from_parts(
47            file,
48            self.sess.clone(),
49            self.stream.clone(),
50        ))
51    }
52
53    pub async fn open(&self, filename: &Path) -> Result<AsyncFile<S>, Error> {
54        let file = self
55            .stream
56            .rw_with(|| self.inner.open(filename), &self.sess)
57            .await?;
58
59        Ok(AsyncFile::from_parts(
60            file,
61            self.sess.clone(),
62            self.stream.clone(),
63        ))
64    }
65
66    pub async fn create(&self, filename: &Path) -> Result<AsyncFile<S>, Error> {
67        let file = self
68            .stream
69            .rw_with(|| self.inner.create(filename), &self.sess)
70            .await?;
71
72        Ok(AsyncFile::from_parts(
73            file,
74            self.sess.clone(),
75            self.stream.clone(),
76        ))
77    }
78
79    pub async fn opendir(&self, dirname: &Path) -> Result<AsyncFile<S>, Error> {
80        let file = self
81            .stream
82            .rw_with(|| self.inner.opendir(dirname), &self.sess)
83            .await?;
84
85        Ok(AsyncFile::from_parts(
86            file,
87            self.sess.clone(),
88            self.stream.clone(),
89        ))
90    }
91
92    pub async fn readdir(&self, dirname: &Path) -> Result<Vec<(PathBuf, FileStat)>, Error> {
93        // Copy from ssh2
94        let mut dir = self.opendir(dirname).await?;
95        let mut ret = Vec::new();
96        loop {
97            match dir.readdir().await {
98                Ok((filename, stat)) => {
99                    if &*filename == Path::new(".") || &*filename == Path::new("..") {
100                        continue;
101                    }
102
103                    ret.push((dirname.join(&filename), stat))
104                }
105                Err(Error::Ssh2(ref e))
106                    if e.code() == ssh2::ErrorCode::Session(libssh2_sys::LIBSSH2_ERROR_FILE) =>
107                {
108                    break
109                }
110                Err(e) => return Err(e),
111            }
112        }
113        Ok(ret)
114    }
115
116    pub async fn mkdir(&self, filename: &Path, mode: i32) -> Result<(), Error> {
117        self.stream
118            .rw_with(|| self.inner.mkdir(filename, mode), &self.sess)
119            .await
120    }
121
122    pub async fn rmdir(&self, filename: &Path) -> Result<(), Error> {
123        self.stream
124            .rw_with(|| self.inner.rmdir(filename), &self.sess)
125            .await
126    }
127
128    pub async fn stat(&self, filename: &Path) -> Result<FileStat, Error> {
129        self.stream
130            .rw_with(|| self.inner.stat(filename), &self.sess)
131            .await
132    }
133
134    pub async fn lstat(&self, filename: &Path) -> Result<FileStat, Error> {
135        self.stream
136            .rw_with(|| self.inner.lstat(filename), &self.sess)
137            .await
138    }
139
140    pub async fn setstat(&self, filename: &Path, stat: FileStat) -> Result<(), Error> {
141        self.stream
142            .rw_with(|| self.inner.setstat(filename, stat.clone()), &self.sess)
143            .await
144    }
145
146    pub async fn symlink(&self, path: &Path, target: &Path) -> Result<(), Error> {
147        self.stream
148            .rw_with(|| self.inner.symlink(path, target), &self.sess)
149            .await
150    }
151
152    pub async fn readlink(&self, path: &Path) -> Result<PathBuf, Error> {
153        self.stream
154            .rw_with(|| self.inner.readlink(path), &self.sess)
155            .await
156    }
157
158    pub async fn realpath(&self, path: &Path) -> Result<PathBuf, Error> {
159        self.stream
160            .rw_with(|| self.inner.realpath(path), &self.sess)
161            .await
162    }
163
164    pub async fn rename(
165        &self,
166        src: &Path,
167        dst: &Path,
168        flags: Option<RenameFlags>,
169    ) -> Result<(), Error> {
170        self.stream
171            .rw_with(|| self.inner.rename(src, dst, flags), &self.sess)
172            .await
173    }
174
175    pub async fn unlink(&self, file: &Path) -> Result<(), Error> {
176        self.stream
177            .rw_with(|| self.inner.unlink(file), &self.sess)
178            .await
179    }
180
181    pub async fn shutdown(&mut self) -> Result<(), Error> {
182        self.stream
183            .rw_with(|| self.inner.shutdown(), &self.sess)
184            .await
185    }
186}
187
188//
189pub struct AsyncFile<S> {
190    inner: File,
191    sess: Session,
192    stream: Arc<S>,
193}
194
195impl<S> AsyncFile<S> {
196    pub(crate) fn from_parts(inner: File, sess: Session, stream: Arc<S>) -> Self {
197        Self {
198            inner,
199            sess,
200            stream,
201        }
202    }
203}
204
205impl<S> AsyncFile<S>
206where
207    S: AsyncSessionStream + Send + Sync + 'static,
208{
209    pub async fn setstat(&mut self, stat: FileStat) -> Result<(), Error> {
210        self.stream
211            .rw_with(|| self.inner.setstat(stat.clone()), &self.sess)
212            .await
213    }
214
215    pub async fn stat(&mut self) -> Result<FileStat, Error> {
216        self.stream.rw_with(|| self.inner.stat(), &self.sess).await
217    }
218
219    pub async fn statvfs(&mut self) -> Result<libssh2_sys::LIBSSH2_SFTP_STATVFS, Error> {
220        self.stream
221            .rw_with(|| self.inner.statvfs(), &self.sess)
222            .await
223    }
224
225    pub async fn readdir(&mut self) -> Result<(PathBuf, FileStat), Error> {
226        self.stream
227            .rw_with(|| self.inner.readdir(), &self.sess)
228            .await
229    }
230
231    pub async fn fsync(&mut self) -> Result<(), Error> {
232        self.stream.rw_with(|| self.inner.fsync(), &self.sess).await
233    }
234
235    #[doc(hidden)]
236    pub async fn close(&mut self) -> Result<(), Error> {
237        self.stream.rw_with(|| self.inner.close(), &self.sess).await
238    }
239}
240
241mod impl_futures_util {
242    use core::{
243        pin::Pin,
244        task::{Context, Poll},
245    };
246    use std::io::{Error as IoError, Read as _, Seek, SeekFrom, Write as _};
247
248    use futures_util::io::{AsyncRead, AsyncSeek, AsyncWrite};
249
250    use super::AsyncFile;
251    use crate::session_stream::AsyncSessionStream;
252
253    //
254    impl<S> AsyncRead for AsyncFile<S>
255    where
256        S: AsyncSessionStream + Send + Sync + 'static,
257    {
258        fn poll_read(
259            self: Pin<&mut Self>,
260            cx: &mut Context<'_>,
261            buf: &mut [u8],
262        ) -> Poll<Result<usize, IoError>> {
263            let this = self.get_mut();
264            let sess = this.sess.clone();
265            let inner = &mut this.inner;
266
267            this.stream.poll_read_with(cx, || inner.read(buf), &sess)
268        }
269    }
270
271    impl<S> AsyncWrite for AsyncFile<S>
272    where
273        S: AsyncSessionStream + Send + Sync + 'static,
274    {
275        fn poll_write(
276            self: Pin<&mut Self>,
277            cx: &mut Context,
278            buf: &[u8],
279        ) -> Poll<Result<usize, IoError>> {
280            let this = self.get_mut();
281            let sess = this.sess.clone();
282            let inner = &mut this.inner;
283
284            this.stream.poll_write_with(cx, || inner.write(buf), &sess)
285        }
286
287        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
288            let this = self.get_mut();
289            let sess = this.sess.clone();
290            let inner = &mut this.inner;
291
292            this.stream.poll_write_with(cx, || inner.flush(), &sess)
293        }
294
295        fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
296            self.poll_flush(cx)
297        }
298    }
299
300    impl<S> AsyncSeek for AsyncFile<S>
301    where
302        S: AsyncSessionStream + Send + Sync + 'static,
303    {
304        fn poll_seek(
305            self: Pin<&mut Self>,
306            cx: &mut Context<'_>,
307            pos: SeekFrom,
308        ) -> Poll<Result<u64, IoError>> {
309            let this = self.get_mut();
310            let sess = this.sess.clone();
311            let inner = &mut this.inner;
312
313            this.stream.poll_read_with(cx, || inner.seek(pos), &sess)
314        }
315    }
316}