use slog::{Drain, Logger};
use std::collections::HashMap;
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::{Duration, Instant};
use raft::eraftpb::ConfState;
use raft::prelude::*;
use raft::storage::MemStorage;
use slog::{info, o};
type ProposeCallback = Box<dyn Fn() + Send>;
enum Msg {
Propose {
id: u8,
cb: ProposeCallback,
},
#[allow(dead_code)]
Raft(Message),
}
fn main() {
let storage = MemStorage::new_with_conf_state(ConfState::from((vec![1], vec![])));
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain)
.chan_size(4096)
.overflow_strategy(slog_async::OverflowStrategy::Block)
.build()
.fuse();
let logger = slog::Logger::root(drain, o!("tag" => format!("[{}]", 1)));
let cfg = Config {
id: 1,
election_tick: 10,
heartbeat_tick: 3,
max_size_per_msg: 1024 * 1024 * 1024,
max_inflight_msgs: 256,
applied: 0,
..Default::default()
};
let mut r = RawNode::new(&cfg, storage, &logger).unwrap();
let (sender, receiver) = mpsc::channel();
send_propose(logger.clone(), sender);
let mut t = Instant::now();
let mut timeout = Duration::from_millis(100);
let mut cbs = HashMap::new();
loop {
match receiver.recv_timeout(timeout) {
Ok(Msg::Propose { id, cb }) => {
cbs.insert(id, cb);
r.propose(vec![], vec![id]).unwrap();
}
Ok(Msg::Raft(m)) => r.step(m).unwrap(),
Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => return,
}
let d = t.elapsed();
t = Instant::now();
if d >= timeout {
timeout = Duration::from_millis(100);
r.tick();
} else {
timeout -= d;
}
on_ready(&mut r, &mut cbs);
}
}
fn on_ready(raft_group: &mut RawNode<MemStorage>, cbs: &mut HashMap<u8, ProposeCallback>) {
if !raft_group.has_ready() {
return;
}
let store = raft_group.raft.raft_log.store.clone();
let mut ready = raft_group.ready();
let handle_messages = |msgs: Vec<Message>| {
for _msg in msgs {
}
};
if !ready.messages().is_empty() {
handle_messages(ready.take_messages());
}
if !ready.snapshot().is_empty() {
store.wl().apply_snapshot(ready.snapshot().clone()).unwrap();
}
let mut _last_apply_index = 0;
let mut handle_committed_entries = |committed_entries: Vec<Entry>| {
for entry in committed_entries {
_last_apply_index = entry.index;
if entry.data.is_empty() {
continue;
}
if entry.get_entry_type() == EntryType::EntryNormal {
if let Some(cb) = cbs.remove(entry.data.get(0).unwrap()) {
cb();
}
}
}
};
handle_committed_entries(ready.take_committed_entries());
if !ready.entries().is_empty() {
store.wl().append(&ready.entries()).unwrap();
}
if let Some(hs) = ready.hs() {
store.wl().set_hardstate(hs.clone());
}
if !ready.persisted_messages().is_empty() {
handle_messages(ready.take_persisted_messages());
}
let mut light_rd = raft_group.advance(ready);
if let Some(commit) = light_rd.commit_index() {
store.wl().mut_hard_state().set_commit(commit);
}
handle_messages(light_rd.take_messages());
handle_committed_entries(light_rd.take_committed_entries());
raft_group.advance_apply();
}
fn send_propose(logger: Logger, sender: mpsc::Sender<Msg>) {
thread::spawn(move || {
thread::sleep(Duration::from_secs(10));
let (s1, r1) = mpsc::channel::<u8>();
info!(logger, "propose a request");
sender
.send(Msg::Propose {
id: 1,
cb: Box::new(move || {
s1.send(0).unwrap();
}),
})
.unwrap();
let n = r1.recv().unwrap();
assert_eq!(n, 0);
info!(logger, "receive the propose callback");
});
}