use crate::services::async_bridge::{
AsyncBridge, AsyncMessage, LspMessageType, LspProgressValue, LspSemanticTokensResponse,
LspServerStatus,
};
use crate::services::process_limits::ProcessLimits;
use lsp_types::{
notification::{
DidChangeTextDocument, DidCloseTextDocument, DidOpenTextDocument, DidSaveTextDocument,
Initialized, Notification, PublishDiagnostics,
},
request::{Initialize, Request},
ClientCapabilities, DidChangeTextDocumentParams, DidCloseTextDocumentParams,
DidOpenTextDocumentParams, DidSaveTextDocumentParams, InitializeParams, InitializeResult,
InitializedParams, PartialResultParams, Position, PublishDiagnosticsParams, Range,
SemanticTokenModifier, SemanticTokenType, SemanticTokensClientCapabilities,
SemanticTokensClientCapabilitiesRequests, SemanticTokensFullOptions, SemanticTokensParams,
SemanticTokensResult, SemanticTokensServerCapabilities, ServerCapabilities,
TextDocumentContentChangeEvent, TextDocumentIdentifier, TextDocumentItem,
TextDocumentPositionParams, TokenFormat, Uri, VersionedTextDocumentIdentifier,
WindowClientCapabilities, WorkDoneProgressParams, WorkspaceFolder,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::{mpsc as std_mpsc, Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::process::{ChildStdin, ChildStdout};
use tokio::sync::{mpsc, oneshot};
/// Table of in-flight JSON-RPC requests, keyed by the JSON-RPC request id.
///
/// Each entry holds the one-shot sender used to hand the outcome (a result value or
/// an error string) back to the task that is awaiting that request.
type PendingRequests = Arc<Mutex<HashMap<i64, oneshot::Sender<Result<Value, String>>>>>;
/// Minimum time, in milliseconds, that must have elapsed since a document's `didOpen`
/// before a `didChange` for the same document is sent.
const DID_OPEN_GRACE_PERIOD_MS: u64 = 200;
/// Timeout, in milliseconds, used for requests sent through the default-timeout helpers.
const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30_000;
/// Response error code reported when the document content changed while the
/// request was being processed.
const LSP_ERROR_CONTENT_MODIFIED: i64 = -32801;
/// Response error code reported when the server itself cancelled the request.
const LSP_ERROR_SERVER_CANCELLED: i64 = -32802;

/// Returns `true` for response error codes that are expected during normal
/// editing and therefore should not be surfaced as warnings.
fn is_suppressed_error_code(code: i64) -> bool {
    matches!(
        code,
        LSP_ERROR_CONTENT_MODIFIED | LSP_ERROR_SERVER_CANCELLED
    )
}
/// Logs an error response received from a language server.
///
/// Codes accepted by `is_suppressed_error_code` are logged at `debug` level;
/// every other code is logged at `warn` level.
fn log_response_error(code: i64, message: &str, server_name: &str, language: &str) {
    if !is_suppressed_error_code(code) {
        tracing::warn!(
            "LSP response error from '{}' ({}): {} (code {})",
            server_name,
            language,
            message,
            code
        );
        return;
    }

    // Expected, benign error: keep it out of the warning stream.
    tracing::debug!(
        "LSP response from '{}' ({}): {} (code {}), discarding",
        server_name,
        language,
        message,
        code
    );
}
/// Reports whether a `didOpen` for `path` should be skipped because the document
/// is already present in `document_versions`.
///
/// * `document_versions` - shared map of tracked document paths to their versions.
/// * `path` - filesystem path of the document; accepts any `&Path` (a `&PathBuf`
///   coerces automatically).
/// * `language` / `uri` - only used for the debug log line.
///
/// Returns `true` when the document is already tracked (and logs the skip),
/// `false` otherwise.
///
/// # Panics
/// Panics if the `document_versions` mutex is poisoned.
fn should_skip_did_open(
    document_versions: &Arc<std::sync::Mutex<HashMap<PathBuf, i64>>>,
    path: &std::path::Path,
    language: &str,
    uri: &Uri,
) -> bool {
    // The guard is a temporary, so the lock is released before logging.
    let already_open = document_versions.lock().unwrap().contains_key(path);
    if already_open {
        tracing::debug!(
            "LSP ({}): skipping didOpen - document already open: {}",
            language,
            uri.as_str()
        );
    }
    already_open
}
/// Any JSON-RPC message exchanged with the language server.
///
/// The enum is `untagged`, so deserialization tries the variants in declaration
/// order and keeps the first one whose fields match.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum JsonRpcMessage {
/// A message carrying both an `id` and a `method`.
Request(JsonRpcRequest),
/// A message carrying an `id` plus an optional `result` / `error`.
Response(JsonRpcResponse),
/// A message carrying a `method` but no `id`.
Notification(JsonRpcNotification),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonRpcRequest {
pub jsonrpc: String,
pub id: i64,
pub method: String,
pub params: Option<Value>,
}
/// A JSON-RPC response to the request with the same `id`.
// NOTE(review): both `result` and `error` are skipped when `None`, so a response
// built with neither set serializes without either member — confirm that callers
// always populate one of them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonRpcResponse {
/// Protocol version string.
pub jsonrpc: String,
/// Identifier of the request this response answers.
pub id: i64,
/// Result payload; left out of the serialized message when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<Value>,
/// Error payload; left out of the serialized message when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<JsonRpcError>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonRpcNotification {
pub jsonrpc: String,
pub method: String,
pub params: Option<Value>,
}
/// Error object carried by a failed [`JsonRpcResponse`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonRpcError {
/// Numeric error code.
pub code: i64,
/// Human-readable error description.
pub message: String,
/// Optional additional error data; left out of the serialized message when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<Value>,
}
/// Lifecycle state of an LSP client.
///
/// Which transitions between states are legal is decided by
/// `LspClientState::can_transition_to`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LspClientState {
/// Nothing has been started yet; may only advance to `Starting` (or `Error`).
Initial,
/// Start-up is in progress; advances to `Initializing`.
Starting,
/// Initialization is in progress; advances to `Running`.
Initializing,
/// The only state in which `can_send_requests` returns `true`.
Running,
/// Shutdown is in progress; advances to `Stopped`.
Stopping,
/// Fully stopped; may be started again.
Stopped,
/// Failure state; reachable from every state, and may go to `Stopping` or `Starting`.
Error,
}
impl LspClientState {
pub fn can_transition_to(&self, next: LspClientState) -> bool {
use LspClientState::*;
match (self, next) {
(Initial, Starting) => true,
(Starting, Initializing) | (Starting, Error) => true,
(Initializing, Running) | (Initializing, Error) => true,
(Running, Stopping) | (Running, Error) => true,
(Stopping, Stopped) | (Stopping, Error) => true,
(Stopped, Starting) => true,
(Error, Stopping) | (Error, Starting) => true,
(_, Error) => true,
(a, b) if *a == b => true,
_ => false,
}
}
pub fn transition_to(&mut self, next: LspClientState) -> Result<(), String> {
if self.can_transition_to(next) {
*self = next;
Ok(())
} else {
Err(format!(
"Invalid state transition from {:?} to {:?}",
self, next
))
}
}
pub fn can_send_requests(&self) -> bool {
matches!(self, Self::Running)
}
pub fn can_initialize(&self) -> bool {
matches!(self, Self::Initial | Self::Starting | Self::Stopped)
}
pub fn to_server_status(&self) -> LspServerStatus {
match self {
Self::Initial => LspServerStatus::Starting,
Self::Starting => LspServerStatus::Starting,
Self::Initializing => LspServerStatus::Initializing,
Self::Running => LspServerStatus::Running,
Self::Stopping => LspServerStatus::Shutdown,
Self::Stopped => LspServerStatus::Shutdown,
Self::Error => LspServerStatus::Error,
}
}
}
/// Builds the `ClientCapabilities` value advertised to a language server.
///
/// The value is a single struct literal; every field not listed explicitly is
/// left at its `Default`. The sections below cover window, workspace,
/// text-document, general and experimental capabilities.
fn create_client_capabilities() -> ClientCapabilities {
use lsp_types::{
CodeActionClientCapabilities, CodeActionKindLiteralSupport, CodeActionLiteralSupport,
CompletionClientCapabilities, DiagnosticClientCapabilities, DiagnosticTag,
DiagnosticWorkspaceClientCapabilities, DocumentFormattingClientCapabilities,
DocumentHighlightClientCapabilities, DocumentRangeFormattingClientCapabilities,
DocumentSymbolClientCapabilities, DynamicRegistrationClientCapabilities,
FoldingRangeCapability, FoldingRangeClientCapabilities, FoldingRangeKind,
FoldingRangeKindCapability, GeneralClientCapabilities, GotoCapability,
HoverClientCapabilities, InlayHintClientCapabilities, InlayHintWorkspaceClientCapabilities,
MarkupKind, PublishDiagnosticsClientCapabilities, RenameClientCapabilities,
SemanticTokensWorkspaceClientCapabilities, SignatureHelpClientCapabilities, TagSupport,
TextDocumentClientCapabilities, TextDocumentSyncClientCapabilities,
WorkspaceClientCapabilities, WorkspaceEditClientCapabilities,
WorkspaceSymbolClientCapabilities,
};
ClientCapabilities {
// Window: work-done progress reporting is supported.
window: Some(WindowClientCapabilities {
work_done_progress: Some(true),
..Default::default()
}),
// Workspace-level features, including refresh support for diagnostics,
// inlay hints and semantic tokens.
workspace: Some(WorkspaceClientCapabilities {
apply_edit: Some(true),
workspace_edit: Some(WorkspaceEditClientCapabilities {
document_changes: Some(true),
..Default::default()
}),
workspace_folders: Some(true),
configuration: Some(true),
symbol: Some(WorkspaceSymbolClientCapabilities {
dynamic_registration: Some(true),
..Default::default()
}),
diagnostic: Some(DiagnosticWorkspaceClientCapabilities {
refresh_support: Some(true),
}),
inlay_hint: Some(InlayHintWorkspaceClientCapabilities {
refresh_support: Some(true),
}),
semantic_tokens: Some(SemanticTokensWorkspaceClientCapabilities {
refresh_support: Some(true),
}),
..Default::default()
}),
// Per-document features.
text_document: Some(TextDocumentClientCapabilities {
synchronization: Some(TextDocumentSyncClientCapabilities {
did_save: Some(true),
..Default::default()
}),
completion: Some(CompletionClientCapabilities {
dynamic_registration: Some(true),
..Default::default()
}),
hover: Some(HoverClientCapabilities {
dynamic_registration: Some(true),
// Markdown is listed before plain text.
content_format: Some(vec![MarkupKind::Markdown, MarkupKind::PlainText]),
}),
signature_help: Some(SignatureHelpClientCapabilities {
dynamic_registration: Some(true),
..Default::default()
}),
definition: Some(GotoCapability {
dynamic_registration: Some(true),
link_support: Some(true),
}),
references: Some(DynamicRegistrationClientCapabilities {
dynamic_registration: Some(true),
}),
document_highlight: Some(DocumentHighlightClientCapabilities {
dynamic_registration: Some(true),
}),
document_symbol: Some(DocumentSymbolClientCapabilities {
dynamic_registration: Some(true),
..Default::default()
}),
formatting: Some(DocumentFormattingClientCapabilities {
dynamic_registration: Some(true),
}),
range_formatting: Some(DocumentRangeFormattingClientCapabilities {
dynamic_registration: Some(true),
}),
code_action: Some(CodeActionClientCapabilities {
dynamic_registration: Some(true),
// Code action kinds listed in the literal-support value set.
code_action_literal_support: Some(CodeActionLiteralSupport {
code_action_kind: CodeActionKindLiteralSupport {
value_set: vec![
String::new(),
"quickfix".to_string(),
"refactor".to_string(),
"refactor.extract".to_string(),
"refactor.inline".to_string(),
"refactor.rewrite".to_string(),
"source".to_string(),
"source.organizeImports".to_string(),
],
},
}),
..Default::default()
}),
rename: Some(RenameClientCapabilities {
dynamic_registration: Some(true),
prepare_support: Some(true),
honors_change_annotations: Some(true),
..Default::default()
}),
publish_diagnostics: Some(PublishDiagnosticsClientCapabilities {
related_information: Some(true),
tag_support: Some(TagSupport {
value_set: vec![DiagnosticTag::UNNECESSARY, DiagnosticTag::DEPRECATED],
}),
version_support: Some(true),
code_description_support: Some(true),
data_support: Some(true),
}),
inlay_hint: Some(InlayHintClientCapabilities {
dynamic_registration: Some(true),
..Default::default()
}),
diagnostic: Some(DiagnosticClientCapabilities {
dynamic_registration: Some(true),
..Default::default()
}),
folding_range: Some(FoldingRangeClientCapabilities {
dynamic_registration: Some(true),
line_folding_only: Some(true),
folding_range_kind: Some(FoldingRangeKindCapability {
value_set: Some(vec![
FoldingRangeKind::Comment,
FoldingRangeKind::Imports,
FoldingRangeKind::Region,
]),
}),
folding_range: Some(FoldingRangeCapability {
collapsed_text: Some(true),
}),
..Default::default()
}),
// Semantic tokens: range and full (with delta) requests, using the
// relative token format.
semantic_tokens: Some(SemanticTokensClientCapabilities {
dynamic_registration: Some(true),
requests: SemanticTokensClientCapabilitiesRequests {
range: Some(true),
full: Some(SemanticTokensFullOptions::Delta { delta: Some(true) }),
},
token_types: vec![
SemanticTokenType::NAMESPACE,
SemanticTokenType::TYPE,
SemanticTokenType::CLASS,
SemanticTokenType::ENUM,
SemanticTokenType::INTERFACE,
SemanticTokenType::STRUCT,
SemanticTokenType::TYPE_PARAMETER,
SemanticTokenType::PARAMETER,
SemanticTokenType::VARIABLE,
SemanticTokenType::PROPERTY,
SemanticTokenType::ENUM_MEMBER,
SemanticTokenType::EVENT,
SemanticTokenType::FUNCTION,
SemanticTokenType::METHOD,
SemanticTokenType::MACRO,
SemanticTokenType::KEYWORD,
SemanticTokenType::MODIFIER,
SemanticTokenType::COMMENT,
SemanticTokenType::STRING,
SemanticTokenType::NUMBER,
SemanticTokenType::REGEXP,
SemanticTokenType::OPERATOR,
SemanticTokenType::DECORATOR,
],
token_modifiers: vec![
SemanticTokenModifier::DECLARATION,
SemanticTokenModifier::DEFINITION,
SemanticTokenModifier::READONLY,
SemanticTokenModifier::STATIC,
SemanticTokenModifier::DEPRECATED,
SemanticTokenModifier::ABSTRACT,
SemanticTokenModifier::ASYNC,
SemanticTokenModifier::MODIFICATION,
SemanticTokenModifier::DOCUMENTATION,
SemanticTokenModifier::DEFAULT_LIBRARY,
],
formats: vec![TokenFormat::RELATIVE],
overlapping_token_support: Some(true),
multiline_token_support: Some(true),
server_cancel_support: Some(true),
augments_syntax_tokens: Some(true),
}),
..Default::default()
}),
// General capabilities are all left at their defaults.
general: Some(GeneralClientCapabilities {
..Default::default()
}),
// Experimental: opts in to server status notifications.
experimental: Some(serde_json::json!({
"serverStatusNotification": true
})),
..Default::default()
}
}
use crate::services::lsp::manager::ServerCapabilitySummary;
fn extract_capability_summary(caps: &ServerCapabilities) -> ServerCapabilitySummary {
let (sem_legend, sem_full, sem_full_delta, sem_range) = caps
.semantic_tokens_provider
.as_ref()
.map(|provider| {
let (legend, full_opt) = match provider {
SemanticTokensServerCapabilities::SemanticTokensOptions(o) => {
(o.legend.clone(), &o.full)
}
SemanticTokensServerCapabilities::SemanticTokensRegistrationOptions(o) => (
o.semantic_tokens_options.legend.clone(),
&o.semantic_tokens_options.full,
),
};
let range = match provider {
SemanticTokensServerCapabilities::SemanticTokensOptions(o) => {
o.range.unwrap_or(false)
}
SemanticTokensServerCapabilities::SemanticTokensRegistrationOptions(o) => {
o.semantic_tokens_options.range.unwrap_or(false)
}
};
let full = match full_opt {
Some(SemanticTokensFullOptions::Bool(v)) => *v,
Some(SemanticTokensFullOptions::Delta { .. }) => true,
None => false,
};
let delta = match full_opt {
Some(SemanticTokensFullOptions::Delta { delta }) => delta.unwrap_or(false),
_ => false,
};
(Some(legend), full, delta, range)
})
.unwrap_or((None, false, false, false));
ServerCapabilitySummary {
initialized: false, hover: bool_or_options(&caps.hover_provider, |p| match p {
lsp_types::HoverProviderCapability::Simple(v) => *v,
lsp_types::HoverProviderCapability::Options(_) => true,
}),
completion: caps.completion_provider.is_some(),
completion_resolve: caps
.completion_provider
.as_ref()
.and_then(|cp| cp.resolve_provider)
.unwrap_or(false),
completion_trigger_characters: caps
.completion_provider
.as_ref()
.and_then(|cp| cp.trigger_characters.clone())
.unwrap_or_default(),
definition: bool_or_options(&caps.definition_provider, |p| match p {
lsp_types::OneOf::Left(v) => *v,
lsp_types::OneOf::Right(_) => true,
}),
references: bool_or_options(&caps.references_provider, |p| match p {
lsp_types::OneOf::Left(v) => *v,
lsp_types::OneOf::Right(_) => true,
}),
document_formatting: bool_or_options(&caps.document_formatting_provider, |p| match p {
lsp_types::OneOf::Left(v) => *v,
lsp_types::OneOf::Right(_) => true,
}),
document_range_formatting: bool_or_options(&caps.document_range_formatting_provider, |p| {
match p {
lsp_types::OneOf::Left(v) => *v,
lsp_types::OneOf::Right(_) => true,
}
}),
rename: bool_or_options(&caps.rename_provider, |p| match p {
lsp_types::OneOf::Left(v) => *v,
lsp_types::OneOf::Right(_) => true,
}),
signature_help: caps.signature_help_provider.is_some(),
inlay_hints: bool_or_options(&caps.inlay_hint_provider, |p| match p {
lsp_types::OneOf::Left(v) => *v,
lsp_types::OneOf::Right(_) => true,
}),
folding_ranges: bool_or_options(&caps.folding_range_provider, |p| match p {
lsp_types::FoldingRangeProviderCapability::Simple(v) => *v,
_ => true,
}),
semantic_tokens_full: sem_full,
semantic_tokens_full_delta: sem_full_delta,
semantic_tokens_range: sem_range,
semantic_tokens_legend: sem_legend,
document_highlight: bool_or_options(&caps.document_highlight_provider, |p| match p {
lsp_types::OneOf::Left(v) => *v,
lsp_types::OneOf::Right(_) => true,
}),
code_action: bool_or_options(&caps.code_action_provider, |p| match p {
lsp_types::CodeActionProviderCapability::Simple(v) => *v,
lsp_types::CodeActionProviderCapability::Options(_) => true,
}),
code_action_resolve: caps.code_action_provider.as_ref().is_some_and(|p| match p {
lsp_types::CodeActionProviderCapability::Options(opts) => {
opts.resolve_provider.unwrap_or(false)
}
_ => false,
}),
document_symbols: bool_or_options(&caps.document_symbol_provider, |p| match p {
lsp_types::OneOf::Left(v) => *v,
lsp_types::OneOf::Right(_) => true,
}),
workspace_symbols: bool_or_options(&caps.workspace_symbol_provider, |p| match p {
lsp_types::OneOf::Left(v) => *v,
lsp_types::OneOf::Right(_) => true,
}),
diagnostics: caps.diagnostic_provider.is_some(),
}
}
/// Evaluates `check` against the value inside `opt`.
///
/// Returns `false` when `opt` is `None`; otherwise returns whatever `check`
/// reports for the contained value.
fn bool_or_options<T>(opt: &Option<T>, check: impl FnOnce(&T) -> bool) -> bool {
    match opt {
        Some(value) => check(value),
        None => false,
    }
}
/// Commands handled by the LSP client state (`LspState`).
///
/// Variants carrying a `request_id` use it as the editor-side identifier that is
/// echoed back in the corresponding `AsyncMessage` result (see e.g.
/// `handle_completion`). `line` / `character` and the `start_*` / `end_*` fields
/// are passed straight into LSP `Position` values.
#[derive(Debug)]
enum LspCommand {
/// Perform the `initialize` handshake; the outcome is delivered on `response`.
Initialize {
root_uri: Option<Uri>,
initialization_options: Option<Value>,
response: oneshot::Sender<Result<InitializeResult, String>>,
},
/// Send `textDocument/didOpen` with the full document text.
DidOpen {
uri: Uri,
text: String,
language_id: String,
},
/// Send `textDocument/didChange` with the given content changes.
DidChange {
uri: Uri,
content_changes: Vec<TextDocumentContentChangeEvent>,
},
/// Send `textDocument/didClose`.
DidClose { uri: Uri },
/// Send `textDocument/didSave`, optionally including the saved text.
DidSave { uri: Uri, text: Option<String> },
/// Notify the server about added and removed workspace folders.
DidChangeWorkspaceFolders {
added: Vec<lsp_types::WorkspaceFolder>,
removed: Vec<lsp_types::WorkspaceFolder>,
},
/// Request completions at a position.
Completion {
request_id: u64,
uri: Uri,
line: u32,
character: u32,
},
/// Request the definition location(s) of the symbol at a position.
GotoDefinition {
request_id: u64,
uri: Uri,
line: u32,
character: u32,
},
/// Request a rename of the symbol at a position to `new_name`.
Rename {
request_id: u64,
uri: Uri,
line: u32,
character: u32,
new_name: String,
},
/// Request hover information at a position.
Hover {
request_id: u64,
uri: Uri,
line: u32,
character: u32,
},
/// Request references for a position.
References {
request_id: u64,
uri: Uri,
line: u32,
character: u32,
},
/// Request signature help at a position.
SignatureHelp {
request_id: u64,
uri: Uri,
line: u32,
character: u32,
},
/// Request code actions for a range, with the diagnostics to consider.
CodeActions {
request_id: u64,
uri: Uri,
start_line: u32,
start_char: u32,
end_line: u32,
end_char: u32,
diagnostics: Vec<lsp_types::Diagnostic>,
},
/// Request diagnostics for a document, optionally relative to a previous result.
DocumentDiagnostic {
request_id: u64,
uri: Uri,
previous_result_id: Option<String>,
},
/// Request inlay hints for a range.
InlayHints {
request_id: u64,
uri: Uri,
start_line: u32,
start_char: u32,
end_line: u32,
end_char: u32,
},
/// Request folding ranges for a document.
FoldingRange { request_id: u64, uri: Uri },
/// Request semantic tokens for a whole document.
SemanticTokensFull { request_id: u64, uri: Uri },
/// Request a semantic tokens delta relative to `previous_result_id`.
SemanticTokensFullDelta {
request_id: u64,
uri: Uri,
previous_result_id: String,
},
/// Request semantic tokens for a range of a document.
SemanticTokensRange {
request_id: u64,
uri: Uri,
range: lsp_types::Range,
},
/// Execute a named command with optional arguments (carries no `request_id`).
ExecuteCommand {
command: String,
arguments: Option<Vec<Value>>,
},
/// Resolve a code action.
CodeActionResolve {
request_id: u64,
action: Box<lsp_types::CodeAction>,
},
/// Resolve a completion item.
CompletionResolve {
request_id: u64,
item: Box<lsp_types::CompletionItem>,
},
/// Request formatting of a whole document.
DocumentFormatting {
request_id: u64,
uri: Uri,
tab_size: u32,
insert_spaces: bool,
},
/// Request formatting of a range within a document.
DocumentRangeFormatting {
request_id: u64,
uri: Uri,
start_line: u32,
start_char: u32,
end_line: u32,
end_char: u32,
tab_size: u32,
insert_spaces: bool,
},
/// Ask the server to validate a rename at a position.
PrepareRename {
request_id: u64,
uri: Uri,
line: u32,
character: u32,
},
/// Cancel the request previously issued under `request_id`.
CancelRequest {
request_id: u64,
},
/// Send an arbitrary request by method name with optional raw params.
PluginRequest {
request_id: u64,
method: String,
params: Option<Value>,
},
/// Shut the server down.
Shutdown,
}
/// Shared state of one language-server connection.
///
/// Every field is either an `Arc` or a cloneable channel sender, so cloning the
/// struct yields another handle to the same underlying state.
#[derive(Clone)]
struct LspState {
/// The server's stdin, behind an async mutex; `write_message` writes framed messages to it.
stdin: Arc<tokio::sync::Mutex<ChildStdin>>,
/// Counter that supplies the next JSON-RPC request id.
next_id: Arc<AtomicI64>,
/// Capabilities returned by the server; set once `initialize` succeeds.
capabilities: Arc<std::sync::Mutex<Option<ServerCapabilities>>>,
/// Documents currently considered open, mapped to their latest version number.
document_versions: Arc<std::sync::Mutex<HashMap<PathBuf, i64>>>,
/// Time at which `didOpen` was issued for documents that have not yet had a `didChange`.
pending_opens: Arc<std::sync::Mutex<HashMap<PathBuf, Instant>>>,
/// Set to `true` after the `initialized` notification has been sent.
initialized: Arc<AtomicBool>,
/// Channel over which results and status updates are reported as `AsyncMessage`s.
async_tx: std_mpsc::Sender<AsyncMessage>,
/// Language this server handles; used in log lines and outgoing messages.
language: Arc<String>,
/// Name of the server; used in log lines and outgoing messages.
server_name: Arc<String>,
/// Maps an editor request id to the JSON-RPC id of the in-flight request it started.
active_requests: Arc<std::sync::Mutex<HashMap<u64, i64>>>,
/// Per-file-extension overrides for the `languageId` sent in `didOpen`.
language_id_overrides: Arc<HashMap<String, String>>,
}
#[allow(clippy::let_underscore_must_use)]
impl LspState {
/// Replays commands that were queued before initialization finished.
///
/// Document notifications (`didOpen`, `didChange`, `didClose`, `didSave`,
/// workspace-folder changes) are sent in order and awaited one at a time; a
/// failure is logged and the replay continues. Semantic-token and folding-range
/// requests are spawned as detached tasks so the loop does not wait for their
/// responses. Any other command kind is not replayed; it is logged at `debug`
/// level and dropped.
async fn replay_pending_commands(&self, commands: Vec<LspCommand>, pending: &PendingRequests) {
    if commands.is_empty() {
        return;
    }
    tracing::info!(
        "Replaying {} pending commands after initialization",
        commands.len()
    );
    for cmd in commands {
        match cmd {
            LspCommand::DidOpen {
                uri,
                text,
                language_id,
            } => {
                tracing::info!("Replaying DidOpen for {}", uri.as_str());
                if let Err(e) = self
                    .handle_did_open_sequential(uri, text, language_id, pending)
                    .await
                {
                    tracing::warn!("Failed to replay DidOpen: {}", e);
                }
            }
            LspCommand::DidChange {
                uri,
                content_changes,
            } => {
                tracing::info!("Replaying DidChange for {}", uri.as_str());
                if let Err(e) = self
                    .handle_did_change_sequential(uri, content_changes, pending)
                    .await
                {
                    tracing::warn!("Failed to replay DidChange: {}", e);
                }
            }
            LspCommand::DidClose { uri } => {
                tracing::info!("Replaying DidClose for {}", uri.as_str());
                if let Err(e) = self.handle_did_close(uri).await {
                    tracing::warn!("Failed to replay DidClose: {}", e);
                }
            }
            LspCommand::DidSave { uri, text } => {
                tracing::info!("Replaying DidSave for {}", uri.as_str());
                if let Err(e) = self.handle_did_save(uri, text).await {
                    tracing::warn!("Failed to replay DidSave: {}", e);
                }
            }
            LspCommand::DidChangeWorkspaceFolders { added, removed } => {
                tracing::info!(
                    "Replaying DidChangeWorkspaceFolders: +{} -{}",
                    added.len(),
                    removed.len()
                );
                if let Err(e) = self
                    .send_notification::<lsp_types::notification::DidChangeWorkspaceFolders>(
                        lsp_types::DidChangeWorkspaceFoldersParams {
                            event: lsp_types::WorkspaceFoldersChangeEvent { added, removed },
                        },
                    )
                    .await
                {
                    tracing::warn!("Failed to replay DidChangeWorkspaceFolders: {}", e);
                }
            }
            LspCommand::SemanticTokensFull { request_id, uri } => {
                tracing::info!("Replaying semantic tokens request for {}", uri.as_str());
                let s = self.clone();
                let p = pending.clone();
                // Detached: the replay loop does not wait for the response.
                tokio::spawn(async move {
                    let _ = s.handle_semantic_tokens_full(request_id, uri, &p).await;
                });
            }
            LspCommand::SemanticTokensFullDelta {
                request_id,
                uri,
                previous_result_id,
            } => {
                tracing::info!(
                    "Replaying semantic tokens delta request for {}",
                    uri.as_str()
                );
                let s = self.clone();
                let p = pending.clone();
                tokio::spawn(async move {
                    let _ = s
                        .handle_semantic_tokens_full_delta(
                            request_id,
                            uri,
                            previous_result_id,
                            &p,
                        )
                        .await;
                });
            }
            LspCommand::SemanticTokensRange {
                request_id,
                uri,
                range,
            } => {
                tracing::info!(
                    "Replaying semantic tokens range request for {}",
                    uri.as_str()
                );
                let s = self.clone();
                let p = pending.clone();
                tokio::spawn(async move {
                    let _ = s
                        .handle_semantic_tokens_range(request_id, uri, range, &p)
                        .await;
                });
            }
            LspCommand::FoldingRange { request_id, uri } => {
                tracing::info!("Replaying folding range request for {}", uri.as_str());
                let s = self.clone();
                let p = pending.clone();
                tokio::spawn(async move {
                    let _ = s.handle_folding_ranges(request_id, uri, &p).await;
                });
            }
            other => {
                // Not replayable: leave a trace so a lost request can be diagnosed.
                tracing::debug!("Dropping non-replayable pending command: {:?}", other);
            }
        }
    }
}
/// Serializes `message` to JSON, frames it with a `Content-Length` header and
/// writes it to the server's stdin, flushing afterwards.
///
/// The stdin lock is held for the whole write and flush.
///
/// # Errors
/// Returns a descriptive string if serialization, the write or the flush fails.
async fn write_message<T: Serialize>(&self, message: &T) -> Result<(), String> {
    let body = match serde_json::to_string(message) {
        Ok(body) => body,
        Err(e) => return Err(format!("Serialization error: {}", e)),
    };
    // `len()` is the byte length, which is what the header must carry.
    let framed = format!("Content-Length: {}\r\n\r\n{}", body.len(), body);
    tracing::trace!("Writing LSP message to stdin ({} bytes)", framed.len());

    let mut writer = self.stdin.lock().await;
    if let Err(e) = writer.write_all(framed.as_bytes()).await {
        return Err(format!("Failed to write to stdin: {}", e));
    }
    if let Err(e) = writer.flush().await {
        return Err(format!("Failed to flush stdin: {}", e));
    }
    tracing::trace!("Successfully sent LSP message");
    Ok(())
}
/// Sends the notification `N` with the given parameters to the server.
///
/// The parameters are serialized to JSON and wrapped in a JSON-RPC 2.0
/// notification whose method is `N::METHOD`.
///
/// # Errors
/// Returns a descriptive string if the parameters cannot be serialized or the
/// message cannot be written.
async fn send_notification<N>(&self, params: N::Params) -> Result<(), String>
where
    N: Notification,
{
    let notification = JsonRpcNotification {
        jsonrpc: "2.0".to_string(),
        method: N::METHOD.to_string(),
        params: Some(
            serde_json::to_value(params)
                .map_err(|e| format!("Failed to serialize params: {}", e))?,
        ),
    };
    self.write_message(&notification).await
}
/// Sends a request with the default timeout and without associating it with an
/// editor request id.
///
/// # Errors
/// Propagates any error from `send_request_with_timeout`.
async fn send_request_sequential<P: Serialize, R: for<'de> Deserialize<'de>>(
    &self,
    method: &str,
    params: Option<P>,
    pending: &PendingRequests,
) -> Result<R, String> {
    let default_timeout = Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MS);
    let untracked: Option<u64> = None;
    self.send_request_with_timeout(method, params, pending, untracked, default_timeout)
        .await
}
/// Sends a request with the default timeout, optionally associating it with
/// `editor_request_id` for the duration of the call.
///
/// # Errors
/// Propagates any error from `send_request_with_timeout`.
async fn send_request_sequential_tracked<P: Serialize, R: for<'de> Deserialize<'de>>(
    &self,
    method: &str,
    params: Option<P>,
    pending: &PendingRequests,
    editor_request_id: Option<u64>,
) -> Result<R, String> {
    let default_timeout = Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MS);
    self.send_request_with_timeout(method, params, pending, editor_request_id, default_timeout)
        .await
}
/// Sends a JSON-RPC request and waits up to `timeout` for its response.
///
/// * `method` / `params` - the request to send; `params` is serialized to JSON.
/// * `pending` - table in which the response channel is registered under the
///   request's JSON-RPC id.
/// * `editor_request_id` - when present, the editor id is mapped to the JSON-RPC
///   id in `active_requests` while the request is in flight.
/// * `timeout` - maximum time to wait; on expiry the pending entry is removed and
///   a cancel request is sent for the id.
///
/// # Errors
/// Returns a descriptive string when the parameters cannot be serialized, the
/// message cannot be written, the response channel closes, the request times
/// out, the response carries an error, or the result cannot be deserialized
/// into `R`.
async fn send_request_with_timeout<P: Serialize, R: for<'de> Deserialize<'de>>(
    &self,
    method: &str,
    params: Option<P>,
    pending: &PendingRequests,
    editor_request_id: Option<u64>,
    timeout: Duration,
) -> Result<R, String> {
    // Serialize before registering anything: an early return here must not
    // leave a stale entry behind in `active_requests`.
    let params_value = params
        .map(serde_json::to_value)
        .transpose()
        .map_err(|e| format!("Failed to serialize params: {}", e))?;

    let id = self.next_id.fetch_add(1, Ordering::SeqCst);
    if let Some(editor_id) = editor_request_id {
        self.active_requests.lock().unwrap().insert(editor_id, id);
        tracing::trace!("Tracking request: editor_id={}, lsp_id={}", editor_id, id);
    }

    let request = JsonRpcRequest {
        jsonrpc: "2.0".to_string(),
        id,
        method: method.to_string(),
        params: params_value,
    };

    // Register the response channel before writing so the reply cannot be missed.
    let (tx, rx) = oneshot::channel();
    pending.lock().unwrap().insert(id, tx);

    if let Err(e) = self.write_message(&request).await {
        pending.lock().unwrap().remove(&id);
        if let Some(editor_id) = editor_request_id {
            self.active_requests.lock().unwrap().remove(&editor_id);
        }
        return Err(e);
    }
    tracing::trace!(
        "Sent LSP request id={} method={}, waiting up to {:?} for response",
        id,
        method,
        timeout
    );

    let response_result = match tokio::time::timeout(timeout, rx).await {
        Ok(Ok(inner)) => inner,
        Ok(Err(_)) => Err("Response channel closed".to_string()),
        Err(_) => {
            pending.lock().unwrap().remove(&id);
            tracing::warn!(
                "LSP request '{}' (lsp_id={}) on '{}' ({}) timed out after {:?}; sending $/cancelRequest",
                method,
                id,
                self.server_name.as_str(),
                self.language.as_str(),
                timeout
            );
            // Best effort: the request has already failed from our point of view.
            let _ = self.send_cancel_request(id).await;
            Err(format!(
                "Request '{}' timed out after {:?}",
                method, timeout
            ))
        }
    };

    if let Some(editor_id) = editor_request_id {
        self.active_requests.lock().unwrap().remove(&editor_id);
        tracing::trace!("Completed request: editor_id={}, lsp_id={}", editor_id, id);
    }

    let result = response_result?;
    serde_json::from_value(result).map_err(|e| format!("Failed to deserialize response: {}", e))
}
/// Performs the LSP `initialize` handshake.
///
/// Sends `initialize` with this client's capabilities, stores the returned
/// server capabilities, sends the `initialized` notification, marks the state as
/// initialized and reports `LspInitialized` and a `Running` status update on
/// `async_tx`.
///
/// When `root_uri` is given it is also advertised as a single workspace folder
/// named after the last non-empty path segment of the URI (falling back to
/// `"workspace"` when there is none, e.g. for a bare `/`).
///
/// # Errors
/// Returns a descriptive string if the `initialize` request or the
/// `initialized` notification fails.
async fn handle_initialize_sequential(
    &self,
    root_uri: Option<Uri>,
    initialization_options: Option<Value>,
    pending: &PendingRequests,
) -> Result<InitializeResult, String> {
    tracing::info!(
        "Initializing async LSP server with root_uri: {:?}, initialization_options: {:?}",
        root_uri,
        initialization_options
    );
    let workspace_folders = root_uri.as_ref().map(|uri| {
        // Skip empty segments so a trailing slash does not yield an empty name.
        let name = uri
            .path()
            .as_str()
            .rsplit('/')
            .find(|segment| !segment.is_empty())
            .unwrap_or("workspace")
            .to_string();
        vec![WorkspaceFolder {
            uri: uri.clone(),
            name,
        }]
    });
    // `root_uri` is deprecated in the protocol types but still populated here.
    #[allow(deprecated)]
    let params = InitializeParams {
        process_id: Some(std::process::id()),
        capabilities: create_client_capabilities(),
        workspace_folders,
        initialization_options,
        root_uri: root_uri.clone(),
        ..Default::default()
    };
    let result: InitializeResult = self
        .send_request_sequential(Initialize::METHOD, Some(params), pending)
        .await?;
    tracing::info!(
        "LSP initialize result: position_encoding={:?}",
        result.capabilities.position_encoding
    );
    *self.capabilities.lock().unwrap() = Some(result.capabilities.clone());
    self.send_notification::<Initialized>(InitializedParams {})
        .await?;
    self.initialized.store(true, Ordering::SeqCst);
    let capabilities = extract_capability_summary(&result.capabilities);
    // Send failures are ignored here, as for every other report on `async_tx`.
    let _ = self.async_tx.send(AsyncMessage::LspInitialized {
        language: (*self.language).clone(),
        server_name: (*self.server_name).clone(),
        capabilities,
    });
    let _ = self.async_tx.send(AsyncMessage::LspStatusUpdate {
        language: (*self.language).clone(),
        server_name: (*self.server_name).clone(),
        status: LspServerStatus::Running,
        message: None,
    });
    tracing::info!("Async LSP server initialized successfully");
    Ok(result)
}
/// Sends `textDocument/didOpen` for a document unless it is already tracked.
///
/// The `languageId` is taken from `language_id_overrides` when the file
/// extension has an entry there, otherwise `language_id` is used. The document
/// is registered at version 0 and its open time is recorded so a following
/// `didChange` can honour the grace period.
///
/// If the notification cannot be sent, the tracking entries are removed again so
/// that a later `didOpen` for the same document is not skipped.
///
/// # Errors
/// Returns a descriptive string if the notification cannot be sent.
async fn handle_did_open_sequential(
    &self,
    uri: Uri,
    text: String,
    language_id: String,
    _pending: &PendingRequests,
) -> Result<(), String> {
    let path = PathBuf::from(uri.path().as_str());
    if should_skip_did_open(&self.document_versions, &path, self.language.as_str(), &uri) {
        return Ok(());
    }
    tracing::trace!("LSP: did_open for {}", uri.as_str());
    let lsp_language_id = path
        .extension()
        .and_then(|e| e.to_str())
        .and_then(|ext| self.language_id_overrides.get(ext))
        .cloned()
        .unwrap_or(language_id);
    let params = DidOpenTextDocumentParams {
        text_document: TextDocumentItem {
            uri: uri.clone(),
            language_id: lsp_language_id,
            version: 0,
            text,
        },
    };
    self.document_versions
        .lock()
        .unwrap()
        .insert(path.clone(), 0);
    self.pending_opens
        .lock()
        .unwrap()
        .insert(path.clone(), Instant::now());

    let sent = self.send_notification::<DidOpenTextDocument>(params).await;
    if sent.is_err() {
        // The server never saw this document: undo the bookkeeping.
        self.document_versions.lock().unwrap().remove(&path);
        self.pending_opens.lock().unwrap().remove(&path);
    }
    sent
}
async fn handle_did_change_sequential(
&self,
uri: Uri,
content_changes: Vec<TextDocumentContentChangeEvent>,
_pending: &PendingRequests,
) -> Result<(), String> {
tracing::trace!("LSP: did_change for {}", uri.as_str());
let path = PathBuf::from(uri.path().as_str());
if !self.document_versions.lock().unwrap().contains_key(&path) {
tracing::debug!(
"LSP ({}): skipping didChange - document not yet opened",
self.language
);
return Ok(());
}
let opened_at = self.pending_opens.lock().unwrap().get(&path).copied();
if let Some(opened_at) = opened_at {
let elapsed = opened_at.elapsed();
let grace_period = std::time::Duration::from_millis(DID_OPEN_GRACE_PERIOD_MS);
if elapsed < grace_period {
let wait_time = grace_period - elapsed;
tracing::debug!(
"LSP ({}): waiting {:?} for didOpen grace period before didChange",
self.language,
wait_time
);
tokio::time::sleep(wait_time).await;
}
self.pending_opens.lock().unwrap().remove(&path);
}
let new_version = {
let mut versions = self.document_versions.lock().unwrap();
let version = versions.entry(path).or_insert(0);
*version += 1;
*version
};
let params = DidChangeTextDocumentParams {
text_document: VersionedTextDocumentIdentifier {
uri: uri.clone(),
version: new_version as i32,
},
content_changes,
};
self.send_notification::<DidChangeTextDocument>(params)
.await
}
/// Sends `textDocument/didSave` for `uri`, including the saved `text` when given.
///
/// # Errors
/// Returns a descriptive string if the notification cannot be sent.
async fn handle_did_save(&self, uri: Uri, text: Option<String>) -> Result<(), String> {
    tracing::trace!("LSP: did_save for {}", uri.as_str());
    let text_document = TextDocumentIdentifier { uri };
    self.send_notification::<DidSaveTextDocument>(DidSaveTextDocumentParams {
        text_document,
        text,
    })
    .await
}
/// Stops tracking a document and sends `textDocument/didClose` for it.
///
/// The notification is sent whether or not the document was tracked; only the
/// log line differs.
///
/// # Errors
/// Returns a descriptive string if the notification cannot be sent.
async fn handle_did_close(&self, uri: Uri) -> Result<(), String> {
    let path = PathBuf::from(uri.path().as_str());
    let was_tracked = self
        .document_versions
        .lock()
        .unwrap()
        .remove(&path)
        .is_some();
    if !was_tracked {
        tracing::debug!(
            "LSP ({}): didClose for {} but document was not tracked",
            self.language,
            uri.as_str()
        );
    } else {
        tracing::info!("LSP ({}): didClose for {}", self.language, uri.as_str());
    }
    // Drop any grace-period record left over from the didOpen.
    self.pending_opens.lock().unwrap().remove(&path);

    let text_document = TextDocumentIdentifier { uri };
    self.send_notification::<DidCloseTextDocument>(DidCloseTextDocumentParams { text_document })
        .await
}
/// Requests completions at a position and reports them as
/// `AsyncMessage::LspCompletion`.
///
/// The response is accepted either as a completion list or as a plain array of
/// items; any other shape yields no items. On request failure an empty item
/// list is reported and the error is returned. The request is associated with
/// `request_id` while it is in flight.
///
/// # Errors
/// Returns the request error when the server request fails.
async fn handle_completion(
    &self,
    request_id: u64,
    uri: Uri,
    line: u32,
    character: u32,
    pending: &PendingRequests,
) -> Result<(), String> {
    use lsp_types::CompletionParams;
    tracing::trace!(
        "LSP: completion request at {}:{}:{}",
        uri.as_str(),
        line,
        character
    );
    let params = CompletionParams {
        text_document_position: TextDocumentPositionParams {
            text_document: TextDocumentIdentifier { uri },
            position: Position { line, character },
        },
        work_done_progress_params: WorkDoneProgressParams::default(),
        partial_result_params: PartialResultParams::default(),
        context: None,
    };
    match self
        .send_request_sequential_tracked::<_, Value>(
            "textDocument/completion",
            Some(params),
            pending,
            Some(request_id),
        )
        .await
    {
        Ok(result) => {
            // Deserialize from a borrowed `Value` so a large response does not
            // have to be cloned just to try the second shape.
            let items = match lsp_types::CompletionList::deserialize(&result) {
                Ok(list) => list.items,
                Err(_) => Vec::<lsp_types::CompletionItem>::deserialize(&result)
                    .unwrap_or_default(),
            };
            let _ = self
                .async_tx
                .send(AsyncMessage::LspCompletion { request_id, items });
            Ok(())
        }
        Err(e) => {
            tracing::debug!("Completion request failed: {}", e);
            let _ = self.async_tx.send(AsyncMessage::LspCompletion {
                request_id,
                items: vec![],
            });
            Err(e)
        }
    }
}
/// Requests the definition of the symbol at a position and reports the
/// resulting locations as `AsyncMessage::LspGotoDefinition`.
///
/// The response is accepted as a single location, an array of locations, or an
/// array of location links (each mapped to its target URI and selection range);
/// any other shape yields no locations. On request failure an empty list is
/// reported and the error is returned.
///
/// # Errors
/// Returns the request error when the server request fails.
// NOTE(review): unlike completion, this request is sent without an editor
// request id, so it is not registered in `active_requests` — confirm whether
// that is intended.
async fn handle_goto_definition(
    &self,
    request_id: u64,
    uri: Uri,
    line: u32,
    character: u32,
    pending: &PendingRequests,
) -> Result<(), String> {
    use lsp_types::GotoDefinitionParams;
    tracing::trace!(
        "LSP: go-to-definition request at {}:{}:{}",
        uri.as_str(),
        line,
        character
    );
    let params = GotoDefinitionParams {
        text_document_position_params: TextDocumentPositionParams {
            text_document: TextDocumentIdentifier { uri },
            position: Position { line, character },
        },
        work_done_progress_params: WorkDoneProgressParams::default(),
        partial_result_params: PartialResultParams::default(),
    };
    match self
        .send_request_sequential::<_, Value>("textDocument/definition", Some(params), pending)
        .await
    {
        Ok(result) => {
            // Each shape is tried against a borrowed `Value`, so the response
            // is never cloned.
            let locations = if let Ok(loc) = lsp_types::Location::deserialize(&result) {
                vec![loc]
            } else if let Ok(locs) = Vec::<lsp_types::Location>::deserialize(&result) {
                locs
            } else if let Ok(links) = Vec::<lsp_types::LocationLink>::deserialize(&result) {
                links
                    .into_iter()
                    .map(|link| lsp_types::Location {
                        uri: link.target_uri,
                        range: link.target_selection_range,
                    })
                    .collect()
            } else {
                vec![]
            };
            let _ = self.async_tx.send(AsyncMessage::LspGotoDefinition {
                request_id,
                locations,
            });
            Ok(())
        }
        Err(e) => {
            tracing::debug!("Go-to-definition request failed: {}", e);
            let _ = self.async_tx.send(AsyncMessage::LspGotoDefinition {
                request_id,
                locations: vec![],
            });
            Err(e)
        }
    }
}
/// Requests a rename of the symbol at a position and reports the outcome as
/// `AsyncMessage::LspRename`.
///
/// The reported result is either the parsed workspace edit or an error string
/// (request failure or unparsable response). The same error is also returned to
/// the caller.
///
/// # Errors
/// Returns a descriptive string when the request fails or the response cannot
/// be parsed as a workspace edit.
// NOTE(review): a `null` response is parsed as a workspace edit like any other
// value — confirm how "nothing to rename" should be surfaced.
async fn handle_rename(
    &self,
    request_id: u64,
    uri: Uri,
    line: u32,
    character: u32,
    new_name: String,
    pending: &PendingRequests,
) -> Result<(), String> {
    use lsp_types::RenameParams;
    tracing::trace!(
        "LSP: rename request at {}:{}:{} to '{}'",
        uri.as_str(),
        line,
        character,
        new_name
    );
    let params = RenameParams {
        text_document_position: TextDocumentPositionParams {
            text_document: TextDocumentIdentifier { uri },
            position: Position { line, character },
        },
        new_name,
        work_done_progress_params: WorkDoneProgressParams::default(),
    };

    let response = self
        .send_request_sequential::<_, Value>("textDocument/rename", Some(params), pending)
        .await;

    // Collapse both failure modes into one `Result` so the report is sent once.
    let outcome: Result<lsp_types::WorkspaceEdit, String> = match response {
        Ok(value) => serde_json::from_value::<lsp_types::WorkspaceEdit>(value).map_err(|e| {
            tracing::error!("Failed to parse rename response: {}", e);
            format!("Failed to parse rename response: {}", e)
        }),
        Err(e) => {
            tracing::debug!("Rename request failed: {}", e);
            Err(e)
        }
    };

    let status = match &outcome {
        Ok(_) => Ok(()),
        Err(message) => Err(message.clone()),
    };
    let _ = self.async_tx.send(AsyncMessage::LspRename {
        request_id,
        result: outcome,
    });
    status
}
/// Sends `textDocument/hover` and forwards the result to the editor as
/// `AsyncMessage::LspHover`.
///
/// A `null` result, an unparsable result and a failed request all produce an
/// empty hover (empty text, not Markdown, no range). Only a failed request
/// makes this function return `Err`.
async fn handle_hover(
    &self,
    request_id: u64,
    uri: Uri,
    line: u32,
    character: u32,
    pending: &PendingRequests,
) -> Result<(), String> {
    use lsp_types::{Hover, HoverParams};
    tracing::trace!(
        "LSP: hover request at {}:{}:{}",
        uri.as_str(),
        line,
        character
    );
    let params = HoverParams {
        text_document_position_params: TextDocumentPositionParams {
            text_document: TextDocumentIdentifier { uri },
            position: Position { line, character },
        },
        work_done_progress_params: WorkDoneProgressParams::default(),
    };
    let response = self
        .send_request_sequential::<_, Value>("textDocument/hover", Some(params), pending)
        .await;
    let raw = match response {
        Ok(raw) => raw,
        Err(e) => {
            tracing::debug!("Hover request failed: {}", e);
            let _ = self.async_tx.send(AsyncMessage::LspHover {
                request_id,
                contents: String::new(),
                is_markdown: false,
                range: None,
            });
            return Err(e);
        }
    };
    tracing::debug!("Raw LSP hover response: {:?}", raw);
    // `None` covers both "server had nothing to say" and "could not parse".
    let parsed: Option<Hover> = if raw.is_null() {
        None
    } else {
        match serde_json::from_value::<Hover>(raw) {
            Ok(hover) => Some(hover),
            Err(e) => {
                tracing::error!("Failed to parse hover response: {}", e);
                None
            }
        }
    };
    let (contents, is_markdown, range) = match parsed {
        Some(hover) => {
            let (text, markdown) = Self::extract_hover_contents(&hover.contents);
            // Flatten the LSP range into ((line, char), (line, char)) tuples.
            let span = hover.range.map(|r| {
                (
                    (r.start.line, r.start.character),
                    (r.end.line, r.end.character),
                )
            });
            (text, markdown, span)
        }
        None => (String::new(), false, None),
    };
    let _ = self.async_tx.send(AsyncMessage::LspHover {
        request_id,
        contents,
        is_markdown,
        range,
    });
    Ok(())
}
/// Flattens LSP hover contents into a single string plus a flag telling the
/// caller whether that string should be treated as Markdown.
///
/// Language-tagged snippets are wrapped in a fenced code block. Array entries
/// are joined with a blank line and always flagged as Markdown.
///
/// NOTE(review): a scalar plain `MarkedString::String` is flagged as
/// non-Markdown here, while the same variant inside an array is flagged as
/// Markdown — confirm this asymmetry is intentional.
fn extract_hover_contents(contents: &lsp_types::HoverContents) -> (String, bool) {
    use lsp_types::{HoverContents, MarkedString, MarkupKind};
    // Renders a language-tagged snippet as a fenced Markdown code block.
    let fence = |language: &str, value: &str| format!("```{}\n{}\n```", language, value);
    match contents {
        HoverContents::Scalar(MarkedString::String(text)) => (text.clone(), false),
        HoverContents::Scalar(MarkedString::LanguageString(snippet)) => {
            (fence(&snippet.language, &snippet.value), true)
        }
        HoverContents::Array(entries) => {
            let mut joined = String::new();
            for (index, entry) in entries.iter().enumerate() {
                if index > 0 {
                    joined.push_str("\n\n");
                }
                match entry {
                    MarkedString::String(text) => joined.push_str(text),
                    MarkedString::LanguageString(snippet) => {
                        joined.push_str(&fence(&snippet.language, &snippet.value))
                    }
                }
            }
            (joined, true)
        }
        HoverContents::Markup(markup) => (
            markup.value.clone(),
            matches!(markup.kind, MarkupKind::Markdown),
        ),
    }
}
async fn handle_references(
&self,
request_id: u64,
uri: Uri,
line: u32,
character: u32,
pending: &PendingRequests,
) -> Result<(), String> {
use lsp_types::{ReferenceContext, ReferenceParams};
tracing::trace!(
"LSP: find references request at {}:{}:{}",
uri.as_str(),
line,
character
);
let params = ReferenceParams {
text_document_position: lsp_types::TextDocumentPositionParams {
text_document: TextDocumentIdentifier { uri },
position: Position { line, character },
},
work_done_progress_params: WorkDoneProgressParams::default(),
partial_result_params: PartialResultParams::default(),
context: ReferenceContext {
include_declaration: true,
},
};
match self
.send_request_sequential::<_, Value>("textDocument/references", Some(params), pending)
.await
{
Ok(result) => {
let locations = if result.is_null() {
Vec::new()
} else {
serde_json::from_value::<Vec<lsp_types::Location>>(result).unwrap_or_default()
};
tracing::trace!("LSP: found {} references", locations.len());
let _ = self.async_tx.send(AsyncMessage::LspReferences {
request_id,
locations,
});
Ok(())
}
Err(e) => {
tracing::debug!("Find references request failed: {}", e);
let _ = self.async_tx.send(AsyncMessage::LspReferences {
request_id,
locations: Vec::new(),
});
Err(e)
}
}
}
async fn handle_signature_help(
&self,
request_id: u64,
uri: Uri,
line: u32,
character: u32,
pending: &PendingRequests,
) -> Result<(), String> {
use lsp_types::SignatureHelpParams;
tracing::trace!(
"LSP: signature help request at {}:{}:{}",
uri.as_str(),
line,
character
);
let params = SignatureHelpParams {
text_document_position_params: TextDocumentPositionParams {
text_document: TextDocumentIdentifier { uri },
position: Position { line, character },
},
work_done_progress_params: WorkDoneProgressParams::default(),
context: None, };
match self
.send_request_sequential::<_, Value>(
"textDocument/signatureHelp",
Some(params),
pending,
)
.await
{
Ok(result) => {
let signature_help = if result.is_null() {
None
} else {
serde_json::from_value::<lsp_types::SignatureHelp>(result).ok()
};
tracing::trace!(
"LSP: signature help received: {} signatures",
signature_help
.as_ref()
.map(|h| h.signatures.len())
.unwrap_or(0)
);
let _ = self.async_tx.send(AsyncMessage::LspSignatureHelp {
request_id,
signature_help,
});
Ok(())
}
Err(e) => {
tracing::debug!("Signature help request failed: {}", e);
let _ = self.async_tx.send(AsyncMessage::LspSignatureHelp {
request_id,
signature_help: None,
});
Err(e)
}
}
}
#[allow(clippy::too_many_arguments)]
async fn handle_code_actions(
&self,
request_id: u64,
uri: Uri,
start_line: u32,
start_char: u32,
end_line: u32,
end_char: u32,
diagnostics: Vec<lsp_types::Diagnostic>,
pending: &PendingRequests,
) -> Result<(), String> {
use lsp_types::{CodeActionContext, CodeActionParams};
tracing::trace!(
"LSP: code actions request at {}:{}:{}-{}:{}",
uri.as_str(),
start_line,
start_char,
end_line,
end_char
);
let params = CodeActionParams {
text_document: TextDocumentIdentifier { uri },
range: Range {
start: Position {
line: start_line,
character: start_char,
},
end: Position {
line: end_line,
character: end_char,
},
},
context: CodeActionContext {
diagnostics,
only: None,
trigger_kind: None,
},
work_done_progress_params: WorkDoneProgressParams::default(),
partial_result_params: PartialResultParams::default(),
};
match self
.send_request_sequential::<_, Value>("textDocument/codeAction", Some(params), pending)
.await
{
Ok(result) => {
let actions = if result.is_null() {
Vec::new()
} else {
serde_json::from_value::<Vec<lsp_types::CodeActionOrCommand>>(result)
.unwrap_or_default()
};
tracing::trace!("LSP: received {} code actions", actions.len());
let _ = self.async_tx.send(AsyncMessage::LspCodeActions {
request_id,
actions,
});
Ok(())
}
Err(e) => {
tracing::debug!("Code actions request failed: {}", e);
let _ = self.async_tx.send(AsyncMessage::LspCodeActions {
request_id,
actions: Vec::new(),
});
Err(e)
}
}
}
async fn handle_execute_command(
&self,
command: String,
arguments: Option<Vec<Value>>,
pending: &PendingRequests,
) -> Result<(), String> {
let params = lsp_types::ExecuteCommandParams {
command: command.clone(),
arguments: arguments.unwrap_or_default(),
work_done_progress_params: lsp_types::WorkDoneProgressParams::default(),
};
match self
.send_request_sequential::<_, Value>("workspace/executeCommand", Some(params), pending)
.await
{
Ok(_) => {
tracing::info!("ExecuteCommand '{}' completed", command);
Ok(())
}
Err(e) => {
tracing::debug!("ExecuteCommand '{}' failed: {}", command, e);
Err(e)
}
}
}
/// Sends `codeAction/resolve` for `action` and forwards the resolved action
/// (or an error string) to the editor as `AsyncMessage::LspCodeActionResolved`.
///
/// A response that cannot be parsed is reported to the editor as an error but
/// still returns `Ok(())` here. Only a failed request returns `Err`.
async fn handle_code_action_resolve(
    &self,
    request_id: u64,
    action: lsp_types::CodeAction,
    pending: &PendingRequests,
) -> Result<(), String> {
    let response = self
        .send_request_sequential::<_, Value>("codeAction/resolve", Some(action), pending)
        .await;
    let (resolved, status) = match response {
        Ok(raw) => {
            let parsed = serde_json::from_value::<lsp_types::CodeAction>(raw)
                .map_err(|e| format!("Failed to parse codeAction/resolve response: {}", e));
            (parsed, Ok(()))
        }
        Err(e) => {
            tracing::debug!("codeAction/resolve failed: {}", e);
            (Err(e.clone()), Err(e))
        }
    };
    let _ = self.async_tx.send(AsyncMessage::LspCodeActionResolved {
        request_id,
        action: resolved,
    });
    status
}
/// Sends `completionItem/resolve` for `item` and forwards the resolved item
/// (or an error string) to the editor as `AsyncMessage::LspCompletionResolved`.
///
/// The editor is answered on every path, matching `handle_code_action_resolve`.
/// A failed request is delivered as `Err` so the requester is not left
/// waiting for a reply that never arrives.
///
/// # Errors
/// Returns the request error when the request itself fails. A response that
/// cannot be parsed is reported to the editor as an error but still returns
/// `Ok(())` here.
async fn handle_completion_resolve(
    &self,
    request_id: u64,
    item: lsp_types::CompletionItem,
    pending: &PendingRequests,
) -> Result<(), String> {
    let response = self
        .send_request_sequential::<_, Value>("completionItem/resolve", Some(item), pending)
        .await;
    let (resolved, status) = match response {
        Ok(raw) => {
            let parsed = serde_json::from_value::<lsp_types::CompletionItem>(raw)
                .map_err(|e| format!("Failed to parse completionItem/resolve response: {}", e));
            (parsed, Ok(()))
        }
        Err(e) => {
            tracing::debug!("completionItem/resolve failed: {}", e);
            (Err(e.clone()), Err(e))
        }
    };
    let _ = self.async_tx.send(AsyncMessage::LspCompletionResolved {
        request_id,
        item: resolved,
    });
    status
}
async fn handle_document_formatting(
&self,
request_id: u64,
uri: Uri,
tab_size: u32,
insert_spaces: bool,
pending: &PendingRequests,
) -> Result<(), String> {
use lsp_types::{DocumentFormattingParams, FormattingOptions};
let params = DocumentFormattingParams {
text_document: TextDocumentIdentifier { uri: uri.clone() },
options: FormattingOptions {
tab_size,
insert_spaces,
..Default::default()
},
work_done_progress_params: WorkDoneProgressParams::default(),
};
match self
.send_request_sequential::<_, Value>("textDocument/formatting", Some(params), pending)
.await
{
Ok(result) => {
let edits = if result.is_null() {
Vec::new()
} else {
serde_json::from_value::<Vec<lsp_types::TextEdit>>(result).unwrap_or_default()
};
let _ = self.async_tx.send(AsyncMessage::LspFormatting {
request_id,
uri: uri.as_str().to_string(),
edits,
});
Ok(())
}
Err(e) => {
tracing::debug!("textDocument/formatting failed: {}", e);
Err(e)
}
}
}
#[allow(clippy::too_many_arguments)]
async fn handle_document_range_formatting(
&self,
request_id: u64,
uri: Uri,
start_line: u32,
start_char: u32,
end_line: u32,
end_char: u32,
tab_size: u32,
insert_spaces: bool,
pending: &PendingRequests,
) -> Result<(), String> {
use lsp_types::{DocumentRangeFormattingParams, FormattingOptions};
let params = DocumentRangeFormattingParams {
text_document: TextDocumentIdentifier { uri: uri.clone() },
range: Range {
start: Position::new(start_line, start_char),
end: Position::new(end_line, end_char),
},
options: FormattingOptions {
tab_size,
insert_spaces,
..Default::default()
},
work_done_progress_params: WorkDoneProgressParams::default(),
};
match self
.send_request_sequential::<_, Value>(
"textDocument/rangeFormatting",
Some(params),
pending,
)
.await
{
Ok(result) => {
let edits = if result.is_null() {
Vec::new()
} else {
serde_json::from_value::<Vec<lsp_types::TextEdit>>(result).unwrap_or_default()
};
let _ = self.async_tx.send(AsyncMessage::LspFormatting {
request_id,
uri: uri.as_str().to_string(),
edits,
});
Ok(())
}
Err(e) => {
tracing::debug!("textDocument/rangeFormatting failed: {}", e);
Err(e)
}
}
}
async fn handle_prepare_rename(
&self,
request_id: u64,
uri: Uri,
line: u32,
character: u32,
pending: &PendingRequests,
) -> Result<(), String> {
let params = TextDocumentPositionParams {
text_document: TextDocumentIdentifier { uri },
position: Position::new(line, character),
};
match self
.send_request_sequential::<_, Value>(
"textDocument/prepareRename",
Some(params),
pending,
)
.await
{
Ok(result) => {
let _ = self.async_tx.send(AsyncMessage::LspPrepareRename {
request_id,
result: Ok(result),
});
Ok(())
}
Err(e) => {
let _ = self.async_tx.send(AsyncMessage::LspPrepareRename {
request_id,
result: Err(e.clone()),
});
Err(e)
}
}
}
/// Pulls diagnostics for `uri` via `textDocument/diagnostic` and forwards
/// them to the editor as `AsyncMessage::LspPulledDiagnostics`.
///
/// If the stored server capabilities have no `diagnostic_provider`, the
/// request is skipped: nothing is sent and `Ok(())` is returned.
///
/// The response is tried first as a full report, then as an "unchanged"
/// report. Anything else is logged and reported as an empty, changed result.
/// A failed request also reports an empty result and returns `Err`.
async fn handle_document_diagnostic(
    &self,
    request_id: u64,
    uri: Uri,
    previous_result_id: Option<String>,
    pending: &PendingRequests,
) -> Result<(), String> {
    use lsp_types::{
        DocumentDiagnosticParams, RelatedFullDocumentDiagnosticReport,
        RelatedUnchangedDocumentDiagnosticReport,
    };
    // Scoped so the std mutex guard is released before any `.await`.
    let supports_pull = {
        let capabilities = self.capabilities.lock().unwrap();
        capabilities
            .as_ref()
            .map_or(false, |c| c.diagnostic_provider.is_some())
    };
    if !supports_pull {
        tracing::trace!(
            "LSP: server does not support pull diagnostics, skipping request for {}",
            uri.as_str()
        );
        return Ok(());
    }
    tracing::trace!(
        "LSP: document diagnostic request for {} (previous_result_id: {:?})",
        uri.as_str(),
        previous_result_id
    );
    let params = DocumentDiagnosticParams {
        text_document: TextDocumentIdentifier { uri: uri.clone() },
        identifier: None,
        previous_result_id,
        work_done_progress_params: WorkDoneProgressParams::default(),
        partial_result_params: PartialResultParams::default(),
    };
    let response = self
        .send_request_sequential::<_, Value>("textDocument/diagnostic", Some(params), pending)
        .await;
    let uri_string = uri.as_str().to_string();
    let report = match response {
        Ok(report) => report,
        Err(e) => {
            tracing::debug!("Document diagnostic request failed: {}", e);
            let _ = self.async_tx.send(AsyncMessage::LspPulledDiagnostics {
                request_id,
                uri: uri_string,
                result_id: None,
                diagnostics: Vec::new(),
                unchanged: false,
            });
            return Err(e);
        }
    };
    let (result_id, diagnostics, unchanged) = if let Ok(full) =
        serde_json::from_value::<RelatedFullDocumentDiagnosticReport>(report.clone())
    {
        let inner = full.full_document_diagnostic_report;
        tracing::trace!(
            "LSP: received {} diagnostics for {} (result_id: {:?})",
            inner.items.len(),
            uri_string,
            inner.result_id
        );
        (inner.result_id, inner.items, false)
    } else if let Ok(same) =
        serde_json::from_value::<RelatedUnchangedDocumentDiagnosticReport>(report.clone())
    {
        let result_id = same.unchanged_document_diagnostic_report.result_id;
        tracing::trace!(
            "LSP: diagnostics unchanged for {} (result_id: {:?})",
            uri_string,
            result_id
        );
        (Some(result_id), Vec::new(), true)
    } else {
        tracing::warn!(
            "LSP: could not parse diagnostic report, sending empty: {}",
            report
        );
        (None, Vec::new(), false)
    };
    let _ = self.async_tx.send(AsyncMessage::LspPulledDiagnostics {
        request_id,
        uri: uri_string,
        result_id,
        diagnostics,
        unchanged,
    });
    Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn handle_inlay_hints(
&self,
request_id: u64,
uri: Uri,
start_line: u32,
start_char: u32,
end_line: u32,
end_char: u32,
pending: &PendingRequests,
) -> Result<(), String> {
use lsp_types::InlayHintParams;
tracing::trace!(
"LSP: inlay hints request for {} ({}:{} - {}:{})",
uri.as_str(),
start_line,
start_char,
end_line,
end_char
);
let params = InlayHintParams {
text_document: TextDocumentIdentifier { uri: uri.clone() },
range: Range {
start: Position {
line: start_line,
character: start_char,
},
end: Position {
line: end_line,
character: end_char,
},
},
work_done_progress_params: WorkDoneProgressParams::default(),
};
match self
.send_request_sequential::<_, Option<Vec<lsp_types::InlayHint>>>(
"textDocument/inlayHint",
Some(params),
pending,
)
.await
{
Ok(hints) => {
let hints = hints.unwrap_or_default();
let uri_string = uri.as_str().to_string();
tracing::trace!(
"LSP: received {} inlay hints for {}",
hints.len(),
uri_string
);
let _ = self.async_tx.send(AsyncMessage::LspInlayHints {
request_id,
uri: uri_string,
hints,
});
Ok(())
}
Err(e) => {
tracing::debug!("Inlay hints request failed: {}", e);
let _ = self.async_tx.send(AsyncMessage::LspInlayHints {
request_id,
uri: uri.as_str().to_string(),
hints: Vec::new(),
});
Err(e)
}
}
}
/// Sends `textDocument/foldingRange` and forwards the ranges to the editor as
/// `AsyncMessage::LspFoldingRanges`.
///
/// A `null` result and a failed request both yield an empty range list. Only
/// a failed request returns `Err`.
async fn handle_folding_ranges(
    &self,
    request_id: u64,
    uri: Uri,
    pending: &PendingRequests,
) -> Result<(), String> {
    use lsp_types::{FoldingRange, FoldingRangeParams};
    let uri_string = uri.as_str().to_string();
    tracing::trace!("LSP: folding range request for {}", uri_string);
    let params = FoldingRangeParams {
        text_document: TextDocumentIdentifier { uri },
        work_done_progress_params: WorkDoneProgressParams::default(),
        partial_result_params: PartialResultParams::default(),
    };
    let response = self
        .send_request_sequential::<_, Option<Vec<FoldingRange>>>(
            "textDocument/foldingRange",
            Some(params),
            pending,
        )
        .await;
    let (ranges, status) = match response {
        Ok(received) => {
            let ranges = received.unwrap_or_default();
            tracing::trace!(
                "LSP: received {} folding ranges for {}",
                ranges.len(),
                uri_string
            );
            (ranges, Ok(()))
        }
        Err(e) => {
            tracing::debug!("Folding range request failed: {}", e);
            (Vec::new(), Err(e))
        }
    };
    let _ = self.async_tx.send(AsyncMessage::LspFoldingRanges {
        request_id,
        uri: uri_string,
        ranges,
    });
    status
}
async fn handle_semantic_tokens_full(
&self,
request_id: u64,
uri: Uri,
pending: &PendingRequests,
) -> Result<(), String> {
use lsp_types::request::SemanticTokensFullRequest;
tracing::trace!("LSP: semanticTokens/full request for {}", uri.as_str());
let params = SemanticTokensParams {
work_done_progress_params: WorkDoneProgressParams::default(),
partial_result_params: PartialResultParams::default(),
text_document: TextDocumentIdentifier { uri: uri.clone() },
};
match self
.send_request_sequential_tracked::<_, Option<SemanticTokensResult>>(
SemanticTokensFullRequest::METHOD,
Some(params),
pending,
Some(request_id),
)
.await
{
Ok(result) => {
let _ = self.async_tx.send(AsyncMessage::LspSemanticTokens {
request_id,
uri: uri.as_str().to_string(),
response: LspSemanticTokensResponse::Full(Ok(result)),
});
Ok(())
}
Err(e) => {
tracing::debug!("Semantic tokens request failed: {}", e);
let _ = self.async_tx.send(AsyncMessage::LspSemanticTokens {
request_id,
uri: uri.as_str().to_string(),
response: LspSemanticTokensResponse::Full(Err(e.clone())),
});
Err(e)
}
}
}
async fn handle_semantic_tokens_full_delta(
&self,
request_id: u64,
uri: Uri,
previous_result_id: String,
pending: &PendingRequests,
) -> Result<(), String> {
use lsp_types::{
request::SemanticTokensFullDeltaRequest, SemanticTokensDeltaParams,
SemanticTokensFullDeltaResult,
};
tracing::trace!(
"LSP: semanticTokens/full/delta request for {}",
uri.as_str()
);
let params = SemanticTokensDeltaParams {
work_done_progress_params: WorkDoneProgressParams::default(),
partial_result_params: PartialResultParams::default(),
text_document: TextDocumentIdentifier { uri: uri.clone() },
previous_result_id,
};
match self
.send_request_sequential_tracked::<_, Option<SemanticTokensFullDeltaResult>>(
SemanticTokensFullDeltaRequest::METHOD,
Some(params),
pending,
Some(request_id),
)
.await
{
Ok(result) => {
let _ = self.async_tx.send(AsyncMessage::LspSemanticTokens {
request_id,
uri: uri.as_str().to_string(),
response: LspSemanticTokensResponse::FullDelta(Ok(result)),
});
Ok(())
}
Err(e) => {
tracing::debug!("Semantic tokens delta request failed: {}", e);
let _ = self.async_tx.send(AsyncMessage::LspSemanticTokens {
request_id,
uri: uri.as_str().to_string(),
response: LspSemanticTokensResponse::FullDelta(Err(e.clone())),
});
Err(e)
}
}
}
async fn handle_semantic_tokens_range(
&self,
request_id: u64,
uri: Uri,
range: lsp_types::Range,
pending: &PendingRequests,
) -> Result<(), String> {
use lsp_types::{request::SemanticTokensRangeRequest, SemanticTokensRangeParams};
tracing::trace!("LSP: semanticTokens/range request for {}", uri.as_str());
let params = SemanticTokensRangeParams {
work_done_progress_params: WorkDoneProgressParams::default(),
partial_result_params: PartialResultParams::default(),
text_document: TextDocumentIdentifier { uri: uri.clone() },
range,
};
match self
.send_request_sequential_tracked::<_, Option<lsp_types::SemanticTokensRangeResult>>(
SemanticTokensRangeRequest::METHOD,
Some(params),
pending,
Some(request_id),
)
.await
{
Ok(result) => {
let _ = self.async_tx.send(AsyncMessage::LspSemanticTokens {
request_id,
uri: uri.as_str().to_string(),
response: LspSemanticTokensResponse::Range(Ok(result)),
});
Ok(())
}
Err(e) => {
tracing::debug!("Semantic tokens range request failed: {}", e);
let _ = self.async_tx.send(AsyncMessage::LspSemanticTokens {
request_id,
uri: uri.as_str().to_string(),
response: LspSemanticTokensResponse::Range(Err(e.clone())),
});
Err(e)
}
}
}
/// Forwards an arbitrary plugin-supplied request to the server as a tracked
/// request and relays the raw JSON result (or error string) back as
/// `AsyncMessage::PluginLspResponse`, tagged with this server's language.
async fn handle_plugin_request(
    &self,
    request_id: u64,
    method: String,
    params: Option<Value>,
    pending: &PendingRequests,
) {
    tracing::trace!(
        "Plugin request {} => method={} params={:?}",
        request_id,
        method,
        params
    );
    let outcome: Result<Value, String> = self
        .send_request_sequential_tracked::<Value, Value>(
            method.as_str(),
            params,
            pending,
            Some(request_id),
        )
        .await;
    tracing::trace!(
        "Plugin request {} completed with result {:?}",
        request_id,
        &outcome
    );
    let response = AsyncMessage::PluginLspResponse {
        language: String::clone(&self.language),
        request_id,
        result: outcome,
    };
    let _ = self.async_tx.send(response);
}
/// Writes a `shutdown` message followed by an `exit` message to the server.
///
/// Stops at the first write failure and returns it, so `exit` is not written
/// if `shutdown` could not be.
///
/// NOTE(review): the LSP specification defines `shutdown` as a request, but
/// it is written here as a notification (no id, no response awaited) —
/// confirm this is intentional.
async fn handle_shutdown(&self) -> Result<(), String> {
    tracing::info!("Shutting down async LSP server");
    for method in ["shutdown", "exit"] {
        let message = JsonRpcNotification {
            jsonrpc: String::from("2.0"),
            method: String::from(method),
            params: None,
        };
        self.write_message(&message).await?;
    }
    Ok(())
}
/// Writes a `$/cancelRequest` notification for the JSON-RPC request with id
/// `lsp_id`.
async fn send_cancel_request(&self, lsp_id: i64) -> Result<(), String> {
    tracing::trace!("Sending $/cancelRequest for LSP id {}", lsp_id);
    let params = serde_json::json!({ "id": lsp_id });
    let cancel = JsonRpcNotification {
        jsonrpc: String::from("2.0"),
        method: String::from("$/cancelRequest"),
        params: Some(params),
    };
    self.write_message(&cancel).await
}
/// Cancels the in-flight request the editor knows as `request_id`.
///
/// The editor id is removed from `active_requests` to find the JSON-RPC id.
/// If no entry exists, the call is a no-op that returns `Ok(())`.
async fn handle_cancel_request(&self, request_id: u64) -> Result<(), String> {
    // Own statement so the mutex guard is dropped before the `.await` below.
    let removed = self.active_requests.lock().unwrap().remove(&request_id);
    match removed {
        None => {
            tracing::trace!(
                "Cancel request ignored: no active LSP request for editor_id={}",
                request_id
            );
            Ok(())
        }
        Some(lsp_id) => {
            tracing::info!(
                "Cancelling request: editor_id={}, lsp_id={}",
                request_id,
                lsp_id
            );
            self.send_cancel_request(lsp_id).await
        }
    }
}
}
struct LspTask {
_process: crate::services::remote::StdioChild,
stdin: ChildStdin,
stdout: BufReader<ChildStdout>,
next_id: i64,
pending: HashMap<i64, oneshot::Sender<Result<Value, String>>>,
capabilities: Option<ServerCapabilities>,
document_versions: Arc<std::sync::Mutex<HashMap<PathBuf, i64>>>,
pending_opens: HashMap<PathBuf, Instant>,
initialized: bool,
async_tx: std_mpsc::Sender<AsyncMessage>,
language: String,
server_name: String,
server_command: String,
stderr_log_path: std::path::PathBuf,
language_id_overrides: HashMap<String, String>,
}
impl LspTask {
#[allow(clippy::too_many_arguments)]
async fn spawn(
command: &str,
args: &[String],
env: &std::collections::HashMap<String, String>,
language: String,
server_name: String,
async_tx: std_mpsc::Sender<AsyncMessage>,
process_limits: &ProcessLimits,
stderr_log_path: std::path::PathBuf,
language_id_overrides: HashMap<String, String>,
document_versions: Arc<std::sync::Mutex<HashMap<PathBuf, i64>>>,
long_running_spawner: Arc<dyn crate::services::remote::LongRunningSpawner>,
) -> Result<Self, String> {
tracing::info!("Spawning async LSP server: {} {:?}", command, args);
tracing::info!("Process limits: {:?}", process_limits);
tracing::info!("LSP stderr will be logged to: {:?}", stderr_log_path);
if !long_running_spawner.command_exists(command).await {
return Err(format!(
"LSP server executable '{}' not found in the active authority's PATH. \
Please install it or check your configuration.",
command
));
}
let env_pairs: Vec<(String, String)> =
env.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
let mut stdio_child = long_running_spawner
.spawn_stdio(command, args, env_pairs, None, Some(process_limits))
.await
.map_err(|e| format!("Failed to spawn LSP server '{}': {}", command, e))?;
let stdin = stdio_child
.take_stdin()
.ok_or_else(|| "Failed to get stdin".to_string())?;
let stdout_stream = stdio_child
.take_stdout()
.ok_or_else(|| "Failed to get stdout".to_string())?;
let stdout = BufReader::new(stdout_stream);
if let Some(stderr_stream) = stdio_child.take_stderr() {
let log_path = stderr_log_path.clone();
tokio::spawn(async move {
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader as TokioBufReader};
let mut file = match File::create(&log_path).await {
Ok(f) => f,
Err(e) => {
tracing::warn!("Could not create LSP stderr log {:?}: {}", log_path, e);
return;
}
};
let mut reader = TokioBufReader::new(stderr_stream);
let mut buf = String::new();
loop {
buf.clear();
match reader.read_line(&mut buf).await {
Ok(0) => break,
Ok(_) => {
if let Err(e) = file.write_all(buf.as_bytes()).await {
tracing::warn!(
"Write to LSP stderr log {:?} failed: {}",
log_path,
e
);
return;
}
}
Err(e) => {
tracing::debug!("LSP stderr stream closed for {:?}: {}", log_path, e);
return;
}
}
}
});
}
Ok(Self {
_process: stdio_child,
stdin,
stdout,
next_id: 0,
pending: HashMap::new(),
capabilities: None,
document_versions,
pending_opens: HashMap::new(),
initialized: false,
async_tx,
language,
server_name,
server_command: command.to_string(),
stderr_log_path,
language_id_overrides,
})
}
#[allow(clippy::too_many_arguments)]
#[allow(clippy::let_underscore_must_use)] fn spawn_stdout_reader(
mut stdout: BufReader<ChildStdout>,
pending: PendingRequests,
async_tx: std_mpsc::Sender<AsyncMessage>,
language: String,
server_name: String,
server_command: String,
stdin_writer: Arc<tokio::sync::Mutex<ChildStdin>>,
stderr_log_path: std::path::PathBuf,
shutting_down: Arc<AtomicBool>,
document_versions: Arc<std::sync::Mutex<HashMap<PathBuf, i64>>>,
config_options: Arc<std::sync::Mutex<Option<Value>>>,
capabilities: Arc<std::sync::Mutex<Option<ServerCapabilities>>>,
) {
tokio::spawn(async move {
tracing::info!("LSP stdout reader task started for {}", language);
loop {
match read_message_from_stdout(&mut stdout).await {
Ok(message) => {
tracing::trace!("Read message from LSP server: {:?}", message);
if let Err(e) = handle_message_dispatch(
message,
&pending,
&async_tx,
&language,
&server_name,
&server_command,
&stdin_writer,
&document_versions,
&config_options,
&capabilities,
)
.await
{
tracing::error!("Error handling LSP message: {}", e);
}
}
Err(e) => {
if shutting_down.load(Ordering::SeqCst) {
tracing::info!(
"LSP stdout reader exiting due to graceful shutdown for {}",
language
);
} else {
tracing::error!("Error reading from LSP server: {}", e);
let _ = async_tx.send(AsyncMessage::LspStatusUpdate {
language: language.clone(),
server_name: server_name.clone(),
status: LspServerStatus::Error,
message: None,
});
let _ = async_tx.send(AsyncMessage::LspError {
language: language.clone(),
error: format!("Read error: {}", e),
stderr_log_path: Some(stderr_log_path.clone()),
});
}
break;
}
}
}
{
let mut pending_guard = pending.lock().unwrap();
let count = pending_guard.len();
if count > 0 {
tracing::info!(
"LSP stdout reader: draining {} pending requests for {}",
count,
language
);
for (id, tx) in pending_guard.drain() {
tracing::debug!(
"LSP stdout reader: failing pending request id={} for {}",
id,
language
);
let _ = tx.send(Err(
"LSP server connection closed while awaiting response".to_string(),
));
}
}
}
tracing::info!("LSP stdout reader task exiting for {}", language);
});
}
#[allow(clippy::let_underscore_must_use)]
async fn run(self, mut command_rx: mpsc::Receiver<LspCommand>) {
tracing::info!("LspTask::run() started for language: {}", self.language);
let stdin_writer = Arc::new(tokio::sync::Mutex::new(self.stdin));
let state = LspState {
stdin: stdin_writer.clone(),
next_id: Arc::new(AtomicI64::new(self.next_id)),
capabilities: Arc::new(Mutex::new(self.capabilities)),
document_versions: self.document_versions.clone(),
pending_opens: Arc::new(Mutex::new(self.pending_opens)),
initialized: Arc::new(AtomicBool::new(self.initialized)),
async_tx: self.async_tx.clone(),
language: Arc::new(self.language.clone()),
server_name: Arc::new(self.server_name.clone()),
active_requests: Arc::new(Mutex::new(HashMap::new())),
language_id_overrides: Arc::new(self.language_id_overrides.clone()),
};
let pending = Arc::new(Mutex::new(self.pending));
let async_tx = state.async_tx.clone();
let language_clone: String = (*state.language).clone();
let server_name: String = (*state.server_name).clone();
let config_options: Arc<std::sync::Mutex<Option<Value>>> =
Arc::new(std::sync::Mutex::new(None));
let shutting_down = Arc::new(AtomicBool::new(false));
Self::spawn_stdout_reader(
self.stdout,
pending.clone(),
async_tx.clone(),
language_clone.clone(),
self.server_name.clone(),
self.server_command.clone(),
stdin_writer.clone(),
self.stderr_log_path,
shutting_down.clone(),
self.document_versions.clone(),
config_options.clone(),
state.capabilities.clone(),
);
macro_rules! await_draining {
($fut:expr, $command_rx:expr, $buf:expr) => {{
let fut = $fut;
tokio::pin!(fut);
loop {
tokio::select! {
biased; result = &mut fut => break result,
Some(cmd) = $command_rx.recv() => {
$buf.push_back(cmd);
}
}
}
}};
}
macro_rules! spawn_request {
($state:expr, $pending:expr, |$s:ident, $p:ident| $body:expr) => {{
let $s = $state.clone();
let $p = $pending.clone();
tokio::spawn(async move {
let _ = $body;
});
}};
}
let mut pending_commands = Vec::new();
let mut draining_buffer: std::collections::VecDeque<LspCommand> =
std::collections::VecDeque::new();
loop {
let cmd = if let Some(cmd) = draining_buffer.pop_front() {
cmd
} else {
match command_rx.recv().await {
Some(cmd) => cmd,
None => {
tracing::info!("Command channel closed");
break;
}
}
};
tracing::trace!("LspTask received command: {:?}", cmd);
let initialized = state.initialized.load(Ordering::SeqCst);
match cmd {
LspCommand::Initialize {
root_uri,
initialization_options,
response,
} => {
let _ = async_tx.send(AsyncMessage::LspStatusUpdate {
language: language_clone.clone(),
server_name: server_name.clone(),
status: LspServerStatus::Initializing,
message: None,
});
tracing::info!("Processing Initialize command");
*config_options.lock().unwrap() = initialization_options.clone();
let result = await_draining!(
state.handle_initialize_sequential(
root_uri,
initialization_options,
&pending
),
command_rx,
draining_buffer
);
let success = result.is_ok();
let _ = response.send(result);
if success {
let queued = std::mem::take(&mut pending_commands);
await_draining!(
state.replay_pending_commands(queued, &pending),
command_rx,
draining_buffer
);
}
}
LspCommand::DidOpen {
uri,
text,
language_id,
} => {
if initialized {
tracing::info!("Processing DidOpen for {}", uri.as_str());
let _ = state
.handle_did_open_sequential(uri, text, language_id, &pending)
.await;
} else {
tracing::trace!(
"Queueing DidOpen for {} until initialization completes",
uri.as_str()
);
pending_commands.push(LspCommand::DidOpen {
uri,
text,
language_id,
});
}
}
LspCommand::DidChange {
uri,
content_changes,
} => {
if initialized {
tracing::trace!("Processing DidChange for {}", uri.as_str());
let _ = state
.handle_did_change_sequential(uri, content_changes, &pending)
.await;
} else {
tracing::trace!(
"Queueing DidChange for {} until initialization completes",
uri.as_str()
);
pending_commands.push(LspCommand::DidChange {
uri,
content_changes,
});
}
}
LspCommand::DidClose { uri } => {
if initialized {
tracing::info!("Processing DidClose for {}", uri.as_str());
let _ = state.handle_did_close(uri).await;
} else {
tracing::trace!(
"Queueing DidClose for {} until initialization completes",
uri.as_str()
);
pending_commands.push(LspCommand::DidClose { uri });
}
}
LspCommand::DidSave { uri, text } => {
if initialized {
tracing::info!("Processing DidSave for {}", uri.as_str());
let _ = state.handle_did_save(uri, text).await;
} else {
tracing::trace!(
"Queueing DidSave for {} until initialization completes",
uri.as_str()
);
pending_commands.push(LspCommand::DidSave { uri, text });
}
}
LspCommand::DidChangeWorkspaceFolders { added, removed } => {
if initialized {
tracing::info!(
"Processing DidChangeWorkspaceFolders: +{} -{}",
added.len(),
removed.len()
);
let _ = state
.send_notification::<lsp_types::notification::DidChangeWorkspaceFolders>(
lsp_types::DidChangeWorkspaceFoldersParams {
event: lsp_types::WorkspaceFoldersChangeEvent {
added,
removed,
},
},
)
.await;
} else {
tracing::trace!(
"Queueing DidChangeWorkspaceFolders until initialization completes"
);
pending_commands
.push(LspCommand::DidChangeWorkspaceFolders { added, removed });
}
}
LspCommand::Completion {
request_id,
uri,
line,
character,
} => {
if initialized {
tracing::info!("Processing Completion request for {}", uri.as_str());
spawn_request!(state, pending, |s, p| s
.handle_completion(request_id, uri, line, character, &p)
.await);
} else {
tracing::trace!("LSP not initialized, sending empty completion");
let _ = state.async_tx.send(AsyncMessage::LspCompletion {
request_id,
items: vec![],
});
}
}
LspCommand::GotoDefinition {
request_id,
uri,
line,
character,
} => {
if initialized {
tracing::info!("Processing GotoDefinition request for {}", uri.as_str());
spawn_request!(state, pending, |s, p| s
.handle_goto_definition(request_id, uri, line, character, &p)
.await);
} else {
tracing::trace!("LSP not initialized, sending empty locations");
let _ = state.async_tx.send(AsyncMessage::LspGotoDefinition {
request_id,
locations: vec![],
});
}
}
LspCommand::Rename {
request_id,
uri,
line,
character,
new_name,
} => {
if initialized {
tracing::info!("Processing Rename request for {}", uri.as_str());
spawn_request!(state, pending, |s, p| s
.handle_rename(request_id, uri, line, character, new_name, &p)
.await);
} else {
tracing::trace!("LSP not initialized, cannot rename");
let _ = state.async_tx.send(AsyncMessage::LspRename {
request_id,
result: Err("LSP not initialized".to_string()),
});
}
}
LspCommand::Hover {
request_id,
uri,
line,
character,
} => {
if initialized {
tracing::info!("Processing Hover request for {}", uri.as_str());
spawn_request!(state, pending, |s, p| s
.handle_hover(request_id, uri, line, character, &p)
.await);
} else {
tracing::trace!("LSP not initialized, cannot get hover");
let _ = state.async_tx.send(AsyncMessage::LspHover {
request_id,
contents: String::new(),
is_markdown: false,
range: None,
});
}
}
LspCommand::References {
request_id,
uri,
line,
character,
} => {
if initialized {
tracing::info!("Processing References request for {}", uri.as_str());
spawn_request!(state, pending, |s, p| s
.handle_references(request_id, uri, line, character, &p)
.await);
} else {
tracing::trace!("LSP not initialized, cannot get references");
let _ = state.async_tx.send(AsyncMessage::LspReferences {
request_id,
locations: Vec::new(),
});
}
}
LspCommand::SignatureHelp {
request_id,
uri,
line,
character,
} => {
if initialized {
tracing::info!("Processing SignatureHelp request for {}", uri.as_str());
spawn_request!(state, pending, |s, p| s
.handle_signature_help(request_id, uri, line, character, &p)
.await);
} else {
tracing::trace!("LSP not initialized, cannot get signature help");
let _ = state.async_tx.send(AsyncMessage::LspSignatureHelp {
request_id,
signature_help: None,
});
}
}
LspCommand::CodeActions {
request_id,
uri,
start_line,
start_char,
end_line,
end_char,
diagnostics,
} => {
if initialized {
tracing::info!("Processing CodeActions request for {}", uri.as_str());
spawn_request!(state, pending, |s, p| s
.handle_code_actions(
request_id,
uri,
start_line,
start_char,
end_line,
end_char,
diagnostics,
&p,
)
.await);
} else {
tracing::trace!("LSP not initialized, cannot get code actions");
let _ = state.async_tx.send(AsyncMessage::LspCodeActions {
request_id,
actions: Vec::new(),
});
}
}
LspCommand::DocumentDiagnostic {
request_id,
uri,
previous_result_id,
} => {
if initialized {
tracing::info!(
"Processing DocumentDiagnostic request for {}",
uri.as_str()
);
spawn_request!(state, pending, |s, p| s
.handle_document_diagnostic(request_id, uri, previous_result_id, &p)
.await);
} else {
tracing::trace!("LSP not initialized, cannot get document diagnostics");
let _ = state.async_tx.send(AsyncMessage::LspPulledDiagnostics {
request_id,
uri: uri.as_str().to_string(),
result_id: None,
diagnostics: Vec::new(),
unchanged: false,
});
}
}
LspCommand::InlayHints {
request_id,
uri,
start_line,
start_char,
end_line,
end_char,
} => {
if initialized {
tracing::info!("Processing InlayHints request for {}", uri.as_str());
spawn_request!(state, pending, |s, p| s
.handle_inlay_hints(
request_id, uri, start_line, start_char, end_line, end_char, &p,
)
.await);
} else {
tracing::trace!("LSP not initialized, cannot get inlay hints");
let _ = state.async_tx.send(AsyncMessage::LspInlayHints {
request_id,
uri: uri.as_str().to_string(),
hints: Vec::new(),
});
}
}
LspCommand::FoldingRange { request_id, uri } => {
if initialized {
tracing::info!("Processing FoldingRange request for {}", uri.as_str());
spawn_request!(state, pending, |s, p| s
.handle_folding_ranges(request_id, uri, &p)
.await);
} else {
tracing::trace!("LSP not initialized, cannot get folding ranges");
let _ = state.async_tx.send(AsyncMessage::LspFoldingRanges {
request_id,
uri: uri.as_str().to_string(),
ranges: Vec::new(),
});
}
}
LspCommand::SemanticTokensFull { request_id, uri } => {
if initialized {
tracing::info!("Processing SemanticTokens request for {}", uri.as_str());
spawn_request!(state, pending, |s, p| s
.handle_semantic_tokens_full(request_id, uri, &p)
.await);
} else {
tracing::trace!("LSP not initialized, cannot get semantic tokens");
let _ = state.async_tx.send(AsyncMessage::LspSemanticTokens {
request_id,
uri: uri.as_str().to_string(),
response: LspSemanticTokensResponse::Full(Err(
"LSP not initialized".to_string()
)),
});
}
}
LspCommand::SemanticTokensFullDelta {
request_id,
uri,
previous_result_id,
} => {
if initialized {
tracing::info!(
"Processing SemanticTokens delta request for {}",
uri.as_str()
);
spawn_request!(state, pending, |s, p| s
.handle_semantic_tokens_full_delta(
request_id,
uri,
previous_result_id,
&p,
)
.await);
} else {
tracing::trace!("LSP not initialized, cannot get semantic tokens");
let _ = state.async_tx.send(AsyncMessage::LspSemanticTokens {
request_id,
uri: uri.as_str().to_string(),
response: LspSemanticTokensResponse::FullDelta(Err(
"LSP not initialized".to_string(),
)),
});
}
}
LspCommand::SemanticTokensRange {
request_id,
uri,
range,
} => {
if initialized {
tracing::info!(
"Processing SemanticTokens range request for {}",
uri.as_str()
);
spawn_request!(state, pending, |s, p| s
.handle_semantic_tokens_range(request_id, uri, range, &p)
.await);
} else {
tracing::trace!("LSP not initialized, cannot get semantic tokens");
let _ = state.async_tx.send(AsyncMessage::LspSemanticTokens {
request_id,
uri: uri.as_str().to_string(),
response: LspSemanticTokensResponse::Range(Err(
"LSP not initialized".to_string()
)),
});
}
}
LspCommand::ExecuteCommand { command, arguments } => {
if initialized {
tracing::info!("Processing ExecuteCommand: {}", command);
spawn_request!(state, pending, |s, p| s
.handle_execute_command(command, arguments, &p)
.await);
} else {
tracing::trace!("LSP not initialized, cannot execute command");
}
}
LspCommand::CodeActionResolve { request_id, action } => {
if initialized {
tracing::info!("Processing CodeActionResolve (request_id={})", request_id);
spawn_request!(state, pending, |s, p| s
.handle_code_action_resolve(request_id, *action, &p)
.await);
} else {
tracing::trace!("LSP not initialized, cannot resolve code action");
let _ = state.async_tx.send(AsyncMessage::LspCodeActionResolved {
request_id,
action: Err("LSP not initialized".to_string()),
});
}
}
LspCommand::CompletionResolve { request_id, item } => {
if initialized {
spawn_request!(state, pending, |s, p| s
.handle_completion_resolve(request_id, *item, &p)
.await);
}
}
LspCommand::DocumentFormatting {
request_id,
uri,
tab_size,
insert_spaces,
} => {
if initialized {
tracing::info!("Processing DocumentFormatting for {}", uri.as_str());
spawn_request!(state, pending, |s, p| s
.handle_document_formatting(
request_id,
uri,
tab_size,
insert_spaces,
&p,
)
.await);
}
}
LspCommand::DocumentRangeFormatting {
request_id,
uri,
start_line,
start_char,
end_line,
end_char,
tab_size,
insert_spaces,
} => {
if initialized {
spawn_request!(state, pending, |s, p| s
.handle_document_range_formatting(
request_id,
uri,
start_line,
start_char,
end_line,
end_char,
tab_size,
insert_spaces,
&p,
)
.await);
}
}
LspCommand::PrepareRename {
request_id,
uri,
line,
character,
} => {
if initialized {
spawn_request!(state, pending, |s, p| s
.handle_prepare_rename(request_id, uri, line, character, &p)
.await);
}
}
LspCommand::CancelRequest { request_id } => {
tracing::info!("Processing CancelRequest for editor_id={}", request_id);
let _ = state.handle_cancel_request(request_id).await;
}
LspCommand::PluginRequest {
request_id,
method,
params,
} => {
if initialized {
tracing::trace!("Processing plugin request {} ({})", request_id, method);
spawn_request!(state, pending, |s, p| s
.handle_plugin_request(request_id, method, params, &p)
.await);
} else {
tracing::trace!(
"Plugin LSP request {} received before initialization",
request_id
);
let _ = state.async_tx.send(AsyncMessage::PluginLspResponse {
language: language_clone.clone(),
request_id,
result: Err("LSP not initialized".to_string()),
});
}
}
LspCommand::Shutdown => {
tracing::info!("Processing Shutdown command");
shutting_down.store(true, Ordering::SeqCst);
let _ = state.handle_shutdown().await;
break;
}
}
}
tracing::info!("LSP task exiting for language: {}", self.language);
}
}
/// Reads one framed JSON-RPC message from the language server's stdout.
///
/// The framing is a block of `Name: value` header lines, a blank line, and
/// then exactly `Content-Length` bytes of UTF-8 JSON.
///
/// Header handling is deliberately lenient:
/// * the header name is matched case-insensitively and the space after the
///   colon is optional;
/// * a bare `\n` is accepted as a line terminator alongside `\r\n`, so a
///   server that omits the carriage return cannot make the header loop run
///   into the message body;
/// * headers other than `Content-Length` are ignored.
///
/// # Errors
///
/// Returns a descriptive `String` when stdout hits EOF, a read fails, the
/// `Content-Length` header is missing or not a number, the body is not valid
/// UTF-8, or the JSON does not deserialize into a `JsonRpcMessage`.
async fn read_message_from_stdout(
    stdout: &mut BufReader<ChildStdout>,
) -> Result<JsonRpcMessage, String> {
    let mut content_length: Option<usize> = None;
    // One buffer reused for every header line instead of a fresh String each.
    let mut line = String::new();
    loop {
        line.clear();
        let bytes_read = stdout
            .read_line(&mut line)
            .await
            .map_err(|e| format!("Failed to read from stdout: {}", e))?;
        if bytes_read == 0 {
            return Err("LSP server closed stdout (EOF)".to_string());
        }
        // Strip the terminator first so "\r\n" and "\n" are treated alike.
        let header = line.trim_end_matches(|c: char| c == '\r' || c == '\n');
        if header.is_empty() {
            // Blank line: end of the header block.
            break;
        }
        if let Some((name, value)) = header.split_once(':') {
            if name.trim().eq_ignore_ascii_case("Content-Length") {
                content_length = Some(
                    value
                        .trim()
                        .parse()
                        .map_err(|e| format!("Invalid Content-Length: {}", e))?,
                );
            }
        }
    }
    let content_length =
        content_length.ok_or_else(|| "Missing Content-Length header".to_string())?;
    let mut content = vec![0u8; content_length];
    stdout
        .read_exact(&mut content)
        .await
        .map_err(|e| format!("Failed to read content: {}", e))?;
    let json = String::from_utf8(content).map_err(|e| format!("Invalid UTF-8: {}", e))?;
    tracing::trace!("Received LSP message: {}", json);
    serde_json::from_str(&json).map_err(|e| format!("Failed to deserialize message: {}", e))
}
/// Extracts `(method, register_options)` pairs from request params shaped
/// like `lsp_types::RegistrationParams`.
///
/// Returns an empty list when `params` is absent or does not deserialize.
fn registrations_from_params(params: Option<&Value>) -> Vec<(String, Option<Value>)> {
    let raw = match params {
        Some(raw) => raw,
        None => return Vec::new(),
    };
    match serde_json::from_value::<lsp_types::RegistrationParams>(raw.clone()) {
        Ok(parsed) => parsed
            .registrations
            .into_iter()
            .map(|registration| (registration.method, registration.register_options))
            .collect(),
        Err(_) => Vec::new(),
    }
}
/// Extracts the method names from request params shaped like
/// `lsp_types::UnregistrationParams`.
///
/// Returns an empty list when `params` is absent or does not deserialize.
fn unregistrations_from_params(params: Option<&Value>) -> Vec<String> {
    let raw = match params {
        Some(raw) => raw,
        None => return Vec::new(),
    };
    match serde_json::from_value::<lsp_types::UnregistrationParams>(raw.clone()) {
        // `unregisterations` (sic) is the field name exposed by lsp_types.
        Ok(parsed) => parsed
            .unregisterations
            .into_iter()
            .map(|entry| entry.method)
            .collect(),
        Err(_) => Vec::new(),
    }
}
/// Mirrors dynamic (un)registrations into the stored `ServerCapabilities`.
///
/// Only `textDocument/diagnostic` entries are mirrored; if `registrations`
/// contains none, the capabilities mutex is not even locked. When `register`
/// is true the diagnostic provider is set from the entry's options (falling
/// back to `DiagnosticOptions::default()` when they are absent or do not
/// deserialize); when false it is cleared. If several matching entries are
/// present, the last one wins.
///
/// # Panics
///
/// Panics if the `capabilities` mutex is poisoned.
fn sync_raw_capabilities(
    capabilities: &Arc<std::sync::Mutex<Option<ServerCapabilities>>>,
    registrations: &[(String, Option<Value>)],
    register: bool,
) {
    use lsp_types::{DiagnosticOptions, DiagnosticServerCapabilities};

    const DIAGNOSTIC_METHOD: &str = "textDocument/diagnostic";

    let mut diagnostic_entries = registrations
        .iter()
        .filter(|(method, _)| method == DIAGNOSTIC_METHOD)
        .peekable();
    if diagnostic_entries.peek().is_none() {
        return;
    }

    let mut guard = capabilities.lock().unwrap();
    let caps = guard.get_or_insert_with(ServerCapabilities::default);
    for (_, options) in diagnostic_entries {
        caps.diagnostic_provider = if register {
            let opts = match options {
                Some(raw) => {
                    serde_json::from_value::<DiagnosticOptions>(raw.clone()).unwrap_or_default()
                }
                None => DiagnosticOptions::default(),
            };
            Some(DiagnosticServerCapabilities::Options(opts))
        } else {
            None
        };
    }
}
/// Builds one configuration value per requested item.
///
/// Each item's `"section"` string (if present and non-empty) selects the
/// section to resolve; otherwise the item resolves with no section. An empty
/// `items` slice yields a single value resolved with no section.
fn resolve_workspace_configuration(
    items: &[Value],
    init_options: Option<&Value>,
    server_command: &str,
) -> Vec<Value> {
    if items.is_empty() {
        let whole = resolve_configuration_section(None, init_options, server_command);
        return vec![whole];
    }

    let mut resolved = Vec::with_capacity(items.len());
    for item in items {
        // An empty section name is treated the same as a missing one.
        let section = match item.get("section").and_then(Value::as_str) {
            Some(name) if !name.is_empty() => Some(name),
            _ => None,
        };
        resolved.push(resolve_configuration_section(
            section,
            init_options,
            server_command,
        ));
    }
    resolved
}
/// Resolves a single configuration section against `init_options`.
///
/// * With options and no section: returns a clone of the whole options value.
/// * With options and a dotted section path (`a.b.c`): walks the path key by
///   key and returns a clone of the value found there.
/// * With no options, or when any path component is missing: falls back to
///   `default_configuration_section(server_command)`.
fn resolve_configuration_section(
    section: Option<&str>,
    init_options: Option<&Value>,
    server_command: &str,
) -> Value {
    let located = match (init_options, section) {
        (None, _) => None,
        (Some(options), None) => Some(options),
        // `try_fold` stops at the first component that cannot be found.
        (Some(options), Some(path)) => path
            .split('.')
            .try_fold(options, |node, key| node.get(key)),
    };
    match located {
        Some(value) => value.clone(),
        None => default_configuration_section(server_command),
    }
}
/// Returns the built-in fallback configuration for a server command.
///
/// For a rust-analyzer command this is an object enabling its inlay hints;
/// every other server gets `Value::Null`.
fn default_configuration_section(server_command: &str) -> Value {
    if !server_command_is_rust_analyzer(server_command) {
        return Value::Null;
    }
    serde_json::json!({
        "inlayHints": {
            "typeHints": { "enable": true },
            "parameterHints": { "enable": true },
            "chainingHints": { "enable": true },
            "closureReturnTypeHints": { "enable": "always" }
        }
    })
}
/// Reports whether `server_command` looks like a rust-analyzer binary.
///
/// Only the final path component is inspected; if the path has no usable
/// final component (or it is not valid UTF-8) the whole command string is
/// inspected instead.
fn server_command_is_rust_analyzer(server_command: &str) -> bool {
    let binary_name = match std::path::Path::new(server_command).file_name() {
        Some(os_name) => os_name.to_str().unwrap_or(server_command),
        None => server_command,
    };
    binary_name.contains("rust-analyzer")
}
/// Builds a successful JSON-RPC 2.0 response for `id` whose result is `null`.
fn null_response(id: i64) -> JsonRpcResponse {
    let result = Some(Value::Null);
    JsonRpcResponse {
        jsonrpc: String::from("2.0"),
        id,
        result,
        error: None,
    }
}
fn parse_window_message(
params: Option<Value>,
default_type: i64,
) -> Option<(LspMessageType, String)> {
let msg = serde_json::from_value::<serde_json::Map<String, Value>>(params?).ok()?;
let type_num = msg
.get("type")
.and_then(|v| v.as_i64())
.unwrap_or(default_type);
let message = msg
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("(no message)")
.to_string();
let message_type = match type_num {
1 => LspMessageType::Error,
2 => LspMessageType::Warning,
3 => LspMessageType::Info,
_ => LspMessageType::Log,
};
Some((message_type, message))
}
fn log_lsp_message(message_type: LspMessageType, language: &str, message: &str) {
match message_type {
LspMessageType::Error => tracing::error!("LSP ({}): {}", language, message),
LspMessageType::Warning => tracing::warn!("LSP ({}): {}", language, message),
LspMessageType::Info => tracing::info!("LSP ({}): {}", language, message),
LspMessageType::Log => tracing::trace!("LSP ({}): {}", language, message),
}
}
/// Parses the params of a `$/progress` notification into `(token, value)`.
///
/// The token may be a string or an integer (rendered as a string); a missing
/// or otherwise-typed token becomes `"unknown"`. The `"value"` member must be
/// an object whose `"kind"` is `"begin"`, `"report"` or `"end"`.
///
/// Returns `None` when `params` is absent or not an object, when `"value"` is
/// missing or not an object, or when `"kind"` is not one of the three
/// recognised strings.
///
/// A `"percentage"` that is not an unsigned integer, or that does not fit in
/// a `u32`, is reported as `None` rather than being silently wrapped.
fn parse_progress_notification(
    params: Option<Value>,
    language: &str,
) -> Option<(String, LspProgressValue)> {
    let progress = serde_json::from_value::<serde_json::Map<String, Value>>(params?).ok()?;
    let token = progress
        .get("token")
        .and_then(|v| {
            v.as_str()
                .map(|s| s.to_string())
                .or_else(|| v.as_i64().map(|n| n.to_string()))
        })
        .unwrap_or_else(|| "unknown".to_string());
    let value_obj = progress.get("value").and_then(|v| v.as_object())?;

    // Shared extractors for the optional fields common to several kinds.
    let message = || {
        value_obj
            .get("message")
            .and_then(|v| v.as_str())
            .map(|s| s.to_string())
    };
    let percentage = || {
        value_obj
            .get("percentage")
            .and_then(|v| v.as_u64())
            // Guard the narrowing cast: out-of-range values become None.
            .filter(|p| *p <= u64::from(u32::MAX))
            .map(|p| p as u32)
    };

    let value = match value_obj.get("kind").and_then(|v| v.as_str()) {
        Some("begin") => {
            let title = value_obj
                .get("title")
                .and_then(|v| v.as_str())
                .unwrap_or("Working...")
                .to_string();
            let message = message();
            let percentage = percentage();
            tracing::info!(
                "LSP ({}) progress begin: {} {:?} {:?}",
                language,
                title,
                message,
                percentage
            );
            LspProgressValue::Begin {
                title,
                message,
                percentage,
            }
        }
        Some("report") => {
            let message = message();
            let percentage = percentage();
            tracing::trace!(
                "LSP ({}) progress report: {:?} {:?}",
                language,
                message,
                percentage
            );
            LspProgressValue::Report {
                message,
                percentage,
            }
        }
        Some("end") => {
            let message = message();
            tracing::info!("LSP ({}) progress end: {:?}", language, message);
            LspProgressValue::End { message }
        }
        _ => return None,
    };
    Some((token, value))
}
#[allow(clippy::too_many_arguments)]
#[allow(clippy::let_underscore_must_use)] async fn handle_message_dispatch(
message: JsonRpcMessage,
pending: &PendingRequests,
async_tx: &std_mpsc::Sender<AsyncMessage>,
language: &str,
server_name: &str,
server_command: &str,
stdin_writer: &Arc<tokio::sync::Mutex<ChildStdin>>,
document_versions: &Arc<std::sync::Mutex<HashMap<PathBuf, i64>>>,
config_options: &Arc<std::sync::Mutex<Option<Value>>>,
capabilities: &Arc<std::sync::Mutex<Option<ServerCapabilities>>>,
) -> Result<(), String> {
match message {
JsonRpcMessage::Response(response) => {
tracing::trace!("Received LSP response for request id={}", response.id);
if let Some(tx) = pending.lock().unwrap().remove(&response.id) {
let result = if let Some(error) = response.error {
log_response_error(error.code, &error.message, server_name, language);
Err(format!(
"LSP error from '{}' ({}): {} (code {})",
server_name, language, error.message, error.code
))
} else {
tracing::trace!(
"LSP response success from '{}' ({}) for request id={}",
server_name,
language,
response.id
);
Ok(response.result.unwrap_or(serde_json::Value::Null))
};
let _ = tx.send(result);
} else {
tracing::warn!(
"Received LSP response from '{}' ({}) for unknown request id={}",
server_name,
language,
response.id
);
}
}
JsonRpcMessage::Notification(notification) => {
tracing::trace!("Received LSP notification: {}", notification.method);
handle_notification_dispatch(
notification,
async_tx,
language,
server_name,
document_versions,
)
.await?;
}
JsonRpcMessage::Request(request) => {
tracing::trace!("Received request from server: {}", request.method);
let response = match request.method.as_str() {
"window/workDoneProgress/create" => {
tracing::trace!("Acknowledging workDoneProgress/create (id={})", request.id);
null_response(request.id)
}
"workspace/configuration" => {
tracing::trace!(
"Responding to workspace/configuration for {}",
server_command
);
let empty = Vec::new();
let items = request
.params
.as_ref()
.and_then(|p| p.get("items"))
.and_then(|items| items.as_array())
.unwrap_or(&empty);
let stored = config_options.lock().unwrap().clone();
let configs =
resolve_workspace_configuration(items, stored.as_ref(), server_command);
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: Some(Value::Array(configs)),
error: None,
}
}
"client/registerCapability" => {
let registrations = registrations_from_params(request.params.as_ref());
tracing::debug!(
"client/registerCapability (id={}) registering {} method(s): {:?}",
request.id,
registrations.len(),
registrations.iter().map(|(m, _)| m).collect::<Vec<_>>()
);
if !registrations.is_empty() {
sync_raw_capabilities(capabilities, ®istrations, true);
let _ = async_tx.send(AsyncMessage::LspDynamicCapabilities {
language: language.to_string(),
server_name: server_name.to_string(),
register: true,
registrations,
});
}
null_response(request.id)
}
"client/unregisterCapability" => {
let methods = unregistrations_from_params(request.params.as_ref());
tracing::debug!(
"client/unregisterCapability (id={}) unregistering {} method(s): {:?}",
request.id,
methods.len(),
methods
);
if !methods.is_empty() {
let registrations: Vec<(String, Option<Value>)> =
methods.into_iter().map(|m| (m, None)).collect();
sync_raw_capabilities(capabilities, ®istrations, false);
let _ = async_tx.send(AsyncMessage::LspDynamicCapabilities {
language: language.to_string(),
server_name: server_name.to_string(),
register: false,
registrations,
});
}
null_response(request.id)
}
"workspace/diagnostic/refresh" => {
tracing::info!(
"LSP ({}) requested diagnostic refresh (workspace/diagnostic/refresh)",
language
);
let _ = async_tx.send(AsyncMessage::LspDiagnosticRefresh {
language: language.to_string(),
});
null_response(request.id)
}
"workspace/inlayHint/refresh" => {
tracing::info!(
"LSP ({}) requested inlay-hint refresh (workspace/inlayHint/refresh)",
language
);
let _ = async_tx.send(AsyncMessage::LspInlayHintRefresh {
language: language.to_string(),
});
null_response(request.id)
}
"workspace/semanticTokens/refresh" => {
tracing::info!(
"LSP ({}) requested semantic-tokens refresh (workspace/semanticTokens/refresh)",
language
);
let _ = async_tx.send(AsyncMessage::LspSemanticTokensRefresh {
language: language.to_string(),
});
null_response(request.id)
}
"workspace/applyEdit" => {
tracing::info!("LSP ({}) received workspace/applyEdit request", language);
let applied = if let Some(params) = &request.params {
match serde_json::from_value::<lsp_types::ApplyWorkspaceEditParams>(
params.clone(),
) {
Ok(apply_params) => {
let label = apply_params.label.clone();
let _ = async_tx.send(AsyncMessage::LspApplyEdit {
edit: apply_params.edit,
label,
});
true
}
Err(e) => {
tracing::error!(
"Failed to parse workspace/applyEdit params: {}",
e
);
false
}
}
} else {
false
};
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: Some(serde_json::json!({ "applied": applied })),
error: None,
}
}
_ => {
tracing::debug!("Server request for plugins: {}", request.method);
let _ = async_tx.send(AsyncMessage::LspServerRequest {
language: language.to_string(),
server_command: server_command.to_string(),
method: request.method.clone(),
params: request.params.clone(),
});
null_response(request.id)
}
};
let json = serde_json::to_string(&response)
.map_err(|e| format!("Failed to serialize response: {}", e))?;
let message = format!("Content-Length: {}\r\n\r\n{}", json.len(), json);
let mut stdin = stdin_writer.lock().await;
use tokio::io::AsyncWriteExt;
if let Err(e) = stdin.write_all(message.as_bytes()).await {
tracing::error!("Failed to write server response: {}", e);
}
if let Err(e) = stdin.flush().await {
tracing::error!("Failed to flush server response: {}", e);
}
tracing::trace!("Sent response to server request id={}", response.id);
}
}
Ok(())
}
#[allow(clippy::let_underscore_must_use)] async fn handle_notification_dispatch(
notification: JsonRpcNotification,
async_tx: &std_mpsc::Sender<AsyncMessage>,
language: &str,
server_name: &str,
document_versions: &Arc<std::sync::Mutex<HashMap<PathBuf, i64>>>,
) -> Result<(), String> {
match notification.method.as_str() {
PublishDiagnostics::METHOD => {
if let Some(params) = notification.params {
let params: PublishDiagnosticsParams = serde_json::from_value(params)
.map_err(|e| format!("Failed to deserialize diagnostics: {}", e))?;
if let Some(diag_version) = params.version {
let path = PathBuf::from(params.uri.path().as_str());
let current_version = document_versions.lock().unwrap().get(&path).copied();
if let Some(current) = current_version {
if (diag_version as i64) < current {
tracing::debug!(
"LSP ({}): dropping stale diagnostics for {} (diag version {} < current {})",
language,
params.uri.as_str(),
diag_version,
current
);
return Ok(());
}
}
}
tracing::trace!(
"Received {} diagnostics for {}",
params.diagnostics.len(),
params.uri.as_str()
);
let _ = async_tx.send(AsyncMessage::LspDiagnostics {
uri: params.uri.to_string(),
diagnostics: params.diagnostics,
server_name: server_name.to_string(),
});
}
}
"window/showMessage" => {
if let Some((message_type, message)) = parse_window_message(notification.params, 3) {
log_lsp_message(message_type, language, &message);
let _ = async_tx.send(AsyncMessage::LspWindowMessage {
language: language.to_string(),
message_type,
message,
});
}
}
"window/logMessage" => {
if let Some((message_type, message)) = parse_window_message(notification.params, 4) {
log_lsp_message(message_type, language, &message);
let _ = async_tx.send(AsyncMessage::LspLogMessage {
language: language.to_string(),
message_type,
message,
});
}
}
"$/progress" => {
if let Some((token, value)) = parse_progress_notification(notification.params, language)
{
let _ = async_tx.send(AsyncMessage::LspProgress {
language: language.to_string(),
token,
value,
});
}
}
"experimental/serverStatus" => {
if let Some(params) = notification.params {
if let Ok(status) = serde_json::from_value::<serde_json::Map<String, Value>>(params)
{
let quiescent = status
.get("quiescent")
.and_then(|v| v.as_bool())
.unwrap_or(false);
tracing::info!("LSP ({}) server status: quiescent={}", language, quiescent);
if quiescent {
let _ = async_tx.send(AsyncMessage::LspServerQuiescent {
language: language.to_string(),
});
}
}
}
}
_ => {
tracing::debug!("Unhandled notification: {}", notification.method);
}
}
Ok(())
}
/// Process-wide counter from which each `LspHandle` takes its `id`; the
/// first handle receives 1.
static NEXT_HANDLE_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
/// Handle through which callers queue commands for a spawned LSP task.
pub struct LspHandle {
/// Identifier taken from `NEXT_HANDLE_ID` when the handle is created.
id: u64,
/// Language scope this handle serves; `did_open` rejects documents whose
/// language id the scope does not accept.
scope: crate::services::lsp::manager::LanguageScope,
/// Sending half of the bounded command channel consumed by the LSP task.
command_tx: mpsc::Sender<LspCommand>,
/// Client lifecycle state, shared with the background tasks that update it.
state: Arc<Mutex<LspClientState>>,
/// Runtime handle used to spawn follow-up tasks (e.g. the initialize watcher).
runtime: tokio::runtime::Handle,
/// Per-path document versions; the same map is handed to the LSP task.
document_versions: Arc<std::sync::Mutex<HashMap<PathBuf, i64>>>,
}
#[allow(clippy::let_underscore_must_use)]
impl LspHandle {
/// Creates a handle and starts the language server on `runtime`.
///
/// A `Starting` status update is sent on the async bridge immediately. The
/// server process itself is spawned inside a background task; if that fails,
/// the task writes a short failure stub to the server's stderr log path,
/// moves the shared state to `Error`, and sends an `Error` status update
/// followed by an `LspError` message.
///
/// This function itself always returns `Ok`: spawn failures are reported
/// asynchronously through the bridge and the shared state.
#[allow(clippy::too_many_arguments)]
pub fn spawn(
    runtime: &tokio::runtime::Handle,
    command: &str,
    args: &[String],
    env: std::collections::HashMap<String, String>,
    scope: crate::services::lsp::manager::LanguageScope,
    server_name: String,
    async_bridge: &AsyncBridge,
    process_limits: ProcessLimits,
    language_id_overrides: std::collections::HashMap<String, String>,
    long_running_spawner: Arc<dyn crate::services::remote::LongRunningSpawner>,
) -> Result<Self, String> {
    // Bounded queue: senders use `try_send`, so a full queue surfaces as an error.
    let (command_tx, command_rx) = mpsc::channel(100);
    let async_tx = async_bridge.sender();
    let language = scope.label().to_string();
    let program = command.to_string();
    let program_args = args.to_vec();
    let state = Arc::new(Mutex::new(LspClientState::Starting));
    let log_path = crate::services::log_dirs::lsp_log_path(&language);

    let _ = async_tx.send(AsyncMessage::LspStatusUpdate {
        language: language.clone(),
        server_name: server_name.clone(),
        status: LspServerStatus::Starting,
        message: None,
    });

    let document_versions: Arc<std::sync::Mutex<HashMap<PathBuf, i64>>> =
        Arc::new(std::sync::Mutex::new(HashMap::new()));
    let task_versions = Arc::clone(&document_versions);
    let task_state = Arc::clone(&state);

    runtime.spawn(async move {
        let spawned = LspTask::spawn(
            &program,
            &program_args,
            &env,
            language.clone(),
            server_name.clone(),
            async_tx.clone(),
            &process_limits,
            log_path.clone(),
            language_id_overrides,
            task_versions,
            long_running_spawner,
        )
        .await;

        let e = match spawned {
            Ok(task) => {
                // Normal path: serve commands until the task exits.
                task.run(command_rx).await;
                return;
            }
            Err(e) => e,
        };

        tracing::error!("Failed to spawn LSP task: {}", e);
        // Leave a stub in the log file so the log path attached to the
        // error message below points at something readable.
        let stub = format!(
            "[fresh] LSP server '{}' for {} failed to spawn:\n {}\n\nConfigured command: {} {}\n",
            server_name,
            language,
            e,
            program,
            program_args.join(" "),
        );
        if let Err(write_err) = std::fs::write(&log_path, stub.as_bytes()) {
            tracing::warn!(
                "Failed to write LSP failure-stub log for {}: {}",
                language,
                write_err,
            );
        }
        if let Ok(mut s) = task_state.lock() {
            let _ = s.transition_to(LspClientState::Error);
        }
        let _ = async_tx.send(AsyncMessage::LspStatusUpdate {
            language: language.clone(),
            server_name: server_name.clone(),
            status: LspServerStatus::Error,
            message: None,
        });
        let _ = async_tx.send(AsyncMessage::LspError {
            language,
            error: e,
            stderr_log_path: Some(log_path),
        });
    });

    let id = NEXT_HANDLE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    Ok(Self {
        id,
        scope,
        command_tx,
        state,
        runtime: runtime.clone(),
        document_versions,
    })
}
/// Returns the identifier assigned to this handle when it was created.
pub fn id(&self) -> u64 {
self.id
}
/// Returns the language scope this handle serves.
pub fn scope(&self) -> &crate::services::lsp::manager::LanguageScope {
&self.scope
}
/// Returns the version currently recorded for `path`, if any.
///
/// Yields `None` both when no version is recorded for the path and when the
/// version map's mutex is poisoned.
pub fn document_version(&self, path: &std::path::Path) -> Option<i64> {
    let versions = self.document_versions.lock().ok()?;
    versions.get(path).copied()
}
pub fn initialize(
&self,
root_uri: Option<Uri>,
initialization_options: Option<Value>,
) -> Result<(), String> {
{
let mut state = self.state.lock().unwrap();
if !state.can_initialize() {
return Err(format!(
"Cannot initialize: client is in state {:?}",
*state
));
}
state.transition_to(LspClientState::Initializing)?;
}
let state = self.state.clone();
let (tx, rx) = oneshot::channel();
self.command_tx
.try_send(LspCommand::Initialize {
root_uri,
initialization_options,
response: tx,
})
.map_err(|_| "Failed to send initialize command".to_string())?;
let runtime = self.runtime.clone();
runtime.spawn(async move {
match tokio::time::timeout(std::time::Duration::from_secs(60), rx).await {
Ok(Ok(Ok(_))) => {
if let Ok(mut s) = state.lock() {
let _ = s.transition_to(LspClientState::Running);
}
tracing::info!("LSP initialization completed successfully");
}
Ok(Ok(Err(e))) => {
tracing::error!("LSP initialization failed: {}", e);
if let Ok(mut s) = state.lock() {
let _ = s.transition_to(LspClientState::Error);
}
}
Ok(Err(_)) => {
tracing::error!("LSP initialization response channel closed");
if let Ok(mut s) = state.lock() {
let _ = s.transition_to(LspClientState::Error);
}
}
Err(_) => {
tracing::error!("LSP initialization timed out after 60 seconds");
if let Ok(mut s) = state.lock() {
let _ = s.transition_to(LspClientState::Error);
}
}
}
});
Ok(())
}
/// Reports whether the current state answers `true` to `can_send_requests()`.
///
/// # Panics
/// Panics if the state mutex is poisoned.
pub fn is_initialized(&self) -> bool {
    let current = self.state.lock().unwrap();
    current.can_send_requests()
}
/// Returns a copy of the current client state.
///
/// # Panics
/// Panics if the state mutex is poisoned.
pub fn state(&self) -> LspClientState {
    let current = self.state.lock().unwrap();
    *current
}
/// Enqueues an `LspCommand::DidOpen` for `uri` if this handle's scope accepts
/// `language_id`.
///
/// # Errors
/// * The scope rejects `language_id`: a warning is logged and a
///   "Language mismatch" error is returned without enqueuing anything.
/// * `try_send` on the command channel fails (the underlying error is dropped).
pub fn did_open(&self, uri: Uri, text: String, language_id: String) -> Result<(), String> {
    if self.scope.accepts(&language_id) {
        let command = LspCommand::DidOpen {
            uri,
            text,
            language_id,
        };
        return self
            .command_tx
            .try_send(command)
            .map_err(|_| String::from("Failed to send did_open command"));
    }

    tracing::warn!(
        "did_open: document language '{}' not accepted by LSP handle (serves {:?}) for {}",
        language_id,
        self.scope,
        uri.as_str()
    );
    Err(format!(
        "Language mismatch: document is '{}' but LSP serves {:?}",
        language_id, self.scope
    ))
}
/// Enqueues an `LspCommand::DidChange` carrying `content_changes` for `uri`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn did_change(
    &self,
    uri: Uri,
    content_changes: Vec<TextDocumentContentChangeEvent>,
) -> Result<(), String> {
    let command = LspCommand::DidChange {
        uri,
        content_changes,
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send did_change command"))
}
/// Enqueues an `LspCommand::DidClose` for `uri`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn did_close(&self, uri: Uri) -> Result<(), String> {
    let command = LspCommand::DidClose { uri };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send did_close command"))
}
/// Enqueues an `LspCommand::DidSave` for `uri`, forwarding the optional `text`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn did_save(&self, uri: Uri, text: Option<String>) -> Result<(), String> {
    let command = LspCommand::DidSave { uri, text };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send did_save command"))
}
/// Enqueues an `LspCommand::DidChangeWorkspaceFolders` that adds exactly one
/// folder (`uri`, `name`) and removes none.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn add_workspace_folder(&self, uri: lsp_types::Uri, name: String) -> Result<(), String> {
    let folder = lsp_types::WorkspaceFolder { uri, name };
    let command = LspCommand::DidChangeWorkspaceFolders {
        added: vec![folder],
        removed: vec![],
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send workspace folder change"))
}
/// Enqueues an `LspCommand::Completion` for the position (`line`, `character`)
/// in `uri`, tagged with the caller-supplied `request_id`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn completion(
    &self,
    request_id: u64,
    uri: Uri,
    line: u32,
    character: u32,
) -> Result<(), String> {
    let command = LspCommand::Completion {
        request_id,
        uri,
        line,
        character,
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send completion command"))
}
/// Enqueues an `LspCommand::GotoDefinition` for the position (`line`,
/// `character`) in `uri`, tagged with the caller-supplied `request_id`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn goto_definition(
    &self,
    request_id: u64,
    uri: Uri,
    line: u32,
    character: u32,
) -> Result<(), String> {
    let command = LspCommand::GotoDefinition {
        request_id,
        uri,
        line,
        character,
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send goto_definition command"))
}
/// Enqueues an `LspCommand::Rename` for the position (`line`, `character`) in
/// `uri`, carrying `new_name` and the caller-supplied `request_id`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn rename(
    &self,
    request_id: u64,
    uri: Uri,
    line: u32,
    character: u32,
    new_name: String,
) -> Result<(), String> {
    let command = LspCommand::Rename {
        request_id,
        uri,
        line,
        character,
        new_name,
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send rename command"))
}
/// Enqueues an `LspCommand::Hover` for the position (`line`, `character`) in
/// `uri`, tagged with the caller-supplied `request_id`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn hover(
    &self,
    request_id: u64,
    uri: Uri,
    line: u32,
    character: u32,
) -> Result<(), String> {
    let command = LspCommand::Hover {
        request_id,
        uri,
        line,
        character,
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send hover command"))
}
/// Enqueues an `LspCommand::References` for the position (`line`, `character`)
/// in `uri`, tagged with the caller-supplied `request_id`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn references(
    &self,
    request_id: u64,
    uri: Uri,
    line: u32,
    character: u32,
) -> Result<(), String> {
    let command = LspCommand::References {
        request_id,
        uri,
        line,
        character,
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send references command"))
}
/// Enqueues an `LspCommand::SignatureHelp` for the position (`line`,
/// `character`) in `uri`, tagged with the caller-supplied `request_id`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn signature_help(
    &self,
    request_id: u64,
    uri: Uri,
    line: u32,
    character: u32,
) -> Result<(), String> {
    let command = LspCommand::SignatureHelp {
        request_id,
        uri,
        line,
        character,
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send signature_help command"))
}
/// Enqueues an `LspCommand::CodeActions` for the range
/// (`start_line`, `start_char`)..(`end_line`, `end_char`) in `uri`, forwarding
/// `diagnostics` and the caller-supplied `request_id`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
#[allow(clippy::too_many_arguments)]
pub fn code_actions(
    &self,
    request_id: u64,
    uri: Uri,
    start_line: u32,
    start_char: u32,
    end_line: u32,
    end_char: u32,
    diagnostics: Vec<lsp_types::Diagnostic>,
) -> Result<(), String> {
    let command = LspCommand::CodeActions {
        request_id,
        uri,
        start_line,
        start_char,
        end_line,
        end_char,
        diagnostics,
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send code_actions command"))
}
/// Enqueues an `LspCommand::ExecuteCommand` with the given `command` name and
/// optional `arguments`. No request id is attached to this command.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn execute_command(
    &self,
    command: String,
    arguments: Option<Vec<Value>>,
) -> Result<(), String> {
    let queued = LspCommand::ExecuteCommand { command, arguments };
    self.command_tx
        .try_send(queued)
        .map_err(|_| String::from("Failed to send execute_command command"))
}
/// Enqueues an `LspCommand::CodeActionResolve` for `action`, tagged with the
/// caller-supplied `request_id`. The action is boxed before being queued.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn code_action_resolve(
    &self,
    request_id: u64,
    action: lsp_types::CodeAction,
) -> Result<(), String> {
    let boxed_action = Box::new(action);
    let command = LspCommand::CodeActionResolve {
        request_id,
        action: boxed_action,
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send code_action_resolve command"))
}
/// Enqueues an `LspCommand::CompletionResolve` for `item`, tagged with the
/// caller-supplied `request_id`. The item is boxed before being queued.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn completion_resolve(
    &self,
    request_id: u64,
    item: lsp_types::CompletionItem,
) -> Result<(), String> {
    let boxed_item = Box::new(item);
    let command = LspCommand::CompletionResolve {
        request_id,
        item: boxed_item,
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send completion_resolve command"))
}
/// Enqueues an `LspCommand::DocumentFormatting` for `uri` with the given
/// `tab_size` and `insert_spaces` settings and the caller-supplied `request_id`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn document_formatting(
    &self,
    request_id: u64,
    uri: Uri,
    tab_size: u32,
    insert_spaces: bool,
) -> Result<(), String> {
    let command = LspCommand::DocumentFormatting {
        request_id,
        uri,
        tab_size,
        insert_spaces,
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send document_formatting command"))
}
/// Enqueues an `LspCommand::DocumentRangeFormatting` for the range
/// (`start_line`, `start_char`)..(`end_line`, `end_char`) in `uri`, with the
/// given `tab_size` and `insert_spaces` settings and the caller-supplied
/// `request_id`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
#[allow(clippy::too_many_arguments)]
pub fn document_range_formatting(
    &self,
    request_id: u64,
    uri: Uri,
    start_line: u32,
    start_char: u32,
    end_line: u32,
    end_char: u32,
    tab_size: u32,
    insert_spaces: bool,
) -> Result<(), String> {
    let command = LspCommand::DocumentRangeFormatting {
        request_id,
        uri,
        start_line,
        start_char,
        end_line,
        end_char,
        tab_size,
        insert_spaces,
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send document_range_formatting command"))
}
/// Enqueues an `LspCommand::PrepareRename` for the position (`line`,
/// `character`) in `uri`, tagged with the caller-supplied `request_id`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn prepare_rename(
    &self,
    request_id: u64,
    uri: Uri,
    line: u32,
    character: u32,
) -> Result<(), String> {
    let command = LspCommand::PrepareRename {
        request_id,
        uri,
        line,
        character,
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send prepare_rename command"))
}
/// Enqueues an `LspCommand::DocumentDiagnostic` for `uri`, forwarding the
/// optional `previous_result_id` and the caller-supplied `request_id`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn document_diagnostic(
    &self,
    request_id: u64,
    uri: Uri,
    previous_result_id: Option<String>,
) -> Result<(), String> {
    let command = LspCommand::DocumentDiagnostic {
        request_id,
        uri,
        previous_result_id,
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send document_diagnostic command"))
}
/// Enqueues an `LspCommand::InlayHints` for the range
/// (`start_line`, `start_char`)..(`end_line`, `end_char`) in `uri`, tagged with
/// the caller-supplied `request_id`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn inlay_hints(
    &self,
    request_id: u64,
    uri: Uri,
    start_line: u32,
    start_char: u32,
    end_line: u32,
    end_char: u32,
) -> Result<(), String> {
    let command = LspCommand::InlayHints {
        request_id,
        uri,
        start_line,
        start_char,
        end_line,
        end_char,
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send inlay_hints command"))
}
/// Enqueues an `LspCommand::FoldingRange` for `uri`, tagged with the
/// caller-supplied `request_id`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn folding_ranges(&self, request_id: u64, uri: Uri) -> Result<(), String> {
    let command = LspCommand::FoldingRange { request_id, uri };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send folding_range command"))
}
/// Enqueues an `LspCommand::SemanticTokensFull` for `uri`, tagged with the
/// caller-supplied `request_id`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn semantic_tokens_full(&self, request_id: u64, uri: Uri) -> Result<(), String> {
    let command = LspCommand::SemanticTokensFull { request_id, uri };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send semantic_tokens command"))
}
/// Enqueues an `LspCommand::SemanticTokensFullDelta` for `uri`, forwarding
/// `previous_result_id` and the caller-supplied `request_id`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn semantic_tokens_full_delta(
    &self,
    request_id: u64,
    uri: Uri,
    previous_result_id: String,
) -> Result<(), String> {
    let command = LspCommand::SemanticTokensFullDelta {
        request_id,
        uri,
        previous_result_id,
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send semantic_tokens delta command"))
}
/// Enqueues an `LspCommand::SemanticTokensRange` for `range` within `uri`,
/// tagged with the caller-supplied `request_id`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn semantic_tokens_range(
    &self,
    request_id: u64,
    uri: Uri,
    range: lsp_types::Range,
) -> Result<(), String> {
    let command = LspCommand::SemanticTokensRange {
        request_id,
        uri,
        range,
    };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send semantic_tokens_range command"))
}
/// Enqueues an `LspCommand::CancelRequest` naming `request_id`.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn cancel_request(&self, request_id: u64) -> Result<(), String> {
    let command = LspCommand::CancelRequest { request_id };
    self.command_tx
        .try_send(command)
        .map_err(|_| String::from("Failed to send cancel_request command"))
}
/// Enqueues an `LspCommand::PluginRequest` carrying an arbitrary `method` and
/// optional `params`, tagged with the caller-supplied `request_id`.
///
/// Trace events are emitted before the attempt and after a successful
/// enqueue; a failed enqueue is logged at error level together with the
/// send error.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails.
pub fn send_plugin_request(
    &self,
    request_id: u64,
    method: String,
    params: Option<Value>,
) -> Result<(), String> {
    tracing::trace!(
        "LspHandle sending plugin request {}: method={}",
        request_id,
        method
    );

    let command = LspCommand::PluginRequest {
        request_id,
        method,
        params,
    };
    if let Err(e) = self.command_tx.try_send(command) {
        tracing::error!("Failed to enqueue plugin request {}: {}", request_id, e);
        return Err("Failed to send plugin LSP request".to_string());
    }

    tracing::trace!(
        "LspHandle enqueued plugin request {} successfully",
        request_id
    );
    Ok(())
}
/// Requests shutdown and advances the state machine.
///
/// The state is first moved towards `Stopping` (a rejected transition is only
/// logged as a warning), then `LspCommand::Shutdown` is enqueued, and finally
/// the state is moved to `Stopped` with any transition error ignored. The
/// `Stopped` transition happens as soon as the command is queued, not when
/// the command has been processed.
///
/// # Errors
/// Returns a fixed message when `try_send` on the command channel fails; in
/// that case the `Stopped` transition is not attempted.
///
/// # Panics
/// Panics if the state mutex is poisoned.
pub fn shutdown(&self) -> Result<(), String> {
    {
        let mut current = self.state.lock().unwrap();
        let stopping = current.transition_to(LspClientState::Stopping);
        if let Err(e) = stopping {
            tracing::warn!("State transition warning during shutdown: {}", e);
        }
    }

    if self.command_tx.try_send(LspCommand::Shutdown).is_err() {
        return Err("Failed to send shutdown command".to_string());
    }

    let mut current = self.state.lock().unwrap();
    let _ = current.transition_to(LspClientState::Stopped);
    drop(current);
    Ok(())
}
}
/// Best-effort cleanup when a handle goes out of scope: try to enqueue a
/// `Shutdown` command and try to mark the state `Stopped`. Every failure
/// (send error, poisoned lock, rejected transition) is deliberately ignored,
/// which is why the `let_underscore_must_use` lint is allowed here.
#[allow(clippy::let_underscore_must_use)]
impl Drop for LspHandle {
    fn drop(&mut self) {
        let _ = self.command_tx.try_send(LspCommand::Shutdown);
        if let Ok(mut current) = self.state.lock() {
            let _ = current.transition_to(LspClientState::Stopped);
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::services::lsp::manager::LanguageScope;
use crate::services::remote::LocalLongRunningSpawner;
fn config_item(section: &str) -> Value {
serde_json::json!({ "section": section })
}
/// A requested section that exists as a top-level key of the init options
/// resolves to that key's value.
#[test]
fn workspace_configuration_resolves_section_from_init_options() {
    let opts = serde_json::json!({
        "harper-ls": { "linters": { "SpellCheck": false } }
    });
    let resolved =
        resolve_workspace_configuration(&[config_item("harper-ls")], Some(&opts), "harper-ls");
    let expected = vec![serde_json::json!({ "linters": { "SpellCheck": false } })];
    assert_eq!(resolved, expected);
}
/// A dotted section name (`a.b`) resolves to the nested value under `a` then `b`.
#[test]
fn workspace_configuration_resolves_dotted_section() {
    let opts = serde_json::json!({ "a": { "b": { "c": 1 } } });
    let resolved = resolve_workspace_configuration(&[config_item("a.b")], Some(&opts), "some-ls");
    let expected = vec![serde_json::json!({ "c": 1 })];
    assert_eq!(resolved, expected);
}
/// A section missing from the init options yields `null` when the command is
/// `marksman`.
#[test]
fn workspace_configuration_unknown_section_is_null_for_non_rust() {
    let opts = serde_json::json!({ "harper-ls": { "linters": {} } });
    let resolved =
        resolve_workspace_configuration(&[config_item("marksman")], Some(&opts), "marksman");
    let expected = vec![Value::Null];
    assert_eq!(resolved, expected);
}
/// With no init options, each rust-analyzer-like command name must get a
/// single config whose `inlayHints.typeHints.enable` is `true`.
#[test]
fn workspace_configuration_rust_analyzer_default_enables_inlay_hints() {
    let commands = [
        "rust-analyzer",
        "/usr/local/bin/rust-analyzer",
        "custom-rust-analyzer",
    ];
    for command in commands {
        let resolved =
            resolve_workspace_configuration(&[config_item("rust-analyzer")], None, command);
        assert_eq!(resolved.len(), 1);
        let type_hints_enabled = &resolved[0]["inlayHints"]["typeHints"]["enable"];
        assert_eq!(*type_hints_enabled, true, "{command}");
    }
}
/// With no init options and the `harper-ls` command, the requested section
/// resolves to `null`.
#[test]
fn workspace_configuration_non_rust_without_options_is_null() {
    let resolved = resolve_workspace_configuration(&[config_item("harper-ls")], None, "harper-ls");
    let expected = vec![Value::Null];
    assert_eq!(resolved, expected);
}
/// Each requested item gets exactly one entry in the result, in request order;
/// a missing section contributes `null`.
#[test]
fn workspace_configuration_one_response_per_item() {
    let opts = serde_json::json!({ "a": 1, "b": 2 });
    let requested = [config_item("a"), config_item("b"), config_item("missing")];
    let resolved = resolve_workspace_configuration(&requested, Some(&opts), "some-ls");
    let expected = vec![serde_json::json!(1), serde_json::json!(2), Value::Null];
    assert_eq!(resolved, expected);
}
/// An empty item list yields a single entry equal to the whole options object.
#[test]
fn workspace_configuration_no_items_returns_whole_object() {
    let opts = serde_json::json!({ "linters": { "SpellCheck": false } });
    let expected = vec![opts.clone()];
    let resolved = resolve_workspace_configuration(&[], Some(&opts), "harper-ls");
    assert_eq!(resolved, expected);
}
/// Builds the spawner passed to `LspHandle::spawn` in the tests below: a
/// `LocalLongRunningSpawner` constructed from `EnvProvider::inactive()` and
/// `WorkspaceTrust::permissive()`.
fn local_spawner() -> Arc<dyn crate::services::remote::LongRunningSpawner> {
    let env = Arc::new(crate::services::env_provider::EnvProvider::inactive());
    let trust = Arc::new(crate::services::workspace_trust::WorkspaceTrust::permissive());
    Arc::new(LocalLongRunningSpawner::new(env, trust))
}
/// A serialized `JsonRpcRequest` contains the version, id, method and params
/// fragments.
#[test]
fn test_json_rpc_request_serialization() {
    let request = JsonRpcRequest {
        jsonrpc: String::from("2.0"),
        id: 1,
        method: String::from("initialize"),
        params: Some(serde_json::json!({"rootUri": "file:///test"})),
    };
    let encoded = serde_json::to_string(&request).unwrap();
    for fragment in [
        "\"jsonrpc\":\"2.0\"",
        "\"id\":1",
        "\"method\":\"initialize\"",
        "\"rootUri\":\"file:///test\"",
    ] {
        assert!(encoded.contains(fragment));
    }
}
/// A serialized successful `JsonRpcResponse` contains the version, id and
/// result fragments and no `"error"` key.
#[test]
fn test_json_rpc_response_serialization() {
    let response = JsonRpcResponse {
        jsonrpc: String::from("2.0"),
        id: 1,
        result: Some(serde_json::json!({"success": true})),
        error: None,
    };
    let encoded = serde_json::to_string(&response).unwrap();
    for fragment in ["\"jsonrpc\":\"2.0\"", "\"id\":1", "\"success\":true"] {
        assert!(encoded.contains(fragment));
    }
    assert!(!encoded.contains("\"error\""));
}
/// The client capabilities must carry `codeActionLiteralSupport` whose kind
/// value set includes every kind listed below.
#[test]
fn code_action_capability_advertises_literal_support() {
    let caps = create_client_capabilities();
    let text_document = caps.text_document.as_ref();
    let code_action = text_document
        .and_then(|td| td.code_action.as_ref())
        .expect("code_action capability must be set");
    let literal = code_action
        .code_action_literal_support
        .as_ref()
        .expect("codeActionLiteralSupport must be advertised");
    let kinds = &literal.code_action_kind.value_set;

    let required_kinds = [
        "",
        "quickfix",
        "refactor",
        "refactor.extract",
        "refactor.inline",
        "refactor.rewrite",
        "source",
        "source.organizeImports",
    ];
    for required in required_kinds {
        let advertised = kinds.iter().any(|k| k == required);
        assert!(
            advertised,
            "expected codeActionKind value_set to include {required:?}, got {kinds:?}",
        );
    }
}
/// `dynamicRegistration` must be `Some(true)` for inlay hints, completion,
/// formatting, document symbols and workspace symbols.
#[test]
fn advertises_dynamic_registration_on_honored_capabilities() {
    let caps = create_client_capabilities();
    let td = caps
        .text_document
        .as_ref()
        .expect("text_document capabilities must be set");

    let inlay_hint = td.inlay_hint.as_ref().and_then(|c| c.dynamic_registration);
    assert_eq!(
        inlay_hint,
        Some(true),
        "inlay_hint must advertise dynamicRegistration"
    );

    let completion = td.completion.as_ref().and_then(|c| c.dynamic_registration);
    assert_eq!(
        completion,
        Some(true),
        "completion must advertise dynamicRegistration"
    );

    let formatting = td.formatting.as_ref().and_then(|c| c.dynamic_registration);
    assert_eq!(
        formatting,
        Some(true),
        "formatting must advertise dynamicRegistration"
    );

    let document_symbol = td
        .document_symbol
        .as_ref()
        .and_then(|c| c.dynamic_registration);
    assert_eq!(
        document_symbol,
        Some(true),
        "document_symbol must advertise dynamicRegistration"
    );

    let workspace_symbol = caps
        .workspace
        .as_ref()
        .and_then(|w| w.symbol.as_ref())
        .and_then(|s| s.dynamic_registration);
    assert_eq!(
        workspace_symbol,
        Some(true),
        "workspace.symbol must advertise dynamicRegistration"
    );
}
/// `refreshSupport` must be `Some(true)` for both the workspace inlay-hint and
/// the workspace semantic-tokens capabilities.
#[test]
fn advertises_inlay_hint_and_semantic_tokens_refresh_support() {
    let caps = create_client_capabilities();
    let workspace = caps.workspace.as_ref().expect("workspace caps must be set");

    let inlay_hint_refresh = workspace
        .inlay_hint
        .as_ref()
        .and_then(|c| c.refresh_support);
    assert_eq!(
        inlay_hint_refresh,
        Some(true),
        "workspace.inlayHint.refreshSupport must be advertised"
    );

    let semantic_tokens_refresh = workspace
        .semantic_tokens
        .as_ref()
        .and_then(|c| c.refresh_support);
    assert_eq!(
        semantic_tokens_refresh,
        Some(true),
        "workspace.semanticTokens.refreshSupport must be advertised"
    );
}
/// Calling `sync_raw_capabilities` for `textDocument/diagnostic` with `true`
/// must populate `diagnostic_provider` on the stored capabilities; calling it
/// with `false` must clear it again.
#[test]
fn sync_raw_capabilities_mirrors_dynamic_diagnostic_provider() {
    let caps: Arc<std::sync::Mutex<Option<ServerCapabilities>>> =
        Arc::new(std::sync::Mutex::new(Some(ServerCapabilities::default())));
    // Reads the current snapshot; the lock is released when the closure returns.
    let has_provider = || {
        caps.lock()
            .unwrap()
            .as_ref()
            .unwrap()
            .diagnostic_provider
            .is_some()
    };

    assert!(!has_provider());

    sync_raw_capabilities(
        &caps,
        &[("textDocument/diagnostic".to_string(), None)],
        true,
    );
    assert!(
        has_provider(),
        "dynamic diagnostic registration must set diagnostic_provider so pulls aren't skipped"
    );

    sync_raw_capabilities(
        &caps,
        &[("textDocument/diagnostic".to_string(), None)],
        false,
    );
    assert!(
        !has_provider(),
        "unregister must clear diagnostic_provider"
    );
}
/// Calling `sync_raw_capabilities` for `textDocument/hover` must leave an empty
/// (`None`) capability snapshot untouched.
#[test]
fn sync_raw_capabilities_ignores_non_diagnostic_methods() {
    let caps: Arc<std::sync::Mutex<Option<ServerCapabilities>>> =
        Arc::new(std::sync::Mutex::new(None));
    sync_raw_capabilities(&caps, &[("textDocument/hover".to_string(), None)], true);
    let still_empty = caps.lock().unwrap().is_none();
    assert!(
        still_empty,
        "a non-diagnostic registration must not materialize the raw snapshot"
    );
}
/// `registrations_from_params` must return one `(method, options)` pair per
/// registration, and `unregistrations_from_params` must return the method
/// names; malformed or missing params yield empty results.
#[test]
fn parses_register_and_unregister_capability_params() {
    let register = serde_json::json!({
        "registrations": [
            { "id": "1", "method": "textDocument/inlayHint" },
            {
                "id": "2",
                "method": "textDocument/completion",
                "registerOptions": { "triggerCharacters": ["."] }
            }
        ]
    });
    let parsed = registrations_from_params(Some(&register));
    assert_eq!(parsed.len(), 2);
    assert_eq!(parsed[0].0, "textDocument/inlayHint");
    assert!(parsed[0].1.is_none());
    assert_eq!(parsed[1].0, "textDocument/completion");
    assert!(parsed[1].1.is_some());

    // NOTE: "unregisterations" is the field name the LSP specification itself
    // uses for UnregistrationParams; it is not a typo to be corrected here.
    let unregister = serde_json::json!({
        "unregisterations": [
            { "id": "1", "method": "textDocument/inlayHint" }
        ]
    });
    let methods = unregistrations_from_params(Some(&unregister));
    assert_eq!(methods, vec!["textDocument/inlayHint".to_string()]);

    // Unknown shapes and absent params must not produce entries.
    assert!(registrations_from_params(Some(&serde_json::json!({ "bogus": 1 }))).is_empty());
    assert!(unregistrations_from_params(None).is_empty());
}
/// A serialized `JsonRpcResponse` carrying an error contains the `error` key
/// plus the error code and message.
#[test]
fn test_json_rpc_error_response() {
    let failure = JsonRpcError {
        code: -32600,
        message: String::from("Invalid request"),
        data: None,
    };
    let response = JsonRpcResponse {
        jsonrpc: String::from("2.0"),
        id: 1,
        result: None,
        error: Some(failure),
    };
    let encoded = serde_json::to_string(&response).unwrap();
    for fragment in [
        "\"error\"",
        "\"code\":-32600",
        "\"message\":\"Invalid request\"",
    ] {
        assert!(encoded.contains(fragment));
    }
}
/// Only the content-modified and server-cancelled codes are suppressed; the
/// other listed codes and `0` are not.
#[test]
fn test_suppressed_error_codes() {
    for code in [LSP_ERROR_CONTENT_MODIFIED, LSP_ERROR_SERVER_CANCELLED] {
        assert!(is_suppressed_error_code(code));
    }
    for code in [-32600, -32601, -32602, -32603, -32700, 0] {
        assert!(!is_suppressed_error_code(code));
    }
}
/// Runs `body` under a tracing subscriber built from the project's
/// warning-log layer writing to a temporary file.
///
/// Returns `(emitted, contents)`: `emitted` is whether the layer's receiver
/// yielded a message within 100 ms after `body` ran, and `contents` is the
/// text of the log file (empty if it could not be read).
fn capture_warn_logs(body: impl FnOnce()) -> (bool, String) {
    use std::time::Duration;
    use tempfile::NamedTempFile;
    use tracing_subscriber::prelude::*;

    let log_path = NamedTempFile::new().unwrap().into_temp_path();
    let (layer, handle) =
        crate::services::warning_log::create_with_path(log_path.to_path_buf()).unwrap();
    tracing::subscriber::with_default(tracing_subscriber::registry().with(layer), body);

    let wait = Duration::from_millis(100);
    let emitted = handle.receiver.recv_timeout(wait).is_ok();
    let contents = std::fs::read_to_string(&log_path).unwrap_or_default();
    (emitted, contents)
}
/// Logging a response error with either suppressed code must not produce a
/// notification on the warning channel.
#[test]
fn test_content_modified_and_server_cancelled_are_not_logged_as_warn() {
    let suppressed = [LSP_ERROR_CONTENT_MODIFIED, LSP_ERROR_SERVER_CANCELLED];
    for code in suppressed {
        let log_it = || log_response_error(code, "expected during editing", "rust-analyzer", "rust");
        let (emitted, contents) = capture_warn_logs(log_it);
        assert!(
            !emitted,
            "code {} must not notify the WARN channel; got log:\n{}",
            code, contents
        );
    }
}
/// Code -32601 must notify the warning channel and the log file must mention
/// the code.
#[test]
fn test_method_not_found_still_surfaces_as_warn() {
    let log_it = || {
        log_response_error(
            -32601,
            "Unhandled method textDocument/inlayHint",
            "vscode-json-language-server",
            "json",
        )
    };
    let (emitted, contents) = capture_warn_logs(log_it);
    assert!(
        emitted,
        "MethodNotFound should notify the WARN channel so the mismatch is visible"
    );
    let mentions_code = contents.contains("code -32601");
    assert!(
        mentions_code,
        "WARN log should record the error code; got:\n{}",
        contents
    );
}
/// Code -32603 must notify the warning channel, and the log file must mention
/// both the code and the server name.
#[test]
fn test_non_suppressed_errors_still_warn() {
    let log_it = || log_response_error(-32603, "internal error", "rust-analyzer", "rust");
    let (emitted, contents) = capture_warn_logs(log_it);
    assert!(
        emitted,
        "non-suppressed error codes should notify the WARN channel"
    );
    let mentions_code = contents.contains("code -32603");
    assert!(
        mentions_code,
        "WARN log should record the error code; got:\n{}",
        contents
    );
    let mentions_server = contents.contains("rust-analyzer");
    assert!(
        mentions_server,
        "WARN log should record the server name; got:\n{}",
        contents
    );
}
/// A serialized `JsonRpcNotification` contains the version, method and params
/// fragments and, being a notification, no `"id"` key.
#[test]
fn test_json_rpc_notification_serialization() {
    let notification = JsonRpcNotification {
        jsonrpc: "2.0".to_string(),
        method: "textDocument/didOpen".to_string(),
        params: Some(serde_json::json!({"uri": "file:///test.rs"})),
    };
    let json = serde_json::to_string(&notification).unwrap();
    assert!(json.contains("\"jsonrpc\":\"2.0\""));
    assert!(json.contains("\"method\":\"textDocument/didOpen\""));
    assert!(json.contains("\"uri\":\"file:///test.rs\""));
    // Notifications carry no request id.
    assert!(!json.contains("\"id\""));
}
/// JSON with both `id` and `method` deserializes into
/// `JsonRpcMessage::Request` with the expected fields.
#[test]
fn test_json_rpc_message_deserialization_request() {
    let json =
        r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"rootUri":"file:///test"}}"#;
    let message: JsonRpcMessage = serde_json::from_str(json).unwrap();
    let request = match message {
        JsonRpcMessage::Request(request) => request,
        _ => panic!("Expected Request"),
    };
    assert_eq!(request.jsonrpc, "2.0");
    assert_eq!(request.id, 1);
    assert_eq!(request.method, "initialize");
    assert!(request.params.is_some());
}
/// JSON with `id` and `result` deserializes into `JsonRpcMessage::Response`
/// with a result and no error.
#[test]
fn test_json_rpc_message_deserialization_response() {
    let json = r#"{"jsonrpc":"2.0","id":1,"result":{"success":true}}"#;
    let message: JsonRpcMessage = serde_json::from_str(json).unwrap();
    let response = match message {
        JsonRpcMessage::Response(response) => response,
        _ => panic!("Expected Response"),
    };
    assert_eq!(response.jsonrpc, "2.0");
    assert_eq!(response.id, 1);
    assert!(response.result.is_some());
    assert!(response.error.is_none());
}
/// JSON with `method` but no `id` deserializes into
/// `JsonRpcMessage::Notification` with the expected fields.
#[test]
fn test_json_rpc_message_deserialization_notification() {
    let json = r#"{"jsonrpc":"2.0","method":"textDocument/didOpen","params":{"uri":"file:///test.rs"}}"#;
    let message: JsonRpcMessage = serde_json::from_str(json).unwrap();
    let notification = match message {
        JsonRpcMessage::Notification(notification) => notification,
        _ => panic!("Expected Notification"),
    };
    assert_eq!(notification.jsonrpc, "2.0");
    assert_eq!(notification.method, "textDocument/didOpen");
    assert!(notification.params.is_some());
}
/// JSON with `id` and `error` deserializes into `JsonRpcMessage::Response`
/// with no result and the expected error code and message.
#[test]
fn test_json_rpc_error_deserialization() {
    let json =
        r#"{"jsonrpc":"2.0","id":1,"error":{"code":-32600,"message":"Invalid request"}}"#;
    let message: JsonRpcMessage = serde_json::from_str(json).unwrap();
    let response = match message {
        JsonRpcMessage::Response(response) => response,
        _ => panic!("Expected Response with error"),
    };
    assert_eq!(response.jsonrpc, "2.0");
    assert_eq!(response.id, 1);
    assert!(response.result.is_none());
    assert!(response.error.is_some());
    let error = response.error.unwrap();
    assert_eq!(error.code, -32600);
    assert_eq!(error.message, "Invalid request");
}
/// Spawning a handle with `cat` as the command must succeed, and dropping the
/// handle afterwards must not panic.
#[tokio::test]
async fn test_lsp_handle_spawn_and_drop() {
let runtime = tokio::runtime::Handle::current();
let async_bridge = AsyncBridge::new();
let result = LspHandle::spawn(
&runtime,
"cat",
&[],
Default::default(),
LanguageScope::single("test"),
"test-server".to_string(),
&async_bridge,
ProcessLimits::unlimited(),
Default::default(),
local_spawner(),
);
assert!(result.is_ok());
let handle = result.unwrap();
// Dropping runs `Drop for LspHandle`, which tries to enqueue a Shutdown command.
drop(handle);
// Short pause after the drop before the test (and its runtime) ends.
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
/// `did_open` must return `Ok` on a freshly spawned handle on which
/// `initialize` has not been called.
#[tokio::test]
async fn test_lsp_handle_did_open_queues_before_initialization() {
let runtime = tokio::runtime::Handle::current();
let async_bridge = AsyncBridge::new();
let handle = LspHandle::spawn(
&runtime,
"cat",
&[],
Default::default(),
LanguageScope::single("test"),
"test-server".to_string(),
&async_bridge,
ProcessLimits::unlimited(),
Default::default(),
local_spawner(),
)
.unwrap();
// The language id "test" matches the scope given to `spawn`, so the scope
// check in `did_open` passes and only the enqueue result is being tested.
let result = handle.did_open(
"file:///test.txt".parse().unwrap(),
"fn main() {}".to_string(),
"test".to_string(),
);
assert!(result.is_ok());
}
/// `did_change` must return `Ok` on a freshly spawned handle on which
/// `initialize` has not been called.
#[tokio::test]
async fn test_lsp_handle_did_change_queues_before_initialization() {
let runtime = tokio::runtime::Handle::current();
let async_bridge = AsyncBridge::new();
let handle = LspHandle::spawn(
&runtime,
"cat",
&[],
Default::default(),
LanguageScope::single("test"),
"test-server".to_string(),
&async_bridge,
ProcessLimits::unlimited(),
Default::default(),
local_spawner(),
)
.unwrap();
// A single change whose range is the empty span at (0, 0) with new text.
let result = handle.did_change(
"file:///test.rs".parse().unwrap(),
vec![TextDocumentContentChangeEvent {
range: Some(lsp_types::Range::new(
lsp_types::Position::new(0, 0),
lsp_types::Position::new(0, 0),
)),
range_length: None,
text: "fn main() {}".to_string(),
}],
);
assert!(result.is_ok());
}
/// `did_change` must accept a ranged change with empty replacement text
/// (columns 3..7 on line 0) and return `Ok`.
#[tokio::test]
async fn test_lsp_handle_incremental_change_with_range() {
let runtime = tokio::runtime::Handle::current();
let async_bridge = AsyncBridge::new();
let handle = LspHandle::spawn(
&runtime,
"cat",
&[],
Default::default(),
LanguageScope::single("test"),
"test-server".to_string(),
&async_bridge,
ProcessLimits::unlimited(),
Default::default(),
local_spawner(),
)
.unwrap();
let result = handle.did_change(
"file:///test.rs".parse().unwrap(),
vec![TextDocumentContentChangeEvent {
range: Some(lsp_types::Range::new(
lsp_types::Position::new(0, 3),
lsp_types::Position::new(0, 7),
)),
range_length: None,
// Empty text over a non-empty range.
text: String::new(), }],
);
assert!(result.is_ok());
}
/// Spawning with a non-existent command still returns `Ok` from `spawn`; the
/// failure must then show up as an `AsyncMessage::LspError` on the bridge.
#[tokio::test]
async fn test_lsp_handle_spawn_invalid_command() {
let runtime = tokio::runtime::Handle::current();
let async_bridge = AsyncBridge::new();
let result = LspHandle::spawn(
&runtime,
"this-command-does-not-exist-12345",
&[],
Default::default(),
LanguageScope::single("test"),
"test-server".to_string(),
&async_bridge,
ProcessLimits::unlimited(),
Default::default(),
local_spawner(),
);
assert!(result.is_ok());
// Wait for the failure to be reported on the bridge before draining it.
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let messages = async_bridge.try_recv_all();
assert!(!messages.is_empty());
let has_error = messages
.iter()
.any(|msg| matches!(msg, AsyncMessage::LspError { .. }));
assert!(has_error, "Expected LspError message");
}
/// `shutdown` must succeed when called from plain synchronous code, i.e.
/// outside of any `block_on` / async context.
#[test]
fn test_lsp_handle_shutdown_from_sync_context() {
// Run on a dedicated thread that owns its own runtime.
std::thread::spawn(|| {
let rt = tokio::runtime::Runtime::new().unwrap();
let async_bridge = AsyncBridge::new();
// Only the spawn itself happens inside the runtime.
let handle = rt.block_on(async {
let runtime = tokio::runtime::Handle::current();
LspHandle::spawn(
&runtime,
"cat",
&[],
Default::default(),
LanguageScope::single("test"),
"test-server".to_string(),
&async_bridge,
ProcessLimits::unlimited(),
Default::default(),
local_spawner(),
)
.unwrap()
});
// Called after `block_on` has returned, so this runs in sync context.
assert!(handle.shutdown().is_ok());
std::thread::sleep(std::time::Duration::from_millis(50));
})
.join()
.unwrap();
}
/// The `Debug` rendering of `LspCommand::Shutdown` mentions the variant name.
#[test]
fn test_lsp_command_debug_format() {
    let rendered = format!("{:?}", LspCommand::Shutdown);
    assert!(rendered.contains("Shutdown"));
}
/// `Starting` must allow initialization, and the chain
/// `Starting -> Initializing -> Running` must be accepted step by step.
#[test]
fn test_lsp_client_state_can_initialize_from_starting() {
    assert!(
        LspClientState::Starting.can_initialize(),
        "Starting state must allow initialization to avoid race condition"
    );

    let mut machine = LspClientState::Starting;
    for next in [LspClientState::Initializing, LspClientState::Running] {
        assert!(machine.can_transition_to(next));
        assert!(machine.transition_to(next).is_ok());
    }
}
/// Calling `initialize` immediately after `spawn` must return `Ok`.
#[tokio::test]
async fn test_lsp_handle_initialize_from_starting_state() {
let runtime = tokio::runtime::Handle::current();
let async_bridge = AsyncBridge::new();
let handle = LspHandle::spawn(
&runtime,
"cat", &[],
Default::default(),
LanguageScope::single("test"),
"test-server".to_string(),
&async_bridge,
ProcessLimits::unlimited(),
Default::default(),
local_spawner(),
)
.unwrap();
// No wait between spawn and initialize: the call is made right away.
let result = handle.initialize(None, None);
assert!(
result.is_ok(),
"initialize() must succeed from Starting state. Got error: {:?}",
result.err()
);
}
/// End-to-end check against a scripted fake server: `initialize` called right
/// after `spawn` must succeed, and at least one
/// `AsyncMessage::LspStatusUpdate` must reach the bridge within 200 ms.
#[tokio::test]
async fn test_lsp_state_machine_race_condition_fix() {
let runtime = tokio::runtime::Handle::current();
let async_bridge = AsyncBridge::new();
// Bash script acting as the server: it reads three lines from stdin, prints a
// canned initialize response (id 1, empty capabilities) behind a
// Content-Length header, then sleeps. The string must stay byte-exact.
let fake_lsp_script = r#"
read -r line # Read Content-Length header
read -r empty # Read empty line
read -r json # Read JSON body
# Send a valid initialize response
response='{"jsonrpc":"2.0","id":1,"result":{"capabilities":{}}}'
echo "Content-Length: ${#response}"
echo ""
echo -n "$response"
# Keep running to avoid EOF
sleep 10
"#;
let handle = LspHandle::spawn(
&runtime,
"bash",
&["-c".to_string(), fake_lsp_script.to_string()],
Default::default(),
LanguageScope::single("fake"),
"test-server".to_string(),
&async_bridge,
ProcessLimits::unlimited(),
Default::default(),
local_spawner(),
)
.unwrap();
let init_result = handle.initialize(None, None);
assert!(
init_result.is_ok(),
"initialize() failed from Starting state: {:?}",
init_result.err()
);
// Give the handshake time to complete before inspecting the bridge.
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
let messages = async_bridge.try_recv_all();
let has_status_update = messages
.iter()
.any(|msg| matches!(msg, AsyncMessage::LspStatusUpdate { .. }));
assert!(
has_status_update,
"Expected status update messages from LSP initialization"
);
// Best-effort cleanup; the shutdown result is irrelevant to this test.
#[allow(clippy::let_underscore_must_use)]
let _ = handle.shutdown();
}
/// From `Error`, the chain `Error -> Stopping -> Stopped` must be accepted.
#[test]
fn test_lsp_client_state_can_shutdown_from_error() {
    let mut machine = LspClientState::Error;
    let may_stop = machine.can_transition_to(LspClientState::Stopping);
    assert!(
        may_stop,
        "Error state must allow transition to Stopping for graceful shutdown"
    );
    for next in [LspClientState::Stopping, LspClientState::Stopped] {
        assert!(machine.transition_to(next).is_ok());
    }
}
/// After a failed spawn has put the handle into `Error`, calling `shutdown`
/// must still move the state to `Stopping` or `Stopped`.
#[tokio::test]
async fn test_lsp_handle_shutdown_after_spawn_failure_advances_state() {
let runtime = tokio::runtime::Handle::current();
let async_bridge = AsyncBridge::new();
let handle = LspHandle::spawn(
&runtime,
"fresh-nonexistent-lsp-binary-7c93af",
&[],
Default::default(),
LanguageScope::single("test"),
"test-server".to_string(),
&async_bridge,
ProcessLimits::unlimited(),
Default::default(),
local_spawner(),
)
.unwrap();
// Poll for up to 200 x 10 ms until the state becomes Error.
for _ in 0..200 {
if handle.state() == LspClientState::Error {
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
assert_eq!(
handle.state(),
LspClientState::Error,
"spawn task should have transitioned to Error after failed spawn"
);
// The return value is ignored on purpose; only the resulting state matters.
#[allow(clippy::let_underscore_must_use)]
let _ = handle.shutdown();
let final_state = handle.state();
assert!(
matches!(
final_state,
LspClientState::Stopping | LspClientState::Stopped
),
"shutdown from Error must advance state, got {:?}",
final_state
);
}
}