1#![forbid(unsafe_code)]
2
3use super::*;
4
5use awaitable_responses::ArenaArc;
6use connection::SharedData;
7
8use std::{
9 borrow::Cow,
10 convert::TryInto,
11 fmt::Debug,
12 io::IoSlice,
13 ops::{Deref, DerefMut},
14 path::Path,
15};
16
17use bytes::{BufMut, Bytes, BytesMut};
18use openssh_sftp_protocol::{
19 file_attrs::FileAttrs, request::*, serde::Serialize, ssh_format::Serializer, Handle,
20};
21
22#[derive(Debug)]
25pub struct WriteEnd<Buffer, Q, Auxiliary = ()> {
26 serializer: Serializer<BytesMut>,
27 shared_data: SharedData<Buffer, Q, Auxiliary>,
28}
29
30impl<Buffer, Q, Auxiliary> Clone for WriteEnd<Buffer, Q, Auxiliary> {
31 fn clone(&self) -> Self {
32 Self::new(self.shared_data.clone())
33 }
34}
35
36impl<Buffer, Q, Auxiliary> WriteEnd<Buffer, Q, Auxiliary> {
37 pub fn new(shared_data: SharedData<Buffer, Q, Auxiliary>) -> Self {
39 Self {
40 serializer: Serializer::default(),
41 shared_data,
42 }
43 }
44
45 pub fn into_shared_data(self) -> SharedData<Buffer, Q, Auxiliary> {
47 self.shared_data
48 }
49}
50
51impl<Buffer, Q, Auxiliary> Deref for WriteEnd<Buffer, Q, Auxiliary> {
52 type Target = SharedData<Buffer, Q, Auxiliary>;
53
54 fn deref(&self) -> &Self::Target {
55 &self.shared_data
56 }
57}
58
59impl<Buffer, Q, Auxiliary> DerefMut for WriteEnd<Buffer, Q, Auxiliary> {
60 fn deref_mut(&mut self) -> &mut Self::Target {
61 &mut self.shared_data
62 }
63}
64
65impl<Buffer, Q, Auxiliary> WriteEnd<Buffer, Q, Auxiliary>
66where
67 Buffer: Send + Sync,
68 Q: Queue,
69{
70 pub(crate) fn send_hello(&mut self, version: u32) -> Result<(), Error> {
71 self.shared_data
72 .queue()
73 .push(Self::serialize(&mut self.serializer, Hello { version })?);
74
75 Ok(())
76 }
77
78 fn reset(serializer: &mut Serializer<BytesMut>) {
79 serializer.reset_counter();
80 serializer.output.resize(4, 0);
82 }
83
84 fn serialize<T>(serializer: &mut Serializer<BytesMut>, value: T) -> Result<Bytes, Error>
85 where
86 T: Serialize,
87 {
88 Self::reset(serializer);
89
90 value.serialize(&mut *serializer)?;
91
92 let header = serializer.create_header(0)?;
93 serializer.output[..4].copy_from_slice(&header);
95
96 Ok(serializer.output.split().freeze())
97 }
98
99 fn send_request(
104 &mut self,
105 id: Id<Buffer>,
106 request: RequestInner<'_>,
107 buffer: Option<Buffer>,
108 ) -> Result<ArenaArc<Buffer>, Error> {
109 let serialized = Self::serialize(
110 &mut self.serializer,
111 Request {
112 request_id: ArenaArc::slot(&id.0),
113 inner: request,
114 },
115 )?;
116
117 id.0.reset(buffer);
118 self.shared_data.queue().push(serialized);
119
120 Ok(id.into_inner())
121 }
122
123 pub fn send_open_file_request(
124 &mut self,
125 id: Id<Buffer>,
126 params: OpenFileRequest<'_>,
127 ) -> Result<AwaitableHandle<Buffer>, Error> {
128 self.send_request(id, RequestInner::Open(params), None)
129 .map(AwaitableHandle::new)
130 }
131
132 pub fn send_close_request(
133 &mut self,
134 id: Id<Buffer>,
135 handle: Cow<'_, Handle>,
136 ) -> Result<AwaitableStatus<Buffer>, Error> {
137 self.send_request(id, RequestInner::Close(handle), None)
138 .map(AwaitableStatus::new)
139 }
140
141 pub fn send_read_request(
148 &mut self,
149 id: Id<Buffer>,
150 handle: Cow<'_, Handle>,
151 offset: u64,
152 len: u32,
153 buffer: Option<Buffer>,
154 ) -> Result<AwaitableData<Buffer>, Error> {
155 self.send_request(
156 id,
157 RequestInner::Read {
158 handle,
159 offset,
160 len,
161 },
162 buffer,
163 )
164 .map(AwaitableData::new)
165 }
166
167 pub fn send_remove_request(
168 &mut self,
169 id: Id<Buffer>,
170 path: Cow<'_, Path>,
171 ) -> Result<AwaitableStatus<Buffer>, Error> {
172 self.send_request(id, RequestInner::Remove(path), None)
173 .map(AwaitableStatus::new)
174 }
175
176 pub fn send_rename_request(
177 &mut self,
178 id: Id<Buffer>,
179 oldpath: Cow<'_, Path>,
180 newpath: Cow<'_, Path>,
181 ) -> Result<AwaitableStatus<Buffer>, Error> {
182 self.send_request(id, RequestInner::Rename { oldpath, newpath }, None)
183 .map(AwaitableStatus::new)
184 }
185
186 pub fn send_mkdir_request(
188 &mut self,
189 id: Id<Buffer>,
190 path: Cow<'_, Path>,
191 attrs: FileAttrs,
192 ) -> Result<AwaitableStatus<Buffer>, Error> {
193 self.send_request(id, RequestInner::Mkdir { path, attrs }, None)
194 .map(AwaitableStatus::new)
195 }
196
197 pub fn send_rmdir_request(
198 &mut self,
199 id: Id<Buffer>,
200 path: Cow<'_, Path>,
201 ) -> Result<AwaitableStatus<Buffer>, Error> {
202 self.send_request(id, RequestInner::Rmdir(path), None)
203 .map(AwaitableStatus::new)
204 }
205
206 pub fn send_opendir_request(
207 &mut self,
208 id: Id<Buffer>,
209 path: Cow<'_, Path>,
210 ) -> Result<AwaitableHandle<Buffer>, Error> {
211 self.send_request(id, RequestInner::Opendir(path), None)
212 .map(AwaitableHandle::new)
213 }
214
215 pub fn send_readdir_request(
224 &mut self,
225 id: Id<Buffer>,
226 handle: Cow<'_, Handle>,
227 ) -> Result<AwaitableNameEntries<Buffer>, Error> {
228 self.send_request(id, RequestInner::Readdir(handle), None)
229 .map(AwaitableNameEntries::new)
230 }
231
232 pub fn send_stat_request(
233 &mut self,
234 id: Id<Buffer>,
235 path: Cow<'_, Path>,
236 ) -> Result<AwaitableAttrs<Buffer>, Error> {
237 self.send_request(id, RequestInner::Stat(path), None)
238 .map(AwaitableAttrs::new)
239 }
240
241 pub fn send_lstat_request(
243 &mut self,
244 id: Id<Buffer>,
245 path: Cow<'_, Path>,
246 ) -> Result<AwaitableAttrs<Buffer>, Error> {
247 self.send_request(id, RequestInner::Lstat(path), None)
248 .map(AwaitableAttrs::new)
249 }
250
251 pub fn send_fstat_request(
253 &mut self,
254 id: Id<Buffer>,
255 handle: Cow<'_, Handle>,
256 ) -> Result<AwaitableAttrs<Buffer>, Error> {
257 self.send_request(id, RequestInner::Fstat(handle), None)
258 .map(AwaitableAttrs::new)
259 }
260
261 pub fn send_setstat_request(
262 &mut self,
263 id: Id<Buffer>,
264 path: Cow<'_, Path>,
265 attrs: FileAttrs,
266 ) -> Result<AwaitableStatus<Buffer>, Error> {
267 self.send_request(id, RequestInner::Setstat { path, attrs }, None)
268 .map(AwaitableStatus::new)
269 }
270
271 pub fn send_fsetstat_request(
273 &mut self,
274 id: Id<Buffer>,
275 handle: Cow<'_, Handle>,
276 attrs: FileAttrs,
277 ) -> Result<AwaitableStatus<Buffer>, Error> {
278 self.send_request(id, RequestInner::Fsetstat { handle, attrs }, None)
279 .map(AwaitableStatus::new)
280 }
281
282 pub fn send_readlink_request(
283 &mut self,
284 id: Id<Buffer>,
285 path: Cow<'_, Path>,
286 ) -> Result<AwaitableName<Buffer>, Error> {
287 self.send_request(id, RequestInner::Readlink(path), None)
288 .map(AwaitableName::new)
289 }
290
291 pub fn send_realpath_request(
292 &mut self,
293 id: Id<Buffer>,
294 path: Cow<'_, Path>,
295 ) -> Result<AwaitableName<Buffer>, Error> {
296 self.send_request(id, RequestInner::Realpath(path), None)
297 .map(AwaitableName::new)
298 }
299
300 pub fn send_symlink_request(
302 &mut self,
303 id: Id<Buffer>,
304 targetpath: Cow<'_, Path>,
305 linkpath: Cow<'_, Path>,
306 ) -> Result<AwaitableStatus<Buffer>, Error> {
307 self.send_request(
308 id,
309 RequestInner::Symlink {
310 linkpath,
311 targetpath,
312 },
313 None,
314 )
315 .map(AwaitableStatus::new)
316 }
317
318 pub fn send_limits_request(
324 &mut self,
325 id: Id<Buffer>,
326 ) -> Result<AwaitableLimits<Buffer>, Error> {
327 self.send_request(id, RequestInner::Limits, None)
328 .map(AwaitableLimits::new)
329 }
330
331 pub fn send_expand_path_request(
341 &mut self,
342 id: Id<Buffer>,
343 path: Cow<'_, Path>,
344 ) -> Result<AwaitableName<Buffer>, Error> {
345 self.send_request(id, RequestInner::ExpandPath(path), None)
346 .map(AwaitableName::new)
347 }
348
349 pub fn send_fsync_request(
353 &mut self,
354 id: Id<Buffer>,
355 handle: Cow<'_, Handle>,
356 ) -> Result<AwaitableStatus<Buffer>, Error> {
357 self.send_request(id, RequestInner::Fsync(handle), None)
358 .map(AwaitableStatus::new)
359 }
360
361 pub fn send_hardlink_request(
365 &mut self,
366 id: Id<Buffer>,
367 oldpath: Cow<'_, Path>,
368 newpath: Cow<'_, Path>,
369 ) -> Result<AwaitableStatus<Buffer>, Error> {
370 self.send_request(id, RequestInner::HardLink { oldpath, newpath }, None)
371 .map(AwaitableStatus::new)
372 }
373
374 pub fn send_posix_rename_request(
378 &mut self,
379 id: Id<Buffer>,
380 oldpath: Cow<'_, Path>,
381 newpath: Cow<'_, Path>,
382 ) -> Result<AwaitableStatus<Buffer>, Error> {
383 self.send_request(id, RequestInner::PosixRename { oldpath, newpath }, None)
384 .map(AwaitableStatus::new)
385 }
386
387 pub fn send_copy_data_request(
415 &mut self,
416 id: Id<Buffer>,
417
418 read_from_handle: Cow<'_, Handle>,
419 read_from_offset: u64,
420 read_data_length: u64,
421
422 write_to_handle: Cow<'_, Handle>,
423 write_to_offset: u64,
424 ) -> Result<AwaitableStatus<Buffer>, Error> {
425 self.send_request(
426 id,
427 RequestInner::Cp {
428 read_from_handle,
429 read_from_offset,
430 read_data_length,
431 write_to_handle,
432 write_to_offset,
433 },
434 None,
435 )
436 .map(AwaitableStatus::new)
437 }
438}
439
440impl<Buffer, Q, Auxiliary> WriteEnd<Buffer, Q, Auxiliary>
441where
442 Buffer: ToBuffer + Send + Sync + 'static,
443 Q: Queue,
444{
445 pub fn send_write_request_buffered(
457 &mut self,
458 id: Id<Buffer>,
459 handle: Cow<'_, Handle>,
460 offset: u64,
461 data: Cow<'_, [u8]>,
462 ) -> Result<AwaitableStatus<Buffer>, Error> {
463 let len: u32 = data.len().try_into()?;
464
465 self.serializer.reserve(
466 9 +
469 handle.into_inner().len() +
470 8 +
472 4 +
474 len as usize,
476 );
477
478 self.send_request(
479 id,
480 RequestInner::Write {
481 handle,
482 offset,
483 data,
484 },
485 None,
486 )
487 .map(AwaitableStatus::new)
488 }
489
490 pub fn send_write_request_buffered_vectored(
502 &mut self,
503 id: Id<Buffer>,
504 handle: Cow<'_, Handle>,
505 offset: u64,
506 io_slices: &[IoSlice<'_>],
507 ) -> Result<AwaitableStatus<Buffer>, Error> {
508 self.send_write_request_buffered_vectored2(id, handle, offset, &[io_slices])
509 }
510
511 fn serialize_write_request<'a>(
512 serializer: &'a mut Serializer<BytesMut>,
513 request_id: u32,
514 handle: Cow<'_, Handle>,
515 offset: u64,
516 len: u32,
517 ) -> Result<&'a mut BytesMut, Error> {
518 Self::reset(serializer);
519
520 let header = Request::serialize_write_request(serializer, request_id, handle, offset, len)?;
521
522 let buffer = &mut serializer.output;
523 buffer[..4].copy_from_slice(&header);
525
526 Ok(buffer)
527 }
528
529 pub fn send_write_request_buffered_vectored2(
541 &mut self,
542 id: Id<Buffer>,
543 handle: Cow<'_, Handle>,
544 offset: u64,
545 bufs: &[&[IoSlice<'_>]],
546 ) -> Result<AwaitableStatus<Buffer>, Error> {
547 let len: usize = bufs
548 .iter()
549 .flat_map(Deref::deref)
550 .map(|io_slice| io_slice.len())
551 .sum();
552 let len: u32 = len.try_into()?;
553
554 self.serializer.reserve(
555 9 +
558 handle.into_inner().len() +
559 8 +
561 4 +
563 len as usize,
565 );
566
567 let buffer = Self::serialize_write_request(
568 &mut self.serializer,
569 ArenaArc::slot(&id.0),
570 handle,
571 offset,
572 len,
573 )?;
574
575 bufs.iter()
576 .flat_map(|io_slices| io_slices.iter())
577 .for_each(|io_slice| {
578 buffer.put_slice(io_slice);
579 });
580
581 id.0.reset(None);
582 self.shared_data.queue().push(buffer.split().freeze());
583
584 Ok(AwaitableStatus::new(id.into_inner()))
585 }
586
587 pub fn send_write_request_zero_copy(
598 &mut self,
599 id: Id<Buffer>,
600 handle: Cow<'_, Handle>,
601 offset: u64,
602 data: &[Bytes],
603 ) -> Result<AwaitableStatus<Buffer>, Error> {
604 self.send_write_request_zero_copy2(id, handle, offset, &[data])
605 }
606
607 pub fn send_write_request_zero_copy2(
618 &mut self,
619 id: Id<Buffer>,
620 handle: Cow<'_, Handle>,
621 offset: u64,
622 data_slice: &[&[Bytes]],
623 ) -> Result<AwaitableStatus<Buffer>, Error> {
624 let len: usize = data_slice
625 .iter()
626 .flat_map(Deref::deref)
627 .map(Bytes::len)
628 .sum();
629 let len: u32 = len.try_into()?;
630
631 let header = Self::serialize_write_request(
632 &mut self.serializer,
633 ArenaArc::slot(&id.0),
634 handle,
635 offset,
636 len,
637 )?
638 .split()
639 .freeze();
640
641 id.0.reset(None);
642 self.shared_data.queue().extend(header, data_slice);
643
644 Ok(AwaitableStatus::new(id.into_inner()))
645 }
646}