fs_share_utils/broadcast/
receiver.rs1use std::{
7 collections::HashMap,
8 net::{IpAddr, Ipv4Addr},
9 net::{SocketAddr, UdpSocket},
10 num::NonZero,
11 sync::mpsc::{self, Receiver},
12 thread::{self, JoinHandle},
13 time::Duration,
14};
15
16use anyhow::Context;
17
/// Listener for prefixed UDP datagrams.
///
/// Instances are created through [`BroadcastReceiver::builder`]; the struct
/// owns everything the receive loop needs.
pub struct BroadcastReceiver {
    /// Byte sequence expected at the very start of every accepted datagram.
    prefix: Vec<u8>,

    /// Scratch space that each incoming datagram is read into.
    buffer: Box<[u8]>,

    /// Bound socket the datagrams are read from.
    socket: UdpSocket,
}
32
33impl BroadcastReceiver {
34 pub fn builder() -> BroadcastReceiverBuilder {
36 BroadcastReceiverBuilder::default()
37 }
38}
39
/// Iterator over the length-prefixed fields packed into a byte buffer.
///
/// Every field is laid out as a `b':'` marker byte, a big-endian `u16`
/// length, and then exactly that many bytes of content. Iteration yields the
/// content of each field in order and stops at the end of the buffer or at
/// the first field that is malformed (missing marker, truncated length, or
/// truncated content).
///
/// Once `next` has returned `None` the reader is exhausted for good: it never
/// resumes parsing in the middle of a broken field.
pub struct PayloadReader<'a> {
    /// Bytes being parsed.
    buf: &'a [u8],
    /// Offset in `buf` at which the next field is expected to start.
    pos: usize,
}

impl<'a> PayloadReader<'a> {
    /// Framing bytes in front of each field: one marker byte plus two length bytes.
    const HEADER_LEN: usize = 3;

    /// Creates a reader positioned at the start of `buf`.
    pub fn new(buf: &'a [u8]) -> Self {
        Self { buf, pos: 0 }
    }

    /// Parses one field from the front of `rest` without touching any state.
    ///
    /// Returns the field's content, or `None` when `rest` does not begin with
    /// a complete, well-formed field.
    fn split_field(rest: &'a [u8]) -> Option<&'a [u8]> {
        let [b':', hi, lo, tail @ ..] = rest else {
            return None;
        };
        let len = usize::from(u16::from_be_bytes([*hi, *lo]));
        tail.get(..len)
    }
}

impl<'a> Iterator for PayloadReader<'a> {
    type Item = &'a [u8];

    /// Returns the content of the next field, or `None` when the buffer is
    /// exhausted or the next field is malformed.
    fn next(&mut self) -> Option<Self::Item> {
        let buf: &'a [u8] = self.buf;
        let rest = buf.get(self.pos..)?;

        match Self::split_field(rest) {
            Some(field) => {
                // Only commit the new position once the whole field is known
                // to be valid.
                self.pos += Self::HEADER_LEN + field.len();
                Some(field)
            }
            None => {
                // Jump to the end so that later calls cannot start parsing
                // from inside the broken field and yield bogus data.
                self.pos = buf.len();
                None
            }
        }
    }
}

// `next` keeps returning `None` after the first `None` (see above).
impl std::iter::FusedIterator for PayloadReader<'_> {}
99
100impl BroadcastReceiver {
101 pub fn start<U>(
123 self,
124 ) -> (
125 Box<dyn FnOnce() + Send>,
126 Receiver<(SocketAddr, U)>,
127 JoinHandle<()>,
128 )
129 where
130 U: for<'a> TryFrom<(SocketAddr, PayloadReader<'a>)>,
131 U: Clone + PartialEq + Send + 'static,
132 {
133 let (data_tx, data_rx) = mpsc::channel();
134 let (stop_tx, stop_rx) = mpsc::channel();
135
136 let handle = thread::spawn(move || {
137 let mut this = self;
138
139 let mut seen: HashMap<SocketAddr, U> = HashMap::new();
141
142 loop {
143 if stop_rx.try_recv().is_ok() {
145 break;
146 }
147
148 match this.socket.recv_from(&mut this.buffer) {
149 Ok((size, addr)) => {
150 if this.buffer.starts_with(&this.prefix) {
152 let payload = &this.buffer[this.prefix.len()..size];
153 let reader = PayloadReader::new(payload);
154
155 match U::try_from((addr, reader)) {
156 Ok(data) => {
157 let is_new_or_changed = match seen.get(&addr) {
159 Some(old) => old != &data,
160 None => true,
161 };
162
163 if is_new_or_changed {
164 seen.insert(addr, data.clone());
165 let _ = data_tx.send((addr, data));
166 }
167 }
168 Err(_) => continue, }
170 }
171 }
172 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
173 Err(ref e) if e.kind() == std::io::ErrorKind::TimedOut => continue,
174 Err(e) => {
175 eprintln!("Receive error: {}", e);
176 break;
177 }
178 }
179 }
180 });
181
182 let stop = Box::new(move || {
183 let _ = stop_tx.send(());
184 });
185
186 (stop, data_rx, handle)
187 }
188}
189
/// Collects the settings needed to construct a `BroadcastReceiver`.
pub struct BroadcastReceiverBuilder {
    /// Bytes every accepted datagram has to start with.
    prefix: Vec<u8>,
    /// Read timeout applied to the socket; `None` disables the timeout.
    timeout: Option<Duration>,
    /// Requested buffer capacity; `None` when a size of zero was supplied.
    buffer_size: Option<NonZero<usize>>,
    /// Local address the socket is bound to.
    bind_addr: SocketAddr,
}

impl Default for BroadcastReceiverBuilder {
    /// Empty prefix, 300 ms read timeout, 8 KiB buffer, bound to
    /// `0.0.0.0:7755`.
    fn default() -> Self {
        const READ_TIMEOUT_MS: u64 = 300;
        const BUFFER_BYTES: usize = 8 * 1024;
        const PORT: u16 = 7755;

        let any_interface = IpAddr::V4(Ipv4Addr::UNSPECIFIED);

        Self {
            prefix: Vec::new(),
            timeout: Some(Duration::from_millis(READ_TIMEOUT_MS)),
            buffer_size: NonZero::new(BUFFER_BYTES),
            bind_addr: SocketAddr::new(any_interface, PORT),
        }
    }
}
209
210impl BroadcastReceiverBuilder {
211 pub fn prefix<T: Into<Vec<u8>>>(mut self, value: T) -> Self {
213 self.prefix = value.into();
214 self
215 }
216
217 pub fn buffer_size(mut self, size: usize) -> Self {
219 self.buffer_size = NonZero::new(size);
220 self
221 }
222
223 pub fn bind_addr(mut self, addr: SocketAddr) -> Self {
225 self.bind_addr = addr;
226 self
227 }
228
229 pub fn build(self) -> anyhow::Result<BroadcastReceiver> {
231 let buffer_size = self.buffer_size.context("Buffer size is not set")?.get();
232
233 let buffer = vec![0u8; buffer_size + self.prefix.len()].into_boxed_slice();
234
235 let socket = UdpSocket::bind(self.bind_addr)
236 .with_context(|| format!("Failed to bind UDP socket on {}", self.bind_addr))?;
237
238 socket.set_read_timeout(self.timeout).with_context(|| {
239 format!(
240 "Failed to set read timeout {:?} on {}",
241 self.timeout, self.bind_addr
242 )
243 })?;
244
245 Ok(BroadcastReceiver {
246 prefix: self.prefix,
247 buffer,
248 socket,
249 })
250 }
251}