Skip to main content

mubit_sdk/
lib.rs

1pub mod contract;
2pub mod learn;
/// Generated protobuf / gRPC bindings.
///
/// The nested module path mirrors the proto package name (`mubit.v1`), which
/// is the layout `tonic::include_proto!` output is conventionally placed in.
pub mod proto {
    pub mod mubit {
        pub mod v1 {
            // Pulls in the code generated at build time for package `mubit.v1`.
            tonic::include_proto!("mubit.v1");
        }
    }
}
10
11use crate::contract::{find_operation, HttpMethod, OperationSpec};
12use reqwest::header::CONTENT_TYPE;
13use serde::de::DeserializeOwned;
14use serde::Serialize;
15use serde_json::{json, Map, Value};
16use std::collections::HashSet;
17use std::pin::Pin;
18use std::sync::{Arc, RwLock};
19use std::time::{Duration, Instant};
20use thiserror::Error;
21use tokio::sync::Mutex;
22use tokio::time::sleep;
23use tokio_stream::{Stream, StreamExt};
24use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
25use tonic::{Code, Request, Status};
26use url::Url;
27
/// A boxed stream of JSON values for server-streaming RPCs.
///
/// Each item is a [`Result`], so a stream can yield errors as well as values.
/// The stream is pinned and `Send`, so it can be handed across tasks.
pub type ValueStream = Pin<Box<dyn Stream<Item = Result<Value>> + Send>>;
30
/// Wire protocol selection for the client.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TransportMode {
    /// Let the client pick the protocol per call.
    Auto,
    /// Always use gRPC.
    Grpc,
    /// Always use HTTP.
    Http,
}

