flatline/
sftp.rs

1use std::cell::Cell;
2use std::cmp::min;
3use std::collections::HashMap;
4use std::fmt::Debug;
5use std::io;
6use std::mem::transmute;
7use std::pin::Pin;
8use std::task::{ready, Context, Poll};
9
10use derive_new::new;
11use snafu::OptionExt;
12use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
13
14use crate::channel::{BufferChannel, Channel};
15use crate::error::{builder, Result};
16use crate::msg::Request;
17use crate::ssh::common::*;
18use crate::BoxFuture;
19use crate::{
20    error::Error,
21    ssh::{buffer::Buffer, common::code::*},
22};
23
24use super::o_channel;
25use bitflags::bitflags;
26
27bitflags! {
28    // https://datatracker.ietf.org/doc/html/draft-ietf-secsh-filexfer-01#section-7.3
29    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
30    pub struct OpenFlags: u32 {
31        // Open the file for reading
32        const READ                        = SSH_FXF_READ;
33        // Open the file for writing.  If both this and SSH_FXF_READ are specified,
34        // the file is opened for both reading and writing.
35        const WRITE                       = SSH_FXF_WRITE;
36        // Force all writes to append data at the end of the file.
37        const APPEND                      = SSH_FXF_APPEND;
38        // If this flag is specified, then a new file will be created if one
39        // does not alread exist (if O_TRUNC is specified, the new file will
40        // be truncated to zero length if it previously exists)
41        const CREAT                       = SSH_FXF_CREAT;
42        // Forces an existing file with the same name to be truncated to zero
43        // length when creating a file by specifying SSH_FXF_CREAT.
44        // SSH_FXF_CREAT MUST also be specified if this flag is used.
45        const TRUNC                       = SSH_FXF_TRUNC;
46        // Causes the request to fail if the named file already exists.
47        // SSH_FXF_CREAT MUST also be specified if this flag is used.
48        const EXCL                        = SSH_FXF_EXCL;
49    }
50}
51
52#[derive(Debug, Clone, Copy)]
53pub struct Statvfs {
54    pub bsize: u64,
55    pub frsize: u64,
56    pub blocks: u64,
57    pub bfree: u64,
58    pub bavail: u64,
59    pub files: u64,
60    pub ffree: u64,
61    pub favail: u64,
62    pub fsid: u64,
63    pub flag: u64,
64    pub namemax: u64,
65}
66
67impl Statvfs {
68    pub const FLAG_RDONLY: u64 = 0x1;
69    pub const FLAG_NOSUID: u64 = 0x2;
70    fn parse(data: &[u8]) -> Option<Self> {
71        let buffer = Buffer::from_slice(data);
72
73        Some(Self {
74            bsize: buffer.take_u64()?,
75            frsize: buffer.take_u64()?,
76            blocks: buffer.take_u64()?,
77            bfree: buffer.take_u64()?,
78            bavail: buffer.take_u64()?,
79            files: buffer.take_u64()?,
80            ffree: buffer.take_u64()?,
81            favail: buffer.take_u64()?,
82            fsid: buffer.take_u64()?,
83            flag: buffer.take_u64()?,
84            namemax: buffer.take_u64()?,
85        })
86    }
87}
88
89pub struct Stream<'a> {
90    sftp: &'a mut SFtp,
91    file: &'a mut File,
92    read_future: Option<BoxFuture<'a, Result<Vec<u8>>>>,
93    write_future: Option<BoxFuture<'a, Result<()>>>,
94}
95
96impl<'a> Stream<'a> {
97    fn poll_read_no_pin(
98        &'a mut self,
99        cx: &mut Context<'_>,
100        buf: &mut ReadBuf<'_>,
101    ) -> Poll<io::Result<()>> {
102        let this = self;
103        if this.read_future.is_none() {
104            let read: BoxFuture<'_, _> = Box::pin(this.sftp.read_file(
105                this.file,
106                if buf.remaining() > u32::MAX as usize {
107                    u32::MAX
108                } else {
109                    buf.remaining() as u32
110                },
111            ));
112            this.read_future = Some(read);
113        }
114
115        let f = this.read_future.as_mut().unwrap().as_mut();
116
117        let res = ready!(f.poll(cx));
118        this.read_future = None;
119        match res {
120            Ok(data) => {
121                if data.len() > buf.remaining() {
122                    return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "data too long")));
123                }
124                buf.put_slice(&data);
125                Poll::Ready(Ok(()))
126            }
127            Err(err) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, Box::new(err)))),
128        }
129    }
130
131    fn poll_write_no_pin(
132        &'a mut self,
133        cx: &mut Context<'_>,
134        buf: &'a [u8],
135    ) -> Poll<io::Result<usize>> {
136        if self.write_future.is_none() {
137            let future: BoxFuture<_> = Box::pin(self.sftp.write_file_buf(self.file, buf));
138
139            self.write_future = Some(future);
140        }
141        let res = ready!(self.write_future.as_mut().unwrap().as_mut().poll(cx));
142
143        self.write_future = None;
144
145        match res {
146            Ok(_) => Poll::Ready(Ok(buf.len())),
147            Err(err) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, Box::new(err)))),
148        }
149    }
150}
151
152impl<'a> AsyncWrite for Stream<'a> {
153    fn poll_write(
154        self: Pin<&mut Self>,
155        cx: &mut Context<'_>,
156        buf: &[u8],
157    ) -> Poll<io::Result<usize>> {
158        let this: &mut Stream<'a> = unsafe { transmute(Pin::into_inner(self)) };
159
160        let buf = unsafe { transmute::<&[u8], &[u8]>(buf) };
161
162        this.poll_write_no_pin(cx, buf)
163    }
164
165    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
166        Poll::Ready(Ok(()))
167    }
168
169    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
170        Poll::Ready(Ok(()))
171    }
172}
173
174impl<'a> AsyncRead for Stream<'a> {
175    fn poll_read(
176        self: Pin<&mut Self>,
177        cx: &mut Context<'_>,
178        buf: &mut ReadBuf<'_>,
179    ) -> Poll<io::Result<()>> {
180        let this: &mut Stream<'a> = unsafe { transmute(Pin::into_inner(self)) };
181        this.poll_read_no_pin(cx, buf)
182        // let this = Pin::into_inner(self);
183        // let this = &mut *this;
184        // if this.read_future.is_none() {
185        //     let read: BoxFuture<'_, _> = Box::pin(this.sftp.read_file(
186        //         &mut this.file,
187        //         if buf.remaining() > u32::MAX as usize {
188        //             u32::MAX
189        //         } else {
190        //             buf.remaining() as u32
191        //         },
192        //     ));
193        //     // this.read_future = unsafe { transmute(Some(read)) };
194        //     this.read_future = Some(read);
195        // }
196
197        // let f = this.read_future.as_mut().unwrap().as_mut();
198
199        // let res = ready!(f.poll(cx));
200        // this.read_future = None;
201        // match res {
202        //     Ok(data) => {
203        //         if data.len() > buf.remaining() {
204        //             return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "data too long")));
205        //         }
206        //         buf.put_slice(&data);
207        //         Poll::Ready(Ok(()))
208        //     }
209        //     Err(err) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, Box::new(err)))),
210        // }
211    }
212}
213
214pub struct SFtp {
215    channel: BufferChannel,
216    request_id: u32,
217    version: u32,
218    ext: HashMap<String, Vec<u8>>,
219    packets: HashMap<u32, Packet>,
220}
221
222impl SFtp {
223    pub(crate) fn new(channel: Channel, version: u32, ext: HashMap<String, Vec<u8>>) -> Self {
224        Self {
225            channel: BufferChannel::new(channel),
226            request_id: 0,
227            version,
228            ext,
229            packets: Default::default(),
230        }
231    }
232}
233
234pub struct File {
235    handle: Vec<u8>,
236    pos: u64,
237}
238
239impl File {
240    fn new(handle: Vec<u8>) -> Self {
241        Self { handle, pos: 0 }
242    }
243}
244
245pub struct Dir {
246    handle: Vec<u8>,
247}
248
249impl Dir {
250    fn new(handle: Vec<u8>) -> Self {
251        Self { handle }
252    }
253}
254
255bitflags! {
256    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
257    pub struct Permissions: u32 {
258        const OTHER_EXEC                        = 1 << 0;
259        const OTHER_WRITE                       = 1 << 1;
260        const OTHER_READ                        = 1 << 2;
261
262        const GROUP_EXEC                        = 1 << 0 << 4;
263        const GROUP_WRITE                       = 1 << 1 << 4;
264        const GROUP_READ                        = 1 << 2 << 4;
265
266        const OWNER_EXEC                        = 1 << 0 << 8;
267        const OWNER_WRITE                       = 1 << 1 << 8;
268        const OWNER_READ                        = 1 << 2 << 8;
269    }
270}
271
272impl Permissions {
273    pub fn p0755() -> Self {
274        Self::from_bits_retain(0o755)
275    }
276}
277
278#[derive(new, Debug, Clone, Copy)]
279pub struct Timestamp {
280    pub atime: u32,
281    pub mtime: u32,
282}
283
284#[derive(new, Debug, Clone, Copy)]
285pub struct User {
286    pub uid: u32,
287    pub gid: u32,
288}
289
290#[derive(new, Debug, Clone)]
291pub struct Attributes {
292    pub size: Option<u64>,
293    pub user: Option<User>,
294    pub permissions: Option<Permissions>,
295    // atime mtime
296    pub time: Option<Timestamp>,
297    // extended_count: Option<u32>,
298    pub extend: HashMap<String, Vec<u8>>,
299}
300
301impl Attributes {
302    fn to_buffer(&self) -> Buffer<Vec<u8>> {
303        let mut buffer = Buffer::new();
304        self.to_bytes(&mut buffer);
305        buffer
306    }
307
308    fn to_bytes(&self, buffer: &mut Buffer<Vec<u8>>) {
309        let mut flags = 0;
310        let mut tmp = Buffer::new();
311        if let Some(size) = self.size {
312            flags |= SSH_FILEXFER_ATTR_SIZE;
313            tmp.put_u64(size);
314        }
315
316        if let Some(user) = self.user {
317            flags |= SSH_FILEXFER_ATTR_UIDGID;
318            tmp.put_u32(user.uid);
319            tmp.put_u32(user.gid);
320        }
321
322        if let Some(permissions) = self.permissions {
323            flags |= SSH_FILEXFER_ATTR_PERMISSIONS;
324            tmp.put_u32(permissions.bits());
325        }
326
327        if let Some(time) = self.time {
328            flags |= SSH_FILEXFER_ATTR_ACMODTIME;
329            tmp.put_u32(time.atime);
330            tmp.put_u32(time.mtime);
331        }
332
333        let count = self.extend.len() as u32;
334
335        tmp.put_u32(count);
336
337        for (k, v) in &self.extend {
338            tmp.put_one(k);
339            tmp.put_one(v);
340        }
341        buffer.put_u32(flags);
342        buffer.put_bytes(tmp);
343    }
344
345    fn parse(buffer: &mut Buffer<Cell<&[u8]>>) -> Option<Self> {
346        let flags = buffer.take_u32()?;
347
348        let mut size = None;
349        let mut user = None;
350        let mut permissions = None;
351        let mut time = None;
352
353        let mut extend = HashMap::new();
354
355        if flags & SSH_FILEXFER_ATTR_SIZE != 0 {
356            size = Some(buffer.take_u64()?)
357        }
358
359        if flags & SSH_FILEXFER_ATTR_UIDGID != 0 {
360            let uid = buffer.take_u32()?;
361            let gid = buffer.take_u32()?;
362            user = Some(User::new(uid, gid))
363        }
364
365        if flags & SSH_FILEXFER_ATTR_PERMISSIONS != 0 {
366            let per = buffer.take_u32()?;
367            permissions = Some(Permissions::from_bits_retain(per))
368        }
369
370        if flags & SSH_FILEXFER_ATTR_ACMODTIME != 0 {
371            let atime = buffer.take_u32()?;
372            let mtime = buffer.take_u32()?;
373
374            time = Some(Timestamp::new(atime, mtime))
375        }
376
377        if flags & SSH_FILEXFER_ATTR_EXTENDED != 0 {
378            let ecount = buffer.take_u32()?;
379
380            for _ in 0..ecount {
381                let (_, key) = buffer.take_one()?;
382                let (_, value) = buffer.take_one()?;
383
384                extend.insert(std::str::from_utf8(key).ok()?.to_string(), value.to_vec());
385            }
386        }
387
388        Some(Self::new(size, user, permissions, time, extend))
389    }
390}
391
392#[derive(new, Debug, Clone)]
393pub struct FileInfo {
394    pub filename: String,
395    pub longname: String,
396    pub attrs: Attributes,
397}
398
399#[derive(Debug, Clone, Copy)]
400pub struct Limits {
401    pub max_packet_len: u64,
402    pub max_read_len: u64,
403    pub max_write_len: u64,
404    pub max_open_handles: u64,
405}
406
407impl Limits {
408    fn parse(data: &[u8]) -> Option<Self> {
409        let buffer = Buffer::from_slice(data);
410
411        Some(Self {
412            max_packet_len: buffer.take_u64()?,
413            max_read_len: buffer.take_u64()?,
414            max_write_len: buffer.take_u64()?,
415            max_open_handles: buffer.take_u64()?,
416        })
417    }
418}
419
420#[derive(custom_debug_derive::Debug)]
421struct Packet {
422    id: u32,
423    msg: Message,
424}
425
426impl Packet {
427    fn parse(data: &[u8]) -> Option<Packet> {
428        let data = Buffer::from_slice(data);
429        let (_, data) = data.take_one()?;
430
431        let mut data = Buffer::from_slice(data);
432
433        let code = data.take_u8()?;
434        let id = data.take_u32()?;
435
436        let msg = match code {
437            SSH_FXP_HANDLE => {
438                let (_, handle) = data.take_one()?;
439
440                Message::FileHandle(handle.to_vec())
441            }
442            SSH_FXP_STATUS => {
443                let status = data.take_u32()?;
444
445                let (_, msg) = data.take_one()?;
446
447                let (_, tag) = data.take_one()?;
448
449                let msg = std::str::from_utf8(msg).ok()?.to_string();
450
451                let _tag = std::str::from_utf8(tag).ok()?.to_string();
452
453                let status = Status::from_status(status).ok()?;
454                Message::Status { status, msg, _tag }
455            }
456            SSH_FXP_DATA => Message::Data(data.take_one()?.1.to_vec()),
457            SSH_FXP_NAME => {
458                let count = data.take_u32()?;
459                let mut res = Vec::with_capacity(count as usize);
460
461                for _ in 0..count {
462                    let (_, filename) = data.take_one()?;
463                    let (_, longname) = data.take_one()?;
464
465                    let filename = std::str::from_utf8(filename).ok()?.to_string();
466
467                    let longname = std::str::from_utf8(longname).ok()?.to_string();
468
469                    res.push(FileInfo::new(
470                        filename,
471                        longname,
472                        Attributes::parse(&mut data)?,
473                    ));
474                }
475                Message::Name(res)
476            }
477            SSH_FXP_ATTRS => Message::Attributes(Attributes::parse(&mut data)?),
478            SSH_FXP_EXTENDED_REPLY => Message::ExtendReply(data.to_vec()),
479            _ => return None,
480        };
481
482        Some(Packet { id, msg })
483    }
484}
485
486#[derive(Debug, PartialEq)]
487#[repr(u32)]
488enum Status {
489    OK = SSH_FX_OK,
490    Eof = SSH_FX_EOF,
491    NoSuchFile = SSH_FX_NO_SUCH_FILE,
492    PermissionDenied = SSH_FX_PERMISSION_DENIED,
493    Failure = SSH_FX_FAILURE,
494    BadMessage = SSH_FX_BAD_MESSAGE,
495    NoConnection = SSH_FX_NO_CONNECTION,
496    ConnectionLost = SSH_FX_CONNECTION_LOST,
497    OpUnsupported = SSH_FX_OP_UNSUPPORTED,
498}
499
500impl Status {
501    fn from_status(code: u32) -> Result<Self> {
502        Ok(match code {
503            SSH_FX_OK => Self::OK,
504            SSH_FX_EOF => Self::Eof,
505            SSH_FX_NO_SUCH_FILE => Self::NoSuchFile,
506            SSH_FX_PERMISSION_DENIED => Self::PermissionDenied,
507            SSH_FX_FAILURE => Self::Failure,
508            SSH_FX_BAD_MESSAGE => Self::BadMessage,
509            SSH_FX_NO_CONNECTION => Self::NoConnection,
510            SSH_FX_CONNECTION_LOST => Self::ConnectionLost,
511            SSH_FX_OP_UNSUPPORTED => Self::OpUnsupported,
512            _ => return Err(Error::invalid_format("Invalid Sftp status code")),
513        })
514    }
515
516    fn to_result<T: Default>(&self, msg: String) -> Result<T> {
517        match self {
518            Status::OK => Ok(Default::default()),
519            Status::Eof => Ok(Default::default()),
520            Status::NoSuchFile => builder::NoSuchFile { tip: msg }.fail(), //Err(Error::NoSuchFile(msg)),
521            Status::PermissionDenied => builder::PermissionDenied { tip: msg }.fail(), //Err(Error::PermissionDenied(msg)),
522            Status::Failure => builder::SFtpFailure { tip: msg }.fail(), //Err(Error::SFtpFailure(msg)),
523            Status::BadMessage => builder::BadMessage { tip: msg }.fail(), //Err(Error::BadMessage(msg)),
524            Status::NoConnection => builder::NoConnection { tip: msg }.fail(), // Err(Error::NoConnection(msg)),
525            Status::ConnectionLost => builder::NoConnection { tip: msg }.fail(), // Err(Error::ConnectionLost(msg)),
526            Status::OpUnsupported => builder::OpUnsupported { tip: msg }.fail(), // Err(Error::OpUnsupported(msg)),
527        }
528    }
529
530    fn no_ok_and_eof<T>(&self, msg: String) -> Result<T> {
531        match self {
532            Status::OK =>
533            /*  Err(Error::ProtocolError(
534                "Unexpected Ok status received".to_string(),
535            )) */
536            {
537                builder::Protocol {
538                    tip: "Unexpected Ok status received",
539                }
540                .fail()
541            }
542            Status::Eof =>
543            /* Err(Error::ProtocolError(
544                "Unexpected EOF status received".to_string(),
545            ))*/
546            {
547                builder::Protocol {
548                    tip: "Unexpected EOF status received",
549                }
550                .fail()
551            }
552            Status::NoSuchFile => builder::NoSuchFile { tip: msg }.fail(), //   Err(Error::NoSuchFile(msg)),
553            Status::PermissionDenied => builder::PermissionDenied { tip: msg }.fail(), // Err(Error::PermissionDenied(msg)),
554            Status::Failure => builder::SFtpFailure { tip: msg }.fail(), // Err(Error::SFtpFailure(msg)),
555            Status::BadMessage => builder::BadMessage { tip: msg }.fail(), // Err(Error::BadMessage(msg)),
556            Status::NoConnection => builder::NoConnection { tip: msg }.fail(), // Err(Error::NoConnection(msg)),
557            Status::ConnectionLost => builder::NoConnection { tip: msg }.fail(), // Err(Error::ConnectionLost(msg)),
558            Status::OpUnsupported => builder::OpUnsupported { tip: msg }.fail(), // Err(Error::OpUnsupported(msg)),
559        }
560    }
561
562    fn no_eof<T: Default>(&self, msg: String) -> Result<T> {
563        match self {
564            Status::OK => Ok(Default::default()),
565            Status::Eof =>
566            /* Err(Error::ProtocolError(
567                "Unexpected EOF status received".to_string(),
568            ))*/
569            {
570                builder::Protocol {
571                    tip: "Unexpected EOF status received",
572                }
573                .fail()
574            }
575            Status::NoSuchFile => builder::NoSuchFile { tip: msg }.fail(), //   Err(Error::NoSuchFile(msg)),
576            Status::PermissionDenied => builder::PermissionDenied { tip: msg }.fail(), // Err(Error::PermissionDenied(msg)),
577            Status::Failure => builder::SFtpFailure { tip: msg }.fail(), // Err(Error::SFtpFailure(msg)),
578            Status::BadMessage => builder::BadMessage { tip: msg }.fail(), // Err(Error::BadMessage(msg)),
579            Status::NoConnection => builder::NoConnection { tip: msg }.fail(), // Err(Error::NoConnection(msg)),
580            Status::ConnectionLost => builder::NoConnection { tip: msg }.fail(), // Err(Error::ConnectionLost(msg)),
581            Status::OpUnsupported => builder::OpUnsupported { tip: msg }.fail(), // Err(Error::OpUnsupported(msg)),
582        }
583    }
584
585    fn no_ok<T: Default>(&self, msg: String) -> Result<T> {
586        match self {
587            Status::OK =>
588            /* Err(Error::ProtocolError(
589                "Unexpected Ok status received".to_string(),
590            )) */
591            {
592                builder::Protocol {
593                    tip: "Unexpected Ok status received",
594                }
595                .fail()
596            }
597            Status::Eof => Ok(Default::default()),
598            Status::NoSuchFile => builder::NoSuchFile { tip: msg }.fail(), //   Err(Error::NoSuchFile(msg)),
599            Status::PermissionDenied => builder::PermissionDenied { tip: msg }.fail(), // Err(Error::PermissionDenied(msg)),
600            Status::Failure => builder::SFtpFailure { tip: msg }.fail(), // Err(Error::SFtpFailure(msg)),
601            Status::BadMessage => builder::BadMessage { tip: msg }.fail(), // Err(Error::BadMessage(msg)),
602            Status::NoConnection => builder::NoConnection { tip: msg }.fail(), // Err(Error::NoConnection(msg)),
603            Status::ConnectionLost => builder::NoConnection { tip: msg }.fail(), // Err(Error::ConnectionLost(msg)),
604            Status::OpUnsupported => builder::OpUnsupported { tip: msg }.fail(), // Err(Error::OpUnsupported(msg)),
605        }
606    }
607}
608
609#[derive(custom_debug_derive::Debug)]
610enum Message {
611    FileHandle(Vec<u8>),
612    Status {
613        status: Status,
614        msg: String,
615        _tag: String,
616    },
617    Data(#[debug(skip)] Vec<u8>),
618    Name(Vec<FileInfo>),
619    Attributes(Attributes),
620    ExtendReply(#[debug(skip)] Vec<u8>),
621}
622
623// struct Stream<'a> {
624//     sftp: &'a mut SFtp,
625// }
626
627impl SFtp {
628    const MAX_SFTP_PACKET: usize = 32000;
629
630    pub fn extend(&self, key: &str) -> Option<&[u8]> {
631        self.ext.get(key).map(|v| v.as_ref())
632    }
633
634    pub fn version(&self) -> u32 {
635        self.version
636    }
637
638    pub async fn from_channel(channel: Channel) -> Result<Self> {
639        let (sender, recver) = o_channel();
640
641        let session = channel.session();
642        let request = Request::SFtpFromChannel { channel, sender };
643
644        session
645            .send(request)
646            .map_err(|_| builder::Disconnected.build())?;
647
648        recver.await?
649    }
650
651    pub async fn close(self) -> Result<()> {
652        self.channel.into_inner().close().await
653    }
654
655    // pub async fn flush(&mut self) -> Result<()> {
656    //     self.channel.flush().await?;
657    //     Ok(())
658    // }
659
660    // pub async fn flush(&self) -> Result<()> {
661    //     self.channel.inner().inner().flush().await
662    //}
663
664    pub fn support_posix_rename(&self) -> bool {
665        self.support(OPENSSH_SFTP_EXT_POSIX_RENAME)
666    }
667
668    pub async fn posix_rename(&mut self, oldpath: &str, newpath: &str) -> Result<()> {
669        debug_assert!(
670            self.support_posix_rename(),
671            "Server doesn't support posix rename"
672        );
673        // let mut buffer = Buffer::new();
674
675        let request_id = self.genarate_request_id();
676
677        // buffer.put_u32(request_id);
678        // buffer.put_one(OPENSSH_SFTP_EXT_POSIX_RENAME.0);
679        // buffer.put_one(oldpath);
680        // buffer.put_one(newpath);
681
682        // self.send(SSH_FXP_EXTENDED, buffer.as_ref()).await?;
683
684        let buffer = make_buffer! {
685            u8: SSH_FXP_EXTENDED,
686            u32: request_id,
687            one: OPENSSH_SFTP_EXT_POSIX_RENAME.0,
688            one: oldpath,
689            one: newpath,
690        };
691
692        self.channel.write_all(buffer).await?;
693
694        self.wait_for_status(request_id, Status::no_eof).await
695    }
696
697    pub fn support_fstatvfs(&self) -> bool {
698        self.support(OPENSSH_SFTP_EXT_FSTATVFS)
699    }
700
701    pub async fn fstatvfs(&mut self, file: &File) -> Result<Statvfs> {
702        debug_assert!(self.support_fstatvfs(), "Server doesn't support fstatvfs");
703
704        let request_id = self.genarate_request_id();
705        // let mut buffer = Buffer::new();
706        // buffer.put_u32(request_id);
707        // buffer.put_one(OPENSSH_SFTP_EXT_FSTATVFS.0);
708        // buffer.put_one(&file.handle);
709
710        // self.send(SSH_FXP_EXTENDED, buffer.as_ref()).await?;
711
712        let buffer = make_buffer! {
713            u8: SSH_FXP_EXTENDED,
714            u32: request_id,
715            one: OPENSSH_SFTP_EXT_FSTATVFS.0,
716            one: &file.handle,
717        };
718
719        self.channel.write_all(buffer).await?;
720
721        let packet = self.wait_for_packet(request_id).await?;
722
723        match packet.msg {
724            Message::Status { status, msg, _tag } => status.no_ok_and_eof(msg),
725            Message::ExtendReply(data) => Statvfs::parse(&data).context(builder::Protocol {
726                tip: "Invalid Statvfs Message",
727            }),
728            _ => builder::Protocol {
729                tip: "Unexpected SFtp Message",
730            }
731            .fail(), // _ => Err(Error::ProtocolError("Unexpected SFtp Message".to_string())),
732        }
733    }
734
735    pub fn support_statvfs(&self) -> bool {
736        self.support(OPENSSH_SFTP_EXT_STATVFS)
737    }
738
739    pub async fn statvfs(&mut self, path: &str) -> Result<Statvfs> {
740        debug_assert!(self.support_fstatvfs(), "Server doesn't support statvfs");
741
742        let request_id = self.genarate_request_id();
743        // let mut buffer = Buffer::new();
744        // buffer.put_u32(request_id);
745        // buffer.put_one(OPENSSH_SFTP_EXT_STATVFS.0);
746        // buffer.put_one(path);
747
748        // self.send(SSH_FXP_EXTENDED, buffer.as_ref()).await?;
749
750        let buffer = make_buffer! {
751            u8: SSH_FXP_EXTENDED,
752            u32: request_id,
753            one: OPENSSH_SFTP_EXT_STATVFS.0,
754            one: path,
755        };
756
757        self.channel.write_all(buffer).await?;
758
759        let packet = self.wait_for_packet(request_id).await?;
760
761        match packet.msg {
762            Message::Status { status, msg, _tag } => status.no_ok_and_eof(msg),
763            Message::ExtendReply(data) => Statvfs::parse(&data).context(builder::Protocol {
764                tip: "Invalid Statvfs Message",
765            }),
766            _ => builder::Protocol {
767                tip: "Unexpected SFtp Message",
768            }
769            .fail(), // _ => Err(Error::ProtocolError("Unexpected SFtp Message".to_string())),
770        }
771    }
772
773    pub fn support_hardlink(&self) -> bool {
774        self.support(OPENSSH_SFTP_EXT_HARDLINK)
775    }
776
777    pub async fn hardlink(&mut self, oldpath: &str, newpath: &str) -> Result<()> {
778        debug_assert!(self.support_hardlink(), "Server doesn't support hardlink");
779
780        let request_id = self.genarate_request_id();
781        // let mut buffer = Buffer::new();
782        // buffer.put_u32(request_id);
783        // buffer.put_one(OPENSSH_SFTP_EXT_HARDLINK.0);
784        // buffer.put_one(oldpath);
785        // buffer.put_one(newpath);
786
787        // self.send(SSH_FXP_EXTENDED, buffer.as_ref()).await?;
788
789        let buffer = make_buffer! {
790            u8: SSH_FXP_EXTENDED,
791            u32: request_id,
792            one: OPENSSH_SFTP_EXT_HARDLINK.0,
793            one: oldpath,
794            one: newpath,
795        };
796
797        self.channel.write_all(buffer).await?;
798
799        self.wait_for_status(request_id, Status::no_eof).await
800    }
801
802    pub fn support_fsync(&self) -> bool {
803        self.support(OPENSSH_SFTP_EXT_FSYNC)
804    }
805
806    pub async fn fsync(&mut self, file: &File) -> Result<()> {
807        debug_assert!(self.support_fsync(), "Server doesn't support fsync");
808
809        let request_id = self.genarate_request_id();
810        // let mut buffer = Buffer::new();
811        // buffer.put_u32(request_id);
812        // buffer.put_one(OPENSSH_SFTP_EXT_FSYNC.0);
813        // buffer.put_one(&file.handle);
814
815        // self.send(SSH_FXP_EXTENDED, buffer.as_ref()).await?;
816
817        let buffer = make_buffer! {
818            u8: SSH_FXP_EXTENDED,
819            u32: request_id,
820            one: OPENSSH_SFTP_EXT_FSYNC.0,
821            one: &file.handle,
822        };
823
824        self.channel.write_all(buffer).await?;
825
826        self.wait_for_status(request_id, Status::no_eof).await
827    }
828
829    pub fn support_lsetstat(&self) -> bool {
830        self.support(OPENSSH_SFTP_EXT_LSETSTAT)
831    }
832
833    pub async fn lsetstat(&mut self, path: &str, attrs: &Attributes) -> Result<()> {
834        debug_assert!(self.support_lsetstat(), "Server doesn't lsetstat");
835
836        let request_id = self.genarate_request_id();
837        // let mut buffer = Buffer::new();
838        // buffer.put_u32(request_id);
839        // buffer.put_one(OPENSSH_SFTP_EXT_LSETSTAT.0);
840        // buffer.put_one(path);
841        // attrs.to_bytes(&mut buffer);
842
843        // self.send(SSH_FXP_EXTENDED, buffer.as_ref()).await?;
844
845        let attrs = attrs.to_buffer();
846
847        let buffer = make_buffer! {
848            u8: SSH_FXP_EXTENDED,
849            u32: request_id,
850            one: OPENSSH_SFTP_EXT_LSETSTAT.0,
851            one: path,
852            bytes: attrs,
853        };
854
855        self.channel.write_all(buffer).await?;
856
857        self.wait_for_status(request_id, Status::no_eof).await
858    }
859
860    pub fn support_limits(&self) -> bool {
861        self.support(OPENSSH_SFTP_EXT_LIMITS)
862    }
863
864    pub async fn limits(&mut self) -> Result<Limits> {
865        debug_assert!(self.support_limits(), "Server doesn't support limits");
866        let request_id = self.genarate_request_id();
867        // let mut buffer = Buffer::new();
868        // buffer.put_one(OPENSSH_SFTP_EXT_LIMITS.0);
869
870        // self.send(SSH_FXP_EXTENDED, buffer.as_ref()).await?;
871
872        let buffer = make_buffer! {
873            u8: SSH_FXP_EXTENDED,
874            u32: request_id,
875            one: OPENSSH_SFTP_EXT_LIMITS.0
876        };
877
878        self.channel.write_all(buffer).await?;
879
880        let packet = self.wait_for_packet(request_id).await?;
881
882        match packet.msg {
883            Message::ExtendReply(data) => Limits::parse(&data).context(builder::Protocol {
884                tip: "Invalid packet format",
885            }),
886            _ => builder::Protocol {
887                tip: "Unexpected SFtp Message",
888            }
889            .fail(),
890        }
891    }
892
893    pub fn support_expand_path(&self) -> bool {
894        self.support(OPENSSH_SFTP_EXT_EXPAND_PATH)
895    }
896
897    pub async fn expand_path(&mut self, path: &str) -> Result<String> {
898        debug_assert!(
899            self.support_expand_path(),
900            "Server doesn't support expand path"
901        );
902
903        let request_id = self.genarate_request_id();
904        // let mut buffer = Buffer::new();
905        // buffer.put_u32(request_id);
906        // buffer.put_one(OPENSSH_SFTP_EXT_EXPAND_PATH.0);
907        // buffer.put_one(path);
908
909        // self.send(SSH_FXP_EXTENDED, buffer.as_ref()).await?;
910
911        let buffer = make_buffer! {
912            u8: SSH_FXP_EXTENDED,
913            u32: request_id,
914            one: OPENSSH_SFTP_EXT_EXPAND_PATH.0,
915            one: path
916        };
917
918        self.channel.write_all(buffer).await?;
919
920        let packet = self.wait_for_packet(request_id).await?;
921
922        match packet.msg {
923            Message::Status { status, msg, .. } => status.no_ok_and_eof(msg),
924            Message::Name(infos) if infos.len() == 1 => Ok(infos[0].filename.clone()),
925            // _ => Err(Error::ProtocolError("Unknown msg".to_string())),
926            _ => builder::Protocol { tip: "Unknown msg" }.fail(),
927        }
928    }
929
930    pub fn support_copy_data(&self) -> bool {
931        self.support(OPENSSH_SFTP_EXT_COPY_DATA)
932    }
933
934    pub async fn copy_data(&mut self, read: &mut File, len: u64, write: &mut File) -> Result<()> {
935        debug_assert!(self.support_copy_data(), "Server doesn't support copy data");
936
937        let request_id = self.genarate_request_id();
938        // let mut buffer = Buffer::new();
939        // buffer.put_u32(request_id);
940        // buffer.put_one(OPENSSH_SFTP_EXT_COPY_DATA.0);
941        // buffer.put_one(&read.handle);
942        // buffer.put_u64(read.pos);
943        // buffer.put_u64(len);
944        // buffer.put_one(&write.handle);
945        // buffer.put_u64(write.pos);
946
947        // self.send(SSH_FXP_EXTENDED, buffer.as_ref()).await?;
948
949        let buffer = make_buffer! {
950            u8: SSH_FXP_EXTENDED,
951            u32: request_id,
952            one: OPENSSH_SFTP_EXT_COPY_DATA.0,
953            one: &read.handle,
954            u64: read.pos,
955            u64: len,
956            one: &write.handle,
957            u64: write.pos,
958        };
959
960        self.channel.write_all(buffer).await?;
961
962        let status = self.wait_for_status(request_id, Status::to_result).await;
963
964        if status.is_ok() {
965            read.pos += len;
966            write.pos += len;
967        }
968
969        status
970    }
971
972    pub fn support_home_directory(&self) -> bool {
973        self.support(OPENSSH_SFTP_EXT_HOME_DIRECTORY)
974    }
975
976    pub async fn home_directory(&mut self, username: &str) -> Result<String> {
977        debug_assert!(
978            self.support_home_directory(),
979            "Server doesn't support home directory"
980        );
981
982        let request_id = self.genarate_request_id();
983        // cap: 4 + 1 + 4 + 4 + xx.len() + 4 + username.len()
984        // let mut buffer = Buffer::with_capacity(
985        //     4 + 1 + 4 + 4 + OPENSSH_SFTP_EXT_HOME_DIRECTORY.0.len() + 4 + username.len(),
986        // );
987        // buffer.put_u32(
988        //     (4 + 1 + 4 + 4 + OPENSSH_SFTP_EXT_HOME_DIRECTORY.0.len() + 4 + username.len()) as u32,
989        // );
990        // buffer.put_u8(SSH_FXP_EXTENDED);
991        // buffer.put_u32(request_id);
992        // buffer.put_one(OPENSSH_SFTP_EXT_HOME_DIRECTORY.0);
993        // buffer.put_one(username);
994
995        // self.write(buffer).await?;
996
997        let buffer = make_buffer! {
998            u8: SSH_FXP_EXTENDED,
999            u32: request_id,
1000            one: OPENSSH_SFTP_EXT_HOME_DIRECTORY.0,
1001            one: username
1002        };
1003
1004        self.channel.write_all(buffer).await?;
1005
1006        let packet = self.wait_for_packet(request_id).await?;
1007
1008        match packet.msg {
1009            Message::Status { status, msg, _tag } => status.no_ok_and_eof(msg),
1010            Message::Name(infos) if infos.len() == 1 => Ok(infos[0].filename.clone()),
1011            // _ => Err(Error::ProtocolError("Unexpected message".to_string())),
1012            _ => builder::Protocol {
1013                tip: "Unexpected message",
1014            }
1015            .fail(),
1016        }
1017    }
1018
1019    pub fn support_users_groups_by_id(&self) -> bool {
1020        self.support(OPENSSH_SFTP_EXT_USERS_GROUPS_BY_ID)
1021    }
1022
1023    pub async fn users_groups_by_id(
1024        &mut self,
1025        users: &[u32],
1026        groups: &[u32],
1027    ) -> Result<(Vec<String>, Vec<String>)> {
1028        let request_id = self.genarate_request_id();
1029        let cap = 4
1030            + 1
1031            + 4
1032            + OPENSSH_SFTP_EXT_USERS_GROUPS_BY_ID.0.len()
1033            + 4
1034            + users.len() * 4
1035            + 4
1036            + groups.len() * 4;
1037        let mut buffer = Buffer::with_capacity(cap);
1038        buffer.put_u32((cap - 4) as u32);
1039        buffer.put_u8(SSH_FXP_EXTENDED);
1040        buffer.put_u32(request_id);
1041        buffer.put_one(OPENSSH_SFTP_EXT_USERS_GROUPS_BY_ID.0);
1042
1043        buffer.put_u32((users.len() * 4) as u32);
1044
1045        users.iter().for_each(|v| {
1046            buffer.put_u32(*v);
1047        });
1048
1049        buffer.put_u32((groups.len() * 4) as u32);
1050
1051        groups.iter().for_each(|v| {
1052            buffer.put_u32(*v);
1053        });
1054
1055        self.channel.write_all(buffer).await?;
1056
1057        let packet = self.wait_for_packet(request_id).await?;
1058
1059        match packet.msg {
1060            Message::ExtendReply(data) => {
1061                let buffer = Buffer::from_slice(&data);
1062                let usernames = buffer
1063                    .take_one()
1064                    .context(builder::Protocol {
1065                        tip: "Invalid sftp packet format",
1066                    })?
1067                    .1;
1068                let usernames = Buffer::from_slice(usernames);
1069
1070                let groupnames = buffer
1071                    .take_one()
1072                    .context(builder::Protocol {
1073                        tip: "Invalid sftp packet format",
1074                    })?
1075                    .1;
1076                let groupnames = Buffer::from_slice(groupnames);
1077
1078                let mut unames = vec![];
1079                while let Some(user) = usernames.take_one() {
1080                    // unames.push(String::from_utf8(user.1).map_err(|e| e.utf8_error())?);
1081                    unames.push(std::str::from_utf8(user.1)?.to_string())
1082                }
1083
1084                let mut gnames = vec![];
1085
1086                while let Some(group) = groupnames.take_one() {
1087                    // gnames.push(String::from_utf8(group.1).map_err(|e| e.utf8_error())?)
1088                    gnames.push(std::str::from_utf8(group.1)?.to_string());
1089                }
1090
1091                Ok((unames, gnames))
1092            }
1093            _ => builder::Protocol {
1094                tip: "Unexpected message",
1095            }
1096            .fail(), //Err(Error::ProtocolError("Unexpected message".to_string())),
1097        }
1098    }
1099
1100    fn support(&self, (e, v): (&str, &[u8])) -> bool {
1101        self.ext.get(e).map(|v| v.as_ref()) == Some(v)
1102    }
1103
1104    // pub async fn close(mut self) -> Result<()> {
1105    //     self.closed = true;
1106    //     let (sender, recver) = async_channel::bounded(1);
1107    //     self.session
1108    //         .send(Request::ChannelDrop {
1109    //             id: self.id,
1110    //             sender: Some(sender),
1111    //         })
1112    //         .await
1113    //         .map_err(|_| Error::Disconnect)?;
1114    //     recver.recv().await.map_err(|_| Error::Disconnect)?
1115    // }
1116
1117    pub fn seek_file(&self, file: &mut File, pos: u64) {
1118        file.pos = pos;
1119    }
1120
1121    pub async fn close_file(&mut self, file: File) -> Result<()> {
1122        let request_id = self.genarate_request_id();
1123
1124        // let mut buffer = Buffer::new();
1125
1126        // buffer.put_u32(request_id);
1127        // buffer.put_one(file.handle);
1128
1129        // self.send(SSH_FXP_CLOSE, buffer.as_ref()).await?;
1130
1131        let buffer = make_buffer! {
1132            u8: SSH_FXP_CLOSE,
1133            u32: request_id,
1134            one: file.handle,
1135        };
1136
1137        self.channel.write_all(buffer).await?;
1138
1139        self.wait_for_status(request_id, Status::no_eof).await
1140    }
1141
1142    pub async fn read_file_buf(&mut self, file: &mut File, max: u32) -> Result<Vec<Vec<u8>>> {
1143        let base = 255 * 1024;
1144
1145        let mut times = max / base;
1146        if times == 0 {
1147            times = 1;
1148        }
1149
1150        let mut requests = Vec::with_capacity(times as usize);
1151
1152        let mut datas = Vec::with_capacity(times as usize);
1153
1154        let mut all =
1155            Vec::with_capacity(times as usize * (4 + 1 + 4 + 4 + file.handle.len() + 8 + 4));
1156
1157        let mut pos = file.pos;
1158        for _ in 0..times {
1159            let request_id = self.genarate_request_id();
1160
1161            let buffer = make_buffer! {
1162                u8: SSH_FXP_READ,
1163                u32: request_id,
1164                one: &file.handle,
1165                u64: pos,
1166                u32: base
1167            };
1168
1169            all.extend(buffer.into_vec());
1170
1171            requests.push(request_id);
1172
1173            pos += base as u64;
1174        }
1175        // let first = std::time::Instant::now();
1176        self.channel.write_all(all).await?;
1177
1178        // println!("spent1: {}", first.elapsed().as_millis());
1179        // let mut first = std::time::Instant::now();
1180
1181        for i in requests {
1182            let packet = self.wait_for_packet(i).await?;
1183            // println!("spent2: {}", first.elapsed().as_millis());
1184            // first = std::time::Instant::now();
1185
1186            match packet.msg {
1187                Message::Data(data) => {
1188                    file.pos += data.len() as u64;
1189                    datas.push(data);
1190                }
1191                Message::Status {
1192                    status: Status::Eof,
1193                    ..
1194                } => {
1195                    datas.push(vec![]);
1196                }
1197                Message::Status { status, msg, .. } => return status.no_ok(msg),
1198                _ => return builder::Protocol { tip: "Unknown msg" }.fail(), // return Err(Error::ProtocolError("Unknown msg".to_string())),
1199            }
1200        }
1201
1202        Ok(datas)
1203    }
1204
1205    pub async fn read_file(&mut self, file: &mut File, max: u32) -> Result<Vec<u8>> {
1206        let request_id = self.genarate_request_id();
1207
1208        // // cap: 4 + 1 + 4 + (4 + file.handle.len()) + 8 + 4
1209        // let len = 1 + 4 + 4 + file.handle.len() + 8 + 4;
1210        // let mut buffer = Buffer::with_capacity(4 + len);
1211
1212        // buffer.put_u32(len as u32);
1213        // buffer.put_u8(SSH_FXP_READ);
1214
1215        // buffer.put_u32(request_id);
1216
1217        // buffer.put_one(&file.handle);
1218        // buffer.put_u64(file.pos);
1219
1220        // buffer.put_u32(max);
1221
1222        // self.write(buffer).await?;
1223
1224        let buffer = make_buffer! {
1225            u8: SSH_FXP_READ,
1226            u32: request_id,
1227            one: &file.handle,
1228            u64: file.pos,
1229            u32: max
1230        };
1231
1232        self.channel.write_all(buffer).await?;
1233
1234        // self.send(SSH_FXP_READ, buffer.as_ref()).await?;
1235
1236        let packet = self.wait_for_packet(request_id).await?;
1237
1238        match packet.msg {
1239            Message::Data(data) => {
1240                file.pos += data.len() as u64;
1241                Ok(data)
1242            }
1243            Message::Status { status, msg, .. } => status.no_ok(msg),
1244            _ => builder::Protocol { tip: "Unknown msg" }.fail(), //Err(Error::ProtocolError("Unknown msg".to_string())),
1245        }
1246    }
1247
1248    // async fn send(&mut self, code: u8, bytes: &[u8]) -> Result<()> {
1249    //     // let mut packet = Buffer::with_capacity(4 + 1 + bytes.len());
1250
1251    //     // packet.put_u32((bytes.len() + 1) as u32);
1252    //     // packet.put_u8(code);
1253    //     // packet.put_bytes(bytes);
1254
1255    //     let packet = make_buffer! {
1256    //         u32: (bytes.len() + 1) as u32,
1257    //         u8: code,
1258    //         bytes: bytes,
1259    //     };
1260
1261    //     self.write(packet).await?;
1262    //     Ok(())
1263    // }
1264
1265    pub async fn write_file_buf(&mut self, file: &mut File, data: &[u8]) -> Result<()> {
1266        if data.is_empty() {
1267            return Ok(());
1268        }
1269        let max = Self::MAX_SFTP_PACKET;
1270        let mut requests = vec![];
1271        // cap: 4 + 1 + 4 + 4 + file.handle.len() + 8 + 4 + data
1272        let mut buffer =
1273            Buffer::with_capacity(4 + 1 + 4 + 4 + file.handle.len() + 8 + 4 + min(max, data.len()));
1274        for i in (0..data.len()).step_by(max) {
1275            let left = data.len() - i;
1276
1277            let min = min(left, max);
1278
1279            let request_id = self.genarate_request_id();
1280
1281            buffer.put_u32((1 + 4 + 4 + file.handle.len() + 8 + 4 + min) as u32);
1282            buffer.put_u8(SSH_FXP_WRITE);
1283            buffer.put_u32(request_id);
1284            buffer.put_one(&file.handle);
1285            buffer.put_u64(file.pos);
1286            buffer.put_one(&data[i..i + min]);
1287
1288            // self.channel.write_all(&buffer).await?;
1289
1290            self.channel.write(&buffer).await?;
1291
1292            requests.push(request_id);
1293            file.pos += min as u64;
1294            buffer.clear();
1295        }
1296
1297        self.channel.flush().await?;
1298
1299        for id in requests {
1300            self.wait_for_status(id, Status::no_eof).await?;
1301        }
1302
1303        Ok(())
1304    }
1305
1306    pub async fn write_file(&mut self, file: &mut File, data: &[u8]) -> Result<()> {
1307        let max = Self::MAX_SFTP_PACKET;
1308        for i in (0..data.len()).step_by(max) {
1309            let left = data.len() - i;
1310
1311            let min = min(left, max);
1312
1313            self.write_file_unchecked(file, &data[i..i + min]).await?;
1314        }
1315
1316        Ok(())
1317    }
1318
1319    async fn write_file_unchecked(&mut self, file: &mut File, data: &[u8]) -> Result<()> {
1320        // ssh最大数据包检查
1321        // cap: 4 + 1 + 4 + file.handle.len() + 8 + 4 + data.len()
1322
1323        let request_id = self.genarate_request_id();
1324
1325        let buffer = make_buffer! {
1326            u8: SSH_FXP_WRITE,
1327            u32: request_id,
1328            one: &file.handle,
1329            u64: file.pos,
1330            one: data
1331        };
1332
1333        self.channel.write_all(buffer).await?;
1334
1335        let res = self.wait_for_status(request_id, Status::no_eof).await;
1336        if res.is_ok() {
1337            file.pos += data.len() as u64;
1338        }
1339        res
1340    }
1341
1342    async fn wait_for_status<T, B>(&mut self, id: u32, f: T) -> Result<B>
1343    where
1344        T: FnOnce(&Status, String) -> Result<B>,
1345    {
1346        let packet = self.wait_for_packet(id).await?;
1347
1348        match packet.msg {
1349            // Message::Status { status, .. } if status == Status::OK => Ok(()),
1350            Message::Status { status, msg, .. } => f(&status, msg),
1351            _ => builder::Protocol { tip: "Unknown msg" }.fail(), //Err(Error::ProtocolError("Unknown msg".to_string())),
1352                                                                  // _ => Err(Error::ProtocolError("Unknown msg received".to_string())),
1353        }
1354    }
1355
1356    pub async fn mkdir(&mut self, path: &str, permissions: Permissions) -> Result<()> {
1357        let request_id = self.genarate_request_id();
1358        let flags = SSH_FILEXFER_ATTR_PERMISSIONS;
1359        let permissions_bits = permissions.bits();
1360
1361        let buffer = make_buffer! {
1362            u8: SSH_FXP_MKDIR,
1363            u32: request_id,
1364            one: path,
1365            u32: flags,
1366            u32: permissions_bits,
1367        };
1368
1369        self.channel.write_all(buffer).await?;
1370
1371        self.wait_for_status(request_id, Status::no_eof).await
1372    }
1373
1374    pub async fn rmdir(&mut self, path: &str) -> Result<()> {
1375        let request_id = self.genarate_request_id();
1376
1377        let buffer = make_buffer! {
1378            u8: SSH_FXP_RMDIR,
1379            u32: request_id,
1380            one: path,
1381        };
1382
1383        self.channel.write_all(buffer).await?;
1384
1385        self.wait_for_status(request_id, Status::no_ok).await
1386    }
1387
1388    pub async fn open_dir(&mut self, path: &str) -> Result<Dir> {
1389        let request_id = self.genarate_request_id();
1390
1391        let buffer = make_buffer! {
1392            u8: SSH_FXP_OPENDIR,
1393            u32: request_id,
1394            one: path,
1395        };
1396
1397        self.channel.write_all(buffer).await?;
1398
1399        let packet = self.wait_for_packet(request_id).await?;
1400
1401        match packet.msg {
1402            Message::FileHandle(handle) => Ok(Dir::new(handle)),
1403            Message::Status { status, msg, .. } => status.no_ok_and_eof(msg),
1404
1405            _ => builder::Protocol { tip: "Unknown msg" }.fail(), //Err(Error::ProtocolError("Unknown msg".to_string())),
1406        }
1407    }
1408
1409    pub async fn close_dir(&mut self, dir: Dir) -> Result<()> {
1410        let request_id = self.genarate_request_id();
1411
1412        // let mut buffer = Buffer::new();
1413
1414        // buffer.put_u32(request_id);
1415        // buffer.put_one(dir.handle);
1416
1417        // self.send(SSH_FXP_CLOSE, buffer.as_ref()).await?;
1418
1419        let buffer = make_buffer! {
1420            u8: SSH_FXP_CLOSE,
1421            u32: request_id,
1422            one: dir.handle,
1423        };
1424
1425        self.channel.write_all(buffer).await?;
1426
1427        self.wait_for_status(request_id, Status::no_eof).await
1428    }
1429
1430    pub async fn read_dir(&mut self, dir: &Dir) -> Result<Vec<FileInfo>> {
1431        let request_id = self.genarate_request_id();
1432
1433        let buffer = make_buffer! {
1434            u8: SSH_FXP_READDIR,
1435            u32: request_id,
1436            one: &dir.handle,
1437        };
1438
1439        self.channel.write_all(buffer).await?;
1440
1441        let packet = self.wait_for_packet(request_id).await?;
1442
1443        match packet.msg {
1444            Message::Status { status, msg, .. } => status.no_ok(msg),
1445            Message::Name(infos) => Ok(infos),
1446            _ => builder::Protocol { tip: "Unknown msg" }.fail(), //Err(Error::ProtocolError("Unknown msg".to_string())),
1447                                                                  // _ => Err(Error::ProtocolError("Unknown msg".to_string())),
1448        }
1449    }
1450
1451    pub async fn stat(&mut self, path: &str) -> Result<Attributes> {
1452        let request_id = self.genarate_request_id();
1453
1454        let buffer = make_buffer! {
1455            u8: SSH_FXP_STAT,
1456            u32: request_id,
1457            one: path,
1458        };
1459
1460        self.channel.write_all(buffer).await?;
1461
1462        let packet = self.wait_for_packet(request_id).await?;
1463
1464        match packet.msg {
1465            Message::Status { status, msg, .. } => status.no_ok_and_eof(msg),
1466            Message::Attributes(attrs) => Ok(attrs),
1467            _ => builder::Protocol { tip: "Unknown msg" }.fail(), //Err(Error::ProtocolError("Unknown msg".to_string())),
1468                                                                  // _ => Err(Error::ProtocolError("Unknown msg".to_string())),
1469        }
1470    }
1471
1472    pub async fn lstat(&mut self, path: &str) -> Result<Attributes> {
1473        let request_id = self.genarate_request_id();
1474
1475        let buffer = make_buffer! {
1476            u8: SSH_FXP_LSTAT,
1477            u32: request_id,
1478            one: path,
1479        };
1480
1481        self.channel.write_all(buffer).await?;
1482
1483        let packet = self.wait_for_packet(request_id).await?;
1484
1485        match packet.msg {
1486            Message::Status { status, msg, .. } => status.no_ok_and_eof(msg),
1487            Message::Attributes(attrs) => Ok(attrs),
1488            _ => builder::Protocol { tip: "Unknown msg" }.fail(), //Err(Error::ProtocolError("Unknown msg".to_string())),
1489                                                                  // _ => Err(Error::ProtocolError("Unknown msg".to_string())),
1490        }
1491    }
1492
1493    pub async fn fstat(&mut self, file: &File) -> Result<Attributes> {
1494        let request_id = self.genarate_request_id();
1495        let buffer = make_buffer! {
1496            u8: SSH_FXP_FSTAT,
1497            u32: request_id,
1498            one: &file.handle,
1499        };
1500
1501        self.channel.write_all(buffer).await?;
1502
1503        let packet = self.wait_for_packet(request_id).await?;
1504
1505        match packet.msg {
1506            Message::Status { status, msg, .. } => status.no_ok_and_eof(msg),
1507            Message::Attributes(attrs) => Ok(attrs),
1508            _ => builder::Protocol { tip: "Unknown msg" }.fail(), //Err(Error::ProtocolError("Unknown msg".to_string())),
1509                                                                  // _ => Err(Error::ProtocolError("Unknown msg".to_string())),
1510        }
1511    }
1512
1513    pub async fn setstat(&mut self, path: &str, attrs: &Attributes) -> Result<()> {
1514        let request_id = self.genarate_request_id();
1515
1516        let attrs = attrs.to_buffer();
1517
1518        // let mut buffer = Buffer::new();
1519        // buffer.put_u32(request_id);
1520        // buffer.put_one(path);
1521
1522        // attrs.to_bytes(&mut buffer);
1523
1524        let buffer = make_buffer! {
1525            u8: SSH_FXP_SETSTAT,
1526            u32: request_id,
1527            one: path,
1528            bytes: attrs,
1529        };
1530
1531        self.channel.write_all(buffer).await?;
1532
1533        self.wait_for_status(request_id, Status::no_eof).await
1534    }
1535
1536    pub async fn setfstat(&mut self, file: &File, attrs: &Attributes) -> Result<()> {
1537        let request_id = self.genarate_request_id();
1538
1539        let attrs = attrs.to_buffer();
1540        // let mut buffer = Buffer::new();
1541        // buffer.put_u32(request_id);
1542        // buffer.put_one(&file.handle);
1543
1544        // attrs.to_bytes(&mut buffer);
1545
1546        // self.send(SSH_FXP_FSETSTAT, buffer.as_ref()).await?;
1547
1548        let buffer = make_buffer! {
1549            u8: SSH_FXP_FSETSTAT,
1550            u32: request_id,
1551            one: &file.handle,
1552            bytes: attrs,
1553        };
1554
1555        self.channel.write_all(buffer).await?;
1556
1557        self.wait_for_status(request_id, Status::no_eof).await
1558    }
1559
1560    pub async fn readlink(&mut self, path: &str) -> Result<FileInfo> {
1561        let request_id = self.genarate_request_id();
1562
1563        // let mut buffer = Buffer::new();
1564        // buffer.put_u32(request_id);
1565        // buffer.put_one(path);
1566
1567        // self.send(SSH_FXP_READLINK, buffer.as_ref()).await?;
1568
1569        let buffer = make_buffer! {
1570            u8: SSH_FXP_READLINK,
1571            u32: request_id,
1572            one: path,
1573        };
1574
1575        self.channel.write_all(buffer).await?;
1576
1577        let packet = self.wait_for_packet(request_id).await?;
1578
1579        match packet.msg {
1580            Message::Status { status, msg, .. } => status.no_ok_and_eof(msg),
1581            Message::Name(mut infos) if infos.len() == 1 => Ok(infos.remove(0)),
1582            _ => builder::Protocol { tip: "Unknown msg" }.fail(), //Err(Error::ProtocolError("Unknown msg".to_string())),
1583                                                                  // _ => Err(Error::ProtocolError("Unknown msg".to_string())),
1584        }
1585    }
1586
1587    pub async fn symlink(&mut self, linkpath: &str, targetpath: &str) -> Result<()> {
1588        let request_id = self.genarate_request_id();
1589
1590        // let mut buffer = Buffer::new();
1591        // buffer.put_u32(request_id);
1592        // buffer.put_one(linkpath);
1593        // buffer.put_one(targetpath);
1594
1595        // self.send(SSH_FXP_SYMLINK, buffer.as_ref()).await?;
1596
1597        let buffer = make_buffer! {
1598            u8: SSH_FXP_SYMLINK,
1599            u32: request_id,
1600            one: linkpath,
1601            one: targetpath,
1602        };
1603
1604        self.channel.write_all(buffer).await?;
1605
1606        self.wait_for_status(request_id, Status::no_eof).await
1607    }
1608
1609    pub async fn realpath(&mut self, path: &str) -> Result<String> {
1610        let request_id = self.genarate_request_id();
1611
1612        // let mut buffer = Buffer::new();
1613        // buffer.put_u32(request_id);
1614        // buffer.put_one(path);
1615
1616        // self.send(SSH_FXP_REALPATH, buffer.as_ref()).await?;
1617
1618        let buffer = make_buffer! {
1619            u8: SSH_FXP_REALPATH,
1620            u32: request_id,
1621            one: path,
1622        };
1623
1624        self.channel.write_all(buffer).await?;
1625
1626        let packet = self.wait_for_packet(request_id).await?;
1627
1628        match packet.msg {
1629            Message::Status { status, msg, .. } => status.no_ok_and_eof(msg),
1630            Message::Name(infos) if infos.len() == 1 => Ok(infos[0].filename.clone()),
1631            _ => builder::Protocol { tip: "Unknown msg" }.fail(), //Err(Error::ProtocolError("Unknown msg".to_string())),
1632                                                                  // _ => Err(Error::ProtocolError("Unknown msg".to_string())),
1633        }
1634    }
1635
1636    pub async fn rename_file_or_dir(&mut self, old: &str, new: &str) -> Result<()> {
1637        let request_id = self.genarate_request_id();
1638
1639        // let mut buffer = Buffer::new();
1640        // buffer.put_u32(request_id);
1641        // buffer.put_one(old);
1642        // buffer.put_one(new);
1643
1644        // self.send(SSH_FXP_RENAME, buffer.as_ref()).await?;
1645
1646        let buffer = make_buffer! {
1647            u8: SSH_FXP_RENAME,
1648            u32: request_id,
1649            one: old,
1650            one: new,
1651        };
1652
1653        self.channel.write_all(buffer).await?;
1654
1655        self.wait_for_status(request_id, Status::no_eof).await
1656    }
1657
1658    pub async fn remove_file(&mut self, file: &str) -> Result<()> {
1659        let request_id = self.genarate_request_id();
1660
1661        // let mut buffer = Buffer::new();
1662        // buffer.put_u32(request_id);
1663        // buffer.put_one(file);
1664
1665        // self.send(SSH_FXP_REMOVE, buffer.as_ref()).await?;
1666
1667        let buffer = make_buffer! {
1668            u8: SSH_FXP_REMOVE,
1669            u32: request_id,
1670            one: file,
1671        };
1672
1673        self.channel.write_all(buffer).await?;
1674
1675        self.wait_for_status(request_id, Status::no_eof).await
1676    }
1677
1678    pub async fn open_file(
1679        &mut self,
1680        filename: &str,
1681        flags: OpenFlags,
1682        permissions: Option<Permissions>,
1683    ) -> Result<File> {
1684        let request_id = self.genarate_request_id();
1685        let mut flag = 0;
1686
1687        let mut tmp = Buffer::new();
1688        if let Some(permissions) = permissions {
1689            flag |= SSH_FILEXFER_ATTR_PERMISSIONS;
1690            tmp.put_u32(permissions.bits());
1691        }
1692
1693        // let mut buffer = Buffer::new();
1694
1695        // buffer.put_u32(request_id);
1696        // buffer.put_one(filename);
1697        // buffer.put_u32(flags.bits());
1698
1699        // buffer.put_u32(flag);
1700        // buffer.put_bytes(tmp);
1701
1702        // self.send(SSH_FXP_OPEN, buffer.as_ref()).await?;
1703
1704        let openflags = flags.bits();
1705        let buffer = make_buffer! {
1706            u8: SSH_FXP_OPEN,
1707            u32: request_id,
1708            one: filename,
1709            u32: openflags,
1710            u32: flag,
1711            bytes: tmp,
1712        };
1713
1714        self.channel.write_all(buffer).await?;
1715
1716        let packet = self.wait_for_packet(request_id).await?;
1717
1718        match packet.msg {
1719            Message::FileHandle(handle) => Ok(File::new(handle)),
1720            Message::Status { status, msg, .. } => status.no_ok_and_eof(msg),
1721
1722            _ => builder::Protocol { tip: "Unknown msg" }.fail(), //Err(Error::ProtocolError("Unknown msg".to_string())),
1723                                                                  // _ => Err(Error::ProtocolError("Unknown msg".to_string())),
1724        }
1725    }
1726
1727    pub fn file_streamer<'a>(&'a mut self, file: &'a mut File) -> Stream<'a> {
1728        Stream {
1729            sftp: self,
1730            file,
1731            read_future: None,
1732            write_future: None,
1733        }
1734    }
1735
1736    async fn wait_for_packet(&mut self, id: u32) -> Result<Packet> {
1737        let packet = self.packets.remove(&id);
1738        if let Some(packet) = packet {
1739            return Ok(packet);
1740        }
1741        loop {
1742            let packet = self.recv().await?;
1743            if packet.id == id {
1744                return Ok(packet);
1745            }
1746            self.packets.insert(packet.id, packet);
1747        }
1748    }
1749
1750    async fn recv(&mut self) -> Result<Packet> {
1751        let data = self.channel.fill(4).await?;
1752
1753        let len = u32::from_be_bytes(data.try_into().unwrap());
1754
1755        let data = self.channel.fill(4 + len as usize).await?;
1756
1757        let res = Packet::parse(data).context(builder::InvalidArgument {
1758            tip: "Unable to parse sftp packet",
1759        });
1760        self.channel.consume(4 + len as usize);
1761        res
1762    }
1763
1764    fn genarate_request_id(&mut self) -> u32 {
1765        self.request_id = self.request_id.wrapping_add(1);
1766        self.request_id
1767    }
1768
1769    // async fn write(&mut self, data: impl AsRef<[u8]>) -> Result<()> {
1770    //     if !self.channel.write(data.as_ref()).await? {
1771    //         self.channel.flush().await?;
1772    //     }
1773    //     Ok(())
1774    // }
1775}