1use crate::{
20 socket_services::socket_server::common::upgrade_to_termination_tracking,
21 types::{
22 ProtocolEvent,
23 MessagingMutinyStream,
24 }, socket_connection::{
25 peer::Peer,
26 socket_connection_handler::SocketConnectionHandler,
27 connection_provider::{ServerConnectionHandler, ConnectionChannel},
28 },
29 serde::{ ReactiveMessagingDeserializer, ReactiveMessagingSerializer},
30};
31use crate::socket_connection::connection::SocketConnection;
32use crate::socket_services::types::MessagingService;
33use crate::types::ConnectionEvent;
34use std::{
35 error::Error,
36 fmt::Debug,
37 future::Future,
38 sync::Arc,
39};
40use reactive_mutiny::prelude::advanced::{
41 FullDuplexUniChannel,
42 GenericUni,
43};
44use std::net::SocketAddr;
45use futures::{future::BoxFuture, Stream};
46use tokio::io::AsyncWriteExt;
47use log::{error, trace, warn};
48
49
/// Builds a socket server whose connections carry no state: forwards `$const_config`,
/// `$interface_ip` and `$port` to [`new_composite_socket_server!`], fixing the connection
/// state type to `()`.
///
/// The sibling macro is addressed through `$crate`, so the expansion resolves no matter
/// which crate invokes this macro (a plain `crate::` path would be looked up in the
/// *caller's* crate root).
#[macro_export]
macro_rules! new_socket_server {
    ($const_config: expr,
     $interface_ip: expr,
     $port:         expr) => {
        $crate::new_composite_socket_server!($const_config, $interface_ip, $port, ())
    }
}
71pub use new_socket_server;
72
73
/// Builds a `CompositeSocketServer` for the given configuration, listening interface, port
/// and connection state type.
///
/// `$config` must be usable in a constant item: it is stored in a `const` and converted,
/// at compile time, into the `u64` used as the server's `CONFIG` const generic argument.
/// `ConstConfig` and `CompositeSocketServer` must be in scope at the call site.
#[macro_export]
macro_rules! new_composite_socket_server {
    ($config: expr, $ip: expr, $tcp_port: expr, $state: ty) => {{
        const _CONST_CONFIG: ConstConfig = $config;
        CompositeSocketServer::<{ _CONST_CONFIG.into() }, $state>::new($ip, $tcp_port)
    }}
}
97pub use new_composite_socket_server;
98
99
/// Spawns a protocol processor on `$server` and evaluates to the awaited result of its
/// `spawn_processor()` call.
///
/// The expansion first invokes `_define_processor_uni_and_sender_channel_types!` with the
/// configuration, channel kind and message types -- presumably that is what declares the
/// `ProcessorUniType` and `SenderChannel` types named in the call (confirm against that
/// macro) -- and then calls `spawn_processor()` with the events handler and the processor
/// builder. As the expansion contains an `.await`, it must be used in an async context.
#[macro_export]
macro_rules! spawn_server_processor {
    ($config:            expr,
     $channel:           tt,
     $server:            expr,
     $remote_msgs:       ty,
     $local_msgs:        ty,
     $events_handler:    expr,
     $processor_builder: expr) => {{
        _define_processor_uni_and_sender_channel_types!($config, $channel, $remote_msgs, $local_msgs);
        $server
            .spawn_processor::<$remote_msgs, $local_msgs, ProcessorUniType, SenderChannel, _, _, _, _, _>(
                $events_handler,
                $processor_builder
            )
            .await
    }}
}
114pub use spawn_server_processor;
115
116
/// Spawns a single protocol processor on `$socket_server` (see [`spawn_server_processor!`])
/// and, if that succeeds, starts the server with it through `start_single_protocol()`.
///
/// Evaluates to the `Result` of `start_single_protocol()`, or to the spawning error. As the
/// expansion contains `.await`s, it must be used in an async context.
///
/// The sibling macro is addressed through `$crate`, so it resolves even when the caller
/// did not import `spawn_server_processor!` itself.
#[macro_export]
macro_rules! start_server_processor {
    ($const_config:                 expr,
     $channel_type:                 tt,
     $socket_server:                expr,
     $remote_messages:              ty,
     $local_messages:               ty,
     $connection_events_handler_fn: expr,
     $dialog_processor_builder_fn:  expr) => {{
        match $crate::spawn_server_processor!($const_config, $channel_type, $socket_server, $remote_messages, $local_messages, $connection_events_handler_fn, $dialog_processor_builder_fn) {
            Ok(connection_channel) => $socket_server.start_single_protocol(connection_channel).await,
            Err(err) => Err(err),
        }
    }}
}
135pub use start_server_processor;
136
137
138pub struct CompositeSocketServer<const CONFIG: u64,
149 StateType: Send + Sync + Clone + Debug + 'static> {
150
151 interface_ip: String,
153 port: u16,
155 connection_provider: Option<ServerConnectionHandler<StateType>>,
158 processor_termination_complete_receivers: Option<Vec<tokio::sync::oneshot::Receiver<()>>>,
160 returned_connections_source: Option<tokio::sync::mpsc::Receiver<SocketConnection<StateType>>>,
163 returned_connections_sink: tokio::sync::mpsc::Sender<SocketConnection<StateType>>,
164}
165impl<const CONFIG: u64,
166 StateType: Send + Sync + Clone + Debug + 'static>
167CompositeSocketServer<CONFIG, StateType> {
168
169 pub fn new<IntoString: Into<String>>
173 (interface_ip: IntoString,
174 port: u16)
175 -> Self {
176 let (returned_connections_sink, returned_connections_source) = tokio::sync::mpsc::channel::<SocketConnection<StateType>>(2);
177 Self {
178 interface_ip: interface_ip.into(),
179 port,
180 connection_provider: None,
181 processor_termination_complete_receivers: Some(vec![]),
182 returned_connections_source: Some(returned_connections_source),
183 returned_connections_sink,
184 }
185 }
186}
187
188impl<const CONFIG: u64,
189 StateType: Send + Sync + Clone + Debug + 'static>
190MessagingService<CONFIG>
191for CompositeSocketServer<CONFIG, StateType> {
192
193 type StateType = StateType;
194
195 async fn spawn_processor<RemoteMessages: ReactiveMessagingDeserializer<RemoteMessages> + Send + Sync + PartialEq + Debug + 'static,
196 LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug + 'static,
197 ProcessorUniType: GenericUni<ItemType=RemoteMessages> + Send + Sync + 'static,
198 SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync + 'static,
199 OutputStreamItemsType: Send + Sync + Debug + 'static,
200 ServerStreamType: Stream<Item=OutputStreamItemsType> + Send + 'static,
201 ConnectionEventsCallbackFuture: Future<Output=()> + Send + 'static,
202 ConnectionEventsCallback: Fn(ProtocolEvent<CONFIG, LocalMessages, SenderChannel, StateType>) -> ConnectionEventsCallbackFuture + Send + Sync + 'static,
203 ProcessorBuilderFn: Fn(String, u16, Arc<Peer<CONFIG, LocalMessages, SenderChannel, StateType>>, MessagingMutinyStream<ProcessorUniType>) -> ServerStreamType + Send + Sync + 'static>
204
205 (&mut self,
206 connection_events_callback: ConnectionEventsCallback,
207 dialog_processor_builder_fn: ProcessorBuilderFn)
208
209 -> Result<ConnectionChannel<StateType>, Box<dyn Error + Sync + Send>> {
210
211 let (local_termination_sender, local_termination_receiver) = tokio::sync::oneshot::channel::<()>();
213 self.processor_termination_complete_receivers.as_mut().expect("BUG!").push(local_termination_receiver);
214 let connection_events_callback = upgrade_to_termination_tracking(local_termination_sender, connection_events_callback);
215
216 let mut connection_provider = ConnectionChannel::new();
218 let new_connections_source = connection_provider.receiver()
219 .ok_or_else(|| String::from("couldn't move the Connection Receiver out of the Connection Provider"))?;
220
221 let socket_communications_handler = SocketConnectionHandler::<CONFIG, RemoteMessages, LocalMessages, ProcessorUniType, SenderChannel, StateType>::new();
223 socket_communications_handler.server_loop_for_text_protocol(&self.interface_ip,
224 self.port,
225 new_connections_source,
226 self.returned_connections_sink.clone(),
227 connection_events_callback,
228 dialog_processor_builder_fn).await
229 .map_err(|err| format!("Error starting an unresponsive GenericCompositeSocketServer @ {}:{}: {:?}", self.interface_ip, self.port, err))?;
230 Ok(connection_provider)
231 }
232
233 async fn start_multi_protocol<ConnectionEventsCallbackFuture: Future<Output=()> + Send>
234 (&mut self,
235 initial_connection_state: StateType,
236 mut connection_routing_closure: impl FnMut(&SocketConnection<StateType>, bool) -> Option<tokio::sync::mpsc::Sender<SocketConnection<StateType>>> + Send + 'static,
237 connection_events_callback: impl for <'r> Fn(ConnectionEvent<'r, StateType>) -> ConnectionEventsCallbackFuture + Send + 'static)
238 -> Result<(), Box<dyn Error + Sync + Send>> {
239 let mut connection_provider = ServerConnectionHandler::new(&self.interface_ip, self.port, initial_connection_state).await
240 .map_err(|err| format!("couldn't start the Connection Provider server event loop: {err}"))?;
241 let mut new_connections_source = connection_provider.connection_receiver()
242 .ok_or_else(|| String::from("couldn't move the Connection Receiver out of the Connection Provider"))?;
243 _ = self.connection_provider.insert(connection_provider);
244
245 let mut returned_connections_source = self.returned_connections_source.take()
246 .ok_or_else(|| String::from("couldn't `take()` from the `returned_connections_source`. Has the server been `.start()`ed more than once?"))?;
247
248 let interface_ip = self.interface_ip.clone();
249 let port = self.port;
250
251 tokio::spawn(async move {
256
257 loop {
258 let (mut connection, sender) = tokio::select! {
259
260 new_connection = new_connections_source.recv() => {
262 let Some(new_socket_connection) = new_connection else { break };
263 connection_events_callback(ConnectionEvent::Connected(&new_socket_connection)).await;
264 let sender = connection_routing_closure(&new_socket_connection, false);
265 (new_socket_connection, sender)
266 },
267
268 returned_connection_and_state = returned_connections_source.recv() => {
270 let Some(returned_socket_connection) = returned_connection_and_state else { break };
271 let sender = (!returned_socket_connection.closed())
272 .then_some(())
273 .and_then(|_| connection_routing_closure(&returned_socket_connection, true));
274 (returned_socket_connection, sender)
275 },
276 };
277
278 match sender {
280 Some(sender) => {
281 let (client_ip, client_port) = connection.connection().peer_addr()
282 .map(|peer_addr| match peer_addr {
283 SocketAddr::V4(v4) => (v4.ip().to_string(), v4.port()),
284 SocketAddr::V6(v6) => (v6.ip().to_string(), v6.port()),
285 })
286 .unwrap_or_else(|err| (format!("<unknown -- err:{err}>"), 0));
287 trace!("`reactive-messaging::CompositeSocketServer`: ROUTING the client {client_ip}:{client_port} of the server @ {interface_ip}:{port} to another processor");
288 if let Err(_) = sender.send(connection).await {
289 error!("`reactive-messaging::CompositeSocketServer`: BUG(?) in server @ {interface_ip}:{port} while re-routing the client {client_ip}:{client_port}'s socket: THE NEW (ROUTED) PROCESSOR CAN NO LONGER RECEIVE CONNECTIONS -- THE CONNECTION WILL BE DROPPED");
290 break
291 }
292 },
293 None => {
294 connection_events_callback(ConnectionEvent::Disconnected(&connection)).await;
295 if let Err(err) = connection.connection_mut().shutdown().await {
296 let (client_ip, client_port) = connection.connection().peer_addr()
297 .map(|peer_addr| match peer_addr {
298 SocketAddr::V4(v4) => (v4.ip().to_string(), v4.port()),
299 SocketAddr::V6(v6) => (v6.ip().to_string(), v6.port()),
300 })
301 .unwrap_or_else(|err| (format!("<unknown -- err:{err}>"), 0));
302 error!("`reactive-messaging::CompositeSocketServer`: ERROR in server @ {interface_ip}:{port} while shutting down the socket with client {client_ip}:{client_port}: {err}");
303 }
304 }
305 }
306 }
307 trace!("`reactive-messaging::CompositeSocketServer`: The 'Connection Routing Task' for server @ {interface_ip}:{port} ended -- hopefully, due to a graceful server termination.");
309 });
310 Ok(())
311 }
312
313 fn termination_waiter(&mut self) -> Box< dyn FnOnce() -> BoxFuture<'static, Result<(), Box<dyn Error + Send + Sync>>> > {
314 let mut local_termination_receiver = self.processor_termination_complete_receivers.take();
315 let interface_ip = self.interface_ip.clone();
316 let port = self.port;
317 Box::new(move || Box::pin(async move {
318 let Some(local_termination_receiver) = local_termination_receiver.take() else {
319 return Err(Box::from(format!("GenericCompositeSocketServer::termination_waiter(): termination requested for server @ {interface_ip}:{port}, but the server was not started (or a previous termination was commanded) at the moment the `termination_waiter()`'s returned closure was called")))
320 };
321 for (i, processor_termination_complete_receiver) in local_termination_receiver.into_iter().enumerate() {
322 if let Err(err) = processor_termination_complete_receiver.await {
323 error!("GenericCompositeSocketServer::termination_waiter(): It is no longer possible to tell when the processor {i} will be termination for server @ {interface_ip}:{port}: `one_shot` signal error: {err}")
324 }
325 }
326 Ok(())
327 }))
328 }
329
330 async fn terminate(mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
331 match self.connection_provider.take() {
332 Some(connection_provider) => {
333 warn!("GenericCompositeSocketServer: Termination asked & initiated for server @ {}:{}", self.interface_ip, self.port);
334 connection_provider.shutdown().await;
335 Ok(())
336 }
337 None => {
338 Err(Box::from("GenericCompositeSocketServer: Termination requested, but the service was not started -- no `self.start_with_*()` was called. Ignoring..."))
339 }
340 }
341 }
342}
343
344
345#[cfg(any(test,doc))]
347mod tests {
348 use super::*;
349 use crate::prelude::*;
350 use std::{
351 future,
352 ops::Deref,
353 sync::atomic::{AtomicU32, Ordering::Relaxed},
354 time::Duration,
355 };
356 use std::sync::atomic::AtomicBool;
357 use serde::{Deserialize, Serialize};
358 use futures::StreamExt;
359 use tokio::sync::Mutex;
360
361
/// Interface the test servers listen on -- also the address the test clients connect to
const LISTENING_INTERFACE: &str = "127.0.0.1";
/// First port of the range used by the tests of this module: each test adds its own offset (`PORT_START+N`)
const PORT_START: u16 = 8040;
366
367
/// Checks that `new_socket_server!` expands & compiles: three servers are instantiated
/// (but never started), on ports `PORT_START` to `PORT_START+2`, all with the default configuration
#[cfg_attr(not(doc),test)]
fn single_protocol_instantiation() {
    const DEFAULT_CONFIG: ConstConfig = ConstConfig {
        ..ConstConfig::default()
    };
    for port_offset in 0..3 {
        let _server = new_socket_server!(DEFAULT_CONFIG, LISTENING_INTERFACE, PORT_START + port_offset);
    }
}
389
/// Checks that `new_composite_socket_server!` expands & compiles: three servers with `()`
/// as the state type are instantiated (but never started), on ports `PORT_START+3` to
/// `PORT_START+5`, all with the default configuration
#[cfg_attr(not(doc),test)]
fn composite_protocol_instantiation() {
    const DEFAULT_CONFIG: ConstConfig = ConstConfig {
        ..ConstConfig::default()
    };
    for port_offset in 3..6 {
        let _server = new_composite_socket_server!(DEFAULT_CONFIG, LISTENING_INTERFACE, PORT_START + port_offset, ());
    }
}
411
/// Walks through the ways of bringing a single-protocol server up, all on the same port,
/// terminating each server before starting the next one:
///   1. named functions, with a processor that maps every message to `()`;
///   2. named functions, with a processor turned into a responsive stream;
///   3. closures;
///   4. no macros: explicit `CompositeSocketServer`, processor and channel types.
///
/// No client connects in this test: only starting & terminating are exercised.
#[cfg_attr(not(doc),tokio::test)]
async fn doc_usage() -> Result<(), Box<dyn std::error::Error + Sync + Send>> {

    const PORT: u16 = PORT_START+6;
    const TEST_CONFIG: ConstConfig = ConstConfig::default();

    // 1) named functions -- the processor here produces no answers
    let mut server = new_socket_server!(
        TEST_CONFIG,
        LISTENING_INTERFACE,
        PORT);
    start_server_processor!(TEST_CONFIG, Atomic, server,
        DummyClientAndServerMessages,
        DummyClientAndServerMessages,
        connection_events_handler,
        unresponsive_processor
    )?;
    // ignores every event
    async fn connection_events_handler<const CONFIG:  u64,
                                       LocalMessages: ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
                                       SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync>
                                      (_event: ProtocolEvent<CONFIG, LocalMessages, SenderChannel>) {
    }
    // consumes the client messages, yielding `()` for each of them
    fn unresponsive_processor<const CONFIG:   u64,
                              LocalMessages:  ReactiveMessagingSerializer<LocalMessages>                                  + Send + Sync + PartialEq + Debug,
                              SenderChannel:  FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
                              StreamItemType: Deref<Target=DummyClientAndServerMessages>>
                             (_client_addr:           String,
                              _connected_port:        u16,
                              _peer:                  Arc<Peer<CONFIG, LocalMessages, SenderChannel>>,
                              client_messages_stream: impl Stream<Item=StreamItemType>)
                             -> impl Stream<Item=()> {
        client_messages_stream.map(|_payload| ())
    }
    // the waiter must be obtained before `terminate()`, which consumes the server
    let termination_waiter = server.termination_waiter();
    server.terminate().await?;
    termination_waiter().await?;

    // 2) named functions -- the processor maps each message to `FloodPing` and is turned into a responsive stream
    let mut server = new_socket_server!(
        TEST_CONFIG,
        LISTENING_INTERFACE,
        PORT);
    start_server_processor!(TEST_CONFIG, Atomic, server,
        DummyClientAndServerMessages,
        DummyClientAndServerMessages,
        connection_events_handler,
        responsive_processor
    )?;
    fn responsive_processor<const CONFIG:   u64,
                            SenderChannel:  FullDuplexUniChannel<ItemType=DummyClientAndServerMessages, DerivedItemType=DummyClientAndServerMessages> + Send + Sync,
                            StreamItemType: Deref<Target=DummyClientAndServerMessages>>
                           (_client_addr:           String,
                            _connected_port:        u16,
                            peer:                   Arc<Peer<CONFIG, DummyClientAndServerMessages, SenderChannel>>,
                            client_messages_stream: impl Stream<Item=StreamItemType>)
                           -> impl Stream<Item=()> {
        client_messages_stream
            .map(|_payload| DummyClientAndServerMessages::FloodPing)
            .to_responsive_stream(peer, |_, _| ())
    }

    let termination_waiter = server.termination_waiter();
    server.terminate().await?;
    termination_waiter().await?;

    // 3) closures instead of named functions
    let mut server = new_socket_server!(
        TEST_CONFIG,
        LISTENING_INTERFACE,
        PORT);
    start_server_processor!(TEST_CONFIG, Atomic, server,
        DummyClientAndServerMessages,
        DummyClientAndServerMessages,
        |_| future::ready(()),
        |_, _, _, client_messages_stream| client_messages_stream.map(|_payload| DummyClientAndServerMessages::FloodPing)
    )?;
    let termination_waiter = server.termination_waiter();
    server.terminate().await?;
    termination_waiter().await?;

    // 4) no macros: custom configuration with every type spelled out
    const CUSTOM_CONFIG: ConstConfig = ConstConfig {
        receiver_buffer:      2048,
        sender_buffer:        1024,
        executor_instruments: reactive_mutiny::prelude::Instruments::LogsWithExpensiveMetrics,
        ..ConstConfig::default()
    };
    let mut server = CompositeSocketServer :: <{CUSTOM_CONFIG.into()},
                                               ()>
                                           :: new(LISTENING_INTERFACE, PORT);
    type ProcessorUniType = UniZeroCopyFullSync<DummyClientAndServerMessages, {CUSTOM_CONFIG.receiver_buffer as usize}, 1, {CUSTOM_CONFIG.executor_instruments.into()}>;
    type SenderChannelType = ChannelUniMoveFullSync<DummyClientAndServerMessages, {CUSTOM_CONFIG.sender_buffer as usize}, 1>;
    let connection_channel = server.spawn_processor::<DummyClientAndServerMessages,
                                                      DummyClientAndServerMessages,
                                                      ProcessorUniType,
                                                      SenderChannelType,
                                                      _, _, _, _, _ > (
        |_| future::ready(()),
        |_, _, _, client_messages_stream| client_messages_stream.map(|_payload| DummyClientAndServerMessages::FloodPing)
    ).await?;
    server.start_single_protocol(connection_channel).await?;
    let termination_waiter = server.termination_waiter();
    server.terminate().await?;
    termination_waiter().await?;

    Ok(())
}
529
530 #[cfg_attr(not(doc),tokio::test(flavor = "multi_thread"))]
536 async fn termination_process() {
537 const PORT: u16 = PORT_START+7;
538
539 let max_time_ms = 20;
541
542 let client_received_messages_count_ref1 = Arc::new(AtomicU32::new(0));
544 let client_received_messages_count_ref2 = Arc::clone(&client_received_messages_count_ref1);
545 let server_received_messages_count_ref1 = Arc::new(AtomicU32::new(0));
546 let server_received_messages_count_ref2 = Arc::clone(&server_received_messages_count_ref1);
547
548 let client_peer_ref1 = Arc::new(Mutex::new(None));
550 let client_peer_ref2 = Arc::clone(&client_peer_ref1);
551
552 const TEST_CONFIG: ConstConfig = ConstConfig {
553 ..ConstConfig::default()
554 };
555 let mut server = CompositeSocketServer :: <{TEST_CONFIG.into()},
556 () >
557 :: new(LISTENING_INTERFACE, PORT);
558 type ProcessorUniType = UniZeroCopyFullSync<DummyClientAndServerMessages, {TEST_CONFIG.receiver_buffer as usize}, 1, {TEST_CONFIG.executor_instruments.into()}>;
559 type SenderChannelType = ChannelUniMoveFullSync<DummyClientAndServerMessages, {TEST_CONFIG.sender_buffer as usize}, 1>;
560 let connection_channel = server.spawn_processor :: <DummyClientAndServerMessages,
561 DummyClientAndServerMessages,
562 ProcessorUniType,
563 SenderChannelType,
564 _, _, _, _, _> (
565 move |connection_event: ProtocolEvent<{TEST_CONFIG.into()}, DummyClientAndServerMessages, SenderChannelType>| {
566 let client_peer = Arc::clone(&client_peer_ref1);
567 async move {
568 match connection_event {
569 ProtocolEvent::PeerArrived { peer } => {
570 client_peer.lock().await.replace(peer);
572 },
573 ProtocolEvent::PeerLeft { peer: _, stream_stats: _ } => (),
574 ProtocolEvent::LocalServiceTermination => {
575 let client_peer = client_peer.lock().await;
578 let client_peer = client_peer.as_ref().expect("No client is connected");
579 let _ = client_peer.send_async(DummyClientAndServerMessages::FloodPing).await;
581 client_peer.flush_and_close(Duration::ZERO).await;
582 }
583 }
584 }
585 },
586 move |_, _, peer, client_messages: MessagingMutinyStream<ProcessorUniType>| {
587 let server_received_messages_count = Arc::clone(&server_received_messages_count_ref1);
588 client_messages
589 .map(move |client_message| {
590 std::mem::forget(client_message); server_received_messages_count.fetch_add(1, Relaxed);
592 DummyClientAndServerMessages::FloodPing
593 })
594 .to_responsive_stream(peer, |_, _| ())
595 }
596 ).await.expect("Spawning a server processor");
597 server.start_single_protocol(connection_channel).await.expect("Starting the server");
598
599 let mut client = new_socket_client!(
601 TEST_CONFIG,
602 LISTENING_INTERFACE,
603 PORT);
604 start_client_processor!(TEST_CONFIG, Atomic, client,
605 DummyClientAndServerMessages,
606 DummyClientAndServerMessages,
607 |_| async {},
608 move |_, _, peer, server_messages| {
609 let client_received_messages_count = Arc::clone(&client_received_messages_count_ref1);
610 server_messages
611 .map(move |server_message| {
612 std::mem::forget(server_message); client_received_messages_count.fetch_add(1, Relaxed);
614 DummyClientAndServerMessages::FloodPing
615 })
616 .to_responsive_stream(peer, |_, _| ())
617 }
618 ).expect("Starting the client");
619
620 while client_peer_ref2.lock().await.is_none() {
622 tokio::time::sleep(Duration::from_millis(2)).await;
623 }
624 let wait_for_server_termination = server.termination_waiter();
626 server.terminate().await
627 .expect("ERROR Signaling the server of the termination intention");
628 let start = std::time::SystemTime::now();
629 _ = tokio::time::timeout(Duration::from_secs(5), wait_for_server_termination()).await
630 .expect("TIMED OUT (>5s) Waiting for the server to live it's life and to complete the termination process");
631 let elapsed_ms = start.elapsed().unwrap().as_millis();
632 assert!(client_received_messages_count_ref2.load(Relaxed) > 1, "The client didn't receive any messages (not even the 'server is shutting down' notification)");
633 assert!(server_received_messages_count_ref2.load(Relaxed) > 1, "The server didn't receive any messages (not even 'gracefully disconnecting' after being notified that the server is shutting down)");
634 assert!(elapsed_ms <= max_time_ms as u128,
635 "The server termination (of a never complying client) didn't complete in a reasonable time, meaning the termination code is wrong. Maximum acceptable time: {}ms; Measured Time: {}ms",
636 max_time_ms, elapsed_ms);
637 }
638
/// Exercises a multi-protocol server in which a connection visits one processor after
/// the other: each processor, upon receiving a message, sets the next state on the
/// connection and closes its part of the conversation; the routing closure then picks
/// the next processor out of that state, until `Protocols::Disconnect` is reached.
///
/// At the end, every processor must have been visited.
#[cfg_attr(not(doc),tokio::test(flavor = "multi_thread"))]
async fn composite_protocol_stacking_pattern() -> Result<(), Box<dyn std::error::Error + Sync + Send>> {

    const PORT: u16 = PORT_START+8;
    const TEST_CONFIG: ConstConfig = ConstConfig::default();

    let mut server = new_composite_socket_server!(
        TEST_CONFIG,
        LISTENING_INTERFACE,
        PORT,
        Protocols);

    // the connection states -- one per processor, plus the state that ends the connection
    #[derive(Debug,PartialEq,Clone)]
    enum Protocols {
        IncomingClient,
        WelcomeAuthenticatedFriend,
        AccountSettings,
        GoodbyeOptions,
        Disconnect,
    }

    // first processor: greets the client, setting `WelcomeAuthenticatedFriend` as the next state
    let incoming_client_processor_greeted = Arc::new(AtomicBool::new(false));
    let incoming_client_processor_greeted_ref = Arc::clone(&incoming_client_processor_greeted);
    let incoming_client_processor = spawn_server_processor!(TEST_CONFIG, Atomic, server, String, String,
        |_| future::ready(()),
        move |_, _, peer, client_messages_stream| {
            assert_eq!(peer.try_take_state(), Some(Some(Protocols::IncomingClient)), "Connection is in a wrong state");
            let incoming_client_processor_greeted_ref = Arc::clone(&incoming_client_processor_greeted_ref);
            client_messages_stream.then(move |_payload| {
                let peer = Arc::clone(&peer);
                incoming_client_processor_greeted_ref.store(true, Relaxed);
                async move {
                    peer.send_async(format!("`IncomingClient`: New peer {peer:?} ended up initial authentication proceedings. SAY SOMETHING and you will be routed to 'WelcomeAuthenticatedFriend'")).await
                        .expect("Sending failed");
                    peer.set_state(Protocols::WelcomeAuthenticatedFriend).await;
                    peer.flush_and_close(Duration::from_secs(1)).await;
                }
            })
        }
    )?;

    // second processor: greets the client on arrival; any message sets `AccountSettings` as the next state
    let welcome_authenticated_friend_processor_greeted = Arc::new(AtomicBool::new(false));
    let welcome_authenticated_friend_processor_greeted_ref = Arc::clone(&welcome_authenticated_friend_processor_greeted);
    let welcome_authenticated_friend_processor = spawn_server_processor!(TEST_CONFIG, Atomic, server, String, String,
        |connection_event| async {
            match connection_event {
                ProtocolEvent::PeerArrived { peer } =>
                    peer.send_async(format!("`WelcomeAuthenticatedFriend`: Now dealing with client {peer:?}. SAY SOMETHING and you will be routed to 'AccountSettings'")).await
                        .expect("Sending failed"),
                _ => (),
            }
        },
        move |_, _, peer, client_messages_stream| {
            assert_eq!(peer.try_take_state(), Some(Some(Protocols::WelcomeAuthenticatedFriend)), "Connection is in a wrong state");
            let welcome_authenticated_friend_processor_greeted_ref = Arc::clone(&welcome_authenticated_friend_processor_greeted_ref);
            client_messages_stream.then(move |_payload| {
                let peer = Arc::clone(&peer);
                welcome_authenticated_friend_processor_greeted_ref.store(true, Relaxed);
                async move {
                    peer.set_state(Protocols::AccountSettings).await;
                    peer.flush_and_close(Duration::from_secs(1)).await;
                }
            })
        }
    )?;

    // third processor: any message sets `GoodbyeOptions` as the next state
    let account_settings_processor_greeted = Arc::new(AtomicBool::new(false));
    let account_settings_processor_greeted_ref = Arc::clone(&account_settings_processor_greeted);
    let account_settings_processor = spawn_server_processor!(TEST_CONFIG, Atomic, server, String, String,
        |connection_event| async {
            match connection_event {
                ProtocolEvent::PeerArrived { peer } =>
                    peer.send_async(format!("`AccountSettings`: Now dealing with client {peer:?}. SAY SOMETHING and you will be routed to 'GoodbyeOptions'")).await
                        .expect("Sending failed"),
                _ => (),
            }
        },
        move |_, _, peer, client_messages_stream| {
            assert_eq!(peer.try_take_state(), Some(Some(Protocols::AccountSettings)), "Connection is in a wrong state");
            let account_settings_processor_greeted_ref = Arc::clone(&account_settings_processor_greeted_ref);
            client_messages_stream.then(move |_payload| {
                let peer = Arc::clone(&peer);
                account_settings_processor_greeted_ref.store(true, Relaxed);
                async move {
                    peer.set_state(Protocols::GoodbyeOptions).await;
                    peer.flush_and_close(Duration::from_secs(1)).await;
                }
            })
        }
    )?;

    // fourth processor: any message sets `Disconnect` as the next state
    let goodbye_options_processor_greeted = Arc::new(AtomicBool::new(false));
    let goodbye_options_processor_greeted_ref = Arc::clone(&goodbye_options_processor_greeted);
    let goodbye_options_processor = spawn_server_processor!(TEST_CONFIG, Atomic, server, String, String,
        |connection_event| async {
            match connection_event {
                ProtocolEvent::PeerArrived { peer } =>
                    peer.send_async(format!("`GoodbyeOptions`: Now dealing with client {peer:?}. SAY SOMETHING and you will be DISCONNECTED, as our talking is over. Thank you.")).await
                        .expect("Sending failed"),
                _ => (),
            }
        },
        move |_, _, peer, client_messages_stream| {
            assert_eq!(peer.try_take_state(), Some(Some(Protocols::GoodbyeOptions)), "Connection is in a wrong state");
            let goodbye_options_processor_greeted_ref = Arc::clone(&goodbye_options_processor_greeted_ref);
            client_messages_stream.then(move |_payload| {
                let peer = Arc::clone(&peer);
                goodbye_options_processor_greeted_ref.store(true, Relaxed);
                async move {
                    peer.set_state(Protocols::Disconnect).await;
                    peer.flush_and_close(Duration::from_secs(1)).await;
                }
            })
        }
    )?;

    // maps each connection state to the processor in charge of it -- `None` has the connection closed
    let connection_routing_closure = move |socket_connection: &SocketConnection<Protocols>, _|
        match socket_connection.state() {
            Protocols::IncomingClient             => Some(incoming_client_processor.clone_sender()),
            Protocols::WelcomeAuthenticatedFriend => Some(welcome_authenticated_friend_processor.clone_sender()),
            Protocols::AccountSettings            => Some(account_settings_processor.clone_sender()),
            Protocols::GoodbyeOptions             => Some(goodbye_options_processor.clone_sender()),
            Protocols::Disconnect                 => None,
        };
    server.start_multi_protocol(Protocols::IncomingClient, connection_routing_closure, |_| future::ready(())).await?;
    let server_termination_waiter = server.termination_waiter();

    // the client: says hello on arrival and answers 'OK' to whatever it receives
    let mut client = new_socket_client!(
        TEST_CONFIG,
        LISTENING_INTERFACE,
        PORT);
    start_client_processor!(TEST_CONFIG, Atomic, client, String, String,
        |connection_event| async {
            match connection_event {
                ProtocolEvent::PeerArrived { peer }                  => peer.send_async(String::from("Hello! Am I in?")).await.expect("Sending failed"),
                ProtocolEvent::PeerLeft { peer: _, stream_stats: _ } => (),
                ProtocolEvent::LocalServiceTermination               => (),
            }
        },
        move |_, _, peer, server_messages| server_messages
            .map(|msg| {
                println!("RECEIVED: {msg} -- answering with 'OK'");
                String::from("OK")
            })
            .to_responsive_stream(peer, |_, _| ())
    )?;

    // waits for the client to complete -- expected to happen when the server disconnects it
    let client_waiter = client.termination_waiter();
    _ = tokio::time::timeout(Duration::from_secs(5), client_waiter()).await
        .expect("TIMED OUT (>5s) Waiting for the client & server to do their stuff & disconnect the client");

    server.terminate().await?;
    server_termination_waiter().await?;

    // every processor must have received (at least) one message
    assert!(incoming_client_processor_greeted.load(Relaxed), "`IncomingClient` processor wasn't requested");
    assert!(welcome_authenticated_friend_processor_greeted.load(Relaxed), "`WelcomeAuthenticatedFriend` processor wasn't requested");
    assert!(account_settings_processor_greeted.load(Relaxed), "`AccountSettings` processor wasn't requested");
    assert!(goodbye_options_processor_greeted.load(Relaxed), "`GoodbyeOptions` processor wasn't requested");

    Ok(())
}
813
814
/// The only message the tests of this module exchange -- used both as the client and as the server protocol
#[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
enum DummyClientAndServerMessages {
    /// The one and only message, sent back and forth
    #[default]
    FloodPing,
}
820
821 impl Deref for DummyClientAndServerMessages {
822 type Target = DummyClientAndServerMessages;
823 fn deref(&self) -> &Self::Target {
824 self
825 }
826 }
827
/// Serialization of the test messages, delegated to `ron_serializer()`
impl ReactiveMessagingSerializer<DummyClientAndServerMessages> for DummyClientAndServerMessages {
    /// Serializes `remote_message` into `buffer` -- panics if the serializer reports an error
    #[inline(always)]
    fn serialize(remote_message: &DummyClientAndServerMessages, buffer: &mut Vec<u8>) {
        ron_serializer(remote_message, buffer)
            .expect("composite_socket_server.rs unit tests: No errors should have happened here!")
    }
    /// Always panics, reporting `err`: no protocol error is expected in these tests
    #[inline(always)]
    fn processor_error_message(err: String) -> DummyClientAndServerMessages {
        panic!("composite_socket_server.rs unit tests: protocol error when none should have happened: {err}");
    }
}
/// Deserialization of the test messages, delegated to `ron_deserializer()`
impl ReactiveMessagingDeserializer<DummyClientAndServerMessages> for DummyClientAndServerMessages {
    /// Parses `local_message` into a message, handing any deserializer error over to the caller
    #[inline(always)]
    fn deserialize(local_message: &[u8]) -> Result<DummyClientAndServerMessages, Box<dyn std::error::Error + Sync + Send>> {
        ron_deserializer(local_message)
    }
}
845}