import json
import functools
from typing import Any
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from kanoniv_agent_auth import (
AgentKeyPair,
Delegation,
Invocation,
verify_invocation,
)
class DelegationContext:
def __init__(self, agent_keypair, delegation, root_identity):
self.agent_keypair = agent_keypair
self.delegation = delegation
self.root_identity = root_identity
self.invocations = []
REVOKED_DELEGATIONS: set[str] = set()
def requires_delegation(actions=None, require_cost=False, require_resource=False):
def decorator(func):
@functools.wraps(func)
def wrapper(state):
ctx = state.get("delegation_context")
if ctx is None:
return {**state, "error": "No delegation context."}
if ctx.delegation.content_hash() in REVOKED_DELEGATIONS:
return {**state, "error": f"Delegation revoked for {ctx.agent_keypair.identity().did}"}
action = state.get("action", func.__name__)
args = state.get("args", {})
if require_cost and "cost" not in args:
return {**state, "error": f"Node '{action}' requires 'cost' in args."}
if require_resource and "resource" not in args:
return {**state, "error": f"Node '{action}' requires 'resource' in args."}
try:
invocation = Invocation.create(ctx.agent_keypair, action, json.dumps(args), ctx.delegation)
result = verify_invocation(invocation, ctx.agent_keypair.identity(), ctx.root_identity)
ctx.invocations.append({"action": action, "chain": result[2], "depth": result[3]})
except ValueError as e:
return {**state, "error": f"Delegation denied: {e}"}
return func(state)
return wrapper
return decorator
class PipelineState(TypedDict, total=False):
delegation_context: Any
action: str
args: dict
error: str
search_results: list[str]
summary: str
draft: str
final_draft: str
review_status: str
review_notes: str
current_phase: str
def setup_agents():
human = AgentKeyPair.generate()
coordinator = AgentKeyPair.generate()
researcher = AgentKeyPair.generate()
writer = AgentKeyPair.generate()
reviewer = AgentKeyPair.generate()
print("Identities:")
print(f" Human (root): {human.identity().did}")
print(f" Coordinator: {coordinator.identity().did}")
print(f" Researcher: {researcher.identity().did}")
print(f" Writer: {writer.identity().did}")
print(f" Reviewer: {reviewer.identity().did}")
coordinator_del = Delegation.create_root(
human,
coordinator.identity().did,
json.dumps([
{"type": "action_scope", "value": [
"search", "summarize", "draft", "edit", "review",
]},
{"type": "max_cost", "value": 10.0},
]),
)
print("\nDelegation chains:")
print(" Human -> Coordinator: search, summarize, draft, edit, review (max $10)")
researcher_del = Delegation.delegate(
coordinator, researcher.identity().did,
json.dumps([
{"type": "action_scope", "value": ["search", "summarize"]},
{"type": "max_cost", "value": 5.0},
]),
coordinator_del,
)
print(" Coordinator -> Researcher: search, summarize (max $5)")
writer_del = Delegation.delegate(
coordinator, writer.identity().did,
json.dumps([
{"type": "action_scope", "value": ["draft", "edit"]},
{"type": "max_cost", "value": 3.0},
]),
coordinator_del,
)
print(" Coordinator -> Writer: draft, edit (max $3)")
reviewer_del = Delegation.delegate(
coordinator, reviewer.identity().did,
json.dumps([
{"type": "action_scope", "value": ["review"]},
{"type": "max_cost", "value": 1.0},
]),
coordinator_del,
)
print(" Coordinator -> Reviewer: review (max $1)")
return {
"human": human,
"coordinator": coordinator,
"researcher": (researcher, researcher_del),
"writer": (writer, writer_del),
"reviewer": (reviewer, reviewer_del),
}
@requires_delegation(actions=["search"], require_cost=True)
def search_node(state: PipelineState) -> PipelineState:
query = state["args"].get("query", "")
print(f" [search] Querying: '{query}'")
return {
**state,
"search_results": [
f"Result 1 for '{query}'",
f"Result 2 for '{query}'",
f"Result 3 for '{query}'",
],
"current_phase": "summarize",
}
@requires_delegation(actions=["summarize"])
def summarize_node(state: PipelineState) -> PipelineState:
results = state.get("search_results", [])
topic = state["args"].get("topic", "unknown")
print(f" [summarize] Condensing {len(results)} results on '{topic}'")
return {
**state,
"summary": f"Summary of {len(results)} findings on '{topic}'",
"current_phase": "draft",
}
@requires_delegation(actions=["draft"], require_cost=True)
def draft_node(state: PipelineState) -> PipelineState:
summary = state.get("summary", "")
print(f" [draft] Writing based on: {summary[:50]}...")
return {
**state,
"draft": f"Draft article based on: {summary}",
"current_phase": "edit",
}
@requires_delegation(actions=["edit"])
def edit_node(state: PipelineState) -> PipelineState:
draft = state.get("draft", "")
print(f" [edit] Polishing draft ({len(draft)} chars)")
return {
**state,
"final_draft": f"[Edited] {draft}",
"current_phase": "review",
}
@requires_delegation(actions=["review"])
def review_node(state: PipelineState) -> PipelineState:
final = state.get("final_draft", "")
print(f" [review] Reviewing final draft ({len(final)} chars)")
return {
**state,
"review_status": "approved",
"review_notes": "Looks good. Clear and accurate.",
"current_phase": "done",
}
def error_node(state: PipelineState) -> PipelineState:
print(f" [error] {state.get('error', 'Unknown error')}")
return {**state, "current_phase": "done"}
def coordinator_node(state: PipelineState) -> PipelineState:
phase = state.get("current_phase", "search")
print(f" [coordinator] Routing to phase: {phase}")
return {**state, "current_phase": phase}
def route_after_coordinator(state: PipelineState) -> str:
phase = state.get("current_phase", "search")
if phase in ("search", "summarize", "draft", "edit", "review"):
return phase
return "done"
def route_after_specialist(state: PipelineState) -> str:
if "error" in state and state["error"]:
return "error"
return "done"
def build_graph():
graph = StateGraph(PipelineState)
graph.add_node("coordinator", coordinator_node)
graph.add_node("search", search_node)
graph.add_node("summarize", summarize_node)
graph.add_node("draft", draft_node)
graph.add_node("edit", edit_node)
graph.add_node("review", review_node)
graph.add_node("error", error_node)
graph.add_edge(START, "coordinator")
graph.add_conditional_edges("coordinator", route_after_coordinator, {
"search": "search",
"summarize": "summarize",
"draft": "draft",
"edit": "edit",
"review": "review",
"done": END,
})
for node in ("search", "summarize", "draft", "edit", "review"):
graph.add_conditional_edges(node, route_after_specialist, {
"done": END,
"error": "error",
})
graph.add_edge("error", END)
return graph.compile()
def run_pipeline(agents):
human_id = agents["human"].identity()
phase_to_agent = {
"search": agents["researcher"],
"summarize": agents["researcher"],
"draft": agents["writer"],
"edit": agents["writer"],
"review": agents["reviewer"],
}
phase_actions = {
"search": {"query": "cryptographic agent identity", "cost": 0.50},
"summarize": {"topic": "cryptographic agent identity", "cost": 0.25},
"draft": {"topic": "agent auth", "cost": 1.0},
"edit": {"style": "concise", "cost": 0.50},
"review": {"criteria": "accuracy", "cost": 0.25},
}
phase_order = ["search", "summarize", "draft", "edit", "review"]
audit_trail = []
state: PipelineState = {}
print("\n--- Running LangGraph Pipeline ---\n")
for phase in phase_order:
keypair, delegation = phase_to_agent[phase]
ctx = DelegationContext(keypair, delegation, human_id)
state = {
**state,
"current_phase": phase,
"action": phase,
"args": phase_actions[phase],
"delegation_context": ctx,
"error": "",
}
app = build_graph()
state = app.invoke(state)
for inv in ctx.invocations:
audit_trail.append({
"phase": phase,
"agent": keypair.identity().did,
**inv,
})
if state.get("error"):
print(f" Pipeline halted: {state['error']}")
break
print(f"\n--- Audit Trail ({len(audit_trail)} verified actions) ---\n")
for entry in audit_trail:
did_short = entry["agent"].split(":")[-1][:12]
print(f" {entry['phase']:>10} did:..{did_short} depth={entry['depth']}")
return state
def test_boundary(agents, label, agent_key, action, args):
human_id = agents["human"].identity()
keypair, delegation = agents[agent_key]
ctx = DelegationContext(keypair, delegation, human_id)
app = build_graph()
state: PipelineState = {
"current_phase": action,
"action": action,
"args": args,
"delegation_context": ctx,
}
print(f"\n[{label}]")
result = app.invoke(state)
if "error" in result and result["error"]:
print(f" Correctly blocked: {result['error']}")
else:
print(" UNEXPECTED: action was allowed")
def test_boundaries(agents):
print("\n--- Authority Boundary Tests ---")
test_boundary(
agents, "A: Researcher tries to draft", "researcher",
"draft", {"topic": "sneaky draft", "cost": 1.0},
)
test_boundary(
agents, "B: Writer tries to search", "writer",
"search", {"query": "off limits", "cost": 0.5},
)
test_boundary(
agents, "C: Reviewer tries to edit", "reviewer",
"edit", {"style": "verbose", "cost": 0.25},
)
test_boundary(
agents, "D: Writer tries $5 draft (exceeds $3 budget)", "writer",
"draft", {"topic": "expensive", "cost": 5.0},
)
def test_revocation(agents):
print("\n--- Mid-Pipeline Revocation ---\n")
human_id = agents["human"].identity()
writer_keypair, writer_del = agents["writer"]
researcher_keypair, researcher_del = agents["researcher"]
ctx = DelegationContext(writer_keypair, writer_del, human_id)
app = build_graph()
state: PipelineState = {
"current_phase": "draft",
"action": "draft",
"args": {"topic": "test", "cost": 1.0},
"delegation_context": ctx,
}
result = app.invoke(state)
blocked = result.get("error", "")
print(f" Writer drafts (before revocation): {'BLOCKED - ' + blocked if blocked else 'OK'}")
REVOKED_DELEGATIONS.add(writer_del.content_hash())
print(f" Writer delegation revoked (hash: {writer_del.content_hash()[:16]}...)")
ctx = DelegationContext(writer_keypair, writer_del, human_id)
state = {
"current_phase": "draft",
"action": "draft",
"args": {"topic": "test", "cost": 1.0},
"delegation_context": ctx,
}
result = app.invoke(state)
blocked = result.get("error", "")
print(f" Writer drafts (after revocation): {'BLOCKED - ' + blocked if blocked else 'OK'}")
ctx = DelegationContext(researcher_keypair, researcher_del, human_id)
state = {
"current_phase": "search",
"action": "search",
"args": {"query": "still works", "cost": 0.5},
"delegation_context": ctx,
}
result = app.invoke(state)
blocked = result.get("error", "")
print(f" Researcher searches (unaffected): {'BLOCKED - ' + blocked if blocked else 'OK'}")
REVOKED_DELEGATIONS.discard(writer_del.content_hash())
if __name__ == "__main__":
print("=" * 60)
print("Tutorial: Multi-Agent Handoff with Scoped Authority")
print("=" * 60)
agents = setup_agents()
state = run_pipeline(agents)
print(f"\n Final review: {state.get('review_status', 'N/A')}")
print(f" Notes: {state.get('review_notes', 'N/A')}")
test_boundaries(agents)
test_revocation(agents)
print("\n" + "=" * 60)
print("Done. Every node was cryptographically verified via LangGraph.")
print("=" * 60)