async_ssh/
channel.rs

1use tokio_io::{AsyncRead, AsyncWrite};
2use futures::{self, Async, Future, Poll};
3use std::io::prelude::*;
4use std::io;
5use thrussh;
6use session;
7use SharableConnection;
8
9#[derive(Default)]
10pub(crate) struct State {
11    pub(crate) closed: bool,
12
13    pub(crate) read_notify: Option<futures::task::Task>,
14    pub(crate) data_start: usize,
15    pub(crate) data: Vec<u8>,
16    pub(crate) eof: bool,
17
18    pub(crate) exit_notify: Option<futures::task::Task>,
19    pub(crate) exit_status: Option<u32>,
20
21    pub(crate) open_notify: Option<futures::task::Task>,
22    pub(crate) open_state: Option<Result<(), thrussh::ChannelOpenFailure>>,
23}
24
25/// A newly opened, but not yet established channel.
26pub struct ChannelOpenFuture<'a, S: AsyncRead + AsyncWrite> {
27    cmd: &'a str,
28    session: SharableConnection<S>,
29    state: session::state::Ref,
30    id: thrussh::ChannelId,
31    first_round: bool,
32}
33
34impl<'a, S: AsyncRead + AsyncWrite> ChannelOpenFuture<'a, S> {
35    pub(crate) fn new(
36        cmd: &'a str,
37        session: SharableConnection<S>,
38        state: session::state::Ref,
39        id: thrussh::ChannelId,
40    ) -> Self {
41        ChannelOpenFuture {
42            cmd,
43            session,
44            state,
45            id,
46            first_round: true,
47        }
48    }
49}
50
51impl<'a, S: AsyncRead + AsyncWrite + thrussh::Tcp> Future for ChannelOpenFuture<'a, S> {
52    type Item = Channel;
53    type Error = thrussh::HandlerError<()>;
54
55    fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
56        if self.first_round {
57            self.session.0.borrow_mut().c.abort_read()?;
58            self.first_round = false;
59        }
60
61        let mut s = self.state.borrow_mut();
62        let state = s.state_for
63            .get_mut(&self.id)
64            .expect("no state entry for valid channel");
65
66        state.open_notify = None;
67        match state.open_state.take() {
68            Some(Ok(_)) => {
69                {
70                    let mut s = self.session.0.borrow_mut();
71                    assert!(s.c.channel_is_open(self.id));
72                    s.c.exec(self.id, true, self.cmd);
73                    // poke connection thread to say that we sent stuff
74                    s.task.take().unwrap().notify();
75                }
76
77                Ok(Async::Ready(Channel {
78                    state: self.state.clone(),
79                    id: self.id,
80                }))
81            }
82            Some(Err(e)) => Err(thrussh::HandlerError::Error(thrussh::Error::IO(
83                io::Error::new(io::ErrorKind::Other, format!("{:?}", e)),
84            ))),
85            None => {
86                state.open_notify = Some(futures::task::current());
87                Ok(Async::NotReady)
88            }
89        }
90    }
91}
92
93/// A channel used to communicate with a process running at a remote host.
94pub struct Channel {
95    state: session::state::Ref,
96    id: thrussh::ChannelId,
97}
98
99/// A future that will eventually resolve to the exit status of a process running on a remote host.
100pub struct ExitStatusFuture {
101    state: session::state::Ref,
102    id: thrussh::ChannelId,
103}
104
105impl Channel {
106    /// Get the exit status of the remote process associated with this channel.
107    pub fn exit_status(self) -> ExitStatusFuture {
108        ExitStatusFuture {
109            state: self.state,
110            id: self.id,
111        }
112    }
113}
114
115impl Future for ExitStatusFuture {
116    type Item = u32;
117    type Error = ();
118
119    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
120        let mut s = self.state.borrow_mut();
121        let state = s.state_for
122            .get_mut(&self.id)
123            .expect("no state entry for valid channel");
124
125        state.exit_notify = None;
126        if let Some(e) = state.exit_status {
127            Ok(Async::Ready(e))
128        } else if state.closed {
129            Err(())
130        } else {
131            state.exit_notify = Some(futures::task::current());
132            Ok(Async::NotReady)
133        }
134    }
135}
136
137impl Read for Channel {
138    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
139        let mut s = self.state.borrow_mut();
140        let state = s.state_for
141            .get_mut(&self.id)
142            .expect("no state entry for valid channel");
143        let n = ::std::cmp::min(buf.len(), state.data.len() - state.data_start);
144        (&mut buf[..n]).copy_from_slice(&state.data[state.data_start..(state.data_start + n)]);
145
146        // NOTE: Vec::drain is an attractive option here (as it would obviate the need for a bunch
147        // of the bookkeeping we're doing) but we're choosing not to use it because it copies the
148        // entire remaining vector on drop, which could be expensive.
149        // See also https://github.com/jonhoo/async-ssh/pull/1.
150        state.data_start += n;
151        if state.data_start == state.data.len() {
152            state.data_start = 0;
153            state.data.clear();
154        }
155
156        state.read_notify = None;
157        if n == 0 && !state.eof {
158            state.read_notify = Some(futures::task::current());
159            Err(io::Error::new(io::ErrorKind::WouldBlock, ""))
160        } else {
161            Ok(n)
162        }
163    }
164}
165/*
166impl Write for Channel {
167    fn write(&mut self, buf: &[u8]) -> Result<usize> {
168        //
169    }
170    fn flush(&mut self) -> Result<()> {}
171}
172*/
173
174impl AsyncRead for Channel {}
175/*
176impl AsyncWrite for Channel {
177    fn shutdown(&mut self) -> Poll<(), Error> {}
178}
179*/