# hydro_cli 0.13.0
#
# Hydro Deploy Command Line Interface
# Documentation
from codecs import decode
import json
from pathlib import Path
import pytest
import hydro

@pytest.mark.asyncio
async def test_server_sink():
    """Deploy one sender and two receivers and check each receiver's first log line.

    The ``dedalus_sender`` crate is placed on ``localhost_machine.client_only()``
    while both ``dedalus_receiver`` crates run on the plain localhost machine.
    The sender's ``broadcast`` port is demuxed to the two receivers, and the
    first stdout line of each receiver must be ``echo ("Hello 123",)``.

    Fails explicitly if a receiver's stdout stream ends without yielding any
    line, instead of silently skipping the assertion.
    """
    deployment = hydro.Deployment()
    localhost_machine = deployment.Localhost()

    # Every crate is built from the same examples directory, resolved relative
    # to this test file.
    examples_dir = str(
        (Path(__file__).parent.parent.parent / "hydro_cli_examples").absolute()
    )

    sender = deployment.HydroflowCrate(
        src=examples_dir,
        example="dedalus_sender",
        profile="dev",
        # Serialized as the JSON text "[[0, 1], 123]"; presumably the demux
        # keys to address and the payload value -- confirm against the example.
        args=[json.dumps(([0, 1], 123))],
        on=localhost_machine.client_only(),
    )

    receiver0 = deployment.HydroflowCrate(
        src=examples_dir,
        example="dedalus_receiver",
        profile="dev",
        on=localhost_machine,
    )

    receiver1 = deployment.HydroflowCrate(
        src=examples_dir,
        example="dedalus_receiver",
        profile="dev",
        on=localhost_machine,
    )

    sender.ports.broadcast.send_to(hydro.demux({
        0: receiver0.ports.broadcast.merge(),
        1: receiver1.ports.broadcast.merge(),
    }))

    await deployment.deploy()

    # The stdout streams are requested before start(); presumably so that the
    # first lines emitted by the receivers are not missed.
    receiver0_out = await receiver0.stdout()
    receiver1_out = await receiver1.stdout()

    await deployment.start()

    expected = 'echo ("Hello 123",)'
    for name, out in (("receiver0", receiver0_out), ("receiver1", receiver1_out)):
        async for log in out:
            # Only the first line matters; stop reading once it is checked.
            assert log == expected
            break
        else:
            # Without this branch an empty stream would make the test pass
            # without ever evaluating the assertion above.
            pytest.fail(f"{name} stdout ended before producing any line")

@pytest.mark.asyncio
async def test_client_sink():
    """Deploy one sender and two client-only receivers and check their first log line.

    The ``dedalus_sender`` crate runs on the plain localhost machine while both
    ``dedalus_receiver`` crates are placed on ``localhost_machine.client_only()``.
    The sender's ``broadcast`` port is demuxed to the two receivers, and the
    first stdout line of each receiver must be ``echo ("Hello 123",)``.

    Fails explicitly if a receiver's stdout stream ends without yielding any
    line, instead of silently skipping the assertion.
    """
    deployment = hydro.Deployment()
    localhost_machine = deployment.Localhost()

    # Every crate is built from the same examples directory, resolved relative
    # to this test file.
    examples_dir = str(
        (Path(__file__).parent.parent.parent / "hydro_cli_examples").absolute()
    )

    sender = deployment.HydroflowCrate(
        src=examples_dir,
        example="dedalus_sender",
        profile="dev",
        # Serialized as the JSON text "[[0, 1], 123]"; presumably the demux
        # keys to address and the payload value -- confirm against the example.
        args=[json.dumps(([0, 1], 123))],
        on=localhost_machine,
    )

    receiver0 = deployment.HydroflowCrate(
        src=examples_dir,
        example="dedalus_receiver",
        profile="dev",
        on=localhost_machine.client_only(),
    )

    receiver1 = deployment.HydroflowCrate(
        src=examples_dir,
        example="dedalus_receiver",
        profile="dev",
        on=localhost_machine.client_only(),
    )

    sender.ports.broadcast.send_to(hydro.demux({
        0: receiver0.ports.broadcast.merge(),
        1: receiver1.ports.broadcast.merge(),
    }))

    await deployment.deploy()

    # The stdout streams are requested before start(); presumably so that the
    # first lines emitted by the receivers are not missed.
    receiver0_out = await receiver0.stdout()
    receiver1_out = await receiver1.stdout()

    await deployment.start()

    expected = 'echo ("Hello 123",)'
    for name, out in (("receiver0", receiver0_out), ("receiver1", receiver1_out)):
        async for log in out:
            # Only the first line matters; stop reading once it is checked.
            assert log == expected
            break
        else:
            # Without this branch an empty stream would make the test pass
            # without ever evaluating the assertion above.
            pytest.fail(f"{name} stdout ended before producing any line")