cyfs_bdt/
types.rs

1use async_std::future;
2use cyfs_base::*;
3use futures::future::{AbortHandle, AbortRegistration, Abortable};
4use rand::Rng;
5use std::fmt;
6use std::{
7    hash::{Hash, Hasher},
8    collections::LinkedList,
9    sync::{
10        atomic::{AtomicU32, Ordering},
11    },
12    time::{Duration, SystemTime, UNIX_EPOCH}
13};
14
15#[derive(Clone)]
16pub struct MixAesKey {
17    pub enc_key: AesKey, 
18    pub mix_key: AesKey
19}
20
21impl std::fmt::Display for MixAesKey {
22    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23        write!(f, "enc {}, mix {}", self.enc_key.to_hex().unwrap(), self.mix_key.to_hex().unwrap())
24    }
25}
26
27
28impl MixAesKey {
29    pub fn mix_hash(&self) -> KeyMixHash {
30        self.mix_key.mix_hash(Some(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() / 60))
31    }
32}
33
34#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
35pub struct Sequence(u32);
36
37impl Sequence {
38    pub fn value(&self) -> u32 {
39        self.0
40    }
41}
42
43impl std::fmt::Debug for Sequence {
44    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45        write!(f, "{}", self.value())
46    }
47}
48
49impl From<u32> for Sequence {
50    fn from(v: u32) -> Self {
51        Sequence(v)
52    }
53}
54
55impl Hash for Sequence {
56    fn hash<H: Hasher>(&self, state: &mut H) {
57        state.write_u32(self.0)
58    }
59}
60
61impl RawFixedBytes for Sequence {
62    fn raw_bytes() -> Option<usize> {
63        u32::raw_bytes()
64    }
65}
66
67impl RawEncode for Sequence {
68    fn raw_measure(&self, _purpose: &Option<RawEncodePurpose>) -> Result<usize, BuckyError> {
69        Ok(<u32 as RawFixedBytes>::raw_bytes().unwrap())
70    }
71
72    fn raw_encode<'a>(
73        &self,
74        buf: &'a mut [u8],
75        purpose: &Option<RawEncodePurpose>,
76    ) -> Result<&'a mut [u8], BuckyError> {
77        self.0.raw_encode(buf, purpose)
78    }
79}
80
81impl<'de> RawDecode<'de> for Sequence {
82    fn raw_decode(buf: &'de [u8]) -> Result<(Self, &'de [u8]), BuckyError> {
83        u32::raw_decode(buf).map(|(n, buf)| (Self(n), buf))
84    }
85}
86
87
88#[derive(Clone, Copy, Ord, PartialEq, Eq, Debug)]
89pub struct TempSeq(u32);
90
91impl TempSeq {
92    pub fn value(&self) -> u32 {
93        self.0
94    }
95
96    fn now(_now: Timestamp) -> u32 {
97        let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
98        let since_2021 = Duration::from_secs((40 * 365 + 9) * 24 * 3600).as_secs() as u32;
99        // TODO: 用10年?
100        (now - since_2021) * 10
101    }
102
103    // fn time_bits() -> usize {
104    //     20
105    // }
106}
107
108impl PartialOrd for TempSeq {
109    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
110        if self.0 == 0 || other.0 == 0 {
111            self.0.partial_cmp(&other.0)
112        } else if (std::cmp::max(self.0, other.0) - std::cmp::min(self.0, other.0)) > (u32::MAX / 2)
113        {
114            Some(if self.0 > other.0 {
115                std::cmp::Ordering::Less
116            } else {
117                std::cmp::Ordering::Greater
118            })
119        } else {
120            self.0.partial_cmp(&other.0)
121        }
122    }
123}
124
125impl Default for TempSeq {
126    fn default() -> Self {
127        Self(0)
128    }
129}
130
131impl From<u32> for TempSeq {
132    fn from(v: u32) -> Self {
133        Self(v)
134    }
135}
136
137impl Hash for TempSeq {
138    fn hash<H: Hasher>(&self, state: &mut H) {
139        state.write_u32(self.0)
140    }
141}
142
143impl RawFixedBytes for TempSeq {
144    fn raw_bytes() -> Option<usize> {
145        u32::raw_bytes()
146    }
147}
148
149impl RawEncode for TempSeq {
150    fn raw_measure(&self, _purpose: &Option<RawEncodePurpose>) -> Result<usize, BuckyError> {
151        Ok(<u32 as RawFixedBytes>::raw_bytes().unwrap())
152    }
153
154    fn raw_encode<'a>(
155        &self,
156        buf: &'a mut [u8],
157        purpose: &Option<RawEncodePurpose>,
158    ) -> Result<&'a mut [u8], BuckyError> {
159        self.0.raw_encode(buf, purpose)
160    }
161}
162
163impl<'de> RawDecode<'de> for TempSeq {
164    fn raw_decode(buf: &'de [u8]) -> Result<(Self, &'de [u8]), BuckyError> {
165        u32::raw_decode(buf).map(|(n, buf)| (Self(n), buf))
166    }
167}
168
169pub struct TempSeqGenerator {
170    cur: AtomicU32,
171}
172
173
174impl From<TempSeq> for TempSeqGenerator {
175    fn from(init: TempSeq) -> Self {
176        Self {
177            cur: AtomicU32::new(init.value())
178        }
179    }
180}
181
182
183impl TempSeqGenerator {
184    pub fn new() -> Self {
185        let now = TempSeq::now(bucky_time_now());
186        Self {
187            cur: AtomicU32::new(now),
188        }
189    }
190
191    pub fn generate(&self) -> TempSeq {
192        let v = self.cur.fetch_add(1, Ordering::SeqCst);
193        if v == 0 {
194            TempSeq(self.cur.fetch_add(1, Ordering::SeqCst))
195        } else {
196            TempSeq(v)
197        }
198    }
199}
200
201pub type Timestamp = u64;
202
203#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
204pub struct IncreaseId(u32);
205
206impl std::fmt::Display for IncreaseId {
207    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208        write!(f, "{}", self.0)
209    }
210}
211
212impl Default for IncreaseId {
213    fn default() -> Self {
214        Self::invalid()
215    }
216}
217
218impl IncreaseId {
219    pub fn invalid() -> Self {
220        Self(0)
221    }
222
223    pub fn is_valid(&self) -> bool {
224        *self != Self::invalid()
225    }
226}
227
228impl RawEncode for IncreaseId {
229    fn raw_measure(&self, _purpose: &Option<RawEncodePurpose>) -> Result<usize, BuckyError> {
230        Ok(<u32 as RawFixedBytes>::raw_bytes().unwrap())
231    }
232
233    fn raw_encode<'a>(
234        &self,
235        buf: &'a mut [u8],
236        purpose: &Option<RawEncodePurpose>,
237    ) -> Result<&'a mut [u8], BuckyError> {
238        self.0.raw_encode(buf, purpose)
239    }
240}
241
242impl<'de> RawDecode<'de> for IncreaseId {
243    fn raw_decode(buf: &'de [u8]) -> Result<(Self, &'de [u8]), BuckyError> {
244        u32::raw_decode(buf).map(|(n, buf)| (Self(n), buf))
245    }
246}
247
248pub struct IncreaseIdGenerator {
249    cur: AtomicU32,
250}
251
252impl IncreaseIdGenerator {
253    pub fn new() -> Self {
254        let mut rng = rand::thread_rng();
255        Self {
256            cur: AtomicU32::new(rng.gen_range(1, 0x7fffffff)),
257        }
258    }
259
260    pub fn generate(&self) -> IncreaseId {
261        IncreaseId(self.cur.fetch_add(1, Ordering::SeqCst) + 1)
262    }
263}
264
265#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
266pub struct EndpointPair(Endpoint, Endpoint);
267
268impl std::fmt::Display for EndpointPair {
269    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270        write!(f, "{{{},{}}}", self.0, self.1)
271    }
272}
273
274impl From<(Endpoint, Endpoint)> for EndpointPair {
275    fn from(ep_pair: (Endpoint, Endpoint)) -> Self {
276        assert!(ep_pair.0.is_same_ip_version(&ep_pair.1));
277        assert!(ep_pair.0.protocol() == ep_pair.1.protocol());
278        Self(ep_pair.0, ep_pair.1)
279    }
280}
281
282impl EndpointPair {
283    pub fn local(&self) -> &Endpoint {
284        &self.0
285    }
286
287    pub fn remote(&self) -> &Endpoint {
288        &self.1
289    }
290
291    pub fn protocol(&self) -> Protocol {
292        self.0.protocol()
293    }
294
295    pub fn is_ipv4(&self) -> bool {
296        self.0.addr().is_ipv4()
297    }
298
299    pub fn is_ipv6(&self) -> bool {
300        self.0.addr().is_ipv6()
301    }
302
303    pub fn is_tcp(&self) -> bool {
304        self.0.is_tcp() && self.0.addr().port() == 0
305    }
306
307    pub fn is_udp(&self) -> bool {
308        self.0.is_udp()
309    }
310
311    pub fn is_reverse_tcp(&self) -> bool {
312        self.0.is_tcp() && self.0.addr().port() != 0
313    }
314}
315
316pub struct StateWaiter {
317    wakers: LinkedList<AbortHandle>,
318}
319
320impl StateWaiter {
321    pub fn new() -> Self {
322        Self { wakers: Default::default() }
323    }
324
325    pub fn transfer(&mut self) -> Self {
326        let mut waiter = Self::new();
327        self.transfer_into(&mut waiter);
328        waiter
329    }
330
331    pub fn transfer_into(&mut self, waiter: &mut Self) {
332        waiter.wakers.append(&mut self.wakers);
333    }
334
335    pub fn new_waiter(&mut self) -> AbortRegistration {
336        let (waker, waiter) = AbortHandle::new_pair();
337        self.wakers.push_back(waker);
338        waiter
339    }
340
341    pub async fn wait<T, S: FnOnce() -> T>(waiter: AbortRegistration, state: S) -> T {
342        let _ = Abortable::new(future::pending::<()>(), waiter).await;
343        state()
344    }
345
346    pub async fn abort_wait<A: futures::Future<Output = BuckyError>, T, S: FnOnce() -> T>(t: A, waiter: AbortRegistration, state: S) -> BuckyResult<T> {
347        match Abortable::new(t, waiter).await {
348            Ok(err) => {
349                //FIXME: remove waker 
350                Err(err)
351            }, 
352            Err(_) =>  Ok(state())
353        }
354    }
355
356    pub fn wake(self) {
357        for waker in self.wakers {
358            waker.abort();
359        }
360    }
361
362    pub fn pop(&mut self) -> Option<AbortHandle> {
363        self.wakers.pop_front()   
364    }
365
366    pub fn len(&self) -> usize {
367        self.wakers.len()
368    }
369}