use super::{
types::*,
protocol_model::*,
socket_server::{SocketServer},
connection::{SocketEvent, Peer, PeerId},
};
use crate::{
Runtime,
logic::ogre_robot::{
types::*,
events::dispatcher::Dispatcher,
},
};
use std::{
sync::Arc,
collections::HashMap,
fmt::Debug,
ops::Deref,
};
use std::time::Duration;
use futures::{Stream, stream, StreamExt};
use minstant::Instant;
use tokio::sync::{RwLock, RwLockWriteGuard};
use log::{trace, warn};
/// Per-connection bookkeeping kept by `processor()` for every peer that is currently connected.
/// One instance is inserted when a `SocketEvent::Connected` arrives and it is removed again on
/// `SocketEvent::Disconnected` (or when the client tries to identify itself twice).
#[derive(Debug)]
struct ClientState {
/// What the client told us about itself -- starts as `Unset` and is filled in when a
/// `ClientMessages::ClientIdentification` is received
identification: TimeTrackedInfo<ClientIdentification>,
/// Handle to the remote peer -- used to send `ServerMessages::ShuttingDown` & close the channel on shutdown
peer: Arc<Peer<ServerMessages, DisconnectionReason>>,
/// Initialized to `None` and not read nor updated anywhere in the visible code.
/// NOTE(review): presumably the client-vs-server clock difference, in nanoseconds -- confirm before relying on it
estimated_clock_skew_nanos: Option<i32>,
}
fn processor(dispatcher: Arc<Dispatcher>,
stream: impl Stream<Item = SocketEvent<ClientMessages, ServerMessages, DisconnectionReason>>)
-> impl Stream<Item = Result<(Arc<Peer<ServerMessages, DisconnectionReason>>, ServerMessages),
(Arc<Peer<ServerMessages, DisconnectionReason>>, Box<dyn std::error::Error + Sync + Send>)>> {
let client_states: Arc<RwLock<HashMap<PeerId, ClientState>>> = Arc::new(RwLock::new(HashMap::new()));
stream
.map(move |socket_event: SocketEvent<ClientMessages, ServerMessages, DisconnectionReason>| {
let dispatcher = Arc::clone(&dispatcher);
let client_states = Arc::clone(&client_states);
async move {
match socket_event {
SocketEvent::Incoming { peer, message: client_message } => {
let lock = client_states.read().await;
let state = lock.get(&peer.peer_id)
.map_or_else(|| Err( ( Arc::clone(&peer),
Box::from(format!("SocketServer.Processor expects incoming messages only from known clients. Peer id {}, who is not in our `client_states` hashmap, popped up from address {:?}", peer.peer_id, peer.peer_address)) ) ),
|state| Ok(state))?;
let server_message = match client_message {
ClientMessages::ClientIdentification(client_identification) => {
drop(lock);
let mut state = RwLockWriteGuard::map(client_states.write().await, |states| states.get_mut(&peer.peer_id).unwrap() );
match state.identification.set(client_identification) {
Ok(client_identification) => {
if dispatcher.register_from_client_identification(client_identification).await {
ServerMessages::None
} else {
ServerMessages::Disconnected(DisconnectionReason::Reconnected { ip: "yours".to_string() })
}
},
Err(client_identification) => {
let message = format!("Attempt to authenticate twice! Protocolar authentication was {:?}; Out of protocol attempted one is {:?}", state.identification, client_identification);
let disconnection_reason = DisconnectionReason::ProtocolOffense { message };
dispatcher.unregister_from_client_identification(&*state.identification, disconnection_reason.clone()).await;
drop(state);
client_states.write().await
.remove(&peer.peer_id);
ServerMessages::Disconnected(disconnection_reason)
}
}
},
ClientMessages::UserAuthorization(_) => ServerMessages::None,
ClientMessages::KeepAliveRequest(n) => ServerMessages::KeepAliveAnswer(n+1),
ClientMessages::KeepAliveAnswer(_) => ServerMessages::None,
ClientMessages::MarketData(market_data) => {
dispatcher.market_data(&*state.identification, market_data.into()).await;
ServerMessages::None
},
ClientMessages::ExecutedOrder { .. } => ServerMessages::None,
ClientMessages::CancelledOrder { .. } => ServerMessages::None,
ClientMessages::PendingOrder { .. } => ServerMessages::None,
ClientMessages::ChartPoints { .. } => ServerMessages::None,
ClientMessages::GoodBye(_) => ServerMessages::Disconnected(DisconnectionReason::ClientInitiated),
ClientMessages::UnknownMessage(txt) => ServerMessages::Disconnected(DisconnectionReason::ProtocolOffense {message: format!("Unknown message received: '{}'. Bailing out...", txt)}),
};
Ok(Some((peer, server_message)))
},
SocketEvent::Connected { peer } => {
client_states.write().await
.insert(peer.peer_id,
ClientState { identification: TimeTrackedInfo::Unset,
peer: Arc::clone(&peer),
estimated_clock_skew_nanos: None });
Ok(Some((peer, ServerMessages::Welcome)))
},
SocketEvent::Disconnected { peer } => {
let state = client_states.write().await
.remove(&peer.peer_id).expect("disconnected one was not present");
dispatcher.unregister_from_client_identification(&*state.identification, DisconnectionReason::ClientInitiated).await;
Ok(Some((peer, ServerMessages::None)))
},
SocketEvent::Shutdown { timeout_ms } => {
warn!("SocketServer processor: Sending goodbye message and disconnecting from {} clients", client_states.read().await.len());
for (_peer_id, client_info) in client_states.read().await.iter() {
client_info.peer.sender.send(ServerMessages::ShuttingDown).await;
client_info.peer.sender.close();
}
Ok(None)
},
}
}
})
.filter_map(|fallible_future| async {
match fallible_future.await {
Ok(optional_msg) => match optional_msg {
Some(msg) => Some(Ok(msg)),
None => None,
},
Err(err) => Some(Err(err)),
}
})
}
pub async fn sync_processors(runtime: &RwLock<Runtime>,
socket_server: &SocketServer<'static>) -> (impl Stream<Item = Result < (Arc<Peer<ServerMessages, DisconnectionReason>>, ServerMessages),
(Arc<Peer<ServerMessages, DisconnectionReason>>, Box<dyn std::error::Error + Sync + Send>) > >,
impl Fn(SocketEvent<ClientMessages, ServerMessages, DisconnectionReason>) -> bool,
impl Fn()) {
let tokio_runtime = Arc::clone(runtime.read().await.tokio_runtime.as_ref().unwrap());
let (stream, producer, closer) = super::executor::sync_tokio_stream(tokio_runtime);
let dispatcher = Runtime::do_for_ogre_robot(runtime, |ogre_robot| Box::pin(async {Arc::clone(&ogre_robot.dispatcher)})).await;
(processor(dispatcher, stream), producer, closer)
}
/// Hands `stream` over to `super::executor::spawn_stream_executor()` and returns the
/// `JoinHandle` it produces -- this is a plain delegate, no extra logic is added here.
pub async fn spawn_stream_executor(
    stream: impl Stream<Item = (Arc<Peer<ServerMessages, DisconnectionReason>>, bool)> + Send + Sync + 'static,
) -> tokio::task::JoinHandle<()> {
    super::executor::spawn_stream_executor(stream).await
}
/// A value that starts out absent and, once stored, is kept together with the
/// `Instant::now()` reading taken at the moment it was stored.
#[derive(Debug)]
enum TimeTrackedInfo<InfoType: Debug> {
    /// Nothing was stored yet
    Unset,
    /// `info` was stored at `time`
    Set { time: Instant, info: InfoType },
}

