1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
use diesel::{prelude::Insertable, Queryable};
use serde::Deserialize;

use crate::OptimizationConfig;

use super::diesel::schema::chaindexing_nodes;
use super::{ChaindexingRepo, ChaindexingRepoConn, Repo};

/// One registered Chaindexing node, backed by a row in `chaindexing_nodes`.
/// Nodes periodically keep themselves active so a leader can be elected
/// among the currently-active set (see `elect_leader`).
#[derive(Debug, Deserialize, Clone, PartialEq, Eq, Insertable, Queryable)]
#[diesel(table_name = chaindexing_nodes)]
pub struct Node {
    pub id: i32,
    // Timestamp of the node's last keep-alive — presumably epoch seconds,
    // matching `get_min_active_at_in_secs`; TODO confirm against the repo layer.
    last_active_at: i64,
    // Timestamp the row was inserted; the highest `inserted_at` wins election.
    inserted_at: i64,
}

impl Node {
    /// Returns the minimum `last_active_at` value (epoch seconds) a node must
    /// have to still be considered active, given the election rate in ms.
    pub fn get_min_active_at_in_secs(node_election_rate_ms: u64) -> i64 {
        // A node counts as inactive once it has missed two full election windows.
        let two_election_windows_ms = (2 * node_election_rate_ms) as i64;
        let now_ms = chrono::Utc::now().timestamp_millis();

        (now_ms - two_election_windows_ms) / 1_000
    }

    /// True when this node is the elected leader.
    fn is_leader(&self, leader: &Node) -> bool {
        leader.id == self.id
    }
}

use super::Config;
use super::{events_ingester, EventHandlers};

use chrono::Utc;
use std::fmt::Debug;
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(PartialEq, Debug)]
enum NodeTasksState {
    /// Initial state: no NodeTask is running because nothing has happened yet.
    Idle,
    /// All NodeTasks are running.
    Active,
    /// All NodeTasks are stopped, but a recent KeepNodeActiveRequest
    /// reactivates them.
    InActive,
    /// All NodeTasks are stopped and stay stopped even if a recent
    /// KeepNodeActiveRequest exists. Only non-leader Nodes self-abort.
    Aborted,
}

/// Shared handle callers refresh to keep a Node's tasks active.
/// Clones share the same underlying state via `Arc`.
#[derive(Clone)]
pub struct KeepNodeActiveRequest {
    /// (last_refreshed_at_ms, active_grace_period_ms) — both in milliseconds.
    /// Guarded by an async Mutex since it is read and written across awaits.
    last_refreshed_at_and_active_grace_period: Arc<Mutex<(u64, u32)>>,
}

impl KeepNodeActiveRequest {
    /// Creates a request that starts out as freshly refreshed.
    ///
    /// * `active_grace_period_ms` - how long the Node waits after the last
    ///   refresh before it may go inactive.
    pub fn new(active_grace_period_ms: u32) -> Self {
        let initial_state = (Self::now(), active_grace_period_ms);

        Self {
            last_refreshed_at_and_active_grace_period: Arc::new(Mutex::new(initial_state)),
        }
    }
    /// Records a fresh refresh timestamp, keeping the grace period unchanged.
    pub async fn refresh(&self) {
        let mut state = self.last_refreshed_at_and_active_grace_period.lock().await;
        state.0 = Self::now();
    }

    /// Current time in epoch milliseconds.
    fn now() -> u64 {
        Utc::now().timestamp_millis() as u64
    }

    /// Opposite of `is_recent`.
    async fn is_stale(&self) -> bool {
        !self.is_recent().await
    }
    /// True while the last refresh still falls inside the grace period.
    async fn is_recent(&self) -> bool {
        let (last_refreshed_at, active_grace_period) =
            *self.last_refreshed_at_and_active_grace_period.lock().await;

        last_refreshed_at > Self::now() - u64::from(active_grace_period)
    }
}

/// Owns and orchestrates the background tasks (event ingestion and handling)
/// for one Node, moving them through the `NodeTasksState` lifecycle.
pub struct NodeTasks<'a> {
    /// The node these tasks run on behalf of.
    current_node: &'a Node,
    /// Current lifecycle state; see `NodeTasksState`.
    state: NodeTasksState,
    /// Handles of the spawned tokio tasks; empty until first activation.
    tasks: Vec<tokio::task::JoinHandle<()>>,
    /// Epoch seconds at construction; consumed by `started_n_seconds_ago`.
    started_at_in_secs: u64,
    /// Not used currently. In V2, we will populate NodeTasks errors here.
    pub errors: Vec<String>,
}

