libp2prs-yamux 0.3.0

Yamux multiplexing protocol for libp2p
Documentation
// Copyright 2020 Netwarps Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use futures::{AsyncReadExt, AsyncWriteExt};
use libp2prs_runtime::task;
use log::info;

use libp2prs_core::Transport;
use libp2prs_tcp::TcpConfig;
use libp2prs_yamux::Config as YamuxConfig;

use libp2prs_core::identity::Keypair;
use libp2prs_core::muxing::StreamMuxer;
use libp2prs_core::upgrade::{UpgradeInfo, Upgrader};
use libp2prs_secio::Config as SecioConfig;

fn main() {
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
    if std::env::args().nth(1) == Some("server".to_string()) {
        info!("Starting server ......");
        run_server();
    } else {
        info!("Starting client ......");
        run_client();
    }
}

#[allow(clippy::clone_double_ref)]
fn run_server() {
    task::block_on(async {
        let mut listener = TcpConfig::new()
            .listen_on("/ip4/127.0.0.1/tcp/12345".parse().unwrap())
            .expect("listen on");

        while let Ok(socket) = listener.accept_output().await {
            task::spawn(async move {
                let key = Keypair::generate_secp256k1();
                let cfg = SecioConfig::new(key);
                let info = cfg.protocol_info().get(0).unwrap().clone();
                let socket = cfg.upgrade_inbound(socket, info).await.expect("secio upgrade inbound");

                let cfg = YamuxConfig::server();
                let info = cfg.protocol_info().get(0).unwrap().clone();

                let mut stream_muxer = cfg.upgrade_outbound(socket, info).await.expect("yamux upgrade inbound");

                if let Some(task) = stream_muxer.task() {
                    task::spawn(async move {
                        task.await;
                    });
                }

                while let Ok(mut s) = stream_muxer.accept_stream().await {
                    task::spawn(async move {
                        let mut buf = [0u8; 1024];
                        loop {
                            let n = s.read(&mut buf).await.expect("read2");
                            if n == 0 {
                                break;
                            }
                            s.write_all(&buf[..n]).await.expect("write all");
                            s.flush().await.unwrap();
                        }
                    });
                }
                info!("connection is closed");
            });
        }
    });
}

#[allow(clippy::clone_double_ref)]
fn run_client() {
    task::block_on(async move {
        let socket = TcpConfig::new()
            .dial("/ip4/127.0.0.1/tcp/12345".parse().unwrap())
            .await
            .expect("dial");

        let key = Keypair::generate_secp256k1();

        let cfg = SecioConfig::new(key);
        let info = cfg.protocol_info().get(0).unwrap().clone();

        let socket = cfg.upgrade_outbound(socket, info).await.expect("secio upgrade outbound");

        let cfg = YamuxConfig::client();
        let info = cfg.protocol_info().get(0).unwrap().clone();

        let mut stream_muxer = cfg.upgrade_outbound(socket, info).await.expect("yamux upgrade outbound");

        if let Some(task) = stream_muxer.task() {
            task::spawn(async move {
                task.await;
            });
        }

        let mut stream = stream_muxer.open_stream().await.expect("open stream");

        let data = b"hello world";

        stream.write_all(data).await.unwrap();
        info!("C: {}: wrote {} bytes", stream.id(), data.len());
        // need flush?
        stream.flush().await.unwrap();

        let mut frame = vec![0; data.len()];
        stream.read_exact(&mut frame).await.unwrap();

        info!("C: {}: read {} bytes", stream.id(), frame.len());

        assert_eq!(&data[..], &frame[..]);

        let _ = stream.close().await;

        //Ok::<(), yamux::ConnectionError>(())
    });
}