1#![forbid(unsafe_code)]
6#![warn(missing_docs)]
7
8pub use access_control::AccessControl;
9pub use authly_common::service::NamespacePropertyMapping;
10pub use builder::ClientBuilder;
11use builder::ConnectionParamsBuilder;
12use connection::{Connection, ConnectionParams, ReconfigureStrategy};
13pub use error::Error;
14use futures_util::{stream::BoxStream, StreamExt};
15use metadata::{NamespaceMetadata, ServiceMetadata};
16use rcgen::{CertificateParams, DnType, ExtendedKeyUsagePurpose, KeyPair, KeyUsagePurpose};
17use rustls_pki_types::{CertificateDer, PrivateKeyDer};
18pub use token::AccessToken;
19
20use arc_swap::ArcSwap;
21use tracing::info;
22
23use std::{borrow::Cow, sync::Arc, time::Duration};
24
25use anyhow::anyhow;
26use authly_common::{
27 access_token::AuthlyAccessTokenClaims,
28 id::{Id128DynamicArrayConv, ServiceId},
29 proto::{
30 proto_struct_to_json,
31 service::{self as proto, authly_service_client::AuthlyServiceClient},
32 },
33};
34use http::header::COOKIE;
35use tonic::{transport::Channel, Request};
36
37pub mod access_control;
38pub mod connection;
39pub mod identity;
40pub mod metadata;
41pub mod token;
42
43mod background_worker;
44mod builder;
45mod error;
46
/// Filesystem path of the root CA certificate.
///
/// NOTE(review): nothing in this file reads the constant, and the
/// `#[expect(unused)]` attribute states that it is expected to be unused.
47#[expect(unused)]
49const ROOT_CA_CERT_PATH: &str = "/etc/authly/certs/root.crt";
50
/// Filesystem path of the local CA certificate.
///
/// NOTE(review): not referenced in this part of the file; presumably read
/// by the builder module — confirm.
51const LOCAL_CA_CERT_PATH: &str = "/etc/authly/certs/local.crt";
53
/// Filesystem path of the PEM file holding the client identity.
///
/// NOTE(review): not referenced in this part of the file; presumably read
/// by the builder module — confirm.
54const IDENTITY_PATH: &str = "/etc/authly/identity/identity.pem";
56
/// Filesystem path of the Kubernetes service account token file.
///
/// NOTE(review): not referenced in this part of the file; presumably read
/// by the builder module — confirm.
57const K8S_SA_TOKENFILE_PATH: &str = "/var/run/secrets/kubernetes.io/serviceaccount/token";
59
/// The Authly client handle.
///
/// All state lives behind an [`Arc`], so cloning a `Client` only clones the
/// pointer and every clone shares the same underlying `ClientState`.
60#[derive(Clone)]
62pub struct Client {
    /// Shared state used by all clones of this client.
63 state: Arc<ClientState>,
64}
65
/// State shared by all clones of a [`Client`].
///
/// Dropping this value sends a notification on `closed_tx` (see the `Drop`
/// implementation).
66struct ClientState {
    /// The current connection, held in an `ArcSwap`. Methods `load()` it to
    /// obtain the service client and the connection parameters.
68 conn: ArcSwap<Connection>,
70
    /// Strategy used for reconfiguring the connection.
    /// NOTE(review): not read in this part of the file — confirm its consumer.
71 reconfigure: ReconfigureStrategy,
73
    /// Watch receiver carrying the latest connection parameters. The stream
    /// methods clone it to observe reconfigurations.
    /// NOTE(review): `connection_params_stream` reads this field without any
    /// feature gate, so the `allow(unused)` may be obsolete — confirm.
74 #[allow(unused)]
76 reconfigured_rx: tokio::sync::watch::Receiver<Arc<ConnectionParams>>,
77
    /// Watch receiver that `metadata_stream` waits on before re-fetching
    /// the service metadata.
78 metadata_invalidated_rx: tokio::sync::watch::Receiver<()>,
80
    /// Watch sender that is notified when this state is dropped.
81 closed_tx: tokio::sync::watch::Sender<()>,
83
    /// The current service configuration (hosts and resource property
    /// mapping), held in an `ArcSwap`.
84 configuration: ArcSwap<Configuration>,
86}
87
/// Service configuration as returned by the `get_configuration` call.
88struct Configuration {
    /// Host names of the service. They are passed to
    /// `CertificateParams::new` when generating server certificates.
89 hosts: Vec<String>,
91
    /// Resource property mapping handed out by
    /// `Client::get_resource_property_mapping`.
92 resource_property_mapping: Arc<NamespacePropertyMapping>,
97}
98
99impl Drop for ClientState {
100 fn drop(&mut self) {
101 let _ = self.closed_tx.send(());
102 }
103}
104
105impl Client {
106 pub fn builder() -> ClientBuilder {
108 let url = std::env::var("AUTHLY_URL")
109 .map(Cow::Owned)
110 .unwrap_or(Cow::Borrowed("https://authly"));
111
112 ClientBuilder {
113 inner: ConnectionParamsBuilder::new(url),
114 }
115 }
116
117 pub async fn metadata(&self) -> Result<ServiceMetadata, Error> {
119 let proto = self
120 .current_service()
121 .get_metadata(proto::Empty::default())
122 .await
123 .map_err(error::tonic)?
124 .into_inner();
125
126 Ok(ServiceMetadata {
127 entity_id: ServiceId::try_from_bytes_dynamic(&proto.entity_id)
128 .ok_or_else(id_codec_error)?,
129 label: proto.label,
130 namespaces: proto
131 .namespaces
132 .into_iter()
133 .map(|proto| NamespaceMetadata {
134 label: proto.label,
135 metadata: proto.metadata.map(proto_struct_to_json),
136 })
137 .collect(),
138 })
139 }
140
141 pub async fn metadata_stream(&self) -> Result<BoxStream<'static, ServiceMetadata>, Error> {
145 struct StreamState {
146 initial: Option<ServiceMetadata>,
147 client: Client,
148 watch: tokio::sync::watch::Receiver<()>,
149 }
150
151 let mut state = StreamState {
152 initial: Some(self.metadata().await?),
153 client: self.clone(),
154 watch: self.state.metadata_invalidated_rx.clone(),
155 };
156 state.watch.mark_unchanged();
157
158 Ok(futures_util::stream::unfold(state, |mut state| async move {
159 match state.initial {
160 Some(initial) => Some((
161 initial,
162 StreamState {
163 initial: None,
164 ..state
165 },
166 )),
167 None => {
168 state.watch.changed().await.ok()?;
169
170 let next = loop {
171 match state.client.metadata().await {
172 Ok(metadata) => break metadata,
173 Err(err) => {
174 info!(?err, "unable to re-fetch metadata, retrying soon");
175 tokio::time::sleep(Duration::from_secs(10)).await;
176 }
177 }
178 };
179
180 Some((next, state))
181 }
182 }
183 })
184 .boxed())
185 }
186
187 pub fn get_resource_property_mapping(&self) -> Arc<NamespacePropertyMapping> {
189 self.state
190 .configuration
191 .load()
192 .resource_property_mapping
193 .clone()
194 }
195
196 pub fn decode_access_token(
199 &self,
200 access_token: impl Into<String>,
201 ) -> Result<Arc<AccessToken>, Error> {
202 let access_token = access_token.into();
203 let validation = jsonwebtoken::Validation::new(jsonwebtoken::Algorithm::ES256);
204 let token_data = jsonwebtoken::decode::<AuthlyAccessTokenClaims>(
205 &access_token,
206 &self.state.conn.load().params.jwt_decoding_key,
207 &validation,
208 )
209 .map_err(|err| Error::InvalidAccessToken(err.into()))?;
210
211 Ok(Arc::new(AccessToken {
212 token: access_token,
213 claims: token_data.claims,
214 }))
215 }
216
217 pub async fn get_access_token(&self, session_token: &str) -> Result<Arc<AccessToken>, Error> {
219 let mut request = Request::new(proto::Empty::default());
220
221 request.metadata_mut().append(
223 COOKIE.as_str(),
224 format!("session-cookie={session_token}")
225 .parse()
226 .map_err(error::unclassified)?,
227 );
228
229 let proto = self
230 .current_service()
231 .get_access_token(request)
232 .await
233 .map_err(error::tonic)?
234 .into_inner();
235
236 self.decode_access_token(proto.token)
237 }
238
239 pub fn into_dyn_access_control(self) -> Arc<dyn AccessControl + Send + Sync + 'static> {
243 Arc::new(self)
244 }
245
246 pub async fn generate_server_tls_params(
255 &self,
256 subject_common_name: &str,
257 ) -> Result<(CertificateDer<'static>, PrivateKeyDer<'static>), Error> {
258 let hosts = self.state.configuration.load().hosts.clone();
259 let params = {
260 let mut params = CertificateParams::new(hosts).map_err(|_| Error::InvalidAltNames)?;
261 params
262 .distinguished_name
263 .push(DnType::CommonName, subject_common_name);
264 params.distinguished_name.push(
265 DnType::CustomDnType(
266 authly_common::certificate::oid::ENTITY_UNIQUE_IDENTIFIER.to_vec(),
267 ),
268 self.state.conn.load().params.entity_id.to_string(),
269 );
270 params.use_authority_key_identifier_extension = false;
271 params.key_usages.push(KeyUsagePurpose::DigitalSignature);
272 params
273 .extended_key_usages
274 .push(ExtendedKeyUsagePurpose::ServerAuth);
275
276 let now = time::OffsetDateTime::now_utc();
277 params.not_before = now;
278
279 params.not_after = now.checked_add(time::Duration::days(365)).unwrap();
283 params
284 };
285
286 let key_pair = KeyPair::generate().map_err(|_err| Error::PrivateKeyGen)?;
289 let csr_der = params
290 .serialize_request(&key_pair)
291 .expect("the parameters should be correct")
292 .der()
293 .to_vec();
294
295 let proto = self
296 .state
297 .conn
298 .load()
299 .authly_service
300 .clone()
301 .sign_certificate(Request::new(proto::CertificateSigningRequest {
302 der: csr_der,
303 }))
304 .await
305 .map_err(error::tonic)?;
306
307 let certificate = CertificateDer::from(proto.into_inner().der);
308 let private_key = PrivateKeyDer::try_from(key_pair.serialize_der()).map_err(|err| {
309 Error::Unclassified(anyhow!("could not serialize private key: {err}"))
310 })?;
311
312 Ok((certificate, private_key))
313 }
314
315 #[cfg(feature = "rustls_023")]
324 pub async fn rustls_server_configurer(
325 &self,
326 subject_common_name: impl Into<Cow<'static, str>>,
327 ) -> Result<futures_util::stream::BoxStream<'static, Arc<rustls::ServerConfig>>, Error> {
328 use std::time::Duration;
329
330 use futures_util::StreamExt;
331 use rustls::{server::WebPkiClientVerifier, RootCertStore};
332 use rustls_pki_types::pem::PemObject;
333
334 async fn rebuild_server_config(
335 client: Client,
336 params: Arc<ConnectionParams>,
337 subject_common_name: Cow<'static, str>,
338 ) -> Result<Arc<rustls::ServerConfig>, Error> {
339 let mut root_cert_store = RootCertStore::empty();
340 root_cert_store
341 .add(
342 CertificateDer::from_pem_slice(¶ms.authly_local_ca)
343 .map_err(|_err| Error::AuthlyCA("unable to parse"))?,
344 )
345 .map_err(|_err| Error::AuthlyCA("unable to include in root cert store"))?;
346
347 let (cert, key) = client
348 .generate_server_tls_params(&subject_common_name)
349 .await?;
350
351 let mut tls_config = rustls::server::ServerConfig::builder()
352 .with_client_cert_verifier(
353 WebPkiClientVerifier::builder(root_cert_store.into())
354 .build()
355 .map_err(|_| Error::AuthlyCA("cannot build a WebPki client verifier"))?,
356 )
357 .with_single_cert(vec![cert], key)
358 .map_err(|_| Error::Tls("Unable to configure server"))?;
359 tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
360
361 Ok(Arc::new(tls_config))
362 }
363
364 let client = self.clone();
365 let subject_common_name = subject_common_name.into();
366 let mut reconfigured_rx = self.state.reconfigured_rx.clone();
367 let initial_params = reconfigured_rx.borrow_and_update().clone();
368 let initial_tls_config =
369 rebuild_server_config(client.clone(), initial_params, subject_common_name.clone())
370 .await?;
371
372 let immediate_stream = futures_util::stream::iter([initial_tls_config]);
373
374 let rotation_stream =
375 futures_util::stream::unfold(reconfigured_rx, move |mut reconfigured_rx| {
376 let client = client.clone();
377 let subject_common_name = subject_common_name.clone();
378
379 async move {
380 reconfigured_rx.changed().await.ok()?;
382
383 loop {
384 let params = reconfigured_rx.borrow_and_update().clone();
385 let server_config_result = rebuild_server_config(
386 client.clone(),
387 params,
388 subject_common_name.clone(),
389 )
390 .await;
391
392 match server_config_result {
393 Ok(server_config) => return Some((server_config, reconfigured_rx)),
394 Err(err) => {
395 tracing::error!(
396 ?err,
397 "could not regenerate TLS server config, trying again soon"
398 );
399 tokio::time::sleep(Duration::from_secs(10)).await;
400 }
401 }
402 }
403 }
404 });
405
406 Ok(immediate_stream.chain(rotation_stream).boxed())
407 }
408
409 pub fn connection_params_stream(
416 &self,
417 ) -> futures_util::stream::BoxStream<'static, Arc<ConnectionParams>> {
418 use futures_util::StreamExt;
419
420 let mut reconfigured_rx = self.state.reconfigured_rx.clone();
421 let initial_params = reconfigured_rx.borrow_and_update().clone();
422
423 let immediate_stream = futures_util::stream::iter([initial_params]);
424
425 let rotation_stream =
426 futures_util::stream::unfold(reconfigured_rx, move |mut reconfigured_rx| {
427 async move {
428 let Ok(()) = reconfigured_rx.changed().await else {
430 return None;
432 };
433
434 let params = reconfigured_rx.borrow_and_update().clone();
435
436 Some((params, reconfigured_rx))
437 }
438 });
439
440 immediate_stream.chain(rotation_stream).boxed()
441 }
442
443 #[cfg(feature = "reqwest_012")]
446 pub fn request_client_builder_stream(
447 &self,
448 ) -> Result<futures_util::stream::BoxStream<'static, reqwest::ClientBuilder>, Error> {
449 use futures_util::StreamExt;
450
451 fn rebuild(params: Arc<ConnectionParams>) -> Result<reqwest::ClientBuilder, Error> {
452 Ok(reqwest::Client::builder()
453 .add_root_certificate(
454 reqwest::tls::Certificate::from_pem(¶ms.authly_local_ca)
455 .map_err(|_| Error::AuthlyCA("unable to parse"))?,
456 )
457 .identity(
458 reqwest::Identity::from_pem(params.identity.pem()?.as_ref())
459 .map_err(|_| Error::Identity("unable to parse"))?,
460 ))
461 }
462
463 Ok(self
464 .connection_params_stream()
465 .map(|params| rebuild(params).expect("could not make a reqwest Client"))
466 .boxed())
467 }
468}
469
470impl Client {
472 fn current_service(&self) -> AuthlyServiceClient<Channel> {
473 self.state.conn.load().authly_service.clone()
474 }
475}
476
477fn id_codec_error() -> Error {
478 Error::Codec(anyhow!("id decocing error"))
479}
480
481async fn get_configuration(
482 mut service: AuthlyServiceClient<Channel>,
483) -> Result<Configuration, Error> {
484 let response = service
485 .get_configuration(proto::Empty::default())
486 .await
487 .map_err(error::tonic)?
488 .into_inner();
489
490 Ok(Configuration {
491 hosts: response.hosts,
492 resource_property_mapping: access_control::get_resource_property_mapping(
493 response.property_mapping_namespaces,
494 )?,
495 })
496}