worterbuch_cluster_orchestrator/
lib.rs

1/*
2 *  Copyright (C) 2025 Michael Bachmann
3 *
4 *  This program is free software: you can redistribute it and/or modify
5 *  it under the terms of the GNU Affero General Public License as published by
6 *  the Free Software Foundation, either version 3 of the License, or
7 *  (at your option) any later version.
8 *
9 *  This program is distributed in the hope that it will be useful,
10 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 *  GNU Affero General Public License for more details.
13 *
14 *  You should have received a copy of the GNU Affero General Public License
15 *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
16 */
17
18mod config;
19mod election;
20mod error;
21mod follower;
22mod leader;
23mod process_manager;
24mod socket;
25mod stats;
26mod telemetry;
27mod utils;
28
29use config::instrument_and_load_config;
30use election::{ElectionOutcome, elect_leader};
31use follower::follow;
32use leader::lead;
33use miette::{Result, miette};
34use serde::{Deserialize, Serialize};
35use socket::init_socket;
36use stats::start_stats_endpoint;
37use std::{
38    cmp::Ordering,
39    net::{IpAddr, SocketAddr},
40    path::Path,
41};
42use tokio::{fs::File, select};
43use tokio_graceful_shutdown::SubsystemHandle;
44use tracing::{debug, instrument};
45
46#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
47#[serde(rename_all = "camelCase")]
48pub enum PeerMessage {
49    Vote(Vote),
50    Heartbeat(Heartbeat),
51}
52
53#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
54#[serde(rename_all = "camelCase")]
55pub enum Vote {
56    Request(VoteRequest),
57    Response(VoteResponse),
58}
59
60#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
61#[serde(rename_all = "camelCase")]
62pub struct VoteRequest {
63    node_id: String,
64    priority: Priority,
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
68#[serde(rename_all = "camelCase")]
69pub struct Priority(i64);
70
71impl PartialOrd for Priority {
72    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
73        Some(self.cmp(other))
74    }
75}
76
77impl Ord for Priority {
78    fn cmp(&self, other: &Self) -> Ordering {
79        other.0.cmp(&self.0)
80    }
81}
82
83#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
84#[serde(rename_all = "camelCase")]
85pub struct VoteResponse {
86    node_id: String,
87}
88
89#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
90#[serde(rename_all = "camelCase")]
91pub enum Heartbeat {
92    Request(HeartbeatRequest),
93    Response(HeartbeatResponse),
94}
95
96#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
97#[serde(rename_all = "camelCase")]
98pub struct HeartbeatRequest {
99    node_id: String,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
103#[serde(rename_all = "camelCase")]
104pub struct HeartbeatResponse {
105    node_id: String,
106}
107
108#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
109#[serde(rename_all = "camelCase")]
110pub struct PeerInfo {
111    node_id: String,
112    address: IpAddr,
113    raft_port: u16,
114    sync_port: u16,
115    priority: Option<i64>,
116    suicide_on_split_brain: bool,
117}
118
119impl PeerInfo {
120    pub fn raft_addr(&self) -> SocketAddr {
121        SocketAddr::new(self.address, self.raft_port)
122    }
123
124    pub fn sync_addr(&self) -> SocketAddr {
125        SocketAddr::new(self.address, self.sync_port)
126    }
127}
128
129pub async fn instrument_and_run_main(subsys: &mut SubsystemHandle) -> Result<()> {
130    let (config, peers_rx) = instrument_and_load_config(subsys).await?;
131    run_main(subsys, config, peers_rx).await
132}
133
134async fn run_main(
135    subsys: &mut SubsystemHandle,
136    mut config: config::Config,
137    mut peers_rx: tokio::sync::mpsc::Receiver<(config::Peers, PeerInfo, Option<usize>)>,
138) -> std::result::Result<(), miette::Error> {
139    let (mut peers, mut me, _) = peers_rx
140        .recv()
141        .await
142        .ok_or_else(|| miette!("peers sender dropped"))?;
143
144    let mut socket = init_socket(&config).await?;
145
146    let stats = start_stats_endpoint(subsys, config.stats_port).await?;
147
148    while !subsys.is_shutdown_requested() {
149        let prio = config.priority().await;
150
151        stats.candidate().await;
152        let outcome = select! {
153            res = elect_leader(subsys, &mut socket, &mut config, &mut peers, &mut peers_rx, prio) => res?,
154            _ = subsys.on_shutdown_requested() => break,
155        };
156
157        match outcome {
158            ElectionOutcome::Leader => {
159                stats.leader().await;
160                select! {
161                    it = lead(subsys, &mut socket, &mut config, &mut me, &mut peers, &mut peers_rx) => it?,
162                    _ = subsys.on_shutdown_requested() => break,
163                }
164            }
165            ElectionOutcome::Follower(heartbeat) => {
166                stats.follower().await;
167                select! {
168                    it = follow(subsys, &mut socket,  &config, &peers, heartbeat) => it?,
169                    _ = subsys.on_shutdown_requested() => break,
170                }
171            }
172            ElectionOutcome::Cancelled => break,
173        }
174    }
175
176    Ok(())
177}
178
179const TIMESTAMP_FILE_NAME: &str = "last-persisted";
180
181#[instrument(ret)]
182pub async fn load_millis_since_active(path: &Path) -> Option<i64> {
183    let path = path.join(TIMESTAMP_FILE_NAME);
184    debug!("getting metadata of file {path:?}");
185    let file = File::open(&path).await.ok()?;
186    let last_modified = file.metadata().await.ok()?.modified().ok()?;
187    debug!("{path:?} last modified: {last_modified:?}");
188    let elapsed = last_modified.elapsed().ok()?;
189    Some(elapsed.as_millis() as i64)
190}
191
#[cfg(test)]
mod lib_test {

    use super::*;

    /// A numerically smaller inner value must rank as the greater priority,
    /// and equal values must compare equal.
    #[test]
    fn priorities_are_ordered_by_reverse_value() {
        // Each pair is (smaller number, larger number).
        let pairs = [(i64::MIN, i64::MAX), (10, 200), (-10, 200)];
        for (small, large) in pairs {
            assert!(Priority(small) > Priority(large));
            assert!(Priority(large) < Priority(small));
        }
        assert!(Priority(0) == Priority(0));
    }
}
207}