pub mod contract;
pub mod learn;
pub mod config;
pub mod error;
pub mod transport;
pub mod domains;
pub mod helpers;
/// Generated gRPC/protobuf bindings, nested so the module path mirrors the
/// `mubit.v1` protobuf package name.
pub mod proto {
pub mod mubit {
pub mod v1 {
// Pulls in the code that tonic generated for the `mubit.v1` package.
tonic::include_proto!("mubit.v1");
}
}
}
use crate::contract::{find_operation, HttpMethod, OperationSpec};
use reqwest::header::CONTENT_TYPE;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::{json, Map, Value};
use std::collections::HashSet;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use thiserror::Error;
use tokio::sync::Mutex;
use tokio::time::sleep;
use tokio_stream::{Stream, StreamExt};
use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
use tonic::{Code, Request, Status};
use url::Url;
/// Boxed, pinned, `Send` stream of JSON values. Each item is a `Result`, so a
/// failure can be reported per item instead of ending the stream type-wise.
pub type ValueStream = Pin<Box<dyn Stream<Item = Result<Value>> + Send>>;
/// Wire protocol selection for the client.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TransportMode {
    /// Try gRPC first and fall back to HTTP for fallback-eligible failures.
    Auto,
    /// Use gRPC only.
    Grpc,
    /// Use HTTP only.
    Http,
}

impl TransportMode {
    /// Parses a user-supplied transport name.
    ///
    /// Surrounding whitespace is ignored and the comparison is
    /// case-insensitive. Anything other than `grpc` or `http` (including an
    /// empty string) maps to [`TransportMode::Auto`].
    fn normalize(raw: &str) -> Self {
        let lowered = raw.trim().to_lowercase();
        if lowered == "grpc" {
            Self::Grpc
        } else if lowered == "http" {
            Self::Http
        } else {
            Self::Auto
        }
    }
}
/// HTTP base URL used when the environment supplies no endpoint.
const DEFAULT_SHARED_HTTP_ENDPOINT: &str = "https://api.mubit.ai";
/// gRPC `host:port` used when `MUBIT_GRPC_ENDPOINT` is not set.
const DEFAULT_SHARED_GRPC_ENDPOINT: &str = "grpc.api.mubit.ai:443";
/// Reads the environment variable `name` and returns its trimmed value.
///
/// Returns `None` when the variable is unset, cannot be read as a `String`,
/// or is blank after trimming.
fn env_non_empty(name: &str) -> Option<String> {
    let raw = match std::env::var(name) {
        Ok(raw) => raw,
        Err(_) => return None,
    };
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_string())
    }
}
/// Connection settings used to construct a client.
#[derive(Clone, Debug)]
pub struct ClientConfig {
    /// Base endpoint from which HTTP and gRPC endpoints are derived when the
    /// explicit overrides below are `None`.
    pub endpoint: String,
    /// Explicit gRPC endpoint override.
    pub grpc_endpoint: Option<String>,
    /// Explicit HTTP endpoint override.
    pub http_endpoint: Option<String>,
    /// Which transport to use for requests.
    pub transport: TransportMode,
    /// Credential sent as a bearer token.
    pub api_key: Option<String>,
    /// Alternative credential; only used when `api_key` is `None`.
    pub token: Option<String>,
    /// Run identifier filled into requests that accept one and do not set it.
    pub run_id: Option<String>,
    /// Request timeout in milliseconds.
    pub timeout_ms: u64,
}

impl ClientConfig {
    /// Creates a configuration for `endpoint` with no overrides, no
    /// credentials, `TransportMode::Auto`, and a 30 second timeout.
    pub fn new(endpoint: impl Into<String>) -> Self {
        Self {
            endpoint: endpoint.into(),
            grpc_endpoint: None,
            http_endpoint: None,
            transport: TransportMode::Auto,
            api_key: None,
            token: None,
            run_id: None,
            timeout_ms: 30_000,
        }
    }

    /// Sets the transport from a name such as `"grpc"` or `"http"`;
    /// unrecognised names select `TransportMode::Auto`.
    pub fn transport(mut self, transport: impl AsRef<str>) -> Self {
        self.transport = TransportMode::normalize(transport.as_ref());
        self
    }

    /// Sets the API key.
    pub fn api_key(mut self, api_key: impl Into<String>) -> Self {
        self.api_key = Some(api_key.into());
        self
    }

    /// Sets the token.
    ///
    /// The value is stored in the `token` field. Previously it was written to
    /// `api_key`, which silently replaced an explicitly configured API key and
    /// left `token` unset. An API key, when present, still takes precedence
    /// over the token when the client is built.
    pub fn token(mut self, token: impl Into<String>) -> Self {
        self.token = Some(token.into());
        self
    }

    /// Sets the default run identifier.
    pub fn run_id(mut self, run_id: impl Into<String>) -> Self {
        self.run_id = Some(run_id.into());
        self
    }

    /// Builds a configuration from the environment; equivalent to
    /// `ClientConfig::default()`.
    pub fn from_env() -> Self {
        Self::default()
    }
}
impl Default for ClientConfig {
    /// Builds a configuration from `MUBIT_*` environment variables.
    ///
    /// Blank variables are treated as unset. `MUBIT_ENDPOINT` falls back to
    /// the shared HTTP endpoint, and the explicit HTTP/gRPC endpoints fall
    /// back to the shared defaults, so both are always `Some` in the result.
    fn default() -> Self {
        let endpoint = env_non_empty("MUBIT_ENDPOINT")
            .unwrap_or_else(|| DEFAULT_SHARED_HTTP_ENDPOINT.to_string());
        let transport = match env_non_empty("MUBIT_TRANSPORT") {
            Some(raw) => TransportMode::normalize(&raw),
            None => TransportMode::Auto,
        };
        let http_endpoint = env_non_empty("MUBIT_HTTP_ENDPOINT")
            .unwrap_or_else(|| DEFAULT_SHARED_HTTP_ENDPOINT.to_string());
        let grpc_endpoint = env_non_empty("MUBIT_GRPC_ENDPOINT")
            .unwrap_or_else(|| DEFAULT_SHARED_GRPC_ENDPOINT.to_string());

        Self {
            transport,
            http_endpoint: Some(http_endpoint),
            grpc_endpoint: Some(grpc_endpoint),
            api_key: env_non_empty("MUBIT_API_KEY"),
            token: env_non_empty("MUBIT_TOKEN"),
            run_id: env_non_empty("MUBIT_RUN_ID"),
            // `endpoint` and the default timeout come from the plain constructor.
            ..Self::new(endpoint)
        }
    }
}
/// Coarse classification of a transport-level failure.
///
/// `SdkError::is_fallback_eligible` and `SdkError::is_retryable` branch on
/// this value to decide whether to fall back to HTTP or to retry.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TransportFailureKind {
Unavailable,
ConnectionReset,
DeadlineExceeded,
Io,
// Eligible for HTTP fallback but not retried; used e.g. when an operation
// has no gRPC mapping.
Unimplemented,
// Neither retried nor eligible for fallback.
Other,
}
/// Error type returned by SDK operations.
///
/// The `Display` text of each variant is prefixed with the variant name.
#[derive(Error, Debug)]
pub enum SdkError {
#[error("AuthError: {0}")]
AuthError(String),
#[error("ValidationError: {0}")]
ValidationError(String),
// Carries a `TransportFailureKind` so callers can decide on retry/fallback.
#[error("TransportError({kind:?}): {message}")]
TransportError {
kind: TransportFailureKind,
message: String,
},
#[error("ServerError: {0}")]
ServerError(String),
#[error("UnsupportedFeatureError: {0}")]
UnsupportedFeatureError(String),
}
impl SdkError {
    /// Returns `true` when a gRPC failure should be retried over HTTP in
    /// `Auto` mode: every transport failure kind except `Other`.
    fn is_fallback_eligible(&self) -> bool {
        let SdkError::TransportError { kind, .. } = self else {
            return false;
        };
        match kind {
            TransportFailureKind::Unavailable
            | TransportFailureKind::ConnectionReset
            | TransportFailureKind::DeadlineExceeded
            | TransportFailureKind::Io
            | TransportFailureKind::Unimplemented => true,
            TransportFailureKind::Other => false,
        }
    }

