openssh_sftp_client_lowlevel/
write_end.rs

1#![forbid(unsafe_code)]
2
3use super::*;
4
5use awaitable_responses::ArenaArc;
6use connection::SharedData;
7
8use std::{
9    borrow::Cow,
10    convert::TryInto,
11    fmt::Debug,
12    io::IoSlice,
13    ops::{Deref, DerefMut},
14    path::Path,
15};
16
17use bytes::{BufMut, Bytes, BytesMut};
18use openssh_sftp_protocol::{
19    file_attrs::FileAttrs, request::*, serde::Serialize, ssh_format::Serializer, Handle,
20};
21
22/// It is recommended to create at most one `WriteEnd` per thread
23/// using [`WriteEnd::clone`].
24#[derive(Debug)]
25pub struct WriteEnd<Buffer, Q, Auxiliary = ()> {
26    serializer: Serializer<BytesMut>,
27    shared_data: SharedData<Buffer, Q, Auxiliary>,
28}
29
30impl<Buffer, Q, Auxiliary> Clone for WriteEnd<Buffer, Q, Auxiliary> {
31    fn clone(&self) -> Self {
32        Self::new(self.shared_data.clone())
33    }
34}
35
36impl<Buffer, Q, Auxiliary> WriteEnd<Buffer, Q, Auxiliary> {
37    /// Create a [`WriteEnd`] from [`SharedData`].
38    pub fn new(shared_data: SharedData<Buffer, Q, Auxiliary>) -> Self {
39        Self {
40            serializer: Serializer::default(),
41            shared_data,
42        }
43    }
44
45    /// Consume the [`WriteEnd`] and return the stored [`SharedData`].
46    pub fn into_shared_data(self) -> SharedData<Buffer, Q, Auxiliary> {
47        self.shared_data
48    }
49}
50
51impl<Buffer, Q, Auxiliary> Deref for WriteEnd<Buffer, Q, Auxiliary> {
52    type Target = SharedData<Buffer, Q, Auxiliary>;
53
54    fn deref(&self) -> &Self::Target {
55        &self.shared_data
56    }
57}
58
59impl<Buffer, Q, Auxiliary> DerefMut for WriteEnd<Buffer, Q, Auxiliary> {
60    fn deref_mut(&mut self) -> &mut Self::Target {
61        &mut self.shared_data
62    }
63}
64
65impl<Buffer, Q, Auxiliary> WriteEnd<Buffer, Q, Auxiliary>
66where
67    Buffer: Send + Sync,
68    Q: Queue,
69{
70    pub(crate) fn send_hello(&mut self, version: u32) -> Result<(), Error> {
71        self.shared_data
72            .queue()
73            .push(Self::serialize(&mut self.serializer, Hello { version })?);
74
75        Ok(())
76    }
77
78    fn reset(serializer: &mut Serializer<BytesMut>) {
79        serializer.reset_counter();
80        // Reserve for the header
81        serializer.output.resize(4, 0);
82    }
83
84    fn serialize<T>(serializer: &mut Serializer<BytesMut>, value: T) -> Result<Bytes, Error>
85    where
86        T: Serialize,
87    {
88        Self::reset(serializer);
89
90        value.serialize(&mut *serializer)?;
91
92        let header = serializer.create_header(0)?;
93        // Write the header
94        serializer.output[..4].copy_from_slice(&header);
95
96        Ok(serializer.output.split().freeze())
97    }
98
99    /// Send requests.
100    ///
101    /// NOTE that this merely add the request to the buffer, you need to call
102    /// [`SharedData::flush`] to actually send the requests.
103    fn send_request(
104        &mut self,
105        id: Id<Buffer>,
106        request: RequestInner<'_>,
107        buffer: Option<Buffer>,
108    ) -> Result<ArenaArc<Buffer>, Error> {
109        let serialized = Self::serialize(
110            &mut self.serializer,
111            Request {
112                request_id: ArenaArc::slot(&id.0),
113                inner: request,
114            },
115        )?;
116
117        id.0.reset(buffer);
118        self.shared_data.queue().push(serialized);
119
120        Ok(id.into_inner())
121    }
122
123    pub fn send_open_file_request(
124        &mut self,
125        id: Id<Buffer>,
126        params: OpenFileRequest<'_>,
127    ) -> Result<AwaitableHandle<Buffer>, Error> {
128        self.send_request(id, RequestInner::Open(params), None)
129            .map(AwaitableHandle::new)
130    }
131
132    pub fn send_close_request(
133        &mut self,
134        id: Id<Buffer>,
135        handle: Cow<'_, Handle>,
136    ) -> Result<AwaitableStatus<Buffer>, Error> {
137        self.send_request(id, RequestInner::Close(handle), None)
138            .map(AwaitableStatus::new)
139    }
140
141    /// - `buffer` - If set to `None` or the buffer is not long enough,
142    ///   then [`crate::Data::AllocatedBox`] will be returned.
143    ///
144    /// Return [`crate::Data::Buffer`] or
145    /// [`crate::Data::AllocatedBox`] if not EOF, otherwise returns
146    /// [`crate::Data::Eof`].
147    pub fn send_read_request(
148        &mut self,
149        id: Id<Buffer>,
150        handle: Cow<'_, Handle>,
151        offset: u64,
152        len: u32,
153        buffer: Option<Buffer>,
154    ) -> Result<AwaitableData<Buffer>, Error> {
155        self.send_request(
156            id,
157            RequestInner::Read {
158                handle,
159                offset,
160                len,
161            },
162            buffer,
163        )
164        .map(AwaitableData::new)
165    }
166
167    pub fn send_remove_request(
168        &mut self,
169        id: Id<Buffer>,
170        path: Cow<'_, Path>,
171    ) -> Result<AwaitableStatus<Buffer>, Error> {
172        self.send_request(id, RequestInner::Remove(path), None)
173            .map(AwaitableStatus::new)
174    }
175
176    pub fn send_rename_request(
177        &mut self,
178        id: Id<Buffer>,
179        oldpath: Cow<'_, Path>,
180        newpath: Cow<'_, Path>,
181    ) -> Result<AwaitableStatus<Buffer>, Error> {
182        self.send_request(id, RequestInner::Rename { oldpath, newpath }, None)
183            .map(AwaitableStatus::new)
184    }
185
186    /// * `attrs` - [`FileAttrs::get_size`] must be equal to `None`.
187    pub fn send_mkdir_request(
188        &mut self,
189        id: Id<Buffer>,
190        path: Cow<'_, Path>,
191        attrs: FileAttrs,
192    ) -> Result<AwaitableStatus<Buffer>, Error> {
193        self.send_request(id, RequestInner::Mkdir { path, attrs }, None)
194            .map(AwaitableStatus::new)
195    }
196
197    pub fn send_rmdir_request(
198        &mut self,
199        id: Id<Buffer>,
200        path: Cow<'_, Path>,
201    ) -> Result<AwaitableStatus<Buffer>, Error> {
202        self.send_request(id, RequestInner::Rmdir(path), None)
203            .map(AwaitableStatus::new)
204    }
205
206    pub fn send_opendir_request(
207        &mut self,
208        id: Id<Buffer>,
209        path: Cow<'_, Path>,
210    ) -> Result<AwaitableHandle<Buffer>, Error> {
211        self.send_request(id, RequestInner::Opendir(path), None)
212            .map(AwaitableHandle::new)
213    }
214
215    /// Return entries in the directory specified by the `handle`, including
216    /// `.` and `..`.
217    ///
218    /// The `filename` only contains the basename.
219    ///
220    /// **NOTE that it does not return all entries in one response.**
221    /// You would have to keep calling `send_readdir_request` until it returns
222    /// an empty `Box`.
223    pub fn send_readdir_request(
224        &mut self,
225        id: Id<Buffer>,
226        handle: Cow<'_, Handle>,
227    ) -> Result<AwaitableNameEntries<Buffer>, Error> {
228        self.send_request(id, RequestInner::Readdir(handle), None)
229            .map(AwaitableNameEntries::new)
230    }
231
232    pub fn send_stat_request(
233        &mut self,
234        id: Id<Buffer>,
235        path: Cow<'_, Path>,
236    ) -> Result<AwaitableAttrs<Buffer>, Error> {
237        self.send_request(id, RequestInner::Stat(path), None)
238            .map(AwaitableAttrs::new)
239    }
240
241    /// Does not follow symlink
242    pub fn send_lstat_request(
243        &mut self,
244        id: Id<Buffer>,
245        path: Cow<'_, Path>,
246    ) -> Result<AwaitableAttrs<Buffer>, Error> {
247        self.send_request(id, RequestInner::Lstat(path), None)
248            .map(AwaitableAttrs::new)
249    }
250
251    /// * `handle` - Must be opened with `FileMode::READ`.
252    pub fn send_fstat_request(
253        &mut self,
254        id: Id<Buffer>,
255        handle: Cow<'_, Handle>,
256    ) -> Result<AwaitableAttrs<Buffer>, Error> {
257        self.send_request(id, RequestInner::Fstat(handle), None)
258            .map(AwaitableAttrs::new)
259    }
260
261    pub fn send_setstat_request(
262        &mut self,
263        id: Id<Buffer>,
264        path: Cow<'_, Path>,
265        attrs: FileAttrs,
266    ) -> Result<AwaitableStatus<Buffer>, Error> {
267        self.send_request(id, RequestInner::Setstat { path, attrs }, None)
268            .map(AwaitableStatus::new)
269    }
270
271    /// * `handle` - Must be opened with `OpenOptions::write` set.
272    pub fn send_fsetstat_request(
273        &mut self,
274        id: Id<Buffer>,
275        handle: Cow<'_, Handle>,
276        attrs: FileAttrs,
277    ) -> Result<AwaitableStatus<Buffer>, Error> {
278        self.send_request(id, RequestInner::Fsetstat { handle, attrs }, None)
279            .map(AwaitableStatus::new)
280    }
281
282    pub fn send_readlink_request(
283        &mut self,
284        id: Id<Buffer>,
285        path: Cow<'_, Path>,
286    ) -> Result<AwaitableName<Buffer>, Error> {
287        self.send_request(id, RequestInner::Readlink(path), None)
288            .map(AwaitableName::new)
289    }
290
291    pub fn send_realpath_request(
292        &mut self,
293        id: Id<Buffer>,
294        path: Cow<'_, Path>,
295    ) -> Result<AwaitableName<Buffer>, Error> {
296        self.send_request(id, RequestInner::Realpath(path), None)
297            .map(AwaitableName::new)
298    }
299
300    /// Create symlink
301    pub fn send_symlink_request(
302        &mut self,
303        id: Id<Buffer>,
304        targetpath: Cow<'_, Path>,
305        linkpath: Cow<'_, Path>,
306    ) -> Result<AwaitableStatus<Buffer>, Error> {
307        self.send_request(
308            id,
309            RequestInner::Symlink {
310                linkpath,
311                targetpath,
312            },
313            None,
314        )
315        .map(AwaitableStatus::new)
316    }
317
318    /// Return limits of the server
319    ///
320    /// # Precondition
321    ///
322    /// Requires `extensions::contains(Extensions::LIMITS)` to be true.
323    pub fn send_limits_request(
324        &mut self,
325        id: Id<Buffer>,
326    ) -> Result<AwaitableLimits<Buffer>, Error> {
327        self.send_request(id, RequestInner::Limits, None)
328            .map(AwaitableLimits::new)
329    }
330
331    /// This supports canonicalisation of relative paths and those that need
332    /// tilde-expansion, i.e. "~", "~/..." and "~user/...".
333    ///
334    /// These paths are expanded using shell-like rules and the resultant path
335    /// is canonicalised similarly to [`WriteEnd::send_realpath_request`].
336    ///
337    /// # Precondition
338    ///
339    /// Requires `extensions::contains(Extensions::EXPAND_PATH)` to be true.
340    pub fn send_expand_path_request(
341        &mut self,
342        id: Id<Buffer>,
343        path: Cow<'_, Path>,
344    ) -> Result<AwaitableName<Buffer>, Error> {
345        self.send_request(id, RequestInner::ExpandPath(path), None)
346            .map(AwaitableName::new)
347    }
348
349    /// # Precondition
350    ///
351    /// Requires `extensions::contains(Extensions::FSYNC)` to be true.
352    pub fn send_fsync_request(
353        &mut self,
354        id: Id<Buffer>,
355        handle: Cow<'_, Handle>,
356    ) -> Result<AwaitableStatus<Buffer>, Error> {
357        self.send_request(id, RequestInner::Fsync(handle), None)
358            .map(AwaitableStatus::new)
359    }
360
361    /// # Precondition
362    ///
363    /// Requires `extensions::contains(Extensions::HARDLINK)` to be true.
364    pub fn send_hardlink_request(
365        &mut self,
366        id: Id<Buffer>,
367        oldpath: Cow<'_, Path>,
368        newpath: Cow<'_, Path>,
369    ) -> Result<AwaitableStatus<Buffer>, Error> {
370        self.send_request(id, RequestInner::HardLink { oldpath, newpath }, None)
371            .map(AwaitableStatus::new)
372    }
373
374    /// # Precondition
375    ///
376    /// Requires `extensions::contains(Extensions::POSIX_RENAME)` to be true.
377    pub fn send_posix_rename_request(
378        &mut self,
379        id: Id<Buffer>,
380        oldpath: Cow<'_, Path>,
381        newpath: Cow<'_, Path>,
382    ) -> Result<AwaitableStatus<Buffer>, Error> {
383        self.send_request(id, RequestInner::PosixRename { oldpath, newpath }, None)
384            .map(AwaitableStatus::new)
385    }
386
387    /// The server MUST copy the data exactly as if the client had issued a
388    /// series of [`RequestInner::Read`] requests on the `read_from_handle`
389    /// starting at `read_from_offset` and totaling `read_data_length` bytes,
390    /// and issued a series of [`RequestInner::Write`] packets on the
391    /// `write_to_handle`, starting at the `write_from_offset`, and totaling
392    /// the total number of bytes read by the [`RequestInner::Read`] packets.
393    ///
394    /// The server SHOULD allow `read_from_handle` and `write_to_handle` to
395    /// be the same handle as long as the range of data is not overlapping.
396    /// This allows data to efficiently be moved within a file.
397    ///
398    /// If `data_length` is `0`, this imples data should be read until EOF is
399    /// encountered.
400    ///
401    /// There are no protocol restictions on this operation; however, the
402    /// server MUST ensure that the user does not exceed quota, etc.  The
403    /// server is, as always, free to complete this operation out of order if
404    /// it is too large to complete immediately, or to refuse a request that
405    /// is too large.
406    ///
407    /// # Precondition
408    ///
409    /// Requires `extensions::contains(Extensions::COPY_DATA)` to be true.
410    ///
411    /// For [openssh-portable], this is available from V_9_0_P1.
412    ///
413    /// [openssh-portable]: https://github.com/openssh/openssh-portable
414    pub fn send_copy_data_request(
415        &mut self,
416        id: Id<Buffer>,
417
418        read_from_handle: Cow<'_, Handle>,
419        read_from_offset: u64,
420        read_data_length: u64,
421
422        write_to_handle: Cow<'_, Handle>,
423        write_to_offset: u64,
424    ) -> Result<AwaitableStatus<Buffer>, Error> {
425        self.send_request(
426            id,
427            RequestInner::Cp {
428                read_from_handle,
429                read_from_offset,
430                read_data_length,
431                write_to_handle,
432                write_to_offset,
433            },
434            None,
435        )
436        .map(AwaitableStatus::new)
437    }
438}
439
440impl<Buffer, Q, Auxiliary> WriteEnd<Buffer, Q, Auxiliary>
441where
442    Buffer: ToBuffer + Send + Sync + 'static,
443    Q: Queue,
444{
445    /// Write will extend the file if writing beyond the end of the file.
446    ///
447    /// It is legal to write way beyond the end of the file, the semantics
448    /// are to write zeroes from the end of the file to the specified offset
449    /// and then the data.
450    ///
451    /// On most operating systems, such writes do not allocate disk space but
452    /// instead leave "holes" in the file.
453    ///
454    /// This function is only suitable for writing small data since it needs to copy the
455    /// entire `data` into buffer.
456    pub fn send_write_request_buffered(
457        &mut self,
458        id: Id<Buffer>,
459        handle: Cow<'_, Handle>,
460        offset: u64,
461        data: Cow<'_, [u8]>,
462    ) -> Result<AwaitableStatus<Buffer>, Error> {
463        let len: u32 = data.len().try_into()?;
464
465        self.serializer.reserve(
466            // 9 bytes for the 4-byte len of packet, 1-byte packet type and
467            // 4-byte request id
468            9 +
469            handle.into_inner().len() +
470            // 8 bytes for the offset
471            8 +
472            // 4 bytes for the lenght of the data to be sent
473            4 +
474            // len of the data
475            len as usize,
476        );
477
478        self.send_request(
479            id,
480            RequestInner::Write {
481                handle,
482                offset,
483                data,
484            },
485            None,
486        )
487        .map(AwaitableStatus::new)
488    }
489
490    /// Write will extend the file if writing beyond the end of the file.
491    ///
492    /// It is legal to write way beyond the end of the file, the semantics
493    /// are to write zeroes from the end of the file to the specified offset
494    /// and then the data.
495    ///
496    /// On most operating systems, such writes do not allocate disk space but
497    /// instead leave "holes" in the file.
498    ///
499    /// This function is only suitable for writing small data since it needs to copy the
500    /// entire `data` into buffer.
501    pub fn send_write_request_buffered_vectored(
502        &mut self,
503        id: Id<Buffer>,
504        handle: Cow<'_, Handle>,
505        offset: u64,
506        io_slices: &[IoSlice<'_>],
507    ) -> Result<AwaitableStatus<Buffer>, Error> {
508        self.send_write_request_buffered_vectored2(id, handle, offset, &[io_slices])
509    }
510
511    fn serialize_write_request<'a>(
512        serializer: &'a mut Serializer<BytesMut>,
513        request_id: u32,
514        handle: Cow<'_, Handle>,
515        offset: u64,
516        len: u32,
517    ) -> Result<&'a mut BytesMut, Error> {
518        Self::reset(serializer);
519
520        let header = Request::serialize_write_request(serializer, request_id, handle, offset, len)?;
521
522        let buffer = &mut serializer.output;
523        // Write the header
524        buffer[..4].copy_from_slice(&header);
525
526        Ok(buffer)
527    }
528
529    /// Write will extend the file if writing beyond the end of the file.
530    ///
531    /// It is legal to write way beyond the end of the file, the semantics
532    /// are to write zeroes from the end of the file to the specified offset
533    /// and then the data.
534    ///
535    /// On most operating systems, such writes do not allocate disk space but
536    /// instead leave "holes" in the file.
537    ///
538    /// This function is only suitable for writing small data since it needs to copy the
539    /// entire `data` into buffer.
540    pub fn send_write_request_buffered_vectored2(
541        &mut self,
542        id: Id<Buffer>,
543        handle: Cow<'_, Handle>,
544        offset: u64,
545        bufs: &[&[IoSlice<'_>]],
546    ) -> Result<AwaitableStatus<Buffer>, Error> {
547        let len: usize = bufs
548            .iter()
549            .flat_map(Deref::deref)
550            .map(|io_slice| io_slice.len())
551            .sum();
552        let len: u32 = len.try_into()?;
553
554        self.serializer.reserve(
555            // 9 bytes for the 4-byte len of packet, 1-byte packet type and
556            // 4-byte request id
557            9 +
558            handle.into_inner().len() +
559            // 8 bytes for the offset
560            8 +
561            // 4 bytes for the lenght of the data to be sent
562            4 +
563            // len of the data
564            len as usize,
565        );
566
567        let buffer = Self::serialize_write_request(
568            &mut self.serializer,
569            ArenaArc::slot(&id.0),
570            handle,
571            offset,
572            len,
573        )?;
574
575        bufs.iter()
576            .flat_map(|io_slices| io_slices.iter())
577            .for_each(|io_slice| {
578                buffer.put_slice(io_slice);
579            });
580
581        id.0.reset(None);
582        self.shared_data.queue().push(buffer.split().freeze());
583
584        Ok(AwaitableStatus::new(id.into_inner()))
585    }
586
587    /// Write will extend the file if writing beyond the end of the file.
588    ///
589    /// It is legal to write way beyond the end of the file, the semantics
590    /// are to write zeroes from the end of the file to the specified offset
591    /// and then the data.
592    ///
593    /// On most operating systems, such writes do not allocate disk space but
594    /// instead leave "holes" in the file.
595    ///
596    /// This function is zero-copy.
597    pub fn send_write_request_zero_copy(
598        &mut self,
599        id: Id<Buffer>,
600        handle: Cow<'_, Handle>,
601        offset: u64,
602        data: &[Bytes],
603    ) -> Result<AwaitableStatus<Buffer>, Error> {
604        self.send_write_request_zero_copy2(id, handle, offset, &[data])
605    }
606
607    /// Write will extend the file if writing beyond the end of the file.
608    ///
609    /// It is legal to write way beyond the end of the file, the semantics
610    /// are to write zeroes from the end of the file to the specified offset
611    /// and then the data.
612    ///
613    /// On most operating systems, such writes do not allocate disk space but
614    /// instead leave "holes" in the file.
615    ///
616    /// This function is zero-copy.
617    pub fn send_write_request_zero_copy2(
618        &mut self,
619        id: Id<Buffer>,
620        handle: Cow<'_, Handle>,
621        offset: u64,
622        data_slice: &[&[Bytes]],
623    ) -> Result<AwaitableStatus<Buffer>, Error> {
624        let len: usize = data_slice
625            .iter()
626            .flat_map(Deref::deref)
627            .map(Bytes::len)
628            .sum();
629        let len: u32 = len.try_into()?;
630
631        let header = Self::serialize_write_request(
632            &mut self.serializer,
633            ArenaArc::slot(&id.0),
634            handle,
635            offset,
636            len,
637        )?
638        .split()
639        .freeze();
640
641        id.0.reset(None);
642        self.shared_data.queue().extend(header, data_slice);
643
644        Ok(AwaitableStatus::new(id.into_inner()))
645    }
646}