zerodds-py 1.0.0-rc.3

PyO3 bindings for the ZeroDDS DCPS API
Documentation
"""Current variant — IDL dataclass tests.

Central test: the Python serialization of a ShapeType-equivalent
``@dataclass`` must produce **byte-exactly** the same XCDR2-LE bytes
as the Rust side in `crates/dcps/src/interop.rs` (= `dds_dcps::
interop::ShapeType`).

This ensures that a Python publisher and a Rust subscriber can talk
bidirectionally over a ``BytesTopic`` with the correct ``typename`` —
without Rust codegen.
"""

from dataclasses import dataclass

import pytest

from zerodds.cdr import CdrReader, CdrWriter
from zerodds.idl import Bool, Bytes, Float32, Int32, String, idl_struct


# =============================================================================
# CDR primitive roundtrip
# =============================================================================


def test_cdr_primitive_roundtrip() -> None:
    w = CdrWriter()
    w.write_bool(True)
    w.write_u8(0x7F)
    w.write_i16(-1234)
    w.write_u32(0xDEADBEEF)
    w.write_i64(-1_000_000_000_000)
    w.write_f32(3.14)
    w.write_f64(-2.7182818)
    w.write_string("hello")
    w.write_bytes(b"\x01\x02\x03")
    data = w.into_bytes()

    r = CdrReader(data)
    assert r.read_bool() is True
    assert r.read_u8() == 0x7F
    assert r.read_i16() == -1234
    assert r.read_u32() == 0xDEADBEEF
    assert r.read_i64() == -1_000_000_000_000
    assert abs(r.read_f32() - 3.14) < 1e-5
    assert abs(r.read_f64() - (-2.7182818)) < 1e-12
    assert r.read_string() == "hello"
    assert r.read_bytes() == b"\x01\x02\x03"


def test_cdr_string_alignment_padding() -> None:
    # "AB\0" is 3 raw bytes, then 1 byte of padding to 4-aligned, then x int32.
    # Identical layout to the Rust ShapeType "AB":
    #   04 00 00 00  length=3
    #   41 42 00     "AB\0"
    #   00           pad
    #   01 00 00 00  int32=1
    w = CdrWriter()
    w.write_string("AB")
    w.write_i32(1)
    expected = bytes(
        [
            0x03,
            0x00,
            0x00,
            0x00,  # length = 3 (incl. null)
            0x41,
            0x42,
            0x00,  # "AB\0"
            0x00,  # pad
            0x01,
            0x00,
            0x00,
            0x00,  # int32 = 1
        ],
    )
    assert w.into_bytes() == expected


def test_cdr_reader_rejects_truncated_string() -> None:
    # Length says 3, but only 2 bytes follow.
    data = bytes([0x03, 0x00, 0x00, 0x00, 0x41, 0x42])
    r = CdrReader(data)
    with pytest.raises(ValueError):
        r.read_string()


# =============================================================================
# @idl_struct
# =============================================================================


@idl_struct(typename="ShapeType")
@dataclass
class PyShape:
    color: String  # type: ignore[valid-type]
    x: Int32  # type: ignore[valid-type]
    y: Int32  # type: ignore[valid-type]
    shapesize: Int32  # type: ignore[valid-type]


def test_pyshape_byte_roundtrip() -> None:
    s = PyShape(color="RED", x=42, y=77, shapesize=30)
    encoded = s.encode()
    # Reference exactly as in crates/dcps/tests/shapes_type_wire.rs.
    expected = bytes(
        [
            0x04,
            0x00,
            0x00,
            0x00,  # color.length
            0x52,
            0x45,
            0x44,
            0x00,  # "RED\0"
            0x2A,
            0x00,
            0x00,
            0x00,  # x = 42
            0x4D,
            0x00,
            0x00,
            0x00,  # y = 77
            0x1E,
            0x00,
            0x00,
            0x00,  # shapesize = 30
        ],
    )
    assert encoded == expected, (
        f"Python CDR encoder deviates from the Rust reference.\n"
        f"  got: {encoded.hex(' ')}\n"
        f"  exp: {expected.hex(' ')}"
    )
    # Return path.
    back = PyShape.decode(encoded)
    assert back == s


def test_pyshape_type_name_set_by_decorator() -> None:
    assert PyShape.TYPE_NAME == "ShapeType"


def test_idl_struct_requires_dataclass() -> None:
    # Plain class → error.
    with pytest.raises(TypeError):

        @idl_struct(typename="x")
        class NotAClass:
            x: Int32  # type: ignore[valid-type]

        _ = NotAClass


@idl_struct(typename="zerodds::Sensor")
@dataclass
class Sensor:
    active: Bool  # type: ignore[valid-type]
    reading: Float32  # type: ignore[valid-type]
    label: String  # type: ignore[valid-type]
    raw: Bytes  # type: ignore[valid-type]


def test_sensor_mixed_fields_roundtrip() -> None:
    s = Sensor(active=True, reading=1.5, label="sonar", raw=b"\xAA\xBB\xCC")
    back = Sensor.decode(s.encode())
    assert back.active is True
    assert abs(back.reading - 1.5) < 1e-6
    assert back.label == "sonar"
    assert back.raw == b"\xAA\xBB\xCC"


def test_auto_map_python_primitives() -> None:
    # Without an explicit IDL annotation: `int` → Int32, `str` → String, etc.
    @idl_struct(typename="auto::Test")
    @dataclass
    class Auto:
        n: int
        name: str

    a = Auto(n=7, name="x")
    back = Auto.decode(a.encode())
    assert back == a


# =============================================================================
# Composite extension — composite types: nested struct, sequence, array, optional
# =============================================================================


