1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
use std::path::PathBuf;
use std::pin::Pin;
use std::str::FromStr;
use derivative::Derivative;
use derive_more::Display;
use derive_more::From;
use futures::prelude::*;
use thiserror::Error;
use crate::router::Event;
use crate::router::Event::NoMoreLicense;
use crate::uplink::license_enforcement::Audience;
use crate::uplink::license_enforcement::License;
use crate::uplink::license_stream::LicenseQuery;
use crate::uplink::license_stream::LicenseStreamExt;
use crate::uplink::stream_from_uplink;
use crate::uplink::UplinkConfig;
const APOLLO_ROUTER_LICENSE_INVALID: &str = "APOLLO_ROUTER_LICENSE_INVALID";
type LicenseStream = Pin<Box<dyn Stream<Item = License> + Send>>;
#[derive(Debug, Display, From, Error)]
enum Error {
/// The license is invalid.
InvalidLicense,
}
/// License controls availability of certain features of the Router.
/// This API experimental and is subject to change outside of semver.
#[derive(From, Display, Derivative)]
#[derivative(Debug)]
#[non_exhaustive]
pub enum LicenseSource {
/// A static license. EXPERIMENTAL and not subject to semver.
#[display(fmt = "Static")]
Static { license: License },
/// A license supplied via APOLLO_ROUTER_LICENSE. EXPERIMENTAL and not subject to semver.
#[display(fmt = "Env")]
Env,
/// A stream of license. EXPERIMENTAL and not subject to semver.
#[display(fmt = "Stream")]
Stream(#[derivative(Debug = "ignore")] LicenseStream),
/// A raw file that may be watched for changes. EXPERIMENTAL and not subject to semver.
#[display(fmt = "File")]
File {
/// The path of the license file.
path: PathBuf,
/// `true` to watch the file for changes and hot apply them.
watch: bool,
},
/// Apollo uplink.
#[display(fmt = "Registry")]
Registry(UplinkConfig),
}
impl Default for LicenseSource {
fn default() -> Self {
LicenseSource::Static {
license: Default::default(),
}
}
}
const VALID_AUDIENCES_USER_SUPLIED_LICENSES: [Audience; 2] = [Audience::Offline, Audience::Cloud];
impl LicenseSource {
/// Convert this license into a stream regardless of if is static or not. Allows for unified handling later.
pub(crate) fn into_stream(self) -> impl Stream<Item = Event> {
match self {
LicenseSource::Static { license } => stream::once(future::ready(license))
.validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
.boxed(),
LicenseSource::Stream(stream) => stream
.validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
.boxed(),
LicenseSource::File { path, watch } => {
// Sanity check, does the schema file exists, if it doesn't then bail.
if !path.exists() {
tracing::error!(
"License file at path '{}' does not exist.",
path.to_string_lossy()
);
stream::empty().boxed()
} else {
// The license file exists try and load it
match std::fs::read_to_string(&path).map(|e| e.parse()) {
Ok(Ok(license)) => {
if watch {
crate::files::watch(&path)
.filter_map(move |_| {
let path = path.clone();
async move {
let result = tokio::fs::read_to_string(&path).await;
if let Err(e) = &result {
tracing::error!(
"failed to read license file, {}",
e
);
}
result.ok()
}
})
.filter_map(|e| async move {
let result = e.parse();
if let Err(e) = &result {
tracing::error!(
code = APOLLO_ROUTER_LICENSE_INVALID,
"failed to parse license file, {}",
e
);
}
result.ok()
})
.validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
.boxed()
} else {
stream::once(future::ready(license))
.validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
.boxed()
}
}
Ok(Err(err)) => {
tracing::error!(
code = APOLLO_ROUTER_LICENSE_INVALID,
"Failed to parse license: {}",
err
);
stream::empty().boxed()
}
Err(err) => {
tracing::error!(
code = APOLLO_ROUTER_LICENSE_INVALID,
"Failed to read license: {}",
err
);
stream::empty().boxed()
}
}
}
}
LicenseSource::Registry(uplink_config) => {
stream_from_uplink::<LicenseQuery, License>(uplink_config)
.filter_map(|res| {
future::ready(match res {
Ok(license) => Some(license),
Err(e) => {
tracing::error!(code = APOLLO_ROUTER_LICENSE_INVALID, "{}", e);
None
}
})
})
.boxed()
}
LicenseSource::Env => {
// EXPERIMENTAL and not subject to semver.
match std::env::var("APOLLO_ROUTER_LICENSE").map(|e| License::from_str(&e)) {
Ok(Ok(license)) => stream::once(future::ready(license)).boxed(),
Ok(Err(err)) => {
tracing::error!("Failed to parse license: {}", err);
stream::empty().boxed()
}
Err(_) => stream::once(future::ready(License::default()))
.validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
.boxed(),
}
}
}
.expand_licenses()
.chain(stream::iter(vec![NoMoreLicense]))
}
}