use crate::{
runtime::websocket::{WebSocket, WebSocketSender},
error::{PandaError, Result},
models::gateway::{commands::Command, events::Event, payload::Payload},
};
use std::{
convert::TryFrom,
result::Result as StdResult,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use futures::{
channel::mpsc::{UnboundedReceiver, UnboundedSender},
select,
sink::SinkExt,
stream::StreamExt,
};
use async_tungstenite::tungstenite::{Error as TungsteniteError, Message as TungsteniteMessage};
/// Value produced by polling the gateway websocket stream with `next()`:
/// `None` when the stream yields no further items, otherwise either a
/// tungstenite message or the tungstenite error raised while reading it.
type TungsteniteOptionResult = Option<StdResult<TungsteniteMessage, TungsteniteError>>;
/// Pumps traffic between the gateway websocket and the client channels.
///
/// The websocket is split into a sender and a (fused) receiver, then the loop
/// waits on whichever of these is ready first:
///
/// - an item from the websocket, handled by `from_gateway_process`, which
///   decodes it and forwards the resulting event through `to_client`;
/// - a command from `from_client`, handled by `to_gateway_process`, which
///   writes it to the websocket.
///
/// The loop stops when:
///
/// - receiving fails with `PandaError::AuthenticationFailed` or
///   `PandaError::ConnectionClosed`; that error is forwarded to the client
///   wrapped in `Event::Close`. Any other receive error is logged and the
///   loop keeps running.
/// - sending a command fails for any reason; the client is notified with
///   `Event::Close(PandaError::ConnectionClosed)`.
///
/// Delivering the final `Event::Close` is best effort: if that send fails the
/// failure is logged and the function still returns normally instead of
/// panicking.
pub(crate) async fn gateway_process(
    ws: WebSocket,
    mut to_client: UnboundedSender<Event>,
    mut from_client: UnboundedReceiver<Command>,
    last_sequence: Arc<AtomicU64>,
) {
    let (mut ws_sender, ws_receiver) = ws.split();
    // `select!` needs a stream that can report termination, hence `fuse()`.
    let mut from_gateway = ws_receiver.fuse();
    loop {
        select! {
            tm = from_gateway.next() => {
                let last_sequence = Arc::clone(&last_sequence);
                if let Err(e) = from_gateway_process(tm, &mut to_client, last_sequence).await {
                    log::error!("Error when receiving an event: {}", e);
                    match e {
                        PandaError::AuthenticationFailed | PandaError::ConnectionClosed => {
                            // `ConnectionClosed` is also what `from_gateway_process`
                            // reports when its own send on `to_client` fails, so this
                            // send can fail too; panicking here would abort the task.
                            if let Err(send_err) = to_client.send(Event::Close(e)).await {
                                log::error!(
                                    "Unable to deliver close event to client: {}",
                                    send_err
                                );
                            }
                            break;
                        }
                        // Any other error is not treated as fatal: keep pumping.
                        _ => {}
                    }
                }
            },
            cmd = from_client.next() => {
                let last_sequence = Arc::clone(&last_sequence);
                if let Err(e) = to_gateway_process(cmd, &mut ws_sender, last_sequence).await {
                    log::error!("Error when sending command to gateway: {}", e);
                    if let Err(send_err) = to_client
                        .send(Event::Close(PandaError::ConnectionClosed))
                        .await
                    {
                        log::error!(
                            "Unable to deliver close event to client: {}",
                            send_err
                        );
                    }
                    break;
                }
            }
        }
    }
}
/// Handles one item read from the gateway websocket.
///
/// Steps, in order:
/// 1. `None` (no item from the stream) is reported as
///    `PandaError::ConnectionClosed`; a tungstenite error is propagated with `?`.
/// 2. The message is converted into a `Payload`; when the payload carries a
///    sequence number (`s`), it is stored into `last_sequence`.
/// 3. The payload is converted into an `Event` and sent through `to_client`.
///
/// # Errors
///
/// Returns `PandaError::ConnectionClosed` when `tm` is `None` or when the send
/// on `to_client` fails, and propagates any error produced by reading or
/// converting the message.
async fn from_gateway_process(
    tm: TungsteniteOptionResult,
    to_client: &mut UnboundedSender<Event>,
    last_sequence: Arc<AtomicU64>,
) -> Result<()> {
    let message = match tm {
        Some(received) => received?,
        None => return Err(PandaError::ConnectionClosed),
    };

    let payload = Payload::try_from(message)?;
    if let Some(sequence) = payload.s {
        // Remember the most recent sequence number seen on the wire.
        last_sequence.store(sequence, Ordering::Relaxed);
    }

    let event = Event::try_from(payload)?;
    match to_client.send(event).await {
        Ok(()) => Ok(()),
        Err(_) => Err(PandaError::ConnectionClosed),
    }
}
/// Writes one client command to the gateway websocket.
///
/// - `None` (no command from the channel) is reported as
///   `PandaError::ConnectionClosed`.
/// - `Command::Close` is sent as a websocket close frame without a close
///   payload; `last_sequence` is not read in that case.
/// - Any other command is converted with `to_tungstenite_message`, passing the
///   value loaded from `last_sequence`, or `None` while that value is still `0`.
///
/// # Errors
///
/// Every failure to send on the websocket is reported as
/// `PandaError::ConnectionClosed`.
async fn to_gateway_process(
    command: Option<Command>,
    to_gateway: &mut WebSocketSender,
    last_sequence: Arc<AtomicU64>,
) -> Result<()> {
    let command = match command {
        Some(cmd) => cmd,
        None => return Err(PandaError::ConnectionClosed),
    };

    let outgoing = if command == Command::Close {
        TungsteniteMessage::Close(None)
    } else {
        // A stored value of 0 is passed on as "no sequence".
        let current = last_sequence.load(Ordering::Relaxed);
        let seq = if current == 0 { None } else { Some(current) };
        command.to_tungstenite_message(seq)
    };

    to_gateway
        .send(outgoing)
        .await
        .map_err(|_| PandaError::ConnectionClosed)
}