impl<'a> NodeTasks<'a> {
    /// Creates an idle task set for `current_node`.
    pub fn new(current_node: &'a Node) -> Self {
        Self {
            current_node,
            state: NodeTasksState::Idle,
            started_at_in_secs: Self::now_in_secs(),
            tasks: Vec::new(),
            errors: Vec::new(),
        }
    }

    /// Runs one orchestration round: keeps this node active, elects a leader
    /// among the active nodes, then transitions this node's task state.
    pub async fn orchestrate<'b, S: Send + Sync + Clone + Debug + 'static>(
        &mut self,
        config: &Config<S>,
        conn: &mut ChaindexingRepoConn<'b>,
    ) {
        // Keep node active first to guarantee that at least this node is active before election
        ChaindexingRepo::keep_node_active(conn, self.current_node).await;

        let election_rate_ms = config.get_node_election_rate_ms();
        let active_nodes = ChaindexingRepo::get_active_nodes(conn, election_rate_ms).await;
        let leader_node = elect_leader(&active_nodes);

        if !self.current_node.is_leader(leader_node) {
            // Non-leaders self-abort; every other state is left as-is.
            if self.state == NodeTasksState::Active {
                self.abort();
            }
            return;
        }

        match self.state {
            // A newly (re)elected leader spins its tasks up from scratch.
            NodeTasksState::Idle | NodeTasksState::Aborted => self.make_active(config),

            NodeTasksState::Active => {
                if let Some(OptimizationConfig {
                    keep_node_active_request,
                    optimize_after_in_secs,
                }) = &config.optimization_config
                {
                    // Wind down only after the warm-up window has elapsed and
                    // no recent keep-alive request exists.
                    if keep_node_active_request.is_stale().await
                        && self.started_n_seconds_ago(*optimize_after_in_secs)
                    {
                        self.make_inactive()
                    }
                }
            }

            NodeTasksState::InActive => {
                if let Some(OptimizationConfig {
                    keep_node_active_request,
                    ..
                }) = &config.optimization_config
                {
                    // A fresh keep-alive request reactivates an inactive leader.
                    if keep_node_active_request.is_recent().await {
                        self.make_active(config)
                    }
                }
            }
        }
    }

    /// Starts all tasks and marks the set Active.
    fn make_active<S: Send + Sync + Clone + Debug + 'static>(&mut self, config: &Config<S>) {
        self.start(config);
        self.state = NodeTasksState::Active;
    }
    /// Stops all tasks and marks the set InActive (reactivatable).
    fn make_inactive(&mut self) {
        self.stop();
        self.state = NodeTasksState::InActive;
    }
    /// Stops all tasks and marks the set Aborted (stays stopped).
    fn abort(&mut self) {
        self.stop();
        self.state = NodeTasksState::Aborted;
    }

    /// Spawns the events ingester and event handlers tasks.
    fn start<S: Send + Sync + Clone + Debug + 'static>(&mut self, config: &Config<S>) {
        self.tasks = vec![
            events_ingester::start(config),
            EventHandlers::start(config),
        ];
    }
    /// Aborts every running task. Handles are kept; `abort` is idempotent.
    fn stop(&mut self) {
        self.tasks.iter().for_each(|task| task.abort());
    }

    /// True once at least `n_seconds` have elapsed since construction.
    pub fn started_n_seconds_ago(&self, n_seconds: u64) -> bool {
        n_seconds <= Self::now_in_secs() - self.started_at_in_secs
    }

    /// Current time in epoch seconds.
    fn now_in_secs() -> u64 {
        Utc::now().timestamp() as u64
    }
}

pub const DEFAULT_MAX_CONCURRENT_NODE_COUNT: u16 = 50;

/// Elects the leader among `nodes`: the node with the greatest `inserted_at`
/// (i.e. the most recently inserted) wins. On ties, the earliest-listed node
/// is kept, because a candidate only replaces the leader when strictly greater.
///
/// # Panics
/// Panics if `nodes` is empty. `orchestrate` keeps the current node active
/// before electing, so at least one active node is always expected.
fn elect_leader(nodes: &[Node]) -> &Node {
    nodes
        .iter()
        .reduce(|leader, node| {
            if node.inserted_at > leader.inserted_at {
                node
            } else {
                leader
            }
        })
        .expect("cannot elect a leader: no active nodes")
}