#![cfg_attr(coverage_nightly, coverage(off))]
use anyhow::Result;
use std::path::Path;
use crate::models::error::TemplateError;
use crate::services::complexity::{ComplexityMetrics, FileComplexityMetrics, FunctionComplexity};
use crate::services::context::{AstItem, FileContext};
use crate::services::file_classifier::FileClassifier;
use crate::ast::languages::python::PythonStrategy;
use crate::ast::languages::LanguageStrategy;
/// Analyze a Python source file and return per-file complexity metrics.
///
/// Public entry point that forwards directly to the internal
/// implementation, passing the optional `classifier` through unchanged.
///
/// # Errors
/// Propagates `TemplateError::Io` when the file cannot be read and
/// `TemplateError::InvalidUtf8` when parsing fails (see the internal
/// implementation below).
#[provable_contracts_macros::contract("pmat-core.yaml", equation = "path_exists")]
pub async fn analyze_python_file_with_complexity(
    path: &Path,
    classifier: Option<&FileClassifier>,
) -> Result<FileComplexityMetrics, TemplateError> {
    analyze_python_file_with_complexity_and_classifier(path, classifier).await
}
/// Internal implementation: read `path`, parse it as Python, and build
/// `FileComplexityMetrics` from the resulting AST.
///
/// Note: the per-function entries are synthesized placeholders (sequential
/// names and 10-line spans); only the file-level cyclomatic/cognitive totals
/// and the line count reflect the actual source. The classifier is accepted
/// for interface parity but currently unused.
///
/// # Errors
/// `TemplateError::Io` when the file cannot be read;
/// `TemplateError::InvalidUtf8` when parsing fails (this variant doubles as
/// the generic parse-error carrier here).
async fn analyze_python_file_with_complexity_and_classifier(
    path: &Path,
    _classifier: Option<&FileClassifier>,
) -> Result<FileComplexityMetrics, TemplateError> {
    let content = tokio::fs::read_to_string(path)
        .await
        .map_err(TemplateError::Io)?;
    let strategy = PythonStrategy::new();
    let ast = strategy
        .parse_file(path, &content)
        .await
        .map_err(|e| TemplateError::InvalidUtf8(e.to_string()))?;

    // Placeholder per-function metrics: one entry per extracted function,
    // with synthetic names/spans until real per-function analysis exists.
    let functions = strategy.extract_functions(&ast);
    let mut function_metrics = Vec::with_capacity(functions.len());
    for (i, _node) in functions.iter().enumerate() {
        function_metrics.push(FunctionComplexity {
            name: format!("function_{i}"),
            line_start: (i * 10) as u32,
            line_end: ((i + 1) * 10) as u32,
            metrics: ComplexityMetrics {
                cyclomatic: 1,
                cognitive: 1,
                nesting_max: 0,
                lines: 10,
                halstead: None,
            },
        });
    }

    let (cyclomatic, cognitive) = strategy.calculate_complexity(&ast);
    // Saturate rather than silently truncate if totals exceed u16::MAX,
    // and report the file's actual line count instead of a hard-coded 100.
    let line_count = u16::try_from(content.lines().count()).unwrap_or(u16::MAX);
    Ok(FileComplexityMetrics {
        path: path.display().to_string(),
        total_complexity: ComplexityMetrics {
            cyclomatic: u16::try_from(cyclomatic).unwrap_or(u16::MAX),
            cognitive: u16::try_from(cognitive).unwrap_or(u16::MAX),
            nesting_max: 2, // placeholder until nesting depth is computed
            lines: line_count,
            halstead: None,
        },
        functions: function_metrics,
        classes: Vec::new(),
    })
}
/// Analyze a Python source file and return its `FileContext`
/// (the imports, functions, and classes discovered in its AST).
///
/// Convenience alias for `analyze_python_file_with_classifier` with no
/// classifier supplied.
///
/// # Errors
/// Propagates `TemplateError::Io` on read failure and
/// `TemplateError::InvalidUtf8` on parse failure.
#[provable_contracts_macros::contract("pmat-core.yaml", equation = "path_exists")]
pub async fn analyze_python_file(path: &Path) -> Result<FileContext, TemplateError> {
    analyze_python_file_with_classifier(path, None).await
}
/// Build a `FileContext` for a Python source file.
///
/// Reads and parses the file, then records one `AstItem` for each import,
/// function, and type the strategy extracts. Item names and line numbers
/// are synthetic placeholders derived from each item's position in its
/// extraction list. The classifier argument is accepted for interface
/// parity but currently unused.
///
/// # Errors
/// `TemplateError::Io` if the file cannot be read;
/// `TemplateError::InvalidUtf8` if parsing fails.
#[provable_contracts_macros::contract("pmat-core.yaml", equation = "path_exists")]
pub async fn analyze_python_file_with_classifier(
    path: &Path,
    _classifier: Option<&FileClassifier>,
) -> Result<FileContext, TemplateError> {
    let source = tokio::fs::read_to_string(path)
        .await
        .map_err(TemplateError::Io)?;
    let strategy = PythonStrategy::new();
    let ast = strategy
        .parse_file(path, &source)
        .await
        .map_err(|e| TemplateError::InvalidUtf8(e.to_string()))?;

    let functions = strategy.extract_functions(&ast);
    let types = strategy.extract_types(&ast);
    let imports = strategy.extract_imports(&ast);

    let mut items = Vec::with_capacity(imports.len() + functions.len() + types.len());
    // Imports first, mirroring typical Python file layout.
    items.extend(imports.iter().enumerate().map(|(idx, _)| AstItem::Import {
        module: format!("module_{idx}"),
        items: vec![],
        alias: None,
        line: idx * 2,
    }));
    items.extend(functions.iter().enumerate().map(|(idx, _)| AstItem::Function {
        name: format!("function_{idx}"),
        visibility: String::new(),
        is_async: false,
        line: idx * 10 + 20,
    }));
    // Classes are offset past the synthetic function region so the
    // placeholder line numbers stay monotonically ordered.
    items.extend(types.iter().enumerate().map(|(idx, _)| AstItem::Struct {
        name: format!("class_{idx}"),
        visibility: String::new(),
        fields_count: 0,
        derives: vec![],
        line: (functions.len() + idx) * 10 + 50,
    }));

    Ok(FileContext {
        path: path.display().to_string(),
        language: "python".to_string(),
        items,
        complexity_metrics: None,
    })
}
#[cfg_attr(coverage_nightly, coverage(off))]
#[cfg(test)]
mod property_tests {
    //! Smoke-level property tests: trivially-true properties that exercise
    //! the proptest harness and guard against module-level build breakage.
    use proptest::prelude::*;
    proptest! {
        // Accepts any string input; asserts nothing beyond harness stability.
        #[test]
        fn basic_property_stability(_input in ".*") {
            prop_assert!(true);
        }
        // Any u32 drawn from [0, 1000) trivially satisfies the bound.
        #[test]
        fn module_consistency_check(_x in 0u32..1000) {
            prop_assert!(_x < 1001);
        }
    }
}
#[cfg(test)]
mod ast_python_compat_tests {
    //! Compatibility tests for the public Python analysis entry points.
    use super::*;

