1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use serde::{Deserialize, Serialize};
4use solana_program::clock::UnixTimestamp;
5use tokio::select;
6use tokio::signal::unix::{signal, SignalKind};
7use tokio::sync::{watch, watch::{Receiver, Sender}};
8use tracing::info;
9
10#[derive(Clone, Serialize, Deserialize)]
11pub struct QueueItem {
12 pub numbers: Vec<u64>,
13 pub programs: Vec<String>
14}
15
16#[derive(Clone, Serialize, Deserialize)]
17pub struct TransactionProgram {
18 pub transaction_hash: String,
20 pub program: String,
22 pub timestamp: u64
24}
25
26#[derive(Clone, Serialize, Deserialize)]
27pub struct AccountTransaction {
28 pub transaction_hash: String,
30 pub account: String,
32 pub timestamp: u64,
34}
35
36#[derive(Clone, Serialize, Deserialize)]
37pub struct Block {
38 pub epoch: u32,
40 pub previous_hash: String,
42 pub producer: String,
44 pub hash: String,
46 pub parent_number: u64,
48 pub number: u64,
50 pub data_size: u64,
52 pub number_of_transactions: u32,
54 pub successful_transactions: u32,
56 pub vote_transactions: u32,
58 pub total_tx_fees: u64,
60 pub number_of_rewards: u32,
62 pub total_reward_amount: u64,
64 pub total_compute_units_consumed: u64,
66 pub total_compute_units_limit: u64,
68 pub block_time: u64,
70}
71
72#[derive(Clone, Serialize, Deserialize)]
81pub struct Transfer {
82 pub transaction_hash: String,
84 pub status: u16,
86 pub source: String,
88 pub source_association: Option<String>,
90 pub destination: String,
92 pub destination_association: Option<String>,
94 pub token: Option<String>,
97 pub amount: u64,
99 pub timestamp: u64,
101}
102
103#[derive(Clone, Deserialize, Serialize)]
104pub struct TransactionInfo {
105 pub hash: String,
106 pub status: u16,
107 pub fee: u64,
108 pub total_cu_consumed: u64,
109 pub program_consumptions: Vec<ProgramConsumption>,
110 pub transfers: Vec<Transfer>,
111}
112
113#[derive(Clone, Deserialize, Serialize)]
114pub struct ProgramConsumption {
115 pub tx: String,
116 pub status: u16,
117 pub line: u32,
118 pub program: String,
119 pub cu_consumed: u64,
120 pub cu_limit: u64,
121 pub timestamp: UnixTimestamp,
122}
123
124#[derive(Debug)]
134pub struct SysSigReceiver {
135 shutdown: Arc<AtomicBool>,
137
138 notify: Receiver<()>,
140}
141
142impl SysSigReceiver {
143 pub fn new(shutdown: Arc<AtomicBool>, notify: Receiver<()>) -> SysSigReceiver {
145 SysSigReceiver {
146 shutdown,
147 notify,
148 }
149 }
150
151 pub fn is_shutdown(&self) -> bool {
153 self.shutdown.load(Ordering::SeqCst)
154 }
155
156 pub async fn recv(&mut self) {
158 if self.is_shutdown() {
161 return;
162 }
163
164 let _ = self.notify.changed().await;
166
167 if !self.is_shutdown() {
168 self.shutdown.store(true, Ordering::SeqCst);
169 }
170 }
171}
172
173pub struct SysSigListener {
177 shutdown: Arc<AtomicBool>,
178 notifier: Sender<()>
179}
180
181impl SysSigListener {
182 pub fn new(notifier: Sender<()>) -> Self {
183 Self {
184 shutdown: Arc::new(AtomicBool::new(false)),
185 notifier
186 }
187 }
188
189 pub async fn watchdog(self) {
191 info!("Watchdog turned on!");
192
193 let mut alarm_sig = signal(SignalKind::alarm()).expect("Alarm stream failed.");
194 let mut hangup_sig = signal(SignalKind::hangup()).expect("Hangup stream failed.");
195 let mut int_sig = signal(SignalKind::interrupt()).expect("Interrupt stream failed.");
196 let mut quit_sig = signal(SignalKind::quit()).expect("Quit stream failed.");
197 let mut term_sig = signal(SignalKind::terminate()).expect("Terminate stream failed.");
198
199 select! {
200 _ = tokio::signal::ctrl_c() => {
201 info!("CTRL+C received, terminating now!");
202 },
203 _ = alarm_sig.recv() => {
204 info!("SIGALRM received, terminating now!");
205 }
206 _ = hangup_sig.recv() => {
207 info!("SIGHUP received, terminating now!");
208 }
209 _ = int_sig.recv() => {
210 info!("SIGINT received, terminating now!");
211 }
212 _ = quit_sig.recv() => {
213 info!("SIGQUIT received, terminating now!");
214 }
215 _ = term_sig.recv() => {
216 info!("SIGTERM received, terminating now!");
217 }
218 }
219
220 self.forced_shutdown();
221 }
222
223 pub fn forced_shutdown(self) {
224 info!("Shutting down!");
225 (*self.shutdown).store(true, Ordering::SeqCst);
226
227 drop(self.notifier);
228 }
229
230 pub fn get_receiver(&mut self) -> SysSigReceiver {
232 SysSigReceiver::new(Arc::clone(&self.shutdown),
233 self.notifier.subscribe())
234 }
235}
236
237#[cfg(test)]
238mod tests {
239 use std::time::Duration;
240 use super::*;
241
242 #[tokio::test]
243 async fn test_shutdown() {
244 let (notify_shutdown, _) = watch::channel(());
246 let mut listener = SysSigListener::new(notify_shutdown);
248 let receiver = listener.get_receiver();
250
251 tokio::time::sleep(Duration::from_millis(2000)).await;
253 listener.forced_shutdown();
254
255 assert_eq!(receiver.shutdown.load(Ordering::SeqCst), true);
256 }
257}