worterbuch_cluster_orchestrator/
lib.rs1mod config;
19mod election;
20mod error;
21mod follower;
22mod leader;
23mod process_manager;
24mod socket;
25mod stats;
26mod telemetry;
27mod utils;
28
29use config::instrument_and_load_config;
30use election::{ElectionOutcome, elect_leader};
31use follower::follow;
32use leader::lead;
33use miette::{Result, miette};
34use serde::{Deserialize, Serialize};
35use socket::init_socket;
36use stats::start_stats_endpoint;
37use std::{
38 cmp::Ordering,
39 net::{IpAddr, SocketAddr},
40 path::Path,
41};
42use tokio::{fs::File, select};
43use tokio_graceful_shutdown::SubsystemHandle;
44use tracing::{debug, instrument};
45
46#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
47#[serde(rename_all = "camelCase")]
48pub enum PeerMessage {
49 Vote(Vote),
50 Heartbeat(Heartbeat),
51}
52
53#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
54#[serde(rename_all = "camelCase")]
55pub enum Vote {
56 Request(VoteRequest),
57 Response(VoteResponse),
58}
59
60#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
61#[serde(rename_all = "camelCase")]
62pub struct VoteRequest {
63 node_id: String,
64 priority: Priority,
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
68#[serde(rename_all = "camelCase")]
69pub struct Priority(i64);
70
71impl PartialOrd for Priority {
72 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
73 Some(self.cmp(other))
74 }
75}
76
77impl Ord for Priority {
78 fn cmp(&self, other: &Self) -> Ordering {
79 other.0.cmp(&self.0)
80 }
81}
82
83#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
84#[serde(rename_all = "camelCase")]
85pub struct VoteResponse {
86 node_id: String,
87}
88
89#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
90#[serde(rename_all = "camelCase")]
91pub enum Heartbeat {
92 Request(HeartbeatRequest),
93 Response(HeartbeatResponse),
94}
95
96#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
97#[serde(rename_all = "camelCase")]
98pub struct HeartbeatRequest {
99 node_id: String,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
103#[serde(rename_all = "camelCase")]
104pub struct HeartbeatResponse {
105 node_id: String,
106}
107
108#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
109#[serde(rename_all = "camelCase")]
110pub struct PeerInfo {
111 node_id: String,
112 address: IpAddr,
113 raft_port: u16,
114 sync_port: u16,
115 priority: Option<i64>,
116 suicide_on_split_brain: bool,
117}
118
119impl PeerInfo {
120 pub fn raft_addr(&self) -> SocketAddr {
121 SocketAddr::new(self.address, self.raft_port)
122 }
123
124 pub fn sync_addr(&self) -> SocketAddr {
125 SocketAddr::new(self.address, self.sync_port)
126 }
127}
128
129pub async fn instrument_and_run_main(subsys: &mut SubsystemHandle) -> Result<()> {
130 let (config, peers_rx) = instrument_and_load_config(subsys).await?;
131 run_main(subsys, config, peers_rx).await
132}
133
134async fn run_main(
135 subsys: &mut SubsystemHandle,
136 mut config: config::Config,
137 mut peers_rx: tokio::sync::mpsc::Receiver<(config::Peers, PeerInfo, Option<usize>)>,
138) -> std::result::Result<(), miette::Error> {
139 let (mut peers, mut me, _) = peers_rx
140 .recv()
141 .await
142 .ok_or_else(|| miette!("peers sender dropped"))?;
143
144 let mut socket = init_socket(&config).await?;
145
146 let stats = start_stats_endpoint(subsys, config.stats_port).await?;
147
148 while !subsys.is_shutdown_requested() {
149 let prio = config.priority().await;
150
151 stats.candidate().await;
152 let outcome = select! {
153 res = elect_leader(subsys, &mut socket, &mut config, &mut peers, &mut peers_rx, prio) => res?,
154 _ = subsys.on_shutdown_requested() => break,
155 };
156
157 match outcome {
158 ElectionOutcome::Leader => {
159 stats.leader().await;
160 select! {
161 it = lead(subsys, &mut socket, &mut config, &mut me, &mut peers, &mut peers_rx) => it?,
162 _ = subsys.on_shutdown_requested() => break,
163 }
164 }
165 ElectionOutcome::Follower(heartbeat) => {
166 stats.follower().await;
167 select! {
168 it = follow(subsys, &mut socket, &config, &peers, heartbeat) => it?,
169 _ = subsys.on_shutdown_requested() => break,
170 }
171 }
172 ElectionOutcome::Cancelled => break,
173 }
174 }
175
176 Ok(())
177}
178
179const TIMESTAMP_FILE_NAME: &str = "last-persisted";
180
181#[instrument(ret)]
182pub async fn load_millis_since_active(path: &Path) -> Option<i64> {
183 let path = path.join(TIMESTAMP_FILE_NAME);
184 debug!("getting metadata of file {path:?}");
185 let file = File::open(&path).await.ok()?;
186 let last_modified = file.metadata().await.ok()?.modified().ok()?;
187 debug!("{path:?} last modified: {last_modified:?}");
188 let elapsed = last_modified.elapsed().ok()?;
189 Some(elapsed.as_millis() as i64)
190}
191
192#[cfg(test)]
193mod lib_test {
194
195 use super::*;
196
197 #[test]
198 fn priorities_are_ordered_by_reverse_value() {
199 assert!(Priority(i64::MIN) > Priority(i64::MAX));
200 assert!(Priority(i64::MAX) < Priority(i64::MIN));
201 assert!(Priority(10) > Priority(200));
202 assert!(Priority(200) < Priority(10));
203 assert!(Priority(-10) > Priority(200));
204 assert!(Priority(200) < Priority(-10));
205 assert!(Priority(0) == Priority(0));
206 }
207}