use std::sync::Arc;
use std::time::Duration;
/// How many replica acknowledgements a commit must collect.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReplicaAckPolicyKind {
    /// Every electable node except one must acknowledge (the excluded node is
    /// presumably the committing master itself — confirm against callers).
    All,
    /// Enough acknowledgements that, together with the one excluded node, a
    /// strict majority of the electable nodes is reached.
    SimpleMajority,
    /// No acknowledgements are required.
    None,
}

impl ReplicaAckPolicyKind {
    /// Returns the number of replica acknowledgements this policy requires
    /// for a group with `electable_count` electable nodes.
    ///
    /// The result never underflows: a group of zero or one electable nodes
    /// requires zero acknowledgements under every policy.
    pub fn required_acks(self, electable_count: u32) -> u32 {
        match self {
            Self::None => 0,
            // Everyone but one node; saturates so an empty group yields 0.
            Self::All => electable_count.saturating_sub(1),
            // A strict majority is `n / 2 + 1`; removing the one excluded
            // node leaves `n / 2`, which is already 0 for `n <= 1`.
            Self::SimpleMajority => electable_count / 2,
        }
    }
}
/// Classifies why waiting for replica acknowledgements failed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AckWaitErrorKind {
    /// The wait timed out before enough acknowledgements were seen.
    Timeout,
    /// The commit was attempted on a node that is not the master.
    NotMaster,
    /// The replicated environment is shutting down.
    Shutdown,
}

/// Error produced when a replica acknowledgement wait does not succeed.
#[derive(Debug, Clone)]
pub struct AckWaitError {
    /// The failure category.
    pub kind: AckWaitErrorKind,
    /// Number of acknowledgements that were needed.
    pub needed: u32,
    /// Number of acknowledgements that were received.
    pub received: u32,
}

impl std::fmt::Display for AckWaitError {
    /// Writes a human-readable message; only the timeout message includes
    /// the `needed` / `received` counts.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            kind,
            needed,
            received,
        } = self;
        match kind {
            AckWaitErrorKind::NotMaster => f.write_str("commit attempted on non-master node"),
            AckWaitErrorKind::Shutdown => f.write_str("replicated environment is shutting down"),
            AckWaitErrorKind::Timeout => write!(
                f,
                "replica ack timeout: needed {}, received {}",
                needed, received,
            ),
        }
    }
}

impl std::error::Error for AckWaitError {}
/// Hook through which a commit path waits for replica acknowledgements.
///
/// Implementations must be `Send + Sync`. Only [`await_replica_acks`] is
/// required; the remaining methods have inert default implementations.
///
/// [`await_replica_acks`]: ReplicaAckCoordinator::await_replica_acks
pub trait ReplicaAckCoordinator: Send + Sync {
    /// Waits for replica acknowledgements according to `policy`, bounded by
    /// `timeout`.
    ///
    /// NOTE(review): the `Ok` value is presumably the number of
    /// acknowledgements received — confirm against implementers.
    ///
    /// # Errors
    ///
    /// Returns an [`AckWaitError`] when the wait does not succeed.
    fn await_replica_acks(
        &self,
        policy: ReplicaAckPolicyKind,
        timeout: Duration,
    ) -> std::result::Result<u32, AckWaitError>;

    /// Allocates a VLSN for a recovered commit logged at `_lsn`.
    ///
    /// The default implementation ignores its argument and returns `0`.
    fn alloc_vlsn_for_recovered_commit(&self, _lsn: noxu_util::Lsn) -> u64 {
        0
    }

    /// Pre-allocates a VLSN for a recovered commit.
    ///
    /// The default implementation returns `0`.
    fn pre_alloc_vlsn_for_recovered_commit(&self) -> u64 {
        0
    }

    /// Registers the association between `_vlsn` and `_commit_lsn` for a
    /// recovered commit.
    ///
    /// The default implementation does nothing.
    fn register_recovered_commit_vlsn(&self, _vlsn: u64, _commit_lsn: noxu_util::Lsn) {}
}
/// Reference-counted handle to a [`ReplicaAckCoordinator`] trait object.
pub type SharedReplicaAckCoordinator = Arc<dyn ReplicaAckCoordinator>;
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `policy.required_acks(electable)` against every
    /// `(electable, expected)` pair in `cases`.
    fn check(policy: ReplicaAckPolicyKind, cases: &[(u32, u32)]) {
        for &(electable, expected) in cases {
            assert_eq!(
                policy.required_acks(electable),
                expected,
                "policy {:?}, electable_count {}",
                policy,
                electable
            );
        }
    }

    #[test]
    fn required_acks_all() {
        check(ReplicaAckPolicyKind::All, &[(0, 0), (1, 0), (3, 2), (5, 4)]);
    }

    #[test]
    fn required_acks_simple_majority() {
        check(
            ReplicaAckPolicyKind::SimpleMajority,
            &[(0, 0), (1, 0), (3, 1), (5, 2)],
        );
    }

    #[test]
    fn required_acks_none() {
        check(ReplicaAckPolicyKind::None, &[(0, 0), (100, 0)]);
    }
}