openraft 0.10.0-alpha.18

Advanced Raft consensus
Documentation
use std::error::Error;
use std::fmt;

use display_more::DisplayOptionExt;
use validit::Validate;
use validit::less_equal;

/// Three-stage progress tracker for I/O operations: accepted, then submitted, then flushed.
///
/// `T` identifies an I/O operation and is expected to be totally ordered
/// (for example [`LogIOId`]).
///
/// The cursors always satisfy `flushed <= submitted <= accepted`.
///
/// See [Log I/O Progress](crate::docs::data::log_io_progress) for a detailed walkthrough of the
/// three stages, with examples.
///
/// [`LogIOId`]: crate::raft_state::io_state::log_io_id::LogIOId
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct IOProgress<T>
where T: PartialOrd + fmt::Debug
{
    /// Highest I/O operation that RaftCore has accepted for execution.
    ///
    /// Every accepted I/O is eventually submitted and then flushed.
    accepted: Option<T>,

    /// Highest I/O operation handed to the storage layer.
    ///
    /// RaftCore sets this before it submits an I/O. If the same task observes value `v` here,
    /// I/O `v` is guaranteed to have been submitted. Other tasks get no such guarantee,
    /// because operations may be reordered between tasks.
    submitted: Option<T>,

    /// Highest I/O operation confirmed to be durably persisted.
    flushed: Option<T>,

    /// Identifier included in the `Display` output.
    id: String,

    /// Human-readable label included in the `Display` output.
    name: &'static str,
}

/// Checks the cursor-ordering invariant `flushed <= submitted <= accepted`.
impl<T> Validate for IOProgress<T>
where T: PartialOrd + fmt::Debug
{
    /// Validates that no later stage has overtaken an earlier one.
    ///
    /// `Option` ordering places `None` below every `Some`, so an unset cursor is always
    /// `<=` a later-stage cursor that is set.
    ///
    /// NOTE(review): relies on `validit::less_equal!` returning early with an `Err` when its
    /// first argument is not `<=` its second — confirm against the `validit` docs.
    fn validate(&self) -> Result<(), Box<dyn Error>> {
        // A flushed I/O must not be ahead of what has been submitted.
        less_equal!(&self.flushed, &self.submitted);
        // A submitted I/O must not be ahead of what has been accepted.
        less_equal!(&self.submitted, &self.accepted);
        Ok(())
    }
}

impl<T> fmt::Display for IOProgress<T>
where
    T: PartialOrd + fmt::Debug,
    T: fmt::Display,
{
    /// Renders the tracker as
    /// `id=<id> <name>:(flushed/submitted:(<flushed>, <submitted>], accepted: <accepted>)`.
    ///
    /// `id` is left-aligned in a 2-column field and `name` is right-aligned in a 7-column field.
    /// The half-open `(flushed, submitted]` interval shows the I/O that has been submitted but
    /// is not yet reported as flushed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            accepted,
            submitted,
            flushed,
            id,
            name,
        } = self;

        let (flushed, submitted, accepted) = (flushed.display(), submitted.display(), accepted.display());

        write!(f, "id={id:<2} {name:>7}:(flushed/submitted:({flushed}, {submitted}], accepted: {accepted})")
    }
}

