use super::helpers::{bytes_match_keyword, extract_ascii_ident};
pub(super) fn extract_decorator_type_usages(line: &str, local_uses: &mut Vec<String>) {
if !line.contains('(') {
return;
}
const SKIP_IDENTS: &[&str] = &[
"None",
"True",
"False",
"str",
"int",
"float",
"bool",
"bytes",
"list",
"dict",
"set",
"tuple",
"frozenset",
"type",
"object",
"Any",
"Union",
"Optional",
"List",
"Dict",
"Set",
"Tuple",
"Callable",
"Sequence",
"Mapping",
"Iterable",
"Iterator",
"Type",
"self",
"cls",
];
let bytes = line.as_bytes();
let len = bytes.len();
let mut i = 0;
while i < len {
if bytes_match_keyword(bytes, i, b"response_model=")
|| bytes_match_keyword(bytes, i, b"response_class=")
{
i += 15;
while i < len && bytes[i].is_ascii_whitespace() {
i += 1;
}
extract_type_from_decorator(line, &mut i, local_uses, SKIP_IDENTS);
continue;
}
if bytes_match_keyword(bytes, i, b"Depends(") {
i += 8;
while i < len && bytes[i].is_ascii_whitespace() {
i += 1;
}
if i < len && (bytes[i].is_ascii_alphabetic() || bytes[i] == b'_') {
let start = i;
while i < len && (bytes[i].is_ascii_alphanumeric() || bytes[i] == b'_') {
i += 1;
}
let ident = extract_ascii_ident(bytes, start, i);
if !ident.is_empty()
&& !SKIP_IDENTS.contains(&ident.as_str())
&& !local_uses.contains(&ident)
{
local_uses.push(ident);
}
}
continue;
}
i += 1;
}
}
fn extract_type_from_decorator(
line: &str,
pos: &mut usize,
local_uses: &mut Vec<String>,
skip_idents: &[&str],
) {
let bytes = line.as_bytes();
let len = bytes.len();
let i = *pos;
if i >= len || !(bytes[i].is_ascii_alphabetic() || bytes[i] == b'_') {
return;
}
let start = i;
let mut j = i;
while j < len && (bytes[j].is_ascii_alphanumeric() || bytes[j] == b'_') {
j += 1;
}
let ident = extract_ascii_ident(bytes, start, j);
*pos = j;
if j < len && bytes[j] == b'[' {
j += 1;
let mut bracket_depth = 1;
while j < len && bracket_depth > 0 {
match bytes[j] {
b'[' => bracket_depth += 1,
b']' => bracket_depth -= 1,
_ if bytes[j].is_ascii_alphabetic() || bytes[j] == b'_' => {
let inner_start = j;
while j < len && (bytes[j].is_ascii_alphanumeric() || bytes[j] == b'_') {
j += 1;
}
let inner_ident = extract_ascii_ident(bytes, inner_start, j);
if !inner_ident.is_empty()
&& !skip_idents.contains(&inner_ident.as_str())
&& !local_uses.contains(&inner_ident)
{
local_uses.push(inner_ident);
}
continue;
}
_ => {}
}
j += 1;
}
*pos = j;
} else if !ident.is_empty()
&& !skip_idents.contains(&ident.as_str())
&& !local_uses.contains(&ident)
{
local_uses.push(ident);
}
}
pub(super) fn is_framework_decorator(line: &str) -> bool {
let lower = line.to_lowercase();
if lower.contains("@pytest.fixture")
|| lower.contains("@fixture")
|| lower.contains("@pytest.mark")
|| lower.contains("@pytest.parametrize")
{
return true;
}
if lower.contains(".command")
|| lower.contains("@click.")
|| lower.contains("@app.command")
|| lower.contains("@typer.")
{
return true;
}
if lower.contains("@app.get")
|| lower.contains("@app.post")
|| lower.contains("@app.put")
|| lower.contains("@app.delete")
|| lower.contains("@app.patch")
|| lower.contains("@router.get")
|| lower.contains("@router.post")
|| lower.contains("@router.put")
|| lower.contains("@router.delete")
|| lower.contains("@router.patch")
|| lower.contains("@api_router.")
{
return true;
}
if lower.contains("@app.route")
|| lower.contains("@blueprint.route")
|| lower.contains(".route(")
{
return true;
}
if lower.contains("@celery.task")
|| lower.contains("@app.task")
|| lower.contains("@shared_task")
{
return true;
}
if lower.contains("@admin.register")
|| lower.contains("@receiver")
|| lower.contains("@login_required")
|| lower.contains("@permission_required")
{
return true;
}
if lower.contains("@cron") || lower.contains("@func") {
return true;
}
if lower.contains("@rumps.") || lower.contains(".timer(") {
return true;
}
if lower.contains("@on_event")
|| lower.contains("@event_handler")
|| lower.contains("@callback")
|| lower.contains("@hook")
|| lower.contains("@register")
{
return true;
}
false
}
pub(super) fn extract_first_string_literal(text: &str) -> Option<String> {
let mut in_quote: Option<char> = None;
let mut buf = String::new();
for ch in text.chars() {
if let Some(q) = in_quote {
if ch == q {
return Some(buf);
} else {
buf.push(ch);
}
} else if ch == '"' || ch == '\'' {
in_quote = Some(ch);
}
}
None
}
pub(super) fn parse_route_decorator(
line: &str,
line_num: usize,
) -> Option<crate::types::RouteInfo> {
let lower = line.to_lowercase();
let mut framework = None;
let mut method = None;
let mut methods_param: Option<String> = None;
for (pat, m) in [
("@app.get", "GET"),
("@app.post", "POST"),
("@app.put", "PUT"),
("@app.delete", "DELETE"),
("@app.patch", "PATCH"),
("@router.get", "GET"),
("@router.post", "POST"),
("@router.put", "PUT"),
("@router.delete", "DELETE"),
("@router.patch", "PATCH"),
("@api_router.get", "GET"),
("@api_router.post", "POST"),
("@api_router.put", "PUT"),
("@api_router.delete", "DELETE"),
("@api_router.patch", "PATCH"),
] {
if lower.contains(pat) {
framework = Some("fastapi");
method = Some(m);
break;
}
}
if framework.is_none()
&& (lower.contains("@app.route")
|| lower.contains("@blueprint.route")
|| lower.contains(".route("))
{
framework = Some("flask");
if let Some(pos) = line.find("methods")
&& let Some(start) = line[pos..].find('[')
&& let Some(end) = line[pos + start + 1..].find(']')
{
let body = &line[pos + start + 1..pos + start + 1 + end];
let tokens: Vec<String> = body
.split([',', ' ', '\t'])
.filter_map(|p| {
let trimmed = p.trim().trim_matches(|c| c == '"' || c == '\'');
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_uppercase())
}
})
.collect();
if !tokens.is_empty() {
methods_param = Some(tokens.join(","));
}
}
method = Some(methods_param.as_deref().unwrap_or("route"));
}
let framework = framework?;
let method = method.unwrap_or("route");
let path = extract_first_string_literal(line);
Some(crate::types::RouteInfo {
framework: framework.to_string(),
method: method.to_string(),
path,
name: None,
line: line_num,
})
}