agent-file-tools 0.29.0

Agent File Tools — tree-sitter powered code analysis for AI agents
Documentation
use std::collections::BTreeMap;

use crate::compress::generic::{dedup_consecutive, middle_truncate, strip_ansi};
use crate::compress::Compressor;

/// Upper bound on the number of lines `finish` keeps in the compressed output.
const MAX_LINES: usize = 300;

/// Stateless [`Compressor`] that recognizes mypy invocations and condenses their output.
pub struct MypyCompressor;

impl Compressor for MypyCompressor {
    /// Returns `true` when any normalized token of `command` is exactly `mypy`.
    ///
    /// A single token check covers direct invocations (`mypy src`), wrappers
    /// (`uv run mypy src`) and module invocations (`python -m mypy`): each of them
    /// carries `mypy` as its own token, so a separate `python -m mypy` window test
    /// could never match anything this check does not already match.
    fn matches(&self, command: &str) -> bool {
        // Lazy scan: stops at the first hit and needs no intermediate `Vec`.
        command_tokens(command).any(|token| token == "mypy")
    }

    /// Condenses mypy `output`; the command line itself does not influence the result.
    fn compress(&self, _command: &str, output: &str) -> String {
        compress_mypy(output)
    }
}

fn compress_mypy(output: &str) -> String {
    let trimmed = output.trim();
    if trimmed.starts_with("Success: no issues found") {
        return "mypy: clean".to_string();
    }

    let mut by_file: BTreeMap<String, Vec<String>> = BTreeMap::new();
    let mut fileless = Vec::new();
    let mut summary = None;
    let mut previous_error_file: Option<String> = None;

    for line in output.lines() {
        let trimmed = line.trim_end();
        if is_summary_line(trimmed) {
            summary = Some(trimmed.to_string());
            previous_error_file = None;
            continue;
        }

        if let Some((file, severity)) = parse_mypy_line(trimmed) {
            match severity {
                "error" => {
                    by_file
                        .entry(file.to_string())
                        .or_default()
                        .push(trimmed.to_string());
                    previous_error_file = Some(file.to_string());
                }
                "note" => {
                    if previous_error_file.as_deref() == Some(file) {
                        by_file
                            .entry(file.to_string())
                            .or_default()
                            .push(trimmed.to_string());
                    }
                }
                _ => previous_error_file = None,
            }
        } else if trimmed.contains("error:") && !trimmed.is_empty() {
            fileless.push(trimmed.to_string());
            previous_error_file = None;
        } else {
            previous_error_file = None;
        }
    }

    let mut lines = Vec::new();
    lines.extend(fileless);
    for (_file, diagnostics) in by_file {
        if !lines.is_empty() && !diagnostics.is_empty() {
            lines.push(String::new());
        }
        lines.extend(diagnostics);
    }
    if let Some(summary) = summary {
        if !lines.is_empty() {
            lines.push(String::new());
        }
        lines.push(summary);
    }

    if lines.is_empty() {
        return output.trim_end().to_string();
    }

    finish(&lines.join("\n"))
}

/// Splits a shell command into normalized tokens for tool detection.
///
/// Every whitespace-separated token is processed as follows:
/// 1. surrounding single/double quotes are trimmed;
/// 2. the runner words `npx`, `pnpm`, `yarn`, `bun`, `bunx` and `exec` are dropped;
/// 3. any directory prefix (`/` or `\` separated) is removed;
/// 4. trailing Windows launcher suffixes `.cmd` and `.exe` are removed, so that
///    `.venv\Scripts\mypy.exe` normalizes to `mypy` just like `mypy.cmd` does.
fn command_tokens(command: &str) -> impl Iterator<Item = String> + '_ {
    command
        .split_whitespace()
        .map(|token| token.trim_matches(|ch| matches!(ch, '\'' | '"')))
        .filter(|token| !matches!(*token, "npx" | "pnpm" | "yarn" | "bun" | "bunx" | "exec"))
        .map(|token| {
            // `rsplit` always yields at least one piece; the fallback is purely defensive.
            let name = token.rsplit(['/', '\\']).next().unwrap_or(token);
            name.trim_end_matches(".cmd")
                .trim_end_matches(".exe")
                .to_string()
        })
}

