use crate::types::capability::LdpCapability;
use crate::types::error::LdpError;
use crate::types::identity::LdpIdentityCard;
use crate::types::messages::{LdpEnvelope, LdpMessageBody};
use crate::types::payload::{negotiate_payload_mode, PayloadMode};
use crate::types::provenance::Provenance;
use crate::types::session::{LdpSession, SessionState};
use crate::types::trust::TrustDomain;
use crate::types::verification::VerificationStatus;
use chrono::Utc;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info};
/// Shared, thread-safe callback that executes a task.
///
/// It is invoked with the requested skill name and the task input, and the
/// `Value` it returns is used as the task's output.
pub type TaskHandler = Arc<dyn Fn(&str, &Value) -> Value + Send + Sync>;
/// In-memory LDP server: answers identity/capability requests and processes
/// protocol envelopes (hello, session negotiation, task submission, status
/// query, cancellation, session close).
pub struct LdpServer {
/// Identity card describing this delegate; also the source of the advertised
/// capabilities and supported payload modes.
identity: LdpIdentityCard,
/// Sessions accepted by this server, keyed by session id.
sessions: Arc<RwLock<HashMap<String, LdpSession>>>,
/// Records of submitted tasks, keyed by task id.
tasks: Arc<RwLock<HashMap<String, TaskRecord>>>,
/// Callback that computes the output of a submitted task.
handler: TaskHandler,
/// When set, inbound messages other than `Hello` must carry a valid signature
/// and every response is signed with this secret.
signing_secret: Option<String>,
}
/// Server-side bookkeeping entry for one submitted task.
// NOTE(review): `task_id` and `skill` are written but never read in this file,
// which is presumably why `dead_code` is allowed here — confirm.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct TaskRecord {
/// Identifier the client supplied for the task.
task_id: String,
/// Name of the skill the task was submitted for.
skill: String,
/// Current lifecycle state of the task.
state: TaskRecordState,
/// Handler output, if one has been recorded.
output: Option<Value>,
/// Failure description, if one has been recorded.
error: Option<String>,
}
/// Lifecycle state stored in a [`TaskRecord`].
// NOTE(review): only `Completed` is constructed in this file; the other
// variants are matched on but never produced here, which is presumably why
// `dead_code` is allowed — confirm.
#[derive(Debug, Clone, PartialEq)]
#[allow(dead_code)]
enum TaskRecordState {
/// Task has been received.
Submitted,
/// Task is in progress.
Working,
/// Task finished and has an output.
Completed,
/// Task finished with an error.
Failed,
}
impl LdpServer {
/// Creates a server for `identity` that runs every submitted task through `handler`.
///
/// The session and task tables start out empty and no signing secret is
/// configured.
pub fn new(identity: LdpIdentityCard, handler: TaskHandler) -> Self {
    let sessions = Arc::new(RwLock::new(HashMap::new()));
    let tasks = Arc::new(RwLock::new(HashMap::new()));
    Self {
        identity,
        sessions,
        tasks,
        handler,
        signing_secret: None,
    }
}
pub fn with_signing_secret(mut self, secret: impl Into<String>) -> Self {
self.signing_secret = Some(secret.into());
self
}
pub fn echo_server(delegate_id: &str, name: &str) -> Self {
let identity = LdpIdentityCard {
delegate_id: delegate_id.to_string(),
name: name.to_string(),
description: Some("Echo test server".into()),
model_family: "TestModel".into(),
model_version: "1.0".into(),
weights_fingerprint: None,
trust_domain: TrustDomain::new("test-domain"),
context_window: 4096,
reasoning_profile: Some("analytical".into()),
cost_profile: Some("low".into()),
latency_profile: Some("p50:100ms".into()),
jurisdiction: None,
capabilities: vec![LdpCapability {
name: "echo".into(),
description: Some("Echoes input back".into()),
input_schema: None,
output_schema: None,
quality: None,
domains: vec![],
}],
supported_payload_modes: vec![PayloadMode::SemanticFrame, PayloadMode::Text],
endpoint: String::new(),
metadata: HashMap::new(),
};
let handler: TaskHandler = Arc::new(|_skill, input| json!({ "echo": input }));
Self::new(identity, handler)
}
pub fn identity(&self) -> &LdpIdentityCard {
&self.identity
}
pub fn handle_identity_request(&self) -> Value {
serde_json::to_value(&self.identity).unwrap_or_default()
}
/// Returns a JSON object with the server's `capabilities` and its
/// `supported_modes` (the supported payload modes from the identity card).
pub fn handle_capabilities_request(&self) -> Value {
    let card = &self.identity;
    json!({
        "capabilities": card.capabilities,
        "supported_modes": card.supported_payload_modes,
    })
}
/// Entry point for an inbound envelope: checks its signature, dispatches on the
/// message body, and returns the (optionally signed) response envelope.
///
/// When a signing secret is configured, a signature that is present must
/// verify, and a missing signature is tolerated only on `Hello` messages. The
/// response is signed with the same secret.
///
/// # Errors
///
/// Returns an error string when the signature is invalid or missing, when the
/// body variant is not handled by this server, or when the selected handler
/// fails.
pub async fn handle_message(&self, envelope: LdpEnvelope) -> Result<LdpEnvelope, String> {
    if let Some(secret) = self.signing_secret.as_ref() {
        let is_hello = matches!(envelope.body, LdpMessageBody::Hello { .. });
        match envelope.signature.as_ref() {
            Some(sig) if !crate::signing::verify_envelope(&envelope, secret, sig) => {
                return Err(String::from("Invalid message signature"));
            }
            // Unsigned traffic is only accepted for the initial HELLO.
            None if !is_hello => {
                return Err(String::from(
                    "Message signature required but not provided",
                ));
            }
            _ => {}
        }
    }

    let dispatched = match &envelope.body {
        LdpMessageBody::Hello {
            delegate_id,
            supported_modes,
        } => {
            self.handle_hello(&envelope, delegate_id, supported_modes)
                .await
        }
        LdpMessageBody::SessionPropose { config } => {
            self.handle_session_propose(&envelope, config).await
        }
        LdpMessageBody::TaskSubmit {
            task_id,
            skill,
            input,
            ..
        } => {
            self.handle_task_submit(&envelope, task_id, skill, input)
                .await
        }
        // An inbound TaskUpdate is treated as a status query for that task.
        LdpMessageBody::TaskUpdate { task_id, .. } => {
            self.handle_task_status_query(&envelope, task_id).await
        }
        LdpMessageBody::TaskCancel { task_id } => {
            self.handle_task_cancel(&envelope, task_id).await
        }
        LdpMessageBody::SessionClose { .. } => self.handle_session_close(&envelope).await,
        _ => Err(String::from("Unhandled message type")),
    };
    let mut response = dispatched?;

    if let Some(secret) = self.signing_secret.as_ref() {
        crate::signing::apply_signature(&mut response, secret);
    }
    Ok(response)
}
/// Answers a `Hello` with a `CapabilityManifest` listing this server's
/// capabilities and supported payload modes.
///
/// The sender's delegate id and supported modes are currently unused. The
/// reply is addressed to the sender, reuses the envelope's session id, and is
/// sent in `PayloadMode::Text`.
async fn handle_hello(
    &self,
    envelope: &LdpEnvelope,
    _delegate_id: &str,
    _supported_modes: &[PayloadMode],
) -> Result<LdpEnvelope, String> {
    info!(from = %envelope.from, "Received HELLO");

    let card = &self.identity;
    let manifest = json!({
        "capabilities": card.capabilities,
        "supported_modes": card.supported_payload_modes,
    });
    let reply = LdpEnvelope::new(
        &envelope.session_id,
        &card.delegate_id,
        &envelope.from,
        LdpMessageBody::CapabilityManifest {
            capabilities: manifest,
        },
        PayloadMode::Text,
    );
    Ok(reply)
}
async fn handle_session_propose(
&self,
envelope: &LdpEnvelope,
config: &Value,
) -> Result<LdpEnvelope, String> {
let session_id = envelope.session_id.clone();
info!(session_id = %session_id, from = %envelope.from, "Session proposed");
let remote_domain = config
.get("trust_domain")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
if !self.identity.trust_domain.trusts(remote_domain) {
let reason = format!(
"Trust domain '{}' not trusted by '{}'",
remote_domain, self.identity.trust_domain.name
);
return Ok(LdpEnvelope::new(
&session_id,
&self.identity.delegate_id,
&envelope.from,
LdpMessageBody::SessionReject {
reason: reason.clone(),
error: Some(LdpError::policy("TRUST_VIOLATION", reason)),
},
PayloadMode::Text,
));
}
let requested_mode = config
.get("payload_mode")
.and_then(|v| serde_json::from_value::<PayloadMode>(v.clone()).ok())
.unwrap_or(PayloadMode::SemanticFrame);
let negotiated = negotiate_payload_mode(
&[requested_mode, PayloadMode::Text],
&self.identity.supported_payload_modes,
);
let now = Utc::now();
let ttl = config
.get("ttl_secs")
.and_then(|v| v.as_u64())
.unwrap_or(3600);
let session = LdpSession {
session_id: session_id.clone(),
remote_url: String::new(),
remote_delegate_id: envelope.from.clone(),
state: SessionState::Active,
payload: negotiated.clone(),
trust_domain: self.identity.trust_domain.clone(),
created_at: now,
last_used: now,
ttl_secs: ttl,
task_count: 0,
};
{
let mut sessions = self.sessions.write().await;
sessions.insert(session_id.clone(), session);
}
let response = LdpEnvelope::new(
&session_id,
&self.identity.delegate_id,
&envelope.from,
LdpMessageBody::SessionAccept {
session_id: session_id.clone(),
negotiated_mode: negotiated.mode,
},
PayloadMode::Text,
);
Ok(response)
}
/// Runs a submitted task through the handler, records it as completed and
/// replies with a `TaskResult`.
///
/// The handler is called synchronously with `skill` and `input`. Its output is
/// stored under `task_id` (replacing any earlier record with that id) and
/// returned with provenance marked as self-verified. The reply uses the
/// payload mode of the envelope's session, or `PayloadMode::Text` when no such
/// session is known.
async fn handle_task_submit(
    &self,
    envelope: &LdpEnvelope,
    task_id: &str,
    skill: &str,
    input: &Value,
) -> Result<LdpEnvelope, String> {
    debug!(task_id = %task_id, skill = %skill, "Task submitted");

    let output = (self.handler)(skill, input);

    let record = TaskRecord {
        task_id: task_id.to_owned(),
        skill: skill.to_owned(),
        state: TaskRecordState::Completed,
        output: Some(output.clone()),
        error: None,
    };
    // The write guard is a temporary, released at the end of this statement.
    self.tasks.write().await.insert(task_id.to_owned(), record);

    let card = &self.identity;
    let mut provenance = Provenance::new(&card.delegate_id, &card.model_version);
    provenance.verification_status = VerificationStatus::SelfVerified;
    // The deprecated boolean flag is still set alongside the status enum.
    #[allow(deprecated)]
    {
        provenance.verified = true;
    }

    let mode = {
        let sessions = self.sessions.read().await;
        sessions
            .get(&envelope.session_id)
            .map_or(PayloadMode::Text, |session| session.payload.mode)
    };

    let result = LdpMessageBody::TaskResult {
        task_id: task_id.to_owned(),
        output,
        provenance,
    };
    Ok(LdpEnvelope::new(
        &envelope.session_id,
        &card.delegate_id,
        &envelope.from,
        result,
        mode,
    ))
}
async fn handle_task_status_query(
&self,
envelope: &LdpEnvelope,
task_id: &str,
) -> Result<LdpEnvelope, String> {
let tasks = self.tasks.read().await;
if let Some(record) = tasks.get(task_id) {
let body = match record.state {
TaskRecordState::Completed => {
let mut provenance =
Provenance::new(&self.identity.delegate_id, &self.identity.model_version);
provenance.verification_status = VerificationStatus::SelfVerified;
#[allow(deprecated)]
{
provenance.verified = true;
}
LdpMessageBody::TaskResult {
task_id: task_id.to_string(),
output: record.output.clone().unwrap_or(json!(null)),
provenance,
}
}
TaskRecordState::Failed => LdpMessageBody::TaskFailed {
task_id: task_id.to_string(),
error: LdpError::runtime(
"TASK_FAILED",
record
.error
.clone()
.unwrap_or_else(|| "unknown error".into()),
),
},
_ => LdpMessageBody::TaskUpdate {
task_id: task_id.to_string(),
progress: None,
message: Some(format!("{:?}", record.state).to_lowercase()),
},
};
Ok(LdpEnvelope::new(
&envelope.session_id,
&self.identity.delegate_id,
&envelope.from,
body,
PayloadMode::Text,
))
} else {
Err(format!("Unknown task: {}", task_id))
}
}
/// Drops the record for `task_id` (if any) and replies with a `TaskFailed`
/// carrying a `CANCELLED` runtime error.
///
/// The reply is the same whether or not a record existed, and is sent in
/// `PayloadMode::Text`.
async fn handle_task_cancel(
    &self,
    envelope: &LdpEnvelope,
    task_id: &str,
) -> Result<LdpEnvelope, String> {
    info!(task_id = %task_id, "Task cancelled");

    // The write guard is a temporary, released at the end of this statement.
    self.tasks.write().await.remove(task_id);

    let cancelled = LdpMessageBody::TaskFailed {
        task_id: task_id.to_owned(),
        error: LdpError::runtime("CANCELLED", "Task cancelled by client"),
    };
    Ok(LdpEnvelope::new(
        &envelope.session_id,
        &self.identity.delegate_id,
        &envelope.from,
        cancelled,
        PayloadMode::Text,
    ))
}
/// Removes the envelope's session (if present) and acknowledges with a
/// `SessionClose` whose reason is `"acknowledged"`, sent in
/// `PayloadMode::Text`.
async fn handle_session_close(&self, envelope: &LdpEnvelope) -> Result<LdpEnvelope, String> {
    info!(session_id = %envelope.session_id, "Session closed");

    // The write guard is a temporary, released at the end of this statement.
    self.sessions.write().await.remove(&envelope.session_id);

    let acknowledgement = LdpMessageBody::SessionClose {
        reason: Some("acknowledged".into()),
    };
    Ok(LdpEnvelope::new(
        &envelope.session_id,
        &self.identity.delegate_id,
        &envelope.from,
        acknowledgement,
        PayloadMode::Text,
    ))
}
/// Returns the number of sessions currently stored by the server.
pub async fn active_sessions(&self) -> usize {
    let sessions = self.sessions.read().await;
    sessions.len()
}
/// Returns how many stored task records are in the `Completed` state.
pub async fn completed_tasks(&self) -> usize {
    let tasks = self.tasks.read().await;
    tasks
        .values()
        .filter(|record| matches!(record.state, TaskRecordState::Completed))
        .count()
}
}