use crate::client_stream::StreamItem;
use crate::config::{BuildHttpClient, DistriConfig};
use crate::{AgentStreamClient, ClientError, StreamError};
use distri_a2a::{
EventKind, JsonRpcRequest, Message as A2aMessage, MessageKind, MessageSendConfiguration,
MessageSendParams, Role,
};
use distri_types::{
ExternalTool, LLmContext, LlmDefinition, Message, MessageRole, TokenResponse, ToolCall,
a2a_converters::MessageMetadata, prompt::PromptSection,
};
use distri_types::{StandardDefinition, ToolResponse, configuration::AgentConfigWithTools};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
#[derive(Debug, Clone, Deserialize)]
pub struct AgentRegistrationResponse {
pub name: String,
#[serde(default)]
pub version: Option<String>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct LoginUrlResponse {
pub login_url: String,
}
#[derive(Debug, Clone, Deserialize)]
pub struct ApiKeyResponse {
pub id: uuid::Uuid,
pub name: String,
pub key: Option<String>,
pub created_at: chrono::DateTime<chrono::Utc>,
pub expires_at: Option<chrono::DateTime<chrono::Utc>>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct WorkspaceResponse {
pub id: uuid::Uuid,
pub name: String,
pub slug: String,
pub description: Option<String>,
pub is_personal: bool,
pub role: String,
}
#[derive(Clone)]
pub struct Distri {
pub(crate) base_url: String,
pub(crate) http: reqwest::Client,
stream: AgentStreamClient,
config: DistriConfig,
}
impl Distri {
pub fn new() -> Self {
let config = DistriConfig::default();
Self::from_config(config)
}
pub fn from_env() -> Self {
let config = DistriConfig::from_env();
Self::from_config(config)
}
pub fn from_config(config: DistriConfig) -> Self {
let base = config.base_url.clone();
let http = <DistriConfig as BuildHttpClient>::build_http_client(&config)
.expect("Failed to build HTTP client");
let mut stream = AgentStreamClient::from_config(config.clone());
stream = stream.with_http_client(http.clone());
Self {
base_url: base,
http,
stream,
config,
}
}
pub fn with_api_key(mut self, api_key: impl Into<String>) -> Self {
self.config = self.config.with_api_key(api_key);
self.http = <DistriConfig as BuildHttpClient>::build_http_client(&self.config)
.expect("Failed to build HTTP client");
self.stream = self.stream.clone().with_http_client(self.http.clone());
self
}
pub fn with_workspace_id(mut self, workspace_id: impl Into<String>) -> Self {
self.config = self.config.with_workspace_id(workspace_id);
self.http = <DistriConfig as BuildHttpClient>::build_http_client(&self.config)
.expect("Failed to build HTTP client");
self.stream = self.stream.clone().with_http_client(self.http.clone());
self
}
pub fn workspace_id(&self) -> Option<&str> {
self.config.workspace_id.as_deref()
}
pub fn with_http_client(mut self, client: reqwest::Client) -> Self {
self.http = client.clone();
self.stream = self.stream.clone().with_http_client(client);
self
}
pub fn base_url(&self) -> &str {
&self.base_url
}
pub fn http_client(&self) -> &reqwest::Client {
&self.http
}
pub fn config(&self) -> &DistriConfig {
&self.config
}
pub fn has_auth(&self) -> bool {
self.config.has_auth()
}
pub fn is_local(&self) -> bool {
self.config.is_local()
}
pub async fn register_agent(&self, definition: &StandardDefinition) -> Result<(), ClientError> {
let create_url = format!("{}/agents", self.base_url);
let resp = self.http.post(&create_url).json(definition).send().await?;
if resp.status().is_success() {
return Ok(());
}
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"agent registration failed (status {status}): {body}"
)))
}
pub async fn fetch_agent(
&self,
agent_id: &str,
) -> Result<Option<AgentConfigWithTools>, ClientError> {
let url = format!("{}/agents/{}", self.base_url, agent_id);
let resp = self.http.get(&url).send().await?;
if resp.status() == reqwest::StatusCode::NOT_FOUND {
return Ok(None);
}
if !resp.status().is_success() {
return Err(ClientError::InvalidResponse(format!(
"failed to fetch agent {}: {}",
agent_id,
resp.status()
)));
}
Ok(Some(resp.json::<AgentConfigWithTools>().await?))
}
pub async fn register_agent_markdown(
&self,
markdown: &str,
) -> Result<AgentRegistrationResponse, ClientError> {
let create_url = format!("{}/agents", self.base_url);
if let Some(workspace_id) = self.workspace_id() {
tracing::info!("Pushing agent to: {create_url} (workspace: {workspace_id})");
} else {
tracing::info!("Pushing agent to: {create_url}");
}
let resp = self
.http
.post(&create_url)
.header(reqwest::header::CONTENT_TYPE, "text/markdown")
.body(markdown.to_string())
.send()
.await?;
if resp.status().is_success() {
let response: AgentRegistrationResponse = resp.json().await.map_err(|e| {
ClientError::InvalidResponse(format!("Failed to read response: {}", e))
})?;
return Ok(response);
}
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"Agent registration failed (status {status}): {body}"
)))
}
pub async fn complete_tool(
&self,
agent: impl AsRef<str>,
tool_response: &ToolResponse,
) -> Result<(), ClientError> {
let url = format!("{}/agents/{}/complete-tool", self.base_url, agent.as_ref());
let payload = serde_json::json!({
"tool_call_id": tool_response.tool_call_id,
"tool_response": tool_response,
});
let resp = self
.http
.post(&url)
.header(reqwest::header::CONTENT_TYPE, "application/json")
.json(&payload)
.send()
.await?;
if resp.status().is_success() {
return Ok(());
}
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"complete-tool failed (status {status}): {body}"
)))
}
pub async fn set_session_value(
&self,
session_id: &str,
key: &str,
value: serde_json::Value,
expiry_iso: Option<&str>,
) -> Result<(), ClientError> {
#[derive(serde::Serialize)]
struct SetRequest<'a> {
key: &'a str,
value: serde_json::Value,
#[serde(skip_serializing_if = "Option::is_none")]
expiry: Option<&'a str>,
}
let url = format!("{}/sessions/{}/values", self.base_url, session_id);
let body = SetRequest {
key,
value,
expiry: expiry_iso,
};
let resp = self.http.post(url).json(&body).send().await?;
if resp.status().is_success() || resp.status() == reqwest::StatusCode::NO_CONTENT {
Ok(())
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to set session value: {}",
text
)))
}
}
pub async fn get_session_value(
&self,
session_id: &str,
key: &str,
) -> Result<Option<serde_json::Value>, ClientError> {
#[derive(serde::Deserialize)]
struct GetResponse {
value: Option<serde_json::Value>,
}
let url = format!("{}/sessions/{}/values/{}", self.base_url, session_id, key);
let resp = self.http.get(url).send().await?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(ClientError::InvalidResponse(format!(
"failed to get session value: {}",
text
)));
}
let data: GetResponse = serde_json::from_str(&resp.text().await?)?;
Ok(data.value)
}
pub async fn get_session_values(
&self,
session_id: &str,
) -> Result<std::collections::HashMap<String, serde_json::Value>, ClientError> {
#[derive(serde::Deserialize)]
struct GetAllResponse {
values: std::collections::HashMap<String, serde_json::Value>,
}
let url = format!("{}/sessions/{}/values", self.base_url, session_id);
let resp = self.http.get(url).send().await?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(ClientError::InvalidResponse(format!(
"failed to get session values: {}",
text
)));
}
let data: GetAllResponse = serde_json::from_str(&resp.text().await?)?;
Ok(data.values)
}
pub async fn delete_session_value(
&self,
session_id: &str,
key: &str,
) -> Result<(), ClientError> {
let url = format!("{}/sessions/{}/values/{}", self.base_url, session_id, key);
let resp = self.http.delete(url).send().await?;
if resp.status().is_success() || resp.status() == reqwest::StatusCode::NO_CONTENT {
Ok(())
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to delete session value: {}",
text
)))
}
}
pub async fn clear_session(&self, session_id: &str) -> Result<(), ClientError> {
let url = format!("{}/sessions/{}", self.base_url, session_id);
let resp = self.http.delete(url).send().await?;
if resp.status().is_success() || resp.status() == reqwest::StatusCode::NO_CONTENT {
Ok(())
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to clear session: {}",
text
)))
}
}
pub const USER_PART_PREFIX: &'static str = "__user_part_";
pub async fn set_user_part(
&self,
session_id: &str,
name: &str,
part: distri_types::Part,
) -> Result<(), ClientError> {
let key = format!("{}{}", Self::USER_PART_PREFIX, name);
let value = serde_json::to_value(&part)?;
self.set_session_value(session_id, &key, value, None).await
}
pub async fn set_user_part_text(
&self,
session_id: &str,
name: &str,
text: &str,
) -> Result<(), ClientError> {
self.set_user_part(session_id, name, distri_types::Part::Text(text.to_string()))
.await
}
pub async fn set_user_part_image(
&self,
session_id: &str,
name: &str,
image: distri_types::FileType,
) -> Result<(), ClientError> {
use flate2::Compression;
use flate2::write::GzEncoder;
use std::io::Write;
let key = format!("{}{}", Self::USER_PART_PREFIX, name);
let part = distri_types::Part::Image(image);
let value = serde_json::to_value(&part)?;
let json_bytes = serde_json::to_vec(&serde_json::json!({
"key": key,
"value": value
}))?;
let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
encoder
.write_all(&json_bytes)
.map_err(|e| ClientError::InvalidResponse(format!("gzip compression failed: {}", e)))?;
let compressed = encoder
.finish()
.map_err(|e| ClientError::InvalidResponse(format!("gzip finish failed: {}", e)))?;
tracing::debug!(
"Compressed image part: {} -> {} bytes ({:.1}% reduction)",
json_bytes.len(),
compressed.len(),
(1.0 - compressed.len() as f64 / json_bytes.len() as f64) * 100.0
);
let url = format!("{}/sessions/{}/values", self.base_url, session_id);
let resp = self
.http
.post(&url)
.header("Content-Type", "application/json")
.header("Content-Encoding", "gzip")
.body(compressed)
.send()
.await?;
if resp.status().is_success() || resp.status() == reqwest::StatusCode::NO_CONTENT {
Ok(())
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to set user part image: {}",
text
)))
}
}
pub async fn delete_user_part(&self, session_id: &str, name: &str) -> Result<(), ClientError> {
let key = format!("{}{}", Self::USER_PART_PREFIX, name);
self.delete_session_value(session_id, &key).await
}
pub async fn clear_user_parts(&self, session_id: &str) -> Result<(), ClientError> {
let all_values = self.get_session_values(session_id).await?;
for key in all_values.keys() {
if key.starts_with(Self::USER_PART_PREFIX) {
self.delete_session_value(session_id, key).await?;
}
}
Ok(())
}
pub async fn issue_token(&self) -> Result<TokenResponse, ClientError> {
let url = format!("{}/token", self.base_url);
let resp = self
.http
.post(url)
.header(reqwest::header::CONTENT_LENGTH, "0")
.send()
.await?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(ClientError::InvalidResponse(format!(
"failed to issue token: {}",
text
)));
}
let response: TokenResponse = serde_json::from_str(&resp.text().await?)?;
Ok(response)
}
pub async fn get_login_url(&self) -> Result<LoginUrlResponse, ClientError> {
let url = format!("{}/auth/login-url", self.base_url);
let resp = match self.http.get(&url).send().await {
Ok(r) => r,
Err(e) => {
return Err(ClientError::InvalidResponse(format!(
"Failed to connect to server at {}: {}",
url, e
)));
}
};
let status = resp.status();
if !status.is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(ClientError::InvalidResponse(format!(
"Server returned {} for {}: {}",
status, url, text
)));
}
resp.json().await.map_err(ClientError::from)
}
pub async fn create_api_key_with_token(
&self,
jwt_token: &str,
name: Option<String>,
ttl_hours: Option<i64>,
) -> Result<ApiKeyResponse, ClientError> {
#[derive(Serialize)]
struct CreateApiKeyRequest {
#[serde(skip_serializing_if = "Option::is_none")]
name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
ttl_hours: Option<i64>,
}
let url = format!("{}/api-keys", self.base_url);
let payload = CreateApiKeyRequest { name, ttl_hours };
let resp = self
.http
.post(&url)
.header("Authorization", format!("Bearer {}", jwt_token))
.json(&payload)
.send()
.await?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(ClientError::InvalidResponse(format!(
"Failed to create API key: {}",
text
)));
}
resp.json().await.map_err(ClientError::from)
}
pub async fn list_workspaces_with_token(
&self,
jwt_token: &str,
) -> Result<Vec<WorkspaceResponse>, ClientError> {
let url = format!("{}/workspaces", self.base_url);
let resp = self
.http
.get(&url)
.header("Authorization", format!("Bearer {}", jwt_token))
.send()
.await?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(ClientError::InvalidResponse(format!(
"Failed to list workspaces: {}",
text
)));
}
resp.json().await.map_err(ClientError::from)
}
pub async fn get_workspace(
&self,
workspace_id: &str,
) -> Result<WorkspaceResponse, ClientError> {
let url = format!("{}/workspaces/{}", self.base_url, workspace_id);
let resp = self.http.get(&url).send().await?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(ClientError::InvalidResponse(format!(
"Failed to get workspace: {}",
text
)));
}
resp.json().await.map_err(ClientError::from)
}
pub fn with_stream_client(mut self, stream: AgentStreamClient) -> Self {
self.stream = stream;
self
}
pub async fn invoke(
&self,
agent_id: &str,
messages: &[Message],
) -> Result<Vec<Message>, ClientError> {
let params = build_params(messages, true, None)?;
let rpc = JsonRpcRequest {
jsonrpc: "2.0".to_string(),
id: Some(serde_json::json!(uuid::Uuid::new_v4().to_string())),
method: "message/send".to_string(),
params: serde_json::to_value(params)?,
};
let url = format!("{}/agents/{}", self.base_url, agent_id);
let resp = self
.http
.post(url)
.json(&rpc)
.send()
.await?
.error_for_status()?;
let body: serde_json::Value = resp.json().await?;
if let Some(err) = body
.get("error")
.and_then(|e| e.get("message"))
.and_then(|m| m.as_str())
{
return Err(ClientError::InvalidResponse(err.to_string()));
}
let Some(result) = body.get("result").cloned() else {
return Ok(Vec::new());
};
let kinds: Vec<MessageKind> =
if let Ok(single) = serde_json::from_value::<MessageKind>(result.clone()) {
vec![single]
} else if let Ok(list) = serde_json::from_value::<Vec<MessageKind>>(result) {
list
} else {
return Err(ClientError::InvalidResponse(
"Unexpected response format from message/send".into(),
));
};
kinds
.into_iter()
.filter_map(|k| match convert_kind(k) {
Ok(Some(msg)) => Some(Ok(msg)),
Ok(None) => None,
Err(e) => Some(Err(e)),
})
.collect()
}
pub async fn invoke_stream<H, Fut>(
&self,
agent_id: &str,
messages: &[Message],
on_event: H,
) -> Result<(), StreamError>
where
H: FnMut(StreamItem) -> Fut,
Fut: std::future::Future<Output = ()> + Send,
{
let params = build_params(messages, false, None)
.map_err(|e| StreamError::InvalidResponse(e.to_string()))?;
self.stream
.stream_agent(agent_id, params, on_event)
.await
}
pub async fn invoke_with_options(
&self,
agent_id: &str,
messages: &[Message],
options: InvokeOptions,
) -> Result<Vec<Message>, ClientError> {
let params = build_params(messages, true, Some(&options))?;
let rpc = JsonRpcRequest {
jsonrpc: "2.0".to_string(),
id: Some(serde_json::json!(uuid::Uuid::new_v4().to_string())),
method: "message/send".to_string(),
params: serde_json::to_value(params)?,
};
let url = format!("{}/agents/{}", self.base_url, agent_id);
let resp = self
.http
.post(url)
.json(&rpc)
.send()
.await?
.error_for_status()?;
let body: serde_json::Value = resp.json().await?;
if let Some(err) = body
.get("error")
.and_then(|e| e.get("message"))
.and_then(|m| m.as_str())
{
return Err(ClientError::InvalidResponse(err.to_string()));
}
let Some(result) = body.get("result").cloned() else {
return Ok(Vec::new());
};
let kinds: Vec<MessageKind> =
if let Ok(single) = serde_json::from_value::<MessageKind>(result.clone()) {
vec![single]
} else if let Ok(list) = serde_json::from_value::<Vec<MessageKind>>(result) {
list
} else {
return Err(ClientError::InvalidResponse(
"Unexpected response format from message/send".into(),
));
};
kinds
.into_iter()
.filter_map(|k| match convert_kind(k) {
Ok(Some(msg)) => Some(Ok(msg)),
Ok(None) => None,
Err(e) => Some(Err(e)),
})
.collect()
}
pub async fn invoke_stream_with_options<H, Fut>(
&self,
agent_id: &str,
messages: &[Message],
options: InvokeOptions,
on_event: H,
) -> Result<(), StreamError>
where
H: FnMut(StreamItem) -> Fut,
Fut: std::future::Future<Output = ()> + Send,
{
let params = build_params(messages, false, Some(&options))
.map_err(|e| StreamError::InvalidResponse(e.to_string()))?;
self.stream
.stream_agent(agent_id, params, on_event)
.await
}
pub async fn call_tool(
&self,
tool_call: &ToolCall,
session_id: Option<String>,
metadata: Option<Value>,
) -> Result<Value, ClientError> {
let payload = ToolCallRequest {
tool_name: tool_call.tool_name.clone(),
input: tool_call.input.clone(),
session_id,
metadata,
};
let url = format!("{}/tools/call", self.base_url);
let resp = self.http.post(url).json(&payload).send().await?;
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
if !status.is_success() {
return Err(ClientError::InvalidResponse(format!(
"toolcall failed {}: {}",
status, body
)));
}
serde_json::from_str(&body).map_err(ClientError::Serialization)
}
pub async fn llm_execute(
&self,
options: LlmExecuteOptions,
) -> Result<LlmExecuteResponse, ClientError> {
let payload = LlmExecuteRequest {
messages: options.context.messages,
tools: options.tools,
thread_id: options.context.thread_id,
parent_task_id: options.context.task_id,
run_id: options.context.run_id,
model_settings: options.llm_def.and_then(|d| d.model_settings.clone()),
is_sub_task: options.is_sub_task,
headers: options.headers,
agent_id: options.agent_id,
load_history: options.load_history,
};
let url = format!("{}/llm/execute", self.base_url);
let resp = self.http.post(url).json(&payload).send().await?;
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
if !status.is_success() {
return Err(ClientError::InvalidResponse(format!(
"llm_execute failed {}: {}",
status, body
)));
}
serde_json::from_str(&body).map_err(ClientError::Serialization)
}
pub async fn list_artifact_namespaces(&self) -> Result<ArtifactNamespaceList, ClientError> {
let url = format!("{}/artifacts", self.base_url);
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
resp.json().await.map_err(ClientError::from)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to list artifact namespaces: {}",
text
)))
}
}
pub async fn get_task_artifact_id(
&self,
thread_id: &str,
task_id: &str,
) -> Result<TaskNamespaceResponse, ClientError> {
let url = format!("{}/artifacts/task/{}/{}", self.base_url, thread_id, task_id);
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
resp.json().await.map_err(ClientError::from)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to get task namespace: {}",
text
)))
}
}
pub async fn list_artifacts(
&self,
artifact_id: &str,
) -> Result<ArtifactListResponse, ClientError> {
let url = format!("{}/artifacts/{}", self.base_url, artifact_id);
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
resp.json().await.map_err(ClientError::from)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to list artifacts: {}",
text
)))
}
}
pub async fn list_task_artifacts(
&self,
thread_id: &str,
task_id: &str,
) -> Result<ArtifactListResponse, ClientError> {
let namespace = self.get_task_artifact_id(thread_id, task_id).await?;
self.list_artifacts(&namespace.artifact_id).await
}
pub async fn read_artifact(
&self,
artifact_id: &str,
filename: &str,
start_line: Option<u64>,
end_line: Option<u64>,
) -> Result<ArtifactReadResponse, ClientError> {
let mut url = format!(
"{}/artifacts/{}/content/{}",
self.base_url, artifact_id, filename
);
let mut query_parts = Vec::new();
if let Some(start) = start_line {
query_parts.push(format!("start_line={}", start));
}
if let Some(end) = end_line {
query_parts.push(format!("end_line={}", end));
}
if !query_parts.is_empty() {
url.push('?');
url.push_str(&query_parts.join("&"));
}
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
resp.json().await.map_err(ClientError::from)
} else if resp.status() == reqwest::StatusCode::NOT_FOUND {
Err(ClientError::InvalidResponse(format!(
"artifact not found: {}",
filename
)))
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to read artifact: {}",
text
)))
}
}
pub async fn save_artifact_with_namespace(
&self,
namespace: &distri_types::ArtifactNamespace,
filename: &str,
content: &str,
) -> Result<ArtifactSaveResponse, ClientError> {
let artifact_id = namespace.thread_path();
self.save_artifact(&artifact_id, filename, content).await
}
pub async fn save_artifact(
&self,
artifact_id: &str,
filename: &str,
content: &str,
) -> Result<ArtifactSaveResponse, ClientError> {
let url = format!(
"{}/artifacts/{}/content/{}",
self.base_url, artifact_id, filename
);
let resp = self
.http
.put(&url)
.json(&serde_json::json!({ "content": content }))
.send()
.await?;
if resp.status().is_success() {
resp.json().await.map_err(ClientError::from)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to save artifact: {}",
text
)))
}
}
pub async fn delete_artifact(
&self,
artifact_id: &str,
filename: &str,
) -> Result<(), ClientError> {
let url = format!(
"{}/artifacts/{}/content/{}",
self.base_url, artifact_id, filename
);
let resp = self.http.delete(&url).send().await?;
if resp.status().is_success() || resp.status() == reqwest::StatusCode::NO_CONTENT {
Ok(())
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to delete artifact: {}",
text
)))
}
}
pub async fn search_artifacts(
&self,
artifact_id: &str,
pattern: &str,
) -> Result<Value, ClientError> {
let url = format!("{}/artifacts/{}/search", self.base_url, artifact_id);
let resp = self
.http
.post(&url)
.json(&serde_json::json!({ "pattern": pattern }))
.send()
.await?;
if resp.status().is_success() {
resp.json().await.map_err(ClientError::from)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to search artifacts: {}",
text
)))
}
}
pub async fn list_prompt_templates(&self) -> Result<Vec<PromptTemplateResponse>, ClientError> {
let url = format!("{}/prompts", self.base_url);
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
resp.json().await.map_err(ClientError::from)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to list prompt templates: {}",
text
)))
}
}
pub async fn upsert_prompt_template(
&self,
template: &NewPromptTemplateRequest,
) -> Result<PromptTemplateResponse, ClientError> {
let url = format!("{}/prompts", self.base_url);
let resp = self.http.post(&url).json(template).send().await?;
if resp.status().is_success() {
resp.json().await.map_err(ClientError::from)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to upsert prompt template: {}",
text
)))
}
}
pub async fn sync_prompt_templates(
&self,
templates: &[NewPromptTemplateRequest],
) -> Result<SyncPromptTemplatesResponse, ClientError> {
let url = format!("{}/prompts/sync", self.base_url);
let resp = self
.http
.post(&url)
.json(&serde_json::json!({ "templates": templates }))
.send()
.await?;
if resp.status().is_success() {
resp.json().await.map_err(ClientError::from)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to sync prompt templates: {}",
text
)))
}
}
pub async fn delete_prompt_template(&self, template_id: &str) -> Result<(), ClientError> {
let url = format!("{}/prompts/{}", self.base_url, template_id);
let resp = self.http.delete(&url).send().await?;
if resp.status().is_success() || resp.status() == reqwest::StatusCode::NO_CONTENT {
Ok(())
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to delete prompt template: {}",
text
)))
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewPromptTemplateRequest {
pub name: String,
pub template: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub version: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromptTemplateResponse {
pub id: String,
pub name: String,
pub template: String,
pub description: Option<String>,
pub version: Option<String>,
pub is_system: bool,
pub created_at: String,
pub updated_at: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncPromptTemplatesResponse {
pub created: usize,
pub updated: usize,
pub templates: Vec<PromptTemplateResponse>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArtifactNamespaceList {
pub namespaces: Vec<ArtifactNamespace>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArtifactNamespace {
pub artifact_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub artifact_count: Option<usize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskNamespaceResponse {
pub artifact_id: String,
pub thread_id: String,
pub task_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArtifactListResponse {
pub artifact_id: String,
pub artifacts: Vec<ArtifactEntry>,
pub content_path: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArtifactEntry {
pub filename: String,
pub is_file: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub size: Option<u64>,
pub read_path: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArtifactReadResponse {
pub content: String,
pub start_line: u64,
pub end_line: u64,
pub total_lines: u64,
pub filename: String,
pub artifact_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArtifactSaveResponse {
pub success: bool,
pub filename: String,
pub artifact_id: String,
pub size: usize,
}
#[derive(Debug, Clone, Default, Serialize)]
pub struct InvokeOptions {
#[serde(skip_serializing_if = "Option::is_none")]
pub dynamic_sections: Option<Vec<PromptSection>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub dynamic_values: Option<HashMap<String, serde_json::Value>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<serde_json::Value>,
}
impl InvokeOptions {
pub fn new() -> Self {
Self::default()
}
pub fn with_dynamic_sections(mut self, sections: Vec<PromptSection>) -> Self {
self.dynamic_sections = Some(sections);
self
}
pub fn with_dynamic_values(mut self, values: HashMap<String, serde_json::Value>) -> Self {
self.dynamic_values = Some(values);
self
}
pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
self.metadata = Some(metadata);
self
}
}
fn build_params(
messages: &[Message],
blocking: bool,
options: Option<&InvokeOptions>,
) -> Result<MessageSendParams, ClientError> {
let last = messages
.last()
.ok_or_else(|| ClientError::InvalidResponse("no messages provided".into()))?;
let parts = last
.parts
.iter()
.cloned()
.map(|p| p.into())
.collect::<Vec<_>>();
let a2a_message = A2aMessage {
kind: EventKind::Message,
message_id: last.id.clone(),
role: to_a2a_role(&last.role),
parts,
context_id: None,
task_id: None,
reference_task_ids: vec![],
extensions: vec![],
metadata: serde_json::to_value(MessageMetadata::from(last.clone())).ok(),
};
let configuration = if blocking {
Some(MessageSendConfiguration {
accepted_output_modes: vec![],
blocking: true,
history_length: None,
push_notification_config: None,
})
} else {
None
};
let metadata = options.and_then(|opts| {
let mut meta = opts
.metadata
.as_ref()
.and_then(|m| m.as_object().cloned())
.unwrap_or_default();
if let Some(sections) = &opts.dynamic_sections
&& let Ok(val) = serde_json::to_value(sections) {
meta.insert("dynamic_sections".to_string(), val);
}
if let Some(values) = &opts.dynamic_values
&& let Ok(val) = serde_json::to_value(values) {
meta.insert("dynamic_values".to_string(), val);
}
if meta.is_empty() {
None
} else {
Some(serde_json::Value::Object(meta))
}
});
Ok(MessageSendParams {
message: a2a_message,
configuration,
metadata,
})
}
fn to_a2a_role(role: &MessageRole) -> Role {
match role {
MessageRole::User => Role::User,
_ => Role::Agent,
}
}
fn convert_kind(kind: MessageKind) -> Result<Option<Message>, ClientError> {
match kind {
MessageKind::Message(msg) => distri_types::Message::try_from(msg)
.map(Some)
.map_err(|e| ClientError::InvalidResponse(e.to_string())),
_ => Ok(None),
}
}
#[derive(Debug, Serialize)]
struct ToolCallRequest {
tool_name: String,
input: Value,
#[serde(skip_serializing_if = "Option::is_none")]
session_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
metadata: Option<Value>,
}
#[derive(Debug, Clone, Default)]
pub struct LlmExecuteOptions {
pub llm_def: Option<LlmDefinition>,
pub context: LLmContext,
pub tools: Vec<ExternalTool>,
pub headers: Option<HashMap<String, String>>,
pub is_sub_task: bool,
pub agent_id: Option<String>,
pub load_history: bool,
}
impl LlmExecuteOptions {
pub fn new(context: LLmContext) -> Self {
Self {
context,
load_history: true,
..Default::default()
}
}
pub fn with_llm_def(mut self, llm_def: LlmDefinition) -> Self {
self.llm_def = Some(llm_def);
self
}
pub fn with_tools(mut self, tools: Vec<ExternalTool>) -> Self {
self.tools = tools;
self
}
pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
self.headers = Some(headers);
self
}
pub fn with_agent_id(mut self, agent_id: String) -> Self {
self.agent_id = Some(agent_id);
self
}
pub fn with_load_history(mut self, load_history: bool) -> Self {
self.load_history = load_history;
self
}
pub fn is_sub_task(mut self, is_sub_task: bool) -> Self {
self.is_sub_task = is_sub_task;
self
}
}
#[derive(Debug, Serialize)]
struct LlmExecuteRequest {
messages: Vec<Message>,
#[serde(default)]
tools: Vec<ExternalTool>,
#[serde(default)]
thread_id: Option<String>,
#[serde(default)]
parent_task_id: Option<String>,
#[serde(default)]
run_id: Option<String>,
#[serde(default)]
model_settings: Option<distri_types::ModelSettings>,
#[serde(default)]
is_sub_task: bool,
#[serde(default)]
headers: Option<HashMap<String, String>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
agent_id: Option<String>,
#[serde(default = "default_load_history")]
load_history: bool,
}
#[allow(dead_code)]
fn default_load_history() -> bool {
true
}
#[derive(Debug, Clone, Deserialize)]
pub struct LlmExecuteResponse {
pub finish_reason: String,
#[serde(default)]
pub content: String,
#[serde(default)]
pub tool_calls: Vec<ToolCall>,
#[serde(default)]
pub usage: Option<distri_types::TokenUsage>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginResponse {
pub id: String,
pub name: String,
#[serde(default)]
pub description: Option<String>,
#[serde(default)]
pub version: Option<String>,
pub code: String,
#[serde(default)]
pub schemas: Option<PluginSchemas>,
#[serde(default)]
pub tags: Vec<String>,
#[serde(default)]
pub is_public: bool,
#[serde(default)]
pub is_system: bool,
#[serde(default)]
pub is_owner: bool,
#[serde(default)]
pub star_count: i32,
#[serde(default)]
pub clone_count: i32,
#[serde(default)]
pub is_starred: bool,
pub created_at: String,
pub updated_at: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PluginSchemas {
#[serde(default)]
pub tools: Vec<ToolSchema>,
#[serde(default)]
pub workflows: Vec<WorkflowSchema>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolSchema {
pub name: String,
pub description: String,
pub parameters: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowSchema {
pub name: String,
#[serde(default)]
pub description: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PluginListItemResponse {
pub id: String,
pub name: String,
#[serde(default)]
pub description: Option<String>,
#[serde(default)]
pub version: Option<String>,
#[serde(default)]
pub schemas: Option<PluginSchemas>,
#[serde(default)]
pub tags: Vec<String>,
#[serde(default)]
pub is_public: bool,
#[serde(default)]
pub is_system: bool,
#[serde(default)]
pub is_owner: bool,
#[serde(default)]
pub star_count: i32,
#[serde(default)]
pub clone_count: i32,
#[serde(default)]
pub is_starred: bool,
pub created_at: String,
pub updated_at: String,
}
#[derive(Debug, Clone, Serialize)]
pub struct CreatePluginRequest {
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
pub code: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub tags: Vec<String>,
#[serde(default)]
pub is_public: bool,
}
#[derive(Debug, Clone, Serialize)]
pub struct UpdatePluginRequest {
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub code: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tags: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub is_public: Option<bool>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct ValidatePluginResponse {
pub valid: bool,
#[serde(default)]
pub error: Option<String>,
#[serde(default)]
pub tools: Vec<String>,
#[serde(default)]
pub workflows: Vec<String>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct PluginsListResponse {
pub plugins: Vec<PluginListItemResponse>,
}
impl Distri {
pub async fn list_plugins(&self) -> Result<Vec<PluginListItemResponse>, ClientError> {
let url = format!("{}/plugins", self.base_url);
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
let list: PluginsListResponse = resp.json().await?;
Ok(list.plugins)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to list plugins: {}",
text
)))
}
}
pub async fn get_plugin(&self, id: &str) -> Result<Option<PluginResponse>, ClientError> {
let url = format!("{}/plugins/{}", self.base_url, id);
let resp = self.http.get(&url).send().await?;
if resp.status() == reqwest::StatusCode::NOT_FOUND {
return Ok(None);
}
if resp.status().is_success() {
Ok(Some(resp.json().await?))
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to get plugin: {}",
text
)))
}
}
pub async fn create_plugin(
&self,
request: &CreatePluginRequest,
) -> Result<PluginResponse, ClientError> {
let url = format!("{}/plugins", self.base_url);
let resp = self.http.post(&url).json(request).send().await?;
if resp.status().is_success() {
resp.json().await.map_err(ClientError::from)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to create plugin: {}",
text
)))
}
}
pub async fn update_plugin(
&self,
id: &str,
request: &UpdatePluginRequest,
) -> Result<PluginResponse, ClientError> {
let url = format!("{}/plugins/{}", self.base_url, id);
let resp = self.http.put(&url).json(request).send().await?;
if resp.status().is_success() {
resp.json().await.map_err(ClientError::from)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to update plugin: {}",
text
)))
}
}
pub async fn delete_plugin(&self, id: &str) -> Result<(), ClientError> {
let url = format!("{}/plugins/{}", self.base_url, id);
let resp = self.http.delete(&url).send().await?;
if resp.status().is_success() || resp.status() == reqwest::StatusCode::NO_CONTENT {
Ok(())
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to delete plugin: {}",
text
)))
}
}
pub async fn validate_plugin(&self, code: &str) -> Result<ValidatePluginResponse, ClientError> {
let url = format!("{}/plugins/validate", self.base_url);
let resp = self
.http
.post(&url)
.json(&serde_json::json!({ "code": code }))
.send()
.await?;
if resp.status().is_success() {
resp.json().await.map_err(ClientError::from)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to validate plugin: {}",
text
)))
}
}
pub async fn upsert_plugin(
&self,
request: &CreatePluginRequest,
) -> Result<PluginResponse, ClientError> {
let plugins = self.list_plugins().await?;
let existing = plugins.iter().find(|p| p.name == request.name);
if let Some(plugin) = existing {
let update = UpdatePluginRequest {
name: Some(request.name.clone()),
description: request.description.clone(),
code: Some(request.code.clone()),
metadata: request.metadata.clone(),
tags: Some(request.tags.clone()),
is_public: Some(request.is_public),
};
self.update_plugin(&plugin.id, &update).await
} else {
self.create_plugin(request).await
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SkillResponse {
pub id: String,
pub name: String,
#[serde(default)]
pub description: Option<String>,
pub content: String,
#[serde(default)]
pub tags: Vec<String>,
#[serde(default)]
pub is_public: bool,
#[serde(default)]
pub is_system: bool,
#[serde(default)]
pub is_owner: bool,
#[serde(default)]
pub star_count: i32,
#[serde(default)]
pub clone_count: i32,
#[serde(default)]
pub is_starred: bool,
#[serde(default)]
pub scripts: Vec<SkillScriptResponse>,
pub created_at: String,
pub updated_at: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SkillScriptResponse {
pub id: String,
pub skill_id: String,
pub name: String,
#[serde(default)]
pub description: Option<String>,
pub code: String,
pub language: String,
pub created_at: String,
pub updated_at: String,
}
pub type SkillListItemResponse = distri_types::stores::SkillListItem;
pub type SkillsListResponse = distri_types::stores::SkillsListResponse;
#[derive(Debug, Clone, Serialize)]
pub struct CreateSkillRequest {
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
pub content: String,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub tags: Vec<String>,
#[serde(default)]
pub is_public: bool,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub scripts: Vec<CreateSkillScriptRequest>,
}
#[derive(Debug, Clone, Serialize)]
pub struct CreateSkillScriptRequest {
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
pub code: String,
pub language: String,
}
#[derive(Debug, Clone, Serialize)]
pub struct UpdateSkillRequest {
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub content: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tags: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub is_public: Option<bool>,
}
impl Distri {
pub async fn list_skills(&self) -> Result<Vec<SkillListItemResponse>, ClientError> {
let url = format!("{}/skills", self.base_url);
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
let wrapper: SkillsListResponse = resp.json().await?;
Ok(wrapper.skills)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to list skills: {}",
text
)))
}
}
pub async fn get_skill(&self, id: &str) -> Result<Option<SkillResponse>, ClientError> {
let url = format!("{}/skills/{}", self.base_url, id);
let resp = self.http.get(&url).send().await?;
if resp.status() == reqwest::StatusCode::NOT_FOUND {
return Ok(None);
}
if resp.status().is_success() {
Ok(Some(resp.json().await?))
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to get skill: {}",
text
)))
}
}
pub async fn create_skill(
&self,
request: &CreateSkillRequest,
) -> Result<SkillResponse, ClientError> {
let url = format!("{}/skills", self.base_url);
let resp = self.http.post(&url).json(request).send().await?;
if resp.status().is_success() {
resp.json().await.map_err(ClientError::from)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to create skill: {}",
text
)))
}
}
pub async fn update_skill(
&self,
id: &str,
request: &UpdateSkillRequest,
) -> Result<SkillResponse, ClientError> {
let url = format!("{}/skills/{}", self.base_url, id);
let resp = self.http.put(&url).json(request).send().await?;
if resp.status().is_success() {
resp.json().await.map_err(ClientError::from)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to update skill: {}",
text
)))
}
}
pub async fn delete_skill(&self, id: &str) -> Result<(), ClientError> {
let url = format!("{}/skills/{}", self.base_url, id);
let resp = self.http.delete(&url).send().await?;
if resp.status().is_success() || resp.status() == reqwest::StatusCode::NO_CONTENT {
Ok(())
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to delete skill: {}",
text
)))
}
}
pub async fn upsert_skill(
&self,
request: &CreateSkillRequest,
) -> Result<SkillResponse, ClientError> {
if let Ok(skills) = self.list_skills().await
&& let Some(skill) = skills.iter().find(|s| s.name == request.name) {
let update = UpdateSkillRequest {
name: Some(request.name.clone()),
description: request.description.clone(),
content: Some(request.content.clone()),
tags: Some(request.tags.clone()),
is_public: Some(request.is_public),
};
return self.update_skill(&skill.id, &update).await;
}
self.create_skill(request).await
}
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentListItem {
pub name: String,
#[serde(default)]
pub description: Option<String>,
#[serde(default)]
pub id: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ConnectionSummary {
pub id: String,
pub name: String,
#[serde(default)]
pub status: Option<String>,
#[serde(default)]
pub config: Option<serde_json::Value>,
#[serde(default)]
pub created_at: Option<String>,
#[serde(default)]
pub updated_at: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ConnectionToken {
pub access_token: String,
#[serde(default)]
pub token_type: Option<String>,
#[serde(default)]
pub expires_at: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ProviderInfo {
pub name: String,
#[serde(default)]
pub available: bool,
#[serde(default)]
pub scopes_supported: Vec<String>,
#[serde(default)]
pub default_scopes: Vec<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ConnectResponse {
#[serde(default)]
pub connection_id: Option<String>,
#[serde(default)]
pub auth_url: Option<String>,
#[serde(rename = "type", default)]
pub response_type: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SecretEntry {
pub id: String,
pub key: String,
pub masked_value: String,
#[serde(default)]
pub updated_at: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
pub struct NewSecretRequest {
pub key: String,
pub value: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ThreadSummary {
pub id: String,
#[serde(default)]
pub title: Option<String>,
#[serde(default)]
pub agent_id: Option<String>,
#[serde(default)]
pub agent_name: Option<String>,
#[serde(default)]
pub updated_at: Option<String>,
#[serde(default)]
pub message_count: Option<u32>,
#[serde(default)]
pub last_message: Option<String>,
}
impl Distri {
pub async fn list_agents(&self) -> Result<Vec<AgentListItem>, ClientError> {
let url = format!("{}/agents", self.base_url);
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to list agents: {}",
text
)))
}
}
pub async fn delete_agent(&self, id: &str) -> Result<(), ClientError> {
let url = format!("{}/agents/{}", self.base_url, id);
let resp = self.http.delete(&url).send().await?;
if resp.status().is_success() {
Ok(())
} else if resp.status() == reqwest::StatusCode::NOT_FOUND {
Err(ClientError::InvalidResponse("agent not found".to_string()))
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to delete agent: {}",
text
)))
}
}
pub async fn list_connections(&self) -> Result<Vec<ConnectionSummary>, ClientError> {
let url = format!("{}/connections", self.base_url);
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to list connections: {}",
text
)))
}
}
pub async fn list_connections_with_skills(&self) -> Result<serde_json::Value, ClientError> {
let url = format!("{}/connections?include_skills=true", self.base_url);
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to list connections: {}",
text
)))
}
}
pub async fn get_connection_detail(
&self,
connection_id: &str,
) -> Result<serde_json::Value, ClientError> {
let url = format!("{}/connections/{}/detail", self.base_url, connection_id);
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to get connection detail: {}",
text
)))
}
}
pub async fn get_connection_token(
&self,
connection_id: &str,
) -> Result<ConnectionToken, ClientError> {
let url = format!("{}/connections/{}/token", self.base_url, connection_id);
let resp = self.http.post(&url).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to get connection token for {}: {}",
connection_id, text
)))
}
}
pub async fn delete_connection(&self, connection_id: &str) -> Result<(), ClientError> {
let url = format!("{}/connections/{}", self.base_url, connection_id);
let resp = self.http.delete(&url).send().await?;
if resp.status().is_success() || resp.status() == reqwest::StatusCode::NO_CONTENT {
Ok(())
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to delete connection {}: {}",
connection_id, text
)))
}
}
pub async fn list_providers(&self) -> Result<Vec<ProviderInfo>, ClientError> {
let url = format!("{}/connections/providers", self.base_url);
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to list providers: {}",
text
)))
}
}
pub async fn register_connection_provider(
&self,
provider: serde_json::Value,
client_id: &str,
client_secret: &str,
) -> Result<serde_json::Value, ClientError> {
let provider_id = provider
.get("id")
.and_then(|v| v.as_str())
.unwrap_or("custom");
let payload = serde_json::json!({
"provider_id": provider_id,
"secrets": {
format!("{}_CLIENT_ID", provider_id.to_uppercase()): client_id,
format!("{}_CLIENT_SECRET", provider_id.to_uppercase()): client_secret,
},
"connection_provider": provider,
});
let url = format!("{}/providers", self.base_url);
let resp = self.http.post(&url).json(&payload).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to register connection provider: {}",
text
)))
}
}
pub async fn get_default_model(&self) -> Result<Option<String>, ClientError> {
let url = format!("{}/providers/default-model", self.base_url);
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
let body: serde_json::Value = resp.json().await?;
Ok(body
.get("default_model")
.and_then(|v| v.as_str())
.map(|s| s.to_string()))
} else {
Ok(None)
}
}
pub async fn list_connection_providers(&self) -> Result<serde_json::Value, ClientError> {
let url = format!("{}/workspaces/current", self.base_url);
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
let body: serde_json::Value = resp.json().await?;
let providers = body
.get("settings")
.and_then(|s| s.get("connection_providers"))
.cloned()
.unwrap_or(serde_json::json!([]));
Ok(providers)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to get workspace settings: {}",
text
)))
}
}
pub async fn discover_skills(&self, query: &str) -> Result<serde_json::Value, ClientError> {
let url = format!(
"{}/skills/discover?query={}",
self.base_url,
urlencoding::encode(query)
);
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to discover skills: {}",
text
)))
}
}
pub async fn import_skill(
&self,
url: &str,
name: Option<&str>,
) -> Result<serde_json::Value, ClientError> {
let api_url = format!("{}/skills/import", self.base_url);
let mut payload = serde_json::json!({ "url": url });
if let Some(n) = name {
payload["name"] = serde_json::Value::String(n.to_string());
}
let resp = self.http.post(&api_url).json(&payload).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to import skill: {}",
text
)))
}
}
pub async fn connect(
&self,
provider: &str,
scopes: &[String],
) -> Result<ConnectResponse, ClientError> {
let url = format!("{}/connections", self.base_url);
let payload = serde_json::json!({
"auth_type": "oauth",
"auth": {
"provider": provider,
"scopes": scopes,
}
});
let resp = self.http.post(&url).json(&payload).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to initiate connection for {}: {}",
provider, text
)))
}
}
pub async fn list_secrets(&self) -> Result<Vec<SecretEntry>, ClientError> {
let url = format!("{}/secrets", self.base_url);
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to list secrets: {}",
text
)))
}
}
pub async fn get_secret(&self, key: &str) -> Result<Option<SecretEntry>, ClientError> {
let url = format!("{}/secrets/{}", self.base_url, key);
let resp = self.http.get(&url).send().await?;
if resp.status() == reqwest::StatusCode::NOT_FOUND {
return Ok(None);
}
if resp.status().is_success() {
Ok(Some(resp.json().await?))
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to get secret {}: {}",
key, text
)))
}
}
pub async fn set_secret(&self, request: &NewSecretRequest) -> Result<SecretEntry, ClientError> {
let url = format!("{}/secrets", self.base_url);
let resp = self.http.post(&url).json(request).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to set secret: {}",
text
)))
}
}
pub async fn delete_secret(&self, key: &str) -> Result<(), ClientError> {
let url = format!("{}/secrets/{}", self.base_url, key);
let resp = self.http.delete(&url).send().await?;
if resp.status().is_success() || resp.status() == reqwest::StatusCode::NO_CONTENT {
Ok(())
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to delete secret {}: {}",
key, text
)))
}
}
pub async fn list_notes(
&self,
tag: Option<&str>,
search: Option<&str>,
) -> Result<Value, ClientError> {
let mut url = format!("{}/notes", self.base_url);
let mut params = vec![];
if let Some(t) = tag {
params.push(format!("tag={}", t));
}
if let Some(s) = search {
params.push(format!("search={}", s));
}
if !params.is_empty() {
url = format!("{}?{}", url, params.join("&"));
}
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to list notes: {}",
text
)))
}
}
pub async fn create_note(
&self,
title: &str,
content: &str,
tags: &[String],
) -> Result<Value, ClientError> {
let url = format!("{}/notes", self.base_url);
let body = serde_json::json!({ "title": title, "content": content, "tags": tags });
let resp = self.http.post(&url).json(&body).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to create note: {}",
text
)))
}
}
pub async fn get_note(&self, id: &str) -> Result<Option<Value>, ClientError> {
let url = format!("{}/notes/{}", self.base_url, id);
let resp = self.http.get(&url).send().await?;
if resp.status() == reqwest::StatusCode::NOT_FOUND {
return Ok(None);
}
if resp.status().is_success() {
Ok(Some(resp.json().await?))
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to get note: {}",
text
)))
}
}
pub async fn update_note(
&self,
id: &str,
title: Option<&str>,
content: Option<&str>,
tags: Option<&[String]>,
) -> Result<Value, ClientError> {
let url = format!("{}/notes/{}", self.base_url, id);
let mut body = serde_json::Map::new();
if let Some(t) = title {
body.insert("title".to_string(), serde_json::json!(t));
}
if let Some(c) = content {
body.insert("content".to_string(), serde_json::json!(c));
}
if let Some(tg) = tags {
body.insert("tags".to_string(), serde_json::json!(tg));
}
let resp = self
.http
.put(&url)
.json(&Value::Object(body))
.send()
.await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to update note: {}",
text
)))
}
}
pub async fn delete_note(&self, id: &str) -> Result<(), ClientError> {
let url = format!("{}/notes/{}", self.base_url, id);
let resp = self.http.delete(&url).send().await?;
if resp.status().is_success() || resp.status() == reqwest::StatusCode::NO_CONTENT {
Ok(())
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to delete note: {}",
text
)))
}
}
pub async fn list_threads(&self) -> Result<Vec<ThreadSummary>, ClientError> {
let url = format!("{}/threads", self.base_url);
let resp = self.http.get(&url).send().await?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(ClientError::InvalidResponse(format!(
"failed to list threads: {}",
text
)));
}
let body: serde_json::Value = resp.json().await?;
let arr = if let Some(threads) = body.get("threads") {
threads.clone()
} else {
body
};
serde_json::from_value(arr).map_err(|e| {
ClientError::InvalidResponse(format!("failed to parse threads: {}", e))
})
}
pub async fn get_thread_messages(
&self,
thread_id: &str,
messages_only: bool,
) -> Result<Vec<distri_types::TaskMessage>, ClientError> {
let mut url = format!("{}/threads/{}/messages", self.base_url, thread_id);
if messages_only {
url.push_str("?filter=Messages");
}
let resp = self.http.get(&url).send().await?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(ClientError::InvalidResponse(format!(
"failed to get thread messages: {}",
text
)));
}
let raw: Vec<serde_json::Value> = resp.json().await?;
let items = raw
.into_iter()
.filter_map(|v| {
let mk: distri_a2a::MessageKind = serde_json::from_value(v).ok()?;
distri_types::TaskMessage::try_from(mk).ok()
})
.collect();
Ok(items)
}
pub async fn list_workflows(
&self,
) -> Result<distri_types::stores::WorkflowsListResponse, ClientError> {
let url = format!("{}/workflows", self.base_url);
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to list workflows: {}",
text
)))
}
}
pub async fn get_workflow(
&self,
id: &str,
) -> Result<distri_types::stores::WorkflowRecord, ClientError> {
let url = format!("{}/workflows/{}", self.base_url, id);
let resp = self.http.get(&url).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to get workflow: {}",
text
)))
}
}
pub async fn create_workflow(
&self,
workflow: distri_types::stores::NewWorkflow,
) -> Result<distri_types::stores::WorkflowRecord, ClientError> {
let url = format!("{}/workflows", self.base_url);
let resp = self.http.post(&url).json(&workflow).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to create workflow: {}",
text
)))
}
}
pub async fn update_workflow(
&self,
id: &str,
update: distri_types::stores::UpdateWorkflow,
) -> Result<distri_types::stores::WorkflowRecord, ClientError> {
let url = format!("{}/workflows/{}", self.base_url, id);
let resp = self.http.put(&url).json(&update).send().await?;
if resp.status().is_success() {
Ok(resp.json().await?)
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to update workflow: {}",
text
)))
}
}
pub async fn delete_workflow(&self, id: &str) -> Result<(), ClientError> {
let url = format!("{}/workflows/{}", self.base_url, id);
let resp = self.http.delete(&url).send().await?;
if resp.status().is_success() || resp.status() == reqwest::StatusCode::NO_CONTENT {
Ok(())
} else {
let text = resp.text().await.unwrap_or_default();
Err(ClientError::InvalidResponse(format!(
"failed to delete workflow: {}",
text
)))
}
}
pub async fn push_workflow(
&self,
name: &str,
definition: serde_json::Value,
) -> Result<distri_types::stores::WorkflowRecord, ClientError> {
let new = distri_types::stores::NewWorkflow {
name: name.to_string(),
description: None,
definition,
tags: vec![],
is_public: false,
is_template: false,
};
self.create_workflow(new).await
}
}