1use std::collections::{BTreeMap, HashSet};
4use std::fmt;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use reqwest::header::{ACCEPT, USER_AGENT};
10use reqwest::{Client, Method, RequestBuilder, Response, StatusCode};
11use serde::Deserialize;
12use serde::de::DeserializeOwned;
13use tokio::sync::OnceCell;
14use url::Url;
15use uuid::Uuid;
16
17use super::{CatalogError, CatalogResult};
18
19const API_VERSION_SEGMENT: &str = "v1";
20const DEFAULT_CATALOG_TIMEOUT: Duration = Duration::from_secs(30);
21const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
22const DEFAULT_PAGE_SIZE: u32 = 1_000;
23const MAX_PAGE_SIZE: u32 = 10_000;
24const DEFAULT_MAX_RESPONSE_BYTES: usize = 64 * 1024 * 1024;
25const ERROR_BODY_LIMIT_BYTES: usize = 64 * 1024;
26const MAX_LIST_PAGES: usize = 10_000;
27const MAX_LISTED_TABLES: usize = 1_000_000;
28const MAX_IDENTIFIER_BYTES: usize = 1_024;
29const DEFAULT_NAMESPACE_SEPARATOR: &str = "%1F";
30const USER_AGENT_VALUE: &str = concat!("krishiv-sql-catalog/", env!("CARGO_PKG_VERSION"));
31
32#[derive(Clone)]
37pub struct RestCatalogConfig {
38 base_url: Url,
39 warehouse: Option<String>,
40 catalog_prefix: Option<String>,
41 bearer_token: Option<String>,
42 timeout: Duration,
43 page_size: u32,
44 max_response_bytes: usize,
45}
46
47impl RestCatalogConfig {
48 pub fn new(base_url: impl AsRef<str>) -> CatalogResult<Self> {
50 let base_url =
51 Url::parse(base_url.as_ref()).map_err(|error| CatalogError::InvalidConfiguration {
52 message: format!("invalid catalog URL: {error}"),
53 })?;
54 validate_base_url(&base_url)?;
55
56 Ok(Self {
57 base_url,
58 warehouse: None,
59 catalog_prefix: None,
60 bearer_token: None,
61 timeout: DEFAULT_CATALOG_TIMEOUT,
62 page_size: DEFAULT_PAGE_SIZE,
63 max_response_bytes: DEFAULT_MAX_RESPONSE_BYTES,
64 })
65 }
66
67 pub fn with_warehouse(mut self, warehouse: impl Into<String>) -> CatalogResult<Self> {
69 let warehouse =
70 validate_non_blank("warehouse", warehouse.into(), MAX_IDENTIFIER_BYTES * 4)?;
71 self.warehouse = Some(warehouse);
72 Ok(self)
73 }
74
75 pub fn with_catalog_prefix(mut self, prefix: impl Into<String>) -> CatalogResult<Self> {
80 let prefix = prefix.into();
81 split_catalog_prefix(&prefix)?;
82 self.catalog_prefix = Some(prefix);
83 Ok(self)
84 }
85
86 pub fn with_bearer_token(mut self, token: impl Into<String>) -> CatalogResult<Self> {
88 self.bearer_token = Some(validate_non_blank(
89 "bearer token",
90 token.into(),
91 MAX_IDENTIFIER_BYTES * 16,
92 )?);
93 Ok(self)
94 }
95
96 pub fn with_timeout(mut self, timeout: Duration) -> CatalogResult<Self> {
98 if timeout.is_zero() {
99 return Err(CatalogError::InvalidConfiguration {
100 message: "catalog request timeout must be positive".to_string(),
101 });
102 }
103 self.timeout = timeout;
104 Ok(self)
105 }
106
107 pub fn with_page_size(mut self, page_size: u32) -> CatalogResult<Self> {
109 if !(1..=MAX_PAGE_SIZE).contains(&page_size) {
110 return Err(CatalogError::InvalidConfiguration {
111 message: format!(
112 "catalog page size must be between 1 and {MAX_PAGE_SIZE}, got {page_size}"
113 ),
114 });
115 }
116 self.page_size = page_size;
117 Ok(self)
118 }
119
120 pub fn with_max_response_bytes(mut self, limit: usize) -> CatalogResult<Self> {
122 if limit == 0 {
123 return Err(CatalogError::InvalidConfiguration {
124 message: "catalog response limit must be positive".to_string(),
125 });
126 }
127 self.max_response_bytes = limit;
128 Ok(self)
129 }
130
131 pub fn base_url(&self) -> &Url {
132 &self.base_url
133 }
134
135 pub fn warehouse(&self) -> Option<&str> {
136 self.warehouse.as_deref()
137 }
138
139 pub fn catalog_prefix(&self) -> Option<&str> {
140 self.catalog_prefix.as_deref()
141 }
142
143 pub fn timeout(&self) -> Duration {
144 self.timeout
145 }
146
147 pub fn page_size(&self) -> u32 {
148 self.page_size
149 }
150
151 pub fn max_response_bytes(&self) -> usize {
152 self.max_response_bytes
153 }
154}
155
156impl fmt::Debug for RestCatalogConfig {
157 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
158 formatter
159 .debug_struct("RestCatalogConfig")
160 .field("base_url", &self.base_url)
161 .field("has_warehouse", &self.warehouse.is_some())
162 .field("catalog_prefix", &self.catalog_prefix)
163 .field("has_bearer_token", &self.bearer_token.is_some())
164 .field("timeout", &self.timeout)
165 .field("page_size", &self.page_size)
166 .field("max_response_bytes", &self.max_response_bytes)
167 .finish()
168 }
169}
170
/// Identifies an Iceberg table by namespace and table name.
///
/// Field order is significant for the derived `Debug` and `Hash` output.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IcebergTableId {
    /// Namespace, passed to the catalog as a single path segment.
    namespace: String,
    /// Table name within the namespace.
    name: String,
}
177
178impl IcebergTableId {
179 pub fn new(namespace: impl Into<String>, name: impl Into<String>) -> CatalogResult<Self> {
180 Ok(Self {
181 namespace: validate_non_blank(
182 "table namespace",
183 namespace.into(),
184 MAX_IDENTIFIER_BYTES,
185 )?,
186 name: validate_non_blank("table name", name.into(), MAX_IDENTIFIER_BYTES)?,
187 })
188 }
189
190 pub fn namespace(&self) -> &str {
191 &self.namespace
192 }
193
194 pub fn name(&self) -> &str {
195 &self.name
196 }
197
198 fn display_name(&self) -> String {
199 format!("{}.{}", self.namespace, self.name)
200 }
201}
202
203#[derive(Clone, PartialEq)]
205pub struct LoadedIcebergTable {
206 metadata_location: String,
207 metadata: serde_json::Value,
208 config: BTreeMap<String, String>,
209}
210
211impl LoadedIcebergTable {
212 pub fn metadata_location(&self) -> &str {
213 &self.metadata_location
214 }
215
216 pub fn metadata(&self) -> &serde_json::Value {
217 &self.metadata
218 }
219
220 pub fn into_metadata(self) -> serde_json::Value {
221 self.metadata
222 }
223
224 pub fn config(&self) -> &BTreeMap<String, String> {
228 &self.config
229 }
230}
231
232impl fmt::Debug for LoadedIcebergTable {
233 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
234 let format_version = self
235 .metadata
236 .get("format-version")
237 .and_then(serde_json::Value::as_u64);
238 let table_uuid = self
239 .metadata
240 .get("table-uuid")
241 .and_then(serde_json::Value::as_str);
242 formatter
243 .debug_struct("LoadedIcebergTable")
244 .field("has_metadata_location", &true)
245 .field("format_version", &format_version)
246 .field("table_uuid", &table_uuid)
247 .field("config_keys", &self.config.keys().collect::<Vec<_>>())
248 .finish()
249 }
250}
251
252#[async_trait]
257pub trait IcebergCatalogClient: Send + Sync {
258 async fn list_tables(&self, namespace: &str) -> CatalogResult<Vec<String>>;
259
260 async fn load_table(&self, table: &IcebergTableId) -> CatalogResult<LoadedIcebergTable>;
261
262 async fn load_table_metadata(
263 &self,
264 table: &IcebergTableId,
265 ) -> CatalogResult<serde_json::Value> {
266 Ok(self.load_table(table).await?.into_metadata())
267 }
268}
269
/// Settings derived from the server's `/v1/config` response.
///
/// Field order is kept because it determines the derived `Debug` output.
#[derive(Debug)]
struct ResolvedCatalogConfig {
    /// Route prefix split into path segments (empty when there is no prefix).
    prefix_segments: Vec<String>,
    /// Decoded separator joining multi-part namespaces in list responses.
    namespace_separator: String,
    /// Advertised endpoints; `None` when the server did not advertise any.
    endpoints: Option<HashSet<String>>,
}
276
277#[derive(Clone)]
279pub struct GenericRestCatalog {
280 config: RestCatalogConfig,
281 client: Client,
282 resolved: Arc<OnceCell<ResolvedCatalogConfig>>,
283}
284
285impl GenericRestCatalog {
286 pub fn new(config: RestCatalogConfig) -> CatalogResult<Self> {
288 let connect_timeout = config.timeout.min(DEFAULT_CONNECT_TIMEOUT);
289 let client = Client::builder()
290 .connect_timeout(connect_timeout)
291 .timeout(config.timeout)
292 .user_agent(USER_AGENT_VALUE)
293 .build()
294 .map_err(|error| CatalogError::InvalidConfiguration {
295 message: format!("failed to build catalog HTTP client: {error}"),
296 })?;
297 Self::with_http_client(config, client)
298 }
299
300 pub fn with_http_client(config: RestCatalogConfig, client: Client) -> CatalogResult<Self> {
305 validate_base_url(&config.base_url)?;
306 Ok(Self {
307 config,
308 client,
309 resolved: Arc::new(OnceCell::new()),
310 })
311 }
312
313 async fn resolved_config(&self) -> CatalogResult<&ResolvedCatalogConfig> {
314 self.resolved
315 .get_or_try_init(|| async { self.fetch_catalog_config().await })
316 .await
317 }
318
319 async fn fetch_catalog_config(&self) -> CatalogResult<ResolvedCatalogConfig> {
320 let mut url = append_url_segments(&self.config.base_url, [API_VERSION_SEGMENT, "config"])?;
321 if let Some(warehouse) = self.config.warehouse() {
322 url.query_pairs_mut().append_pair("warehouse", warehouse);
323 }
324
325 let response: CatalogConfigResponse = self
326 .execute_json(
327 self.request(Method::GET, url),
328 "fetch catalog configuration",
329 NotFoundKind::None,
330 )
331 .await?;
332
333 let prefix = response
334 .overrides
335 .get("prefix")
336 .map(String::as_str)
337 .or(self.config.catalog_prefix())
338 .or_else(|| response.defaults.get("prefix").map(String::as_str))
339 .unwrap_or_default();
340 let prefix_segments = split_catalog_prefix(prefix)?;
341 let namespace_separator = response
342 .overrides
343 .get("namespace-separator")
344 .or_else(|| response.defaults.get("namespace-separator"))
345 .map(String::as_str)
346 .unwrap_or(DEFAULT_NAMESPACE_SEPARATOR);
347 let namespace_separator = decode_namespace_separator(namespace_separator)?;
348
349 let endpoints = response
350 .endpoints
351 .map(validate_advertised_endpoints)
352 .transpose()?;
353
354 Ok(ResolvedCatalogConfig {
355 prefix_segments,
356 namespace_separator,
357 endpoints,
358 })
359 }
360
361 fn request(&self, method: Method, url: Url) -> RequestBuilder {
362 let mut request = self
363 .client
364 .request(method, url)
365 .timeout(self.config.timeout)
366 .header(ACCEPT, "application/json")
367 .header(USER_AGENT, USER_AGENT_VALUE);
368 if let Some(token) = &self.config.bearer_token {
369 request = request.bearer_auth(token);
370 }
371 request
372 }
373
374 fn catalog_url(&self, resolved: &ResolvedCatalogConfig, suffix: &[&str]) -> CatalogResult<Url> {
375 let mut segments = Vec::with_capacity(1 + resolved.prefix_segments.len() + suffix.len());
376 segments.push(API_VERSION_SEGMENT);
377 segments.extend(resolved.prefix_segments.iter().map(String::as_str));
378 segments.extend_from_slice(suffix);
379 append_url_segments(&self.config.base_url, segments)
380 }
381
382 async fn execute_json<T>(
383 &self,
384 request: RequestBuilder,
385 operation: &'static str,
386 not_found: NotFoundKind,
387 ) -> CatalogResult<T>
388 where
389 T: DeserializeOwned,
390 {
391 let response = request
392 .send()
393 .await
394 .map_err(|error| CatalogError::Transport {
395 operation: operation.to_string(),
396 message: error.to_string(),
397 })?;
398 let status = response.status();
399
400 if !status.is_success() {
401 let body = read_error_body(response, operation).await;
402 return match (status, not_found) {
403 (StatusCode::NOT_FOUND, NotFoundKind::Namespace(name)) => {
404 Err(CatalogError::SchemaNotFound { name })
405 }
406 (StatusCode::NOT_FOUND, NotFoundKind::Table(name)) => {
407 Err(CatalogError::TableNotFound { name })
408 }
409 _ => Err(CatalogError::Http {
410 status: status.as_u16(),
411 message: describe_error_body(&body),
412 }),
413 };
414 }
415
416 let body = read_bounded_body(response, self.config.max_response_bytes, operation).await?;
417 serde_json::from_slice(&body).map_err(|error| CatalogError::InvalidResponse {
418 operation: operation.to_string(),
419 message: format!(
420 "response is not valid JSON at line {}, column {}: {error}",
421 error.line(),
422 error.column()
423 ),
424 })
425 }
426
427 fn require_endpoint(
428 resolved: &ResolvedCatalogConfig,
429 endpoint: RequiredEndpoint,
430 ) -> CatalogResult<()> {
431 let Some(endpoints) = &resolved.endpoints else {
432 return Ok(());
433 };
434 if endpoint.matches(endpoints, &resolved.prefix_segments) {
435 Ok(())
436 } else {
437 Err(CatalogError::UnsupportedOperation {
438 operation: endpoint.operation().to_string(),
439 })
440 }
441 }
442}
443
444impl fmt::Debug for GenericRestCatalog {
445 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
446 formatter
447 .debug_struct("GenericRestCatalog")
448 .field("config", &self.config)
449 .field("configuration_resolved", &self.resolved.get().is_some())
450 .finish()
451 }
452}
453
454#[async_trait]
455impl IcebergCatalogClient for GenericRestCatalog {
456 async fn list_tables(&self, namespace: &str) -> CatalogResult<Vec<String>> {
457 let namespace = validate_non_blank(
458 "table namespace",
459 namespace.to_string(),
460 MAX_IDENTIFIER_BYTES,
461 )?;
462 let resolved = self.resolved_config().await?;
463 Self::require_endpoint(resolved, RequiredEndpoint::ListTables)?;
464
465 let mut table_names = Vec::new();
466 let mut identifiers = HashSet::new();
467 let mut seen_page_tokens = HashSet::new();
468 let mut page_token = String::new();
469
470 for _ in 0..MAX_LIST_PAGES {
471 let mut url =
472 self.catalog_url(resolved, &["namespaces", namespace.as_str(), "tables"])?;
473 url.query_pairs_mut()
474 .append_pair("pageToken", &page_token)
475 .append_pair("pageSize", &self.config.page_size.to_string());
476
477 let page: ListTablesResponse = self
478 .execute_json(
479 self.request(Method::GET, url),
480 "list catalog tables",
481 NotFoundKind::Namespace(namespace.clone()),
482 )
483 .await?;
484
485 for identifier in page.identifiers {
486 validate_identifier_response(
487 &identifier,
488 &namespace,
489 &resolved.namespace_separator,
490 )?;
491 let key = (identifier.namespace, identifier.name.clone());
492 if !identifiers.insert(key) {
493 return Err(CatalogError::InvalidResponse {
494 operation: "list catalog tables".to_string(),
495 message: format!(
496 "server returned duplicate table identifier '{}'",
497 identifier.name
498 ),
499 });
500 }
501 if table_names.len() == MAX_LISTED_TABLES {
502 return Err(CatalogError::InvalidResponse {
503 operation: "list catalog tables".to_string(),
504 message: format!(
505 "listing exceeds the maximum of {MAX_LISTED_TABLES} tables"
506 ),
507 });
508 }
509 table_names.push(identifier.name);
510 }
511
512 let Some(next_page_token) = page.next_page_token else {
513 return Ok(table_names);
514 };
515 if next_page_token.is_empty() {
516 return Err(CatalogError::InvalidResponse {
517 operation: "list catalog tables".to_string(),
518 message: "server returned an empty next-page-token".to_string(),
519 });
520 }
521 if !seen_page_tokens.insert(next_page_token.clone()) {
522 return Err(CatalogError::InvalidResponse {
523 operation: "list catalog tables".to_string(),
524 message: "server repeated a pagination token".to_string(),
525 });
526 }
527 page_token = next_page_token;
528 }
529
530 Err(CatalogError::InvalidResponse {
531 operation: "list catalog tables".to_string(),
532 message: format!("listing exceeded the maximum of {MAX_LIST_PAGES} pages"),
533 })
534 }
535
536 async fn load_table(&self, table: &IcebergTableId) -> CatalogResult<LoadedIcebergTable> {
537 let resolved = self.resolved_config().await?;
538 Self::require_endpoint(resolved, RequiredEndpoint::LoadTable)?;
539 let url = self.catalog_url(
540 resolved,
541 &["namespaces", table.namespace(), "tables", table.name()],
542 )?;
543 let response: LoadTableResponse = self
544 .execute_json(
545 self.request(Method::GET, url),
546 "load catalog table",
547 NotFoundKind::Table(table.display_name()),
548 )
549 .await?;
550 validate_loaded_table(response)
551 }
552}
553
554#[derive(Debug, Deserialize)]
555struct CatalogConfigResponse {
556 defaults: BTreeMap<String, String>,
557 overrides: BTreeMap<String, String>,
558 #[serde(default)]
559 endpoints: Option<Vec<String>>,
560}
561
562#[derive(Debug, Deserialize)]
563struct ListTablesResponse {
564 identifiers: Vec<TableIdentifierResponse>,
565 #[serde(rename = "next-page-token", default)]
566 next_page_token: Option<String>,
567}
568
569#[derive(Debug, Deserialize)]
570struct TableIdentifierResponse {
571 namespace: Vec<String>,
572 name: String,
573}
574
575#[derive(Debug, Deserialize)]
576struct LoadTableResponse {
577 #[serde(rename = "metadata-location")]
578 metadata_location: String,
579 metadata: serde_json::Value,
580 #[serde(default)]
581 config: BTreeMap<String, String>,
582}
583
/// A REST endpoint that a catalog operation needs the server to support.
#[derive(Debug, Clone, Copy)]
enum RequiredEndpoint {
    /// Listing the tables of a namespace.
    ListTables,
    /// Loading a single table.
    LoadTable,
}

