use cannyls::deadline::Deadline;
use fibers::sync::mpsc;
use futures::{Async, Future, Poll};
use raftlog::log::{Log, LogSuffix};
use raftlog::{Error, ErrorKind};
use slog::Logger;
use std::mem;
use std::time::Instant;
use trackable::error::ErrorKindExt;
use super::log_prefix::{LoadLogPrefix, SaveLogPrefix};
use super::log_suffix::{LoadLogSuffix, SaveLogSuffix};
use super::{into_box_future, BoxFuture, Event, Handle, LocalNodeId, StorageMetrics};
pub struct SaveLog {
pub(crate) inner: SaveLogInner,
started_at: Instant,
metrics: StorageMetrics,
}
impl SaveLog {
pub(crate) fn new(inner: SaveLogInner, metrics: StorageMetrics) -> Self {
Self {
inner,
started_at: Instant::now(),
metrics,
}
}
}
impl Future for SaveLog {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Async::Ready(()) = track!(self.inner.poll())? {
let elapsed = prometrics::timestamp::duration_to_seconds(self.started_at.elapsed());
self.metrics.save_log_duration_seconds.observe(elapsed);
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
}
#[allow(clippy::large_enum_variant)]
pub enum SaveLogInner {
Suffix(SaveLogSuffix),
Prefix(SaveLogPrefix),
Failed(Error),
}
impl Future for SaveLogInner {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match *self {
SaveLogInner::Suffix(ref mut f) => track!(f.poll()),
SaveLogInner::Prefix(ref mut f) => track!(f.poll()),
SaveLogInner::Failed(ref mut e) => {
let e = mem::replace(e, ErrorKind::Other.error().into());
Err(track!(e))
}
}
}
}
pub struct LoadLog {
pub(crate) inner: LoadLogInner,
started_at: Instant,
metrics: StorageMetrics,
}
impl LoadLog {
pub(crate) fn new(inner: LoadLogInner, metrics: StorageMetrics) -> Self {
Self {
inner,
started_at: Instant::now(),
metrics,
}
}
}
impl Future for LoadLog {
type Item = Log;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Async::Ready(item) = track!(self.inner.poll())? {
let elapsed = prometrics::timestamp::duration_to_seconds(self.started_at.elapsed());
self.metrics.load_log_duration_seconds.observe(elapsed);
Ok(Async::Ready(item))
} else {
Ok(Async::NotReady)
}
}
}
#[allow(clippy::large_enum_variant)]
pub(crate) enum LoadLogInner {
LoadLogPrefix {
next: Option<LoadLogSuffix>,
event_tx: Option<mpsc::Sender<Event>>,
future: LoadLogPrefix,
},
LoadLogSuffix(LoadLogSuffix),
CopyLogSuffix(LogSuffix),
Failed(Error),
}
impl Future for LoadLogInner {
type Item = Log;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let next = match *self {
LoadLogInner::LoadLogSuffix(ref mut f) => {
return Ok(track!(f.poll())?.map(Log::Suffix));
}
LoadLogInner::CopyLogSuffix(ref mut f) => {
let suffix = mem::take(f);
return Ok(Async::Ready(Log::Suffix(suffix)));
}
LoadLogInner::LoadLogPrefix {
ref mut next,
ref mut future,
ref mut event_tx,
} => {
match track!(future.poll())? {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => {
let next =
track_assert_some!(next.take(), ErrorKind::InconsistentState);
LoadLogInner::LoadLogSuffix(next)
}
Async::Ready(Some(p)) => {
if let Some(tx) = event_tx.take() {
let _ = tx.send(Event::LogPrefixUpdated { new_head: p.tail });
}
return Ok(Async::Ready(Log::Prefix(p)));
}
}
}
LoadLogInner::Failed(ref mut e) => {
let e = mem::replace(e, ErrorKind::Other.error().into());
return Err(track!(e));
}
};
*self = next;
}
}
}
pub struct DeleteLog {
logger: Logger,
event_tx: mpsc::Sender<Event>,
future: BoxFuture<()>,
}
impl DeleteLog {
pub(crate) fn new(handle: &Handle, event_tx: mpsc::Sender<Event>, node: LocalNodeId) -> Self {
let logger = handle.logger.clone();
let future = into_box_future(
handle
.device
.request()
.deadline(Deadline::Infinity)
.prioritized()
.wait_for_running()
.delete_range(node.to_available_lump_id_range())
.map(|_| ()),
);
info!(logger, "[START] DeleteLog");
Self {
logger,
event_tx,
future,
}
}
}
impl Future for DeleteLog {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Async::Ready(()) = track!(self.future.poll())? {
info!(self.logger, "[FINISH] DeleteLog");
let _ = self.event_tx.send(Event::LogSuffixDeleted);
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
}
#[cfg(test)]
mod tests {
use cannyls::lump::LumpData;
use raftlog::cluster::ClusterConfig;
use raftlog::election::Term;
use raftlog::log::{LogEntry, LogIndex, LogPosition, LogPrefix, LogSuffix};
use std::collections::btree_set::BTreeSet;
use trackable::result::TestResult;
use crate::test_util::{run_test_with_storage, wait_for};
use crate::LocalNodeId;
#[test]
fn delete_log_prefix_works() -> TestResult {
let node_id = LocalNodeId::new([0, 11, 222, 3, 44, 5, 66]);
run_test_with_storage(node_id, |(mut storage, device)| {
let term = Term::new(3);
let log_prefix = LogPrefix {
tail: LogPosition {
prev_term: term,
index: LogIndex::new(0),
},
config: ClusterConfig::new(BTreeSet::new()),
snapshot: vec![],
};
wait_for(storage.save_log_prefix(log_prefix))?;
let lump_id = node_id.to_log_prefix_index_lump_id();
let result = wait_for(device.handle().request().head(lump_id))?;
assert!(result.is_some());
wait_for(storage.delete_log())?;
let lump_id = node_id.to_log_prefix_index_lump_id();
let result = wait_for(device.handle().request().head(lump_id))?;
assert!(result.is_none());
Ok(())
})
}
#[test]
fn delete_log_works_without_suffix() -> TestResult {
let node_id = LocalNodeId::new([0, 11, 222, 3, 44, 5, 66]);
run_test_with_storage(node_id, |(mut storage, _device)| {
wait_for(storage.delete_log())?;
Ok(())
})
}
#[test]
fn delete_log_suffix_works_with_suffix() -> TestResult {
let node_id = LocalNodeId::new([0, 11, 222, 3, 44, 5, 66]);
let next_node_id = LocalNodeId::new([0, 11, 222, 3, 44, 5, 67]);
run_test_with_storage(node_id, |(mut storage, device)| {
let term = Term::new(0);
let log_entries = vec![LogEntry::Noop { term }; 3];
let log_suffix = LogSuffix {
head: LogPosition {
prev_term: term,
index: LogIndex::new(0),
},
entries: log_entries.clone(),
};
let non_deleted_lump_id = next_node_id.to_log_entry_lump_id(LogIndex::new(0));
let lump_data = track!(LumpData::new(vec![]))?;
wait_for(storage.save_log_suffix(&log_suffix))?;
for i in 0..log_entries.len() {
let lump_id = node_id.to_log_entry_lump_id(LogIndex::new(i as u64));
let result = wait_for(device.handle().request().head(lump_id))?;
assert!(result.is_some());
}
let _ = wait_for(
device
.handle()
.request()
.put(non_deleted_lump_id, lump_data),
);
wait_for(storage.delete_log())?;
for i in 0..(log_entries.len() * 2) {
let lump_id = node_id.to_log_entry_lump_id(LogIndex::new(i as u64));
let result = wait_for(device.handle().request().head(lump_id))?;
assert!(result.is_none());
}
assert!(wait_for(device.handle().request().get(non_deleted_lump_id))
.unwrap()
.is_some());
Ok(())
})
}
}