//! sfm-models 0.1.2
//!
//! A simple SDK for reusing SFM entities.
//! Documentation
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use serde::{Deserialize, Serialize};
use solana_program::clock::UnixTimestamp;
use tokio::select;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{watch, watch::{Receiver, Sender}};
use tracing::info;

/// Serializable item that pairs a list of numbers with a list of programs.
///
/// NOTE(review): judging by the name this is a queue payload; `numbers` are presumably block
/// numbers and `programs` program addresses — confirm against the producer/consumer.
#[derive(Clone, Serialize, Deserialize)]
pub struct QueueItem {
    /// Numbers carried by this item.
    pub numbers: Vec<u64>,
    /// Programs carried by this item, as strings.
    pub programs: Vec<String>
}

/// Associates a single program with a transaction at a point in time.
#[derive(Clone, Serialize, Deserialize)]
pub struct TransactionProgram {
    /// Transaction hash.
    pub transaction_hash: String,
    /// A program involved in the transaction (one program per record).
    pub program: String,
    /// Time of the transaction.
    // NOTE(review): unit (seconds vs. milliseconds) is not visible here — confirm.
    pub timestamp: u64
}

/// Associates a single account with a transaction at a point in time.
#[derive(Clone, Serialize, Deserialize)]
pub struct AccountTransaction {
    /// Hash of the transaction this record belongs to.
    pub transaction_hash: String,
    /// The account public key relating to this transaction.
    pub account: String,
    /// Time when it was created, according to Solana's time.
    pub timestamp: u64,
}

/// Summary of a single block: its identity (hashes, numbers, producer, epoch) together with
/// aggregate counters for the transactions, fees, rewards and compute units it contains.
#[derive(Clone, Serialize, Deserialize)]
pub struct Block {
    /// The epoch this block belongs to.
    pub epoch: u32,
    /// Hash of the parent block of the current block.
    pub previous_hash: String,
    /// Validator that produced this block.
    pub producer: String,
    /// This block's hash.
    pub hash: String,
    /// The parent block's number.
    pub parent_number: u64,
    /// This block's number.
    pub number: u64,
    /// Amount of data contained within the block.
    // NOTE(review): presumably bytes — confirm.
    pub data_size: u64,
    /// Total count of transactions in the block.
    pub number_of_transactions: u32,
    /// Total number of successful transactions.
    pub successful_transactions: u32,
    /// Total number of vote-related transactions.
    pub vote_transactions: u32,
    /// Total transaction fees accumulated by the transactions within this block.
    pub total_tx_fees: u64,
    /// Total number of rewards.
    pub number_of_rewards: u32,
    /// Total amount of rewards accrued in this block.
    pub total_reward_amount: u64,
    /// Total amount of compute units consumed.
    pub total_compute_units_consumed: u64,
    /// Absolute limit of compute units.
    pub total_compute_units_limit: u64,
    /// Time this block was proposed.
    pub block_time: u64,
}

/// Records an instance of a transfer within a transaction at a given time.
///
/// # Storage layout
///
/// Routing key: `<account>#<mint>#<timestamp>`
///
/// Schema column qualifiers:
/// - `type` -> `receive` | `send`
/// - `amount` (in finalised format, e.g. `0.000031`; `<eth, excluded>`)
/// - `txHash`
///
/// NOTE(review): the layout above describes a "finalised" decimal amount, while the `amount`
/// field on this struct is a `u64` — confirm where the conversion happens.
#[derive(Clone, Serialize, Deserialize)]
pub struct Transfer {
    /// The transaction this instruction belongs to.
    pub transaction_hash: String,
    /// Status of the transaction.
    pub status: u16,
    /// The account that will give up the amount.
    pub source: String,
    /// Should this be a token-based transfer, this will be the associated token account of the
    /// source.
    pub source_association: Option<String>,
    /// The account that will receive the amount.
    pub destination: String,
    /// Should this be a token-based transfer, this will be the associated token account of the
    /// destination.
    pub destination_association: Option<String>,
    /// If this is empty, the balance relates to lamports. If it is NOT empty, the balance relates
    /// to the token in question.
    pub token: Option<String>,
    /// The amount transferred.
    pub amount: u64,
    /// Epoch time for when this input was added to the db.
    pub timestamp: u64,
}

