from codecs import decode
import json
from pathlib import Path
import pytest
import hydro
@pytest.mark.asyncio
async def test_server_sink():
    """Check that both receivers echo the sender's message when the sender is client-only.

    One ``dedalus_sender`` crate is placed on ``localhost_machine.client_only()``
    and two ``dedalus_receiver`` crates on the plain localhost machine. The
    sender's ``broadcast`` port is routed through a ``hydro.demux`` keyed by
    ``0`` and ``1`` into each receiver's merged ``broadcast`` port. The test
    then asserts that the first stdout line of every receiver is the expected
    echo line.
    """
    # All three crates are built from the same examples directory, located
    # three levels above this test file.
    examples_dir = str(
        (Path(__file__).parent.parent.parent / "hydro_cli_examples").absolute()
    )
    expected_line = "echo (\"Hello 123\",)"

    deployment = hydro.Deployment()
    localhost_machine = deployment.Localhost()

    sender = deployment.HydroflowCrate(
        src=examples_dir,
        example="dedalus_sender",
        profile="dev",
        # Serialized as the JSON array [[0, 1], 123]; presumably the demux
        # keys to send to and the payload value -- TODO confirm against the
        # dedalus_sender example.
        args=[json.dumps(([0, 1], 123))],
        # NOTE(review): client_only() presumably restricts the sender to
        # initiating connections, making the receivers the server side.
        on=localhost_machine.client_only(),
    )
    receiver0 = deployment.HydroflowCrate(
        src=examples_dir,
        example="dedalus_receiver",
        profile="dev",
        on=localhost_machine,
    )
    receiver1 = deployment.HydroflowCrate(
        src=examples_dir,
        example="dedalus_receiver",
        profile="dev",
        on=localhost_machine,
    )

    sender.ports.broadcast.send_to(hydro.demux({
        0: receiver0.ports.broadcast.merge(),
        1: receiver1.ports.broadcast.merge(),
    }))

    await deployment.deploy()
    # The stdout streams are requested after deploy() and before start(),
    # presumably so that no early output is missed.
    receiver0_out = await receiver0.stdout()
    receiver1_out = await receiver1.stdout()
    await deployment.start()

    for name, stdout in (("receiver0", receiver0_out), ("receiver1", receiver1_out)):
        # Only the first line matters. Capture it explicitly so that a stream
        # which ends without yielding anything fails the test instead of
        # silently skipping the assertion.
        first_line = None
        async for log in stdout:
            first_line = log
            break
        assert first_line is not None, f"{name} produced no stdout output"
        assert first_line == expected_line
@pytest.mark.asyncio
async def test_client_sink():
    """Check that both receivers echo the sender's message when the receivers are client-only.

    One ``dedalus_sender`` crate is placed on the plain localhost machine and
    two ``dedalus_receiver`` crates each on ``localhost_machine.client_only()``.
    The sender's ``broadcast`` port is routed through a ``hydro.demux`` keyed
    by ``0`` and ``1`` into each receiver's merged ``broadcast`` port. The test
    then asserts that the first stdout line of every receiver is the expected
    echo line.
    """
    # All three crates are built from the same examples directory, located
    # three levels above this test file.
    examples_dir = str(
        (Path(__file__).parent.parent.parent / "hydro_cli_examples").absolute()
    )
    expected_line = "echo (\"Hello 123\",)"

    deployment = hydro.Deployment()
    localhost_machine = deployment.Localhost()

    sender = deployment.HydroflowCrate(
        src=examples_dir,
        example="dedalus_sender",
        profile="dev",
        # Serialized as the JSON array [[0, 1], 123]; presumably the demux
        # keys to send to and the payload value -- TODO confirm against the
        # dedalus_sender example.
        args=[json.dumps(([0, 1], 123))],
        on=localhost_machine,
    )
    # NOTE(review): client_only() presumably restricts each receiver to
    # initiating connections, making the sender the server side.
    receiver0 = deployment.HydroflowCrate(
        src=examples_dir,
        example="dedalus_receiver",
        profile="dev",
        on=localhost_machine.client_only(),
    )
    receiver1 = deployment.HydroflowCrate(
        src=examples_dir,
        example="dedalus_receiver",
        profile="dev",
        on=localhost_machine.client_only(),
    )

    sender.ports.broadcast.send_to(hydro.demux({
        0: receiver0.ports.broadcast.merge(),
        1: receiver1.ports.broadcast.merge(),
    }))

    await deployment.deploy()
    # The stdout streams are requested after deploy() and before start(),
    # presumably so that no early output is missed.
    receiver0_out = await receiver0.stdout()
    receiver1_out = await receiver1.stdout()
    await deployment.start()

    for name, stdout in (("receiver0", receiver0_out), ("receiver1", receiver1_out)):
        # Only the first line matters. Capture it explicitly so that a stream
        # which ends without yielding anything fails the test instead of
        # silently skipping the assertion.
        first_line = None
        async for log in stdout:
            first_line = log
            break
        assert first_line is not None, f"{name} produced no stdout output"
        assert first_line == expected_line