rta-for-fps-lib 0.2.0

An Implementation of the paper 'Response Time Analysis for Fixed Priority Servers' by Hamann et al
Documentation
//! Module that defines the finite Curve type
//!
//! and all associated functions

use alloc::vec::Vec;
use core::fmt::Debug;

use curve_types::CurveType;

use crate::iterators::curve::{CurveDeltaIterator, Delta};
use crate::iterators::CurveIterator;
use crate::server::{ServerKind, ServerProperties};
use crate::time::{TimeUnit, UnitNumber};
use crate::window::window_types::WindowType;
use crate::window::WindowEnd;
use crate::window::{Demand, Overlap, Window};

pub mod curve_types;

/// A Curve is an ordered Set of non-overlapping Windows
///
/// The windows are ordered by their start
#[derive(Debug, Eq)]
pub struct Curve<C: CurveType> {
    /// windows contains an ordered Set of non-overlapping non-empty windows
    windows: Vec<Window<C::WindowKind>>,
}

impl<C: CurveType> PartialEq for Curve<C> {
    fn eq(&self, other: &Self) -> bool {
        self.windows.eq(&other.windows)
    }
}

impl<C: CurveType> Clone for Curve<C> {
    fn clone(&self) -> Self {
        Curve {
            windows: self.windows.clone(),
        }
    }
}

impl<T: CurveType> Curve<T> {
    /// Create a new Curve from the provided window
    ///
    /// May return a Curve with no Windows when the provided Window is empty
    #[must_use]
    pub fn new(window: Window<T::WindowKind>) -> Self {
        let windows = if window.is_empty() {
            // Empty windows can be ignored
            alloc::vec![]
        } else {
            // A Curve with only a single has
            // the windows always ordered and non-overlapping
            alloc::vec![window]
        };

        Self { windows }
    }

    /// Returns a slice reference to the contained windows
    #[must_use]
    pub fn as_windows(&self) -> &[Window<T::WindowKind>] {
        self.windows.as_slice()
    }

    /// Consumes self and returns the contained Windows
    #[must_use]
    pub fn into_windows(self) -> Vec<Window<T::WindowKind>> {
        self.windows
    }

    /// Create a new empty Curve
    #[must_use]
    pub fn empty() -> Self {
        Self {
            windows: alloc::vec![],
        }
    }

    /// Create a new Curve from the given Vector of Windows
    /// without checking or guaranteeing that the Curve invariants are met
    /// by the list of windows.
    ///
    /// # Safety
    /// Windows need to be non-overlapping and
    /// ordered based on start, to fulfill invariants of curve
    #[must_use]
    pub unsafe fn from_windows_unchecked(windows: Vec<Window<T::WindowKind>>) -> Self {
        Self { windows }
    }

    /// Return the Curves Capacity as defined by Definition 3. in the paper
    #[must_use]
    pub fn capacity(&self) -> WindowEnd {
        self.windows.iter().map(Window::length).sum()
    }

    /// Return true if the Capacity of the Curve is 0
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.windows
            .iter()
            .map(Window::length)
            .all(|c| c == TimeUnit::ZERO)
    }

    /// Change the `CurveType` of the Curve,
    /// requires that the `WindowType` of both [`CurveTypes`](trait@CurveType) is the same
    #[must_use]
    pub fn reclassify<C: CurveType<WindowKind = T::WindowKind>>(self) -> Curve<C> {
        Curve {
            windows: self.windows,
        }
    }

    /// compare the curve to a curve iterator
    /// consuming the iterator in the process
    pub fn eq_curve_iterator<CI: CurveIterator<CurveKind = T>>(&self, mut other: CI) -> bool {
        let mut windows = self.as_windows().iter();
        loop {
            match (windows.next(), other.next_window()) {
                (None, None) => break true,
                (Some(_), None) | (None, Some(_)) => break false,
                (Some(left), Some(right)) => {
                    if left != &right {
                        break false;
                    }
                }
            }
        }
    }
}

