1use crate::{
24 ProtocolAdapter, RemoteCapabilities, RemoteSkill, TaskHandle, TaskRequest, TaskStatus,
25 TaskStream,
26};
27use async_trait::async_trait;
28use serde::Deserialize;
29use serde_json::Value;
30use tracing::{debug, instrument};
31
32#[derive(Debug, Deserialize)]
35struct DidDocument {
36 id: String,
37 #[serde(default)]
38 service: Vec<DidService>,
39 #[serde(default, rename = "verificationMethod")]
41 #[allow(dead_code)]
42 verification_method: Vec<Value>,
43}
44
45#[derive(Debug, Deserialize)]
46struct DidService {
47 id: String,
48 #[serde(rename = "type")]
49 service_type: String,
50 #[serde(rename = "serviceEndpoint")]
51 service_endpoint: ServiceEndpoint,
52}
53
54#[derive(Debug, Deserialize)]
55#[serde(untagged)]
56enum ServiceEndpoint {
57 String(String),
58 #[allow(dead_code)]
60 Object(Value),
61}
62
63impl ServiceEndpoint {
64 fn as_str(&self) -> Option<&str> {
65 match self {
66 Self::String(s) => Some(s),
67 Self::Object(_) => None,
68 }
69 }
70}
71
72pub struct AnpAdapter {
75 http: reqwest::Client,
76}
77
78impl AnpAdapter {
79 pub fn new() -> Self {
80 Self {
81 http: reqwest::Client::builder()
82 .timeout(std::time::Duration::from_secs(10))
83 .build()
84 .expect("reqwest client"),
85 }
86 }
87
88 fn did_to_url(did: &str) -> Result<String, String> {
93 if let Some(rest) = did.strip_prefix("did:web:") {
94 let parts: Vec<&str> = rest.splitn(2, ':').collect();
97 let host = parts[0];
98 let path = if parts.len() > 1 {
99 format!("/{}/did.json", parts[1].replace(':', "/"))
100 } else {
101 "/.well-known/did.json".to_string()
102 };
103 Ok(format!("https://{host}{path}"))
104 } else if did.starts_with("http://") || did.starts_with("https://") {
105 Ok(did.to_string())
107 } else {
108 Err(format!(
109 "ANP: unsupported DID method (only did:web supported): {did}"
110 ))
111 }
112 }
113
114 async fn resolve_did(&self, did: &str) -> Result<DidDocument, String> {
115 let url = Self::did_to_url(did)?;
116 debug!(did = %did, url = %url, "Resolving DID document");
117
118 let doc: DidDocument = self
119 .http
120 .get(&url)
121 .send()
122 .await
123 .map_err(|e| format!("DID resolution failed for {did}: {e}"))?
124 .json()
125 .await
126 .map_err(|e| format!("DID document parse error: {e}"))?;
127
128 Ok(doc)
129 }
130
131 fn find_a2a_endpoint(doc: &DidDocument) -> Option<&str> {
133 doc.service
134 .iter()
135 .find(|s| {
136 s.service_type == "A2AService"
137 || s.service_type == "AgentService"
138 || s.id.contains("a2a")
139 })
140 .and_then(|s| s.service_endpoint.as_str())
141 }
142}
143
144impl Default for AnpAdapter {
145 fn default() -> Self {
146 Self::new()
147 }
148}
149
150#[async_trait]
151impl ProtocolAdapter for AnpAdapter {
152 #[instrument(skip(self), fields(did = %url))]
153 async fn discover(&self, url: &str) -> Result<RemoteCapabilities, String> {
154 let doc = self.resolve_did(url).await?;
155
156 let skills = if let Some(endpoint) = Self::find_a2a_endpoint(&doc) {
158 let card_url = format!("{}/.well-known/agent.json", endpoint.trim_end_matches('/'));
159 match self.http.get(&card_url).send().await {
160 Ok(resp) if resp.status().is_success() => {
161 if let Ok(card) = resp.json::<serde_json::Value>().await {
162 card["capabilities"]["skills"]
163 .as_array()
164 .map(|arr| {
165 arr.iter()
166 .map(|s| RemoteSkill {
167 name: s["name"].as_str().unwrap_or("unknown").to_string(),
168 description: s["description"].as_str().map(str::to_string),
169 input_schema: s.get("input_schema").cloned(),
170 output_schema: None,
171 })
172 .collect()
173 })
174 .unwrap_or_default()
175 } else {
176 vec![]
177 }
178 }
179 _ => vec![],
180 }
181 } else {
182 vec![]
183 };
184
185 Ok(RemoteCapabilities {
186 name: doc.id.clone(),
187 description: Some(format!("ANP agent: {}", doc.id)),
188 skills,
189 protocols: vec!["anp".into(), "a2a".into()],
190 })
191 }
192
193 #[instrument(skip(self, task), fields(did = %url))]
194 async fn invoke(&self, url: &str, task: TaskRequest) -> Result<TaskHandle, String> {
195 let doc = self.resolve_did(url).await?;
196 let endpoint = Self::find_a2a_endpoint(&doc)
197 .ok_or_else(|| format!("ANP: no A2A service endpoint in DID document for {url}"))?;
198
199 let a2a = self::a2a_delegate::invoke_a2a(endpoint, task).await?;
201 Ok(TaskHandle {
202 task_id: a2a,
203 remote_url: endpoint.to_string(),
204 })
205 }
206
207 async fn stream(&self, url: &str, task: TaskRequest) -> Result<TaskStream, String> {
208 let handle = self.invoke(url, task).await?;
210 use tokio_stream::once;
212 Ok(Box::pin(once(crate::TaskEvent::Failed {
213 error: format!("ANP streaming not yet wired for task {}", handle.task_id),
214 })))
215 }
216
217 async fn status(&self, url: &str, task_id: &str) -> Result<TaskStatus, String> {
218 let doc = self.resolve_did(url).await?;
219 let endpoint = Self::find_a2a_endpoint(&doc)
220 .ok_or_else(|| format!("ANP: no A2A endpoint for {url}"))?;
221 let _ = (endpoint, task_id);
223 Ok(TaskStatus::Working)
224 }
225
226 async fn cancel(&self, _url: &str, _task_id: &str) -> Result<(), String> {
227 Ok(())
228 }
229}
230
231mod a2a_delegate {
234 use crate::TaskRequest;
235 use uuid::Uuid;
236
237 pub async fn invoke_a2a(endpoint: &str, task: TaskRequest) -> Result<String, String> {
238 let task_id = Uuid::new_v4().to_string();
239 let client = reqwest::Client::new();
240 let url = format!("{}/", endpoint.trim_end_matches('/'));
241 let body = serde_json::json!({
242 "jsonrpc": "2.0",
243 "id": 1,
244 "method": "tasks/send",
245 "params": {
246 "id": task_id,
247 "message": {
248 "role": "user",
249 "parts": [{ "type": "data", "data": task.input }],
250 "metadata": { "skill": task.skill }
251 }
252 }
253 });
254
255 client
256 .post(&url)
257 .json(&body)
258 .send()
259 .await
260 .map_err(|e| format!("ANP→A2A delegate failed: {e}"))?;
261
262 Ok(task_id)
263 }
264}