use std::path::PathBuf;
use std::pin::Pin;
use std::str::FromStr;
use derivative::Derivative;
use derive_more::Display;
use derive_more::From;
use futures::prelude::*;
use crate::router::Event;
use crate::router::Event::NoMoreLicense;
use crate::uplink::UplinkConfig;
use crate::uplink::license_enforcement::Audience;
use crate::uplink::license_enforcement::License;
use crate::uplink::license_stream::LicenseQuery;
use crate::uplink::license_stream::LicenseStreamExt;
use crate::uplink::stream_from_uplink;
const APOLLO_ROUTER_LICENSE_INVALID: &str = "APOLLO_ROUTER_LICENSE_INVALID";
type LicenseStream = Pin<Box<dyn Stream<Item = License> + Send>>;
#[derive(From, Display, Derivative)]
#[derivative(Debug)]
#[non_exhaustive]
pub enum LicenseSource {
#[display(fmt = "Static")]
Static { license: License },
#[display(fmt = "Env")]
Env,
#[display(fmt = "Stream")]
Stream(#[derivative(Debug = "ignore")] LicenseStream),
#[display(fmt = "File")]
File {
path: PathBuf,
watch: bool,
},
#[display(fmt = "Registry")]
Registry(UplinkConfig),
}
impl Default for LicenseSource {
fn default() -> Self {
LicenseSource::Static {
license: Default::default(),
}
}
}
const VALID_AUDIENCES_USER_SUPLIED_LICENSES: [Audience; 2] = [Audience::Offline, Audience::Cloud];
impl LicenseSource {
pub(crate) fn into_stream(self) -> impl Stream<Item = Event> {
match self {
LicenseSource::Static { license } => stream::once(future::ready(license))
.validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
.boxed(),
LicenseSource::Stream(stream) => stream
.validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
.boxed(),
LicenseSource::File { path, watch } => {
if !path.exists() {
tracing::error!(
"License file at path '{}' does not exist.",
path.to_string_lossy()
);
stream::empty().boxed()
} else {
match std::fs::read_to_string(&path).map(|e| e.parse()) {
Ok(Ok(license)) => {
if watch {
crate::files::watch(&path)
.filter_map(move |_| {
let path = path.clone();
async move {
let result = tokio::fs::read_to_string(&path).await;
if let Err(e) = &result {
tracing::error!(
"failed to read license file, {}",
e
);
}
result.ok()
}
})
.filter_map(|e| async move {
let result = e.parse();
if let Err(e) = &result {
tracing::error!(
code = APOLLO_ROUTER_LICENSE_INVALID,
"failed to parse license file, {}",
e
);
}
result.ok()
})
.validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
.boxed()
} else {
stream::once(future::ready(license))
.validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
.boxed()
}
}
Ok(Err(err)) => {
tracing::error!(
code = APOLLO_ROUTER_LICENSE_INVALID,
"Failed to parse license: {}",
err
);
stream::empty().boxed()
}
Err(err) => {
tracing::error!(
code = APOLLO_ROUTER_LICENSE_INVALID,
"Failed to read license: {}",
err
);
stream::empty().boxed()
}
}
}
}
LicenseSource::Registry(uplink_config) => {
stream_from_uplink::<LicenseQuery, License>(uplink_config)
.filter_map(|res| {
future::ready(match res {
Ok(license) => Some(license),
Err(e) => {
tracing::error!(code = APOLLO_ROUTER_LICENSE_INVALID, "{}", e);
None
}
})
})
.boxed()
}
LicenseSource::Env => {
match std::env::var("APOLLO_ROUTER_LICENSE").map(|e| License::from_str(&e)) {
Ok(Ok(license)) => stream::once(future::ready(license)).boxed(),
Ok(Err(err)) => {
tracing::error!("Failed to parse license: {}", err);
stream::empty().boxed()
}
Err(_) => stream::once(future::ready(License::default()))
.validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
.boxed(),
}
}
}
.expand_licenses()
.chain(stream::iter(vec![NoMoreLicense]))
}
}