apollo_language_server/
server.rs

1use crate::{
2    custom_requests::{
3        ApolloCanComposeNotification, ApolloCanComposeNotificationParams,
4        ApolloComposeServicesParams, ApolloComposeServicesRequest, ApolloComposeServicesResponse,
5        ApolloConfigureAutoCompositionNotification, ApolloConfigureAutoCompositionParams,
6        APOLLO_COMPOSITION_PROGRESS_TOKEN,
7    },
8    graph::{
9        supergraph::{KnownSubgraphs, Supergraph},
10        Graph, GraphConfig,
11    },
12    semantic_tokens::{incomplete_tokens_to_deltas, LEGEND_TYPE},
13    telemetry::{AnalyticsEvent, CompositionTiming, TelemetryEvent},
14};
15use apollo_federation_types::{
16    composition::Issue, config::SchemaSource, javascript::SubgraphDefinition,
17};
18use debounced::debounced;
19use futures::{
20    channel::mpsc::{channel, Receiver, Sender},
21    future::join_all,
22    lock::Mutex,
23    SinkExt, StreamExt,
24};
25use serde::{Deserialize, Serialize};
26use std::{borrow::Cow, collections::HashMap};
27use std::{sync::Arc, time::Duration};
28use tower_lsp::{
29    async_trait, jsonrpc,
30    lsp_types::{self as lsp, notification::Notification, request::Request},
31    Client, ClientSocket, LanguageServer, LspService,
32};
33
34#[cfg(all(feature = "wasm", not(test)))]
35use wasm_bindgen_futures::spawn_local as spawn;
36
37#[cfg(any(feature = "tokio", test))]
38use tokio::spawn;
39
40pub const LANGUAGE_IDS: [&str; 2] = ["apollo-graphql", "graphql"];
41
42#[derive(Debug, Clone, Deserialize, Serialize, Default)]
43pub struct MaxSpecVersions {
44    pub connect: Option<semver::Version>,
45    pub federation: Option<semver::Version>,
46}
47
48/// Configuration options for the Apollo Language Server.
49#[derive(Debug, Clone, Deserialize, Serialize)]
50#[serde(rename_all = "camelCase", default)]
51pub struct Config {
52    /// The root URI for the workspace.
53    pub root_uri: String,
54    /// Whether to enable auto composition. This can also be configured at runtime by the client via the `apollo/configureAutoComposition` notification (with params `{ "enabled": boolean }`).
55    pub enable_auto_composition: bool,
56    /// Forces the server to treat all graphs as subgraphs
57    pub force_federation: bool,
58    /// Whether to disable telemetry. The language server does not submit telemetry itself, it only sends telemetry events to the client. The client is responsible for submitting telemetry to the telemetry service.
59    pub disable_telemetry: bool,
60    /// The maximum supported versions for the `federation` and `connect` specs.
61    /// This is used to determine which `@link` completions to show.
62    pub max_spec_versions: MaxSpecVersions,
63}
64
65impl Default for Config {
66    fn default() -> Self {
67        Self {
68            root_uri: "/".to_string(),
69            enable_auto_composition: false,
70            force_federation: false,
71            disable_telemetry: false,
72            max_spec_versions: MaxSpecVersions {
73                connect: None,
74                federation: None,
75            },
76        }
77    }
78}
79
80/// The Apollo Language Server.
81#[derive(Clone)]
82pub struct ApolloLanguageServer {
83    state: Arc<State>,
84}
85
86#[derive(Debug)]
87struct State {
88    graph: Mutex<Option<Graph>>,
89    client: Mutex<Client>,
90    request_composition: Mutex<Sender<Vec<SubgraphDefinition>>>,
91    config: Mutex<Config>,
92    document_change_queue_sender: Mutex<Sender<lsp::DidChangeTextDocumentParams>>,
93}
94
95const DOCUMENT_SIZE_DEBOUNCE_THRESHOLD: usize = 50_000;
96const DOCUMENT_UPDATE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(150);
97
98impl ApolloLanguageServer {
99    fn new(
100        client: Client,
101        request_composition: Sender<Vec<SubgraphDefinition>>,
102        config: Config,
103        known_subgraphs: KnownSubgraphs,
104    ) -> Self {
105        let (document_change_queue_sender, document_change_queue_receiver) =
106            channel::<lsp::DidChangeTextDocumentParams>(1);
107
108        // If we have known subgraphs, we should initialize a supergraph
109        let graph = (!known_subgraphs.by_name.is_empty() || config.force_federation).then_some(
110            Graph::Supergraph(Box::new(Supergraph::new(known_subgraphs))),
111        );
112
113        let state = State {
114            graph: Mutex::new(graph),
115            client: Mutex::new(client),
116            request_composition: Mutex::new(request_composition),
117            config: Mutex::new(config),
118            document_change_queue_sender: Mutex::new(document_change_queue_sender),
119        };
120        let state = Arc::new(state);
121        State::initialize_document_update_debouncer(state.clone(), document_change_queue_receiver);
122
123        Self { state }
124    }
125
126    /// Construct a new `LspService<ApolloLanguageServer>` with related channels
127    /// given the provided `Config`. This returns a tuple of:
128    /// - The built tower-lsp `LspService` instance.
129    /// - The `ClientSocket` instance.
130    /// - The `Receiver` for the composition request channel.
131    ///
132    /// Example:
133    /// ```no_run
134    /// use std::collections::HashMap;
135    /// use apollo_language_server::{ApolloLanguageServer, Config};
136    /// use tower_lsp::Server;
137    ///
138    /// let (service, client_socket, composition_request_receiver) = ApolloLanguageServer::build_service(Config::default(), HashMap::new());
139    /// let input = tokio::io::stdin();
140    /// let output = tokio::io::stdout();
141    /// Server::new(input, output, client_socket)
142    ///     .serve(service);
143    /// ```
144    pub fn build_service(
145        config: Config,
146        known_subgraphs: HashMap<String, SchemaSource>,
147    ) -> (
148        LspService<Self>,
149        ClientSocket,
150        Receiver<Vec<SubgraphDefinition>>,
151    ) {
152        let (composition_request_sender, composition_request_receiver) =
153            channel::<Vec<SubgraphDefinition>>(1);
154
155        #[cfg(any(not(feature = "wasm"), test))]
156        let known_subgraphs = KnownSubgraphs {
157            root_uri: config.root_uri.clone(),
158            by_name: known_subgraphs
159                .iter()
160                .map(|(name, source)| (name.clone(), source.clone()))
161                .collect(),
162            by_uri: known_subgraphs
163                .iter()
164                .filter_map(|(name, source)| match source {
165                    SchemaSource::File { file } => {
166                        let uri = if file.is_relative() {
167                            // build a uri from config.root_uri and file path
168                            let url = lsp::Url::from_directory_path(&config.root_uri)
169                                .expect("Failed to parse URL");
170                            url.join(file.to_str().expect("Failed to convert path to string"))
171                                .expect("Failed to join URL")
172                        } else {
173                            lsp::Url::from_file_path(file).expect("Failed to convert path to URL")
174                        };
175
176                        Some((uri, name.clone()))
177                    }
178                    _ => None,
179                })
180                .collect(),
181        };
182
183        #[cfg(all(feature = "wasm", not(test)))]
184        let known_subgraphs = KnownSubgraphs::default();
185
186        let (service, client_socket) = LspService::build(|client| {
187            ApolloLanguageServer::new(client, composition_request_sender, config, known_subgraphs)
188        })
189        .custom_method(
190            ApolloConfigureAutoCompositionNotification::METHOD,
191            ApolloLanguageServer::configure_auto_composition,
192        )
193        .custom_method(
194            ApolloComposeServicesRequest::METHOD,
195            ApolloLanguageServer::request_recompose,
196        )
197        .finish();
198        (service, client_socket, composition_request_receiver)
199    }
200
201    fn capabilities() -> lsp::InitializeResult {
202        lsp::InitializeResult {
203            capabilities: lsp::ServerCapabilities {
204                text_document_sync: Some(lsp::TextDocumentSyncCapability::Options(
205                    lsp::TextDocumentSyncOptions {
206                        change: Some(lsp::TextDocumentSyncKind::FULL),
207                        open_close: Some(true),
208                        ..Default::default()
209                    },
210                )),
211                completion_provider: Some(lsp::CompletionOptions {
212                    // Trigger characters force the completions prompt to show as soon as that character is typed in the case
213                    // that we need to show items when the user types a special character.
214                    trigger_characters: Some(vec![
215                        // `@` in order to properly show completions for directives.
216                        "@".to_string(),
217                        // `|` in order to properly show completions for directive locations.
218                        "|".to_string(),
219                        // `&` in order to properly show completions for implementing interfaces.
220                        "&".to_string(),
221                        // `"`  in order to properly show completions for particular string values (i.e. spec import directives).
222                        "\"".to_string(),
223                    ]),
224                    ..Default::default()
225                }),
226                semantic_tokens_provider: Some(
227                    lsp::SemanticTokensServerCapabilities::SemanticTokensOptions(
228                        lsp::SemanticTokensOptions {
229                            work_done_progress_options: lsp::WorkDoneProgressOptions {
230                                work_done_progress: None,
231                            },
232                            legend: lsp::SemanticTokensLegend {
233                                token_types: LEGEND_TYPE.into(),
234                                token_modifiers: vec![],
235                            },
236                            range: Some(false),
237                            full: Some(lsp::SemanticTokensFullOptions::Delta { delta: Some(true) }),
238                        },
239                    ),
240                ),
241                hover_provider: Some(lsp::HoverProviderCapability::Simple(true)),
242                definition_provider: Some(lsp::OneOf::Left(true)),
243                ..Default::default()
244            },
245            ..Default::default()
246        }
247    }
248
249    async fn request_recompose(
250        &self,
251        _params: ApolloComposeServicesParams,
252    ) -> jsonrpc::Result<ApolloComposeServicesResponse> {
253        let did_compose = self.state.maybe_recompose().await;
254        Ok(ApolloComposeServicesResponse { did_compose })
255    }
256
257    async fn configure_auto_composition(&self, params: ApolloConfigureAutoCompositionParams) {
258        {
259            let mut config = self.state.config.lock().await;
260            config.enable_auto_composition = params.enabled;
261        }
262        self.state.maybe_auto_recompose().await;
263    }
264
265    /// This method must be called by the host when composition begins in order
266    /// to notify the language client with a `WorkDoneProgressBegin` event. The
267    /// corresponding `WorkDoneProgressEnd` event will be sent by the language
268    /// server in `composition_did_update`.
269    pub async fn composition_did_start(&self) {
270        self.state
271            .client
272            .lock()
273            .await
274            .send_notification::<lsp::notification::Progress>(lsp::ProgressParams {
275                token: lsp::ProgressToken::String(APOLLO_COMPOSITION_PROGRESS_TOKEN.to_string()),
276                value: lsp::ProgressParamsValue::WorkDone(lsp::WorkDoneProgress::Begin(
277                    lsp::WorkDoneProgressBegin {
278                        title: "Composing services".to_string(),
279                        cancellable: Some(false),
280                        message: None,
281                        percentage: None,
282                    },
283                )),
284            })
285            .await;
286    }
287
288    /// This method must be called by the host of the language server after
289    /// completing a composition request. It will update the supergraph and
290    /// subsequently publish diagnostics for each subgraph.
291    /// > Note: A `WorkDoneProgressEnd` event will be sent after this completes.
292    /// > The `composition_did_start` method must be called before this method
293    /// > in order to send the matching `WorkDoneProgressBegin` event.
294    pub async fn composition_did_update(
295        &self,
296        _supergraph_sdl: Option<String>,
297        issues: Vec<Issue>,
298        timing: Option<Duration>,
299    ) {
300        self.state.composition_did_update(issues).await;
301        if let Some(timing) = timing {
302            self.send_telemetry_if_enabled(TelemetryEvent::Analytics(
303                AnalyticsEvent::CompositionTimeInMs(CompositionTiming {
304                    value: timing.as_millis() as u64,
305                }),
306            ))
307            .await;
308        }
309    }
310
311    /// This method must be called by the host of the language server when a new
312    /// subgraph is added to the supergraph. Concretely, when a new subgraph is
313    /// added to the supergraph.yaml, this method should be called.
314    #[cfg(any(not(feature = "wasm"), test))]
315    pub async fn add_subgraph(&self, name: String, source: SchemaSource) {
316        let Some(Graph::Supergraph(supergraph)) = &mut *self.state.graph.lock().await else {
317            panic!("Graph is unexpectedly not a supergraph");
318        };
319
320        supergraph.add_known_subgraph(name, source).await;
321    }
322
323    /// This method must be called by the host of the language server when a
324    /// subgraph is removed from the supergraph. Concretely, when a subgraph is
325    /// removed from the supergraph.yaml, this method should be called.
326    #[cfg(any(not(feature = "wasm"), test))]
327    pub async fn remove_subgraph(&self, name: &str) {
328        let Some(Graph::Supergraph(supergraph)) = &mut *self.state.graph.lock().await else {
329            panic!("Graph is unexpectedly not a supergraph");
330        };
331
332        if let Some(uri) = supergraph.uri_for_name(name) {
333            self.state
334                .client
335                .lock()
336                .await
337                .publish_diagnostics(uri.clone(), vec![], None)
338                .await;
339        }
340
341        supergraph.remove_known_subgraph(name).await;
342    }
343
344    /// The host can call this method to publish diagnostics for a given URI.
345    pub async fn publish_diagnostics(&self, uri: lsp::Url, diagnostics: Vec<lsp::Diagnostic>) {
346        let graph = self.state.graph.lock().await;
347        let version = graph.as_ref().and_then(|graph| graph.version_for_uri(&uri));
348
349        self.state
350            .client
351            .lock()
352            .await
353            .publish_diagnostics(uri, diagnostics, version)
354            .await;
355    }
356
357    async fn send_telemetry_if_enabled(&self, event: TelemetryEvent) {
358        if !self.state.config.lock().await.disable_telemetry {
359            self.state.client.lock().await.telemetry_event(event).await;
360        }
361    }
362
363    #[cfg(test)]
364    async fn config(&self) -> Config {
365        self.state.config.lock().await.clone()
366    }
367}
368
369impl State {
370    async fn maybe_auto_recompose(&self) {
371        if !self.config.lock().await.enable_auto_composition {
372            return;
373        }
374        self.maybe_recompose().await;
375    }
376
377    async fn maybe_send_can_compose_notification(&self) {
378        let invalid_subgraph_names = if let Some(Some(supergraph)) =
379            self.graph.lock().await.as_ref().map(Graph::supergraph)
380        {
381            // We only want to allow recomposition if none of the subgraphs have
382            // any parse/validation errors. We know those will need to be
383            // addressed before we can attempt to compose.
384            supergraph.get_invalid_subgraph_uris()
385        } else {
386            return;
387        };
388
389        self.client
390            .lock()
391            .await
392            .send_notification::<ApolloCanComposeNotification>(ApolloCanComposeNotificationParams {
393                can_compose: invalid_subgraph_names.is_empty(),
394                subgraphs_with_errors: invalid_subgraph_names,
395            })
396            .await;
397    }
398
399    async fn maybe_recompose(&self) -> bool {
400        let subgraph_definitions = if let Some(Some(supergraph)) =
401            self.graph.lock().await.as_ref().map(Graph::supergraph)
402        {
403            // We only want to recompose if none of the subgraphs have any
404            // parse/validation errors. We know those will need to be addressed
405            // before we can attempt to compose.
406            if !supergraph.subgraphs_are_invalid() {
407                supergraph.subgraph_definitions()
408            } else {
409                return false;
410            }
411        } else {
412            return false;
413        };
414
415        self.request_composition
416            .lock()
417            .await
418            .send(subgraph_definitions)
419            .await
420            .expect("Failed to send message");
421        true
422    }
423
424    async fn composition_did_update(&self, issues: Vec<Issue>) {
425        let (diagnostics_by_subgraph, unattributed_diagnostics) = {
426            let graph = self.graph.lock().await;
427            match graph.as_ref() {
428                Some(Graph::Supergraph(supergraph)) => supergraph.diagnostics_for_composition(issues),
429                _ => panic!("Programming error: called `composition_did_update` when graph is not a supergraph."),
430            }
431        };
432
433        let client = self.client.lock().await;
434        join_all(
435            diagnostics_by_subgraph
436                .into_iter()
437                .map(|(url, (diagnostics, version))| {
438                    client.publish_diagnostics(url, diagnostics, Some(version))
439                }),
440        )
441        .await;
442
443        client
444            .publish_diagnostics(
445                lsp::Url::parse(&self.config.lock().await.root_uri).expect("Failed to parse URL"),
446                unattributed_diagnostics,
447                0.into(),
448            )
449            .await;
450
451        client
452            .send_notification::<lsp::notification::Progress>(lsp::ProgressParams {
453                token: lsp::ProgressToken::String(APOLLO_COMPOSITION_PROGRESS_TOKEN.to_string()),
454                value: lsp::ProgressParamsValue::WorkDone(lsp::WorkDoneProgress::End(
455                    lsp::WorkDoneProgressEnd { message: None },
456                )),
457            })
458            .await;
459    }
460
461    fn initialize_document_update_debouncer(
462        state: Arc<State>,
463        document_change_queue_receiver: Receiver<lsp::DidChangeTextDocumentParams>,
464    ) {
465        spawn(async move {
466            let mut debounced_document_change_queue = debounced(
467                document_change_queue_receiver,
468                DOCUMENT_UPDATE_DEBOUNCE_INTERVAL,
469            );
470
471            while let Some(data) = debounced_document_change_queue.next().await {
472                state.handle_document_update(data).await;
473            }
474        });
475    }
476
477    async fn handle_document_update(&self, data: lsp::DidChangeTextDocumentParams) {
478        let lsp::DidChangeTextDocumentParams {
479            text_document,
480            content_changes,
481        } = data;
482        let lsp::VersionedTextDocumentIdentifier { uri, version } = text_document;
483        let text = content_changes
484            .into_iter()
485            .next()
486            .expect("Expected at least one content change")
487            .text;
488
489        let graph_config = {
490            let config = self.config.lock().await;
491
492            GraphConfig {
493                force_federation: config.force_federation,
494            }
495        };
496
497        let (diagnostics, version) = {
498            let mut graph = self.graph.lock().await;
499
500            let Some(graph) = graph.as_mut() else {
501                panic!("Attempted to change a document that hasn't been opened");
502            };
503            graph.update(uri.clone(), text, version, graph_config);
504
505            graph.diagnostics_for_uri(&uri)
506        };
507
508        self.client
509            .lock()
510            .await
511            .publish_diagnostics(uri.clone(), diagnostics, Some(version))
512            .await;
513
514        self.maybe_auto_recompose().await;
515        self.maybe_send_can_compose_notification().await;
516    }
517}
518
519#[async_trait]
520impl LanguageServer for ApolloLanguageServer {
521    async fn initialize(
522        &self,
523        initialize_params: lsp::InitializeParams,
524    ) -> jsonrpc::Result<lsp::InitializeResult> {
525        let options: Option<Config> = initialize_params
526            .initialization_options
527            .map(|options| {
528                serde_json::from_value(options).map_err(|err| jsonrpc::Error {
529                    message: Cow::from(err.to_string()),
530                    code: jsonrpc::ErrorCode::InvalidParams,
531                    data: None,
532                })
533            })
534            .transpose()?;
535
536        let mut config = self.state.config.lock().await;
537        if let Some(options) = options {
538            *config = options;
539        }
540        config.root_uri = initialize_params
541            .root_uri
542            .map(|uri| uri.to_string())
543            .unwrap_or("inmemory://".to_string());
544
545        Ok(ApolloLanguageServer::capabilities())
546    }
547
548    async fn initialized(&self, _: lsp::InitializedParams) {
549        self.send_telemetry_if_enabled(TelemetryEvent::Analytics(AnalyticsEvent::Initialized))
550            .await;
551    }
552
553    async fn shutdown(&self) -> jsonrpc::Result<()> {
554        Ok(())
555    }
556
557    async fn did_open(&self, params: lsp::DidOpenTextDocumentParams) {
558        let lsp::TextDocumentItem {
559            uri,
560            version,
561            text,
562            language_id,
563        } = params.text_document;
564
565        if !LANGUAGE_IDS.contains(&language_id.as_str()) {
566            return;
567        }
568
569        let graph_config = {
570            let config = self.state.config.lock().await;
571            GraphConfig {
572                force_federation: config.force_federation,
573            }
574        };
575
576        // This step applies to both monolith and supergraph. Capture any
577        // diagnostics for the new document. We'll publish them after the block
578        // so as not to hold the lock across an `await`.
579        let (diagnostics_for_uri, _) = {
580            let mut graph = self.state.graph.lock().await;
581            if graph.is_some() {
582                graph
583                    .as_mut()
584                    .unwrap()
585                    .update(uri.clone(), text, version, graph_config);
586            } else {
587                let new_graph = Graph::new(
588                    uri.clone(),
589                    text,
590                    version,
591                    KnownSubgraphs::default(),
592                    graph_config,
593                );
594                *graph = Some(new_graph);
595            };
596
597            graph.as_ref().unwrap().diagnostics_for_uri(&uri)
598        };
599
600        if !diagnostics_for_uri.is_empty() {
601            self.state
602                .client
603                .lock()
604                .await
605                .publish_diagnostics(uri, diagnostics_for_uri, Some(version))
606                .await;
607        }
608
609        self.state.maybe_auto_recompose().await;
610        self.state.maybe_send_can_compose_notification().await;
611    }
612
613    async fn did_change(&self, params: lsp::DidChangeTextDocumentParams) {
614        if params.content_changes[0].text.len() < DOCUMENT_SIZE_DEBOUNCE_THRESHOLD {
615            self.state.handle_document_update(params).await;
616        } else {
617            self.state
618                .document_change_queue_sender
619                .lock()
620                .await
621                .send(params.clone())
622                .await
623                .expect("Failed to send message")
624        }
625    }
626
627    async fn did_close(&self, params: lsp::DidCloseTextDocumentParams) {
628        {
629            let mut graph = self.state.graph.lock().await;
630            match &mut *graph {
631                Some(Graph::Monolith(_)) => {
632                    *graph = None;
633                }
634                Some(Graph::Supergraph(supergraph)) => {
635                    if supergraph.remove(&params.text_document.uri).is_none() {
636                        panic!("Subgraph is unexpectedly None");
637                    }
638                }
639                None => panic!("Graph is unexpectedly None"),
640            }
641        }
642        self.state.maybe_auto_recompose().await;
643        self.state.maybe_send_can_compose_notification().await;
644    }
645
646    async fn completion(
647        &self,
648        params: lsp::CompletionParams,
649    ) -> jsonrpc::Result<Option<lsp::CompletionResponse>> {
650        let graph = self.state.graph.lock().await;
651        let max_spec_versions = &self.state.config.lock().await.max_spec_versions;
652
653        Ok(graph.as_ref().and_then(|graph| {
654            graph.completions(
655                &params.text_document_position.text_document.uri,
656                params.text_document_position.position,
657                max_spec_versions,
658            )
659        }))
660    }
661
662    async fn semantic_tokens_full(
663        &self,
664        params: lsp::SemanticTokensParams,
665    ) -> jsonrpc::Result<Option<lsp::SemanticTokensResult>> {
666        let graph = self.state.graph.lock().await;
667        Ok(graph.as_ref().map(|graph| {
668            let tokens = graph
669                .semantic_tokens_full(&params.text_document.uri)
670                .unwrap_or_default();
671            lsp::SemanticTokensResult::Tokens(lsp::SemanticTokens {
672                result_id: None,
673                data: incomplete_tokens_to_deltas(tokens),
674            })
675        }))
676    }
677
678    async fn hover(&self, params: lsp::HoverParams) -> jsonrpc::Result<Option<lsp::Hover>> {
679        let graph = self.state.graph.lock().await;
680        Ok(graph.as_ref().and_then(|graph| {
681            graph.on_hover(
682                &params.text_document_position_params.text_document.uri,
683                &params.text_document_position_params.position,
684            )
685        }))
686    }
687
688    async fn goto_definition(
689        &self,
690        params: lsp::GotoDefinitionParams,
691    ) -> jsonrpc::Result<Option<lsp::GotoDefinitionResponse>> {
692        let graph = self.state.graph.lock().await;
693        Ok(graph.as_ref().map(|graph| {
694            lsp::GotoDefinitionResponse::from(
695                graph
696                    .goto_definition(
697                        &params.text_document_position_params.text_document.uri,
698                        &params.text_document_position_params.position,
699                    )
700                    .unwrap_or_default(),
701            )
702        }))
703    }
704}
705
706#[cfg(test)]
707mod tests {
708    use super::*;
709    use apollo_federation_types::composition::{Severity, SubgraphLocation};
710    use lsp::{InitializeParams, PublishDiagnosticsParams};
711    use serde_json::{from_value, json, to_value};
712    use tokio::{sync::Mutex, task::yield_now};
713    use tower::{Service, ServiceExt};
714    use tower_lsp::{jsonrpc, LspService, Server};
715
716    fn initialize_request(id: i64) -> jsonrpc::Request {
717        jsonrpc::Request::build("initialize")
718            .params(json!({"capabilities":{}}))
719            .id(id)
720            .finish()
721    }
722
723    fn initialize_request_with_server_configurations(id: i64) -> jsonrpc::Request {
724        jsonrpc::Request::build("initialize")
725            .params(
726                to_value(InitializeParams {
727                    initialization_options: Some(json!({
728                        "enableAutoComposition": true,
729                        "disableTelemetry": true,
730                    })),
731                    process_id: None,
732                    root_uri: Some(lsp::Url::from_directory_path("/path/to/project").unwrap()),
733                    ..Default::default()
734                })
735                .unwrap(),
736            )
737            .id(id)
738            .finish()
739    }
740
741    fn initialized_notification() -> jsonrpc::Request {
742        jsonrpc::Request::build("initialized")
743            .params(json!({}))
744            .finish()
745    }
746
747    fn shutdown_request() -> jsonrpc::Request {
748        jsonrpc::Request::build("shutdown").finish()
749    }
750
751    fn server_capabilities() -> serde_json::Value {
752        serde_json::to_value(ApolloLanguageServer::capabilities()).unwrap()
753    }
754
755    async fn request(
756        server: &mut LspService<ApolloLanguageServer>,
757        request: jsonrpc::Request,
758    ) -> Option<jsonrpc::Response> {
759        server.ready().await.unwrap().call(request).await.unwrap()
760    }
761
762    #[allow(clippy::type_complexity)]
763    fn listen_to_channels(
764        mut socket: ClientSocket,
765        mut composition_listener: Receiver<Vec<SubgraphDefinition>>,
766    ) -> (
767        Arc<Mutex<Vec<jsonrpc::Request>>>,
768        Arc<Mutex<Vec<Vec<SubgraphDefinition>>>>,
769    ) {
770        let socket_messages = Arc::new(Mutex::new(Vec::default()));
771        let composition_listener_messages = Arc::new(Mutex::new(Vec::default()));
772
773        let socket_messages_inner = socket_messages.clone();
774        let composition_listener_messages_inner = composition_listener_messages.clone();
775        tokio::spawn(async move {
776            while let Some(data) = socket.next().await {
777                socket_messages_inner.lock().await.push(data)
778            }
779        });
780        tokio::spawn(async move {
781            while let Some(data) = composition_listener.next().await {
782                composition_listener_messages_inner.lock().await.push(data)
783            }
784        });
785
786        (socket_messages, composition_listener_messages)
787    }
788
789    #[tokio::test]
790    async fn initialization_and_shutdown() {
791        let (mut server, ..) =
792            ApolloLanguageServer::build_service(Config::default(), HashMap::new());
793
794        assert_eq!(
795            request(&mut server, initialize_request(1)).await,
796            Some(jsonrpc::Response::from_ok(1.into(), server_capabilities()))
797        );
798
799        assert_eq!(request(&mut server, initialized_notification()).await, None);
800
801        // TODO: something isn't working here, should be an error after 2nd shutdown request
802        // and our shutdown method isn't being called at all
803        assert_eq!(request(&mut server, shutdown_request()).await, None);
804        assert_eq!(request(&mut server, shutdown_request()).await, None);
805    }
806
807    #[tokio::test]
808    async fn initialization_with_client_config() {
809        let (mut server, ..) =
810            ApolloLanguageServer::build_service(Config::default(), HashMap::new());
811
812        assert_eq!(
813            request(
814                &mut server,
815                initialize_request_with_server_configurations(1)
816            )
817            .await,
818            Some(jsonrpc::Response::from_ok(1.into(), server_capabilities()))
819        );
820
821        assert!(server.inner().config().await.enable_auto_composition);
822        assert_eq!(
823            server.inner().config().await.root_uri,
824            "file:///path/to/project/".to_string()
825        );
826
827        assert_eq!(request(&mut server, initialized_notification()).await, None);
828    }
829
830    #[tokio::test]
831    async fn initialization_without_root_uri_specified() {
832        let (mut server, ..) =
833            ApolloLanguageServer::build_service(Config::default(), HashMap::new());
834
835        assert_eq!(
836            request(&mut server, initialize_request(1)).await,
837            Some(jsonrpc::Response::from_ok(1.into(), server_capabilities()))
838        );
839
840        assert_eq!(
841            server.inner().state.config.lock().await.root_uri,
842            "inmemory://".to_string()
843        );
844
845        assert_eq!(request(&mut server, initialized_notification()).await, None);
846    }
847
848    #[tokio::test]
849    async fn diagnostics() {
850        let (mut server, socket, composition_listener) =
851            ApolloLanguageServer::build_service(Config::default(), HashMap::new());
852
853        let (socket_messages, composition_listener_messages) =
854            listen_to_channels(socket, composition_listener);
855
856        request(
857            &mut server,
858            // enable composition for this test
859            initialize_request_with_server_configurations(1),
860        )
861        .await;
862        request(&mut server, initialized_notification()).await;
863
864        // On didOpen, we don't expect any diagnostics to be published when the
865        // document is valid. Further down, on didChange, we expect empty diagnostics
866        // to be published to clear any existing ones.
867        request(
868            &mut server,
869            jsonrpc::Request::build("textDocument/didOpen")
870                .params(json!({
871                    "textDocument": {
872                        "uri": "file:///path/to/file",
873                        "version": 1,
874                        "languageId": "apollo-graphql",
875                        "text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String }"
876                    }
877                }))
878                .finish(),
879        )
880        .await;
881
882        request(
883            &mut server,
884            jsonrpc::Request::build("textDocument/didChange")
885                .params(json!({
886                    "textDocument": {
887                        "uri": "file:///path/to/file",
888                        "version": 2,
889                    },
890                    "contentChanges": [{
891                        "text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String! }"
892                    }]
893                }))
894                .finish(),
895        )
896        .await;
897
898        // Wait for the client and composition channel to send their messages
899        yield_now().await;
900
901        let messages = &*socket_messages.lock().await;
902        assert_eq!(
903            messages,
904            &[
905                // no diagnostics published on didOpen, since the document is valid
906                jsonrpc::Request::build("apollo/canCompose")
907                    .params(json!({"canCompose": true, "subgraphsWithErrors": []}))
908                    .finish(),
909                // empty diagnostics published on didChange to clear any existing ones
910                jsonrpc::Request::build("textDocument/publishDiagnostics")
911                    .params(json!({"uri": "file:///path/to/file", "diagnostics": [], "version": 2}))
912                    .finish(),
913                jsonrpc::Request::build("apollo/canCompose")
914                    .params(json!({"canCompose": true, "subgraphsWithErrors": []}))
915                    .finish(),
916            ]
917        );
918
919        let composition_messages = &*composition_listener_messages.lock().await;
920        assert_eq!(composition_messages, &[
921            [
922                SubgraphDefinition {
923                    name: "file:///path/to/file".into(), 
924                    url: "file:///path/to/file".into(), 
925                    sdl: "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String }".into() 
926                }
927            ],[
928                SubgraphDefinition {
929                    name: "file:///path/to/file".into(), 
930                    url: "file:///path/to/file".into(), 
931                    sdl: "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String! }".into() 
932                }
933            ]
934        ]);
935    }
936
937    #[tokio::test]
938    async fn registers_non_zero_document_version_on_open() {
939        let (mut server, socket, composition_listener) =
940            ApolloLanguageServer::build_service(Config::default(), HashMap::new());
941
942        let (socket_messages, _) = listen_to_channels(socket, composition_listener);
943
944        request(
945            &mut server,
946            // enable composition for this test
947            initialize_request_with_server_configurations(1),
948        )
949        .await;
950        request(&mut server, initialized_notification()).await;
951
952        request(
953            &mut server,
954            jsonrpc::Request::build("textDocument/didOpen")
955                .params(json!({
956                    "textDocument": {
957                        "uri": "file:///path/to/file",
958                        "version": 42,
959                        "languageId": "apollo-graphql",
960                        "text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String"
961                    }
962                }))
963                .finish(),
964        )
965        .await;
966
967        let messages = &*socket_messages.lock().await;
968        // The published diagnostics should include the document version that
969        // was provided in the didOpen request.
970        assert_eq!(messages[0].params().unwrap()["version"], 42);
971    }
972
973    #[tokio::test]
974    async fn runs_in_tower_lsp_server() {
975        fn mock_msg(msg: &str) -> String {
976            format!("Content-Length: {}\r\n\r\n{}", msg.len(), msg)
977        }
978
979        let mock_request = mock_msg(
980            r#"{"jsonrpc":"2.0","method":"initialize","params":{"capabilities":{}},"id":0}"#,
981        );
982
983        let mock_response = mock_msg(
984            &serde_json::to_string(&jsonrpc::Response::from_ok(0.into(), server_capabilities()))
985                .unwrap(),
986        );
987
988        let (sender, _) = channel::<Vec<SubgraphDefinition>>(1);
989        let (service, socket) = LspService::new(|client| {
990            ApolloLanguageServer::new(client, sender, Config::default(), Default::default())
991        });
992
993        let mut stdout = Vec::new();
994
995        Server::new(&mut mock_request.as_bytes(), &mut stdout, socket)
996            .serve(service)
997            .await;
998
999        let stdout = String::from_utf8(stdout).unwrap();
1000        assert_eq!(stdout, mock_response);
1001    }
1002
1003    #[tokio::test]
1004    async fn did_close() {
1005        let (mut server, _, mut composition_listener) =
1006            ApolloLanguageServer::build_service(Config::default(), HashMap::new());
1007
1008        request(
1009            &mut server,
1010            // enable composition for this test
1011            initialize_request_with_server_configurations(1),
1012        )
1013        .await;
1014        request(&mut server, initialized_notification()).await;
1015
1016        // On didOpen, we don't expect any diagnostics to be published when the
1017        // document is valid.
1018        request(
1019            &mut server,
1020            jsonrpc::Request::build("textDocument/didOpen")
1021                .params(json!({
1022                    "textDocument": {
1023                        "uri": "file:///path/to/file1",
1024                        "version": 1,
1025                        "languageId": "apollo-graphql",
1026                        "text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String }"
1027                    }
1028                }))
1029                .finish(),
1030        )
1031        .await;
1032
1033        // The server will hang / wait for this message to be consumed. For this
1034        // test we don't really need to do anything with it.
1035        composition_listener.next().await.unwrap();
1036
1037        request(
1038            &mut server,
1039            jsonrpc::Request::build("textDocument/didOpen")
1040                .params(json!({
1041                    "textDocument": {
1042                        "uri": "file:///path/to/file2",
1043                        "version": 1,
1044                        "languageId": "apollo-graphql",
1045                        "text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { goodbye: String! }"
1046                    },
1047                }))
1048                .finish(),
1049        )
1050        .await;
1051
1052        // The server will hang / wait for this message to be consumed. For this
1053        // test we don't really need to do anything with it.
1054        composition_listener.next().await.unwrap();
1055
1056        request(
1057            &mut server,
1058            jsonrpc::Request::build("textDocument/didClose")
1059                .params(json!({
1060                    "textDocument": {
1061                        "uri": "file:///path/to/file1",
1062                    }
1063                }))
1064                .finish(),
1065        )
1066        .await;
1067
1068        // On didClose, we expect composition to be requested again. If it
1069        // doesn't, this test will hang.
1070        composition_listener.next().await.unwrap();
1071    }
1072
1073    #[tokio::test]
1074    async fn custom_notification_toggle_auto_composition() {
1075        let (mut server, _, _) =
1076            ApolloLanguageServer::build_service(Config::default(), HashMap::new());
1077
1078        request(
1079            &mut server,
1080            initialize_request_with_server_configurations(1),
1081        )
1082        .await;
1083        request(&mut server, initialized_notification()).await;
1084
1085        assert!(server.inner().config().await.enable_auto_composition);
1086        request(
1087            &mut server,
1088            jsonrpc::Request::build(ApolloConfigureAutoCompositionNotification::METHOD)
1089                .params(json!({"enabled": false}))
1090                .finish(),
1091        )
1092        .await;
1093        assert!(!server.inner().config().await.enable_auto_composition);
1094        request(
1095            &mut server,
1096            jsonrpc::Request::build(ApolloConfigureAutoCompositionNotification::METHOD)
1097                .params(json!({"enabled": true}))
1098                .finish(),
1099        )
1100        .await;
1101        assert!(server.inner().config().await.enable_auto_composition);
1102    }
1103
1104    #[tokio::test]
1105    async fn custom_notification_trigger_recomposition() {
1106        let (mut server, socket, composition_listener) =
1107            ApolloLanguageServer::build_service(Config::default(), HashMap::new());
1108
1109        let (socket_messages, composition_listener_messages) =
1110            listen_to_channels(socket, composition_listener);
1111
1112        request(
1113            &mut server,
1114            initialize_request_with_server_configurations(1),
1115        )
1116        .await;
1117
1118        request(&mut server, initialized_notification()).await;
1119
1120        request(
1121            &mut server,
1122            jsonrpc::Request::build("textDocument/didOpen")
1123                .params(json!({
1124                    "textDocument": {
1125                        "uri": "file:///path/to/file1",
1126                        "version": 1,
1127                        "languageId": "apollo-graphql",
1128                        "text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String }"
1129                    }
1130                }))
1131                .finish(),
1132        ).await;
1133
1134        request(
1135            &mut server,
1136            jsonrpc::Request::build(ApolloComposeServicesRequest::METHOD)
1137                .id(100)
1138                .params(json!({}))
1139                .finish(),
1140        )
1141        .await;
1142
1143        let messages = &*socket_messages.lock().await;
1144        assert_eq!(
1145            messages,
1146            &[jsonrpc::Request::build("apollo/canCompose")
1147                .params(json!({"canCompose": true, "subgraphsWithErrors": []}))
1148                .finish(),]
1149        );
1150
1151        let composition_messages = &*composition_listener_messages.lock().await;
1152        assert_eq!(composition_messages, &[
1153            [
1154                SubgraphDefinition {
1155                    name: "file:///path/to/file1".into(), 
1156                    url: "file:///path/to/file1".into(), 
1157                    sdl: "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String }".into() 
1158                }
1159            ],
1160            // the manual request
1161            [
1162                SubgraphDefinition {
1163                    name: "file:///path/to/file1".into(), 
1164                    url: "file:///path/to/file1".into(), 
1165                    sdl: "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String }".into() 
1166                }
1167            ]
1168        ]);
1169    }
1170
1171    #[tokio::test]
1172    async fn with_graphql_language_id() {
1173        let (mut server, _, mut composition_listener) =
1174            ApolloLanguageServer::build_service(Config::default(), HashMap::new());
1175
1176        request(
1177            &mut server,
1178            // enable composition for this test
1179            initialize_request_with_server_configurations(1),
1180        )
1181        .await;
1182        request(&mut server, initialized_notification()).await;
1183
1184        // On didOpen, we don't expect any diagnostics to be published when the
1185        // document is valid.
1186        request(
1187            &mut server,
1188            jsonrpc::Request::build("textDocument/didOpen")
1189                .params(json!({
1190                    "textDocument": {
1191                        "uri": "file:///path/to/file1",
1192                        "version": 1,
1193                        "languageId": "graphql",
1194                        "text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String }"
1195                    }
1196                }))
1197                .finish(),
1198        )
1199        .await;
1200
1201        // The server will hang / wait for this message to be consumed. For this
1202        // test we don't really need to do anything with it.
1203        composition_listener.next().await.unwrap();
1204
1205        request(
1206            &mut server,
1207            jsonrpc::Request::build("textDocument/didChange")
1208                .params(json!({
1209                    "textDocument": {
1210                        "uri": "file:///path/to/file1",
1211                        "version": 2,
1212                    },
1213                    "contentChanges": [{
1214                        "text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String! }"
1215                    }]
1216                }))
1217                .finish(),
1218        )
1219        .await;
1220
1221        // The server will hang / wait for this message to be consumed. For this
1222        // test we don't really need to do anything with it.
1223        composition_listener.next().await.unwrap();
1224
1225        request(
1226            &mut server,
1227            jsonrpc::Request::build("textDocument/didClose")
1228                .params(json!({
1229                    "textDocument": {
1230                        "uri": "file:///path/to/file1",
1231                    }
1232                }))
1233                .finish(),
1234        )
1235        .await;
1236
1237        // On didClose, we expect composition to be requested again. If it
1238        // doesn't, this test will hang.
1239        composition_listener.next().await.unwrap();
1240    }
1241
1242    #[tokio::test]
1243    async fn publishing_diagnostics_to_known_subgraphs() {
1244        // Initialize the server with known subgraphs.
1245        // Note that `remote` has no URI - its composition diagnostics should be published to the root URI.
1246        // The other two subgraphs should have their diagnostics published to their respective URIs.
1247        let (mut service, socket, composition_listener) = ApolloLanguageServer::build_service(
1248            Config {
1249                root_uri: "/path/to/project".into(),
1250                ..Default::default()
1251            },
1252            [
1253                (
1254                    "local".into(),
1255                    SchemaSource::File {
1256                        file: "local.graphql".into(),
1257                    },
1258                ),
1259                (
1260                    "local_relative".into(),
1261                    SchemaSource::File {
1262                        file: "../../local_relative.graphql".into(),
1263                    },
1264                ),
1265                (
1266                    "remote".into(),
1267                    SchemaSource::Subgraph {
1268                        graphref: "testing@current".into(),
1269                        subgraph: "remote".into(),
1270                    },
1271                ),
1272            ]
1273            .into_iter()
1274            .collect(),
1275        );
1276
1277        let (socket_messages, _) = listen_to_channels(socket, composition_listener);
1278
1279        request(
1280            &mut service,
1281            initialize_request_with_server_configurations(1),
1282        )
1283        .await;
1284
1285        request(&mut service, initialized_notification()).await;
1286
1287        // didOpen request for local subgraph
1288        request(&mut service, jsonrpc::Request::build("textDocument/didOpen")
1289                .params(json!({
1290                    "textDocument": {
1291                        "uri": "file:///path/to/project/local.graphql",
1292                        "version": 1,
1293                        "languageId": "graphql",
1294                        "text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String }"
1295                    }
1296                }))
1297                .finish()).await;
1298
1299        // didOpen request for local_relative subgraph
1300        request(&mut service, jsonrpc::Request::build("textDocument/didOpen")
1301                .params(json!({
1302                    "textDocument": {
1303                        "uri": "file:///path/local_relative.graphql",
1304                        "version": 1,
1305                        "languageId": "graphql",
1306                        "text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello2: String }"
1307                    }
1308                }))
1309                .finish()).await;
1310
1311        service
1312            .inner()
1313            .composition_did_update(
1314                None,
1315                vec![
1316                    Issue {
1317                        code: "TEST_REMOTE".into(),
1318                        message: "Test issue".into(),
1319                        locations: vec![SubgraphLocation {
1320                            subgraph: Some("remote".into()),
1321                            range: None,
1322                        }],
1323                        severity: Severity::Error,
1324                    },
1325                    Issue {
1326                        code: "TEST_LOCAL".into(),
1327                        message: "Test issue".into(),
1328                        locations: vec![SubgraphLocation {
1329                            subgraph: Some("local".into()),
1330                            range: None,
1331                        }],
1332                        severity: Severity::Error,
1333                    },
1334                    Issue {
1335                        code: "TEST_LOCAL_RELATIVE".into(),
1336                        message: "Test issue".into(),
1337                        locations: vec![SubgraphLocation {
1338                            subgraph: Some("local_relative".into()),
1339                            range: None,
1340                        }],
1341                        severity: Severity::Error,
1342                    },
1343                ],
1344                None,
1345            )
1346            .await;
1347
1348        let lock = socket_messages.lock().await;
1349        let diagnostics = lock
1350            .iter()
1351            .filter(|msg| msg.method() == "textDocument/publishDiagnostics")
1352            .map(|msg| {
1353                from_value::<PublishDiagnosticsParams>(msg.params().unwrap().clone()).unwrap()
1354            })
1355            .collect::<Vec<_>>();
1356
1357        assert_eq!(diagnostics.len(), 3);
1358        assert!(diagnostics
1359            .iter()
1360            .any(|diag| diag.uri.as_str() == "file:///path/to/project/"));
1361        assert!(diagnostics
1362            .iter()
1363            .any(|diag| diag.uri.as_str() == "file:///path/local_relative.graphql"));
1364
1365        // local is a "known" subgraph which is then opened in this test, giving
1366        // it a `version`. Ensure this `version` isn't coming from the known
1367        // subgraphs map (with a default version of 0).
1368        let local = diagnostics
1369            .iter()
1370            .find(|diag| diag.uri.as_str() == "file:///path/to/project/local.graphql")
1371            .unwrap();
1372        assert_eq!(&local.version, &Some(1));
1373    }
1374
1375    #[tokio::test]
1376    async fn publishing_diagnostics_to_known_subgraphs_without_opening_files() {
1377        // Initialize the server with known subgraphs.
1378        // Note that `remote` has no URI - its composition diagnostics should be published to the root URI.
1379        // The other two subgraphs should have their diagnostics published to their respective URIs.
1380        let (mut service, socket, composition_listener) = ApolloLanguageServer::build_service(
1381            Config {
1382                root_uri: "/path/to/project".into(),
1383                ..Default::default()
1384            },
1385            [
1386                (
1387                    "local".into(),
1388                    SchemaSource::File {
1389                        file: "local.graphql".into(),
1390                    },
1391                ),
1392                (
1393                    "local_relative".into(),
1394                    SchemaSource::File {
1395                        file: "../../local_relative.graphql".into(),
1396                    },
1397                ),
1398                (
1399                    "remote".into(),
1400                    SchemaSource::Subgraph {
1401                        graphref: "testing@current".into(),
1402                        subgraph: "remote".into(),
1403                    },
1404                ),
1405            ]
1406            .into_iter()
1407            .collect(),
1408        );
1409
1410        let (socket_messages, _) = listen_to_channels(socket, composition_listener);
1411
1412        request(
1413            &mut service,
1414            initialize_request_with_server_configurations(1),
1415        )
1416        .await;
1417
1418        request(&mut service, initialized_notification()).await;
1419
1420        service
1421            .inner()
1422            .composition_did_update(
1423                None,
1424                vec![
1425                    Issue {
1426                        code: "TEST_REMOTE".into(),
1427                        message: "Test issue".into(),
1428                        locations: vec![SubgraphLocation {
1429                            subgraph: Some("remote".into()),
1430                            range: None,
1431                        }],
1432                        severity: Severity::Error,
1433                    },
1434                    Issue {
1435                        code: "TEST_LOCAL".into(),
1436                        message: "Test issue".into(),
1437                        locations: vec![SubgraphLocation {
1438                            subgraph: Some("local".into()),
1439                            range: None,
1440                        }],
1441                        severity: Severity::Error,
1442                    },
1443                    Issue {
1444                        code: "TEST_LOCAL_RELATIVE".into(),
1445                        message: "Test issue".into(),
1446                        locations: vec![SubgraphLocation {
1447                            subgraph: Some("local_relative".into()),
1448                            range: None,
1449                        }],
1450                        severity: Severity::Error,
1451                    },
1452                ],
1453                None,
1454            )
1455            .await;
1456
1457        let lock = socket_messages.lock().await;
1458        let diagnostics = lock
1459            .iter()
1460            .filter(|msg| msg.method() == "textDocument/publishDiagnostics")
1461            .map(|msg| {
1462                from_value::<PublishDiagnosticsParams>(msg.params().unwrap().clone()).unwrap()
1463            })
1464            .collect::<Vec<_>>();
1465
1466        assert_eq!(diagnostics.len(), 3);
1467        assert!(diagnostics
1468            .iter()
1469            .any(|diag| diag.uri.as_str() == "file:///path/to/project/"));
1470        assert!(diagnostics
1471            .iter()
1472            .any(|diag| diag.uri.as_str() == "file:///path/to/project/local.graphql"));
1473        assert!(diagnostics
1474            .iter()
1475            .any(|diag| diag.uri.as_str() == "file:///path/local_relative.graphql"));
1476    }
1477
1478    #[tokio::test]
1479    async fn host_can_publish_diagnostics() {
1480        let (mut service, socket, composition_listener) = ApolloLanguageServer::build_service(
1481            Config {
1482                root_uri: "/path/to/project".into(),
1483                ..Default::default()
1484            },
1485            HashMap::new(),
1486        );
1487
1488        let (socket_messages, _) = listen_to_channels(socket, composition_listener);
1489
1490        request(
1491            &mut service,
1492            initialize_request_with_server_configurations(1),
1493        )
1494        .await;
1495
1496        request(&mut service, initialized_notification()).await;
1497
1498        let diagnostics = vec![lsp::Diagnostic::new_simple(
1499            lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
1500            "supergraph.yaml missing fields".into(),
1501        )];
1502
1503        service
1504            .inner()
1505            .publish_diagnostics(
1506                lsp::Url::from_file_path("/path/to/project/supergraph.yaml").unwrap(),
1507                diagnostics,
1508            )
1509            .await;
1510
1511        yield_now().await;
1512
1513        let lock = socket_messages.lock().await;
1514        let diagnostics_published_to_client = lock
1515            .iter()
1516            .filter(|msg| msg.method() == "textDocument/publishDiagnostics")
1517            .map(|msg| {
1518                from_value::<PublishDiagnosticsParams>(msg.params().unwrap().clone()).unwrap()
1519            })
1520            .collect::<Vec<_>>();
1521
1522        assert_eq!(diagnostics_published_to_client.len(), 1);
1523        assert_eq!(
1524            diagnostics_published_to_client[0].uri.as_str(),
1525            "file:///path/to/project/supergraph.yaml"
1526        );
1527    }
1528
1529    #[tokio::test]
1530    async fn stale_diagnostics_removed_on_remote_subgraph_removal() {
1531        let (mut service, socket, composition_listener) = ApolloLanguageServer::build_service(
1532            Config {
1533                root_uri: "/path/to/project".into(),
1534                ..Default::default()
1535            },
1536            [(
1537                "remote".into(),
1538                SchemaSource::Subgraph {
1539                    graphref: "testing@current".into(),
1540                    subgraph: "remote".into(),
1541                },
1542            )]
1543            .into_iter()
1544            .collect(),
1545        );
1546
1547        let (socket_messages, _) = listen_to_channels(socket, composition_listener);
1548
1549        request(
1550            &mut service,
1551            initialize_request_with_server_configurations(1),
1552        )
1553        .await;
1554
1555        request(&mut service, initialized_notification()).await;
1556
1557        service
1558            .inner()
1559            .composition_did_update(
1560                None,
1561                vec![Issue {
1562                    code: "TEST_REMOTE".into(),
1563                    message: "Test issue".into(),
1564                    locations: vec![SubgraphLocation {
1565                        subgraph: Some("remote".into()),
1566                        range: None,
1567                    }],
1568                    severity: Severity::Error,
1569                }],
1570                None,
1571            )
1572            .await;
1573        {
1574            let messages = &mut *socket_messages.lock().await;
1575            let diagnostics = messages
1576                .iter()
1577                .filter(|msg| msg.method() == "textDocument/publishDiagnostics")
1578                .map(|msg| {
1579                    from_value::<PublishDiagnosticsParams>(msg.params().unwrap().clone()).unwrap()
1580                })
1581                .collect::<Vec<_>>();
1582
1583            assert_eq!(diagnostics.len(), 1);
1584            assert!(diagnostics
1585                .iter()
1586                .any(|diag| diag.uri.as_str() == "file:///path/to/project/"
1587                    && diag.diagnostics.len() == 1));
1588
1589            messages.clear();
1590        }
1591
1592        service.inner().remove_subgraph("remote").await;
1593
1594        {
1595            let messages = socket_messages.lock().await;
1596            let diagnostics = messages
1597                .iter()
1598                .filter(|msg| msg.method() == "textDocument/publishDiagnostics")
1599                .map(|msg| {
1600                    from_value::<PublishDiagnosticsParams>(msg.params().unwrap().clone()).unwrap()
1601                })
1602                .collect::<Vec<_>>();
1603
1604            // A subgraph was removed that we don't have a URI for, so its
1605            // diagnostics are at the root. We don't want to clear all root
1606            // diagnostics on every subgraph removal. This behavior depends on
1607            // composition_did_update being called to update any root level
1608            // diagnostics.
1609            assert_eq!(diagnostics.len(), 0);
1610        }
1611    }
1612
1613    #[tokio::test]
1614    async fn stale_diagnostics_removed_on_local_removal() {
1615        let (mut service, socket, composition_listener) = ApolloLanguageServer::build_service(
1616            Config {
1617                root_uri: "/path/to/project".into(),
1618                ..Default::default()
1619            },
1620            [(
1621                "local".into(),
1622                SchemaSource::File {
1623                    file: "local.graphql".into(),
1624                },
1625            )]
1626            .into_iter()
1627            .collect(),
1628        );
1629
1630        let (socket_messages, _) = listen_to_channels(socket, composition_listener);
1631
1632        request(
1633            &mut service,
1634            initialize_request_with_server_configurations(1),
1635        )
1636        .await;
1637
1638        request(&mut service, initialized_notification()).await;
1639
1640        service
1641            .inner()
1642            .composition_did_update(
1643                None,
1644                vec![Issue {
1645                    code: "TEST_LOCAL".into(),
1646                    message: "Test issue".into(),
1647                    locations: vec![SubgraphLocation {
1648                        subgraph: Some("local".into()),
1649                        range: None,
1650                    }],
1651                    severity: Severity::Error,
1652                }],
1653                None,
1654            )
1655            .await;
1656        {
1657            let messages = &mut *socket_messages.lock().await;
1658            let diagnostics = messages
1659                .iter()
1660                .filter(|msg| msg.method() == "textDocument/publishDiagnostics")
1661                .map(|msg| {
1662                    from_value::<PublishDiagnosticsParams>(msg.params().unwrap().clone()).unwrap()
1663                })
1664                .collect::<Vec<_>>();
1665
1666            assert_eq!(diagnostics.len(), 2);
1667            assert!(diagnostics.iter().any(|diag| diag.uri.as_str()
1668                == "file:///path/to/project/local.graphql"
1669                && diag.diagnostics.len() == 1));
1670
1671            messages.clear();
1672        }
1673
1674        service.inner().remove_subgraph("local").await;
1675
1676        {
1677            let messages = socket_messages.lock().await;
1678            let diagnostics = messages
1679                .iter()
1680                .filter(|msg| msg.method() == "textDocument/publishDiagnostics")
1681                .map(|msg| {
1682                    from_value::<PublishDiagnosticsParams>(msg.params().unwrap().clone()).unwrap()
1683                })
1684                .collect::<Vec<_>>();
1685
1686            // Here we're expecting a message to clear the diagnostics for the actual file URI.
1687            assert_eq!(diagnostics.len(), 1);
1688            assert!(diagnostics.iter().any(|diag| diag.uri.as_str()
1689                == "file:///path/to/project/local.graphql"
1690                && diag.diagnostics.is_empty()));
1691        }
1692    }
1693}