ockam_api 0.93.0

Ockam's request-response API
use ockam_api::nodes::service::SecureChannelType;
use std::sync::Arc;
use std::time::{Duration, Instant};

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use tokio::time::timeout;

use ockam_api::nodes::models::portal::OutletAccessControl;
use ockam_api::test_utils::{start_tcp_echo_server, TestNode};
use ockam_core::env::FromString;
use ockam_core::errcode::{Kind, Origin};
use ockam_core::{route, Address, AllowAll, Error, NeutralMessage};
use ockam_multiaddr::MultiAddr;
use ockam_transport_core::HostnamePort;

/// These tests serve as a benchmark for the message roundtrip latency.
/// In order for the result to be reliable, use the --profile release
/// flag when running the tests.
/// `cargo test --test latency --release -- --ignored --show-output`
#[ignore]
#[test]
pub fn measure_message_latency_two_nodes() {
    let runtime = Arc::new(Runtime::new().unwrap());
    let runtime_cloned = runtime.clone();
    std::env::remove_var("OCKAM_LOG_LEVEL");

    let result: ockam::Result<()> = runtime_cloned.block_on(async move {
        let test_body = async move {
            TestNode::clean().await?;
            let mut first_node = TestNode::create(runtime.clone(), None).await;
            let second_node = TestNode::create(runtime.clone(), None).await;

            let secure_channel = first_node
                .node_manager
                .create_secure_channel(
                    &first_node.context,
                    second_node
                        .listen_address()
                        .await
                        .multi_addr()
                        .unwrap()
                        .concat(&MultiAddr::from_string("/service/api").unwrap())
                        .unwrap(),
                    None,
                    None,
                    None,
                    None,
                    SecureChannelType::KeyExchangeAndMessages,
                )
                .await
                .unwrap();

            let ping_route = route![secure_channel.encryptor_address().address(), "echo"];
            let next = ping_route.next().unwrap();

            if let Some(flow_control_id) = first_node
                .context
                .flow_controls()
                .find_flow_control_with_producer_address(next)
                .map(|x| x.flow_control_id().clone())
            {
                first_node
                    .context
                    .flow_controls()
                    .add_consumer(first_node.context.primary_address(), &flow_control_id);
            }

            let payload = NeutralMessage::from(vec![1, 2, 3, 4]);

            // warm up buffers, cache, etc...
            for _ in 0..100 {
                first_node
                    .context
                    .send(ping_route.clone(), payload.clone())
                    .await
                    .unwrap();
                first_node
                    .context
                    .receive::<NeutralMessage>()
                    .await
                    .unwrap();
            }

            let now = Instant::now();
            for _ in 0..10_000 {
                first_node
                    .context
                    .send(ping_route.clone(), payload.clone())
                    .await
                    .unwrap();
                first_node
                    .context
                    .receive::<NeutralMessage>()
                    .await
                    .unwrap();
            }
            let elapsed = now.elapsed();
            println!(
                "single message, roundtrip latency: {:?}",
                elapsed.div_f32(10_000f32)
            );

            first_node.context.shutdown_node().await?;
            second_node.context.shutdown_node().await?;

            Ok(())
        };

        timeout(Duration::from_secs(30), test_body).await.unwrap()
    });

    result.unwrap();
    drop(runtime_cloned);
}

