wspty 0.1.0

xterm.js example with Rust Tokio and pty backend server
Documentation
extern crate env_logger;
extern crate futures;
extern crate tokio;
use bytes::BytesMut;
use futures::SinkExt;
use futures::StreamExt;
use futures_util::stream::{SplitSink, SplitStream};
use log::{debug, error};
use serde_derive::Deserialize;
use std::collections::HashMap;
use std::net::SocketAddr;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::process::Command;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio_tungstenite::{accept_async, WebSocketStream};
use tungstenite::Message;
use wspty::{PtyCommand, PtyMaster};

#[tokio::main]
async fn main() {
    env_logger::init();
    let _ = ws_server()
        .await
        .map_err(|e| debug!("ws server exit with error: {:?}", e));
}

#[derive(Deserialize, Debug)]
struct WindowSize {
    cols: u16,
    rows: u16,
}

async fn handle_websocket_incoming(
    mut incoming: SplitStream<WebSocketStream<TcpStream>>,
    mut pty_shell_writer: PtyMaster,
    websocket_sender: UnboundedSender<Message>,
    stop_sender: UnboundedSender<()>,
) -> Result<(), anyhow::Error> {
    while let Some(Ok(msg)) = incoming.next().await {
        match msg {
            Message::Binary(data) => match data[0] {
                0 => {
                    if data.len().gt(&0) {
                        pty_shell_writer.write_all(&data[1..]).await?;
                    }
                }
                1 => {
                    let resize_msg: WindowSize = serde_json::from_slice(&data[1..])?;
                    pty_shell_writer.resize(resize_msg.cols, resize_msg.rows)?;
                }
                2 => {
                    websocket_sender.send(Message::Binary(vec![1u8]))?;
                }
                _ => (),
            },
            Message::Ping(data) => websocket_sender.send(Message::Pong(data))?,
            _ => (),
        };
    }
    let _ = stop_sender
        .send(())
        .map_err(|e| debug!("failed to send stop signal: {:?}", e));
    Ok(())
}

async fn handle_pty_incoming(
    mut pty_shell_reader: PtyMaster,
    websocket_sender: UnboundedSender<Message>,
) -> Result<(), anyhow::Error> {
    let fut = async move {
        let mut buffer = BytesMut::with_capacity(1024);
        loop {
            buffer.extend_from_slice(&0u8.to_be_bytes());
            buffer.resize(1024, 0u8);
            let mut tail = &mut buffer[1..];
            let n = pty_shell_reader.read_buf(&mut tail).await?;
            if n == 0 {
                break;
            }
            match websocket_sender.send(Message::Binary(buffer[..n + 1].to_vec())) {
                Ok(_) => (),
                Err(e) => anyhow::bail!("failed to send msg to client: {:?}", e),
            }
        }
        Ok::<(), anyhow::Error>(())
    };
    fut.await.map_err(|e| {
        log::error!("handle pty incoming error: {:?}", &e);
        e
    })
}

async fn write_to_websocket(
    mut outgoing: SplitSink<WebSocketStream<TcpStream>, Message>,
    mut receiver: UnboundedReceiver<Message>,
) -> Result<(), anyhow::Error> {
    while let Some(msg) = receiver.recv().await {
        outgoing.send(msg).await?;
    }
    Ok(())
}

async fn handle_connection(stream: TcpStream) -> Result<(), anyhow::Error> {
    let ws_stream = accept_async(stream).await?;
    let (ws_outgoing, ws_incoming) = ws_stream.split();
    let (sender, receiver) = unbounded_channel();
    let ws_sender = sender.clone();

    let mut cmd = Command::new("su");
    let mut envs: HashMap<String, String> = HashMap::new();
    envs.insert(
        "PATH".to_owned(),
        "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin".to_owned(),
    );
    cmd.envs(&envs).args(&["-", "jason"]);
    let mut pty_cmd = PtyCommand::from(cmd);
    let (stop_sender, stop_receiver) = unbounded_channel();
    let pty_master = pty_cmd.run(stop_receiver).await?;

    let pty_shell_writer = pty_master.clone();
    let pty_shell_reader = pty_master.clone();

    let res = tokio::select! {
        res = handle_websocket_incoming(ws_incoming, pty_shell_writer, sender, stop_sender) => res,
        res = handle_pty_incoming(pty_shell_reader, ws_sender) => res,
        res = write_to_websocket(ws_outgoing, receiver) => res,
    };
    log::debug!("res = {:?}", res);
    Ok(())
}

async fn ws_server() -> Result<(), anyhow::Error> {
    let addr: SocketAddr = "127.0.0.1:7703".parse().unwrap();
    match TcpListener::bind(addr).await {
        Ok(listener) => {
            while let Ok((stream, peer)) = listener.accept().await {
                log::debug!("handling request from {:?}", peer);
                let fut = async move {
                    let _ = handle_connection(stream)
                        .await
                        .map_err(|e| error!("handle connection error: {:?}", e));
                };
                tokio::spawn(fut);
            }
        }
        Err(e) => return Err(anyhow::anyhow!("failed to listen: {:?}", e)),
    }
    Ok(())
}