gnostr-cat 0.0.40

Command-line client for web sockets, like netcat/curl/socat for ws://.
Documentation
use std::collections::HashMap;
use std::io::{Error as IoError, Read};
use std::rc::Rc;
use std::sync::atomic::AtomicU32;
use std::sync::{Arc, Mutex};

use futures::future::ok;
use tokio_io::AsyncRead;
use wasmtime::{Caller, Linker, Memory, Module, TypedFunc};

use super::{BoxedNewPeerFuture, ConstructParams, Peer, PeerConstructor, Specifier};
use crate::peer_strerr;

#[derive(Default)]
struct Env {
    store: wasmtime::Store<()>,
    modules: HashMap<String, wasmtime::Instance>,
}

static ENV: Mutex<Option<Env>> = Mutex::new(None);
static COUNTER: AtomicU32 = AtomicU32::new(0);

#[derive(Clone)]
pub struct Handle {
    mem: Memory,
    malloc: TypedFunc<u32, u32>,
    free: TypedFunc<u32, ()>,
    transform: TypedFunc<(u32, u32, u32, u32, u32), u32>,
}
impl std::fmt::Debug for Handle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Handle").finish()
    }
}

macro_rules! declare_wasm_transform_class {
    ($PI:ident, $C:ident, $opt:ident, $prefix:literal, $help:literal $(,)?) => {
        #[derive(Debug)]
        pub struct $PI<T: Specifier>(pub T);
        impl<T: Specifier> Specifier for $PI<T> {
            fn construct(&self, cp: ConstructParams) -> PeerConstructor {
                let inner = self.0.construct(cp.clone());
                inner
                    .map(move |p, _| transform_peer(p, cp.program_options.$opt.clone()))
            }
            specifier_boilerplate!(has_subspec noglobalstate);
            self_0_is_subspecifier!(proxy_is_multiconnect);
        }
        specifier_class!(
            name = $C,
            target = $PI,
            prefixes = [$prefix],
            arg_handling = subspec,
            overlay = true,
            MessageOriented,
            MulticonnectnessDependsOnInnerType,
            help = $help
        );
    };
}

declare_wasm_transform_class!(
    WasmTransformA,
    WasmTransformAClass,
    wasm_transform_a,
    "wasm_plugin_transform_a:",
    r#"
[A] Custom overlay that transforms data being read from inner specifier by calling a function from a loaded WebAssembly plugin.
Writes go through unmodified. Multiple distinct transforms may be active at the same time, see "_b", "_c" and "_d" postfixes

Example:

    emcc --no-entry -sERROR_ON_UNDEFINED_SYMBOLS=0  -s STANDALONE_WASM  transform_plugins/sample.c  \
            -Wl,--export=websocat_transform,--export=malloc,--export=free -o transform_plugins/sample.wasm
    
    websocat -Eb ws-l:127.0.0.1:1234 wasm_plugin_transform_a:mirror: --wasm-plugin-a=transform_plugins/sample.wasm
"#
);

declare_wasm_transform_class!(
    WasmTransformB,
    WasmTransformBClass,
    wasm_transform_b,
    "wasm_plugin_transform_b:",
    "[A] Same as `wasm_plugin_transform_a`, but for other plugin slot.",
);
declare_wasm_transform_class!(
    WasmTransformC,
    WasmTransformCClass,
    wasm_transform_c,
    "wasm_plugin_transform_c:",
    "[A] Same as `wasm_plugin_transform_a`, but for other plugin slot.",
);
declare_wasm_transform_class!(
    WasmTransformD,
    WasmTransformDClass,
    wasm_transform_d,
    "wasm_plugin_transform_d:",
    "[A] Same as `wasm_plugin_transform_a`, but for other plugin slot.",
);

