use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use crossbeam_channel::{Receiver, Sender};
use lsp_server::{Message, Notification, Request, RequestId, Response};
use lsp_types::{Diagnostic, MessageType, Uri};
use super::DocumentState;
use super::LspRuntimeSettings;
use super::config::{load_config, load_config_with_source};
use super::task_pool::{TaskPool, default_pool_size};
use crate::Config;
use crate::config::ConfigSource;
use crate::syntax::{ParsedYamlRegionSnapshot, SyntaxNode};
/// Per-document state keyed by the string form of the document URI
/// (lookups in this file go through `uri.to_string()`).
pub(crate) type DocumentMap = HashMap<String, DocumentState>;
/// Delay that `GlobalState::arm_settle` adds to the current instant when it
/// computes the next settle deadline (200 ms).
pub(crate) const DIAGNOSTICS_DEBOUNCE: std::time::Duration = std::time::Duration::from_millis(200);
/// Diagnostics remembered for a single document by `DiagnosticCollection`.
#[derive(Clone)]
pub(crate) struct StoredDiagnostics {
/// Optional document version supplied together with the diagnostics.
pub(crate) version: Option<i32>,
/// The diagnostics themselves.
pub(crate) items: Vec<Diagnostic>,
/// Decimal rendering of the collection's sequence counter, assigned when
/// `items` last changed; it stays the same while the items are unchanged.
pub(crate) result_id: String,
}
/// Stores the most recently applied diagnostics per document URI and hands
/// out a fresh `result_id` whenever a document's diagnostics change.
#[derive(Default)]
pub(crate) struct DiagnosticCollection {
/// Latest stored diagnostics, keyed by document URI.
current: HashMap<Uri, StoredDiagnostics>,
/// Counter incremented each time an entry is (re)written with new items;
/// its string form becomes that entry's `result_id`.
result_seq: u64,
}
impl DiagnosticCollection {
    /// Reconciles the stored diagnostics with a fresh batch of per-document results.
    ///
    /// For every `(uri, version, items)` entry in `publishes`:
    ///
    /// * if the stored items for `uri` equal `items`, the entry keeps its
    ///   `result_id` and nothing is sent to the client; only the stored
    ///   `version` is refreshed so it does not lag behind the version the
    ///   latest result was reported for;
    /// * otherwise a new `result_id` is allocated, the entry is replaced and,
    ///   unless `pull` is `true`, the new set is pushed through `sender`.
    ///
    /// Every URI that is stored but absent from `publishes` is removed; unless
    /// `pull` is `true`, an empty diagnostic list is pushed for it so the
    /// client clears what it was previously sent.
    ///
    /// # Parameters
    ///
    /// * `publishes` - the complete set of documents that currently have results.
    /// * `sender` - channel handle used for `publishDiagnostics` notifications.
    /// * `pull` - when `true`, results are only stored and never pushed.
    pub(crate) fn apply(
        &mut self,
        publishes: Vec<(Uri, Option<i32>, Vec<Diagnostic>)>,
        sender: &ClientSender,
        pull: bool,
    ) {
        let mut reported: HashSet<Uri> = HashSet::with_capacity(publishes.len());
        for (uri, version, items) in publishes {
            reported.insert(uri.clone());

            // Identical items: keep the result id stable and skip the publish,
            // but remember the version this (unchanged) result now belongs to.
            match self.current.get_mut(&uri) {
                Some(entry) if entry.items == items => {
                    entry.version = version;
                    continue;
                }
                _ => {}
            }

            self.result_seq += 1;
            let result_id = self.result_seq.to_string();
            if !pull {
                sender.publish_diagnostics(uri.clone(), items.clone(), version);
            }
            self.current.insert(
                uri,
                StoredDiagnostics {
                    version,
                    items,
                    result_id,
                },
            );
        }

        // Drop everything that was not reported this round in a single pass,
        // clearing it on the client side when running in push mode.
        self.current.retain(|uri, _| {
            let keep = reported.contains(uri);
            if !keep && !pull {
                sender.publish_diagnostics(uri.clone(), Vec::new(), None);
            }
            keep
        });
    }

    /// Forgets the diagnostics stored for `uri`.
    ///
    /// If an entry existed and `pull` is `false`, an empty diagnostic list
    /// (with no version) is pushed for `uri` through `sender`.
    pub(crate) fn drop_uri(&mut self, uri: &Uri, sender: &ClientSender, pull: bool) {
        if self.current.remove(uri).is_some() && !pull {
            sender.publish_diagnostics(uri.clone(), Vec::new(), None);
        }
    }

    /// Returns the diagnostics currently stored for `uri`, if any.
    pub(crate) fn get(&self, uri: &Uri) -> Option<&StoredDiagnostics> {
        self.current.get(uri)
    }

