busrt 0.3.6

Local and network IPC bus
Documentation
use async_trait::async_trait;
use atty::Stream;
use busrt::client::AsyncClient;
use busrt::common::{BrokerInfo, BrokerStats, ClientList};
use busrt::ipc::{Client, Config};
use busrt::rpc::{DummyHandlers, Rpc, RpcClient, RpcError, RpcEvent, RpcHandlers, RpcResult};
use busrt::{empty_payload, Error, Frame, QoS};
use clap::{Parser, Subcommand};
use colored::Colorize;
use log::{error, info};
use num_format::{Locale, ToFormattedString};
use serde_value::Value;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::Mutex;
use tokio::time::sleep;

#[macro_use]
extern crate bma_benchmark;

#[cfg(not(feature = "std-alloc"))]
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

#[macro_use]
extern crate prettytable;

trait ToDebugString<T> {
    fn to_debug_string(&self) -> String;
}

impl<T> ToDebugString<T> for T
where
    T: std::fmt::Debug,
{
    #[inline]
    fn to_debug_string(&self) -> String {
        format!("{:?}", self)
    }
}

#[derive(Subcommand, Clone)]
enum BrokerCommand {
    #[clap(name = "client.list")]
    ClientList,
    #[clap(name = "info")]
    Info,
    #[clap(name = "stats")]
    Stats,
    #[clap(name = "test")]
    Test,
}

#[derive(Parser, Clone)]
struct ListenCommand {
    #[clap(short = 't', long = "topics", help = "Subscribe to topics")]
    topics: Vec<String>,
}

#[derive(Parser, Clone)]
struct TargetPayload {
    #[clap()]
    target: String,
    #[clap(help = "payload string or empty for stdin")]
    payload: Option<String>,
}

#[derive(Parser, Clone)]
struct RpcCall {
    #[clap()]
    target: String,
    #[clap()]
    method: String,
    #[clap(help = "payload string key=value, '-' for stdin payload")]
    params: Vec<String>,
}

#[derive(Parser, Clone)]
struct PublishCommand {
    #[clap()]
    topic: String,
    #[clap(help = "payload string or empty for stdin")]
    payload: Option<String>,
}

#[derive(Subcommand, Clone)]
enum RpcCommand {
    Listen(RpcListenCommand),
    Notify(TargetPayload),
    Call0(RpcCall),
    Call(RpcCall),
}

#[derive(Parser, Clone)]
struct RpcListenCommand {
    #[clap(short = 't', long = "topics", help = "Subscribe to topics")]
    topics: Vec<String>,
}

#[derive(Parser, Clone)]
struct BenchmarkCommand {
    #[clap(short = 'w', long = "workers", default_value = "1")]
    workers: u32,
    #[clap(long = "payload-size", default_value = "100")]
    payload_size: usize,
    #[clap(short = 'i', long = "iters", default_value = "1000000")]
    iters: u32,
}

#[derive(Clone, Subcommand)]
enum Command {
    #[clap(subcommand)]
    Broker(BrokerCommand),
    Listen(ListenCommand),
    r#Send(TargetPayload),
    Publish(PublishCommand),
    #[clap(subcommand)]
    Rpc(RpcCommand),
    Benchmark(BenchmarkCommand),
}

#[derive(Parser)]
#[clap(version = busrt::VERSION, author = busrt::AUTHOR)]
struct Opts {
    #[clap(name = "socket path or host:port")]
    path: String,
    #[clap(short = 'n', long = "name")]
    name: Option<String>,
    #[clap(long = "buf-size", default_value = "8192")]
    buf_size: usize,
    #[clap(long = "queue-size", default_value = "8192")]
    queue_size: usize,
    #[clap(long = "timeout", default_value = "5")]
    timeout: f32,
    #[clap(short = 'v', long = "verbose")]
    verbose: bool,
    #[clap(short = 's', long = "silent", help = "suppress logging")]
    silent: bool,
    #[clap(subcommand)]
    command: Command,
}

