websocat 1.11.0

Command-line client for web sockets, like netcat/curl/socat for ws://.
Documentation
#[cfg(unix)]
extern crate tokio_file_unix;
extern crate tokio_reactor;
#[cfg(all(unix, feature = "signal_handler"))]
extern crate tokio_signal;
extern crate tokio_stdin_stdout;

use futures;
use futures::future::Future;
use std;
use std::cell::RefCell;
use std::io::Result as IoResult;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::rc::Rc;
use tokio_io::{AsyncRead, AsyncWrite};

#[cfg(unix)]
use self::tokio_file_unix::File as UnixFile;
use std::fs::{File as FsFile, OpenOptions};

use super::{BoxedNewPeerFuture, Peer, Result};
use futures::Stream;

use super::{once, spawn_hack, ConstructParams, PeerConstructor, Specifier};

#[derive(Clone, Debug)]
pub struct AsyncStdio;
impl Specifier for AsyncStdio {
    fn construct(&self, p: ConstructParams) -> PeerConstructor {
        let ret;
        ret = get_stdio_peer(&mut p.global(GlobalState::default));
        once(ret)
    }
    specifier_boilerplate!(globalstate singleconnect no_subspec);
}

specifier_class!(
    name = AsyncStdioClass,
    target = AsyncStdio,
    prefixes = ["asyncstdio:"],
    arg_handling = noarg,
    overlay = false,
    StreamOriented,
    SingleConnect,
    help = r#"
[A] Set stdin and stdout to nonblocking mode, then use it as a communication counterpart. UNIX-only.
May cause problems with programs running at the same terminal. This specifier backs the `--async-stdio` CLI option. 

Typically this specifier can be specified only one time.
    
Example: simulate `cat(1)`. This is an exception from "only one time" rule above:

    websocat - -

Example: SSH transport

    ssh -c ProxyCommand='websocat asyncstdio: ws://myserver/mywebsocket' user@myserver
"#
);


specifier_class!(
    name = InetdClass,
    target = AsyncStdio,
    prefixes = ["inetd:"],
    arg_handling = noarg,
    overlay = false,
    StreamOriented,
    SingleConnect,
    help = r#"
Like `asyncstdio:`, but intended for inetd(8) usage. [A]

Automatically enables `-q` (`--quiet`) mode.

`inetd-ws:` - is of `ws-l:inetd:`

Example of inetd.conf line that makes it listen for websocket
connections on port 1234 and redirect the data to local SSH server.

    1234 stream tcp nowait myuser  /opt/websocat websocat inetd-ws: tcp:127.0.0.1:22
"#
);

#[derive(Clone, Debug)]
pub struct OpenAsync(pub PathBuf);
impl Specifier for OpenAsync {
    fn construct(&self, _: ConstructParams) -> PeerConstructor {
        let ret;
        ret = get_file_peer(&self.0);
        once(ret)
    }
    specifier_boilerplate!(noglobalstate singleconnect no_subspec);
}
specifier_class!(
    name = OpenAsyncClass,
    target = OpenAsync,
    prefixes = ["open-async:"],
    arg_handling = into,
    overlay = false,
    MessageOriented, // ?
    SingleConnect,
    help = r#"
Open file for read and write and use it like a socket. [A]
Not for regular files, see readfile/writefile instead.
  
Example: Serve big blobs of random data to clients

    websocat -U ws-l:127.0.0.1:8088 open-async:/dev/urandom

"#
);

#[derive(Clone, Debug)]
pub struct OpenFdAsync(pub i32);
impl Specifier for OpenFdAsync {
    fn construct(&self, _: ConstructParams) -> PeerConstructor {
        let ret;
        ret = get_fd_peer(self.0);
        once(ret)
    }
    specifier_boilerplate!(noglobalstate singleconnect no_subspec);
}
specifier_class!(
    name = OpenFdAsyncClass,
    target = OpenFdAsync,
    prefixes = ["open-fd:"],
    arg_handling = parse,
    overlay = false,
    MessageOriented, // ?
    SingleConnect,
    help = r#"
Use specified file descriptor like a socket. [A]

Example: Serve random data to clients v2

    websocat -U ws-l:127.0.0.1:8088 reuse:open-fd:55   55< /dev/urandom
"#
);

