statime_linux/
tlvforwarder.rs1use statime::port::{ForwardedTLV, ForwardedTLVProvider};
2
3pub struct TlvForwarder {
4 sender: tokio::sync::broadcast::Sender<ForwardedTLV<'static>>,
5 receiver: tokio::sync::broadcast::Receiver<ForwardedTLV<'static>>,
6 peek: Option<ForwardedTLV<'static>>,
7}
8
9impl Default for TlvForwarder {
10 fn default() -> Self {
11 Self::new()
12 }
13}
14
15impl TlvForwarder {
16 pub fn new() -> Self {
17 let (sender, receiver) = tokio::sync::broadcast::channel(128);
18 Self {
19 sender,
20 receiver,
21 peek: None,
22 }
23 }
24
25 pub fn forward(&self, tlv: ForwardedTLV<'static>) {
26 let _ = self.sender.send(tlv);
28 }
29
30 pub fn duplicate(&self) -> Self {
35 Self {
36 sender: self.sender.clone(),
37 receiver: self.receiver.resubscribe(),
38 peek: None,
39 }
40 }
41
42 pub fn empty(&mut self) {
43 use tokio::sync::broadcast::error::TryRecvError;
44 self.peek = None;
45 while !matches!(
47 self.receiver.try_recv(),
48 Err(TryRecvError::Empty) | Err(TryRecvError::Closed)
49 ) {}
50 }
51}
52
53impl ForwardedTLVProvider for TlvForwarder {
54 fn next_if_smaller(&mut self, max_size: usize) -> Option<statime::port::ForwardedTLV> {
55 use tokio::sync::broadcast::error::TryRecvError;
56
57 while self.peek.is_none() {
58 match self.receiver.try_recv() {
59 Ok(value) => self.peek = Some(value),
60 Err(TryRecvError::Empty) | Err(TryRecvError::Closed) => return None,
61 Err(TryRecvError::Lagged(_)) => continue,
62 }
63 }
64
65 if let Some(v) = self.peek.take() {
67 if v.size() <= max_size {
68 Some(v)
69 } else {
70 self.peek = Some(v);
71 None
72 }
73 } else {
74 None
75 }
76 }
77}