1use crate::{
6 api::*,
7 builder::{ClientBuilder, NetworkInfo, GET_API_TIMEOUT},
8 error::*,
9 node::*,
10 node_manager::Node,
11};
12use bee_common::packable::Packable;
13use bee_message::{
14 constants::INPUT_OUTPUT_COUNT_MAX,
15 payload::Payload,
16 prelude::{
17 Address, Ed25519Address, Message, MessageBuilder, MessageId, Parents, TransactionId, UtxoInput,
18 ED25519_ADDRESS_LENGTH,
19 },
20};
21use bee_pow::providers::{
22 miner::{MinerBuilder, MinerCancel},
23 NonceProvider, NonceProviderBuilder,
24};
25use bee_rest_api::types::{
26 body::SuccessBody,
27 dtos::{LedgerInclusionStateDto, MessageDto, PeerDto, ReceiptDto},
28 responses::{
29 BalanceAddressResponse, InfoResponse as NodeInfo, MessageResponse, MilestoneResponse as MilestoneResponseDto,
30 OutputResponse, PeersResponse, ReceiptsResponse, SubmitMessageResponse, TipsResponse, TreasuryResponse,
31 UtxoChangesResponse as MilestoneUTXOChanges,
32 },
33};
34use crypto::{
35 hashes::{blake2b::Blake2b256, Digest},
36 keys::{
37 bip39::{mnemonic_to_seed, wordlist},
38 slip10::Seed,
39 },
40 utils,
41};
42
43use zeroize::Zeroize;
44
45use crate::builder::TIPS_INTERVAL;
46#[cfg(feature = "wasm")]
47use gloo_timers::future::TimeoutFuture;
48#[cfg(feature = "mqtt")]
49use rumqttc::AsyncClient as MqttClient;
50#[cfg(feature = "mqtt")]
51use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender};
52#[cfg(not(feature = "wasm"))]
53use tokio::{
54 runtime::Runtime,
55 sync::broadcast::{Receiver, Sender},
56 time::{sleep, Duration as TokioDuration},
57};
58use url::Url;
59
60use std::{
61 collections::{HashMap, HashSet},
62 convert::{TryFrom, TryInto},
63 hash::Hash,
64 ops::Range,
65 str::FromStr,
66 sync::{Arc, RwLock},
67 time::Duration,
68};
69
/// Output count at which `find_outputs` treats an address response as possibly truncated: when an
/// address returns exactly this many outputs, an extra dust-allowance query is issued for it.
const RESPONSE_MAX_OUTPUTS: usize = 1000;
/// Margin that `find_inputs` requires on top of the requested amount whenever the selected inputs
/// do not add up to the amount exactly.
const DUST_THRESHOLD: u64 = 1_000_000;
72
/// Node info paired with a URL string; this is the type returned by `Client::get_info`.
#[derive(Debug, Serialize, Deserialize)]
pub struct NodeInfoWrapper {
    /// The info reported by the node.
    pub nodeinfo: NodeInfo,
    /// URL stored alongside the info — presumably the node that answered; confirm against the producer.
    pub url: String,
}
81
/// Milestone data as returned by `Client::get_milestone`, with the message id already decoded.
#[derive(Debug, Serialize, Clone, Copy)]
pub struct MilestoneResponse {
    /// Milestone index.
    pub index: u32,
    /// Id of the milestone message; serialized under the key `messageId`.
    #[serde(rename = "messageId")]
    pub message_id: MessageId,
    /// Milestone timestamp as reported by the node (unit not visible here — presumably seconds; confirm).
    pub timestamp: u64,
}
93
/// Callback invoked with a reference to a [`TopicEvent`]; must be `Send + Sync`.
#[cfg(feature = "mqtt")]
type TopicHandler = Box<dyn Fn(&TopicEvent) + Send + Sync>;
/// Registered handlers, grouped per topic. Each handler is reference-counted.
#[cfg(feature = "mqtt")]
pub(crate) type TopicHandlerMap = HashMap<Topic, Vec<Arc<TopicHandler>>>;
98
/// An event delivered to topic handlers (only built with the `mqtt` feature).
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, serde::Serialize)]
pub struct TopicEvent {
    /// Name of the topic the event belongs to.
    pub topic: String,
    /// Event payload as a string (encoding not visible here — presumably JSON; confirm).
    pub payload: String,
}
108
/// Connection state notifications published on the client's MQTT event channel.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MqttEvent {
    /// The MQTT client is connected.
    Connected,
    /// The MQTT client is disconnected.
    Disconnected,
}
118
/// Options for the MQTT broker connection (only built with the `mqtt` feature).
///
/// Every field has a serde default, so a partially specified configuration deserializes
/// with the remaining fields filled from the `default_*` functions below.
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, serde::Deserialize)]
pub struct BrokerOptions {
    /// Deserialized from `automaticDisconnect`; defaults to `true`.
    #[serde(default = "default_broker_automatic_disconnect", rename = "automaticDisconnect")]
    pub(crate) automatic_disconnect: bool,
    /// Defaults to 30 seconds.
    #[serde(default = "default_broker_timeout")]
    pub(crate) timeout: Duration,
    /// Deserialized from `useWs`; defaults to `true`.
    #[serde(default = "default_broker_use_ws", rename = "useWs")]
    pub(crate) use_ws: bool,
    /// Defaults to 1883.
    #[serde(default = "default_broker_port")]
    pub(crate) port: u16,
    /// Deserialized from `maxReconnectionAttempts`; defaults to 0.
    #[serde(default = "default_max_reconnection_attempts", rename = "maxReconnectionAttempts")]
    pub(crate) max_reconnection_attempts: usize,
}

/// Serde default for `BrokerOptions::automatic_disconnect`.
#[cfg(feature = "mqtt")]
fn default_broker_automatic_disconnect() -> bool {
    true
}

/// Serde default for `BrokerOptions::timeout`.
#[cfg(feature = "mqtt")]
fn default_broker_timeout() -> Duration {
    Duration::from_secs(30)
}

