kyoto/
messages.rs

1use std::{collections::BTreeMap, ops::Range, time::Duration};
2
3use bitcoin::{block::Header, p2p::message_network::RejectReason, BlockHash, FeeRate, Wtxid};
4use tokio::sync::oneshot;
5
6use crate::IndexedFilter;
7use crate::{
8    chain::{checkpoints::HeaderCheckpoint, IndexedHeader},
9    IndexedBlock, TrustedPeer, TxBroadcast,
10};
11
12use super::error::{FetchBlockError, FetchHeaderError};
13
14/// Informational messages emitted by a node
15#[derive(Debug, Clone)]
16pub enum Info {
17    /// The node was able to successfully complete a version handshake.
18    SuccessfulHandshake,
19    /// The node is connected to all required peers.
20    ConnectionsMet,
21    /// The progress of the node during the block filter download process.
22    Progress(Progress),
23    /// There was an update to the header chain.
24    NewChainHeight(u32),
25    /// A peer served a new fork.
26    NewFork {
27        /// The tip of the new fork.
28        tip: IndexedHeader,
29    },
30    /// A transaction was sent to a peer. The `wtxid` was advertised to the
31    /// peer, and the peer responded with `getdata`. The transaction was then serialized and sent
32    /// over the wire. This is a strong indication the transaction will propagate, but not
33    /// guaranteed. You may receive duplicate messages for a given `wtxid` given your broadcast
34    /// policy.
35    TxGossiped(Wtxid),
36    /// A requested block has been received and is being processed.
37    BlockReceived(BlockHash),
38}
39
40impl core::fmt::Display for Info {
41    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
42        match self {
43            Info::SuccessfulHandshake => write!(f, "Successful version handshake with a peer"),
44            Info::TxGossiped(txid) => write!(f, "Transaction gossiped: {txid}"),
45            Info::ConnectionsMet => write!(f, "Required connections met"),
46            Info::Progress(p) => {
47                let progress_percent = p.percentage_complete();
48                write!(f, "Percent complete: {progress_percent}")
49            }
50            Info::NewChainHeight(height) => write!(f, "New chain height: {height}"),
51            Info::NewFork { tip } => write!(f, "New fork {} -> {}", tip.height, tip.block_hash()),
52            Info::BlockReceived(hash) => write!(f, "Received block {hash}"),
53        }
54    }
55}
56
57/// Data and structures useful for a consumer, such as a wallet.
58#[derive(Debug, Clone)]
59pub enum Event {
60    /// A relevant [`Block`](crate).
61    /// Note that the block may not contain any transactions contained in the script set.
62    /// This is due to block filters having a non-zero false-positive rate when compressing data.
63    Block(IndexedBlock),
64    /// The node is fully synced, having scanned the requested range.
65    FiltersSynced(SyncUpdate),
66    /// Blocks were reorganized out of the chain.
67    BlocksDisconnected {
68        /// Blocks that were accepted to the chain of most work in ascending order by height.
69        accepted: Vec<IndexedHeader>,
70        /// Blocks that were disconnected from the chain of most work in ascending order by height.
71        disconnected: Vec<IndexedHeader>,
72    },
73    /// A compact block filter with associated height and block hash.
74    IndexedFilter(IndexedFilter),
75}
76
77/// The node has synced to a new tip of the chain.
78#[derive(Debug, Clone)]
79pub struct SyncUpdate {
80    /// Last known tip of the blockchain
81    pub tip: HeaderCheckpoint,
82    /// Ten recent headers ending with the tip
83    pub recent_history: BTreeMap<u32, Header>,
84}
85
86impl SyncUpdate {
87    pub(crate) fn new(tip: HeaderCheckpoint, recent_history: BTreeMap<u32, Header>) -> Self {
88        Self {
89            tip,
90            recent_history,
91        }
92    }
93
94    /// Get the tip of the blockchain after this sync.
95    pub fn tip(&self) -> HeaderCheckpoint {
96        self.tip
97    }
98
99    /// Get the ten most recent blocks in chronological order after this sync.
100    pub fn recent_history(&self) -> &BTreeMap<u32, Header> {
101        &self.recent_history
102    }
103}
104
/// The progress of the node during the block filter download process.
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub struct Progress {
    /// The number of filter headers that have been assumed checked and downloaded.
    pub filter_headers: u32,
    /// The number of block filters that have been assumed checked and downloaded.
    pub filters: u32,
    /// The number of filters to check.
    pub total_to_check: u32,
}

