statime_linux/
tlvforwarder.rs

1use statime::port::{ForwardedTLV, ForwardedTLVProvider};
2
3pub struct TlvForwarder {
4    sender: tokio::sync::broadcast::Sender<ForwardedTLV<'static>>,
5    receiver: tokio::sync::broadcast::Receiver<ForwardedTLV<'static>>,
6    peek: Option<ForwardedTLV<'static>>,
7}
8
9impl Default for TlvForwarder {
10    fn default() -> Self {
11        Self::new()
12    }
13}
14
15impl TlvForwarder {
16    pub fn new() -> Self {
17        let (sender, receiver) = tokio::sync::broadcast::channel(128);
18        Self {
19            sender,
20            receiver,
21            peek: None,
22        }
23    }
24
25    pub fn forward(&self, tlv: ForwardedTLV<'static>) {
26        // Dont care about all receivers being gone.
27        let _ = self.sender.send(tlv);
28    }
29
30    // We have this instead of clone since this a duplication of this
31    // basically creates a second forwarder that starts reading at the
32    // same point, without updating the old. This is in many cases not
33    // what the user wants so extra friction here is good.
34    pub fn duplicate(&self) -> Self {
35        Self {
36            sender: self.sender.clone(),
37            receiver: self.receiver.resubscribe(),
38            peek: None,
39        }
40    }
41
42    pub fn empty(&mut self) {
43        use tokio::sync::broadcast::error::TryRecvError;
44        self.peek = None;
45        // Empty the receiver
46        while !matches!(
47            self.receiver.try_recv(),
48            Err(TryRecvError::Empty) | Err(TryRecvError::Closed)
49        ) {}
50    }
51}
52
53impl ForwardedTLVProvider for TlvForwarder {
54    fn next_if_smaller(&mut self, max_size: usize) -> Option<statime::port::ForwardedTLV> {
55        use tokio::sync::broadcast::error::TryRecvError;
56
57        while self.peek.is_none() {
58            match self.receiver.try_recv() {
59                Ok(value) => self.peek = Some(value),
60                Err(TryRecvError::Empty) | Err(TryRecvError::Closed) => return None,
61                Err(TryRecvError::Lagged(_)) => continue,
62            }
63        }
64
65        // workaround for take_if not being stable
66        if let Some(v) = self.peek.take() {
67            if v.size() <= max_size {
68                Some(v)
69            } else {
70                self.peek = Some(v);
71                None
72            }
73        } else {
74            None
75        }
76    }
77}