1use crate::types::capability::LdpCapability;
11use crate::types::error::LdpError;
12use crate::types::identity::LdpIdentityCard;
13use crate::types::messages::{LdpEnvelope, LdpMessageBody};
14use crate::types::payload::{negotiate_payload_mode, PayloadMode};
15use crate::types::provenance::Provenance;
16use crate::types::session::{LdpSession, SessionState};
17use crate::types::trust::TrustDomain;
18use crate::types::verification::VerificationStatus;
19
20use chrono::Utc;
21use serde_json::{json, Value};
22use std::collections::HashMap;
23use std::sync::Arc;
24use tokio::sync::RwLock;
25use tracing::{debug, info};
26
27pub type TaskHandler = Arc<dyn Fn(&str, &Value) -> Value + Send + Sync>;
32
33pub struct LdpServer {
35 identity: LdpIdentityCard,
37 sessions: Arc<RwLock<HashMap<String, LdpSession>>>,
39 tasks: Arc<RwLock<HashMap<String, TaskRecord>>>,
41 handler: TaskHandler,
43 signing_secret: Option<String>,
45}
46
47#[derive(Debug, Clone)]
49#[allow(dead_code)]
50struct TaskRecord {
51 task_id: String,
52 skill: String,
53 state: TaskRecordState,
54 output: Option<Value>,
55 error: Option<String>,
56}
57
/// Lifecycle state of a stored task record.
///
/// The enum is fieldless, so it is `Copy` and has a total equality (`Eq`); callers can
/// compare and pass states by value without cloning.
// `Submitted` and `Working` are not constructed in the code visible in this file,
// hence the `dead_code` allowance.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
enum TaskRecordState {
    /// Task has been accepted but not started.
    Submitted,
    /// Task is in progress.
    Working,
    /// Task finished and produced an output.
    Completed,
    /// Task finished with an error.
    Failed,
}
66
67impl LdpServer {
68 pub fn new(identity: LdpIdentityCard, handler: TaskHandler) -> Self {
70 Self {
71 identity,
72 sessions: Arc::new(RwLock::new(HashMap::new())),
73 tasks: Arc::new(RwLock::new(HashMap::new())),
74 handler,
75 signing_secret: None,
76 }
77 }
78
79 pub fn with_signing_secret(mut self, secret: impl Into<String>) -> Self {
81 self.signing_secret = Some(secret.into());
82 self
83 }
84
85 pub fn echo_server(delegate_id: &str, name: &str) -> Self {
87 let identity = LdpIdentityCard {
88 delegate_id: delegate_id.to_string(),
89 name: name.to_string(),
90 description: Some("Echo test server".into()),
91 model_family: "TestModel".into(),
92 model_version: "1.0".into(),
93 weights_fingerprint: None,
94 trust_domain: TrustDomain::new("test-domain"),
95 context_window: 4096,
96 reasoning_profile: Some("analytical".into()),
97 cost_profile: Some("low".into()),
98 latency_profile: Some("p50:100ms".into()),
99 jurisdiction: None,
100 capabilities: vec![LdpCapability {
101 name: "echo".into(),
102 description: Some("Echoes input back".into()),
103 input_schema: None,
104 output_schema: None,
105 quality: None,
106 domains: vec![],
107 }],
108 supported_payload_modes: vec![PayloadMode::SemanticFrame, PayloadMode::Text],
109 endpoint: String::new(),
110 metadata: HashMap::new(),
111 };
112
113 let handler: TaskHandler = Arc::new(|_skill, input| json!({ "echo": input }));
114
115 Self::new(identity, handler)
116 }
117
118 pub fn identity(&self) -> &LdpIdentityCard {
120 &self.identity
121 }
122
123 pub fn handle_identity_request(&self) -> Value {
125 serde_json::to_value(&self.identity).unwrap_or_default()
126 }
127
128 pub fn handle_capabilities_request(&self) -> Value {
130 json!({
131 "capabilities": self.identity.capabilities,
132 "supported_modes": self.identity.supported_payload_modes,
133 })
134 }
135
136 pub async fn handle_message(&self, envelope: LdpEnvelope) -> Result<LdpEnvelope, String> {
140 if let Some(ref secret) = self.signing_secret {
142 if let Some(ref sig) = envelope.signature {
143 if !crate::signing::verify_envelope(&envelope, secret, sig) {
144 return Err("Invalid message signature".to_string());
145 }
146 } else if !matches!(envelope.body, LdpMessageBody::Hello { .. }) {
147 return Err("Message signature required but not provided".to_string());
149 }
150 }
151
152 let mut response = match &envelope.body {
153 LdpMessageBody::Hello {
154 delegate_id,
155 supported_modes,
156 } => {
157 self.handle_hello(&envelope, delegate_id, supported_modes)
158 .await
159 }
160 LdpMessageBody::SessionPropose { config } => {
161 self.handle_session_propose(&envelope, config).await
162 }
163 LdpMessageBody::TaskSubmit {
164 task_id,
165 skill,
166 input,
167 ..
168 } => {
169 self.handle_task_submit(&envelope, task_id, skill, input)
170 .await
171 }
172 LdpMessageBody::TaskUpdate { task_id, .. } => {
173 self.handle_task_status_query(&envelope, task_id).await
174 }
175 LdpMessageBody::TaskCancel { task_id } => {
176 self.handle_task_cancel(&envelope, task_id).await
177 }
178 LdpMessageBody::SessionClose { .. } => self.handle_session_close(&envelope).await,
179 _ => Err("Unhandled message type".to_string()),
180 }?;
181
182 if let Some(ref secret) = self.signing_secret {
184 crate::signing::apply_signature(&mut response, secret);
185 }
186
187 Ok(response)
188 }
189
190 async fn handle_hello(
192 &self,
193 envelope: &LdpEnvelope,
194 _delegate_id: &str,
195 _supported_modes: &[PayloadMode],
196 ) -> Result<LdpEnvelope, String> {
197 info!(from = %envelope.from, "Received HELLO");
198
199 Ok(LdpEnvelope::new(
200 &envelope.session_id,
201 &self.identity.delegate_id,
202 &envelope.from,
203 LdpMessageBody::CapabilityManifest {
204 capabilities: json!({
205 "capabilities": self.identity.capabilities,
206 "supported_modes": self.identity.supported_payload_modes,
207 }),
208 },
209 PayloadMode::Text,
210 ))
211 }
212
213 async fn handle_session_propose(
215 &self,
216 envelope: &LdpEnvelope,
217 config: &Value,
218 ) -> Result<LdpEnvelope, String> {
219 let session_id = envelope.session_id.clone();
220 info!(session_id = %session_id, from = %envelope.from, "Session proposed");
221
222 let remote_domain = config
224 .get("trust_domain")
225 .and_then(|v| v.as_str())
226 .unwrap_or("unknown");
227
228 if !self.identity.trust_domain.trusts(remote_domain) {
229 let reason = format!(
230 "Trust domain '{}' not trusted by '{}'",
231 remote_domain, self.identity.trust_domain.name
232 );
233 return Ok(LdpEnvelope::new(
234 &session_id,
235 &self.identity.delegate_id,
236 &envelope.from,
237 LdpMessageBody::SessionReject {
238 reason: reason.clone(),
239 error: Some(LdpError::policy("TRUST_VIOLATION", reason)),
240 },
241 PayloadMode::Text,
242 ));
243 }
244
245 let requested_mode = config
247 .get("payload_mode")
248 .and_then(|v| serde_json::from_value::<PayloadMode>(v.clone()).ok())
249 .unwrap_or(PayloadMode::SemanticFrame);
250
251 let negotiated = negotiate_payload_mode(
253 &[requested_mode, PayloadMode::Text],
254 &self.identity.supported_payload_modes,
255 );
256
257 let now = Utc::now();
259 let ttl = config
260 .get("ttl_secs")
261 .and_then(|v| v.as_u64())
262 .unwrap_or(3600);
263
264 let session = LdpSession {
265 session_id: session_id.clone(),
266 remote_url: String::new(),
267 remote_delegate_id: envelope.from.clone(),
268 state: SessionState::Active,
269 payload: negotiated.clone(),
270 trust_domain: self.identity.trust_domain.clone(),
271 created_at: now,
272 last_used: now,
273 ttl_secs: ttl,
274 task_count: 0,
275 };
276
277 {
278 let mut sessions = self.sessions.write().await;
279 sessions.insert(session_id.clone(), session);
280 }
281
282 let response = LdpEnvelope::new(
283 &session_id,
284 &self.identity.delegate_id,
285 &envelope.from,
286 LdpMessageBody::SessionAccept {
287 session_id: session_id.clone(),
288 negotiated_mode: negotiated.mode,
289 },
290 PayloadMode::Text,
291 );
292 Ok(response)
293 }
294
295 async fn handle_task_submit(
297 &self,
298 envelope: &LdpEnvelope,
299 task_id: &str,
300 skill: &str,
301 input: &Value,
302 ) -> Result<LdpEnvelope, String> {
303 debug!(task_id = %task_id, skill = %skill, "Task submitted");
304
305 let output = (self.handler)(skill, input);
307
308 {
310 let mut tasks = self.tasks.write().await;
311 tasks.insert(
312 task_id.to_string(),
313 TaskRecord {
314 task_id: task_id.to_string(),
315 skill: skill.to_string(),
316 state: TaskRecordState::Completed,
317 output: Some(output.clone()),
318 error: None,
319 },
320 );
321 }
322
323 let mut provenance =
325 Provenance::new(&self.identity.delegate_id, &self.identity.model_version);
326 provenance.verification_status = VerificationStatus::SelfVerified;
327 #[allow(deprecated)]
328 {
329 provenance.verified = true;
330 }
331
332 let mode = {
334 let sessions = self.sessions.read().await;
335 sessions
336 .get(&envelope.session_id)
337 .map(|s| s.payload.mode)
338 .unwrap_or(PayloadMode::Text)
339 };
340
341 Ok(LdpEnvelope::new(
342 &envelope.session_id,
343 &self.identity.delegate_id,
344 &envelope.from,
345 LdpMessageBody::TaskResult {
346 task_id: task_id.to_string(),
347 output,
348 provenance,
349 },
350 mode,
351 ))
352 }
353
354 async fn handle_task_status_query(
356 &self,
357 envelope: &LdpEnvelope,
358 task_id: &str,
359 ) -> Result<LdpEnvelope, String> {
360 let tasks = self.tasks.read().await;
361
362 if let Some(record) = tasks.get(task_id) {
363 let body = match record.state {
364 TaskRecordState::Completed => {
365 let mut provenance =
366 Provenance::new(&self.identity.delegate_id, &self.identity.model_version);
367 provenance.verification_status = VerificationStatus::SelfVerified;
368 #[allow(deprecated)]
369 {
370 provenance.verified = true;
371 }
372 LdpMessageBody::TaskResult {
373 task_id: task_id.to_string(),
374 output: record.output.clone().unwrap_or(json!(null)),
375 provenance,
376 }
377 }
378 TaskRecordState::Failed => LdpMessageBody::TaskFailed {
379 task_id: task_id.to_string(),
380 error: LdpError::runtime(
381 "TASK_FAILED",
382 record
383 .error
384 .clone()
385 .unwrap_or_else(|| "unknown error".into()),
386 ),
387 },
388 _ => LdpMessageBody::TaskUpdate {
389 task_id: task_id.to_string(),
390 progress: None,
391 message: Some(format!("{:?}", record.state).to_lowercase()),
392 },
393 };
394
395 Ok(LdpEnvelope::new(
396 &envelope.session_id,
397 &self.identity.delegate_id,
398 &envelope.from,
399 body,
400 PayloadMode::Text,
401 ))
402 } else {
403 Err(format!("Unknown task: {}", task_id))
404 }
405 }
406
407 async fn handle_task_cancel(
409 &self,
410 envelope: &LdpEnvelope,
411 task_id: &str,
412 ) -> Result<LdpEnvelope, String> {
413 info!(task_id = %task_id, "Task cancelled");
414
415 let mut tasks = self.tasks.write().await;
416 tasks.remove(task_id);
417
418 Ok(LdpEnvelope::new(
419 &envelope.session_id,
420 &self.identity.delegate_id,
421 &envelope.from,
422 LdpMessageBody::TaskFailed {
423 task_id: task_id.to_string(),
424 error: LdpError::runtime("CANCELLED", "Task cancelled by client"),
425 },
426 PayloadMode::Text,
427 ))
428 }
429
430 async fn handle_session_close(&self, envelope: &LdpEnvelope) -> Result<LdpEnvelope, String> {
432 info!(session_id = %envelope.session_id, "Session closed");
433
434 let mut sessions = self.sessions.write().await;
435 sessions.remove(&envelope.session_id);
436
437 Ok(LdpEnvelope::new(
438 &envelope.session_id,
439 &self.identity.delegate_id,
440 &envelope.from,
441 LdpMessageBody::SessionClose {
442 reason: Some("acknowledged".into()),
443 },
444 PayloadMode::Text,
445 ))
446 }
447
448 pub async fn active_sessions(&self) -> usize {
450 self.sessions.read().await.len()
451 }
452
453 pub async fn completed_tasks(&self) -> usize {
455 self.tasks
456 .read()
457 .await
458 .values()
459 .filter(|t| t.state == TaskRecordState::Completed)
460 .count()
461 }
462}