uppercut/remote/
server.rs1use std::io::{Read, Write};
2use std::net::SocketAddr;
3use std::time::Duration;
4
5use mio::net::{TcpListener, TcpStream};
6use mio::{Events, Interest, Poll, Token};
7
8use parsed::stream::ByteStream;
9
10use crate::api::{AnyActor, AnySender, Envelope};
11use crate::config::ServerConfig;
12use crate::error::Error;
13use crate::remote::packet::Packet;
14
/// Tick message the server actor sends to itself.
///
/// Receiving a `Loop` makes [`Server`] run one poll iteration, after which it
/// sends itself the next `Loop`.
#[derive(Debug)]
pub(crate) struct Loop;
17
/// Readiness notification the server forwards to the actor owning a connection.
///
/// The flags are copied from the poll event reported for that connection's token.
#[derive(Debug)]
struct Work {
    /// The event reported the socket as readable.
    is_readable: bool,
    /// The event reported the socket as writable.
    is_writable: bool,
}
23
24pub struct Server {
25 config: ServerConfig,
26 poll: Poll,
27 events: Events,
28 socket: TcpListener,
29 counter: usize,
30 port: u16,
31}
32
33const PACKET_SIZE_LIMIT: usize = 4096 + 12; impl Server {
37 pub fn listen(addr: &str, config: &ServerConfig) -> Result<Server, Error> {
38 let poll = Poll::new().unwrap();
39 let events = Events::with_capacity(config.events_capacity);
40 let addr = addr
41 .parse::<SocketAddr>()
42 .map_err(|_| Error::InvalidServerAddress(addr.to_string()))?;
43 let port = addr.port();
44 let mut socket = TcpListener::bind(addr).map_err(|_| Error::ServerBindFailed(port))?;
45 poll.registry()
46 .register(&mut socket, Token(0), Interest::READABLE)
47 .unwrap();
48
49 let listener = Server {
50 config: config.clone(),
51 poll,
52 events,
53 socket,
54 counter: 0,
55 port,
56 };
57 Ok(listener)
58 }
59
60 pub fn port(&self) -> u16 {
61 self.port
62 }
63}
64
65impl Server {
66 fn tag(me: &str, id: usize) -> String {
67 format!("{}#{:012}", me, id)
68 }
69}
70
71impl AnyActor for Server {
72 fn receive(&mut self, envelope: Envelope, sender: &mut dyn AnySender) {
73 if envelope.message.downcast_ref::<Loop>().is_some() {
74 self.poll
75 .poll(&mut self.events, Some(Duration::from_millis(1)))
76 .unwrap();
77 for event in self.events.iter() {
78 match event.token() {
79 Token(0) => {
80 while let Ok((mut socket, _remote)) = self.socket.accept() {
81 self.counter += 1;
82 let token = Token(self.counter);
83 self.poll
84 .registry()
85 .register(
86 &mut socket,
87 token,
88 Interest::READABLE | Interest::WRITABLE,
89 )
90 .unwrap();
91 let connection = Connection {
92 socket,
93 keep_alive: true,
94 recv_buf: ByteStream::with_capacity(self.config.recv_buffer_size),
95 send_buf: ByteStream::with_capacity(self.config.send_buffer_size),
96 is_open: true,
97 can_read: false,
98 can_write: false,
99 };
100 let tag = Server::tag(sender.me(), self.counter);
101 sender.spawn(&tag, Box::new(move || Box::new(connection)));
102 }
103 }
104 token => {
105 let tag = Server::tag(sender.me(), token.0);
106 let work = Work {
107 is_readable: event.is_readable(),
108 is_writable: event.is_writable(),
109 };
110 sender.send(&tag, Envelope::of(work));
111 }
112 }
113 }
114 sender.send(sender.me(), Envelope::of(Loop));
115 }
116 }
117}
118
119struct Connection {
120 socket: TcpStream,
121 is_open: bool,
122 keep_alive: bool,
123 recv_buf: ByteStream,
124 send_buf: ByteStream,
125 can_read: bool,
126 can_write: bool,
127}
128
129impl Connection {
130 fn keep_open(&mut self, sender: &mut dyn AnySender) -> bool {
131 if !self.is_open {
132 if !self.keep_alive {
133 self.is_open = true;
134 } else {
135 sender.stop(sender.me());
136 }
137 }
138 self.is_open
139 }
140}
141
142impl AnyActor for Connection {
143 fn receive(&mut self, envelope: Envelope, sender: &mut dyn AnySender) {
144 if let Some(work) = envelope.message.downcast_ref::<Work>() {
145 let mut buffer = [0u8; 1024];
146 self.can_read = work.is_readable;
147 self.can_write = self.can_write || work.is_writable;
148 if self.can_read {
149 match self.socket.read(&mut buffer[..]) {
150 Ok(0) | Err(_) => {
151 self.is_open = false;
152 }
153 Ok(n) => {
154 self.recv_buf.put(&buffer[0..n]);
155 }
156 }
157 }
158
159 if !self.keep_open(sender) {
160 return;
161 }
162
163 loop {
164 let r = Packet::from_bytes(&mut self.recv_buf, PACKET_SIZE_LIMIT);
165 if r.is_err() {
166 sender.log("Packet parser marked connection buffer as failed");
167 self.is_open = false;
168 break;
169 }
170
171 if let Ok(Some(packet)) = r {
172 let mut host = self.socket.peer_addr().unwrap();
173 host.set_port(packet.port);
174 let from = format!("{}@{}", packet.from, host);
175
176 sender.log(&format!(
177 "server/rcvd: to={} from={} bytes={}",
178 packet.to,
179 packet.from,
180 packet.payload.len()
181 ));
182 let e = Envelope::of(packet.payload).to(&packet.to).from(&from);
183 sender.send(&packet.to, e);
184 } else {
185 break;
186 }
187 }
188
189 if self.can_write && self.send_buf.len() > 0 {
190 match self.socket.write_all(self.send_buf.as_ref()) {
191 Ok(_) => {
192 self.send_buf.clear();
193 }
194 _ => {
195 self.is_open = false;
196 }
197 }
198 }
199
200 self.keep_open(sender);
201 }
202 }
203}