pub mod convert;
pub mod server_config;
mod tests;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use anyhow::{Context, Result};
use serde::Deserialize;
use serde_json::json;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use super::protocol::{
ClientCapabilities, Diagnostic, DocumentSymbol, DocumentSymbolParams, InitializeParams,
LspNotification, LspRequest, LspResponse, TextDocumentIdentifier,
};
pub use convert::{path_to_uri, to_repo_symbols, uri_to_path};
pub use server_config::{
LspServerConfig, extension_to_language_id, find_missing_server_for_language,
find_server_for_language, known_servers,
};
/// Shared, mutex-protected map from document URI to the diagnostics most
/// recently published for that URI.
///
/// The background reader task overwrites a URI's entry each time a
/// `textDocument/publishDiagnostics` notification is parsed successfully.
pub type DiagnosticsCache = Arc<std::sync::Mutex<HashMap<String, Vec<Diagnostic>>>>;
/// Payload of a `textDocument/publishDiagnostics` notification, as parsed by
/// the background reader from the notification's `params` value.
#[derive(Deserialize)]
struct PublishDiagnosticsParams {
/// URI of the document the diagnostics belong to; used as the cache key.
uri: String,
/// Diagnostics reported for `uri`; stored as the new cache entry for it.
diagnostics: Vec<Diagnostic>,
}
/// Shared table of in-flight requests: request id -> one-shot sender on which
/// the background reader delivers the matching response.
///
/// `send_request` inserts an entry before writing the request; the background
/// reader removes it when a response with the same id arrives.
type PendingRequests =
Arc<std::sync::Mutex<HashMap<u64, tokio::sync::oneshot::Sender<LspResponse>>>>;
/// Client for one language-server child process, talking LSP over the child's
/// stdin/stdout.
///
/// Outgoing messages are written to `stdin`; a spawned background task reads
/// the child's stdout, routes responses to waiting callers and stores
/// published diagnostics.
pub struct LspClient {
/// Handle to the spawned language-server process.
process: Child,
/// Buffered writer over the child's stdin; every outgoing message goes through it.
stdin: BufWriter<ChildStdin>,
/// Next JSON-RPC request id. `send_did_change` also takes its document
/// version numbers from this counter.
next_id: u64,
/// Command the server was started with; used in error and log messages.
server_name: String,
/// In-flight requests awaiting a response, shared with the background reader.
pending_requests: PendingRequests,
/// Latest diagnostics per document URI, filled by the background reader.
diagnostics: DiagnosticsCache,
/// Counter incremented each time the background reader stores a new set of diagnostics.
diag_version: Arc<AtomicU64>,
/// Join handle of the background reader task; stored here but not awaited
/// or aborted anywhere in this file's visible code.
_reader_handle: tokio::task::JoinHandle<()>,
}
impl LspClient {
/// Returns the process id reported by the language-server child handle, or
/// `None` when the handle has no id to report.
pub fn pid(&self) -> Option<u32> {
    let server = &self.process;
    server.id()
}
pub fn diag_version(&self) -> u64 {
self.diag_version.load(Ordering::Relaxed)
}
pub fn start(config: &LspServerConfig, _root_dir: &str) -> Result<Self> {
let mut child = Command::new(&config.command)
.args(&config.args)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::null())
.kill_on_drop(true)
.spawn()
.with_context(|| format!("Failed to start language server: {}", config.command))?;
let stdin = child.stdin.take().expect("stdin should be piped");
let stdout = child.stdout.take().expect("stdout should be piped");
let pending_requests: PendingRequests = Arc::new(std::sync::Mutex::new(HashMap::new()));
let diagnostics: DiagnosticsCache = Arc::new(std::sync::Mutex::new(HashMap::new()));
let diag_version = Arc::new(AtomicU64::new(0));
let server_name = config.command.clone();
let reader_handle = {
let pending = Arc::clone(&pending_requests);
let diags = Arc::clone(&diagnostics);
let ver = Arc::clone(&diag_version);
let name = server_name.clone();
tokio::spawn(async move {
background_reader(BufReader::new(stdout), pending, diags, ver, name).await;
})
};
Ok(Self {
process: child,
stdin: BufWriter::new(stdin),
next_id: 1,
server_name,
pending_requests,
diagnostics,
diag_version,
_reader_handle: reader_handle,
})
}
pub async fn send_request(
&mut self,
method: &str,
params: serde_json::Value,
) -> Result<LspResponse> {
let id = self.next_id;
self.next_id += 1;
let request = LspRequest {
jsonrpc: "2.0".into(),
id,
method: method.into(),
params,
};
let body = serde_json::to_string(&request)?;
let header = format!("Content-Length: {}\r\n\r\n", body.len());
let (tx, rx) = tokio::sync::oneshot::channel();
{
let mut pending = self
.pending_requests
.lock()
.map_err(|e| anyhow::anyhow!("pending_requests mutex poisoned: {e}"))?;
pending.insert(id, tx);
}
self.stdin.write_all(header.as_bytes()).await?;
self.stdin.write_all(body.as_bytes()).await?;
self.stdin.flush().await?;
rx.await.context(format!(
"Background reader dropped before response for request {id} to {}",
self.server_name
))
}
/// Sends a JSON-RPC notification (a message without an id) to the server.
///
/// The notification is serialized, framed with a `Content-Length` header
/// (byte length of the JSON body), written to the child's stdin and flushed.
/// No response is awaited.
///
/// # Errors
/// Fails if serialization or any write/flush on the server's stdin fails.
pub async fn send_notification(
    &mut self,
    method: &str,
    params: serde_json::Value,
) -> Result<()> {
    let notification = LspNotification {
        jsonrpc: "2.0".into(),
        method: method.into(),
        params,
    };
    let body = serde_json::to_string(&notification)?;
    let header = format!("Content-Length: {}\r\n\r\n", body.len());
    self.stdin.write_all(header.as_bytes()).await?;
    self.stdin.write_all(body.as_bytes()).await?;
    self.stdin.flush().await?;
    Ok(())
}
pub async fn initialize(&mut self, root_dir: &str) -> Result<()> {
let params = serde_json::to_value(InitializeParams {
process_id: std::process::id(),
root_uri: path_to_uri(root_dir),
capabilities: ClientCapabilities {},
})?;
tokio::time::timeout(
std::time::Duration::from_secs(30),
self.send_request("initialize", params),
)
.await
.context("LSP initialize timed out after 30 s")?
.context("LSP initialize request failed")?;
self.send_notification("initialized", json!({})).await?;
Ok(())
}
pub async fn document_symbols(&mut self, file_uri: &str) -> Result<Vec<DocumentSymbol>> {
let params = serde_json::to_value(DocumentSymbolParams {
text_document: TextDocumentIdentifier {
uri: file_uri.into(),
},
})?;
let resp = self
.send_request("textDocument/documentSymbol", params)
.await?;
if let Some(err) = resp.error {
anyhow::bail!("LSP error {}: {}", err.code, err.message);
}
match resp.result {
Some(value) => {
let symbols: Vec<DocumentSymbol> = serde_json::from_value(value)?;
Ok(symbols)
}
None => Ok(vec![]),
}
}
/// Notifies the server that a document was opened (`textDocument/didOpen`).
///
/// The document is announced with the given `uri`, `language_id` and full
/// `content`, always at version `1`.
///
/// # Errors
/// Fails if the notification cannot be written to the server.
pub async fn send_did_open(
    &mut self,
    uri: &str,
    language_id: &str,
    content: &str,
) -> Result<()> {
    use crate::lsp::protocol::TextDocumentItem;

    let item = TextDocumentItem {
        uri: uri.to_owned(),
        language_id: language_id.to_owned(),
        version: 1,
        text: content.to_owned(),
    };
    let text_document = json!({
        "uri": item.uri,
        "languageId": item.language_id,
        "version": item.version,
        "text": item.text
    });
    let params = json!({ "textDocument": text_document });
    self.send_notification("textDocument/didOpen", params).await
}
/// Notifies the server that a document's content changed
/// (`textDocument/didChange`), sending the full new `content` as a single
/// change entry.
///
/// The document version is taken from `next_id`, the same counter that
/// numbers requests, which is then incremented.
///
/// # Errors
/// Fails if the notification cannot be written to the server.
pub async fn send_did_change(&mut self, uri: &str, content: &str) -> Result<()> {
    // Claim a version number first, then advance the shared counter.
    let version = self.next_id;
    self.next_id += 1;
    let text_document = json!({ "uri": uri, "version": version });
    let content_changes = json!([{ "text": content }]);
    let params = json!({
        "textDocument": text_document,
        "contentChanges": content_changes
    });
    self.send_notification("textDocument/didChange", params)
        .await
}
/// Notifies the server that the document at `uri` was saved
/// (`textDocument/didSave`). Only the URI is sent.
///
/// # Errors
/// Fails if the notification cannot be written to the server.
pub async fn send_did_save(&mut self, uri: &str) -> Result<()> {
    let text_document = json!({ "uri": uri });
    let params = json!({ "textDocument": text_document });
    self.send_notification("textDocument/didSave", params).await
}
pub fn diagnostics_cache(&self) -> &DiagnosticsCache {
&self.diagnostics
}
/// Shuts the server down, consuming the client.
///
/// Sends the `shutdown` request followed by the `exit` notification and waits
/// for the process to exit, all under a 5-second limit enforced by
/// `crate::util::timeout::with_timeout`. If that helper reports an error, a
/// warning is logged and the process is killed.
///
/// NOTE(review): whether `with_timeout` returns `Err` only on expiry or also
/// when the inner future fails is not visible here — confirm against the
/// helper. Either way this method always returns `Ok(())`.
pub async fn shutdown(mut self) -> Result<()> {
    let grace = std::time::Duration::from_secs(5);
    let handshake = async {
        let _resp = self.send_request("shutdown", json!(null)).await?;
        self.send_notification("exit", json!(null)).await?;
        // The exit status itself is not interesting; only that the wait finished.
        let _ = self.process.wait().await;
        Ok::<(), anyhow::Error>(())
    };
    let failed = crate::util::timeout::with_timeout(grace, "LSP shutdown", handshake)
        .await
        .is_err();
    if failed {
        tracing::warn!("LSP shutdown timed out after 5s — killing process");
        let _ = self.process.kill().await;
    }
    Ok(())
}
}
async fn background_reader(
mut reader: BufReader<ChildStdout>,
pending: PendingRequests,
diagnostics: DiagnosticsCache,
diag_version: Arc<AtomicU64>,
server_name: String,
) {
loop {
let message = match read_message(&mut reader).await {
Ok(msg) => msg,
Err(e) => {
tracing::debug!("LSP background reader for {server_name} ending: {e}");
break;
}
};
let msg: serde_json::Value = match serde_json::from_str(&message) {
Ok(v) => v,
Err(e) => {
tracing::warn!("Failed to parse LSP message from {server_name}: {e}");
continue;
}
};
if msg.get("id").is_some() && msg.get("method").is_none() {
match serde_json::from_value::<LspResponse>(msg) {
Ok(resp) => {
let mut map = match pending.lock() {
Ok(g) => g,
Err(e) => e.into_inner(),
};
if let Some(tx) = map.remove(&resp.id) {
let _ = tx.send(resp);
}
}
Err(e) => {
tracing::warn!("Failed to parse LSP response from {server_name}: {e}");
}
}
} else if let Some(method) = msg.get("method").and_then(|m| m.as_str()) {
if method == "textDocument/publishDiagnostics"
&& let Some(params_val) = msg.get("params")
{
match serde_json::from_value::<PublishDiagnosticsParams>(params_val.clone()) {
Ok(params) => {
let mut cache = match diagnostics.lock() {
Ok(g) => g,
Err(e) => e.into_inner(),
};
cache.insert(params.uri, params.diagnostics);
diag_version.fetch_add(1, Ordering::Relaxed);
}
Err(e) => {
tracing::warn!(
"Failed to parse publishDiagnostics from {server_name}: {e}"
);
}
}
}
}
}
}
async fn read_message(reader: &mut BufReader<ChildStdout>) -> Result<String> {
let mut content_length: usize = 0;
loop {
let mut line = String::new();
let bytes_read = reader.read_line(&mut line).await?;
if bytes_read == 0 {
anyhow::bail!("EOF while reading LSP headers");
}
let trimmed = line.trim();
if trimmed.is_empty() {
break;
}
if let Some(value) = trimmed.strip_prefix("Content-Length: ") {
content_length = value.parse()?;
}
}
anyhow::ensure!(content_length > 0, "Missing Content-Length header");
let mut buf = vec![0u8; content_length];
reader.read_exact(&mut buf).await?;
String::from_utf8(buf).context("LSP response is not valid UTF-8")
}