impl TransportMode {
    /// Maps a user-supplied transport name onto a mode.
    ///
    /// Surrounding whitespace and letter case are ignored. Any name other than
    /// `grpc` or `http` (including the empty string) resolves to `Auto`.
    fn normalize(raw: &str) -> Self {
        let name = raw.trim();
        if name.eq_ignore_ascii_case("grpc") {
            Self::Grpc
        } else if name.eq_ignore_ascii_case("http") {
            Self::Http
        } else {
            Self::Auto
        }
    }
}
47
/// HTTP base URL used when the environment does not provide one.
const DEFAULT_SHARED_HTTP_ENDPOINT: &str = "https://api.mubit.ai";
/// gRPC `host:port` used when the environment does not provide one.
const DEFAULT_SHARED_GRPC_ENDPOINT: &str = "grpc.api.mubit.ai:443";
50
/// Reads environment variable `name`, trimmed of surrounding whitespace.
///
/// Returns `None` when the variable is unset, is not valid Unicode, or is
/// empty after trimming.
fn env_non_empty(name: &str) -> Option<String> {
    let raw = std::env::var(name).ok()?;
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_string())
    }
}
57
/// Connection settings used to build a client.
#[derive(Clone, Debug)]
pub struct ClientConfig {
    /// Base endpoint. The HTTP and gRPC endpoints are derived from it when the
    /// more specific fields below are `None`.
    pub endpoint: String,
    /// Explicit gRPC endpoint; overrides the value derived from `endpoint`.
    pub grpc_endpoint: Option<String>,
    /// Explicit HTTP endpoint; overrides the value derived from `endpoint`.
    pub http_endpoint: Option<String>,
    /// Which transport requests are sent over.
    pub transport: TransportMode,
    /// API key sent as a bearer credential when present.
    pub api_key: Option<String>,
    pub token: Option<String>, // Backward-compatible alias for api_key in 0.3.x
    /// Default run id filled into request payloads that do not carry one.
    pub run_id: Option<String>,
    /// Timeout in milliseconds applied to the HTTP client and the gRPC channel.
    pub timeout_ms: u64,
}
69
70impl ClientConfig {
71    pub fn new(endpoint: impl Into<String>) -> Self {
72        Self {
73            endpoint: endpoint.into(),
74            grpc_endpoint: None,
75            http_endpoint: None,
76            transport: TransportMode::Auto,
77            api_key: None,
78            token: None,
79            run_id: None,
80            timeout_ms: 30_000,
81        }
82    }
83
84    pub fn transport(mut self, transport: impl AsRef<str>) -> Self {
85        self.transport = TransportMode::normalize(transport.as_ref());
86        self
87    }
88
89    pub fn api_key(mut self, api_key: impl Into<String>) -> Self {
90        self.api_key = Some(api_key.into());
91        self
92    }
93
94    pub fn token(mut self, token: impl Into<String>) -> Self {
95        self.api_key = Some(token.into());
96        self
97    }
98
99    pub fn run_id(mut self, run_id: impl Into<String>) -> Self {
100        self.run_id = Some(run_id.into());
101        self
102    }
103
104    pub fn from_env() -> Self {
105        Self::default()
106    }
107}
108
109impl Default for ClientConfig {
110    fn default() -> Self {
111        let transport = env_non_empty("MUBIT_TRANSPORT")
112            .map(|value| TransportMode::normalize(&value))
113            .unwrap_or(TransportMode::Auto);
114        let endpoint = env_non_empty("MUBIT_ENDPOINT")
115            .unwrap_or_else(|| DEFAULT_SHARED_HTTP_ENDPOINT.to_string());
116
117        let mut config = Self::new(endpoint);
118        config.transport = transport;
119        config.http_endpoint = env_non_empty("MUBIT_HTTP_ENDPOINT");
120        config.grpc_endpoint = env_non_empty("MUBIT_GRPC_ENDPOINT");
121        config.api_key = env_non_empty("MUBIT_API_KEY");
122        config.token = env_non_empty("MUBIT_TOKEN");
123        config.run_id = env_non_empty("MUBIT_RUN_ID");
124
125        if config.http_endpoint.is_none() {
126            config.http_endpoint = Some(DEFAULT_SHARED_HTTP_ENDPOINT.to_string());
127        }
128        if config.grpc_endpoint.is_none() {
129            config.grpc_endpoint = Some(DEFAULT_SHARED_GRPC_ENDPOINT.to_string());
130        }
131
132        config
133    }
134}
135
/// Coarse classification of a transport-level failure.
///
/// Every kind except `Other` makes an error eligible for the gRPC-to-HTTP
/// fallback (see `SdkError::is_fallback_eligible`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TransportFailureKind {
    Unavailable,
    ConnectionReset,
    DeadlineExceeded,
    Io,
    /// The operation is not available on the attempted transport.
    Unimplemented,
    /// Any failure that fits none of the other kinds; never triggers fallback.
    Other,
}
145
/// Error type returned by every fallible SDK call.
///
/// The `Display` text of each variant is prefixed with the variant name.
#[derive(Error, Debug)]
pub enum SdkError {
    // NOTE(review): presumably produced for rejected credentials — confirm
    // against the HTTP/gRPC status mapping helpers.
    #[error("AuthError: {0}")]
    AuthError(String),
    /// The request was rejected before or while being built (for example a
    /// payload that cannot be serialized or an invalid endpoint).
    #[error("ValidationError: {0}")]
    ValidationError(String),
    /// The request could not be delivered or completed; `kind` classifies why.
    #[error("TransportError({kind:?}): {message}")]
    TransportError {
        kind: TransportFailureKind,
        message: String,
    },
    /// The server answered, but with a failure or an undecodable body.
    #[error("ServerError: {0}")]
    ServerError(String),
    /// The requested operation is unknown to, or unsupported by, this SDK.
    #[error("UnsupportedFeatureError: {0}")]
    UnsupportedFeatureError(String),
}
162
163impl SdkError {
164    fn is_fallback_eligible(&self) -> bool {
165        matches!(
166            self,
167            SdkError::TransportError {
168                kind: TransportFailureKind::Unavailable
169                    | TransportFailureKind::ConnectionReset
170                    | TransportFailureKind::DeadlineExceeded
171                    | TransportFailureKind::Io
172                    | TransportFailureKind::Unimplemented,
173                ..
174            }
175        )
176    }
177}
178
/// Result alias used throughout the SDK, with [`SdkError`] as the error type.
pub type Result<T> = std::result::Result<T, SdkError>;
180
/// Settings that can be changed after the transport has been constructed.
#[derive(Clone, Debug)]
struct MutableState {
    /// Credential attached to outgoing requests as a bearer token.
    api_key: Option<String>,
    /// Default run id filled into payloads that omit one.
    run_id: Option<String>,
    /// Currently selected transport.
    transport: TransportMode,
}
187
/// Shared request executor behind all client facades.
struct TransportEngine {
    /// Base URL that HTTP routes are appended to.
    http_endpoint: String,
    /// gRPC authority; the `http://` or `https://` scheme is prepended at
    /// connect time.
    grpc_endpoint: String,
    /// Whether the gRPC channel uses TLS.
    grpc_tls: bool,
    /// Timeout applied to the HTTP client and to gRPC connect/request.
    timeout: Duration,
    http_client: reqwest::Client,
    /// gRPC channel, created on first use and cached afterwards.
    grpc_channel: Mutex<Option<Channel>>,
    /// Runtime-adjustable settings (API key, run id, transport mode).
    state: Arc<RwLock<MutableState>>,
}
197
198impl TransportEngine {
199    fn new(config: ClientConfig) -> Result<Self> {
200        let (default_http_endpoint, default_grpc_endpoint, default_grpc_tls) =
201            derive_http_and_grpc(&config.endpoint)?;
202
203        let http_endpoint = match config.http_endpoint {
204            Some(http_endpoint) => normalize_http_endpoint(&http_endpoint)?,
205            None => default_http_endpoint,
206        };
207
208        let (grpc_endpoint, grpc_tls) = match config.grpc_endpoint {
209            Some(grpc_endpoint) => normalize_grpc_endpoint(&grpc_endpoint)?,
210            None => (default_grpc_endpoint, default_grpc_tls),
211        };
212
213        let timeout = Duration::from_millis(config.timeout_ms);
214        let http_client = reqwest::Client::builder()
215            .timeout(timeout)
216            .build()
217            .map_err(|e| SdkError::TransportError {
218                kind: TransportFailureKind::Other,
219                message: format!("failed to build HTTP client: {}", e),
220            })?;
221
222        Ok(Self {
223            http_endpoint,
224            grpc_endpoint,
225            grpc_tls,
226            timeout,
227            http_client,
228            grpc_channel: Mutex::new(None),
229            state: Arc::new(RwLock::new(MutableState {
230                api_key: config.api_key.or(config.token),
231                run_id: config.run_id,
232                transport: config.transport,
233            })),
234        })
235    }
236
237    fn set_api_key(&self, api_key: Option<String>) {
238        if let Ok(mut state) = self.state.write() {
239            state.api_key = api_key;
240        }
241    }
242
243    fn set_run_id(&self, run_id: Option<String>) {
244        if let Ok(mut state) = self.state.write() {
245            state.run_id = run_id;
246        }
247    }
248
249    fn set_transport(&self, transport: TransportMode) {
250        if let Ok(mut state) = self.state.write() {
251            state.transport = transport;
252        }
253    }
254
255    fn api_key(&self) -> Option<String> {
256        self.state.read().ok().and_then(|s| s.api_key.clone())
257    }
258
259    fn run_id(&self) -> Option<String> {
260        self.state.read().ok().and_then(|s| s.run_id.clone())
261    }
262
263    fn transport(&self) -> TransportMode {
264        self.state
265            .read()
266            .map(|s| s.transport)
267            .unwrap_or(TransportMode::Auto)
268    }
269
270    async fn invoke_serialized<T: Serialize>(&self, op_key: &str, payload: T) -> Result<Value> {
271        let value = serde_json::to_value(payload).map_err(|e| {
272            SdkError::ValidationError(format!("failed to serialize request payload: {}", e))
273        })?;
274        self.invoke(op_key, value).await
275    }
276
277    async fn invoke(&self, op_key: &str, payload: Value) -> Result<Value> {
278        let op = find_operation(op_key).ok_or_else(|| {
279            SdkError::UnsupportedFeatureError(format!("unknown operation: {}", op_key))
280        })?;
281
282        let mut payload = ensure_object_payload(payload)?;
283        self.apply_run_id_default(op, &mut payload);
284
285        match self.transport() {
286            TransportMode::Grpc => self.invoke_grpc(op, payload).await,
287            TransportMode::Http => self.invoke_http(op, payload).await,
288            TransportMode::Auto => match self.invoke_grpc(op, payload.clone()).await {
289                Ok(value) => Ok(value),
290                Err(err) if err.is_fallback_eligible() => self.invoke_http(op, payload).await,
291                Err(err) => Err(err),
292            },
293        }
294    }
295
296    pub async fn invoke_stream(&self, key: &str, payload: Value) -> Result<ValueStream> {
297        let op = find_operation(key).ok_or_else(|| {
298            SdkError::UnsupportedFeatureError(format!("unknown operation: {}", key))
299        })?;
300
301        let mut payload = ensure_object_payload(payload)?;
302        self.apply_run_id_default(op, &mut payload);
303
304        match self.transport() {
305            TransportMode::Grpc => self.invoke_grpc_stream(op, payload).await,
306            TransportMode::Http => self.invoke_http_stream(op, payload).await,
307            TransportMode::Auto => match self.invoke_grpc_stream(op, payload.clone()).await {
308                Ok(stream) => Ok(stream),
309                Err(err) if err.is_fallback_eligible() => {
310                    self.invoke_http_stream(op, payload).await
311                }
312                Err(err) => Err(err),
313            },
314        }
315    }
316
317    pub async fn invoke_stream_serialized<T: Serialize>(
318        &self,
319        key: &str,
320        payload: T,
321    ) -> Result<ValueStream> {
322        let value = serde_json::to_value(payload).map_err(|e| {
323            SdkError::ValidationError(format!("failed to serialize payload: {}", e))
324        })?;
325        self.invoke_stream(key, value).await
326    }
327
328    fn apply_run_id_default(&self, op: &OperationSpec, payload: &mut Value) {
329        let Some(run_id_field) = op.run_id_field else {
330            return;
331        };
332        let Some(run_id) = self.run_id() else {
333            return;
334        };
335
336        if let Some(map) = payload.as_object_mut() {
337            if !map.contains_key(run_id_field) {
338                map.insert(run_id_field.to_string(), Value::String(run_id));
339            }
340        }
341    }
342
343    async fn grpc_channel(&self) -> Result<Channel> {
344        {
345            let guard = self.grpc_channel.lock().await;
346            if let Some(channel) = guard.as_ref() {
347                return Ok(channel.clone());
348            }
349        }
350
351        let uri = if self.grpc_tls {
352            format!("https://{}", self.grpc_endpoint)
353        } else {
354            format!("http://{}", self.grpc_endpoint)
355        };
356
357        let mut endpoint = Endpoint::from_shared(uri.clone()).map_err(|e| {
358            SdkError::ValidationError(format!("invalid gRPC endpoint '{}': {}", uri, e))
359        })?;
360        endpoint = endpoint.connect_timeout(self.timeout).timeout(self.timeout);
361        if self.grpc_tls {
362            endpoint = endpoint.tls_config(ClientTlsConfig::new()).map_err(|e| {
363                SdkError::TransportError {
364                    kind: TransportFailureKind::Other,
365                    message: format!("failed to configure TLS for {}: {}", self.grpc_endpoint, e),
366                }
367            })?;
368        }
369
370        let channel = endpoint
371            .connect()
372            .await
373            .map_err(|e| map_grpc_connect_error(e, &self.grpc_endpoint))?;
374
375        let mut guard = self.grpc_channel.lock().await;
376        *guard = Some(channel.clone());
377        Ok(channel)
378    }
379
380    fn attach_grpc_metadata<T>(&self, request: &mut Request<T>) {
381        if let Some(api_key) = self.api_key() {
382            if let Ok(header_value) = format!("Bearer {}", api_key).parse() {
383                request.metadata_mut().insert("authorization", header_value);
384            }
385        }
386    }
387
388    fn grpc_request<T>(&self, payload: T) -> Request<T> {
389        let mut request = Request::new(payload);
390        self.attach_grpc_metadata(&mut request);
391        request
392    }
393
    /// Executes a unary operation over gRPC and returns the response as JSON.
    ///
    /// The JSON payload is decoded into the request message for the operation,
    /// sent through either the core or the control service client (selected by
    /// `op.key`), and the response message is re-encoded as JSON.
    ///
    /// # Errors
    /// * `TransportError(Unimplemented)` when the operation declares no gRPC
    ///   method; this is checked before any connection is attempted.
    /// * `UnsupportedFeatureError` when `op.key` is not in the dispatch table
    ///   below.
    /// * Errors from connecting, request decoding, the RPC itself (via
    ///   `map_grpc_status`) and response encoding.
    async fn invoke_grpc(&self, op: &OperationSpec, payload: Value) -> Result<Value> {
        use crate::proto::mubit::v1 as pb;

        if op.grpc_method.is_empty() {
            return Err(SdkError::TransportError {
                kind: TransportFailureKind::Unimplemented,
                message: format!("operation {} has no gRPC mapping", op.key),
            });
        }

        let channel = self.grpc_channel().await?;

        // Unary call on the core service: decode payload -> call -> encode.
        macro_rules! unary_core {
            ($method:ident, $req_ty:ty) => {{
                let request: $req_ty = decode_grpc_request(op.key, payload)?;
                let mut client = pb::core_service_client::CoreServiceClient::new(channel.clone());
                let response = client
                    .$method(self.grpc_request(request))
                    .await
                    .map_err(map_grpc_status)?;
                encode_grpc_response(op.key, response.into_inner())
            }};
        }

        // Same as `unary_core!`, but against the control service.
        macro_rules! unary_control {
            ($method:ident, $req_ty:ty) => {{
                let request: $req_ty = decode_grpc_request(op.key, payload)?;
                let mut client =
                    pb::control_service_client::ControlServiceClient::new(channel.clone());
                let response = client
                    .$method(self.grpc_request(request))
                    .await
                    .map_err(map_grpc_status)?;
                encode_grpc_response(op.key, response.into_inner())
            }};
        }

        match op.key {
            // auth
            "auth.health" => unary_core!(health, pb::HealthRequest),
            "auth.create_user" => unary_core!(create_user, pb::CreateUserRequest),
            "auth.rotate_user_api_key" => {
                unary_core!(rotate_user_api_key, pb::RotateUserApiKeyRequest)
            }
            "auth.revoke_user_api_key" => {
                unary_core!(revoke_user_api_key, pb::RevokeUserApiKeyRequest)
            }
            "auth.list_users" => unary_core!(list_users, pb::ListUsersRequest),
            "auth.get_user" => unary_core!(get_user, pb::GetUserRequest),
            "auth.delete_user" => unary_core!(delete_user, pb::DeleteUserRequest),

            // core
            "core.insert" => unary_core!(insert, pb::InsertRequest),
            "core.search" => unary_core!(search, pb::SearchRequest),
            "core.delete_node" => unary_core!(delete_node, pb::DeleteNodeRequest),
            "core.delete_run" => unary_core!(delete_run, pb::DeleteRunRequest),
            "core.create_session" => unary_core!(create_session, pb::CreateSessionRequest),
            "core.snapshot_session" => unary_core!(snapshot_session, pb::SnapshotSessionRequest),
            "core.load_session" => unary_core!(load_session, pb::LoadSessionRequest),
            "core.commit_session" => unary_core!(commit_session, pb::CommitSessionRequest),
            "core.drop_session" => unary_core!(drop_session, pb::DropSessionRequest),
            "core.write_memory" => unary_core!(write_memory, pb::WriteMemoryRequest),
            "core.read_memory" => unary_core!(read_memory, pb::ReadMemoryRequest),
            "core.add_memory" => unary_core!(add_memory, pb::AddMemoryRequest),
            "core.get_memory" => unary_core!(get_memory, pb::GetMemoryRequest),
            "core.clear_memory" => unary_core!(clear_memory, pb::ClearMemoryRequest),
            "core.grant_permission" => unary_core!(grant_permission, pb::GrantPermissionRequest),
            "core.revoke_permission" => unary_core!(revoke_permission, pb::RevokePermissionRequest),
            // The only client-streaming call: the decoded items are sent as a
            // request stream, so the request is assembled by hand.
            "core.batch_insert" => {
                let request_items = decode_batch_insert_payload(payload)?;
                let mut request = Request::new(tokio_stream::iter(request_items));
                self.attach_grpc_metadata(&mut request);
                let mut client = pb::core_service_client::CoreServiceClient::new(channel.clone());
                let response = client
                    .batch_insert(request)
                    .await
                    .map_err(map_grpc_status)?;
                encode_grpc_response(op.key, response.into_inner())
            }
            // control
            "control.set_variable" => unary_control!(set_variable, pb::SetVariableRequest),
            "control.get_variable" => unary_control!(get_variable, pb::GetVariableRequest),
            "control.list_variables" => unary_control!(list_variables, pb::ListVariablesRequest),
            "control.delete_variable" => unary_control!(delete_variable, pb::DeleteVariableRequest),
            "control.define_concept" => unary_control!(define_concept, pb::DefineConceptRequest),
            "control.list_concepts" => unary_control!(list_concepts, pb::ListConceptsRequest),
            "control.add_goal" => unary_control!(add_goal, pb::AddGoalRequest),
            "control.update_goal" => unary_control!(update_goal, pb::UpdateGoalRequest),
            "control.list_goals" => unary_control!(list_goals, pb::ListGoalsRequest),
            "control.get_goal_tree" => unary_control!(get_goal_tree, pb::GetGoalTreeRequest),
            "control.submit_action" => unary_control!(submit_action, pb::ActionRequest),
            "control.get_action_log" => unary_control!(get_action_log, pb::ActionLogRequest),
            "control.run_cycle" => unary_control!(run_cycle, pb::RunCycleRequest),
            "control.get_cycle_history" => {
                unary_control!(get_cycle_history, pb::CycleHistoryRequest)
            }
            "control.register_agent" => unary_control!(register_agent, pb::AgentRegisterRequest),
            "control.agent_heartbeat" => {
                unary_control!(agent_heartbeat, pb::AgentHeartbeatRequest)
            }
            "control.append_activity" => unary_control!(append_activity, pb::ActivityAppendRequest),
            "control.context_snapshot" => unary_control!(get_run_snapshot, pb::RunSnapshotRequest),
            "control.link_run" => unary_control!(link_run, pb::LinkRunRequest),
            "control.unlink_run" => unary_control!(unlink_run, pb::UnlinkRunRequest),
            "control.ingest" => unary_control!(ingest, pb::IngestRequest),
            "control.batch_insert" => {
                unary_control!(batch_insert, pb::ControlBatchInsertRequest)
            }
            "control.get_ingest_job" => unary_control!(get_ingest_job, pb::GetIngestJobRequest),
            "control.query" => unary_control!(query, pb::AgentQueryRequest),
            "control.diagnose" => unary_control!(diagnose, pb::DiagnoseRequest),
            "control.delete_run" => unary_control!(delete_run, pb::RunRequest),
            "control.reflect" => unary_control!(reflect, pb::ReflectRequest),
            "control.lessons" => unary_control!(list_lessons, pb::ListLessonsRequest),
            "control.delete_lesson" => unary_control!(delete_lesson, pb::DeleteLessonRequest),
            "control.context" => unary_control!(get_context, pb::ContextRequest),
            "control.list_activity" => unary_control!(list_activity, pb::ListActivityRequest),
            "control.export_activity" => {
                unary_control!(export_activity, pb::ExportActivityRequest)
            }
            "control.archive_block" => unary_control!(archive_block, pb::ArchiveBlockRequest),
            "control.dereference" => unary_control!(dereference, pb::DereferenceRequest),
            "control.memory_health" => {
                unary_control!(get_memory_health, pb::MemoryHealthRequest)
            }
            "control.checkpoint" => unary_control!(checkpoint, pb::CheckpointRequest),
            "control.list_agents" => unary_control!(list_agents, pb::ListAgentsRequest),
            "control.create_handoff" => unary_control!(create_handoff, pb::HandoffRequest),
            "control.submit_feedback" => unary_control!(submit_feedback, pb::FeedbackRequest),
            "control.record_outcome" => {
                unary_control!(record_outcome, pb::RecordOutcomeRequest)
            }
            "control.surface_strategies" => {
                unary_control!(surface_strategies, pb::SurfaceStrategiesRequest)
            }
            // Declares a gRPC method but has no dispatch entry here.
            _ => Err(SdkError::UnsupportedFeatureError(format!(
                "unknown gRPC operation: {}",
                op.key
            ))),
        }
    }
535
536    async fn invoke_grpc_stream(
537        &self,
538        op: &'static OperationSpec,
539        payload: Value,
540    ) -> Result<ValueStream> {
541        use crate::proto::mubit::v1 as pb;
542
543        let channel = self.grpc_channel().await?;
544
545        match op.key {
546            "core.subscribe_events" => {
547                let request: pb::CoreSubscribeRequest =
548                    decode_grpc_request(op.key, payload)?;
549                let mut client =
550                    pb::core_service_client::CoreServiceClient::new(channel.clone());
551                let response = client
552                    .subscribe(self.grpc_request(request))
553                    .await
554                    .map_err(map_grpc_status)?;
555                let stream = response.into_inner();
556                Ok(Box::pin(stream.map(|result| {
557                    result
558                        .map(|msg| {
559                            serde_json::to_value(msg).unwrap_or_else(|e| {
560                                serde_json::json!({"error": e.to_string()})
561                            })
562                        })
563                        .map_err(map_grpc_status)
564                })))
565            }
566            "control.subscribe" => {
567                let request: pb::SubscribeRequest =
568                    decode_grpc_request(op.key, payload)?;
569                let mut client =
570                    pb::control_service_client::ControlServiceClient::new(channel.clone());
571                let response = client
572                    .subscribe(self.grpc_request(request))
573                    .await
574                    .map_err(map_grpc_status)?;
575                let stream = response.into_inner();
576                Ok(Box::pin(stream.map(|result| {
577                    result
578                        .map(|msg| {
579                            serde_json::to_value(msg).unwrap_or_else(|e| {
580                                serde_json::json!({"error": e.to_string()})
581                            })
582                        })
583                        .map_err(map_grpc_status)
584                })))
585            }
586            "core.watch_memory" => {
587                let request: pb::WatchMemoryRequest =
588                    decode_grpc_request(op.key, payload)?;
589                let mut client =
590                    pb::core_service_client::CoreServiceClient::new(channel.clone());
591                let response = client
592                    .watch_memory(self.grpc_request(request))
593                    .await
594                    .map_err(map_grpc_status)?;
595                let stream = response.into_inner();
596                Ok(Box::pin(stream.map(|result| {
597                    result
598                        .map(|msg| {
599                            serde_json::to_value(msg).unwrap_or_else(|e| {
600                                serde_json::json!({"error": e.to_string()})
601                            })
602                        })
603                        .map_err(map_grpc_status)
604                })))
605            }
606            _ => Err(SdkError::UnsupportedFeatureError(format!(
607                "gRPC streaming not supported for {}",
608                op.key
609            ))),
610        }
611    }
612
613    async fn invoke_http_stream(
614        &self,
615        op: &'static OperationSpec,
616        payload: Value,
617    ) -> Result<ValueStream> {
618        let base = self.http_endpoint.trim_end_matches('/');
619        let route = op.http_path;
620        let url = format!("{}{}", base, route);
621
622        let client = &self.http_client;
623        let is_get = matches!(op.http_method, HttpMethod::Get);
624
625        let mut request = if is_get {
626            let mut req = client.get(&url);
627            if let Some(obj) = payload.as_object() {
628                for (k, v) in obj {
629                    let val = match v {
630                        Value::String(s) => s.clone(),
631                        other => other.to_string(),
632                    };
633                    req = req.query(&[(k.as_str(), val)]);
634                }
635            }
636            req
637        } else {
638            client.post(&url).json(&payload)
639        };
640
641        if let Some(api_key) = self.api_key() {
642            request = request.bearer_auth(api_key);
643        }
644
645        let response = request.send().await.map_err(|e| {
646            map_transport_error(e, format!("{} SSE request failed", op.key))
647        })?;
648
649        let status = response.status();
650        if !status.is_success() {
651            let body = response
652                .text()
653                .await
654                .unwrap_or_else(|_| "request failed".to_string());
655            return Err(map_http_error(status.as_u16(), body));
656        }
657
658        let byte_stream = response.bytes_stream();
659        let sse_stream = parse_sse_byte_stream(byte_stream);
660        Ok(Box::pin(sse_stream))
661    }
662
663    async fn invoke_http(&self, op: &OperationSpec, payload: Value) -> Result<Value> {
664        let mut path = op.http_path.to_string();
665        let mut consumed_keys = HashSet::new();
666
667        if let Some(map) = payload.as_object() {
668            for (key, value) in map {
669                let marker = format!(":{}", key);
670                if path.contains(&marker) {
671                    let rendered = value_to_param(value).ok_or_else(|| {
672                        SdkError::ValidationError(format!(
673                            "invalid path parameter value for {} in {}",
674                            key, op.key
675                        ))
676                    })?;
677                    path = path.replace(&marker, &rendered);
678                    consumed_keys.insert(key.clone());
679                }
680            }
681        }
682
683        if path.contains(':') {
684            return Err(SdkError::ValidationError(format!(
685                "missing path parameter for {}",
686                op.key
687            )));
688        }
689
690        let url = format!("{}{}", self.http_endpoint.trim_end_matches('/'), path);
691        let mut request = match op.http_method {
692            HttpMethod::Get => self.http_client.get(&url),
693            HttpMethod::Post => self.http_client.post(&url),
694            HttpMethod::Delete => self.http_client.delete(&url),
695        };
696
697        if let Some(api_key) = self.api_key() {
698            request = request.bearer_auth(api_key);
699        }
700
701        if matches!(op.http_method, HttpMethod::Get) {
702            let query = payload
703                .as_object()
704                .map(|map| {
705                    map.iter()
706                        .filter_map(|(key, value)| {
707                            if consumed_keys.contains(key) || value.is_null() {
708                                return None;
709                            }
710                            value_to_query(value).map(|rendered| (key.clone(), rendered))
711                        })
712                        .collect::<Vec<(String, String)>>()
713                })
714                .unwrap_or_default();
715
716            if !query.is_empty() {
717                request = request.query(&query);
718            }
719        } else if payload
720            .as_object()
721            .map(|map| !map.is_empty())
722            .unwrap_or(false)
723        {
724            request = request.json(&payload);
725        }
726
727        let response = request.send().await.map_err(|e| {
728            map_transport_error(
729                e,
730                format!(
731                    "{} {} request failed",
732                    http_method_label(op.http_method),
733                    op.key
734                ),
735            )
736        })?;
737
738        let status = response.status();
739        if !status.is_success() {
740            let body = response
741                .text()
742                .await
743                .unwrap_or_else(|_| "request failed".to_string());
744            return Err(map_http_error(status.as_u16(), body));
745        }
746
747        let content_type = response
748            .headers()
749            .get(CONTENT_TYPE)
750            .and_then(|v| v.to_str().ok())
751            .map(|v| v.to_lowercase())
752            .unwrap_or_default();
753
754        let bytes = response
755            .bytes()
756            .await
757            .map_err(|e| SdkError::TransportError {
758                kind: TransportFailureKind::Io,
759                message: format!("failed to read response body for {}: {}", op.key, e),
760            })?;
761
762        if bytes.is_empty() {
763            return Ok(json!({}));
764        }
765
766        if content_type.contains("application/json") {
767            return serde_json::from_slice::<Value>(&bytes).map_err(|e| {
768                SdkError::ServerError(format!(
769                    "failed to decode json response for {}: {}",
770                    op.key, e
771                ))
772            });
773        }
774
775        Ok(Value::String(String::from_utf8_lossy(&bytes).to_string()))
776    }
777}
778
/// Top-level SDK handle.
///
/// The `auth`, `core` and `control` facades all share one transport engine
/// with this handle, so settings changed through the `set_*` methods apply to
/// every facade and to every clone of the client.
#[derive(Clone)]
pub struct Client {
    pub auth: AuthClient,
    pub core: CoreClient,
    pub control: ControlClient,
    // Shared engine; kept here so the setters can reach the mutable state.
    transport: Arc<TransportEngine>,
}
786
787impl Client {
788    pub fn new(config: ClientConfig) -> Result<Self> {
789        let transport = Arc::new(TransportEngine::new(config)?);
790        Ok(Self {
791            auth: AuthClient {
792                transport: transport.clone(),
793            },
794            core: CoreClient {
795                transport: transport.clone(),
796            },
797            control: ControlClient {
798                transport: transport.clone(),
799            },
800            transport,
801        })
802    }
803
804    pub fn set_api_key(&self, api_key: Option<String>) {
805        self.transport.set_api_key(api_key);
806    }
807
808    pub fn set_token(&self, token: Option<String>) {
809        self.set_api_key(token);
810    }
811
812    pub fn set_run_id(&self, run_id: Option<String>) {
813        self.transport.set_run_id(run_id);
814    }
815
816    pub fn set_transport(&self, transport: TransportMode) {
817        self.transport.set_transport(transport);
818    }
819}
820
821#[derive(Clone, Debug)]
822pub struct RememberOptions {
823    pub run_id: Option<String>,
824    pub agent_id: Option<String>,
825    pub item_id: Option<String>,
826    pub content: String,
827    pub content_type: String,
828    pub metadata: Option<Value>,
829    pub hints: Option<Value>,
830    pub payload: Option<Value>,
831    pub intent: Option<String>,
832    pub lesson_type: Option<String>,
833    pub lesson_scope: Option<String>,
834    pub lesson_importance: Option<String>,
835    pub lesson_conditions: Vec<String>,
836    pub user_id: Option<String>,
837    pub upsert_key: Option<String>,
838    pub importance: Option<String>,
839    pub source: Option<String>,
840    pub lane: Option<String>,
841    pub parallel: bool,
842    pub idempotency_key: Option<String>,
843    pub wait: bool,
844    pub timeout_ms: Option<u64>,
845    pub poll_interval_ms: u64,
846    /// When the event actually occurred (unix seconds). Distinct from ingestion
847    /// time. Used for temporal queries about when events happened vs when the
848    /// system learned about them.
849    pub occurrence_time: Option<i64>,
850}
851
852impl RememberOptions {
853    pub fn new(content: impl Into<String>) -> Self {
854        Self {
855            run_id: None,
856            agent_id: Some("sdk-client".to_string()),
857            item_id: None,
858            content: content.into(),
859            content_type: "text/plain".to_string(),
860            metadata: None,
861            hints: None,
862            payload: None,
863            intent: None,
864            lesson_type: None,
865            lesson_scope: None,
866            lesson_importance: None,
867            lesson_conditions: Vec::new(),
868            user_id: None,
869            upsert_key: None,
870            importance: None,
871            source: Some("agent".to_string()),
872            lane: None,
873            parallel: false,
874            idempotency_key: None,
875            wait: true,
876            timeout_ms: None,
877            poll_interval_ms: 300,
878            occurrence_time: None,
879        }
880    }
881}
882
/// Input for `Client::recall`, which issues a query against a run.
#[derive(Debug, Clone)]
pub struct RecallOptions {
    /// Explicit run id; resolved together with the client's run id.
    pub run_id: Option<String>,
    /// Query text.
    pub query: String,
    pub schema: Option<String>,
    pub mode: String,
    pub direct_lane: String,
    pub include_linked_runs: bool,
    pub limit: u64,
    pub embedding: Vec<f32>,
    /// Entry-type filter; an empty list is sent as JSON `null`.
    pub entry_types: Vec<String>,
    pub include_working_memory: bool,
    pub user_id: Option<String>,
    pub agent_id: Option<String>,
    pub lane: Option<String>,
    /// Temporal range filter lower bound (unix seconds, inclusive).
    pub min_timestamp: Option<i64>,
    /// Temporal range filter upper bound (unix seconds, inclusive).
    pub max_timestamp: Option<i64>,
    /// Search budget tier: "low", "mid", "high".
    pub budget: Option<String>,
}

