use ldp_protocol::config::LdpAdapterConfig;
use ldp_protocol::protocol::{ProtocolAdapter, TaskRequest, TaskStatus};
use ldp_protocol::server::LdpServer;
use ldp_protocol::types::contract::{DelegationContract, FailurePolicy};
use ldp_protocol::types::messages::LdpEnvelope;
use ldp_protocol::LdpAdapter;
use serde_json::json;
use std::sync::Arc;
use tokio::net::TcpListener;
/// Starts `server` on an ephemeral loopback port and returns its base URL.
///
/// The listener is bound to `127.0.0.1:0`; a background task then accepts
/// connections in a loop and hands each one to `handle_connection` on its own
/// task. The returned string has the form `http://<addr>`.
///
/// # Panics
///
/// Panics if the listener cannot be bound or its local address cannot be
/// read. A failed `accept` panics inside the background task.
async fn start_test_server(server: LdpServer) -> String {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let base_url = format!("http://{}", listener.local_addr().unwrap());
    let shared = Arc::new(server);

    tokio::spawn(async move {
        loop {
            let (socket, _peer) = listener.accept().await.unwrap();
            // One task per connection so a slow request never blocks accept.
            tokio::spawn(handle_connection(socket, Arc::clone(&shared)));
        }
    });

    base_url
}
/// Returns the offset of the first body byte once the blank line that ends
/// the HTTP header section is present in `data`.
///
/// A CRLF-terminated header section is preferred; a bare `\n\n` is accepted
/// as a fallback.
fn find_body_start(data: &[u8]) -> Option<usize> {
    data.windows(4)
        .position(|w| w == b"\r\n\r\n")
        .map(|i| i + 4)
        .or_else(|| data.windows(2).position(|w| w == b"\n\n").map(|i| i + 2))
}

/// Extracts the `Content-Length` value (header name matched case-insensitively)
/// from a header section, defaulting to 0 when absent or unparsable.
fn parse_content_length(headers: &str) -> usize {
    headers
        .lines()
        .filter_map(|line| {
            let mut parts = line.splitn(2, ':');
            let name = parts.next()?;
            let value = parts.next()?;
            if name.trim().eq_ignore_ascii_case("content-length") {
                value.trim().parse::<usize>().ok()
            } else {
                None
            }
        })
        .next()
        .unwrap_or(0)
}

/// Reads one HTTP request from `stream`.
///
/// Keeps reading until the header section is complete and as many body bytes
/// as `Content-Length` announces have arrived, the peer closes the
/// connection, a read fails, or the size cap is reached. Returns whatever was
/// received (possibly empty).
async fn read_http_request(stream: &mut tokio::net::TcpStream) -> Vec<u8> {
    use tokio::io::AsyncReadExt;

    // Cap buffering so a misbehaving client cannot grow memory without bound.
    const MAX_REQUEST_BYTES: usize = 16 * 1024 * 1024;

    let mut data = Vec::new();
    let mut chunk = vec![0u8; 65536];
    loop {
        // A read error is treated like end-of-stream, as before.
        let n = stream.read(&mut chunk).await.unwrap_or(0);
        if n == 0 {
            break;
        }
        data.extend_from_slice(&chunk[..n]);

        if let Some(body_start) = find_body_start(&data) {
            let headers = String::from_utf8_lossy(&data[..body_start]);
            let expected = body_start.saturating_add(parse_content_length(&headers));
            if data.len() >= expected {
                break;
            }
        }
        if data.len() >= MAX_REQUEST_BYTES {
            break;
        }
    }
    data
}