/// Parses a mypy diagnostic of the form `file:line[:column]: severity: message`.
///
/// Returns `(file, severity)` when the severity is `error` or `note`, and `None` for
/// anything else (summary lines, other severities, free-form text).
///
/// A leading Windows drive prefix (`C:\` or `C:/`) is treated as part of the file
/// path, so absolute Windows paths are attributed to their file instead of being
/// split at the drive colon.
fn parse_mypy_line(line: &str) -> Option<(&str, &str)> {
    // Both prefix bytes are ASCII when this matches, so offset 2 is a char boundary.
    let drive_len = match line.as_bytes() {
        [letter, b':', b'\\', ..] | [letter, b':', b'/', ..] if letter.is_ascii_alphabetic() => 2,
        _ => 0,
    };
    let (drive, after_drive) = line.split_at(drive_len);
    let (path_tail, rest) = after_drive.split_once(':')?;
    // Re-slice from the original line so the returned path includes the drive prefix.
    let file = &line[..drive.len() + path_tail.len()];

    // The line number itself is not needed, only that one is present.
    let (_, rest) = split_number_prefix(rest.trim_start())?;
    let rest = rest.strip_prefix(':')?.trim_start();
    // The column is optional; fall back to the unmodified remainder when absent.
    let rest = strip_column(rest).unwrap_or(rest);
    let (severity, _) = rest.split_once(':')?;
    if matches!(severity, "error" | "note") {
        Some((file, severity))
    } else {
        None
    }
}

/// Strips the numeric position fields that may precede the severity.
///
/// `rest` is the text after `file:line:`. mypy can emit a column there
/// (`5: error: ...`) and, with `--show-error-end`, an end line and end column as well
/// (`5:10:12: error: ...`). Every leading `<digits>:` field is removed and the
/// remainder is returned with leading whitespace trimmed.
///
/// Returns `None` when `rest` does not start with a non-empty all-digit field, which
/// lets callers keep using the untouched input.
fn strip_column(rest: &str) -> Option<&str> {
    let mut stripped = None;
    let mut remaining = rest;
    while let Some((field, tail)) = remaining.split_once(':') {
        // An empty field must not count as a column: `all` is vacuously true on it.
        if field.is_empty() || !field.bytes().all(|byte| byte.is_ascii_digit()) {
            break;
        }
        remaining = tail.trim_start();
        stripped = Some(remaining);
    }
    stripped
}

/// Splits `input` into its leading run of ASCII digits and the remainder.
///
/// Returns `None` when `input` does not begin with an ASCII digit (including when it
/// is empty).
fn split_number_prefix(input: &str) -> Option<(&str, &str)> {
    // Byte offset of the first non-digit; the whole string when it is digits only.
    let boundary = input
        .find(|ch: char| !ch.is_ascii_digit())
        .unwrap_or(input.len());
    if boundary == 0 {
        None
    } else {
        Some(input.split_at(boundary))
    }
}

/// Reports whether `trimmed` is one of mypy's closing summary lines: either the
/// success banner or a `Found ...` line that mentions errors.
fn is_summary_line(trimmed: &str) -> bool {
    if trimmed.starts_with("Success: no issues found") {
        return true;
    }
    // Both checks look at the whole line, so the space of "Found " may also be the
    // space that introduces " error".
    let opens_with_found = trimmed.starts_with("Found ");
    opens_with_found && trimmed.contains(" error")
}

/// Applies the shared post-processing pipeline to already-selected output: ANSI
/// stripping, consecutive-line de-duplication, middle truncation, then a line cap of
/// `MAX_LINES`.
fn finish(input: &str) -> String {
    let deduped = dedup_consecutive(&strip_ansi(input));
    // NOTE(review): the three sizes are presumably total/head/tail byte budgets —
    // confirm against `middle_truncate`.
    let bounded = middle_truncate(&deduped, 32 * 1024, 16 * 1024, 16 * 1024);
    cap_lines(&bounded, MAX_LINES)
}