fn ctable(titles: Vec<&str>) -> prettytable::Table {
    let mut table = prettytable::Table::new();
    let format = prettytable::format::FormatBuilder::new()
        .column_separator(' ')
        .borders(' ')
        .separators(
            &[prettytable::format::LinePosition::Title],
            prettytable::format::LineSeparator::new('-', '-', '-', '-'),
        )
        .padding(0, 1)
        .build();
    table.set_format(format);
    let mut titlevec: Vec<prettytable::Cell> = Vec::new();
    for t in titles {
        titlevec.push(prettytable::Cell::new(t).style_spec("Fb"));
    }
    table.set_titles(prettytable::Row::new(titlevec));
    table
}

#[inline]
fn decode_msgpack(payload: &[u8]) -> Result<Value, rmp_serde::decode::Error> {
    rmp_serde::from_slice(payload)
}

#[inline]
fn decode_json(payload: &str) -> Result<BTreeMap<Value, Value>, serde_json::Error> {
    serde_json::from_str(payload)
}

async fn print_payload(payload: &[u8], silent: bool) {
    let mut isstr = true;
    for p in payload {
        if *p < 9 {
            isstr = false;
            break;
        }
    }
    if isstr {
        if let Ok(s) = std::str::from_utf8(payload) {
            if let Ok(j) = decode_json(s) {
                if !silent {
                    println!("JSON:");
                }
                println!(
                    "{}",
                    if silent {
                        serde_json::to_string(&j)
                    } else {
                        serde_json::to_string_pretty(&j)
                    }
                    .unwrap()
                );
            } else {
                println!("{} {}", if silent { "" } else { "STR: " }, s);
            }
            return;
        }
    }
    if let Ok(data) = decode_msgpack(payload) {
        if !silent {
            println!("MSGPACK:");
        }
        if let Ok(s) = if silent {
            serde_json::to_string(&data)
        } else {
            serde_json::to_string_pretty(&data)
        } {
            println!("{}", s);
        } else {
            print_hex(payload);
        }
    } else if silent {
        let mut stdout = tokio::io::stdout();
        stdout.write_all(payload).await.unwrap();
    } else {
        print_hex(payload);
    }
}

fn print_hex(payload: &[u8]) {
    let (p, dots) = if payload.len() > 256 {
        (&payload[..256], "...")
    } else {
        #[allow(clippy::redundant_slicing)]
        (&payload[..], "")
    };
    println!("HEX: {}{}", hex::encode(p), dots);
}

#[inline]
fn sep() {
    println!("{}", "----".dimmed());
}

macro_rules! fnum {
    ($n: expr) => {
        $n.to_formatted_string(&Locale::en).replace(',', "_")
    };
}

macro_rules! ok {
    () => {
        println!("{}", "OK".green());
    };
}

#[allow(clippy::needless_for_each)]
async fn subscribe_topics(client: &mut Client, topics: &[String]) -> Result<(), Error> {
    topics
        .iter()
        .for_each(|t| info!("subscribing to the topic {}", t.yellow()));
    client
        .subscribe_bulk(
            &topics.iter().map(String::as_str).collect::<Vec<&str>>(),
            QoS::Processed,
        )
        .await
        .unwrap()
        .unwrap()
        .await
        .unwrap()
}

async fn print_frame(frame: &Frame) {
    info!("Incoming frame {} byte(s)", fnum!(frame.payload().len()));
    println!(
        "{} from {} ({})",
        frame.kind().to_debug_string().yellow(),
        frame.sender().bold(),
        frame.primary_sender()
    );
    if let Some(topic) = frame.topic() {
        println!("topic: {}", topic.magenta());
    }
    print_payload(frame.payload(), false).await;
    sep();
}

struct Handlers {}

