use cannyls::device::DeviceHandle;
use frugalos_mds::Event;
use frugalos_raft::NodeId;
use futures::{Async, Future, Poll, Stream};
use libfrugalos::entity::object::ObjectVersion;
use libfrugalos::repair::RepairIdleness;
use prometrics::metrics::MetricBuilder;
use slog::Logger;
use crate::client::storage::StorageClient;
use crate::queue_executor::general_queue_executor::GeneralQueueExecutor;
use crate::queue_executor::repair_queue_executor::RepairQueueExecutor;
use crate::segment_gc::{SegmentGc, SegmentGcMetrics};
use crate::service::ServiceHandle;
use crate::Error;
pub struct Synchronizer {
logger: Logger,
node_id: NodeId,
device: DeviceHandle,
client: StorageClient,
segment_gc_metrics: SegmentGcMetrics,
segment_gc: Option<SegmentGc>,
segment_gc_step: u64,
general_queue: GeneralQueueExecutor,
repair_queue: RepairQueueExecutor,
}
impl Synchronizer {
pub fn new(
logger: Logger,
node_id: NodeId,
device: DeviceHandle,
service_handle: ServiceHandle,
client: StorageClient,
segment_gc_step: u64,
) -> Self {
let metric_builder = MetricBuilder::new()
.namespace("frugalos")
.subsystem("synchronizer")
.clone();
let enqueued_repair = metric_builder
.counter("enqueued_items")
.label("type", "repair")
.finish()
.expect("metric should be well-formed");
let enqueued_repair_prep = metric_builder
.counter("enqueued_items")
.label("type", "repair_prep")
.finish()
.expect("metric should be well-formed");
let enqueued_delete = metric_builder
.counter("enqueued_items")
.label("type", "delete")
.finish()
.expect("metric should be well-formed");
let dequeued_repair = metric_builder
.counter("dequeued_items")
.label("type", "repair")
.finish()
.expect("metric should be well-formed");
let dequeued_repair_prep = metric_builder
.counter("dequeued_items")
.label("type", "repair_prep")
.finish()
.expect("metric should be well-formed");
let dequeued_delete = metric_builder
.counter("dequeued_items")
.label("type", "delete")
.finish()
.expect("metric should be well-formed");
let general_queue = GeneralQueueExecutor::new(
&logger,
node_id,
&device,
&enqueued_repair_prep,
&enqueued_delete,
&dequeued_repair_prep,
&dequeued_delete,
);
let repair_queue = RepairQueueExecutor::new(
&logger,
node_id,
&device,
&client,
&service_handle,
&metric_builder,
&enqueued_repair,
&dequeued_repair,
);
Synchronizer {
logger,
node_id,
device,
client,
segment_gc_metrics: SegmentGcMetrics::new(&metric_builder),
segment_gc: None,
segment_gc_step,
general_queue,
repair_queue,
}
}
pub fn handle_event(&mut self, event: Event) {
debug!(
self.logger,
"New event: {:?} (metadata={})",
event,
self.client.is_metadata(),
);
if self.client.is_metadata() {
match event {
Event::StartSegmentGc { tx, .. } => {
debug!(self.logger, "StartSegmentGc suppressed");
tx.exit(Ok(()))
}
Event::StopSegmentGc { tx } => {
debug!(self.logger, "StopSegmentGc suppressed");
tx.exit(Ok(()))
}
_ => (),
}
} else {
match event {
Event::Putted { .. } => {
self.general_queue.push(&event);
}
Event::Deleted { version, .. } => {
self.general_queue.push(&event);
self.repair_queue.delete(version);
}
Event::StartSegmentGc {
object_versions,
next_commit,
tx,
} => {
if self.segment_gc.is_none() {
self.segment_gc = Some(SegmentGc::new(
&self.logger,
self.node_id,
&self.device,
object_versions,
ObjectVersion(next_commit.as_u64()),
self.segment_gc_metrics.clone(),
self.segment_gc_step,
tx,
));
}
}
Event::StopSegmentGc { tx } => {
if self.segment_gc.is_some() {
self.segment_gc_metrics.segment_gc_aborted.increment();
self.segment_gc = None;
self.segment_gc_metrics.reset();
}
tx.exit(Ok(()));
}
}
}
}
pub(crate) fn set_repair_idleness_threshold(
&mut self,
repair_idleness_threshold: RepairIdleness,
) {
self.repair_queue
.set_repair_idleness_threshold(repair_idleness_threshold);
}
}
impl Future for Synchronizer {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
while let Async::Ready(Some(())) = self.segment_gc.poll().unwrap_or_else(|e| {
warn!(self.logger, "Task failure: {}", e);
Async::Ready(Some(()))
}) {
self.segment_gc = None;
self.segment_gc_metrics.reset();
}
if let Async::Ready(Some(versions)) = self.general_queue.poll().unwrap_or_else(|e| {
warn!(self.logger, "Task failure in general_queue: {}", e);
Async::Ready(None)
}) {
for version in versions {
self.repair_queue.push(version);
}
}
self.repair_queue.poll().unwrap_or_else(Into::into);
Ok(Async::NotReady)
}
}