use std::borrow::Cow;
use std::path::Path;
use std::str::FromStr;
use lasso::Spur;
use ropey::Rope;
use tower_lsp_server::ls_types::*;
use tracing::{debug, instrument, trace, warn};
use tree_sitter::{Node, Parser, QueryCapture, QueryMatch};
use ts_macros::query;
use crate::prelude::*;
use crate::analyze::{Type, type_cache};
use crate::index::{_G, _I, _R, PathSymbol, index_models};
use crate::model::{ModelName, ModelType};
use crate::xml::determine_csv_xmlid_subgroup;
use crate::{backend::Backend, backend::Text};
use std::collections::HashMap;
mod completions;
mod diagnostics;
#[cfg(test)]
mod tests;
#[rustfmt::skip]
query! {
PyCompletions(Request, XmlId, Mapped, MappedTarget, Depends, ReadFn, Model, Prop, ForXmlId, Scope, FieldDescriptor, FieldType, HasGroups);
(call [
(attribute [
(identifier) @_env
(attribute (_) (identifier) @_env)] (identifier) @_ref)
(attribute
(identifier) @REQUEST (identifier) @_render)
(attribute
(_) (identifier) @FOR_XML_ID)
(attribute
(_) (identifier) @HAS_GROUPS) ]
(argument_list . (string) @XML_ID)
(#eq? @_env "env")
(#eq? @_ref "ref")
(#eq? @REQUEST "request")
(#eq? @_render "render")
(#eq? @FOR_XML_ID "_for_xml_id")
(#match? @HAS_GROUPS "^(user_has_groups|has_group)$")
)
(subscript [
(identifier) @_env
(attribute (_) (identifier) @_env)]
(string) @MODEL
(#eq? @_env "env"))
((class_definition
(block
(expression_statement
(assignment
(identifier) @PROP [
(string) @MODEL
(list ((string) @MODEL ","?)*)
(call
(attribute
(identifier) @_fields (identifier) @FIELD_TYPE (#eq? @_fields "fields"))
(argument_list
. [
((comment)+ (string) @MODEL)
(string) @MODEL ]?
((keyword_argument (identifier) @FIELD_DESCRIPTOR (_)) ","?)*)) ])))))
(call [
(attribute
(_) @MAPPED_TARGET (identifier) @_mapper)
(attribute
(identifier) @_api (identifier) @DEPENDS)]
(argument_list (string) @MAPPED)
(#match? @_mapper "^(mapp|filter|sort|group)ed$")
(#eq? @_api "api")
(#match? @DEPENDS "^(depends|constrains|onchange)$"))
((call
(attribute
(_) @MAPPED_TARGET (identifier) @_search)
(argument_list [
(list [
(tuple . (string) @MAPPED)
(parenthesized_expression (string) @MAPPED)])
(keyword_argument
(identifier) @_domain
(list [
(tuple . (string) @MAPPED)
(parenthesized_expression (string) @MAPPED)]))]))
(#eq? @_domain "domain")
(#match? @_search "^(search(_(read|count))?|_?read_group|filtered_domain|_where_calc)$"))
((call
(attribute
(_) @MAPPED_TARGET (identifier) @READ_FN)
(argument_list [
(list (string) @MAPPED)
(keyword_argument
(identifier) @_domain
(list (string) @MAPPED)) ]))
(#match? @_domain "^(groupby|aggregates)$")
(#match? @READ_FN "^(_?read(_group)?|flush_model)$"))
((call
(attribute
(_) @MAPPED_TARGET (identifier) @DEPENDS)
(argument_list . [
(set (string) @MAPPED)
(dictionary [
(pair key: (string) @MAPPED)
(ERROR (string) @MAPPED)
(ERROR) @MAPPED ])
(_ [
(set (string) @MAPPED)
(dictionary [
(pair key: (string) @MAPPED)
(ERROR (string) @MAPPED) ]) ]) ]))
(#match? @DEPENDS "^(create|write|copy)$"))
((class_definition
(block [
(function_definition) @SCOPE
(decorated_definition
(decorator
(call
(attribute (identifier) @_api (identifier) @_depends)
(argument_list ((string) @MAPPED ","?)*)))
(function_definition) @SCOPE) ]))
(#eq? @_api "api")
(#eq? @_depends "depends"))
(class_definition
(block
(decorated_definition
(decorator (_) @_)
(function_definition) @SCOPE)*)
(#not-match? @_ "^api.depends"))
}
#[rustfmt::skip]
query! {
PyImports(ImportModule, ImportName, ImportAlias);
(import_from_statement
module_name: (dotted_name) @IMPORT_MODULE
name: (dotted_name) @IMPORT_NAME)
(import_from_statement
module_name: (dotted_name) @IMPORT_MODULE
name: (aliased_import
name: (dotted_name) @IMPORT_NAME
alias: (identifier) @IMPORT_ALIAS))
(import_statement
name: (dotted_name) @IMPORT_NAME)
(import_statement
name: (aliased_import
name: (dotted_name) @IMPORT_NAME
alias: (identifier) @IMPORT_ALIAS))
}
#[derive(derive_more::FromStr, Clone, Copy)]
#[from_str(rename_all = "snake_case")]
enum FieldDescriptors {
ComodelName,
Domain,
Compute,
Inverse,
Search,
InverseName,
Related,
Groups,
}
pub(crate) fn top_level_stmt(module: Node, offset: usize) -> Option<Node> {
module
.named_children(&mut module.walk())
.find(|child| child.byte_range().contains_end(offset))
}
fn find_class_definition<'a>(
node: tree_sitter::Node<'a>,
contents: &str,
class_name: &str,
) -> Option<tree_sitter::Node<'a>> {
use crate::utils::PreTravel;
PreTravel::new(node)
.find(|node| {
node.kind() == "class_definition"
&& node
.child_by_field_name("name")
.map(|name_node| class_name == &contents[name_node.byte_range()])
.unwrap_or(false)
})
.and_then(|node| node.child_by_field_name("name"))
}
#[derive(Debug)]
struct Mapped<'text> {
needle: &'text str,
model: &'text str,
single_field: bool,
range: ByteRange,
}
#[derive(Debug, Clone)]
struct ImportInfo {
module_path: String,
imported_name: String,
alias: Option<String>,
}
type ImportMap = HashMap<String, ImportInfo>;
impl Backend {
fn resolve_import_location(&self, imports: &ImportMap, identifier: &str) -> anyhow::Result<Option<Location>> {
let Some(import_info) = imports.get(identifier) else {
return Ok(None);
};
if let Some(alias) = &import_info.alias {
debug!(
"Found aliased import '{}' -> '{}' from module '{}'",
alias, import_info.imported_name, import_info.module_path
);
} else {
debug!(
"Found direct import '{}' from module '{}'",
import_info.imported_name, import_info.module_path
);
}
let Some(file_path) = self.index.resolve_py_module(&import_info.module_path) else {
debug!("Failed to resolve module path: {}", import_info.module_path);
return Ok(None);
};
debug!("Resolved file path: {}", file_path.display());
let target_contents = ok!(
test_utils::fs::read_to_string(&file_path),
"Failed to read target file {}",
file_path.display(),
);
let class_name = &import_info.imported_name;
if let Some(alias) = &import_info.alias {
debug!(
"Looking for original class '{}' (aliased as '{}') in target file",
class_name, alias
);
} else {
debug!("Looking for class '{}' in target file", class_name);
}
let mut target_parser = Parser::new();
target_parser
.set_language(&tree_sitter_python::LANGUAGE.into())
.map_err(|e| anyhow::anyhow!("Failed to set parser language: {}", e))?;
let Some(target_ast) = target_parser.parse(&target_contents, None) else {
debug!("Failed to parse target file with tree-sitter");
return Ok(Some(Location {
uri: Uri::from_file_path(file_path).unwrap(),
range: Range::new(Position::new(0, 0), Position::new(0, 0)),
}));
};
if let Some(class_node) = find_class_definition(target_ast.root_node(), &target_contents, class_name) {
let range = class_node.range();
if let Some(alias) = &import_info.alias {
debug!(
"Found class '{}' (aliased as '{}') at line {}, col {}",
class_name, alias, range.start_point.row, range.start_point.column
);
} else {
debug!(
"Found class '{}' at line {}, col {}",
class_name, range.start_point.row, range.start_point.column
);
}
return Ok(Some(Location {
uri: Uri::from_file_path(file_path).unwrap(),
range: span_conv(range),
}));
}
if let Some(alias) = &import_info.alias {
debug!(
"Class '{}' (aliased as '{}') not found in target file using tree-sitter",
class_name, alias
);
} else {
debug!("Class '{}' not found in target file using tree-sitter", class_name);
}
Ok(Some(Location {
uri: Uri::from_file_path(file_path).unwrap(),
range: Range::new(Position::new(0, 0), Position::new(0, 0)),
}))
}
#[tracing::instrument(skip_all, ret, fields(uri))]
pub fn on_change_python(
&self,
text: &Text,
uri: &Uri,
rope: RopeSlice<'_>,
old_rope: Option<Rope>,
) -> anyhow::Result<()> {
let mut parser = Parser::new();
parser
.set_language(&tree_sitter_python::LANGUAGE.into())
.expect("bug: failed to init python parser");
self.update_ast(text, uri, rope, old_rope, parser)
}
fn parse_imports(&self, contents: &str) -> anyhow::Result<ImportMap> {
let mut parser = Parser::new();
parser.set_language(&tree_sitter_python::LANGUAGE.into())?;
let ast = parser
.parse(contents, None)
.ok_or_else(|| errloc!("Failed to parse Python AST"))?;
let query = PyImports::query();
let mut cursor = tree_sitter::QueryCursor::new();
let mut imports = ImportMap::new();
debug!("Parsing imports from {} bytes", contents.len());
let mut matches = cursor.matches(query, ast.root_node(), contents.as_bytes());
while let Some(match_) = matches.next() {
let mut module_path = None;
let mut import_name = None;
let mut alias = None;
debug!("Found import match with {} captures", match_.captures.len());
for capture in match_.captures {
let capture_text = &contents[capture.node.byte_range()];
debug!("Capture {}: = '{}'", capture.index, capture_text);
match PyImports::from(capture.index) {
Some(PyImports::ImportModule) => {
module_path = Some(capture_text.to_string());
}
Some(PyImports::ImportName) => {
import_name = Some(capture_text.to_string());
}
Some(PyImports::ImportAlias) => {
alias = Some(capture_text.to_string());
}
_ => {}
}
}
if let Some(name) = import_name {
let full_module_path = if let Some(module) = module_path {
module } else {
name.clone() };
let key = alias.as_ref().unwrap_or(&name).clone();
debug!("Adding import: {} -> {} (from module {})", key, name, full_module_path);
imports.insert(
key,
ImportInfo {
module_path: full_module_path,
imported_name: name,
alias,
},
);
}
}
debug!("Final imports map: {:?}", imports);
Ok(imports)
}
pub fn update_models(&self, text: Text, path: &Path, root: Spur, rope: Rope) -> anyhow::Result<()> {
let text = match text {
Text::Full(text) => Cow::from(text),
Text::Delta(_) => Cow::from(rope.slice(..)),
};
let models = index_models(text.as_bytes())?;
let path = PathSymbol::strip_root(root, path);
self.index.models.append(path, true, &models);
for model in models {
match model.type_ {
ModelType::Base { name, ancestors } => {
let model_key = _G(&name).unwrap();
let mut entry = self
.index
.models
.try_get_mut(&model_key)
.expect(format_loc!("deadlock"))
.unwrap();
entry
.ancestors
.extend(ancestors.into_iter().map(|sym| ModelName::from(_I(&sym))));
drop(entry);
self.index.models.populate_properties(model_key.into(), &[path]);
}
ModelType::Inherit(inherits) => {
let Some(model) = inherits.first() else { continue };
let model_key = _G(model).unwrap();
self.index.models.populate_properties(model_key.into(), &[path]);
}
}
}
Ok(())
}
pub async fn did_save_python(&self, uri: Uri, root: Spur) -> anyhow::Result<()> {
let path = uri.to_file_path().unwrap();
let zone;
_ = {
let mut document = self
.document_map
.get_mut(uri.path().as_str())
.ok_or_else(|| errloc!("(did_save) did not build document"))?;
zone = document.damage_zone.take();
let rope = document.rope.clone();
let text = Cow::from(&document.rope).into_owned();
self.update_models(Text::Full(text), &path, root, rope)
}
.inspect_err(|err| warn!("{err:?}"));
if zone.is_some() {
debug!("diagnostics");
{
let mut document = self.document_map.get_mut(uri.path().as_str()).unwrap();
let rope = document.rope.clone();
let file_path = uri.to_file_path().unwrap();
self.diagnose_python(
file_path.to_str().unwrap(),
rope.slice(..),
zone,
&mut document.diagnostics_cache,
);
let diags = document.diagnostics_cache.clone();
self.client.publish_diagnostics(uri, diags, None)
}
.await;
}
Ok(())
}
#[instrument(level = "trace", skip_all, ret, fields(range_content = &contents[range.clone()]))]
fn gather_mapped<'text>(
&self,
root: Node,
match_: &tree_sitter::QueryMatch,
offset: Option<usize>,
mut range: core::ops::Range<usize>,
this_model: Option<&'text str>,
contents: &'text str,
for_replacing: bool,
single_field_override: Option<bool>,
) -> Option<Mapped<'text>> {
let mut needle = if for_replacing {
range = range.shrink(1);
let offset = offset.unwrap_or(range.end);
&contents[range.start..offset]
} else {
let slice = &contents[range.clone().shrink(1)];
let relative_start = range.start + 1;
let offset = offset
.unwrap_or((range.end - 1).max(relative_start + 1))
.max(relative_start)
.min(relative_start + slice.len());
let start = offset - relative_start;
let slice_till_end = slice.get(start..).unwrap_or("");
let limit = slice_till_end.find('.').unwrap_or(slice_till_end.len());
range = relative_start..offset + limit;
&contents[range.clone()]
};
if needle == "|" || needle == "&" {
return None;
}
tracing::trace!("(gather_mapped) {} matches={match_:?}", &contents[range.clone()]);
let model;
if let Some(local_model) = match_.nodes_for_capture_index(PyCompletions::MappedTarget as _).next() {
let model_ = (self.index).model_of_range(root, local_model.byte_range().map_unit(ByteOffset), contents)?;
model = _R(model_);
} else if let Some(this_model) = &this_model {
model = this_model
} else {
return None;
}
let mut single_field = false;
if let Some(depends) = match_.nodes_for_capture_index(PyCompletions::Depends as _).next() {
single_field = matches!(
&contents[depends.byte_range()],
"write" | "create" | "constrains" | "onchange"
);
} else if let Some(read_fn) = match_.nodes_for_capture_index(PyCompletions::ReadFn as _).next() {
single_field = true;
if contents[read_fn.byte_range()].ends_with("read_group") {
needle = match needle.split_once(":") {
None => needle,
Some((field, _)) => {
range = range.start..range.start + field.len();
field
}
}
}
} else if let Some(override_) = single_field_override {
single_field = override_;
}
Some(Mapped {
needle,
model,
single_field,
range: range.map_unit(ByteOffset),
})
}
pub fn python_jump_def(
&self,
params: GotoDefinitionParams,
rope: RopeSlice<'_>,
) -> anyhow::Result<Option<Location>> {
let uri = ¶ms.text_document_position_params.text_document.uri;
let file_path = uri.to_file_path().unwrap();
let file_path_str = file_path.to_str().unwrap();
let ast = self
.ast_map
.get(file_path_str)
.ok_or_else(|| errloc!("Did not build AST for {}", file_path_str))?;
let ByteOffset(offset) = rope_conv(params.text_document_position_params.position, rope);
let contents = Cow::from(rope);
let root = some!(top_level_stmt(ast.root_node(), offset));
let imports = self.parse_imports(&contents).unwrap_or_default();
debug!("Parsed imports: {:?}", imports);
if let Some(cursor_node) = ast.root_node().descendant_for_byte_range(offset, offset)
&& cursor_node.kind() == "identifier"
{
let identifier = &contents[cursor_node.byte_range()];
debug!("Checking identifier '{}' at offset {}", identifier, offset);
if let Some(location) = self.resolve_import_location(&imports, identifier )? {
return Ok(Some(location));
}
}
let query = PyCompletions::query();
let mut cursor = tree_sitter::QueryCursor::new();
let mut this_model = ThisModel::default();
let mut matches = cursor.matches(query, ast.root_node(), contents.as_bytes());
while let Some(match_) = matches.next() {
for capture in match_.captures {
let range = capture.node.byte_range();
match PyCompletions::from(capture.index) {
Some(PyCompletions::XmlId) if range.contains(&offset) => {
let range = range.shrink(1);
let slice = Cow::from(ok!(rope.try_slice(range.clone())));
let mut slice = slice.as_ref();
if match_
.nodes_for_capture_index(PyCompletions::HasGroups as _)
.next()
.is_some()
{
let mut ref_ = None;
determine_csv_xmlid_subgroup(&mut ref_, (slice, range.clone()), offset);
(slice, _) = some!(ref_);
}
return self
.index
.jump_def_xml_id(slice, ¶ms.text_document_position_params.text_document.uri);
}
Some(PyCompletions::Model) => {
let range = capture.node.byte_range();
let is_meta = match_
.nodes_for_capture_index(PyCompletions::Prop as _)
.next()
.map(|prop| matches!(&contents[prop.byte_range()], "_name" | "_inherit"))
.unwrap_or(true);
if range.contains(&offset) {
let range = range.shrink(1);
let slice = ok!(rope.try_slice(range.clone()));
let slice = Cow::from(slice);
return self.index.jump_def_model(&slice);
} else if range.end < offset && is_meta
{
this_model.tag_model(capture.node, match_, root.byte_range(), &contents);
}
}
Some(PyCompletions::Mapped) => {
if range.contains_end(offset)
&& let Some(mapped) = self.gather_mapped(
root,
match_,
Some(offset),
range.clone(),
this_model.inner,
&contents,
false,
None,
) {
let mut needle = mapped.needle;
let mut model = _I(mapped.model);
if !mapped.single_field {
some!(self.index.models.resolve_mapped(&mut model, &mut needle, None).ok());
}
let model = _R(model);
return self.index.jump_def_property_name(needle, model);
} else if let Some(cmdlist) = python_next_named_sibling(capture.node)
&& Backend::is_commandlist(cmdlist, offset)
{
let (needle, _, model) = some!(self.gather_commandlist(
cmdlist,
root,
match_,
offset,
range,
this_model.inner,
&contents,
false,
));
return self.index.jump_def_property_name(needle, _R(model));
}
}
Some(PyCompletions::FieldDescriptor) => {
use FieldDescriptors as FD;
let Some(desc_value) = python_next_named_sibling(capture.node) else {
continue;
};
if !desc_value.byte_range().contains_end(offset) {
continue;
}
match FD::from_str(&contents[range]) {
Ok(FD::ComodelName) => {
let range = desc_value.byte_range().shrink(1);
let slice = ok!(rope.try_slice(range.clone()));
let slice = Cow::from(slice);
return self.index.jump_def_model(&slice);
}
Ok(
descriptor @ (FD::Compute | FD::Search | FD::Inverse | FD::Related | FD::InverseName),
) => {
let single_field = !matches!(descriptor, FD::Related);
let mapped_model = if matches!(descriptor, FD::InverseName) {
extract_comodel_name(match_.captures, &contents)
.map(|comodel_name| &contents[comodel_name.byte_range().shrink(1)])
} else {
this_model.inner
};
let Some(mapped) = self.gather_mapped(
root,
match_,
Some(offset),
desc_value.byte_range(),
mapped_model,
&contents,
false,
Some(single_field),
) else {
break;
};
let mut needle = mapped.needle;
let mut model = _I(mapped.model);
if !mapped.single_field {
some!(self.index.models.resolve_mapped(&mut model, &mut needle, None).ok());
}
let model = _R(model);
return self.index.jump_def_property_name(needle, model);
}
Ok(FD::Groups) => {
let range = desc_value.byte_range().shrink(1);
let value = Cow::from(ok!(rope.try_slice(range.clone())));
let mut ref_ = None;
determine_csv_xmlid_subgroup(&mut ref_, (&value, range), offset);
let (needle, _) = some!(ref_);
return self.index.jump_def_xml_id(needle, uri);
}
Ok(FD::Domain) | Err(_) => {}
}
return Ok(None);
}
Some(PyCompletions::Request)
| Some(PyCompletions::ForXmlId)
| Some(PyCompletions::HasGroups)
| Some(PyCompletions::XmlId)
| Some(PyCompletions::MappedTarget)
| Some(PyCompletions::Depends)
| Some(PyCompletions::Prop)
| Some(PyCompletions::ReadFn)
| Some(PyCompletions::Scope)
| Some(PyCompletions::FieldType)
| None => {}
}
}
}
let (model, prop, _) = some!(self.attribute_at_offset(offset, root, &contents));
self.index.jump_def_property_name(prop, model)
}
fn attribute_at_offset<'out>(
&'out self,
offset: usize,
root: Node<'out>,
contents: &'out str,
) -> Option<(&'out str, &'out str, core::ops::Range<usize>)> {
let (lhs, field, range) = Self::attribute_node_at_offset(offset, root, contents)?;
let model = (self.index).model_of_range(root, lhs.byte_range().map_unit(ByteOffset), contents)?;
Some((_R(model), field, range))
}
#[instrument(level = "trace", skip_all, ret)]
pub fn attribute_node_at_offset<'out>(
mut offset: usize,
root: Node<'out>,
contents: &'out str,
) -> Option<(Node<'out>, &'out str, core::ops::Range<usize>)> {
if contents.is_empty() {
return None;
}
offset = offset.clamp(0, contents.len() - 1);
let mut cursor_node = root.descendant_for_byte_range(offset, offset)?;
let mut real_offset = None;
if cursor_node.is_named() && !matches!(cursor_node.kind(), "attribute" | "identifier") {
real_offset = Some(offset);
offset = offset.saturating_sub(1);
cursor_node = root.descendant_for_byte_range(offset, offset)?;
}
trace!(
"(attribute_node_to_offset) {} cursor={}\n sexp={}",
&contents[cursor_node.byte_range()],
contents.as_bytes()[offset] as char,
cursor_node.to_sexp(),
);
let lhs;
let rhs;
if !cursor_node.is_named() {
let idx = contents[..=offset].bytes().rposition(|c| c == b'.')?;
let ident = contents[..=idx].bytes().rposition(|c| c.is_ascii_alphanumeric())?;
lhs = root.descendant_for_byte_range(ident, ident)?;
rhs = python_next_named_sibling(lhs).and_then(|attr| match attr.kind() {
"identifier" => Some(attr),
"attribute" => attr.child_by_field_name("attribute"),
_ => None,
});
} else if cursor_node.kind() == "attribute" {
lhs = cursor_node.child_by_field_name("object")?;
rhs = cursor_node.child_by_field_name("attribute");
} else {
match cursor_node.parent() {
Some(parent) if parent.kind() == "attribute" => {
lhs = parent.child_by_field_name("object")?;
rhs = Some(cursor_node);
}
Some(parent) if parent.kind() == "ERROR" => {
lhs = cursor_node;
rhs = None;
}
_ => return None,
}
}
trace!(
"(attribute_node_to_offset) lhs={} rhs={:?}",
&contents[lhs.byte_range()],
rhs.as_ref().map(|rhs| &contents[rhs.byte_range()]),
);
if lhs == cursor_node {
return None;
}
let Some(rhs) = rhs else {
let offset = real_offset.unwrap_or(offset);
return Some((lhs, "", offset..offset));
};
let (field, range) = if rhs.range().start_point.row != lhs.range().end_point.row {
let offset = real_offset.unwrap_or(offset);
("", offset..offset)
} else {
let range = rhs.byte_range();
(&contents[range.clone()], range)
};
Some((lhs, field, range))
}
pub fn python_references(
&self,
params: ReferenceParams,
rope: RopeSlice<'_>,
) -> anyhow::Result<Option<Vec<Location>>> {
let ByteOffset(offset) = rope_conv(params.text_document_position.position, rope);
let uri = ¶ms.text_document_position.text_document.uri;
let file_path = uri.to_file_path().unwrap();
let file_path_str = file_path.to_str().unwrap();
let ast = self
.ast_map
.get(file_path_str)
.ok_or_else(|| errloc!("Did not build AST for {}", file_path_str))?;
let root = some!(top_level_stmt(ast.root_node(), offset));
let query = PyCompletions::query();
let contents = Cow::from(rope);
let mut cursor = tree_sitter::QueryCursor::new();
let path = some!(params.text_document_position.text_document.uri.to_file_path());
let current_module = self.index.find_module_of(&path);
let mut this_model = ThisModel::default();
let mut matches = cursor.matches(query, ast.root_node(), contents.as_bytes());
while let Some(match_) = matches.next() {
for capture in match_.captures {
let range = capture.node.byte_range();
match PyCompletions::from(capture.index) {
Some(PyCompletions::XmlId) if range.contains(&offset) => {
let range = range.shrink(1);
let slice = Cow::from(ok!(rope.try_slice(range.clone())));
let mut slice = slice.as_ref();
if match_
.nodes_for_capture_index(PyCompletions::HasGroups as _)
.next()
.is_some()
{
let mut ref_ = None;
determine_csv_xmlid_subgroup(&mut ref_, (slice, range.clone()), offset);
(slice, _) = some!(ref_);
}
return self.record_references(&path, slice, current_module);
}
Some(PyCompletions::Model) => {
let range = capture.node.byte_range();
let is_meta = match_
.nodes_for_capture_index(PyCompletions::Prop as _)
.next()
.map(|prop| matches!(&contents[prop.byte_range()], "_name" | "_inherit"))
.unwrap_or(true);
if is_meta && range.contains(&offset) {
let range = range.shrink(1);
let slice = ok!(rope.try_slice(range.clone()));
let slice = Cow::from(slice);
let slice = some!(_G(slice));
return self.model_references(&path, &slice.into());
} else if range.end < offset
&& match_
.nodes_for_capture_index(PyCompletions::FieldType as _)
.next()
.is_none()
{
this_model.tag_model(capture.node, match_, root.byte_range(), &contents);
}
}
Some(PyCompletions::FieldDescriptor) => {
use FieldDescriptors as FD;
let Some(desc_value) = python_next_named_sibling(capture.node) else {
continue;
};
if !desc_value.byte_range().contains_end(offset) {
continue;
};
match FD::from_str(&contents[range]) {
Ok(FD::ComodelName) => {
let range = desc_value.byte_range().shrink(1);
let slice = ok!(rope.try_slice(range.clone()));
let slice = Cow::from(slice);
let slice = some!(_G(slice));
return self.model_references(&path, &slice.into());
}
Ok(FD::Compute | FD::Search | FD::Inverse) => {
let range = desc_value.byte_range().shrink(1);
let model = some!(this_model.inner.as_ref());
let prop = &contents[range];
return self.index.method_references(prop, model);
}
Ok(FD::InverseName) => return Ok(None),
Ok(FD::Domain | FD::Related | FD::Groups) | Err(_) => {}
}
return Ok(None);
}
Some(PyCompletions::Request)
| Some(PyCompletions::XmlId)
| Some(PyCompletions::ForXmlId)
| Some(PyCompletions::HasGroups)
| Some(PyCompletions::Mapped)
| Some(PyCompletions::MappedTarget)
| Some(PyCompletions::Depends)
| Some(PyCompletions::Prop)
| Some(PyCompletions::ReadFn)
| Some(PyCompletions::Scope)
| Some(PyCompletions::FieldType)
| None => {}
}
}
}
let (model, prop, _) = some!(self.attribute_at_offset(offset, root, &contents));
self.index.method_references(prop, model)
}
pub fn python_hover(&self, params: HoverParams, rope: RopeSlice<'_>) -> anyhow::Result<Option<Hover>> {
let uri = ¶ms.text_document_position_params.text_document.uri;
let file_path = uri.to_file_path().unwrap();
let file_path_str = file_path.to_str().unwrap();
let ast = self
.ast_map
.get(file_path_str)
.ok_or_else(|| errloc!("Did not build AST for {}", file_path_str))?;
let ByteOffset(offset) = rope_conv(params.text_document_position_params.position, rope);
let contents = Cow::from(rope);
let root = some!(top_level_stmt(ast.root_node(), offset));
let query = PyCompletions::query();
let mut cursor = tree_sitter::QueryCursor::new();
let mut this_model = ThisModel::default();
let mut matches = cursor.matches(query, ast.root_node(), contents.as_bytes());
while let Some(match_) = matches.next() {
for capture in match_.captures {
let range = capture.node.byte_range();
match PyCompletions::from(capture.index) {
Some(PyCompletions::Model) => {
if range.contains_end(offset) {
let range = range.shrink(1);
let lsp_range = span_conv(capture.node.range());
let slice = ok!(rope.try_slice(range.clone()));
let slice = Cow::from(slice);
return self.index.hover_model(&slice, Some(lsp_range), false, None);
}
if range.end < offset
&& match_
.nodes_for_capture_index(PyCompletions::Prop as _)
.next()
.is_some()
{
this_model.tag_model(capture.node, match_, root.byte_range(), &contents);
}
}
Some(PyCompletions::Mapped) => {
if range.contains(&offset) {
let mapped = some!(self.gather_mapped(
root,
match_,
Some(offset),
range.clone(),
this_model.inner,
&contents,
false,
None,
));
let mut needle = mapped.needle;
let mut model = _I(mapped.model);
let mut range = mapped.range;
if !mapped.single_field {
some!(
self.index
.models
.resolve_mapped(&mut model, &mut needle, Some(&mut range))
.ok()
);
}
let model = _R(model);
return (self.index).hover_property_name(needle, model, Some(rope_conv(range, rope)));
} else if let Some(cmdlist) = python_next_named_sibling(capture.node)
&& Backend::is_commandlist(cmdlist, offset)
{
let (needle, range, model) = some!(self.gather_commandlist(
cmdlist,
root,
match_,
offset,
range,
this_model.inner,
&contents,
false,
));
let range = Some(rope_conv(range, rope));
return self.index.hover_property_name(needle, _R(model), range);
}
}
Some(PyCompletions::XmlId) if range.contains_end(offset) => {
let range = range.shrink(1);
let slice = Cow::from(ok!(rope.try_slice(range.clone())));
let mut slice = slice.as_ref();
if match_
.nodes_for_capture_index(PyCompletions::HasGroups as _)
.next()
.is_some()
{
let mut ref_ = None;
determine_csv_xmlid_subgroup(&mut ref_, (slice, range.clone()), offset);
if let Some((needle, _)) = ref_ {
slice = needle;
}
}
return (self.index).hover_record(slice, Some(rope_conv(range.map_unit(ByteOffset), rope)));
}
Some(PyCompletions::Prop) if range.contains(&offset) => {
let model = some!(this_model.inner);
let name = &contents[range];
let range = span_conv(capture.node.range());
return self.index.hover_property_name(name, model, Some(range));
}
Some(PyCompletions::FieldDescriptor) => {
use FieldDescriptors as FD;
let Some(desc_value) = python_next_named_sibling(capture.node) else {
continue;
};
if !desc_value.byte_range().contains_end(offset) {
continue;
}
match FD::from_str(&contents[range]) {
Ok(FD::ComodelName) => {
let range = desc_value.byte_range().shrink(1);
let lsp_range = span_conv(desc_value.range());
let slice = ok!(rope.try_slice(range.clone()));
let slice = Cow::from(slice);
return self.index.hover_model(&slice, Some(lsp_range), false, None);
}
Ok(
descriptor @ (FD::Compute | FD::Search | FD::Inverse | FD::Related | FD::InverseName),
) => {
let single_field = !matches!(descriptor, FD::Related);
let mapped_model = if matches!(descriptor, FD::InverseName) {
extract_comodel_name(match_.captures, &contents)
.map(|comodel_name| &contents[comodel_name.byte_range().shrink(1)])
} else {
this_model.inner
};
let mapped = some!(self.gather_mapped(
root,
match_,
Some(offset),
desc_value.byte_range(),
mapped_model,
&contents,
false,
Some(single_field)
));
let mut needle = mapped.needle;
let mut model = _I(mapped.model);
let mut range = mapped.range;
if !mapped.single_field {
some!(
self.index
.models
.resolve_mapped(&mut model, &mut needle, Some(&mut range))
.ok()
);
}
let model = _R(model);
return (self.index).hover_property_name(needle, model, Some(rope_conv(range, rope)));
}
Ok(FD::Groups) => {
let range = desc_value.byte_range().shrink(1);
let value = Cow::from(ok!(rope.try_slice(range.clone())));
let mut ref_ = None;
determine_csv_xmlid_subgroup(&mut ref_, (&value, range), offset);
let (needle, byte_range) = some!(ref_);
return self
.index
.hover_record(needle, Some(rope_conv(byte_range.map_unit(ByteOffset), rope)));
}
Ok(FD::Domain) | Err(_) => {}
}
return Ok(None);
}
Some(PyCompletions::Request)
| Some(PyCompletions::XmlId)
| Some(PyCompletions::ForXmlId)
| Some(PyCompletions::HasGroups)
| Some(PyCompletions::MappedTarget)
| Some(PyCompletions::Depends)
| Some(PyCompletions::ReadFn)
| Some(PyCompletions::Scope)
| Some(PyCompletions::Prop)
| Some(PyCompletions::FieldType)
| None => {}
}
}
}
if let Some((model, prop, range)) = self.attribute_at_offset(offset, root, &contents) {
let lsp_range = Some(rope_conv(range.map_unit(ByteOffset), rope));
return self.index.hover_property_name(prop, model, lsp_range);
}
let root = some!(top_level_stmt(ast.root_node(), offset));
let needle = some!(root.named_descendant_for_byte_range(offset, offset));
let lsp_range = span_conv(needle.range());
let (type_, scope) =
some!((self.index).type_of_range(root, needle.byte_range().map_unit(ByteOffset), &contents));
if let Some(model) = self.index.try_resolve_model(type_cache().resolve(type_), &scope) {
let model = _R(model);
let identifier = (needle.kind() == "identifier").then(|| &contents[needle.byte_range()]);
return self.index.hover_model(model, Some(lsp_range), true, identifier);
}
self.index.hover_variable(
(needle.kind() == "identifier").then(|| &contents[needle.byte_range()]),
type_,
Some(lsp_range),
)
}
pub(crate) fn python_signature_help(&self, params: SignatureHelpParams) -> anyhow::Result<Option<SignatureHelp>> {
use std::fmt::Write;
let uri = ¶ms.text_document_position_params.text_document.uri;
let document = some!((self.document_map).get(uri.path().as_str()));
let file_path = uri.to_file_path().unwrap();
let ast = some!((self.ast_map).get(file_path.to_str().unwrap()));
let contents = Cow::from(&document.rope);
let point = tree_sitter::Point::new(
params.text_document_position_params.position.line as _,
params.text_document_position_params.position.character as _,
);
let node = some!(ast.root_node().descendant_for_point_range(point, point));
let mut args = node;
while let Some(parent) = args.parent() {
if args.kind() == "argument_list" {
break;
}
args = parent;
}
if args.kind() != "argument_list" {
return Ok(None);
}
let active_parameter = 'find_param: {
let ByteOffset(offset) = rope_conv(params.text_document_position_params.position, document.rope.slice(..));
if let Some(contents) = contents.get(..=offset)
&& let Some(idx) = contents.bytes().rposition(|c| c == b',' || c == b'(')
{
if contents.as_bytes()[idx] == b'(' {
break 'find_param Some(0);
}
let prev_param = args.descendant_for_byte_range(idx, idx).unwrap().prev_named_sibling();
for (idx, arg) in args.named_children(&mut args.walk()).enumerate() {
if Some(arg) == prev_param {
break 'find_param Some((idx + 1) as u32);
}
}
}
None
};
let callee = some!(args.prev_named_sibling());
let Some((tid, _)) =
(self.index).type_of_range(ast.root_node(), callee.byte_range().map_unit(ByteOffset), &contents)
else {
return Ok(None);
};
let Type::Method(model_key, method) = type_cache().resolve(tid) else {
return Ok(None);
};
let method_key = some!(_G(method));
let rtype = (self.index).eval_method_rtype(method_key.into(), **model_key, None);
let model = some!((self.index).models.get(model_key));
let method_obj = some!(some!(model.methods.as_ref()).get(&method_key));
let mut label = format!("{method}(");
let mut parameters = vec![];
for (idx, param) in method_obj.arguments.as_deref().unwrap_or(&[]).iter().enumerate() {
let begin;
if idx == 0 {
begin = label.len();
_ = write!(&mut label, "{param}");
} else {
begin = label.len() + 2;
_ = write!(&mut label, ", {param}");
}
let end = label.len();
parameters.push(ParameterInformation {
label: ParameterLabel::LabelOffsets([begin as _, end as _]),
documentation: None,
});
}
let rtype = rtype.and_then(|rtype| self.index.type_display(rtype));
match rtype {
Some(rtype) => drop(write!(&mut label, ") -> {rtype}")),
None => label.push_str(") -> ..."),
};
let sig = SignatureInformation {
label,
active_parameter,
parameters: Some(parameters),
documentation: method_obj.docstring.as_ref().map(|doc| {
Documentation::MarkupContent(MarkupContent {
kind: MarkupKind::Markdown,
value: doc.to_string(),
})
}),
};
Ok(Some(SignatureHelp {
signatures: vec![sig],
active_signature: Some(0),
active_parameter: None,
}))
}
pub(crate) fn python_code_action(
&self,
params: CodeActionParams,
rope: RopeSlice<'_>,
) -> anyhow::Result<Option<CodeActionResponse>> {
let uri = ¶ms.text_document.uri;
let file_path = uri.to_file_path().unwrap();
let file_path_str = file_path.to_str().unwrap();
let ast = self
.ast_map
.get(file_path_str)
.ok_or_else(|| errloc!("Did not build AST for {}", file_path_str))?;
let ByteOffset(offset) = rope_conv(params.range.end, rope);
let contents = Cow::from(rope);
let query = PyCompletions::query();
let mut cursor = tree_sitter::QueryCursor::new();
let mut matches = cursor.matches(query, ast.root_node(), contents.as_bytes());
while let Some(match_) = matches.next() {
for capture in match_.captures {
let range = capture.node.byte_range();
match PyCompletions::from(capture.index) {
Some(PyCompletions::Model) if range.contains_end(offset) => {
let range = range.shrink(1);
let slice = ok!(rope.try_slice(range.clone()));
let slice = Cow::from(slice);
return self.index.code_action_for_model(&slice, &file_path);
}
_ => {}
}
}
}
Ok(None)
}
fn is_commandlist(cmdlist: Node, offset: usize) -> bool {
matches!(cmdlist.kind(), "list" | "list_comprehension")
&& cmdlist.byte_range().contains_end(offset)
&& cmdlist.parent().is_some_and(|parent| parent.kind() == "pair")
}
fn gather_commandlist<'text>(
&self,
cmdlist: Node,
root: Node,
match_: &tree_sitter::QueryMatch,
offset: usize,
range: std::ops::Range<usize>,
this_model: Option<&'text str>,
contents: &'text str,
for_replacing: bool,
) -> Option<(&'text str, ByteRange, Spur)> {
let mut access = contents[range.shrink(1)].to_string();
tracing::debug!(
"gather_commandlist: cmdlist range: {:?}, offset: {}",
cmdlist.byte_range(),
offset
);
let mut dest = cmdlist.descendant_for_byte_range(offset, offset);
tracing::debug!("Initial dest: {:?}", dest.map(|n| (n.kind(), n.byte_range())));
if dest.is_none() && offset > cmdlist.start_byte() {
tracing::debug!("No node at offset {}, trying offset - 1", offset);
if let Some(node) = cmdlist.descendant_for_byte_range(offset - 1, offset - 1) {
tracing::debug!(
"Found node at offset - 1: kind={}, range={:?}",
node.kind(),
node.byte_range()
);
if node.kind() == "string"
|| (node.kind() == "string_content" && node.parent().map(|p| p.kind()) == Some("string"))
|| node.kind() == "string_end"
{
let string_node = if node.kind() == "string" {
node
} else {
node.parent()?
};
if let Some(next_sibling) = string_node.next_sibling() {
tracing::debug!("String has next sibling: {}", next_sibling.kind());
if next_sibling.kind() != ":" {
dest = Some(string_node);
}
} else {
tracing::debug!("String has no next sibling, treating as incomplete");
dest = Some(string_node);
}
}
}
}
let mut dest = dest?;
if dest.kind() == "string_content" {
dest = dest.parent()?;
}
if dest.kind() != "string" {
dest = dest.parent()?;
}
if dest.kind() != "string" {
return None;
}
let mut is_broken_syntax = false;
if let Some(parent) = dest.parent() {
tracing::debug!("String parent kind: {}", parent.kind());
if parent.kind() == "dictionary" {
if let Some(next_sibling) = dest.next_sibling() {
tracing::debug!("String next sibling kind: {}", next_sibling.kind());
if next_sibling.kind() != ":" {
is_broken_syntax = true;
}
} else {
tracing::debug!("String has no next sibling");
is_broken_syntax = true;
}
} else if parent.kind() == "ERROR" {
if let Some(grandparent) = parent.parent() {
tracing::debug!("ERROR parent (grandparent) kind: {}", grandparent.kind());
if grandparent.kind() == "dictionary" {
is_broken_syntax = true;
}
}
}
}
if is_broken_syntax {
tracing::debug!("Detected broken syntax: string in dictionary without colon");
}
let (needle, model_str, range) = if is_broken_syntax {
let range = ByteRange {
start: ByteOffset(offset),
end: ByteOffset(offset),
};
let model = this_model.unwrap_or("");
("", model, range)
} else {
let Mapped {
needle, model, range, ..
} = self.gather_mapped(
root,
match_,
Some(offset),
dest.byte_range(),
this_model,
contents,
for_replacing,
None,
)?;
(needle, model, range)
};
tracing::debug!(
"needle={}, is_broken_syntax={}, model_str={}",
needle,
is_broken_syntax,
model_str
);
let mut cursor = cmdlist;
let mut count = 0;
while count < 30 {
count += 1;
let Some(candidate) = cursor.child_with_descendant(dest) else {
tracing::debug!("child_containing_descendant returned None at count={}", count);
return None;
};
let obj;
tracing::debug!("candidate kind: {}", candidate.kind());
if candidate.kind() == "tuple" {
obj = candidate.child_with_descendant(dest)?;
} else if candidate.kind() == "call" {
let args = dig!(candidate, argument_list(1))?;
obj = args.child_with_descendant(dest)?;
} else {
return None;
}
tracing::debug!("obj kind: {}", obj.kind());
if obj.kind() == "dictionary" {
let pair = obj.child_with_descendant(dest)?;
tracing::debug!("pair kind: {}", pair.kind());
if pair.kind() != "pair" {
if pair.kind() == "string" && pair.byte_range().contains(&offset) {
tracing::debug!("Breaking due to broken syntax string in dictionary");
break;
} else if pair.kind() == "ERROR" {
if pair.byte_range().contains(&offset) {
tracing::debug!("Breaking due to ERROR node containing offset");
break;
}
}
tracing::debug!("Returning None: pair kind {} is not 'pair'", pair.kind());
return None;
}
let key = dig!(pair, string)?;
if key.byte_range().contains_end(offset) {
break;
}
cursor = pair.child_with_descendant(dest)?;
access.push('.');
access.push_str(&contents[key.byte_range().shrink(1)]);
} else if obj.kind() == "set" {
break;
} else {
return None;
}
}
if count == 30 {
warn!("recursion limit hit");
}
access.push('.'); tracing::debug!("Access path: {}", access);
tracing::debug!("Initial model before resolve: {}", model_str);
let access = &mut access.as_str();
let mut model = _I(model_str);
if self.index.models.resolve_mapped(&mut model, access, None).is_err() {
tracing::debug!("resolve_mapped failed for model={} access={}", _R(model), access);
return None;
}
tracing::debug!("Resolved model: {}", _R(model));
Some((needle, range, model))
}
}
#[derive(Default, Clone)]
struct ThisModel<'a> {
inner: Option<&'a str>,
source: ThisModelKind,
top_level_range: core::ops::Range<usize>,
}
#[derive(Default, Clone, Copy)]
enum ThisModelKind {
Primary,
#[default]
Inherited,
}
impl<'this> ThisModel<'this> {
fn tag_model(
&mut self,
model: Node,
match_: &QueryMatch,
top_level_range: core::ops::Range<usize>,
contents: &'this str,
) {
if match_
.nodes_for_capture_index(PyCompletions::FieldType as _)
.next()
.is_some()
{
return;
}
debug_assert_eq!(model.kind(), "string");
let (is_name, mut is_inherit) = match_
.nodes_for_capture_index(PyCompletions::Prop as _)
.next()
.map(|prop| {
let prop = &contents[prop.byte_range()];
(prop == "_name", prop == "_inherit")
})
.unwrap_or((false, false));
let top_level_changed = top_level_range != self.top_level_range;
is_inherit = is_inherit && (top_level_changed || matches!(self.source, ThisModelKind::Inherited));
if is_inherit {
let parent = model.parent().expect(format_loc!("(tag_model) parent"));
is_inherit = parent.kind() == "assignment" || parent.kind() == "list" && parent.named_child_count() == 1;
}
if is_inherit || is_name && top_level_changed {
self.inner = Some(&contents[model.byte_range().shrink(1)]);
self.top_level_range = top_level_range;
if is_name {
self.source = ThisModelKind::Primary;
} else if is_inherit {
self.source = ThisModelKind::Inherited;
}
}
}
}
fn extract_string_needle_at_offset<'a>(
rope: RopeSlice<'a>,
range: core::ops::Range<usize>,
offset: usize,
) -> anyhow::Result<(Cow<'a, str>, core::ops::Range<ByteOffset>)> {
let slice = rope.try_slice(range.clone())?;
let relative_offset = range.start;
let needle = Cow::from(slice.try_slice(1..offset - relative_offset)?);
let byte_range = range.shrink(1).map_unit(ByteOffset);
Ok((needle, byte_range))
}
fn extract_comodel_name<'tree>(captures: &[QueryCapture<'tree>], contents: &str) -> Option<Node<'tree>> {
for cap in captures {
match PyCompletions::from(cap.index) {
Some(PyCompletions::Model) => {
if let Some(parent) = cap.node.parent()
&& parent.kind() == "argument_list"
{
return Some(cap.node);
}
}
Some(PyCompletions::FieldDescriptor) => {
let Ok(FieldDescriptors::ComodelName) = FieldDescriptors::from_str(&contents[cap.node.byte_range()])
else {
continue;
};
return cap.node.next_named_sibling();
}
_ => {}
}
}
None
}