use bitcoin::p2p::address::AddrV2;
use bitcoin::p2p::ServiceFlags;
use bitcoin::{Amount, Wtxid};
use bitcoin::{BlockHash, FeeRate};
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
use crate::chain::block_subsidy;
use crate::chain::IndexedHeader;
use crate::messages::ClientRequest;
use crate::{Event, HeaderCheckpoint, Info, Package, TrustedPeer, Warning};
use super::{error::ClientError, messages::ClientMessage};
use super::{error::FetchBlockError, IndexedBlock};
/// Bundle of channel endpoints handed to a user of the crate: a [`Requester`] for
/// outgoing requests, plus one receiver each for `Info`, `Warning` and `Event` values.
#[derive(Debug)]
pub struct Client {
    /// Handle used to send requests.
    pub requester: Requester,
    /// Receiving half of the bounded channel carrying `Info` values.
    pub info_rx: mpsc::Receiver<Info>,
    /// Receiving half of the unbounded channel carrying `Warning` values.
    pub warn_rx: mpsc::UnboundedReceiver<Warning>,
    /// Receiving half of the unbounded channel carrying `Event` values.
    pub event_rx: mpsc::UnboundedReceiver<Event>,
}
impl Client {
pub(crate) fn new(
info_rx: mpsc::Receiver<Info>,
warn_rx: mpsc::UnboundedReceiver<Warning>,
event_rx: mpsc::UnboundedReceiver<Event>,
ntx: UnboundedSender<ClientMessage>,
) -> Self {
Self {
requester: Requester::new(ntx),
info_rx,
warn_rx,
event_rx,
}
}
}
/// Cloneable handle that sends `ClientMessage`s over an unbounded channel.
///
/// Every clone holds its own copy of the same sender.
#[derive(Debug, Clone)]
pub struct Requester {
    /// Sender half of the `ClientMessage` channel.
    ntx: UnboundedSender<ClientMessage>,
}
impl Requester {
fn new(ntx: UnboundedSender<ClientMessage>) -> Self {
Self { ntx }
}
pub fn shutdown(&self) -> Result<(), ClientError> {
self.ntx
.send(ClientMessage::Shutdown)
.map_err(|_| ClientError::SendError)
}
pub async fn submit_package(&self, package: impl Into<Package>) -> Result<Wtxid, ClientError> {
let (tx, rx) = tokio::sync::oneshot::channel::<Wtxid>();
let package = package.into();
let client_request = ClientRequest::new(package, tx);
self.ntx
.send(ClientMessage::Broadcast(client_request))
.map_err(|_| ClientError::SendError)?;
rx.await.map_err(|_| ClientError::RecvError)
}
pub async fn broadcast_min_feerate(&self) -> Result<FeeRate, ClientError> {
let (tx, rx) = tokio::sync::oneshot::channel::<FeeRate>();
let request = ClientRequest::new((), tx);
self.ntx
.send(ClientMessage::GetBroadcastMinFeeRate(request))
.map_err(|_| ClientError::SendError)?;
rx.await.map_err(|_| ClientError::RecvError)
}
pub async fn get_block(&self, block_hash: BlockHash) -> Result<IndexedBlock, FetchBlockError> {
let (tx, rx) = tokio::sync::oneshot::channel::<Result<IndexedBlock, FetchBlockError>>();
let message = ClientRequest::new(block_hash, tx);
self.ntx
.send(ClientMessage::GetBlock(message))
.map_err(|_| FetchBlockError::SendError)?;
rx.await.map_err(|_| FetchBlockError::RecvError)?
}
pub fn request_block(
&self,
block_hash: BlockHash,
) -> Result<oneshot::Receiver<Result<IndexedBlock, FetchBlockError>>, FetchBlockError> {
let (tx, rx) = tokio::sync::oneshot::channel::<Result<IndexedBlock, FetchBlockError>>();
let message = ClientRequest::new(block_hash, tx);
self.ntx
.send(ClientMessage::GetBlock(message))
.map_err(|_| FetchBlockError::SendError)?;
Ok(rx)
}
pub async fn average_fee_rate(
&self,
block_hash: BlockHash,
) -> Result<FeeRate, FetchBlockError> {
let (tx, rx) = tokio::sync::oneshot::channel::<Result<IndexedBlock, FetchBlockError>>();
let message = ClientRequest::new(block_hash, tx);
self.ntx
.send(ClientMessage::GetBlock(message))
.map_err(|_| FetchBlockError::SendError)?;
let indexed_block = rx.await.map_err(|_| FetchBlockError::RecvError)??;
let subsidy = block_subsidy(indexed_block.height);
let weight = indexed_block.block.weight();
let revenue = indexed_block
.block
.txdata
.first()
.map(|tx| tx.output.iter().map(|txout| txout.value).sum())
.unwrap_or(Amount::ZERO);
let block_fees = revenue.checked_sub(subsidy).unwrap_or(Amount::ZERO);
let fee_rate = block_fees
.to_sat()
.checked_div(weight.to_kwu_floor())
.unwrap_or(0);
Ok(FeeRate::from_sat_per_kwu(fee_rate))
}
pub async fn peer_info(&self) -> Result<Vec<(AddrV2, ServiceFlags)>, ClientError> {
let (tx, rx) = tokio::sync::oneshot::channel::<Vec<(AddrV2, ServiceFlags)>>();
let request = ClientRequest::new((), tx);
self.ntx
.send(ClientMessage::GetPeerInfo(request))
.map_err(|_| ClientError::SendError)?;
rx.await.map_err(|_| ClientError::RecvError)
}
pub fn rescan(&self) -> Result<(), ClientError> {
self.ntx
.send(ClientMessage::Rescan(None))
.map_err(|_| ClientError::SendError)
}
pub fn rescan_from(&self, height: u32) -> Result<(), ClientError> {
self.ntx
.send(ClientMessage::Rescan(Some(height)))
.map_err(|_| ClientError::SendError)
}
pub fn add_peer(&self, peer: impl Into<TrustedPeer>) -> Result<(), ClientError> {
self.ntx
.send(ClientMessage::AddPeer(peer.into()))
.map_err(|_| ClientError::SendError)
}
pub async fn chain_tip(&self) -> Result<HeaderCheckpoint, ClientError> {
let (tx, rx) = tokio::sync::oneshot::channel::<HeaderCheckpoint>();
let request = ClientRequest::new((), tx);
self.ntx
.send(ClientMessage::BestBlock(request))
.map_err(|_| ClientError::SendError)?;
rx.await.map_err(|_| ClientError::RecvError)
}
pub async fn get_header(&self, height: u32) -> Result<Option<IndexedHeader>, ClientError> {
let (tx, rx) = tokio::sync::oneshot::channel::<Option<IndexedHeader>>();
let request = ClientRequest::new(height, tx);
self.ntx
.send(ClientMessage::GetHeader(request))
.map_err(|_| ClientError::SendError)?;
rx.await.map_err(|_| ClientError::RecvError)
}
pub async fn height_of_hash(&self, hash: BlockHash) -> Result<Option<u32>, ClientError> {
let (tx, rx) = tokio::sync::oneshot::channel::<Option<u32>>();
let request = ClientRequest::new(hash, tx);
self.ntx
.send(ClientMessage::HeightOfHash(request))
.map_err(|_| ClientError::SendError)?;
rx.await.map_err(|_| ClientError::RecvError)
}
pub fn is_running(&self) -> bool {
self.ntx.send(ClientMessage::NoOp).is_ok()
}
}
impl<T> From<mpsc::error::SendError<T>> for ClientError {
fn from(_: mpsc::error::SendError<T>) -> Self {
ClientError::SendError
}
}