use std::{str::FromStr, sync::Arc, time::Duration};
use tokio::{
net::TcpStream,
sync::{
Mutex,
broadcast::{Receiver, Sender, channel},
mpsc,
},
time::timeout,
};
use tokio_tungstenite::{
MaybeTlsStream, WebSocketStream, connect_async_with_config,
tungstenite::{ClientRequestBuilder, http::Uri},
};
use crate::{ClientConfig, NotisResponse, PaleError, RPCRequest, RPCResponse, Result};
/// Handle to a websocket client speaking JSON-RPC.
///
/// Every field is reference-counted (directly, or through [`Channels`]), so
/// clones of a `Client` share the same channels, connection flag, URI and
/// configuration.
#[derive(Debug, Clone)]
pub struct Client {
/// Internal channels shared by all clones of this client.
pub(crate) channels: Channels,
/// Connection flag reported by `is_connected`. Initialised to `false` by
/// `new_without_connection`; the code that updates it is not in this part
/// of the file.
pub(crate) client_connected: Arc<Mutex<bool>>,
/// Server URI string supplied at construction time.
pub uri: Arc<String>,
/// Client configuration supplied at construction time.
pub config: Arc<ClientConfig>,
}
/// Bundle of channel endpoints used internally by [`Client`].
///
/// All endpoints are wrapped in `Arc`, so cloning a `Channels` yields handles
/// to the same underlying channels.
#[derive(Debug, Clone)]
pub(crate) struct Channels {
/// Broadcast sender for [`RPCResponse`] values.
/// NOTE(review): presumably fed with responses read from the server — the
/// producer is not visible here; confirm.
pub response: Arc<Sender<RPCResponse>>,
/// Broadcast sender for [`NotisResponse`] values. `Client::close_all_subscriptions`
/// also uses it to broadcast a message whose `_close` field is `Some(true)`.
pub notis: Arc<Sender<NotisResponse>>,
/// Broadcast sender for [`RPCRequest`] values.
/// NOTE(review): presumably outgoing requests — the consumer is not visible
/// here; confirm.
pub request: Arc<Sender<RPCRequest>>,
/// Broadcast sender behind `Client::on_reconnect`; receivers are created with
/// `subscribe()`. The code that sends on it is not visible here.
pub on_reconnect: Arc<Sender<Client>>,
/// Broadcast sender behind `Client::on_disconnect`; receivers are created with
/// `subscribe()`. The code that sends on it is not visible here.
pub on_disconnect: Arc<Sender<Client>>,
/// Bounded mpsc pair (created with capacity 1) used as a close signal.
/// `.0` is the sender used by `Client::close`; `.1` is the single receiver,
/// kept behind a `Mutex` so that it can be shared between clones.
pub close: (Arc<mpsc::Sender<()>>, Arc<Mutex<mpsc::Receiver<()>>>),
}
impl Channels {
pub(crate) fn new(capacity: usize) -> Self {
let close = mpsc::channel(1);
Self {
response: Arc::new(channel::<RPCResponse>(capacity).0),
notis: Arc::new(channel::<NotisResponse>(capacity).0),
on_reconnect: Arc::new(channel::<Client>(capacity).0),
on_disconnect: Arc::new(channel::<Client>(capacity).0),
request: Arc::new(channel::<RPCRequest>(capacity).0),
close: (Arc::new(close.0), Arc::new(Mutex::new(close.1))),
}
}
}
impl Client {
pub(crate) const JSONRPC: &'static str = "2.0";
pub async fn new(uri: impl AsRef<str>, config: ClientConfig) -> Result<Self> {
let client = Self::new_without_connection(uri, config)?;
client.connect().await?;
Ok(client)
}
pub fn new_without_connection(uri: impl AsRef<str>, config: ClientConfig) -> Result<Self> {
let client = Self {
channels: Channels::new(config.channel_capacity),
client_connected: Arc::new(Mutex::new(false)),
uri: Arc::new(uri.as_ref().to_string()),
config: Arc::new(config),
};
Ok(client)
}
pub(crate) async fn connect_websocket(
uri: Arc<String>,
config: &ClientConfig,
) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
let mut req = ClientRequestBuilder::new(Uri::from_str(uri.as_ref())?);
if let Some(bearer) = &config.bearer_token {
req = req.with_header("Authorization", format!("Bearer {bearer}"));
tracing::debug!("Added 'Authorization' to websocket connection");
}
let (ws, _) = connect_async_with_config(req, Some(config.ws_config), false).await?;
tracing::debug!(
"Connected to server '{}' ({:?})",
uri.as_ref(),
config.ws_config
);
Ok(ws)
}
pub(crate) async fn close_all_subscriptions(&self) -> Result<()> {
self.channels.notis.send(NotisResponse {
jsonrpc: Client::JSONRPC.to_string(),
method: String::default(),
params: None,
_close: Some(true),
})?;
tracing::debug!("Closed all subscriptions");
Ok(())
}
pub async fn close(&self) -> Result<()> {
Ok(self.channels.close.0.send(()).await?)
}
pub async fn is_connected(&self) -> bool {
*self.client_connected.lock().await
}
pub async fn wait_for_connection(&self, state: bool, timeout_duration: Duration) -> Result<()> {
if self.is_connected().await == state {
return Ok(());
}
let _client = self.clone();
let client_check = async {
loop {
if _client.is_connected().await == state {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
};
timeout(timeout_duration, client_check).await?;
Ok(())
}
pub async fn connect(&self) -> Result<()> {
if self.is_connected().await {
return Err(PaleError::ClientAlreadyConnected);
}
let _client = self.clone();
tokio::spawn(async move { _client.run().await });
self.wait_for_connection(true, self.config.retry_connection)
.await?;
Ok(())
}
pub fn on_reconnect(&self) -> Receiver<Client> {
self.channels.on_reconnect.subscribe()
}
pub fn on_disconnect(&self) -> Receiver<Client> {
self.channels.on_disconnect.subscribe()
}
}