    /// Write `body` to a file named `name` inside `tmp`, returning its path.
    fn write_py(tmp: &tempfile::TempDir, name: &str, body: &str) -> std::path::PathBuf {
        let path = tmp.path().join(name);
        std::fs::write(&path, body).unwrap();
        path
    }

    /// Small but representative Python source: imports, a class, a simple
    /// function, and a function containing a branch.
    const SAMPLE: &str = r#"
import os
from collections import defaultdict
class MyClass:
    pass
def hello():
    return "world"
def another():
    if True:
        return 1
    return 2
"#;

    /// The complexity result reports the analyzed file's path verbatim.
    #[tokio::test]
    async fn test_analyze_python_file_with_complexity_returns_metrics() {
        let tmp = tempfile::tempdir().unwrap();
        let p = write_py(&tmp, "a.py", SAMPLE);
        let metrics = analyze_python_file_with_complexity(&p, None).await.unwrap();
        assert_eq!(metrics.path, p.to_string_lossy());
    }

    /// Supplying a classifier must not change the success path.
    #[tokio::test]
    async fn test_analyze_python_file_with_complexity_with_classifier() {
        let tmp = tempfile::tempdir().unwrap();
        let p = write_py(&tmp, "b.py", SAMPLE);
        let classifier = FileClassifier::default();
        let _metrics = analyze_python_file_with_complexity(&p, Some(&classifier))
            .await
            .unwrap();
    }

    /// A populated sample must yield at least one AST item.
    #[tokio::test]
    async fn test_analyze_python_file_returns_context() {
        let tmp = tempfile::tempdir().unwrap();
        let p = write_py(&tmp, "c.py", SAMPLE);
        let ctx = analyze_python_file(&p).await.unwrap();
        assert!(!ctx.items.is_empty(), "items must populate from sample");
    }

    /// The classifier-taking variant behaves like the plain entry point.
    #[tokio::test]
    async fn test_analyze_python_file_with_classifier_path_alias() {
        let tmp = tempfile::tempdir().unwrap();
        let p = write_py(&tmp, "d.py", SAMPLE);
        let classifier = FileClassifier::default();
        let ctx = analyze_python_file_with_classifier(&p, Some(&classifier))
            .await
            .unwrap();
        assert!(!ctx.items.is_empty());
    }

    /// An empty source file analyzes without panicking.
    #[tokio::test]
    async fn test_analyze_python_file_empty_source_no_panic() {
        let tmp = tempfile::tempdir().unwrap();
        let p = write_py(&tmp, "empty.py", "");
        let _ = analyze_python_file(&p).await.unwrap();
    }

    /// A nonexistent path must surface as an error. A path joined into a
    /// fresh tempdir is guaranteed absent, unlike the previous fixed
    /// `/tmp/...` name, which could collide with a pre-existing file and
    /// was non-portable.
    #[tokio::test]
    async fn test_analyze_python_file_missing_file_returns_err() {
        let tmp = tempfile::tempdir().unwrap();
        let p = tmp.path().join("pmat_missing_python_xyz.py");
        let result = analyze_python_file(&p).await;
        assert!(result.is_err(), "missing file must propagate as error");
    }

    /// Same missing-file guarantee for the complexity entry point.
    #[tokio::test]
    async fn test_analyze_python_file_with_complexity_missing_file_returns_err() {
        let tmp = tempfile::tempdir().unwrap();
        let p = tmp.path().join("pmat_missing_complexity_xyz.py");
        let result = analyze_python_file_with_complexity(&p, None).await;
        assert!(result.is_err());
    }
}