#[macro_use]
extern crate string_enum;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use serde::{Deserialize, Serialize};
use solana_program::clock::UnixTimestamp;
use solana_sdk::transaction::TransactionError;
use tokio::select;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{watch::{Receiver, Sender}};
use tracing::info;
#[derive(Clone, Serialize, Deserialize)]
pub struct QueueItem {
pub numbers: Vec<u64>,
pub programs: Vec<String>
}
#[derive(Clone, Serialize, Deserialize)]
pub struct TransactionProgram {
pub transaction_hash: String,
pub program: String,
pub timestamp: u64
}
#[derive(Clone, Serialize, Deserialize)]
pub struct AccountProgram {
pub account: String,
pub program: String,
pub block: u64,
pub timestamp: u64
}
#[derive(Clone, Serialize, Deserialize)]
pub struct AccountTransaction {
pub transaction_hash: String,
pub account: String,
pub block: u64,
pub timestamp: u64,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct Block {
pub epoch: u32,
pub previous_hash: String,
pub producer: String,
pub hash: String,
pub parent_number: u64,
pub number: u64,
pub data_size: u64,
pub number_of_transactions: u32,
pub successful_transactions: u32,
pub vote_transactions: u32,
pub total_tx_fees: u64,
pub number_of_rewards: u32,
pub total_reward_amount: u64,
pub total_compute_units_consumed: u64,
pub total_compute_units_limit: u64,
pub block_time: u64,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct Transfer {
pub transaction_hash: String,
pub status: u16,
pub source: String,
pub source_association: Option<String>,
pub destination: String,
pub destination_association: Option<String>,
pub token: Option<String>,
pub amount: u64,
pub timestamp: u64,
}
impl From<EnrichedTransfer> for Transfer {
fn from(transfer: EnrichedTransfer) -> Self {
Self {
transaction_hash: "".into(),
status: transfer.status as u16,
source: transfer.source,
source_association: transfer.source_association,
destination: transfer.destination,
destination_association: transfer.destination_association,
token: transfer.token,
amount: transfer.amount,
timestamp: transfer.timestamp
}
}
}
impl From<Option<TransactionError>> for EnrichedTransferStatus {
fn from(err: Option<TransactionError>) -> Self {
match err {
Some(error) =>
match error {
TransactionError::AccountInUse => Self::AccountInUse,
TransactionError::AccountLoadedTwice => Self::AccountLoadedTwice,
TransactionError::AccountNotFound => Self::AccountNotFound,
TransactionError::ProgramAccountNotFound => Self::ProgramAccountNotFound,
TransactionError::InsufficientFundsForFee => Self::InsufficientFundsForFee,
TransactionError::InvalidAccountForFee => Self::InvalidAccountForFee,
TransactionError::AlreadyProcessed => Self::AlreadyProcessed,
TransactionError::BlockhashNotFound => Self::BlockhashNotFound,
TransactionError::InstructionError(_, _) => Self::InstructionError,
TransactionError::CallChainTooDeep => Self::CallChainTooDeep,
TransactionError::MissingSignatureForFee => Self::MissingSignatureForFee,
TransactionError::InvalidAccountIndex => Self::InvalidAccountIndex,
TransactionError::SignatureFailure => Self::SignatureFailure,
TransactionError::InvalidProgramForExecution => Self::InvalidProgramForExecution,
TransactionError::SanitizeFailure => Self::SanitizeFailure,
TransactionError::ClusterMaintenance => Self::ClusterMaintenance,
TransactionError::AccountBorrowOutstanding => Self::AccountBorrowOutstanding,
TransactionError::WouldExceedMaxBlockCostLimit => Self::WouldExceedMaxBlockCostLimit,
TransactionError::UnsupportedVersion => Self::UnsupportedVersion,
TransactionError::InvalidWritableAccount => Self::InvalidWritableAccount,
TransactionError::WouldExceedMaxAccountCostLimit => Self::WouldExceedMaxAccountCostLimit,
TransactionError::WouldExceedAccountDataBlockLimit => Self::WouldExceedAccountDataBlockLimit,
TransactionError::TooManyAccountLocks => Self::TooManyAccountLocks,
TransactionError::AddressLookupTableNotFound => Self::AddressLookupTableNotFound,
TransactionError::InvalidAddressLookupTableOwner => Self::InvalidAddressLookupTableOwner,
TransactionError::InvalidAddressLookupTableData => Self::InvalidAddressLookupTableData,
TransactionError::InvalidAddressLookupTableIndex => Self::InvalidAddressLookupTableIndex,
TransactionError::InvalidRentPayingAccount => Self::InvalidRentPayingAccount,
TransactionError::WouldExceedMaxVoteCostLimit => Self::WouldExceedMaxVoteCostLimit,
TransactionError::WouldExceedAccountDataTotalLimit => Self::WouldExceedAccountDataTotalLimit,
TransactionError::DuplicateInstruction(_) => Self::DuplicateInstruction,
TransactionError::InsufficientFundsForRent { .. } => Self::InsufficientFundsForRent,
},
None => Self::Successful
}
}
}
#[derive(Clone, StringEnum)]
pub enum EnrichedTransferStatus {
Successful = 1,
AccountInUse = 2,
AccountLoadedTwice = 3,
AccountNotFound = 4,
ProgramAccountNotFound = 5,
InsufficientFundsForFee = 6,
InvalidAccountForFee = 7,
AlreadyProcessed = 8,
BlockhashNotFound = 9,
InstructionError = 10,
CallChainTooDeep = 11,
MissingSignatureForFee = 12,
InvalidAccountIndex = 13,
SignatureFailure = 14,
InvalidProgramForExecution = 15,
SanitizeFailure = 16,
ClusterMaintenance = 17,
AccountBorrowOutstanding = 18,
WouldExceedMaxBlockCostLimit = 19,
UnsupportedVersion = 20,
InvalidWritableAccount = 21,
WouldExceedMaxAccountCostLimit = 22,
WouldExceedAccountDataBlockLimit = 23,
TooManyAccountLocks = 24,
AddressLookupTableNotFound = 25,
InvalidAddressLookupTableOwner = 26,
InvalidAddressLookupTableData = 27,
InvalidAddressLookupTableIndex = 28,
InvalidRentPayingAccount = 29,
WouldExceedMaxVoteCostLimit = 30,
WouldExceedAccountDataTotalLimit = 31,
DuplicateInstruction = 32,
InsufficientFundsForRent = 33
}
#[derive(Clone, Serialize, Deserialize)]
pub struct EnrichedTransfer {
pub status: EnrichedTransferStatus,
pub source: String,
pub source_association: Option<String>,
pub destination: String,
pub destination_association: Option<String>,
pub token: Option<String>,
pub amount: u64,
pub timestamp: u64,
}
#[derive(Clone, Deserialize, Serialize)]
pub struct TransactionInfo {
pub hash: String,
pub status: u16,
pub fee: u64,
pub total_cu_consumed: u64,
pub total_cu_limit: u64,
pub transfers: Vec<Transfer>,
pub account_transactions: Vec<AccountTransaction>
}
#[derive(Clone, Deserialize, Serialize)]
pub struct ProgramConsumption {
pub tx: String,
pub status: u16,
pub line: u32,
pub program: String,
pub cu_consumed: u64,
pub cu_limit: u64,
pub timestamp: UnixTimestamp,
}
pub fn is_not_transfer(address: &str) -> bool {
!vec![
"11111111111111111111111111111111",
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
"ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL"
].contains(&address)
}
#[derive(Debug)]
pub struct SysSigReceiver {
shutdown: Arc<AtomicBool>,
notify: Receiver<()>,
}
impl SysSigReceiver {
pub fn new(shutdown: Arc<AtomicBool>, notify: Receiver<()>) -> SysSigReceiver {
SysSigReceiver {
shutdown,
notify,
}
}
pub fn is_shutdown(&self) -> bool {
self.shutdown.load(Ordering::SeqCst)
}
pub async fn recv(&mut self) {
if self.is_shutdown() {
return;
}
let _ = self.notify.changed().await;
if !self.is_shutdown() {
self.shutdown.store(true, Ordering::SeqCst);
}
}
}
pub struct SysSigListener {
shutdown: Arc<AtomicBool>,
notifier: Sender<()>
}
impl SysSigListener {
pub fn new(notifier: Sender<()>) -> Self {
Self {
shutdown: Arc::new(AtomicBool::new(false)),
notifier
}
}
pub async fn watchdog(self) {
info!("Watchdog turned on!");
let mut alarm_sig = signal(SignalKind::alarm()).expect("Alarm stream failed.");
let mut hangup_sig = signal(SignalKind::hangup()).expect("Hangup stream failed.");
let mut int_sig = signal(SignalKind::interrupt()).expect("Interrupt stream failed.");
let mut quit_sig = signal(SignalKind::quit()).expect("Quit stream failed.");
let mut term_sig = signal(SignalKind::terminate()).expect("Terminate stream failed.");
select! {
_ = tokio::signal::ctrl_c() => {
info!("CTRL+C received, terminating now!");
},
_ = alarm_sig.recv() => {
info!("SIGALRM received, terminating now!");
}
_ = hangup_sig.recv() => {
info!("SIGHUP received, terminating now!");
}
_ = int_sig.recv() => {
info!("SIGINT received, terminating now!");
}
_ = quit_sig.recv() => {
info!("SIGQUIT received, terminating now!");
}
_ = term_sig.recv() => {
info!("SIGTERM received, terminating now!");
}
}
self.forced_shutdown();
}
pub fn forced_shutdown(self) {
info!("Shutting down!");
(*self.shutdown).store(true, Ordering::SeqCst);
drop(self.notifier);
}
pub fn get_receiver(&mut self) -> SysSigReceiver {
SysSigReceiver::new(Arc::clone(&self.shutdown),
self.notifier.subscribe())
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use tokio::sync::watch;
use super::*;
#[tokio::test]
async fn test_shutdown() {
let (notify_shutdown, _) = watch::channel(());
let mut listener = SysSigListener::new(notify_shutdown);
let receiver = listener.get_receiver();
tokio::time::sleep(Duration::from_millis(2000)).await;
listener.forced_shutdown();
assert_eq!(receiver.shutdown.load(Ordering::SeqCst), true);
}
}