from __future__ import annotations
import asyncio
import json
import uuid
from datetime import datetime, timezone
import httpx
# Delegate id this client announces as the sender ("from") of every message
# unless the caller supplies a different one.
CLIENT_DELEGATE_ID = "ldp:delegate:orchestrator"


class LdpClient:
    """Asynchronous HTTP client for exchanging LDP messages with delegates.

    Delegates are addressed by base URL. Identity is fetched with
    ``GET {url}/ldp/identity`` and every protocol message is sent with
    ``POST {url}/ldp/messages``. Established sessions are cached per URL in
    ``self.sessions`` so that later tasks reuse them.

    The client can be used as an async context manager, in which case
    ``close()`` is called on exit::

        async with LdpClient() as client:
            await client.submit_task(url, "reasoning", {"prompt": "..."})
    """

    def __init__(self, delegate_id: str = CLIENT_DELEGATE_ID):
        """Create a client that identifies itself as *delegate_id*."""
        self.delegate_id = delegate_id
        self.http = httpx.AsyncClient(timeout=60.0)
        # Established sessions, keyed by the delegate URL they were opened on.
        self.sessions: dict[str, dict] = {}

    async def __aenter__(self) -> "LdpClient":
        """Return the client itself for use in ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close all sessions and the HTTP client when leaving ``async with``."""
        await self.close()

    def _envelope(self, session_id: str, to: str, body: dict) -> dict:
        """Wrap *body* in a message envelope addressed to delegate *to*.

        Each envelope gets a fresh random ``message_id`` and a UTC ISO-8601
        ``timestamp``.
        """
        return {
            "message_id": str(uuid.uuid4()),
            "session_id": session_id,
            "from": self.delegate_id,
            "to": to,
            "body": body,
            "payload_mode": "SemanticFrame",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "provenance": None,
        }

    async def _post(self, url: str, message: dict) -> dict:
        """POST *message* to the delegate at *url* and return the JSON reply.

        Raises:
            httpx.HTTPStatusError: if the delegate answers with a 4xx/5xx
                status, so that an error page is never parsed as a reply.
        """
        resp = await self.http.post(f"{url}/ldp/messages", json=message)
        resp.raise_for_status()
        return resp.json()

    async def discover(self, url: str) -> dict:
        """Fetch and return the identity document of the delegate at *url*.

        Raises:
            httpx.HTTPStatusError: on a 4xx/5xx response.
        """
        resp = await self.http.get(f"{url}/ldp/identity")
        resp.raise_for_status()
        return resp.json()

    async def establish_session(self, url: str) -> str:
        """Open a session with the delegate at *url* and return its id.

        Performs discovery, sends ``HELLO`` followed by ``SESSION_PROPOSE``,
        and stores the accepted session (id, remote delegate id, negotiated
        mode and identity) in ``self.sessions[url]``.

        Raises:
            httpx.HTTPStatusError: if any of the requests fails with a
                4xx/5xx status.
        """
        identity = await self.discover(url)
        remote_id = identity["delegate_id"]
        # The real session id is assigned by the delegate in its reply to
        # SESSION_PROPOSE; until then a throwaway id labels the handshake.
        temp_session = str(uuid.uuid4())

        hello = self._envelope(temp_session, remote_id, {
            "type": "HELLO",
            "delegate_id": self.delegate_id,
            "supported_modes": ["SemanticFrame", "Text"],
        })
        manifest = await self._post(url, hello)
        print(f" Received: {manifest['body']['type']}")

        propose = self._envelope(temp_session, remote_id, {
            "type": "SESSION_PROPOSE",
            "config": {
                "preferred_payload_modes": ["SemanticFrame", "Text"],
                "ttl_secs": 3600,
            },
        })
        accept = await self._post(url, propose)
        session_id = accept["body"]["session_id"]
        mode = accept["body"]["negotiated_mode"]
        print(f" Session established: {session_id[:8]}... (mode: {mode})")

        self.sessions[url] = {
            "session_id": session_id,
            "remote_id": remote_id,
            "mode": mode,
            "identity": identity,
        }
        return session_id

    async def submit_task(self, url: str, skill: str, input_data: dict) -> dict:
        """Submit a task for *skill* to the delegate at *url*.

        A session is established first if none is cached for *url*.

        Returns:
            A dict with the generated ``task_id`` plus the ``output`` and
            ``provenance`` fields of the reply body (``None`` when absent).

        Raises:
            httpx.HTTPStatusError: if the delegate answers with a 4xx/5xx
                status.
        """
        if url not in self.sessions:
            await self.establish_session(url)
        session = self.sessions[url]

        task_id = str(uuid.uuid4())
        submit = self._envelope(session["session_id"], session["remote_id"], {
            "type": "TASK_SUBMIT",
            "task_id": task_id,
            "skill": skill,
            "input": input_data,
        })
        result = await self._post(url, submit)
        return {
            "task_id": task_id,
            "output": result["body"].get("output"),
            "provenance": result["body"].get("provenance"),
        }

    async def close_session(self, url: str) -> None:
        """Send ``SESSION_CLOSE`` for the session on *url* and forget it.

        Does nothing when no session is cached for *url*. The HTTP status of
        the reply is deliberately not checked: closing is best-effort.
        """
        if url not in self.sessions:
            return
        session = self.sessions[url]
        close = self._envelope(session["session_id"], session["remote_id"], {
            "type": "SESSION_CLOSE",
            "reason": "client done",
        })
        await self.http.post(f"{url}/ldp/messages", json=close)
        del self.sessions[url]
        print(" Session closed")

    async def close(self) -> None:
        """Close every cached session, then the underlying HTTP client.

        The HTTP client is closed even if closing a session raises, so a
        failing delegate cannot leak the connection pool. Any such exception
        still propagates to the caller.
        """
        try:
            for url in list(self.sessions):
                await self.close_session(url)
        finally:
            # Sessions cannot be used once the transport is gone.
            self.sessions.clear()
            await self.http.aclose()
