Skip to main content

ldp_protocol/
adapter.rs

1//! LDP protocol adapter — implements the `ProtocolAdapter` trait.
2//!
3//! This is the primary integration point. The adapter:
4//! - Translates `discover/invoke/stream/status/cancel` into LDP messages
5//! - Manages sessions transparently (callers see request->response)
6//! - Attaches provenance to all results
7//! - Enforces trust domain boundaries
8
9use crate::client::LdpClient;
10use crate::config::LdpAdapterConfig;
11use crate::protocol::{
12    ProtocolAdapter, RemoteCapabilities, RemoteSkill, TaskEvent, TaskHandle, TaskRequest,
13    TaskStatus, TaskStream,
14};
15use crate::session_manager::SessionManager;
16use crate::types::contract::{DelegationContract, FailurePolicy};
17use crate::types::error::LdpError;
18use crate::types::messages::{LdpEnvelope, LdpMessageBody};
19use crate::types::provenance::Provenance;
20use crate::types::verification::ProvenanceEntry;
21
22use async_trait::async_trait;
23use serde_json::{json, Value};
24use std::collections::HashMap;
25use std::sync::Arc;
26use tokio::sync::RwLock;
27use tracing::{debug, info, instrument};
28
29/// Validate a task result against its contract. Returns violation codes.
30fn validate_contract(contract: &DelegationContract, provenance: &Provenance) -> Vec<String> {
31    let mut violations = Vec::new();
32
33    // Deadline check (client's local UTC time is authoritative)
34    if let Some(ref deadline_str) = contract.deadline {
35        if let Ok(deadline) = chrono::DateTime::parse_from_rfc3339(deadline_str) {
36            if chrono::Utc::now() > deadline {
37                violations.push("deadline_exceeded".into());
38            }
39        }
40    }
41
42    // Budget token check
43    if let Some(ref budget) = contract.policy.budget {
44        if let (Some(max), Some(used)) = (budget.max_tokens, provenance.tokens_used) {
45            if used > max {
46                violations.push("budget_tokens_exceeded".into());
47            }
48        }
49        if let (Some(max), Some(used)) = (budget.max_cost_usd, provenance.cost_usd) {
50            if used > max {
51                violations.push("budget_cost_exceeded".into());
52            }
53        }
54    }
55
56    violations
57}
58
59/// Build a lineage entry from a provenance record and the skill name.
60fn build_lineage_entry(provenance: &Provenance, skill: &str) -> ProvenanceEntry {
61    ProvenanceEntry {
62        delegate_id: provenance.produced_by.clone(),
63        model_version: provenance.model_version.clone(),
64        step: skill.to_string(),
65        timestamp: provenance.timestamp.clone(),
66        verification_status: provenance.verification_status.clone(),
67    }
68}
69
70/// LDP protocol adapter.
71///
72/// Can be used standalone or registered with a `ProtocolRegistry`
73/// (including JamJet's registry via the `jamjet` feature).
74pub struct LdpAdapter {
75    session_manager: SessionManager,
76    client: LdpClient,
77    config: LdpAdapterConfig,
78    contracts: Arc<RwLock<HashMap<String, DelegationContract>>>,
79}
80
81impl LdpAdapter {
82    /// Create a new LDP adapter with the given configuration.
83    pub fn new(config: LdpAdapterConfig) -> Self {
84        let client = LdpClient::new();
85        let session_manager = SessionManager::new(client.clone(), config.clone());
86        Self {
87            session_manager,
88            client,
89            config,
90            contracts: Arc::new(RwLock::new(HashMap::new())),
91        }
92    }
93
94    /// Create with a custom HTTP client (useful for testing).
95    pub fn with_client(config: LdpAdapterConfig, client: LdpClient) -> Self {
96        let session_manager = SessionManager::new(client.clone(), config.clone());
97        Self {
98            session_manager,
99            client,
100            config,
101            contracts: Arc::new(RwLock::new(HashMap::new())),
102        }
103    }
104
105    /// Get the session manager (for external session control).
106    pub fn session_manager(&self) -> &SessionManager {
107        &self.session_manager
108    }
109
110    /// Convert an LDP identity card to RemoteCapabilities.
111    fn identity_to_capabilities(
112        &self,
113        identity: &crate::types::identity::LdpIdentityCard,
114    ) -> RemoteCapabilities {
115        let skills = identity
116            .capabilities
117            .iter()
118            .map(|cap| RemoteSkill {
119                name: cap.name.clone(),
120                description: cap.description.clone(),
121                input_schema: cap.input_schema.clone(),
122                output_schema: cap.output_schema.clone(),
123            })
124            .collect();
125
126        RemoteCapabilities {
127            name: identity.name.clone(),
128            description: identity.description.clone(),
129            skills,
130            protocols: vec!["ldp".into()],
131        }
132    }
133
134    /// Embed provenance into a task output Value.
135    fn embed_provenance(&self, output: Value, provenance: Provenance) -> Value {
136        if self.config.attach_provenance {
137            match output {
138                Value::Object(mut map) => {
139                    map.insert("ldp_provenance".into(), provenance.to_value());
140                    Value::Object(map)
141                }
142                other => {
143                    json!({
144                        "result": other,
145                        "ldp_provenance": provenance.to_value()
146                    })
147                }
148            }
149        } else {
150            output
151        }
152    }
153
154    /// Apply contract validation to a completed task, returning final TaskStatus.
155    fn apply_contract_validation(
156        &self,
157        contract: &DelegationContract,
158        output: Value,
159        mut provenance: Provenance,
160    ) -> TaskStatus {
161        let violations = validate_contract(contract, &provenance);
162
163        provenance.contract_id = Some(contract.contract_id.clone());
164        provenance.contract_satisfied = Some(violations.is_empty());
165        provenance.contract_violations = violations.clone();
166
167        let output = self.embed_provenance(output, provenance);
168
169        if !violations.is_empty() && contract.policy.failure_policy == FailurePolicy::FailClosed {
170            let summary = violations.join(", ");
171            TaskStatus::Failed {
172                error: LdpError::policy(
173                    "CONTRACT_VIOLATED",
174                    format!("Contract violations: {}", summary),
175                )
176                .with_partial_output(output),
177            }
178        } else {
179            TaskStatus::Completed { output }
180        }
181    }
182}
183
184#[async_trait]
185impl ProtocolAdapter for LdpAdapter {
186    /// Discover remote delegate capabilities.
187    ///
188    /// 1. Fetch LDP identity card
189    /// 2. Validate trust domain (if configured)
190    /// 3. Map to RemoteCapabilities
191    #[instrument(skip(self), fields(url = %url))]
192    async fn discover(&self, url: &str) -> Result<RemoteCapabilities, String> {
193        info!(url = %url, "Discovering LDP delegate");
194
195        // Fetch identity card.
196        let identity = self.client.fetch_identity_card(url).await?;
197
198        // Trust domain check.
199        if self.config.enforce_trust_domains
200            && !self.config.trust_domain.trusts(&identity.trust_domain.name)
201        {
202            return Err(format!(
203                "Trust domain '{}' is not trusted by '{}'",
204                identity.trust_domain.name, self.config.trust_domain.name
205            ));
206        }
207
208        // Convert to RemoteCapabilities.
209        let capabilities = self.identity_to_capabilities(&identity);
210        debug!(
211            name = %capabilities.name,
212            skills = capabilities.skills.len(),
213            "LDP delegate discovered"
214        );
215
216        Ok(capabilities)
217    }
218
219    /// Submit a task to an LDP delegate.
220    ///
221    /// 1. Get or establish session (transparent to caller)
222    /// 2. Send TASK_SUBMIT within session
223    /// 3. Return TaskHandle
224    #[instrument(skip(self, task), fields(url = %url, skill = %task.skill))]
225    async fn invoke(&self, url: &str, task: TaskRequest) -> Result<TaskHandle, String> {
226        info!(url = %url, skill = %task.skill, "Invoking LDP task");
227
228        // Step 1: Get or establish session.
229        let session = self.session_manager.get_or_establish(url).await?;
230
231        // Step 2: Send TASK_SUBMIT.
232        let task_id = uuid::Uuid::new_v4().to_string();
233        let mut submit = LdpEnvelope::new(
234            &session.session_id,
235            &self.config.delegate_id,
236            &session.remote_delegate_id,
237            LdpMessageBody::TaskSubmit {
238                task_id: task_id.clone(),
239                skill: task.skill.clone(),
240                input: task.input.clone(),
241                contract: task.contract.clone(),
242            },
243            session.payload.mode,
244        );
245
246        // Sign if configured
247        if let Some(ref secret) = self.config.signing_secret {
248            crate::signing::apply_signature(&mut submit, secret);
249        }
250
251        let _response = self.client.send_message(url, &submit).await?;
252
253        // Store contract for later validation.
254        if let Some(ref contract) = task.contract {
255            let mut contracts = self.contracts.write().await;
256            contracts.insert(task_id.clone(), contract.clone());
257        }
258
259        // Touch session (update last_used, increment task count).
260        self.session_manager.touch(url).await;
261
262        debug!(task_id = %task_id, "LDP task submitted");
263
264        Ok(TaskHandle {
265            task_id,
266            remote_url: url.to_string(),
267        })
268    }
269
270    /// Stream task progress events.
271    ///
272    /// Submits the task, then polls for updates until completion.
273    /// In a full implementation, this would use SSE or WebSocket.
274    #[instrument(skip(self, task), fields(url = %url, skill = %task.skill))]
275    async fn stream(&self, url: &str, task: TaskRequest) -> Result<TaskStream, String> {
276        // Capture the contract before invoke() consumes the task.
277        let contract = task.contract.clone();
278
279        let handle = self.invoke(url, task).await?;
280        let client = self.client.clone();
281        let config = self.config.clone();
282        let url = url.to_string();
283        let task_id = handle.task_id.clone();
284
285        // Poll-based streaming: periodically check task status.
286        // In production, replace with SSE subscription.
287        let stream = async_stream::stream! {
288            let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
289            loop {
290                interval.tick().await;
291
292                // Build a status query envelope.
293                let mut status_query = LdpEnvelope::new(
294                    "",
295                    &config.delegate_id,
296                    &url,
297                    LdpMessageBody::TaskUpdate {
298                        task_id: task_id.clone(),
299                        progress: None,
300                        message: Some("status_query".into()),
301                    },
302                    crate::types::payload::PayloadMode::Text,
303                );
304
305                // Sign if configured
306                if let Some(ref secret) = config.signing_secret {
307                    crate::signing::apply_signature(&mut status_query, secret);
308                }
309
310                match client.send_message(&url, &status_query).await {
311                    Ok(response) => match response.body {
312                        LdpMessageBody::TaskUpdate { progress, message, .. } => {
313                            yield TaskEvent::Progress {
314                                message: message.unwrap_or_default(),
315                                progress,
316                            };
317                        }
318                        LdpMessageBody::TaskResult { output, mut provenance, .. } => {
319                            provenance.lineage.insert(0, build_lineage_entry(&provenance, "task"));
320                            provenance.normalize();
321                            let output_with_provenance = if config.attach_provenance {
322                                match output {
323                                    Value::Object(mut map) => {
324                                        map.insert("ldp_provenance".into(),
325                                            provenance.to_value());
326                                        Value::Object(map)
327                                    }
328                                    other => json!({
329                                        "result": other,
330                                        "ldp_provenance": provenance.to_value()
331                                    }),
332                                }
333                            } else {
334                                output
335                            };
336
337                            // Apply contract validation if present
338                            if let Some(ref contract) = contract {
339                                let violations = validate_contract(contract, &provenance);
340                                if !violations.is_empty() && contract.policy.failure_policy == FailurePolicy::FailClosed {
341                                    let summary = violations.join(", ");
342                                    yield TaskEvent::Failed {
343                                        error: LdpError::policy(
344                                            "CONTRACT_VIOLATED",
345                                            format!("Contract violations: {}", summary),
346                                        )
347                                        .with_partial_output(output_with_provenance),
348                                    };
349                                } else {
350                                    yield TaskEvent::Completed { output: output_with_provenance };
351                                }
352                            } else {
353                                yield TaskEvent::Completed { output: output_with_provenance };
354                            }
355                            break;
356                        }
357                        LdpMessageBody::TaskFailed { error, .. } => {
358                            yield TaskEvent::Failed { error };
359                            break;
360                        }
361                        _ => {}
362                    },
363                    Err(e) => {
364                        yield TaskEvent::Failed { error: LdpError::transport("STREAM_ERROR", e) };
365                        break;
366                    }
367                }
368            }
369        };
370
371        Ok(Box::pin(stream))
372    }
373
374    /// Poll task status.
375    #[instrument(skip(self), fields(url = %url, task_id = %task_id))]
376    async fn status(&self, url: &str, task_id: &str) -> Result<TaskStatus, String> {
377        debug!(task_id = %task_id, "Polling LDP task status");
378
379        let mut query = LdpEnvelope::new(
380            "",
381            &self.config.delegate_id,
382            url,
383            LdpMessageBody::TaskUpdate {
384                task_id: task_id.to_string(),
385                progress: None,
386                message: Some("status_query".into()),
387            },
388            crate::types::payload::PayloadMode::Text,
389        );
390
391        // Sign if configured
392        if let Some(ref secret) = self.config.signing_secret {
393            crate::signing::apply_signature(&mut query, secret);
394        }
395
396        let response = self.client.send_message(url, &query).await?;
397
398        match response.body {
399            LdpMessageBody::TaskUpdate { message, .. } => {
400                let msg = message.unwrap_or_default();
401                if msg == "submitted" {
402                    Ok(TaskStatus::Submitted)
403                } else {
404                    Ok(TaskStatus::Working)
405                }
406            }
407            LdpMessageBody::TaskResult {
408                output,
409                mut provenance,
410                ..
411            } => {
412                provenance
413                    .lineage
414                    .insert(0, build_lineage_entry(&provenance, "task"));
415                provenance.normalize();
416                let contracts = self.contracts.read().await;
417                if let Some(contract) = contracts.get(task_id) {
418                    Ok(self.apply_contract_validation(contract, output, provenance))
419                } else {
420                    let output = self.embed_provenance(output, provenance);
421                    Ok(TaskStatus::Completed { output })
422                }
423            }
424            LdpMessageBody::TaskFailed { error, .. } => Ok(TaskStatus::Failed { error }),
425            _ => Ok(TaskStatus::Working),
426        }
427    }
428
429    /// Cancel a running task.
430    #[instrument(skip(self), fields(url = %url, task_id = %task_id))]
431    async fn cancel(&self, url: &str, task_id: &str) -> Result<(), String> {
432        info!(task_id = %task_id, "Cancelling LDP task");
433
434        let mut cancel_msg = LdpEnvelope::new(
435            "",
436            &self.config.delegate_id,
437            url,
438            LdpMessageBody::TaskCancel {
439                task_id: task_id.to_string(),
440            },
441            crate::types::payload::PayloadMode::Text,
442        );
443
444        // Sign if configured
445        if let Some(ref secret) = self.config.signing_secret {
446            crate::signing::apply_signature(&mut cancel_msg, secret);
447        }
448
449        self.client.send_message(url, &cancel_msg).await?;
450        Ok(())
451    }
452}