graphgate_handler/
shared_route_table.rs1use std::sync::Arc;
2
3use anyhow::{Context, Error, Result};
4use graphgate_planner::{PlanBuilder, Request, Response, ServerError};
5use graphgate_schema::ComposedSchema;
6use opentelemetry::trace::{TraceContextExt, Tracer};
7use opentelemetry::{global, Context as OpenTelemetryContext};
8use serde::Deserialize;
9use tokio::sync::{mpsc, RwLock};
10use tokio::time::{Duration, Instant};
11use value::ConstValue;
12use warp::http::{HeaderMap, Response as HttpResponse, StatusCode};
13
14use crate::executor::Executor;
15use crate::fetcher::HttpFetcher;
16use crate::service_route::ServiceRouteTable;
17
18enum Command {
19 Change(ServiceRouteTable),
20}
21
22struct Inner {
23 schema: Option<Arc<ComposedSchema>>,
24 route_table: Option<Arc<ServiceRouteTable>>,
25}
26
27#[derive(Clone)]
28pub struct SharedRouteTable {
29 inner: Arc<RwLock<Inner>>,
30 tx: mpsc::UnboundedSender<Command>,
31}
32
33impl Default for SharedRouteTable {
34 fn default() -> Self {
35 let (tx, rx) = mpsc::unbounded_channel();
36 let shared_route_table = Self {
37 inner: Arc::new(RwLock::new(Inner {
38 schema: None,
39 route_table: None,
40 })),
41 tx,
42 };
43 tokio::spawn({
44 let shared_route_table = shared_route_table.clone();
45 async move { shared_route_table.update_loop(rx).await }
46 });
47 shared_route_table
48 }
49}
50
51impl SharedRouteTable {
52 async fn update_loop(self, mut rx: mpsc::UnboundedReceiver<Command>) {
53 let mut update_interval = tokio::time::interval_at(
54 Instant::now() + Duration::from_secs(3),
55 Duration::from_secs(30),
56 );
57
58 loop {
59 tokio::select! {
60 _ = update_interval.tick() => {
61 if let Err(err) = self.update().await {
62 tracing::error!(error = %err, "Failed to update schema.");
63 }
64 }
65 command = rx.recv() => {
66 if let Some(command) = command {
67 match command {
68 Command::Change(route_table) => {
69 let mut inner = self.inner.write().await;
70 inner.route_table = Some(Arc::new(route_table));
71 inner.schema = None;
72 }
73 }
74 }
75 }
76 }
77 }
78 }
79
80 async fn update(&self) -> Result<()> {
81 const QUERY_SDL: &str = "{ _service { sdl }}";
82
83 #[derive(Deserialize)]
84 struct ResponseQuery {
85 #[serde(rename = "_service")]
86 service: ResponseService,
87 }
88
89 #[derive(Deserialize)]
90 struct ResponseService {
91 sdl: String,
92 }
93
94 let route_table = match self.inner.read().await.route_table.clone() {
95 Some(route_table) => route_table,
96 None => return Ok(()),
97 };
98
99 let resp = futures_util::future::try_join_all(route_table.keys().map(|service| {
100 let route_table = route_table.clone();
101 async move {
102 let resp = route_table
103 .query(service, Request::new(QUERY_SDL), None)
104 .await
105 .with_context(|| format!("Failed to fetch SDL from '{}'.", service))?;
106 let resp: ResponseQuery =
107 value::from_value(resp.data).context("Failed to parse response.")?;
108 let document = parser::parse_schema(resp.service.sdl)
109 .with_context(|| format!("Invalid SDL from '{}'.", service))?;
110 Ok::<_, Error>((service.to_string(), document))
111 }
112 }))
113 .await?;
114
115 let schema = ComposedSchema::combine(resp)?;
116 self.inner.write().await.schema = Some(Arc::new(schema));
117 Ok(())
118 }
119
120 pub fn set_route_table(&self, route_table: ServiceRouteTable) {
121 self.tx.send(Command::Change(route_table)).ok();
122 }
123
124 pub async fn get(&self) -> Option<(Arc<ComposedSchema>, Arc<ServiceRouteTable>)> {
125 let (composed_schema, route_table) = {
126 let inner = self.inner.read().await;
127 (inner.schema.clone(), inner.route_table.clone())
128 };
129 composed_schema.zip(route_table)
130 }
131
132 pub async fn query(&self, request: Request, header_map: HeaderMap) -> HttpResponse<String> {
133 let tracer = global::tracer("graphql");
134
135 let document = match tracer.in_span("parse", |_| parser::parse_query(&request.query)) {
136 Ok(document) => document,
137 Err(err) => {
138 return HttpResponse::builder()
139 .status(StatusCode::BAD_REQUEST)
140 .body(err.to_string())
141 .unwrap();
142 }
143 };
144
145 let (composed_schema, route_table) = match self.get().await {
146 Some((composed_schema, route_table)) => (composed_schema, route_table),
147 _ => {
148 return HttpResponse::builder()
149 .status(StatusCode::BAD_REQUEST)
150 .body(
151 serde_json::to_string(&Response {
152 data: ConstValue::Null,
153 errors: vec![ServerError::new("Not ready.")],
154 extensions: Default::default(),
155 })
156 .unwrap(),
157 )
158 .unwrap();
159 }
160 };
161
162 let mut plan_builder =
163 PlanBuilder::new(&composed_schema, document).variables(request.variables);
164 if let Some(operation) = request.operation {
165 plan_builder = plan_builder.operation_name(operation);
166 }
167
168 let plan = match tracer.in_span("plan", |_| plan_builder.plan()) {
169 Ok(plan) => plan,
170 Err(response) => {
171 return HttpResponse::builder()
172 .status(StatusCode::OK)
173 .body(serde_json::to_string(&response).unwrap())
174 .unwrap();
175 }
176 };
177
178 let executor = Executor::new(&composed_schema);
179 let resp = opentelemetry::trace::FutureExt::with_context(
180 executor.execute_query(&HttpFetcher::new(&*route_table, &header_map), &plan),
181 OpenTelemetryContext::current_with_span(tracer.span_builder("execute").start(&tracer)),
182 )
183 .await;
184 HttpResponse::builder()
185 .status(StatusCode::OK)
186 .body(serde_json::to_string(&resp).unwrap())
187 .unwrap()
188 }
189}