impl<InfoType: Debug> TimeTrackedInfo<InfoType> {

    /// Creates an empty (`Unset`) instance
    pub fn new() -> Self {
        TimeTrackedInfo::Unset
    }

    /// Stores `info` only if nothing was stored before.\
    /// Returns `Ok` with a reference to the stored value or, if a value was already present,
    /// gives `info` back in `Err`, leaving the current contents untouched.
    pub fn set(&mut self, info: InfoType) -> Result<&InfoType, InfoType> {
        if matches!(self, Self::Set { .. }) {
            return Err(info);
        }
        Ok(self.reset(info))
    }

    /// Unconditionally stores `info` (replacing whatever was there), stamping it with the current
    /// time, and returns a reference to the value just stored.
    pub fn reset(&mut self, info: InfoType) -> &InfoType {
        let time = Instant::now();
        *self = TimeTrackedInfo::Set { time, info };
        // the assignment above makes the `else` branch impossible to reach
        if let TimeTrackedInfo::Set { info: stored, .. } = &*self {
            stored
        } else {
            panic!("BUG! Attempt to Deref a `TimeTrackedInfo` that is still `Unset`. Please, fix your code.")
        }
    }
}

/// Gives direct access to the stored value.
///
/// # Panics
/// When dereferencing an instance that is still `Unset`.
impl<InfoType: Debug> Deref for TimeTrackedInfo<InfoType> {
    type Target = InfoType;

    fn deref(&self) -> &Self::Target {
        if let TimeTrackedInfo::Set { info, .. } = self {
            info
        } else {
            panic!("BUG! Attempt to Deref a `TimeTrackedInfo` that is still `Unset`. Please, fix your code.")
        }
    }
}