use crate::client::LdpClient;
use crate::config::LdpAdapterConfig;
use crate::protocol::{
ProtocolAdapter, RemoteCapabilities, RemoteSkill, TaskEvent, TaskHandle, TaskRequest,
TaskStatus, TaskStream,
};
use crate::session_manager::SessionManager;
use crate::types::contract::{DelegationContract, FailurePolicy};
use crate::types::error::LdpError;
use crate::types::messages::{LdpEnvelope, LdpMessageBody};
use crate::types::provenance::Provenance;
use crate::types::verification::ProvenanceEntry;
use async_trait::async_trait;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, instrument};
/// Checks a finished task's provenance against its delegation contract.
///
/// Returns the violation codes that were detected, always in this order:
/// `"deadline_exceeded"`, `"budget_tokens_exceeded"`, `"budget_cost_exceeded"`.
/// An empty vector means nothing was found to be violated.
///
/// A deadline that is absent or does not parse as RFC 3339 is not checked, and a
/// budget limit is only checked when both the limit and the matching usage
/// figure are present.
fn validate_contract(contract: &DelegationContract, provenance: &Provenance) -> Vec<String> {
    let mut found = Vec::new();

    // The clock is only consulted once the deadline string has parsed.
    let past_deadline = contract
        .deadline
        .as_ref()
        .and_then(|raw| chrono::DateTime::parse_from_rfc3339(raw).ok())
        .map_or(false, |deadline| chrono::Utc::now() > deadline);
    if past_deadline {
        found.push("deadline_exceeded".into());
    }

    if let Some(budget) = contract.policy.budget.as_ref() {
        let over_tokens = matches!(
            (budget.max_tokens, provenance.tokens_used),
            (Some(limit), Some(spent)) if spent > limit
        );
        if over_tokens {
            found.push("budget_tokens_exceeded".into());
        }

        let over_cost = matches!(
            (budget.max_cost_usd, provenance.cost_usd),
            (Some(limit), Some(spent)) if spent > limit
        );
        if over_cost {
            found.push("budget_cost_exceeded".into());
        }
    }

    found
}
/// Builds the lineage entry describing the step that produced `provenance`.
///
/// The delegate id, model version, timestamp and verification status are cloned
/// from `provenance`; `skill` becomes the entry's `step`.
fn build_lineage_entry(provenance: &Provenance, skill: &str) -> ProvenanceEntry {
    let delegate_id = provenance.produced_by.clone();
    let model_version = provenance.model_version.clone();
    let step = skill.to_owned();
    let timestamp = provenance.timestamp.clone();
    let verification_status = provenance.verification_status.clone();

    ProvenanceEntry {
        delegate_id,
        model_version,
        step,
        timestamp,
        verification_status,
    }
}
/// Adapter that implements `ProtocolAdapter` for LDP delegates.
///
/// Holds the LDP client, the session manager built from it, the adapter
/// configuration, and the delegation contracts recorded for submitted tasks.
pub struct LdpAdapter {
/// Session manager constructed from clones of `client` and `config`.
session_manager: SessionManager,
/// Client used to fetch identity cards and to send LDP envelopes.
client: LdpClient,
/// Adapter configuration: delegate id, trust-domain settings, optional signing
/// secret, and the `attach_provenance` flag.
config: LdpAdapterConfig,
/// Contracts keyed by task id; written by `invoke` and read by `status`.
contracts: Arc<RwLock<HashMap<String, DelegationContract>>>,
}
impl LdpAdapter {
    /// Creates an adapter that uses a freshly constructed [`LdpClient`].
    pub fn new(config: LdpAdapterConfig) -> Self {
        Self::with_client(config, LdpClient::new())
    }

    /// Creates an adapter around a caller-supplied client.
    ///
    /// The session manager receives its own clones of `client` and `config`,
    /// and the contract map starts out empty.
    pub fn with_client(config: LdpAdapterConfig, client: LdpClient) -> Self {
        let contracts = Arc::new(RwLock::new(HashMap::new()));
        let session_manager = SessionManager::new(client.clone(), config.clone());
        Self {
            session_manager,
            client,
            config,
            contracts,
        }
    }

    /// Returns the session manager owned by this adapter.
    pub fn session_manager(&self) -> &SessionManager {
        &self.session_manager
    }

