1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
#[macro_use]
extern crate string_enum;

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use serde::{Deserialize, Serialize};
use solana_program::clock::UnixTimestamp;
use solana_sdk::transaction::TransactionError;
use tokio::select;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{watch::{Receiver, Sender}};
use tracing::info;

#[derive(Clone, Serialize, Deserialize)]
pub struct QueueItem {
    pub numbers: Vec<u64>,
    pub programs: Vec<String>
}

#[derive(Clone, Serialize, Deserialize)]
pub struct TransactionProgram {
    /// Transaction hash..
    pub transaction_hash: String,
    /// The programs involved in the transaction
    pub program: String,
    /// Time of transaction
    pub timestamp: u64
}

#[derive(Clone, Serialize, Deserialize)]
pub struct AccountProgram {
    // The account pub key relating to this transaction.
    pub account: String,
    // The programs involved with the account
    pub program: String,
    // Block number responsible for this instance.
    pub block: u64,
    // Time when it is created according to Solana's on-chain clock
    pub timestamp: u64
}

#[derive(Clone, Serialize, Deserialize)]
pub struct AccountTransaction {
    // The transaction this transaction belongs to.
    pub transaction_hash: String,
    // The account pub key relating to this transaction.
    pub account: String,
    // Block number responsible for this instance.
    pub block: u64,
    // Time when it is created according to Solana's on-chain clock
    pub timestamp: u64,
}

#[derive(Clone, Serialize, Deserialize)]
pub struct Block {
    /// Basically the epoch this block belongs to
    pub epoch: u32,
    /// Parent block hash of the current block
    pub previous_hash: String,
    /// Validator producing said block
    pub producer: String,
    /// This block's hash
    pub hash: String,
    /// Parent's block number
    pub parent_number: u64,
    /// This block's number
    pub number: u64,
    /// Amount of data contained within the block
    pub data_size: u64,
    /// Total count of transactions in the block
    pub number_of_transactions: u32,
    /// Total number of successful transactions
    pub successful_transactions: u32,
    /// Total number of vote-related transactions
    pub vote_transactions: u32,
    /// Total transaction fees accumulated in the transactions within this block
    pub total_tx_fees: u64,
    /// Total number of rewards
    pub number_of_rewards: u32,
    /// Total amount of rewards accrued in this block
    pub total_reward_amount: u64,
    /// Total amount of compute units consumed
    pub total_compute_units_consumed: u64,
    /// Absolute limit of compute units
    pub total_compute_units_limit: u64,
    /// Time this block was proposed
    pub block_time: u64,
}

/// Record an instance of a transaction transfer at any given time.
/// Routing key
/// <account>#<mint>#<timestamp>
/// Schema column qualifiers
/// type —> receive | send
/// amount (in finalised format, e.g. 0.000031 <eth, excluded>)
/// txHash
///
#[derive(Clone, Serialize, Deserialize)]
pub struct Transfer {
    // The transaction this instruction belongs to.
    pub transaction_hash: String,
    // Status of the transaction,
    pub status: u16,
    // The account that will give up the amount.
    pub source: String,
    // Should this be a token-based transfer, this will be the associated token account of the source.
    pub source_association: Option<String>,
    // The account that will receive the amount.
    pub destination: String,
    // Should this be a token-based transfer, this will be the associated token account of the destination.
    pub destination_association: Option<String>,
    // If this is empty, the balance relates to lamports. If its NOT empty, the balance relates to the
    // token in question.
    pub token: Option<String>,
    // The amount transferred
    pub amount: u64,
    // Epoch time for when this input was added to the db.
    pub timestamp: u64,
}

impl From<EnrichedTransfer> for Transfer {
    fn from(transfer: EnrichedTransfer) -> Self {
        Self {
            transaction_hash: "".into(),
            status: transfer.status as u16,
            source: transfer.source,
            source_association: transfer.source_association,
            destination: transfer.destination,
            destination_association: transfer.destination_association,
            token: transfer.token,
            amount: transfer.amount,
            timestamp: transfer.timestamp
        }
    }
}

