1use std::collections::HashSet;
4use std::fs;
5use std::panic;
6use std::sync::Arc;
7use std::time::Duration;
8use std::time::Instant;
9
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use anyhow::bail;
14use indexmap::IndexMap;
15use indexmap::IndexSet;
16use line_index::LineIndex;
17use petgraph::Direction;
18use petgraph::algo::has_path_connecting;
19use petgraph::graph::NodeIndex;
20use petgraph::stable_graph::StableDiGraph;
21use petgraph::visit::Bfs;
22use petgraph::visit::EdgeRef;
23use petgraph::visit::Visitable;
24use petgraph::visit::Walker;
25use reqwest::Client;
26use rowan::GreenNode;
27use tokio::runtime::Handle;
28use tracing::debug;
29use tracing::info;
30use url::Url;
31use wdl_ast::AstNode;
32use wdl_ast::AstToken as _;
33use wdl_ast::Diagnostic;
34use wdl_ast::SupportedVersion;
35use wdl_ast::SyntaxNode;
36
37use crate::Config;
38use crate::IncrementalChange;
39use crate::document::Document;
40
41pub type DfsSpace =
43 petgraph::algo::DfsSpace<NodeIndex, <StableDiGraph<DocumentGraphNode, ()> as Visitable>::Map>;
44
45#[derive(Debug, Clone)]
47pub enum ParseState {
48 NotParsed,
50 Error(Arc<anyhow::Error>),
52 Parsed {
54 version: Option<i32>,
60 wdl_version: Option<SupportedVersion>,
66 root: GreenNode,
68 lines: Arc<LineIndex>,
70 diagnostics: Vec<Diagnostic>,
72 },
73}
74
75impl ParseState {
76 pub fn version(&self) -> Option<i32> {
78 match self {
79 ParseState::Parsed { version, .. } => *version,
80 _ => None,
81 }
82 }
83
84 pub fn lines(&self) -> Option<&Arc<LineIndex>> {
86 match self {
87 ParseState::Parsed { lines, .. } => Some(lines),
88 _ => None,
89 }
90 }
91}
92
93#[derive(Debug)]
95pub struct DocumentGraphNode {
96 config: Config,
98 uri: Arc<Url>,
100 change: Option<IncrementalChange>,
104 parse_state: ParseState,
106 document: Option<Document>,
111 analysis_error: Option<Arc<anyhow::Error>>,
113}
114
115impl DocumentGraphNode {
116 pub fn new(config: Config, uri: Arc<Url>) -> Self {
118 Self {
119 config,
120 uri,
121 change: None,
122 parse_state: ParseState::NotParsed,
123 document: None,
124 analysis_error: None,
125 }
126 }
127
128 pub fn uri(&self) -> &Arc<Url> {
130 &self.uri
131 }
132
133 pub fn notify_incremental_change(&mut self, change: IncrementalChange) {
135 debug!("document `{uri}` has incrementally changed", uri = self.uri);
136
137 self.document = None;
139
140 if let Some(IncrementalChange {
142 version: existing_version,
143 start: existing_start,
144 edits: existing_edits,
145 }) = &mut self.change
146 {
147 let IncrementalChange {
148 version,
149 start,
150 edits,
151 } = change;
152 *existing_version = version;
153 if start.is_some() {
154 *existing_start = start;
155 *existing_edits = edits;
156 } else {
157 existing_edits.extend(edits);
158 }
159 } else {
160 self.change = Some(change)
161 }
162 }
163
164 pub fn notify_change(&mut self, discard_pending: bool) {
166 info!("document `{uri}` has changed", uri = self.uri);
167
168 self.document = None;
170 self.analysis_error = None;
171
172 if !matches!(
173 self.parse_state,
174 ParseState::Parsed {
175 version: Some(_),
176 ..
177 }
178 ) || discard_pending
179 {
180 self.parse_state = ParseState::NotParsed;
181 self.change = None;
182 }
183 }
184
185 pub fn parse_state(&self) -> &ParseState {
187 &self.parse_state
188 }
189
190 pub fn parse_completed(&mut self, state: ParseState) {
192 assert!(!matches!(state, ParseState::NotParsed));
193 self.parse_state = state;
194 self.analysis_error = None;
195
196 self.change = None;
198 }
199
200 pub fn document(&self) -> Option<&Document> {
204 self.document.as_ref()
205 }
206
207 pub fn analysis_error(&self) -> Option<&Arc<anyhow::Error>> {
209 self.analysis_error.as_ref()
210 }
211
212 pub fn analysis_completed(&mut self, document: Document) {
214 self.document = Some(document);
215 self.analysis_error = None;
216 }
217
218 pub fn analysis_failed(&mut self, error: Arc<anyhow::Error>) {
220 self.document = None;
221 self.analysis_error = Some(error);
222 }
223
224 pub fn reanalyze(&mut self) {
228 self.analysis_error = None;
229 self.document = None;
230 }
231
232 pub fn root(&self) -> Option<wdl_ast::Document> {
236 if let ParseState::Parsed { root, .. } = &self.parse_state {
237 return Some(
238 wdl_ast::Document::cast(SyntaxNode::new_root(root.clone()))
239 .expect("node should cast"),
240 );
241 }
242
243 None
244 }
245
246 pub fn wdl_version(&self) -> Option<SupportedVersion> {
251 if let ParseState::Parsed {
252 wdl_version: Some(v),
253 ..
254 } = &self.parse_state
255 {
256 Some(*v)
257 } else {
258 None
259 }
260 }
261
262 pub fn needs_parse(&self) -> bool {
264 self.change.is_some() || matches!(self.parse_state, ParseState::NotParsed)
265 }
266
267 pub fn parse(&self, tokio: &Handle, client: &Client) -> Result<ParseState> {
273 if !self.needs_parse() {
274 return Ok(self.parse_state.clone());
275 }
276
277 if let Some(state) = self.incremental_parse() {
279 return Ok(state);
280 }
281
282 self.full_parse(tokio, client)
284 }
285
286 fn incremental_parse(&self) -> Option<ParseState> {
291 match &self.change {
292 None | Some(IncrementalChange { start: Some(_), .. }) => None,
293 Some(IncrementalChange { start: None, .. }) => {
294 None
303 }
304 }
305 }
306
307 fn full_parse(&self, tokio: &Handle, client: &Client) -> Result<ParseState> {
309 let (version, source, lines) = match &self.change {
310 None => {
311 let result = match self.uri.to_file_path() {
313 Ok(path) => fs::read_to_string(path).map_err(Into::into),
314 Err(_) => match self.uri.scheme() {
315 "https" | "http" => Self::download_source(tokio, client, &self.uri),
316 scheme => Err(anyhow!("unsupported URI scheme `{scheme}`")),
317 },
318 };
319
320 match result {
321 Ok(source) => {
322 let lines = Arc::new(LineIndex::new(&source));
323 (None, source, lines)
324 }
325 Err(e) => return Ok(ParseState::Error(e.into())),
326 }
327 }
328 Some(IncrementalChange {
329 version,
330 start,
331 edits,
332 }) => {
333 let (mut source, mut lines) = if let Some(start) = start {
335 let source = start.clone();
336 let lines = Arc::new(LineIndex::new(&source));
337 (source, lines)
338 } else {
339 match &self.parse_state {
341 ParseState::Parsed { root, lines, .. } => (
342 SyntaxNode::new_root(root.clone()).text().to_string(),
343 lines.clone(),
344 ),
345 _ => panic!(
346 "cannot apply edits to a document that was not previously parsed"
347 ),
348 }
349 };
350
351 let mut last_line = !0u32;
354 for edit in edits {
355 let range = edit.range();
356 if last_line <= range.end.line {
357 lines = Arc::new(LineIndex::new(&source));
359 }
360
361 last_line = range.start.line;
362 edit.apply(&mut source, &lines)?;
363 }
364
365 if !edits.is_empty() {
366 lines = Arc::new(LineIndex::new(&source));
368 }
369
370 (Some(*version), source, lines)
371 }
372 };
373
374 let start = Instant::now();
376 let (document, mut diagnostics) = wdl_ast::Document::parse(&source);
377 info!(
378 "parsing of `{uri}` completed in {elapsed:?}",
379 uri = self.uri,
380 elapsed = start.elapsed()
381 );
382
383 let mut wdl_version = None;
387 if let Some(version_token) = document.version_statement().map(|stmt| stmt.version()) {
388 match (
389 version_token.text().parse::<SupportedVersion>(),
390 self.config.fallback_version(),
391 ) {
392 (Ok(version), _) => {
394 wdl_version = Some(version);
395 }
396 (Err(unrecognized), Some(fallback)) => {
398 if let Some(severity) = self.config.diagnostics_config().using_fallback_version
399 {
400 diagnostics.push(
401 Diagnostic::warning(format!(
402 "unsupported WDL version `{unrecognized}`; interpreting document \
403 as version `{fallback}`"
404 ))
405 .with_severity(severity)
406 .with_label(
407 "this version of WDL is not supported",
408 version_token.span(),
409 ),
410 );
411 }
412 wdl_version = Some(fallback);
413 }
414 (Err(unrecognized), None) => {
417 diagnostics.push(
418 Diagnostic::error(format!("unsupported WDL version `{unrecognized}`"))
419 .with_label(
420 "this version of WDL is not supported",
421 version_token.span(),
422 ),
423 );
424 }
425 };
426 }
427
428 Ok(ParseState::Parsed {
429 version,
430 wdl_version,
431 root: document.inner().green().into(),
432 lines,
433 diagnostics,
434 })
435 }
436
437 fn download_source(tokio: &Handle, client: &Client, uri: &Url) -> Result<String> {
442 const TIMEOUT_IN_SECS: u64 = 30;
444
445 info!("downloading source from `{uri}`");
446
447 tokio.block_on(async {
448 let resp = client
449 .get(uri.as_str())
450 .timeout(Duration::from_secs(TIMEOUT_IN_SECS))
451 .send()
452 .await?;
453
454 let code = resp.status();
455 if !code.is_success() {
456 bail!(
457 "server response for `{uri}` was {code} ({message})",
458 code = code.as_u16(),
459 message = code.canonical_reason().unwrap_or("unknown")
460 );
461 }
462
463 resp.text()
464 .await
465 .with_context(|| format!("failed to read response body for `{uri}`"))
466 })
467 }
468}
469
470#[derive(Debug)]
472pub struct DocumentGraph {
473 config: Config,
475 inner: StableDiGraph<DocumentGraphNode, ()>,
480 indexes: IndexMap<Arc<Url>, NodeIndex>,
482 roots: IndexSet<NodeIndex>,
489 cycles: HashSet<(NodeIndex, NodeIndex)>,
499}
500
501impl DocumentGraph {
502 pub fn new(config: Config) -> Self {
504 DocumentGraph {
505 config,
506 inner: StableDiGraph::new(),
507 indexes: IndexMap::new(),
508 roots: IndexSet::new(),
509 cycles: HashSet::new(),
510 }
511 }
512
513 pub fn add_node(&mut self, uri: Url, rooted: bool) -> NodeIndex {
515 let index = match self.indexes.get(&uri) {
516 Some(index) => *index,
517 _ => {
518 debug!("inserting `{uri}` into the document graph");
519 let uri = Arc::new(uri);
520 let index = self
521 .inner
522 .add_node(DocumentGraphNode::new(self.config.clone(), uri.clone()));
523 self.indexes.insert(uri, index);
524 index
525 }
526 };
527
528 if rooted {
529 self.roots.insert(index);
530 }
531
532 index
533 }
534
535 pub fn remove_root(&mut self, uri: &Url) {
543 let base = match uri.to_file_path() {
544 Ok(base) => base,
545 Err(_) => return,
546 };
547
548 let mut removed = Vec::new();
550 for (uri, index) in &self.indexes {
551 let path = match uri.to_file_path() {
552 Ok(path) => path,
553 Err(_) => continue,
554 };
555
556 if path.starts_with(&base) {
557 removed.push(*index);
558 }
559 }
560
561 for index in removed {
562 let node = &mut self.inner[index];
563
564 if !self.roots.swap_remove(&index) {
567 debug!(
568 "document `{uri}` is no longer rooted in the graph",
569 uri = node.uri
570 );
571 }
572
573 node.parse_state = ParseState::NotParsed;
574 node.document = None;
575 node.change = None;
576
577 self.bfs_mut(index, |graph, dependent: NodeIndex| {
579 let node = graph.get_mut(dependent);
580 debug!("document `{uri}` needs to be reanalyzed", uri = node.uri);
581 node.document = None;
582 });
583 }
584 }
585
586 pub fn is_rooted(&self, index: NodeIndex) -> bool {
588 self.roots.contains(&index)
589 }
590
591 pub fn roots(&self) -> &IndexSet<NodeIndex> {
593 &self.roots
594 }
595
596 pub fn include_result(&self, index: NodeIndex) -> bool {
599 let node = self.get(index);
601 node.document().is_some()
602 && (self.roots.contains(&index)
603 || matches!(node.parse_state(), ParseState::Parsed { .. }))
604 }
605
606 pub fn get(&self, index: NodeIndex) -> &DocumentGraphNode {
608 &self.inner[index]
609 }
610
611 pub fn get_mut(&mut self, index: NodeIndex) -> &mut DocumentGraphNode {
613 &mut self.inner[index]
614 }
615
616 pub fn get_index(&self, uri: &Url) -> Option<NodeIndex> {
620 self.indexes.get(uri).copied()
621 }
622
623 pub fn bfs_mut(&mut self, index: NodeIndex, mut cb: impl FnMut(&mut Self, NodeIndex)) {
628 let mut bfs = Bfs::new(&self.inner, index);
629 while let Some(node) = bfs.next(&self.inner) {
630 cb(self, node);
631 }
632 }
633
634 pub fn dependencies(&self, index: NodeIndex) -> impl Iterator<Item = NodeIndex> + '_ {
636 self.inner
637 .edges_directed(index, Direction::Incoming)
638 .map(|e| e.source())
639 }
640
641 pub fn remove_dependency_edges(&mut self, index: NodeIndex) {
643 self.inner.retain_edges(|g, e| {
646 let (_, target) = g.edge_endpoints(e).expect("edge should be valid");
647 target != index
648 });
649 }
650
651 pub fn add_dependency_edge(&mut self, from: NodeIndex, to: NodeIndex, space: &mut DfsSpace) {
655 if has_path_connecting(&self.inner, from, to, Some(space)) {
658 debug!(
660 "an import cycle was detected between `{from}` and `{to}`",
661 from = self.inner[from].uri,
662 to = self.inner[to].uri
663 );
664 self.cycles.insert((from, to));
665 } else if !self.inner.contains_edge(to, from) {
666 debug!(
667 "adding dependency edge from `{from}` to `{to}`",
668 from = self.inner[from].uri,
669 to = self.inner[to].uri
670 );
671
672 self.inner.add_edge(to, from, ());
675 }
676 }
677
678 pub fn contains_cycle(&self, from: NodeIndex, to: NodeIndex) -> bool {
680 self.cycles.contains(&(from, to))
681 }
682
683 pub fn subgraph(&self, nodes: &IndexSet<NodeIndex>) -> StableDiGraph<NodeIndex, ()> {
685 self.inner
686 .filter_map(|i, _| nodes.contains(&i).then_some(i), |_, _| Some(()))
687 }
688
689 pub fn gc(&mut self) {
694 let mut collected = HashSet::new();
695 for node in self.inner.node_indices() {
696 if self.roots.contains(&node) {
697 continue;
698 }
699
700 if self
701 .inner
702 .edges_directed(node, Direction::Outgoing)
703 .next()
704 .is_none()
705 {
706 debug!(
707 "removing document `{uri}` from the graph",
708 uri = self.inner[node].uri
709 );
710 collected.insert(node);
711 }
712 }
713
714 if collected.is_empty() {
715 return;
716 }
717
718 for node in &collected {
719 self.inner.remove_node(*node);
720 }
721
722 self.indexes.retain(|_, index| !collected.contains(index));
723
724 self.cycles
725 .retain(|(from, to)| !collected.contains(from) && !collected.contains(to));
726 }
727
728 pub fn transitive_dependents(
730 &self,
731 index: petgraph::graph::NodeIndex,
732 ) -> impl Iterator<Item = NodeIndex> {
733 Bfs::new(&self.inner, index).iter(&self.inner)
734 }
735
736 pub(crate) fn inner(&self) -> &StableDiGraph<DocumentGraphNode, ()> {
738 &self.inner
739 }
740}