1use std::ffi::OsStr;
4use std::fmt;
5use std::future::Future;
6use std::mem::ManuallyDrop;
7use std::ops::Range;
8use std::path::Path;
9use std::path::absolute;
10use std::sync::Arc;
11use std::thread::JoinHandle;
12
13use anyhow::Context;
14use anyhow::Error;
15use anyhow::Result;
16use anyhow::anyhow;
17use anyhow::bail;
18use indexmap::IndexSet;
19use line_index::LineCol;
20use line_index::LineIndex;
21use line_index::WideEncoding;
22use line_index::WideLineCol;
23use lsp_types::GotoDefinitionResponse;
24use lsp_types::Location;
25use path_clean::clean;
26use tokio::runtime::Handle;
27use tokio::sync::mpsc;
28use tokio::sync::oneshot;
29use url::Url;
30use walkdir::WalkDir;
31
32use crate::config::Config;
33use crate::document::Document;
34use crate::graph::DocumentGraphNode;
35use crate::graph::ParseState;
36use crate::queue::AddRequest;
37use crate::queue::AnalysisQueue;
38use crate::queue::AnalyzeRequest;
39use crate::queue::FindAllReferencesRequest;
40use crate::queue::FormatRequest;
41use crate::queue::GotoDefinitionRequest;
42use crate::queue::NotifyChangeRequest;
43use crate::queue::NotifyIncrementalChangeRequest;
44use crate::queue::RemoveRequest;
45use crate::queue::Request;
46use crate::rayon::RayonHandle;
47
/// Represents the kind of progress reported by the analyzer's progress
/// callback.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProgressKind {
    /// Documents are being parsed.
    Parsing,
    /// Parsed documents are being analyzed.
    Analyzing,
}
56
57impl fmt::Display for ProgressKind {
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 match self {
60 Self::Parsing => write!(f, "parsing"),
61 Self::Analyzing => write!(f, "analyzing"),
62 }
63 }
64}
65
66pub fn path_to_uri(path: impl AsRef<Path>) -> Option<Url> {
68 Url::from_file_path(clean(absolute(path).ok()?)).ok()
69}
70
/// Represents the result of analyzing a single document.
#[derive(Debug, Clone)]
pub struct AnalysisResult {
    /// The error that occurred while parsing or analyzing the document, if
    /// any.
    error: Option<Arc<Error>>,
    /// The version of the document that was analyzed, if known.
    // NOTE(review): presumably the LSP document version — confirm against the
    // queue's parse handling.
    version: Option<i32>,
    /// The line index of the document's source, if the document was parsed.
    lines: Option<Arc<LineIndex>>,
    /// The analyzed document (a default document when analysis errored).
    document: Document,
}
90
91impl AnalysisResult {
92 pub(crate) fn new(node: &DocumentGraphNode) -> Self {
94 if let Some(error) = node.analysis_error() {
95 return Self {
96 error: Some(error.clone()),
97 version: node.parse_state().version(),
98 lines: node.parse_state().lines().cloned(),
99 document: Document::default_from_uri(node.uri().clone()),
100 };
101 }
102
103 let (error, version, lines) = match node.parse_state() {
104 ParseState::NotParsed => unreachable!("document should have been parsed"),
105 ParseState::Error(e) => (Some(e), None, None),
106 ParseState::Parsed { version, lines, .. } => (None, *version, Some(lines)),
107 };
108
109 Self {
110 error: error.cloned(),
111 version,
112 lines: lines.cloned(),
113 document: node
114 .document()
115 .expect("analysis should have completed")
116 .clone(),
117 }
118 }
119
120 pub fn error(&self) -> Option<&Arc<Error>> {
126 self.error.as_ref()
127 }
128
129 pub fn version(&self) -> Option<i32> {
134 self.version
135 }
136
137 pub fn lines(&self) -> Option<&Arc<LineIndex>> {
141 self.lines.as_ref()
142 }
143
144 pub fn document(&self) -> &Document {
146 &self.document
147 }
148}
149
/// Represents a position within a source document.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Copy, Clone, Default)]
pub struct SourcePosition {
    /// The zero-based line number of the position.
    pub line: u32,
    /// The zero-based character (column) offset of the position; its unit is
    /// determined by the accompanying [`SourcePositionEncoding`].
    pub character: u32,
}
160
161impl SourcePosition {
162 pub fn new(line: u32, character: u32) -> Self {
164 Self { line, character }
165 }
166}
167
/// Represents the encoding used to interpret a [`SourcePosition`]'s character
/// offset.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum SourcePositionEncoding {
    /// Character offsets are in UTF-8 code units (bytes).
    UTF8,
    /// Character offsets are in UTF-16 code units.
    UTF16,
}
180
/// Represents an edit to a document's source text.
#[derive(Debug, Clone)]
pub struct SourceEdit {
    /// The range of the source being replaced by the edit.
    range: Range<SourcePosition>,
    /// The encoding of the positions in `range`.
    encoding: SourcePositionEncoding,
    /// The replacement text for the range.
    text: String,
}
193
194impl SourceEdit {
195 pub fn new(
197 range: Range<SourcePosition>,
198 encoding: SourcePositionEncoding,
199 text: impl Into<String>,
200 ) -> Self {
201 Self {
202 range,
203 encoding,
204 text: text.into(),
205 }
206 }
207
208 pub(crate) fn range(&self) -> Range<SourcePosition> {
210 self.range.start..self.range.end
211 }
212
213 pub(crate) fn apply(&self, source: &mut String, lines: &LineIndex) -> Result<()> {
215 let (start, end) = match self.encoding {
216 SourcePositionEncoding::UTF8 => (
217 LineCol {
218 line: self.range.start.line,
219 col: self.range.start.character,
220 },
221 LineCol {
222 line: self.range.end.line,
223 col: self.range.end.character,
224 },
225 ),
226 SourcePositionEncoding::UTF16 => (
227 lines
228 .to_utf8(
229 WideEncoding::Utf16,
230 WideLineCol {
231 line: self.range.start.line,
232 col: self.range.start.character,
233 },
234 )
235 .context("invalid edit start position")?,
236 lines
237 .to_utf8(
238 WideEncoding::Utf16,
239 WideLineCol {
240 line: self.range.end.line,
241 col: self.range.end.character,
242 },
243 )
244 .context("invalid edit end position")?,
245 ),
246 };
247
248 let range: Range<usize> = lines
249 .offset(start)
250 .context("invalid edit start position")?
251 .into()
252 ..lines
253 .offset(end)
254 .context("invalid edit end position")?
255 .into();
256
257 if !source.is_char_boundary(range.start) {
258 bail!("edit start position is not at a character boundary");
259 }
260
261 if !source.is_char_boundary(range.end) {
262 bail!("edit end position is not at a character boundary");
263 }
264
265 source.replace_range(range, &self.text);
266 Ok(())
267 }
268}
269
/// Represents an incremental change to a document.
#[derive(Clone, Debug)]
pub struct IncrementalChange {
    /// The new version of the document after the change.
    pub version: i32,
    /// Optional text the edits apply to.
    // NOTE(review): presumably the full starting source when the client has
    // resynchronized; when `None` the edits apply to the previously known
    // text — confirm against the queue's change handling.
    pub start: Option<String>,
    /// The edits to apply, in order.
    pub edits: Vec<SourceEdit>,
}
288
/// A handle to a background analysis queue running on its own thread.
///
/// Requests are submitted over a channel; dropping the analyzer closes the
/// channel and joins the queue thread (see the `Drop` implementation below).
#[derive(Debug)]
pub struct Analyzer<Context> {
    /// The sender for submitting requests to the analysis queue.
    ///
    /// Wrapped in `ManuallyDrop` so `Drop` can explicitly drop the sender
    /// (closing the channel) before joining the queue thread.
    sender: ManuallyDrop<mpsc::UnboundedSender<Request<Context>>>,
    /// The join handle of the analysis queue thread; taken in `Drop`.
    handle: Option<JoinHandle<()>>,
}
309
310impl<Context> Analyzer<Context>
311where
312 Context: Send + Clone + 'static,
313{
314 pub fn new<Progress, Return>(config: Config, progress: Progress) -> Self
322 where
323 Progress: Fn(Context, ProgressKind, usize, usize) -> Return + Send + 'static,
324 Return: Future<Output = ()>,
325 {
326 Self::new_with_validator(config, progress, crate::Validator::default)
327 }
328
329 pub fn new_with_validator<Progress, Return, Validator>(
338 config: Config,
339 progress: Progress,
340 validator: Validator,
341 ) -> Self
342 where
343 Progress: Fn(Context, ProgressKind, usize, usize) -> Return + Send + 'static,
344 Return: Future<Output = ()>,
345 Validator: Fn() -> crate::Validator + Send + Sync + 'static,
346 {
347 let (tx, rx) = mpsc::unbounded_channel();
348 let tokio = Handle::current();
349 let handle = std::thread::spawn(move || {
350 let queue = AnalysisQueue::new(config, tokio, progress, validator);
351 queue.run(rx);
352 });
353
354 Self {
355 sender: ManuallyDrop::new(tx),
356 handle: Some(handle),
357 }
358 }
359
360 pub async fn add_document(&self, uri: Url) -> Result<()> {
364 let mut documents = IndexSet::new();
365 documents.insert(uri);
366
367 let (tx, rx) = oneshot::channel();
368 self.sender
369 .send(Request::Add(AddRequest {
370 documents,
371 completed: tx,
372 }))
373 .map_err(|_| {
374 anyhow!("failed to send request to analysis queue because the channel has closed")
375 })?;
376
377 rx.await.map_err(|_| {
378 anyhow!("failed to receive response from analysis queue because the channel has closed")
379 })?;
380
381 Ok(())
382 }
383
384 pub async fn add_directory(&self, path: impl AsRef<Path>) -> Result<()> {
390 let path = path.as_ref().to_path_buf();
391 let documents = RayonHandle::spawn(move || -> Result<IndexSet<Url>> {
393 let mut documents = IndexSet::new();
394
395 let metadata = path.metadata().with_context(|| {
396 format!(
397 "failed to read metadata for `{path}`",
398 path = path.display()
399 )
400 })?;
401
402 if metadata.is_file() {
403 bail!("`{path}` is a file, not a directory", path = path.display());
404 }
405
406 for result in WalkDir::new(&path).follow_links(true) {
407 let entry = result.with_context(|| {
408 format!("failed to read directory `{path}`", path = path.display())
409 })?;
410 if !entry.file_type().is_file()
411 || entry.path().extension().and_then(OsStr::to_str) != Some("wdl")
412 {
413 continue;
414 }
415
416 documents.insert(path_to_uri(entry.path()).with_context(|| {
417 format!(
418 "failed to convert path `{path}` to a URI",
419 path = entry.path().display()
420 )
421 })?);
422 }
423
424 Ok(documents)
425 })
426 .await?;
427
428 if documents.is_empty() {
429 return Ok(());
430 }
431
432 let (tx, rx) = oneshot::channel();
434 self.sender
435 .send(Request::Add(AddRequest {
436 documents,
437 completed: tx,
438 }))
439 .map_err(|_| {
440 anyhow!("failed to send request to analysis queue because the channel has closed")
441 })?;
442
443 rx.await.map_err(|_| {
444 anyhow!("failed to receive response from analysis queue because the channel has closed")
445 })?;
446
447 Ok(())
448 }
449
450 pub async fn remove_documents(&self, documents: Vec<Url>) -> Result<()> {
457 let (tx, rx) = oneshot::channel();
459 self.sender
460 .send(Request::Remove(RemoveRequest {
461 documents,
462 completed: tx,
463 }))
464 .map_err(|_| {
465 anyhow!("failed to send request to analysis queue because the channel has closed")
466 })?;
467
468 rx.await.map_err(|_| {
469 anyhow!("failed to receive response from analysis queue because the channel has closed")
470 })?;
471
472 Ok(())
473 }
474
475 pub fn notify_incremental_change(
479 &self,
480 document: Url,
481 change: IncrementalChange,
482 ) -> Result<()> {
483 self.sender
484 .send(Request::NotifyIncrementalChange(
485 NotifyIncrementalChangeRequest { document, change },
486 ))
487 .map_err(|_| {
488 anyhow!("failed to send request to analysis queue because the channel has closed")
489 })
490 }
491
492 pub fn notify_change(&self, document: Url, discard_pending: bool) -> Result<()> {
501 self.sender
502 .send(Request::NotifyChange(NotifyChangeRequest {
503 document,
504 discard_pending,
505 }))
506 .map_err(|_| {
507 anyhow!("failed to send request to analysis queue because the channel has closed")
508 })
509 }
510
511 pub async fn analyze_document(
520 &self,
521 context: Context,
522 document: Url,
523 ) -> Result<Vec<AnalysisResult>> {
524 let (tx, rx) = oneshot::channel();
526 self.sender
527 .send(Request::Analyze(AnalyzeRequest {
528 document: Some(document),
529 context,
530 completed: tx,
531 }))
532 .map_err(|_| {
533 anyhow!("failed to send request to analysis queue because the channel has closed")
534 })?;
535
536 rx.await.map_err(|_| {
537 anyhow!("failed to receive response from analysis queue because the channel has closed")
538 })?
539 }
540
541 pub async fn analyze(&self, context: Context) -> Result<Vec<AnalysisResult>> {
550 let (tx, rx) = oneshot::channel();
552 self.sender
553 .send(Request::Analyze(AnalyzeRequest {
554 document: None, context,
556 completed: tx,
557 }))
558 .map_err(|_| {
559 anyhow!("failed to send request to analysis queue because the channel has closed")
560 })?;
561
562 rx.await.map_err(|_| {
563 anyhow!("failed to receive response from analysis queue because the channel has closed")
564 })?
565 }
566
567 pub async fn format_document(&self, document: Url) -> Result<Option<(u32, u32, String)>> {
569 let (tx, rx) = oneshot::channel();
570 self.sender
571 .send(Request::Format(FormatRequest {
572 document,
573 completed: tx,
574 }))
575 .map_err(|_| {
576 anyhow!("failed to send format request to the queue because the channel has closed")
577 })?;
578
579 rx.await.map_err(|_| {
580 anyhow!("failed to send format request to the queue because the channel has closed")
581 })
582 }
583
584 pub async fn goto_definition(
586 &self,
587 document: Url,
588 position: SourcePosition,
589 encoding: SourcePositionEncoding,
590 ) -> Result<Option<GotoDefinitionResponse>> {
591 let (tx, rx) = oneshot::channel();
592 self.sender
593 .send(Request::GotoDefinition(GotoDefinitionRequest {
594 document,
595 position,
596 encoding,
597 completed: tx,
598 }))
599 .map_err(|_| {
600 anyhow!(
601 "failed to send goto definition request to analysis queue because the channel \
602 has closed"
603 )
604 })?;
605
606 rx.await.map_err(|_| {
607 anyhow!(
608 "failed to receive goto definition response from analysis queue because the \
609 channel has closed"
610 )
611 })
612 }
613
614 pub async fn find_all_references(
616 &self,
617 document: Url,
618 position: SourcePosition,
619 encoding: SourcePositionEncoding,
620 include_declaration: bool,
621 ) -> Result<Vec<Location>> {
622 let (tx, rx) = oneshot::channel();
623 self.sender
624 .send(Request::FindAllReferences(FindAllReferencesRequest {
625 document,
626 position,
627 encoding,
628 include_declaration,
629 completed: tx,
630 }))
631 .map_err(|_| {
632 anyhow!(
633 "failed to send find all references request to analysis queue because the \
634 channel has closed"
635 )
636 })?;
637
638 rx.await.map_err(|_| {
639 anyhow!(
640 "failed to receive find all references response from analysis queue because the \
641 client channel has closed"
642 )
643 })
644 }
645}
646
647impl Default for Analyzer<()> {
648 fn default() -> Self {
649 Self::new(Default::default(), |_, _, _, _| async {})
650 }
651}
652
impl<C> Drop for Analyzer<C> {
    fn drop(&mut self) {
        // Drop the sender first so the queue's receiver observes a closed
        // channel and its processing loop can terminate.
        //
        // SAFETY: `drop` runs at most once and `self.sender` is never
        // accessed again afterwards, so manually dropping the `ManuallyDrop`
        // contents here is sound.
        unsafe { ManuallyDrop::drop(&mut self.sender) };
        // Wait for the queue thread to finish before returning.
        if let Some(handle) = self.handle.take() {
            handle.join().unwrap();
        }
    }
}
661
// Compile-time assertion that `Analyzer` is both `Send` and `Sync`.
const _: () = {
    const fn _assert<T: Send + Sync>() {}
    _assert::<Analyzer<()>>();
};
669
#[cfg(test)]
mod test {
    use std::fs;

