Skip to main content

krishiv_sql/catalog/
iceberg_rest.rs

1//! Production-oriented client for the Apache Iceberg REST Catalog API.
2
3use std::collections::{BTreeMap, HashSet};
4use std::fmt;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use reqwest::header::{ACCEPT, USER_AGENT};
10use reqwest::{Client, Method, RequestBuilder, Response, StatusCode};
11use serde::Deserialize;
12use serde::de::DeserializeOwned;
13use tokio::sync::OnceCell;
14use url::Url;
15use uuid::Uuid;
16
17use super::{CatalogError, CatalogResult};
18
/// Path segment placed directly after the base URL in every request URL built here.
const API_VERSION_SEGMENT: &str = "v1";
/// End-to-end request timeout used when the caller does not set one.
const DEFAULT_CATALOG_TIMEOUT: Duration = Duration::from_secs(30);
/// Upper bound applied to the connect timeout of the default HTTP client.
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
/// List page size requested when the caller does not set one.
const DEFAULT_PAGE_SIZE: u32 = 1_000;
/// Largest page size accepted by `RestCatalogConfig::with_page_size`.
const MAX_PAGE_SIZE: u32 = 10_000;
/// Default ceiling (64 MiB) on a successful response body held in memory.
const DEFAULT_MAX_RESPONSE_BYTES: usize = 64 * 1024 * 1024;
/// Ceiling (64 KiB) on the portion of an error response body that is retained.
const ERROR_BODY_LIMIT_BYTES: usize = 64 * 1024;
/// Maximum number of pages a single table listing will request.
const MAX_LIST_PAGES: usize = 10_000;
/// Maximum number of table names a single listing may accumulate.
const MAX_LISTED_TABLES: usize = 1_000_000;
/// Base byte limit for identifiers; some settings are capped at a multiple of it.
const MAX_IDENTIFIER_BYTES: usize = 1_024;
/// Encoded namespace separator assumed when the server configuration supplies none.
const DEFAULT_NAMESPACE_SEPARATOR: &str = "%1F";
/// `User-Agent` value: a fixed product prefix followed by the package version at build time.
const USER_AGENT_VALUE: &str = concat!("krishiv-sql-catalog/", env!("CARGO_PKG_VERSION"));
31
/// REST catalog configuration.
///
/// Construction validates the endpoint and all resource ceilings. Credentials
/// are deliberately omitted from `Debug`.
#[derive(Clone)]
pub struct RestCatalogConfig {
    /// Catalog endpoint; checked by `validate_base_url` when the value is built.
    base_url: Url,
    /// Optional warehouse, sent as the `warehouse` query parameter of the config request.
    warehouse: Option<String>,
    /// Optional client-side prefix; a server `prefix` override takes precedence over it.
    catalog_prefix: Option<String>,
    /// Optional bearer token attached to every request; never printed by `Debug`.
    bearer_token: Option<String>,
    /// End-to-end timeout applied to each request.
    timeout: Duration,
    /// Value sent as `pageSize` when listing tables.
    page_size: u32,
    /// Largest successful response body that will be buffered.
    max_response_bytes: usize,
}
46
47impl RestCatalogConfig {
48    /// Create a validated configuration for an Iceberg REST catalog.
49    pub fn new(base_url: impl AsRef<str>) -> CatalogResult<Self> {
50        let base_url =
51            Url::parse(base_url.as_ref()).map_err(|error| CatalogError::InvalidConfiguration {
52                message: format!("invalid catalog URL: {error}"),
53            })?;
54        validate_base_url(&base_url)?;
55
56        Ok(Self {
57            base_url,
58            warehouse: None,
59            catalog_prefix: None,
60            bearer_token: None,
61            timeout: DEFAULT_CATALOG_TIMEOUT,
62            page_size: DEFAULT_PAGE_SIZE,
63            max_response_bytes: DEFAULT_MAX_RESPONSE_BYTES,
64        })
65    }
66
67    /// Select the warehouse passed to the mandatory `/v1/config` request.
68    pub fn with_warehouse(mut self, warehouse: impl Into<String>) -> CatalogResult<Self> {
69        let warehouse =
70            validate_non_blank("warehouse", warehouse.into(), MAX_IDENTIFIER_BYTES * 4)?;
71        self.warehouse = Some(warehouse);
72        Ok(self)
73    }
74
75    /// Supply a client-side catalog prefix.
76    ///
77    /// Server defaults are applied first and server overrides are applied last,
78    /// as required by the Iceberg REST configuration contract.
79    pub fn with_catalog_prefix(mut self, prefix: impl Into<String>) -> CatalogResult<Self> {
80        let prefix = prefix.into();
81        split_catalog_prefix(&prefix)?;
82        self.catalog_prefix = Some(prefix);
83        Ok(self)
84    }
85
86    /// Attach a bearer token to configuration and catalog requests.
87    pub fn with_bearer_token(mut self, token: impl Into<String>) -> CatalogResult<Self> {
88        self.bearer_token = Some(validate_non_blank(
89            "bearer token",
90            token.into(),
91            MAX_IDENTIFIER_BYTES * 16,
92        )?);
93        Ok(self)
94    }
95
96    /// Set the end-to-end request timeout.
97    pub fn with_timeout(mut self, timeout: Duration) -> CatalogResult<Self> {
98        if timeout.is_zero() {
99            return Err(CatalogError::InvalidConfiguration {
100                message: "catalog request timeout must be positive".to_string(),
101            });
102        }
103        self.timeout = timeout;
104        Ok(self)
105    }
106
107    /// Set the requested list page size.
108    pub fn with_page_size(mut self, page_size: u32) -> CatalogResult<Self> {
109        if !(1..=MAX_PAGE_SIZE).contains(&page_size) {
110            return Err(CatalogError::InvalidConfiguration {
111                message: format!(
112                    "catalog page size must be between 1 and {MAX_PAGE_SIZE}, got {page_size}"
113                ),
114            });
115        }
116        self.page_size = page_size;
117        Ok(self)
118    }
119
120    /// Set the maximum successful response body retained in memory.
121    pub fn with_max_response_bytes(mut self, limit: usize) -> CatalogResult<Self> {
122        if limit == 0 {
123            return Err(CatalogError::InvalidConfiguration {
124                message: "catalog response limit must be positive".to_string(),
125            });
126        }
127        self.max_response_bytes = limit;
128        Ok(self)
129    }
130
131    pub fn base_url(&self) -> &Url {
132        &self.base_url
133    }
134
135    pub fn warehouse(&self) -> Option<&str> {
136        self.warehouse.as_deref()
137    }
138
139    pub fn catalog_prefix(&self) -> Option<&str> {
140        self.catalog_prefix.as_deref()
141    }
142
143    pub fn timeout(&self) -> Duration {
144        self.timeout
145    }
146
147    pub fn page_size(&self) -> u32 {
148        self.page_size
149    }
150
151    pub fn max_response_bytes(&self) -> usize {
152        self.max_response_bytes
153    }
154}
155
156impl fmt::Debug for RestCatalogConfig {
157    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
158        formatter
159            .debug_struct("RestCatalogConfig")
160            .field("base_url", &self.base_url)
161            .field("has_warehouse", &self.warehouse.is_some())
162            .field("catalog_prefix", &self.catalog_prefix)
163            .field("has_bearer_token", &self.bearer_token.is_some())
164            .field("timeout", &self.timeout)
165            .field("page_size", &self.page_size)
166            .field("max_response_bytes", &self.max_response_bytes)
167            .finish()
168    }
169}
170
/// Validated Iceberg table identifier.
///
/// Both parts are checked with `validate_non_blank` on construction.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IcebergTableId {
    /// Namespace string; used as a single URL path segment when loading the table.
    namespace: String,
    /// Table name; used as the final URL path segment when loading the table.
    name: String,
}
177
178impl IcebergTableId {
179    pub fn new(namespace: impl Into<String>, name: impl Into<String>) -> CatalogResult<Self> {
180        Ok(Self {
181            namespace: validate_non_blank(
182                "table namespace",
183                namespace.into(),
184                MAX_IDENTIFIER_BYTES,
185            )?,
186            name: validate_non_blank("table name", name.into(), MAX_IDENTIFIER_BYTES)?,
187        })
188    }
189
190    pub fn namespace(&self) -> &str {
191        &self.namespace
192    }
193
194    pub fn name(&self) -> &str {
195        &self.name
196    }
197
198    fn display_name(&self) -> String {
199        format!("{}.{}", self.namespace, self.name)
200    }
201}
202
/// Validated load-table response.
#[derive(Clone, PartialEq)]
pub struct LoadedIcebergTable {
    /// Metadata location reported by the catalog; not printed by `Debug`.
    metadata_location: String,
    /// Table metadata document as returned by the catalog.
    metadata: serde_json::Value,
    /// Per-table configuration; may hold credentials, so `Debug` prints keys only.
    config: BTreeMap<String, String>,
}
210
211impl LoadedIcebergTable {
212    pub fn metadata_location(&self) -> &str {
213        &self.metadata_location
214    }
215
216    pub fn metadata(&self) -> &serde_json::Value {
217        &self.metadata
218    }
219
220    pub fn into_metadata(self) -> serde_json::Value {
221        self.metadata
222    }
223
224    /// Per-table configuration returned by the catalog.
225    ///
226    /// This map may contain credentials and must not be logged.
227    pub fn config(&self) -> &BTreeMap<String, String> {
228        &self.config
229    }
230}
231
232impl fmt::Debug for LoadedIcebergTable {
233    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
234        let format_version = self
235            .metadata
236            .get("format-version")
237            .and_then(serde_json::Value::as_u64);
238        let table_uuid = self
239            .metadata
240            .get("table-uuid")
241            .and_then(serde_json::Value::as_str);
242        formatter
243            .debug_struct("LoadedIcebergTable")
244            .field("has_metadata_location", &true)
245            .field("format_version", &format_version)
246            .field("table_uuid", &table_uuid)
247            .field("config_keys", &self.config.keys().collect::<Vec<_>>())
248            .finish()
249    }
250}
251
252/// Read-only operations implemented by the Iceberg REST catalog client.
253///
254/// Table mutations are intentionally absent until Krishiv has a typed commit
255/// request model with Iceberg requirements and updates.
256#[async_trait]
257pub trait IcebergCatalogClient: Send + Sync {
258    async fn list_tables(&self, namespace: &str) -> CatalogResult<Vec<String>>;
259
260    async fn load_table(&self, table: &IcebergTableId) -> CatalogResult<LoadedIcebergTable>;
261
262    async fn load_table_metadata(
263        &self,
264        table: &IcebergTableId,
265    ) -> CatalogResult<serde_json::Value> {
266        Ok(self.load_table(table).await?.into_metadata())
267    }
268}
269
/// Catalog settings derived from the server's configuration response.
#[derive(Debug)]
struct ResolvedCatalogConfig {
    /// Path segments inserted between the API version segment and the resource path.
    prefix_segments: Vec<String>,
    /// Decoded separator used to join namespace parts returned by the server.
    namespace_separator: String,
    /// Endpoints advertised by the server; `None` means no list was sent and
    /// endpoint checks are skipped.
    endpoints: Option<HashSet<String>>,
}
276
/// Generic Apache Iceberg REST catalog.
#[derive(Clone)]
pub struct GenericRestCatalog {
    /// Validated client configuration.
    config: RestCatalogConfig,
    /// HTTP client used for every request.
    client: Client,
    /// Server configuration, fetched on first use; the cell sits behind an
    /// `Arc`, so clones of this catalog share it.
    resolved: Arc<OnceCell<ResolvedCatalogConfig>>,
}
284
285impl GenericRestCatalog {
286    /// Build a catalog with Krishiv's bounded default HTTP client.
287    pub fn new(config: RestCatalogConfig) -> CatalogResult<Self> {
288        let connect_timeout = config.timeout.min(DEFAULT_CONNECT_TIMEOUT);
289        let client = Client::builder()
290            .connect_timeout(connect_timeout)
291            .timeout(config.timeout)
292            .user_agent(USER_AGENT_VALUE)
293            .build()
294            .map_err(|error| CatalogError::InvalidConfiguration {
295                message: format!("failed to build catalog HTTP client: {error}"),
296            })?;
297        Self::with_http_client(config, client)
298    }
299
300    /// Build a catalog with a caller-configured HTTP client.
301    ///
302    /// This supports custom trust roots, proxies, and authentication headers.
303    /// The validated per-request timeout and response ceiling still apply.
304    pub fn with_http_client(config: RestCatalogConfig, client: Client) -> CatalogResult<Self> {
305        validate_base_url(&config.base_url)?;
306        Ok(Self {
307            config,
308            client,
309            resolved: Arc::new(OnceCell::new()),
310        })
311    }
312
313    async fn resolved_config(&self) -> CatalogResult<&ResolvedCatalogConfig> {
314        self.resolved
315            .get_or_try_init(|| async { self.fetch_catalog_config().await })
316            .await
317    }
318
319    async fn fetch_catalog_config(&self) -> CatalogResult<ResolvedCatalogConfig> {
320        let mut url = append_url_segments(&self.config.base_url, [API_VERSION_SEGMENT, "config"])?;
321        if let Some(warehouse) = self.config.warehouse() {
322            url.query_pairs_mut().append_pair("warehouse", warehouse);
323        }
324
325        let response: CatalogConfigResponse = self
326            .execute_json(
327                self.request(Method::GET, url),
328                "fetch catalog configuration",
329                NotFoundKind::None,
330            )
331            .await?;
332
333        let prefix = response
334            .overrides
335            .get("prefix")
336            .map(String::as_str)
337            .or(self.config.catalog_prefix())
338            .or_else(|| response.defaults.get("prefix").map(String::as_str))
339            .unwrap_or_default();
340        let prefix_segments = split_catalog_prefix(prefix)?;
341        let namespace_separator = response
342            .overrides
343            .get("namespace-separator")
344            .or_else(|| response.defaults.get("namespace-separator"))
345            .map(String::as_str)
346            .unwrap_or(DEFAULT_NAMESPACE_SEPARATOR);
347        let namespace_separator = decode_namespace_separator(namespace_separator)?;
348
349        let endpoints = response
350            .endpoints
351            .map(validate_advertised_endpoints)
352            .transpose()?;
353
354        Ok(ResolvedCatalogConfig {
355            prefix_segments,
356            namespace_separator,
357            endpoints,
358        })
359    }
360
361    fn request(&self, method: Method, url: Url) -> RequestBuilder {
362        let mut request = self
363            .client
364            .request(method, url)
365            .timeout(self.config.timeout)
366            .header(ACCEPT, "application/json")
367            .header(USER_AGENT, USER_AGENT_VALUE);
368        if let Some(token) = &self.config.bearer_token {
369            request = request.bearer_auth(token);
370        }
371        request
372    }
373
374    fn catalog_url(&self, resolved: &ResolvedCatalogConfig, suffix: &[&str]) -> CatalogResult<Url> {
375        let mut segments = Vec::with_capacity(1 + resolved.prefix_segments.len() + suffix.len());
376        segments.push(API_VERSION_SEGMENT);
377        segments.extend(resolved.prefix_segments.iter().map(String::as_str));
378        segments.extend_from_slice(suffix);
379        append_url_segments(&self.config.base_url, segments)
380    }
381
382    async fn execute_json<T>(
383        &self,
384        request: RequestBuilder,
385        operation: &'static str,
386        not_found: NotFoundKind,
387    ) -> CatalogResult<T>
388    where
389        T: DeserializeOwned,
390    {
391        let response = request
392            .send()
393            .await
394            .map_err(|error| CatalogError::Transport {
395                operation: operation.to_string(),
396                message: error.to_string(),
397            })?;
398        let status = response.status();
399
400        if !status.is_success() {
401            let body = read_error_body(response, operation).await;
402            return match (status, not_found) {
403                (StatusCode::NOT_FOUND, NotFoundKind::Namespace(name)) => {
404                    Err(CatalogError::SchemaNotFound { name })
405                }
406                (StatusCode::NOT_FOUND, NotFoundKind::Table(name)) => {
407                    Err(CatalogError::TableNotFound { name })
408                }
409                _ => Err(CatalogError::Http {
410                    status: status.as_u16(),
411                    message: describe_error_body(&body),
412                }),
413            };
414        }
415
416        let body = read_bounded_body(response, self.config.max_response_bytes, operation).await?;
417        serde_json::from_slice(&body).map_err(|error| CatalogError::InvalidResponse {
418            operation: operation.to_string(),
419            message: format!(
420                "response is not valid JSON at line {}, column {}: {error}",
421                error.line(),
422                error.column()
423            ),
424        })
425    }
426
427    fn require_endpoint(
428        resolved: &ResolvedCatalogConfig,
429        endpoint: RequiredEndpoint,
430    ) -> CatalogResult<()> {
431        let Some(endpoints) = &resolved.endpoints else {
432            return Ok(());
433        };
434        if endpoint.matches(endpoints, &resolved.prefix_segments) {
435            Ok(())
436        } else {
437            Err(CatalogError::UnsupportedOperation {
438                operation: endpoint.operation().to_string(),
439            })
440        }
441    }
442}
443
444impl fmt::Debug for GenericRestCatalog {
445    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
446        formatter
447            .debug_struct("GenericRestCatalog")
448            .field("config", &self.config)
449            .field("configuration_resolved", &self.resolved.get().is_some())
450            .finish()
451    }
452}
453
454#[async_trait]
455impl IcebergCatalogClient for GenericRestCatalog {
    /// List the table names in `namespace`, following pagination until the
    /// server stops returning a `next-page-token`.
    ///
    /// # Errors
    ///
    /// * `InvalidConfiguration` (from `validate_non_blank`) when `namespace`
    ///   is rejected.
    /// * `UnsupportedOperation` when the server advertises an endpoint list
    ///   that does not include table listing.
    /// * `SchemaNotFound` when a page request returns 404.
    /// * `InvalidResponse` when an identifier fails validation, is repeated,
    ///   the listing exceeds `MAX_LISTED_TABLES` or `MAX_LIST_PAGES`, or a
    ///   pagination token is empty or repeated.
    async fn list_tables(&self, namespace: &str) -> CatalogResult<Vec<String>> {
        let namespace = validate_non_blank(
            "table namespace",
            namespace.to_string(),
            MAX_IDENTIFIER_BYTES,
        )?;
        let resolved = self.resolved_config().await?;
        Self::require_endpoint(resolved, RequiredEndpoint::ListTables)?;

        let mut table_names = Vec::new();
        // (namespace parts, name) pairs already returned, for duplicate detection.
        let mut identifiers = HashSet::new();
        // Tokens already handed out by the server, to detect a pagination cycle.
        let mut seen_page_tokens = HashSet::new();
        // The first request carries an empty `pageToken`.
        let mut page_token = String::new();

        // Hard bound on the number of page requests.
        for _ in 0..MAX_LIST_PAGES {
            let mut url =
                self.catalog_url(resolved, &["namespaces", namespace.as_str(), "tables"])?;
            url.query_pairs_mut()
                .append_pair("pageToken", &page_token)
                .append_pair("pageSize", &self.config.page_size.to_string());

            let page: ListTablesResponse = self
                .execute_json(
                    self.request(Method::GET, url),
                    "list catalog tables",
                    NotFoundKind::Namespace(namespace.clone()),
                )
                .await?;

            for identifier in page.identifiers {
                // Rejects identifiers whose joined namespace differs from the request.
                validate_identifier_response(
                    &identifier,
                    &namespace,
                    &resolved.namespace_separator,
                )?;
                let key = (identifier.namespace, identifier.name.clone());
                if !identifiers.insert(key) {
                    return Err(CatalogError::InvalidResponse {
                        operation: "list catalog tables".to_string(),
                        message: format!(
                            "server returned duplicate table identifier '{}'",
                            identifier.name
                        ),
                    });
                }
                // Checked before the push, so exactly MAX_LISTED_TABLES names are allowed.
                if table_names.len() == MAX_LISTED_TABLES {
                    return Err(CatalogError::InvalidResponse {
                        operation: "list catalog tables".to_string(),
                        message: format!(
                            "listing exceeds the maximum of {MAX_LISTED_TABLES} tables"
                        ),
                    });
                }
                table_names.push(identifier.name);
            }

            // A missing token ends the listing successfully.
            let Some(next_page_token) = page.next_page_token else {
                return Ok(table_names);
            };
            // An empty token is treated as a protocol violation, not as "no more pages".
            if next_page_token.is_empty() {
                return Err(CatalogError::InvalidResponse {
                    operation: "list catalog tables".to_string(),
                    message: "server returned an empty next-page-token".to_string(),
                });
            }
            if !seen_page_tokens.insert(next_page_token.clone()) {
                return Err(CatalogError::InvalidResponse {
                    operation: "list catalog tables".to_string(),
                    message: "server repeated a pagination token".to_string(),
                });
            }
            page_token = next_page_token;
        }

        // Reached only when every one of the MAX_LIST_PAGES pages returned another token.
        Err(CatalogError::InvalidResponse {
            operation: "list catalog tables".to_string(),
            message: format!("listing exceeded the maximum of {MAX_LIST_PAGES} pages"),
        })
    }
535
536    async fn load_table(&self, table: &IcebergTableId) -> CatalogResult<LoadedIcebergTable> {
537        let resolved = self.resolved_config().await?;
538        Self::require_endpoint(resolved, RequiredEndpoint::LoadTable)?;
539        let url = self.catalog_url(
540            resolved,
541            &["namespaces", table.namespace(), "tables", table.name()],
542        )?;
543        let response: LoadTableResponse = self
544            .execute_json(
545                self.request(Method::GET, url),
546                "load catalog table",
547                NotFoundKind::Table(table.display_name()),
548            )
549            .await?;
550        validate_loaded_table(response)
551    }
552}
553
/// Body of the catalog configuration response.
#[derive(Debug, Deserialize)]
struct CatalogConfigResponse {
    /// Server defaults; consulted only when neither an override nor a client value exists.
    defaults: BTreeMap<String, String>,
    /// Server overrides; take precedence over client-side settings.
    overrides: BTreeMap<String, String>,
    /// Endpoints advertised by the server; `None` when the field is absent.
    #[serde(default)]
    endpoints: Option<Vec<String>>,
}
561
/// One page of a table listing.
#[derive(Debug, Deserialize)]
struct ListTablesResponse {
    /// Table identifiers contained in this page.
    identifiers: Vec<TableIdentifierResponse>,
    /// Token for the next page; `None` when the field is absent, which ends the listing.
    #[serde(rename = "next-page-token", default)]
    next_page_token: Option<String>,
}
568
/// Table identifier as it appears in a listing page.
#[derive(Debug, Deserialize)]
struct TableIdentifierResponse {
    /// Namespace parts; joined with the namespace separator before comparison
    /// with the requested namespace.
    namespace: Vec<String>,
    /// Table name.
    name: String,
}
574
/// Body of a load-table response before validation.
#[derive(Debug, Deserialize)]
struct LoadTableResponse {
    /// Metadata location, read from the `metadata-location` field.
    #[serde(rename = "metadata-location")]
    metadata_location: String,
    /// Table metadata document, kept as untyped JSON.
    metadata: serde_json::Value,
    /// Per-table configuration; an absent field deserializes to an empty map.
    #[serde(default)]
    config: BTreeMap<String, String>,
}
583
/// Catalog endpoints this client needs the server to support.
#[derive(Debug, Clone, Copy)]
enum RequiredEndpoint {
    ListTables,
    LoadTable,
}

