1use crate::types::capability::LdpCapability;
11use crate::types::error::LdpError;
12use crate::types::identity::LdpIdentityCard;
13use crate::types::messages::{LdpEnvelope, LdpMessageBody};
14use crate::types::payload::{negotiate_payload_mode, PayloadMode};
15use crate::types::provenance::Provenance;
16use crate::types::session::{LdpSession, SessionState};
17use crate::types::trust::TrustDomain;
18
19use chrono::Utc;
20use serde_json::{json, Value};
21use std::collections::HashMap;
22use std::sync::Arc;
23use tokio::sync::RwLock;
24use tracing::{debug, info};
25
26pub type TaskHandler = Arc<dyn Fn(&str, &Value) -> Value + Send + Sync>;
31
32pub struct LdpServer {
34 identity: LdpIdentityCard,
36 sessions: Arc<RwLock<HashMap<String, LdpSession>>>,
38 tasks: Arc<RwLock<HashMap<String, TaskRecord>>>,
40 handler: TaskHandler,
42 signing_secret: Option<String>,
44}
45
46#[derive(Debug, Clone)]
48#[allow(dead_code)]
49struct TaskRecord {
50 task_id: String,
51 skill: String,
52 state: TaskRecordState,
53 output: Option<Value>,
54 error: Option<String>,
55}
56
/// Lifecycle state of a [`TaskRecord`].
///
/// The enum is fieldless, so it derives `Copy` and `Eq` in addition to
/// `Clone`/`PartialEq`: states can be passed by value and compared without
/// borrowing or cloning.
// `dead_code` is allowed because not every variant is constructed by the code
// visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
enum TaskRecordState {
    /// The task has been received but work has not started.
    Submitted,
    /// The task is in progress.
    Working,
    /// The task finished and produced an output.
    Completed,
    /// The task finished with an error.
    Failed,
}
65
66impl LdpServer {
67 pub fn new(identity: LdpIdentityCard, handler: TaskHandler) -> Self {
69 Self {
70 identity,
71 sessions: Arc::new(RwLock::new(HashMap::new())),
72 tasks: Arc::new(RwLock::new(HashMap::new())),
73 handler,
74 signing_secret: None,
75 }
76 }
77
78 pub fn with_signing_secret(mut self, secret: impl Into<String>) -> Self {
80 self.signing_secret = Some(secret.into());
81 self
82 }
83
84 pub fn echo_server(delegate_id: &str, name: &str) -> Self {
86 let identity = LdpIdentityCard {
87 delegate_id: delegate_id.to_string(),
88 name: name.to_string(),
89 description: Some("Echo test server".into()),
90 model_family: "TestModel".into(),
91 model_version: "1.0".into(),
92 weights_fingerprint: None,
93 trust_domain: TrustDomain::new("test-domain"),
94 context_window: 4096,
95 reasoning_profile: Some("analytical".into()),
96 cost_profile: Some("low".into()),
97 latency_profile: Some("p50:100ms".into()),
98 jurisdiction: None,
99 capabilities: vec![LdpCapability {
100 name: "echo".into(),
101 description: Some("Echoes input back".into()),
102 input_schema: None,
103 output_schema: None,
104 quality: None,
105 domains: vec![],
106 }],
107 supported_payload_modes: vec![PayloadMode::SemanticFrame, PayloadMode::Text],
108 endpoint: String::new(),
109 metadata: HashMap::new(),
110 };
111
112 let handler: TaskHandler = Arc::new(|_skill, input| {
113 json!({ "echo": input })
114 });
115
116 Self::new(identity, handler)
117 }
118
119 pub fn identity(&self) -> &LdpIdentityCard {
121 &self.identity
122 }
123
124 pub fn handle_identity_request(&self) -> Value {
126 serde_json::to_value(&self.identity).unwrap_or_default()
127 }
128
129 pub fn handle_capabilities_request(&self) -> Value {
131 json!({
132 "capabilities": self.identity.capabilities,
133 "supported_modes": self.identity.supported_payload_modes,
134 })
135 }
136
137 pub async fn handle_message(&self, envelope: LdpEnvelope) -> Result<LdpEnvelope, String> {
141 if let Some(ref secret) = self.signing_secret {
143 if let Some(ref sig) = envelope.signature {
144 if !crate::signing::verify_envelope(&envelope, secret, sig) {
145 return Err("Invalid message signature".to_string());
146 }
147 } else if !matches!(envelope.body, LdpMessageBody::Hello { .. }) {
148 return Err("Message signature required but not provided".to_string());
150 }
151 }
152
153 let mut response = match &envelope.body {
154 LdpMessageBody::Hello { delegate_id, supported_modes } => {
155 self.handle_hello(&envelope, delegate_id, supported_modes).await
156 }
157 LdpMessageBody::SessionPropose { config } => {
158 self.handle_session_propose(&envelope, config).await
159 }
160 LdpMessageBody::TaskSubmit { task_id, skill, input } => {
161 self.handle_task_submit(&envelope, task_id, skill, input).await
162 }
163 LdpMessageBody::TaskUpdate { task_id, .. } => {
164 self.handle_task_status_query(&envelope, task_id).await
165 }
166 LdpMessageBody::TaskCancel { task_id } => {
167 self.handle_task_cancel(&envelope, task_id).await
168 }
169 LdpMessageBody::SessionClose { .. } => {
170 self.handle_session_close(&envelope).await
171 }
172 _ => Err(format!("Unhandled message type")),
173 }?;
174
175 if let Some(ref secret) = self.signing_secret {
177 crate::signing::apply_signature(&mut response, secret);
178 }
179
180 Ok(response)
181 }
182
183 async fn handle_hello(
185 &self,
186 envelope: &LdpEnvelope,
187 _delegate_id: &str,
188 _supported_modes: &[PayloadMode],
189 ) -> Result<LdpEnvelope, String> {
190 info!(from = %envelope.from, "Received HELLO");
191
192 Ok(LdpEnvelope::new(
193 &envelope.session_id,
194 &self.identity.delegate_id,
195 &envelope.from,
196 LdpMessageBody::CapabilityManifest {
197 capabilities: json!({
198 "capabilities": self.identity.capabilities,
199 "supported_modes": self.identity.supported_payload_modes,
200 }),
201 },
202 PayloadMode::Text,
203 ))
204 }
205
206 async fn handle_session_propose(
208 &self,
209 envelope: &LdpEnvelope,
210 config: &Value,
211 ) -> Result<LdpEnvelope, String> {
212 let session_id = envelope.session_id.clone();
213 info!(session_id = %session_id, from = %envelope.from, "Session proposed");
214
215 let remote_domain = config
217 .get("trust_domain")
218 .and_then(|v| v.as_str())
219 .unwrap_or("unknown");
220
221 if !self.identity.trust_domain.trusts(remote_domain) {
222 let reason = format!(
223 "Trust domain '{}' not trusted by '{}'",
224 remote_domain, self.identity.trust_domain.name
225 );
226 return Ok(LdpEnvelope::new(
227 &session_id,
228 &self.identity.delegate_id,
229 &envelope.from,
230 LdpMessageBody::SessionReject {
231 reason: reason.clone(),
232 error: Some(LdpError::policy("TRUST_VIOLATION", reason)),
233 },
234 PayloadMode::Text,
235 ));
236 }
237
238 let requested_mode = config
240 .get("payload_mode")
241 .and_then(|v| serde_json::from_value::<PayloadMode>(v.clone()).ok())
242 .unwrap_or(PayloadMode::SemanticFrame);
243
244 let negotiated = negotiate_payload_mode(
246 &[requested_mode, PayloadMode::Text],
247 &self.identity.supported_payload_modes,
248 );
249
250 let now = Utc::now();
252 let ttl = config
253 .get("ttl_secs")
254 .and_then(|v| v.as_u64())
255 .unwrap_or(3600);
256
257 let session = LdpSession {
258 session_id: session_id.clone(),
259 remote_url: String::new(),
260 remote_delegate_id: envelope.from.clone(),
261 state: SessionState::Active,
262 payload: negotiated.clone(),
263 trust_domain: self.identity.trust_domain.clone(),
264 created_at: now,
265 last_used: now,
266 ttl_secs: ttl,
267 task_count: 0,
268 };
269
270 {
271 let mut sessions = self.sessions.write().await;
272 sessions.insert(session_id.clone(), session);
273 }
274
275 let response = LdpEnvelope::new(
276 &session_id,
277 &self.identity.delegate_id,
278 &envelope.from,
279 LdpMessageBody::SessionAccept {
280 session_id: session_id.clone(),
281 negotiated_mode: negotiated.mode,
282 },
283 PayloadMode::Text,
284 );
285 Ok(response)
286 }
287
288 async fn handle_task_submit(
290 &self,
291 envelope: &LdpEnvelope,
292 task_id: &str,
293 skill: &str,
294 input: &Value,
295 ) -> Result<LdpEnvelope, String> {
296 debug!(task_id = %task_id, skill = %skill, "Task submitted");
297
298 let output = (self.handler)(skill, input);
300
301 {
303 let mut tasks = self.tasks.write().await;
304 tasks.insert(
305 task_id.to_string(),
306 TaskRecord {
307 task_id: task_id.to_string(),
308 skill: skill.to_string(),
309 state: TaskRecordState::Completed,
310 output: Some(output.clone()),
311 error: None,
312 },
313 );
314 }
315
316 let provenance = Provenance::new(
318 &self.identity.delegate_id,
319 &self.identity.model_version,
320 );
321
322 let mode = {
324 let sessions = self.sessions.read().await;
325 sessions
326 .get(&envelope.session_id)
327 .map(|s| s.payload.mode)
328 .unwrap_or(PayloadMode::Text)
329 };
330
331 Ok(LdpEnvelope::new(
332 &envelope.session_id,
333 &self.identity.delegate_id,
334 &envelope.from,
335 LdpMessageBody::TaskResult {
336 task_id: task_id.to_string(),
337 output,
338 provenance,
339 },
340 mode,
341 ))
342 }
343
344 async fn handle_task_status_query(
346 &self,
347 envelope: &LdpEnvelope,
348 task_id: &str,
349 ) -> Result<LdpEnvelope, String> {
350 let tasks = self.tasks.read().await;
351
352 if let Some(record) = tasks.get(task_id) {
353 let body = match record.state {
354 TaskRecordState::Completed => LdpMessageBody::TaskResult {
355 task_id: task_id.to_string(),
356 output: record.output.clone().unwrap_or(json!(null)),
357 provenance: Provenance::new(
358 &self.identity.delegate_id,
359 &self.identity.model_version,
360 ),
361 },
362 TaskRecordState::Failed => LdpMessageBody::TaskFailed {
363 task_id: task_id.to_string(),
364 error: LdpError::runtime(
365 "TASK_FAILED",
366 record.error.clone().unwrap_or_else(|| "unknown error".into()),
367 ),
368 },
369 _ => LdpMessageBody::TaskUpdate {
370 task_id: task_id.to_string(),
371 progress: None,
372 message: Some(format!("{:?}", record.state).to_lowercase()),
373 },
374 };
375
376 Ok(LdpEnvelope::new(
377 &envelope.session_id,
378 &self.identity.delegate_id,
379 &envelope.from,
380 body,
381 PayloadMode::Text,
382 ))
383 } else {
384 Err(format!("Unknown task: {}", task_id))
385 }
386 }
387
388 async fn handle_task_cancel(
390 &self,
391 envelope: &LdpEnvelope,
392 task_id: &str,
393 ) -> Result<LdpEnvelope, String> {
394 info!(task_id = %task_id, "Task cancelled");
395
396 let mut tasks = self.tasks.write().await;
397 tasks.remove(task_id);
398
399 Ok(LdpEnvelope::new(
400 &envelope.session_id,
401 &self.identity.delegate_id,
402 &envelope.from,
403 LdpMessageBody::TaskFailed {
404 task_id: task_id.to_string(),
405 error: LdpError::runtime("CANCELLED", "Task cancelled by client"),
406 },
407 PayloadMode::Text,
408 ))
409 }
410
411 async fn handle_session_close(
413 &self,
414 envelope: &LdpEnvelope,
415 ) -> Result<LdpEnvelope, String> {
416 info!(session_id = %envelope.session_id, "Session closed");
417
418 let mut sessions = self.sessions.write().await;
419 sessions.remove(&envelope.session_id);
420
421 Ok(LdpEnvelope::new(
422 &envelope.session_id,
423 &self.identity.delegate_id,
424 &envelope.from,
425 LdpMessageBody::SessionClose {
426 reason: Some("acknowledged".into()),
427 },
428 PayloadMode::Text,
429 ))
430 }
431
432 pub async fn active_sessions(&self) -> usize {
434 self.sessions.read().await.len()
435 }
436
437 pub async fn completed_tasks(&self) -> usize {
439 self.tasks
440 .read()
441 .await
442 .values()
443 .filter(|t| t.state == TaskRecordState::Completed)
444 .count()
445 }
446}