#[async_trait]
impl RpcHandlers for Handlers {
    async fn handle_frame(&self, frame: Frame) {
        print_frame(&frame).await;
    }
    async fn handle_notification(&self, event: RpcEvent) {
        info!(
            "Incoming RPC notification {} byte(s)",
            fnum!(event.payload().len())
        );
        println!(
            "{} from {} ({})",
            event.kind().to_debug_string().yellow(),
            event.sender().bold(),
            event.primary_sender()
        );
        print_payload(event.payload(), false).await;
        sep();
    }
    async fn handle_call(&self, event: RpcEvent) -> RpcResult {
        info!("Incoming RPC call");
        println!(
            "method: {}",
            event
                .parse_method()
                .map_or_else(
                    |_| format!("HEX: {}", hex::encode(event.method())),
                    ToOwned::to_owned
                )
                .blue()
                .bold()
        );
        println!(
            "from {} ({})",
            event.sender().bold(),
            event.primary_sender()
        );
        print_payload(event.payload(), false).await;
        sep();
        Ok(None)
    }
}

async fn read_stdin() -> Vec<u8> {
    let mut stdin = tokio::io::stdin();
    let mut buf: Vec<u8> = Vec::new();
    if atty::is(Stream::Stdin) {
        println!("Reading stdin, Ctrl-D to finish...");
    }
    stdin.read_to_end(&mut buf).await.unwrap();
    buf
}

async fn get_payload(candidate: &Option<String>) -> Vec<u8> {
    if let Some(p) = candidate {
        p.as_bytes().to_vec()
    } else {
        read_stdin().await
    }
}

async fn create_client(opts: &Opts, name: &str) -> Client {
    let config = Config::new(&opts.path, name)
        .buf_size(opts.buf_size)
        .queue_size(opts.queue_size)
        .timeout(Duration::from_secs_f32(opts.timeout));
    Client::connect(&config)
        .await
        .expect("Unable to connect to the busrt broker")
}

macro_rules! bm_finish {
    ($iters: expr, $futs: expr) => {
        while let Some(f) = $futs.pop() {
            f.await.unwrap();
        }
        staged_benchmark_finish_current!($iters);
    };
}

async fn benchmark_client(
    opts: &Opts,
    client_name: &str,
    iters: u32,
    workers: u32,
    payload_size: usize,
) {
    let iters_worker = iters / workers;
    let data = Arc::new(vec![0xee; payload_size]);
    let mut clients = Vec::new();
    let mut ecs = Vec::new();
    let mut cnns = Vec::new();
    let mut futs = Vec::new();
    for w in 0..workers {
        let cname = format!("{}-{}", client_name, w + 1);
        let cname_null = format!("{}-{}-null", client_name, w + 1);
        let mut client = create_client(opts, &cname).await;
        let rx = client.take_event_channel().unwrap();
        clients.push(Arc::new(Mutex::new(client)));
        ecs.push(Arc::new(Mutex::new(rx)));
        cnns.push(cname_null);
    }
    macro_rules! clear {
        () => {
            for e in &ecs {
                let rx = e.lock().await;
                while !rx.is_empty() {
                    let _r = rx.recv().await;
                }
            }
        };
    }
    macro_rules! spawn_sender {
        ($client: expr, $target: expr, $payload: expr, $qos: expr) => {
            futs.push(tokio::spawn(async move {
                let mut client = $client.lock().await;
                for _ in 0..iters_worker {
                    let result = client
                        .send(&$target, $payload.clone().into(), $qos)
                        .await
                        .unwrap();
                    if $qos.needs_ack() {
                        let _r = result.unwrap().await.unwrap();
                    }
                }
            }));
        };
    }
    for q in &[(QoS::No, "no"), (QoS::RealtimeProcessed, "processed")] {
        let qos = q.0;
        clear!();
        staged_benchmark_start!(&format!("send.qos.{}", q.1));
        for w in 0..workers {
            let client = clients[w as usize].clone();
            let payload = data.clone();
            let target = cnns[w as usize].clone();
            spawn_sender!(client, target, payload, qos);
        }
        bm_finish!(iters, futs);
    }
    for q in &[(QoS::No, "no"), (QoS::RealtimeProcessed, "processed")] {
        let qos = q.0;
        clear!();
        staged_benchmark_start!(&format!("send+recv.qos.{}", q.1));
        for w in 0..workers {
            let client = clients[w as usize].clone();
            let target = client.lock().await.get_name().to_owned();
            let payload = data.clone();
            spawn_sender!(client, target, payload, qos);
            let crx = ecs[w as usize].clone();
            futs.push(tokio::spawn(async move {
                let rx = crx.lock().await;
                let mut cnt = 0;
                while cnt < iters_worker {
                    let _r = rx.recv().await;
                    cnt += 1;
                }
            }));
        }
        bm_finish!(iters, futs);
    }
}

