1use crate::{
19 socket_services::socket_client::common::upgrade_to_connection_event_tracking,
20 types::{
21 ProtocolEvent,
22 MessagingMutinyStream,
23 },
24 socket_connection::{
25 peer::Peer,
26 socket_connection_handler::SocketConnectionHandler,
27 connection_provider::{ClientConnectionManager, ConnectionChannel},
28 },
29 serde::{ReactiveMessagingDeserializer, ReactiveMessagingSerializer},
30};
31use crate::socket_connection::connection::SocketConnection;
32use crate::socket_services::types::MessagingService;
33use crate::types::ConnectionEvent;
34use std::{
35 error::Error,
36 fmt::Debug,
37 future::Future,
38 sync::{
39 Arc,
40 atomic::AtomicBool,
41 },
42};
43use std::sync::atomic::Ordering::Relaxed;
44use reactive_mutiny::prelude::{FullDuplexUniChannel, GenericUni};
45use futures::{future::BoxFuture, Stream};
46use tokio::io::AsyncWriteExt;
47use log::{trace, warn, error, debug};
48
49
50#[macro_export]
64macro_rules! new_socket_client {
65 ($const_config: expr,
66 $host: expr,
67 $port: expr) => {
68 new_composite_socket_client!($const_config, $host, $port, ())
69 }
70}
71pub use new_socket_client;
72
73
74#[macro_export]
90macro_rules! new_composite_socket_client {
91 ($const_config: expr,
92 $ip: expr,
93 $port: expr,
94 $state_type: ty) => {{
95 const _CONFIG: u64 = $const_config.into();
96 CompositeSocketClient::<_CONFIG, $state_type>::new($ip, $port)
97 }}
98}
99pub use new_composite_socket_client;
100
101
102#[macro_export]
143macro_rules! spawn_client_processor {
144 ($const_config: expr,
145 $channel_type: tt,
146 $socket_client: expr,
147 $remote_messages: ty,
148 $local_messages: ty,
149 $protocol_events_handler_fn: expr,
150 $dialog_processor_builder_fn: expr) => {{
151 _define_processor_uni_and_sender_channel_types!($const_config, $channel_type, $remote_messages, $local_messages);
152 $socket_client.spawn_processor::<$remote_messages, $local_messages, ProcessorUniType, SenderChannel, _, _, _, _, _>($protocol_events_handler_fn, $dialog_processor_builder_fn).await
153 }}
154}
155pub use spawn_client_processor;
156
157
158#[macro_export]
196macro_rules! start_client_processor {
197 ($const_config: expr,
198 $channel_type: tt,
199 $socket_client: expr,
200 $remote_messages: ty,
201 $local_messages: ty,
202 $connection_events_handler_fn: expr,
203 $dialog_processor_builder_fn: expr) => {{
204 match spawn_client_processor!($const_config, $channel_type, $socket_client, $remote_messages, $local_messages, $connection_events_handler_fn, $dialog_processor_builder_fn) {
205 Ok(connection_channel) => $socket_client.start_single_protocol(connection_channel).await,
206 Err(err) => Err(err),
207 }
208 }}
209}
210pub use start_client_processor;
211
212
213pub struct CompositeSocketClient<const CONFIG: u64,
224 StateType: Send + Sync + Clone + Debug + 'static> {
225
226 connected: Arc<AtomicBool>,
228 ip: String,
230 port: u16,
232 client_termination_signaler: Option<tokio::sync::broadcast::Sender<()>>,
234 local_termination_is_complete_receiver: Option<tokio::sync::mpsc::Receiver<()>>,
236 local_termination_is_complete_sender: tokio::sync::mpsc::Sender<()>,
237 returned_connection_source: Option<tokio::sync::mpsc::Receiver<SocketConnection<StateType>>>,
240 returned_connection_sink: tokio::sync::mpsc::Sender<SocketConnection<StateType>>,
241 spawned_processors_count: u32,
243}
244impl<const CONFIG: u64,
245 StateType: Send + Sync + Clone + Debug + 'static>
246CompositeSocketClient<CONFIG, StateType> {
247
248 pub fn new<IntoString: Into<String>>
252 (ip: IntoString,
253 port: u16)
254 -> Self {
255 let (returned_connection_sink, returned_connection_source) = tokio::sync::mpsc::channel::<SocketConnection<StateType>>(1);
256 let (client_termination_signaler, _) = tokio::sync::broadcast::channel(1);
257 let (local_termination_is_complete_sender, local_termination_is_complete_receiver) = tokio::sync::mpsc::channel(1);
258 Self {
259 connected: Arc::new(AtomicBool::new(false)),
260 ip: ip.into(),
261 port,
262 client_termination_signaler: Some(client_termination_signaler),
263 local_termination_is_complete_receiver: Some(local_termination_is_complete_receiver),
264 local_termination_is_complete_sender,
265 returned_connection_source: Some(returned_connection_source),
266 returned_connection_sink,
267 spawned_processors_count: 0,
268 }
269 }
270
271 const fn config() -> u64 {
273 CONFIG
274 }
275
276 pub fn is_connected(&self) -> bool {
278 self.connected.load(Relaxed)
279 }
280}
281
282impl<const CONFIG: u64,
283 StateType: Send + Sync + Clone + Debug + 'static>
284MessagingService<CONFIG>
285for CompositeSocketClient<CONFIG, StateType> {
286 type StateType = StateType;
287
288 #[inline(always)]
290 async fn spawn_processor<RemoteMessages: ReactiveMessagingDeserializer<RemoteMessages> + Send + Sync + PartialEq + Debug + 'static,
291 LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug + 'static,
292 ProcessorUniType: GenericUni<ItemType=RemoteMessages> + Send + Sync + 'static,
293 SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync + 'static,
294 OutputStreamItemsType: Send + Sync + Debug + 'static,
295 ServerStreamType: Stream<Item=OutputStreamItemsType> + Send + 'static,
296 ConnectionEventsCallbackFuture: Future<Output=()> + Send + 'static,
297 ConnectionEventsCallback: Fn(ProtocolEvent<CONFIG, LocalMessages, SenderChannel, StateType>) -> ConnectionEventsCallbackFuture + Send + Sync + 'static,
298 ProcessorBuilderFn: Fn(String, u16, Arc<Peer<CONFIG, LocalMessages, SenderChannel, StateType>>, MessagingMutinyStream<ProcessorUniType>) -> ServerStreamType + Send + Sync + 'static>
299
300 (&mut self,
301 connection_events_callback: ConnectionEventsCallback,
302 dialog_processor_builder_fn: ProcessorBuilderFn)
303
304 -> Result<ConnectionChannel<StateType>, Box<dyn Error + Sync + Send>> {
305
306 let ip = self.ip.clone();
307 let port = self.port;
308
309 let returned_connection_sink = self.returned_connection_sink.clone();
310 let local_termination_is_complete_sender = self.local_termination_is_complete_sender.clone();
311 let client_termination_signaler = self.client_termination_signaler.clone();
312
313 let connection_events_callback = upgrade_to_connection_event_tracking(&self.connected, local_termination_is_complete_sender, connection_events_callback);
314
315 let mut connection_provider = ConnectionChannel::new();
317 let mut connection_source = connection_provider.receiver()
318 .ok_or_else(|| String::from("couldn't move the Connection Receiver out of the Connection Provider"))?;
319
320 tokio::spawn(async move {
321 if let Some(connection) = connection_source.recv().await {
322 let client_termination_receiver = client_termination_signaler.expect("BUG! client_termination_signaler is NONE").subscribe();
323 let socket_communications_handler = SocketConnectionHandler::<CONFIG, RemoteMessages, LocalMessages, ProcessorUniType, SenderChannel, StateType>::new();
324 let result = socket_communications_handler.client_for_text_protocol(connection,
325 client_termination_receiver,
326 connection_events_callback,
327 dialog_processor_builder_fn).await
328 .map_err(|err| format!("Error while executing the dialog processor: {err}"));
329 match result {
330 Ok(socket_connection) => {
331 if let Err(err) = returned_connection_sink.send(socket_connection).await {
332 warn!("`reactive-messaging::CompositeGenericSocketClient`: ERROR returning the connection (after the unresponsive & textual processor ended) @ {ip}:{port}: {err}");
333 }
334 }
335 Err(err) => {
336 error!("`reactive-messaging::CompositeGenericSocketClient`: ERROR in client (unresponsive & textual) @ {ip}:{port}: {err}");
337 }
338 }
339 }
340 });
341 self.spawned_processors_count += 1;
342 Ok(connection_provider)
343 }
344
345 async fn start_multi_protocol<ConnectionEventsCallbackFuture: Future<Output=()> + Send>
346 (&mut self,
347 initial_connection_state: StateType,
348 mut connection_routing_closure: impl FnMut(&SocketConnection<StateType>, bool) -> Option<tokio::sync::mpsc::Sender<SocketConnection<StateType>>> + Send + 'static,
349 connection_events_callback: impl Fn(ConnectionEvent<StateType>) -> ConnectionEventsCallbackFuture + Send + 'static)
350 -> Result<(), Box<dyn Error + Sync + Send>> {
351
352 let mut connection_manager = ClientConnectionManager::<CONFIG>::new(&self.ip, self.port);
353 let connection = connection_manager.connect_retryable().await
354 .map_err(|err| format!("Error making client connection to {}:{} -- {err}", self.ip, self.port))?;
355 let mut just_opened_connection = Some(connection);
356
357 let mut returned_connection_source = self.returned_connection_source.take()
358 .ok_or_else(|| String::from("couldn't move the 'Returned Connection Source' out of the Connection Channel"))?;
359
360 let ip = self.ip.clone();
361 let port = self.port;
362
363 tokio::spawn(async move {
370
371 loop {
372 let (mut socket_connection, sender) = match just_opened_connection.take() {
373 Some(just_opened_connection) => {
375 let just_opened_socket_connection = SocketConnection::new(just_opened_connection, initial_connection_state.clone());
376 connection_events_callback(ConnectionEvent::Connected(&just_opened_socket_connection)).await;
377 let sender = connection_routing_closure(&just_opened_socket_connection, false);
378 (just_opened_socket_connection, sender)
379 },
380 None => {
382 let Some(returned_socket_connection) = returned_connection_source.recv().await else { break };
383 let sender = (!returned_socket_connection.closed())
384 .then_some(())
385 .and_then(|_| connection_routing_closure(&returned_socket_connection, true));
386 (returned_socket_connection, sender)
387 },
388 };
389
390 match sender {
392 Some(sender) => {
393 trace!("`reactive-messaging::CompositeSocketClient`: ROUTING the connection with the server @ {ip}:{port} to another processor");
394 if let Err(_) = sender.send(socket_connection).await {
395 error!("`reactive-messaging::CompositeSocketClient`: BUG(?) in the client connected to the server @ {ip}:{port} while re-routing the connection: THE NEW (ROUTED) PROCESSOR CAN NO LONGER RECEIVE CONNECTIONS -- THE CONNECTION WILL BE DROPPED");
396 break
397 }
398 },
399 None => {
400 connection_events_callback(ConnectionEvent::Disconnected(&socket_connection)).await;
401 if let Err(err) = socket_connection.connection_mut().shutdown().await {
402 debug!("`reactive-messaging::CompositeSocketClient`: ERROR in the client connected to the server @ {ip}:{port} while shutting down the connection (after the processors ended): {err}");
403 }
404 break
405 },
406 }
407 }
408 trace!("`reactive-messaging::CompositeSocketClient`: The 'Connection Routing Task' for the client connected to the server @ {ip}:{port} ended -- hopefully, due to a graceful client termination.");
410 });
413 Ok(())
414 }
415
416 fn termination_waiter(&mut self) -> Box<dyn FnOnce() -> BoxFuture<'static, Result<(), Box<dyn Error + Send + Sync>>>> {
417 let mut local_termination_receiver = self.local_termination_is_complete_receiver.take();
418 let mut latch = self.spawned_processors_count;
419 Box::new(move || Box::pin({
420 async move {
421 if let Some(mut local_termination_receiver) = local_termination_receiver.take() {
422 while latch > 0 {
423 match local_termination_receiver.recv().await {
424 Some(()) => latch -= 1,
425 None => return Err(Box::from(String::from("CompositeGenericSocketClient::termination_waiter(): It is no longer possible to tell when the client will be terminated: the broadcast channel was closed")))
426 }
427 }
428 Ok(())
429 } else {
430 Err(Box::from("CompositeGenericSocketClient: \"wait for termination\" requested, but the client was not started (or a previous service termination was commanded) at the moment `termination_waiter()` was called"))
431 }
432 }
433 }))
434 }
435
436 async fn terminate(mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
437 match self.client_termination_signaler.take() {
438 Some(client_sender) => {
439 warn!("`reactive-messaging::CompositeGenericSocketClient`: Shutdown asked & initiated for client connected @ {}:{}", self.ip, self.port);
440 _ = client_sender.send(());
441 Ok(())
442 }
443 None => {
444 Err(Box::from("Shutdown requested, but the service was not started. Ignoring..."))
445 }
446 }
447 }
448}
449
450
451#[cfg(any(test,doc))]
453mod tests {
454 use super::*;
455 use crate::prelude::*;
456 use std::{future, ops::Deref};
457 use std::sync::atomic::Ordering::Relaxed;
458 use std::time::Duration;
459 use futures::StreamExt;
460 use serde::{Deserialize, Serialize};
461 use crate::{new_socket_server, start_server_processor};
462 use crate::serde::{ron_deserializer, ron_serializer};
463
464
465 const REMOTE_SERVER: &str = "66.45.249.218";
466
467
468 #[cfg_attr(not(doc), test)]
470 fn single_protocol_instantiation() {
471 let _atomic_client = new_socket_client!(
472 ConstConfig {
473 ..ConstConfig::default()
474 },
475 REMOTE_SERVER, 443);
476
477 let _fullsync_client = new_socket_client!(
478 ConstConfig {
479 ..ConstConfig::default()
480 },
481 REMOTE_SERVER, 443);
482
483 let _crossbeam_client = new_socket_client!(
484 ConstConfig {
485 ..ConstConfig::default()
486 },
487 REMOTE_SERVER, 443);
488 }
489
490 #[cfg_attr(not(doc), test)]
492 fn composite_protocol_instantiation() {
493 let _atomic_client = new_composite_socket_client!(
494 ConstConfig {
495 ..ConstConfig::default()
496 },
497 REMOTE_SERVER, 443, () );
498
499 let _fullsync_client = new_composite_socket_client!(
500 ConstConfig {
501 ..ConstConfig::default()
502 },
503 REMOTE_SERVER, 443, () );
504
505 let _crossbeam_client = new_composite_socket_client!(
506 ConstConfig {
507 ..ConstConfig::default()
508 },
509 REMOTE_SERVER, 443, () );
510 }
511
512 #[cfg_attr(not(doc),tokio::test)]
515 async fn doc_usage() {
516
517 const TEST_CONFIG: ConstConfig = ConstConfig::default();
518
519 let mut client = new_socket_client!(
523 TEST_CONFIG,
524 REMOTE_SERVER,
525 443);
526 start_client_processor!(TEST_CONFIG, Atomic, client,
527 DummyClientAndServerMessages,
528 DummyClientAndServerMessages,
529 connection_events_handler,
530 unresponsive_processor
531 ).expect("Error starting a single protocol client");
532 async fn connection_events_handler<const CONFIG: u64,
533 LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug,
534 SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync>
535 (_event: ProtocolEvent<CONFIG, LocalMessages, SenderChannel>) {
536 }
537 fn unresponsive_processor<const CONFIG: u64,
538 LocalMessages: ReactiveMessagingSerializer<LocalMessages> + Send + Sync + PartialEq + Debug,
539 SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
540 StreamItemType: Deref<Target=DummyClientAndServerMessages>>
541 (_client_addr: String,
542 _connected_port: u16,
543 _peer: Arc<Peer<CONFIG, LocalMessages, SenderChannel>>,
544 client_messages_stream: impl Stream<Item=StreamItemType>)
545 -> impl Stream<Item=()> {
546 client_messages_stream.map(|_payload| ())
547 }
548 let wait_for_termination = client.termination_waiter();
549 client.terminate().await.expect("Error on client Termination command");
550 wait_for_termination().await.expect("Error waiting for client Termination");
551 warn!("1st DONE");
552
553 let mut client = new_socket_client!(
557 TEST_CONFIG,
558 REMOTE_SERVER,
559 443);
560 start_client_processor!(TEST_CONFIG, Atomic, client,
561 DummyClientAndServerMessages,
562 DummyClientAndServerMessages,
563 connection_events_handler,
564 responsive_processor
565 ).expect("Error starting a single protocol client");
566 fn responsive_processor<const CONFIG: u64,
567 SenderChannel: FullDuplexUniChannel<ItemType=DummyClientAndServerMessages, DerivedItemType=DummyClientAndServerMessages> + Send + Sync,
568 StreamItemType: Deref<Target=DummyClientAndServerMessages>>
569 (_client_addr: String,
570 _connected_port: u16,
571 peer: Arc<Peer<CONFIG, DummyClientAndServerMessages, SenderChannel>>,
572 client_messages_stream: impl Stream<Item=StreamItemType>)
573 -> impl Stream<Item=()> {
574 client_messages_stream
575 .map(|_payload| DummyClientAndServerMessages::FloodPing)
576 .to_responsive_stream(peer, |_, _| ())
577 }
578 let wait_for_termination = client.termination_waiter();
579 client.terminate().await.expect("Error on client Termination command");
580 wait_for_termination().await.expect("Error waiting for client Termination");
581 warn!("2nd DONE");
582
583 let mut client = new_socket_client!(
586 TEST_CONFIG,
587 REMOTE_SERVER,
588 443);
589 start_client_processor!(TEST_CONFIG, Atomic, client,
590 DummyClientAndServerMessages,
591 DummyClientAndServerMessages,
592 |_| future::ready(()),
593 |_, _, _, client_messages_stream| client_messages_stream.map(|_payload| DummyClientAndServerMessages::FloodPing)
594 ).expect("Error starting a single protocol client");
595 let wait_for_termination = client.termination_waiter();
596 client.terminate().await.expect("Error on client Termination command");
597 wait_for_termination().await.expect("Error waiting for client Termination");
598 warn!("3rd DONE");
599
600 const CUSTOM_CONFIG: ConstConfig = ConstConfig {
606 receiver_buffer: 2048,
607 sender_buffer: 1024,
608 executor_instruments: reactive_mutiny::prelude::Instruments::LogsWithExpensiveMetrics,
609 ..ConstConfig::default()
610 };
611 let mut client = CompositeSocketClient :: <{CUSTOM_CONFIG.into()},
612 () >
613 :: new(REMOTE_SERVER,443);
614 type ProcessorUniType = UniZeroCopyFullSync<DummyClientAndServerMessages, {CUSTOM_CONFIG.receiver_buffer as usize}, 1, {CUSTOM_CONFIG.executor_instruments.into()}>;
615 type SenderChannelType = ChannelUniMoveFullSync<DummyClientAndServerMessages, {CUSTOM_CONFIG.sender_buffer as usize}, 1>;
616 let connection_channel = client.spawn_processor::<DummyClientAndServerMessages,
617 DummyClientAndServerMessages,
618 ProcessorUniType,
619 SenderChannelType,
620 _, _, _, _, _ > (
621 |_| future::ready(()),
622 |_, _, _, client_messages_stream| client_messages_stream.map(|_payload| DummyClientAndServerMessages::FloodPing)
623 ).await.expect("Error spawning a protocol processor");
624 client.start_single_protocol(connection_channel).await.expect("Error starting a single protocol client");
625 let wait_for_termination = client.termination_waiter();
626 client.terminate().await.expect("Error on client Termination command");
627 wait_for_termination().await.expect("Error waiting for client Termination");
628 warn!("4th DONE");
629 }
630
631 #[cfg_attr(not(doc),tokio::test(flavor = "multi_thread"))]
635 async fn termination_process() {
636 const IP: &str = "127.0.0.1";
637 const PORT: u16 = 8030;
638 const TEST_CONFIG: ConstConfig = ConstConfig::default();
639
640 let connected_to_client = Arc::new(AtomicBool::new(false));
642 let connected_to_client_ref = Arc::clone(&connected_to_client);
643 let mut server = new_socket_server!(TEST_CONFIG, IP, PORT);
644 start_server_processor!(TEST_CONFIG, Atomic, server, String, String,
645 move |event| {
646 let connected_to_client_ref = Arc::clone(&connected_to_client_ref);
647 async move {
648 match event {
649 ProtocolEvent::PeerArrived { .. } => {
650 connected_to_client_ref.store(true, Relaxed);
651 },
652 ProtocolEvent::PeerLeft { .. } => {},
653 ProtocolEvent::LocalServiceTermination => {},
654 }
655 }
656 },
657 |_, _, _, stream| stream
658 ).expect("Error starting the server");
659 let mut client = new_socket_client!(TEST_CONFIG, IP, PORT);
660 start_client_processor!(TEST_CONFIG, Atomic, client, String, String,
661 |_| future::ready(()),
662 |_, _, _, stream| stream
663 ).expect("Error starting the client");
664 let termination_waiter = client.termination_waiter();
665 tokio::time::sleep(Duration::from_millis(20)).await;
667 assert!(connected_to_client.load(Relaxed), "Client didn't connect to server");
668 assert!(client.is_connected(), "`client` didn't report any connection");
669 client.terminate().await.expect("Could not terminate the client");
670 _ = tokio::time::timeout(Duration::from_millis(100), termination_waiter()).await
671 .expect("Timed out (>100ms) waiting the the client's termination");
672 server.terminate().await.expect("Could not terminate the server");
673
674 let connected_to_client = Arc::new(AtomicBool::new(false));
676 let connected_to_client_ref = Arc::clone(&connected_to_client);
677 let mut server = new_socket_server!(TEST_CONFIG, IP, PORT);
678 start_server_processor!(TEST_CONFIG, Atomic, server, String, String,
679 move |event| {
680 let connected_to_client_ref = Arc::clone(&connected_to_client_ref);
681 async move {
682 match event {
683 ProtocolEvent::PeerArrived { peer } => {
684 connected_to_client_ref.store(true, Relaxed);
685 peer.send(String::from("Goodbye")).expect("Couldn't send");
686 },
687 ProtocolEvent::PeerLeft { .. } => {},
688 ProtocolEvent::LocalServiceTermination => {},
689 }
690 }
691 },
692 |_, _, _, stream| stream
693 ).expect("Error starting the server");
694 let mut client = new_socket_client!(TEST_CONFIG, IP, PORT);
695 start_client_processor!(TEST_CONFIG, Atomic, client, String, String,
696 |_| future::ready(()),
697 |_, _, peer, stream| stream.map(move |_msg| peer.cancel_and_close()) ).expect("Error starting the client");
699 let termination_waiter = client.termination_waiter();
700 tokio::time::sleep(Duration::from_millis(100)).await;
704 assert!(connected_to_client.load(Relaxed), "Client didn't connect to server");
705 _ = tokio::time::timeout(Duration::from_millis(1), termination_waiter()).await
706 .expect("A disconnected client should signal its `termination_waiter()` for an immediate return -- what didn't happen");
707 assert!(!client.is_connected(), "`client` didn't report the disconnection");
708 server.terminate().await.expect("Could not terminate the server");
709
710 }
711
712 #[cfg_attr(not(doc),tokio::test(flavor = "multi_thread"))]
720 async fn composite_protocol_stacking_pattern() -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
721
722 const IP: &str = "127.0.0.1";
723 const PORT: u16 = 8031;
724 const TEST_CONFIG: ConstConfig = ConstConfig::default();
725
726 let mut server = new_socket_server!(
728 TEST_CONFIG,
729 IP,
730 PORT);
731 start_server_processor!(TEST_CONFIG, Atomic, server, String, String,
732 |_| future::ready(()),
733 move |_, _, peer, client_messages| client_messages
734 .map(|msg| {
735 println!("SERVER RECEIVED: {msg} -- answering with 'OK'");
736 String::from("OK")
737 })
738 .to_responsive_stream(peer, |_, _| ())
739
740 )?;
741 let server_termination_waiter = server.termination_waiter();
742
743 let mut client = new_composite_socket_client!(
744 TEST_CONFIG,
745 IP,
746 PORT,
747 Protocols );
748
749 #[derive(Debug,PartialEq,Clone)]
750 enum Protocols {
751 Handshake,
752 WelcomeAuthenticatedFriend,
753 AccountSettings,
754 GoodbyeOptions,
755 Disconnect,
756 }
757
758 let handshake_processor_greeted = Arc::new(AtomicBool::new(false));
760 let handshake_processor_greeted_ref = Arc::clone(&handshake_processor_greeted);
761 let handshake_processor = spawn_client_processor!(TEST_CONFIG, Atomic, client, String, String,
762 |connection_event| async {
763 match connection_event {
764 ProtocolEvent::PeerArrived { peer } => peer.send_async(String::from("Client is at `Handshake`")).await
765 .expect("Sending failed"),
766 _ => {},
767 }
768 },
769 move |_, _, peer, server_messages_stream| {
770 assert_eq!(peer.try_take_state(), Some(Some(Protocols::Handshake)), "Connection is in a wrong state");
771 let handshake_processor_greeted_ref = Arc::clone(&handshake_processor_greeted_ref);
772 server_messages_stream.then(move |_payload| {
773 let peer = Arc::clone(&peer);
774 handshake_processor_greeted_ref.store(true, Relaxed);
775 async move {
776 peer.set_state(Protocols::WelcomeAuthenticatedFriend).await;
777 peer.flush_and_close(Duration::from_secs(1)).await;
778 }
779 })
780 }
781 )?;
782
783 let welcome_authenticated_friend_processor_greeted = Arc::new(AtomicBool::new(false));
785 let welcome_authenticated_friend_processor_greeted_ref = Arc::clone(&welcome_authenticated_friend_processor_greeted);
786 let welcome_authenticated_friend_processor = spawn_client_processor!(TEST_CONFIG, Atomic, client, String, String,
787 |connection_event| async {
788 match connection_event {
789 ProtocolEvent::PeerArrived { peer } => peer.send_async(String::from("Client is at `WelcomeAuthenticatedFriend`")).await
790 .expect("Sending failed"),
791 _ => {},
792 }
793 },
794 move |_, _, peer, server_messages_stream| {
795 assert_eq!(peer.try_take_state(), Some(Some(Protocols::WelcomeAuthenticatedFriend)), "Connection is in a wrong state");
796 let welcome_authenticated_friend_processor_greeted_ref = Arc::clone(&welcome_authenticated_friend_processor_greeted_ref);
797 server_messages_stream.then(move |_payload| {
798 let peer = Arc::clone(&peer);
799 welcome_authenticated_friend_processor_greeted_ref.store(true, Relaxed);
800 async move {
801 peer.set_state(Protocols::AccountSettings).await;
802 peer.flush_and_close(Duration::from_secs(1)).await;
803 }
804 })
805 }
806 )?;
807
808 let account_settings_processor_greeted = Arc::new(AtomicBool::new(false));
809 let account_settings_processor_greeted_ref = Arc::clone(&account_settings_processor_greeted);
810 let account_settings_processor = spawn_client_processor!(TEST_CONFIG, Atomic, client, String, String,
811 |connection_event| async {
812 match connection_event {
813 ProtocolEvent::PeerArrived { peer } => peer.send_async(String::from("Client is at `AccountSettings`")).await
814 .expect("Sending failed"),
815 _ => {},
816 }
817 },
818 move |_, _, peer, server_messages_stream| {
819 assert_eq!(peer.try_take_state(), Some(Some(Protocols::AccountSettings)), "Connection is in a wrong state");
820 let account_settings_processor_greeted_ref = Arc::clone(&account_settings_processor_greeted_ref);
821 server_messages_stream.then(move |_payload| {
822 let peer = Arc::clone(&peer);
823 account_settings_processor_greeted_ref.store(true, Relaxed);
824 async move {
825 peer.set_state(Protocols::GoodbyeOptions).await;
826 peer.flush_and_close(Duration::from_secs(1)).await;
827 }
828 })
829 }
830 )?;
831
832 let goodbye_options_processor_greeted = Arc::new(AtomicBool::new(false));
833 let goodbye_options_processor_greeted_ref = Arc::clone(&goodbye_options_processor_greeted);
834 let goodbye_options_processor = spawn_client_processor!(TEST_CONFIG, Atomic, client, String, String,
835 |connection_event| async {
836 match connection_event {
837 ProtocolEvent::PeerArrived { peer } => peer.send_async(String::from("Client is at `GoodbyeOptions`")).await
838 .expect("Sending failed"),
839 _ => {},
840 }
841 },
842 move |_, _, peer, server_messages_stream| {
843 assert_eq!(peer.try_take_state(), Some(Some(Protocols::GoodbyeOptions)), "Connection is in a wrong state");
844 let goodbye_options_processor_greeted_ref = Arc::clone(&goodbye_options_processor_greeted_ref);
845 server_messages_stream.then(move |_payload| {
846 let peer = Arc::clone(&peer);
847 goodbye_options_processor_greeted_ref.store(true, Relaxed);
848 async move {
849 peer.set_state(Protocols::Disconnect).await;
850 peer.flush_and_close(Duration::from_secs(1)).await;
851 }
852 })
853 }
854 )?;
855
856 let connection_routing_closure = move |socket_connection: &SocketConnection<Protocols>, _|
859 match socket_connection.state() {
860 Protocols::Handshake => Some(handshake_processor.clone_sender()),
861 Protocols::WelcomeAuthenticatedFriend => Some(welcome_authenticated_friend_processor.clone_sender()),
862 Protocols::AccountSettings => Some(account_settings_processor.clone_sender()),
863 Protocols::GoodbyeOptions => Some(goodbye_options_processor.clone_sender()),
864 Protocols::Disconnect => None,
865 };
866 client.start_multi_protocol(Protocols::Handshake, connection_routing_closure, |_| future::ready(())).await?;
867
868 let client_waiter = client.termination_waiter();
869 _ = tokio::time::timeout(Duration::from_secs(5), client_waiter()).await
871 .expect("TIMED OUT (>5s) Waiting for the server & client to do their stuff & disconnect the client");
872
873 server.terminate().await?;
875 server_termination_waiter().await?;
876
877 assert!(handshake_processor_greeted.load(Relaxed), "`Handshake` processor wasn't requested");
878 assert!(welcome_authenticated_friend_processor_greeted.load(Relaxed), "`WelcomeAuthenticatedFriend` processor wasn't requested");
879 assert!(account_settings_processor_greeted.load(Relaxed), "`AccountSettings` processor wasn't requested");
880 assert!(goodbye_options_processor_greeted.load(Relaxed), "`GoodbyeOptions` processor wasn't requested");
881
882 Ok(())
883 }
884
885
886 #[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
887 enum DummyClientAndServerMessages {
888 #[default]
889 FloodPing,
890 }
891
892 impl Deref for DummyClientAndServerMessages {
893 type Target = DummyClientAndServerMessages;
894 fn deref(&self) -> &Self::Target {
895 self
896 }
897 }
898
899 impl ReactiveMessagingSerializer<DummyClientAndServerMessages> for DummyClientAndServerMessages {
900 #[inline(always)]
901 fn serialize(remote_message: &DummyClientAndServerMessages, buffer: &mut Vec<u8>) {
902 ron_serializer(remote_message, buffer)
903 .expect("socket_client.rs unit tests: No errors should have happened here!")
904 }
905 #[inline(always)]
906 fn processor_error_message(err: String) -> DummyClientAndServerMessages {
907 panic!("socket_client.rs unit tests: protocol error when none should have happened: {err}");
908 }
909 }
910 impl ReactiveMessagingDeserializer<DummyClientAndServerMessages> for DummyClientAndServerMessages {
911 #[inline(always)]
912 fn deserialize(local_message: &[u8]) -> Result<DummyClientAndServerMessages, Box<dyn std::error::Error + Sync + Send>> {
913 ron_deserializer(local_message)
914 }
915 }
916}