use tokio_io::{AsyncRead, AsyncWrite};
use futures::{self, Async, Future, Poll};
use std::io::prelude::*;
use std::io;
use thrussh;
use session;
use SharableConnection;
#[derive(Default)]
pub(crate) struct State {
pub(crate) closed: bool,
pub(crate) read_notify: Option<futures::task::Task>,
pub(crate) data_start: usize,
pub(crate) data: Vec<u8>,
pub(crate) eof: bool,
pub(crate) exit_notify: Option<futures::task::Task>,
pub(crate) exit_status: Option<u32>,
pub(crate) open_notify: Option<futures::task::Task>,
pub(crate) open_state: Option<Result<(), thrussh::ChannelOpenFailure>>,
}
pub struct ChannelOpenFuture<'a, S: AsyncRead + AsyncWrite> {
cmd: &'a str,
session: SharableConnection<S>,
state: session::state::Ref,
id: thrussh::ChannelId,
first_round: bool,
}
impl<'a, S: AsyncRead + AsyncWrite> ChannelOpenFuture<'a, S> {
pub(crate) fn new(
cmd: &'a str,
session: SharableConnection<S>,
state: session::state::Ref,
id: thrussh::ChannelId,
) -> Self {
ChannelOpenFuture {
cmd,
session,
state,
id,
first_round: true,
}
}
}
impl<'a, S: AsyncRead + AsyncWrite + thrussh::Tcp> Future for ChannelOpenFuture<'a, S> {
type Item = Channel;
type Error = thrussh::HandlerError<()>;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
if self.first_round {
self.session.0.borrow_mut().c.abort_read()?;
self.first_round = false;
}
let mut s = self.state.borrow_mut();
let state = s.state_for
.get_mut(&self.id)
.expect("no state entry for valid channel");
state.open_notify = None;
match state.open_state.take() {
Some(Ok(_)) => {
{
let mut s = self.session.0.borrow_mut();
assert!(s.c.channel_is_open(self.id));
s.c.exec(self.id, true, self.cmd);
s.task.take().unwrap().notify();
}
Ok(Async::Ready(Channel {
state: self.state.clone(),
id: self.id,
}))
}
Some(Err(e)) => Err(thrussh::HandlerError::Error(thrussh::Error::IO(
io::Error::new(io::ErrorKind::Other, format!("{:?}", e)),
))),
None => {
state.open_notify = Some(futures::task::current());
Ok(Async::NotReady)
}
}
}
}
pub struct Channel {
state: session::state::Ref,
id: thrussh::ChannelId,
}
pub struct ExitStatusFuture {
state: session::state::Ref,
id: thrussh::ChannelId,
}
impl Channel {
pub fn exit_status(self) -> ExitStatusFuture {
ExitStatusFuture {
state: self.state,
id: self.id,
}
}
}
impl Future for ExitStatusFuture {
type Item = u32;
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut s = self.state.borrow_mut();
let state = s.state_for
.get_mut(&self.id)
.expect("no state entry for valid channel");
state.exit_notify = None;
if let Some(e) = state.exit_status {
Ok(Async::Ready(e))
} else if state.closed {
Err(())
} else {
state.exit_notify = Some(futures::task::current());
Ok(Async::NotReady)
}
}
}
impl Read for Channel {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let mut s = self.state.borrow_mut();
let state = s.state_for
.get_mut(&self.id)
.expect("no state entry for valid channel");
let n = ::std::cmp::min(buf.len(), state.data.len() - state.data_start);
(&mut buf[..n]).copy_from_slice(&state.data[state.data_start..(state.data_start + n)]);
state.data_start += n;
if state.data_start == state.data.len() {
state.data_start = 0;
state.data.clear();
}
state.read_notify = None;
if n == 0 && !state.eof {
state.read_notify = Some(futures::task::current());
Err(io::Error::new(io::ErrorKind::WouldBlock, ""))
} else {
Ok(n)
}
}
}
impl AsyncRead for Channel {}