use super::*;
use std::collections::HashMap;
pub struct HeartbeatBuffer {
buf: crossbeam::queue::SegQueue<(ShardIndex, request::Heartbeat)>,
}
impl HeartbeatBuffer {
pub fn new() -> Self {
Self {
buf: crossbeam::queue::SegQueue::new(),
}
}
pub fn push(&self, shard_index: ShardIndex, req: request::Heartbeat) {
self.buf.push((shard_index, req));
}
fn drain(&self) -> HashMap<ShardIndex, request::Heartbeat> {
let mut out = HashMap::new();
let n = self.buf.len();
for _ in 0..n {
let (k, v) = self.buf.pop().unwrap();
out.insert(k, v);
}
out
}
}
pub async fn run(buf: Arc<HeartbeatBuffer>, mut cli: raft::RaftClient, self_node_id: NodeAddress) {
loop {
tokio::time::sleep(Duration::from_millis(300)).await;
let heartbeats = buf.drain();
let states = {
let mut out = HashMap::new();
for (shard_index, heartbeat) in heartbeats {
let state = raft::LeaderCommitState {
leader_term: heartbeat.leader_term,
leader_commit_index: heartbeat.leader_commit_index,
};
out.insert(shard_index, state);
}
out
};
let req = raft::Heartbeat {
leader_id: self_node_id.to_string(),
leader_commit_states: states,
};
cli.send_heartbeat(req).await.ok();
}
}