// frugalos_raft 1.2.0
//
// An implementation of the `raftlog::Io` trait for `frugalos`
// Documentation
use cannyls::deadline::Deadline;
use cannyls::lump::LumpData;
use fibers::sync::mpsc;
use futures::{Async, Future, Poll};
use raftlog::log::{LogIndex, LogPosition, LogPrefix};
use raftlog::{Error, Result};
use std::cmp;
use std::env;
use std::ops::Range;
use std::time::Instant;

use super::super::{into_box_future, BoxFuture, Event, Handle, Storage, StorageMetrics};
use super::delete::{DeleteOldLogEntries, DeleteOldLogPrefixBytes};
use super::load::LoadLogPrefixIndex;
use crate::protobuf;
use crate::util::Phase5;

fn max_lump_data_size() -> usize {
    env::var("RAFT_IO_MAX_LUMP_DATA_SIZE")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(LumpData::MAX_SIZE)
}

/// Future that saves a new log prefix and then cleans up the data it supersedes.
///
/// It runs five phases in order: load the currently stored prefix index, save the new
/// prefix bytes, save the new prefix index, delete the old prefix bytes, and delete the
/// old log entries. On completion it sends `Event::LogPrefixUpdated` and records the
/// elapsed time in the storage metrics.
// #[derive(Debug)]
pub struct SaveLogPrefix {
    /// Handle cloned from the owning `Storage`; cloned again for each phase and used for
    /// logging.
    handle: Handle,
    /// The phase that is currently running.
    phase: Phase5<
        LoadLogPrefixIndex,
        SaveLogPrefixBytes,
        SaveLogPrefixIndex,
        DeleteOldLogPrefixBytes,
        DeleteOldLogEntries,
    >,
    /// The prefix to save; taken (leaving `None`) when the byte-saving phase is created.
    prefix: Option<LogPrefix>,
    /// Prefix index range obtained from the loading phase (`0..0` when none was loaded);
    /// later handed to `DeleteOldLogPrefixBytes`.
    old_prefix_index: Range<u64>,
    /// Log entry range handed to `DeleteOldLogEntries`: from the log suffix head index at
    /// construction time to the tail index of the new prefix.
    old_entries: Range<LogIndex>,
    /// Tail position of the new prefix, reported in `Event::LogPrefixUpdated`.
    new_head: LogPosition,
    /// Channel on which the completion event is sent.
    event_tx: mpsc::Sender<Event>,
    /// Time at which this future was constructed; used to measure the total duration.
    started_at: Instant,
    /// Metrics whose `save_log_prefix_duration_seconds` is observed on completion.
    metrics: StorageMetrics,
}
impl SaveLogPrefix {
    #[allow(clippy::reversed_empty_ranges)]
    pub(crate) fn new(storage: &mut Storage, prefix: LogPrefix) -> Self {
        let handle = storage.handle.clone();
        let old_entries = Range {
            start: storage.log_suffix.head.index,
            end: prefix.tail.index,
        };
        info!(
            handle.logger,
            "[START] SaveLogPrefix: {}",
            dump!(prefix.tail, prefix.config, prefix.snapshot.len())
        );
        let phase = Phase5::A(LoadLogPrefixIndex::new(handle.clone()));
        SaveLogPrefix {
            handle,
            phase,
            new_head: prefix.tail,
            prefix: Some(prefix),
            old_prefix_index: Range { start: 0, end: 0 },
            old_entries,
            event_tx: storage.event_tx.clone(),
            started_at: Instant::now(),
            metrics: storage.metrics.clone(),
        }
    }
}
impl Future for SaveLogPrefix {
    type Item = ();
    type Error = Error;

