Skip to main content

ldp_protocol/
server.rs

1//! LDP server — receives LDP messages and serves identity/capabilities.
2//!
3//! A minimal LDP-compliant server that:
4//! - Serves identity cards at `GET /ldp/identity`
5//! - Serves capabilities at `GET /ldp/capabilities`
6//! - Handles protocol messages at `POST /ldp/messages`
7//! - Manages session lifecycle (accept/reject)
8//! - Dispatches tasks to a pluggable handler
9
10use crate::types::capability::LdpCapability;
11use crate::types::error::LdpError;
12use crate::types::identity::LdpIdentityCard;
13use crate::types::messages::{LdpEnvelope, LdpMessageBody};
14use crate::types::payload::{negotiate_payload_mode, PayloadMode};
15use crate::types::provenance::Provenance;
16use crate::types::session::{LdpSession, SessionState};
17use crate::types::trust::TrustDomain;
18use crate::types::verification::VerificationStatus;
19
20use chrono::Utc;
21use serde_json::{json, Value};
22use std::collections::HashMap;
23use std::sync::Arc;
24use tokio::sync::RwLock;
25use tracing::{debug, info};
26
/// Task handler function type.
///
/// The server calls the handler with the skill name and the task input and
/// uses the returned value as the task output. This is the hook for plugging
/// in actual task execution logic.
///
/// The handler is reference-counted (`Arc`) and must be `Send + Sync`.
pub type TaskHandler = Arc<dyn Fn(&str, &Value) -> Value + Send + Sync>;
32
/// A minimal LDP server for testing and research.
///
/// Holds the server's identity card, the tables of known sessions and tasks,
/// the pluggable task handler, and an optional signing secret.
pub struct LdpServer {
    /// This server's identity card.
    identity: LdpIdentityCard,
    /// Sessions accepted by this server, keyed by session ID.
    sessions: Arc<RwLock<HashMap<String, LdpSession>>>,
    /// Task records keyed by task ID; each record carries the task's state,
    /// output and error.
    tasks: Arc<RwLock<HashMap<String, TaskRecord>>>,
    /// Pluggable task handler.
    handler: TaskHandler,
    /// Shared secret for HMAC message signing. If None, signing is disabled.
    signing_secret: Option<String>,
}
46
/// Internal task tracking record.
///
/// One record is stored per submitted task ID.
// `dead_code` is allowed because not every field is read back by the code in
// this file (e.g. `task_id` and `skill` are only written).
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct TaskRecord {
    /// ID of the task this record belongs to (also the key in the task table).
    task_id: String,
    /// Name of the skill the task was submitted for.
    skill: String,
    /// Current lifecycle state of the task.
    state: TaskRecordState,
    /// Handler output, when one has been produced.
    output: Option<Value>,
    /// Error description, reported in status replies for failed tasks.
    error: Option<String>,
}
57
/// Lifecycle state of a tracked task.
///
/// Status replies for states other than `Completed` and `Failed` report the
/// lower-cased variant name as the update message.
// `dead_code` is allowed because the code in this file only ever constructs
// `Completed`; the other variants are matched on but not created here.
#[derive(Debug, Clone, PartialEq)]
#[allow(dead_code)]
enum TaskRecordState {
    /// Task has been received but not started.
    Submitted,
    /// Task is in progress.
    Working,
    /// Task finished and its output is stored in the record.
    Completed,
    /// Task finished with an error.
    Failed,
}
66
67impl LdpServer {
68    /// Create a new LDP server with the given identity and task handler.
69    pub fn new(identity: LdpIdentityCard, handler: TaskHandler) -> Self {
70        Self {
71            identity,
72            sessions: Arc::new(RwLock::new(HashMap::new())),
73            tasks: Arc::new(RwLock::new(HashMap::new())),
74            handler,
75            signing_secret: None,
76        }
77    }
78
79    /// Set a signing secret for HMAC message signing (builder pattern).
80    pub fn with_signing_secret(mut self, secret: impl Into<String>) -> Self {
81        self.signing_secret = Some(secret.into());
82        self
83    }
84
85    /// Create a test server with an echo handler (returns input as output).
86    pub fn echo_server(delegate_id: &str, name: &str) -> Self {
87        let identity = LdpIdentityCard {
88            delegate_id: delegate_id.to_string(),
89            name: name.to_string(),
90            description: Some("Echo test server".into()),
91            model_family: "TestModel".into(),
92            model_version: "1.0".into(),
93            weights_fingerprint: None,
94            trust_domain: TrustDomain::new("test-domain"),
95            context_window: 4096,
96            reasoning_profile: Some("analytical".into()),
97            cost_profile: Some("low".into()),
98            latency_profile: Some("p50:100ms".into()),
99            jurisdiction: None,
100            capabilities: vec![LdpCapability {
101                name: "echo".into(),
102                description: Some("Echoes input back".into()),
103                input_schema: None,
104                output_schema: None,
105                quality: None,
106                domains: vec![],
107            }],
108            supported_payload_modes: vec![PayloadMode::SemanticFrame, PayloadMode::Text],
109            endpoint: String::new(),
110            metadata: HashMap::new(),
111        };
112
113        let handler: TaskHandler = Arc::new(|_skill, input| json!({ "echo": input }));
114
115        Self::new(identity, handler)
116    }
117
    /// Get the identity card.
    ///
    /// Returns a borrow of the card this server was constructed with.
    pub fn identity(&self) -> &LdpIdentityCard {
        &self.identity
    }
122
123    /// Handle a GET /ldp/identity request.
124    pub fn handle_identity_request(&self) -> Value {
125        serde_json::to_value(&self.identity).unwrap_or_default()
126    }
127
128    /// Handle a GET /ldp/capabilities request.
129    pub fn handle_capabilities_request(&self) -> Value {
130        json!({
131            "capabilities": self.identity.capabilities,
132            "supported_modes": self.identity.supported_payload_modes,
133        })
134    }
135
136    /// Handle a POST /ldp/messages request.
137    ///
138    /// Processes the incoming LDP envelope and returns a response envelope.
139    pub async fn handle_message(&self, envelope: LdpEnvelope) -> Result<LdpEnvelope, String> {
140        // Verify signature if signing is configured
141        if let Some(ref secret) = self.signing_secret {
142            if let Some(ref sig) = envelope.signature {
143                if !crate::signing::verify_envelope(&envelope, secret, sig) {
144                    return Err("Invalid message signature".to_string());
145                }
146            } else if !matches!(envelope.body, LdpMessageBody::Hello { .. }) {
147                // Allow unsigned HELLO (first contact), require signatures after
148                return Err("Message signature required but not provided".to_string());
149            }
150        }
151
152        let mut response = match &envelope.body {
153            LdpMessageBody::Hello {
154                delegate_id,
155                supported_modes,
156            } => {
157                self.handle_hello(&envelope, delegate_id, supported_modes)
158                    .await
159            }
160            LdpMessageBody::SessionPropose { config } => {
161                self.handle_session_propose(&envelope, config).await
162            }
163            LdpMessageBody::TaskSubmit {
164                task_id,
165                skill,
166                input,
167                ..
168            } => {
169                self.handle_task_submit(&envelope, task_id, skill, input)
170                    .await
171            }
172            LdpMessageBody::TaskUpdate { task_id, .. } => {
173                self.handle_task_status_query(&envelope, task_id).await
174            }
175            LdpMessageBody::TaskCancel { task_id } => {
176                self.handle_task_cancel(&envelope, task_id).await
177            }
178            LdpMessageBody::SessionClose { .. } => self.handle_session_close(&envelope).await,
179            _ => Err("Unhandled message type".to_string()),
180        }?;
181
182        // Sign outgoing response
183        if let Some(ref secret) = self.signing_secret {
184            crate::signing::apply_signature(&mut response, secret);
185        }
186
187        Ok(response)
188    }
189
190    /// Handle HELLO — respond with CAPABILITY_MANIFEST.
191    async fn handle_hello(
192        &self,
193        envelope: &LdpEnvelope,
194        _delegate_id: &str,
195        _supported_modes: &[PayloadMode],
196    ) -> Result<LdpEnvelope, String> {
197        info!(from = %envelope.from, "Received HELLO");
198
199        Ok(LdpEnvelope::new(
200            &envelope.session_id,
201            &self.identity.delegate_id,
202            &envelope.from,
203            LdpMessageBody::CapabilityManifest {
204                capabilities: json!({
205                    "capabilities": self.identity.capabilities,
206                    "supported_modes": self.identity.supported_payload_modes,
207                }),
208            },
209            PayloadMode::Text,
210        ))
211    }
212
213    /// Handle SESSION_PROPOSE — accept the session.
214    async fn handle_session_propose(
215        &self,
216        envelope: &LdpEnvelope,
217        config: &Value,
218    ) -> Result<LdpEnvelope, String> {
219        let session_id = envelope.session_id.clone();
220        info!(session_id = %session_id, from = %envelope.from, "Session proposed");
221
222        // Validate trust domain
223        let remote_domain = config
224            .get("trust_domain")
225            .and_then(|v| v.as_str())
226            .unwrap_or("unknown");
227
228        if !self.identity.trust_domain.trusts(remote_domain) {
229            let reason = format!(
230                "Trust domain '{}' not trusted by '{}'",
231                remote_domain, self.identity.trust_domain.name
232            );
233            return Ok(LdpEnvelope::new(
234                &session_id,
235                &self.identity.delegate_id,
236                &envelope.from,
237                LdpMessageBody::SessionReject {
238                    reason: reason.clone(),
239                    error: Some(LdpError::policy("TRUST_VIOLATION", reason)),
240                },
241                PayloadMode::Text,
242            ));
243        }
244
245        // Extract requested payload mode (default to SemanticFrame).
246        let requested_mode = config
247            .get("payload_mode")
248            .and_then(|v| serde_json::from_value::<PayloadMode>(v.clone()).ok())
249            .unwrap_or(PayloadMode::SemanticFrame);
250
251        // Negotiate payload mode.
252        let negotiated = negotiate_payload_mode(
253            &[requested_mode, PayloadMode::Text],
254            &self.identity.supported_payload_modes,
255        );
256
257        // Create session.
258        let now = Utc::now();
259        let ttl = config
260            .get("ttl_secs")
261            .and_then(|v| v.as_u64())
262            .unwrap_or(3600);
263
264        let session = LdpSession {
265            session_id: session_id.clone(),
266            remote_url: String::new(),
267            remote_delegate_id: envelope.from.clone(),
268            state: SessionState::Active,
269            payload: negotiated.clone(),
270            trust_domain: self.identity.trust_domain.clone(),
271            created_at: now,
272            last_used: now,
273            ttl_secs: ttl,
274            task_count: 0,
275        };
276
277        {
278            let mut sessions = self.sessions.write().await;
279            sessions.insert(session_id.clone(), session);
280        }
281
282        let response = LdpEnvelope::new(
283            &session_id,
284            &self.identity.delegate_id,
285            &envelope.from,
286            LdpMessageBody::SessionAccept {
287                session_id: session_id.clone(),
288                negotiated_mode: negotiated.mode,
289            },
290            PayloadMode::Text,
291        );
292        Ok(response)
293    }
294
295    /// Handle TASK_SUBMIT — execute the task immediately and return result.
296    async fn handle_task_submit(
297        &self,
298        envelope: &LdpEnvelope,
299        task_id: &str,
300        skill: &str,
301        input: &Value,
302    ) -> Result<LdpEnvelope, String> {
303        debug!(task_id = %task_id, skill = %skill, "Task submitted");
304
305        // Execute the task using the handler.
306        let output = (self.handler)(skill, input);
307
308        // Store the task record.
309        {
310            let mut tasks = self.tasks.write().await;
311            tasks.insert(
312                task_id.to_string(),
313                TaskRecord {
314                    task_id: task_id.to_string(),
315                    skill: skill.to_string(),
316                    state: TaskRecordState::Completed,
317                    output: Some(output.clone()),
318                    error: None,
319                },
320            );
321        }
322
323        // Build provenance.
324        let mut provenance =
325            Provenance::new(&self.identity.delegate_id, &self.identity.model_version);
326        provenance.verification_status = VerificationStatus::SelfVerified;
327        #[allow(deprecated)]
328        {
329            provenance.verified = true;
330        }
331
332        // Determine payload mode from session.
333        let mode = {
334            let sessions = self.sessions.read().await;
335            sessions
336                .get(&envelope.session_id)
337                .map(|s| s.payload.mode)
338                .unwrap_or(PayloadMode::Text)
339        };
340
341        Ok(LdpEnvelope::new(
342            &envelope.session_id,
343            &self.identity.delegate_id,
344            &envelope.from,
345            LdpMessageBody::TaskResult {
346                task_id: task_id.to_string(),
347                output,
348                provenance,
349            },
350            mode,
351        ))
352    }
353
354    /// Handle task status query — return current task state.
355    async fn handle_task_status_query(
356        &self,
357        envelope: &LdpEnvelope,
358        task_id: &str,
359    ) -> Result<LdpEnvelope, String> {
360        let tasks = self.tasks.read().await;
361
362        if let Some(record) = tasks.get(task_id) {
363            let body = match record.state {
364                TaskRecordState::Completed => {
365                    let mut provenance =
366                        Provenance::new(&self.identity.delegate_id, &self.identity.model_version);
367                    provenance.verification_status = VerificationStatus::SelfVerified;
368                    #[allow(deprecated)]
369                    {
370                        provenance.verified = true;
371                    }
372                    LdpMessageBody::TaskResult {
373                        task_id: task_id.to_string(),
374                        output: record.output.clone().unwrap_or(json!(null)),
375                        provenance,
376                    }
377                }
378                TaskRecordState::Failed => LdpMessageBody::TaskFailed {
379                    task_id: task_id.to_string(),
380                    error: LdpError::runtime(
381                        "TASK_FAILED",
382                        record
383                            .error
384                            .clone()
385                            .unwrap_or_else(|| "unknown error".into()),
386                    ),
387                },
388                _ => LdpMessageBody::TaskUpdate {
389                    task_id: task_id.to_string(),
390                    progress: None,
391                    message: Some(format!("{:?}", record.state).to_lowercase()),
392                },
393            };
394
395            Ok(LdpEnvelope::new(
396                &envelope.session_id,
397                &self.identity.delegate_id,
398                &envelope.from,
399                body,
400                PayloadMode::Text,
401            ))
402        } else {
403            Err(format!("Unknown task: {}", task_id))
404        }
405    }
406
407    /// Handle TASK_CANCEL.
408    async fn handle_task_cancel(
409        &self,
410        envelope: &LdpEnvelope,
411        task_id: &str,
412    ) -> Result<LdpEnvelope, String> {
413        info!(task_id = %task_id, "Task cancelled");
414
415        let mut tasks = self.tasks.write().await;
416        tasks.remove(task_id);
417
418        Ok(LdpEnvelope::new(
419            &envelope.session_id,
420            &self.identity.delegate_id,
421            &envelope.from,
422            LdpMessageBody::TaskFailed {
423                task_id: task_id.to_string(),
424                error: LdpError::runtime("CANCELLED", "Task cancelled by client"),
425            },
426            PayloadMode::Text,
427        ))
428    }
429
430    /// Handle SESSION_CLOSE.
431    async fn handle_session_close(&self, envelope: &LdpEnvelope) -> Result<LdpEnvelope, String> {
432        info!(session_id = %envelope.session_id, "Session closed");
433
434        let mut sessions = self.sessions.write().await;
435        sessions.remove(&envelope.session_id);
436
437        Ok(LdpEnvelope::new(
438            &envelope.session_id,
439            &self.identity.delegate_id,
440            &envelope.from,
441            LdpMessageBody::SessionClose {
442                reason: Some("acknowledged".into()),
443            },
444            PayloadMode::Text,
445        ))
446    }
447
448    /// Get active session count.
449    pub async fn active_sessions(&self) -> usize {
450        self.sessions.read().await.len()
451    }
452
453    /// Get completed task count.
454    pub async fn completed_tasks(&self) -> usize {
455        self.tasks
456            .read()
457            .await
458            .values()
459            .filter(|t| t.state == TaskRecordState::Completed)
460            .count()
461    }
462}