use crate::client::storage::{GetFragment, MaybeFragment, StorageClient};
use cannyls::deadline::Deadline;
use cannyls::device::DeviceHandle;
use cannyls::lump::LumpHeader;
use frugalos_raft::NodeId;
use futures::{Async, Future, Poll};
use libfrugalos::entity::object::ObjectVersion;
use prometrics::metrics::{Counter, Histogram, MetricBuilder};
use slog::Logger;
use std::time::Instant;
use crate::util::{into_box_future, BoxFuture, Phase3};
use crate::{config, Error};
#[derive(Clone)]
pub(crate) struct RepairMetrics {
pub(crate) repairs_success_total: Counter,
pub(crate) repairs_failure_total: Counter,
pub(crate) repairs_unnecessary_total: Counter,
pub(crate) repairs_durations_seconds_step_1: Histogram,
pub(crate) repairs_durations_seconds_step_2: Histogram,
pub(crate) repairs_durations_seconds: Histogram,
}
impl RepairMetrics {
pub(crate) fn new(metric_builder: &MetricBuilder) -> Self {
RepairMetrics {
repairs_success_total: metric_builder
.counter("repairs_success_total")
.label("type", "repair")
.finish()
.expect("metric should be well-formed"),
repairs_failure_total: metric_builder
.counter("repairs_failure_total")
.label("type", "repair")
.finish()
.expect("metric should be well-formed"),
repairs_unnecessary_total: metric_builder
.counter("repairs_unnecessary_total")
.label("type", "repair")
.finish()
.expect("metric should be well-formed"),
repairs_durations_seconds_step_1: metric_builder
.histogram("repairs_durations_seconds_step_1")
.bucket(0.001)
.bucket(0.005)
.bucket(0.01)
.bucket(0.05)
.bucket(0.1)
.bucket(0.5)
.bucket(1.0)
.bucket(5.0)
.bucket(10.0)
.label("type", "repair")
.finish()
.expect("metric should be well-formed"),
repairs_durations_seconds_step_2: metric_builder
.histogram("repairs_durations_seconds_step_2")
.bucket(0.001)
.bucket(0.005)
.bucket(0.01)
.bucket(0.05)
.bucket(0.1)
.bucket(0.5)
.bucket(1.0)
.bucket(5.0)
.bucket(10.0)
.label("type", "repair")
.finish()
.expect("metric should be well-formed"),
repairs_durations_seconds: metric_builder
.histogram("repairs_durations_seconds")
.bucket(0.001)
.bucket(0.005)
.bucket(0.01)
.bucket(0.05)
.bucket(0.1)
.bucket(0.5)
.bucket(1.0)
.bucket(5.0)
.bucket(10.0)
.label("type", "repair")
.finish()
.expect("metric should be well-formed"),
}
}
}
pub(crate) struct RepairPrepContent {
future: BoxFuture<Option<ObjectVersion>>,
}
impl RepairPrepContent {
pub fn new(
logger: &Logger,
device: &DeviceHandle,
node_id: NodeId,
version: ObjectVersion,
) -> Self {
let logger = logger.clone();
let device = device.clone();
let lump_id = config::make_lump_id(&node_id, version);
debug!(
logger,
"Starts checking content: version={:?}, lump_id={:?}", version, lump_id
);
let future = into_box_future(
device
.request()
.deadline(Deadline::Infinity)
.head(lump_id)
.map(move |result| result.map_or(Some(version), |_| None)),
);
RepairPrepContent { future }
}
}
impl Future for RepairPrepContent {
type Item = Option<ObjectVersion>;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.future.poll()
}
}
pub(crate) struct RepairContent {
logger: Logger,
node_id: NodeId,
version: ObjectVersion,
client: StorageClient,
device: DeviceHandle,
started_at: Instant,
repair_metrics: RepairMetrics,
phase: Phase3<BoxFuture<Option<LumpHeader>>, GetFragment, BoxFuture<bool>>,
}
impl RepairContent {
pub fn new(
logger: &Logger,
device: &DeviceHandle,
node_id: NodeId,
client: &StorageClient,
repair_metrics: &RepairMetrics,
version: ObjectVersion,
) -> Self {
let logger = logger.clone();
let device = device.clone();
let lump_id = config::make_lump_id(&node_id, version);
let started_at = Instant::now();
debug!(
logger,
"Starts checking content: version={:?}, lump_id={:?}", version, lump_id
);
let phase = Phase3::A(into_box_future(
device.request().deadline(Deadline::Infinity).head(lump_id),
));
RepairContent {
logger,
node_id,
version,
client: client.clone(),
device,
started_at,
repair_metrics: repair_metrics.clone(),
phase,
}
}
}
impl Future for RepairContent {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
while let Async::Ready(phase) = track!(self.phase.poll().map_err(|e| {
self.repair_metrics.repairs_failure_total.increment();
e
}))? {
let next = match phase {
Phase3::A(Some(_)) => {
debug!(self.logger, "The object {:?} already exists", self.version);
self.repair_metrics.repairs_unnecessary_total.increment();
return Ok(Async::Ready(()));
}
Phase3::A(None) => {
debug!(
self.logger,
"The object {:?} does not exist (try repairing)", self.version
);
let elapsed =
prometrics::timestamp::duration_to_seconds(self.started_at.elapsed());
self.repair_metrics
.repairs_durations_seconds_step_1
.observe(elapsed);
let future = self.client.clone().get_fragment(self.node_id, self.version);
Phase3::B(future)
}
Phase3::B(MaybeFragment::NotParticipant) => {
debug!(
self.logger,
"The object {:?} should not be stored on this node: node_id={:?}",
self.version,
self.node_id
);
self.repair_metrics.repairs_failure_total.increment();
return Ok(Async::Ready(()));
}
Phase3::B(MaybeFragment::Fragment(mut content)) => {
crate::client::storage::append_checksum(&mut content);
let lump_id = config::make_lump_id(&self.node_id, self.version);
debug!(
self.logger,
"Puts repaired content: version={:?}, lump_id={:?}, content_size={}",
self.version,
lump_id,
content.len()
);
let elapsed =
prometrics::timestamp::duration_to_seconds(self.started_at.elapsed());
self.repair_metrics
.repairs_durations_seconds_step_2
.observe(elapsed);
let data = track!(self.device.allocate_lump_data_with_bytes(&content))
.expect("TODO: error handling");
let future = self
.device
.request()
.deadline(Deadline::Infinity)
.put(lump_id, data);
Phase3::C(into_box_future(future))
}
Phase3::C(_) => {
debug!(
self.logger,
"Completed repairing content: {:?}", self.version
);
self.repair_metrics.repairs_success_total.increment();
let elapsed =
prometrics::timestamp::duration_to_seconds(self.started_at.elapsed());
self.repair_metrics
.repairs_durations_seconds
.observe(elapsed);
return Ok(Async::Ready(()));
}
};
self.phase = next;
}
Ok(Async::NotReady)
}
}