impl<T> IOProgress<T>
where
    T: PartialOrd + fmt::Debug,
    T: fmt::Display,
{
    /// Create a new `IOProgress` with all three cursors set to the same value.
    ///
    /// Used for initialization or snapshot installation to align all IO tracking.
    ///
    /// - `v`: initial value for `accepted`, `submitted` and `flushed`.
    /// - `id`: identifier included in the `Display` output.
    /// - `name`: label included in the `Display` output.
    pub(crate) fn new_synchronized(v: Option<T>, id: impl ToString, name: &'static str) -> Self
    where T: Clone {
        Self {
            accepted: v.clone(),
            submitted: v.clone(),
            // Last use of `v`: move it instead of cloning a third time.
            flushed: v,
            id: id.to_string(),
            name,
        }
    }

    /// Replace the identifier included in the `Display` output.
    pub(crate) fn set_id(&mut self, id: impl ToString) {
        self.id = id.to_string();
    }

    /// Advance the `accepted` cursor to `new_accepted`.
    ///
    /// The cursor is monotonically non-decreasing. Re-applying the current value is allowed;
    /// moving it backwards is a caller bug. Use [`Self::try_accept`] when stale values may arrive.
    ///
    /// # Panics
    ///
    /// With `debug_assertions` enabled, panics unless the current `accepted` is `None` or
    /// `<= new_accepted`.
    pub(crate) fn accept(&mut self, new_accepted: T) {
        tracing::debug!("RAFT_io    {}; new_accepted: {}", self, new_accepted);

        debug_assert!(
            self.accepted.as_ref().is_none_or(|accepted| accepted <= &new_accepted),
            "expect accepted:{} < new_accepted:{}",
            self.accepted.display(),
            new_accepted,
        );

        self.accepted = Some(new_accepted);

        tracing::debug!("RAFT_io    {}", self);
    }

    /// Advance the `submitted` cursor to `new_submitted`.
    ///
    /// The cursor is monotonically non-decreasing. Re-applying the current value is allowed;
    /// moving it backwards is a caller bug. Use [`Self::try_submit`] when stale values may arrive.
    ///
    /// # Panics
    ///
    /// With `debug_assertions` enabled, panics unless the current `submitted` is `None` or
    /// `<= new_submitted`.
    pub(crate) fn submit(&mut self, new_submitted: T) {
        tracing::debug!("RAFT_io    {}; new_submitted: {}", self, new_submitted);

        debug_assert!(
            self.submitted.as_ref().is_none_or(|submitted| submitted <= &new_submitted),
            "expect submitted:{} < new_submitted:{}",
            self.submitted.display(),
            new_submitted,
        );

        self.submitted = Some(new_submitted);

        tracing::debug!("RAFT_io    {}", self);
    }

    /// Advance the `flushed` cursor to `new_flushed`.
    ///
    /// Like [`Self::accept`] and [`Self::submit`], this always overwrites the cursor. It does NOT
    /// tolerate out-of-order completion notifications; use [`Self::try_flush`] for that, which
    /// ignores values that are not greater than the current one.
    ///
    /// # Panics
    ///
    /// With `debug_assertions` enabled, panics unless the current `flushed` is `None` or
    /// `<= new_flushed`.
    pub(crate) fn flush(&mut self, new_flushed: T) {
        tracing::debug!("RAFT_io    {}; new_flushed: {}", self, new_flushed);

        debug_assert!(
            self.flushed.as_ref().is_none_or(|flushed| flushed <= &new_flushed),
            "expect flushed:{} < new_flushed:{}",
            self.flushed.display(),
            new_flushed,
        );

        self.flushed = Some(new_flushed);

        tracing::debug!("RAFT_io    {}", self);
    }

    /// Conditionally update all three cursors (accepted, submitted, flushed) to the same value.
    ///
    /// Each cursor is only updated if it is behind `value`, so progress stays monotonic. This is
    /// primarily used when snapshot building completes asynchronously. Normal operations may
    /// already have advanced the progress while the snapshot was being built, so only cursors
    /// that are actually behind are moved.
    ///
    /// Cursors are advanced in pipeline order (accepted, submitted, flushed), so an invariant
    /// that held before the call still holds after it.
    pub(crate) fn try_update_all(&mut self, value: T)
    where T: Clone {
        if self.accepted.as_ref() < Some(&value) {
            self.accept(value.clone());
        }
        if self.submitted.as_ref() < Some(&value) {
            self.submit(value.clone());
        }
        if self.flushed.as_ref() < Some(&value) {
            self.flush(value);
        }

        tracing::debug!("RAFT_io    {}", self);
    }

    /// Advance the `accepted` cursor only if `new_accepted` is strictly greater; otherwise do nothing.
    // NOTE(review): presumably `dead_code` is allowed because there is no non-test caller yet.
    #[allow(dead_code)]
    pub(crate) fn try_accept(&mut self, new_accepted: T) {
        // `accept` already logs the state before and after the update.
        if self.accepted.as_ref() < Some(&new_accepted) {
            self.accept(new_accepted);
        }
    }

    /// Advance the `submitted` cursor only if `new_submitted` is strictly greater; otherwise do nothing.
    // NOTE(review): presumably `dead_code` is allowed because there is no non-test caller yet.
    #[allow(dead_code)]
    pub(crate) fn try_submit(&mut self, new_submitted: T) {
        // `submit` already logs the state before and after the update.
        if self.submitted.as_ref() < Some(&new_submitted) {
            self.submit(new_submitted);
        }
    }

    /// Advance the `flushed` cursor only if `new_flushed` is strictly greater; otherwise do nothing.
    ///
    /// This is the variant that tolerates out-of-order flush notifications.
    // NOTE(review): presumably `dead_code` is allowed because there is no non-test caller yet.
    #[allow(dead_code)]
    pub(crate) fn try_flush(&mut self, new_flushed: T) {
        // `flush` already logs the state before and after the update.
        if self.flushed.as_ref() < Some(&new_flushed) {
            self.flush(new_flushed);
        }
    }

    /// Highest accepted I/O, or `None` if nothing has been accepted.
    pub(crate) fn accepted(&self) -> Option<&T> {
        self.accepted.as_ref()
    }

    /// Highest submitted I/O, or `None` if nothing has been submitted.
    // Not used until Command reorder is implemented.
    #[allow(dead_code)]
    pub(crate) fn submitted(&self) -> Option<&T> {
        self.submitted.as_ref()
    }

    /// Highest flushed I/O, or `None` if nothing has been flushed.
    pub(crate) fn flushed(&self) -> Option<&T> {
        self.flushed.as_ref()
    }
}

#[cfg(test)]
mod tests {
    use validit::Validate;

    use super::*;

    #[test]
    fn test_monotonic_updates() {
        let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");

        // Monotonic updates should work
        progress.accept(11);
        assert_eq!(Some(&11), progress.accepted());

        progress.submit(11);
        assert_eq!(Some(&11), progress.submitted());

        progress.try_flush(11);
        assert_eq!(Some(&11), progress.flushed());
    }

    #[test]
    fn test_equal_values_are_accepted() {
        // Cursors are non-decreasing, not strictly increasing: re-applying the current value is fine.
        let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");

        progress.accept(10);
        progress.submit(10);
        progress.flush(10);

        assert_eq!(Some(&10), progress.accepted());
        assert_eq!(Some(&10), progress.submitted());
        assert_eq!(Some(&10), progress.flushed());
    }

    #[test]
    #[should_panic(expected = "expect accepted:11 < new_accepted:10")]
    #[cfg(debug_assertions)]
    fn test_accept_rejects_out_of_order() {
        let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");
        progress.accept(11);
        progress.accept(10); // Should panic - out of order
    }

    #[test]
    #[should_panic(expected = "expect submitted:11 < new_submitted:10")]
    #[cfg(debug_assertions)]
    fn test_submit_rejects_out_of_order() {
        let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");
        progress.submit(11);
        progress.submit(10); // Should panic - out of order
    }

    #[test]
    #[should_panic(expected = "expect flushed:11 < new_flushed:10")]
    #[cfg(debug_assertions)]
    fn test_flush_rejects_out_of_order() {
        // Plain `flush` is not tolerant of out-of-order values; only `try_flush` is.
        let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");
        progress.flush(11);
        progress.flush(10); // Should panic - out of order
    }

    #[test]
    fn test_flush_tolerates_out_of_order() {
        let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");

        // Flush uses conditional update logic
        progress.try_flush(11);
        assert_eq!(Some(&11), progress.flushed());

        // Out-of-order flush is ignored
        progress.try_flush(7);
        assert_eq!(Some(&11), progress.flushed()); // Remains 11, doesn't revert to 7
    }

    #[test]
    fn test_try_update_all() {
        let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");
        progress.try_update_all(15);
        assert_eq!(Some(&15), progress.accepted());
        assert_eq!(Some(&15), progress.submitted());
        assert_eq!(Some(&15), progress.flushed());

        // Try update with smaller value - should not update
        progress.try_update_all(12);
        assert_eq!(Some(&15), progress.accepted());
        assert_eq!(Some(&15), progress.submitted());
        assert_eq!(Some(&15), progress.flushed());
    }

    #[test]
    fn test_try_update_all_from_none() {
        let mut progress: IOProgress<u64> = IOProgress::new_synchronized(None, 1, "test");
        assert_eq!(None, progress.accepted());
        assert_eq!(None, progress.submitted());
        assert_eq!(None, progress.flushed());

        progress.try_update_all(3);
        assert_eq!(Some(&3), progress.accepted());
        assert_eq!(Some(&3), progress.submitted());
        assert_eq!(Some(&3), progress.flushed());
    }

    #[test]
    fn test_try_update_all_only_moves_lagging_cursors() {
        let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");
        progress.accept(20);

        progress.try_update_all(15);
        assert_eq!(Some(&20), progress.accepted()); // already ahead, untouched
        assert_eq!(Some(&15), progress.submitted());
        assert_eq!(Some(&15), progress.flushed());
    }

    #[test]
    fn test_validate() {
        let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");
        progress.accept(12);
        progress.submit(11);
        assert!(progress.validate().is_ok());

        // `flushed` overtaking `submitted` breaks the invariant.
        progress.flush(12);
        assert!(progress.validate().is_err());
    }

    #[test]
    fn test_try_accept() {
        let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");

        // Update with greater value - should update
        progress.try_accept(15);
        assert_eq!(Some(&15), progress.accepted());

        // Update with smaller value - should not update
        progress.try_accept(12);
        assert_eq!(Some(&15), progress.accepted());

        // Update with equal value - should not update
        progress.try_accept(15);
        assert_eq!(Some(&15), progress.accepted());

        // Update with greater value again - should update
        progress.try_accept(20);
        assert_eq!(Some(&20), progress.accepted());
    }

    #[test]
    fn test_try_submit() {
        let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");

        // Update with greater value - should update
        progress.try_submit(15);
        assert_eq!(Some(&15), progress.submitted());

        // Update with smaller value - should not update
        progress.try_submit(12);
        assert_eq!(Some(&15), progress.submitted());

        // Update with equal value - should not update
        progress.try_submit(15);
        assert_eq!(Some(&15), progress.submitted());

        // Update with greater value again - should update
        progress.try_submit(20);
        assert_eq!(Some(&20), progress.submitted());
    }

    #[test]
    fn test_try_flush() {
        let mut progress = IOProgress::new_synchronized(Some(10), 1, "test");

        // Update with greater value - should update
        progress.try_flush(15);
        assert_eq!(Some(&15), progress.flushed());

        // Update with smaller value - should not update
        progress.try_flush(12);
        assert_eq!(Some(&15), progress.flushed());

        // Update with equal value - should not update
        progress.try_flush(15);
        assert_eq!(Some(&15), progress.flushed());

        // Update with greater value again - should update
        progress.try_flush(20);
        assert_eq!(Some(&20), progress.flushed());
    }
}