use std::collections::HashMap;
use std::iter::Map;
use ts_rs::TS;
#[macro_use]
extern crate string_enum;
pub mod helpers;
pub mod processors;
pub mod utils;
use serde::{Deserialize, Serialize};
use solana_program::clock::UnixTimestamp;
use solana_sdk::transaction::TransactionError;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::select;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::watch::{Receiver, Sender};
use tracing::info;
#[derive(Clone, Serialize, Deserialize)]
pub struct QueueItem {
pub numbers: Vec<u64>,
pub programs: Vec<String>,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct TransactionProgram {
pub transaction_hash: String,
pub program: String,
pub timestamp: u64,
}
pub type AccountMap = Map<String, AccountLatest>;
pub struct AccountLatest {
pub balance: u64,
pub data: Vec<u8>,
pub owner: String,
pub rent_epoch: u64,
pub mint: Option<String>,
pub timestamp: u64,
pub block: u64
}
pub struct AccountEntry {
pub hash: String,
pub balance: Option<u64>,
pub data: Option<Vec<u8>>,
pub data_diff_map: Option<HashMap<usize, u8>>,
pub owner: Option<String>,
pub mint: Option<String>,
pub timestamp: u64,
pub block: u64
}
#[derive(Clone, Serialize, Deserialize)]
pub struct AccountProgram {
pub account: String,
pub program: String,
pub block: u64,
pub timestamp: u64,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct AccountTransaction {
pub transaction_hash: String,
pub account: String,
pub block: u64,
pub timestamp: u64,
}
impl PartialEq for AccountTransaction {
fn eq(&self, other: &Self) -> bool {
self.block == other.block
&& self.account == other.account
&& self.timestamp == other.timestamp
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct BatchedAccountTransaction {
pub hashes: Vec<String>,
pub timestamp: u64,
}
#[derive(Clone, Serialize, Deserialize, TS)]
#[ts(export)]
pub struct Block {
pub epoch: u32,
pub previous_hash: String,
pub producer: String,
pub hash: String,
pub parent_number: u64,
pub number: u64,
pub data_size: u64,
pub number_of_transactions: u32,
pub successful_transactions: u32,
pub vote_transactions: u32,
pub total_tx_fees: u64,
pub number_of_rewards: u32,
pub total_reward_amount: u64,
pub total_compute_units_consumed: u64,
pub total_compute_units_limit: u64,
pub block_time: u64,
}
#[derive(Clone, Serialize, Deserialize)]
pub enum VersionedTransfer {
V0(Transfer),
V1(EnrichedTransfer),
}
#[derive(Clone, Serialize, Deserialize)]
pub struct Transfer {
pub transaction_hash: String,
pub status: u16,
pub source: String,
pub source_association: Option<String>,
pub destination: String,
pub destination_association: Option<String>,
pub token: Option<String>,
pub amount: u64,
pub timestamp: u64,
}
impl From<EnrichedTransfer> for Transfer {
fn from(transfer: EnrichedTransfer) -> Self {
Self {
transaction_hash: "".into(),
status: transfer.status as u16,
source: transfer.source,
source_association: transfer.source_association,
destination: transfer.destination,
destination_association: transfer.destination_association,
token: transfer.token,
amount: transfer.amount,
timestamp: transfer.timestamp,
}
}
}
impl From<Option<TransactionError>> for EnrichedTransferStatus {
fn from(err: Option<TransactionError>) -> Self {
match err {
Some(error) => match error {
TransactionError::AccountInUse => Self::AccountInUse,
TransactionError::AccountLoadedTwice => Self::AccountLoadedTwice,
TransactionError::AccountNotFound => Self::AccountNotFound,
TransactionError::ProgramAccountNotFound => Self::ProgramAccountNotFound,
TransactionError::InsufficientFundsForFee => Self::InsufficientFundsForFee,
TransactionError::InvalidAccountForFee => Self::InvalidAccountForFee,
TransactionError::AlreadyProcessed => Self::AlreadyProcessed,
TransactionError::BlockhashNotFound => Self::BlockhashNotFound,
TransactionError::InstructionError(_, _) => Self::InstructionError,
TransactionError::CallChainTooDeep => Self::CallChainTooDeep,
TransactionError::MissingSignatureForFee => Self::MissingSignatureForFee,
TransactionError::InvalidAccountIndex => Self::InvalidAccountIndex,
TransactionError::SignatureFailure => Self::SignatureFailure,
TransactionError::InvalidProgramForExecution => Self::InvalidProgramForExecution,
TransactionError::SanitizeFailure => Self::SanitizeFailure,
TransactionError::ClusterMaintenance => Self::ClusterMaintenance,
TransactionError::AccountBorrowOutstanding => Self::AccountBorrowOutstanding,
TransactionError::WouldExceedMaxBlockCostLimit => {
Self::WouldExceedMaxBlockCostLimit
}
TransactionError::UnsupportedVersion => Self::UnsupportedVersion,
TransactionError::InvalidWritableAccount => Self::InvalidWritableAccount,
TransactionError::WouldExceedMaxAccountCostLimit => {
Self::WouldExceedMaxAccountCostLimit
}
TransactionError::WouldExceedAccountDataBlockLimit => {
Self::WouldExceedAccountDataBlockLimit
}
TransactionError::TooManyAccountLocks => Self::TooManyAccountLocks,
TransactionError::AddressLookupTableNotFound => Self::AddressLookupTableNotFound,
TransactionError::InvalidAddressLookupTableOwner => {
Self::InvalidAddressLookupTableOwner
}
TransactionError::InvalidAddressLookupTableData => {
Self::InvalidAddressLookupTableData
}
TransactionError::InvalidAddressLookupTableIndex => {
Self::InvalidAddressLookupTableIndex
}
TransactionError::InvalidRentPayingAccount => Self::InvalidRentPayingAccount,
TransactionError::WouldExceedMaxVoteCostLimit => Self::WouldExceedMaxVoteCostLimit,
TransactionError::WouldExceedAccountDataTotalLimit => {
Self::WouldExceedAccountDataTotalLimit
}
TransactionError::DuplicateInstruction(_) => Self::DuplicateInstruction,
TransactionError::InsufficientFundsForRent { .. } => Self::InsufficientFundsForRent,
},
None => Self::Successful,
}
}
}
#[derive(Copy, Clone, PartialEq, StringEnum)]
pub enum EnrichedTransferStatus {
Null = 0,
Successful = 1,
AccountInUse = 2,
AccountLoadedTwice = 3,
AccountNotFound = 4,
ProgramAccountNotFound = 5,
InsufficientFundsForFee = 6,
InvalidAccountForFee = 7,
AlreadyProcessed = 8,
BlockhashNotFound = 9,
InstructionError = 10,
CallChainTooDeep = 11,
MissingSignatureForFee = 12,
InvalidAccountIndex = 13,
SignatureFailure = 14,
InvalidProgramForExecution = 15,
SanitizeFailure = 16,
ClusterMaintenance = 17,
AccountBorrowOutstanding = 18,
WouldExceedMaxBlockCostLimit = 19,
UnsupportedVersion = 20,
InvalidWritableAccount = 21,
WouldExceedMaxAccountCostLimit = 22,
WouldExceedAccountDataBlockLimit = 23,
TooManyAccountLocks = 24,
AddressLookupTableNotFound = 25,
InvalidAddressLookupTableOwner = 26,
InvalidAddressLookupTableData = 27,
InvalidAddressLookupTableIndex = 28,
InvalidRentPayingAccount = 29,
WouldExceedMaxVoteCostLimit = 30,
WouldExceedAccountDataTotalLimit = 31,
DuplicateInstruction = 32,
InsufficientFundsForRent = 33,
}
impl EnrichedTransferStatus {
pub fn from_u16(value: u16) -> Self {
match value {
1 => Self::Successful,
2 => Self::AccountInUse,
3 => Self::AccountLoadedTwice,
4 => Self::AccountNotFound,
5 => Self::ProgramAccountNotFound,
6 => Self::InsufficientFundsForFee,
7 => Self::InvalidAccountForFee,
8 => Self::AlreadyProcessed,
9 => Self::BlockhashNotFound,
10 => Self::InstructionError,
11 => Self::CallChainTooDeep,
12 => Self::MissingSignatureForFee,
13 => Self::InvalidAccountIndex,
14 => Self::SignatureFailure,
15 => Self::InvalidProgramForExecution,
16 => Self::SanitizeFailure,
17 => Self::ClusterMaintenance,
18 => Self::AccountBorrowOutstanding,
19 => Self::WouldExceedMaxBlockCostLimit,
20 => Self::UnsupportedVersion,
21 => Self::InvalidWritableAccount,
22 => Self::WouldExceedMaxAccountCostLimit,
23 => Self::WouldExceedAccountDataBlockLimit,
24 => Self::TooManyAccountLocks,
25 => Self::AddressLookupTableNotFound,
26 => Self::InvalidAddressLookupTableOwner,
27 => Self::InvalidAddressLookupTableData,
28 => Self::InvalidAddressLookupTableIndex,
29 => Self::InvalidRentPayingAccount,
30 => Self::WouldExceedMaxVoteCostLimit,
31 => Self::WouldExceedAccountDataTotalLimit,
32 => Self::DuplicateInstruction,
33 => Self::InsufficientFundsForRent,
_ => Self::Null,
}
}
pub fn to_16(&self) -> u16 {
*self as u16
}
}
pub type EnrichedTransferPair = (String, i16, i16, EnrichedTransfer);
#[derive(Clone, Serialize, Deserialize)]
pub struct EnrichedTransfer {
pub action: String,
pub status: EnrichedTransferStatus,
pub source: String,
pub source_association: Option<String>,
pub destination: String,
pub destination_association: Option<String>,
pub token: Option<String>,
pub amount: u64,
pub timestamp: u64,
}
#[derive(Clone, Deserialize, Serialize)]
pub struct TransactionInfo {
pub hash: String,
pub status: u16,
pub fee: u64,
pub total_cu_consumed: u64,
pub total_cu_limit: u64,
pub transfers: Vec<EnrichedTransferPair>,
pub account_transactions: Vec<AccountTransaction>,
}
impl TransactionInfo {
pub fn old_transfers(&mut self) -> Vec<EnrichedTransfer> {
self.transfers.iter().map(|trf| trf.3.clone()).collect()
}
}
#[derive(Clone, Deserialize, Serialize)]
pub struct ProgramConsumption {
pub tx: String,
pub status: u16,
pub line: u32,
pub program: String,
pub cu_consumed: u64,
pub cu_limit: u64,
pub timestamp: UnixTimestamp,
}
pub fn is_not_transfer(address: &str) -> bool {
!vec![
"11111111111111111111111111111111",
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
"ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL",
]
.contains(&address)
}
#[derive(Debug)]
pub struct SysSigReceiver {
shutdown: Arc<AtomicBool>,
notify: Receiver<()>,
}
impl SysSigReceiver {
pub fn new(shutdown: Arc<AtomicBool>, notify: Receiver<()>) -> SysSigReceiver {
SysSigReceiver { shutdown, notify }
}
pub fn is_shutdown(&self) -> bool {
self.shutdown.load(Ordering::SeqCst)
}
pub async fn recv(&mut self) {
if self.is_shutdown() {
return;
}
let _ = self.notify.changed().await;
if !self.is_shutdown() {
self.shutdown.store(true, Ordering::SeqCst);
}
}
}
pub struct SysSigListener {
shutdown: Arc<AtomicBool>,
notifier: Sender<()>,
}
impl SysSigListener {
pub fn new(notifier: Sender<()>) -> Self {
Self {
shutdown: Arc::new(AtomicBool::new(false)),
notifier,
}
}
pub async fn watchdog(self) {
info!("Watchdog turned on!");
let mut alarm_sig = signal(SignalKind::alarm()).expect("Alarm stream failed.");
let mut hangup_sig = signal(SignalKind::hangup()).expect("Hangup stream failed.");
let mut int_sig = signal(SignalKind::interrupt()).expect("Interrupt stream failed.");
let mut quit_sig = signal(SignalKind::quit()).expect("Quit stream failed.");
let mut term_sig = signal(SignalKind::terminate()).expect("Terminate stream failed.");
select! {
_ = tokio::signal::ctrl_c() => {
info!("CTRL+C received, terminating now!");
},
_ = alarm_sig.recv() => {
info!("SIGALRM received, terminating now!");
}
_ = hangup_sig.recv() => {
info!("SIGHUP received, terminating now!");
}
_ = int_sig.recv() => {
info!("SIGINT received, terminating now!");
}
_ = quit_sig.recv() => {
info!("SIGQUIT received, terminating now!");
}
_ = term_sig.recv() => {
info!("SIGTERM received, terminating now!");
}
}
self.forced_shutdown();
}
pub fn forced_shutdown(self) {
info!("Shutting down!");
(*self.shutdown).store(true, Ordering::SeqCst);
drop(self.notifier);
}
pub fn get_receiver(&mut self) -> SysSigReceiver {
SysSigReceiver::new(Arc::clone(&self.shutdown), self.notifier.subscribe())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use tokio::sync::watch;
#[tokio::test]
async fn test_shutdown() {
let (notify_shutdown, _) = watch::channel(());
let mut listener = SysSigListener::new(notify_shutdown);
let receiver = listener.get_receiver();
tokio::time::sleep(Duration::from_millis(2000)).await;
listener.forced_shutdown();
assert_eq!(receiver.shutdown.load(Ordering::SeqCst), true);
}
}