muchin/models/effectful/mio/
state.rs

1use super::action::{MioEvent, PollResult, TcpAcceptResult, TcpReadResult, TcpWriteResult};
2use crate::automaton::Timeout;
3use crate::automaton::{Objects, Uid};
4use mio::net::{TcpListener, TcpStream};
5use mio::{Events, Interest, Poll, Token};
6use std::cell::RefCell;
7use std::io::{self, Read, Write};
8use std::time::Duration;
9
10pub struct MioState {
11    poll_objects: RefCell<Objects<Poll>>,
12    events_objects: RefCell<Objects<Events>>,
13    tcp_listener_objects: RefCell<Objects<TcpListener>>,
14    tcp_connection_objects: RefCell<Objects<TcpStream>>,
15}
16
17impl MioState {
18    pub fn new() -> Self {
19        Self {
20            poll_objects: RefCell::new(Objects::<Poll>::new()),
21            events_objects: RefCell::new(Objects::<Events>::new()),
22            tcp_listener_objects: RefCell::new(Objects::<TcpListener>::new()),
23            tcp_connection_objects: RefCell::new(Objects::<TcpStream>::new()),
24        }
25    }
26
27    fn new_poll(&mut self, uid: Uid, obj: Poll) {
28        if self.poll_objects.borrow_mut().insert(uid, obj).is_some() {
29            panic!("Attempt to re-use existing {:?}", uid)
30        }
31    }
32
33    fn new_events(&mut self, uid: Uid, obj: Events) {
34        if self.events_objects.borrow_mut().insert(uid, obj).is_some() {
35            panic!("Attempt to re-use existing {:?}", uid)
36        }
37    }
38
39    fn new_tcp_listener(&mut self, uid: Uid, obj: TcpListener) {
40        if self
41            .tcp_listener_objects
42            .borrow_mut()
43            .insert(uid, obj)
44            .is_some()
45        {
46            panic!("Attempt to re-use existing {:?}", uid)
47        }
48    }
49
50    fn new_tcp_connection(&mut self, uid: Uid, obj: TcpStream) {
51        if self
52            .tcp_connection_objects
53            .borrow_mut()
54            .insert(uid, obj)
55            .is_some()
56        {
57            panic!("Attempt to re-use existing {:?}", uid)
58        }
59    }
60
61    pub fn poll_create(&mut self, uid: Uid) -> Result<(), String> {
62        match Poll::new() {
63            Ok(poll_obj) => {
64                self.new_poll(uid, poll_obj);
65                Ok(())
66            }
67            Err(error) => Err(error.to_string()),
68        }
69    }
70
71    pub fn poll_register_tcp_server(
72        &mut self,
73        poll: &Uid,
74        tcp_listener: Uid,
75    ) -> Result<(), String> {
76        let mut tcp_listener_objects = self.tcp_listener_objects.borrow_mut();
77
78        let listener = tcp_listener_objects
79            .get_mut(&tcp_listener)
80            .expect(&format!("TcpListener object {:?} not found", tcp_listener));
81
82        if let Some(poll) = self.poll_objects.borrow().get(poll) {
83            match poll
84                .registry()
85                .register(listener, Token(tcp_listener.into()), Interest::READABLE)
86            {
87                Ok(_) => Ok(()),
88                Err(error) => Err(error.to_string()),
89            }
90        } else {
91            panic!("Poll object not found {:?}", poll)
92        }
93    }
94
95    pub fn poll_register_tcp_connection(
96        &mut self,
97        poll: &Uid,
98        connection: Uid,
99    ) -> Result<(), String> {
100        let mut tcp_connection_objects = self.tcp_connection_objects.borrow_mut();
101        let stream = tcp_connection_objects
102            .get_mut(&connection)
103            .expect(&format!("TcpConnection object not found {:?}", connection));
104
105        match self
106            .poll_objects
107            .borrow()
108            .get(poll)
109            .expect(&format!("Poll object not found {:?}", poll))
110            .registry()
111            .register(
112                stream,
113                Token(connection.into()),
114                Interest::READABLE.add(Interest::WRITABLE),
115            ) {
116            Ok(_) => Ok(()),
117            Err(error) => Err(error.to_string()),
118        }
119    }
120
121    pub fn poll_deregister_tcp_connection(
122        &mut self,
123        poll: &Uid,
124        connection: Uid,
125    ) -> Result<(), String> {
126        let mut tcp_connection_objects = self.tcp_connection_objects.borrow_mut();
127        let stream = tcp_connection_objects
128            .get_mut(&connection)
129            .expect(&format!("TcpConnection object not found {:?}", connection));
130
131        match self
132            .poll_objects
133            .borrow()
134            .get(poll)
135            .expect(&format!("Poll object not found {:?}", poll))
136            .registry()
137            .deregister(stream)
138        {
139            Ok(_) => Ok(()),
140            Err(error) => Err(error.to_string()),
141        }
142    }
143
144    pub fn poll_events(&mut self, poll: &Uid, events: &Uid, timeout: Timeout) -> PollResult {
145        let mut events_object = self.events_objects.borrow_mut();
146        let events = events_object
147            .get_mut(events)
148            .expect(&format!("Events object not found {:?}", events));
149
150        let timeout = match timeout {
151            Timeout::Millis(ms) => Some(Duration::from_millis(ms)),
152            Timeout::Never => None,
153        };
154
155        match self
156            .poll_objects
157            .borrow_mut()
158            .get_mut(poll)
159            .expect(&format!("Poll object not found {:?}", poll))
160            .poll(events, timeout)
161        {
162            Err(err) if err.kind() == io::ErrorKind::Interrupted => PollResult::Interrupted,
163            Err(err) => PollResult::Error(err.to_string()),
164            Ok(_) => {
165                let events = events
166                    .iter()
167                    .map(|event| MioEvent {
168                        token: event.token().0.into(),
169                        readable: event.is_readable(),
170                        writable: event.is_writable(),
171                        error: event.is_error(),
172                        read_closed: event.is_read_closed(),
173                        write_closed: event.is_write_closed(),
174                        priority: event.is_priority(),
175                        aio: event.is_aio(),
176                        lio: event.is_lio(),
177                    })
178                    .collect();
179                //info!("|MIO| poll events: {:?}", events);
180                PollResult::Events(events)
181            }
182        }
183    }
184
185    pub fn events_create(&mut self, uid: Uid, capacity: usize) {
186        self.new_events(uid, Events::with_capacity(capacity));
187    }
188
189    pub fn tcp_listen(&mut self, uid: Uid, address: String) -> Result<(), String> {
190        match address.parse() {
191            Ok(address) => match TcpListener::bind(address) {
192                Ok(tcp_listener) => {
193                    self.new_tcp_listener(uid, tcp_listener);
194                    Ok(())
195                }
196                Err(error) => Err(error.to_string()),
197            },
198            Err(error) => Err(error.to_string()),
199        }
200    }
201
202    pub fn tcp_accept(&mut self, connection: Uid, listener: &Uid) -> TcpAcceptResult {
203        let accept_result = {
204            let tcp_listener_objects = self.tcp_listener_objects.borrow();
205            let tcp_listener = tcp_listener_objects
206                .get(listener)
207                .expect(&format!("TcpListener object {:?} not found", listener));
208
209            tcp_listener.accept()
210        };
211
212        match accept_result {
213            Ok((stream, _address)) => {
214                self.new_tcp_connection(connection, stream);
215                TcpAcceptResult::Success
216            }
217
218            Err(error) => {
219                if error.kind() == std::io::ErrorKind::WouldBlock {
220                    TcpAcceptResult::WouldBlock
221                } else {
222                    TcpAcceptResult::Error(error.to_string())
223                }
224            }
225        }
226    }
227
228    pub fn tcp_connect(&mut self, connection: Uid, address: String) -> Result<(), String> {
229        match address.parse() {
230            Ok(address) => match TcpStream::connect(address) {
231                Ok(stream) => {
232                    self.new_tcp_connection(connection, stream);
233                    Ok(())
234                }
235                Err(error) => Err(error.to_string()),
236            },
237            Err(error) => Err(error.to_string()),
238        }
239    }
240
241    pub fn tcp_close(&mut self, connection: &Uid) {
242        let mut tcp_connection_objects = self.tcp_connection_objects.borrow_mut();
243
244        tcp_connection_objects.remove(connection).expect(&format!(
245            "TCP connection stream object not found {:?}",
246            connection
247        ));
248        // implict stream drop
249    }
250
251    pub fn tcp_write(&mut self, connection: &Uid, data: &[u8]) -> TcpWriteResult {
252        let mut tcp_connection_objects = self.tcp_connection_objects.borrow_mut();
253        let stream = tcp_connection_objects.get_mut(connection).expect(&format!(
254            "TCP connection stream object not found {:?}",
255            connection
256        ));
257
258        match stream.write(data) {
259            Ok(written) => {
260                if written < data.len() {
261                    TcpWriteResult::WrittenPartial(written)
262                } else {
263                    TcpWriteResult::WrittenAll
264                }
265            }
266            Err(error) => match error.kind() {
267                io::ErrorKind::Interrupted => TcpWriteResult::Interrupted,
268                io::ErrorKind::WouldBlock => TcpWriteResult::WouldBlock,
269                _ => TcpWriteResult::Error(error.to_string()),
270            },
271        }
272    }
273
274    pub fn tcp_read(&mut self, connection: &Uid, len: usize) -> TcpReadResult {
275        assert_ne!(len, 0);
276
277        let mut tcp_connection_objects = self.tcp_connection_objects.borrow_mut();
278        let stream = tcp_connection_objects.get_mut(connection).expect(&format!(
279            "TCP connection stream object not found {:?}",
280            connection
281        ));
282
283        let mut recv_buf = vec![0u8; len];
284
285        match stream.read(&mut recv_buf) {
286            Ok(read) if read > 0 => {
287                if read < len {
288                    recv_buf.truncate(read);
289                    TcpReadResult::ReadPartial(recv_buf)
290                } else {
291                    TcpReadResult::ReadAll(recv_buf)
292                }
293            }
294            Ok(_) => TcpReadResult::Error("Connection closed".to_string()),
295            Err(error) => match error.kind() {
296                io::ErrorKind::Interrupted => TcpReadResult::Interrupted,
297                io::ErrorKind::WouldBlock => TcpReadResult::WouldBlock,
298                _ => TcpReadResult::Error(error.to_string()),
299            },
300        }
301    }
302
303    pub fn tcp_peer_address(&mut self, connection: &Uid) -> Result<String, String> {
304        let tcp_connection_objects = self.tcp_connection_objects.borrow();
305        let stream = tcp_connection_objects.get(connection).expect(&format!(
306            "TCP connection stream object not found {:?}",
307            connection
308        ));
309
310        match stream.peer_addr() {
311            Ok(addr) => Ok(addr.to_string()),
312            Err(err) => Err(err.to_string()),
313        }
314    }
315}