solana_transaction_utils/
blockhash_watcher.rs

1use crate::Error;
2use futures::{future, TryFutureExt};
3use solana_client::nonblocking::rpc_client::RpcClient;
4use solana_sdk::hash::Hash;
5use std::{sync::Arc, time::Duration};
6use tokio::{sync::watch, time};
7use tokio_graceful_shutdown::SubsystemHandle;
8use tracing::{info, warn};
9
10pub type MessageSender = watch::Sender<BlockHashData>;
11pub type MessageReceiver = watch::Receiver<BlockHashData>;
12pub const BLOCKHASH_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
13
14pub fn last_valid<T>(receiver: &watch::Receiver<T>) -> watch::Ref<'_, T>
15where
16    T: Clone,
17{
18    receiver.borrow()
19}
20
21#[derive(Debug, Clone, Default)]
22pub struct BlockHashData {
23    pub last_valid_block_height: u64,
24    pub last_valid_blockhash: Hash,
25    pub current_block_height: u64,
26}
27
28#[derive(Clone)]
29pub struct BlockhashWatcher {
30    watch: MessageSender,
31    interval: Duration,
32    client: Arc<RpcClient>,
33}
34
35impl BlockhashWatcher {
36    pub fn new(interval: Duration, client: Arc<RpcClient>) -> Self {
37        let (watch, _) = watch::channel(Default::default());
38        Self {
39            watch,
40            interval,
41            client,
42        }
43    }
44
45    pub fn watcher(&mut self) -> MessageReceiver {
46        self.watch.subscribe()
47    }
48
49    pub async fn run(mut self, shutdown: SubsystemHandle) -> Result<(), Error> {
50        info!("starting");
51        let mut interval = time::interval(self.interval);
52        loop {
53            tokio::select! {
54                _ = shutdown.on_shutdown_requested() => {
55                    info!("shutting down");
56                    return Ok(());
57                }
58                _ = interval.tick() => {
59                        match self.fetch_data(&shutdown).await {
60                            Ok(Some(new_data)) => {
61                                let _ = self.watch.send_replace(new_data);
62                            }
63                            Ok(None) => (),
64                            Err(err) => warn!(?err, "failed to get block hash data"),
65                        };
66                }
67            }
68        }
69    }
70
71    pub async fn fetch_data(
72        &mut self,
73        shutdown: &SubsystemHandle,
74    ) -> Result<Option<BlockHashData>, Error> {
75        let fetch_fut = future::try_join(
76            self.client
77                .get_latest_blockhash_with_commitment(self.client.commitment()),
78            self.client.get_block_height(),
79        )
80        .map_err(Error::from)
81        .map_ok(
82            |((last_valid_blockhash, last_valid_block_height), current_block_height)| {
83                BlockHashData {
84                    last_valid_block_height,
85                    last_valid_blockhash,
86                    current_block_height,
87                }
88            },
89        );
90        tokio::select! {
91            result = fetch_fut => result.map(Some),
92            _ = shutdown.on_shutdown_requested() => Ok(None)
93        }
94    }
95}