1use std::collections::HashMap;
2use std::fmt::Display;
3use std::iter::Map;
4use ts_rs::TS;
5
6#[macro_use]
7extern crate string_enum;
8
9pub mod helpers;
10pub mod processors;
11pub mod utils;
12
13use serde::{Deserialize, Serialize};
14use solana_program::clock::UnixTimestamp;
15use solana_sdk::transaction::TransactionError;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18use tokio::select;
19use tokio::signal::unix::{signal, SignalKind};
20use tokio::sync::watch::{Receiver, Sender};
21use tracing::info;
22
23#[derive(Clone, Serialize, Deserialize)]
24pub struct QueueItem {
25 pub numbers: Vec<u64>,
26 pub programs: Vec<String>,
27}
28
29#[derive(Clone, Serialize, Deserialize)]
30pub struct TransactionProgram {
31 pub transaction_hash: String,
33 pub program: String,
35 pub timestamp: u64,
37}
38
/// Lookup table from a string key to the latest known state of an account.
///
/// This alias previously pointed at `std::iter::Map`, the iterator adapter returned by
/// `Iterator::map`, which cannot be used as a key/value container. It now resolves to
/// [`HashMap`] so values can actually be inserted and looked up.
///
/// NOTE(review): keys are presumably account addresses — confirm against callers.
pub type AccountMap = HashMap<String, AccountLatest>;

