Skip to main content

ldp_protocol/
server.rs

1//! LDP server — receives LDP messages and serves identity/capabilities.
2//!
3//! A minimal LDP-compliant server that:
4//! - Serves identity cards at `GET /ldp/identity`
5//! - Serves capabilities at `GET /ldp/capabilities`
6//! - Handles protocol messages at `POST /ldp/messages`
7//! - Manages session lifecycle (accept/reject)
8//! - Dispatches tasks to a pluggable handler
9
10use crate::types::capability::LdpCapability;
11use crate::types::error::LdpError;
12use crate::types::identity::LdpIdentityCard;
13use crate::types::messages::{LdpEnvelope, LdpMessageBody};
14use crate::types::payload::{negotiate_payload_mode, PayloadMode};
15use crate::types::provenance::Provenance;
16use crate::types::session::{LdpSession, SessionState};
17use crate::types::trust::TrustDomain;
18
19use chrono::Utc;
20use serde_json::{json, Value};
21use std::collections::HashMap;
22use std::sync::Arc;
23use tokio::sync::RwLock;
24use tracing::{debug, info};
25
26/// Task handler function type.
27///
28/// Given a skill name and input, returns the output value.
29/// Used to plug in actual task execution logic.
30pub type TaskHandler = Arc<dyn Fn(&str, &Value) -> Value + Send + Sync>;
31
32/// A minimal LDP server for testing and research.
33pub struct LdpServer {
34    /// This server's identity card.
35    identity: LdpIdentityCard,
36    /// Active sessions.
37    sessions: Arc<RwLock<HashMap<String, LdpSession>>>,
38    /// Pending/completed tasks: task_id → (state, output).
39    tasks: Arc<RwLock<HashMap<String, TaskRecord>>>,
40    /// Pluggable task handler.
41    handler: TaskHandler,
42    /// Shared secret for HMAC message signing. If None, signing is disabled.
43    signing_secret: Option<String>,
44}
45
46/// Internal task tracking record.
47#[derive(Debug, Clone)]
48#[allow(dead_code)]
49struct TaskRecord {
50    task_id: String,
51    skill: String,
52    state: TaskRecordState,
53    output: Option<Value>,
54    error: Option<String>,
55}
56
57#[derive(Debug, Clone, PartialEq)]
58#[allow(dead_code)]
59enum TaskRecordState {
60    Submitted,
61    Working,
62    Completed,
63    Failed,
64}
65
66impl LdpServer {
67    /// Create a new LDP server with the given identity and task handler.
68    pub fn new(identity: LdpIdentityCard, handler: TaskHandler) -> Self {
69        Self {
70            identity,
71            sessions: Arc::new(RwLock::new(HashMap::new())),
72            tasks: Arc::new(RwLock::new(HashMap::new())),
73            handler,
74            signing_secret: None,
75        }
76    }
77
78    /// Set a signing secret for HMAC message signing (builder pattern).
79    pub fn with_signing_secret(mut self, secret: impl Into<String>) -> Self {
80        self.signing_secret = Some(secret.into());
81        self
82    }
83
84    /// Create a test server with an echo handler (returns input as output).
85    pub fn echo_server(delegate_id: &str, name: &str) -> Self {
86        let identity = LdpIdentityCard {
87            delegate_id: delegate_id.to_string(),
88            name: name.to_string(),
89            description: Some("Echo test server".into()),
90            model_family: "TestModel".into(),
91            model_version: "1.0".into(),
92            weights_fingerprint: None,
93            trust_domain: TrustDomain::new("test-domain"),
94            context_window: 4096,
95            reasoning_profile: Some("analytical".into()),
96            cost_profile: Some("low".into()),
97            latency_profile: Some("p50:100ms".into()),
98            jurisdiction: None,
99            capabilities: vec![LdpCapability {
100                name: "echo".into(),
101                description: Some("Echoes input back".into()),
102                input_schema: None,
103                output_schema: None,
104                quality: None,
105                domains: vec![],
106            }],
107            supported_payload_modes: vec![PayloadMode::SemanticFrame, PayloadMode::Text],
108            endpoint: String::new(),
109            metadata: HashMap::new(),
110        };
111
112        let handler: TaskHandler = Arc::new(|_skill, input| {
113            json!({ "echo": input })
114        });
115
116        Self::new(identity, handler)
117    }
118
119    /// Get the identity card.
120    pub fn identity(&self) -> &LdpIdentityCard {
121        &self.identity
122    }
123
124    /// Handle a GET /ldp/identity request.
125    pub fn handle_identity_request(&self) -> Value {
126        serde_json::to_value(&self.identity).unwrap_or_default()
127    }
128
129    /// Handle a GET /ldp/capabilities request.
130    pub fn handle_capabilities_request(&self) -> Value {
131        json!({
132            "capabilities": self.identity.capabilities,
133            "supported_modes": self.identity.supported_payload_modes,
134        })
135    }
136
137    /// Handle a POST /ldp/messages request.
138    ///
139    /// Processes the incoming LDP envelope and returns a response envelope.
140    pub async fn handle_message(&self, envelope: LdpEnvelope) -> Result<LdpEnvelope, String> {
141        // Verify signature if signing is configured
142        if let Some(ref secret) = self.signing_secret {
143            if let Some(ref sig) = envelope.signature {
144                if !crate::signing::verify_envelope(&envelope, secret, sig) {
145                    return Err("Invalid message signature".to_string());
146                }
147            } else if !matches!(envelope.body, LdpMessageBody::Hello { .. }) {
148                // Allow unsigned HELLO (first contact), require signatures after
149                return Err("Message signature required but not provided".to_string());
150            }
151        }
152
153        let mut response = match &envelope.body {
154            LdpMessageBody::Hello { delegate_id, supported_modes } => {
155                self.handle_hello(&envelope, delegate_id, supported_modes).await
156            }
157            LdpMessageBody::SessionPropose { config } => {
158                self.handle_session_propose(&envelope, config).await
159            }
160            LdpMessageBody::TaskSubmit { task_id, skill, input } => {
161                self.handle_task_submit(&envelope, task_id, skill, input).await
162            }
163            LdpMessageBody::TaskUpdate { task_id, .. } => {
164                self.handle_task_status_query(&envelope, task_id).await
165            }
166            LdpMessageBody::TaskCancel { task_id } => {
167                self.handle_task_cancel(&envelope, task_id).await
168            }
169            LdpMessageBody::SessionClose { .. } => {
170                self.handle_session_close(&envelope).await
171            }
172            _ => Err(format!("Unhandled message type")),
173        }?;
174
175        // Sign outgoing response
176        if let Some(ref secret) = self.signing_secret {
177            crate::signing::apply_signature(&mut response, secret);
178        }
179
180        Ok(response)
181    }
182
183    /// Handle HELLO — respond with CAPABILITY_MANIFEST.
184    async fn handle_hello(
185        &self,
186        envelope: &LdpEnvelope,
187        _delegate_id: &str,
188        _supported_modes: &[PayloadMode],
189    ) -> Result<LdpEnvelope, String> {
190        info!(from = %envelope.from, "Received HELLO");
191
192        Ok(LdpEnvelope::new(
193            &envelope.session_id,
194            &self.identity.delegate_id,
195            &envelope.from,
196            LdpMessageBody::CapabilityManifest {
197                capabilities: json!({
198                    "capabilities": self.identity.capabilities,
199                    "supported_modes": self.identity.supported_payload_modes,
200                }),
201            },
202            PayloadMode::Text,
203        ))
204    }
205
206    /// Handle SESSION_PROPOSE — accept the session.
207    async fn handle_session_propose(
208        &self,
209        envelope: &LdpEnvelope,
210        config: &Value,
211    ) -> Result<LdpEnvelope, String> {
212        let session_id = envelope.session_id.clone();
213        info!(session_id = %session_id, from = %envelope.from, "Session proposed");
214
215        // Validate trust domain
216        let remote_domain = config
217            .get("trust_domain")
218            .and_then(|v| v.as_str())
219            .unwrap_or("unknown");
220
221        if !self.identity.trust_domain.trusts(remote_domain) {
222            let reason = format!(
223                "Trust domain '{}' not trusted by '{}'",
224                remote_domain, self.identity.trust_domain.name
225            );
226            return Ok(LdpEnvelope::new(
227                &session_id,
228                &self.identity.delegate_id,
229                &envelope.from,
230                LdpMessageBody::SessionReject {
231                    reason: reason.clone(),
232                    error: Some(LdpError::policy("TRUST_VIOLATION", reason)),
233                },
234                PayloadMode::Text,
235            ));
236        }
237
238        // Extract requested payload mode (default to SemanticFrame).
239        let requested_mode = config
240            .get("payload_mode")
241            .and_then(|v| serde_json::from_value::<PayloadMode>(v.clone()).ok())
242            .unwrap_or(PayloadMode::SemanticFrame);
243
244        // Negotiate payload mode.
245        let negotiated = negotiate_payload_mode(
246            &[requested_mode, PayloadMode::Text],
247            &self.identity.supported_payload_modes,
248        );
249
250        // Create session.
251        let now = Utc::now();
252        let ttl = config
253            .get("ttl_secs")
254            .and_then(|v| v.as_u64())
255            .unwrap_or(3600);
256
257        let session = LdpSession {
258            session_id: session_id.clone(),
259            remote_url: String::new(),
260            remote_delegate_id: envelope.from.clone(),
261            state: SessionState::Active,
262            payload: negotiated.clone(),
263            trust_domain: self.identity.trust_domain.clone(),
264            created_at: now,
265            last_used: now,
266            ttl_secs: ttl,
267            task_count: 0,
268        };
269
270        {
271            let mut sessions = self.sessions.write().await;
272            sessions.insert(session_id.clone(), session);
273        }
274
275        let response = LdpEnvelope::new(
276            &session_id,
277            &self.identity.delegate_id,
278            &envelope.from,
279            LdpMessageBody::SessionAccept {
280                session_id: session_id.clone(),
281                negotiated_mode: negotiated.mode,
282            },
283            PayloadMode::Text,
284        );
285        Ok(response)
286    }
287
288    /// Handle TASK_SUBMIT — execute the task immediately and return result.
289    async fn handle_task_submit(
290        &self,
291        envelope: &LdpEnvelope,
292        task_id: &str,
293        skill: &str,
294        input: &Value,
295    ) -> Result<LdpEnvelope, String> {
296        debug!(task_id = %task_id, skill = %skill, "Task submitted");
297
298        // Execute the task using the handler.
299        let output = (self.handler)(skill, input);
300
301        // Store the task record.
302        {
303            let mut tasks = self.tasks.write().await;
304            tasks.insert(
305                task_id.to_string(),
306                TaskRecord {
307                    task_id: task_id.to_string(),
308                    skill: skill.to_string(),
309                    state: TaskRecordState::Completed,
310                    output: Some(output.clone()),
311                    error: None,
312                },
313            );
314        }
315
316        // Build provenance.
317        let provenance = Provenance::new(
318            &self.identity.delegate_id,
319            &self.identity.model_version,
320        );
321
322        // Determine payload mode from session.
323        let mode = {
324            let sessions = self.sessions.read().await;
325            sessions
326                .get(&envelope.session_id)
327                .map(|s| s.payload.mode)
328                .unwrap_or(PayloadMode::Text)
329        };
330
331        Ok(LdpEnvelope::new(
332            &envelope.session_id,
333            &self.identity.delegate_id,
334            &envelope.from,
335            LdpMessageBody::TaskResult {
336                task_id: task_id.to_string(),
337                output,
338                provenance,
339            },
340            mode,
341        ))
342    }
343
344    /// Handle task status query — return current task state.
345    async fn handle_task_status_query(
346        &self,
347        envelope: &LdpEnvelope,
348        task_id: &str,
349    ) -> Result<LdpEnvelope, String> {
350        let tasks = self.tasks.read().await;
351
352        if let Some(record) = tasks.get(task_id) {
353            let body = match record.state {
354                TaskRecordState::Completed => LdpMessageBody::TaskResult {
355                    task_id: task_id.to_string(),
356                    output: record.output.clone().unwrap_or(json!(null)),
357                    provenance: Provenance::new(
358                        &self.identity.delegate_id,
359                        &self.identity.model_version,
360                    ),
361                },
362                TaskRecordState::Failed => LdpMessageBody::TaskFailed {
363                    task_id: task_id.to_string(),
364                    error: LdpError::runtime(
365                        "TASK_FAILED",
366                        record.error.clone().unwrap_or_else(|| "unknown error".into()),
367                    ),
368                },
369                _ => LdpMessageBody::TaskUpdate {
370                    task_id: task_id.to_string(),
371                    progress: None,
372                    message: Some(format!("{:?}", record.state).to_lowercase()),
373                },
374            };
375
376            Ok(LdpEnvelope::new(
377                &envelope.session_id,
378                &self.identity.delegate_id,
379                &envelope.from,
380                body,
381                PayloadMode::Text,
382            ))
383        } else {
384            Err(format!("Unknown task: {}", task_id))
385        }
386    }
387
388    /// Handle TASK_CANCEL.
389    async fn handle_task_cancel(
390        &self,
391        envelope: &LdpEnvelope,
392        task_id: &str,
393    ) -> Result<LdpEnvelope, String> {
394        info!(task_id = %task_id, "Task cancelled");
395
396        let mut tasks = self.tasks.write().await;
397        tasks.remove(task_id);
398
399        Ok(LdpEnvelope::new(
400            &envelope.session_id,
401            &self.identity.delegate_id,
402            &envelope.from,
403            LdpMessageBody::TaskFailed {
404                task_id: task_id.to_string(),
405                error: LdpError::runtime("CANCELLED", "Task cancelled by client"),
406            },
407            PayloadMode::Text,
408        ))
409    }
410
411    /// Handle SESSION_CLOSE.
412    async fn handle_session_close(
413        &self,
414        envelope: &LdpEnvelope,
415    ) -> Result<LdpEnvelope, String> {
416        info!(session_id = %envelope.session_id, "Session closed");
417
418        let mut sessions = self.sessions.write().await;
419        sessions.remove(&envelope.session_id);
420
421        Ok(LdpEnvelope::new(
422            &envelope.session_id,
423            &self.identity.delegate_id,
424            &envelope.from,
425            LdpMessageBody::SessionClose {
426                reason: Some("acknowledged".into()),
427            },
428            PayloadMode::Text,
429        ))
430    }
431
432    /// Get active session count.
433    pub async fn active_sessions(&self) -> usize {
434        self.sessions.read().await.len()
435    }
436
437    /// Get completed task count.
438    pub async fn completed_tasks(&self) -> usize {
439        self.tasks
440            .read()
441            .await
442            .values()
443            .filter(|t| t.state == TaskRecordState::Completed)
444            .count()
445    }
446}