Skip to main content

shape_runtime/
schema_cache.rs

//! Data-source schema cache backed by unified `shape.lock` artifacts.
//!
//! This replaces the legacy `shape.database.lock.json` sidecar file and keeps
//! external schema state in the shared lockfile model.
5
6use crate::package_lock::{ArtifactDeterminism, LockedArtifact, PackageLock};
7use crate::type_schema::{TypeSchema, TypeSchemaBuilder};
8use serde::{Deserialize, Serialize};
9use sha2::{Digest, Sha256};
10use shape_value::KindedSlot;
11use shape_wire::WireValue;
12use std::collections::{BTreeMap, HashMap};
13use std::path::{Path, PathBuf};
14use std::sync::RwLock;
15
/// File name of the shared lockfile that stores the schema cache.
///
/// Kept under this public name so existing call sites keep compiling.
pub const CACHE_FILENAME: &str = "shape.lock";

/// Format version stamped on every cache model built by this module.
const SCHEMA_CACHE_VERSION: u32 = 1;
/// Lockfile artifact namespace reserved for cached data-source schemas.
const SCHEMA_CACHE_NAMESPACE: &str = "external.datasource.schema";
/// Producer id recorded on every schema artifact this module writes.
const SCHEMA_CACHE_PRODUCER: &str = "shape-runtime/schema_cache@v1";

/// Process-wide override for the default lockfile path; `None` means
/// "derive it from the current working directory".
static DEFAULT_CACHE_PATH_OVERRIDE: std::sync::LazyLock<RwLock<Option<PathBuf>>> =
    std::sync::LazyLock::new(RwLock::default);
