solana_transaction_utils/
blockhash_watcher.rs1use crate::Error;
2use futures::{future, TryFutureExt};
3use solana_client::nonblocking::rpc_client::RpcClient;
4use solana_sdk::hash::Hash;
5use std::{sync::Arc, time::Duration};
6use tokio::{sync::watch, time};
7use tokio_graceful_shutdown::SubsystemHandle;
8use tracing::{info, warn};
9
10pub type MessageSender = watch::Sender<BlockHashData>;
11pub type MessageReceiver = watch::Receiver<BlockHashData>;
12pub const BLOCKHASH_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
13
14pub fn last_valid<T>(receiver: &watch::Receiver<T>) -> watch::Ref<'_, T>
15where
16 T: Clone,
17{
18 receiver.borrow()
19}
20
21#[derive(Debug, Clone, Default)]
22pub struct BlockHashData {
23 pub last_valid_block_height: u64,
24 pub last_valid_blockhash: Hash,
25 pub current_block_height: u64,
26}
27
28#[derive(Clone)]
29pub struct BlockhashWatcher {
30 watch: MessageSender,
31 interval: Duration,
32 client: Arc<RpcClient>,
33}
34
35impl BlockhashWatcher {
36 pub fn new(interval: Duration, client: Arc<RpcClient>) -> Self {
37 let (watch, _) = watch::channel(Default::default());
38 Self {
39 watch,
40 interval,
41 client,
42 }
43 }
44
45 pub fn watcher(&mut self) -> MessageReceiver {
46 self.watch.subscribe()
47 }
48
49 pub async fn run(mut self, shutdown: SubsystemHandle) -> Result<(), Error> {
50 info!("starting");
51 let mut interval = time::interval(self.interval);
52 loop {
53 tokio::select! {
54 _ = shutdown.on_shutdown_requested() => {
55 info!("shutting down");
56 return Ok(());
57 }
58 _ = interval.tick() => {
59 match self.fetch_data(&shutdown).await {
60 Ok(Some(new_data)) => {
61 let _ = self.watch.send_replace(new_data);
62 }
63 Ok(None) => (),
64 Err(err) => warn!(?err, "failed to get block hash data"),
65 };
66 }
67 }
68 }
69 }
70
71 pub async fn fetch_data(
72 &mut self,
73 shutdown: &SubsystemHandle,
74 ) -> Result<Option<BlockHashData>, Error> {
75 let fetch_fut = future::try_join(
76 self.client
77 .get_latest_blockhash_with_commitment(self.client.commitment()),
78 self.client.get_block_height(),
79 )
80 .map_err(Error::from)
81 .map_ok(
82 |((last_valid_blockhash, last_valid_block_height), current_block_height)| {
83 BlockHashData {
84 last_valid_block_height,
85 last_valid_blockhash,
86 current_block_height,
87 }
88 },
89 );
90 tokio::select! {
91 result = fetch_fut => result.map(Some),
92 _ = shutdown.on_shutdown_requested() => Ok(None)
93 }
94 }
95}