Skip to main content

hive_router_plan_executor/executors/
map.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    sync::Arc,
4    time::Duration,
5};
6
7use dashmap::DashMap;
8use hive_router_config::{
9    override_subgraph_urls::UrlOrExpression, traffic_shaping::DurationOrExpression,
10    HiveRouterConfig,
11};
12use hive_router_internal::expressions::vrl::compiler::Program as VrlProgram;
13use hive_router_internal::expressions::vrl::core::Value as VrlValue;
14use hive_router_internal::expressions::{CompileExpression, DurationOrProgram, ExecutableProgram};
15use http::Uri;
16use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
17use hyper_util::{
18    client::legacy::{connect::HttpConnector, Client},
19    rt::{TokioExecutor, TokioTimer},
20};
21use tokio::sync::{OnceCell, Semaphore};
22use tracing::error;
23
24use crate::{
25    execution::client_request_details::ClientRequestDetails,
26    executors::{
27        common::{SubgraphExecutionRequest, SubgraphExecutor, SubgraphExecutorBoxedArc},
28        dedupe::ABuildHasher,
29        error::SubgraphExecutorError,
30        http::{HTTPSubgraphExecutor, HttpClient, HttpResponse},
31    },
32    response::{graphql_error::GraphQLError, subgraph_response::SubgraphResponse},
33};
34
35type SubgraphName = String;
36type SubgraphEndpoint = String;
37type ExecutorsBySubgraphMap =
38    DashMap<SubgraphName, DashMap<SubgraphEndpoint, SubgraphExecutorBoxedArc>>;
39type StaticEndpointsBySubgraphMap = DashMap<SubgraphName, SubgraphEndpoint>;
40type ExpressionEndpointsBySubgraphMap = HashMap<SubgraphName, VrlProgram>;
41type TimeoutsBySubgraph = DashMap<SubgraphName, DurationOrProgram>;
42
43struct ResolvedSubgraphConfig<'a> {
44    client: Arc<HttpClient>,
45    timeout_config: &'a DurationOrExpression,
46    dedupe_enabled: bool,
47}
48
49pub type InflightRequestsMap = Arc<DashMap<u64, Arc<OnceCell<HttpResponse>>, ABuildHasher>>;
50
51pub struct SubgraphExecutorMap {
52    executors_by_subgraph: ExecutorsBySubgraphMap,
53    /// Mapping from subgraph name to static endpoint for quick lookup
54    /// based on subgraph SDL and static overrides from router's config.
55    static_endpoints_by_subgraph: StaticEndpointsBySubgraphMap,
56    /// Mapping from subgraph name to VRL expression program
57    /// Only contains subgraphs with expression-based endpoint overrides
58    expression_endpoints_by_subgraph: ExpressionEndpointsBySubgraphMap,
59    timeouts_by_subgraph: TimeoutsBySubgraph,
60    global_timeout: DurationOrProgram,
61    config: Arc<HiveRouterConfig>,
62    client: Arc<HttpClient>,
63    semaphores_by_origin: DashMap<String, Arc<Semaphore>>,
64    max_connections_per_host: usize,
65    in_flight_requests: InflightRequestsMap,
66}
67
68fn build_https_executor() -> Result<HttpsConnector<HttpConnector>, SubgraphExecutorError> {
69    HttpsConnectorBuilder::new()
70        .with_native_roots()
71        .map_err(|e| SubgraphExecutorError::NativeTlsCertificatesError(e.to_string()))
72        .map(|b| b.https_or_http().enable_http1().enable_http2().build())
73}
74
75impl SubgraphExecutorMap {
76    pub fn new(
77        config: Arc<HiveRouterConfig>,
78        global_timeout: DurationOrProgram,
79    ) -> Result<Self, SubgraphExecutorError> {
80        let client: HttpClient = Client::builder(TokioExecutor::new())
81            .pool_timer(TokioTimer::new())
82            .pool_idle_timeout(config.traffic_shaping.all.pool_idle_timeout)
83            .pool_max_idle_per_host(config.traffic_shaping.max_connections_per_host)
84            .build(build_https_executor()?);
85
86        let max_connections_per_host = config.traffic_shaping.max_connections_per_host;
87
88        Ok(SubgraphExecutorMap {
89            executors_by_subgraph: Default::default(),
90            static_endpoints_by_subgraph: Default::default(),
91            expression_endpoints_by_subgraph: Default::default(),
92            config,
93            client: Arc::new(client),
94            semaphores_by_origin: Default::default(),
95            max_connections_per_host,
96            in_flight_requests: Arc::new(DashMap::with_hasher(ABuildHasher::default())),
97            timeouts_by_subgraph: Default::default(),
98            global_timeout,
99        })
100    }
101
102    pub fn from_http_endpoint_map(
103        subgraph_endpoint_map: &HashMap<SubgraphName, String>,
104        config: Arc<HiveRouterConfig>,
105    ) -> Result<Self, SubgraphExecutorError> {
106        let global_timeout = DurationOrProgram::compile(
107            &config.traffic_shaping.all.request_timeout,
108            None,
109        )
110        .map_err(|err| {
111            SubgraphExecutorError::RequestTimeoutExpressionBuild("all".to_string(), err.diagnostics)
112        })?;
113        let mut subgraph_executor_map = SubgraphExecutorMap::new(config.clone(), global_timeout)?;
114
115        for (subgraph_name, original_endpoint_str) in subgraph_endpoint_map.iter() {
116            let endpoint_config = config
117                .override_subgraph_urls
118                .get_subgraph_url(subgraph_name);
119
120            let endpoint_str = match endpoint_config {
121                Some(UrlOrExpression::Url(url)) => url.clone(),
122                Some(UrlOrExpression::Expression { expression }) => {
123                    subgraph_executor_map
124                        .register_endpoint_expression(subgraph_name, expression)?;
125                    original_endpoint_str.clone()
126                }
127                None => original_endpoint_str.clone(),
128            };
129
130            subgraph_executor_map.register_static_endpoint(subgraph_name, &endpoint_str);
131            subgraph_executor_map.register_executor(subgraph_name, &endpoint_str)?;
132            subgraph_executor_map.register_subgraph_timeout(subgraph_name)?;
133        }
134
135        Ok(subgraph_executor_map)
136    }
137
138    pub async fn execute<'exec>(
139        &self,
140        subgraph_name: &'exec str,
141        execution_request: SubgraphExecutionRequest<'exec>,
142        client_request: &ClientRequestDetails<'exec>,
143    ) -> SubgraphResponse<'exec> {
144        match self.get_or_create_executor(subgraph_name, client_request) {
145            Ok(executor) => {
146                let timeout = self
147                    .timeouts_by_subgraph
148                    .get(subgraph_name)
149                    .map(|t| {
150                        let global_timeout_duration =
151                            resolve_timeout(&self.global_timeout, client_request, None, "all")?;
152                        resolve_timeout(
153                            t.value(),
154                            client_request,
155                            Some(global_timeout_duration),
156                            subgraph_name,
157                        )
158                    })
159                    .transpose();
160
161                match timeout {
162                    Ok(timeout) => executor.execute(execution_request, timeout).await,
163                    Err(err) => {
164                        error!(
165                            "Failed to resolve timeout for subgraph '{}': {}",
166                            subgraph_name, err,
167                        );
168                        self.internal_server_error_response(err.into(), subgraph_name)
169                    }
170                }
171            }
172            Err(err) => {
173                error!(
174                    "Subgraph executor error for subgraph '{}': {}",
175                    subgraph_name, err,
176                );
177                self.internal_server_error_response(err.into(), subgraph_name)
178            }
179        }
180    }
181
182    fn internal_server_error_response<'exec>(
183        &self,
184        graphql_error: GraphQLError,
185        subgraph_name: &'exec str,
186    ) -> SubgraphResponse<'exec> {
187        let error_with_subgraph_name = graphql_error.add_subgraph_name(subgraph_name);
188        SubgraphResponse {
189            errors: Some(vec![error_with_subgraph_name]),
190            ..Default::default()
191        }
192    }
193
194    /// Looks up a subgraph executor based on the subgraph name.
195    /// Looks for an expression first, falling back to a static endpoint.
196    /// If nothing is found, returns an error.
197    fn get_or_create_executor(
198        &self,
199        subgraph_name: &str,
200        client_request: &ClientRequestDetails<'_>,
201    ) -> Result<SubgraphExecutorBoxedArc, SubgraphExecutorError> {
202        self.expression_endpoints_by_subgraph
203            .get(subgraph_name)
204            .map(|expression| {
205                self.get_or_create_executor_from_expression(
206                    subgraph_name,
207                    expression,
208                    client_request,
209                )
210            })
211            .unwrap_or_else(|| {
212                self.get_executor_from_static_endpoint(subgraph_name)
213                    .ok_or_else(|| {
214                        SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string())
215                    })
216            })
217    }
218
219    /// Looks up a subgraph executor,
220    /// or creates one if a VRL expression is defined for the subgraph.
221    /// The expression is resolved to get the endpoint URL,
222    /// and a new executor is created and stored for future requests.
223    fn get_or_create_executor_from_expression(
224        &self,
225        subgraph_name: &str,
226        expression: &VrlProgram,
227        client_request: &ClientRequestDetails<'_>,
228    ) -> Result<SubgraphExecutorBoxedArc, SubgraphExecutorError> {
229        let original_url_value = VrlValue::Bytes(
230            self.static_endpoints_by_subgraph
231                .get(subgraph_name)
232                .map(|endpoint| endpoint.value().clone())
233                .ok_or_else(|| {
234                    SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string())
235                })?
236                .into(),
237        );
238
239        let value = VrlValue::Object(BTreeMap::from([
240            ("request".into(), client_request.into()),
241            ("default".into(), original_url_value),
242        ]));
243
244        // Resolve the expression to get an endpoint URL.
245        let endpoint_result = expression.execute(value).map_err(|err| {
246            SubgraphExecutorError::EndpointExpressionResolutionFailure(
247                subgraph_name.to_string(),
248                err.to_string(),
249            )
250        })?;
251
252        let endpoint_str = match endpoint_result.as_str() {
253            Some(s) => Ok(s.to_string()),
254            None => Err(SubgraphExecutorError::EndpointExpressionWrongType(
255                subgraph_name.to_string(),
256            )),
257        }?;
258
259        // Check if an executor for this endpoint already exists.
260        if let Some(executor) = self.get_executor_from_endpoint(subgraph_name, &endpoint_str) {
261            return Ok(executor);
262        }
263
264        // If not, create and register a new one.
265        self.register_executor(subgraph_name, &endpoint_str)
266    }
267
268    /// Looks up a subgraph executor based on a static endpoint URL.
269    fn get_executor_from_static_endpoint(
270        &self,
271        subgraph_name: &str,
272    ) -> Option<SubgraphExecutorBoxedArc> {
273        let endpoint_ref = self.static_endpoints_by_subgraph.get(subgraph_name)?;
274        let endpoint_str = endpoint_ref.value();
275        self.get_executor_from_endpoint(subgraph_name, endpoint_str)
276    }
277
278    /// Looks up a subgraph executor for a given endpoint URL.
279    #[inline]
280    fn get_executor_from_endpoint(
281        &self,
282        subgraph_name: &str,
283        endpoint_str: &str,
284    ) -> Option<SubgraphExecutorBoxedArc> {
285        self.executors_by_subgraph
286            .get(subgraph_name)
287            .and_then(|endpoints| endpoints.get(endpoint_str).map(|e| e.clone()))
288    }
289
290    /// Registers a new HTTP subgraph executor for the given subgraph name and endpoint URL.
291    /// It makes it availble for future requests.
292    fn register_endpoint_expression(
293        &mut self,
294        subgraph_name: &str,
295        expression: &str,
296    ) -> Result<(), SubgraphExecutorError> {
297        let program = expression.compile_expression(None).map_err(|err| {
298            SubgraphExecutorError::EndpointExpressionBuild(
299                subgraph_name.to_string(),
300                err.diagnostics,
301            )
302        })?;
303        self.expression_endpoints_by_subgraph
304            .insert(subgraph_name.to_string(), program);
305
306        Ok(())
307    }
308
309    /// Registers a static endpoint for the given subgraph name.
310    /// This is used for quick lookup when no expression is defined
311    /// or when resolving the expression (to have the original URL available there).
312    fn register_static_endpoint(&self, subgraph_name: &str, endpoint_str: &str) {
313        self.static_endpoints_by_subgraph
314            .insert(subgraph_name.to_string(), endpoint_str.to_string());
315    }
316
317    /// Registers a new HTTP subgraph executor for the given subgraph name and endpoint URL.
318    /// It makes it available for future requests.
319    fn register_executor(
320        &self,
321        subgraph_name: &str,
322        endpoint_str: &str,
323    ) -> Result<SubgraphExecutorBoxedArc, SubgraphExecutorError> {
324        let endpoint_uri = endpoint_str.parse::<Uri>().map_err(|e| {
325            SubgraphExecutorError::EndpointParseFailure(endpoint_str.to_string(), e.to_string())
326        })?;
327
328        let origin = format!(
329            "{}://{}:{}",
330            endpoint_uri.scheme_str().unwrap_or("http"),
331            endpoint_uri.host().unwrap_or(""),
332            endpoint_uri.port_u16().unwrap_or_else(|| {
333                if endpoint_uri.scheme_str() == Some("https") {
334                    443
335                } else {
336                    80
337                }
338            })
339        );
340
341        let semaphore = self
342            .semaphores_by_origin
343            .entry(origin)
344            .or_insert_with(|| Arc::new(Semaphore::new(self.max_connections_per_host)))
345            .clone();
346
347        let subgraph_config = self.resolve_subgraph_config(subgraph_name)?;
348
349        let executor = HTTPSubgraphExecutor::new(
350            subgraph_name.to_string(),
351            endpoint_uri,
352            subgraph_config.client,
353            semaphore,
354            subgraph_config.dedupe_enabled,
355            self.in_flight_requests.clone(),
356        );
357
358        let executor_arc = executor.to_boxed_arc();
359
360        self.executors_by_subgraph
361            .entry(subgraph_name.to_string())
362            .or_default()
363            .insert(endpoint_str.to_string(), executor_arc.clone());
364
365        Ok(executor_arc)
366    }
367
368    /// Resolves traffic shaping configuration for a specific subgraph, applying subgraph-specific
369    /// overrides on top of global settings
370    fn resolve_subgraph_config<'a>(
371        &'a self,
372        subgraph_name: &'a str,
373    ) -> Result<ResolvedSubgraphConfig<'a>, SubgraphExecutorError> {
374        let mut config = ResolvedSubgraphConfig {
375            client: self.client.clone(),
376            timeout_config: &self.config.traffic_shaping.all.request_timeout,
377            dedupe_enabled: self.config.traffic_shaping.all.dedupe_enabled,
378        };
379
380        let Some(subgraph_config) = self.config.traffic_shaping.subgraphs.get(subgraph_name) else {
381            return Ok(config);
382        };
383
384        // Override client only if pool idle timeout is customized
385        if let Some(pool_idle_timeout) = subgraph_config.pool_idle_timeout {
386            // Only override if it's different from the global setting
387            if pool_idle_timeout != self.config.traffic_shaping.all.pool_idle_timeout {
388                config.client = Arc::new(
389                    Client::builder(TokioExecutor::new())
390                        .pool_timer(TokioTimer::new())
391                        .pool_idle_timeout(pool_idle_timeout)
392                        .pool_max_idle_per_host(self.max_connections_per_host)
393                        .build(build_https_executor()?),
394                );
395            }
396        }
397
398        // Apply other subgraph-specific overrides
399        if let Some(dedupe_enabled) = subgraph_config.dedupe_enabled {
400            config.dedupe_enabled = dedupe_enabled;
401        }
402
403        if let Some(custom_timeout) = &subgraph_config.request_timeout {
404            config.timeout_config = custom_timeout;
405        }
406
407        Ok(config)
408    }
409
410    /// Compiles and registers a timeout for a specific subgraph.
411    /// If the subgraph has a custom timeout configuration, it will be used.
412    /// Otherwise, the global timeout configuration will be used.
413    fn register_subgraph_timeout(&self, subgraph_name: &str) -> Result<(), SubgraphExecutorError> {
414        // Check if this subgraph already has a timeout registered
415        if self.timeouts_by_subgraph.contains_key(subgraph_name) {
416            return Ok(());
417        }
418
419        // Get the timeout configuration for this subgraph, or fall back to global
420        let timeout_config = self
421            .config
422            .traffic_shaping
423            .subgraphs
424            .get(subgraph_name)
425            .and_then(|s| s.request_timeout.as_ref())
426            .unwrap_or(&self.config.traffic_shaping.all.request_timeout);
427
428        // Compile the timeout configuration into a DurationOrProgram
429        let timeout_prog = DurationOrProgram::compile(timeout_config, None).map_err(|err| {
430            SubgraphExecutorError::RequestTimeoutExpressionBuild(
431                subgraph_name.to_string(),
432                err.diagnostics,
433            )
434        })?;
435
436        // Register the compiled timeout
437        self.timeouts_by_subgraph
438            .insert(subgraph_name.to_string(), timeout_prog);
439
440        Ok(())
441    }
442}
443
444/// Resolves a timeout DurationOrProgram to a concrete Duration.
445/// Optionally includes a default timeout value in the VRL context.
446fn resolve_timeout(
447    duration_or_program: &DurationOrProgram,
448    client_request: &ClientRequestDetails<'_>,
449    default_timeout: Option<Duration>,
450    timeout_name: &str,
451) -> Result<Duration, SubgraphExecutorError> {
452    duration_or_program
453        .resolve(|| {
454            let mut context_map = BTreeMap::new();
455            context_map.insert("request".into(), client_request.into());
456
457            if let Some(default) = default_timeout {
458                context_map.insert(
459                    "default".into(),
460                    VrlValue::Integer(default.as_millis() as i64),
461                );
462            }
463
464            VrlValue::Object(context_map)
465        })
466        .map_err(|err| {
467            SubgraphExecutorError::TimeoutExpressionResolution(
468                timeout_name.to_string(),
469                err.to_string(),
470            )
471        })
472}