wampire 0.2.1

An asynchronous WAMPv2 client and router implementing the basic WAMP profile
Documentation
#![allow(dead_code)]
#![allow(unused_imports)]
#![allow(unused_variables)]
use std::{
    io,
    sync::{Arc, Mutex},
};

use log::info;

use wampire::{
    client::{Client, Connection, Subscription},
    MatchingPolicy, Value, URI,
};

/// A command entered at the interactive prompt, as recognised by `process_input`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Command {
    /// `sub`: subscribe to a topic.
    Sub,
    /// `pub`: publish to a topic.
    Pub,
    /// `unsub`: drop one of the current subscriptions.
    Unsub,
    /// `list`: print the current subscriptions.
    List,
    /// `help`: print the usage text.
    Help,
    /// `quit`: leave the event loop.
    Quit,
    /// Blank input; nothing to do.
    NoOp,
    /// Any other command word (already lower-cased), kept so it can be reported back.
    Invalid(String),
}

/// Writes the interactive prompt line to standard output.
fn print_prompt() {
    const PROMPT: &str = "Enter a command (or type \"help\")";
    println!("{}", PROMPT);
}

/// Blocks until one line has been read from standard input and returns it,
/// including any line terminator that was read.
///
/// # Panics
///
/// Panics if reading from standard input fails.
fn get_input_from_user() -> String {
    let mut line = String::new();
    let stdin = io::stdin();
    let mut handle = stdin.lock();
    io::BufRead::read_line(&mut handle, &mut line).unwrap();
    line
}

/// Splits one line of user input into a [`Command`] and its argument list.
///
/// The first whitespace-delimited word (case-insensitive) selects the command;
/// everything after it is split on commas and each piece is trimmed.
/// Surrounding whitespace on the line is ignored, so `"  list"` is still
/// `List` and `"sub \n"` yields *no* arguments rather than one empty argument.
///
/// Blank input maps to `Command::NoOp`; an unrecognised word maps to
/// `Command::Invalid` carrying the lower-cased word.
fn process_input(input: &str) -> (Command, Vec<String>) {
    // Trim first so a leading space or the trailing newline never ends up
    // being mistaken for the command word or for an (empty) argument.
    let mut parts = input.trim().splitn(2, char::is_whitespace);
    let word = parts.next().unwrap_or("").to_lowercase();
    let command = match word.as_str() {
        "pub" => Command::Pub,
        "sub" => Command::Sub,
        "unsub" => Command::Unsub,
        "list" => Command::List,
        "help" => Command::Help,
        "quit" => Command::Quit,
        "" => Command::NoOp,
        other => Command::Invalid(other.to_string()),
    };
    let args = match parts.next().map(str::trim) {
        Some(rest) if !rest.is_empty() => rest
            .split(',')
            .map(|piece| piece.trim().to_string())
            .collect(),
        _ => Vec::new(),
    };
    (command, args)
}

async fn subscribe(
    client: &mut Client,
    subscriptions: &mut Arc<Mutex<Vec<Subscription>>>,
    args: &[String],
) {
    if args.len() > 2 {
        println!("Too many arguments to subscribe.  Ignoring");
    } else if args.is_empty() {
        println!("Please specify the topic to subscribe to");
        return;
    }
    let topic = args[0].clone();
    let policy = if args.len() > 1 {
        match args[1].as_str() {
            "prefix" => MatchingPolicy::Prefix,
            "wild" => MatchingPolicy::Wildcard,
            "strict" => MatchingPolicy::Strict,
            _ => {
                println!("Invalid matching type, should be 'prefix', 'wild' or 'strict'");
                return;
            }
        }
    } else {
        MatchingPolicy::Strict
    };
    let subscriptions = Arc::clone(subscriptions);
    client
        .subscribe_with_pattern(
            URI::new(&topic),
            Box::new(move |args, kwargs| {
                println!(
                    "Received message on topic {} with args {:?} and kwargs {:?}",
                    topic, args, kwargs
                );
            }),
            policy,
        )
        .await
        .and_then(move |subscription| {
            println!("Subscribed to topic {}", subscription.topic.uri);
            subscriptions.lock().unwrap().push(subscription);
            Ok(())
        })
        .unwrap();
}

/// Cancels the subscription at the list index given by `args[0]`.
///
/// The index refers to the position in `subscriptions` (as printed by the
/// `list` command). Any arguments after the first are reported and ignored.
///
/// The entry is removed from `subscriptions` before the request is sent;
/// because `Client::unsubscribe` takes the subscription by value, it is not
/// put back if the request fails. A failure is reported on standard output
/// instead of aborting the program.
///
/// # Panics
///
/// Panics if the `subscriptions` mutex is poisoned.
async fn unsubscribe(
    client: &mut Client,
    subscriptions: &mut Arc<Mutex<Vec<Subscription>>>,
    args: &[String],
) {
    if args.is_empty() {
        println!("Please specify the index of the subscription to unsubscribe from");
        return;
    }
    if args.len() > 1 {
        // Only a warning: the first argument is still used.
        println!("Too many arguments to unsubscribe.  Ignoring");
    }
    let index = match args[0].parse::<usize>() {
        Ok(index) => index,
        Err(_) => {
            println!("Invalid subscription index: {}", args[0]);
            return;
        }
    };
    // Keep the lock scoped to this block so the guard is released before the
    // `.await` below; a `std::sync::Mutex` guard must not live across an await.
    let subscription = {
        let mut subscriptions = subscriptions.lock().unwrap();
        if index >= subscriptions.len() {
            println!("Invalid subscription index: {}", index);
            return;
        }
        subscriptions.remove(index)
    };
    let topic = subscription.topic.uri.clone();
    match client.unsubscribe(subscription).await {
        Ok(()) => println!("Successfully unsubscribed from {}", topic),
        Err(err) => println!("Failed to unsubscribe from {}: {:?}", topic, err),
    }
}