struct BenchmarkHandlers {}

#[async_trait]
impl RpcHandlers for BenchmarkHandlers {
    async fn handle_frame(&self, _frame: Frame) {}
    async fn handle_notification(&self, _event: RpcEvent) {}
    async fn handle_call(&self, event: RpcEvent) -> RpcResult {
        if event.parse_method()? == "benchmark.selftest" {
            Ok(Some(event.payload().to_vec()))
        } else {
            Err(RpcError::method(None))
        }
    }
}

async fn benchmark_rpc(
    opts: &Opts,
    client_name: &str,
    iters: u32,
    workers: u32,
    payload_size: usize,
) {
    let iters_worker = iters / workers;
    let data = Arc::new(vec![0xee; payload_size]);
    let mut rpcs = Vec::new();
    let mut cnns = Vec::new();
    let mut futs = Vec::new();
    for w in 0..workers {
        let cname = format!("{}-{}", client_name, w + 1);
        let cname_null = format!("{}-{}-null", client_name, w + 1);
        let client = create_client(opts, &cname).await;
        let rpc = RpcClient::new(client, BenchmarkHandlers {});
        rpcs.push(Arc::new(Mutex::new(rpc)));
        cnns.push(cname_null);
    }
    macro_rules! spawn_caller {
        ($rpc: expr, $target: expr, $method: expr, $payload: expr, $cr: expr) => {
            futs.push(tokio::spawn(async move {
                let rpc = $rpc.lock().await;
                for _ in 0..iters_worker {
                    if $cr {
                        let result = rpc
                            .call(
                                &$target,
                                $method,
                                $payload.clone().into(),
                                QoS::RealtimeProcessed,
                            )
                            .await
                            .unwrap();
                        assert_eq!(result.payload(), *$payload);
                    } else {
                        let result = rpc
                            .call0(
                                &$target,
                                $method,
                                $payload.clone().into(),
                                QoS::RealtimeProcessed,
                            )
                            .await
                            .unwrap();
                        let _r = result.unwrap().await.unwrap();
                    }
                }
            }));
        };
    }
    staged_benchmark_start!("rpc.call");
    for w in 0..workers {
        let rpc = rpcs[w as usize].clone();
        let payload = data.clone();
        spawn_caller!(rpc, ".broker", "benchmark.test", payload, true);
    }
    bm_finish!(iters, futs);
    staged_benchmark_start!("rpc.call+handle");
    for w in 0..workers {
        let rpc = rpcs[w as usize].clone();
        let target = rpc.lock().await.client().lock().await.get_name().to_owned();
        let payload = data.clone();
        spawn_caller!(rpc, target, "benchmark.selftest", payload, true);
    }
    bm_finish!(iters, futs);
    staged_benchmark_start!("rpc.call0");
    for w in 0..workers {
        let rpc = rpcs[w as usize].clone();
        let target = cnns[w as usize].clone();
        let payload = data.clone();
        spawn_caller!(rpc, target, "test", payload, false);
    }
    bm_finish!(iters, futs);
}

