use std::sync::Arc;
use dashmap::DashMap;
use simulator_client::AccountDiffNotification;
use solana_client::{
nonblocking::rpc_client::RpcClient,
rpc_response::{Response, RpcLogsResponse},
};
use crate::{
fetch::fetch_transaction,
output::{AccountDiff, AccountDiffRow, Transaction},
};
pub(crate) async fn on_log_notification(
rpc: Arc<RpcClient>,
records: Arc<DashMap<String, Transaction>>,
notification: Response<RpcLogsResponse>,
) {
let slot = notification.context.slot;
let signature = notification.value.signature.clone();
let logs = notification.value.logs.clone();
let mut tx = match fetch_transaction(&rpc, &signature, slot).await {
Some(t) => t,
None => {
let err_str = notification.value.err.as_ref().map(|e| e.to_string());
Transaction {
slot,
timestamp: None,
signature: signature.clone(),
success: err_str.is_none(),
error: err_str,
logs: Vec::new(),
sol_changes: Vec::new(),
token_changes: Vec::new(),
account_diffs: Vec::new(),
}
}
};
tx.logs = logs;
records.insert(signature, tx);
}
pub(crate) async fn on_account_diff_notification(
rpc: Arc<RpcClient>,
records: Arc<DashMap<String, Transaction>>,
stream_tx: Option<tokio::sync::mpsc::UnboundedSender<AccountDiffRow>>,
notification: AccountDiffNotification,
) {
let Some(sig) = notification.signature else {
return;
};
let Some(account) = notification.account else {
return;
};
let slot = notification.context.slot;
let diff = AccountDiff {
account,
pre_state: notification.pre,
post_state: notification.post,
};
if let Some(mut entry) = records.get_mut(&sig) {
if let Some(ref tx) = stream_tx {
let row = AccountDiffRow::from_diff(
entry.slot,
&sig,
&diff,
&entry.sol_changes,
&entry.token_changes,
);
let _ = tx.send(row);
} else {
entry.account_diffs.push(diff);
}
return;
}
let fetched = fetch_transaction(&rpc, &sig, slot).await;
if fetched.is_none() {
tracing::warn!(sig, slot, "failed to fetch transaction metadata");
}
let streaming = if let Some(ref tx) = stream_tx {
let (sol, tok) = match &fetched {
Some(f) => (f.sol_changes.as_slice(), f.token_changes.as_slice()),
None => (&[][..], &[][..]),
};
let _ = tx.send(AccountDiffRow::from_diff(slot, &sig, &diff, sol, tok));
true
} else {
false
};
let sig_fallback = sig.clone();
let mut entry = records.entry(sig).or_insert_with(|| match fetched {
Some(tx) => Transaction {
slot: tx.slot,
timestamp: tx.timestamp,
signature: tx.signature,
success: tx.success,
error: tx.error,
logs: Vec::new(),
sol_changes: tx.sol_changes,
token_changes: tx.token_changes,
account_diffs: Vec::new(),
},
None => Transaction {
slot,
timestamp: None,
signature: sig_fallback,
success: false,
error: None,
logs: Vec::new(),
sol_changes: Vec::new(),
token_changes: Vec::new(),
account_diffs: Vec::new(),
},
});
if !streaming {
entry.account_diffs.push(diff);
}
}