/// Serde default for `BrokerOptions::use_ws`.
#[cfg(feature = "mqtt")]
fn default_broker_use_ws() -> bool {
    true
}

/// Serde default for `BrokerOptions::port`.
#[cfg(feature = "mqtt")]
fn default_broker_port() -> u16 {
    1883
}

/// Serde default for `BrokerOptions::max_reconnection_attempts`.
#[cfg(feature = "mqtt")]
fn default_max_reconnection_attempts() -> usize {
    0
}

#[cfg(feature = "mqtt")]
impl Default for BrokerOptions {
    /// Builds the options from the same functions serde uses for missing fields.
    fn default() -> Self {
        let automatic_disconnect = default_broker_automatic_disconnect();
        let timeout = default_broker_timeout();
        let use_ws = default_broker_use_ws();
        let port = default_broker_port();
        let max_reconnection_attempts = default_max_reconnection_attempts();
        Self {
            automatic_disconnect,
            timeout,
            use_ws,
            port,
            max_reconnection_attempts,
        }
    }
}

#[cfg(feature = "mqtt")]
impl BrokerOptions {
    /// Creates the options with all defaults applied.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the options with `automatic_disconnect` replaced.
    pub fn automatic_disconnect(self, automatic_disconnect: bool) -> Self {
        Self {
            automatic_disconnect,
            ..self
        }
    }

    /// Returns the options with `timeout` replaced.
    pub fn timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }

    /// Returns the options with `use_ws` replaced.
    pub fn use_ws(self, use_ws: bool) -> Self {
        Self { use_ws, ..self }
    }

    /// Returns the options with `port` replaced.
    pub fn port(self, port: u16) -> Self {
        Self { port, ..self }
    }

    /// Returns the options with `max_reconnection_attempts` replaced.
    pub fn max_reconnection_attempts(self, max_reconnection_attempts: usize) -> Self {
        Self {
            max_reconnection_attempts,
            ..self
        }
    }
}
209
210#[derive(Default)]
212pub struct ClientMinerBuilder {
213 local_pow: bool,
214 cancel: MinerCancel,
215}
216
217impl ClientMinerBuilder {
218 pub fn with_local_pow(mut self, value: bool) -> Self {
220 self.local_pow = value;
221 self
222 }
223 pub fn with_cancel(mut self, cancel: MinerCancel) -> Self {
225 self.cancel = cancel;
226 self
227 }
228}
229
230impl NonceProviderBuilder for ClientMinerBuilder {
231 type Provider = ClientMiner;
232
233 fn new() -> Self {
234 Self::default()
235 }
236
237 fn finish(self) -> ClientMiner {
238 ClientMiner {
239 local_pow: self.local_pow,
240 cancel: self.cancel,
241 }
242 }
243}
244
245pub struct ClientMiner {
247 local_pow: bool,
248 cancel: MinerCancel,
249}
250
251impl NonceProvider for ClientMiner {
252 type Builder = ClientMinerBuilder;
253 type Error = crate::Error;
254
255 fn nonce(&self, bytes: &[u8], target_score: f64) -> std::result::Result<u64, Self::Error> {
256 if self.local_pow {
257 MinerBuilder::new()
258 .with_num_workers(num_cpus::get())
259 .with_cancel(self.cancel.clone())
260 .finish()
261 .nonce(bytes, target_score)
262 .map_err(|e| crate::Error::Pow(e.to_string()))
263 } else {
264 Ok(0)
265 }
266 }
267}
268
/// Identifies an API call kind; used as the key for per-API timeouts.
///
/// `Debug` is derived so values can be logged and used in assertions.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum Api {
    /// `GetHealth` API.
    GetHealth,
    /// `GetInfo` API.
    GetInfo,
    /// `GetPeers` API.
    GetPeers,
    /// `GetTips` API.
    GetTips,
    /// `PostMessage` API.
    PostMessage,
    /// `PostMessage` API when proof of work is done remotely.
    PostMessageWithRemotePow,
    /// `GetOutput` API.
    GetOutput,
    /// `GetMilestone` API.
    GetMilestone,
    /// `GetMessage` API.
    GetMessage,
    /// `GetBalance` API.
    GetBalance,
}

impl FromStr for Api {
    type Err = String;