impl Progress {
    /// Build a progress report from the two running counters and the size of the range.
    pub(crate) fn new(filter_headers: u32, filters: u32, total_to_check: u32) -> Self {
        Self {
            filter_headers,
            filters,
            total_to_check,
        }
    }

    /// The total progress represented as a percent.
    ///
    /// This is [`Progress::fraction_complete`] scaled by 100, so an empty range
    /// (`total_to_check == 0`) reports `100.0`.
    pub fn percentage_complete(&self) -> f32 {
        self.fraction_complete() * 100.0
    }

    /// The total progress represented as a fraction.
    ///
    /// Filter headers and filters each count for half of the work, so the result is
    /// `(filter_headers + filters) / (2 * total_to_check)`.
    ///
    /// When `total_to_check` is zero there is nothing left to do and `1.0` is returned,
    /// rather than the `NaN` or infinity a division by zero would produce.
    pub fn fraction_complete(&self) -> f32 {
        if self.total_to_check == 0 {
            return 1.0;
        }
        // Widen before adding or doubling: two `u32` counters (or twice the total) may not
        // fit in a `u32`, which would panic in debug builds and wrap in release builds.
        let completed = u64::from(self.filter_headers) + u64::from(self.filters);
        let total = 2 * u64::from(self.total_to_check);
        completed as f32 / total as f32
    }
}
137
138/// An attempt to broadcast a transaction failed.
139#[derive(Debug, Clone, Copy)]
140pub struct RejectPayload {
141    /// An enumeration of the reason for the transaction failure. If none is provided, the message could not be sent over the wire.
142    pub reason: Option<RejectReason>,
143    /// The transaction that was rejected or failed to broadcast.
144    pub wtxid: Wtxid,
145}
146
147/// Commands to issue a node.
148#[derive(Debug)]
149pub(crate) enum ClientMessage {
150    /// Stop the node.
151    Shutdown,
152    /// Broadcast a [`crate::Transaction`] with a [`crate::TxBroadcastPolicy`].
153    Broadcast(TxBroadcast),
154    /// Starting at the configured anchor checkpoint, re-emit all filters.
155    Rescan,
156    /// Explicitly request a block from the node.
157    GetBlock(BlockRequest),
158    /// Set a new connection timeout.
159    SetDuration(Duration),
160    /// Add another known peer to connect to.
161    AddPeer(TrustedPeer),
162    /// Request a header from a specified height.
163    GetHeader(HeaderRequest),
164    /// Request a range of headers.
165    GetHeaderBatch(BatchHeaderRequest),
166    /// Request the broadcast minimum fee rate.
167    GetBroadcastMinFeeRate(FeeRateSender),
168    /// Send an empty message to see if the node is running.
169    NoOp,
170}
171
172type HeaderSender = tokio::sync::oneshot::Sender<Result<Header, FetchHeaderError>>;
173
174#[derive(Debug)]
175pub(crate) struct HeaderRequest {
176    pub(crate) oneshot: HeaderSender,
177    pub(crate) height: u32,
178}
179
180impl HeaderRequest {
181    pub(crate) fn new(oneshot: HeaderSender, height: u32) -> Self {
182        Self { oneshot, height }
183    }
184}
185
186type BatchHeaderSender =
187    tokio::sync::oneshot::Sender<Result<BTreeMap<u32, Header>, FetchHeaderError>>;
188
189#[derive(Debug)]
190pub(crate) struct BatchHeaderRequest {
191    pub(crate) oneshot: BatchHeaderSender,
192    pub(crate) range: Range<u32>,
193}
194
195impl BatchHeaderRequest {
196    pub(crate) fn new(oneshot: BatchHeaderSender, range: Range<u32>) -> Self {
197        Self { oneshot, range }
198    }
199}
200
201pub(crate) type FeeRateSender = tokio::sync::oneshot::Sender<FeeRate>;
202
203#[derive(Debug)]
204pub(crate) struct BlockRequest {
205    pub(crate) oneshot: oneshot::Sender<Result<IndexedBlock, FetchBlockError>>,
206    pub(crate) hash: BlockHash,
207}
208
209impl BlockRequest {
210    pub(crate) fn new(
211        oneshot: oneshot::Sender<Result<IndexedBlock, FetchBlockError>>,
212        hash: BlockHash,
213    ) -> Self {
214        Self { oneshot, hash }
215    }
216}
217
218/// Warnings a node may issue while running.
219#[derive(Debug, Clone)]
220pub enum Warning {
221    /// The node is looking for connections to peers.
222    NeedConnections {
223        /// The number of live connections.
224        connected: usize,
225        /// The configured requirement.
226        required: usize,
227    },
228    /// A connection to a peer timed out.
229    PeerTimedOut,
230    /// The node was unable to connect to a peer in the database.
231    CouldNotConnect,
232    /// A connection was maintained, but the peer does not signal for compact block filers.
233    NoCompactFilters,
234    /// The node has been waiting for new `inv` and will find new peers to avoid block withholding.
235    PotentialStaleTip,
236    /// A peer sent us a peer-to-peer message the node did not request.
237    UnsolicitedMessage,
238    /// The provided anchor is deeper than the database history.
239    /// Recoverable by deleting the headers from the database or starting from a higher point in the chain.
240    InvalidStartHeight,
241    /// The headers in the database do not link together. Recoverable by deleting the database.
242    CorruptedHeaders,
243    /// A transaction got rejected, likely for being an insufficient fee or non-standard transaction.
244    TransactionRejected {
245        /// The transaction ID and reject reason, if it exists.
246        payload: RejectPayload,
247    },
248    /// A database failed to persist some data.
249    FailedPersistence {
250        /// Additional context for the persistence failure.
251        warning: String,
252    },
253    /// The peer sent us a potential fork.
254    EvaluatingFork,
255    /// The peer database has no values.
256    EmptyPeerDatabase,
257    /// An unexpected error occurred processing a peer-to-peer message.
258    UnexpectedSyncError {
259        /// Additional context as to why block syncing failed.
260        warning: String,
261    },
262    /// A channel that was supposed to receive a message was dropped.
263    ChannelDropped,
264}
265
266impl core::fmt::Display for Warning {
267    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
268        match self {
269            Warning::NeedConnections {
270                connected,
271                required,
272            } => {
273                write!(
274                    f,
275                    "Looking for connections to peers. Connected: {connected}, Required: {required}"
276                )
277            }
278            Warning::InvalidStartHeight => write!(
279                f,
280                "The provided anchor is deeper than the database history."
281            ),
282            Warning::CouldNotConnect => {
283                write!(f, "An attempted connection failed or timed out.")
284            }
285            Warning::NoCompactFilters => {
286                write!(f, "A connected peer does not serve compact block filters.")
287            }
288            Warning::PotentialStaleTip => {
289                write!(
290                    f,
291                    "The node has been running for a long duration without receiving new blocks."
292                )
293            }
294            Warning::TransactionRejected { payload } => {
295                write!(f, "A transaction got rejected: WTXID {}", payload.wtxid)
296            }
297            Warning::FailedPersistence { warning } => {
298                write!(f, "A database failed to persist some data: {warning}")
299            }
300            Warning::EvaluatingFork => write!(f, "Peer sent us a potential fork."),
301            Warning::EmptyPeerDatabase => write!(f, "The peer database has no values."),
302            Warning::UnexpectedSyncError { warning } => {
303                write!(f, "Error handling a P2P message: {warning}")
304            }
305            Warning::CorruptedHeaders => {
306                write!(f, "The headers in the database do not link together.")
307            }
308            Warning::PeerTimedOut => {
309                write!(f, "A connection to a peer timed out.")
310            }
311            Warning::UnsolicitedMessage => {
312                write!(
313                    f,
314                    "A peer sent us a peer-to-peer message the node did not request."
315                )
316            }
317            Warning::ChannelDropped => {
318                write!(
319                    f,
320                    "A channel that was supposed to receive a message was dropped."
321                )
322            }
323        }
324    }
325}