use crate::consts::{
    ASSET_VERSION_FILE, GIT_IGNORE_CONTENTS, GIT_IGNORE_FILE, MIN_NODE_VERSION, SCHEMA_PARSER_DIR, SCHEMA_PARSER_INDEX,
};
use crate::custom_resolvers::build_resolvers;
use crate::error_server;
use crate::event::{wait_for_event, wait_for_event_and_match, Event};
use crate::file_watcher::start_watcher;
use crate::types::{Assets, ServerMessage};
use crate::{bridge, errors::ServerError};
use common::consts::{EPHEMERAL_PORT_RANGE, GRAFBASE_DIRECTORY_NAME, GRAFBASE_SCHEMA_FILE_NAME};
use common::environment::Environment;
use common::types::LocalAddressType;
use common::utils::find_available_port_in_range;
use futures_util::FutureExt;

use std::env;
use std::path::Path;
use std::sync::mpsc::{self, Receiver, Sender};
use std::{
    fs,
    process::Stdio,
    thread::{self, JoinHandle},
};
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio::runtime::Builder;
use tokio::sync::broadcast::{self, channel};
use version_compare::Version;
use which::which;

const EVENT_BUS_BOUND: usize = 5;

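/// Starts the development server in a new thread: exports the embedded worker assets,
/// prepares the project's `.grafbase` directory, picks a bridge port, and then either runs
/// a single server pass or, when `watch` is enabled, a reload loop driven by the file watcher.
/// Returns the thread handle and a channel receiving [`ServerMessage`] updates.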
#[must_use]
pub fn start(port: u16, watch: bool, tracing: bool) -> (JoinHandle<Result<(), ServerError>>, Receiver<ServerMessage>) {
    let (sender, receiver): (Sender<ServerMessage>, Receiver<ServerMessage>) = mpsc::channel();

    let environment = Environment::get();

    let handle = thread::spawn(move || {
        export_embedded_files()?;

        create_project_dot_grafbase_directory()?;

        let bridge_port = get_bridge_port(port)?;

        Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(async {
                let (event_bus, _receiver) = channel::<Event>(EVENT_BUS_BOUND);

                if watch {
                    let watch_event_bus = event_bus.clone();

                    tokio::select! {
                        result = start_watcher(environment.project_grafbase_path.clone(), move |path| {
                            let relative_path = path.strip_prefix(&environment.project_path).expect("must succeed by definition").to_owned();
                            watch_event_bus.send(Event::Reload(relative_path)).expect("cannot fail");
                        }) => { result }
                        result = server_loop(port, bridge_port, watch, sender, event_bus.clone(), tracing) => { result }
                    }
                } else {
                    Ok(spawn_servers(port, bridge_port, watch, sender, event_bus, None, tracing).await?)
                }
            })
    });

    (handle, receiver)
}

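/// Runs `spawn_servers` in a loop, restarting it whenever the file watcher publishes an
/// `Event::Reload` on the event bus; the changed path is carried into the next iteration so
/// resolver freshness can be decided based on what changed.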
async fn server_loop(
    worker_port: u16,
    bridge_port: u16,
    watch: bool,
    sender: Sender<ServerMessage>,
    event_bus: broadcast::Sender<Event>,
    tracing: bool,
) -> Result<(), ServerError> {
    let mut path_changed = None;
    loop {
        let receiver = event_bus.subscribe();
        tokio::select! {
            result = spawn_servers(worker_port, bridge_port, watch, sender.clone(), event_bus.clone(), path_changed.as_deref(), tracing) => {
                result?;
            }
            path = wait_for_event_and_match(receiver, |event| match event {
                Event::Reload(path) => Some(path),
                Event::BridgeReady => None,
            }) => {
                trace!("reload");
                path_changed = Some(path.clone());
                let _: Result<_, _> = sender.send(ServerMessage::Reload(path));
            }
        }
    }
}

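/// Spawns one development pass: validates the Node.js installation, runs the schema parser,
/// builds the custom resolvers, starts the bridge server, and finally launches a miniflare
/// worker. Compilation errors are reported over `sender` and served by the error server
/// rather than aborting the loop.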
#[tracing::instrument(level = "trace")]
async fn spawn_servers(
    worker_port: u16,
    bridge_port: u16,
    watch: bool,
    sender: Sender<ServerMessage>,
    event_bus: broadcast::Sender<Event>,
    path_changed: Option<&Path>,
    tracing: bool,
) -> Result<(), ServerError> {
    let bridge_event_bus = event_bus.clone();

    let receiver = event_bus.subscribe();

    validate_dependencies().await?;

    let environment_variables: std::collections::HashMap<_, _> = crate::environment::variables().collect();

    let mut resolvers = match run_schema_parser(&environment_variables).await {
        Ok(resolvers) => resolvers,
        Err(error) => {
            let _: Result<_, _> = sender.send(ServerMessage::CompilationError(error.to_string()));
            tokio::spawn(async move { error_server::start(worker_port, error.to_string(), bridge_event_bus).await })
                .await??;
            return Ok(());
        }
    };

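    // Unless the change that triggered this pass was the schema file itself, discard the
    // freshness detected by the schema parser and force every resolver to be rebuilt.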
    if !path_changed
        .map(|path| path == Path::new(GRAFBASE_DIRECTORY_NAME).join(GRAFBASE_SCHEMA_FILE_NAME))
        .unwrap_or_default()
    {
        for resolver in &mut resolvers {
            resolver.fresh = false;
        }
    }

    let environment = Environment::get();

    let resolver_paths = match build_resolvers(&sender, environment, &environment_variables, resolvers, tracing).await {
        Ok(resolver_paths) => resolver_paths,
        Err(error) => {
            let _: Result<_, _> = sender.send(ServerMessage::CompilationError(error.to_string()));
            let error = strip_ansi_escapes::strip(error.to_string().as_bytes())
                .ok()
                .and_then(|stripped| String::from_utf8(stripped).ok())
                .unwrap_or_else(|| error.to_string());
            tokio::spawn(async move { error_server::start(worker_port, error, bridge_event_bus).await }).await??;
            return Ok(());
        }
    };

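    // Start the bridge server and forward any messages it produces to the CLI channel.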
    let (bridge_sender, mut bridge_receiver) = tokio::sync::mpsc::channel(128);

    let mut bridge_handle =
        tokio::spawn(async move { bridge::start(bridge_port, worker_port, bridge_sender, bridge_event_bus).await })
            .fuse();

    let sender_cloned = sender.clone();
    tokio::spawn(async move {
        while let Some(message) = bridge_receiver.recv().await {
            sender_cloned.send(message).unwrap();
        }
    });

    trace!("waiting for bridge ready");
    tokio::select! {
        _ = wait_for_event(receiver, |event| *event == Event::BridgeReady) => (),
        result = &mut bridge_handle => {result??; return Ok(());}
    };
    trace!("bridge ready");

    let registry_path = environment
        .project_grafbase_registry_path
        .to_str()
        .ok_or(ServerError::ProjectPath)?;

    trace!("spawning miniflare for the main worker");

    let worker_port_string = worker_port.to_string();
    let bridge_port_binding_string = format!("BRIDGE_PORT={bridge_port}");
    let registry_text_blob_string = format!("REGISTRY={registry_path}");

    let mut miniflare_arguments: Vec<_> = [
        "--experimental-vm-modules",
        "./node_modules/miniflare/dist/src/cli.js",
        "--modules",
        "--host",
        "127.0.0.1",
        "--port",
        &worker_port_string,
        "--no-update-check",
        "--no-cf-fetch",
        "--do-persist",
        "--wrangler-config",
        "./wrangler.toml",
        "--binding",
        &bridge_port_binding_string,
        "--text-blob",
        &registry_text_blob_string,
        "--mount",
        "stream-router=./stream-router",
    ]
    .into_iter()
    .map(std::borrow::Cow::Borrowed)
    .collect();
    miniflare_arguments.extend(resolver_paths.into_iter().flat_map(|(resolver_name, resolver_path)| {
        [
            "--mount".into(),
            format!(
                "{resolver_name}={resolver_path}",
                resolver_name = slug::slugify(resolver_name),
                resolver_path = resolver_path.display()
            )
            .into(),
        ]
    }));

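    // When built with the `dynamodb` feature, pass the AWS credentials and table
    // configuration from the environment to miniflare as additional worker bindings.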
    #[cfg(feature = "dynamodb")]
    {
        #[allow(clippy::panic)]
        fn get_env(key: &str) -> String {
            let val = std::env::var(key).unwrap_or_else(|_| panic!("Environment variable not found:{key}"));
            format!("{key}={val}")
        }

        miniflare_arguments.extend(
            vec![
                "AWS_ACCESS_KEY_ID",
                "AWS_SECRET_ACCESS_KEY",
                "DYNAMODB_REGION",
                "DYNAMODB_TABLE_NAME",
            ]
            .iter()
            .map(|key| get_env(key))
            .flat_map(|env| {
                std::iter::once(std::borrow::Cow::Borrowed("--binding")).chain(std::iter::once(env.into()))
            }),
        );
    }

    let mut miniflare = Command::new("node");
    miniflare
        .env("MINIFLARE_SUBREQUEST_LIMIT", "1000")
        .args(miniflare_arguments.iter().map(std::convert::AsRef::as_ref))
        .stdout(if tracing { Stdio::inherit() } else { Stdio::piped() })
        .stderr(if tracing { Stdio::inherit() } else { Stdio::piped() })
        .current_dir(&environment.user_dot_grafbase_path)
        .kill_on_drop(watch);
    trace!("Spawning {miniflare:?}");
    let miniflare = miniflare.spawn().map_err(ServerError::MiniflareCommandError)?;

    let _: Result<_, _> = sender.send(ServerMessage::Ready(worker_port));

    let miniflare_output_result = miniflare.wait_with_output();

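    // Run until miniflare exits; if the bridge task finishes first, surface its result instead.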
    tokio::select! {
        result = miniflare_output_result => {
            let output = result.map_err(ServerError::MiniflareCommandError)?;

            output
                .status
                .success()
                .then_some(())
                .ok_or_else(|| ServerError::MiniflareError(String::from_utf8_lossy(&output.stderr).into_owned()))?;
        }
        bridge_handle_result = bridge_handle => { bridge_handle_result??; }
    }

    Ok(())
}

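/// Writes the embedded worker assets into the user-level `.grafbase` directory, skipping the
/// export when the recorded asset version matches the current crate version. The
/// `GRAFBASE_SKIP_ASSET_VERSION_CHECK` and `GRAFBASE_FORCE_EXPORT_FILES` environment
/// variables override the version check.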
fn export_embedded_files() -> Result<(), ServerError> {
    let environment = Environment::get();

    let current_version = env!("CARGO_PKG_VERSION");

    let version_path = environment.user_dot_grafbase_path.join(ASSET_VERSION_FILE);

    let export_files = if env::var("GRAFBASE_SKIP_ASSET_VERSION_CHECK").is_ok() {
        false
    } else if env::var("GRAFBASE_FORCE_EXPORT_FILES").is_ok() {
        true
    } else if version_path.exists() {
        let asset_version = fs::read_to_string(&version_path).map_err(|_| ServerError::ReadVersion)?;
        current_version != asset_version
    } else {
        true
    };

    if export_files {
        trace!("writing worker files");

        fs::create_dir_all(&environment.user_dot_grafbase_path).map_err(|_| ServerError::CreateCacheDir)?;

        let gitignore_path = &environment.user_dot_grafbase_path.join(GIT_IGNORE_FILE);

        fs::write(gitignore_path, GIT_IGNORE_CONTENTS)
            .map_err(|_| ServerError::WriteFile(gitignore_path.to_string_lossy().into_owned()))?;

        let mut write_results = Assets::iter().map(|path| {
            let file = Assets::get(path.as_ref());

            let full_path = environment.user_dot_grafbase_path.join(path.as_ref());

            let parent = full_path.parent().expect("must have a parent");
            let create_dir_result = fs::create_dir_all(parent);

            let write_result = create_dir_result.and_then(|_| fs::write(&full_path, file.unwrap().data));

            (write_result, full_path)
        });

        if let Some((_, path)) = write_results.find(|(result, _)| result.is_err()) {
            let error_path_string = path.to_string_lossy().into_owned();
            return Err(ServerError::WriteFile(error_path_string));
        }

        if fs::write(&version_path, current_version).is_err() {
            let version_path_string = version_path.to_string_lossy().into_owned();
            return Err(ServerError::WriteFile(version_path_string));
        };
    }

    Ok(())
}

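/// Creates the project-level `.grafbase` directory (with a `*` gitignore) if it does not exist yet.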
fn create_project_dot_grafbase_directory() -> Result<(), ServerError> {
    let environment = Environment::get();

    let project_dot_grafbase_path = environment.project_dot_grafbase_path.clone();

    if fs::metadata(&project_dot_grafbase_path).is_err() {
        trace!("creating .grafbase directory");
        fs::create_dir_all(&project_dot_grafbase_path).map_err(|_| ServerError::CreateCacheDir)?;
        fs::write(project_dot_grafbase_path.join(GIT_IGNORE_FILE), "*\n").map_err(|_| ServerError::CreateCacheDir)?;
    }

    Ok(())
}

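/// Output of the Node.js schema parser, read back from the temporary result file.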
#[derive(serde::Deserialize)]
struct SchemaParserResult {
    #[allow(dead_code)]
    required_resolvers: Vec<String>,
    versioned_registry: serde_json::Value,
}

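/// A resolver required by the schema. `fresh` records whether its build artifact is newer
/// than the generated registry (see `run_schema_parser`).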
pub struct DetectedResolver {
    pub resolver_name: String,
    pub fresh: bool,
}

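/// Runs the Node.js schema parser against the project schema, passing the environment
/// variables over stdin as JSON and collecting its output from a temporary file. Writes the
/// versioned registry to disk and returns the resolvers the schema requires.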
async fn run_schema_parser(
    environment_variables: &std::collections::HashMap<String, String>,
) -> Result<Vec<DetectedResolver>, ServerError> {
    trace!("parsing schema");

    let environment = Environment::get();

    let parser_path = environment
        .user_dot_grafbase_path
        .join(SCHEMA_PARSER_DIR)
        .join(SCHEMA_PARSER_INDEX);

    let parser_result_path = tokio::task::spawn_blocking(tempfile::NamedTempFile::new)
        .await?
        .map_err(ServerError::CreateTemporaryFile)?
        .into_temp_path();

    trace!(
        "using a temporary file for the parser output: {parser_result_path}",
        parser_result_path = parser_result_path.display()
    );

    let output = {
        let mut node_command = Command::new("node")
            .args([
                parser_path.to_str().ok_or(ServerError::CachePath)?,
                environment
                    .project_grafbase_schema_path
                    .to_str()
                    .ok_or(ServerError::ProjectPath)?,
                parser_result_path.to_str().expect("must be a valid path"),
            ])
            .current_dir(&environment.project_dot_grafbase_path)
            .stdin(Stdio::piped())
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .spawn()
            .map_err(ServerError::SchemaParserError)?;

        let node_command_stdin = node_command.stdin.as_mut().expect("stdin must be available");
        node_command_stdin
            .write_all(&serde_json::to_vec(environment_variables).expect("must serialise to JSON just fine"))
            .await
            .map_err(ServerError::SchemaParserError)?;

        node_command
            .wait_with_output()
            .await
            .map_err(ServerError::SchemaParserError)?
    };

    if !output.status.success() {
        return Err(ServerError::ParseSchema(
            String::from_utf8_lossy(&output.stderr).into_owned(),
        ));
    }

    let parser_result_string = tokio::fs::read_to_string(&parser_result_path)
        .await
        .map_err(ServerError::SchemaParserResultRead)?;
    let SchemaParserResult {
        versioned_registry,
        required_resolvers,
    } = serde_json::from_str(&parser_result_string).map_err(ServerError::SchemaParserResultJson)?;

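    // A resolver counts as fresh when its wrangler.toml build artifact is newer than the
    // previously written registry; missing metadata on either side means not fresh.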
    let registry_mtime = tokio::fs::metadata(&environment.project_grafbase_registry_path)
        .await
        .ok()
        .map(|metadata| metadata.modified().expect("must be supported"));

    let detected_resolvers = futures_util::future::join_all(required_resolvers.into_iter().map(|resolver_name| {
        let wrangler_toml_path = environment
            .resolvers_build_artifact_path
            .join(&resolver_name)
            .join("wrangler.toml");
        async move {
            let wrangler_toml_mtime = tokio::fs::metadata(&wrangler_toml_path)
                .await
                .ok()
                .map(|metadata| metadata.modified().expect("must be supported"));
            let fresh = registry_mtime
                .zip(wrangler_toml_mtime)
                .map(|(registry_mtime, wrangler_toml_mtime)| wrangler_toml_mtime > registry_mtime)
                .unwrap_or_default();
            DetectedResolver { resolver_name, fresh }
        }
    }))
    .await;

    tokio::fs::write(
        &environment.project_grafbase_registry_path,
        serde_json::to_string(&versioned_registry).expect("serde_json::Value serialises just fine for sure"),
    )
    .await
    .map_err(ServerError::SchemaRegistryWrite)?;

    Ok(detected_resolvers)
}

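/// Runs `node --version` and returns its trimmed stdout.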
async fn get_node_version_string() -> Result<String, ServerError> {
    let output = Command::new("node")
        .arg("--version")
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .map_err(|_| ServerError::CheckNodeVersion)?
        .wait_with_output()
        .await
        .map_err(|_| ServerError::CheckNodeVersion)?;

    let node_version_string = String::from_utf8_lossy(&output.stdout).trim().to_owned();

    Ok(node_version_string)
}

async fn validate_node_version() -> Result<(), ServerError> {
    trace!("validating Node.js version");
    trace!("minimal supported Node.js version: {}", MIN_NODE_VERSION);

    let node_version_string = get_node_version_string().await?;

    trace!("installed node version: {}", node_version_string);

    let node_version = Version::from(&node_version_string).ok_or(ServerError::CheckNodeVersion)?;
    let min_version = Version::from(MIN_NODE_VERSION).expect("must be valid");

    if node_version >= min_version {
        Ok(())
    } else {
        Err(ServerError::OutdatedNode(
            node_version_string,
            MIN_NODE_VERSION.to_owned(),
        ))
    }
}

async fn validate_dependencies() -> Result<(), ServerError> {
    trace!("validating dependencies");

    which("node").map_err(|_| ServerError::NodeInPath)?;

    validate_node_version().await?;

    Ok(())
}

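/// Derives a search range for the bridge port from the HTTP port (offsetting the ephemeral
/// port range by a segment based on the port's last two digits) and returns the first
/// available local port in that range.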
fn get_bridge_port(http_port: u16) -> Result<u16, ServerError> {
    #[allow(clippy::cast_possible_truncation)]
    let segment = http_port % 100;
    #[allow(clippy::cast_possible_truncation)]
    let size = EPHEMERAL_PORT_RANGE.len() as u16;
    let offset = size / 100 * segment;
    let start = EPHEMERAL_PORT_RANGE.min().expect("must exist");
    let range = EPHEMERAL_PORT_RANGE.map(|port| (port + offset) % size + start);

    find_available_port_in_range(range, LocalAddressType::Localhost).ok_or(ServerError::AvailablePort)
}