use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use serde::{Deserialize, Serialize};
use solana_program::clock::UnixTimestamp;
use tokio::select;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{watch, watch::{Receiver, Sender}};
use tracing::info;
#[derive(Clone, Serialize, Deserialize)]
pub struct QueueItem {
pub numbers: Vec<u64>,
pub programs: Vec<String>
}
#[derive(Clone, Serialize, Deserialize)]
pub struct TransactionProgram {
pub transaction_hash: String,
pub program: String,
pub timestamp: u64
}
#[derive(Clone, Serialize, Deserialize)]
pub struct AccountTransaction {
pub transaction_hash: String,
pub account: String,
pub timestamp: u64,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct Block {
pub epoch: u32,
pub previous_hash: String,
pub producer: String,
pub hash: String,
pub parent_number: u64,
pub number: u64,
pub data_size: u64,
pub number_of_transactions: u32,
pub successful_transactions: u32,
pub vote_transactions: u32,
pub total_tx_fees: u64,
pub number_of_rewards: u32,
pub total_reward_amount: u64,
pub total_compute_units_consumed: u64,
pub total_compute_units_limit: u64,
pub block_time: u64,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct Transfer {
pub transaction_hash: String,
pub status: u16,
pub source: String,
pub source_association: Option<String>,
pub destination: String,
pub destination_association: Option<String>,
pub token: Option<String>,
pub amount: u64,
pub timestamp: u64,
}
#[derive(Clone, Deserialize, Serialize)]
pub struct TransactionInfo {
pub hash: String,
pub status: u16,
pub fee: u64,
pub total_cu_consumed: u64,
pub program_consumptions: Vec<ProgramConsumption>,
pub transfers: Vec<Transfer>,
}
#[derive(Clone, Deserialize, Serialize)]
pub struct ProgramConsumption {
pub tx: String,
pub status: u16,
pub line: u32,
pub program: String,
pub cu_consumed: u64,
pub cu_limit: u64,
pub timestamp: UnixTimestamp,
}
#[derive(Debug)]
pub struct SysSigReceiver {
shutdown: Arc<AtomicBool>,
notify: Receiver<()>,
}
impl SysSigReceiver {
pub fn new(shutdown: Arc<AtomicBool>, notify: Receiver<()>) -> SysSigReceiver {
SysSigReceiver {
shutdown,
notify,
}
}
pub fn is_shutdown(&self) -> bool {
self.shutdown.load(Ordering::SeqCst)
}
pub async fn recv(&mut self) {
if self.is_shutdown() {
return;
}
let _ = self.notify.changed().await;
if !self.is_shutdown() {
self.shutdown.store(true, Ordering::SeqCst);
}
}
}
pub struct SysSigListener {
shutdown: Arc<AtomicBool>,
notifier: Sender<()>
}
impl SysSigListener {
pub fn new(notifier: Sender<()>) -> Self {
Self {
shutdown: Arc::new(AtomicBool::new(false)),
notifier
}
}
pub async fn watchdog(self) {
info!("Watchdog turned on!");
let mut alarm_sig = signal(SignalKind::alarm()).expect("Alarm stream failed.");
let mut hangup_sig = signal(SignalKind::hangup()).expect("Hangup stream failed.");
let mut int_sig = signal(SignalKind::interrupt()).expect("Interrupt stream failed.");
let mut quit_sig = signal(SignalKind::quit()).expect("Quit stream failed.");
let mut term_sig = signal(SignalKind::terminate()).expect("Terminate stream failed.");
select! {
_ = tokio::signal::ctrl_c() => {
info!("CTRL+C received, terminating now!");
},
_ = alarm_sig.recv() => {
info!("SIGALRM received, terminating now!");
}
_ = hangup_sig.recv() => {
info!("SIGHUP received, terminating now!");
}
_ = int_sig.recv() => {
info!("SIGINT received, terminating now!");
}
_ = quit_sig.recv() => {
info!("SIGQUIT received, terminating now!");
}
_ = term_sig.recv() => {
info!("SIGTERM received, terminating now!");
}
}
self.forced_shutdown();
}
pub fn forced_shutdown(self) {
info!("Shutting down!");
(*self.shutdown).store(true, Ordering::SeqCst);
drop(self.notifier);
}
pub fn get_receiver(&mut self) -> SysSigReceiver {
SysSigReceiver::new(Arc::clone(&self.shutdown),
self.notifier.subscribe())
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
#[tokio::test]
async fn test_shutdown() {
let (notify_shutdown, _) = watch::channel(());
let mut listener = SysSigListener::new(notify_shutdown);
let receiver = listener.get_receiver();
tokio::time::sleep(Duration::from_millis(2000)).await;
listener.forced_shutdown();
assert_eq!(receiver.shutdown.load(Ordering::SeqCst), true);
}
}