Skip to main content

shape_runtime/
schema_cache.rs

1//! Data-source schema cache backed by unified `shape.lock` artifacts.
2//!
3//! This replaces the legacy `shape.database.lock.json` sidecar file and keeps
4//! external schema state in the shared lockfile model.
5
6use crate::package_lock::{ArtifactDeterminism, LockedArtifact, PackageLock};
7use crate::type_schema::{
8    TypeSchema, TypeSchemaBuilder, typed_object_from_nb_pairs, typed_object_to_hashmap_nb,
9};
10use serde::{Deserialize, Serialize};
11use sha2::{Digest, Sha256};
12use shape_value::ValueWord;
13use shape_wire::WireValue;
14use std::collections::{BTreeMap, HashMap};
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::sync::RwLock;
18
/// File name of the unified lockfile that stores schema-cache artifacts.
///
/// Kept under this name for call-site compatibility.
pub const CACHE_FILENAME: &str = "shape.lock";

/// Format version stamped on every cache built or loaded by this module.
const SCHEMA_CACHE_VERSION: u32 = 1;
/// Lockfile artifact namespace holding one artifact per cached source URI.
const SCHEMA_CACHE_NAMESPACE: &str = "external.datasource.schema";
/// Producer identifier recorded on each schema artifact written by `save`.
const SCHEMA_CACHE_PRODUCER: &str = "shape-runtime/schema_cache@v1";

/// Process-wide override consulted by `default_cache_path`; `None` means the
/// path is derived from the current working directory instead.
static DEFAULT_CACHE_PATH_OVERRIDE: std::sync::LazyLock<RwLock<Option<PathBuf>>> =
    std::sync::LazyLock::new(|| RwLock::new(None));