impl<T: CurveType<WindowKind = Demand>> Curve<T> {
    /// Partition the Curve as Defined by Algorithms 2. and 3. of the paper
    ///
    /// The implementation here deviates from the paper by returning an exclusive index while the paper uses an inclusive index
    #[must_use]
    pub fn partition(
        &self,
        group_index: UnitNumber,
        server_properties: ServerProperties,
    ) -> PartitionResult {
        match server_properties.server_type {
            ServerKind::Deferrable => {
                // Algorithm 2.

                // Note for Step (1):
                // The paper indexes the Windows 0-based but iterates starting at 1
                // this appears to be a mix-up between 0-based and 1-based indexing
                // which is mixed throughout the paper

                // Note index is i+1 rather than i
                // as 0 is used in case the first window is larges than the server capacity
                // meaning index is exclusive here rather than inclusive as in the paper

                // (1)
                let (index, sum) = self
                    .windows
                    .iter()
                    .enumerate()
                    .scan(TimeUnit::ZERO, |acc, (index, window)| {
                        match window.length() {
                            WindowEnd::Finite(length) => {
                                *acc += length;
                                (*acc <= server_properties.capacity).then(|| (index + 1, *acc))
                            }
                            WindowEnd::Infinite => None,
                        }
                    })
                    .last()
                    .unwrap_or((0, TimeUnit::ZERO));

                // (2)
                let remaining_capacity = server_properties.capacity - sum;

                let (head, tail) = self.windows.get(index).map_or_else(
                    || (Window::empty(), Window::empty()),
                    |window| {
                        if remaining_capacity > TimeUnit::ZERO {
                            // we have remaining capacity and a window to fill the remaining budget
                            let head_start = window.start;
                            let tail_end = window.end;
                            let split = head_start + remaining_capacity;
                            let head = Window::new(head_start, Some(split));
                            let tail = Window::new(split, tail_end);
                            (head, tail)
                        } else {
                            // no capacity left set window as tail
                            (Window::empty(), window.clone())
                        }
                    },
                );

                PartitionResult { index, head, tail }
            }
            ServerKind::Periodic => {
                // Algorithm 3.
                // (1)

                let limit = group_index * server_properties.interval + server_properties.capacity;

                // Note index is i+1 rather than i,
                // as 0 is used to indicate that the first window is already past the limit
                // index need therefore be treated as exclusive rather than inclusive as in the paper

                let index = self
                    .windows
                    .iter()
                    .enumerate()
                    .filter_map(|(index, window)| (window.end < limit).then(|| index + 1))
                    .last()
                    .unwrap_or(0);

                // (2)
                let (head, tail) = self.windows.get(index).map_or_else(
                    || (Window::empty(), Window::empty()),
                    |window| {
                        if window.start < limit && limit < window.end {
                            // window crosses the limit, split it at the limit
                            let head = Window::new(window.start, limit);
                            let tail = Window::new(limit, window.end);
                            (head, tail)
                        } else {
                            // Window won't be split as it does not contain the limit
                            // just set the window as the tail
                            (Window::empty(), window.clone())
                        }
                    },
                );

                PartitionResult { index, head, tail }
            }
        }
    }
}

/// Return Type for [`CurveDeltaIterator::collect_delta`]
#[derive(Debug, PartialEq)]
pub struct CurveDeltaResult<
    P: CurveType,
    Q: CurveType,
    R: CurveType<WindowKind = Overlap<P::WindowKind, Q::WindowKind>>,
> {
    /// The remaining supply, can be 0-2 Windows
    pub remaining_supply: Curve<P>,
    /// The (used) Overlap between Supply and Demand
    pub overlap: Curve<R>,
    /// The remaining Demand that could not be fulfilled by the Supply
    pub remaining_demand: Curve<Q>,
}

impl<DW: WindowType, SW: WindowType, DI, SI> CurveDeltaIterator<DW, SW, DI, SI>
where
    DI: CurveIterator,
    DI::CurveKind: CurveType<WindowKind = DW>,
    SI: CurveIterator,
    SI::CurveKind: CurveType<WindowKind = SW>,
{
    /// collect the complete `CurveDeltaIterator`
    ///
    /// # Warning
    ///
    /// Won't terminate if `CurveDelaIterator` is infinite as it will try to consume the complete iterator
    ///
    #[must_use]
    pub fn collect_delta<R: CurveType<WindowKind = Overlap<SW, DW>>>(
        self,
    ) -> CurveDeltaResult<SI::CurveKind, DI::CurveKind, R>
    where
        Self: Iterator<Item = Delta<SW, DW, SI, DI>>,
    {
        let mut result = CurveDeltaResult {
            remaining_supply: Curve::empty(),
            overlap: Curve::empty(),
            remaining_demand: Curve::empty(),
        };

        for delta in self {
            match delta {
                Delta::RemainingSupply(supply) => result.remaining_supply.windows.push(supply),
                Delta::Overlap(overlap) => result.overlap.windows.push(overlap),
                Delta::RemainingDemand(demand) => result.remaining_demand.windows.push(demand),
                Delta::EndSupply(supply) => supply
                    .into_iterator()
                    .for_each(|window| result.remaining_supply.windows.push(window)),
                Delta::EndDemand(demand) => {
                    demand.for_each(|window| result.remaining_demand.windows.push(window))
                }
            }
        }

        result
    }
}

/// Return Type for [`Curve::partition`](Curve::partition)
#[derive(Debug)]
pub struct PartitionResult {
    /// The exclusive index up to which all demand fits into the current partition
    ///
    /// Note: the paper uses an inclusive index
    pub index: usize,

    /// If there is a window on the partitioning boundary
    /// this contains the split before the boundary, otherwise this contains an empty window
    pub head: Window<Demand>,

    /// If there is a window on the partitioning boundary
    /// this contains the split after the boundary,
    /// otherwise if there is no window on the boundary this contains the first window
    /// after the boundary or an empty window if there is no window after the boundary
    pub tail: Window<Demand>,
}

/// Extension trait to allow calling aggregate on an iterator
pub trait AggregateExt: Iterator + Sized {
    /// aggregate all iterator elements
    /// acts similar to [`core::iter::Iterator::sum`]
    #[must_use]
    fn aggregate<A: Aggregate<Self::Item>>(self) -> A {
        A::aggregate(self)
    }
}

impl<I: Iterator> AggregateExt for I {}

/// Trait used by the `AggregateExt` Extension trait
pub trait Aggregate<A = Self> {
    /// aggregate all elements of `iter` into a new Self
    /// pendant to [`core::iter::Sum`]
    #[must_use]
    fn aggregate<I>(iter: I) -> Self
    where
        I: Iterator<Item = A>;
}