Skip to main content

avassa_client/
client.rs

1use crate::volga;
2use crate::{Error, Result};
3use bytes::Bytes;
4use serde_json::json;
5
/// Cert and private key in PEM format.
///
/// Deserialized from kebab-case keys (`cert`, `private-key`).
#[derive(Clone, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
#[non_exhaustive]
pub struct X509SVID {
    /// The certificate.
    pub cert: String,
    /// The private key belonging to `cert`; handle as secret material.
    pub private_key: String,
}
14
/// Login/refresh response as deserialized from kebab-case JSON keys.
#[derive(Clone, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
struct LoginToken {
    /// Bearer token sent with every authenticated request.
    token: String,
    /// Token lifetime; `renew_at` interprets it as seconds.
    expires_in: i64,
    /// Instant at which the token expires.
    expires: chrono::DateTime<chrono::Utc>,
    /// Instant at which the token was created.
    creation_time: chrono::DateTime<chrono::Utc>,
    /// JWT SVID, when the response carries one.
    jwt_svid: Option<String>,
    /// X.509 SVID (cert and private key), when the response carries one.
    x509_svid: Option<X509SVID>,
}
25
26impl LoginToken {
27    #[expect(clippy::integer_division, reason = "Good enough precision")]
28    #[expect(
29        clippy::integer_division_remainder_used,
30        reason = "Good enough precision"
31    )]
32    fn renew_at(&self) -> chrono::DateTime<chrono::Utc> {
33        let quarter_expiry = self.expires_in / 4;
34        self.expires
35            .checked_sub_signed(chrono::Duration::seconds(quarter_expiry))
36            .unwrap_or_else(chrono::Utc::now)
37    }
38}
39
40impl core::fmt::Debug for LoginToken {
41    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
42        f.debug_struct("LoginToken")
43            .field("creation_time", &self.creation_time)
44            .field("expires_in", &self.expires_in)
45            .field("expires", &self.expires)
46            .field("jwt-svid", &self.jwt_svid.is_some())
47            .field("x509-svid", &self.x509_svid.is_some())
48            .finish_non_exhaustive()
49    }
50}
51
/// Mutable state shared between all clones of a `Client` and the token
/// renewal task.
#[derive(Debug)]
struct ClientState {
    /// The currently active login token; replaced on successful renewal.
    login_token: LoginToken,
}
56
/// Builder for an Avassa [`Client`].
///
/// Configure it with the chained setters, then call one of the `*_login`
/// methods to obtain a [`Client`].
#[derive(Clone)]
#[expect(clippy::module_name_repetitions, reason = "Backwards compat")]
pub struct ClientBuilder {
    /// Extra root certificates handed to the reqwest HTTP client.
    reqwest_ca: Vec<reqwest::Certificate>,
    /// Root store copied into the `Client` and used for its raw TLS streams.
    tls_ca: tokio_rustls::rustls::RootCertStore,
    /// When true, server certificates are accepted without verification.
    disable_cert_verification: bool,
    /// Forwarded to reqwest's `connection_verbose`.
    connection_verbose: bool,
    /// When true, a background task renews the login token.
    auto_renew_token: bool,
    /// Optional request timeout for the HTTP client.
    timeout: Option<core::time::Duration>,
    /// Optional timeout for the connect phase of the HTTP client.
    connect_timeout: Option<core::time::Duration>,
}
69
70impl ClientBuilder {
71    /// Create a new builder instance.
72    #[must_use]
73    pub(crate) fn new() -> Self {
74        let tls_ca = webpki_roots::TLS_SERVER_ROOTS.iter().cloned().collect();
75        Self {
76            reqwest_ca: Vec::new(),
77            tls_ca,
78            disable_cert_verification: false,
79            connection_verbose: false,
80            auto_renew_token: true,
81            timeout: None,
82            connect_timeout: None,
83        }
84    }
85
86    /// Enables a request timeout.
87    #[must_use]
88    #[inline]
89    pub fn timeout(self, timeout: core::time::Duration) -> Self {
90        Self {
91            timeout: Some(timeout),
92            ..self
93        }
94    }
95
96    /// Set a timeout for only the connect phase of a Client.
97    #[must_use]
98    #[inline]
99    pub fn connection_timeout(self, timeout: core::time::Duration) -> Self {
100        Self {
101            connect_timeout: Some(timeout),
102            ..self
103        }
104    }
105
106    /// Add a root certificate for API certificate verification.
107    #[inline]
108    #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
109    pub fn add_root_certificate(mut self, cert: &[u8]) -> Result<Self> {
110        use core::iter;
111        let r_ca = reqwest::Certificate::from_pem(cert)?;
112        let mut ca_reader = std::io::BufReader::new(cert);
113        for item in iter::from_fn(|| rustls_pemfile::read_one(&mut ca_reader).transpose()) {
114            if let rustls_pemfile::Item::X509Certificate(ca_cert) = item? {
115                self.tls_ca.add(ca_cert)?;
116            }
117        }
118        self.reqwest_ca.push(r_ca);
119        Ok(self)
120    }
121
122    /// Disable certificate verification.
123    #[must_use]
124    #[inline]
125    pub fn danger_disable_cert_verification(self) -> Self {
126        Self {
127            disable_cert_verification: true,
128            ..self
129        }
130    }
131
132    /// Enabling this option will emit log messages at the TRACE level for read and write operations
133    /// on the https client.
134    #[must_use]
135    #[inline]
136    pub fn enable_verbose_connection(self) -> Self {
137        Self {
138            connection_verbose: true,
139            ..self
140        }
141    }
142
143    /// Disable auto renewal of authentication token.
144    #[must_use]
145    #[inline]
146    pub fn disable_token_auto_renewal(self) -> Self {
147        Self {
148            auto_renew_token: false,
149            ..self
150        }
151    }
152
153    /// Login the application from secret set in the environment
154    /// `approle_id` can optionally be provided
155    /// This assumes the environment variable `APPROLE_SECRET_ID` is set by the system.
156    #[deprecated]
157    #[inline]
158    #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
159    pub async fn application_login(&self, host: &str, approle_id: Option<&str>) -> Result<Client> {
160        let secret_id = std::env::var("APPROLE_SECRET_ID")
161            .map_err(|_| Error::LoginFailureMissingEnv(String::from("APPROLE_SECRET_ID")))?;
162
163        // If no app role is provided, we can try to use the secret id as app role.
164        let role_id = approle_id.unwrap_or(&secret_id);
165
166        let base_url = url::Url::parse(host)?;
167        let url = base_url.join("v1/approle-login")?;
168        let data = json!({
169            "role-id": role_id,
170            "secret-id": secret_id,
171        });
172        Client::do_login(self, base_url, url, data).await
173    }
174
175    /// Login using approle.
176    #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
177    pub async fn approle_login(
178        &self,
179        host: &str,
180        secret_id: &str,
181        role_id: Option<&str>,
182    ) -> Result<Client> {
183        // If no role id is provided, use the secret id;
184        let role_id = role_id.unwrap_or(secret_id);
185
186        let base_url = url::Url::parse(host)?;
187        let url = base_url.join("v1/approle-login")?;
188        let data = json!({
189            "role-id": role_id,
190            "secret-id": secret_id,
191        });
192        Client::do_login(self, base_url, url, data).await
193    }
194
195    /// Login to an avassa Control Tower or Edge Enforcer instance. If possible,
196    /// please use the `application_login` as no credentials needs to be distributed.
197    #[tracing::instrument(skip(self, password))]
198    #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
199    pub async fn login(&self, host: &str, username: &str, password: &str) -> Result<Client> {
200        let base_url = url::Url::parse(host)?;
201        let url = base_url.join("v1/login")?;
202
203        // If we have a tenant, send it.
204        let data = json!({
205            "username":username,
206            "password":password
207        });
208        Client::do_login(self, base_url, url, data).await
209    }
210
211    /// Login using an existing bearer token.
212    #[tracing::instrument(skip(self, token))]
213    #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
214    #[expect(clippy::arithmetic_side_effects, reason = "Ok to add one second")]
215    pub fn token_login(&self, host: &str, token: &str) -> Result<Client> {
216        let base_url = url::Url::parse(host)?;
217        let client = Client::reqwest_client(self)?;
218        let creation_time = chrono::Local::now().into();
219        let expires = creation_time + chrono::Duration::seconds(1);
220
221        let login_token = LoginToken {
222            token: token.to_owned(),
223            expires_in: 1,
224            creation_time,
225            expires,
226            jwt_svid: None,
227            x509_svid: None,
228        };
229
230        Client::new(self, client, base_url, login_token)
231    }
232
233    /// Login using JWT issued by external entity, e.g. MS Entra.
234    #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
235    pub async fn jwt_login(
236        &self,
237        host: &str,
238        tenant_name: &str,
239        jwt_auth: &str,
240        role: &str,
241        jwt: &str,
242    ) -> Result<Client> {
243        let base_url = url::Url::parse(host)?;
244        let url = base_url.join("v1/jwt-login")?;
245
246        let data = json!({
247            "tenant": tenant_name,
248            "jwt-auth": jwt_auth,
249            "role": role,
250            "jwt": jwt
251        });
252        Client::do_login(self, base_url, url, data).await
253    }
254}
255
/// The default builder is the same as `ClientBuilder::new()`.
impl Default for ClientBuilder {
    fn default() -> Self {
        Self::new()
    }
}
261
/// The `Client` is used for all interaction with Control Tower or Edge Enforcer instances.
/// Use one of the login functions to create an instance.
///
/// Clones share the same login token state.
#[derive(Clone)]
pub struct Client {
    /// Base URL that REST paths are joined onto.
    base_url: url::Url,
    /// `wss://<host[:port]>/v1/ws/`, derived from `base_url` in `Client::new`.
    websocket_url: url::Url,
    /// Login token state, shared with clones and with the renewal task.
    state: std::sync::Arc<tokio::sync::Mutex<ClientState>>,
    /// HTTP client used for all REST calls.
    #[expect(clippy::struct_field_names, reason = "easier to read")]
    http_client: reqwest::Client,
    /// Root certificates used by `open_tls_stream`.
    tls_ca: tokio_rustls::rustls::RootCertStore,
    /// When true, `open_tls_stream` accepts any server certificate.
    disable_cert_verification: bool,
}
274
275impl core::fmt::Debug for Client {
276    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
277        f.debug_struct("Client")
278            .field("base_url", &self.base_url)
279            .field("websocket_url", &self.websocket_url)
280            .field("state", &self.state)
281            .field("http_client", &self.http_client)
282            .field("disable_cert_verification", &self.disable_cert_verification)
283            .finish_non_exhaustive()
284    }
285}
286
287impl Client {
    /// Create a Client builder.
    ///
    /// Equivalent to `ClientBuilder::new()`; finish with one of the builder's
    /// login methods to obtain a `Client`.
    #[must_use]
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
293
294    /// Returns true if the token has expired.
295    pub async fn token_is_expired(&self) -> bool {
296        let state = self.state.lock().await;
297        chrono::Local::now() > state.login_token.expires
298    }
299
300    pub async fn token_expires(&self) -> chrono::DateTime<chrono::Utc> {
301        let state = self.state.lock().await;
302        state.login_token.expires
303    }
304
305    /// Returns the JWT SVID if exists.
306    pub async fn jwt_svid(&self) -> Option<String> {
307        let state = self.state.lock().await;
308        state.login_token.jwt_svid.clone()
309    }
310
311    pub async fn x509_svid(&self) -> Option<X509SVID> {
312        let state = self.state.lock().await;
313        state.login_token.x509_svid.clone()
314    }
315
316    async fn do_login(
317        builder: &ClientBuilder,
318        base_url: url::Url,
319        url: url::Url,
320        payload: serde_json::Value,
321    ) -> Result<Self> {
322        let json = serde_json::to_string(&payload)?;
323        let client = Self::reqwest_client(builder)?;
324        let result = client
325            .post(url)
326            .header("content-type", "application/json")
327            .body(json)
328            .send()
329            .await?;
330
331        if result.status().is_success() {
332            let login_token = result.json().await?;
333            Self::new(builder, client, base_url, login_token)
334        } else {
335            let text = result.text().await?;
336            tracing::debug!("login returned {}", text);
337            Err(Error::LoginFailure(text))
338        }
339    }
340
341    fn reqwest_client(builder: &ClientBuilder) -> Result<reqwest::Client> {
342        let reqwest_client_builder = reqwest::Client::builder().use_rustls_tls();
343
344        // Add CA certificates
345        let reqwest_client_builder = builder
346            .reqwest_ca
347            .iter()
348            .fold(reqwest_client_builder, |reqwest_client_builder, ca| {
349                reqwest_client_builder.add_root_certificate(ca.clone())
350            });
351
352        tracing::debug!("Added {} CA certs", builder.reqwest_ca.len());
353
354        let reqwest_client_builder =
355            reqwest_client_builder.danger_accept_invalid_certs(builder.disable_cert_verification);
356
357        let reqwest_client_builder =
358            reqwest_client_builder.connection_verbose(builder.connection_verbose);
359
360        let reqwest_client_builder = if let Some(duration) = builder.timeout {
361            reqwest_client_builder.timeout(duration)
362        } else {
363            reqwest_client_builder
364        };
365
366        let reqwest_client_builder = if let Some(duration) = builder.connect_timeout {
367            reqwest_client_builder.connect_timeout(duration)
368        } else {
369            reqwest_client_builder
370        };
371
372        let client = reqwest_client_builder.build()?;
373        Ok(client)
374    }
375
376    fn new(
377        builder: &ClientBuilder,
378        client: reqwest::Client,
379        base_url: url::Url,
380        login_token: LoginToken,
381    ) -> Result<Self> {
382        let websocket_url = url::Url::parse(&format!("wss://{}/v1/ws/", base_url.host_port()?))?;
383
384        let renew_at = login_token.renew_at();
385
386        let state = std::sync::Arc::new(tokio::sync::Mutex::new(ClientState { login_token }));
387
388        let weak_state = std::sync::Arc::downgrade(&state);
389        let refresh_url = base_url.join("/v1/state/strongbox/token/refresh")?;
390
391        if builder.auto_renew_token {
392            tokio::spawn(renew_token_task(
393                weak_state,
394                renew_at,
395                client.clone(),
396                refresh_url,
397            ));
398        }
399
400        Ok(Self {
401            http_client: client,
402            tls_ca: builder.tls_ca.clone(),
403            disable_cert_verification: builder.disable_cert_verification,
404            base_url,
405            websocket_url,
406            state,
407        })
408    }
409
410    /// Returns the login bearer token.
411    pub async fn bearer_token(&self) -> String {
412        let state = self.state.lock().await;
413        state.login_token.token.clone()
414    }
415
416    /// GET a json payload from the REST API.
417    #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
418    pub async fn get_json<T: serde::de::DeserializeOwned>(
419        &self,
420        path: &str,
421        query_params: Option<&[(&str, &str)]>,
422    ) -> Result<T> {
423        let url = self.base_url.join(path)?;
424
425        let token = self.bearer_token().await;
426
427        let mut builder = self
428            .http_client
429            .get(url)
430            .bearer_auth(&token)
431            .header("Accept", "application/json");
432        if let Some(qp) = query_params {
433            builder = builder.query(qp);
434        }
435
436        let result = builder.send().await?;
437
438        if result.status().is_success() {
439            let res = result.json().await?;
440            Ok(res)
441        } else {
442            let status = result.status();
443            let error_payload = result
444                .text()
445                .await
446                .unwrap_or_else(|_| "No error payload".to_owned());
447            Err(Error::WebServer(
448                status.as_u16(),
449                status.to_string(),
450                error_payload,
451            ))
452        }
453    }
454
455    /// GET a bytes payload from the REST API.
456    #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
457    pub async fn get_bytes(
458        &self,
459        path: &str,
460        query_params: Option<&[(&str, &str)]>,
461    ) -> Result<Bytes> {
462        let url = self.base_url.join(path)?;
463
464        let token = self.bearer_token().await;
465
466        let mut builder = self.http_client.get(url).bearer_auth(&token);
467
468        if let Some(qp) = query_params {
469            builder = builder.query(qp);
470        }
471
472        let result = builder.send().await?;
473
474        if result.status().is_success() {
475            let res = result.bytes().await?;
476            Ok(res)
477        } else {
478            let status = result.status();
479            let error_payload = result
480                .text()
481                .await
482                .unwrap_or_else(|_| "No error payload".to_owned());
483            Err(Error::WebServer(
484                status.as_u16(),
485                status.to_string(),
486                error_payload,
487            ))
488        }
489    }
490
491    /// POST arbitrary JSON to a path.
492    /// # Panics
493    ///   never.
494    #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
495    #[expect(clippy::expect_used, reason = "Vec len checked")]
496    pub async fn post_json(
497        &self,
498        path: &str,
499        data: &serde_json::Value,
500    ) -> Result<serde_json::Value> {
501        let url = self.base_url.join(path)?;
502        let token = self.bearer_token().await;
503
504        tracing::debug!("POST {} {:?}", url, data);
505
506        let result = self
507            .http_client
508            .post(url)
509            .json(&data)
510            .bearer_auth(&token)
511            .send()
512            .await?;
513
514        if result.status().is_success() {
515            let resp = result.bytes().await?;
516
517            let decoder = serde_json::Deserializer::from_slice(&resp);
518
519            let responses: core::result::Result<Vec<serde_json::Value>, _> =
520                decoder.into_iter().collect();
521            let mut responses = responses?;
522
523            match responses.len() {
524                0 => Ok(serde_json::Value::Object(serde_json::Map::default())),
525                1 => Ok(responses.pop().expect("We know len == 1")),
526                _ => {
527                    // Convert to a JSON array
528                    Ok(serde_json::Value::Array(responses))
529                }
530            }
531        } else {
532            tracing::error!("POST call failed");
533            let status = result.status();
534            let resp = result.json().await;
535            match resp {
536                Ok(resp) => Err(Error::REST(resp)),
537                Err(_) => Err(Error::WebServer(
538                    status.as_u16(),
539                    status.to_string(),
540                    "Failed to get JSON responses".to_owned(),
541                )),
542            }
543        }
544    }
545
546    /// PUT arbitrary data to a path.
547    #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
548    pub async fn put<T: Into<reqwest::Body> + core::fmt::Debug>(
549        &self,
550        path: &str,
551        content_type: &str,
552        data: T,
553    ) -> Result<()> {
554        let url = self.base_url.join(path)?;
555        let token = self.state.lock().await.login_token.token.clone();
556
557        tracing::debug!("PUT {} {:?}", url, data);
558
559        let result = self
560            .http_client
561            .put(url)
562            .header(reqwest::header::CONTENT_TYPE, content_type)
563            .body(data)
564            .bearer_auth(&token)
565            .send()
566            .await?;
567
568        //#[allow(clippy::redundant_closure_for_method_calls)]
569        if result.status().is_success() {
570            Ok(())
571        } else {
572            tracing::error!("PUT call failed");
573            let status = result.status();
574            let resp = result.json().await;
575            match resp {
576                Ok(resp) => Err(Error::REST(resp)),
577                Err(_) => Err(Error::WebServer(
578                    status.as_u16(),
579                    status.to_string(),
580                    "Failed to get JSON reply".to_owned(),
581                )),
582            }
583        }
584    }
585
586    /// PUT arbitrary JSON to a path.
587    /// # Errors
588    /// `crate::error::Error`
589    pub async fn put_json(
590        &self,
591        path: &str,
592        data: &serde_json::Value,
593    ) -> Result<serde_json::Value> {
594        let url = self.base_url.join(path)?;
595        let token = self.state.lock().await.login_token.token.clone();
596
597        tracing::debug!("PUT {} {:?}", url, data);
598
599        let result = self
600            .http_client
601            .put(url)
602            .json(&data)
603            .bearer_auth(&token)
604            .send()
605            .await?;
606
607        // #[allow(clippy::redundant_closure_for_method_calls)]
608        if result.status().is_success() {
609            use core::error::Error as _;
610            let resp = result.json().await.or_else(|err| match err {
611                decode_err if err.is_decode() => {
612                    match decode_err
613                        .source()
614                        .and_then(|source_err| source_err.downcast_ref::<serde_json::Error>())
615                    {
616                        Some(eof_err) if eof_err.is_eof() => {
617                            Ok(serde_json::Value::Object(serde_json::Map::new()))
618                        }
619                        _ => Err(decode_err),
620                    }
621                }
622                other_err => Err(other_err),
623            })?;
624            Ok(resp)
625        } else {
626            tracing::error!("PUT call failed");
627            let status = result.status();
628            let resp = result.json().await;
629            match resp {
630                Ok(resp) => Err(Error::REST(resp)),
631                Err(_) => Err(Error::WebServer(
632                    status.as_u16(),
633                    status.to_string(),
634                    "Failed to get JSON reply".to_owned(),
635                )),
636            }
637        }
638    }
639
640    /// Open a volga producer on a topic.
641    /// # Errors
642    /// `crate::error::Error`
643    pub async fn volga_open_producer(
644        &self,
645        producer_name: &str,
646        topic: &str,
647        on_no_exists: volga::OnNoExists,
648    ) -> Result<volga::producer::Producer> {
649        crate::volga::producer::Builder::new(self, producer_name, topic, on_no_exists)?
650            .connect()
651            .await
652    }
653
654    /// Open a volga NAT producer on a topic in a site.
655    /// # Errors
656    /// `crate::error::Error`
657    pub async fn volga_open_child_site_producer(
658        &self,
659        producer_name: &str,
660        topic: &str,
661        site: &str,
662        on_no_exists: volga::OnNoExists,
663    ) -> Result<volga::producer::Producer> {
664        crate::volga::producer::Builder::new_child(self, producer_name, topic, site, on_no_exists)?
665            .connect()
666            .await
667    }
668
669    /// Open a volga producer on a topic in the parent site.
670    /// # Errors
671    /// `crate::error::Error`
672    pub async fn volga_open_parent_site_producer(
673        &self,
674        producer_name: &str,
675        topic: &str,
676        on_no_exists: volga::OnNoExists,
677    ) -> Result<volga::producer::Producer> {
678        crate::volga::producer::Builder::new_parent(self, producer_name, topic, on_no_exists)?
679            .connect()
680            .await
681    }
682
683    /// Creates and opens a Volga consumer.
684    /// # Errors
685    /// `crate::error::Error`
686    #[tracing::instrument(skip(self))]
687    pub async fn volga_open_consumer(
688        &self,
689        consumer_name: &str,
690        topic: &str,
691        options: crate::volga::consumer::Options<'_>,
692    ) -> Result<volga::consumer::Consumer> {
693        crate::volga::consumer::Builder::new(self, consumer_name, topic)?
694            .set_options(options)
695            .connect()
696            .await
697    }
698
699    /// Creates and opens a Volga consumer on a child site.
700    /// # Errors
701    /// `crate::error::Error`
702    pub async fn volga_open_child_site_consumer(
703        &self,
704        consumer_name: &str,
705        topic: &str,
706        site: &str,
707        options: crate::volga::consumer::Options<'_>,
708    ) -> Result<volga::consumer::Consumer> {
709        crate::volga::consumer::Builder::new_child(self, consumer_name, topic, site)?
710            .set_options(options)
711            .connect()
712            .await
713    }
714
715    /// Creates and opens a Volga consumer on the parent site.
716    /// # Errors
717    /// `crate::error::Error`
718    pub async fn volga_open_parent_site_consumer(
719        &self,
720        consumer_name: &str,
721        topic: &str,
722        options: crate::volga::consumer::Options<'_>,
723    ) -> Result<volga::consumer::Consumer> {
724        crate::volga::consumer::Builder::new_parent(self, consumer_name, topic)?
725            .set_options(options)
726            .connect()
727            .await
728    }
729
730    #[tracing::instrument(skip(self))]
731    pub(crate) async fn open_tls_stream(
732        &self,
733    ) -> Result<tokio_rustls::client::TlsStream<tokio::net::TcpStream>> {
734        let mut client_config = tokio_rustls::rustls::ClientConfig::builder()
735            .with_root_certificates(self.tls_ca.clone())
736            .with_no_client_auth();
737
738        if self.disable_cert_verification {
739            let mut danger = client_config.dangerous();
740
741            danger.set_certificate_verifier(std::sync::Arc::new(CertificateVerifier));
742        }
743
744        let client_config = std::sync::Arc::new(client_config);
745
746        let connector: tokio_rustls::TlsConnector = client_config.into();
747        let addrs = self.websocket_url.socket_addrs(|| None)?;
748        let stream = tokio::net::TcpStream::connect(&*addrs).await?;
749
750        let server_name = tokio_rustls::rustls::pki_types::ServerName::try_from(
751            self.websocket_url
752                .host_str()
753                .ok_or_else(|| Error::general("No websocket_url host"))?
754                .to_owned(),
755        )?;
756        let stream = connector.connect(server_name, stream).await?;
757        Ok(stream)
758    }
759
    /// Opens a query stream.
    ///
    /// Thin wrapper around `volga::query_topic::QueryStream::new` using this
    /// client and the given `query`.
    /// # Errors
    /// `crate::error::Error`
    pub async fn volga_query_topic(
        &self,
        query: volga::query_topic::Query,
    ) -> Result<volga::query_topic::QueryStream> {
        volga::query_topic::QueryStream::new(self, query).await
    }
769
    /// Setup a connection to a remote container.
    ///
    /// All arguments are forwarded unchanged to `crate::app_connect::connect`,
    /// which defines their exact meaning.
    /// # Errors
    /// `crate::error::Error`
    pub async fn connect(
        &self,
        application: &str,
        service_instance: &str,
        site: &str,
        protocol: crate::app_connect::Protocol,
        port: u16,
        ip_address: Option<core::net::IpAddr>,
    ) -> Result<crate::app_connect::Connection> {
        crate::app_connect::connect(
            self,
            application,
            service_instance,
            site,
            protocol,
            port,
            ip_address,
        )
        .await
    }
793
    /// Returns the websocket URL derived from the base URL when the client
    /// was created.
    pub(crate) fn get_websocket_url(&self) -> &url::Url {
        &self.websocket_url
    }
797
    /// Returns the base URL that REST paths are joined onto.
    pub(crate) fn get_base_url(&self) -> &url::Url {
        &self.base_url
    }
801}
802
803#[derive(Debug)]
804struct CertificateVerifier;
805
806#[expect(clippy::missing_trait_methods, reason="Provided by the trait")]
807impl tokio_rustls::rustls::client::danger::ServerCertVerifier for CertificateVerifier {
808    fn verify_server_cert(
809        &self,
810        _end_entity: &tokio_rustls::rustls::pki_types::CertificateDer<'_>,
811        _intermediates: &[tokio_rustls::rustls::pki_types::CertificateDer<'_>],
812        _server_name: &tokio_rustls::rustls::pki_types::ServerName<'_>,
813        _ocsp_response: &[u8],
814        _now: tokio_rustls::rustls::pki_types::UnixTime,
815    ) -> core::result::Result<
816        tokio_rustls::rustls::client::danger::ServerCertVerified,
817        tokio_rustls::rustls::Error,
818    > {
819        Ok(tokio_rustls::rustls::client::danger::ServerCertVerified::assertion())
820    }
821
822    fn verify_tls12_signature(
823        &self,
824        _message: &[u8],
825        _cert: &tokio_rustls::rustls::pki_types::CertificateDer<'_>,
826        _dss: &tokio_rustls::rustls::DigitallySignedStruct,
827    ) -> core::result::Result<
828        tokio_rustls::rustls::client::danger::HandshakeSignatureValid,
829        tokio_rustls::rustls::Error,
830    > {
831        Ok(tokio_rustls::rustls::client::danger::HandshakeSignatureValid::assertion())
832    }
833
834    fn verify_tls13_signature(
835        &self,
836        _message: &[u8],
837        _cert: &tokio_rustls::rustls::pki_types::CertificateDer<'_>,
838        _dss: &tokio_rustls::rustls::DigitallySignedStruct,
839    ) -> core::result::Result<
840        tokio_rustls::rustls::client::danger::HandshakeSignatureValid,
841        tokio_rustls::rustls::Error,
842    > {
843        Ok(tokio_rustls::rustls::client::danger::HandshakeSignatureValid::assertion())
844    }
845
846    fn supported_verify_schemes(&self) -> Vec<tokio_rustls::rustls::SignatureScheme> {
847        vec![
848            tokio_rustls::rustls::SignatureScheme::RSA_PKCS1_SHA1,
849            tokio_rustls::rustls::SignatureScheme::ECDSA_SHA1_Legacy,
850            tokio_rustls::rustls::SignatureScheme::RSA_PKCS1_SHA256,
851            tokio_rustls::rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
852            tokio_rustls::rustls::SignatureScheme::RSA_PKCS1_SHA384,
853            tokio_rustls::rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
854            tokio_rustls::rustls::SignatureScheme::RSA_PKCS1_SHA512,
855            tokio_rustls::rustls::SignatureScheme::ECDSA_NISTP521_SHA512,
856            tokio_rustls::rustls::SignatureScheme::RSA_PSS_SHA256,
857            tokio_rustls::rustls::SignatureScheme::RSA_PSS_SHA384,
858            tokio_rustls::rustls::SignatureScheme::RSA_PSS_SHA512,
859            tokio_rustls::rustls::SignatureScheme::ED25519,
860            tokio_rustls::rustls::SignatureScheme::ED448,
861        ]
862    }
863}
864
865pub(crate) trait URLExt {
866    fn host_port(&self) -> core::result::Result<String, url::ParseError>;
867}
868
869impl URLExt for url::Url {
870    fn host_port(&self) -> core::result::Result<String, url::ParseError> {
871        let host = self.host_str().ok_or(url::ParseError::EmptyHost)?;
872        Ok(match (host, self.port()) {
873            (host, Some(port)) => format!("{host}:{port}"),
874            (host, _) => host.to_owned(),
875        })
876    }
877}
878
879#[tracing::instrument(skip(next_renew_at, weak_state, client, refresh_url))]
880#[expect(clippy::arithmetic_side_effects, reason="Ok to subtract now")]
881#[expect(clippy::single_call_fn, reason="Don't want to inline")]
882async fn renew_token_task(
883    weak_state: std::sync::Weak<tokio::sync::Mutex<ClientState>>,
884    mut next_renew_at: chrono::DateTime<chrono::Utc>,
885    client: reqwest::Client,
886    refresh_url: url::Url,
887) {
888    loop {
889        let now: chrono::DateTime<_> = chrono::Local::now().into();
890
891        let sleep_time = next_renew_at - now;
892
893        tracing::info!("renew token in {sleep_time} ({})", next_renew_at);
894
895        tokio::time::sleep(
896            sleep_time
897                .to_std()
898                .unwrap_or_else(|_| core::time::Duration::from_secs(0)),
899        )
900        .await;
901
902        if let Some(state) = weak_state.upgrade() {
903            let mut state = state.lock().await;
904
905            if now > state.login_token.expires {
906                tracing::error!("Token is expired and we can't renew. Giving up");
907                break;
908            }
909            let response = client
910                .post(refresh_url.clone())
911                .bearer_auth(&state.login_token.token)
912                .send()
913                .await;
914
915            let response = match response {
916                Ok(resp) => resp,
917                Err(err) => {
918                    tracing::error!("Failed to renew token: {err}");
919                    next_renew_at = chrono::Utc::now() + chrono::Duration::seconds(1);
920                    continue;
921                }
922            };
923
924            let Ok(text) = response.text().await else {
925                tracing::error!("Failed to get token renew response text");
926                continue;
927            };
928            let new_login_token = serde_json::from_str::<LoginToken>(&text);
929
930            match new_login_token {
931                Ok(new_login_token) => {
932                    next_renew_at = new_login_token.renew_at();
933                    state.login_token = new_login_token;
934                    tracing::info!(
935                        "Successfully renewed token, expires: {}",
936                        state.login_token.expires
937                    );
938                }
939                Err(err) => {
940                    tracing::error!("Failed to parse or get token: {err}");
941                    // After failure, we check every second
942                    next_renew_at = chrono::Utc::now() + chrono::Duration::seconds(1);
943                }
944            }
945        } else {
946            tracing::debug!("renew_token: State lost");
947            // If we can't get the state, the client is gone and we should go as well
948            break;
949        }
950    }
951}
952
#[cfg(test)]
mod test {
    /// `host_port` keeps an explicit port and drops scheme and path, for both
    /// IP address and DNS name hosts.
    #[test]
    fn url_ext() {
        use super::URLExt as _;

        let cases = [
            ("https://1.2.3.4:5000/a/b/c", "1.2.3.4:5000"),
            ("https://1.2.3.4/a/b/c", "1.2.3.4"),
            ("https://www.avassa.com/a/b/c", "www.avassa.com"),
            ("https://www.avassa.com:1234/a/b/c", "www.avassa.com:1234"),
        ];

        for (input, expected) in cases {
            let parsed = url::Url::parse(input).unwrap();
            let host_port = parsed.host_port().unwrap();
            assert_eq!(&host_port, expected);
        }
    }
}