25
/// A problem found with one schema artifact while reading `shape.lock`.
///
/// `DataSourceSchemaCache::load_with_diagnostics` records one of these for each
/// artifact it has to skip, and keeps loading the remaining artifacts.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SchemaCacheDiagnostic {
    /// Key of the artifact the diagnostic refers to.
    pub key: String,
    /// Human-readable reason the artifact was rejected.
    pub message: String,
}
32
33/// Top-level cache model for external data-source schemas.
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct DataSourceSchemaCache {
36    /// Schema cache format version.
37    pub version: u32,
38    /// Source URI -> schema mapping.
39    pub sources: HashMap<String, SourceSchema>,
40}
41
42/// Schema for a single data-source URI.
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct SourceSchema {
45    /// The source URI (e.g., "duckdb://analytics.db").
46    pub uri: String,
47    /// Entity name -> schema mapping.
48    pub tables: HashMap<String, EntitySchema>,
49    /// RFC3339 timestamp of when schemas were last fetched.
50    pub cached_at: String,
51}
52
53/// Schema for a single data-source entity.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct EntitySchema {
56    /// Entity name.
57    pub name: String,
58    /// Ordered list of fields.
59    pub columns: Vec<FieldSchema>,
60}
61
62/// Schema for a single field.
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct FieldSchema {
65    /// Column name.
66    pub name: String,
67    /// Shape type: `int`, `number`, `string`, `bool`, `timestamp`.
68    #[serde(rename = "type")]
69    pub shape_type: String,
70    /// Whether the column accepts null values.
71    pub nullable: bool,
72}
73
74/// Convert a `SourceSchema` into a [`KindedSlot`] typed-object payload.
75///
76/// Phase 1.B (ADR-006 §2.7.4 audit-accuracy ruling): the runtime-side
77/// SourceSchema↔KindedSlot path goes through nested
78/// `typed_object_from_pairs`. The current `typed_object_from_pairs`
79/// expects the schema to already be registered for the `(uri, tables,
80/// cached_at)` and per-table-entity field shapes — those schemas are
81/// not currently registered (they were synthesized at runtime under
82/// the deleted ValueWord path). The Phase 2c rebuild registers the
83/// schemas as predeclared and rebuilds this helper. Until then, this
84/// function returns an empty `KindedSlot` placeholder.
85///
86/// The wire-format path (`source_schema_to_wire`) is unaffected and
87/// remains the canonical persisted-format converter.
88pub fn source_schema_to_nb(_schema: &SourceSchema) -> KindedSlot {
89    KindedSlot::none()
90}
91
92/// Decode a `SourceSchema` from a typed-object [`KindedSlot`] payload.
93///
94/// See [`source_schema_to_nb`] — Phase 2c rebuild deferral. The wire-
95/// format path (`source_schema_from_wire`) handles the persisted case.
96pub fn source_schema_from_nb(_value: &KindedSlot) -> Result<SourceSchema, String> {
97    Err("source_schema_from_nb: pending Phase 2c kind-threaded TypedObject decode — see ADR-006 §2.7.4".to_string())
98}
99
100/// Decode a `SourceSchema` from a shape-wire object payload.
101pub fn source_schema_from_wire(value: &WireValue) -> Result<SourceSchema, String> {
102    let object = match value {
103        WireValue::Object(map) => map,
104        _ => return Err("schema payload must be an object".to_string()),
105    };
106
107    let uri = object
108        .get("uri")
109        .and_then(|v| match v {
110            WireValue::String(s) => Some(s.clone()),
111            _ => None,
112        })
113        .ok_or_else(|| "schema payload missing string field 'uri'".to_string())?;
114
115    let cached_at = object
116        .get("cached_at")
117        .and_then(|v| match v {
118            WireValue::String(s) => Some(s.clone()),
119            _ => None,
120        })
121        .unwrap_or_default();
122
123    let tables_value = object
124        .get("tables")
125        .ok_or_else(|| "schema payload missing object field 'tables'".to_string())?;
126    let tables_object = match tables_value {
127        WireValue::Object(map) => map,
128        _ => return Err("schema payload field 'tables' must be an object".to_string()),
129    };
130
131    let mut tables = HashMap::new();
132    for (table_name, entity_value) in tables_object {
133        let entity_obj = match entity_value {
134            WireValue::Object(map) => map,
135            _ => return Err(format!("table '{table_name}' schema must be an object")),
136        };
137
138        let entity_name = entity_obj
139            .get("name")
140            .and_then(|v| match v {
141                WireValue::String(s) => Some(s.clone()),
142                _ => None,
143            })
144            .unwrap_or_else(|| table_name.clone());
145
146        let columns_value = entity_obj
147            .get("columns")
148            .ok_or_else(|| format!("table '{table_name}' missing 'columns' array"))?;
149        let columns_array = match columns_value {
150            WireValue::Array(values) => values,
151            _ => {
152                return Err(format!(
153                    "table '{table_name}' field 'columns' must be an array"
154                ));
155            }
156        };
157
158        let mut columns = Vec::new();
159        for column_value in columns_array {
160            let column_obj = match column_value {
161                WireValue::Object(map) => map,
162                _ => {
163                    return Err(format!(
164                        "table '{table_name}' contains non-object column entry"
165                    ));
166                }
167            };
168
169            let name = column_obj
170                .get("name")
171                .and_then(|v| match v {
172                    WireValue::String(s) => Some(s.clone()),
173                    _ => None,
174                })
175                .ok_or_else(|| format!("table '{table_name}' column missing string 'name'"))?;
176
177            let shape_type = column_obj
178                .get("type")
179                .or_else(|| column_obj.get("shape_type"))
180                .and_then(|v| match v {
181                    WireValue::String(s) => Some(s.clone()),
182                    _ => None,
183                })
184                .ok_or_else(|| format!("table '{table_name}' column '{name}' missing type"))?;
185
186            let nullable = column_obj
187                .get("nullable")
188                .and_then(|v| match v {
189                    WireValue::Bool(b) => Some(*b),
190                    _ => None,
191                })
192                .unwrap_or(false);
193
194            columns.push(FieldSchema {
195                name,
196                shape_type,
197                nullable,
198            });
199        }
200
201        tables.insert(
202            table_name.clone(),
203            EntitySchema {
204                name: entity_name,
205                columns,
206            },
207        );
208    }
209
210    Ok(SourceSchema {
211        uri,
212        tables,
213        cached_at,
214    })
215}
216
217impl DataSourceSchemaCache {
218    /// Create an empty cache.
219    pub fn new() -> Self {
220        Self {
221            version: SCHEMA_CACHE_VERSION,
222            sources: HashMap::new(),
223        }
224    }
225
226    /// Save cache entries into `shape.lock` artifacts.
227    pub fn save(&self, path: &Path) -> std::io::Result<()> {
228        let mut lock = PackageLock::read(path).unwrap_or_default();
229
230        // Replace this namespace wholesale to keep the lock deterministic.
231        lock.artifacts
232            .retain(|artifact| artifact.namespace != SCHEMA_CACHE_NAMESPACE);
233
234        let mut uris: Vec<_> = self.sources.keys().cloned().collect();
235        uris.sort();
236
237        for uri in uris {
238            let Some(source) = self.sources.get(&uri) else {
239                continue;
240            };
241
242            let schema_hash = hash_source_schema(source);
243            let payload = source_to_payload(source);
244
245            let mut inputs = BTreeMap::new();
246            inputs.insert("uri".to_string(), uri.clone());
247            inputs.insert("schema_hash".to_string(), schema_hash.clone());
248
249            let determinism = ArtifactDeterminism::External {
250                fingerprints: BTreeMap::from([(format!("schema:{uri}"), schema_hash)]),
251            };
252
253            let artifact = LockedArtifact::new(
254                SCHEMA_CACHE_NAMESPACE,
255                uri,
256                SCHEMA_CACHE_PRODUCER,
257                determinism,
258                inputs,
259                payload,
260            )
261            .map_err(std::io::Error::other)?;
262
263            lock.upsert_artifact(artifact)
264                .map_err(std::io::Error::other)?;
265        }
266
267        lock.write(path)
268    }
269
270    /// Load cache entries from `shape.lock` artifacts.
271    pub fn load(path: &Path) -> std::io::Result<Self> {
272        let (cache, diagnostics) = Self::load_with_diagnostics(path)?;
273        if let Some(diag) = diagnostics.first() {
274            return Err(std::io::Error::new(
275                std::io::ErrorKind::InvalidData,
276                format!("invalid schema artifact '{}': {}", diag.key, diag.message),
277            ));
278        }
279        Ok(cache)
280    }
281
282    /// Load cache entries from `shape.lock` artifacts and collect diagnostics for
283    /// stale/invalid artifacts while keeping valid entries.
284    pub fn load_with_diagnostics(
285        path: &Path,
286    ) -> std::io::Result<(Self, Vec<SchemaCacheDiagnostic>)> {
287        let lock = PackageLock::read(path).ok_or_else(|| {
288            std::io::Error::new(std::io::ErrorKind::NotFound, "shape.lock not found")
289        })?;
290
291        let mut sources = HashMap::new();
292        let mut diagnostics = Vec::new();
293        for artifact in lock
294            .artifacts
295            .iter()
296            .filter(|artifact| artifact.namespace == SCHEMA_CACHE_NAMESPACE)
297        {
298            let payload = match artifact.payload() {
299                Ok(payload) => payload,
300                Err(err) => {
301                    diagnostics.push(SchemaCacheDiagnostic {
302                        key: artifact.key.clone(),
303                        message: format!("payload decode failed: {err}"),
304                    });
305                    continue;
306                }
307            };
308            let source = match payload_to_source(&artifact.key, &payload) {
309                Ok(source) => source,
310                Err(err) => {
311                    diagnostics.push(SchemaCacheDiagnostic {
312                        key: artifact.key.clone(),
313                        message: format!("payload parse failed: {err}"),
314                    });
315                    continue;
316                }
317            };
318
319            if let Some(expected_hash) = source_fingerprint_hash(artifact, &source.uri) {
320                let actual_hash = hash_source_schema(&source);
321                if expected_hash != actual_hash {
322                    diagnostics.push(SchemaCacheDiagnostic {
323                        key: artifact.key.clone(),
324                        message: format!(
325                            "stale schema fingerprint (expected {expected_hash}, computed {actual_hash})"
326                        ),
327                    });
328                    continue;
329                }
330            } else {
331                diagnostics.push(SchemaCacheDiagnostic {
332                    key: artifact.key.clone(),
333                    message: "missing schema fingerprint".to_string(),
334                });
335                continue;
336            }
337            sources.insert(source.uri.clone(), source);
338        }
339
340        Ok((
341            Self {
342                version: SCHEMA_CACHE_VERSION,
343                sources,
344            },
345            diagnostics,
346        ))
347    }
348
349    /// Try to load cache entries, returning an empty cache if lockfile is missing.
350    pub fn load_or_empty(path: &Path) -> Self {
351        match Self::load(path) {
352            Ok(cache) => cache,
353            Err(_) => Self::new(),
354        }
355    }
356
357    /// Get schema for a specific source URI.
358    pub fn get_source(&self, uri: &str) -> Option<&SourceSchema> {
359        self.sources.get(uri)
360    }
361
362    /// Insert or update a source schema.
363    pub fn upsert_source(&mut self, schema: SourceSchema) {
364        self.sources.insert(schema.uri.clone(), schema);
365    }
366
367    /// Check if offline mode is enabled via `SHAPE_OFFLINE=true`.
368    pub fn is_offline() -> bool {
369        std::env::var("SHAPE_OFFLINE")
370            .map(|value| value == "true" || value == "1")
371            .unwrap_or(false)
372    }
373}
374
375/// Load cached schemas and convert matching sources into runtime `TypeSchema`s.
376///
377/// `uri_prefixes` filters which sources are included (for example:
378/// `["duckdb://"]` or `["postgres://", "postgresql://"]`).
379pub fn load_cached_type_schemas_for_uri_prefixes(
380    cache_path: &Path,
381    uri_prefixes: &[&str],
382) -> std::io::Result<Vec<TypeSchema>> {
383    let (schemas, _diagnostics) =
384        load_cached_type_schemas_for_uri_prefixes_with_diagnostics(cache_path, uri_prefixes)?;
385    Ok(schemas)
386}
387
388/// Like [`load_cached_type_schemas_for_uri_prefixes`] but also returns
389/// non-fatal diagnostics for invalid/stale artifacts that were ignored.
390pub fn load_cached_type_schemas_for_uri_prefixes_with_diagnostics(
391    cache_path: &Path,
392    uri_prefixes: &[&str],
393) -> std::io::Result<(Vec<TypeSchema>, Vec<SchemaCacheDiagnostic>)> {
394    let (cache, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(cache_path)?;
395    let mut schemas = Vec::new();
396
397    for source in cache.sources.values() {
398        if !uri_prefixes.is_empty()
399            && !uri_prefixes
400                .iter()
401                .any(|prefix| source.uri.starts_with(prefix))
402        {
403            continue;
404        }
405
406        for table in source.tables.values() {
407            schemas.push(type_schema_from_entity(table));
408        }
409    }
410
411    Ok((schemas, diagnostics))
412}
413
414/// Resolve the default `shape.lock` path from the current working directory.
415pub fn default_cache_path() -> PathBuf {
416    if let Ok(guard) = DEFAULT_CACHE_PATH_OVERRIDE.read() {
417        if let Some(path) = guard.as_ref() {
418            return path.clone();
419        }
420    }
421
422    std::env::current_dir()
423        .unwrap_or_default()
424        .join(CACHE_FILENAME)
425}
426
427/// Override the default lock/cache path used by extension-side schema cache
428/// helpers during this process.
429pub fn set_default_cache_path(path: Option<PathBuf>) {
430    if let Ok(mut guard) = DEFAULT_CACHE_PATH_OVERRIDE.write() {
431        *guard = path;
432    }
433}
434
435/// Load one cached source schema by URI.
436pub fn load_cached_source_for_uri(cache_path: &Path, uri: &str) -> std::io::Result<SourceSchema> {
437    let (source, diagnostics) = load_cached_source_for_uri_with_diagnostics(cache_path, uri)?;
438    if let Some(diag) = diagnostics.first() {
439        return Err(std::io::Error::new(
440            std::io::ErrorKind::InvalidData,
441            format!("invalid schema artifact '{}': {}", diag.key, diag.message),
442        ));
443    }
444    Ok(source)
445}
446
447/// Load one cached source schema by URI and keep non-fatal diagnostics for
448/// stale/invalid artifacts that were ignored.
449pub fn load_cached_source_for_uri_with_diagnostics(
450    cache_path: &Path,
451    uri: &str,
452) -> std::io::Result<(SourceSchema, Vec<SchemaCacheDiagnostic>)> {
453    let (cache, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(cache_path)?;
454    let source = cache.get_source(uri).cloned().ok_or_else(|| {
455        std::io::Error::new(
456            std::io::ErrorKind::NotFound,
457            format!("no cached schema for '{uri}'"),
458        )
459    })?;
460    Ok((source, diagnostics))
461}
462
463impl Default for DataSourceSchemaCache {
464    fn default() -> Self {
465        Self::new()
466    }
467}
468
469impl SourceSchema {
470    /// Get schema for a specific entity.
471    pub fn get_entity(&self, name: &str) -> Option<&EntitySchema> {
472        self.tables.get(name)
473    }
474
475    /// Get all entity names.
476    pub fn entity_names(&self) -> Vec<&str> {
477        self.tables.keys().map(|name| name.as_str()).collect()
478    }
479}
480
481impl EntitySchema {
482    /// Get a field by name.
483    pub fn get_field(&self, name: &str) -> Option<&FieldSchema> {
484        self.columns.iter().find(|column| column.name == name)
485    }
486
487    /// Get all field names.
488    pub fn field_names(&self) -> Vec<&str> {
489        self.columns
490            .iter()
491            .map(|column| column.name.as_str())
492            .collect()
493    }
494}
495
496fn hash_source_schema(source: &SourceSchema) -> String {
497    let mut hasher = Sha256::new();
498    hasher.update(source.uri.as_bytes());
499    hasher.update([0]);
500
501    let mut entity_names: Vec<_> = source.tables.keys().cloned().collect();
502    entity_names.sort();
503    for entity_name in entity_names {
504        hasher.update(entity_name.as_bytes());
505        hasher.update([0]);
506
507        if let Some(entity) = source.tables.get(&entity_name) {
508            for field in &entity.columns {
509                hasher.update(field.name.as_bytes());
510                hasher.update([0]);
511                hasher.update(field.shape_type.as_bytes());
512                hasher.update([0]);
513                hasher.update([if field.nullable { 1 } else { 0 }]);
514            }
515        }
516    }
517
518    format!("sha256:{:x}", hasher.finalize())
519}
520
521fn source_fingerprint_hash<'a>(artifact: &'a LockedArtifact, uri: &str) -> Option<&'a str> {
522    if let ArtifactDeterminism::External { fingerprints } = &artifact.determinism {
523        let key = format!("schema:{uri}");
524        if let Some(value) = fingerprints.get(&key) {
525            return Some(value.as_str());
526        }
527    }
528    artifact.inputs.get("schema_hash").map(String::as_str)
529}
530
531fn type_schema_from_entity(entity: &EntitySchema) -> TypeSchema {
532    let schema_name = format!("DbRow_{}", entity.name);
533    let builder =
534        entity
535            .columns
536            .iter()
537            .fold(
538                TypeSchemaBuilder::new(&schema_name),
539                |builder, field| match field.shape_type.as_str() {
540                    "int" => builder.i64_field(&field.name),
541                    "number" => builder.f64_field(&field.name),
542                    "string" => builder.string_field(&field.name),
543                    "bool" => builder.bool_field(&field.name),
544                    "timestamp" => builder.timestamp_field(&field.name),
545                    _ => builder.any_field(&field.name),
546                },
547            );
548    builder.build()
549}
550
551fn source_to_payload(source: &SourceSchema) -> shape_wire::WireValue {
552    let mut entities: Vec<_> = source.tables.values().collect();
553    entities.sort_by(|left, right| left.name.cmp(&right.name));
554
555    let entity_values = entities
556        .into_iter()
557        .map(|entity| {
558            let field_values = entity
559                .columns
560                .iter()
561                .map(|field| {
562                    shape_wire::WireValue::Object(BTreeMap::from([
563                        (
564                            "name".to_string(),
565                            shape_wire::WireValue::String(field.name.clone()),
566                        ),
567                        (
568                            "shape_type".to_string(),
569                            shape_wire::WireValue::String(field.shape_type.clone()),
570                        ),
571                        (
572                            "nullable".to_string(),
573                            shape_wire::WireValue::Bool(field.nullable),
574                        ),
575                    ]))
576                })
577                .collect::<Vec<_>>();
578
579            shape_wire::WireValue::Object(BTreeMap::from([
580                (
581                    "name".to_string(),
582                    shape_wire::WireValue::String(entity.name.clone()),
583                ),
584                (
585                    "columns".to_string(),
586                    shape_wire::WireValue::Array(field_values),
587                ),
588            ]))
589        })
590        .collect::<Vec<_>>();
591
592    shape_wire::WireValue::Object(BTreeMap::from([
593        (
594            "uri".to_string(),
595            shape_wire::WireValue::String(source.uri.clone()),
596        ),
597        (
598            "cached_at".to_string(),
599            shape_wire::WireValue::String(source.cached_at.clone()),
600        ),
601        (
602            "tables".to_string(),
603            shape_wire::WireValue::Array(entity_values),
604        ),
605    ]))
606}
607
608fn payload_to_source(
609    key_hint: &str,
610    payload: &shape_wire::WireValue,
611) -> Result<SourceSchema, String> {
612    let shape_wire::WireValue::Object(map) = payload else {
613        return Err("source payload must be an object".to_string());
614    };
615
616    let uri = map
617        .get("uri")
618        .and_then(shape_wire::WireValue::as_str)
619        .map(|value| value.to_string())
620        .filter(|value| !value.is_empty())
621        .unwrap_or_else(|| key_hint.to_string());
622    let cached_at = map
623        .get("cached_at")
624        .and_then(shape_wire::WireValue::as_str)
625        .unwrap_or("")
626        .to_string();
627
628    let tables_value = map
629        .get("tables")
630        .ok_or_else(|| "source payload missing 'tables'".to_string())?;
631    let shape_wire::WireValue::Array(table_values) = tables_value else {
632        return Err("source payload 'tables' must be an array".to_string());
633    };
634
635    let mut tables = HashMap::new();
636    for table_value in table_values {
637        let entity = payload_to_entity(table_value)?;
638        tables.insert(entity.name.clone(), entity);
639    }
640
641    Ok(SourceSchema {
642        uri,
643        tables,
644        cached_at,
645    })
646}
647
648fn payload_to_entity(value: &shape_wire::WireValue) -> Result<EntitySchema, String> {
649    let shape_wire::WireValue::Object(table_map) = value else {
650        return Err("table payload must be an object".to_string());
651    };
652
653    let name = table_map
654        .get("name")
655        .and_then(shape_wire::WireValue::as_str)
656        .ok_or_else(|| "table payload missing 'name'".to_string())?
657        .to_string();
658    let columns_value = table_map
659        .get("columns")
660        .ok_or_else(|| "table payload missing 'columns'".to_string())?;
661    let shape_wire::WireValue::Array(column_values) = columns_value else {
662        return Err("table payload 'columns' must be an array".to_string());
663    };
664
665    let mut columns = Vec::with_capacity(column_values.len());
666    for column_value in column_values {
667        columns.push(payload_to_field(column_value)?);
668    }
669
670    Ok(EntitySchema { name, columns })
671}
672
673fn payload_to_field(value: &shape_wire::WireValue) -> Result<FieldSchema, String> {
674    let shape_wire::WireValue::Object(column_map) = value else {
675        return Err("column payload must be an object".to_string());
676    };
677
678    let name = column_map
679        .get("name")
680        .and_then(shape_wire::WireValue::as_str)
681        .ok_or_else(|| "column payload missing 'name'".to_string())?
682        .to_string();
683    let shape_type = column_map
684        .get("shape_type")
685        .and_then(shape_wire::WireValue::as_str)
686        .ok_or_else(|| "column payload missing 'shape_type'".to_string())?
687        .to_string();
688    let nullable = column_map
689        .get("nullable")
690        .and_then(shape_wire::WireValue::as_bool)
691        .ok_or_else(|| "column payload missing 'nullable'".to_string())?;
692
693    Ok(FieldSchema {
694        name,
695        shape_type,
696        nullable,
697    })
698}
699
#[cfg(test)]
mod tests {
    use super::*;
    use crate::package_lock::ArtifactDeterminism;

