1pub mod contract;
2pub mod learn;
3pub mod proto {
4 pub mod mubit {
5 pub mod v1 {
6 tonic::include_proto!("mubit.v1");
7 }
8 }
9}
10
11use crate::contract::{find_operation, HttpMethod, OperationSpec};
12use reqwest::header::CONTENT_TYPE;
13use serde::de::DeserializeOwned;
14use serde::Serialize;
15use serde_json::{json, Map, Value};
16use std::collections::HashSet;
17use std::pin::Pin;
18use std::sync::{Arc, RwLock};
19use std::time::{Duration, Instant};
20use thiserror::Error;
21use tokio::sync::Mutex;
22use tokio::time::sleep;
23use tokio_stream::{Stream, StreamExt};
24use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
25use tonic::{Code, Request, Status};
26use url::Url;
27
28pub type ValueStream = Pin<Box<dyn Stream<Item = Result<Value>> + Send>>;
30
31#[derive(Clone, Copy, Debug, PartialEq, Eq)]
32pub enum TransportMode {
33 Auto,
34 Grpc,
35 Http,
36}
37
38impl TransportMode {
39 fn normalize(raw: &str) -> Self {
40 match raw.trim().to_lowercase().as_str() {
41 "grpc" => Self::Grpc,
42 "http" => Self::Http,
43 _ => Self::Auto,
44 }
45 }
46}
47
48const DEFAULT_SHARED_HTTP_ENDPOINT: &str = "https://api.mubit.ai";
49const DEFAULT_SHARED_GRPC_ENDPOINT: &str = "grpc.api.mubit.ai:443";
50
51fn env_non_empty(name: &str) -> Option<String> {
52 std::env::var(name)
53 .ok()
54 .map(|value| value.trim().to_string())
55 .filter(|value| !value.is_empty())
56}
57
58#[derive(Clone, Debug)]
59pub struct ClientConfig {
60 pub endpoint: String,
61 pub grpc_endpoint: Option<String>,
62 pub http_endpoint: Option<String>,
63 pub transport: TransportMode,
64 pub api_key: Option<String>,
65 pub token: Option<String>, pub run_id: Option<String>,
67 pub timeout_ms: u64,
68}
69
70impl ClientConfig {
71 pub fn new(endpoint: impl Into<String>) -> Self {
72 Self {
73 endpoint: endpoint.into(),
74 grpc_endpoint: None,
75 http_endpoint: None,
76 transport: TransportMode::Auto,
77 api_key: None,
78 token: None,
79 run_id: None,
80 timeout_ms: 30_000,
81 }
82 }
83
84 pub fn transport(mut self, transport: impl AsRef<str>) -> Self {
85 self.transport = TransportMode::normalize(transport.as_ref());
86 self
87 }
88
89 pub fn api_key(mut self, api_key: impl Into<String>) -> Self {
90 self.api_key = Some(api_key.into());
91 self
92 }
93
94 pub fn token(mut self, token: impl Into<String>) -> Self {
95 self.api_key = Some(token.into());
96 self
97 }
98
99 pub fn run_id(mut self, run_id: impl Into<String>) -> Self {
100 self.run_id = Some(run_id.into());
101 self
102 }
103
104 pub fn from_env() -> Self {
105 Self::default()
106 }
107}
108
109impl Default for ClientConfig {
110 fn default() -> Self {
111 let transport = env_non_empty("MUBIT_TRANSPORT")
112 .map(|value| TransportMode::normalize(&value))
113 .unwrap_or(TransportMode::Auto);
114 let endpoint = env_non_empty("MUBIT_ENDPOINT")
115 .unwrap_or_else(|| DEFAULT_SHARED_HTTP_ENDPOINT.to_string());
116
117 let mut config = Self::new(endpoint);
118 config.transport = transport;
119 config.http_endpoint = env_non_empty("MUBIT_HTTP_ENDPOINT");
120 config.grpc_endpoint = env_non_empty("MUBIT_GRPC_ENDPOINT");
121 config.api_key = env_non_empty("MUBIT_API_KEY");
122 config.token = env_non_empty("MUBIT_TOKEN");
123 config.run_id = env_non_empty("MUBIT_RUN_ID");
124
125 if config.http_endpoint.is_none() {
126 config.http_endpoint = Some(DEFAULT_SHARED_HTTP_ENDPOINT.to_string());
127 }
128 if config.grpc_endpoint.is_none() {
129 config.grpc_endpoint = Some(DEFAULT_SHARED_GRPC_ENDPOINT.to_string());
130 }
131
132 config
133 }
134}
135
136#[derive(Clone, Copy, Debug, PartialEq, Eq)]
137pub enum TransportFailureKind {
138 Unavailable,
139 ConnectionReset,
140 DeadlineExceeded,
141 Io,
142 Unimplemented,
143 Other,
144}
145
146#[derive(Error, Debug)]
147pub enum SdkError {
148 #[error("AuthError: {0}")]
149 AuthError(String),
150 #[error("ValidationError: {0}")]
151 ValidationError(String),
152 #[error("TransportError({kind:?}): {message}")]
153 TransportError {
154 kind: TransportFailureKind,
155 message: String,
156 },
157 #[error("ServerError: {0}")]
158 ServerError(String),
159 #[error("UnsupportedFeatureError: {0}")]
160 UnsupportedFeatureError(String),
161}
162
163impl SdkError {
164 fn is_fallback_eligible(&self) -> bool {
165 matches!(
166 self,
167 SdkError::TransportError {
168 kind: TransportFailureKind::Unavailable
169 | TransportFailureKind::ConnectionReset
170 | TransportFailureKind::DeadlineExceeded
171 | TransportFailureKind::Io
172 | TransportFailureKind::Unimplemented,
173 ..
174 }
175 )
176 }
177}
178
179pub type Result<T> = std::result::Result<T, SdkError>;
180
181#[derive(Clone, Debug)]
182struct MutableState {
183 api_key: Option<String>,
184 run_id: Option<String>,
185 transport: TransportMode,
186}
187
188struct TransportEngine {
189 http_endpoint: String,
190 grpc_endpoint: String,
191 grpc_tls: bool,
192 timeout: Duration,
193 http_client: reqwest::Client,
194 grpc_channel: Mutex<Option<Channel>>,
195 state: Arc<RwLock<MutableState>>,
196}
197
198impl TransportEngine {
199 fn new(config: ClientConfig) -> Result<Self> {
200 let (default_http_endpoint, default_grpc_endpoint, default_grpc_tls) =
201 derive_http_and_grpc(&config.endpoint)?;
202
203 let http_endpoint = match config.http_endpoint {
204 Some(http_endpoint) => normalize_http_endpoint(&http_endpoint)?,
205 None => default_http_endpoint,
206 };
207
208 let (grpc_endpoint, grpc_tls) = match config.grpc_endpoint {
209 Some(grpc_endpoint) => normalize_grpc_endpoint(&grpc_endpoint)?,
210 None => (default_grpc_endpoint, default_grpc_tls),
211 };
212
213 let timeout = Duration::from_millis(config.timeout_ms);
214 let http_client = reqwest::Client::builder()
215 .timeout(timeout)
216 .build()
217 .map_err(|e| SdkError::TransportError {
218 kind: TransportFailureKind::Other,
219 message: format!("failed to build HTTP client: {}", e),
220 })?;
221
222 Ok(Self {
223 http_endpoint,
224 grpc_endpoint,
225 grpc_tls,
226 timeout,
227 http_client,
228 grpc_channel: Mutex::new(None),
229 state: Arc::new(RwLock::new(MutableState {
230 api_key: config.api_key.or(config.token),
231 run_id: config.run_id,
232 transport: config.transport,
233 })),
234 })
235 }
236
237 fn set_api_key(&self, api_key: Option<String>) {
238 if let Ok(mut state) = self.state.write() {
239 state.api_key = api_key;
240 }
241 }
242
243 fn set_run_id(&self, run_id: Option<String>) {
244 if let Ok(mut state) = self.state.write() {
245 state.run_id = run_id;
246 }
247 }
248
249 fn set_transport(&self, transport: TransportMode) {
250 if let Ok(mut state) = self.state.write() {
251 state.transport = transport;
252 }
253 }
254
255 fn api_key(&self) -> Option<String> {
256 self.state.read().ok().and_then(|s| s.api_key.clone())
257 }
258
259 fn run_id(&self) -> Option<String> {
260 self.state.read().ok().and_then(|s| s.run_id.clone())
261 }
262
263 fn transport(&self) -> TransportMode {
264 self.state
265 .read()
266 .map(|s| s.transport)
267 .unwrap_or(TransportMode::Auto)
268 }
269
270 async fn invoke_serialized<T: Serialize>(&self, op_key: &str, payload: T) -> Result<Value> {
271 let value = serde_json::to_value(payload).map_err(|e| {
272 SdkError::ValidationError(format!("failed to serialize request payload: {}", e))
273 })?;
274 self.invoke(op_key, value).await
275 }
276
277 async fn invoke(&self, op_key: &str, payload: Value) -> Result<Value> {
278 let op = find_operation(op_key).ok_or_else(|| {
279 SdkError::UnsupportedFeatureError(format!("unknown operation: {}", op_key))
280 })?;
281
282 let mut payload = ensure_object_payload(payload)?;
283 self.apply_run_id_default(op, &mut payload);
284
285 match self.transport() {
286 TransportMode::Grpc => self.invoke_grpc(op, payload).await,
287 TransportMode::Http => self.invoke_http(op, payload).await,
288 TransportMode::Auto => match self.invoke_grpc(op, payload.clone()).await {
289 Ok(value) => Ok(value),
290 Err(err) if err.is_fallback_eligible() => self.invoke_http(op, payload).await,
291 Err(err) => Err(err),
292 },
293 }
294 }
295
296 pub async fn invoke_stream(&self, key: &str, payload: Value) -> Result<ValueStream> {
297 let op = find_operation(key).ok_or_else(|| {
298 SdkError::UnsupportedFeatureError(format!("unknown operation: {}", key))
299 })?;
300
301 let mut payload = ensure_object_payload(payload)?;
302 self.apply_run_id_default(op, &mut payload);
303
304 match self.transport() {
305 TransportMode::Grpc => self.invoke_grpc_stream(op, payload).await,
306 TransportMode::Http => self.invoke_http_stream(op, payload).await,
307 TransportMode::Auto => match self.invoke_grpc_stream(op, payload.clone()).await {
308 Ok(stream) => Ok(stream),
309 Err(err) if err.is_fallback_eligible() => {
310 self.invoke_http_stream(op, payload).await
311 }
312 Err(err) => Err(err),
313 },
314 }
315 }
316
317 pub async fn invoke_stream_serialized<T: Serialize>(
318 &self,
319 key: &str,
320 payload: T,
321 ) -> Result<ValueStream> {
322 let value = serde_json::to_value(payload).map_err(|e| {
323 SdkError::ValidationError(format!("failed to serialize payload: {}", e))
324 })?;
325 self.invoke_stream(key, value).await
326 }
327
328 fn apply_run_id_default(&self, op: &OperationSpec, payload: &mut Value) {
329 let Some(run_id_field) = op.run_id_field else {
330 return;
331 };
332 let Some(run_id) = self.run_id() else {
333 return;
334 };
335
336 if let Some(map) = payload.as_object_mut() {
337 if !map.contains_key(run_id_field) {
338 map.insert(run_id_field.to_string(), Value::String(run_id));
339 }
340 }
341 }
342
343 async fn grpc_channel(&self) -> Result<Channel> {
344 {
345 let guard = self.grpc_channel.lock().await;
346 if let Some(channel) = guard.as_ref() {
347 return Ok(channel.clone());
348 }
349 }
350
351 let uri = if self.grpc_tls {
352 format!("https://{}", self.grpc_endpoint)
353 } else {
354 format!("http://{}", self.grpc_endpoint)
355 };
356
357 let mut endpoint = Endpoint::from_shared(uri.clone()).map_err(|e| {
358 SdkError::ValidationError(format!("invalid gRPC endpoint '{}': {}", uri, e))
359 })?;
360 endpoint = endpoint.connect_timeout(self.timeout).timeout(self.timeout);
361 if self.grpc_tls {
362 endpoint = endpoint.tls_config(ClientTlsConfig::new()).map_err(|e| {
363 SdkError::TransportError {
364 kind: TransportFailureKind::Other,
365 message: format!("failed to configure TLS for {}: {}", self.grpc_endpoint, e),
366 }
367 })?;
368 }
369
370 let channel = endpoint
371 .connect()
372 .await
373 .map_err(|e| map_grpc_connect_error(e, &self.grpc_endpoint))?;
374
375 let mut guard = self.grpc_channel.lock().await;
376 *guard = Some(channel.clone());
377 Ok(channel)
378 }
379
380 fn attach_grpc_metadata<T>(&self, request: &mut Request<T>) {
381 if let Some(api_key) = self.api_key() {
382 if let Ok(header_value) = format!("Bearer {}", api_key).parse() {
383 request.metadata_mut().insert("authorization", header_value);
384 }
385 }
386 }
387
388 fn grpc_request<T>(&self, payload: T) -> Request<T> {
389 let mut request = Request::new(payload);
390 self.attach_grpc_metadata(&mut request);
391 request
392 }
393
394 async fn invoke_grpc(&self, op: &OperationSpec, payload: Value) -> Result<Value> {
395 use crate::proto::mubit::v1 as pb;
396
397 if op.grpc_method.is_empty() {
398 return Err(SdkError::TransportError {
399 kind: TransportFailureKind::Unimplemented,
400 message: format!("operation {} has no gRPC mapping", op.key),
401 });
402 }
403
404 let channel = self.grpc_channel().await?;
405
406 macro_rules! unary_core {
407 ($method:ident, $req_ty:ty) => {{
408 let request: $req_ty = decode_grpc_request(op.key, payload)?;
409 let mut client = pb::core_service_client::CoreServiceClient::new(channel.clone());
410 let response = client
411 .$method(self.grpc_request(request))
412 .await
413 .map_err(map_grpc_status)?;
414 encode_grpc_response(op.key, response.into_inner())
415 }};
416 }
417
418 macro_rules! unary_control {
419 ($method:ident, $req_ty:ty) => {{
420 let request: $req_ty = decode_grpc_request(op.key, payload)?;
421 let mut client =
422 pb::control_service_client::ControlServiceClient::new(channel.clone());
423 let response = client
424 .$method(self.grpc_request(request))
425 .await
426 .map_err(map_grpc_status)?;
427 encode_grpc_response(op.key, response.into_inner())
428 }};
429 }
430
431 match op.key {
432 "auth.health" => unary_core!(health, pb::HealthRequest),
434 "auth.create_user" => unary_core!(create_user, pb::CreateUserRequest),
435 "auth.rotate_user_api_key" => {
436 unary_core!(rotate_user_api_key, pb::RotateUserApiKeyRequest)
437 }
438 "auth.revoke_user_api_key" => {
439 unary_core!(revoke_user_api_key, pb::RevokeUserApiKeyRequest)
440 }
441 "auth.list_users" => unary_core!(list_users, pb::ListUsersRequest),
442 "auth.get_user" => unary_core!(get_user, pb::GetUserRequest),
443 "auth.delete_user" => unary_core!(delete_user, pb::DeleteUserRequest),
444
445 "core.insert" => unary_core!(insert, pb::InsertRequest),
447 "core.search" => unary_core!(search, pb::SearchRequest),
448 "core.delete_node" => unary_core!(delete_node, pb::DeleteNodeRequest),
449 "core.delete_run" => unary_core!(delete_run, pb::DeleteRunRequest),
450 "core.create_session" => unary_core!(create_session, pb::CreateSessionRequest),
451 "core.snapshot_session" => unary_core!(snapshot_session, pb::SnapshotSessionRequest),
452 "core.load_session" => unary_core!(load_session, pb::LoadSessionRequest),
453 "core.commit_session" => unary_core!(commit_session, pb::CommitSessionRequest),
454 "core.drop_session" => unary_core!(drop_session, pb::DropSessionRequest),
455 "core.write_memory" => unary_core!(write_memory, pb::WriteMemoryRequest),
456 "core.read_memory" => unary_core!(read_memory, pb::ReadMemoryRequest),
457 "core.add_memory" => unary_core!(add_memory, pb::AddMemoryRequest),
458 "core.get_memory" => unary_core!(get_memory, pb::GetMemoryRequest),
459 "core.clear_memory" => unary_core!(clear_memory, pb::ClearMemoryRequest),
460 "core.grant_permission" => unary_core!(grant_permission, pb::GrantPermissionRequest),
461 "core.revoke_permission" => unary_core!(revoke_permission, pb::RevokePermissionRequest),
462 "core.batch_insert" => {
463 let request_items = decode_batch_insert_payload(payload)?;
464 let mut request = Request::new(tokio_stream::iter(request_items));
465 self.attach_grpc_metadata(&mut request);
466 let mut client = pb::core_service_client::CoreServiceClient::new(channel.clone());
467 let response = client
468 .batch_insert(request)
469 .await
470 .map_err(map_grpc_status)?;
471 encode_grpc_response(op.key, response.into_inner())
472 }
473 "control.set_variable" => unary_control!(set_variable, pb::SetVariableRequest),
475 "control.get_variable" => unary_control!(get_variable, pb::GetVariableRequest),
476 "control.list_variables" => unary_control!(list_variables, pb::ListVariablesRequest),
477 "control.delete_variable" => unary_control!(delete_variable, pb::DeleteVariableRequest),
478 "control.define_concept" => unary_control!(define_concept, pb::DefineConceptRequest),
479 "control.list_concepts" => unary_control!(list_concepts, pb::ListConceptsRequest),
480 "control.add_goal" => unary_control!(add_goal, pb::AddGoalRequest),
481 "control.update_goal" => unary_control!(update_goal, pb::UpdateGoalRequest),
482 "control.list_goals" => unary_control!(list_goals, pb::ListGoalsRequest),
483 "control.get_goal_tree" => unary_control!(get_goal_tree, pb::GetGoalTreeRequest),
484 "control.submit_action" => unary_control!(submit_action, pb::ActionRequest),
485 "control.get_action_log" => unary_control!(get_action_log, pb::ActionLogRequest),
486 "control.run_cycle" => unary_control!(run_cycle, pb::RunCycleRequest),
487 "control.get_cycle_history" => {
488 unary_control!(get_cycle_history, pb::CycleHistoryRequest)
489 }
490 "control.register_agent" => unary_control!(register_agent, pb::AgentRegisterRequest),
491 "control.agent_heartbeat" => {
492 unary_control!(agent_heartbeat, pb::AgentHeartbeatRequest)
493 }
494 "control.append_activity" => unary_control!(append_activity, pb::ActivityAppendRequest),
495 "control.context_snapshot" => unary_control!(get_run_snapshot, pb::RunSnapshotRequest),
496 "control.link_run" => unary_control!(link_run, pb::LinkRunRequest),
497 "control.unlink_run" => unary_control!(unlink_run, pb::UnlinkRunRequest),
498 "control.ingest" => unary_control!(ingest, pb::IngestRequest),
499 "control.batch_insert" => {
500 unary_control!(batch_insert, pb::ControlBatchInsertRequest)
501 }
502 "control.get_ingest_job" => unary_control!(get_ingest_job, pb::GetIngestJobRequest),
503 "control.query" => unary_control!(query, pb::AgentQueryRequest),
504 "control.diagnose" => unary_control!(diagnose, pb::DiagnoseRequest),
505 "control.delete_run" => unary_control!(delete_run, pb::RunRequest),
506 "control.reflect" => unary_control!(reflect, pb::ReflectRequest),
507 "control.lessons" => unary_control!(list_lessons, pb::ListLessonsRequest),
508 "control.delete_lesson" => unary_control!(delete_lesson, pb::DeleteLessonRequest),
509 "control.context" => unary_control!(get_context, pb::ContextRequest),
510 "control.list_activity" => unary_control!(list_activity, pb::ListActivityRequest),
511 "control.export_activity" => {
512 unary_control!(export_activity, pb::ExportActivityRequest)
513 }
514 "control.archive_block" => unary_control!(archive_block, pb::ArchiveBlockRequest),
515 "control.dereference" => unary_control!(dereference, pb::DereferenceRequest),
516 "control.memory_health" => {
517 unary_control!(get_memory_health, pb::MemoryHealthRequest)
518 }
519 "control.checkpoint" => unary_control!(checkpoint, pb::CheckpointRequest),
520 "control.list_agents" => unary_control!(list_agents, pb::ListAgentsRequest),
521 "control.create_handoff" => unary_control!(create_handoff, pb::HandoffRequest),
522 "control.submit_feedback" => unary_control!(submit_feedback, pb::FeedbackRequest),
523 "control.record_outcome" => {
524 unary_control!(record_outcome, pb::RecordOutcomeRequest)
525 }
526 "control.surface_strategies" => {
527 unary_control!(surface_strategies, pb::SurfaceStrategiesRequest)
528 }
529 _ => Err(SdkError::UnsupportedFeatureError(format!(
530 "unknown gRPC operation: {}",
531 op.key
532 ))),
533 }
534 }
535
536 async fn invoke_grpc_stream(
537 &self,
538 op: &'static OperationSpec,
539 payload: Value,
540 ) -> Result<ValueStream> {
541 use crate::proto::mubit::v1 as pb;
542
543 let channel = self.grpc_channel().await?;
544
545 match op.key {
546 "core.subscribe_events" => {
547 let request: pb::CoreSubscribeRequest =
548 decode_grpc_request(op.key, payload)?;
549 let mut client =
550 pb::core_service_client::CoreServiceClient::new(channel.clone());
551 let response = client
552 .subscribe(self.grpc_request(request))
553 .await
554 .map_err(map_grpc_status)?;
555 let stream = response.into_inner();
556 Ok(Box::pin(stream.map(|result| {
557 result
558 .map(|msg| {
559 serde_json::to_value(msg).unwrap_or_else(|e| {
560 serde_json::json!({"error": e.to_string()})
561 })
562 })
563 .map_err(map_grpc_status)
564 })))
565 }
566 "control.subscribe" => {
567 let request: pb::SubscribeRequest =
568 decode_grpc_request(op.key, payload)?;
569 let mut client =
570 pb::control_service_client::ControlServiceClient::new(channel.clone());
571 let response = client
572 .subscribe(self.grpc_request(request))
573 .await
574 .map_err(map_grpc_status)?;
575 let stream = response.into_inner();
576 Ok(Box::pin(stream.map(|result| {
577 result
578 .map(|msg| {
579 serde_json::to_value(msg).unwrap_or_else(|e| {
580 serde_json::json!({"error": e.to_string()})
581 })
582 })
583 .map_err(map_grpc_status)
584 })))
585 }
586 "core.watch_memory" => {
587 let request: pb::WatchMemoryRequest =
588 decode_grpc_request(op.key, payload)?;
589 let mut client =
590 pb::core_service_client::CoreServiceClient::new(channel.clone());
591 let response = client
592 .watch_memory(self.grpc_request(request))
593 .await
594 .map_err(map_grpc_status)?;
595 let stream = response.into_inner();
596 Ok(Box::pin(stream.map(|result| {
597 result
598 .map(|msg| {
599 serde_json::to_value(msg).unwrap_or_else(|e| {
600 serde_json::json!({"error": e.to_string()})
601 })
602 })
603 .map_err(map_grpc_status)
604 })))
605 }
606 _ => Err(SdkError::UnsupportedFeatureError(format!(
607 "gRPC streaming not supported for {}",
608 op.key
609 ))),
610 }
611 }
612
613 async fn invoke_http_stream(
614 &self,
615 op: &'static OperationSpec,
616 payload: Value,
617 ) -> Result<ValueStream> {
618 let base = self.http_endpoint.trim_end_matches('/');
619 let route = op.http_path;
620 let url = format!("{}{}", base, route);
621
622 let client = &self.http_client;
623 let is_get = matches!(op.http_method, HttpMethod::Get);
624
625 let mut request = if is_get {
626 let mut req = client.get(&url);
627 if let Some(obj) = payload.as_object() {
628 for (k, v) in obj {
629 let val = match v {
630 Value::String(s) => s.clone(),
631 other => other.to_string(),
632 };
633 req = req.query(&[(k.as_str(), val)]);
634 }
635 }
636 req
637 } else {
638 client.post(&url).json(&payload)
639 };
640
641 if let Some(api_key) = self.api_key() {
642 request = request.bearer_auth(api_key);
643 }
644
645 let response = request.send().await.map_err(|e| {
646 map_transport_error(e, format!("{} SSE request failed", op.key))
647 })?;
648
649 let status = response.status();
650 if !status.is_success() {
651 let body = response
652 .text()
653 .await
654 .unwrap_or_else(|_| "request failed".to_string());
655 return Err(map_http_error(status.as_u16(), body));
656 }
657
658 let byte_stream = response.bytes_stream();
659 let sse_stream = parse_sse_byte_stream(byte_stream);
660 Ok(Box::pin(sse_stream))
661 }
662
663 async fn invoke_http(&self, op: &OperationSpec, payload: Value) -> Result<Value> {
664 let mut path = op.http_path.to_string();
665 let mut consumed_keys = HashSet::new();
666
667 if let Some(map) = payload.as_object() {
668 for (key, value) in map {
669 let marker = format!(":{}", key);
670 if path.contains(&marker) {
671 let rendered = value_to_param(value).ok_or_else(|| {
672 SdkError::ValidationError(format!(
673 "invalid path parameter value for {} in {}",
674 key, op.key
675 ))
676 })?;
677 path = path.replace(&marker, &rendered);
678 consumed_keys.insert(key.clone());
679 }
680 }
681 }
682
683 if path.contains(':') {
684 return Err(SdkError::ValidationError(format!(
685 "missing path parameter for {}",
686 op.key
687 )));
688 }
689
690 let url = format!("{}{}", self.http_endpoint.trim_end_matches('/'), path);
691 let mut request = match op.http_method {
692 HttpMethod::Get => self.http_client.get(&url),
693 HttpMethod::Post => self.http_client.post(&url),
694 HttpMethod::Delete => self.http_client.delete(&url),
695 };
696
697 if let Some(api_key) = self.api_key() {
698 request = request.bearer_auth(api_key);
699 }
700
701 if matches!(op.http_method, HttpMethod::Get) {
702 let query = payload
703 .as_object()
704 .map(|map| {
705 map.iter()
706 .filter_map(|(key, value)| {
707 if consumed_keys.contains(key) || value.is_null() {
708 return None;
709 }
710 value_to_query(value).map(|rendered| (key.clone(), rendered))
711 })
712 .collect::<Vec<(String, String)>>()
713 })
714 .unwrap_or_default();
715
716 if !query.is_empty() {
717 request = request.query(&query);
718 }
719 } else if payload
720 .as_object()
721 .map(|map| !map.is_empty())
722 .unwrap_or(false)
723 {
724 request = request.json(&payload);
725 }
726
727 let response = request.send().await.map_err(|e| {
728 map_transport_error(
729 e,
730 format!(
731 "{} {} request failed",
732 http_method_label(op.http_method),
733 op.key
734 ),
735 )
736 })?;
737
738 let status = response.status();
739 if !status.is_success() {
740 let body = response
741 .text()
742 .await
743 .unwrap_or_else(|_| "request failed".to_string());
744 return Err(map_http_error(status.as_u16(), body));
745 }
746
747 let content_type = response
748 .headers()
749 .get(CONTENT_TYPE)
750 .and_then(|v| v.to_str().ok())
751 .map(|v| v.to_lowercase())
752 .unwrap_or_default();
753
754 let bytes = response
755 .bytes()
756 .await
757 .map_err(|e| SdkError::TransportError {
758 kind: TransportFailureKind::Io,
759 message: format!("failed to read response body for {}: {}", op.key, e),
760 })?;
761
762 if bytes.is_empty() {
763 return Ok(json!({}));
764 }
765
766 if content_type.contains("application/json") {
767 return serde_json::from_slice::<Value>(&bytes).map_err(|e| {
768 SdkError::ServerError(format!(
769 "failed to decode json response for {}: {}",
770 op.key, e
771 ))
772 });
773 }
774
775 Ok(Value::String(String::from_utf8_lossy(&bytes).to_string()))
776 }
777}
778
779#[derive(Clone)]
780pub struct Client {
781 pub auth: AuthClient,
782 pub core: CoreClient,
783 pub control: ControlClient,
784 transport: Arc<TransportEngine>,
785}
786
787impl Client {
788 pub fn new(config: ClientConfig) -> Result<Self> {
789 let transport = Arc::new(TransportEngine::new(config)?);
790 Ok(Self {
791 auth: AuthClient {
792 transport: transport.clone(),
793 },
794 core: CoreClient {
795 transport: transport.clone(),
796 },
797 control: ControlClient {
798 transport: transport.clone(),
799 },
800 transport,
801 })
802 }
803
804 pub fn set_api_key(&self, api_key: Option<String>) {
805 self.transport.set_api_key(api_key);
806 }
807
808 pub fn set_token(&self, token: Option<String>) {
809 self.set_api_key(token);
810 }
811
812 pub fn set_run_id(&self, run_id: Option<String>) {
813 self.transport.set_run_id(run_id);
814 }
815
816 pub fn set_transport(&self, transport: TransportMode) {
817 self.transport.set_transport(transport);
818 }
819}
820
821#[derive(Clone, Debug)]
822pub struct RememberOptions {
823 pub run_id: Option<String>,
824 pub agent_id: Option<String>,
825 pub item_id: Option<String>,
826 pub content: String,
827 pub content_type: String,
828 pub metadata: Option<Value>,
829 pub hints: Option<Value>,
830 pub payload: Option<Value>,
831 pub intent: Option<String>,
832 pub lesson_type: Option<String>,
833 pub lesson_scope: Option<String>,
834 pub lesson_importance: Option<String>,
835 pub lesson_conditions: Vec<String>,
836 pub user_id: Option<String>,
837 pub upsert_key: Option<String>,
838 pub importance: Option<String>,
839 pub source: Option<String>,
840 pub lane: Option<String>,
841 pub parallel: bool,
842 pub idempotency_key: Option<String>,
843 pub wait: bool,
844 pub timeout_ms: Option<u64>,
845 pub poll_interval_ms: u64,
846 pub occurrence_time: Option<i64>,
850}
851
852impl RememberOptions {
853 pub fn new(content: impl Into<String>) -> Self {
854 Self {
855 run_id: None,
856 agent_id: Some("sdk-client".to_string()),
857 item_id: None,
858 content: content.into(),
859 content_type: "text/plain".to_string(),
860 metadata: None,
861 hints: None,
862 payload: None,
863 intent: None,
864 lesson_type: None,
865 lesson_scope: None,
866 lesson_importance: None,
867 lesson_conditions: Vec::new(),
868 user_id: None,
869 upsert_key: None,
870 importance: None,
871 source: Some("agent".to_string()),
872 lane: None,
873 parallel: false,
874 idempotency_key: None,
875 wait: true,
876 timeout_ms: None,
877 poll_interval_ms: 300,
878 occurrence_time: None,
879 }
880 }
881}
882
883#[derive(Clone, Debug)]
884pub struct RecallOptions {
885 pub run_id: Option<String>,
886 pub query: String,
887 pub schema: Option<String>,
888 pub mode: String,
889 pub direct_lane: String,
890 pub include_linked_runs: bool,
891 pub limit: u64,
892 pub embedding: Vec<f32>,
893 pub entry_types: Vec<String>,
894 pub include_working_memory: bool,
895 pub user_id: Option<String>,
896 pub agent_id: Option<String>,
897 pub lane: Option<String>,
898 pub min_timestamp: Option<i64>,
900 pub max_timestamp: Option<i64>,
902 pub budget: Option<String>,
904}
905
906impl RecallOptions {
907 pub fn new(query: impl Into<String>) -> Self {
908 Self {
909 run_id: None,
910 query: query.into(),
911 schema: None,
912 mode: "agent_routed".to_string(),
913 direct_lane: "semantic_search".to_string(),
914 include_linked_runs: false,
915 limit: 5,
916 embedding: Vec::new(),
917 entry_types: Vec::new(),
918 include_working_memory: true,
919 user_id: None,
920 agent_id: None,
921 lane: None,
922 min_timestamp: None,
923 max_timestamp: None,
924 budget: None,
925 }
926 }
927}
928
929#[derive(Clone, Debug)]
930pub struct GetContextOptions {
931 pub run_id: Option<String>,
932 pub query: Option<String>,
933 pub user_id: Option<String>,
934 pub entry_types: Vec<String>,
935 pub include_working_memory: bool,
936 pub format: Option<String>,
937 pub limit: Option<u64>,
938 pub max_token_budget: Option<u32>,
939 pub agent_id: Option<String>,
940 pub mode: Option<String>,
941 pub sections: Vec<String>,
942 pub lane: Option<String>,
943}
944
945impl Default for GetContextOptions {
946 fn default() -> Self {
947 Self {
948 run_id: None,
949 query: None,
950 user_id: None,
951 entry_types: Vec::new(),
952 include_working_memory: true,
953 format: None,
954 limit: None,
955 max_token_budget: None,
956 agent_id: None,
957 mode: None,
958 sections: Vec::new(),
959 lane: None,
960 }
961 }
962}
963
964#[derive(Clone, Debug)]
965pub struct ArchiveOptions {
966 pub run_id: Option<String>,
967 pub content: String,
968 pub artifact_kind: String,
969 pub metadata: Option<Value>,
970 pub user_id: Option<String>,
971 pub agent_id: Option<String>,
972 pub origin_agent_id: Option<String>,
973 pub source_attempt_id: Option<String>,
974 pub source_tool: Option<String>,
975 pub labels: Vec<String>,
976 pub family: Option<String>,
977 pub importance: Option<String>,
978}
979
980impl ArchiveOptions {
981 pub fn new(content: impl Into<String>, artifact_kind: impl Into<String>) -> Self {
982 Self {
983 run_id: None,
984 content: content.into(),
985 artifact_kind: artifact_kind.into(),
986 metadata: None,
987 user_id: None,
988 agent_id: None,
989 origin_agent_id: None,
990 source_attempt_id: None,
991 source_tool: None,
992 labels: Vec::new(),
993 family: None,
994 importance: None,
995 }
996 }
997}
998
999#[derive(Clone, Debug)]
1000pub struct DereferenceOptions {
1001 pub run_id: Option<String>,
1002 pub reference_id: String,
1003 pub user_id: Option<String>,
1004 pub agent_id: Option<String>,
1005}
1006
1007impl DereferenceOptions {
1008 pub fn new(reference_id: impl Into<String>) -> Self {
1009 Self {
1010 run_id: None,
1011 reference_id: reference_id.into(),
1012 user_id: None,
1013 agent_id: None,
1014 }
1015 }
1016}
1017
1018#[derive(Clone, Debug)]
1019pub struct MemoryHealthOptions {
1020 pub run_id: Option<String>,
1021 pub user_id: Option<String>,
1022 pub stale_threshold_days: u32,
1023 pub limit: u32,
1024}
1025
1026impl Default for MemoryHealthOptions {
1027 fn default() -> Self {
1028 Self {
1029 run_id: None,
1030 user_id: None,
1031 stale_threshold_days: 30,
1032 limit: 500,
1033 }
1034 }
1035}
1036
1037#[derive(Clone, Debug)]
1038pub struct DiagnoseOptions {
1039 pub run_id: Option<String>,
1040 pub error_text: String,
1041 pub error_type: Option<String>,
1042 pub limit: u64,
1043 pub user_id: Option<String>,
1044}
1045
1046impl DiagnoseOptions {
1047 pub fn new(error_text: impl Into<String>) -> Self {
1048 Self {
1049 run_id: None,
1050 error_text: error_text.into(),
1051 error_type: None,
1052 limit: 10,
1053 user_id: None,
1054 }
1055 }
1056}
1057
1058#[derive(Clone, Debug, Default)]
1059pub struct ReflectOptions {
1060 pub run_id: Option<String>,
1061 pub include_linked_runs: bool,
1062 pub user_id: Option<String>,
1063 pub step_id: Option<String>,
1064 pub checkpoint_id: Option<String>,
1065 pub last_n_items: Option<u64>,
1066 pub include_step_outcomes: Option<bool>,
1067}
1068
1069#[derive(Clone, Debug, Default)]
1070pub struct ForgetOptions {
1071 pub run_id: Option<String>,
1072 pub lesson_id: Option<String>,
1073}
1074
1075impl ForgetOptions {
1076 pub fn for_run(run_id: impl Into<String>) -> Self {
1077 Self {
1078 run_id: Some(run_id.into()),
1079 lesson_id: None,
1080 }
1081 }
1082
1083 pub fn for_lesson(lesson_id: impl Into<String>) -> Self {
1084 Self {
1085 run_id: None,
1086 lesson_id: Some(lesson_id.into()),
1087 }
1088 }
1089}
1090
1091#[derive(Clone, Debug)]
1092pub struct CheckpointOptions {
1093 pub run_id: Option<String>,
1094 pub label: Option<String>,
1095 pub context_snapshot: String,
1096 pub metadata: Option<Value>,
1097 pub user_id: Option<String>,
1098 pub agent_id: Option<String>,
1099}
1100
1101impl CheckpointOptions {
1102 pub fn new(context_snapshot: impl Into<String>) -> Self {
1103 Self {
1104 run_id: None,
1105 label: None,
1106 context_snapshot: context_snapshot.into(),
1107 metadata: None,
1108 user_id: None,
1109 agent_id: None,
1110 }
1111 }
1112}
1113
1114#[derive(Clone, Debug)]
1115pub struct RegisterAgentOptions {
1116 pub run_id: Option<String>,
1117 pub agent_id: String,
1118 pub role: String,
1119 pub capabilities: Vec<String>,
1120 pub status: String,
1121 pub read_scopes: Vec<String>,
1122 pub write_scopes: Vec<String>,
1123 pub shared_memory_lanes: Vec<String>,
1124}
1125
1126impl RegisterAgentOptions {
1127 pub fn new(agent_id: impl Into<String>) -> Self {
1128 Self {
1129 run_id: None,
1130 agent_id: agent_id.into(),
1131 role: String::new(),
1132 capabilities: Vec::new(),
1133 status: "active".to_string(),
1134 read_scopes: Vec::new(),
1135 write_scopes: Vec::new(),
1136 shared_memory_lanes: Vec::new(),
1137 }
1138 }
1139}
1140
1141#[derive(Clone, Debug, Default)]
1142pub struct ListAgentsOptions {
1143 pub run_id: Option<String>,
1144}
1145
1146#[derive(Clone, Debug)]
1147pub struct RecordOutcomeOptions {
1148 pub run_id: Option<String>,
1149 pub reference_id: String,
1150 pub outcome: String,
1151 pub signal: f32,
1152 pub rationale: String,
1153 pub agent_id: Option<String>,
1154 pub user_id: Option<String>,
1155}
1156
1157impl RecordOutcomeOptions {
1158 pub fn new(reference_id: impl Into<String>, outcome: impl Into<String>) -> Self {
1159 Self {
1160 run_id: None,
1161 reference_id: reference_id.into(),
1162 outcome: outcome.into(),
1163 signal: 0.0,
1164 rationale: String::new(),
1165 agent_id: None,
1166 user_id: None,
1167 }
1168 }
1169}
1170
1171#[derive(Clone, Debug)]
1172pub struct RecordStepOutcomeOptions {
1173 pub run_id: Option<String>,
1174 pub step_id: String,
1175 pub step_name: Option<String>,
1176 pub outcome: String,
1177 pub signal: f32,
1178 pub rationale: String,
1179 pub directive_hint: Option<String>,
1180 pub agent_id: Option<String>,
1181 pub user_id: Option<String>,
1182 pub metadata: Option<Value>,
1183}
1184
1185impl RecordStepOutcomeOptions {
1186 pub fn new(step_id: impl Into<String>, outcome: impl Into<String>) -> Self {
1187 Self {
1188 run_id: None,
1189 step_id: step_id.into(),
1190 step_name: None,
1191 outcome: outcome.into(),
1192 signal: 0.0,
1193 rationale: String::new(),
1194 directive_hint: None,
1195 agent_id: None,
1196 user_id: None,
1197 metadata: None,
1198 }
1199 }
1200}
1201
1202#[derive(Clone, Debug)]
1203pub struct SurfaceStrategiesOptions {
1204 pub run_id: Option<String>,
1205 pub lesson_types: Vec<String>,
1206 pub max_strategies: u32,
1207 pub user_id: Option<String>,
1208}
1209
1210impl Default for SurfaceStrategiesOptions {
1211 fn default() -> Self {
1212 Self {
1213 run_id: None,
1214 lesson_types: Vec::new(),
1215 max_strategies: 5,
1216 user_id: None,
1217 }
1218 }
1219}
1220
1221#[derive(Clone, Debug)]
1222pub struct HandoffOptions {
1223 pub run_id: Option<String>,
1224 pub task_id: String,
1225 pub from_agent_id: String,
1226 pub to_agent_id: String,
1227 pub content: String,
1228 pub requested_action: String,
1229 pub metadata: Option<Value>,
1230 pub user_id: Option<String>,
1231}
1232
1233impl HandoffOptions {
1234 pub fn new(
1235 task_id: impl Into<String>,
1236 from_agent_id: impl Into<String>,
1237 to_agent_id: impl Into<String>,
1238 content: impl Into<String>,
1239 ) -> Self {
1240 Self {
1241 run_id: None,
1242 task_id: task_id.into(),
1243 from_agent_id: from_agent_id.into(),
1244 to_agent_id: to_agent_id.into(),
1245 content: content.into(),
1246 requested_action: "continue".to_string(),
1247 metadata: None,
1248 user_id: None,
1249 }
1250 }
1251}
1252
1253#[derive(Clone, Debug)]
1254pub struct FeedbackOptions {
1255 pub run_id: Option<String>,
1256 pub handoff_id: String,
1257 pub verdict: String,
1258 pub comments: String,
1259 pub from_agent_id: Option<String>,
1260 pub metadata: Option<Value>,
1261 pub user_id: Option<String>,
1262}
1263
1264impl FeedbackOptions {
1265 pub fn new(handoff_id: impl Into<String>, verdict: impl Into<String>) -> Self {
1266 Self {
1267 run_id: None,
1268 handoff_id: handoff_id.into(),
1269 verdict: verdict.into(),
1270 comments: String::new(),
1271 from_agent_id: None,
1272 metadata: None,
1273 user_id: None,
1274 }
1275 }
1276}
1277
1278impl Client {
1279 pub async fn remember(&self, options: RememberOptions) -> Result<Value> {
1280 let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "remember")?;
1281 let content = require_non_empty_string(options.content, "content")?;
1282 let item_id = options
1283 .item_id
1284 .unwrap_or_else(|| generate_helper_id("remember"));
1285 let accepted = self
1286 .control
1287 .ingest(prune_nulls(json!({
1288 "run_id": run_id,
1289 "agent_id": options.agent_id.unwrap_or_else(|| "sdk-client".to_string()),
1290 "idempotency_key": options.idempotency_key.unwrap_or_else(|| item_id.clone()),
1291 "parallel": options.parallel,
1292 "items": [{
1293 "item_id": item_id,
1294 "content_type": options.content_type,
1295 "text": content,
1296 "payload_json": encode_optional_json(options.payload.as_ref())?,
1297 "hints_json": encode_optional_json(options.hints.as_ref())?,
1298 "metadata_json": encode_optional_json(options.metadata.as_ref())?,
1299 "intent": options.intent,
1300 "lesson_type": options.lesson_type,
1301 "lesson_scope": options.lesson_scope,
1302 "lesson_importance": options.lesson_importance,
1303 "lesson_conditions_json": encode_string_vec(&options.lesson_conditions)?,
1304 "user_id": options.user_id,
1305 "upsert_key": options.upsert_key,
1306 "importance": options.importance,
1307 "source": options.source.unwrap_or_else(|| "agent".to_string()),
1308 "lane": options.lane,
1309 "occurrence_time": options.occurrence_time.unwrap_or(0),
1310 }],
1311 })))
1312 .await?;
1313
1314 if !options.wait {
1315 return Ok(accepted);
1316 }
1317
1318 let Some(job_id) = accepted.get("job_id").and_then(|value| value.as_str()) else {
1319 return Ok(accepted);
1320 };
1321
1322 self.wait_for_ingest_job(
1323 &run_id,
1324 job_id,
1325 options
1326 .timeout_ms
1327 .unwrap_or_else(|| self.transport.timeout.as_millis() as u64),
1328 options.poll_interval_ms,
1329 )
1330 .await
1331 }
1332
1333 pub async fn recall(&self, options: RecallOptions) -> Result<Value> {
1334 let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "recall")?;
1335 self.control
1336 .query(prune_nulls(json!({
1337 "run_id": run_id,
1338 "query": require_non_empty_string(options.query, "query")?,
1339 "schema": options.schema,
1340 "mode": options.mode,
1341 "direct_lane": options.direct_lane,
1342 "include_linked_runs": options.include_linked_runs,
1343 "limit": options.limit,
1344 "embedding": options.embedding,
1345 "entry_types": if options.entry_types.is_empty() { Value::Null } else { json!(options.entry_types) },
1346 "include_working_memory": options.include_working_memory,
1347 "user_id": options.user_id,
1348 "agent_id": options.agent_id,
1349 "lane": options.lane,
1350 "min_timestamp": options.min_timestamp.unwrap_or(0),
1351 "max_timestamp": options.max_timestamp.unwrap_or(0),
1352 "budget": options.budget,
1353 })))
1354 .await
1355 }
1356
1357 pub async fn get_context(&self, options: GetContextOptions) -> Result<Value> {
1358 let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "get_context")?;
1359 self.control
1360 .context(prune_nulls(json!({
1361 "run_id": run_id,
1362 "query": require_non_empty_string(options.query.unwrap_or_default(), "query")?,
1363 "user_id": options.user_id,
1364 "entry_types": if options.entry_types.is_empty() { Value::Null } else { json!(options.entry_types) },
1365 "include_working_memory": options.include_working_memory,
1366 "format": options.format.unwrap_or_else(|| "structured".to_string()),
1367 "limit": options.limit.unwrap_or(5),
1368 "max_token_budget": options.max_token_budget.unwrap_or(0),
1369 "agent_id": options.agent_id,
1370 "mode": options.mode.unwrap_or_else(|| "full".to_string()),
1371 "sections": if options.sections.is_empty() { Value::Null } else { json!(options.sections) },
1372 "lane": options.lane,
1373 })))
1374 .await
1375 }
1376
1377 pub async fn archive(&self, options: ArchiveOptions) -> Result<Value> {
1378 let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "archive")?;
1379 let content = require_non_empty_string(options.content, "content")?;
1380 let artifact_kind = require_non_empty_string(options.artifact_kind, "artifact_kind")?;
1381 let agent_id = options.agent_id.clone();
1382 self.control
1383 .archive_block(prune_nulls(json!({
1384 "run_id": run_id,
1385 "content": content,
1386 "artifact_kind": artifact_kind,
1387 "metadata_json": encode_optional_json(options.metadata.as_ref())?,
1388 "user_id": options.user_id,
1389 "agent_id": agent_id.clone(),
1390 "origin_agent_id": options.origin_agent_id.or(agent_id),
1391 "source_attempt_id": options.source_attempt_id,
1392 "source_tool": options.source_tool,
1393 "labels": if options.labels.is_empty() { Value::Null } else { json!(options.labels) },
1394 "family": options.family,
1395 "importance": options.importance,
1396 })))
1397 .await
1398 }
1399
1400 pub async fn archive_block(&self, options: ArchiveOptions) -> Result<Value> {
1401 self.archive(options).await
1402 }
1403
1404 pub async fn dereference(&self, options: DereferenceOptions) -> Result<Value> {
1405 let run_id =
1406 resolve_helper_run_id(options.run_id, self.transport.run_id(), "dereference")?;
1407 self.control
1408 .dereference(prune_nulls(json!({
1409 "run_id": run_id,
1410 "reference_id": require_non_empty_string(options.reference_id, "reference_id")?,
1411 "user_id": options.user_id,
1412 "agent_id": options.agent_id,
1413 })))
1414 .await
1415 }
1416
1417 pub async fn memory_health(&self, options: MemoryHealthOptions) -> Result<Value> {
1418 let run_id =
1419 resolve_helper_run_id(options.run_id, self.transport.run_id(), "memory_health")?;
1420 self.control
1421 .memory_health(prune_nulls(json!({
1422 "run_id": run_id,
1423 "user_id": options.user_id,
1424 "stale_threshold_days": options.stale_threshold_days,
1425 "limit": options.limit,
1426 })))
1427 .await
1428 }
1429
1430 pub async fn diagnose(&self, options: DiagnoseOptions) -> Result<Value> {
1431 let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "diagnose")?;
1432 self.control
1433 .diagnose(prune_nulls(json!({
1434 "run_id": run_id,
1435 "error_text": require_non_empty_string(options.error_text, "error_text")?,
1436 "error_type": options.error_type,
1437 "limit": options.limit,
1438 "user_id": options.user_id,
1439 })))
1440 .await
1441 }
1442
1443 pub async fn reflect(&self, options: ReflectOptions) -> Result<Value> {
1444 let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "reflect")?;
1445 self.control
1446 .reflect(prune_nulls(json!({
1447 "run_id": run_id,
1448 "include_linked_runs": options.include_linked_runs,
1449 "user_id": options.user_id,
1450 "step_id": options.step_id,
1451 "checkpoint_id": options.checkpoint_id,
1452 "last_n_items": options.last_n_items,
1453 "include_step_outcomes": options.include_step_outcomes,
1454 })))
1455 .await
1456 }
1457
1458 pub async fn forget(&self, options: ForgetOptions) -> Result<Value> {
1459 let delete_lesson = options
1460 .lesson_id
1461 .as_ref()
1462 .map(|value| !value.trim().is_empty())
1463 .unwrap_or(false);
1464 let run_id = if options.run_id.is_some() {
1465 options.run_id
1466 } else if delete_lesson {
1467 None
1468 } else {
1469 self.transport.run_id()
1470 };
1471 let delete_run = run_id
1472 .as_ref()
1473 .map(|value| !value.trim().is_empty())
1474 .unwrap_or(false);
1475
1476 if (delete_lesson as u8) + (delete_run as u8) != 1 {
1477 return Err(SdkError::ValidationError(
1478 "forget requires either lesson_id or run_id, but not both".to_string(),
1479 ));
1480 }
1481
1482 if delete_lesson {
1483 return self
1484 .control
1485 .delete_lesson(json!({ "lesson_id": options.lesson_id.unwrap_or_default() }))
1486 .await;
1487 }
1488
1489 self.control
1490 .delete_run(json!({ "run_id": run_id.unwrap_or_default() }))
1491 .await
1492 }
1493
1494 pub async fn checkpoint(&self, options: CheckpointOptions) -> Result<Value> {
1495 let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "checkpoint")?;
1496 self.control
1497 .checkpoint(prune_nulls(json!({
1498 "run_id": run_id,
1499 "label": options.label,
1500 "context_snapshot": require_non_empty_string(options.context_snapshot, "context_snapshot")?,
1501 "metadata_json": encode_optional_json(options.metadata.as_ref())?,
1502 "user_id": options.user_id,
1503 "agent_id": options.agent_id,
1504 })))
1505 .await
1506 }
1507
1508 pub async fn register_agent(&self, options: RegisterAgentOptions) -> Result<Value> {
1509 let run_id =
1510 resolve_helper_run_id(options.run_id, self.transport.run_id(), "register_agent")?;
1511 self.control
1512 .register_agent(prune_nulls(json!({
1513 "run_id": run_id,
1514 "agent_id": require_non_empty_string(options.agent_id, "agent_id")?,
1515 "role": options.role,
1516 "capabilities": if options.capabilities.is_empty() { Value::Null } else { json!(options.capabilities) },
1517 "status": options.status,
1518 "read_scopes": if options.read_scopes.is_empty() { Value::Null } else { json!(options.read_scopes) },
1519 "write_scopes": if options.write_scopes.is_empty() { Value::Null } else { json!(options.write_scopes) },
1520 "shared_memory_lanes": if options.shared_memory_lanes.is_empty() { Value::Null } else { json!(options.shared_memory_lanes) },
1521 })))
1522 .await
1523 }
1524
1525 pub async fn list_agents(&self, options: ListAgentsOptions) -> Result<Value> {
1526 let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "list_agents")?;
1527 self.control.list_agents(json!({ "run_id": run_id })).await
1528 }
1529
1530 pub async fn record_outcome(&self, options: RecordOutcomeOptions) -> Result<Value> {
1531 let run_id =
1532 resolve_helper_run_id(options.run_id, self.transport.run_id(), "record_outcome")?;
1533 self.control
1534 .record_outcome(prune_nulls(json!({
1535 "run_id": run_id,
1536 "reference_id": require_non_empty_string(options.reference_id, "reference_id")?,
1537 "outcome": require_non_empty_string(options.outcome, "outcome")?,
1538 "signal": options.signal,
1539 "rationale": options.rationale,
1540 "agent_id": options.agent_id,
1541 "user_id": options.user_id,
1542 })))
1543 .await
1544 }
1545
1546 pub async fn record_step_outcome(&self, options: RecordStepOutcomeOptions) -> Result<Value> {
1547 let run_id =
1548 resolve_helper_run_id(options.run_id, self.transport.run_id(), "record_step_outcome")?;
1549 self.control
1550 .record_outcome(prune_nulls(json!({
1551 "run_id": run_id,
1552 "step_id": require_non_empty_string(options.step_id, "step_id")?,
1553 "step_name": options.step_name,
1554 "outcome": require_non_empty_string(options.outcome, "outcome")?,
1555 "signal": options.signal,
1556 "rationale": options.rationale,
1557 "directive_hint": options.directive_hint,
1558 "agent_id": options.agent_id,
1559 "user_id": options.user_id,
1560 "metadata_json": encode_optional_json(options.metadata.as_ref())?,
1561 })))
1562 .await
1563 }
1564
1565 pub async fn surface_strategies(&self, options: SurfaceStrategiesOptions) -> Result<Value> {
1566 let run_id = resolve_helper_run_id(
1567 options.run_id,
1568 self.transport.run_id(),
1569 "surface_strategies",
1570 )?;
1571 self.control
1572 .surface_strategies(prune_nulls(json!({
1573 "run_id": run_id,
1574 "lesson_types": if options.lesson_types.is_empty() { Value::Null } else { json!(options.lesson_types) },
1575 "max_strategies": options.max_strategies,
1576 "user_id": options.user_id,
1577 })))
1578 .await
1579 }
1580
1581 pub async fn handoff(&self, options: HandoffOptions) -> Result<Value> {
1582 let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "handoff")?;
1583 self.control
1584 .create_handoff(prune_nulls(json!({
1585 "run_id": run_id,
1586 "task_id": require_non_empty_string(options.task_id, "task_id")?,
1587 "from_agent_id": require_non_empty_string(options.from_agent_id, "from_agent_id")?,
1588 "to_agent_id": require_non_empty_string(options.to_agent_id, "to_agent_id")?,
1589 "content": require_non_empty_string(options.content, "content")?,
1590 "requested_action": options.requested_action,
1591 "metadata_json": encode_optional_json(options.metadata.as_ref())?,
1592 "user_id": options.user_id,
1593 })))
1594 .await
1595 }
1596
1597 pub async fn feedback(&self, options: FeedbackOptions) -> Result<Value> {
1598 let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "feedback")?;
1599 self.control
1600 .submit_feedback(prune_nulls(json!({
1601 "run_id": run_id,
1602 "handoff_id": require_non_empty_string(options.handoff_id, "handoff_id")?,
1603 "verdict": require_non_empty_string(options.verdict, "verdict")?,
1604 "comments": options.comments,
1605 "from_agent_id": options.from_agent_id,
1606 "metadata_json": encode_optional_json(options.metadata.as_ref())?,
1607 "user_id": options.user_id,
1608 })))
1609 .await
1610 }
1611
1612 async fn wait_for_ingest_job(
1613 &self,
1614 run_id: &str,
1615 job_id: &str,
1616 timeout_ms: u64,
1617 poll_interval_ms: u64,
1618 ) -> Result<Value> {
1619 let deadline = Instant::now() + Duration::from_millis(timeout_ms);
1620 loop {
1621 let job = self
1622 .control
1623 .get_ingest_job(json!({ "run_id": run_id, "job_id": job_id }))
1624 .await?;
1625 if job
1626 .get("done")
1627 .and_then(|value| value.as_bool())
1628 .unwrap_or(false)
1629 {
1630 return Ok(job);
1631 }
1632 if Instant::now() >= deadline {
1633 return Err(SdkError::TransportError {
1634 kind: TransportFailureKind::DeadlineExceeded,
1635 message: format!("timed out waiting for ingest job {}", job_id),
1636 });
1637 }
1638 sleep(Duration::from_millis(poll_interval_ms)).await;
1639 }
1640 }
1641}
1642
1643#[derive(Clone)]
1644pub struct AuthClient {
1645 transport: Arc<TransportEngine>,
1646}
1647
1648impl AuthClient {
1649 pub async fn health(&self) -> Result<Value> {
1650 self.transport.invoke("auth.health", json!({})).await
1651 }
1652
1653 pub fn set_api_key(&self, api_key: Option<String>) {
1654 self.transport.set_api_key(api_key);
1655 }
1656
1657 pub fn set_token(&self, token: Option<String>) {
1658 self.set_api_key(token);
1659 }
1660
1661 pub fn set_run_id(&self, run_id: Option<String>) {
1662 self.transport.set_run_id(run_id);
1663 }
1664}
1665
1666macro_rules! define_auth_payload_methods {
1667 ($($name:ident => $op_key:literal),+ $(,)?) => {
1668 impl AuthClient {
1669 $(
1670 pub async fn $name<T: Serialize>(&self, payload: T) -> Result<Value> {
1671 self.transport.invoke_serialized($op_key, payload).await
1672 }
1673 )+
1674 }
1675 };
1676}
1677
1678define_auth_payload_methods!(
1679 create_user => "auth.create_user",
1680 rotate_user_api_key => "auth.rotate_user_api_key",
1681 revoke_user_api_key => "auth.revoke_user_api_key",
1682 list_users => "auth.list_users",
1683 get_user => "auth.get_user",
1684 delete_user => "auth.delete_user"
1685);
1686
1687#[derive(Clone)]
1688pub struct CoreClient {
1689 transport: Arc<TransportEngine>,
1690}
1691
1692macro_rules! define_core_payload_methods {
1693 ($($name:ident => $op_key:literal),+ $(,)?) => {
1694 impl CoreClient {
1695 $(
1696 pub async fn $name<T: Serialize>(&self, payload: T) -> Result<Value> {
1697 self.transport.invoke_serialized($op_key, payload).await
1698 }
1699 )+
1700 }
1701 };
1702}
1703
1704define_core_payload_methods!(
1705 insert => "core.insert",
1706 batch_insert => "core.batch_insert",
1707 search => "core.search",
1708 delete_node => "core.delete_node",
1709 delete_run => "core.delete_run",
1710 create_session => "core.create_session",
1711 snapshot_session => "core.snapshot_session",
1712 load_session => "core.load_session",
1713 commit_session => "core.commit_session",
1714 drop_session => "core.drop_session",
1715 write_memory => "core.write_memory",
1716 read_memory => "core.read_memory",
1717 add_memory => "core.add_memory",
1718 get_memory => "core.get_memory",
1719 clear_memory => "core.clear_memory",
1720 grant_permission => "core.grant_permission",
1721 revoke_permission => "core.revoke_permission",
1722 check_permission => "core.check_permission",
1723 unsubscribe_events => "core.unsubscribe_events"
1724);
1725
1726impl CoreClient {
1727 pub async fn subscribe_events<T: Serialize>(&self, payload: T) -> Result<ValueStream> {
1728 self.transport.invoke_stream_serialized("core.subscribe_events", payload).await
1729 }
1730
1731 pub async fn watch_memory<T: Serialize>(&self, payload: T) -> Result<ValueStream> {
1732 self.transport.invoke_stream_serialized("core.watch_memory", payload).await
1733 }
1734
1735 pub async fn list_subscriptions(&self) -> Result<Value> {
1736 self.transport
1737 .invoke("core.list_subscriptions", json!({}))
1738 .await
1739 }
1740
1741 pub async fn storage_stats(&self) -> Result<Value> {
1742 self.transport.invoke("core.storage_stats", json!({})).await
1743 }
1744
1745 pub async fn trigger_compaction(&self) -> Result<Value> {
1746 self.transport
1747 .invoke("core.trigger_compaction", json!({}))
1748 .await
1749 }
1750}
1751
1752#[derive(Clone)]
1753pub struct ControlClient {
1754 transport: Arc<TransportEngine>,
1755}
1756
1757macro_rules! define_control_payload_methods {
1758 ($($name:ident => $op_key:literal),+ $(,)?) => {
1759 impl ControlClient {
1760 $(
1761 pub async fn $name<T: Serialize>(&self, payload: T) -> Result<Value> {
1762 self.transport.invoke_serialized($op_key, payload).await
1763 }
1764 )+
1765 }
1766 };
1767}
1768
1769define_control_payload_methods!(
1770 set_variable => "control.set_variable",
1771 get_variable => "control.get_variable",
1772 list_variables => "control.list_variables",
1773 delete_variable => "control.delete_variable",
1774 define_concept => "control.define_concept",
1775 list_concepts => "control.list_concepts",
1776 add_goal => "control.add_goal",
1777 update_goal => "control.update_goal",
1778 list_goals => "control.list_goals",
1779 get_goal_tree => "control.get_goal_tree",
1780 submit_action => "control.submit_action",
1781 get_action_log => "control.get_action_log",
1782 run_cycle => "control.run_cycle",
1783 get_cycle_history => "control.get_cycle_history",
1784 register_agent => "control.register_agent",
1785 agent_heartbeat => "control.agent_heartbeat",
1786 append_activity => "control.append_activity",
1787 context_snapshot => "control.context_snapshot",
1788 link_run => "control.link_run",
1789 unlink_run => "control.unlink_run",
1790 ingest => "control.ingest",
1791 batch_insert => "control.batch_insert",
1792 get_ingest_job => "control.get_ingest_job",
1793 query => "control.query",
1794 diagnose => "control.diagnose",
1795 delete_run => "control.delete_run",
1796 reflect => "control.reflect",
1797 lessons => "control.lessons",
1798 delete_lesson => "control.delete_lesson",
1799 context => "control.context",
1800 archive_block => "control.archive_block",
1801 dereference => "control.dereference",
1802 memory_health => "control.memory_health",
1803 checkpoint => "control.checkpoint",
1804 list_agents => "control.list_agents",
1805 create_handoff => "control.create_handoff",
1806 submit_feedback => "control.submit_feedback",
1807 record_outcome => "control.record_outcome",
1808 surface_strategies => "control.surface_strategies"
1809);
1810
1811impl ControlClient {
1812 pub async fn subscribe<T: Serialize>(&self, payload: T) -> Result<ValueStream> {
1813 self.transport.invoke_stream_serialized("control.subscribe", payload).await
1814 }
1815}
1816
1817fn parse_sse_byte_stream(
1818 byte_stream: impl Stream<Item = reqwest::Result<bytes::Bytes>> + Send + 'static,
1819) -> impl Stream<Item = Result<Value>> + Send {
1820 let (tx, rx) = tokio::sync::mpsc::channel::<Result<Value>>(64);
1821 tokio::spawn(async move {
1822 tokio::pin!(byte_stream);
1823 let mut buffer = String::new();
1824 while let Some(chunk_result) = byte_stream.next().await {
1825 match chunk_result {
1826 Ok(chunk) => {
1827 buffer.push_str(&String::from_utf8_lossy(&chunk));
1828 while let Some(newline_pos) = buffer.find('\n') {
1829 let line = buffer[..newline_pos].trim().to_string();
1830 buffer = buffer[newline_pos + 1..].to_string();
1831 if line.is_empty() || line.starts_with(':') {
1832 continue;
1833 }
1834 let data = if line.starts_with("data: ") {
1835 &line[6..]
1836 } else {
1837 &line
1838 };
1839 let value = match serde_json::from_str::<Value>(data) {
1840 Ok(value) => Ok(value),
1841 Err(_) => Ok(Value::String(data.to_string())),
1842 };
1843 if tx.send(value).await.is_err() {
1844 return;
1845 }
1846 }
1847 }
1848 Err(e) => {
1849 let _ = tx
1850 .send(Err(SdkError::TransportError {
1851 kind: TransportFailureKind::Io,
1852 message: e.to_string(),
1853 }))
1854 .await;
1855 return;
1856 }
1857 }
1858 }
1859 let remaining = buffer.trim().to_string();
1861 if !remaining.is_empty() && !remaining.starts_with(':') {
1862 let data = if remaining.starts_with("data: ") {
1863 &remaining[6..]
1864 } else {
1865 &remaining
1866 };
1867 let value = match serde_json::from_str::<Value>(data) {
1868 Ok(value) => Ok(value),
1869 Err(_) => Ok(Value::String(data.to_string())),
1870 };
1871 let _ = tx.send(value).await;
1872 }
1873 });
1874 tokio_stream::wrappers::ReceiverStream::new(rx)
1875}
1876
1877fn prune_nulls(value: Value) -> Value {
1878 match value {
1879 Value::Object(map) => Value::Object(
1880 map.into_iter()
1881 .filter_map(|(key, value)| {
1882 let cleaned = prune_nulls(value);
1883 if cleaned.is_null() {
1884 None
1885 } else {
1886 Some((key, cleaned))
1887 }
1888 })
1889 .collect::<Map<String, Value>>(),
1890 ),
1891 Value::Array(items) => Value::Array(items.into_iter().map(prune_nulls).collect()),
1892 other => other,
1893 }
1894}
1895
1896fn resolve_helper_run_id(
1897 explicit: Option<String>,
1898 fallback: Option<String>,
1899 helper_name: &str,
1900) -> Result<String> {
1901 let candidate = explicit.or(fallback).unwrap_or_default();
1902 if candidate.trim().is_empty() {
1903 return Err(SdkError::ValidationError(format!(
1904 "{} requires run_id or a client default run_id",
1905 helper_name
1906 )));
1907 }
1908 Ok(candidate)
1909}
1910
1911fn require_non_empty_string(value: String, field_name: &str) -> Result<String> {
1912 if value.trim().is_empty() {
1913 return Err(SdkError::ValidationError(format!(
1914 "{} is required",
1915 field_name
1916 )));
1917 }
1918 Ok(value)
1919}
1920
1921fn encode_optional_json(value: Option<&Value>) -> Result<String> {
1922 match value {
1923 Some(value) => serde_json::to_string(value).map_err(|err| {
1924 SdkError::ValidationError(format!("failed to serialize helper json field: {}", err))
1925 }),
1926 None => Ok(String::new()),
1927 }
1928}
1929
1930fn encode_string_vec(values: &[String]) -> Result<String> {
1931 if values.is_empty() {
1932 return Ok(String::new());
1933 }
1934 serde_json::to_string(values).map_err(|err| {
1935 SdkError::ValidationError(format!("failed to serialize helper string list: {}", err))
1936 })
1937}
1938
1939fn generate_helper_id(prefix: &str) -> String {
1940 use std::time::{SystemTime, UNIX_EPOCH};
1941
1942 let millis = SystemTime::now()
1943 .duration_since(UNIX_EPOCH)
1944 .unwrap_or_default()
1945 .as_millis();
1946 format!("{}-{}", prefix, millis)
1947}
1948
1949fn ensure_object_payload(payload: Value) -> Result<Value> {
1950 match payload {
1951 Value::Null => Ok(json!({})),
1952 Value::Object(_) => Ok(payload),
1953 _ => Err(SdkError::ValidationError(
1954 "payload must serialize to a JSON object".to_string(),
1955 )),
1956 }
1957}
1958
1959fn decode_grpc_request<T: DeserializeOwned>(op_key: &str, payload: Value) -> Result<T> {
1960 serde_json::from_value(payload).map_err(|e| {
1961 SdkError::ValidationError(format!(
1962 "invalid gRPC request payload for {}: {}",
1963 op_key, e
1964 ))
1965 })
1966}
1967
1968fn encode_grpc_response<T: Serialize>(op_key: &str, response: T) -> Result<Value> {
1969 serde_json::to_value(response).map_err(|e| {
1970 SdkError::ServerError(format!(
1971 "failed to serialize gRPC response for {}: {}",
1972 op_key, e
1973 ))
1974 })
1975}
1976
1977fn decode_batch_insert_payload(
1978 payload: Value,
1979) -> Result<Vec<crate::proto::mubit::v1::InsertRequest>> {
1980 let payload = ensure_object_payload(payload)?;
1981 let mut extracted: Option<&Value> = None;
1982 if let Some(map) = payload.as_object() {
1983 for key in ["items", "requests", "nodes"] {
1984 if let Some(value) = map.get(key) {
1985 extracted = Some(value);
1986 break;
1987 }
1988 }
1989 }
1990
1991 if let Some(Value::Array(items)) = extracted {
1992 if items.is_empty() {
1993 return Err(SdkError::ValidationError(
1994 "batch_insert gRPC payload cannot provide an empty items list".to_string(),
1995 ));
1996 }
1997 return items
1998 .iter()
1999 .cloned()
2000 .map(|item| decode_grpc_request("core.batch_insert", item))
2001 .collect();
2002 }
2003
2004 if extracted.is_some() {
2005 return Err(SdkError::ValidationError(
2006 "batch_insert gRPC payload items/requests/nodes must be an array".to_string(),
2007 ));
2008 }
2009
2010 let single = decode_grpc_request("core.batch_insert", payload)?;
2011 Ok(vec![single])
2012}
2013
2014fn value_to_param(value: &Value) -> Option<String> {
2015 match value {
2016 Value::String(v) => Some(v.clone()),
2017 Value::Number(v) => Some(v.to_string()),
2018 Value::Bool(v) => Some(v.to_string()),
2019 _ => None,
2020 }
2021}
2022
2023fn value_to_query(value: &Value) -> Option<String> {
2024 match value {
2025 Value::String(v) => Some(v.clone()),
2026 Value::Number(v) => Some(v.to_string()),
2027 Value::Bool(v) => Some(v.to_string()),
2028 Value::Array(items) => {
2029 let rendered: Vec<String> = items.iter().filter_map(value_to_query).collect();
2030 if rendered.is_empty() {
2031 None
2032 } else {
2033 Some(rendered.join(","))
2034 }
2035 }
2036 _ => None,
2037 }
2038}
2039
2040fn derive_http_and_grpc(endpoint: &str) -> Result<(String, String, bool)> {
2041 let endpoint = if endpoint.contains("://") {
2042 endpoint.to_string()
2043 } else {
2044 format!("http://{}", endpoint)
2045 };
2046
2047 let parsed = Url::parse(&endpoint).map_err(|e| {
2048 SdkError::ValidationError(format!("invalid endpoint '{}': {}", endpoint, e))
2049 })?;
2050
2051 let host = parsed.host_str().ok_or_else(|| {
2052 SdkError::ValidationError(format!("endpoint '{}' missing host", endpoint))
2053 })?;
2054
2055 let scheme = parsed.scheme();
2056 let port = parsed.port_or_known_default().ok_or_else(|| {
2057 SdkError::ValidationError(format!(
2058 "endpoint '{}' missing known default port",
2059 endpoint
2060 ))
2061 })?;
2062
2063 let default_port = match scheme {
2064 "https" => 443,
2065 _ => 80,
2066 };
2067
2068 let http_endpoint = if port == default_port {
2069 format!("{}://{}", scheme, host)
2070 } else {
2071 format!("{}://{}:{}", scheme, host, port)
2072 };
2073
2074 let grpc_endpoint = format!("{}:{}", host, port);
2075 let grpc_tls = scheme.eq_ignore_ascii_case("https");
2076
2077 Ok((http_endpoint, grpc_endpoint, grpc_tls))
2078}
2079
2080fn normalize_http_endpoint(endpoint: &str) -> Result<String> {
2081 let endpoint = if endpoint.contains("://") {
2082 endpoint.to_string()
2083 } else {
2084 format!("http://{}", endpoint)
2085 };
2086
2087 let parsed = Url::parse(&endpoint).map_err(|e| {
2088 SdkError::ValidationError(format!("invalid http_endpoint '{}': {}", endpoint, e))
2089 })?;
2090
2091 let host = parsed.host_str().ok_or_else(|| {
2092 SdkError::ValidationError(format!("http_endpoint '{}' missing host", endpoint))
2093 })?;
2094
2095 let scheme = parsed.scheme();
2096 let port = parsed.port_or_known_default().ok_or_else(|| {
2097 SdkError::ValidationError(format!(
2098 "http_endpoint '{}' missing known default port",
2099 endpoint
2100 ))
2101 })?;
2102
2103 let default_port = if scheme.eq_ignore_ascii_case("https") {
2104 443
2105 } else {
2106 80
2107 };
2108
2109 let normalized = if port == default_port {
2110 format!("{}://{}", scheme, host)
2111 } else {
2112 format!("{}://{}:{}", scheme, host, port)
2113 };
2114
2115 Ok(normalized)
2116}
2117
2118fn normalize_grpc_endpoint(endpoint: &str) -> Result<(String, bool)> {
2119 if endpoint.contains("://") {
2120 let parsed = Url::parse(endpoint).map_err(|e| {
2121 SdkError::ValidationError(format!("invalid grpc_endpoint '{}': {}", endpoint, e))
2122 })?;
2123 let host = parsed.host_str().ok_or_else(|| {
2124 SdkError::ValidationError(format!("grpc_endpoint '{}' missing host", endpoint))
2125 })?;
2126 let port = parsed.port_or_known_default().ok_or_else(|| {
2127 SdkError::ValidationError(format!(
2128 "grpc_endpoint '{}' missing known default port",
2129 endpoint
2130 ))
2131 })?;
2132 return Ok((
2133 format!("{}:{}", host, port),
2134 parsed.scheme().eq_ignore_ascii_case("https")
2135 || parsed.scheme().eq_ignore_ascii_case("grpcs"),
2136 ));
2137 }
2138
2139 let endpoint = endpoint.trim();
2140 if endpoint.is_empty() {
2141 return Err(SdkError::ValidationError(
2142 "grpc_endpoint cannot be empty".to_string(),
2143 ));
2144 }
2145
2146 if let Some((host, port_text)) = endpoint.rsplit_once(':') {
2147 if !host.trim().is_empty() {
2148 if let Ok(port) = port_text.parse::<u16>() {
2149 return Ok((format!("{}:{}", host, port), port == 443));
2150 }
2151 }
2152 return Ok((endpoint.to_string(), false));
2153 }
2154
2155 Ok((format!("{}:50051", endpoint), false))
2156}
2157
2158fn map_grpc_connect_error(error: tonic::transport::Error, endpoint: &str) -> SdkError {
2159 let lower = error.to_string().to_lowercase();
2160 let kind = if lower.contains("deadline") || lower.contains("timed out") {
2161 TransportFailureKind::DeadlineExceeded
2162 } else if lower.contains("connection reset") {
2163 TransportFailureKind::ConnectionReset
2164 } else if lower.contains("dns")
2165 || lower.contains("refused")
2166 || lower.contains("unavailable")
2167 || lower.contains("not connected")
2168 {
2169 TransportFailureKind::Unavailable
2170 } else {
2171 TransportFailureKind::Io
2172 };
2173
2174 SdkError::TransportError {
2175 kind,
2176 message: format!("failed to connect to gRPC endpoint {}: {}", endpoint, error),
2177 }
2178}
2179
2180fn map_grpc_status(status: Status) -> SdkError {
2181 let message = status.message().to_string();
2182 match status.code() {
2183 Code::Unauthenticated | Code::PermissionDenied => SdkError::AuthError(message),
2184 Code::InvalidArgument
2185 | Code::NotFound
2186 | Code::AlreadyExists
2187 | Code::FailedPrecondition
2188 | Code::OutOfRange => SdkError::ValidationError(message),
2189 Code::Unavailable => SdkError::TransportError {
2190 kind: TransportFailureKind::Unavailable,
2191 message,
2192 },
2193 Code::DeadlineExceeded => SdkError::TransportError {
2194 kind: TransportFailureKind::DeadlineExceeded,
2195 message,
2196 },
2197 Code::Unimplemented => SdkError::TransportError {
2198 kind: TransportFailureKind::Unimplemented,
2199 message,
2200 },
2201 Code::Cancelled => SdkError::TransportError {
2202 kind: TransportFailureKind::Io,
2203 message,
2204 },
2205 Code::Unknown | Code::Internal => {
2206 let lower = message.to_lowercase();
2207 if lower.contains("connection reset") {
2208 SdkError::TransportError {
2209 kind: TransportFailureKind::ConnectionReset,
2210 message,
2211 }
2212 } else if lower.contains("transport")
2213 || lower.contains("broken pipe")
2214 || lower.contains("io error")
2215 {
2216 SdkError::TransportError {
2217 kind: TransportFailureKind::Io,
2218 message,
2219 }
2220 } else {
2221 SdkError::ServerError(message)
2222 }
2223 }
2224 _ => SdkError::ServerError(message),
2225 }
2226}
2227
2228fn map_transport_error(error: reqwest::Error, message: String) -> SdkError {
2229 let lower = error.to_string().to_lowercase();
2230 let kind = if error.is_timeout() {
2231 TransportFailureKind::DeadlineExceeded
2232 } else if error.is_connect() {
2233 TransportFailureKind::Unavailable
2234 } else if lower.contains("connection reset") {
2235 TransportFailureKind::ConnectionReset
2236 } else if error.is_request() || error.is_body() {
2237 TransportFailureKind::Io
2238 } else {
2239 TransportFailureKind::Other
2240 };
2241
2242 SdkError::TransportError {
2243 kind,
2244 message: format!("{}: {}", message, error),
2245 }
2246}
2247
2248fn map_http_error(status: u16, body: String) -> SdkError {
2249 match status {
2250 401 | 403 => SdkError::AuthError(body),
2251 400 | 404 | 409 | 422 => SdkError::ValidationError(body),
2252 501 => SdkError::UnsupportedFeatureError(body),
2253 _ => SdkError::ServerError(body),
2254 }
2255}
2256
2257fn http_method_label(method: HttpMethod) -> &'static str {
2258 match method {
2259 HttpMethod::Get => "GET",
2260 HttpMethod::Post => "POST",
2261 HttpMethod::Delete => "DELETE",
2262 }
2263}