deslop 0.2.0

A static analyzer that spots low-context and AI-assisted code patterns across naming, concurrency, security, performance, and test quality.
Documentation
use std::collections::BTreeSet;

use crate::analysis::ParsedFile;
use crate::model::{Finding, Severity};

use super::{
    OPTION_BAG_FIELD_THRESHOLD, OPTION_BAG_SIGNAL_THRESHOLD, is_client_constructor_text,
    is_config_load_call, is_config_load_text, is_file_io_call, is_network_call_path,
    is_subprocess_call, mutable_default_kind, mutates_binding, option_bag_boolean_field,
    option_bag_optional_field, resolve_top_level_call_path, should_skip_import_time_config_finding,
    should_skip_wide_contract_rule, wide_contract_markers,
};

pub(super) fn module_state_findings(file: &ParsedFile) -> Vec<Finding> {
    let mut findings = Vec::new();
    findings.extend(dataclass_mutable_default_findings(file));
    findings.extend(option_bag_model_findings(file));
    findings.extend(public_any_type_leak_model_findings(file));
    findings.extend(import_time_network_call_findings(file));
    findings.extend(import_time_file_io_findings(file));
    findings.extend(import_time_subprocess_findings(file));
    findings.extend(module_singleton_client_side_effect_findings(file));
    findings.extend(mutable_module_global_state_findings(file));
    findings.extend(import_time_config_load_findings(file));
    findings
}

fn dataclass_mutable_default_findings(file: &ParsedFile) -> Vec<Finding> {
    if file.is_test_file {
        return Vec::new();
    }

    let mut findings = Vec::new();
    for model in file
        .python_models()
        .iter()
        .filter(|model| model.is_dataclass)
    {
        for field in &model.fields {
            let Some(default_text) = field.default_text.as_deref() else {
                continue;
            };
            let Some(mutable_kind) = mutable_default_kind(default_text) else {
                continue;
            };
            if default_text.contains("default_factory") {
                continue;
            }

            findings.push(Finding {
                rule_id: "dataclass_mutable_default".to_string(),
                severity: Severity::Warning,
                path: file.path.clone(),
                function_name: None,
                start_line: field.line,
                end_line: field.line,
                message: format!(
                    "dataclass {} uses a mutable default for field {} instead of default_factory",
                    model.name, field.name
                ),
                evidence: vec![
                    format!("field={}", field.name),
                    format!("default_expression={default_text}"),
                    format!("mutable_kind={mutable_kind}"),
                ],
            });
        }
    }

    findings
}

fn option_bag_model_findings(file: &ParsedFile) -> Vec<Finding> {
    if file.is_test_file {
        return Vec::new();
    }

    file.python_models()
        .iter()
        .filter(|model| (model.is_dataclass || model.is_typed_dict) && !model.name.starts_with('_'))
        .filter_map(|model| {
            let optional_fields = model
                .fields
                .iter()
                .filter(|field| option_bag_optional_field(field.annotation_text.as_deref(), field.default_text.as_deref()))
                .map(|field| field.name.as_str())
                .collect::<Vec<_>>();
            let boolean_fields = model
                .fields
                .iter()
                .filter(|field| option_bag_boolean_field(field.annotation_text.as_deref(), field.default_text.as_deref()))
                .map(|field| field.name.as_str())
                .collect::<Vec<_>>();

            if model.fields.len() < OPTION_BAG_FIELD_THRESHOLD
                || optional_fields.len() + boolean_fields.len() < OPTION_BAG_SIGNAL_THRESHOLD
                || (optional_fields.len() < 3 && boolean_fields.len() < 2)
            {
                return None;
            }

            let mut evidence = vec![format!("field_count={}", model.fields.len())];
            evidence.push(format!("optional_fields={}", optional_fields.join(",")));
            evidence.push(format!("boolean_fields={}", boolean_fields.join(",")));
            evidence.push(format!("method_count={}", model.method_names.len()));
            if !model.base_classes.is_empty() {
                evidence.push(format!("base_classes={}", model.base_classes.join(",")));
            }
            if !model.decorators.is_empty() {
                evidence.push(format!("decorators={}", model.decorators.join(",")));
            }

            Some(Finding {
                rule_id: "option_bag_model".to_string(),
                severity: Severity::Info,
                path: file.path.clone(),
                function_name: None,
                start_line: model.line,
                end_line: model.line,
                message: format!(
                    "model {} encodes many optional or boolean switches and looks like an option bag",
                    model.name
                ),
                evidence,
            })
        })
        .collect()
}

fn public_any_type_leak_model_findings(file: &ParsedFile) -> Vec<Finding> {
    if file.is_test_file || should_skip_wide_contract_rule(file) {
        return Vec::new();
    }

    let mut findings = Vec::new();
    for model in file
        .python_models()
        .iter()
        .filter(|model| !model.name.starts_with('_'))
    {
        for field in &model.fields {
            let Some(annotation_text) = field.annotation_text.as_deref() else {
                continue;
            };
            let markers = wide_contract_markers(annotation_text);
            if markers.is_empty() {
                continue;
            }

            findings.push(Finding {
                rule_id: "public_any_type_leak".to_string(),
                severity: Severity::Info,
                path: file.path.clone(),
                function_name: None,
                start_line: field.line,
                end_line: field.line,
                message: format!(
                    "model {} exposes field {} with a very wide type contract",
                    model.name, field.name
                ),
                evidence: markers
                    .into_iter()
                    .map(|marker| format!("wide_type_marker={marker}"))
                    .collect(),
            });
        }
    }

    findings
}