/// Serves a single HTTP request on `stream` using `server`, then returns
/// (dropping the stream).
///
/// Routes:
/// - `GET /.well-known/ldp-identity` and `GET /ldp/identity` return the
///   result of `handle_identity_request` as JSON.
/// - `GET /ldp/capabilities` returns the result of
///   `handle_capabilities_request` as JSON.
/// - `POST /ldp/messages` parses the body as an `LdpEnvelope` and passes it
///   to `handle_message`: `200` on success, `500` when the handler returns an
///   error, `400` when the body is not a valid envelope.
/// - Anything else returns `404`.
///
/// The connection is silently dropped when nothing could be read or the
/// request line has fewer than two tokens. Write errors are ignored.
///
/// Unlike a single fixed-size `read`, the request is read until the announced
/// `Content-Length` has arrived, so bodies split across TCP segments or larger
/// than one read buffer are not truncated.
async fn handle_connection(stream: tokio::net::TcpStream, server: Arc<LdpServer>) {
    use tokio::io::AsyncWriteExt;

    let mut stream = stream;
    let raw = read_http_request(&mut stream).await;
    if raw.is_empty() {
        return;
    }

    let request = String::from_utf8_lossy(&raw);
    let mut request_line = request.lines().next().unwrap_or("").split_whitespace();
    let (method, path) = match (request_line.next(), request_line.next()) {
        (Some(method), Some(path)) => (method, path),
        _ => return,
    };

    let (status, body) = match (method, path) {
        ("GET", "/.well-known/ldp-identity") | ("GET", "/ldp/identity") => {
            let identity = server.handle_identity_request();
            ("200 OK", serde_json::to_string(&identity).unwrap())
        }
        ("GET", "/ldp/capabilities") => {
            let caps = server.handle_capabilities_request();
            ("200 OK", serde_json::to_string(&caps).unwrap())
        }
        ("POST", "/ldp/messages") => {
            // Offsets are computed on the decoded string so slicing always
            // lands on a char boundary; with no header terminator the body
            // is empty.
            let body_start = request
                .find("\r\n\r\n")
                .map(|i| i + 4)
                .or_else(|| request.find("\n\n").map(|i| i + 2))
                .unwrap_or(request.len());
            let json_body = &request[body_start..];
            match serde_json::from_str::<LdpEnvelope>(json_body) {
                Ok(envelope) => match server.handle_message(envelope).await {
                    Ok(response) => ("200 OK", serde_json::to_string(&response).unwrap()),
                    Err(e) => ("500 Internal Server Error", json!({"error": e}).to_string()),
                },
                Err(e) => (
                    "400 Bad Request",
                    json!({"error": e.to_string()}).to_string(),
                ),
            }
        }
        _ => ("404 Not Found", json!({"error": "not found"}).to_string()),
    };

    let response = format!(
        "HTTP/1.1 {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
        status,
        body.len(),
        body
    );
    // Best effort: the client may already have gone away.
    let _ = stream.write_all(response.as_bytes()).await;
}
/// Builds the default client adapter used by most tests: delegate id
/// `ldp:delegate:test-client` in trust domain `test-domain`, with every other
/// setting left at its default.
fn test_adapter() -> LdpAdapter {
    use ldp_protocol::types::trust::TrustDomain;

    let config = LdpAdapterConfig {
        delegate_id: "ldp:delegate:test-client".into(),
        trust_domain: TrustDomain::new("test-domain"),
        ..Default::default()
    };
    LdpAdapter::new(config)
}
/// Discovery against the echo server reports its name, the `ldp` protocol,
/// and `echo` as the first skill.
#[tokio::test]
async fn test_discover() {
    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server");
    let url = start_test_server(echo).await;
    let adapter = test_adapter();

    let discovered = adapter.discover(&url).await.unwrap();

    assert_eq!(discovered.name, "Echo Server");
    assert_eq!(discovered.protocols, vec!["ldp"]);
    assert!(!discovered.skills.is_empty());
    assert_eq!(discovered.skills[0].name, "echo");
}
/// Invoking the `echo` skill yields a handle with a non-empty task id that
/// points back at the server URL.
#[tokio::test]
async fn test_invoke_returns_handle() {
    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server");
    let url = start_test_server(echo).await;
    let adapter = test_adapter();

    let request = TaskRequest {
        skill: "echo".into(),
        input: json!({"message": "hello"}),
        contract: None,
    };
    let handle = adapter.invoke(&url, request).await.unwrap();

    assert!(!handle.task_id.is_empty());
    assert_eq!(handle.remote_url, url);
}
/// After invoking `echo`, the task status is `Completed` and its output holds
/// an `echo` or `result` field plus `ldp_provenance`.
#[tokio::test]
async fn test_invoke_and_status() {
    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server");
    let url = start_test_server(echo).await;
    let adapter = test_adapter();

    let request = TaskRequest {
        skill: "echo".into(),
        input: json!({"value": 42}),
        contract: None,
    };
    let handle = adapter.invoke(&url, request).await.unwrap();

    // Unwrap the completed output up front; any other status fails the test.
    let output = match adapter.status(&url, &handle.task_id).await.unwrap() {
        TaskStatus::Completed { output } => output,
        other => panic!("Expected Completed, got: {:?}", other),
    };

    let has_payload = output.get("echo").is_some() || output.get("result").is_some();
    assert!(has_payload);
    assert!(
        output.get("ldp_provenance").is_some(),
        "Expected provenance in output, got: {}",
        output
    );
}
/// Two invocations against the same server get distinct task ids while the
/// adapter's session manager reports exactly one active session.
#[tokio::test]
async fn test_session_reuse() {
    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server");
    let url = start_test_server(echo).await;
    let adapter = test_adapter();

    let first_request = TaskRequest {
        skill: "echo".into(),
        input: json!({"call": 1}),
        contract: None,
    };
    let first = adapter.invoke(&url, first_request).await.unwrap();

    let second_request = TaskRequest {
        skill: "echo".into(),
        input: json!({"call": 2}),
        contract: None,
    };
    let second = adapter.invoke(&url, second_request).await.unwrap();

    assert_ne!(first.task_id, second.task_id);
    let active_sessions = adapter.session_manager().active_count().await;
    assert_eq!(active_sessions, 1);
}
/// Cancelling a previously invoked task returns `Ok`.
#[tokio::test]
async fn test_cancel() {
    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server");
    let url = start_test_server(echo).await;
    let adapter = test_adapter();

    let request = TaskRequest {
        skill: "echo".into(),
        input: json!({}),
        contract: None,
    };
    let handle = adapter.invoke(&url, request).await.unwrap();

    let cancelled = adapter.cancel(&url, &handle.task_id).await;
    assert!(cancelled.is_ok());
}
/// The completed output carries `ldp_provenance` naming the echo delegate,
/// model version `1.0`, and a `payload_mode_used` entry.
#[tokio::test]
async fn test_provenance_present_in_output() {
    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server");
    let url = start_test_server(echo).await;
    let adapter = test_adapter();

    let request = TaskRequest {
        skill: "echo".into(),
        input: json!({"data": "test"}),
        contract: None,
    };
    let handle = adapter.invoke(&url, request).await.unwrap();

    let output = match adapter.status(&url, &handle.task_id).await.unwrap() {
        TaskStatus::Completed { output } => output,
        _ => panic!("Expected Completed status"),
    };

    let provenance = output.get("ldp_provenance").expect("provenance missing");
    let produced_by = provenance.get("produced_by").unwrap().as_str().unwrap();
    assert_eq!(produced_by, "ldp:delegate:echo");
    let model_version = provenance.get("model_version").unwrap().as_str().unwrap();
    assert_eq!(model_version, "1.0");
    assert!(provenance.get("payload_mode_used").is_some());
}
/// With trust-domain enforcement on, a client in domain `production-only`
/// fails to discover the echo server with a "not trusted" error.
#[tokio::test]
async fn test_trust_domain_mismatch_rejected() {
    use ldp_protocol::types::trust::TrustDomain;

    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server");
    let url = start_test_server(echo).await;

    let strict_config = LdpAdapterConfig {
        delegate_id: "ldp:delegate:strict-client".into(),
        trust_domain: TrustDomain::new("production-only"),
        enforce_trust_domains: true,
        ..Default::default()
    };
    let adapter = LdpAdapter::new(strict_config);

    let outcome = adapter.discover(&url).await;
    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert!(message.contains("not trusted"), "Expected trust domain error");
}
#[tokio::test]
async fn test_cross_domain_with_trusted_peer_succeeds() {
use ldp_protocol::types::trust::TrustDomain;
let server = {
use ldp_protocol::types::capability::LdpCapability;
use ldp_protocol::types::identity::LdpIdentityCard;
use ldp_protocol::types::payload::PayloadMode;
use serde_json::json;
use std::collections::HashMap;
let identity = LdpIdentityCard {
delegate_id: "ldp:delegate:partner".to_string(),
name: "Partner Server".to_string(),
description: Some("Partner domain server".into()),
model_family: "TestModel".into(),
model_version: "1.0".into(),
weights_fingerprint: None,
trust_domain: TrustDomain {
name: "partner".into(),
allow_cross_domain: true,
trusted_peers: vec!["acme".into()],
},
context_window: 4096,
reasoning_profile: Some("analytical".into()),
cost_profile: Some("low".into()),
latency_profile: Some("p50:100ms".into()),
jurisdiction: None,
capabilities: vec![LdpCapability {
name: "echo".into(),
description: Some("Echoes input back".into()),
input_schema: None,
output_schema: None,
quality: None,
domains: vec![],
}],
supported_payload_modes: vec![PayloadMode::SemanticFrame, PayloadMode::Text],
endpoint: String::new(),
metadata: HashMap::new(),
};
let handler: ldp_protocol::server::TaskHandler =
Arc::new(|_skill, input| json!({ "echo": input }));
LdpServer::new(identity, handler)
};
let base_url = start_test_server(server).await;
let adapter = LdpAdapter::new(LdpAdapterConfig {
delegate_id: "ldp:delegate:acme-client".into(),
trust_domain: TrustDomain {
name: "acme".into(),
allow_cross_domain: true,
trusted_peers: vec!["partner".into()],
},
enforce_trust_domains: true,
..Default::default()
});
let caps = adapter.discover(&base_url).await.unwrap();
assert_eq!(caps.name, "Partner Server");
}
/// `LdpClient::fetch_identity_wellknown` returns the echo server's delegate
/// id and name.
#[tokio::test]
async fn test_wellknown_discovery() {
    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server");
    let url = start_test_server(echo).await;

    let client = ldp_protocol::LdpClient::new();
    let card = client.fetch_identity_wellknown(&url).await.unwrap();

    assert_eq!(card.delegate_id, "ldp:delegate:echo");
    assert_eq!(card.name, "Echo Server");
}
/// When client and server are configured with the same signing secret, an
/// invocation succeeds and returns a non-empty task id.
#[tokio::test]
async fn test_signed_messages() {
    use ldp_protocol::types::trust::TrustDomain;

    let shared_secret = "shared-test-secret";
    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server")
        .with_signing_secret(shared_secret);
    let url = start_test_server(echo).await;

    let config = LdpAdapterConfig {
        delegate_id: "ldp:delegate:signed-client".into(),
        trust_domain: TrustDomain::new("test-domain"),
        signing_secret: Some(shared_secret.to_string()),
        enforce_trust_domains: false,
        ..Default::default()
    };
    let adapter = LdpAdapter::new(config);

    let request = TaskRequest {
        skill: "echo".into(),
        input: json!({"signed": true}),
        contract: None,
    };
    let handle = adapter.invoke(&url, request).await.unwrap();
    assert!(!handle.task_id.is_empty());
}
/// An invocation fails when the client's signing secret differs from the
/// server's.
#[tokio::test]
async fn test_wrong_signing_secret_rejected() {
    use ldp_protocol::types::trust::TrustDomain;

    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server")
        .with_signing_secret("server-secret");
    let url = start_test_server(echo).await;

    let config = LdpAdapterConfig {
        delegate_id: "ldp:delegate:bad-client".into(),
        trust_domain: TrustDomain::new("test-domain"),
        signing_secret: Some("wrong-secret".to_string()),
        enforce_trust_domains: false,
        ..Default::default()
    };
    let adapter = LdpAdapter::new(config);

    let request = TaskRequest {
        skill: "echo".into(),
        input: json!({}),
        contract: None,
    };
    let outcome = adapter.invoke(&url, request).await;
    assert!(outcome.is_err());
}
/// With enforcement on, a client in domain `isolated` fails to discover the
/// echo server with a "not trusted" error.
#[tokio::test]
async fn test_cross_domain_without_trust_fails() {
    use ldp_protocol::types::trust::TrustDomain;

    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server");
    let url = start_test_server(echo).await;

    let isolated_config = LdpAdapterConfig {
        delegate_id: "ldp:delegate:isolated-client".into(),
        trust_domain: TrustDomain::new("isolated"),
        enforce_trust_domains: true,
        ..Default::default()
    };
    let adapter = LdpAdapter::new(isolated_config);

    let outcome = adapter.discover(&url).await;
    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert!(message.contains("not trusted"), "Expected trust domain error");
}
/// With a 1-second session TTL, an invocation made 2 seconds after the first
/// one still succeeds and gets a different task id.
///
/// NOTE(review): the only assertion is task-id inequality, which
/// `test_session_reuse` also observes for a reused session, so this does not
/// by itself prove a new session was established. Consider asserting on
/// session identity if the API exposes it.
#[tokio::test]
async fn test_session_expiry_forces_reestablishment() {
    use ldp_protocol::types::session::SessionConfig;
    use ldp_protocol::types::trust::TrustDomain;

    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server");
    let url = start_test_server(echo).await;

    let short_lived_sessions = SessionConfig {
        ttl_secs: 1,
        ..Default::default()
    };
    let adapter = LdpAdapter::new(LdpAdapterConfig {
        delegate_id: "ldp:delegate:short-ttl".into(),
        trust_domain: TrustDomain::new("test-domain"),
        session: short_lived_sessions,
        enforce_trust_domains: false,
        ..Default::default()
    });

    let echo_call = |call: i32| TaskRequest {
        skill: "echo".into(),
        input: json!({"call": call}),
        contract: None,
    };

    let before_expiry = adapter.invoke(&url, echo_call(1)).await.unwrap();

    // Wait past the 1-second TTL configured above.
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;

    let after_expiry = adapter.invoke(&url, echo_call(2)).await.unwrap();

    assert_ne!(before_expiry.task_id, after_expiry.task_id);
}
/// A client whose only preferred payload mode is `Text` can still invoke the
/// echo server and receives a non-empty task id.
#[tokio::test]
async fn test_payload_negotiation_text_only() {
    use ldp_protocol::types::payload::PayloadMode;
    use ldp_protocol::types::session::SessionConfig;
    use ldp_protocol::types::trust::TrustDomain;

    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server");
    let url = start_test_server(echo).await;

    let text_only_session = SessionConfig {
        preferred_payload_modes: vec![PayloadMode::Text],
        ..Default::default()
    };
    let adapter = LdpAdapter::new(LdpAdapterConfig {
        delegate_id: "ldp:delegate:text-only".into(),
        trust_domain: TrustDomain::new("test-domain"),
        session: text_only_session,
        enforce_trust_domains: false,
        ..Default::default()
    });

    let request = TaskRequest {
        skill: "echo".into(),
        input: json!({"mode_test": true}),
        contract: None,
    };
    let handle = adapter.invoke(&url, request).await.unwrap();
    assert!(!handle.task_id.is_empty());
}
#[tokio::test]
async fn test_discover_multiple_skills() {
use ldp_protocol::types::capability::LdpCapability;
use ldp_protocol::types::identity::LdpIdentityCard;
use ldp_protocol::types::payload::PayloadMode;
use std::collections::HashMap;
let identity = LdpIdentityCard {
delegate_id: "ldp:delegate:multi".into(),
name: "Multi-Skill Agent".into(),
description: None,
model_family: "TestModel".into(),
model_version: "1.0".into(),
weights_fingerprint: None,
trust_domain: ldp_protocol::types::trust::TrustDomain::new("test-domain"),
context_window: 4096,
reasoning_profile: None,
cost_profile: None,
latency_profile: None,
jurisdiction: None,
capabilities: vec![
LdpCapability {
name: "reasoning".into(),
description: None,
input_schema: None,
output_schema: None,
quality: None,
domains: vec![],
},
LdpCapability {
name: "coding".into(),
description: None,
input_schema: None,
output_schema: None,
quality: None,
domains: vec![],
},
LdpCapability {
name: "writing".into(),
description: None,
input_schema: None,
output_schema: None,
quality: None,
domains: vec![],
},
],
supported_payload_modes: vec![PayloadMode::Text],
endpoint: String::new(),
metadata: HashMap::new(),
};
let handler: ldp_protocol::server::TaskHandler =
std::sync::Arc::new(|_s, i| serde_json::json!({"echo": i}));
let server = LdpServer::new(identity, handler);
let base_url = start_test_server(server).await;
let adapter = LdpAdapter::new(LdpAdapterConfig {
delegate_id: "ldp:delegate:client".into(),
enforce_trust_domains: false,
..Default::default()
});
let caps = adapter.discover(&base_url).await.unwrap();
assert_eq!(caps.skills.len(), 3);
assert_eq!(caps.skills[0].name, "reasoning");
assert_eq!(caps.skills[1].name, "coding");
assert_eq!(caps.skills[2].name, "writing");
}
#[tokio::test]
async fn test_signed_invoke_has_provenance() {
let secret = "provenance-test-secret";
let server =
LdpServer::echo_server("ldp:delegate:echo", "Echo Server").with_signing_secret(secret);
let base_url = start_test_server(server).await;
let adapter = LdpAdapter::new(LdpAdapterConfig {
delegate_id: "ldp:delegate:signed".into(),
trust_domain: ldp_protocol::types::trust::TrustDomain::new("test-domain"),
signing_secret: Some(secret.to_string()),
enforce_trust_domains: false,
..Default::default()
});
let handle = adapter
.invoke(
&base_url,
TaskRequest {
skill: "echo".into(),
input: json!({"test": "provenance"}),
contract: None,
},
)
.await
.unwrap();
let status = adapter.status(&base_url, &handle.task_id).await.unwrap();
match status {
ldp_protocol::protocol::TaskStatus::Completed { output } => {
assert!(output.get("ldp_provenance").is_some() || output.get("echo").is_some());
}
_ => panic!("Expected Completed status"),
}
}
/// A contract with a far-future deadline completes with
/// `contract_satisfied == true` and an empty `contract_violations` array in
/// the provenance.
#[tokio::test]
async fn test_contract_no_violations() {
    use ldp_protocol::types::trust::TrustDomain;

    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server");
    let url = start_test_server(echo).await;
    let adapter = LdpAdapter::new(LdpAdapterConfig {
        delegate_id: "ldp:delegate:client".into(),
        trust_domain: TrustDomain::new("test-domain"),
        enforce_trust_domains: false,
        ..Default::default()
    });

    let far_future_contract =
        DelegationContract::new("Echo the input", vec!["return input".into()])
            .with_deadline("2099-12-31T23:59:59Z");
    let request = TaskRequest {
        skill: "echo".into(),
        input: json!({"data": "test"}),
        contract: Some(far_future_contract),
    };
    let handle = adapter.invoke(&url, request).await.unwrap();

    let output = match adapter.status(&url, &handle.task_id).await.unwrap() {
        TaskStatus::Completed { output } => output,
        other => panic!("Expected Completed, got: {:?}", other),
    };

    let provenance = output.get("ldp_provenance").expect("provenance missing");
    let satisfied = provenance
        .get("contract_satisfied")
        .and_then(|v| v.as_bool());
    assert_eq!(satisfied, Some(true));

    let violations = provenance
        .get("contract_violations")
        .and_then(|v| v.as_array())
        .unwrap();
    assert!(violations.is_empty());
}
/// A contract whose deadline is in the past (and no explicit failure policy)
/// still completes, but the provenance reports `contract_satisfied == false`
/// and lists `deadline_exceeded` among the violations.
#[tokio::test]
async fn test_contract_deadline_exceeded_fail_open() {
    use ldp_protocol::types::trust::TrustDomain;

    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server");
    let url = start_test_server(echo).await;
    let adapter = LdpAdapter::new(LdpAdapterConfig {
        delegate_id: "ldp:delegate:client".into(),
        trust_domain: TrustDomain::new("test-domain"),
        enforce_trust_domains: false,
        ..Default::default()
    });

    let expired_contract =
        DelegationContract::new("Echo", vec![]).with_deadline("2020-01-01T00:00:00Z");
    let request = TaskRequest {
        skill: "echo".into(),
        input: json!({}),
        contract: Some(expired_contract),
    };
    let handle = adapter.invoke(&url, request).await.unwrap();

    let output = match adapter.status(&url, &handle.task_id).await.unwrap() {
        TaskStatus::Completed { output } => output,
        other => panic!("Expected Completed (FailOpen), got: {:?}", other),
    };

    let provenance = output.get("ldp_provenance").expect("provenance missing");
    let satisfied = provenance
        .get("contract_satisfied")
        .and_then(|v| v.as_bool());
    assert_eq!(satisfied, Some(false));

    let violations = provenance
        .get("contract_violations")
        .and_then(|v| v.as_array())
        .unwrap();
    let deadline_reported = violations
        .iter()
        .filter_map(|v| v.as_str())
        .any(|code| code == "deadline_exceeded");
    assert!(deadline_reported);
}
#[tokio::test]
async fn test_contract_deadline_fail_closed() {
let server = LdpServer::echo_server("ldp:delegate:echo", "Echo Server");
let base_url = start_test_server(server).await;
let adapter = LdpAdapter::new(LdpAdapterConfig {
delegate_id: "ldp:delegate:client".into(),
trust_domain: ldp_protocol::types::trust::TrustDomain::new("test-domain"),
enforce_trust_domains: false,
..Default::default()
});
let contract = DelegationContract::new("Echo", vec![])
.with_deadline("2020-01-01T00:00:00Z")
.with_failure_policy(FailurePolicy::FailClosed);
let task = TaskRequest {
skill: "echo".into(),
input: json!({}),
contract: Some(contract),
};
let handle = adapter.invoke(&base_url, task).await.unwrap();
let status = adapter.status(&base_url, &handle.task_id).await.unwrap();
match status {
TaskStatus::Failed { error } => {
assert_eq!(error.code, "CONTRACT_VIOLATED");
assert!(
error.partial_output.is_some(),
"FailClosed should preserve output"
);
}
other => panic!("Expected Failed, got: {:?}", other),
}
}
/// A request without any contract still completes.
#[tokio::test]
async fn test_no_contract_backward_compatible() {
    use ldp_protocol::types::trust::TrustDomain;

    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server");
    let url = start_test_server(echo).await;
    let adapter = LdpAdapter::new(LdpAdapterConfig {
        delegate_id: "ldp:delegate:client".into(),
        trust_domain: TrustDomain::new("test-domain"),
        enforce_trust_domains: false,
        ..Default::default()
    });

    let request = TaskRequest {
        skill: "echo".into(),
        input: json!({"data": "test"}),
        contract: None,
    };
    let handle = adapter.invoke(&url, request).await.unwrap();

    let status = adapter.status(&url, &handle.task_id).await.unwrap();
    if let TaskStatus::Completed { .. } = status {
        // Completion is all this test checks.
    } else {
        panic!("Expected Completed, got: {:?}", status);
    }
}
/// The provenance of a completed echo task contains exactly one lineage
/// entry, attributed to `ldp:delegate:echo` with status `self_verified`.
#[tokio::test]
async fn test_result_has_lineage_entry() {
    use ldp_protocol::types::trust::TrustDomain;

    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server");
    let url = start_test_server(echo).await;
    let adapter = LdpAdapter::new(LdpAdapterConfig {
        delegate_id: "ldp:delegate:client".into(),
        trust_domain: TrustDomain::new("test-domain"),
        enforce_trust_domains: false,
        ..Default::default()
    });

    let request = TaskRequest {
        skill: "echo".into(),
        input: json!({"data": "lineage-test"}),
        contract: None,
    };
    let handle = adapter.invoke(&url, request).await.unwrap();

    let output = match adapter.status(&url, &handle.task_id).await.unwrap() {
        TaskStatus::Completed { output } => output,
        other => panic!("Expected Completed, got: {:?}", other),
    };

    let provenance = output.get("ldp_provenance").expect("provenance missing");
    let lineage = provenance
        .get("lineage")
        .and_then(|v| v.as_array())
        .unwrap();
    assert_eq!(lineage.len(), 1);

    let entry = &lineage[0];
    let delegate = entry.get("delegate_id").and_then(|v| v.as_str());
    assert_eq!(delegate, Some("ldp:delegate:echo"));
    let verification = entry.get("verification_status").and_then(|v| v.as_str());
    assert_eq!(verification, Some("self_verified"));
}
/// The provenance of a completed echo task reports
/// `verification_status == "self_verified"`.
#[tokio::test]
async fn test_result_has_verification_status() {
    use ldp_protocol::types::trust::TrustDomain;

    let echo = LdpServer::echo_server("ldp:delegate:echo", "Echo Server");
    let url = start_test_server(echo).await;
    let adapter = LdpAdapter::new(LdpAdapterConfig {
        delegate_id: "ldp:delegate:client".into(),
        trust_domain: TrustDomain::new("test-domain"),
        enforce_trust_domains: false,
        ..Default::default()
    });

    let request = TaskRequest {
        skill: "echo".into(),
        input: json!({"data": "verification-test"}),
        contract: None,
    };
    let handle = adapter.invoke(&url, request).await.unwrap();

    let output = match adapter.status(&url, &handle.task_id).await.unwrap() {
        TaskStatus::Completed { output } => output,
        other => panic!("Expected Completed, got: {:?}", other),
    };

    let provenance = output.get("ldp_provenance").expect("provenance missing");
    let verification = provenance
        .get("verification_status")
        .and_then(|v| v.as_str());
    assert_eq!(verification, Some("self_verified"));
}