Skip to main content

hive_router/
schema_state.rs

1use arc_swap::{ArcSwap, Guard};
2use async_trait::async_trait;
3use graphql_tools::parser::schema::Document;
4use graphql_tools::validation::utils::ValidationError;
5use hive_router_config::{supergraph::SupergraphSource, HiveRouterConfig};
6use hive_router_internal::telemetry::TelemetryContext;
7use hive_router_plan_executor::{
8    executors::error::SubgraphExecutorError,
9    introspection::schema::{SchemaMetadata, SchemaWithMetadata},
10    SubgraphExecutorMap,
11};
12use hive_router_query_planner::planner::plan_nodes::QueryPlan;
13use hive_router_query_planner::{
14    planner::{Planner, PlannerError},
15    state::supergraph_state::SupergraphState,
16    utils::parsing::parse_schema,
17};
18use moka::future::Cache;
19use std::sync::Arc;
20use tokio::sync::mpsc;
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, error, trace};
23
24use crate::{
25    background_tasks::{BackgroundTask, BackgroundTasksManager},
26    pipeline::{
27        authorization::{AuthorizationMetadata, AuthorizationMetadataError},
28        normalize::GraphQLNormalizationPayload,
29    },
30    supergraph::{
31        base::{LoadSupergraphError, ReloadSupergraphResult, SupergraphLoader},
32        resolve_from_config,
33    },
34};
35
36pub struct SchemaState {
37    current_swapable: Arc<ArcSwap<Option<SupergraphData>>>,
38    pub plan_cache: Cache<u64, Arc<QueryPlan>>,
39    pub validate_cache: Cache<u64, Arc<Vec<ValidationError>>>,
40    pub normalize_cache: Cache<u64, Arc<GraphQLNormalizationPayload>>,
41}
42
43pub struct SupergraphData {
44    pub metadata: SchemaMetadata,
45    pub planner: Planner,
46    pub authorization: AuthorizationMetadata,
47    pub subgraph_executor_map: SubgraphExecutorMap,
48    pub supergraph_schema: Arc<Document<'static, String>>,
49}
50
51#[derive(Debug, thiserror::Error)]
52pub enum SupergraphManagerError {
53    #[error("Failed to load supergraph: {0}")]
54    LoadSupergraphError(#[from] LoadSupergraphError),
55
56    #[error("Failed to build planner: {0}")]
57    PlannerBuilderError(#[from] PlannerError),
58    #[error("Failed to build authorization: {0}")]
59    AuthorizationMetadataError(#[from] AuthorizationMetadataError),
60    #[error(transparent)]
61    ExecutorInitError(#[from] SubgraphExecutorError),
62    #[error("Unexpected: failed to load initial supergraph")]
63    FailedToLoadInitialSupergraph,
64}
65
66impl SchemaState {
67    pub fn current_supergraph(&self) -> Guard<Arc<Option<SupergraphData>>> {
68        self.current_swapable.load()
69    }
70
71    pub fn is_ready(&self) -> bool {
72        self.current_supergraph().is_some()
73    }
74
75    pub async fn new_from_config(
76        bg_tasks_manager: &mut BackgroundTasksManager,
77        telemetry_context: Arc<TelemetryContext>,
78        router_config: Arc<HiveRouterConfig>,
79    ) -> Result<Self, SupergraphManagerError> {
80        let (tx, mut rx) = mpsc::channel::<String>(1);
81        let background_loader = SupergraphBackgroundLoader::new(&router_config.supergraph, tx)?;
82        bg_tasks_manager.register_task(Arc::new(background_loader));
83
84        let swappable_data = Arc::new(ArcSwap::from(Arc::new(None)));
85        let swappable_data_spawn_clone = swappable_data.clone();
86        let plan_cache = Cache::new(1000);
87        let validate_cache = Cache::new(1000);
88        let normalize_cache = Cache::new(1000);
89
90        // This is cheap clone, as Cache is thread-safe and can be cloned without any performance penalty.
91        let task_plan_cache = plan_cache.clone();
92        let validate_cache_cache = validate_cache.clone();
93        let normalize_cache_cache = normalize_cache.clone();
94
95        bg_tasks_manager.register_handle(async move {
96            while let Some(new_sdl) = rx.recv().await {
97                debug!("Received new supergraph SDL, building new supergraph state...");
98
99                match Self::build_data(router_config.clone(), telemetry_context.clone(), &new_sdl) {
100                    Ok(new_data) => {
101                        swappable_data_spawn_clone.store(Arc::new(Some(new_data)));
102                        debug!("Supergraph updated successfully");
103
104                        task_plan_cache.invalidate_all();
105                        validate_cache_cache.invalidate_all();
106                        normalize_cache_cache.invalidate_all();
107                        debug!("Schema-associated caches cleared successfully");
108                    }
109                    Err(e) => {
110                        error!("Failed to build new supergraph data: {}", e);
111                    }
112                }
113            }
114        });
115
116        Ok(Self {
117            current_swapable: swappable_data,
118            plan_cache,
119            validate_cache,
120            normalize_cache,
121        })
122    }
123
124    fn build_data(
125        router_config: Arc<HiveRouterConfig>,
126        telemetry_context: Arc<TelemetryContext>,
127        supergraph_sdl: &str,
128    ) -> Result<SupergraphData, SupergraphManagerError> {
129        let parsed_supergraph_sdl = parse_schema(supergraph_sdl);
130        let supergraph_state = SupergraphState::new(&parsed_supergraph_sdl);
131        let planner = Planner::new_from_supergraph(&parsed_supergraph_sdl)?;
132        let metadata = planner.consumer_schema.schema_metadata();
133        let authorization = AuthorizationMetadata::build(&planner.supergraph, &metadata)?;
134        let subgraph_executor_map = SubgraphExecutorMap::from_http_endpoint_map(
135            &supergraph_state.subgraph_endpoint_map,
136            router_config,
137            telemetry_context,
138        )?;
139
140        Ok(SupergraphData {
141            supergraph_schema: Arc::new(parsed_supergraph_sdl),
142            metadata,
143            planner,
144            authorization,
145            subgraph_executor_map,
146        })
147    }
148}
149
150pub struct SupergraphBackgroundLoader {
151    loader: Box<dyn SupergraphLoader + Send + Sync>,
152    sender: Arc<mpsc::Sender<String>>,
153}
154
155impl SupergraphBackgroundLoader {
156    pub fn new(
157        config: &SupergraphSource,
158        sender: mpsc::Sender<String>,
159    ) -> Result<Self, LoadSupergraphError> {
160        let loader = resolve_from_config(config)?;
161
162        Ok(Self {
163            loader,
164            sender: Arc::new(sender),
165        })
166    }
167}
168
169#[async_trait]
170impl BackgroundTask for Arc<SupergraphBackgroundLoader> {
171    fn id(&self) -> &str {
172        "supergraph-background-loader"
173    }
174
175    async fn run(&self, token: CancellationToken) {
176        loop {
177            if token.is_cancelled() {
178                trace!("Background task cancelled");
179
180                break;
181            }
182
183            match self.loader.load().await {
184                Ok(ReloadSupergraphResult::Unchanged) => {
185                    debug!("Supergraph fetched successfully with no changes");
186                }
187                Ok(ReloadSupergraphResult::Changed { new_sdl }) => {
188                    debug!("Supergraph loaded successfully with changes, updating...");
189
190                    if self.sender.clone().send(new_sdl).await.is_err() {
191                        error!("Failed to send new supergraph SDL: receiver dropped.");
192                        break;
193                    }
194                }
195                Err(err) => {
196                    error!("Failed to load supergraph: {}", err);
197                }
198            }
199
200            if let Some(interval) = self.loader.reload_interval() {
201                debug!(
202                    "waiting for {:?}ms before checking again for supergraph changes",
203                    interval.as_millis()
204                );
205
206                ntex::time::sleep(*interval).await;
207            } else {
208                debug!("poll interval not configured for supergraph changes, breaking");
209
210                break;
211            }
212        }
213    }
214}