apollo_router/router/event/
license.rs

1use std::path::PathBuf;
2use std::pin::Pin;
3use std::str::FromStr;
4
5use derivative::Derivative;
6use derive_more::Display;
7use derive_more::From;
8use futures::prelude::*;
9
10use crate::router::Event;
11use crate::router::Event::NoMoreLicense;
12use crate::uplink::UplinkConfig;
13use crate::uplink::license_enforcement::Audience;
14use crate::uplink::license_enforcement::License;
15use crate::uplink::license_stream::LicenseQuery;
16use crate::uplink::license_stream::LicenseStreamExt;
17use crate::uplink::stream_from_uplink;
18
19const APOLLO_ROUTER_LICENSE_INVALID: &str = "APOLLO_ROUTER_LICENSE_INVALID";
20
21type LicenseStream = Pin<Box<dyn Stream<Item = License> + Send>>;
22
23/// License controls availability of certain features of the Router.
24/// This API experimental and is subject to change outside of semver.
25#[derive(From, Display, Derivative)]
26#[derivative(Debug)]
27#[non_exhaustive]
28pub enum LicenseSource {
29    /// A static license. EXPERIMENTAL and not subject to semver.
30    #[display("Static")]
31    Static { license: License },
32
33    /// A license supplied via APOLLO_ROUTER_LICENSE. EXPERIMENTAL and not subject to semver.
34    #[display("Env")]
35    Env,
36
37    /// A stream of license. EXPERIMENTAL and not subject to semver.
38    #[display("Stream")]
39    Stream(#[derivative(Debug = "ignore")] LicenseStream),
40
41    /// A raw file that may be watched for changes. EXPERIMENTAL and not subject to semver.
42    #[display("File")]
43    File {
44        /// The path of the license file.
45        path: PathBuf,
46
47        /// `true` to watch the file for changes and hot apply them.
48        watch: bool,
49    },
50
51    /// Apollo uplink.
52    #[display("Registry")]
53    Registry(UplinkConfig),
54}
55
56impl Default for LicenseSource {
57    fn default() -> Self {
58        LicenseSource::Static {
59            license: Default::default(),
60        }
61    }
62}
63
64const VALID_AUDIENCES_USER_SUPLIED_LICENSES: [Audience; 2] = [Audience::Offline, Audience::Cloud];
65
66impl LicenseSource {
67    /// Convert this license into a stream regardless of if is static or not. Allows for unified handling later.
68    pub(crate) fn into_stream(self) -> impl Stream<Item = Event> {
69        match self {
70            LicenseSource::Static { license } => stream::once(future::ready(license))
71                .validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
72                .boxed(),
73            LicenseSource::Stream(stream) => stream
74                .validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
75                .boxed(),
76            LicenseSource::File { path, watch } => {
77                // Sanity check, does the schema file exists, if it doesn't then bail.
78                if !path.exists() {
79                    tracing::error!(
80                        "License file at path '{}' does not exist.",
81                        path.to_string_lossy()
82                    );
83                    stream::empty().boxed()
84                } else {
85                    // The license file exists try and load it
86                    match std::fs::read_to_string(&path).map(|e| e.parse()) {
87                        Ok(Ok(license)) => {
88                            if watch {
89                                crate::files::watch(&path)
90                                    .filter_map(move |_| {
91                                        let path = path.clone();
92                                        async move {
93                                            let result = tokio::fs::read_to_string(&path).await;
94                                            if let Err(e) = &result {
95                                                tracing::error!(
96                                                    "failed to read license file, {}",
97                                                    e
98                                                );
99                                            }
100                                            result.ok()
101                                        }
102                                    })
103                                    .filter_map(|e| async move {
104                                        let result = e.parse();
105                                        if let Err(e) = &result {
106                                            tracing::error!(
107                                                code = APOLLO_ROUTER_LICENSE_INVALID,
108                                                "failed to parse license file, {}",
109                                                e
110                                            );
111                                        }
112                                        result.ok()
113                                    })
114                                    .validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
115                                    .boxed()
116                            } else {
117                                stream::once(future::ready(license))
118                                    .validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
119                                    .boxed()
120                            }
121                        }
122                        Ok(Err(err)) => {
123                            tracing::error!(
124                                code = APOLLO_ROUTER_LICENSE_INVALID,
125                                "Failed to parse license: {}",
126                                err
127                            );
128                            stream::empty().boxed()
129                        }
130                        Err(err) => {
131                            tracing::error!(
132                                code = APOLLO_ROUTER_LICENSE_INVALID,
133                                "Failed to read license: {}",
134                                err
135                            );
136                            stream::empty().boxed()
137                        }
138                    }
139                }
140            }
141
142            LicenseSource::Registry(uplink_config) => {
143                stream_from_uplink::<LicenseQuery, License>(uplink_config)
144                    .filter_map(|res| {
145                        future::ready(match res {
146                            Ok(license) => Some(license),
147                            Err(e) => {
148                                tracing::error!(code = APOLLO_ROUTER_LICENSE_INVALID, "{}", e);
149                                None
150                            }
151                        })
152                    })
153                    .boxed()
154            }
155            LicenseSource::Env => {
156                // EXPERIMENTAL and not subject to semver.
157                match std::env::var("APOLLO_ROUTER_LICENSE").map(|e| License::from_str(&e)) {
158                    Ok(Ok(license)) => stream::once(future::ready(license)).boxed(),
159                    Ok(Err(err)) => {
160                        tracing::error!("Failed to parse license: {}", err);
161                        stream::empty().boxed()
162                    }
163                    Err(_) => stream::once(future::ready(License::default()))
164                        .validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
165                        .boxed(),
166                }
167            }
168        }
169        .expand_licenses()
170        .chain(stream::iter(vec![NoMoreLicense]))
171    }
172}