pub mod anp;
pub mod failure;
pub mod registry;
pub use failure::{DelegationFailure, DelegationFailureInfo, FailureSeverity};
pub use registry::ProtocolRegistry;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::pin::Pin;
use tokio_stream::Stream;
/// Request payload handed to a [`ProtocolAdapter`] when invoking or streaming a task.
///
/// Field meanings below are inferred from names and types; how each adapter
/// interprets them is not visible in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskRequest {
/// Skill to run (presumably matches a `RemoteSkill::name` — confirm against adapters).
pub skill: String,
/// Arbitrary JSON input for the skill.
pub input: Value,
/// Optional timeout; seconds per the field name. What `None` means is adapter-defined.
pub timeout_secs: Option<u64>,
/// Streaming flag. NOTE(review): how this interacts with `invoke` vs `stream` is not shown here.
pub stream: bool,
/// Arbitrary JSON metadata carried alongside the request.
pub metadata: Value,
}
/// Handle returned by [`ProtocolAdapter::invoke`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskHandle {
/// Identifier of the task (presumably the value later passed as `task_id` to
/// `status` / `cancel` — confirm against adapters).
pub task_id: String,
/// URL of the remote associated with the task.
pub remote_url: String,
}
/// Item type of a [`TaskStream`].
///
/// Serialized as an internally tagged enum: a `type` field holds the
/// snake_case variant name (for example `"input_required"`), and the variant's
/// fields sit next to it in the same object.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TaskEvent {
/// Progress report.
Progress {
/// Human-readable progress message.
message: String,
/// Optional progress value. The default `stream_structured` copies it into
/// `StreamChunk::Progress::fraction`, so presumably a 0.0–1.0 fraction — confirm.
progress: Option<f32>,
},
/// A named artifact with a JSON payload.
Artifact {
name: String,
data: Value,
},
/// The task is asking for input; `prompt` carries the request text.
InputRequired {
prompt: String,
},
/// The task finished; `output` carries its JSON result.
Completed {
output: Value,
},
/// The task failed; `error` carries the failure text.
Failed {
error: String,
},
}
/// Task state returned by [`ProtocolAdapter::status`].
///
/// Uses serde's default (externally tagged) enum representation with
/// snake_case variant names: unit variants serialize as a bare string such as
/// `"submitted"`, while `Completed` / `Failed` serialize as a single-key object
/// such as `{"completed": {"output": ...}}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TaskStatus {
Submitted,
Working,
InputRequired,
/// Finished, with the task's JSON output.
Completed { output: Value },
/// Failed, with the failure text.
Failed { error: String },
Cancelled,
}
/// Description of a remote returned by [`ProtocolAdapter::discover`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteCapabilities {
/// Name of the remote.
pub name: String,
/// Optional free-text description.
pub description: Option<String>,
/// Skills listed for the remote.
pub skills: Vec<RemoteSkill>,
/// Protocol names as strings. NOTE(review): the set of valid values is not defined in this file.
pub protocols: Vec<String>,
}
/// One skill entry inside [`RemoteCapabilities::skills`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteSkill {
/// Name of the skill.
pub name: String,
/// Optional free-text description.
pub description: Option<String>,
/// Optional JSON value describing the input (presumably a JSON Schema — confirm).
pub input_schema: Option<Value>,
/// Optional JSON value describing the output (presumably a JSON Schema — confirm).
pub output_schema: Option<Value>,
}
/// Pinned, boxed, `Send` stream of [`TaskEvent`]s; the return type of [`ProtocolAdapter::stream`].
pub type TaskStream = Pin<Box<dyn Stream<Item = TaskEvent> + Send>>;
/// Item type of a [`StructuredStream`].
///
/// Serialized as an internally tagged enum: a `type` field holds the
/// snake_case variant name (for example `"text_delta"`).
///
/// The default `ProtocolAdapter::stream_structured` mapping only ever produces
/// `Progress`, `Artifact`, `Final` and `Error`; `TextDelta` and `ToolCall` can
/// only come from an adapter that overrides it.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StreamChunk {
/// A piece of text output.
TextDelta { delta: String },
/// A tool call, with the tool name and its JSON arguments.
ToolCall { name: String, arguments: Value },
/// Progress report; `fraction` is optional (presumably 0.0–1.0 — confirm).
Progress {
message: String,
fraction: Option<f32>,
},
/// A named artifact with a JSON payload and an optional MIME type.
Artifact {
name: String,
data: Value,
mime_type: Option<String>,
},
/// Final output of the task.
Final { output: Value },
/// Failure, with the error text.
Error { message: String },
}
/// Pinned, boxed, `Send` stream of [`StreamChunk`]s; returned by the structured streaming methods of [`ProtocolAdapter`].
pub type StructuredStream = Pin<Box<dyn Stream<Item = StreamChunk> + Send>>;
#[async_trait]
pub trait ProtocolAdapter: Send + Sync {
async fn discover(&self, url: &str) -> Result<RemoteCapabilities, String>;
async fn invoke(&self, url: &str, task: TaskRequest) -> Result<TaskHandle, String>;
async fn stream(&self, url: &str, task: TaskRequest) -> Result<TaskStream, String>;
async fn stream_structured(
&self,
url: &str,
task: TaskRequest,
) -> Result<StructuredStream, String> {
use tokio_stream::StreamExt;
let event_stream = self.stream(url, task).await?;
let chunk_stream = event_stream.map(|event| {
let chunk = match event {
TaskEvent::Progress { message, progress } => StreamChunk::Progress {
message,
fraction: progress,
},
TaskEvent::Artifact { name, data } => StreamChunk::Artifact {
name,
data,
mime_type: None,
},
TaskEvent::Completed { output } => StreamChunk::Final { output },
TaskEvent::Failed { error } => StreamChunk::Error { message: error },
TaskEvent::InputRequired { prompt } => StreamChunk::Progress {
message: format!("input required: {prompt}"),
fraction: None,
},
};
let chunk_type = match &chunk {
StreamChunk::TextDelta { .. } => "text_delta",
StreamChunk::ToolCall { .. } => "tool_call",
StreamChunk::Progress { .. } => "progress",
StreamChunk::Artifact { .. } => "artifact",
StreamChunk::Final { .. } => "final",
StreamChunk::Error { .. } => "error",
};
tracing::debug!(stream_chunk = chunk_type, "stream_chunk_emitted");
chunk
});
Ok(Box::pin(chunk_stream))
}
async fn stream_with_backpressure(
&self,
url: &str,
task: TaskRequest,
buffer_size: usize,
) -> Result<StructuredStream, String> {
use tokio_stream::StreamExt;
let (tx, rx) = tokio::sync::mpsc::channel::<StreamChunk>(buffer_size);
let mut source = self.stream_structured(url, task).await?;
tokio::spawn(async move {
while let Some(chunk) = source.next().await {
if tx.send(chunk).await.is_err() {
break; }
}
});
Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)))
}
async fn status(&self, url: &str, task_id: &str) -> Result<TaskStatus, String>;
async fn cancel(&self, url: &str, task_id: &str) -> Result<(), String>;
}