impl RecallOptions {
    /// Creates options for `query` with the SDK defaults: mode
    /// `"agent_routed"`, direct lane `"semantic_search"`, a limit of 5, and
    /// working memory included. Every other optional field starts unset.
    pub fn new(query: impl Into<String>) -> Self {
        let query = query.into();
        Self {
            // Caller-provided value.
            query,
            // Non-trivial defaults.
            mode: String::from("agent_routed"),
            direct_lane: String::from("semantic_search"),
            limit: 5,
            include_working_memory: true,
            include_linked_runs: false,
            embedding: Vec::new(),
            entry_types: Vec::new(),
            // Everything else is unset.
            run_id: None,
            schema: None,
            user_id: None,
            agent_id: None,
            lane: None,
            min_timestamp: None,
            max_timestamp: None,
            budget: None,
        }
    }
}
928
/// Input for `Client::get_context`.
#[derive(Debug, Clone)]
pub struct GetContextOptions {
    /// Explicit run id; resolved together with the client's run id.
    pub run_id: Option<String>,
    pub query: Option<String>,
    pub user_id: Option<String>,
    /// Entry-type filter; an empty list is sent as JSON `null`.
    pub entry_types: Vec<String>,
    pub include_working_memory: bool,
    /// Output format; `"structured"` is sent when `None`.
    pub format: Option<String>,
    /// Result limit; 5 is sent when `None`.
    pub limit: Option<u64>,
    /// Token budget; 0 is sent when `None`.
    pub max_token_budget: Option<u32>,
    pub agent_id: Option<String>,
    /// Context mode; `"full"` is sent when `None`.
    pub mode: Option<String>,
    /// Requested sections; an empty list is sent as JSON `null`.
    pub sections: Vec<String>,
    pub lane: Option<String>,
}

