use std::error::Error;
use std::fmt;
use display_more::DisplayOptionExt;
use validit::Validate;
use validit::less_equal;
#[derive(Debug, Clone)]
#[derive(PartialEq, Eq)]
pub(crate) struct IOProgress<T>
where T: PartialOrd + fmt::Debug
{
accepted: Option<T>,
submitted: Option<T>,
flushed: Option<T>,
id: String,
name: &'static str,
}
impl<T> Validate for IOProgress<T>
where T: PartialOrd + fmt::Debug
{
fn validate(&self) -> Result<(), Box<dyn Error>> {
less_equal!(&self.flushed, &self.submitted);
less_equal!(&self.submitted, &self.accepted);
Ok(())
}
}
impl<T> fmt::Display for IOProgress<T>
where
T: PartialOrd + fmt::Debug,
T: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"id={:<2} {:>7}:(flushed/submitted:({}, {}], accepted: {})",
self.id,
self.name,
self.flushed.display(),
self.submitted.display(),
self.accepted.display(),
)
}
}
impl<T> IOProgress<T>
where
T: PartialOrd + fmt::Debug,
T: fmt::Display,
{
pub(crate) fn new_synchronized(v: Option<T>, id: impl ToString, name: &'static str) -> Self
where T: Clone {
Self {
accepted: v.clone(),
submitted: v.clone(),
flushed: v.clone(),
id: id.to_string(),
name,
}
}
pub(crate) fn set_id(&mut self, id: impl ToString) {
self.id = id.to_string();
}
pub(crate) fn accept(&mut self, new_accepted: T) {
tracing::debug!("RAFT_io {}; new_accepted: {}", self, new_accepted);
if cfg!(debug_assertions) {
assert!(
self.accepted.as_ref().is_none_or(|accepted| accepted <= &new_accepted),
"expect accepted:{} < new_accepted:{}",
self.accepted.display(),
new_accepted,
);
}
self.accepted = Some(new_accepted);
tracing::debug!("RAFT_io {}", self);
}
pub(crate) fn submit(&mut self, new_submitted: T) {
tracing::debug!("RAFT_io {}; new_submitted: {}", self, new_submitted);
if cfg!(debug_assertions) {
assert!(
self.submitted.as_ref().is_none_or(|submitted| submitted <= &new_submitted),
"expect submitted:{} < new_submitted:{}",
self.submitted.display(),
new_submitted,
);
}
self.submitted = Some(new_submitted);
tracing::debug!("RAFT_io {}", self);
}
pub(crate) fn flush(&mut self, new_flushed: T) {
tracing::debug!("RAFT_io {}; new_flushed: {}", self, new_flushed);
if cfg!(debug_assertions) {
assert!(
self.flushed.as_ref().is_none_or(|flushed| flushed <= &new_flushed),
"expect flushed:{} < new_flushed:{}",
self.flushed.display(),
new_flushed,
);
}
self.flushed = Some(new_flushed);
tracing::debug!("RAFT_io {}", self);
}
pub(crate) fn try_update_all(&mut self, value: T)
where T: Clone {
if self.accepted.as_ref() < Some(&value) {
self.accept(value.clone());
}
if self.submitted.as_ref() < Some(&value) {
self.submit(value.clone());
}
if self.flushed.as_ref() < Some(&value) {
self.flush(value.clone());
}
tracing::debug!("RAFT_io {}", self);
}
#[allow(dead_code)]
pub(crate) fn try_accept(&mut self, new_accepted: T) {
if self.accepted.as_ref() < Some(&new_accepted) {
self.accept(new_accepted);
tracing::debug!("RAFT_io {}", self);
}
}
#[allow(dead_code)]
pub(crate) fn try_submit(&mut self, new_submitted: T) {
if self.submitted.as_ref() < Some(&new_submitted) {
self.submit(new_submitted);
tracing::debug!("RAFT_io {}", self);
}
}
#[allow(dead_code)]
pub(crate) fn try_flush(&mut self, new_flushed: T) {
if self.flushed.as_ref() < Some(&new_flushed) {
self.flush(new_flushed);
tracing::debug!("RAFT_io {}", self);
}
}
pub(crate) fn accepted(&self) -> Option<&T> {
self.accepted.as_ref()
}
#[allow(dead_code)]
pub(crate) fn submitted(&self) -> Option<&T> {
self.submitted.as_ref()
}
pub(crate) fn flushed(&self) -> Option<&T> {
self.flushed.as_ref()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_monotonic_updates() {
let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");
progress.accept(11);
assert_eq!(Some(&11), progress.accepted());
progress.submit(11);
assert_eq!(Some(&11), progress.submitted());
progress.try_flush(11);
assert_eq!(Some(&11), progress.flushed());
}
#[test]
#[should_panic(expected = "expect accepted:11 < new_accepted:10")]
#[cfg(debug_assertions)]
fn test_accept_enforces_strict_monotonicity() {
let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");
progress.accept(11);
progress.accept(10); }
#[test]
#[should_panic(expected = "expect submitted:11 < new_submitted:10")]
#[cfg(debug_assertions)]
fn test_submit_enforces_strict_monotonicity() {
let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");
progress.submit(11);
progress.submit(10); }
#[test]
fn test_flush_tolerates_out_of_order() {
let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");
progress.try_flush(11);
assert_eq!(Some(&11), progress.flushed());
progress.try_flush(7);
assert_eq!(Some(&11), progress.flushed()); }
#[test]
fn test_try_update_all() {
let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");
progress.try_update_all(15);
assert_eq!(Some(&15), progress.accepted());
assert_eq!(Some(&15), progress.submitted());
assert_eq!(Some(&15), progress.flushed());
progress.try_update_all(12);
assert_eq!(Some(&15), progress.accepted());
assert_eq!(Some(&15), progress.submitted());
assert_eq!(Some(&15), progress.flushed());
}
#[test]
fn test_try_accept() {
let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");
progress.try_accept(15);
assert_eq!(Some(&15), progress.accepted());
progress.try_accept(12);
assert_eq!(Some(&15), progress.accepted());
progress.try_accept(15);
assert_eq!(Some(&15), progress.accepted());
progress.try_accept(20);
assert_eq!(Some(&20), progress.accepted());
}
#[test]
fn test_try_submit() {
let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");
progress.try_submit(15);
assert_eq!(Some(&15), progress.submitted());
progress.try_submit(12);
assert_eq!(Some(&15), progress.submitted());
progress.try_submit(15);
assert_eq!(Some(&15), progress.submitted());
progress.try_submit(20);
assert_eq!(Some(&20), progress.submitted());
}
#[test]
fn test_try_flush() {
let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");
progress.try_flush(15);
assert_eq!(Some(&15), progress.flushed());
progress.try_flush(12);
assert_eq!(Some(&15), progress.flushed());
progress.try_flush(15);
assert_eq!(Some(&15), progress.flushed());
progress.try_flush(20);
assert_eq!(Some(&20), progress.flushed());
}
}