grafbase_local_server/
servers.rs

1use crate::consts::{
2    ASSET_VERSION_FILE, GIT_IGNORE_CONTENTS, GIT_IGNORE_FILE, MIN_NODE_VERSION, SCHEMA_PARSER_DIR, SCHEMA_PARSER_INDEX,
3};
4use crate::custom_resolvers::build_resolvers;
5use crate::error_server;
6use crate::event::{wait_for_event, wait_for_event_and_match, Event};
7use crate::file_watcher::start_watcher;
8use crate::types::{Assets, ServerMessage};
9use crate::{bridge, errors::ServerError};
10use common::consts::{EPHEMERAL_PORT_RANGE, GRAFBASE_DIRECTORY_NAME, GRAFBASE_SCHEMA_FILE_NAME};
11use common::environment::Environment;
12use common::types::LocalAddressType;
13use common::utils::find_available_port_in_range;
14use futures_util::FutureExt;
15
16use std::env;
17use std::path::Path;
18use std::sync::mpsc::{self, Receiver, Sender};
19use std::{
20    fs,
21    process::Stdio,
22    thread::{self, JoinHandle},
23};
24use tokio::io::AsyncWriteExt;
25use tokio::process::Command;
26use tokio::runtime::Builder;
27use tokio::sync::broadcast::{self, channel};
28use version_compare::Version;
29use which::which;
30
/// Capacity of the broadcast channel that [`start`] creates as the internal [`Event`] bus.
const EVENT_BUS_BOUND: usize = 5;
32
33/// starts a development server by unpacking any files needed by the gateway worker
34/// and starting the miniflare cli in `user_grafbase_path` in [`Environment`]
35///
36/// # Errors
37///
38/// returns [`ServerError::ReadVersion`] if the version file for the extracted worker files cannot be read
39///
40/// returns [`ServerError::CreateDir`] if the `WORKER_DIR` cannot be created
41///
42/// returns [`ServerError::WriteFile`] if a file cannot be written into `WORKER_DIR`
43///
44/// # Panics
45///
46/// The spawned server and miniflare thread can panic if either of the two inner spawned threads panic
47#[must_use]
48pub fn start(port: u16, watch: bool, tracing: bool) -> (JoinHandle<Result<(), ServerError>>, Receiver<ServerMessage>) {
49    let (sender, receiver): (Sender<ServerMessage>, Receiver<ServerMessage>) = mpsc::channel();
50
51    let environment = Environment::get();
52
53    let handle = thread::spawn(move || {
54        export_embedded_files()?;
55
56        create_project_dot_grafbase_directory()?;
57
58        let bridge_port = get_bridge_port(port)?;
59
60        // manual implementation of #[tokio::main] due to a rust analyzer issue
61        Builder::new_current_thread()
62            .enable_all()
63            .build()
64            .unwrap()
65            .block_on(async {
66                let (event_bus, _receiver) = channel::<Event>(EVENT_BUS_BOUND);
67
68                if watch {
69                    let watch_event_bus = event_bus.clone();
70
71                    tokio::select! {
72                        result = start_watcher(environment.project_grafbase_path.clone(),  move |path| {
73                            let relative_path = path.strip_prefix(&environment.project_path).expect("must succeed by definition").to_owned();
74                            watch_event_bus.send(Event::Reload(relative_path)).expect("cannot fail");
75                        }) => { result }
76                        result = server_loop(port, bridge_port, watch, sender, event_bus.clone(), tracing) => { result }
77                    }
78                } else {
79                    Ok(spawn_servers(port, bridge_port, watch, sender, event_bus, None, tracing).await?)
80                }
81            })
82    });
83
84    (handle, receiver)
85}
86
87async fn server_loop(
88    worker_port: u16,
89    bridge_port: u16,
90    watch: bool,
91    sender: Sender<ServerMessage>,
92    event_bus: broadcast::Sender<Event>,
93    tracing: bool,
94) -> Result<(), ServerError> {
95    let mut path_changed = None;
96    loop {
97        let receiver = event_bus.subscribe();
98        tokio::select! {
99            result = spawn_servers(worker_port, bridge_port, watch, sender.clone(), event_bus.clone(), path_changed.as_deref(), tracing) => {
100                result?;
101            }
102            path = wait_for_event_and_match(receiver, |event| match event {
103                Event::Reload(path) => Some(path),
104                Event::BridgeReady => None,
105            }) => {
106                trace!("reload");
107                path_changed = Some(path.clone());
108                let _: Result<_, _> = sender.send(ServerMessage::Reload(path));
109            }
110        }
111    }
112}
113
114#[tracing::instrument(level = "trace")]
115async fn spawn_servers(
116    worker_port: u16,
117    bridge_port: u16,
118    watch: bool,
119    sender: Sender<ServerMessage>,
120    event_bus: broadcast::Sender<Event>,
121    path_changed: Option<&Path>,
122    tracing: bool,
123) -> Result<(), ServerError> {
124    let bridge_event_bus = event_bus.clone();
125
126    let receiver = event_bus.subscribe();
127
128    validate_dependencies().await?;
129
130    let environment_variables: std::collections::HashMap<_, _> = crate::environment::variables().collect();
131
132    let mut resolvers = match run_schema_parser(&environment_variables).await {
133        Ok(resolvers) => resolvers,
134        Err(error) => {
135            let _: Result<_, _> = sender.send(ServerMessage::CompilationError(error.to_string()));
136            tokio::spawn(async move { error_server::start(worker_port, error.to_string(), bridge_event_bus).await })
137                .await??;
138            return Ok(());
139        }
140    };
141
142    // If the rebuild has been triggered by a change in the schema file, we can honour the freshness of resolvers
143    // determined by inspecting the modified time of final artifacts of detected resolvers compared to the modified time
144    // of the generated schema registry file.
145    // Otherwise, we trigger a rebuild all resolvers. That, individually, will still more often than not be very quick
146    // because the build naturally reuses the intermediate artifacts from node_modules from previous builds.
147    // For this logic to become more fine-grained we would need to have an understanding of the module dependency graph
148    // in resolvers, and that's a non-trivial problem.
149    if !path_changed
150        .map(|path| path == Path::new(GRAFBASE_DIRECTORY_NAME).join(GRAFBASE_SCHEMA_FILE_NAME))
151        .unwrap_or_default()
152    {
153        for resolver in &mut resolvers {
154            resolver.fresh = false;
155        }
156    }
157
158    let environment = Environment::get();
159
160    let resolver_paths = match build_resolvers(&sender, environment, &environment_variables, resolvers, tracing).await {
161        Ok(resolver_paths) => resolver_paths,
162        Err(error) => {
163            let _: Result<_, _> = sender.send(ServerMessage::CompilationError(error.to_string()));
164            // TODO consider disabling colored output from wrangler
165            let error = strip_ansi_escapes::strip(error.to_string().as_bytes())
166                .ok()
167                .and_then(|stripped| String::from_utf8(stripped).ok())
168                .unwrap_or_else(|| error.to_string());
169            tokio::spawn(async move { error_server::start(worker_port, error, bridge_event_bus).await }).await??;
170            return Ok(());
171        }
172    };
173
174    let (bridge_sender, mut bridge_receiver) = tokio::sync::mpsc::channel(128);
175
176    let mut bridge_handle =
177        tokio::spawn(async move { bridge::start(bridge_port, worker_port, bridge_sender, bridge_event_bus).await })
178            .fuse();
179
180    let sender_cloned = sender.clone();
181    tokio::spawn(async move {
182        while let Some(message) = bridge_receiver.recv().await {
183            sender_cloned.send(message).unwrap();
184        }
185    });
186
187    trace!("waiting for bridge ready");
188    tokio::select! {
189        _ = wait_for_event(receiver, |event| *event == Event::BridgeReady) => (),
190        result = &mut bridge_handle => {result??; return Ok(());}
191    };
192    trace!("bridge ready");
193
194    let registry_path = environment
195        .project_grafbase_registry_path
196        .to_str()
197        .ok_or(ServerError::ProjectPath)?;
198
199    trace!("spawning miniflare for the main worker");
200
201    let worker_port_string = worker_port.to_string();
202    let bridge_port_binding_string = format!("BRIDGE_PORT={bridge_port}");
203    let registry_text_blob_string = format!("REGISTRY={registry_path}");
204
205    let mut miniflare_arguments: Vec<_> = [
206        // used by miniflare when running normally as well
207        "--experimental-vm-modules",
208        "./node_modules/miniflare/dist/src/cli.js",
209        "--modules",
210        "--host",
211        "127.0.0.1",
212        "--port",
213        &worker_port_string,
214        "--no-update-check",
215        "--no-cf-fetch",
216        "--do-persist",
217        "--wrangler-config",
218        "./wrangler.toml",
219        "--binding",
220        &bridge_port_binding_string,
221        "--text-blob",
222        &registry_text_blob_string,
223        "--mount",
224        "stream-router=./stream-router",
225    ]
226    .into_iter()
227    .map(std::borrow::Cow::Borrowed)
228    .collect();
229    miniflare_arguments.extend(resolver_paths.into_iter().flat_map(|(resolver_name, resolver_path)| {
230        [
231            "--mount".into(),
232            format!(
233                "{resolver_name}={resolver_path}",
234                resolver_name = slug::slugify(resolver_name),
235                resolver_path = resolver_path.display()
236            )
237            .into(),
238        ]
239    }));
240
241    #[cfg(feature = "dynamodb")]
242    {
243        #[allow(clippy::panic)]
244        fn get_env(key: &str) -> String {
245            let val = std::env::var(key).unwrap_or_else(|_| panic!("Environment variable not found:{key}"));
246            format!("{key}={val}")
247        }
248
249        miniflare_arguments.extend(
250            vec![
251                "AWS_ACCESS_KEY_ID",
252                "AWS_SECRET_ACCESS_KEY",
253                "DYNAMODB_REGION",
254                "DYNAMODB_TABLE_NAME",
255            ]
256            .iter()
257            .map(|key| get_env(key))
258            .flat_map(|env| {
259                std::iter::once(std::borrow::Cow::Borrowed("--binding")).chain(std::iter::once(env.into()))
260            }),
261        );
262    }
263
264    let mut miniflare = Command::new("node");
265    miniflare
266        // Unbounded worker limit
267        .env("MINIFLARE_SUBREQUEST_LIMIT", "1000")
268        .args(miniflare_arguments.iter().map(std::convert::AsRef::as_ref))
269        .stdout(if tracing { Stdio::inherit() } else { Stdio::piped() })
270        .stderr(if tracing { Stdio::inherit() } else { Stdio::piped() })
271        .current_dir(&environment.user_dot_grafbase_path)
272        .kill_on_drop(watch);
273    trace!("Spawning {miniflare:?}");
274    let miniflare = miniflare.spawn().map_err(ServerError::MiniflareCommandError)?;
275
276    let _: Result<_, _> = sender.send(ServerMessage::Ready(worker_port));
277
278    let miniflare_output_result = miniflare.wait_with_output();
279
280    tokio::select! {
281        result = miniflare_output_result => {
282            let output = result.map_err(ServerError::MiniflareCommandError)?;
283
284            output
285                .status
286                .success()
287                .then_some(())
288                .ok_or_else(|| ServerError::MiniflareError(String::from_utf8_lossy(&output.stderr).into_owned()))?;
289        }
290        bridge_handle_result = bridge_handle => { bridge_handle_result??; }
291    }
292
293    Ok(())
294}
295
296fn export_embedded_files() -> Result<(), ServerError> {
297    let environment = Environment::get();
298
299    let current_version = env!("CARGO_PKG_VERSION");
300
301    let version_path = environment.user_dot_grafbase_path.join(ASSET_VERSION_FILE);
302
303    let export_files = if env::var("GRAFBASE_SKIP_ASSET_VERSION_CHECK").is_ok() {
304        false
305    } else if env::var("GRAFBASE_FORCE_EXPORT_FILES").is_ok() {
306        true
307    } else if version_path.exists() {
308        let asset_version = fs::read_to_string(&version_path).map_err(|_| ServerError::ReadVersion)?;
309        current_version != asset_version
310    } else {
311        true
312    };
313
314    if export_files {
315        trace!("writing worker files");
316
317        fs::create_dir_all(&environment.user_dot_grafbase_path).map_err(|_| ServerError::CreateCacheDir)?;
318
319        let gitignore_path = &environment.user_dot_grafbase_path.join(GIT_IGNORE_FILE);
320
321        fs::write(gitignore_path, GIT_IGNORE_CONTENTS)
322            .map_err(|_| ServerError::WriteFile(gitignore_path.to_string_lossy().into_owned()))?;
323
324        let mut write_results = Assets::iter().map(|path| {
325            let file = Assets::get(path.as_ref());
326
327            let full_path = environment.user_dot_grafbase_path.join(path.as_ref());
328
329            let parent = full_path.parent().expect("must have a parent");
330            let create_dir_result = fs::create_dir_all(parent);
331
332            // must be Some(file) since we're iterating over existing paths
333            let write_result = create_dir_result.and_then(|_| fs::write(&full_path, file.unwrap().data));
334
335            (write_result, full_path)
336        });
337
338        if let Some((_, path)) = write_results.find(|(result, _)| result.is_err()) {
339            let error_path_string = path.to_string_lossy().into_owned();
340            return Err(ServerError::WriteFile(error_path_string));
341        }
342
343        if fs::write(&version_path, current_version).is_err() {
344            let version_path_string = version_path.to_string_lossy().into_owned();
345            return Err(ServerError::WriteFile(version_path_string));
346        };
347    }
348
349    Ok(())
350}
351
352fn create_project_dot_grafbase_directory() -> Result<(), ServerError> {
353    let environment = Environment::get();
354
355    let project_dot_grafbase_path = environment.project_dot_grafbase_path.clone();
356
357    if fs::metadata(&project_dot_grafbase_path).is_err() {
358        trace!("creating .grafbase directory");
359        fs::create_dir_all(&project_dot_grafbase_path).map_err(|_| ServerError::CreateCacheDir)?;
360        fs::write(project_dot_grafbase_path.join(GIT_IGNORE_FILE), "*\n").map_err(|_| ServerError::CreateCacheDir)?;
361    }
362
363    Ok(())
364}
365
366#[derive(serde::Deserialize)]
367struct SchemaParserResult {
368    #[allow(dead_code)]
369    required_resolvers: Vec<String>,
370    versioned_registry: serde_json::Value,
371}
372
/// A resolver required by the schema, as detected by the schema parser.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DetectedResolver {
    /// Name of the resolver as reported by the schema parser.
    pub resolver_name: String,
    /// Whether the existing build artifact of the resolver can be reused instead of rebuilding it.
    pub fresh: bool,
}
377
378// schema-parser is run via NodeJS due to it being built to run in a Wasm (via wasm-bindgen) environement
379// and due to schema-parser not being open source
380async fn run_schema_parser(
381    environment_variables: &std::collections::HashMap<String, String>,
382) -> Result<Vec<DetectedResolver>, ServerError> {
383    trace!("parsing schema");
384
385    let environment = Environment::get();
386
387    let parser_path = environment
388        .user_dot_grafbase_path
389        .join(SCHEMA_PARSER_DIR)
390        .join(SCHEMA_PARSER_INDEX);
391
392    let parser_result_path = tokio::task::spawn_blocking(tempfile::NamedTempFile::new)
393        .await?
394        .map_err(ServerError::CreateTemporaryFile)?
395        .into_temp_path();
396
397    trace!(
398        "using a temporary file for the parser output: {parser_result_path}",
399        parser_result_path = parser_result_path.display()
400    );
401
402    let output = {
403        let mut node_command = Command::new("node")
404            .args([
405                parser_path.to_str().ok_or(ServerError::CachePath)?,
406                environment
407                    .project_grafbase_schema_path
408                    .to_str()
409                    .ok_or(ServerError::ProjectPath)?,
410                parser_result_path.to_str().expect("must be a valid path"),
411            ])
412            .current_dir(&environment.project_dot_grafbase_path)
413            .stdin(Stdio::piped())
414            .stderr(Stdio::piped())
415            .stdout(Stdio::piped())
416            .spawn()
417            .map_err(ServerError::SchemaParserError)?;
418
419        let node_command_stdin = node_command.stdin.as_mut().expect("stdin must be available");
420        node_command_stdin
421            .write_all(&serde_json::to_vec(environment_variables).expect("must serialise to JSON just fine"))
422            .await
423            .map_err(ServerError::SchemaParserError)?;
424
425        node_command
426            .wait_with_output()
427            .await
428            .map_err(ServerError::SchemaParserError)?
429    };
430
431    if !output.status.success() {
432        return Err(ServerError::ParseSchema(
433            String::from_utf8_lossy(&output.stderr).into_owned(),
434        ));
435    }
436
437    let parser_result_string = tokio::fs::read_to_string(&parser_result_path)
438        .await
439        .map_err(ServerError::SchemaParserResultRead)?;
440    let SchemaParserResult {
441        versioned_registry,
442        required_resolvers,
443    } = serde_json::from_str(&parser_result_string).map_err(ServerError::SchemaParserResultJson)?;
444
445    let registry_mtime = tokio::fs::metadata(&environment.project_grafbase_registry_path)
446        .await
447        .ok()
448        .map(|metadata| metadata.modified().expect("must be supported"));
449
450    let detected_resolvers = futures_util::future::join_all(required_resolvers.into_iter().map(|resolver_name| {
451        // Last file to be written to in the build process.
452        let wrangler_toml_path = environment
453            .resolvers_build_artifact_path
454            .join(&resolver_name)
455            .join("wrangler.toml");
456        async move {
457            let wrangler_toml_mtime = tokio::fs::metadata(&wrangler_toml_path)
458                .await
459                .ok()
460                .map(|metadata| metadata.modified().expect("must be supported"));
461            let fresh = registry_mtime
462                .zip(wrangler_toml_mtime)
463                .map(|(registry_mtime, wrangler_toml_mtime)| wrangler_toml_mtime > registry_mtime)
464                .unwrap_or_default();
465            DetectedResolver { resolver_name, fresh }
466        }
467    }))
468    .await;
469
470    tokio::fs::write(
471        &environment.project_grafbase_registry_path,
472        serde_json::to_string(&versioned_registry).expect("serde_json::Value serialises just fine for sure"),
473    )
474    .await
475    .map_err(ServerError::SchemaRegistryWrite)?;
476
477    Ok(detected_resolvers)
478}
479
480async fn get_node_version_string() -> Result<String, ServerError> {
481    let output = Command::new("node")
482        .arg("--version")
483        .stdout(Stdio::piped())
484        .stderr(Stdio::piped())
485        .spawn()
486        .map_err(|_| ServerError::CheckNodeVersion)?
487        .wait_with_output()
488        .await
489        .map_err(|_| ServerError::CheckNodeVersion)?;
490
491    let node_version_string = String::from_utf8_lossy(&output.stdout).trim().to_owned();
492
493    Ok(node_version_string)
494}
495
496async fn validate_node_version() -> Result<(), ServerError> {
497    trace!("validating Node.js version");
498    trace!("minimal supported Node.js version: {}", MIN_NODE_VERSION);
499
500    let node_version_string = get_node_version_string().await?;
501
502    trace!("installed node version: {}", node_version_string);
503
504    let node_version = Version::from(&node_version_string).ok_or(ServerError::CheckNodeVersion)?;
505    let min_version = Version::from(MIN_NODE_VERSION).expect("must be valid");
506
507    if node_version >= min_version {
508        Ok(())
509    } else {
510        Err(ServerError::OutdatedNode(
511            node_version_string,
512            MIN_NODE_VERSION.to_owned(),
513        ))
514    }
515}
516
517async fn validate_dependencies() -> Result<(), ServerError> {
518    trace!("validating dependencies");
519
520    which("node").map_err(|_| ServerError::NodeInPath)?;
521
522    validate_node_version().await?;
523
524    Ok(())
525}
526
527// the bridge runs on an available port within the ephemeral port range which is also supplied to the worker,
528// making the port choice and availability transprent to the user.
529// to avoid issues when starting multiple CLIs simultainiously,
530// we segment the ephemeral port range into 100 segments and select a segment based on the last two digits of the process ID.
531// this allows for simultainious start of up to 100 CLIs
532fn get_bridge_port(http_port: u16) -> Result<u16, ServerError> {
533    // must be 0-99, will fit in u16
534    #[allow(clippy::cast_possible_truncation)]
535    let segment = http_port % 100;
536    // since the size is `max - min` in a u16 range, will fit in u16
537    #[allow(clippy::cast_possible_truncation)]
538    let size = EPHEMERAL_PORT_RANGE.len() as u16;
539    let offset = size / 100 * segment;
540    let start = EPHEMERAL_PORT_RANGE.min().expect("must exist");
541    // allows us to loop back to the start of the range, giving any offset the same amount of potential ports
542    let range = EPHEMERAL_PORT_RANGE.map(|port| (port + offset) % size + start);
543
544    // TODO: loop back and limit iteration to get an even range for each
545    find_available_port_in_range(range, LocalAddressType::Localhost).ok_or(ServerError::AvailablePort)
546}