@idl_struct(typename="geom::Vec3")
@dataclass
class Vec3:
    x: Float32  # type: ignore[valid-type]
    y: Float32  # type: ignore[valid-type]
    z: Float32  # type: ignore[valid-type]


@idl_struct(typename="geom::Pose")
@dataclass
class Pose:
    position: Vec3
    label: String  # type: ignore[valid-type]


def test_nested_struct_roundtrip() -> None:
    p = Pose(position=Vec3(x=1.0, y=2.0, z=3.0), label="origin")
    back = Pose.decode(p.encode())
    assert back == p


from zerodds.idl import Array, Optional, Sequence  # noqa: E402


@idl_struct(typename="container::Grid")
@dataclass
class Grid:
    values: Sequence[Int32]  # type: ignore[valid-type]


def test_sequence_of_primitives_roundtrip() -> None:
    g = Grid(values=[1, 2, 3, 42, -7])
    back = Grid.decode(g.encode())
    assert back.values == g.values


@idl_struct(typename="container::Mesh")
@dataclass
class Mesh:
    points: Sequence[Vec3]  # type: ignore[valid-type]


def test_sequence_of_structs_roundtrip() -> None:
    m = Mesh(points=[Vec3(1.0, 0.0, 0.0), Vec3(0.0, 1.0, 0.0)])
    back = Mesh.decode(m.encode())
    assert back.points == m.points


@idl_struct(typename="container::Fixed")
@dataclass
class Fixed:
    raw: Array[Int32, 4]  # type: ignore[valid-type]


def test_array_fixed_count_roundtrip() -> None:
    f = Fixed(raw=[10, 20, 30, 40])
    back = Fixed.decode(f.encode())
    assert back.raw == f.raw


def test_array_wrong_count_rejected() -> None:
    f = Fixed(raw=[1, 2])
    with pytest.raises(ValueError):
        f.encode()


@idl_struct(typename="container::Maybe")
@dataclass
class Maybe:
    tag: String  # type: ignore[valid-type]
    maybe_num: Optional[Int32]  # type: ignore[valid-type]


def test_optional_present_and_absent() -> None:
    with_value = Maybe(tag="hit", maybe_num=42)
    without = Maybe(tag="miss", maybe_num=None)
    assert Maybe.decode(with_value.encode()) == with_value
    assert Maybe.decode(without.encode()) == without


# =============================================================================
# IntEnum extension — IntEnum as an IDL field type
# =============================================================================


from enum import IntEnum  # noqa: E402


class Severity(IntEnum):
    OK = 0
    WARN = 1
    ERROR = 2


@idl_struct(typename="diag::Event")
@dataclass
class Event:
    code: Int32  # type: ignore[valid-type]
    severity: Severity
    message: String  # type: ignore[valid-type]


def test_enum_roundtrip() -> None:
    e = Event(code=42, severity=Severity.WARN, message="voltage drop")
    back = Event.decode(e.encode())
    assert back == e
    assert back.severity is Severity.WARN


# =============================================================================
# Union extension — discriminated unions
# =============================================================================


from zerodds.idl import Float64, idl_union  # noqa: E402


# Union: disc=0 → Int32 'n', disc=1 → String 's', default → Float64 'f'.
MyUnion = idl_union(
    typename="u::MyUnion",
    discriminator=Int32,
    cases={0: ("n", Int32), 1: ("s", String)},
    default=("f", Float64),
)


def test_union_case_int_roundtrip() -> None:
    v = MyUnion.make(0, 42)
    back = MyUnion.decode(MyUnion.encode(v))
    assert back == v
    assert back.value == 42


def test_union_case_string_roundtrip() -> None:
    v = MyUnion.make(1, "hello")
    back = MyUnion.decode(MyUnion.encode(v))
    assert back.value == "hello"


def test_union_default_branch_used_for_unknown_disc() -> None:
    # Discriminator 99 matches no case → the default (Float64) is taken.
    v = MyUnion.make(99, 3.14)
    back = MyUnion.decode(MyUnion.encode(v))
    assert abs(back.value - 3.14) < 1e-9


def test_union_without_default_rejects_unknown_disc() -> None:
    strict = idl_union(
        typename="u::Strict",
        discriminator=Int32,
        cases={0: ("n", Int32)},
    )
    with pytest.raises(ValueError):
        strict.encode(strict.make(1, 0))


def test_enum_unknown_value_raises() -> None:
    # Hand-built bytes with Severity=99 (not in the enum).
    # code=0 | severity=99 | message="x" → simulated via the _idl_fields-
    # internal encoder with a raw_int instead of an enum variant.
    from zerodds.cdr import CdrWriter

    w = CdrWriter()
    w.write_i32(0)   # code
    w.write_i32(99)  # severity raw
    w.write_string("x")  # message
    raw = w.into_bytes()
    with pytest.raises(ValueError):
        Event.decode(raw)


def test_idl_struct_resolves_pep563_stringified_annotations() -> None:
    # With `from __future__ import annotations`, all field types are
    # strings at runtime. The decorator must resolve them in the module
    # namespace — regression test for the multi-process tests example bug.
    import textwrap
    import types

    import sys

    mod = types.ModuleType("_pep563_probe")
    sys.modules["_pep563_probe"] = mod
    mod.__dict__["__name__"] = "_pep563_probe"
    src = textwrap.dedent(
        """
        from __future__ import annotations
        from dataclasses import dataclass
        from zerodds.idl import idl_struct, Int32, String

        @idl_struct(typename="probe::T")
        @dataclass
        class Probe:
            n: Int32
            label: String
        """,
    )
    exec(src, mod.__dict__)  # noqa: S102
    p = mod.Probe(n=7, label="abc")
    assert mod.Probe.decode(p.encode()) == p