sfm_models/
lib.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use serde::{Deserialize, Serialize};
4use solana_program::clock::UnixTimestamp;
5use tokio::select;
6use tokio::signal::unix::{signal, SignalKind};
7use tokio::sync::{watch, watch::{Receiver, Sender}};
8use tracing::info;
9
10#[derive(Clone, Serialize, Deserialize)]
11pub struct QueueItem {
12    pub numbers: Vec<u64>,
13    pub programs: Vec<String>
14}
15
16#[derive(Clone, Serialize, Deserialize)]
17pub struct TransactionProgram {
18    /// Transaction hash..
19    pub transaction_hash: String,
20    /// The programs involved in the transaction
21    pub program: String,
22    /// Time of transaction
23    pub timestamp: u64
24}
25
26#[derive(Clone, Serialize, Deserialize)]
27pub struct AccountTransaction {
28    // The transaction this transaction belongs to.
29    pub transaction_hash: String,
30    // The account pub key relating to this transaction.
31    pub account: String,
32    // Time when it is created according solana's time
33    pub timestamp: u64,
34}
35
36#[derive(Clone, Serialize, Deserialize)]
37pub struct Block {
38    /// Basically the epoch this block belongs to
39    pub epoch: u32,
40    /// Parent block hash of the current block
41    pub previous_hash: String,
42    /// Validator producing said block
43    pub producer: String,
44    /// This block's hash
45    pub hash: String,
46    /// Parent's block number
47    pub parent_number: u64,
48    /// This block's number
49    pub number: u64,
50    /// Amount of data contained within the block
51    pub data_size: u64,
52    /// Total count of transactions in the block
53    pub number_of_transactions: u32,
54    /// Total number of successful transactions
55    pub successful_transactions: u32,
56    /// Total number of vote-related transactions
57    pub vote_transactions: u32,
58    /// Total transaction fees accumulated in the transactions within this block
59    pub total_tx_fees: u64,
60    /// Total number of rewards
61    pub number_of_rewards: u32,
62    /// Total amount of rewards accrued in this block
63    pub total_reward_amount: u64,
64    /// Total amount of compute units consumed
65    pub total_compute_units_consumed: u64,
66    /// Absolute limit of compute units
67    pub total_compute_units_limit: u64,
68    /// Time this block was proposed
69    pub block_time: u64,
70}
71
72/// Record an instance of a transaction transfer at any given time.
73/// Routing key
74/// <account>#<mint>#<timestamp>
75/// Schema column qualifiers
76/// type —> receive | send
77/// amount (in finalised format, e.g. 0.000031 <eth, excluded>)
78/// txHash
79///
80#[derive(Clone, Serialize, Deserialize)]
81pub struct Transfer {
82    // The transaction this instruction belongs to.
83    pub transaction_hash: String,
84    // Status of the transaction,
85    pub status: u16,
86    // The account that will give up the amount.
87    pub source: String,
88    // Should this be a token-based transfer, this will be the associated token account of the source.
89    pub source_association: Option<String>,
90    // The account that will receive the amount.
91    pub destination: String,
92    // Should this be a token-based transfer, this will be the associated token account of the destination.
93    pub destination_association: Option<String>,
94    // If this is empty, the balance relates to lamports. If its NOT empty, the balance relates to the
95    // token in question.
96    pub token: Option<String>,
97    // The amount transferred
98    pub amount: u64,
99    // Epoch time for when this input was added to the db.
100    pub timestamp: u64,
101}
102
103#[derive(Clone, Deserialize, Serialize)]
104pub struct TransactionInfo {
105    pub hash: String,
106    pub status: u16,
107    pub fee: u64,
108    pub total_cu_consumed: u64,
109    pub program_consumptions: Vec<ProgramConsumption>,
110    pub transfers: Vec<Transfer>,
111}
112
113#[derive(Clone, Deserialize, Serialize)]
114pub struct ProgramConsumption {
115    pub tx: String,
116    pub status: u16,
117    pub line: u32,
118    pub program: String,
119    pub cu_consumed: u64,
120    pub cu_limit: u64,
121    pub timestamp: UnixTimestamp,
122}
123
124/// Listens for the machine shutdown signals via a SysSigListener with a `broadcast::Receiver`.
125///
126/// Shutdown is signalled using a `broadcast::Receiver`. Only a single value is
127/// ever sent. Once a value has been sent via the broadcast channel, the server
128/// should shutdown.
129///
130/// The `Shutdown` struct listens for the signal and tracks that the signal has
131/// been received. Callers may query for whether the shutdown signal has been
132/// received or not.
133#[derive(Debug)]
134pub struct SysSigReceiver {
135    /// `true` if the shutdown signal has been received
136    shutdown: Arc<AtomicBool>,
137
138    /// The receive half of the channel used to listen for shutdown.
139    notify: Receiver<()>,
140}
141
142impl SysSigReceiver {
143    /// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
144    pub fn new(shutdown: Arc<AtomicBool>, notify: Receiver<()>) -> SysSigReceiver {
145        SysSigReceiver {
146            shutdown,
147            notify,
148        }
149    }
150
151    /// Returns `true` if the shutdown signal has been received.
152    pub fn is_shutdown(&self) -> bool {
153        self.shutdown.load(Ordering::SeqCst)
154    }
155
156    /// Receive the shutdown notice, waiting if necessary.
157    pub async fn recv(&mut self) {
158        // If the shutdown signal has already been received, then return
159        // immediately.
160        if self.is_shutdown() {
161            return;
162        }
163
164        // Cannot receive a "lag error" as only one value is ever sent.
165        let _ = self.notify.changed().await;
166
167        if !self.is_shutdown() {
168            self.shutdown.store(true, Ordering::SeqCst);
169        }
170    }
171}
172
173/// System Signal Listener
174/// A simple struct to listen for shutdown signals from system, and for listeners to receive a
175/// subscriber pipe.
176pub struct SysSigListener {
177    shutdown: Arc<AtomicBool>,
178    notifier: Sender<()>
179}
180
181impl SysSigListener {
182    pub fn new(notifier: Sender<()>) -> Self {
183        Self {
184            shutdown: Arc::new(AtomicBool::new(false)),
185            notifier
186        }
187    }
188
189    /// Initiates the watchdog sequence, listening to signals from the host.
190    pub async fn watchdog(self) {
191        info!("Watchdog turned on!");
192
193        let mut alarm_sig = signal(SignalKind::alarm()).expect("Alarm stream failed.");
194        let mut hangup_sig = signal(SignalKind::hangup()).expect("Hangup stream failed.");
195        let mut int_sig = signal(SignalKind::interrupt()).expect("Interrupt stream failed.");
196        let mut quit_sig = signal(SignalKind::quit()).expect("Quit stream failed.");
197        let mut term_sig = signal(SignalKind::terminate()).expect("Terminate stream failed.");
198
199        select! {
200            _ = tokio::signal::ctrl_c() => {
201                info!("CTRL+C received, terminating now!");
202            },
203            _ = alarm_sig.recv() => {
204                info!("SIGALRM received, terminating now!");
205            }
206            _ = hangup_sig.recv() => {
207                info!("SIGHUP received, terminating now!");
208            }
209            _ = int_sig.recv() => {
210                info!("SIGINT received, terminating now!");
211            }
212            _ = quit_sig.recv() => {
213                info!("SIGQUIT received, terminating now!");
214            }
215            _ = term_sig.recv() => {
216                info!("SIGTERM received, terminating now!");
217            }
218        }
219
220        self.forced_shutdown();
221    }
222
223    pub fn forced_shutdown(self) {
224        info!("Shutting down!");
225        (*self.shutdown).store(true, Ordering::SeqCst);
226
227        drop(self.notifier);
228    }
229
230    /// Retrieves a listener for the watchdog.
231    pub fn get_receiver(&mut self) -> SysSigReceiver {
232        SysSigReceiver::new(Arc::clone(&self.shutdown),
233                            self.notifier.subscribe())
234    }
235}
236
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use super::*;

    /// A forced shutdown must raise the shared flag and release a task that is waiting in
    /// `SysSigReceiver::recv`.
    #[tokio::test]
    async fn test_shutdown() {
        // Establish the watch channel; only the sending half is needed here.
        let (notify_shutdown, _) = watch::channel(());
        // Establish the SysSigListener with the shutdown notifier (the channel sender).
        let mut listener = SysSigListener::new(notify_shutdown);
        // One receiver waits asynchronously, the other is only used to inspect the flag.
        let mut waiting_receiver = listener.get_receiver();
        let flag_receiver = listener.get_receiver();

        assert!(!flag_receiver.is_shutdown());

        // Stand-in for a worker task that blocks until shutdown is announced.
        let waiter = tokio::spawn(async move {
            waiting_receiver.recv().await;
            waiting_receiver.is_shutdown()
        });
        // Give the spawned task a chance to start waiting before the shutdown is triggered.
        tokio::task::yield_now().await;

        // Simulate the end of `listener.watchdog().await` without sending a real signal.
        listener.forced_shutdown();

        assert!(flag_receiver.is_shutdown());

        // The timeout turns a missed wake-up into a test failure instead of a hang.
        let observed = tokio::time::timeout(Duration::from_secs(1), waiter)
            .await
            .expect("receiver was not released by the shutdown")
            .expect("receiver task panicked");
        assert!(observed);
    }
}