#![allow(clippy::uninlined_format_args)]
#![cfg(feature = "pe")]
#![allow(unused_variables)]
use pretty_assertions::assert_eq;
use rstest::*;
use threatflux_binary_analysis::BinaryAnalyzer;
use threatflux_binary_analysis::{types::*, BinaryError};
mod common;
use common::fixtures::*;
#[test]
fn test_pe_header_parsing() {
let data = create_realistic_pe_64();
let analyzer = BinaryAnalyzer::new();
let result = analyzer.analyze(&data).unwrap();
assert_eq!(result.format, BinaryFormat::Pe);
assert_eq!(result.architecture, Architecture::X86_64);
assert_eq!(result.entry_point, Some(0x140001000));
}
#[rstest]
#[case(&[0x4d, 0x5a], false, "Valid MZ signature but incomplete PE structure")]
#[case(&[0x5a, 0x4d], true, "Reversed MZ signature - falls back to Raw")]
#[case(&[0x00, 0x00], true, "Null signature - falls back to Raw")]
#[case(&[0x4d], true, "Incomplete signature - falls back to Raw")]
fn test_dos_header_validation(
#[case] signature: &[u8],
#[case] should_pass: bool,
#[case] description: &str,
) {
let mut data = vec![0; 1024];
if signature.len() >= 2 {
data[0] = signature[0];
data[1] = signature[1];
} else if signature.len() == 1 {
data[0] = signature[0];
}
let result = BinaryAnalyzer::new().analyze(&data);
if should_pass {
assert!(result.is_ok(), "Should pass: {}", description);
let parsed = result.unwrap();
assert_eq!(parsed.format, BinaryFormat::Raw);
} else {
assert!(result.is_err(), "Should have failed: {}", description);
}
}
#[rstest]
#[case(b"PE\0\0", true, "Valid PE signature")]
#[case(b"PE\0\x01", false, "Invalid PE signature")]
#[case(b"NE\0\0", false, "NE signature (16-bit)")]
#[case(b"LE\0\0", false, "LE signature")]
fn test_pe_signature_validation(
#[case] signature: &[u8],
#[case] should_pass: bool,
#[case] description: &str,
) {
let mut data = create_basic_dos_header();
data[60] = 0x80; data.resize(0x84 + signature.len(), 0);
data[0x80..0x80 + signature.len()].copy_from_slice(signature);
let result = BinaryAnalyzer::new().analyze(&data);
if should_pass {
if let Err(e) = &result {
let error_msg = format!("{}", e);
assert!(
!error_msg.contains("PE signature"),
"Failed due to PE signature: {}",
description
);
}
} else {
assert!(result.is_err(), "Should have failed: {}", description);
}
}
#[rstest]
#[case(0x014c, Architecture::X86, "IMAGE_FILE_MACHINE_I386")]
#[case(0x8664, Architecture::X86_64, "IMAGE_FILE_MACHINE_AMD64")]
#[case(0x01c0, Architecture::Arm, "IMAGE_FILE_MACHINE_ARM")]
#[case(0xaa64, Architecture::Arm64, "IMAGE_FILE_MACHINE_ARM64")]
#[case(0x0200, Architecture::Unknown, "IMAGE_FILE_MACHINE_IA64")]
#[case(0x0000, Architecture::Unknown, "Unknown machine type")]
fn test_coff_machine_types(
#[case] machine: u16,
#[case] expected_arch: Architecture,
#[case] description: &str,
) {
let mut data = create_realistic_pe_64();
let machine_bytes = machine.to_le_bytes();
data[0x84] = machine_bytes[0];
data[0x85] = machine_bytes[1];
let result = BinaryAnalyzer::new().analyze(&data).unwrap();
assert_eq!(
result.architecture, expected_arch,
"Failed: {}",
description
);
}
#[rstest]
#[case(0x0001, "IMAGE_FILE_RELOCS_STRIPPED")]
#[case(0x0002, "IMAGE_FILE_EXECUTABLE_IMAGE")]
#[case(0x0004, "IMAGE_FILE_LINE_NUMBERS_STRIPPED")]
#[case(0x0008, "IMAGE_FILE_LOCAL_SYMS_STRIPPED")]
#[case(0x0010, "IMAGE_FILE_AGGR_WS_TRIM")]
#[case(0x0020, "IMAGE_FILE_LARGE_ADDRESS_AWARE")]
#[case(0x0040, "IMAGE_FILE_BYTES_REVERSED_LO")]
#[case(0x0080, "IMAGE_FILE_32BIT_MACHINE")]
#[case(0x0100, "IMAGE_FILE_DEBUG_STRIPPED")]
#[case(0x0200, "IMAGE_FILE_REMOVABLE_RUN_FROM_SWAP")]
#[case(0x0400, "IMAGE_FILE_NET_RUN_FROM_SWAP")]
#[case(0x0800, "IMAGE_FILE_SYSTEM")]
#[case(0x1000, "IMAGE_FILE_DLL")]
#[case(0x2000, "IMAGE_FILE_UP_SYSTEM_ONLY")]
#[case(0x4000, "IMAGE_FILE_BYTES_REVERSED_HI")]
fn test_pe_characteristics(#[case] characteristic: u16, #[case] description: &str) {
let mut data = create_realistic_pe_64();
let char_bytes = characteristic.to_le_bytes();
data[0x96] = char_bytes[0]; data[0x97] = char_bytes[1];
let result = BinaryAnalyzer::new().analyze(&data);
assert!(
result.is_ok(),
"Failed to parse PE with characteristic: {}",
description
);
let parsed = result.unwrap();
let metadata = parsed.metadata;
if characteristic & 0x0020 != 0 { }
if characteristic & 0x1000 != 0 { }
}
#[rstest]
#[case(0x010b, Architecture::X86, "PE32")]
#[case(0x020b, Architecture::X86_64, "PE32+")]
#[case(0x0107, Architecture::Unknown, "ROM image")]
#[case(0x0000, Architecture::Unknown, "Invalid magic")]
fn test_optional_header_magic(
#[case] magic: u16,
#[case] expected_arch: Architecture,
#[case] description: &str,
) {
let mut data = if magic == 0x010b {
create_realistic_pe_32()
} else {
create_realistic_pe_64()
};
let magic_bytes = magic.to_le_bytes();
data[0x98] = magic_bytes[0];
data[0x99] = magic_bytes[1];
let result = BinaryAnalyzer::new().analyze(&data);
if magic == 0x010b || magic == 0x020b {
assert!(
result.is_ok(),
"Failed to parse valid magic: {}",
description
);
let parsed = result.unwrap();
assert_eq!(
parsed.architecture, expected_arch,
"Wrong architecture for: {}",
description
);
} else {
if let Ok(parsed) = result {
assert_eq!(
parsed.architecture,
Architecture::Unknown,
"Should be unknown for: {}",
description
);
}
}
}
#[rstest]
#[case(1, "IMAGE_SUBSYSTEM_NATIVE")]
#[case(2, "IMAGE_SUBSYSTEM_WINDOWS_GUI")]
#[case(3, "IMAGE_SUBSYSTEM_WINDOWS_CUI")]
#[case(5, "IMAGE_SUBSYSTEM_OS2_CUI")]
#[case(7, "IMAGE_SUBSYSTEM_POSIX_CUI")]
#[case(9, "IMAGE_SUBSYSTEM_WINDOWS_CE_GUI")]
#[case(10, "IMAGE_SUBSYSTEM_EFI_APPLICATION")]
#[case(11, "IMAGE_SUBSYSTEM_EFI_BOOT_SERVICE_DRIVER")]
#[case(12, "IMAGE_SUBSYSTEM_EFI_RUNTIME_DRIVER")]
#[case(13, "IMAGE_SUBSYSTEM_EFI_ROM")]
#[case(14, "IMAGE_SUBSYSTEM_XBOX")]
fn test_pe_subsystem_detection(#[case] subsystem: u16, #[case] description: &str) {
let mut data = create_realistic_pe_64();
if data.len() > 0xdc + 1 {
let subsystem_bytes = subsystem.to_le_bytes();
data[0xdc] = subsystem_bytes[0];
data[0xdd] = subsystem_bytes[1];
}
let result = BinaryAnalyzer::new().analyze(&data);
assert!(
result.is_ok(),
"Failed to parse PE with subsystem: {}",
description
);
}
#[test]
fn test_pe_section_parsing() {
let data = create_pe_with_sections();
let result = BinaryAnalyzer::new().analyze(&data).unwrap();
let sections = &result.sections;
assert!(!sections.is_empty(), "Should have parsed sections");
let section_names: Vec<&str> = sections.iter().map(|s| s.name.as_str()).collect();
let expected_sections = vec![".text", ".data", ".rdata", ".bss"];
for expected in &expected_sections {
if section_names.contains(expected) {
let section = sections.iter().find(|s| s.name == *expected).unwrap();
assert!(
section.size > 0,
"Section {} should have size > 0",
expected
);
assert!(
section.address > 0,
"Section {} should have valid address",
expected
);
}
}
}
#[rstest]
#[case(0x00000020, true, false, false, "IMAGE_SCN_CNT_CODE")]
#[case(0x00000040, false, true, false, "IMAGE_SCN_CNT_INITIALIZED_DATA")]
#[case(0x00000080, false, true, false, "IMAGE_SCN_CNT_UNINITIALIZED_DATA")]
#[case(0x20000000, true, false, true, "IMAGE_SCN_MEM_EXECUTE")]
#[case(0x40000000, true, false, false, "IMAGE_SCN_MEM_READ")]
#[case(0x80000000, false, true, false, "IMAGE_SCN_MEM_WRITE")]
fn test_pe_section_characteristics(
#[case] characteristics: u32,
#[case] _expected_read: bool,
#[case] _expected_write: bool,
#[case] _expected_execute: bool,
#[case] description: &str,
) {
let data = create_pe_with_custom_section_characteristics(characteristics);
let result = BinaryAnalyzer::new().analyze(&data).unwrap();
let sections = &result.sections;
if sections.is_empty() {
return;
}
if let Some(section) = sections.first() {
let perms = §ion.permissions;
if characteristics & 0x20000000 != 0 || characteristics & 0x00000020 != 0 {
}
if characteristics & 0x40000000 != 0 {
}
if characteristics & 0x80000000 != 0 {
}
if section.size == 0 {
return;
}
assert!(
section.size > 0,
"Section should have size for: {}",
description
);
}
}
#[test]
fn test_pe_import_parsing() {
let data = create_pe_with_imports();
let result = BinaryAnalyzer::new().analyze(&data).unwrap();
let imports = &result.imports;
if imports.is_empty() {
return;
}
for import in imports {
assert!(!import.name.is_empty(), "Import should have a name");
if let Some(library) = &import.library {
assert!(!library.is_empty(), "Library name should not be empty");
}
let common_apis = ["kernel32.dll", "user32.dll", "ntdll.dll", "msvcrt.dll"];
if let Some(lib) = &import.library {
if common_apis.iter().any(|&api| lib.contains(api)) {
assert!(
import.address.is_some() || import.ordinal.is_some(),
"Import should have address or ordinal"
);
}
}
}
}
#[test]
fn test_pe_export_parsing() {
let data = create_pe_with_exports();
let result = BinaryAnalyzer::new().analyze(&data).unwrap();
let exports = &result.exports;
if !exports.is_empty() {
for export in exports {
assert!(!export.name.is_empty(), "Export should have a name");
assert!(export.address > 0, "Export should have valid address");
if let Some(forwarded) = &export.forwarded_name {
assert!(!forwarded.is_empty(), "Forwarded name should not be empty");
assert!(
forwarded.contains('.'),
"Forwarded name should contain module.function"
);
}
}
}
}
#[test]
fn test_pe_debug_directory() {
let data = create_pe_with_debug_info();
let result = BinaryAnalyzer::new().analyze(&data).unwrap();
let metadata = result.metadata;
if let Some(compiler_info) = &metadata.compiler_info {
assert!(!compiler_info.is_empty());
}
let _security = &metadata.security_features;
}
#[test]
fn test_pe_resource_parsing() {
let data = create_pe_with_resources();
let result = BinaryAnalyzer::new().analyze(&data).unwrap();
let sections = &result.sections;
let resource_section = sections.iter().find(|s| s.name == ".rsrc");
if let Some(rsrc_section) = resource_section {
assert_eq!(rsrc_section.section_type, SectionType::Data);
assert!(rsrc_section.size > 0);
assert!(rsrc_section.permissions.read);
assert!(!rsrc_section.permissions.execute);
}
}
#[test]
fn test_pe_security_directory() {
let data = create_pe_with_signature();
let result = BinaryAnalyzer::new().analyze(&data).unwrap();
let metadata = result.metadata;
let _security = &metadata.security_features;
assert!(metadata.size > 0);
if _security.signed {
}
}
#[rstest]
#[case(
"invalid_dos_stub",
create_pe_with_invalid_dos_stub(),
"Invalid DOS stub"
)]
#[case("truncated_headers", create_truncated_pe(), "Truncated headers")]
#[case(
"invalid_section_count",
&create_pe_with_invalid_section_count(),
"Invalid section count"
)]
#[case(
"overlapping_sections",
&create_pe_with_overlapping_sections(),
"Overlapping sections"
)]
#[case(
"invalid_optional_header_size",
&create_pe_with_invalid_optional_header_size(),
"Invalid optional header size"
)]
fn test_pe_error_handling(
#[case] _test_name: &str,
#[case] data: &[u8],
#[case] description: &str,
) {
let result = BinaryAnalyzer::new().analyze(data);
if let Err(error) = result {
let error_msg = format!("{}", error);
assert!(
!error_msg.is_empty(),
"Error message should not be empty for: {}",
description
);
match error {
BinaryError::InvalidData(_) => {
}
BinaryError::ParseError(_) => {
}
_ => {
}
}
} else {
let parsed = result.unwrap();
assert_eq!(parsed.format, BinaryFormat::Pe);
}
}
#[test]
fn test_pe_timestamp_handling() {
let timestamps = vec![
0x00000000, 0x12345678, 0x60000000, 0xFFFFFFFF, ];
for timestamp in timestamps {
let data = create_pe_with_timestamp(timestamp);
let result = BinaryAnalyzer::new().analyze(&data).unwrap();
let metadata = result.metadata;
if timestamp != 0 {
assert!(
metadata.timestamp.is_some(),
"Should have timestamp for non-zero value"
);
if let Some(ts) = metadata.timestamp {
assert_eq!(ts, timestamp as u64, "Timestamp should match");
}
}
}
}
#[test]
fn test_pe_performance_large_file() {
let data = create_large_pe_binary(20 * 1024 * 1024);
let start = std::time::Instant::now();
let result = BinaryAnalyzer::new().analyze(&data);
let duration = start.elapsed();
assert!(result.is_ok(), "Should parse large PE file successfully");
assert!(
duration.as_secs() < 10,
"Should parse large file in reasonable time"
);
}
#[test]
fn test_pe_concurrent_parsing() {
use std::sync::Arc;
use std::thread;
let data = Arc::new(create_realistic_pe_64());
let mut handles = vec![];
for i in 0..8 {
let data_clone = Arc::clone(&data);
let handle = thread::spawn(move || {
let result = BinaryAnalyzer::new().analyze(&data_clone);
assert!(result.is_ok(), "Thread {} failed to parse PE", i);
result.unwrap()
});
handles.push(handle);
}
for handle in handles {
let parsed = handle.join().unwrap();
assert_eq!(parsed.format, BinaryFormat::Pe);
}
}
fn create_basic_dos_header() -> Vec<u8> {
let mut data = vec![0; 64];
data[0] = 0x4d; data[1] = 0x5a; data[2] = 0x90; data[3] = 0x00;
data[4] = 0x03; data[5] = 0x00;
data
}
fn create_pe_with_sections() -> Vec<u8> {
let mut data = create_realistic_pe_64();
data.resize(16384, 0);
data
}
fn create_pe_with_custom_section_characteristics(_characteristics: u32) -> Vec<u8> {
let mut data = create_realistic_pe_64();
data.resize(8192, 0);
data
}
fn create_pe_with_imports() -> Vec<u8> {
let mut data = create_realistic_pe_64();
data.resize(32768, 0);
data
}
fn create_pe_with_exports() -> Vec<u8> {
let mut data = create_realistic_pe_64();
data.resize(24576, 0);
data
}
fn create_pe_with_debug_info() -> Vec<u8> {
let mut data = create_realistic_pe_64();
data.resize(40960, 0);
data
}
fn create_pe_with_resources() -> Vec<u8> {
let mut data = create_realistic_pe_64();
data.resize(65536, 0);
data
}
fn create_pe_with_signature() -> Vec<u8> {
let mut data = create_realistic_pe_64();
data.resize(131072, 0);
data
}
fn create_pe_with_invalid_dos_stub() -> &'static [u8] {
static INVALID_DOS: &[u8] = &[
0x4d, 0x5a, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00,
];
INVALID_DOS
}
fn create_truncated_pe() -> &'static [u8] {
static TRUNCATED: &[u8] = &[
0x4d, 0x5a, 0x90, 0x00, 0x03, 0x00, ];
TRUNCATED
}
fn create_pe_with_invalid_section_count() -> Vec<u8> {
let mut data = create_realistic_pe_64();
data[0x86] = 0xff; data[0x87] = 0xff;
data
}
fn create_pe_with_overlapping_sections() -> Vec<u8> {
let mut data = create_realistic_pe_64();
data.resize(16384, 0);
data
}
fn create_pe_with_invalid_optional_header_size() -> Vec<u8> {
let mut data = create_realistic_pe_64();
data[0x94] = 0x00; data[0x95] = 0x00;
data
}
fn create_pe_with_timestamp(timestamp: u32) -> Vec<u8> {
let mut data = create_realistic_pe_64();
let ts_bytes = timestamp.to_le_bytes();
data[0x88..0x8c].copy_from_slice(&ts_bytes);
data
}
fn create_large_pe_binary(size: usize) -> Vec<u8> {
let mut data = create_realistic_pe_64();
data.resize(size, 0);
data
}