use crate::{
    ballot_leader_election::{Ballot, BallotLeaderElection},
    errors::{valid_config, ConfigError},
    messages::Message,
    sequence_paxos::SequencePaxos,
    storage::{Entry, StopSign, Storage},
    util::{
        defaults::{BUFFER_SIZE, ELECTION_TIMEOUT, RESEND_MESSAGE_TIMEOUT},
        ConfigurationId, FlexibleQuorum, LogEntry, LogicalClock, NodeId,
    },
    utils::{ui, ui::ClusterState},
};
#[cfg(any(feature = "toml_config", feature = "serde"))]
use serde::Deserialize;
#[cfg(feature = "serde")]
use serde::Serialize;
#[cfg(feature = "toml_config")]
use std::fs;
use std::{
    error::Error,
    fmt::{Debug, Display},
    ops::RangeBounds,
};
#[cfg(feature = "toml_config")]
use toml;

/// The overall configuration of an OmniPaxos instance, combining the
/// cluster-wide settings in [`ClusterConfig`] with the server-local settings
/// in [`ServerConfig`].
#[allow(missing_docs)]
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "toml_config", derive(Deserialize), serde(default))]
pub struct OmniPaxosConfig {
    pub cluster_config: ClusterConfig,
    pub server_config: ServerConfig,
}

impl OmniPaxosConfig {
    /// Checks that the configuration is valid, i.e., that both the cluster
    /// and server configurations are valid and that this server is a member
    /// of the cluster.
    pub fn validate(&self) -> Result<(), ConfigError> {
        self.cluster_config.validate()?;
        self.server_config.validate()?;
        valid_config!(
            self.cluster_config.nodes.contains(&self.server_config.pid),
            "Nodes must include own server pid"
        );
        Ok(())
    }

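    /// Creates a new `OmniPaxosConfig` from a TOML file and validates it.
    ///
    /// A minimal usage sketch (not compiled here). The file path is
    /// hypothetical; the file must deserialize into `cluster_config` and
    /// `server_config` tables matching the fields of [`ClusterConfig`] and
    /// [`ServerConfig`]:
    ///
    /// ```ignore
    /// // config/node1.toml (hypothetical contents):
    /// //
    /// //   [cluster_config]
    /// //   configuration_id = 1
    /// //   nodes = [1, 2, 3]
    /// //
    /// //   [server_config]
    /// //   pid = 1
    /// let config = OmniPaxosConfig::with_toml("config/node1.toml")?;
    /// ```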
    #[cfg(feature = "toml_config")]
    pub fn with_toml(file_path: &str) -> Result<Self, ConfigError> {
        let config_file = fs::read_to_string(file_path)?;
        let config: OmniPaxosConfig = toml::from_str(&config_file)?;
        config.validate()?;
        Ok(config)
    }

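    /// Validates this configuration and builds an [`OmniPaxos`] instance
    /// backed by the given storage.
    ///
    /// A usage sketch (not compiled here), assuming a log entry type that
    /// implements [`Entry`] (e.g., derived via the `macros` feature) and the
    /// in-memory storage from the separate `omnipaxos_storage` crate:
    ///
    /// ```ignore
    /// use omnipaxos::{macros::Entry, ClusterConfig, OmniPaxosConfig, ServerConfig};
    /// use omnipaxos_storage::memory_storage::MemoryStorage;
    ///
    /// // Hypothetical log entry type for this example.
    /// #[derive(Clone, Debug, Entry)]
    /// struct KVCommand {
    ///     key: String,
    ///     value: u64,
    /// }
    ///
    /// let config = OmniPaxosConfig {
    ///     cluster_config: ClusterConfig {
    ///         configuration_id: 1,
    ///         nodes: vec![1, 2, 3],
    ///         ..Default::default()
    ///     },
    ///     server_config: ServerConfig {
    ///         pid: 1,
    ///         ..Default::default()
    ///     },
    /// };
    /// let mut omni_paxos = config
    ///     .build(MemoryStorage::<KVCommand>::default())
    ///     .expect("configuration should be valid");
    /// ```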
    pub fn build<T, B>(self, storage: B) -> Result<OmniPaxos<T, B>, ConfigError>
    where
        T: Entry,
        B: Storage<T>,
    {
        self.validate()?;
        let recovered_leader = storage
            .get_promise()
            .expect("storage error while trying to read promise");
        Ok(OmniPaxos {
            ble: BallotLeaderElection::with(self.clone().into(), recovered_leader),
            election_clock: LogicalClock::with(self.server_config.election_tick_timeout),
            resend_message_clock: LogicalClock::with(
                self.server_config.resend_message_tick_timeout,
            ),
            seq_paxos: SequencePaxos::with(self.into(), storage),
        })
    }
}

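/// Configuration settings that are shared among all servers of a cluster. A
/// `ClusterConfig` is also what is proposed via [`OmniPaxos::reconfigure`] to
/// change the cluster's composition.
///
/// A sketch of a five-node cluster with a flexible quorum (not compiled
/// here). The sizes satisfy all the constraints checked by
/// [`ClusterConfig::validate`]: the quorums overlap (4 + 2 > 5), both sizes
/// lie within 2..=5, and the read quorum is at least as large as the write
/// quorum:
///
/// ```ignore
/// let cluster = ClusterConfig {
///     configuration_id: 1,
///     nodes: vec![1, 2, 3, 4, 5],
///     flexible_quorum: Some(FlexibleQuorum {
///         read_quorum_size: 4,
///         write_quorum_size: 2,
///     }),
/// };
/// assert!(cluster.validate().is_ok());
/// ```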
#[derive(Clone, Debug, PartialEq, Default)]
#[cfg_attr(any(feature = "serde", feature = "toml_config"), derive(Deserialize))]
#[cfg_attr(feature = "toml_config", serde(default))]
#[cfg_attr(feature = "serde", derive(Serialize))]
pub struct ClusterConfig {
    /// The identifier of this cluster configuration. Must not be 0.
    pub configuration_id: ConfigurationId,
    /// The pids of all servers in the cluster. Must contain at least two
    /// nodes and include this server's own pid.
    pub nodes: Vec<NodeId>,
    /// Optional read/write quorum sizes. If `None`, majority quorums are used.
    pub flexible_quorum: Option<FlexibleQuorum>,
}

impl ClusterConfig {
    /// Checks that the cluster configuration is valid.
    pub fn validate(&self) -> Result<(), ConfigError> {
        let num_nodes = self.nodes.len();
        valid_config!(num_nodes > 1, "Need more than 1 node");
        valid_config!(self.configuration_id != 0, "Configuration ID cannot be 0");
        if let Some(FlexibleQuorum {
            read_quorum_size,
            write_quorum_size,
        }) = self.flexible_quorum
        {
            valid_config!(
                read_quorum_size + write_quorum_size > num_nodes,
                "The quorums must overlap, i.e., the sum of their sizes must exceed the # of nodes"
            );
            valid_config!(
                read_quorum_size >= 2 && read_quorum_size <= num_nodes,
                "Read quorum must be in range 2 to # of nodes in the cluster"
            );
            valid_config!(
                write_quorum_size >= 2 && write_quorum_size <= num_nodes,
                "Write quorum must be in range 2 to # of nodes in the cluster"
            );
            valid_config!(
                read_quorum_size >= write_quorum_size,
                "Read quorum size must be >= the write quorum size"
            );
        }
        Ok(())
    }

    /// Builds an [`OmniPaxos`] instance for the server with the given
    /// [`ServerConfig`], backed by `with_storage`.
    pub fn build_for_server<T, B>(
        self,
        server_config: ServerConfig,
        with_storage: B,
    ) -> Result<OmniPaxos<T, B>, ConfigError>
    where
        T: Entry,
        B: Storage<T>,
    {
        let op_config = OmniPaxosConfig {
            cluster_config: self,
            server_config,
        };
        op_config.build(with_storage)
    }
}

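/// Configuration settings that are specific to a single server in the
/// cluster.
///
/// The two tick timeouts are expressed in numbers of [`OmniPaxos::tick`]
/// calls, so their real-time durations depend on how often `tick()` is
/// driven. A sketch (not compiled here), assuming `tick()` is called every
/// 10 ms, so elections time out after ~100 ms and dropped messages are
/// resent after ~500 ms:
///
/// ```ignore
/// let server = ServerConfig {
///     pid: 1,
///     election_tick_timeout: 10,
///     resend_message_tick_timeout: 50,
///     ..Default::default()
/// };
/// ```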
#[derive(Clone, Debug)]
#[cfg_attr(feature = "toml_config", derive(Deserialize), serde(default))]
pub struct ServerConfig {
    /// The unique identifier of this server. Must not be 0.
    pub pid: NodeId,
    /// The number of calls to [`OmniPaxos::tick`] before an election timeout
    /// is triggered.
    pub election_tick_timeout: u64,
    /// The number of calls to [`OmniPaxos::tick`] before dropped messages
    /// are resent.
    pub resend_message_tick_timeout: u64,
    /// The size of the buffer for outgoing messages.
    pub buffer_size: usize,
    /// The number of appended entries that are batched before being proposed.
    pub batch_size: usize,
    /// The priority of this server when electing a leader.
    pub leader_priority: u32,
    /// The path where the default logger logs events.
    #[cfg(feature = "logging")]
    pub logger_file_path: Option<String>,
    /// A custom logger; overrides the default logger and cannot be
    /// deserialized from TOML.
    #[cfg(feature = "logging")]
    #[cfg_attr(feature = "toml_config", serde(skip_deserializing))]
    pub custom_logger: Option<slog::Logger>,
}

impl ServerConfig {
    /// Checks that the server configuration is valid.
    pub fn validate(&self) -> Result<(), ConfigError> {
        valid_config!(self.pid != 0, "Server pid cannot be 0");
        valid_config!(self.buffer_size != 0, "Buffer size must be greater than 0");
        valid_config!(self.batch_size != 0, "Batch size must be greater than 0");
        valid_config!(
            self.election_tick_timeout != 0,
            "Election tick timeout must be greater than 0"
        );
        valid_config!(
            self.resend_message_tick_timeout != 0,
            "Resend message tick timeout must be greater than 0"
        );
        Ok(())
    }
}

impl Default for ServerConfig {
    fn default() -> Self {
        Self {
            pid: 0,
            election_tick_timeout: ELECTION_TIMEOUT,
            resend_message_tick_timeout: RESEND_MESSAGE_TIMEOUT,
            buffer_size: BUFFER_SIZE,
            batch_size: 1,
            leader_priority: 0,
            #[cfg(feature = "logging")]
            logger_file_path: None,
            #[cfg(feature = "logging")]
            custom_logger: None,
        }
    }
}

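/// The `OmniPaxos` replica. Maintains the replicated log and the ballot
/// leader election, and is the struct that users of this crate interact with.
///
/// A sketch of the typical server loop around an already-built replica (not
/// compiled here; `receive` and `send` stand in for the user's own network
/// layer):
///
/// ```ignore
/// loop {
///     // Feed messages arriving from the network into OmniPaxos.
///     while let Some(msg) = receive() {
///         omni_paxos.handle_incoming(msg);
///     }
///     // Fetch the produced outgoing messages and send them to their peers.
///     for out_msg in omni_paxos.outgoing_messages() {
///         send(out_msg);
///     }
///     // In addition, drive timeouts by calling tick() at a fixed interval,
///     // e.g., every 10 ms from a timer.
/// }
/// ```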
pub struct OmniPaxos<T, B>
where
    T: Entry,
    B: Storage<T>,
{
    seq_paxos: SequencePaxos<T, B>,
    ble: BallotLeaderElection,
    election_clock: LogicalClock,
    resend_message_clock: LogicalClock,
}

impl<T, B> OmniPaxos<T, B>
where
    T: Entry,
    B: Storage<T>,
{
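    /// Attempts to trim the log up to `trim_index` (or, if `None`, up to the
    /// index that all servers are known to have decided). Returns a
    /// [`CompactionErr`] if the index cannot be trimmed yet.
    ///
    /// A sketch of how trimming and snapshotting might be driven (not
    /// compiled here; `omni_paxos` is an already-built replica):
    ///
    /// ```ignore
    /// // Trim everything that all servers have already decided.
    /// if let Err(e) = omni_paxos.trim(None) {
    ///     eprintln!("could not trim yet: {}", e);
    /// }
    /// // Alternatively, snapshot only this server's log up to its decided index.
    /// omni_paxos.snapshot(None, true).expect("snapshot failed");
    /// ```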
    pub fn trim(&mut self, trim_index: Option<u64>) -> Result<(), CompactionErr> {
        self.seq_paxos.trim(trim_index)
    }

    /// Attempts to snapshot the log up to `compact_idx` (or, if `None`, up
    /// to the decided index). If `local_only` is true, only this server is
    /// snapshotted; otherwise all servers in the cluster are asked to
    /// snapshot.
    pub fn snapshot(
        &mut self,
        compact_idx: Option<u64>,
        local_only: bool,
    ) -> Result<(), CompactionErr> {
        self.seq_paxos.snapshot(compact_idx, local_only)
    }

    /// Returns the decided index of the log.
    pub fn get_decided_idx(&self) -> u64 {
        self.seq_paxos.get_decided_idx()
    }

    /// Returns the index up to which the log has been compacted (trimmed or
    /// snapshotted).
    pub fn get_compacted_idx(&self) -> u64 {
        self.seq_paxos.get_compacted_idx()
    }

    /// Returns the pid of the current leader, or `None` if no leader has
    /// been promised yet.
    pub fn get_current_leader(&self) -> Option<NodeId> {
        let promised_pid = self.seq_paxos.get_promise().pid;
        if promised_pid == 0 {
            None
        } else {
            Some(promised_pid)
        }
    }

    /// Returns the ballot that this server has promised.
    pub fn get_promise(&self) -> Ballot {
        self.seq_paxos.get_promise()
    }

    /// Returns the outgoing messages of this replica. The user has to fetch
    /// these periodically and send them over the network.
    pub fn outgoing_messages(&mut self) -> Vec<Message<T>> {
        let paxos_msgs = self
            .seq_paxos
            .get_outgoing_msgs()
            .into_iter()
            .map(Message::SequencePaxos);
        let ble_msgs = self
            .ble
            .get_outgoing_msgs()
            .into_iter()
            .map(Message::BLE);
        ble_msgs.chain(paxos_msgs).collect()
    }

    /// Reads the log entry at index `idx`, or `None` if no entry exists at
    /// that index.
    pub fn read(&self, idx: u64) -> Option<LogEntry<T>> {
        match self
            .seq_paxos
            .internal_storage
            .read(idx..idx + 1)
            .expect("storage error while trying to read log entries")
        {
            Some(mut v) => v.pop(),
            None => None,
        }
    }

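    /// Reads the log entries in the range `r`, or `None` if the range is out
    /// of bounds.
    ///
    /// A reading sketch (not compiled here; `omni_paxos` is an already-built
    /// replica over the hypothetical `KVCommand` entry type, and the indices
    /// are arbitrary):
    ///
    /// ```ignore
    /// // A single entry.
    /// let entry: Option<LogEntry<KVCommand>> = omni_paxos.read(5);
    /// // Any RangeBounds<u64> works for bulk reads.
    /// let first_ten = omni_paxos.read_entries(0..10);
    /// // All decided entries from index 10 onwards.
    /// let suffix = omni_paxos.read_decided_suffix(10);
    /// ```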
    pub fn read_entries<R>(&self, r: R) -> Option<Vec<LogEntry<T>>>
    where
        R: RangeBounds<u64>,
    {
        self.seq_paxos
            .internal_storage
            .read(r)
            .expect("storage error while trying to read log entries")
    }

    /// Reads the decided entries of the log from index `from_idx` onwards,
    /// or `None` if there are no decided entries from that index.
    pub fn read_decided_suffix(&self, from_idx: u64) -> Option<Vec<LogEntry<T>>> {
        self.seq_paxos
            .internal_storage
            .read_decided_suffix(from_idx)
            .expect("storage error while trying to read decided log suffix")
    }

    /// Handles an incoming message from another server.
    pub fn handle_incoming(&mut self, m: Message<T>) {
        match m {
            Message::SequencePaxos(p) => self.seq_paxos.handle(p),
            Message::BLE(b) => self.ble.handle(b),
        }
    }

    /// Returns the [`StopSign`] of the new configuration if this
    /// configuration has been stopped by a reconfiguration, otherwise `None`.
    pub fn is_reconfigured(&self) -> Option<StopSign> {
        self.seq_paxos.is_reconfigured()
    }

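    /// Appends an entry to the replicated log. Returns a [`ProposeErr`] if a
    /// reconfiguration is pending.
    ///
    /// A sketch (not compiled here; `KVCommand` is the hypothetical entry
    /// type from the [`OmniPaxosConfig::build`] example):
    ///
    /// ```ignore
    /// let cmd = KVCommand { key: "a".to_string(), value: 1 };
    /// omni_paxos.append(cmd).expect("append failed: reconfiguration pending");
    /// ```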
    pub fn append(&mut self, entry: T) -> Result<(), ProposeErr<T>> {
        self.seq_paxos.append(entry)
    }

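    /// Proposes a reconfiguration of the cluster to `new_configuration`,
    /// with optional `metadata` carried along with the resulting
    /// [`StopSign`]. Returns a [`ProposeErr`] if the new configuration is
    /// invalid or a reconfiguration is already pending.
    ///
    /// A sketch (not compiled here) of replacing node 3 with a new node 4:
    ///
    /// ```ignore
    /// let new_configuration = ClusterConfig {
    ///     configuration_id: 2,
    ///     nodes: vec![1, 2, 4],
    ///     ..Default::default()
    /// };
    /// omni_paxos
    ///     .reconfigure(new_configuration, None)
    ///     .expect("reconfigure failed");
    /// ```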
    pub fn reconfigure(
        &mut self,
        new_configuration: ClusterConfig,
        metadata: Option<Vec<u8>>,
    ) -> Result<(), ProposeErr<T>> {
        if let Err(config_error) = new_configuration.validate() {
            return Err(ProposeErr::ConfigError(
                config_error,
                new_configuration,
                metadata,
            ));
        }
        self.seq_paxos.reconfigure(new_configuration, metadata)
    }

    /// Notifies OmniPaxos that the connection to the server with `pid` has
    /// been re-established, so that any lost state can be resynchronized.
    pub fn reconnected(&mut self, pid: NodeId) {
        self.seq_paxos.reconnected(pid)
    }

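    /// Drives the election and message-resend timeouts. Must be called
    /// periodically at a fixed interval, since both timeouts are expressed
    /// in numbers of ticks.
    ///
    /// A sketch of driving the clock from a separate thread (not compiled
    /// here; the 10 ms interval and the shared `Arc<Mutex<_>>` around the
    /// replica are assumptions of this example):
    ///
    /// ```ignore
    /// use std::{thread, time::Duration};
    ///
    /// thread::spawn(move || loop {
    ///     thread::sleep(Duration::from_millis(10));
    ///     omni_paxos.lock().unwrap().tick();
    /// });
    /// ```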
    pub fn tick(&mut self) {
        if self.election_clock.tick_and_check_timeout() {
            self.election_timeout();
        }
        if self.resend_message_clock.tick_and_check_timeout() {
            self.seq_paxos.resend_message_timeout();
        }
    }

    /// Sets the priority of this server in leader elections.
    pub fn set_priority(&mut self, p: u32) {
        self.ble.set_priority(p)
    }

    /// Handles an election timeout: lets ballot leader election evaluate the
    /// received heartbeats and, if a new leader has emerged, informs
    /// Sequence Paxos.
    fn election_timeout(&mut self) {
        if let Some(new_leader) = self
            .ble
            .hb_timeout(self.seq_paxos.get_state(), self.seq_paxos.get_promise())
        {
            self.seq_paxos.handle_leader(new_leader);
        }
    }

    /// Returns the current states of this replica and the cluster, for use
    /// in a UI or for debugging.
    pub fn get_ui_states(&self) -> ui::OmniPaxosStates {
        let mut cluster_state = ClusterState::from(self.seq_paxos.get_leader_state());
        cluster_state.heartbeats = self.ble.get_ballots();

        ui::OmniPaxosStates {
            current_ballot: self.ble.get_current_ballot(),
            current_leader: self.get_current_leader(),
            decided_idx: self.get_decided_idx(),
            heartbeats: self.ble.get_ballots(),
            cluster_state,
        }
    }
}

/// An error returned when a proposal fails.
#[derive(Debug)]
pub enum ProposeErr<T>
where
    T: Entry,
{
    /// The entry could not be appended because a reconfiguration is pending.
    PendingReconfigEntry(T),
    /// The reconfiguration could not be proposed because another
    /// reconfiguration is already pending. Contains the failed proposal's
    /// `ClusterConfig` and metadata.
    PendingReconfigConfig(ClusterConfig, Option<Vec<u8>>),
    /// The proposed `ClusterConfig` was invalid. Contains the validation
    /// error and the failed proposal's `ClusterConfig` and metadata.
    ConfigError(ConfigError, ClusterConfig, Option<Vec<u8>>),
}

/// An error returned when log compaction fails.
#[derive(Copy, Clone, Debug)]
pub enum CompactionErr {
    /// The requested compaction index is not decided yet. Contains the
    /// currently decided index.
    UndecidedIndex(u64),
    /// The requested compaction index is already compacted. Contains the
    /// currently compacted index.
    TrimmedIndex(u64),
    /// Trim failed because not all servers have decided up to the requested
    /// index. Contains the index that all servers are known to have decided.
    NotAllDecided(u64),
    /// Trim failed because this server is not the current leader. Contains
    /// the pid of the current leader.
    NotCurrentLeader(NodeId),
}

impl Error for CompactionErr {}
impl Display for CompactionErr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Debug::fmt(self, f)
    }
}