kyoto/client.rs

use bitcoin::{block::Header, BlockHash, FeeRate};
use bitcoin::{Amount, Transaction};
use std::{collections::BTreeMap, ops::Range, time::Duration};
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;

use crate::chain::block_subsidy;
use crate::{Event, Info, TrustedPeer, TxBroadcast, Warning};

use super::{error::FetchBlockError, messages::BlockRequest, IndexedBlock};
use super::{
    error::{ClientError, FetchFeeRateError, FetchHeaderError},
    messages::{BatchHeaderRequest, ClientMessage, HeaderRequest},
};

/// A [`Client`] allows for communication with a running node.
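///
/// # Example
///
/// A minimal sketch of consuming the client's channels. It assumes a `client: Client`
/// has already been obtained from a running node (construction not shown):
///
/// ```ignore
/// let Client {
///     requester,
///     mut info_rx,
///     mut warn_rx,
///     mut event_rx,
/// } = client;
///
/// loop {
///     tokio::select! {
///         Some(event) = event_rx.recv() => {
///             // React to blocks, filters, and chain updates here.
///         }
///         Some(warning) = warn_rx.recv() => {
///             // Surface warnings, for example by logging them.
///         }
///         Some(info) = info_rx.recv() => {
///             // Surface informational progress messages.
///         }
///         else => break,
///     }
/// }
/// ```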
#[derive(Debug)]
pub struct Client {
    /// Send requests to the node, such as broadcasting a transaction.
    pub requester: Requester,
    /// Receive informational messages from the node.
    pub info_rx: mpsc::Receiver<Info>,
    /// Receive warning messages from the node.
    pub warn_rx: mpsc::UnboundedReceiver<Warning>,
    /// Receive [`Event`]s from the node to act on.
    pub event_rx: mpsc::UnboundedReceiver<Event>,
}

impl Client {
    pub(crate) fn new(
        info_rx: mpsc::Receiver<Info>,
        warn_rx: mpsc::UnboundedReceiver<Warning>,
        event_rx: mpsc::UnboundedReceiver<Event>,
        ntx: UnboundedSender<ClientMessage>,
    ) -> Self {
        Self {
            requester: Requester::new(ntx),
            info_rx,
            warn_rx,
            event_rx,
        }
    }
}

/// Send messages to a running node so the node may complete a task.
#[derive(Debug, Clone)]
pub struct Requester {
    ntx: UnboundedSender<ClientMessage>,
}

impl Requester {
    fn new(ntx: UnboundedSender<ClientMessage>) -> Self {
        Self { ntx }
    }

    /// Tell the node to shut down.
    ///
    /// # Errors
    ///
    /// If the node has already stopped running.
    pub fn shutdown(&self) -> Result<(), ClientError> {
        self.ntx
            .send(ClientMessage::Shutdown)
            .map_err(|_| ClientError::SendError)
    }

    /// Broadcast a new transaction to the network.
    ///
    /// # Note
    ///
    /// When broadcasting a one-parent one-child (TRUC) package,
    /// broadcast the child first, followed by the parent.
    ///
    /// Package relay is under development at the time of writing.
    ///
    /// For more information, see BIP-431 and BIP-331.
    ///
    /// # Errors
    ///
    /// If the node has stopped running.
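    ///
    /// # Example
    ///
    /// A sketch of submitting a one-parent one-child package, assuming `child_tx` and
    /// `parent_tx` are already-signed [`Transaction`]s and that [`TxBroadcast`] and
    /// `TxBroadcastPolicy` are in scope:
    ///
    /// ```ignore
    /// // Per the note above: child first, then the parent.
    /// requester.broadcast_tx(TxBroadcast::new(child_tx, TxBroadcastPolicy::AllPeers))?;
    /// requester.broadcast_tx(TxBroadcast::new(parent_tx, TxBroadcastPolicy::AllPeers))?;
    /// ```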
    pub fn broadcast_tx(&self, tx: TxBroadcast) -> Result<(), ClientError> {
        self.ntx
            .send(ClientMessage::Broadcast(tx))
            .map_err(|_| ClientError::SendError)
    }

    /// Broadcast a new transaction to the network via a single random peer.
    ///
    /// # Errors
    ///
    /// If the node has stopped running.
    pub fn broadcast_random(&self, tx: Transaction) -> Result<(), ClientError> {
        let tx_broadcast = TxBroadcast::random_broadcast(tx);
        self.ntx
            .send(ClientMessage::Broadcast(tx_broadcast))
            .map_err(|_| ClientError::SendError)
    }

    /// Each connected peer enforces a minimum fee rate for transactions to enter its mempool. For proper
    /// transaction propagation, transactions should have a fee rate at least as high as the maximum fee
    /// filter received. This method returns the maximum fee rate requirement of all connected peers.
    ///
    /// For more information, refer to BIP-133.
    ///
    /// # Errors
    ///
    /// If the node has stopped running.
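    ///
    /// # Example
    ///
    /// A sketch of gating a broadcast on the current minimum, where `my_fee_rate` and
    /// `my_transaction` are assumed to be provided by the caller:
    ///
    /// ```ignore
    /// let min_fee_rate = requester.broadcast_min_feerate().await?;
    /// if my_fee_rate >= min_fee_rate {
    ///     requester.broadcast_random(my_transaction)?;
    /// }
    /// ```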
    pub async fn broadcast_min_feerate(&self) -> Result<FeeRate, FetchFeeRateError> {
        let (tx, rx) = tokio::sync::oneshot::channel::<FeeRate>();
        self.ntx
            .send(ClientMessage::GetBroadcastMinFeeRate(tx))
            .map_err(|_| FetchFeeRateError::SendError)?;
        rx.await.map_err(|_| FetchFeeRateError::RecvError)
    }

    /// Get a header at the specified height, if it exists.
    ///
    /// # Note
    ///
    /// The height of a header is its canonical index in the chain.
    /// For example, the genesis block is at a height of zero.
    ///
    /// # Errors
    ///
    /// If the node has stopped running.
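    ///
    /// # Example
    ///
    /// A sketch of fetching the genesis header and reading its block hash:
    ///
    /// ```ignore
    /// let genesis = requester.get_header(0).await?;
    /// println!("genesis block hash: {}", genesis.block_hash());
    /// ```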
    pub async fn get_header(&self, height: u32) -> Result<Header, FetchHeaderError> {
        let (tx, rx) = tokio::sync::oneshot::channel::<Result<Header, FetchHeaderError>>();
        let message = HeaderRequest::new(tx, height);
        self.ntx
            .send(ClientMessage::GetHeader(message))
            .map_err(|_| FetchHeaderError::SendError)?;
        rx.await.map_err(|_| FetchHeaderError::RecvError)?
    }

    /// Get the headers within the specified range of heights.
    ///
    /// # Errors
    ///
    /// If the node has stopped running.
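    ///
    /// # Example
    ///
    /// A sketch of fetching a batch of headers; the heights used here are arbitrary:
    ///
    /// ```ignore
    /// let headers = requester.get_header_range(840_000..840_010).await?;
    /// for (height, header) in headers {
    ///     println!("{height}: {}", header.block_hash());
    /// }
    /// ```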
    pub async fn get_header_range(
        &self,
        range: Range<u32>,
    ) -> Result<BTreeMap<u32, Header>, FetchHeaderError> {
        let (tx, rx) =
            tokio::sync::oneshot::channel::<Result<BTreeMap<u32, Header>, FetchHeaderError>>();
        let message = BatchHeaderRequest::new(tx, range);
        self.ntx
            .send(ClientMessage::GetHeaderBatch(message))
            .map_err(|_| FetchHeaderError::SendError)?;
        rx.await.map_err(|_| FetchHeaderError::RecvError)?
    }

    /// Request a block be fetched. Note that this method will request a block
    /// from a connected peer's inventory, and may take an indefinite amount of
    /// time until a peer responds.
    ///
    /// # Errors
    ///
    /// If the node has stopped running.
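    ///
    /// # Example
    ///
    /// A sketch of fetching a block, assuming a `block_hash` of interest is already known:
    ///
    /// ```ignore
    /// let indexed_block = requester.get_block(block_hash).await?;
    /// println!(
    ///     "got block {} at height {}",
    ///     indexed_block.block.block_hash(),
    ///     indexed_block.height
    /// );
    /// ```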
    pub async fn get_block(&self, block_hash: BlockHash) -> Result<IndexedBlock, FetchBlockError> {
        let (tx, rx) = tokio::sync::oneshot::channel::<Result<IndexedBlock, FetchBlockError>>();
        let message = BlockRequest::new(tx, block_hash);
        self.ntx
            .send(ClientMessage::GetBlock(message))
            .map_err(|_| FetchBlockError::SendError)?;
        rx.await.map_err(|_| FetchBlockError::RecvError)?
    }

    /// Request a block be fetched and receive a [`tokio::sync::oneshot::Receiver`]
    /// to await the resulting block.
    ///
    /// # Errors
    ///
    /// If the node has stopped running.
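    ///
    /// # Example
    ///
    /// A sketch of issuing the request now and awaiting the result later, assuming a
    /// `block_hash` of interest is already known:
    ///
    /// ```ignore
    /// let block_rx = requester.request_block(block_hash)?;
    /// // ... do other work while the node fetches the block ...
    /// let indexed_block = block_rx.await.expect("sender was not dropped")?;
    /// ```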
    pub fn request_block(
        &self,
        block_hash: BlockHash,
    ) -> Result<oneshot::Receiver<Result<IndexedBlock, FetchBlockError>>, FetchBlockError> {
        let (tx, rx) = tokio::sync::oneshot::channel::<Result<IndexedBlock, FetchBlockError>>();
        let message = BlockRequest::new(tx, block_hash);
        self.ntx
            .send(ClientMessage::GetBlock(message))
            .map_err(|_| FetchBlockError::SendError)?;
        Ok(rx)
    }

    /// Fetch the average fee rate for the given block hash.
    ///
    /// Computed by taking (`coinbase output amount` - `block subsidy`) / `block weight`. Note that
    /// this value may provide skewed estimates, as averages are more affected by outliers than
    /// medians. For a rudimentary estimation of the fee rate required to enter the next block,
    /// this method may suffice.
    ///
    /// # Errors
    ///
    /// If the node has stopped running.
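    ///
    /// # Example
    ///
    /// A sketch of estimating a fee rate from a block hash of interest:
    ///
    /// ```ignore
    /// let fee_rate = requester.average_fee_rate(block_hash).await?;
    /// println!("average fee rate: {} sat/vB", fee_rate.to_sat_per_vb_floor());
    /// ```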
    pub async fn average_fee_rate(
        &self,
        block_hash: BlockHash,
    ) -> Result<FeeRate, FetchBlockError> {
        let (tx, rx) = tokio::sync::oneshot::channel::<Result<IndexedBlock, FetchBlockError>>();
        let message = BlockRequest::new(tx, block_hash);
        self.ntx
            .send(ClientMessage::GetBlock(message))
            .map_err(|_| FetchBlockError::SendError)?;
        let indexed_block = rx.await.map_err(|_| FetchBlockError::RecvError)??;
        let subsidy = block_subsidy(indexed_block.height);
        let weight = indexed_block.block.weight();
        let revenue = indexed_block
            .block
            .txdata
            .first()
            .map(|tx| tx.output.iter().map(|txout| txout.value).sum())
            .unwrap_or(Amount::ZERO);
        let block_fees = revenue.checked_sub(subsidy).unwrap_or(Amount::ZERO);
        // Guard against blocks lighter than one kilo weight unit, which would otherwise divide by zero.
        let fee_rate = block_fees
            .to_sat()
            .checked_div(weight.to_kwu_floor())
            .unwrap_or_default();
        Ok(FeeRate::from_sat_per_kwu(fee_rate))
    }

    /// Starting after the configured checkpoint, re-emit all block filters.
    ///
    /// # Errors
    ///
    /// If the node has stopped running.
    pub fn rescan(&self) -> Result<(), ClientError> {
        self.ntx
            .send(ClientMessage::Rescan)
            .map_err(|_| ClientError::SendError)
    }

    /// Set a new timeout for connected peers to respond to messages.
    ///
    /// # Errors
    ///
    /// If the node has stopped running.
    pub fn set_response_timeout(&self, duration: Duration) -> Result<(), ClientError> {
        self.ntx
            .send(ClientMessage::SetDuration(duration))
            .map_err(|_| ClientError::SendError)
    }

    /// Add another known peer to connect to.
    ///
    /// # Errors
    ///
    /// If the node has stopped running.
    pub fn add_peer(&self, peer: impl Into<TrustedPeer>) -> Result<(), ClientError> {
        self.ntx
            .send(ClientMessage::AddPeer(peer.into()))
            .map_err(|_| ClientError::SendError)
    }

    /// Check if the node is running.
    pub fn is_running(&self) -> bool {
        self.ntx.send(ClientMessage::NoOp).is_ok()
    }
}

impl<T> From<mpsc::error::SendError<T>> for ClientError {
    fn from(_: mpsc::error::SendError<T>) -> Self {
        ClientError::SendError
    }
}

#[cfg(test)]
mod tests {
    use bitcoin::{consensus::deserialize, Transaction};
    use tokio::sync::mpsc;

    use super::*;

    #[tokio::test]
    async fn test_client_works() {
        let transaction: Transaction = deserialize(&hex::decode("0200000001aad73931018bd25f84ae400b68848be09db706eac2ac18298babee71ab656f8b0000000048473044022058f6fc7c6a33e1b31548d481c826c015bd30135aad42cd67790dab66d2ad243b02204a1ced2604c6735b6393e5b41691dd78b00f0c5942fb9f751856faa938157dba01feffffff0280f0fa020000000017a9140fb9463421696b82c833af241c78c17ddbde493487d0f20a270100000017a91429ca74f8a08f81999428185c97b5d852e4063f618765000000").unwrap()).unwrap();
        let (_, info_rx) = tokio::sync::mpsc::channel::<Info>(1);
        let (_, warn_rx) = tokio::sync::mpsc::unbounded_channel::<Warning>();
        let (_, event_rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
        let (ctx, crx) = mpsc::unbounded_channel::<ClientMessage>();
        let Client {
            requester,
            info_rx: _,
            warn_rx: _,
            event_rx: _,
        } = Client::new(info_rx, warn_rx, event_rx, ctx);
        let broadcast = requester.broadcast_tx(TxBroadcast::new(
            transaction.clone(),
            crate::TxBroadcastPolicy::AllPeers,
        ));
        assert!(broadcast.is_ok());
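        // Dropping the node-side receiver simulates a node that has stopped running,
        // so subsequent requests should fail.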
        drop(crx);
        let broadcast = requester.shutdown();
        assert!(broadcast.is_err());
    }
}