1use std::cell::Cell;
2use std::cmp::min;
3use std::collections::HashMap;
4use std::fmt::Debug;
5use std::io;
6use std::mem::transmute;
7use std::pin::Pin;
8use std::task::{ready, Context, Poll};
9
10use derive_new::new;
11use snafu::OptionExt;
12use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
13
14use crate::channel::{BufferChannel, Channel};
15use crate::error::{builder, Result};
16use crate::msg::Request;
17use crate::ssh::common::*;
18use crate::BoxFuture;
19use crate::{
20 error::Error,
21 ssh::{buffer::Buffer, common::code::*},
22};
23
24use super::o_channel;
25use bitflags::bitflags;
26
bitflags! {
    /// Bit set describing how a remote file should be opened.
    ///
    /// Every flag takes its numeric value from the `SSH_FXF_*` constant of the
    /// same name (imported from `crate::ssh::common::code`).
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub struct OpenFlags: u32 {
        /// Value of `SSH_FXF_READ`.
        const READ = SSH_FXF_READ;
        /// Value of `SSH_FXF_WRITE`.
        const WRITE = SSH_FXF_WRITE;
        /// Value of `SSH_FXF_APPEND`.
        const APPEND = SSH_FXF_APPEND;
        /// Value of `SSH_FXF_CREAT`.
        const CREAT = SSH_FXF_CREAT;
        /// Value of `SSH_FXF_TRUNC`.
        const TRUNC = SSH_FXF_TRUNC;
        /// Value of `SSH_FXF_EXCL`.
        const EXCL = SSH_FXF_EXCL;
    }
}
51
/// File-system statistics returned by the `statvfs` / `fstatvfs` extension calls.
///
/// The fields are decoded by `Statvfs::parse` as eleven consecutive `u64`
/// values, in declaration order.
// NOTE(review): field names presumably mirror POSIX `struct statvfs`
// (`f_bsize`, `f_frsize`, ...) — confirm against the server documentation.
#[derive(Debug, Clone, Copy)]
pub struct Statvfs {
    pub bsize: u64,
    pub frsize: u64,
    pub blocks: u64,
    pub bfree: u64,
    pub bavail: u64,
    pub files: u64,
    pub ffree: u64,
    pub favail: u64,
    pub fsid: u64,
    pub flag: u64,
    pub namemax: u64,
}
66
67impl Statvfs {
68 pub const FLAG_RDONLY: u64 = 0x1;
69 pub const FLAG_NOSUID: u64 = 0x2;
70 fn parse(data: &[u8]) -> Option<Self> {
71 let buffer = Buffer::from_slice(data);
72
73 Some(Self {
74 bsize: buffer.take_u64()?,
75 frsize: buffer.take_u64()?,
76 blocks: buffer.take_u64()?,
77 bfree: buffer.take_u64()?,
78 bavail: buffer.take_u64()?,
79 files: buffer.take_u64()?,
80 ffree: buffer.take_u64()?,
81 favail: buffer.take_u64()?,
82 fsid: buffer.take_u64()?,
83 flag: buffer.take_u64()?,
84 namemax: buffer.take_u64()?,
85 })
86 }
87}
88
/// Adapter that exposes one remote [`File`] of an [`SFtp`] session through
/// tokio's `AsyncRead` / `AsyncWrite` traits.
pub struct Stream<'a> {
    /// Session used to issue the read/write requests.
    sftp: &'a mut SFtp,
    /// File whose handle and position the requests use.
    file: &'a mut File,
    /// In-flight `read_file` call, kept between polls until it completes.
    read_future: Option<BoxFuture<'a, Result<Vec<u8>>>>,
    /// In-flight `write_file_buf` call, kept between polls until it completes.
    write_future: Option<BoxFuture<'a, Result<()>>>,
}
95
impl<'a> Stream<'a> {
    /// Drives a single `SFtp::read_file` call and copies its result into `buf`.
    ///
    /// A new read future is created only when none is pending; otherwise the
    /// stored one is polled again. Returns `Poll::Pending` (via `ready!`) until
    /// the future resolves. Errors from the SFTP layer are wrapped in an
    /// `io::Error` of kind `Other`.
    fn poll_read_no_pin(
        &'a mut self,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let this = self;
        if this.read_future.is_none() {
            // Request at most `buf.remaining()` bytes, clamped to what fits in a u32.
            let read: BoxFuture<'_, _> = Box::pin(this.sftp.read_file(
                this.file,
                if buf.remaining() > u32::MAX as usize {
                    u32::MAX
                } else {
                    buf.remaining() as u32
                },
            ));
            this.read_future = Some(read);
        }

        // The option was filled just above if it was empty, so `unwrap` cannot fail here.
        let f = this.read_future.as_mut().unwrap().as_mut();

        let res = ready!(f.poll(cx));
        // The future has completed; drop it so the next poll starts a new read.
        this.read_future = None;
        match res {
            Ok(data) => {
                // `buf` may be smaller than when the request was issued.
                if data.len() > buf.remaining() {
                    return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "data too long")));
                }
                buf.put_slice(&data);
                Poll::Ready(Ok(()))
            }
            Err(err) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, Box::new(err)))),
        }
    }

    /// Drives a single `SFtp::write_file_buf` call for `buf`.
    ///
    /// On success the whole of `buf` is reported as written.
    // NOTE(review): the stored future captures the `buf` passed on the poll that
    // created it, while the returned length comes from the `buf` of the completing
    // poll — callers are presumably expected to pass the same buffer until
    // completion; confirm.
    fn poll_write_no_pin(
        &'a mut self,
        cx: &mut Context<'_>,
        buf: &'a [u8],
    ) -> Poll<io::Result<usize>> {
        if self.write_future.is_none() {
            let future: BoxFuture<_> = Box::pin(self.sftp.write_file_buf(self.file, buf));

            self.write_future = Some(future);
        }
        // The option was filled just above if it was empty, so `unwrap` cannot fail here.
        let res = ready!(self.write_future.as_mut().unwrap().as_mut().poll(cx));

        // Completed: clear the slot so the next call starts a new write.
        self.write_future = None;

        match res {
            Ok(_) => Poll::Ready(Ok(buf.len())),
            Err(err) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, Box::new(err)))),
        }
    }
}
151
impl<'a> AsyncWrite for Stream<'a> {
    /// Forwards to `Stream::poll_write_no_pin`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // NOTE(review): both transmutes only change lifetimes — the borrow of `self`
        // and of `buf` are stretched to `'a` so they can be captured by the stored
        // future. Nothing visible here guarantees that `buf` stays alive until that
        // future completes; confirm the soundness argument.
        let this: &mut Stream<'a> = unsafe { transmute(Pin::into_inner(self)) };

        let buf = unsafe { transmute::<&[u8], &[u8]>(buf) };

        this.poll_write_no_pin(cx, buf)
    }

    /// Always reports success immediately; no flushing work is done here.
    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Always reports success immediately; the remote file is not closed here.
    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}
