// omnipaxos/omni_paxos.rs

use crate::{
    ballot_leader_election::{Ballot, BallotLeaderElection},
    errors::{valid_config, ConfigError},
    messages::Message,
    sequence_paxos::SequencePaxos,
    storage::{Entry, StopSign, Storage},
    util::{
        defaults::{BUFFER_SIZE, ELECTION_TIMEOUT, RESEND_MESSAGE_TIMEOUT},
        ConfigurationId, FlexibleQuorum, LogEntry, LogicalClock, NodeId,
    },
    utils::{ui, ui::ClusterState},
};
#[cfg(any(feature = "toml_config", feature = "serde"))]
use serde::Deserialize;
#[cfg(feature = "serde")]
use serde::Serialize;
#[cfg(feature = "toml_config")]
use std::fs;
use std::{
    error::Error,
    fmt::{Debug, Display},
    ops::RangeBounds,
};
#[cfg(feature = "toml_config")]
use toml;

/// Configuration for `OmniPaxos`.
/// # Fields
/// * `cluster_config`: The configuration settings that are cluster-wide.
/// * `server_config`: The configuration settings that are specific to this OmniPaxos server.
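/// # Example
/// A minimal sketch of assembling and validating a config by hand. The pids and cluster size are
/// arbitrary assumptions, and the import path assumes these types are re-exported at the crate root.
/// ```rust,ignore
/// use omnipaxos::{ClusterConfig, OmniPaxosConfig, ServerConfig};
///
/// let op_config = OmniPaxosConfig {
///     cluster_config: ClusterConfig {
///         configuration_id: 1,
///         nodes: vec![1, 2, 3],
///         ..Default::default()
///     },
///     server_config: ServerConfig {
///         pid: 1,
///         ..Default::default()
///     },
/// };
/// op_config.validate().expect("invalid OmniPaxos configuration");
/// ```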
#[allow(missing_docs)]
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "toml_config", derive(Deserialize), serde(default))]
pub struct OmniPaxosConfig {
    pub cluster_config: ClusterConfig,
    pub server_config: ServerConfig,
}

impl OmniPaxosConfig {
    /// Checks that all the fields of the configuration are valid.
    pub fn validate(&self) -> Result<(), ConfigError> {
        self.cluster_config.validate()?;
        self.server_config.validate()?;
        valid_config!(
            self.cluster_config.nodes.contains(&self.server_config.pid),
            "Nodes must include own server pid"
        );
        Ok(())
    }

    /// Creates a new `OmniPaxosConfig` from a `toml` file.
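    /// # Example
    /// A sketch, assuming a hypothetical `config/node1.toml` whose tables mirror the
    /// `cluster_config` and `server_config` fields:
    /// ```rust,ignore
    /// // Contents of the hypothetical config/node1.toml:
    /// //   [cluster_config]
    /// //   configuration_id = 1
    /// //   nodes = [1, 2, 3]
    /// //   [server_config]
    /// //   pid = 1
    /// let op_config = OmniPaxosConfig::with_toml("config/node1.toml")
    ///     .expect("failed to read or validate the TOML config");
    /// ```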
    #[cfg(feature = "toml_config")]
    pub fn with_toml(file_path: &str) -> Result<Self, ConfigError> {
        let config_file = fs::read_to_string(file_path)?;
        let config: OmniPaxosConfig = toml::from_str(&config_file)?;
        config.validate()?;
        Ok(config)
    }

    /// Checks all configuration fields and returns the local OmniPaxos node if successful.
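    /// # Example
    /// A sketch of building the node. Here `op_config` is a validated `OmniPaxosConfig`, `KVEntry`
    /// stands in for your `Entry` type, and `MemoryStorage` is assumed to be an in-memory `Storage`
    /// implementation (e.g. from the companion `omnipaxos_storage` crate); none of these are defined in this file.
    /// ```rust,ignore
    /// use omnipaxos_storage::memory_storage::MemoryStorage;
    ///
    /// let storage = MemoryStorage::<KVEntry>::default();
    /// let omni_paxos: OmniPaxos<KVEntry, MemoryStorage<KVEntry>> = op_config
    ///     .build(storage)
    ///     .expect("failed to build the OmniPaxos node");
    /// ```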
    pub fn build<T, B>(self, storage: B) -> Result<OmniPaxos<T, B>, ConfigError>
    where
        T: Entry,
        B: Storage<T>,
    {
        self.validate()?;
        // Use stored ballot as initial BLE leader
        let recovered_leader = storage
            .get_promise()
            .expect("storage error while trying to read promise");
        Ok(OmniPaxos {
            ble: BallotLeaderElection::with(self.clone().into(), recovered_leader),
            election_clock: LogicalClock::with(self.server_config.election_tick_timeout),
            resend_message_clock: LogicalClock::with(
                self.server_config.resend_message_tick_timeout,
            ),
            seq_paxos: SequencePaxos::with(self.into(), storage),
        })
    }
}

/// Configuration for an `OmniPaxos` cluster.
/// # Fields
/// * `configuration_id`: The identifier for the cluster configuration that this OmniPaxos server is part of.
/// * `nodes`: The nodes in the cluster i.e. the `pid`s of the servers in the configuration.
/// * `flexible_quorum`: Defines read and write quorum sizes. Can be used for different latency vs fault tolerance tradeoffs.
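/// # Example
/// A sketch of a 5-node cluster with a flexible quorum that favors write latency. The sizes are an
/// arbitrary assumption; they only need to satisfy the checks in `validate`:
/// ```rust,ignore
/// use omnipaxos::{util::FlexibleQuorum, ClusterConfig};
///
/// let cluster_config = ClusterConfig {
///     configuration_id: 1,
///     nodes: vec![1, 2, 3, 4, 5],
///     // 4 + 2 > 5, so every read quorum overlaps every write quorum.
///     flexible_quorum: Some(FlexibleQuorum {
///         read_quorum_size: 4,
///         write_quorum_size: 2,
///     }),
/// };
/// cluster_config.validate().expect("invalid cluster configuration");
/// ```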
#[derive(Clone, Debug, PartialEq, Default)]
#[cfg_attr(any(feature = "serde", feature = "toml_config"), derive(Deserialize))]
#[cfg_attr(feature = "toml_config", serde(default))]
#[cfg_attr(feature = "serde", derive(Serialize))]
pub struct ClusterConfig {
    /// The identifier for the cluster configuration that this OmniPaxos server is part of. Must
    /// not be 0 and must be greater than the previous configuration's id.
    pub configuration_id: ConfigurationId,
    /// The nodes in the cluster i.e. the `pid`s of the servers in the configuration.
    pub nodes: Vec<NodeId>,
    /// Defines read and write quorum sizes. Can be used for different latency vs fault tolerance tradeoffs.
    pub flexible_quorum: Option<FlexibleQuorum>,
}

impl ClusterConfig {
    /// Checks that all the fields of the cluster config are valid.
    pub fn validate(&self) -> Result<(), ConfigError> {
        let num_nodes = self.nodes.len();
        valid_config!(num_nodes > 1, "Need more than 1 node");
        valid_config!(self.configuration_id != 0, "Configuration ID cannot be 0");
        if let Some(FlexibleQuorum {
            read_quorum_size,
            write_quorum_size,
        }) = self.flexible_quorum
        {
            valid_config!(
                read_quorum_size + write_quorum_size > num_nodes,
                "The quorums must overlap i.e., the sum of their sizes must exceed the # of nodes"
            );
            valid_config!(
                read_quorum_size >= 2 && read_quorum_size <= num_nodes,
                "Read quorum must be in range 2 to # of nodes in the cluster"
            );
            valid_config!(
                write_quorum_size >= 2 && write_quorum_size <= num_nodes,
                "Write quorum must be in range 2 to # of nodes in the cluster"
            );
            valid_config!(
                read_quorum_size >= write_quorum_size,
                "Read quorum size must be >= the write quorum size."
            );
        }
        Ok(())
    }

    /// Checks all configuration fields and builds a local OmniPaxos node with settings for this
    /// node defined in `server_config` and using storage `with_storage`.
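    /// # Example
    /// A sketch, assuming `cluster_config` is the `ClusterConfig` shared by all nodes and `storage`
    /// is some `Storage` implementation for your `Entry` type:
    /// ```rust,ignore
    /// let server_config = ServerConfig {
    ///     pid: 2,
    ///     ..Default::default()
    /// };
    /// let omni_paxos = cluster_config
    ///     .build_for_server(server_config, storage)
    ///     .expect("failed to build the OmniPaxos node");
    /// ```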
    pub fn build_for_server<T, B>(
        self,
        server_config: ServerConfig,
        with_storage: B,
    ) -> Result<OmniPaxos<T, B>, ConfigError>
    where
        T: Entry,
        B: Storage<T>,
    {
        let op_config = OmniPaxosConfig {
            cluster_config: self,
            server_config,
        };
        op_config.build(with_storage)
    }
}

/// Configuration for a singular `OmniPaxos` instance in a cluster.
/// # Fields
/// * `pid`: The unique identifier of this node. Must not be 0.
/// * `election_tick_timeout`: The number of calls to `tick()` before leader election is updated.
/// If this is set to 5 and `tick()` is called every 10ms, then the election timeout will be 50ms. Must not be 0.
/// * `resend_message_tick_timeout`: The number of calls to `tick()` before a message is considered dropped and thus resent. Must not be 0.
/// * `buffer_size`: The buffer size for outgoing messages.
/// * `batch_size`: The size of the buffer for log batching. The default is 1, which means no batching.
/// * `logger_file_path`: The path where the default logger logs events.
/// * `leader_priority`: Custom priority for this node to be elected as the leader.
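/// # Example
/// A sketch of a server config that overrides a few defaults (the particular values are arbitrary
/// assumptions):
/// ```rust,ignore
/// use omnipaxos::ServerConfig;
///
/// let server_config = ServerConfig {
///     pid: 1,
///     election_tick_timeout: 10, // with a 10ms tick, a failed leader is detected after ~100ms
///     batch_size: 100,           // buffer up to 100 entries for log batching
///     ..Default::default()
/// };
/// server_config.validate().expect("invalid server configuration");
/// ```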
#[derive(Clone, Debug)]
#[cfg_attr(feature = "toml_config", derive(Deserialize), serde(default))]
pub struct ServerConfig {
    /// The unique identifier of this node. Must not be 0.
    pub pid: NodeId,
    /// The number of calls to `tick()` before leader election is updated. If this is set to 5 and `tick()` is called every 10ms, then the election timeout will be 50ms.
    pub election_tick_timeout: u64,
    /// The number of calls to `tick()` before a message is considered dropped and thus resent. Must not be 0.
    pub resend_message_tick_timeout: u64,
    /// The buffer size for outgoing messages.
    pub buffer_size: usize,
    /// The size of the buffer for log batching. The default is 1, which means no batching.
    pub batch_size: usize,
    /// Custom priority for this node to be elected as the leader.
    pub leader_priority: u32,
    /// The path where the default logger logs events.
    #[cfg(feature = "logging")]
    pub logger_file_path: Option<String>,
    /// Custom logger, if provided, will be used instead of the default logger.
    #[cfg(feature = "logging")]
    #[cfg_attr(feature = "toml_config", serde(skip_deserializing))]
    pub custom_logger: Option<slog::Logger>,
}

impl ServerConfig {
    /// Checks that all the fields of the server config are valid.
    pub fn validate(&self) -> Result<(), ConfigError> {
        valid_config!(self.pid != 0, "Server pid cannot be 0");
        valid_config!(self.buffer_size != 0, "Buffer size must be greater than 0");
        valid_config!(self.batch_size != 0, "Batch size must be greater than 0");
        valid_config!(
            self.election_tick_timeout != 0,
            "Election tick timeout must be greater than 0"
        );
        valid_config!(
            self.resend_message_tick_timeout != 0,
            "Resend message tick timeout must be greater than 0"
        );
        Ok(())
    }
}

impl Default for ServerConfig {
    fn default() -> Self {
        Self {
            pid: 0,
            election_tick_timeout: ELECTION_TIMEOUT,
            resend_message_tick_timeout: RESEND_MESSAGE_TIMEOUT,
            buffer_size: BUFFER_SIZE,
            batch_size: 1,
            leader_priority: 0,
            #[cfg(feature = "logging")]
            logger_file_path: None,
            #[cfg(feature = "logging")]
            custom_logger: None,
        }
    }
}

/// The `OmniPaxos` struct represents an OmniPaxos server. It maintains the replicated log that can be read from and appended to.
/// It also handles incoming messages and produces outgoing messages that you need to fetch and send periodically using your own network implementation.
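/// # Example
/// A sketch of the typical usage loop. `omni_paxos` is assumed to be a built instance, `entry` a
/// value of your `Entry` type, and `receive_messages()`/`send()` stand-ins for your own network
/// layer (they are not part of this crate):
/// ```rust,ignore
/// // Drive the logical clock periodically, e.g. every 10ms from a timer.
/// omni_paxos.tick();
///
/// // Feed in whatever the network delivered since the last iteration.
/// for msg in receive_messages() {
///     omni_paxos.handle_incoming(msg);
/// }
///
/// // Propose a client request.
/// omni_paxos.append(entry).expect("append failed");
///
/// // Flush the produced messages; route each one to its receiver via your transport.
/// for out_msg in omni_paxos.outgoing_messages() {
///     send(out_msg);
/// }
///
/// // Read everything that has been decided so far.
/// let decided_entries = omni_paxos.read_decided_suffix(0);
/// ```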
pub struct OmniPaxos<T, B>
where
    T: Entry,
    B: Storage<T>,
{
    seq_paxos: SequencePaxos<T, B>,
    ble: BallotLeaderElection,
    election_clock: LogicalClock,
    resend_message_clock: LogicalClock,
}

impl<T, B> OmniPaxos<T, B>
where
    T: Entry,
    B: Storage<T>,
{
    /// Initiates the trim process.
    /// # Arguments
    /// * `trim_index` - Deletes all entries up to `trim_index`. If `trim_index` is `None`, the minimum index accepted by **ALL** servers will be used as the `trim_index`.
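    /// # Example
    /// A sketch, assuming `omni_paxos` is the `OmniPaxos` instance of the current leader:
    /// ```rust,ignore
    /// // Trim up to the lowest index accepted by ALL servers.
    /// match omni_paxos.trim(None) {
    ///     Ok(()) => {}
    ///     Err(CompactionErr::NotCurrentLeader(leader)) => {
    ///         // only the leader may trim; retry at `leader` or wait
    ///     }
    ///     Err(e) => eprintln!("trim failed: {e}"),
    /// }
    /// ```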
    pub fn trim(&mut self, trim_index: Option<u64>) -> Result<(), CompactionErr> {
        self.seq_paxos.trim(trim_index)
    }

    /// Trim the log and create a snapshot. **Note: only entries up to the `decided_idx` can be snapshotted.**
    /// # Arguments
    /// * `compact_idx` - Snapshots all entries < `compact_idx`. If `compact_idx` is `None`, the decided index will be used.
    /// * `local_only` - If `true`, only this server snapshots the log. If `false`, all servers perform the snapshot.
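    /// # Example
    /// A sketch, assuming `omni_paxos` is an `OmniPaxos` instance and `100` an arbitrary index:
    /// ```rust,ignore
    /// // Snapshot the first 100 entries on this server only.
    /// match omni_paxos.snapshot(Some(100), true) {
    ///     Ok(()) => {}
    ///     Err(CompactionErr::UndecidedIndex(decided_idx)) => {
    ///         // index 100 is not decided yet; retry with an index <= `decided_idx`
    ///     }
    ///     Err(e) => eprintln!("snapshot failed: {e}"),
    /// }
    /// ```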
    pub fn snapshot(
        &mut self,
        compact_idx: Option<u64>,
        local_only: bool,
    ) -> Result<(), CompactionErr> {
        self.seq_paxos.snapshot(compact_idx, local_only)
    }

    /// Return the decided index.
    pub fn get_decided_idx(&self) -> u64 {
        self.seq_paxos.get_decided_idx()
    }

    /// Return the compacted (trimmed or snapshotted) index from storage.
    pub fn get_compacted_idx(&self) -> u64 {
        self.seq_paxos.get_compacted_idx()
    }

    /// Returns the id of the current leader.
    pub fn get_current_leader(&self) -> Option<NodeId> {
        let promised_pid = self.seq_paxos.get_promise().pid;
        if promised_pid == 0 {
            None
        } else {
            Some(promised_pid)
        }
    }

    /// Returns the promised ballot of this node.
    pub fn get_promise(&self) -> Ballot {
        self.seq_paxos.get_promise()
    }

    /// Returns the outgoing messages from this server. The messages should then be sent via the network implementation.
    pub fn outgoing_messages(&mut self) -> Vec<Message<T>> {
        let paxos_msgs = self
            .seq_paxos
            .get_outgoing_msgs()
            .into_iter()
            .map(|p| Message::SequencePaxos(p));
        let ble_msgs = self
            .ble
            .get_outgoing_msgs()
            .into_iter()
            .map(|b| Message::BLE(b));
        ble_msgs.chain(paxos_msgs).collect()
    }

    /// Read entry at index `idx` in the log. Returns `None` if `idx` is out of bounds.
    pub fn read(&self, idx: u64) -> Option<LogEntry<T>> {
        match self
            .seq_paxos
            .internal_storage
            .read(idx..idx + 1)
            .expect("storage error while trying to read log entries")
        {
            Some(mut v) => v.pop(),
            None => None,
        }
    }

    /// Read entries in the range `r` in the log. Returns `None` if `r` is out of bounds.
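    /// # Example
    /// A sketch, assuming `omni_paxos` is an `OmniPaxos` instance whose log holds at least ten entries:
    /// ```rust,ignore
    /// use omnipaxos::util::LogEntry;
    ///
    /// if let Some(entries) = omni_paxos.read_entries(0..10) {
    ///     for entry in entries {
    ///         match entry {
    ///             LogEntry::Decided(e) => { /* `e` has been decided and is safe to apply */ }
    ///             LogEntry::Undecided(e) => { /* `e` is replicated but not yet decided */ }
    ///             _ => { /* trimmed, snapshotted or stop-sign entries */ }
    ///         }
    ///     }
    /// }
    /// ```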
    pub fn read_entries<R>(&self, r: R) -> Option<Vec<LogEntry<T>>>
    where
        R: RangeBounds<u64>,
    {
        self.seq_paxos
            .internal_storage
            .read(r)
            .expect("storage error while trying to read log entries")
    }

    /// Read all decided entries from `from_idx` in the log. Returns `None` if `from_idx` is out of bounds.
    pub fn read_decided_suffix(&self, from_idx: u64) -> Option<Vec<LogEntry<T>>> {
        self.seq_paxos
            .internal_storage
            .read_decided_suffix(from_idx)
            .expect("storage error while trying to read decided log suffix")
    }

    /// Handle an incoming message.
    pub fn handle_incoming(&mut self, m: Message<T>) {
        match m {
            Message::SequencePaxos(p) => self.seq_paxos.handle(p),
            Message::BLE(b) => self.ble.handle(b),
        }
    }

    /// Returns the decided `StopSign` if this Sequence Paxos instance has been reconfigured, otherwise `None`.
    pub fn is_reconfigured(&self) -> Option<StopSign> {
        self.seq_paxos.is_reconfigured()
    }

    /// Append an entry to the replicated log.
    pub fn append(&mut self, entry: T) -> Result<(), ProposeErr<T>> {
        self.seq_paxos.append(entry)
    }

    /// Propose a cluster reconfiguration. Returns an error if the current configuration has already been stopped
    /// by a previous reconfiguration request or if the `new_configuration` is invalid.
    /// # Arguments
    /// * `new_configuration` - Defines the cluster-wide configuration settings for the **next** cluster.
    /// * `metadata` - Optional data to commit alongside the reconfiguration.
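    /// # Example
    /// A sketch of proposing a new configuration that adds node 4 (the ids are arbitrary assumptions):
    /// ```rust,ignore
    /// let new_configuration = ClusterConfig {
    ///     configuration_id: 2,
    ///     nodes: vec![1, 2, 3, 4],
    ///     ..Default::default()
    /// };
    /// if let Err(e) = omni_paxos.reconfigure(new_configuration, None) {
    ///     // inspect `e`, e.g. a reconfiguration may already be pending
    /// }
    /// ```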
    pub fn reconfigure(
        &mut self,
        new_configuration: ClusterConfig,
        metadata: Option<Vec<u8>>,
    ) -> Result<(), ProposeErr<T>> {
        if let Err(config_error) = new_configuration.validate() {
            return Err(ProposeErr::ConfigError(
                config_error,
                new_configuration,
                metadata,
            ));
        }
        self.seq_paxos.reconfigure(new_configuration, metadata)
    }

    /// Handles re-establishing a connection to a previously disconnected peer.
    /// This should only be called if the underlying network implementation indicates that a connection has been re-established.
    pub fn reconnected(&mut self, pid: NodeId) {
        self.seq_paxos.reconnected(pid)
    }

    /// Increments the internal logical clock. Leader changes and the resending of dropped messages are triggered (if required)
    /// every `election_tick_timeout` and `resend_message_tick_timeout` calls to this function, respectively (see how to set these in `ServerConfig`).
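    /// # Example
    /// A sketch of driving the clock from a dedicated thread; a real system would typically use its
    /// runtime's timer instead of `std::thread::sleep`:
    /// ```rust,ignore
    /// use std::time::Duration;
    ///
    /// loop {
    ///     std::thread::sleep(Duration::from_millis(10));
    ///     omni_paxos.tick();
    ///     // ...also handle incoming and outgoing messages here...
    /// }
    /// ```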
    pub fn tick(&mut self) {
        if self.election_clock.tick_and_check_timeout() {
            self.election_timeout();
        }
        if self.resend_message_clock.tick_and_check_timeout() {
            self.seq_paxos.resend_message_timeout();
        }
    }

    /*** BLE calls ***/
    /// Update the custom priority used in the Ballot for this server. Note that changing the
    /// priority triggers a leader re-election.
    pub fn set_priority(&mut self, p: u32) {
        self.ble.set_priority(p)
    }

    /// If the heartbeat of the leader has not been received by the time `election_timeout()` is called, this server may attempt to become the leader.
    /// It also drives the election process, in which the server checks whether it can become the leader.
    /// For instance, if `election_timeout()` is called every 100ms and the leader fails, the servers will detect the failure after 100ms and, if possible, elect a new leader after another 100ms.
    fn election_timeout(&mut self) {
        if let Some(new_leader) = self
            .ble
            .hb_timeout(self.seq_paxos.get_state(), self.seq_paxos.get_promise())
        {
            self.seq_paxos.handle_leader(new_leader);
        }
    }

    /// Returns the current states of the OmniPaxos instance for the OmniPaxos UI to display.
    pub fn get_ui_states(&self) -> ui::OmniPaxosStates {
        let mut cluster_state = ClusterState::from(self.seq_paxos.get_leader_state());
        cluster_state.heartbeats = self.ble.get_ballots();

        ui::OmniPaxosStates {
            current_ballot: self.ble.get_current_ballot(),
            current_leader: self.get_current_leader(),
            decided_idx: self.get_decided_idx(),
            heartbeats: self.ble.get_ballots(),
            cluster_state,
        }
    }
}

/// An error indicating a failed proposal, either because the current cluster configuration has already been stopped
/// by a pending reconfiguration or because the proposed configuration is invalid. Returns the failed proposal.
#[derive(Debug)]
pub enum ProposeErr<T>
where
    T: Entry,
{
    /// Couldn't propose entry because a reconfiguration is pending. Returns the failed, proposed entry.
    PendingReconfigEntry(T),
    /// Couldn't propose reconfiguration because a reconfiguration is already pending. Returns the
    /// failed, proposed `ClusterConfig` and metadata.
    PendingReconfigConfig(ClusterConfig, Option<Vec<u8>>),
    /// Couldn't propose reconfiguration because of an invalid cluster config. Contains the config
    /// error and the failed, proposed cluster config and metadata.
    ConfigError(ConfigError, ClusterConfig, Option<Vec<u8>>),
}

/// An error indicating a failed trim or snapshot request.
#[derive(Copy, Clone, Debug)]
pub enum CompactionErr {
    /// Snapshot was called with an index that is not decided yet. Returns the currently decided index.
    UndecidedIndex(u64),
    /// Snapshot was called with an index which is already trimmed. Returns the currently compacted index.
    TrimmedIndex(u64),
    /// Trim was called with an index that is not decided by all servers yet. Returns the index decided by ALL servers currently.
    NotAllDecided(u64),
    /// Trim was called at a follower node. Trim must be called by the leader, which is the returned NodeId.
    NotCurrentLeader(NodeId),
}

impl Error for CompactionErr {}
impl Display for CompactionErr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Debug::fmt(self, f)
    }
}