    /// Returns `true` when the same request may be attempted again: server
    /// errors and transport failures other than `Unimplemented`/`Other`.
    fn is_retryable(&self) -> bool {
        match self {
            SdkError::ServerError(_) => true,
            SdkError::TransportError { kind, .. } => !matches!(
                kind,
                TransportFailureKind::Unimplemented | TransportFailureKind::Other
            ),
            SdkError::AuthError(_)
            | SdkError::ValidationError(_)
            | SdkError::UnsupportedFeatureError(_) => false,
        }
    }
}
/// Reads an unsigned integer setting from the environment.
///
/// A value that parses is raised to at least `minimum`. When the variable is
/// unset or does not parse as `u64`, `default` is returned unchanged (it is
/// not clamped to `minimum`).
fn retry_env_u64(name: &str, default: u64, minimum: u64) -> u64 {
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    match raw.trim().parse::<u64>() {
        Ok(value) if value < minimum => minimum,
        Ok(value) => value,
        Err(_) => default,
    }
}
/// Reads a floating-point setting from the environment.
///
/// A value that parses is raised to at least `minimum` via `f64::max`. When
/// the variable is unset or does not parse, `default` is returned unchanged.
fn retry_env_f64(name: &str, default: f64, minimum: f64) -> f64 {
    let raw = match std::env::var(name) {
        Ok(raw) => raw,
        Err(_) => return default,
    };
    match raw.trim().parse::<f64>() {
        Ok(value) => value.max(minimum),
        Err(_) => default,
    }
}
/// Maximum number of attempts per request, from `MUBIT_RETRY_ATTEMPTS`
/// (default 3, minimum 1).
///
/// Values above `u32::MAX` saturate instead of wrapping. A plain `as u32`
/// cast would turn e.g. `4294967296` into 0 attempts, so the retry loop would
/// never issue the request.
fn retry_attempts() -> u32 {
    let configured = retry_env_u64("MUBIT_RETRY_ATTEMPTS", 3, 1);
    // After clamping, the narrowing cast cannot lose information.
    configured.min(u64::from(u32::MAX)) as u32
}
/// Base backoff delay in milliseconds, from `MUBIT_RETRY_BASE_MS`
/// (default 200, minimum 10).
fn retry_base_ms() -> u64 {
    const DEFAULT_MS: u64 = 200;
    const FLOOR_MS: u64 = 10;
    retry_env_u64("MUBIT_RETRY_BASE_MS", DEFAULT_MS, FLOOR_MS)
}
/// Upper bound for the backoff delay in milliseconds, from
/// `MUBIT_RETRY_CAP_MS` (default 5000). A configured value is raised to at
/// least the base delay; the default is used as-is.
fn retry_cap_ms() -> u64 {
    let floor_ms = retry_base_ms();
    retry_env_u64("MUBIT_RETRY_CAP_MS", 5000, floor_ms)
}
/// Jitter factor applied to backoff delays, from `MUBIT_RETRY_JITTER`
/// (default 0.2, minimum 0.0).
fn retry_jitter() -> f64 {
    const DEFAULT_JITTER: f64 = 0.2;
    const NO_JITTER: f64 = 0.0;
    retry_env_f64("MUBIT_RETRY_JITTER", DEFAULT_JITTER, NO_JITTER)
}
/// Computes how long to wait before the given 1-based `attempt`.
///
/// Attempts 0 and 1 wait nothing. From attempt 2 on, the delay is the base
/// delay doubled per extra attempt, limited to the cap, and then scaled by a
/// jitter factor in `[1 - j, 1 + j)` when the jitter setting `j` is positive.
///
/// Despite the `_ms` suffix, the result is a `Duration`.
fn backoff_delay_ms(attempt: u32) -> Duration {
    if attempt <= 1 {
        return Duration::ZERO;
    }
    let base = retry_base_ms();
    let cap = retry_cap_ms();
    // `1u64 << n` overflows the shift for n >= 64 (a panic in debug builds, a
    // wrapped shift amount in release builds). Treat that as "saturated" so
    // very late attempts simply get the cap.
    let factor = 1u64.checked_shl(attempt - 2).unwrap_or(u64::MAX);
    let exp = base.saturating_mul(factor).min(cap) as f64;
    let j = retry_jitter();
    let ms = if j > 0.0 {
        // The sub-second part of the wall clock serves as a cheap source of
        // variation; it is mapped from [0, 1) to [-1, 1).
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.subsec_nanos())
            .unwrap_or(0);
        let frac = (nanos as f64 / 1_000_000_000.0) * 2.0 - 1.0;
        (exp * (1.0 + frac * j)).max(0.0) as u64
    } else {
        exp as u64
    };
    Duration::from_millis(ms)
}
/// Result alias used throughout the SDK; the error type is always `SdkError`.
pub type Result<T> = std::result::Result<T, SdkError>;
/// Settings that can be changed after the transport engine has been built.
/// `TransportEngine` keeps this behind an `RwLock`.
#[derive(Clone, Debug)]
struct MutableState {
// Credential sent as a bearer token, if any.
api_key: Option<String>,
// Run id filled into payloads that do not carry one.
run_id: Option<String>,
transport: TransportMode,
}
/// Shared request machinery: resolved endpoints, the HTTP client, a lazily
/// created gRPC channel, and the mutable credentials/transport settings.
struct TransportEngine {
http_endpoint: String,
// Stored without a scheme; the scheme is chosen from `grpc_tls` on connect.
grpc_endpoint: String,
grpc_tls: bool,
timeout: Duration,
http_client: reqwest::Client,
// `None` until the first successful gRPC connection, then reused.
grpc_channel: Mutex<Option<Channel>>,
state: Arc<RwLock<MutableState>>,
}
impl TransportEngine {
fn new(config: ClientConfig) -> Result<Self> {
let (default_http_endpoint, default_grpc_endpoint, default_grpc_tls) =
derive_http_and_grpc(&config.endpoint)?;
let http_endpoint = match config.http_endpoint {
Some(http_endpoint) => normalize_http_endpoint(&http_endpoint)?,
None => default_http_endpoint,
};
let (grpc_endpoint, grpc_tls) = match config.grpc_endpoint {
Some(grpc_endpoint) => normalize_grpc_endpoint(&grpc_endpoint)?,
None => (default_grpc_endpoint, default_grpc_tls),
};
let timeout = Duration::from_millis(config.timeout_ms);
let http_client = reqwest::Client::builder()
.timeout(timeout)
.build()
.map_err(|e| SdkError::TransportError {
kind: TransportFailureKind::Other,
message: format!("failed to build HTTP client: {}", e),
})?;
Ok(Self {
http_endpoint,
grpc_endpoint,
grpc_tls,
timeout,
http_client,
grpc_channel: Mutex::new(None),
state: Arc::new(RwLock::new(MutableState {
api_key: config.api_key.or(config.token),
run_id: config.run_id,
transport: config.transport,
})),
})
}
/// Replaces the stored API key. If the state lock is poisoned the call is a
/// no-op and the previous key stays in place.
fn set_api_key(&self, api_key: Option<String>) {
    let Ok(mut guard) = self.state.write() else {
        return;
    };
    guard.api_key = api_key;
}
/// Replaces the default run id. If the state lock is poisoned the call is a
/// no-op and the previous value stays in place.
fn set_run_id(&self, run_id: Option<String>) {
    let Ok(mut guard) = self.state.write() else {
        return;
    };
    guard.run_id = run_id;
}
/// Switches the transport mode. If the state lock is poisoned the call is a
/// no-op and the previous mode stays in place.
fn set_transport(&self, transport: TransportMode) {
    let Ok(mut guard) = self.state.write() else {
        return;
    };
    guard.transport = transport;
}
/// Returns a copy of the stored API key, or `None` when no key is set or the
/// state lock is poisoned.
fn api_key(&self) -> Option<String> {
    let guard = self.state.read().ok()?;
    guard.api_key.clone()
}
/// Returns a copy of the default run id, or `None` when none is set or the
/// state lock is poisoned.
fn run_id(&self) -> Option<String> {
    let guard = self.state.read().ok()?;
    guard.run_id.clone()
}
/// Returns the current transport mode, falling back to `Auto` when the state
/// lock is poisoned.
fn transport(&self) -> TransportMode {
    match self.state.read() {
        Ok(guard) => guard.transport,
        Err(_) => TransportMode::Auto,
    }
}
/// Serializes `payload` to JSON and invokes the operation `op_key`.
///
/// # Errors
/// Returns `ValidationError` when the payload cannot be serialized; otherwise
/// whatever `invoke` returns.
async fn invoke_serialized<T: Serialize>(&self, op_key: &str, payload: T) -> Result<Value> {
    let encoded = match serde_json::to_value(payload) {
        Ok(encoded) => encoded,
        Err(e) => {
            return Err(SdkError::ValidationError(format!(
                "failed to serialize request payload: {}",
                e
            )))
        }
    };
    self.invoke(op_key, encoded).await
}
async fn invoke(&self, op_key: &str, payload: Value) -> Result<Value> {
let op = find_operation(op_key).ok_or_else(|| {
SdkError::UnsupportedFeatureError(format!("unknown operation: {}", op_key))
})?;
let mut payload = ensure_object_payload(payload)?;
self.apply_run_id_default(op, &mut payload);
let transport_mode = self.transport();
let attempts = retry_attempts();
let mut last_err: Option<SdkError> = None;
for attempt in 1..=attempts {
if attempt > 1 {
let delay = backoff_delay_ms(attempt);
if !delay.is_zero() {
tokio::time::sleep(delay).await;
}
}
let result = match transport_mode {
TransportMode::Grpc => self.invoke_grpc(op, payload.clone()).await,
TransportMode::Http => self.invoke_http(op, payload.clone()).await,
TransportMode::Auto => match self.invoke_grpc(op, payload.clone()).await {
Ok(value) => Ok(value),
Err(err) if err.is_fallback_eligible() => {
self.invoke_http(op, payload.clone()).await
}
Err(err) => Err(err),
},
};
match result {
Ok(value) => return Ok(value),
Err(err) => {
if !err.is_retryable() || attempt >= attempts {
return Err(err);
}
last_err = Some(err);
}
}
}
Err(last_err.unwrap_or_else(|| SdkError::TransportError {
kind: TransportFailureKind::Other,
message: "retry loop exited without result".to_string(),
}))
}
/// Opens a streaming operation and returns its stream of JSON values.
///
/// The default run id is filled in when the operation accepts one. In `Auto`
/// mode gRPC is tried first and HTTP is used for fallback-eligible errors.
/// Unlike `invoke`, opening the stream is not retried.
///
/// # Errors
/// Returns `UnsupportedFeatureError` for an unknown `key`, a payload
/// validation error, or the transport's error.
pub async fn invoke_stream(&self, key: &str, payload: Value) -> Result<ValueStream> {
    let Some(op) = find_operation(key) else {
        return Err(SdkError::UnsupportedFeatureError(format!(
            "unknown operation: {}",
            key
        )));
    };
    let mut body = ensure_object_payload(payload)?;
    self.apply_run_id_default(op, &mut body);

    match self.transport() {
        TransportMode::Http => self.invoke_http_stream(op, body).await,
        TransportMode::Grpc => self.invoke_grpc_stream(op, body).await,
        TransportMode::Auto => {
            let grpc_attempt = self.invoke_grpc_stream(op, body.clone()).await;
            match grpc_attempt {
                Err(err) if err.is_fallback_eligible() => {
                    self.invoke_http_stream(op, body).await
                }
                settled => settled,
            }
        }
    }
}
/// Serializes `payload` to JSON and opens the streaming operation `key`.
///
/// # Errors
/// Returns `ValidationError` when the payload cannot be serialized; otherwise
/// whatever `invoke_stream` returns.
pub async fn invoke_stream_serialized<T: Serialize>(
    &self,
    key: &str,
    payload: T,
) -> Result<ValueStream> {
    let encoded = match serde_json::to_value(payload) {
        Ok(encoded) => encoded,
        Err(e) => {
            return Err(SdkError::ValidationError(format!(
                "failed to serialize payload: {}",
                e
            )))
        }
    };
    self.invoke_stream(key, encoded).await
}
/// Fills the operation's run-id field with the default run id.
///
/// Nothing happens when the operation has no run-id field, no default run id
/// is set, the payload is not a JSON object, or the field is already present
/// (whatever its value).
fn apply_run_id_default(&self, op: &OperationSpec, payload: &mut Value) {
    let field = match op.run_id_field {
        Some(field) => field,
        None => return,
    };
    let default_run_id = match self.run_id() {
        Some(run_id) => run_id,
        None => return,
    };
    if let Value::Object(fields) = payload {
        fields
            .entry(field)
            .or_insert_with(|| Value::String(default_run_id));
    }
}
/// Returns the cached gRPC channel, connecting and caching it on first use.
///
/// The lock is not held while connecting, so concurrent first calls may each
/// connect; the last one to finish is the one that stays cached.
///
/// # Errors
/// Returns `ValidationError` for an invalid endpoint URI, a `TransportError`
/// of kind `Other` when TLS cannot be configured, or the mapped connect error.
async fn grpc_channel(&self) -> Result<Channel> {
    let cached = self.grpc_channel.lock().await.clone();
    if let Some(channel) = cached {
        return Ok(channel);
    }

    let scheme = if self.grpc_tls { "https" } else { "http" };
    let uri = format!("{}://{}", scheme, self.grpc_endpoint);
    let endpoint = Endpoint::from_shared(uri.clone())
        .map_err(|e| {
            SdkError::ValidationError(format!("invalid gRPC endpoint '{}': {}", uri, e))
        })?
        .connect_timeout(self.timeout)
        .timeout(self.timeout);
    let endpoint = if self.grpc_tls {
        endpoint
            .tls_config(ClientTlsConfig::new())
            .map_err(|e| SdkError::TransportError {
                kind: TransportFailureKind::Other,
                message: format!("failed to configure TLS for {}: {}", self.grpc_endpoint, e),
            })?
    } else {
        endpoint
    };

    let channel = endpoint
        .connect()
        .await
        .map_err(|e| map_grpc_connect_error(e, &self.grpc_endpoint))?;
    *self.grpc_channel.lock().await = Some(channel.clone());
    Ok(channel)
}
/// Adds an `authorization: Bearer <key>` metadata entry when an API key is
/// set. A key that cannot be encoded as a metadata value is skipped silently.
fn attach_grpc_metadata<T>(&self, request: &mut Request<T>) {
    let Some(api_key) = self.api_key() else {
        return;
    };
    let bearer = format!("Bearer {}", api_key);
    if let Ok(header_value) = bearer.parse() {
        request.metadata_mut().insert("authorization", header_value);
    }
}
/// Wraps `payload` in a tonic request carrying the authorization metadata.
fn grpc_request<T>(&self, payload: T) -> Request<T> {
    let mut authorized = Request::new(payload);
    self.attach_grpc_metadata(&mut authorized);
    authorized
}
/// Executes a unary operation over gRPC and returns the response as JSON.
///
/// The JSON payload is decoded into the generated request type for the
/// operation, sent with authorization metadata, and the response message is
/// encoded back to JSON.
///
/// # Errors
/// Returns a `TransportError` of kind `Unimplemented` when the operation has
/// no gRPC mapping, `UnsupportedFeatureError` when the key is not handled
/// below, and otherwise connection, decoding or mapped status errors.
async fn invoke_grpc(&self, op: &OperationSpec, payload: Value) -> Result<Value> {
use crate::proto::mubit::v1 as pb;
// Checked before connecting so HTTP-only operations never open a channel.
if op.grpc_method.is_empty() {
return Err(SdkError::TransportError {
kind: TransportFailureKind::Unimplemented,
message: format!("operation {} has no gRPC mapping", op.key),
});
}
let channel = self.grpc_channel().await?;
// Unary call against the core service: decode, call `$method`, encode.
macro_rules! unary_core {
($method:ident, $req_ty:ty) => {{
let request: $req_ty = decode_grpc_request(op.key, payload)?;
let mut client = pb::core_service_client::CoreServiceClient::new(channel.clone());
let response = client
.$method(self.grpc_request(request))
.await
.map_err(map_grpc_status)?;
encode_grpc_response(op.key, response.into_inner())
}};
}
// Same as `unary_core!`, but against the control service.
macro_rules! unary_control {
($method:ident, $req_ty:ty) => {{
let request: $req_ty = decode_grpc_request(op.key, payload)?;
let mut client =
pb::control_service_client::ControlServiceClient::new(channel.clone());
let response = client
.$method(self.grpc_request(request))
.await
.map_err(map_grpc_status)?;
encode_grpc_response(op.key, response.into_inner())
}};
}
// Dispatch table: operation key -> (client method, request message type).
match op.key {
"auth.health" => unary_core!(health, pb::HealthRequest),
"auth.create_user" => unary_core!(create_user, pb::CreateUserRequest),
"auth.rotate_user_api_key" => {
unary_core!(rotate_user_api_key, pb::RotateUserApiKeyRequest)
}
"auth.revoke_user_api_key" => {
unary_core!(revoke_user_api_key, pb::RevokeUserApiKeyRequest)
}
"auth.list_users" => unary_core!(list_users, pb::ListUsersRequest),
"auth.get_user" => unary_core!(get_user, pb::GetUserRequest),
"auth.delete_user" => unary_core!(delete_user, pb::DeleteUserRequest),
"core.insert" => unary_core!(insert, pb::InsertRequest),
"core.search" => unary_core!(search, pb::SearchRequest),
"core.delete_node" => unary_core!(delete_node, pb::DeleteNodeRequest),
"core.delete_run" => unary_core!(delete_run, pb::DeleteRunRequest),
"core.create_session" => unary_core!(create_session, pb::CreateSessionRequest),
"core.snapshot_session" => unary_core!(snapshot_session, pb::SnapshotSessionRequest),
"core.load_session" => unary_core!(load_session, pb::LoadSessionRequest),
"core.commit_session" => unary_core!(commit_session, pb::CommitSessionRequest),
"core.drop_session" => unary_core!(drop_session, pb::DropSessionRequest),
"core.write_memory" => unary_core!(write_memory, pb::WriteMemoryRequest),
"core.read_memory" => unary_core!(read_memory, pb::ReadMemoryRequest),
"core.add_memory" => unary_core!(add_memory, pb::AddMemoryRequest),
"core.get_memory" => unary_core!(get_memory, pb::GetMemoryRequest),
"core.clear_memory" => unary_core!(clear_memory, pb::ClearMemoryRequest),
"core.grant_permission" => unary_core!(grant_permission, pb::GrantPermissionRequest),
"core.revoke_permission" => unary_core!(revoke_permission, pb::RevokePermissionRequest),
// Client-streaming call: the decoded items are sent as a request stream,
// so the metadata is attached by hand instead of via `grpc_request`.
"core.batch_insert" => {
let request_items = decode_batch_insert_payload(payload)?;
let mut request = Request::new(tokio_stream::iter(request_items));
self.attach_grpc_metadata(&mut request);
let mut client = pb::core_service_client::CoreServiceClient::new(channel.clone());
let response = client
.batch_insert(request)
.await
.map_err(map_grpc_status)?;
encode_grpc_response(op.key, response.into_inner())
}
"control.set_variable" => unary_control!(set_variable, pb::SetVariableRequest),
"control.get_variable" => unary_control!(get_variable, pb::GetVariableRequest),
"control.list_variables" => unary_control!(list_variables, pb::ListVariablesRequest),
"control.delete_variable" => unary_control!(delete_variable, pb::DeleteVariableRequest),
"control.define_concept" => unary_control!(define_concept, pb::DefineConceptRequest),
"control.list_concepts" => unary_control!(list_concepts, pb::ListConceptsRequest),
"control.add_goal" => unary_control!(add_goal, pb::AddGoalRequest),
"control.update_goal" => unary_control!(update_goal, pb::UpdateGoalRequest),
"control.list_goals" => unary_control!(list_goals, pb::ListGoalsRequest),
"control.get_goal_tree" => unary_control!(get_goal_tree, pb::GetGoalTreeRequest),
"control.submit_action" => unary_control!(submit_action, pb::ActionRequest),
"control.get_action_log" => unary_control!(get_action_log, pb::ActionLogRequest),
"control.run_cycle" => unary_control!(run_cycle, pb::RunCycleRequest),
"control.get_cycle_history" => {
unary_control!(get_cycle_history, pb::CycleHistoryRequest)
}
"control.register_agent" => unary_control!(register_agent, pb::AgentRegisterRequest),
"control.agent_heartbeat" => {
unary_control!(agent_heartbeat, pb::AgentHeartbeatRequest)
}
"control.append_activity" => unary_control!(append_activity, pb::ActivityAppendRequest),
// Note: the operation key and the RPC method name differ here.
"control.context_snapshot" => unary_control!(get_run_snapshot, pb::RunSnapshotRequest),
"control.link_run" => unary_control!(link_run, pb::LinkRunRequest),
"control.unlink_run" => unary_control!(unlink_run, pb::UnlinkRunRequest),
"control.ingest" => unary_control!(ingest, pb::IngestRequest),
"control.batch_insert" => {
unary_control!(batch_insert, pb::ControlBatchInsertRequest)
}
"control.get_ingest_job" => unary_control!(get_ingest_job, pb::GetIngestJobRequest),
"control.query" => unary_control!(query, pb::AgentQueryRequest),
"control.diagnose" => unary_control!(diagnose, pb::DiagnoseRequest),
"control.delete_run" => unary_control!(delete_run, pb::RunRequest),
"control.reflect" => unary_control!(reflect, pb::ReflectRequest),
"control.lessons" => unary_control!(list_lessons, pb::ListLessonsRequest),
"control.delete_lesson" => unary_control!(delete_lesson, pb::DeleteLessonRequest),
"control.context" => unary_control!(get_context, pb::ContextRequest),
"control.list_activity" => unary_control!(list_activity, pb::ListActivityRequest),
"control.export_activity" => {
unary_control!(export_activity, pb::ExportActivityRequest)
}
"control.archive_block" => unary_control!(archive_block, pb::ArchiveBlockRequest),
"control.dereference" => unary_control!(dereference, pb::DereferenceRequest),
"control.memory_health" => {
unary_control!(get_memory_health, pb::MemoryHealthRequest)
}
"control.checkpoint" => unary_control!(checkpoint, pb::CheckpointRequest),
"control.list_agents" => unary_control!(list_agents, pb::ListAgentsRequest),
"control.create_handoff" => unary_control!(create_handoff, pb::HandoffRequest),
"control.submit_feedback" => unary_control!(submit_feedback, pb::FeedbackRequest),
"control.record_outcome" => {
unary_control!(record_outcome, pb::RecordOutcomeRequest)
}
"control.surface_strategies" => {
unary_control!(surface_strategies, pb::SurfaceStrategiesRequest)
}
// This error kind is not fallback-eligible, so `Auto` mode does not try
// HTTP for keys that have a gRPC mapping but no arm above.
_ => Err(SdkError::UnsupportedFeatureError(format!(
"unknown gRPC operation: {}",
op.key
))),
}
}
/// Opens a server-streaming operation over gRPC and adapts it into a
/// `ValueStream` of JSON values.
///
/// Each message is converted with `serde_json::to_value`; if that conversion
/// fails the item becomes `{"error": "<message>"}` rather than an `Err`.
/// Stream-level statuses are mapped with `map_grpc_status`.
///
/// # Errors
/// Returns `UnsupportedFeatureError` for keys other than the three handled
/// below, and otherwise connection, decoding or mapped status errors.
//
// NOTE(review): unlike `invoke_grpc`, there is no `op.grpc_method.is_empty()`
// guard here, and the unsupported-key error is not fallback-eligible, so in
// `Auto` mode a streaming operation without an arm below would not fall back
// to HTTP. Confirm whether that is intended.
async fn invoke_grpc_stream(
&self,
op: &'static OperationSpec,
payload: Value,
) -> Result<ValueStream> {
use crate::proto::mubit::v1 as pb;
let channel = self.grpc_channel().await?;
match op.key {
"core.subscribe_events" => {
let request: pb::CoreSubscribeRequest =
decode_grpc_request(op.key, payload)?;
let mut client =
pb::core_service_client::CoreServiceClient::new(channel.clone());
let response = client
.subscribe(self.grpc_request(request))
.await
.map_err(map_grpc_status)?;
let stream = response.into_inner();
Ok(Box::pin(stream.map(|result| {
result
.map(|msg| {
let mut value =
serde_json::to_value(msg).unwrap_or_else(|e| {
serde_json::json!({"error": e.to_string()})
});
// Only this operation post-processes events before yielding them.
hydrate_pubsub_event(&mut value);
value
})
.map_err(map_grpc_status)
})))
}
"control.subscribe" => {
let request: pb::SubscribeRequest =
decode_grpc_request(op.key, payload)?;
let mut client =
pb::control_service_client::ControlServiceClient::new(channel.clone());
let response = client
.subscribe(self.grpc_request(request))
.await
.map_err(map_grpc_status)?;
let stream = response.into_inner();
Ok(Box::pin(stream.map(|result| {
result
.map(|msg| {
serde_json::to_value(msg).unwrap_or_else(|e| {
serde_json::json!({"error": e.to_string()})
})
})
.map_err(map_grpc_status)
})))
}
"core.watch_memory" => {
let request: pb::WatchMemoryRequest =
decode_grpc_request(op.key, payload)?;
let mut client =
pb::core_service_client::CoreServiceClient::new(channel.clone());
let response = client
.watch_memory(self.grpc_request(request))
.await
.map_err(map_grpc_status)?;
let stream = response.into_inner();
Ok(Box::pin(stream.map(|result| {
result
.map(|msg| {
serde_json::to_value(msg).unwrap_or_else(|e| {
serde_json::json!({"error": e.to_string()})
})
})
.map_err(map_grpc_status)
})))
}
_ => Err(SdkError::UnsupportedFeatureError(format!(
"gRPC streaming not supported for {}",
op.key
))),
}
}
/// Opens a streaming operation over HTTP and parses the response body as a
/// server-sent-event stream of JSON values.
///
/// GET operations send every payload field as a query parameter (strings
/// verbatim, other values as their JSON text); all other operations are sent
/// as a POST with the payload as JSON body. Events of
/// `core.subscribe_events` are post-processed with `hydrate_pubsub_event`.
///
/// # Errors
/// Returns the mapped transport error when the request cannot be sent, or the
/// mapped HTTP error for a non-success status.
async fn invoke_http_stream(
    &self,
    op: &'static OperationSpec,
    payload: Value,
) -> Result<ValueStream> {
    let url = format!(
        "{}{}",
        self.http_endpoint.trim_end_matches('/'),
        op.http_path
    );

    let mut request = match op.http_method {
        HttpMethod::Get => {
            let mut builder = self.http_client.get(&url);
            if let Value::Object(fields) = &payload {
                for (name, raw) in fields {
                    let rendered = match raw.as_str() {
                        Some(text) => text.to_owned(),
                        None => raw.to_string(),
                    };
                    builder = builder.query(&[(name.as_str(), rendered)]);
                }
            }
            builder
        }
        _ => self.http_client.post(&url).json(&payload),
    };
    if let Some(api_key) = self.api_key() {
        request = request.bearer_auth(api_key);
    }

    let response = match request.send().await {
        Ok(response) => response,
        Err(e) => {
            return Err(map_transport_error(
                e,
                format!("{} SSE request failed", op.key),
            ))
        }
    };
    let status = response.status();
    if !status.is_success() {
        let body = match response.text().await {
            Ok(text) => text,
            Err(_) => "request failed".to_string(),
        };
        return Err(map_http_error(status.as_u16(), body));
    }

    let events = parse_sse_byte_stream(response.bytes_stream());
    if op.key != "core.subscribe_events" {
        return Ok(Box::pin(events));
    }
    let hydrated = events.map(|item| {
        item.map(|mut value| {
            hydrate_pubsub_event(&mut value);
            value
        })
    });
    Ok(Box::pin(hydrated))
}
/// Executes a unary operation over HTTP and returns the response as JSON.
///
/// Payload fields whose name appears as a `:name` marker in the operation's
/// path template are substituted into the path. For GET the remaining
/// non-null fields become query parameters; for other methods a non-empty
/// payload is sent as the JSON body.
///
/// An empty response body yields `{}`; a body that is not declared as
/// `application/json` is returned as a JSON string.
///
/// # Errors
/// Returns `ValidationError` for an unrenderable or missing path parameter,
/// the mapped transport/HTTP error for request failures and non-success
/// statuses, a `TransportError` of kind `Io` when the body cannot be read, and
/// `ServerError` when a JSON body fails to decode.
async fn invoke_http(&self, op: &OperationSpec, payload: Value) -> Result<Value> {
let mut path = op.http_path.to_string();
// Fields used in the path are remembered so they are not repeated in the
// query string.
let mut consumed_keys = HashSet::new();
if let Some(map) = payload.as_object() {
for (key, value) in map {
let marker = format!(":{}", key);
// NOTE(review): this is a substring match, so a key that is a prefix of
// another marker's name (e.g. `run` vs `:run_id`) would also match.
if path.contains(&marker) {
let rendered = value_to_param(value).ok_or_else(|| {
SdkError::ValidationError(format!(
"invalid path parameter value for {} in {}",
key, op.key
))
})?;
path = path.replace(&marker, &rendered);
consumed_keys.insert(key.clone());
}
}
}
// Any ':' left over is treated as an unfilled marker.
if path.contains(':') {
return Err(SdkError::ValidationError(format!(
"missing path parameter for {}",
op.key
)));
}
let url = format!("{}{}", self.http_endpoint.trim_end_matches('/'), path);
let mut request = match op.http_method {
HttpMethod::Get => self.http_client.get(&url),
HttpMethod::Post => self.http_client.post(&url),
HttpMethod::Delete => self.http_client.delete(&url),
};
if let Some(api_key) = self.api_key() {
request = request.bearer_auth(api_key);
}
if matches!(op.http_method, HttpMethod::Get) {
// GET: send remaining fields as query parameters, skipping nulls and
// values that `value_to_query` declines to render.
let query = payload
.as_object()
.map(|map| {
map.iter()
.filter_map(|(key, value)| {
if consumed_keys.contains(key) || value.is_null() {
return None;
}
value_to_query(value).map(|rendered| (key.clone(), rendered))
})
.collect::<Vec<(String, String)>>()
})
.unwrap_or_default();
if !query.is_empty() {
request = request.query(&query);
}
} else if payload
.as_object()
.map(|map| !map.is_empty())
.unwrap_or(false)
{
// Non-GET: the whole payload, including path-consumed fields, is the body.
request = request.json(&payload);
}
let response = request.send().await.map_err(|e| {
map_transport_error(
e,
format!(
"{} {} request failed",
http_method_label(op.http_method),
op.key
),
)
})?;
let status = response.status();
if !status.is_success() {
let body = response
.text()
.await
.unwrap_or_else(|_| "request failed".to_string());
return Err(map_http_error(status.as_u16(), body));
}
// Captured before the body is consumed, because `bytes()` takes the response.
let content_type = response
.headers()
.get(CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.map(|v| v.to_lowercase())
.unwrap_or_default();
let bytes = response
.bytes()
.await
.map_err(|e| SdkError::TransportError {
kind: TransportFailureKind::Io,
message: format!("failed to read response body for {}: {}", op.key, e),
})?;
if bytes.is_empty() {
return Ok(json!({}));
}
if content_type.contains("application/json") {
return serde_json::from_slice::<Value>(&bytes).map_err(|e| {
SdkError::ServerError(format!(
"failed to decode json response for {}: {}",
op.key, e
))
});
}
// Anything else is passed through as text (invalid UTF-8 is replaced).
Ok(Value::String(String::from_utf8_lossy(&bytes).to_string()))
}
}
/// SDK entry point. The sub-clients and the client itself share one
/// transport engine, so cloning is cheap and setters affect every clone.
#[derive(Clone)]
pub struct Client {
    pub auth: AuthClient,
    pub core: CoreClient,
    pub(crate) control: ControlClient,
    transport: Arc<TransportEngine>,
}

impl Client {
    /// Builds a client from `config`.
    ///
    /// # Errors
    /// Returns the error from constructing the transport engine.
    pub fn new(config: ClientConfig) -> Result<Self> {
        let engine = TransportEngine::new(config).map(Arc::new)?;
        let auth = AuthClient {
            transport: Arc::clone(&engine),
        };
        let core = CoreClient {
            transport: Arc::clone(&engine),
        };
        let control = ControlClient {
            transport: Arc::clone(&engine),
        };
        Ok(Self {
            auth,
            core,
            control,
            transport: engine,
        })
    }

