async_ssh2/
channel.rs

1use crate::{util::run_ssh2_fn, Error};
2use futures::prelude::*;
3use smol::Async;
4use ssh2::{self, ExitSignal, ExtendedData, PtyModes, ReadWindow, Stream, WriteWindow};
5use std::{
6    convert::From,
7    io,
8    io::{Read, Write},
9    net::TcpStream,
10    pin::Pin,
11    sync::Arc,
12    task::{Context, Poll},
13};
14
15/// See [`Channel`](ssh2::Channel).
16pub struct Channel {
17    inner: ssh2::Channel,
18    stream: Arc<Async<TcpStream>>,
19}
20
21impl Channel {
22    pub(crate) fn new(channel: ssh2::Channel, stream: Arc<Async<TcpStream>>) -> Self {
23        Self {
24            inner: channel,
25            stream,
26        }
27    }
28
29    /// See [`setenv`](ssh2::Channel::setenv).
30    pub async fn setenv(&mut self, var: &str, val: &str) -> Result<(), Error> {
31        run_ssh2_fn(&self.stream.clone(), || self.inner.setenv(var, val)).await
32    }
33
34    /// See [`request_pty`](ssh2::Channel::request_pty).
35    pub async fn request_pty(
36        &mut self,
37        term: &str,
38        mode: Option<PtyModes>,
39        dim: Option<(u32, u32, u32, u32)>,
40    ) -> Result<(), Error> {
41        run_ssh2_fn(&self.stream.clone(), || {
42            self.inner.request_pty(term, mode.clone(), dim)
43        })
44        .await
45    }
46
47    /// See [`request_pty_size`](ssh2::Channel::request_pty_size).
48    pub async fn request_pty_size(
49        &mut self,
50        width: u32,
51        height: u32,
52        width_px: Option<u32>,
53        height_px: Option<u32>,
54    ) -> Result<(), Error> {
55        run_ssh2_fn(&self.stream.clone(), || {
56            self.inner
57                .request_pty_size(width, height, width_px, height_px)
58        })
59        .await
60    }
61
62    /// See [`exec`](ssh2::Channel::exec).
63    pub async fn exec(&mut self, command: &str) -> Result<(), Error> {
64        run_ssh2_fn(&self.stream.clone(), || self.inner.exec(command)).await
65    }
66
67    /// See [`shell`](ssh2::Channel::shell).
68    pub async fn shell(&mut self) -> Result<(), Error> {
69        run_ssh2_fn(&self.stream.clone(), || self.inner.shell()).await
70    }
71
72    /// See [`subsystem`](ssh2::Channel::subsystem).
73    pub async fn subsystem(&mut self, system: &str) -> Result<(), Error> {
74        run_ssh2_fn(&self.stream.clone(), || self.inner.subsystem(system)).await
75    }
76
77    /// See [`process_startup`](ssh2::Channel::process_startup).
78    pub async fn process_startup(
79        &mut self,
80        request: &str,
81        message: Option<&str>,
82    ) -> Result<(), Error> {
83        run_ssh2_fn(&self.stream.clone(), || {
84            self.inner.process_startup(request, message)
85        })
86        .await
87    }
88
89    /// See [`stderr`](ssh2::Channel::stderr).
90    pub fn stderr(&mut self) -> Stream {
91        self.inner.stderr()
92    }
93
94    /// See [`stream`](ssh2::Channel::stream).
95    pub fn stream(&mut self, stream_id: i32) -> Stream {
96        self.inner.stream(stream_id)
97    }
98
99    /// See [`handle_extended_data`](ssh2::Channel::handle_extended_data).
100    pub async fn handle_extended_data(&mut self, mode: ExtendedData) -> Result<(), Error> {
101        run_ssh2_fn(&self.stream.clone(), || {
102            self.inner.handle_extended_data(mode)
103        })
104        .await
105    }
106
107    /// See [`exit_status`](ssh2::Channel::exit_status).
108    pub fn exit_status(&self) -> Result<i32, Error> {
109        self.inner.exit_status().map_err(From::from)
110    }
111
112    /// See [`exit_signal`](ssh2::Channel::exit_signal).
113    pub fn exit_signal(&self) -> Result<ExitSignal, Error> {
114        self.inner.exit_signal().map_err(From::from)
115    }
116
117    /// See [`read_window`](ssh2::Channel::read_window).
118    pub fn read_window(&self) -> ReadWindow {
119        self.inner.read_window()
120    }
121
122    /// See [`write_window`](ssh2::Channel::write_window).
123    pub fn write_window(&self) -> WriteWindow {
124        self.inner.write_window()
125    }
126
127    /// See [`adjust_receive_window`](ssh2::Channel::adjust_receive_window).
128    pub async fn adjust_receive_window(&mut self, adjust: u64, force: bool) -> Result<u64, Error> {
129        run_ssh2_fn(&self.stream.clone(), || {
130            self.inner.adjust_receive_window(adjust, force)
131        })
132        .await
133    }
134
135    /// See [`eof`](ssh2::Channel::eof).
136    pub fn eof(&self) -> bool {
137        self.inner.eof()
138    }
139
140    /// See [`send_eof`](ssh2::Channel::send_eof).
141    pub async fn send_eof(&mut self) -> Result<(), Error> {
142        run_ssh2_fn(&self.stream.clone(), || self.inner.send_eof()).await
143    }
144
145    /// See [`wait_eof`](ssh2::Channel::wait_eof).
146    pub async fn wait_eof(&mut self) -> Result<(), Error> {
147        run_ssh2_fn(&self.stream.clone(), || self.inner.wait_eof()).await
148    }
149
150    /// See [`close`](ssh2::Channel::close).
151    pub async fn close(&mut self) -> Result<(), Error> {
152        run_ssh2_fn(&self.stream.clone(), || self.inner.close()).await
153    }
154
155    /// See [`wait_close`](ssh2::Channel::wait_close).
156    pub async fn wait_close(&mut self) -> Result<(), Error> {
157        run_ssh2_fn(&self.stream.clone(), || self.inner.wait_close()).await
158    }
159}
160
161impl AsyncRead for Channel {
162    fn poll_read(
163        mut self: Pin<&mut Self>,
164        cx: &mut Context<'_>,
165        buf: &mut [u8],
166    ) -> Poll<io::Result<usize>> {
167        self.stream
168            .clone()
169            .read_with(|_s| self.inner.read(buf))
170            .boxed()
171            .poll_unpin(cx)
172    }
173}
174
175impl AsyncWrite for Channel {
176    fn poll_write(
177        mut self: Pin<&mut Self>,
178        cx: &mut Context<'_>,
179        buf: &[u8],
180    ) -> Poll<Result<usize, io::Error>> {
181        self.stream
182            .clone()
183            .write_with(|_s| self.inner.write(buf))
184            .boxed()
185            .poll_unpin(cx)
186    }
187
188    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
189        self.stream
190            .clone()
191            .write_with(|_s| self.inner.flush())
192            .boxed()
193            .poll_unpin(cx)
194    }
195
196    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
197        self.stream
198            .clone()
199            .write_with(|_s| self.inner.close().map_err(|e| io::Error::from(ssh2::Error::from_errno(e.code()))))
200            .boxed()
201            .poll_unpin(cx)
202    }
203}
204
205/*
206impl<'channel> Read for Stream<'channel> {
207    fn read(&mut self, data: &mut [u8]) -> io::Result<usize> {
208        if self.channel.eof() {
209            return Ok(0);
210        }
211
212        let data = match self.channel.read_limit {
213            Some(amt) => {
214                let len = data.len();
215                &mut data[..cmp::min(amt as usize, len)]
216            }
217            None => data,
218        };
219        let ret = unsafe {
220            let rc = raw::libssh2_channel_read_ex(
221                self.channel.raw,
222                self.id as c_int,
223                data.as_mut_ptr() as *mut _,
224                data.len() as size_t,
225            );
226            self.channel.sess.rc(rc as c_int).map(|()| rc as usize)
227        };
228        match ret {
229            Ok(n) => {
230                if let Some(ref mut amt) = self.channel.read_limit {
231                    *amt -= n as u64;
232                }
233                Ok(n)
234            }
235            Err(e) => Err(e.into()),
236        }
237    }
238}
239
240impl<'channel> Write for Stream<'channel> {
241    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
242        unsafe {
243            let rc = raw::libssh2_channel_write_ex(
244                self.channel.raw,
245                self.id as c_int,
246                data.as_ptr() as *mut _,
247                data.len() as size_t,
248            );
249            self.channel.sess.rc(rc as c_int).map(|()| rc as usize)
250        }
251        .map_err(Into::into)
252    }
253
254    fn flush(&mut self) -> io::Result<()> {
255        unsafe {
256            let rc = raw::libssh2_channel_flush_ex(self.channel.raw, self.id as c_int);
257            self.channel.sess.rc(rc)
258        }
259        .map_err(Into::into)
260    }
261}
262*/