use std::ffi::OsStr;
use std::fmt;
use std::future::Future;
use std::mem::ManuallyDrop;
use std::ops::Range;
use std::path::Path;
use std::path::absolute;
use std::sync::Arc;
use std::thread::JoinHandle;

use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
use ignore::WalkBuilder;
use indexmap::IndexSet;
use line_index::LineCol;
use line_index::LineIndex;
use line_index::WideEncoding;
use line_index::WideLineCol;
use lsp_types::CompletionResponse;
use lsp_types::DocumentSymbolResponse;
use lsp_types::GotoDefinitionResponse;
use lsp_types::Hover;
use lsp_types::InlayHint;
use lsp_types::Location;
use lsp_types::SemanticTokensResult;
use lsp_types::SignatureHelp;
use lsp_types::SymbolInformation;
use lsp_types::WorkspaceEdit;
use path_clean::PathClean;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use url::Url;

use crate::config::Config;
use crate::document::Document;
use crate::graph::DocumentGraphNode;
use crate::graph::ParseState;
use crate::queue::AddRequest;
use crate::queue::AnalysisQueue;
use crate::queue::AnalyzeRequest;
use crate::queue::CompletionRequest;
use crate::queue::DocumentSymbolRequest;
use crate::queue::FindAllReferencesRequest;
use crate::queue::FormatRequest;
use crate::queue::GotoDefinitionRequest;
use crate::queue::HoverRequest;
use crate::queue::InlayHintsRequest;
use crate::queue::NotifyChangeRequest;
use crate::queue::NotifyIncrementalChangeRequest;
use crate::queue::RemoveRequest;
use crate::queue::RenameRequest;
use crate::queue::Request;
use crate::queue::SemanticTokenRequest;
use crate::queue::SignatureHelpRequest;
use crate::queue::WorkspaceSymbolRequest;
use crate::rayon::RayonHandle;

/// Represents the kind of progress being reported by the analyzer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProgressKind {
    /// Progress is being made on parsing documents.
    Parsing,
    /// Progress is being made on analyzing documents.
    Analyzing,
}

impl fmt::Display for ProgressKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Parsing => write!(f, "parsing"),
            Self::Analyzing => write!(f, "analyzing"),
        }
    }
}
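
/// Converts a local file path to a `file://` URI.
///
/// The path is made absolute and cleaned of `.` and `..` components before
/// conversion; `None` is returned if either step fails.
///
/// A minimal sketch of the expected behavior (the exact URI is
/// platform-dependent, so the example is not compiled):
///
/// ```ignore
/// let uri = path_to_uri("/tmp/a/../foo.wdl").expect("should convert to URI");
/// assert_eq!(uri.as_str(), "file:///tmp/foo.wdl");
/// ```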
pub fn path_to_uri(path: impl AsRef<Path>) -> Option<Url> {
    Url::from_file_path(absolute(path).ok()?.clean()).ok()
}

/// Represents the result of analyzing a single document.
#[derive(Debug, Clone)]
pub struct AnalysisResult {
    /// The error that occurred while processing the document, if any.
    error: Option<Arc<Error>>,
    /// The version of the document that was analyzed, if known.
    version: Option<i32>,
    /// The line index of the document, if the document parsed.
    lines: Option<Arc<LineIndex>>,
    /// The analyzed document.
    document: Document,
}

impl AnalysisResult {
    /// Constructs a new analysis result from the given document graph node.
    pub(crate) fn new(node: &DocumentGraphNode) -> Self {
        if let Some(error) = node.analysis_error() {
            return Self {
                error: Some(error.clone()),
                version: node.parse_state().version(),
                lines: node.parse_state().lines().cloned(),
                document: Document::default_from_uri(node.uri().clone()),
            };
        }

        let (error, version, lines) = match node.parse_state() {
            ParseState::NotParsed => unreachable!("document should have been parsed"),
            ParseState::Error(e) => (Some(e), None, None),
            ParseState::Parsed { version, lines, .. } => (None, *version, Some(lines)),
        };

        Self {
            error: error.cloned(),
            version,
            lines: lines.cloned(),
            document: node
                .document()
                .expect("analysis should have completed")
                .clone(),
        }
    }

    /// Gets the error that occurred while processing the document, if any.
    pub fn error(&self) -> Option<&Arc<Error>> {
        self.error.as_ref()
    }

    /// Gets the version of the document that was analyzed, if known.
    pub fn version(&self) -> Option<i32> {
        self.version
    }

    /// Gets the line index of the document, if the document parsed.
    pub fn lines(&self) -> Option<&Arc<LineIndex>> {
        self.lines.as_ref()
    }

    /// Gets the analyzed document.
    pub fn document(&self) -> &Document {
        &self.document
    }
}

/// Represents a position in a source document.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Copy, Clone, Default)]
pub struct SourcePosition {
    /// The zero-based line number of the position.
    pub line: u32,
    /// The zero-based character offset of the position within the line.
    pub character: u32,
}

impl SourcePosition {
    /// Constructs a new source position from a line and character offset.
    pub fn new(line: u32, character: u32) -> Self {
        Self { line, character }
    }
}

/// Represents the encoding of a source position's character offset.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum SourcePositionEncoding {
    /// The offset counts UTF-8 code units (bytes).
    UTF8,
    /// The offset counts UTF-16 code units.
    UTF16,
}

/// Represents an edit to a source document.
#[derive(Debug, Clone)]
pub struct SourceEdit {
    /// The range of the edit within the source document.
    range: Range<SourcePosition>,
    /// The encoding of the edit range's positions.
    encoding: SourcePositionEncoding,
    /// The replacement text of the edit.
    text: String,
}

impl SourceEdit {
    /// Constructs a new source edit for the given range and replacement text.
    pub fn new(
        range: Range<SourcePosition>,
        encoding: SourcePositionEncoding,
        text: impl Into<String>,
    ) -> Self {
        Self {
            range,
            encoding,
            text: text.into(),
        }
    }

    /// Gets the range of the edit.
    pub(crate) fn range(&self) -> Range<SourcePosition> {
        self.range.start..self.range.end
    }

    /// Applies the edit to the given source string using the given line index.
    ///
    /// Fails if the edit's positions are invalid for the source or do not
    /// fall on character boundaries.
    pub(crate) fn apply(&self, source: &mut String, lines: &LineIndex) -> Result<()> {
        let (start, end) = match self.encoding {
            SourcePositionEncoding::UTF8 => (
                LineCol {
                    line: self.range.start.line,
                    col: self.range.start.character,
                },
                LineCol {
                    line: self.range.end.line,
                    col: self.range.end.character,
                },
            ),
            SourcePositionEncoding::UTF16 => (
                lines
                    .to_utf8(
                        WideEncoding::Utf16,
                        WideLineCol {
                            line: self.range.start.line,
                            col: self.range.start.character,
                        },
                    )
                    .context("invalid edit start position")?,
                lines
                    .to_utf8(
                        WideEncoding::Utf16,
                        WideLineCol {
                            line: self.range.end.line,
                            col: self.range.end.character,
                        },
                    )
                    .context("invalid edit end position")?,
            ),
        };

        let range: Range<usize> = lines
            .offset(start)
            .context("invalid edit start position")?
            .into()
            ..lines
                .offset(end)
                .context("invalid edit end position")?
                .into();

        if !source.is_char_boundary(range.start) {
            bail!("edit start position is not at a character boundary");
        }

        if !source.is_char_boundary(range.end) {
            bail!("edit end position is not at a character boundary");
        }

        source.replace_range(range, &self.text);
        Ok(())
    }
}

/// Represents an incremental change to a document.
#[derive(Clone, Debug)]
pub struct IncrementalChange {
    /// The version of the document after the change.
    pub version: i32,
    /// The full source text to start from, if known; when `None`, the edits
    /// are applied to the document's current source.
    pub start: Option<String>,
    /// The edits to apply, in order.
    pub edits: Vec<SourceEdit>,
}
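
/// A Workflow Description Language (WDL) document analyzer.
///
/// Requests are sent over a channel to an analysis queue running on a
/// dedicated background thread; each request completes through its own
/// `oneshot` channel.
///
/// A minimal usage sketch (`()` serves as the progress context and the
/// progress callback does nothing):
///
/// ```ignore
/// let analyzer: Analyzer<()> = Analyzer::new(Config::default(), |_, _, _, _| async {});
/// analyzer.add_document(path_to_uri("foo.wdl").unwrap()).await?;
/// let results = analyzer.analyze(()).await?;
/// ```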
#[derive(Debug)]
pub struct Analyzer<Context> {
    /// The sender for requests to the analysis queue.
    ///
    /// Wrapped in `ManuallyDrop` so that `Drop` can close the channel before
    /// joining the queue thread.
    sender: ManuallyDrop<mpsc::UnboundedSender<Request<Context>>>,
    /// The join handle of the analysis queue thread.
    handle: Option<JoinHandle<()>>,
    /// The configuration of the analyzer.
    config: Config,
}

impl<Context> Analyzer<Context>
where
    Context: Send + Clone + 'static,
{
    /// Constructs a new analyzer with the given configuration and progress
    /// callback, using the default validator.
    ///
    /// The analyzer must be constructed from the context of a Tokio runtime.
    pub fn new<Progress, Return>(config: Config, progress: Progress) -> Self
    where
        Progress: Fn(Context, ProgressKind, usize, usize) -> Return + Send + 'static,
        Return: Future<Output = ()>,
    {
        Self::new_with_validator(config, progress, crate::Validator::default)
    }

    /// Constructs a new analyzer with the given configuration, progress
    /// callback, and a factory for the validator used during analysis.
    ///
    /// The analyzer must be constructed from the context of a Tokio runtime.
    pub fn new_with_validator<Progress, Return, Validator>(
        config: Config,
        progress: Progress,
        validator: Validator,
    ) -> Self
    where
        Progress: Fn(Context, ProgressKind, usize, usize) -> Return + Send + 'static,
        Return: Future<Output = ()>,
        Validator: Fn() -> crate::Validator + Send + Sync + 'static,
    {
        let (tx, rx) = mpsc::unbounded_channel();
        let tokio = Handle::current();
        let inner_config = config.clone();
        let handle = std::thread::spawn(move || {
            let queue = AnalysisQueue::new(inner_config, tokio, progress, validator);
            queue.run(rx);
        });

        Self {
            sender: ManuallyDrop::new(tx),
            handle: Some(handle),
            config,
        }
    }

    /// Adds a document to the analyzer by URI.
    ///
    /// The document is not analyzed until [`analyze`](Self::analyze) or
    /// [`analyze_document`](Self::analyze_document) is called.
    pub async fn add_document(&self, uri: Url) -> Result<()> {
        let mut documents = IndexSet::new();
        documents.insert(uri);

        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Request::Add(AddRequest {
                documents,
                completed: tx,
            }))
            .map_err(|_| {
                anyhow!("failed to send request to analysis queue because the channel has closed")
            })?;

        rx.await.map_err(|_| {
            anyhow!("failed to receive response from analysis queue because the channel has closed")
        })?;
        Ok(())
    }

    /// Adds a directory of WDL documents to the analyzer.
    ///
    /// The directory is walked recursively (following symlinks and honoring
    /// the configured ignore file name, if any) and every file with a `.wdl`
    /// extension is added.
    pub async fn add_directory(&self, path: impl AsRef<Path>) -> Result<()> {
        let path = path.as_ref().to_path_buf();
        let config = self.config.clone();

        // Walk the directory on the Rayon thread pool so that large directory
        // trees don't block the async runtime.
        let documents = RayonHandle::spawn(move || -> Result<IndexSet<Url>> {
            let mut documents = IndexSet::new();
            let metadata = path.metadata().with_context(|| {
                format!(
                    "failed to read metadata for `{path}`",
                    path = path.display()
                )
            })?;

            if metadata.is_file() {
                bail!("`{path}` is a file, not a directory", path = path.display());
            }

            let mut walker = WalkBuilder::new(&path);
            if let Some(ignore_filename) = config.ignore_filename() {
                walker.add_custom_ignore_filename(ignore_filename);
            }
            let walker = walker
                .standard_filters(false)
                .parents(true)
                .follow_links(true)
                .build();

            for result in walker {
                let entry = result.with_context(|| {
                    format!("failed to read directory `{path}`", path = path.display())
                })?;
                let Some(file_type) = entry.file_type() else {
                    continue;
                };
                if !file_type.is_file() {
                    continue;
                }
                if entry.path().extension() != Some(OsStr::new("wdl")) {
                    continue;
                }

                documents.insert(path_to_uri(entry.path()).with_context(|| {
                    format!(
                        "failed to convert path `{path}` to a URI",
                        path = entry.path().display()
                    )
                })?);
            }

            Ok(documents)
        })
        .await?;

        if documents.is_empty() {
            return Ok(());
        }

        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Request::Add(AddRequest {
                documents,
                completed: tx,
            }))
            .map_err(|_| {
                anyhow!("failed to send request to analysis queue because the channel has closed")
            })?;

        rx.await.map_err(|_| {
            anyhow!("failed to receive response from analysis queue because the channel has closed")
        })?;
        Ok(())
    }

    /// Removes the specified documents from the analyzer.
    ///
    /// A URI that prefixes other document URIs (e.g. a directory URI) removes
    /// the matching documents as well.
    pub async fn remove_documents(&self, documents: Vec<Url>) -> Result<()> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Request::Remove(RemoveRequest {
                documents,
                completed: tx,
            }))
            .map_err(|_| {
                anyhow!("failed to send request to analysis queue because the channel has closed")
            })?;

        rx.await.map_err(|_| {
            anyhow!("failed to receive response from analysis queue because the channel has closed")
        })?;
        Ok(())
    }
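
    /// Notifies the analyzer that a document has been incrementally changed.
    ///
    /// This call does not wait for the change to be applied; the edits are
    /// applied when the document is next analyzed.
    ///
    /// A minimal sketch of sending an edit (the positions here are
    /// hypothetical):
    ///
    /// ```ignore
    /// analyzer.notify_incremental_change(
    ///     uri.clone(),
    ///     IncrementalChange {
    ///         version: 2,
    ///         start: None,
    ///         edits: vec![SourceEdit::new(
    ///             SourcePosition::new(0, 0)..SourcePosition::new(0, 0),
    ///             SourcePositionEncoding::UTF8,
    ///             "# a comment\n",
    ///         )],
    ///     },
    /// )?;
    /// ```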
    pub fn notify_incremental_change(
        &self,
        document: Url,
        change: IncrementalChange,
    ) -> Result<()> {
        self.sender
            .send(Request::NotifyIncrementalChange(
                NotifyIncrementalChangeRequest { document, change },
            ))
            .map_err(|_| {
                anyhow!("failed to send request to analysis queue because the channel has closed")
            })
    }

    /// Notifies the analyzer that a document has changed and its source
    /// should be re-read on the next analysis.
    ///
    /// If `discard_pending` is `true`, any pending incremental changes for
    /// the document are discarded.
    pub fn notify_change(&self, document: Url, discard_pending: bool) -> Result<()> {
        self.sender
            .send(Request::NotifyChange(NotifyChangeRequest {
                document,
                discard_pending,
            }))
            .map_err(|_| {
                anyhow!("failed to send request to analysis queue because the channel has closed")
            })
    }

    /// Analyzes a specific document that was previously added to the analyzer.
    ///
    /// The provided context is passed to the analyzer's progress callback.
    pub async fn analyze_document(
        &self,
        context: Context,
        document: Url,
    ) -> Result<Vec<AnalysisResult>> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Request::Analyze(AnalyzeRequest {
                document: Some(document),
                context,
                completed: tx,
            }))
            .map_err(|_| {
                anyhow!("failed to send request to analysis queue because the channel has closed")
            })?;

        rx.await.map_err(|_| {
            anyhow!("failed to receive response from analysis queue because the channel has closed")
        })?
    }

    /// Analyzes every document that has been added to the analyzer.
    ///
    /// The provided context is passed to the analyzer's progress callback.
    ///
    /// Returns an empty set of results if no documents have been added.
    pub async fn analyze(&self, context: Context) -> Result<Vec<AnalysisResult>> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Request::Analyze(AnalyzeRequest {
                document: None,
                context,
                completed: tx,
            }))
            .map_err(|_| {
                anyhow!("failed to send request to analysis queue because the channel has closed")
            })?;

        rx.await.map_err(|_| {
            anyhow!("failed to receive response from analysis queue because the channel has closed")
        })?
    }

    /// Formats the specified document.
    ///
    /// On success, the result contains the end line, end column, and the
    /// replacement text for the document; `None` is returned if the document
    /// cannot be formatted.
    pub async fn format_document(&self, document: Url) -> Result<Option<(u32, u32, String)>> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Request::Format(FormatRequest {
                document,
                completed: tx,
            }))
            .map_err(|_| {
                anyhow!(
                    "failed to send format request to the queue because the channel has closed"
                )
            })?;

        rx.await.map_err(|_| {
            anyhow!(
                "failed to receive format response from the queue because the channel has closed"
            )
        })
    }

    /// Gets the location(s) where the symbol at the given position is defined.
    pub async fn goto_definition(
        &self,
        document: Url,
        position: SourcePosition,
        encoding: SourcePositionEncoding,
    ) -> Result<Option<GotoDefinitionResponse>> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Request::GotoDefinition(GotoDefinitionRequest {
                document,
                position,
                encoding,
                completed: tx,
            }))
            .map_err(|_| {
                anyhow!(
                    "failed to send goto definition request to analysis queue because the channel \
                     has closed"
                )
            })?;

        rx.await.map_err(|_| {
            anyhow!(
                "failed to receive goto definition response from analysis queue because the \
                 channel has closed"
            )
        })
    }

    /// Finds all references to the symbol at the given position.
    ///
    /// If `include_declaration` is `true`, the symbol's declaration is
    /// included in the returned locations.
    pub async fn find_all_references(
        &self,
        document: Url,
        position: SourcePosition,
        encoding: SourcePositionEncoding,
        include_declaration: bool,
    ) -> Result<Vec<Location>> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Request::FindAllReferences(FindAllReferencesRequest {
                document,
                position,
                encoding,
                include_declaration,
                completed: tx,
            }))
            .map_err(|_| {
                anyhow!(
                    "failed to send find all references request to analysis queue because the \
                     channel has closed"
                )
            })?;

        rx.await.map_err(|_| {
            anyhow!(
                "failed to receive find all references response from analysis queue because the \
                 channel has closed"
            )
        })
    }

    /// Gets completion items for the given position.
    ///
    /// The provided context is passed along with the request to the analysis
    /// queue.
    pub async fn completion(
        &self,
        context: Context,
        document: Url,
        position: SourcePosition,
        encoding: SourcePositionEncoding,
    ) -> Result<Option<CompletionResponse>> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Request::Completion(CompletionRequest {
                document,
                position,
                encoding,
                context,
                completed: tx,
            }))
            .map_err(|_| {
                anyhow!(
                    "failed to send completion request to analysis queue because the channel has \
                     closed"
                )
            })?;

        rx.await.map_err(|_| {
            anyhow!(
                "failed to receive completion response from analysis queue because the channel \
                 has closed"
            )
        })
    }

    /// Gets hover information for the given position.
    pub async fn hover(
        &self,
        document: Url,
        position: SourcePosition,
        encoding: SourcePositionEncoding,
    ) -> Result<Option<Hover>> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Request::Hover(HoverRequest {
                document,
                position,
                encoding,
                completed: tx,
            }))
            .map_err(|_| {
                anyhow!(
                    "failed to send hover request to analysis queue because the channel has closed"
                )
            })?;

        rx.await.map_err(|_| {
            anyhow!(
                "failed to receive hover response from analysis queue because the channel has \
                 closed"
            )
        })
    }

    /// Renames the symbol at the given position, returning the workspace edit
    /// to apply.
    pub async fn rename(
        &self,
        document: Url,
        position: SourcePosition,
        encoding: SourcePositionEncoding,
        new_name: String,
    ) -> Result<Option<WorkspaceEdit>> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Request::Rename(RenameRequest {
                document,
                position,
                encoding,
                new_name,
                completed: tx,
            }))
            .map_err(|_| {
                anyhow!(
                    "failed to send rename request to analysis queue because the channel has \
                     closed"
                )
            })?;

        rx.await.map_err(|_| {
            anyhow!(
                "failed to receive rename response from analysis queue because the channel has \
                 closed"
            )
        })
    }

    /// Gets the semantic tokens for the specified document.
    pub async fn semantic_tokens(&self, document: Url) -> Result<Option<SemanticTokensResult>> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Request::SemanticTokens(SemanticTokenRequest {
                document,
                completed: tx,
            }))
            .map_err(|_| {
                anyhow!(
                    "failed to send semantic tokens request to analysis queue because the channel \
                     has closed"
                )
            })?;

        rx.await.map_err(|_| {
            anyhow!(
                "failed to receive semantic tokens response from analysis queue because the \
                 channel has closed"
            )
        })
    }

    /// Gets the symbols of the specified document.
    pub async fn document_symbol(&self, document: Url) -> Result<Option<DocumentSymbolResponse>> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Request::DocumentSymbol(DocumentSymbolRequest {
                document,
                completed: tx,
            }))
            .map_err(|_| {
                anyhow!(
                    "failed to send document symbol request to analysis queue because the channel \
                     has closed"
                )
            })?;

        rx.await.map_err(|_| {
            anyhow!(
                "failed to receive document symbol response from analysis queue because the \
                 channel has closed"
            )
        })
    }

    /// Searches the workspace for symbols matching the given query.
    pub async fn workspace_symbol(&self, query: String) -> Result<Option<Vec<SymbolInformation>>> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Request::WorkspaceSymbol(WorkspaceSymbolRequest {
                query,
                completed: tx,
            }))
            .map_err(|_| {
                anyhow!(
                    "failed to send workspace symbol request to analysis queue because the \
                     channel has closed"
                )
            })?;

        rx.await.map_err(|_| {
            anyhow!(
                "failed to receive workspace symbol response from analysis queue because the \
                 channel has closed"
            )
        })
    }

    /// Gets signature help for the given position.
    pub async fn signature_help(
        &self,
        document: Url,
        position: SourcePosition,
        encoding: SourcePositionEncoding,
    ) -> Result<Option<SignatureHelp>> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Request::SignatureHelp(SignatureHelpRequest {
                document,
                position,
                encoding,
                completed: tx,
            }))
            .map_err(|_| {
                anyhow!(
                    "failed to send signature help request to analysis queue because the channel \
                     has closed"
                )
            })?;

        rx.await.map_err(|_| {
            anyhow!(
                "failed to receive signature help response from analysis queue because the \
                 channel has closed"
            )
        })
    }

    /// Gets the inlay hints for the given range of the specified document.
    pub async fn inlay_hints(
        &self,
        document: Url,
        range: lsp_types::Range,
    ) -> Result<Option<Vec<InlayHint>>> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Request::InlayHints(InlayHintsRequest {
                document,
                range,
                completed: tx,
            }))
            .map_err(|_| {
                anyhow!(
                    "failed to send inlay hints request to analysis queue because the channel has \
                     closed"
                )
            })?;

        rx.await.map_err(|_| {
            anyhow!(
                "failed to receive inlay hints response from analysis queue because the channel \
                 has closed"
            )
        })
    }
}

impl Default for Analyzer<()> {
    fn default() -> Self {
        Self::new(Default::default(), |_, _, _, _| async {})
    }
}

impl<C> Drop for Analyzer<C> {
    fn drop(&mut self) {
        // Drop the sender first to close the channel, signaling the queue
        // thread to exit; only then is it safe to join the thread.
        unsafe { ManuallyDrop::drop(&mut self.sender) };

        if let Some(handle) = self.handle.take() {
            handle.join().unwrap();
        }
    }
}

// Compile-time assertion that the analyzer is both `Send` and `Sync`.
const _: () = {
    const fn _assert<T: Send + Sync>() {}
    _assert::<Analyzer<()>>();
};

#[cfg(test)]
mod test {
    use std::fs;

    use tempfile::TempDir;
    use wdl_ast::Severity;

    use super::*;

    #[tokio::test]
    async fn it_returns_empty_results() {
        let analyzer = Analyzer::default();
        let results = analyzer.analyze(()).await.unwrap();
        assert!(results.is_empty());
    }

    #[tokio::test]
    async fn it_analyzes_a_document() {
        let dir = TempDir::new().expect("failed to create temporary directory");
        let path = dir.path().join("foo.wdl");
        fs::write(
            &path,
            r#"version 1.1

task test {
    command <<<>>>
}

workflow test {
}
"#,
        )
        .expect("failed to create test file");

        let analyzer = Analyzer::default();
        analyzer
            .add_document(path_to_uri(&path).expect("should convert to URI"))
            .await
            .expect("should add document");
        let results = analyzer.analyze(()).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].document.diagnostics().count(), 1);
        assert_eq!(
            results[0].document.diagnostics().next().unwrap().rule(),
            None
        );
        assert_eq!(
            results[0].document.diagnostics().next().unwrap().severity(),
            Severity::Error
        );
        assert_eq!(
            results[0].document.diagnostics().next().unwrap().message(),
            "conflicting workflow name `test`"
        );

        // Analyze again and ensure the document identifier is unchanged.
        let id = results[0].document.id().clone();
        let results = analyzer.analyze(()).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].document.id().as_ref(), id.as_ref());
        assert_eq!(results[0].document.diagnostics().count(), 1);
        assert_eq!(
            results[0].document.diagnostics().next().unwrap().rule(),
            None
        );
        assert_eq!(
            results[0].document.diagnostics().next().unwrap().severity(),
            Severity::Error
        );
        assert_eq!(
            results[0].document.diagnostics().next().unwrap().message(),
            "conflicting workflow name `test`"
        );
    }

    #[tokio::test]
    async fn it_reanalyzes_a_document_on_change() {
        let dir = TempDir::new().expect("failed to create temporary directory");
        let path = dir.path().join("foo.wdl");
        fs::write(
            &path,
            r#"version 1.1

task test {
    command <<<>>>
}

workflow test {
}
"#,
        )
        .expect("failed to create test file");

        let analyzer = Analyzer::default();
        analyzer
            .add_document(path_to_uri(&path).expect("should convert to URI"))
            .await
            .expect("should add document");
        let results = analyzer.analyze(()).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].document.diagnostics().count(), 1);
        assert_eq!(
            results[0].document.diagnostics().next().unwrap().rule(),
            None
        );
        assert_eq!(
            results[0].document.diagnostics().next().unwrap().severity(),
            Severity::Error
        );
        assert_eq!(
            results[0].document.diagnostics().next().unwrap().message(),
            "conflicting workflow name `test`"
        );

        // Rename the workflow on disk and notify the analyzer of the change.
        fs::write(
            &path,
            r#"version 1.1

task test {
    command <<<>>>
}

workflow something_else {
}
"#,
        )
        .expect("failed to create test file");

        let uri = path_to_uri(&path).expect("should convert to URI");
        analyzer.notify_change(uri.clone(), false).unwrap();

        // Analyze again and ensure the document was reanalyzed.
        let id = results[0].document.id().clone();
        let results = analyzer.analyze(()).await.unwrap();
        assert_eq!(results.len(), 1);
        assert!(results[0].document.id().as_ref() != id.as_ref());
        assert_eq!(results[0].document.diagnostics().count(), 0);

        // Analyzing the same document again should keep the same identifier.
        let id = results[0].document.id().clone();
        let results = analyzer.analyze_document((), uri).await.unwrap();
        assert_eq!(results.len(), 1);
        assert!(results[0].document.id().as_ref() == id.as_ref());
        assert_eq!(results[0].document.diagnostics().count(), 0);
    }

    #[tokio::test]
    async fn it_reanalyzes_a_document_on_incremental_change() {
        let dir = TempDir::new().expect("failed to create temporary directory");
        let path = dir.path().join("foo.wdl");
        fs::write(
            &path,
            r#"version 1.1

task test {
    command <<<>>>
}

workflow test {
}
"#,
        )
        .expect("failed to create test file");

        let analyzer = Analyzer::default();
        analyzer
            .add_document(path_to_uri(&path).expect("should convert to URI"))
            .await
            .expect("should add document");
        let results = analyzer.analyze(()).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].document.diagnostics().count(), 1);
        assert_eq!(
            results[0].document.diagnostics().next().unwrap().rule(),
            None
        );
        assert_eq!(
            results[0].document.diagnostics().next().unwrap().severity(),
            Severity::Error
        );
        assert_eq!(
            results[0].document.diagnostics().next().unwrap().message(),
            "conflicting workflow name `test`"
        );

        // Rename the workflow on line 6 (`workflow test {`) via an
        // incremental edit replacing columns 9..13 (`test`).
        let uri = path_to_uri(&path).expect("should convert to URI");
        analyzer
            .notify_incremental_change(
                uri.clone(),
                IncrementalChange {
                    version: 2,
                    start: None,
                    edits: vec![SourceEdit::new(
                        SourcePosition::new(6, 9)..SourcePosition::new(6, 13),
                        SourcePositionEncoding::UTF8,
                        "something_else",
                    )],
                },
            )
            .unwrap();

        // Analyze the document and ensure it was reanalyzed.
        let id = results[0].document.id().clone();
        let results = analyzer.analyze_document((), uri).await.unwrap();
        assert_eq!(results.len(), 1);
        assert!(results[0].document.id().as_ref() != id.as_ref());
        assert_eq!(results[0].document.diagnostics().count(), 0);
    }
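
    // A focused sketch of `SourceEdit::apply` in isolation: positions are
    // UTF-8 line/column offsets, and the edit below replaces `test` on the
    // third line (line index 2, columns 9..13).
    #[test]
    fn it_applies_a_source_edit() {
        let mut source = String::from("version 1.1\n\nworkflow test {\n}\n");
        let lines = LineIndex::new(&source);
        let edit = SourceEdit::new(
            SourcePosition::new(2, 9)..SourcePosition::new(2, 13),
            SourcePositionEncoding::UTF8,
            "renamed",
        );
        edit.apply(&mut source, &lines).expect("edit should apply");
        assert_eq!(source, "version 1.1\n\nworkflow renamed {\n}\n");
    }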

    #[tokio::test]
    async fn it_removes_documents() {
        let dir = TempDir::new().expect("failed to create temporary directory");
        let foo = dir.path().join("foo.wdl");
        fs::write(
            &foo,
            r#"version 1.1

workflow test {
}
"#,
        )
        .expect("failed to create test file");

        let bar = dir.path().join("bar.wdl");
        fs::write(
            &bar,
            r#"version 1.1

workflow test {
}
"#,
        )
        .expect("failed to create test file");

        let baz = dir.path().join("baz.wdl");
        fs::write(
            &baz,
            r#"version 1.1

workflow test {
}
"#,
        )
        .expect("failed to create test file");

        let analyzer = Analyzer::default();
        analyzer
            .add_directory(dir.path())
            .await
            .expect("should add documents");
        let results = analyzer.analyze(()).await.unwrap();
        assert_eq!(results.len(), 3);
        assert!(results[0].document.diagnostics().next().is_none());
        assert!(results[1].document.diagnostics().next().is_none());
        assert!(results[2].document.diagnostics().next().is_none());

        // Analyze again and ensure the results are the same.
        let results = analyzer.analyze(()).await.unwrap();
        assert_eq!(results.len(), 3);

        // Remove all three documents at once by their directory URI.
        analyzer
            .remove_documents(vec![
                path_to_uri(dir.path()).expect("should convert to URI"),
            ])
            .await
            .unwrap();
        let results = analyzer.analyze(()).await.unwrap();
        assert!(results.is_empty());
    }
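
    // A small sketch of `path_to_uri`: relative components are cleaned before
    // conversion. The exact URI is platform-dependent, so only the scheme and
    // file name are asserted.
    #[test]
    fn it_converts_paths_to_uris() {
        let dir = TempDir::new().expect("failed to create temporary directory");
        let uri = path_to_uri(dir.path().join("a").join("..").join("foo.wdl"))
            .expect("should convert to URI");
        assert_eq!(uri.scheme(), "file");
        assert!(uri.path().ends_with("foo.wdl"));
    }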
}