    /// Replaces (or clears, with `None`) the API key used for requests.
    pub fn set_api_key(&self, api_key: Option<String>) {
        self.transport.set_api_key(api_key);
    }

    /// Alias for [`Client::set_api_key`]; the token is stored as the API key.
    pub fn set_token(&self, token: Option<String>) {
        self.set_api_key(token);
    }

    /// Replaces (or clears, with `None`) the default run id.
    pub fn set_run_id(&self, run_id: Option<String>) {
        self.transport.set_run_id(run_id);
    }

    /// Switches the transport mode for subsequent requests.
    pub fn set_transport(&self, transport: TransportMode) {
        self.transport.set_transport(transport);
    }
}
/// Options describing a piece of content to remember.
///
/// Construct with [`RememberOptions::new`] to get the defaults, then adjust
/// the public fields as needed.
#[derive(Clone, Debug)]
pub struct RememberOptions {
    pub run_id: Option<String>,
    pub agent_id: Option<String>,
    pub item_id: Option<String>,
    pub content: String,
    pub content_type: String,
    pub metadata: Option<Value>,
    pub hints: Option<Value>,
    pub payload: Option<Value>,
    pub intent: Option<String>,
    pub lesson_type: Option<String>,
    pub lesson_scope: Option<String>,
    pub lesson_importance: Option<String>,
    pub lesson_conditions: Vec<String>,
    pub user_id: Option<String>,
    pub upsert_key: Option<String>,
    pub importance: Option<String>,
    pub source: Option<String>,
    pub lane: Option<String>,
    pub parallel: bool,
    pub idempotency_key: Option<String>,
    pub wait: bool,
    pub timeout_ms: Option<u64>,
    pub poll_interval_ms: u64,
    pub occurrence_time: Option<i64>,
}

impl RememberOptions {
    /// Creates options for `content` with the SDK defaults: plain-text
    /// content, agent id `sdk-client`, source `agent`, waiting enabled with a
    /// 300 ms poll interval, and everything else unset.
    pub fn new(content: impl Into<String>) -> Self {
        Self {
            // What is being remembered.
            content: content.into(),
            content_type: String::from("text/plain"),
            // Defaults that are set rather than left empty.
            agent_id: Some(String::from("sdk-client")),
            source: Some(String::from("agent")),
            wait: true,
            poll_interval_ms: 300,
            parallel: false,
            // Everything else starts unset.
            run_id: None,
            item_id: None,
            metadata: None,
            hints: None,
            payload: None,
            intent: None,
            lesson_type: None,
            lesson_scope: None,
            lesson_importance: None,
            lesson_conditions: vec![],
            user_id: None,
            upsert_key: None,
            importance: None,
            lane: None,
            idempotency_key: None,
            timeout_ms: None,
            occurrence_time: None,
        }
    }
}
/// Lesson-related fields, grouped so `RememberBuilder::lesson` can set the
/// four `lesson_*` options in one call.
#[derive(Clone, Debug, Default)]
pub struct LessonMeta {
pub lesson_type: Option<String>,
pub lesson_scope: Option<String>,
pub lesson_importance: Option<String>,
pub lesson_conditions: Vec<String>,
}
/// Run/agent/user identifiers, grouped so the builders' `session` methods can
/// set them in one call.
#[derive(Clone, Debug, Default)]
pub struct SessionScope {
pub run_id: Option<String>,
pub agent_id: Option<String>,
pub user_id: Option<String>,
}
pub struct RememberBuilder {
options: RememberOptions,
}
impl RememberBuilder {
pub fn new(content: impl Into<String>) -> Self {
Self {
options: RememberOptions::new(content),
}
}
pub fn lesson(mut self, meta: LessonMeta) -> Self {
self.options.lesson_type = meta.lesson_type;
self.options.lesson_scope = meta.lesson_scope;
self.options.lesson_importance = meta.lesson_importance;
self.options.lesson_conditions = meta.lesson_conditions;
self
}
pub fn session(mut self, scope: SessionScope) -> Self {
self.options.run_id = scope.run_id;
self.options.agent_id = scope.agent_id.or(self.options.agent_id);
self.options.user_id = scope.user_id;
self
}
pub fn metadata(mut self, metadata: Value) -> Self {
self.options.metadata = Some(metadata);
self
}
pub fn hints(mut self, hints: Value) -> Self {
self.options.hints = Some(hints);
self
}
pub fn payload(mut self, payload: Value) -> Self {
self.options.payload = Some(payload);
self
}
pub fn upsert(mut self, key: impl Into<String>) -> Self {
self.options.upsert_key = Some(key.into());
self
}
pub fn intent(mut self, intent: impl Into<String>) -> Self {
self.options.intent = Some(intent.into());
self
}
pub fn lane(mut self, lane: impl Into<String>) -> Self {
self.options.lane = Some(lane.into());
self
}
pub fn source(mut self, source: impl Into<String>) -> Self {
self.options.source = Some(source.into());
self
}
pub fn importance(mut self, importance: impl Into<String>) -> Self {
self.options.importance = Some(importance.into());
self
}
pub fn occurrence_time(mut self, ts_seconds: i64) -> Self {
self.options.occurrence_time = Some(ts_seconds);
self
}
pub fn idempotency_key(mut self, key: impl Into<String>) -> Self {
self.options.idempotency_key = Some(key.into());
self
}
pub fn wait(mut self, wait: bool) -> Self {
self.options.wait = wait;
self
}
pub fn timeout_ms(mut self, timeout_ms: u64) -> Self {
self.options.timeout_ms = Some(timeout_ms);
self
}
pub fn build(self) -> RememberOptions {
self.options
}
}
/// Options for a recall query.
///
/// Construct with [`RecallOptions::new`] to get the defaults, then adjust the
/// public fields as needed.
#[derive(Clone, Debug)]
pub struct RecallOptions {
    pub run_id: Option<String>,
    pub query: String,
    pub schema: Option<String>,
    pub mode: String,
    pub direct_lane: String,
    pub include_linked_runs: bool,
    pub limit: u64,
    pub embedding: Vec<f32>,
    pub entry_types: Vec<String>,
    pub include_working_memory: bool,
    pub user_id: Option<String>,
    pub agent_id: Option<String>,
    pub lane: Option<String>,
    pub min_timestamp: Option<i64>,
    pub max_timestamp: Option<i64>,
    pub budget: Option<String>,
    pub rank_by: Option<String>,
    pub explain: Option<bool>,
    pub prefer_current_run: Option<bool>,
}

impl RecallOptions {
    /// Creates options for `query` with the SDK defaults: mode
    /// `AGENT_ROUTED`, direct lane `SEMANTIC_SEARCH`, limit 5, working memory
    /// included, linked runs excluded, and everything else unset or empty.
    pub fn new(query: impl Into<String>) -> Self {
        Self {
            // The query and the defaults that carry a value.
            query: query.into(),
            mode: String::from("AGENT_ROUTED"),
            direct_lane: String::from("SEMANTIC_SEARCH"),
            limit: 5,
            include_working_memory: true,
            include_linked_runs: false,
            // Empty collections.
            embedding: vec![],
            entry_types: vec![],
            // Unset filters and scoping.
            run_id: None,
            schema: None,
            user_id: None,
            agent_id: None,
            lane: None,
            min_timestamp: None,
            max_timestamp: None,
            budget: None,
            rank_by: None,
            explain: None,
            prefer_current_run: None,
        }
    }
}
/// Options for fetching context.
///
/// `Default` leaves everything unset or empty, except that working memory is
/// included.
#[derive(Clone, Debug)]
pub struct GetContextOptions {
    pub run_id: Option<String>,
    pub query: Option<String>,
    pub user_id: Option<String>,
    pub entry_types: Vec<String>,
    pub include_working_memory: bool,
    pub format: Option<String>,
    pub limit: Option<u64>,
    pub max_token_budget: Option<u32>,
    pub agent_id: Option<String>,
    pub mode: Option<String>,
    pub sections: Vec<String>,
    pub lane: Option<String>,
}

impl Default for GetContextOptions {
    fn default() -> Self {
        Self {
            // The only default that differs from the type's zero value; this
            // is why `Default` is written by hand instead of derived.
            include_working_memory: true,
            entry_types: vec![],
            sections: vec![],
            run_id: None,
            query: None,
            user_id: None,
            format: None,
            limit: None,
            max_token_budget: None,
            agent_id: None,
            mode: None,
            lane: None,
        }
    }
}
/// Options describing an artifact to archive.
///
/// Construct with [`ArchiveOptions::new`]; only the content and the artifact
/// kind are required.
#[derive(Clone, Debug)]
pub struct ArchiveOptions {
    pub run_id: Option<String>,
    pub content: String,
    pub artifact_kind: String,
    pub metadata: Option<Value>,
    pub user_id: Option<String>,
    pub agent_id: Option<String>,
    pub origin_agent_id: Option<String>,
    pub source_attempt_id: Option<String>,
    pub source_tool: Option<String>,
    pub labels: Vec<String>,
    pub family: Option<String>,
    pub importance: Option<String>,
}

impl ArchiveOptions {
    /// Creates options for `content` of the given `artifact_kind`, with no
    /// labels and every optional field unset.
    pub fn new(content: impl Into<String>, artifact_kind: impl Into<String>) -> Self {
        let content: String = content.into();
        let artifact_kind: String = artifact_kind.into();
        Self {
            content,
            artifact_kind,
            labels: vec![],
            run_id: None,
            metadata: None,
            user_id: None,
            agent_id: None,
            origin_agent_id: None,
            source_attempt_id: None,
            source_tool: None,
            family: None,
            importance: None,
        }
    }
}
/// Provenance fields of an archived artifact, grouped so
/// `ArchiveBuilder::provenance` can set them in one call.
#[derive(Clone, Debug, Default)]
pub struct ArtifactProvenance {
pub origin_agent_id: Option<String>,
pub source_attempt_id: Option<String>,
pub source_tool: Option<String>,
pub labels: Vec<String>,
pub family: Option<String>,
}
pub struct ArchiveBuilder {
options: ArchiveOptions,
}
impl ArchiveBuilder {
pub fn new(content: impl Into<String>, artifact_kind: impl Into<String>) -> Self {
Self {
options: ArchiveOptions::new(content, artifact_kind),
}
}
pub fn session(mut self, scope: SessionScope) -> Self {
self.options.run_id = scope.run_id;
self.options.agent_id = scope.agent_id;
self.options.user_id = scope.user_id;
self
}
pub fn provenance(mut self, prov: ArtifactProvenance) -> Self {
self.options.origin_agent_id = prov.origin_agent_id;
self.options.source_attempt_id = prov.source_attempt_id;
self.options.source_tool = prov.source_tool;
self.options.labels = prov.labels;
self.options.family = prov.family;
self
}
pub fn metadata(mut self, metadata: Value) -> Self {
self.options.metadata = Some(metadata);
self
}
pub fn importance(mut self, importance: impl Into<String>) -> Self {
self.options.importance = Some(importance.into());
self
}
pub fn label(mut self, label: impl Into<String>) -> Self {
self.options.labels.push(label.into());
self
}
pub fn build(self) -> ArchiveOptions {
self.options
}
}
/// Options for [`Client::dereference`].
#[derive(Clone, Debug)]
pub struct DereferenceOptions {
    /// Explicit run id; the client's default run id is used when `None`.
    pub run_id: Option<String>,
    /// Reference to resolve. `dereference` rejects a blank value.
    pub reference_id: String,
    /// Forwarded as `user_id` when set.
    pub user_id: Option<String>,
    /// Forwarded as `agent_id` when set.
    pub agent_id: Option<String>,
}

impl DereferenceOptions {
    /// Creates options for `reference_id` with every optional field unset.
    pub fn new(reference_id: impl Into<String>) -> Self {
        let reference_id = reference_id.into();
        DereferenceOptions {
            reference_id,
            run_id: None,
            user_id: None,
            agent_id: None,
        }
    }
}
/// Options for [`Client::memory_health`].
#[derive(Clone, Debug)]
pub struct MemoryHealthOptions {
    /// Explicit run id; the client's default run id is used when `None`.
    pub run_id: Option<String>,
    /// Forwarded as `user_id` when set.
    pub user_id: Option<String>,
    /// Forwarded as `stale_threshold_days`; defaults to `30`.
    pub stale_threshold_days: u32,
    /// Forwarded as `limit`; defaults to `500`.
    pub limit: u32,
}

impl Default for MemoryHealthOptions {
    /// No run or user id, a 30-day stale threshold and a limit of 500.
    fn default() -> Self {
        const DEFAULT_STALE_THRESHOLD_DAYS: u32 = 30;
        const DEFAULT_LIMIT: u32 = 500;
        MemoryHealthOptions {
            stale_threshold_days: DEFAULT_STALE_THRESHOLD_DAYS,
            limit: DEFAULT_LIMIT,
            run_id: None,
            user_id: None,
        }
    }
}
/// Options for [`Client::diagnose`].
#[derive(Clone, Debug)]
pub struct DiagnoseOptions {
    /// Explicit run id; the client's default run id is used when `None`.
    pub run_id: Option<String>,
    /// Error text to diagnose. `diagnose` rejects a blank value.
    pub error_text: String,
    /// Forwarded as `error_type` when set.
    pub error_type: Option<String>,
    /// Forwarded as `limit`; `new` sets it to `10`.
    pub limit: u64,
    /// Forwarded as `user_id` when set.
    pub user_id: Option<String>,
}

impl DiagnoseOptions {
    /// Creates options for `error_text` with a limit of 10 and no other fields set.
    pub fn new(error_text: impl Into<String>) -> Self {
        let error_text = error_text.into();
        DiagnoseOptions {
            error_text,
            limit: 10,
            run_id: None,
            error_type: None,
            user_id: None,
        }
    }
}
/// Options for [`Client::reflect`]. The derived `Default` leaves every field
/// unset (`include_linked_runs` is `false`).
#[derive(Clone, Debug, Default)]
pub struct ReflectOptions {
/// Explicit run id; the client's default run id is used when `None`.
pub run_id: Option<String>,
/// Forwarded as `include_linked_runs`.
pub include_linked_runs: bool,
/// Forwarded as `user_id` when set.
pub user_id: Option<String>,
/// Forwarded as `step_id` when set.
pub step_id: Option<String>,
/// Forwarded as `checkpoint_id` when set.
pub checkpoint_id: Option<String>,
/// Forwarded as `last_n_items` when set.
pub last_n_items: Option<u64>,
/// Forwarded as `include_step_outcomes` when set.
pub include_step_outcomes: Option<bool>,
}
/// Options for [`Client::forget`], which deletes either a run or a single lesson.
#[derive(Clone, Debug, Default)]
pub struct ForgetOptions {
    /// Run to delete.
    pub run_id: Option<String>,
    /// Lesson to delete.
    pub lesson_id: Option<String>,
}

impl ForgetOptions {
    /// Targets a run; `lesson_id` stays unset.
    pub fn for_run(run_id: impl Into<String>) -> Self {
        Self {
            run_id: Some(run_id.into()),
            ..Self::default()
        }
    }