impl RequiredEndpoint {
    /// Operation name used in `UnsupportedOperation` errors.
    fn operation(self) -> &'static str {
        match self {
            RequiredEndpoint::ListTables => "listing Iceberg tables",
            RequiredEndpoint::LoadTable => "loading an Iceberg table",
        }
    }

    /// The endpoint in templated form, with a literal `{prefix}` placeholder.
    fn template(self) -> &'static str {
        match self {
            RequiredEndpoint::ListTables => "GET /v1/{prefix}/namespaces/{namespace}/tables",
            RequiredEndpoint::LoadTable => {
                "GET /v1/{prefix}/namespaces/{namespace}/tables/{table}"
            }
        }
    }

    /// The part of the route that follows the version and prefix segments.
    fn route_tail(self) -> &'static str {
        match self {
            RequiredEndpoint::ListTables => "namespaces/{namespace}/tables",
            RequiredEndpoint::LoadTable => "namespaces/{namespace}/tables/{table}",
        }
    }

    /// Reports whether `endpoints` advertises this endpoint.
    ///
    /// Either the templated form or the form with the negotiated prefix filled
    /// in (or left out when the prefix is empty) counts as a match.
    fn matches(self, endpoints: &HashSet<String>, prefix_segments: &[String]) -> bool {
        let prefix = prefix_segments.join("/");
        let concrete = if prefix.is_empty() {
            format!("GET /v1/{}", self.route_tail())
        } else {
            format!("GET /v1/{prefix}/{}", self.route_tail())
        };
        endpoints.contains(self.template()) || endpoints.contains(&concrete)
    }
}
628
/// How a 404 response is reported by `execute_json`.
enum NotFoundKind {
    /// A 404 is reported as a generic HTTP error.
    None,
    /// A 404 means the named namespace does not exist.
    Namespace(String),
    /// A 404 means the named table does not exist.
    Table(String),
}
634
635fn validate_base_url(url: &Url) -> CatalogResult<()> {
636 if !matches!(url.scheme(), "http" | "https") {
637 return Err(CatalogError::InvalidConfiguration {
638 message: format!(
639 "catalog URL scheme must be http or https, got '{}'",
640 url.scheme()
641 ),
642 });
643 }
644 if url.host_str().is_none() {
645 return Err(CatalogError::InvalidConfiguration {
646 message: "catalog URL must include a host".to_string(),
647 });
648 }
649 if !url.username().is_empty() || url.password().is_some() {
650 return Err(CatalogError::InvalidConfiguration {
651 message: "catalog URL must not contain embedded credentials".to_string(),
652 });
653 }
654 if url.query().is_some() || url.fragment().is_some() {
655 return Err(CatalogError::InvalidConfiguration {
656 message: "catalog URL must not contain a query string or fragment".to_string(),
657 });
658 }
659 if url.cannot_be_a_base() {
660 return Err(CatalogError::InvalidConfiguration {
661 message: "catalog URL cannot be used as a hierarchical base URL".to_string(),
662 });
663 }
664 Ok(())
665}
666
667fn validate_non_blank(label: &str, value: String, max_bytes: usize) -> CatalogResult<String> {
668 if value.trim().is_empty() {
669 return Err(CatalogError::InvalidConfiguration {
670 message: format!("{label} must not be blank"),
671 });
672 }
673 if value.trim() != value {
674 return Err(CatalogError::InvalidConfiguration {
675 message: format!("{label} must not have leading or trailing whitespace"),
676 });
677 }
678 if value.contains('\0') {
679 return Err(CatalogError::InvalidConfiguration {
680 message: format!("{label} must not contain NUL"),
681 });
682 }
683 if value.len() > max_bytes {
684 return Err(CatalogError::InvalidConfiguration {
685 message: format!("{label} exceeds the maximum of {max_bytes} bytes"),
686 });
687 }
688 Ok(value)
689}
690
691fn split_catalog_prefix(prefix: &str) -> CatalogResult<Vec<String>> {
692 let prefix = prefix.trim_matches('/');
693 if prefix.is_empty() {
694 return Ok(Vec::new());
695 }
696
697 prefix
698 .split('/')
699 .map(|segment| {
700 if segment.is_empty() || matches!(segment, "." | "..") {
701 return Err(CatalogError::InvalidConfiguration {
702 message: format!("catalog prefix contains invalid segment '{segment}'"),
703 });
704 }
705 validate_non_blank(
706 "catalog prefix segment",
707 segment.to_string(),
708 MAX_IDENTIFIER_BYTES,
709 )
710 })
711 .collect()
712}
713
714fn validate_advertised_endpoints(endpoints: Vec<String>) -> CatalogResult<HashSet<String>> {
715 let mut validated = HashSet::with_capacity(endpoints.len());
716 for endpoint in endpoints {
717 let endpoint = validate_non_blank("advertised endpoint", endpoint, 4_096)?;
718 if !validated.insert(endpoint.clone()) {
719 return Err(CatalogError::InvalidResponse {
720 operation: "fetch catalog configuration".to_string(),
721 message: format!("server advertised endpoint '{endpoint}' more than once"),
722 });
723 }
724 }
725 Ok(validated)
726}
727
728fn decode_namespace_separator(encoded: &str) -> CatalogResult<String> {
729 let query = format!("separator={encoded}");
730 let decoded = url::form_urlencoded::parse(query.as_bytes())
731 .next()
732 .map(|(_, value)| value.into_owned())
733 .ok_or_else(|| CatalogError::InvalidResponse {
734 operation: "fetch catalog configuration".to_string(),
735 message: "namespace-separator could not be decoded".to_string(),
736 })?;
737 if decoded.is_empty() || decoded.contains('\0') || decoded.len() > 16 {
738 return Err(CatalogError::InvalidResponse {
739 operation: "fetch catalog configuration".to_string(),
740 message: "namespace-separator must decode to between 1 and 16 non-NUL bytes"
741 .to_string(),
742 });
743 }
744 Ok(decoded)
745}
746
747fn append_url_segments<I, S>(base_url: &Url, segments: I) -> CatalogResult<Url>
748where
749 I: IntoIterator<Item = S>,
750 S: AsRef<str>,
751{
752 let mut url = base_url.clone();
753 let mut path = url
754 .path_segments_mut()
755 .map_err(|_| CatalogError::InvalidConfiguration {
756 message: "catalog URL cannot accept path segments".to_string(),
757 })?;
758 path.pop_if_empty();
759 for segment in segments {
760 path.push(segment.as_ref());
761 }
762 drop(path);
763 Ok(url)
764}
765
766async fn read_bounded_body(
767 mut response: Response,
768 limit: usize,
769 operation: &str,
770) -> CatalogResult<Vec<u8>> {
771 if response
772 .content_length()
773 .is_some_and(|length| length > limit as u64)
774 {
775 return Err(CatalogError::ResponseTooLarge {
776 operation: operation.to_string(),
777 limit_bytes: limit,
778 });
779 }
780
781 let mut body = Vec::new();
782 while let Some(chunk) = response
783 .chunk()
784 .await
785 .map_err(|error| CatalogError::Transport {
786 operation: operation.to_string(),
787 message: format!("failed while reading response body: {error}"),
788 })?
789 {
790 let next_len =
791 body.len()
792 .checked_add(chunk.len())
793 .ok_or_else(|| CatalogError::ResponseTooLarge {
794 operation: operation.to_string(),
795 limit_bytes: limit,
796 })?;
797 if next_len > limit {
798 return Err(CatalogError::ResponseTooLarge {
799 operation: operation.to_string(),
800 limit_bytes: limit,
801 });
802 }
803 body.extend_from_slice(&chunk);
804 }
805 Ok(body)
806}
807
808async fn read_error_body(mut response: Response, operation: &str) -> Vec<u8> {
809 let mut body = Vec::new();
810 while body.len() < ERROR_BODY_LIMIT_BYTES {
811 let chunk = match response.chunk().await {
812 Ok(Some(chunk)) => chunk,
813 Ok(None) => return body,
814 Err(error) => {
815 let fallback = format!("failed to read error response during {operation}: {error}");
816 return fallback.into_bytes();
817 }
818 };
819 let remaining = ERROR_BODY_LIMIT_BYTES - body.len();
820 if chunk.len() > remaining {
821 body.extend_from_slice(chunk.get(..remaining).unwrap_or(&chunk));
822 body.extend_from_slice(b" [truncated]");
823 return body;
824 }
825 body.extend_from_slice(&chunk);
826 }
827 body
828}
829
830fn describe_error_body(body: &[u8]) -> String {
831 #[derive(Deserialize)]
832 struct ErrorEnvelope {
833 error: ErrorDetail,
834 }
835 #[derive(Deserialize)]
836 struct ErrorDetail {
837 message: String,
838 #[serde(rename = "type")]
839 error_type: Option<String>,
840 code: Option<u16>,
841 }
842
843 if let Ok(envelope) = serde_json::from_slice::<ErrorEnvelope>(body) {
844 let mut message = envelope.error.message;
845 if let Some(error_type) = envelope.error.error_type {
846 message = format!("{error_type}: {message}");
847 }
848 if let Some(code) = envelope.error.code {
849 message = format!("{message} (catalog code {code})");
850 }
851 return message;
852 }
853
854 let text = String::from_utf8_lossy(body).trim().to_string();
855 if text.is_empty() {
856 "catalog returned an empty error response".to_string()
857 } else {
858 text
859 }
860}
861
862fn validate_identifier_response(
863 identifier: &TableIdentifierResponse,
864 requested_namespace: &str,
865 namespace_separator: &str,
866) -> CatalogResult<()> {
867 if identifier.namespace.is_empty() {
868 return Err(CatalogError::InvalidResponse {
869 operation: "list catalog tables".to_string(),
870 message: format!(
871 "table identifier '{}' has an empty namespace",
872 identifier.name
873 ),
874 });
875 }
876 for namespace_part in &identifier.namespace {
877 validate_response_string("list catalog tables", "namespace component", namespace_part)?;
878 }
879 let response_namespace = identifier.namespace.join(namespace_separator);
880 if response_namespace != requested_namespace {
881 return Err(CatalogError::InvalidResponse {
882 operation: "list catalog tables".to_string(),
883 message: format!(
884 "server returned table '{}' from namespace '{}' while listing '{}'",
885 identifier.name, response_namespace, requested_namespace
886 ),
887 });
888 }
889 validate_response_string("list catalog tables", "table name", &identifier.name)
890}
891
892fn validate_loaded_table(response: LoadTableResponse) -> CatalogResult<LoadedIcebergTable> {
893 validate_response_string(
894 "load catalog table",
895 "metadata-location",
896 &response.metadata_location,
897 )?;
898 validate_absolute_uri(
899 "load catalog table",
900 "metadata-location",
901 &response.metadata_location,
902 )?;
903 for key in response.config.keys() {
904 validate_response_string("load catalog table", "table config key", key)?;
905 }
906
907 let metadata = response
908 .metadata
909 .as_object()
910 .ok_or_else(|| CatalogError::InvalidResponse {
911 operation: "load catalog table".to_string(),
912 message: "metadata must be a JSON object".to_string(),
913 })?;
914 let format_version = metadata
915 .get("format-version")
916 .and_then(serde_json::Value::as_u64)
917 .ok_or_else(|| CatalogError::InvalidResponse {
918 operation: "load catalog table".to_string(),
919 message: "metadata format-version must be an integer".to_string(),
920 })?;
921 if !(1..=3).contains(&format_version) {
922 return Err(CatalogError::InvalidResponse {
923 operation: "load catalog table".to_string(),
924 message: format!("unsupported Iceberg format-version {format_version}"),
925 });
926 }
927
928 let table_uuid = metadata
929 .get("table-uuid")
930 .and_then(serde_json::Value::as_str)
931 .ok_or_else(|| CatalogError::InvalidResponse {
932 operation: "load catalog table".to_string(),
933 message: "metadata table-uuid must be a string".to_string(),
934 })?;
935 Uuid::parse_str(table_uuid).map_err(|error| CatalogError::InvalidResponse {
936 operation: "load catalog table".to_string(),
937 message: format!("metadata table-uuid is invalid: {error}"),
938 })?;
939
940 if let Some(location) = metadata.get("location") {
941 let location = location
942 .as_str()
943 .ok_or_else(|| CatalogError::InvalidResponse {
944 operation: "load catalog table".to_string(),
945 message: "metadata location must be a string".to_string(),
946 })?;
947 validate_response_string("load catalog table", "metadata location", location)?;
948 validate_absolute_uri("load catalog table", "metadata location", location)?;
949 }
950
951 Ok(LoadedIcebergTable {
952 metadata_location: response.metadata_location,
953 metadata: response.metadata,
954 config: response.config,
955 })
956}
957
958fn validate_response_string(operation: &str, label: &str, value: &str) -> CatalogResult<()> {
959 if value.trim().is_empty() {
960 return Err(CatalogError::InvalidResponse {
961 operation: operation.to_string(),
962 message: format!("{label} must not be blank"),
963 });
964 }
965 if value.contains('\0') {
966 return Err(CatalogError::InvalidResponse {
967 operation: operation.to_string(),
968 message: format!("{label} must not contain NUL"),
969 });
970 }
971 if value.len() > MAX_IDENTIFIER_BYTES * 16 {
972 return Err(CatalogError::InvalidResponse {
973 operation: operation.to_string(),
974 message: format!("{label} is unreasonably large"),
975 });
976 }
977 Ok(())
978}
979
980fn validate_absolute_uri(operation: &str, label: &str, value: &str) -> CatalogResult<()> {
981 let uri = Url::parse(value).map_err(|error| CatalogError::InvalidResponse {
982 operation: operation.to_string(),
983 message: format!("{label} must be an absolute URI: {error}"),
984 })?;
985 if uri.cannot_be_a_base() {
986 return Err(CatalogError::InvalidResponse {
987 operation: operation.to_string(),
988 message: format!("{label} must be a hierarchical URI"),
989 });
990 }
991 Ok(())
992}
993
994#[cfg(test)]
995mod tests {
996 use std::time::Duration;
997
998 use reqwest::Client;
999 use serde_json::json;
1000 use wiremock::matchers::{header, method, path, query_param};
1001 use wiremock::{Mock, MockServer, ResponseTemplate};
1002
1003 use super::*;
1004
1005 async fn mount_config(server: &MockServer, body: serde_json::Value) {
1006 Mock::given(method("GET"))
1007 .and(path("/v1/config"))
1008 .respond_with(ResponseTemplate::new(200).set_body_json(body))
1009 .mount(server)
1010 .await;
1011 }
1012
1013 fn test_catalog(server: &MockServer) -> GenericRestCatalog {
1014 GenericRestCatalog::new(RestCatalogConfig::new(server.uri()).unwrap()).unwrap()
1015 }
1016
1017 fn valid_load_response() -> serde_json::Value {
1018 json!({
1019 "metadata-location": "s3://warehouse/db/table/metadata/v1.json",
1020 "metadata": {
1021 "format-version": 2,
1022 "table-uuid": "4d9d09d7-927d-47f6-9063-06e62f070a3b",
1023 "location": "s3://warehouse/db/table"
1024 },
1025 "config": {
1026 "token": "table-secret"
1027 }
1028 })
1029 }
1030
1031 #[test]
1032 fn configuration_rejects_unsafe_or_unbounded_values() {
1033 assert!(RestCatalogConfig::new("file:///tmp/catalog").is_err());
1034 assert!(RestCatalogConfig::new("https://user:secret@example.com").is_err());
1035 assert!(RestCatalogConfig::new("https://example.com?token=secret").is_err());
1036 assert!(
1037 RestCatalogConfig::new("https://example.com")
1038 .unwrap()
1039 .with_timeout(Duration::ZERO)
1040 .is_err()
1041 );
1042 assert!(
1043 RestCatalogConfig::new("https://example.com")
1044 .unwrap()
1045 .with_page_size(0)
1046 .is_err()
1047 );
1048 assert!(
1049 RestCatalogConfig::new("https://example.com")
1050 .unwrap()
1051 .with_max_response_bytes(0)
1052 .is_err()
1053 );
1054 }
1055
1056 #[test]
1057 fn configuration_debug_redacts_bearer_token() {
1058 let config = RestCatalogConfig::new("https://example.com")
1059 .unwrap()
1060 .with_bearer_token("top-secret")
1061 .unwrap();
1062 let debug = format!("{config:?}");
1063 assert!(!debug.contains("top-secret"));
1064 assert!(debug.contains("has_bearer_token: true"));
1065 }
1066
1067 #[test]
1068 fn table_identifier_is_validated() {
1069 assert!(IcebergTableId::new("", "events").is_err());
1070 assert!(IcebergTableId::new("analytics", " ").is_err());
1071 assert!(IcebergTableId::new(" analytics", "events").is_err());
1072 let table = IcebergTableId::new("analytics", "events").unwrap();
1073 assert_eq!(table.namespace(), "analytics");
1074 assert_eq!(table.name(), "events");
1075 }
1076
1077 #[test]
1078 fn url_segments_are_encoded_and_base_path_is_preserved() {
1079 let base = Url::parse("https://example.com/catalog/api/").unwrap();
1080 let url = append_url_segments(&base, ["v1", "namespaces", "a/b", "tables"]).unwrap();
1081 assert_eq!(
1082 url.as_str(),
1083 "https://example.com/catalog/api/v1/namespaces/a%2Fb/tables"
1084 );
1085 }
1086
1087 #[tokio::test]
1088 async fn config_negotiation_applies_warehouse_auth_and_server_prefix_override() {
1089 let server = MockServer::start().await;
1090 Mock::given(method("GET"))
1091 .and(path("/v1/config"))
1092 .and(query_param("warehouse", "s3://warehouse"))
1093 .and(header("authorization", "Bearer catalog-secret"))
1094 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1095 "defaults": { "prefix": "default" },
1096 "overrides": { "prefix": "tenant/prod" }
1097 })))
1098 .expect(1)
1099 .mount(&server)
1100 .await;
1101 Mock::given(method("GET"))
1102 .and(path("/v1/tenant/prod/namespaces/analytics/tables"))
1103 .and(query_param("pageToken", ""))
1104 .and(query_param("pageSize", "1000"))
1105 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1106 "identifiers": [
1107 { "namespace": ["analytics"], "name": "events" }
1108 ]
1109 })))
1110 .mount(&server)
1111 .await;
1112
1113 let config = RestCatalogConfig::new(server.uri())
1114 .unwrap()
1115 .with_warehouse("s3://warehouse")
1116 .unwrap()
1117 .with_catalog_prefix("client")
1118 .unwrap()
1119 .with_bearer_token("catalog-secret")
1120 .unwrap();
1121 let catalog = GenericRestCatalog::new(config).unwrap();
1122
1123 assert_eq!(
1124 catalog.list_tables("analytics").await.unwrap(),
1125 vec!["events"]
1126 );
1127 }
1128
1129 #[tokio::test]
1130 async fn list_tables_follows_pagination_and_negotiates_once() {
1131 let server = MockServer::start().await;
1132 Mock::given(method("GET"))
1133 .and(path("/v1/config"))
1134 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1135 "defaults": {},
1136 "overrides": {}
1137 })))
1138 .expect(1)
1139 .mount(&server)
1140 .await;
1141 Mock::given(method("GET"))
1142 .and(path("/v1/namespaces/ns/tables"))
1143 .and(query_param("pageToken", ""))
1144 .and(query_param("pageSize", "2"))
1145 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1146 "identifiers": [
1147 { "namespace": ["ns"], "name": "a" },
1148 { "namespace": ["ns"], "name": "b" }
1149 ],
1150 "next-page-token": "page-2"
1151 })))
1152 .expect(2)
1153 .mount(&server)
1154 .await;
1155 Mock::given(method("GET"))
1156 .and(path("/v1/namespaces/ns/tables"))
1157 .and(query_param("pageToken", "page-2"))
1158 .and(query_param("pageSize", "2"))
1159 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1160 "identifiers": [
1161 { "namespace": ["ns"], "name": "c" }
1162 ],
1163 "next-page-token": null
1164 })))
1165 .expect(2)
1166 .mount(&server)
1167 .await;
1168
1169 let config = RestCatalogConfig::new(server.uri())
1170 .unwrap()
1171 .with_page_size(2)
1172 .unwrap();
1173 let catalog = GenericRestCatalog::new(config).unwrap();
1174
1175 assert_eq!(
1176 catalog.list_tables("ns").await.unwrap(),
1177 vec!["a", "b", "c"]
1178 );
1179 assert_eq!(
1180 catalog.list_tables("ns").await.unwrap(),
1181 vec!["a", "b", "c"]
1182 );
1183 }
1184
1185 #[tokio::test]
1186 async fn list_tables_rejects_repeated_page_token() {
1187 let server = MockServer::start().await;
1188 mount_config(&server, json!({ "defaults": {}, "overrides": {} })).await;
1189 Mock::given(method("GET"))
1190 .and(path("/v1/namespaces/ns/tables"))
1191 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1192 "identifiers": [],
1193 "next-page-token": "same"
1194 })))
1195 .mount(&server)
1196 .await;
1197
1198 let error = test_catalog(&server).list_tables("ns").await.unwrap_err();
1199 assert!(matches!(error, CatalogError::InvalidResponse { .. }));
1200 assert!(error.to_string().contains("repeated a pagination token"));
1201 }
1202
1203 #[tokio::test]
1204 async fn list_tables_rejects_malformed_and_duplicate_identifiers() {
1205 let server = MockServer::start().await;
1206 mount_config(&server, json!({ "defaults": {}, "overrides": {} })).await;
1207 Mock::given(method("GET"))
1208 .and(path("/v1/namespaces/ns/tables"))
1209 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1210 "identifiers": [
1211 { "namespace": ["ns"], "name": "events" },
1212 { "namespace": ["ns"], "name": "events" }
1213 ]
1214 })))
1215 .mount(&server)
1216 .await;
1217
1218 let error = test_catalog(&server).list_tables("ns").await.unwrap_err();
1219 assert!(error.to_string().contains("duplicate table identifier"));
1220 }
1221
1222 #[tokio::test]
1223 async fn list_tables_validates_multipart_namespace_with_server_separator() {
1224 let server = MockServer::start().await;
1225 mount_config(
1226 &server,
1227 json!({
1228 "defaults": {},
1229 "overrides": { "namespace-separator": "%2E" }
1230 }),
1231 )
1232 .await;
1233 Mock::given(method("GET"))
1234 .and(path("/v1/namespaces/accounting.tax/tables"))
1235 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1236 "identifiers": [
1237 { "namespace": ["accounting", "tax"], "name": "payments" }
1238 ]
1239 })))
1240 .mount(&server)
1241 .await;
1242
1243 assert_eq!(
1244 test_catalog(&server)
1245 .list_tables("accounting.tax")
1246 .await
1247 .unwrap(),
1248 vec!["payments"]
1249 );
1250 }
1251
1252 #[tokio::test]
1253 async fn list_tables_rejects_identifier_from_another_namespace() {
1254 let server = MockServer::start().await;
1255 mount_config(&server, json!({ "defaults": {}, "overrides": {} })).await;
1256 Mock::given(method("GET"))
1257 .and(path("/v1/namespaces/ns/tables"))
1258 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1259 "identifiers": [
1260 { "namespace": ["other"], "name": "events" }
1261 ]
1262 })))
1263 .mount(&server)
1264 .await;
1265
1266 let error = test_catalog(&server).list_tables("ns").await.unwrap_err();
1267 assert!(error.to_string().contains("while listing 'ns'"));
1268 }
1269
1270 #[tokio::test]
1271 async fn list_tables_rejects_missing_identifiers() {
1272 let server = MockServer::start().await;
1273 mount_config(&server, json!({ "defaults": {}, "overrides": {} })).await;
1274 Mock::given(method("GET"))
1275 .and(path("/v1/namespaces/ns/tables"))
1276 .respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
1277 .mount(&server)
1278 .await;
1279
1280 let error = test_catalog(&server).list_tables("ns").await.unwrap_err();
1281 assert!(matches!(error, CatalogError::InvalidResponse { .. }));
1282 }
1283
1284 #[tokio::test]
1285 async fn advertised_capabilities_are_enforced() {
1286 let server = MockServer::start().await;
1287 mount_config(
1288 &server,
1289 json!({
1290 "defaults": {},
1291 "overrides": {},
1292 "endpoints": [
1293 "GET /v1/{prefix}/namespaces/{namespace}/tables/{table}"
1294 ]
1295 }),
1296 )
1297 .await;
1298
1299 let error = test_catalog(&server).list_tables("ns").await.unwrap_err();
1300 assert!(matches!(error, CatalogError::UnsupportedOperation { .. }));
1301 }
1302
1303 #[tokio::test]
1304 async fn list_not_found_maps_to_schema_not_found() {
1305 let server = MockServer::start().await;
1306 mount_config(&server, json!({ "defaults": {}, "overrides": {} })).await;
1307 Mock::given(method("GET"))
1308 .and(path("/v1/namespaces/missing/tables"))
1309 .respond_with(ResponseTemplate::new(404).set_body_json(json!({
1310 "error": {
1311 "message": "namespace is missing",
1312 "type": "NoSuchNamespaceException",
1313 "code": 404
1314 }
1315 })))
1316 .mount(&server)
1317 .await;
1318
1319 let error = test_catalog(&server)
1320 .list_tables("missing")
1321 .await
1322 .unwrap_err();
1323 assert!(matches!(
1324 error,
1325 CatalogError::SchemaNotFound { name } if name == "missing"
1326 ));
1327 }
1328
1329 #[tokio::test]
1330 async fn load_table_returns_validated_envelope_and_redacts_config_debug() {
1331 let server = MockServer::start().await;
1332 mount_config(&server, json!({ "defaults": {}, "overrides": {} })).await;
1333 Mock::given(method("GET"))
1334 .and(path("/v1/namespaces/ns/tables/events"))
1335 .respond_with(ResponseTemplate::new(200).set_body_json(valid_load_response()))
1336 .mount(&server)
1337 .await;
1338
1339 let table = IcebergTableId::new("ns", "events").unwrap();
1340 let loaded = test_catalog(&server).load_table(&table).await.unwrap();
1341 assert_eq!(
1342 loaded.metadata_location(),
1343 "s3://warehouse/db/table/metadata/v1.json"
1344 );
1345 assert_eq!(loaded.metadata()["format-version"], 2);
1346 assert_eq!(loaded.config().get("token").unwrap(), "table-secret");
1347 assert!(!format!("{loaded:?}").contains("table-secret"));
1348 assert!(!format!("{loaded:?}").contains("s3://warehouse"));
1349 }
1350
1351 #[tokio::test]
1352 async fn load_table_metadata_compatibility_helper_returns_only_metadata() {
1353 let server = MockServer::start().await;
1354 mount_config(&server, json!({ "defaults": {}, "overrides": {} })).await;
1355 Mock::given(method("GET"))
1356 .and(path("/v1/namespaces/ns/tables/events"))
1357 .respond_with(ResponseTemplate::new(200).set_body_json(valid_load_response()))
1358 .mount(&server)
1359 .await;
1360
1361 let metadata = test_catalog(&server)
1362 .load_table_metadata(&IcebergTableId::new("ns", "events").unwrap())
1363 .await
1364 .unwrap();
1365 assert_eq!(metadata["format-version"], 2);
1366 assert!(metadata.get("metadata-location").is_none());
1367 }
1368
1369 #[tokio::test]
1370 async fn load_table_rejects_invalid_metadata_contract() {
1371 let server = MockServer::start().await;
1372 mount_config(&server, json!({ "defaults": {}, "overrides": {} })).await;
1373 let mut response = valid_load_response();
1374 response["metadata"]["table-uuid"] = json!("not-a-uuid");
1375 Mock::given(method("GET"))
1376 .and(path("/v1/namespaces/ns/tables/events"))
1377 .respond_with(ResponseTemplate::new(200).set_body_json(response))
1378 .mount(&server)
1379 .await;
1380
1381 let error = test_catalog(&server)
1382 .load_table(&IcebergTableId::new("ns", "events").unwrap())
1383 .await
1384 .unwrap_err();
1385 assert!(matches!(error, CatalogError::InvalidResponse { .. }));
1386 assert!(error.to_string().contains("table-uuid is invalid"));
1387 }
1388
1389 #[tokio::test]
1390 async fn load_table_rejects_relative_metadata_location() {
1391 let server = MockServer::start().await;
1392 mount_config(&server, json!({ "defaults": {}, "overrides": {} })).await;
1393 let mut response = valid_load_response();
1394 response["metadata-location"] = json!("metadata/v1.json");
1395 Mock::given(method("GET"))
1396 .and(path("/v1/namespaces/ns/tables/events"))
1397 .respond_with(ResponseTemplate::new(200).set_body_json(response))
1398 .mount(&server)
1399 .await;
1400
1401 let error = test_catalog(&server)
1402 .load_table(&IcebergTableId::new("ns", "events").unwrap())
1403 .await
1404 .unwrap_err();
1405 assert!(error.to_string().contains("must be an absolute URI"));
1406 }
1407
1408 #[tokio::test]
1409 async fn load_table_not_found_maps_to_table_not_found() {
1410 let server = MockServer::start().await;
1411 mount_config(&server, json!({ "defaults": {}, "overrides": {} })).await;
1412 Mock::given(method("GET"))
1413 .and(path("/v1/namespaces/ns/tables/missing"))
1414 .respond_with(ResponseTemplate::new(404))
1415 .mount(&server)
1416 .await;
1417
1418 let error = test_catalog(&server)
1419 .load_table(&IcebergTableId::new("ns", "missing").unwrap())
1420 .await
1421 .unwrap_err();
1422 assert!(matches!(
1423 error,
1424 CatalogError::TableNotFound { name } if name == "ns.missing"
1425 ));
1426 }
1427
1428 #[tokio::test]
1429 async fn response_body_limit_is_enforced_without_content_length_reliance() {
1430 let server = MockServer::start().await;
1431 Mock::given(method("GET"))
1432 .and(path("/v1/config"))
1433 .respond_with(
1434 ResponseTemplate::new(200).set_body_string(
1435 json!({
1436 "defaults": {},
1437 "overrides": {},
1438 "padding": "x".repeat(2_000)
1439 })
1440 .to_string(),
1441 ),
1442 )
1443 .mount(&server)
1444 .await;
1445
1446 let config = RestCatalogConfig::new(server.uri())
1447 .unwrap()
1448 .with_max_response_bytes(128)
1449 .unwrap();
1450 let error = GenericRestCatalog::new(config)
1451 .unwrap()
1452 .list_tables("ns")
1453 .await
1454 .unwrap_err();
1455 assert!(matches!(error, CatalogError::ResponseTooLarge { .. }));
1456 }
1457
1458 #[tokio::test]
1459 async fn iceberg_error_envelope_preserves_type_message_and_code() {
1460 let server = MockServer::start().await;
1461 Mock::given(method("GET"))
1462 .and(path("/v1/config"))
1463 .respond_with(ResponseTemplate::new(503).set_body_json(json!({
1464 "error": {
1465 "message": "catalog unavailable",
1466 "type": "ServiceUnavailableException",
1467 "code": 503
1468 }
1469 })))
1470 .mount(&server)
1471 .await;
1472
1473 let error = test_catalog(&server).list_tables("ns").await.unwrap_err();
1474 assert!(matches!(error, CatalogError::Http { status: 503, .. }));
1475 let message = error.to_string();
1476 assert!(message.contains("ServiceUnavailableException"));
1477 assert!(message.contains("catalog unavailable"));
1478 assert!(message.contains("catalog code 503"));
1479 }
1480
1481 #[tokio::test]
1482 async fn custom_http_client_is_supported() {
1483 let server = MockServer::start().await;
1484 Mock::given(method("GET"))
1485 .and(path("/v1/config"))
1486 .and(header("x-catalog-tenant", "tenant-a"))
1487 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1488 "defaults": {},
1489 "overrides": {}
1490 })))
1491 .mount(&server)
1492 .await;
1493 Mock::given(method("GET"))
1494 .and(path("/v1/namespaces/ns/tables"))
1495 .and(header("x-catalog-tenant", "tenant-a"))
1496 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1497 "identifiers": []
1498 })))
1499 .mount(&server)
1500 .await;
1501
1502 let mut headers = reqwest::header::HeaderMap::new();
1503 headers.insert("x-catalog-tenant", "tenant-a".parse().unwrap());
1504 let client = Client::builder().default_headers(headers).build().unwrap();
1505 let catalog = GenericRestCatalog::with_http_client(
1506 RestCatalogConfig::new(server.uri()).unwrap(),
1507 client,
1508 )
1509 .unwrap();
1510 assert!(catalog.list_tables("ns").await.unwrap().is_empty());
1511 }
1512
1513 #[tokio::test]
1514 async fn request_timeout_is_reported_as_transport_error() {
1515 let server = MockServer::start().await;
1516 Mock::given(method("GET"))
1517 .and(path("/v1/config"))
1518 .respond_with(
1519 ResponseTemplate::new(200)
1520 .set_delay(Duration::from_millis(100))
1521 .set_body_json(json!({ "defaults": {}, "overrides": {} })),
1522 )
1523 .mount(&server)
1524 .await;
1525
1526 let config = RestCatalogConfig::new(server.uri())
1527 .unwrap()
1528 .with_timeout(Duration::from_millis(10))
1529 .unwrap();
1530 let error = GenericRestCatalog::new(config)
1531 .unwrap()
1532 .list_tables("ns")
1533 .await
1534 .unwrap_err();
1535 assert!(matches!(error, CatalogError::Transport { .. }));
1536 }
1537}