use anyhow::{Result, anyhow};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::Mutex;
use tokio::time::{Duration, timeout};
pub struct LspTransport {
process: Arc<Mutex<Option<Child>>>,
stdin: Arc<Mutex<Option<Box<dyn AsyncWrite + Unpin + Send>>>>,
stdout_reader: Arc<Mutex<Option<BufReader<Box<dyn AsyncRead + Unpin + Send>>>>>,
request_id: AtomicU32,
server_name: String,
}
impl LspTransport {
pub async fn spawn(
server_name: impl Into<String>,
command: &str,
args: &[String],
) -> Result<Self> {
let server_name = server_name.into();
let (actual_command, actual_args) = if cfg!(target_os = "windows")
&& (command == "npx" || command == "npm" || command == "node")
{
let mut full_args = vec!["/c".to_string(), command.to_string()];
full_args.extend(args.iter().cloned());
("cmd.exe".to_string(), full_args)
} else {
(command.to_string(), args.to_vec())
};
let mut cmd = Command::new(&actual_command);
cmd.args(&actual_args)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.kill_on_drop(true);
let mut child = cmd.spawn().map_err(|e| {
anyhow!(
"Failed to spawn LSP server '{}': {} (command: {} {:?})",
server_name,
e,
actual_command,
actual_args
)
})?;
let stdin = child.stdin.take().map(|s| Box::new(s) as Box<dyn AsyncWrite + Unpin + Send>);
let stdout = child.stdout.take().map(|s| {
Box::new(s) as Box<dyn AsyncRead + Unpin + Send>
});
let stdout_reader = stdout.map(|s| BufReader::new(s));
let stderr = child.stderr.take().map(|s| {
Box::new(s) as Box<dyn AsyncRead + Unpin + Send>
});
if let Some(stderr) = stderr {
let server_name_clone = server_name.clone();
let stderr_reader = BufReader::new(stderr).lines();
tokio::spawn(async move {
let mut lines = stderr_reader;
while let Ok(Some(line)) = lines.next_line().await {
let line_lower = line.to_lowercase();
if line_lower.contains("error") || line_lower.contains("fatal") {
log::error!("LSP '{}' stderr: {}", server_name_clone, line);
} else if line_lower.contains("warn") || line_lower.contains("warning") {
log::warn!("LSP '{}' stderr: {}", server_name_clone, line);
} else {
log::debug!("LSP '{}' stderr: {}", server_name_clone, line);
}
}
log::info!("LSP '{}' stderr stream ended", server_name_clone);
});
}
log::info!(
"LSP server '{}' spawned successfully (pid: {:?})",
server_name,
child.id()
);
Ok(Self {
process: Arc::new(Mutex::new(Some(child))),
stdin: Arc::new(Mutex::new(stdin)),
stdout_reader: Arc::new(Mutex::new(stdout_reader)),
request_id: AtomicU32::new(1),
server_name,
})
}
pub async fn send_request(
&self,
method: &str,
params: serde_json::Value,
) -> Result<serde_json::Value> {
let id = self.request_id.fetch_add(1, Ordering::SeqCst);
let message = serde_json::json!({
"jsonrpc": "2.0",
"id": id,
"method": method,
"params": params
});
self.send_message(&message.to_string()).await?;
self.receive_response(id).await
}
pub async fn send_notification(
&self,
method: &str,
params: serde_json::Value,
) -> Result<()> {
let message = serde_json::json!({
"jsonrpc": "2.0",
"method": method,
"params": params
});
self.send_message(&message.to_string()).await
}
async fn send_message(&self, content: &str) -> Result<()> {
let mut stdin = self.stdin.lock().await;
let stdin = stdin.as_mut().ok_or_else(|| anyhow!("stdin not available"))?;
let header = format!("Content-Length: {}\r\n\r\n", content.len());
stdin.write_all(header.as_bytes()).await?;
stdin.write_all(content.as_bytes()).await?;
stdin.flush().await?;
log::debug!("LSP '{}' sent: {}", self.server_name, content);
Ok(())
}
async fn receive_response(&self, expected_id: u32) -> Result<serde_json::Value> {
let timeout_duration = Duration::from_secs(30);
timeout(timeout_duration, async {
loop {
let message = self.receive_messages().await?;
let json: serde_json::Value = serde_json::from_str(&message)?;
if let Some(id) = json.get("id").and_then(|i| i.as_u64()) {
if id == expected_id as u64 {
if let Some(error) = json.get("error") {
return Err(anyhow!("LSP error: {:?}", error));
}
return Ok(json.get("result").cloned().unwrap_or(serde_json::Value::Null));
}
}
log::debug!("LSP '{}' received other message: {}", self.server_name, message);
}
}).await.map_err(|_| anyhow!("LSP request timeout after {}s", timeout_duration.as_secs()))?
}
pub async fn receive_messages(&self) -> Result<String> {
let mut reader = self.stdout_reader.lock().await;
let reader = reader.as_mut().ok_or_else(|| anyhow!("stdout not available"))?;
let mut header_line = String::new();
reader.read_line(&mut header_line).await?;
let content_length: usize = header_line
.strip_prefix("Content-Length: ")
.and_then(|s| s.trim().parse().ok())
.ok_or_else(|| anyhow!("Invalid LSP header: {}", header_line))?;
let mut empty_line = String::new();
reader.read_line(&mut empty_line).await?;
let mut content = vec![0u8; content_length];
reader.read_exact(&mut content).await?;
let message = String::from_utf8(content)?;
log::debug!("LSP '{}' received: {}", self.server_name, message);
Ok(message)
}
pub async fn close(&self) -> Result<()> {
let mut process = self.process.lock().await;
if let Some(mut child) = process.take() {
child.kill().await?;
log::info!("LSP server '{}' stopped", self.server_name);
}
Ok(())
}
pub fn server_name(&self) -> &str {
&self.server_name
}
}