    /// Iterates over every stored `(uri, diagnostics)` pair in unspecified order.
    pub(crate) fn iter(&self) -> impl Iterator<Item = (&Uri, &StoredDiagnostics)> {
        self.current.iter()
    }
}
/// Cloneable handle used to send LSP messages towards the client.
#[derive(Clone)]
pub(crate) struct ClientSender {
/// Sending half of the channel that carries outgoing `Message`s.
sender: Sender<Message>,
}
impl ClientSender {
pub(crate) fn new(sender: Sender<Message>) -> Self {
Self { sender }
}
pub(crate) fn publish_diagnostics(
&self,
uri: Uri,
diagnostics: Vec<Diagnostic>,
version: Option<i32>,
) {
self.notify::<lsp_types::notification::PublishDiagnostics>(
lsp_types::PublishDiagnosticsParams {
uri,
diagnostics,
version,
},
);
}
pub(crate) fn log_message(&self, typ: MessageType, message: impl Into<String>) {
self.notify::<lsp_types::notification::LogMessage>(lsp_types::LogMessageParams {
typ,
message: message.into(),
});
}
pub(crate) fn notify<N: lsp_types::notification::Notification>(&self, params: N::Params) {
self.send(Message::Notification(Notification::new(
N::METHOD.to_owned(),
params,
)));
}
pub(crate) fn send(&self, message: Message) {
if let Err(err) = self.sender.send(message) {
log::warn!("LSP client channel closed; dropping message: {err}");
}
}
}
/// Bundle of state produced by `GlobalState::snapshot`: a salsa analysis
/// handle plus the document map and workspace root as they were at that time.
pub(crate) struct StateSnapshot {
/// Analysis handle built from a clone of the salsa database.
analysis: crate::salsa::Analysis,
/// Shared reference to the document map (cloned `Arc`, not a deep copy).
pub(crate) document_map: Arc<DocumentMap>,
/// Workspace root copied from the global state, if one is set.
pub(crate) workspace_root: Option<PathBuf>,
}
impl StateSnapshot {
    /// Looks up the tracked state for `uri`, keyed by the URI's string form.
    fn lookup_state(&self, uri: &Uri) -> Option<&DocumentState> {
        self.document_map.get(&uri.to_string())
    }

    /// Returns the salsa database behind this snapshot's analysis handle.
    pub(crate) fn db(&self) -> &dyn crate::salsa::Db {
        self.analysis.db()
    }

    /// Returns a clone of the state tracked for `uri`, or `None` if the
    /// document is not in the map.
    pub(crate) fn document_state(&self, uri: &Uri) -> Option<DocumentState> {
        self.lookup_state(uri).cloned()
    }

    /// Returns the text of `uri` as an owned string, or `None` if the
    /// document is not in the map.
    pub(crate) fn document_content(&self, uri: &Uri) -> Option<String> {
        self.lookup_state(uri)
            .map(|state| state.salsa_file.content_or_empty(self.db()).to_string())
    }

    /// Returns the text of `uri` together with a syntax root built from the
    /// tree stored in the document state.
    pub(crate) fn document_content_and_tree(&self, uri: &Uri) -> Option<(String, SyntaxNode)> {
        let state = self.lookup_state(uri)?;
        let text = state.salsa_file.content_or_empty(self.db()).to_string();
        let root = SyntaxNode::new_root(state.tree.clone());
        Some((text, root))
    }

    /// Returns the syntax root obtained from `crate::salsa::parsed_tree_root`
    /// for the document's salsa file and config.
    pub(crate) fn parsed_tree(&self, uri: &Uri) -> Option<SyntaxNode> {
        self.lookup_state(uri).map(|state| {
            crate::salsa::parsed_tree_root(self.db(), state.salsa_file, state.salsa_config)
        })
    }

    /// Resolves the configuration for `uri` via `load_config`, relative to
    /// the snapshot's workspace root.
    pub(crate) fn config(&self, uri: &Uri) -> Config {
        load_config(&self.workspace_root, Some(uri))
    }

    /// Returns the document text and its configuration; the configuration is
    /// only loaded when the document is known.
    pub(crate) fn document_and_config(&self, uri: &Uri) -> Option<(String, Config)> {
        self.document_content(uri)
            .map(|text| (text, self.config(uri)))
    }

    /// Returns the document text, its configuration, where that configuration
    /// came from, and a copy of the workspace root.
    pub(crate) fn document_config_and_source(
        &self,
        uri: &Uri,
    ) -> Option<(String, Config, ConfigSource, Option<PathBuf>)> {
        let text = self.document_content(uri)?;
        let (config, source) = load_config_with_source(&self.workspace_root, Some(uri));
        let root = self.workspace_root.clone();
        Some((text, config, source, root))
    }

