use crate::{
rpc,
sync::{AccountUpdate, SolanaChangeListener, SubRequest},
Error, Network, Result,
};
use dashmap::DashMap;
use solana_sdk::{
account::Account, commitment_config::CommitmentLevel, pubkey::Pubkey,
};
use std::{sync::Arc, time::Duration};
use tokio::{
sync::{
broadcast::{self, Receiver, Sender},
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
},
task::JoinHandle,
};
use tracing::{debug, error, info};
use tracing_futures::Instrument;
/// Map of all mirrored accounts: pubkey -> (latest known account state, flag).
// NOTE(review): the bool looks like an "updated" marker set by the sync
// worker on every incoming change — confirm its consumer in the sync module.
pub(crate) type AccountsMap = DashMap<Pubkey, (Account, bool)>;
/// A subscription request paired with an optional oneshot sender used to
/// hand the initial account snapshot(s) back to the requesting caller.
pub(crate) type SubRequestCall =
    (SubRequest, Option<oneshot::Sender<Vec<Account>>>);
/// Default capacity of the broadcast updates channel; subscribers that fall
/// further behind than this observe lag (see `SyncOptions::max_lag`).
const MAX_UPDATES_SUBSCRIBER_LAG: usize = 64;
/// Minimum allowed `SyncOptions::reconnect_every` interval, in seconds.
const MIN_RECONNECT_EVERY: u64 = 5;
/// Configuration for a [`BlockchainShadow`] instance.
#[derive(Clone)]
pub struct SyncOptions {
    /// The Solana cluster to connect to.
    pub network: Network,
    /// Per-subscriber buffer size of the updates broadcast channel;
    /// falls back to `MAX_UPDATES_SUBSCRIBER_LAG` when `None`.
    pub max_lag: Option<usize>,
    /// When set, all subscriptions are periodically reestablished at this
    /// interval; must be at least `MIN_RECONNECT_EVERY` seconds.
    pub reconnect_every: Option<Duration>,
    /// Timeout applied to RPC requests.
    pub rpc_timeout: Duration,
    /// Timeout for establishing the websocket connection.
    pub ws_connect_timeout: Duration,
    /// Commitment level used for subscriptions and queries.
    pub commitment: CommitmentLevel,
}
impl Default for SyncOptions {
fn default() -> Self {
Self {
network: Network::Mainnet,
max_lag: None,
reconnect_every: None,
commitment: CommitmentLevel::Finalized,
rpc_timeout: Duration::from_secs(12),
ws_connect_timeout: Duration::from_secs(12),
}
}
}
/// A local, continuously-synchronised mirror of a set of Solana accounts.
pub struct BlockchainShadow {
    /// Options this instance was created with.
    options: SyncOptions,
    /// Shared map of mirrored accounts, written by the sync worker.
    accounts: Arc<AccountsMap>,
    /// Channel for sending subscription requests to the sync worker.
    sub_req: UnboundedSender<SubRequestCall>,
    /// Handle to the background task that owns the change listener.
    sync_worker: Option<JoinHandle<Result<()>>>,
    /// Handle to the optional periodic-reconnect task.
    monitor_worker: Option<JoinHandle<Result<()>>>,
    /// Broadcast side of the external account-updates channel.
    ext_updates: Sender<(Pubkey, Account)>,
}
impl BlockchainShadow {
    /// Creates an empty shadow and spawns its background workers: the sync
    /// worker that owns the websocket listener and, when
    /// `options.reconnect_every` is set, a monitor worker that triggers
    /// periodic resubscription.
    ///
    /// # Panics
    ///
    /// Panics if `options.reconnect_every` is shorter than
    /// `MIN_RECONNECT_EVERY` seconds, since very frequent reconnects cause
    /// unstable behaviour.
    pub async fn new(options: SyncOptions) -> Result<Self> {
        let max_lag = options.max_lag.unwrap_or(MAX_UPDATES_SUBSCRIBER_LAG);
        if options
            .reconnect_every
            .map(|v| v < Duration::from_secs(MIN_RECONNECT_EVERY))
            .unwrap_or(false)
        {
            panic!("low reconnect_every duration causes unstable behaviour, minimum reconnect_every value is {} seconds", MIN_RECONNECT_EVERY)
        }
        let (subscribe_tx, subscribe_rx) = unbounded_channel::<SubRequestCall>();
        let mut instance = Self {
            options,
            accounts: Arc::new(AccountsMap::new()),
            sync_worker: None,
            monitor_worker: None,
            sub_req: subscribe_tx,
            ext_updates: broadcast::channel(max_lag).0,
        };
        instance.create_worker(subscribe_rx).await?;
        instance.create_monitor_worker().await?;
        Ok(instance)
    }

    /// Subscribes to every account in `accounts`; returns their initial
    /// snapshots in the same order (`None` for accounts that don't exist).
    pub async fn add_accounts(
        &mut self,
        accounts: &[Pubkey],
    ) -> Result<Vec<Option<Account>>> {
        // Size is known up front; avoid incremental reallocation.
        let mut result = Vec::with_capacity(accounts.len());
        for key in accounts {
            result.push(self.add_account(key).await?);
        }
        Ok(result)
    }

