Skip to main content

jamjet_protocols/
lib.rs

1//! JamJet Protocol Adapter Framework
2//!
3//! The `ProtocolAdapter` trait defines the common interface all protocol
4//! adapters must implement. Built-in adapters: MCP, A2A, ANP.
5//! New protocols can be added by implementing this trait without modifying
6//! the runtime core.
7
8pub mod anp;
9pub mod failure;
10pub mod registry;
11
12pub use failure::{DelegationFailure, DelegationFailureInfo, FailureSeverity};
13pub use registry::ProtocolRegistry;
14
15use async_trait::async_trait;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::pin::Pin;
19use tokio_stream::Stream;
20
21/// A task request to a remote agent or tool provider.
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct TaskRequest {
24    pub skill: String,
25    pub input: Value,
26    pub timeout_secs: Option<u64>,
27    pub stream: bool,
28    pub metadata: Value,
29}
30
31/// A handle to a submitted task.
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct TaskHandle {
34    pub task_id: String,
35    pub remote_url: String,
36}
37
38/// An event streamed from a running task.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40#[serde(tag = "type", rename_all = "snake_case")]
41pub enum TaskEvent {
42    Progress {
43        message: String,
44        progress: Option<f32>,
45    },
46    Artifact {
47        name: String,
48        data: Value,
49    },
50    InputRequired {
51        prompt: String,
52    },
53    Completed {
54        output: Value,
55    },
56    Failed {
57        error: String,
58    },
59}
60
61/// Current status of a remote task.
62#[derive(Debug, Clone, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64pub enum TaskStatus {
65    Submitted,
66    Working,
67    InputRequired,
68    Completed { output: Value },
69    Failed { error: String },
70    Cancelled,
71}
72
73/// Capabilities discovered from a remote agent or tool provider.
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct RemoteCapabilities {
76    pub name: String,
77    pub description: Option<String>,
78    pub skills: Vec<RemoteSkill>,
79    pub protocols: Vec<String>,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct RemoteSkill {
84    pub name: String,
85    pub description: Option<String>,
86    pub input_schema: Option<Value>,
87    pub output_schema: Option<Value>,
88}
89
90pub type TaskStream = Pin<Box<dyn Stream<Item = TaskEvent> + Send>>;
91
92// ── Structured streaming (I2.5) ───────────────────────────────────────────────
93
94/// A typed stream chunk for structured streaming across all protocol adapters.
95///
96/// Follows the OTel GenAI structured streaming convention:
97/// text_delta → tool_call → progress → artifact → final
98#[derive(Debug, Clone, Serialize, Deserialize)]
99#[serde(tag = "type", rename_all = "snake_case")]
100pub enum StreamChunk {
101    /// Incremental text delta (LLM streaming tokens).
102    TextDelta { delta: String },
103    /// A tool call invocation during streaming.
104    ToolCall { name: String, arguments: Value },
105    /// Progress indicator (0.0–1.0).
106    Progress {
107        message: String,
108        fraction: Option<f32>,
109    },
110    /// A completed artifact (file, data, structured output).
111    Artifact {
112        name: String,
113        data: Value,
114        mime_type: Option<String>,
115    },
116    /// Final completed output. Signals end of stream.
117    Final { output: Value },
118    /// Error that terminates the stream.
119    Error { message: String },
120}
121
122pub type StructuredStream = Pin<Box<dyn Stream<Item = StreamChunk> + Send>>;
123
124/// The protocol adapter trait. Implement this to add a new agent communication protocol.
125#[async_trait]
126pub trait ProtocolAdapter: Send + Sync {
127    /// Discover remote capabilities (fetch Agent Card or equivalent).
128    async fn discover(&self, url: &str) -> Result<RemoteCapabilities, String>;
129
130    /// Submit a task/request to the remote.
131    async fn invoke(&self, url: &str, task: TaskRequest) -> Result<TaskHandle, String>;
132
133    /// Stream task progress events (for streaming tasks).
134    async fn stream(&self, url: &str, task: TaskRequest) -> Result<TaskStream, String>;
135
136    /// Stream task with structured typed chunks (I2.6).
137    /// Default: wraps `stream()` and maps `TaskEvent` to `StreamChunk`.
138    /// Each chunk is also emitted as a `tracing` span event (I2.8).
139    async fn stream_structured(
140        &self,
141        url: &str,
142        task: TaskRequest,
143    ) -> Result<StructuredStream, String> {
144        use tokio_stream::StreamExt;
145        let event_stream = self.stream(url, task).await?;
146        let chunk_stream = event_stream.map(|event| {
147            let chunk = match event {
148                TaskEvent::Progress { message, progress } => StreamChunk::Progress {
149                    message,
150                    fraction: progress,
151                },
152                TaskEvent::Artifact { name, data } => StreamChunk::Artifact {
153                    name,
154                    data,
155                    mime_type: None,
156                },
157                TaskEvent::Completed { output } => StreamChunk::Final { output },
158                TaskEvent::Failed { error } => StreamChunk::Error { message: error },
159                TaskEvent::InputRequired { prompt } => StreamChunk::Progress {
160                    message: format!("input required: {prompt}"),
161                    fraction: None,
162                },
163            };
164            // I2.8: emit span event for each structured stream chunk.
165            let chunk_type = match &chunk {
166                StreamChunk::TextDelta { .. } => "text_delta",
167                StreamChunk::ToolCall { .. } => "tool_call",
168                StreamChunk::Progress { .. } => "progress",
169                StreamChunk::Artifact { .. } => "artifact",
170                StreamChunk::Final { .. } => "final",
171                StreamChunk::Error { .. } => "error",
172            };
173            tracing::debug!(stream_chunk = chunk_type, "stream_chunk_emitted");
174            chunk
175        });
176        Ok(Box::pin(chunk_stream))
177    }
178
179    /// Stream with backpressure: bounded channel buffers at most `buffer_size` chunks
180    /// before applying flow control to the producer (I2.9).
181    async fn stream_with_backpressure(
182        &self,
183        url: &str,
184        task: TaskRequest,
185        buffer_size: usize,
186    ) -> Result<StructuredStream, String> {
187        use tokio_stream::StreamExt;
188        let (tx, rx) = tokio::sync::mpsc::channel::<StreamChunk>(buffer_size);
189        let mut source = self.stream_structured(url, task).await?;
190        tokio::spawn(async move {
191            while let Some(chunk) = source.next().await {
192                if tx.send(chunk).await.is_err() {
193                    break; // receiver dropped
194                }
195            }
196        });
197        Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)))
198    }
199
200    /// Poll task status by task_id.
201    async fn status(&self, url: &str, task_id: &str) -> Result<TaskStatus, String>;
202
203    /// Cancel a running task.
204    async fn cancel(&self, url: &str, task_id: &str) -> Result<(), String>;
205}