1use super::endpoint::{Endpoint};
2use super::resource_id::{ResourceId, ResourceType};
3use super::poll::{Poll, Readiness};
4use super::registry::{ResourceRegistry, Register};
5use super::remote_addr::{RemoteAddr};
6use super::adapter::{Adapter, Remote, Local, SendStatus, AcceptedType, ReadStatus, PendingStatus};
7use super::transport::{TransportConnect, TransportListen};
8
9use std::net::{SocketAddr};
10use std::sync::{
11 Arc,
12 atomic::{AtomicBool, Ordering},
13};
14use std::io::{self};
15
16#[cfg(doctest)]
17use super::transport::{Transport};
18
19pub enum NetEvent<'a> {
21 Connected(Endpoint, bool),
33
34 Accepted(Endpoint, ResourceId),
39
40 Message(Endpoint, &'a [u8]),
47
48 Disconnected(Endpoint),
57}
58
59impl std::fmt::Debug for NetEvent<'_> {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 let string = match self {
62 Self::Connected(endpoint, status) => format!("Connected({endpoint}, {status})"),
63 Self::Accepted(endpoint, id) => format!("Accepted({endpoint}, {id})"),
64 Self::Message(endpoint, data) => format!("Message({}, {})", endpoint, data.len()),
65 Self::Disconnected(endpoint) => format!("Disconnected({endpoint})"),
66 };
67 write!(f, "NetEvent::{string}")
68 }
69}
70
71pub trait ActionController: Send + Sync {
72 fn connect_with(
73 &self,
74 config: TransportConnect,
75 addr: RemoteAddr,
76 ) -> io::Result<(Endpoint, SocketAddr)>;
77 fn listen_with(
78 &self,
79 config: TransportListen,
80 addr: SocketAddr,
81 ) -> io::Result<(ResourceId, SocketAddr)>;
82 fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus;
83 fn remove(&self, id: ResourceId) -> bool;
84 fn is_ready(&self, id: ResourceId) -> Option<bool>;
85}
86
87pub trait EventProcessor: Send + Sync {
88 fn process(&self, id: ResourceId, readiness: Readiness, callback: &mut dyn FnMut(NetEvent<'_>));
89}
90
91struct RemoteProperties {
92 peer_addr: SocketAddr,
93 local: Option<ResourceId>,
94 ready: AtomicBool,
95}
96
97impl RemoteProperties {
98 fn new(peer_addr: SocketAddr, local: Option<ResourceId>) -> Self {
99 Self { peer_addr, local, ready: AtomicBool::new(false) }
100 }
101
102 pub fn is_ready(&self) -> bool {
103 self.ready.load(Ordering::Relaxed)
104 }
105
106 pub fn mark_as_ready(&self) {
107 self.ready.store(true, Ordering::Relaxed);
108 }
109}
110
111struct LocalProperties;
112
113pub struct Driver<R: Remote, L: Local> {
114 remote_registry: Arc<ResourceRegistry<R, RemoteProperties>>,
115 local_registry: Arc<ResourceRegistry<L, LocalProperties>>,
116}
117
118impl<R: Remote, L: Local> Driver<R, L> {
119 pub fn new(
120 _: impl Adapter<Remote = R, Local = L>,
121 adapter_id: u8,
122 poll: &mut Poll,
123 ) -> Driver<R, L> {
124 let remote_poll_registry = poll.create_registry(adapter_id, ResourceType::Remote);
125 let local_poll_registry = poll.create_registry(adapter_id, ResourceType::Local);
126
127 Driver {
128 remote_registry: Arc::new(ResourceRegistry::<R, RemoteProperties>::new(
129 remote_poll_registry,
130 )),
131 local_registry: Arc::new(ResourceRegistry::<L, LocalProperties>::new(
132 local_poll_registry,
133 )),
134 }
135 }
136}
137
138impl<R: Remote, L: Local> Clone for Driver<R, L> {
139 fn clone(&self) -> Driver<R, L> {
140 Driver {
141 remote_registry: self.remote_registry.clone(),
142 local_registry: self.local_registry.clone(),
143 }
144 }
145}
146
147impl<R: Remote, L: Local> ActionController for Driver<R, L> {
148 fn connect_with(
149 &self,
150 config: TransportConnect,
151 addr: RemoteAddr,
152 ) -> io::Result<(Endpoint, SocketAddr)> {
153 R::connect_with(config, addr).map(|info| {
154 let id = self.remote_registry.register(
155 info.remote,
156 RemoteProperties::new(info.peer_addr, None),
157 true,
158 );
159 (Endpoint::new(id, info.peer_addr), info.local_addr)
160 })
161 }
162
163 fn listen_with(
164 &self,
165 config: TransportListen,
166 addr: SocketAddr,
167 ) -> io::Result<(ResourceId, SocketAddr)> {
168 L::listen_with(config, addr).map(|info| {
169 let id = self.local_registry.register(info.local, LocalProperties, false);
170 (id, info.local_addr)
171 })
172 }
173
174 fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus {
175 match endpoint.resource_id().resource_type() {
176 ResourceType::Remote => match self.remote_registry.get(endpoint.resource_id()) {
177 Some(remote) => match remote.properties.is_ready() {
178 true => remote.resource.send(data),
179 false => SendStatus::ResourceNotAvailable,
180 },
181 None => SendStatus::ResourceNotFound,
182 },
183 ResourceType::Local => match self.local_registry.get(endpoint.resource_id()) {
184 Some(remote) => remote.resource.send_to(endpoint.addr(), data),
185 None => SendStatus::ResourceNotFound,
186 },
187 }
188 }
189
190 fn remove(&self, id: ResourceId) -> bool {
191 match id.resource_type() {
192 ResourceType::Remote => self.remote_registry.deregister(id),
193 ResourceType::Local => self.local_registry.deregister(id),
194 }
195 }
196
197 fn is_ready(&self, id: ResourceId) -> Option<bool> {
198 match id.resource_type() {
199 ResourceType::Remote => self.remote_registry.get(id).map(|r| r.properties.is_ready()),
200 ResourceType::Local => self.local_registry.get(id).map(|_| true),
201 }
202 }
203}
204
205impl<R: Remote, L: Local<Remote = R>> EventProcessor for Driver<R, L> {
206 fn process(
207 &self,
208 id: ResourceId,
209 readiness: Readiness,
210 event_callback: &mut dyn FnMut(NetEvent<'_>),
211 ) {
212 match id.resource_type() {
213 ResourceType::Remote => {
214 if let Some(remote) = self.remote_registry.get(id) {
215 let endpoint = Endpoint::new(id, remote.properties.peer_addr);
216 log::trace!("Processed remote for {}", endpoint);
217
218 if !remote.properties.is_ready() {
219 self.resolve_pending_remote(&remote, endpoint, readiness, |e| {
220 event_callback(e)
221 });
222 }
223 if remote.properties.is_ready() {
224 match readiness {
225 Readiness::Write => {
226 self.write_to_remote(&remote, endpoint, event_callback);
227 }
228 Readiness::Read => {
229 self.read_from_remote(&remote, endpoint, event_callback);
230 }
231 }
232 }
233 }
234 }
235 ResourceType::Local => {
236 if let Some(local) = self.local_registry.get(id) {
237 log::trace!("Processed local for {}", id);
238 match readiness {
239 Readiness::Write => (),
240 Readiness::Read => self.read_from_local(&local, id, event_callback),
241 }
242 }
243 }
244 }
245 }
246}
247
248impl<R: Remote, L: Local<Remote = R>> Driver<R, L> {
249 fn resolve_pending_remote(
250 &self,
251 remote: &Arc<Register<R, RemoteProperties>>,
252 endpoint: Endpoint,
253 readiness: Readiness,
254 mut event_callback: impl FnMut(NetEvent<'_>),
255 ) {
256 let status = remote.resource.pending(readiness);
257 log::trace!("Resolve pending for {}: {:?}", endpoint, status);
258 match status {
259 PendingStatus::Ready => {
260 remote.properties.mark_as_ready();
261 match remote.properties.local {
262 Some(listener_id) => event_callback(NetEvent::Accepted(endpoint, listener_id)),
263 None => event_callback(NetEvent::Connected(endpoint, true)),
264 }
265 remote.resource.ready_to_write();
266 }
267 PendingStatus::Incomplete => (),
268 PendingStatus::Disconnected => {
269 self.remote_registry.deregister(endpoint.resource_id());
270 if remote.properties.local.is_none() {
271 event_callback(NetEvent::Connected(endpoint, false));
272 }
273 }
274 }
275 }
276
277 fn write_to_remote(
278 &self,
279 remote: &Arc<Register<R, RemoteProperties>>,
280 endpoint: Endpoint,
281 mut event_callback: impl FnMut(NetEvent<'_>),
282 ) {
283 if !remote.resource.ready_to_write() {
284 event_callback(NetEvent::Disconnected(endpoint));
285 }
286 }
287
288 fn read_from_remote(
289 &self,
290 remote: &Arc<Register<R, RemoteProperties>>,
291 endpoint: Endpoint,
292 mut event_callback: impl FnMut(NetEvent<'_>),
293 ) {
294 let status =
295 remote.resource.receive(|data| event_callback(NetEvent::Message(endpoint, data)));
296 log::trace!("Receive status: {:?}", status);
297 if let ReadStatus::Disconnected = status {
298 if self.remote_registry.deregister(endpoint.resource_id()) {
300 event_callback(NetEvent::Disconnected(endpoint));
301 }
302 }
303 }
304
305 fn read_from_local(
306 &self,
307 local: &Arc<Register<L, LocalProperties>>,
308 id: ResourceId,
309 mut event_callback: impl FnMut(NetEvent<'_>),
310 ) {
311 local.resource.accept(|accepted| {
312 log::trace!("Accepted type: {}", accepted);
313 match accepted {
314 AcceptedType::Remote(addr, remote) => {
315 self.remote_registry.register(
316 remote,
317 RemoteProperties::new(addr, Some(id)),
318 true,
319 );
320 }
321 AcceptedType::Data(addr, data) => {
322 let endpoint = Endpoint::new(id, addr);
323 event_callback(NetEvent::Message(endpoint, data));
324 }
325 }
326 });
327 }
328}
329
330impl<R> std::fmt::Display for AcceptedType<'_, R> {
331 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332 let string = match self {
333 AcceptedType::Remote(addr, _) => format!("Remote({addr})"),
334 AcceptedType::Data(addr, _) => format!("Data({addr})"),
335 };
336 write!(f, "AcceptedType::{string}")
337 }
338}