1use bitcoin::{block::Header, BlockHash, FeeRate};
2use bitcoin::{Amount, Transaction};
3use std::{collections::BTreeMap, ops::Range, time::Duration};
4use tokio::sync::mpsc;
5use tokio::sync::mpsc::UnboundedSender;
6use tokio::sync::oneshot;
7
8use crate::chain::block_subsidy;
9use crate::{Event, Info, TrustedPeer, TxBroadcast, Warning};
10
11use super::{error::FetchBlockError, messages::BlockRequest, IndexedBlock};
12use super::{
13 error::{ClientError, FetchFeeRateError, FetchHeaderError},
14 messages::{BatchHeaderRequest, ClientMessage, HeaderRequest},
15};
16
17#[derive(Debug)]
19pub struct Client {
20 pub requester: Requester,
22 pub info_rx: mpsc::Receiver<Info>,
24 pub warn_rx: mpsc::UnboundedReceiver<Warning>,
26 pub event_rx: mpsc::UnboundedReceiver<Event>,
28}
29
30impl Client {
31 pub(crate) fn new(
32 info_rx: mpsc::Receiver<Info>,
33 warn_rx: mpsc::UnboundedReceiver<Warning>,
34 event_rx: mpsc::UnboundedReceiver<Event>,
35 ntx: UnboundedSender<ClientMessage>,
36 ) -> Self {
37 Self {
38 requester: Requester::new(ntx),
39 info_rx,
40 warn_rx,
41 event_rx,
42 }
43 }
44}
45
46#[derive(Debug, Clone)]
48pub struct Requester {
49 ntx: UnboundedSender<ClientMessage>,
50}
51
52impl Requester {
53 fn new(ntx: UnboundedSender<ClientMessage>) -> Self {
54 Self { ntx }
55 }
56
57 pub fn shutdown(&self) -> Result<(), ClientError> {
63 self.ntx
64 .send(ClientMessage::Shutdown)
65 .map_err(|_| ClientError::SendError)
66 }
67
68 pub fn broadcast_tx(&self, tx: TxBroadcast) -> Result<(), ClientError> {
83 self.ntx
84 .send(ClientMessage::Broadcast(tx))
85 .map_err(|_| ClientError::SendError)
86 }
87
88 pub fn broadcast_random(&self, tx: Transaction) -> Result<(), ClientError> {
94 let tx_broadcast = TxBroadcast::random_broadcast(tx);
95 self.ntx
96 .send(ClientMessage::Broadcast(tx_broadcast))
97 .map_err(|_| ClientError::SendError)
98 }
99
100 pub async fn broadcast_min_feerate(&self) -> Result<FeeRate, FetchFeeRateError> {
110 let (tx, rx) = tokio::sync::oneshot::channel::<FeeRate>();
111 self.ntx
112 .send(ClientMessage::GetBroadcastMinFeeRate(tx))
113 .map_err(|_| FetchFeeRateError::SendError)?;
114 rx.await.map_err(|_| FetchFeeRateError::RecvError)
115 }
116
117 pub async fn get_header(&self, height: u32) -> Result<Header, FetchHeaderError> {
128 let (tx, rx) = tokio::sync::oneshot::channel::<Result<Header, FetchHeaderError>>();
129 let message = HeaderRequest::new(tx, height);
130 self.ntx
131 .send(ClientMessage::GetHeader(message))
132 .map_err(|_| FetchHeaderError::SendError)?;
133 rx.await.map_err(|_| FetchHeaderError::RecvError)?
134 }
135
136 pub async fn get_header_range(
142 &self,
143 range: Range<u32>,
144 ) -> Result<BTreeMap<u32, Header>, FetchHeaderError> {
145 let (tx, rx) =
146 tokio::sync::oneshot::channel::<Result<BTreeMap<u32, Header>, FetchHeaderError>>();
147 let message = BatchHeaderRequest::new(tx, range);
148 self.ntx
149 .send(ClientMessage::GetHeaderBatch(message))
150 .map_err(|_| FetchHeaderError::SendError)?;
151 rx.await.map_err(|_| FetchHeaderError::RecvError)?
152 }
153
154 pub async fn get_block(&self, block_hash: BlockHash) -> Result<IndexedBlock, FetchBlockError> {
162 let (tx, rx) = tokio::sync::oneshot::channel::<Result<IndexedBlock, FetchBlockError>>();
163 let message = BlockRequest::new(tx, block_hash);
164 self.ntx
165 .send(ClientMessage::GetBlock(message))
166 .map_err(|_| FetchBlockError::SendError)?;
167 rx.await.map_err(|_| FetchBlockError::RecvError)?
168 }
169
170 pub fn request_block(
177 &self,
178 block_hash: BlockHash,
179 ) -> Result<oneshot::Receiver<Result<IndexedBlock, FetchBlockError>>, FetchBlockError> {
180 let (tx, rx) = tokio::sync::oneshot::channel::<Result<IndexedBlock, FetchBlockError>>();
181 let message = BlockRequest::new(tx, block_hash);
182 self.ntx
183 .send(ClientMessage::GetBlock(message))
184 .map_err(|_| FetchBlockError::SendError)?;
185 Ok(rx)
186 }
187
188 pub async fn average_fee_rate(
195 &self,
196 block_hash: BlockHash,
197 ) -> Result<FeeRate, FetchBlockError> {
198 let (tx, rx) = tokio::sync::oneshot::channel::<Result<IndexedBlock, FetchBlockError>>();
199 let message = BlockRequest::new(tx, block_hash);
200 self.ntx
201 .send(ClientMessage::GetBlock(message))
202 .map_err(|_| FetchBlockError::SendError)?;
203 let indexed_block = rx.await.map_err(|_| FetchBlockError::RecvError)??;
204 let subsidy = block_subsidy(indexed_block.height);
205 let weight = indexed_block.block.weight();
206 let revenue = indexed_block
207 .block
208 .txdata
209 .first()
210 .map(|tx| tx.output.iter().map(|txout| txout.value).sum())
211 .unwrap_or(Amount::ZERO);
212 let block_fees = revenue.checked_sub(subsidy).unwrap_or(Amount::ZERO);
213 let fee_rate = block_fees.to_sat() / weight.to_kwu_floor();
214 Ok(FeeRate::from_sat_per_kwu(fee_rate))
215 }
216
217 pub fn rescan(&self) -> Result<(), ClientError> {
223 self.ntx
224 .send(ClientMessage::Rescan)
225 .map_err(|_| ClientError::SendError)
226 }
227
228 pub fn set_response_timeout(&self, duration: Duration) -> Result<(), ClientError> {
234 self.ntx
235 .send(ClientMessage::SetDuration(duration))
236 .map_err(|_| ClientError::SendError)
237 }
238
239 pub fn add_peer(&self, peer: impl Into<TrustedPeer>) -> Result<(), ClientError> {
245 self.ntx
246 .send(ClientMessage::AddPeer(peer.into()))
247 .map_err(|_| ClientError::SendError)
248 }
249
250 pub fn is_running(&self) -> bool {
252 self.ntx.send(ClientMessage::NoOp).is_ok()
253 }
254}
255
256impl<T> From<mpsc::error::SendError<T>> for ClientError {
257 fn from(_: mpsc::error::SendError<T>) -> Self {
258 ClientError::SendError
259 }
260}
261
262#[cfg(test)]
263mod tests {
264 use bitcoin::{consensus::deserialize, Transaction};
265 use tokio::sync::mpsc;
266
267 use super::*;
268
269 #[tokio::test]
270 async fn test_client_works() {
271 let transaction: Transaction = deserialize(&hex::decode("0200000001aad73931018bd25f84ae400b68848be09db706eac2ac18298babee71ab656f8b0000000048473044022058f6fc7c6a33e1b31548d481c826c015bd30135aad42cd67790dab66d2ad243b02204a1ced2604c6735b6393e5b41691dd78b00f0c5942fb9f751856faa938157dba01feffffff0280f0fa020000000017a9140fb9463421696b82c833af241c78c17ddbde493487d0f20a270100000017a91429ca74f8a08f81999428185c97b5d852e4063f618765000000").unwrap()).unwrap();
272 let (_, info_rx) = tokio::sync::mpsc::channel::<Info>(1);
273 let (_, warn_rx) = tokio::sync::mpsc::unbounded_channel::<Warning>();
274 let (_, event_rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
275 let (ctx, crx) = mpsc::unbounded_channel::<ClientMessage>();
276 let Client {
277 requester,
278 info_rx: _,
279 warn_rx: _,
280 event_rx: _,
281 } = Client::new(info_rx, warn_rx, event_rx, ctx);
282 let broadcast = requester.broadcast_tx(TxBroadcast::new(
283 transaction.clone(),
284 crate::TxBroadcastPolicy::AllPeers,
285 ));
286 assert!(broadcast.is_ok());
287 drop(crx);
288 let broadcast = requester.shutdown();
289 assert!(broadcast.is_err());
290 }
291}