use std::{
sync::{Arc, RwLock},
time::Duration,
};
#[cfg(not(target_family = "wasm"))]
use tokio::runtime::Runtime;
#[cfg(feature = "mqtt")]
use {
crate::client::node_api::mqtt::{BrokerOptions, MqttEvent, TopicHandlerMap},
rumqttc::AsyncClient as MqttClient,
tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender},
};
#[cfg(target_family = "wasm")]
use crate::client::constants::CACHE_NETWORK_INFO_TIMEOUT_IN_SECONDS;
use crate::{
client::{
builder::{ClientBuilder, NetworkInfo},
constants::DEFAULT_TIPS_INTERVAL,
error::Result,
Error,
},
types::block::{output::RentStructure, protocol::ProtocolParameters},
};
#[derive(Clone)]
pub struct Client {
#[allow(dead_code)]
#[cfg(not(target_family = "wasm"))]
pub(crate) runtime: Option<Arc<Runtime>>,
pub(crate) node_manager: crate::client::node_manager::NodeManager,
#[cfg(not(target_family = "wasm"))]
pub(crate) sync_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
#[cfg(feature = "mqtt")]
pub(crate) mqtt_client: Arc<tokio::sync::RwLock<Option<MqttClient>>>,
#[cfg(feature = "mqtt")]
pub(crate) mqtt_topic_handlers: Arc<tokio::sync::RwLock<TopicHandlerMap>>,
#[cfg(feature = "mqtt")]
pub(crate) broker_options: BrokerOptions,
#[cfg(feature = "mqtt")]
pub(crate) mqtt_event_channel: (Arc<WatchSender<MqttEvent>>, WatchReceiver<MqttEvent>),
pub(crate) network_info: Arc<RwLock<NetworkInfo>>,
pub(crate) api_timeout: Duration,
pub(crate) remote_pow_timeout: Duration,
#[allow(dead_code)] pub(crate) pow_worker_count: Option<usize>,
}
impl std::fmt::Debug for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut d = f.debug_struct("Client");
d.field("node_manager", &self.node_manager);
#[cfg(feature = "mqtt")]
d.field("broker_options", &self.broker_options);
d.field("network_info", &self.network_info).finish()
}
}
impl Drop for Client {
fn drop(&mut self) {
#[cfg(not(target_family = "wasm"))]
if let Some(sync_handle) = self.sync_handle.take() {
if let Ok(sync_handle) = Arc::try_unwrap(sync_handle) {
sync_handle.abort();
}
}
#[cfg(not(target_family = "wasm"))]
if let Some(runtime) = self.runtime.take() {
if let Ok(runtime) = Arc::try_unwrap(runtime) {
runtime.shutdown_background();
}
}
#[cfg(feature = "mqtt")]
let mqtt_client = self.mqtt_client.clone();
#[cfg(feature = "mqtt")]
std::thread::spawn(move || {
crate::client::async_runtime::block_on(async move {
if let Some(mqtt_client) = mqtt_client.write().await.take() {
mqtt_client.disconnect().await.unwrap();
}
});
})
.join()
.unwrap();
}
}
impl Client {
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}
pub async fn get_network_info(&self) -> Result<NetworkInfo> {
#[cfg(target_family = "wasm")]
{
lazy_static::lazy_static! {
static ref LAST_SYNC: std::sync::Mutex<Option<u32>> = std::sync::Mutex::new(None);
};
let current_time = crate::utils::unix_timestamp_now().as_secs() as u32;
if let Some(last_sync) = *LAST_SYNC.lock().unwrap() {
if current_time < last_sync {
return Ok(self
.network_info
.read()
.map_err(|_| crate::client::Error::PoisonError)?
.clone());
}
}
let info = self.get_info().await?.node_info;
let mut client_network_info = self
.network_info
.write()
.map_err(|_| crate::client::Error::PoisonError)?;
client_network_info.protocol_parameters = info.protocol.try_into()?;
*LAST_SYNC.lock().unwrap() = Some(current_time + CACHE_NETWORK_INFO_TIMEOUT_IN_SECONDS);
}
Ok(self
.network_info
.read()
.map_err(|_| crate::client::Error::PoisonError)?
.clone())
}
pub async fn get_protocol_parameters(&self) -> Result<ProtocolParameters> {
Ok(self.get_network_info().await?.protocol_parameters)
}
pub async fn get_protocol_version(&self) -> Result<u8> {
Ok(self.get_network_info().await?.protocol_parameters.protocol_version())
}
pub async fn get_network_name(&self) -> Result<String> {
Ok(self.get_network_info().await?.protocol_parameters.network_name().into())
}
pub async fn get_network_id(&self) -> Result<u64> {
Ok(self.get_network_info().await?.protocol_parameters.network_id())
}
pub async fn get_bech32_hrp(&self) -> Result<String> {
Ok(self.get_network_info().await?.protocol_parameters.bech32_hrp().into())
}
pub async fn get_min_pow_score(&self) -> Result<u32> {
Ok(self.get_network_info().await?.protocol_parameters.min_pow_score())
}
pub async fn get_below_max_depth(&self) -> Result<u8> {
Ok(self.get_network_info().await?.protocol_parameters.below_max_depth())
}
pub async fn get_rent_structure(&self) -> Result<RentStructure> {
Ok(*self.get_network_info().await?.protocol_parameters.rent_structure())
}
pub async fn get_token_supply(&self) -> Result<u64> {
Ok(self.get_network_info().await?.protocol_parameters.token_supply())
}
pub fn get_tips_interval(&self) -> u64 {
self.network_info
.read()
.map_or(DEFAULT_TIPS_INTERVAL, |info| info.tips_interval)
}
pub fn get_local_pow(&self) -> bool {
self.network_info
.read()
.map_or(NetworkInfo::default().local_pow, |info| info.local_pow)
}
pub(crate) fn get_timeout(&self) -> Duration {
self.api_timeout
}
pub(crate) fn get_remote_pow_timeout(&self) -> Duration {
self.remote_pow_timeout
}
pub fn get_fallback_to_local_pow(&self) -> bool {
self.network_info
.read()
.map_or(NetworkInfo::default().fallback_to_local_pow, |info| {
info.fallback_to_local_pow
})
}
pub async fn bech32_hrp_matches(&self, bech32_hrp: &str) -> Result<()> {
let expected = self.get_bech32_hrp().await?;
if bech32_hrp != expected {
return Err(Error::InvalidBech32Hrp {
provided: bech32_hrp.to_string(),
expected,
});
};
Ok(())
}
}