1use async_std::net::{TcpListener, TcpStream};
18use async_std::prelude::*;
19use async_std::task;
20use async_std::task::{Context, Poll};
21use futures::io::{AsyncRead, AsyncWrite};
22use futures::stream::TryStreamExt;
23use simple_message_channels::{Message, Reader, Writer};
24use std::env;
25use std::io::{ErrorKind, Result};
26use std::pin::Pin;
27use std::sync::Arc;
28
29fn usage() {
30 println!("usage: cargo run --example tcp -- [client|server] [address]");
31 std::process::exit(1);
32}
33
34fn main() {
35 let count = env::args().count();
36 if count != 3 {
37 usage();
38 }
39 let mode = env::args().nth(1).unwrap();
40 let address = env::args().nth(2).unwrap();
41
42 task::block_on(async move {
43 let result = match mode.as_ref() {
44 "server" => tcp_server(address).await,
45 "client" => tcp_client(address).await,
46 _ => panic!(usage()),
47 };
48 if let Err(e) = result {
49 eprintln!("error: {}", e);
50 }
51 });
52}
53
54async fn tcp_server(address: String) -> Result<()> {
55 let listener = TcpListener::bind(&address).await?;
56 println!("Listening on {}", listener.local_addr()?);
57
58 let mut incoming = listener.incoming();
59 while let Some(stream) = incoming.next().await {
60 let stream = stream?;
61 let peer_addr = stream.peer_addr().unwrap();
62 eprintln!("new connection from {}", peer_addr);
63 task::spawn(async move {
64 match handle_incoming(stream).await {
65 Err(ref e) if e.kind() != ErrorKind::UnexpectedEof => {
66 eprintln!("connection closed from {} with error: {}", peer_addr, e);
67 }
68 Err(_) | Ok(()) => {
69 eprintln!("connection closed from {}", peer_addr);
70 }
71 }
72 });
73 }
74 Ok(())
75}
76
77async fn tcp_client(address: String) -> Result<()> {
78 let tcp_stream = TcpStream::connect(&address).await?;
79 handle_outgoing(tcp_stream).await?;
80 Ok(())
81}
82
83async fn handle_incoming(stream: TcpStream) -> Result<()> {
84 let (mut reader, mut writer) = create_from_stream(stream);
85 while let Some(msg) = reader.try_next().await? {
86 eprintln!("received: {}", format_msg(&msg));
87 let resp = Message {
88 channel: msg.channel,
89 typ: 2,
90 message: to_upper(&msg.message),
91 };
92 writer.send(resp).await?;
93 }
94 Ok(())
95}
96
97async fn handle_outgoing(stream: TcpStream) -> Result<()> {
98 let (mut reader, mut writer) = create_from_stream(stream);
99
100 let hello_msg = Message::new(1, 1, "hi".as_bytes().to_vec());
101 writer.send(hello_msg).await?;
102
103 while let Some(msg) = reader.try_next().await? {
104 eprintln!("received: {}", format_msg(&msg));
105 }
106
107 Ok(())
108}
109
110fn create_from_stream(tcp_stream: TcpStream) -> (Reader<CloneableStream>, Writer<CloneableStream>) {
111 let stream = CloneableStream(Arc::new(tcp_stream));
112 let reader = Reader::new(stream.clone());
113 let writer = Writer::new(stream.clone());
114 (reader, writer)
115}
116
117fn format_msg(msg: &Message) -> String {
118 format!(
119 "chan {} typ {} msg {}",
120 msg.channel,
121 msg.typ,
122 String::from_utf8(msg.message.to_vec()).unwrap_or("<invalid utf8>".to_string())
123 )
124}
125
126fn to_upper(bytes: &[u8]) -> Vec<u8> {
127 let string = String::from_utf8(bytes.to_vec()).unwrap();
128 string.to_uppercase().as_bytes().to_vec()
129}
130
131#[derive(Clone)]
132struct CloneableStream(Arc<TcpStream>);
133impl AsyncRead for CloneableStream {
134 fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
135 Pin::new(&mut &*self.0).poll_read(cx, buf)
136 }
137}
138impl AsyncWrite for CloneableStream {
139 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize>> {
140 Pin::new(&mut &*self.0).poll_write(cx, buf)
141 }
142 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
143 Pin::new(&mut &*self.0).poll_flush(cx)
144 }
145 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
146 Pin::new(&mut &*self.0).poll_close(cx)
147 }
148}