cotton_ssdp/service.rs
1use crate::engine::{Callback, Engine};
2use crate::refresh_timer::StdTimebase;
3use crate::udp;
4use crate::udp::TargetedReceive;
5use crate::{Advertisement, Notification};
6use rand::RngCore;
7use std::time::Instant;
8
9struct SyncCallback {
10 callback: Box<dyn Fn(&Notification)>,
11}
12
13impl Callback for SyncCallback {
14 fn on_notification(&self, r: &Notification) {
15 (self.callback)(r);
16 }
17}
18
19/** High-level reactor-style SSDP service using mio.
20
21Use a `Service` to discover network resources using SSDP, or to advertise
22network resources which your program provides. Or both.
23
24The implementation integrates with the [`mio`] crate, which provides a
25"reactor-style" I/O API suited for running several I/O operations in a
26single thread. (SSDP, being relatively low-bandwidth and non-urgent,
27is unlikely to require any more high-performance I/O solution.)
28An alternative mechanism, [`crate::AsyncService`], integrates directly
29with [`tokio`] instead of [`mio`].
30
31The implementation requires _two_ UDP sockets: one bound to the
32well-known SSDP port number (1900) which subscribes to the multicast
33group, and a second bound to a random port for sending unicast
34searches and receiving unicast replies. (It would be possible to get
35by with a single socket if cotton-ssdp knew it was the _only_ SSDP
36implementation running on that IP address -- but if there might be
37other implementations running, it needs its own search socket in order
38not to steal other applications' packets.)
39
40For that reason, _two_ MIO tokens are required; these should be passed
41to [`Service::new`], which takes care of registering them with the MIO
42poller. Likewise, the main polling loop can indicate readiness on
43either token at any time, and the corresponding "`*_ready`" method on
44`Service` -- [`Service::multicast_ready`] or [`Service::search_ready`]
45-- should be called in response. All this can be seen in [the
46ssdp-search-mio
47example](https://github.com/pdh11/cotton/blob/main/cotton-ssdp/examples/ssdp-search-mio.rs),
48from which the example code below is adapted.
49
50# Example subscriber
51
52This code starts a search for _all_ SSDP resources on the local
53network, from all network interfaces, and stores unique ones in a
54`HashMap`. The map will be populated as the MIO polling loop runs.
55
56```rust
57# use cotton_ssdp::*;
58# use std::collections::HashMap;
59# use std::cell::RefCell;
60# const SSDP_TOKEN1: mio::Token = mio::Token(0);
61# const SSDP_TOKEN2: mio::Token = mio::Token(1);
62# let mut poll = mio::Poll::new().unwrap();
63# #[cfg(not(miri))]
64# let mut ssdp = Service::new(poll.registry(), (SSDP_TOKEN1, SSDP_TOKEN2)).unwrap();
65# #[cfg(not(miri))]
66 let map = RefCell::new(HashMap::new());
67# #[cfg(not(miri))]
68 ssdp.subscribe(
69 "ssdp:all",
70 Box::new(move |r| {
71 let mut m = map.borrow_mut();
72 if let Notification::Alive {
73 ref notification_type,
74 ref unique_service_name,
75 ref location,
76 } = r {
77 if !m.contains_key(unique_service_name) {
78 m.insert(unique_service_name.clone(), r.clone());
79 }
80 }
81 }),
82 );
83```
84
85# Example advertiser
86
87This code sets up an advertisement for a (fictitious) resource,
88ostensibly available over HTTP on port 3333. The actual advertisements
89will be sent (and any incoming searches replied to) as the MIO polling
90loop runs.
91
92(The [UPnP Device
93Architecture](https://openconnectivity.org/developer/specifications/upnp-resources/upnp/archive-of-previously-published-upnp-device-architectures/)
94specifies exactly what to advertise in the case of a _UPnP_ implementation;
95this simpler example is not in itself compliant with that document.)
96
97```rust
98# use cotton_ssdp::*;
99# const SSDP_TOKEN1: mio::Token = mio::Token(0);
100# const SSDP_TOKEN2: mio::Token = mio::Token(1);
101# let mut poll = mio::Poll::new().unwrap();
102# #[cfg(not(miri))]
103# let mut ssdp = Service::new(poll.registry(), (SSDP_TOKEN1, SSDP_TOKEN2)).unwrap();
104 let uuid = uuid::Uuid::new_v4();
105# #[cfg(not(miri))]
106 ssdp.advertise(
107 uuid.to_string(),
108 cotton_ssdp::Advertisement {
109 notification_type: "test".to_string(),
110 location: "http://127.0.0.1:3333/test".to_string(),
111 },
112 );
113```
114
115Notice that the URL in the `location` field uses the localhost IP
116address. The `Service` itself takes care of rewriting that, on a
117per-network-interface basis, to the IP address on which each SSDP
118peer will be able to reach the host where the `Service` is
119running. For instance, if your Ethernet IP address is 192.168.1.3,
120and your wifi IP address is 10.0.4.7, anyone listening to SSDP on
121Ethernet will see `http://192.168.1.3:3333/test` and anyone listening on
122wifi will see `http://10.0.4.7:3333/test`. (For how this is done, see the use
123of `rewrite_host` in [`Engine::on_data`].)
124
125# The polling loop
126
127The actual MIO polling loop, mentioned above, should be written in the
128standard way common to all MIO applications. For instance, it might look like
129this:
130
131```rust
132# use cotton_ssdp::*;
133# use std::collections::HashMap;
134# use std::cell::RefCell;
135# const SSDP_TOKEN1: mio::Token = mio::Token(0);
136# const SSDP_TOKEN2: mio::Token = mio::Token(1);
137# let mut poll = mio::Poll::new().unwrap();
138# let mut events = mio::Events::with_capacity(128);
139# #[cfg(not(miri))]
140# let mut ssdp = Service::new(poll.registry(), (SSDP_TOKEN1, SSDP_TOKEN2)).unwrap();
141# #[cfg(not(miri))]
142 loop {
143 poll.poll(&mut events, Some(ssdp.next_wakeup())).unwrap();
144
145 if ssdp.next_wakeup() == std::time::Duration::ZERO {
146 ssdp.wakeup();
147 }
148
149 for event in &events {
150 match event.token() {
151 SSDP_TOKEN1 => ssdp.multicast_ready(),
152 SSDP_TOKEN2 => ssdp.search_ready(),
153 // ... other tokens as required by the application ...
154 _ => (),
155 }
156 }
157# break;
158 }
159```
160
161*/
162pub struct Service {
163 engine: Engine<SyncCallback, StdTimebase>,
164 multicast_socket: mio::net::UdpSocket,
165 search_socket: mio::net::UdpSocket,
166}
167
168/// The type of [`udp::std::setup_socket`]
169type SocketFn = fn(u16) -> Result<std::net::UdpSocket, std::io::Error>;
170
171/// The type of [`mio::Registry::register`]
172type RegisterFn = fn(
173 &mio::Registry,
174 &mut mio::net::UdpSocket,
175 mio::Token,
176) -> std::io::Result<()>;
177
178impl Service {
179 fn new_inner(
180 registry: &mio::Registry,
181 tokens: (mio::Token, mio::Token),
182 socket: SocketFn,
183 register: RegisterFn,
184 interfaces: Vec<cotton_netif::NetworkEvent>,
185 ) -> Result<Self, std::io::Error> {
186 let mut multicast_socket =
187 mio::net::UdpSocket::from_std(socket(1900u16)?);
188 let mut search_socket = mio::net::UdpSocket::from_std(socket(0u16)?); // ephemeral port
189 let mut engine = Engine::<SyncCallback, StdTimebase>::new(
190 rand::thread_rng().next_u32(),
191 Instant::now(),
192 );
193
194 for netif in interfaces {
195 // Ignore errors -- some interfaces are returned on which
196 // join_multicast failes (lxcbr0)
197 _ = engine.on_network_event(
198 &netif,
199 &multicast_socket,
200 &search_socket,
201 );
202 }
203
204 register(registry, &mut multicast_socket, tokens.0)?;
205 register(registry, &mut search_socket, tokens.1)?;
206
207 Ok(Self {
208 engine,
209 multicast_socket,
210 search_socket,
211 })
212 }
213
214 /// Create a new `Service`, including its two UDP sockets
215 ///
216 /// And registers the sockets with the [`mio::Registry`]
217 ///
218 /// # Errors
219 ///
220 /// Can return a `std::io::Error` if any of the underlying socket
221 /// calls fail.
222 ///
223 pub fn new(
224 registry: &mio::Registry,
225 tokens: (mio::Token, mio::Token),
226 ) -> Result<Self, std::io::Error> {
227 Self::new_inner(
228 registry,
229 tokens,
230 udp::std::setup_socket,
231 |r, s, t| r.register(s, t, mio::Interest::READABLE),
232 cotton_netif::get_interfaces()?.collect(),
233 )
234 }
235
236 /// Subscribe to notifications about a particular service type
237 ///
238 /// Or subscribe to "ssdp:all" for notifications about *all* service
239 /// types.
240 ///
241 /// This call also sends fresh search messages.
242 pub fn subscribe<A>(
243 &mut self,
244 notification_type: A,
245 callback: Box<dyn Fn(&Notification)>,
246 ) where
247 A: Into<String>,
248 {
249 self.engine.subscribe(
250 notification_type.into(),
251 SyncCallback { callback },
252 &self.search_socket,
253 );
254 }
255
256 /// Advertise a local resource on the network
257 pub fn advertise<USN>(
258 &mut self,
259 unique_service_name: USN,
260 advertisement: Advertisement,
261 ) where
262 USN: Into<String>,
263 {
264 self.engine.advertise(
265 unique_service_name.into(),
266 advertisement,
267 &self.search_socket,
268 );
269 }
270
271 /// Withdraw an advertisement for a local resource
272 ///
273 /// For instance, it is "polite" to call this if shutting down
274 /// cleanly.
275 ///
276 pub fn deadvertise(&mut self, unique_service_name: &str) {
277 self.engine
278 .deadvertise(unique_service_name, &self.search_socket);
279 }
280
281 /// Handler to be called when multicast socket is readable
282 pub fn multicast_ready(&mut self) {
283 let mut buf = [0u8; 1500];
284 while let Ok((n, wasto, wasfrom)) =
285 self.multicast_socket.receive_to(&mut buf)
286 {
287 self.engine
288 .on_data(&buf[0..n], wasto, wasfrom, Instant::now());
289 }
290 }
291
292 /// Handler to be called when search socket is readable
293 pub fn search_ready(&mut self) {
294 let mut buf = [0u8; 1500];
295 while let Ok((n, wasto, wasfrom)) =
296 self.search_socket.receive_to(&mut buf)
297 {
298 self.engine
299 .on_data(&buf[0..n], wasto, wasfrom, Instant::now());
300 }
301 }
302
303 /// Time before next wakeup
304 pub fn next_wakeup(&self) -> std::time::Duration {
305 self.engine.poll_timeout() - Instant::now()
306 }
307
308 /// Handler to be called when wakeup timer elapses
309 pub fn wakeup(&mut self) {
310 self.engine
311 .handle_timeout(&self.search_socket, Instant::now());
312 }
313}
314
315#[cfg(test)]
316mod tests {
317 use super::*;
318
319 fn my_err() -> std::io::Error {
320 std::io::Error::from(std::io::ErrorKind::Other)
321 }
322
323 fn bogus_register(
324 _: &mio::Registry,
325 _: &mut mio::net::UdpSocket,
326 _: mio::Token,
327 ) -> std::io::Result<()> {
328 Err(my_err())
329 }
330
331 #[test]
332 #[cfg_attr(miri, ignore)]
333 fn instantiate() {
334 const SSDP_TOKEN1: mio::Token = mio::Token(37);
335 const SSDP_TOKEN2: mio::Token = mio::Token(94);
336 let poll = mio::Poll::new().unwrap();
337
338 let _ =
339 Service::new(poll.registry(), (SSDP_TOKEN1, SSDP_TOKEN2)).unwrap();
340 }
341
342 #[test]
343 #[cfg_attr(miri, ignore)]
344 fn service_passes_on_socket_failure() {
345 const SSDP_TOKEN1: mio::Token = mio::Token(37);
346 const SSDP_TOKEN2: mio::Token = mio::Token(94);
347 let poll = mio::Poll::new().unwrap();
348
349 let e = Service::new_inner(
350 poll.registry(),
351 (SSDP_TOKEN1, SSDP_TOKEN2),
352 |_| Err(std::io::Error::new(std::io::ErrorKind::Other, "TEST")),
353 bogus_register,
354 cotton_netif::get_interfaces().unwrap().collect(),
355 );
356
357 assert!(e.is_err());
358 }
359
360 #[test]
361 #[cfg_attr(miri, ignore)]
362 fn service_passes_on_second_socket_failure() {
363 const SSDP_TOKEN1: mio::Token = mio::Token(37);
364 const SSDP_TOKEN2: mio::Token = mio::Token(94);
365 let poll = mio::Poll::new().unwrap();
366
367 let e = Service::new_inner(
368 poll.registry(),
369 (SSDP_TOKEN1, SSDP_TOKEN2),
370 |p| {
371 if p == 0 {
372 Err(std::io::Error::new(std::io::ErrorKind::Other, "TEST"))
373 } else {
374 Ok(std::net::UdpSocket::bind("127.0.0.1:0").unwrap())
375 }
376 },
377 bogus_register,
378 cotton_netif::get_interfaces().unwrap().collect(),
379 );
380
381 assert!(e.is_err());
382 }
383
384 #[test]
385 #[cfg_attr(miri, ignore)]
386 fn service_ok_with_no_netifs() {
387 const SSDP_TOKEN1: mio::Token = mio::Token(37);
388 const SSDP_TOKEN2: mio::Token = mio::Token(94);
389 let poll = mio::Poll::new().unwrap();
390
391 let e = Service::new_inner(
392 poll.registry(),
393 (SSDP_TOKEN1, SSDP_TOKEN2),
394 udp::std::setup_socket,
395 |r, s, t| r.register(s, t, mio::Interest::READABLE),
396 Vec::default(),
397 );
398
399 assert!(e.is_ok());
400 }
401
402 #[test]
403 #[cfg_attr(miri, ignore)]
404 fn service_passes_on_register_failure() {
405 const SSDP_TOKEN1: mio::Token = mio::Token(37);
406 const SSDP_TOKEN2: mio::Token = mio::Token(94);
407 let poll = mio::Poll::new().unwrap();
408
409 let e = Service::new_inner(
410 poll.registry(),
411 (SSDP_TOKEN1, SSDP_TOKEN2),
412 udp::std::setup_socket,
413 bogus_register,
414 cotton_netif::get_interfaces().unwrap().collect(),
415 );
416
417 assert!(e.is_err());
418 }
419
420 #[test]
421 #[cfg_attr(miri, ignore)]
422 fn service_passes_on_second_register_failure() {
423 const SSDP_TOKEN1: mio::Token = mio::Token(37);
424 const SSDP_TOKEN2: mio::Token = mio::Token(94);
425 let poll = mio::Poll::new().unwrap();
426
427 let e = Service::new_inner(
428 poll.registry(),
429 (SSDP_TOKEN1, SSDP_TOKEN2),
430 udp::std::setup_socket,
431 |_, _, t| {
432 if t == SSDP_TOKEN1 {
433 Ok(())
434 } else {
435 Err(my_err())
436 }
437 },
438 cotton_netif::get_interfaces().unwrap().collect(),
439 );
440
441 assert!(e.is_err());
442 }
443}