avassa_client/
client.rs

1use crate::strongbox;
2use crate::volga;
3use crate::{Error, Result};
4use bytes::Bytes;
5use serde_json::json;
6
7#[derive(Clone, serde::Deserialize)]
8#[serde(rename_all = "kebab-case")]
9struct LoginToken {
10    token: String,
11    expires_in: i64,
12    expires: chrono::DateTime<chrono::FixedOffset>,
13    creation_time: chrono::DateTime<chrono::FixedOffset>,
14}
15
16impl LoginToken {
17    fn renew_at(&self) -> chrono::DateTime<chrono::FixedOffset> {
18        self.expires - chrono::Duration::seconds(self.expires_in / 4)
19    }
20}
21
22impl std::fmt::Debug for LoginToken {
23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24        f.debug_struct("LoginToken")
25            .field("expires_in", &self.expires_in)
26            .field("creation_time", &self.creation_time)
27            .finish_non_exhaustive()
28    }
29}
30
31#[derive(Debug)]
32struct ClientState {
33    login_token: LoginToken,
34}
35
36/// Builder for an Avassa [`Client`]
37#[derive(Clone)]
38#[allow(clippy::struct_excessive_bools)]
39pub struct ClientBuilder {
40    reqwest_ca: Vec<reqwest::Certificate>,
41    tls_ca: tokio_rustls::rustls::RootCertStore,
42    disable_cert_verification: bool,
43    connection_verbose: bool,
44    auto_renew_token: bool,
45    timeout: Option<core::time::Duration>,
46    connect_timeout: Option<core::time::Duration>,
47}
48
49impl ClientBuilder {
50    /// Create a new builder instance
51    #[must_use]
52    pub(crate) fn new() -> Self {
53        let tls_ca = webpki_roots::TLS_SERVER_ROOTS.iter().cloned().collect();
54        Self {
55            reqwest_ca: Vec::new(),
56            tls_ca,
57            disable_cert_verification: false,
58            connection_verbose: false,
59            auto_renew_token: true,
60            timeout: None,
61            connect_timeout: None,
62        }
63    }
64
65    /// Enables a request timeout
66    #[must_use]
67    pub fn timeout(self, timeout: core::time::Duration) -> Self {
68        Self {
69            timeout: Some(timeout),
70            ..self
71        }
72    }
73
74    /// Set a timeout for only the connect phase of a Client
75    #[must_use]
76    pub fn connection_timeout(self, timeout: core::time::Duration) -> Self {
77        Self {
78            connect_timeout: Some(timeout),
79            ..self
80        }
81    }
82
83    /// Add a root certificate for API certificate verification
84    pub fn add_root_certificate(mut self, cert: &[u8]) -> Result<Self> {
85        use std::iter;
86        let r_ca = reqwest::Certificate::from_pem(cert)?;
87        let mut ca_reader = std::io::BufReader::new(cert);
88        for item in iter::from_fn(|| rustls_pemfile::read_one(&mut ca_reader).transpose()) {
89            if let rustls_pemfile::Item::X509Certificate(cert) = item? {
90                self.tls_ca.add(cert)?;
91            }
92        }
93        self.reqwest_ca.push(r_ca);
94        Ok(self)
95    }
96
97    /// Disable certificate verification
98    #[must_use]
99    pub fn danger_disable_cert_verification(self) -> Self {
100        Self {
101            disable_cert_verification: true,
102            ..self
103        }
104    }
105
106    /// Enabling this option will emit log messages at the TRACE level for read and write operations
107    /// on the https client
108    #[must_use]
109    pub fn enable_verbose_connection(self) -> Self {
110        Self {
111            connection_verbose: true,
112            ..self
113        }
114    }
115
116    /// Disable auto renewal of authentication token
117    #[must_use]
118    pub fn disable_token_auto_renewal(self) -> Self {
119        Self {
120            auto_renew_token: false,
121            ..self
122        }
123    }
124
125    /// Login the application from secret set in the environment
126    /// `approle_id` can optionally be provided
127    /// This assumes the environment variable `APPROLE_SECRET_ID` is set by the system.
128    #[deprecated]
129    pub async fn application_login(&self, host: &str, approle_id: Option<&str>) -> Result<Client> {
130        let secret_id = std::env::var("APPROLE_SECRET_ID")
131            .map_err(|_| Error::LoginFailureMissingEnv(String::from("APPROLE_SECRET_ID")))?;
132
133        // If no app role is provided, we can try to use the secret id as app role.
134        let role_id = approle_id.unwrap_or(&secret_id);
135
136        let base_url = url::Url::parse(host)?;
137        let url = base_url.join("v1/approle-login")?;
138        let data = json!({
139            "role-id": role_id,
140            "secret-id": secret_id,
141        });
142        Client::do_login(self, base_url, url, data).await
143    }
144
145    /// Login using approle
146    pub async fn approle_login(
147        &self,
148        host: &str,
149        secret_id: &str,
150        role_id: Option<&str>,
151    ) -> Result<Client> {
152        // If no role id is provided, use the secret id;
153        let role_id = role_id.unwrap_or(secret_id);
154
155        let base_url = url::Url::parse(host)?;
156        let url = base_url.join("v1/approle-login")?;
157        let data = json!({
158            "role-id": role_id,
159            "secret-id": secret_id,
160        });
161        Client::do_login(self, base_url, url, data).await
162    }
163
164    /// Login to an avassa Control Tower or Edge Enforcer instance. If possible,
165    /// please use the `application_login` as no credentials needs to be distributed.
166    #[tracing::instrument(skip(self, password))]
167    pub async fn login(&self, host: &str, username: &str, password: &str) -> Result<Client> {
168        let base_url = url::Url::parse(host)?;
169        let url = base_url.join("v1/login")?;
170
171        // If we have a tenant, send it.
172        let data = json!({
173            "username":username,
174            "password":password
175        });
176        Client::do_login(self, base_url, url, data).await
177    }
178
179    /// Login using an existing bearer token
180    #[tracing::instrument(skip(self, token))]
181    pub fn token_login(&self, host: &str, token: &str) -> Result<Client> {
182        let base_url = url::Url::parse(host)?;
183        Client::new_from_token(self, base_url, token)
184    }
185}
186
187impl Default for ClientBuilder {
188    fn default() -> Self {
189        Self::new()
190    }
191}
192
193/// The `Client` is used for all interaction with Control Tower or Edge Enforcer instances.
194/// Use one of the login functions to create an instance.
195#[derive(Clone)]
196pub struct Client {
197    base_url: url::Url,
198    pub(crate) websocket_url: url::Url,
199    state: std::sync::Arc<tokio::sync::Mutex<ClientState>>,
200    client: reqwest::Client,
201    tls_ca: tokio_rustls::rustls::RootCertStore,
202    disable_cert_verification: bool,
203}
204
205impl std::fmt::Debug for Client {
206    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207        f.debug_struct("Client")
208            .field("base_url", &self.base_url)
209            .field("websocket_url", &self.websocket_url)
210            .field("state", &self.state)
211            .field("client", &self.client)
212            .field("disable_cert_verification", &self.disable_cert_verification)
213            .finish_non_exhaustive()
214    }
215}
216
217impl Client {
218    /// Create a Client builder
219    #[must_use]
220    pub fn builder() -> ClientBuilder {
221        ClientBuilder::new()
222    }
223
224    async fn do_login(
225        builder: &ClientBuilder,
226        base_url: url::Url,
227        url: url::Url,
228        payload: serde_json::Value,
229    ) -> Result<Self> {
230        let json = serde_json::to_string(&payload)?;
231        let client = Self::reqwest_client(builder)?;
232        let result = client
233            .post(url)
234            .header("content-type", "application/json")
235            .body(json)
236            .send()
237            .await?;
238
239        if result.status().is_success() {
240            let login_token = result.json().await?;
241
242            Self::new(builder, client, base_url, login_token)
243        } else {
244            let text = result.text().await?;
245            tracing::debug!("login returned {}", text);
246            Err(Error::LoginFailure(text))
247        }
248    }
249
250    fn reqwest_client(builder: &ClientBuilder) -> Result<reqwest::Client> {
251        let reqwest_client_builder = reqwest::Client::builder().use_rustls_tls();
252
253        // Add CA certificates
254        let reqwest_client_builder = builder
255            .reqwest_ca
256            .iter()
257            .fold(reqwest_client_builder, |reqwest_client_builder, ca| {
258                reqwest_client_builder.add_root_certificate(ca.clone())
259            });
260
261        tracing::debug!("Added {} CA certs", builder.reqwest_ca.len());
262
263        let reqwest_client_builder =
264            reqwest_client_builder.danger_accept_invalid_certs(builder.disable_cert_verification);
265
266        let reqwest_client_builder =
267            reqwest_client_builder.connection_verbose(builder.connection_verbose);
268
269        let reqwest_client_builder = if let Some(duration) = builder.timeout {
270            reqwest_client_builder.timeout(duration)
271        } else {
272            reqwest_client_builder
273        };
274
275        let reqwest_client_builder = if let Some(duration) = builder.connect_timeout {
276            reqwest_client_builder.connect_timeout(duration)
277        } else {
278            reqwest_client_builder
279        };
280
281        let client = reqwest_client_builder.build()?;
282        Ok(client)
283    }
284
285    fn new_from_token(builder: &ClientBuilder, base_url: url::Url, token: &str) -> Result<Self> {
286        let client = Self::reqwest_client(builder)?;
287        let creation_time = chrono::Local::now().into();
288        let expires = creation_time + chrono::Duration::seconds(1);
289
290        let login_token = LoginToken {
291            token: token.to_string(),
292            expires_in: 1,
293            creation_time,
294            expires,
295        };
296
297        Self::new(builder, client, base_url, login_token)
298    }
299
300    fn new(
301        builder: &ClientBuilder,
302        client: reqwest::Client,
303        base_url: url::Url,
304        login_token: LoginToken,
305    ) -> Result<Self> {
306        let websocket_url = url::Url::parse(&format!("wss://{}/v1/ws/", base_url.host_port()?))?;
307
308        let renew_at = login_token.renew_at();
309
310        let state = std::sync::Arc::new(tokio::sync::Mutex::new(ClientState { login_token }));
311
312        let weak_state = std::sync::Arc::downgrade(&state);
313        let refresh_url = base_url.join("/v1/state/strongbox/token/refresh")?;
314
315        if builder.auto_renew_token {
316            tokio::spawn(renew_token_task(
317                weak_state,
318                renew_at,
319                client.clone(),
320                refresh_url,
321            ));
322        }
323
324        Ok(Self {
325            client,
326            tls_ca: builder.tls_ca.clone(),
327            disable_cert_verification: builder.disable_cert_verification,
328            base_url,
329            websocket_url,
330            state,
331        })
332    }
333
334    /// Returns the login bearer token
335    pub async fn bearer_token(&self) -> String {
336        let state = self.state.lock().await;
337        state.login_token.token.clone()
338    }
339
340    /// GET a json payload from the REST API.
341    pub async fn get_json<T: serde::de::DeserializeOwned>(
342        &self,
343        path: &str,
344        query_params: Option<&[(&str, &str)]>,
345    ) -> Result<T> {
346        let url = self.base_url.join(path)?;
347
348        let token = self.bearer_token().await;
349
350        let mut builder = self
351            .client
352            .get(url)
353            .bearer_auth(&token)
354            .header("Accept", "application/json");
355        if let Some(qp) = query_params {
356            builder = builder.query(qp);
357        }
358
359        let result = builder.send().await?;
360
361        if result.status().is_success() {
362            let res = result.json().await?;
363            Ok(res)
364        } else {
365            let status = result.status();
366            let error_payload = result
367                .text()
368                .await
369                .unwrap_or_else(|_| "No error payload".to_string());
370            Err(Error::WebServer(
371                status.as_u16(),
372                status.to_string(),
373                error_payload,
374            ))
375        }
376    }
377
378    /// GET a bytes payload from the REST API.
379    pub async fn get_bytes(
380        &self,
381        path: &str,
382        query_params: Option<&[(&str, &str)]>,
383    ) -> Result<Bytes> {
384        let url = self.base_url.join(path)?;
385
386        let token = self.bearer_token().await;
387
388        let mut builder = self.client.get(url).bearer_auth(&token);
389
390        if let Some(qp) = query_params {
391            builder = builder.query(qp);
392        }
393
394        let result = builder.send().await?;
395
396        if result.status().is_success() {
397            let res = result.bytes().await?;
398            Ok(res)
399        } else {
400            let status = result.status();
401            let error_payload = result
402                .text()
403                .await
404                .unwrap_or_else(|_| "No error payload".to_string());
405            Err(Error::WebServer(
406                status.as_u16(),
407                status.to_string(),
408                error_payload,
409            ))
410        }
411    }
412
413    /// POST arbitrary JSON to a path
414    /// # Panics
415    /// never
416    pub async fn post_json(
417        &self,
418        path: &str,
419        data: &serde_json::Value,
420    ) -> Result<serde_json::Value> {
421        let url = self.base_url.join(path)?;
422        let token = self.bearer_token().await;
423
424        tracing::debug!("POST {} {:?}", url, data);
425
426        let result = self
427            .client
428            .post(url)
429            .json(&data)
430            .bearer_auth(&token)
431            .send()
432            .await?;
433
434        if result.status().is_success() {
435            let resp = result.bytes().await?;
436
437            let mut responses: Vec<serde_json::Value> = Vec::new();
438            let decoder = serde_json::Deserializer::from_slice(&resp);
439
440            for v in decoder.into_iter() {
441                responses.push(v?);
442            }
443
444            match responses.len() {
445                0 => Ok(serde_json::Value::Object(serde_json::Map::default())),
446                1 => Ok(responses.into_iter().next().unwrap()),
447                _ => {
448                    // Convert to a JSON array
449                    Ok(serde_json::Value::Array(responses))
450                }
451            }
452        } else {
453            tracing::error!("POST call failed");
454            let status = result.status();
455            let resp = result.json().await;
456            match resp {
457                Ok(resp) => Err(Error::REST(resp)),
458                Err(_) => Err(Error::WebServer(
459                    status.as_u16(),
460                    status.to_string(),
461                    "Failed to get JSON responses".to_string(),
462                )),
463            }
464        }
465    }
466
467    /// PUT arbitrary data to a path
468    pub async fn put<T: Into<reqwest::Body> + std::fmt::Debug>(
469        &self,
470        path: &str,
471        content_type: &str,
472        data: T,
473    ) -> Result<()> {
474        let url = self.base_url.join(path)?;
475        let token = self.state.lock().await.login_token.token.clone();
476
477        tracing::debug!("PUT {} {:?}", url, data);
478
479        let result = self
480            .client
481            .put(url)
482            .header(reqwest::header::CONTENT_TYPE, content_type)
483            .body(data)
484            .bearer_auth(&token)
485            .send()
486            .await?;
487
488        //#[allow(clippy::redundant_closure_for_method_calls)]
489        if result.status().is_success() {
490            Ok(())
491        } else {
492            tracing::error!("PUT call failed");
493            let status = result.status();
494            let resp = result.json().await;
495            match resp {
496                Ok(resp) => Err(Error::REST(resp)),
497                Err(_) => Err(Error::WebServer(
498                    status.as_u16(),
499                    status.to_string(),
500                    "Failed to get JSON reply".to_string(),
501                )),
502            }
503        }
504    }
505
506    /// PUT arbitrary JSON to a path
507    pub async fn put_json(
508        &self,
509        path: &str,
510        data: &serde_json::Value,
511    ) -> Result<serde_json::Value> {
512        let url = self.base_url.join(path)?;
513        let token = self.state.lock().await.login_token.token.clone();
514
515        tracing::debug!("PUT {} {:?}", url, data);
516
517        let result = self
518            .client
519            .put(url)
520            .json(&data)
521            .bearer_auth(&token)
522            .send()
523            .await?;
524
525        #[allow(clippy::redundant_closure_for_method_calls)]
526        if result.status().is_success() {
527            use std::error::Error;
528            let resp = result.json().await.or_else(|e| match e {
529                e if e.is_decode() => {
530                    match e
531                        .source()
532                        .and_then(|e| e.downcast_ref::<serde_json::Error>())
533                    {
534                        Some(e) if e.is_eof() => {
535                            Ok(serde_json::Value::Object(serde_json::Map::new()))
536                        }
537                        _ => Err(e),
538                    }
539                }
540                e => Err(e),
541            })?;
542            Ok(resp)
543        } else {
544            tracing::error!("PUT call failed");
545            let status = result.status();
546            let resp = result.json().await;
547            match resp {
548                Ok(resp) => Err(Error::REST(resp)),
549                Err(_) => Err(Error::WebServer(
550                    status.as_u16(),
551                    status.to_string(),
552                    "Failed to get JSON reply".to_string(),
553                )),
554            }
555        }
556    }
557
558    /// Open a volga producer on a topic
559    pub async fn volga_open_producer(
560        &self,
561        producer_name: &str,
562        topic: &str,
563        on_no_exists: volga::OnNoExists,
564    ) -> Result<volga::producer::Producer> {
565        crate::volga::producer::Builder::new(self, producer_name, topic, on_no_exists)?
566            .connect()
567            .await
568    }
569
570    /// Open a volga NAT producer on a topic in a site
571    pub async fn volga_open_child_site_producer(
572        &self,
573        producer_name: &str,
574        topic: &str,
575        site: &str,
576        on_no_exists: volga::OnNoExists,
577    ) -> Result<volga::producer::Producer> {
578        crate::volga::producer::Builder::new_child(self, producer_name, topic, site, on_no_exists)?
579            .connect()
580            .await
581    }
582
583    /// Creates and opens a Volga consumer
584    #[tracing::instrument(skip(self))]
585    pub async fn volga_open_consumer(
586        &self,
587        consumer_name: &str,
588        topic: &str,
589        options: crate::volga::consumer::Options<'_>,
590    ) -> Result<volga::consumer::Consumer> {
591        crate::volga::consumer::Builder::new(self, consumer_name, topic)?
592            .set_options(options)
593            .connect()
594            .await
595    }
596
597    /// Creates and opens a Volga consumer on a child site
598    pub async fn volga_open_child_site_consumer(
599        &self,
600        consumer_name: &str,
601        topic: &str,
602        site: &str,
603        options: crate::volga::consumer::Options<'_>,
604    ) -> Result<volga::consumer::Consumer> {
605        crate::volga::consumer::Builder::new_child(self, consumer_name, topic, site)?
606            .set_options(options)
607            .connect()
608            .await
609    }
610
611    #[tracing::instrument(skip(self))]
612    pub(crate) async fn open_tls_stream(
613        &self,
614    ) -> Result<tokio_rustls::client::TlsStream<tokio::net::TcpStream>> {
615        let mut client_config = tokio_rustls::rustls::ClientConfig::builder()
616            .with_root_certificates(self.tls_ca.clone())
617            .with_no_client_auth();
618
619        if self.disable_cert_verification {
620            let mut danger = client_config.dangerous();
621
622            danger.set_certificate_verifier(std::sync::Arc::new(CertificateVerifier));
623        }
624
625        let client_config = std::sync::Arc::new(client_config);
626
627        let connector: tokio_rustls::TlsConnector = client_config.into();
628        let addrs = self.websocket_url.socket_addrs(|| None)?;
629        let stream = tokio::net::TcpStream::connect(&*addrs).await?;
630
631        let server_name = tokio_rustls::rustls::pki_types::ServerName::try_from(
632            self.websocket_url.host_str().unwrap().to_owned(),
633        )?;
634        let stream = connector.connect(server_name, stream).await?;
635        Ok(stream)
636    }
637
638    /// Opens a query stream
639    pub async fn volga_open_log_query(
640        &self,
641        query: &volga::log_query::Query,
642    ) -> Result<volga::log_query::QueryStream> {
643        volga::log_query::QueryStream::new(self, query).await
644    }
645
646    /// Try to open a Strongbox Vault
647    pub async fn open_strongbox_vault(&self, vault: &str) -> Result<strongbox::Vault> {
648        strongbox::Vault::open(self, vault).await
649    }
650}
651
652#[derive(Debug)]
653struct CertificateVerifier;
654
655impl tokio_rustls::rustls::client::danger::ServerCertVerifier for CertificateVerifier {
656    fn verify_server_cert(
657        &self,
658        _end_entity: &tokio_rustls::rustls::pki_types::CertificateDer<'_>,
659        _intermediates: &[tokio_rustls::rustls::pki_types::CertificateDer<'_>],
660        _server_name: &tokio_rustls::rustls::pki_types::ServerName<'_>,
661        _ocsp_response: &[u8],
662        _now: tokio_rustls::rustls::pki_types::UnixTime,
663    ) -> std::result::Result<
664        tokio_rustls::rustls::client::danger::ServerCertVerified,
665        tokio_rustls::rustls::Error,
666    > {
667        Ok(tokio_rustls::rustls::client::danger::ServerCertVerified::assertion())
668    }
669
670    fn verify_tls12_signature(
671        &self,
672        _message: &[u8],
673        _cert: &tokio_rustls::rustls::pki_types::CertificateDer<'_>,
674        _dss: &tokio_rustls::rustls::DigitallySignedStruct,
675    ) -> std::result::Result<
676        tokio_rustls::rustls::client::danger::HandshakeSignatureValid,
677        tokio_rustls::rustls::Error,
678    > {
679        Ok(tokio_rustls::rustls::client::danger::HandshakeSignatureValid::assertion())
680    }
681
682    fn verify_tls13_signature(
683        &self,
684        _message: &[u8],
685        _cert: &tokio_rustls::rustls::pki_types::CertificateDer<'_>,
686        _dss: &tokio_rustls::rustls::DigitallySignedStruct,
687    ) -> std::result::Result<
688        tokio_rustls::rustls::client::danger::HandshakeSignatureValid,
689        tokio_rustls::rustls::Error,
690    > {
691        Ok(tokio_rustls::rustls::client::danger::HandshakeSignatureValid::assertion())
692    }
693
694    fn supported_verify_schemes(&self) -> Vec<tokio_rustls::rustls::SignatureScheme> {
695        vec![
696            tokio_rustls::rustls::SignatureScheme::RSA_PKCS1_SHA1,
697            tokio_rustls::rustls::SignatureScheme::ECDSA_SHA1_Legacy,
698            tokio_rustls::rustls::SignatureScheme::RSA_PKCS1_SHA256,
699            tokio_rustls::rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
700            tokio_rustls::rustls::SignatureScheme::RSA_PKCS1_SHA384,
701            tokio_rustls::rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
702            tokio_rustls::rustls::SignatureScheme::RSA_PKCS1_SHA512,
703            tokio_rustls::rustls::SignatureScheme::ECDSA_NISTP521_SHA512,
704            tokio_rustls::rustls::SignatureScheme::RSA_PSS_SHA256,
705            tokio_rustls::rustls::SignatureScheme::RSA_PSS_SHA384,
706            tokio_rustls::rustls::SignatureScheme::RSA_PSS_SHA512,
707            tokio_rustls::rustls::SignatureScheme::ED25519,
708            tokio_rustls::rustls::SignatureScheme::ED448,
709        ]
710    }
711}
712
713pub(crate) trait URLExt {
714    fn host_port(&self) -> std::result::Result<String, url::ParseError>;
715}
716
717impl URLExt for url::Url {
718    fn host_port(&self) -> std::result::Result<String, url::ParseError> {
719        let host = self.host_str().ok_or(url::ParseError::EmptyHost)?;
720        Ok(match (host, self.port()) {
721            (host, Some(port)) => format!("{host}:{port}"),
722            (host, _) => host.to_string(),
723        })
724    }
725}
726
727#[tracing::instrument(skip(next_renew_at, weak_state, client, refresh_url))]
728async fn renew_token_task(
729    weak_state: std::sync::Weak<tokio::sync::Mutex<ClientState>>,
730    mut next_renew_at: chrono::DateTime<chrono::FixedOffset>,
731    client: reqwest::Client,
732    refresh_url: url::Url,
733) {
734    loop {
735        let now: chrono::DateTime<_> = chrono::Local::now().into();
736        let sleep_time = next_renew_at - now;
737
738        tracing::debug!("renew token in {sleep_time}");
739
740        tokio::time::sleep(
741            sleep_time
742                .to_std()
743                .unwrap_or_else(|_| std::time::Duration::from_secs(0)),
744        )
745        .await;
746
747        if let Some(state) = weak_state.upgrade() {
748            let mut state = state.lock().await;
749            let response = client
750                .post(refresh_url.clone())
751                .bearer_auth(&state.login_token.token)
752                .send()
753                .await;
754
755            let response = match response {
756                Ok(r) => r,
757                Err(e) => {
758                    tracing::error!("Failed to renew token: {e}");
759                    let now: chrono::DateTime<chrono::FixedOffset> = chrono::Local::now().into();
760                    next_renew_at = now + chrono::Duration::seconds(1);
761                    continue;
762                }
763            };
764
765            let text = response.text().await.unwrap();
766            let new_login_token = serde_json::from_str::<LoginToken>(&text);
767
768            match new_login_token {
769                Ok(new_login_token) => {
770                    next_renew_at = new_login_token.renew_at();
771                    state.login_token = new_login_token;
772                    tracing::debug!("Successfully renewed token");
773                }
774                Err(e) => {
775                    tracing::error!("Failed to parse or get token: {e}");
776                    // After failure, we check every second
777                    let now: chrono::DateTime<chrono::FixedOffset> = chrono::Local::now().into();
778                    next_renew_at = now + chrono::Duration::seconds(1);
779                }
780            }
781        } else {
782            tracing::info!("renew_token: State lost");
783            // If we can't get the state, the client is gone and we should go as well
784            break;
785        }
786    }
787}
788
789#[cfg(test)]
790mod test {
791    #[test]
792    fn url_ext() {
793        use super::URLExt;
794        let url = url::Url::parse("https://1.2.3.4:5000/a/b/c").unwrap();
795        let host_port = url.host_port().unwrap();
796        assert_eq!(&host_port, "1.2.3.4:5000");
797
798        let url = url::Url::parse("https://1.2.3.4/a/b/c").unwrap();
799        let host_port = url.host_port().unwrap();
800        assert_eq!(&host_port, "1.2.3.4");
801
802        let url = url::Url::parse("https://www.avassa.com/a/b/c").unwrap();
803        let host_port = url.host_port().unwrap();
804        assert_eq!(&host_port, "www.avassa.com");
805
806        let url = url::Url::parse("https://www.avassa.com:1234/a/b/c").unwrap();
807        let host_port = url.host_port().unwrap();
808        assert_eq!(&host_port, "www.avassa.com:1234");
809    }
810}