#![allow(dead_code)]
use std::collections::{HashMap, HashSet};
use super::{Dot, NodeId};
use crate::resources::ChangeLog;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommandStatus {
Collecting,
Committed,
Applied,
}
#[derive(Debug, Clone)]
pub struct CommandInfo {
pub dot: Dot,
pub changelog: ChangeLog,
pub clock: u64,
pub status: CommandStatus,
pub acks: HashMap<NodeId, u64>,
pub committed: bool,
}
impl CommandInfo {
pub fn new(dot: Dot, changelog: ChangeLog, clock: u64) -> Self {
Self {
dot,
changelog,
clock,
status: CommandStatus::Collecting,
acks: HashMap::new(),
committed: false,
}
}
pub fn add_ack(&mut self, node_id: NodeId, clock: u64) {
self.acks.insert(node_id, clock);
}
pub fn has_fast_quorum(&self, size: usize) -> bool {
self.acks.len() >= size
}
pub fn acks_agree(&self) -> bool {
if self.acks.is_empty() {
return false;
}
let first = self.acks.values().next().cloned();
self.acks.values().all(|c| Some(*c) == first)
}
}
pub struct TempoState {
node_id: NodeId,
pub fast_quorum_size: usize,
pub write_quorum_size: usize,
commands: HashMap<Dot, CommandInfo>,
committed_dots: HashSet<Dot>,
committed_count: u64,
last_applied: u64,
}
impl TempoState {
pub fn new(node_id: NodeId, fast_quorum_size: usize, write_quorum_size: usize) -> Self {
Self {
node_id,
fast_quorum_size,
write_quorum_size,
commands: HashMap::new(),
committed_dots: HashSet::new(),
committed_count: 0,
last_applied: 0,
}
}
pub fn add_command(&mut self, dot: Dot, changelog: ChangeLog, clock: u64) {
self.commands
.entry(dot)
.or_insert_with(|| CommandInfo::new(dot, changelog, clock));
}
pub fn add_ack(&mut self, dot: Dot, node_id: NodeId, clock: u64) -> bool {
if let Some(cmd) = self.commands.get_mut(&dot) {
cmd.add_ack(node_id, clock);
cmd.acks.len() >= self.fast_quorum_size && !cmd.committed
} else {
false
}
}
pub fn has_quorum(&self, dot: &Dot) -> bool {
self.commands
.get(dot)
.map(|cmd| cmd.acks.len() >= self.fast_quorum_size && !cmd.committed)
.unwrap_or(false)
}
pub fn get_command(&self, dot: &Dot) -> Option<&CommandInfo> {
self.commands.get(dot)
}
pub fn mark_committed(&mut self, dot: Dot) {
if let Some(cmd) = self.commands.get_mut(&dot) {
if !cmd.committed {
cmd.committed = true;
cmd.status = CommandStatus::Committed;
self.committed_dots.insert(dot);
self.committed_count += 1;
if dot.node_id == self.node_id {
self.last_applied = self.last_applied.max(dot.sequence);
}
}
} else {
self.committed_dots.insert(dot);
self.committed_count += 1;
}
}
pub fn is_committed(&self, dot: &Dot) -> bool {
self.committed_dots.contains(dot)
}
pub fn committed_count(&self) -> u64 {
self.committed_count
}
pub fn pending_count(&self) -> usize {
self.commands.values().filter(|c| !c.committed).count()
}
pub fn last_applied_sequence(&self) -> u64 {
self.last_applied
}
pub fn gc(&mut self, keep_recent: usize) {
if self.commands.len() <= keep_recent {
return;
}
let mut to_remove: Vec<Dot> = self
.commands
.iter()
.filter(|(_, cmd)| cmd.committed)
.map(|(dot, _)| *dot)
.collect();
to_remove.sort_by_key(|d| d.sequence);
let remove_count = to_remove.len().saturating_sub(keep_recent);
for dot in to_remove.into_iter().take(remove_count) {
self.commands.remove(&dot);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_changelog() -> ChangeLog {
use crate::resources::{ChangeCommand, Namespace};
ChangeLog::new(
ChangeCommand::AddNamespace,
"test",
"test",
&Namespace::new("test"),
)
}
#[test]
fn test_command_quorum() {
let mut state = TempoState::new(1, 2, 2);
let dot = Dot::new(1, 1);
state.add_command(dot, test_changelog(), 100);
let has_quorum = state.add_ack(dot, 1, 100);
assert!(!has_quorum);
let has_quorum = state.add_ack(dot, 2, 100);
assert!(has_quorum);
}
#[test]
fn test_mark_committed() {
let mut state = TempoState::new(1, 2, 2);
let dot = Dot::new(1, 1);
state.add_command(dot, test_changelog(), 100);
assert!(!state.is_committed(&dot));
state.mark_committed(dot);
assert!(state.is_committed(&dot));
assert_eq!(state.committed_count(), 1);
}
}