use cannyls::deadline::Deadline;
use cannyls::device::DeviceHandle;
use cannyls::lump::LumpId;
use fibers_tasque::{DefaultCpuTaskQueue, TaskQueueExt};
use frugalos_mds::StartSegmentGcReply;
use frugalos_raft::NodeId;
use futures::future::{join_all, loop_fn, ok, Either};
use futures::future::{Future, Loop};
use futures::Poll;
use libfrugalos::entity::object::ObjectVersion;
use prometrics::metrics::{Counter, Gauge, MetricBuilder};
use slog::Logger;
use crate::Error;
use crate::{config, ErrorKind};
use trackable::error::ErrorKindExt;
#[derive(Clone)]
pub(crate) struct SegmentGcMetrics {
segment_gc_count: Counter,
segment_gc_deleted_objects: Counter,
segment_gc_remaining: Gauge,
pub(crate) segment_gc_aborted: Counter,
}
impl SegmentGcMetrics {
pub(crate) fn new(metric_builder: &MetricBuilder) -> Self {
SegmentGcMetrics {
segment_gc_count: metric_builder
.counter("segment_gc_count")
.finish()
.expect("metric should be well-formed"),
segment_gc_deleted_objects: metric_builder
.counter("segment_gc_deleted_objects")
.finish()
.expect("metric should be well-formed"),
segment_gc_remaining: metric_builder
.gauge("segment_gc_remaining")
.finish()
.expect("metric should be well-formed"),
segment_gc_aborted: metric_builder
.counter("segment_gc_aborted")
.finish()
.expect("metric should be well-formed"),
}
}
pub(crate) fn reset(&self) {
self.segment_gc_remaining.set(0.0);
}
}
pub(crate) struct SegmentGc {
future: Box<dyn Future<Item = (), Error = Error> + Send + 'static>,
}
impl SegmentGc {
#[allow(clippy::needless_pass_by_value, clippy::too_many_arguments)]
pub fn new(
logger: &Logger,
node_id: NodeId,
device: &DeviceHandle,
object_versions: Vec<ObjectVersion>,
object_version_limit: ObjectVersion,
segment_gc_metrics: SegmentGcMetrics,
segment_gc_step: u64,
tx: StartSegmentGcReply,
) -> Self {
let logger = logger.clone();
info!(logger, "Starts segment_gc");
segment_gc_metrics.segment_gc_count.increment();
let create_object_table = make_create_object_table(logger.clone(), object_versions);
let logger = logger.clone();
let logger2 = logger.clone();
let device = device.clone();
let combined_future = create_object_table
.and_then(move |object_table| {
make_list_and_delete_content(
&logger,
&device,
node_id,
object_version_limit,
segment_gc_step,
object_table,
segment_gc_metrics.segment_gc_deleted_objects,
segment_gc_metrics.segment_gc_remaining,
)
})
.then(move |result| {
match &result {
Ok(()) => {
info!(logger2, "SegmentGc objects done");
tx.exit(Ok(()));
}
Err(e) => {
tx.exit(Err(Box::new(e.clone())));
}
}
futures::future::done(result)
});
let future = Box::new(combined_future);
SegmentGc { future }
}
fn compute_deleted_versions(
lump_ids: Vec<LumpId>,
object_table: &ObjectTable,
) -> Vec<ObjectVersion> {
let mut deleted_versions = Vec::new();
for lump_id in lump_ids.into_iter().rev() {
let object_version = config::get_object_version_from_lump_id(lump_id);
if !object_table.has_object(object_version) {
deleted_versions.push(object_version);
}
}
deleted_versions
}
}
impl Future for SegmentGc {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
track!(self.future.poll())
}
}
#[allow(clippy::too_many_arguments)]
pub(self) fn make_list_and_delete_content(
logger: &Logger,
device: &DeviceHandle,
node_id: NodeId,
object_version_limit: ObjectVersion,
step: u64,
object_table: ObjectTable,
segment_gc_deleted_objects: Counter,
segment_gc_remaining: Gauge,
) -> impl Future<Item = (), Error = Error> + Send {
if step == 0 {
return Either::A(futures::future::err(
ErrorKind::Invalid.cause("step should be > 0").into(),
));
}
let logger = logger.clone();
let device = device.clone();
info!(
logger,
"Starts listing and deleting content, object_version_limit = {:?}, step = {}",
object_version_limit,
step
);
let returned_future = loop_fn(
(ObjectVersion(0), object_table),
move |(current_version, object_table)| {
if current_version >= object_version_limit {
segment_gc_remaining.set(0.0);
return Either::A(ok(Loop::Break(())));
}
segment_gc_remaining.set((object_version_limit.0 - current_version.0) as f64);
let segment_gc_deleted_objects = segment_gc_deleted_objects.clone();
let start_lump_id = config::make_lump_id(&node_id, current_version);
let end_object_version = std::cmp::min(
ObjectVersion(current_version.0 + step),
object_version_limit,
);
let end_lump_id = config::make_lump_id(&node_id, end_object_version);
let device_cloned = device.clone();
let future = device
.request()
.deadline(Deadline::Infinity)
.list_range(start_lump_id..end_lump_id)
.and_then(move |lump_ids| {
let deleted_objects =
SegmentGc::compute_deleted_versions(lump_ids, &object_table);
let delete_future = make_delete_objects(
deleted_objects,
&device_cloned,
node_id,
&segment_gc_deleted_objects,
);
delete_future.map(move |()| object_table)
})
.map(move |object_table| {
let next = ObjectVersion(current_version.0 + step);
Loop::Continue((next, object_table))
})
.map_err(From::from);
Either::B(future)
},
);
Either::B(returned_future)
}
pub(self) fn make_create_object_table(
logger: Logger,
object_versions: Vec<ObjectVersion>,
) -> impl Future<Item = ObjectTable, Error = Error> + Send {
debug!(logger, "Starts segment_gc");
DefaultCpuTaskQueue
.async_call(move || get_object_table(&logger, object_versions))
.map_err(From::from)
}
fn get_object_table(logger: &Logger, mut versions: Vec<ObjectVersion>) -> ObjectTable {
versions.sort_unstable();
let objects_count = versions.len();
debug!(
logger,
"SegmentGc machine_objects_count = {}", objects_count
);
ObjectTable(versions)
}
fn make_delete_objects(
deleted_objects: Vec<ObjectVersion>,
device: &DeviceHandle,
node_id: NodeId,
segment_gc_deleted_objects: &Counter,
) -> impl Future<Item = (), Error = cannyls::Error> {
let segment_gc_deleted_objects = segment_gc_deleted_objects.clone();
let futures: Vec<_> = deleted_objects
.into_iter()
.map(|object| {
let segment_gc_deleted_objects = segment_gc_deleted_objects.clone();
let lump_id = config::make_lump_id(&node_id, object);
device
.request()
.deadline(Deadline::Infinity)
.delete(lump_id)
.then(move |result| {
if let Ok(true) = result {
segment_gc_deleted_objects.increment();
}
ok(())
})
})
.collect();
join_all(futures).map(|_| ())
}
pub(self) struct ObjectTable(Vec<ObjectVersion>);
impl ObjectTable {
fn has_object(&self, object_version: ObjectVersion) -> bool {
self.0.binary_search(&object_version).is_ok()
}
}
#[cfg(test)]
mod tests {
use crate::segment_gc::{
make_create_object_table, make_list_and_delete_content, ObjectTable, SegmentGc,
};
use crate::test_util::tests::{setup_system, wait, System};
use cannyls::deadline::Deadline;
use cannyls::device::DeviceHandle;
use cannyls::lump::LumpId;
use futures::future::Future;
use libfrugalos::entity::object::ObjectVersion;
use slog::{Discard, Logger};
use std::{thread, time};
use trackable::result::TestResult;
use crate::config::make_lump_id;
use fibers::executor::Executor;
use prometrics::metrics::{Counter, Gauge};
#[test]
fn list_and_delete_content_works_correctly() -> TestResult {
let logger = Logger::root(Discard, o!());
let mut system = System::new(2, 1)?;
let (nodes, _client) = setup_system(&mut system, 3)?;
let (node0, _device_id0, handle0): (_, _, DeviceHandle) = nodes[0].clone();
thread::spawn(move || loop {
system.executor.run_once().unwrap();
thread::sleep(time::Duration::from_micros(100));
});
for i in 0..10 {
let lump_id = make_lump_id(&node0, ObjectVersion(i));
let lump_data = handle0.allocate_lump_data_with_bytes(&[0; 10])?;
wait(
handle0
.request()
.deadline(Deadline::Immediate)
.put(lump_id, lump_data)
.map_err(From::from),
)?;
}
let object_table = ObjectTable(vec![]);
let segment_gc_deleted_objects =
Counter::new("segment_gc_deleted_objects").expect("Never fails");
let segment_gc_remaining = Gauge::new("segment_gc_remaining").expect("Never fails");
wait(make_list_and_delete_content(
&logger,
&handle0,
node0,
ObjectVersion(10),
1,
object_table,
segment_gc_deleted_objects.clone(),
segment_gc_remaining,
))?;
for i in 0..10 {
let lump_id = make_lump_id(&node0, ObjectVersion(i));
assert!(!wait(
handle0
.request()
.deadline(Deadline::Immediate)
.delete(lump_id)
.map_err(From::from)
)?);
}
assert_eq!(segment_gc_deleted_objects.value() as u64, 10);
Ok(())
}
#[test]
fn create_object_table_lists_objects_correctly() -> TestResult {
let logger = Logger::root(Discard, o!());
let mut object_versions = Vec::new();
for i in 0..10 {
object_versions.push(ObjectVersion(i));
}
let create_object_table = make_create_object_table(logger, object_versions);
let ObjectTable(result) = wait(create_object_table)?;
assert_eq!(result.len(), 10);
for (i, object_version) in result.into_iter().enumerate() {
assert_eq!(object_version, ObjectVersion(i as u64));
}
Ok(())
}
#[test]
fn compute_deleted_versions_works_correctly() -> TestResult {
let lump_ids = vec![
LumpId::new(1),
LumpId::new(5),
LumpId::new(8),
LumpId::new(25),
LumpId::new(100),
];
let object_table = ObjectTable(vec![ObjectVersion(1), ObjectVersion(8), ObjectVersion(25)]);
let deleted_objects = SegmentGc::compute_deleted_versions(lump_ids, &object_table);
assert_eq!(deleted_objects, vec![ObjectVersion(100), ObjectVersion(5)]);
Ok(())
}
}