    /// Converts an LDP identity card into the protocol-neutral capability description.
    ///
    /// Each capability on the card becomes one [`RemoteSkill`]; the protocol
    /// list is always `["ldp"]`.
    fn identity_to_capabilities(
        &self,
        identity: &crate::types::identity::LdpIdentityCard,
    ) -> RemoteCapabilities {
        RemoteCapabilities {
            name: identity.name.clone(),
            description: identity.description.clone(),
            skills: identity
                .capabilities
                .iter()
                .map(|capability| RemoteSkill {
                    name: capability.name.clone(),
                    description: capability.description.clone(),
                    input_schema: capability.input_schema.clone(),
                    output_schema: capability.output_schema.clone(),
                })
                .collect(),
            protocols: vec!["ldp".into()],
        }
    }

    /// Attaches `provenance` to `output` when `attach_provenance` is enabled.
    ///
    /// An object output gains an `ldp_provenance` key (replacing any existing
    /// one); any other value is wrapped as `{"result": ..., "ldp_provenance": ...}`.
    /// When the flag is off the output is returned untouched.
    fn embed_provenance(&self, output: Value, provenance: Provenance) -> Value {
        if !self.config.attach_provenance {
            return output;
        }

        let provenance_value = provenance.to_value();
        match output {
            Value::Object(mut fields) => {
                fields.insert("ldp_provenance".into(), provenance_value);
                Value::Object(fields)
            }
            other => json!({
                "result": other,
                "ldp_provenance": provenance_value
            }),
        }
    }

    /// Validates a task result against `contract` and turns it into a final status.
    ///
    /// The contract id, the satisfied flag and the violation list are written
    /// onto the provenance before it is embedded in the output. The result is
    /// `Failed` (carrying the output as partial output) only when there are
    /// violations and the contract's failure policy is `FailClosed`; otherwise
    /// it is `Completed`.
    fn apply_contract_validation(
        &self,
        contract: &DelegationContract,
        output: Value,
        mut provenance: Provenance,
    ) -> TaskStatus {
        let violations = validate_contract(contract, &provenance);
        let satisfied = violations.is_empty();

        provenance.contract_id = Some(contract.contract_id.clone());
        provenance.contract_satisfied = Some(satisfied);
        provenance.contract_violations = violations.clone();
        let output = self.embed_provenance(output, provenance);

        let must_fail =
            !satisfied && contract.policy.failure_policy == FailurePolicy::FailClosed;
        if !must_fail {
            return TaskStatus::Completed { output };
        }

        let summary = violations.join(", ");
        TaskStatus::Failed {
            error: LdpError::policy(
                "CONTRACT_VIOLATED",
                format!("Contract violations: {}", summary),
            )
            .with_partial_output(output),
        }
    }
}
#[async_trait]
impl ProtocolAdapter for LdpAdapter {
    /// Fetches the identity card served at `url` and converts it into
    /// [`RemoteCapabilities`].
    ///
    /// # Errors
    ///
    /// Returns an error when the identity card cannot be fetched, or when
    /// `enforce_trust_domains` is set and the configured trust domain does not
    /// trust the delegate's trust domain.
    #[instrument(skip(self), fields(url = %url))]
    async fn discover(&self, url: &str) -> Result<RemoteCapabilities, String> {
        info!(url = %url, "Discovering LDP delegate");
        let identity = self.client.fetch_identity_card(url).await?;
        if self.config.enforce_trust_domains
            && !self.config.trust_domain.trusts(&identity.trust_domain.name)
        {
            return Err(format!(
                "Trust domain '{}' is not trusted by '{}'",
                identity.trust_domain.name, self.config.trust_domain.name
            ));
        }
        let capabilities = self.identity_to_capabilities(&identity);
        debug!(
            name = %capabilities.name,
            skills = capabilities.skills.len(),
            "LDP delegate discovered"
        );
        Ok(capabilities)
    }