/// Prints one line per current subscription: its position in the list
/// followed by the topic URI.
///
/// # Panics
///
/// Panics if the `subscriptions` mutex is poisoned.
fn list(subscriptions: &Arc<Mutex<Vec<Subscription>>>) {
    let guard = subscriptions.lock().unwrap();
    guard
        .iter()
        .enumerate()
        .for_each(|(position, entry)| println!("{} {}", position, entry.topic.uri));
}

/// Publishes to the topic named by `args[0]`, using the remaining arguments
/// as the positional payload, and waits for the acknowledgement.
///
/// Each payload argument that parses as an `i64` is sent as
/// `Value::Integer`; everything else is sent as `Value::String`.
///
/// With no arguments a hint is printed and nothing is published. A failed
/// publish is reported on standard output instead of aborting the program.
async fn publish(client: &mut Client, args: &[String]) {
    let (uri, rest) = match args.split_first() {
        Some(parts) => parts,
        None => {
            println!("Please specify a topic to publish to");
            return;
        }
    };
    let payload = rest
        .iter()
        .map(|arg| match arg.parse::<i64>() {
            Ok(number) => Value::Integer(number),
            Err(_) => Value::String(arg.clone()),
        })
        .collect();
    if let Err(err) = client
        .publish_and_acknowledge(URI::new(uri), Some(payload), None)
        .await
    {
        println!("Failed to publish to {}: {:?}", uri, err);
    }
}

/// Prints the usage text describing every command the prompt accepts.
fn help() {
    const USAGE: [&str; 17] = [
        "The following commands are supported:",
        "  sub <topic>, <matching policy>?",
        "       Subscribes to the topic specified by the uri <topic>",
        "       <matching policy> specifies the type of pattern matching used",
        "       <matching policy> should be one of 'strict' (the default), 'wild' or 'prefix'",
        "  pub <topic>, <args>*",
        "       Publishes to the topic specified by uri <topic>",
        "       <args> is an optional, comma separated list of arguments",
        "  list",
        "       Lists all of the current subscriptions, along with their index",
        "  unsub <index>",
        "       Unsubscribes from the topic subscription specified by the given index",
        "  help",
        "       Prints this message",
        "  quit",
        "       Sends a goodbye message and quits the program",
        "",
    ];
    // The final entry is an empty sentinel; skip it so no blank line is printed.
    for line in USAGE.iter().filter(|line| !line.is_empty()) {
        println!("{}", line);
    }
}

/// Runs the interactive prompt: reads a line, dispatches the parsed command
/// and repeats until `quit` is entered or standard input is closed, then
/// shuts the client down.
///
/// A failure during shutdown is reported on standard error.
async fn event_loop(mut client: Client) {
    let mut subscriptions = Arc::new(Mutex::new(Vec::new()));
    loop {
        print_prompt();
        let input = get_input_from_user();
        if input.is_empty() {
            // A line that was actually read always contains at least its
            // terminator, so a completely empty string means end-of-input.
            // Without this check the loop would spin forever on a closed stdin.
            break;
        }
        let (command, args) = process_input(&input);
        match command {
            Command::Sub => subscribe(&mut client, &mut subscriptions, &args).await,
            Command::Pub => publish(&mut client, &args).await,
            Command::Unsub => unsubscribe(&mut client, &mut subscriptions, &args).await,
            Command::List => list(&subscriptions),
            Command::Help => help(),
            Command::Quit => break,
            Command::NoOp => {}
            // `println!` (not `print!`) so the message is terminated and does
            // not run into the next prompt.
            Command::Invalid(bad_command) => println!("Invalid command: {}", bad_command),
        }
    }
    if let Err(err) = client.shutdown().await {
        eprintln!("Error while shutting down: {:?}", err);
    }
}

/// Entry point: initialises logging, connects to the router endpoint and
/// hands the connected client to the interactive event loop.
///
/// If the connection cannot be established, the error is printed to standard
/// error and the process exits with status 1.
#[tokio::main]
async fn main() {
    const ROUTER_URL: &str = "ws://127.0.0.1:8090/ws";
    const REALM: &str = "wampire_realm";

    env_logger::init();
    let connection = Connection::new(ROUTER_URL, REALM);
    info!("Connecting");
    let client = match connection.connect() {
        Ok(client) => client,
        Err(err) => {
            eprintln!(
                "Failed to connect to {} (realm {}): {:?}",
                ROUTER_URL, REALM, err
            );
            std::process::exit(1);
        }
    };

    info!("Connected");
    event_loop(client).await
}