173
impl<'a> AsyncRead for Stream<'a> {
    /// Forwards to `Stream::poll_read_no_pin`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // NOTE(review): the transmute only stretches the borrow of `self` to `'a`
        // so the stored read future may borrow from it; confirm the soundness
        // argument.
        let this: &mut Stream<'a> = unsafe { transmute(Pin::into_inner(self)) };
        this.poll_read_no_pin(cx, buf)
    }
}
213
/// An SFTP session running on top of an SSH channel.
pub struct SFtp {
    /// Buffered wrapper around the underlying channel; all packets go through it.
    channel: BufferChannel,
    /// Request-id counter; starts at 0 in `SFtp::new`.
    request_id: u32,
    /// Protocol version supplied at construction.
    version: u32,
    /// Extension name -> extension data supplied at construction.
    ext: HashMap<String, Vec<u8>>,
    /// Packets keyed by request id.
    // NOTE(review): presumably replies received before their requester asked for
    // them — the code using this map is outside this view; confirm.
    packets: HashMap<u32, Packet>,
}
221
222impl SFtp {
223 pub(crate) fn new(channel: Channel, version: u32, ext: HashMap<String, Vec<u8>>) -> Self {
224 Self {
225 channel: BufferChannel::new(channel),
226 request_id: 0,
227 version,
228 ext,
229 packets: Default::default(),
230 }
231 }
232}
233
/// An open remote file.
pub struct File {
    /// Opaque handle bytes sent back to the server in every request for this file.
    handle: Vec<u8>,
    /// Byte offset used by the next read or write; advanced by the read/write
    /// methods of `SFtp` and set directly by `SFtp::seek_file`.
    pos: u64,
}
238
impl File {
    /// Wraps a server-issued handle; the position starts at offset 0.
    fn new(handle: Vec<u8>) -> Self {
        Self { handle, pos: 0 }
    }
}
244
/// An open remote directory.
pub struct Dir {
    /// Opaque handle bytes sent back to the server by `read_dir` / `close_dir`.
    handle: Vec<u8>,
}
248
impl Dir {
    /// Wraps a server-issued directory handle.
    fn new(handle: Vec<u8>) -> Self {
        Self { handle }
    }
}
254
255bitflags! {
256 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
257 pub struct Permissions: u32 {
258 const OTHER_EXEC = 1 << 0;
259 const OTHER_WRITE = 1 << 1;
260 const OTHER_READ = 1 << 2;
261
262 const GROUP_EXEC = 1 << 0 << 4;
263 const GROUP_WRITE = 1 << 1 << 4;
264 const GROUP_READ = 1 << 2 << 4;
265
266 const OWNER_EXEC = 1 << 0 << 8;
267 const OWNER_WRITE = 1 << 1 << 8;
268 const OWNER_READ = 1 << 2 << 8;
269 }
270}
271
impl Permissions {
    /// Returns the permission set whose raw bits are the octal value `0o755`.
    ///
    /// `from_bits_retain` keeps every bit of the literal, including any that do
    /// not correspond to a named flag.
    pub fn p0755() -> Self {
        Self::from_bits_retain(0o755)
    }
}
277
/// Access and modification time pair carried in [`Attributes`] under the
/// `SSH_FILEXFER_ATTR_ACMODTIME` flag.
// NOTE(review): presumably seconds since the Unix epoch — confirm.
#[derive(new, Debug, Clone, Copy)]
pub struct Timestamp {
    pub atime: u32,
    pub mtime: u32,
}
283
/// Numeric user and group id pair carried in [`Attributes`] under the
/// `SSH_FILEXFER_ATTR_UIDGID` flag.
#[derive(new, Debug, Clone, Copy)]
pub struct User {
    pub uid: u32,
    pub gid: u32,
}
289
/// File attributes as exchanged with the server.
///
/// Each `Option` field is present only when the corresponding
/// `SSH_FILEXFER_ATTR_*` flag is set in the encoded form.
#[derive(new, Debug, Clone)]
pub struct Attributes {
    /// File size (`SSH_FILEXFER_ATTR_SIZE`).
    pub size: Option<u64>,
    /// Owner uid/gid (`SSH_FILEXFER_ATTR_UIDGID`).
    pub user: Option<User>,
    /// Permission bits (`SSH_FILEXFER_ATTR_PERMISSIONS`).
    pub permissions: Option<Permissions>,
    /// Access/modification times (`SSH_FILEXFER_ATTR_ACMODTIME`).
    pub time: Option<Timestamp>,
    /// Extended attribute name -> value pairs.
    pub extend: HashMap<String, Vec<u8>>,
}
300
301impl Attributes {
302 fn to_buffer(&self) -> Buffer<Vec<u8>> {
303 let mut buffer = Buffer::new();
304 self.to_bytes(&mut buffer);
305 buffer
306 }
307
308 fn to_bytes(&self, buffer: &mut Buffer<Vec<u8>>) {
309 let mut flags = 0;
310 let mut tmp = Buffer::new();
311 if let Some(size) = self.size {
312 flags |= SSH_FILEXFER_ATTR_SIZE;
313 tmp.put_u64(size);
314 }
315
316 if let Some(user) = self.user {
317 flags |= SSH_FILEXFER_ATTR_UIDGID;
318 tmp.put_u32(user.uid);
319 tmp.put_u32(user.gid);
320 }
321
322 if let Some(permissions) = self.permissions {
323 flags |= SSH_FILEXFER_ATTR_PERMISSIONS;
324 tmp.put_u32(permissions.bits());
325 }
326
327 if let Some(time) = self.time {
328 flags |= SSH_FILEXFER_ATTR_ACMODTIME;
329 tmp.put_u32(time.atime);
330 tmp.put_u32(time.mtime);
331 }
332
333 let count = self.extend.len() as u32;
334
335 tmp.put_u32(count);
336
337 for (k, v) in &self.extend {
338 tmp.put_one(k);
339 tmp.put_one(v);
340 }
341 buffer.put_u32(flags);
342 buffer.put_bytes(tmp);
343 }
344
345 fn parse(buffer: &mut Buffer<Cell<&[u8]>>) -> Option<Self> {
346 let flags = buffer.take_u32()?;
347
348 let mut size = None;
349 let mut user = None;
350 let mut permissions = None;
351 let mut time = None;
352
353 let mut extend = HashMap::new();
354
355 if flags & SSH_FILEXFER_ATTR_SIZE != 0 {
356 size = Some(buffer.take_u64()?)
357 }
358
359 if flags & SSH_FILEXFER_ATTR_UIDGID != 0 {
360 let uid = buffer.take_u32()?;
361 let gid = buffer.take_u32()?;
362 user = Some(User::new(uid, gid))
363 }
364
365 if flags & SSH_FILEXFER_ATTR_PERMISSIONS != 0 {
366 let per = buffer.take_u32()?;
367 permissions = Some(Permissions::from_bits_retain(per))
368 }
369
370 if flags & SSH_FILEXFER_ATTR_ACMODTIME != 0 {
371 let atime = buffer.take_u32()?;
372 let mtime = buffer.take_u32()?;
373
374 time = Some(Timestamp::new(atime, mtime))
375 }
376
377 if flags & SSH_FILEXFER_ATTR_EXTENDED != 0 {
378 let ecount = buffer.take_u32()?;
379
380 for _ in 0..ecount {
381 let (_, key) = buffer.take_one()?;
382 let (_, value) = buffer.take_one()?;
383
384 extend.insert(std::str::from_utf8(key).ok()?.to_string(), value.to_vec());
385 }
386 }
387
388 Some(Self::new(size, user, permissions, time, extend))
389 }
390}
391
/// One entry of an `SSH_FXP_NAME` reply.
#[derive(new, Debug, Clone)]
pub struct FileInfo {
    /// First string of the entry.
    pub filename: String,
    /// Second string of the entry.
    // NOTE(review): presumably an `ls -l`-style listing line — confirm.
    pub longname: String,
    /// Attributes decoded after the two strings.
    pub attrs: Attributes,
}
398
/// Server limits returned by the `limits` extension call.
///
/// Decoded by `Limits::parse` as four consecutive `u64` values, in declaration
/// order.
#[derive(Debug, Clone, Copy)]
pub struct Limits {
    pub max_packet_len: u64,
    pub max_read_len: u64,
    pub max_write_len: u64,
    pub max_open_handles: u64,
}
406
407impl Limits {
408 fn parse(data: &[u8]) -> Option<Self> {
409 let buffer = Buffer::from_slice(data);
410
411 Some(Self {
412 max_packet_len: buffer.take_u64()?,
413 max_read_len: buffer.take_u64()?,
414 max_write_len: buffer.take_u64()?,
415 max_open_handles: buffer.take_u64()?,
416 })
417 }
418}
419
/// A decoded server reply: the request id it answers and its payload.
#[derive(custom_debug_derive::Debug)]
struct Packet {
    /// Request id read from the packet header.
    id: u32,
    /// Decoded message body.
    msg: Message,
}
425
426impl Packet {
427 fn parse(data: &[u8]) -> Option<Packet> {
428 let data = Buffer::from_slice(data);
429 let (_, data) = data.take_one()?;
430
431 let mut data = Buffer::from_slice(data);
432
433 let code = data.take_u8()?;
434 let id = data.take_u32()?;
435
436 let msg = match code {
437 SSH_FXP_HANDLE => {
438 let (_, handle) = data.take_one()?;
439
440 Message::FileHandle(handle.to_vec())
441 }
442 SSH_FXP_STATUS => {
443 let status = data.take_u32()?;
444
445 let (_, msg) = data.take_one()?;
446
447 let (_, tag) = data.take_one()?;
448
449 let msg = std::str::from_utf8(msg).ok()?.to_string();
450
451 let _tag = std::str::from_utf8(tag).ok()?.to_string();
452
453 let status = Status::from_status(status).ok()?;
454 Message::Status { status, msg, _tag }
455 }
456 SSH_FXP_DATA => Message::Data(data.take_one()?.1.to_vec()),
457 SSH_FXP_NAME => {
458 let count = data.take_u32()?;
459 let mut res = Vec::with_capacity(count as usize);
460
461 for _ in 0..count {
462 let (_, filename) = data.take_one()?;
463 let (_, longname) = data.take_one()?;
464
465 let filename = std::str::from_utf8(filename).ok()?.to_string();
466
467 let longname = std::str::from_utf8(longname).ok()?.to_string();
468
469 res.push(FileInfo::new(
470 filename,
471 longname,
472 Attributes::parse(&mut data)?,
473 ));
474 }
475 Message::Name(res)
476 }
477 SSH_FXP_ATTRS => Message::Attributes(Attributes::parse(&mut data)?),
478 SSH_FXP_EXTENDED_REPLY => Message::ExtendReply(data.to_vec()),
479 _ => return None,
480 };
481
482 Some(Packet { id, msg })
483 }
484}
485
/// Status codes carried by an `SSH_FXP_STATUS` reply.
///
/// Each variant's discriminant is the `SSH_FX_*` constant of the matching name.
#[derive(Debug, PartialEq)]
#[repr(u32)]
enum Status {
    OK = SSH_FX_OK,
    Eof = SSH_FX_EOF,
    NoSuchFile = SSH_FX_NO_SUCH_FILE,
    PermissionDenied = SSH_FX_PERMISSION_DENIED,
    Failure = SSH_FX_FAILURE,
    BadMessage = SSH_FX_BAD_MESSAGE,
    NoConnection = SSH_FX_NO_CONNECTION,
    ConnectionLost = SSH_FX_CONNECTION_LOST,
    OpUnsupported = SSH_FX_OP_UNSUPPORTED,
}
499
500impl Status {
501 fn from_status(code: u32) -> Result<Self> {
502 Ok(match code {
503 SSH_FX_OK => Self::OK,
504 SSH_FX_EOF => Self::Eof,
505 SSH_FX_NO_SUCH_FILE => Self::NoSuchFile,
506 SSH_FX_PERMISSION_DENIED => Self::PermissionDenied,
507 SSH_FX_FAILURE => Self::Failure,
508 SSH_FX_BAD_MESSAGE => Self::BadMessage,
509 SSH_FX_NO_CONNECTION => Self::NoConnection,
510 SSH_FX_CONNECTION_LOST => Self::ConnectionLost,
511 SSH_FX_OP_UNSUPPORTED => Self::OpUnsupported,
512 _ => return Err(Error::invalid_format("Invalid Sftp status code")),
513 })
514 }
515
516 fn to_result<T: Default>(&self, msg: String) -> Result<T> {
517 match self {
518 Status::OK => Ok(Default::default()),
519 Status::Eof => Ok(Default::default()),
520 Status::NoSuchFile => builder::NoSuchFile { tip: msg }.fail(), Status::PermissionDenied => builder::PermissionDenied { tip: msg }.fail(), Status::Failure => builder::SFtpFailure { tip: msg }.fail(), Status::BadMessage => builder::BadMessage { tip: msg }.fail(), Status::NoConnection => builder::NoConnection { tip: msg }.fail(), Status::ConnectionLost => builder::NoConnection { tip: msg }.fail(), Status::OpUnsupported => builder::OpUnsupported { tip: msg }.fail(), }
528 }
529
530 fn no_ok_and_eof<T>(&self, msg: String) -> Result<T> {
531 match self {
532 Status::OK =>
533 {
537 builder::Protocol {
538 tip: "Unexpected Ok status received",
539 }
540 .fail()
541 }
542 Status::Eof =>
543 {
547 builder::Protocol {
548 tip: "Unexpected EOF status received",
549 }
550 .fail()
551 }
552 Status::NoSuchFile => builder::NoSuchFile { tip: msg }.fail(), Status::PermissionDenied => builder::PermissionDenied { tip: msg }.fail(), Status::Failure => builder::SFtpFailure { tip: msg }.fail(), Status::BadMessage => builder::BadMessage { tip: msg }.fail(), Status::NoConnection => builder::NoConnection { tip: msg }.fail(), Status::ConnectionLost => builder::NoConnection { tip: msg }.fail(), Status::OpUnsupported => builder::OpUnsupported { tip: msg }.fail(), }
560 }
561
562 fn no_eof<T: Default>(&self, msg: String) -> Result<T> {
563 match self {
564 Status::OK => Ok(Default::default()),
565 Status::Eof =>
566 {
570 builder::Protocol {
571 tip: "Unexpected EOF status received",
572 }
573 .fail()
574 }
575 Status::NoSuchFile => builder::NoSuchFile { tip: msg }.fail(), Status::PermissionDenied => builder::PermissionDenied { tip: msg }.fail(), Status::Failure => builder::SFtpFailure { tip: msg }.fail(), Status::BadMessage => builder::BadMessage { tip: msg }.fail(), Status::NoConnection => builder::NoConnection { tip: msg }.fail(), Status::ConnectionLost => builder::NoConnection { tip: msg }.fail(), Status::OpUnsupported => builder::OpUnsupported { tip: msg }.fail(), }
583 }
584
585 fn no_ok<T: Default>(&self, msg: String) -> Result<T> {
586 match self {
587 Status::OK =>
588 {
592 builder::Protocol {
593 tip: "Unexpected Ok status received",
594 }
595 .fail()
596 }
597 Status::Eof => Ok(Default::default()),
598 Status::NoSuchFile => builder::NoSuchFile { tip: msg }.fail(), Status::PermissionDenied => builder::PermissionDenied { tip: msg }.fail(), Status::Failure => builder::SFtpFailure { tip: msg }.fail(), Status::BadMessage => builder::BadMessage { tip: msg }.fail(), Status::NoConnection => builder::NoConnection { tip: msg }.fail(), Status::ConnectionLost => builder::NoConnection { tip: msg }.fail(), Status::OpUnsupported => builder::OpUnsupported { tip: msg }.fail(), }
606 }
607}
608
/// Decoded body of a server reply (see `Packet::parse`).
#[derive(custom_debug_derive::Debug)]
enum Message {
    /// `SSH_FXP_HANDLE`: opaque file or directory handle.
    FileHandle(Vec<u8>),
    /// `SSH_FXP_STATUS`: status code, message text and a second string (`_tag`).
    Status {
        status: Status,
        msg: String,
        _tag: String,
    },
    /// `SSH_FXP_DATA`: raw bytes (omitted from `Debug` output).
    Data(#[debug(skip)] Vec<u8>),
    /// `SSH_FXP_NAME`: list of file entries.
    Name(Vec<FileInfo>),
    /// `SSH_FXP_ATTRS`: attributes of a single file.
    Attributes(Attributes),
    /// `SSH_FXP_EXTENDED_REPLY`: undecoded payload (omitted from `Debug` output).
    ExtendReply(#[debug(skip)] Vec<u8>),
}
622
623impl SFtp {
    /// Largest number of data bytes that `write_file` / `write_file_buf` place
    /// in a single `SSH_FXP_WRITE` request.
    const MAX_SFTP_PACKET: usize = 32000;
629
630 pub fn extend(&self, key: &str) -> Option<&[u8]> {
631 self.ext.get(key).map(|v| v.as_ref())
632 }
633
    /// Returns the protocol version this session was constructed with.
    pub fn version(&self) -> u32 {
        self.version
    }
637
638 pub async fn from_channel(channel: Channel) -> Result<Self> {
639 let (sender, recver) = o_channel();
640
641 let session = channel.session();
642 let request = Request::SFtpFromChannel { channel, sender };
643
644 session
645 .send(request)
646 .map_err(|_| builder::Disconnected.build())?;
647
648 recver.await?
649 }
650
651 pub async fn close(self) -> Result<()> {
652 self.channel.into_inner().close().await
653 }
654
    /// Returns whether `support` finds the `OPENSSH_SFTP_EXT_POSIX_RENAME`
    /// extension (name and expected data) in the session's extension map.
    pub fn support_posix_rename(&self) -> bool {
        self.support(OPENSSH_SFTP_EXT_POSIX_RENAME)
    }
667
668 pub async fn posix_rename(&mut self, oldpath: &str, newpath: &str) -> Result<()> {
669 debug_assert!(
670 self.support_posix_rename(),
671 "Server doesn't support posix rename"
672 );
673 let request_id = self.genarate_request_id();
676
677 let buffer = make_buffer! {
685 u8: SSH_FXP_EXTENDED,
686 u32: request_id,
687 one: OPENSSH_SFTP_EXT_POSIX_RENAME.0,
688 one: oldpath,
689 one: newpath,
690 };
691
692 self.channel.write_all(buffer).await?;
693
694 self.wait_for_status(request_id, Status::no_eof).await
695 }
696
    /// Returns whether `support` finds the `OPENSSH_SFTP_EXT_FSTATVFS`
    /// extension (name and expected data) in the session's extension map.
    pub fn support_fstatvfs(&self) -> bool {
        self.support(OPENSSH_SFTP_EXT_FSTATVFS)
    }
700
701 pub async fn fstatvfs(&mut self, file: &File) -> Result<Statvfs> {
702 debug_assert!(self.support_fstatvfs(), "Server doesn't support fstatvfs");
703
704 let request_id = self.genarate_request_id();
705 let buffer = make_buffer! {
713 u8: SSH_FXP_EXTENDED,
714 u32: request_id,
715 one: OPENSSH_SFTP_EXT_FSTATVFS.0,
716 one: &file.handle,
717 };
718
719 self.channel.write_all(buffer).await?;
720
721 let packet = self.wait_for_packet(request_id).await?;
722
723 match packet.msg {
724 Message::Status { status, msg, _tag } => status.no_ok_and_eof(msg),
725 Message::ExtendReply(data) => Statvfs::parse(&data).context(builder::Protocol {
726 tip: "Invalid Statvfs Message",
727 }),
728 _ => builder::Protocol {
729 tip: "Unexpected SFtp Message",
730 }
731 .fail(), }
733 }
734
    /// Returns whether `support` finds the `OPENSSH_SFTP_EXT_STATVFS`
    /// extension (name and expected data) in the session's extension map.
    pub fn support_statvfs(&self) -> bool {
        self.support(OPENSSH_SFTP_EXT_STATVFS)
    }
738
739 pub async fn statvfs(&mut self, path: &str) -> Result<Statvfs> {
740 debug_assert!(self.support_fstatvfs(), "Server doesn't support statvfs");
741
742 let request_id = self.genarate_request_id();
743 let buffer = make_buffer! {
751 u8: SSH_FXP_EXTENDED,
752 u32: request_id,
753 one: OPENSSH_SFTP_EXT_STATVFS.0,
754 one: path,
755 };
756
757 self.channel.write_all(buffer).await?;
758
759 let packet = self.wait_for_packet(request_id).await?;
760
761 match packet.msg {
762 Message::Status { status, msg, _tag } => status.no_ok_and_eof(msg),
763 Message::ExtendReply(data) => Statvfs::parse(&data).context(builder::Protocol {
764 tip: "Invalid Statvfs Message",
765 }),
766 _ => builder::Protocol {
767 tip: "Unexpected SFtp Message",
768 }
769 .fail(), }
771 }
772
    /// Returns whether `support` finds the `OPENSSH_SFTP_EXT_HARDLINK`
    /// extension (name and expected data) in the session's extension map.
    pub fn support_hardlink(&self) -> bool {
        self.support(OPENSSH_SFTP_EXT_HARDLINK)
    }
776
777 pub async fn hardlink(&mut self, oldpath: &str, newpath: &str) -> Result<()> {
778 debug_assert!(self.support_hardlink(), "Server doesn't support hardlink");
779
780 let request_id = self.genarate_request_id();
781 let buffer = make_buffer! {
790 u8: SSH_FXP_EXTENDED,
791 u32: request_id,
792 one: OPENSSH_SFTP_EXT_HARDLINK.0,
793 one: oldpath,
794 one: newpath,
795 };
796
797 self.channel.write_all(buffer).await?;
798
799 self.wait_for_status(request_id, Status::no_eof).await
800 }
801
    /// Returns whether `support` finds the `OPENSSH_SFTP_EXT_FSYNC`
    /// extension (name and expected data) in the session's extension map.
    pub fn support_fsync(&self) -> bool {
        self.support(OPENSSH_SFTP_EXT_FSYNC)
    }
805
806 pub async fn fsync(&mut self, file: &File) -> Result<()> {
807 debug_assert!(self.support_fsync(), "Server doesn't support fsync");
808
809 let request_id = self.genarate_request_id();
810 let buffer = make_buffer! {
818 u8: SSH_FXP_EXTENDED,
819 u32: request_id,
820 one: OPENSSH_SFTP_EXT_FSYNC.0,
821 one: &file.handle,
822 };
823
824 self.channel.write_all(buffer).await?;
825
826 self.wait_for_status(request_id, Status::no_eof).await
827 }
828
    /// Returns whether `support` finds the `OPENSSH_SFTP_EXT_LSETSTAT`
    /// extension (name and expected data) in the session's extension map.
    pub fn support_lsetstat(&self) -> bool {
        self.support(OPENSSH_SFTP_EXT_LSETSTAT)
    }
832
833 pub async fn lsetstat(&mut self, path: &str, attrs: &Attributes) -> Result<()> {
834 debug_assert!(self.support_lsetstat(), "Server doesn't lsetstat");
835
836 let request_id = self.genarate_request_id();
837 let attrs = attrs.to_buffer();
846
847 let buffer = make_buffer! {
848 u8: SSH_FXP_EXTENDED,
849 u32: request_id,
850 one: OPENSSH_SFTP_EXT_LSETSTAT.0,
851 one: path,
852 bytes: attrs,
853 };
854
855 self.channel.write_all(buffer).await?;
856
857 self.wait_for_status(request_id, Status::no_eof).await
858 }
859
    /// Returns whether `support` finds the `OPENSSH_SFTP_EXT_LIMITS`
    /// extension (name and expected data) in the session's extension map.
    pub fn support_limits(&self) -> bool {
        self.support(OPENSSH_SFTP_EXT_LIMITS)
    }
863
864 pub async fn limits(&mut self) -> Result<Limits> {
865 debug_assert!(self.support_limits(), "Server doesn't support limits");
866 let request_id = self.genarate_request_id();
867 let buffer = make_buffer! {
873 u8: SSH_FXP_EXTENDED,
874 u32: request_id,
875 one: OPENSSH_SFTP_EXT_LIMITS.0
876 };
877
878 self.channel.write_all(buffer).await?;
879
880 let packet = self.wait_for_packet(request_id).await?;
881
882 match packet.msg {
883 Message::ExtendReply(data) => Limits::parse(&data).context(builder::Protocol {
884 tip: "Invalid packet format",
885 }),
886 _ => builder::Protocol {
887 tip: "Unexpected SFtp Message",
888 }
889 .fail(),
890 }
891 }
892
    /// Returns whether `support` finds the `OPENSSH_SFTP_EXT_EXPAND_PATH`
    /// extension (name and expected data) in the session's extension map.
    pub fn support_expand_path(&self) -> bool {
        self.support(OPENSSH_SFTP_EXT_EXPAND_PATH)
    }
896
897 pub async fn expand_path(&mut self, path: &str) -> Result<String> {
898 debug_assert!(
899 self.support_expand_path(),
900 "Server doesn't support expand path"
901 );
902
903 let request_id = self.genarate_request_id();
904 let buffer = make_buffer! {
912 u8: SSH_FXP_EXTENDED,
913 u32: request_id,
914 one: OPENSSH_SFTP_EXT_EXPAND_PATH.0,
915 one: path
916 };
917
918 self.channel.write_all(buffer).await?;
919
920 let packet = self.wait_for_packet(request_id).await?;
921
922 match packet.msg {
923 Message::Status { status, msg, .. } => status.no_ok_and_eof(msg),
924 Message::Name(infos) if infos.len() == 1 => Ok(infos[0].filename.clone()),
925 _ => builder::Protocol { tip: "Unknown msg" }.fail(),
927 }
928 }
929
    /// Returns whether `support` finds the `OPENSSH_SFTP_EXT_COPY_DATA`
    /// extension (name and expected data) in the session's extension map.
    pub fn support_copy_data(&self) -> bool {
        self.support(OPENSSH_SFTP_EXT_COPY_DATA)
    }
933
934 pub async fn copy_data(&mut self, read: &mut File, len: u64, write: &mut File) -> Result<()> {
935 debug_assert!(self.support_copy_data(), "Server doesn't support copy data");
936
937 let request_id = self.genarate_request_id();
938 let buffer = make_buffer! {
950 u8: SSH_FXP_EXTENDED,
951 u32: request_id,
952 one: OPENSSH_SFTP_EXT_COPY_DATA.0,
953 one: &read.handle,
954 u64: read.pos,
955 u64: len,
956 one: &write.handle,
957 u64: write.pos,
958 };
959
960 self.channel.write_all(buffer).await?;
961
962 let status = self.wait_for_status(request_id, Status::to_result).await;
963
964 if status.is_ok() {
965 read.pos += len;
966 write.pos += len;
967 }
968
969 status
970 }
971
    /// Returns whether `support` finds the `OPENSSH_SFTP_EXT_HOME_DIRECTORY`
    /// extension (name and expected data) in the session's extension map.
    pub fn support_home_directory(&self) -> bool {
        self.support(OPENSSH_SFTP_EXT_HOME_DIRECTORY)
    }
975
976 pub async fn home_directory(&mut self, username: &str) -> Result<String> {
977 debug_assert!(
978 self.support_home_directory(),
979 "Server doesn't support home directory"
980 );
981
982 let request_id = self.genarate_request_id();
983 let buffer = make_buffer! {
998 u8: SSH_FXP_EXTENDED,
999 u32: request_id,
1000 one: OPENSSH_SFTP_EXT_HOME_DIRECTORY.0,
1001 one: username
1002 };
1003
1004 self.channel.write_all(buffer).await?;
1005
1006 let packet = self.wait_for_packet(request_id).await?;
1007
1008 match packet.msg {
1009 Message::Status { status, msg, _tag } => status.no_ok_and_eof(msg),
1010 Message::Name(infos) if infos.len() == 1 => Ok(infos[0].filename.clone()),
1011 _ => builder::Protocol {
1013 tip: "Unexpected message",
1014 }
1015 .fail(),
1016 }
1017 }
1018
    /// Returns whether `support` finds the `OPENSSH_SFTP_EXT_USERS_GROUPS_BY_ID`
    /// extension (name and expected data) in the session's extension map.
    pub fn support_users_groups_by_id(&self) -> bool {
        self.support(OPENSSH_SFTP_EXT_USERS_GROUPS_BY_ID)
    }
1022
1023 pub async fn users_groups_by_id(
1024 &mut self,
1025 users: &[u32],
1026 groups: &[u32],
1027 ) -> Result<(Vec<String>, Vec<String>)> {
1028 let request_id = self.genarate_request_id();
1029 let cap = 4
1030 + 1
1031 + 4
1032 + OPENSSH_SFTP_EXT_USERS_GROUPS_BY_ID.0.len()
1033 + 4
1034 + users.len() * 4
1035 + 4
1036 + groups.len() * 4;
1037 let mut buffer = Buffer::with_capacity(cap);
1038 buffer.put_u32((cap - 4) as u32);
1039 buffer.put_u8(SSH_FXP_EXTENDED);
1040 buffer.put_u32(request_id);
1041 buffer.put_one(OPENSSH_SFTP_EXT_USERS_GROUPS_BY_ID.0);
1042
1043 buffer.put_u32((users.len() * 4) as u32);
1044
1045 users.iter().for_each(|v| {
1046 buffer.put_u32(*v);
1047 });
1048
1049 buffer.put_u32((groups.len() * 4) as u32);
1050
1051 groups.iter().for_each(|v| {
1052 buffer.put_u32(*v);
1053 });
1054
1055 self.channel.write_all(buffer).await?;
1056
1057 let packet = self.wait_for_packet(request_id).await?;
1058
1059 match packet.msg {
1060 Message::ExtendReply(data) => {
1061 let buffer = Buffer::from_slice(&data);
1062 let usernames = buffer
1063 .take_one()
1064 .context(builder::Protocol {
1065 tip: "Invalid sftp packet format",
1066 })?
1067 .1;
1068 let usernames = Buffer::from_slice(usernames);
1069
1070 let groupnames = buffer
1071 .take_one()
1072 .context(builder::Protocol {
1073 tip: "Invalid sftp packet format",
1074 })?
1075 .1;
1076 let groupnames = Buffer::from_slice(groupnames);
1077
1078 let mut unames = vec![];
1079 while let Some(user) = usernames.take_one() {
1080 unames.push(std::str::from_utf8(user.1)?.to_string())
1082 }
1083
1084 let mut gnames = vec![];
1085
1086 while let Some(group) = groupnames.take_one() {
1087 gnames.push(std::str::from_utf8(group.1)?.to_string());
1089 }
1090
1091 Ok((unames, gnames))
1092 }
1093 _ => builder::Protocol {
1094 tip: "Unexpected message",
1095 }
1096 .fail(), }
1098 }
1099
1100 fn support(&self, (e, v): (&str, &[u8])) -> bool {
1101 self.ext.get(e).map(|v| v.as_ref()) == Some(v)
1102 }
1103
    /// Sets the offset that the next read or write on `file` will use.
    ///
    /// This only updates the locally tracked position; no request is sent.
    pub fn seek_file(&self, file: &mut File, pos: u64) {
        file.pos = pos;
    }
1120
1121 pub async fn close_file(&mut self, file: File) -> Result<()> {
1122 let request_id = self.genarate_request_id();
1123
1124 let buffer = make_buffer! {
1132 u8: SSH_FXP_CLOSE,
1133 u32: request_id,
1134 one: file.handle,
1135 };
1136
1137 self.channel.write_all(buffer).await?;
1138
1139 self.wait_for_status(request_id, Status::no_eof).await
1140 }
1141
    /// Reads from `file` by issuing several `SSH_FXP_READ` requests at once
    /// and then collecting their replies in request order.
    ///
    /// Returns one `Vec<u8>` per request; a request answered with an `Eof`
    /// status contributes an empty vector. `file.pos` is advanced by the
    /// number of bytes actually received.
    // NOTE(review): `max` is rounded down to a whole number of 255 KiB requests
    // (minimum one), and every request asks for the full `base` bytes, so more
    // than `max` bytes can be returned when `max < base` — confirm intended.
    // NOTE(review): request offsets are fixed up front assuming each request
    // returns `base` bytes; a short (non-final) reply would leave a gap between
    // the returned chunks — confirm servers never do this.
    pub async fn read_file_buf(&mut self, file: &mut File, max: u32) -> Result<Vec<Vec<u8>>> {
        // Bytes requested per SSH_FXP_READ.
        let base = 255 * 1024;

        let mut times = max / base;
        if times == 0 {
            times = 1;
        }

        let mut requests = Vec::with_capacity(times as usize);

        let mut datas = Vec::with_capacity(times as usize);

        // All requests are serialized into one byte vector and written together.
        let mut all =
            Vec::with_capacity(times as usize * (4 + 1 + 4 + 4 + file.handle.len() + 8 + 4));

        let mut pos = file.pos;
        for _ in 0..times {
            let request_id = self.genarate_request_id();

            let buffer = make_buffer! {
                u8: SSH_FXP_READ,
                u32: request_id,
                one: &file.handle,
                u64: pos,
                u32: base
            };

            all.extend(buffer.into_vec());

            requests.push(request_id);

            pos += base as u64;
        }
        self.channel.write_all(all).await?;

        for i in requests {
            let packet = self.wait_for_packet(i).await?;
            match packet.msg {
                Message::Data(data) => {
                    file.pos += data.len() as u64;
                    datas.push(data);
                }
                // This arm must stay ahead of the general status arm below.
                Message::Status {
                    status: Status::Eof,
                    ..
                } => {
                    datas.push(vec![]);
                }
                // Any other status ends the call; replies to the remaining
                // requests are not awaited here.
                Message::Status { status, msg, .. } => return status.no_ok(msg),
                _ => return builder::Protocol { tip: "Unknown msg" }.fail(),
            }
        }

        Ok(datas)
    }
1204
1205 pub async fn read_file(&mut self, file: &mut File, max: u32) -> Result<Vec<u8>> {
1206 let request_id = self.genarate_request_id();
1207
1208 let buffer = make_buffer! {
1225 u8: SSH_FXP_READ,
1226 u32: request_id,
1227 one: &file.handle,
1228 u64: file.pos,
1229 u32: max
1230 };
1231
1232 self.channel.write_all(buffer).await?;
1233
1234 let packet = self.wait_for_packet(request_id).await?;
1237
1238 match packet.msg {
1239 Message::Data(data) => {
1240 file.pos += data.len() as u64;
1241 Ok(data)
1242 }
1243 Message::Status { status, msg, .. } => status.no_ok(msg),
1244 _ => builder::Protocol { tip: "Unknown msg" }.fail(), }
1246 }
1247
1248 pub async fn write_file_buf(&mut self, file: &mut File, data: &[u8]) -> Result<()> {
1266 if data.is_empty() {
1267 return Ok(());
1268 }
1269 let max = Self::MAX_SFTP_PACKET;
1270 let mut requests = vec![];
1271 let mut buffer =
1273 Buffer::with_capacity(4 + 1 + 4 + 4 + file.handle.len() + 8 + 4 + min(max, data.len()));
1274 for i in (0..data.len()).step_by(max) {
1275 let left = data.len() - i;
1276
1277 let min = min(left, max);
1278
1279 let request_id = self.genarate_request_id();
1280
1281 buffer.put_u32((1 + 4 + 4 + file.handle.len() + 8 + 4 + min) as u32);
1282 buffer.put_u8(SSH_FXP_WRITE);
1283 buffer.put_u32(request_id);
1284 buffer.put_one(&file.handle);
1285 buffer.put_u64(file.pos);
1286 buffer.put_one(&data[i..i + min]);
1287
1288 self.channel.write(&buffer).await?;
1291
1292 requests.push(request_id);
1293 file.pos += min as u64;
1294 buffer.clear();
1295 }
1296
1297 self.channel.flush().await?;
1298
1299 for id in requests {
1300 self.wait_for_status(id, Status::no_eof).await?;
1301 }
1302
1303 Ok(())
1304 }
1305
1306 pub async fn write_file(&mut self, file: &mut File, data: &[u8]) -> Result<()> {
1307 let max = Self::MAX_SFTP_PACKET;
1308 for i in (0..data.len()).step_by(max) {
1309 let left = data.len() - i;
1310
1311 let min = min(left, max);
1312
1313 self.write_file_unchecked(file, &data[i..i + min]).await?;
1314 }
1315
1316 Ok(())
1317 }
1318
1319 async fn write_file_unchecked(&mut self, file: &mut File, data: &[u8]) -> Result<()> {
1320 let request_id = self.genarate_request_id();
1324
1325 let buffer = make_buffer! {
1326 u8: SSH_FXP_WRITE,
1327 u32: request_id,
1328 one: &file.handle,
1329 u64: file.pos,
1330 one: data
1331 };
1332
1333 self.channel.write_all(buffer).await?;
1334
1335 let res = self.wait_for_status(request_id, Status::no_eof).await;
1336 if res.is_ok() {
1337 file.pos += data.len() as u64;
1338 }
1339 res
1340 }
1341
1342 async fn wait_for_status<T, B>(&mut self, id: u32, f: T) -> Result<B>
1343 where
1344 T: FnOnce(&Status, String) -> Result<B>,
1345 {
1346 let packet = self.wait_for_packet(id).await?;
1347
1348 match packet.msg {
1349 Message::Status { status, msg, .. } => f(&status, msg),
1351 _ => builder::Protocol { tip: "Unknown msg" }.fail(), }
1354 }
1355
1356 pub async fn mkdir(&mut self, path: &str, permissions: Permissions) -> Result<()> {
1357 let request_id = self.genarate_request_id();
1358 let flags = SSH_FILEXFER_ATTR_PERMISSIONS;
1359 let permissions_bits = permissions.bits();
1360
1361 let buffer = make_buffer! {
1362 u8: SSH_FXP_MKDIR,
1363 u32: request_id,
1364 one: path,
1365 u32: flags,
1366 u32: permissions_bits,
1367 };
1368
1369 self.channel.write_all(buffer).await?;
1370
1371 self.wait_for_status(request_id, Status::no_eof).await
1372 }
1373
1374 pub async fn rmdir(&mut self, path: &str) -> Result<()> {
1375 let request_id = self.genarate_request_id();
1376
1377 let buffer = make_buffer! {
1378 u8: SSH_FXP_RMDIR,
1379 u32: request_id,
1380 one: path,
1381 };
1382
1383 self.channel.write_all(buffer).await?;
1384
1385 self.wait_for_status(request_id, Status::no_ok).await
1386 }
1387
1388 pub async fn open_dir(&mut self, path: &str) -> Result<Dir> {
1389 let request_id = self.genarate_request_id();
1390
1391 let buffer = make_buffer! {
1392 u8: SSH_FXP_OPENDIR,
1393 u32: request_id,
1394 one: path,
1395 };
1396
1397 self.channel.write_all(buffer).await?;
1398
1399 let packet = self.wait_for_packet(request_id).await?;
1400
1401 match packet.msg {
1402 Message::FileHandle(handle) => Ok(Dir::new(handle)),
1403 Message::Status { status, msg, .. } => status.no_ok_and_eof(msg),
1404
1405 _ => builder::Protocol { tip: "Unknown msg" }.fail(), }
1407 }
1408
1409 pub async fn close_dir(&mut self, dir: Dir) -> Result<()> {
1410 let request_id = self.genarate_request_id();
1411
1412 let buffer = make_buffer! {
1420 u8: SSH_FXP_CLOSE,
1421 u32: request_id,
1422 one: dir.handle,
1423 };
1424
1425 self.channel.write_all(buffer).await?;
1426
1427 self.wait_for_status(request_id, Status::no_eof).await
1428 }
1429
1430 pub async fn read_dir(&mut self, dir: &Dir) -> Result<Vec<FileInfo>> {
1431 let request_id = self.genarate_request_id();
1432
1433 let buffer = make_buffer! {
1434 u8: SSH_FXP_READDIR,
1435 u32: request_id,
1436 one: &dir.handle,
1437 };
1438
1439 self.channel.write_all(buffer).await?;
1440
1441 let packet = self.wait_for_packet(request_id).await?;
1442
1443 match packet.msg {
1444 Message::Status { status, msg, .. } => status.no_ok(msg),
1445 Message::Name(infos) => Ok(infos),
1446 _ => builder::Protocol { tip: "Unknown msg" }.fail(), }
1449 }
1450
1451 pub async fn stat(&mut self, path: &str) -> Result<Attributes> {
1452 let request_id = self.genarate_request_id();
1453
1454 let buffer = make_buffer! {
1455 u8: SSH_FXP_STAT,
1456 u32: request_id,
1457 one: path,
1458 };
1459
1460 self.channel.write_all(buffer).await?;
1461
1462 let packet = self.wait_for_packet(request_id).await?;
1463
1464 match packet.msg {
1465 Message::Status { status, msg, .. } => status.no_ok_and_eof(msg),
1466 Message::Attributes(attrs) => Ok(attrs),
1467 _ => builder::Protocol { tip: "Unknown msg" }.fail(), }
1470 }
1471
1472 pub async fn lstat(&mut self, path: &str) -> Result<Attributes> {
1473 let request_id = self.genarate_request_id();
1474
1475 let buffer = make_buffer! {
1476 u8: SSH_FXP_LSTAT,
1477 u32: request_id,
1478 one: path,
1479 };
1480
1481 self.channel.write_all(buffer).await?;
1482
1483 let packet = self.wait_for_packet(request_id).await?;
1484
1485 match packet.msg {
1486 Message::Status { status, msg, .. } => status.no_ok_and_eof(msg),
1487 Message::Attributes(attrs) => Ok(attrs),
1488 _ => builder::Protocol { tip: "Unknown msg" }.fail(), }
1491 }
1492
1493 pub async fn fstat(&mut self, file: &File) -> Result<Attributes> {
1494 let request_id = self.genarate_request_id();
1495 let buffer = make_buffer! {
1496 u8: SSH_FXP_FSTAT,
1497 u32: request_id,
1498 one: &file.handle,
1499 };
1500
1501 self.channel.write_all(buffer).await?;
1502
1503 let packet = self.wait_for_packet(request_id).await?;
1504
1505 match packet.msg {
1506 Message::Status { status, msg, .. } => status.no_ok_and_eof(msg),
1507 Message::Attributes(attrs) => Ok(attrs),
1508 _ => builder::Protocol { tip: "Unknown msg" }.fail(), }
1511 }
1512
1513 pub async fn setstat(&mut self, path: &str, attrs: &Attributes) -> Result<()> {
1514 let request_id = self.genarate_request_id();
1515
1516 let attrs = attrs.to_buffer();
1517
1518 let buffer = make_buffer! {
1525 u8: SSH_FXP_SETSTAT,
1526 u32: request_id,
1527 one: path,
1528 bytes: attrs,
1529 };
1530
1531 self.channel.write_all(buffer).await?;
1532
1533 self.wait_for_status(request_id, Status::no_eof).await
1534 }
1535
1536 pub async fn setfstat(&mut self, file: &File, attrs: &Attributes) -> Result<()> {
1537 let request_id = self.genarate_request_id();
1538
1539 let attrs = attrs.to_buffer();
1540 let buffer = make_buffer! {
1549 u8: SSH_FXP_FSETSTAT,
1550 u32: request_id,
1551 one: &file.handle,
1552 bytes: attrs,
1553 };
1554
1555 self.channel.write_all(buffer).await?;
1556
1557 self.wait_for_status(request_id, Status::no_eof).await
1558 }
1559
1560 pub async fn readlink(&mut self, path: &str) -> Result<FileInfo> {
1561 let request_id = self.genarate_request_id();
1562
1563 let buffer = make_buffer! {
1570 u8: SSH_FXP_READLINK,
1571 u32: request_id,
1572 one: path,
1573 };
1574
1575 self.channel.write_all(buffer).await?;
1576
1577 let packet = self.wait_for_packet(request_id).await?;
1578
1579 match packet.msg {
1580 Message::Status { status, msg, .. } => status.no_ok_and_eof(msg),
1581 Message::Name(mut infos) if infos.len() == 1 => Ok(infos.remove(0)),
1582 _ => builder::Protocol { tip: "Unknown msg" }.fail(), }
1585 }
1586
1587 pub async fn symlink(&mut self, linkpath: &str, targetpath: &str) -> Result<()> {
1588 let request_id = self.genarate_request_id();
1589
1590 let buffer = make_buffer! {
1598 u8: SSH_FXP_SYMLINK,
1599 u32: request_id,
1600 one: linkpath,
1601 one: targetpath,
1602 };
1603
1604 self.channel.write_all(buffer).await?;
1605
1606 self.wait_for_status(request_id, Status::no_eof).await
1607 }
1608
1609 pub async fn realpath(&mut self, path: &str) -> Result<String> {
1610 let request_id = self.genarate_request_id();
1611
1612 let buffer = make_buffer! {
1619 u8: SSH_FXP_REALPATH,
1620 u32: request_id,
1621 one: path,
1622 };
1623
1624 self.channel.write_all(buffer).await?;
1625
1626 let packet = self.wait_for_packet(request_id).await?;
1627
1628 match packet.msg {
1629 Message::Status { status, msg, .. } => status.no_ok_and_eof(msg),
1630 Message::Name(infos) if infos.len() == 1 => Ok(infos[0].filename.clone()),
1631 _ => builder::Protocol { tip: "Unknown msg" }.fail(), }
1634 }
1635
1636 pub async fn rename_file_or_dir(&mut self, old: &str, new: &str) -> Result<()> {
1637 let request_id = self.genarate_request_id();
1638
1639 let buffer = make_buffer! {
1647 u8: SSH_FXP_RENAME,
1648 u32: request_id,
1649 one: old,
1650 one: new,
1651 };
1652
1653 self.channel.write_all(buffer).await?;
1654
1655 self.wait_for_status(request_id, Status::no_eof).await
1656 }
1657
1658 pub async fn remove_file(&mut self, file: &str) -> Result<()> {
1659 let request_id = self.genarate_request_id();
1660
1661 let buffer = make_buffer! {
1668 u8: SSH_FXP_REMOVE,
1669 u32: request_id,
1670 one: file,
1671 };
1672
1673 self.channel.write_all(buffer).await?;
1674
1675 self.wait_for_status(request_id, Status::no_eof).await
1676 }
1677
1678 pub async fn open_file(
1679 &mut self,
1680 filename: &str,
1681 flags: OpenFlags,
1682 permissions: Option<Permissions>,
1683 ) -> Result<File> {
1684 let request_id = self.genarate_request_id();
1685 let mut flag = 0;
1686
1687 let mut tmp = Buffer::new();
1688 if let Some(permissions) = permissions {
1689 flag |= SSH_FILEXFER_ATTR_PERMISSIONS;
1690 tmp.put_u32(permissions.bits());
1691 }
1692
1693 let openflags = flags.bits();
1705 let buffer = make_buffer! {
1706 u8: SSH_FXP_OPEN,
1707 u32: request_id,
1708 one: filename,
1709 u32: openflags,
1710 u32: flag,
1711 bytes: tmp,
1712 };
1713
1714 self.channel.write_all(buffer).await?;
1715
1716 let packet = self.wait_for_packet(request_id).await?;
1717
1718 match packet.msg {
1719 Message::FileHandle(handle) => Ok(File::new(handle)),
1720 Message::Status { status, msg, .. } => status.no_ok_and_eof(msg),
1721
1722 _ => builder::Protocol { tip: "Unknown msg" }.fail(), }
1725 }
1726
1727 pub fn file_streamer<'a>(&'a mut self, file: &'a mut File) -> Stream<'a> {
1728 Stream {
1729 sftp: self,
1730 file,
1731 read_future: None,
1732 write_future: None,
1733 }
1734 }
1735
1736 async fn wait_for_packet(&mut self, id: u32) -> Result<Packet> {
1737 let packet = self.packets.remove(&id);
1738 if let Some(packet) = packet {
1739 return Ok(packet);
1740 }
1741 loop {
1742 let packet = self.recv().await?;
1743 if packet.id == id {
1744 return Ok(packet);
1745 }
1746 self.packets.insert(packet.id, packet);
1747 }
1748 }
1749
1750 async fn recv(&mut self) -> Result<Packet> {
1751 let data = self.channel.fill(4).await?;
1752
1753 let len = u32::from_be_bytes(data.try_into().unwrap());
1754
1755 let data = self.channel.fill(4 + len as usize).await?;
1756
1757 let res = Packet::parse(data).context(builder::InvalidArgument {
1758 tip: "Unable to parse sftp packet",
1759 });
1760 self.channel.consume(4 + len as usize);
1761 res
1762 }
1763
1764 fn genarate_request_id(&mut self) -> u32 {
1765 self.request_id = self.request_id.wrapping_add(1);
1766 self.request_id
1767 }
1768
1769 }