    /// Submits `task` to the delegate at `url` and returns a handle to it.
    ///
    /// A session is obtained (or established) first, a fresh UUID is used as
    /// the task id, and the envelope is signed when a signing secret is
    /// configured. The task's contract, if any, is recorded under the task id
    /// only after the submit message has been sent successfully.
    ///
    /// # Errors
    ///
    /// Returns an error when the session cannot be established or the submit
    /// message cannot be sent.
    #[instrument(skip(self, task), fields(url = %url, skill = %task.skill))]
    async fn invoke(&self, url: &str, task: TaskRequest) -> Result<TaskHandle, String> {
        info!(url = %url, skill = %task.skill, "Invoking LDP task");
        let session = self.session_manager.get_or_establish(url).await?;
        let task_id = uuid::Uuid::new_v4().to_string();
        let mut submit = LdpEnvelope::new(
            &session.session_id,
            &self.config.delegate_id,
            &session.remote_delegate_id,
            LdpMessageBody::TaskSubmit {
                task_id: task_id.clone(),
                skill: task.skill.clone(),
                input: task.input.clone(),
                contract: task.contract.clone(),
            },
            session.payload.mode,
        );
        if let Some(ref secret) = self.config.signing_secret {
            crate::signing::apply_signature(&mut submit, secret);
        }
        // The reply to the submit message is intentionally not inspected here.
        let _response = self.client.send_message(url, &submit).await?;
        if let Some(ref contract) = task.contract {
            // NOTE(review): no code visible in this file removes entries from
            // `contracts`, so the map grows with every contracted task.
            let mut contracts = self.contracts.write().await;
            contracts.insert(task_id.clone(), contract.clone());
        }
        self.session_manager.touch(url).await;
        debug!(task_id = %task_id, "LDP task submitted");
        Ok(TaskHandle {
            task_id,
            remote_url: url.to_string(),
        })
    }

    /// Submits `task` and returns a stream of events obtained by polling the
    /// delegate with a status query on a one-second interval.
    ///
    /// `TaskUpdate` replies become `Progress` events. A `TaskResult` reply ends
    /// the stream with `Completed`, or with `Failed` when the task's contract
    /// is violated and its failure policy is `FailClosed`. A `TaskFailed` reply
    /// or a transport error also ends the stream with `Failed`. Any other reply
    /// is ignored and polling continues.
    ///
    /// # Errors
    ///
    /// Returns an error when the initial `invoke` fails.
    #[instrument(skip(self, task), fields(url = %url, skill = %task.skill))]
    async fn stream(&self, url: &str, task: TaskRequest) -> Result<TaskStream, String> {
        let contract = task.contract.clone();
        let handle = self.invoke(url, task).await?;
        // The stream cannot borrow `self`, so it works on owned clones.
        let client = self.client.clone();
        let config = self.config.clone();
        let url = url.to_string();
        let task_id = handle.task_id.clone();
        let stream = async_stream::stream! {
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
            loop {
                interval.tick().await;
                let mut status_query = LdpEnvelope::new(
                    "",
                    &config.delegate_id,
                    &url,
                    LdpMessageBody::TaskUpdate {
                        task_id: task_id.clone(),
                        progress: None,
                        message: Some("status_query".into()),
                    },
                    crate::types::payload::PayloadMode::Text,
                );
                if let Some(ref secret) = config.signing_secret {
                    crate::signing::apply_signature(&mut status_query, secret);
                }
                match client.send_message(&url, &status_query).await {
                    Ok(response) => match response.body {
                        LdpMessageBody::TaskUpdate { progress, message, .. } => {
                            yield TaskEvent::Progress {
                                message: message.unwrap_or_default(),
                                progress,
                            };
                        }
                        LdpMessageBody::TaskResult { output, mut provenance, .. } => {
                            provenance.lineage.insert(0, build_lineage_entry(&provenance, "task"));
                            provenance.normalize();

                            // Validate before embedding and record the outcome on the
                            // provenance, so the emitted `ldp_provenance` carries the
                            // same contract fields that `status` attaches.
                            let mut violations = Vec::new();
                            let mut fail_closed = false;
                            if let Some(ref contract) = contract {
                                violations = validate_contract(contract, &provenance);
                                fail_closed =
                                    contract.policy.failure_policy == FailurePolicy::FailClosed;
                                provenance.contract_id = Some(contract.contract_id.clone());
                                provenance.contract_satisfied = Some(violations.is_empty());
                                provenance.contract_violations = violations.clone();
                            }

                            let output = if config.attach_provenance {
                                match output {
                                    Value::Object(mut map) => {
                                        map.insert("ldp_provenance".into(), provenance.to_value());
                                        Value::Object(map)
                                    }
                                    other => json!({
                                        "result": other,
                                        "ldp_provenance": provenance.to_value()
                                    }),
                                }
                            } else {
                                output
                            };

                            if fail_closed && !violations.is_empty() {
                                let summary = violations.join(", ");
                                yield TaskEvent::Failed {
                                    error: LdpError::policy(
                                        "CONTRACT_VIOLATED",
                                        format!("Contract violations: {}", summary),
                                    )
                                    .with_partial_output(output),
                                };
                            } else {
                                yield TaskEvent::Completed { output };
                            }
                            break;
                        }
                        LdpMessageBody::TaskFailed { error, .. } => {
                            yield TaskEvent::Failed { error };
                            break;
                        }
                        // Unrelated message bodies are skipped; keep polling.
                        _ => {}
                    },
                    Err(e) => {
                        yield TaskEvent::Failed { error: LdpError::transport("STREAM_ERROR", e) };
                        break;
                    }
                }
            }
        };
        Ok(Box::pin(stream))
    }

