apollo_router/router/event/
license.rs1use std::path::PathBuf;
2use std::pin::Pin;
3use std::str::FromStr;
4
5use derivative::Derivative;
6use derive_more::Display;
7use derive_more::From;
8use futures::prelude::*;
9
10use crate::router::Event;
11use crate::router::Event::NoMoreLicense;
12use crate::uplink::UplinkConfig;
13use crate::uplink::license_enforcement::Audience;
14use crate::uplink::license_enforcement::License;
15use crate::uplink::license_stream::LicenseQuery;
16use crate::uplink::license_stream::LicenseStreamExt;
17use crate::uplink::stream_from_uplink;
18
19const APOLLO_ROUTER_LICENSE_INVALID: &str = "APOLLO_ROUTER_LICENSE_INVALID";
20
21type LicenseStream = Pin<Box<dyn Stream<Item = License> + Send>>;
22
23#[derive(From, Display, Derivative)]
26#[derivative(Debug)]
27#[non_exhaustive]
28pub enum LicenseSource {
29 #[display("Static")]
31 Static { license: License },
32
33 #[display("Env")]
35 Env,
36
37 #[display("Stream")]
39 Stream(#[derivative(Debug = "ignore")] LicenseStream),
40
41 #[display("File")]
43 File {
44 path: PathBuf,
46
47 watch: bool,
49 },
50
51 #[display("Registry")]
53 Registry(UplinkConfig),
54}
55
56impl Default for LicenseSource {
57 fn default() -> Self {
58 LicenseSource::Static {
59 license: Default::default(),
60 }
61 }
62}
63
64const VALID_AUDIENCES_USER_SUPLIED_LICENSES: [Audience; 2] = [Audience::Offline, Audience::Cloud];
65
66impl LicenseSource {
67 pub(crate) fn into_stream(self) -> impl Stream<Item = Event> {
69 match self {
70 LicenseSource::Static { license } => stream::once(future::ready(license))
71 .validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
72 .boxed(),
73 LicenseSource::Stream(stream) => stream
74 .validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
75 .boxed(),
76 LicenseSource::File { path, watch } => {
77 if !path.exists() {
79 tracing::error!(
80 "License file at path '{}' does not exist.",
81 path.to_string_lossy()
82 );
83 stream::empty().boxed()
84 } else {
85 match std::fs::read_to_string(&path).map(|e| e.parse()) {
87 Ok(Ok(license)) => {
88 if watch {
89 crate::files::watch(&path)
90 .filter_map(move |_| {
91 let path = path.clone();
92 async move {
93 let result = tokio::fs::read_to_string(&path).await;
94 if let Err(e) = &result {
95 tracing::error!(
96 "failed to read license file, {}",
97 e
98 );
99 }
100 result.ok()
101 }
102 })
103 .filter_map(|e| async move {
104 let result = e.parse();
105 if let Err(e) = &result {
106 tracing::error!(
107 code = APOLLO_ROUTER_LICENSE_INVALID,
108 "failed to parse license file, {}",
109 e
110 );
111 }
112 result.ok()
113 })
114 .validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
115 .boxed()
116 } else {
117 stream::once(future::ready(license))
118 .validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
119 .boxed()
120 }
121 }
122 Ok(Err(err)) => {
123 tracing::error!(
124 code = APOLLO_ROUTER_LICENSE_INVALID,
125 "Failed to parse license: {}",
126 err
127 );
128 stream::empty().boxed()
129 }
130 Err(err) => {
131 tracing::error!(
132 code = APOLLO_ROUTER_LICENSE_INVALID,
133 "Failed to read license: {}",
134 err
135 );
136 stream::empty().boxed()
137 }
138 }
139 }
140 }
141
142 LicenseSource::Registry(uplink_config) => {
143 stream_from_uplink::<LicenseQuery, License>(uplink_config)
144 .filter_map(|res| {
145 future::ready(match res {
146 Ok(license) => Some(license),
147 Err(e) => {
148 tracing::error!(code = APOLLO_ROUTER_LICENSE_INVALID, "{}", e);
149 None
150 }
151 })
152 })
153 .boxed()
154 }
155 LicenseSource::Env => {
156 match std::env::var("APOLLO_ROUTER_LICENSE").map(|e| License::from_str(&e)) {
158 Ok(Ok(license)) => stream::once(future::ready(license)).boxed(),
159 Ok(Err(err)) => {
160 tracing::error!("Failed to parse license: {}", err);
161 stream::empty().boxed()
162 }
163 Err(_) => stream::once(future::ready(License::default()))
164 .validate_audience(VALID_AUDIENCES_USER_SUPLIED_LICENSES)
165 .boxed(),
166 }
167 }
168 }
169 .expand_licenses()
170 .chain(stream::iter(vec![NoMoreLicense]))
171 }
172}