    /// One DuckDB source with a `users` table of three columns.
    fn sample_cache() -> DataSourceSchemaCache {
        let mut cache = DataSourceSchemaCache::new();
        let mut tables = HashMap::new();
        tables.insert(
            "users".to_string(),
            EntitySchema {
                name: "users".to_string(),
                columns: vec![
                    FieldSchema {
                        name: "id".to_string(),
                        shape_type: "int".to_string(),
                        nullable: false,
                    },
                    FieldSchema {
                        name: "name".to_string(),
                        shape_type: "string".to_string(),
                        nullable: false,
                    },
                    FieldSchema {
                        name: "age".to_string(),
                        shape_type: "int".to_string(),
                        nullable: true,
                    },
                ],
            },
        );
        cache.upsert_source(SourceSchema {
            uri: "duckdb://analytics.db".to_string(),
            tables,
            cached_at: "2026-02-12T10:00:00Z".to_string(),
        });
        cache
    }

    /// A schema-namespace artifact whose payload is a bare string, not an object.
    /// It carries a fingerprint but fails `payload_to_source`.
    fn broken_artifact() -> LockedArtifact {
        LockedArtifact::new(
            SCHEMA_CACHE_NAMESPACE,
            "broken://source",
            SCHEMA_CACHE_PRODUCER,
            ArtifactDeterminism::External {
                fingerprints: BTreeMap::from([(
                    "schema:broken://source".to_string(),
                    "sha256:abc".to_string(),
                )]),
            },
            BTreeMap::from([
                ("uri".to_string(), "broken://source".to_string()),
                ("schema_hash".to_string(), "sha256:abc".to_string()),
            ]),
            shape_wire::WireValue::String("bad".to_string()),
        )
        .unwrap()
    }

