Skip to main content

tracedb_sdk/
lib.rs

1#![forbid(unsafe_code)]
2//! TraceDB Rust SDK for current local HTTP workflows.
3//!
4//! # Examples
5//!
6//! ```rust,no_run
7//! # use tracedb_sdk::{TraceDbClient, TraceDbClientConfig};
8//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
9//! let url = String::from("http://127.0.0.1:8090");
10//! let token = String::from("dev-token");
11//! let config = TraceDbClientConfig::managed(url, token);
12//! let client = TraceDbClient::new(config);
13//! let ready = client.ready_typed()?;
14//! println!("ready: {}", ready.ready);
15//! # Ok(())
16//! # }
17//! ```
18
19pub const VERSION: &str = env!("CARGO_PKG_VERSION");
20pub const NAME: &str = env!("CARGO_PKG_NAME");
21
22use serde::{Deserialize, Serialize};
23use serde_json::{json, Map, Value};
24use std::env;
25use std::error::Error;
26use std::fmt::{Display, Formatter};
27use std::io::{Read, Write};
28use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
29use std::thread;
30use std::time::{Duration, SystemTime, UNIX_EPOCH};
31
32pub type TraceDbClientResult<T> = std::result::Result<T, TraceDbClientError>;
33
34#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
35pub enum FeatureFreshnessMode {
36    Strict,
37    Lazy,
38    AllowDirty,
39    OnRead,
40    AllowStale,
41}
42
43#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
44pub enum FreshnessMode {
45    Strict,
46    Lazy,
47    AllowDirty,
48}
49
50#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
51pub struct VectorColumnSchema {
52    pub name: String,
53    pub dimensions: usize,
54    pub source_columns: Vec<String>,
55}
56
57#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
58pub struct TableSchema {
59    pub name: String,
60    pub primary_id_column: String,
61    pub tenant_id_column: String,
62    pub scalar_columns: Vec<String>,
63    pub text_indexed_columns: Vec<String>,
64    pub vector_columns: Vec<VectorColumnSchema>,
65}
66
67#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
68pub struct RecordInput {
69    pub table: String,
70    pub id: String,
71    pub tenant_id: String,
72    pub fields: Map<String, Value>,
73}
74
75#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
76pub struct RecordOutput {
77    pub table: String,
78    pub id: String,
79    pub tenant_id: String,
80    pub version_id: u64,
81    pub fields: Map<String, Value>,
82}
83
84#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
85pub struct RecordPutBatchRequest {
86    #[serde(default)]
87    pub include_write_timing: bool,
88    pub records: Vec<RecordInput>,
89}
90
91impl RecordPutBatchRequest {
92    pub fn new(records: Vec<RecordInput>) -> Self {
93        Self {
94            include_write_timing: false,
95            records,
96        }
97    }
98}
99
100#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
101pub struct RecordPatchRequest {
102    pub table: String,
103    pub tenant_id: String,
104    pub id: String,
105    pub fields: Map<String, Value>,
106}
107
108impl RecordPatchRequest {
109    pub fn new(
110        table: impl Into<String>,
111        tenant_id: impl Into<String>,
112        id: impl Into<String>,
113        fields: Map<String, Value>,
114    ) -> Self {
115        Self {
116            table: table.into(),
117            tenant_id: tenant_id.into(),
118            id: id.into(),
119            fields,
120        }
121    }
122}
123
124#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
125pub struct RecordDeleteRequest {
126    pub table: String,
127    pub tenant_id: String,
128    pub id: String,
129    #[serde(default = "default_tombstone")]
130    pub tombstone: String,
131}
132
133impl RecordDeleteRequest {
134    pub fn new(
135        table: impl Into<String>,
136        tenant_id: impl Into<String>,
137        id: impl Into<String>,
138    ) -> Self {
139        Self {
140            table: table.into(),
141            tenant_id: tenant_id.into(),
142            id: id.into(),
143            tombstone: default_tombstone(),
144        }
145    }
146
147    pub fn tombstone(mut self, tombstone: impl Into<String>) -> Self {
148        self.tombstone = tombstone.into();
149        self
150    }
151}
152
153#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
154pub struct RecordGetRequest {
155    pub table: String,
156    pub tenant_id: String,
157    pub id: String,
158}
159
160impl RecordGetRequest {
161    pub fn new(
162        table: impl Into<String>,
163        tenant_id: impl Into<String>,
164        id: impl Into<String>,
165    ) -> Self {
166        Self {
167            table: table.into(),
168            tenant_id: tenant_id.into(),
169            id: id.into(),
170        }
171    }
172}
173
174#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
175pub struct RecordScanRequest {
176    pub table: String,
177    pub tenant_id: String,
178    pub limit: usize,
179    #[serde(default, skip_serializing_if = "Option::is_none")]
180    pub cursor: Option<String>,
181}
182
183impl RecordScanRequest {
184    pub fn new(table: impl Into<String>, tenant_id: impl Into<String>) -> Self {
185        Self {
186            table: table.into(),
187            tenant_id: tenant_id.into(),
188            limit: 100,
189            cursor: None,
190        }
191    }
192
193    pub fn limit(mut self, limit: usize) -> Self {
194        self.limit = limit;
195        self
196    }
197
198    pub fn cursor(mut self, cursor: impl Into<String>) -> Self {
199        self.cursor = Some(cursor.into());
200        self
201    }
202}
203
204#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
205pub struct RecordScanOutput {
206    pub records: Vec<RecordOutput>,
207    pub returned_count: usize,
208    #[serde(default, skip_serializing_if = "Option::is_none")]
209    pub next_cursor: Option<String>,
210}
211
212#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
213pub struct HybridQuery {
214    pub table: String,
215    pub tenant_id: String,
216    #[serde(default, skip_serializing_if = "Option::is_none")]
217    pub cursor: Option<String>,
218    #[serde(default)]
219    pub text_field: Option<String>,
220    pub text: Option<String>,
221    #[serde(default)]
222    pub vector_field: Option<String>,
223    pub vector: Option<Vec<f32>>,
224    #[serde(default)]
225    pub scalar_eq: Map<String, Value>,
226    #[serde(default)]
227    pub graph_seed: Option<String>,
228    #[serde(default)]
229    pub temporal_as_of: Option<u64>,
230    pub top_k: usize,
231    pub freshness: FreshnessMode,
232    pub explain: bool,
233}
234
235#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
236pub struct ScoreComponents {
237    pub vector: Option<f32>,
238    pub lexical: Option<f32>,
239    pub relational: Option<f32>,
240    pub freshness_penalty: Option<f32>,
241    pub final_score: f32,
242}
243
244#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
245pub struct QueryRow {
246    pub record_id: String,
247    pub version_id: u64,
248    pub tenant_id: String,
249    pub fields: Map<String, Value>,
250    pub score: ScoreComponents,
251}
252
253pub type HybridQueryRow = QueryRow;
254pub type HybridScoreComponents = ScoreComponents;
255
256#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
257pub enum FeatureFreshness {
258    Ready,
259    Dirty,
260    Pending,
261    Failed,
262    Missing,
263}
264
265#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
266pub struct Candidate {
267    pub record_id: String,
268    pub version_id: u64,
269    pub score_components: ScoreComponents,
270    pub score_upper_bound: Option<f32>,
271    pub source: String,
272    pub freshness: FeatureFreshness,
273    pub visibility_checked: bool,
274}
275
276#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
277pub struct AccessPathExplain {
278    pub access_path_id: String,
279    pub opened: bool,
280    pub visibility_checked_before_open: bool,
281    pub candidates: usize,
282}
283
284#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
285pub struct QueryPhaseTiming {
286    pub phase: String,
287    pub elapsed_ms: f64,
288}
289
290#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
291pub struct AccessPathTiming {
292    pub access_path_id: String,
293    pub build_ms: f64,
294    pub open_ms: f64,
295}
296
297#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
298pub struct ExplainOutput {
299    pub read_epoch: u64,
300    pub schema_epoch: u64,
301    pub policy_epoch: u64,
302    pub tenant_mask_visible_records: usize,
303    pub scalar_filter_applied: bool,
304    pub scalar_filter_predicates: Vec<String>,
305    pub scalar_filter_visible_records: usize,
306    pub scalar_filter_removed_records: usize,
307    pub opened_candidate_streams: Vec<String>,
308    pub access_paths: Vec<AccessPathExplain>,
309    pub planner_candidates: Vec<Candidate>,
310    pub candidate_budget: usize,
311    pub text_candidates: usize,
312    pub vector_candidates: usize,
313    pub hot_overlay_searched: bool,
314    pub freshness_mode: String,
315    pub dirty_feature_count: usize,
316    pub pending_feature_count: usize,
317    pub failed_feature_count: usize,
318    pub missing_feature_count: usize,
319    pub fusion_method: String,
320    pub deduped_candidate_count: usize,
321    pub materialized_count: usize,
322    pub final_visibility_guard_count: usize,
323    pub final_visibility_guard_removed: usize,
324    pub returned_count: usize,
325    pub segments_scanned: usize,
326    pub module_versions: Vec<String>,
327    pub selected_strategy: Option<String>,
328    pub skipped_access_paths: Vec<String>,
329    pub exact_fallback_triggered: bool,
330    pub early_stop_reason: Option<String>,
331    #[serde(default)]
332    pub lexical_cache_hits: usize,
333    #[serde(default)]
334    pub lexical_cache_misses: usize,
335    #[serde(default)]
336    pub lexical_indexed_documents: usize,
337    #[serde(default)]
338    pub lexical_scored_documents: usize,
339    #[serde(default)]
340    pub phase_timings: Vec<QueryPhaseTiming>,
341    #[serde(default)]
342    pub access_path_timings: Vec<AccessPathTiming>,
343}
344
345pub type HybridExplain = ExplainOutput;
346
347#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
348pub struct QueryOutput {
349    pub results: Vec<QueryRow>,
350    pub explain: ExplainOutput,
351    #[serde(default, skip_serializing_if = "Option::is_none")]
352    pub next_cursor: Option<String>,
353}
354
355pub type HybridQueryOutput = QueryOutput;
356
357#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
358pub struct WritePathTiming {
359    pub total_ms: f64,
360    pub lock_ms: f64,
361    pub refresh_total_ms: f64,
362    pub refresh_manifest_read_ms: f64,
363    pub refresh_wal_tail_ms: f64,
364    pub refresh_reopen_ms: f64,
365    pub refresh_performed: bool,
366    pub schema_lookup_ms: f64,
367    pub store_clone_ms: f64,
368    #[serde(default)]
369    pub store_delta_plan_ms: f64,
370    #[serde(default)]
371    pub store_delta_apply_ms: f64,
372    pub store_apply_ms: f64,
373    #[serde(default)]
374    pub store_apply_validate_identity_ms: f64,
375    #[serde(default)]
376    pub store_apply_validate_vector_ms: f64,
377    #[serde(default)]
378    pub store_apply_key_ms: f64,
379    #[serde(default)]
380    pub store_apply_fields_ms: f64,
381    #[serde(default)]
382    pub store_apply_finalize_identity_ms: f64,
383    #[serde(default)]
384    pub store_apply_features_ms: f64,
385    #[serde(default)]
386    pub store_apply_install_ms: f64,
387    pub feature_invalidation_ms: f64,
388    pub commit_build_ms: f64,
389    pub wal_total_ms: f64,
390    pub wal_lock_tail_ms: f64,
391    pub wal_frame_build_ms: f64,
392    pub wal_commit_prepare_ms: f64,
393    pub wal_serialize_ms: f64,
394    pub wal_payload_checksum_ms: f64,
395    pub wal_frame_assembly_ms: f64,
396    pub wal_payload_bytes: u64,
397    pub wal_frame_bytes: u64,
398    pub wal_write_ms: f64,
399    pub wal_sync_data_ms: f64,
400    pub wal_tail_update_ms: f64,
401    pub store_install_ms: f64,
402    pub manifest_total_ms: f64,
403    pub manifest_clone_ms: f64,
404    pub manifest_write_total_ms: f64,
405    pub manifest_bytes: u64,
406    pub manifest_checksum_ms: f64,
407    pub manifest_serialize_ms: f64,
408    pub manifest_write_ms: f64,
409    pub manifest_sync_file_ms: f64,
410    pub manifest_rename_ms: f64,
411    pub manifest_sync_dir_ms: f64,
412    pub cache_clear_ms: f64,
413}
414
415fn default_tombstone() -> String {
416    "user_delete".to_string()
417}
418
419#[derive(Clone, Debug)]
420pub enum TraceDbClientError {
421    InvalidUrl(String),
422    InvalidConfig {
423        variable: String,
424        message: String,
425    },
426    InvalidRequest {
427        method: String,
428        path: String,
429        message: String,
430    },
431    Io(String),
432    Json(String),
433    Timeout {
434        method: String,
435        path: String,
436        timeout_ms: u64,
437    },
438    InvalidResponse {
439        method: String,
440        path: String,
441        message: String,
442    },
443    HttpStatus {
444        method: String,
445        path: String,
446        status: u16,
447        body: String,
448    },
449}
450
451impl Display for TraceDbClientError {
452    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
453        match self {
454            Self::InvalidUrl(url) => write!(f, "invalid TraceDB URL {url}"),
455            Self::InvalidConfig { variable, message } => {
456                write!(f, "invalid TraceDB SDK config for {variable}: {message}")
457            }
458            Self::InvalidRequest {
459                method,
460                path,
461                message,
462            } => write!(
463                f,
464                "invalid TraceDB HTTP request for {method} {path}: {message}"
465            ),
466            Self::Io(error) => write!(f, "TraceDB HTTP I/O error: {error}"),
467            Self::Json(error) => write!(f, "TraceDB JSON error: {error}"),
468            Self::Timeout {
469                method,
470                path,
471                timeout_ms,
472            } => write!(
473                f,
474                "TraceDB HTTP request {method} {path} timed out after {timeout_ms} ms"
475            ),
476            Self::InvalidResponse {
477                method,
478                path,
479                message,
480            } => write!(
481                f,
482                "invalid TraceDB HTTP response for {method} {path}: {message}"
483            ),
484            Self::HttpStatus {
485                method,
486                path,
487                status,
488                body,
489            } => {
490                write!(
491                    f,
492                    "TraceDB HTTP request {method} {path} failed with status {status}: {body}"
493                )
494            }
495        }
496    }
497}
498
499impl Error for TraceDbClientError {
500    fn source(&self) -> Option<&(dyn Error + 'static)> {
501        match self {
502            Self::Io(_error) => None,
503            Self::Json(_error) => None,
504            Self::InvalidUrl(_)
505            | Self::InvalidConfig { .. }
506            | Self::InvalidRequest { .. }
507            | Self::Timeout { .. }
508            | Self::InvalidResponse { .. }
509            | Self::HttpStatus { .. } => None,
510        }
511    }
512}
513
514impl From<std::io::Error> for TraceDbClientError {
515    fn from(error: std::io::Error) -> Self {
516        Self::Io(error.to_string())
517    }
518}
519
520impl From<serde_json::Error> for TraceDbClientError {
521    fn from(error: serde_json::Error) -> Self {
522        Self::Json(error.to_string())
523    }
524}
525
526impl TraceDbClientError {
527    pub fn error_response(&self) -> Option<ErrorResponse> {
528        match self {
529            Self::HttpStatus { body, .. } => serde_json::from_str::<ErrorResponse>(body).ok(),
530            _ => None,
531        }
532    }
533
534    pub fn server_error(&self) -> Option<String> {
535        let Self::HttpStatus { body, .. } = self else {
536            return None;
537        };
538        serde_json::from_str::<ErrorResponse>(body)
539            .ok()
540            .map(|response| response.error)
541    }
542
543    pub fn server_error_code(&self) -> Option<String> {
544        let Self::HttpStatus { body, .. } = self else {
545            return None;
546        };
547        serde_json::from_str::<ErrorResponse>(body)
548            .ok()
549            .and_then(|response| response.code)
550    }
551}
552
553#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
554pub struct TraceDbClientConfig {
555    pub url: String,
556    pub token: String,
557    #[serde(default, skip_serializing_if = "Option::is_none")]
558    pub database_id: Option<String>,
559    #[serde(default, skip_serializing_if = "Option::is_none")]
560    pub branch_id: Option<String>,
561    #[serde(default = "default_request_timeout_ms")]
562    pub request_timeout_ms: u64,
563    #[serde(default)]
564    pub safe_retries: u8,
565    #[serde(default)]
566    pub idempotency_retries: u8,
567}
568
569impl TraceDbClientConfig {
570    pub fn managed(url: impl Into<String>, token: impl Into<String>) -> Self {
571        Self {
572            url: url.into(),
573            token: token.into(),
574            database_id: None,
575            branch_id: None,
576            request_timeout_ms: default_request_timeout_ms(),
577            safe_retries: 0,
578            idempotency_retries: 0,
579        }
580    }
581
582    pub fn from_env() -> TraceDbClientResult<Self> {
583        Self::from_env_vars(env::vars())
584    }
585
586    pub fn from_env_vars<K, V, I>(vars: I) -> TraceDbClientResult<Self>
587    where
588        K: Into<String>,
589        V: Into<String>,
590        I: IntoIterator<Item = (K, V)>,
591    {
592        let mut url = None;
593        let mut token = None;
594        let mut database_id = None;
595        let mut branch_id = None;
596        let mut timeout_ms = None;
597        let mut safe_retries = None;
598        let mut idempotency_retries = None;
599
600        for (key, value) in vars {
601            let key = key.into();
602            let value = value.into();
603            match key.as_str() {
604                "TRACEDB_URL" => url = Some(value),
605                "TRACEDB_TOKEN" => token = Some(value),
606                "TRACEDB_DATABASE_ID" => database_id = Some(value),
607                "TRACEDB_BRANCH_ID" => branch_id = Some(value),
608                "TRACEDB_TIMEOUT_MS" => timeout_ms = Some(value),
609                "TRACEDB_SAFE_RETRIES" => safe_retries = Some(value),
610                "TRACEDB_IDEMPOTENCY_RETRIES" => idempotency_retries = Some(value),
611                _ => {}
612            }
613        }
614
615        let url = required_env("TRACEDB_URL", url)?;
616        let mut config = Self::managed(url, token.unwrap_or_default());
617        if let Some(database_id) = optional_env("TRACEDB_DATABASE_ID", database_id)? {
618            config = config.with_database(database_id);
619        }
620        if let Some(branch_id) = optional_env("TRACEDB_BRANCH_ID", branch_id)? {
621            config = config.with_branch(branch_id);
622        }
623        if let Some(timeout_ms) = optional_positive_u64_env("TRACEDB_TIMEOUT_MS", timeout_ms)? {
624            config.request_timeout_ms = timeout_ms;
625        }
626        if let Some(retries) = optional_u8_env("TRACEDB_SAFE_RETRIES", safe_retries)? {
627            config.safe_retries = retries;
628        }
629        if let Some(retries) = optional_u8_env("TRACEDB_IDEMPOTENCY_RETRIES", idempotency_retries)?
630        {
631            config.idempotency_retries = retries;
632        }
633        Ok(config)
634    }
635
636    pub fn with_database(mut self, database_id: impl Into<String>) -> Self {
637        self.database_id = Some(database_id.into());
638        self
639    }
640
641    pub fn with_branch(mut self, branch_id: impl Into<String>) -> Self {
642        self.branch_id = Some(branch_id.into());
643        self
644    }
645
646    pub fn with_database_branch(
647        self,
648        database_id: impl Into<String>,
649        branch_id: impl Into<String>,
650    ) -> Self {
651        self.with_database(database_id).with_branch(branch_id)
652    }
653
654    pub fn with_timeout(mut self, timeout: Duration) -> Self {
655        self.request_timeout_ms = timeout_ms(timeout);
656        self
657    }
658
659    pub fn with_safe_retries(mut self, retries: u8) -> Self {
660        self.safe_retries = retries;
661        self
662    }
663
664    pub fn with_idempotency_retries(mut self, retries: u8) -> Self {
665        self.idempotency_retries = retries;
666        self
667    }
668
669    fn request_timeout(&self) -> Duration {
670        Duration::from_millis(self.request_timeout_ms.max(1))
671    }
672}
673
674#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
675pub struct TraceDbRequestOptions {
676    #[serde(default, skip_serializing_if = "Option::is_none")]
677    pub idempotency_key: Option<String>,
678    #[serde(default, skip_serializing_if = "Option::is_none")]
679    pub actor_context: Option<TraceDbActorContext>,
680}
681
682impl TraceDbRequestOptions {
683    pub fn new() -> Self {
684        Self::default()
685    }
686
687    pub fn with_idempotency_key(mut self, key: impl Into<String>) -> Self {
688        self.idempotency_key = Some(key.into());
689        self
690    }
691
692    pub fn with_actor_context(mut self, actor_context: TraceDbActorContext) -> Self {
693        self.actor_context = Some(actor_context);
694        self
695    }
696}
697
698#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
699pub struct TraceDbActorContext {
700    pub tenant_id: String,
701    pub database_id: String,
702    pub branch_id: String,
703    pub token_identity: String,
704    pub request_id: String,
705    #[serde(default)]
706    pub policy_epoch: u64,
707    #[serde(default)]
708    pub scopes: Vec<String>,
709}
710
711impl TraceDbActorContext {
712    pub fn new(
713        tenant_id: impl Into<String>,
714        database_id: impl Into<String>,
715        branch_id: impl Into<String>,
716        token_identity: impl Into<String>,
717        request_id: impl Into<String>,
718    ) -> Self {
719        Self {
720            tenant_id: tenant_id.into(),
721            database_id: database_id.into(),
722            branch_id: branch_id.into(),
723            token_identity: token_identity.into(),
724            request_id: request_id.into(),
725            policy_epoch: 0,
726            scopes: Vec::new(),
727        }
728    }
729
730    pub fn with_policy_epoch(mut self, policy_epoch: u64) -> Self {
731        self.policy_epoch = policy_epoch;
732        self
733    }
734
735    pub fn with_scopes(mut self, scopes: impl IntoIterator<Item = impl Into<String>>) -> Self {
736        self.scopes = scopes.into_iter().map(Into::into).collect();
737        self
738    }
739}
740
741#[derive(Clone, Debug)]
742/// Synchronous HTTP client for TraceDB.
743pub struct TraceDbClient {
744    pub config: TraceDbClientConfig,
745}
746
747pub type TraceDb = TraceDbClient;
748
749impl TraceDbClient {
750    pub fn new(config: TraceDbClientConfig) -> Self {
751        Self { config }
752    }
753
754    pub fn connect(config: TraceDbClientConfig) -> TraceDbClientResult<Self> {
755        HttpTarget::parse(&config.url)?;
756        Ok(Self::new(config))
757    }
758
759    pub fn ready(&self) -> TraceDbClientResult<Value> {
760        self.get_json("/v1/ready")
761    }
762
763    pub fn ready_typed(&self) -> TraceDbClientResult<ReadyResponse> {
764        self.get_typed("/v1/ready")
765    }
766
767    pub fn health(&self) -> TraceDbClientResult<Value> {
768        self.get_json("/v1/health")
769    }
770
771    pub fn health_typed(&self) -> TraceDbClientResult<HealthResponse> {
772        self.get_typed("/v1/health")
773    }
774
775    pub fn list_databases(&self) -> TraceDbClientResult<Value> {
776        self.get_json("/v1/databases")
777    }
778
779    pub fn list_databases_typed(&self) -> TraceDbClientResult<DatabasesResponse> {
780        self.get_typed("/v1/databases")
781    }
782
783    pub fn list_branches(&self) -> TraceDbClientResult<Value> {
784        self.get_json("/v1/branches")
785    }
786
787    pub fn list_branches_typed(&self) -> TraceDbClientResult<BranchesResponse> {
788        self.get_typed("/v1/branches")
789    }
790
791    pub fn public_safe_metrics(&self) -> TraceDbClientResult<Value> {
792        self.get_json("/v1/metrics/public-safe")
793    }
794
795    pub fn public_safe_metrics_typed(&self) -> TraceDbClientResult<MetricsResponse> {
796        self.get_typed("/v1/metrics/public-safe")
797    }
798
799    pub fn apply_schema(&self, schema: &TableSchema) -> TraceDbClientResult<Value> {
800        self.post_json("/v1/schema/apply", schema)
801    }
802
803    pub fn apply_schema_with_options(
804        &self,
805        schema: &TableSchema,
806        options: &TraceDbRequestOptions,
807    ) -> TraceDbClientResult<Value> {
808        self.post_json_with_options("/v1/schema/apply", schema, options)
809    }
810
811    pub fn apply_schema_typed(&self, schema: &TableSchema) -> TraceDbClientResult<EpochResponse> {
812        self.post_typed("/v1/schema/apply", schema)
813    }
814
815    pub fn apply_schema_typed_with_options(
816        &self,
817        schema: &TableSchema,
818        options: &TraceDbRequestOptions,
819    ) -> TraceDbClientResult<EpochResponse> {
820        self.post_typed_with_options("/v1/schema/apply", schema, options)
821    }
822
823    pub fn put(&self, record: &RecordInput) -> TraceDbClientResult<Value> {
824        self.post_json("/v1/records/put", record)
825    }
826
827    pub fn put_with_options(
828        &self,
829        record: &RecordInput,
830        options: &TraceDbRequestOptions,
831    ) -> TraceDbClientResult<Value> {
832        self.post_json_with_options("/v1/records/put", record, options)
833    }
834
835    pub fn put_typed(&self, record: &RecordInput) -> TraceDbClientResult<EpochResponse> {
836        self.post_typed("/v1/records/put", record)
837    }
838
839    pub fn put_typed_with_options(
840        &self,
841        record: &RecordInput,
842        options: &TraceDbRequestOptions,
843    ) -> TraceDbClientResult<EpochResponse> {
844        self.post_typed_with_options("/v1/records/put", record, options)
845    }
846
847    pub fn put_batch(&self, request: &RecordPutBatchRequest) -> TraceDbClientResult<Value> {
848        self.post_json("/v1/records/put-batch", request)
849    }
850
851    pub fn put_batch_with_options(
852        &self,
853        request: &RecordPutBatchRequest,
854        options: &TraceDbRequestOptions,
855    ) -> TraceDbClientResult<Value> {
856        self.post_json_with_options("/v1/records/put-batch", request, options)
857    }
858
859    pub fn put_batch_typed(
860        &self,
861        request: &RecordPutBatchRequest,
862    ) -> TraceDbClientResult<PutBatchResponse> {
863        self.post_typed("/v1/records/put-batch", request)
864    }
865
866    pub fn put_batch_typed_with_options(
867        &self,
868        request: &RecordPutBatchRequest,
869        options: &TraceDbRequestOptions,
870    ) -> TraceDbClientResult<PutBatchResponse> {
871        self.post_typed_with_options("/v1/records/put-batch", request, options)
872    }
873
874    pub fn patch(&self, request: &RecordPatchRequest) -> TraceDbClientResult<Value> {
875        self.post_json("/v1/records/patch", request)
876    }
877
878    pub fn patch_with_options(
879        &self,
880        request: &RecordPatchRequest,
881        options: &TraceDbRequestOptions,
882    ) -> TraceDbClientResult<Value> {
883        self.post_json_with_options("/v1/records/patch", request, options)
884    }
885
886    pub fn patch_typed(&self, request: &RecordPatchRequest) -> TraceDbClientResult<EpochResponse> {
887        self.post_typed("/v1/records/patch", request)
888    }
889
890    pub fn patch_typed_with_options(
891        &self,
892        request: &RecordPatchRequest,
893        options: &TraceDbRequestOptions,
894    ) -> TraceDbClientResult<EpochResponse> {
895        self.post_typed_with_options("/v1/records/patch", request, options)
896    }
897
898    pub fn delete(&self, request: &RecordDeleteRequest) -> TraceDbClientResult<Value> {
899        self.post_json("/v1/records/delete", request)
900    }
901
902    pub fn delete_with_options(
903        &self,
904        request: &RecordDeleteRequest,
905        options: &TraceDbRequestOptions,
906    ) -> TraceDbClientResult<Value> {
907        self.post_json_with_options("/v1/records/delete", request, options)
908    }
909
910    pub fn delete_typed(
911        &self,
912        request: &RecordDeleteRequest,
913    ) -> TraceDbClientResult<DeleteResponse> {
914        self.post_typed("/v1/records/delete", request)
915    }
916
917    pub fn delete_typed_with_options(
918        &self,
919        request: &RecordDeleteRequest,
920        options: &TraceDbRequestOptions,
921    ) -> TraceDbClientResult<DeleteResponse> {
922        self.post_typed_with_options("/v1/records/delete", request, options)
923    }
924
925    pub fn get(&self, request: &RecordGetRequest) -> TraceDbClientResult<Value> {
926        self.post_json("/v1/records/get", request)
927    }
928
929    pub fn get_record_typed(
930        &self,
931        request: &RecordGetRequest,
932    ) -> TraceDbClientResult<GetRecordResponse> {
933        self.post_typed("/v1/records/get", request)
934    }
935
936    pub fn scan(&self, request: &RecordScanRequest) -> TraceDbClientResult<Value> {
937        self.post_json("/v1/records/scan", request)
938    }
939
940    pub fn scan_typed(&self, request: &RecordScanRequest) -> TraceDbClientResult<RecordScanOutput> {
941        self.post_typed("/v1/records/scan", request)
942    }
943
944    pub fn query(&self, query: &HybridQuery) -> TraceDbClientResult<Value> {
945        self.post_json("/v1/query", query)
946    }
947
948    pub fn query_typed(&self, query: &HybridQuery) -> TraceDbClientResult<QueryResponse> {
949        self.post_typed("/v1/query", query)
950    }
951
952    pub fn traceql(&self, query: impl Into<String>) -> TraceDbClientResult<Value> {
953        let request = TraceQlQueryRequest::new(query);
954        self.traceql_request(&request)
955    }
956
957    pub fn traceql_request(&self, request: &TraceQlQueryRequest) -> TraceDbClientResult<Value> {
958        self.post_json("/v1/traceql", request)
959    }
960
961    pub fn traceql_request_with_options(
962        &self,
963        request: &TraceQlQueryRequest,
964        options: &TraceDbRequestOptions,
965    ) -> TraceDbClientResult<Value> {
966        self.post_json_with_options("/v1/traceql", request, options)
967    }
968
969    pub fn traceql_typed(&self, query: impl Into<String>) -> TraceDbClientResult<QueryResponse> {
970        let request = TraceQlQueryRequest::new(query);
971        self.traceql_request_typed(&request)
972    }
973
974    pub fn traceql_request_typed(
975        &self,
976        request: &TraceQlQueryRequest,
977    ) -> TraceDbClientResult<QueryResponse> {
978        self.post_typed("/v1/traceql", request)
979    }
980
981    pub fn traceql_request_typed_with_options(
982        &self,
983        request: &TraceQlQueryRequest,
984        options: &TraceDbRequestOptions,
985    ) -> TraceDbClientResult<QueryResponse> {
986        self.post_typed_with_options("/v1/traceql", request, options)
987    }
988
989    pub fn graphql(&self, query: impl Into<String>) -> TraceDbClientResult<Value> {
990        let request = GraphQlQueryRequest::new(query);
991        self.graphql_request(&request)
992    }
993
994    pub fn graphql_request(&self, request: &GraphQlQueryRequest) -> TraceDbClientResult<Value> {
995        self.post_json("/v1/graphql", request)
996    }
997
998    pub fn graphql_request_with_options(
999        &self,
1000        request: &GraphQlQueryRequest,
1001        options: &TraceDbRequestOptions,
1002    ) -> TraceDbClientResult<Value> {
1003        self.post_json_with_options("/v1/graphql", request, options)
1004    }
1005
1006    pub fn graphql_typed(&self, query: impl Into<String>) -> TraceDbClientResult<GraphQlResponse> {
1007        let request = GraphQlQueryRequest::new(query);
1008        self.graphql_request_typed(&request)
1009    }
1010
1011    pub fn graphql_request_typed(
1012        &self,
1013        request: &GraphQlQueryRequest,
1014    ) -> TraceDbClientResult<GraphQlResponse> {
1015        self.post_typed("/v1/graphql", request)
1016    }
1017
1018    pub fn graphql_request_typed_with_options(
1019        &self,
1020        request: &GraphQlQueryRequest,
1021        options: &TraceDbRequestOptions,
1022    ) -> TraceDbClientResult<GraphQlResponse> {
1023        self.post_typed_with_options("/v1/graphql", request, options)
1024    }
1025
1026    pub fn bounded_graphql(&self, query: impl Into<String>) -> TraceDbClientResult<Value> {
1027        let request = GraphQlQueryRequest::new(query);
1028        self.bounded_graphql_request(&request)
1029    }
1030
1031    pub fn bounded_graphql_request(
1032        &self,
1033        request: &GraphQlQueryRequest,
1034    ) -> TraceDbClientResult<Value> {
1035        self.post_json("/v1/graphql/bounded", request)
1036    }
1037
1038    pub fn bounded_graphql_typed(
1039        &self,
1040        query: impl Into<String>,
1041    ) -> TraceDbClientResult<QueryResponse> {
1042        let request = GraphQlQueryRequest::new(query);
1043        self.bounded_graphql_request_typed(&request)
1044    }
1045
1046    pub fn bounded_graphql_request_typed(
1047        &self,
1048        request: &GraphQlQueryRequest,
1049    ) -> TraceDbClientResult<QueryResponse> {
1050        self.post_typed("/v1/graphql/bounded", request)
1051    }
1052
1053    pub fn graphql_schema(&self) -> TraceDbClientResult<Value> {
1054        self.get_json("/v1/graphql/schema")
1055    }
1056
1057    pub fn graphql_schema_typed(&self) -> TraceDbClientResult<GraphQlSchemaResponse> {
1058        self.get_typed("/v1/graphql/schema")
1059    }
1060
1061    pub fn explain(&self, query: &HybridQuery) -> TraceDbClientResult<Value> {
1062        self.post_json("/v1/explain", query)
1063    }
1064
1065    pub fn explain_typed(&self, query: &HybridQuery) -> TraceDbClientResult<HybridExplain> {
1066        self.post_typed("/v1/explain", query)
1067    }
1068
1069    pub fn compact(&self) -> TraceDbClientResult<Value> {
1070        self.post_json("/v1/admin/compact", &json!({}))
1071    }
1072
1073    pub fn compact_with_options(
1074        &self,
1075        options: &TraceDbRequestOptions,
1076    ) -> TraceDbClientResult<Value> {
1077        self.post_json_with_options("/v1/admin/compact", &json!({}), options)
1078    }
1079
1080    pub fn compact_typed(&self) -> TraceDbClientResult<CompactResponse> {
1081        self.post_typed("/v1/admin/compact", &json!({}))
1082    }
1083
1084    pub fn compact_typed_with_options(
1085        &self,
1086        options: &TraceDbRequestOptions,
1087    ) -> TraceDbClientResult<CompactResponse> {
1088        self.post_typed_with_options("/v1/admin/compact", &json!({}), options)
1089    }
1090
1091    pub fn list_admin_jobs(&self) -> TraceDbClientResult<Value> {
1092        self.get_json("/v1/admin/jobs")
1093    }
1094
1095    pub fn list_admin_jobs_typed(&self) -> TraceDbClientResult<JobsResponse> {
1096        self.get_typed("/v1/admin/jobs")
1097    }
1098
1099    pub fn snapshot(&self, request: &SnapshotRequest) -> TraceDbClientResult<Value> {
1100        self.post_json("/v1/admin/snapshot", request)
1101    }
1102
1103    pub fn snapshot_with_options(
1104        &self,
1105        request: &SnapshotRequest,
1106        options: &TraceDbRequestOptions,
1107    ) -> TraceDbClientResult<Value> {
1108        self.post_json_with_options("/v1/admin/snapshot", request, options)
1109    }
1110
1111    pub fn snapshot_typed(
1112        &self,
1113        request: &SnapshotRequest,
1114    ) -> TraceDbClientResult<SnapshotResponse> {
1115        self.post_typed("/v1/admin/snapshot", request)
1116    }
1117
1118    pub fn snapshot_typed_with_options(
1119        &self,
1120        request: &SnapshotRequest,
1121        options: &TraceDbRequestOptions,
1122    ) -> TraceDbClientResult<SnapshotResponse> {
1123        self.post_typed_with_options("/v1/admin/snapshot", request, options)
1124    }
1125
1126    pub fn restore(&self, request: &RestoreRequest) -> TraceDbClientResult<Value> {
1127        self.post_json("/v1/admin/restore", request)
1128    }
1129
1130    pub fn restore_with_options(
1131        &self,
1132        request: &RestoreRequest,
1133        options: &TraceDbRequestOptions,
1134    ) -> TraceDbClientResult<Value> {
1135        self.post_json_with_options("/v1/admin/restore", request, options)
1136    }
1137
1138    pub fn restore_typed(&self, request: &RestoreRequest) -> TraceDbClientResult<RestoreResponse> {
1139        self.post_typed("/v1/admin/restore", request)
1140    }
1141
1142    pub fn restore_typed_with_options(
1143        &self,
1144        request: &RestoreRequest,
1145        options: &TraceDbRequestOptions,
1146    ) -> TraceDbClientResult<RestoreResponse> {
1147        self.post_typed_with_options("/v1/admin/restore", request, options)
1148    }
1149
1150    pub fn request_json(
1151        &self,
1152        method: &str,
1153        path: &str,
1154        body: Option<&Value>,
1155    ) -> TraceDbClientResult<Value> {
1156        self.request_json_with_options(method, path, body, &TraceDbRequestOptions::default())
1157    }
1158
1159    pub fn request_json_with_options(
1160        &self,
1161        method: &str,
1162        path: &str,
1163        body: Option<&Value>,
1164        options: &TraceDbRequestOptions,
1165    ) -> TraceDbClientResult<Value> {
1166        let attempts = self.request_attempts(method, path, body, options);
1167        for attempt in 0..attempts {
1168            match self.request_json_once(method, path, body, options) {
1169                Ok(value) => return Ok(value),
1170                Err(error) if is_retryable_error(&error) && attempt + 1 < attempts => {
1171                    thread::sleep(retry_backoff_delay(attempt));
1172                }
1173                Err(error) => return Err(error),
1174            }
1175        }
1176        unreachable!("request attempts should be at least one")
1177    }
1178
1179    fn request_attempts(
1180        &self,
1181        method: &str,
1182        path: &str,
1183        body: Option<&Value>,
1184        options: &TraceDbRequestOptions,
1185    ) -> u8 {
1186        if self.config.idempotency_retries > 0
1187            && is_idempotent_retry_request(method, path)
1188            && options
1189                .idempotency_key
1190                .as_deref()
1191                .is_some_and(|key| !key.is_empty())
1192        {
1193            self.config.idempotency_retries.saturating_add(1)
1194        } else if is_retry_safe_request(method, path, body) {
1195            self.config.safe_retries.saturating_add(1)
1196        } else {
1197            1
1198        }
1199    }
1200
1201    fn request_json_once(
1202        &self,
1203        method: &str,
1204        path: &str,
1205        body: Option<&Value>,
1206        options: &TraceDbRequestOptions,
1207    ) -> TraceDbClientResult<Value> {
1208        let target = HttpTarget::parse(&self.config.url)?;
1209        let request_path = target.path(path);
1210        let body_bytes = self.request_body_bytes(body)?;
1211        let timeout = self.config.request_timeout();
1212        let idempotency_key_header = idempotency_key_header(method, &request_path, options)?;
1213        let mut stream = target.connect(method, &request_path, timeout)?;
1214        let mut request = format!(
1215            "{method} {request_path} HTTP/1.1\r\nHost: {}\r\nAccept: application/json\r\nConnection: close\r\nContent-Length: {}\r\nUser-Agent: {NAME}/{VERSION}\r\n",
1216            target.authority,
1217            body_bytes.len()
1218        );
1219        if !self.config.token.is_empty() {
1220            request.push_str(&format!("Authorization: Bearer {}\r\n", self.config.token));
1221        }
1222        request.push_str(&idempotency_key_header);
1223        request.push_str(&self.actor_headers(options)?);
1224        if !body_bytes.is_empty() {
1225            request.push_str("Content-Type: application/json\r\n");
1226        }
1227        request.push_str("\r\n");
1228        stream
1229            .write_all(request.as_bytes())
1230            .map_err(|error| map_request_io_error(method, &request_path, timeout, error))?;
1231        if !body_bytes.is_empty() {
1232            stream
1233                .write_all(&body_bytes)
1234                .map_err(|error| map_request_io_error(method, &request_path, timeout, error))?;
1235        }
1236        stream
1237            .flush()
1238            .map_err(|error| map_request_io_error(method, &request_path, timeout, error))?;
1239        let mut response = String::new();
1240        stream
1241            .read_to_string(&mut response)
1242            .map_err(|error| map_request_io_error(method, &request_path, timeout, error))?;
1243        if response.is_empty() {
1244            return Err(TraceDbClientError::Io(
1245                "connection closed before response".to_string(),
1246            ));
1247        }
1248        parse_response(method, &request_path, &response)
1249    }
1250
1251    pub fn table(&self, table: impl Into<String>) -> TableHandle {
1252        QueryBuilder {
1253            client_config: Some(self.config.clone()),
1254            table: table.into(),
1255            tenant_id: None,
1256            text_field: None,
1257            text_query: None,
1258            vector_field: None,
1259            vector: None,
1260            scalar_eq: Map::new(),
1261            freshness: FeatureFreshnessMode::Strict,
1262            limit: 10,
1263            cursor: None,
1264            explain: true,
1265        }
1266    }
1267
1268    fn get_json(&self, path: &str) -> TraceDbClientResult<Value> {
1269        self.request_json("GET", path, None)
1270    }
1271
1272    fn get_typed<T: for<'de> Deserialize<'de>>(&self, path: &str) -> TraceDbClientResult<T> {
1273        decode_typed("GET", path, self.get_json(path)?)
1274    }
1275
1276    fn post_json<T: Serialize>(&self, path: &str, body: &T) -> TraceDbClientResult<Value> {
1277        let value = serde_json::to_value(body)?;
1278        self.request_json("POST", path, Some(&value))
1279    }
1280
1281    fn post_json_with_options<T: Serialize>(
1282        &self,
1283        path: &str,
1284        body: &T,
1285        options: &TraceDbRequestOptions,
1286    ) -> TraceDbClientResult<Value> {
1287        let value = serde_json::to_value(body)?;
1288        self.request_json_with_options("POST", path, Some(&value), options)
1289    }
1290
1291    fn post_typed<T: Serialize, R: for<'de> Deserialize<'de>>(
1292        &self,
1293        path: &str,
1294        body: &T,
1295    ) -> TraceDbClientResult<R> {
1296        decode_typed("POST", path, self.post_json(path, body)?)
1297    }
1298
1299    fn post_typed_with_options<T: Serialize, R: for<'de> Deserialize<'de>>(
1300        &self,
1301        path: &str,
1302        body: &T,
1303        options: &TraceDbRequestOptions,
1304    ) -> TraceDbClientResult<R> {
1305        decode_typed(
1306            "POST",
1307            path,
1308            self.post_json_with_options(path, body, options)?,
1309        )
1310    }
1311
1312    fn request_body_bytes(&self, body: Option<&Value>) -> TraceDbClientResult<Vec<u8>> {
1313        let Some(body) = body else {
1314            return Ok(Vec::new());
1315        };
1316        let mut body = body.clone();
1317        self.inject_route_metadata(&mut body);
1318        Ok(serde_json::to_vec(&body)?)
1319    }
1320
1321    fn inject_route_metadata(&self, body: &mut Value) {
1322        let Value::Object(body) = body else {
1323            return;
1324        };
1325        if let Some(database_id) = &self.config.database_id {
1326            body.entry("database_id".to_string())
1327                .or_insert_with(|| Value::String(database_id.clone()));
1328        }
1329        if !body.contains_key("branch_id") {
1330            let branch_id = self.config.branch_id.clone().or_else(|| {
1331                self.config.database_id.as_ref().and_then(|_| {
1332                    body.get("database_id")
1333                        .and_then(Value::as_str)
1334                        .map(|database_id| format!("{database_id}:main"))
1335                })
1336            });
1337            if let Some(branch_id) = branch_id {
1338                body.insert("branch_id".to_string(), Value::String(branch_id));
1339            }
1340        }
1341    }
1342
1343    fn actor_headers(&self, options: &TraceDbRequestOptions) -> TraceDbClientResult<String> {
1344        let mut headers = String::new();
1345        for (name, value) in self.actor_header_pairs(options)? {
1346            headers.push_str(&header_line(name, &value)?);
1347        }
1348        Ok(headers)
1349    }
1350
1351    fn actor_header_pairs(
1352        &self,
1353        options: &TraceDbRequestOptions,
1354    ) -> TraceDbClientResult<Vec<(&'static str, String)>> {
1355        let mut headers = Vec::new();
1356        if let Some(actor) = &options.actor_context {
1357            headers.push(("x-tracedb-tenant-id", actor.tenant_id.clone()));
1358            headers.push(("x-tracedb-database-id", actor.database_id.clone()));
1359            headers.push(("x-tracedb-branch-id", actor.branch_id.clone()));
1360            headers.push(("x-tracedb-token-identity", actor.token_identity.clone()));
1361            headers.push(("x-tracedb-request-id", actor.request_id.clone()));
1362            headers.push(("x-tracedb-policy-epoch", actor.policy_epoch.to_string()));
1363            if !actor.scopes.is_empty() {
1364                headers.push(("x-tracedb-scopes", actor.scopes.join(",")));
1365            }
1366        } else {
1367            if let Some(database_id) = &self.config.database_id {
1368                headers.push(("x-tracedb-database-id", database_id.clone()));
1369            }
1370            if let Some(branch_id) = &self.config.branch_id {
1371                headers.push(("x-tracedb-branch-id", branch_id.clone()));
1372            }
1373        }
1374        for (name, value) in &headers {
1375            validate_header_value(name, value)?;
1376        }
1377        Ok(headers)
1378    }
1379}
1380
1381#[derive(Clone, Debug)]
1382/// Asynchronous HTTP client for TraceDB.
1383pub struct TraceDbAsyncClient {
1384    inner: TraceDbClient,
1385    http_client: reqwest::Client,
1386}
1387
1388impl TraceDbAsyncClient {
1389    pub fn new(config: TraceDbClientConfig) -> Self {
1390        let http_client = reqwest::Client::builder()
1391            .pool_max_idle_per_host(16)
1392            .build()
1393            .expect("TraceDB async HTTP client configuration is valid");
1394        Self {
1395            inner: TraceDbClient::new(config),
1396            http_client,
1397        }
1398    }
1399
1400    pub fn from_blocking(client: TraceDbClient) -> Self {
1401        let http_client = reqwest::Client::builder()
1402            .pool_max_idle_per_host(16)
1403            .build()
1404            .expect("TraceDB async HTTP client configuration is valid");
1405        Self {
1406            inner: client,
1407            http_client,
1408        }
1409    }
1410
1411    pub fn blocking_client(&self) -> &TraceDbClient {
1412        &self.inner
1413    }
1414
1415    pub async fn request_json(
1416        &self,
1417        method: &str,
1418        path: &str,
1419        body: Option<&Value>,
1420    ) -> TraceDbClientResult<Value> {
1421        self.request_json_with_options(method, path, body, &TraceDbRequestOptions::default())
1422            .await
1423    }
1424
1425    pub async fn request_json_with_options(
1426        &self,
1427        method: &str,
1428        path: &str,
1429        body: Option<&Value>,
1430        options: &TraceDbRequestOptions,
1431    ) -> TraceDbClientResult<Value> {
1432        let attempts = self.inner.request_attempts(method, path, body, options);
1433        for attempt in 0..attempts {
1434            match self.request_json_once(method, path, body, options).await {
1435                Ok(value) => return Ok(value),
1436                Err(error) if is_retryable_error(&error) && attempt + 1 < attempts => {
1437                    tokio::time::sleep(retry_backoff_delay(attempt)).await;
1438                }
1439                Err(error) => return Err(error),
1440            }
1441        }
1442        unreachable!("request attempts should be at least one")
1443    }
1444
1445    pub async fn ready(&self) -> TraceDbClientResult<Value> {
1446        self.request_json("GET", "/v1/ready", None).await
1447    }
1448
1449    pub async fn ready_typed(&self) -> TraceDbClientResult<ReadyResponse> {
1450        self.get_typed("/v1/ready").await
1451    }
1452
1453    pub async fn health(&self) -> TraceDbClientResult<Value> {
1454        self.request_json("GET", "/v1/health", None).await
1455    }
1456
1457    pub async fn health_typed(&self) -> TraceDbClientResult<HealthResponse> {
1458        self.get_typed("/v1/health").await
1459    }
1460
1461    pub async fn list_databases_typed(&self) -> TraceDbClientResult<DatabasesResponse> {
1462        self.get_typed("/v1/databases").await
1463    }
1464
1465    pub async fn list_branches_typed(&self) -> TraceDbClientResult<BranchesResponse> {
1466        self.get_typed("/v1/branches").await
1467    }
1468
1469    pub async fn public_safe_metrics_typed(&self) -> TraceDbClientResult<MetricsResponse> {
1470        self.get_typed("/v1/metrics/public-safe").await
1471    }
1472
1473    pub async fn list_admin_jobs_typed(&self) -> TraceDbClientResult<JobsResponse> {
1474        self.get_typed("/v1/admin/jobs").await
1475    }
1476
1477    pub async fn apply_schema_typed(
1478        &self,
1479        schema: &TableSchema,
1480    ) -> TraceDbClientResult<EpochResponse> {
1481        self.post_typed("/v1/schema/apply", schema).await
1482    }
1483
1484    pub async fn apply_schema_typed_with_options(
1485        &self,
1486        schema: &TableSchema,
1487        options: &TraceDbRequestOptions,
1488    ) -> TraceDbClientResult<EpochResponse> {
1489        self.post_typed_with_options("/v1/schema/apply", schema, options)
1490            .await
1491    }
1492
1493    pub async fn put_typed(&self, record: &RecordInput) -> TraceDbClientResult<EpochResponse> {
1494        self.post_typed("/v1/records/put", record).await
1495    }
1496
1497    pub async fn put_typed_with_options(
1498        &self,
1499        record: &RecordInput,
1500        options: &TraceDbRequestOptions,
1501    ) -> TraceDbClientResult<EpochResponse> {
1502        self.post_typed_with_options("/v1/records/put", record, options)
1503            .await
1504    }
1505
1506    pub async fn put_batch_typed(
1507        &self,
1508        request: &RecordPutBatchRequest,
1509    ) -> TraceDbClientResult<PutBatchResponse> {
1510        self.post_typed("/v1/records/put-batch", request).await
1511    }
1512
1513    pub async fn put_batch_typed_with_options(
1514        &self,
1515        request: &RecordPutBatchRequest,
1516        options: &TraceDbRequestOptions,
1517    ) -> TraceDbClientResult<PutBatchResponse> {
1518        self.post_typed_with_options("/v1/records/put-batch", request, options)
1519            .await
1520    }
1521
1522    pub async fn patch_typed(
1523        &self,
1524        request: &RecordPatchRequest,
1525    ) -> TraceDbClientResult<EpochResponse> {
1526        self.post_typed("/v1/records/patch", request).await
1527    }
1528
1529    pub async fn patch_typed_with_options(
1530        &self,
1531        request: &RecordPatchRequest,
1532        options: &TraceDbRequestOptions,
1533    ) -> TraceDbClientResult<EpochResponse> {
1534        self.post_typed_with_options("/v1/records/patch", request, options)
1535            .await
1536    }
1537
1538    pub async fn delete_typed(
1539        &self,
1540        request: &RecordDeleteRequest,
1541    ) -> TraceDbClientResult<DeleteResponse> {
1542        self.post_typed("/v1/records/delete", request).await
1543    }
1544
1545    pub async fn delete_typed_with_options(
1546        &self,
1547        request: &RecordDeleteRequest,
1548        options: &TraceDbRequestOptions,
1549    ) -> TraceDbClientResult<DeleteResponse> {
1550        self.post_typed_with_options("/v1/records/delete", request, options)
1551            .await
1552    }
1553
1554    pub async fn get_record_typed(
1555        &self,
1556        request: &RecordGetRequest,
1557    ) -> TraceDbClientResult<GetRecordResponse> {
1558        self.post_typed("/v1/records/get", request).await
1559    }
1560
1561    pub async fn scan_typed(
1562        &self,
1563        request: &RecordScanRequest,
1564    ) -> TraceDbClientResult<RecordScanOutput> {
1565        self.post_typed("/v1/records/scan", request).await
1566    }
1567
1568    pub async fn query_typed(&self, query: &HybridQuery) -> TraceDbClientResult<QueryResponse> {
1569        self.post_typed("/v1/query", query).await
1570    }
1571
1572    pub async fn traceql_typed(
1573        &self,
1574        query: impl Into<String>,
1575    ) -> TraceDbClientResult<QueryResponse> {
1576        let request = TraceQlQueryRequest::new(query);
1577        self.post_typed("/v1/traceql", &request).await
1578    }
1579
1580    pub async fn graphql_typed(
1581        &self,
1582        query: impl Into<String>,
1583    ) -> TraceDbClientResult<GraphQlResponse> {
1584        let request = GraphQlQueryRequest::new(query);
1585        self.post_typed("/v1/graphql", &request).await
1586    }
1587
1588    pub async fn bounded_graphql_typed(
1589        &self,
1590        query: impl Into<String>,
1591    ) -> TraceDbClientResult<QueryResponse> {
1592        let request = GraphQlQueryRequest::new(query);
1593        self.post_typed("/v1/graphql/bounded", &request).await
1594    }
1595
1596    pub async fn graphql_schema_typed(&self) -> TraceDbClientResult<GraphQlSchemaResponse> {
1597        self.get_typed("/v1/graphql/schema").await
1598    }
1599
1600    pub async fn explain_typed(&self, query: &HybridQuery) -> TraceDbClientResult<HybridExplain> {
1601        self.post_typed("/v1/explain", query).await
1602    }
1603
1604    pub async fn compact_typed(&self) -> TraceDbClientResult<CompactResponse> {
1605        self.post_typed("/v1/admin/compact", &json!({})).await
1606    }
1607
1608    pub async fn compact_typed_with_options(
1609        &self,
1610        options: &TraceDbRequestOptions,
1611    ) -> TraceDbClientResult<CompactResponse> {
1612        self.post_typed_with_options("/v1/admin/compact", &json!({}), options)
1613            .await
1614    }
1615
1616    pub async fn snapshot_typed(
1617        &self,
1618        request: &SnapshotRequest,
1619    ) -> TraceDbClientResult<SnapshotResponse> {
1620        self.post_typed("/v1/admin/snapshot", request).await
1621    }
1622
1623    pub async fn snapshot_typed_with_options(
1624        &self,
1625        request: &SnapshotRequest,
1626        options: &TraceDbRequestOptions,
1627    ) -> TraceDbClientResult<SnapshotResponse> {
1628        self.post_typed_with_options("/v1/admin/snapshot", request, options)
1629            .await
1630    }
1631
1632    pub async fn restore_typed(
1633        &self,
1634        request: &RestoreRequest,
1635    ) -> TraceDbClientResult<RestoreResponse> {
1636        self.post_typed("/v1/admin/restore", request).await
1637    }
1638
1639    pub async fn restore_typed_with_options(
1640        &self,
1641        request: &RestoreRequest,
1642        options: &TraceDbRequestOptions,
1643    ) -> TraceDbClientResult<RestoreResponse> {
1644        self.post_typed_with_options("/v1/admin/restore", request, options)
1645            .await
1646    }
1647
1648    async fn get_typed<T: for<'de> Deserialize<'de>>(&self, path: &str) -> TraceDbClientResult<T> {
1649        decode_typed("GET", path, self.request_json("GET", path, None).await?)
1650    }
1651
1652    async fn post_typed<B, R>(&self, path: &str, body: &B) -> TraceDbClientResult<R>
1653    where
1654        B: Serialize,
1655        R: for<'de> Deserialize<'de>,
1656    {
1657        let value = serde_json::to_value(body)?;
1658        decode_typed(
1659            "POST",
1660            path,
1661            self.request_json("POST", path, Some(&value)).await?,
1662        )
1663    }
1664
1665    async fn post_typed_with_options<B, R>(
1666        &self,
1667        path: &str,
1668        body: &B,
1669        options: &TraceDbRequestOptions,
1670    ) -> TraceDbClientResult<R>
1671    where
1672        B: Serialize,
1673        R: for<'de> Deserialize<'de>,
1674    {
1675        let value = serde_json::to_value(body)?;
1676        decode_typed(
1677            "POST",
1678            path,
1679            self.request_json_with_options("POST", path, Some(&value), options)
1680                .await?,
1681        )
1682    }
1683
1684    async fn request_json_once(
1685        &self,
1686        method: &str,
1687        path: &str,
1688        body: Option<&Value>,
1689        options: &TraceDbRequestOptions,
1690    ) -> TraceDbClientResult<Value> {
1691        let target = HttpTarget::parse(&self.inner.config.url)?;
1692        let request_path = target.path(path);
1693        let body_bytes = self.inner.request_body_bytes(body)?;
1694        let timeout = self.inner.config.request_timeout();
1695        let method_value = reqwest::Method::from_bytes(method.as_bytes()).map_err(|error| {
1696            TraceDbClientError::InvalidRequest {
1697                method: method.to_string(),
1698                path: request_path.clone(),
1699                message: format!("invalid HTTP method: {error}"),
1700            }
1701        })?;
1702        let url = format!("http://{}{}", target.authority, request_path);
1703        let mut request = self
1704            .http_client
1705            .request(method_value, url)
1706            .timeout(timeout)
1707            .header(reqwest::header::ACCEPT, "application/json")
1708            .header(
1709                reqwest::header::CONTENT_LENGTH,
1710                body_bytes.len().to_string(),
1711            )
1712            .header("User-Agent", format!("{NAME}/{VERSION}"));
1713        if !self.inner.config.token.is_empty() {
1714            request = request.bearer_auth(&self.inner.config.token);
1715        }
1716        if let Some(key) = validated_idempotency_key(method, &request_path, options)? {
1717            request = request.header("Idempotency-Key", key);
1718        }
1719        for (name, value) in self.inner.actor_header_pairs(options)? {
1720            request = request.header(name, value);
1721        }
1722        if !body_bytes.is_empty() {
1723            request = request.header(reqwest::header::CONTENT_TYPE, "application/json");
1724        }
1725        let response = request
1726            .body(body_bytes)
1727            .send()
1728            .await
1729            .map_err(|error| map_reqwest_error(method, &request_path, timeout, error))?;
1730        let status = response.status().as_u16();
1731        let bytes = response
1732            .bytes()
1733            .await
1734            .map_err(|error| map_reqwest_error(method, &request_path, timeout, error))?;
1735        if !(200..300).contains(&status) {
1736            return Err(TraceDbClientError::HttpStatus {
1737                method: method.to_string(),
1738                path: request_path,
1739                status,
1740                body: String::from_utf8_lossy(&bytes).to_string(),
1741            });
1742        }
1743        if bytes.iter().all(u8::is_ascii_whitespace) || bytes.is_empty() {
1744            return Ok(Value::Null);
1745        }
1746        serde_json::from_slice(&bytes).map_err(|error| TraceDbClientError::InvalidResponse {
1747            method: method.to_string(),
1748            path: request_path,
1749            message: format!("invalid JSON body: {error}"),
1750        })
1751    }
1752}
1753
1754#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1755pub struct ReadyResponse {
1756    #[serde(default)]
1757    pub ok: Option<bool>,
1758    pub ready: bool,
1759    #[serde(default)]
1760    pub service: Option<String>,
1761    #[serde(default)]
1762    pub latest_epoch: Option<u64>,
1763    #[serde(default)]
1764    pub durable_epoch: Option<u64>,
1765    #[serde(default)]
1766    pub recovery_state: Option<String>,
1767    #[serde(default)]
1768    pub engine_url: Option<String>,
1769    #[serde(default)]
1770    pub engine_health_checked: Option<bool>,
1771    #[serde(default)]
1772    pub engine_status_code: Option<u16>,
1773    #[serde(default)]
1774    pub catalog_databases: Option<u64>,
1775    #[serde(default)]
1776    pub metered_requests: Option<u64>,
1777    #[serde(default)]
1778    pub error: Option<String>,
1779}
1780
1781#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1782pub struct HealthResponse {
1783    pub ok: bool,
1784    #[serde(default)]
1785    pub service: Option<String>,
1786    #[serde(default)]
1787    pub engine_url: Option<String>,
1788    #[serde(default)]
1789    pub catalog_databases: Option<u64>,
1790    #[serde(default)]
1791    pub metered_requests: Option<u64>,
1792}
1793
1794#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1795pub struct DatabaseSummary {
1796    pub database_id: String,
1797    #[serde(default)]
1798    pub org_id: Option<String>,
1799    #[serde(default)]
1800    pub project_id: Option<String>,
1801    #[serde(default)]
1802    pub name: Option<String>,
1803    #[serde(default)]
1804    pub region: Option<String>,
1805    #[serde(default)]
1806    pub endpoint: Option<String>,
1807}
1808
1809#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1810pub struct DatabasesResponse {
1811    pub databases: Vec<DatabaseSummary>,
1812    #[serde(default)]
1813    pub gateway: Option<bool>,
1814    #[serde(default)]
1815    pub mode: Option<String>,
1816}
1817
1818#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1819pub struct BranchSummary {
1820    pub branch_id: String,
1821    #[serde(default)]
1822    pub database_id: Option<String>,
1823    #[serde(default)]
1824    pub parent_branch_id: Option<String>,
1825    #[serde(default)]
1826    pub state: Option<String>,
1827    #[serde(default)]
1828    pub endpoint: Option<String>,
1829    #[serde(default)]
1830    pub latest_epoch: Option<u64>,
1831}
1832
1833#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1834pub struct BranchesResponse {
1835    pub branches: Vec<BranchSummary>,
1836    #[serde(default)]
1837    pub gateway: Option<bool>,
1838}
1839
1840#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1841pub struct MetricsResponse {
1842    #[serde(default)]
1843    pub gateway: Option<bool>,
1844    #[serde(default)]
1845    pub service: Option<String>,
1846    #[serde(default)]
1847    pub latest_epoch: Option<u64>,
1848    #[serde(default)]
1849    pub durable_epoch: Option<u64>,
1850    #[serde(default)]
1851    pub segment_count: Option<usize>,
1852    #[serde(default)]
1853    pub index_count: Option<usize>,
1854    #[serde(default)]
1855    pub module_count: Option<usize>,
1856    #[serde(default)]
1857    pub schema_count: Option<usize>,
1858    #[serde(default)]
1859    pub recovery_state: Option<String>,
1860    #[serde(default)]
1861    pub requests: Option<u64>,
1862    #[serde(default)]
1863    pub rate_limit_enabled: Option<bool>,
1864    #[serde(default)]
1865    pub rate_limit_requests: Option<u64>,
1866}
1867
1868#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1869pub struct ErrorResponse {
1870    pub error: String,
1871    #[serde(default, skip_serializing_if = "Option::is_none")]
1872    pub code: Option<String>,
1873}
1874
1875#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1876pub struct EpochResponse {
1877    pub epoch: u64,
1878}
1879
1880#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1881pub struct PutBatchResponse {
1882    pub epoch: u64,
1883    pub record_count: usize,
1884    #[serde(default)]
1885    pub write_timing: Option<WritePathTiming>,
1886}
1887
1888#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1889pub struct DeleteResponse {
1890    pub deleted: bool,
1891    pub epoch: u64,
1892}
1893
1894#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1895pub struct GetRecordResponse {
1896    pub record: Option<RecordOutput>,
1897}
1898
1899#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1900pub struct QueryResponse {
1901    pub results: Vec<HybridQueryRow>,
1902    #[serde(default)]
1903    pub explain: Option<HybridExplain>,
1904    #[serde(default, skip_serializing_if = "Option::is_none")]
1905    pub next_cursor: Option<String>,
1906}
1907
1908#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1909pub struct TraceQlQueryRequest {
1910    pub query: String,
1911}
1912
1913impl TraceQlQueryRequest {
1914    pub fn new(query: impl Into<String>) -> Self {
1915        Self {
1916            query: query.into(),
1917        }
1918    }
1919
1920    pub fn command<T: Serialize>(
1921        command: impl AsRef<str>,
1922        payload: &T,
1923    ) -> TraceDbClientResult<Self> {
1924        Ok(Self {
1925            query: format!("{} {}", command.as_ref(), serde_json::to_string(payload)?),
1926        })
1927    }
1928
1929    pub fn schema_apply(schema: &TableSchema) -> TraceDbClientResult<Self> {
1930        Self::command("SCHEMA APPLY", schema)
1931    }
1932
1933    pub fn put(record: &RecordInput) -> TraceDbClientResult<Self> {
1934        Self::command("RECORD PUT", record)
1935    }
1936
1937    pub fn batch(request: &RecordPutBatchRequest) -> TraceDbClientResult<Self> {
1938        Self::command("RECORD BATCH", request)
1939    }
1940
1941    pub fn patch(request: &RecordPatchRequest) -> TraceDbClientResult<Self> {
1942        Self::command("RECORD PATCH", request)
1943    }
1944
1945    pub fn delete(request: &RecordDeleteRequest) -> TraceDbClientResult<Self> {
1946        Self::command("RECORD DELETE", request)
1947    }
1948
1949    pub fn get(request: &RecordGetRequest) -> TraceDbClientResult<Self> {
1950        Self::command("RECORD GET", request)
1951    }
1952
1953    pub fn scan(request: &RecordScanRequest) -> TraceDbClientResult<Self> {
1954        Self::command("RECORD SCAN", request)
1955    }
1956
1957    pub fn query(query: &HybridQuery) -> TraceDbClientResult<Self> {
1958        Self::command("QUERY", query)
1959    }
1960
1961    pub fn explain(query: &HybridQuery) -> TraceDbClientResult<Self> {
1962        Self::command("EXPLAIN", query)
1963    }
1964
1965    pub fn jobs_list() -> Self {
1966        Self {
1967            query: "JOBS LIST".to_string(),
1968        }
1969    }
1970}
1971
1972#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1973pub struct GraphQlQueryRequest {
1974    pub query: String,
1975    #[serde(default, skip_serializing_if = "Value::is_null")]
1976    pub variables: Value,
1977    #[serde(
1978        default,
1979        skip_serializing_if = "Option::is_none",
1980        alias = "operationName"
1981    )]
1982    pub operation_name: Option<String>,
1983}
1984
1985impl GraphQlQueryRequest {
1986    pub fn new(query: impl Into<String>) -> Self {
1987        Self {
1988            query: query.into(),
1989            variables: Value::Null,
1990            operation_name: None,
1991        }
1992    }
1993
1994    pub fn with_variables(mut self, variables: Value) -> Self {
1995        self.variables = variables;
1996        self
1997    }
1998
1999    pub fn with_operation_name(mut self, operation_name: impl Into<String>) -> Self {
2000        self.operation_name = Some(operation_name.into());
2001        self
2002    }
2003}
2004
2005#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
2006pub struct GraphQlResponse {
2007    #[serde(default)]
2008    pub data: Value,
2009    #[serde(default, skip_serializing_if = "Vec::is_empty")]
2010    pub errors: Vec<GraphQlError>,
2011}
2012
2013#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
2014pub struct GraphQlError {
2015    pub message: String,
2016    #[serde(default, skip_serializing_if = "Option::is_none")]
2017    pub path: Option<Value>,
2018    #[serde(default, skip_serializing_if = "Option::is_none")]
2019    pub extensions: Option<Value>,
2020}
2021
2022#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
2023pub struct GraphQlSchemaResponse {
2024    pub adapter: String,
2025    pub schema: String,
2026    pub tables: Vec<String>,
2027    #[serde(alias = "execution_caveat")]
2028    pub execution: String,
2029}
2030
2031#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
2032pub struct CompactResponse {
2033    pub compacted: bool,
2034}
2035
2036#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
2037pub struct SnapshotRequest {
2038    pub target: String,
2039}
2040
2041impl SnapshotRequest {
2042    pub fn new(target: impl Into<String>) -> Self {
2043        Self {
2044            target: target.into(),
2045        }
2046    }
2047}
2048
2049#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
2050pub struct SnapshotResponse {
2051    pub snapshot: bool,
2052    pub target: String,
2053}
2054
2055#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
2056pub struct RestoreRequest {
2057    pub source: String,
2058    pub target: String,
2059    #[serde(skip_serializing_if = "Option::is_none")]
2060    pub verify_record: Option<RecordGetRequest>,
2061}
2062
2063impl RestoreRequest {
2064    pub fn new(source: impl Into<String>, target: impl Into<String>) -> Self {
2065        Self {
2066            source: source.into(),
2067            target: target.into(),
2068            verify_record: None,
2069        }
2070    }
2071
2072    pub fn verify_record(mut self, request: RecordGetRequest) -> Self {
2073        self.verify_record = Some(request);
2074        self
2075    }
2076}
2077
2078#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
2079pub struct RestoreResponse {
2080    pub restored: bool,
2081    pub source: String,
2082    pub target: String,
2083    #[serde(default)]
2084    pub verification: Option<RestoreVerification>,
2085}
2086
2087#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
2088pub struct RestoreVerification {
2089    pub status: String,
2090    pub record_visible: bool,
2091    #[serde(default)]
2092    pub request: Option<RecordGetRequest>,
2093    #[serde(default)]
2094    pub record: Option<RecordOutput>,
2095}
2096
2097#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
2098pub struct AdminJob {
2099    pub queue: String,
2100    pub state: String,
2101}
2102
2103#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
2104pub struct JobsResponse {
2105    pub jobs: Vec<AdminJob>,
2106}
2107
2108#[derive(Clone, Debug, Eq, PartialEq)]
2109struct HttpTarget {
2110    authority: String,
2111    host: String,
2112    port: u16,
2113    base_path: String,
2114}
2115
2116impl HttpTarget {
2117    fn parse(url: &str) -> TraceDbClientResult<Self> {
2118        let without_scheme = url
2119            .strip_prefix("http://")
2120            .ok_or_else(|| TraceDbClientError::InvalidUrl(url.to_string()))?;
2121        let (authority, base_path) = without_scheme
2122            .split_once('/')
2123            .map(|(authority, path)| (authority, format!("/{path}")))
2124            .unwrap_or((without_scheme, String::new()));
2125        if authority.is_empty() {
2126            return Err(TraceDbClientError::InvalidUrl(url.to_string()));
2127        }
2128        let (host, port) = if let Some((host, port)) = authority.rsplit_once(':') {
2129            let parsed_port = port
2130                .parse::<u16>()
2131                .map_err(|_| TraceDbClientError::InvalidUrl(url.to_string()))?;
2132            (host.to_string(), parsed_port)
2133        } else {
2134            (authority.to_string(), 80)
2135        };
2136        if host.is_empty() {
2137            return Err(TraceDbClientError::InvalidUrl(url.to_string()));
2138        }
2139        Ok(Self {
2140            authority: authority.to_string(),
2141            host,
2142            port,
2143            base_path,
2144        })
2145    }
2146
2147    fn connect(
2148        &self,
2149        method: &str,
2150        path: &str,
2151        timeout: Duration,
2152    ) -> TraceDbClientResult<TcpStream> {
2153        let socket_addr = self.socket_addr(method, path, timeout)?;
2154        let stream = TcpStream::connect_timeout(&socket_addr, timeout)
2155            .map_err(|error| map_request_io_error(method, path, timeout, error))?;
2156        stream
2157            .set_read_timeout(Some(timeout))
2158            .map_err(|error| map_request_io_error(method, path, timeout, error))?;
2159        stream
2160            .set_write_timeout(Some(timeout))
2161            .map_err(|error| map_request_io_error(method, path, timeout, error))?;
2162        Ok(stream)
2163    }
2164
2165    fn socket_addr(
2166        &self,
2167        method: &str,
2168        path: &str,
2169        timeout: Duration,
2170    ) -> TraceDbClientResult<SocketAddr> {
2171        (self.host.as_str(), self.port)
2172            .to_socket_addrs()
2173            .map_err(|error| map_request_io_error(method, path, timeout, error))?
2174            .next()
2175            .ok_or_else(|| TraceDbClientError::InvalidUrl(self.authority.clone()))
2176    }
2177
2178    fn path(&self, path: &str) -> String {
2179        if self.base_path.is_empty() {
2180            path.to_string()
2181        } else {
2182            format!(
2183                "{}/{}",
2184                self.base_path.trim_end_matches('/'),
2185                path.trim_start_matches('/')
2186            )
2187        }
2188    }
2189}
2190
2191fn default_request_timeout_ms() -> u64 {
2192    30_000
2193}
2194
2195fn timeout_ms(timeout: Duration) -> u64 {
2196    timeout.as_millis().clamp(1, u64::MAX as u128) as u64
2197}
2198
2199fn retry_backoff_delay(attempt: u8) -> Duration {
2200    let shift = u32::from(attempt).min(16);
2201    let base_ms = 100_u64.saturating_mul(1_u64 << shift).min(5_000);
2202    let jitter_quarter = base_ms / 4;
2203    let jitter_range = jitter_quarter.saturating_mul(2).saturating_add(1);
2204    let jitter_offset = SystemTime::now()
2205        .duration_since(UNIX_EPOCH)
2206        .unwrap_or_default()
2207        .subsec_nanos() as u64
2208        % jitter_range;
2209    let delay_ms = base_ms
2210        .saturating_sub(jitter_quarter)
2211        .saturating_add(jitter_offset)
2212        .clamp(1, 5_000);
2213    Duration::from_millis(delay_ms)
2214}
2215
2216fn required_env(variable: &str, value: Option<String>) -> TraceDbClientResult<String> {
2217    match value {
2218        Some(value) if !value.trim().is_empty() => Ok(value),
2219        _ => Err(TraceDbClientError::InvalidConfig {
2220            variable: variable.to_string(),
2221            message: format!("{variable} is required"),
2222        }),
2223    }
2224}
2225
2226fn optional_env(variable: &str, value: Option<String>) -> TraceDbClientResult<Option<String>> {
2227    match value {
2228        Some(value) if value.trim().is_empty() => Err(TraceDbClientError::InvalidConfig {
2229            variable: variable.to_string(),
2230            message: format!("{variable} must not be empty when set"),
2231        }),
2232        Some(value) => Ok(Some(value)),
2233        None => Ok(None),
2234    }
2235}
2236
2237fn optional_positive_u64_env(
2238    variable: &str,
2239    value: Option<String>,
2240) -> TraceDbClientResult<Option<u64>> {
2241    let Some(value) = optional_env(variable, value)? else {
2242        return Ok(None);
2243    };
2244    let parsed = value
2245        .parse::<u64>()
2246        .map_err(|_| TraceDbClientError::InvalidConfig {
2247            variable: variable.to_string(),
2248            message: format!("{variable} must be a positive integer"),
2249        })?;
2250    if parsed == 0 {
2251        return Err(TraceDbClientError::InvalidConfig {
2252            variable: variable.to_string(),
2253            message: format!("{variable} must be greater than 0"),
2254        });
2255    }
2256    Ok(Some(parsed))
2257}
2258
2259fn optional_u8_env(variable: &str, value: Option<String>) -> TraceDbClientResult<Option<u8>> {
2260    let Some(value) = optional_env(variable, value)? else {
2261        return Ok(None);
2262    };
2263    value
2264        .parse::<u8>()
2265        .map(Some)
2266        .map_err(|_| TraceDbClientError::InvalidConfig {
2267            variable: variable.to_string(),
2268            message: format!("{variable} must be an integer from 0 to 255"),
2269        })
2270}
2271
2272fn idempotency_key_header(
2273    method: &str,
2274    path: &str,
2275    options: &TraceDbRequestOptions,
2276) -> TraceDbClientResult<String> {
2277    let Some(key) = validated_idempotency_key(method, path, options)? else {
2278        return Ok(String::new());
2279    };
2280    Ok(format!("Idempotency-Key: {key}\r\n"))
2281}
2282
2283fn validated_idempotency_key<'a>(
2284    method: &str,
2285    path: &str,
2286    options: &'a TraceDbRequestOptions,
2287) -> TraceDbClientResult<Option<&'a str>> {
2288    let Some(key) = options.idempotency_key.as_deref() else {
2289        return Ok(None);
2290    };
2291    if key.is_empty() || key.contains('\r') || key.contains('\n') {
2292        return Err(TraceDbClientError::InvalidRequest {
2293            method: method.to_string(),
2294            path: path.to_string(),
2295            message: "idempotency key must be non-empty and must not contain CR or LF".to_string(),
2296        });
2297    }
2298    Ok(Some(key))
2299}
2300
2301fn header_line(name: &str, value: &str) -> TraceDbClientResult<String> {
2302    validate_header_value(name, value)?;
2303    Ok(format!("{name}: {value}\r\n"))
2304}
2305
2306fn validate_header_value(name: &str, value: &str) -> TraceDbClientResult<()> {
2307    if value.contains('\r') || value.contains('\n') {
2308        return Err(TraceDbClientError::InvalidRequest {
2309            method: "CONFIG".to_string(),
2310            path: name.to_string(),
2311            message: "header values must not contain CR or LF".to_string(),
2312        });
2313    }
2314    Ok(())
2315}
2316
2317fn map_request_io_error(
2318    method: &str,
2319    path: &str,
2320    timeout: Duration,
2321    error: std::io::Error,
2322) -> TraceDbClientError {
2323    if matches!(
2324        error.kind(),
2325        std::io::ErrorKind::TimedOut | std::io::ErrorKind::WouldBlock
2326    ) {
2327        TraceDbClientError::Timeout {
2328            method: method.to_string(),
2329            path: path.to_string(),
2330            timeout_ms: timeout_ms(timeout),
2331        }
2332    } else {
2333        TraceDbClientError::Io(error.to_string())
2334    }
2335}
2336
2337fn map_reqwest_error(
2338    method: &str,
2339    path: &str,
2340    timeout: Duration,
2341    error: reqwest::Error,
2342) -> TraceDbClientError {
2343    if error.is_timeout() {
2344        TraceDbClientError::Timeout {
2345            method: method.to_string(),
2346            path: path.to_string(),
2347            timeout_ms: timeout_ms(timeout),
2348        }
2349    } else {
2350        TraceDbClientError::Io(error.to_string())
2351    }
2352}
2353
2354fn is_retry_safe_request(method: &str, path: &str, body: Option<&Value>) -> bool {
2355    match (method, strip_query(path)) {
2356        ("GET", "/v1/health" | "/v1/ready" | "/v1/graphql/schema")
2357        | (
2358            "POST",
2359            "/v1/records/get"
2360            | "/v1/records/scan"
2361            | "/v1/query"
2362            | "/v1/graphql/bounded"
2363            | "/v1/explain",
2364        ) => true,
2365        ("POST", "/v1/traceql") => traceql_body_is_read_only(body),
2366        ("POST", "/v1/graphql") => graphql_body_is_read_only(body),
2367        _ => false,
2368    }
2369}
2370
2371fn traceql_body_is_read_only(body: Option<&Value>) -> bool {
2372    let Some(query) = body_query(body) else {
2373        return false;
2374    };
2375    let Some(command) = traceql_command(query) else {
2376        return true;
2377    };
2378    matches!(
2379        command,
2380        "RECORD GET" | "GET" | "RECORD SCAN" | "SCAN" | "QUERY" | "EXPLAIN" | "JOBS LIST"
2381    )
2382}
2383
2384fn traceql_command(input: &str) -> Option<&'static str> {
2385    let trimmed = input.trim_start();
2386    for command in [
2387        "SCHEMA APPLY",
2388        "RECORD PUT",
2389        "RECORD BATCH",
2390        "RECORD PATCH",
2391        "RECORD DELETE",
2392        "RECORD GET",
2393        "RECORD SCAN",
2394        "ADMIN COMPACT",
2395        "ADMIN SNAPSHOT",
2396        "ADMIN RESTORE",
2397        "JOBS LIST",
2398        "JOBS RUN",
2399        "EXPLAIN",
2400        "QUERY",
2401        "PUT",
2402        "BATCH",
2403        "PATCH",
2404        "DELETE",
2405        "GET",
2406        "SCAN",
2407        "COMPACT",
2408        "SNAPSHOT",
2409        "RESTORE",
2410    ] {
2411        if trimmed.len() == command.len() && trimmed.eq_ignore_ascii_case(command) {
2412            return Some(command);
2413        }
2414        if trimmed.len() > command.len()
2415            && trimmed
2416                .get(..command.len())
2417                .is_some_and(|prefix| prefix.eq_ignore_ascii_case(command))
2418            && trimmed.as_bytes()[command.len()].is_ascii_whitespace()
2419        {
2420            return Some(command);
2421        }
2422    }
2423    None
2424}
2425
2426fn graphql_body_is_read_only(body: Option<&Value>) -> bool {
2427    let Some(query) = body_query(body) else {
2428        return false;
2429    };
2430    graphql_root_field(query)
2431        .is_some_and(|field| matches!(field, "get" | "scan" | "query" | "explain" | "jobs"))
2432}
2433
2434fn graphql_root_field(query: &str) -> Option<&str> {
2435    let trimmed = query.trim_start();
2436    if word_starts_with(trimmed, "mutation") || word_starts_with(trimmed, "subscription") {
2437        return None;
2438    }
2439    let root = if word_starts_with(trimmed, "query") {
2440        trimmed.find('{').map(|index| &trimmed[index + 1..])?
2441    } else {
2442        trimmed.strip_prefix('{')?
2443    };
2444    let (name, rest) = parse_graphql_name(root)?;
2445    let rest = rest.trim_start();
2446    if let Some(rest) = rest.strip_prefix(':') {
2447        parse_graphql_name(rest).map(|(field, _)| field)
2448    } else {
2449        Some(name)
2450    }
2451}
2452
2453fn parse_graphql_name(input: &str) -> Option<(&str, &str)> {
2454    let trimmed = input.trim_start();
2455    let mut chars = trimmed.char_indices();
2456    let (_, first) = chars.next()?;
2457    if !(first == '_' || first.is_ascii_alphabetic()) {
2458        return None;
2459    }
2460    let mut end = first.len_utf8();
2461    for (index, ch) in chars {
2462        if ch == '_' || ch.is_ascii_alphanumeric() {
2463            end = index + ch.len_utf8();
2464        } else {
2465            return Some((&trimmed[..index], &trimmed[index..]));
2466        }
2467    }
2468    Some((&trimmed[..end], &trimmed[end..]))
2469}
2470
2471fn word_starts_with(input: &str, word: &str) -> bool {
2472    input
2473        .get(..word.len())
2474        .is_some_and(|prefix| prefix.eq_ignore_ascii_case(word))
2475        && input[word.len()..]
2476            .chars()
2477            .next()
2478            .map_or(true, |ch| !(ch == '_' || ch.is_ascii_alphanumeric()))
2479}
2480
2481fn body_query(body: Option<&Value>) -> Option<&str> {
2482    body?.get("query")?.as_str()
2483}
2484
2485fn is_idempotent_retry_request(method: &str, path: &str) -> bool {
2486    matches!(
2487        (method, strip_query(path)),
2488        ("POST", "/v1/schema/apply")
2489            | ("POST", "/v1/insert")
2490            | ("POST", "/v1/records/put")
2491            | ("POST", "/v1/records/put-batch")
2492            | ("POST", "/v1/records/patch")
2493            | ("POST", "/v1/records/delete")
2494            | ("POST", "/v1/admin/compact")
2495            | ("POST", "/v1/admin/snapshot")
2496            | ("POST", "/v1/admin/restore")
2497            | ("POST", "/v1/graphql")
2498            | ("POST", "/v1/traceql")
2499    )
2500}
2501
2502fn strip_query(path: &str) -> &str {
2503    path.split_once('?').map(|(path, _)| path).unwrap_or(path)
2504}
2505
2506fn is_retryable_error(error: &TraceDbClientError) -> bool {
2507    matches!(
2508        error,
2509        TraceDbClientError::Io(_) | TraceDbClientError::Timeout { .. }
2510    ) || matches!(error, TraceDbClientError::HttpStatus { status, .. } if *status >= 500)
2511}
2512
2513fn parse_response(method: &str, path: &str, response: &str) -> TraceDbClientResult<Value> {
2514    let (head, body) =
2515        response
2516            .split_once("\r\n\r\n")
2517            .ok_or_else(|| TraceDbClientError::InvalidResponse {
2518                method: method.to_string(),
2519                path: path.to_string(),
2520                message: "missing header boundary".to_string(),
2521            })?;
2522    let status_line = head
2523        .lines()
2524        .next()
2525        .ok_or_else(|| TraceDbClientError::InvalidResponse {
2526            method: method.to_string(),
2527            path: path.to_string(),
2528            message: "missing status line".to_string(),
2529        })?;
2530    let status = status_line
2531        .split_whitespace()
2532        .nth(1)
2533        .ok_or_else(|| TraceDbClientError::InvalidResponse {
2534            method: method.to_string(),
2535            path: path.to_string(),
2536            message: "missing status code".to_string(),
2537        })?
2538        .parse::<u16>()
2539        .map_err(|_| TraceDbClientError::InvalidResponse {
2540            method: method.to_string(),
2541            path: path.to_string(),
2542            message: status_line.to_string(),
2543        })?;
2544    if !(200..300).contains(&status) {
2545        return Err(TraceDbClientError::HttpStatus {
2546            method: method.to_string(),
2547            path: path.to_string(),
2548            status,
2549            body: body.to_string(),
2550        });
2551    }
2552    if body.trim().is_empty() {
2553        return Ok(Value::Null);
2554    }
2555    serde_json::from_str(body).map_err(|error| TraceDbClientError::InvalidResponse {
2556        method: method.to_string(),
2557        path: path.to_string(),
2558        message: format!("invalid JSON body: {error}"),
2559    })
2560}
2561
2562fn decode_typed<T: for<'de> Deserialize<'de>>(
2563    method: &str,
2564    path: &str,
2565    value: Value,
2566) -> TraceDbClientResult<T> {
2567    serde_json::from_value(value).map_err(|error| TraceDbClientError::InvalidResponse {
2568        method: method.to_string(),
2569        path: path.to_string(),
2570        message: format!("invalid JSON shape: {error}"),
2571    })
2572}
2573
2574#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
2575pub struct TableRecordInput {
2576    pub id: String,
2577    pub fields: Map<String, Value>,
2578}
2579
2580impl TableRecordInput {
2581    pub fn new(id: impl Into<String>, fields: Map<String, Value>) -> Self {
2582        Self {
2583            id: id.into(),
2584            fields,
2585        }
2586    }
2587}
2588
2589#[derive(Clone, Debug)]
2590/// Fluent builder for TraceDB hybrid queries.
2591pub struct QueryBuilder {
2592    client_config: Option<TraceDbClientConfig>,
2593    table: String,
2594    tenant_id: Option<String>,
2595    text_field: Option<String>,
2596    text_query: Option<String>,
2597    vector_field: Option<String>,
2598    vector: Option<Vec<f32>>,
2599    scalar_eq: Map<String, Value>,
2600    freshness: FeatureFreshnessMode,
2601    limit: usize,
2602    cursor: Option<String>,
2603    explain: bool,
2604}
2605
2606pub type TableHandle = QueryBuilder;
2607
2608impl QueryBuilder {
2609    pub fn tenant(mut self, tenant_id: impl Into<String>) -> Self {
2610        self.tenant_id = Some(tenant_id.into());
2611        self
2612    }
2613
2614    pub fn where_eq(mut self, field: impl Into<String>, value: impl Into<Value>) -> Self {
2615        self.scalar_eq.insert(field.into(), value.into());
2616        self
2617    }
2618
2619    pub fn match_text(mut self, field: impl Into<String>, query: impl Into<String>) -> Self {
2620        self.text_field = Some(field.into());
2621        self.text_query = Some(query.into());
2622        self
2623    }
2624
2625    pub fn near(mut self, field: impl Into<String>, vector: Vec<f32>) -> Self {
2626        self.vector_field = Some(field.into());
2627        self.vector = Some(vector);
2628        self
2629    }
2630
2631    pub fn freshness(mut self, freshness: FeatureFreshnessMode) -> Self {
2632        self.freshness = freshness;
2633        self
2634    }
2635
2636    pub fn limit(mut self, limit: usize) -> Self {
2637        self.limit = limit;
2638        self
2639    }
2640
2641    pub fn cursor(mut self, cursor: impl Into<String>) -> Self {
2642        self.cursor = Some(cursor.into());
2643        self
2644    }
2645
2646    pub fn with_explain(mut self) -> Self {
2647        self.explain = true;
2648        self
2649    }
2650
2651    pub fn query(&self) -> Self {
2652        self.clone()
2653    }
2654
2655    pub fn without_explain(mut self) -> Self {
2656        self.explain = false;
2657        self
2658    }
2659
2660    pub fn insert(
2661        &self,
2662        id: impl Into<String>,
2663        fields: Map<String, Value>,
2664    ) -> TraceDbClientResult<EpochResponse> {
2665        let options = TraceDbRequestOptions::default();
2666        self.insert_with_options(id, fields, &options)
2667    }
2668
2669    pub fn insert_with_options(
2670        &self,
2671        id: impl Into<String>,
2672        fields: Map<String, Value>,
2673        options: &TraceDbRequestOptions,
2674    ) -> TraceDbClientResult<EpochResponse> {
2675        let path = "/v1/records/put";
2676        let tenant_id = self.required_tenant_id("POST", path)?;
2677        let record = self.record_input(TableRecordInput::new(id, fields), &tenant_id);
2678        self.client("POST", path)?
2679            .put_typed_with_options(&record, options)
2680    }
2681
2682    pub fn insert_batch(
2683        &self,
2684        records: Vec<TableRecordInput>,
2685    ) -> TraceDbClientResult<PutBatchResponse> {
2686        let options = TraceDbRequestOptions::default();
2687        self.insert_batch_with_options(records, &options)
2688    }
2689
2690    pub fn insert_batch_with_options(
2691        &self,
2692        records: Vec<TableRecordInput>,
2693        options: &TraceDbRequestOptions,
2694    ) -> TraceDbClientResult<PutBatchResponse> {
2695        let path = "/v1/records/put-batch";
2696        let tenant_id = self.required_tenant_id("POST", path)?;
2697        let records = records
2698            .into_iter()
2699            .map(|record| self.record_input(record, &tenant_id))
2700            .collect();
2701        let request = RecordPutBatchRequest::new(records);
2702        self.client("POST", path)?
2703            .put_batch_typed_with_options(&request, options)
2704    }
2705
2706    pub fn insert_rows(
2707        &self,
2708        rows: Vec<Map<String, Value>>,
2709    ) -> TraceDbClientResult<PutBatchResponse> {
2710        let options = TraceDbRequestOptions::default();
2711        self.insert_rows_with_id_field_and_options(rows, "id", &options)
2712    }
2713
2714    pub fn insert_rows_with_options(
2715        &self,
2716        rows: Vec<Map<String, Value>>,
2717        options: &TraceDbRequestOptions,
2718    ) -> TraceDbClientResult<PutBatchResponse> {
2719        self.insert_rows_with_id_field_and_options(rows, "id", options)
2720    }
2721
2722    pub fn insert_rows_with_id_field(
2723        &self,
2724        rows: Vec<Map<String, Value>>,
2725        id_field: impl Into<String>,
2726    ) -> TraceDbClientResult<PutBatchResponse> {
2727        let options = TraceDbRequestOptions::default();
2728        self.insert_rows_with_id_field_and_options(rows, id_field, &options)
2729    }
2730
2731    pub fn insert_rows_with_id_field_and_options(
2732        &self,
2733        rows: Vec<Map<String, Value>>,
2734        id_field: impl Into<String>,
2735        options: &TraceDbRequestOptions,
2736    ) -> TraceDbClientResult<PutBatchResponse> {
2737        let path = "/v1/records/put-batch";
2738        let id_field = id_field.into();
2739        if id_field.is_empty() {
2740            return Err(TraceDbClientError::InvalidRequest {
2741                method: "POST".to_string(),
2742                path: path.to_string(),
2743                message: "id_field cannot be empty".to_string(),
2744            });
2745        }
2746        let tenant_id = self.required_tenant_id("POST", path)?;
2747        let records = rows
2748            .into_iter()
2749            .enumerate()
2750            .map(|(index, fields)| self.row_record_input(index, fields, &id_field, &tenant_id))
2751            .collect::<TraceDbClientResult<Vec<_>>>()?;
2752        let request = RecordPutBatchRequest::new(records);
2753        self.client("POST", path)?
2754            .put_batch_typed_with_options(&request, options)
2755    }
2756
2757    pub fn patch_record(
2758        &self,
2759        id: impl Into<String>,
2760        fields: Map<String, Value>,
2761    ) -> TraceDbClientResult<EpochResponse> {
2762        let options = TraceDbRequestOptions::default();
2763        self.patch_record_with_options(id, fields, &options)
2764    }
2765
2766    pub fn patch_record_with_options(
2767        &self,
2768        id: impl Into<String>,
2769        fields: Map<String, Value>,
2770        options: &TraceDbRequestOptions,
2771    ) -> TraceDbClientResult<EpochResponse> {
2772        let path = "/v1/records/patch";
2773        let request = RecordPatchRequest::new(
2774            self.table.clone(),
2775            self.required_tenant_id("POST", path)?,
2776            id,
2777            fields,
2778        );
2779        self.client("POST", path)?
2780            .patch_typed_with_options(&request, options)
2781    }
2782
2783    pub fn get_record(&self, id: impl Into<String>) -> TraceDbClientResult<GetRecordResponse> {
2784        let path = "/v1/records/get";
2785        let request = RecordGetRequest::new(
2786            self.table.clone(),
2787            self.required_tenant_id("POST", path)?,
2788            id,
2789        );
2790        self.client("POST", path)?.get_record_typed(&request)
2791    }
2792
2793    pub fn scan_typed(&self) -> TraceDbClientResult<RecordScanOutput> {
2794        let path = "/v1/records/scan";
2795        let request =
2796            RecordScanRequest::new(self.table.clone(), self.required_tenant_id("POST", path)?)
2797                .limit(self.limit);
2798        let request = if let Some(cursor) = &self.cursor {
2799            request.cursor(cursor.clone())
2800        } else {
2801            request
2802        };
2803        self.client("POST", path)?.scan_typed(&request)
2804    }
2805
2806    pub fn delete_record(&self, id: impl Into<String>) -> TraceDbClientResult<DeleteResponse> {
2807        let options = TraceDbRequestOptions::default();
2808        self.delete_record_with_options(id, &options)
2809    }
2810
2811    pub fn delete_record_with_options(
2812        &self,
2813        id: impl Into<String>,
2814        options: &TraceDbRequestOptions,
2815    ) -> TraceDbClientResult<DeleteResponse> {
2816        let path = "/v1/records/delete";
2817        let request = RecordDeleteRequest::new(
2818            self.table.clone(),
2819            self.required_tenant_id("POST", path)?,
2820            id,
2821        );
2822        self.client("POST", path)?
2823            .delete_typed_with_options(&request, options)
2824    }
2825
2826    pub fn all(self) -> TraceDbClientResult<QueryResponse> {
2827        let path = "/v1/query";
2828        let client = self.client("POST", path)?;
2829        let query = self.into_hybrid_query(path)?;
2830        client.query_typed(&query)
2831    }
2832
2833    pub fn explain_plan(self) -> TraceDbClientResult<HybridExplain> {
2834        let path = "/v1/explain";
2835        let client = self.client("POST", path)?;
2836        let query = self.into_hybrid_query(path)?;
2837        client.explain_typed(&query)
2838    }
2839
2840    pub fn build(self) -> TraceQueryRequest {
2841        let freshness = match self.freshness {
2842            FeatureFreshnessMode::Strict => "Strict",
2843            FeatureFreshnessMode::AllowDirty => "AllowDirty",
2844            FeatureFreshnessMode::Lazy
2845            | FeatureFreshnessMode::OnRead
2846            | FeatureFreshnessMode::AllowStale => "Lazy",
2847        };
2848        TraceQueryRequest {
2849            table: self.table,
2850            tenant_id: self.tenant_id.unwrap_or_default(),
2851            text_field: self.text_field,
2852            text: self.text_query,
2853            vector_field: self.vector_field,
2854            vector: self.vector,
2855            scalar_eq: self.scalar_eq,
2856            top_k: self.limit,
2857            cursor: self.cursor,
2858            freshness: freshness.to_string(),
2859            explain: self.explain,
2860        }
2861    }
2862
2863    pub fn put(self, id: impl Into<String>) -> RecordPutBuilder {
2864        RecordPutBuilder {
2865            table: self.table,
2866            tenant_id: self.tenant_id.unwrap_or_default(),
2867            id: id.into(),
2868            fields: Map::new(),
2869        }
2870    }
2871
2872    pub fn scan(self) -> RecordScanBuilder {
2873        RecordScanBuilder {
2874            table: self.table,
2875            tenant_id: self.tenant_id.unwrap_or_default(),
2876            limit: 100,
2877            cursor: self.cursor,
2878        }
2879    }
2880
2881    pub fn delete(self, id: impl Into<String>) -> RecordDeleteBuilder {
2882        RecordDeleteBuilder {
2883            table: self.table,
2884            tenant_id: self.tenant_id.unwrap_or_default(),
2885            id: id.into(),
2886            tombstone: "user_delete".to_string(),
2887        }
2888    }
2889
2890    fn into_hybrid_query(self, path: &str) -> TraceDbClientResult<HybridQuery> {
2891        let tenant_id = self.required_tenant_id("POST", path)?;
2892        let freshness = self.hybrid_freshness();
2893        Ok(HybridQuery {
2894            table: self.table,
2895            tenant_id,
2896            cursor: self.cursor,
2897            text_field: self.text_field,
2898            text: self.text_query,
2899            vector_field: self.vector_field,
2900            vector: self.vector,
2901            scalar_eq: self.scalar_eq,
2902            graph_seed: None,
2903            temporal_as_of: None,
2904            top_k: self.limit,
2905            freshness,
2906            explain: self.explain,
2907        })
2908    }
2909
2910    fn hybrid_freshness(&self) -> FreshnessMode {
2911        match self.freshness {
2912            FeatureFreshnessMode::Strict => FreshnessMode::Strict,
2913            FeatureFreshnessMode::AllowDirty => FreshnessMode::AllowDirty,
2914            FeatureFreshnessMode::Lazy
2915            | FeatureFreshnessMode::OnRead
2916            | FeatureFreshnessMode::AllowStale => FreshnessMode::Lazy,
2917        }
2918    }
2919
2920    fn client(&self, method: &str, path: &str) -> TraceDbClientResult<TraceDbClient> {
2921        self.client_config
2922            .clone()
2923            .map(TraceDbClient::new)
2924            .ok_or_else(|| TraceDbClientError::InvalidRequest {
2925                method: method.to_string(),
2926                path: path.to_string(),
2927                message: "table handle is not bound to a TraceDbClient".to_string(),
2928            })
2929    }
2930
2931    fn required_tenant_id(&self, method: &str, path: &str) -> TraceDbClientResult<String> {
2932        match self.tenant_id.as_ref().filter(|tenant| !tenant.is_empty()) {
2933            Some(tenant_id) => Ok(tenant_id.clone()),
2934            None => Err(TraceDbClientError::InvalidRequest {
2935                method: method.to_string(),
2936                path: path.to_string(),
2937                message: "table handle execution requires tenant(...)".to_string(),
2938            }),
2939        }
2940    }
2941
2942    fn record_input(&self, record: TableRecordInput, tenant_id: &str) -> RecordInput {
2943        let mut fields = record.fields;
2944        fields
2945            .entry("id".to_string())
2946            .or_insert_with(|| Value::String(record.id.clone()));
2947        fields
2948            .entry("tenant".to_string())
2949            .or_insert_with(|| Value::String(tenant_id.to_string()));
2950        RecordInput {
2951            table: self.table.clone(),
2952            id: record.id,
2953            tenant_id: tenant_id.to_string(),
2954            fields,
2955        }
2956    }
2957
2958    fn row_record_input(
2959        &self,
2960        index: usize,
2961        fields: Map<String, Value>,
2962        id_field: &str,
2963        tenant_id: &str,
2964    ) -> TraceDbClientResult<RecordInput> {
2965        let id = fields
2966            .get(id_field)
2967            .ok_or_else(|| TraceDbClientError::InvalidRequest {
2968                method: "POST".to_string(),
2969                path: "/v1/records/put-batch".to_string(),
2970                message: format!("row {index} missing id field '{id_field}'"),
2971            })?;
2972        let id = match id {
2973            Value::String(id) => id.clone(),
2974            value => value.to_string(),
2975        };
2976        Ok(self.record_input(TableRecordInput::new(id, fields), tenant_id))
2977    }
2978}
2979
2980#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
2981pub struct TraceQueryRequest {
2982    pub table: String,
2983    pub tenant_id: String,
2984    #[serde(default, skip_serializing_if = "Option::is_none")]
2985    pub cursor: Option<String>,
2986    pub text_field: Option<String>,
2987    pub text: Option<String>,
2988    pub vector_field: Option<String>,
2989    pub vector: Option<Vec<f32>>,
2990    #[serde(default, skip_serializing_if = "Map::is_empty")]
2991    pub scalar_eq: Map<String, Value>,
2992    pub top_k: usize,
2993    pub freshness: String,
2994    pub explain: bool,
2995}
2996
2997#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
2998pub struct TraceHttpRequest {
2999    pub path: String,
3000    pub body: Value,
3001}
3002
3003#[derive(Clone, Debug)]
3004pub struct RecordPutBuilder {
3005    table: String,
3006    tenant_id: String,
3007    id: String,
3008    fields: Map<String, Value>,
3009}
3010
3011impl RecordPutBuilder {
3012    pub fn field(mut self, key: impl Into<String>, value: Value) -> Self {
3013        self.fields.insert(key.into(), value);
3014        self
3015    }
3016
3017    pub fn fields(mut self, fields: Map<String, Value>) -> Self {
3018        self.fields.extend(fields);
3019        self
3020    }
3021
3022    pub fn build(mut self) -> TraceHttpRequest {
3023        self.fields
3024            .entry("id".to_string())
3025            .or_insert_with(|| Value::String(self.id.clone()));
3026        self.fields
3027            .entry("tenant".to_string())
3028            .or_insert_with(|| Value::String(self.tenant_id.clone()));
3029        TraceHttpRequest {
3030            path: "/v1/records/put".to_string(),
3031            body: json!({
3032                "table": self.table,
3033                "id": self.id,
3034                "tenant_id": self.tenant_id,
3035                "fields": self.fields,
3036            }),
3037        }
3038    }
3039}
3040
3041#[derive(Clone, Debug)]
3042pub struct RecordScanBuilder {
3043    table: String,
3044    tenant_id: String,
3045    limit: usize,
3046    cursor: Option<String>,
3047}
3048
3049impl RecordScanBuilder {
3050    pub fn limit(mut self, limit: usize) -> Self {
3051        self.limit = limit;
3052        self
3053    }
3054
3055    pub fn cursor(mut self, cursor: impl Into<String>) -> Self {
3056        self.cursor = Some(cursor.into());
3057        self
3058    }
3059
3060    pub fn build(self) -> TraceHttpRequest {
3061        let mut body = json!({
3062            "table": self.table,
3063            "tenant_id": self.tenant_id,
3064            "limit": self.limit,
3065        });
3066        if let Some(cursor) = self.cursor {
3067            body["cursor"] = json!(cursor);
3068        }
3069        TraceHttpRequest {
3070            path: "/v1/records/scan".to_string(),
3071            body,
3072        }
3073    }
3074}
3075
3076#[derive(Clone, Debug)]
3077pub struct RecordDeleteBuilder {
3078    table: String,
3079    tenant_id: String,
3080    id: String,
3081    tombstone: String,
3082}
3083
3084impl RecordDeleteBuilder {
3085    pub fn tombstone(mut self, tombstone: impl Into<String>) -> Self {
3086        self.tombstone = tombstone.into();
3087        self
3088    }
3089
3090    pub fn build(self) -> TraceHttpRequest {
3091        TraceHttpRequest {
3092            path: "/v1/records/delete".to_string(),
3093            body: json!({
3094                "table": self.table,
3095                "tenant_id": self.tenant_id,
3096                "id": self.id,
3097                "tombstone": self.tombstone,
3098            }),
3099        }
3100    }
3101}