Skip to main content

// downstream/dstream/hybrid_0_steady_1_tilted_2_circular_3_algo.rs

1use super::{AssignStorageSiteTrait, CircularAlgo, HasIngestCapacityTrait, SteadyAlgo, TiltedAlgo};
2use crate::_auxlib as aux;
3
4/// Does this algorithm have the capacity to ingest a data item at logical time
5/// T?
6///
7/// @template Uint Unsigned integer type for operands.
8/// @param S The number of buffer sites available.
9/// @param T Queried logical time.
10/// @returns Whether there is capacity to ingest at time T.
11#[allow(non_snake_case)]
12pub fn has_ingest_capacity<Uint: aux::UnsignedTrait>(S: Uint, T: Uint) -> bool {
13    let _0: Uint = Uint::zero();
14    let _1: Uint = Uint::one();
15    let _2: Uint = _1 + _1;
16    let _3: Uint = _2 + _1;
17    if S < _3 || S % _3 != _0 {
18        return false;
19    }
20    let third_S: Uint = S / _3;
21    let has_capacity_1st = SteadyAlgo::has_ingest_capacity(third_S, T / _3);
22    let has_capacity_2nd = T < _1 || TiltedAlgo::has_ingest_capacity(third_S, (T - _1) / _3);
23    let has_capacity_3rd = T < _2 || CircularAlgo::has_ingest_capacity(third_S, (T - _2) / _3);
24
25    has_capacity_1st && has_capacity_2nd && has_capacity_3rd
26}
27
28/// Site selection for hybrid steady/tilted/circular curation.
29///
30/// What buffer site should the T'th data item be stored to?
31///
32/// @template Uint Unsigned integer type for operands and return value.
33/// @param S Buffer size.
34///     Must be divisible by 3, with S/3 being a power of two.
35/// @param T Current logical time.
36/// @returns The selected storage site, if any.
37///     Returns S if no site should be selected (i.e., discard).
38#[allow(non_snake_case)]
39pub fn _assign_storage_site<Uint: aux::UnsignedTrait>(S: Uint, T: Uint) -> Uint {
40    debug_assert!(has_ingest_capacity(S, T));
41
42    let _0: Uint = Uint::zero();
43    let _1: Uint = Uint::one();
44    let _2: Uint = _1 + _1;
45    let _3: Uint = _2 + _1;
46    let third_S: Uint = S / _3;
47    let adj_T: Uint = T / _3;
48    let remainder: Uint = T % _3;
49    if remainder == _0 {
50        let site: Uint = SteadyAlgo::_assign_storage_site(third_S, adj_T);
51        if site == third_S {
52            S
53        } else {
54            site
55        }
56    } else if remainder == _1 {
57        let site: Uint = TiltedAlgo::_assign_storage_site(third_S, adj_T);
58        if site == third_S {
59            S
60        } else {
61            third_S + site
62        }
63    } else {
64        let site: Uint = CircularAlgo::_assign_storage_site(third_S, adj_T);
65        if site == third_S {
66            S
67        } else {
68            _2 * third_S + site
69        }
70    }
71}
72
73/// Site selection for hybrid steady/tilted/circular curation.
74///
75/// What buffer site should the T'th data item be stored to?
76///
77/// @template Uint Unsigned integer type for operands and return value.
78/// @param S Buffer size.
79///     Must be divisible by 3, with S/3 being a power of two.
80/// @param T Current logical time.
81/// @returns The selected storage site, if any.
82///     Returns None if no site should be selected (i.e., discard).
83#[allow(non_snake_case)]
84pub fn assign_storage_site<Uint: aux::UnsignedTrait>(S: Uint, T: Uint) -> Option<Uint> {
85    let k: Uint = _assign_storage_site(S, T);
86    if k == S {
87        None
88    } else {
89        Some(k)
90    }
91}
92
/// Marker type for the hybrid steady/tilted/circular algorithm.
///
/// Carries no data; it exists so this module's free functions can be reached
/// through the `HasIngestCapacityTrait` and `AssignStorageSiteTrait`
/// implementations on it. The common traits are derived so the marker can be
/// freely copied, compared, default-constructed, and printed by callers.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct Algo;
94
95#[allow(non_snake_case)]
96impl crate::dstream::HasIngestCapacityTrait for Algo {
97    fn has_ingest_capacity<Uint: aux::UnsignedTrait>(S: Uint, T: Uint) -> bool {
98        has_ingest_capacity::<Uint>(S, T)
99    }
100}
101
102#[allow(non_snake_case)]
103impl crate::dstream::AssignStorageSiteTrait for Algo {
104    fn _assign_storage_site<Uint: aux::UnsignedTrait>(S: Uint, T: Uint) -> Uint {
105        _assign_storage_site::<Uint>(S, T)
106    }
107
108    fn assign_storage_site<Uint: aux::UnsignedTrait>(S: Uint, T: Uint) -> Option<Uint> {
109        assign_storage_site::<Uint>(S, T)
110    }
111}
112
#[cfg(test)]
mod tests {
    use super::*;

    // Shared smoke-test inputs: 12 is divisible by 3, and 12 / 3 = 4 is a
    // power of two, matching the documented requirement on the buffer size.
    const BUFFER_SIZE: u32 = 12;
    const LOGICAL_TIME: u32 = 5;

    /// Smoke test: the capacity query runs without panicking.
    #[test]
    fn test_smoke_has_ingest_capacity() {
        let _ = has_ingest_capacity::<u32>(BUFFER_SIZE, LOGICAL_TIME);
    }

    /// Smoke test: the `Option`-returning site selection runs without
    /// panicking.
    #[test]
    fn test_smoke_assign_storage_site() {
        let _ = assign_storage_site::<u32>(BUFFER_SIZE, LOGICAL_TIME);
    }

    /// Smoke test: the sentinel-returning site selection runs without
    /// panicking.
    #[test]
    fn test_smoke_impl_assign_storage_site() {
        let _ = _assign_storage_site::<u32>(BUFFER_SIZE, LOGICAL_TIME);
    }
}