1use crate::client::LdpClient;
10use crate::config::LdpAdapterConfig;
11use crate::protocol::{
12 ProtocolAdapter, RemoteCapabilities, RemoteSkill, TaskEvent, TaskHandle, TaskRequest,
13 TaskStatus, TaskStream,
14};
15use crate::session_manager::SessionManager;
16use crate::types::contract::{DelegationContract, FailurePolicy};
17use crate::types::error::LdpError;
18use crate::types::messages::{LdpEnvelope, LdpMessageBody};
19use crate::types::provenance::Provenance;
20use crate::types::verification::ProvenanceEntry;
21
22use async_trait::async_trait;
23use serde_json::{json, Value};
24use std::collections::HashMap;
25use std::sync::Arc;
26use tokio::sync::RwLock;
27use tracing::{debug, info, instrument};
28
29fn validate_contract(contract: &DelegationContract, provenance: &Provenance) -> Vec<String> {
31 let mut violations = Vec::new();
32
33 if let Some(ref deadline_str) = contract.deadline {
35 if let Ok(deadline) = chrono::DateTime::parse_from_rfc3339(deadline_str) {
36 if chrono::Utc::now() > deadline {
37 violations.push("deadline_exceeded".into());
38 }
39 }
40 }
41
42 if let Some(ref budget) = contract.policy.budget {
44 if let (Some(max), Some(used)) = (budget.max_tokens, provenance.tokens_used) {
45 if used > max {
46 violations.push("budget_tokens_exceeded".into());
47 }
48 }
49 if let (Some(max), Some(used)) = (budget.max_cost_usd, provenance.cost_usd) {
50 if used > max {
51 violations.push("budget_cost_exceeded".into());
52 }
53 }
54 }
55
56 violations
57}
58
59fn build_lineage_entry(provenance: &Provenance, skill: &str) -> ProvenanceEntry {
61 ProvenanceEntry {
62 delegate_id: provenance.produced_by.clone(),
63 model_version: provenance.model_version.clone(),
64 step: skill.to_string(),
65 timestamp: provenance.timestamp.clone(),
66 verification_status: provenance.verification_status.clone(),
67 }
68}
69
70pub struct LdpAdapter {
75 session_manager: SessionManager,
76 client: LdpClient,
77 config: LdpAdapterConfig,
78 contracts: Arc<RwLock<HashMap<String, DelegationContract>>>,
79}
80
81impl LdpAdapter {
82 pub fn new(config: LdpAdapterConfig) -> Self {
84 let client = LdpClient::new();
85 let session_manager = SessionManager::new(client.clone(), config.clone());
86 Self {
87 session_manager,
88 client,
89 config,
90 contracts: Arc::new(RwLock::new(HashMap::new())),
91 }
92 }
93
94 pub fn with_client(config: LdpAdapterConfig, client: LdpClient) -> Self {
96 let session_manager = SessionManager::new(client.clone(), config.clone());
97 Self {
98 session_manager,
99 client,
100 config,
101 contracts: Arc::new(RwLock::new(HashMap::new())),
102 }
103 }
104
105 pub fn session_manager(&self) -> &SessionManager {
107 &self.session_manager
108 }
109
110 fn identity_to_capabilities(
112 &self,
113 identity: &crate::types::identity::LdpIdentityCard,
114 ) -> RemoteCapabilities {
115 let skills = identity
116 .capabilities
117 .iter()
118 .map(|cap| RemoteSkill {
119 name: cap.name.clone(),
120 description: cap.description.clone(),
121 input_schema: cap.input_schema.clone(),
122 output_schema: cap.output_schema.clone(),
123 })
124 .collect();
125
126 RemoteCapabilities {
127 name: identity.name.clone(),
128 description: identity.description.clone(),
129 skills,
130 protocols: vec!["ldp".into()],
131 }
132 }
133
134 fn embed_provenance(&self, output: Value, provenance: Provenance) -> Value {
136 if self.config.attach_provenance {
137 match output {
138 Value::Object(mut map) => {
139 map.insert("ldp_provenance".into(), provenance.to_value());
140 Value::Object(map)
141 }
142 other => {
143 json!({
144 "result": other,
145 "ldp_provenance": provenance.to_value()
146 })
147 }
148 }
149 } else {
150 output
151 }
152 }
153
154 fn apply_contract_validation(
156 &self,
157 contract: &DelegationContract,
158 output: Value,
159 mut provenance: Provenance,
160 ) -> TaskStatus {
161 let violations = validate_contract(contract, &provenance);
162
163 provenance.contract_id = Some(contract.contract_id.clone());
164 provenance.contract_satisfied = Some(violations.is_empty());
165 provenance.contract_violations = violations.clone();
166
167 let output = self.embed_provenance(output, provenance);
168
169 if !violations.is_empty() && contract.policy.failure_policy == FailurePolicy::FailClosed {
170 let summary = violations.join(", ");
171 TaskStatus::Failed {
172 error: LdpError::policy(
173 "CONTRACT_VIOLATED",
174 format!("Contract violations: {}", summary),
175 )
176 .with_partial_output(output),
177 }
178 } else {
179 TaskStatus::Completed { output }
180 }
181 }
182}
183
184#[async_trait]
185impl ProtocolAdapter for LdpAdapter {
186 #[instrument(skip(self), fields(url = %url))]
192 async fn discover(&self, url: &str) -> Result<RemoteCapabilities, String> {
193 info!(url = %url, "Discovering LDP delegate");
194
195 let identity = self.client.fetch_identity_card(url).await?;
197
198 if self.config.enforce_trust_domains
200 && !self.config.trust_domain.trusts(&identity.trust_domain.name)
201 {
202 return Err(format!(
203 "Trust domain '{}' is not trusted by '{}'",
204 identity.trust_domain.name, self.config.trust_domain.name
205 ));
206 }
207
208 let capabilities = self.identity_to_capabilities(&identity);
210 debug!(
211 name = %capabilities.name,
212 skills = capabilities.skills.len(),
213 "LDP delegate discovered"
214 );
215
216 Ok(capabilities)
217 }
218
219 #[instrument(skip(self, task), fields(url = %url, skill = %task.skill))]
225 async fn invoke(&self, url: &str, task: TaskRequest) -> Result<TaskHandle, String> {
226 info!(url = %url, skill = %task.skill, "Invoking LDP task");
227
228 let session = self.session_manager.get_or_establish(url).await?;
230
231 let task_id = uuid::Uuid::new_v4().to_string();
233 let mut submit = LdpEnvelope::new(
234 &session.session_id,
235 &self.config.delegate_id,
236 &session.remote_delegate_id,
237 LdpMessageBody::TaskSubmit {
238 task_id: task_id.clone(),
239 skill: task.skill.clone(),
240 input: task.input.clone(),
241 contract: task.contract.clone(),
242 },
243 session.payload.mode,
244 );
245
246 if let Some(ref secret) = self.config.signing_secret {
248 crate::signing::apply_signature(&mut submit, secret);
249 }
250
251 let _response = self.client.send_message(url, &submit).await?;
252
253 if let Some(ref contract) = task.contract {
255 let mut contracts = self.contracts.write().await;
256 contracts.insert(task_id.clone(), contract.clone());
257 }
258
259 self.session_manager.touch(url).await;
261
262 debug!(task_id = %task_id, "LDP task submitted");
263
264 Ok(TaskHandle {
265 task_id,
266 remote_url: url.to_string(),
267 })
268 }
269
270 #[instrument(skip(self, task), fields(url = %url, skill = %task.skill))]
275 async fn stream(&self, url: &str, task: TaskRequest) -> Result<TaskStream, String> {
276 let contract = task.contract.clone();
278
279 let handle = self.invoke(url, task).await?;
280 let client = self.client.clone();
281 let config = self.config.clone();
282 let url = url.to_string();
283 let task_id = handle.task_id.clone();
284
285 let stream = async_stream::stream! {
288 let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
289 loop {
290 interval.tick().await;
291
292 let mut status_query = LdpEnvelope::new(
294 "",
295 &config.delegate_id,
296 &url,
297 LdpMessageBody::TaskUpdate {
298 task_id: task_id.clone(),
299 progress: None,
300 message: Some("status_query".into()),
301 },
302 crate::types::payload::PayloadMode::Text,
303 );
304
305 if let Some(ref secret) = config.signing_secret {
307 crate::signing::apply_signature(&mut status_query, secret);
308 }
309
310 match client.send_message(&url, &status_query).await {
311 Ok(response) => match response.body {
312 LdpMessageBody::TaskUpdate { progress, message, .. } => {
313 yield TaskEvent::Progress {
314 message: message.unwrap_or_default(),
315 progress,
316 };
317 }
318 LdpMessageBody::TaskResult { output, mut provenance, .. } => {
319 provenance.lineage.insert(0, build_lineage_entry(&provenance, "task"));
320 provenance.normalize();
321 let output_with_provenance = if config.attach_provenance {
322 match output {
323 Value::Object(mut map) => {
324 map.insert("ldp_provenance".into(),
325 provenance.to_value());
326 Value::Object(map)
327 }
328 other => json!({
329 "result": other,
330 "ldp_provenance": provenance.to_value()
331 }),
332 }
333 } else {
334 output
335 };
336
337 if let Some(ref contract) = contract {
339 let violations = validate_contract(contract, &provenance);
340 if !violations.is_empty() && contract.policy.failure_policy == FailurePolicy::FailClosed {
341 let summary = violations.join(", ");
342 yield TaskEvent::Failed {
343 error: LdpError::policy(
344 "CONTRACT_VIOLATED",
345 format!("Contract violations: {}", summary),
346 )
347 .with_partial_output(output_with_provenance),
348 };
349 } else {
350 yield TaskEvent::Completed { output: output_with_provenance };
351 }
352 } else {
353 yield TaskEvent::Completed { output: output_with_provenance };
354 }
355 break;
356 }
357 LdpMessageBody::TaskFailed { error, .. } => {
358 yield TaskEvent::Failed { error };
359 break;
360 }
361 _ => {}
362 },
363 Err(e) => {
364 yield TaskEvent::Failed { error: LdpError::transport("STREAM_ERROR", e) };
365 break;
366 }
367 }
368 }
369 };
370
371 Ok(Box::pin(stream))
372 }
373
374 #[instrument(skip(self), fields(url = %url, task_id = %task_id))]
376 async fn status(&self, url: &str, task_id: &str) -> Result<TaskStatus, String> {
377 debug!(task_id = %task_id, "Polling LDP task status");
378
379 let mut query = LdpEnvelope::new(
380 "",
381 &self.config.delegate_id,
382 url,
383 LdpMessageBody::TaskUpdate {
384 task_id: task_id.to_string(),
385 progress: None,
386 message: Some("status_query".into()),
387 },
388 crate::types::payload::PayloadMode::Text,
389 );
390
391 if let Some(ref secret) = self.config.signing_secret {
393 crate::signing::apply_signature(&mut query, secret);
394 }
395
396 let response = self.client.send_message(url, &query).await?;
397
398 match response.body {
399 LdpMessageBody::TaskUpdate { message, .. } => {
400 let msg = message.unwrap_or_default();
401 if msg == "submitted" {
402 Ok(TaskStatus::Submitted)
403 } else {
404 Ok(TaskStatus::Working)
405 }
406 }
407 LdpMessageBody::TaskResult {
408 output,
409 mut provenance,
410 ..
411 } => {
412 provenance
413 .lineage
414 .insert(0, build_lineage_entry(&provenance, "task"));
415 provenance.normalize();
416 let contracts = self.contracts.read().await;
417 if let Some(contract) = contracts.get(task_id) {
418 Ok(self.apply_contract_validation(contract, output, provenance))
419 } else {
420 let output = self.embed_provenance(output, provenance);
421 Ok(TaskStatus::Completed { output })
422 }
423 }
424 LdpMessageBody::TaskFailed { error, .. } => Ok(TaskStatus::Failed { error }),
425 _ => Ok(TaskStatus::Working),
426 }
427 }
428
429 #[instrument(skip(self), fields(url = %url, task_id = %task_id))]
431 async fn cancel(&self, url: &str, task_id: &str) -> Result<(), String> {
432 info!(task_id = %task_id, "Cancelling LDP task");
433
434 let mut cancel_msg = LdpEnvelope::new(
435 "",
436 &self.config.delegate_id,
437 url,
438 LdpMessageBody::TaskCancel {
439 task_id: task_id.to_string(),
440 },
441 crate::types::payload::PayloadMode::Text,
442 );
443
444 if let Some(ref secret) = self.config.signing_secret {
446 crate::signing::apply_signature(&mut cancel_msg, secret);
447 }
448
449 self.client.send_message(url, &cancel_msg).await?;
450 Ok(())
451 }
452}