from __future__ import annotations
import asyncio
from typing import TypedDict
from a1.langgraph_tool import a1_node
class TradingState(TypedDict):
symbol: str
quantity: int
authorized: bool
result: str
@a1_node(passport_path="passport.json", capability="trade.equity")
async def execute_trade_node(state: TradingState) -> dict:
return {
**state,
"authorized": True,
"result": f"Traded {state['quantity']} shares of {state['symbol']}",
}
@a1_node(passport_path="passport.json", capability="portfolio.read")
async def read_portfolio_node(state: TradingState) -> dict:
return {**state, "result": "Portfolio: [...]"}
async def main() -> None:
state: TradingState = {
"symbol": "ACME",
"quantity": 100,
"authorized": False,
"result": "",
}
state = await execute_trade_node(state) print(state["result"])
state = await read_portfolio_node(state) print(state["result"])
if __name__ == "__main__":
asyncio.run(main())