use serde_json::Value;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::Notify;
use tracing::{debug, trace, warn};
use super::client::DiagnosticsCache;
use super::extract;
use super::protocol::RpcError;
use super::state::{ProgressTracker, ServerState};
use crate::session::{EventBroadcaster, EventKind};
pub trait Inbox: Send + Sync {
fn on_notification(&self, method: &str, params: &Value);
fn on_request(&self, method: &str, params: &Value) -> Result<Value, RpcError>;
fn on_shutdown(&self);
fn is_progress_active(&self) -> bool;
fn state_notify(&self) -> &Notify;
}
pub struct ServerInbox {
pub(crate) diagnostics: DiagnosticsCache,
pub(crate) diagnostics_generation: Arc<Mutex<HashMap<String, u64>>>,
pub(crate) diagnostics_notify: Arc<Notify>,
pub(crate) capability_notify: Arc<Notify>,
pub(crate) progress: Arc<Mutex<ProgressTracker>>,
pub(crate) progress_notify: Arc<Notify>,
pub(crate) state: Arc<AtomicU8>,
pub(crate) state_notify: Arc<Notify>,
pub(crate) has_published_diagnostics: Arc<AtomicBool>,
pub(crate) publishes_version: Arc<AtomicBool>,
pub(crate) has_sent_progress: Arc<AtomicBool>,
pub(crate) language: String,
pub(crate) broadcaster: EventBroadcaster,
settings: Option<Value>,
}
impl ServerInbox {
pub(crate) fn new(
language: String,
broadcaster: EventBroadcaster,
settings: Option<Value>,
) -> Self {
Self {
diagnostics: Arc::new(Mutex::new(HashMap::new())),
diagnostics_generation: Arc::new(Mutex::new(HashMap::new())),
diagnostics_notify: Arc::new(Notify::new()),
capability_notify: Arc::new(Notify::new()),
progress: Arc::new(Mutex::new(ProgressTracker::new())),
progress_notify: Arc::new(Notify::new()),
state: Arc::new(AtomicU8::new(ServerState::Initializing.as_u8())),
state_notify: Arc::new(Notify::new()),
has_published_diagnostics: Arc::new(AtomicBool::new(false)),
publishes_version: Arc::new(AtomicBool::new(false)),
has_sent_progress: Arc::new(AtomicBool::new(false)),
language,
broadcaster,
settings,
}
}
pub(crate) const fn settings(&self) -> Option<&Value> {
self.settings.as_ref()
}
}
fn resolve_section(settings: Option<&Value>, section: Option<&str>) -> Value {
let (Some(mut current), Some(section)) = (settings, section) else {
return Value::Object(serde_json::Map::new());
};
for key in section.split('.') {
match current.get(key) {
Some(child) => current = child,
None => return Value::Object(serde_json::Map::new()),
}
}
current.clone()
}
impl Inbox for ServerInbox {
fn on_notification(&self, method: &str, params: &Value) {
match method {
"textDocument/publishDiagnostics" => {
let Some(uri) = extract::publish_diagnostics_uri(params) else {
warn!("publishDiagnostics missing uri");
return;
};
let version = extract::publish_diagnostics_version(params);
let diagnostics = extract::publish_diagnostics_diagnostics(params);
debug!(
"Received {} diagnostics for {:?} (version={:?})",
diagnostics.len(),
uri,
version,
);
self.has_published_diagnostics.store(true, Ordering::SeqCst);
if version.is_some() && !self.publishes_version.swap(true, Ordering::SeqCst) {
self.capability_notify.notify_waiters();
}
let mut cache = self
.diagnostics
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
cache.insert(uri.to_string(), (version, diagnostics));
drop(cache);
let mut generations = self
.diagnostics_generation
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let counter = generations.entry(uri.to_string()).or_insert(0);
*counter += 1;
drop(generations);
self.diagnostics_notify.notify_waiters();
}
"$/progress" => {
let Some(token_value) = extract::progress_token(params) else {
warn!("$/progress missing token");
return;
};
let token_str = token_value
.as_str()
.map_or_else(|| token_value.to_string(), str::to_string);
if !self.has_sent_progress.swap(true, Ordering::SeqCst) {
self.capability_notify.notify_waiters();
}
let mut tracker = self
.progress
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
tracker.update(&token_str, ¶ms["value"]);
let current_state = ServerState::from_u8(self.state.load(Ordering::SeqCst));
if current_state != ServerState::Dead {
if tracker.is_busy() {
self.state
.store(ServerState::Busy.as_u8(), Ordering::SeqCst);
if tracker.broadcast_changed()
&& let Some(p) = tracker.primary_progress()
{
debug!("Progress: {} {}%", p.title, p.percentage.unwrap_or(0));
self.broadcaster.send(EventKind::Progress {
language: self.language.clone(),
title: p.title.clone(),
message: p.message.clone(),
percentage: p.percentage,
});
}
} else {
self.state
.store(ServerState::Ready.as_u8(), Ordering::SeqCst);
debug!("Server ready (progress completed)");
self.broadcaster.send(EventKind::ProgressEnd {
language: self.language.clone(),
});
}
self.progress_notify.notify_waiters();
self.state_notify.notify_waiters();
}
}
"window/logMessage" | "window/showMessage" => {
if let Some(message) = params.get("message").and_then(|m| m.as_str()) {
debug!("LSP server message: {}", message);
}
}
_ => {
trace!("Ignoring notification: {} params={}", method, params);
}
}
}
fn on_request(&self, method: &str, params: &Value) -> Result<Value, RpcError> {
match method {
"workspace/configuration" => {
let items = params.get("items").and_then(Value::as_array);
let item_count = items.map_or(1, Vec::len);
let results: Vec<Value> = (0..item_count)
.map(|i| {
let section = items
.and_then(|arr| arr.get(i))
.and_then(|item| item.get("section"))
.and_then(Value::as_str);
resolve_section(self.settings.as_ref(), section)
})
.collect();
Ok(Value::Array(results))
}
"window/workDoneProgress/create" => Ok(Value::Null),
_ => Err(RpcError {
code: -32601,
message: format!("Method '{method}' not supported by client"),
}),
}
}
fn on_shutdown(&self) {
self.state
.store(ServerState::Dead.as_u8(), Ordering::SeqCst);
if let Ok(mut progress) = self.progress.lock() {
progress.clear();
}
self.diagnostics_notify.notify_waiters();
self.state_notify.notify_waiters();
}
fn is_progress_active(&self) -> bool {
self.progress
.try_lock()
.map_or(true, |tracker| tracker.is_busy())
}
fn state_notify(&self) -> &Notify {
&self.state_notify
}
}
#[cfg(test)]
#[allow(
clippy::expect_used,
reason = "tests use expect for readable assertions"
)]
mod tests {
use super::*;
use serde_json::json;
fn test_inbox() -> ServerInbox {
ServerInbox::new(
"test".to_string(),
crate::session::EventBroadcaster::noop(),
None,
)
}
#[test]
fn resolve_section_traverses_dot_path() {
let settings = json!({
"python": {
"analysis": {
"exclude": ["**/target"],
"extraPaths": []
},
"pythonPath": "/usr/bin/python3"
}
});
assert_eq!(
resolve_section(Some(&settings), Some("python.analysis")),
json!({"exclude": ["**/target"], "extraPaths": []})
);
assert_eq!(
resolve_section(Some(&settings), Some("python.pythonPath")),
json!("/usr/bin/python3")
);
assert_eq!(
resolve_section(Some(&settings), Some("python")),
json!({"analysis": {"exclude": ["**/target"], "extraPaths": []}, "pythonPath": "/usr/bin/python3"})
);
}
#[test]
fn resolve_section_missing_path_returns_empty_object() {
let settings = json!({"python": {"analysis": {}}});
assert_eq!(resolve_section(Some(&settings), Some("rust")), json!({}));
assert_eq!(
resolve_section(Some(&settings), Some("python.nonexistent")),
json!({})
);
}
#[test]
fn resolve_section_none_settings_returns_empty_object() {
assert_eq!(resolve_section(None, Some("python")), json!({}));
}
#[test]
fn resolve_section_none_section_returns_empty_object() {
let settings = json!({"python": {}});
assert_eq!(resolve_section(Some(&settings), None), json!({}));
}
#[test]
fn configuration_request_uses_settings() {
let inbox = ServerInbox::new(
"test".to_string(),
crate::session::EventBroadcaster::noop(),
Some(json!({"mockls": {"key": "value"}})),
);
let result = inbox
.on_request(
"workspace/configuration",
&json!({"items": [{"section": "mockls"}]}),
)
.expect("configuration request should succeed");
assert_eq!(result, json!([{"key": "value"}]));
}
#[test]
fn configuration_request_without_settings_returns_empty_objects() {
let inbox = test_inbox();
let result = inbox
.on_request(
"workspace/configuration",
&json!({"items": [{"section": "mockls"}, {"section": "other"}]}),
)
.expect("configuration request should succeed");
assert_eq!(result, json!([{}, {}]));
}
#[test]
fn is_progress_active_begin_end() {
let inbox = test_inbox();
assert!(!inbox.is_progress_active());
inbox.on_notification(
"$/progress",
&json!({
"token": "test-token",
"value": { "kind": "begin", "title": "Indexing", "percentage": 0 }
}),
);
assert!(inbox.is_progress_active());
inbox.on_notification(
"$/progress",
&json!({
"token": "test-token",
"value": { "kind": "end" }
}),
);
assert!(!inbox.is_progress_active());
}
}