use std::sync::{
Arc,
atomic::{AtomicU64, AtomicUsize, Ordering},
};
use dashmap::DashMap;
use simulator_client::AccountDiffNotification;
use solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, EncodedTransaction};
use tokio::sync::mpsc::UnboundedSender;
use crate::{
fetch::transaction_from_encoded,
output::{AccountDiff, AccountDiffRow, OutputEvent, Transaction},
};
/// Width, in slots, of the retention window applied by `RecordsStore::insert`:
/// when an insert raises the highest slot seen, entries whose slot is more than
/// this many slots below the new maximum are removed from the map.
const RECORDS_RETENTION_SLOTS: u64 = 64;
/// Shared store of recently inserted transactions plus running counters.
///
/// Every field is behind an `Arc`, so a clone of the store refers to the same
/// map and the same counters as the value it was cloned from.
#[derive(Clone)]
pub(crate) struct RecordsStore {
/// Stored transactions, keyed by the signature string passed to `insert`.
map: Arc<DashMap<String, Transaction>>,
/// Highest `Transaction::slot` passed to `insert` so far (starts at 0).
max_slot: Arc<AtomicU64>,
/// Number of `insert` calls made so far.
total: Arc<AtomicUsize>,
/// Number of `insert` calls whose transaction had `success` set.
successes: Arc<AtomicUsize>,
}
impl RecordsStore {
    /// Creates an empty store with every counter at zero.
    pub(crate) fn new() -> Self {
        let map = Arc::new(DashMap::new());
        let max_slot = Arc::new(AtomicU64::new(0));
        let total = Arc::new(AtomicUsize::new(0));
        let successes = Arc::new(AtomicUsize::new(0));
        Self {
            map,
            max_slot,
            total,
            successes,
        }
    }

    /// Returns how many transactions have been passed to `insert` so far.
    pub(crate) fn total(&self) -> usize {
        let counter = &self.total;
        counter.load(Ordering::Relaxed)
    }

    /// Returns how many of the inserted transactions had `success` set.
    pub(crate) fn successes(&self) -> usize {
        let counter = &self.successes;
        counter.load(Ordering::Relaxed)
    }

    /// Looks up the stored transaction for `sig`.
    ///
    /// The returned guard borrows from the underlying map; `None` means no
    /// entry is currently stored under that signature.
    fn get(&self, sig: &str) -> Option<dashmap::mapref::one::Ref<'_, String, Transaction>> {
        let records = &self.map;
        records.get(sig)
    }

    /// Stores `tx` under `sig`, updates the counters, and prunes old entries.
    ///
    /// The counters are bumped on every call, whether or not `sig` is already
    /// present in the map. Pruning only runs when this transaction's slot is
    /// strictly greater than the previous maximum; it then drops every entry
    /// whose slot is below `slot - RECORDS_RETENTION_SLOTS`.
    fn insert(&self, sig: String, tx: Transaction) {
        self.total.fetch_add(1, Ordering::Relaxed);
        if tx.success {
            self.successes.fetch_add(1, Ordering::Relaxed);
        }

        let tx_slot = tx.slot;
        let previous_max = self.max_slot.fetch_max(tx_slot, Ordering::Relaxed);
        self.map.insert(sig, tx);

        // Only an insert that advances the maximum slot triggers a sweep.
        if tx_slot <= previous_max {
            return;
        }

        // `checked_sub` yields `None` while the slot is still smaller than the
        // retention window; in that case nothing can be old enough to drop.
        if let Some(oldest_kept) = tx_slot.checked_sub(RECORDS_RETENTION_SLOTS) {
            self.map.retain(|_, record| record.slot >= oldest_kept);
        }
    }
}
/// Converts an account-diff notification into an `OutputEvent::Diff` and sends it.
///
/// Notifications lacking either a signature or an account are ignored. If the
/// store holds a transaction for the signature, its `sol_changes` and
/// `token_changes` are passed to `AccountDiffRow::from_diff`; otherwise empty
/// slices are passed instead.
pub(crate) fn on_account_diff_notification(
    records: RecordsStore,
    output_tx: UnboundedSender<OutputEvent>,
    notification: AccountDiffNotification,
) {
    // Both pieces are required to build a row; bail out if either is absent.
    let (Some(signature), Some(account)) = (notification.signature, notification.account) else {
        return;
    };

    let slot = notification.context.slot;
    let tx_index = notification.tx_index;
    let block_time = notification.block_time;
    let diff = AccountDiff {
        account,
        pre_state: notification.pre,
        post_state: notification.post,
    };

    let row = if let Some(cached) = records.get(&signature) {
        AccountDiffRow::from_diff(
            slot,
            &signature,
            tx_index,
            block_time,
            &diff,
            &cached.sol_changes,
            &cached.token_changes,
        )
    } else {
        // No stored transaction for this signature: emit the row without
        // any balance-change data.
        AccountDiffRow::from_diff(slot, &signature, tx_index, block_time, &diff, &[], &[])
    };

    // The send result is deliberately discarded.
    let _ = output_tx.send(OutputEvent::Diff(row));
}
/// Decodes a transaction notification, records it in the store, and emits it
/// as an `OutputEvent::Tx`.
///
/// The notification is dropped with a warning when no primary signature can be
/// extracted from it, or when `transaction_from_encoded` cannot turn it into a
/// `Transaction`. The emitted transaction's `logs` are taken from the
/// notification's `meta.log_messages` (empty when absent).
pub(crate) fn on_transaction_notification(
    records: RecordsStore,
    output_tx: UnboundedSender<OutputEvent>,
    notification: EncodedConfirmedTransactionWithStatusMeta,
) {
    let slot = notification.slot;
    let Some(signature) = primary_signature(&notification.transaction.transaction) else {
        tracing::warn!(
            slot,
            "transactionSubscribe notification missing primary signature; dropping"
        );
        return;
    };

    // Copy the log lines out now: `notification` is moved into
    // `transaction_from_encoded` below and is unavailable afterwards.
    let logs = notification
        .transaction
        .meta
        .as_ref()
        .and_then(|meta| match &meta.log_messages {
            solana_transaction_status::option_serializer::OptionSerializer::Some(v) => {
                Some(v.clone())
            }
            _ => None,
        })
        .unwrap_or_default();

    let Some(mut tx) = transaction_from_encoded(notification, &signature, slot) else {
        tracing::warn!(
            slot,
            signature,
            "transactionSubscribe payload could not be decoded into a Transaction; dropping"
        );
        return;
    };
    // Overwrite whatever `transaction_from_encoded` put in `logs` with the
    // lines captured from the notification metadata.
    tx.logs = logs;

    // The store keeps its own copy; the original is forwarded downstream.
    // The record is inserted before the event is sent.
    records.insert(signature, tx.clone());

    // The send result is deliberately discarded.
    let _ = output_tx.send(OutputEvent::Tx(tx));
}
/// Returns a clone of the first signature listed in `encoded`.
///
/// Only the `Json` and `Accounts` encodings are inspected; both binary
/// encodings yield `None`, as does an empty signature list.
fn primary_signature(encoded: &EncodedTransaction) -> Option<String> {
    let signatures = match encoded {
        EncodedTransaction::Json(ui_tx) => &ui_tx.signatures,
        EncodedTransaction::Accounts(parsed) => &parsed.signatures,
        // Binary payloads are not decoded here.
        EncodedTransaction::LegacyBinary(_) | EncodedTransaction::Binary(_, _) => return None,
    };
    signatures.first().cloned()
}