impl Default for GetContextOptions {
    /// All fields unset or empty, except that working memory is included.
    fn default() -> Self {
        Self {
            // The one default that differs from the type's zero value.
            include_working_memory: true,
            entry_types: Vec::new(),
            sections: Vec::new(),
            run_id: None,
            query: None,
            user_id: None,
            format: None,
            limit: None,
            max_token_budget: None,
            agent_id: None,
            mode: None,
            lane: None,
        }
    }
}
963
964#[derive(Clone, Debug)]
965pub struct ArchiveOptions {
966    pub run_id: Option<String>,
967    pub content: String,
968    pub artifact_kind: String,
969    pub metadata: Option<Value>,
970    pub user_id: Option<String>,
971    pub agent_id: Option<String>,
972    pub origin_agent_id: Option<String>,
973    pub source_attempt_id: Option<String>,
974    pub source_tool: Option<String>,
975    pub labels: Vec<String>,
976    pub family: Option<String>,
977    pub importance: Option<String>,
978}
979
980impl ArchiveOptions {
981    pub fn new(content: impl Into<String>, artifact_kind: impl Into<String>) -> Self {
982        Self {
983            run_id: None,
984            content: content.into(),
985            artifact_kind: artifact_kind.into(),
986            metadata: None,
987            user_id: None,
988            agent_id: None,
989            origin_agent_id: None,
990            source_attempt_id: None,
991            source_tool: None,
992            labels: Vec::new(),
993            family: None,
994            importance: None,
995        }
996    }
997}
998
/// Input for `Client::dereference`.
#[derive(Debug, Clone)]
pub struct DereferenceOptions {
    /// Explicit run id; resolved together with the client's run id.
    pub run_id: Option<String>,
    /// Identifier of the reference to resolve.
    pub reference_id: String,
    pub user_id: Option<String>,
    pub agent_id: Option<String>,
}

impl DereferenceOptions {
    /// Creates options for `reference_id` with all optional fields unset.
    pub fn new(reference_id: impl Into<String>) -> Self {
        let reference_id = reference_id.into();
        Self {
            reference_id,
            run_id: None,
            user_id: None,
            agent_id: None,
        }
    }
}
1017
/// Input for `Client::memory_health`.
#[derive(Debug, Clone)]
pub struct MemoryHealthOptions {
    /// Explicit run id; resolved together with the client's run id.
    pub run_id: Option<String>,
    pub user_id: Option<String>,
    /// Staleness threshold in days, sent as `stale_threshold_days`.
    pub stale_threshold_days: u32,
    pub limit: u32,
}

impl Default for MemoryHealthOptions {
    /// Defaults to a 30-day stale threshold and a limit of 500.
    fn default() -> Self {
        Self {
            stale_threshold_days: 30,
            limit: 500,
            run_id: None,
            user_id: None,
        }
    }
}
1036
/// Input for `Client::diagnose`.
#[derive(Debug, Clone)]
pub struct DiagnoseOptions {
    /// Explicit run id; resolved together with the client's run id.
    pub run_id: Option<String>,
    /// Error text to diagnose.
    pub error_text: String,
    pub error_type: Option<String>,
    pub limit: u64,
    pub user_id: Option<String>,
}

impl DiagnoseOptions {
    /// Creates options for `error_text` with a limit of 10 and all optional
    /// fields unset.
    pub fn new(error_text: impl Into<String>) -> Self {
        let error_text = error_text.into();
        Self {
            error_text,
            limit: 10,
            run_id: None,
            error_type: None,
            user_id: None,
        }
    }
}
1057
/// Input for `Client::reflect`.
///
/// The derived `Default` leaves every field unset (`None` / `false`).
#[derive(Debug, Clone, Default)]
pub struct ReflectOptions {
    /// Explicit run id; resolved together with the client's run id.
    pub run_id: Option<String>,
    pub include_linked_runs: bool,
    pub user_id: Option<String>,
    pub step_id: Option<String>,
    pub checkpoint_id: Option<String>,
    pub last_n_items: Option<u64>,
    pub include_step_outcomes: Option<bool>,
}
1068
/// Input for `Client::forget`, which deletes either a run or a lesson.
#[derive(Debug, Clone, Default)]
pub struct ForgetOptions {
    /// Run to delete.
    pub run_id: Option<String>,
    /// Lesson to delete.
    pub lesson_id: Option<String>,
}

impl ForgetOptions {
    /// Targets the run identified by `run_id`; no lesson is selected.
    pub fn for_run(run_id: impl Into<String>) -> Self {
        let run_id = Some(run_id.into());
        Self {
            run_id,
            lesson_id: None,
        }
    }

    /// Targets the lesson identified by `lesson_id`; no run is selected.
    pub fn for_lesson(lesson_id: impl Into<String>) -> Self {
        let lesson_id = Some(lesson_id.into());
        Self {
            run_id: None,
            lesson_id,
        }
    }
}
1090
1091#[derive(Clone, Debug)]
1092pub struct CheckpointOptions {
1093    pub run_id: Option<String>,
1094    pub label: Option<String>,
1095    pub context_snapshot: String,
1096    pub metadata: Option<Value>,
1097    pub user_id: Option<String>,
1098    pub agent_id: Option<String>,
1099}
1100
1101impl CheckpointOptions {
1102    pub fn new(context_snapshot: impl Into<String>) -> Self {
1103        Self {
1104            run_id: None,
1105            label: None,
1106            context_snapshot: context_snapshot.into(),
1107            metadata: None,
1108            user_id: None,
1109            agent_id: None,
1110        }
1111    }
1112}
1113
/// Input for `Client::register_agent`.
#[derive(Debug, Clone)]
pub struct RegisterAgentOptions {
    /// Explicit run id; resolved together with the client's run id.
    pub run_id: Option<String>,
    /// Identifier of the agent being registered.
    pub agent_id: String,
    pub role: String,
    /// Capabilities; an empty list is sent as JSON `null`.
    pub capabilities: Vec<String>,
    pub status: String,
    /// Read scopes; an empty list is sent as JSON `null`.
    pub read_scopes: Vec<String>,
    /// Write scopes; an empty list is sent as JSON `null`.
    pub write_scopes: Vec<String>,
    /// Shared memory lanes; an empty list is sent as JSON `null`.
    pub shared_memory_lanes: Vec<String>,
}

