openssh_sftp_client_lowlevel/
connection.rs

1#![forbid(unsafe_code)]
2
3use super::{awaitable_responses::AwaitableResponses, *};
4
5use std::{fmt, sync::Arc};
6
7use openssh_sftp_protocol::constants::SSH2_FILEXFER_VERSION;
8
9// TODO:
10//  - Support for zero copy syscalls
11
12#[derive(Debug)]
13struct SharedDataInner<Buffer, Q, Auxiliary> {
14    queue: Q,
15    responses: AwaitableResponses<Buffer>,
16
17    auxiliary: Auxiliary,
18}
19
20/// SharedData contains both the writer and the responses because:
21///  - The overhead of `Arc` and a separate allocation;
22///  - If the write end of a connection is closed, then openssh implementation
23///    of sftp-server would close the read end right away, discarding
24///    any unsent but processed or unprocessed responses.
25#[derive(Debug)]
26pub struct SharedData<Buffer, Q, Auxiliary = ()>(Arc<SharedDataInner<Buffer, Q, Auxiliary>>);
27
28impl<Buffer, Q, Auxiliary> fmt::Pointer for SharedData<Buffer, Q, Auxiliary> {
29    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30        fmt::Pointer::fmt(&self.0, f)
31    }
32}
33
34impl<Buffer, Q, Auxiliary> Clone for SharedData<Buffer, Q, Auxiliary> {
35    fn clone(&self) -> Self {
36        Self(self.0.clone())
37    }
38}
39
40impl<Buffer: Send + Sync, Q, Auxiliary> SharedData<Buffer, Q, Auxiliary> {
41    fn new(queue: Q, auxiliary: Auxiliary) -> Self {
42        SharedData(Arc::new(SharedDataInner {
43            responses: AwaitableResponses::new(),
44            queue,
45
46            auxiliary,
47        }))
48    }
49}
50
51impl<Buffer, Q, Auxiliary> SharedData<Buffer, Q, Auxiliary> {
52    pub fn queue(&self) -> &Q {
53        &self.0.queue
54    }
55
56    pub(crate) fn responses(&self) -> &AwaitableResponses<Buffer> {
57        &self.0.responses
58    }
59
60    /// Returned the auxiliary data.
61    pub fn get_auxiliary(&self) -> &Auxiliary {
62        &self.0.auxiliary
63    }
64}
65
66impl<Buffer: Send + Sync, Q, Auxiliary> SharedData<Buffer, Q, Auxiliary> {
67    /// Create a useable response id.
68    #[inline(always)]
69    pub fn create_response_id(&self) -> Id<Buffer> {
70        self.responses().insert()
71    }
72
73    /// Return true if reserve succeeds, false otherwise.
74    #[inline(always)]
75    pub fn try_reserve_id(&self, new_id_cnt: u32) -> bool {
76        self.responses().try_reserve(new_id_cnt)
77    }
78
79    /// Return true if reserve succeeds, false otherwise.
80    #[inline(always)]
81    pub fn reserve_id(&self, new_id_cnt: u32) {
82        self.responses().reserve(new_id_cnt);
83    }
84}
85
86/// Initialize connection to remote sftp server and
87/// negotiate the sftp version.
88///
89/// User of this function must manually create [`ReadEnd`]
90/// and manually flush the buffer.
91///
92/// # Cancel Safety
93///
94/// This function is not cancel safe.
95///
96/// After dropping the future, the connection would be in a undefined state.
97pub fn connect<Buffer, Q, Auxiliary>(
98    queue: Q,
99    auxiliary: Auxiliary,
100) -> Result<WriteEnd<Buffer, Q, Auxiliary>, Error>
101where
102    Buffer: ToBuffer + Send + Sync + 'static,
103    Q: Queue,
104{
105    let shared_data = SharedData::new(queue, auxiliary);
106
107    // Send hello message
108    let mut write_end = WriteEnd::new(shared_data);
109    write_end.send_hello(SSH2_FILEXFER_VERSION)?;
110
111    Ok(write_end)
112}