// tmq 0.2.0
//
// ZeroMQ bindings for Tokio
// Documentation
use futures::{FutureExt, SinkExt, StreamExt, TryFutureExt};
use zmq::{Context, SocketType};

use std::thread::spawn;
use tmq::{dealer, Multipart, Result};
use utils::{
    generate_tcp_address, hammer_receive, msg, send_multipart_repeated, send_multiparts, sync_echo,
    sync_receive_multipart_repeated, sync_receive_multiparts,
};

mod utils;

/// Sends one two-frame multipart through an async DEALER socket.
///
/// The tmq socket connects to a freshly generated TCP address, then
/// `sync_receive_multiparts` (from `utils`) is started with a copy of the
/// same data and the DEALER socket type. It presumably spawns a blocking peer
/// that checks what it receives — confirm in `utils`. The returned handle is
/// joined after the send completes.
#[tokio::test]
async fn send_single_message() -> Result<()> {
    let endpoint = generate_tcp_address();
    let context = Context::new();
    let socket = dealer(&context).connect(&endpoint)?;

    let payload = vec![vec!["hello", "world"]];
    let expected = payload.clone();
    let receiver = sync_receive_multiparts(endpoint, SocketType::DEALER, expected);

    send_multiparts(socket, payload).await?;

    // Propagate any panic raised on the receiving side into this test.
    receiver.join().unwrap();
    Ok(())
}

/// Sends three two-frame multiparts in a row through an async DEALER socket.
///
/// Same flow as the single-message test, but with three multiparts. The
/// helper `sync_receive_multiparts` gets a copy of the full list; it
/// presumably verifies them on a separate thread — confirm in `utils`.
#[tokio::test]
async fn send_multiple_messages() -> Result<()> {
    let endpoint = generate_tcp_address();
    let context = Context::new();
    let socket = dealer(&context).connect(&endpoint)?;

    let first = vec!["hello", "world"];
    let second = vec!["second", "message"];
    let third = vec!["third", "message"];
    let payload = vec![first, second, third];

    let expected = payload.clone();
    let receiver = sync_receive_multiparts(endpoint, SocketType::DEALER, expected);

    send_multiparts(socket, payload).await?;

    // Propagate any panic raised on the receiving side into this test.
    receiver.join().unwrap();
    Ok(())
}

/// Sends three empty multiparts followed by one non-empty multipart.
///
/// The receiving helper is told to expect only the non-empty multipart, so
/// this test encodes the expectation that the empty ones never show up on
/// the peer side.
#[tokio::test]
async fn send_empty_message() -> Result<()> {
    let endpoint = generate_tcp_address();
    let context = Context::new();
    let socket = dealer(&context).connect(&endpoint)?;

    let frames = vec!["hello", "world"];

    // Only the non-empty multipart is expected by the receiver.
    let expected = vec![frames.clone()];
    let receiver = sync_receive_multiparts(endpoint, SocketType::DEALER, expected);

    // Three empty multiparts precede the real one.
    let outgoing = vec![Vec::new(), Vec::new(), Vec::new(), frames];
    send_multiparts(socket, outgoing).await?;

    // Propagate any panic raised on the receiving side into this test.
    receiver.join().unwrap();
    Ok(())
}

/// Sends the same two-frame multipart 1_000 times through an async DEALER
/// socket.
///
/// `sync_receive_multipart_repeated` (from `utils`) is started with the same
/// frames and repetition count; it presumably checks each received multipart
/// on a separate thread — confirm in `utils`.
#[tokio::test]
async fn send_hammer() -> Result<()> {
    let endpoint = generate_tcp_address();
    let context = Context::new();
    let socket = dealer(&context).connect(&endpoint)?;

    let repetitions = 1_000;
    let frames = vec!["hello", "world"];
    let expected = frames.clone();
    let receiver =
        sync_receive_multipart_repeated(endpoint, SocketType::DEALER, expected, repetitions);

    send_multipart_repeated(socket, frames, repetitions).await?;

    // Propagate any panic raised on the receiving side into this test.
    receiver.join().unwrap();
    Ok(())
}

/// Binds an async DEALER socket and hands it to the shared `hammer_receive`
/// helper from `utils`, together with the address and the peer socket type.
///
/// The helper's result is returned directly as the test outcome.
#[tokio::test]
async fn receive_hammer() -> Result<()> {
    let endpoint = generate_tcp_address();
    let context = Context::new();
    let bound = dealer(&context).bind(&endpoint)?;

    hammer_receive(bound, endpoint, SocketType::DEALER).await
}

