simple_ssdp/
client.rs

1use std::net::IpAddr;
2use std::net::Ipv4Addr;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use std::sync::Mutex;
6use std::time::Duration;
7
8use log::debug;
9use log::trace;
10use tokio::net::UdpSocket;
11use tokio::time::timeout;
12
13use crate::http_helper::generate_ssdp_discover;
14use crate::service::ServiceDescription;
15use crate::socket_helper::join_socket;
16use crate::MulticastAddr;
17use crate::SSDP_PORT;
18
19/// The SSDP Client
20pub struct Client {
21    /// List of Services found by the Client
22    ///
23    /// | USN URI          | Service Type URI | Expiration | Location          |
24    /// |------------------|------------------|------------|-------------------|
25    /// | upnp:uuid:k91... | upnp:clockradio  | 3 days     | http://foo.com/cr |
26    /// | uuid:x7z...      | ms:wince         | 1 week     | http://msce/win   |
27    services: Arc<Mutex<Vec<ServiceDescription>>>,
28
29    /// Timeout - used to wait for incoming answers
30    timeout: Duration,
31}
32
33impl Default for Client {
34    fn default() -> Self {
35        Self {
36            services: Arc::new(Mutex::new(vec![])),
37            timeout: Duration::from_secs(5),
38        }
39    }
40}
41
42impl Client {
43    // TODO there could be a blocking channel which also receives the NOTIFY and BYEBYE calls
44    /// Discover SSDP Services
45    /// - `identifier`: The unique Identifier for this Client e.g. `uuid:83760048-2d32-4e48-854f-f63a8fa9fd09`
46    /// - `address`: In which scope do you want to scan?
47    /// - `search`: `ssdp:all` to find all SSDP Services or custom Service Types to look for
48    pub async fn discover(
49        &self,
50        identifier: String,
51        address: MulticastAddr,
52        search: String,
53    ) -> Result<(), Box<dyn std::error::Error>> {
54        // Create a UDP socket
55        let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
56        let socket = Arc::new(UdpSocket::bind(local_addr).await?);
57        let multicast_addr: SocketAddr = SocketAddr::new(address.get_ip(), SSDP_PORT);
58
59        join_socket(&address, socket.clone())?;
60
61        let discover_message = generate_ssdp_discover(identifier, search, &address);
62
63        // Multicast search request
64        socket
65            .send_to(discover_message.as_bytes(), &multicast_addr)
66            .await?;
67
68        // Create a buffer to store the received data
69        let mut buf = vec![0; 1024];
70
71        // Listen for service replies
72        // Define a timeout duration for listening for responses
73        let start = tokio::time::Instant::now();
74
75        // Listen for service replies until timeout
76        while start.elapsed() < self.timeout {
77            match timeout(
78                self.timeout - start.elapsed(),
79                socket.recv_from(&mut buf),
80            )
81            .await
82            {
83                Ok(Ok((len, addr))) => {
84                    let response_message = String::from_utf8_lossy(&buf[..len]).to_string();
85                    trace!(
86                        "Received {} bytes from {}: {:#?}",
87                        len,
88                        addr,
89                        response_message
90                    );
91
92                    let mut headers = [httparse::EMPTY_HEADER; 64];
93                    let mut resp = httparse::Response::new(&mut headers);
94
95                    if resp.parse(&buf[..len]).is_err() {
96                        trace!("Could not parse request to HTTP");
97                        continue;
98                    }
99
100                    if resp.code.is_none() || resp.code.is_some_and(|code| 200 != code) {
101                        trace!("Response Code is not 200");
102                        continue;
103                    }
104
105                    // TODO the response must be in exact order right now
106                    let usn = resp.headers.get(4);
107                    if usn.is_none() || usn.is_some_and(|usn| usn.name != "USN") {
108                        trace!("USN header is not present");
109                        continue;
110                    }
111
112                    let st = resp.headers.get(3);
113                    if st.is_none() || st.is_some_and(|st| st.name != "ST") {
114                        trace!("ST header is not present");
115                        continue;
116                    }
117
118                    let al = resp.headers.get(5);
119                    if al.is_none() || al.is_some_and(|al| al.name != "AL") {
120                        trace!("ST header is not present");
121                        continue;
122                    }
123
124                    let new_service = ServiceDescription {
125                        usn_uri: String::from_utf8_lossy(usn.expect("Error with USN-Header").value)
126                            .into(),
127                        service_type_uri: String::from_utf8_lossy(
128                            st.expect("Error with ST-Header").value,
129                        )
130                        .into(),
131                        expiration: 100, // TODO needs to get parsed from max-age=<someint>
132                        location: String::from_utf8_lossy(al.expect("Error with AL-Header").value)
133                            .into(), // TODO is: <some:service><http://foo/bar> but should be http://foo/bar
134                    };
135
136                    let mut services_guard = self.services.lock().unwrap();
137                    let mut found = false;
138
139                    for service in services_guard.iter_mut() {
140                        if service.usn_uri == new_service.usn_uri {
141                            *service = new_service.clone();
142                            found = true;
143                            break;
144                        }
145                    }
146
147                    if !found {
148                        services_guard.push(new_service);
149                    }
150
151                    buf.clear();
152                    buf.resize(1024, 0);
153                }
154                Ok(Err(e)) => {
155                    trace!("Error receiving response: {}", e);
156                }
157                Err(_) => {
158                    break; // Timeout reached
159                }
160            }
161        }
162
163        debug!("Services found: {:#?}", self.services);
164
165        Ok(())
166    }
167    
168    /// Retrieve a list of all Services that answered to our multicast call
169    pub fn get_services(&self) -> Vec<ServiceDescription> {
170        self.services.lock().unwrap().clone()
171    }
172
173    /// Changes the timeout
174    pub fn set_timeout(&mut self, timeout: Duration) -> &Self {
175        self.timeout = timeout;
176
177        self
178    }
179}