message_io/network.rs
1mod resource_id;
2mod endpoint;
3mod poll;
4mod registry;
5mod driver;
6mod remote_addr;
7mod transport;
8mod loader;
9
10/// Module that specify the pattern to follow to create adapters.
11/// This module is not part of the public API itself,
12/// it must be used from the internals to build new adapters.
13pub mod adapter;
14
15// Reexports
16pub use adapter::{SendStatus};
17pub use resource_id::{ResourceId, ResourceType};
18pub use endpoint::{Endpoint};
19pub use remote_addr::{RemoteAddr, ToRemoteAddr};
20pub use transport::{Transport, TransportConnect, TransportListen};
21pub use driver::{NetEvent};
22pub use poll::{Readiness};
23
24use loader::{DriverLoader, ActionControllerList, EventProcessorList};
25use poll::{Poll, PollEvent};
26
27use strum::{IntoEnumIterator};
28
29use std::net::{SocketAddr, ToSocketAddrs};
30use std::time::{Duration, Instant};
31use std::io::{self};
32
33/// Create a network instance giving its controller and processor.
34pub fn split() -> (NetworkController, NetworkProcessor) {
35 let mut drivers = DriverLoader::default();
36 Transport::iter().for_each(|transport| transport.mount_adapter(&mut drivers));
37
38 let (poll, controllers, processors) = drivers.take();
39
40 let network_controller = NetworkController::new(controllers);
41 let network_processor = NetworkProcessor::new(poll, processors);
42
43 (network_controller, network_processor)
44}
45
46/// Shareable instance in charge of control all the connections.
47pub struct NetworkController {
48 controllers: ActionControllerList,
49}
50
51impl NetworkController {
52 fn new(controllers: ActionControllerList) -> NetworkController {
53 Self { controllers }
54 }
55
56 /// Creates a connection to the specified address.
57 /// The endpoint, an identifier of the new connection, will be returned.
58 /// This function will generate a [`NetEvent::Connected`] event with the result of the connection.
59 /// This call will **NOT** block to perform the connection.
60 ///
61 /// Note that this function can return an error in the case the internal socket
62 /// could not be binded or open in the OS, but never will return an error an regarding
63 /// the connection itself.
64 /// If you want to check if the connection has been established or not you have to read the
65 /// boolean indicator in the [`NetEvent::Connected`] event.
66 ///
67 /// Example
68 /// ```
69 /// use message_io::node::{self, NodeEvent};
70 /// use message_io::network::{Transport, NetEvent};
71 ///
72 /// let (handler, listener) = node::split();
73 /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1));
74 ///
75 /// let (id, addr) = handler.network().listen(Transport::FramedTcp, "127.0.0.1:0").unwrap();
76 /// let (conn_endpoint, _) = handler.network().connect(Transport::FramedTcp, addr).unwrap();
77 /// // The socket could not be able to send yet.
78 ///
79 /// listener.for_each(move |event| match event {
80 /// NodeEvent::Network(net_event) => match net_event {
81 /// NetEvent::Connected(endpoint, established) => {
82 /// assert_eq!(conn_endpoint, endpoint);
83 /// if established {
84 /// println!("Connected!");
85 /// handler.network().send(endpoint, &[42]);
86 /// }
87 /// else {
88 /// println!("Could not connect");
89 /// }
90 /// },
91 /// NetEvent::Accepted(endpoint, listening_id) => {
92 /// assert_eq!(id, listening_id);
93 /// println!("New connected endpoint: {}", endpoint.addr());
94 /// },
95 /// _ => (),
96 /// }
97 /// NodeEvent::Signal(_) => handler.stop(),
98 /// });
99 /// ```
100 pub fn connect(
101 &self,
102 transport: Transport,
103 addr: impl ToRemoteAddr,
104 ) -> io::Result<(Endpoint, SocketAddr)> {
105 self.connect_with(transport.into(), addr)
106 }
107
108 /// Creates a connection to the specified address with custom transport options for transports
109 /// that support it.
110 /// The endpoint, an identifier of the new connection, will be returned.
111 /// This function will generate a [`NetEvent::Connected`] event with the result of the
112 /// connection. This call will **NOT** block to perform the connection.
113 ///
114 /// Note that this function can return an error in the case the internal socket
115 /// could not be binded or open in the OS, but never will return an error regarding
116 /// the connection itself.
117 /// If you want to check if the connection has been established or not you have to read the
118 /// boolean indicator in the [`NetEvent::Connected`] event.
119 ///
120 /// Example
121 /// ```
122 /// use message_io::node::{self, NodeEvent};
123 /// use message_io::network::{TransportConnect, NetEvent};
124 /// use message_io::adapters::udp::{UdpConnectConfig};
125 ///
126 /// let (handler, listener) = node::split();
127 /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1));
128 ///
129 /// let config = UdpConnectConfig::default().with_broadcast();
130 /// let addr = "255.255.255.255:7777";
131 /// let (conn_endpoint, _) = handler.network().connect_with(TransportConnect::Udp(config), addr).unwrap();
132 /// // The socket could not be able to send yet.
133 ///
134 /// listener.for_each(move |event| match event {
135 /// NodeEvent::Network(net_event) => match net_event {
136 /// NetEvent::Connected(endpoint, established) => {
137 /// assert_eq!(conn_endpoint, endpoint);
138 /// if established {
139 /// println!("Connected!");
140 /// handler.network().send(endpoint, &[42]);
141 /// }
142 /// else {
143 /// println!("Could not connect");
144 /// }
145 /// },
146 /// _ => (),
147 /// }
148 /// NodeEvent::Signal(_) => handler.stop(),
149 /// });
150 /// ```
151 pub fn connect_with(
152 &self,
153 transport_connect: TransportConnect,
154 addr: impl ToRemoteAddr,
155 ) -> io::Result<(Endpoint, SocketAddr)> {
156 let addr = addr.to_remote_addr().unwrap();
157 self.controllers[transport_connect.id() as usize].connect_with(transport_connect, addr).map(
158 |(endpoint, addr)| {
159 log::trace!("Connect to {}", endpoint);
160 (endpoint, addr)
161 },
162 )
163 }
164
165 /// Creates a connection to the specified address.
166 /// This function is similar to [`NetworkController::connect()`] but will block
167 /// until for the connection is ready.
168 /// If the connection can not be established, a `ConnectionRefused` error will be returned.
169 ///
170 /// Note that the `Connect` event will be also generated.
171 ///
172 /// Since this function blocks the current thread, it must NOT be used inside
173 /// the network callback because the internal event could not be processed.
174 ///
175 /// In order to get the best scalability and performance, use the non-blocking
176 /// [`NetworkController::connect()`] version.
177 ///
178 /// Example
179 /// ```
180 /// use message_io::node::{self, NodeEvent};
181 /// use message_io::network::{Transport, NetEvent};
182 ///
183 /// let (handler, listener) = node::split();
184 /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1));
185 ///
186 /// let (id, addr) = handler.network().listen(Transport::FramedTcp, "127.0.0.1:0").unwrap();
187 /// match handler.network().connect_sync(Transport::FramedTcp, addr) {
188 /// Ok((endpoint, _)) => {
189 /// println!("Connected!");
190 /// handler.network().send(endpoint, &[42]);
191 /// }
192 /// Err(err) if err.kind() == std::io::ErrorKind::ConnectionRefused => {
193 /// println!("Could not connect");
194 /// }
195 /// Err(err) => println!("An OS error creating the socket"),
196 /// }
197 /// ```
198 pub fn connect_sync(
199 &self,
200 transport: Transport,
201 addr: impl ToRemoteAddr,
202 ) -> io::Result<(Endpoint, SocketAddr)> {
203 self.connect_sync_with(transport.into(), addr)
204 }
205
206 /// Creates a connection to the specified address with custom transport options for transports
207 /// that support it.
208 /// This function is similar to [`NetworkController::connect_with()`] but will block
209 /// until for the connection is ready.
210 /// If the connection can not be established, a `ConnectionRefused` error will be returned.
211 ///
212 /// Note that the `Connect` event will be also generated.
213 ///
214 /// Since this function blocks the current thread, it must NOT be used inside
215 /// the network callback because the internal event could not be processed.
216 ///
217 /// In order to get the best scalability and performance, use the non-blocking
218 /// [`NetworkController::connect_with()`] version.
219 ///
220 /// Example
221 /// ```
222 /// use message_io::node::{self, NodeEvent};
223 /// use message_io::network::{TransportConnect, NetEvent};
224 /// use message_io::adapters::udp::{UdpConnectConfig};
225 ///
226 /// let (handler, listener) = node::split();
227 /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1));
228 ///
229 /// let config = UdpConnectConfig::default().with_broadcast();
230 /// let addr = "255.255.255.255:7777";
231 /// match handler.network().connect_sync_with(TransportConnect::Udp(config), addr) {
232 /// Ok((endpoint, _)) => {
233 /// println!("Connected!");
234 /// handler.network().send(endpoint, &[42]);
235 /// }
236 /// Err(err) if err.kind() == std::io::ErrorKind::ConnectionRefused => {
237 /// println!("Could not connect");
238 /// }
239 /// Err(err) => println!("An OS error creating the socket"),
240 /// }
241 /// ```
242 pub fn connect_sync_with(
243 &self,
244 transport_connect: TransportConnect,
245 addr: impl ToRemoteAddr,
246 ) -> io::Result<(Endpoint, SocketAddr)> {
247 let (endpoint, addr) = self.connect_with(transport_connect, addr)?;
248 loop {
249 std::thread::sleep(Duration::from_millis(1));
250 match self.is_ready(endpoint.resource_id()) {
251 Some(true) => return Ok((endpoint, addr)),
252 Some(false) => continue,
253 None => {
254 return Err(io::Error::new(
255 io::ErrorKind::ConnectionRefused,
256 "Connection refused",
257 ))
258 }
259 }
260 }
261 }
262
263 /// Listen messages from specified transport.
264 /// The given address will be used as interface and listening port.
265 /// If the port can be opened, a [ResourceId] identifying the listener is returned
266 /// along with the local address, or an error if not.
267 /// The address is returned despite you passed as parameter because
268 /// when a `0` port is specified, the OS will give choose the value.
269 pub fn listen(
270 &self,
271 transport: Transport,
272 addr: impl ToSocketAddrs,
273 ) -> io::Result<(ResourceId, SocketAddr)> {
274 self.listen_with(transport.into(), addr)
275 }
276
277 /// Listen messages from specified transport with custom transport options for transports that
278 /// support it.
279 /// The given address will be used as interface and listening port.
280 /// If the port can be opened, a [ResourceId] identifying the listener is returned
281 /// along with the local address, or an error if not.
282 /// The address is returned despite you passed as parameter because
283 /// when a `0` port is specified, the OS will give choose the value.
284 pub fn listen_with(
285 &self,
286 transport_listen: TransportListen,
287 addr: impl ToSocketAddrs,
288 ) -> io::Result<(ResourceId, SocketAddr)> {
289 let addr = addr.to_socket_addrs().unwrap().next().unwrap();
290 self.controllers[transport_listen.id() as usize].listen_with(transport_listen, addr).map(
291 |(resource_id, addr)| {
292 log::trace!("Listening at {} by {}", addr, resource_id);
293 (resource_id, addr)
294 },
295 )
296 }
297
298 /// Send the data message thought the connection represented by the given endpoint.
299 /// This function returns a [`SendStatus`] indicating the status of this send.
300 /// There is no guarantee that send over a correct connection generates a [`SendStatus::Sent`]
301 /// because any time a connection can be disconnected (even while you are sending).
302 /// Except cases where you need to be sure that the message has been sent,
303 /// you will want to process a [`NetEvent::Disconnected`] to determine if the connection +
304 /// is *alive* instead of check if `send()` returned [`SendStatus::ResourceNotFound`].
305 pub fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus {
306 log::trace!("Sending {} bytes to {}...", data.len(), endpoint);
307 let status =
308 self.controllers[endpoint.resource_id().adapter_id() as usize].send(endpoint, data);
309 log::trace!("Send status: {:?}", status);
310 status
311 }
312
313 /// Remove a network resource.
314 /// Returns `false` if the resource id doesn't exists.
315 /// This is used to remove resources as connection or listeners.
316 /// Resources of endpoints generated by listening in connection oriented transports
317 /// can also be removed to close the connection.
318 /// Removing an already connected connection implies a disconnection.
319 /// Note that non-oriented connections as UDP use its listener resource to manage all
320 /// remote endpoints internally, the remotes have not resource for themselfs.
321 /// It means that all generated `Endpoint`s share the `ResourceId` of the listener and
322 /// if you remove this resource you are removing the listener of all of them.
323 /// For that cases there is no need to remove the resource because non-oriented connections
324 /// have not connection itself to close, 'there is no spoon'.
325 pub fn remove(&self, resource_id: ResourceId) -> bool {
326 log::trace!("Remove {}", resource_id);
327 let value = self.controllers[resource_id.adapter_id() as usize].remove(resource_id);
328 log::trace!("Removed: {}", value);
329 value
330 }
331
332 /// Check a resource specified by `resource_id` is ready.
333 /// If the status is `true` means that the resource is ready to use.
334 /// In connection oriented transports, it implies the resource is connected.
335 /// If the status is `false` it means that the resource is not yet ready to use.
336 /// If the resource has been removed, disconnected, or does not exists in the network,
337 /// a `None` is returned.
338 pub fn is_ready(&self, resource_id: ResourceId) -> Option<bool> {
339 self.controllers[resource_id.adapter_id() as usize].is_ready(resource_id)
340 }
341}
342
343/// Instance in charge of process input network events.
344/// These events are offered to the user as a [`NetEvent`] its processing data.
345pub struct NetworkProcessor {
346 poll: Poll,
347 processors: EventProcessorList,
348}
349
350impl NetworkProcessor {
351 fn new(poll: Poll, processors: EventProcessorList) -> Self {
352 Self { poll, processors }
353 }
354
355 /// Process the next poll event.
356 /// This method waits the timeout specified until the poll event is generated.
357 /// If `None` is passed as timeout, it will wait indefinitely.
358 /// Note that there is no 1-1 relation between an internal poll event and a [`NetEvent`].
359 /// You need to assume that process an internal poll event could call 0 or N times to
360 /// the callback with diferents `NetEvent`s.
361 pub fn process_poll_event(
362 &mut self,
363 timeout: Option<Duration>,
364 mut event_callback: impl FnMut(NetEvent<'_>),
365 ) {
366 let processors = &mut self.processors;
367 self.poll.process_event(timeout, |poll_event| {
368 match poll_event {
369 PollEvent::Network(resource_id, interest) => {
370 let processor = &processors[resource_id.adapter_id() as usize];
371 processor.process(resource_id, interest, &mut |net_event| {
372 log::trace!("Processed {:?}", net_event);
373 event_callback(net_event);
374 });
375 }
376
377 #[allow(dead_code)] //TODO: remove it with native event support
378 PollEvent::Waker => todo!(),
379 }
380 });
381 }
382
383 /// Process poll events until there is no more events during a `timeout` duration.
384 /// This method makes succesive calls to [`NetworkProcessor::process_poll_event()`].
385 pub fn process_poll_events_until_timeout(
386 &mut self,
387 timeout: Duration,
388 mut event_callback: impl FnMut(NetEvent<'_>),
389 ) {
390 loop {
391 let now = Instant::now();
392 self.process_poll_event(Some(timeout), &mut event_callback);
393 if now.elapsed() > timeout {
394 break;
395 }
396 }
397 }
398}
399
400#[cfg(test)]
401mod tests {
402 use super::*;
403 use std::time::{Duration};
404 use crate::util::thread::{NamespacedThread};
405
406 use test_case::test_case;
407
408 lazy_static::lazy_static! {
409 static ref TIMEOUT: Duration = Duration::from_millis(1000);
410 static ref LOCALHOST_CONN_TIMEOUT: Duration = Duration::from_millis(5000);
411 }
412
413 #[cfg_attr(feature = "tcp", test_case(Transport::Tcp))]
414 #[cfg_attr(feature = "tcp", test_case(Transport::FramedTcp))]
415 #[cfg_attr(feature = "websocket", test_case(Transport::Ws))]
416 fn successful_connection(transport: Transport) {
417 let (controller, mut processor) = self::split();
418 let (listener_id, addr) = controller.listen(transport, "127.0.0.1:0").unwrap();
419 let (endpoint, _) = controller.connect(transport, addr).unwrap();
420
421 let mut was_connected = 0;
422 let mut was_accepted = 0;
423 processor.process_poll_events_until_timeout(*TIMEOUT, |net_event| match net_event {
424 NetEvent::Connected(net_endpoint, status) => {
425 assert!(status);
426 assert_eq!(endpoint, net_endpoint);
427 was_connected += 1;
428 }
429 NetEvent::Accepted(_, net_listener_id) => {
430 assert_eq!(listener_id, net_listener_id);
431 was_accepted += 1;
432 }
433 _ => unreachable!(),
434 });
435 assert_eq!(was_accepted, 1);
436 assert_eq!(was_connected, 1);
437 }
438
439 #[cfg_attr(feature = "tcp", test_case(Transport::Tcp))]
440 #[cfg_attr(feature = "tcp", test_case(Transport::FramedTcp))]
441 #[cfg_attr(feature = "websocket", test_case(Transport::Ws))]
442 fn successful_connection_sync(transport: Transport) {
443 let (controller, mut processor) = self::split();
444 let (_, addr) = controller.listen(transport, "127.0.0.1:0").unwrap();
445
446 let mut thread = NamespacedThread::spawn("test", move || {
447 let (endpoint, _) = controller.connect_sync(transport, addr).unwrap();
448 assert!(controller.is_ready(endpoint.resource_id()).unwrap());
449 });
450
451 processor.process_poll_events_until_timeout(*TIMEOUT, |_| ());
452
453 thread.join();
454 }
455
456 #[cfg_attr(feature = "tcp", test_case(Transport::Tcp))]
457 #[cfg_attr(feature = "tcp", test_case(Transport::FramedTcp))]
458 #[cfg_attr(feature = "websocket", test_case(Transport::Ws))]
459 fn unreachable_connection(transport: Transport) {
460 let (controller, mut processor) = self::split();
461
462 // Ensure that addr is not using by other process
463 // because it takes some secs to be reusable.
464 let (listener_id, addr) = controller.listen(transport, "127.0.0.1:0").unwrap();
465 controller.remove(listener_id);
466
467 let (endpoint, _) = controller.connect(transport, addr).unwrap();
468 assert_eq!(controller.send(endpoint, &[42]), SendStatus::ResourceNotAvailable);
469 assert!(!controller.is_ready(endpoint.resource_id()).unwrap());
470
471 let mut was_disconnected = false;
472 processor.process_poll_events_until_timeout(*LOCALHOST_CONN_TIMEOUT, |net_event| {
473 match net_event {
474 NetEvent::Connected(net_endpoint, status) => {
475 assert!(!status);
476 assert_eq!(endpoint, net_endpoint);
477 was_disconnected = true;
478 }
479 _ => unreachable!(),
480 }
481 });
482 assert!(was_disconnected);
483 }
484
485 #[cfg_attr(feature = "tcp", test_case(Transport::Tcp))]
486 #[cfg_attr(feature = "tcp", test_case(Transport::FramedTcp))]
487 #[cfg_attr(feature = "websocket", test_case(Transport::Ws))]
488 fn unreachable_connection_sync(transport: Transport) {
489 let (controller, mut processor) = self::split();
490
491 // Ensure that addr is not using by other process
492 // because it takes some secs to be reusable.
493 let (listener_id, addr) = controller.listen(transport, "127.0.0.1:0").unwrap();
494 controller.remove(listener_id);
495
496 let mut thread = NamespacedThread::spawn("test", move || {
497 let err = controller.connect_sync(transport, addr).unwrap_err();
498 assert_eq!(err.kind(), io::ErrorKind::ConnectionRefused);
499 });
500
501 processor.process_poll_events_until_timeout(*LOCALHOST_CONN_TIMEOUT, |_| ());
502
503 thread.join();
504 }
505
506 #[test]
507 fn create_remove_listener() {
508 let (controller, mut processor) = self::split();
509 let (listener_id, _) = controller.listen(Transport::Tcp, "127.0.0.1:0").unwrap();
510 assert!(controller.remove(listener_id)); // Do not generate an event
511 assert!(!controller.remove(listener_id));
512
513 processor.process_poll_events_until_timeout(*TIMEOUT, |_| unreachable!());
514 }
515
516 #[test]
517 fn create_remove_listener_with_connection() {
518 let (controller, mut processor) = self::split();
519 let (listener_id, addr) = controller.listen(Transport::Tcp, "127.0.0.1:0").unwrap();
520 controller.connect(Transport::Tcp, addr).unwrap();
521
522 let mut was_accepted = false;
523 processor.process_poll_events_until_timeout(*TIMEOUT, |net_event| match net_event {
524 NetEvent::Connected(..) => (),
525 NetEvent::Accepted(_, _) => {
526 assert!(controller.remove(listener_id));
527 assert!(!controller.remove(listener_id));
528 was_accepted = true;
529 }
530 _ => unreachable!(),
531 });
532 assert!(was_accepted);
533 }
534}