impl From<Option<TransactionError>> for EnrichedTransferStatus {
    fn from(err: Option<TransactionError>) -> Self {
        match err {
            Some(error) =>
                match error {
                    TransactionError::AccountInUse => Self::AccountInUse,
                    TransactionError::AccountLoadedTwice => Self::AccountLoadedTwice,
                    TransactionError::AccountNotFound => Self::AccountNotFound,
                    TransactionError::ProgramAccountNotFound => Self::ProgramAccountNotFound,
                    TransactionError::InsufficientFundsForFee => Self::InsufficientFundsForFee,
                    TransactionError::InvalidAccountForFee => Self::InvalidAccountForFee,
                    TransactionError::AlreadyProcessed => Self::AlreadyProcessed,
                    TransactionError::BlockhashNotFound => Self::BlockhashNotFound,
                    TransactionError::InstructionError(_, _) => Self::InstructionError,
                    TransactionError::CallChainTooDeep => Self::CallChainTooDeep,
                    TransactionError::MissingSignatureForFee => Self::MissingSignatureForFee,
                    TransactionError::InvalidAccountIndex => Self::InvalidAccountIndex,
                    TransactionError::SignatureFailure => Self::SignatureFailure,
                    TransactionError::InvalidProgramForExecution => Self::InvalidProgramForExecution,
                    TransactionError::SanitizeFailure => Self::SanitizeFailure,
                    TransactionError::ClusterMaintenance => Self::ClusterMaintenance,
                    TransactionError::AccountBorrowOutstanding => Self::AccountBorrowOutstanding,
                    TransactionError::WouldExceedMaxBlockCostLimit => Self::WouldExceedMaxBlockCostLimit,
                    TransactionError::UnsupportedVersion => Self::UnsupportedVersion,
                    TransactionError::InvalidWritableAccount => Self::InvalidWritableAccount,
                    TransactionError::WouldExceedMaxAccountCostLimit => Self::WouldExceedMaxAccountCostLimit,
                    TransactionError::WouldExceedAccountDataBlockLimit => Self::WouldExceedAccountDataBlockLimit,
                    TransactionError::TooManyAccountLocks => Self::TooManyAccountLocks,
                    TransactionError::AddressLookupTableNotFound => Self::AddressLookupTableNotFound,
                    TransactionError::InvalidAddressLookupTableOwner => Self::InvalidAddressLookupTableOwner,
                    TransactionError::InvalidAddressLookupTableData => Self::InvalidAddressLookupTableData,
                    TransactionError::InvalidAddressLookupTableIndex => Self::InvalidAddressLookupTableIndex,
                    TransactionError::InvalidRentPayingAccount => Self::InvalidRentPayingAccount,
                    TransactionError::WouldExceedMaxVoteCostLimit => Self::WouldExceedMaxVoteCostLimit,
                    TransactionError::WouldExceedAccountDataTotalLimit => Self::WouldExceedAccountDataTotalLimit,
                    TransactionError::DuplicateInstruction(_) => Self::DuplicateInstruction,
                    TransactionError::InsufficientFundsForRent { .. } => Self::InsufficientFundsForRent,
                },
            None => Self::Successful
        }
    }
}

