1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
use serde::{Deserialize, Serialize};
use solana_program::clock::UnixTimestamp;
use tokio::select;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::broadcast::{Receiver, Sender};
use tracing::info;

/// A queued work item carrying a list of numbers and a list of programs.
///
/// NOTE(review): the producer/consumer of this type is not visible here;
/// presumably `numbers` are block numbers to process and `programs` are the
/// program identifiers of interest — confirm against the queue's callers.
#[derive(Clone, Serialize, Deserialize)]
pub struct QueueItem {
    /// Numeric identifiers carried by this item (presumably block numbers — TODO confirm).
    pub numbers: Vec<u64>,
    /// Program identifiers carried by this item (presumably program addresses — TODO confirm).
    pub programs: Vec<String>
}

/// Summary record of a single block: its identity and parentage plus aggregate
/// transaction, fee, reward and compute-unit statistics.
#[derive(Clone, Serialize, Deserialize)]
pub struct Block {
    /// Epoch this block belongs to.
    pub epoch: u32,
    /// Hash of this block's parent.
    pub previous_hash: String,
    /// Validator that produced this block.
    pub producer: String,
    /// This block's hash.
    pub hash: String,
    /// The parent block's number.
    pub parent_number: u64,
    /// This block's number.
    pub number: u64,
    /// Total count of transactions in the block.
    pub number_of_transactions: u32,
    /// Total number of successful transactions.
    pub successful_transactions: u32,
    /// Total number of vote-related transactions.
    pub vote_transactions: u32,
    /// Total transaction fees accumulated by the transactions within this block.
    pub total_tx_fees: u64,
    /// Total number of rewards.
    pub number_of_rewards: u32,
    /// Total amount of rewards accrued in this block.
    pub total_reward_amount: u64,
    /// Total amount of compute units consumed.
    pub total_compute_units_consumed: u64,
    /// Absolute limit of compute units.
    pub total_compute_units_limit: u64,
    /// Time this block was proposed (presumably Unix epoch seconds — TODO confirm).
    pub block_time: u64,
}

/// Records a single transfer that occurred within a transaction.
///
/// Routing key: `<account>#<mint>#<timestamp>`
///
/// Schema column qualifiers:
/// - `type` -> `receive` | `send`
/// - `amount` (in finalised format, e.g. `0.000031 <eth, excluded>`)
/// - `txHash`
#[derive(Clone, Serialize, Deserialize)]
pub struct Transfer {
    /// The transaction this instruction belongs to.
    pub transaction_hash: String,
    /// Status of the transaction (numeric code; the meaning of the values is
    /// not visible in this file).
    pub status: u16,
    /// The account that will give up the amount.
    pub source: String,
    /// For a token-based transfer, the associated token account of the source.
    pub source_association: Option<String>,
    /// The account that will receive the amount.
    pub destination: String,
    /// For a token-based transfer, the associated token account of the destination.
    pub destination_association: Option<String>,
    /// If this is `None`, the amount relates to lamports. If it is `Some`, the
    /// amount relates to the token in question.
    pub token: Option<String>,
    /// The amount transferred.
    pub amount: i128,
    /// Epoch time for when this entry was added to the db.
    pub timestamp: u64,
}

/// Per-transaction summary: identity, status, fee, total compute-unit usage,
/// and the per-program consumption entries and transfers attached to it.
#[derive(Clone, Deserialize, Serialize)]
pub struct TransactionInfo {
    /// Hash identifying the transaction.
    pub hash: String,
    /// Numeric status code of the transaction (value meanings are not visible
    /// in this file).
    pub status: u16,
    /// Fee recorded for the transaction (presumably in lamports — TODO confirm).
    pub fee: u64,
    /// Total compute units consumed by the transaction.
    pub total_cu_consumed: u64,
    /// Compute-unit consumption entries, one `ProgramConsumption` per element.
    pub program_consumptions: Vec<ProgramConsumption>,
    /// Transfers recorded for this transaction.
    pub transfers: Vec<Transfer>,
}

