1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
use super::*;

/// An Offset represents the position of the left edge of some Segment.
/// Offsets must be paired with a *power* to map to quantum coordinates.
/// The absolute DhtLocation of the offset is determined by the "power" of its
/// context, and topology of the space, by:
///
///   dht_location = offset * 2^pow * quantum_size
///
/// Implementors must be `Copy`, dereference to the raw `u32` offset value,
/// and be constructible from a `u32`.
pub trait Offset: Sized + Copy + Clone + Deref<Target = u32> + From<u32> {
    /// The type of quantum to map to, which also implies the absolute coordinates
    type Quantum: Quantum;

    /// Get the absolute coordinate for this Offset.
    ///
    /// `topo` supplies the quantum size of the relevant dimension and `power`
    /// is the exponent of the segment this offset belongs to. The return type
    /// is the `Absolute` type associated with [`Offset::Quantum`].
    fn to_absolute(
        &self,
        topo: &Topology,
        power: u8,
    ) -> <<Self as Offset>::Quantum as Quantum>::Absolute;

    /// Get the quantum coordinate for this Offset, i.e. the offset scaled by
    /// the segment length `2^power` but not by the quantum size.
    fn to_quantum(&self, power: u8) -> Self::Quantum;

    /// Get the nearest rounded-down Offset for the given Loc, for segments of
    /// the given `power` under the given topology.
    fn from_absolute_rounded(loc: Loc, topo: &Topology, power: u8) -> Self;
}

/// An Offset in space.
///
/// A newtype over `u32` counting how many segment-lengths the left edge of a
/// space segment lies from the origin. Arithmetic operators, `Deref`/`DerefMut`
/// to the inner `u32`, and `From`/`Into` conversions are all derived, and
/// `#[serde(transparent)]` makes it serialize as the bare integer.
#[derive(
    Copy,
    Clone,
    Debug,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    derive_more::Add,
    derive_more::Sub,
    derive_more::Mul,
    derive_more::Div,
    derive_more::Deref,
    derive_more::DerefMut,
    derive_more::From,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
)]
#[serde(transparent)]
pub struct SpaceOffset(pub u32);

/// An Offset in time.
///
/// A newtype over `u32` counting how many segment-lengths the left edge of a
/// time segment lies from the origin. Arithmetic operators, `Deref`/`DerefMut`
/// to the inner `u32`, and `From`/`Into` conversions are all derived, and
/// `#[serde(transparent)]` makes it serialize as the bare integer.
#[derive(
    Copy,
    Clone,
    Debug,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    derive_more::Add,
    derive_more::Sub,
    derive_more::Mul,
    derive_more::Div,
    derive_more::Deref,
    derive_more::DerefMut,
    derive_more::From,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
)]
#[serde(transparent)]
pub struct TimeOffset(pub u32);

impl Offset for SpaceOffset {
    type Quantum = SpaceQuantum;

    /// Get the absolute coordinate for this Offset
    fn to_absolute(&self, topo: &Topology, power: u8) -> Loc {
        self.wrapping_mul(topo.space.quantum)
            .wrapping_mul(pow2(power))
            .into()
    }

    /// Get the quantum coordinate for this Offset
    fn to_quantum(&self, power: u8) -> Self::Quantum {
        self.wrapping_mul(pow2(power)).into()
    }

    /// Get the nearest rounded-down Offset for the given Loc
    fn from_absolute_rounded(loc: Loc, topo: &Topology, power: u8) -> Self {
        (loc.as_u32() / topo.space.quantum / pow2(power)).into()
    }
}

impl Offset for TimeOffset {
    type Quantum = TimeQuantum;