async def route_by_quality(delegates: list[dict], skill: str) -> dict | None:
    """Return the delegate advertising the highest quality score for *skill*.

    Each delegate is an identity dict whose ``capabilities`` list holds dicts
    with a ``name`` and an optional ``quality`` dict containing
    ``quality_score``. A capability with no score (missing or null) counts
    as 0. On a tie the earliest delegate in *delegates* wins.

    Returns:
        The best matching delegate dict, or ``None`` when no delegate offers
        *skill*.
    """
    best = None
    best_score = float("-inf")
    for delegate in delegates:
        # `capabilities` may be absent or JSON null in a sparse identity.
        for cap in delegate.get("capabilities") or []:
            if cap.get("name") != skill:
                continue
            score = (cap.get("quality") or {}).get("quality_score")
            if score is None:
                score = 0
            # `best is None` makes the first match always eligible, whatever
            # its score, instead of relying on a sentinel value.
            if best is None or score > best_score:
                best_score = score
                best = delegate
    return best
async def route_by_cost(delegates: list[dict], skill: str) -> dict | None:
    """Return the delegate advertising the lowest per-call cost for *skill*.

    Each delegate is an identity dict whose ``capabilities`` list holds dicts
    with a ``name`` and an optional ``quality`` dict containing
    ``cost_per_call_usd``. A capability with no declared cost (missing or
    null) is treated as infinitely expensive: it loses to any priced
    delegate but is still returned when it is the only one offering the
    skill. On a tie the earliest delegate in *delegates* wins.

    Returns:
        The cheapest matching delegate dict, or ``None`` when no delegate
        offers *skill*.
    """
    best = None
    best_cost = float("inf")
    for delegate in delegates:
        # `capabilities` may be absent or JSON null in a sparse identity.
        for cap in delegate.get("capabilities") or []:
            if cap.get("name") != skill:
                continue
            cost = (cap.get("quality") or {}).get("cost_per_call_usd")
            if cost is None:
                cost = float("inf")
            # `best is None` keeps an unpriced delegate as a fallback; a plain
            # `cost < best_cost` would reject it because inf < inf is false.
            if best is None or cost < best_cost:
                best_cost = cost
                best = delegate
    return best
async def main(delegate_url: str = "http://localhost:8090"):
    """Run the end-to-end demo against the delegate at *delegate_url*.

    Steps: discover the delegate, establish a session, submit a reasoning
    task and a summarization task over that session, show the routing
    helpers, and close the client. The client is closed even when a step
    fails, so the HTTP connection pool is never leaked.
    """
    client = LdpClient()
    try:
        print("1. Discovering delegate...")
        identity = await client.discover(delegate_url)
        print(f" Name: {identity['name']}")
        print(f" Model: {identity['model_family']} {identity['model_version']}")
        print(f" Trust: {identity['trust_domain']['name']}")
        print(f" Capabilities: {[c['name'] for c in identity['capabilities']]}")
        print(f" Payload modes: {identity['supported_payload_modes']}")
        print()

        print("2. Establishing session...")
        await client.establish_session(delegate_url)
        print()

        print("3. Submitting reasoning task...")
        result = await client.submit_task(
            delegate_url,
            skill="reasoning",
            input_data={
                "prompt": "What are the key tradeoffs between microservices and monolithic architecture for a 5-person team?",
            },
        )
        print(f" Output: {json.dumps(result['output'], indent=2)[:300]}...")
        print(" Provenance:")
        prov = result["provenance"]
        # submit_task returns None here when the reply carried no provenance.
        if prov:
            print(f" produced_by: {prov['produced_by']}")
            print(f" model: {prov['model_version']}")
            print(f" confidence: {prov['confidence']}")
            print(f" verified: {prov['verified']}")
        else:
            print(" (none returned)")
        print()

        print("4. Submitting summarization task (reuses session)...")
        result2 = await client.submit_task(
            delegate_url,
            skill="summarization",
            input_data={"prompt": "Summarize the key principles of LDP in 3 bullet points."},
        )
        print(f" Output: {json.dumps(result2['output'], indent=2)[:300]}...")
        print()

        print("5. Routing example...")
        delegates = [identity]
        best_quality = await route_by_quality(delegates, "reasoning")
        best_cost = await route_by_cost(delegates, "reasoning")
        if best_quality:
            print(f" Best quality for 'reasoning': {best_quality['name']}")
        if best_cost:
            print(f" Cheapest for 'reasoning': {best_cost['name']}")
        print()

        print("6. Closing session...")
    finally:
        # Runs on success and on failure alike.
        await client.close()
    print("\nDone.")


if __name__ == "__main__":
    asyncio.run(main())