1pub mod anp;
9pub mod failure;
10pub mod registry;
11
12pub use failure::{DelegationFailure, DelegationFailureInfo, FailureSeverity};
13pub use registry::ProtocolRegistry;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::pin::Pin;
19use tokio_stream::Stream;
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct TaskRequest {
24 pub skill: String,
25 pub input: Value,
26 pub timeout_secs: Option<u64>,
27 pub stream: bool,
28 pub metadata: Value,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct TaskHandle {
34 pub task_id: String,
35 pub remote_url: String,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40#[serde(tag = "type", rename_all = "snake_case")]
41pub enum TaskEvent {
42 Progress {
43 message: String,
44 progress: Option<f32>,
45 },
46 Artifact {
47 name: String,
48 data: Value,
49 },
50 InputRequired {
51 prompt: String,
52 },
53 Completed {
54 output: Value,
55 },
56 Failed {
57 error: String,
58 },
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64pub enum TaskStatus {
65 Submitted,
66 Working,
67 InputRequired,
68 Completed { output: Value },
69 Failed { error: String },
70 Cancelled,
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct RemoteCapabilities {
76 pub name: String,
77 pub description: Option<String>,
78 pub skills: Vec<RemoteSkill>,
79 pub protocols: Vec<String>,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct RemoteSkill {
84 pub name: String,
85 pub description: Option<String>,
86 pub input_schema: Option<Value>,
87 pub output_schema: Option<Value>,
88}
89
90pub type TaskStream = Pin<Box<dyn Stream<Item = TaskEvent> + Send>>;
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
99#[serde(tag = "type", rename_all = "snake_case")]
100pub enum StreamChunk {
101 TextDelta { delta: String },
103 ToolCall { name: String, arguments: Value },
105 Progress {
107 message: String,
108 fraction: Option<f32>,
109 },
110 Artifact {
112 name: String,
113 data: Value,
114 mime_type: Option<String>,
115 },
116 Final { output: Value },
118 Error { message: String },
120}
121
122pub type StructuredStream = Pin<Box<dyn Stream<Item = StreamChunk> + Send>>;
123
124#[async_trait]
126pub trait ProtocolAdapter: Send + Sync {
127 async fn discover(&self, url: &str) -> Result<RemoteCapabilities, String>;
129
130 async fn invoke(&self, url: &str, task: TaskRequest) -> Result<TaskHandle, String>;
132
133 async fn stream(&self, url: &str, task: TaskRequest) -> Result<TaskStream, String>;
135
136 async fn stream_structured(
140 &self,
141 url: &str,
142 task: TaskRequest,
143 ) -> Result<StructuredStream, String> {
144 use tokio_stream::StreamExt;
145 let event_stream = self.stream(url, task).await?;
146 let chunk_stream = event_stream.map(|event| {
147 let chunk = match event {
148 TaskEvent::Progress { message, progress } => StreamChunk::Progress {
149 message,
150 fraction: progress,
151 },
152 TaskEvent::Artifact { name, data } => StreamChunk::Artifact {
153 name,
154 data,
155 mime_type: None,
156 },
157 TaskEvent::Completed { output } => StreamChunk::Final { output },
158 TaskEvent::Failed { error } => StreamChunk::Error { message: error },
159 TaskEvent::InputRequired { prompt } => StreamChunk::Progress {
160 message: format!("input required: {prompt}"),
161 fraction: None,
162 },
163 };
164 let chunk_type = match &chunk {
166 StreamChunk::TextDelta { .. } => "text_delta",
167 StreamChunk::ToolCall { .. } => "tool_call",
168 StreamChunk::Progress { .. } => "progress",
169 StreamChunk::Artifact { .. } => "artifact",
170 StreamChunk::Final { .. } => "final",
171 StreamChunk::Error { .. } => "error",
172 };
173 tracing::debug!(stream_chunk = chunk_type, "stream_chunk_emitted");
174 chunk
175 });
176 Ok(Box::pin(chunk_stream))
177 }
178
179 async fn stream_with_backpressure(
182 &self,
183 url: &str,
184 task: TaskRequest,
185 buffer_size: usize,
186 ) -> Result<StructuredStream, String> {
187 use tokio_stream::StreamExt;
188 let (tx, rx) = tokio::sync::mpsc::channel::<StreamChunk>(buffer_size);
189 let mut source = self.stream_structured(url, task).await?;
190 tokio::spawn(async move {
191 while let Some(chunk) = source.next().await {
192 if tx.send(chunk).await.is_err() {
193 break; }
195 }
196 });
197 Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)))
198 }
199
200 async fn status(&self, url: &str, task_id: &str) -> Result<TaskStatus, String>;
202
203 async fn cancel(&self, url: &str, task_id: &str) -> Result<(), String>;
205}