1#![forbid(unsafe_code)]
/// Crate version, captured at compile time from Cargo's `CARGO_PKG_VERSION`.
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
/// Crate name, captured at compile time from Cargo's `CARGO_PKG_NAME`.
pub const NAME: &str = env!("CARGO_PKG_NAME");
21
22use serde::{Deserialize, Serialize};
23use serde_json::{json, Map, Value};
24use std::env;
25use std::error::Error;
26use std::fmt::{Display, Formatter};
27use std::io::{Read, Write};
28use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
29use std::thread;
30use std::time::{Duration, SystemTime, UNIX_EPOCH};
31
/// Result alias whose error type is always [`TraceDbClientError`].
pub type TraceDbClientResult<T> = std::result::Result<T, TraceDbClientError>;
33
/// Freshness mode with five unit variants.
///
/// No serde rename attribute is applied, so each variant (de)serializes under its Rust
/// identifier (`"Strict"`, `"Lazy"`, ...).
// NOTE(review): where this is used, as opposed to `FreshnessMode`, is not visible in this
// part of the file.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum FeatureFreshnessMode {
    Strict,
    Lazy,
    AllowDirty,
    OnRead,
    AllowStale,
}
42
/// Freshness mode carried by [`HybridQuery::freshness`].
///
/// Variants (de)serialize under their Rust identifiers; no serde renaming is applied.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum FreshnessMode {
    Strict,
    Lazy,
    AllowDirty,
}
49
/// Description of one vector column, used inside [`TableSchema::vector_columns`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct VectorColumnSchema {
    /// Column name.
    pub name: String,
    /// Number of dimensions declared for the column.
    pub dimensions: usize,
    /// Names of the columns this vector column is associated with.
    // NOTE(review): presumably the columns the vector is derived from — confirm with the server.
    pub source_columns: Vec<String>,
}
56
/// Table schema payload; this is the body type accepted by the `apply_schema*` client methods.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Name of the column holding the primary id.
    pub primary_id_column: String,
    /// Name of the column holding the tenant id.
    pub tenant_id_column: String,
    /// Names of the scalar columns.
    pub scalar_columns: Vec<String>,
    /// Names of the text-indexed columns.
    pub text_indexed_columns: Vec<String>,
    /// Vector column descriptions.
    pub vector_columns: Vec<VectorColumnSchema>,
}
66
/// A record as supplied by the caller; body type of the `put*` methods and element type of
/// [`RecordPutBatchRequest::records`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct RecordInput {
    /// Target table name.
    pub table: String,
    /// Record id.
    pub id: String,
    /// Tenant id.
    pub tenant_id: String,
    /// Arbitrary JSON fields keyed by name.
    pub fields: Map<String, Value>,
}
74
/// A record as returned in [`RecordScanOutput::records`]; mirrors [`RecordInput`] plus a
/// `version_id`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct RecordOutput {
    /// Table name.
    pub table: String,
    /// Record id.
    pub id: String,
    /// Tenant id.
    pub tenant_id: String,
    /// Version identifier of the record.
    pub version_id: u64,
    /// JSON fields keyed by name.
    pub fields: Map<String, Value>,
}
83
/// Body type of the `put_batch*` client methods.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct RecordPutBatchRequest {
    /// Write-timing flag; deserializes as `false` when absent (`#[serde(default)]`).
    #[serde(default)]
    pub include_write_timing: bool,
    /// Records contained in the batch.
    pub records: Vec<RecordInput>,
}
90
91impl RecordPutBatchRequest {
92 pub fn new(records: Vec<RecordInput>) -> Self {
93 Self {
94 include_write_timing: false,
95 records,
96 }
97 }
98}
99
/// Body type of the `patch*` client methods.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct RecordPatchRequest {
    /// Table name.
    pub table: String,
    /// Tenant id.
    pub tenant_id: String,
    /// Record id.
    pub id: String,
    /// JSON fields carried by the patch, keyed by name.
    pub fields: Map<String, Value>,
}
107
108impl RecordPatchRequest {
109 pub fn new(
110 table: impl Into<String>,
111 tenant_id: impl Into<String>,
112 id: impl Into<String>,
113 fields: Map<String, Value>,
114 ) -> Self {
115 Self {
116 table: table.into(),
117 tenant_id: tenant_id.into(),
118 id: id.into(),
119 fields,
120 }
121 }
122}
123
/// Body type of the `delete*` client methods.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct RecordDeleteRequest {
    /// Table name.
    pub table: String,
    /// Tenant id.
    pub tenant_id: String,
    /// Record id.
    pub id: String,
    /// Tombstone label; when absent during deserialization it is filled by
    /// `default_tombstone()`.
    #[serde(default = "default_tombstone")]
    pub tombstone: String,
}
132
133impl RecordDeleteRequest {
134 pub fn new(
135 table: impl Into<String>,
136 tenant_id: impl Into<String>,
137 id: impl Into<String>,
138 ) -> Self {
139 Self {
140 table: table.into(),
141 tenant_id: tenant_id.into(),
142 id: id.into(),
143 tombstone: default_tombstone(),
144 }
145 }
146
147 pub fn tombstone(mut self, tombstone: impl Into<String>) -> Self {
148 self.tombstone = tombstone.into();
149 self
150 }
151}
152
/// Body type of the `get` / `get_record_typed` client methods.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct RecordGetRequest {
    /// Table name.
    pub table: String,
    /// Tenant id.
    pub tenant_id: String,
    /// Record id.
    pub id: String,
}
159
160impl RecordGetRequest {
161 pub fn new(
162 table: impl Into<String>,
163 tenant_id: impl Into<String>,
164 id: impl Into<String>,
165 ) -> Self {
166 Self {
167 table: table.into(),
168 tenant_id: tenant_id.into(),
169 id: id.into(),
170 }
171 }
172}
173
/// Body type of the `scan` / `scan_typed` client methods.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct RecordScanRequest {
    /// Table name.
    pub table: String,
    /// Tenant id.
    pub tenant_id: String,
    /// Row limit; `RecordScanRequest::new` initialises it to 100.
    pub limit: usize,
    /// Optional cursor; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cursor: Option<String>,
}
182
183impl RecordScanRequest {
184 pub fn new(table: impl Into<String>, tenant_id: impl Into<String>) -> Self {
185 Self {
186 table: table.into(),
187 tenant_id: tenant_id.into(),
188 limit: 100,
189 cursor: None,
190 }
191 }
192
193 pub fn limit(mut self, limit: usize) -> Self {
194 self.limit = limit;
195 self
196 }
197
198 pub fn cursor(mut self, cursor: impl Into<String>) -> Self {
199 self.cursor = Some(cursor.into());
200 self
201 }
202}
203
/// Return type of `TraceDbClient::scan_typed`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct RecordScanOutput {
    /// Records contained in this result.
    pub records: Vec<RecordOutput>,
    /// Count reported alongside `records`.
    pub returned_count: usize,
    /// Optional cursor; `None` when absent, and omitted when serializing `None`.
    // NOTE(review): presumably feeds `RecordScanRequest::cursor` for the next page — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_cursor: Option<String>,
}
211
/// Body type of the `query*` and `explain*` client methods.
///
/// `table`, `tenant_id`, `top_k`, `freshness` and `explain` carry no serde default and are
/// not `Option`, so they must be present when deserializing.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct HybridQuery {
    /// Table name.
    pub table: String,
    /// Tenant id.
    pub tenant_id: String,
    /// Optional cursor; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cursor: Option<String>,
    /// Optional name of the text field.
    #[serde(default)]
    pub text_field: Option<String>,
    /// Optional query text.
    pub text: Option<String>,
    /// Optional name of the vector field.
    #[serde(default)]
    pub vector_field: Option<String>,
    /// Optional query vector.
    pub vector: Option<Vec<f32>>,
    /// Scalar equality map (field name -> JSON value); empty when absent.
    #[serde(default)]
    pub scalar_eq: Map<String, Value>,
    /// Optional graph seed.
    #[serde(default)]
    pub graph_seed: Option<String>,
    /// Optional temporal "as of" value.
    // NOTE(review): unit/epoch of this u64 is not visible here — confirm.
    #[serde(default)]
    pub temporal_as_of: Option<u64>,
    /// Requested number of top results.
    pub top_k: usize,
    /// Freshness mode for the query.
    pub freshness: FreshnessMode,
    /// Explain flag.
    pub explain: bool,
}
234
/// Score breakdown attached to [`QueryRow`] and [`Candidate`].
///
/// Every component except `final_score` is optional.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ScoreComponents {
    /// Vector component, if any.
    pub vector: Option<f32>,
    /// Lexical component, if any.
    pub lexical: Option<f32>,
    /// Relational component, if any.
    pub relational: Option<f32>,
    /// Freshness penalty, if any.
    pub freshness_penalty: Option<f32>,
    /// Final score.
    pub final_score: f32,
}
243
/// One result row inside [`QueryOutput::results`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct QueryRow {
    /// Record id.
    pub record_id: String,
    /// Version identifier of the record.
    pub version_id: u64,
    /// Tenant id.
    pub tenant_id: String,
    /// JSON fields keyed by name.
    pub fields: Map<String, Value>,
    /// Score breakdown for this row.
    pub score: ScoreComponents,
}
252
/// Alias for [`QueryRow`].
pub type HybridQueryRow = QueryRow;
/// Alias for [`ScoreComponents`].
pub type HybridScoreComponents = ScoreComponents;
255
/// Freshness state reported on a [`Candidate`].
///
/// Variants (de)serialize under their Rust identifiers; no serde renaming is applied.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum FeatureFreshness {
    Ready,
    Dirty,
    Pending,
    Failed,
    Missing,
}
264
/// One entry of [`ExplainOutput::planner_candidates`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Candidate {
    /// Record id.
    pub record_id: String,
    /// Version identifier of the record.
    pub version_id: u64,
    /// Score breakdown for the candidate.
    pub score_components: ScoreComponents,
    /// Optional upper bound on the score.
    pub score_upper_bound: Option<f32>,
    /// Source label of the candidate.
    pub source: String,
    /// Freshness state of the candidate.
    pub freshness: FeatureFreshness,
    /// Visibility-check flag.
    pub visibility_checked: bool,
}
275
/// One entry of [`ExplainOutput::access_paths`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct AccessPathExplain {
    /// Identifier of the access path.
    pub access_path_id: String,
    /// Whether the path is reported as opened.
    pub opened: bool,
    /// Whether visibility is reported as checked before opening.
    pub visibility_checked_before_open: bool,
    /// Candidate count for the path.
    pub candidates: usize,
}
283
/// One entry of [`ExplainOutput::phase_timings`].
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct QueryPhaseTiming {
    /// Phase label.
    pub phase: String,
    /// Elapsed time for the phase, in milliseconds per the field name.
    pub elapsed_ms: f64,
}
289
/// One entry of [`ExplainOutput::access_path_timings`].
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct AccessPathTiming {
    /// Identifier of the access path.
    pub access_path_id: String,
    /// Build time, in milliseconds per the field name.
    pub build_ms: f64,
    /// Open time, in milliseconds per the field name.
    pub open_ms: f64,
}
296
/// Explain report carried in [`QueryOutput::explain`] and returned by
/// `TraceDbClient::explain_typed` (through the `HybridExplain` alias).
///
/// Deserialization notes:
/// - fields tagged `#[serde(default)]` fall back to their `Default` value (0 or an empty
///   `Vec`) when absent from the payload;
/// - `Option` fields become `None` when absent;
/// - every other field must be present.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ExplainOutput {
    // Epoch values reported with the query.
    pub read_epoch: u64,
    pub schema_epoch: u64,
    pub policy_epoch: u64,
    // Tenant mask and scalar filter accounting.
    pub tenant_mask_visible_records: usize,
    pub scalar_filter_applied: bool,
    pub scalar_filter_predicates: Vec<String>,
    pub scalar_filter_visible_records: usize,
    pub scalar_filter_removed_records: usize,
    // Candidate streams, access paths and planner candidates.
    pub opened_candidate_streams: Vec<String>,
    pub access_paths: Vec<AccessPathExplain>,
    pub planner_candidates: Vec<Candidate>,
    pub candidate_budget: usize,
    pub text_candidates: usize,
    pub vector_candidates: usize,
    pub hot_overlay_searched: bool,
    // Freshness mode (as a string) and per-state feature counts.
    pub freshness_mode: String,
    pub dirty_feature_count: usize,
    pub pending_feature_count: usize,
    pub failed_feature_count: usize,
    pub missing_feature_count: usize,
    // Fusion, materialization and final visibility guard accounting.
    pub fusion_method: String,
    pub deduped_candidate_count: usize,
    pub materialized_count: usize,
    pub final_visibility_guard_count: usize,
    pub final_visibility_guard_removed: usize,
    pub returned_count: usize,
    pub segments_scanned: usize,
    pub module_versions: Vec<String>,
    // Strategy selection details.
    pub selected_strategy: Option<String>,
    pub skipped_access_paths: Vec<String>,
    pub exact_fallback_triggered: bool,
    pub early_stop_reason: Option<String>,
    // Fields below tolerate being absent from the payload.
    #[serde(default)]
    pub lexical_cache_hits: usize,
    #[serde(default)]
    pub lexical_cache_misses: usize,
    #[serde(default)]
    pub lexical_indexed_documents: usize,
    #[serde(default)]
    pub lexical_scored_documents: usize,
    #[serde(default)]
    pub phase_timings: Vec<QueryPhaseTiming>,
    #[serde(default)]
    pub access_path_timings: Vec<AccessPathTiming>,
}
344
/// Alias for [`ExplainOutput`].
pub type HybridExplain = ExplainOutput;
346
/// Query result set together with its explain report.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct QueryOutput {
    /// Result rows.
    pub results: Vec<QueryRow>,
    /// Explain report; has no serde default, so it must be present when deserializing.
    pub explain: ExplainOutput,
    /// Optional cursor; `None` when absent, and omitted when serializing `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_cursor: Option<String>,
}
354
/// Alias for [`QueryOutput`].
pub type HybridQueryOutput = QueryOutput;
356
/// Timing and size breakdown of a write.
///
/// Field suffixes name the unit: `_ms` fields are `f64`, `_bytes` fields are `u64`.
/// Fields tagged `#[serde(default)]` deserialize as `0.0` when absent; all other fields
/// must be present in the payload.
// NOTE(review): presumably returned when `RecordPutBatchRequest::include_write_timing` is
// set — the response type that embeds this struct is not visible in this part of the file.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct WritePathTiming {
    pub total_ms: f64,
    pub lock_ms: f64,
    // Refresh step.
    pub refresh_total_ms: f64,
    pub refresh_manifest_read_ms: f64,
    pub refresh_wal_tail_ms: f64,
    pub refresh_reopen_ms: f64,
    pub refresh_performed: bool,
    pub schema_lookup_ms: f64,
    // Store clone / delta / apply steps.
    pub store_clone_ms: f64,
    #[serde(default)]
    pub store_delta_plan_ms: f64,
    #[serde(default)]
    pub store_delta_apply_ms: f64,
    pub store_apply_ms: f64,
    #[serde(default)]
    pub store_apply_validate_identity_ms: f64,
    #[serde(default)]
    pub store_apply_validate_vector_ms: f64,
    #[serde(default)]
    pub store_apply_key_ms: f64,
    #[serde(default)]
    pub store_apply_fields_ms: f64,
    #[serde(default)]
    pub store_apply_finalize_identity_ms: f64,
    #[serde(default)]
    pub store_apply_features_ms: f64,
    #[serde(default)]
    pub store_apply_install_ms: f64,
    pub feature_invalidation_ms: f64,
    pub commit_build_ms: f64,
    // WAL step.
    pub wal_total_ms: f64,
    pub wal_lock_tail_ms: f64,
    pub wal_frame_build_ms: f64,
    pub wal_commit_prepare_ms: f64,
    pub wal_serialize_ms: f64,
    pub wal_payload_checksum_ms: f64,
    pub wal_frame_assembly_ms: f64,
    pub wal_payload_bytes: u64,
    pub wal_frame_bytes: u64,
    pub wal_write_ms: f64,
    pub wal_sync_data_ms: f64,
    pub wal_tail_update_ms: f64,
    pub store_install_ms: f64,
    // Manifest step.
    pub manifest_total_ms: f64,
    pub manifest_clone_ms: f64,
    pub manifest_write_total_ms: f64,
    pub manifest_bytes: u64,
    pub manifest_checksum_ms: f64,
    pub manifest_serialize_ms: f64,
    pub manifest_write_ms: f64,
    pub manifest_sync_file_ms: f64,
    pub manifest_rename_ms: f64,
    pub manifest_sync_dir_ms: f64,
    pub cache_clear_ms: f64,
}
414
/// Default value for `RecordDeleteRequest::tombstone`: the string `"user_delete"`.
fn default_tombstone() -> String {
    String::from("user_delete")
}
418
/// Error type returned by every fallible operation in this SDK.
///
/// Underlying I/O and JSON errors are stored as their rendered message (`String`), which is
/// what lets the enum derive `Clone`.
#[derive(Clone, Debug)]
pub enum TraceDbClientError {
    /// The URL string that was rejected.
    InvalidUrl(String),
    /// A configuration variable had an unusable value.
    InvalidConfig {
        variable: String,
        message: String,
    },
    /// The request identified by `method` and `path` was rejected as invalid.
    InvalidRequest {
        method: String,
        path: String,
        message: String,
    },
    /// Rendered message of a `std::io::Error` (see the `From` impl).
    Io(String),
    /// Rendered message of a `serde_json::Error` (see the `From` impl).
    Json(String),
    /// The request timed out after `timeout_ms` milliseconds.
    Timeout {
        method: String,
        path: String,
        timeout_ms: u64,
    },
    /// The response to the request was judged invalid.
    InvalidResponse {
        method: String,
        path: String,
        message: String,
    },
    /// The request failed with an HTTP status; `body` is what `error_response()`,
    /// `server_error()` and `server_error_code()` try to parse as an `ErrorResponse`.
    HttpStatus {
        method: String,
        path: String,
        status: u16,
        body: String,
    },
}
450
451impl Display for TraceDbClientError {
452 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
453 match self {
454 Self::InvalidUrl(url) => write!(f, "invalid TraceDB URL {url}"),
455 Self::InvalidConfig { variable, message } => {
456 write!(f, "invalid TraceDB SDK config for {variable}: {message}")
457 }
458 Self::InvalidRequest {
459 method,
460 path,
461 message,
462 } => write!(
463 f,
464 "invalid TraceDB HTTP request for {method} {path}: {message}"
465 ),
466 Self::Io(error) => write!(f, "TraceDB HTTP I/O error: {error}"),
467 Self::Json(error) => write!(f, "TraceDB JSON error: {error}"),
468 Self::Timeout {
469 method,
470 path,
471 timeout_ms,
472 } => write!(
473 f,
474 "TraceDB HTTP request {method} {path} timed out after {timeout_ms} ms"
475 ),
476 Self::InvalidResponse {
477 method,
478 path,
479 message,
480 } => write!(
481 f,
482 "invalid TraceDB HTTP response for {method} {path}: {message}"
483 ),
484 Self::HttpStatus {
485 method,
486 path,
487 status,
488 body,
489 } => {
490 write!(
491 f,
492 "TraceDB HTTP request {method} {path} failed with status {status}: {body}"
493 )
494 }
495 }
496 }
497}
498
499impl Error for TraceDbClientError {
500 fn source(&self) -> Option<&(dyn Error + 'static)> {
501 match self {
502 Self::Io(_error) => None,
503 Self::Json(_error) => None,
504 Self::InvalidUrl(_)
505 | Self::InvalidConfig { .. }
506 | Self::InvalidRequest { .. }
507 | Self::Timeout { .. }
508 | Self::InvalidResponse { .. }
509 | Self::HttpStatus { .. } => None,
510 }
511 }
512}
513
514impl From<std::io::Error> for TraceDbClientError {
515 fn from(error: std::io::Error) -> Self {
516 Self::Io(error.to_string())
517 }
518}
519
520impl From<serde_json::Error> for TraceDbClientError {
521 fn from(error: serde_json::Error) -> Self {
522 Self::Json(error.to_string())
523 }
524}
525
526impl TraceDbClientError {
527 pub fn error_response(&self) -> Option<ErrorResponse> {
528 match self {
529 Self::HttpStatus { body, .. } => serde_json::from_str::<ErrorResponse>(body).ok(),
530 _ => None,
531 }
532 }
533
534 pub fn server_error(&self) -> Option<String> {
535 let Self::HttpStatus { body, .. } = self else {
536 return None;
537 };
538 serde_json::from_str::<ErrorResponse>(body)
539 .ok()
540 .map(|response| response.error)
541 }
542
543 pub fn server_error_code(&self) -> Option<String> {
544 let Self::HttpStatus { body, .. } = self else {
545 return None;
546 };
547 serde_json::from_str::<ErrorResponse>(body)
548 .ok()
549 .and_then(|response| response.code)
550 }
551}
552
/// Connection settings consumed by [`TraceDbClient`].
// NOTE(review): `token` is included verbatim in the derived `Debug` and `Serialize`
// output — avoid logging or persisting this struct if the token is a secret.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct TraceDbClientConfig {
    /// Server URL; `TraceDbClient::connect` checks it with `HttpTarget::parse`.
    pub url: String,
    /// Token string; `from_env_vars` uses an empty string when `TRACEDB_TOKEN` is unset.
    pub token: String,
    /// Optional database id; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub database_id: Option<String>,
    /// Optional branch id; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub branch_id: Option<String>,
    /// Request timeout in milliseconds; filled by `default_request_timeout_ms()` when absent.
    #[serde(default = "default_request_timeout_ms")]
    pub request_timeout_ms: u64,
    /// Retry budget used when `is_retry_safe_request` accepts the request; defaults to 0.
    #[serde(default)]
    pub safe_retries: u8,
    /// Retry budget used for requests that carry a non-empty idempotency key and are
    /// accepted by `is_idempotent_retry_request`; defaults to 0.
    #[serde(default)]
    pub idempotency_retries: u8,
}
568
569impl TraceDbClientConfig {
570 pub fn managed(url: impl Into<String>, token: impl Into<String>) -> Self {
571 Self {
572 url: url.into(),
573 token: token.into(),
574 database_id: None,
575 branch_id: None,
576 request_timeout_ms: default_request_timeout_ms(),
577 safe_retries: 0,
578 idempotency_retries: 0,
579 }
580 }
581
582 pub fn from_env() -> TraceDbClientResult<Self> {
583 Self::from_env_vars(env::vars())
584 }
585
586 pub fn from_env_vars<K, V, I>(vars: I) -> TraceDbClientResult<Self>
587 where
588 K: Into<String>,
589 V: Into<String>,
590 I: IntoIterator<Item = (K, V)>,
591 {
592 let mut url = None;
593 let mut token = None;
594 let mut database_id = None;
595 let mut branch_id = None;
596 let mut timeout_ms = None;
597 let mut safe_retries = None;
598 let mut idempotency_retries = None;
599
600 for (key, value) in vars {
601 let key = key.into();
602 let value = value.into();
603 match key.as_str() {
604 "TRACEDB_URL" => url = Some(value),
605 "TRACEDB_TOKEN" => token = Some(value),
606 "TRACEDB_DATABASE_ID" => database_id = Some(value),
607 "TRACEDB_BRANCH_ID" => branch_id = Some(value),
608 "TRACEDB_TIMEOUT_MS" => timeout_ms = Some(value),
609 "TRACEDB_SAFE_RETRIES" => safe_retries = Some(value),
610 "TRACEDB_IDEMPOTENCY_RETRIES" => idempotency_retries = Some(value),
611 _ => {}
612 }
613 }
614
615 let url = required_env("TRACEDB_URL", url)?;
616 let mut config = Self::managed(url, token.unwrap_or_default());
617 if let Some(database_id) = optional_env("TRACEDB_DATABASE_ID", database_id)? {
618 config = config.with_database(database_id);
619 }
620 if let Some(branch_id) = optional_env("TRACEDB_BRANCH_ID", branch_id)? {
621 config = config.with_branch(branch_id);
622 }
623 if let Some(timeout_ms) = optional_positive_u64_env("TRACEDB_TIMEOUT_MS", timeout_ms)? {
624 config.request_timeout_ms = timeout_ms;
625 }
626 if let Some(retries) = optional_u8_env("TRACEDB_SAFE_RETRIES", safe_retries)? {
627 config.safe_retries = retries;
628 }
629 if let Some(retries) = optional_u8_env("TRACEDB_IDEMPOTENCY_RETRIES", idempotency_retries)?
630 {
631 config.idempotency_retries = retries;
632 }
633 Ok(config)
634 }
635
636 pub fn with_database(mut self, database_id: impl Into<String>) -> Self {
637 self.database_id = Some(database_id.into());
638 self
639 }
640
641 pub fn with_branch(mut self, branch_id: impl Into<String>) -> Self {
642 self.branch_id = Some(branch_id.into());
643 self
644 }
645
646 pub fn with_database_branch(
647 self,
648 database_id: impl Into<String>,
649 branch_id: impl Into<String>,
650 ) -> Self {
651 self.with_database(database_id).with_branch(branch_id)
652 }
653
654 pub fn with_timeout(mut self, timeout: Duration) -> Self {
655 self.request_timeout_ms = timeout_ms(timeout);
656 self
657 }
658
659 pub fn with_safe_retries(mut self, retries: u8) -> Self {
660 self.safe_retries = retries;
661 self
662 }
663
664 pub fn with_idempotency_retries(mut self, retries: u8) -> Self {
665 self.idempotency_retries = retries;
666 self
667 }
668
669 fn request_timeout(&self) -> Duration {
670 Duration::from_millis(self.request_timeout_ms.max(1))
671 }
672}
673
/// Per-request options accepted by the `*_with_options` client methods.
///
/// Both fields default to `None` and are omitted from the serialized form when `None`.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TraceDbRequestOptions {
    /// Optional idempotency key; a non-empty key is one of the conditions under which
    /// `request_attempts` applies `idempotency_retries`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,
    /// Optional actor context.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub actor_context: Option<TraceDbActorContext>,
}
681
682impl TraceDbRequestOptions {
683 pub fn new() -> Self {
684 Self::default()
685 }
686
687 pub fn with_idempotency_key(mut self, key: impl Into<String>) -> Self {
688 self.idempotency_key = Some(key.into());
689 self
690 }
691
692 pub fn with_actor_context(mut self, actor_context: TraceDbActorContext) -> Self {
693 self.actor_context = Some(actor_context);
694 self
695 }
696}
697
/// Actor context that can be attached to [`TraceDbRequestOptions`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct TraceDbActorContext {
    /// Tenant id.
    pub tenant_id: String,
    /// Database id.
    pub database_id: String,
    /// Branch id.
    pub branch_id: String,
    /// Token identity string.
    pub token_identity: String,
    /// Request id.
    pub request_id: String,
    /// Policy epoch; 0 when absent from the payload.
    #[serde(default)]
    pub policy_epoch: u64,
    /// Scope strings; empty when absent from the payload.
    #[serde(default)]
    pub scopes: Vec<String>,
}
710
711impl TraceDbActorContext {
712 pub fn new(
713 tenant_id: impl Into<String>,
714 database_id: impl Into<String>,
715 branch_id: impl Into<String>,
716 token_identity: impl Into<String>,
717 request_id: impl Into<String>,
718 ) -> Self {
719 Self {
720 tenant_id: tenant_id.into(),
721 database_id: database_id.into(),
722 branch_id: branch_id.into(),
723 token_identity: token_identity.into(),
724 request_id: request_id.into(),
725 policy_epoch: 0,
726 scopes: Vec::new(),
727 }
728 }
729
730 pub fn with_policy_epoch(mut self, policy_epoch: u64) -> Self {
731 self.policy_epoch = policy_epoch;
732 self
733 }
734
735 pub fn with_scopes(mut self, scopes: impl IntoIterator<Item = impl Into<String>>) -> Self {
736 self.scopes = scopes.into_iter().map(Into::into).collect();
737 self
738 }
739}
740
/// TraceDB client; its only state is the configuration it was built with.
#[derive(Clone, Debug)]
pub struct TraceDbClient {
    /// Configuration read by the request methods (for example the retry budgets).
    pub config: TraceDbClientConfig,
}
746
/// Alias for [`TraceDbClient`].
pub type TraceDb = TraceDbClient;
748
749impl TraceDbClient {
    /// Wraps `config` in a client without validating it (`connect` parses the URL first).
    pub fn new(config: TraceDbClientConfig) -> Self {
        Self { config }
    }
753
    /// Checks `config.url` with `HttpTarget::parse`, then builds the client.
    ///
    /// # Errors
    /// Propagates the parse failure via `?`.
    pub fn connect(config: TraceDbClientConfig) -> TraceDbClientResult<Self> {
        // The parsed target is discarded; only the validation outcome matters here.
        HttpTarget::parse(&config.url)?;
        Ok(Self::new(config))
    }
758
    /// Calls `get_json` for `/v1/ready` and returns the untyped JSON `Value`.
    pub fn ready(&self) -> TraceDbClientResult<Value> {
        self.get_json("/v1/ready")
    }
762
    /// Calls `get_typed` for `/v1/ready`, returning a `ReadyResponse`.
    pub fn ready_typed(&self) -> TraceDbClientResult<ReadyResponse> {
        self.get_typed("/v1/ready")
    }
766
    /// Calls `get_json` for `/v1/health` and returns the untyped JSON `Value`.
    pub fn health(&self) -> TraceDbClientResult<Value> {
        self.get_json("/v1/health")
    }
770
    /// Calls `get_typed` for `/v1/health`, returning a `HealthResponse`.
    pub fn health_typed(&self) -> TraceDbClientResult<HealthResponse> {
        self.get_typed("/v1/health")
    }
774
    /// Calls `get_json` for `/v1/databases` and returns the untyped JSON `Value`.
    pub fn list_databases(&self) -> TraceDbClientResult<Value> {
        self.get_json("/v1/databases")
    }
778
    /// Calls `get_typed` for `/v1/databases`, returning a `DatabasesResponse`.
    pub fn list_databases_typed(&self) -> TraceDbClientResult<DatabasesResponse> {
        self.get_typed("/v1/databases")
    }
782
    /// Calls `get_json` for `/v1/branches` and returns the untyped JSON `Value`.
    pub fn list_branches(&self) -> TraceDbClientResult<Value> {
        self.get_json("/v1/branches")
    }
786
    /// Calls `get_typed` for `/v1/branches`, returning a `BranchesResponse`.
    pub fn list_branches_typed(&self) -> TraceDbClientResult<BranchesResponse> {
        self.get_typed("/v1/branches")
    }
790
    /// Calls `get_json` for `/v1/metrics/public-safe` and returns the untyped JSON `Value`.
    pub fn public_safe_metrics(&self) -> TraceDbClientResult<Value> {
        self.get_json("/v1/metrics/public-safe")
    }
794
    /// Calls `get_typed` for `/v1/metrics/public-safe`, returning a `MetricsResponse`.
    pub fn public_safe_metrics_typed(&self) -> TraceDbClientResult<MetricsResponse> {
        self.get_typed("/v1/metrics/public-safe")
    }
798
    /// Hands `schema` to `post_json` for `/v1/schema/apply`; returns the untyped JSON `Value`.
    pub fn apply_schema(&self, schema: &TableSchema) -> TraceDbClientResult<Value> {
        self.post_json("/v1/schema/apply", schema)
    }
802
    /// Hands `schema` and `options` to `post_json_with_options` for `/v1/schema/apply`.
    pub fn apply_schema_with_options(
        &self,
        schema: &TableSchema,
        options: &TraceDbRequestOptions,
    ) -> TraceDbClientResult<Value> {
        self.post_json_with_options("/v1/schema/apply", schema, options)
    }
810
    /// Hands `schema` to `post_typed` for `/v1/schema/apply`, returning an `EpochResponse`.
    pub fn apply_schema_typed(&self, schema: &TableSchema) -> TraceDbClientResult<EpochResponse> {
        self.post_typed("/v1/schema/apply", schema)
    }
814
    /// Hands `schema` and `options` to `post_typed_with_options` for `/v1/schema/apply`,
    /// returning an `EpochResponse`.
    pub fn apply_schema_typed_with_options(
        &self,
        schema: &TableSchema,
        options: &TraceDbRequestOptions,
    ) -> TraceDbClientResult<EpochResponse> {
        self.post_typed_with_options("/v1/schema/apply", schema, options)
    }
822
    /// Hands `record` to `post_json` for `/v1/records/put`; returns the untyped JSON `Value`.
    pub fn put(&self, record: &RecordInput) -> TraceDbClientResult<Value> {
        self.post_json("/v1/records/put", record)
    }
826
    /// Hands `record` and `options` to `post_json_with_options` for `/v1/records/put`.
    pub fn put_with_options(
        &self,
        record: &RecordInput,
        options: &TraceDbRequestOptions,
    ) -> TraceDbClientResult<Value> {
        self.post_json_with_options("/v1/records/put", record, options)
    }
834
    /// Hands `record` to `post_typed` for `/v1/records/put`, returning an `EpochResponse`.
    pub fn put_typed(&self, record: &RecordInput) -> TraceDbClientResult<EpochResponse> {
        self.post_typed("/v1/records/put", record)
    }
838
    /// Hands `record` and `options` to `post_typed_with_options` for `/v1/records/put`,
    /// returning an `EpochResponse`.
    pub fn put_typed_with_options(
        &self,
        record: &RecordInput,
        options: &TraceDbRequestOptions,
    ) -> TraceDbClientResult<EpochResponse> {
        self.post_typed_with_options("/v1/records/put", record, options)
    }
846
    /// Hands `request` to `post_json` for `/v1/records/put-batch`; returns the untyped JSON
    /// `Value`.
    pub fn put_batch(&self, request: &RecordPutBatchRequest) -> TraceDbClientResult<Value> {
        self.post_json("/v1/records/put-batch", request)
    }
850
    /// Hands `request` and `options` to `post_json_with_options` for `/v1/records/put-batch`.
    pub fn put_batch_with_options(
        &self,
        request: &RecordPutBatchRequest,
        options: &TraceDbRequestOptions,
    ) -> TraceDbClientResult<Value> {
        self.post_json_with_options("/v1/records/put-batch", request, options)
    }
858
    /// Hands `request` to `post_typed` for `/v1/records/put-batch`, returning a
    /// `PutBatchResponse`.
    pub fn put_batch_typed(
        &self,
        request: &RecordPutBatchRequest,
    ) -> TraceDbClientResult<PutBatchResponse> {
        self.post_typed("/v1/records/put-batch", request)
    }
865
    /// Hands `request` and `options` to `post_typed_with_options` for
    /// `/v1/records/put-batch`, returning a `PutBatchResponse`.
    pub fn put_batch_typed_with_options(
        &self,
        request: &RecordPutBatchRequest,
        options: &TraceDbRequestOptions,
    ) -> TraceDbClientResult<PutBatchResponse> {
        self.post_typed_with_options("/v1/records/put-batch", request, options)
    }
873
    /// Hands `request` to `post_json` for `/v1/records/patch`; returns the untyped JSON `Value`.
    pub fn patch(&self, request: &RecordPatchRequest) -> TraceDbClientResult<Value> {
        self.post_json("/v1/records/patch", request)
    }
877
    /// Hands `request` and `options` to `post_json_with_options` for `/v1/records/patch`.
    pub fn patch_with_options(
        &self,
        request: &RecordPatchRequest,
        options: &TraceDbRequestOptions,
    ) -> TraceDbClientResult<Value> {
        self.post_json_with_options("/v1/records/patch", request, options)
    }
885
    /// Hands `request` to `post_typed` for `/v1/records/patch`, returning an `EpochResponse`.
    pub fn patch_typed(&self, request: &RecordPatchRequest) -> TraceDbClientResult<EpochResponse> {
        self.post_typed("/v1/records/patch", request)
    }
889
    /// Hands `request` and `options` to `post_typed_with_options` for `/v1/records/patch`,
    /// returning an `EpochResponse`.
    pub fn patch_typed_with_options(
        &self,
        request: &RecordPatchRequest,
        options: &TraceDbRequestOptions,
    ) -> TraceDbClientResult<EpochResponse> {
        self.post_typed_with_options("/v1/records/patch", request, options)
    }
897
    /// Hands `request` to `post_json` for `/v1/records/delete`; returns the untyped JSON
    /// `Value`.
    pub fn delete(&self, request: &RecordDeleteRequest) -> TraceDbClientResult<Value> {
        self.post_json("/v1/records/delete", request)
    }
901
    /// Hands `request` and `options` to `post_json_with_options` for `/v1/records/delete`.
    pub fn delete_with_options(
        &self,
        request: &RecordDeleteRequest,
        options: &TraceDbRequestOptions,
    ) -> TraceDbClientResult<Value> {
        self.post_json_with_options("/v1/records/delete", request, options)
    }
909
    /// Hands `request` to `post_typed` for `/v1/records/delete`, returning a `DeleteResponse`.
    pub fn delete_typed(
        &self,
        request: &RecordDeleteRequest,
    ) -> TraceDbClientResult<DeleteResponse> {
        self.post_typed("/v1/records/delete", request)
    }
916
    /// Hands `request` and `options` to `post_typed_with_options` for `/v1/records/delete`,
    /// returning a `DeleteResponse`.
    pub fn delete_typed_with_options(
        &self,
        request: &RecordDeleteRequest,
        options: &TraceDbRequestOptions,
    ) -> TraceDbClientResult<DeleteResponse> {
        self.post_typed_with_options("/v1/records/delete", request, options)
    }
924
    /// Hands `request` to `post_json` for `/v1/records/get`; returns the untyped JSON `Value`.
    ///
    /// Despite the name, this goes through `post_json`, not `get_json`.
    pub fn get(&self, request: &RecordGetRequest) -> TraceDbClientResult<Value> {
        self.post_json("/v1/records/get", request)
    }
928
    /// Hands `request` to `post_typed` for `/v1/records/get`, returning a `GetRecordResponse`.
    pub fn get_record_typed(
        &self,
        request: &RecordGetRequest,
    ) -> TraceDbClientResult<GetRecordResponse> {
        self.post_typed("/v1/records/get", request)
    }
935
    /// Hands `request` to `post_json` for `/v1/records/scan`; returns the untyped JSON `Value`.
    pub fn scan(&self, request: &RecordScanRequest) -> TraceDbClientResult<Value> {
        self.post_json("/v1/records/scan", request)
    }
939
    /// Hands `request` to `post_typed` for `/v1/records/scan`, returning a `RecordScanOutput`.
    pub fn scan_typed(&self, request: &RecordScanRequest) -> TraceDbClientResult<RecordScanOutput> {
        self.post_typed("/v1/records/scan", request)
    }
943
    /// Hands `query` to `post_json` for `/v1/query`; returns the untyped JSON `Value`.
    pub fn query(&self, query: &HybridQuery) -> TraceDbClientResult<Value> {
        self.post_json("/v1/query", query)
    }
947
    /// Hands `query` to `post_typed` for `/v1/query`, returning a `QueryResponse`.
    pub fn query_typed(&self, query: &HybridQuery) -> TraceDbClientResult<QueryResponse> {
        self.post_typed("/v1/query", query)
    }
951
952 pub fn traceql(&self, query: impl Into<String>) -> TraceDbClientResult<Value> {
953 let request = TraceQlQueryRequest::new(query);
954 self.traceql_request(&request)
955 }
956
    /// Hands `request` to `post_json` for `/v1/traceql`; returns the untyped JSON `Value`.
    pub fn traceql_request(&self, request: &TraceQlQueryRequest) -> TraceDbClientResult<Value> {
        self.post_json("/v1/traceql", request)
    }
960
    /// Hands `request` and `options` to `post_json_with_options` for `/v1/traceql`.
    pub fn traceql_request_with_options(
        &self,
        request: &TraceQlQueryRequest,
        options: &TraceDbRequestOptions,
    ) -> TraceDbClientResult<Value> {
        self.post_json_with_options("/v1/traceql", request, options)
    }
968
969 pub fn traceql_typed(&self, query: impl Into<String>) -> TraceDbClientResult<QueryResponse> {
970 let request = TraceQlQueryRequest::new(query);
971 self.traceql_request_typed(&request)
972 }
973
    /// Hands `request` to `post_typed` for `/v1/traceql`, returning a `QueryResponse`.
    pub fn traceql_request_typed(
        &self,
        request: &TraceQlQueryRequest,
    ) -> TraceDbClientResult<QueryResponse> {
        self.post_typed("/v1/traceql", request)
    }
980
    /// Hands `request` and `options` to `post_typed_with_options` for `/v1/traceql`,
    /// returning a `QueryResponse`.
    pub fn traceql_request_typed_with_options(
        &self,
        request: &TraceQlQueryRequest,
        options: &TraceDbRequestOptions,
    ) -> TraceDbClientResult<QueryResponse> {
        self.post_typed_with_options("/v1/traceql", request, options)
    }
988
989 pub fn graphql(&self, query: impl Into<String>) -> TraceDbClientResult<Value> {
990 let request = GraphQlQueryRequest::new(query);
991 self.graphql_request(&request)
992 }
993
    /// Hands `request` to `post_json` for `/v1/graphql`; returns the untyped JSON `Value`.
    pub fn graphql_request(&self, request: &GraphQlQueryRequest) -> TraceDbClientResult<Value> {
        self.post_json("/v1/graphql", request)
    }
997
    /// Hands `request` and `options` to `post_json_with_options` for `/v1/graphql`.
    pub fn graphql_request_with_options(
        &self,
        request: &GraphQlQueryRequest,
        options: &TraceDbRequestOptions,
    ) -> TraceDbClientResult<Value> {
        self.post_json_with_options("/v1/graphql", request, options)
    }
1005
1006 pub fn graphql_typed(&self, query: impl Into<String>) -> TraceDbClientResult<GraphQlResponse> {
1007 let request = GraphQlQueryRequest::new(query);
1008 self.graphql_request_typed(&request)
1009 }
1010
    /// Hands `request` to `post_typed` for `/v1/graphql`, returning a `GraphQlResponse`.
    pub fn graphql_request_typed(
        &self,
        request: &GraphQlQueryRequest,
    ) -> TraceDbClientResult<GraphQlResponse> {
        self.post_typed("/v1/graphql", request)
    }
1017
    /// Hands `request` and `options` to `post_typed_with_options` for `/v1/graphql`,
    /// returning a `GraphQlResponse`.
    pub fn graphql_request_typed_with_options(
        &self,
        request: &GraphQlQueryRequest,
        options: &TraceDbRequestOptions,
    ) -> TraceDbClientResult<GraphQlResponse> {
        self.post_typed_with_options("/v1/graphql", request, options)
    }
1025
1026 pub fn bounded_graphql(&self, query: impl Into<String>) -> TraceDbClientResult<Value> {
1027 let request = GraphQlQueryRequest::new(query);
1028 self.bounded_graphql_request(&request)
1029 }
1030
    /// Hands `request` to `post_json` for `/v1/graphql/bounded`; returns the untyped JSON
    /// `Value`.
    pub fn bounded_graphql_request(
        &self,
        request: &GraphQlQueryRequest,
    ) -> TraceDbClientResult<Value> {
        self.post_json("/v1/graphql/bounded", request)
    }
1037
1038 pub fn bounded_graphql_typed(
1039 &self,
1040 query: impl Into<String>,
1041 ) -> TraceDbClientResult<QueryResponse> {
1042 let request = GraphQlQueryRequest::new(query);
1043 self.bounded_graphql_request_typed(&request)
1044 }
1045
    /// Hands `request` to `post_typed` for `/v1/graphql/bounded`, returning a `QueryResponse`.
    pub fn bounded_graphql_request_typed(
        &self,
        request: &GraphQlQueryRequest,
    ) -> TraceDbClientResult<QueryResponse> {
        self.post_typed("/v1/graphql/bounded", request)
    }
1052
    /// Calls `get_json` for `/v1/graphql/schema` and returns the untyped JSON `Value`.
    pub fn graphql_schema(&self) -> TraceDbClientResult<Value> {
        self.get_json("/v1/graphql/schema")
    }
1056
    /// Calls `get_typed` for `/v1/graphql/schema`, returning a `GraphQlSchemaResponse`.
    pub fn graphql_schema_typed(&self) -> TraceDbClientResult<GraphQlSchemaResponse> {
        self.get_typed("/v1/graphql/schema")
    }
1060
    /// Hands `query` to `post_json` for `/v1/explain`; returns the untyped JSON `Value`.
    pub fn explain(&self, query: &HybridQuery) -> TraceDbClientResult<Value> {
        self.post_json("/v1/explain", query)
    }
1064
    /// Hands `query` to `post_typed` for `/v1/explain`, returning a `HybridExplain`
    /// (alias of `ExplainOutput`).
    pub fn explain_typed(&self, query: &HybridQuery) -> TraceDbClientResult<HybridExplain> {
        self.post_typed("/v1/explain", query)
    }
1068
1069 pub fn compact(&self) -> TraceDbClientResult<Value> {
1070 self.post_json("/v1/admin/compact", &json!({}))
1071 }
1072
1073 pub fn compact_with_options(
1074 &self,
1075 options: &TraceDbRequestOptions,
1076 ) -> TraceDbClientResult<Value> {
1077 self.post_json_with_options("/v1/admin/compact", &json!({}), options)
1078 }
1079
1080 pub fn compact_typed(&self) -> TraceDbClientResult<CompactResponse> {
1081 self.post_typed("/v1/admin/compact", &json!({}))
1082 }
1083
1084 pub fn compact_typed_with_options(
1085 &self,
1086 options: &TraceDbRequestOptions,
1087 ) -> TraceDbClientResult<CompactResponse> {
1088 self.post_typed_with_options("/v1/admin/compact", &json!({}), options)
1089 }
1090
    /// Calls `get_json` for `/v1/admin/jobs` and returns the untyped JSON `Value`.
    pub fn list_admin_jobs(&self) -> TraceDbClientResult<Value> {
        self.get_json("/v1/admin/jobs")
    }
1094
    /// Calls `get_typed` for `/v1/admin/jobs`, returning a `JobsResponse`.
    pub fn list_admin_jobs_typed(&self) -> TraceDbClientResult<JobsResponse> {
        self.get_typed("/v1/admin/jobs")
    }
1098
    /// Hands `request` to `post_json` for `/v1/admin/snapshot`; returns the untyped JSON
    /// `Value`.
    pub fn snapshot(&self, request: &SnapshotRequest) -> TraceDbClientResult<Value> {
        self.post_json("/v1/admin/snapshot", request)
    }
1102
    /// Hands `request` and `options` to `post_json_with_options` for `/v1/admin/snapshot`.
    pub fn snapshot_with_options(
        &self,
        request: &SnapshotRequest,
        options: &TraceDbRequestOptions,
    ) -> TraceDbClientResult<Value> {
        self.post_json_with_options("/v1/admin/snapshot", request, options)
    }
1110
    /// Hands `request` to `post_typed` for `/v1/admin/snapshot`, returning a
    /// `SnapshotResponse`.
    pub fn snapshot_typed(
        &self,
        request: &SnapshotRequest,
    ) -> TraceDbClientResult<SnapshotResponse> {
        self.post_typed("/v1/admin/snapshot", request)
    }
1117
    /// Hands `request` and `options` to `post_typed_with_options` for `/v1/admin/snapshot`,
    /// returning a `SnapshotResponse`.
    pub fn snapshot_typed_with_options(
        &self,
        request: &SnapshotRequest,
        options: &TraceDbRequestOptions,
    ) -> TraceDbClientResult<SnapshotResponse> {
        self.post_typed_with_options("/v1/admin/snapshot", request, options)
    }
1125
    /// Hands `request` to `post_json` for `/v1/admin/restore`; returns the untyped JSON `Value`.
    pub fn restore(&self, request: &RestoreRequest) -> TraceDbClientResult<Value> {
        self.post_json("/v1/admin/restore", request)
    }
1129
    /// Hands `request` and `options` to `post_json_with_options` for `/v1/admin/restore`.
    pub fn restore_with_options(
        &self,
        request: &RestoreRequest,
        options: &TraceDbRequestOptions,
    ) -> TraceDbClientResult<Value> {
        self.post_json_with_options("/v1/admin/restore", request, options)
    }
1137
1138 pub fn restore_typed(&self, request: &RestoreRequest) -> TraceDbClientResult<RestoreResponse> {
1139 self.post_typed("/v1/admin/restore", request)
1140 }
1141
1142 pub fn restore_typed_with_options(
1143 &self,
1144 request: &RestoreRequest,
1145 options: &TraceDbRequestOptions,
1146 ) -> TraceDbClientResult<RestoreResponse> {
1147 self.post_typed_with_options("/v1/admin/restore", request, options)
1148 }
1149
1150 pub fn request_json(
1151 &self,
1152 method: &str,
1153 path: &str,
1154 body: Option<&Value>,
1155 ) -> TraceDbClientResult<Value> {
1156 self.request_json_with_options(method, path, body, &TraceDbRequestOptions::default())
1157 }
1158
1159 pub fn request_json_with_options(
1160 &self,
1161 method: &str,
1162 path: &str,
1163 body: Option<&Value>,
1164 options: &TraceDbRequestOptions,
1165 ) -> TraceDbClientResult<Value> {
1166 let attempts = self.request_attempts(method, path, body, options);
1167 for attempt in 0..attempts {
1168 match self.request_json_once(method, path, body, options) {
1169 Ok(value) => return Ok(value),
1170 Err(error) if is_retryable_error(&error) && attempt + 1 < attempts => {
1171 thread::sleep(retry_backoff_delay(attempt));
1172 }
1173 Err(error) => return Err(error),
1174 }
1175 }
1176 unreachable!("request attempts should be at least one")
1177 }
1178
1179 fn request_attempts(
1180 &self,
1181 method: &str,
1182 path: &str,
1183 body: Option<&Value>,
1184 options: &TraceDbRequestOptions,
1185 ) -> u8 {
1186 if self.config.idempotency_retries > 0
1187 && is_idempotent_retry_request(method, path)
1188 && options
1189 .idempotency_key
1190 .as_deref()
1191 .is_some_and(|key| !key.is_empty())
1192 {
1193 self.config.idempotency_retries.saturating_add(1)
1194 } else if is_retry_safe_request(method, path, body) {
1195 self.config.safe_retries.saturating_add(1)
1196 } else {
1197 1
1198 }
1199 }
1200
1201 fn request_json_once(
1202 &self,
1203 method: &str,
1204 path: &str,
1205 body: Option<&Value>,
1206 options: &TraceDbRequestOptions,
1207 ) -> TraceDbClientResult<Value> {
1208 let target = HttpTarget::parse(&self.config.url)?;
1209 let request_path = target.path(path);
1210 let body_bytes = self.request_body_bytes(body)?;
1211 let timeout = self.config.request_timeout();
1212 let idempotency_key_header = idempotency_key_header(method, &request_path, options)?;
1213 let mut stream = target.connect(method, &request_path, timeout)?;
1214 let mut request = format!(
1215 "{method} {request_path} HTTP/1.1\r\nHost: {}\r\nAccept: application/json\r\nConnection: close\r\nContent-Length: {}\r\nUser-Agent: {NAME}/{VERSION}\r\n",
1216 target.authority,
1217 body_bytes.len()
1218 );
1219 if !self.config.token.is_empty() {
1220 request.push_str(&format!("Authorization: Bearer {}\r\n", self.config.token));
1221 }
1222 request.push_str(&idempotency_key_header);
1223 request.push_str(&self.actor_headers(options)?);
1224 if !body_bytes.is_empty() {
1225 request.push_str("Content-Type: application/json\r\n");
1226 }
1227 request.push_str("\r\n");
1228 stream
1229 .write_all(request.as_bytes())
1230 .map_err(|error| map_request_io_error(method, &request_path, timeout, error))?;
1231 if !body_bytes.is_empty() {
1232 stream
1233 .write_all(&body_bytes)
1234 .map_err(|error| map_request_io_error(method, &request_path, timeout, error))?;
1235 }
1236 stream
1237 .flush()
1238 .map_err(|error| map_request_io_error(method, &request_path, timeout, error))?;
1239 let mut response = String::new();
1240 stream
1241 .read_to_string(&mut response)
1242 .map_err(|error| map_request_io_error(method, &request_path, timeout, error))?;
1243 if response.is_empty() {
1244 return Err(TraceDbClientError::Io(
1245 "connection closed before response".to_string(),
1246 ));
1247 }
1248 parse_response(method, &request_path, &response)
1249 }
1250
1251 pub fn table(&self, table: impl Into<String>) -> TableHandle {
1252 QueryBuilder {
1253 client_config: Some(self.config.clone()),
1254 table: table.into(),
1255 tenant_id: None,
1256 text_field: None,
1257 text_query: None,
1258 vector_field: None,
1259 vector: None,
1260 scalar_eq: Map::new(),
1261 freshness: FeatureFreshnessMode::Strict,
1262 limit: 10,
1263 cursor: None,
1264 explain: true,
1265 }
1266 }
1267
1268 fn get_json(&self, path: &str) -> TraceDbClientResult<Value> {
1269 self.request_json("GET", path, None)
1270 }
1271
1272 fn get_typed<T: for<'de> Deserialize<'de>>(&self, path: &str) -> TraceDbClientResult<T> {
1273 decode_typed("GET", path, self.get_json(path)?)
1274 }
1275
1276 fn post_json<T: Serialize>(&self, path: &str, body: &T) -> TraceDbClientResult<Value> {
1277 let value = serde_json::to_value(body)?;
1278 self.request_json("POST", path, Some(&value))
1279 }
1280
1281 fn post_json_with_options<T: Serialize>(
1282 &self,
1283 path: &str,
1284 body: &T,
1285 options: &TraceDbRequestOptions,
1286 ) -> TraceDbClientResult<Value> {
1287 let value = serde_json::to_value(body)?;
1288 self.request_json_with_options("POST", path, Some(&value), options)
1289 }
1290
1291 fn post_typed<T: Serialize, R: for<'de> Deserialize<'de>>(
1292 &self,
1293 path: &str,
1294 body: &T,
1295 ) -> TraceDbClientResult<R> {
1296 decode_typed("POST", path, self.post_json(path, body)?)
1297 }
1298
1299 fn post_typed_with_options<T: Serialize, R: for<'de> Deserialize<'de>>(
1300 &self,
1301 path: &str,
1302 body: &T,
1303 options: &TraceDbRequestOptions,
1304 ) -> TraceDbClientResult<R> {
1305 decode_typed(
1306 "POST",
1307 path,
1308 self.post_json_with_options(path, body, options)?,
1309 )
1310 }
1311
1312 fn request_body_bytes(&self, body: Option<&Value>) -> TraceDbClientResult<Vec<u8>> {
1313 let Some(body) = body else {
1314 return Ok(Vec::new());
1315 };
1316 let mut body = body.clone();
1317 self.inject_route_metadata(&mut body);
1318 Ok(serde_json::to_vec(&body)?)
1319 }
1320
1321 fn inject_route_metadata(&self, body: &mut Value) {
1322 let Value::Object(body) = body else {
1323 return;
1324 };
1325 if let Some(database_id) = &self.config.database_id {
1326 body.entry("database_id".to_string())
1327 .or_insert_with(|| Value::String(database_id.clone()));
1328 }
1329 if !body.contains_key("branch_id") {
1330 let branch_id = self.config.branch_id.clone().or_else(|| {
1331 self.config.database_id.as_ref().and_then(|_| {
1332 body.get("database_id")
1333 .and_then(Value::as_str)
1334 .map(|database_id| format!("{database_id}:main"))
1335 })
1336 });
1337 if let Some(branch_id) = branch_id {
1338 body.insert("branch_id".to_string(), Value::String(branch_id));
1339 }
1340 }
1341 }
1342
1343 fn actor_headers(&self, options: &TraceDbRequestOptions) -> TraceDbClientResult<String> {
1344 let mut headers = String::new();
1345 for (name, value) in self.actor_header_pairs(options)? {
1346 headers.push_str(&header_line(name, &value)?);
1347 }
1348 Ok(headers)
1349 }
1350
1351 fn actor_header_pairs(
1352 &self,
1353 options: &TraceDbRequestOptions,
1354 ) -> TraceDbClientResult<Vec<(&'static str, String)>> {
1355 let mut headers = Vec::new();
1356 if let Some(actor) = &options.actor_context {
1357 headers.push(("x-tracedb-tenant-id", actor.tenant_id.clone()));
1358 headers.push(("x-tracedb-database-id", actor.database_id.clone()));
1359 headers.push(("x-tracedb-branch-id", actor.branch_id.clone()));
1360 headers.push(("x-tracedb-token-identity", actor.token_identity.clone()));
1361 headers.push(("x-tracedb-request-id", actor.request_id.clone()));
1362 headers.push(("x-tracedb-policy-epoch", actor.policy_epoch.to_string()));
1363 if !actor.scopes.is_empty() {
1364 headers.push(("x-tracedb-scopes", actor.scopes.join(",")));
1365 }
1366 } else {
1367 if let Some(database_id) = &self.config.database_id {
1368 headers.push(("x-tracedb-database-id", database_id.clone()));
1369 }
1370 if let Some(branch_id) = &self.config.branch_id {
1371 headers.push(("x-tracedb-branch-id", branch_id.clone()));
1372 }
1373 }
1374 for (name, value) in &headers {
1375 validate_header_value(name, value)?;
1376 }
1377 Ok(headers)
1378 }
1379}
1380
1381#[derive(Clone, Debug)]
1382pub struct TraceDbAsyncClient {
1384 inner: TraceDbClient,
1385 http_client: reqwest::Client,
1386}
1387
1388impl TraceDbAsyncClient {
1389 pub fn new(config: TraceDbClientConfig) -> Self {
1390 let http_client = reqwest::Client::builder()
1391 .pool_max_idle_per_host(16)
1392 .build()
1393 .expect("TraceDB async HTTP client configuration is valid");
1394 Self {
1395 inner: TraceDbClient::new(config),
1396 http_client,
1397 }
1398 }
1399
1400 pub fn from_blocking(client: TraceDbClient) -> Self {
1401 let http_client = reqwest::Client::builder()
1402 .pool_max_idle_per_host(16)
1403 .build()
1404 .expect("TraceDB async HTTP client configuration is valid");
1405 Self {
1406 inner: client,
1407 http_client,
1408 }
1409 }
1410
1411 pub fn blocking_client(&self) -> &TraceDbClient {
1412 &self.inner
1413 }
1414
1415 pub async fn request_json(
1416 &self,
1417 method: &str,
1418 path: &str,
1419 body: Option<&Value>,
1420 ) -> TraceDbClientResult<Value> {
1421 self.request_json_with_options(method, path, body, &TraceDbRequestOptions::default())
1422 .await
1423 }
1424
1425 pub async fn request_json_with_options(
1426 &self,
1427 method: &str,
1428 path: &str,
1429 body: Option<&Value>,
1430 options: &TraceDbRequestOptions,
1431 ) -> TraceDbClientResult<Value> {
1432 let attempts = self.inner.request_attempts(method, path, body, options);
1433 for attempt in 0..attempts {
1434 match self.request_json_once(method, path, body, options).await {
1435 Ok(value) => return Ok(value),
1436 Err(error) if is_retryable_error(&error) && attempt + 1 < attempts => {
1437 tokio::time::sleep(retry_backoff_delay(attempt)).await;
1438 }
1439 Err(error) => return Err(error),
1440 }
1441 }
1442 unreachable!("request attempts should be at least one")
1443 }
1444
1445 pub async fn ready(&self) -> TraceDbClientResult<Value> {
1446 self.request_json("GET", "/v1/ready", None).await
1447 }
1448
1449 pub async fn ready_typed(&self) -> TraceDbClientResult<ReadyResponse> {
1450 self.get_typed("/v1/ready").await
1451 }
1452
1453 pub async fn health(&self) -> TraceDbClientResult<Value> {
1454 self.request_json("GET", "/v1/health", None).await
1455 }
1456
1457 pub async fn health_typed(&self) -> TraceDbClientResult<HealthResponse> {
1458 self.get_typed("/v1/health").await
1459 }
1460
1461 pub async fn list_databases_typed(&self) -> TraceDbClientResult<DatabasesResponse> {
1462 self.get_typed("/v1/databases").await
1463 }
1464
1465 pub async fn list_branches_typed(&self) -> TraceDbClientResult<BranchesResponse> {
1466 self.get_typed("/v1/branches").await
1467 }
1468
1469 pub async fn public_safe_metrics_typed(&self) -> TraceDbClientResult<MetricsResponse> {
1470 self.get_typed("/v1/metrics/public-safe").await
1471 }
1472
1473 pub async fn list_admin_jobs_typed(&self) -> TraceDbClientResult<JobsResponse> {
1474 self.get_typed("/v1/admin/jobs").await
1475 }
1476
1477 pub async fn apply_schema_typed(
1478 &self,
1479 schema: &TableSchema,
1480 ) -> TraceDbClientResult<EpochResponse> {
1481 self.post_typed("/v1/schema/apply", schema).await
1482 }
1483
1484 pub async fn apply_schema_typed_with_options(
1485 &self,
1486 schema: &TableSchema,
1487 options: &TraceDbRequestOptions,
1488 ) -> TraceDbClientResult<EpochResponse> {
1489 self.post_typed_with_options("/v1/schema/apply", schema, options)
1490 .await
1491 }
1492
1493 pub async fn put_typed(&self, record: &RecordInput) -> TraceDbClientResult<EpochResponse> {
1494 self.post_typed("/v1/records/put", record).await
1495 }
1496
1497 pub async fn put_typed_with_options(
1498 &self,
1499 record: &RecordInput,
1500 options: &TraceDbRequestOptions,
1501 ) -> TraceDbClientResult<EpochResponse> {
1502 self.post_typed_with_options("/v1/records/put", record, options)
1503 .await
1504 }
1505
1506 pub async fn put_batch_typed(
1507 &self,
1508 request: &RecordPutBatchRequest,
1509 ) -> TraceDbClientResult<PutBatchResponse> {
1510 self.post_typed("/v1/records/put-batch", request).await
1511 }
1512
1513 pub async fn put_batch_typed_with_options(
1514 &self,
1515 request: &RecordPutBatchRequest,
1516 options: &TraceDbRequestOptions,
1517 ) -> TraceDbClientResult<PutBatchResponse> {
1518 self.post_typed_with_options("/v1/records/put-batch", request, options)
1519 .await
1520 }
1521
1522 pub async fn patch_typed(
1523 &self,
1524 request: &RecordPatchRequest,
1525 ) -> TraceDbClientResult<EpochResponse> {
1526 self.post_typed("/v1/records/patch", request).await
1527 }
1528
1529 pub async fn patch_typed_with_options(
1530 &self,
1531 request: &RecordPatchRequest,
1532 options: &TraceDbRequestOptions,
1533 ) -> TraceDbClientResult<EpochResponse> {
1534 self.post_typed_with_options("/v1/records/patch", request, options)
1535 .await
1536 }
1537
1538 pub async fn delete_typed(
1539 &self,
1540 request: &RecordDeleteRequest,
1541 ) -> TraceDbClientResult<DeleteResponse> {
1542 self.post_typed("/v1/records/delete", request).await
1543 }
1544
1545 pub async fn delete_typed_with_options(
1546 &self,
1547 request: &RecordDeleteRequest,
1548 options: &TraceDbRequestOptions,
1549 ) -> TraceDbClientResult<DeleteResponse> {
1550 self.post_typed_with_options("/v1/records/delete", request, options)
1551 .await
1552 }
1553
1554 pub async fn get_record_typed(
1555 &self,
1556 request: &RecordGetRequest,
1557 ) -> TraceDbClientResult<GetRecordResponse> {
1558 self.post_typed("/v1/records/get", request).await
1559 }
1560
1561 pub async fn scan_typed(
1562 &self,
1563 request: &RecordScanRequest,
1564 ) -> TraceDbClientResult<RecordScanOutput> {
1565 self.post_typed("/v1/records/scan", request).await
1566 }
1567
1568 pub async fn query_typed(&self, query: &HybridQuery) -> TraceDbClientResult<QueryResponse> {
1569 self.post_typed("/v1/query", query).await
1570 }
1571
1572 pub async fn traceql_typed(
1573 &self,
1574 query: impl Into<String>,
1575 ) -> TraceDbClientResult<QueryResponse> {
1576 let request = TraceQlQueryRequest::new(query);
1577 self.post_typed("/v1/traceql", &request).await
1578 }
1579
1580 pub async fn graphql_typed(
1581 &self,
1582 query: impl Into<String>,
1583 ) -> TraceDbClientResult<GraphQlResponse> {
1584 let request = GraphQlQueryRequest::new(query);
1585 self.post_typed("/v1/graphql", &request).await
1586 }
1587
1588 pub async fn bounded_graphql_typed(
1589 &self,
1590 query: impl Into<String>,
1591 ) -> TraceDbClientResult<QueryResponse> {
1592 let request = GraphQlQueryRequest::new(query);
1593 self.post_typed("/v1/graphql/bounded", &request).await
1594 }
1595
1596 pub async fn graphql_schema_typed(&self) -> TraceDbClientResult<GraphQlSchemaResponse> {
1597 self.get_typed("/v1/graphql/schema").await
1598 }
1599
1600 pub async fn explain_typed(&self, query: &HybridQuery) -> TraceDbClientResult<HybridExplain> {
1601 self.post_typed("/v1/explain", query).await
1602 }
1603
1604 pub async fn compact_typed(&self) -> TraceDbClientResult<CompactResponse> {
1605 self.post_typed("/v1/admin/compact", &json!({})).await
1606 }
1607
1608 pub async fn compact_typed_with_options(
1609 &self,
1610 options: &TraceDbRequestOptions,
1611 ) -> TraceDbClientResult<CompactResponse> {
1612 self.post_typed_with_options("/v1/admin/compact", &json!({}), options)
1613 .await
1614 }
1615
1616 pub async fn snapshot_typed(
1617 &self,
1618 request: &SnapshotRequest,
1619 ) -> TraceDbClientResult<SnapshotResponse> {
1620 self.post_typed("/v1/admin/snapshot", request).await
1621 }
1622
1623 pub async fn snapshot_typed_with_options(
1624 &self,
1625 request: &SnapshotRequest,
1626 options: &TraceDbRequestOptions,
1627 ) -> TraceDbClientResult<SnapshotResponse> {
1628 self.post_typed_with_options("/v1/admin/snapshot", request, options)
1629 .await
1630 }
1631
1632 pub async fn restore_typed(
1633 &self,
1634 request: &RestoreRequest,
1635 ) -> TraceDbClientResult<RestoreResponse> {
1636 self.post_typed("/v1/admin/restore", request).await
1637 }
1638
1639 pub async fn restore_typed_with_options(
1640 &self,
1641 request: &RestoreRequest,
1642 options: &TraceDbRequestOptions,
1643 ) -> TraceDbClientResult<RestoreResponse> {
1644 self.post_typed_with_options("/v1/admin/restore", request, options)
1645 .await
1646 }
1647
1648 async fn get_typed<T: for<'de> Deserialize<'de>>(&self, path: &str) -> TraceDbClientResult<T> {
1649 decode_typed("GET", path, self.request_json("GET", path, None).await?)
1650 }
1651
1652 async fn post_typed<B, R>(&self, path: &str, body: &B) -> TraceDbClientResult<R>
1653 where
1654 B: Serialize,
1655 R: for<'de> Deserialize<'de>,
1656 {
1657 let value = serde_json::to_value(body)?;
1658 decode_typed(
1659 "POST",
1660 path,
1661 self.request_json("POST", path, Some(&value)).await?,
1662 )
1663 }
1664
1665 async fn post_typed_with_options<B, R>(
1666 &self,
1667 path: &str,
1668 body: &B,
1669 options: &TraceDbRequestOptions,
1670 ) -> TraceDbClientResult<R>
1671 where
1672 B: Serialize,
1673 R: for<'de> Deserialize<'de>,
1674 {
1675 let value = serde_json::to_value(body)?;
1676 decode_typed(
1677 "POST",
1678 path,
1679 self.request_json_with_options("POST", path, Some(&value), options)
1680 .await?,
1681 )
1682 }
1683
1684 async fn request_json_once(
1685 &self,
1686 method: &str,
1687 path: &str,
1688 body: Option<&Value>,
1689 options: &TraceDbRequestOptions,
1690 ) -> TraceDbClientResult<Value> {
1691 let target = HttpTarget::parse(&self.inner.config.url)?;
1692 let request_path = target.path(path);
1693 let body_bytes = self.inner.request_body_bytes(body)?;
1694 let timeout = self.inner.config.request_timeout();
1695 let method_value = reqwest::Method::from_bytes(method.as_bytes()).map_err(|error| {
1696 TraceDbClientError::InvalidRequest {
1697 method: method.to_string(),
1698 path: request_path.clone(),
1699 message: format!("invalid HTTP method: {error}"),
1700 }
1701 })?;
1702 let url = format!("http://{}{}", target.authority, request_path);
1703 let mut request = self
1704 .http_client
1705 .request(method_value, url)
1706 .timeout(timeout)
1707 .header(reqwest::header::ACCEPT, "application/json")
1708 .header(
1709 reqwest::header::CONTENT_LENGTH,
1710 body_bytes.len().to_string(),
1711 )
1712 .header("User-Agent", format!("{NAME}/{VERSION}"));
1713 if !self.inner.config.token.is_empty() {
1714 request = request.bearer_auth(&self.inner.config.token);
1715 }
1716 if let Some(key) = validated_idempotency_key(method, &request_path, options)? {
1717 request = request.header("Idempotency-Key", key);
1718 }
1719 for (name, value) in self.inner.actor_header_pairs(options)? {
1720 request = request.header(name, value);
1721 }
1722 if !body_bytes.is_empty() {
1723 request = request.header(reqwest::header::CONTENT_TYPE, "application/json");
1724 }
1725 let response = request
1726 .body(body_bytes)
1727 .send()
1728 .await
1729 .map_err(|error| map_reqwest_error(method, &request_path, timeout, error))?;
1730 let status = response.status().as_u16();
1731 let bytes = response
1732 .bytes()
1733 .await
1734 .map_err(|error| map_reqwest_error(method, &request_path, timeout, error))?;
1735 if !(200..300).contains(&status) {
1736 return Err(TraceDbClientError::HttpStatus {
1737 method: method.to_string(),
1738 path: request_path,
1739 status,
1740 body: String::from_utf8_lossy(&bytes).to_string(),
1741 });
1742 }
1743 if bytes.iter().all(u8::is_ascii_whitespace) || bytes.is_empty() {
1744 return Ok(Value::Null);
1745 }
1746 serde_json::from_slice(&bytes).map_err(|error| TraceDbClientError::InvalidResponse {
1747 method: method.to_string(),
1748 path: request_path,
1749 message: format!("invalid JSON body: {error}"),
1750 })
1751 }
1752}
1753
1754#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1755pub struct ReadyResponse {
1756 #[serde(default)]
1757 pub ok: Option<bool>,
1758 pub ready: bool,
1759 #[serde(default)]
1760 pub service: Option<String>,
1761 #[serde(default)]
1762 pub latest_epoch: Option<u64>,
1763 #[serde(default)]
1764 pub durable_epoch: Option<u64>,
1765 #[serde(default)]
1766 pub recovery_state: Option<String>,
1767 #[serde(default)]
1768 pub engine_url: Option<String>,
1769 #[serde(default)]
1770 pub engine_health_checked: Option<bool>,
1771 #[serde(default)]
1772 pub engine_status_code: Option<u16>,
1773 #[serde(default)]
1774 pub catalog_databases: Option<u64>,
1775 #[serde(default)]
1776 pub metered_requests: Option<u64>,
1777 #[serde(default)]
1778 pub error: Option<String>,
1779}
1780
1781#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1782pub struct HealthResponse {
1783 pub ok: bool,
1784 #[serde(default)]
1785 pub service: Option<String>,
1786 #[serde(default)]
1787 pub engine_url: Option<String>,
1788 #[serde(default)]
1789 pub catalog_databases: Option<u64>,
1790 #[serde(default)]
1791 pub metered_requests: Option<u64>,
1792}
1793
1794#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1795pub struct DatabaseSummary {
1796 pub database_id: String,
1797 #[serde(default)]
1798 pub org_id: Option<String>,
1799 #[serde(default)]
1800 pub project_id: Option<String>,
1801 #[serde(default)]
1802 pub name: Option<String>,
1803 #[serde(default)]
1804 pub region: Option<String>,
1805 #[serde(default)]
1806 pub endpoint: Option<String>,
1807}
1808
1809#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1810pub struct DatabasesResponse {
1811 pub databases: Vec<DatabaseSummary>,
1812 #[serde(default)]
1813 pub gateway: Option<bool>,
1814 #[serde(default)]
1815 pub mode: Option<String>,
1816}
1817
1818#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1819pub struct BranchSummary {
1820 pub branch_id: String,
1821 #[serde(default)]
1822 pub database_id: Option<String>,
1823 #[serde(default)]
1824 pub parent_branch_id: Option<String>,
1825 #[serde(default)]
1826 pub state: Option<String>,
1827 #[serde(default)]
1828 pub endpoint: Option<String>,
1829 #[serde(default)]
1830 pub latest_epoch: Option<u64>,
1831}
1832
1833#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1834pub struct BranchesResponse {
1835 pub branches: Vec<BranchSummary>,
1836 #[serde(default)]
1837 pub gateway: Option<bool>,
1838}
1839
1840#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1841pub struct MetricsResponse {
1842 #[serde(default)]
1843 pub gateway: Option<bool>,
1844 #[serde(default)]
1845 pub service: Option<String>,
1846 #[serde(default)]
1847 pub latest_epoch: Option<u64>,
1848 #[serde(default)]
1849 pub durable_epoch: Option<u64>,
1850 #[serde(default)]
1851 pub segment_count: Option<usize>,
1852 #[serde(default)]
1853 pub index_count: Option<usize>,
1854 #[serde(default)]
1855 pub module_count: Option<usize>,
1856 #[serde(default)]
1857 pub schema_count: Option<usize>,
1858 #[serde(default)]
1859 pub recovery_state: Option<String>,
1860 #[serde(default)]
1861 pub requests: Option<u64>,
1862 #[serde(default)]
1863 pub rate_limit_enabled: Option<bool>,
1864 #[serde(default)]
1865 pub rate_limit_requests: Option<u64>,
1866}
1867
1868#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1869pub struct ErrorResponse {
1870 pub error: String,
1871 #[serde(default, skip_serializing_if = "Option::is_none")]
1872 pub code: Option<String>,
1873}
1874
1875#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1876pub struct EpochResponse {
1877 pub epoch: u64,
1878}
1879
1880#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1881pub struct PutBatchResponse {
1882 pub epoch: u64,
1883 pub record_count: usize,
1884 #[serde(default)]
1885 pub write_timing: Option<WritePathTiming>,
1886}
1887
1888#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1889pub struct DeleteResponse {
1890 pub deleted: bool,
1891 pub epoch: u64,
1892}
1893
1894#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1895pub struct GetRecordResponse {
1896 pub record: Option<RecordOutput>,
1897}
1898
1899#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1900pub struct QueryResponse {
1901 pub results: Vec<HybridQueryRow>,
1902 #[serde(default)]
1903 pub explain: Option<HybridExplain>,
1904 #[serde(default, skip_serializing_if = "Option::is_none")]
1905 pub next_cursor: Option<String>,
1906}
1907
1908#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1909pub struct TraceQlQueryRequest {
1910 pub query: String,
1911}
1912
1913impl TraceQlQueryRequest {
1914 pub fn new(query: impl Into<String>) -> Self {
1915 Self {
1916 query: query.into(),
1917 }
1918 }
1919
1920 pub fn command<T: Serialize>(
1921 command: impl AsRef<str>,
1922 payload: &T,
1923 ) -> TraceDbClientResult<Self> {
1924 Ok(Self {
1925 query: format!("{} {}", command.as_ref(), serde_json::to_string(payload)?),
1926 })
1927 }
1928
1929 pub fn schema_apply(schema: &TableSchema) -> TraceDbClientResult<Self> {
1930 Self::command("SCHEMA APPLY", schema)
1931 }
1932
1933 pub fn put(record: &RecordInput) -> TraceDbClientResult<Self> {
1934 Self::command("RECORD PUT", record)
1935 }
1936
1937 pub fn batch(request: &RecordPutBatchRequest) -> TraceDbClientResult<Self> {
1938 Self::command("RECORD BATCH", request)
1939 }
1940
1941 pub fn patch(request: &RecordPatchRequest) -> TraceDbClientResult<Self> {
1942 Self::command("RECORD PATCH", request)
1943 }
1944
1945 pub fn delete(request: &RecordDeleteRequest) -> TraceDbClientResult<Self> {
1946 Self::command("RECORD DELETE", request)
1947 }
1948
1949 pub fn get(request: &RecordGetRequest) -> TraceDbClientResult<Self> {
1950 Self::command("RECORD GET", request)
1951 }
1952
1953 pub fn scan(request: &RecordScanRequest) -> TraceDbClientResult<Self> {
1954 Self::command("RECORD SCAN", request)
1955 }
1956
1957 pub fn query(query: &HybridQuery) -> TraceDbClientResult<Self> {
1958 Self::command("QUERY", query)
1959 }
1960
1961 pub fn explain(query: &HybridQuery) -> TraceDbClientResult<Self> {
1962 Self::command("EXPLAIN", query)
1963 }
1964
1965 pub fn jobs_list() -> Self {
1966 Self {
1967 query: "JOBS LIST".to_string(),
1968 }
1969 }
1970}
1971
1972#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1973pub struct GraphQlQueryRequest {
1974 pub query: String,
1975 #[serde(default, skip_serializing_if = "Value::is_null")]
1976 pub variables: Value,
1977 #[serde(
1978 default,
1979 skip_serializing_if = "Option::is_none",
1980 alias = "operationName"
1981 )]
1982 pub operation_name: Option<String>,
1983}
1984
1985impl GraphQlQueryRequest {
1986 pub fn new(query: impl Into<String>) -> Self {
1987 Self {
1988 query: query.into(),
1989 variables: Value::Null,
1990 operation_name: None,
1991 }
1992 }
1993
1994 pub fn with_variables(mut self, variables: Value) -> Self {
1995 self.variables = variables;
1996 self
1997 }
1998
1999 pub fn with_operation_name(mut self, operation_name: impl Into<String>) -> Self {
2000 self.operation_name = Some(operation_name.into());
2001 self
2002 }
2003}
2004
2005#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
2006pub struct GraphQlResponse {
2007 #[serde(default)]
2008 pub data: Value,
2009 #[serde(default, skip_serializing_if = "Vec::is_empty")]
2010 pub errors: Vec<GraphQlError>,
2011}
2012
2013#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
2014pub struct GraphQlError {
2015 pub message: String,
2016 #[serde(default, skip_serializing_if = "Option::is_none")]
2017 pub path: Option<Value>,
2018 #[serde(default, skip_serializing_if = "Option::is_none")]
2019 pub extensions: Option<Value>,
2020}
2021
2022#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
2023pub struct GraphQlSchemaResponse {
2024 pub adapter: String,
2025 pub schema: String,
2026 pub tables: Vec<String>,
2027 #[serde(alias = "execution_caveat")]
2028 pub execution: String,
2029}
2030
2031#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
2032pub struct CompactResponse {
2033 pub compacted: bool,
2034}
2035
2036#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
2037pub struct SnapshotRequest {
2038 pub target: String,
2039}
2040
2041impl SnapshotRequest {
2042 pub fn new(target: impl Into<String>) -> Self {
2043 Self {
2044 target: target.into(),
2045 }
2046 }
2047}
2048
2049#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
2050pub struct SnapshotResponse {
2051 pub snapshot: bool,
2052 pub target: String,
2053}
2054
2055#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
2056pub struct RestoreRequest {
2057 pub source: String,
2058 pub target: String,
2059 #[serde(skip_serializing_if = "Option::is_none")]
2060 pub verify_record: Option<RecordGetRequest>,
2061}
2062
2063impl RestoreRequest {
2064 pub fn new(source: impl Into<String>, target: impl Into<String>) -> Self {
2065 Self {
2066 source: source.into(),
2067 target: target.into(),
2068 verify_record: None,
2069 }
2070 }
2071
2072 pub fn verify_record(mut self, request: RecordGetRequest) -> Self {
2073 self.verify_record = Some(request);
2074 self
2075 }
2076}
2077
2078#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
2079pub struct RestoreResponse {
2080 pub restored: bool,
2081 pub source: String,
2082 pub target: String,
2083 #[serde(default)]
2084 pub verification: Option<RestoreVerification>,
2085}
2086
2087#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
2088pub struct RestoreVerification {
2089 pub status: String,
2090 pub record_visible: bool,
2091 #[serde(default)]
2092 pub request: Option<RecordGetRequest>,
2093 #[serde(default)]
2094 pub record: Option<RecordOutput>,
2095}
2096
2097#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
2098pub struct AdminJob {
2099 pub queue: String,
2100 pub state: String,
2101}
2102
2103#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
2104pub struct JobsResponse {
2105 pub jobs: Vec<AdminJob>,
2106}
2107
/// Components of a parsed `http://` base URL.
#[derive(Debug, Clone, PartialEq, Eq)]
struct HttpTarget {
    /// The `host[:port]` part exactly as written in the URL.
    authority: String,
    /// Host portion of the authority, used for address resolution.
    host: String,
    /// Explicit port from the authority, or 80 when none was given.
    port: u16,
    /// Path portion of the URL including its leading `/`, or empty when the
    /// URL has no path.
    base_path: String,
}
2115
2116impl HttpTarget {
2117 fn parse(url: &str) -> TraceDbClientResult<Self> {
2118 let without_scheme = url
2119 .strip_prefix("http://")
2120 .ok_or_else(|| TraceDbClientError::InvalidUrl(url.to_string()))?;
2121 let (authority, base_path) = without_scheme
2122 .split_once('/')
2123 .map(|(authority, path)| (authority, format!("/{path}")))
2124 .unwrap_or((without_scheme, String::new()));
2125 if authority.is_empty() {
2126 return Err(TraceDbClientError::InvalidUrl(url.to_string()));
2127 }
2128 let (host, port) = if let Some((host, port)) = authority.rsplit_once(':') {
2129 let parsed_port = port
2130 .parse::<u16>()
2131 .map_err(|_| TraceDbClientError::InvalidUrl(url.to_string()))?;
2132 (host.to_string(), parsed_port)
2133 } else {
2134 (authority.to_string(), 80)
2135 };
2136 if host.is_empty() {
2137 return Err(TraceDbClientError::InvalidUrl(url.to_string()));
2138 }
2139 Ok(Self {
2140 authority: authority.to_string(),
2141 host,
2142 port,
2143 base_path,
2144 })
2145 }
2146
2147 fn connect(
2148 &self,
2149 method: &str,
2150 path: &str,
2151 timeout: Duration,
2152 ) -> TraceDbClientResult<TcpStream> {
2153 let socket_addr = self.socket_addr(method, path, timeout)?;
2154 let stream = TcpStream::connect_timeout(&socket_addr, timeout)
2155 .map_err(|error| map_request_io_error(method, path, timeout, error))?;
2156 stream
2157 .set_read_timeout(Some(timeout))
2158 .map_err(|error| map_request_io_error(method, path, timeout, error))?;
2159 stream
2160 .set_write_timeout(Some(timeout))
2161 .map_err(|error| map_request_io_error(method, path, timeout, error))?;
2162 Ok(stream)
2163 }
2164
2165 fn socket_addr(
2166 &self,
2167 method: &str,
2168 path: &str,
2169 timeout: Duration,
2170 ) -> TraceDbClientResult<SocketAddr> {
2171 (self.host.as_str(), self.port)
2172 .to_socket_addrs()
2173 .map_err(|error| map_request_io_error(method, path, timeout, error))?
2174 .next()
2175 .ok_or_else(|| TraceDbClientError::InvalidUrl(self.authority.clone()))
2176 }
2177
2178 fn path(&self, path: &str) -> String {
2179 if self.base_path.is_empty() {
2180 path.to_string()
2181 } else {
2182 format!(
2183 "{}/{}",
2184 self.base_path.trim_end_matches('/'),
2185 path.trim_start_matches('/')
2186 )
2187 }
2188 }
2189}
2190
/// Returns the default request timeout in milliseconds (30 seconds).
fn default_request_timeout_ms() -> u64 {
    const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30 * 1_000;
    DEFAULT_REQUEST_TIMEOUT_MS
}
2194
/// Converts `timeout` to whole milliseconds, limited to `1..=u64::MAX`.
///
/// Durations shorter than one millisecond report `1`; durations whose
/// millisecond count does not fit in a `u64` report `u64::MAX`.
fn timeout_ms(timeout: Duration) -> u64 {
    match timeout.as_millis() {
        0 => 1,
        millis if millis >= u128::from(u64::MAX) => u64::MAX,
        millis => millis as u64,
    }
}
2198
/// Computes the pause before retry number `attempt`.
///
/// The nominal delay is `100ms * 2^attempt` (exponent capped at 16), capped
/// at 5 seconds. A jitter offset derived from the sub-second part of the
/// system clock moves the result within roughly ±25% of the nominal delay,
/// and the final value is limited to `1..=5000` milliseconds.
fn retry_backoff_delay(attempt: u8) -> Duration {
    const MAX_DELAY_MS: u64 = 5_000;

    let exponent = u32::from(attempt).min(16);
    let nominal_ms = 100_u64
        .saturating_mul(1_u64 << exponent)
        .min(MAX_DELAY_MS);
    let quarter = nominal_ms / 4;
    // Offsets cover 0..=2*quarter so the delay spans nominal ± quarter.
    let span = quarter.saturating_mul(2).saturating_add(1);
    let clock_nanos = u64::from(
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .subsec_nanos(),
    );
    let offset = clock_nanos % span;
    let jittered_ms = nominal_ms.saturating_sub(quarter).saturating_add(offset);
    Duration::from_millis(jittered_ms.clamp(1, MAX_DELAY_MS))
}
2215
2216fn required_env(variable: &str, value: Option<String>) -> TraceDbClientResult<String> {
2217 match value {
2218 Some(value) if !value.trim().is_empty() => Ok(value),
2219 _ => Err(TraceDbClientError::InvalidConfig {
2220 variable: variable.to_string(),
2221 message: format!("{variable} is required"),
2222 }),
2223 }
2224}
2225
2226fn optional_env(variable: &str, value: Option<String>) -> TraceDbClientResult<Option<String>> {
2227 match value {
2228 Some(value) if value.trim().is_empty() => Err(TraceDbClientError::InvalidConfig {
2229 variable: variable.to_string(),
2230 message: format!("{variable} must not be empty when set"),
2231 }),
2232 Some(value) => Ok(Some(value)),
2233 None => Ok(None),
2234 }
2235}
2236
2237fn optional_positive_u64_env(
2238 variable: &str,
2239 value: Option<String>,
2240) -> TraceDbClientResult<Option<u64>> {
2241 let Some(value) = optional_env(variable, value)? else {
2242 return Ok(None);
2243 };
2244 let parsed = value
2245 .parse::<u64>()
2246 .map_err(|_| TraceDbClientError::InvalidConfig {
2247 variable: variable.to_string(),
2248 message: format!("{variable} must be a positive integer"),
2249 })?;
2250 if parsed == 0 {
2251 return Err(TraceDbClientError::InvalidConfig {
2252 variable: variable.to_string(),
2253 message: format!("{variable} must be greater than 0"),
2254 });
2255 }
2256 Ok(Some(parsed))
2257}
2258
2259fn optional_u8_env(variable: &str, value: Option<String>) -> TraceDbClientResult<Option<u8>> {
2260 let Some(value) = optional_env(variable, value)? else {
2261 return Ok(None);
2262 };
2263 value
2264 .parse::<u8>()
2265 .map(Some)
2266 .map_err(|_| TraceDbClientError::InvalidConfig {
2267 variable: variable.to_string(),
2268 message: format!("{variable} must be an integer from 0 to 255"),
2269 })
2270}
2271
2272fn idempotency_key_header(
2273 method: &str,
2274 path: &str,
2275 options: &TraceDbRequestOptions,
2276) -> TraceDbClientResult<String> {
2277 let Some(key) = validated_idempotency_key(method, path, options)? else {
2278 return Ok(String::new());
2279 };
2280 Ok(format!("Idempotency-Key: {key}\r\n"))
2281}
2282
2283fn validated_idempotency_key<'a>(
2284 method: &str,
2285 path: &str,
2286 options: &'a TraceDbRequestOptions,
2287) -> TraceDbClientResult<Option<&'a str>> {
2288 let Some(key) = options.idempotency_key.as_deref() else {
2289 return Ok(None);
2290 };
2291 if key.is_empty() || key.contains('\r') || key.contains('\n') {
2292 return Err(TraceDbClientError::InvalidRequest {
2293 method: method.to_string(),
2294 path: path.to_string(),
2295 message: "idempotency key must be non-empty and must not contain CR or LF".to_string(),
2296 });
2297 }
2298 Ok(Some(key))
2299}
2300
2301fn header_line(name: &str, value: &str) -> TraceDbClientResult<String> {
2302 validate_header_value(name, value)?;
2303 Ok(format!("{name}: {value}\r\n"))
2304}
2305
2306fn validate_header_value(name: &str, value: &str) -> TraceDbClientResult<()> {
2307 if value.contains('\r') || value.contains('\n') {
2308 return Err(TraceDbClientError::InvalidRequest {
2309 method: "CONFIG".to_string(),
2310 path: name.to_string(),
2311 message: "header values must not contain CR or LF".to_string(),
2312 });
2313 }
2314 Ok(())
2315}
2316
2317fn map_request_io_error(
2318 method: &str,
2319 path: &str,
2320 timeout: Duration,
2321 error: std::io::Error,
2322) -> TraceDbClientError {
2323 if matches!(
2324 error.kind(),
2325 std::io::ErrorKind::TimedOut | std::io::ErrorKind::WouldBlock
2326 ) {
2327 TraceDbClientError::Timeout {
2328 method: method.to_string(),
2329 path: path.to_string(),
2330 timeout_ms: timeout_ms(timeout),
2331 }
2332 } else {
2333 TraceDbClientError::Io(error.to_string())
2334 }
2335}
2336
2337fn map_reqwest_error(
2338 method: &str,
2339 path: &str,
2340 timeout: Duration,
2341 error: reqwest::Error,
2342) -> TraceDbClientError {
2343 if error.is_timeout() {
2344 TraceDbClientError::Timeout {
2345 method: method.to_string(),
2346 path: path.to_string(),
2347 timeout_ms: timeout_ms(timeout),
2348 }
2349 } else {
2350 TraceDbClientError::Io(error.to_string())
2351 }
2352}
2353
2354fn is_retry_safe_request(method: &str, path: &str, body: Option<&Value>) -> bool {
2355 match (method, strip_query(path)) {
2356 ("GET", "/v1/health" | "/v1/ready" | "/v1/graphql/schema")
2357 | (
2358 "POST",
2359 "/v1/records/get"
2360 | "/v1/records/scan"
2361 | "/v1/query"
2362 | "/v1/graphql/bounded"
2363 | "/v1/explain",
2364 ) => true,
2365 ("POST", "/v1/traceql") => traceql_body_is_read_only(body),
2366 ("POST", "/v1/graphql") => graphql_body_is_read_only(body),
2367 _ => false,
2368 }
2369}
2370
2371fn traceql_body_is_read_only(body: Option<&Value>) -> bool {
2372 let Some(query) = body_query(body) else {
2373 return false;
2374 };
2375 let Some(command) = traceql_command(query) else {
2376 return true;
2377 };
2378 matches!(
2379 command,
2380 "RECORD GET" | "GET" | "RECORD SCAN" | "SCAN" | "QUERY" | "EXPLAIN" | "JOBS LIST"
2381 )
2382}
2383
/// Identifies the command keyword that `input` starts with.
///
/// Leading whitespace is skipped and matching is ASCII case-insensitive.
/// A command matches when it is the entire statement or is followed by an
/// ASCII whitespace byte. Two-word commands are listed before their
/// one-word forms, so the first match in table order is returned. Returns
/// `None` when no command matches.
fn traceql_command(input: &str) -> Option<&'static str> {
    const COMMANDS: &[&str] = &[
        "SCHEMA APPLY",
        "RECORD PUT",
        "RECORD BATCH",
        "RECORD PATCH",
        "RECORD DELETE",
        "RECORD GET",
        "RECORD SCAN",
        "ADMIN COMPACT",
        "ADMIN SNAPSHOT",
        "ADMIN RESTORE",
        "JOBS LIST",
        "JOBS RUN",
        "EXPLAIN",
        "QUERY",
        "PUT",
        "BATCH",
        "PATCH",
        "DELETE",
        "GET",
        "SCAN",
        "COMPACT",
        "SNAPSHOT",
        "RESTORE",
    ];

    let statement = input.trim_start();
    COMMANDS.iter().copied().find(|&command| {
        // `get` also rejects statements that are too short or that would be
        // cut inside a multi-byte character.
        let Some(prefix) = statement.get(..command.len()) else {
            return false;
        };
        if !prefix.eq_ignore_ascii_case(command) {
            return false;
        }
        match statement.as_bytes().get(command.len()) {
            Some(next) => next.is_ascii_whitespace(),
            None => true,
        }
    })
}
2425
2426fn graphql_body_is_read_only(body: Option<&Value>) -> bool {
2427 let Some(query) = body_query(body) else {
2428 return false;
2429 };
2430 graphql_root_field(query)
2431 .is_some_and(|field| matches!(field, "get" | "scan" | "query" | "explain" | "jobs"))
2432}
2433
2434fn graphql_root_field(query: &str) -> Option<&str> {
2435 let trimmed = query.trim_start();
2436 if word_starts_with(trimmed, "mutation") || word_starts_with(trimmed, "subscription") {
2437 return None;
2438 }
2439 let root = if word_starts_with(trimmed, "query") {
2440 trimmed.find('{').map(|index| &trimmed[index + 1..])?
2441 } else {
2442 trimmed.strip_prefix('{')?
2443 };
2444 let (name, rest) = parse_graphql_name(root)?;
2445 let rest = rest.trim_start();
2446 if let Some(rest) = rest.strip_prefix(':') {
2447 parse_graphql_name(rest).map(|(field, _)| field)
2448 } else {
2449 Some(name)
2450 }
2451}
2452
/// Splits a leading GraphQL name off `input`.
///
/// Leading whitespace is skipped. A name starts with `_` or an ASCII letter
/// and continues with `_` or ASCII alphanumerics. Returns the name and the
/// text that follows it, or `None` when no name starts there.
fn parse_graphql_name(input: &str) -> Option<(&str, &str)> {
    let text = input.trim_start();
    let starts_name = text
        .chars()
        .next()
        .is_some_and(|ch| ch == '_' || ch.is_ascii_alphabetic());
    if !starts_name {
        return None;
    }
    let name_len = text
        .find(|ch: char| !(ch == '_' || ch.is_ascii_alphanumeric()))
        .unwrap_or(text.len());
    Some(text.split_at(name_len))
}
2470
/// Reports whether `input` begins with `word` as a whole word.
///
/// The comparison is ASCII case-insensitive, and the character after the
/// prefix (if any) must not be `_` or an ASCII alphanumeric.
fn word_starts_with(input: &str, word: &str) -> bool {
    let Some(prefix) = input.get(..word.len()) else {
        return false;
    };
    if !prefix.eq_ignore_ascii_case(word) {
        return false;
    }
    match input[word.len()..].chars().next() {
        Some(next) => next != '_' && !next.is_ascii_alphanumeric(),
        None => true,
    }
}
2480
2481fn body_query(body: Option<&Value>) -> Option<&str> {
2482 body?.get("query")?.as_str()
2483}
2484
2485fn is_idempotent_retry_request(method: &str, path: &str) -> bool {
2486 matches!(
2487 (method, strip_query(path)),
2488 ("POST", "/v1/schema/apply")
2489 | ("POST", "/v1/insert")
2490 | ("POST", "/v1/records/put")
2491 | ("POST", "/v1/records/put-batch")
2492 | ("POST", "/v1/records/patch")
2493 | ("POST", "/v1/records/delete")
2494 | ("POST", "/v1/admin/compact")
2495 | ("POST", "/v1/admin/snapshot")
2496 | ("POST", "/v1/admin/restore")
2497 | ("POST", "/v1/graphql")
2498 | ("POST", "/v1/traceql")
2499 )
2500}
2501
/// Returns the part of `path` before the first `?`, or all of `path` when
/// it has no query string.
fn strip_query(path: &str) -> &str {
    match path.find('?') {
        Some(query_start) => &path[..query_start],
        None => path,
    }
}
2505
2506fn is_retryable_error(error: &TraceDbClientError) -> bool {
2507 matches!(
2508 error,
2509 TraceDbClientError::Io(_) | TraceDbClientError::Timeout { .. }
2510 ) || matches!(error, TraceDbClientError::HttpStatus { status, .. } if *status >= 500)
2511}
2512
2513fn parse_response(method: &str, path: &str, response: &str) -> TraceDbClientResult<Value> {
2514 let (head, body) =
2515 response
2516 .split_once("\r\n\r\n")
2517 .ok_or_else(|| TraceDbClientError::InvalidResponse {
2518 method: method.to_string(),
2519 path: path.to_string(),
2520 message: "missing header boundary".to_string(),
2521 })?;
2522 let status_line = head
2523 .lines()
2524 .next()
2525 .ok_or_else(|| TraceDbClientError::InvalidResponse {
2526 method: method.to_string(),
2527 path: path.to_string(),
2528 message: "missing status line".to_string(),
2529 })?;
2530 let status = status_line
2531 .split_whitespace()
2532 .nth(1)
2533 .ok_or_else(|| TraceDbClientError::InvalidResponse {
2534 method: method.to_string(),
2535 path: path.to_string(),
2536 message: "missing status code".to_string(),
2537 })?
2538 .parse::<u16>()
2539 .map_err(|_| TraceDbClientError::InvalidResponse {
2540 method: method.to_string(),
2541 path: path.to_string(),
2542 message: status_line.to_string(),
2543 })?;
2544 if !(200..300).contains(&status) {
2545 return Err(TraceDbClientError::HttpStatus {
2546 method: method.to_string(),
2547 path: path.to_string(),
2548 status,
2549 body: body.to_string(),
2550 });
2551 }
2552 if body.trim().is_empty() {
2553 return Ok(Value::Null);
2554 }
2555 serde_json::from_str(body).map_err(|error| TraceDbClientError::InvalidResponse {
2556 method: method.to_string(),
2557 path: path.to_string(),
2558 message: format!("invalid JSON body: {error}"),
2559 })
2560}
2561
2562fn decode_typed<T: for<'de> Deserialize<'de>>(
2563 method: &str,
2564 path: &str,
2565 value: Value,
2566) -> TraceDbClientResult<T> {
2567 serde_json::from_value(value).map_err(|error| TraceDbClientError::InvalidResponse {
2568 method: method.to_string(),
2569 path: path.to_string(),
2570 message: format!("invalid JSON shape: {error}"),
2571 })
2572}
2573
2574#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
2575pub struct TableRecordInput {
2576 pub id: String,
2577 pub fields: Map<String, Value>,
2578}
2579
2580impl TableRecordInput {
2581 pub fn new(id: impl Into<String>, fields: Map<String, Value>) -> Self {
2582 Self {
2583 id: id.into(),
2584 fields,
2585 }
2586 }
2587}
2588
2589#[derive(Clone, Debug)]
2590pub struct QueryBuilder {
2592 client_config: Option<TraceDbClientConfig>,
2593 table: String,
2594 tenant_id: Option<String>,
2595 text_field: Option<String>,
2596 text_query: Option<String>,
2597 vector_field: Option<String>,
2598 vector: Option<Vec<f32>>,
2599 scalar_eq: Map<String, Value>,
2600 freshness: FeatureFreshnessMode,
2601 limit: usize,
2602 cursor: Option<String>,
2603 explain: bool,
2604}
2605
2606pub type TableHandle = QueryBuilder;
2607
2608impl QueryBuilder {
2609 pub fn tenant(mut self, tenant_id: impl Into<String>) -> Self {
2610 self.tenant_id = Some(tenant_id.into());
2611 self
2612 }
2613
2614 pub fn where_eq(mut self, field: impl Into<String>, value: impl Into<Value>) -> Self {
2615 self.scalar_eq.insert(field.into(), value.into());
2616 self
2617 }
2618
2619 pub fn match_text(mut self, field: impl Into<String>, query: impl Into<String>) -> Self {
2620 self.text_field = Some(field.into());
2621 self.text_query = Some(query.into());
2622 self
2623 }
2624
2625 pub fn near(mut self, field: impl Into<String>, vector: Vec<f32>) -> Self {
2626 self.vector_field = Some(field.into());
2627 self.vector = Some(vector);
2628 self
2629 }
2630
2631 pub fn freshness(mut self, freshness: FeatureFreshnessMode) -> Self {
2632 self.freshness = freshness;
2633 self
2634 }
2635
2636 pub fn limit(mut self, limit: usize) -> Self {
2637 self.limit = limit;
2638 self
2639 }
2640
2641 pub fn cursor(mut self, cursor: impl Into<String>) -> Self {
2642 self.cursor = Some(cursor.into());
2643 self
2644 }
2645
2646 pub fn with_explain(mut self) -> Self {
2647 self.explain = true;
2648 self
2649 }
2650
2651 pub fn query(&self) -> Self {
2652 self.clone()
2653 }
2654
2655 pub fn without_explain(mut self) -> Self {
2656 self.explain = false;
2657 self
2658 }
2659
2660 pub fn insert(
2661 &self,
2662 id: impl Into<String>,
2663 fields: Map<String, Value>,
2664 ) -> TraceDbClientResult<EpochResponse> {
2665 let options = TraceDbRequestOptions::default();
2666 self.insert_with_options(id, fields, &options)
2667 }
2668
2669 pub fn insert_with_options(
2670 &self,
2671 id: impl Into<String>,
2672 fields: Map<String, Value>,
2673 options: &TraceDbRequestOptions,
2674 ) -> TraceDbClientResult<EpochResponse> {
2675 let path = "/v1/records/put";
2676 let tenant_id = self.required_tenant_id("POST", path)?;
2677 let record = self.record_input(TableRecordInput::new(id, fields), &tenant_id);
2678 self.client("POST", path)?
2679 .put_typed_with_options(&record, options)
2680 }
2681
2682 pub fn insert_batch(
2683 &self,
2684 records: Vec<TableRecordInput>,
2685 ) -> TraceDbClientResult<PutBatchResponse> {
2686 let options = TraceDbRequestOptions::default();
2687 self.insert_batch_with_options(records, &options)
2688 }
2689
2690 pub fn insert_batch_with_options(
2691 &self,
2692 records: Vec<TableRecordInput>,
2693 options: &TraceDbRequestOptions,
2694 ) -> TraceDbClientResult<PutBatchResponse> {
2695 let path = "/v1/records/put-batch";
2696 let tenant_id = self.required_tenant_id("POST", path)?;
2697 let records = records
2698 .into_iter()
2699 .map(|record| self.record_input(record, &tenant_id))
2700 .collect();
2701 let request = RecordPutBatchRequest::new(records);
2702 self.client("POST", path)?
2703 .put_batch_typed_with_options(&request, options)
2704 }
2705
2706 pub fn insert_rows(
2707 &self,
2708 rows: Vec<Map<String, Value>>,
2709 ) -> TraceDbClientResult<PutBatchResponse> {
2710 let options = TraceDbRequestOptions::default();
2711 self.insert_rows_with_id_field_and_options(rows, "id", &options)
2712 }
2713
2714 pub fn insert_rows_with_options(
2715 &self,
2716 rows: Vec<Map<String, Value>>,
2717 options: &TraceDbRequestOptions,
2718 ) -> TraceDbClientResult<PutBatchResponse> {
2719 self.insert_rows_with_id_field_and_options(rows, "id", options)
2720 }
2721
2722 pub fn insert_rows_with_id_field(
2723 &self,
2724 rows: Vec<Map<String, Value>>,
2725 id_field: impl Into<String>,
2726 ) -> TraceDbClientResult<PutBatchResponse> {
2727 let options = TraceDbRequestOptions::default();
2728 self.insert_rows_with_id_field_and_options(rows, id_field, &options)
2729 }
2730
2731 pub fn insert_rows_with_id_field_and_options(
2732 &self,
2733 rows: Vec<Map<String, Value>>,
2734 id_field: impl Into<String>,
2735 options: &TraceDbRequestOptions,
2736 ) -> TraceDbClientResult<PutBatchResponse> {
2737 let path = "/v1/records/put-batch";
2738 let id_field = id_field.into();
2739 if id_field.is_empty() {
2740 return Err(TraceDbClientError::InvalidRequest {
2741 method: "POST".to_string(),
2742 path: path.to_string(),
2743 message: "id_field cannot be empty".to_string(),
2744 });
2745 }
2746 let tenant_id = self.required_tenant_id("POST", path)?;
2747 let records = rows
2748 .into_iter()
2749 .enumerate()
2750 .map(|(index, fields)| self.row_record_input(index, fields, &id_field, &tenant_id))
2751 .collect::<TraceDbClientResult<Vec<_>>>()?;
2752 let request = RecordPutBatchRequest::new(records);
2753 self.client("POST", path)?
2754 .put_batch_typed_with_options(&request, options)
2755 }
2756
2757 pub fn patch_record(
2758 &self,
2759 id: impl Into<String>,
2760 fields: Map<String, Value>,
2761 ) -> TraceDbClientResult<EpochResponse> {
2762 let options = TraceDbRequestOptions::default();
2763 self.patch_record_with_options(id, fields, &options)
2764 }
2765
2766 pub fn patch_record_with_options(
2767 &self,
2768 id: impl Into<String>,
2769 fields: Map<String, Value>,
2770 options: &TraceDbRequestOptions,
2771 ) -> TraceDbClientResult<EpochResponse> {
2772 let path = "/v1/records/patch";
2773 let request = RecordPatchRequest::new(
2774 self.table.clone(),
2775 self.required_tenant_id("POST", path)?,
2776 id,
2777 fields,
2778 );
2779 self.client("POST", path)?
2780 .patch_typed_with_options(&request, options)
2781 }
2782
2783 pub fn get_record(&self, id: impl Into<String>) -> TraceDbClientResult<GetRecordResponse> {
2784 let path = "/v1/records/get";
2785 let request = RecordGetRequest::new(
2786 self.table.clone(),
2787 self.required_tenant_id("POST", path)?,
2788 id,
2789 );
2790 self.client("POST", path)?.get_record_typed(&request)
2791 }
2792
2793 pub fn scan_typed(&self) -> TraceDbClientResult<RecordScanOutput> {
2794 let path = "/v1/records/scan";
2795 let request =
2796 RecordScanRequest::new(self.table.clone(), self.required_tenant_id("POST", path)?)
2797 .limit(self.limit);
2798 let request = if let Some(cursor) = &self.cursor {
2799 request.cursor(cursor.clone())
2800 } else {
2801 request
2802 };
2803 self.client("POST", path)?.scan_typed(&request)
2804 }
2805
2806 pub fn delete_record(&self, id: impl Into<String>) -> TraceDbClientResult<DeleteResponse> {
2807 let options = TraceDbRequestOptions::default();
2808 self.delete_record_with_options(id, &options)
2809 }
2810
2811 pub fn delete_record_with_options(
2812 &self,
2813 id: impl Into<String>,
2814 options: &TraceDbRequestOptions,
2815 ) -> TraceDbClientResult<DeleteResponse> {
2816 let path = "/v1/records/delete";
2817 let request = RecordDeleteRequest::new(
2818 self.table.clone(),
2819 self.required_tenant_id("POST", path)?,
2820 id,
2821 );
2822 self.client("POST", path)?
2823 .delete_typed_with_options(&request, options)
2824 }
2825
2826 pub fn all(self) -> TraceDbClientResult<QueryResponse> {
2827 let path = "/v1/query";
2828 let client = self.client("POST", path)?;
2829 let query = self.into_hybrid_query(path)?;
2830 client.query_typed(&query)
2831 }
2832
2833 pub fn explain_plan(self) -> TraceDbClientResult<HybridExplain> {
2834 let path = "/v1/explain";
2835 let client = self.client("POST", path)?;
2836 let query = self.into_hybrid_query(path)?;
2837 client.explain_typed(&query)
2838 }
2839
2840 pub fn build(self) -> TraceQueryRequest {
2841 let freshness = match self.freshness {
2842 FeatureFreshnessMode::Strict => "Strict",
2843 FeatureFreshnessMode::AllowDirty => "AllowDirty",
2844 FeatureFreshnessMode::Lazy
2845 | FeatureFreshnessMode::OnRead
2846 | FeatureFreshnessMode::AllowStale => "Lazy",
2847 };
2848 TraceQueryRequest {
2849 table: self.table,
2850 tenant_id: self.tenant_id.unwrap_or_default(),
2851 text_field: self.text_field,
2852 text: self.text_query,
2853 vector_field: self.vector_field,
2854 vector: self.vector,
2855 scalar_eq: self.scalar_eq,
2856 top_k: self.limit,
2857 cursor: self.cursor,
2858 freshness: freshness.to_string(),
2859 explain: self.explain,
2860 }
2861 }
2862
2863 pub fn put(self, id: impl Into<String>) -> RecordPutBuilder {
2864 RecordPutBuilder {
2865 table: self.table,
2866 tenant_id: self.tenant_id.unwrap_or_default(),
2867 id: id.into(),
2868 fields: Map::new(),
2869 }
2870 }
2871
2872 pub fn scan(self) -> RecordScanBuilder {
2873 RecordScanBuilder {
2874 table: self.table,
2875 tenant_id: self.tenant_id.unwrap_or_default(),
2876 limit: 100,
2877 cursor: self.cursor,
2878 }
2879 }
2880
2881 pub fn delete(self, id: impl Into<String>) -> RecordDeleteBuilder {
2882 RecordDeleteBuilder {
2883 table: self.table,
2884 tenant_id: self.tenant_id.unwrap_or_default(),
2885 id: id.into(),
2886 tombstone: "user_delete".to_string(),
2887 }
2888 }
2889
2890 fn into_hybrid_query(self, path: &str) -> TraceDbClientResult<HybridQuery> {
2891 let tenant_id = self.required_tenant_id("POST", path)?;
2892 let freshness = self.hybrid_freshness();
2893 Ok(HybridQuery {
2894 table: self.table,
2895 tenant_id,
2896 cursor: self.cursor,
2897 text_field: self.text_field,
2898 text: self.text_query,
2899 vector_field: self.vector_field,
2900 vector: self.vector,
2901 scalar_eq: self.scalar_eq,
2902 graph_seed: None,
2903 temporal_as_of: None,
2904 top_k: self.limit,
2905 freshness,
2906 explain: self.explain,
2907 })
2908 }
2909
2910 fn hybrid_freshness(&self) -> FreshnessMode {
2911 match self.freshness {
2912 FeatureFreshnessMode::Strict => FreshnessMode::Strict,
2913 FeatureFreshnessMode::AllowDirty => FreshnessMode::AllowDirty,
2914 FeatureFreshnessMode::Lazy
2915 | FeatureFreshnessMode::OnRead
2916 | FeatureFreshnessMode::AllowStale => FreshnessMode::Lazy,
2917 }
2918 }
2919
2920 fn client(&self, method: &str, path: &str) -> TraceDbClientResult<TraceDbClient> {
2921 self.client_config
2922 .clone()
2923 .map(TraceDbClient::new)
2924 .ok_or_else(|| TraceDbClientError::InvalidRequest {
2925 method: method.to_string(),
2926 path: path.to_string(),
2927 message: "table handle is not bound to a TraceDbClient".to_string(),
2928 })
2929 }
2930
2931 fn required_tenant_id(&self, method: &str, path: &str) -> TraceDbClientResult<String> {
2932 match self.tenant_id.as_ref().filter(|tenant| !tenant.is_empty()) {
2933 Some(tenant_id) => Ok(tenant_id.clone()),
2934 None => Err(TraceDbClientError::InvalidRequest {
2935 method: method.to_string(),
2936 path: path.to_string(),
2937 message: "table handle execution requires tenant(...)".to_string(),
2938 }),
2939 }
2940 }
2941
2942 fn record_input(&self, record: TableRecordInput, tenant_id: &str) -> RecordInput {
2943 let mut fields = record.fields;
2944 fields
2945 .entry("id".to_string())
2946 .or_insert_with(|| Value::String(record.id.clone()));
2947 fields
2948 .entry("tenant".to_string())
2949 .or_insert_with(|| Value::String(tenant_id.to_string()));
2950 RecordInput {
2951 table: self.table.clone(),
2952 id: record.id,
2953 tenant_id: tenant_id.to_string(),
2954 fields,
2955 }
2956 }
2957
2958 fn row_record_input(
2959 &self,
2960 index: usize,
2961 fields: Map<String, Value>,
2962 id_field: &str,
2963 tenant_id: &str,
2964 ) -> TraceDbClientResult<RecordInput> {
2965 let id = fields
2966 .get(id_field)
2967 .ok_or_else(|| TraceDbClientError::InvalidRequest {
2968 method: "POST".to_string(),
2969 path: "/v1/records/put-batch".to_string(),
2970 message: format!("row {index} missing id field '{id_field}'"),
2971 })?;
2972 let id = match id {
2973 Value::String(id) => id.clone(),
2974 value => value.to_string(),
2975 };
2976 Ok(self.record_input(TableRecordInput::new(id, fields), tenant_id))
2977 }
2978}
2979
2980#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
2981pub struct TraceQueryRequest {
2982 pub table: String,
2983 pub tenant_id: String,
2984 #[serde(default, skip_serializing_if = "Option::is_none")]
2985 pub cursor: Option<String>,
2986 pub text_field: Option<String>,
2987 pub text: Option<String>,
2988 pub vector_field: Option<String>,
2989 pub vector: Option<Vec<f32>>,
2990 #[serde(default, skip_serializing_if = "Map::is_empty")]
2991 pub scalar_eq: Map<String, Value>,
2992 pub top_k: usize,
2993 pub freshness: String,
2994 pub explain: bool,
2995}
2996
2997#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
2998pub struct TraceHttpRequest {
2999 pub path: String,
3000 pub body: Value,
3001}
3002
3003#[derive(Clone, Debug)]
3004pub struct RecordPutBuilder {
3005 table: String,
3006 tenant_id: String,
3007 id: String,
3008 fields: Map<String, Value>,
3009}
3010
3011impl RecordPutBuilder {
3012 pub fn field(mut self, key: impl Into<String>, value: Value) -> Self {
3013 self.fields.insert(key.into(), value);
3014 self
3015 }
3016
3017 pub fn fields(mut self, fields: Map<String, Value>) -> Self {
3018 self.fields.extend(fields);
3019 self
3020 }
3021
3022 pub fn build(mut self) -> TraceHttpRequest {
3023 self.fields
3024 .entry("id".to_string())
3025 .or_insert_with(|| Value::String(self.id.clone()));
3026 self.fields
3027 .entry("tenant".to_string())
3028 .or_insert_with(|| Value::String(self.tenant_id.clone()));
3029 TraceHttpRequest {
3030 path: "/v1/records/put".to_string(),
3031 body: json!({
3032 "table": self.table,
3033 "id": self.id,
3034 "tenant_id": self.tenant_id,
3035 "fields": self.fields,
3036 }),
3037 }
3038 }
3039}
3040
3041#[derive(Clone, Debug)]
3042pub struct RecordScanBuilder {
3043 table: String,
3044 tenant_id: String,
3045 limit: usize,
3046 cursor: Option<String>,
3047}
3048
3049impl RecordScanBuilder {
3050 pub fn limit(mut self, limit: usize) -> Self {
3051 self.limit = limit;
3052 self
3053 }
3054
3055 pub fn cursor(mut self, cursor: impl Into<String>) -> Self {
3056 self.cursor = Some(cursor.into());
3057 self
3058 }
3059
3060 pub fn build(self) -> TraceHttpRequest {
3061 let mut body = json!({
3062 "table": self.table,
3063 "tenant_id": self.tenant_id,
3064 "limit": self.limit,
3065 });
3066 if let Some(cursor) = self.cursor {
3067 body["cursor"] = json!(cursor);
3068 }
3069 TraceHttpRequest {
3070 path: "/v1/records/scan".to_string(),
3071 body,
3072 }
3073 }
3074}
3075
3076#[derive(Clone, Debug)]
3077pub struct RecordDeleteBuilder {
3078 table: String,
3079 tenant_id: String,
3080 id: String,
3081 tombstone: String,
3082}
3083
3084impl RecordDeleteBuilder {
3085 pub fn tombstone(mut self, tombstone: impl Into<String>) -> Self {
3086 self.tombstone = tombstone.into();
3087 self
3088 }
3089
3090 pub fn build(self) -> TraceHttpRequest {
3091 TraceHttpRequest {
3092 path: "/v1/records/delete".to_string(),
3093 body: json!({
3094 "table": self.table,
3095 "tenant_id": self.tenant_id,
3096 "id": self.id,
3097 "tombstone": self.tombstone,
3098 }),
3099 }
3100 }
3101}