cotton_ssdp/
service.rs

1use crate::engine::{Callback, Engine};
2use crate::refresh_timer::StdTimebase;
3use crate::udp;
4use crate::udp::TargetedReceive;
5use crate::{Advertisement, Notification};
6use rand::RngCore;
7use std::time::Instant;
8
9struct SyncCallback {
10    callback: Box<dyn Fn(&Notification)>,
11}
12
13impl Callback for SyncCallback {
14    fn on_notification(&self, r: &Notification) {
15        (self.callback)(r);
16    }
17}
18
19/** High-level reactor-style SSDP service using mio.
20
21Use a `Service` to discover network resources using SSDP, or to advertise
22network resources which your program provides. Or both.
23
24The implementation integrates with the [`mio`] crate, which provides a
25"reactor-style" I/O API suited for running several I/O operations in a
single thread. (SSDP, being relatively low-bandwidth and non-urgent,
is unlikely to need a higher-performance I/O solution.)
28An alternative mechanism, [`crate::AsyncService`], integrates directly
29with [`tokio`] instead of [`mio`].
30
31The implementation requires _two_ UDP sockets: one bound to the
32well-known SSDP port number (1900) which subscribes to the multicast
33group, and a second bound to a random port for sending unicast
34searches and receiving unicast replies. (It would be possible to get
35by with a single socket if cotton-ssdp knew it was the _only_ SSDP
36implementation running on that IP address -- but if there might be
37other implementations running, it needs its own search socket in order
38not to steal other applications' packets.)
39
40For that reason, _two_ MIO tokens are required; these should be passed
41to [`Service::new`], which takes care of registering them with the MIO
42poller. Likewise, the main polling loop can indicate readiness on
43either token at any time, and the corresponding "`*_ready`" method on
44`Service` -- [`Service::multicast_ready`] or [`Service::search_ready`]
45-- should be called in response. All this can be seen in [the
46ssdp-search-mio
47example](https://github.com/pdh11/cotton/blob/main/cotton-ssdp/examples/ssdp-search-mio.rs),
48from which the example code below is adapted.
49
50# Example subscriber
51
52This code starts a search for _all_ SSDP resources on the local
53network, from all network interfaces, and stores unique ones in a
54`HashMap`. The map will be populated as the MIO polling loop runs.
55
56```rust
57# use cotton_ssdp::*;
58# use std::collections::HashMap;
59# use std::cell::RefCell;
60# const SSDP_TOKEN1: mio::Token = mio::Token(0);
61# const SSDP_TOKEN2: mio::Token = mio::Token(1);
62# let mut poll = mio::Poll::new().unwrap();
63# #[cfg(not(miri))]
64# let mut ssdp = Service::new(poll.registry(), (SSDP_TOKEN1, SSDP_TOKEN2)).unwrap();
65# #[cfg(not(miri))]
66    let map = RefCell::new(HashMap::new());
67# #[cfg(not(miri))]
68    ssdp.subscribe(
69        "ssdp:all",
70        Box::new(move |r| {
71            let mut m = map.borrow_mut();
72            if let Notification::Alive {
73                ref notification_type,
74                ref unique_service_name,
75                ref location,
76            } = r {
77                if !m.contains_key(unique_service_name) {
78                    m.insert(unique_service_name.clone(), r.clone());
79                }
80            }
81        }),
82    );
83```
84
85# Example advertiser
86
87This code sets up an advertisement for a (fictitious) resource,
88ostensibly available over HTTP on port 3333. The actual advertisements
89will be sent (and any incoming searches replied to) as the MIO polling
90loop runs.
91
92(The [UPnP Device
93Architecture](https://openconnectivity.org/developer/specifications/upnp-resources/upnp/archive-of-previously-published-upnp-device-architectures/)
94specifies exactly what to advertise in the case of a _UPnP_ implementation;
95this simpler example is not in itself compliant with that document.)
96
97```rust
98# use cotton_ssdp::*;
99# const SSDP_TOKEN1: mio::Token = mio::Token(0);
100# const SSDP_TOKEN2: mio::Token = mio::Token(1);
101# let mut poll = mio::Poll::new().unwrap();
102# #[cfg(not(miri))]
103# let mut ssdp = Service::new(poll.registry(), (SSDP_TOKEN1, SSDP_TOKEN2)).unwrap();
104    let uuid = uuid::Uuid::new_v4();
105# #[cfg(not(miri))]
106    ssdp.advertise(
107        uuid.to_string(),
108        cotton_ssdp::Advertisement {
109            notification_type: "test".to_string(),
110            location: "http://127.0.0.1:3333/test".to_string(),
111        },
112    );
113```
114
115Notice that the URL in the `location` field uses the localhost IP
116address. The `Service` itself takes care of rewriting that, on a
117per-network-interface basis, to the IP address on which each SSDP
118peer will be able to reach the host where the `Service` is
119running. For instance, if your Ethernet IP address is 192.168.1.3,
120and your wifi IP address is 10.0.4.7, anyone listening to SSDP on
121Ethernet will see `http://192.168.1.3:3333/test` and anyone listening on
122wifi will see `http://10.0.4.7:3333/test`. (For how this is done, see the use
123of `rewrite_host` in [`Engine::on_data`].)
124
125# The polling loop
126
127The actual MIO polling loop, mentioned above, should be written in the
128standard way common to all MIO applications. For instance, it might look like
129this:
130
131```rust
132# use cotton_ssdp::*;
133# use std::collections::HashMap;
134# use std::cell::RefCell;
135# const SSDP_TOKEN1: mio::Token = mio::Token(0);
136# const SSDP_TOKEN2: mio::Token = mio::Token(1);
137# let mut poll = mio::Poll::new().unwrap();
138# let mut events = mio::Events::with_capacity(128);
139# #[cfg(not(miri))]
140# let mut ssdp = Service::new(poll.registry(), (SSDP_TOKEN1, SSDP_TOKEN2)).unwrap();
141# #[cfg(not(miri))]
142    loop {
143        poll.poll(&mut events, Some(ssdp.next_wakeup())).unwrap();
144
145        if ssdp.next_wakeup() == std::time::Duration::ZERO {
146            ssdp.wakeup();
147        }
148
149        for event in &events {
150            match event.token() {
151                SSDP_TOKEN1 => ssdp.multicast_ready(),
152                SSDP_TOKEN2 => ssdp.search_ready(),
153                // ... other tokens as required by the application ...
154                _ => (),
155            }
156        }
157#       break;
158    }
159```
160
161*/
pub struct Service {
    /// Protocol engine: receives data read from both sockets, and is
    /// lent the search socket when subscribing, advertising, or handling
    /// a timeout
    engine: Engine<SyncCallback, StdTimebase>,
    /// Socket created for port 1900; drained by
    /// [`Service::multicast_ready`]
    multicast_socket: mio::net::UdpSocket,
    /// Socket created for port 0 (an ephemeral port); drained by
    /// [`Service::search_ready`]
    search_socket: mio::net::UdpSocket,
}
167
/// The type of [`udp::std::setup_socket`]: given a `u16` (used by
/// `Service::new_inner` as the port number), produce a UDP socket
type SocketFn = fn(u16) -> std::io::Result<std::net::UdpSocket>;
170
171/// The type of [`mio::Registry::register`]
172type RegisterFn = fn(
173    &mio::Registry,
174    &mut mio::net::UdpSocket,
175    mio::Token,
176) -> std::io::Result<()>;
177
178impl Service {
179    fn new_inner(
180        registry: &mio::Registry,
181        tokens: (mio::Token, mio::Token),
182        socket: SocketFn,
183        register: RegisterFn,
184        interfaces: Vec<cotton_netif::NetworkEvent>,
185    ) -> Result<Self, std::io::Error> {
186        let mut multicast_socket =
187            mio::net::UdpSocket::from_std(socket(1900u16)?);
188        let mut search_socket = mio::net::UdpSocket::from_std(socket(0u16)?); // ephemeral port
189        let mut engine = Engine::<SyncCallback, StdTimebase>::new(
190            rand::thread_rng().next_u32(),
191            Instant::now(),
192        );
193
194        for netif in interfaces {
195            // Ignore errors -- some interfaces are returned on which
196            // join_multicast failes (lxcbr0)
197            _ = engine.on_network_event(
198                &netif,
199                &multicast_socket,
200                &search_socket,
201            );
202        }
203
204        register(registry, &mut multicast_socket, tokens.0)?;
205        register(registry, &mut search_socket, tokens.1)?;
206
207        Ok(Self {
208            engine,
209            multicast_socket,
210            search_socket,
211        })
212    }
213
214    /// Create a new `Service`, including its two UDP sockets
215    ///
216    /// And registers the sockets with the [`mio::Registry`]
217    ///
218    /// # Errors
219    ///
220    /// Can return a `std::io::Error` if any of the underlying socket
221    /// calls fail.
222    ///
223    pub fn new(
224        registry: &mio::Registry,
225        tokens: (mio::Token, mio::Token),
226    ) -> Result<Self, std::io::Error> {
227        Self::new_inner(
228            registry,
229            tokens,
230            udp::std::setup_socket,
231            |r, s, t| r.register(s, t, mio::Interest::READABLE),
232            cotton_netif::get_interfaces()?.collect(),
233        )
234    }
235
236    /// Subscribe to notifications about a particular service type
237    ///
238    /// Or subscribe to "ssdp:all" for notifications about *all* service
239    /// types.
240    ///
241    /// This call also sends fresh search messages.
242    pub fn subscribe<A>(
243        &mut self,
244        notification_type: A,
245        callback: Box<dyn Fn(&Notification)>,
246    ) where
247        A: Into<String>,
248    {
249        self.engine.subscribe(
250            notification_type.into(),
251            SyncCallback { callback },
252            &self.search_socket,
253        );
254    }
255
256    /// Advertise a local resource on the network
257    pub fn advertise<USN>(
258        &mut self,
259        unique_service_name: USN,
260        advertisement: Advertisement,
261    ) where
262        USN: Into<String>,
263    {
264        self.engine.advertise(
265            unique_service_name.into(),
266            advertisement,
267            &self.search_socket,
268        );
269    }
270
271    /// Withdraw an advertisement for a local resource
272    ///
273    /// For instance, it is "polite" to call this if shutting down
274    /// cleanly.
275    ///
276    pub fn deadvertise(&mut self, unique_service_name: &str) {
277        self.engine
278            .deadvertise(unique_service_name, &self.search_socket);
279    }
280
281    /// Handler to be called when multicast socket is readable
282    pub fn multicast_ready(&mut self) {
283        let mut buf = [0u8; 1500];
284        while let Ok((n, wasto, wasfrom)) =
285            self.multicast_socket.receive_to(&mut buf)
286        {
287            self.engine
288                .on_data(&buf[0..n], wasto, wasfrom, Instant::now());
289        }
290    }
291
292    /// Handler to be called when search socket is readable
293    pub fn search_ready(&mut self) {
294        let mut buf = [0u8; 1500];
295        while let Ok((n, wasto, wasfrom)) =
296            self.search_socket.receive_to(&mut buf)
297        {
298            self.engine
299                .on_data(&buf[0..n], wasto, wasfrom, Instant::now());
300        }
301    }
302
303    /// Time before next wakeup
304    pub fn next_wakeup(&self) -> std::time::Duration {
305        self.engine.poll_timeout() - Instant::now()
306    }
307
308    /// Handler to be called when wakeup timer elapses
309    pub fn wakeup(&mut self) {
310        self.engine
311            .handle_timeout(&self.search_socket, Instant::now());
312    }
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    /// Token passed as the multicast token in every test
    const SSDP_TOKEN1: mio::Token = mio::Token(37);
    /// Token passed as the search token in every test
    const SSDP_TOKEN2: mio::Token = mio::Token(94);

    /// An I/O error with no particular meaning, for injecting failures
    fn my_err() -> std::io::Error {
        std::io::ErrorKind::Other.into()
    }

    /// A [`RegisterFn`] which refuses every registration
    fn bogus_register(
        _: &mio::Registry,
        _: &mut mio::net::UdpSocket,
        _: mio::Token,
    ) -> std::io::Result<()> {
        Err(my_err())
    }

    /// The real network interfaces of the machine running the tests
    fn real_interfaces() -> Vec<cotton_netif::NetworkEvent> {
        cotton_netif::get_interfaces().unwrap().collect()
    }

    // The public constructor works end-to-end
    #[test]
    #[cfg_attr(miri, ignore)]
    fn instantiate() {
        let poll = mio::Poll::new().unwrap();
        let tokens = (SSDP_TOKEN1, SSDP_TOKEN2);

        drop(Service::new(poll.registry(), tokens).unwrap());
    }

    // A failure creating the first (multicast) socket is reported
    #[test]
    #[cfg_attr(miri, ignore)]
    fn service_passes_on_socket_failure() {
        let poll = mio::Poll::new().unwrap();

        let result = Service::new_inner(
            poll.registry(),
            (SSDP_TOKEN1, SSDP_TOKEN2),
            |_| Err(std::io::Error::new(std::io::ErrorKind::Other, "TEST")),
            bogus_register,
            real_interfaces(),
        );

        assert!(result.is_err());
    }

    // A failure creating the second (port 0, search) socket is reported
    #[test]
    #[cfg_attr(miri, ignore)]
    fn service_passes_on_second_socket_failure() {
        let poll = mio::Poll::new().unwrap();

        let result = Service::new_inner(
            poll.registry(),
            (SSDP_TOKEN1, SSDP_TOKEN2),
            |port| match port {
                0 => {
                    Err(std::io::Error::new(std::io::ErrorKind::Other, "TEST"))
                }
                _ => Ok(std::net::UdpSocket::bind("127.0.0.1:0").unwrap()),
            },
            bogus_register,
            real_interfaces(),
        );

        assert!(result.is_err());
    }

    // Construction succeeds even when there are no interfaces at all
    #[test]
    #[cfg_attr(miri, ignore)]
    fn service_ok_with_no_netifs() {
        let poll = mio::Poll::new().unwrap();

        let result = Service::new_inner(
            poll.registry(),
            (SSDP_TOKEN1, SSDP_TOKEN2),
            udp::std::setup_socket,
            |registry, socket, token| {
                registry.register(socket, token, mio::Interest::READABLE)
            },
            Vec::new(),
        );

        assert!(result.is_ok());
    }

    // A failure registering the first socket is reported
    #[test]
    #[cfg_attr(miri, ignore)]
    fn service_passes_on_register_failure() {
        let poll = mio::Poll::new().unwrap();

        let result = Service::new_inner(
            poll.registry(),
            (SSDP_TOKEN1, SSDP_TOKEN2),
            udp::std::setup_socket,
            bogus_register,
            real_interfaces(),
        );

        assert!(result.is_err());
    }

    // A failure registering only the second socket is reported
    #[test]
    #[cfg_attr(miri, ignore)]
    fn service_passes_on_second_register_failure() {
        let poll = mio::Poll::new().unwrap();

        let result = Service::new_inner(
            poll.registry(),
            (SSDP_TOKEN1, SSDP_TOKEN2),
            udp::std::setup_socket,
            |_, _, token| {
                if token != SSDP_TOKEN1 {
                    return Err(my_err());
                }
                Ok(())
            },
            real_interfaces(),
        );

        assert!(result.is_err());
    }
}
443}