use std::collections::VecDeque;
use slog::Logger;
use crate::eraftpb::Message;
use crate::{HashMap, HashSet};
/// Selects the strategy used to serve read-only requests.
///
/// The chosen value is stored in `ReadOnly::option`; the code that branches on
/// it is not in this part of the file.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum ReadOnlyOption {
/// The default strategy (see the `Default` impl for this enum).
// NOTE(review): presumably this confirms leadership with a quorum before
// answering a read -- confirm against the code that consumes this option.
Safe,
/// Alternative strategy.
// NOTE(review): presumably this relies on the leader lease instead of a
// quorum round trip, and so depends on bounded clock drift -- confirm.
LeaseBased,
}
impl Default for ReadOnlyOption {
fn default() -> ReadOnlyOption {
ReadOnlyOption::Safe
}
}
/// Pairs an index with the context bytes of a read-only request.
// NOTE(review): nothing in this part of the file constructs a `ReadState`;
// presumably it is the value handed back to the application once a read-only
// request can be served -- confirm against the caller.
#[derive(Default, Debug, PartialEq, Clone)]
pub struct ReadState {
/// Index associated with the request.
// NOTE(review): presumably the commit index observed when the request was
// received -- confirm.
pub index: u64,
/// Opaque bytes identifying the request this state belongs to.
pub request_ctx: Vec<u8>,
}
/// Bookkeeping for one pending read-only request tracked by `ReadOnly`.
#[derive(Default, Debug, Clone)]
pub struct ReadIndexStatus {
/// The original request message, stored as passed to `ReadOnly::add_request`.
pub req: Message,
/// The index supplied together with the request in `ReadOnly::add_request`.
pub index: u64,
/// Ids that have acknowledged this request. Seeded with the `self_id` given
/// to `ReadOnly::add_request` and extended by `ReadOnly::recv_ack`.
pub acks: HashSet<u64>,
}
/// Tracks read-only requests that are still pending.
///
/// Every pending request is recorded twice under the same context bytes: once
/// in `pending_read_index` for lookup and once in `read_index_queue` to keep
/// the order in which requests were added.
#[derive(Default, Debug, Clone)]
pub struct ReadOnly {
/// Strategy selected for serving read-only requests.
pub option: ReadOnlyOption,
/// Status of each pending request, keyed by the request's context bytes
/// (the data of the first entry of the request message).
pub pending_read_index: HashMap<Vec<u8>, ReadIndexStatus>,
/// Context bytes of the pending requests, oldest at the front.
pub read_index_queue: VecDeque<Vec<u8>>,
}
impl ReadOnly {
pub fn new(option: ReadOnlyOption) -> ReadOnly {
ReadOnly {
option,
pending_read_index: HashMap::default(),
read_index_queue: VecDeque::new(),
}
}
pub fn add_request(&mut self, index: u64, req: Message, self_id: u64) {
let ctx = {
let key: &[u8] = req.entries[0].data.as_ref();
if self.pending_read_index.contains_key(key) {
return;
}
key.to_vec()
};
let mut acks = HashSet::<u64>::default();
acks.insert(self_id);
let status = ReadIndexStatus { req, index, acks };
self.pending_read_index.insert(ctx.clone(), status);
self.read_index_queue.push_back(ctx);
}
pub fn recv_ack(&mut self, id: u64, ctx: &[u8]) -> Option<&HashSet<u64>> {
self.pending_read_index.get_mut(ctx).map(|rs| {
rs.acks.insert(id);
&rs.acks
})
}
pub fn advance(&mut self, ctx: &[u8], logger: &Logger) -> Vec<ReadIndexStatus> {
let mut rss = vec![];
if let Some(i) = self.read_index_queue.iter().position(|x| {
if !self.pending_read_index.contains_key(x) {
fatal!(logger, "cannot find correspond read state from pending map");
}
*x == ctx
}) {
for _ in 0..=i {
let rs = self.read_index_queue.pop_front().unwrap();
let status = self.pending_read_index.remove(&rs).unwrap();
rss.push(status);
}
}
rss
}
pub fn last_pending_request_ctx(&self) -> Option<Vec<u8>> {
self.read_index_queue.back().cloned()
}
#[inline]
pub fn pending_read_count(&self) -> usize {
self.read_index_queue.len()
}
}