1extern crate bytes;
2extern crate futures;
3extern crate tokio;
4extern crate tokio_simplified;
5
6use bytes::BytesMut;
7use futures::Stream;
8use std::net::{IpAddr, Ipv4Addr};
9use tokio::codec::{Decoder, Encoder};
10use tokio::net::{TcpListener, TcpStream};
11
12use tokio_simplified::IoManagerBuilder;
13
14struct LineCodec;
15
16impl Decoder for LineCodec {
17 type Item = String;
18 type Error = std::io::Error;
19
20 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
21 let line_end_index = src.iter().position(|x| x.clone() == '\n' as u8);
22 Ok(match line_end_index {
23 None => None,
24 Some(index) => {
25 let line = src.split_to(index);
26 src.split_to(1);
27 Some(String::from_utf8(line.to_vec()).unwrap())
28 }
29 })
30 }
31
32 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
33 if src.len() > 0 {
34 return Ok(Some(String::from_utf8(src.to_vec()).unwrap()));
35 }
36 Err(std::io::Error::from(std::io::ErrorKind::ConnectionAborted))
37 }
38}
39
40impl Encoder for LineCodec {
41 type Item = String;
42 type Error = std::io::Error;
43
44 fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
45 dst.extend(item.as_bytes());
46 dst.extend(b"\r\n");
47 Ok(())
48 }
49}
50
51fn process_socket(socket: TcpStream) {
52 println!("New Client");
53 let (sink, stream) = LineCodec.framed(socket).split();
54 let trx = IoManagerBuilder::new(sink, stream)
55 .with_filter(|frame, writer| {
56 if frame.to_lowercase().contains("hello there") {
57 writer.write("General Kenobi!".to_string());
58 return None;
59 }
60 Some(frame)
61 })
62 .with_error_handler(move |error| {
63 println!("{}", error);
64 })
65 .build();
66 let mut writer = trx.get_writer();
67 trx.on_receive(move |frame| {
68 println!("Got frame: {}", frame);
69 writer.write("Hi there".to_string());
70 Ok(())
71 });
72}
73
74fn main() {
75 println!("Hello Tokio");
76 let addr = std::net::SocketAddr::new(IpAddr::from(Ipv4Addr::new(0, 0, 0, 0)), 6000);
77 let listener = TcpListener::bind(&addr);
78 match listener {
79 Ok(listener) => tokio::run(
80 listener
81 .incoming()
82 .map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
83 .for_each(|socket| {
84 process_socket(socket);
85 Ok(())
86 }),
87 ),
88 _ => {}
89 };
90}