/// Aggregated view of one transaction: its hash, status and fee, plus the per-program
/// consumption records and the transfers attached to it.
#[derive(Clone, Deserialize, Serialize)]
pub struct TransactionInfo {
    /// Hash of the transaction.
    pub hash: String,
    /// Status of the transaction.
    // NOTE(review): the meaning of individual status codes is not visible here — confirm.
    pub status: u16,
    /// Fee of the transaction.
    pub fee: u64,
    /// Total "cu" consumed by the transaction.
    // NOTE(review): "cu" presumably stands for compute units — confirm.
    pub total_cu_consumed: u64,
    /// Per-program consumption records belonging to this transaction.
    pub program_consumptions: Vec<ProgramConsumption>,
    /// Transfers belonging to this transaction.
    pub transfers: Vec<Transfer>,
}

/// Consumption record of a single program within a transaction.
///
/// NOTE(review): "cu" presumably stands for compute units — confirm.
#[derive(Clone, Deserialize, Serialize)]
pub struct ProgramConsumption {
    /// The transaction this record belongs to.
    // NOTE(review): presumably the transaction hash — confirm.
    pub tx: String,
    /// Status associated with this record.
    pub status: u16,
    /// Line associated with this record.
    // NOTE(review): presumably the position of the entry within the transaction — confirm.
    pub line: u32,
    /// The program this record refers to.
    pub program: String,
    /// Amount of "cu" consumed.
    pub cu_consumed: u64,
    /// "cu" limit.
    pub cu_limit: u64,
    /// Timestamp of this record, as a Solana `UnixTimestamp`.
    pub timestamp: UnixTimestamp,
}

/// Receiving side of the shutdown notification handed out by
/// [`SysSigListener::get_receiver`].
///
/// Shutdown is signalled through a `tokio::sync::watch` channel combined with a shared
/// `AtomicBool` flag. [`SysSigListener::forced_shutdown`] sets the flag and drops the sending
/// half of the channel, which wakes anything awaiting [`SysSigReceiver::recv`].
///
/// The `SysSigReceiver` struct listens for the signal and tracks that the signal has
/// been received. Callers may query for whether the shutdown signal has been
/// received or not.
#[derive(Debug)]
pub struct SysSigReceiver {
    /// `true` if the shutdown signal has been received. Shared with the listener that created
    /// this receiver.
    shutdown: Arc<AtomicBool>,

    /// The receive half of the `watch` channel used to listen for shutdown.
    notify: Receiver<()>,
}

impl SysSigReceiver {
    /// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
    pub fn new(shutdown: Arc<AtomicBool>, notify: Receiver<()>) -> SysSigReceiver {
        SysSigReceiver {
            shutdown,
            notify,
        }
    }

    /// Returns `true` if the shutdown signal has been received.
    pub fn is_shutdown(&self) -> bool {
        self.shutdown.load(Ordering::SeqCst)
    }

    /// Receive the shutdown notice, waiting if necessary.
    pub async fn recv(&mut self) {
        // If the shutdown signal has already been received, then return
        // immediately.
        if self.is_shutdown() {
            return;
        }

        // Cannot receive a "lag error" as only one value is ever sent.
        let _ = self.notify.changed().await;

        if !self.is_shutdown() {
            self.shutdown.store(true, Ordering::SeqCst);
        }
    }
}

/// System Signal Listener
///
/// Listens for shutdown signals from the system and hands out [`SysSigReceiver`]s through
/// which other parts of the program can observe the shutdown.
pub struct SysSigListener {
    /// Shutdown flag shared with every receiver created by `get_receiver`.
    shutdown: Arc<AtomicBool>,
    /// Sending half of the `watch` channel; receivers are created by subscribing to it.
    notifier: Sender<()>
}