/// Full snapshot of an account's state at a given block.
#[derive(Debug, Clone)]
pub struct AccountLatest {
    /// Account balance (unit not visible here; presumably lamports — confirm).
    pub balance: u64,
    /// Raw account data bytes.
    pub data: Vec<u8>,
    /// Owner of the account, as a string.
    pub owner: String,
    /// Rent epoch value recorded for the account.
    pub rent_epoch: u64,
    /// Optional mint string; `None` when no mint is recorded.
    pub mint: Option<String>,
    /// Timestamp of the snapshot (unit not visible here).
    pub timestamp: u64,
    /// Block number of the snapshot.
    pub block: u64,
}
58
/// A single recorded change to an account, where most fields are optional.
///
/// NOTE(review): the `Option` fields presumably mean "unchanged when `None`", and
/// `data_diff_map` presumably maps byte offsets in `data` to their new values —
/// neither is established by this file; confirm with the code that builds entries.
#[derive(Debug, Clone)]
pub struct AccountEntry {
    /// Hash string attached to this entry (presumably the transaction hash — confirm).
    pub hash: String,
    /// Balance, when recorded.
    pub balance: Option<u64>,
    /// Full account data bytes, when recorded.
    pub data: Option<Vec<u8>>,
    /// Sparse `index -> byte` map, when recorded.
    pub data_diff_map: Option<HashMap<usize, u8>>,
    /// Owner string, when recorded.
    pub owner: Option<String>,
    /// Mint string, when recorded.
    pub mint: Option<String>,
    /// Timestamp of the entry (unit not visible here).
    pub timestamp: u64,
    /// Block number of the entry.
    pub block: u64,
}
80
81#[derive(Clone, Serialize, Deserialize)]
82pub struct AccountProgram {
83 pub account: String,
85 pub program: String,
87 pub block: u64,
89 pub timestamp: u64,
91}
92
93#[derive(Clone, Serialize, Deserialize)]
94pub struct AccountTransaction {
95 pub transaction_hash: String,
97 pub account: String,
99 pub block: u64,
101 pub timestamp: u64,
103}
104
105impl PartialEq for AccountTransaction {
106 fn eq(&self, other: &Self) -> bool {
107 self.block == other.block
108 && self.account == other.account
109 && self.timestamp == other.timestamp
110 }
111}
112
113#[derive(Clone, Serialize, Deserialize)]
114pub struct BatchedAccountTransaction {
115 pub hashes: Vec<String>,
117 pub timestamp: u64,
119}
120
121#[derive(Clone, Serialize, Deserialize, TS)]
122#[ts(export)]
123pub struct Block {
124 pub epoch: u32,
126 pub previous_hash: String,
128 pub producer: String,
130 pub hash: String,
132 pub parent_number: u64,
134 pub number: u64,
136 pub data_size: u64,
138 pub number_of_transactions: u32,
140 pub successful_transactions: u32,
142 pub vote_transactions: u32,
144 pub total_tx_fees: u64,
146 pub number_of_rewards: u32,
148 pub total_reward_amount: u64,
150 pub total_compute_units_consumed: u64,
152 pub total_compute_units_limit: u64,
154 pub block_time: u64,
156}
157
/// A transfer record in one of two layouts.
#[derive(Clone, Serialize, Deserialize)]
pub enum VersionedTransfer {
    /// Wraps the plain [`Transfer`] layout.
    V0(Transfer),
    /// Wraps the [`EnrichedTransfer`] layout.
    V1(EnrichedTransfer),
}
163
/// Transfer record carrying a typed status plus `action`, `index` and `decimals`,
/// which the plain [`Transfer`] layout does not have.
#[derive(Clone, Serialize, Deserialize)]
pub struct EnrichedTransfer {
    /// Action label for the transfer (set of values not visible in this file).
    pub action: String,
    /// Index string for the transfer (format not visible in this file).
    pub index: String,
    /// Typed outcome of the transfer.
    pub status: EnrichedTransferStatus,
    /// Source of the transfer, as a string.
    pub source: String,
    /// Optional association string for the source.
    pub source_association: Option<String>,
    /// Destination of the transfer, as a string.
    pub destination: String,
    /// Optional association string for the destination.
    pub destination_association: Option<String>,
    /// Optional token string (presumably `None` for native transfers — confirm).
    pub token: Option<String>,
    /// Optional decimals value for the token.
    pub decimals: Option<u8>,
    /// Transferred amount (presumably in base units — confirm).
    pub amount: u64,
    /// Timestamp of the transfer (unit not visible here).
    pub timestamp: u64,
}
196
197#[derive(Clone, Serialize, Deserialize)]
206pub struct Transfer {
207 pub transaction_hash: String,
209 pub status: u16,
211 pub source: String,
213 pub source_association: Option<String>,
215 pub destination: String,
217 pub destination_association: Option<String>,
219 pub token: Option<String>,
222 pub amount: u64,
224 pub timestamp: u64,
226}
227
228impl From<EnrichedTransfer> for Transfer {
229 fn from(transfer: EnrichedTransfer) -> Self {
230 Self {
231 transaction_hash: "".into(),
232 status: transfer.status as u16,
233 source: transfer.source,
234 source_association: transfer.source_association,
235 destination: transfer.destination,
236 destination_association: transfer.destination_association,
237 token: transfer.token,
238 amount: transfer.amount,
239 timestamp: transfer.timestamp,
240 }
241 }
242}
243
244#[derive(Clone, Serialize, Deserialize)]
245pub struct BTEnrichedTransfer {
246 pub action: String,
248 pub index: String,
250 pub status: String,
252 pub source: String,
254 #[serde(rename(serialize = "sourceAssociation"))]
256 pub source_association: String,
257 pub destination: String,
259 #[serde(rename(serialize = "destinationAssociation"))]
261 pub destination_association: String,
262 pub token: String,
265 pub decimals: Option<u8>,
267 pub amount: u64,
269 pub timestamp: u64,
271}
272
273impl From<EnrichedTransfer> for BTEnrichedTransfer {
274 fn from(trf: EnrichedTransfer) -> Self {
275 BTEnrichedTransfer {
276 action: trf.action,
277 index: trf.index,
278 status: trf.status.to_string(),
279 source: trf.source,
280 source_association: if let Some(source_association) = trf.source_association {
281 source_association
282 } else {
283 "".to_string()
284 },
285 destination: trf.destination,
286 destination_association: if let Some(destination_association) =
287 trf.destination_association
288 {
289 destination_association
290 } else {
291 "".to_string()
292 },
293 token: if let Some(token) = trf.token {
294 token
295 } else {
296 "".to_string()
297 },
298 decimals: trf.decimals,
299 amount: trf.amount,
300 timestamp: trf.timestamp,
301 }
302 }
303}
304
305#[derive(Clone, Serialize, Deserialize)]
306#[serde(rename_all = "camelCase")]
307pub struct JsonEnrichedTransfer {
308 pub action: String,
310 pub index: String,
312 pub status: String,
314 pub source: String,
316 pub source_association: String,
318 pub destination: String,
320 pub destination_association: String,
322 pub token: String,
325 pub amount: u64,
327 pub timestamp: u64,
329}
330
331impl From<Option<TransactionError>> for EnrichedTransferStatus {
332 fn from(err: Option<TransactionError>) -> Self {
333 match err {
334 Some(error) => match error {
335 TransactionError::AccountInUse => Self::AccountInUse,
336 TransactionError::AccountLoadedTwice => Self::AccountLoadedTwice,
337 TransactionError::AccountNotFound => Self::AccountNotFound,
338 TransactionError::ProgramAccountNotFound => Self::ProgramAccountNotFound,
339 TransactionError::InsufficientFundsForFee => Self::InsufficientFundsForFee,
340 TransactionError::InvalidAccountForFee => Self::InvalidAccountForFee,
341 TransactionError::AlreadyProcessed => Self::AlreadyProcessed,
342 TransactionError::BlockhashNotFound => Self::BlockhashNotFound,
343 TransactionError::InstructionError(_, _) => Self::InstructionError,
344 TransactionError::CallChainTooDeep => Self::CallChainTooDeep,
345 TransactionError::MissingSignatureForFee => Self::MissingSignatureForFee,
346 TransactionError::InvalidAccountIndex => Self::InvalidAccountIndex,
347 TransactionError::SignatureFailure => Self::SignatureFailure,
348 TransactionError::InvalidProgramForExecution => Self::InvalidProgramForExecution,
349 TransactionError::SanitizeFailure => Self::SanitizeFailure,
350 TransactionError::ClusterMaintenance => Self::ClusterMaintenance,
351 TransactionError::AccountBorrowOutstanding => Self::AccountBorrowOutstanding,
352 TransactionError::WouldExceedMaxBlockCostLimit => {
353 Self::WouldExceedMaxBlockCostLimit
354 }
355 TransactionError::UnsupportedVersion => Self::UnsupportedVersion,
356 TransactionError::InvalidWritableAccount => Self::InvalidWritableAccount,
357 TransactionError::WouldExceedMaxAccountCostLimit => {
358 Self::WouldExceedMaxAccountCostLimit
359 }
360 TransactionError::WouldExceedAccountDataBlockLimit => {
361 Self::WouldExceedAccountDataBlockLimit
362 }
363 TransactionError::TooManyAccountLocks => Self::TooManyAccountLocks,
364 TransactionError::AddressLookupTableNotFound => Self::AddressLookupTableNotFound,
365 TransactionError::InvalidAddressLookupTableOwner => {
366 Self::InvalidAddressLookupTableOwner
367 }
368 TransactionError::InvalidAddressLookupTableData => {
369 Self::InvalidAddressLookupTableData
370 }
371 TransactionError::InvalidAddressLookupTableIndex => {
372 Self::InvalidAddressLookupTableIndex
373 }
374 TransactionError::InvalidRentPayingAccount => Self::InvalidRentPayingAccount,
375 TransactionError::WouldExceedMaxVoteCostLimit => Self::WouldExceedMaxVoteCostLimit,
376 TransactionError::WouldExceedAccountDataTotalLimit => {
377 Self::WouldExceedAccountDataTotalLimit
378 }
379 TransactionError::DuplicateInstruction(_) => Self::DuplicateInstruction,
380 TransactionError::InsufficientFundsForRent { .. } => Self::InsufficientFundsForRent,
381 TransactionError::MaxLoadedAccountsDataSizeExceeded => {
382 Self::MaxLoadedAccountsDataSizeExceeded
383 }
384 TransactionError::InvalidLoadedAccountsDataSizeLimit => {
385 Self::InvalidLoadedAccountsDataSizeLimit
386 }
387 TransactionError::ResanitizationNeeded => Self::ResanitizationNeeded,
388 TransactionError::UnbalancedTransaction => Self::UnbalancedTransaction,
389 TransactionError::ProgramExecutionTemporarilyRestricted { account_index: _ } => {
390 Self::ProgramExecutionTemporarilyRestricted
391 }
392 },
393 None => Self::Successful,
394 }
395 }
396}
397
398#[derive(Copy, Clone, PartialEq, Serialize, Deserialize)]
399pub enum EnrichedTransferStatus {
400 Null = 0,
402 Successful = 1,
404 AccountInUse = 2,
406 AccountLoadedTwice = 3,
408 AccountNotFound = 4,
410 ProgramAccountNotFound = 5,
412 InsufficientFundsForFee = 6,
414 InvalidAccountForFee = 7,
416 AlreadyProcessed = 8,
418 BlockhashNotFound = 9,
420 InstructionError = 10,
422 CallChainTooDeep = 11,
424 MissingSignatureForFee = 12,
426 InvalidAccountIndex = 13,
428 SignatureFailure = 14,
430 InvalidProgramForExecution = 15,
432 SanitizeFailure = 16,
434 ClusterMaintenance = 17,
436 AccountBorrowOutstanding = 18,
438 WouldExceedMaxBlockCostLimit = 19,
440 UnsupportedVersion = 20,
442 InvalidWritableAccount = 21,
444 WouldExceedMaxAccountCostLimit = 22,
446 WouldExceedAccountDataBlockLimit = 23,
448 TooManyAccountLocks = 24,
450 AddressLookupTableNotFound = 25,
452 InvalidAddressLookupTableOwner = 26,
454 InvalidAddressLookupTableData = 27,
456 InvalidAddressLookupTableIndex = 28,
458 InvalidRentPayingAccount = 29,
460 WouldExceedMaxVoteCostLimit = 30,
462 WouldExceedAccountDataTotalLimit = 31,
464 DuplicateInstruction = 32,
466 InsufficientFundsForRent = 33,
468 MaxLoadedAccountsDataSizeExceeded = 34,
470 InvalidLoadedAccountsDataSizeLimit = 35,
472 ResanitizationNeeded = 36,
474 UnbalancedTransaction = 37,
476 ProgramExecutionTemporarilyRestricted = 38,
478}
479
480impl Display for EnrichedTransferStatus {
481 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
482 match self {
483 EnrichedTransferStatus::Null => write!(f, "Null"),
484 EnrichedTransferStatus::Successful => write!(f, "Successful"),
485 EnrichedTransferStatus::AccountInUse => write!(f, "AccountInUse"),
486 EnrichedTransferStatus::AccountLoadedTwice => write!(f, "AccountLoadedTwice"),
487 EnrichedTransferStatus::AccountNotFound => write!(f, "AccountNotFound"),
488 EnrichedTransferStatus::ProgramAccountNotFound => write!(f, "ProgramAccountNotFound"),
489 EnrichedTransferStatus::InsufficientFundsForFee => write!(f, "InsufficientFundsForFee"),
490 EnrichedTransferStatus::InvalidAccountForFee => write!(f, "InvalidAccountForFee"),
491 EnrichedTransferStatus::AlreadyProcessed => write!(f, "AlreadyProcessed"),
492 EnrichedTransferStatus::BlockhashNotFound => write!(f, "BlockhashNotFound"),
493 EnrichedTransferStatus::InstructionError => write!(f, "InstructionError"),
494 EnrichedTransferStatus::CallChainTooDeep => write!(f, "CallChainTooDeep"),
495 EnrichedTransferStatus::MissingSignatureForFee => write!(f, "MissingSignatureForFee"),
496 EnrichedTransferStatus::InvalidAccountIndex => write!(f, "InvalidAccountIndex"),
497 EnrichedTransferStatus::SignatureFailure => write!(f, "SignatureFailure"),
498 EnrichedTransferStatus::InvalidProgramForExecution => {
499 write!(f, "InvalidProgramForExecution")
500 }
501 EnrichedTransferStatus::SanitizeFailure => write!(f, "SanitizeFailure"),
502 EnrichedTransferStatus::ClusterMaintenance => write!(f, "ClusterMaintenance"),
503 EnrichedTransferStatus::AccountBorrowOutstanding => {
504 write!(f, "AccountBorrowOutstanding")
505 }
506 EnrichedTransferStatus::WouldExceedMaxBlockCostLimit => {
507 write!(f, "WouldExceedMaxBlockCostLimit")
508 }
509 EnrichedTransferStatus::UnsupportedVersion => write!(f, "UnsupportedVersion"),
510 EnrichedTransferStatus::InvalidWritableAccount => write!(f, "InvalidWritableAccount"),
511 EnrichedTransferStatus::WouldExceedMaxAccountCostLimit => {
512 write!(f, "WouldExceedMaxAccountCostLimit")
513 }
514 EnrichedTransferStatus::WouldExceedAccountDataBlockLimit => {
515 write!(f, "WouldExceedAccountDataBlockLimit")
516 }
517 EnrichedTransferStatus::TooManyAccountLocks => write!(f, "TooManyAccountLocks"),
518 EnrichedTransferStatus::AddressLookupTableNotFound => {
519 write!(f, "AddressLookupTableNotFound")
520 }
521 EnrichedTransferStatus::InvalidAddressLookupTableOwner => {
522 write!(f, "InvalidAddressLookupTableOwner")
523 }
524 EnrichedTransferStatus::InvalidAddressLookupTableData => {
525 write!(f, "InvalidAddressLookupTableData")
526 }
527 EnrichedTransferStatus::InvalidAddressLookupTableIndex => {
528 write!(f, "InvalidAddressLookupTableIndex")
529 }
530 EnrichedTransferStatus::InvalidRentPayingAccount => {
531 write!(f, "InvalidRentPayingAccount")
532 }
533 EnrichedTransferStatus::WouldExceedMaxVoteCostLimit => {
534 write!(f, "WouldExceedMaxVoteCostLimit")
535 }
536 EnrichedTransferStatus::WouldExceedAccountDataTotalLimit => {
537 write!(f, "WouldExceedAccountDataTotalLimit")
538 }
539 EnrichedTransferStatus::DuplicateInstruction => write!(f, "DuplicateInstruction"),
540 EnrichedTransferStatus::InsufficientFundsForRent => {
541 write!(f, "InsufficientFundsForRent")
542 }
543 EnrichedTransferStatus::MaxLoadedAccountsDataSizeExceeded => {
544 write!(f, "MaxLoadedAccountsDataSizeExceeded")
545 }
546 EnrichedTransferStatus::InvalidLoadedAccountsDataSizeLimit => {
547 write!(f, "InvalidLoadedAccountsDataSizeLimit")
548 }
549 EnrichedTransferStatus::ResanitizationNeeded => write!(f, "ResanitizationNeeded"),
550 EnrichedTransferStatus::UnbalancedTransaction => write!(f, "UnbalancedTransaction"),
551 EnrichedTransferStatus::ProgramExecutionTemporarilyRestricted => {
552 write!(f, "ProgramExecutionTemporarilyRestricted")
553 }
554 }
555 }
556}
557
558impl EnrichedTransferStatus {
559 pub fn from_u16(value: u16) -> Self {
560 match value {
561 1 => Self::Successful,
562 2 => Self::AccountInUse,
563 3 => Self::AccountLoadedTwice,
564 4 => Self::AccountNotFound,
565 5 => Self::ProgramAccountNotFound,
566 6 => Self::InsufficientFundsForFee,
567 7 => Self::InvalidAccountForFee,
568 8 => Self::AlreadyProcessed,
569 9 => Self::BlockhashNotFound,
570 10 => Self::InstructionError,
571 11 => Self::CallChainTooDeep,
572 12 => Self::MissingSignatureForFee,
573 13 => Self::InvalidAccountIndex,
574 14 => Self::SignatureFailure,
575 15 => Self::InvalidProgramForExecution,
576 16 => Self::SanitizeFailure,
577 17 => Self::ClusterMaintenance,
578 18 => Self::AccountBorrowOutstanding,
579 19 => Self::WouldExceedMaxBlockCostLimit,
580 20 => Self::UnsupportedVersion,
581 21 => Self::InvalidWritableAccount,
582 22 => Self::WouldExceedMaxAccountCostLimit,
583 23 => Self::WouldExceedAccountDataBlockLimit,
584 24 => Self::TooManyAccountLocks,
585 25 => Self::AddressLookupTableNotFound,
586 26 => Self::InvalidAddressLookupTableOwner,
587 27 => Self::InvalidAddressLookupTableData,
588 28 => Self::InvalidAddressLookupTableIndex,
589 29 => Self::InvalidRentPayingAccount,
590 30 => Self::WouldExceedMaxVoteCostLimit,
591 31 => Self::WouldExceedAccountDataTotalLimit,
592 32 => Self::DuplicateInstruction,
593 33 => Self::InsufficientFundsForRent,
594 34 => Self::MaxLoadedAccountsDataSizeExceeded,
595 35 => Self::InvalidLoadedAccountsDataSizeLimit,
596 36 => Self::ResanitizationNeeded,
597 37 => Self::UnbalancedTransaction,
598 38 => Self::ProgramExecutionTemporarilyRestricted,
599 _ => Self::Null,
600 }
601 }
602 pub fn to_16(&self) -> u16 {
603 *self as u16
604 }
605}
606
/// Per-transaction summary: identity, status, fee, compute-unit totals and transfers.
#[derive(Clone, Deserialize, Serialize)]
pub struct TransactionInfo {
    /// Hash identifying the transaction.
    pub hash: String,
    /// Numeric status code (presumably an `EnrichedTransferStatus` discriminant — confirm).
    pub status: u16,
    /// Fee of the transaction (unit not visible here).
    pub fee: u64,
    /// Total compute units consumed.
    pub total_cu_consumed: u64,
    /// Total compute unit limit.
    pub total_cu_limit: u64,
    /// Transfers belonging to the transaction.
    pub transfers: Vec<EnrichedTransfer>,
}
616
617impl TransactionInfo {
618 pub fn old_transfers(&mut self) -> Vec<EnrichedTransfer> {
619 self.transfers.clone()
620 }
621}
622
623#[derive(Clone, Deserialize, Serialize)]
624pub struct ProgramConsumption {
625 pub tx: String,
626 pub status: u16,
627 pub line: u32,
628 pub program: String,
629 pub cu_consumed: u64,
630 pub cu_limit: u64,
631 pub timestamp: UnixTimestamp,
632}
633
/// Returns `true` unless `address` is exactly one of the three hard-coded program ids
/// listed below.
///
/// The comparison is an exact, case-sensitive string match.
///
/// NOTE(review): the ids look like the Solana System, SPL Token and Associated Token
/// Account programs — confirm.
pub fn is_not_transfer(address: &str) -> bool {
    !matches!(
        address,
        "11111111111111111111111111111111"
            | "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
            | "ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL"
    )
}
644
/// Receiving side of the shutdown mechanism: a shared shutdown flag plus a watch
/// channel receiver to await on.
#[derive(Debug)]
pub struct SysSigReceiver {
    /// Shutdown flag; `SysSigListener::get_receiver` passes a clone of the listener's own `Arc`.
    shutdown: Arc<AtomicBool>,