impl RequiredEndpoint {
    /// Human-readable description used in "unsupported operation" errors.
    fn operation(self) -> &'static str {
        match self {
            Self::ListTables => "listing Iceberg tables",
            Self::LoadTable => "loading an Iceberg table",
        }
    }

    /// Endpoint string with the `{prefix}` placeholder left in place.
    fn template(self) -> &'static str {
        match self {
            Self::ListTables => "GET /v1/{prefix}/namespaces/{namespace}/tables",
            Self::LoadTable => "GET /v1/{prefix}/namespaces/{namespace}/tables/{table}",
        }
    }

    /// Whether `endpoints` contains this endpoint.
    ///
    /// Two spellings are accepted: the `{prefix}` template, and the concrete
    /// form in which the placeholder is replaced by the slash-joined
    /// `prefix_segments` (or removed entirely when the prefix is empty).
    fn matches(self, endpoints: &HashSet<String>, prefix_segments: &[String]) -> bool {
        if endpoints.contains(self.template()) {
            return true;
        }

        let resource = match self {
            Self::ListTables => "namespaces/{namespace}/tables",
            Self::LoadTable => "namespaces/{namespace}/tables/{table}",
        };
        let prefix = prefix_segments.join("/");
        let concrete = if prefix.is_empty() {
            format!("GET /v1/{resource}")
        } else {
            format!("GET /v1/{prefix}/{resource}")
        };
        endpoints.contains(&concrete)
    }
}
628
/// How `execute_json` should report an HTTP 404.
enum NotFoundKind {
    /// No special handling: a 404 becomes a generic `CatalogError::Http`.
    None,
    /// A 404 becomes `CatalogError::SchemaNotFound` with this name.
    Namespace(String),
    /// A 404 becomes `CatalogError::TableNotFound` with this name.
    Table(String),
}
634
635fn validate_base_url(url: &Url) -> CatalogResult<()> {
636    if !matches!(url.scheme(), "http" | "https") {
637        return Err(CatalogError::InvalidConfiguration {
638            message: format!(
639                "catalog URL scheme must be http or https, got '{}'",
640                url.scheme()
641            ),
642        });
643    }
644    if url.host_str().is_none() {
645        return Err(CatalogError::InvalidConfiguration {
646            message: "catalog URL must include a host".to_string(),
647        });
648    }
649    if !url.username().is_empty() || url.password().is_some() {
650        return Err(CatalogError::InvalidConfiguration {
651            message: "catalog URL must not contain embedded credentials".to_string(),
652        });
653    }
654    if url.query().is_some() || url.fragment().is_some() {
655        return Err(CatalogError::InvalidConfiguration {
656            message: "catalog URL must not contain a query string or fragment".to_string(),
657        });
658    }
659    if url.cannot_be_a_base() {
660        return Err(CatalogError::InvalidConfiguration {
661            message: "catalog URL cannot be used as a hierarchical base URL".to_string(),
662        });
663    }
664    Ok(())
665}
666
667fn validate_non_blank(label: &str, value: String, max_bytes: usize) -> CatalogResult<String> {
668    if value.trim().is_empty() {
669        return Err(CatalogError::InvalidConfiguration {
670            message: format!("{label} must not be blank"),
671        });
672    }
673    if value.trim() != value {
674        return Err(CatalogError::InvalidConfiguration {
675            message: format!("{label} must not have leading or trailing whitespace"),
676        });
677    }
678    if value.contains('\0') {
679        return Err(CatalogError::InvalidConfiguration {
680            message: format!("{label} must not contain NUL"),
681        });
682    }
683    if value.len() > max_bytes {
684        return Err(CatalogError::InvalidConfiguration {
685            message: format!("{label} exceeds the maximum of {max_bytes} bytes"),
686        });
687    }
688    Ok(value)
689}
690
691fn split_catalog_prefix(prefix: &str) -> CatalogResult<Vec<String>> {
692    let prefix = prefix.trim_matches('/');
693    if prefix.is_empty() {
694        return Ok(Vec::new());
695    }
696
697    prefix
698        .split('/')
699        .map(|segment| {
700            if segment.is_empty() || matches!(segment, "." | "..") {
701                return Err(CatalogError::InvalidConfiguration {
702                    message: format!("catalog prefix contains invalid segment '{segment}'"),
703                });
704            }
705            validate_non_blank(
706                "catalog prefix segment",
707                segment.to_string(),
708                MAX_IDENTIFIER_BYTES,
709            )
710        })
711        .collect()
712}
713
714fn validate_advertised_endpoints(endpoints: Vec<String>) -> CatalogResult<HashSet<String>> {
715    let mut validated = HashSet::with_capacity(endpoints.len());
716    for endpoint in endpoints {
717        let endpoint = validate_non_blank("advertised endpoint", endpoint, 4_096)?;
718        if !validated.insert(endpoint.clone()) {
719            return Err(CatalogError::InvalidResponse {
720                operation: "fetch catalog configuration".to_string(),
721                message: format!("server advertised endpoint '{endpoint}' more than once"),
722            });
723        }
724    }
725    Ok(validated)
726}
727
728fn decode_namespace_separator(encoded: &str) -> CatalogResult<String> {
729    let query = format!("separator={encoded}");
730    let decoded = url::form_urlencoded::parse(query.as_bytes())
731        .next()
732        .map(|(_, value)| value.into_owned())
733        .ok_or_else(|| CatalogError::InvalidResponse {
734            operation: "fetch catalog configuration".to_string(),
735            message: "namespace-separator could not be decoded".to_string(),
736        })?;
737    if decoded.is_empty() || decoded.contains('\0') || decoded.len() > 16 {
738        return Err(CatalogError::InvalidResponse {
739            operation: "fetch catalog configuration".to_string(),
740            message: "namespace-separator must decode to between 1 and 16 non-NUL bytes"
741                .to_string(),
742        });
743    }
744    Ok(decoded)
745}
746
747fn append_url_segments<I, S>(base_url: &Url, segments: I) -> CatalogResult<Url>
748where
749    I: IntoIterator<Item = S>,
750    S: AsRef<str>,
751{
752    let mut url = base_url.clone();
753    let mut path = url
754        .path_segments_mut()
755        .map_err(|_| CatalogError::InvalidConfiguration {
756            message: "catalog URL cannot accept path segments".to_string(),
757        })?;
758    path.pop_if_empty();
759    for segment in segments {
760        path.push(segment.as_ref());
761    }
762    drop(path);
763    Ok(url)
764}
765
766async fn read_bounded_body(
767    mut response: Response,
768    limit: usize,
769    operation: &str,
770) -> CatalogResult<Vec<u8>> {
771    if response
772        .content_length()
773        .is_some_and(|length| length > limit as u64)
774    {
775        return Err(CatalogError::ResponseTooLarge {
776            operation: operation.to_string(),
777            limit_bytes: limit,
778        });
779    }
780
781    let mut body = Vec::new();
782    while let Some(chunk) = response
783        .chunk()
784        .await
785        .map_err(|error| CatalogError::Transport {
786            operation: operation.to_string(),
787            message: format!("failed while reading response body: {error}"),
788        })?
789    {
790        let next_len =
791            body.len()
792                .checked_add(chunk.len())
793                .ok_or_else(|| CatalogError::ResponseTooLarge {
794                    operation: operation.to_string(),
795                    limit_bytes: limit,
796                })?;
797        if next_len > limit {
798            return Err(CatalogError::ResponseTooLarge {
799                operation: operation.to_string(),
800                limit_bytes: limit,
801            });
802        }
803        body.extend_from_slice(&chunk);
804    }
805    Ok(body)
806}
807
808async fn read_error_body(mut response: Response, operation: &str) -> Vec<u8> {
809    let mut body = Vec::new();
810    while body.len() < ERROR_BODY_LIMIT_BYTES {
811        let chunk = match response.chunk().await {
812            Ok(Some(chunk)) => chunk,
813            Ok(None) => return body,
814            Err(error) => {
815                let fallback = format!("failed to read error response during {operation}: {error}");
816                return fallback.into_bytes();
817            }
818        };
819        let remaining = ERROR_BODY_LIMIT_BYTES - body.len();
820        if chunk.len() > remaining {
821            body.extend_from_slice(chunk.get(..remaining).unwrap_or(&chunk));
822            body.extend_from_slice(b" [truncated]");
823            return body;
824        }
825        body.extend_from_slice(&chunk);
826    }
827    body
828}
829
830fn describe_error_body(body: &[u8]) -> String {
831    #[derive(Deserialize)]
832    struct ErrorEnvelope {
833        error: ErrorDetail,
834    }
835    #[derive(Deserialize)]
836    struct ErrorDetail {
837        message: String,
838        #[serde(rename = "type")]
839        error_type: Option<String>,
840        code: Option<u16>,
841    }
842
843    if let Ok(envelope) = serde_json::from_slice::<ErrorEnvelope>(body) {
844        let mut message = envelope.error.message;
845        if let Some(error_type) = envelope.error.error_type {
846            message = format!("{error_type}: {message}");
847        }
848        if let Some(code) = envelope.error.code {
849            message = format!("{message} (catalog code {code})");
850        }
851        return message;
852    }
853
854    let text = String::from_utf8_lossy(body).trim().to_string();
855    if text.is_empty() {
856        "catalog returned an empty error response".to_string()
857    } else {
858        text
859    }
860}
861
862fn validate_identifier_response(
863    identifier: &TableIdentifierResponse,
864    requested_namespace: &str,
865    namespace_separator: &str,
866) -> CatalogResult<()> {
867    if identifier.namespace.is_empty() {
868        return Err(CatalogError::InvalidResponse {
869            operation: "list catalog tables".to_string(),
870            message: format!(
871                "table identifier '{}' has an empty namespace",
872                identifier.name
873            ),
874        });
875    }
876    for namespace_part in &identifier.namespace {
877        validate_response_string("list catalog tables", "namespace component", namespace_part)?;
878    }
879    let response_namespace = identifier.namespace.join(namespace_separator);
880    if response_namespace != requested_namespace {
881        return Err(CatalogError::InvalidResponse {
882            operation: "list catalog tables".to_string(),
883            message: format!(
884                "server returned table '{}' from namespace '{}' while listing '{}'",
885                identifier.name, response_namespace, requested_namespace
886            ),
887        });
888    }
889    validate_response_string("list catalog tables", "table name", &identifier.name)
890}
891
892fn validate_loaded_table(response: LoadTableResponse) -> CatalogResult<LoadedIcebergTable> {
893    validate_response_string(
894        "load catalog table",
895        "metadata-location",
896        &response.metadata_location,
897    )?;
898    validate_absolute_uri(
899        "load catalog table",
900        "metadata-location",
901        &response.metadata_location,
902    )?;
903    for key in response.config.keys() {
904        validate_response_string("load catalog table", "table config key", key)?;
905    }
906
907    let metadata = response
908        .metadata
909        .as_object()
910        .ok_or_else(|| CatalogError::InvalidResponse {
911            operation: "load catalog table".to_string(),
912            message: "metadata must be a JSON object".to_string(),
913        })?;
914    let format_version = metadata
915        .get("format-version")
916        .and_then(serde_json::Value::as_u64)
917        .ok_or_else(|| CatalogError::InvalidResponse {
918            operation: "load catalog table".to_string(),
919            message: "metadata format-version must be an integer".to_string(),
920        })?;
921    if !(1..=3).contains(&format_version) {
922        return Err(CatalogError::InvalidResponse {
923            operation: "load catalog table".to_string(),
924            message: format!("unsupported Iceberg format-version {format_version}"),
925        });
926    }
927
928    let table_uuid = metadata
929        .get("table-uuid")
930        .and_then(serde_json::Value::as_str)
931        .ok_or_else(|| CatalogError::InvalidResponse {
932            operation: "load catalog table".to_string(),
933            message: "metadata table-uuid must be a string".to_string(),
934        })?;
935    Uuid::parse_str(table_uuid).map_err(|error| CatalogError::InvalidResponse {
936        operation: "load catalog table".to_string(),
937        message: format!("metadata table-uuid is invalid: {error}"),
938    })?;
939
940    if let Some(location) = metadata.get("location") {
941        let location = location
942            .as_str()
943            .ok_or_else(|| CatalogError::InvalidResponse {
944                operation: "load catalog table".to_string(),
945                message: "metadata location must be a string".to_string(),
946            })?;
947        validate_response_string("load catalog table", "metadata location", location)?;
948        validate_absolute_uri("load catalog table", "metadata location", location)?;
949    }
950
951    Ok(LoadedIcebergTable {
952        metadata_location: response.metadata_location,
953        metadata: response.metadata,
954        config: response.config,
955    })
956}
957
958fn validate_response_string(operation: &str, label: &str, value: &str) -> CatalogResult<()> {
959    if value.trim().is_empty() {
960        return Err(CatalogError::InvalidResponse {
961            operation: operation.to_string(),
962            message: format!("{label} must not be blank"),
963        });
964    }
965    if value.contains('\0') {
966        return Err(CatalogError::InvalidResponse {
967            operation: operation.to_string(),
968            message: format!("{label} must not contain NUL"),
969        });
970    }
971    if value.len() > MAX_IDENTIFIER_BYTES * 16 {
972        return Err(CatalogError::InvalidResponse {
973            operation: operation.to_string(),
974            message: format!("{label} is unreasonably large"),
975        });
976    }
977    Ok(())
978}
979
980fn validate_absolute_uri(operation: &str, label: &str, value: &str) -> CatalogResult<()> {
981    let uri = Url::parse(value).map_err(|error| CatalogError::InvalidResponse {
982        operation: operation.to_string(),
983        message: format!("{label} must be an absolute URI: {error}"),
984    })?;
985    if uri.cannot_be_a_base() {
986        return Err(CatalogError::InvalidResponse {
987            operation: operation.to_string(),
988            message: format!("{label} must be a hierarchical URI"),
989        });
990    }
991    Ok(())
992}
993
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use reqwest::Client;
    use serde_json::json;
    use wiremock::matchers::{header, method, path, query_param};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    use super::*;

    // ---------------------------------------------------------------------
    // Shared fixtures
    // ---------------------------------------------------------------------

    /// Config payload with empty `defaults` and `overrides`.
    fn empty_config() -> serde_json::Value {
        json!({ "defaults": {}, "overrides": {} })
    }

    /// A `200` response whose body is `body` serialized as JSON.
    fn ok_json(body: serde_json::Value) -> ResponseTemplate {
        ResponseTemplate::new(200).set_body_json(body)
    }

    /// Answer every `GET` on `route` with `response`.
    async fn mount_get(server: &MockServer, route: &str, response: ResponseTemplate) {
        Mock::given(method("GET"))
            .and(path(route))
            .respond_with(response)
            .mount(server)
            .await;
    }

    /// Serve `body` from the `/v1/config` endpoint.
    async fn mount_config(server: &MockServer, body: serde_json::Value) {
        mount_get(server, "/v1/config", ok_json(body)).await;
    }

    /// Catalog pointed at `server` with an otherwise default configuration.
    fn test_catalog(server: &MockServer) -> GenericRestCatalog {
        let config = RestCatalogConfig::new(server.uri()).unwrap();
        GenericRestCatalog::new(config).unwrap()
    }

    /// Identifier for table `name` inside namespace `ns`.
    fn ns_table(name: &str) -> IcebergTableId {
        IcebergTableId::new("ns", name).unwrap()
    }

    /// List `namespace` on a default catalog and return the resulting error.
    async fn list_tables_error(server: &MockServer, namespace: &str) -> CatalogError {
        test_catalog(server)
            .list_tables(namespace)
            .await
            .unwrap_err()
    }

    /// A load-table payload that passes every validation check.
    fn valid_load_response() -> serde_json::Value {
        json!({
            "metadata-location": "s3://warehouse/db/table/metadata/v1.json",
            "metadata": {
                "format-version": 2,
                "table-uuid": "4d9d09d7-927d-47f6-9063-06e62f070a3b",
                "location": "s3://warehouse/db/table"
            },
            "config": {
                "token": "table-secret"
            }
        })
    }

    // ---------------------------------------------------------------------
    // Configuration and identifier validation
    // ---------------------------------------------------------------------

    #[test]
    fn configuration_rejects_unsafe_or_unbounded_values() {
        let rejected_urls = [
            "file:///tmp/catalog",
            "https://user:secret@example.com",
            "https://example.com?token=secret",
        ];
        for url in rejected_urls {
            assert!(RestCatalogConfig::new(url).is_err());
        }

        let valid = || RestCatalogConfig::new("https://example.com").unwrap();
        assert!(valid().with_timeout(Duration::ZERO).is_err());
        assert!(valid().with_page_size(0).is_err());
        assert!(valid().with_max_response_bytes(0).is_err());
    }

    #[test]
    fn configuration_debug_redacts_bearer_token() {
        let config = RestCatalogConfig::new("https://example.com")
            .unwrap()
            .with_bearer_token("top-secret")
            .unwrap();

        let rendered = format!("{config:?}");
        assert!(!rendered.contains("top-secret"));
        assert!(rendered.contains("has_bearer_token: true"));
    }

    #[test]
    fn table_identifier_is_validated() {
        let rejected = [("", "events"), ("analytics", " "), (" analytics", "events")];
        for (namespace, name) in rejected {
            assert!(IcebergTableId::new(namespace, name).is_err());
        }

        let table = IcebergTableId::new("analytics", "events").unwrap();
        assert_eq!(table.namespace(), "analytics");
        assert_eq!(table.name(), "events");
    }

    #[test]
    fn url_segments_are_encoded_and_base_path_is_preserved() {
        let base = Url::parse("https://example.com/catalog/api/").unwrap();
        let segments = ["v1", "namespaces", "a/b", "tables"];

        let url = append_url_segments(&base, segments).unwrap();

        assert_eq!(
            url.as_str(),
            "https://example.com/catalog/api/v1/namespaces/a%2Fb/tables"
        );
    }

    // ---------------------------------------------------------------------
    // Config negotiation and table listing
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn config_negotiation_applies_warehouse_auth_and_server_prefix_override() {
        let server = MockServer::start().await;

        let negotiated = json!({
            "defaults": { "prefix": "default" },
            "overrides": { "prefix": "tenant/prod" }
        });
        Mock::given(method("GET"))
            .and(path("/v1/config"))
            .and(query_param("warehouse", "s3://warehouse"))
            .and(header("authorization", "Bearer catalog-secret"))
            .respond_with(ok_json(negotiated))
            .expect(1)
            .mount(&server)
            .await;

        let listing = json!({
            "identifiers": [
                { "namespace": ["analytics"], "name": "events" }
            ]
        });
        Mock::given(method("GET"))
            .and(path("/v1/tenant/prod/namespaces/analytics/tables"))
            .and(query_param("pageToken", ""))
            .and(query_param("pageSize", "1000"))
            .respond_with(ok_json(listing))
            .mount(&server)
            .await;

        let config = RestCatalogConfig::new(server.uri())
            .unwrap()
            .with_warehouse("s3://warehouse")
            .unwrap()
            .with_catalog_prefix("client")
            .unwrap()
            .with_bearer_token("catalog-secret")
            .unwrap();
        let catalog = GenericRestCatalog::new(config).unwrap();

        let tables = catalog.list_tables("analytics").await.unwrap();
        assert_eq!(tables, vec!["events"]);
    }

    #[tokio::test]
    async fn list_tables_follows_pagination_and_negotiates_once() {
        let server = MockServer::start().await;

        // The config endpoint may be hit once even though we list twice.
        Mock::given(method("GET"))
            .and(path("/v1/config"))
            .respond_with(ok_json(json!({
                "defaults": {},
                "overrides": {}
            })))
            .expect(1)
            .mount(&server)
            .await;

        let first_page = json!({
            "identifiers": [
                { "namespace": ["ns"], "name": "a" },
                { "namespace": ["ns"], "name": "b" }
            ],
            "next-page-token": "page-2"
        });
        Mock::given(method("GET"))
            .and(path("/v1/namespaces/ns/tables"))
            .and(query_param("pageToken", ""))
            .and(query_param("pageSize", "2"))
            .respond_with(ok_json(first_page))
            .expect(2)
            .mount(&server)
            .await;

        let second_page = json!({
            "identifiers": [
                { "namespace": ["ns"], "name": "c" }
            ],
            "next-page-token": null
        });
        Mock::given(method("GET"))
            .and(path("/v1/namespaces/ns/tables"))
            .and(query_param("pageToken", "page-2"))
            .and(query_param("pageSize", "2"))
            .respond_with(ok_json(second_page))
            .expect(2)
            .mount(&server)
            .await;

        let config = RestCatalogConfig::new(server.uri())
            .unwrap()
            .with_page_size(2)
            .unwrap();
        let catalog = GenericRestCatalog::new(config).unwrap();

        let first_listing = catalog.list_tables("ns").await.unwrap();
        assert_eq!(first_listing, vec!["a", "b", "c"]);
        let second_listing = catalog.list_tables("ns").await.unwrap();
        assert_eq!(second_listing, vec!["a", "b", "c"]);
    }

    #[tokio::test]
    async fn list_tables_rejects_repeated_page_token() {
        let server = MockServer::start().await;
        mount_config(&server, empty_config()).await;
        // Every page points back at the same token.
        mount_get(
            &server,
            "/v1/namespaces/ns/tables",
            ok_json(json!({
                "identifiers": [],
                "next-page-token": "same"
            })),
        )
        .await;

        let error = list_tables_error(&server, "ns").await;

        assert!(matches!(error, CatalogError::InvalidResponse { .. }));
        assert!(error.to_string().contains("repeated a pagination token"));
    }

    #[tokio::test]
    async fn list_tables_rejects_malformed_and_duplicate_identifiers() {
        let server = MockServer::start().await;
        mount_config(&server, empty_config()).await;
        mount_get(
            &server,
            "/v1/namespaces/ns/tables",
            ok_json(json!({
                "identifiers": [
                    { "namespace": ["ns"], "name": "events" },
                    { "namespace": ["ns"], "name": "events" }
                ]
            })),
        )
        .await;

        let error = list_tables_error(&server, "ns").await;

        assert!(error.to_string().contains("duplicate table identifier"));
    }

    #[tokio::test]
    async fn list_tables_validates_multipart_namespace_with_server_separator() {
        let server = MockServer::start().await;
        mount_config(
            &server,
            json!({
                "defaults": {},
                "overrides": { "namespace-separator": "%2E" }
            }),
        )
        .await;
        mount_get(
            &server,
            "/v1/namespaces/accounting.tax/tables",
            ok_json(json!({
                "identifiers": [
                    { "namespace": ["accounting", "tax"], "name": "payments" }
                ]
            })),
        )
        .await;

        let tables = test_catalog(&server)
            .list_tables("accounting.tax")
            .await
            .unwrap();

        assert_eq!(tables, vec!["payments"]);
    }

    #[tokio::test]
    async fn list_tables_rejects_identifier_from_another_namespace() {
        let server = MockServer::start().await;
        mount_config(&server, empty_config()).await;
        mount_get(
            &server,
            "/v1/namespaces/ns/tables",
            ok_json(json!({
                "identifiers": [
                    { "namespace": ["other"], "name": "events" }
                ]
            })),
        )
        .await;

        let error = list_tables_error(&server, "ns").await;

        assert!(error.to_string().contains("while listing 'ns'"));
    }

    #[tokio::test]
    async fn list_tables_rejects_missing_identifiers() {
        let server = MockServer::start().await;
        mount_config(&server, empty_config()).await;
        mount_get(&server, "/v1/namespaces/ns/tables", ok_json(json!({}))).await;

        let error = list_tables_error(&server, "ns").await;

        assert!(matches!(error, CatalogError::InvalidResponse { .. }));
    }

    #[tokio::test]
    async fn advertised_capabilities_are_enforced() {
        let server = MockServer::start().await;
        // Only the load-table endpoint is advertised, so listing must be refused.
        mount_config(
            &server,
            json!({
                "defaults": {},
                "overrides": {},
                "endpoints": [
                    "GET /v1/{prefix}/namespaces/{namespace}/tables/{table}"
                ]
            }),
        )
        .await;

        let error = list_tables_error(&server, "ns").await;

        assert!(matches!(error, CatalogError::UnsupportedOperation { .. }));
    }

    #[tokio::test]
    async fn list_not_found_maps_to_schema_not_found() {
        let server = MockServer::start().await;
        mount_config(&server, empty_config()).await;
        mount_get(
            &server,
            "/v1/namespaces/missing/tables",
            ResponseTemplate::new(404).set_body_json(json!({
                "error": {
                    "message": "namespace is missing",
                    "type": "NoSuchNamespaceException",
                    "code": 404
                }
            })),
        )
        .await;

        let error = list_tables_error(&server, "missing").await;

        assert!(matches!(
            error,
            CatalogError::SchemaNotFound { name } if name == "missing"
        ));
    }

    // ---------------------------------------------------------------------
    // Table loading
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn load_table_returns_validated_envelope_and_redacts_config_debug() {
        let server = MockServer::start().await;
        mount_config(&server, empty_config()).await;
        mount_get(
            &server,
            "/v1/namespaces/ns/tables/events",
            ok_json(valid_load_response()),
        )
        .await;

        let table = ns_table("events");
        let loaded = test_catalog(&server).load_table(&table).await.unwrap();

        assert_eq!(
            loaded.metadata_location(),
            "s3://warehouse/db/table/metadata/v1.json"
        );
        assert_eq!(loaded.metadata()["format-version"], 2);
        assert_eq!(loaded.config().get("token").unwrap(), "table-secret");

        // Neither the config secret nor storage locations may appear in Debug output.
        let rendered = format!("{loaded:?}");
        assert!(!rendered.contains("table-secret"));
        assert!(!rendered.contains("s3://warehouse"));
    }

    #[tokio::test]
    async fn load_table_metadata_compatibility_helper_returns_only_metadata() {
        let server = MockServer::start().await;
        mount_config(&server, empty_config()).await;
        mount_get(
            &server,
            "/v1/namespaces/ns/tables/events",
            ok_json(valid_load_response()),
        )
        .await;

        let metadata = test_catalog(&server)
            .load_table_metadata(&ns_table("events"))
            .await
            .unwrap();

        assert_eq!(metadata["format-version"], 2);
        assert!(metadata.get("metadata-location").is_none());
    }

    #[tokio::test]
    async fn load_table_rejects_invalid_metadata_contract() {
        let server = MockServer::start().await;
        mount_config(&server, empty_config()).await;
        let mut payload = valid_load_response();
        payload["metadata"]["table-uuid"] = json!("not-a-uuid");
        mount_get(&server, "/v1/namespaces/ns/tables/events", ok_json(payload)).await;

        let error = test_catalog(&server)
            .load_table(&ns_table("events"))
            .await
            .unwrap_err();

        assert!(matches!(error, CatalogError::InvalidResponse { .. }));
        assert!(error.to_string().contains("table-uuid is invalid"));
    }

    #[tokio::test]
    async fn load_table_rejects_relative_metadata_location() {
        let server = MockServer::start().await;
        mount_config(&server, empty_config()).await;
        let mut payload = valid_load_response();
        payload["metadata-location"] = json!("metadata/v1.json");
        mount_get(&server, "/v1/namespaces/ns/tables/events", ok_json(payload)).await;

        let error = test_catalog(&server)
            .load_table(&ns_table("events"))
            .await
            .unwrap_err();

        assert!(error.to_string().contains("must be an absolute URI"));
    }

    #[tokio::test]
    async fn load_table_not_found_maps_to_table_not_found() {
        let server = MockServer::start().await;
        mount_config(&server, empty_config()).await;
        mount_get(
            &server,
            "/v1/namespaces/ns/tables/missing",
            ResponseTemplate::new(404),
        )
        .await;

        let error = test_catalog(&server)
            .load_table(&ns_table("missing"))
            .await
            .unwrap_err();

        assert!(matches!(
            error,
            CatalogError::TableNotFound { name } if name == "ns.missing"
        ));
    }

    // ---------------------------------------------------------------------
    // Transport-level behaviour
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn response_body_limit_is_enforced_without_content_length_reliance() {
        let server = MockServer::start().await;
        // The padded config body is far larger than the 128-byte ceiling below.
        let oversized = json!({
            "defaults": {},
            "overrides": {},
            "padding": "x".repeat(2_000)
        })
        .to_string();
        mount_get(
            &server,
            "/v1/config",
            ResponseTemplate::new(200).set_body_string(oversized),
        )
        .await;

        let config = RestCatalogConfig::new(server.uri())
            .unwrap()
            .with_max_response_bytes(128)
            .unwrap();
        let catalog = GenericRestCatalog::new(config).unwrap();

        let error = catalog.list_tables("ns").await.unwrap_err();

        assert!(matches!(error, CatalogError::ResponseTooLarge { .. }));
    }

    #[tokio::test]
    async fn iceberg_error_envelope_preserves_type_message_and_code() {
        let server = MockServer::start().await;
        mount_get(
            &server,
            "/v1/config",
            ResponseTemplate::new(503).set_body_json(json!({
                "error": {
                    "message": "catalog unavailable",
                    "type": "ServiceUnavailableException",
                    "code": 503
                }
            })),
        )
        .await;

        let error = list_tables_error(&server, "ns").await;

        assert!(matches!(error, CatalogError::Http { status: 503, .. }));
        let rendered = error.to_string();
        for fragment in [
            "ServiceUnavailableException",
            "catalog unavailable",
            "catalog code 503",
        ] {
            assert!(rendered.contains(fragment));
        }
    }

    #[tokio::test]
    async fn custom_http_client_is_supported() {
        let server = MockServer::start().await;
        // Both endpoints only answer when the client's default header is present.
        Mock::given(method("GET"))
            .and(path("/v1/config"))
            .and(header("x-catalog-tenant", "tenant-a"))
            .respond_with(ok_json(json!({
                "defaults": {},
                "overrides": {}
            })))
            .mount(&server)
            .await;
        Mock::given(method("GET"))
            .and(path("/v1/namespaces/ns/tables"))
            .and(header("x-catalog-tenant", "tenant-a"))
            .respond_with(ok_json(json!({
                "identifiers": []
            })))
            .mount(&server)
            .await;

        let mut default_headers = reqwest::header::HeaderMap::new();
        default_headers.insert("x-catalog-tenant", "tenant-a".parse().unwrap());
        let client = Client::builder()
            .default_headers(default_headers)
            .build()
            .unwrap();
        let config = RestCatalogConfig::new(server.uri()).unwrap();
        let catalog = GenericRestCatalog::with_http_client(config, client).unwrap();

        let tables = catalog.list_tables("ns").await.unwrap();
        assert!(tables.is_empty());
    }

    #[tokio::test]
    async fn request_timeout_is_reported_as_transport_error() {
        let server = MockServer::start().await;
        // The server delays 100 ms while the client allows only 10 ms.
        mount_get(
            &server,
            "/v1/config",
            ResponseTemplate::new(200)
                .set_delay(Duration::from_millis(100))
                .set_body_json(json!({ "defaults": {}, "overrides": {} })),
        )
        .await;

        let config = RestCatalogConfig::new(server.uri())
            .unwrap()
            .with_timeout(Duration::from_millis(10))
            .unwrap();
        let catalog = GenericRestCatalog::new(config).unwrap();

        let error = catalog.list_tables("ns").await.unwrap_err();

        assert!(matches!(error, CatalogError::Transport { .. }));
    }
}