    /// Subscribes to a single account and returns its current state, or
    /// `None` if the account does not exist (yet).
    pub async fn add_account(
        &mut self,
        account: &Pubkey,
    ) -> Result<Option<Account>> {
        let (response_tx, response_rx) = oneshot::channel::<Vec<Account>>();
        // `UnboundedSender::send` takes `&self` — no clone required.
        self.sub_req
            .send((SubRequest::Account(*account), Some(response_tx)))
            .map_err(|_| Error::InternalError)?;
        Ok(response_rx.await?.pop())
    }

    /// Subscribes to all accounts owned by `program_id` and returns their
    /// initial snapshots.
    pub async fn add_program(
        &mut self,
        program_id: &Pubkey,
    ) -> Result<Vec<Account>> {
        let (response_tx, response_rx) = oneshot::channel();
        self.sub_req
            .send((SubRequest::Program(*program_id), Some(response_tx)))
            .map_err(|_| Error::InternalError)?;
        Ok(response_rx.await?)
    }

    /// Convenience constructor: [`Self::new`] followed by
    /// [`Self::add_accounts`].
    pub async fn new_for_accounts(
        accounts: &[Pubkey],
        options: SyncOptions,
    ) -> Result<Self> {
        let mut instance = BlockchainShadow::new(options).await?;
        instance.add_accounts(accounts).await?;
        Ok(instance)
    }

    /// Convenience constructor: [`Self::new`] followed by
    /// [`Self::add_program`].
    pub async fn new_for_program(
        program: &Pubkey,
        options: SyncOptions,
    ) -> Result<Self> {
        let mut instance = BlockchainShadow::new(options).await?;
        instance.add_program(program).await?;
        Ok(instance)
    }

    /// The network this shadow is connected to.
    pub const fn network(&self) -> &Network {
        &self.options.network
    }

    /// Number of accounts currently mirrored.
    pub fn len(&self) -> usize {
        self.accounts.len()
    }

    /// `true` when no accounts are mirrored.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Invokes `op` for every mirrored account.
    pub fn for_each_account(&self, mut op: impl FnMut(&Pubkey, &Account)) {
        for pair in self.accounts.iter() {
            let (pubkey, (account, _)) = pair.pair();
            op(pubkey, account);
        }
    }

    /// Returns a copy of the latest known state of `key`, if mirrored.
    pub fn get_account(&self, key: &Pubkey) -> Option<Account> {
        // Clone only the Account, not the whole (Account, bool) entry.
        self.accounts.get(key).map(|entry| entry.value().0.clone())
    }

    /// Consumes the shadow and waits for the sync worker to finish.
    ///
    /// The monitor worker (if any) is aborted first; otherwise its
    /// `JoinHandle` would be dropped, detaching a task that loops forever
    /// sending reconnect requests into a dead channel.
    pub async fn worker(mut self) -> Result<()> {
        if let Some(monitor) = self.monitor_worker.take() {
            monitor.abort();
        }
        match self.sync_worker.take() {
            Some(handle) => Ok(handle.await??),
            None => Err(Error::WorkerDead),
        }
    }

    /// Creates a new receiver on the broadcast channel of account updates.
    /// Receivers that fall more than `max_lag` messages behind observe lag.
    pub fn updates_channel(&self) -> Receiver<(Pubkey, Account)> {
        self.ext_updates.subscribe()
    }
}
impl BlockchainShadow {
async fn create_worker(
&mut self,
mut subscribe_rx: UnboundedReceiver<SubRequestCall>,
) -> Result<()> {
let accs_ref = self.accounts.clone();
let updates_tx = self.ext_updates.clone();
let options = self.options.clone();
let client = rpc::ClientBuilder::new(
self.network().rpc_url(),
self.options.rpc_timeout,
self.options.commitment,
);
self.sync_worker = Some(tokio::spawn(
async move {
let mut listener =
SolanaChangeListener::new(client, accs_ref.clone(), options).await?;
loop {
tokio::select! {
recv_result = listener.recv() => {
match recv_result {
Ok(Some(AccountUpdate { pubkey, account })) => {
debug!("account {} updated", &pubkey);
accs_ref.insert(pubkey, ( account.clone(), true ));
if updates_tx.receiver_count() != 0 {
updates_tx.send((pubkey, account)).unwrap();
}
},
Ok(None) => {
tracing::error!("unreachable recv_result reached??");
unreachable!();
},
Err(e) => {
error!("error in the sync worker thread: {:?}", e);
listener.reconnect_all().await?;
}
}
},
Some(subreq) = subscribe_rx.recv() => {
match subreq {
( SubRequest::Account(pubkey), oneshot ) => {
debug!(?pubkey, "subscribe_account recv");
listener.subscribe_account(pubkey, oneshot).await? },
( SubRequest::Program(pubkey), oneshot ) => {
debug!(?pubkey, "subscribe_program recv");
listener.subscribe_program(pubkey, oneshot).await? },
( SubRequest::ReconnectAll, _ ) => {
debug!("reconnect_all recv");
listener.reconnect_all().await?
}
}
}
};
}
}
.instrument(tracing::debug_span!("worker_loop")),
));
Ok(())
}
async fn create_monitor_worker(&mut self) -> Result<()> {
if let Some(every) = self.options.reconnect_every {
let channel = self.sub_req.clone();
self.monitor_worker = Some(tokio::spawn(
async move {
loop {
tokio::time::sleep(every).await;
info!("reestablising connection to solana");
if let Err(e) = channel.send((SubRequest::ReconnectAll, None)) {
tracing::error!(?e)
}
}
}
.instrument(tracing::debug_span!("monitor_worker_loop")),
));
}
Ok(())
}
}