/// Compute-unit consumption of one program within a transaction.
#[derive(Clone, Deserialize, Serialize)]
pub struct ProgramConsumption {
    /// Identifier of the transaction this entry belongs to (presumably the
    /// transaction hash — TODO confirm).
    pub tx: String,
    /// Numeric status code (value meanings are not visible in this file).
    pub status: u16,
    /// NOTE(review): presumably the position of this entry within the
    /// transaction (e.g. instruction or log line index) — confirm with the writer.
    pub line: u32,
    /// Identifier of the program that consumed the compute units.
    pub program: String,
    /// Compute units consumed.
    pub cu_consumed: u64,
    /// Compute unit limit that applied.
    pub cu_limit: u64,
    /// Timestamp associated with this entry, as a Solana `UnixTimestamp`.
    pub timestamp: UnixTimestamp,
}

/// Listens for the server shutdown signal.
///
/// Shutdown is signalled using a `broadcast::Receiver`. Only a single value is
/// ever sent. Once a value has been sent via the broadcast channel, the server
/// should shut down.
///
/// The `Shutdown` struct listens for the signal and tracks that the signal has
/// been received. Callers may query for whether the shutdown signal has been
/// received or not.
#[derive(Debug)]
pub struct Shutdown {
    /// `true` if the shutdown signal has been received (or the instance has
    /// been explicitly marked as shut down).
    shutdown: bool,

    /// The receive half of the channel used to listen for shutdown.
    notify: Receiver<()>,
}

impl Shutdown {
    /// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
    ///
    /// The returned instance starts in the "not shut down" state.
    pub fn new(notify: Receiver<()>) -> Shutdown {
        Shutdown {
            shutdown: false,
            notify,
        }
    }

    /// Returns `true` if the shutdown signal has been received.
    pub fn is_shutdown(&self) -> bool {
        self.shutdown
    }

    /// Marks this instance as shut down without waiting on the channel.
    ///
    /// Only this instance's flag is set; nothing is sent to other listeners.
    pub fn shutdown(&mut self) {
        self.shutdown = true;
    }

    /// Receive the shutdown notice, waiting if necessary.
    ///
    /// Returns immediately if this instance is already marked as shut down.
    pub async fn recv(&mut self) {
        // If the shutdown signal has already been received, then return
        // immediately.
        if self.is_shutdown() {
            return;
        }

        // Cannot receive a "lag error" as only one value is ever sent.
        // Both a received value and a closed channel count as "shut down",
        // so the result itself is deliberately ignored.
        let _ = self.notify.recv().await;

        // Remember that the signal has been received.
        self.shutdown();
    }

    /// Waits for the first of SIGALRM, SIGHUP, SIGINT, SIGQUIT or SIGTERM,
    /// then marks this instance as shut down and notifies every listener on
    /// `notifier`'s channel.
    ///
    /// `notifier` is consumed: a `()` is broadcast through it and it is then
    /// dropped. Sending explicitly (rather than only dropping) matters because
    /// dropping a `Sender` closes the channel only when it was the last sender
    /// handle; if the caller kept a clone, receivers would otherwise never be
    /// woken.
    ///
    /// # Panics
    ///
    /// Panics if any of the signal streams cannot be registered.
    pub async fn watchdog(&mut self, notifier: Sender<()>) {
        let mut alarm_sig = signal(SignalKind::alarm()).expect("Alarm stream failed.");
        let mut hangup_sig = signal(SignalKind::hangup()).expect("Hangup stream failed.");
        let mut int_sig = signal(SignalKind::interrupt()).expect("Interrupt stream failed.");
        let mut quit_sig = signal(SignalKind::quit()).expect("Quit stream failed.");
        let mut term_sig = signal(SignalKind::terminate()).expect("Terminate stream failed.");

        // Whichever signal arrives first wins; the remaining streams are
        // dropped when this function returns.
        select! {
            _ = alarm_sig.recv() => {
                info!("SIGALRM received, terminating now!");
            }
            _ = hangup_sig.recv() => {
                info!("SIGHUP received, terminating now!");
            }
            _ = int_sig.recv() => {
                info!("SIGINT received, terminating now!");
            }
            _ = quit_sig.recv() => {
                info!("SIGQUIT received, terminating now!");
            }
            _ = term_sig.recv() => {
                info!("SIGTERM received, terminating now!");
            }
        }

        self.shutdown();

        // Broadcast the shutdown value. `send` only fails when there are no
        // active receivers, in which case there is nobody left to notify.
        let _ = notifier.send(());
        drop(notifier);
    }
}