use crate::{
analysis::{Analysis, AnalysisSnapshot},
change::AnalysisChange,
config::Config,
state::utils::Progress,
to_json, to_lsp,
};
use crossbeam_channel::{select, unbounded, Receiver, Sender};
use lsp_server::{ReqQueue, Response};
use lsp_types::{
notification::Notification, notification::PublishDiagnostics, PublishDiagnosticsParams,
};
use mun_paths::AbsPathBuf;
use mun_vfs::VirtualFileSystem;
use parking_lot::RwLock;
use rustc_hash::FxHashSet;
use std::{ops::Deref, sync::Arc, time::Instant};
mod protocol;
mod utils;
mod workspace;
#[derive(Debug)]
pub(crate) enum Task {
Response(Response),
Notify(lsp_server::Notification),
}
#[derive(Debug)]
pub(crate) enum Event {
Vfs(mun_vfs::MonitorMessage),
Task(Task),
Lsp(lsp_server::Message),
}
pub(crate) type RequestHandler = fn(&mut LanguageServerState, lsp_server::Response);
pub(crate) struct LanguageServerState {
pub(crate) sender: Sender<lsp_server::Message>,
pub(crate) request_queue: lsp_server::ReqQueue<(String, Instant), RequestHandler>,
pub config: Config,
pub thread_pool: threadpool::ThreadPool,
pub task_sender: Sender<Task>,
pub task_receiver: Receiver<Task>,
pub vfs: Arc<RwLock<VirtualFileSystem>>,
pub vfs_monitor: Box<dyn mun_vfs::Monitor>,
pub vfs_monitor_receiver: Receiver<mun_vfs::MonitorMessage>,
pub open_docs: FxHashSet<AbsPathBuf>,
pub analysis: Analysis,
pub packages: Arc<Vec<mun_project::Package>>,
pub shutdown_requested: bool,
}
pub(crate) struct LanguageServerSnapshot {
pub vfs: Arc<RwLock<VirtualFileSystem>>,
pub analysis: AnalysisSnapshot,
pub packages: Arc<Vec<mun_project::Package>>,
}
impl LanguageServerState {
pub fn new(sender: Sender<lsp_server::Message>, config: Config) -> Self {
let (vfs_monitor_sender, vfs_monitor_receiver) = unbounded::<mun_vfs::MonitorMessage>();
let vfs_monitor: mun_vfs::NotifyMonitor = mun_vfs::Monitor::new(Box::new(move |msg| {
vfs_monitor_sender
.send(msg)
.expect("error sending vfs monitor message to foreground")
}));
let vfs_monitor = Box::new(vfs_monitor) as Box<dyn mun_vfs::Monitor>;
let (task_sender, task_receiver) = unbounded();
let mut analysis = Analysis::default();
let mut change = AnalysisChange::new();
change.set_packages(Default::default());
change.set_roots(Default::default());
analysis.apply_change(change);
LanguageServerState {
sender,
request_queue: ReqQueue::default(),
config,
vfs: Arc::new(RwLock::new(Default::default())),
vfs_monitor,
vfs_monitor_receiver,
open_docs: FxHashSet::default(),
thread_pool: threadpool::ThreadPool::default(),
task_sender,
task_receiver,
analysis,
packages: Arc::new(Vec::new()),
shutdown_requested: false,
}
}
fn next_event(&self, receiver: &Receiver<lsp_server::Message>) -> Option<Event> {
select! {
recv(receiver) -> msg => msg.ok().map(Event::Lsp),
recv(self.vfs_monitor_receiver) -> task => Some(Event::Vfs(task.unwrap())),
recv(self.task_receiver) -> task => Some(Event::Task(task.unwrap()))
}
}
pub fn run(mut self, receiver: Receiver<lsp_server::Message>) -> anyhow::Result<()> {
self.fetch_workspaces();
while let Some(event) = self.next_event(&receiver) {
if let Event::Lsp(lsp_server::Message::Notification(notification)) = &event {
if notification.method == lsp_types::notification::Exit::METHOD {
return Ok(());
}
}
self.handle_event(event)?;
}
Ok(())
}
fn handle_event(&mut self, event: Event) -> anyhow::Result<()> {
let start_time = Instant::now();
log::info!("handling event: {:?}", event);
match event {
Event::Task(task) => self.handle_task(task)?,
Event::Lsp(msg) => match msg {
lsp_server::Message::Request(req) => self.on_request(req, start_time)?,
lsp_server::Message::Response(resp) => self.complete_request(resp),
lsp_server::Message::Notification(not) => self.on_notification(not)?,
},
Event::Vfs(task) => self.handle_vfs_task(task)?,
};
let state_changed = self.process_vfs_changes();
if state_changed {
let snapshot = self.snapshot();
let task_sender = self.task_sender.clone();
self.thread_pool.execute(move || {
let _result = handle_diagnostics(snapshot, task_sender);
});
}
Ok(())
}
#[allow(clippy::unnecessary_wraps)]
fn handle_task(&mut self, task: Task) -> anyhow::Result<()> {
match task {
Task::Notify(notification) => {
self.send(notification.into());
}
Task::Response(response) => self.respond(response),
}
Ok(())
}
#[allow(clippy::unnecessary_wraps)]
fn handle_vfs_task(&mut self, mut task: mun_vfs::MonitorMessage) -> anyhow::Result<()> {
loop {
match task {
mun_vfs::MonitorMessage::Progress { total, done } => {
let progress_state = if done == 0 {
Progress::Begin
} else if done < total {
Progress::Report
} else {
Progress::End
};
self.report_progress(
"projects scanned",
progress_state,
Some(format!("{}/{}", done, total)),
Some(Progress::fraction(done, total)),
)
}
mun_vfs::MonitorMessage::Loaded { files } => {
let vfs = &mut *self.vfs.write();
for (path, contents) in files {
vfs.set_file_contents(&path, contents);
}
}
}
task = match self.vfs_monitor_receiver.try_recv() {
Ok(task) => task,
_ => break,
}
}
Ok(())
}
}
fn handle_diagnostics(state: LanguageServerSnapshot, sender: Sender<Task>) -> anyhow::Result<()> {
for (idx, _package) in state.packages.iter().enumerate() {
let package_id = mun_hir::PackageId(idx as u32);
let files = state.analysis.package_source_files(package_id)?;
for file in files {
let line_index = state.analysis.file_line_index(file)?;
let uri = to_lsp::url(&state, file)?;
let diagnostics = state.analysis.diagnostics(file)?;
let diagnostics = {
let mut lsp_diagnostics = Vec::with_capacity(diagnostics.len());
for d in diagnostics {
lsp_diagnostics.push(lsp_types::Diagnostic {
range: to_lsp::range(d.range, &line_index),
severity: Some(lsp_types::DiagnosticSeverity::ERROR),
code: None,
code_description: None,
source: Some("mun".to_string()),
message: d.message,
related_information: {
let mut annotations =
Vec::with_capacity(d.additional_annotations.len());
for annotation in d.additional_annotations {
annotations.push(lsp_types::DiagnosticRelatedInformation {
location: lsp_types::Location {
uri: to_lsp::url(&state, annotation.range.file_id)?,
range: to_lsp::range(
annotation.range.value,
state
.analysis
.file_line_index(annotation.range.file_id)?
.deref(),
),
},
message: annotation.message,
});
}
if annotations.is_empty() {
None
} else {
Some(annotations)
}
},
tags: None,
data: None,
});
}
lsp_diagnostics
};
sender
.send(Task::Notify(lsp_server::Notification {
method: PublishDiagnostics::METHOD.to_owned(),
params: to_json(PublishDiagnosticsParams {
uri,
diagnostics,
version: None,
})
.unwrap(),
}))
.unwrap();
}
}
Ok(())
}
impl LanguageServerState {
pub fn snapshot(&self) -> LanguageServerSnapshot {
LanguageServerSnapshot {
vfs: self.vfs.clone(),
analysis: self.analysis.snapshot(),
packages: self.packages.clone(),
}
}
pub fn process_vfs_changes(&mut self) -> bool {
let changed_files = {
let mut vfs = self.vfs.write();
vfs.take_changes()
};
if changed_files.is_empty() {
return false;
}
let vfs = self.vfs.read();
let mut analysis_change = AnalysisChange::new();
let mut has_created_or_deleted_entries = false;
for file in changed_files {
if file.is_created_or_deleted() {
has_created_or_deleted_entries = true;
}
let bytes = vfs
.file_contents(file.file_id)
.map(Vec::from)
.unwrap_or_default();
let text = String::from_utf8(bytes).ok().map(Arc::from);
analysis_change.change_file(mun_hir::FileId(file.file_id.0), text);
}
if has_created_or_deleted_entries {
analysis_change.set_roots(self.recompute_source_roots());
}
self.analysis.apply_change(analysis_change);
true
}
}
impl Drop for LanguageServerState {
fn drop(&mut self) {
self.analysis.request_cancelation();
self.thread_pool.join();
}
}