message_io/
network.rs

1mod resource_id;
2mod endpoint;
3mod poll;
4mod registry;
5mod driver;
6mod remote_addr;
7mod transport;
8mod loader;
9
10/// Module that specify the pattern to follow to create adapters.
11/// This module is not part of the public API itself,
12/// it must be used from the internals to build new adapters.
13pub mod adapter;
14
15// Reexports
16pub use adapter::{SendStatus};
17pub use resource_id::{ResourceId, ResourceType};
18pub use endpoint::{Endpoint};
19pub use remote_addr::{RemoteAddr, ToRemoteAddr};
20pub use transport::{Transport, TransportConnect, TransportListen};
21pub use driver::{NetEvent};
22pub use poll::{Readiness};
23
24use loader::{DriverLoader, ActionControllerList, EventProcessorList};
25use poll::{Poll, PollEvent};
26
27use strum::{IntoEnumIterator};
28
29use std::net::{SocketAddr, ToSocketAddrs};
30use std::time::{Duration, Instant};
31use std::io::{self};
32
33/// Create a network instance giving its controller and processor.
34pub fn split() -> (NetworkController, NetworkProcessor) {
35    let mut drivers = DriverLoader::default();
36    Transport::iter().for_each(|transport| transport.mount_adapter(&mut drivers));
37
38    let (poll, controllers, processors) = drivers.take();
39
40    let network_controller = NetworkController::new(controllers);
41    let network_processor = NetworkProcessor::new(poll, processors);
42
43    (network_controller, network_processor)
44}
45
46/// Shareable instance in charge of control all the connections.
47pub struct NetworkController {
48    controllers: ActionControllerList,
49}
50
51impl NetworkController {
52    fn new(controllers: ActionControllerList) -> NetworkController {
53        Self { controllers }
54    }
55
56    /// Creates a connection to the specified address.
57    /// The endpoint, an identifier of the new connection, will be returned.
58    /// This function will generate a [`NetEvent::Connected`] event with the result of the connection.
59    /// This call will **NOT** block to perform the connection.
60    ///
61    /// Note that this function can return an error in the case the internal socket
62    /// could not be binded or open in the OS, but never will return an error an regarding
63    /// the connection itself.
64    /// If you want to check if the connection has been established or not you have to read the
65    /// boolean indicator in the [`NetEvent::Connected`] event.
66    ///
67    /// Example
68    /// ```
69    /// use message_io::node::{self, NodeEvent};
70    /// use message_io::network::{Transport, NetEvent};
71    ///
72    /// let (handler, listener) = node::split();
73    /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1));
74    ///
75    /// let (id, addr) = handler.network().listen(Transport::FramedTcp, "127.0.0.1:0").unwrap();
76    /// let (conn_endpoint, _) = handler.network().connect(Transport::FramedTcp, addr).unwrap();
77    /// // The socket could not be able to send yet.
78    ///
79    /// listener.for_each(move |event| match event {
80    ///     NodeEvent::Network(net_event) => match net_event {
81    ///         NetEvent::Connected(endpoint, established) => {
82    ///             assert_eq!(conn_endpoint, endpoint);
83    ///             if established {
84    ///                 println!("Connected!");
85    ///                 handler.network().send(endpoint, &[42]);
86    ///             }
87    ///             else {
88    ///                 println!("Could not connect");
89    ///             }
90    ///         },
91    ///         NetEvent::Accepted(endpoint, listening_id) => {
92    ///             assert_eq!(id, listening_id);
93    ///             println!("New connected endpoint: {}", endpoint.addr());
94    ///         },
95    ///         _ => (),
96    ///     }
97    ///     NodeEvent::Signal(_) => handler.stop(),
98    /// });
99    /// ```
100    pub fn connect(
101        &self,
102        transport: Transport,
103        addr: impl ToRemoteAddr,
104    ) -> io::Result<(Endpoint, SocketAddr)> {
105        self.connect_with(transport.into(), addr)
106    }
107
108    /// Creates a connection to the specified address with custom transport options for transports
109    /// that support it.
110    /// The endpoint, an identifier of the new connection, will be returned.
111    /// This function will generate a [`NetEvent::Connected`] event with the result of the
112    /// connection.  This call will **NOT** block to perform the connection.
113    ///
114    /// Note that this function can return an error in the case the internal socket
115    /// could not be binded or open in the OS, but never will return an error regarding
116    /// the connection itself.
117    /// If you want to check if the connection has been established or not you have to read the
118    /// boolean indicator in the [`NetEvent::Connected`] event.
119    ///
120    /// Example
121    /// ```
122    /// use message_io::node::{self, NodeEvent};
123    /// use message_io::network::{TransportConnect, NetEvent};
124    /// use message_io::adapters::udp::{UdpConnectConfig};
125    ///
126    /// let (handler, listener) = node::split();
127    /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1));
128    ///
129    /// let config = UdpConnectConfig::default().with_broadcast();
130    /// let addr = "255.255.255.255:7777";
131    /// let (conn_endpoint, _) = handler.network().connect_with(TransportConnect::Udp(config), addr).unwrap();
132    /// // The socket could not be able to send yet.
133    ///
134    /// listener.for_each(move |event| match event {
135    ///     NodeEvent::Network(net_event) => match net_event {
136    ///         NetEvent::Connected(endpoint, established) => {
137    ///             assert_eq!(conn_endpoint, endpoint);
138    ///             if established {
139    ///                 println!("Connected!");
140    ///                 handler.network().send(endpoint, &[42]);
141    ///             }
142    ///             else {
143    ///                 println!("Could not connect");
144    ///             }
145    ///         },
146    ///         _ => (),
147    ///     }
148    ///     NodeEvent::Signal(_) => handler.stop(),
149    /// });
150    /// ```
151    pub fn connect_with(
152        &self,
153        transport_connect: TransportConnect,
154        addr: impl ToRemoteAddr,
155    ) -> io::Result<(Endpoint, SocketAddr)> {
156        let addr = addr.to_remote_addr().unwrap();
157        self.controllers[transport_connect.id() as usize].connect_with(transport_connect, addr).map(
158            |(endpoint, addr)| {
159                log::trace!("Connect to {}", endpoint);
160                (endpoint, addr)
161            },
162        )
163    }
164
165    /// Creates a connection to the specified address.
166    /// This function is similar to [`NetworkController::connect()`] but will block
167    /// until for the connection is ready.
168    /// If the connection can not be established, a `ConnectionRefused` error will be returned.
169    ///
170    /// Note that the `Connect` event will be also generated.
171    ///
172    /// Since this function blocks the current thread, it must NOT be used inside
173    /// the network callback because the internal event could not be processed.
174    ///
175    /// In order to get the best scalability and performance, use the non-blocking
176    /// [`NetworkController::connect()`] version.
177    ///
178    /// Example
179    /// ```
180    /// use message_io::node::{self, NodeEvent};
181    /// use message_io::network::{Transport, NetEvent};
182    ///
183    /// let (handler, listener) = node::split();
184    /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1));
185    ///
186    /// let (id, addr) = handler.network().listen(Transport::FramedTcp, "127.0.0.1:0").unwrap();
187    /// match handler.network().connect_sync(Transport::FramedTcp, addr) {
188    ///     Ok((endpoint, _)) => {
189    ///         println!("Connected!");
190    ///         handler.network().send(endpoint, &[42]);
191    ///     }
192    ///     Err(err) if err.kind() == std::io::ErrorKind::ConnectionRefused => {
193    ///         println!("Could not connect");
194    ///     }
195    ///     Err(err) => println!("An OS error creating the socket"),
196    /// }
197    /// ```
198    pub fn connect_sync(
199        &self,
200        transport: Transport,
201        addr: impl ToRemoteAddr,
202    ) -> io::Result<(Endpoint, SocketAddr)> {
203        self.connect_sync_with(transport.into(), addr)
204    }
205
206    /// Creates a connection to the specified address with custom transport options for transports
207    /// that support it.
208    /// This function is similar to [`NetworkController::connect_with()`] but will block
209    /// until for the connection is ready.
210    /// If the connection can not be established, a `ConnectionRefused` error will be returned.
211    ///
212    /// Note that the `Connect` event will be also generated.
213    ///
214    /// Since this function blocks the current thread, it must NOT be used inside
215    /// the network callback because the internal event could not be processed.
216    ///
217    /// In order to get the best scalability and performance, use the non-blocking
218    /// [`NetworkController::connect_with()`] version.
219    ///
220    /// Example
221    /// ```
222    /// use message_io::node::{self, NodeEvent};
223    /// use message_io::network::{TransportConnect, NetEvent};
224    /// use message_io::adapters::udp::{UdpConnectConfig};
225    ///
226    /// let (handler, listener) = node::split();
227    /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1));
228    ///
229    /// let config = UdpConnectConfig::default().with_broadcast();
230    /// let addr = "255.255.255.255:7777";
231    /// match handler.network().connect_sync_with(TransportConnect::Udp(config), addr) {
232    ///     Ok((endpoint, _)) => {
233    ///         println!("Connected!");
234    ///         handler.network().send(endpoint, &[42]);
235    ///     }
236    ///     Err(err) if err.kind() == std::io::ErrorKind::ConnectionRefused => {
237    ///         println!("Could not connect");
238    ///     }
239    ///     Err(err) => println!("An OS error creating the socket"),
240    /// }
241    /// ```
242    pub fn connect_sync_with(
243        &self,
244        transport_connect: TransportConnect,
245        addr: impl ToRemoteAddr,
246    ) -> io::Result<(Endpoint, SocketAddr)> {
247        let (endpoint, addr) = self.connect_with(transport_connect, addr)?;
248        loop {
249            std::thread::sleep(Duration::from_millis(1));
250            match self.is_ready(endpoint.resource_id()) {
251                Some(true) => return Ok((endpoint, addr)),
252                Some(false) => continue,
253                None => {
254                    return Err(io::Error::new(
255                        io::ErrorKind::ConnectionRefused,
256                        "Connection refused",
257                    ))
258                }
259            }
260        }
261    }
262
263    /// Listen messages from specified transport.
264    /// The given address will be used as interface and listening port.
265    /// If the port can be opened, a [ResourceId] identifying the listener is returned
266    /// along with the local address, or an error if not.
267    /// The address is returned despite you passed as parameter because
268    /// when a `0` port is specified, the OS will give choose the value.
269    pub fn listen(
270        &self,
271        transport: Transport,
272        addr: impl ToSocketAddrs,
273    ) -> io::Result<(ResourceId, SocketAddr)> {
274        self.listen_with(transport.into(), addr)
275    }
276
277    /// Listen messages from specified transport with custom transport options for transports that
278    /// support it.
279    /// The given address will be used as interface and listening port.
280    /// If the port can be opened, a [ResourceId] identifying the listener is returned
281    /// along with the local address, or an error if not.
282    /// The address is returned despite you passed as parameter because
283    /// when a `0` port is specified, the OS will give choose the value.
284    pub fn listen_with(
285        &self,
286        transport_listen: TransportListen,
287        addr: impl ToSocketAddrs,
288    ) -> io::Result<(ResourceId, SocketAddr)> {
289        let addr = addr.to_socket_addrs().unwrap().next().unwrap();
290        self.controllers[transport_listen.id() as usize].listen_with(transport_listen, addr).map(
291            |(resource_id, addr)| {
292                log::trace!("Listening at {} by {}", addr, resource_id);
293                (resource_id, addr)
294            },
295        )
296    }
297
298    /// Send the data message thought the connection represented by the given endpoint.
299    /// This function returns a [`SendStatus`] indicating the status of this send.
300    /// There is no guarantee that send over a correct connection generates a [`SendStatus::Sent`]
301    /// because any time a connection can be disconnected (even while you are sending).
302    /// Except cases where you need to be sure that the message has been sent,
303    /// you will want to process a [`NetEvent::Disconnected`] to determine if the connection +
304    /// is *alive* instead of check if `send()` returned [`SendStatus::ResourceNotFound`].
305    pub fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus {
306        log::trace!("Sending {} bytes to {}...", data.len(), endpoint);
307        let status =
308            self.controllers[endpoint.resource_id().adapter_id() as usize].send(endpoint, data);
309        log::trace!("Send status: {:?}", status);
310        status
311    }
312
313    /// Remove a network resource.
314    /// Returns `false` if the resource id doesn't exists.
315    /// This is used to remove resources as connection or listeners.
316    /// Resources of endpoints generated by listening in connection oriented transports
317    /// can also be removed to close the connection.
318    /// Removing an already connected connection implies a disconnection.
319    /// Note that non-oriented connections as UDP use its listener resource to manage all
320    /// remote endpoints internally, the remotes have not resource for themselfs.
321    /// It means that all generated `Endpoint`s share the `ResourceId` of the listener and
322    /// if you remove this resource you are removing the listener of all of them.
323    /// For that cases there is no need to remove the resource because non-oriented connections
324    /// have not connection itself to close, 'there is no spoon'.
325    pub fn remove(&self, resource_id: ResourceId) -> bool {
326        log::trace!("Remove {}", resource_id);
327        let value = self.controllers[resource_id.adapter_id() as usize].remove(resource_id);
328        log::trace!("Removed: {}", value);
329        value
330    }
331
332    /// Check a resource specified by `resource_id` is ready.
333    /// If the status is `true` means that the resource is ready to use.
334    /// In connection oriented transports, it implies the resource is connected.
335    /// If the status is `false` it means that the resource is not yet ready to use.
336    /// If the resource has been removed, disconnected, or does not exists in the network,
337    /// a `None` is returned.
338    pub fn is_ready(&self, resource_id: ResourceId) -> Option<bool> {
339        self.controllers[resource_id.adapter_id() as usize].is_ready(resource_id)
340    }
341}
342
343/// Instance in charge of process input network events.
344/// These events are offered to the user as a [`NetEvent`] its processing data.
345pub struct NetworkProcessor {
346    poll: Poll,
347    processors: EventProcessorList,
348}
349
350impl NetworkProcessor {
351    fn new(poll: Poll, processors: EventProcessorList) -> Self {
352        Self { poll, processors }
353    }
354
355    /// Process the next poll event.
356    /// This method waits the timeout specified until the poll event is generated.
357    /// If `None` is passed as timeout, it will wait indefinitely.
358    /// Note that there is no 1-1 relation between an internal poll event and a [`NetEvent`].
359    /// You need to assume that process an internal poll event could call 0 or N times to
360    /// the callback with diferents `NetEvent`s.
361    pub fn process_poll_event(
362        &mut self,
363        timeout: Option<Duration>,
364        mut event_callback: impl FnMut(NetEvent<'_>),
365    ) {
366        let processors = &mut self.processors;
367        self.poll.process_event(timeout, |poll_event| {
368            match poll_event {
369                PollEvent::Network(resource_id, interest) => {
370                    let processor = &processors[resource_id.adapter_id() as usize];
371                    processor.process(resource_id, interest, &mut |net_event| {
372                        log::trace!("Processed {:?}", net_event);
373                        event_callback(net_event);
374                    });
375                }
376
377                #[allow(dead_code)] //TODO: remove it with native event support
378                PollEvent::Waker => todo!(),
379            }
380        });
381    }
382
383    /// Process poll events until there is no more events during a `timeout` duration.
384    /// This method makes succesive calls to [`NetworkProcessor::process_poll_event()`].
385    pub fn process_poll_events_until_timeout(
386        &mut self,
387        timeout: Duration,
388        mut event_callback: impl FnMut(NetEvent<'_>),
389    ) {
390        loop {
391            let now = Instant::now();
392            self.process_poll_event(Some(timeout), &mut event_callback);
393            if now.elapsed() > timeout {
394                break;
395            }
396        }
397    }
398}
399
400#[cfg(test)]
401mod tests {
402    use super::*;
403    use std::time::{Duration};
404    use crate::util::thread::{NamespacedThread};
405
406    use test_case::test_case;
407
408    lazy_static::lazy_static! {
409        static ref TIMEOUT: Duration = Duration::from_millis(1000);
410        static ref LOCALHOST_CONN_TIMEOUT: Duration = Duration::from_millis(5000);
411    }
412
413    #[cfg_attr(feature = "tcp", test_case(Transport::Tcp))]
414    #[cfg_attr(feature = "tcp", test_case(Transport::FramedTcp))]
415    #[cfg_attr(feature = "websocket", test_case(Transport::Ws))]
416    fn successful_connection(transport: Transport) {
417        let (controller, mut processor) = self::split();
418        let (listener_id, addr) = controller.listen(transport, "127.0.0.1:0").unwrap();
419        let (endpoint, _) = controller.connect(transport, addr).unwrap();
420
421        let mut was_connected = 0;
422        let mut was_accepted = 0;
423        processor.process_poll_events_until_timeout(*TIMEOUT, |net_event| match net_event {
424            NetEvent::Connected(net_endpoint, status) => {
425                assert!(status);
426                assert_eq!(endpoint, net_endpoint);
427                was_connected += 1;
428            }
429            NetEvent::Accepted(_, net_listener_id) => {
430                assert_eq!(listener_id, net_listener_id);
431                was_accepted += 1;
432            }
433            _ => unreachable!(),
434        });
435        assert_eq!(was_accepted, 1);
436        assert_eq!(was_connected, 1);
437    }
438
439    #[cfg_attr(feature = "tcp", test_case(Transport::Tcp))]
440    #[cfg_attr(feature = "tcp", test_case(Transport::FramedTcp))]
441    #[cfg_attr(feature = "websocket", test_case(Transport::Ws))]
442    fn successful_connection_sync(transport: Transport) {
443        let (controller, mut processor) = self::split();
444        let (_, addr) = controller.listen(transport, "127.0.0.1:0").unwrap();
445
446        let mut thread = NamespacedThread::spawn("test", move || {
447            let (endpoint, _) = controller.connect_sync(transport, addr).unwrap();
448            assert!(controller.is_ready(endpoint.resource_id()).unwrap());
449        });
450
451        processor.process_poll_events_until_timeout(*TIMEOUT, |_| ());
452
453        thread.join();
454    }
455
456    #[cfg_attr(feature = "tcp", test_case(Transport::Tcp))]
457    #[cfg_attr(feature = "tcp", test_case(Transport::FramedTcp))]
458    #[cfg_attr(feature = "websocket", test_case(Transport::Ws))]
459    fn unreachable_connection(transport: Transport) {
460        let (controller, mut processor) = self::split();
461
462        // Ensure that addr is not using by other process
463        // because it takes some secs to be reusable.
464        let (listener_id, addr) = controller.listen(transport, "127.0.0.1:0").unwrap();
465        controller.remove(listener_id);
466
467        let (endpoint, _) = controller.connect(transport, addr).unwrap();
468        assert_eq!(controller.send(endpoint, &[42]), SendStatus::ResourceNotAvailable);
469        assert!(!controller.is_ready(endpoint.resource_id()).unwrap());
470
471        let mut was_disconnected = false;
472        processor.process_poll_events_until_timeout(*LOCALHOST_CONN_TIMEOUT, |net_event| {
473            match net_event {
474                NetEvent::Connected(net_endpoint, status) => {
475                    assert!(!status);
476                    assert_eq!(endpoint, net_endpoint);
477                    was_disconnected = true;
478                }
479                _ => unreachable!(),
480            }
481        });
482        assert!(was_disconnected);
483    }
484
485    #[cfg_attr(feature = "tcp", test_case(Transport::Tcp))]
486    #[cfg_attr(feature = "tcp", test_case(Transport::FramedTcp))]
487    #[cfg_attr(feature = "websocket", test_case(Transport::Ws))]
488    fn unreachable_connection_sync(transport: Transport) {
489        let (controller, mut processor) = self::split();
490
491        // Ensure that addr is not using by other process
492        // because it takes some secs to be reusable.
493        let (listener_id, addr) = controller.listen(transport, "127.0.0.1:0").unwrap();
494        controller.remove(listener_id);
495
496        let mut thread = NamespacedThread::spawn("test", move || {
497            let err = controller.connect_sync(transport, addr).unwrap_err();
498            assert_eq!(err.kind(), io::ErrorKind::ConnectionRefused);
499        });
500
501        processor.process_poll_events_until_timeout(*LOCALHOST_CONN_TIMEOUT, |_| ());
502
503        thread.join();
504    }
505
506    #[test]
507    fn create_remove_listener() {
508        let (controller, mut processor) = self::split();
509        let (listener_id, _) = controller.listen(Transport::Tcp, "127.0.0.1:0").unwrap();
510        assert!(controller.remove(listener_id)); // Do not generate an event
511        assert!(!controller.remove(listener_id));
512
513        processor.process_poll_events_until_timeout(*TIMEOUT, |_| unreachable!());
514    }
515
516    #[test]
517    fn create_remove_listener_with_connection() {
518        let (controller, mut processor) = self::split();
519        let (listener_id, addr) = controller.listen(Transport::Tcp, "127.0.0.1:0").unwrap();
520        controller.connect(Transport::Tcp, addr).unwrap();
521
522        let mut was_accepted = false;
523        processor.process_poll_events_until_timeout(*TIMEOUT, |net_event| match net_event {
524            NetEvent::Connected(..) => (),
525            NetEvent::Accepted(_, _) => {
526                assert!(controller.remove(listener_id));
527                assert!(!controller.remove(listener_id));
528                was_accepted = true;
529            }
530            _ => unreachable!(),
531        });
532        assert!(was_accepted);
533    }
534}