pg_replica 0.1.0

Consensus-driven failover for PostgreSQL (Raft control plane)
use crate::storage::{DiskStorage, Recovered};
use protobuf::Message as _;
use raft::prelude::*;
use raft::StateRole;
use std::path::PathBuf;

/// Output of one `Node::drain_ready` pass that the caller still has to act on.
///
/// The type is plain data, so it derives the usual value-type traits;
/// `Applied::default()` is the "nothing to apply" value (no payloads, no snapshot).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Applied {
    /// Payloads of committed normal log entries, in the order they were collected.
    /// Entries with an empty payload are not included.
    pub committed: Vec<Vec<u8>>,
    /// Data of a snapshot received during the pass, present only when that
    /// data is non-empty.
    pub snapshot: Option<Vec<u8>>,
}

/// A Raft participant: a thin wrapper around a `RawNode` whose log and state
/// live in a `DiskStorage`.
pub struct Node {
    /// The underlying raft state machine driver; every method on `Node`
    /// delegates to it or to the storage it owns.
    raw: RawNode<DiskStorage>,
}

impl Node {
    pub fn new(node_id: u64, voters: Vec<u64>, raft_dir: PathBuf) -> (Self, Recovered) {
        let (storage, recovered) = DiskStorage::new(node_id, voters, raft_dir);
        let cfg = Config {
            id: node_id,
            election_tick: 10,
            heartbeat_tick: 3,
            max_size_per_msg: 1024 * 1024 * 1024,
            max_inflight_msgs: 256,
            applied: recovered.applied,
            pre_vote: true,
            check_quorum: true,
            ..Default::default()
        };
        let logger = slog::Logger::root(slog::Discard, slog::o!());
        let raw = RawNode::new(&cfg, storage, &logger).expect("raft init failed");
        (Node { raw }, recovered)
    }

    pub fn set_snapshot_data(&self, data: Vec<u8>) {
        self.raw.raft.raft_log.store.set_snapshot_data(data);
    }

    pub fn maybe_compact(&self, threshold: u64) -> Option<u64> {
        let applied = self.raw.raft.raft_log.applied;
        let store = &self.raw.raft.raft_log.store;
        let first = store.first_index().unwrap_or(1);
        let last = store.last_index().unwrap_or(0);
        if applied == last && applied > first && applied - first >= threshold {
            store.compact(applied);
            Some(applied)
        } else {
            None
        }
    }

    pub fn tick(&mut self) {
        self.raw.tick();
    }

    pub fn step_bytes(&mut self, bytes: &[u8]) {
        let mut msg = Message::new();
        if msg.merge_from_bytes(bytes).is_ok() {
            let _ = self.raw.step(msg);
        }
    }

    pub fn is_leader(&self) -> bool {
        matches!(self.raw.raft.state, StateRole::Leader)
    }

    pub fn propose(&mut self, data: Vec<u8>) -> bool {
        self.raw.propose(vec![], data).is_ok()
    }

    pub fn drain_ready<F: FnMut(u64, Vec<u8>)>(&mut self, mut send: F) -> Applied {
        let mut applied = Applied {
            committed: Vec::new(),
            snapshot: None,
        };
        if !self.raw.has_ready() {
            return applied;
        }
        let store = self.raw.raft.raft_log.store.clone();
        let mut ready = self.raw.ready();

        if !ready.messages().is_empty() {
            emit(ready.take_messages(), &mut send);
        }
        if !ready.snapshot().is_empty() {
            let snapshot = ready.snapshot().clone();
            let data = snapshot.get_data().to_vec();
            store.apply_snapshot(snapshot);
            if !data.is_empty() {
                applied.snapshot = Some(data);
            }
        }
        collect_committed(ready.take_committed_entries(), &mut applied.committed);
        if !ready.entries().is_empty() {
            store.append(ready.entries());
        }
        if let Some(hs) = ready.hs() {
            store.set_hardstate(hs.clone());
        }
        if !ready.persisted_messages().is_empty() {
            emit(ready.take_persisted_messages(), &mut send);
        }

        let mut light_rd = self.raw.advance(ready);
        if let Some(commit) = light_rd.commit_index() {
            store.set_commit(commit);
        }
        emit(light_rd.take_messages(), &mut send);
        collect_committed(light_rd.take_committed_entries(), &mut applied.committed);
        self.raw.advance_apply();
        applied
    }

    pub fn role_name(&self) -> &'static str {
        match self.raw.raft.state {
            StateRole::Leader => "leader",
            StateRole::Follower => "follower",
            StateRole::Candidate => "candidate",
            StateRole::PreCandidate => "precandidate",
        }
    }

    pub fn term(&self) -> u64 {
        self.raw.raft.term
    }

    pub fn leader_id(&self) -> u64 {
        self.raw.raft.leader_id
    }
}

/// Serializes each message and passes it to `send` as `(msg.to, bytes)`.
///
/// Messages are handled one at a time in their original order; a message
/// whose serialization fails is skipped without notifying the caller.
fn emit<F>(messages: Vec<Message>, send: &mut F)
where
    F: FnMut(u64, Vec<u8>),
{
    // The iterator is lazy, so each message is encoded right before it is sent.
    let encoded = messages
        .into_iter()
        .filter_map(|msg| msg.write_to_bytes().ok().map(|bytes| (msg.to, bytes)));

    for (recipient, bytes) in encoded {
        send(recipient, bytes);
    }
}

/// Appends to `out` a copy of the payload of every entry that is of type
/// `EntryNormal` and carries non-empty data, preserving entry order.
///
/// Entries of any other type, and normal entries with empty data, are ignored.
fn collect_committed(entries: Vec<Entry>, out: &mut Vec<Vec<u8>>) {
    let payloads = entries
        .iter()
        .filter(|entry| entry.get_entry_type() == EntryType::EntryNormal)
        .map(|entry| entry.get_data())
        .filter(|data| !data.is_empty())
        .map(|data| data.to_vec());

    out.extend(payloads);
}