1use crate::client::LdpClient;
10use crate::config::LdpAdapterConfig;
11use crate::protocol::{
12 ProtocolAdapter, RemoteCapabilities, RemoteSkill, TaskEvent, TaskHandle, TaskRequest,
13 TaskStatus, TaskStream,
14};
15use crate::session_manager::SessionManager;
16use crate::types::error::LdpError;
17use crate::types::messages::{LdpEnvelope, LdpMessageBody};
18use crate::types::provenance::Provenance;
19
20use async_trait::async_trait;
21use serde_json::{json, Value};
22use tracing::{debug, info, instrument};
23
/// LDP protocol adapter: bundles the client, the session manager and the adapter
/// configuration that the `ProtocolAdapter` implementation in this file operates on.
pub struct LdpAdapter {
    /// Session manager; the constructors build it from clones of `client` and `config`.
    session_manager: SessionManager,
    /// Client used to fetch identity cards and to exchange envelopes with remote delegates.
    client: LdpClient,
    /// Adapter settings read by this file: delegate id, trust-domain policy,
    /// optional signing secret and the provenance-attachment flag.
    config: LdpAdapterConfig,
}
33
34impl LdpAdapter {
35 pub fn new(config: LdpAdapterConfig) -> Self {
37 let client = LdpClient::new();
38 let session_manager = SessionManager::new(client.clone(), config.clone());
39 Self {
40 session_manager,
41 client,
42 config,
43 }
44 }
45
46 pub fn with_client(config: LdpAdapterConfig, client: LdpClient) -> Self {
48 let session_manager = SessionManager::new(client.clone(), config.clone());
49 Self {
50 session_manager,
51 client,
52 config,
53 }
54 }
55
56 pub fn session_manager(&self) -> &SessionManager {
58 &self.session_manager
59 }
60
61 fn identity_to_capabilities(
63 &self,
64 identity: &crate::types::identity::LdpIdentityCard,
65 ) -> RemoteCapabilities {
66 let skills = identity
67 .capabilities
68 .iter()
69 .map(|cap| RemoteSkill {
70 name: cap.name.clone(),
71 description: cap.description.clone(),
72 input_schema: cap.input_schema.clone(),
73 output_schema: cap.output_schema.clone(),
74 })
75 .collect();
76
77 RemoteCapabilities {
78 name: identity.name.clone(),
79 description: identity.description.clone(),
80 skills,
81 protocols: vec!["ldp".into()],
82 }
83 }
84
85 fn embed_provenance(&self, output: Value, provenance: Provenance) -> Value {
87 if self.config.attach_provenance {
88 match output {
89 Value::Object(mut map) => {
90 map.insert("ldp_provenance".into(), provenance.to_value());
91 Value::Object(map)
92 }
93 other => {
94 json!({
95 "result": other,
96 "ldp_provenance": provenance.to_value()
97 })
98 }
99 }
100 } else {
101 output
102 }
103 }
104}
105
106#[async_trait]
107impl ProtocolAdapter for LdpAdapter {
108 #[instrument(skip(self), fields(url = %url))]
114 async fn discover(&self, url: &str) -> Result<RemoteCapabilities, String> {
115 info!(url = %url, "Discovering LDP delegate");
116
117 let identity = self.client.fetch_identity_card(url).await?;
119
120 if self.config.enforce_trust_domains {
122 if !self.config.trust_domain.trusts(&identity.trust_domain.name) {
123 return Err(format!(
124 "Trust domain '{}' is not trusted by '{}'",
125 identity.trust_domain.name, self.config.trust_domain.name
126 ));
127 }
128 }
129
130 let capabilities = self.identity_to_capabilities(&identity);
132 debug!(
133 name = %capabilities.name,
134 skills = capabilities.skills.len(),
135 "LDP delegate discovered"
136 );
137
138 Ok(capabilities)
139 }
140
141 #[instrument(skip(self, task), fields(url = %url, skill = %task.skill))]
147 async fn invoke(&self, url: &str, task: TaskRequest) -> Result<TaskHandle, String> {
148 info!(url = %url, skill = %task.skill, "Invoking LDP task");
149
150 let session = self.session_manager.get_or_establish(url).await?;
152
153 let task_id = uuid::Uuid::new_v4().to_string();
155 let mut submit = LdpEnvelope::new(
156 &session.session_id,
157 &self.config.delegate_id,
158 &session.remote_delegate_id,
159 LdpMessageBody::TaskSubmit {
160 task_id: task_id.clone(),
161 skill: task.skill.clone(),
162 input: task.input.clone(),
163 },
164 session.payload.mode,
165 );
166
167 if let Some(ref secret) = self.config.signing_secret {
169 crate::signing::apply_signature(&mut submit, secret);
170 }
171
172 let _response = self.client.send_message(url, &submit).await?;
173
174 self.session_manager.touch(url).await;
176
177 debug!(task_id = %task_id, "LDP task submitted");
178
179 Ok(TaskHandle {
180 task_id,
181 remote_url: url.to_string(),
182 })
183 }
184
185 #[instrument(skip(self, task), fields(url = %url, skill = %task.skill))]
190 async fn stream(&self, url: &str, task: TaskRequest) -> Result<TaskStream, String> {
191 let handle = self.invoke(url, task).await?;
192 let client = self.client.clone();
193 let config = self.config.clone();
194 let url = url.to_string();
195 let task_id = handle.task_id.clone();
196
197 let stream = async_stream::stream! {
200 let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
201 loop {
202 interval.tick().await;
203
204 let mut status_query = LdpEnvelope::new(
206 "",
207 &config.delegate_id,
208 &url,
209 LdpMessageBody::TaskUpdate {
210 task_id: task_id.clone(),
211 progress: None,
212 message: Some("status_query".into()),
213 },
214 crate::types::payload::PayloadMode::Text,
215 );
216
217 if let Some(ref secret) = config.signing_secret {
219 crate::signing::apply_signature(&mut status_query, secret);
220 }
221
222 match client.send_message(&url, &status_query).await {
223 Ok(response) => match response.body {
224 LdpMessageBody::TaskUpdate { progress, message, .. } => {
225 yield TaskEvent::Progress {
226 message: message.unwrap_or_default(),
227 progress,
228 };
229 }
230 LdpMessageBody::TaskResult { output, provenance, .. } => {
231 let output_with_provenance = if config.attach_provenance {
232 match output {
233 Value::Object(mut map) => {
234 map.insert("ldp_provenance".into(),
235 provenance.to_value());
236 Value::Object(map)
237 }
238 other => json!({
239 "result": other,
240 "ldp_provenance": provenance.to_value()
241 }),
242 }
243 } else {
244 output
245 };
246 yield TaskEvent::Completed { output: output_with_provenance };
247 break;
248 }
249 LdpMessageBody::TaskFailed { error, .. } => {
250 yield TaskEvent::Failed { error };
251 break;
252 }
253 _ => {}
254 },
255 Err(e) => {
256 yield TaskEvent::Failed { error: LdpError::transport("STREAM_ERROR", e) };
257 break;
258 }
259 }
260 }
261 };
262
263 Ok(Box::pin(stream))
264 }
265
266 #[instrument(skip(self), fields(url = %url, task_id = %task_id))]
268 async fn status(&self, url: &str, task_id: &str) -> Result<TaskStatus, String> {
269 debug!(task_id = %task_id, "Polling LDP task status");
270
271 let mut query = LdpEnvelope::new(
272 "",
273 &self.config.delegate_id,
274 url,
275 LdpMessageBody::TaskUpdate {
276 task_id: task_id.to_string(),
277 progress: None,
278 message: Some("status_query".into()),
279 },
280 crate::types::payload::PayloadMode::Text,
281 );
282
283 if let Some(ref secret) = self.config.signing_secret {
285 crate::signing::apply_signature(&mut query, secret);
286 }
287
288 let response = self.client.send_message(url, &query).await?;
289
290 match response.body {
291 LdpMessageBody::TaskUpdate { message, .. } => {
292 let msg = message.unwrap_or_default();
293 if msg == "submitted" {
294 Ok(TaskStatus::Submitted)
295 } else {
296 Ok(TaskStatus::Working)
297 }
298 }
299 LdpMessageBody::TaskResult { output, provenance, .. } => {
300 let output = self.embed_provenance(output, provenance);
301 Ok(TaskStatus::Completed { output })
302 }
303 LdpMessageBody::TaskFailed { error, .. } => Ok(TaskStatus::Failed { error }),
304 _ => Ok(TaskStatus::Working),
305 }
306 }
307
308 #[instrument(skip(self), fields(url = %url, task_id = %task_id))]
310 async fn cancel(&self, url: &str, task_id: &str) -> Result<(), String> {
311 info!(task_id = %task_id, "Cancelling LDP task");
312
313 let mut cancel_msg = LdpEnvelope::new(
314 "",
315 &self.config.delegate_id,
316 url,
317 LdpMessageBody::TaskCancel {
318 task_id: task_id.to_string(),
319 },
320 crate::types::payload::PayloadMode::Text,
321 );
322
323 if let Some(ref secret) = self.config.signing_secret {
325 crate::signing::apply_signature(&mut cancel_msg, secret);
326 }
327
328 self.client.send_message(url, &cancel_msg).await?;
329 Ok(())
330 }
331}