impl RegisterAgentOptions {
    /// Creates options for `agent_id` with status `"active"`, an empty role,
    /// and no capabilities, scopes, or shared lanes.
    pub fn new(agent_id: impl Into<String>) -> Self {
        let agent_id = agent_id.into();
        Self {
            agent_id,
            status: String::from("active"),
            role: String::new(),
            capabilities: Vec::new(),
            read_scopes: Vec::new(),
            write_scopes: Vec::new(),
            shared_memory_lanes: Vec::new(),
            run_id: None,
        }
    }
}
1140
/// Input for `Client::list_agents`.
///
/// The derived `Default` leaves `run_id` unset.
#[derive(Debug, Clone, Default)]
pub struct ListAgentsOptions {
    /// Explicit run id; resolved together with the client's run id.
    pub run_id: Option<String>,
}
1145
/// Input for `Client::record_outcome`.
#[derive(Debug, Clone)]
pub struct RecordOutcomeOptions {
    /// Explicit run id; resolved together with the client's run id.
    pub run_id: Option<String>,
    /// Identifier of the reference the outcome applies to.
    pub reference_id: String,
    pub outcome: String,
    pub signal: f32,
    pub rationale: String,
    pub agent_id: Option<String>,
    pub user_id: Option<String>,
}

impl RecordOutcomeOptions {
    /// Creates options for the given reference and outcome with a signal of
    /// `0.0`, an empty rationale, and all optional fields unset.
    pub fn new(reference_id: impl Into<String>, outcome: impl Into<String>) -> Self {
        let reference_id = reference_id.into();
        let outcome = outcome.into();
        Self {
            reference_id,
            outcome,
            signal: 0.0,
            rationale: String::new(),
            run_id: None,
            agent_id: None,
            user_id: None,
        }
    }
}
1170
1171#[derive(Clone, Debug)]
1172pub struct RecordStepOutcomeOptions {
1173    pub run_id: Option<String>,
1174    pub step_id: String,
1175    pub step_name: Option<String>,
1176    pub outcome: String,
1177    pub signal: f32,
1178    pub rationale: String,
1179    pub directive_hint: Option<String>,
1180    pub agent_id: Option<String>,
1181    pub user_id: Option<String>,
1182    pub metadata: Option<Value>,
1183}
1184
1185impl RecordStepOutcomeOptions {
1186    pub fn new(step_id: impl Into<String>, outcome: impl Into<String>) -> Self {
1187        Self {
1188            run_id: None,
1189            step_id: step_id.into(),
1190            step_name: None,
1191            outcome: outcome.into(),
1192            signal: 0.0,
1193            rationale: String::new(),
1194            directive_hint: None,
1195            agent_id: None,
1196            user_id: None,
1197            metadata: None,
1198        }
1199    }
1200}
1201
/// Input for `Client::surface_strategies`.
#[derive(Debug, Clone)]
pub struct SurfaceStrategiesOptions {
    /// Explicit run id; resolved together with the client's run id.
    pub run_id: Option<String>,
    /// Lesson-type filter; an empty list is sent as JSON `null`.
    pub lesson_types: Vec<String>,
    pub max_strategies: u32,
    pub user_id: Option<String>,
}

