Skip to main content

ldp_protocol/
adapter.rs

1//! LDP protocol adapter — implements the `ProtocolAdapter` trait.
2//!
3//! This is the primary integration point. The adapter:
4//! - Translates `discover/invoke/stream/status/cancel` into LDP messages
5//! - Manages sessions transparently (callers see request->response)
6//! - Attaches provenance to all results
7//! - Enforces trust domain boundaries
8
9use crate::client::LdpClient;
10use crate::config::LdpAdapterConfig;
11use crate::protocol::{
12    ProtocolAdapter, RemoteCapabilities, RemoteSkill, TaskEvent, TaskHandle, TaskRequest,
13    TaskStatus, TaskStream,
14};
15use crate::session_manager::SessionManager;
16use crate::types::error::LdpError;
17use crate::types::messages::{LdpEnvelope, LdpMessageBody};
18use crate::types::provenance::Provenance;
19
20use async_trait::async_trait;
21use serde_json::{json, Value};
22use tracing::{debug, info, instrument};
23
24/// LDP protocol adapter.
25///
26/// Can be used standalone or registered with a `ProtocolRegistry`
27/// (including JamJet's registry via the `jamjet` feature).
28pub struct LdpAdapter {
29    session_manager: SessionManager,
30    client: LdpClient,
31    config: LdpAdapterConfig,
32}
33
34impl LdpAdapter {
35    /// Create a new LDP adapter with the given configuration.
36    pub fn new(config: LdpAdapterConfig) -> Self {
37        let client = LdpClient::new();
38        let session_manager = SessionManager::new(client.clone(), config.clone());
39        Self {
40            session_manager,
41            client,
42            config,
43        }
44    }
45
46    /// Create with a custom HTTP client (useful for testing).
47    pub fn with_client(config: LdpAdapterConfig, client: LdpClient) -> Self {
48        let session_manager = SessionManager::new(client.clone(), config.clone());
49        Self {
50            session_manager,
51            client,
52            config,
53        }
54    }
55
56    /// Get the session manager (for external session control).
57    pub fn session_manager(&self) -> &SessionManager {
58        &self.session_manager
59    }
60
61    /// Convert an LDP identity card to RemoteCapabilities.
62    fn identity_to_capabilities(
63        &self,
64        identity: &crate::types::identity::LdpIdentityCard,
65    ) -> RemoteCapabilities {
66        let skills = identity
67            .capabilities
68            .iter()
69            .map(|cap| RemoteSkill {
70                name: cap.name.clone(),
71                description: cap.description.clone(),
72                input_schema: cap.input_schema.clone(),
73                output_schema: cap.output_schema.clone(),
74            })
75            .collect();
76
77        RemoteCapabilities {
78            name: identity.name.clone(),
79            description: identity.description.clone(),
80            skills,
81            protocols: vec!["ldp".into()],
82        }
83    }
84
85    /// Embed provenance into a task output Value.
86    fn embed_provenance(&self, output: Value, provenance: Provenance) -> Value {
87        if self.config.attach_provenance {
88            match output {
89                Value::Object(mut map) => {
90                    map.insert("ldp_provenance".into(), provenance.to_value());
91                    Value::Object(map)
92                }
93                other => {
94                    json!({
95                        "result": other,
96                        "ldp_provenance": provenance.to_value()
97                    })
98                }
99            }
100        } else {
101            output
102        }
103    }
104}
105
106#[async_trait]
107impl ProtocolAdapter for LdpAdapter {
108    /// Discover remote delegate capabilities.
109    ///
110    /// 1. Fetch LDP identity card
111    /// 2. Validate trust domain (if configured)
112    /// 3. Map to RemoteCapabilities
113    #[instrument(skip(self), fields(url = %url))]
114    async fn discover(&self, url: &str) -> Result<RemoteCapabilities, String> {
115        info!(url = %url, "Discovering LDP delegate");
116
117        // Fetch identity card.
118        let identity = self.client.fetch_identity_card(url).await?;
119
120        // Trust domain check.
121        if self.config.enforce_trust_domains {
122            if !self.config.trust_domain.trusts(&identity.trust_domain.name) {
123                return Err(format!(
124                    "Trust domain '{}' is not trusted by '{}'",
125                    identity.trust_domain.name, self.config.trust_domain.name
126                ));
127            }
128        }
129
130        // Convert to RemoteCapabilities.
131        let capabilities = self.identity_to_capabilities(&identity);
132        debug!(
133            name = %capabilities.name,
134            skills = capabilities.skills.len(),
135            "LDP delegate discovered"
136        );
137
138        Ok(capabilities)
139    }
140
141    /// Submit a task to an LDP delegate.
142    ///
143    /// 1. Get or establish session (transparent to caller)
144    /// 2. Send TASK_SUBMIT within session
145    /// 3. Return TaskHandle
146    #[instrument(skip(self, task), fields(url = %url, skill = %task.skill))]
147    async fn invoke(&self, url: &str, task: TaskRequest) -> Result<TaskHandle, String> {
148        info!(url = %url, skill = %task.skill, "Invoking LDP task");
149
150        // Step 1: Get or establish session.
151        let session = self.session_manager.get_or_establish(url).await?;
152
153        // Step 2: Send TASK_SUBMIT.
154        let task_id = uuid::Uuid::new_v4().to_string();
155        let mut submit = LdpEnvelope::new(
156            &session.session_id,
157            &self.config.delegate_id,
158            &session.remote_delegate_id,
159            LdpMessageBody::TaskSubmit {
160                task_id: task_id.clone(),
161                skill: task.skill.clone(),
162                input: task.input.clone(),
163            },
164            session.payload.mode,
165        );
166
167        // Sign if configured
168        if let Some(ref secret) = self.config.signing_secret {
169            crate::signing::apply_signature(&mut submit, secret);
170        }
171
172        let _response = self.client.send_message(url, &submit).await?;
173
174        // Touch session (update last_used, increment task count).
175        self.session_manager.touch(url).await;
176
177        debug!(task_id = %task_id, "LDP task submitted");
178
179        Ok(TaskHandle {
180            task_id,
181            remote_url: url.to_string(),
182        })
183    }
184
185    /// Stream task progress events.
186    ///
187    /// Submits the task, then polls for updates until completion.
188    /// In a full implementation, this would use SSE or WebSocket.
189    #[instrument(skip(self, task), fields(url = %url, skill = %task.skill))]
190    async fn stream(&self, url: &str, task: TaskRequest) -> Result<TaskStream, String> {
191        let handle = self.invoke(url, task).await?;
192        let client = self.client.clone();
193        let config = self.config.clone();
194        let url = url.to_string();
195        let task_id = handle.task_id.clone();
196
197        // Poll-based streaming: periodically check task status.
198        // In production, replace with SSE subscription.
199        let stream = async_stream::stream! {
200            let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
201            loop {
202                interval.tick().await;
203
204                // Build a status query envelope.
205                let mut status_query = LdpEnvelope::new(
206                    "",
207                    &config.delegate_id,
208                    &url,
209                    LdpMessageBody::TaskUpdate {
210                        task_id: task_id.clone(),
211                        progress: None,
212                        message: Some("status_query".into()),
213                    },
214                    crate::types::payload::PayloadMode::Text,
215                );
216
217                // Sign if configured
218                if let Some(ref secret) = config.signing_secret {
219                    crate::signing::apply_signature(&mut status_query, secret);
220                }
221
222                match client.send_message(&url, &status_query).await {
223                    Ok(response) => match response.body {
224                        LdpMessageBody::TaskUpdate { progress, message, .. } => {
225                            yield TaskEvent::Progress {
226                                message: message.unwrap_or_default(),
227                                progress,
228                            };
229                        }
230                        LdpMessageBody::TaskResult { output, provenance, .. } => {
231                            let output_with_provenance = if config.attach_provenance {
232                                match output {
233                                    Value::Object(mut map) => {
234                                        map.insert("ldp_provenance".into(),
235                                            provenance.to_value());
236                                        Value::Object(map)
237                                    }
238                                    other => json!({
239                                        "result": other,
240                                        "ldp_provenance": provenance.to_value()
241                                    }),
242                                }
243                            } else {
244                                output
245                            };
246                            yield TaskEvent::Completed { output: output_with_provenance };
247                            break;
248                        }
249                        LdpMessageBody::TaskFailed { error, .. } => {
250                            yield TaskEvent::Failed { error };
251                            break;
252                        }
253                        _ => {}
254                    },
255                    Err(e) => {
256                        yield TaskEvent::Failed { error: LdpError::transport("STREAM_ERROR", e) };
257                        break;
258                    }
259                }
260            }
261        };
262
263        Ok(Box::pin(stream))
264    }
265
266    /// Poll task status.
267    #[instrument(skip(self), fields(url = %url, task_id = %task_id))]
268    async fn status(&self, url: &str, task_id: &str) -> Result<TaskStatus, String> {
269        debug!(task_id = %task_id, "Polling LDP task status");
270
271        let mut query = LdpEnvelope::new(
272            "",
273            &self.config.delegate_id,
274            url,
275            LdpMessageBody::TaskUpdate {
276                task_id: task_id.to_string(),
277                progress: None,
278                message: Some("status_query".into()),
279            },
280            crate::types::payload::PayloadMode::Text,
281        );
282
283        // Sign if configured
284        if let Some(ref secret) = self.config.signing_secret {
285            crate::signing::apply_signature(&mut query, secret);
286        }
287
288        let response = self.client.send_message(url, &query).await?;
289
290        match response.body {
291            LdpMessageBody::TaskUpdate { message, .. } => {
292                let msg = message.unwrap_or_default();
293                if msg == "submitted" {
294                    Ok(TaskStatus::Submitted)
295                } else {
296                    Ok(TaskStatus::Working)
297                }
298            }
299            LdpMessageBody::TaskResult { output, provenance, .. } => {
300                let output = self.embed_provenance(output, provenance);
301                Ok(TaskStatus::Completed { output })
302            }
303            LdpMessageBody::TaskFailed { error, .. } => Ok(TaskStatus::Failed { error }),
304            _ => Ok(TaskStatus::Working),
305        }
306    }
307
308    /// Cancel a running task.
309    #[instrument(skip(self), fields(url = %url, task_id = %task_id))]
310    async fn cancel(&self, url: &str, task_id: &str) -> Result<(), String> {
311        info!(task_id = %task_id, "Cancelling LDP task");
312
313        let mut cancel_msg = LdpEnvelope::new(
314            "",
315            &self.config.delegate_id,
316            url,
317            LdpMessageBody::TaskCancel {
318                task_id: task_id.to_string(),
319            },
320            crate::types::payload::PayloadMode::Text,
321        );
322
323        // Sign if configured
324        if let Some(ref secret) = self.config.signing_secret {
325            crate::signing::apply_signature(&mut cancel_msg, secret);
326        }
327
328        self.client.send_message(url, &cancel_msg).await?;
329        Ok(())
330    }
331}