    /// Get the absolute coordinate for this Offset, in microseconds:
    /// `offset * time_quantum * 2^power`.
    ///
    /// The product is computed in `i64` rather than `u32`. A `u32` count of
    /// microseconds can only express about 71 minutes, so multiplying in `u32`
    /// and widening afterwards silently truncated any realistic timestamp.
    /// Whenever the old `u32` product did not overflow, the result is
    /// unchanged. Multiplication still wraps (now at `i64`) instead of
    /// panicking.
    ///
    /// NOTE(review): unlike `TimeSegment::timestamp_bounds`, `topo.time_origin`
    /// is not added here — confirm whether callers expect an origin-relative
    /// value.
    fn to_absolute(&self, topo: &Topology, power: u8) -> Timestamp {
        let micros = i64::from(self.0)
            .wrapping_mul(i64::from(topo.time.quantum))
            .wrapping_mul(i64::from(pow2(power)));
        Timestamp::from_micros(micros)
    }

    /// Get the quantum coordinate for this Offset: `offset * 2^power`,
    /// wrapping at `u32`.
    fn to_quantum(&self, power: u8) -> Self::Quantum {
        self.wrapping_mul(pow2(power)).into()
    }

    /// Get the nearest rounded-down Offset for the given Loc, by integer
    /// division first by the time quantum and then by `2^power`.
    ///
    /// NOTE(review): the input is a `Loc` (as the trait signature requires) but
    /// it is divided by the *time* quantum — confirm this is intended.
    fn from_absolute_rounded(loc: Loc, topo: &Topology, power: u8) -> Self {
        (loc.as_u32() / topo.time.quantum / pow2(power)).into()
    }
}

/// Any interval in space or time is represented by a node in a tree, so our
/// way of describing intervals uses tree coordinates as well:
/// The length of an interval is 2^(power), and the position of its left edge
/// is at (offset * length).
///
/// Both quantities are measured in quanta; the type parameter `O` selects the
/// dimension (space or time) through its associated quantum type.
#[derive(
    Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Deserialize, serde::Serialize,
)]
pub struct Segment<O: Offset> {
    /// The exponent, where length = 2^power
    pub power: u8,
    /// The offset from the origin, measured in number of lengths
    pub offset: O,
}

impl<O: Offset> Segment<O> {
    /// Build a segment from its power and anything convertible into the
    /// offset type.
    pub fn new<OO: Into<O>>(power: u8, offset: OO) -> Self {
        let offset = offset.into();
        Self { power, offset }
    }

    /// Number of quanta covered by this segment, i.e. `2^power`.
    pub fn num_quanta(&self) -> u64 {
        // Returned as u64 because 2^32 (power == 32) does not fit in a u32.
        2u64.pow(u32::from(self.power))
    }

    /// Length of this segment in absolute units (Location or microseconds of
    /// time): the quantum size of the dimension times the number of quanta.
    pub fn absolute_length(&self, topo: &Topology) -> u64 {
        // Widened to u64 for the same reason as `num_quanta`: a power of 32
        // would overflow a u32.
        let quantum_size = O::Quantum::dimension(topo).quantum as u64;
        self.num_quanta() * quantum_size
    }

    /// The first and last quantum covered by this segment (both inclusive),
    /// each passed through `normalized` for the given topology.
    pub fn quantum_bounds(&self, topo: &Topology) -> (O::Quantum, O::Quantum) {
        let span = self.num_quanta();
        // Left edge in quanta, deliberately truncated to u32.
        let first = (span * u64::from(*self.offset)) as u32;
        // Last covered quantum: one less than the start of the next segment,
        // using wrapping u32 arithmetic.
        let last = first.wrapping_add(span as u32).wrapping_sub(1);
        let lower = O::Quantum::from(first).normalized(topo);
        let upper = O::Quantum::from(last).normalized(topo);
        (lower, upper)
    }

