1use tokio_io::{AsyncRead, AsyncWrite};
2use futures::{self, Async, Future, Poll};
3use std::io::prelude::*;
4use std::io;
5use thrussh;
6use session;
7use SharableConnection;
8
9#[derive(Default)]
10pub(crate) struct State {
11 pub(crate) closed: bool,
12
13 pub(crate) read_notify: Option<futures::task::Task>,
14 pub(crate) data_start: usize,
15 pub(crate) data: Vec<u8>,
16 pub(crate) eof: bool,
17
18 pub(crate) exit_notify: Option<futures::task::Task>,
19 pub(crate) exit_status: Option<u32>,
20
21 pub(crate) open_notify: Option<futures::task::Task>,
22 pub(crate) open_state: Option<Result<(), thrussh::ChannelOpenFailure>>,
23}
24
25pub struct ChannelOpenFuture<'a, S: AsyncRead + AsyncWrite> {
27 cmd: &'a str,
28 session: SharableConnection<S>,
29 state: session::state::Ref,
30 id: thrussh::ChannelId,
31 first_round: bool,
32}
33
34impl<'a, S: AsyncRead + AsyncWrite> ChannelOpenFuture<'a, S> {
35 pub(crate) fn new(
36 cmd: &'a str,
37 session: SharableConnection<S>,
38 state: session::state::Ref,
39 id: thrussh::ChannelId,
40 ) -> Self {
41 ChannelOpenFuture {
42 cmd,
43 session,
44 state,
45 id,
46 first_round: true,
47 }
48 }
49}
50
51impl<'a, S: AsyncRead + AsyncWrite + thrussh::Tcp> Future for ChannelOpenFuture<'a, S> {
52 type Item = Channel;
53 type Error = thrussh::HandlerError<()>;
54
55 fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
56 if self.first_round {
57 self.session.0.borrow_mut().c.abort_read()?;
58 self.first_round = false;
59 }
60
61 let mut s = self.state.borrow_mut();
62 let state = s.state_for
63 .get_mut(&self.id)
64 .expect("no state entry for valid channel");
65
66 state.open_notify = None;
67 match state.open_state.take() {
68 Some(Ok(_)) => {
69 {
70 let mut s = self.session.0.borrow_mut();
71 assert!(s.c.channel_is_open(self.id));
72 s.c.exec(self.id, true, self.cmd);
73 s.task.take().unwrap().notify();
75 }
76
77 Ok(Async::Ready(Channel {
78 state: self.state.clone(),
79 id: self.id,
80 }))
81 }
82 Some(Err(e)) => Err(thrussh::HandlerError::Error(thrussh::Error::IO(
83 io::Error::new(io::ErrorKind::Other, format!("{:?}", e)),
84 ))),
85 None => {
86 state.open_notify = Some(futures::task::current());
87 Ok(Async::NotReady)
88 }
89 }
90 }
91}
92
93pub struct Channel {
95 state: session::state::Ref,
96 id: thrussh::ChannelId,
97}
98
99pub struct ExitStatusFuture {
101 state: session::state::Ref,
102 id: thrussh::ChannelId,
103}
104
105impl Channel {
106 pub fn exit_status(self) -> ExitStatusFuture {
108 ExitStatusFuture {
109 state: self.state,
110 id: self.id,
111 }
112 }
113}
114
115impl Future for ExitStatusFuture {
116 type Item = u32;
117 type Error = ();
118
119 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
120 let mut s = self.state.borrow_mut();
121 let state = s.state_for
122 .get_mut(&self.id)
123 .expect("no state entry for valid channel");
124
125 state.exit_notify = None;
126 if let Some(e) = state.exit_status {
127 Ok(Async::Ready(e))
128 } else if state.closed {
129 Err(())
130 } else {
131 state.exit_notify = Some(futures::task::current());
132 Ok(Async::NotReady)
133 }
134 }
135}
136
137impl Read for Channel {
138 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
139 let mut s = self.state.borrow_mut();
140 let state = s.state_for
141 .get_mut(&self.id)
142 .expect("no state entry for valid channel");
143 let n = ::std::cmp::min(buf.len(), state.data.len() - state.data_start);
144 (&mut buf[..n]).copy_from_slice(&state.data[state.data_start..(state.data_start + n)]);
145
146 state.data_start += n;
151 if state.data_start == state.data.len() {
152 state.data_start = 0;
153 state.data.clear();
154 }
155
156 state.read_notify = None;
157 if n == 0 && !state.eof {
158 state.read_notify = Some(futures::task::current());
159 Err(io::Error::new(io::ErrorKind::WouldBlock, ""))
160 } else {
161 Ok(n)
162 }
163 }
164}
165impl AsyncRead for Channel {}
175