use std::sync::{
Arc,
atomic::{AtomicU64, AtomicUsize, Ordering},
};
use dashmap::{DashMap, DashSet};
use simulator_client::AccountDiffNotification;
use solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, EncodedTransaction};
use tokio::sync::mpsc::UnboundedSender;
use crate::{
fetch::transaction_from_encoded,
output::{AccountDiff, AccountDiffRow, OutputEvent, SolChange, TokenChange, Transaction},
};
const RECORDS_RETENTION_SLOTS: u64 = 64;
#[derive(Clone)]
pub(crate) struct RecordsStore {
transactions: Arc<DashMap<String, BalanceChanges>>,
account_diffs: Arc<DashSet<DiffKey>>,
tx_max_slot: Arc<AtomicU64>,
diff_max_slot: Arc<AtomicU64>,
total: Arc<AtomicUsize>,
successes: Arc<AtomicUsize>,
}
type DiffKey = (u64, String, String);
struct BalanceChanges {
slot: u64,
sol_changes: Vec<SolChange>,
token_changes: Vec<TokenChange>,
}
impl RecordsStore {
pub(crate) fn new() -> Self {
Self {
transactions: Arc::new(DashMap::new()),
account_diffs: Arc::new(DashSet::new()),
tx_max_slot: Arc::new(AtomicU64::new(0)),
diff_max_slot: Arc::new(AtomicU64::new(0)),
total: Arc::new(AtomicUsize::new(0)),
successes: Arc::new(AtomicUsize::new(0)),
}
}
pub(crate) fn total(&self) -> usize {
self.total.load(Ordering::Relaxed)
}
pub(crate) fn successes(&self) -> usize {
self.successes.load(Ordering::Relaxed)
}
fn get(&self, sig: &str) -> Option<dashmap::mapref::one::Ref<'_, String, BalanceChanges>> {
self.transactions.get(sig)
}
fn note_transaction(&self, sig: String, tx: &Transaction) -> bool {
let stored = BalanceChanges {
slot: tx.slot,
sol_changes: tx.sol_changes.clone(),
token_changes: tx.token_changes.clone(),
};
if self.transactions.insert(sig, stored).is_some() {
return false;
}
self.total.fetch_add(1, Ordering::Relaxed);
if tx.success {
self.successes.fetch_add(1, Ordering::Relaxed);
}
advance_window(&self.tx_max_slot, tx.slot, |cutoff| {
self.transactions.retain(|_, v| v.slot >= cutoff)
});
true
}
fn note_diff(&self, slot: u64, sig: &str, account: &str) -> bool {
let is_new = self
.account_diffs
.insert((slot, sig.to_string(), account.to_string()));
advance_window(&self.diff_max_slot, slot, |cutoff| {
self.account_diffs.retain(|(s, _, _)| *s >= cutoff)
});
is_new
}
}
fn advance_window(max_slot: &AtomicU64, slot: u64, prune: impl FnOnce(u64)) {
let prev_max = max_slot.fetch_max(slot, Ordering::Relaxed);
if slot > prev_max
&& let Some(cutoff) = slot.checked_sub(RECORDS_RETENTION_SLOTS)
{
prune(cutoff);
}
}
pub(crate) fn on_account_diff_notification(
records: RecordsStore,
output_tx: UnboundedSender<OutputEvent>,
notification: AccountDiffNotification,
) {
let Some(sig) = notification.signature else {
return;
};
let Some(account) = notification.account else {
return;
};
let slot = notification.context.slot;
if !records.note_diff(slot, &sig, &account) {
return;
}
let tx_index = notification.tx_index;
let block_time = notification.block_time;
let diff = AccountDiff {
account,
pre_state: notification.pre,
post_state: notification.post,
};
let row = match records.get(&sig) {
Some(entry) => AccountDiffRow::from_diff(
slot,
&sig,
tx_index,
block_time,
&diff,
&entry.sol_changes,
&entry.token_changes,
),
None => AccountDiffRow::from_diff(slot, &sig, tx_index, block_time, &diff, &[], &[]),
};
let _ = output_tx.send(OutputEvent::Diff(row));
}
pub(crate) fn on_transaction_notification(
records: RecordsStore,
output_tx: UnboundedSender<OutputEvent>,
notification: EncodedConfirmedTransactionWithStatusMeta,
) {
let slot = notification.slot;
let Some(signature) = primary_signature(¬ification.transaction.transaction) else {
tracing::warn!(
slot,
"transactionSubscribe notification missing primary signature; dropping"
);
return;
};
let logs = notification
.transaction
.meta
.as_ref()
.and_then(|meta| match &meta.log_messages {
solana_transaction_status::option_serializer::OptionSerializer::Some(v) => {
Some(v.clone())
}
_ => None,
})
.unwrap_or_default();
let Some(mut tx) = transaction_from_encoded(notification, &signature, slot) else {
tracing::warn!(
slot,
signature,
"transactionSubscribe payload could not be decoded into a Transaction; dropping"
);
return;
};
tx.logs = logs;
if records.note_transaction(signature, &tx) {
let _ = output_tx.send(OutputEvent::Tx(tx));
}
}
fn primary_signature(encoded: &EncodedTransaction) -> Option<String> {
match encoded {
EncodedTransaction::Json(ui_tx) => ui_tx.signatures.first().cloned(),
EncodedTransaction::LegacyBinary(_) | EncodedTransaction::Binary(_, _) => None,
EncodedTransaction::Accounts(parsed) => parsed.signatures.first().cloned(),
}
}
#[cfg(test)]
mod tests {
use super::*;
fn tx(slot: u64, sig: &str, success: bool) -> Transaction {
Transaction {
slot,
timestamp: None,
signature: sig.to_string(),
success,
error: None,
logs: Vec::new(),
sol_changes: Vec::new(),
token_changes: Vec::new(),
account_diffs: Vec::new(),
}
}
#[test]
fn note_transaction_dedups_repeated_signatures() {
let store = RecordsStore::new();
assert!(store.note_transaction("sig".into(), &tx(10, "sig", true)));
assert!(!store.note_transaction("sig".into(), &tx(10, "sig", true)));
assert_eq!(store.total(), 1);
assert_eq!(store.successes(), 1);
}
#[test]
fn note_diff_dedups_repeated_identities() {
let store = RecordsStore::new();
assert!(store.note_diff(5, "sig", "acct"));
assert!(!store.note_diff(5, "sig", "acct"));
assert!(store.note_diff(5, "sig", "other"));
assert!(store.note_diff(6, "sig", "acct"));
}
}