// message_io/network/driver.rs

1use super::endpoint::{Endpoint};
2use super::resource_id::{ResourceId, ResourceType};
3use super::poll::{Poll, Readiness};
4use super::registry::{ResourceRegistry, Register};
5use super::remote_addr::{RemoteAddr};
6use super::adapter::{Adapter, Remote, Local, SendStatus, AcceptedType, ReadStatus, PendingStatus};
7use super::transport::{TransportConnect, TransportListen};
8
9use std::net::{SocketAddr};
10use std::sync::{
11    Arc,
12    atomic::{AtomicBool, Ordering},
13};
14use std::io::{self};
15
16#[cfg(doctest)]
17use super::transport::{Transport};
18
19/// Enum used to describe a network event that an internal transport adapter has produced.
20pub enum NetEvent<'a> {
21    /// Connection result.
22    /// This event is only generated after a [`crate::network::NetworkController::connect()`]
23    /// call.
24    /// The event contains the endpoint of the connection
25    /// (same endpoint returned by the `connect()` method),
26    /// and a boolean indicating the *result* of that connection.
27    /// In *non connection-oriented transports* as *UDP* it simply means that the resource
28    /// is ready to use, and the boolean will be always `true`.
29    /// In connection-oriented transports it means that the handshake has been performed, and the
30    /// connection is established and ready to use.
31    /// Since this handshake could fail, the boolean could be `false`.
32    Connected(Endpoint, bool),
33
34    /// New endpoint has been accepted by a listener and considered ready to use.
35    /// The event contains the resource id of the listener that accepted this connection.
36    ///
37    /// Note that this event will only be generated by connection-oriented transports as *TCP*.
38    Accepted(Endpoint, ResourceId),
39
40    /// Input message received by the network.
41    /// In packet-based transports, the data of a message sent corresponds with the data of this
42    /// event. This one-to-one relation is not conserved in stream-based transports as *TCP*.
43    ///
44    /// If you want a packet-based protocol over *TCP* use
45    /// [`crate::network::Transport::FramedTcp`].
46    Message(Endpoint, &'a [u8]),
47
48    /// This event is only dispatched when a connection is lost.
49    /// Remove explicitely a resource will NOT generate the event.
50    /// When this event is received, the resource is considered already removed,
51    /// the user do not need to remove it after this event.
52    /// A [`NetEvent::Message`] event will never be generated after this event from this endpoint.
53    ///
54    /// Note that this event will only be generated by connection-oriented transports as *TCP*.
55    /// *UDP*, for example, is NOT connection-oriented, and the event can no be detected.
56    Disconnected(Endpoint),
57}
58
59impl std::fmt::Debug for NetEvent<'_> {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        let string = match self {
62            Self::Connected(endpoint, status) => format!("Connected({endpoint}, {status})"),
63            Self::Accepted(endpoint, id) => format!("Accepted({endpoint}, {id})"),
64            Self::Message(endpoint, data) => format!("Message({}, {})", endpoint, data.len()),
65            Self::Disconnected(endpoint) => format!("Disconnected({endpoint})"),
66        };
67        write!(f, "NetEvent::{string}")
68    }
69}
70
71pub trait ActionController: Send + Sync {
72    fn connect_with(
73        &self,
74        config: TransportConnect,
75        addr: RemoteAddr,
76    ) -> io::Result<(Endpoint, SocketAddr)>;
77    fn listen_with(
78        &self,
79        config: TransportListen,
80        addr: SocketAddr,
81    ) -> io::Result<(ResourceId, SocketAddr)>;
82    fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus;
83    fn remove(&self, id: ResourceId) -> bool;
84    fn is_ready(&self, id: ResourceId) -> Option<bool>;
85}
86
87pub trait EventProcessor: Send + Sync {
88    fn process(&self, id: ResourceId, readiness: Readiness, callback: &mut dyn FnMut(NetEvent<'_>));
89}
90
91struct RemoteProperties {
92    peer_addr: SocketAddr,
93    local: Option<ResourceId>,
94    ready: AtomicBool,
95}
96
97impl RemoteProperties {
98    fn new(peer_addr: SocketAddr, local: Option<ResourceId>) -> Self {
99        Self { peer_addr, local, ready: AtomicBool::new(false) }
100    }
101
102    pub fn is_ready(&self) -> bool {
103        self.ready.load(Ordering::Relaxed)
104    }
105
106    pub fn mark_as_ready(&self) {
107        self.ready.store(true, Ordering::Relaxed);
108    }
109}
110
/// Data stored in the local registry alongside each local resource.
/// Local resources need no extra data, so this is a unit struct.
struct LocalProperties;
112
113pub struct Driver<R: Remote, L: Local> {
114    remote_registry: Arc<ResourceRegistry<R, RemoteProperties>>,
115    local_registry: Arc<ResourceRegistry<L, LocalProperties>>,
116}
117
118impl<R: Remote, L: Local> Driver<R, L> {
119    pub fn new(
120        _: impl Adapter<Remote = R, Local = L>,
121        adapter_id: u8,
122        poll: &mut Poll,
123    ) -> Driver<R, L> {
124        let remote_poll_registry = poll.create_registry(adapter_id, ResourceType::Remote);
125        let local_poll_registry = poll.create_registry(adapter_id, ResourceType::Local);
126
127        Driver {
128            remote_registry: Arc::new(ResourceRegistry::<R, RemoteProperties>::new(
129                remote_poll_registry,
130            )),
131            local_registry: Arc::new(ResourceRegistry::<L, LocalProperties>::new(
132                local_poll_registry,
133            )),
134        }
135    }
136}
137
138impl<R: Remote, L: Local> Clone for Driver<R, L> {
139    fn clone(&self) -> Driver<R, L> {
140        Driver {
141            remote_registry: self.remote_registry.clone(),
142            local_registry: self.local_registry.clone(),
143        }
144    }
145}
146
147impl<R: Remote, L: Local> ActionController for Driver<R, L> {
148    fn connect_with(
149        &self,
150        config: TransportConnect,
151        addr: RemoteAddr,
152    ) -> io::Result<(Endpoint, SocketAddr)> {
153        R::connect_with(config, addr).map(|info| {
154            let id = self.remote_registry.register(
155                info.remote,
156                RemoteProperties::new(info.peer_addr, None),
157                true,
158            );
159            (Endpoint::new(id, info.peer_addr), info.local_addr)
160        })
161    }
162
163    fn listen_with(
164        &self,
165        config: TransportListen,
166        addr: SocketAddr,
167    ) -> io::Result<(ResourceId, SocketAddr)> {
168        L::listen_with(config, addr).map(|info| {
169            let id = self.local_registry.register(info.local, LocalProperties, false);
170            (id, info.local_addr)
171        })
172    }
173
174    fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus {
175        match endpoint.resource_id().resource_type() {
176            ResourceType::Remote => match self.remote_registry.get(endpoint.resource_id()) {
177                Some(remote) => match remote.properties.is_ready() {
178                    true => remote.resource.send(data),
179                    false => SendStatus::ResourceNotAvailable,
180                },
181                None => SendStatus::ResourceNotFound,
182            },
183            ResourceType::Local => match self.local_registry.get(endpoint.resource_id()) {
184                Some(remote) => remote.resource.send_to(endpoint.addr(), data),
185                None => SendStatus::ResourceNotFound,
186            },
187        }
188    }
189
190    fn remove(&self, id: ResourceId) -> bool {
191        match id.resource_type() {
192            ResourceType::Remote => self.remote_registry.deregister(id),
193            ResourceType::Local => self.local_registry.deregister(id),
194        }
195    }
196
197    fn is_ready(&self, id: ResourceId) -> Option<bool> {
198        match id.resource_type() {
199            ResourceType::Remote => self.remote_registry.get(id).map(|r| r.properties.is_ready()),
200            ResourceType::Local => self.local_registry.get(id).map(|_| true),
201        }
202    }
203}
204
205impl<R: Remote, L: Local<Remote = R>> EventProcessor for Driver<R, L> {
206    fn process(
207        &self,
208        id: ResourceId,
209        readiness: Readiness,
210        event_callback: &mut dyn FnMut(NetEvent<'_>),
211    ) {
212        match id.resource_type() {
213            ResourceType::Remote => {
214                if let Some(remote) = self.remote_registry.get(id) {
215                    let endpoint = Endpoint::new(id, remote.properties.peer_addr);
216                    log::trace!("Processed remote for {}", endpoint);
217
218                    if !remote.properties.is_ready() {
219                        self.resolve_pending_remote(&remote, endpoint, readiness, |e| {
220                            event_callback(e)
221                        });
222                    }
223                    if remote.properties.is_ready() {
224                        match readiness {
225                            Readiness::Write => {
226                                self.write_to_remote(&remote, endpoint, event_callback);
227                            }
228                            Readiness::Read => {
229                                self.read_from_remote(&remote, endpoint, event_callback);
230                            }
231                        }
232                    }
233                }
234            }
235            ResourceType::Local => {
236                if let Some(local) = self.local_registry.get(id) {
237                    log::trace!("Processed local for {}", id);
238                    match readiness {
239                        Readiness::Write => (),
240                        Readiness::Read => self.read_from_local(&local, id, event_callback),
241                    }
242                }
243            }
244        }
245    }
246}
247
248impl<R: Remote, L: Local<Remote = R>> Driver<R, L> {
249    fn resolve_pending_remote(
250        &self,
251        remote: &Arc<Register<R, RemoteProperties>>,
252        endpoint: Endpoint,
253        readiness: Readiness,
254        mut event_callback: impl FnMut(NetEvent<'_>),
255    ) {
256        let status = remote.resource.pending(readiness);
257        log::trace!("Resolve pending for {}: {:?}", endpoint, status);
258        match status {
259            PendingStatus::Ready => {
260                remote.properties.mark_as_ready();
261                match remote.properties.local {
262                    Some(listener_id) => event_callback(NetEvent::Accepted(endpoint, listener_id)),
263                    None => event_callback(NetEvent::Connected(endpoint, true)),
264                }
265                remote.resource.ready_to_write();
266            }
267            PendingStatus::Incomplete => (),
268            PendingStatus::Disconnected => {
269                self.remote_registry.deregister(endpoint.resource_id());
270                if remote.properties.local.is_none() {
271                    event_callback(NetEvent::Connected(endpoint, false));
272                }
273            }
274        }
275    }
276
277    fn write_to_remote(
278        &self,
279        remote: &Arc<Register<R, RemoteProperties>>,
280        endpoint: Endpoint,
281        mut event_callback: impl FnMut(NetEvent<'_>),
282    ) {
283        if !remote.resource.ready_to_write() {
284            event_callback(NetEvent::Disconnected(endpoint));
285        }
286    }
287
288    fn read_from_remote(
289        &self,
290        remote: &Arc<Register<R, RemoteProperties>>,
291        endpoint: Endpoint,
292        mut event_callback: impl FnMut(NetEvent<'_>),
293    ) {
294        let status =
295            remote.resource.receive(|data| event_callback(NetEvent::Message(endpoint, data)));
296        log::trace!("Receive status: {:?}", status);
297        if let ReadStatus::Disconnected = status {
298            // Checked because, the user in the callback could have removed the same resource.
299            if self.remote_registry.deregister(endpoint.resource_id()) {
300                event_callback(NetEvent::Disconnected(endpoint));
301            }
302        }
303    }
304
305    fn read_from_local(
306        &self,
307        local: &Arc<Register<L, LocalProperties>>,
308        id: ResourceId,
309        mut event_callback: impl FnMut(NetEvent<'_>),
310    ) {
311        local.resource.accept(|accepted| {
312            log::trace!("Accepted type: {}", accepted);
313            match accepted {
314                AcceptedType::Remote(addr, remote) => {
315                    self.remote_registry.register(
316                        remote,
317                        RemoteProperties::new(addr, Some(id)),
318                        true,
319                    );
320                }
321                AcceptedType::Data(addr, data) => {
322                    let endpoint = Endpoint::new(id, addr);
323                    event_callback(NetEvent::Message(endpoint, data));
324                }
325            }
326        });
327    }
328}
329
330impl<R> std::fmt::Display for AcceptedType<'_, R> {
331    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332        let string = match self {
333            AcceptedType::Remote(addr, _) => format!("Remote({addr})"),
334            AcceptedType::Data(addr, _) => format!("Data({addr})"),
335        };
336        write!(f, "AcceptedType::{string}")
337    }
338}