hive_router_plan_executor/executors/
map.rs1use std::{
2 collections::{BTreeMap, HashMap},
3 sync::Arc,
4 time::Duration,
5};
6
7use bytes::{BufMut, Bytes, BytesMut};
8use dashmap::DashMap;
9use hive_router_config::{override_subgraph_urls::UrlOrExpression, HiveRouterConfig};
10use http::Uri;
11use hyper_tls::HttpsConnector;
12use hyper_util::{
13 client::legacy::Client,
14 rt::{TokioExecutor, TokioTimer},
15};
16use tokio::sync::{OnceCell, Semaphore};
17use tracing::error;
18use vrl::{
19 compiler::compile as vrl_compile,
20 compiler::Program as VrlProgram,
21 compiler::TargetValue as VrlTargetValue,
22 core::Value as VrlValue,
23 prelude::Function as VrlFunction,
24 prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone},
25 stdlib::all as vrl_build_functions,
26 value::Secrets as VrlSecrets,
27};
28
29use crate::{
30 execution::plan::ClientRequestDetails,
31 executors::{
32 common::{
33 HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor, SubgraphExecutorBoxedArc,
34 },
35 dedupe::{ABuildHasher, SharedResponse},
36 error::SubgraphExecutorError,
37 http::{HTTPSubgraphExecutor, HttpClient},
38 },
39 response::graphql_error::GraphQLError,
40};
41
42type SubgraphName = String;
43type SubgraphEndpoint = String;
44type ExecutorsBySubgraphMap =
45 DashMap<SubgraphName, DashMap<SubgraphEndpoint, SubgraphExecutorBoxedArc>>;
46type EndpointsBySubgraphMap = DashMap<SubgraphName, SubgraphEndpoint>;
47type ExpressionsBySubgraphMap = HashMap<SubgraphName, VrlProgram>;
48
49pub struct SubgraphExecutorMap {
50 executors_by_subgraph: ExecutorsBySubgraphMap,
51 static_endpoints_by_subgraph: EndpointsBySubgraphMap,
54 expressions_by_subgraph: ExpressionsBySubgraphMap,
56 config: Arc<HiveRouterConfig>,
57 vrl_functions: Vec<Box<dyn VrlFunction>>,
59 client: Arc<HttpClient>,
60 semaphores_by_origin: DashMap<String, Arc<Semaphore>>,
61 max_connections_per_host: usize,
62 in_flight_requests: Arc<DashMap<u64, Arc<OnceCell<SharedResponse>>, ABuildHasher>>,
63}
64
65impl SubgraphExecutorMap {
66 pub fn new(config: Arc<HiveRouterConfig>) -> Self {
67 let https = HttpsConnector::new();
68 let client: HttpClient = Client::builder(TokioExecutor::new())
69 .pool_timer(TokioTimer::new())
70 .pool_idle_timeout(Duration::from_secs(
71 config.traffic_shaping.pool_idle_timeout_seconds,
72 ))
73 .pool_max_idle_per_host(config.traffic_shaping.max_connections_per_host)
74 .build(https);
75
76 let max_connections_per_host = config.traffic_shaping.max_connections_per_host;
77
78 SubgraphExecutorMap {
79 executors_by_subgraph: Default::default(),
80 static_endpoints_by_subgraph: Default::default(),
81 expressions_by_subgraph: Default::default(),
82 config,
83 vrl_functions: vrl_build_functions(),
84 client: Arc::new(client),
85 semaphores_by_origin: Default::default(),
86 max_connections_per_host,
87 in_flight_requests: Arc::new(DashMap::with_hasher(ABuildHasher::default())),
88 }
89 }
90
91 pub fn from_http_endpoint_map(
92 subgraph_endpoint_map: HashMap<SubgraphName, SubgraphEndpoint>,
93 config: Arc<HiveRouterConfig>,
94 ) -> Result<Self, SubgraphExecutorError> {
95 let mut subgraph_executor_map = SubgraphExecutorMap::new(config.clone());
96
97 for (subgraph_name, original_endpoint_str) in subgraph_endpoint_map.into_iter() {
98 let endpoint_str = config
99 .override_subgraph_urls
100 .get_subgraph_url(&subgraph_name);
101
102 let endpoint_str = match endpoint_str {
103 Some(UrlOrExpression::Url(url)) => url,
104 Some(UrlOrExpression::Expression { expression }) => {
105 subgraph_executor_map.register_expression(&subgraph_name, expression)?;
106 &original_endpoint_str
107 }
108 None => &original_endpoint_str,
109 };
110
111 subgraph_executor_map.register_executor(&subgraph_name, endpoint_str)?;
112 subgraph_executor_map.register_static_endpoint(&subgraph_name, endpoint_str);
113 }
114
115 Ok(subgraph_executor_map)
116 }
117
118 pub async fn execute<'a>(
119 &self,
120 subgraph_name: &str,
121 execution_request: HttpExecutionRequest<'a>,
122 client_request: &ClientRequestDetails<'a>,
123 ) -> HttpExecutionResponse {
124 match self.get_or_create_executor(subgraph_name, client_request) {
125 Ok(Some(executor)) => executor.execute(execution_request).await,
126 Err(err) => {
127 error!(
128 "Subgraph executor error for subgraph '{}': {}",
129 subgraph_name, err,
130 );
131 self.internal_server_error_response(err.into(), subgraph_name)
132 }
133 Ok(None) => {
134 error!(
135 "Subgraph executor not found for subgraph '{}'",
136 subgraph_name
137 );
138 self.internal_server_error_response("Internal server error".into(), subgraph_name)
139 }
140 }
141 }
142
143 fn internal_server_error_response(
144 &self,
145 graphql_error: GraphQLError,
146 subgraph_name: &str,
147 ) -> HttpExecutionResponse {
148 let errors = vec![graphql_error.add_subgraph_name(subgraph_name)];
149 let errors_bytes = sonic_rs::to_vec(&errors).unwrap();
150 let mut buffer = BytesMut::new();
151 buffer.put_slice(b"{\"errors\":");
152 buffer.put_slice(&errors_bytes);
153 buffer.put_slice(b"}");
154
155 HttpExecutionResponse {
156 body: buffer.freeze(),
157 headers: Default::default(),
158 }
159 }
160
161 fn get_or_create_executor(
165 &self,
166 subgraph_name: &str,
167 client_request: &ClientRequestDetails<'_>,
168 ) -> Result<Option<SubgraphExecutorBoxedArc>, SubgraphExecutorError> {
169 let from_expression =
170 self.get_or_create_executor_from_expression(subgraph_name, client_request)?;
171
172 if from_expression.is_some() {
173 return Ok(from_expression);
174 }
175
176 Ok(self.get_executor_from_static_endpoint(subgraph_name))
177 }
178
179 fn get_or_create_executor_from_expression(
184 &self,
185 subgraph_name: &str,
186 client_request: &ClientRequestDetails<'_>,
187 ) -> Result<Option<SubgraphExecutorBoxedArc>, SubgraphExecutorError> {
188 if let Some(expression) = self.expressions_by_subgraph.get(subgraph_name) {
189 let original_url_value = VrlValue::Bytes(Bytes::from(
190 self.static_endpoints_by_subgraph
191 .get(subgraph_name)
192 .map(|endpoint| endpoint.value().clone())
193 .ok_or_else(|| {
194 SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string())
195 })?,
196 ));
197 let mut target = VrlTargetValue {
198 value: VrlValue::Object(BTreeMap::from([
199 ("request".into(), client_request.into()),
200 ("original_url".into(), original_url_value),
201 ])),
202 metadata: VrlValue::Object(BTreeMap::new()),
203 secrets: VrlSecrets::default(),
204 };
205
206 let mut state = VrlState::default();
207 let timezone = VrlTimeZone::default();
208 let mut ctx = VrlContext::new(&mut target, &mut state, &timezone);
209
210 let endpoint_result = expression.resolve(&mut ctx).map_err(|err| {
212 SubgraphExecutorError::new_endpoint_expression_resolution_failure(
213 subgraph_name.to_string(),
214 err,
215 )
216 })?;
217 let endpoint_str = match endpoint_result.as_str() {
218 Some(s) => s.to_string(),
219 None => {
220 return Err(SubgraphExecutorError::EndpointExpressionWrongType(
221 subgraph_name.to_string(),
222 ));
223 }
224 };
225
226 let existing_executor = self
228 .executors_by_subgraph
229 .get(subgraph_name)
230 .and_then(|endpoints| endpoints.get(&endpoint_str).map(|e| e.clone()));
231
232 if let Some(executor) = existing_executor {
233 return Ok(Some(executor));
234 }
235
236 self.register_executor(subgraph_name, &endpoint_str)?;
238
239 let endpoints = self
240 .executors_by_subgraph
241 .get(subgraph_name)
242 .expect("Executor was just registered, should be present");
243 return Ok(endpoints.get(&endpoint_str).map(|e| e.clone()));
244 }
245
246 Ok(None)
247 }
248
249 fn get_executor_from_static_endpoint(
251 &self,
252 subgraph_name: &str,
253 ) -> Option<SubgraphExecutorBoxedArc> {
254 self.static_endpoints_by_subgraph
255 .get(subgraph_name)
256 .and_then(|endpoint_ref| {
257 let endpoint_str = endpoint_ref.value();
258 self.executors_by_subgraph
259 .get(subgraph_name)
260 .and_then(|endpoints| endpoints.get(endpoint_str).map(|e| e.clone()))
261 })
262 }
263
264 fn register_expression(
268 &mut self,
269 subgraph_name: &str,
270 expression: &str,
271 ) -> Result<(), SubgraphExecutorError> {
272 let compilation_result = vrl_compile(expression, &self.vrl_functions).map_err(|e| {
273 SubgraphExecutorError::new_endpoint_expression_build(subgraph_name.to_string(), e)
274 })?;
275
276 self.expressions_by_subgraph
277 .insert(subgraph_name.to_string(), compilation_result.program);
278
279 Ok(())
280 }
281
282 fn register_static_endpoint(&self, subgraph_name: &str, endpoint_str: &str) {
286 self.static_endpoints_by_subgraph
287 .insert(subgraph_name.to_string(), endpoint_str.to_string());
288 }
289
290 fn register_executor(
293 &self,
294 subgraph_name: &str,
295 endpoint_str: &str,
296 ) -> Result<(), SubgraphExecutorError> {
297 let endpoint_uri = endpoint_str.parse::<Uri>().map_err(|e| {
298 SubgraphExecutorError::EndpointParseFailure(endpoint_str.to_string(), e.to_string())
299 })?;
300
301 let origin = format!(
302 "{}://{}:{}",
303 endpoint_uri.scheme_str().unwrap_or("http"),
304 endpoint_uri.host().unwrap_or(""),
305 endpoint_uri.port_u16().unwrap_or_else(|| {
306 if endpoint_uri.scheme_str() == Some("https") {
307 443
308 } else {
309 80
310 }
311 })
312 );
313
314 let semaphore = self
315 .semaphores_by_origin
316 .entry(origin)
317 .or_insert_with(|| Arc::new(Semaphore::new(self.max_connections_per_host)))
318 .clone();
319
320 let executor = HTTPSubgraphExecutor::new(
321 subgraph_name.to_string(),
322 endpoint_uri,
323 self.client.clone(),
324 semaphore,
325 self.config.clone(),
326 self.in_flight_requests.clone(),
327 );
328
329 self.executors_by_subgraph
330 .entry(subgraph_name.to_string())
331 .or_default()
332 .insert(endpoint_str.to_string(), executor.to_boxed_arc());
333
334 Ok(())
335 }
336}