use crate::{
custom_requests::{
ApolloCanComposeNotification, ApolloCanComposeNotificationParams,
ApolloComposeServicesParams, ApolloComposeServicesRequest, ApolloComposeServicesResponse,
ApolloConfigureAutoCompositionNotification, ApolloConfigureAutoCompositionParams,
APOLLO_COMPOSITION_PROGRESS_TOKEN,
},
graph::{
supergraph::{KnownSubgraphs, Supergraph},
Graph, GraphConfig,
},
semantic_tokens::{incomplete_tokens_to_deltas, LEGEND_TYPE},
telemetry::{AnalyticsEvent, CompositionTiming, TelemetryEvent},
};
use apollo_composition::Issue;
use apollo_federation_types::{config::SchemaSource, javascript::SubgraphDefinition};
use debounced::debounced;
use futures::{
channel::mpsc::{channel, Receiver, Sender},
future::join_all,
lock::Mutex,
SinkExt, StreamExt,
};
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, collections::HashMap};
use std::{sync::Arc, time::Duration};
use tower_lsp::{
async_trait, jsonrpc,
lsp_types::{self as lsp, notification::Notification, request::Request},
Client, ClientSocket, LanguageServer, LspService,
};
#[cfg(all(feature = "wasm", not(test)))]
use wasm_bindgen_futures::spawn_local as spawn;
#[cfg(any(feature = "tokio", test))]
use tokio::spawn;
/// Language identifiers handled by this server; `did_open` ignores documents whose
/// `language_id` is not in this list.
pub const LANGUAGE_IDS: [&str; 2] = ["apollo-graphql", "graphql"];
/// Server configuration, deserialized from the client's `initializationOptions`.
///
/// Field names are camelCase when (de)serialized, and any field missing from the
/// input falls back to the value from [`Default`] (`#[serde(default)]`).
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase", default)]
pub struct Config {
/// Workspace root. Overwritten during `initialize` with the client's `root_uri`,
/// or with `inmemory://` when the client sends none.
pub root_uri: String,
/// When `true`, opening, changing or closing a document requests a recomposition.
pub enable_auto_composition: bool,
/// Forwarded to the graph as `GraphConfig::force_federation`.
pub force_federation: bool,
/// When `true`, telemetry events are not sent to the client.
pub disable_telemetry: bool,
}
impl Default for Config {
    /// Baseline configuration: root URI `/` and every feature flag switched off.
    fn default() -> Self {
        let root_uri = String::from("/");
        Self {
            root_uri,
            enable_auto_composition: false,
            force_federation: false,
            disable_telemetry: false,
        }
    }
}
/// The language server handle registered with `tower_lsp`.
///
/// Cloning is cheap: every clone shares the same [`State`] through an `Arc`.
#[derive(Clone)]
pub struct ApolloLanguageServer {
// Shared with the background task that drains the debounced document-change queue.
state: Arc<State>,
}
/// Mutable server state shared between request handlers and the debouncer task.
/// Each field sits behind its own async mutex.
struct State {
// `None` until a document is opened, unless known subgraphs were supplied at construction.
graph: Mutex<Option<Graph>>,
// Handle used to send notifications (diagnostics, progress, telemetry) to the editor.
client: Mutex<Client>,
// Sending half of the channel on which subgraph definitions are submitted for composition.
request_composition: Mutex<Sender<Vec<SubgraphDefinition>>>,
// Current configuration; replaced during `initialize` and toggled by custom notifications.
config: Mutex<Config>,
// Entry point of the debounced queue used for large document updates.
document_change_queue_sender: Mutex<Sender<lsp::DidChangeTextDocumentParams>>,
}
/// Document text length (`String::len`, i.e. bytes) at or above which `did_change`
/// routes the update through the debounced queue instead of handling it immediately.
const DOCUMENT_SIZE_DEBOUNCE_THRESHOLD: usize = 50_000;
/// Interval passed to `debounced` for the document-change queue.
const DOCUMENT_UPDATE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(150);
impl ApolloLanguageServer {
/// Builds a server around `client`.
///
/// The graph starts out as a supergraph when `known_subgraphs` names at least one
/// subgraph and is otherwise left empty. The debounced document-change worker is
/// started before the server is returned.
fn new(
    client: Client,
    request_composition: Sender<Vec<SubgraphDefinition>>,
    config: Config,
    known_subgraphs: KnownSubgraphs,
) -> Self {
    let has_known_subgraphs = !known_subgraphs.by_name.is_empty();
    let initial_graph = has_known_subgraphs.then_some(Graph::Supergraph(Box::new(
        Supergraph::new(known_subgraphs),
    )));

    let (change_sender, change_receiver) = channel::<lsp::DidChangeTextDocumentParams>(1);

    let state = Arc::new(State {
        graph: Mutex::new(initial_graph),
        client: Mutex::new(client),
        request_composition: Mutex::new(request_composition),
        config: Mutex::new(config),
        document_change_queue_sender: Mutex::new(change_sender),
    });

    // The worker keeps its own handle to the shared state.
    State::initialize_document_update_debouncer(Arc::clone(&state), change_receiver);

    Self { state }
}
/// Creates the LSP service together with its client socket and the receiver on which
/// composition requests (lists of subgraph definitions) are delivered.
///
/// `known_subgraphs` maps subgraph names to their schema sources. File-backed sources
/// are additionally indexed by URL: relative paths are resolved against
/// `config.root_uri` (treated as a directory path), absolute paths are converted
/// directly.
///
/// # Panics
///
/// Panics if a file-backed source cannot be turned into a URL.
pub fn build_service(
    config: Config,
    known_subgraphs: HashMap<String, SchemaSource>,
) -> (
    LspService<Self>,
    ClientSocket,
    Receiver<Vec<SubgraphDefinition>>,
) {
    let (composition_request_sender, composition_request_receiver) =
        channel::<Vec<SubgraphDefinition>>(1);

    let by_name = known_subgraphs
        .iter()
        .map(|(name, source)| (name.clone(), source.clone()))
        .collect();

    let by_uri = known_subgraphs
        .iter()
        .filter_map(|(name, source)| {
            // Only file-backed subgraphs have a URL a document can be opened under.
            let SchemaSource::File { file } = source else {
                return None;
            };
            let uri = if file.is_relative() {
                lsp::Url::from_directory_path(&config.root_uri)
                    .expect("Failed to parse URL")
                    .join(file.as_str())
                    .expect("Failed to join URL")
            } else {
                lsp::Url::from_file_path(file).expect("Failed to convert path to URL")
            };
            Some((uri, name.clone()))
        })
        .collect();

    let known_subgraphs = KnownSubgraphs {
        root_uri: config.root_uri.clone(),
        by_name,
        by_uri,
    };

    let builder = LspService::build(|client| {
        ApolloLanguageServer::new(client, composition_request_sender, config, known_subgraphs)
    })
    .custom_method(
        ApolloConfigureAutoCompositionNotification::METHOD,
        ApolloLanguageServer::configure_auto_composition,
    )
    .custom_method(
        ApolloComposeServicesRequest::METHOD,
        ApolloLanguageServer::request_recompose,
    );
    let (service, client_socket) = builder.finish();

    (service, client_socket, composition_request_receiver)
}
/// Describes what this server supports: full-text document sync with open/close
/// events, completion (triggered by `@`, `|` and `&`), full-document semantic tokens,
/// hover and go-to-definition.
fn capabilities() -> lsp::InitializeResult {
    let text_document_sync =
        lsp::TextDocumentSyncCapability::Options(lsp::TextDocumentSyncOptions {
            change: Some(lsp::TextDocumentSyncKind::FULL),
            open_close: Some(true),
            ..Default::default()
        });

    let completion_provider = lsp::CompletionOptions {
        trigger_characters: Some(["@", "|", "&"].iter().copied().map(String::from).collect()),
        ..Default::default()
    };

    // NOTE(review): delta support is advertised here, but no delta handler is visible
    // in this file — confirm this is intended.
    let semantic_tokens_provider =
        lsp::SemanticTokensServerCapabilities::SemanticTokensOptions(lsp::SemanticTokensOptions {
            work_done_progress_options: lsp::WorkDoneProgressOptions {
                work_done_progress: None,
            },
            legend: lsp::SemanticTokensLegend {
                token_types: LEGEND_TYPE.into(),
                token_modifiers: vec![],
            },
            range: Some(false),
            full: Some(lsp::SemanticTokensFullOptions::Delta { delta: Some(true) }),
        });

    let capabilities = lsp::ServerCapabilities {
        text_document_sync: Some(text_document_sync),
        completion_provider: Some(completion_provider),
        semantic_tokens_provider: Some(semantic_tokens_provider),
        hover_provider: Some(lsp::HoverProviderCapability::Simple(true)),
        definition_provider: Some(lsp::OneOf::Left(true)),
        ..Default::default()
    };

    lsp::InitializeResult {
        capabilities,
        ..Default::default()
    }
}
/// Handler for the custom compose-services request: asks for a recomposition and
/// reports whether one was actually requested.
async fn request_recompose(
    &self,
    _params: ApolloComposeServicesParams,
) -> jsonrpc::Result<ApolloComposeServicesResponse> {
    let response = ApolloComposeServicesResponse {
        did_compose: self.state.maybe_recompose().await,
    };
    Ok(response)
}
/// Handler for the custom notification that switches auto-composition on or off.
/// After storing the new setting it attempts a recomposition, which only runs when
/// auto-composition is now enabled.
async fn configure_auto_composition(&self, params: ApolloConfigureAutoCompositionParams) {
    // The guard is a temporary, so the config lock is released before recomposing.
    self.state.config.lock().await.enable_auto_composition = params.enabled;
    self.state.maybe_auto_recompose().await;
}
/// Tells the client that composition has begun by sending a work-done progress
/// `Begin` notification under the composition progress token.
pub async fn composition_did_start(&self) {
    let begin = lsp::WorkDoneProgressBegin {
        title: "Composing services".to_string(),
        cancellable: Some(false),
        message: None,
        percentage: None,
    };
    let progress = lsp::ProgressParams {
        token: lsp::ProgressToken::String(APOLLO_COMPOSITION_PROGRESS_TOKEN.to_string()),
        value: lsp::ProgressParamsValue::WorkDone(lsp::WorkDoneProgress::Begin(begin)),
    };

    let client = self.state.client.lock().await;
    client
        .send_notification::<lsp::notification::Progress>(progress)
        .await;
}
/// Reports the outcome of a composition run.
///
/// `issues` are converted into diagnostics and published to the client. When `timing`
/// is provided, the elapsed time is also reported as a telemetry event (unless
/// telemetry is disabled). `_supergraph_sdl` is currently unused.
///
/// # Panics
///
/// Panics if the current graph is not a supergraph.
pub async fn composition_did_update(
    &self,
    _supergraph_sdl: Option<String>,
    issues: Vec<Issue>,
    timing: Option<Duration>,
) {
    self.state.composition_did_update(issues).await;

    let Some(timing) = timing else {
        return;
    };
    // `as_millis` yields a `u128`; saturate rather than silently wrap when narrowing.
    let value = timing.as_millis().min(u128::from(u64::MAX)) as u64;
    self.send_telemetry_if_enabled(TelemetryEvent::Analytics(
        AnalyticsEvent::CompositionTimeInMs(CompositionTiming { value }),
    ))
    .await;
}
/// Registers a subgraph with the supergraph under `name`, backed by `source`.
///
/// # Panics
///
/// Panics if there is no graph or the graph is not a supergraph.
pub async fn add_subgraph(&self, name: String, source: SchemaSource) {
    let mut graph = self.state.graph.lock().await;
    match graph.as_mut() {
        Some(Graph::Supergraph(supergraph)) => {
            supergraph.add_known_subgraph(name, source).await;
        }
        _ => panic!("Graph is unexpectedly not a supergraph"),
    }
}
/// Removes the subgraph called `name` from the supergraph's known subgraphs.
///
/// # Panics
///
/// Panics if there is no graph or the graph is not a supergraph.
pub async fn remove_subgraph(&self, name: &str) {
    let mut graph = self.state.graph.lock().await;
    match graph.as_mut() {
        Some(Graph::Supergraph(supergraph)) => {
            supergraph.remove_known_subgraph(name).await;
        }
        _ => panic!("Graph is unexpectedly not a supergraph"),
    }
}
/// Publishes host-supplied `diagnostics` for `uri` to the client.
///
/// The diagnostics are tagged with the document version the graph reports for `uri`,
/// or with no version when there is no graph or the graph has none for that URI.
pub async fn publish_diagnostics(&self, uri: lsp::Url, diagnostics: Vec<lsp::Diagnostic>) {
    // Look the version up in a single statement so the graph lock is released before
    // awaiting the client; other handlers need the graph while the send is in flight.
    let version = self
        .state
        .graph
        .lock()
        .await
        .as_ref()
        .and_then(|graph| graph.version_for_uri(&uri));

    self.state
        .client
        .lock()
        .await
        .publish_diagnostics(uri, diagnostics, version)
        .await;
}
/// Forwards `event` to the client as a telemetry event, unless the configuration has
/// telemetry disabled.
async fn send_telemetry_if_enabled(&self, event: TelemetryEvent) {
    let telemetry_disabled = self.state.config.lock().await.disable_telemetry;
    if telemetry_disabled {
        return;
    }
    let client = self.state.client.lock().await;
    client.telemetry_event(event).await;
}
/// Test-only snapshot of the current configuration.
#[cfg(test)]
async fn config(&self) -> Config {
    let current = self.state.config.lock().await;
    Config::clone(&current)
}
}
impl State {
/// Requests a recomposition, but only when auto-composition is enabled in the config.
async fn maybe_auto_recompose(&self) {
    let auto_composition_enabled = self.config.lock().await.enable_auto_composition;
    if auto_composition_enabled {
        self.maybe_recompose().await;
    }
}
/// Notifies the client whether composition is currently possible.
///
/// Does nothing unless the graph exposes a supergraph. Otherwise the notification
/// lists the invalid subgraphs and reports `can_compose` as true only when that list
/// is empty.
async fn maybe_send_can_compose_notification(&self) {
    let subgraphs_with_errors = {
        let graph = self.graph.lock().await;
        let Some(supergraph) = graph.as_ref().and_then(Graph::supergraph) else {
            return;
        };
        supergraph.get_invalid_subgraph_uris()
    };

    let can_compose = subgraphs_with_errors.is_empty();
    let notification = ApolloCanComposeNotificationParams {
        can_compose,
        subgraphs_with_errors,
    };

    let client = self.client.lock().await;
    client
        .send_notification::<ApolloCanComposeNotification>(notification)
        .await;
}
/// Submits the current subgraph definitions for composition.
///
/// Returns `false` without sending anything when the graph exposes no supergraph or
/// when its subgraphs are invalid; returns `true` once the request has been sent.
///
/// # Panics
///
/// Panics if the composition request channel rejects the message.
async fn maybe_recompose(&self) -> bool {
    let definitions = {
        let graph = self.graph.lock().await;
        let Some(supergraph) = graph.as_ref().and_then(Graph::supergraph) else {
            return false;
        };
        if supergraph.subgraphs_are_invalid() {
            return false;
        }
        supergraph.subgraph_definitions()
    };

    let mut composition_requests = self.request_composition.lock().await;
    composition_requests
        .send(definitions)
        .await
        .expect("Failed to send message");
    true
}
/// Publishes the diagnostics derived from composition `issues` and then ends the
/// composition progress report.
///
/// Diagnostics attributed to a subgraph are published against that subgraph's URL and
/// version; the remainder are published against the configured root URI with
/// version 0.
///
/// # Panics
///
/// Panics if the graph is not a supergraph, or if the configured root URI is not a
/// valid URL.
async fn composition_did_update(&self, issues: Vec<Issue>) {
    let (per_subgraph, unattributed) = {
        let graph = self.graph.lock().await;
        let Some(Graph::Supergraph(supergraph)) = graph.as_ref() else {
            panic!("Programming error: called `composition_did_update` when graph is not a supergraph.");
        };
        supergraph.diagnostics_for_composition(issues)
    };

    let client = self.client.lock().await;

    // Send every per-subgraph publication concurrently.
    let publications = per_subgraph
        .into_iter()
        .map(|(url, (diagnostics, version))| {
            client.publish_diagnostics(url, diagnostics, Some(version))
        });
    join_all(publications).await;

    let root_url =
        lsp::Url::parse(&self.config.lock().await.root_uri).expect("Failed to parse URL");
    client
        .publish_diagnostics(root_url, unattributed, Some(0))
        .await;

    let finished = lsp::ProgressParams {
        token: lsp::ProgressToken::String(APOLLO_COMPOSITION_PROGRESS_TOKEN.to_string()),
        value: lsp::ProgressParamsValue::WorkDone(lsp::WorkDoneProgress::End(
            lsp::WorkDoneProgressEnd { message: None },
        )),
    };
    client
        .send_notification::<lsp::notification::Progress>(finished)
        .await;
}
/// Spawns the background task that reads document changes from the debounced queue
/// and applies each one that comes out of it, until the queue is closed.
fn initialize_document_update_debouncer(
    state: Arc<State>,
    document_change_queue_receiver: Receiver<lsp::DidChangeTextDocumentParams>,
) {
    let worker = async move {
        let mut pending_changes = debounced(
            document_change_queue_receiver,
            DOCUMENT_UPDATE_DEBOUNCE_INTERVAL,
        );
        while let Some(change) = pending_changes.next().await {
            state.handle_document_update(change).await;
        }
    };
    spawn(worker);
}
/// Applies a document change to the graph, publishes the resulting diagnostics for
/// that document, then triggers auto-recomposition and the can-compose notification.
///
/// A notification without any content change is ignored.
///
/// # Panics
///
/// Panics if no graph exists yet, i.e. the change arrives before any document was
/// opened.
async fn handle_document_update(&self, data: lsp::DidChangeTextDocumentParams) {
    let lsp::DidChangeTextDocumentParams {
        text_document,
        content_changes,
    } = data;
    let lsp::VersionedTextDocumentIdentifier { uri, version } = text_document;

    // The server requests FULL sync, so every entry carries the complete document and
    // the changes apply in order: the last entry is the document's final state.
    let Some(text) = content_changes.into_iter().last().map(|change| change.text) else {
        return;
    };

    let graph_config = {
        let config = self.config.lock().await;
        GraphConfig {
            force_federation: config.force_federation,
        }
    };

    let (diagnostics, version) = {
        let mut graph = self.graph.lock().await;
        let Some(graph) = graph.as_mut() else {
            panic!("Attempted to change a document that hasn't been opened");
        };
        graph.update(uri.clone(), text, version, graph_config);
        graph.diagnostics_for_uri(&uri)
    };

    self.client
        .lock()
        .await
        .publish_diagnostics(uri, diagnostics, Some(version))
        .await;

    self.maybe_auto_recompose().await;
    self.maybe_send_can_compose_notification().await;
}
}
#[async_trait]
impl LanguageServer for ApolloLanguageServer {
async fn initialize(
&self,
initialize_params: lsp::InitializeParams,
) -> jsonrpc::Result<lsp::InitializeResult> {
let options: Option<Config> = initialize_params
.initialization_options
.map(|options| {
serde_json::from_value(options).map_err(|err| jsonrpc::Error {
message: Cow::from(err.to_string()),
code: jsonrpc::ErrorCode::InvalidParams,
data: None,
})
})
.transpose()?;
let mut config = self.state.config.lock().await;
if let Some(options) = options {
*config = options;
}
config.root_uri = initialize_params
.root_uri
.map(|uri| uri.to_string())
.unwrap_or("inmemory://".to_string());
Ok(ApolloLanguageServer::capabilities())
}
/// Handles `initialized` by reporting an `Initialized` analytics event, subject to
/// the telemetry setting.
async fn initialized(&self, _: lsp::InitializedParams) {
    let event = TelemetryEvent::Analytics(AnalyticsEvent::Initialized);
    self.send_telemetry_if_enabled(event).await;
}
/// Handles `shutdown`. There is nothing to tear down here, so it always succeeds.
async fn shutdown(&self) -> jsonrpc::Result<()> {
Ok(())
}
/// Handles `textDocument/didOpen`.
///
/// Documents whose language id is not in [`LANGUAGE_IDS`] are ignored. Otherwise the
/// document is added to the existing graph, or a new graph is created from it when
/// none exists yet. Diagnostics for the document are published only when there are
/// any. Finally auto-recomposition and the can-compose notification are triggered.
async fn did_open(&self, params: lsp::DidOpenTextDocumentParams) {
    let lsp::TextDocumentItem {
        uri,
        version,
        text,
        language_id,
    } = params.text_document;

    if !LANGUAGE_IDS.contains(&language_id.as_str()) {
        return;
    }

    let graph_config = {
        let config = self.state.config.lock().await;
        GraphConfig {
            force_federation: config.force_federation,
        }
    };

    let (diagnostics_for_uri, version) = {
        let mut slot = self.state.graph.lock().await;
        // Either update the graph in place or create it, ending up with a reference
        // to a graph that is guaranteed to exist.
        let graph = match slot.as_mut() {
            Some(existing) => {
                existing.update(uri.clone(), text, version, graph_config);
                existing
            }
            None => slot.insert(Graph::new(
                uri.clone(),
                text,
                KnownSubgraphs::default(),
                graph_config,
            )),
        };
        graph.diagnostics_for_uri(&uri)
    };

    if !diagnostics_for_uri.is_empty() {
        self.state
            .client
            .lock()
            .await
            .publish_diagnostics(uri, diagnostics_for_uri, Some(version))
            .await;
    }

    self.state.maybe_auto_recompose().await;
    self.state.maybe_send_can_compose_notification().await;
}
/// Handles `textDocument/didChange`.
///
/// Documents shorter than [`DOCUMENT_SIZE_DEBOUNCE_THRESHOLD`] are processed
/// immediately; larger ones are sent through the debounced queue. A notification
/// without any content change is ignored.
///
/// # Panics
///
/// Panics if the debounced queue rejects the message.
async fn did_change(&self, params: lsp::DidChangeTextDocumentParams) {
    // With FULL sync the last entry holds the document's final text. Using `last()`
    // instead of indexing also avoids a panic on an empty change list.
    let Some(text_len) = params
        .content_changes
        .last()
        .map(|change| change.text.len())
    else {
        return;
    };

    if text_len < DOCUMENT_SIZE_DEBOUNCE_THRESHOLD {
        self.state.handle_document_update(params).await;
    } else {
        self.state
            .document_change_queue_sender
            .lock()
            .await
            .send(params)
            .await
            .expect("Failed to send message")
    }
}
async fn did_close(&self, params: lsp::DidCloseTextDocumentParams) {
{
let mut graph = self.state.graph.lock().await;
match &mut *graph {
Some(Graph::Monolith(_)) => {
*graph = None;
}
Some(Graph::Supergraph(supergraph)) => {
if supergraph.remove(¶ms.text_document.uri).is_none() {
panic!("Subgraph is unexpectedly None");
}
}
None => panic!("Graph is unexpectedly None"),
}
}
self.state.maybe_auto_recompose().await;
self.state.maybe_send_can_compose_notification().await;
}
async fn completion(
&self,
params: lsp::CompletionParams,
) -> jsonrpc::Result<Option<lsp::CompletionResponse>> {
let graph = self.state.graph.lock().await;
Ok(graph.as_ref().and_then(|graph| {
graph.completions(
¶ms.text_document_position.text_document.uri,
params.text_document_position.position,
)
}))
}
/// Handles `textDocument/semanticTokens/full`.
///
/// Returns `None` when there is no graph. Otherwise returns the document's tokens
/// (an empty list when the graph has none for it), converted with
/// `incomplete_tokens_to_deltas` and sent without a result id.
async fn semantic_tokens_full(
    &self,
    params: lsp::SemanticTokensParams,
) -> jsonrpc::Result<Option<lsp::SemanticTokensResult>> {
    let graph = self.state.graph.lock().await;
    Ok(graph.as_ref().map(|graph| {
        let tokens = graph
            .semantic_tokens_full(&params.text_document.uri)
            .unwrap_or_default();
        lsp::SemanticTokensResult::Tokens(lsp::SemanticTokens {
            result_id: None,
            data: incomplete_tokens_to_deltas(tokens),
        })
    }))
}
async fn hover(&self, params: lsp::HoverParams) -> jsonrpc::Result<Option<lsp::Hover>> {
let graph = self.state.graph.lock().await;
Ok(graph.as_ref().and_then(|graph| {
graph.on_hover(
¶ms.text_document_position_params.text_document.uri,
¶ms.text_document_position_params.position,
)
}))
}
async fn goto_definition(
&self,
params: lsp::GotoDefinitionParams,
) -> jsonrpc::Result<Option<lsp::GotoDefinitionResponse>> {
let graph = self.state.graph.lock().await;
Ok(graph.as_ref().map(|graph| {
lsp::GotoDefinitionResponse::from(
graph
.goto_definition(
¶ms.text_document_position_params.text_document.uri,
¶ms.text_document_position_params.position,
)
.unwrap_or_default(),
)
}))
}
}
#[cfg(test)]
mod tests {
use super::*;
use apollo_composition::{Severity, SubgraphLocation};
use lsp::{InitializeParams, PublishDiagnosticsParams};
use serde_json::{from_value, json, to_value};
use tokio::{sync::Mutex, task::yield_now};
use tower::{Service, ServiceExt};
use tower_lsp::{jsonrpc, LspService, Server};
/// Builds a minimal `initialize` request with empty capabilities and the given id.
fn initialize_request(id: i64) -> jsonrpc::Request {
    let params = json!({"capabilities":{}});
    jsonrpc::Request::build("initialize")
        .params(params)
        .id(id)
        .finish()
}
/// Builds an `initialize` request that enables auto-composition, disables telemetry
/// and sets the root URI to `/path/to/project`.
fn initialize_request_with_server_configurations(id: i64) -> jsonrpc::Request {
    let initialize_params = InitializeParams {
        initialization_options: Some(json!({
            "enableAutoComposition": true,
            "disableTelemetry": true,
        })),
        process_id: None,
        root_uri: Some(lsp::Url::from_directory_path("/path/to/project").unwrap()),
        ..Default::default()
    };
    let params = to_value(initialize_params).unwrap();
    jsonrpc::Request::build("initialize")
        .params(params)
        .id(id)
        .finish()
}
/// Builds the `initialized` notification (no id, empty params).
fn initialized_notification() -> jsonrpc::Request {
    let builder = jsonrpc::Request::build("initialized");
    builder.params(json!({})).finish()
}
/// Builds a `shutdown` message without an id or params.
fn shutdown_request() -> jsonrpc::Request {
    let builder = jsonrpc::Request::build("shutdown");
    builder.finish()
}
fn server_capabilities() -> serde_json::Value {
serde_json::to_value(ApolloLanguageServer::capabilities()).unwrap()
}
/// Waits for the service to be ready, sends `request` to it and returns its response
/// (if any).
async fn request(
    server: &mut LspService<ApolloLanguageServer>,
    request: jsonrpc::Request,
) -> Option<jsonrpc::Response> {
    let ready_server = server.ready().await.unwrap();
    ready_server.call(request).await.unwrap()
}
#[allow(clippy::type_complexity)]
fn listen_to_channels(
mut socket: ClientSocket,
mut composition_listener: Receiver<Vec<SubgraphDefinition>>,
) -> (
Arc<Mutex<Vec<jsonrpc::Request>>>,
Arc<Mutex<Vec<Vec<SubgraphDefinition>>>>,
) {
let socket_messages = Arc::new(Mutex::new(Vec::default()));
let composition_listener_messages = Arc::new(Mutex::new(Vec::default()));
let socket_messages_inner = socket_messages.clone();
let composition_listener_messages_inner = composition_listener_messages.clone();
tokio::spawn(async move {
while let Some(data) = socket.next().await {
socket_messages_inner.lock().await.push(data)
}
});
tokio::spawn(async move {
while let Some(data) = composition_listener.next().await {
composition_listener_messages_inner.lock().await.push(data)
}
});
(socket_messages, composition_listener_messages)
}
/// `initialize` answers with the server capabilities; the `initialized` and
/// `shutdown` messages built by the helpers produce no response.
#[tokio::test]
async fn initialization_and_shutdown() {
let (mut server, ..) =
ApolloLanguageServer::build_service(Config::default(), HashMap::new());
assert_eq!(
request(&mut server, initialize_request(1)).await,
Some(jsonrpc::Response::from_ok(1.into(), server_capabilities()))
);
assert_eq!(request(&mut server, initialized_notification()).await, None);
assert_eq!(request(&mut server, shutdown_request()).await, None);
assert_eq!(request(&mut server, shutdown_request()).await, None);
}
/// Initialization options sent by the client are applied to the server config, and
/// the root URI is taken from the request.
#[tokio::test]
async fn initialization_with_client_config() {
let (mut server, ..) =
ApolloLanguageServer::build_service(Config::default(), HashMap::new());
assert_eq!(
request(
&mut server,
initialize_request_with_server_configurations(1)
)
.await,
Some(jsonrpc::Response::from_ok(1.into(), server_capabilities()))
);
assert!(server.inner().config().await.enable_auto_composition);
assert_eq!(
server.inner().config().await.root_uri,
"file:///path/to/project/".to_string()
);
assert_eq!(request(&mut server, initialized_notification()).await, None);
}
/// When the client sends no root URI, the server falls back to `inmemory://`.
#[tokio::test]
async fn initialization_without_root_uri_specified() {
let (mut server, ..) =
ApolloLanguageServer::build_service(Config::default(), HashMap::new());
assert_eq!(
request(&mut server, initialize_request(1)).await,
Some(jsonrpc::Response::from_ok(1.into(), server_capabilities()))
);
assert_eq!(
server.inner().state.config.lock().await.root_uri,
"inmemory://".to_string()
);
assert_eq!(request(&mut server, initialized_notification()).await, None);
}
/// Opens a document and then changes it, with auto-composition enabled, and checks
/// both the exact messages sent to the client and the composition requests emitted.
#[tokio::test]
async fn diagnostics() {
let (mut server, socket, composition_listener) =
ApolloLanguageServer::build_service(Config::default(), HashMap::new());
let (socket_messages, composition_listener_messages) =
listen_to_channels(socket, composition_listener);
request(
&mut server,
initialize_request_with_server_configurations(1),
)
.await;
request(&mut server, initialized_notification()).await;
request(
&mut server,
jsonrpc::Request::build("textDocument/didOpen")
.params(json!({
"textDocument": {
"uri": "file:///path/to/file",
"version": 1,
"languageId": "apollo-graphql",
"text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String }"
}
}))
.finish(),
)
.await;
request(
&mut server,
jsonrpc::Request::build("textDocument/didChange")
.params(json!({
"textDocument": {
"uri": "file:///path/to/file",
"version": 2,
},
"contentChanges": [{
"text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String! }"
}]
}))
.finish(),
)
.await;
// Give the listener tasks a chance to record the messages before inspecting them.
yield_now().await;
let messages = &*socket_messages.lock().await;
// Opening yields only a can-compose notification (no diagnostics to publish);
// the change publishes (empty) diagnostics for version 2 and notifies again.
assert_eq!(
messages,
&[
jsonrpc::Request::build("apollo/canCompose")
.params(json!({"canCompose": true, "subgraphsWithErrors": []}))
.finish(),
jsonrpc::Request::build("textDocument/publishDiagnostics")
.params(json!({"uri": "file:///path/to/file", "diagnostics": [], "version": 2}))
.finish(),
jsonrpc::Request::build("apollo/canCompose")
.params(json!({"canCompose": true, "subgraphsWithErrors": []}))
.finish(),
]
);
let composition_messages = &*composition_listener_messages.lock().await;
// One composition request per document state: the opened text, then the changed text.
assert_eq!(composition_messages, &[
[
SubgraphDefinition {
name: "file:///path/to/file".into(),
url: "file:///path/to/file".into(),
sdl: "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String }".into()
}
],[
SubgraphDefinition {
name: "file:///path/to/file".into(),
url: "file:///path/to/file".into(),
sdl: "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String! }".into()
}
]
]);
}
/// Runs the server behind a real `tower_lsp::Server` over in-memory streams and checks
/// that a framed `initialize` request produces the expected framed response.
#[tokio::test]
async fn runs_in_tower_lsp_server() {
// Wraps a JSON body in the `Content-Length` framing used on the wire.
fn mock_msg(msg: &str) -> String {
format!("Content-Length: {}\r\n\r\n{}", msg.len(), msg)
}
let mock_request = mock_msg(
r#"{"jsonrpc":"2.0","method":"initialize","params":{"capabilities":{}},"id":0}"#,
);
let mock_response = mock_msg(
&serde_json::to_string(&jsonrpc::Response::from_ok(0.into(), server_capabilities()))
.unwrap(),
);
let (sender, _) = channel::<Vec<SubgraphDefinition>>(1);
let (service, socket) = LspService::new(|client| {
ApolloLanguageServer::new(client, sender, Config::default(), Default::default())
});
let mut stdout = Vec::new();
Server::new(&mut mock_request.as_bytes(), &mut stdout, socket)
.serve(service)
.await;
let stdout = String::from_utf8(stdout).unwrap();
assert_eq!(stdout, mock_response);
}
/// With auto-composition enabled, opening two documents and closing one of them each
/// results in a composition request.
#[tokio::test]
async fn did_close() {
let (mut server, _, mut composition_listener) =
ApolloLanguageServer::build_service(Config::default(), HashMap::new());
request(
&mut server,
initialize_request_with_server_configurations(1),
)
.await;
request(&mut server, initialized_notification()).await;
request(
&mut server,
jsonrpc::Request::build("textDocument/didOpen")
.params(json!({
"textDocument": {
"uri": "file:///path/to/file1",
"version": 1,
"languageId": "apollo-graphql",
"text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String }"
}
}))
.finish(),
)
.await;
// Drain the composition request triggered by the first open.
composition_listener.next().await.unwrap();
request(
&mut server,
jsonrpc::Request::build("textDocument/didOpen")
.params(json!({
"textDocument": {
"uri": "file:///path/to/file2",
"version": 1,
"languageId": "apollo-graphql",
"text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { goodbye: String! }"
},
}))
.finish(),
)
.await;
composition_listener.next().await.unwrap();
request(
&mut server,
jsonrpc::Request::build("textDocument/didClose")
.params(json!({
"textDocument": {
"uri": "file:///path/to/file1",
}
}))
.finish(),
)
.await;
// Closing must also produce a composition request.
composition_listener.next().await.unwrap();
}
/// The custom auto-composition notification flips `enable_auto_composition` off and
/// back on.
#[tokio::test]
async fn custom_notification_toggle_auto_composition() {
let (mut server, _, _) =
ApolloLanguageServer::build_service(Config::default(), HashMap::new());
request(
&mut server,
initialize_request_with_server_configurations(1),
)
.await;
request(&mut server, initialized_notification()).await;
assert!(server.inner().config().await.enable_auto_composition);
request(
&mut server,
jsonrpc::Request::build(ApolloConfigureAutoCompositionNotification::METHOD)
.params(json!({"enabled": false}))
.finish(),
)
.await;
assert!(!server.inner().config().await.enable_auto_composition);
request(
&mut server,
jsonrpc::Request::build(ApolloConfigureAutoCompositionNotification::METHOD)
.params(json!({"enabled": true}))
.finish(),
)
.await;
assert!(server.inner().config().await.enable_auto_composition);
}
/// The custom compose-services request triggers a second composition request with the
/// same subgraph definitions as the one produced by opening the document.
#[tokio::test]
async fn custom_notification_trigger_recomposition() {
let (mut server, socket, composition_listener) =
ApolloLanguageServer::build_service(Config::default(), HashMap::new());
let (socket_messages, composition_listener_messages) =
listen_to_channels(socket, composition_listener);
request(
&mut server,
initialize_request_with_server_configurations(1),
)
.await;
request(&mut server, initialized_notification()).await;
request(
&mut server,
jsonrpc::Request::build("textDocument/didOpen")
.params(json!({
"textDocument": {
"uri": "file:///path/to/file1",
"version": 1,
"languageId": "apollo-graphql",
"text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String }"
}
}))
.finish(),
).await;
request(
&mut server,
jsonrpc::Request::build(ApolloComposeServicesRequest::METHOD)
.id(100)
.params(json!({}))
.finish(),
)
.await;
let messages = &*socket_messages.lock().await;
assert_eq!(
messages,
&[jsonrpc::Request::build("apollo/canCompose")
.params(json!({"canCompose": true, "subgraphsWithErrors": []}))
.finish(),]
);
let composition_messages = &*composition_listener_messages.lock().await;
// First entry comes from the open, second from the explicit compose request.
assert_eq!(composition_messages, &[
[
SubgraphDefinition {
name: "file:///path/to/file1".into(),
url: "file:///path/to/file1".into(),
sdl: "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String }".into()
}
],
[
SubgraphDefinition {
name: "file:///path/to/file1".into(),
url: "file:///path/to/file1".into(),
sdl: "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String }".into()
}
]
]);
}
/// Documents with the plain `graphql` language id are handled too: open, change and
/// close each result in a composition request.
#[tokio::test]
async fn with_graphql_language_id() {
let (mut server, _, mut composition_listener) =
ApolloLanguageServer::build_service(Config::default(), HashMap::new());
request(
&mut server,
initialize_request_with_server_configurations(1),
)
.await;
request(&mut server, initialized_notification()).await;
request(
&mut server,
jsonrpc::Request::build("textDocument/didOpen")
.params(json!({
"textDocument": {
"uri": "file:///path/to/file1",
"version": 1,
"languageId": "graphql",
"text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String }"
}
}))
.finish(),
)
.await;
composition_listener.next().await.unwrap();
request(
&mut server,
jsonrpc::Request::build("textDocument/didChange")
.params(json!({
"textDocument": {
"uri": "file:///path/to/file1",
"version": 2,
},
"contentChanges": [{
"text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String! }"
}]
}))
.finish(),
)
.await;
composition_listener.next().await.unwrap();
request(
&mut server,
jsonrpc::Request::build("textDocument/didClose")
.params(json!({
"textDocument": {
"uri": "file:///path/to/file1",
}
}))
.finish(),
)
.await;
composition_listener.next().await.unwrap();
}
/// With two file-backed subgraphs and one remote subgraph configured, and both files
/// opened, composition issues are published as three diagnostics notifications: one
/// for the project root and one for each local file.
#[tokio::test]
async fn publishing_diagnostics_to_known_subgraphs() {
let (mut service, socket, composition_listener) = ApolloLanguageServer::build_service(
Config {
root_uri: "/path/to/project".into(),
..Default::default()
},
[
(
"local".into(),
SchemaSource::File {
file: "local.graphql".into(),
},
),
(
"local_relative".into(),
SchemaSource::File {
file: "../../local_relative.graphql".into(),
},
),
(
"remote".into(),
SchemaSource::Subgraph {
graphref: "testing@current".into(),
subgraph: "remote".into(),
},
),
]
.into_iter()
.collect(),
);
let (socket_messages, _) = listen_to_channels(socket, composition_listener);
request(
&mut service,
initialize_request_with_server_configurations(1),
)
.await;
request(&mut service, initialized_notification()).await;
request(&mut service, jsonrpc::Request::build("textDocument/didOpen")
.params(json!({
"textDocument": {
"uri": "file:///path/to/project/local.graphql",
"version": 1,
"languageId": "graphql",
"text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello: String }"
}
}))
.finish()).await;
request(&mut service, jsonrpc::Request::build("textDocument/didOpen")
.params(json!({
"textDocument": {
"uri": "file:///path/local_relative.graphql",
"version": 1,
"languageId": "graphql",
"text": "extend schema @link(url: \"https://specs.apollo.dev/federation/v2.10\")\ntype Query { hello2: String }"
}
}))
.finish()).await;
// Report one issue per configured subgraph.
service
.inner()
.composition_did_update(
None,
vec![
Issue {
code: "TEST_REMOTE".into(),
message: "Test issue".into(),
locations: vec![SubgraphLocation {
subgraph: Some("remote".into()),
range: None,
}],
severity: Severity::Error,
},
Issue {
code: "TEST_LOCAL".into(),
message: "Test issue".into(),
locations: vec![SubgraphLocation {
subgraph: Some("local".into()),
range: None,
}],
severity: Severity::Error,
},
Issue {
code: "TEST_LOCAL_RELATIVE".into(),
message: "Test issue".into(),
locations: vec![SubgraphLocation {
subgraph: Some("local_relative".into()),
range: None,
}],
severity: Severity::Error,
},
],
None,
)
.await;
let lock = socket_messages.lock().await;
let diagnostics = lock
.iter()
.filter(|msg| msg.method() == "textDocument/publishDiagnostics")
.map(|msg| {
from_value::<PublishDiagnosticsParams>(msg.params().unwrap().clone()).unwrap()
})
.collect::<Vec<_>>();
assert_eq!(diagnostics.len(), 3);
assert!(diagnostics
.iter()
.any(|diag| diag.uri.as_str() == "file:///path/to/project/"));
assert!(diagnostics
.iter()
.any(|diag| diag.uri.as_str() == "file:///path/to/project/local.graphql"));
assert!(diagnostics
.iter()
.any(|diag| diag.uri.as_str() == "file:///path/local_relative.graphql"));
}
/// Same setup as the known-subgraphs test but without opening any document:
/// composition issues are still published as three diagnostics notifications, for the
/// project root and for each configured local file.
#[tokio::test]
async fn publishing_diagnostics_to_known_subgraphs_without_opening_files() {
let (mut service, socket, composition_listener) = ApolloLanguageServer::build_service(
Config {
root_uri: "/path/to/project".into(),
..Default::default()
},
[
(
"local".into(),
SchemaSource::File {
file: "local.graphql".into(),
},
),
(
"local_relative".into(),
SchemaSource::File {
file: "../../local_relative.graphql".into(),
},
),
(
"remote".into(),
SchemaSource::Subgraph {
graphref: "testing@current".into(),
subgraph: "remote".into(),
},
),
]
.into_iter()
.collect(),
);
let (socket_messages, _) = listen_to_channels(socket, composition_listener);
request(
&mut service,
initialize_request_with_server_configurations(1),
)
.await;
request(&mut service, initialized_notification()).await;
service
.inner()
.composition_did_update(
None,
vec![
Issue {
code: "TEST_REMOTE".into(),
message: "Test issue".into(),
locations: vec![SubgraphLocation {
subgraph: Some("remote".into()),
range: None,
}],
severity: Severity::Error,
},
Issue {
code: "TEST_LOCAL".into(),
message: "Test issue".into(),
locations: vec![SubgraphLocation {
subgraph: Some("local".into()),
range: None,
}],
severity: Severity::Error,
},
Issue {
code: "TEST_LOCAL_RELATIVE".into(),
message: "Test issue".into(),
locations: vec![SubgraphLocation {
subgraph: Some("local_relative".into()),
range: None,
}],
severity: Severity::Error,
},
],
None,
)
.await;
let lock = socket_messages.lock().await;
let diagnostics = lock
.iter()
.filter(|msg| msg.method() == "textDocument/publishDiagnostics")
.map(|msg| {
from_value::<PublishDiagnosticsParams>(msg.params().unwrap().clone()).unwrap()
})
.collect::<Vec<_>>();
assert_eq!(diagnostics.len(), 3);
assert!(diagnostics
.iter()
.any(|diag| diag.uri.as_str() == "file:///path/to/project/"));
assert!(diagnostics
.iter()
.any(|diag| diag.uri.as_str() == "file:///path/to/project/local.graphql"));
assert!(diagnostics
.iter()
.any(|diag| diag.uri.as_str() == "file:///path/local_relative.graphql"));
}
/// The host can push its own diagnostics through `publish_diagnostics`, and they reach
/// the client as a single notification for the given URI.
#[tokio::test]
async fn host_can_publish_diagnostics() {
let (mut service, socket, composition_listener) = ApolloLanguageServer::build_service(
Config {
root_uri: "/path/to/project".into(),
..Default::default()
},
HashMap::new(),
);
let (socket_messages, _) = listen_to_channels(socket, composition_listener);
request(
&mut service,
initialize_request_with_server_configurations(1),
)
.await;
request(&mut service, initialized_notification()).await;
let diagnostics = vec![lsp::Diagnostic::new_simple(
lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
"supergraph.yaml missing fields".into(),
)];
service
.inner()
.publish_diagnostics(
lsp::Url::from_file_path("/path/to/project/supergraph.yaml").unwrap(),
diagnostics,
)
.await;
// Give the socket listener task a chance to record the notification.
yield_now().await;
let lock = socket_messages.lock().await;
let diagnostics_published_to_client = lock
.iter()
.filter(|msg| msg.method() == "textDocument/publishDiagnostics")
.map(|msg| {
from_value::<PublishDiagnosticsParams>(msg.params().unwrap().clone()).unwrap()
})
.collect::<Vec<_>>();
assert_eq!(diagnostics_published_to_client.len(), 1);
assert_eq!(
diagnostics_published_to_client[0].uri.as_str(),
"file:///path/to/project/supergraph.yaml"
);
}
}