    /// Builds the definition index for `uri` and merges in the index of every
    /// document listed by the project structure whose text the database can
    /// provide. Returns an empty index when `uri` is not tracked.
    pub(crate) fn definition_index_with_includes(
        &self,
        uri: &Uri,
    ) -> crate::salsa::DefinitionIndex {
        let (salsa_file, salsa_config) = match self.lookup_state(uri) {
            Some(state) => (state.salsa_file, state.salsa_config),
            None => return crate::salsa::DefinitionIndex::default(),
        };
        let db = self.db();
        let graph = crate::salsa::project_structure(db, salsa_file, salsa_config).clone();
        let mut merged = crate::salsa::definition_index(db, salsa_file, salsa_config).clone();
        // Paths without a database entry are skipped silently.
        for include_file in graph
            .documents()
            .iter()
            .filter_map(|path| db.file_text(path.clone()))
        {
            merged.merge_from(crate::salsa::definition_index(db, include_file, salsa_config));
        }
        merged
    }

    /// Returns the parsed YAML regions for `uri`, or an empty slice when the
    /// document is not tracked.
    pub(crate) fn parsed_yaml_regions(&self, uri: &Uri) -> &[ParsedYamlRegionSnapshot] {
        let Some(state) = self.lookup_state(uri) else {
            return &[];
        };
        crate::salsa::parsed_yaml_regions_for_file(self.db(), state.salsa_file, state.salsa_config)
    }
}
/// Message type carried by the task channel whose receiving end is
/// `GlobalState::task_receiver` (both task pools are `TaskPool<Task>`).
pub(crate) enum Task {
/// An LSP response to a request.
Response(Response),
/// A batch of diagnostics results.
Diagnostics {
/// Generation number of the run that produced this batch
/// (presumably compared with `GlobalState::lint_generation` — TODO confirm).
generation: u64,
/// `(uri, version, diagnostics)` triples, the same shape that
/// `DiagnosticCollection::apply` accepts.
publishes: Vec<(Uri, Option<i32>, Vec<Diagnostic>)>,
/// Set of document URIs (presumably those for which external linters
/// ran in this batch — TODO confirm against the producer).
external_ran: HashSet<Uri>,
},
}
/// Mutable state of the language server: client channel, tracked documents,
/// diagnostics store, salsa database, task pools and request bookkeeping.
pub(crate) struct GlobalState {
/// Handle for sending messages to the client.
pub(crate) sender: ClientSender,
/// Tracked documents; shared with snapshots through the `Arc` and mutated
/// via `document_map_mut`.
pub(crate) document_map: Arc<DocumentMap>,
/// Workspace root, `None` until one is set.
pub(crate) workspace_root: Option<PathBuf>,
/// Runtime settings; starts out as `LspRuntimeSettings::default()`.
pub(crate) runtime_settings: LspRuntimeSettings,
/// Starts `false` (presumably set from client capabilities — TODO confirm).
pub(crate) supports_pull_diagnostics: bool,
/// Gates `send_diagnostic_refresh`; starts `false`.
pub(crate) supports_diagnostic_refresh: bool,
/// Starts `false` (presumably set from client capabilities — TODO confirm).
pub(crate) supports_related_documents: bool,
/// Store of the diagnostics last applied per document.
pub(crate) diagnostics: DiagnosticCollection,
/// The salsa database; cloned into each snapshot.
pub(crate) salsa: crate::salsa::SalsaDb,
/// Task pool created with `default_pool_size()`.
pub(crate) pool: TaskPool<Task>,
/// Second task pool, created with size 1; it shares the task channel with `pool`.
pub(crate) fmt_pool: TaskPool<Task>,
/// Receiving end of the channel both pools were given the sender of.
pub(crate) task_receiver: Receiver<Task>,
/// Request ids; `respond` removes the answered id from this set.
pub(crate) in_flight: HashSet<RequestId>,
/// Request ids; `respond` removes the answered id from this set as well.
pub(crate) cancelled: HashSet<RequestId>,
/// Server-to-client requests awaiting a response, mapped to their method name.
pub(crate) outgoing: HashMap<RequestId, &'static str>,
/// Id used for the next server-to-client request; starts at 1.
pub(crate) next_outgoing_id: i32,
/// Deadline set by `arm_settle`; `None` until it is first armed.
pub(crate) settle_deadline: Option<Instant>,
/// Generation counter, starts at 0 (presumably matched against
/// `Task::Diagnostics::generation` — TODO confirm).
pub(crate) lint_generation: u64,
/// URIs recorded by `arm_settle_external`.
pub(crate) external_pending: HashSet<Uri>,
}
impl GlobalState {
pub(crate) fn new(sender: ClientSender) -> Self {
let (task_tx, task_receiver) = crossbeam_channel::unbounded::<Task>();
let pool = TaskPool::new(task_tx.clone(), default_pool_size());
let fmt_pool = TaskPool::new(task_tx, 1);
Self {
sender,
document_map: Arc::new(DocumentMap::new()),
workspace_root: None,
runtime_settings: LspRuntimeSettings::default(),
supports_pull_diagnostics: false,
supports_diagnostic_refresh: false,
supports_related_documents: false,
diagnostics: DiagnosticCollection::default(),
salsa: crate::salsa::SalsaDb::default(),
pool,
fmt_pool,
task_receiver,
in_flight: HashSet::new(),
cancelled: HashSet::new(),
outgoing: HashMap::new(),
next_outgoing_id: 1,
settle_deadline: None,
lint_generation: 0,
external_pending: HashSet::new(),
}
}
pub(crate) fn snapshot(&self) -> StateSnapshot {
StateSnapshot {
analysis: crate::salsa::Analysis::new(self.salsa.clone()),
document_map: Arc::clone(&self.document_map),
workspace_root: self.workspace_root.clone(),
}
}
pub(crate) fn document_map_mut(&mut self) -> &mut DocumentMap {
Arc::make_mut(&mut self.document_map)
}
pub(crate) fn respond(&mut self, response: Response) {
self.in_flight.remove(&response.id);
self.cancelled.remove(&response.id);
self.sender.send(Message::Response(response));
}
pub(crate) fn send_request<R: lsp_types::request::Request>(&mut self, params: R::Params) {
let id = RequestId::from(self.next_outgoing_id);
self.next_outgoing_id += 1;
self.outgoing.insert(id.clone(), R::METHOD);
self.sender.send(Message::Request(Request::new(
id,
R::METHOD.to_owned(),
params,
)));
}
pub(crate) fn on_client_response(&mut self, response: Response) {
if let Some(method) = self.outgoing.remove(&response.id)
&& let Some(err) = response.error
{
log::warn!("server request {method} failed: {}", err.message);
}
}
pub(crate) fn arm_settle(&mut self) {
self.settle_deadline = Some(Instant::now() + DIAGNOSTICS_DEBOUNCE);
}
pub(crate) fn arm_settle_external(&mut self, uri: Uri) {
self.external_pending.insert(uri);
self.arm_settle();
}
pub(crate) fn send_diagnostic_refresh(&mut self) {
if self.supports_diagnostic_refresh {
self.send_request::<lsp_types::request::WorkspaceDiagnosticRefresh>(());
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use lsp_types::{Position, Range};

    /// Builds a `ClientSender` whose receiving half is dropped on return;
    /// the tests below run in pull mode and never push through it.
    fn sender() -> ClientSender {
        let (tx, _rx) = crossbeam_channel::unbounded();
        ClientSender::new(tx)
    }

    /// Parses `s` into a `Uri`, panicking on malformed input.
    fn uri(s: &str) -> Uri {
        s.parse().expect("valid uri")
    }

    /// Creates a one-character diagnostic at the start of `line`.
    fn diag(line: u32) -> Diagnostic {
        let range = Range::new(Position::new(line, 0), Position::new(line, 1));
        Diagnostic {
            range,
            message: "x".to_owned(),
            ..Default::default()
        }
    }

    #[test]
    fn unchanged_uri_keeps_result_id_across_settles() {
        let client = sender();
        let doc = uri("file:///a.qmd");
        let mut collection = DiagnosticCollection::default();

        // First settle stores the diagnostics and assigns an id.
        collection.apply(vec![(doc.clone(), None, vec![diag(2)])], &client, true);
        let initial_id = collection.get(&doc).expect("stored").result_id.clone();

        // Same diagnostics again: the id must not move.
        collection.apply(vec![(doc.clone(), None, vec![diag(2)])], &client, true);
        assert_eq!(
            collection.get(&doc).expect("still stored").result_id,
            initial_id,
            "unchanged diagnostics must keep the same result_id"
        );

        // Different diagnostics: a new id is expected.
        collection.apply(vec![(doc.clone(), None, vec![diag(5)])], &client, true);
        assert_ne!(
            collection.get(&doc).expect("still stored").result_id,
            initial_id,
            "changed diagnostics must get a fresh result_id"
        );
    }

    #[test]
    fn omitted_uri_is_cleared() {
        let client = sender();
        let kept = uri("file:///a.qmd");
        let omitted = uri("file:///x.qmd");
        let mut collection = DiagnosticCollection::default();

        let both = vec![
            (kept.clone(), None, vec![diag(1)]),
            (omitted.clone(), None, vec![diag(1)]),
        ];
        collection.apply(both, &client, true);
        assert!(collection.get(&kept).is_some() && collection.get(&omitted).is_some());

        // Second settle reports only one of the two documents.
        collection.apply(vec![(kept.clone(), None, vec![diag(1)])], &client, true);
        assert!(collection.get(&kept).is_some(), "still-reported uri retained");
        assert!(collection.get(&omitted).is_none(), "omitted uri cleared");
    }
}