use std::collections::HashMap;
use std::path::{Path, PathBuf};
use anyhow::Result;
use clap::Args;
use tree_sitter::{Node, Parser, Tree};
use tree_sitter_python::LANGUAGE as PYTHON_LANGUAGE;
use walkdir::WalkDir;
use crate::output::{OutputFormat, OutputWriter};
use super::error::{ContractsError, ContractsResult};
use super::types::{
Confidence, ExceptionSpec, FunctionSpecs, InputOutputSpec,
OutputFormat as ContractsOutputFormat, PropertySpec, SpecsByType, SpecsReport, SpecsSummary,
};
use super::validation::{check_ast_depth, read_file_safe, validate_file_path};
const MAX_LITERAL_DEPTH: usize = 10;
const MAX_LITERAL_SIZE: usize = 10_000;
#[derive(Debug, Args)]
pub struct SpecsArgs {
#[arg(long = "from-tests", short = 't')]
pub from_tests: PathBuf,
#[arg(
long = "output-format",
short = 'o',
hide = true,
default_value = "json"
)]
pub output_format: ContractsOutputFormat,
#[arg(long)]
pub function: Option<String>,
#[arg(long)]
pub source: Option<PathBuf>,
}
impl SpecsArgs {
pub fn run(&self, format: OutputFormat, quiet: bool) -> Result<()> {
let writer = OutputWriter::new(format, quiet);
if !self.from_tests.exists() {
return Err(ContractsError::TestPathNotFound {
path: self.from_tests.clone(),
}
.into());
}
writer.progress(&format!(
"Extracting specs from {}...",
self.from_tests.display()
));
let report = run_specs(&self.from_tests, self.function.as_deref())?;
let use_text = matches!(self.output_format, ContractsOutputFormat::Text)
|| matches!(format, OutputFormat::Text);
if use_text {
let text = format_specs_text(&report);
writer.write_text(&text)?;
} else {
writer.write(&report)?;
}
Ok(())
}
}
pub fn run_specs(test_path: &Path, function_filter: Option<&str>) -> ContractsResult<SpecsReport> {
let mut all_specs: HashMap<String, FunctionSpecs> = HashMap::new();
let mut test_functions_scanned = 0u32;
let mut test_files_scanned = 0u32;
if test_path.is_file() {
let file_report = extract_from_test_file(test_path)?;
test_files_scanned = 1;
test_functions_scanned = file_report.test_functions_scanned;
merge_specs(&mut all_specs, file_report.functions);
} else {
for entry in WalkDir::new(test_path)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| {
e.path().is_file()
&& e.file_name()
.to_str()
.is_some_and(|n| n.starts_with("test_") && n.ends_with(".py"))
})
{
let file_path = entry.path();
match extract_from_test_file(file_path) {
Ok(file_report) => {
test_files_scanned += 1;
test_functions_scanned += file_report.test_functions_scanned;
merge_specs(&mut all_specs, file_report.functions);
}
Err(e) => {
eprintln!("Warning: Failed to parse {}: {}", file_path.display(), e);
}
}
}
}
let mut functions: Vec<FunctionSpecs> = all_specs.into_values().collect();
if let Some(filter) = function_filter {
functions.retain(|f| f.function_name == filter);
}
functions.sort_by(|a, b| a.function_name.cmp(&b.function_name));
let total_io = functions
.iter()
.map(|f| f.input_output_specs.len() as u32)
.sum();
let total_exc = functions
.iter()
.map(|f| f.exception_specs.len() as u32)
.sum();
let total_prop = functions
.iter()
.map(|f| f.property_specs.len() as u32)
.sum();
let total_specs = total_io + total_exc + total_prop;
let summary = SpecsSummary {
total_specs,
by_type: SpecsByType {
input_output: total_io,
exception: total_exc,
property: total_prop,
},
test_functions_scanned,
test_files_scanned,
functions_found: functions.len() as u32,
};
Ok(SpecsReport { functions, summary })
}
struct FileSpecReport {
functions: Vec<FunctionSpecs>,
test_functions_scanned: u32,
}
fn extract_from_test_file(path: &Path) -> ContractsResult<FileSpecReport> {
let canonical = validate_file_path(path)?;
let source = read_file_safe(&canonical)?;
if source.trim().is_empty() {
return Ok(FileSpecReport {
functions: vec![],
test_functions_scanned: 0,
});
}
let tree = parse_python(&source, &canonical)?;
let root = tree.root_node();
let mut specs: HashMap<String, FunctionSpecs> = HashMap::new();
let mut test_func_count = 0u32;
let mut cursor = root.walk();
for child in root.children(&mut cursor) {
match child.kind() {
"function_definition" => {
if let Some(name_node) = child.child_by_field_name("name") {
let name = get_node_text(name_node, source.as_bytes());
if name.starts_with("test_") {
test_func_count += 1;
process_test_function(child, name, source.as_bytes(), &mut specs, 0)?;
}
}
}
"class_definition" => {
if let Some(name_node) = child.child_by_field_name("name") {
let class_name = get_node_text(name_node, source.as_bytes());
if class_name.starts_with("Test") {
if let Some(body) = child.child_by_field_name("body") {
let mut class_cursor = body.walk();
for method in body.children(&mut class_cursor) {
if method.kind() == "function_definition" {
if let Some(method_name) = method.child_by_field_name("name") {
let mname = get_node_text(method_name, source.as_bytes());
if mname.starts_with("test_") {
test_func_count += 1;
process_test_function(
method,
mname,
source.as_bytes(),
&mut specs,
0,
)?;
}
}
}
}
}
}
}
}
_ => {}
}
}
let functions: Vec<FunctionSpecs> = specs
.into_values()
.map(|mut fs| {
fs.summary = generate_summary(&fs);
fs
})
.collect();
Ok(FileSpecReport {
functions,
test_functions_scanned: test_func_count,
})
}
fn parse_python(source: &str, file: &Path) -> ContractsResult<Tree> {
let mut parser = Parser::new();
parser
.set_language(&PYTHON_LANGUAGE.into())
.map_err(|e| ContractsError::ParseError {
file: file.to_path_buf(),
message: format!("Failed to set Python language: {}", e),
})?;
parser
.parse(source, None)
.ok_or_else(|| ContractsError::ParseError {
file: file.to_path_buf(),
message: "Parsing returned None".to_string(),
})
}
fn get_node_text<'a>(node: Node<'a>, source: &'a [u8]) -> &'a str {
let start = node.start_byte();
let end = node.end_byte();
if end <= source.len() {
std::str::from_utf8(&source[start..end]).unwrap_or("")
} else {
""
}
}
fn process_test_function(
func: Node,
test_func_name: &str,
source: &[u8],
specs: &mut HashMap<String, FunctionSpecs>,
depth: usize,
) -> ContractsResult<()> {
check_ast_depth(depth, &PathBuf::from("<test>"))?;
let body = match func.child_by_field_name("body") {
Some(b) => b,
None => return Ok(()),
};
let mut cursor = body.walk();
for stmt in body.children(&mut cursor) {
match stmt.kind() {
"assert_statement" => {
extract_from_assert(stmt, test_func_name, source, specs)?;
}
"with_statement" => {
extract_from_with(stmt, test_func_name, source, specs)?;
}
"expression_statement" => {
let mut inner = stmt.walk();
for child in stmt.children(&mut inner) {
if child.kind() == "assert_statement" {
extract_from_assert(child, test_func_name, source, specs)?;
}
}
}
_ => {}
}
}
Ok(())
}
fn extract_from_assert(
assert_stmt: Node,
test_func_name: &str,
source: &[u8],
specs: &mut HashMap<String, FunctionSpecs>,
) -> ContractsResult<()> {
let line = assert_stmt.start_position().row as u32 + 1;
let mut cursor = assert_stmt.walk();
let mut test_expr = None;
for child in assert_stmt.children(&mut cursor) {
if child.kind() != "assert" {
test_expr = Some(child);
break;
}
}
let test_expr = match test_expr {
Some(e) => e,
None => return Ok(()),
};
if try_extract_isinstance_spec(test_expr, test_func_name, line, source, specs) {
return Ok(());
}
if try_extract_comparison_spec(test_expr, test_func_name, line, source, specs) {
return Ok(());
}
Ok(())
}
fn try_extract_isinstance_spec(
expr: Node,
test_func_name: &str,
line: u32,
source: &[u8],
specs: &mut HashMap<String, FunctionSpecs>,
) -> bool {
if expr.kind() != "call" {
return false;
}
let func_node = match expr.child_by_field_name("function") {
Some(f) => f,
None => return false,
};
let func_name = get_node_text(func_node, source);
if func_name != "isinstance" {
return false;
}
let args = match expr.child_by_field_name("arguments") {
Some(a) => a,
None => return false,
};
let mut arg_cursor = args.walk();
let mut first_arg = None;
let mut second_arg = None;
for child in args.children(&mut arg_cursor) {
let kind = child.kind();
if kind == "(" || kind == ")" || kind == "," {
continue;
}
if first_arg.is_none() {
first_arg = Some(child);
} else if second_arg.is_none() {
second_arg = Some(child);
break;
}
}
let (first_arg, second_arg) = match (first_arg, second_arg) {
(Some(f), Some(s)) => (f, s),
_ => return false,
};
if first_arg.kind() != "call" {
return false;
}
let (fname, _inputs) = match extract_call_info(first_arg, source) {
Some(info) => info,
None => return false,
};
let type_name = get_node_text(second_arg, source);
let constraint = format!("isinstance(result, {})", type_name);
let fs = specs.entry(fname.clone()).or_insert_with(|| FunctionSpecs {
function_name: fname.clone(),
summary: String::new(),
test_count: 0,
input_output_specs: vec![],
exception_specs: vec![],
property_specs: vec![],
});
fs.property_specs.push(PropertySpec {
function: fname,
property_type: "type".to_string(),
constraint,
test_function: test_func_name.to_string(),
line,
confidence: Confidence::High,
});
true
}
fn try_extract_comparison_spec(
expr: Node,
test_func_name: &str,
line: u32,
source: &[u8],
specs: &mut HashMap<String, FunctionSpecs>,
) -> bool {
if expr.kind() != "comparison_operator" {
return false;
}
let mut cursor = expr.walk();
let mut left = None;
let mut op: Option<&str> = None;
let mut right = None;
for child in expr.children(&mut cursor) {
let kind = child.kind();
match kind {
"==" | "!=" | "<" | ">" | "<=" | ">=" => {
op = Some(kind);
}
"in" | "not in" => {
op = Some(kind);
}
"is" | "is not" => {
op = Some(kind);
}
_ => {
if left.is_none() {
left = Some(child);
} else if right.is_none() {
right = Some(child);
}
}
}
}
let (left, op, right) = match (left, op, right) {
(Some(l), Some(o), Some(r)) => (l, o, r),
_ => return false,
};
if op == "in" && right.kind() == "call" {
if let Some((fname, _)) = extract_call_info(right, source) {
let key_text = get_node_text(left, source);
let constraint = format!("{} in result", key_text);
let fs = specs.entry(fname.clone()).or_insert_with(|| FunctionSpecs {
function_name: fname.clone(),
summary: String::new(),
test_count: 0,
input_output_specs: vec![],
exception_specs: vec![],
property_specs: vec![],
});
fs.property_specs.push(PropertySpec {
function: fname,
property_type: "membership".to_string(),
constraint,
test_function: test_func_name.to_string(),
line,
confidence: Confidence::Medium,
});
return true;
}
}
if op == "==" {
if left.kind() == "call" {
let left_func = left
.child_by_field_name("function")
.map(|f| get_node_text(f, source));
if left_func == Some("len") {
if let Some(inner_args) = left.child_by_field_name("arguments") {
let mut inner_cursor = inner_args.walk();
for child in inner_args.children(&mut inner_cursor) {
if child.kind() == "call" {
if let Some((fname, _)) = extract_call_info(child, source) {
let len_val = get_node_text(right, source);
let constraint = format!("len(result) == {}", len_val);
let fs =
specs.entry(fname.clone()).or_insert_with(|| FunctionSpecs {
function_name: fname.clone(),
summary: String::new(),
test_count: 0,
input_output_specs: vec![],
exception_specs: vec![],
property_specs: vec![],
});
fs.property_specs.push(PropertySpec {
function: fname,
property_type: "length".to_string(),
constraint,
test_function: test_func_name.to_string(),
line,
confidence: Confidence::High,
});
return true;
}
}
}
}
}
}
if left.kind() == "call" {
if let Some((fname, inputs)) = extract_call_info(left, source) {
let output = try_eval_literal(right, source);
let fs = specs.entry(fname.clone()).or_insert_with(|| FunctionSpecs {
function_name: fname.clone(),
summary: String::new(),
test_count: 0,
input_output_specs: vec![],
exception_specs: vec![],
property_specs: vec![],
});
fs.input_output_specs.push(InputOutputSpec {
function: fname,
inputs,
output,
test_function: test_func_name.to_string(),
line,
confidence: Confidence::High,
});
return true;
}
}
if right.kind() == "call" {
if let Some((fname, inputs)) = extract_call_info(right, source) {
let output = try_eval_literal(left, source);
let fs = specs.entry(fname.clone()).or_insert_with(|| FunctionSpecs {
function_name: fname.clone(),
summary: String::new(),
test_count: 0,
input_output_specs: vec![],
exception_specs: vec![],
property_specs: vec![],
});
fs.input_output_specs.push(InputOutputSpec {
function: fname,
inputs,
output,
test_function: test_func_name.to_string(),
line,
confidence: Confidence::High,
});
return true;
}
}
}
if matches!(op, "<" | ">" | "<=" | ">=") {
let (call_side, value_side) = if left.kind() == "call" {
(left, right)
} else if right.kind() == "call" {
(right, left)
} else {
return false;
};
let call_func_name = call_side
.child_by_field_name("function")
.map(|f| get_node_text(f, source));
if call_func_name == Some("len") {
if let Some(inner_args) = call_side.child_by_field_name("arguments") {
let mut inner_cursor = inner_args.walk();
for child in inner_args.children(&mut inner_cursor) {
if child.kind() == "call" {
if let Some((fname, _)) = extract_call_info(child, source) {
let val = get_node_text(value_side, source);
let constraint = format!("len(result) {} {}", op, val);
let fs = specs.entry(fname.clone()).or_insert_with(|| FunctionSpecs {
function_name: fname.clone(),
summary: String::new(),
test_count: 0,
input_output_specs: vec![],
exception_specs: vec![],
property_specs: vec![],
});
fs.property_specs.push(PropertySpec {
function: fname,
property_type: "length".to_string(),
constraint,
test_function: test_func_name.to_string(),
line,
confidence: Confidence::Medium,
});
return true;
}
}
}
}
}
if let Some((fname, _)) = extract_call_info(call_side, source) {
let val = get_node_text(value_side, source);
let constraint = format!("result {} {}", op, val);
let fs = specs.entry(fname.clone()).or_insert_with(|| FunctionSpecs {
function_name: fname.clone(),
summary: String::new(),
test_count: 0,
input_output_specs: vec![],
exception_specs: vec![],
property_specs: vec![],
});
fs.property_specs.push(PropertySpec {
function: fname,
property_type: "bounds".to_string(),
constraint,
test_function: test_func_name.to_string(),
line,
confidence: Confidence::Medium,
});
return true;
}
}
false
}
fn extract_from_with(
with_stmt: Node,
test_func_name: &str,
source: &[u8],
specs: &mut HashMap<String, FunctionSpecs>,
) -> ContractsResult<()> {
let line = with_stmt.start_position().row as u32 + 1;
let mut cursor = with_stmt.walk();
let mut is_raises = false;
let mut exception_type = String::new();
let mut match_pattern: Option<String> = None;
for child in with_stmt.children(&mut cursor) {
if child.kind() == "with_clause" {
let mut clause_cursor = child.walk();
for clause_child in child.children(&mut clause_cursor) {
if clause_child.kind() == "with_item" {
if let Some(ctx_expr) = clause_child.child(0) {
if ctx_expr.kind() == "call" {
let func_text = ctx_expr
.child_by_field_name("function")
.map(|f| get_node_text(f, source))
.unwrap_or("");
if func_text == "raises" || func_text.ends_with(".raises") {
is_raises = true;
if let Some(args) = ctx_expr.child_by_field_name("arguments") {
let mut arg_cursor = args.walk();
for arg in args.children(&mut arg_cursor) {
let kind = arg.kind();
if kind == "(" || kind == ")" || kind == "," {
continue;
}
if kind == "keyword_argument" {
if let Some(key) = arg.child_by_field_name("name") {
if get_node_text(key, source) == "match" {
if let Some(val) =
arg.child_by_field_name("value")
{
let val_text = get_node_text(val, source);
match_pattern = Some(
val_text
.trim_matches('"')
.trim_matches('\'')
.to_string(),
);
}
}
}
} else if exception_type.is_empty() {
exception_type = get_node_text(arg, source).to_string();
}
}
}
}
}
}
}
}
}
}
if !is_raises || exception_type.is_empty() {
return Ok(());
}
let body = match with_stmt.child_by_field_name("body") {
Some(b) => b,
None => return Ok(()),
};
find_calls_and_add_exception_specs(
body,
source,
specs,
&exception_type,
&match_pattern,
test_func_name,
line,
);
Ok(())
}
fn find_calls_and_add_exception_specs(
block: Node,
source: &[u8],
specs: &mut HashMap<String, FunctionSpecs>,
exception_type: &str,
match_pattern: &Option<String>,
test_func_name: &str,
line: u32,
) {
let mut cursor = block.walk();
for child in block.children(&mut cursor) {
if child.kind() == "call" {
if let Some((fname, inputs)) = extract_call_info(child, source) {
let fs = specs.entry(fname.clone()).or_insert_with(|| FunctionSpecs {
function_name: fname.clone(),
summary: String::new(),
test_count: 0,
input_output_specs: vec![],
exception_specs: vec![],
property_specs: vec![],
});
fs.exception_specs.push(ExceptionSpec {
function: fname,
inputs,
exception_type: exception_type.to_string(),
match_pattern: match_pattern.clone(),
test_function: test_func_name.to_string(),
line,
confidence: Confidence::High,
});
}
}
if child.child_count() > 0 {
find_calls_and_add_exception_specs(
child,
source,
specs,
exception_type,
match_pattern,
test_func_name,
line,
);
}
}
}
fn extract_call_info(call: Node, source: &[u8]) -> Option<(String, Vec<serde_json::Value>)> {
let func_node = call.child_by_field_name("function")?;
let func_name = match func_node.kind() {
"identifier" => get_node_text(func_node, source).to_string(),
"attribute" => {
func_node
.child_by_field_name("attribute")
.map(|a| get_node_text(a, source).to_string())?
}
_ => return None,
};
if matches!(
func_name.as_str(),
"len"
| "str"
| "int"
| "float"
| "bool"
| "list"
| "dict"
| "set"
| "tuple"
| "isinstance"
| "hasattr"
| "getattr"
| "print"
| "range"
| "type"
) {
return None;
}
let args_node = call.child_by_field_name("arguments")?;
let mut inputs = Vec::new();
let mut cursor = args_node.walk();
for child in args_node.children(&mut cursor) {
let kind = child.kind();
if kind == "(" || kind == ")" || kind == "," {
continue;
}
if kind == "keyword_argument" {
continue;
}
inputs.push(try_eval_literal(child, source));
}
Some((func_name, inputs))
}
fn try_eval_literal(node: Node, source: &[u8]) -> serde_json::Value {
try_eval_literal_inner(node, source, 0)
}
fn try_eval_literal_inner(node: Node, source: &[u8], depth: usize) -> serde_json::Value {
if depth > MAX_LITERAL_DEPTH {
return serde_json::Value::String(get_node_text(node, source).to_string());
}
let text = get_node_text(node, source);
if text.len() > MAX_LITERAL_SIZE {
return serde_json::Value::String("<large literal>".to_string());
}
match node.kind() {
"integer" => text
.parse::<i64>()
.map(serde_json::Value::from)
.unwrap_or_else(|_| serde_json::Value::String(text.to_string())),
"float" => text
.parse::<f64>()
.map(|f| serde_json::json!(f))
.unwrap_or_else(|_| serde_json::Value::String(text.to_string())),
"string" | "concatenated_string" => {
let unquoted = strip_string_quotes(text);
serde_json::Value::String(unquoted)
}
"true" | "True" => serde_json::Value::Bool(true),
"false" | "False" => serde_json::Value::Bool(false),
"none" | "None" => serde_json::Value::Null,
"identifier" => {
match text {
"True" => serde_json::Value::Bool(true),
"False" => serde_json::Value::Bool(false),
"None" => serde_json::Value::Null,
_ => serde_json::Value::String(text.to_string()),
}
}
"list" => {
let mut items = Vec::new();
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
let kind = child.kind();
if kind != "[" && kind != "]" && kind != "," {
items.push(try_eval_literal_inner(child, source, depth + 1));
}
}
serde_json::Value::Array(items)
}
"tuple" | "parenthesized_expression" => {
let mut items = Vec::new();
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
let kind = child.kind();
if kind != "(" && kind != ")" && kind != "," {
items.push(try_eval_literal_inner(child, source, depth + 1));
}
}
if items.len() == 1 && node.kind() == "parenthesized_expression" {
items.into_iter().next().unwrap_or(serde_json::Value::Null)
} else {
serde_json::Value::Array(items)
}
}
"dictionary" => {
let mut obj = serde_json::Map::new();
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
if child.kind() == "pair" {
let key_node = child.child_by_field_name("key");
let value_node = child.child_by_field_name("value");
if let (Some(k), Some(v)) = (key_node, value_node) {
let key = match try_eval_literal_inner(k, source, depth + 1) {
serde_json::Value::String(s) => s,
other => other.to_string(),
};
let value = try_eval_literal_inner(v, source, depth + 1);
obj.insert(key, value);
}
}
}
serde_json::Value::Object(obj)
}
"unary_operator" => {
let mut cursor = node.walk();
let mut op = "";
let mut operand = None;
for child in node.children(&mut cursor) {
if child.kind() == "-" {
op = "-";
} else if child.kind() == "+" {
op = "+";
} else {
operand = Some(child);
}
}
if op == "-" {
if let Some(operand) = operand {
let val = try_eval_literal_inner(operand, source, depth + 1);
if let serde_json::Value::Number(n) = val {
if let Some(i) = n.as_i64() {
return serde_json::json!(-i);
}
if let Some(f) = n.as_f64() {
return serde_json::json!(-f);
}
}
}
}
serde_json::Value::String(text.to_string())
}
_ => {
serde_json::Value::String(text.to_string())
}
}
}
fn strip_string_quotes(s: &str) -> String {
let s = s.trim();
let s = s
.strip_prefix('r')
.or_else(|| s.strip_prefix('R'))
.unwrap_or(s);
let s = s
.strip_prefix('b')
.or_else(|| s.strip_prefix('B'))
.unwrap_or(s);
let s = s
.strip_prefix('f')
.or_else(|| s.strip_prefix('F'))
.unwrap_or(s);
if s.starts_with("\"\"\"") && s.ends_with("\"\"\"") && s.len() >= 6 {
return s[3..s.len() - 3].to_string();
}
if s.starts_with("'''") && s.ends_with("'''") && s.len() >= 6 {
return s[3..s.len() - 3].to_string();
}
if ((s.starts_with('"') && s.ends_with('"')) || (s.starts_with('\'') && s.ends_with('\'')))
&& s.len() >= 2
{
return s[1..s.len() - 1].to_string();
}
s.to_string()
}
fn merge_specs(all_specs: &mut HashMap<String, FunctionSpecs>, new_specs: Vec<FunctionSpecs>) {
for new_fs in new_specs {
let entry = all_specs
.entry(new_fs.function_name.clone())
.or_insert_with(|| FunctionSpecs {
function_name: new_fs.function_name.clone(),
summary: String::new(),
test_count: 0,
input_output_specs: vec![],
exception_specs: vec![],
property_specs: vec![],
});
entry.input_output_specs.extend(new_fs.input_output_specs);
entry.exception_specs.extend(new_fs.exception_specs);
entry.property_specs.extend(new_fs.property_specs);
entry.test_count += new_fs.test_count;
}
}
fn generate_summary(fs: &FunctionSpecs) -> String {
let io_count = fs.input_output_specs.len();
let exc_count = fs.exception_specs.len();
let prop_count = fs.property_specs.len();
let mut parts = Vec::new();
if io_count > 0 {
parts.push(format!("{} input/output", io_count));
}
if exc_count > 0 {
parts.push(format!("{} raises", exc_count));
}
if prop_count > 0 {
parts.push(format!("{} property", prop_count));
}
if parts.is_empty() {
"no specs".to_string()
} else {
parts.join(", ")
}
}
pub fn format_specs_text(report: &SpecsReport) -> String {
let mut output = String::new();
for func in &report.functions {
output.push_str(&format!("Function: {}\n", func.function_name));
for spec in &func.input_output_specs {
let inputs_str: Vec<String> = spec.inputs.iter().map(|v| format!("{}", v)).collect();
output.push_str(&format!(
" IO: {}({}) == {}\n",
func.function_name,
inputs_str.join(", "),
spec.output
));
}
for spec in &func.exception_specs {
if let Some(pattern) = &spec.match_pattern {
output.push_str(&format!(
" Raises: {} (match='{}')\n",
spec.exception_type, pattern
));
} else {
output.push_str(&format!(" Raises: {}\n", spec.exception_type));
}
}
for spec in &func.property_specs {
output.push_str(&format!(
" Property ({}): {}\n",
spec.property_type, spec.constraint
));
}
output.push('\n');
}
output.push_str(&format!("Total specs: {}\n", report.summary.total_specs));
output
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use tempfile::TempDir;
const PYTHON_TEST_FILE: &str = r#"
import pytest
def test_add_basic():
assert add(1, 2) == 3
assert add(0, 0) == 0
assert add(-1, 1) == 0
def test_add_large():
assert add(100, 200) == 300
def test_divide_by_zero():
with pytest.raises(ZeroDivisionError):
divide(1, 0)
def test_validate_raises_with_match():
with pytest.raises(ValueError, match="invalid"):
validate(-1)
def test_result_type():
# Direct call pattern for type check
assert isinstance(multiply(2, 3), int)
def test_result_length():
# Direct call pattern for length check
assert len(get_items()) == 3
def test_result_bounds():
# Direct call pattern for bounds check
assert compute_value() > 0
def test_membership():
# Direct call pattern for membership check
assert "key" in get_config()
"#;
#[test]
fn test_specs_input_output_extraction() {
let temp = TempDir::new().unwrap();
let test_path = temp.path().join("test_module.py");
fs::write(&test_path, PYTHON_TEST_FILE).unwrap();
let report = run_specs(&test_path, None).unwrap();
let add_func = report.functions.iter().find(|f| f.function_name == "add");
assert!(add_func.is_some(), "Should find 'add' function");
let add = add_func.unwrap();
assert!(
add.input_output_specs.len() >= 3,
"Should extract at least 3 IO specs for add, got {}",
add.input_output_specs.len()
);
}
#[test]
fn test_specs_exception_extraction() {
let temp = TempDir::new().unwrap();
let test_path = temp.path().join("test_module.py");
fs::write(&test_path, PYTHON_TEST_FILE).unwrap();
let report = run_specs(&test_path, None).unwrap();
let divide_func = report
.functions
.iter()
.find(|f| f.function_name == "divide");
assert!(divide_func.is_some(), "Should find 'divide' function");
let divide = divide_func.unwrap();
assert!(
!divide.exception_specs.is_empty(),
"Should extract exception specs for divide"
);
assert_eq!(
divide.exception_specs[0].exception_type,
"ZeroDivisionError"
);
}
#[test]
fn test_specs_exception_with_match() {
let temp = TempDir::new().unwrap();
let test_path = temp.path().join("test_module.py");
fs::write(&test_path, PYTHON_TEST_FILE).unwrap();
let report = run_specs(&test_path, None).unwrap();
let validate_func = report
.functions
.iter()
.find(|f| f.function_name == "validate");
assert!(validate_func.is_some(), "Should find 'validate' function");
let validate = validate_func.unwrap();
assert!(!validate.exception_specs.is_empty());
assert!(validate.exception_specs[0].match_pattern.is_some());
assert_eq!(
validate.exception_specs[0].match_pattern.as_ref().unwrap(),
"invalid"
);
}
#[test]
fn test_specs_property_type_extraction() {
let temp = TempDir::new().unwrap();
let test_path = temp.path().join("test_module.py");
fs::write(&test_path, PYTHON_TEST_FILE).unwrap();
let report = run_specs(&test_path, None).unwrap();
let multiply_func = report
.functions
.iter()
.find(|f| f.function_name == "multiply");
assert!(multiply_func.is_some(), "Should find 'multiply' function");
let multiply = multiply_func.unwrap();
let type_prop = multiply
.property_specs
.iter()
.find(|p| p.property_type == "type");
assert!(type_prop.is_some(), "Should extract type property");
assert!(type_prop.unwrap().constraint.contains("isinstance"));
}
#[test]
fn test_specs_property_length_extraction() {
let temp = TempDir::new().unwrap();
let test_path = temp.path().join("test_module.py");
fs::write(&test_path, PYTHON_TEST_FILE).unwrap();
let report = run_specs(&test_path, None).unwrap();
let get_items = report
.functions
.iter()
.find(|f| f.function_name == "get_items");
assert!(get_items.is_some(), "Should find 'get_items' function");
let get_items = get_items.unwrap();
let len_prop = get_items
.property_specs
.iter()
.find(|p| p.property_type == "length");
assert!(len_prop.is_some(), "Should extract length property");
}
#[test]
fn test_specs_property_bounds_extraction() {
let temp = TempDir::new().unwrap();
let test_path = temp.path().join("test_module.py");
fs::write(&test_path, PYTHON_TEST_FILE).unwrap();
let report = run_specs(&test_path, None).unwrap();
let compute = report
.functions
.iter()
.find(|f| f.function_name == "compute_value");
assert!(compute.is_some(), "Should find 'compute_value' function");
let compute = compute.unwrap();
let bounds_prop = compute
.property_specs
.iter()
.find(|p| p.property_type == "bounds");
assert!(bounds_prop.is_some(), "Should extract bounds property");
}
#[test]
fn test_specs_property_membership_extraction() {
let temp = TempDir::new().unwrap();
let test_path = temp.path().join("test_module.py");
fs::write(&test_path, PYTHON_TEST_FILE).unwrap();
let report = run_specs(&test_path, None).unwrap();
let get_config = report
.functions
.iter()
.find(|f| f.function_name == "get_config");
assert!(get_config.is_some(), "Should find 'get_config' function");
let get_config = get_config.unwrap();
let member_prop = get_config
.property_specs
.iter()
.find(|p| p.property_type == "membership");
assert!(member_prop.is_some(), "Should extract membership property");
}
#[test]
fn test_specs_function_filter() {
let temp = TempDir::new().unwrap();
let test_path = temp.path().join("test_module.py");
fs::write(&test_path, PYTHON_TEST_FILE).unwrap();
let report = run_specs(&test_path, Some("add")).unwrap();
assert_eq!(report.functions.len(), 1);
assert_eq!(report.functions[0].function_name, "add");
}
#[test]
fn test_specs_directory_scan() {
let temp = TempDir::new().unwrap();
let test1 = temp.path().join("test_one.py");
fs::write(&test1, "def test_foo():\n assert foo(1) == 2\n").unwrap();
let test2 = temp.path().join("test_two.py");
fs::write(&test2, "def test_bar():\n assert bar(3) == 4\n").unwrap();
let report = run_specs(temp.path(), None).unwrap();
assert_eq!(report.summary.test_files_scanned, 2);
assert!(report.functions.iter().any(|f| f.function_name == "foo"));
assert!(report.functions.iter().any(|f| f.function_name == "bar"));
}
#[test]
fn test_specs_json_output() {
let temp = TempDir::new().unwrap();
let test_path = temp.path().join("test_module.py");
fs::write(&test_path, PYTHON_TEST_FILE).unwrap();
let report = run_specs(&test_path, None).unwrap();
let json = serde_json::to_string(&report).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert!(parsed.get("functions").is_some());
assert!(parsed.get("summary").is_some());
}
#[test]
fn test_specs_text_output() {
let temp = TempDir::new().unwrap();
let test_path = temp.path().join("test_module.py");
fs::write(&test_path, PYTHON_TEST_FILE).unwrap();
let report = run_specs(&test_path, None).unwrap();
let text = format_specs_text(&report);
assert!(text.contains("Function:"));
assert!(text.contains("Total specs:"));
}
#[test]
fn test_specs_test_path_not_found() {
let _args = SpecsArgs {
from_tests: PathBuf::from("/nonexistent/test_path"),
output_format: ContractsOutputFormat::Json,
function: None,
source: None,
};
let path = Path::new("/nonexistent/test_path");
assert!(!path.exists(), "Path should not exist for this test");
}
#[test]
fn test_specs_empty_directory() {
let temp = TempDir::new().unwrap();
let report = run_specs(temp.path(), None).unwrap();
assert_eq!(report.summary.test_files_scanned, 0);
assert_eq!(report.summary.total_specs, 0);
}
#[test]
fn test_specs_summary_counts() {
let temp = TempDir::new().unwrap();
let test_path = temp.path().join("test_module.py");
fs::write(&test_path, PYTHON_TEST_FILE).unwrap();
let report = run_specs(&test_path, None).unwrap();
assert!(report.summary.total_specs > 0);
assert!(report.summary.by_type.input_output > 0);
assert!(report.summary.test_functions_scanned > 0);
assert_eq!(report.summary.test_files_scanned, 1);
}
fn parse_and_get_expr(source: &str) -> (Tree, Vec<u8>) {
let mut parser = Parser::new();
parser.set_language(&PYTHON_LANGUAGE.into()).unwrap();
let tree = parser.parse(source, None).unwrap();
(tree, source.as_bytes().to_vec())
}
fn find_expr_node(node: Node) -> Node {
if node.kind() == "expression_statement" {
if let Some(child) = node.child(0) {
return child;
}
}
node
}
#[test]
fn test_literal_eval_integers() {
let (tree, source) = parse_and_get_expr("42");
let root = tree.root_node();
let expr = find_expr_node(root.child(0).unwrap());
let val = try_eval_literal(expr, &source);
assert_eq!(val, serde_json::json!(42));
}
#[test]
fn test_literal_eval_negative() {
let (tree, source) = parse_and_get_expr("-5");
let root = tree.root_node();
let expr = find_expr_node(root.child(0).unwrap());
let val = try_eval_literal(expr, &source);
assert_eq!(val, serde_json::json!(-5));
}
#[test]
fn test_literal_eval_string() {
let (tree, source) = parse_and_get_expr("\"hello\"");
let root = tree.root_node();
let expr = find_expr_node(root.child(0).unwrap());
let val = try_eval_literal(expr, &source);
assert_eq!(val, serde_json::json!("hello"));
}
#[test]
fn test_literal_eval_list() {
let (tree, source) = parse_and_get_expr("[1, 2, 3]");
let root = tree.root_node();
let expr = find_expr_node(root.child(0).unwrap());
let val = try_eval_literal(expr, &source);
assert_eq!(val, serde_json::json!([1, 2, 3]));
}
}