#[allow(clippy::too_many_lines)]
#[tokio::main(worker_threads = 1)]
async fn main() {
    let opts = Opts::parse();
    let client_name = opts.name.as_ref().map_or_else(
        || {
            format!(
                "cli.{}.{}",
                hostname::get()
                    .expect("Unable to get hostname")
                    .to_str()
                    .expect("Unable to parse hostname"),
                std::process::id()
            )
        },
        ToOwned::to_owned,
    );
    if !opts.silent {
        env_logger::Builder::new()
            .target(env_logger::Target::Stdout)
            .filter_level(if opts.verbose {
                log::LevelFilter::Trace
            } else {
                log::LevelFilter::Info
            })
            .init();
    }
    info!(
        "Connecting to {}, using service name {}",
        opts.path, client_name
    );
    macro_rules! prepare_rpc_call {
        ($c: expr, $client: expr) => {{
            let rpc = RpcClient::new($client, DummyHandlers {});
            let payload = if $c.params.len() == 1 && $c.params[0] == "-" {
                read_stdin().await
            } else if $c.params.is_empty() {
                Vec::new()
            } else {
                let s = $c.params.iter().map(String::as_str).collect::<Vec<&str>>();
                rmp_serde::to_vec_named(&busrt::common::str_to_params_map(&s).unwrap()).unwrap()
            };
            (rpc, payload)
        }};
    }
    match opts.command {
        Command::Broker(ref op) => {
            let client = create_client(&opts, &client_name).await;
            match op {
                BrokerCommand::ClientList => {
                    let rpc = RpcClient::new(client, DummyHandlers {});
                    let result = rpc
                        .call(".broker", "client.list", empty_payload!(), QoS::Processed)
                        .await
                        .unwrap();
                    let mut clients: ClientList = rmp_serde::from_slice(result.payload()).unwrap();
                    clients.clients.sort();
                    let mut table = ctable(vec![
                        "name", "type", "source", "port", "r_frames", "r_bytes", "w_frames",
                        "w_bytes", "queue", "ins",
                    ]);
                    for c in clients.clients {
                        if c.name != client_name {
                            table.add_row(row![
                                c.name,
                                c.kind,
                                c.source.unwrap_or_default(),
                                c.port.unwrap_or_default(),
                                fnum!(c.r_frames),
                                fnum!(c.r_bytes),
                                fnum!(c.w_frames),
                                fnum!(c.w_bytes),
                                fnum!(c.queue),
                                fnum!(c.instances),
                            ]);
                        }
                    }
                    table.printstd();
                }
                BrokerCommand::Stats => {
                    let rpc = RpcClient::new(client, DummyHandlers {});
                    let result = rpc
                        .call(".broker", "stats", empty_payload!(), QoS::Processed)
                        .await
                        .unwrap();
                    let stats: BrokerStats = rmp_serde::from_slice(result.payload()).unwrap();
                    let mut table = ctable(vec!["field", "value"]);
                    table.add_row(row!["r_frames", stats.r_frames]);
                    table.add_row(row!["r_bytes", stats.r_bytes]);
                    table.add_row(row!["w_frames", stats.w_frames]);
                    table.add_row(row!["w_bytes", stats.w_bytes]);
                    table.add_row(row!["uptime", stats.uptime]);
                    table.printstd();
                }
                BrokerCommand::Info => {
                    let rpc = RpcClient::new(client, DummyHandlers {});
                    let result = rpc
                        .call(".broker", "info", empty_payload!(), QoS::Processed)
                        .await
                        .unwrap();
                    let info: BrokerInfo = rmp_serde::from_slice(result.payload()).unwrap();
                    let mut table = ctable(vec!["field", "value"]);
                    table.add_row(row!["author", info.author]);
                    table.add_row(row!["version", info.version]);
                    table.printstd();
                }
                BrokerCommand::Test => {
                    let rpc = RpcClient::new(client, DummyHandlers {});
                    let result = rpc
                        .call(".broker", "test", empty_payload!(), QoS::Processed)
                        .await
                        .unwrap();
                    print_payload(result.payload(), opts.silent).await;
                }
            }
        }
        Command::Listen(ref cmd) => {
            let mut client = create_client(&opts, &client_name).await;
            subscribe_topics(&mut client, &cmd.topics).await.unwrap();
            sep();
            let rx = client.take_event_channel().unwrap();
            println!(
                "Listening to messages for {} ...",
                client_name.cyan().bold()
            );
            let fut = tokio::spawn(async move {
                while let Ok(frame) = rx.recv().await {
                    print_frame(&frame).await;
                }
            });
            let sleep_step = Duration::from_millis(100);
            while client.is_connected() {
                sleep(sleep_step).await;
            }
            fut.abort();
        }
        Command::r#Send(ref cmd) => {
            let mut client = create_client(&opts, &client_name).await;
            let payload = get_payload(&cmd.payload).await;
            let fut = if cmd.target.contains(&['*', '?'][..]) {
                client.send_broadcast(&cmd.target, payload.into(), QoS::Processed)
            } else {
                client.send(&cmd.target, payload.into(), QoS::Processed)
            };
            fut.await.unwrap().unwrap().await.unwrap().unwrap();
            ok!();
        }
        Command::Publish(ref cmd) => {
            let mut client = create_client(&opts, &client_name).await;
            let payload = get_payload(&cmd.payload).await;
            client
                .publish(&cmd.topic, payload.into(), QoS::Processed)
                .await
                .unwrap()
                .unwrap()
                .await
                .unwrap()
                .unwrap();
            ok!();
        }
        Command::Rpc(ref r) => {
            let mut client = create_client(&opts, &client_name).await;
            match r {
                RpcCommand::Listen(cmd) => {
                    subscribe_topics(&mut client, &cmd.topics).await.unwrap();
                    let rpc = RpcClient::new(client, Handlers {});
                    sep();
                    println!(
                        "Listening to RPC messages for {} ...",
                        client_name.cyan().bold()
                    );
                    let sleep_step = Duration::from_millis(100);
                    while rpc.is_connected() {
                        sleep(sleep_step).await;
                    }
                }
                RpcCommand::Notify(cmd) => {
                    let rpc = RpcClient::new(client, DummyHandlers {});
                    let payload = get_payload(&cmd.payload).await;
                    rpc.notify(&cmd.target, payload.into(), QoS::Processed)
                        .await
                        .unwrap()
                        .unwrap()
                        .await
                        .unwrap()
                        .unwrap();
                    ok!();
                }
                RpcCommand::Call0(cmd) => {
                    let (rpc, payload) = prepare_rpc_call!(cmd, client);
                    rpc.call0(&cmd.target, &cmd.method, payload.into(), QoS::Processed)
                        .await
                        .unwrap()
                        .unwrap()
                        .await
                        .unwrap()
                        .unwrap();
                    ok!();
                }
                RpcCommand::Call(cmd) => {
                    let (rpc, payload) = prepare_rpc_call!(cmd, client);
                    match rpc
                        .call(&cmd.target, &cmd.method, payload.into(), QoS::Processed)
                        .await
                    {
                        Ok(result) => print_payload(result.payload(), opts.silent).await,
                        Err(e) => {
                            let message = e
                                .data()
                                .map_or("", |data| std::str::from_utf8(data).unwrap_or(""));
                            error!("RPC Error {}: {}", e.code(), message);
                            std::process::exit(1);
                        }
                    }
                }
            }
        }
        Command::Benchmark(ref cmd) => {
            println!(
                "Starting benchmark, {} worker(s), {} iters, {} iters/worker, {} byte(s) payload",
                cmd.workers.to_string().blue().bold(),
                fnum!(cmd.iters).yellow(),
                fnum!(cmd.iters / cmd.workers).bold(),
                fnum!(cmd.payload_size).cyan()
            );
            benchmark_client(
                &opts,
                &client_name,
                cmd.iters,
                cmd.workers,
                cmd.payload_size,
            )
            .await;
            benchmark_rpc(
                &opts,
                &client_name,
                cmd.iters,
                cmd.workers,
                cmd.payload_size,
            )
            .await;
            staged_benchmark_print!();
        }
    }
}