authly_client/
lib.rs

1//! `authly-client` is an asynchronous Rust client handle for services interfacing with the authly service.
2//!
3//! At present, it only works with the `tokio` runtime.
4
5#![forbid(unsafe_code)]
6#![warn(missing_docs)]
7
8pub use access_control::AccessControl;
9pub use authly_common::service::NamespacePropertyMapping;
10pub use builder::ClientBuilder;
11use builder::ConnectionParamsBuilder;
12use connection::{Connection, ConnectionParams, ReconfigureStrategy};
13pub use error::Error;
14use futures_util::{stream::BoxStream, StreamExt};
15use metadata::{NamespaceMetadata, ServiceMetadata};
16use rcgen::{CertificateParams, DnType, ExtendedKeyUsagePurpose, KeyPair, KeyUsagePurpose};
17use rustls_pki_types::{CertificateDer, PrivateKeyDer};
18pub use token::AccessToken;
19
20use arc_swap::ArcSwap;
21use tracing::info;
22
23use std::{borrow::Cow, sync::Arc, time::Duration};
24
25use anyhow::anyhow;
26use authly_common::{
27    access_token::AuthlyAccessTokenClaims,
28    id::{Id128DynamicArrayConv, ServiceId},
29    proto::{
30        proto_struct_to_json,
31        service::{self as proto, authly_service_client::AuthlyServiceClient},
32    },
33};
34use http::header::COOKIE;
35use tonic::{transport::Channel, Request};
36
37pub mod access_control;
38pub mod connection;
39pub mod identity;
40pub mod metadata;
41pub mod token;
42
43mod background_worker;
44mod builder;
45mod error;
46
47/// File path for the root CA certificate.
48#[expect(unused)]
49const ROOT_CA_CERT_PATH: &str = "/etc/authly/certs/root.crt";
50
51/// File path for the local CA certificate.
52const LOCAL_CA_CERT_PATH: &str = "/etc/authly/certs/local.crt";
53
54/// File path for the local CA certificate.
55const IDENTITY_PATH: &str = "/etc/authly/identity/identity.pem";
56
57/// File path for detecting a valid kubernetes environment.
58const K8S_SA_TOKENFILE_PATH: &str = "/var/run/secrets/kubernetes.io/serviceaccount/token";
59
60/// The authly client handle.
61#[derive(Clone)]
62pub struct Client {
63    state: Arc<ClientState>,
64}
65
66/// Shared data for cloned clients
67struct ClientState {
68    /// The current connection
69    conn: ArcSwap<Connection>,
70
71    /// How to reconfigure the connection
72    reconfigure: ReconfigureStrategy,
73
74    /// Triggered when the client connection parameters get reconfigured
75    #[allow(unused)]
76    reconfigured_rx: tokio::sync::watch::Receiver<Arc<ConnectionParams>>,
77
78    /// Triggered when the cache is cleared => service metadata invalidated
79    metadata_invalidated_rx: tokio::sync::watch::Receiver<()>,
80
81    /// signal sent when the state is dropped
82    closed_tx: tokio::sync::watch::Sender<()>,
83
84    /// current configuration
85    configuration: ArcSwap<Configuration>,
86}
87
88struct Configuration {
89    /// service hosts
90    hosts: Vec<String>,
91
92    /// The resource property mapping for this service.
93    /// It's kept in an ArcSwap to potentially support live-update of this structure.
94    /// For that to work, the client should keep a subscription option and listen
95    /// for change events and re-download the property mapping.
96    resource_property_mapping: Arc<NamespacePropertyMapping>,
97}
98
99impl Drop for ClientState {
100    fn drop(&mut self) {
101        let _ = self.closed_tx.send(());
102    }
103}
104
105impl Client {
106    /// Construct a new builder.
107    pub fn builder() -> ClientBuilder {
108        let url = std::env::var("AUTHLY_URL")
109            .map(Cow::Owned)
110            .unwrap_or(Cow::Borrowed("https://authly"));
111
112        ClientBuilder {
113            inner: ConnectionParamsBuilder::new(url),
114        }
115    }
116
117    /// Retrieve the [ServiceMetadata] about service this client identifies as.
118    pub async fn metadata(&self) -> Result<ServiceMetadata, Error> {
119        let proto = self
120            .current_service()
121            .get_metadata(proto::Empty::default())
122            .await
123            .map_err(error::tonic)?
124            .into_inner();
125
126        Ok(ServiceMetadata {
127            entity_id: ServiceId::try_from_bytes_dynamic(&proto.entity_id)
128                .ok_or_else(id_codec_error)?,
129            label: proto.label,
130            namespaces: proto
131                .namespaces
132                .into_iter()
133                .map(|proto| NamespaceMetadata {
134                    label: proto.label,
135                    metadata: proto.metadata.map(proto_struct_to_json),
136                })
137                .collect(),
138        })
139    }
140
141    /// Get a stream of [ServiceMetadata] changes.
142    ///
143    /// The first metadata in the stream resolves immediately, and is the current metadata.
144    pub async fn metadata_stream(&self) -> Result<BoxStream<'static, ServiceMetadata>, Error> {
145        struct StreamState {
146            initial: Option<ServiceMetadata>,
147            client: Client,
148            watch: tokio::sync::watch::Receiver<()>,
149        }
150
151        let mut state = StreamState {
152            initial: Some(self.metadata().await?),
153            client: self.clone(),
154            watch: self.state.metadata_invalidated_rx.clone(),
155        };
156        state.watch.mark_unchanged();
157
158        Ok(futures_util::stream::unfold(state, |mut state| async move {
159            match state.initial {
160                Some(initial) => Some((
161                    initial,
162                    StreamState {
163                        initial: None,
164                        ..state
165                    },
166                )),
167                None => {
168                    state.watch.changed().await.ok()?;
169
170                    let next = loop {
171                        match state.client.metadata().await {
172                            Ok(metadata) => break metadata,
173                            Err(err) => {
174                                info!(?err, "unable to re-fetch metadata, retrying soon");
175                                tokio::time::sleep(Duration::from_secs(10)).await;
176                            }
177                        }
178                    };
179
180                    Some((next, state))
181                }
182            }
183        })
184        .boxed())
185    }
186
187    /// Get the current resource properties of this service, in the form of a [NamespacePropertyMapping].
188    pub fn get_resource_property_mapping(&self) -> Arc<NamespacePropertyMapping> {
189        self.state
190            .configuration
191            .load()
192            .resource_property_mapping
193            .clone()
194    }
195
196    /// Decode and validate an Authly [AccessToken].
197    /// The access token usually represents an entity which is a user of the system.
198    pub fn decode_access_token(
199        &self,
200        access_token: impl Into<String>,
201    ) -> Result<Arc<AccessToken>, Error> {
202        let access_token = access_token.into();
203        let validation = jsonwebtoken::Validation::new(jsonwebtoken::Algorithm::ES256);
204        let token_data = jsonwebtoken::decode::<AuthlyAccessTokenClaims>(
205            &access_token,
206            &self.state.conn.load().params.jwt_decoding_key,
207            &validation,
208        )
209        .map_err(|err| Error::InvalidAccessToken(err.into()))?;
210
211        Ok(Arc::new(AccessToken {
212            token: access_token,
213            claims: token_data.claims,
214        }))
215    }
216
217    /// Exchange a session token for an access token suitable for evaluating access control.
218    pub async fn get_access_token(&self, session_token: &str) -> Result<Arc<AccessToken>, Error> {
219        let mut request = Request::new(proto::Empty::default());
220
221        // TODO: This should use Authorization instead of Cookie?
222        request.metadata_mut().append(
223            COOKIE.as_str(),
224            format!("session-cookie={session_token}")
225                .parse()
226                .map_err(error::unclassified)?,
227        );
228
229        let proto = self
230            .current_service()
231            .get_access_token(request)
232            .await
233            .map_err(error::tonic)?
234            .into_inner();
235
236        self.decode_access_token(proto.token)
237    }
238
239    /// Convert a clone of self into a dynamically dispatched access control object.
240    ///
241    /// This can be useful in tests where access control needs to be mocked out.
242    pub fn into_dyn_access_control(self) -> Arc<dyn AccessControl + Send + Sync + 'static> {
243        Arc::new(self)
244    }
245
246    /// Generate a server certificate and a key pair for the service.
247    ///
248    /// This involves sending a Certificate Signing Request for Authly to resolve.
249    ///
250    /// Returns a pair of Certificate signed by the Authly Local CA, and the matching private key to be used by the server.
251    ///
252    /// The common name can be any chosen text identifying the service.
253    ///
254    pub async fn generate_server_tls_params(
255        &self,
256        subject_common_name: &str,
257    ) -> Result<(CertificateDer<'static>, PrivateKeyDer<'static>), Error> {
258        let hosts = self.state.configuration.load().hosts.clone();
259        let params = {
260            let mut params = CertificateParams::new(hosts).map_err(|_| Error::InvalidAltNames)?;
261            params
262                .distinguished_name
263                .push(DnType::CommonName, subject_common_name);
264            params.distinguished_name.push(
265                DnType::CustomDnType(
266                    authly_common::certificate::oid::ENTITY_UNIQUE_IDENTIFIER.to_vec(),
267                ),
268                self.state.conn.load().params.entity_id.to_string(),
269            );
270            params.use_authority_key_identifier_extension = false;
271            params.key_usages.push(KeyUsagePurpose::DigitalSignature);
272            params
273                .extended_key_usages
274                .push(ExtendedKeyUsagePurpose::ServerAuth);
275
276            let now = time::OffsetDateTime::now_utc();
277            params.not_before = now;
278
279            // A default timeout that is one year.
280            // FIXME(rotation) What happens to the server after the certificate expires?
281            // No other services would then be able to connect to it, but it wouldn't itself understand that it's broken.
282            params.not_after = now.checked_add(time::Duration::days(365)).unwrap();
283            params
284        };
285
286        // The key pair to use for the server, and signing the Certificate Signing Request.
287        // The private key is not sent to Authly.
288        let key_pair = KeyPair::generate().map_err(|_err| Error::PrivateKeyGen)?;
289        let csr_der = params
290            .serialize_request(&key_pair)
291            .expect("the parameters should be correct")
292            .der()
293            .to_vec();
294
295        let proto = self
296            .state
297            .conn
298            .load()
299            .authly_service
300            .clone()
301            .sign_certificate(Request::new(proto::CertificateSigningRequest {
302                der: csr_der,
303            }))
304            .await
305            .map_err(error::tonic)?;
306
307        let certificate = CertificateDer::from(proto.into_inner().der);
308        let private_key = PrivateKeyDer::try_from(key_pair.serialize_der()).map_err(|err| {
309            Error::Unclassified(anyhow!("could not serialize private key: {err}"))
310        })?;
311
312        Ok((certificate, private_key))
313    }
314
315    /// Return a stream of [rustls::ServerConfig] values for configuring authly-verified servers.
316    /// The first stream item will resolve immediately.
317    ///
318    /// The config comes with `h2` and `http/1.1` ALPN protocols.
319    /// This may become configurable in the future.
320    ///
321    /// For now, this only renews the server certificate when absolutely required.
322    /// In the future, this may rotate server certificates automatically on a fixed (configurable) interval.
323    #[cfg(feature = "rustls_023")]
324    pub async fn rustls_server_configurer(
325        &self,
326        subject_common_name: impl Into<Cow<'static, str>>,
327    ) -> Result<futures_util::stream::BoxStream<'static, Arc<rustls::ServerConfig>>, Error> {
328        use std::time::Duration;
329
330        use futures_util::StreamExt;
331        use rustls::{server::WebPkiClientVerifier, RootCertStore};
332        use rustls_pki_types::pem::PemObject;
333
334        async fn rebuild_server_config(
335            client: Client,
336            params: Arc<ConnectionParams>,
337            subject_common_name: Cow<'static, str>,
338        ) -> Result<Arc<rustls::ServerConfig>, Error> {
339            let mut root_cert_store = RootCertStore::empty();
340            root_cert_store
341                .add(
342                    CertificateDer::from_pem_slice(&params.authly_local_ca)
343                        .map_err(|_err| Error::AuthlyCA("unable to parse"))?,
344                )
345                .map_err(|_err| Error::AuthlyCA("unable to include in root cert store"))?;
346
347            let (cert, key) = client
348                .generate_server_tls_params(&subject_common_name)
349                .await?;
350
351            let mut tls_config = rustls::server::ServerConfig::builder()
352                .with_client_cert_verifier(
353                    WebPkiClientVerifier::builder(root_cert_store.into())
354                        .build()
355                        .map_err(|_| Error::AuthlyCA("cannot build a WebPki client verifier"))?,
356                )
357                .with_single_cert(vec![cert], key)
358                .map_err(|_| Error::Tls("Unable to configure server"))?;
359            tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
360
361            Ok(Arc::new(tls_config))
362        }
363
364        let client = self.clone();
365        let subject_common_name = subject_common_name.into();
366        let mut reconfigured_rx = self.state.reconfigured_rx.clone();
367        let initial_params = reconfigured_rx.borrow_and_update().clone();
368        let initial_tls_config =
369            rebuild_server_config(client.clone(), initial_params, subject_common_name.clone())
370                .await?;
371
372        let immediate_stream = futures_util::stream::iter([initial_tls_config]);
373
374        let rotation_stream =
375            futures_util::stream::unfold(reconfigured_rx, move |mut reconfigured_rx| {
376                let client = client.clone();
377                let subject_common_name = subject_common_name.clone();
378
379                async move {
380                    // wait for configuration change
381                    reconfigured_rx.changed().await.ok()?;
382
383                    loop {
384                        let params = reconfigured_rx.borrow_and_update().clone();
385                        let server_config_result = rebuild_server_config(
386                            client.clone(),
387                            params,
388                            subject_common_name.clone(),
389                        )
390                        .await;
391
392                        match server_config_result {
393                            Ok(server_config) => return Some((server_config, reconfigured_rx)),
394                            Err(err) => {
395                                tracing::error!(
396                                    ?err,
397                                    "could not regenerate TLS server config, trying again soon"
398                                );
399                                tokio::time::sleep(Duration::from_secs(10)).await;
400                            }
401                        }
402                    }
403                }
404            });
405
406        Ok(immediate_stream.chain(rotation_stream).boxed())
407    }
408
409    /// Generates a stream of [ConnectionParams] that this client uses to connect to Authly.
410    ///
411    /// The TLS-related parts of those parameters can be used by the client when
412    /// communicating with other services in the Authly service mesh.
413    ///
414    /// The first stream item will resolve immediately.
415    pub fn connection_params_stream(
416        &self,
417    ) -> futures_util::stream::BoxStream<'static, Arc<ConnectionParams>> {
418        use futures_util::StreamExt;
419
420        let mut reconfigured_rx = self.state.reconfigured_rx.clone();
421        let initial_params = reconfigured_rx.borrow_and_update().clone();
422
423        let immediate_stream = futures_util::stream::iter([initial_params]);
424
425        let rotation_stream =
426            futures_util::stream::unfold(reconfigured_rx, move |mut reconfigured_rx| {
427                async move {
428                    // wait for configuration change
429                    let Ok(()) = reconfigured_rx.changed().await else {
430                        // client dropped
431                        return None;
432                    };
433
434                    let params = reconfigured_rx.borrow_and_update().clone();
435
436                    Some((params, reconfigured_rx))
437                }
438            });
439
440        immediate_stream.chain(rotation_stream).boxed()
441    }
442
443    /// Generates a stream of [reqwest::ClientBuilder] preconfigured with Authly TLS paramaters.
444    /// The first stream item will resolve immediately.
445    #[cfg(feature = "reqwest_012")]
446    pub fn request_client_builder_stream(
447        &self,
448    ) -> Result<futures_util::stream::BoxStream<'static, reqwest::ClientBuilder>, Error> {
449        use futures_util::StreamExt;
450
451        fn rebuild(params: Arc<ConnectionParams>) -> Result<reqwest::ClientBuilder, Error> {
452            Ok(reqwest::Client::builder()
453                .add_root_certificate(
454                    reqwest::tls::Certificate::from_pem(&params.authly_local_ca)
455                        .map_err(|_| Error::AuthlyCA("unable to parse"))?,
456                )
457                .identity(
458                    reqwest::Identity::from_pem(params.identity.pem()?.as_ref())
459                        .map_err(|_| Error::Identity("unable to parse"))?,
460                ))
461        }
462
463        Ok(self
464            .connection_params_stream()
465            .map(|params| rebuild(params).expect("could not make a reqwest Client"))
466            .boxed())
467    }
468}
469
470/// Private methods
471impl Client {
472    fn current_service(&self) -> AuthlyServiceClient<Channel> {
473        self.state.conn.load().authly_service.clone()
474    }
475}
476
477fn id_codec_error() -> Error {
478    Error::Codec(anyhow!("id decocing error"))
479}
480
481async fn get_configuration(
482    mut service: AuthlyServiceClient<Channel>,
483) -> Result<Configuration, Error> {
484    let response = service
485        .get_configuration(proto::Empty::default())
486        .await
487        .map_err(error::tonic)?
488        .into_inner();
489
490    Ok(Configuration {
491        hosts: response.hosts,
492        resource_property_mapping: access_control::get_resource_property_mapping(
493            response.property_mapping_namespaces,
494        )?,
495    })
496}