Skip to main content

hive_router/
lib.rs

1pub mod background_tasks;
2mod consts;
3pub mod error;
4mod http_utils;
5mod jwt;
6pub mod pipeline;
7mod schema_state;
8mod shared_state;
9mod supergraph;
10pub mod telemetry;
11mod utils;
12
13use std::sync::Arc;
14
15use crate::{
16    background_tasks::BackgroundTasksManager,
17    consts::ROUTER_VERSION,
18    error::RouterInitError,
19    http_utils::{
20        landing_page::landing_page_handler,
21        probes::{health_check_handler, readiness_check_handler},
22    },
23    jwt::JwtAuthRuntime,
24    pipeline::{
25        error::PipelineError,
26        graphql_request_handler,
27        header::{RequestAccepts, ResponseMode, TEXT_HTML_MIME},
28        usage_reporting::init_hive_usage_agent,
29        validation::{
30            max_aliases_rule::MaxAliasesRule, max_depth_rule::MaxDepthRule,
31            max_directives_rule::MaxDirectivesRule,
32        },
33    },
34    telemetry::HeaderExtractor,
35};
36
37pub use crate::{schema_state::SchemaState, shared_state::RouterSharedState};
38
39use graphql_tools::validation::rules::default_rules_validation_plan;
40use hive_router_config::{load_config, HiveRouterConfig};
41use hive_router_internal::telemetry::{
42    otel::tracing_opentelemetry::OpenTelemetrySpanExt,
43    traces::spans::http_request::HttpServerRequestSpan, TelemetryContext,
44};
45use http::header::{CONTENT_TYPE, RETRY_AFTER};
46use ntex::util::{select, Either};
47use ntex::{
48    time::sleep,
49    web::{self, HttpRequest},
50};
51use tracing::{info, warn, Instrument};
52
/// GraphiQL page markup, embedded into the binary at compile time from
/// `../static/graphiql.html` (path relative to this source file).
///
/// Served by the GraphQL endpoint handler when the negotiated response mode is
/// GraphiQL and GraphiQL is enabled in the router configuration.
static GRAPHIQL_HTML: &str = include_str!("../static/graphiql.html");
54
55async fn graphql_endpoint_handler(
56    request: HttpRequest,
57    body_stream: web::types::Payload,
58    schema_state: web::types::State<Arc<SchemaState>>,
59    app_state: web::types::State<Arc<RouterSharedState>>,
60) -> impl web::Responder {
61    if let Some(supergraph) = schema_state.current_supergraph().as_ref() {
62        // If an early CORS response is needed, return it immediately.
63        if let Some(early_response) = app_state
64            .cors_runtime
65            .as_ref()
66            .and_then(|cors| cors.get_early_response(&request))
67        {
68            return early_response;
69        }
70
71        // agree on the response content type so that errors can be handled
72        // properly outside the request handler.
73        let response_mode = match request.negotiate() {
74            Ok(response_mode) => response_mode,
75            Err(err) => return err.into_response(None),
76        };
77
78        if response_mode == ResponseMode::GraphiQL {
79            if app_state.router_config.graphiql.enabled {
80                return web::HttpResponse::Ok()
81                    .header(CONTENT_TYPE, TEXT_HTML_MIME)
82                    .body(GRAPHIQL_HTML);
83            } else {
84                return web::HttpResponse::NotFound().into();
85            }
86        }
87
88        let parent_ctx = app_state
89            .telemetry_context
90            .extract_context(&HeaderExtractor(request.headers()));
91        let root_http_request_span = HttpServerRequestSpan::from_request(&request);
92        let _ = root_http_request_span.set_parent(parent_ctx);
93
94        async {
95            let timeout_fut = sleep(
96                app_state
97                    .router_config
98                    .traffic_shaping
99                    .router
100                    .request_timeout,
101            );
102            let req_handler_fut = graphql_request_handler(
103                &request,
104                body_stream,
105                &response_mode,
106                supergraph,
107                app_state.get_ref(),
108                schema_state.get_ref(),
109                &root_http_request_span,
110            );
111            let mut res = match select(timeout_fut, req_handler_fut).await {
112                // If the timeout future completes first, return a timeout error response.
113                Either::Left(_) => {
114                    let err = PipelineError::TimeoutError;
115                    return {
116                        tracing::error!("{}", err);
117                        err.into_response(Some(response_mode))
118                    };
119                }
120                // If the request handler future completes first, return its response.
121                Either::Right(Ok(response)) => response,
122                // If the request handler future completes first with an error, return the error response.
123                Either::Right(Err(err)) => {
124                    return {
125                        tracing::error!("{}", err);
126                        err.into_response(Some(response_mode))
127                    }
128                }
129            };
130
131            // Apply CORS headers to the final response if CORS is configured.
132            if let Some(cors) = app_state.cors_runtime.as_ref() {
133                cors.set_headers(&request, res.headers_mut());
134            }
135
136            root_http_request_span.record_response(&res);
137
138            res
139        }
140        .instrument(root_http_request_span.clone())
141        .await
142    } else {
143        warn!("No supergraph available yet, unable to process request");
144
145        web::HttpResponse::ServiceUnavailable()
146            .header(RETRY_AFTER, 10)
147            .finish()
148    }
149}
150
151pub async fn router_entrypoint() -> Result<(), RouterInitError> {
152    let config_path = std::env::var("ROUTER_CONFIG_FILE_PATH").ok();
153    let router_config = load_config(config_path)?;
154    let telemetry = telemetry::Telemetry::init_global(&router_config)?;
155    info!("hive-router@{} starting...", ROUTER_VERSION);
156    let http_config = router_config.http.clone();
157    let addr = router_config.http.address();
158    let mut bg_tasks_manager = BackgroundTasksManager::new();
159    let (shared_state, schema_state) = configure_app_from_config(
160        router_config,
161        telemetry.context.clone(),
162        &mut bg_tasks_manager,
163    )
164    .await?;
165
166    let maybe_error = web::HttpServer::new(async move || {
167        let lp_gql_path = http_config.graphql_endpoint().to_string();
168        web::App::new()
169            .state(shared_state.clone())
170            .state(schema_state.clone())
171            .configure(|m| configure_ntex_app(m, http_config.graphql_endpoint()))
172            .default_service(web::to(move || landing_page_handler(lp_gql_path.clone())))
173    })
174    .bind(&addr)
175    .map_err(|err| RouterInitError::HttpServerBindError(addr, err))?
176    .run()
177    .await
178    .map_err(RouterInitError::HttpServerStartError);
179
180    info!("server stopped, clearning background tasks");
181    bg_tasks_manager.shutdown();
182    telemetry.graceful_shutdown().await;
183
184    maybe_error
185}
186
187pub async fn configure_app_from_config(
188    router_config: HiveRouterConfig,
189    telemetry_context: TelemetryContext,
190    bg_tasks_manager: &mut BackgroundTasksManager,
191) -> Result<(Arc<RouterSharedState>, Arc<SchemaState>), RouterInitError> {
192    let jwt_runtime = match router_config.jwt.is_jwt_auth_enabled() {
193        true => Some(JwtAuthRuntime::init(bg_tasks_manager, &router_config.jwt).await?),
194        false => None,
195    };
196
197    let hive_usage_agent = match router_config.telemetry.hive.as_ref() {
198        Some(hive_config) if hive_config.usage_reporting.enabled => {
199            Some(init_hive_usage_agent(bg_tasks_manager, hive_config)?)
200        }
201        _ => None,
202    };
203
204    let router_config_arc = Arc::new(router_config);
205    let telemetry_context_arc = Arc::new(telemetry_context);
206    let schema_state = SchemaState::new_from_config(
207        bg_tasks_manager,
208        telemetry_context_arc.clone(),
209        router_config_arc.clone(),
210    )
211    .await?;
212    let schema_state_arc = Arc::new(schema_state);
213    let mut validation_plan = default_rules_validation_plan();
214    if let Some(max_depth_config) = &router_config_arc.limits.max_depth {
215        validation_plan.add_rule(Box::new(MaxDepthRule {
216            config: max_depth_config.clone(),
217        }));
218    }
219    if let Some(max_directives_config) = &router_config_arc.limits.max_directives {
220        validation_plan.add_rule(Box::new(MaxDirectivesRule {
221            config: max_directives_config.clone(),
222        }));
223    }
224    if let Some(max_aliases_config) = &router_config_arc.limits.max_aliases {
225        validation_plan.add_rule(Box::new(MaxAliasesRule {
226            config: max_aliases_config.clone(),
227        }));
228    }
229    let shared_state = Arc::new(RouterSharedState::new(
230        router_config_arc,
231        jwt_runtime,
232        hive_usage_agent,
233        validation_plan,
234        telemetry_context_arc,
235    )?);
236
237    Ok((shared_state, schema_state_arc))
238}
239
240pub fn configure_ntex_app(cfg: &mut web::ServiceConfig, graphql_path: &str) {
241    cfg.route(graphql_path, web::to(graphql_endpoint_handler))
242        .route("/health", web::to(health_check_handler))
243        .route("/readiness", web::to(readiness_check_handler));
244}
245
246/// Initializes the rustls cryptographic provider for the entire process.
247///
248/// Rustls requires a cryptographic provider to be set as the default before any TLS operations occur.
249/// Installs AWS-LC, as `ring` is no longer maintained.
250///
251/// This function should be called early in the application startup, before any rustls-based TLS
252/// connections are established.
253/// In the hive-router binary and docker image, it's called automatically during router initialization.
254/// This ensures that all TLS operations throughout the application can use the configured provider.
255///
256/// This function can only be called successfully once per process.
257/// Subsequent calls will log a warning, but will not fail.
258///
259///
260/// This allows consumers of the `hive-router` crate to use their own cryptographic provider if needed,
261/// by calling this function or setting their own provider before initializing the router.
262///
263/// This function does not return an error. If the provider is already installed, it logs a warning.
264pub fn init_rustls_crypto_provider() {
265    if rustls::crypto::aws_lc_rs::default_provider()
266        .install_default()
267        .is_err()
268    {
269        warn!("Rustls crypto provider already installed");
270    }
271}