    use tempfile::TempDir;
    use wdl_ast::Severity;

    use super::*;

    /// An analyzer with no documents added should produce no results.
    #[tokio::test]
    async fn it_returns_empty_results() {
        let analyzer = Analyzer::default();
        let results = analyzer.analyze(()).await.unwrap();
        assert!(results.is_empty());
    }

    /// Analyzing a document with a task/workflow name conflict should yield a
    /// single error diagnostic, and re-analysis should reuse the result.
    #[tokio::test]
    async fn it_analyzes_a_document() {
        let dir = TempDir::new().expect("failed to create temporary directory");
        let path = dir.path().join("foo.wdl");
        // The task and workflow share the name `test`, which is an error.
        fs::write(
            &path,
            r#"version 1.1

task test {
    command <<<>>>
}

workflow test {
}
"#,
        )
        .expect("failed to create test file");

        let analyzer = Analyzer::default();
        analyzer
            .add_document(path_to_uri(&path).expect("should convert to URI"))
            .await
            .expect("should add document");

        let results = analyzer.analyze(()).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].document.diagnostics().len(), 1);
        assert_eq!(results[0].document.diagnostics()[0].rule(), None);
        assert_eq!(
            results[0].document.diagnostics()[0].severity(),
            Severity::Error
        );
        assert_eq!(
            results[0].document.diagnostics()[0].message(),
            "conflicting workflow name `test`"
        );

        // Analyzing again without changes should produce the same document
        // (same id) with the same diagnostics.
        let id = results[0].document.id().clone();
        let results = analyzer.analyze(()).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].document.id().as_ref(), id.as_ref());
        assert_eq!(results[0].document.diagnostics().len(), 1);
        assert_eq!(results[0].document.diagnostics()[0].rule(), None);
        assert_eq!(
            results[0].document.diagnostics()[0].severity(),
            Severity::Error
        );
        assert_eq!(
            results[0].document.diagnostics()[0].message(),
            "conflicting workflow name `test`"
        );
    }

    /// After a full (non-incremental) change notification, re-analysis should
    /// pick up the new on-disk contents and produce a new document.
    #[tokio::test]
    async fn it_reanalyzes_a_document_on_change() {
        let dir = TempDir::new().expect("failed to create temporary directory");
        let path = dir.path().join("foo.wdl");
        // Initially the task and workflow conflict on the name `test`.
        fs::write(
            &path,
            r#"version 1.1

task test {
    command <<<>>>
}

workflow test {
}
"#,
        )
        .expect("failed to create test file");

        let analyzer = Analyzer::default();
        analyzer
            .add_document(path_to_uri(&path).expect("should convert to URI"))
            .await
            .expect("should add document");

        let results = analyzer.analyze(()).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].document.diagnostics().len(), 1);
        assert_eq!(results[0].document.diagnostics()[0].rule(), None);
        assert_eq!(
            results[0].document.diagnostics()[0].severity(),
            Severity::Error
        );
        assert_eq!(
            results[0].document.diagnostics()[0].message(),
            "conflicting workflow name `test`"
        );

        // Rewrite the file on disk so the name conflict is resolved.
        fs::write(
            &path,
            r#"version 1.1

task test {
    command <<<>>>
}

workflow something_else {
}
"#,
        )
        .expect("failed to create test file");

        let uri = path_to_uri(&path).expect("should convert to URI");
        analyzer.notify_change(uri.clone(), false).unwrap();

        // Re-analysis should produce a new document (different id) with no
        // diagnostics.
        let id = results[0].document.id().clone();
        let results = analyzer.analyze(()).await.unwrap();
        assert_eq!(results.len(), 1);
        assert!(results[0].document.id().as_ref() != id.as_ref());
        assert_eq!(results[0].document.diagnostics().len(), 0);

        // Analyzing the same document again without changes should keep the
        // same id.
        let id = results[0].document.id().clone();
        let results = analyzer.analyze_document((), uri).await.unwrap();
        assert_eq!(results.len(), 1);
        assert!(results[0].document.id().as_ref() == id.as_ref());
        assert_eq!(results[0].document.diagnostics().len(), 0);
    }

    /// An incremental edit that renames the conflicting workflow should
    /// resolve the diagnostic on re-analysis.
    #[tokio::test]
    async fn it_reanalyzes_a_document_on_incremental_change() {
        let dir = TempDir::new().expect("failed to create temporary directory");
        let path = dir.path().join("foo.wdl");
        // The task and workflow conflict on the name `test`.
        fs::write(
            &path,
            r#"version 1.1

task test {
    command <<<>>>
}

workflow test {
}
"#,
        )
        .expect("failed to create test file");

        let analyzer = Analyzer::default();
        analyzer
            .add_document(path_to_uri(&path).expect("should convert to URI"))
            .await
            .expect("should add document");

        let results = analyzer.analyze(()).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].document.diagnostics().len(), 1);
        assert_eq!(results[0].document.diagnostics()[0].rule(), None);
        assert_eq!(
            results[0].document.diagnostics()[0].severity(),
            Severity::Error
        );
        assert_eq!(
            results[0].document.diagnostics()[0].message(),
            "conflicting workflow name `test`"
        );

        // Replace `test` (line 6, characters 9..13) in `workflow test {` with
        // `something_else` via an incremental edit.
        let uri = path_to_uri(&path).expect("should convert to URI");
        analyzer
            .notify_incremental_change(
                uri.clone(),
                IncrementalChange {
                    version: 2,
                    start: None,
                    edits: vec![SourceEdit {
                        range: SourcePosition::new(6, 9)..SourcePosition::new(6, 13),
                        encoding: SourcePositionEncoding::UTF8,
                        text: "something_else".to_string(),
                    }],
                },
            )
            .unwrap();

        // Re-analysis should produce a new document with no diagnostics.
        let id = results[0].document.id().clone();
        let results = analyzer.analyze_document((), uri).await.unwrap();
        assert_eq!(results.len(), 1);
        assert!(results[0].document.id().as_ref() != id.as_ref());
        assert_eq!(results[0].document.diagnostics().len(), 0);
    }

    /// Removing documents by their parent directory URI should remove every
    /// document under that directory.
    #[tokio::test]
    async fn it_removes_documents() {
        let dir = TempDir::new().expect("failed to create temporary directory");
        let foo = dir.path().join("foo.wdl");
        fs::write(
            &foo,
            r#"version 1.1
workflow test {
}
"#,
        )
        .expect("failed to create test file");

        let bar = dir.path().join("bar.wdl");
        fs::write(
            &bar,
            r#"version 1.1
workflow test {
}
"#,
        )
        .expect("failed to create test file");

        let baz = dir.path().join("baz.wdl");
        fs::write(
            &baz,
            r#"version 1.1
workflow test {
}
"#,
        )
        .expect("failed to create test file");

        let analyzer = Analyzer::default();
        analyzer
            .add_directory(dir.path().to_path_buf())
            .await
            .expect("should add documents");

        let results = analyzer.analyze(()).await.unwrap();
        assert_eq!(results.len(), 3);
        assert!(results[0].document.diagnostics().is_empty());
        assert!(results[1].document.diagnostics().is_empty());
        assert!(results[2].document.diagnostics().is_empty());

        let results = analyzer.analyze(()).await.unwrap();
        assert_eq!(results.len(), 3);

        // Remove by the directory's URI; all three documents fall under it.
        analyzer
            .remove_documents(vec![
                path_to_uri(dir.path()).expect("should convert to URI"),
            ])
            .await
            .unwrap();
        let results = analyzer.analyze(()).await.unwrap();
        assert!(results.is_empty());
    }
}