/// Limits `input` to at most `max_lines` lines.
///
/// Input within the limit is returned with trailing whitespace removed. Longer input
/// keeps its first `max_lines` lines and gains a final `... truncated N lines` marker
/// stating how many lines were dropped.
fn cap_lines(input: &str, max_lines: usize) -> String {
    let total = input.lines().count();
    match total.checked_sub(max_lines) {
        // `None`: fewer lines than the cap; `Some(0)`: exactly at the cap.
        None | Some(0) => input.trim_end().to_string(),
        Some(dropped) => {
            let mut capped = input
                .lines()
                .take(max_lines)
                .collect::<Vec<_>>()
                .join("\n");
            capped.push_str(&format!("\n... truncated {} lines", dropped));
            capped
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Direct, module-style and wrapped mypy commands match; unrelated commands do not.
    #[test]
    fn matches_mypy_invocations() {
        let subject = MypyCompressor;
        let accepted = [
            "mypy src",
            "python -m mypy src",
            "python3 -m mypy --strict",
            "uv run mypy src",
        ];
        for command in accepted.iter() {
            assert!(subject.matches(command));
        }
        let rejected = ["cargo build", "ls"];
        for command in rejected.iter() {
            assert!(!subject.matches(command));
        }
    }

    /// The success banner collapses to a short fixed marker.
    #[test]
    fn compresses_real_success_case() {
        let raw = "Success: no issues found in 1 source file\n";
        let result = compress_mypy(raw);
        assert_eq!(result, "mypy: clean");
        assert!(result.len() < raw.len());
    }

    /// Every error line and the summary survive compression verbatim.
    #[test]
    fn preserves_error_lines_and_summary() {
        let expected_lines = [
            "src/a.py:10: error: Incompatible types in assignment  [assignment]",
            "src/a.py:15: error: Missing return statement  [return]",
            "src/b.py:5: error: Argument 1 to \"foo\" has incompatible type \"str\"; expected \"int\"  [arg-type]",
            "Found 3 errors in 2 files (checked 50 source files)",
        ];
        let raw = format!("{}\n", expected_lines.join("\n"));
        let result = compress_mypy(&raw);
        for expected in expected_lines.iter() {
            assert!(result.contains(*expected));
        }
    }

    /// Notes are kept only when they directly follow an error in the same file.
    #[test]
    fn keeps_attached_notes_and_drops_standalone_notes() {
        let raw = [
            "src/a.py:1: note: Standalone note",
            "src/a.py:10: error: Incompatible types in assignment  [assignment]",
            "src/a.py:10: note: Expected int",
            "src/b.py:8: note: Use `Type[X]` for class types",
            "Found 1 error in 1 file (checked 2 source files)",
            "",
        ]
        .join("\n");
        let result = compress_mypy(&raw);

        let kept = [
            "src/a.py:10: error: Incompatible types in assignment  [assignment]",
            "src/a.py:10: note: Expected int",
        ];
        for needle in kept.iter() {
            assert!(result.contains(*needle));
        }
        let dropped = ["Standalone note", "Use `Type[X]`"];
        for needle in dropped.iter() {
            assert!(!result.contains(*needle));
        }
    }

    /// A flood of standalone notes is discarded while the single error is retained.
    #[test]
    fn compresses_large_note_heavy_input() {
        let mut raw: String = (0..500)
            .map(|index| {
                format!(
                    "src/file{}.py:{}: note: Standalone informational note that should be dropped\n",
                    index,
                    index + 1
                )
            })
            .collect();
        raw.push_str("src/a.py:10: error: Incompatible types in assignment  [assignment]\n");
        raw.push_str("Found 1 error in 1 file (checked 501 source files)\n");

        let result = compress_mypy(&raw);
        assert!(
            result.contains("src/a.py:10: error: Incompatible types in assignment  [assignment]")
        );
        assert!(result.contains("Found 1 error in 1 file"));
        assert!(result.len() < raw.len() / 2);
        assert!(!result.contains("Standalone informational"));
    }
}