impl Default for SurfaceStrategiesOptions {
    /// Defaults to at most 5 strategies with no filter and no ids set.
    fn default() -> Self {
        Self {
            max_strategies: 5,
            lesson_types: Vec::new(),
            run_id: None,
            user_id: None,
        }
    }
}
1220
1221#[derive(Clone, Debug)]
1222pub struct HandoffOptions {
1223    pub run_id: Option<String>,
1224    pub task_id: String,
1225    pub from_agent_id: String,
1226    pub to_agent_id: String,
1227    pub content: String,
1228    pub requested_action: String,
1229    pub metadata: Option<Value>,
1230    pub user_id: Option<String>,
1231}
1232
1233impl HandoffOptions {
1234    pub fn new(
1235        task_id: impl Into<String>,
1236        from_agent_id: impl Into<String>,
1237        to_agent_id: impl Into<String>,
1238        content: impl Into<String>,
1239    ) -> Self {
1240        Self {
1241            run_id: None,
1242            task_id: task_id.into(),
1243            from_agent_id: from_agent_id.into(),
1244            to_agent_id: to_agent_id.into(),
1245            content: content.into(),
1246            requested_action: "continue".to_string(),
1247            metadata: None,
1248            user_id: None,
1249        }
1250    }
1251}
1252
1253#[derive(Clone, Debug)]
1254pub struct FeedbackOptions {
1255    pub run_id: Option<String>,
1256    pub handoff_id: String,
1257    pub verdict: String,
1258    pub comments: String,
1259    pub from_agent_id: Option<String>,
1260    pub metadata: Option<Value>,
1261    pub user_id: Option<String>,
1262}
1263
1264impl FeedbackOptions {
1265    pub fn new(handoff_id: impl Into<String>, verdict: impl Into<String>) -> Self {
1266        Self {
1267            run_id: None,
1268            handoff_id: handoff_id.into(),
1269            verdict: verdict.into(),
1270            comments: String::new(),
1271            from_agent_id: None,
1272            metadata: None,
1273            user_id: None,
1274        }
1275    }
1276}
1277
1278impl Client {
1279    pub async fn remember(&self, options: RememberOptions) -> Result<Value> {
1280        let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "remember")?;
1281        let content = require_non_empty_string(options.content, "content")?;
1282        let item_id = options
1283            .item_id
1284            .unwrap_or_else(|| generate_helper_id("remember"));
1285        let accepted = self
1286            .control
1287            .ingest(prune_nulls(json!({
1288                "run_id": run_id,
1289                "agent_id": options.agent_id.unwrap_or_else(|| "sdk-client".to_string()),
1290                "idempotency_key": options.idempotency_key.unwrap_or_else(|| item_id.clone()),
1291                "parallel": options.parallel,
1292                "items": [{
1293                    "item_id": item_id,
1294                    "content_type": options.content_type,
1295                    "text": content,
1296                    "payload_json": encode_optional_json(options.payload.as_ref())?,
1297                    "hints_json": encode_optional_json(options.hints.as_ref())?,
1298                    "metadata_json": encode_optional_json(options.metadata.as_ref())?,
1299                    "intent": options.intent,
1300                    "lesson_type": options.lesson_type,
1301                    "lesson_scope": options.lesson_scope,
1302                    "lesson_importance": options.lesson_importance,
1303                    "lesson_conditions_json": encode_string_vec(&options.lesson_conditions)?,
1304                    "user_id": options.user_id,
1305                    "upsert_key": options.upsert_key,
1306                    "importance": options.importance,
1307                    "source": options.source.unwrap_or_else(|| "agent".to_string()),
1308                    "lane": options.lane,
1309                    "occurrence_time": options.occurrence_time.unwrap_or(0),
1310                }],
1311            })))
1312            .await?;
1313
1314        if !options.wait {
1315            return Ok(accepted);
1316        }
1317
1318        let Some(job_id) = accepted.get("job_id").and_then(|value| value.as_str()) else {
1319            return Ok(accepted);
1320        };
1321
1322        self.wait_for_ingest_job(
1323            &run_id,
1324            job_id,
1325            options
1326                .timeout_ms
1327                .unwrap_or_else(|| self.transport.timeout.as_millis() as u64),
1328            options.poll_interval_ms,
1329        )
1330        .await
1331    }
1332
1333    pub async fn recall(&self, options: RecallOptions) -> Result<Value> {
1334        let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "recall")?;
1335        self.control
1336            .query(prune_nulls(json!({
1337                "run_id": run_id,
1338                "query": require_non_empty_string(options.query, "query")?,
1339                "schema": options.schema,
1340                "mode": options.mode,
1341                "direct_lane": options.direct_lane,
1342                "include_linked_runs": options.include_linked_runs,
1343                "limit": options.limit,
1344                "embedding": options.embedding,
1345                "entry_types": if options.entry_types.is_empty() { Value::Null } else { json!(options.entry_types) },
1346                "include_working_memory": options.include_working_memory,
1347                "user_id": options.user_id,
1348                "agent_id": options.agent_id,
1349                "lane": options.lane,
1350                "min_timestamp": options.min_timestamp.unwrap_or(0),
1351                "max_timestamp": options.max_timestamp.unwrap_or(0),
1352                "budget": options.budget,
1353            })))
1354            .await
1355    }
1356
1357    pub async fn get_context(&self, options: GetContextOptions) -> Result<Value> {
1358        let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "get_context")?;
1359        self.control
1360            .context(prune_nulls(json!({
1361                "run_id": run_id,
1362                "query": require_non_empty_string(options.query.unwrap_or_default(), "query")?,
1363                "user_id": options.user_id,
1364                "entry_types": if options.entry_types.is_empty() { Value::Null } else { json!(options.entry_types) },
1365                "include_working_memory": options.include_working_memory,
1366                "format": options.format.unwrap_or_else(|| "structured".to_string()),
1367                "limit": options.limit.unwrap_or(5),
1368                "max_token_budget": options.max_token_budget.unwrap_or(0),
1369                "agent_id": options.agent_id,
1370                "mode": options.mode.unwrap_or_else(|| "full".to_string()),
1371                "sections": if options.sections.is_empty() { Value::Null } else { json!(options.sections) },
1372                "lane": options.lane,
1373            })))
1374            .await
1375    }
1376
1377    pub async fn archive(&self, options: ArchiveOptions) -> Result<Value> {
1378        let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "archive")?;
1379        let content = require_non_empty_string(options.content, "content")?;
1380        let artifact_kind = require_non_empty_string(options.artifact_kind, "artifact_kind")?;
1381        let agent_id = options.agent_id.clone();
1382        self.control
1383            .archive_block(prune_nulls(json!({
1384                "run_id": run_id,
1385                "content": content,
1386                "artifact_kind": artifact_kind,
1387                "metadata_json": encode_optional_json(options.metadata.as_ref())?,
1388                "user_id": options.user_id,
1389                "agent_id": agent_id.clone(),
1390                "origin_agent_id": options.origin_agent_id.or(agent_id),
1391                "source_attempt_id": options.source_attempt_id,
1392                "source_tool": options.source_tool,
1393                "labels": if options.labels.is_empty() { Value::Null } else { json!(options.labels) },
1394                "family": options.family,
1395                "importance": options.importance,
1396            })))
1397            .await
1398    }
1399
1400    pub async fn archive_block(&self, options: ArchiveOptions) -> Result<Value> {
1401        self.archive(options).await
1402    }
1403
1404    pub async fn dereference(&self, options: DereferenceOptions) -> Result<Value> {
1405        let run_id =
1406            resolve_helper_run_id(options.run_id, self.transport.run_id(), "dereference")?;
1407        self.control
1408            .dereference(prune_nulls(json!({
1409                "run_id": run_id,
1410                "reference_id": require_non_empty_string(options.reference_id, "reference_id")?,
1411                "user_id": options.user_id,
1412                "agent_id": options.agent_id,
1413            })))
1414            .await
1415    }
1416
1417    pub async fn memory_health(&self, options: MemoryHealthOptions) -> Result<Value> {
1418        let run_id =
1419            resolve_helper_run_id(options.run_id, self.transport.run_id(), "memory_health")?;
1420        self.control
1421            .memory_health(prune_nulls(json!({
1422                "run_id": run_id,
1423                "user_id": options.user_id,
1424                "stale_threshold_days": options.stale_threshold_days,
1425                "limit": options.limit,
1426            })))
1427            .await
1428    }
1429
1430    pub async fn diagnose(&self, options: DiagnoseOptions) -> Result<Value> {
1431        let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "diagnose")?;
1432        self.control
1433            .diagnose(prune_nulls(json!({
1434                "run_id": run_id,
1435                "error_text": require_non_empty_string(options.error_text, "error_text")?,
1436                "error_type": options.error_type,
1437                "limit": options.limit,
1438                "user_id": options.user_id,
1439            })))
1440            .await
1441    }
1442
1443    pub async fn reflect(&self, options: ReflectOptions) -> Result<Value> {
1444        let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "reflect")?;
1445        self.control
1446            .reflect(prune_nulls(json!({
1447                "run_id": run_id,
1448                "include_linked_runs": options.include_linked_runs,
1449                "user_id": options.user_id,
1450                "step_id": options.step_id,
1451                "checkpoint_id": options.checkpoint_id,
1452                "last_n_items": options.last_n_items,
1453                "include_step_outcomes": options.include_step_outcomes,
1454            })))
1455            .await
1456    }
1457
1458    pub async fn forget(&self, options: ForgetOptions) -> Result<Value> {
1459        let delete_lesson = options
1460            .lesson_id
1461            .as_ref()
1462            .map(|value| !value.trim().is_empty())
1463            .unwrap_or(false);
1464        let run_id = if options.run_id.is_some() {
1465            options.run_id
1466        } else if delete_lesson {
1467            None
1468        } else {
1469            self.transport.run_id()
1470        };
1471        let delete_run = run_id
1472            .as_ref()
1473            .map(|value| !value.trim().is_empty())
1474            .unwrap_or(false);
1475
1476        if (delete_lesson as u8) + (delete_run as u8) != 1 {
1477            return Err(SdkError::ValidationError(
1478                "forget requires either lesson_id or run_id, but not both".to_string(),
1479            ));
1480        }
1481
1482        if delete_lesson {
1483            return self
1484                .control
1485                .delete_lesson(json!({ "lesson_id": options.lesson_id.unwrap_or_default() }))
1486                .await;
1487        }
1488
1489        self.control
1490            .delete_run(json!({ "run_id": run_id.unwrap_or_default() }))
1491            .await
1492    }
1493
1494    pub async fn checkpoint(&self, options: CheckpointOptions) -> Result<Value> {
1495        let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "checkpoint")?;
1496        self.control
1497            .checkpoint(prune_nulls(json!({
1498                "run_id": run_id,
1499                "label": options.label,
1500                "context_snapshot": require_non_empty_string(options.context_snapshot, "context_snapshot")?,
1501                "metadata_json": encode_optional_json(options.metadata.as_ref())?,
1502                "user_id": options.user_id,
1503                "agent_id": options.agent_id,
1504            })))
1505            .await
1506    }
1507
1508    pub async fn register_agent(&self, options: RegisterAgentOptions) -> Result<Value> {
1509        let run_id =
1510            resolve_helper_run_id(options.run_id, self.transport.run_id(), "register_agent")?;
1511        self.control
1512            .register_agent(prune_nulls(json!({
1513                "run_id": run_id,
1514                "agent_id": require_non_empty_string(options.agent_id, "agent_id")?,
1515                "role": options.role,
1516                "capabilities": if options.capabilities.is_empty() { Value::Null } else { json!(options.capabilities) },
1517                "status": options.status,
1518                "read_scopes": if options.read_scopes.is_empty() { Value::Null } else { json!(options.read_scopes) },
1519                "write_scopes": if options.write_scopes.is_empty() { Value::Null } else { json!(options.write_scopes) },
1520                "shared_memory_lanes": if options.shared_memory_lanes.is_empty() { Value::Null } else { json!(options.shared_memory_lanes) },
1521            })))
1522            .await
1523    }
1524
1525    pub async fn list_agents(&self, options: ListAgentsOptions) -> Result<Value> {
1526        let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "list_agents")?;
1527        self.control.list_agents(json!({ "run_id": run_id })).await
1528    }
1529
1530    pub async fn record_outcome(&self, options: RecordOutcomeOptions) -> Result<Value> {
1531        let run_id =
1532            resolve_helper_run_id(options.run_id, self.transport.run_id(), "record_outcome")?;
1533        self.control
1534            .record_outcome(prune_nulls(json!({
1535                "run_id": run_id,
1536                "reference_id": require_non_empty_string(options.reference_id, "reference_id")?,
1537                "outcome": require_non_empty_string(options.outcome, "outcome")?,
1538                "signal": options.signal,
1539                "rationale": options.rationale,
1540                "agent_id": options.agent_id,
1541                "user_id": options.user_id,
1542            })))
1543            .await
1544    }
1545
1546    pub async fn record_step_outcome(&self, options: RecordStepOutcomeOptions) -> Result<Value> {
1547        let run_id =
1548            resolve_helper_run_id(options.run_id, self.transport.run_id(), "record_step_outcome")?;
1549        self.control
1550            .record_outcome(prune_nulls(json!({
1551                "run_id": run_id,
1552                "step_id": require_non_empty_string(options.step_id, "step_id")?,
1553                "step_name": options.step_name,
1554                "outcome": require_non_empty_string(options.outcome, "outcome")?,
1555                "signal": options.signal,
1556                "rationale": options.rationale,
1557                "directive_hint": options.directive_hint,
1558                "agent_id": options.agent_id,
1559                "user_id": options.user_id,
1560                "metadata_json": encode_optional_json(options.metadata.as_ref())?,
1561            })))
1562            .await
1563    }
1564
1565    pub async fn surface_strategies(&self, options: SurfaceStrategiesOptions) -> Result<Value> {
1566        let run_id = resolve_helper_run_id(
1567            options.run_id,
1568            self.transport.run_id(),
1569            "surface_strategies",
1570        )?;
1571        self.control
1572            .surface_strategies(prune_nulls(json!({
1573                "run_id": run_id,
1574                "lesson_types": if options.lesson_types.is_empty() { Value::Null } else { json!(options.lesson_types) },
1575                "max_strategies": options.max_strategies,
1576                "user_id": options.user_id,
1577            })))
1578            .await
1579    }
1580
1581    pub async fn handoff(&self, options: HandoffOptions) -> Result<Value> {
1582        let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "handoff")?;
1583        self.control
1584            .create_handoff(prune_nulls(json!({
1585                "run_id": run_id,
1586                "task_id": require_non_empty_string(options.task_id, "task_id")?,
1587                "from_agent_id": require_non_empty_string(options.from_agent_id, "from_agent_id")?,
1588                "to_agent_id": require_non_empty_string(options.to_agent_id, "to_agent_id")?,
1589                "content": require_non_empty_string(options.content, "content")?,
1590                "requested_action": options.requested_action,
1591                "metadata_json": encode_optional_json(options.metadata.as_ref())?,
1592                "user_id": options.user_id,
1593            })))
1594            .await
1595    }
1596
1597    pub async fn feedback(&self, options: FeedbackOptions) -> Result<Value> {
1598        let run_id = resolve_helper_run_id(options.run_id, self.transport.run_id(), "feedback")?;
1599        self.control
1600            .submit_feedback(prune_nulls(json!({
1601                "run_id": run_id,
1602                "handoff_id": require_non_empty_string(options.handoff_id, "handoff_id")?,
1603                "verdict": require_non_empty_string(options.verdict, "verdict")?,
1604                "comments": options.comments,
1605                "from_agent_id": options.from_agent_id,
1606                "metadata_json": encode_optional_json(options.metadata.as_ref())?,
1607                "user_id": options.user_id,
1608            })))
1609            .await
1610    }
1611
1612    async fn wait_for_ingest_job(
1613        &self,
1614        run_id: &str,
1615        job_id: &str,
1616        timeout_ms: u64,
1617        poll_interval_ms: u64,
1618    ) -> Result<Value> {
1619        let deadline = Instant::now() + Duration::from_millis(timeout_ms);
1620        loop {
1621            let job = self
1622                .control
1623                .get_ingest_job(json!({ "run_id": run_id, "job_id": job_id }))
1624                .await?;
1625            if job
1626                .get("done")
1627                .and_then(|value| value.as_bool())
1628                .unwrap_or(false)
1629            {
1630                return Ok(job);
1631            }
1632            if Instant::now() >= deadline {
1633                return Err(SdkError::TransportError {
1634                    kind: TransportFailureKind::DeadlineExceeded,
1635                    message: format!("timed out waiting for ingest job {}", job_id),
1636                });
1637            }
1638            sleep(Duration::from_millis(poll_interval_ms)).await;
1639        }
1640    }
1641}
1642
1643#[derive(Clone)]
1644pub struct AuthClient {
1645    transport: Arc<TransportEngine>,
1646}
1647
1648impl AuthClient {
1649    pub async fn health(&self) -> Result<Value> {
1650        self.transport.invoke("auth.health", json!({})).await
1651    }
1652
1653    pub fn set_api_key(&self, api_key: Option<String>) {
1654        self.transport.set_api_key(api_key);
1655    }
1656
1657    pub fn set_token(&self, token: Option<String>) {
1658        self.set_api_key(token);
1659    }
1660
1661    pub fn set_run_id(&self, run_id: Option<String>) {
1662        self.transport.set_run_id(run_id);
1663    }
1664}
1665
1666macro_rules! define_auth_payload_methods {
1667    ($($name:ident => $op_key:literal),+ $(,)?) => {
1668        impl AuthClient {
1669            $(
1670                pub async fn $name<T: Serialize>(&self, payload: T) -> Result<Value> {
1671                    self.transport.invoke_serialized($op_key, payload).await
1672                }
1673            )+
1674        }
1675    };
1676}
1677
1678define_auth_payload_methods!(
1679    create_user => "auth.create_user",
1680    rotate_user_api_key => "auth.rotate_user_api_key",
1681    revoke_user_api_key => "auth.revoke_user_api_key",
1682    list_users => "auth.list_users",
1683    get_user => "auth.get_user",
1684    delete_user => "auth.delete_user"
1685);
1686
1687#[derive(Clone)]
1688pub struct CoreClient {
1689    transport: Arc<TransportEngine>,
1690}
1691
1692macro_rules! define_core_payload_methods {
1693    ($($name:ident => $op_key:literal),+ $(,)?) => {
1694        impl CoreClient {
1695            $(
1696                pub async fn $name<T: Serialize>(&self, payload: T) -> Result<Value> {
1697                    self.transport.invoke_serialized($op_key, payload).await
1698                }
1699            )+
1700        }
1701    };
1702}
1703
1704define_core_payload_methods!(
1705    insert => "core.insert",
1706    batch_insert => "core.batch_insert",
1707    search => "core.search",
1708    delete_node => "core.delete_node",
1709    delete_run => "core.delete_run",
1710    create_session => "core.create_session",
1711    snapshot_session => "core.snapshot_session",
1712    load_session => "core.load_session",
1713    commit_session => "core.commit_session",
1714    drop_session => "core.drop_session",
1715    write_memory => "core.write_memory",
1716    read_memory => "core.read_memory",
1717    add_memory => "core.add_memory",
1718    get_memory => "core.get_memory",
1719    clear_memory => "core.clear_memory",
1720    grant_permission => "core.grant_permission",
1721    revoke_permission => "core.revoke_permission",
1722    check_permission => "core.check_permission",
1723    unsubscribe_events => "core.unsubscribe_events"
1724);
1725
1726impl CoreClient {
1727    pub async fn subscribe_events<T: Serialize>(&self, payload: T) -> Result<ValueStream> {
1728        self.transport.invoke_stream_serialized("core.subscribe_events", payload).await
1729    }
1730
1731    pub async fn watch_memory<T: Serialize>(&self, payload: T) -> Result<ValueStream> {
1732        self.transport.invoke_stream_serialized("core.watch_memory", payload).await
1733    }
1734
1735    pub async fn list_subscriptions(&self) -> Result<Value> {
1736        self.transport
1737            .invoke("core.list_subscriptions", json!({}))
1738            .await
1739    }
1740
1741    pub async fn storage_stats(&self) -> Result<Value> {
1742        self.transport.invoke("core.storage_stats", json!({})).await
1743    }
1744
1745    pub async fn trigger_compaction(&self) -> Result<Value> {
1746        self.transport
1747            .invoke("core.trigger_compaction", json!({}))
1748            .await
1749    }
1750}
1751
1752#[derive(Clone)]
1753pub struct ControlClient {
1754    transport: Arc<TransportEngine>,
1755}
1756
1757macro_rules! define_control_payload_methods {
1758    ($($name:ident => $op_key:literal),+ $(,)?) => {
1759        impl ControlClient {
1760            $(
1761                pub async fn $name<T: Serialize>(&self, payload: T) -> Result<Value> {
1762                    self.transport.invoke_serialized($op_key, payload).await
1763                }
1764            )+
1765        }
1766    };
1767}
1768
1769define_control_payload_methods!(
1770    set_variable => "control.set_variable",
1771    get_variable => "control.get_variable",
1772    list_variables => "control.list_variables",
1773    delete_variable => "control.delete_variable",
1774    define_concept => "control.define_concept",
1775    list_concepts => "control.list_concepts",
1776    add_goal => "control.add_goal",
1777    update_goal => "control.update_goal",
1778    list_goals => "control.list_goals",
1779    get_goal_tree => "control.get_goal_tree",
1780    submit_action => "control.submit_action",
1781    get_action_log => "control.get_action_log",
1782    run_cycle => "control.run_cycle",
1783    get_cycle_history => "control.get_cycle_history",
1784    register_agent => "control.register_agent",
1785    agent_heartbeat => "control.agent_heartbeat",
1786    append_activity => "control.append_activity",
1787    context_snapshot => "control.context_snapshot",
1788    link_run => "control.link_run",
1789    unlink_run => "control.unlink_run",
1790    ingest => "control.ingest",
1791    batch_insert => "control.batch_insert",
1792    get_ingest_job => "control.get_ingest_job",
1793    query => "control.query",
1794    diagnose => "control.diagnose",
1795    delete_run => "control.delete_run",
1796    reflect => "control.reflect",
1797    lessons => "control.lessons",
1798    delete_lesson => "control.delete_lesson",
1799    context => "control.context",
1800    archive_block => "control.archive_block",
1801    dereference => "control.dereference",
1802    memory_health => "control.memory_health",
1803    checkpoint => "control.checkpoint",
1804    list_agents => "control.list_agents",
1805    create_handoff => "control.create_handoff",
1806    submit_feedback => "control.submit_feedback",
1807    record_outcome => "control.record_outcome",
1808    surface_strategies => "control.surface_strategies"
1809);
1810
1811impl ControlClient {
1812    pub async fn subscribe<T: Serialize>(&self, payload: T) -> Result<ValueStream> {
1813        self.transport.invoke_stream_serialized("control.subscribe", payload).await
1814    }
1815}
1816
1817fn parse_sse_byte_stream(
1818    byte_stream: impl Stream<Item = reqwest::Result<bytes::Bytes>> + Send + 'static,
1819) -> impl Stream<Item = Result<Value>> + Send {
1820    let (tx, rx) = tokio::sync::mpsc::channel::<Result<Value>>(64);
1821    tokio::spawn(async move {
1822        tokio::pin!(byte_stream);
1823        let mut buffer = String::new();
1824        while let Some(chunk_result) = byte_stream.next().await {
1825            match chunk_result {
1826                Ok(chunk) => {
1827                    buffer.push_str(&String::from_utf8_lossy(&chunk));
1828                    while let Some(newline_pos) = buffer.find('\n') {
1829                        let line = buffer[..newline_pos].trim().to_string();
1830                        buffer = buffer[newline_pos + 1..].to_string();
1831                        if line.is_empty() || line.starts_with(':') {
1832                            continue;
1833                        }
1834                        let data = if line.starts_with("data: ") {
1835                            &line[6..]
1836                        } else {
1837                            &line
1838                        };
1839                        let value = match serde_json::from_str::<Value>(data) {
1840                            Ok(value) => Ok(value),
1841                            Err(_) => Ok(Value::String(data.to_string())),
1842                        };
1843                        if tx.send(value).await.is_err() {
1844                            return;
1845                        }
1846                    }
1847                }
1848                Err(e) => {
1849                    let _ = tx
1850                        .send(Err(SdkError::TransportError {
1851                            kind: TransportFailureKind::Io,
1852                            message: e.to_string(),
1853                        }))
1854                        .await;
1855                    return;
1856                }
1857            }
1858        }
1859        // Flush remaining buffer
1860        let remaining = buffer.trim().to_string();
1861        if !remaining.is_empty() && !remaining.starts_with(':') {
1862            let data = if remaining.starts_with("data: ") {
1863                &remaining[6..]
1864            } else {
1865                &remaining
1866            };
1867            let value = match serde_json::from_str::<Value>(data) {
1868                Ok(value) => Ok(value),
1869                Err(_) => Ok(Value::String(data.to_string())),
1870            };
1871            let _ = tx.send(value).await;
1872        }
1873    });
1874    tokio_stream::wrappers::ReceiverStream::new(rx)
1875}
1876
1877fn prune_nulls(value: Value) -> Value {
1878    match value {
1879        Value::Object(map) => Value::Object(
1880            map.into_iter()
1881                .filter_map(|(key, value)| {
1882                    let cleaned = prune_nulls(value);
1883                    if cleaned.is_null() {
1884                        None
1885                    } else {
1886                        Some((key, cleaned))
1887                    }
1888                })
1889                .collect::<Map<String, Value>>(),
1890        ),
1891        Value::Array(items) => Value::Array(items.into_iter().map(prune_nulls).collect()),
1892        other => other,
1893    }
1894}
1895
1896fn resolve_helper_run_id(
1897    explicit: Option<String>,
1898    fallback: Option<String>,
1899    helper_name: &str,
1900) -> Result<String> {
1901    let candidate = explicit.or(fallback).unwrap_or_default();
1902    if candidate.trim().is_empty() {
1903        return Err(SdkError::ValidationError(format!(
1904            "{} requires run_id or a client default run_id",
1905            helper_name
1906        )));
1907    }
1908    Ok(candidate)
1909}
1910
1911fn require_non_empty_string(value: String, field_name: &str) -> Result<String> {
1912    if value.trim().is_empty() {
1913        return Err(SdkError::ValidationError(format!(
1914            "{} is required",
1915            field_name
1916        )));
1917    }
1918    Ok(value)
1919}
1920
1921fn encode_optional_json(value: Option<&Value>) -> Result<String> {
1922    match value {
1923        Some(value) => serde_json::to_string(value).map_err(|err| {
1924            SdkError::ValidationError(format!("failed to serialize helper json field: {}", err))
1925        }),
1926        None => Ok(String::new()),
1927    }
1928}
1929
1930fn encode_string_vec(values: &[String]) -> Result<String> {
1931    if values.is_empty() {
1932        return Ok(String::new());
1933    }
1934    serde_json::to_string(values).map_err(|err| {
1935        SdkError::ValidationError(format!("failed to serialize helper string list: {}", err))
1936    })
1937}
1938
/// Builds an identifier of the form `{prefix}-{number}`, where the number is
/// derived from the current Unix time in milliseconds.
///
/// The numeric part is strictly increasing across calls within one process,
/// whatever the prefix: when two calls land in the same millisecond (or the
/// clock steps backwards) the later call gets the previous number plus one.
/// Ids generated back to back therefore no longer collide.
///
/// A system clock set before the Unix epoch is treated as time zero.
fn generate_helper_id(prefix: &str) -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Largest numeric suffix handed out so far by this process.
    static LAST_STAMP: AtomicU64 = AtomicU64::new(0);

    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    // Clamp before narrowing so the cast can never truncate.
    let now = millis.min(u128::from(u64::MAX)) as u64;

    let mut previous = LAST_STAMP.load(Ordering::Relaxed);
    let stamp = loop {
        let candidate = now.max(previous.saturating_add(1));
        match LAST_STAMP.compare_exchange_weak(
            previous,
            candidate,
            Ordering::Relaxed,
            Ordering::Relaxed,
        ) {
            Ok(_) => break candidate,
            // Another thread advanced the stamp (or the exchange failed
            // spuriously); retry from the value actually stored.
            Err(observed) => previous = observed,
        }
    };

    format!("{}-{}", prefix, stamp)
}
1948
1949fn ensure_object_payload(payload: Value) -> Result<Value> {
1950    match payload {
1951        Value::Null => Ok(json!({})),
1952        Value::Object(_) => Ok(payload),
1953        _ => Err(SdkError::ValidationError(
1954            "payload must serialize to a JSON object".to_string(),
1955        )),
1956    }
1957}
1958
1959fn decode_grpc_request<T: DeserializeOwned>(op_key: &str, payload: Value) -> Result<T> {
1960    serde_json::from_value(payload).map_err(|e| {
1961        SdkError::ValidationError(format!(
1962            "invalid gRPC request payload for {}: {}",
1963            op_key, e
1964        ))
1965    })
1966}
1967
1968fn encode_grpc_response<T: Serialize>(op_key: &str, response: T) -> Result<Value> {
1969    serde_json::to_value(response).map_err(|e| {
1970        SdkError::ServerError(format!(
1971            "failed to serialize gRPC response for {}: {}",
1972            op_key, e
1973        ))
1974    })
1975}
1976
1977fn decode_batch_insert_payload(
1978    payload: Value,
1979) -> Result<Vec<crate::proto::mubit::v1::InsertRequest>> {
1980    let payload = ensure_object_payload(payload)?;
1981    let mut extracted: Option<&Value> = None;
1982    if let Some(map) = payload.as_object() {
1983        for key in ["items", "requests", "nodes"] {
1984            if let Some(value) = map.get(key) {
1985                extracted = Some(value);
1986                break;
1987            }
1988        }
1989    }
1990
1991    if let Some(Value::Array(items)) = extracted {
1992        if items.is_empty() {
1993            return Err(SdkError::ValidationError(
1994                "batch_insert gRPC payload cannot provide an empty items list".to_string(),
1995            ));
1996        }
1997        return items
1998            .iter()
1999            .cloned()
2000            .map(|item| decode_grpc_request("core.batch_insert", item))
2001            .collect();
2002    }
2003
2004    if extracted.is_some() {
2005        return Err(SdkError::ValidationError(
2006            "batch_insert gRPC payload items/requests/nodes must be an array".to_string(),
2007        ));
2008    }
2009
2010    let single = decode_grpc_request("core.batch_insert", payload)?;
2011    Ok(vec![single])
2012}
2013
2014fn value_to_param(value: &Value) -> Option<String> {
2015    match value {
2016        Value::String(v) => Some(v.clone()),
2017        Value::Number(v) => Some(v.to_string()),
2018        Value::Bool(v) => Some(v.to_string()),
2019        _ => None,
2020    }
2021}
2022
2023fn value_to_query(value: &Value) -> Option<String> {
2024    match value {
2025        Value::String(v) => Some(v.clone()),
2026        Value::Number(v) => Some(v.to_string()),
2027        Value::Bool(v) => Some(v.to_string()),
2028        Value::Array(items) => {
2029            let rendered: Vec<String> = items.iter().filter_map(value_to_query).collect();
2030            if rendered.is_empty() {
2031                None
2032            } else {
2033                Some(rendered.join(","))
2034            }
2035        }
2036        _ => None,
2037    }
2038}
2039
2040fn derive_http_and_grpc(endpoint: &str) -> Result<(String, String, bool)> {
2041    let endpoint = if endpoint.contains("://") {
2042        endpoint.to_string()
2043    } else {
2044        format!("http://{}", endpoint)
2045    };
2046
2047    let parsed = Url::parse(&endpoint).map_err(|e| {
2048        SdkError::ValidationError(format!("invalid endpoint '{}': {}", endpoint, e))
2049    })?;
2050
2051    let host = parsed.host_str().ok_or_else(|| {
2052        SdkError::ValidationError(format!("endpoint '{}' missing host", endpoint))
2053    })?;
2054
2055    let scheme = parsed.scheme();
2056    let port = parsed.port_or_known_default().ok_or_else(|| {
2057        SdkError::ValidationError(format!(
2058            "endpoint '{}' missing known default port",
2059            endpoint
2060        ))
2061    })?;
2062
2063    let default_port = match scheme {
2064        "https" => 443,
2065        _ => 80,
2066    };
2067
2068    let http_endpoint = if port == default_port {
2069        format!("{}://{}", scheme, host)
2070    } else {
2071        format!("{}://{}:{}", scheme, host, port)
2072    };
2073
2074    let grpc_endpoint = format!("{}:{}", host, port);
2075    let grpc_tls = scheme.eq_ignore_ascii_case("https");
2076
2077    Ok((http_endpoint, grpc_endpoint, grpc_tls))
2078}
2079
2080fn normalize_http_endpoint(endpoint: &str) -> Result<String> {
2081    let endpoint = if endpoint.contains("://") {
2082        endpoint.to_string()
2083    } else {
2084        format!("http://{}", endpoint)
2085    };
2086
2087    let parsed = Url::parse(&endpoint).map_err(|e| {
2088        SdkError::ValidationError(format!("invalid http_endpoint '{}': {}", endpoint, e))
2089    })?;
2090
2091    let host = parsed.host_str().ok_or_else(|| {
2092        SdkError::ValidationError(format!("http_endpoint '{}' missing host", endpoint))
2093    })?;
2094
2095    let scheme = parsed.scheme();
2096    let port = parsed.port_or_known_default().ok_or_else(|| {
2097        SdkError::ValidationError(format!(
2098            "http_endpoint '{}' missing known default port",
2099            endpoint
2100        ))
2101    })?;
2102
2103    let default_port = if scheme.eq_ignore_ascii_case("https") {
2104        443
2105    } else {
2106        80
2107    };
2108
2109    let normalized = if port == default_port {
2110        format!("{}://{}", scheme, host)
2111    } else {
2112        format!("{}://{}:{}", scheme, host, port)
2113    };
2114
2115    Ok(normalized)
2116}
2117
2118fn normalize_grpc_endpoint(endpoint: &str) -> Result<(String, bool)> {
2119    if endpoint.contains("://") {
2120        let parsed = Url::parse(endpoint).map_err(|e| {
2121            SdkError::ValidationError(format!("invalid grpc_endpoint '{}': {}", endpoint, e))
2122        })?;
2123        let host = parsed.host_str().ok_or_else(|| {
2124            SdkError::ValidationError(format!("grpc_endpoint '{}' missing host", endpoint))
2125        })?;
2126        let port = parsed.port_or_known_default().ok_or_else(|| {
2127            SdkError::ValidationError(format!(
2128                "grpc_endpoint '{}' missing known default port",
2129                endpoint
2130            ))
2131        })?;
2132        return Ok((
2133            format!("{}:{}", host, port),
2134            parsed.scheme().eq_ignore_ascii_case("https")
2135                || parsed.scheme().eq_ignore_ascii_case("grpcs"),
2136        ));
2137    }
2138
2139    let endpoint = endpoint.trim();
2140    if endpoint.is_empty() {
2141        return Err(SdkError::ValidationError(
2142            "grpc_endpoint cannot be empty".to_string(),
2143        ));
2144    }
2145
2146    if let Some((host, port_text)) = endpoint.rsplit_once(':') {
2147        if !host.trim().is_empty() {
2148            if let Ok(port) = port_text.parse::<u16>() {
2149                return Ok((format!("{}:{}", host, port), port == 443));
2150            }
2151        }
2152        return Ok((endpoint.to_string(), false));
2153    }
2154
2155    Ok((format!("{}:50051", endpoint), false))
2156}
2157
2158fn map_grpc_connect_error(error: tonic::transport::Error, endpoint: &str) -> SdkError {
2159    let lower = error.to_string().to_lowercase();
2160    let kind = if lower.contains("deadline") || lower.contains("timed out") {
2161        TransportFailureKind::DeadlineExceeded
2162    } else if lower.contains("connection reset") {
2163        TransportFailureKind::ConnectionReset
2164    } else if lower.contains("dns")
2165        || lower.contains("refused")
2166        || lower.contains("unavailable")
2167        || lower.contains("not connected")
2168    {
2169        TransportFailureKind::Unavailable
2170    } else {
2171        TransportFailureKind::Io
2172    };
2173
2174    SdkError::TransportError {
2175        kind,
2176        message: format!("failed to connect to gRPC endpoint {}: {}", endpoint, error),
2177    }
2178}
2179
2180fn map_grpc_status(status: Status) -> SdkError {
2181    let message = status.message().to_string();
2182    match status.code() {
2183        Code::Unauthenticated | Code::PermissionDenied => SdkError::AuthError(message),
2184        Code::InvalidArgument
2185        | Code::NotFound
2186        | Code::AlreadyExists
2187        | Code::FailedPrecondition
2188        | Code::OutOfRange => SdkError::ValidationError(message),
2189        Code::Unavailable => SdkError::TransportError {
2190            kind: TransportFailureKind::Unavailable,
2191            message,
2192        },
2193        Code::DeadlineExceeded => SdkError::TransportError {
2194            kind: TransportFailureKind::DeadlineExceeded,
2195            message,
2196        },
2197        Code::Unimplemented => SdkError::TransportError {
2198            kind: TransportFailureKind::Unimplemented,
2199            message,
2200        },
2201        Code::Cancelled => SdkError::TransportError {
2202            kind: TransportFailureKind::Io,
2203            message,
2204        },
2205        Code::Unknown | Code::Internal => {
2206            let lower = message.to_lowercase();
2207            if lower.contains("connection reset") {
2208                SdkError::TransportError {
2209                    kind: TransportFailureKind::ConnectionReset,
2210                    message,
2211                }
2212            } else if lower.contains("transport")
2213                || lower.contains("broken pipe")
2214                || lower.contains("io error")
2215            {
2216                SdkError::TransportError {
2217                    kind: TransportFailureKind::Io,
2218                    message,
2219                }
2220            } else {
2221                SdkError::ServerError(message)
2222            }
2223        }
2224        _ => SdkError::ServerError(message),
2225    }
2226}
2227
2228fn map_transport_error(error: reqwest::Error, message: String) -> SdkError {
2229    let lower = error.to_string().to_lowercase();
2230    let kind = if error.is_timeout() {
2231        TransportFailureKind::DeadlineExceeded
2232    } else if error.is_connect() {
2233        TransportFailureKind::Unavailable
2234    } else if lower.contains("connection reset") {
2235        TransportFailureKind::ConnectionReset
2236    } else if error.is_request() || error.is_body() {
2237        TransportFailureKind::Io
2238    } else {
2239        TransportFailureKind::Other
2240    };
2241
2242    SdkError::TransportError {
2243        kind,
2244        message: format!("{}: {}", message, error),
2245    }
2246}
2247
2248fn map_http_error(status: u16, body: String) -> SdkError {
2249    match status {
2250        401 | 403 => SdkError::AuthError(body),
2251        400 | 404 | 409 | 422 => SdkError::ValidationError(body),
2252        501 => SdkError::UnsupportedFeatureError(body),
2253        _ => SdkError::ServerError(body),
2254    }
2255}
2256
2257fn http_method_label(method: HttpMethod) -> &'static str {
2258    match method {
2259        HttpMethod::Get => "GET",
2260        HttpMethod::Post => "POST",
2261        HttpMethod::Delete => "DELETE",
2262    }
2263}