#[derive(Clone, PartialEq, StringEnum)]
pub enum EnrichedTransferStatus {
    /// `Successful`
    Successful = 1,
    /// `AccountInUse`
    AccountInUse = 2,
    /// `AccountLoadedTwice`
    AccountLoadedTwice = 3,
    /// `AccountNotFound`
    AccountNotFound = 4,
    /// `ProgramAccountNotFound`
    ProgramAccountNotFound = 5,
    /// `InsufficientFundsForFee`
    InsufficientFundsForFee = 6,
    /// `InvalidAccountForFee`
    InvalidAccountForFee = 7,
    /// `AlreadyProcessed`
    AlreadyProcessed = 8,
    /// `BlockhashNotFound`
    BlockhashNotFound = 9,
    /// `InstructionError`
    InstructionError = 10,
    /// `CallChainTooDeep`
    CallChainTooDeep = 11,
    /// `MissingSignatureForFee`
    MissingSignatureForFee = 12,
    /// `InvalidAccountIndex`
    InvalidAccountIndex = 13,
    /// `SignatureFailure`
    SignatureFailure = 14,
    /// `InvalidProgramForExecution`
    InvalidProgramForExecution = 15,
    /// `SanitizeFailure`
    SanitizeFailure = 16,
    /// `ClusterMaintenance`
    ClusterMaintenance = 17,
    /// `AccountBorrowOutstanding`
    AccountBorrowOutstanding = 18,
    /// `WouldExceedMaxBlockCostLimit`
    WouldExceedMaxBlockCostLimit = 19,
    /// `UnsupportedVersion`
    UnsupportedVersion = 20,
    /// `InvalidWritableAccount`
    InvalidWritableAccount = 21,
    /// `WouldExceedMaxAccountCostLimit`
    WouldExceedMaxAccountCostLimit = 22,
    /// `WouldExceedAccountDataBlockLimit`
    WouldExceedAccountDataBlockLimit = 23,
    /// `TooManyAccountLocks`
    TooManyAccountLocks = 24,
    /// `AddressLookupTableNotFound`
    AddressLookupTableNotFound = 25,
    /// `InvalidAddressLookupTableOwner`
    InvalidAddressLookupTableOwner = 26,
    /// `InvalidAddressLookupTableData`
    InvalidAddressLookupTableData = 27,
    /// `InvalidAddressLookupTableIndex`
    InvalidAddressLookupTableIndex = 28,
    /// `InvalidRentPayingAccount`
    InvalidRentPayingAccount = 29,
    /// `WouldExceedMaxVoteCostLimit`
    WouldExceedMaxVoteCostLimit = 30,
    /// `WouldExceedAccountDataTotalLimit`
    WouldExceedAccountDataTotalLimit = 31,
    /// `DuplicateInstruction`
    DuplicateInstruction = 32,
    /// `InsufficientFundsForRent`
    InsufficientFundsForRent = 33
}

/// For Transaction Hash to EnrichedTransfer binding
pub type EnrichedTransferPair = (String, EnrichedTransfer);

/// Routing key
/// <account>#<mint>#<timestamp>
/// Schema column qualifiers
/// type —> receive | send
/// amount (in finalised format, e.g. 0.000031 <eth, excluded>)
/// txHash
#[derive(Clone, Serialize, Deserialize)]
pub struct EnrichedTransfer {
    // Dictates the actions resulting in this transfer.
    pub action: String,
    // Status of the transaction,
    pub status: EnrichedTransferStatus,
    // The account that will give up the amount.
    pub source: String,
    // Should this be a token-based transfer, this will be the associated token account of the source.
    pub source_association: Option<String>,
    // The account that will receive the amount.
    pub destination: String,
    // Should this be a token-based transfer, this will be the associated token account of the destination.
    pub destination_association: Option<String>,
    // If this is empty, the balance relates to lamports. If its NOT empty, the balance relates to the
    // token in question.
    pub token: Option<String>,
    // The amount transferred
    pub amount: u64,
    // Epoch time for when this input was added to the db.
    pub timestamp: u64,
}

#[derive(Clone, Deserialize, Serialize)]
pub struct TransactionInfo {
    pub hash: String,
    pub status: u16,
    pub fee: u64,
    pub total_cu_consumed: u64,
    pub total_cu_limit: u64,
    pub transfers: Vec<EnrichedTransferPair>,
    pub account_transactions: Vec<AccountTransaction>
}

impl TransactionInfo {
    pub fn old_transfers(&mut self) -> Vec<EnrichedTransfer> {
        self.transfers.iter().map(|trf| trf.1.clone()).collect()
    }
}

#[derive(Clone, Deserialize, Serialize)]
pub struct ProgramConsumption {
    pub tx: String,
    pub status: u16,
    pub line: u32,
    pub program: String,
    pub cu_consumed: u64,
    pub cu_limit: u64,
    pub timestamp: UnixTimestamp,
}

pub fn is_not_transfer(address: &str) -> bool {
    !vec![
        // System Program
        "11111111111111111111111111111111",
        // Config Program
        "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
        // Associated Token Account Program
        "ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL"
    ].contains(&address)
}

