avassa_client/
client.rs

1use crate::strongbox;
2use crate::volga;
3use crate::{Error, Result};
4use bytes::Bytes;
5use serde_json::json;
6
7#[derive(Clone, serde::Deserialize)]
8#[serde(rename_all = "kebab-case")]
9struct LoginToken {
10    token: String,
11    expires_in: i64,
12    expires: chrono::DateTime<chrono::FixedOffset>,
13    creation_time: chrono::DateTime<chrono::FixedOffset>,
14}
15
16impl LoginToken {
17    fn renew_at(&self) -> chrono::DateTime<chrono::FixedOffset> {
18        self.expires - chrono::Duration::seconds(self.expires_in / 4)
19    }
20}
21
22impl std::fmt::Debug for LoginToken {
23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24        f.debug_struct("LoginToken")
25            .field("expires_in", &self.expires_in)
26            .field("creation_time", &self.creation_time)
27            .finish_non_exhaustive()
28    }
29}
30
31#[derive(Debug)]
32struct ClientState {
33    login_token: LoginToken,
34}
35
36/// Builder for an Avassa [`Client`]
37#[derive(Clone)]
38#[allow(clippy::struct_excessive_bools)]
39pub struct ClientBuilder {
40    reqwest_ca: Vec<reqwest::Certificate>,
41    tls_ca: tokio_rustls::rustls::RootCertStore,
42    disable_cert_verification: bool,
43    connection_verbose: bool,
44    auto_renew_token: bool,
45    timeout: Option<core::time::Duration>,
46    connect_timeout: Option<core::time::Duration>,
47}
48
49impl ClientBuilder {
50    /// Create a new builder instance
51    #[must_use]
52    pub(crate) fn new() -> Self {
53        let tls_ca = webpki_roots::TLS_SERVER_ROOTS.iter().cloned().collect();
54        Self {
55            reqwest_ca: Vec::new(),
56            tls_ca,
57            disable_cert_verification: false,
58            connection_verbose: false,
59            auto_renew_token: true,
60            timeout: None,
61            connect_timeout: None,
62        }
63    }
64
65    /// Enables a request timeout
66    #[must_use]
67    pub fn timeout(self, timeout: core::time::Duration) -> Self {
68        Self {
69            timeout: Some(timeout),
70            ..self
71        }
72    }
73
74    /// Set a timeout for only the connect phase of a Client
75    #[must_use]
76    pub fn connection_timeout(self, timeout: core::time::Duration) -> Self {
77        Self {
78            connect_timeout: Some(timeout),
79            ..self
80        }
81    }
82
83    /// Add a root certificate for API certificate verification
84    pub fn add_root_certificate(mut self, cert: &[u8]) -> Result<Self> {
85        use std::iter;
86        let r_ca = reqwest::Certificate::from_pem(cert)?;
87        let mut ca_reader = std::io::BufReader::new(cert);
88        for item in iter::from_fn(|| rustls_pemfile::read_one(&mut ca_reader).transpose()) {
89            if let rustls_pemfile::Item::X509Certificate(cert) = item? {
90                self.tls_ca.add(cert)?;
91            }
92        }
93        self.reqwest_ca.push(r_ca);
94        Ok(self)
95    }
96
97    /// Disable certificate verification
98    #[must_use]
99    pub fn danger_disable_cert_verification(self) -> Self {
100        Self {
101            disable_cert_verification: true,
102            ..self
103        }
104    }
105
106    /// Enabling this option will emit log messages at the TRACE level for read and write operations
107    /// on the https client
108    #[must_use]
109    pub fn enable_verbose_connection(self) -> Self {
110        Self {
111            connection_verbose: true,
112            ..self
113        }
114    }
115
116    /// Disable auto renewal of authentication token
117    #[must_use]
118    pub fn disable_token_auto_renewal(self) -> Self {
119        Self {
120            auto_renew_token: false,
121            ..self
122        }
123    }
124
125    /// Login the application from secret set in the environment
126    /// `approle_id` can optionally be provided
127    /// This assumes the environment variable `APPROLE_SECRET_ID` is set by the system.
128    #[deprecated]
129    pub async fn application_login(&self, host: &str, approle_id: Option<&str>) -> Result<Client> {
130        let secret_id = std::env::var("APPROLE_SECRET_ID")
131            .map_err(|_| Error::LoginFailureMissingEnv(String::from("APPROLE_SECRET_ID")))?;
132
133        // If no app role is provided, we can try to use the secret id as app role.
134        let role_id = approle_id.unwrap_or(&secret_id);
135
136        let base_url = url::Url::parse(host)?;
137        let url = base_url.join("v1/approle-login")?;
138        let data = json!({
139            "role-id": role_id,
140            "secret-id": secret_id,
141        });
142        Client::do_login(self, base_url, url, data).await
143    }
144
145    /// Login using approle
146    pub async fn approle_login(
147        &self,
148        host: &str,
149        secret_id: &str,
150        role_id: Option<&str>,
151    ) -> Result<Client> {
152        // If no role id is provided, use the secret id;
153        let role_id = role_id.unwrap_or(secret_id);
154
155        let base_url = url::Url::parse(host)?;
156        let url = base_url.join("v1/approle-login")?;
157        let data = json!({
158            "role-id": role_id,
159            "secret-id": secret_id,
160        });
161        Client::do_login(self, base_url, url, data).await
162    }
163
164    /// Login to an avassa Control Tower or Edge Enforcer instance. If possible,
165    /// please use the `application_login` as no credentials needs to be distributed.
166    #[tracing::instrument(skip(self, password))]
167    pub async fn login(&self, host: &str, username: &str, password: &str) -> Result<Client> {
168        let base_url = url::Url::parse(host)?;
169        let url = base_url.join("v1/login")?;
170
171        // If we have a tenant, send it.
172        let data = json!({
173            "username":username,
174            "password":password
175        });
176        Client::do_login(self, base_url, url, data).await
177    }
178
179    /// Login using an existing bearer token
180    #[tracing::instrument(skip(self, token))]
181    pub fn token_login(&self, host: &str, token: &str) -> Result<Client> {
182        let base_url = url::Url::parse(host)?;
183        Client::new_from_token(self, base_url, token)
184    }
185}
186
187impl Default for ClientBuilder {
188    fn default() -> Self {
189        Self::new()
190    }
191}
192
193/// The `Client` is used for all interaction with Control Tower or Edge Enforcer instances.
194/// Use one of the login functions to create an instance.
195#[derive(Clone)]
196pub struct Client {
197    base_url: url::Url,
198    pub(crate) websocket_url: url::Url,
199    state: std::sync::Arc<tokio::sync::Mutex<ClientState>>,
200    #[allow(clippy::struct_field_names)]
201    http_client: reqwest::Client,
202    tls_ca: tokio_rustls::rustls::RootCertStore,
203    disable_cert_verification: bool,
204}
205
206impl std::fmt::Debug for Client {
207    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208        f.debug_struct("Client")
209            .field("base_url", &self.base_url)
210            .field("websocket_url", &self.websocket_url)
211            .field("state", &self.state)
212            .field("http_client", &self.http_client)
213            .field("disable_cert_verification", &self.disable_cert_verification)
214            .finish_non_exhaustive()
215    }
216}
217
218impl Client {
219    /// Create a Client builder
220    #[must_use]
221    pub fn builder() -> ClientBuilder {
222        ClientBuilder::new()
223    }
224
225    async fn do_login(
226        builder: &ClientBuilder,
227        base_url: url::Url,
228        url: url::Url,
229        payload: serde_json::Value,
230    ) -> Result<Self> {
231        let json = serde_json::to_string(&payload)?;
232        let client = Self::reqwest_client(builder)?;
233        let result = client
234            .post(url)
235            .header("content-type", "application/json")
236            .body(json)
237            .send()
238            .await?;
239
240        if result.status().is_success() {
241            let login_token = result.json().await?;
242
243            Self::new(builder, client, base_url, login_token)
244        } else {
245            let text = result.text().await?;
246            tracing::debug!("login returned {}", text);
247            Err(Error::LoginFailure(text))
248        }
249    }
250
251    fn reqwest_client(builder: &ClientBuilder) -> Result<reqwest::Client> {
252        let reqwest_client_builder = reqwest::Client::builder().use_rustls_tls();
253
254        // Add CA certificates
255        let reqwest_client_builder = builder
256            .reqwest_ca
257            .iter()
258            .fold(reqwest_client_builder, |reqwest_client_builder, ca| {
259                reqwest_client_builder.add_root_certificate(ca.clone())
260            });
261
262        tracing::debug!("Added {} CA certs", builder.reqwest_ca.len());
263
264        let reqwest_client_builder =
265            reqwest_client_builder.danger_accept_invalid_certs(builder.disable_cert_verification);
266
267        let reqwest_client_builder =
268            reqwest_client_builder.connection_verbose(builder.connection_verbose);
269
270        let reqwest_client_builder = if let Some(duration) = builder.timeout {
271            reqwest_client_builder.timeout(duration)
272        } else {
273            reqwest_client_builder
274        };
275
276        let reqwest_client_builder = if let Some(duration) = builder.connect_timeout {
277            reqwest_client_builder.connect_timeout(duration)
278        } else {
279            reqwest_client_builder
280        };
281
282        let client = reqwest_client_builder.build()?;
283        Ok(client)
284    }
285
286    fn new_from_token(builder: &ClientBuilder, base_url: url::Url, token: &str) -> Result<Self> {
287        let client = Self::reqwest_client(builder)?;
288        let creation_time = chrono::Local::now().into();
289        let expires = creation_time + chrono::Duration::seconds(1);
290
291        let login_token = LoginToken {
292            token: token.to_string(),
293            expires_in: 1,
294            creation_time,
295            expires,
296        };
297
298        Self::new(builder, client, base_url, login_token)
299    }
300
301    fn new(
302        builder: &ClientBuilder,
303        client: reqwest::Client,
304        base_url: url::Url,
305        login_token: LoginToken,
306    ) -> Result<Self> {
307        let websocket_url = url::Url::parse(&format!("wss://{}/v1/ws/", base_url.host_port()?))?;
308
309        let renew_at = login_token.renew_at();
310
311        let state = std::sync::Arc::new(tokio::sync::Mutex::new(ClientState { login_token }));
312
313        let weak_state = std::sync::Arc::downgrade(&state);
314        let refresh_url = base_url.join("/v1/state/strongbox/token/refresh")?;
315
316        if builder.auto_renew_token {
317            tokio::spawn(renew_token_task(
318                weak_state,
319                renew_at,
320                client.clone(),
321                refresh_url,
322            ));
323        }
324
325        Ok(Self {
326            http_client: client,
327            tls_ca: builder.tls_ca.clone(),
328            disable_cert_verification: builder.disable_cert_verification,
329            base_url,
330            websocket_url,
331            state,
332        })
333    }
334
335    /// Returns the login bearer token
336    pub async fn bearer_token(&self) -> String {
337        let state = self.state.lock().await;
338        state.login_token.token.clone()
339    }
340
341    /// GET a json payload from the REST API.
342    pub async fn get_json<T: serde::de::DeserializeOwned>(
343        &self,
344        path: &str,
345        query_params: Option<&[(&str, &str)]>,
346    ) -> Result<T> {
347        let url = self.base_url.join(path)?;
348
349        let token = self.bearer_token().await;
350
351        let mut builder = self
352            .http_client
353            .get(url)
354            .bearer_auth(&token)
355            .header("Accept", "application/json");
356        if let Some(qp) = query_params {
357            builder = builder.query(qp);
358        }
359
360        let result = builder.send().await?;
361
362        if result.status().is_success() {
363            let res = result.json().await?;
364            Ok(res)
365        } else {
366            let status = result.status();
367            let error_payload = result
368                .text()
369                .await
370                .unwrap_or_else(|_| "No error payload".to_string());
371            Err(Error::WebServer(
372                status.as_u16(),
373                status.to_string(),
374                error_payload,
375            ))
376        }
377    }
378
379    /// GET a bytes payload from the REST API.
380    pub async fn get_bytes(
381        &self,
382        path: &str,
383        query_params: Option<&[(&str, &str)]>,
384    ) -> Result<Bytes> {
385        let url = self.base_url.join(path)?;
386
387        let token = self.bearer_token().await;
388
389        let mut builder = self.http_client.get(url).bearer_auth(&token);
390
391        if let Some(qp) = query_params {
392            builder = builder.query(qp);
393        }
394
395        let result = builder.send().await?;
396
397        if result.status().is_success() {
398            let res = result.bytes().await?;
399            Ok(res)
400        } else {
401            let status = result.status();
402            let error_payload = result
403                .text()
404                .await
405                .unwrap_or_else(|_| "No error payload".to_string());
406            Err(Error::WebServer(
407                status.as_u16(),
408                status.to_string(),
409                error_payload,
410            ))
411        }
412    }
413
414    /// POST arbitrary JSON to a path
415    /// # Panics
416    /// never
417    pub async fn post_json(
418        &self,
419        path: &str,
420        data: &serde_json::Value,
421    ) -> Result<serde_json::Value> {
422        let url = self.base_url.join(path)?;
423        let token = self.bearer_token().await;
424
425        tracing::debug!("POST {} {:?}", url, data);
426
427        let result = self
428            .http_client
429            .post(url)
430            .json(&data)
431            .bearer_auth(&token)
432            .send()
433            .await?;
434
435        if result.status().is_success() {
436            let resp = result.bytes().await?;
437
438            let mut responses: Vec<serde_json::Value> = Vec::new();
439            let decoder = serde_json::Deserializer::from_slice(&resp);
440
441            for v in decoder.into_iter() {
442                responses.push(v?);
443            }
444
445            match responses.len() {
446                0 => Ok(serde_json::Value::Object(serde_json::Map::default())),
447                1 => Ok(responses.into_iter().next().unwrap()),
448                _ => {
449                    // Convert to a JSON array
450                    Ok(serde_json::Value::Array(responses))
451                }
452            }
453        } else {
454            tracing::error!("POST call failed");
455            let status = result.status();
456            let resp = result.json().await;
457            match resp {
458                Ok(resp) => Err(Error::REST(resp)),
459                Err(_) => Err(Error::WebServer(
460                    status.as_u16(),
461                    status.to_string(),
462                    "Failed to get JSON responses".to_string(),
463                )),
464            }
465        }
466    }
467
468    /// PUT arbitrary data to a path
469    pub async fn put<T: Into<reqwest::Body> + std::fmt::Debug>(
470        &self,
471        path: &str,
472        content_type: &str,
473        data: T,
474    ) -> Result<()> {
475        let url = self.base_url.join(path)?;
476        let token = self.state.lock().await.login_token.token.clone();
477
478        tracing::debug!("PUT {} {:?}", url, data);
479
480        let result = self
481            .http_client
482            .put(url)
483            .header(reqwest::header::CONTENT_TYPE, content_type)
484            .body(data)
485            .bearer_auth(&token)
486            .send()
487            .await?;
488
489        //#[allow(clippy::redundant_closure_for_method_calls)]
490        if result.status().is_success() {
491            Ok(())
492        } else {
493            tracing::error!("PUT call failed");
494            let status = result.status();
495            let resp = result.json().await;
496            match resp {
497                Ok(resp) => Err(Error::REST(resp)),
498                Err(_) => Err(Error::WebServer(
499                    status.as_u16(),
500                    status.to_string(),
501                    "Failed to get JSON reply".to_string(),
502                )),
503            }
504        }
505    }
506
507    /// PUT arbitrary JSON to a path
508    pub async fn put_json(
509        &self,
510        path: &str,
511        data: &serde_json::Value,
512    ) -> Result<serde_json::Value> {
513        let url = self.base_url.join(path)?;
514        let token = self.state.lock().await.login_token.token.clone();
515
516        tracing::debug!("PUT {} {:?}", url, data);
517
518        let result = self
519            .http_client
520            .put(url)
521            .json(&data)
522            .bearer_auth(&token)
523            .send()
524            .await?;
525
526        #[allow(clippy::redundant_closure_for_method_calls)]
527        if result.status().is_success() {
528            use std::error::Error;
529            let resp = result.json().await.or_else(|e| match e {
530                e if e.is_decode() => {
531                    match e
532                        .source()
533                        .and_then(|e| e.downcast_ref::<serde_json::Error>())
534                    {
535                        Some(e) if e.is_eof() => {
536                            Ok(serde_json::Value::Object(serde_json::Map::new()))
537                        }
538                        _ => Err(e),
539                    }
540                }
541                e => Err(e),
542            })?;
543            Ok(resp)
544        } else {
545            tracing::error!("PUT call failed");
546            let status = result.status();
547            let resp = result.json().await;
548            match resp {
549                Ok(resp) => Err(Error::REST(resp)),
550                Err(_) => Err(Error::WebServer(
551                    status.as_u16(),
552                    status.to_string(),
553                    "Failed to get JSON reply".to_string(),
554                )),
555            }
556        }
557    }
558
559    /// Open a volga producer on a topic
560    pub async fn volga_open_producer(
561        &self,
562        producer_name: &str,
563        topic: &str,
564        on_no_exists: volga::OnNoExists,
565    ) -> Result<volga::producer::Producer> {
566        crate::volga::producer::Builder::new(self, producer_name, topic, on_no_exists)?
567            .connect()
568            .await
569    }
570
571    /// Open a volga NAT producer on a topic in a site
572    pub async fn volga_open_child_site_producer(
573        &self,
574        producer_name: &str,
575        topic: &str,
576        site: &str,
577        on_no_exists: volga::OnNoExists,
578    ) -> Result<volga::producer::Producer> {
579        crate::volga::producer::Builder::new_child(self, producer_name, topic, site, on_no_exists)?
580            .connect()
581            .await
582    }
583
584    /// Creates and opens a Volga consumer
585    #[tracing::instrument(skip(self))]
586    pub async fn volga_open_consumer(
587        &self,
588        consumer_name: &str,
589        topic: &str,
590        options: crate::volga::consumer::Options<'_>,
591    ) -> Result<volga::consumer::Consumer> {
592        crate::volga::consumer::Builder::new(self, consumer_name, topic)?
593            .set_options(options)
594            .connect()
595            .await
596    }
597
598    /// Creates and opens a Volga consumer on a child site
599    pub async fn volga_open_child_site_consumer(
600        &self,
601        consumer_name: &str,
602        topic: &str,
603        site: &str,
604        options: crate::volga::consumer::Options<'_>,
605    ) -> Result<volga::consumer::Consumer> {
606        crate::volga::consumer::Builder::new_child(self, consumer_name, topic, site)?
607            .set_options(options)
608            .connect()
609            .await
610    }
611
612    #[tracing::instrument(skip(self))]
613    pub(crate) async fn open_tls_stream(
614        &self,
615    ) -> Result<tokio_rustls::client::TlsStream<tokio::net::TcpStream>> {
616        let mut client_config = tokio_rustls::rustls::ClientConfig::builder()
617            .with_root_certificates(self.tls_ca.clone())
618            .with_no_client_auth();
619
620        if self.disable_cert_verification {
621            let mut danger = client_config.dangerous();
622
623            danger.set_certificate_verifier(std::sync::Arc::new(CertificateVerifier));
624        }
625
626        let client_config = std::sync::Arc::new(client_config);
627
628        let connector: tokio_rustls::TlsConnector = client_config.into();
629        let addrs = self.websocket_url.socket_addrs(|| None)?;
630        let stream = tokio::net::TcpStream::connect(&*addrs).await?;
631
632        let server_name = tokio_rustls::rustls::pki_types::ServerName::try_from(
633            self.websocket_url.host_str().unwrap().to_owned(),
634        )?;
635        let stream = connector.connect(server_name, stream).await?;
636        Ok(stream)
637    }
638
639    /// Opens a query stream
640    pub async fn volga_query_topic(
641        &self,
642        query: volga::query_topic::Query,
643    ) -> Result<volga::query_topic::QueryStream> {
644        volga::query_topic::QueryStream::new(self, query).await
645    }
646
647    /// Try to open a Strongbox Vault
648    pub async fn open_strongbox_vault(&self, vault: &str) -> Result<strongbox::Vault> {
649        strongbox::Vault::open(self, vault).await
650    }
651}
652
/// Certificate verifier that accepts every server certificate and
/// signature. Installed only when certificate verification is disabled.
#[derive(Debug)]
struct CertificateVerifier;
655
656impl tokio_rustls::rustls::client::danger::ServerCertVerifier for CertificateVerifier {
657    fn verify_server_cert(
658        &self,
659        _end_entity: &tokio_rustls::rustls::pki_types::CertificateDer<'_>,
660        _intermediates: &[tokio_rustls::rustls::pki_types::CertificateDer<'_>],
661        _server_name: &tokio_rustls::rustls::pki_types::ServerName<'_>,
662        _ocsp_response: &[u8],
663        _now: tokio_rustls::rustls::pki_types::UnixTime,
664    ) -> std::result::Result<
665        tokio_rustls::rustls::client::danger::ServerCertVerified,
666        tokio_rustls::rustls::Error,
667    > {
668        Ok(tokio_rustls::rustls::client::danger::ServerCertVerified::assertion())
669    }
670
671    fn verify_tls12_signature(
672        &self,
673        _message: &[u8],
674        _cert: &tokio_rustls::rustls::pki_types::CertificateDer<'_>,
675        _dss: &tokio_rustls::rustls::DigitallySignedStruct,
676    ) -> std::result::Result<
677        tokio_rustls::rustls::client::danger::HandshakeSignatureValid,
678        tokio_rustls::rustls::Error,
679    > {
680        Ok(tokio_rustls::rustls::client::danger::HandshakeSignatureValid::assertion())
681    }
682
683    fn verify_tls13_signature(
684        &self,
685        _message: &[u8],
686        _cert: &tokio_rustls::rustls::pki_types::CertificateDer<'_>,
687        _dss: &tokio_rustls::rustls::DigitallySignedStruct,
688    ) -> std::result::Result<
689        tokio_rustls::rustls::client::danger::HandshakeSignatureValid,
690        tokio_rustls::rustls::Error,
691    > {
692        Ok(tokio_rustls::rustls::client::danger::HandshakeSignatureValid::assertion())
693    }
694
695    fn supported_verify_schemes(&self) -> Vec<tokio_rustls::rustls::SignatureScheme> {
696        vec![
697            tokio_rustls::rustls::SignatureScheme::RSA_PKCS1_SHA1,
698            tokio_rustls::rustls::SignatureScheme::ECDSA_SHA1_Legacy,
699            tokio_rustls::rustls::SignatureScheme::RSA_PKCS1_SHA256,
700            tokio_rustls::rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
701            tokio_rustls::rustls::SignatureScheme::RSA_PKCS1_SHA384,
702            tokio_rustls::rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
703            tokio_rustls::rustls::SignatureScheme::RSA_PKCS1_SHA512,
704            tokio_rustls::rustls::SignatureScheme::ECDSA_NISTP521_SHA512,
705            tokio_rustls::rustls::SignatureScheme::RSA_PSS_SHA256,
706            tokio_rustls::rustls::SignatureScheme::RSA_PSS_SHA384,
707            tokio_rustls::rustls::SignatureScheme::RSA_PSS_SHA512,
708            tokio_rustls::rustls::SignatureScheme::ED25519,
709            tokio_rustls::rustls::SignatureScheme::ED448,
710        ]
711    }
712}
713
714pub(crate) trait URLExt {
715    fn host_port(&self) -> std::result::Result<String, url::ParseError>;
716}
717
718impl URLExt for url::Url {
719    fn host_port(&self) -> std::result::Result<String, url::ParseError> {
720        let host = self.host_str().ok_or(url::ParseError::EmptyHost)?;
721        Ok(match (host, self.port()) {
722            (host, Some(port)) => format!("{host}:{port}"),
723            (host, _) => host.to_string(),
724        })
725    }
726}
727
728#[tracing::instrument(skip(next_renew_at, weak_state, client, refresh_url))]
729async fn renew_token_task(
730    weak_state: std::sync::Weak<tokio::sync::Mutex<ClientState>>,
731    mut next_renew_at: chrono::DateTime<chrono::FixedOffset>,
732    client: reqwest::Client,
733    refresh_url: url::Url,
734) {
735    loop {
736        let now: chrono::DateTime<_> = chrono::Local::now().into();
737        let sleep_time = next_renew_at - now;
738
739        tracing::debug!("renew token in {sleep_time}");
740
741        tokio::time::sleep(
742            sleep_time
743                .to_std()
744                .unwrap_or_else(|_| std::time::Duration::from_secs(0)),
745        )
746        .await;
747
748        if let Some(state) = weak_state.upgrade() {
749            let mut state = state.lock().await;
750            let response = client
751                .post(refresh_url.clone())
752                .bearer_auth(&state.login_token.token)
753                .send()
754                .await;
755
756            let response = match response {
757                Ok(r) => r,
758                Err(e) => {
759                    tracing::error!("Failed to renew token: {e}");
760                    let now: chrono::DateTime<chrono::FixedOffset> = chrono::Local::now().into();
761                    next_renew_at = now + chrono::Duration::seconds(1);
762                    continue;
763                }
764            };
765
766            let text = response.text().await.unwrap();
767            let new_login_token = serde_json::from_str::<LoginToken>(&text);
768
769            match new_login_token {
770                Ok(new_login_token) => {
771                    next_renew_at = new_login_token.renew_at();
772                    state.login_token = new_login_token;
773                    tracing::debug!("Successfully renewed token");
774                }
775                Err(e) => {
776                    tracing::error!("Failed to parse or get token: {e}");
777                    // After failure, we check every second
778                    let now: chrono::DateTime<chrono::FixedOffset> = chrono::Local::now().into();
779                    next_renew_at = now + chrono::Duration::seconds(1);
780                }
781            }
782        } else {
783            tracing::info!("renew_token: State lost");
784            // If we can't get the state, the client is gone and we should go as well
785            break;
786        }
787    }
788}
789
#[cfg(test)]
mod test {
    /// `host_port` keeps an explicit port and omits it when the URL has none,
    /// for both IP address and DNS name hosts.
    #[test]
    fn url_ext() {
        use super::URLExt;

        let cases = [
            ("https://1.2.3.4:5000/a/b/c", "1.2.3.4:5000"),
            ("https://1.2.3.4/a/b/c", "1.2.3.4"),
            ("https://www.avassa.com/a/b/c", "www.avassa.com"),
            ("https://www.avassa.com:1234/a/b/c", "www.avassa.com:1234"),
        ];

        for &(input, expected) in &cases {
            let url = url::Url::parse(input).unwrap();
            let host_port = url.host_port().unwrap();
            assert_eq!(&host_port, expected);
        }
    }
}