    /// Advances through the phases for as long as the current one is ready.
    ///
    /// Each finished phase supplies the input for the next one. When the last phase
    /// finishes, `Event::LogPrefixUpdated` is sent (a send failure is ignored), the
    /// elapsed time is observed in the metrics, and the future resolves to `()`.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        #[allow(clippy::reversed_empty_ranges)]
        while let Async::Ready(finished) = self.phase.poll()? {
            self.phase = match finished {
                // Existing index loaded: start writing the new prefix bytes.
                Phase5::A(loaded) => {
                    let old_index = loaded.unwrap_or(Range { start: 0, end: 0 });
                    self.old_prefix_index = old_index.clone();
                    let prefix = self.prefix.take().expect("Never fails");
                    let handle = self.handle.clone();
                    Phase5::B(track!(SaveLogPrefixBytes::new(
                        handle, old_index, prefix
                    ))?)
                }
                // Bytes written: store the index that points at them.
                Phase5::B(new_index) => {
                    let handle = self.handle.clone();
                    Phase5::C(track!(SaveLogPrefixIndex::new(handle, new_index))?)
                }
                // New index stored: the previous prefix bytes can be deleted.
                Phase5::C(()) => {
                    let handle = self.handle.clone();
                    let stale = self.old_prefix_index.clone();
                    Phase5::D(DeleteOldLogPrefixBytes::new(handle, stale))
                }
                // Old prefix bytes deleted: delete the old log entries.
                Phase5::D(()) => {
                    let handle = self.handle.clone();
                    let stale = self.old_entries.clone();
                    Phase5::E(track!(DeleteOldLogEntries::new(handle, stale))?)
                }
                // Everything finished: notify, record the duration, and resolve.
                Phase5::E(()) => {
                    info!(self.handle.logger, "[FINISH] SaveLogPrefix");
                    let new_head = self.new_head;
                    let _ = self.event_tx.send(Event::LogPrefixUpdated { new_head });
                    let seconds =
                        prometrics::timestamp::duration_to_seconds(self.started_at.elapsed());
                    self.metrics
                        .save_log_prefix_duration_seconds
                        .observe(seconds);
                    return Ok(Async::Ready(()));
                }
            };
        }
        Ok(Async::NotReady)
    }
}

/// Future that writes the encoded log prefix to the device as a sequence of lumps.
///
/// Resolves to the range of lump indices that were written.
// #[derive(Debug)]
struct SaveLogPrefixBytes {
    /// Handle used for logging, deriving lump ids, and issuing device requests.
    handle: Handle,
    /// Index of the next lump to write; starts at `prefix_index.start`.
    current_index: u64,
    /// Range of lump indices this future writes to.
    prefix_index: Range<u64>,
    /// The encoded prefix, written out in chunks of at most `max_lump_data_size()` bytes.
    prefix_bytes: Vec<u8>,
    /// The in-flight write, or `None` before the first write has been issued.
    future: Option<BoxFuture<bool>>,
}
impl SaveLogPrefixBytes {
    /// Encodes `prefix` and prepares to store it in lumps placed right after `old_index`.
    ///
    /// The encoded bytes are split into chunks of at most `max_lump_data_size()` bytes,
    /// one lump per chunk, so the new index range is
    /// `old_index.end..old_index.end + lump_count`. No write is issued here; writes start
    /// when the returned future is polled.
    ///
    /// # Errors
    ///
    /// Returns the error from `protobuf::encode_log_prefix` if encoding fails.
    ///
    /// # Panics
    ///
    /// Panics if `max_lump_data_size()` returns `0` (division by zero).
    pub fn new(handle: Handle, old_index: Range<u64>, prefix: LogPrefix) -> Result<Self> {
        let prefix_bytes = track!(protobuf::encode_log_prefix(prefix))?;

        // Read the limit once so that both halves of the division agree on it.
        let max_size = max_lump_data_size();
        // Ceiling division written without `len + max_size - 1`: that sum can overflow
        // `usize` for a very large limit, and a wrapped result yields a lump count of 0.
        let full_lumps = prefix_bytes.len() / max_size;
        let lump_count = if prefix_bytes.len() % max_size == 0 {
            full_lumps
        } else {
            full_lumps + 1
        };

        let prefix_index = Range {
            start: old_index.end,
            end: old_index.end + lump_count as u64,
        };
        info!(
            handle.logger,
            "[START] SaveLogPrefixBytes: {}",
            dump!(prefix_index, prefix_bytes.len())
        );
        Ok(SaveLogPrefixBytes {
            handle,
            current_index: prefix_index.start,
            prefix_index,
            prefix_bytes,
            future: None,
        })
    }
}
impl Future for SaveLogPrefixBytes {
    type Item = Range<u64>;
    type Error = Error;