    /// Whether the given quantum coordinate (after normalization) falls inside
    /// this segment's inclusive quantum bounds.
    pub fn contains_quantum(&self, topo: &Topology, coord: O::Quantum) -> bool {
        let (start, end) = self.quantum_bounds(topo);
        let point = coord.normalized(topo);
        let at_or_after_start = start <= point;
        let at_or_before_end = point <= end;
        if start <= end {
            // Ordinary range: the point must satisfy both bounds.
            at_or_after_start && at_or_before_end
        } else {
            // The bounds wrap around, so satisfying either one is enough.
            at_or_after_start || at_or_before_end
        }
    }

    /// Split this segment into its two half-length children, left half first.
    ///
    /// Returns `None` when `power == 0`: a single quantum is a leaf and has
    /// no children.
    pub fn bisect(&self) -> Option<[Self; 2]> {
        let child_power = self.power.checked_sub(1)?;
        let left = self.offset.wrapping_mul(2);
        let right = left.wrapping_add(1);
        Some([
            Self::new(child_power, O::from(left)),
            Self::new(child_power, O::from(right)),
        ])
    }
}

impl SpaceSegment {
    /// Start and end of this segment as absolute `Loc` coordinates, computed
    /// by `bounds` for a run of exactly one segment.
    pub fn loc_bounds(&self, topo: &Topology) -> (Loc, Loc) {
        // `bounds` converts its u32 results through `From<u32>`, so it can
        // produce `Loc`s directly.
        bounds::<Loc>(&topo.space, self.power, self.offset, 1)
    }
}

impl TimeSegment {
    /// Start and end of this segment as absolute `Timestamp`s: the bounds of
    /// one segment from `bounds64`, shifted by the topology's time origin.
    pub fn timestamp_bounds(&self, topo: &Topology) -> (Timestamp, Timestamp) {
        let origin = topo.time_origin.as_micros();
        let (start, end) = bounds64::<i64>(&topo.time, self.power, self.offset, 1);
        let earliest = Timestamp::from_micros(start + origin);
        let latest = Timestamp::from_micros(end + origin);
        (earliest, latest)
    }
}

/// A [`Segment`] in the space dimension, positioned by a [`SpaceOffset`].
pub type SpaceSegment = Segment<SpaceOffset>;
/// A [`Segment`] in the time dimension, positioned by a [`TimeOffset`].
pub type TimeSegment = Segment<TimeOffset>;

/// Inclusive absolute bounds of `count` consecutive space segments of the
/// given `power`, starting at `offset`.
///
/// All arithmetic wraps at `u32`. The end bound is `start + length - 1`.
/// Debug builds assert that the quantum size and `count` are non-zero.
pub(super) fn bounds<N: From<u32>>(
    dim: &Dimension,
    power: u8,
    offset: SpaceOffset,
    count: u32,
) -> (N, N) {
    debug_assert_ne!(dim.quantum, 0);
    debug_assert_ne!(count, 0);
    // Absolute length of a single segment of this power.
    let segment_len = dim.quantum.wrapping_mul(pow2(power));
    let first = offset.0.wrapping_mul(segment_len);
    let total_len = count.wrapping_mul(segment_len);
    let last = first.wrapping_add(total_len).wrapping_sub(1);
    (N::from(first), N::from(last))
}

/// Inclusive absolute bounds of `count` consecutive time segments of the
/// given `power`, starting at `offset`, computed in `i64`.
///
/// The end bound is `start + length - 1`. Debug builds assert that the
/// quantum size and `count` are non-zero.
pub(super) fn bounds64<N: From<i64>>(
    dim: &Dimension,
    power: u8,
    offset: TimeOffset,
    count: u32,
) -> (N, N) {
    debug_assert_ne!(dim.quantum, 0);
    debug_assert_ne!(count, 0);
    // Absolute length of a single segment of this power.
    let segment_len = dim.quantum as i64 * 2i64.pow(u32::from(power));
    let first = i64::from(offset.0).wrapping_mul(segment_len);
    let total_len = i64::from(count).wrapping_mul(segment_len);
    let last = first.wrapping_add(total_len).wrapping_sub(1);
    (N::from(first), N::from(last))
}