use anyhow::{Result, anyhow};
use lsp_types::Diagnostic;
use serde::Deserialize;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, info, warn};
use crate::bridge::{DocumentManager, DocumentNotification, PathValidator};
use crate::lsp::{ClientManager, DIAGNOSTICS_TIMEOUT, DiagnosticsWaitResult, LspClient};
use crate::session::{EventBroadcaster, EventKind};
/// A single request received on the notify socket/pipe.
///
/// The request is parsed from one line of JSON. Because the enum is
/// `#[serde(untagged)]`, there is no explicit discriminator: serde tries the
/// variants in declaration order and picks the first one whose fields match.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum NotifyRequest {
/// `{"file": "<path>"}` — process one file and reply with its diagnostics.
File {
file: String,
},
/// `{"add_roots": ["<path>", ...]}` — register additional root directories.
AddRoots {
add_roots: Vec<String>,
},
}
/// Server that accepts line-delimited JSON requests on a local socket (Unix)
/// or named pipe (Windows) and answers them using the shared LSP state.
pub struct NotifyServer {
/// Source of per-language LSP clients; also receives newly added roots.
client_manager: Arc<ClientManager>,
/// Maps paths to language ids / URIs and produces open/change notifications.
doc_manager: Arc<Mutex<DocumentManager>>,
/// Validates paths for reading and holds the current list of roots.
path_validator: Arc<RwLock<PathValidator>>,
/// Receives an `EventKind::Diagnostics` event after a file has been processed.
broadcaster: EventBroadcaster,
}
impl NotifyServer {
/// Builds a notify server on top of already-constructed shared components.
///
/// Nothing is bound or spawned here; call `start` to begin listening.
#[must_use]
pub const fn new(
    client_manager: Arc<ClientManager>,
    doc_manager: Arc<Mutex<DocumentManager>>,
    path_validator: Arc<RwLock<PathValidator>>,
    broadcaster: EventBroadcaster,
) -> Self {
    Self { client_manager, doc_manager, path_validator, broadcaster }
}
#[cfg(unix)]
pub fn start(self, socket_path: &std::path::Path) -> Result<tokio::task::JoinHandle<()>> {
let _ = std::fs::remove_file(socket_path);
let listener = UnixListener::bind(socket_path).map_err(|e| {
anyhow!(
"Failed to bind notify socket {}: {e}",
socket_path.display()
)
})?;
info!("Notify socket listening on {}", socket_path.display());
let server = Arc::new(self);
let handle = tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((stream, _)) => {
let server = server.clone();
tokio::spawn(async move {
if let Err(e) = server.handle_connection(stream).await {
debug!("Notify connection error: {e}");
}
});
}
Err(e) => {
warn!("Notify socket accept error: {e}");
}
}
}
});
Ok(handle)
}
/// Creates the notify named pipe at `pipe_path` and spawns the accept loop.
///
/// The first instance is created with `first_pipe_instance(true)`. After a
/// client connects, the next pipe instance is created *before* the connected
/// one is handed to its own task, so there is always an instance for new
/// clients to connect to.
///
/// If creating the next instance fails, the client that has already
/// connected is still served, and the accept loop then stops.
///
/// # Errors
///
/// Returns an error if the initial pipe instance cannot be created.
#[cfg(windows)]
pub fn start(self, pipe_path: &std::path::Path) -> Result<tokio::task::JoinHandle<()>> {
    use tokio::net::windows::named_pipe::ServerOptions;

    let pipe_name = pipe_path.to_string_lossy().to_string();
    let mut server = ServerOptions::new()
        .first_pipe_instance(true)
        .create(&pipe_name)
        .map_err(|e| anyhow!("Failed to create notify pipe {pipe_name}: {e}"))?;
    info!("Notify pipe listening on {pipe_name}");

    let server_arc = Arc::new(self);
    let handle = tokio::spawn(async move {
        loop {
            if let Err(e) = server.connect().await {
                warn!("Notify pipe connect error: {e}");
                continue;
            }

            // Prepare the next instance first; remember a failure instead of
            // bailing out so the connected client below is not dropped.
            let next_instance = match ServerOptions::new().create(&pipe_name) {
                Ok(instance) => Some(instance),
                Err(e) => {
                    warn!("Notify pipe create error: {e}");
                    None
                }
            };

            let connected = server;
            let srv = Arc::clone(&server_arc);
            tokio::spawn(async move {
                if let Err(e) = srv.handle_connection(connected).await {
                    debug!("Notify connection error: {e}");
                }
            });

            match next_instance {
                Some(instance) => server = instance,
                // No pipe instance left to accept on.
                None => break,
            }
        }
    });
    Ok(handle)
}
/// Serves one connection: reads a single JSON line, dispatches it, and
/// writes back the textual response followed by a newline.
///
/// # Errors
///
/// Returns an error on any read/write failure, or if the line is not a
/// valid `NotifyRequest`. In the latter case nothing is written back to the
/// client before returning.
async fn handle_connection<S: AsyncRead + AsyncWrite + Unpin>(&self, stream: S) -> Result<()> {
    let (read_half, mut write_half) = tokio::io::split(stream);

    // Exactly one request line is consumed per connection.
    let mut raw = String::new();
    let mut lines = BufReader::new(read_half);
    lines.read_line(&mut raw).await?;

    let reply = match serde_json::from_str::<NotifyRequest>(raw.trim()) {
        Ok(NotifyRequest::File { file }) => {
            debug!("Notify: processing file {file}");
            self.process_file(&file).await
        }
        Ok(NotifyRequest::AddRoots { add_roots }) => {
            debug!("Notify: adding {} root(s)", add_roots.len());
            self.process_add_roots(&add_roots).await
        }
        Err(e) => return Err(anyhow!("Invalid request: {e}")),
    };

    write_half.write_all(reply.as_bytes()).await?;
    write_half.write_all(b"\n").await?;
    write_half.shutdown().await?;
    Ok(())
}
/// Infallible wrapper around `process_file_inner`: any error is rendered
/// into the response text as `Notify error: <error>`.
async fn process_file(&self, file_path: &str) -> String {
    self.process_file_inner(file_path)
        .await
        .unwrap_or_else(|e| format!("Notify error: {e}"))
}
/// Syncs `file_path` with the LSP client for its language, waits for updated
/// diagnostics, broadcasts an `EventKind::Diagnostics` event and returns the
/// diagnostics as compact text.
///
/// Returns an empty string when no client is available for the file's
/// language or when there are no diagnostics. Returns a one-line status
/// message (not an error) when the server is not running or dies while
/// waiting.
///
/// # Errors
///
/// Fails if the path cannot be resolved or validated, if a URI cannot be
/// built for it, or if opening the document or sending
/// `didOpen`/`didChange`/`didSave` fails.
#[allow(
clippy::significant_drop_tightening,
reason = "Locks held across async operations by design"
)]
async fn process_file_inner(&self, file_path: &str) -> Result<String> {
let path = resolve_path(file_path)?;
let canonical = self.path_validator.read().await.validate_read(&path)?;
// Look up the language id in a short scope so the document-manager lock is
// not held while the client is being fetched.
let lang_id = {
let doc_manager = self.doc_manager.lock().await;
doc_manager.language_id_for_path(&canonical).to_string()
};
// No client for this language is treated as "nothing to report", not as an
// error; the underlying error is discarded.
let client_mutex: Arc<Mutex<LspClient>> =
match self.client_manager.get_client(&lang_id).await {
Ok(c) => c,
Err(_) => return Ok(String::new()), };
// Lock order: document manager first, then the client.
let mut doc_manager = self.doc_manager.lock().await;
let client = client_mutex.lock().await;
let lang = client.language().to_string();
if !client.is_alive() {
return Ok(format!(
"[{lang}] server is not running \u{2014} diagnostics unavailable"
));
}
let uri = doc_manager.uri_for_path(&canonical)?;
// `ensure_open` yields a notification only when the server has to be told
// something (open or change); presumably `None` means the document is
// already in sync — confirm in `DocumentManager`.
if let Some(notification) = doc_manager.ensure_open(&canonical).await? {
// Capture the diagnostics generation before notifying the server; it is
// passed to the wait below as the baseline.
let snapshot = client.diagnostics_generation(&uri).await;
match notification {
DocumentNotification::Open(params) => {
client.did_open(params).await?;
}
DocumentNotification::Change(params) => {
client.did_change(params).await?;
}
}
client.did_save(uri.clone()).await?;
// The document manager is no longer needed; release it before the
// potentially long wait so other requests are not blocked on it.
drop(doc_manager);
let max_retries: u32 = 3;
let mut server_died = false;
// Wait up to `max_retries` times; after each silent wait except the last,
// re-send didSave to nudge the server.
for attempt in 0..max_retries {
match client
.wait_for_diagnostics_update(&uri, snapshot, DIAGNOSTICS_TIMEOUT)
.await
{
DiagnosticsWaitResult::Updated => break,
DiagnosticsWaitResult::ServerDied => {
server_died = true;
break;
}
DiagnosticsWaitResult::Inactive => {
if attempt + 1 < max_retries {
warn!(
"Server silent for {:?}, nudging with didSave (attempt {}/{})",
DIAGNOSTICS_TIMEOUT,
attempt + 2,
max_retries
);
client.did_save(uri.clone()).await?;
} else {
// Out of retries: fall through and report whatever
// diagnostics the client currently holds.
warn!(
"Server still silent after {} nudge attempts — \
proceeding with cached diagnostics",
max_retries
);
}
}
}
}
if server_died {
return Ok(format!(
"[{lang}] server died \u{2014} diagnostics unavailable"
));
}
} else {
// Nothing to send; just release the document manager.
drop(doc_manager);
}
let diagnostics = client.get_diagnostics(&uri).await;
drop(client);
let count = diagnostics.len();
let compact = if diagnostics.is_empty() {
String::new()
} else {
format_diagnostics_compact(&diagnostics)
};
// The event preview is the first formatted diagnostic line, trimmed
// (empty when there are no diagnostics).
let preview = compact
.lines()
.next()
.unwrap_or_default()
.trim()
.to_string();
// The event carries the path as given by the caller, not the canonical one.
self.broadcaster.send(EventKind::Diagnostics {
file: file_path.to_string(),
count,
preview,
});
if diagnostics.is_empty() {
Ok(String::new())
} else {
Ok(format!("Diagnostics ({count}):\n{compact}"))
}
}
/// Infallible wrapper around `process_add_roots_inner`: any error is
/// rendered into the response text as `Notify error: <error>`.
async fn process_add_roots(&self, paths: &[String]) -> String {
    self.process_add_roots_inner(paths)
        .await
        .unwrap_or_else(|e| format!("Notify error: {e}"))
}
/// Registers additional roots with the path validator and the client
/// manager, then asks the client manager to spawn servers.
///
/// Each entry in `paths` is canonicalized; entries that cannot be
/// canonicalized are skipped (logged at debug level). Paths that repeat
/// within the request or that are already registered are ignored.
///
/// Returns an empty string when nothing new was added, otherwise
/// `Added roots: <comma-separated list>`.
///
/// # Errors
///
/// Currently never fails; the `Result` return type is kept for callers.
async fn process_add_roots_inner(&self, paths: &[String]) -> Result<String> {
    // Canonicalize and de-duplicate the request, preserving request order.
    let mut candidates: Vec<PathBuf> = Vec::with_capacity(paths.len());
    for p in paths {
        match PathBuf::from(p).canonicalize() {
            Ok(canonical) => {
                if !candidates.contains(&canonical) {
                    candidates.push(canonical);
                }
            }
            Err(e) => {
                debug!("Skipping root {p}: {e}");
            }
        }
    }
    if candidates.is_empty() {
        return Ok(String::new());
    }

    // Check-and-update under a single write lock: reading the roots under a
    // read lock and writing them back later would let two concurrent
    // requests overwrite each other's additions.
    let mut validator = self.path_validator.write().await;
    let mut all_roots = validator.roots().to_vec();
    let genuinely_new: Vec<PathBuf> = candidates
        .into_iter()
        .filter(|p| !all_roots.contains(p))
        .collect();
    if genuinely_new.is_empty() {
        return Ok(String::new());
    }
    all_roots.extend(genuinely_new.iter().cloned());
    validator.update_roots(all_roots);
    // Release the validator before awaiting on the client manager.
    drop(validator);

    for root in &genuinely_new {
        // A failure for one root must not prevent the others from being added.
        if let Err(e) = self.client_manager.add_root(root.clone()).await {
            warn!("Failed to add root {}: {e}", root.display());
        }
    }
    self.client_manager.spawn_all().await;

    let added = genuinely_new
        .iter()
        .map(|p| p.to_string_lossy())
        .collect::<Vec<_>>()
        .join(", ");
    info!("Added roots: {added}");
    Ok(format!("Added roots: {added}"))
}
}
/// Turns `file` into an absolute path.
///
/// Absolute inputs are returned unchanged; relative inputs are joined onto
/// the process's current working directory. No canonicalization or
/// existence check is performed here.
///
/// # Errors
///
/// Fails only when the path is relative and the current working directory
/// cannot be determined.
fn resolve_path(file: &str) -> Result<PathBuf> {
    let candidate = PathBuf::from(file);
    if !candidate.is_absolute() {
        return match std::env::current_dir() {
            Ok(cwd) => Ok(cwd.join(candidate)),
            Err(e) => Err(anyhow!("Failed to get current working directory: {e}")),
        };
    }
    Ok(candidate)
}
/// Renders diagnostics as text, one entry per diagnostic, joined by `\n`.
///
/// Each entry has the form ` <line>:<col> [<severity>] <source>: <message>`,
/// or ` <line>:<col> [<severity>] <source>(<code>): <message>` when the
/// diagnostic carries a non-empty code. Line and column are the start
/// position of the diagnostic's range plus one. A missing source renders as
/// an empty string and an unrecognised or missing severity as `unknown`.
/// An empty slice yields an empty string.
pub(crate) fn format_diagnostics_compact(diagnostics: &[Diagnostic]) -> String {
    let mut entries: Vec<String> = Vec::with_capacity(diagnostics.len());
    for diag in diagnostics {
        let severity = match diag.severity {
            Some(lsp_types::DiagnosticSeverity::ERROR) => "error",
            Some(lsp_types::DiagnosticSeverity::WARNING) => "warning",
            Some(lsp_types::DiagnosticSeverity::INFORMATION) => "info",
            Some(lsp_types::DiagnosticSeverity::HINT) => "hint",
            _ => "unknown",
        };
        let start = &diag.range.start;
        let (line, col) = (start.line + 1, start.character + 1);
        let source = diag.source.as_deref().unwrap_or("");
        let code = match &diag.code {
            Some(lsp_types::NumberOrString::Number(n)) => n.to_string(),
            Some(lsp_types::NumberOrString::String(s)) => s.clone(),
            None => String::new(),
        };
        let entry = if code.is_empty() {
            format!(" {line}:{col} [{severity}] {source}: {}", diag.message)
        } else {
            format!(
                " {line}:{col} [{severity}] {source}({code}): {}",
                diag.message
            )
        };
        entries.push(entry);
    }
    entries.join("\n")
}