from typing import Callable
from dora import DoraStatus
class Operator:
def __init__(self):
pass
def on_event(
self,
dora_event: dict,
send_output: Callable[[str, bytes], None],
) -> DoraStatus:
if dora_event["type"] == "INPUT":
return self.on_input(dora_event, send_output)
return DoraStatus.CONTINUE
def on_input(
self,
dora_input: dict,
send_output: Callable[[str, bytes], None],
):
print(
f"Received input {dora_input['id']}, with data: {dora_input['data']}"
)
return DoraStatus.CONTINUE
def __del__(self):
pass