//! Source file: downstream/dstream/hybrid_0_steady_2_tilted_3_algo.rs
//!
//! Hybrid site-selection algorithm combining steady and tilted curation.

1use super::{AssignStorageSiteTrait, HasIngestCapacityTrait, SteadyAlgo, TiltedAlgo};
2use crate::_auxlib as aux;
3
4/// Does this algorithm have the capacity to ingest a data item at logical time
5/// T?
6///
7/// @template Uint Unsigned integer type for operands.
8/// @param S The number of buffer sites available.
9/// @param T Queried logical time.
10/// @returns Whether there is capacity to ingest at time T.
11#[allow(non_snake_case)]
12pub fn has_ingest_capacity<Uint: aux::UnsignedTrait>(S: Uint, T: Uint) -> bool {
13    let _0: Uint = Uint::zero();
14    let _1: Uint = Uint::one();
15    let _2: Uint = _1 + _1;
16    let _3: Uint = _2 + _1;
17    if S < _3 || S % _3 != _0 {
18        return false;
19    }
20    let third_S: Uint = S / _3;
21    let two_thirds_S: Uint = _2 * third_S;
22    let t_div_3: Uint = T / _3;
23    let has_capacity_1st = SteadyAlgo::has_ingest_capacity(
24        two_thirds_S,
25        t_div_3 * _2 + aux::from_bool::<Uint>(T % _3 > _0),
26    );
27    let has_capacity_2nd = T < _2 || TiltedAlgo::has_ingest_capacity(third_S, (T - _2) / _3);
28
29    has_capacity_1st && has_capacity_2nd
30}
31
32/// Site selection for hybrid steady/tilted curation.
33///
34/// What buffer site should the T'th data item be stored to?
35///
36/// @template Uint Unsigned integer type for operands and return value.
37/// @param S Buffer size.
38///     Must be divisible by 3, with S/3 being a power of two.
39/// @param T Current logical time.
40/// @returns The selected storage site, if any.
41///     Returns S if no site should be selected (i.e., discard).
42#[allow(non_snake_case)]
43pub fn _assign_storage_site<Uint: aux::UnsignedTrait>(S: Uint, T: Uint) -> Uint {
44    debug_assert!(has_ingest_capacity(S, T));
45
46    let _1: Uint = Uint::one();
47    let _2: Uint = _1 + _1;
48    let _3: Uint = _2 + _1;
49    let third_S: Uint = S / _3;
50    let two_thirds_S: Uint = _2 * third_S;
51    let remainder: Uint = T % _3;
52    if remainder < _2 {
53        let adj_T: Uint = (T / _3) * _2 + remainder;
54        let site: Uint = SteadyAlgo::_assign_storage_site(two_thirds_S, adj_T);
55        if site == two_thirds_S {
56            S
57        } else {
58            site
59        }
60    } else {
61        let adj_T: Uint = T / _3;
62        let site: Uint = TiltedAlgo::_assign_storage_site(third_S, adj_T);
63        if site == third_S {
64            S
65        } else {
66            two_thirds_S + site
67        }
68    }
69}
70
71/// Site selection for hybrid steady/tilted curation.
72///
73/// What buffer site should the T'th data item be stored to?
74///
75/// @template Uint Unsigned integer type for operands and return value.
76/// @param S Buffer size.
77///     Must be divisible by 3, with S/3 being a power of two.
78/// @param T Current logical time.
79/// @returns The selected storage site, if any.
80///     Returns None if no site should be selected (i.e., discard).
81#[allow(non_snake_case)]
82pub fn assign_storage_site<Uint: aux::UnsignedTrait>(S: Uint, T: Uint) -> Option<Uint> {
83    let k: Uint = _assign_storage_site(S, T);
84    if k == S {
85        None
86    } else {
87        Some(k)
88    }
89}
90
/// Zero-sized marker type exposing the hybrid steady/tilted algorithm through
/// the crate's `HasIngestCapacityTrait` and `AssignStorageSiteTrait`.
///
/// It carries no data; the common traits are derived so the marker can be
/// freely copied, default-constructed, and printed in diagnostics.
#[derive(Debug, Clone, Copy, Default)]
pub struct Algo;
92
93#[allow(non_snake_case)]
94impl crate::dstream::HasIngestCapacityTrait for Algo {
95    fn has_ingest_capacity<Uint: aux::UnsignedTrait>(S: Uint, T: Uint) -> bool {
96        has_ingest_capacity::<Uint>(S, T)
97    }
98}
99
100#[allow(non_snake_case)]
101impl crate::dstream::AssignStorageSiteTrait for Algo {
102    fn _assign_storage_site<Uint: aux::UnsignedTrait>(S: Uint, T: Uint) -> Uint {
103        _assign_storage_site::<Uint>(S, T)
104    }
105
106    fn assign_storage_site<Uint: aux::UnsignedTrait>(S: Uint, T: Uint) -> Option<Uint> {
107        assign_storage_site::<Uint>(S, T)
108    }
109}
110
#[cfg(test)]
mod tests {
    use super::*;

    /// The capacity query runs for a valid buffer size and rejects sizes that
    /// cannot be split into three equal, non-empty parts.
    #[test]
    fn test_smoke_has_ingest_capacity() {
        has_ingest_capacity::<u32>(12, 5);

        // These sizes fail the up-front size check, so the answer does not
        // depend on the steady/tilted sub-algorithms at all.
        assert!(!has_ingest_capacity::<u32>(0, 0));
        assert!(!has_ingest_capacity::<u32>(2, 0));
        assert!(!has_ingest_capacity::<u32>(10, 5));
    }

    /// The `Option`-returning wrapper agrees with the sentinel-returning
    /// implementation: `None` exactly when the raw result equals S.
    #[test]
    fn test_smoke_assign_storage_site() {
        let raw = _assign_storage_site::<u32>(12, 5);
        let expected = if raw == 12 { None } else { Some(raw) };
        assert_eq!(assign_storage_site::<u32>(12, 5), expected);
    }

    /// T = 5 has T % 3 == 2, so it is routed to the tilted region; the result
    /// is therefore either the discard sentinel (12) or a site offset by the
    /// steady region size (2 * 12 / 3 = 8).
    #[test]
    fn test_smoke_impl_assign_storage_site() {
        let site = _assign_storage_site::<u32>(12, 5);
        assert!(site >= 8);

        // The trait implementation on `Algo` delegates to the free function.
        let via_trait =
            <Algo as crate::dstream::AssignStorageSiteTrait>::_assign_storage_site::<u32>(12, 5);
        assert_eq!(via_trait, site);
    }
}