    /// Targets a lesson; `run_id` stays unset.
    pub fn for_lesson(lesson_id: impl Into<String>) -> Self {
        Self {
            lesson_id: Some(lesson_id.into()),
            ..Self::default()
        }
    }
}
/// Options for [`Client::checkpoint`].
#[derive(Clone, Debug)]
pub struct CheckpointOptions {
    /// Explicit run id; the client's default run id is used when `None`.
    pub run_id: Option<String>,
    /// Forwarded as `label` when set.
    pub label: Option<String>,
    /// Snapshot text. `checkpoint` rejects a blank value.
    pub context_snapshot: String,
    /// Optional metadata, sent JSON-encoded as `metadata_json`.
    pub metadata: Option<Value>,
    /// Forwarded as `user_id` when set.
    pub user_id: Option<String>,
    /// Forwarded as `agent_id` when set.
    pub agent_id: Option<String>,
}

impl CheckpointOptions {
    /// Creates options for `context_snapshot` with every optional field unset.
    pub fn new(context_snapshot: impl Into<String>) -> Self {
        let context_snapshot = context_snapshot.into();
        CheckpointOptions {
            context_snapshot,
            run_id: None,
            label: None,
            metadata: None,
            user_id: None,
            agent_id: None,
        }
    }
}
/// Options for [`Client::register_agent`].
#[derive(Clone, Debug)]
pub struct RegisterAgentOptions {
    /// Explicit run id; the client's default run id is used when `None`.
    pub run_id: Option<String>,
    /// Agent to register. `register_agent` rejects a blank value.
    pub agent_id: String,
    /// Forwarded as `role`; empty by default.
    pub role: String,
    /// Forwarded as `capabilities` when non-empty.
    pub capabilities: Vec<String>,
    /// Forwarded as `status`; `new` sets it to `"active"`.
    pub status: String,
    /// Forwarded as `read_scopes` when non-empty.
    pub read_scopes: Vec<String>,
    /// Forwarded as `write_scopes` when non-empty.
    pub write_scopes: Vec<String>,
    /// Forwarded as `shared_memory_lanes` when non-empty.
    pub shared_memory_lanes: Vec<String>,
}

impl RegisterAgentOptions {
    /// Creates options for `agent_id` with status `"active"`, an empty role and
    /// empty lists.
    pub fn new(agent_id: impl Into<String>) -> Self {
        let agent_id = agent_id.into();
        let status = String::from("active");
        RegisterAgentOptions {
            agent_id,
            status,
            run_id: None,
            role: String::new(),
            capabilities: Vec::new(),
            read_scopes: Vec::new(),
            write_scopes: Vec::new(),
            shared_memory_lanes: Vec::new(),
        }
    }
}
/// Options for [`Client::list_agents`].
#[derive(Clone, Debug, Default)]
pub struct ListAgentsOptions {
/// Explicit run id; the client's default run id is used when `None`.
pub run_id: Option<String>,
}
/// Options for [`Client::record_outcome`].
#[derive(Clone, Debug)]
pub struct RecordOutcomeOptions {
    /// Explicit run id; the client's default run id is used when `None`.
    pub run_id: Option<String>,
    /// Reference the outcome applies to. `record_outcome` rejects a blank value.
    pub reference_id: String,
    /// Outcome label. `record_outcome` rejects a blank value.
    pub outcome: String,
    /// Forwarded as `signal`; `new` sets it to `0.0`.
    pub signal: f32,
    /// Forwarded as `rationale`; empty by default.
    pub rationale: String,
    /// Forwarded as `agent_id` when set.
    pub agent_id: Option<String>,
    /// Forwarded as `user_id` when set.
    pub user_id: Option<String>,
}

impl RecordOutcomeOptions {
    /// Creates options for `reference_id` / `outcome` with a zero signal, an empty
    /// rationale and no ids.
    pub fn new(reference_id: impl Into<String>, outcome: impl Into<String>) -> Self {
        let reference_id = reference_id.into();
        let outcome = outcome.into();
        RecordOutcomeOptions {
            reference_id,
            outcome,
            signal: 0.0,
            rationale: String::new(),
            run_id: None,
            agent_id: None,
            user_id: None,
        }
    }
}
/// Options for [`Client::optimize_prompt`].
#[derive(Clone, Debug, Default)]
pub struct OptimizePromptOptions {
    /// Agent whose prompt is optimized. `optimize_prompt` rejects a blank value.
    pub agent_id: String,
    /// Forwarded as `auto_activate`; `false` by default.
    pub auto_activate: bool,
    /// Optional run id; `optimize_prompt` falls back to the client's default run id.
    pub run_id: Option<String>,
    /// Forwarded as `project_id` when set.
    pub project_id: Option<String>,
}

impl OptimizePromptOptions {
    /// Creates options for `agent_id`; all other fields keep their `Default` values.
    pub fn new(agent_id: impl Into<String>) -> Self {
        Self {
            agent_id: agent_id.into(),
            ..Self::default()
        }
    }
}
/// Options for [`Client::optimize_skill`].
#[derive(Clone, Debug, Default)]
pub struct OptimizeSkillOptions {
    /// Skill to optimize. `optimize_skill` rejects a blank value.
    pub skill_id: String,
    /// Forwarded as `auto_activate`; `false` by default.
    pub auto_activate: bool,
    /// Forwarded as `project_id` when set.
    pub project_id: Option<String>,
}

impl OptimizeSkillOptions {
    /// Creates options for `skill_id`; all other fields keep their `Default` values.
    pub fn new(skill_id: impl Into<String>) -> Self {
        Self {
            skill_id: skill_id.into(),
            ..Self::default()
        }
    }
}
/// Options for [`Client::circuit_break`].
#[derive(Clone, Debug, Default)]
pub struct CircuitBreakOptions {
    /// Explicit run id; the client's default run id is used when `None`.
    pub run_id: Option<String>,
    /// Forwarded as `reason` when set.
    pub reason: Option<String>,
    /// Forwarded as `agent_id` when set.
    pub agent_id: Option<String>,
}

impl CircuitBreakOptions {
    /// Same as `CircuitBreakOptions::default()`: every field unset.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns the options with `reason` set, leaving the other fields untouched.
    pub fn with_reason(self, reason: impl Into<String>) -> Self {
        Self {
            reason: Some(reason.into()),
            ..self
        }
    }
}
/// Options for [`Client::record_step_outcome`].
#[derive(Clone, Debug)]
pub struct RecordStepOutcomeOptions {
    /// Explicit run id; the client's default run id is used when `None`.
    pub run_id: Option<String>,
    /// Step the outcome applies to. `record_step_outcome` rejects a blank value.
    pub step_id: String,
    /// Forwarded as `step_name` when set.
    pub step_name: Option<String>,
    /// Outcome label. `record_step_outcome` rejects a blank value.
    pub outcome: String,
    /// Forwarded as `signal`; `new` sets it to `0.0`.
    pub signal: f32,
    /// Forwarded as `rationale`; empty by default.
    pub rationale: String,
    /// Forwarded as `directive_hint` when set.
    pub directive_hint: Option<String>,
    /// Forwarded as `agent_id` when set.
    pub agent_id: Option<String>,
    /// Forwarded as `user_id` when set.
    pub user_id: Option<String>,
    /// Optional metadata, sent JSON-encoded as `metadata_json`.
    pub metadata: Option<Value>,
}

impl RecordStepOutcomeOptions {
    /// Creates options for `step_id` / `outcome` with a zero signal, an empty
    /// rationale and every optional field unset.
    pub fn new(step_id: impl Into<String>, outcome: impl Into<String>) -> Self {
        let step_id = step_id.into();
        let outcome = outcome.into();
        RecordStepOutcomeOptions {
            step_id,
            outcome,
            signal: 0.0,
            rationale: String::new(),
            run_id: None,
            step_name: None,
            directive_hint: None,
            agent_id: None,
            user_id: None,
            metadata: None,
        }
    }
}
/// Options for [`Client::surface_strategies`].
#[derive(Clone, Debug)]
pub struct SurfaceStrategiesOptions {
    /// Explicit run id; the client's default run id is used when `None`.
    pub run_id: Option<String>,
    /// Forwarded as `lesson_types` when non-empty.
    pub lesson_types: Vec<String>,
    /// Forwarded as `max_strategies`; defaults to `5`.
    pub max_strategies: u32,
    /// Forwarded as `user_id` when set.
    pub user_id: Option<String>,
}

impl Default for SurfaceStrategiesOptions {
    /// No ids, no lesson-type filter and `max_strategies` of 5.
    fn default() -> Self {
        const DEFAULT_MAX_STRATEGIES: u32 = 5;
        SurfaceStrategiesOptions {
            max_strategies: DEFAULT_MAX_STRATEGIES,
            lesson_types: Vec::new(),
            run_id: None,
            user_id: None,
        }
    }
}
/// Options for [`Client::handoff`].
#[derive(Clone, Debug)]
pub struct HandoffOptions {
    /// Explicit run id; the client's default run id is used when `None`.
    pub run_id: Option<String>,
    /// Task being handed off. `handoff` rejects a blank value.
    pub task_id: String,
    /// Sending agent. `handoff` rejects a blank value.
    pub from_agent_id: String,
    /// Receiving agent. `handoff` rejects a blank value.
    pub to_agent_id: String,
    /// Handoff content. `handoff` rejects a blank value.
    pub content: String,
    /// Forwarded as `requested_action`; `new` sets it to `"continue"`.
    pub requested_action: String,
    /// Optional metadata, sent JSON-encoded as `metadata_json`.
    pub metadata: Option<Value>,
    /// Forwarded as `user_id` when set.
    pub user_id: Option<String>,
}

impl HandoffOptions {
    /// Creates options from the four required fields, with `requested_action` set
    /// to `"continue"` and every optional field unset.
    pub fn new(
        task_id: impl Into<String>,
        from_agent_id: impl Into<String>,
        to_agent_id: impl Into<String>,
        content: impl Into<String>,
    ) -> Self {
        let task_id = task_id.into();
        let from_agent_id = from_agent_id.into();
        let to_agent_id = to_agent_id.into();
        let content = content.into();
        HandoffOptions {
            run_id: None,
            task_id,
            from_agent_id,
            to_agent_id,
            content,
            requested_action: String::from("continue"),
            metadata: None,
            user_id: None,
        }
    }
}
/// Options for [`Client::feedback`].
#[derive(Clone, Debug)]
pub struct FeedbackOptions {
    /// Explicit run id; the client's default run id is used when `None`.
    pub run_id: Option<String>,
    /// Handoff the feedback refers to. `feedback` rejects a blank value.
    pub handoff_id: String,
    /// Verdict text. `feedback` rejects a blank value.
    pub verdict: String,
    /// Forwarded as `comments`; empty by default.
    pub comments: String,
    /// Forwarded as `from_agent_id` when set.
    pub from_agent_id: Option<String>,
    /// Optional metadata, sent JSON-encoded as `metadata_json`.
    pub metadata: Option<Value>,
    /// Forwarded as `user_id` when set.
    pub user_id: Option<String>,
}

impl FeedbackOptions {
    /// Creates options for `handoff_id` / `verdict` with empty comments and every
    /// optional field unset.
    pub fn new(handoff_id: impl Into<String>, verdict: impl Into<String>) -> Self {
        let handoff_id = handoff_id.into();
        let verdict = verdict.into();
        FeedbackOptions {
            handoff_id,
            verdict,
            comments: String::new(),
            run_id: None,
            from_agent_id: None,
            metadata: None,
            user_id: None,
        }
    }
}
impl Client {
pub async fn remember(&self, options: RememberOptions) -> Result<Value> {
let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "remember")?;
let content = require_non_empty_string(options.content, "content")?;
let item_id = options
.item_id
.unwrap_or_else(|| generate_helper_id("remember"));
let accepted = self
.control
.ingest(prune_nulls(json!({
"run_id": run_id,
"agent_id": options.agent_id.unwrap_or_else(|| "sdk-client".to_string()),
"idempotency_key": options.idempotency_key.unwrap_or_else(|| item_id.clone()),
"parallel": options.parallel,
"items": [{
"item_id": item_id,
"content_type": options.content_type,
"text": content,
"payload_json": encode_optional_json(options.payload.as_ref())?,
"hints_json": encode_optional_json(options.hints.as_ref())?,
"metadata_json": encode_optional_json(options.metadata.as_ref())?,
"intent": options.intent,
"lesson_type": options.lesson_type,
"lesson_scope": options.lesson_scope,
"lesson_importance": options.lesson_importance,
"lesson_conditions_json": encode_string_vec(&options.lesson_conditions)?,
"user_id": options.user_id,
"upsert_key": options.upsert_key,
"importance": options.importance,
"source": options.source.unwrap_or_else(|| "agent".to_string()),
"lane": options.lane,
"occurrence_time": options.occurrence_time.unwrap_or(0),
}],
})))
.await?;
if !options.wait {
return Ok(accepted);
}
let Some(job_id) = accepted.get("job_id").and_then(|value| value.as_str()) else {
return Ok(accepted);
};
self.wait_for_ingest_job(
&run_id,
job_id,
options
.timeout_ms
.unwrap_or_else(|| self.transport.timeout.as_millis() as u64),
options.poll_interval_ms,
)
.await
}
pub async fn recall(&self, options: RecallOptions) -> Result<Value> {
let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "recall")?;
self.control
.query(prune_nulls(json!({
"run_id": run_id,
"query": require_non_empty_string(options.query, "query")?,
"schema": options.schema,
"mode": options.mode,
"direct_lane": options.direct_lane,
"include_linked_runs": options.include_linked_runs,
"limit": options.limit,
"embedding": options.embedding,
"entry_types": if options.entry_types.is_empty() { Value::Null } else { json!(options.entry_types) },
"include_working_memory": options.include_working_memory,
"user_id": options.user_id,
"agent_id": options.agent_id,
"lane": options.lane,
"min_timestamp": options.min_timestamp.unwrap_or(0),
"max_timestamp": options.max_timestamp.unwrap_or(0),
"budget": options.budget,
"rank_by": options.rank_by,
"explain": options.explain,
"prefer_current_run": options.prefer_current_run,
})))
.await
}
pub async fn get_context(&self, options: GetContextOptions) -> Result<Value> {
let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "get_context")?;
self.control
.context(prune_nulls(json!({
"run_id": run_id,
"query": require_non_empty_string(options.query.unwrap_or_default(), "query")?,
"user_id": options.user_id,
"entry_types": if options.entry_types.is_empty() { Value::Null } else { json!(options.entry_types) },
"include_working_memory": options.include_working_memory,
"format": options.format.unwrap_or_else(|| "structured".to_string()),
"limit": options.limit.unwrap_or(5),
"max_token_budget": options.max_token_budget.unwrap_or(0),
"agent_id": options.agent_id,
"mode": options.mode.unwrap_or_else(|| "full".to_string()),
"sections": if options.sections.is_empty() { Value::Null } else { json!(options.sections) },
"lane": options.lane,
})))
.await
}
pub async fn archive(&self, options: ArchiveOptions) -> Result<Value> {
let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "archive")?;
let content = require_non_empty_string(options.content, "content")?;
let artifact_kind = require_non_empty_string(options.artifact_kind, "artifact_kind")?;
let agent_id = options.agent_id.clone();
self.control
.archive_block(prune_nulls(json!({
"run_id": run_id,
"content": content,
"artifact_kind": artifact_kind,
"metadata_json": encode_optional_json(options.metadata.as_ref())?,
"user_id": options.user_id,
"agent_id": agent_id.clone(),
"origin_agent_id": options.origin_agent_id.or(agent_id),
"source_attempt_id": options.source_attempt_id,
"source_tool": options.source_tool,
"labels": if options.labels.is_empty() { Value::Null } else { json!(options.labels) },
"family": options.family,
"importance": options.importance,
})))
.await
}
/// Alias for [`Client::archive`]; forwards `options` unchanged.
pub async fn archive_block(&self, options: ArchiveOptions) -> Result<Value> {
self.archive(options).await
}
pub async fn dereference(&self, options: DereferenceOptions) -> Result<Value> {
let run_id =
resolve_helper_run_id(options.run_id, self.transport.run_id(), "dereference")?;
self.control
.dereference(prune_nulls(json!({
"run_id": run_id,
"reference_id": require_non_empty_string(options.reference_id, "reference_id")?,
"user_id": options.user_id,
"agent_id": options.agent_id,
})))
.await
}
pub async fn memory_health(&self, options: MemoryHealthOptions) -> Result<Value> {
let run_id =
resolve_helper_run_id(options.run_id, self.transport.run_id(), "memory_health")?;
self.control
.memory_health(prune_nulls(json!({
"run_id": run_id,
"user_id": options.user_id,
"stale_threshold_days": options.stale_threshold_days,
"limit": options.limit,
})))
.await
}
pub async fn diagnose(&self, options: DiagnoseOptions) -> Result<Value> {
let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "diagnose")?;
self.control
.diagnose(prune_nulls(json!({
"run_id": run_id,
"error_text": require_non_empty_string(options.error_text, "error_text")?,
"error_type": options.error_type,
"limit": options.limit,
"user_id": options.user_id,
})))
.await
}
pub async fn reflect(&self, options: ReflectOptions) -> Result<Value> {
let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "reflect")?;
self.control
.reflect(prune_nulls(json!({
"run_id": run_id,
"include_linked_runs": options.include_linked_runs,
"user_id": options.user_id,
"step_id": options.step_id,
"checkpoint_id": options.checkpoint_id,
"last_n_items": options.last_n_items,
"include_step_outcomes": options.include_step_outcomes,
})))
.await
}
pub async fn forget(&self, options: ForgetOptions) -> Result<Value> {
let delete_lesson = options
.lesson_id
.as_ref()
.map(|value| !value.trim().is_empty())
.unwrap_or(false);
let run_id = if options.run_id.is_some() {
options.run_id
} else if delete_lesson {
None
} else {
self.transport.run_id()
};
let delete_run = run_id
.as_ref()
.map(|value| !value.trim().is_empty())
.unwrap_or(false);
if (delete_lesson as u8) + (delete_run as u8) != 1 {
return Err(SdkError::ValidationError(
"forget requires either lesson_id or run_id, but not both".to_string(),
));
}
if delete_lesson {
return self
.control
.delete_lesson(json!({ "lesson_id": options.lesson_id.unwrap_or_default() }))
.await;
}
self.control
.delete_run(json!({ "run_id": run_id.unwrap_or_default() }))
.await
}
pub async fn checkpoint(&self, options: CheckpointOptions) -> Result<Value> {
let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "checkpoint")?;
self.control
.checkpoint(prune_nulls(json!({
"run_id": run_id,
"label": options.label,
"context_snapshot": require_non_empty_string(options.context_snapshot, "context_snapshot")?,
"metadata_json": encode_optional_json(options.metadata.as_ref())?,
"user_id": options.user_id,
"agent_id": options.agent_id,
})))
.await
}
pub async fn register_agent(&self, options: RegisterAgentOptions) -> Result<Value> {
let run_id =
resolve_helper_run_id(options.run_id, self.transport.run_id(), "register_agent")?;
self.control
.register_agent(prune_nulls(json!({
"run_id": run_id,
"agent_id": require_non_empty_string(options.agent_id, "agent_id")?,
"role": options.role,
"capabilities": if options.capabilities.is_empty() { Value::Null } else { json!(options.capabilities) },
"status": options.status,
"read_scopes": if options.read_scopes.is_empty() { Value::Null } else { json!(options.read_scopes) },
"write_scopes": if options.write_scopes.is_empty() { Value::Null } else { json!(options.write_scopes) },
"shared_memory_lanes": if options.shared_memory_lanes.is_empty() { Value::Null } else { json!(options.shared_memory_lanes) },
})))
.await
}
pub async fn list_agents(&self, options: ListAgentsOptions) -> Result<Value> {
let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "list_agents")?;
self.control.list_agents(json!({ "run_id": run_id })).await
}
pub async fn record_outcome(&self, options: RecordOutcomeOptions) -> Result<Value> {
let run_id =
resolve_helper_run_id(options.run_id, self.transport.run_id(), "record_outcome")?;
self.control
.record_outcome(prune_nulls(json!({
"run_id": run_id,
"reference_id": require_non_empty_string(options.reference_id, "reference_id")?,
"outcome": require_non_empty_string(options.outcome, "outcome")?,
"signal": options.signal,
"rationale": options.rationale,
"agent_id": options.agent_id,
"user_id": options.user_id,
})))
.await
}
pub async fn optimize_prompt(&self, options: OptimizePromptOptions) -> Result<Value> {
let run_id = options
.run_id
.clone()
.or_else(|| self.transport.run_id())
.unwrap_or_default();
self.control
.optimize_prompt(prune_nulls(json!({
"agent_id": require_non_empty_string(options.agent_id, "agent_id")?,
"auto_activate": options.auto_activate,
"run_id": if run_id.is_empty() { None } else { Some(run_id) },
"project_id": options.project_id,
})))
.await
}
pub async fn optimize_skill(&self, options: OptimizeSkillOptions) -> Result<Value> {
self.control
.optimize_skill(prune_nulls(json!({
"skill_id": require_non_empty_string(options.skill_id, "skill_id")?,
"auto_activate": options.auto_activate,
"project_id": options.project_id,
})))
.await
}
pub async fn circuit_break(&self, options: CircuitBreakOptions) -> Result<Value> {
let run_id =
resolve_helper_run_id(options.run_id, self.transport.run_id(), "circuit_break")?;
self.control
.circuit_break(prune_nulls(json!({
"run_id": run_id,
"reason": options.reason,
"agent_id": options.agent_id,
})))
.await
}
/// Sends a `control.record_step_outcome` request for `options.step_id`.
///
/// The step-level payload (`step_id`, `step_name`, `directive_hint`, ...) goes to
/// the dedicated step-outcome operation rather than `control.record_outcome`,
/// which is keyed by `reference_id`.
///
/// # Errors
/// Returns a validation error for a missing run id, blank step id or blank
/// outcome, and propagates metadata-encoding and transport failures.
pub async fn record_step_outcome(&self, options: RecordStepOutcomeOptions) -> Result<Value> {
    let run_id =
        resolve_helper_run_id(options.run_id, self.transport.run_id(), "record_step_outcome")?;
    let request = prune_nulls(json!({
        "run_id": run_id,
        "step_id": require_non_empty_string(options.step_id, "step_id")?,
        "step_name": options.step_name,
        "outcome": require_non_empty_string(options.outcome, "outcome")?,
        "signal": options.signal,
        "rationale": options.rationale,
        "directive_hint": options.directive_hint,
        "agent_id": options.agent_id,
        "user_id": options.user_id,
        "metadata_json": encode_optional_json(options.metadata.as_ref())?,
    }));
    // Previously routed through `record_outcome`, which targets a different operation.
    self.control.record_step_outcome(request).await
}
pub async fn surface_strategies(&self, options: SurfaceStrategiesOptions) -> Result<Value> {
let run_id = resolve_helper_run_id(
options.run_id,
self.transport.run_id(),
"surface_strategies",
)?;
self.control
.surface_strategies(prune_nulls(json!({
"run_id": run_id,
"lesson_types": if options.lesson_types.is_empty() { Value::Null } else { json!(options.lesson_types) },
"max_strategies": options.max_strategies,
"user_id": options.user_id,
})))
.await
}
pub async fn handoff(&self, options: HandoffOptions) -> Result<Value> {
let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "handoff")?;
self.control
.create_handoff(prune_nulls(json!({
"run_id": run_id,
"task_id": require_non_empty_string(options.task_id, "task_id")?,
"from_agent_id": require_non_empty_string(options.from_agent_id, "from_agent_id")?,
"to_agent_id": require_non_empty_string(options.to_agent_id, "to_agent_id")?,
"content": require_non_empty_string(options.content, "content")?,
"requested_action": options.requested_action,
"metadata_json": encode_optional_json(options.metadata.as_ref())?,
"user_id": options.user_id,
})))
.await
}
pub async fn feedback(&self, options: FeedbackOptions) -> Result<Value> {
let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "feedback")?;
self.control
.submit_feedback(prune_nulls(json!({
"run_id": run_id,
"handoff_id": require_non_empty_string(options.handoff_id, "handoff_id")?,
"verdict": require_non_empty_string(options.verdict, "verdict")?,
"comments": options.comments,
"from_agent_id": options.from_agent_id,
"metadata_json": encode_optional_json(options.metadata.as_ref())?,
"user_id": options.user_id,
})))
.await
}
async fn wait_for_ingest_job(
&self,
run_id: &str,
job_id: &str,
timeout_ms: u64,
poll_interval_ms: u64,
) -> Result<Value> {
let deadline = Instant::now() + Duration::from_millis(timeout_ms);
loop {
let job = self
.control
.get_ingest_job(json!({ "run_id": run_id, "job_id": job_id }))
.await?;
if job
.get("done")
.and_then(|value| value.as_bool())
.unwrap_or(false)
{
return Ok(job);
}
if Instant::now() >= deadline {
return Err(SdkError::TransportError {
kind: TransportFailureKind::DeadlineExceeded,
message: format!("timed out waiting for ingest job {}", job_id),
});
}
sleep(Duration::from_millis(poll_interval_ms)).await;
}
}
}
/// Client for the `auth.*` operations; every call goes through the shared
/// transport engine.
#[derive(Clone)]
pub struct AuthClient {
transport: Arc<TransportEngine>,
}
impl AuthClient {
/// Invokes `auth.health` with an empty JSON object payload.
pub async fn health(&self) -> Result<Value> {
self.transport.invoke("auth.health", json!({})).await
}
/// Forwards `api_key` to the transport's `set_api_key`.
pub fn set_api_key(&self, api_key: Option<String>) {
self.transport.set_api_key(api_key);
}
/// Forwards `token` to [`AuthClient::set_api_key`].
// NOTE(review): tokens and API keys end up in the same transport setter here —
// confirm this aliasing is intended.
pub fn set_token(&self, token: Option<String>) {
self.set_api_key(token);
}
/// Forwards `run_id` to the transport's `set_run_id`.
pub fn set_run_id(&self, run_id: Option<String>) {
self.transport.set_run_id(run_id);
}
}
// Generates one `pub async fn <name><T: Serialize>(&self, payload: T) -> Result<Value>`
// per `name => "op.key"` pair on `AuthClient`; each method passes the payload to
// `transport.invoke_serialized` under the given operation key.
macro_rules! define_auth_payload_methods {
($($name:ident => $op_key:literal),+ $(,)?) => {
impl AuthClient {
$(
pub async fn $name<T: Serialize>(&self, payload: T) -> Result<Value> {
self.transport.invoke_serialized($op_key, payload).await
}
)+
}
};
}
// Payload-taking `auth.*` operations exposed on `AuthClient`.
define_auth_payload_methods!(
create_user => "auth.create_user",
rotate_user_api_key => "auth.rotate_user_api_key",
revoke_user_api_key => "auth.revoke_user_api_key",
list_users => "auth.list_users",
get_user => "auth.get_user",
delete_user => "auth.delete_user"
);
/// Client for the `core.*` operations; every call goes through the shared
/// transport engine.
#[derive(Clone)]
pub struct CoreClient {
transport: Arc<TransportEngine>,
}
// Generates one `pub async fn <name><T: Serialize>(&self, payload: T) -> Result<Value>`
// per `name => "op.key"` pair on `CoreClient`; each method passes the payload to
// `transport.invoke_serialized` under the given operation key.
macro_rules! define_core_payload_methods {
($($name:ident => $op_key:literal),+ $(,)?) => {
impl CoreClient {
$(
pub async fn $name<T: Serialize>(&self, payload: T) -> Result<Value> {
self.transport.invoke_serialized($op_key, payload).await
}
)+
}
};
}
// Payload-taking unary `core.*` operations exposed on `CoreClient`.
define_core_payload_methods!(
insert => "core.insert",
batch_insert => "core.batch_insert",
search => "core.search",
delete_node => "core.delete_node",
delete_run => "core.delete_run",
create_session => "core.create_session",
snapshot_session => "core.snapshot_session",
load_session => "core.load_session",
commit_session => "core.commit_session",
drop_session => "core.drop_session",
write_memory => "core.write_memory",
read_memory => "core.read_memory",
add_memory => "core.add_memory",
get_memory => "core.get_memory",
clear_memory => "core.clear_memory",
grant_permission => "core.grant_permission",
revoke_permission => "core.revoke_permission",
check_permission => "core.check_permission",
unsubscribe_events => "core.unsubscribe_events"
);
impl CoreClient {
/// Opens the `core.subscribe_events` stream for the serialized `payload`.
pub async fn subscribe_events<T: Serialize>(&self, payload: T) -> Result<ValueStream> {
self.transport.invoke_stream_serialized("core.subscribe_events", payload).await
}
/// Opens the `core.watch_memory` stream for the serialized `payload`.
pub async fn watch_memory<T: Serialize>(&self, payload: T) -> Result<ValueStream> {
self.transport.invoke_stream_serialized("core.watch_memory", payload).await
}
/// Invokes `core.list_subscriptions` with an empty JSON object payload.
pub async fn list_subscriptions(&self) -> Result<Value> {
self.transport
.invoke("core.list_subscriptions", json!({}))
.await
}
/// Invokes `core.storage_stats` with an empty JSON object payload.
pub async fn storage_stats(&self) -> Result<Value> {
self.transport.invoke("core.storage_stats", json!({})).await
}
/// Invokes `core.trigger_compaction` with an empty JSON object payload.
pub async fn trigger_compaction(&self) -> Result<Value> {
self.transport
.invoke("core.trigger_compaction", json!({}))
.await
}
}
/// Client for the `control.*` operations; every call goes through the shared
/// transport engine.
#[derive(Clone)]
pub struct ControlClient {
transport: Arc<TransportEngine>,
}
// Generates one `pub async fn <name><T: Serialize>(&self, payload: T) -> Result<Value>`
// per `name => "op.key"` pair on `ControlClient`; each method passes the payload to
// `transport.invoke_serialized` under the given operation key.
macro_rules! define_control_payload_methods {
($($name:ident => $op_key:literal),+ $(,)?) => {
impl ControlClient {
$(
pub async fn $name<T: Serialize>(&self, payload: T) -> Result<Value> {
self.transport.invoke_serialized($op_key, payload).await
}
)+
}
};
}
// Payload-taking unary `control.*` operations exposed on `ControlClient`.
// The streaming `control.subscribe` operation is defined separately.
define_control_payload_methods!(
register_agent => "control.register_agent",
agent_heartbeat => "control.agent_heartbeat",
context_snapshot => "control.context_snapshot",
link_run => "control.link_run",
unlink_run => "control.unlink_run",
ingest => "control.ingest",
batch_insert => "control.batch_insert",
get_ingest_job => "control.get_ingest_job",
get_run_ingest_stats => "control.get_run_ingest_stats",
query => "control.query",
diagnose => "control.diagnose",
delete_run => "control.delete_run",
reflect => "control.reflect",
lessons => "control.lessons",
delete_lesson => "control.delete_lesson",
context => "control.context",
archive_block => "control.archive_block",
dereference => "control.dereference",
memory_health => "control.memory_health",
checkpoint => "control.checkpoint",
list_agents => "control.list_agents",
create_handoff => "control.create_handoff",
submit_feedback => "control.submit_feedback",
circuit_break => "control.circuit_break",
record_outcome => "control.record_outcome",
record_step_outcome => "control.record_step_outcome",
surface_strategies => "control.surface_strategies",
create_session => "control.create_session",
get_session => "control.get_session",
close_session => "control.close_session",
set_prompt => "control.set_prompt",
get_prompt => "control.get_prompt",
list_prompt_versions => "control.list_prompt_versions",
activate_prompt_version => "control.activate_prompt_version",
optimize_prompt => "control.optimize_prompt",
get_prompt_diff => "control.get_prompt_diff",
create_project => "control.create_project",
get_project => "control.get_project",
list_projects => "control.list_projects",
update_project => "control.update_project",
delete_project => "control.delete_project",
create_agent_definition => "control.create_agent_definition",
get_agent_definition => "control.get_agent_definition",
list_agent_definitions => "control.list_agent_definitions",
update_agent_definition => "control.update_agent_definition",
delete_agent_definition => "control.delete_agent_definition",
list_run_history => "control.list_run_history",
get_run_history => "control.get_run_history",
create_skill => "control.create_skill",
get_skill => "control.get_skill",
list_skills => "control.list_skills",
update_skill => "control.update_skill",
delete_skill => "control.delete_skill",
list_skill_versions => "control.list_skill_versions",
activate_skill_version => "control.activate_skill_version",
optimize_skill => "control.optimize_skill",
get_skill_diff => "control.get_skill_diff",
);
// Generates, for each listed name, a `Client` method with the same name and
// signature that forwards the payload to `self.control.<name>(payload)`.
macro_rules! define_client_control_delegates {
($($name:ident),+ $(,)?) => {
impl Client {
$(
pub async fn $name<T: Serialize>(&self, payload: T) -> Result<Value> {
self.control.$name(payload).await
}
)+
}
};
}
// Raw pass-through delegates on `Client`. Control operations that already have a
// typed helper of the same name on `Client` (for example `register_agent`,
// `diagnose`, `reflect`, `archive_block`, `record_outcome`) are not listed here,
// since a second method with that name would conflict.
define_client_control_delegates!(
agent_heartbeat, context_snapshot,
link_run, unlink_run,
ingest, batch_insert, get_ingest_job, get_run_ingest_stats,
query,
delete_run,
lessons, delete_lesson,
context,
create_handoff, submit_feedback,
create_session, get_session, close_session,
set_prompt, get_prompt, list_prompt_versions,
activate_prompt_version, get_prompt_diff,
create_project, get_project, list_projects, update_project, delete_project,
create_agent_definition, get_agent_definition, list_agent_definitions,
update_agent_definition, delete_agent_definition,
list_run_history, get_run_history,
create_skill, get_skill, list_skills, update_skill, delete_skill,
list_skill_versions, activate_skill_version, get_skill_diff,
);
impl Client {
/// Opens the control subscription stream by delegating to
/// `ControlClient::subscribe` with the same payload.
pub async fn subscribe<T: Serialize>(&self, payload: T) -> Result<ValueStream> {
self.control.subscribe(payload).await
}
}
impl ControlClient {
/// Opens the `control.subscribe` stream for the serialized `payload`.
pub async fn subscribe<T: Serialize>(&self, payload: T) -> Result<ValueStream> {
self.transport.invoke_stream_serialized("control.subscribe", payload).await
}
}
/// Converts a raw HTTP byte stream into a stream of JSON values, one per line.
///
/// A background task reads chunks, splits them on `\n` and forwards each non-blank,
/// non-comment (`:`-prefixed) line through a bounded channel. A leading `"data: "`
/// prefix is stripped; the rest is parsed as JSON and falls back to a JSON string
/// when parsing fails. Any unterminated trailing line is emitted when the byte
/// stream ends. A read error is forwarded as an `Io` transport error and ends the
/// stream. The task stops early once the receiving side is dropped.
///
/// Bytes are buffered undecoded and only turned into text one complete line at a
/// time, so a multi-byte UTF-8 character split across two chunks is decoded intact
/// instead of becoming replacement characters.
fn parse_sse_byte_stream(
    byte_stream: impl Stream<Item = reqwest::Result<bytes::Bytes>> + Send + 'static,
) -> impl Stream<Item = Result<Value>> + Send {
    /// Decodes one raw line; returns `None` for blank lines and `:` comments.
    fn decode_sse_line(raw: &[u8]) -> Option<Value> {
        let text = String::from_utf8_lossy(raw);
        let line = text.trim();
        if line.is_empty() || line.starts_with(':') {
            return None;
        }
        let data = line.strip_prefix("data: ").unwrap_or(line);
        let value = serde_json::from_str::<Value>(data)
            .unwrap_or_else(|_| Value::String(data.to_string()));
        Some(value)
    }

    let (tx, rx) = tokio::sync::mpsc::channel::<Result<Value>>(64);
    tokio::spawn(async move {
        tokio::pin!(byte_stream);
        let mut buffer: Vec<u8> = Vec::new();
        while let Some(chunk_result) = byte_stream.next().await {
            match chunk_result {
                Ok(chunk) => {
                    buffer.extend_from_slice(&chunk);
                    // 0x0A never occurs inside a multi-byte UTF-8 sequence, so
                    // splitting on the raw byte is safe.
                    let mut consumed = 0;
                    loop {
                        let Some(offset) =
                            buffer[consumed..].iter().position(|byte| *byte == b'\n')
                        else {
                            break;
                        };
                        let line_end = consumed + offset;
                        let decoded = decode_sse_line(&buffer[consumed..line_end]);
                        consumed = line_end + 1;
                        if let Some(value) = decoded {
                            if tx.send(Ok(value)).await.is_err() {
                                return;
                            }
                        }
                    }
                    // Drop all complete lines in one pass, keeping the partial tail.
                    buffer.drain(..consumed);
                }
                Err(e) => {
                    let _ = tx
                        .send(Err(SdkError::TransportError {
                            kind: TransportFailureKind::Io,
                            message: e.to_string(),
                        }))
                        .await;
                    return;
                }
            }
        }
        // Flush a final line that was not terminated by a newline.
        if let Some(value) = decode_sse_line(&buffer) {
            let _ = tx.send(Ok(value)).await;
        }
    });
    tokio_stream::wrappers::ReceiverStream::new(rx)
}
/// Normalizes a pub/sub event object in place; non-object values are left untouched.
///
/// String-valued `metadata_json` / `entry_json` fields are removed and re-inserted
/// as `metadata` / `entry`, parsed as JSON when possible and kept as the raw string
/// otherwise. For the known event types the object is then trimmed to that type's
/// fields; events with any other (or no) `type` keep all remaining fields.
fn hydrate_pubsub_event(value: &mut Value) {
    let Some(event) = value.as_object_mut() else {
        return;
    };
    for (encoded_key, decoded_key) in [("metadata_json", "metadata"), ("entry_json", "entry")] {
        // A non-string value under the encoded key is removed and not re-inserted.
        if let Some(Value::String(raw)) = event.remove(encoded_key) {
            let decoded = match serde_json::from_str::<Value>(&raw) {
                Ok(parsed) => parsed,
                Err(_) => Value::String(raw),
            };
            event.insert(decoded_key.to_string(), decoded);
        }
    }
    let allowed: &[&str] = match event
        .get("type")
        .and_then(Value::as_str)
        .unwrap_or_default()
    {
        "subscribed" => &["type", "subscription_id"],
        "node.inserted" | "node.updated" => {
            &["type", "node_id", "run_id", "metadata", "created_at", "updated_at"]
        }
        "node.deleted" => &["type", "node_id"],
        "memory.added" => &["type", "session_id", "entry"],
        _ => return,
    };
    event.retain(|field, _| allowed.contains(&field.as_str()));
}
fn prune_nulls(value: Value) -> Value {
match value {
Value::Object(map) => Value::Object(
map.into_iter()
.filter_map(|(key, value)| {
let cleaned = prune_nulls(value);
if cleaned.is_null() {
None
} else {
Some((key, cleaned))
}
})
.collect::<Map<String, Value>>(),
),
Value::Array(items) => Value::Array(items.into_iter().map(prune_nulls).collect()),
other => other,
}
}
/// Picks the run id for a helper call: `explicit` when present, else `fallback`.
///
/// An explicit value always wins, even when blank; the fallback is not consulted
/// in that case.
///
/// # Errors
/// Returns a validation error naming `helper_name` when the chosen value is
/// missing or blank.
fn resolve_helper_run_id(
    explicit: Option<String>,
    fallback: Option<String>,
    helper_name: &str,
) -> Result<String> {
    match explicit.or(fallback) {
        Some(run_id) if !run_id.trim().is_empty() => Ok(run_id),
        _ => Err(SdkError::ValidationError(format!(
            "{} requires run_id or a client default run_id",
            helper_name
        ))),
    }
}
/// Returns `value` unchanged (not trimmed) when it contains non-whitespace text.
///
/// # Errors
/// Returns a validation error `"<field_name> is required"` for an empty or
/// whitespace-only value.
fn require_non_empty_string(value: String, field_name: &str) -> Result<String> {
    let has_text = !value.trim().is_empty();
    if has_text {
        return Ok(value);
    }
    Err(SdkError::ValidationError(format!(
        "{} is required",
        field_name
    )))
}
/// Serializes an optional JSON value to its string form; `None` becomes an empty
/// string.
///
/// # Errors
/// Returns a validation error when serialization fails.
fn encode_optional_json(value: Option<&Value>) -> Result<String> {
    let Some(present) = value else {
        return Ok(String::new());
    };
    serde_json::to_string(present).map_err(|err| {
        SdkError::ValidationError(format!("failed to serialize helper json field: {}", err))
    })
}
/// Serializes a list of strings as a JSON array string; an empty list becomes an
/// empty string.
///
/// # Errors
/// Returns a validation error when serialization fails.
fn encode_string_vec(values: &[String]) -> Result<String> {
    match values {
        [] => Ok(String::new()),
        listed => serde_json::to_string(listed).map_err(|err| {
            SdkError::ValidationError(format!("failed to serialize helper string list: {}", err))
        }),
    }
}
/// Builds a process-unique id of the form `<prefix>-<unix millis>-<sequence>`.
///
/// The millisecond timestamp alone collides whenever two ids are generated within
/// the same millisecond. Because the id is used as both the item id and the default
/// idempotency key, such a collision would make distinct items look like
/// duplicates. A process-wide counter is appended so every call yields a distinct id.
fn generate_helper_id(prefix: &str) -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Only uniqueness is needed, not ordering against other memory operations.
    static SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    format!("{}-{}-{}", prefix, millis, sequence)
}
/// Accepts only object payloads: `null` becomes an empty object and objects pass
/// through unchanged.
///
/// # Errors
/// Returns a validation error for any other JSON value.
fn ensure_object_payload(payload: Value) -> Result<Value> {
    if payload.is_null() {
        return Ok(json!({}));
    }
    if payload.is_object() {
        return Ok(payload);
    }
    Err(SdkError::ValidationError(
        "payload must serialize to a JSON object".to_string(),
    ))
}
/// Deserializes a JSON payload into the request type `T`.
///
/// # Errors
/// Returns a validation error naming `op_key` when the payload does not match `T`.
fn decode_grpc_request<T: DeserializeOwned>(op_key: &str, payload: Value) -> Result<T> {
    match serde_json::from_value(payload) {
        Ok(request) => Ok(request),
        Err(e) => Err(SdkError::ValidationError(format!(
            "invalid gRPC request payload for {}: {}",
            op_key, e
        ))),
    }
}
/// Converts a response value into a `serde_json::Value`.
///
/// # Errors
/// Returns `SdkError::ServerError` naming `op_key` when the response cannot be
/// serialized.
fn encode_grpc_response<T: Serialize>(op_key: &str, response: T) -> Result<Value> {
    match serde_json::to_value(response) {
        Ok(encoded) => Ok(encoded),
        Err(cause) => Err(SdkError::ServerError(format!(
            "failed to serialize gRPC response for {}: {}",
            op_key, cause
        ))),
    }
}
/// Decodes a batch-insert payload into one `InsertRequest` per item.
///
/// The payload is first normalized with `ensure_object_payload`. The first of the
/// keys `items`, `requests`, `nodes` that is present supplies the list; when none
/// is present the whole object is decoded as a single request.
///
/// The list is moved out of the owned payload, so items are decoded without being
/// cloned first.
///
/// # Errors
/// Returns `SdkError::ValidationError` when the payload is not an object, the
/// selected list is empty, the selected value is not an array, or any item fails
/// to decode.
fn decode_batch_insert_payload(
    payload: Value,
) -> Result<Vec<crate::proto::mubit::v1::InsertRequest>> {
    const OP_KEY: &str = "core.batch_insert";
    const LIST_KEYS: [&str; 3] = ["items", "requests", "nodes"];

    let mut payload = ensure_object_payload(payload)?;

    // Take ownership of the first list-bearing field. The payload object is only
    // needed afterwards when no such field exists, in which case nothing was removed.
    let extracted = payload
        .as_object_mut()
        .and_then(|fields| LIST_KEYS.iter().find_map(|key| fields.remove(*key)));

    match extracted {
        Some(Value::Array(items)) if items.is_empty() => Err(SdkError::ValidationError(
            "batch_insert gRPC payload cannot provide an empty items list".to_string(),
        )),
        Some(Value::Array(items)) => items
            .into_iter()
            .map(|item| decode_grpc_request(OP_KEY, item))
            .collect(),
        Some(_) => Err(SdkError::ValidationError(
            "batch_insert gRPC payload items/requests/nodes must be an array".to_string(),
        )),
        None => Ok(vec![decode_grpc_request(OP_KEY, payload)?]),
    }
}
/// Renders a scalar JSON value as text.
///
/// Strings are returned as-is (without quotes); numbers and booleans use their
/// `to_string` form. Null, arrays and objects yield `None`.
fn value_to_param(value: &Value) -> Option<String> {
    let rendered = match value {
        Value::String(text) => text.clone(),
        Value::Number(number) => number.to_string(),
        Value::Bool(flag) => flag.to_string(),
        _ => return None,
    };
    Some(rendered)
}
/// Renders a JSON value as text, additionally supporting arrays.
///
/// Scalars are rendered like this: strings as-is, numbers and booleans via
/// `to_string`. Arrays are rendered element by element (recursively), elements
/// that cannot be rendered are dropped, and the rest are joined with `,`.
/// Null, objects, and arrays with no renderable element yield `None`.
fn value_to_query(value: &Value) -> Option<String> {
    if let Value::Array(entries) = value {
        let mut parts: Vec<String> = Vec::new();
        for entry in entries {
            if let Some(text) = value_to_query(entry) {
                parts.push(text);
            }
        }
        return if parts.is_empty() {
            None
        } else {
            Some(parts.join(","))
        };
    }

    match value {
        Value::String(text) => Some(text.clone()),
        Value::Number(number) => Some(number.to_string()),
        Value::Bool(flag) => Some(flag.to_string()),
        _ => None,
    }
}
/// Derives both transport endpoints from a single endpoint string.
///
/// An input without `://` is parsed as if prefixed with `http://`. Only scheme,
/// host and port of the parsed URL are used.
///
/// Returns `(http_endpoint, grpc_endpoint, grpc_tls)`:
/// * `http_endpoint` is `scheme://host`, with `:port` appended unless the port is
///   443 for `https` or 80 for any other scheme;
/// * `grpc_endpoint` is `host:port`;
/// * `grpc_tls` is `true` when the scheme is `https`.
///
/// # Errors
/// Returns `SdkError::ValidationError` when the URL fails to parse, has no host,
/// or has neither an explicit port nor a known default for its scheme.
fn derive_http_and_grpc(endpoint: &str) -> Result<(String, String, bool)> {
    let has_scheme = endpoint.contains("://");
    let candidate = if has_scheme {
        endpoint.to_owned()
    } else {
        format!("http://{}", endpoint)
    };

    let url = Url::parse(&candidate).map_err(|cause| {
        SdkError::ValidationError(format!("invalid endpoint '{}': {}", candidate, cause))
    })?;

    let host = match url.host_str() {
        Some(host) => host,
        None => {
            return Err(SdkError::ValidationError(format!(
                "endpoint '{}' missing host",
                candidate
            )))
        }
    };
    let scheme = url.scheme();
    let port = match url.port_or_known_default() {
        Some(port) => port,
        None => {
            return Err(SdkError::ValidationError(format!(
                "endpoint '{}' missing known default port",
                candidate
            )))
        }
    };

    // The port is left out of the HTTP form when it is the one implied by the scheme.
    let implied_port = if scheme == "https" { 443 } else { 80 };
    let mut http_endpoint = format!("{}://{}", scheme, host);
    if port != implied_port {
        http_endpoint.push(':');
        http_endpoint.push_str(&port.to_string());
    }

    Ok((
        http_endpoint,
        format!("{}:{}", host, port),
        scheme.eq_ignore_ascii_case("https"),
    ))
}
/// Reduces an HTTP endpoint string to `scheme://host[:port]`.
///
/// An input without `://` is parsed as if prefixed with `http://`. The port is
/// omitted when it is 443 for `https` or 80 for any other scheme. Anything beyond
/// scheme, host and port is not carried into the result.
///
/// # Errors
/// Returns `SdkError::ValidationError` when the URL fails to parse, has no host,
/// or has neither an explicit port nor a known default for its scheme.
fn normalize_http_endpoint(endpoint: &str) -> Result<String> {
    let with_scheme = if endpoint.contains("://") {
        String::from(endpoint)
    } else {
        ["http://", endpoint].concat()
    };

    let url = match Url::parse(&with_scheme) {
        Ok(url) => url,
        Err(cause) => {
            return Err(SdkError::ValidationError(format!(
                "invalid http_endpoint '{}': {}",
                with_scheme, cause
            )))
        }
    };

    // A missing host is reported in preference to a missing port.
    let (host, port) = match (url.host_str(), url.port_or_known_default()) {
        (None, _) => {
            return Err(SdkError::ValidationError(format!(
                "http_endpoint '{}' missing host",
                with_scheme
            )))
        }
        (_, None) => {
            return Err(SdkError::ValidationError(format!(
                "http_endpoint '{}' missing known default port",
                with_scheme
            )))
        }
        (Some(host), Some(port)) => (host, port),
    };

    let scheme = url.scheme();
    let implied_port = if scheme.eq_ignore_ascii_case("https") {
        443
    } else {
        80
    };

    Ok(if port == implied_port {
        format!("{}://{}", scheme, host)
    } else {
        format!("{}://{}:{}", scheme, host, port)
    })
}
/// Normalizes a gRPC endpoint and decides whether TLS should be used.
///
/// Returns `(authority, use_tls)`.
///
/// * URL form (input contains `://`): the result is `host:port`, and TLS is on
///   when the scheme is `https` or `grpcs`.
/// * Bare form: the input is trimmed first.
///   - `host:port` with a non-blank host and a port that parses as `u16` gives
///     `host:port`, with TLS on only for port 443.
///   - Input containing a `:` that does not fit that shape is returned as
///     trimmed, with TLS off.
///   - Input without any `:` gets `:50051` appended, with TLS off.
///
/// # Errors
/// Returns `SdkError::ValidationError` when a URL-form input fails to parse, lacks
/// a host, or has neither an explicit port nor a known default for its scheme, and
/// when a bare-form input is empty after trimming.
fn normalize_grpc_endpoint(endpoint: &str) -> Result<(String, bool)> {
    if endpoint.contains("://") {
        let url = Url::parse(endpoint).map_err(|cause| {
            SdkError::ValidationError(format!("invalid grpc_endpoint '{}': {}", endpoint, cause))
        })?;
        let host = match url.host_str() {
            Some(host) => host,
            None => {
                return Err(SdkError::ValidationError(format!(
                    "grpc_endpoint '{}' missing host",
                    endpoint
                )))
            }
        };
        let port = match url.port_or_known_default() {
            Some(port) => port,
            None => {
                return Err(SdkError::ValidationError(format!(
                    "grpc_endpoint '{}' missing known default port",
                    endpoint
                )))
            }
        };
        let scheme = url.scheme();
        let secure =
            scheme.eq_ignore_ascii_case("https") || scheme.eq_ignore_ascii_case("grpcs");
        return Ok((format!("{}:{}", host, port), secure));
    }

    let bare = endpoint.trim();
    if bare.is_empty() {
        return Err(SdkError::ValidationError(
            "grpc_endpoint cannot be empty".to_string(),
        ));
    }

    let normalized = match bare.rsplit_once(':') {
        // No port given at all: fall back to port 50051.
        None => (format!("{}:50051", bare), false),
        Some((host, port_text)) => {
            let port = if host.trim().is_empty() {
                None
            } else {
                port_text.parse::<u16>().ok()
            };
            match port {
                Some(port) => (format!("{}:{}", host, port), port == 443),
                // Not a recognizable host:port pair; pass the text through as given.
                None => (bare.to_string(), false),
            }
        }
    };
    Ok(normalized)
}
/// Converts a tonic connection failure into `SdkError::TransportError`.
///
/// The failure kind is chosen by looking for substrings in the lowercased error
/// text, checked in this order:
/// 1. `deadline` / `timed out` -> `DeadlineExceeded`
/// 2. `connection reset` -> `ConnectionReset`
/// 3. `dns` / `refused` / `unavailable` / `not connected` -> `Unavailable`
/// 4. anything else -> `Io`
///
/// The resulting message names the `endpoint` and includes the original error.
fn map_grpc_connect_error(error: tonic::transport::Error, endpoint: &str) -> SdkError {
    let description = error.to_string().to_lowercase();
    let mentions_any = |needles: &[&str]| needles.iter().any(|&needle| description.contains(needle));

    let kind = if mentions_any(&["deadline", "timed out"]) {
        TransportFailureKind::DeadlineExceeded
    } else if mentions_any(&["connection reset"]) {
        TransportFailureKind::ConnectionReset
    } else if mentions_any(&["dns", "refused", "unavailable", "not connected"]) {
        TransportFailureKind::Unavailable
    } else {
        TransportFailureKind::Io
    };

    let message = format!("failed to connect to gRPC endpoint {}: {}", endpoint, error);
    SdkError::TransportError { kind, message }
}
/// Converts a gRPC `Status` into the matching `SdkError`, carrying the status
/// message along.
///
/// * `Unauthenticated`, `PermissionDenied` -> `AuthError`
/// * `InvalidArgument`, `NotFound`, `AlreadyExists`, `FailedPrecondition`,
///   `OutOfRange` -> `ValidationError`
/// * `Unavailable`, `DeadlineExceeded`, `Unimplemented` -> `TransportError` with
///   the same-named kind; `Cancelled` -> `TransportError` with kind `Io`
/// * `Unknown`, `Internal` -> `TransportError` when the lowercased message
///   mentions `connection reset` (kind `ConnectionReset`) or `transport` /
///   `broken pipe` / `io error` (kind `Io`); otherwise `ServerError`
/// * every other code -> `ServerError`
fn map_grpc_status(status: Status) -> SdkError {
    let code = status.code();
    let message = String::from(status.message());

    // Caller-side problems are settled first; they never become transport errors.
    match code {
        Code::Unauthenticated | Code::PermissionDenied => return SdkError::AuthError(message),
        Code::InvalidArgument
        | Code::NotFound
        | Code::AlreadyExists
        | Code::FailedPrecondition
        | Code::OutOfRange => return SdkError::ValidationError(message),
        _ => {}
    }

    // Everything else is either a transport failure of some kind or a server error.
    let transport_kind = match code {
        Code::Unavailable => Some(TransportFailureKind::Unavailable),
        Code::DeadlineExceeded => Some(TransportFailureKind::DeadlineExceeded),
        Code::Unimplemented => Some(TransportFailureKind::Unimplemented),
        Code::Cancelled => Some(TransportFailureKind::Io),
        Code::Unknown | Code::Internal => {
            let lower = message.to_lowercase();
            if lower.contains("connection reset") {
                Some(TransportFailureKind::ConnectionReset)
            } else if lower.contains("transport")
                || lower.contains("broken pipe")
                || lower.contains("io error")
            {
                Some(TransportFailureKind::Io)
            } else {
                None
            }
        }
        _ => None,
    };

    match transport_kind {
        Some(kind) => SdkError::TransportError { kind, message },
        None => SdkError::ServerError(message),
    }
}
fn map_transport_error(error: reqwest::Error, message: String) -> SdkError {
let lower = error.to_string().to_lowercase();
let kind = if error.is_timeout() {
TransportFailureKind::DeadlineExceeded
} else if error.is_connect() {
TransportFailureKind::Unavailable
} else if lower.contains("connection reset") {
TransportFailureKind::ConnectionReset
} else if error.is_request() || error.is_body() {
TransportFailureKind::Io
} else {
TransportFailureKind::Other
};
SdkError::TransportError {
kind,
message: format!("{}: {}", message, error),
}
}
/// Maps an HTTP status code to the `SdkError` variant that carries `body`.
///
/// * 401, 403 -> `AuthError`
/// * 400, 404, 409, 422 -> `ValidationError`
/// * 501 -> `UnsupportedFeatureError`
/// * any other status -> `ServerError`
fn map_http_error(status: u16, body: String) -> SdkError {
    // Pick the variant constructor first, then wrap the body exactly once.
    let wrap: fn(String) -> SdkError = match status {
        401 | 403 => SdkError::AuthError,
        400 | 404 | 409 | 422 => SdkError::ValidationError,
        501 => SdkError::UnsupportedFeatureError,
        _ => SdkError::ServerError,
    };
    wrap(body)
}
/// Returns the upper-case HTTP verb for `method`
/// (`"GET"`, `"POST"` or `"DELETE"`).
fn http_method_label(method: HttpMethod) -> &'static str {
    let label: &'static str = match method {
        HttpMethod::Delete => "DELETE",
        HttpMethod::Post => "POST",
        HttpMethod::Get => "GET",
    };
    label
}