dcap-qvl 0.3.7

This crate implements the quote verification logic for DCAP (Data Center Attestation Primitives) in pure Rust.
Documentation
#!/usr/bin/env python3
"""
Test async collateral functions with real sample quote data.

This script uses the actual sample quote files to test the async functions.
"""

import asyncio
import sys
import os
import json
from pathlib import Path

# Add the python package to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "python"))

try:
    import dcap_qvl

    print("✓ Successfully imported dcap_qvl")
except ImportError as e:
    print(f"✗ Failed to import dcap_qvl: {e}")
    print("Make sure the Python bindings are built first.")
    sys.exit(1)


async def test_with_sample_quotes():
    """Test async functions with real sample quote data."""
    print("\n=== Testing with Sample Quote Data ===")

    # Find sample directory (at project root, two levels up from tests/)
    sample_dir = Path(__file__).parent.parent.parent / "sample"
    if not sample_dir.exists():
        print(f"✗ Sample directory not found: {sample_dir}")
        return False

    # Test with SGX quote
    sgx_quote_path = sample_dir / "sgx_quote"
    tdx_quote_path = sample_dir / "tdx_quote"

    results = {}

    for quote_type, quote_path in [("SGX", sgx_quote_path), ("TDX", tdx_quote_path)]:
        if not quote_path.exists():
            print(f"{quote_type} quote file not found: {quote_path}")
            continue

        print(f"\n--- Testing with {quote_type} Quote ---")

        try:
            # Load quote data
            with open(quote_path, "rb") as f:
                quote_data = f.read()

            print(f"✓ Loaded {quote_type} quote: {len(quote_data)} bytes")

            # Parse quote to get info
            quote = dcap_qvl.Quote.parse(quote_data)
            fmspc = quote.fmspc()
            ca = quote.ca()
            is_tdx = quote.is_tdx()
            quote_type_str = quote.quote_type()

            print(f"  FMSPC: {fmspc}")
            print(f"  CA: {ca}")
            print(f"  Is TDX: {is_tdx}")
            print(f"  Quote Type: {quote_type_str}")

            # Test get_collateral_for_fmspc directly
            print(f"\n  Testing get_collateral_for_fmspc...")
            try:
                collateral1 = await dcap_qvl.get_collateral_for_fmspc(
                    pccs_url="https://api.trustedservices.intel.com",
                    fmspc=fmspc,
                    ca=ca,
                    for_sgx=not is_tdx,
                )
                print(f"  ✓ get_collateral_for_fmspc succeeded")
                results[f"{quote_type}_get_collateral_for_fmspc"] = True
            except Exception as e:
                print(f"  ✗ get_collateral_for_fmspc failed: {e}")
                results[f"{quote_type}_get_collateral_for_fmspc"] = False

            # Test get_collateral with quote
            print(f"  Testing get_collateral...")
            try:
                collateral2 = await dcap_qvl.get_collateral(
                    "https://api.trustedservices.intel.com", quote_data
                )
                print(f"  ✓ get_collateral succeeded")
                results[f"{quote_type}_get_collateral"] = True
            except Exception as e:
                print(f"  ✗ get_collateral failed: {e}")
                results[f"{quote_type}_get_collateral"] = False

            # Test get_collateral_from_pcs
            print(f"  Testing get_collateral_from_pcs...")
            try:
                collateral3 = await dcap_qvl.get_collateral_from_pcs(quote_data)
                print(f"  ✓ get_collateral_from_pcs succeeded")
                results[f"{quote_type}_get_collateral_from_pcs"] = True
            except Exception as e:
                print(f"  ✗ get_collateral_from_pcs failed: {e}")
                results[f"{quote_type}_get_collateral_from_pcs"] = False

            # Test get_collateral_and_verify
            print(f"  Testing get_collateral_and_verify...")
            try:
                verified_report = await dcap_qvl.get_collateral_and_verify(quote_data)
                print(f"  ✓ get_collateral_and_verify succeeded")
                print(f"    Status: {verified_report.status}")
                print(f"    Advisory IDs: {verified_report.advisory_ids}")
                results[f"{quote_type}_get_collateral_and_verify"] = True
            except Exception as e:
                print(f"  ✗ get_collateral_and_verify failed: {e}")
                results[f"{quote_type}_get_collateral_and_verify"] = False

        except Exception as e:
            print(f"✗ Failed to process {quote_type} quote: {e}")
            continue

    return results


async def test_function_signatures():
    """Test that all functions have correct async signatures."""
    print("\n=== Testing Function Signatures ===")

    import inspect

    functions_to_test = [
        "get_collateral_for_fmspc",
        "get_collateral",
        "get_collateral_from_pcs",
        "get_collateral_and_verify",
    ]

    all_correct = True

    for func_name in functions_to_test:
        if hasattr(dcap_qvl, func_name):
            func = getattr(dcap_qvl, func_name)
            # Note: PyO3 async functions using future_into_py don't register as
            # coroutine functions via inspect.iscoroutinefunction(), but they
            # still return awaitables. We just verify the function exists.
            print(f"{func_name} is available")

            # Get signature
            try:
                sig = inspect.signature(func)
                print(f"  Signature: {func_name}{sig}")
            except Exception as e:
                print(f"  Could not get signature: {e}")
        else:
            print(f"{func_name} not found")
            all_correct = False

    return all_correct


async def main():
    """Main test function."""
    print("=" * 60)
    print("TESTING ASYNC COLLATERAL FUNCTIONS WITH SAMPLE DATA")
    print("=" * 60)

    # Test function signatures
    signatures_ok = await test_function_signatures()

    # Test with sample data
    sample_results = await test_with_sample_quotes()

    # Summary
    print("\n" + "=" * 60)
    print("TEST SUMMARY")
    print("=" * 60)

    if signatures_ok:
        print("✓ All async functions are available")
    else:
        print("✗ Some async functions are missing")

    if sample_results:
        success_count = sum(1 for result in sample_results.values() if result)
        total_count = len(sample_results)

        print(f"\nSample Data Tests: {success_count}/{total_count} passed")

        for test_name, result in sample_results.items():
            status = "" if result else ""
            print(f"  {status} {test_name}")

        overall_success = signatures_ok and success_count > 0
    else:
        print("✗ No sample data tests were run")
        overall_success = False

    if overall_success:
        print(f"\n🎉 Async collateral functions are working!")
    else:
        print(f"\n❌ Some issues found. Check the errors above.")

    return overall_success


if __name__ == "__main__":
    try:
        success = asyncio.run(main())
        sys.exit(0 if success else 1)
    except KeyboardInterrupt:
        print("\n✗ Test interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"\n✗ Unexpected error: {e}")
        import traceback

        traceback.print_exc()
        sys.exit(1)