muchin/models/effectful/mio/
state.rs1use super::action::{MioEvent, PollResult, TcpAcceptResult, TcpReadResult, TcpWriteResult};
2use crate::automaton::Timeout;
3use crate::automaton::{Objects, Uid};
4use mio::net::{TcpListener, TcpStream};
5use mio::{Events, Interest, Poll, Token};
6use std::cell::RefCell;
7use std::io::{self, Read, Write};
8use std::time::Duration;
9
10pub struct MioState {
11 poll_objects: RefCell<Objects<Poll>>,
12 events_objects: RefCell<Objects<Events>>,
13 tcp_listener_objects: RefCell<Objects<TcpListener>>,
14 tcp_connection_objects: RefCell<Objects<TcpStream>>,
15}
16
17impl MioState {
18 pub fn new() -> Self {
19 Self {
20 poll_objects: RefCell::new(Objects::<Poll>::new()),
21 events_objects: RefCell::new(Objects::<Events>::new()),
22 tcp_listener_objects: RefCell::new(Objects::<TcpListener>::new()),
23 tcp_connection_objects: RefCell::new(Objects::<TcpStream>::new()),
24 }
25 }
26
27 fn new_poll(&mut self, uid: Uid, obj: Poll) {
28 if self.poll_objects.borrow_mut().insert(uid, obj).is_some() {
29 panic!("Attempt to re-use existing {:?}", uid)
30 }
31 }
32
33 fn new_events(&mut self, uid: Uid, obj: Events) {
34 if self.events_objects.borrow_mut().insert(uid, obj).is_some() {
35 panic!("Attempt to re-use existing {:?}", uid)
36 }
37 }
38
39 fn new_tcp_listener(&mut self, uid: Uid, obj: TcpListener) {
40 if self
41 .tcp_listener_objects
42 .borrow_mut()
43 .insert(uid, obj)
44 .is_some()
45 {
46 panic!("Attempt to re-use existing {:?}", uid)
47 }
48 }
49
50 fn new_tcp_connection(&mut self, uid: Uid, obj: TcpStream) {
51 if self
52 .tcp_connection_objects
53 .borrow_mut()
54 .insert(uid, obj)
55 .is_some()
56 {
57 panic!("Attempt to re-use existing {:?}", uid)
58 }
59 }
60
61 pub fn poll_create(&mut self, uid: Uid) -> Result<(), String> {
62 match Poll::new() {
63 Ok(poll_obj) => {
64 self.new_poll(uid, poll_obj);
65 Ok(())
66 }
67 Err(error) => Err(error.to_string()),
68 }
69 }
70
71 pub fn poll_register_tcp_server(
72 &mut self,
73 poll: &Uid,
74 tcp_listener: Uid,
75 ) -> Result<(), String> {
76 let mut tcp_listener_objects = self.tcp_listener_objects.borrow_mut();
77
78 let listener = tcp_listener_objects
79 .get_mut(&tcp_listener)
80 .expect(&format!("TcpListener object {:?} not found", tcp_listener));
81
82 if let Some(poll) = self.poll_objects.borrow().get(poll) {
83 match poll
84 .registry()
85 .register(listener, Token(tcp_listener.into()), Interest::READABLE)
86 {
87 Ok(_) => Ok(()),
88 Err(error) => Err(error.to_string()),
89 }
90 } else {
91 panic!("Poll object not found {:?}", poll)
92 }
93 }
94
95 pub fn poll_register_tcp_connection(
96 &mut self,
97 poll: &Uid,
98 connection: Uid,
99 ) -> Result<(), String> {
100 let mut tcp_connection_objects = self.tcp_connection_objects.borrow_mut();
101 let stream = tcp_connection_objects
102 .get_mut(&connection)
103 .expect(&format!("TcpConnection object not found {:?}", connection));
104
105 match self
106 .poll_objects
107 .borrow()
108 .get(poll)
109 .expect(&format!("Poll object not found {:?}", poll))
110 .registry()
111 .register(
112 stream,
113 Token(connection.into()),
114 Interest::READABLE.add(Interest::WRITABLE),
115 ) {
116 Ok(_) => Ok(()),
117 Err(error) => Err(error.to_string()),
118 }
119 }
120
121 pub fn poll_deregister_tcp_connection(
122 &mut self,
123 poll: &Uid,
124 connection: Uid,
125 ) -> Result<(), String> {
126 let mut tcp_connection_objects = self.tcp_connection_objects.borrow_mut();
127 let stream = tcp_connection_objects
128 .get_mut(&connection)
129 .expect(&format!("TcpConnection object not found {:?}", connection));
130
131 match self
132 .poll_objects
133 .borrow()
134 .get(poll)
135 .expect(&format!("Poll object not found {:?}", poll))
136 .registry()
137 .deregister(stream)
138 {
139 Ok(_) => Ok(()),
140 Err(error) => Err(error.to_string()),
141 }
142 }
143
144 pub fn poll_events(&mut self, poll: &Uid, events: &Uid, timeout: Timeout) -> PollResult {
145 let mut events_object = self.events_objects.borrow_mut();
146 let events = events_object
147 .get_mut(events)
148 .expect(&format!("Events object not found {:?}", events));
149
150 let timeout = match timeout {
151 Timeout::Millis(ms) => Some(Duration::from_millis(ms)),
152 Timeout::Never => None,
153 };
154
155 match self
156 .poll_objects
157 .borrow_mut()
158 .get_mut(poll)
159 .expect(&format!("Poll object not found {:?}", poll))
160 .poll(events, timeout)
161 {
162 Err(err) if err.kind() == io::ErrorKind::Interrupted => PollResult::Interrupted,
163 Err(err) => PollResult::Error(err.to_string()),
164 Ok(_) => {
165 let events = events
166 .iter()
167 .map(|event| MioEvent {
168 token: event.token().0.into(),
169 readable: event.is_readable(),
170 writable: event.is_writable(),
171 error: event.is_error(),
172 read_closed: event.is_read_closed(),
173 write_closed: event.is_write_closed(),
174 priority: event.is_priority(),
175 aio: event.is_aio(),
176 lio: event.is_lio(),
177 })
178 .collect();
179 PollResult::Events(events)
181 }
182 }
183 }
184
185 pub fn events_create(&mut self, uid: Uid, capacity: usize) {
186 self.new_events(uid, Events::with_capacity(capacity));
187 }
188
189 pub fn tcp_listen(&mut self, uid: Uid, address: String) -> Result<(), String> {
190 match address.parse() {
191 Ok(address) => match TcpListener::bind(address) {
192 Ok(tcp_listener) => {
193 self.new_tcp_listener(uid, tcp_listener);
194 Ok(())
195 }
196 Err(error) => Err(error.to_string()),
197 },
198 Err(error) => Err(error.to_string()),
199 }
200 }
201
202 pub fn tcp_accept(&mut self, connection: Uid, listener: &Uid) -> TcpAcceptResult {
203 let accept_result = {
204 let tcp_listener_objects = self.tcp_listener_objects.borrow();
205 let tcp_listener = tcp_listener_objects
206 .get(listener)
207 .expect(&format!("TcpListener object {:?} not found", listener));
208
209 tcp_listener.accept()
210 };
211
212 match accept_result {
213 Ok((stream, _address)) => {
214 self.new_tcp_connection(connection, stream);
215 TcpAcceptResult::Success
216 }
217
218 Err(error) => {
219 if error.kind() == std::io::ErrorKind::WouldBlock {
220 TcpAcceptResult::WouldBlock
221 } else {
222 TcpAcceptResult::Error(error.to_string())
223 }
224 }
225 }
226 }
227
228 pub fn tcp_connect(&mut self, connection: Uid, address: String) -> Result<(), String> {
229 match address.parse() {
230 Ok(address) => match TcpStream::connect(address) {
231 Ok(stream) => {
232 self.new_tcp_connection(connection, stream);
233 Ok(())
234 }
235 Err(error) => Err(error.to_string()),
236 },
237 Err(error) => Err(error.to_string()),
238 }
239 }
240
241 pub fn tcp_close(&mut self, connection: &Uid) {
242 let mut tcp_connection_objects = self.tcp_connection_objects.borrow_mut();
243
244 tcp_connection_objects.remove(connection).expect(&format!(
245 "TCP connection stream object not found {:?}",
246 connection
247 ));
248 }
250
251 pub fn tcp_write(&mut self, connection: &Uid, data: &[u8]) -> TcpWriteResult {
252 let mut tcp_connection_objects = self.tcp_connection_objects.borrow_mut();
253 let stream = tcp_connection_objects.get_mut(connection).expect(&format!(
254 "TCP connection stream object not found {:?}",
255 connection
256 ));
257
258 match stream.write(data) {
259 Ok(written) => {
260 if written < data.len() {
261 TcpWriteResult::WrittenPartial(written)
262 } else {
263 TcpWriteResult::WrittenAll
264 }
265 }
266 Err(error) => match error.kind() {
267 io::ErrorKind::Interrupted => TcpWriteResult::Interrupted,
268 io::ErrorKind::WouldBlock => TcpWriteResult::WouldBlock,
269 _ => TcpWriteResult::Error(error.to_string()),
270 },
271 }
272 }
273
274 pub fn tcp_read(&mut self, connection: &Uid, len: usize) -> TcpReadResult {
275 assert_ne!(len, 0);
276
277 let mut tcp_connection_objects = self.tcp_connection_objects.borrow_mut();
278 let stream = tcp_connection_objects.get_mut(connection).expect(&format!(
279 "TCP connection stream object not found {:?}",
280 connection
281 ));
282
283 let mut recv_buf = vec![0u8; len];
284
285 match stream.read(&mut recv_buf) {
286 Ok(read) if read > 0 => {
287 if read < len {
288 recv_buf.truncate(read);
289 TcpReadResult::ReadPartial(recv_buf)
290 } else {
291 TcpReadResult::ReadAll(recv_buf)
292 }
293 }
294 Ok(_) => TcpReadResult::Error("Connection closed".to_string()),
295 Err(error) => match error.kind() {
296 io::ErrorKind::Interrupted => TcpReadResult::Interrupted,
297 io::ErrorKind::WouldBlock => TcpReadResult::WouldBlock,
298 _ => TcpReadResult::Error(error.to_string()),
299 },
300 }
301 }
302
303 pub fn tcp_peer_address(&mut self, connection: &Uid) -> Result<String, String> {
304 let tcp_connection_objects = self.tcp_connection_objects.borrow();
305 let stream = tcp_connection_objects.get(connection).expect(&format!(
306 "TCP connection stream object not found {:?}",
307 connection
308 ));
309
310 match stream.peer_addr() {
311 Ok(addr) => Ok(addr.to_string()),
312 Err(err) => Err(err.to_string()),
313 }
314 }
315}