/// Sends 1_000 numbered multiparts up front, then reads 1_000 multiparts back
/// and asserts they arrive in the same order with the same content.
///
/// The echo peer (`sync_echo` from `utils`) is only started after all sends
/// have completed. It presumably echoes each multipart back on a separate
/// thread — confirm in `utils`.
#[tokio::test]
async fn proxy_sequence() -> Result<()> {
    let endpoint = generate_tcp_address();
    let context = Context::new();
    let mut socket = dealer(&context).connect(&endpoint)?;

    let total = 1_000;

    // Builds the two-frame multipart that belongs to sequence number `n`.
    let nth_message = |n| -> Multipart {
        let head = format!("Msg #{}", n);
        let tail = format!("Msg #{} (contd.)", n);
        let frames = vec![msg(head.as_bytes()), msg(tail.as_bytes())];
        frames.into()
    };

    // Phase 1: queue every message before the peer exists.
    for n in 0..total {
        socket.send(nth_message(n)).await?;
    }

    let echo_thread = sync_echo(endpoint, SocketType::DEALER, total);

    // Phase 2: the replies must come back in send order.
    for n in 0..total {
        match socket.next().await {
            Some(reply) => {
                let reply = reply?;
                assert_eq!(nth_message(n), reply);
            }
            None => panic!("Stream ended too soon."),
        }
    }

    echo_thread.join().unwrap();
    Ok(())
}

/// Alternates send and receive 1_000 times against an echo peer.
///
/// The echo peer (`sync_echo` from `utils`) is started first. Each iteration
/// sends one two-frame multipart and immediately awaits the next incoming
/// multipart, asserting that it equals what was just sent.
#[tokio::test]
async fn proxy_interleaved() -> Result<()> {
    let address = generate_tcp_address();
    let ctx = Context::new();
    let mut sock = dealer(&ctx).connect(&address)?;

    let count = 1_000;
    let echo = sync_echo(address, SocketType::DEALER, count);

    for i in 0..count {
        let m1 = format!("Msg #{}", i);
        let m2 = format!("Msg #{} (contd.)", i);
        // One definition of the frames, used for both the outgoing message
        // and the expected reply.
        let frames = || vec![msg(m1.as_bytes()), msg(m2.as_bytes())];

        sock.send(frames()).await?;

        if let Some(multipart) = sock.next().await {
            let multipart = multipart?;

            let expected: Multipart = frames().into();
            assert_eq!(expected, multipart);
        } else {
            panic!("Item in stream is missing.");
        }
    }

    echo.join().unwrap();

    Ok(())
}

/// Splits a bound async DEALER socket into sink and stream halves and
/// forwards the first 10 received multipart messages straight back out.
///
/// A plain blocking `zmq` DEALER peer runs on a separate thread. It connects
/// to the same address and, 10 times, sends `["hello", "world"]` and asserts
/// that the identical two frames come back.
#[tokio::test]
async fn split_echo() -> Result<()> {
    let address = generate_tcp_address();
    let ctx = Context::new();
    let (tx, rx) = dealer(&ctx).bind(&address)?.split();
    let count = 10;

    let thread = spawn(move || {
        let ctx = Context::new();
        let sender = ctx.socket(SocketType::DEALER).unwrap();
        sender.connect(&address).unwrap();

        for _ in 0..count {
            sender
                .send_multipart(vec!["hello", "world"].into_iter(), 0)
                .unwrap();
            assert_eq!(
                sender.recv_multipart(0).unwrap(),
                vec![b"hello".to_vec(), b"world".to_vec()]
            );
        }
    });

    // Echo loop: pipe the stream half into the sink half. A formatted panic
    // is used rather than `panic!(e)`: a non-literal payload is deprecated,
    // rejected in edition 2021, and shows up only as an opaque `Box<dyn Any>`.
    rx.take(count)
        .forward(tx)
        .map_err(|e| panic!("{:?}", e))
        .map(|_| ())
        .await;

    thread.join().unwrap();

    Ok(())
}

/// Pushes 10_000 multiparts through the sink half of a split DEALER socket
/// using `send_all`.
///
/// Multipart number `i` consists of two frames: the decimal strings of `i`
/// and `i + 1`. A blocking `zmq` DEALER peer binds the address on a separate
/// thread and asserts that every multipart arrives in order with exactly
/// those frames. The stream half of the split socket is discarded.
#[tokio::test]
async fn split_send_all() -> Result<()> {
    let endpoint = generate_tcp_address();
    let context = Context::new();
    let (mut tx, _) = dealer(&context).connect(&endpoint)?.split();

    let total = 10_000;

    let receiver_thread = spawn(move || {
        let peer_ctx = Context::new();
        let receiver = peer_ctx.socket(SocketType::DEALER).unwrap();
        receiver.bind(&endpoint).unwrap();

        for n in 0..total {
            let received = receiver.recv_multipart(0).unwrap();
            let expected = vec![n.to_string().into_bytes(), (n + 1).to_string().into_bytes()];
            assert_eq!(received, expected);
        }
    });

    // Stream of `Ok(multipart)` items in the shape `send_all` consumes.
    let mut outgoing = futures::stream::iter((0..total).map(|n| {
        let first = zmq::Message::from(&n.to_string());
        let second = zmq::Message::from(&(n + 1).to_string());
        Ok(Multipart::from(vec![first, second]))
    }));
    tx.send_all(&mut outgoing).await?;

    receiver_thread.join().unwrap();
    Ok(())
}