impl SysSigListener {
    /// Creates a listener around the sending half of a `watch` channel, with the shutdown
    /// flag initially cleared.
    pub fn new(notifier: Sender<()>) -> Self {
        let shutdown = Arc::new(AtomicBool::new(false));
        Self { shutdown, notifier }
    }

    /// Initiates the watchdog sequence, listening to signals from the host.
    ///
    /// Completes after the first of CTRL+C, SIGALRM, SIGHUP, SIGINT, SIGQUIT or SIGTERM is
    /// received, and then performs [`SysSigListener::forced_shutdown`].
    ///
    /// # Panics
    ///
    /// Panics if any of the signal streams cannot be created.
    pub async fn watchdog(self) {
        info!("Watchdog turned on!");

        // One stream per Unix signal that should end the process.
        let mut on_alarm = signal(SignalKind::alarm()).expect("Alarm stream failed.");
        let mut on_hangup = signal(SignalKind::hangup()).expect("Hangup stream failed.");
        let mut on_interrupt = signal(SignalKind::interrupt()).expect("Interrupt stream failed.");
        let mut on_quit = signal(SignalKind::quit()).expect("Quit stream failed.");
        let mut on_terminate = signal(SignalKind::terminate()).expect("Terminate stream failed.");

        // Whichever signal arrives first wins; the remaining branches are dropped.
        select! {
            _ = tokio::signal::ctrl_c() => info!("CTRL+C received, terminating now!"),
            _ = on_alarm.recv() => info!("SIGALRM received, terminating now!"),
            _ = on_hangup.recv() => info!("SIGHUP received, terminating now!"),
            _ = on_interrupt.recv() => info!("SIGINT received, terminating now!"),
            _ = on_quit.recv() => info!("SIGQUIT received, terminating now!"),
            _ = on_terminate.recv() => info!("SIGTERM received, terminating now!"),
        }

        self.forced_shutdown();
    }

    /// Sets the shared shutdown flag and drops the sending half of the channel, consuming the
    /// listener.
    pub fn forced_shutdown(self) {
        info!("Shutting down!");

        let Self { shutdown, notifier } = self;
        shutdown.store(true, Ordering::SeqCst);

        // Dropping the sender closes the channel for every subscribed receiver.
        drop(notifier);
    }

    /// Retrieves a listener for the watchdog.
    ///
    /// The returned receiver shares this listener's shutdown flag and is subscribed to its
    /// channel.
    pub fn get_receiver(&mut self) -> SysSigReceiver {
        let flag = Arc::clone(&self.shutdown);
        let subscription = self.notifier.subscribe();
        SysSigReceiver::new(flag, subscription)
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;
    use super::*;

    /// A forced shutdown must flip the flag shared with previously created receivers, and
    /// `recv` must complete afterwards instead of waiting forever.
    #[tokio::test]
    async fn test_shutdown() {
        // Establish a watch channel; only the sending half is needed here.
        let (notify_shutdown, _) = watch::channel(());
        // Establish the SysSigListener with the shutdown notifier (the channel sender)
        let mut listener = SysSigListener::new(notify_shutdown);
        // Establish the receiver in conjunction with the listener by obtaining a receiver.
        let mut receiver = listener.get_receiver();

        // Nothing has been signalled yet.
        assert!(!receiver.is_shutdown());

        // Stand-in for `listener.watchdog().await` receiving a signal. No sleep is needed:
        // nothing runs concurrently that the test would have to wait for.
        listener.forced_shutdown();

        assert!(receiver.shutdown.load(Ordering::SeqCst));
        assert!(receiver.is_shutdown());

        // `recv` must return promptly once shutdown has happened; the timeout turns a hang
        // into a test failure.
        tokio::time::timeout(Duration::from_secs(1), receiver.recv())
            .await
            .expect("recv() should return once shutdown has been signalled");
        assert!(receiver.is_shutdown());
    }
}