#[ignore]
#[test]
pub fn measure_buffer_latency_two_nodes_portal() {
    let runtime = Arc::new(Runtime::new().unwrap());
    let runtime_cloned = runtime.clone();
    std::env::remove_var("OCKAM_LOG_LEVEL");

    let result: ockam::Result<()> = runtime_cloned.block_on(async move {
        let test_body = async move {
            let echo_server_handle = start_tcp_echo_server().await;

            TestNode::clean().await?;
            let first_node = TestNode::create(runtime.clone(), None).await;
            let second_node = TestNode::create(runtime.clone(), None).await;

            let _outlet_status = second_node
                .node_manager
                .create_outlet(
                    &second_node.context,
                    echo_server_handle.chosen_addr.clone(),
                    false,
                    Some(Address::from_string("outlet")),
                    true,
                    OutletAccessControl::AccessControl((Arc::new(AllowAll), Arc::new(AllowAll))),
                    false,
                    false,
                    true,
                )
                .await?;

            let second_node_listen_address = second_node.listen_address().await;

            // create inlet in the first node pointing to the second one
            let inlet_status = first_node
                .node_manager
                .create_inlet(
                    &first_node.context,
                    HostnamePort::localhost(0),
                    route![],
                    route![],
                    second_node_listen_address
                        .multi_addr()?
                        .concat(&MultiAddr::from_string("/secure/api/service/outlet")?)?,
                    "inlet_alias".to_string(),
                    None,
                    None,
                    None,
                    true,
                    None,
                    false,
                    false,
                    false,
                    None,
                    false,
                    true,
                )
                .await?;

            // connect to inlet_status.bind_addr and send dummy payload
            let mut socket = TcpStream::connect(inlet_status.bind_addr.clone())
                .await
                .unwrap();

            socket.set_nodelay(true).unwrap();

            let mut buffer = [0u8; 5];

            for _ in 0..100 {
                socket.write_all(b"hello").await.unwrap();
                socket.read_exact(&mut buffer).await.unwrap();
            }

            let now = Instant::now();
            for _ in 0..10_000 {
                socket.write_all(b"hello").await.unwrap();
                socket.read_exact(&mut buffer).await.unwrap();
            }
            let elapsed = now.elapsed();
            println!(
                "short payload, roundtrip latency: {:?}",
                elapsed.div_f32(10_000f32)
            );

            first_node.context.shutdown_node().await?;
            second_node.context.shutdown_node().await?;

            Ok(())
        };

        timeout(Duration::from_secs(30), test_body)
            .await
            .unwrap_or_else(|_| Err(Error::new(Origin::Node, Kind::Timeout, "Test timed out")))
    });

    result.unwrap();
    drop(runtime_cloned);
}

#[ignore]
#[test]
pub fn measure_connection_latency_two_nodes_portal() {
    let runtime = Arc::new(Runtime::new().unwrap());
    let runtime_cloned = runtime.clone();
    std::env::set_var("OCKAM_LOGGING", "0");

    let result: ockam::Result<()> = runtime_cloned.block_on(async move {
        let test_body = async move {
            let echo_server_handle = start_tcp_echo_server().await;

            TestNode::clean().await?;
            let first_node = TestNode::create(runtime.clone(), None).await;
            let second_node = TestNode::create(runtime.clone(), None).await;

            let _outlet_status = second_node
                .node_manager
                .create_outlet(
                    &second_node.context,
                    echo_server_handle.chosen_addr.clone(),
                    false,
                    Some(Address::from_string("outlet")),
                    true,
                    OutletAccessControl::AccessControl((Arc::new(AllowAll), Arc::new(AllowAll))),
                    false,
                    true,
                    true,
                )
                .await?;

            let second_node_listen_address = second_node.listen_address().await;

            // create inlet in the first node pointing to the second one
            let inlet_status = first_node
                .node_manager
                .create_inlet(
                    &first_node.context,
                    HostnamePort::new("127.0.0.1", 0)?,
                    route![],
                    route![],
                    second_node_listen_address
                        .multi_addr()?
                        .concat(&MultiAddr::from_string("/secure/api/service/outlet")?)?,
                    "inlet_alias".to_string(),
                    None,
                    None,
                    None,
                    true,
                    None,
                    false,
                    false,
                    false,
                    None,
                    true,
                    true,
                )
                .await?;

            let now = Instant::now();

            for _ in 0..1000 {
                // connect to inlet_status.bind_addr and send dummy payload
                let mut socket = TcpStream::connect(inlet_status.bind_addr.clone())
                    .await
                    .unwrap();

                socket.set_nodelay(true).unwrap();

                let mut buffer = [0u8; 5];

                socket.write_all(b"hello").await.unwrap();
                socket.read_exact(&mut buffer).await.unwrap();
            }

            let elapsed = now.elapsed();
            println!(
                "short payload, connect + roundtrip latency: {:?}",
                elapsed.div_f32(1_000f32)
            );

            first_node.context.shutdown_node().await?;
            second_node.context.shutdown_node().await?;

            Ok(())
        };

        timeout(Duration::from_secs(30), test_body)
            .await
            .unwrap_or_else(|_| Err(Error::new(Origin::Node, Kind::Timeout, "Test timed out")))
    });

    result.unwrap();
    drop(runtime_cloned);
}