1pub mod block;
2pub mod connect;
3pub mod pool;
4pub mod translate;
5
6use std::net::SocketAddr;
7use std::time::Duration;
8
9use chia::consensus::consensus_constants::ConsensusConstants;
10use chia::protocol::{
11 Bytes32, CoinStateFilters, FullBlock as ProtoFullBlock, RejectAdditionsRequest, RejectBlock,
12 RejectHeaderRequest, RejectRemovalsRequest, RequestAdditions, RequestBlock, RequestBlockHeader,
13 RequestFeeEstimates, RequestRemovals, RespondAdditions, RespondBlock, RespondBlockHeader,
14 RespondFeeEstimates, RespondRemovals, SpendBundle as ProtoBundle,
15};
16use chia_wallet_sdk::client::Peer;
17use chia_wallet_sdk::types::{MAINNET_CONSTANTS, TESTNET11_CONSTANTS};
18use tokio_tungstenite::Connector;
19
20use crate::types::*;
21use crate::NetworkType;
22use pool::PeerPool;
23
24pub struct PeerBackend {
29 pool: PeerPool,
30 network: NetworkType,
31 request_timeout: Duration,
32}
33
34impl PeerBackend {
35 pub async fn new(
36 network: crate::NetworkType,
37 tls: Connector,
38 max_peers: usize,
39 connect_timeout: Duration,
40 request_timeout: Duration,
41 ) -> Result<Self, ChiaQueryError> {
42 let pool = PeerPool::new(network, tls, max_peers, connect_timeout).await?;
43 Ok(Self {
44 pool,
45 network,
46 request_timeout,
47 })
48 }
49
50 pub fn constants(&self) -> &ConsensusConstants {
52 match self.network {
53 NetworkType::Mainnet => &MAINNET_CONSTANTS,
54 NetworkType::Testnet11 => &TESTNET11_CONSTANTS,
55 }
56 }
57
58 fn genesis_challenge(&self) -> Bytes32 {
62 self.constants().genesis_challenge
63 }
64
65 pub async fn has_peers(&self) -> bool {
66 self.pool.has_peers().await
67 }
68
69 async fn pick(&self) -> Result<(Peer, SocketAddr), ChiaQueryError> {
74 self.pool.try_refill().await;
76
77 self.pool
78 .select_peer()
79 .await
80 .ok_or_else(|| ChiaQueryError::PeerConnection("no peers available".into()))
81 }
82
83 pub async fn try_get_coin_record_by_name(
89 &self,
90 name: &str,
91 ) -> Result<CoinRecord, ChiaQueryError> {
92 let (peer, addr) = self.pick().await?;
93 let res = self.do_get_coin_record_by_name(&peer, name).await;
94 if res.is_err() {
95 self.pool.eject_peer(addr).await;
96 }
97 res
98 }
99
100 pub async fn try_get_coin_records_by_puzzle_hash(
101 &self,
102 puzzle_hash: &str,
103 start_height: Option<u32>,
104 end_height: Option<u32>,
105 include_spent: bool,
106 ) -> Result<Vec<CoinRecord>, ChiaQueryError> {
107 let (peer, addr) = self.pick().await?;
108 let res = self
109 .do_puzzle_hash_query(
110 &peer,
111 &[puzzle_hash],
112 start_height,
113 end_height,
114 include_spent,
115 false,
116 )
117 .await;
118 if res.is_err() {
119 self.pool.eject_peer(addr).await;
120 }
121 res
122 }
123
124 pub async fn try_get_coin_records_by_puzzle_hashes(
125 &self,
126 puzzle_hashes: &[String],
127 start_height: Option<u32>,
128 end_height: Option<u32>,
129 include_spent: bool,
130 ) -> Result<Vec<CoinRecord>, ChiaQueryError> {
131 let hashes: Vec<&str> = puzzle_hashes.iter().map(String::as_str).collect();
132 let (peer, addr) = self.pick().await?;
133 let res = self
134 .do_puzzle_hash_query(
135 &peer,
136 &hashes,
137 start_height,
138 end_height,
139 include_spent,
140 false,
141 )
142 .await;
143 if res.is_err() {
144 self.pool.eject_peer(addr).await;
145 }
146 res
147 }
148
149 pub async fn try_get_coin_records_by_hint(
150 &self,
151 hint: &str,
152 start_height: Option<u32>,
153 end_height: Option<u32>,
154 include_spent: bool,
155 ) -> Result<Vec<CoinRecord>, ChiaQueryError> {
156 let (peer, addr) = self.pick().await?;
157 let res = self
158 .do_puzzle_hash_query(
159 &peer,
160 &[hint],
161 start_height,
162 end_height,
163 include_spent,
164 true,
165 )
166 .await;
167 if res.is_err() {
168 self.pool.eject_peer(addr).await;
169 }
170 res
171 }
172
173 pub async fn try_get_coin_records_by_hints(
174 &self,
175 hints: &[String],
176 start_height: Option<u32>,
177 end_height: Option<u32>,
178 include_spent: bool,
179 ) -> Result<Vec<CoinRecord>, ChiaQueryError> {
180 let hs: Vec<&str> = hints.iter().map(String::as_str).collect();
181 let (peer, addr) = self.pick().await?;
182 let res = self
183 .do_puzzle_hash_query(&peer, &hs, start_height, end_height, include_spent, true)
184 .await;
185 if res.is_err() {
186 self.pool.eject_peer(addr).await;
187 }
188 res
189 }
190
191 pub async fn try_get_coin_records_by_names(
192 &self,
193 names: &[String],
194 ) -> Result<Vec<CoinRecord>, ChiaQueryError> {
195 let (peer, addr) = self.pick().await?;
196 let res = self.do_coin_ids_query(&peer, names).await;
197 if res.is_err() {
198 self.pool.eject_peer(addr).await;
199 }
200 res
201 }
202
203 pub async fn try_get_puzzle_and_solution(
204 &self,
205 coin_id: &str,
206 height: u32,
207 ) -> Result<CoinSpend, ChiaQueryError> {
208 let (peer, addr) = self.pick().await?;
209 let res = self
210 .do_get_puzzle_and_solution(&peer, coin_id, height)
211 .await;
212 if res.is_err() {
213 self.pool.eject_peer(addr).await;
214 }
215 res
216 }
217
218 pub async fn try_get_fee_estimate(
219 &self,
220 target_times: &[u64],
221 ) -> Result<FeeEstimate, ChiaQueryError> {
222 let (peer, addr) = self.pick().await?;
223 let res = self.do_get_fee_estimate(&peer, target_times).await;
224 if res.is_err() {
225 self.pool.eject_peer(addr).await;
226 }
227 res
228 }
229
230 pub async fn try_push_tx(&self, bundle: &SpendBundle) -> Result<TxStatus, ChiaQueryError> {
231 let (peer, addr) = self.pick().await?;
232 let res = self.do_push_tx(&peer, bundle).await;
233 if res.is_err() {
234 self.pool.eject_peer(addr).await;
235 }
236 res
237 }
238
239 pub async fn try_get_block_record_by_height(
242 &self,
243 height: u32,
244 ) -> Result<BlockRecord, ChiaQueryError> {
245 let (peer, addr) = self.pick().await?;
246 let res = self.do_get_block_record_by_height(&peer, height).await;
247 if res.is_err() {
248 self.pool.eject_peer(addr).await;
249 }
250 res
251 }
252
253 #[allow(dead_code)]
259 pub async fn try_get_additions_and_removals(
260 &self,
261 height: u32,
262 header_hash: &str,
263 ) -> Result<AdditionsAndRemovals, ChiaQueryError> {
264 let (peer, addr) = self.pick().await?;
265 let res = self
266 .do_get_additions_and_removals(&peer, height, header_hash)
267 .await;
268 if res.is_err() {
269 self.pool.eject_peer(addr).await;
270 }
271 res
272 }
273
274 pub async fn try_get_children(
277 &self,
278 parent_id: &str,
279 ) -> Result<Vec<CoinRecord>, ChiaQueryError> {
280 let (peer, addr) = self.pick().await?;
281 let res = self.do_get_children(&peer, parent_id).await;
282 if res.is_err() {
283 self.pool.eject_peer(addr).await;
284 }
285 res
286 }
287
288 pub async fn try_get_block_by_height(
291 &self,
292 height: u32,
293 ) -> Result<serde_json::Value, ChiaQueryError> {
294 let (peer, addr) = self.pick().await?;
295 let res = self.do_get_block_by_height(&peer, height).await;
296 if res.is_err() {
297 self.pool.eject_peer(addr).await;
298 }
299 res
300 }
301
302 pub async fn try_get_additions_and_removals_from_block(
305 &self,
306 height: u32,
307 ) -> Result<AdditionsAndRemovals, ChiaQueryError> {
308 let (peer, addr) = self.pick().await?;
309 let res = self
310 .do_get_additions_and_removals_from_block(&peer, height)
311 .await;
312 if res.is_err() {
313 self.pool.eject_peer(addr).await;
314 }
315 res
316 }
317
318 pub async fn try_get_block_spends_by_height(
321 &self,
322 height: u32,
323 ) -> Result<Vec<CoinSpend>, ChiaQueryError> {
324 let (peer, addr) = self.pick().await?;
325 let res = self.do_get_block_spends(&peer, height).await;
326 if res.is_err() {
327 self.pool.eject_peer(addr).await;
328 }
329 res
330 }
331
332 pub async fn try_get_block_spends_with_conditions(
335 &self,
336 height: u32,
337 ) -> Result<Vec<CoinSpendWithConditions>, ChiaQueryError> {
338 let (peer, addr) = self.pick().await?;
339 let proto_block = self.fetch_full_block(&peer, height).await;
340 if proto_block.is_err() {
341 self.pool.eject_peer(addr).await;
342 }
343 let proto_block = proto_block?;
344 block::block_spends_with_conditions(&proto_block, self.constants())
345 }
346
347 pub async fn try_get_puzzle_and_solution_auto(
350 &self,
351 coin_id: &str,
352 ) -> Result<CoinSpend, ChiaQueryError> {
353 let (peer, addr) = self.pick().await?;
355 let id = translate::parse_bytes32(coin_id)?;
356
357 let state_resp = tokio::time::timeout(self.request_timeout, {
358 peer.request_coin_state(vec![id], None, self.genesis_challenge(), false)
359 })
360 .await
361 .map_err(|_| ChiaQueryError::PeerConnection("request timed out".into()))?
362 .map_err(|e| ChiaQueryError::PeerConnection(e.to_string()))?
363 .map_err(|_| ChiaQueryError::PeerRejection("coin state rejected".into()))?;
364
365 let cs = state_resp
366 .coin_states
367 .first()
368 .ok_or_else(|| ChiaQueryError::PeerRejection("coin not found".into()))?;
369 let spent_height = cs
370 .spent_height
371 .ok_or_else(|| ChiaQueryError::PeerRejection("coin is not spent".into()))?;
372
373 let res = self
374 .do_get_puzzle_and_solution(&peer, coin_id, spent_height)
375 .await;
376 if res.is_err() {
377 self.pool.eject_peer(addr).await;
378 }
379 res
380 }
381
382 pub async fn try_get_block_records(
385 &self,
386 start: u32,
387 end: u32,
388 ) -> Result<Vec<BlockRecord>, ChiaQueryError> {
389 let mut records = Vec::with_capacity((end - start) as usize);
390 for height in start..end {
391 records.push(self.try_get_block_record_by_height(height).await?);
392 }
393 Ok(records)
394 }
395
396 pub async fn try_get_blocks_range(
399 &self,
400 start: u32,
401 end: u32,
402 ) -> Result<Vec<serde_json::Value>, ChiaQueryError> {
403 let mut blocks = Vec::with_capacity((end - start) as usize);
404 for height in start..end {
405 blocks.push(self.try_get_block_by_height(height).await?);
406 }
407 Ok(blocks)
408 }
409
410 pub fn network_info(&self) -> NetworkInfo {
413 let c = self.constants();
414 NetworkInfo {
415 network_name: self.network.network_id().to_string(),
416 network_prefix: match self.network {
417 NetworkType::Mainnet => "xch".to_string(),
418 NetworkType::Testnet11 => "txch".to_string(),
419 },
420 genesis_challenge: format!("0x{}", hex::encode(c.genesis_challenge)),
421 }
422 }
423
424 pub fn aggsig_additional_data(&self) -> String {
427 format!(
428 "0x{}",
429 hex::encode(self.constants().agg_sig_me_additional_data)
430 )
431 }
432
433 pub fn peak_height(&self) -> u32 {
436 self.pool.peak_height()
437 }
438
439 async fn do_get_coin_record_by_name(
444 &self,
445 peer: &Peer,
446 name: &str,
447 ) -> Result<CoinRecord, ChiaQueryError> {
448 let coin_id = translate::parse_bytes32(name)?;
449
450 let response = tokio::time::timeout(self.request_timeout, {
451 peer.request_coin_state(vec![coin_id], None, self.genesis_challenge(), false)
452 })
453 .await
454 .map_err(|_| ChiaQueryError::PeerConnection("request timed out".into()))?
455 .map_err(|e| ChiaQueryError::PeerConnection(e.to_string()))?
456 .map_err(|_| ChiaQueryError::PeerRejection("coin state request rejected".into()))?;
457
458 response
459 .coin_states
460 .first()
461 .map(translate::coin_state_to_record)
462 .ok_or_else(|| ChiaQueryError::PeerRejection("coin not found".into()))
463 }
464
465 async fn do_puzzle_hash_query(
466 &self,
467 peer: &Peer,
468 hashes: &[&str],
469 start_height: Option<u32>,
470 end_height: Option<u32>,
471 include_spent: bool,
472 include_hinted: bool,
473 ) -> Result<Vec<CoinRecord>, ChiaQueryError> {
474 let puzzle_hashes: Vec<Bytes32> = hashes
475 .iter()
476 .map(|h| translate::parse_bytes32(h))
477 .collect::<Result<_, _>>()?;
478
479 let filters = CoinStateFilters {
480 include_spent,
481 include_unspent: true,
482 include_hinted,
483 min_amount: 0,
484 };
485
486 let mut all_states = Vec::new();
487 let mut prev_height: Option<u32> = None;
493 let mut prev_header = self.genesis_challenge();
494
495 loop {
496 let response = tokio::time::timeout(self.request_timeout, {
497 peer.request_puzzle_state(
498 puzzle_hashes.clone(),
499 prev_height,
500 prev_header,
501 filters.clone(),
502 false,
503 )
504 })
505 .await
506 .map_err(|_| ChiaQueryError::PeerConnection("request timed out".into()))?
507 .map_err(|e| ChiaQueryError::PeerConnection(e.to_string()))?
508 .map_err(|_| ChiaQueryError::PeerRejection("puzzle state request rejected".into()))?;
509
510 all_states.extend(response.coin_states.iter().cloned());
511
512 if response.is_finished {
513 break;
514 }
515 prev_height = Some(response.height);
516 prev_header = response.header_hash;
517 }
518
519 let records: Vec<CoinRecord> = all_states
521 .iter()
522 .filter(|cs| {
523 let h = cs.created_height.unwrap_or(0);
524 let above_start = start_height.is_none_or(|s| h >= s);
525 let below_end = end_height.is_none_or(|e| h <= e);
526 above_start && below_end
527 })
528 .map(translate::coin_state_to_record)
529 .collect();
530
531 Ok(records)
532 }
533
534 async fn do_coin_ids_query(
535 &self,
536 peer: &Peer,
537 names: &[String],
538 ) -> Result<Vec<CoinRecord>, ChiaQueryError> {
539 let ids: Vec<Bytes32> = names
540 .iter()
541 .map(|n| translate::parse_bytes32(n))
542 .collect::<Result<_, _>>()?;
543
544 let response = tokio::time::timeout(self.request_timeout, {
545 peer.request_coin_state(ids, None, self.genesis_challenge(), false)
546 })
547 .await
548 .map_err(|_| ChiaQueryError::PeerConnection("request timed out".into()))?
549 .map_err(|e| ChiaQueryError::PeerConnection(e.to_string()))?
550 .map_err(|_| ChiaQueryError::PeerRejection("coin state request rejected".into()))?;
551
552 Ok(translate::coin_states_to_records(&response.coin_states))
553 }
554
555 async fn do_get_puzzle_and_solution(
556 &self,
557 peer: &Peer,
558 coin_id: &str,
559 height: u32,
560 ) -> Result<CoinSpend, ChiaQueryError> {
561 let id = translate::parse_bytes32(coin_id)?;
562
563 let response = tokio::time::timeout(self.request_timeout, {
564 peer.request_puzzle_and_solution(id, height)
565 })
566 .await
567 .map_err(|_| ChiaQueryError::PeerConnection("request timed out".into()))?
568 .map_err(|e| ChiaQueryError::PeerConnection(e.to_string()))?
569 .map_err(|_| ChiaQueryError::PeerRejection("puzzle solution rejected".into()))?;
570
571 Ok(translate::make_coin_spend(
572 &chia::protocol::Coin {
578 parent_coin_info: response.coin_name,
579 puzzle_hash: Bytes32::default(),
580 amount: 0,
581 },
582 &response.puzzle,
583 &response.solution,
584 ))
585 }
586
587 async fn do_get_fee_estimate(
588 &self,
589 peer: &Peer,
590 target_times: &[u64],
591 ) -> Result<FeeEstimate, ChiaQueryError> {
592 let request = RequestFeeEstimates {
593 time_targets: target_times.to_vec(),
594 };
595
596 let response: RespondFeeEstimates =
597 tokio::time::timeout(self.request_timeout, peer.request_infallible(request))
598 .await
599 .map_err(|_| ChiaQueryError::PeerConnection("request timed out".into()))?
600 .map_err(|e| ChiaQueryError::PeerConnection(e.to_string()))?;
601
602 let estimates: Vec<f64> = response
603 .estimates
604 .estimates
605 .iter()
606 .map(|e| e.estimated_fee_rate.mojos_per_clvm_cost as f64)
607 .collect();
608
609 Ok(translate::make_fee_estimate(
610 estimates,
611 target_times.to_vec(),
612 ))
613 }
614
615 async fn do_push_tx(
616 &self,
617 peer: &Peer,
618 bundle: &SpendBundle,
619 ) -> Result<TxStatus, ChiaQueryError> {
620 let proto = to_protocol_spend_bundle(bundle)?;
621
622 let ack = tokio::time::timeout(self.request_timeout, peer.send_transaction(proto))
623 .await
624 .map_err(|_| ChiaQueryError::PeerConnection("request timed out".into()))?
625 .map_err(|e| ChiaQueryError::PeerConnection(e.to_string()))?;
626
627 Ok(translate::ack_to_tx_status(ack.status))
628 }
629
630 async fn do_get_block_by_height(
633 &self,
634 peer: &Peer,
635 height: u32,
636 ) -> Result<serde_json::Value, ChiaQueryError> {
637 let proto_block = self.fetch_full_block(peer, height).await?;
638 serde_json::to_value(&proto_block)
639 .map_err(|e| ChiaQueryError::PeerConnection(format!("serialize block: {e}")))
640 }
641
642 async fn do_get_additions_and_removals_from_block(
645 &self,
646 peer: &Peer,
647 height: u32,
648 ) -> Result<AdditionsAndRemovals, ChiaQueryError> {
649 let proto_block = self.fetch_full_block(peer, height).await?;
650 block::block_additions_and_removals(&proto_block, height, self.constants())
651 }
652
653 async fn do_get_block_spends(
656 &self,
657 peer: &Peer,
658 height: u32,
659 ) -> Result<Vec<CoinSpend>, ChiaQueryError> {
660 let proto_block = self.fetch_full_block(peer, height).await?;
661 block::block_spends(&proto_block, self.constants())
662 }
663
664 async fn fetch_full_block(
667 &self,
668 peer: &Peer,
669 height: u32,
670 ) -> Result<ProtoFullBlock, ChiaQueryError> {
671 let response = tokio::time::timeout(self.request_timeout, {
672 peer.request_fallible::<RespondBlock, RejectBlock, _>(RequestBlock {
673 height,
674 include_transaction_block: true,
675 })
676 })
677 .await
678 .map_err(|_| ChiaQueryError::PeerConnection("block request timed out".into()))?
679 .map_err(|e| ChiaQueryError::PeerConnection(e.to_string()))?
680 .map_err(|_| ChiaQueryError::PeerRejection("block request rejected".into()))?;
681
682 Ok(response.block)
683 }
684
685 async fn do_get_block_record_by_height(
688 &self,
689 peer: &Peer,
690 height: u32,
691 ) -> Result<BlockRecord, ChiaQueryError> {
692 let response = tokio::time::timeout(self.request_timeout, {
693 peer.request_fallible::<RespondBlockHeader, RejectHeaderRequest, _>(
694 RequestBlockHeader { height },
695 )
696 })
697 .await
698 .map_err(|_| ChiaQueryError::PeerConnection("request timed out".into()))?
699 .map_err(|e| ChiaQueryError::PeerConnection(e.to_string()))?
700 .map_err(|_| ChiaQueryError::PeerRejection("header request rejected".into()))?;
701
702 Ok(translate::header_block_to_block_record(
703 &response.header_block,
704 ))
705 }
706
707 async fn do_get_additions_and_removals(
710 &self,
711 peer: &Peer,
712 height: u32,
713 header_hash_hex: &str,
714 ) -> Result<AdditionsAndRemovals, ChiaQueryError> {
715 let header_hash = translate::parse_bytes32(header_hash_hex)?;
716
717 let (adds_result, rems_result) = tokio::join!(
719 tokio::time::timeout(self.request_timeout, {
720 peer.request_fallible::<RespondAdditions, RejectAdditionsRequest, _>(
721 RequestAdditions {
722 height,
723 header_hash: Some(header_hash),
724 puzzle_hashes: None,
725 },
726 )
727 }),
728 tokio::time::timeout(self.request_timeout, {
729 peer.request_fallible::<RespondRemovals, RejectRemovalsRequest, _>(
730 RequestRemovals {
731 height,
732 header_hash,
733 coin_names: None,
734 },
735 )
736 }),
737 );
738
739 let adds = adds_result
740 .map_err(|_| ChiaQueryError::PeerConnection("additions request timed out".into()))?
741 .map_err(|e| ChiaQueryError::PeerConnection(e.to_string()))?
742 .map_err(|_| ChiaQueryError::PeerRejection("additions rejected".into()))?;
743
744 let rems = rems_result
745 .map_err(|_| ChiaQueryError::PeerConnection("removals request timed out".into()))?
746 .map_err(|e| ChiaQueryError::PeerConnection(e.to_string()))?
747 .map_err(|_| ChiaQueryError::PeerRejection("removals rejected".into()))?;
748
749 Ok(translate::additions_removals_to_response(
750 &adds, &rems, height,
751 ))
752 }
753
754 async fn do_get_children(
757 &self,
758 peer: &Peer,
759 parent_id: &str,
760 ) -> Result<Vec<CoinRecord>, ChiaQueryError> {
761 let coin_name = translate::parse_bytes32(parent_id)?;
762
763 let response = tokio::time::timeout(self.request_timeout, peer.request_children(coin_name))
764 .await
765 .map_err(|_| ChiaQueryError::PeerConnection("request timed out".into()))?
766 .map_err(|e| ChiaQueryError::PeerConnection(e.to_string()))?;
767
768 Ok(translate::coin_states_to_records(&response.coin_states))
769 }
770}
771
772fn to_protocol_spend_bundle(bundle: &SpendBundle) -> Result<ProtoBundle, ChiaQueryError> {
777 let coin_spends: Vec<chia::protocol::CoinSpend> = bundle
778 .coin_spends
779 .iter()
780 .map(|cs| {
781 Ok(chia::protocol::CoinSpend {
782 coin: chia::protocol::Coin {
783 parent_coin_info: translate::parse_bytes32(&cs.coin.parent_coin_info)?,
784 puzzle_hash: translate::parse_bytes32(&cs.coin.puzzle_hash)?,
785 amount: cs.coin.amount,
786 },
787 puzzle_reveal: chia::protocol::Program::from(chia::protocol::Bytes::from(
788 translate::parse_hex(&cs.puzzle_reveal)?,
789 )),
790 solution: chia::protocol::Program::from(chia::protocol::Bytes::from(
791 translate::parse_hex(&cs.solution)?,
792 )),
793 })
794 })
795 .collect::<Result<_, ChiaQueryError>>()?;
796
797 let sig_bytes = translate::parse_hex(&bundle.aggregated_signature)?;
798 let sig_arr: [u8; 96] = sig_bytes
799 .try_into()
800 .map_err(|_| ChiaQueryError::InvalidRequest("signature must be 96 bytes".into()))?;
801 let aggregated_signature = chia::bls::Signature::from_bytes(&sig_arr)
802 .map_err(|e| ChiaQueryError::InvalidRequest(format!("bad BLS signature: {e}")))?;
803
804 Ok(ProtoBundle {
805 coin_spends,
806 aggregated_signature,
807 })
808}