    /// Writes `sample_cache()` and then `broken_artifact()` into a fresh lockfile.
    /// The returned `TempDir` must stay alive while `path` is in use.
    fn lock_with_valid_and_broken_artifacts() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(CACHE_FILENAME);
        sample_cache().save(&path).unwrap();

        let mut lock = PackageLock::read(&path).unwrap();
        lock.upsert_artifact(broken_artifact()).unwrap();
        lock.write(&path).unwrap();

        (dir, path)
    }

    #[test]
    fn test_roundtrip_serialization() {
        let cache = sample_cache();

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(CACHE_FILENAME);
        cache.save(&path).unwrap();

        let loaded = DataSourceSchemaCache::load(&path).unwrap();
        assert_eq!(loaded.version, SCHEMA_CACHE_VERSION);
        assert_eq!(loaded.sources.len(), 1);

        let conn = loaded.get_source("duckdb://analytics.db").unwrap();
        assert_eq!(conn.tables.len(), 1);

        let users = conn.get_entity("users").unwrap();
        assert_eq!(users.columns.len(), 3);
        assert_eq!(users.columns[0].name, "id");
        assert_eq!(users.columns[0].shape_type, "int");
        assert!(!users.columns[0].nullable);
        assert_eq!(users.columns[2].name, "age");
        assert!(users.columns[2].nullable);
    }

    #[test]
    fn test_load_or_empty_missing_file() {
        // Use a path inside a fresh temp dir so the test cannot collide with a
        // real file on the host.
        let dir = tempfile::tempdir().unwrap();
        let cache = DataSourceSchemaCache::load_or_empty(&dir.path().join(CACHE_FILENAME));
        assert_eq!(cache.version, SCHEMA_CACHE_VERSION);
        assert!(cache.sources.is_empty());
    }

    #[test]
    fn test_source_helpers() {
        let cache = sample_cache();
        let conn = cache.get_source("duckdb://analytics.db").unwrap();

        let names = conn.entity_names();
        assert!(names.contains(&"users"));

        let users = conn.get_entity("users").unwrap();
        assert_eq!(users.field_names(), vec!["id", "name", "age"]);
        assert!(users.get_field("id").is_some());
        assert!(users.get_field("nonexistent").is_none());
    }

    #[test]
    fn test_upsert_source() {
        let mut cache = DataSourceSchemaCache::new();
        assert!(cache.sources.is_empty());

        cache.upsert_source(SourceSchema {
            uri: "duckdb://test.db".to_string(),
            tables: HashMap::new(),
            cached_at: "2026-01-01T00:00:00Z".to_string(),
        });
        assert_eq!(cache.sources.len(), 1);

        cache.upsert_source(SourceSchema {
            uri: "duckdb://test.db".to_string(),
            tables: HashMap::new(),
            cached_at: "2026-02-01T00:00:00Z".to_string(),
        });
        assert_eq!(cache.sources.len(), 1);
        assert_eq!(
            cache.get_source("duckdb://test.db").unwrap().cached_at,
            "2026-02-01T00:00:00Z"
        );
    }

    #[test]
    fn test_load_with_diagnostics_reports_stale_fingerprint() {
        let cache = sample_cache();
        let source = cache.get_source("duckdb://analytics.db").unwrap();

        let payload = source_to_payload(source);
        let artifact = LockedArtifact::new(
            SCHEMA_CACHE_NAMESPACE,
            source.uri.clone(),
            SCHEMA_CACHE_PRODUCER,
            ArtifactDeterminism::External {
                fingerprints: BTreeMap::from([(
                    format!("schema:{}", source.uri),
                    "sha256:deadbeef".to_string(),
                )]),
            },
            BTreeMap::from([
                ("uri".to_string(), source.uri.clone()),
                ("schema_hash".to_string(), "sha256:deadbeef".to_string()),
            ]),
            payload,
        )
        .unwrap();

        let mut lock = PackageLock::new();
        lock.upsert_artifact(artifact).unwrap();

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(CACHE_FILENAME);
        lock.write(&path).unwrap();

        let (loaded, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(&path).unwrap();
        assert!(loaded.sources.is_empty());
        assert_eq!(diagnostics.len(), 1);
        assert!(diagnostics[0].message.contains("stale schema fingerprint"));
    }

    #[test]
    fn test_load_with_diagnostics_reports_invalid_payload() {
        let mut lock = PackageLock::new();
        lock.upsert_artifact(broken_artifact()).unwrap();

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(CACHE_FILENAME);
        lock.write(&path).unwrap();

        let (loaded, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(&path).unwrap();
        assert!(loaded.sources.is_empty());
        assert_eq!(diagnostics.len(), 1);
        assert!(diagnostics[0].message.contains("payload parse failed"));
    }

    #[test]
    fn test_load_with_diagnostics_keeps_valid_entries_alongside_invalid() {
        let (_dir, path) = lock_with_valid_and_broken_artifacts();

        let (loaded, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(&path).unwrap();
        assert_eq!(loaded.sources.len(), 1);
        assert!(loaded.get_source("duckdb://analytics.db").is_some());
        assert_eq!(diagnostics.len(), 1);
        assert!(diagnostics[0].message.contains("payload parse failed"));
    }

    #[test]
    fn test_load_rejects_lock_with_invalid_artifact() {
        let (_dir, path) = lock_with_valid_and_broken_artifacts();

        let err = DataSourceSchemaCache::load(&path).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
    }

    #[test]
    fn test_load_cached_type_schemas_for_uri_prefixes_filters_sources() {
        let mut cache = DataSourceSchemaCache::new();

        let mut duck_tables = HashMap::new();
        duck_tables.insert(
            "users".to_string(),
            EntitySchema {
                name: "users".to_string(),
                columns: vec![FieldSchema {
                    name: "id".to_string(),
                    shape_type: "int".to_string(),
                    nullable: false,
                }],
            },
        );
        cache.upsert_source(SourceSchema {
            uri: "duckdb://analytics.db".to_string(),
            tables: duck_tables,
            cached_at: "2026-02-17T00:00:00Z".to_string(),
        });

        let mut pg_tables = HashMap::new();
        pg_tables.insert(
            "orders".to_string(),
            EntitySchema {
                name: "orders".to_string(),
                columns: vec![FieldSchema {
                    name: "id".to_string(),
                    shape_type: "int".to_string(),
                    nullable: false,
                }],
            },
        );
        cache.upsert_source(SourceSchema {
            uri: "postgres://localhost/app".to_string(),
            tables: pg_tables,
            cached_at: "2026-02-17T00:00:00Z".to_string(),
        });

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(CACHE_FILENAME);
        cache.save(&path).unwrap();

        let duck_schemas =
            load_cached_type_schemas_for_uri_prefixes(&path, &["duckdb://"]).unwrap();
        assert_eq!(duck_schemas.len(), 1);
        assert_eq!(duck_schemas[0].name, "DbRow_users");

        let pg_schemas =
            load_cached_type_schemas_for_uri_prefixes(&path, &["postgres://", "postgresql://"])
                .unwrap();
        assert_eq!(pg_schemas.len(), 1);
        assert_eq!(pg_schemas[0].name, "DbRow_orders");
    }

    #[test]
    fn test_load_cached_source_for_uri_with_diagnostics() {
        let cache = sample_cache();
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(CACHE_FILENAME);
        cache.save(&path).unwrap();

        let (source, diagnostics) =
            load_cached_source_for_uri_with_diagnostics(&path, "duckdb://analytics.db").unwrap();
        assert_eq!(source.uri, "duckdb://analytics.db");
        assert!(diagnostics.is_empty());
    }

    #[test]
    fn test_load_cached_source_for_unknown_uri_is_not_found() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(CACHE_FILENAME);
        sample_cache().save(&path).unwrap();

        let err = load_cached_source_for_uri(&path, "duckdb://missing.db").unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::NotFound);
    }

    #[test]
    fn test_default_cache_path_ends_with_shape_lock() {
        let path = default_cache_path();
        assert!(path.ends_with(CACHE_FILENAME));
    }
}