import asyncio
from minllm import Node, Flow, AsyncNode, AsyncFlow
class FirstNode(Node):
def prep(self, shared):
print("FirstNode: prep")
return {"data": shared.get("initial_data", "default")}
def exec(self, prep_res):
print(f"FirstNode: exec with {prep_res}")
return prep_res["data"].upper()
def post(self, shared, prep_res, exec_res):
print(f"FirstNode: post with result {exec_res}")
shared["first_result"] = exec_res
return "default"
class SecondNode(Node):
def prep(self, shared):
print("SecondNode: prep")
return {"prev_result": shared.get("first_result", "")}
def exec(self, prep_res):
print(f"SecondNode: exec with {prep_res}")
return f"Processed: {prep_res['prev_result']}"
def post(self, shared, prep_res, exec_res):
print(f"SecondNode: post with result {exec_res}")
shared["final_result"] = exec_res
return None
class AsyncFirstNode(AsyncNode):
async def prep_async(self, shared):
print("AsyncFirstNode: prep")
return {"data": shared.get("initial_data", "default")}
async def exec_async(self, prep_res):
print(f"AsyncFirstNode: exec with {prep_res}")
await asyncio.sleep(0.1) return prep_res["data"].upper()
async def post_async(self, shared, prep_res, exec_res):
print(f"AsyncFirstNode: post with result {exec_res}")
shared["first_result"] = exec_res
return "default"
class AsyncSecondNode(AsyncNode):
async def prep_async(self, shared):
print("AsyncSecondNode: prep")
return {"prev_result": shared.get("first_result", "")}
async def exec_async(self, prep_res):
print(f"AsyncSecondNode: exec with {prep_res}")
await asyncio.sleep(0.1) return f"Processed: {prep_res['prev_result']}"
async def post_async(self, shared, prep_res, exec_res):
print(f"AsyncSecondNode: post with result {exec_res}")
shared["final_result"] = exec_res
return None
def run_sync_example():
print("\n=== Running Synchronous Example ===")
first = FirstNode()
second = SecondNode()
flow = Flow(first)
first >> second
shared_data = {"initial_data": "hello world"}
flow.run(shared_data)
print(f"Final result: {shared_data['final_result']}")
async def run_async_example():
print("\n=== Running Asynchronous Example ===")
first = AsyncFirstNode()
second = AsyncSecondNode()
flow = AsyncFlow(first)
first >> second
shared_data = {"initial_data": "hello async world"}
await flow.run_async(shared_data)
print(f"Final result: {shared_data['final_result']}")
if __name__ == "__main__":
run_sync_example()
asyncio.run(run_async_example())