tcp/
tcp.rs

1//! TCP example
2//!
3//! This demonstrates how to use simple-message-channels with TCP.
4//!
5//! Usage:
6//! In one terminal run
7//!
8//! cargo run --example tcp -- server 127.0.0.1:8080
9//!
10//! and in another
11//!
12//! cargo run --example tcp -- client 127.0.0.1:8080
13//!
//! The client should send the message "hi" on channel 1 with `typ` 1,
//! and the server should reply with "HI" on channel 1 with `typ` 2.
16
17use async_std::net::{TcpListener, TcpStream};
18use async_std::prelude::*;
19use async_std::task;
20use async_std::task::{Context, Poll};
21use futures::io::{AsyncRead, AsyncWrite};
22use futures::stream::TryStreamExt;
23use simple_message_channels::{Message, Reader, Writer};
24use std::env;
25use std::io::{ErrorKind, Result};
26use std::pin::Pin;
27use std::sync::Arc;
28
/// Prints the command-line usage summary and terminates the process with
/// exit status 1.
///
/// The summary is written to stderr: this function is only called for
/// invalid invocations, so the text is a diagnostic rather than program
/// output.
fn usage() {
    eprintln!("usage: cargo run --example tcp -- [client|server] [address]");
    std::process::exit(1);
}
33
34fn main() {
35    let count = env::args().count();
36    if count != 3 {
37        usage();
38    }
39    let mode = env::args().nth(1).unwrap();
40    let address = env::args().nth(2).unwrap();
41
42    task::block_on(async move {
43        let result = match mode.as_ref() {
44            "server" => tcp_server(address).await,
45            "client" => tcp_client(address).await,
46            _ => panic!(usage()),
47        };
48        if let Err(e) = result {
49            eprintln!("error: {}", e);
50        }
51    });
52}
53
54async fn tcp_server(address: String) -> Result<()> {
55    let listener = TcpListener::bind(&address).await?;
56    println!("Listening on {}", listener.local_addr()?);
57
58    let mut incoming = listener.incoming();
59    while let Some(stream) = incoming.next().await {
60        let stream = stream?;
61        let peer_addr = stream.peer_addr().unwrap();
62        eprintln!("new connection from {}", peer_addr);
63        task::spawn(async move {
64            match handle_incoming(stream).await {
65                Err(ref e) if e.kind() != ErrorKind::UnexpectedEof => {
66                    eprintln!("connection closed from {} with error: {}", peer_addr, e);
67                }
68                Err(_) | Ok(()) => {
69                    eprintln!("connection closed from {}", peer_addr);
70                }
71            }
72        });
73    }
74    Ok(())
75}
76
77async fn tcp_client(address: String) -> Result<()> {
78    let tcp_stream = TcpStream::connect(&address).await?;
79    handle_outgoing(tcp_stream).await?;
80    Ok(())
81}
82
83async fn handle_incoming(stream: TcpStream) -> Result<()> {
84    let (mut reader, mut writer) = create_from_stream(stream);
85    while let Some(msg) = reader.try_next().await? {
86        eprintln!("received: {}", format_msg(&msg));
87        let resp = Message {
88            channel: msg.channel,
89            typ: 2,
90            message: to_upper(&msg.message),
91        };
92        writer.send(resp).await?;
93    }
94    Ok(())
95}
96
97async fn handle_outgoing(stream: TcpStream) -> Result<()> {
98    let (mut reader, mut writer) = create_from_stream(stream);
99
100    let hello_msg = Message::new(1, 1, "hi".as_bytes().to_vec());
101    writer.send(hello_msg).await?;
102
103    while let Some(msg) = reader.try_next().await? {
104        eprintln!("received: {}", format_msg(&msg));
105    }
106
107    Ok(())
108}
109
110fn create_from_stream(tcp_stream: TcpStream) -> (Reader<CloneableStream>, Writer<CloneableStream>) {
111    let stream = CloneableStream(Arc::new(tcp_stream));
112    let reader = Reader::new(stream.clone());
113    let writer = Writer::new(stream.clone());
114    (reader, writer)
115}
116
117fn format_msg(msg: &Message) -> String {
118    format!(
119        "chan {} typ {} msg {}",
120        msg.channel,
121        msg.typ,
122        String::from_utf8(msg.message.to_vec()).unwrap_or("<invalid utf8>".to_string())
123    )
124}
125
/// Returns an uppercased copy of `bytes`.
///
/// Valid UTF-8 input is uppercased with full Unicode rules (so the result may
/// be longer than the input, e.g. `ß` becomes `SS`). Input that is not valid
/// UTF-8 has only its ASCII letters uppercased and every other byte left
/// untouched. The input comes from the network, so it must never cause a
/// panic.
fn to_upper(bytes: &[u8]) -> Vec<u8> {
    match std::str::from_utf8(bytes) {
        Ok(text) => text.to_uppercase().into_bytes(),
        Err(_) => bytes.to_ascii_uppercase(),
    }
}
130
131#[derive(Clone)]
132struct CloneableStream(Arc<TcpStream>);
133impl AsyncRead for CloneableStream {
134    fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
135        Pin::new(&mut &*self.0).poll_read(cx, buf)
136    }
137}
138impl AsyncWrite for CloneableStream {
139    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize>> {
140        Pin::new(&mut &*self.0).poll_write(cx, buf)
141    }
142    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
143        Pin::new(&mut &*self.0).poll_flush(cx)
144    }
145    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
146        Pin::new(&mut &*self.0).poll_close(cx)
147    }
148}