pub fn load_symbol(spec: &str) -> crate::Result<Handle> {
    let mut env = ENV.lock().unwrap();
    let env = env.get_or_insert_with(Default::default);

    let (libname, symname) = if let Some((before, after)) = spec.split_once('@') {
        (after, before)
    } else {
        (spec, "websocat_transform")
    };

    let instance = match env.modules.entry(libname.to_owned()) {
        std::collections::hash_map::Entry::Occupied(x) => x.into_mut(),
        std::collections::hash_map::Entry::Vacant(x) => {
            let module =
                if libname.starts_with('!') {
                    info!("Loading pre-built wasm module {}", &libname[1..]);
                    unsafe { Module::deserialize_file(env.store.engine(), &libname[1..])? }
                } else {
                    #[cfg(feature = "wasm_compiler")]
                    {
                        info!("Compiling wasm module {}", libname);
                        Module::from_file(env.store.engine(), libname)?
                    }
                    #[cfg(not(feature = "wasm_compiler"))]
                    {
                        return Err("Compiling wasm modules is not enabled in this Websocat \
                                    build. Pre-compile them using `wasmtime compile`, then \
                                    specify as `!myfilename.cwasm`")?;
                    }
                };

            let mut linker = Linker::<()>::new(env.store.engine());
            let mem_cell = Arc::new(Mutex::new(None::<Memory>));
            let mem_cell2 = mem_cell.clone();
            linker.func_wrap(
                "env",
                "websocat_log",
                move |c: Caller<()>, severity: i32, buffer: u32, mut len: u32| {
                    if len > 4096 {
                        len = 4096;
                    }
                    let mut buf = vec![0u8; len as usize];
                    mem_cell2
                        .lock()
                        .unwrap()
                        .unwrap()
                        .read(c, buffer as usize, &mut buf[..])
                        .unwrap();

                    let level = match severity {
                        x if x <= 1 => log::Level::Error,
                        2 => log::Level::Warn,
                        3 => log::Level::Info,
                        4 => log::Level::Debug,
                        _ => log::Level::Trace,
                    };
                    log::log!(level, "{}", std::string::String::from_utf8_lossy(&buf[..]));
                },
            )?;

            let instance = linker.instantiate(&mut env.store, &module).unwrap();
            let mem = instance
                .get_memory(&mut env.store, "memory")
                .ok_or_else(|| "no memory")?;
            *mem_cell.lock().unwrap() = Some(mem);
            debug!("Wasm module loaded successfully");
            x.insert(instance)
        }
    };

    debug!("Instantiating {}", spec);
    let mem = instance
        .get_memory(&mut env.store, "memory")
        .ok_or_else(|| "no memory")?;
    let transform =
        instance.get_typed_func::<(u32, u32, u32, u32, u32), u32, _>(&mut env.store, symname)?;
    let malloc = instance.get_typed_func::<u32, u32, _>(&mut env.store, "malloc")?;
    let free = instance.get_typed_func::<u32, (), _>(&mut env.store, "free")?;
    debug!("Instantiated");

    let h = Handle {
        mem,
        malloc,
        free,
        transform,
    };

    Ok(h)
}

pub fn transform_peer(inner_peer: Peer, s: Option<Handle>) -> BoxedNewPeerFuture {
    if s.is_none() {
        return peer_strerr("Symbol for wasm_plugin_transform_... is not specified");
    }

    let conn_seqn = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let s = s.unwrap();

    let mut env = ENV.lock().unwrap();
    let env = env.as_mut().unwrap();

    if let Err(_e) = s.transform.call(&mut env.store, (0, 0, 0, conn_seqn, 0)) {
        return peer_strerr("Failed to call symbol from wasm module");
    }

    let filtered_r = TransformPeer {
        inner: inner_peer.0,
        sym: s,
        seqn: 1,
        conn_seqn,
        buf: 0,
        buf_cap: 0,
    };
    let thepeer = Peer::new(filtered_r, inner_peer.1, inner_peer.2);

    Box::new(ok(thepeer)) as BoxedNewPeerFuture
}

struct TransformPeer {
    inner: Box<dyn AsyncRead>,
    sym: Handle,
    seqn: u32,
    conn_seqn: u32,
    buf: u32,
    buf_cap: u32,
}

impl TransformPeer {
    fn transform(&mut self, b: &mut [u8], l: u32, mut n: u32) -> crate::Result<u32> {
        let mut env = ENV.lock().unwrap();
        let env = env.as_mut().unwrap();

        if self.buf == 0 {
            self.buf = self.sym.malloc.call(&mut env.store, l)?;
            self.buf_cap = l;

            if self.buf == 0 {
                return Err("Allocation failed inside wasm module")?;
            }
        }

        if self.buf_cap < n {
            warn!("Trimming message to be processed in wasm due to larger read than expected");
            n = self.buf_cap;
        }
        self.sym
            .mem
            .write(&mut env.store, self.buf as usize, &b[0..(n as usize)])?;
        let mut k = self.sym.transform.call(
            &mut env.store,
            (self.buf, n, self.buf_cap, self.conn_seqn, self.seqn),
        )?;
        if k > self.buf_cap || k > l {
            warn!("Invalid return value from wasm transform function");
            k = self.buf_cap.min(l);
        }
        self.sym
            .mem
            .read(&mut env.store, self.buf as usize, &mut b[0..(k as usize)])?;

        Ok(k)
    }
}

impl Read for TransformPeer {
    fn read(&mut self, b: &mut [u8]) -> Result<usize, IoError> {
        let mut l = b.len();

        if l > i32::MAX as usize {
            l = i32::MAX as usize;
        }

        let n = match self.inner.read(&mut b[..l]) {
            Ok(x) => x,
            Err(e) => return Err(e),
        };

        if n == 0 {
            return Ok(0);
        }

        let k = match self.transform(b, l as u32, n as u32) {
            Ok(k) => k,
            Err(e) => return Err(crate::util::simple_err(format!("{}", e))),
        };
        self.seqn += 1;

        Ok(k as usize)
    }
}
impl AsyncRead for TransformPeer {}
impl Drop for TransformPeer {
    fn drop(&mut self) {
        let mut env = ENV.lock().unwrap();
        let env = env.as_mut().unwrap();
        let _ = self
            .sym
            .transform
            .call(&mut env.store, (0, 0, 0, self.conn_seqn, self.seqn));
        if self.buf != 0 {
            let _ = self.sym.free.call(&mut env.store, self.buf);
            self.buf = 0;
        }
    }
}