use super::types::{JsonRpcNotification, JsonRpcRequest, JsonRpcResponse};
use anyhow::Result;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, Ordering};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::{RwLock, mpsc, oneshot};
use tracing::{debug, error, trace, warn};
pub struct LspTransport {
_child: Child,
tx: mpsc::Sender<String>,
pending: Arc<RwLock<std::collections::HashMap<i64, oneshot::Sender<JsonRpcResponse>>>>,
request_id: AtomicI64,
initialized: std::sync::atomic::AtomicBool,
}
impl LspTransport {
pub async fn spawn(command: &str, args: &[String]) -> Result<Self> {
let mut child = Command::new(command)
.args(args)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::inherit())
.spawn()?;
let stdout = child
.stdout
.take()
.ok_or_else(|| anyhow::anyhow!("No stdout"))?;
let mut stdin = child
.stdin
.take()
.ok_or_else(|| anyhow::anyhow!("No stdin"))?;
let (write_tx, mut write_rx) = mpsc::channel::<String>(100);
let pending: Arc<RwLock<std::collections::HashMap<i64, oneshot::Sender<JsonRpcResponse>>>> =
Arc::new(RwLock::new(std::collections::HashMap::new()));
let pending_clone = Arc::clone(&pending);
tokio::spawn(async move {
while let Some(msg) = write_rx.recv().await {
let content_length = msg.len();
let header = format!("Content-Length: {}\r\n\r\n", content_length);
trace!("LSP TX header: {}", header.trim());
trace!("LSP TX body: {}", msg);
if let Err(e) = stdin.write_all(header.as_bytes()).await {
error!("Failed to write header to LSP server: {}", e);
break;
}
if let Err(e) = stdin.write_all(msg.as_bytes()).await {
error!("Failed to write body to LSP server: {}", e);
break;
}
if let Err(e) = stdin.flush().await {
error!("Failed to flush LSP server stdin: {}", e);
break;
}
}
pending_clone.write().await.clear();
});
let pending_clone = Arc::clone(&pending);
tokio::spawn(async move {
let mut reader = BufReader::new(stdout);
let mut header_buf = String::new();
loop {
header_buf.clear();
let mut content_length: Option<usize> = None;
loop {
header_buf.clear();
match reader.read_line(&mut header_buf).await {
Ok(0) => {
debug!("LSP server closed connection");
return;
}
Ok(_) => {
let line = header_buf.trim();
if line.is_empty() {
break; }
if let Some(stripped) = line.strip_prefix("Content-Length:") {
if let Ok(len) = stripped.trim().parse::<usize>() {
content_length = Some(len);
}
}
}
Err(e) => {
error!("Failed to read header from LSP server: {}", e);
return;
}
}
}
let Some(len) = content_length else {
warn!("LSP message missing Content-Length header");
continue;
};
let mut body_buf = vec![0u8; len];
match reader.read_exact(&mut body_buf).await {
Ok(_) => {
let body = String::from_utf8_lossy(&body_buf);
trace!("LSP RX: {}", body);
match serde_json::from_str::<JsonRpcResponse>(&body) {
Ok(response) => {
let mut pending_guard = pending_clone.write().await;
if let Some(tx) = pending_guard.remove(&response.id) {
let id = response.id;
if tx.send(response).is_err() {
warn!("Request {} receiver dropped", id);
}
} else {
debug!("Received response for unknown request {}", response.id);
}
}
Err(e) => {
debug!("Failed to parse LSP response: {} - body: {}", e, body);
}
}
}
Err(e) => {
error!("Failed to read LSP message body: {}", e);
return;
}
}
}
});
Ok(Self {
_child: child,
tx: write_tx,
pending,
request_id: AtomicI64::new(1),
initialized: std::sync::atomic::AtomicBool::new(false),
})
}
pub async fn request(
&self,
method: &str,
params: Option<serde_json::Value>,
) -> Result<JsonRpcResponse> {
let id = self.request_id.fetch_add(1, Ordering::SeqCst);
let request = JsonRpcRequest::new(id, method, params);
let (tx, rx) = oneshot::channel();
self.pending.write().await.insert(id, tx);
let json = serde_json::to_string(&request)?;
self.tx.send(json).await?;
let response = tokio::time::timeout(std::time::Duration::from_secs(30), rx)
.await
.map_err(|_| anyhow::anyhow!("LSP request timeout for method: {}", method))?
.map_err(|_| anyhow::anyhow!("LSP response channel closed"))?;
Ok(response)
}
pub async fn notify(&self, method: &str, params: Option<serde_json::Value>) -> Result<()> {
let notification = JsonRpcNotification::new(method, params);
let json = serde_json::to_string(¬ification)?;
self.tx.send(json).await?;
Ok(())
}
pub fn is_initialized(&self) -> bool {
self.initialized.load(std::sync::atomic::Ordering::SeqCst)
}
pub fn set_initialized(&self, value: bool) {
self.initialized
.store(value, std::sync::atomic::Ordering::SeqCst);
}
}
impl Drop for LspTransport {
fn drop(&mut self) {
if self.is_initialized() {
tracing::debug!("LspTransport dropped while still initialized");
}
}
}