1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use solana_ledger::blockstore::Blockstore;
use solana_runtime::bank::RewardInfo;
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use solana_transaction_status::Reward;
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread::{self, Builder, JoinHandle},
    time::Duration,
};

/// Receiving end of the rewards channel. Each message pairs a slot with a
/// list of `(Pubkey, RewardInfo)` entries.
pub type RewardsRecorderReceiver = Receiver<(Slot, Vec<(Pubkey, RewardInfo)>)>;
/// Sending end of the rewards channel. Carries the same
/// `(Slot, Vec<(Pubkey, RewardInfo)>)` messages as [`RewardsRecorderReceiver`].
pub type RewardsRecorderSender = Sender<(Slot, Vec<(Pubkey, RewardInfo)>)>;

/// Background service that receives reward batches over a channel and writes
/// them to the blockstore on a dedicated thread.
pub struct RewardsRecorderService {
    /// Join handle of the worker thread spawned by `new`.
    thread_hdl: JoinHandle<()>,
}

impl RewardsRecorderService {
    /// Starts the rewards-recording worker and returns a handle to it.
    ///
    /// A thread named `solana-rewards-writer` is spawned that repeatedly pulls
    /// batches from `rewards_receiver` and stores them in `blockstore`. The
    /// thread stops once `exit` is observed as set, or once the channel
    /// reports that it is disconnected.
    ///
    /// # Panics
    ///
    /// Panics if the worker thread cannot be spawned.
    // NOTE(review): `new` returns `Self`, so this lint allowance looks
    // unnecessary -- confirm before removing.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        rewards_receiver: RewardsRecorderReceiver,
        blockstore: Arc<Blockstore>,
        exit: &Arc<AtomicBool>,
    ) -> Self {
        let exit = Arc::clone(exit);
        let worker = move || {
            // The exit flag is re-checked before every receive attempt.
            while !exit.load(Ordering::Relaxed) {
                match Self::write_rewards(&rewards_receiver, &blockstore) {
                    // A disconnected channel ends the worker.
                    Err(RecvTimeoutError::Disconnected) => break,
                    // A timeout just means nothing arrived; poll again.
                    Ok(()) | Err(RecvTimeoutError::Timeout) => {}
                }
            }
        };

        let thread_hdl = Builder::new()
            .name("solana-rewards-writer".to_string())
            .spawn(worker)
            .unwrap();
        Self { thread_hdl }
    }

    /// Receives one `(slot, rewards)` batch and writes it to the blockstore.
    ///
    /// Waits up to one second for a message. Every `(Pubkey, RewardInfo)`
    /// entry is converted into a [`Reward`] (pubkey rendered as a string)
    /// before the batch is written for `slot`.
    ///
    /// # Errors
    ///
    /// Returns the [`RecvTimeoutError`] from the channel when no message
    /// arrives within the timeout or the channel is disconnected.
    ///
    /// # Panics
    ///
    /// Panics if the blockstore write fails.
    fn write_rewards(
        rewards_receiver: &RewardsRecorderReceiver,
        blockstore: &Arc<Blockstore>,
    ) -> Result<(), RecvTimeoutError> {
        let wait = Duration::from_secs(1);
        let (slot, rewards) = rewards_receiver.recv_timeout(wait)?;

        let rpc_rewards = rewards
            .into_iter()
            .map(|(account, info)| Reward {
                pubkey: account.to_string(),
                lamports: info.lamports,
                post_balance: info.post_balance,
                reward_type: Some(info.reward_type),
            })
            .collect();

        let write_result = blockstore.write_rewards(slot, rpc_rewards);
        write_result.expect("Expect database write to succeed");
        Ok(())
    }

    /// Consumes the service and blocks until the worker thread finishes,
    /// returning the thread's join result.
    pub fn join(self) -> thread::Result<()> {
        let Self { thread_hdl } = self;
        thread_hdl.join()
    }
}