    /// Watch receiver awaited in `recv`.
    notify: Receiver<()>,
}
662
663impl SysSigReceiver {
664 pub fn new(shutdown: Arc<AtomicBool>, notify: Receiver<()>) -> SysSigReceiver {
666 SysSigReceiver { shutdown, notify }
667 }
668
669 pub fn is_shutdown(&self) -> bool {
671 self.shutdown.load(Ordering::SeqCst)
672 }
673
674 pub async fn recv(&mut self) {
676 if self.is_shutdown() {
679 return;
680 }
681
682 let _ = self.notify.changed().await;
684
685 if !self.is_shutdown() {
686 self.shutdown.store(true, Ordering::SeqCst);
687 }
688 }
689}
690
691pub struct SysSigListener {
695 shutdown: Arc<AtomicBool>,
696 notifier: Sender<()>,
697}
698
699impl SysSigListener {
700 pub fn new(notifier: Sender<()>) -> Self {
701 Self {
702 shutdown: Arc::new(AtomicBool::new(false)),
703 notifier,
704 }
705 }
706
707 pub async fn watchdog(self) {
709 info!("Watchdog turned on!");
710
711 let mut alarm_sig = signal(SignalKind::alarm()).expect("Alarm stream failed.");
712 let mut hangup_sig = signal(SignalKind::hangup()).expect("Hangup stream failed.");
713 let mut int_sig = signal(SignalKind::interrupt()).expect("Interrupt stream failed.");
714 let mut quit_sig = signal(SignalKind::quit()).expect("Quit stream failed.");
715 let mut term_sig = signal(SignalKind::terminate()).expect("Terminate stream failed.");
716
717 select! {
718 _ = tokio::signal::ctrl_c() => {
719 info!("CTRL+C received, terminating now!");
720 },
721 _ = alarm_sig.recv() => {
722 info!("SIGALRM received, terminating now!");
723 }
724 _ = hangup_sig.recv() => {
725 info!("SIGHUP received, terminating now!");
726 }
727 _ = int_sig.recv() => {
728 info!("SIGINT received, terminating now!");
729 }
730 _ = quit_sig.recv() => {
731 info!("SIGQUIT received, terminating now!");
732 }
733 _ = term_sig.recv() => {
734 info!("SIGTERM received, terminating now!");
735 }
736 }
737
738 self.forced_shutdown();
739 }
740
741 pub fn forced_shutdown(self) {
742 info!("Shutting down!");
743 (*self.shutdown).store(true, Ordering::SeqCst);
744
745 drop(self.notifier);
746 }
747
748 pub fn get_receiver(&mut self) -> SysSigReceiver {
750 SysSigReceiver::new(Arc::clone(&self.shutdown), self.notifier.subscribe())
751 }
752}
753
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::watch;

    /// `forced_shutdown` must be visible through a receiver created beforehand.
    #[tokio::test]
    async fn test_shutdown() {
        let (notify_shutdown, _) = watch::channel(());
        let mut listener = SysSigListener::new(notify_shutdown);
        let receiver = listener.get_receiver();

        // Nothing runs concurrently here, so no sleep is needed before shutting down.
        assert!(!receiver.is_shutdown());

        listener.forced_shutdown();

        assert!(receiver.is_shutdown());
    }

    /// `recv` must return promptly once shutdown has already happened.
    #[tokio::test]
    async fn test_recv_after_shutdown() {
        let (notify_shutdown, _) = watch::channel(());
        let mut listener = SysSigListener::new(notify_shutdown);
        let mut receiver = listener.get_receiver();

        listener.forced_shutdown();
        receiver.recv().await;

        assert!(receiver.is_shutdown());
    }
}
775}