hive_router_plan_executor/executors/
map.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    sync::Arc,
4    time::Duration,
5};
6
7use bytes::{BufMut, Bytes, BytesMut};
8use dashmap::DashMap;
9use hive_router_config::{override_subgraph_urls::UrlOrExpression, HiveRouterConfig};
10use http::Uri;
11use hyper_tls::HttpsConnector;
12use hyper_util::{
13    client::legacy::Client,
14    rt::{TokioExecutor, TokioTimer},
15};
16use tokio::sync::{OnceCell, Semaphore};
17use tracing::error;
18use vrl::{
19    compiler::compile as vrl_compile,
20    compiler::Program as VrlProgram,
21    compiler::TargetValue as VrlTargetValue,
22    core::Value as VrlValue,
23    prelude::Function as VrlFunction,
24    prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone},
25    stdlib::all as vrl_build_functions,
26    value::Secrets as VrlSecrets,
27};
28
29use crate::{
30    execution::plan::ClientRequestDetails,
31    executors::{
32        common::{
33            HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor, SubgraphExecutorBoxedArc,
34        },
35        dedupe::{ABuildHasher, SharedResponse},
36        error::SubgraphExecutorError,
37        http::{HTTPSubgraphExecutor, HttpClient},
38    },
39    response::graphql_error::GraphQLError,
40};
41
42type SubgraphName = String;
43type SubgraphEndpoint = String;
44type ExecutorsBySubgraphMap =
45    DashMap<SubgraphName, DashMap<SubgraphEndpoint, SubgraphExecutorBoxedArc>>;
46type EndpointsBySubgraphMap = DashMap<SubgraphName, SubgraphEndpoint>;
47type ExpressionsBySubgraphMap = HashMap<SubgraphName, VrlProgram>;
48
49pub struct SubgraphExecutorMap {
50    executors_by_subgraph: ExecutorsBySubgraphMap,
51    /// Mapping from subgraph name to endpoint for quick lookup
52    /// based on supergrah sdl and static overrides from router's config.
53    static_endpoints_by_subgraph: EndpointsBySubgraphMap,
54    /// Mapping from subgraph name to VRL expression program
55    expressions_by_subgraph: ExpressionsBySubgraphMap,
56    config: Arc<HiveRouterConfig>,
57    /// Precompiled VRL functions to be used in endpoint expressions.
58    vrl_functions: Vec<Box<dyn VrlFunction>>,
59    client: Arc<HttpClient>,
60    semaphores_by_origin: DashMap<String, Arc<Semaphore>>,
61    max_connections_per_host: usize,
62    in_flight_requests: Arc<DashMap<u64, Arc<OnceCell<SharedResponse>>, ABuildHasher>>,
63}
64
65impl SubgraphExecutorMap {
66    pub fn new(config: Arc<HiveRouterConfig>) -> Self {
67        let https = HttpsConnector::new();
68        let client: HttpClient = Client::builder(TokioExecutor::new())
69            .pool_timer(TokioTimer::new())
70            .pool_idle_timeout(Duration::from_secs(
71                config.traffic_shaping.pool_idle_timeout_seconds,
72            ))
73            .pool_max_idle_per_host(config.traffic_shaping.max_connections_per_host)
74            .build(https);
75
76        let max_connections_per_host = config.traffic_shaping.max_connections_per_host;
77
78        SubgraphExecutorMap {
79            executors_by_subgraph: Default::default(),
80            static_endpoints_by_subgraph: Default::default(),
81            expressions_by_subgraph: Default::default(),
82            config,
83            vrl_functions: vrl_build_functions(),
84            client: Arc::new(client),
85            semaphores_by_origin: Default::default(),
86            max_connections_per_host,
87            in_flight_requests: Arc::new(DashMap::with_hasher(ABuildHasher::default())),
88        }
89    }
90
91    pub fn from_http_endpoint_map(
92        subgraph_endpoint_map: HashMap<SubgraphName, SubgraphEndpoint>,
93        config: Arc<HiveRouterConfig>,
94    ) -> Result<Self, SubgraphExecutorError> {
95        let mut subgraph_executor_map = SubgraphExecutorMap::new(config.clone());
96
97        for (subgraph_name, original_endpoint_str) in subgraph_endpoint_map.into_iter() {
98            let endpoint_str = config
99                .override_subgraph_urls
100                .get_subgraph_url(&subgraph_name);
101
102            let endpoint_str = match endpoint_str {
103                Some(UrlOrExpression::Url(url)) => url,
104                Some(UrlOrExpression::Expression { expression }) => {
105                    subgraph_executor_map.register_expression(&subgraph_name, expression)?;
106                    &original_endpoint_str
107                }
108                None => &original_endpoint_str,
109            };
110
111            subgraph_executor_map.register_executor(&subgraph_name, endpoint_str)?;
112            subgraph_executor_map.register_static_endpoint(&subgraph_name, endpoint_str);
113        }
114
115        Ok(subgraph_executor_map)
116    }
117
118    pub async fn execute<'a>(
119        &self,
120        subgraph_name: &str,
121        execution_request: HttpExecutionRequest<'a>,
122        client_request: &ClientRequestDetails<'a>,
123    ) -> HttpExecutionResponse {
124        match self.get_or_create_executor(subgraph_name, client_request) {
125            Ok(Some(executor)) => executor.execute(execution_request).await,
126            Err(err) => {
127                error!(
128                    "Subgraph executor error for subgraph '{}': {}",
129                    subgraph_name, err,
130                );
131                self.internal_server_error_response(err.into(), subgraph_name)
132            }
133            Ok(None) => {
134                error!(
135                    "Subgraph executor not found for subgraph '{}'",
136                    subgraph_name
137                );
138                self.internal_server_error_response("Internal server error".into(), subgraph_name)
139            }
140        }
141    }
142
143    fn internal_server_error_response(
144        &self,
145        graphql_error: GraphQLError,
146        subgraph_name: &str,
147    ) -> HttpExecutionResponse {
148        let errors = vec![graphql_error.add_subgraph_name(subgraph_name)];
149        let errors_bytes = sonic_rs::to_vec(&errors).unwrap();
150        let mut buffer = BytesMut::new();
151        buffer.put_slice(b"{\"errors\":");
152        buffer.put_slice(&errors_bytes);
153        buffer.put_slice(b"}");
154
155        HttpExecutionResponse {
156            body: buffer.freeze(),
157            headers: Default::default(),
158        }
159    }
160
161    /// Looks up a subgraph executor based on the subgraph name.
162    /// Looks for an expression first, falling back to a static endpoint.
163    /// If nothing is found, returns None.
164    fn get_or_create_executor(
165        &self,
166        subgraph_name: &str,
167        client_request: &ClientRequestDetails<'_>,
168    ) -> Result<Option<SubgraphExecutorBoxedArc>, SubgraphExecutorError> {
169        let from_expression =
170            self.get_or_create_executor_from_expression(subgraph_name, client_request)?;
171
172        if from_expression.is_some() {
173            return Ok(from_expression);
174        }
175
176        Ok(self.get_executor_from_static_endpoint(subgraph_name))
177    }
178
179    /// Looks up a subgraph executor,
180    /// or creates one if a VRL expression is defined for the subgraph.
181    /// The expression is resolved to get the endpoint URL,
182    /// and a new executor is created and stored for future requests.
183    fn get_or_create_executor_from_expression(
184        &self,
185        subgraph_name: &str,
186        client_request: &ClientRequestDetails<'_>,
187    ) -> Result<Option<SubgraphExecutorBoxedArc>, SubgraphExecutorError> {
188        if let Some(expression) = self.expressions_by_subgraph.get(subgraph_name) {
189            let original_url_value = VrlValue::Bytes(Bytes::from(
190                self.static_endpoints_by_subgraph
191                    .get(subgraph_name)
192                    .map(|endpoint| endpoint.value().clone())
193                    .ok_or_else(|| {
194                        SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string())
195                    })?,
196            ));
197            let mut target = VrlTargetValue {
198                value: VrlValue::Object(BTreeMap::from([
199                    ("request".into(), client_request.into()),
200                    ("original_url".into(), original_url_value),
201                ])),
202                metadata: VrlValue::Object(BTreeMap::new()),
203                secrets: VrlSecrets::default(),
204            };
205
206            let mut state = VrlState::default();
207            let timezone = VrlTimeZone::default();
208            let mut ctx = VrlContext::new(&mut target, &mut state, &timezone);
209
210            // Resolve the expression to get an endpoint URL.
211            let endpoint_result = expression.resolve(&mut ctx).map_err(|err| {
212                SubgraphExecutorError::new_endpoint_expression_resolution_failure(
213                    subgraph_name.to_string(),
214                    err,
215                )
216            })?;
217            let endpoint_str = match endpoint_result.as_str() {
218                Some(s) => s.to_string(),
219                None => {
220                    return Err(SubgraphExecutorError::EndpointExpressionWrongType(
221                        subgraph_name.to_string(),
222                    ));
223                }
224            };
225
226            // Check if an executor for this endpoint already exists.
227            let existing_executor = self
228                .executors_by_subgraph
229                .get(subgraph_name)
230                .and_then(|endpoints| endpoints.get(&endpoint_str).map(|e| e.clone()));
231
232            if let Some(executor) = existing_executor {
233                return Ok(Some(executor));
234            }
235
236            // If not, create and register a new one.
237            self.register_executor(subgraph_name, &endpoint_str)?;
238
239            let endpoints = self
240                .executors_by_subgraph
241                .get(subgraph_name)
242                .expect("Executor was just registered, should be present");
243            return Ok(endpoints.get(&endpoint_str).map(|e| e.clone()));
244        }
245
246        Ok(None)
247    }
248
249    /// Looks up a subgraph executor based on a static endpoint URL.
250    fn get_executor_from_static_endpoint(
251        &self,
252        subgraph_name: &str,
253    ) -> Option<SubgraphExecutorBoxedArc> {
254        self.static_endpoints_by_subgraph
255            .get(subgraph_name)
256            .and_then(|endpoint_ref| {
257                let endpoint_str = endpoint_ref.value();
258                self.executors_by_subgraph
259                    .get(subgraph_name)
260                    .and_then(|endpoints| endpoints.get(endpoint_str).map(|e| e.clone()))
261            })
262    }
263
264    /// Registers a VRL expression for the given subgraph name.
265    /// The expression can later be used to resolve the endpoint URL cheaply,
266    /// without needing to recompile it every time.
267    fn register_expression(
268        &mut self,
269        subgraph_name: &str,
270        expression: &str,
271    ) -> Result<(), SubgraphExecutorError> {
272        let compilation_result = vrl_compile(expression, &self.vrl_functions).map_err(|e| {
273            SubgraphExecutorError::new_endpoint_expression_build(subgraph_name.to_string(), e)
274        })?;
275
276        self.expressions_by_subgraph
277            .insert(subgraph_name.to_string(), compilation_result.program);
278
279        Ok(())
280    }
281
282    /// Registers a static endpoint for the given subgraph name.
283    /// This is used for quick lookup when no expression is defined
284    /// or when resolving the expression (to have the original URL available there).
285    fn register_static_endpoint(&self, subgraph_name: &str, endpoint_str: &str) {
286        self.static_endpoints_by_subgraph
287            .insert(subgraph_name.to_string(), endpoint_str.to_string());
288    }
289
290    /// Registers a new HTTP subgraph executor for the given subgraph name and endpoint URL.
291    /// It makes it availble for future requests.
292    fn register_executor(
293        &self,
294        subgraph_name: &str,
295        endpoint_str: &str,
296    ) -> Result<(), SubgraphExecutorError> {
297        let endpoint_uri = endpoint_str.parse::<Uri>().map_err(|e| {
298            SubgraphExecutorError::EndpointParseFailure(endpoint_str.to_string(), e.to_string())
299        })?;
300
301        let origin = format!(
302            "{}://{}:{}",
303            endpoint_uri.scheme_str().unwrap_or("http"),
304            endpoint_uri.host().unwrap_or(""),
305            endpoint_uri.port_u16().unwrap_or_else(|| {
306                if endpoint_uri.scheme_str() == Some("https") {
307                    443
308                } else {
309                    80
310                }
311            })
312        );
313
314        let semaphore = self
315            .semaphores_by_origin
316            .entry(origin)
317            .or_insert_with(|| Arc::new(Semaphore::new(self.max_connections_per_host)))
318            .clone();
319
320        let executor = HTTPSubgraphExecutor::new(
321            subgraph_name.to_string(),
322            endpoint_uri,
323            self.client.clone(),
324            semaphore,
325            self.config.clone(),
326            self.in_flight_requests.clone(),
327        );
328
329        self.executors_by_subgraph
330            .entry(subgraph_name.to_string())
331            .or_default()
332            .insert(endpoint_str.to_string(), executor.to_boxed_arc());
333
334        Ok(())
335    }
336}