    /// Parses the exact (case-sensitive) variant name.
    ///
    /// # Errors
    ///
    /// Returns a message naming the rejected input when it matches no variant.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        let t = match s {
            "GetHealth" => Self::GetHealth,
            "GetInfo" => Self::GetInfo,
            "GetPeers" => Self::GetPeers,
            "GetTips" => Self::GetTips,
            "PostMessage" => Self::PostMessage,
            "PostMessageWithRemotePow" => Self::PostMessageWithRemotePow,
            "GetOutput" => Self::GetOutput,
            "GetMilestone" => Self::GetMilestone,
            "GetMessage" => Self::GetMessage,
            "GetBalance" => Self::GetBalance,
            _ => return Err(format!("unknown api kind `{s}`")),
        };
        Ok(t)
    }
}
314
/// The client: holds the node manager, cached network info, timeouts and — depending on the
/// enabled features — the background runtime and MQTT state.
#[cfg_attr(feature = "wasm", derive(Clone))]
pub struct Client {
    /// Runtime owned by the client; taken and shut down in the background when the client is dropped.
    #[allow(dead_code)]
    #[cfg(not(feature = "wasm"))]
    pub(crate) runtime: Option<Runtime>,
    /// Node manager through which the HTTP requests are issued.
    pub(crate) node_manager: crate::node_manager::NodeManager,
    /// Sender used on drop to signal the node syncing task.
    #[cfg(not(feature = "wasm"))]
    pub(crate) sync_kill_sender: Option<Arc<Sender<()>>>,
    /// MQTT client, if any; cancelled when the client is dropped.
    #[cfg(feature = "mqtt")]
    pub(crate) mqtt_client: Option<MqttClient>,
    /// Handlers registered per MQTT topic.
    #[cfg(feature = "mqtt")]
    pub(crate) mqtt_topic_handlers: Arc<tokio::sync::RwLock<TopicHandlerMap>>,
    /// Options for the MQTT broker connection.
    #[cfg(feature = "mqtt")]
    pub(crate) broker_options: BrokerOptions,
    /// Watch channel for MQTT events; `mqtt_event_receiver` hands out clones of the receiver half.
    #[cfg(feature = "mqtt")]
    pub(crate) mqtt_event_channel: (Arc<WatchSender<MqttEvent>>, WatchReceiver<MqttEvent>),
    /// Cached network info shared behind a lock.
    pub(crate) network_info: Arc<RwLock<NetworkInfo>>,
    /// Timeout used by `get_timeout` when no API-specific timeout is configured.
    pub(crate) request_timeout: Duration,
    /// API-specific timeouts, looked up by `get_timeout`.
    pub(crate) api_timeout: HashMap<Api, Duration>,
}
341
342impl std::fmt::Debug for Client {
343 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
344 let mut d = f.debug_struct("Client");
345 d.field("node_manager", &self.node_manager);
346 #[cfg(feature = "mqtt")]
347 d.field("broker_options", &self.broker_options);
348 d.field("network_info", &self.network_info).finish()
349 }
350}
351
352impl Drop for Client {
353 fn drop(&mut self) {
355 #[cfg(not(feature = "wasm"))]
356 if let Some(sender) = self.sync_kill_sender.take() {
357 sender.send(()).expect("failed to stop syncing process");
358 }
359
360 #[cfg(not(feature = "wasm"))]
361 if let Some(runtime) = self.runtime.take() {
362 runtime.shutdown_background();
363 }
364
365 #[cfg(feature = "mqtt")]
366 if let Some(mqtt_client) = self.mqtt_client.take() {
367 std::thread::spawn(move || {
368 let _ = crate::async_runtime::block_on(mqtt_client.cancel());
371 })
372 .join()
373 .unwrap();
374 }
375 }
376}
377
378impl Client {
379 pub fn builder() -> ClientBuilder {
381 ClientBuilder::new()
382 }
383
384 #[cfg(not(feature = "wasm"))]
386 pub(crate) fn start_sync_process(
387 runtime: &Runtime,
388 sync: Arc<RwLock<HashSet<Node>>>,
389 nodes: HashSet<Node>,
390 node_sync_interval: Duration,
391 network_info: Arc<RwLock<NetworkInfo>>,
392 mut kill: Receiver<()>,
393 ) {
394 let node_sync_interval =
395 TokioDuration::from_nanos(node_sync_interval.as_nanos().try_into().unwrap_or(TIPS_INTERVAL));
396
397 runtime.spawn(async move {
398 loop {
399 tokio::select! {
400 _ = async {
401 sleep(node_sync_interval).await;
404 Client::sync_nodes(&sync, &nodes, &network_info).await;
405 } => {}
406 _ = kill.recv() => {}
407 }
408 }
409 });
410 }
411
412 #[cfg(not(feature = "wasm"))]
413 pub(crate) async fn sync_nodes(
414 sync: &Arc<RwLock<HashSet<Node>>>,
415 nodes: &HashSet<Node>,
416 network_info: &Arc<RwLock<NetworkInfo>>,
417 ) {
418 let mut synced_nodes = HashSet::new();
419 let mut network_nodes: HashMap<String, Vec<(NodeInfo, Node)>> = HashMap::new();
420 for node in nodes {
421 if let Ok(info) = Client::get_node_info(node.url.as_ref(), None, None).await {
423 if info.is_healthy {
424 match network_nodes.get_mut(&info.network_id) {
425 Some(network_id_entry) => {
426 network_id_entry.push((info, node.clone()));
427 }
428 None => match &network_info
429 .read()
430 .map_or(NetworkInfo::default().network, |info| info.network.clone())
431 {
432 Some(id) => {
433 if info.network_id.contains(id) {
434 network_nodes.insert(info.network_id.clone(), vec![(info, node.clone())]);
435 }
436 }
437 None => {
438 network_nodes.insert(info.network_id.clone(), vec![(info, node.clone())]);
439 }
440 },
441 }
442 }
443 }
444 }
445 let mut most_nodes = ("network_id", 0);
447 for (network_id, node) in network_nodes.iter() {
448 if node.len() > most_nodes.1 {
449 most_nodes.0 = network_id;
450 most_nodes.1 = node.len();
451 }
452 }
453 if let Some(nodes) = network_nodes.get(most_nodes.0) {
454 for (info, node_url) in nodes.iter() {
455 if let Ok(mut client_network_info) = network_info.write() {
456 client_network_info.network_id = hash_network(&info.network_id).ok();
457 client_network_info.min_pow_score = info.min_pow_score;
458 client_network_info.bech32_hrp = info.bech32_hrp.clone();
459 if !client_network_info.local_pow {
460 if info.features.contains(&"PoW".to_string()) {
461 synced_nodes.insert(node_url.clone());
462 }
463 } else {
464 synced_nodes.insert(node_url.clone());
465 }
466 }
467 }
468 }
469
470 if let Ok(mut sync) = sync.write() {
472 *sync = synced_nodes;
473 }
474 }
475
476 pub async fn get_node(&self) -> Result<Node> {
478 if let Some(primary_node) = &self.node_manager.primary_node {
479 return Ok(primary_node.clone());
480 }
481 let pool = self.node_manager.nodes.clone();
482 pool.into_iter().next().ok_or(Error::SyncedNodePoolEmpty)
483 }
484
485 pub async fn get_network_id(&self) -> Result<u64> {
487 let network_info = self.get_network_info().await?;
488 network_info
489 .network_id
490 .ok_or(Error::MissingParameter("Missing network id."))
491 }
492
493 pub async fn get_pow_provider(&self) -> ClientMiner {
495 ClientMinerBuilder::new()
496 .with_local_pow(self.get_local_pow().await)
497 .finish()
498 }
499
500 pub async fn get_network_info(&self) -> Result<NetworkInfo> {
503 let not_synced = self.network_info.read().map_or(true, |info| info.network_id.is_none());
504
505 if not_synced {
506 let info = self.get_info().await?.nodeinfo;
507 let network_id = hash_network(&info.network_id).ok();
508 {
509 let mut client_network_info = self.network_info.write().map_err(|_| crate::Error::PoisonError)?;
510 client_network_info.network_id = network_id;
511 client_network_info.min_pow_score = info.min_pow_score;
512 client_network_info.bech32_hrp = info.bech32_hrp;
513 }
514 }
515 let res = self
516 .network_info
517 .read()
518 .map_or(NetworkInfo::default(), |info| info.clone());
519 Ok(res)
520 }
521
522 pub async fn get_bech32_hrp(&self) -> Result<String> {
524 Ok(self.get_network_info().await?.bech32_hrp)
525 }
526
527 pub async fn get_min_pow_score(&self) -> Result<f64> {
529 Ok(self.get_network_info().await?.min_pow_score)
530 }
531
532 pub async fn get_tips_interval(&self) -> u64 {
534 self.network_info
535 .read()
536 .map_or(TIPS_INTERVAL, |info| info.tips_interval)
537 }
538
539 pub async fn get_local_pow(&self) -> bool {
541 self.network_info
542 .read()
543 .map_or(NetworkInfo::default().local_pow, |info| info.local_pow)
544 }
545
546 pub async fn get_fallback_to_local_pow(&self) -> bool {
548 self.network_info
549 .read()
550 .map_or(NetworkInfo::default().fallback_to_local_pow, |info| {
551 info.fallback_to_local_pow
552 })
553 }
554
555 #[cfg(not(feature = "wasm"))]
557 pub async fn unsynced_nodes(&self) -> HashSet<&Node> {
558 self.node_manager.synced_nodes.read().map_or(HashSet::new(), |synced| {
559 self.node_manager
560 .nodes
561 .iter()
562 .filter(|node| !synced.contains(node))
563 .collect()
564 })
565 }
566
567 pub fn generate_mnemonic() -> Result<String> {
569 let mut entropy = [0u8; 32];
570 utils::rand::fill(&mut entropy)?;
571 let mnemonic = wordlist::encode(&entropy, &crypto::keys::bip39::wordlist::ENGLISH)
572 .map_err(|e| crate::Error::MnemonicError(format!("{e:?}")))?;
573 entropy.zeroize();
574 Ok(mnemonic)
575 }
576
577 pub fn mnemonic_to_hex_seed(mnemonic: &str) -> Result<String> {
579 let mnemonic = mnemonic.trim();
581 crypto::keys::bip39::wordlist::verify(mnemonic, &crypto::keys::bip39::wordlist::ENGLISH)
583 .map_err(|e| crate::Error::InvalidMnemonic(format!("{e:?}")))?;
584 let mut mnemonic_seed = [0u8; 64];
585 mnemonic_to_seed(mnemonic, "", &mut mnemonic_seed);
586 Ok(hex::encode(mnemonic_seed))
587 }
588
589 pub async fn find_inputs(&self, addresses: Vec<String>, amount: u64) -> Result<Vec<UtxoInput>> {
591 let mut available_outputs = Vec::new();
593 for address in addresses {
594 available_outputs.extend_from_slice(&self.get_address().outputs(&address, Default::default()).await?);
595 }
596
597 let mut signature_locked_outputs = Vec::new();
598 let mut dust_allowance_outputs = Vec::new();
599
600 for output in available_outputs.into_iter() {
601 let output_data = self.get_output(&output).await?;
602 let (amount, _, signature_locked) =
603 ClientMessageBuilder::get_output_amount_and_address(&output_data.output)?;
604 if signature_locked {
605 signature_locked_outputs.push((output, amount));
606 } else {
607 dust_allowance_outputs.push((output, amount));
608 }
609 }
610 signature_locked_outputs.sort_by(|l, r| r.1.cmp(&l.1));
611 dust_allowance_outputs.sort_by(|l, r| r.1.cmp(&l.1));
612
613 let mut total_already_spent = 0;
614 let mut selected_inputs = Vec::new();
615 for (_offset, output_wrapper) in signature_locked_outputs
616 .into_iter()
617 .chain(dust_allowance_outputs.into_iter())
618 .take(INPUT_OUTPUT_COUNT_MAX)
620 .enumerate()
621 {
622 if total_already_spent == amount || total_already_spent >= amount + DUST_THRESHOLD {
624 break;
625 }
626 selected_inputs.push(output_wrapper.0.clone());
627 total_already_spent += output_wrapper.1;
628 }
629
630 if total_already_spent < amount
631 || (total_already_spent != amount && total_already_spent < amount + DUST_THRESHOLD)
632 {
633 return Err(crate::Error::NotEnoughBalance(total_already_spent, amount));
634 }
635
636 Ok(selected_inputs)
637 }
638
/// Returns an [`MqttManager`] borrowing this client mutably, for managing topic subscriptions.
#[cfg(feature = "mqtt")]
pub fn subscriber(&mut self) -> MqttManager<'_> {
    let manager = MqttManager::new(self);
    manager
}
648
/// Returns a new receiver for MQTT events, cloned from the client's watch channel.
#[cfg(feature = "mqtt")]
pub fn mqtt_event_receiver(&self) -> WatchReceiver<MqttEvent> {
    let (_, receiver) = &self.mqtt_event_channel;
    receiver.clone()
}
654
655 pub(crate) fn get_timeout(&self, api: Api) -> Duration {
660 *self.api_timeout.get(&api).unwrap_or(&self.request_timeout)
661 }
662
663 pub async fn get_node_health(url: &str) -> Result<bool> {
665 let mut url = Url::parse(url)?;
666 url.set_path("health");
667 let status = crate::node_manager::HttpClient::new()
668 .get(Node { url, jwt: None }, GET_API_TIMEOUT)
669 .await?
670 .status();
671 match status {
672 200 => Ok(true),
673 _ => Ok(false),
674 }
675 }
676
677 pub async fn get_health(&self) -> Result<bool> {
679 let mut node = self.get_node().await?;
680 node.url.set_path("health");
681 let status = self.node_manager.http_client.get(node, GET_API_TIMEOUT).await?.status();
682 match status {
683 200 => Ok(true),
684 _ => Ok(false),
685 }
686 }
687
688 pub async fn get_node_info(
690 url: &str,
691 jwt: Option<String>,
692 auth_name_pwd: Option<(&str, &str)>,
693 ) -> Result<NodeInfo> {
694 let mut url = crate::node_manager::validate_url(Url::parse(url)?)?;
695 if let Some((name, password)) = auth_name_pwd {
696 url.set_username(name)
697 .map_err(|_| crate::Error::UrlAuthError("username".to_string()))?;
698 url.set_password(Some(password))
699 .map_err(|_| crate::Error::UrlAuthError("password".to_string()))?;
700 }
701
702 let path = "api/v1/info";
703 url.set_path(path);
704
705 let resp: SuccessBody<NodeInfo> = crate::node_manager::HttpClient::new()
706 .get(Node { url, jwt }, GET_API_TIMEOUT)
707 .await?
708 .json()
709 .await?;
710
711 Ok(resp.data)
712 }
713
714 pub async fn get_info(&self) -> Result<NodeInfoWrapper> {
716 let path = "api/v1/info";
717
718 let resp: NodeInfoWrapper = self
719 .node_manager
720 .get_request(path, None, self.get_timeout(Api::GetInfo))
721 .await?;
722
723 Ok(resp)
724 }
725
726 pub async fn get_peers(&self) -> Result<Vec<PeerDto>> {
728 let path = "api/v1/peers";
729
730 let resp: SuccessBody<PeersResponse> = self
731 .node_manager
732 .get_request(path, None, self.get_timeout(Api::GetPeers))
733 .await?;
734
735 Ok(resp.data.0)
736 }
737
738 pub async fn get_tips(&self) -> Result<Vec<MessageId>> {
740 let path = "api/v1/tips";
741
742 let resp: SuccessBody<TipsResponse> = self
743 .node_manager
744 .get_request(path, None, self.get_timeout(Api::GetTips))
745 .await?;
746
747 let mut tips = Vec::new();
748 for tip in resp.data.tip_message_ids {
749 let mut new_tip = [0u8; 32];
750 hex::decode_to_slice(tip, &mut new_tip)?;
751 tips.push(MessageId::from(new_tip));
752 }
753 Ok(tips)
754 }
755
756 pub async fn post_message(&self, message: &Message) -> Result<MessageId> {
758 let path = "api/v1/messages";
759 let local_pow = self.get_local_pow().await;
760 let timeout = if local_pow {
761 self.get_timeout(Api::PostMessage)
762 } else {
763 self.get_timeout(Api::PostMessageWithRemotePow)
764 };
765
766 let resp: SuccessBody<SubmitMessageResponse> = match self
768 .node_manager
769 .post_request_bytes(path, timeout, &message.pack_new(), local_pow)
770 .await
771 {
772 Ok(res) => res,
773 Err(e) => {
774 if let Error::NodeError(e) = e {
775 let fallback_to_local_pow = self.get_fallback_to_local_pow().await;
776 if (e == *"No available nodes with remote PoW"
778 || e.contains("proof of work is not enabled")
779 || e.contains("`PoW` not enabled"))
780 && fallback_to_local_pow
781 {
782 {
785 let mut client_network_info =
786 self.network_info.write().map_err(|_| crate::Error::PoisonError)?;
787 client_network_info.local_pow = true;
789 }
790 #[cfg(not(feature = "wasm"))]
791 let msg_res = crate::api::finish_pow(self, message.payload().clone()).await;
792 #[cfg(feature = "wasm")]
793 let msg_res = {
794 let min_pow_score = self.get_min_pow_score().await?;
795 let network_id = self.get_network_id().await?;
796 crate::api::finish_single_thread_pow(
797 self,
798 network_id,
799 None,
800 message.payload().clone(),
801 min_pow_score,
802 )
803 .await
804 };
805 let message_with_local_pow = match msg_res {
806 Ok(msg) => {
807 let mut client_network_info =
809 self.network_info.write().map_err(|_| crate::Error::PoisonError)?;
810 client_network_info.local_pow = false;
811 msg
812 }
813 Err(e) => {
814 let mut client_network_info =
816 self.network_info.write().map_err(|_| crate::Error::PoisonError)?;
817 client_network_info.local_pow = false;
818 return Err(e);
819 }
820 };
821 self.node_manager
822 .post_request_bytes(path, timeout, &message_with_local_pow.pack_new(), true)
823 .await?
824 } else {
825 return Err(Error::NodeError(e));
826 }
827 } else {
828 return Err(e);
829 }
830 }
831 };
832
833 let mut message_id_bytes = [0u8; 32];
834 hex::decode_to_slice(resp.data.message_id, &mut message_id_bytes)?;
835 Ok(MessageId::from(message_id_bytes))
836 }
837
838 pub async fn post_message_json(&self, message: &Message) -> Result<MessageId> {
840 let path = "api/v1/messages";
841 let local_pow = self.get_local_pow().await;
842 let timeout = if local_pow {
843 self.get_timeout(Api::PostMessage)
844 } else {
845 self.get_timeout(Api::PostMessageWithRemotePow)
846 };
847 let message_dto = MessageDto::from(message);
848
849 let resp: SuccessBody<SubmitMessageResponse> = match self
851 .node_manager
852 .post_request_json(path, timeout, serde_json::to_value(message_dto)?, local_pow)
853 .await
854 {
855 Ok(res) => res,
856 Err(e) => {
857 if let Error::NodeError(e) = e {
858 let fallback_to_local_pow = self.get_fallback_to_local_pow().await;
859 if (e == *"No available nodes with remote PoW"
861 || e.contains("proof of work is not enabled")
862 || e.contains("`PoW` not enabled"))
863 && fallback_to_local_pow
864 {
865 {
868 let mut client_network_info =
869 self.network_info.write().map_err(|_| crate::Error::PoisonError)?;
870 client_network_info.local_pow = true;
872 }
873 #[cfg(not(feature = "wasm"))]
874 let msg_res = crate::api::finish_pow(self, message.payload().clone()).await;
875 #[cfg(feature = "wasm")]
876 let msg_res = {
877 let min_pow_score = self.get_min_pow_score().await?;
878 let network_id = self.get_network_id().await?;
879 crate::api::finish_single_thread_pow(
880 self,
881 network_id,
882 None,
883 message.payload().clone(),
884 min_pow_score,
885 )
886 .await
887 };
888 let message_with_local_pow = match msg_res {
889 Ok(msg) => {
890 let mut client_network_info =
892 self.network_info.write().map_err(|_| crate::Error::PoisonError)?;
893 client_network_info.local_pow = false;
894 msg
895 }
896 Err(e) => {
897 let mut client_network_info =
899 self.network_info.write().map_err(|_| crate::Error::PoisonError)?;
900 client_network_info.local_pow = false;
901 return Err(e);
902 }
903 };
904 let message_dto = MessageDto::from(&message_with_local_pow);
905
906 self.node_manager
907 .post_request_json(path, timeout, serde_json::to_value(message_dto)?, true)
908 .await?
909 } else {
910 return Err(Error::NodeError(e));
911 }
912 } else {
913 return Err(e);
914 }
915 }
916 };
917
918 let mut message_id_bytes = [0u8; 32];
919 hex::decode_to_slice(resp.data.message_id, &mut message_id_bytes)?;
920 Ok(MessageId::from(message_id_bytes))
921 }
922
923 pub fn get_message(&self) -> GetMessageBuilder<'_> {
925 GetMessageBuilder::new(self)
926 }
927
928 pub async fn get_output(&self, output_id: &UtxoInput) -> Result<OutputResponse> {
931 let path = &format!(
932 "api/v1/outputs/{}{}",
933 output_id.output_id().transaction_id(),
934 hex::encode(output_id.output_id().index().to_le_bytes())
935 );
936
937 let resp: SuccessBody<OutputResponse> = self
938 .node_manager
939 .get_request(path, None, self.get_timeout(Api::GetOutput))
940 .await?;
941
942 Ok(resp.data)
943 }
944
945 pub async fn find_outputs(&self, outputs: &[UtxoInput], addresses: &[String]) -> Result<Vec<OutputResponse>> {
948 let mut output_metadata = Vec::<OutputResponse>::new();
949 let mut output_to_query = HashSet::<UtxoInput>::new();
951
952 for output in outputs {
954 output_to_query.insert(output.to_owned());
955 }
956
957 for address in addresses {
960 let address_outputs = self.get_address().outputs(address, Default::default()).await?;
961 for output in address_outputs.iter() {
962 output_to_query.insert(output.to_owned());
963 }
964 if address_outputs.len() == RESPONSE_MAX_OUTPUTS {
967 let address_dust_allowance_outputs = self
968 .get_address()
969 .outputs(
970 address,
971 OutputsOptions {
972 include_spent: false,
973 output_type: Some(OutputType::SignatureLockedDustAllowance),
974 },
975 )
976 .await?;
977 for output in address_dust_allowance_outputs.iter() {
978 output_to_query.insert(output.to_owned());
979 }
980 }
981 }
982
983 for output in output_to_query {
985 let meta_data = self.get_output(&output).await?;
986 output_metadata.push(meta_data);
987 }
988 Ok(output_metadata)
989 }
990
991 pub fn get_address(&self) -> GetAddressBuilder<'_> {
993 GetAddressBuilder::new(self)
994 }
995
996 pub async fn get_milestone(&self, index: u32) -> Result<MilestoneResponse> {
999 let path = &format!("api/v1/milestones/{index}");
1000
1001 let resp: SuccessBody<MilestoneResponseDto> = self
1002 .node_manager
1003 .get_request(path, None, self.get_timeout(Api::GetMilestone))
1004 .await?;
1005
1006 let milestone = resp.data;
1007 let mut message_id = [0u8; 32];
1008 hex::decode_to_slice(milestone.message_id, &mut message_id)?;
1009 Ok(MilestoneResponse {
1010 index: milestone.milestone_index,
1011 message_id: MessageId::new(message_id),
1012 timestamp: milestone.timestamp,
1013 })
1014 }
1015
1016 pub async fn get_milestone_utxo_changes(&self, index: u32) -> Result<MilestoneUTXOChanges> {
1019 let path = &format!("api/v1/milestones/{index}/utxo-changes");
1020
1021 let resp: SuccessBody<MilestoneUTXOChanges> = self
1022 .node_manager
1023 .get_request(path, None, self.get_timeout(Api::GetMilestone))
1024 .await?;
1025
1026 Ok(resp.data)
1027 }
1028
1029 pub async fn get_receipts(&self) -> Result<Vec<ReceiptDto>> {
1032 let path = &"api/v1/receipts";
1033
1034 let resp: SuccessBody<ReceiptsResponse> = self.node_manager.get_request(path, None, GET_API_TIMEOUT).await?;
1035
1036 Ok(resp.data.receipts)
1037 }
1038
1039 pub async fn get_receipts_migrated_at(&self, milestone_index: u32) -> Result<Vec<ReceiptDto>> {
1042 let path = &format!("api/v1/receipts/{milestone_index}");
1043
1044 let resp: SuccessBody<ReceiptsResponse> = self.node_manager.get_request(path, None, GET_API_TIMEOUT).await?;
1045
1046 Ok(resp.data.receipts)
1047 }
1048
1049 pub async fn get_treasury(&self) -> Result<TreasuryResponse> {
1052 let path = "api/v1/treasury";
1053
1054 let resp: SuccessBody<TreasuryResponse> = self.node_manager.get_request(path, None, GET_API_TIMEOUT).await?;
1055
1056 Ok(resp.data)
1057 }
1058
1059 pub async fn get_included_message(&self, transaction_id: &TransactionId) -> Result<Message> {
1062 let path = &format!("api/v1/transactions/{transaction_id}/included-message");
1063
1064 let resp: SuccessBody<MessageResponse> = self.node_manager.get_request(path, None, GET_API_TIMEOUT).await?;
1065 Ok(Message::try_from(&resp.data.0)?)
1066 }
1067 pub async fn reattach(&self, message_id: &MessageId) -> Result<(MessageId, Message)> {
1070 let metadata = self.get_message().metadata(message_id).await?;
1071 if metadata.should_reattach.unwrap_or(false) {
1072 self.reattach_unchecked(message_id).await
1073 } else {
1074 Err(Error::NoNeedPromoteOrReattach(message_id.to_string()))
1075 }
1076 }
1077
1078 pub async fn reattach_unchecked(&self, message_id: &MessageId) -> Result<(MessageId, Message)> {
1080 let message = self.get_message().data(message_id).await?;
1082 let reattach_message = {
1083 #[cfg(feature = "wasm")]
1084 {
1085 let network_id = self.get_network_id().await?;
1086 let mut tips = self.get_tips().await?;
1087 tips.sort_unstable_by_key(|a| a.pack_new());
1088 tips.dedup();
1089 let mut message_builder = MessageBuilder::<ClientMiner>::new()
1090 .with_network_id(network_id)
1091 .with_parents(Parents::new(tips)?);
1092 if let Some(p) = message.payload().to_owned() {
1093 message_builder = message_builder.with_payload(p)
1094 }
1095 message_builder.finish().map_err(Error::MessageError)?
1096 }
1097 #[cfg(not(feature = "wasm"))]
1098 {
1099 finish_pow(self, message.payload().to_owned()).await?
1100 }
1101 };
1102
1103 let message_id = self.post_message(&reattach_message).await?;
1105 let msg = match self.get_local_pow().await {
1107 true => reattach_message,
1108 false => self.get_message().data(&message_id).await?,
1109 };
1110 Ok((message_id, msg))
1111 }
1112
1113 pub async fn promote(&self, message_id: &MessageId) -> Result<(MessageId, Message)> {
1116 let metadata = self.get_message().metadata(message_id).await?;
1117 if metadata.should_promote.unwrap_or(false) {
1118 self.promote_unchecked(message_id).await
1119 } else {
1120 Err(Error::NoNeedPromoteOrReattach(message_id.to_string()))
1121 }
1122 }
1123
1124 pub async fn promote_unchecked(&self, message_id: &MessageId) -> Result<(MessageId, Message)> {
1126 let mut tips = self.get_tips().await?;
1128 let min_pow_score = self.get_min_pow_score().await?;
1129 let network_id = self.get_network_id().await?;
1130 tips.push(*message_id);
1131 tips.sort_unstable_by_key(|a| a.pack_new());
1133 tips.dedup();
1134
1135 let promote_message = MessageBuilder::<ClientMiner>::new()
1136 .with_network_id(network_id)
1137 .with_parents(Parents::new(tips)?)
1138 .with_nonce_provider(self.get_pow_provider().await, min_pow_score)
1139 .finish()
1140 .map_err(|_| Error::TransactionError)?;
1141
1142 let message_id = self.post_message(&promote_message).await?;
1143 let msg = match self.get_local_pow().await {
1145 true => promote_message,
1146 false => self.get_message().data(&message_id).await?,
1147 };
1148 Ok((message_id, msg))
1149 }
1150
1151 pub fn message(&self) -> ClientMessageBuilder<'_> {
1157 ClientMessageBuilder::new(self)
1158 }
1159
1160 pub fn get_unspent_address<'a>(&'a self, seed: &'a Seed) -> GetUnspentAddressBuilder<'a> {
1162 GetUnspentAddressBuilder::new(self, seed)
1163 }
1164
1165 pub fn get_addresses<'a>(&'a self, seed: &'a Seed) -> GetAddressesBuilder<'a> {
1167 GetAddressesBuilder::new(seed).with_client(self)
1168 }
1169
1170 pub async fn find_messages<I: AsRef<[u8]>>(
1172 &self,
1173 indexation_keys: &[I],
1174 message_ids: &[MessageId],
1175 ) -> Result<Vec<Message>> {
1176 let mut messages = Vec::new();
1177
1178 let mut message_ids_to_query = HashSet::<MessageId>::new();
1180
1181 for message_id in message_ids {
1183 message_ids_to_query.insert(message_id.to_owned());
1184 }
1185
1186 for index in indexation_keys {
1189 let message_ids = self.get_message().index(index).await?;
1190 for message_id in message_ids.iter() {
1191 message_ids_to_query.insert(message_id.to_owned());
1192 }
1193 }
1194
1195 for message_id in message_ids_to_query {
1197 let message = self.get_message().data(&message_id).await?;
1198 messages.push(message);
1199 }
1200 Ok(messages)
1201 }
1202
1203 pub fn get_balance<'a>(&'a self, seed: &'a Seed) -> GetBalanceBuilder<'a> {
1207 GetBalanceBuilder::new(self, seed)
1208 }
1209
1210 pub async fn get_address_balances(&self, addresses: &[String]) -> Result<Vec<BalanceAddressResponse>> {
1213 let mut address_balance_pairs = Vec::new();
1214 for address in addresses {
1215 let balance_response = self.get_address().balance(address).await?;
1216 address_balance_pairs.push(balance_response);
1217 }
1218 Ok(address_balance_pairs)
1219 }
1220
1221 pub fn bech32_to_hex(bech32: &str) -> crate::Result<String> {
1223 let address = Address::try_from_bech32(bech32)?;
1224 let Address::Ed25519(ed) = address;
1225 Ok(ed.to_string())
1226 }
1227
1228 pub async fn hex_to_bech32(&self, hex: &str, bech32_hrp: Option<&str>) -> crate::Result<String> {
1230 let address: Ed25519Address = hex.parse::<Ed25519Address>()?;
1231 match bech32_hrp {
1232 Some(hrp) => Ok(Address::Ed25519(address).to_bech32(hrp)),
1233 None => Ok(Address::Ed25519(address).to_bech32(self.get_bech32_hrp().await?.as_str())),
1234 }
1235 }
1236
1237 pub async fn hex_public_key_to_bech32_address(&self, hex: &str, bech32_hrp: Option<&str>) -> crate::Result<String> {
1239 let mut public_key = [0u8; ED25519_ADDRESS_LENGTH];
1240 hex::decode_to_slice(hex, &mut public_key)?;
1241
1242 let address = Blake2b256::digest(&public_key)
1243 .try_into()
1244 .map_err(|_e| Error::Blake2b256Error("Hashing the public key failed."))?;
1245 let address: Ed25519Address = Ed25519Address::new(address);
1246 match bech32_hrp {
1247 Some(hrp) => Ok(Address::Ed25519(address).to_bech32(hrp)),
1248 None => Ok(Address::Ed25519(address).to_bech32(self.get_bech32_hrp().await?.as_str())),
1249 }
1250 }
1251
1252 pub fn parse_bech32_address(address: &str) -> crate::Result<Address> {
1254 Ok(Address::try_from_bech32(address)?)
1255 }
1256
1257 pub fn is_address_valid(address: &str) -> bool {
1259 Address::try_from_bech32(address).is_ok()
1260 }
1261
1262 pub async fn retry(&self, message_id: &MessageId) -> Result<(MessageId, Message)> {
1265 let message_metadata = self.get_message().metadata(message_id).await?;
1267 if message_metadata.should_promote.unwrap_or(false) {
1268 self.promote_unchecked(message_id).await
1269 } else if message_metadata.should_reattach.unwrap_or(false) {
1270 self.reattach_unchecked(message_id).await
1271 } else {
1272 Err(Error::NoNeedPromoteOrReattach(message_id.to_string()))
1273 }
1274 }
1275
1276 pub async fn retry_until_included(
1280 &self,
1281 message_id: &MessageId,
1282 interval: Option<u64>,
1283 max_attempts: Option<u64>,
1284 ) -> Result<Vec<(MessageId, Message)>> {
1285 let mut message_ids = vec![*message_id];
1287 let mut messages_with_id = Vec::new();
1289 for _ in 0..max_attempts.unwrap_or(40) {
1290 #[cfg(feature = "wasm")]
1291 {
1292 TimeoutFuture::new((interval.unwrap_or(5) * 1000).try_into().unwrap()).await;
1293 }
1294 #[cfg(not(feature = "wasm"))]
1295 sleep(Duration::from_secs(interval.unwrap_or(5))).await;
1296 let message_ids_len = message_ids.len();
1298 let mut conflicting = false;
1299 for (index, msg_id) in message_ids.clone().iter().enumerate() {
1300 let message_metadata = self.get_message().metadata(msg_id).await?;
1301 if let Some(inclusion_state) = message_metadata.ledger_inclusion_state {
1302 match inclusion_state {
1303 LedgerInclusionStateDto::Included | LedgerInclusionStateDto::NoTransaction => {
1304 if message_id == msg_id {
1306 let mut included_and_reattached_messages =
1307 vec![(*message_id, self.get_message().data(message_id).await?)];
1308 included_and_reattached_messages.extend(messages_with_id);
1309 return Ok(included_and_reattached_messages);
1310 } else {
1311 messages_with_id.rotate_left(index);
1313 return Ok(messages_with_id);
1314 }
1315 }
1316 LedgerInclusionStateDto::Conflicting => conflicting = true,
1319 };
1320 }
1321 if index == message_ids_len - 1 {
1323 if message_metadata.should_promote.unwrap_or(false) {
1324 self.promote_unchecked(message_ids.last().unwrap()).await?;
1326 } else if message_metadata.should_reattach.unwrap_or(false) {
1327 let reattached = self.reattach_unchecked(message_ids.last().unwrap()).await?;
1329 message_ids.push(reattached.0);
1330 messages_with_id.push(reattached);
1331 }
1332 }
1333 }
1334 if conflicting {
1337 let message = self.get_message().data(message_id).await?;
1338 if let Some(Payload::Transaction(transaction_payload)) = message.payload() {
1339 let included_message = self.get_included_message(&transaction_payload.id()).await?;
1340 let mut included_and_reattached_messages = vec![(included_message.id().0, included_message)];
1341 included_and_reattached_messages.extend(messages_with_id);
1342 return Ok(included_and_reattached_messages);
1343 }
1344 }
1345 }
1346 Err(Error::TangleInclusionError(message_id.to_string()))
1347 }
1348
1349 pub async fn consolidate_funds(
1352 &self,
1353 seed: &Seed,
1354 account_index: usize,
1355 address_range: Range<usize>,
1356 ) -> crate::Result<String> {
1357 crate::api::consolidate_funds(self, seed, account_index, address_range).await
1358 }
1359}
1360
1361pub fn hash_network(network_id_string: &str) -> Result<u64> {
1363 let bytes = Blake2b256::digest(network_id_string.as_bytes())[0..8]
1364 .try_into()
1365 .map_err(|_e| Error::Blake2b256Error("Hashing the network id failed."))?;
1366
1367 Ok(u64::from_le_bytes(bytes))
1368}