#![allow(clippy::type_complexity)]
use bytecodec::{DecodeExt, EncodeExt};
use fibers::sync::mpsc;
use fibers::sync::oneshot::Monitored;
use fibers::time::timer;
use fibers_rpc::client::ClientServiceHandle as RpcServiceHandle;
use fibers_tasque::{self, AsyncCall, TaskQueueExt};
use frugalos_raft::{NodeId, RaftIo};
use futures::{Async, Future, Poll, Stream};
use libfrugalos::consistency::ReadConsistency;
use libfrugalos::entity::object::{Metadata, ObjectVersion};
use prometrics::metrics::{
Counter, CounterBuilder, Gauge, GaugeBuilder, Histogram, HistogramBuilder, MetricBuilder,
};
use raftlog::cluster::{ClusterConfig, ClusterMembers};
use raftlog::election::Role;
use raftlog::log::{LogEntry, LogIndex, LogPosition};
use raftlog::{self, ReplicatedLog};
use slog::Logger;
use std::collections::VecDeque;
use std::env;
use std::ops::Range;
use std::time::{Duration, Instant};
use trackable::error::ErrorKindExt;
use super::metrics::make_histogram;
use super::snapshot::SnapshotThreshold;
use super::timeout::CountDownTimeout;
use super::{Event, NodeHandle, Proposal, ProposalMetrics, Reply, Request, Seconds};
use crate::codec;
use crate::config::FrugalosMdsConfig;
use crate::machine::{Command, Machine};
use crate::node::Event::{StartSegmentGc, StopSegmentGc};
use crate::protobuf;
use crate::{Error, ErrorKind, Result, ServiceHandle};
use std::cmp::Ordering;
/// Shorthand for the event type produced by polling the replicated log.
type RaftEvent = raftlog::Event;
/// Proposal-queue length above which a polling round is counted as a
/// "large queue" round (compared against `rlog.proposal_queue_len()` in `poll`).
#[derive(Debug)]
struct LargeProposalQueueThreshold(usize);
/// Maximum number of queued `GetLeader` waiters; once the queue grows beyond
/// this value all waiters are failed and the queue is cleared.
#[derive(Debug)]
struct LargeLeaderWaitingQueueThreshold(usize);
/// Number of consecutive "large queue" polling rounds after which the node
/// asks the other cluster members to start a re-election.
#[derive(Debug)]
struct ReElectionThreshold(usize);
/// Prometheus metrics recorded by a node.
///
/// The struct is `Clone` so that copies can be moved into background tasks
/// (snapshot encoding/decoding) and into `LeaderWaiting` entries.
#[derive(Clone)]
struct Metrics {
/// Number of objects held by the state machine; carries `node` and `role` labels.
objects: Gauge,
/// Incremented once per successfully encoded snapshot.
snapshots_total: Counter,
/// Accumulated size in bytes of the encoded snapshots.
snapshot_bytes_total: Counter,
/// Time spent cloning-to-encoded for each snapshot (measured inside the encoding task).
snapshot_encoding_duration_seconds: Histogram,
/// Time spent decoding each loaded snapshot.
snapshot_decoding_duration_seconds: Histogram,
/// Length of the Raft proposal queue, sampled on every polling-timer tick.
proposal_queue_len: Gauge,
/// Time between a `Get` request's `started_at` and the moment it is handled.
get_request_duration_seconds: Histogram,
/// Time a `GetLeader` request spent before it was answered.
leader_waiting_duration_seconds: Histogram,
/// Number of `GetLeader` requests that had to be queued.
leader_waiting_enqueued_total: Counter,
/// Number of queued `GetLeader` requests answered with a leader.
leader_waiting_completed_total: Counter,
/// Number of queued `GetLeader` requests that were failed (queue cleared).
leader_waiting_failed_total: Counter,
}
impl Metrics {
    /// Creates and registers (in the default registry) every metric of a node.
    ///
    /// Only the `objects` gauge is labelled with the node ID and the Raft role
    /// (initially `"Follower"`). The metrics are built in the same order as the
    /// fields are written below; the first failure aborts construction.
    pub fn new(node_id: &NodeId) -> Result<Self> {
        let node_label = node_id.to_string();
        Ok(Metrics {
            objects: track!(GaugeBuilder::new("objects")
                .namespace("frugalos")
                .subsystem("mds")
                .label("node", &node_label)
                .label("role", "Follower")
                .default_registry()
                .finish())?,
            proposal_queue_len: track!(GaugeBuilder::new("proposal_queue_len")
                .namespace("frugalos")
                .subsystem("mds")
                .default_registry()
                .finish())?,
            snapshots_total: track!(CounterBuilder::new("snapshots_total")
                .namespace("frugalos")
                .subsystem("mds")
                .default_registry()
                .finish())?,
            snapshot_bytes_total: track!(CounterBuilder::new("snapshot_bytes_total")
                .namespace("frugalos")
                .subsystem("mds")
                .default_registry()
                .finish())?,
            snapshot_encoding_duration_seconds: track!(make_histogram(
                &mut HistogramBuilder::new("snapshot_encoding_duration_seconds")
                    .namespace("frugalos")
                    .subsystem("mds")
            ))?,
            snapshot_decoding_duration_seconds: track!(make_histogram(
                &mut HistogramBuilder::new("snapshot_decoding_duration_seconds")
                    .namespace("frugalos")
                    .subsystem("mds")
            ))?,
            get_request_duration_seconds: track!(make_histogram(
                &mut HistogramBuilder::new("get_request_duration_seconds")
                    .namespace("frugalos")
                    .subsystem("mds")
            ))?,
            leader_waiting_duration_seconds: track!(make_histogram(
                &mut HistogramBuilder::new("leader_waiting_duration_seconds")
                    .namespace("frugalos")
                    .subsystem("mds")
            ))?,
            leader_waiting_enqueued_total: track!(CounterBuilder::new(
                "leader_waiting_enqueued_total"
            )
            .namespace("frugalos")
            .subsystem("mds")
            .help("The total number of active requests waiting for a new leader")
            .default_registry()
            .finish())?,
            leader_waiting_completed_total: track!(CounterBuilder::new(
                "leader_waiting_completed_total"
            )
            .namespace("frugalos")
            .subsystem("mds")
            .help("The total number of completed requests waiting for a new leader")
            .default_registry()
            .finish())?,
            leader_waiting_failed_total: track!(CounterBuilder::new(
                "leader_waiting_failed_total"
            )
            .namespace("frugalos")
            .subsystem("mds")
            .help("The total number of failed requests waiting for a new leader")
            .default_registry()
            .finish())?,
        })
    }
}
/// A `GetLeader` request that is waiting for its answer.
struct LeaderWaiting {
/// Reply channel through which the leader (or an error) is delivered.
monitored: Monitored<NodeId, Error>,
/// When the request started; used to compute the waiting duration.
started_at: Instant,
/// Metrics handle used to record the waiting duration on exit.
metrics: Metrics,
}
impl LeaderWaiting {
fn exit(self, result: Result<NodeId>) {
let elapsed = prometrics::timestamp::duration_to_seconds(self.started_at.elapsed());
self.metrics
.leader_waiting_duration_seconds
.observe(elapsed);
self.monitored.exit(result);
}
}
/// Holds the pending reply of a `Stop` request; the reply is sent (`Ok(())`)
/// when this value is dropped.
#[derive(Debug)]
struct Stopping(Option<Reply<()>>);
impl Stopping {
    /// Wraps the reply handle of a `Stop` request.
    fn new(reply: Reply<()>) -> Self {
        let pending = Some(reply);
        Stopping(pending)
    }
}
impl Drop for Stopping {
    /// Answers the wrapped reply handle with `Ok(())`, if it is still present.
    fn drop(&mut self) {
        match self.0.take() {
            Some(reply) => {
                reply.exit(Ok(()));
            }
            None => {}
        }
    }
}
/// Lifecycle phase of a node.
#[derive(Debug, PartialEq, Eq)]
enum Phase {
/// Normal operation (the initial phase).
Running,
/// A `Stop` request was accepted and a final snapshot is being taken.
Stopping,
/// The node is finished; the event stream ends on the next poll.
Stopped,
}
/// A metadata-store node: a Raft replicated log driving an in-memory state
/// machine, exposed as a stream of `Event`s.
pub struct Node {
logger: Logger,
/// Service in which this node is registered on creation and unregistered on drop.
service: ServiceHandle,
node_id: NodeId,
/// The Raft replicated log.
rlog: ReplicatedLog<RaftIo>,
/// Currently known leader; cleared whenever the election term changes.
leader: Option<NodeId>,
large_leader_waiting_queue_threshold: LargeLeaderWaitingQueueThreshold,
/// `GetLeader` requests queued while no leader is known.
leader_waitings: Vec<LeaderWaiting>,
/// Countdown (reset when the first waiter is queued) after which queued waiters are failed.
leader_waiting_timeout: CountDownTimeout,
/// Receiving side of the channel whose sender is wrapped in the node's `NodeHandle`.
request_rx: mpsc::Receiver<Request>,
/// Proposals submitted to the log and not yet committed or rejected.
proposals: VecDeque<Proposal>,
/// Number of commits handled since the last snapshot was started.
local_log_size: usize,
snapshot_threshold: SnapshotThreshold,
/// Index the next committed entry is expected to have.
next_commit: LogIndex,
/// Index of the most recently handled commit, if any.
last_commit: Option<LogIndex>,
/// Events waiting to be yielded by the stream.
events: VecDeque<Event>,
/// The replicated state machine.
machine: Machine,
metrics: Metrics,
proposal_metrics: ProposalMetrics,
/// Background task encoding a snapshot of the machine, if one is in flight.
ready_snapshot: Option<AsyncCall<Result<(LogIndex, Vec<u8>)>>>,
/// Background task decoding a loaded snapshot, if one is in flight.
decoding_snapshot: Option<AsyncCall<Result<(LogPosition, Machine, Vec<ObjectVersion>)>>>,
/// Timer driving the periodic housekeeping in `poll`.
polling_timer: timer::Timeout,
polling_timer_interval: Duration,
phase: Phase,
/// Pending `Stop` reply; dropped (and thereby answered) when a snapshot is installed.
stopping: Option<Stopping>,
/// RPC handle used to contact other cluster members when recommending re-election.
rpc_service: RpcServiceHandle,
/// Optional countdown used only to log prolonged leader absence.
log_leader_absence_timeout: Option<CountDownTimeout>,
/// Maximum number of leaderless polling rounds during which non-consistent reads are still served.
staled_object_threshold: usize,
/// Number of polling rounds elapsed since a leader was last known.
staled_object_rounds: usize,
/// Number of consecutive polling rounds with an over-long proposal queue.
large_queue_rounds: usize,
large_queue_threshold: LargeProposalQueueThreshold,
reelection_threshold: ReElectionThreshold,
/// Remaining polling rounds before a pending proposal is considered timed out; `None` when not armed.
commit_timeout: Option<usize>,
/// Initial value for `commit_timeout` when it is armed.
commit_timeout_threshold: usize,
}
impl Node {
/// Creates a node and registers its handle with `service`.
///
/// The following environment variables, when set to a parsable value,
/// override the corresponding configuration entries (unparsable values are
/// ignored):
///
/// - `FRUGALOS_SNAPSHOT_THRESHOLD`
/// - `FRUGALOS_REELECTION_THRESHOLD`
/// - `FRUGALOS_LARGE_QUEUE_THRESHOLD`
///
/// # Errors
///
/// Fails if the node cannot be registered, the replicated log cannot be
/// created, or any metric cannot be built.
pub fn new(
    logger: Logger,
    config: &FrugalosMdsConfig,
    service: ServiceHandle,
    node_id: NodeId,
    cluster: ClusterMembers,
    io: RaftIo,
    rpc_service: RpcServiceHandle,
) -> Result<Self> {
    // Register the request channel first so that the service can reach us.
    let (tx, request_rx) = mpsc::channel();
    track!(service.add_node(node_id, NodeHandle::new(tx)))?;

    let metric_builder = MetricBuilder::new();
    let raft_node_id = node_id.to_raft_node_id();
    let rlog = track!(ReplicatedLog::new(
        raft_node_id,
        cluster,
        io,
        &metric_builder
    ))?;

    // Snapshot threshold: a single value from the environment becomes the
    // range `v..v`; otherwise the configured range is used.
    let configured_range = config.snapshot_threshold();
    let effective_range = env::var("FRUGALOS_SNAPSHOT_THRESHOLD")
        .ok()
        .and_then(|raw| raw.parse().ok())
        .map(|v| Range { start: v, end: v })
        .unwrap_or(configured_range);
    let node_name = node_id.to_string();
    let snapshot_threshold = track!(SnapshotThreshold::new(&node_name, effective_range))?;

    let reelection_threshold = env::var("FRUGALOS_REELECTION_THRESHOLD")
        .ok()
        .and_then(|raw| raw.parse().ok())
        .map(ReElectionThreshold)
        .unwrap_or_else(|| ReElectionThreshold(config.reelection_threshold));
    let large_queue_threshold = env::var("FRUGALOS_LARGE_QUEUE_THRESHOLD")
        .ok()
        .and_then(|raw| raw.parse().ok())
        .map(LargeProposalQueueThreshold)
        .unwrap_or_else(|| LargeProposalQueueThreshold(config.large_proposal_queue_threshold));

    let large_leader_waiting_queue_threshold =
        LargeLeaderWaitingQueueThreshold(config.large_leader_waiting_queue_threshold);
    let leader_waiting_timeout = CountDownTimeout::new(config.leader_waiting_timeout_threshold);
    let log_leader_absence_timeout = if config.log_leader_absence {
        Some(CountDownTimeout::new(config.log_leader_absence_threshold))
    } else {
        None
    };

    info!(
        logger,
        "Thresholds: snapshot={}, reelection={}, queue={}, commit_timeout={}, leader_waiting={}, staled_object={}, log_leader_absence={}, log_leader_absence_threshold={}",
        snapshot_threshold,
        reelection_threshold.0,
        large_queue_threshold.0,
        config.commit_timeout_threshold,
        large_leader_waiting_queue_threshold.0,
        config.staled_object_threshold,
        config.log_leader_absence,
        config.log_leader_absence_threshold,
    );

    let metrics = track!(Metrics::new(&node_id))?;
    let proposal_metrics = track!(ProposalMetrics::new())?;

    let node = Node {
        logger,
        service,
        node_id,
        rlog,
        leader: None,
        large_leader_waiting_queue_threshold,
        leader_waitings: Vec::new(),
        leader_waiting_timeout,
        request_rx,
        proposals: VecDeque::new(),
        local_log_size: 0,
        snapshot_threshold,
        next_commit: LogIndex::new(0),
        last_commit: None,
        events: VecDeque::new(),
        machine: Machine::new(),
        metrics,
        proposal_metrics,
        ready_snapshot: None,
        decoding_snapshot: None,
        polling_timer: timer::timeout(config.node_polling_interval),
        polling_timer_interval: config.node_polling_interval,
        phase: Phase::Running,
        stopping: None,
        log_leader_absence_timeout,
        large_queue_rounds: 0,
        large_queue_threshold,
        reelection_threshold,
        commit_timeout: None,
        commit_timeout_threshold: config.commit_timeout_threshold,
        rpc_service,
        staled_object_rounds: 0,
        staled_object_threshold: config.staled_object_threshold,
    };
    Ok(node)
}
pub fn get_next_commit(&self) -> LogIndex {
self.next_commit
}
#[allow(clippy::cognitive_complexity)]
fn handle_request(&mut self, request: Request) {
match request {
Request::GetLeader(_, _)
| Request::Get(_, _, _, _, _)
| Request::Head(_, _, _, _)
| Request::Exit
| Request::Stop(_)
| Request::TakeSnapshot
| Request::StartElection
| Request::StartSegmentGc(_)
| Request::StopSegmentGc(_) => {}
_ => {
if let Err(e) = self.check_leader() {
request.failed(e);
return;
}
}
}
match request {
Request::StartElection => {
info!(self.logger, "Re-election is required");
self.rlog.start_election();
}
Request::GetLeader(started_at, monitored) => {
info!(self.logger, "GetLeader: {:?}", self.leader);
let waiting = LeaderWaiting {
monitored,
started_at,
metrics: self.metrics.clone(),
};
if let Some(leader) = self.leader {
waiting.exit(Ok(leader));
} else {
if self.leader_waitings.is_empty() {
self.leader_waiting_timeout.reset();
}
self.leader_waitings.push(waiting);
self.metrics.leader_waiting_enqueued_total.increment();
if self.leader_waitings.len() > self.large_leader_waiting_queue_threshold.0 {
warn!(self.logger, "Too many waitings (cleared)");
self.clear_leader_waitings();
}
}
}
Request::List(monitored) => {
let list = self.machine.to_summaries();
monitored.exit(Ok(list));
}
Request::ListByPrefix(prefix, monitored) => {
let list = self.machine.to_summaries_by_prefix(&prefix);
monitored.exit(Ok(list));
}
Request::LatestVersion(monitored) => {
let latest = self.machine.latest_version();
monitored.exit(Ok(latest));
}
Request::ObjectCount(monitored) => monitored.exit(Ok(self.machine.len() as u64)),
Request::Get(object_id, expect, consistency, started_at, monitored) => {
let result = self.check_leader_if_needed(&consistency);
let elapsed = prometrics::timestamp::duration_to_seconds(started_at.elapsed());
self.metrics.get_request_duration_seconds.observe(elapsed);
monitored.exit(result.and_then(|()| self.machine.get(&object_id, &expect)));
}
Request::Head(object_id, expect, consistency, monitored) => {
let result = self.check_leader_if_needed(&consistency);
monitored.exit(result.and_then(|()| self.machine.head(&object_id, &expect)));
}
Request::Put(object_id, data, expect, put_content_timeout, started_at, monitored) => {
let command = Command::Put {
object_id,
userdata: data,
expect,
put_content_timeout,
};
let result = track!(protobuf::command_encoder().encode_into_bytes(command))
.map_err(Error::from)
.and_then(|c| track!(self.rlog.propose_command(c)).map_err(Error::from));
match result {
Err(e) => monitored.exit(Err(e)),
Ok(proposal_id) => {
let proposal = Proposal::Put(
proposal_id,
started_at,
self.proposal_metrics.clone(),
monitored,
);
self.push_proposal(proposal);
}
}
}
Request::Delete(object_id, expect, started_at, monitored) => {
let command = Command::Delete { object_id, expect };
let result = track!(protobuf::command_encoder().encode_into_bytes(command))
.map_err(Error::from)
.and_then(|c| track!(self.rlog.propose_command(c)).map_err(Error::from));
match result {
Err(e) => monitored.exit(Err(e)),
Ok(proposal_id) => {
let proposal = Proposal::Delete(
proposal_id,
started_at,
self.proposal_metrics.clone(),
monitored,
);
self.push_proposal(proposal);
}
}
}
Request::DeleteByVersion(object_version, monitored) => {
let command = Command::DeleteByVersion { object_version };
let result = track!(protobuf::command_encoder().encode_into_bytes(command))
.map_err(Error::from)
.and_then(|c| track!(self.rlog.propose_command(c)).map_err(Error::from));
match result {
Err(e) => monitored.exit(Err(e)),
Ok(proposal_id) => {
let proposal = Proposal::Delete(
proposal_id,
Instant::now(),
self.proposal_metrics.clone(),
monitored,
);
self.push_proposal(proposal);
}
}
}
Request::DeleteByRange(version_from, version_to, _monitored) => {
let command = Command::DeleteByRange {
version_from,
version_to,
};
let result = track!(protobuf::command_encoder().encode_into_bytes(command))
.map_err(Error::from)
.and_then(|c| track!(self.rlog.propose_command(c)).map_err(Error::from));
match result {
Err(_e) => {
unreachable!();
}
Ok(_proposal_id) => {
unreachable!();
}
}
}
Request::DeleteByPrefix(prefix, monitored) => {
let command = Command::DeleteByPrefix {
prefix: prefix.clone(),
};
let result = track!(protobuf::command_encoder().encode_into_bytes(command))
.map_err(Error::from)
.and_then(|c| track!(self.rlog.propose_command(c)).map_err(Error::from));
match result {
Err(e) => monitored.exit(Err(e)),
Ok(proposal_id) => {
let proposal = Proposal::DeleteByPrefix(
proposal_id,
Instant::now(),
self.proposal_metrics.clone(),
prefix,
monitored,
);
self.push_proposal(proposal);
}
}
}
Request::Stop(monitored) => {
if self.phase == Phase::Running {
info!(self.logger, "Starts stopping the node");
match track!(self.take_snapshot()) {
Err(e) => {
error!(self.logger, "Cannot take snapshot: {}", e);
self.phase = Phase::Stopped;
monitored.exit(Ok(()));
}
Ok(false) => {
self.phase = Phase::Stopped;
warn!(self.logger, "no take snapshot");
monitored.exit(Ok(()));
}
Ok(true) => {
self.phase = Phase::Stopping;
self.stopping = Some(Stopping::new(monitored));
}
}
}
}
Request::TakeSnapshot => {
if let Err(e) = track!(self.take_snapshot()) {
error!(self.logger, "Cannot take snapshot: {}", e);
}
}
Request::StartSegmentGc(tx) => {
let object_versions = self.machine.to_versions();
self.events.push_back(StartSegmentGc {
object_versions,
next_commit: self.next_commit,
tx,
})
}
Request::StopSegmentGc(tx) => {
self.events.push_back(StopSegmentGc { tx });
}
Request::Exit => {
if self.phase == Phase::Stopping {
info!(self.logger, "Exit: node={:?}", self.node_id);
self.phase = Phase::Stopped;
}
}
}
}
/// Starts encoding a snapshot of the state machine on the CPU task queue.
///
/// Returns `Ok(false)` when nothing has been committed yet (no last commit,
/// or a last commit at index 0). Returns `Ok(true)` otherwise, including the
/// case where no new task is started because the log is already installing a
/// snapshot or an encoding task is already in flight.
fn take_snapshot(&mut self) -> Result<bool> {
    let commit = match self.last_commit {
        Some(index) if index.as_u64() != 0 => index,
        _ => return Ok(false),
    };
    if self.rlog.is_snapshot_installing() || self.ready_snapshot.is_some() {
        return Ok(true);
    }

    self.local_log_size = 0;
    self.snapshot_threshold.refresh();
    info!(
        self.logger,
        "Starts taking snapshot: objects={}, next_threshold={}",
        self.machine.len(),
        self.snapshot_threshold.value()
    );
    // The machine is cloned here so that encoding can run in the background.
    let machine = self.machine.clone();
    info!(self.logger, "Snapshot cloned");

    let logger = self.logger.clone();
    let started_at = Instant::now();
    let metrics = self.metrics.clone();
    let encoding = fibers_tasque::DefaultCpuTaskQueue.async_call(move || {
        let encoded = track!(codec::encode_machine(&machine))?;
        info!(
            logger,
            "Converted: machine to snapshot: {} bytes",
            encoded.len(),
        );
        metrics.snapshots_total.increment();
        metrics.snapshot_bytes_total.add_u64(encoded.len() as u64);
        let elapsed = prometrics::timestamp::duration_to_seconds(started_at.elapsed());
        metrics.snapshot_encoding_duration_seconds.observe(elapsed);
        Ok((commit, encoded))
    });
    self.ready_snapshot = Some(encoding);
    Ok(true)
}
/// Queues `proposal` behind the pending ones.
///
/// Pending proposals at the back of the queue whose log index is not smaller
/// than the new proposal's index are rejected and removed first. The commit
/// timeout is armed if it is not already running.
fn push_proposal(&mut self, proposal: Proposal) {
    let new_index = proposal.id().index;
    while self
        .proposals
        .back()
        .map_or(false, |pending| pending.id().index >= new_index)
    {
        if let Some(stale) = self.proposals.pop_back() {
            warn!(self.logger, "This proposal is rejected: {:?}", stale.id());
            stale.notify_rejected();
        }
    }
    self.proposals.push_back(proposal);
    self.commit_timeout = self
        .commit_timeout
        .or(Some(self.commit_timeout_threshold));
}
/// Succeeds only when the local Raft node currently has the `Leader` role;
/// otherwise returns an error of kind `ErrorKind::NotLeader`.
fn check_leader(&self) -> Result<()> {
track_assert_eq!(
self.rlog.local_node().role,
Role::Leader,
ErrorKind::NotLeader
);
Ok(())
}
/// Applies the leader check required by the given read consistency.
///
/// `Consistent` reads always require this node to be the leader. The other
/// levels require it only when stale objects are no longer visible (see
/// `is_staled_object_visible`).
fn check_leader_if_needed(&self, consistency: &ReadConsistency) -> Result<()> {
    let leader_required = match consistency {
        ReadConsistency::Consistent => true,
        ReadConsistency::Stale | ReadConsistency::Quorum | ReadConsistency::Subset(_) => {
            !self.is_staled_object_visible()
        }
    };
    if leader_required {
        self.check_leader()
    } else {
        Ok(())
    }
}
/// Records `leader` as the current leader and answers every queued
/// `GetLeader` request with it.
fn change_leader(&mut self, leader: NodeId) {
    let answered = self.leader_waitings.len() as u64;
    self.metrics.leader_waiting_completed_total.add_u64(answered);
    self.leader_waitings
        .drain(..)
        .for_each(|waiting| waiting.exit(Ok(leader)));
    self.leader = Some(leader);
}
/// Tells whether the local Raft node currently has the `Follower` role.
fn is_follower(&self) -> bool {
    match self.rlog.local_node().role {
        Role::Follower => true,
        _ => false,
    }
}
/// Reacts to one event emitted by the replicated log.
///
/// - `RoleChanged`: updates the `role` label of the `objects` gauge.
/// - `TermChanged`: forgets the current leader.
/// - `NewLeaderElected`: on a follower, adopts the node it voted for as leader.
/// - `Committed`: applies the entry via `handle_committed`.
/// - `SnapshotLoaded`: starts decoding the snapshot on the CPU task queue.
/// - `SnapshotInstalled`: releases a pending `Stop` reply, if any.
fn handle_raft_event(&mut self, event: RaftEvent) -> Result<()> {
    use raftlog::Event as E;
    trace!(self.logger, "New raft event: {:?}", event);
    match event {
        E::RoleChanged { new_role } => {
            info!(self.logger, "New raft role: {:?}", new_role);
            track!(self
                .metrics
                .objects
                .labels_mut()
                .insert("role", &format!("{:?}", new_role)))?;
        }
        E::TermChanged { new_ballot } => {
            info!(
                self.logger,
                "New raft election term: ballot={:?}", new_ballot
            );
            self.leader = None;
        }
        E::NewLeaderElected => {
            // Only followers learn the leader from this event.
            if self.is_follower() {
                let voted_for = &self.rlog.local_node().ballot.voted_for;
                let elected = track!(NodeId::from_raft_node_id(voted_for))?;
                info!(self.logger, "New leader is elected: {:?}", elected);
                self.change_leader(elected);
            }
        }
        E::Committed { index, entry } => track!(self.handle_committed(index, entry))?,
        E::SnapshotLoaded { new_head, snapshot } => {
            info!(
                self.logger,
                "New snapshot is loaded: new_head={:?}, bytes={}",
                new_head,
                snapshot.len()
            );
            let logger = self.logger.clone();
            let started_at = Instant::now();
            let metrics = self.metrics.clone();
            let decoding = fibers_tasque::DefaultCpuTaskQueue.async_call(move || {
                let decoded = track!(codec::decode_machine(&snapshot))?;
                let versions = decoded.to_versions();
                info!(logger, "Snapshot decoded: {} bytes", snapshot.len());
                let elapsed = prometrics::timestamp::duration_to_seconds(started_at.elapsed());
                metrics.snapshot_decoding_duration_seconds.observe(elapsed);
                Ok((new_head, decoded, versions))
            });
            self.decoding_snapshot = Some(decoding);
        }
        E::SnapshotInstalled { new_head } => {
            info!(
                self.logger,
                "New snapshot is installed: new_head={:?}, phase={:?}", new_head, self.phase
            );
            if let Some(pending_stop) = self.stopping.take() {
                debug!(self.logger, "Drop stopping");
                // Dropping the value answers the pending `Stop` request.
                drop(pending_stop);
            }
        }
    }
    Ok(())
}
fn handle_committed(&mut self, commit: LogIndex, entry: LogEntry) -> Result<()> {
track_assert_eq!(self.next_commit, commit, ErrorKind::InvalidInput);
self.next_commit = commit + 1;
let mut proposal = None;
while let Some(next) = self.proposals.pop_front() {
match next.id().index.cmp(&commit) {
Ordering::Equal => {
if next.id().term == entry.term() {
proposal = Some(next);
break;
} else {
warn!(self.logger, "This proposal is rejected: {:?}", next.id());
next.notify_rejected();
}
}
Ordering::Greater => {
self.proposals.push_front(next);
break;
}
Ordering::Less => {
track_panic!(ErrorKind::Other, "Inconsistent state");
}
}
}
match entry {
LogEntry::Noop { .. } => {
let leader = track!(NodeId::from_raft_node_id(
&self.rlog.local_node().ballot.voted_for
))?;
info!(
self.logger,
"New leader is elected: {:?} (commit:{:?})", leader, commit
);
self.change_leader(leader);
}
LogEntry::Command { command, .. } => {
self.commit_timeout = None;
let command = track!(protobuf::command_decoder().decode_from_bytes(&command))?;
let result = track!(self.handle_command(commit, command));
if let Some(proposal) = proposal {
match result {
Err(e) => proposal.notify_error(e),
Ok(old) => proposal.notify_committed(&old),
}
}
}
LogEntry::Config { config, .. } => self.handle_config(commit, &config),
}
self.last_commit = Some(commit);
self.local_log_size += 1;
if self.local_log_size > self.snapshot_threshold.value() {
track!(self.take_snapshot())?;
}
Ok(())
}
/// Applies a committed command to the state machine.
///
/// Queues a `Deleted` event for every object version that was removed (and a
/// `Putted` event for a stored one), refreshes the `objects` gauge and
/// returns the removed versions. A `Put` uses the commit index as the new
/// object version.
///
/// # Panics
///
/// Panics on `Command::DeleteByRange`, which is treated as unreachable.
fn handle_command(&mut self, commit: LogIndex, command: Command) -> Result<Vec<ObjectVersion>> {
    let removed: Vec<ObjectVersion> = match command {
        Command::Put {
            object_id,
            userdata,
            put_content_timeout,
            expect,
        } => {
            let version = ObjectVersion(commit.as_u64());
            let metadata = Metadata {
                version,
                data: userdata,
            };
            let old = track!(self.machine.put(object_id, metadata, &expect))?;
            if let Some(old) = old {
                track_assert!(
                    old < version,
                    ErrorKind::InvalidInput,
                    "old={:?}, new={:?}",
                    old,
                    version
                );
                self.events.push_back(Event::Deleted { version: old });
            }
            self.events.push_back(Event::Putted {
                version,
                put_content_timeout,
            });
            old.into_iter().collect()
        }
        Command::Delete { object_id, expect } => {
            let gone = track!(self.machine.delete(&object_id, &expect))?;
            if let Some(version) = gone {
                self.events.push_back(Event::Deleted { version });
            }
            gone.into_iter().collect()
        }
        Command::DeleteByVersion { object_version } => {
            let gone = track!(self.machine.delete_version(object_version))?;
            if let Some(version) = gone {
                self.events.push_back(Event::Deleted { version });
            }
            gone.into_iter().collect()
        }
        Command::DeleteByRange {
            version_from,
            version_to,
        } => {
            unreachable!(
                "Command::DeleteByRange from = {:?}, to = {:?}",
                version_from, version_to
            );
        }
        Command::DeleteByPrefix { prefix } => {
            let gone = track!(self.machine.delete_by_prefix(&prefix))?;
            for &version in &gone {
                self.events.push_back(Event::Deleted { version });
            }
            gone
        }
    };
    // Reached only when the command was applied successfully.
    self.metrics.objects.set(self.machine.len() as f64);
    Ok(removed)
}
/// Handles a committed cluster-configuration entry; it is only logged.
fn handle_config(&mut self, commit: LogIndex, config: &ClusterConfig) {
info!(
self.logger,
"New cluster configuration at {:?}: {:?}", commit, config
);
}
/// Tells whether non-consistent reads may be served without a leader check:
/// either a leader is known, or the number of leaderless polling rounds has
/// not yet exceeded `staled_object_threshold`.
fn is_staled_object_visible(&self) -> bool {
    if self.leader.is_some() {
        return true;
    }
    self.staled_object_rounds <= self.staled_object_threshold
}
fn start_reelection(&mut self) {
let members = self.rlog.cluster_config().primary_members();
let local = self.rlog.local_node();
for m in members.iter().filter(|n| **n != local.id) {
let m = track!(NodeId::from_raft_node_id(&m));
if let Ok(m) = m {
let client = ::libfrugalos::client::mds::Client::new(
(m.addr, m.local_id.to_string()),
self.rpc_service.clone(),
);
client.recommend_to_leader();
}
}
}
/// Fails every queued `GetLeader` request with a "Leader waiting timeout"
/// error and counts them as failed.
fn clear_leader_waitings(&mut self) {
    let failed = self.leader_waitings.len() as u64;
    self.metrics.leader_waiting_failed_total.add_u64(failed);
    self.leader_waitings.drain(..).for_each(|waiting| {
        let e = track!(Error::from(
            ErrorKind::Other.cause("Leader waiting timeout")
        ));
        waiting.exit(Err(e));
    });
}
}
impl Drop for Node {
    /// Unregisters the node from its service; a failure is only logged.
    fn drop(&mut self) {
        match track!(self.service.remove_node(self.node_id)) {
            Ok(_) => {}
            Err(e) => {
                warn!(
                    self.logger,
                    "Cannot remove the node {:?}: {}", self.node_id, e
                );
            }
        }
    }
}
/// The node is driven as a stream of `Event`s; the stream ends once the node
/// has reached the `Stopped` phase.
impl Stream for Node {
type Item = Event;
type Error = Error;
/// Drives the node. In order, each call:
///
/// 1. runs the periodic housekeeping for every expired polling-timer tick,
/// 2. waits for (or finishes) a pending snapshot decoding,
/// 3. installs an encoded snapshot that has become ready,
/// 4. handles all queued requests,
/// 5. handles the events available from the replicated log,
/// 6. ends the stream if the node is stopped, otherwise yields a queued event.
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// Periodic housekeeping: one iteration per expired timer.
while self.polling_timer.poll().expect("Broken timer").is_ready() {
self.polling_timer = timer::timeout(self.polling_timer_interval);
let proposal_queue_len = self.rlog.proposal_queue_len();
self.metrics
.proposal_queue_len
.set(proposal_queue_len as f64);
// Count consecutive rounds with an over-long proposal queue; after
// `reelection_threshold` of them, recommend a re-election.
if proposal_queue_len > self.large_queue_threshold.0 {
self.large_queue_rounds += 1;
if self.large_queue_rounds >= self.reelection_threshold.0 {
warn!(self.logger, "The leader may be slow. Reelection is started");
self.start_reelection();
self.large_queue_rounds = 0;
}
} else {
self.large_queue_rounds = 0;
}
// Commit timeout: counts down one step per round while armed.
if let Some(n) = self.commit_timeout {
if n == 0 {
warn!(self.logger, "Commit timeout. Reelection is started");
self.commit_timeout = None;
self.start_reelection();
} else {
self.commit_timeout = Some(n - 1);
}
}
// Note: the countdown is advanced on every round, even with no waiters.
if self.leader_waiting_timeout.count_down() && !self.leader_waitings.is_empty() {
warn!(self.logger, "Leader waiting timeout (cleared)");
self.clear_leader_waitings();
}
// Track for how many rounds the node has been without a known leader.
if self.leader.is_some() {
self.staled_object_rounds = 0;
} else {
self.staled_object_rounds += 1;
}
if let Some(ref mut timeout) = self.log_leader_absence_timeout {
if self.leader.is_some() {
timeout.reset();
} else if timeout.count_down() {
warn!(
self.logger,
"Leader absence timeout has been expired: expired_total={}",
timeout.expired_total(),
);
}
}
}
// While a snapshot is being decoded nothing else below is processed.
match track!(self.decoding_snapshot.poll().map_err(Error::from))? {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => {}
Async::Ready(Some(result)) => {
let (new_head, machine, versions) = track!(result)?;
info!(self.logger, "Snapshot decoded: new_head={:?}", new_head);
// Optional delay (default 0) attached to the `Putted` events below.
let delay = env::var("FRUGALOS_SNAPSHOT_REPAIR_DELAY")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(0);
// One `Putted` event is queued for every version in the snapshot.
self.events.reserve_exact(machine.len());
self.events
.extend(versions.into_iter().map(|version| Event::Putted {
version,
put_content_timeout: Seconds(delay),
}));
// The decoded machine replaces the current one; commits resume at
// the snapshot's head index.
self.next_commit = new_head.index;
self.machine = machine;
self.metrics.objects.set(self.machine.len() as f64);
self.decoding_snapshot = None;
}
}
// An encoded snapshot is handed to the log; a `Busy` error is tolerated
// and the snapshot is discarded in that case.
if let Async::Ready(Some(result)) = track!(self.ready_snapshot.poll().map_err(Error::from))?
{
info!(self.logger, "Snapshot readied");
let (commit, snapshot) = track!(result)?;
track!(self.rlog.install_snapshot(commit, snapshot).or_else(|e| {
if *e.kind() == ::raftlog::ErrorKind::Busy {
info!(self.logger, "Busy");
Ok(())
} else {
Err(e)
}
}))?;
self.ready_snapshot = None;
}
// Drain the request channel. Panics if the channel reports termination.
while let Async::Ready(polled) = self.request_rx.poll().expect("Never fails") {
let request = polled.expect("Never fails");
self.handle_request(request);
}
// Drain the Raft events, stopping right after a `SnapshotLoaded` event.
// NOTE(review): presumably so that no further event is handled before the
// loaded snapshot has been decoded — confirm.
while let Async::Ready(polled) = track!(self.rlog.poll())? {
if let Some(event) = polled {
if let raftlog::Event::SnapshotLoaded { .. } = event {
track!(self.handle_raft_event(event))?;
break;
} else {
track!(self.handle_raft_event(event))?;
}
} else {
track_panic!(
ErrorKind::Other,
"Unexpected termination of the Raft event stream"
);
}
}
// A stopped node ends the stream, even if events are still queued.
if self.phase == Phase::Stopped {
info!(self.logger, "Stopped");
return Ok(Async::Ready(None));
}
if let Some(event) = self.events.pop_front() {
// Give memory back once a large queue is less than half full.
if self.events.capacity() > 32 && self.events.len() < self.events.capacity() / 2 {
self.events.shrink_to_fit();
}
return Ok(Async::Ready(Some(event)));
}
Ok(Async::NotReady)
}
}