use super::super::FixtureWorkspace;
fn assert_rules_present(report: &deslop::ScanReport, rule_ids: &[&str]) {
for rule_id in rule_ids {
assert!(
report
.findings
.iter()
.any(|finding| finding.rule_id == *rule_id),
"expected rule {rule_id} to fire"
);
}
}
fn assert_rules_absent(report: &deslop::ScanReport, rule_ids: &[&str]) {
for rule_id in rule_ids {
assert!(
!report
.findings
.iter()
.any(|finding| finding.rule_id == *rule_id),
"did not expect rule {rule_id} to fire"
);
}
}
#[test]
fn test_python_advanceplan2_async_rules() {
let workspace = FixtureWorkspace::new();
workspace.write_files(&[(
"pkg/async_service.py",
python_fixture!("integration/advanceplan2/async_positive.txt"),
)]);
let report = workspace.scan();
assert_rules_present(
&report,
&[
"untracked_asyncio_task",
"background_task_exception_unobserved",
"async_lock_held_across_await",
"async_retry_sleep_without_backoff",
],
);
}
#[test]
fn test_python_advanceplan2_async_clean() {
let workspace = FixtureWorkspace::new();
workspace.write_files(&[(
"pkg/async_service.py",
python_fixture!("integration/advanceplan2/async_clean.txt"),
)]);
let report = workspace.scan();
assert_rules_absent(
&report,
&[
"untracked_asyncio_task",
"background_task_exception_unobserved",
"async_lock_held_across_await",
"async_retry_sleep_without_backoff",
],
);
}
#[test]
fn test_python_advanceplan2_contract_rules() {
let workspace = FixtureWorkspace::new();
workspace.write_files(&[(
"pkg/contracts.py",
python_fixture!("integration/advanceplan2/contracts_positive.txt"),
)]);
let report = workspace.scan();
assert_rules_present(
&report,
&[
"mutable_default_argument",
"dataclass_mutable_default",
"dataclass_heavy_post_init",
"option_bag_model",
"public_any_type_leak",
"typeddict_unchecked_access",
],
);
}
#[test]
fn test_python_advanceplan2_contract_clean() {
let workspace = FixtureWorkspace::new();
workspace.write_files(&[(
"pkg/contracts.py",
python_fixture!("integration/advanceplan2/contracts_clean.txt"),
)]);
let report = workspace.scan();
assert_rules_absent(
&report,
&[
"mutable_default_argument",
"dataclass_mutable_default",
"dataclass_heavy_post_init",
"option_bag_model",
"public_any_type_leak",
"typeddict_unchecked_access",
],
);
}
#[test]
fn test_python_advanceplan2_skips_serializer_to_dict_any_contract() {
let workspace = FixtureWorkspace::new();
workspace.write_files(&[(
"pkg/contracts.py",
python_fixture!("integration/advanceplan2/contracts_to_dict_clean.txt"),
)]);
let report = workspace.scan();
assert_rules_absent(&report, &["public_any_type_leak"]);
}
#[test]
fn test_python_advanceplan2_skips_framework_route_any_contracts() {
let workspace = FixtureWorkspace::new();
workspace.write_files(&[(
"apps/api/main.py",
python_fixture!("integration/advanceplan2/contracts_framework_boundary_clean.txt"),
)]);
let report = workspace.scan();
assert_rules_absent(
&report,
&[
"public_any_type_leak",
"python_public_api_any_contract",
"weak_typing",
],
);
}
#[test]
fn test_python_advanceplan2_import_time_rules() {
let workspace = FixtureWorkspace::new();
workspace.write_files(&[(
"pkg/bootstrap.py",
python_fixture!("integration/advanceplan2/import_time_positive.txt"),
)]);
let report = workspace.scan();
assert_rules_present(
&report,
&[
"import_time_network_call",
"import_time_file_io",
"import_time_subprocess",
"module_singleton_client_side_effect",
"mutable_module_global_state",
"import_time_config_load",
],
);
}
#[test]
fn test_python_advanceplan2_import_time_clean() {
let workspace = FixtureWorkspace::new();
workspace.write_files(&[(
"pkg/bootstrap.py",
python_fixture!("integration/advanceplan2/import_time_clean.txt"),
)]);
let report = workspace.scan();
assert_rules_absent(
&report,
&[
"import_time_network_call",
"import_time_file_io",
"import_time_subprocess",
"module_singleton_client_side_effect",
"mutable_module_global_state",
"import_time_config_load",
],
);
}
#[test]
fn test_python_advanceplan2_skips_framework_entrypoint_env_bootstrap() {
let workspace = FixtureWorkspace::new();
workspace.write_files(&[(
"apps/api/main.py",
python_fixture!("integration/advanceplan2/import_time_framework_entrypoint_clean.txt"),
)]);
let report = workspace.scan();
assert_rules_absent(&report, &["import_time_config_load"]);
}
#[test]
fn test_python_advanceplan2_boundary_rules() {
let workspace = FixtureWorkspace::new();
workspace.write_files(&[(
"pkg/boundary.py",
python_fixture!("integration/advanceplan2/boundary_positive.txt"),
)]);
let report = workspace.scan();
assert_rules_present(
&report,
&[
"unsafe_yaml_loader",
"pickle_deserialization_boundary",
"subprocess_shell_true",
"tar_extractall_unfiltered",
"tempfile_without_cleanup",
],
);
}
#[test]
fn test_python_advanceplan2_boundary_clean() {
let workspace = FixtureWorkspace::new();
workspace.write_files(&[(
"pkg/boundary.py",
python_fixture!("integration/advanceplan2/boundary_clean.txt"),
)]);
let report = workspace.scan();
assert_rules_absent(
&report,
&[
"unsafe_yaml_loader",
"pickle_deserialization_boundary",
"subprocess_shell_true",
"tar_extractall_unfiltered",
"tempfile_without_cleanup",
],
);
}