tcp_server/
tcp_server.rs

1extern crate bytes;
2extern crate futures;
3extern crate tokio;
4extern crate tokio_simplified;
5
6use bytes::BytesMut;
7use futures::Stream;
8use std::net::{IpAddr, Ipv4Addr};
9use tokio::codec::{Decoder, Encoder};
10use tokio::net::{TcpListener, TcpStream};
11
12use tokio_simplified::IoManagerBuilder;
13
14struct LineCodec;
15
16impl Decoder for LineCodec {
17    type Item = String;
18    type Error = std::io::Error;
19
20    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
21        let line_end_index = src.iter().position(|x| x.clone() == '\n' as u8);
22        Ok(match line_end_index {
23            None => None,
24            Some(index) => {
25                let line = src.split_to(index);
26                src.split_to(1);
27                Some(String::from_utf8(line.to_vec()).unwrap())
28            }
29        })
30    }
31
32    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
33        if src.len() > 0 {
34            return Ok(Some(String::from_utf8(src.to_vec()).unwrap()));
35        }
36        Err(std::io::Error::from(std::io::ErrorKind::ConnectionAborted))
37    }
38}
39
40impl Encoder for LineCodec {
41    type Item = String;
42    type Error = std::io::Error;
43
44    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
45        dst.extend(item.as_bytes());
46        dst.extend(b"\r\n");
47        Ok(())
48    }
49}
50
51fn process_socket(socket: TcpStream) {
52    println!("New Client");
53    let (sink, stream) = LineCodec.framed(socket).split();
54    let trx = IoManagerBuilder::new(sink, stream)
55        .with_filter(|frame, writer| {
56            if frame.to_lowercase().contains("hello there") {
57                writer.write("General Kenobi!".to_string());
58                return None;
59            }
60            Some(frame)
61        })
62        .with_error_handler(move |error| {
63            println!("{}", error);
64        })
65        .build();
66    let mut writer = trx.get_writer();
67    trx.on_receive(move |frame| {
68        println!("Got frame: {}", frame);
69        writer.write("Hi there".to_string());
70        Ok(())
71    });
72}
73
74fn main() {
75    println!("Hello Tokio");
76    let addr = std::net::SocketAddr::new(IpAddr::from(Ipv4Addr::new(0, 0, 0, 0)), 6000);
77    let listener = TcpListener::bind(&addr);
78    match listener {
79        Ok(listener) => tokio::run(
80            listener
81                .incoming()
82                .map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
83                .for_each(|socket| {
84                    process_socket(socket);
85                    Ok(())
86                }),
87        ),
88        _ => {}
89    };
90}