uppercut/remote/
server.rs

1use std::io::{Read, Write};
2use std::net::SocketAddr;
3use std::time::Duration;
4
5use mio::net::{TcpListener, TcpStream};
6use mio::{Events, Interest, Poll, Token};
7
8use parsed::stream::ByteStream;
9
10use crate::api::{AnyActor, AnySender, Envelope};
11use crate::config::ServerConfig;
12use crate::error::Error;
13use crate::remote::packet::Packet;
14
15#[derive(Debug)]
16pub(crate) struct Loop;
17
18#[derive(Debug)]
19struct Work {
20    is_readable: bool,
21    is_writable: bool,
22}
23
24pub struct Server {
25    config: ServerConfig,
26    poll: Poll,
27    events: Events,
28    socket: TcpListener,
29    counter: usize,
30    port: u16,
31}
32
33// TODO Make configurable?
34const PACKET_SIZE_LIMIT: usize = 4096 + 12; // 12 bytes header + max 4kb payload
35
36impl Server {
37    pub fn listen(addr: &str, config: &ServerConfig) -> Result<Server, Error> {
38        let poll = Poll::new().unwrap();
39        let events = Events::with_capacity(config.events_capacity);
40        let addr = addr
41            .parse::<SocketAddr>()
42            .map_err(|_| Error::InvalidServerAddress(addr.to_string()))?;
43        let port = addr.port();
44        let mut socket = TcpListener::bind(addr).map_err(|_| Error::ServerBindFailed(port))?;
45        poll.registry()
46            .register(&mut socket, Token(0), Interest::READABLE)
47            .unwrap();
48
49        let listener = Server {
50            config: config.clone(),
51            poll,
52            events,
53            socket,
54            counter: 0,
55            port,
56        };
57        Ok(listener)
58    }
59
60    pub fn port(&self) -> u16 {
61        self.port
62    }
63}
64
65impl Server {
66    fn tag(me: &str, id: usize) -> String {
67        format!("{}#{:012}", me, id)
68    }
69}
70
71impl AnyActor for Server {
72    fn receive(&mut self, envelope: Envelope, sender: &mut dyn AnySender) {
73        if envelope.message.downcast_ref::<Loop>().is_some() {
74            self.poll
75                .poll(&mut self.events, Some(Duration::from_millis(1)))
76                .unwrap();
77            for event in self.events.iter() {
78                match event.token() {
79                    Token(0) => {
80                        while let Ok((mut socket, _remote)) = self.socket.accept() {
81                            self.counter += 1;
82                            let token = Token(self.counter);
83                            self.poll
84                                .registry()
85                                .register(
86                                    &mut socket,
87                                    token,
88                                    Interest::READABLE | Interest::WRITABLE,
89                                )
90                                .unwrap();
91                            let connection = Connection {
92                                socket,
93                                keep_alive: true,
94                                recv_buf: ByteStream::with_capacity(self.config.recv_buffer_size),
95                                send_buf: ByteStream::with_capacity(self.config.send_buffer_size),
96                                is_open: true,
97                                can_read: false,
98                                can_write: false,
99                            };
100                            let tag = Server::tag(sender.me(), self.counter);
101                            sender.spawn(&tag, Box::new(move || Box::new(connection)));
102                        }
103                    }
104                    token => {
105                        let tag = Server::tag(sender.me(), token.0);
106                        let work = Work {
107                            is_readable: event.is_readable(),
108                            is_writable: event.is_writable(),
109                        };
110                        sender.send(&tag, Envelope::of(work));
111                    }
112                }
113            }
114            sender.send(sender.me(), Envelope::of(Loop));
115        }
116    }
117}
118
119struct Connection {
120    socket: TcpStream,
121    is_open: bool,
122    keep_alive: bool,
123    recv_buf: ByteStream,
124    send_buf: ByteStream,
125    can_read: bool,
126    can_write: bool,
127}
128
129impl Connection {
130    fn keep_open(&mut self, sender: &mut dyn AnySender) -> bool {
131        if !self.is_open {
132            if !self.keep_alive {
133                self.is_open = true;
134            } else {
135                sender.stop(sender.me());
136            }
137        }
138        self.is_open
139    }
140}
141
142impl AnyActor for Connection {
143    fn receive(&mut self, envelope: Envelope, sender: &mut dyn AnySender) {
144        if let Some(work) = envelope.message.downcast_ref::<Work>() {
145            let mut buffer = [0u8; 1024];
146            self.can_read = work.is_readable;
147            self.can_write = self.can_write || work.is_writable;
148            if self.can_read {
149                match self.socket.read(&mut buffer[..]) {
150                    Ok(0) | Err(_) => {
151                        self.is_open = false;
152                    }
153                    Ok(n) => {
154                        self.recv_buf.put(&buffer[0..n]);
155                    }
156                }
157            }
158
159            if !self.keep_open(sender) {
160                return;
161            }
162
163            loop {
164                let r = Packet::from_bytes(&mut self.recv_buf, PACKET_SIZE_LIMIT);
165                if r.is_err() {
166                    sender.log("Packet parser marked connection buffer as failed");
167                    self.is_open = false;
168                    break;
169                }
170
171                if let Ok(Some(packet)) = r {
172                    let mut host = self.socket.peer_addr().unwrap();
173                    host.set_port(packet.port);
174                    let from = format!("{}@{}", packet.from, host);
175
176                    sender.log(&format!(
177                        "server/rcvd: to={} from={} bytes={}",
178                        packet.to,
179                        packet.from,
180                        packet.payload.len()
181                    ));
182                    let e = Envelope::of(packet.payload).to(&packet.to).from(&from);
183                    sender.send(&packet.to, e);
184                } else {
185                    break;
186                }
187            }
188
189            if self.can_write && self.send_buf.len() > 0 {
190                match self.socket.write_all(self.send_buf.as_ref()) {
191                    Ok(_) => {
192                        self.send_buf.clear();
193                    }
194                    _ => {
195                        self.is_open = false;
196                    }
197                }
198            }
199
200            self.keep_open(sender);
201        }
202    }
203}