use std::{sync::Arc, time::Duration};
use tokio::sync::RwLock;
#[cfg(feature = "mqtt")]
use {
crate::client::node_api::mqtt::{BrokerOptions, MqttEvent, TopicHandlerMap},
rumqttc::AsyncClient as MqttClient,
tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender},
};
#[cfg(not(target_family = "wasm"))]
use super::request_pool::RequestPool;
#[cfg(target_family = "wasm")]
use crate::client::constants::CACHE_NETWORK_INFO_TIMEOUT_IN_SECONDS;
use crate::{
client::{
builder::{ClientBuilder, NetworkInfo},
error::Result,
node_manager::NodeManager,
Error,
},
types::block::{address::Hrp, output::RentStructure, protocol::ProtocolParameters},
};
#[derive(Clone)]
pub struct Client {
pub(crate) inner: Arc<ClientInner>,
#[cfg(not(target_family = "wasm"))]
pub(crate) _sync_handle: Arc<RwLock<SyncHandle>>,
}
impl core::ops::Deref for Client {
type Target = ClientInner;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
pub struct ClientInner {
pub(crate) node_manager: RwLock<NodeManager>,
pub(crate) network_info: RwLock<NetworkInfo>,
pub(crate) api_timeout: RwLock<Duration>,
pub(crate) remote_pow_timeout: RwLock<Duration>,
#[cfg(not(target_family = "wasm"))]
pub(crate) pow_worker_count: RwLock<Option<usize>>,
#[cfg(feature = "mqtt")]
pub(crate) mqtt: MqttInner,
#[cfg(target_family = "wasm")]
pub(crate) last_sync: tokio::sync::Mutex<Option<u32>>,
#[cfg(not(target_family = "wasm"))]
pub(crate) request_pool: RequestPool,
}
#[cfg(not(target_family = "wasm"))]
#[derive(Default)]
pub(crate) struct SyncHandle(pub(crate) Option<tokio::task::JoinHandle<()>>);
#[cfg(not(target_family = "wasm"))]
impl Drop for SyncHandle {
fn drop(&mut self) {
if let Some(sync_handle) = self.0.take() {
sync_handle.abort();
}
}
}
#[cfg(feature = "mqtt")]
pub(crate) struct MqttInner {
pub(crate) client: RwLock<Option<MqttClient>>,
pub(crate) topic_handlers: RwLock<TopicHandlerMap>,
pub(crate) broker_options: RwLock<BrokerOptions>,
pub(crate) sender: RwLock<WatchSender<MqttEvent>>,
pub(crate) receiver: RwLock<WatchReceiver<MqttEvent>>,
}
impl std::fmt::Debug for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut d = f.debug_struct("Client");
d.field("node_manager", &self.node_manager);
#[cfg(feature = "mqtt")]
d.field("broker_options", &self.mqtt.broker_options);
d.field("network_info", &self.network_info);
#[cfg(not(target_family = "wasm"))]
d.field("request_pool", &self.request_pool);
d.finish()
}
}
impl Client {
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}
}
impl ClientInner {
pub async fn get_network_info(&self) -> Result<NetworkInfo> {
#[cfg(target_family = "wasm")]
{
let current_time = crate::utils::unix_timestamp_now().as_secs() as u32;
if let Some(last_sync) = *self.last_sync.lock().await {
if current_time < last_sync {
return Ok(self.network_info.read().await.clone());
}
}
let info = self.get_info().await?.node_info;
let mut client_network_info = self.network_info.write().await;
client_network_info.protocol_parameters = info.protocol.clone();
*self.last_sync.lock().await = Some(current_time + CACHE_NETWORK_INFO_TIMEOUT_IN_SECONDS);
}
Ok(self.network_info.read().await.clone())
}
pub async fn get_protocol_parameters(&self) -> Result<ProtocolParameters> {
Ok(self.get_network_info().await?.protocol_parameters)
}
pub async fn get_protocol_version(&self) -> Result<u8> {
Ok(self.get_network_info().await?.protocol_parameters.protocol_version())
}
pub async fn get_network_name(&self) -> Result<String> {
Ok(self.get_network_info().await?.protocol_parameters.network_name().into())
}
pub async fn get_network_id(&self) -> Result<u64> {
Ok(self.get_network_info().await?.protocol_parameters.network_id())
}
pub async fn get_bech32_hrp(&self) -> Result<Hrp> {
Ok(*self.get_network_info().await?.protocol_parameters.bech32_hrp())
}
pub async fn get_min_pow_score(&self) -> Result<u32> {
Ok(self.get_network_info().await?.protocol_parameters.min_pow_score())
}
pub async fn get_below_max_depth(&self) -> Result<u8> {
Ok(self.get_network_info().await?.protocol_parameters.below_max_depth())
}
pub async fn get_rent_structure(&self) -> Result<RentStructure> {
Ok(*self.get_network_info().await?.protocol_parameters.rent_structure())
}
pub async fn get_token_supply(&self) -> Result<u64> {
Ok(self.get_network_info().await?.protocol_parameters.token_supply())
}
pub async fn get_tips_interval(&self) -> u64 {
self.network_info.read().await.tips_interval
}
pub async fn get_local_pow(&self) -> bool {
self.network_info.read().await.local_pow
}
pub(crate) async fn get_timeout(&self) -> Duration {
*self.api_timeout.read().await
}
pub(crate) async fn get_remote_pow_timeout(&self) -> Duration {
*self.remote_pow_timeout.read().await
}
pub async fn get_fallback_to_local_pow(&self) -> bool {
self.network_info.read().await.fallback_to_local_pow
}
pub async fn bech32_hrp_matches(&self, bech32_hrp: &Hrp) -> Result<()> {
let expected = self.get_bech32_hrp().await?;
if bech32_hrp != &expected {
return Err(Error::Bech32HrpMismatch {
provided: bech32_hrp.to_string(),
expected: expected.to_string(),
});
};
Ok(())
}
#[cfg(not(target_family = "wasm"))]
pub async fn resize_request_pool(&self, new_size: usize) {
self.request_pool.resize(new_size).await;
}
}