28
/// Problem found with one schema artifact while loading `shape.lock`.
///
/// Produced when an artifact's payload fails to decode or parse, or when its
/// recorded schema fingerprint is missing or does not match the recomputed
/// hash. The offending artifact is skipped; this explains why.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaCacheDiagnostic {
    /// Key of the lock artifact the diagnostic refers to.
    pub key: String,
    /// Human-readable description of the problem.
    pub message: String,
}
35
/// In-memory model of the external data-source schemas cached in `shape.lock`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataSourceSchemaCache {
    /// Schema cache format version; the constructors and loaders in this
    /// module always set it to the current version.
    pub version: u32,
    /// Cached schemas keyed by source URI (`upsert_source` uses `schema.uri`).
    pub sources: HashMap<String, SourceSchema>,
}
44
/// Cached schema for a single data-source URI.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceSchema {
    /// The source URI (e.g., "duckdb://analytics.db").
    pub uri: String,
    /// Entity schemas keyed by table name. The key usually equals the entity's
    /// own `name`, but nothing enforces that, so the two can differ.
    pub tables: HashMap<String, EntitySchema>,
    /// When schemas were last fetched; expected to be an RFC3339 timestamp
    /// (not validated here). Not part of the schema fingerprint.
    pub cached_at: String,
}
55
/// Schema for a single data-source entity (for example a table).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EntitySchema {
    /// Entity name; also used to build the runtime type name `DbRow_<name>`.
    pub name: String,
    /// Ordered list of fields. The order is preserved in lock payloads and is
    /// significant to the schema fingerprint.
    pub columns: Vec<FieldSchema>,
}
64
/// Schema for a single field (column) of an entity.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldSchema {
    /// Column name.
    pub name: String,
    /// Shape type name. `int`, `number`, `string`, `bool`, and `timestamp` map
    /// to typed runtime fields; any other value becomes an untyped field.
    /// Serialized by serde under the key `type`.
    #[serde(rename = "type")]
    pub shape_type: String,
    /// Whether the column accepts null values.
    pub nullable: bool,
}
76
77/// Convert a `SourceSchema` into a typed-object ValueWord payload.
78///
79/// This is used by module-capability extensions to return rich schema
80/// metadata through the shared module invocation path.
81pub fn source_schema_to_nb(schema: &SourceSchema) -> ValueWord {
82    let mut table_pairs: Vec<(String, ValueWord)> = schema
83        .tables
84        .iter()
85        .map(|(table_name, entity)| {
86            let columns = entity
87                .columns
88                .iter()
89                .map(|column| {
90                    typed_object_from_nb_pairs(&[
91                        (
92                            "name",
93                            ValueWord::from_string(Arc::new(column.name.clone())),
94                        ),
95                        (
96                            "type",
97                            ValueWord::from_string(Arc::new(column.shape_type.clone())),
98                        ),
99                        ("nullable", ValueWord::from_bool(column.nullable)),
100                    ])
101                })
102                .collect::<Vec<_>>();
103
104            let entity_nb = typed_object_from_nb_pairs(&[
105                (
106                    "name",
107                    ValueWord::from_string(Arc::new(entity.name.clone())),
108                ),
109                ("columns", ValueWord::from_array(Arc::new(columns))),
110            ]);
111
112            (table_name.clone(), entity_nb)
113        })
114        .collect();
115    table_pairs.sort_by(|left, right| left.0.cmp(&right.0));
116
117    let table_refs: Vec<(&str, ValueWord)> = table_pairs
118        .iter()
119        .map(|(name, value)| (name.as_str(), value.clone()))
120        .collect();
121    let tables_nb = typed_object_from_nb_pairs(&table_refs);
122
123    typed_object_from_nb_pairs(&[
124        ("uri", ValueWord::from_string(Arc::new(schema.uri.clone()))),
125        ("tables", tables_nb),
126        (
127            "cached_at",
128            ValueWord::from_string(Arc::new(schema.cached_at.clone())),
129        ),
130    ])
131}
132
133/// Decode a `SourceSchema` from a typed-object ValueWord payload.
134pub fn source_schema_from_nb(value: &ValueWord) -> Result<SourceSchema, String> {
135    let object = typed_object_to_hashmap_nb(value)
136        .ok_or_else(|| "schema payload must be a typed object".to_string())?;
137
138    let uri = object
139        .get("uri")
140        .and_then(|v| v.as_str().map(|s| s.to_string()))
141        .ok_or_else(|| "schema payload missing string field 'uri'".to_string())?;
142
143    let cached_at = object
144        .get("cached_at")
145        .and_then(|v| v.as_str().map(|s| s.to_string()))
146        .unwrap_or_default();
147
148    let tables_nb = object
149        .get("tables")
150        .ok_or_else(|| "schema payload missing object field 'tables'".to_string())?;
151    let tables_obj = typed_object_to_hashmap_nb(tables_nb)
152        .ok_or_else(|| "schema payload field 'tables' must be an object".to_string())?;
153
154    let mut tables = HashMap::new();
155    for (table_name, entity_nb) in tables_obj {
156        let entity_obj = typed_object_to_hashmap_nb(&entity_nb)
157            .ok_or_else(|| format!("table '{table_name}' schema must be an object"))?;
158
159        let entity_name = entity_obj
160            .get("name")
161            .and_then(|v| v.as_str().map(|s| s.to_string()))
162            .unwrap_or_else(|| table_name.clone());
163
164        let columns_nb = entity_obj
165            .get("columns")
166            .ok_or_else(|| format!("table '{table_name}' missing 'columns' array"))?;
167        let columns_arr = columns_nb
168            .as_any_array()
169            .ok_or_else(|| format!("table '{table_name}' field 'columns' must be an array"))?
170            .to_generic();
171
172        let mut columns = Vec::new();
173        for column_nb in columns_arr.iter() {
174            let column_obj = typed_object_to_hashmap_nb(column_nb)
175                .ok_or_else(|| format!("table '{table_name}' contains non-object column entry"))?;
176            let name = column_obj
177                .get("name")
178                .and_then(|v| v.as_str().map(|s| s.to_string()))
179                .ok_or_else(|| format!("table '{table_name}' column missing string 'name'"))?;
180            let shape_type = column_obj
181                .get("type")
182                .or_else(|| column_obj.get("shape_type"))
183                .and_then(|v| v.as_str().map(|s| s.to_string()))
184                .ok_or_else(|| format!("table '{table_name}' column '{name}' missing type"))?;
185            let nullable = column_obj
186                .get("nullable")
187                .and_then(|v| v.as_bool())
188                .unwrap_or(false);
189
190            columns.push(FieldSchema {
191                name,
192                shape_type,
193                nullable,
194            });
195        }
196
197        tables.insert(
198            table_name.clone(),
199            EntitySchema {
200                name: entity_name,
201                columns,
202            },
203        );
204    }
205
206    Ok(SourceSchema {
207        uri,
208        tables,
209        cached_at,
210    })
211}
212
213/// Decode a `SourceSchema` from a shape-wire object payload.
214pub fn source_schema_from_wire(value: &WireValue) -> Result<SourceSchema, String> {
215    let object = match value {
216        WireValue::Object(map) => map,
217        _ => return Err("schema payload must be an object".to_string()),
218    };
219
220    let uri = object
221        .get("uri")
222        .and_then(|v| match v {
223            WireValue::String(s) => Some(s.clone()),
224            _ => None,
225        })
226        .ok_or_else(|| "schema payload missing string field 'uri'".to_string())?;
227
228    let cached_at = object
229        .get("cached_at")
230        .and_then(|v| match v {
231            WireValue::String(s) => Some(s.clone()),
232            _ => None,
233        })
234        .unwrap_or_default();
235
236    let tables_value = object
237        .get("tables")
238        .ok_or_else(|| "schema payload missing object field 'tables'".to_string())?;
239    let tables_object = match tables_value {
240        WireValue::Object(map) => map,
241        _ => return Err("schema payload field 'tables' must be an object".to_string()),
242    };
243
244    let mut tables = HashMap::new();
245    for (table_name, entity_value) in tables_object {
246        let entity_obj = match entity_value {
247            WireValue::Object(map) => map,
248            _ => return Err(format!("table '{table_name}' schema must be an object")),
249        };
250
251        let entity_name = entity_obj
252            .get("name")
253            .and_then(|v| match v {
254                WireValue::String(s) => Some(s.clone()),
255                _ => None,
256            })
257            .unwrap_or_else(|| table_name.clone());
258
259        let columns_value = entity_obj
260            .get("columns")
261            .ok_or_else(|| format!("table '{table_name}' missing 'columns' array"))?;
262        let columns_array = match columns_value {
263            WireValue::Array(values) => values,
264            _ => {
265                return Err(format!(
266                    "table '{table_name}' field 'columns' must be an array"
267                ));
268            }
269        };
270
271        let mut columns = Vec::new();
272        for column_value in columns_array {
273            let column_obj = match column_value {
274                WireValue::Object(map) => map,
275                _ => {
276                    return Err(format!(
277                        "table '{table_name}' contains non-object column entry"
278                    ));
279                }
280            };
281
282            let name = column_obj
283                .get("name")
284                .and_then(|v| match v {
285                    WireValue::String(s) => Some(s.clone()),
286                    _ => None,
287                })
288                .ok_or_else(|| format!("table '{table_name}' column missing string 'name'"))?;
289
290            let shape_type = column_obj
291                .get("type")
292                .or_else(|| column_obj.get("shape_type"))
293                .and_then(|v| match v {
294                    WireValue::String(s) => Some(s.clone()),
295                    _ => None,
296                })
297                .ok_or_else(|| format!("table '{table_name}' column '{name}' missing type"))?;
298
299            let nullable = column_obj
300                .get("nullable")
301                .and_then(|v| match v {
302                    WireValue::Bool(b) => Some(*b),
303                    _ => None,
304                })
305                .unwrap_or(false);
306
307            columns.push(FieldSchema {
308                name,
309                shape_type,
310                nullable,
311            });
312        }
313
314        tables.insert(
315            table_name.clone(),
316            EntitySchema {
317                name: entity_name,
318                columns,
319            },
320        );
321    }
322
323    Ok(SourceSchema {
324        uri,
325        tables,
326        cached_at,
327    })
328}
329
330impl DataSourceSchemaCache {
331    /// Create an empty cache.
332    pub fn new() -> Self {
333        Self {
334            version: SCHEMA_CACHE_VERSION,
335            sources: HashMap::new(),
336        }
337    }
338
339    /// Save cache entries into `shape.lock` artifacts.
340    pub fn save(&self, path: &Path) -> std::io::Result<()> {
341        let mut lock = PackageLock::read(path).unwrap_or_default();
342
343        // Replace this namespace wholesale to keep the lock deterministic.
344        lock.artifacts
345            .retain(|artifact| artifact.namespace != SCHEMA_CACHE_NAMESPACE);
346
347        let mut uris: Vec<_> = self.sources.keys().cloned().collect();
348        uris.sort();
349
350        for uri in uris {
351            let Some(source) = self.sources.get(&uri) else {
352                continue;
353            };
354
355            let schema_hash = hash_source_schema(source);
356            let payload = source_to_payload(source);
357
358            let mut inputs = BTreeMap::new();
359            inputs.insert("uri".to_string(), uri.clone());
360            inputs.insert("schema_hash".to_string(), schema_hash.clone());
361
362            let determinism = ArtifactDeterminism::External {
363                fingerprints: BTreeMap::from([(format!("schema:{uri}"), schema_hash)]),
364            };
365
366            let artifact = LockedArtifact::new(
367                SCHEMA_CACHE_NAMESPACE,
368                uri,
369                SCHEMA_CACHE_PRODUCER,
370                determinism,
371                inputs,
372                payload,
373            )
374            .map_err(std::io::Error::other)?;
375
376            lock.upsert_artifact(artifact)
377                .map_err(std::io::Error::other)?;
378        }
379
380        lock.write(path)
381    }
382
383    /// Load cache entries from `shape.lock` artifacts.
384    pub fn load(path: &Path) -> std::io::Result<Self> {
385        let (cache, diagnostics) = Self::load_with_diagnostics(path)?;
386        if let Some(diag) = diagnostics.first() {
387            return Err(std::io::Error::new(
388                std::io::ErrorKind::InvalidData,
389                format!("invalid schema artifact '{}': {}", diag.key, diag.message),
390            ));
391        }
392        Ok(cache)
393    }
394
395    /// Load cache entries from `shape.lock` artifacts and collect diagnostics for
396    /// stale/invalid artifacts while keeping valid entries.
397    pub fn load_with_diagnostics(
398        path: &Path,
399    ) -> std::io::Result<(Self, Vec<SchemaCacheDiagnostic>)> {
400        let lock = PackageLock::read(path).ok_or_else(|| {
401            std::io::Error::new(std::io::ErrorKind::NotFound, "shape.lock not found")
402        })?;
403
404        let mut sources = HashMap::new();
405        let mut diagnostics = Vec::new();
406        for artifact in lock
407            .artifacts
408            .iter()
409            .filter(|artifact| artifact.namespace == SCHEMA_CACHE_NAMESPACE)
410        {
411            let payload = match artifact.payload() {
412                Ok(payload) => payload,
413                Err(err) => {
414                    diagnostics.push(SchemaCacheDiagnostic {
415                        key: artifact.key.clone(),
416                        message: format!("payload decode failed: {err}"),
417                    });
418                    continue;
419                }
420            };
421            let source = match payload_to_source(&artifact.key, &payload) {
422                Ok(source) => source,
423                Err(err) => {
424                    diagnostics.push(SchemaCacheDiagnostic {
425                        key: artifact.key.clone(),
426                        message: format!("payload parse failed: {err}"),
427                    });
428                    continue;
429                }
430            };
431
432            if let Some(expected_hash) = source_fingerprint_hash(artifact, &source.uri) {
433                let actual_hash = hash_source_schema(&source);
434                if expected_hash != actual_hash {
435                    diagnostics.push(SchemaCacheDiagnostic {
436                        key: artifact.key.clone(),
437                        message: format!(
438                            "stale schema fingerprint (expected {expected_hash}, computed {actual_hash})"
439                        ),
440                    });
441                    continue;
442                }
443            } else {
444                diagnostics.push(SchemaCacheDiagnostic {
445                    key: artifact.key.clone(),
446                    message: "missing schema fingerprint".to_string(),
447                });
448                continue;
449            }
450            sources.insert(source.uri.clone(), source);
451        }
452
453        Ok((
454            Self {
455                version: SCHEMA_CACHE_VERSION,
456                sources,
457            },
458            diagnostics,
459        ))
460    }
461
462    /// Try to load cache entries, returning an empty cache if lockfile is missing.
463    pub fn load_or_empty(path: &Path) -> Self {
464        match Self::load(path) {
465            Ok(cache) => cache,
466            Err(_) => Self::new(),
467        }
468    }
469
470    /// Get schema for a specific source URI.
471    pub fn get_source(&self, uri: &str) -> Option<&SourceSchema> {
472        self.sources.get(uri)
473    }
474
475    /// Insert or update a source schema.
476    pub fn upsert_source(&mut self, schema: SourceSchema) {
477        self.sources.insert(schema.uri.clone(), schema);
478    }
479
480    /// Check if offline mode is enabled via `SHAPE_OFFLINE=true`.
481    pub fn is_offline() -> bool {
482        std::env::var("SHAPE_OFFLINE")
483            .map(|value| value == "true" || value == "1")
484            .unwrap_or(false)
485    }
486}
487
488/// Load cached schemas and convert matching sources into runtime `TypeSchema`s.
489///
490/// `uri_prefixes` filters which sources are included (for example:
491/// `["duckdb://"]` or `["postgres://", "postgresql://"]`).
492pub fn load_cached_type_schemas_for_uri_prefixes(
493    cache_path: &Path,
494    uri_prefixes: &[&str],
495) -> std::io::Result<Vec<TypeSchema>> {
496    let (schemas, _diagnostics) =
497        load_cached_type_schemas_for_uri_prefixes_with_diagnostics(cache_path, uri_prefixes)?;
498    Ok(schemas)
499}
500
501/// Like [`load_cached_type_schemas_for_uri_prefixes`] but also returns
502/// non-fatal diagnostics for invalid/stale artifacts that were ignored.
503pub fn load_cached_type_schemas_for_uri_prefixes_with_diagnostics(
504    cache_path: &Path,
505    uri_prefixes: &[&str],
506) -> std::io::Result<(Vec<TypeSchema>, Vec<SchemaCacheDiagnostic>)> {
507    let (cache, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(cache_path)?;
508    let mut schemas = Vec::new();
509
510    for source in cache.sources.values() {
511        if !uri_prefixes.is_empty()
512            && !uri_prefixes
513                .iter()
514                .any(|prefix| source.uri.starts_with(prefix))
515        {
516            continue;
517        }
518
519        for table in source.tables.values() {
520            schemas.push(type_schema_from_entity(table));
521        }
522    }
523
524    Ok((schemas, diagnostics))
525}
526
527/// Resolve the default `shape.lock` path from the current working directory.
528pub fn default_cache_path() -> PathBuf {
529    if let Ok(guard) = DEFAULT_CACHE_PATH_OVERRIDE.read() {
530        if let Some(path) = guard.as_ref() {
531            return path.clone();
532        }
533    }
534
535    std::env::current_dir()
536        .unwrap_or_default()
537        .join(CACHE_FILENAME)
538}
539
540/// Override the default lock/cache path used by extension-side schema cache
541/// helpers during this process.
542pub fn set_default_cache_path(path: Option<PathBuf>) {
543    if let Ok(mut guard) = DEFAULT_CACHE_PATH_OVERRIDE.write() {
544        *guard = path;
545    }
546}
547
548/// Load one cached source schema by URI.
549pub fn load_cached_source_for_uri(cache_path: &Path, uri: &str) -> std::io::Result<SourceSchema> {
550    let (source, diagnostics) = load_cached_source_for_uri_with_diagnostics(cache_path, uri)?;
551    if let Some(diag) = diagnostics.first() {
552        return Err(std::io::Error::new(
553            std::io::ErrorKind::InvalidData,
554            format!("invalid schema artifact '{}': {}", diag.key, diag.message),
555        ));
556    }
557    Ok(source)
558}
559
560/// Load one cached source schema by URI and keep non-fatal diagnostics for
561/// stale/invalid artifacts that were ignored.
562pub fn load_cached_source_for_uri_with_diagnostics(
563    cache_path: &Path,
564    uri: &str,
565) -> std::io::Result<(SourceSchema, Vec<SchemaCacheDiagnostic>)> {
566    let (cache, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(cache_path)?;
567    let source = cache.get_source(uri).cloned().ok_or_else(|| {
568        std::io::Error::new(
569            std::io::ErrorKind::NotFound,
570            format!("no cached schema for '{uri}'"),
571        )
572    })?;
573    Ok((source, diagnostics))
574}
575
576impl Default for DataSourceSchemaCache {
577    fn default() -> Self {
578        Self::new()
579    }
580}
581
582impl SourceSchema {
583    /// Get schema for a specific entity.
584    pub fn get_entity(&self, name: &str) -> Option<&EntitySchema> {
585        self.tables.get(name)
586    }
587
588    /// Get all entity names.
589    pub fn entity_names(&self) -> Vec<&str> {
590        self.tables.keys().map(|name| name.as_str()).collect()
591    }
592}
593
594impl EntitySchema {
595    /// Get a field by name.
596    pub fn get_field(&self, name: &str) -> Option<&FieldSchema> {
597        self.columns.iter().find(|column| column.name == name)
598    }
599
600    /// Get all field names.
601    pub fn field_names(&self) -> Vec<&str> {
602        self.columns
603            .iter()
604            .map(|column| column.name.as_str())
605            .collect()
606    }
607}
608
609fn hash_source_schema(source: &SourceSchema) -> String {
610    let mut hasher = Sha256::new();
611    hasher.update(source.uri.as_bytes());
612    hasher.update([0]);
613
614    let mut entity_names: Vec<_> = source.tables.keys().cloned().collect();
615    entity_names.sort();
616    for entity_name in entity_names {
617        hasher.update(entity_name.as_bytes());
618        hasher.update([0]);
619
620        if let Some(entity) = source.tables.get(&entity_name) {
621            for field in &entity.columns {
622                hasher.update(field.name.as_bytes());
623                hasher.update([0]);
624                hasher.update(field.shape_type.as_bytes());
625                hasher.update([0]);
626                hasher.update([if field.nullable { 1 } else { 0 }]);
627            }
628        }
629    }
630
631    format!("sha256:{:x}", hasher.finalize())
632}
633
634fn source_fingerprint_hash<'a>(artifact: &'a LockedArtifact, uri: &str) -> Option<&'a str> {
635    if let ArtifactDeterminism::External { fingerprints } = &artifact.determinism {
636        let key = format!("schema:{uri}");
637        if let Some(value) = fingerprints.get(&key) {
638            return Some(value.as_str());
639        }
640    }
641    artifact.inputs.get("schema_hash").map(String::as_str)
642}
643
644fn type_schema_from_entity(entity: &EntitySchema) -> TypeSchema {
645    let schema_name = format!("DbRow_{}", entity.name);
646    let builder =
647        entity
648            .columns
649            .iter()
650            .fold(
651                TypeSchemaBuilder::new(&schema_name),
652                |builder, field| match field.shape_type.as_str() {
653                    "int" => builder.i64_field(&field.name),
654                    "number" => builder.f64_field(&field.name),
655                    "string" => builder.string_field(&field.name),
656                    "bool" => builder.bool_field(&field.name),
657                    "timestamp" => builder.timestamp_field(&field.name),
658                    _ => builder.any_field(&field.name),
659                },
660            );
661    builder.build()
662}
663
664fn source_to_payload(source: &SourceSchema) -> shape_wire::WireValue {
665    let mut entities: Vec<_> = source.tables.values().collect();
666    entities.sort_by(|left, right| left.name.cmp(&right.name));
667
668    let entity_values = entities
669        .into_iter()
670        .map(|entity| {
671            let field_values = entity
672                .columns
673                .iter()
674                .map(|field| {
675                    shape_wire::WireValue::Object(BTreeMap::from([
676                        (
677                            "name".to_string(),
678                            shape_wire::WireValue::String(field.name.clone()),
679                        ),
680                        (
681                            "shape_type".to_string(),
682                            shape_wire::WireValue::String(field.shape_type.clone()),
683                        ),
684                        (
685                            "nullable".to_string(),
686                            shape_wire::WireValue::Bool(field.nullable),
687                        ),
688                    ]))
689                })
690                .collect::<Vec<_>>();
691
692            shape_wire::WireValue::Object(BTreeMap::from([
693                (
694                    "name".to_string(),
695                    shape_wire::WireValue::String(entity.name.clone()),
696                ),
697                (
698                    "columns".to_string(),
699                    shape_wire::WireValue::Array(field_values),
700                ),
701            ]))
702        })
703        .collect::<Vec<_>>();
704
705    shape_wire::WireValue::Object(BTreeMap::from([
706        (
707            "uri".to_string(),
708            shape_wire::WireValue::String(source.uri.clone()),
709        ),
710        (
711            "cached_at".to_string(),
712            shape_wire::WireValue::String(source.cached_at.clone()),
713        ),
714        (
715            "tables".to_string(),
716            shape_wire::WireValue::Array(entity_values),
717        ),
718    ]))
719}
720
721fn payload_to_source(
722    key_hint: &str,
723    payload: &shape_wire::WireValue,
724) -> Result<SourceSchema, String> {
725    let shape_wire::WireValue::Object(map) = payload else {
726        return Err("source payload must be an object".to_string());
727    };
728
729    let uri = map
730        .get("uri")
731        .and_then(shape_wire::WireValue::as_str)
732        .map(|value| value.to_string())
733        .filter(|value| !value.is_empty())
734        .unwrap_or_else(|| key_hint.to_string());
735    let cached_at = map
736        .get("cached_at")
737        .and_then(shape_wire::WireValue::as_str)
738        .unwrap_or("")
739        .to_string();
740
741    let tables_value = map
742        .get("tables")
743        .ok_or_else(|| "source payload missing 'tables'".to_string())?;
744    let shape_wire::WireValue::Array(table_values) = tables_value else {
745        return Err("source payload 'tables' must be an array".to_string());
746    };
747
748    let mut tables = HashMap::new();
749    for table_value in table_values {
750        let entity = payload_to_entity(table_value)?;
751        tables.insert(entity.name.clone(), entity);
752    }
753
754    Ok(SourceSchema {
755        uri,
756        tables,
757        cached_at,
758    })
759}
760
761fn payload_to_entity(value: &shape_wire::WireValue) -> Result<EntitySchema, String> {
762    let shape_wire::WireValue::Object(table_map) = value else {
763        return Err("table payload must be an object".to_string());
764    };
765
766    let name = table_map
767        .get("name")
768        .and_then(shape_wire::WireValue::as_str)
769        .ok_or_else(|| "table payload missing 'name'".to_string())?
770        .to_string();
771    let columns_value = table_map
772        .get("columns")
773        .ok_or_else(|| "table payload missing 'columns'".to_string())?;
774    let shape_wire::WireValue::Array(column_values) = columns_value else {
775        return Err("table payload 'columns' must be an array".to_string());
776    };
777
778    let mut columns = Vec::with_capacity(column_values.len());
779    for column_value in column_values {
780        columns.push(payload_to_field(column_value)?);
781    }
782
783    Ok(EntitySchema { name, columns })
784}
785
786fn payload_to_field(value: &shape_wire::WireValue) -> Result<FieldSchema, String> {
787    let shape_wire::WireValue::Object(column_map) = value else {
788        return Err("column payload must be an object".to_string());
789    };
790
791    let name = column_map
792        .get("name")
793        .and_then(shape_wire::WireValue::as_str)
794        .ok_or_else(|| "column payload missing 'name'".to_string())?
795        .to_string();
796    let shape_type = column_map
797        .get("shape_type")
798        .and_then(shape_wire::WireValue::as_str)
799        .ok_or_else(|| "column payload missing 'shape_type'".to_string())?
800        .to_string();
801    let nullable = column_map
802        .get("nullable")
803        .and_then(shape_wire::WireValue::as_bool)
804        .ok_or_else(|| "column payload missing 'nullable'".to_string())?;
805
806    Ok(FieldSchema {
807        name,
808        shape_type,
809        nullable,
810    })
811}
812
813#[cfg(test)]
814mod tests {
815    use super::*;
816    use crate::package_lock::ArtifactDeterminism;
817
818    fn sample_cache() -> DataSourceSchemaCache {
819        let mut cache = DataSourceSchemaCache::new();
820        let mut tables = HashMap::new();
821        tables.insert(
822            "users".to_string(),
823            EntitySchema {
824                name: "users".to_string(),
825                columns: vec![
826                    FieldSchema {
827                        name: "id".to_string(),
828                        shape_type: "int".to_string(),
829                        nullable: false,
830                    },
831                    FieldSchema {
832                        name: "name".to_string(),
833                        shape_type: "string".to_string(),
834                        nullable: false,
835                    },
836                    FieldSchema {
837                        name: "age".to_string(),
838                        shape_type: "int".to_string(),
839                        nullable: true,
840                    },
841                ],
842            },
843        );
844        cache.upsert_source(SourceSchema {
845            uri: "duckdb://analytics.db".to_string(),
846            tables,
847            cached_at: "2026-02-12T10:00:00Z".to_string(),
848        });
849        cache
850    }
851
852    #[test]
853    fn test_roundtrip_serialization() {
854        let cache = sample_cache();
855
856        let dir = tempfile::tempdir().unwrap();
857        let path = dir.path().join(CACHE_FILENAME);
858        cache.save(&path).unwrap();
859
860        let loaded = DataSourceSchemaCache::load(&path).unwrap();
861        assert_eq!(loaded.version, SCHEMA_CACHE_VERSION);
862        assert_eq!(loaded.sources.len(), 1);
863
864        let conn = loaded.get_source("duckdb://analytics.db").unwrap();
865        assert_eq!(conn.tables.len(), 1);
866
867        let users = conn.get_entity("users").unwrap();
868        assert_eq!(users.columns.len(), 3);
869        assert_eq!(users.columns[0].name, "id");
870        assert_eq!(users.columns[0].shape_type, "int");
871        assert!(!users.columns[0].nullable);
872        assert_eq!(users.columns[2].name, "age");
873        assert!(users.columns[2].nullable);
874    }
875
876    #[test]
877    fn test_load_or_empty_missing_file() {
878        let cache = DataSourceSchemaCache::load_or_empty(Path::new("/nonexistent/path.toml"));
879        assert_eq!(cache.version, SCHEMA_CACHE_VERSION);
880        assert!(cache.sources.is_empty());
881    }
882
883    #[test]
884    fn test_source_helpers() {
885        let cache = sample_cache();
886        let conn = cache.get_source("duckdb://analytics.db").unwrap();
887
888        let names = conn.entity_names();
889        assert!(names.contains(&"users"));
890
891        let users = conn.get_entity("users").unwrap();
892        assert_eq!(users.field_names(), vec!["id", "name", "age"]);
893        assert!(users.get_field("id").is_some());
894        assert!(users.get_field("nonexistent").is_none());
895    }
896
897    #[test]
898    fn test_upsert_source() {
899        let mut cache = DataSourceSchemaCache::new();
900        assert!(cache.sources.is_empty());
901
902        cache.upsert_source(SourceSchema {
903            uri: "duckdb://test.db".to_string(),
904            tables: HashMap::new(),
905            cached_at: "2026-01-01T00:00:00Z".to_string(),
906        });
907        assert_eq!(cache.sources.len(), 1);
908
909        cache.upsert_source(SourceSchema {
910            uri: "duckdb://test.db".to_string(),
911            tables: HashMap::new(),
912            cached_at: "2026-02-01T00:00:00Z".to_string(),
913        });
914        assert_eq!(cache.sources.len(), 1);
915        assert_eq!(
916            cache.get_source("duckdb://test.db").unwrap().cached_at,
917            "2026-02-01T00:00:00Z"
918        );
919    }
920
921    #[test]
922    fn test_load_with_diagnostics_reports_stale_fingerprint() {
923        let cache = sample_cache();
924        let source = cache.get_source("duckdb://analytics.db").unwrap();
925
926        let payload = source_to_payload(source);
927        let artifact = LockedArtifact::new(
928            SCHEMA_CACHE_NAMESPACE,
929            source.uri.clone(),
930            SCHEMA_CACHE_PRODUCER,
931            ArtifactDeterminism::External {
932                fingerprints: BTreeMap::from([(
933                    format!("schema:{}", source.uri),
934                    "sha256:deadbeef".to_string(),
935                )]),
936            },
937            BTreeMap::from([
938                ("uri".to_string(), source.uri.clone()),
939                ("schema_hash".to_string(), "sha256:deadbeef".to_string()),
940            ]),
941            payload,
942        )
943        .unwrap();
944
945        let mut lock = PackageLock::new();
946        lock.upsert_artifact(artifact).unwrap();
947
948        let dir = tempfile::tempdir().unwrap();
949        let path = dir.path().join(CACHE_FILENAME);
950        lock.write(&path).unwrap();
951
952        let (loaded, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(&path).unwrap();
953        assert!(loaded.sources.is_empty());
954        assert_eq!(diagnostics.len(), 1);
955        assert!(diagnostics[0].message.contains("stale schema fingerprint"));
956    }
957
958    #[test]
959    fn test_load_with_diagnostics_reports_invalid_payload() {
960        let artifact = LockedArtifact::new(
961            SCHEMA_CACHE_NAMESPACE,
962            "broken://source",
963            SCHEMA_CACHE_PRODUCER,
964            ArtifactDeterminism::External {
965                fingerprints: BTreeMap::from([(
966                    "schema:broken://source".to_string(),
967                    "sha256:abc".to_string(),
968                )]),
969            },
970            BTreeMap::from([
971                ("uri".to_string(), "broken://source".to_string()),
972                ("schema_hash".to_string(), "sha256:abc".to_string()),
973            ]),
974            shape_wire::WireValue::String("bad".to_string()),
975        )
976        .unwrap();
977
978        let mut lock = PackageLock::new();
979        lock.upsert_artifact(artifact).unwrap();
980
981        let dir = tempfile::tempdir().unwrap();
982        let path = dir.path().join(CACHE_FILENAME);
983        lock.write(&path).unwrap();
984
985        let (loaded, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(&path).unwrap();
986        assert!(loaded.sources.is_empty());
987        assert_eq!(diagnostics.len(), 1);
988        assert!(diagnostics[0].message.contains("payload parse failed"));
989    }
990
991    #[test]
992    fn test_load_cached_type_schemas_for_uri_prefixes_filters_sources() {
993        let mut cache = DataSourceSchemaCache::new();
994
995        let mut duck_tables = HashMap::new();
996        duck_tables.insert(
997            "users".to_string(),
998            EntitySchema {
999                name: "users".to_string(),
1000                columns: vec![FieldSchema {
1001                    name: "id".to_string(),
1002                    shape_type: "int".to_string(),
1003                    nullable: false,
1004                }],
1005            },
1006        );
1007        cache.upsert_source(SourceSchema {
1008            uri: "duckdb://analytics.db".to_string(),
1009            tables: duck_tables,
1010            cached_at: "2026-02-17T00:00:00Z".to_string(),
1011        });
1012
1013        let mut pg_tables = HashMap::new();
1014        pg_tables.insert(
1015            "orders".to_string(),
1016            EntitySchema {
1017                name: "orders".to_string(),
1018                columns: vec![FieldSchema {
1019                    name: "id".to_string(),
1020                    shape_type: "int".to_string(),
1021                    nullable: false,
1022                }],
1023            },
1024        );
1025        cache.upsert_source(SourceSchema {
1026            uri: "postgres://localhost/app".to_string(),
1027            tables: pg_tables,
1028            cached_at: "2026-02-17T00:00:00Z".to_string(),
1029        });
1030
1031        let dir = tempfile::tempdir().unwrap();
1032        let path = dir.path().join(CACHE_FILENAME);
1033        cache.save(&path).unwrap();
1034
1035        let duck_schemas =
1036            load_cached_type_schemas_for_uri_prefixes(&path, &["duckdb://"]).unwrap();
1037        assert_eq!(duck_schemas.len(), 1);
1038        assert_eq!(duck_schemas[0].name, "DbRow_users");
1039
1040        let pg_schemas =
1041            load_cached_type_schemas_for_uri_prefixes(&path, &["postgres://", "postgresql://"])
1042                .unwrap();
1043        assert_eq!(pg_schemas.len(), 1);
1044        assert_eq!(pg_schemas[0].name, "DbRow_orders");
1045    }
1046
1047    #[test]
1048    fn test_load_cached_source_for_uri_with_diagnostics() {
1049        let cache = sample_cache();
1050        let dir = tempfile::tempdir().unwrap();
1051        let path = dir.path().join(CACHE_FILENAME);
1052        cache.save(&path).unwrap();
1053
1054        let (source, diagnostics) =
1055            load_cached_source_for_uri_with_diagnostics(&path, "duckdb://analytics.db").unwrap();
1056        assert_eq!(source.uri, "duckdb://analytics.db");
1057        assert!(diagnostics.is_empty());
1058    }
1059
1060    #[test]
1061    fn test_default_cache_path_ends_with_shape_lock() {
1062        let path = default_cache_path();
1063        assert!(path.ends_with(CACHE_FILENAME));
1064    }
1065}