reactive-messaging 0.4.1

Reactive client/server communications, focused on high performance
//! Resting place of [ClientProtocolProcessor]

use crate::composite_protocol_stacking_common::{
    protocol_model::{GameClientMessages, PROTOCOL_VERSION, GameServerMessages},
    logic::{
        ping_pong_logic::{act, Umpire},
        ping_pong_models::{GameOverStates, GameStates, MatchConfig, PingPongEvent, Players, TurnFlipEvents},
        protocol_processor::{react_to_hard_fault, react_to_rally_event, react_to_score, react_to_service_soft_fault},
    }
};
use reactive_messaging::prelude::{ProtocolEvent, Peer, ResponsiveStream};
use std::{
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering::Relaxed},
    },
    time::Instant,
};
use std::fmt::Debug;
use reactive_mutiny::prelude::FullDuplexUniChannel;
use futures::{future, Stream, stream, StreamExt};
use log::{debug, info, error, warn};
use crate::composite_protocol_stacking_common::protocol_model::{PreGameClientError, PreGameClientMessages, PreGameServerMessages, ProtocolStates, PROTOCOL_VERSIONS};


const MATCH_CONFIG: MatchConfig = MatchConfig {
    score_limit:            15000,
    rally_timeout_millis:   1000,
    no_bounce_probability:  0.001,
    no_rebate_probability:  0.002,
    mishit_probability:     0.003,
    pre_bounce_probability: 0.004,
    net_touch_probability:  0.005,
    net_block_probability:  0.006,
    ball_out_probability:   0.007,
};

pub struct ClientProtocolProcessor {
    start_instant:     Instant,
    in_messages_count: AtomicU64,
    out_messages_count: AtomicU64,
}

