switchgear_components/discovery/
http.rs

1use crate::discovery::error::DiscoveryBackendStoreError;
2use async_trait::async_trait;
3use reqwest::header::{HeaderMap, HeaderValue};
4use reqwest::{Certificate, Client, ClientBuilder, IntoUrl, StatusCode};
5use rustls::pki_types::CertificateDer;
6use secp256k1::PublicKey;
7use std::time::Duration;
8use switchgear_service_api::discovery::{
9    DiscoveryBackend, DiscoveryBackendPatch, DiscoveryBackendStore, DiscoveryBackends,
10    HttpDiscoveryBackendClient,
11};
12use switchgear_service_api::service::ServiceErrorSource;
13use url::Url;
14
15#[derive(Clone, Debug)]
16pub struct HttpDiscoveryBackendStore {
17    client: Client,
18    discovery_url: String,
19    health_check_url: String,
20}
21
22impl HttpDiscoveryBackendStore {
23    pub fn create<U: IntoUrl>(
24        base_url: U,
25        total_timeout: Duration,
26        connect_timeout: Duration,
27        trusted_roots: &[CertificateDer],
28        authorization: String,
29    ) -> Result<Self, DiscoveryBackendStoreError> {
30        let mut headers = HeaderMap::new();
31        let mut auth_value =
32            HeaderValue::from_str(&format!("Bearer {authorization}")).map_err(|e| {
33                DiscoveryBackendStoreError::internal_error(
34                    ServiceErrorSource::Internal,
35                    format!("creating http client with base url: {}", base_url.as_str()),
36                    e.to_string(),
37                )
38            })?;
39        auth_value.set_sensitive(true);
40        headers.insert(reqwest::header::AUTHORIZATION, auth_value);
41
42        let mut builder = ClientBuilder::new();
43
44        for root in trusted_roots {
45            let root = Certificate::from_der(root).map_err(|e| {
46                DiscoveryBackendStoreError::internal_error(
47                    ServiceErrorSource::Internal,
48                    format!("parsing certificate for url: {}", base_url.as_str()),
49                    e.to_string(),
50                )
51            })?;
52            builder = builder.add_root_certificate(root);
53        }
54
55        let client = builder
56            .default_headers(headers)
57            .use_rustls_tls()
58            .timeout(total_timeout)
59            .connect_timeout(connect_timeout)
60            .build()
61            .map_err(|e| {
62                DiscoveryBackendStoreError::http_error(
63                    ServiceErrorSource::Internal,
64                    format!("creating http client with base url: {}", base_url.as_str()),
65                    e,
66                )
67            })?;
68        Self::with_client(client, base_url)
69    }
70
71    pub fn with_client<U: IntoUrl>(
72        client: Client,
73        base_url: U,
74    ) -> Result<Self, DiscoveryBackendStoreError> {
75        let base_url = base_url.as_str().trim_end_matches('/').to_string();
76        let discovery_url = format!("{base_url}/discovery");
77        Url::parse(&discovery_url).map_err(|e| {
78            DiscoveryBackendStoreError::internal_error(
79                ServiceErrorSource::Upstream,
80                format!("parsing service url {discovery_url}"),
81                e.to_string(),
82            )
83        })?;
84
85        let health_check_url = format!("{base_url}/health");
86        Url::parse(&health_check_url).map_err(|e| {
87            DiscoveryBackendStoreError::internal_error(
88                ServiceErrorSource::Upstream,
89                format!("parsing service url {health_check_url}"),
90                e.to_string(),
91            )
92        })?;
93
94        Ok(Self {
95            client,
96            discovery_url,
97            health_check_url,
98        })
99    }
100
101    fn discovery_public_key_url(&self, public_key: &PublicKey) -> String {
102        format!("{}/{}", self.discovery_url, public_key)
103    }
104
105    fn general_error(status: StatusCode, context: &str) -> DiscoveryBackendStoreError {
106        if status.is_success() {
107            return DiscoveryBackendStoreError::internal_error(
108                ServiceErrorSource::Upstream,
109                context.to_string(),
110                format!("unexpected http status {status}"),
111            );
112        }
113        if status.is_client_error() {
114            return DiscoveryBackendStoreError::invalid_input_error(
115                context.to_string(),
116                format!("invalid input, http status: {status}"),
117            );
118        }
119        DiscoveryBackendStoreError::http_status_error(
120            ServiceErrorSource::Upstream,
121            context.to_string(),
122            status.as_u16(),
123        )
124    }
125}
126
127#[async_trait]
128impl DiscoveryBackendStore for HttpDiscoveryBackendStore {
129    type Error = DiscoveryBackendStoreError;
130
131    async fn get(&self, public_key: &PublicKey) -> Result<Option<DiscoveryBackend>, Self::Error> {
132        let url = self.discovery_public_key_url(public_key);
133
134        let response = self.client.get(&url).send().await.map_err(|e| {
135            DiscoveryBackendStoreError::http_error(
136                ServiceErrorSource::Upstream,
137                format!("get backend {url}"),
138                e,
139            )
140        })?;
141
142        match response.status() {
143            StatusCode::OK => {
144                let backend: DiscoveryBackend = response.json().await.map_err(|e| {
145                    DiscoveryBackendStoreError::deserialization_error(
146                        ServiceErrorSource::Upstream,
147                        format!("parse backend {url}"),
148                        e,
149                    )
150                })?;
151                Ok(Some(backend))
152            }
153            StatusCode::NOT_FOUND => Ok(None),
154            status => Err(Self::general_error(status, &format!("get backend {url}"))),
155        }
156    }
157
158    async fn get_all(&self, requested_etag: Option<u64>) -> Result<DiscoveryBackends, Self::Error> {
159        let url = &self.discovery_url;
160        let client = self.client.get(url);
161        let client = if let Some(requested_etag) = requested_etag {
162            client.header(
163                reqwest::header::IF_NONE_MATCH,
164                hex::encode(requested_etag.to_be_bytes()),
165            )
166        } else {
167            client
168        };
169        let response = client.send().await.map_err(|e| {
170            DiscoveryBackendStoreError::http_error(
171                ServiceErrorSource::Upstream,
172                format!("get all backends {url}"),
173                e,
174            )
175        })?;
176
177        let response_etag = response
178            .headers()
179            .get(reqwest::header::ETAG)
180            .ok_or_else(|| {
181                DiscoveryBackendStoreError::internal_error(
182                    ServiceErrorSource::Upstream,
183                    format!("parsing etag header response from get all backends {url}"),
184                    "missing expected etag".to_string(),
185                )
186            })?
187            .to_str()
188            .map_err(|e| {
189                DiscoveryBackendStoreError::internal_error(
190                    ServiceErrorSource::Upstream,
191                    format!("parsing etag header response from get all backends {url}"),
192                    e.to_string(),
193                )
194            })?;
195
196        let response_etag = DiscoveryBackends::etag_from_str(response_etag).map_err(|e| {
197            DiscoveryBackendStoreError::internal_error(
198                ServiceErrorSource::Upstream,
199                format!(
200                    "parsing etag '{response_etag}' header response from get all backends {url}"
201                ),
202                e.to_string(),
203            )
204        })?;
205
206        match response.status() {
207            StatusCode::OK => {
208                let backends: Vec<DiscoveryBackend> = response.json().await.map_err(|e| {
209                    DiscoveryBackendStoreError::deserialization_error(
210                        ServiceErrorSource::Upstream,
211                        format!("parse all backends {url}"),
212                        e,
213                    )
214                })?;
215
216                Ok(DiscoveryBackends {
217                    etag: response_etag,
218                    backends: Some(backends),
219                })
220            }
221            StatusCode::NOT_MODIFIED => Ok(DiscoveryBackends {
222                etag: response_etag,
223                backends: None,
224            }),
225            status => Err(Self::general_error(
226                status,
227                &format!("get all backends {url}"),
228            )),
229        }
230    }
231
232    async fn post(&self, backend: DiscoveryBackend) -> Result<Option<PublicKey>, Self::Error> {
233        let response = self
234            .client
235            .post(&self.discovery_url)
236            .json(&backend)
237            .send()
238            .await
239            .map_err(|e| {
240                DiscoveryBackendStoreError::http_error(
241                    ServiceErrorSource::Upstream,
242                    format!(
243                        "post backend: {}, url: {}",
244                        backend.public_key, &self.discovery_url
245                    ),
246                    e,
247                )
248            })?;
249
250        match response.status() {
251            StatusCode::CREATED => Ok(Some(backend.public_key)),
252            StatusCode::CONFLICT => Ok(None),
253            status => Err(Self::general_error(
254                status,
255                &format!(
256                    "post backend: {}, url: {}",
257                    backend.public_key, &self.discovery_url
258                ),
259            )),
260        }
261    }
262
263    async fn put(&self, backend: DiscoveryBackend) -> Result<bool, Self::Error> {
264        let url = self.discovery_public_key_url(&backend.public_key);
265
266        let response = self
267            .client
268            .put(&url)
269            .json(&backend.backend)
270            .send()
271            .await
272            .map_err(|e| {
273                DiscoveryBackendStoreError::http_error(
274                    ServiceErrorSource::Upstream,
275                    format!("put backend {url}"),
276                    e,
277                )
278            })?;
279
280        match response.status() {
281            StatusCode::NO_CONTENT => Ok(false),
282            StatusCode::CREATED => Ok(true),
283            status => Err(Self::general_error(status, &format!("put backend {url}"))),
284        }
285    }
286
287    async fn patch(&self, backend: DiscoveryBackendPatch) -> Result<bool, Self::Error> {
288        let url = self.discovery_public_key_url(&backend.public_key);
289
290        let response = self
291            .client
292            .patch(&url)
293            .json(&backend.backend)
294            .send()
295            .await
296            .map_err(|e| {
297                DiscoveryBackendStoreError::http_error(
298                    ServiceErrorSource::Upstream,
299                    format!("patch backend {url}"),
300                    e,
301                )
302            })?;
303
304        match response.status() {
305            StatusCode::NO_CONTENT => Ok(true),
306            StatusCode::NOT_FOUND => Ok(false),
307            status => Err(Self::general_error(status, &format!("patch backend {url}"))),
308        }
309    }
310
311    async fn delete(&self, public_key: &PublicKey) -> Result<bool, Self::Error> {
312        let url = self.discovery_public_key_url(public_key);
313
314        let response = self.client.delete(&url).send().await.map_err(|e| {
315            DiscoveryBackendStoreError::http_error(
316                ServiceErrorSource::Upstream,
317                format!("delete backend {url}"),
318                e,
319            )
320        })?;
321
322        match response.status() {
323            StatusCode::NO_CONTENT => Ok(true),
324            StatusCode::NOT_FOUND => Ok(false),
325            status => Err(Self::general_error(
326                status,
327                &format!("delete backend: {url}"),
328            )),
329        }
330    }
331}
332
333#[async_trait]
334impl HttpDiscoveryBackendClient for HttpDiscoveryBackendStore {
335    async fn health(&self) -> Result<(), Self::Error> {
336        let response = self
337            .client
338            .get(&self.health_check_url)
339            .send()
340            .await
341            .map_err(|e| {
342                DiscoveryBackendStoreError::http_error(
343                    ServiceErrorSource::Upstream,
344                    "health check",
345                    e,
346                )
347            })?;
348        if !response.status().is_success() {
349            return Err(DiscoveryBackendStoreError::http_status_error(
350                ServiceErrorSource::Upstream,
351                "health check",
352                response.status().as_u16(),
353            ));
354        }
355        Ok(())
356    }
357}
358
359#[cfg(test)]
360mod tests {
361    use crate::discovery::http::HttpDiscoveryBackendStore;
362    use anyhow::anyhow;
363    use rand::Rng;
364    use secp256k1::{PublicKey, Secp256k1, SecretKey};
365    use url::Url;
366
367    #[test]
368    fn base_urls() {
369        let _ = rustls::crypto::aws_lc_rs::default_provider()
370            .install_default()
371            .map_err(|_| anyhow!("failed to stand up rustls encryption platform"));
372
373        let client = HttpDiscoveryBackendStore::with_client(
374            reqwest::Client::default(),
375            Url::parse("https://base.com").unwrap(),
376        )
377        .unwrap();
378
379        assert_eq!(&client.discovery_url, "https://base.com/discovery");
380
381        let client = HttpDiscoveryBackendStore::with_client(
382            reqwest::Client::default(),
383            Url::parse("https://base.com/").unwrap(),
384        )
385        .unwrap();
386
387        assert_eq!(&client.discovery_url, "https://base.com/discovery");
388
389        assert_eq!(&client.health_check_url, "https://base.com/health");
390
391        let secp = Secp256k1::new();
392        let mut rng = rand::thread_rng();
393
394        let secret_key = SecretKey::from_byte_array(rng.gen::<[u8; 32]>()).unwrap();
395        let public_key = PublicKey::from_secret_key(&secp, &secret_key);
396
397        let discovery_partition_public_key_url = client.discovery_public_key_url(&public_key);
398        assert_eq!(
399            format!("https://base.com/discovery/{public_key}"),
400            discovery_partition_public_key_url,
401        );
402    }
403}