/// Listens for the machine shutdown signals via a SysSigListener with a `broadcast::Receiver`.
///
/// Shutdown is signalled using a `broadcast::Receiver`. Only a single value is
/// ever sent. Once a value has been sent via the broadcast channel, the server
/// should shutdown.
///
/// The `Shutdown` struct listens for the signal and tracks that the signal has
/// been received. Callers may query for whether the shutdown signal has been
/// received or not.
#[derive(Debug)]
pub struct SysSigReceiver {
    /// `true` if the shutdown signal has been received
    shutdown: Arc<AtomicBool>,

    /// The receive half of the channel used to listen for shutdown.
    notify: Receiver<()>,
}

impl SysSigReceiver {
    /// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
    pub fn new(shutdown: Arc<AtomicBool>, notify: Receiver<()>) -> SysSigReceiver {
        SysSigReceiver {
            shutdown,
            notify,
        }
    }

    /// Returns `true` if the shutdown signal has been received.
    pub fn is_shutdown(&self) -> bool {
        self.shutdown.load(Ordering::SeqCst)
    }

    /// Receive the shutdown notice, waiting if necessary.
    pub async fn recv(&mut self) {
        // If the shutdown signal has already been received, then return
        // immediately.
        if self.is_shutdown() {
            return;
        }

        // Cannot receive a "lag error" as only one value is ever sent.
        let _ = self.notify.changed().await;

        if !self.is_shutdown() {
            self.shutdown.store(true, Ordering::SeqCst);
        }
    }
}

/// System Signal Listener
/// A simple struct to listen for shutdown signals from system, and for listeners to receive a
/// subscriber pipe.
pub struct SysSigListener {
    shutdown: Arc<AtomicBool>,
    notifier: Sender<()>
}

impl SysSigListener {
    pub fn new(notifier: Sender<()>) -> Self {
        Self {
            shutdown: Arc::new(AtomicBool::new(false)),
            notifier
        }
    }

    /// Initiates the watchdog sequence, listening to signals from the host.
    pub async fn watchdog(self) {
        info!("Watchdog turned on!");

        let mut alarm_sig = signal(SignalKind::alarm()).expect("Alarm stream failed.");
        let mut hangup_sig = signal(SignalKind::hangup()).expect("Hangup stream failed.");
        let mut int_sig = signal(SignalKind::interrupt()).expect("Interrupt stream failed.");
        let mut quit_sig = signal(SignalKind::quit()).expect("Quit stream failed.");
        let mut term_sig = signal(SignalKind::terminate()).expect("Terminate stream failed.");

        select! {
            _ = tokio::signal::ctrl_c() => {
                info!("CTRL+C received, terminating now!");
            },
            _ = alarm_sig.recv() => {
                info!("SIGALRM received, terminating now!");
            }
            _ = hangup_sig.recv() => {
                info!("SIGHUP received, terminating now!");
            }
            _ = int_sig.recv() => {
                info!("SIGINT received, terminating now!");
            }
            _ = quit_sig.recv() => {
                info!("SIGQUIT received, terminating now!");
            }
            _ = term_sig.recv() => {
                info!("SIGTERM received, terminating now!");
            }
        }

        self.forced_shutdown();
    }

    pub fn forced_shutdown(self) {
        info!("Shutting down!");
        (*self.shutdown).store(true, Ordering::SeqCst);

        drop(self.notifier);
    }

    /// Retrieves a listener for the watchdog.
    pub fn get_receiver(&mut self) -> SysSigReceiver {
        SysSigReceiver::new(Arc::clone(&self.shutdown),
                            self.notifier.subscribe())
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;
    use tokio::sync::watch;
    use super::*;

    #[tokio::test]
    async fn test_shutdown() {
        // Establish a channel broadcaster
        let (notify_shutdown, _) = watch::channel(());
        // Establish the SysSigListener with the shutdown notifier (the channel sender)
        let mut listener = SysSigListener::new(notify_shutdown);
        // Establish the receiver in conjunction with the listener by obtaining a receiver.
        let receiver = listener.get_receiver();

        // Simulate listener.watchdog().await; and sleep for 2 seconds before termination.
        tokio::time::sleep(Duration::from_millis(2000)).await;
        listener.forced_shutdown();

        assert_eq!(receiver.shutdown.load(Ordering::SeqCst), true);
    }
}