fn get_stdio_peer_impl(s: &mut GlobalState) -> Result<Peer> {
    let si;
    let so;
    {
        if !UnixFile::raw_new(std::io::stdin()).get_nonblocking()? {
            debug!("Setting stdin to nonblocking mode");
            s.need_to_restore_stdin_blocking_status = true;
        }
        let stdin = self::UnixFile::new_nb(std::io::stdin())?;

        if !UnixFile::raw_new(std::io::stdout()).get_nonblocking()? {
            debug!("Setting stdout to nonblocking mode");
            s.need_to_restore_stdout_blocking_status = true;
        }
        let stdout = self::UnixFile::new_nb(std::io::stdout())?;

        si = stdin.into_reader(&tokio_reactor::Handle::default())?;
        so = stdout.into_io(&tokio_reactor::Handle::default())?;

        let s_clone = s.clone();

        #[cfg(all(unix, feature = "signal_handler"))]
        {
            debug!("Installing signal handler");
            let ctrl_c = tokio_signal::ctrl_c().flatten_stream();
            let prog = ctrl_c.for_each(move |()| {
                restore_blocking_status(&s_clone);
                ::std::process::exit(0);
                #[allow(unreachable_code)]
                Ok(())
            });
            spawn_hack(Box::new(prog.map_err(|_| ())));
        }
    }
    Ok(Peer::new(si, so, None))
}

pub fn get_stdio_peer(s: &mut GlobalState) -> BoxedNewPeerFuture {
    debug!("get_stdio_peer (async)");
    Box::new(futures::future::result(get_stdio_peer_impl(s))) as BoxedNewPeerFuture
}

#[derive(Default, Clone)]
pub struct GlobalState {
    need_to_restore_stdin_blocking_status: bool,
    need_to_restore_stdout_blocking_status: bool,
}

impl Drop for GlobalState {
    fn drop(&mut self) {
        restore_blocking_status(self);
    }
}

fn restore_blocking_status(s: &GlobalState) {
    {
        debug!("restore_blocking_status");
        if s.need_to_restore_stdin_blocking_status {
            debug!("Restoring blocking status for stdin");
            let _ = UnixFile::raw_new(std::io::stdin()).set_nonblocking(false);
        }
        if s.need_to_restore_stdout_blocking_status {
            debug!("Restoring blocking status for stdout");
            let _ = UnixFile::raw_new(std::io::stdout()).set_nonblocking(false);
        }
    }
}

type ImplPollEvented = ::tokio_reactor::PollEvented<UnixFile<std::fs::File>>;

#[derive(Clone)]
struct FileWrapper(Rc<RefCell<ImplPollEvented>>);

impl AsyncRead for FileWrapper {}
impl Read for FileWrapper {
    fn read(&mut self, buf: &mut [u8]) -> std::result::Result<usize, std::io::Error> {
        self.0.borrow_mut().read(buf)
    }
}

impl AsyncWrite for FileWrapper {
    fn shutdown(&mut self) -> futures::Poll<(), std::io::Error> {
        self.0.borrow_mut().shutdown()
    }
}
impl Write for FileWrapper {
    fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
        self.0.borrow_mut().write(buf)
    }
    fn flush(&mut self) -> IoResult<()> {
        self.0.borrow_mut().flush()
    }
}

fn get_file_peer_impl(p: &Path) -> Result<Peer> {
    let oo = OpenOptions::new()
        .read(true)
        .write(true)
        .create(false)
        .open(p)?;
    let f = self::UnixFile::new_nb(oo)?;

    let s = f.into_io(&tokio_reactor::Handle::default())?;
    let ss = FileWrapper(Rc::new(RefCell::new(s)));
    Ok(Peer::new(ss.clone(), ss, None))
}

pub fn get_file_peer(p: &Path) -> BoxedNewPeerFuture {
    debug!("get_file_peer");
    Box::new(futures::future::result(get_file_peer_impl(p))) as BoxedNewPeerFuture
}

fn get_fd_peer_impl(fd: i32) -> Result<Peer> {
    let ff: FsFile = unsafe { std::os::unix::io::FromRawFd::from_raw_fd(fd) };
    let f = self::UnixFile::new_nb(ff)?;

    let s = f.into_io(&tokio_reactor::Handle::default())?;
    let ss = FileWrapper(Rc::new(RefCell::new(s)));
    Ok(Peer::new(ss.clone(), ss, None))
}

pub fn get_fd_peer(fd: i32) -> BoxedNewPeerFuture {
    debug!("get_fd_peer");
    Box::new(futures::future::result(get_fd_peer_impl(fd))) as BoxedNewPeerFuture
}