impl ClientProtocolProcessor {

    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            start_instant:     Instant::now(),
            in_messages_count: AtomicU64::new(0),
            out_messages_count: AtomicU64::new(0),
        })
    }

    pub fn pre_game_connection_events_handler<const NETWORK_CONFIG: u64,
                                              SenderChannel:        FullDuplexUniChannel<ItemType=PreGameClientMessages, DerivedItemType=PreGameClientMessages> + Send + Sync>
                                             (self: &Arc<Self>,
                                              connection_event: ProtocolEvent<NETWORK_CONFIG, PreGameClientMessages, SenderChannel, ProtocolStates>) {
        if let ProtocolEvent::PeerArrived { peer } = connection_event {
            debug!("Connected: {:?}", peer);
        }
    }

    pub fn pre_game_dialog_processor<const NETWORK_CONFIG: u64,
                                     SenderChannel:        FullDuplexUniChannel<ItemType=PreGameClientMessages, DerivedItemType=PreGameClientMessages> + Send + Sync,
                                     StreamItemType:       AsRef<PreGameServerMessages> + Debug>

                                    (self:                   &Arc<Self>,
                                     _server_addr:           String,
                                     _port:                  u16,
                                     peer:                   Arc<Peer<NETWORK_CONFIG, PreGameClientMessages, SenderChannel, ProtocolStates>>,
                                     server_messages_stream: impl Stream<Item=StreamItemType>)

                                    -> impl Stream<Item=bool> {
                            
        let cloned_self1 = Arc::clone(self);
        let cloned_self2 = Arc::clone(self);
        let peer_ref = peer.clone();
        peer.send(PreGameClientMessages::Version(PROTOCOL_VERSION))
            .expect("Couldn't send the pre-game VERSION message");
        server_messages_stream
            .inspect(move |_| {cloned_self1.in_messages_count.fetch_add(1, Relaxed);})  // increment the incoming messages metrics
            .then(move |server_message| {
                let peer = peer.clone();
                async move {
                    match server_message.as_ref() {
                        PreGameServerMessages::Version(server_protocol_version) => {
                            if server_protocol_version == &PROTOCOL_VERSION {
                                // Upgrade to the next protocol, sending the trigger message to do the same at the server
                                peer.set_state(ProtocolStates::Game).await;
                                PreGameClientMessages::Config(MATCH_CONFIG)
                            } else {
                                warn!("Aborting Connection: Client protocol version is {:?} while server is {:?}",
                                      PROTOCOL_VERSIONS.get_key_value(&PROTOCOL_VERSION),
                                      PROTOCOL_VERSIONS.get_key_value(server_protocol_version));
                                peer.set_state(ProtocolStates::Disconnect).await;
                                PreGameClientMessages::Error(PreGameClientError::IncompatibleProtocols)
                            }
                        },
                        PreGameServerMessages::Error(err) => {
                            warn!("Server (pre game) answered with error {err:?} -- closing the connection");
                            peer.set_state(ProtocolStates::Disconnect).await;
                            PreGameClientMessages::Error(PreGameClientError::TextualProtocolProcessorParsingError)
                        },
                    }
                }
            })
            .to_responsive_stream(peer_ref, |client_message, _peer| matches!(client_message, PreGameClientMessages::Error(..)) )
            .inspect(move |_| { cloned_self2.out_messages_count.fetch_add(1, Relaxed); })        // increment the outgoing messages metrics
            .take(1)
    }

    pub async fn game_connection_events_handler<const NETWORK_CONFIG: u64,
                                                SenderChannel:        FullDuplexUniChannel<ItemType=GameClientMessages, DerivedItemType=GameClientMessages> + Send + Sync>
                                               (self: &Arc<Self>,
                                                connection_event: ProtocolEvent<NETWORK_CONFIG, GameClientMessages, SenderChannel, ProtocolStates>) {
        match connection_event {
            ProtocolEvent::PeerArrived { peer: _ } => {},
            ProtocolEvent::PeerLeft { peer, stream_stats } => {
                let in_messages_count = self.in_messages_count.load(Relaxed);
                let out_messages_count = self.out_messages_count.load(Relaxed);
                info!("CLIENT Disconnected: {:?}; stats: {:?} -- with {in_messages_count}+{out_messages_count} messages IN & OUT: {:.2}/s",
                      peer,
                      stream_stats,
                      (in_messages_count + out_messages_count) as f64 / self.start_instant.elapsed().as_secs_f64());
                peer.set_state(ProtocolStates::Disconnect).await;     // the next state -- after this dialog ends -- is "disconnect".
            }
            ProtocolEvent::LocalServiceTermination => {
                info!("Ping-Pong client shutdown requested. Notifying the server...");
            }
        }
    }

    pub fn game_dialog_processor<const NETWORK_CONFIG: u64,
                                 SenderChannel:        FullDuplexUniChannel<ItemType=GameClientMessages, DerivedItemType=GameClientMessages> + Send + Sync,
                                 StreamItemType:       AsRef<GameServerMessages> + Debug>

                                (self:                   &Arc<Self>,
                                 server_addr:            String,
                                 port:                   u16,
                                 peer:                   Arc<Peer<NETWORK_CONFIG, GameClientMessages, SenderChannel, ProtocolStates>>,
                                 server_messages_stream: impl Stream<Item=StreamItemType>)

                                -> impl Stream<Item=bool> {

        let cloned_self1 = Arc::clone(self);
        let cloned_self2 = Arc::clone(self);
        let peer_ref = Arc::clone(&peer);
        let mut umpire = Umpire::new(&MATCH_CONFIG, Players::Ourself);
        server_messages_stream.map(move |server_message| {
            cloned_self1.in_messages_count.fetch_add(1, Relaxed);
            match server_message.as_ref() {

                GameServerMessages::GameStarted => {
                    // Start the game: service the ball
                    let our_action = act();
                    let our_event = umpire.process_turn(Players::Ourself, &our_action);
                    vec![GameClientMessages::PingPongEvent(our_event)]
                },

                GameServerMessages::MatchConfig(match_config) => {
                    info!("Server told us it is using {:?}", match_config);
                    vec![/* no answer */]
                }

                GameServerMessages::PingPongEvent(reported_ping_pong_event) => {
                    match reported_ping_pong_event {
                        PingPongEvent::TurnFlip { player_action: opponent_action, resulting_event } => {
                            match resulting_event {
                                TurnFlipEvents::SuccessfulService => vec![
                                    GameClientMessages::PingPongEvent(react_to_rally_event(&mut umpire,
                                                                                           "WaitingForService",
                                                                                           |rs| matches!(rs, GameStates::WaitingForService { attempt: _ }),
                                                                                           opponent_action,
                                                                                           /*reported_ping_pong_event*/PingPongEvent::TurnFlip { player_action: *opponent_action, resulting_event: TurnFlipEvents::SuccessfulService }))
                                ],
                                TurnFlipEvents::SoftFaultService => vec![
                                    GameClientMessages::PingPongEvent(react_to_rally_event(&mut umpire,
                                                                                           "WaitingForService` or `Rally",
                                                                                           |rs| matches!(rs, GameStates::WaitingForService { attempt: _ }),
                                                                                           opponent_action,
                                                                                           /*reported_ping_pong_event*/ PingPongEvent::TurnFlip { player_action: *opponent_action, resulting_event: TurnFlipEvents::SoftFaultService }))
                                ],
                                TurnFlipEvents::SuccessfulRebate => vec![
                                    GameClientMessages::PingPongEvent(react_to_rally_event(&mut umpire,
                                                                                           "Rally",
                                                                                           |rs| matches!(rs, GameStates::Rally),
                                                                                           opponent_action,
                                                                                           /*reported_ping_pong_event*/ PingPongEvent::TurnFlip { player_action: *opponent_action, resulting_event: TurnFlipEvents::SuccessfulRebate }))
                                ],
                            }
                        },
                        PingPongEvent::HardFault { player_action: opponent_action, resulting_fault_event } => {
                            react_to_hard_fault(&mut umpire, opponent_action, resulting_fault_event).into_iter()
                                .map(GameClientMessages::PingPongEvent)
                                .collect()
                        },
                        PingPongEvent::SoftFault { player_action: opponent_action, resulting_fault_event } => {
                            react_to_service_soft_fault(&mut umpire, opponent_action, resulting_fault_event).into_iter()
                                .map(GameClientMessages::PingPongEvent)
                                .collect()
                        },
                        PingPongEvent::Score { point_winning_player, last_player_action, last_fault } => {
                            if *point_winning_player != Players::Opponent {
                                error!("TO-BE-REMOVED Unrepresentable state IN THE CLIENT: It is not up to any client ({:?}) to tell the server that a score was made", *peer);
                                vec![GameClientMessages::Quit]
                            } else {
                                // our score: opponent's hard fault
                                react_to_score(&mut umpire, last_player_action, last_fault).into_iter()
                                    .map(GameClientMessages::PingPongEvent)
                                    .collect()
                            }
                        },
                        PingPongEvent::GameOver(game_over_state) => {
                            match game_over_state {
                                GameOverStates::GracefullyEnded { final_score, last_player_action: _, last_fault: _ } => {
                                    info!("Game ended: {} Client; {} Server @ {}:{}", final_score.opponent, final_score.oneself, server_addr, port);
                                    vec![GameClientMessages::EndorsedScore]
                                }
                                GameOverStates::GameCancelled { partial_score: _, broken_rule: _ } => {
                                    vec![/* no answer */]
                                }
                            }
                        },
                    }
                },

                GameServerMessages::Error(err) => {
                    error!("Server answered with error {err:?}");
                    vec![GameClientMessages::Quit]
                },

                GameServerMessages::GoodBye | GameServerMessages::ServerShutdown => {
                    peer.cancel_and_close();
                    vec![/* no answer */]
                },
            }
        })
        .flat_map(stream::iter)
        .inspect(move |_| { cloned_self2.out_messages_count.fetch_add(1, Relaxed); })       // update the outgoing messages metrics
        .to_responsive_stream(peer_ref, |client_message, _peer| matches!(client_message, GameClientMessages::Quit | GameClientMessages::Error(..)))
        .take_while(|stop| future::ready(!stop))
    }
}