use cannyls::device::DeviceHandle;
use frugalos_raft::NodeId;
use futures::{Async, Future, Poll};
use libfrugalos::entity::object::ObjectVersion;
use libfrugalos::repair::RepairIdleness;
use prometrics::metrics::{Counter, MetricBuilder};
use slog::Logger;
use std::collections::BTreeSet;
use std::convert::Infallible;
use std::time::Instant;
use crate::client::storage::StorageClient;
use crate::repair::{RepairContent, RepairMetrics};
use crate::service::{RepairLock, ServiceHandle};
use crate::Error;
#[allow(clippy::large_enum_variant)]
enum Task {
Idle,
Repair(RepairContent, RepairLock),
}
impl Task {
fn is_sleeping(&self) -> bool {
match self {
Task::Idle => true,
Task::Repair(_, _) => false,
}
}
}
impl Future for Task {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match *self {
Task::Idle => Ok(Async::Ready(())),
Task::Repair(ref mut f, _) => track!(f.poll().map_err(Error::from)),
}
}
}
pub(crate) struct RepairQueueExecutor {
logger: Logger,
node_id: NodeId,
device: DeviceHandle,
client: StorageClient,
service_handle: ServiceHandle,
task: Task,
queue: BTreeSet<ObjectVersion>,
repair_idleness_threshold: RepairIdleness,
last_not_idle: Instant,
repair_metrics: RepairMetrics,
enqueued_repair: Counter,
dequeued_repair: Counter,
}
impl RepairQueueExecutor {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
logger: &Logger,
node_id: NodeId,
device: &DeviceHandle,
client: &StorageClient,
service_handle: &ServiceHandle,
metric_builder: &MetricBuilder,
enqueued_repair: &Counter,
dequeued_repair: &Counter,
) -> Self {
RepairQueueExecutor {
logger: logger.clone(),
node_id,
device: device.clone(),
client: client.clone(),
service_handle: service_handle.clone(),
task: Task::Idle,
queue: BTreeSet::new(),
repair_idleness_threshold: RepairIdleness::Disabled,
last_not_idle: Instant::now(),
repair_metrics: RepairMetrics::new(metric_builder),
enqueued_repair: enqueued_repair.clone(),
dequeued_repair: dequeued_repair.clone(),
}
}
pub(crate) fn push(&mut self, version: ObjectVersion) {
if self.queue.insert(version) {
self.enqueued_repair.increment();
}
}
pub(crate) fn delete(&mut self, version: ObjectVersion) {
if self.queue.remove(&version) {
self.dequeued_repair.increment();
}
}
fn pop(&mut self) -> Option<ObjectVersion> {
let result = self.queue.iter().next().copied();
if let Some(version) = result {
self.queue.remove(&version);
self.dequeued_repair.increment();
}
result
}
pub(crate) fn set_repair_idleness_threshold(
&mut self,
repair_idleness_threshold: RepairIdleness,
) {
self.repair_idleness_threshold = repair_idleness_threshold;
}
}
impl Future for RepairQueueExecutor {
type Item = Infallible; type Error = Infallible;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if !self.task.is_sleeping() {
self.last_not_idle = Instant::now();
debug!(self.logger, "last_not_idle = {:?}", self.last_not_idle);
}
while let Async::Ready(()) = self.task.poll().unwrap_or_else(|e| {
warn!(self.logger, "Task failure in RepairQueueExecutor: {}", e);
Async::Ready(())
}) {
self.task = Task::Idle;
if let RepairIdleness::Threshold(repair_idleness_threshold_duration) =
self.repair_idleness_threshold
{
if let Some(version) = self.pop() {
let elapsed = self.last_not_idle.elapsed();
if elapsed < repair_idleness_threshold_duration {
self.push(version);
break;
} else {
let repair_lock = self.service_handle.acquire_repair_lock();
if let Some(repair_lock) = repair_lock {
self.task = Task::Repair(
RepairContent::new(
&self.logger,
&self.device,
self.node_id,
&self.client,
&self.repair_metrics,
version,
),
repair_lock,
);
self.last_not_idle = Instant::now();
} else {
self.push(version);
break;
}
}
}
}
if let Task::Idle = self.task {
break;
}
}
Ok(Async::NotReady)
}
}