fn import_time_network_call_findings(file: &ParsedFile) -> Vec<Finding> {
    if file.is_test_file {
        return Vec::new();
    }

    file.module_scope_calls
        .iter()
        .filter_map(|call| {
            let resolved = resolve_top_level_call_path(file, call);
            if !is_network_call_path(&resolved, &call.name) {
                return None;
            }

            Some(Finding {
                rule_id: "import_time_network_call".to_string(),
                severity: Severity::Warning,
                path: file.path.clone(),
                function_name: None,
                start_line: call.line,
                end_line: call.line,
                message: "module performs a network call while being imported".to_string(),
                evidence: vec![format!("module_scope_call={}", call.text.trim())],
            })
        })
        .collect()
}

fn import_time_file_io_findings(file: &ParsedFile) -> Vec<Finding> {
    if file.is_test_file {
        return Vec::new();
    }

    file.module_scope_calls
        .iter()
        .filter_map(|call| {
            let resolved = resolve_top_level_call_path(file, call);
            if !is_file_io_call(call, &resolved) {
                return None;
            }

            Some(Finding {
                rule_id: "import_time_file_io".to_string(),
                severity: Severity::Info,
                path: file.path.clone(),
                function_name: None,
                start_line: call.line,
                end_line: call.line,
                message: "module performs filesystem work while being imported".to_string(),
                evidence: vec![format!("module_scope_call={}", call.text.trim())],
            })
        })
        .collect()
}

fn import_time_subprocess_findings(file: &ParsedFile) -> Vec<Finding> {
    if file.is_test_file {
        return Vec::new();
    }

    file.module_scope_calls
        .iter()
        .filter_map(|call| {
            let resolved = resolve_top_level_call_path(file, call);
            if !is_subprocess_call(&resolved) {
                return None;
            }

            Some(Finding {
                rule_id: "import_time_subprocess".to_string(),
                severity: Severity::Warning,
                path: file.path.clone(),
                function_name: None,
                start_line: call.line,
                end_line: call.line,
                message: "module launches a subprocess while being imported".to_string(),
                evidence: vec![format!("module_scope_call={}", call.text.trim())],
            })
        })
        .collect()
}

fn module_singleton_client_side_effect_findings(file: &ParsedFile) -> Vec<Finding> {
    if file.is_test_file {
        return Vec::new();
    }

    file.top_level_bindings
        .iter()
        .filter(|binding| {
            !binding
                .name
                .chars()
                .all(|character| character.is_ascii_uppercase() || character == '_')
        })
        .filter_map(|binding| {
            let lower_value = binding.value_text.to_ascii_lowercase();
            if !is_client_constructor_text(&lower_value) {
                return None;
            }

            Some(Finding {
                rule_id: "module_singleton_client_side_effect".to_string(),
                severity: Severity::Info,
                path: file.path.clone(),
                function_name: None,
                start_line: binding.line,
                end_line: binding.line,
                message: format!(
                    "module eagerly constructs client-like global {} at import time",
                    binding.name
                ),
                evidence: vec![format!(
                    "binding={}= {}",
                    binding.name,
                    binding.value_text.trim()
                )],
            })
        })
        .collect()
}

fn mutable_module_global_state_findings(file: &ParsedFile) -> Vec<Finding> {
    if file.is_test_file {
        return Vec::new();
    }

    file.top_level_bindings
        .iter()
        .filter_map(|binding| {
            let mutable_kind = mutable_default_kind(&binding.value_text)?;

            let mutating_functions = file
                .functions
                .iter()
                .filter(|function| {
                    !function.is_test_function && mutates_binding(function, &binding.name)
                })
                .map(|function| function.fingerprint.name.as_str())
                .collect::<BTreeSet<_>>();
            if mutating_functions.len() < 2 {
                return None;
            }

            Some(Finding {
                rule_id: "mutable_module_global_state".to_string(),
                severity: Severity::Info,
                path: file.path.clone(),
                function_name: None,
                start_line: binding.line,
                end_line: binding.line,
                message: format!(
                    "module-level mutable global {} is updated from multiple functions",
                    binding.name
                ),
                evidence: vec![
                    format!("mutable_kind={mutable_kind}"),
                    format!(
                        "mutating_functions={}",
                        mutating_functions.into_iter().collect::<Vec<_>>().join(",")
                    ),
                ],
            })
        })
        .collect()
}

fn import_time_config_load_findings(file: &ParsedFile) -> Vec<Finding> {
    if file.is_test_file {
        return Vec::new();
    }

    let mut findings = Vec::new();
    for call in &file.module_scope_calls {
        let resolved = resolve_top_level_call_path(file, call).to_ascii_lowercase();
        let lower_text = call.text.to_ascii_lowercase();
        if !is_config_load_call(&resolved, &lower_text) {
            continue;
        }
        if should_skip_import_time_config_finding(file, &resolved, &lower_text) {
            continue;
        }

        findings.push(Finding {
            rule_id: "import_time_config_load".to_string(),
            severity: Severity::Info,
            path: file.path.clone(),
            function_name: None,
            start_line: call.line,
            end_line: call.line,
            message: "module loads configuration or secrets while being imported".to_string(),
            evidence: vec![format!("module_scope_call={}", call.text.trim())],
        });
    }

    for binding in &file.top_level_bindings {
        let lower_value = binding.value_text.to_ascii_lowercase();
        if !is_config_load_text(&lower_value) {
            continue;
        }
        if should_skip_import_time_config_finding(file, "", &lower_value) {
            continue;
        }

        findings.push(Finding {
            rule_id: "import_time_config_load".to_string(),
            severity: Severity::Info,
            path: file.path.clone(),
            function_name: None,
            start_line: binding.line,
            end_line: binding.line,
            message: format!(
                "module initializes {} from configuration or secrets at import time",
                binding.name
            ),
            evidence: vec![format!(
                "binding={}= {}",
                binding.name,
                binding.value_text.trim()
            )],
        });
    }

    findings
}