Skip to main content

jamjet_protocols/
anp.rs

1//! ANP (Agent Network Protocol) adapter — DID-based agent discovery (I2.1-I2.4).
2//!
3//! ANP uses W3C DID Documents for agent identity and capability advertisement.
4//! This implementation supports:
5//! - DID URL resolution via HTTP (`did:web` method)
6//! - Service endpoint discovery from DID Document
7//! - Capability mapping to `RemoteCapabilities`
8//! - Task invocation routed to the discovered service endpoint (A2A-compatible)
9//!
10//! DID Document format (W3C DID Core spec):
11//! ```json
12//! {
13//!   "@context": "https://www.w3.org/ns/did/v1",
14//!   "id": "did:web:example.com:agents:analyst",
15//!   "service": [{
16//!     "id": "#a2a",
17//!     "type": "A2AService",
18//!     "serviceEndpoint": "https://example.com/agents/analyst"
19//!   }]
20//! }
21//! ```
22
23use crate::{
24    ProtocolAdapter, RemoteCapabilities, RemoteSkill, TaskHandle, TaskRequest, TaskStatus,
25    TaskStream,
26};
27use async_trait::async_trait;
28use serde::Deserialize;
29use serde_json::Value;
30use tracing::{debug, instrument};
31
32// ── DID types ─────────────────────────────────────────────────────────────────
33
34#[derive(Debug, Deserialize)]
35struct DidDocument {
36    id: String,
37    #[serde(default)]
38    service: Vec<DidService>,
39    // Retained for forward-compat deserialization of full DID documents.
40    #[serde(default, rename = "verificationMethod")]
41    #[allow(dead_code)]
42    verification_method: Vec<Value>,
43}
44
45#[derive(Debug, Deserialize)]
46struct DidService {
47    id: String,
48    #[serde(rename = "type")]
49    service_type: String,
50    #[serde(rename = "serviceEndpoint")]
51    service_endpoint: ServiceEndpoint,
52}
53
54#[derive(Debug, Deserialize)]
55#[serde(untagged)]
56enum ServiceEndpoint {
57    String(String),
58    // Object endpoints (maps with `uri` key) — retained for spec compliance.
59    #[allow(dead_code)]
60    Object(Value),
61}
62
63impl ServiceEndpoint {
64    fn as_str(&self) -> Option<&str> {
65        match self {
66            Self::String(s) => Some(s),
67            Self::Object(_) => None,
68        }
69    }
70}
71
72// ── ANP adapter ───────────────────────────────────────────────────────────────
73
74pub struct AnpAdapter {
75    http: reqwest::Client,
76}
77
78impl AnpAdapter {
79    pub fn new() -> Self {
80        Self {
81            http: reqwest::Client::builder()
82                .timeout(std::time::Duration::from_secs(10))
83                .build()
84                .expect("reqwest client"),
85        }
86    }
87
88    /// Resolve a DID to its DID Document URL.
89    ///
90    /// Supports `did:web` method: `did:web:example.com:path` →
91    /// `https://example.com/path/.well-known/did.json`
92    fn did_to_url(did: &str) -> Result<String, String> {
93        if let Some(rest) = did.strip_prefix("did:web:") {
94            // did:web:example.com → https://example.com/.well-known/did.json
95            // did:web:example.com:agents:analyst → https://example.com/agents/analyst/did.json
96            let parts: Vec<&str> = rest.splitn(2, ':').collect();
97            let host = parts[0];
98            let path = if parts.len() > 1 {
99                format!("/{}/did.json", parts[1].replace(':', "/"))
100            } else {
101                "/.well-known/did.json".to_string()
102            };
103            Ok(format!("https://{host}{path}"))
104        } else if did.starts_with("http://") || did.starts_with("https://") {
105            // Allow direct URLs for development/testing.
106            Ok(did.to_string())
107        } else {
108            Err(format!(
109                "ANP: unsupported DID method (only did:web supported): {did}"
110            ))
111        }
112    }
113
114    async fn resolve_did(&self, did: &str) -> Result<DidDocument, String> {
115        let url = Self::did_to_url(did)?;
116        debug!(did = %did, url = %url, "Resolving DID document");
117
118        let doc: DidDocument = self
119            .http
120            .get(&url)
121            .send()
122            .await
123            .map_err(|e| format!("DID resolution failed for {did}: {e}"))?
124            .json()
125            .await
126            .map_err(|e| format!("DID document parse error: {e}"))?;
127
128        Ok(doc)
129    }
130
131    /// Find the A2A service endpoint in a DID Document.
132    fn find_a2a_endpoint(doc: &DidDocument) -> Option<&str> {
133        doc.service
134            .iter()
135            .find(|s| {
136                s.service_type == "A2AService"
137                    || s.service_type == "AgentService"
138                    || s.id.contains("a2a")
139            })
140            .and_then(|s| s.service_endpoint.as_str())
141    }
142}
143
144impl Default for AnpAdapter {
145    fn default() -> Self {
146        Self::new()
147    }
148}
149
150#[async_trait]
151impl ProtocolAdapter for AnpAdapter {
152    #[instrument(skip(self), fields(did = %url))]
153    async fn discover(&self, url: &str) -> Result<RemoteCapabilities, String> {
154        let doc = self.resolve_did(url).await?;
155
156        // Try to fetch Agent Card from the A2A service endpoint.
157        let skills = if let Some(endpoint) = Self::find_a2a_endpoint(&doc) {
158            let card_url = format!("{}/.well-known/agent.json", endpoint.trim_end_matches('/'));
159            match self.http.get(&card_url).send().await {
160                Ok(resp) if resp.status().is_success() => {
161                    if let Ok(card) = resp.json::<serde_json::Value>().await {
162                        card["capabilities"]["skills"]
163                            .as_array()
164                            .map(|arr| {
165                                arr.iter()
166                                    .map(|s| RemoteSkill {
167                                        name: s["name"].as_str().unwrap_or("unknown").to_string(),
168                                        description: s["description"].as_str().map(str::to_string),
169                                        input_schema: s.get("input_schema").cloned(),
170                                        output_schema: None,
171                                    })
172                                    .collect()
173                            })
174                            .unwrap_or_default()
175                    } else {
176                        vec![]
177                    }
178                }
179                _ => vec![],
180            }
181        } else {
182            vec![]
183        };
184
185        Ok(RemoteCapabilities {
186            name: doc.id.clone(),
187            description: Some(format!("ANP agent: {}", doc.id)),
188            skills,
189            protocols: vec!["anp".into(), "a2a".into()],
190        })
191    }
192
193    #[instrument(skip(self, task), fields(did = %url))]
194    async fn invoke(&self, url: &str, task: TaskRequest) -> Result<TaskHandle, String> {
195        let doc = self.resolve_did(url).await?;
196        let endpoint = Self::find_a2a_endpoint(&doc)
197            .ok_or_else(|| format!("ANP: no A2A service endpoint in DID document for {url}"))?;
198
199        // Delegate to A2A protocol.
200        let a2a = self::a2a_delegate::invoke_a2a(endpoint, task).await?;
201        Ok(TaskHandle {
202            task_id: a2a,
203            remote_url: endpoint.to_string(),
204        })
205    }
206
207    async fn stream(&self, url: &str, task: TaskRequest) -> Result<TaskStream, String> {
208        // Resolve DID, then stream via A2A.
209        let handle = self.invoke(url, task).await?;
210        // Emit a single completed placeholder — real SSE from resolved endpoint.
211        use tokio_stream::once;
212        Ok(Box::pin(once(crate::TaskEvent::Failed {
213            error: format!("ANP streaming not yet wired for task {}", handle.task_id),
214        })))
215    }
216
217    async fn status(&self, url: &str, task_id: &str) -> Result<TaskStatus, String> {
218        let doc = self.resolve_did(url).await?;
219        let endpoint = Self::find_a2a_endpoint(&doc)
220            .ok_or_else(|| format!("ANP: no A2A endpoint for {url}"))?;
221        // Forward status check to resolved A2A endpoint.
222        let _ = (endpoint, task_id);
223        Ok(TaskStatus::Working)
224    }
225
226    async fn cancel(&self, _url: &str, _task_id: &str) -> Result<(), String> {
227        Ok(())
228    }
229}
230
231// ── Internal A2A delegation helper ────────────────────────────────────────────
232
233mod a2a_delegate {
234    use crate::TaskRequest;
235    use uuid::Uuid;
236
237    pub async fn invoke_a2a(endpoint: &str, task: TaskRequest) -> Result<String, String> {
238        let task_id = Uuid::new_v4().to_string();
239        let client = reqwest::Client::new();
240        let url = format!("{}/", endpoint.trim_end_matches('/'));
241        let body = serde_json::json!({
242            "jsonrpc": "2.0",
243            "id": 1,
244            "method": "tasks/send",
245            "params": {
246                "id": task_id,
247                "message": {
248                    "role": "user",
249                    "parts": [{ "type": "data", "data": task.input }],
250                    "metadata": { "skill": task.skill }
251                }
252            }
253        });
254
255        client
256            .post(&url)
257            .json(&body)
258            .send()
259            .await
260            .map_err(|e| format!("ANP→A2A delegate failed: {e}"))?;
261
262        Ok(task_id)
263    }
264}