    /// Writes the prefix one lump at a time and resolves to the written index range.
    ///
    /// Each time the previous write is ready, the next chunk of at most
    /// `max_lump_data_size()` bytes is put into the lump for `current_index`. Once
    /// `current_index` reaches `prefix_index.end`, the future resolves to `prefix_index`.
    ///
    /// # Errors
    ///
    /// Returns the error of a failed write.
    ///
    /// # Panics
    ///
    /// Panics if the device refuses to allocate lump data for a chunk.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // `self.future` is `None` at first; the loop relies on that polling as ready so
        // that the first pass issues the first write.
        while let Async::Ready(_) = track!(self.future.poll())? {
            if self.current_index == self.prefix_index.end {
                info!(self.handle.logger, "[FINISH] SaveLogPrefixBytes");
                return Ok(Async::Ready(self.prefix_index.clone()));
            }

            let index = self.current_index;

            // Borrow the chunk in place rather than draining it from the front of the
            // buffer: draining allocates a new `Vec` and shifts every remaining byte for
            // each lump. The chunk for the k-th lump starts at `k * max_size`.
            let max_size = max_lump_data_size();
            let lumps_written = (index - self.prefix_index.start) as usize;
            let offset = lumps_written.saturating_mul(max_size);
            // An out-of-range offset yields an empty chunk instead of a panic.
            let rest = self.prefix_bytes.get(offset..).unwrap_or(&[]);
            let bytes = &rest[..cmp::min(rest.len(), max_size)];

            let lump_id = self.handle.node_id.to_log_prefix_lump_id(index);
            info!(
                self.handle.logger,
                "[PROGRESS] SaveLogPrefixBytes: {}",
                dump!(index, lump_id, bytes.len())
            );

            // NOTE(review): "Never fails" presumably relies on the chunk never exceeding
            // what the device accepts — confirm.
            let data = self
                .handle
                .device
                .allocate_lump_data_with_bytes(bytes)
                .expect("Never fails");
            let future = self
                .handle
                .device
                .request()
                .deadline(Deadline::Infinity)
                .prioritized()
                .put(lump_id, data);
            self.current_index += 1;
            self.future = Some(into_box_future(future));
        }
        Ok(Async::NotReady)
    }
}

/// Future that writes the encoded log prefix index to its lump.
// #[derive(Debug)]
struct SaveLogPrefixIndex {
    /// Handle kept for logging when the write completes.
    handle: Handle,
    /// The put request for the index lump, issued when this value is constructed.
    future: BoxFuture<bool>,
}
impl SaveLogPrefixIndex {
    /// Encodes `index` and issues the request that stores it in the prefix-index lump.
    ///
    /// # Errors
    ///
    /// Returns the error from `protobuf::encode_log_prefix_index` if encoding fails.
    ///
    /// # Panics
    ///
    /// Panics if `LumpData::new_embedded` rejects the encoded bytes.
    pub fn new(handle: Handle, index: Range<u64>) -> Result<Self> {
        let lump_id = handle.node_id.to_log_prefix_index_lump_id();
        let bytes = track!(protobuf::encode_log_prefix_index(index.clone()))?;
        info!(
            handle.logger,
            "[START] SaveLogPrefixIndex: {}",
            dump!(index, bytes.len(), lump_id)
        );

        let data = LumpData::new_embedded(bytes).expect("Never fails");
        // Build the request while `handle` is still borrowed, then move it into the struct.
        let put = handle
            .device
            .request()
            .deadline(Deadline::Infinity)
            .prioritized()
            .put(lump_id, data);
        let future = into_box_future(put);

        Ok(SaveLogPrefixIndex { handle, future })
    }
}
impl Future for SaveLogPrefixIndex {
    type Item = ();
    type Error = Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let polled = track!(self.future.poll())?;
        if polled.is_ready() {
            info!(self.handle.logger, "[FINISH] SaveLogPrefixIndex");
        }
        Ok(polled.map(|_| ()))
    }
}