use futures::{AsyncReadExt, AsyncWriteExt};
use libp2prs_runtime::task;
use log::info;
use libp2prs_core::Transport;
use libp2prs_tcp::TcpConfig;
use libp2prs_yamux::Config as YamuxConfig;
use libp2prs_core::identity::Keypair;
use libp2prs_core::muxing::StreamMuxer;
use libp2prs_core::upgrade::{UpgradeInfo, Upgrader};
use libp2prs_secio::Config as SecioConfig;
/// Entry point: initialises logging (default filter `info`) and then runs
/// either the echo server or the client.
///
/// The server is selected when the first command-line argument is exactly
/// `server`; any other argument (or none) runs the client.
fn main() {
    let log_env = env_logger::Env::default().default_filter_or("info");
    env_logger::Builder::from_env(log_env).init();

    let mode = std::env::args().nth(1);
    match mode.as_deref() {
        Some("server") => {
            info!("Starting server ......");
            run_server();
        }
        _ => {
            info!("Starting client ......");
            run_client();
        }
    }
}
/// Runs the echo server on `/ip4/127.0.0.1/tcp/12345`.
///
/// Every accepted TCP connection is handled in its own task: it is upgraded
/// with secio, then with yamux, and every substream accepted from the muxer
/// gets a task that writes back whatever it reads until `read` returns 0.
///
/// # Panics
///
/// Panics if listening fails. Handshake failures and stream I/O errors panic
/// inside the spawned task that hit them (via `expect`/`unwrap`).
// NOTE(review): the protocol info entries appear to be reference types, so the
// `.clone()` calls below trip `clone_double_ref` — confirm before removing.
#[allow(clippy::clone_double_ref)]
fn run_server() {
    task::block_on(async {
        let mut listener = TcpConfig::new()
            .listen_on("/ip4/127.0.0.1/tcp/12345".parse().unwrap())
            .expect("listen on");

        // The accept loop ends on the first accept error.
        while let Ok(socket) = listener.accept_output().await {
            task::spawn(async move {
                // A fresh key pair is generated for every connection.
                let key = Keypair::generate_secp256k1();
                let cfg = SecioConfig::new(key);
                let info = cfg.protocol_info().get(0).unwrap().clone();
                let socket = cfg.upgrade_inbound(socket, info).await.expect("secio upgrade inbound");

                let cfg = YamuxConfig::server();
                let info = cfg.protocol_info().get(0).unwrap().clone();
                // This is the accepting side, so the muxer must be upgraded
                // inbound, mirroring the secio step above and pairing with the
                // client's outbound upgrade. Upgrading outbound here would make
                // both ends negotiate as the same side of the connection.
                let mut stream_muxer = cfg.upgrade_inbound(socket, info).await.expect("yamux upgrade inbound");

                // If the muxer exposes a background task, run it separately.
                if let Some(task) = stream_muxer.task() {
                    task::spawn(async move {
                        task.await;
                    });
                }

                while let Ok(mut s) = stream_muxer.accept_stream().await {
                    task::spawn(async move {
                        let mut buf = [0u8; 1024];
                        loop {
                            let n = s.read(&mut buf).await.expect("read2");
                            // A zero-length read ends the echo loop for this stream.
                            if n == 0 {
                                break;
                            }
                            s.write_all(&buf[..n]).await.expect("write all");
                            s.flush().await.unwrap();
                        }
                    });
                }
                info!("connection is closed");
            });
        }
    });
}
/// Runs the client: dials `/ip4/127.0.0.1/tcp/12345`, upgrades the connection
/// with secio and then yamux (both outbound), opens one substream, writes
/// `hello world`, reads back the same number of bytes and asserts they match.
///
/// # Panics
///
/// Panics if dialing, either upgrade, opening the stream, or any read/write
/// fails, or if the bytes read back differ from what was sent. The result of
/// closing the stream is ignored.
#[allow(clippy::clone_double_ref)]
fn run_client() {
    // The message sent to the server and expected back.
    const PAYLOAD: &[u8] = b"hello world";

    task::block_on(async move {
        let tcp_conn = TcpConfig::new()
            .dial("/ip4/127.0.0.1/tcp/12345".parse().unwrap())
            .await
            .expect("dial");

        // Secure the raw connection with a freshly generated key pair.
        let secio = SecioConfig::new(Keypair::generate_secp256k1());
        let secio_proto = secio.protocol_info().get(0).unwrap().clone();
        let secured = secio
            .upgrade_outbound(tcp_conn, secio_proto)
            .await
            .expect("secio upgrade outbound");

        // Layer the stream multiplexer on top of the secured connection.
        let yamux = YamuxConfig::client();
        let yamux_proto = yamux.protocol_info().get(0).unwrap().clone();
        let mut stream_muxer = yamux
            .upgrade_outbound(secured, yamux_proto)
            .await
            .expect("yamux upgrade outbound");

        // If the muxer exposes a background task, run it separately.
        if let Some(driver) = stream_muxer.task() {
            task::spawn(async move {
                driver.await;
            });
        }

        let mut stream = stream_muxer.open_stream().await.expect("open stream");

        stream.write_all(PAYLOAD).await.unwrap();
        info!("C: {}: wrote {} bytes", stream.id(), PAYLOAD.len());
        stream.flush().await.unwrap();

        let mut echoed = vec![0; PAYLOAD.len()];
        stream.read_exact(&mut echoed).await.unwrap();
        info!("C: {}: read {} bytes", stream.id(), echoed.len());
        assert_eq!(&PAYLOAD[..], &echoed[..]);

        // The close result is intentionally ignored.
        let _ = stream.close().await;
    });
}