heliosdb_proxy/
graphql_gateway.rs1use std::sync::Arc;
10use std::time::Duration;
11
12use serde_json::{json, Value};
13use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
14use tokio::net::TcpListener;
15
16use crate::backend::{tls::default_client_config, BackendConfig, TlsMode};
17use crate::config::GraphqlGatewayConfig;
18use crate::graphql::introspector::{ColumnDefinition, TableDefinition};
19use crate::graphql::{GraphQLConfig, GraphQLEngine, GraphQLRequest, SchemaIntrospector};
20use crate::{ProxyError, Result};
21
22pub struct GraphqlGateway {
23 config: Arc<GraphqlGatewayConfig>,
24 engine: Arc<GraphQLEngine>,
25}
26
27impl GraphqlGateway {
28 pub fn new(config: GraphqlGatewayConfig) -> Self {
29 let tabledefs: Vec<TableDefinition> = config
31 .tables
32 .iter()
33 .map(|t| TableDefinition {
34 name: t.name.clone(),
35 schema: "public".to_string(),
36 columns: t
37 .columns
38 .iter()
39 .map(|c| ColumnDefinition {
40 name: c.clone(),
41 data_type: "text".to_string(),
42 nullable: true,
43 is_primary_key: c == "id",
44 has_default: false,
45 })
46 .collect(),
47 foreign_keys: Vec::new(),
48 })
49 .collect();
50 let schema = SchemaIntrospector::new().build_schema(&tabledefs);
51
52 let bcfg = BackendConfig {
53 host: config.backend_host.clone(),
54 port: config.backend_port,
55 user: config.backend_user.clone(),
56 password: config.backend_password.clone(),
57 database: config.backend_database.clone(),
58 application_name: Some("heliosproxy-graphql".to_string()),
59 tls_mode: TlsMode::Disable,
60 connect_timeout: Duration::from_secs(5),
61 query_timeout: Duration::from_secs(30),
62 tls_config: default_client_config(),
63 };
64 let engine = GraphQLEngine::new(GraphQLConfig::default(), schema).with_backend(bcfg);
65
66 Self {
67 config: Arc::new(config),
68 engine: Arc::new(engine),
69 }
70 }
71
72 pub async fn run(self) -> Result<()> {
73 let listener = TcpListener::bind(&self.config.listen_address)
74 .await
75 .map_err(|e| {
76 ProxyError::Network(format!(
77 "GraphQL gateway bind {}: {}",
78 self.config.listen_address, e
79 ))
80 })?;
81 tracing::info!(addr = %self.config.listen_address, "GraphQL gateway listening");
82 let config = self.config.clone();
83 let engine = self.engine.clone();
84 loop {
85 let (stream, peer) = match listener.accept().await {
86 Ok(x) => x,
87 Err(e) => {
88 tracing::warn!("GraphQL gateway accept error: {}", e);
89 continue;
90 }
91 };
92 let config = config.clone();
93 let engine = engine.clone();
94 tokio::spawn(async move {
95 if let Err(e) = Self::handle(stream, config, engine).await {
96 tracing::debug!(%peer, "GraphQL gateway error: {}", e);
97 }
98 });
99 }
100 }
101
102 async fn handle(
103 mut stream: tokio::net::TcpStream,
104 cfg: Arc<GraphqlGatewayConfig>,
105 engine: Arc<GraphQLEngine>,
106 ) -> Result<()> {
107 use tokio::io::AsyncBufReadExt;
108 let (reader, mut writer) = stream.split();
109 let mut reader = BufReader::new(reader);
110 let mut line = String::new();
111 let mut content_length = 0usize;
112 let mut method = String::new();
113 let mut path = String::new();
114 let mut authorized = cfg.auth_token.is_none();
115 let mut first = true;
116 loop {
117 line.clear();
118 let n = reader
119 .read_line(&mut line)
120 .await
121 .map_err(|e| ProxyError::Network(format!("GraphQL gw read: {}", e)))?;
122 if n == 0 || line == "\r\n" {
123 break;
124 }
125 if first {
126 let mut parts = line.split_whitespace();
127 method = parts.next().unwrap_or("").to_string();
128 path = parts.next().unwrap_or("").to_string();
129 first = false;
130 continue;
131 }
132 let lower = line.to_ascii_lowercase();
133 if lower.starts_with("content-length:") {
134 content_length = line
135 .split(':')
136 .nth(1)
137 .and_then(|v| v.trim().parse().ok())
138 .unwrap_or(0);
139 } else if lower.starts_with("authorization:") {
140 if let Some(tok) = cfg.auth_token.as_ref() {
141 let v = line.split_once(':').map(|x| x.1).unwrap_or("").trim();
142 authorized = v == format!("Bearer {}", tok);
143 }
144 }
145 }
146
147 if method == "GET" && (path == "/health" || path == "/") {
148 return Self::respond(&mut writer, 200, &json!({"status":"ok"})).await;
149 }
150 if !authorized {
151 return Self::respond(&mut writer, 401, &json!({"error":"unauthorized"})).await;
152 }
153 if method != "POST" {
154 return Self::respond(
155 &mut writer,
156 405,
157 &json!({"error":"use POST with a GraphQL query"}),
158 )
159 .await;
160 }
161
162 let mut body_buf = vec![0u8; content_length];
163 if content_length > 0 {
164 reader
165 .read_exact(&mut body_buf)
166 .await
167 .map_err(|e| ProxyError::Network(format!("GraphQL gw body: {}", e)))?;
168 }
169 let req: Value = match serde_json::from_slice(&body_buf) {
170 Ok(v) => v,
171 Err(e) => {
172 return Self::respond(
173 &mut writer,
174 400,
175 &json!({"errors":[{"message": format!("invalid JSON: {}", e)}]}),
176 )
177 .await
178 }
179 };
180 let query = req
181 .get("query")
182 .and_then(|q| q.as_str())
183 .unwrap_or("")
184 .trim();
185 if query.is_empty() {
186 return Self::respond(
187 &mut writer,
188 400,
189 &json!({"errors":[{"message":"missing 'query'"}]}),
190 )
191 .await;
192 }
193
194 let response = engine.execute(GraphQLRequest::new(query)).await;
195 let errors = response.errors.map(|errs| {
196 errs.iter()
197 .map(|e| json!({ "message": e.to_string() }))
198 .collect::<Vec<_>>()
199 });
200 let body = json!({ "data": response.data, "errors": errors });
201 Self::respond(&mut writer, 200, &body).await
202 }
203
204 async fn respond<W: AsyncWriteExt + Unpin>(
205 writer: &mut W,
206 status: u16,
207 body: &Value,
208 ) -> Result<()> {
209 let payload = serde_json::to_vec(body).unwrap_or_else(|_| b"{}".to_vec());
210 let reason = match status {
211 200 => "OK",
212 400 => "Bad Request",
213 401 => "Unauthorized",
214 405 => "Method Not Allowed",
215 _ => "OK",
216 };
217 let head = format!(
218 "HTTP/1.1 {} {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
219 status,
220 reason,
221 payload.len()
222 );
223 writer
224 .write_all(head.as_bytes())
225 .await
226 .map_err(|e| ProxyError::Network(format!("GraphQL gw write: {}", e)))?;
227 writer
228 .write_all(&payload)
229 .await
230 .map_err(|e| ProxyError::Network(format!("GraphQL gw write: {}", e)))?;
231 let _ = writer.flush().await;
232 Ok(())
233 }
234}