1use crate::volga;
2use crate::{Error, Result};
3use bytes::Bytes;
4use serde_json::json;
5
6#[derive(Clone, serde::Deserialize)]
8#[serde(rename_all = "kebab-case")]
9#[non_exhaustive]
10pub struct X509SVID {
11 pub cert: String,
12 pub private_key: String,
13}
14
15#[derive(Clone, serde::Deserialize)]
16#[serde(rename_all = "kebab-case")]
17struct LoginToken {
18 token: String,
19 expires_in: i64,
20 expires: chrono::DateTime<chrono::Utc>,
21 creation_time: chrono::DateTime<chrono::Utc>,
22 jwt_svid: Option<String>,
23 x509_svid: Option<X509SVID>,
24}
25
26impl LoginToken {
27 #[expect(clippy::integer_division, reason = "Good enough precision")]
28 #[expect(
29 clippy::integer_division_remainder_used,
30 reason = "Good enough precision"
31 )]
32 fn renew_at(&self) -> chrono::DateTime<chrono::Utc> {
33 let quarter_expiry = self.expires_in / 4;
34 self.expires
35 .checked_sub_signed(chrono::Duration::seconds(quarter_expiry))
36 .unwrap_or_else(chrono::Utc::now)
37 }
38}
39
40impl core::fmt::Debug for LoginToken {
41 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
42 f.debug_struct("LoginToken")
43 .field("creation_time", &self.creation_time)
44 .field("expires_in", &self.expires_in)
45 .field("expires", &self.expires)
46 .field("jwt-svid", &self.jwt_svid.is_some())
47 .field("x509-svid", &self.x509_svid.is_some())
48 .finish_non_exhaustive()
49 }
50}
51
52#[derive(Debug)]
53struct ClientState {
54 login_token: LoginToken,
55}
56
57#[derive(Clone)]
59#[expect(clippy::module_name_repetitions, reason = "Backwards compat")]
60pub struct ClientBuilder {
61 reqwest_ca: Vec<reqwest::Certificate>,
62 tls_ca: tokio_rustls::rustls::RootCertStore,
63 disable_cert_verification: bool,
64 connection_verbose: bool,
65 auto_renew_token: bool,
66 timeout: Option<core::time::Duration>,
67 connect_timeout: Option<core::time::Duration>,
68}
69
70impl ClientBuilder {
71 #[must_use]
73 pub(crate) fn new() -> Self {
74 let tls_ca = webpki_roots::TLS_SERVER_ROOTS.iter().cloned().collect();
75 Self {
76 reqwest_ca: Vec::new(),
77 tls_ca,
78 disable_cert_verification: false,
79 connection_verbose: false,
80 auto_renew_token: true,
81 timeout: None,
82 connect_timeout: None,
83 }
84 }
85
86 #[must_use]
88 #[inline]
89 pub fn timeout(self, timeout: core::time::Duration) -> Self {
90 Self {
91 timeout: Some(timeout),
92 ..self
93 }
94 }
95
96 #[must_use]
98 #[inline]
99 pub fn connection_timeout(self, timeout: core::time::Duration) -> Self {
100 Self {
101 connect_timeout: Some(timeout),
102 ..self
103 }
104 }
105
106 #[inline]
108 #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
109 pub fn add_root_certificate(mut self, cert: &[u8]) -> Result<Self> {
110 use core::iter;
111 let r_ca = reqwest::Certificate::from_pem(cert)?;
112 let mut ca_reader = std::io::BufReader::new(cert);
113 for item in iter::from_fn(|| rustls_pemfile::read_one(&mut ca_reader).transpose()) {
114 if let rustls_pemfile::Item::X509Certificate(ca_cert) = item? {
115 self.tls_ca.add(ca_cert)?;
116 }
117 }
118 self.reqwest_ca.push(r_ca);
119 Ok(self)
120 }
121
122 #[must_use]
124 #[inline]
125 pub fn danger_disable_cert_verification(self) -> Self {
126 Self {
127 disable_cert_verification: true,
128 ..self
129 }
130 }
131
132 #[must_use]
135 #[inline]
136 pub fn enable_verbose_connection(self) -> Self {
137 Self {
138 connection_verbose: true,
139 ..self
140 }
141 }
142
143 #[must_use]
145 #[inline]
146 pub fn disable_token_auto_renewal(self) -> Self {
147 Self {
148 auto_renew_token: false,
149 ..self
150 }
151 }
152
153 #[deprecated]
157 #[inline]
158 #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
159 pub async fn application_login(&self, host: &str, approle_id: Option<&str>) -> Result<Client> {
160 let secret_id = std::env::var("APPROLE_SECRET_ID")
161 .map_err(|_| Error::LoginFailureMissingEnv(String::from("APPROLE_SECRET_ID")))?;
162
163 let role_id = approle_id.unwrap_or(&secret_id);
165
166 let base_url = url::Url::parse(host)?;
167 let url = base_url.join("v1/approle-login")?;
168 let data = json!({
169 "role-id": role_id,
170 "secret-id": secret_id,
171 });
172 Client::do_login(self, base_url, url, data).await
173 }
174
175 #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
177 pub async fn approle_login(
178 &self,
179 host: &str,
180 secret_id: &str,
181 role_id: Option<&str>,
182 ) -> Result<Client> {
183 let role_id = role_id.unwrap_or(secret_id);
185
186 let base_url = url::Url::parse(host)?;
187 let url = base_url.join("v1/approle-login")?;
188 let data = json!({
189 "role-id": role_id,
190 "secret-id": secret_id,
191 });
192 Client::do_login(self, base_url, url, data).await
193 }
194
195 #[tracing::instrument(skip(self, password))]
198 #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
199 pub async fn login(&self, host: &str, username: &str, password: &str) -> Result<Client> {
200 let base_url = url::Url::parse(host)?;
201 let url = base_url.join("v1/login")?;
202
203 let data = json!({
205 "username":username,
206 "password":password
207 });
208 Client::do_login(self, base_url, url, data).await
209 }
210
211 #[tracing::instrument(skip(self, token))]
213 #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
214 #[expect(clippy::arithmetic_side_effects, reason = "Ok to add one second")]
215 pub fn token_login(&self, host: &str, token: &str) -> Result<Client> {
216 let base_url = url::Url::parse(host)?;
217 let client = Client::reqwest_client(self)?;
218 let creation_time = chrono::Local::now().into();
219 let expires = creation_time + chrono::Duration::seconds(1);
220
221 let login_token = LoginToken {
222 token: token.to_owned(),
223 expires_in: 1,
224 creation_time,
225 expires,
226 jwt_svid: None,
227 x509_svid: None,
228 };
229
230 Client::new(self, client, base_url, login_token)
231 }
232
233 #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
235 pub async fn jwt_login(
236 &self,
237 host: &str,
238 tenant_name: &str,
239 jwt_auth: &str,
240 role: &str,
241 jwt: &str,
242 ) -> Result<Client> {
243 let base_url = url::Url::parse(host)?;
244 let url = base_url.join("v1/jwt-login")?;
245
246 let data = json!({
247 "tenant": tenant_name,
248 "jwt-auth": jwt_auth,
249 "role": role,
250 "jwt": jwt
251 });
252 Client::do_login(self, base_url, url, data).await
253 }
254}
255
256impl Default for ClientBuilder {
257 fn default() -> Self {
258 Self::new()
259 }
260}
261
262#[derive(Clone)]
265pub struct Client {
266 base_url: url::Url,
267 websocket_url: url::Url,
268 state: std::sync::Arc<tokio::sync::Mutex<ClientState>>,
269 #[expect(clippy::struct_field_names, reason = "easier to read")]
270 http_client: reqwest::Client,
271 tls_ca: tokio_rustls::rustls::RootCertStore,
272 disable_cert_verification: bool,
273}
274
275impl core::fmt::Debug for Client {
276 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
277 f.debug_struct("Client")
278 .field("base_url", &self.base_url)
279 .field("websocket_url", &self.websocket_url)
280 .field("state", &self.state)
281 .field("http_client", &self.http_client)
282 .field("disable_cert_verification", &self.disable_cert_verification)
283 .finish_non_exhaustive()
284 }
285}
286
287impl Client {
288 #[must_use]
290 pub fn builder() -> ClientBuilder {
291 ClientBuilder::new()
292 }
293
294 pub async fn token_is_expired(&self) -> bool {
296 let state = self.state.lock().await;
297 chrono::Local::now() > state.login_token.expires
298 }
299
300 pub async fn token_expires(&self) -> chrono::DateTime<chrono::Utc> {
301 let state = self.state.lock().await;
302 state.login_token.expires
303 }
304
305 pub async fn jwt_svid(&self) -> Option<String> {
307 let state = self.state.lock().await;
308 state.login_token.jwt_svid.clone()
309 }
310
311 pub async fn x509_svid(&self) -> Option<X509SVID> {
312 let state = self.state.lock().await;
313 state.login_token.x509_svid.clone()
314 }
315
316 async fn do_login(
317 builder: &ClientBuilder,
318 base_url: url::Url,
319 url: url::Url,
320 payload: serde_json::Value,
321 ) -> Result<Self> {
322 let json = serde_json::to_string(&payload)?;
323 let client = Self::reqwest_client(builder)?;
324 let result = client
325 .post(url)
326 .header("content-type", "application/json")
327 .body(json)
328 .send()
329 .await?;
330
331 if result.status().is_success() {
332 let login_token = result.json().await?;
333 Self::new(builder, client, base_url, login_token)
334 } else {
335 let text = result.text().await?;
336 tracing::debug!("login returned {}", text);
337 Err(Error::LoginFailure(text))
338 }
339 }
340
341 fn reqwest_client(builder: &ClientBuilder) -> Result<reqwest::Client> {
342 let reqwest_client_builder = reqwest::Client::builder().use_rustls_tls();
343
344 let reqwest_client_builder = builder
346 .reqwest_ca
347 .iter()
348 .fold(reqwest_client_builder, |reqwest_client_builder, ca| {
349 reqwest_client_builder.add_root_certificate(ca.clone())
350 });
351
352 tracing::debug!("Added {} CA certs", builder.reqwest_ca.len());
353
354 let reqwest_client_builder =
355 reqwest_client_builder.danger_accept_invalid_certs(builder.disable_cert_verification);
356
357 let reqwest_client_builder =
358 reqwest_client_builder.connection_verbose(builder.connection_verbose);
359
360 let reqwest_client_builder = if let Some(duration) = builder.timeout {
361 reqwest_client_builder.timeout(duration)
362 } else {
363 reqwest_client_builder
364 };
365
366 let reqwest_client_builder = if let Some(duration) = builder.connect_timeout {
367 reqwest_client_builder.connect_timeout(duration)
368 } else {
369 reqwest_client_builder
370 };
371
372 let client = reqwest_client_builder.build()?;
373 Ok(client)
374 }
375
376 fn new(
377 builder: &ClientBuilder,
378 client: reqwest::Client,
379 base_url: url::Url,
380 login_token: LoginToken,
381 ) -> Result<Self> {
382 let websocket_url = url::Url::parse(&format!("wss://{}/v1/ws/", base_url.host_port()?))?;
383
384 let renew_at = login_token.renew_at();
385
386 let state = std::sync::Arc::new(tokio::sync::Mutex::new(ClientState { login_token }));
387
388 let weak_state = std::sync::Arc::downgrade(&state);
389 let refresh_url = base_url.join("/v1/state/strongbox/token/refresh")?;
390
391 if builder.auto_renew_token {
392 tokio::spawn(renew_token_task(
393 weak_state,
394 renew_at,
395 client.clone(),
396 refresh_url,
397 ));
398 }
399
400 Ok(Self {
401 http_client: client,
402 tls_ca: builder.tls_ca.clone(),
403 disable_cert_verification: builder.disable_cert_verification,
404 base_url,
405 websocket_url,
406 state,
407 })
408 }
409
410 pub async fn bearer_token(&self) -> String {
412 let state = self.state.lock().await;
413 state.login_token.token.clone()
414 }
415
416 #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
418 pub async fn get_json<T: serde::de::DeserializeOwned>(
419 &self,
420 path: &str,
421 query_params: Option<&[(&str, &str)]>,
422 ) -> Result<T> {
423 let url = self.base_url.join(path)?;
424
425 let token = self.bearer_token().await;
426
427 let mut builder = self
428 .http_client
429 .get(url)
430 .bearer_auth(&token)
431 .header("Accept", "application/json");
432 if let Some(qp) = query_params {
433 builder = builder.query(qp);
434 }
435
436 let result = builder.send().await?;
437
438 if result.status().is_success() {
439 let res = result.json().await?;
440 Ok(res)
441 } else {
442 let status = result.status();
443 let error_payload = result
444 .text()
445 .await
446 .unwrap_or_else(|_| "No error payload".to_owned());
447 Err(Error::WebServer(
448 status.as_u16(),
449 status.to_string(),
450 error_payload,
451 ))
452 }
453 }
454
455 #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
457 pub async fn get_bytes(
458 &self,
459 path: &str,
460 query_params: Option<&[(&str, &str)]>,
461 ) -> Result<Bytes> {
462 let url = self.base_url.join(path)?;
463
464 let token = self.bearer_token().await;
465
466 let mut builder = self.http_client.get(url).bearer_auth(&token);
467
468 if let Some(qp) = query_params {
469 builder = builder.query(qp);
470 }
471
472 let result = builder.send().await?;
473
474 if result.status().is_success() {
475 let res = result.bytes().await?;
476 Ok(res)
477 } else {
478 let status = result.status();
479 let error_payload = result
480 .text()
481 .await
482 .unwrap_or_else(|_| "No error payload".to_owned());
483 Err(Error::WebServer(
484 status.as_u16(),
485 status.to_string(),
486 error_payload,
487 ))
488 }
489 }
490
491 #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
495 #[expect(clippy::expect_used, reason = "Vec len checked")]
496 pub async fn post_json(
497 &self,
498 path: &str,
499 data: &serde_json::Value,
500 ) -> Result<serde_json::Value> {
501 let url = self.base_url.join(path)?;
502 let token = self.bearer_token().await;
503
504 tracing::debug!("POST {} {:?}", url, data);
505
506 let result = self
507 .http_client
508 .post(url)
509 .json(&data)
510 .bearer_auth(&token)
511 .send()
512 .await?;
513
514 if result.status().is_success() {
515 let resp = result.bytes().await?;
516
517 let decoder = serde_json::Deserializer::from_slice(&resp);
518
519 let responses: core::result::Result<Vec<serde_json::Value>, _> =
520 decoder.into_iter().collect();
521 let mut responses = responses?;
522
523 match responses.len() {
524 0 => Ok(serde_json::Value::Object(serde_json::Map::default())),
525 1 => Ok(responses.pop().expect("We know len == 1")),
526 _ => {
527 Ok(serde_json::Value::Array(responses))
529 }
530 }
531 } else {
532 tracing::error!("POST call failed");
533 let status = result.status();
534 let resp = result.json().await;
535 match resp {
536 Ok(resp) => Err(Error::REST(resp)),
537 Err(_) => Err(Error::WebServer(
538 status.as_u16(),
539 status.to_string(),
540 "Failed to get JSON responses".to_owned(),
541 )),
542 }
543 }
544 }
545
546 #[expect(clippy::missing_errors_doc, reason = "Convered in the Error type")]
548 pub async fn put<T: Into<reqwest::Body> + core::fmt::Debug>(
549 &self,
550 path: &str,
551 content_type: &str,
552 data: T,
553 ) -> Result<()> {
554 let url = self.base_url.join(path)?;
555 let token = self.state.lock().await.login_token.token.clone();
556
557 tracing::debug!("PUT {} {:?}", url, data);
558
559 let result = self
560 .http_client
561 .put(url)
562 .header(reqwest::header::CONTENT_TYPE, content_type)
563 .body(data)
564 .bearer_auth(&token)
565 .send()
566 .await?;
567
568 if result.status().is_success() {
570 Ok(())
571 } else {
572 tracing::error!("PUT call failed");
573 let status = result.status();
574 let resp = result.json().await;
575 match resp {
576 Ok(resp) => Err(Error::REST(resp)),
577 Err(_) => Err(Error::WebServer(
578 status.as_u16(),
579 status.to_string(),
580 "Failed to get JSON reply".to_owned(),
581 )),
582 }
583 }
584 }
585
586 pub async fn put_json(
590 &self,
591 path: &str,
592 data: &serde_json::Value,
593 ) -> Result<serde_json::Value> {
594 let url = self.base_url.join(path)?;
595 let token = self.state.lock().await.login_token.token.clone();
596
597 tracing::debug!("PUT {} {:?}", url, data);
598
599 let result = self
600 .http_client
601 .put(url)
602 .json(&data)
603 .bearer_auth(&token)
604 .send()
605 .await?;
606
607 if result.status().is_success() {
609 use core::error::Error as _;
610 let resp = result.json().await.or_else(|err| match err {
611 decode_err if err.is_decode() => {
612 match decode_err
613 .source()
614 .and_then(|source_err| source_err.downcast_ref::<serde_json::Error>())
615 {
616 Some(eof_err) if eof_err.is_eof() => {
617 Ok(serde_json::Value::Object(serde_json::Map::new()))
618 }
619 _ => Err(decode_err),
620 }
621 }
622 other_err => Err(other_err),
623 })?;
624 Ok(resp)
625 } else {
626 tracing::error!("PUT call failed");
627 let status = result.status();
628 let resp = result.json().await;
629 match resp {
630 Ok(resp) => Err(Error::REST(resp)),
631 Err(_) => Err(Error::WebServer(
632 status.as_u16(),
633 status.to_string(),
634 "Failed to get JSON reply".to_owned(),
635 )),
636 }
637 }
638 }
639
640 pub async fn volga_open_producer(
644 &self,
645 producer_name: &str,
646 topic: &str,
647 on_no_exists: volga::OnNoExists,
648 ) -> Result<volga::producer::Producer> {
649 crate::volga::producer::Builder::new(self, producer_name, topic, on_no_exists)?
650 .connect()
651 .await
652 }
653
654 pub async fn volga_open_child_site_producer(
658 &self,
659 producer_name: &str,
660 topic: &str,
661 site: &str,
662 on_no_exists: volga::OnNoExists,
663 ) -> Result<volga::producer::Producer> {
664 crate::volga::producer::Builder::new_child(self, producer_name, topic, site, on_no_exists)?
665 .connect()
666 .await
667 }
668
669 pub async fn volga_open_parent_site_producer(
673 &self,
674 producer_name: &str,
675 topic: &str,
676 on_no_exists: volga::OnNoExists,
677 ) -> Result<volga::producer::Producer> {
678 crate::volga::producer::Builder::new_parent(self, producer_name, topic, on_no_exists)?
679 .connect()
680 .await
681 }
682
683 #[tracing::instrument(skip(self))]
687 pub async fn volga_open_consumer(
688 &self,
689 consumer_name: &str,
690 topic: &str,
691 options: crate::volga::consumer::Options<'_>,
692 ) -> Result<volga::consumer::Consumer> {
693 crate::volga::consumer::Builder::new(self, consumer_name, topic)?
694 .set_options(options)
695 .connect()
696 .await
697 }
698
699 pub async fn volga_open_child_site_consumer(
703 &self,
704 consumer_name: &str,
705 topic: &str,
706 site: &str,
707 options: crate::volga::consumer::Options<'_>,
708 ) -> Result<volga::consumer::Consumer> {
709 crate::volga::consumer::Builder::new_child(self, consumer_name, topic, site)?
710 .set_options(options)
711 .connect()
712 .await
713 }
714
715 pub async fn volga_open_parent_site_consumer(
719 &self,
720 consumer_name: &str,
721 topic: &str,
722 options: crate::volga::consumer::Options<'_>,
723 ) -> Result<volga::consumer::Consumer> {
724 crate::volga::consumer::Builder::new_parent(self, consumer_name, topic)?
725 .set_options(options)
726 .connect()
727 .await
728 }
729
730 #[tracing::instrument(skip(self))]
731 pub(crate) async fn open_tls_stream(
732 &self,
733 ) -> Result<tokio_rustls::client::TlsStream<tokio::net::TcpStream>> {
734 let mut client_config = tokio_rustls::rustls::ClientConfig::builder()
735 .with_root_certificates(self.tls_ca.clone())
736 .with_no_client_auth();
737
738 if self.disable_cert_verification {
739 let mut danger = client_config.dangerous();
740
741 danger.set_certificate_verifier(std::sync::Arc::new(CertificateVerifier));
742 }
743
744 let client_config = std::sync::Arc::new(client_config);
745
746 let connector: tokio_rustls::TlsConnector = client_config.into();
747 let addrs = self.websocket_url.socket_addrs(|| None)?;
748 let stream = tokio::net::TcpStream::connect(&*addrs).await?;
749
750 let server_name = tokio_rustls::rustls::pki_types::ServerName::try_from(
751 self.websocket_url
752 .host_str()
753 .ok_or_else(|| Error::general("No websocket_url host"))?
754 .to_owned(),
755 )?;
756 let stream = connector.connect(server_name, stream).await?;
757 Ok(stream)
758 }
759
760 pub async fn volga_query_topic(
764 &self,
765 query: volga::query_topic::Query,
766 ) -> Result<volga::query_topic::QueryStream> {
767 volga::query_topic::QueryStream::new(self, query).await
768 }
769
770 pub async fn connect(
774 &self,
775 application: &str,
776 service_instance: &str,
777 site: &str,
778 protocol: crate::app_connect::Protocol,
779 port: u16,
780 ip_address: Option<core::net::IpAddr>,
781 ) -> Result<crate::app_connect::Connection> {
782 crate::app_connect::connect(
783 self,
784 application,
785 service_instance,
786 site,
787 protocol,
788 port,
789 ip_address,
790 )
791 .await
792 }
793
794 pub(crate) fn get_websocket_url(&self) -> &url::Url {
795 &self.websocket_url
796 }
797
798 pub(crate) fn get_base_url(&self) -> &url::Url {
799 &self.base_url
800 }
801}
802
803#[derive(Debug)]
804struct CertificateVerifier;
805
806#[expect(clippy::missing_trait_methods, reason="Provided by the trait")]
807impl tokio_rustls::rustls::client::danger::ServerCertVerifier for CertificateVerifier {
808 fn verify_server_cert(
809 &self,
810 _end_entity: &tokio_rustls::rustls::pki_types::CertificateDer<'_>,
811 _intermediates: &[tokio_rustls::rustls::pki_types::CertificateDer<'_>],
812 _server_name: &tokio_rustls::rustls::pki_types::ServerName<'_>,
813 _ocsp_response: &[u8],
814 _now: tokio_rustls::rustls::pki_types::UnixTime,
815 ) -> core::result::Result<
816 tokio_rustls::rustls::client::danger::ServerCertVerified,
817 tokio_rustls::rustls::Error,
818 > {
819 Ok(tokio_rustls::rustls::client::danger::ServerCertVerified::assertion())
820 }
821
822 fn verify_tls12_signature(
823 &self,
824 _message: &[u8],
825 _cert: &tokio_rustls::rustls::pki_types::CertificateDer<'_>,
826 _dss: &tokio_rustls::rustls::DigitallySignedStruct,
827 ) -> core::result::Result<
828 tokio_rustls::rustls::client::danger::HandshakeSignatureValid,
829 tokio_rustls::rustls::Error,
830 > {
831 Ok(tokio_rustls::rustls::client::danger::HandshakeSignatureValid::assertion())
832 }
833
834 fn verify_tls13_signature(
835 &self,
836 _message: &[u8],
837 _cert: &tokio_rustls::rustls::pki_types::CertificateDer<'_>,
838 _dss: &tokio_rustls::rustls::DigitallySignedStruct,
839 ) -> core::result::Result<
840 tokio_rustls::rustls::client::danger::HandshakeSignatureValid,
841 tokio_rustls::rustls::Error,
842 > {
843 Ok(tokio_rustls::rustls::client::danger::HandshakeSignatureValid::assertion())
844 }
845
846 fn supported_verify_schemes(&self) -> Vec<tokio_rustls::rustls::SignatureScheme> {
847 vec![
848 tokio_rustls::rustls::SignatureScheme::RSA_PKCS1_SHA1,
849 tokio_rustls::rustls::SignatureScheme::ECDSA_SHA1_Legacy,
850 tokio_rustls::rustls::SignatureScheme::RSA_PKCS1_SHA256,
851 tokio_rustls::rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
852 tokio_rustls::rustls::SignatureScheme::RSA_PKCS1_SHA384,
853 tokio_rustls::rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
854 tokio_rustls::rustls::SignatureScheme::RSA_PKCS1_SHA512,
855 tokio_rustls::rustls::SignatureScheme::ECDSA_NISTP521_SHA512,
856 tokio_rustls::rustls::SignatureScheme::RSA_PSS_SHA256,
857 tokio_rustls::rustls::SignatureScheme::RSA_PSS_SHA384,
858 tokio_rustls::rustls::SignatureScheme::RSA_PSS_SHA512,
859 tokio_rustls::rustls::SignatureScheme::ED25519,
860 tokio_rustls::rustls::SignatureScheme::ED448,
861 ]
862 }
863}
864
865pub(crate) trait URLExt {
866 fn host_port(&self) -> core::result::Result<String, url::ParseError>;
867}
868
869impl URLExt for url::Url {
870 fn host_port(&self) -> core::result::Result<String, url::ParseError> {
871 let host = self.host_str().ok_or(url::ParseError::EmptyHost)?;
872 Ok(match (host, self.port()) {
873 (host, Some(port)) => format!("{host}:{port}"),
874 (host, _) => host.to_owned(),
875 })
876 }
877}
878
879#[tracing::instrument(skip(next_renew_at, weak_state, client, refresh_url))]
880#[expect(clippy::arithmetic_side_effects, reason="Ok to subtract now")]
881#[expect(clippy::single_call_fn, reason="Don't want to inline")]
882async fn renew_token_task(
883 weak_state: std::sync::Weak<tokio::sync::Mutex<ClientState>>,
884 mut next_renew_at: chrono::DateTime<chrono::Utc>,
885 client: reqwest::Client,
886 refresh_url: url::Url,
887) {
888 loop {
889 let now: chrono::DateTime<_> = chrono::Local::now().into();
890
891 let sleep_time = next_renew_at - now;
892
893 tracing::info!("renew token in {sleep_time} ({})", next_renew_at);
894
895 tokio::time::sleep(
896 sleep_time
897 .to_std()
898 .unwrap_or_else(|_| core::time::Duration::from_secs(0)),
899 )
900 .await;
901
902 if let Some(state) = weak_state.upgrade() {
903 let mut state = state.lock().await;
904
905 if now > state.login_token.expires {
906 tracing::error!("Token is expired and we can't renew. Giving up");
907 break;
908 }
909 let response = client
910 .post(refresh_url.clone())
911 .bearer_auth(&state.login_token.token)
912 .send()
913 .await;
914
915 let response = match response {
916 Ok(resp) => resp,
917 Err(err) => {
918 tracing::error!("Failed to renew token: {err}");
919 next_renew_at = chrono::Utc::now() + chrono::Duration::seconds(1);
920 continue;
921 }
922 };
923
924 let Ok(text) = response.text().await else {
925 tracing::error!("Failed to get token renew response text");
926 continue;
927 };
928 let new_login_token = serde_json::from_str::<LoginToken>(&text);
929
930 match new_login_token {
931 Ok(new_login_token) => {
932 next_renew_at = new_login_token.renew_at();
933 state.login_token = new_login_token;
934 tracing::info!(
935 "Successfully renewed token, expires: {}",
936 state.login_token.expires
937 );
938 }
939 Err(err) => {
940 tracing::error!("Failed to parse or get token: {err}");
941 next_renew_at = chrono::Utc::now() + chrono::Duration::seconds(1);
943 }
944 }
945 } else {
946 tracing::debug!("renew_token: State lost");
947 break;
949 }
950 }
951}
952
953#[cfg(test)]
954mod test {
955 #[test]
956 #[expect(clippy::shadow_unrelated, reason="Test")]
957 fn url_ext() {
958 use super::URLExt as _;
959 let url = url::Url::parse("https://1.2.3.4:5000/a/b/c").unwrap();
960 let host_port = url.host_port().unwrap();
961 assert_eq!(&host_port, "1.2.3.4:5000");
962
963 let url = url::Url::parse("https://1.2.3.4/a/b/c").unwrap();
964 let host_port = url.host_port().unwrap();
965 assert_eq!(&host_port, "1.2.3.4");
966
967 let url = url::Url::parse("https://www.avassa.com/a/b/c").unwrap();
968 let host_port = url.host_port().unwrap();
969 assert_eq!(&host_port, "www.avassa.com");
970
971 let url = url::Url::parse("https://www.avassa.com:1234/a/b/c").unwrap();
972 let host_port = url.host_port().unwrap();
973 assert_eq!(&host_port, "www.avassa.com:1234");
974 }
975}