hive_router/
schema_state.rs1use arc_swap::{ArcSwap, Guard};
2use async_trait::async_trait;
3use graphql_tools::parser::schema::Document;
4use graphql_tools::validation::utils::ValidationError;
5use hive_router_config::{supergraph::SupergraphSource, HiveRouterConfig};
6use hive_router_internal::telemetry::TelemetryContext;
7use hive_router_plan_executor::{
8 executors::error::SubgraphExecutorError,
9 introspection::schema::{SchemaMetadata, SchemaWithMetadata},
10 SubgraphExecutorMap,
11};
12use hive_router_query_planner::planner::plan_nodes::QueryPlan;
13use hive_router_query_planner::{
14 planner::{Planner, PlannerError},
15 state::supergraph_state::SupergraphState,
16 utils::parsing::parse_schema,
17};
18use moka::future::Cache;
19use std::sync::Arc;
20use tokio::sync::mpsc;
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, error, trace};
23
24use crate::{
25 background_tasks::{BackgroundTask, BackgroundTasksManager},
26 pipeline::{
27 authorization::{AuthorizationMetadata, AuthorizationMetadataError},
28 normalize::GraphQLNormalizationPayload,
29 },
30 supergraph::{
31 base::{LoadSupergraphError, ReloadSupergraphResult, SupergraphLoader},
32 resolve_from_config,
33 },
34};
35
36pub struct SchemaState {
37 current_swapable: Arc<ArcSwap<Option<SupergraphData>>>,
38 pub plan_cache: Cache<u64, Arc<QueryPlan>>,
39 pub validate_cache: Cache<u64, Arc<Vec<ValidationError>>>,
40 pub normalize_cache: Cache<u64, Arc<GraphQLNormalizationPayload>>,
41}
42
43pub struct SupergraphData {
44 pub metadata: SchemaMetadata,
45 pub planner: Planner,
46 pub authorization: AuthorizationMetadata,
47 pub subgraph_executor_map: SubgraphExecutorMap,
48 pub supergraph_schema: Arc<Document<'static, String>>,
49}
50
51#[derive(Debug, thiserror::Error)]
52pub enum SupergraphManagerError {
53 #[error("Failed to load supergraph: {0}")]
54 LoadSupergraphError(#[from] LoadSupergraphError),
55
56 #[error("Failed to build planner: {0}")]
57 PlannerBuilderError(#[from] PlannerError),
58 #[error("Failed to build authorization: {0}")]
59 AuthorizationMetadataError(#[from] AuthorizationMetadataError),
60 #[error(transparent)]
61 ExecutorInitError(#[from] SubgraphExecutorError),
62 #[error("Unexpected: failed to load initial supergraph")]
63 FailedToLoadInitialSupergraph,
64}
65
66impl SchemaState {
67 pub fn current_supergraph(&self) -> Guard<Arc<Option<SupergraphData>>> {
68 self.current_swapable.load()
69 }
70
71 pub fn is_ready(&self) -> bool {
72 self.current_supergraph().is_some()
73 }
74
75 pub async fn new_from_config(
76 bg_tasks_manager: &mut BackgroundTasksManager,
77 telemetry_context: Arc<TelemetryContext>,
78 router_config: Arc<HiveRouterConfig>,
79 ) -> Result<Self, SupergraphManagerError> {
80 let (tx, mut rx) = mpsc::channel::<String>(1);
81 let background_loader = SupergraphBackgroundLoader::new(&router_config.supergraph, tx)?;
82 bg_tasks_manager.register_task(Arc::new(background_loader));
83
84 let swappable_data = Arc::new(ArcSwap::from(Arc::new(None)));
85 let swappable_data_spawn_clone = swappable_data.clone();
86 let plan_cache = Cache::new(1000);
87 let validate_cache = Cache::new(1000);
88 let normalize_cache = Cache::new(1000);
89
90 let task_plan_cache = plan_cache.clone();
92 let validate_cache_cache = validate_cache.clone();
93 let normalize_cache_cache = normalize_cache.clone();
94
95 bg_tasks_manager.register_handle(async move {
96 while let Some(new_sdl) = rx.recv().await {
97 debug!("Received new supergraph SDL, building new supergraph state...");
98
99 match Self::build_data(router_config.clone(), telemetry_context.clone(), &new_sdl) {
100 Ok(new_data) => {
101 swappable_data_spawn_clone.store(Arc::new(Some(new_data)));
102 debug!("Supergraph updated successfully");
103
104 task_plan_cache.invalidate_all();
105 validate_cache_cache.invalidate_all();
106 normalize_cache_cache.invalidate_all();
107 debug!("Schema-associated caches cleared successfully");
108 }
109 Err(e) => {
110 error!("Failed to build new supergraph data: {}", e);
111 }
112 }
113 }
114 });
115
116 Ok(Self {
117 current_swapable: swappable_data,
118 plan_cache,
119 validate_cache,
120 normalize_cache,
121 })
122 }
123
124 fn build_data(
125 router_config: Arc<HiveRouterConfig>,
126 telemetry_context: Arc<TelemetryContext>,
127 supergraph_sdl: &str,
128 ) -> Result<SupergraphData, SupergraphManagerError> {
129 let parsed_supergraph_sdl = parse_schema(supergraph_sdl);
130 let supergraph_state = SupergraphState::new(&parsed_supergraph_sdl);
131 let planner = Planner::new_from_supergraph(&parsed_supergraph_sdl)?;
132 let metadata = planner.consumer_schema.schema_metadata();
133 let authorization = AuthorizationMetadata::build(&planner.supergraph, &metadata)?;
134 let subgraph_executor_map = SubgraphExecutorMap::from_http_endpoint_map(
135 &supergraph_state.subgraph_endpoint_map,
136 router_config,
137 telemetry_context,
138 )?;
139
140 Ok(SupergraphData {
141 supergraph_schema: Arc::new(parsed_supergraph_sdl),
142 metadata,
143 planner,
144 authorization,
145 subgraph_executor_map,
146 })
147 }
148}
149
150pub struct SupergraphBackgroundLoader {
151 loader: Box<dyn SupergraphLoader + Send + Sync>,
152 sender: Arc<mpsc::Sender<String>>,
153}
154
155impl SupergraphBackgroundLoader {
156 pub fn new(
157 config: &SupergraphSource,
158 sender: mpsc::Sender<String>,
159 ) -> Result<Self, LoadSupergraphError> {
160 let loader = resolve_from_config(config)?;
161
162 Ok(Self {
163 loader,
164 sender: Arc::new(sender),
165 })
166 }
167}
168
169#[async_trait]
170impl BackgroundTask for Arc<SupergraphBackgroundLoader> {
171 fn id(&self) -> &str {
172 "supergraph-background-loader"
173 }
174
175 async fn run(&self, token: CancellationToken) {
176 loop {
177 if token.is_cancelled() {
178 trace!("Background task cancelled");
179
180 break;
181 }
182
183 match self.loader.load().await {
184 Ok(ReloadSupergraphResult::Unchanged) => {
185 debug!("Supergraph fetched successfully with no changes");
186 }
187 Ok(ReloadSupergraphResult::Changed { new_sdl }) => {
188 debug!("Supergraph loaded successfully with changes, updating...");
189
190 if self.sender.clone().send(new_sdl).await.is_err() {
191 error!("Failed to send new supergraph SDL: receiver dropped.");
192 break;
193 }
194 }
195 Err(err) => {
196 error!("Failed to load supergraph: {}", err);
197 }
198 }
199
200 if let Some(interval) = self.loader.reload_interval() {
201 debug!(
202 "waiting for {:?}ms before checking again for supergraph changes",
203 interval.as_millis()
204 );
205
206 ntex::time::sleep(*interval).await;
207 } else {
208 debug!("poll interval not configured for supergraph changes, breaking");
209
210 break;
211 }
212 }
213 }
214}