    /// Sends a single status query for `task_id` and maps the reply to a
    /// [`TaskStatus`].
    ///
    /// A `TaskUpdate` whose message is `"submitted"` maps to `Submitted`; any
    /// other `TaskUpdate`, and any unrecognised reply, maps to `Working`. A
    /// `TaskResult` is validated against the contract recorded for the task
    /// (when there is one) and otherwise completes with provenance embedded as
    /// configured. A `TaskFailed` reply maps to `Failed`.
    ///
    /// # Errors
    ///
    /// Returns an error when the status query cannot be sent.
    #[instrument(skip(self), fields(url = %url, task_id = %task_id))]
    async fn status(&self, url: &str, task_id: &str) -> Result<TaskStatus, String> {
        debug!(task_id = %task_id, "Polling LDP task status");
        let mut query = LdpEnvelope::new(
            "",
            &self.config.delegate_id,
            url,
            LdpMessageBody::TaskUpdate {
                task_id: task_id.to_string(),
                progress: None,
                message: Some("status_query".into()),
            },
            crate::types::payload::PayloadMode::Text,
        );
        if let Some(ref secret) = self.config.signing_secret {
            crate::signing::apply_signature(&mut query, secret);
        }
        let response = self.client.send_message(url, &query).await?;
        match response.body {
            LdpMessageBody::TaskUpdate { message, .. } => {
                let msg = message.unwrap_or_default();
                if msg == "submitted" {
                    Ok(TaskStatus::Submitted)
                } else {
                    Ok(TaskStatus::Working)
                }
            }
            LdpMessageBody::TaskResult {
                output,
                mut provenance,
                ..
            } => {
                provenance
                    .lineage
                    .insert(0, build_lineage_entry(&provenance, "task"));
                provenance.normalize();
                // The contract is looked up, not removed, so repeated polls of a
                // finished task are validated the same way every time.
                let contracts = self.contracts.read().await;
                if let Some(contract) = contracts.get(task_id) {
                    Ok(self.apply_contract_validation(contract, output, provenance))
                } else {
                    let output = self.embed_provenance(output, provenance);
                    Ok(TaskStatus::Completed { output })
                }
            }
            LdpMessageBody::TaskFailed { error, .. } => Ok(TaskStatus::Failed { error }),
            _ => Ok(TaskStatus::Working),
        }
    }

    /// Sends a cancel message for `task_id` to the delegate at `url`.
    ///
    /// The envelope is signed when a signing secret is configured. The
    /// delegate's reply is discarded.
    ///
    /// # Errors
    ///
    /// Returns an error when the cancel message cannot be sent.
    #[instrument(skip(self), fields(url = %url, task_id = %task_id))]
    async fn cancel(&self, url: &str, task_id: &str) -> Result<(), String> {
        info!(task_id = %task_id, "Cancelling LDP task");
        let mut cancel_msg = LdpEnvelope::new(
            "",
            &self.config.delegate_id,
            url,
            LdpMessageBody::TaskCancel {
                task_id: task_id.to_string(),
            },
            crate::types::payload::PayloadMode::Text,
        );
        if let Some(ref secret) = self.config.signing_secret {
            crate::signing::apply_signature(&mut cancel_msg, secret);
        }
        self.client.send_message(url, &cancel_msg).await?;
        Ok(())
    }
}