1use crate::package_lock::{ArtifactDeterminism, LockedArtifact, PackageLock};
7use crate::type_schema::{TypeSchema, TypeSchemaBuilder};
8use serde::{Deserialize, Serialize};
9use sha2::{Digest, Sha256};
10use shape_value::KindedSlot;
11use shape_wire::WireValue;
12use std::collections::{BTreeMap, HashMap};
13use std::path::{Path, PathBuf};
14use std::sync::RwLock;
15
/// File name of the lock file that holds cached data-source schemas.
///
/// The schema artifacts share this file with other `PackageLock` content;
/// `DataSourceSchemaCache::save` only replaces artifacts in its own namespace.
pub const CACHE_FILENAME: &str = "shape.lock";

/// Version stamped into every `DataSourceSchemaCache` built by this module.
const SCHEMA_CACHE_VERSION: u32 = 1;
/// Lock-file artifact namespace reserved for data-source schema artifacts.
const SCHEMA_CACHE_NAMESPACE: &str = "external.datasource.schema";
/// Producer id recorded on each schema artifact this module writes.
const SCHEMA_CACHE_PRODUCER: &str = "shape-runtime/schema_cache@v1";
22
/// Process-wide override consulted by `default_cache_path`.
///
/// `None` means "derive the path from the current working directory".
/// `RwLock::new` is a `const fn`, so the lock can be initialised directly in the
/// static without a `LazyLock` wrapper.
static DEFAULT_CACHE_PATH_OVERRIDE: RwLock<Option<PathBuf>> = RwLock::new(None);
25
/// Explains why one schema artifact was skipped while loading the cache.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaCacheDiagnostic {
    /// Key of the offending lock artifact (`DataSourceSchemaCache::save` uses the source URI).
    pub key: String,
    /// Human-readable reason the artifact was rejected.
    pub message: String,
}
32
/// In-memory collection of cached data-source schemas.
///
/// Persisted as artifacts inside the lock file via `DataSourceSchemaCache::save`
/// and reloaded with `DataSourceSchemaCache::load` / `load_with_diagnostics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataSourceSchemaCache {
    /// Cache format version (`SCHEMA_CACHE_VERSION` for caches built in this module).
    pub version: u32,
    /// Cached schemas keyed by source URI (`upsert_source` keys by `SourceSchema::uri`).
    pub sources: HashMap<String, SourceSchema>,
}
41
/// Schema snapshot of a single data source.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceSchema {
    /// URI identifying the source (e.g. `duckdb://...`, `postgres://...` in tests).
    pub uri: String,
    /// Entity (table) schemas keyed by table name.
    pub tables: HashMap<String, EntitySchema>,
    /// Capture timestamp stored verbatim; may be empty when the producer omitted it.
    pub cached_at: String,
}
52
/// Schema of one entity (table) within a source.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EntitySchema {
    /// Entity name; the persisted payload identifies tables by this name.
    pub name: String,
    /// Columns in declaration order. Order is significant: it feeds the schema fingerprint.
    pub columns: Vec<FieldSchema>,
}
61
/// Schema of a single column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldSchema {
    /// Column name.
    pub name: String,
    /// Shape type name. `type_schema_from_entity` maps `int`, `number`, `string`,
    /// `bool` and `timestamp` to typed fields and anything else to an `any` field.
    /// Serialized by serde under the key `type`.
    #[serde(rename = "type")]
    pub shape_type: String,
    /// Whether the column may hold nulls.
    pub nullable: bool,
}
73
/// Convert a [`SourceSchema`] into a `KindedSlot`.
///
/// NOTE: placeholder — the schema argument is ignored and `KindedSlot::none()`
/// is always returned. Presumably pending the same work referenced by
/// `source_schema_from_nb` (ADR-006 §2.7.4) — confirm before relying on it.
pub fn source_schema_to_nb(_schema: &SourceSchema) -> KindedSlot {
    KindedSlot::none()
}
91
/// Decode a [`SourceSchema`] from a `KindedSlot`.
///
/// # Errors
///
/// Not implemented yet: always returns `Err` pointing at ADR-006 §2.7.4.
pub fn source_schema_from_nb(_value: &KindedSlot) -> Result<SourceSchema, String> {
    Err("source_schema_from_nb: pending Phase 2c kind-threaded TypedObject decode — see ADR-006 §2.7.4".to_string())
}
99
100pub fn source_schema_from_wire(value: &WireValue) -> Result<SourceSchema, String> {
102 let object = match value {
103 WireValue::Object(map) => map,
104 _ => return Err("schema payload must be an object".to_string()),
105 };
106
107 let uri = object
108 .get("uri")
109 .and_then(|v| match v {
110 WireValue::String(s) => Some(s.clone()),
111 _ => None,
112 })
113 .ok_or_else(|| "schema payload missing string field 'uri'".to_string())?;
114
115 let cached_at = object
116 .get("cached_at")
117 .and_then(|v| match v {
118 WireValue::String(s) => Some(s.clone()),
119 _ => None,
120 })
121 .unwrap_or_default();
122
123 let tables_value = object
124 .get("tables")
125 .ok_or_else(|| "schema payload missing object field 'tables'".to_string())?;
126 let tables_object = match tables_value {
127 WireValue::Object(map) => map,
128 _ => return Err("schema payload field 'tables' must be an object".to_string()),
129 };
130
131 let mut tables = HashMap::new();
132 for (table_name, entity_value) in tables_object {
133 let entity_obj = match entity_value {
134 WireValue::Object(map) => map,
135 _ => return Err(format!("table '{table_name}' schema must be an object")),
136 };
137
138 let entity_name = entity_obj
139 .get("name")
140 .and_then(|v| match v {
141 WireValue::String(s) => Some(s.clone()),
142 _ => None,
143 })
144 .unwrap_or_else(|| table_name.clone());
145
146 let columns_value = entity_obj
147 .get("columns")
148 .ok_or_else(|| format!("table '{table_name}' missing 'columns' array"))?;
149 let columns_array = match columns_value {
150 WireValue::Array(values) => values,
151 _ => {
152 return Err(format!(
153 "table '{table_name}' field 'columns' must be an array"
154 ));
155 }
156 };
157
158 let mut columns = Vec::new();
159 for column_value in columns_array {
160 let column_obj = match column_value {
161 WireValue::Object(map) => map,
162 _ => {
163 return Err(format!(
164 "table '{table_name}' contains non-object column entry"
165 ));
166 }
167 };
168
169 let name = column_obj
170 .get("name")
171 .and_then(|v| match v {
172 WireValue::String(s) => Some(s.clone()),
173 _ => None,
174 })
175 .ok_or_else(|| format!("table '{table_name}' column missing string 'name'"))?;
176
177 let shape_type = column_obj
178 .get("type")
179 .or_else(|| column_obj.get("shape_type"))
180 .and_then(|v| match v {
181 WireValue::String(s) => Some(s.clone()),
182 _ => None,
183 })
184 .ok_or_else(|| format!("table '{table_name}' column '{name}' missing type"))?;
185
186 let nullable = column_obj
187 .get("nullable")
188 .and_then(|v| match v {
189 WireValue::Bool(b) => Some(*b),
190 _ => None,
191 })
192 .unwrap_or(false);
193
194 columns.push(FieldSchema {
195 name,
196 shape_type,
197 nullable,
198 });
199 }
200
201 tables.insert(
202 table_name.clone(),
203 EntitySchema {
204 name: entity_name,
205 columns,
206 },
207 );
208 }
209
210 Ok(SourceSchema {
211 uri,
212 tables,
213 cached_at,
214 })
215}
216
217impl DataSourceSchemaCache {
218 pub fn new() -> Self {
220 Self {
221 version: SCHEMA_CACHE_VERSION,
222 sources: HashMap::new(),
223 }
224 }
225
226 pub fn save(&self, path: &Path) -> std::io::Result<()> {
228 let mut lock = PackageLock::read(path).unwrap_or_default();
229
230 lock.artifacts
232 .retain(|artifact| artifact.namespace != SCHEMA_CACHE_NAMESPACE);
233
234 let mut uris: Vec<_> = self.sources.keys().cloned().collect();
235 uris.sort();
236
237 for uri in uris {
238 let Some(source) = self.sources.get(&uri) else {
239 continue;
240 };
241
242 let schema_hash = hash_source_schema(source);
243 let payload = source_to_payload(source);
244
245 let mut inputs = BTreeMap::new();
246 inputs.insert("uri".to_string(), uri.clone());
247 inputs.insert("schema_hash".to_string(), schema_hash.clone());
248
249 let determinism = ArtifactDeterminism::External {
250 fingerprints: BTreeMap::from([(format!("schema:{uri}"), schema_hash)]),
251 };
252
253 let artifact = LockedArtifact::new(
254 SCHEMA_CACHE_NAMESPACE,
255 uri,
256 SCHEMA_CACHE_PRODUCER,
257 determinism,
258 inputs,
259 payload,
260 )
261 .map_err(std::io::Error::other)?;
262
263 lock.upsert_artifact(artifact)
264 .map_err(std::io::Error::other)?;
265 }
266
267 lock.write(path)
268 }
269
270 pub fn load(path: &Path) -> std::io::Result<Self> {
272 let (cache, diagnostics) = Self::load_with_diagnostics(path)?;
273 if let Some(diag) = diagnostics.first() {
274 return Err(std::io::Error::new(
275 std::io::ErrorKind::InvalidData,
276 format!("invalid schema artifact '{}': {}", diag.key, diag.message),
277 ));
278 }
279 Ok(cache)
280 }
281
282 pub fn load_with_diagnostics(
285 path: &Path,
286 ) -> std::io::Result<(Self, Vec<SchemaCacheDiagnostic>)> {
287 let lock = PackageLock::read(path).ok_or_else(|| {
288 std::io::Error::new(std::io::ErrorKind::NotFound, "shape.lock not found")
289 })?;
290
291 let mut sources = HashMap::new();
292 let mut diagnostics = Vec::new();
293 for artifact in lock
294 .artifacts
295 .iter()
296 .filter(|artifact| artifact.namespace == SCHEMA_CACHE_NAMESPACE)
297 {
298 let payload = match artifact.payload() {
299 Ok(payload) => payload,
300 Err(err) => {
301 diagnostics.push(SchemaCacheDiagnostic {
302 key: artifact.key.clone(),
303 message: format!("payload decode failed: {err}"),
304 });
305 continue;
306 }
307 };
308 let source = match payload_to_source(&artifact.key, &payload) {
309 Ok(source) => source,
310 Err(err) => {
311 diagnostics.push(SchemaCacheDiagnostic {
312 key: artifact.key.clone(),
313 message: format!("payload parse failed: {err}"),
314 });
315 continue;
316 }
317 };
318
319 if let Some(expected_hash) = source_fingerprint_hash(artifact, &source.uri) {
320 let actual_hash = hash_source_schema(&source);
321 if expected_hash != actual_hash {
322 diagnostics.push(SchemaCacheDiagnostic {
323 key: artifact.key.clone(),
324 message: format!(
325 "stale schema fingerprint (expected {expected_hash}, computed {actual_hash})"
326 ),
327 });
328 continue;
329 }
330 } else {
331 diagnostics.push(SchemaCacheDiagnostic {
332 key: artifact.key.clone(),
333 message: "missing schema fingerprint".to_string(),
334 });
335 continue;
336 }
337 sources.insert(source.uri.clone(), source);
338 }
339
340 Ok((
341 Self {
342 version: SCHEMA_CACHE_VERSION,
343 sources,
344 },
345 diagnostics,
346 ))
347 }
348
349 pub fn load_or_empty(path: &Path) -> Self {
351 match Self::load(path) {
352 Ok(cache) => cache,
353 Err(_) => Self::new(),
354 }
355 }
356
357 pub fn get_source(&self, uri: &str) -> Option<&SourceSchema> {
359 self.sources.get(uri)
360 }
361
362 pub fn upsert_source(&mut self, schema: SourceSchema) {
364 self.sources.insert(schema.uri.clone(), schema);
365 }
366
367 pub fn is_offline() -> bool {
369 std::env::var("SHAPE_OFFLINE")
370 .map(|value| value == "true" || value == "1")
371 .unwrap_or(false)
372 }
373}
374
375pub fn load_cached_type_schemas_for_uri_prefixes(
380 cache_path: &Path,
381 uri_prefixes: &[&str],
382) -> std::io::Result<Vec<TypeSchema>> {
383 let (schemas, _diagnostics) =
384 load_cached_type_schemas_for_uri_prefixes_with_diagnostics(cache_path, uri_prefixes)?;
385 Ok(schemas)
386}
387
388pub fn load_cached_type_schemas_for_uri_prefixes_with_diagnostics(
391 cache_path: &Path,
392 uri_prefixes: &[&str],
393) -> std::io::Result<(Vec<TypeSchema>, Vec<SchemaCacheDiagnostic>)> {
394 let (cache, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(cache_path)?;
395 let mut schemas = Vec::new();
396
397 for source in cache.sources.values() {
398 if !uri_prefixes.is_empty()
399 && !uri_prefixes
400 .iter()
401 .any(|prefix| source.uri.starts_with(prefix))
402 {
403 continue;
404 }
405
406 for table in source.tables.values() {
407 schemas.push(type_schema_from_entity(table));
408 }
409 }
410
411 Ok((schemas, diagnostics))
412}
413
414pub fn default_cache_path() -> PathBuf {
416 if let Ok(guard) = DEFAULT_CACHE_PATH_OVERRIDE.read() {
417 if let Some(path) = guard.as_ref() {
418 return path.clone();
419 }
420 }
421
422 std::env::current_dir()
423 .unwrap_or_default()
424 .join(CACHE_FILENAME)
425}
426
427pub fn set_default_cache_path(path: Option<PathBuf>) {
430 if let Ok(mut guard) = DEFAULT_CACHE_PATH_OVERRIDE.write() {
431 *guard = path;
432 }
433}
434
435pub fn load_cached_source_for_uri(cache_path: &Path, uri: &str) -> std::io::Result<SourceSchema> {
437 let (source, diagnostics) = load_cached_source_for_uri_with_diagnostics(cache_path, uri)?;
438 if let Some(diag) = diagnostics.first() {
439 return Err(std::io::Error::new(
440 std::io::ErrorKind::InvalidData,
441 format!("invalid schema artifact '{}': {}", diag.key, diag.message),
442 ));
443 }
444 Ok(source)
445}
446
447pub fn load_cached_source_for_uri_with_diagnostics(
450 cache_path: &Path,
451 uri: &str,
452) -> std::io::Result<(SourceSchema, Vec<SchemaCacheDiagnostic>)> {
453 let (cache, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(cache_path)?;
454 let source = cache.get_source(uri).cloned().ok_or_else(|| {
455 std::io::Error::new(
456 std::io::ErrorKind::NotFound,
457 format!("no cached schema for '{uri}'"),
458 )
459 })?;
460 Ok((source, diagnostics))
461}
462
463impl Default for DataSourceSchemaCache {
464 fn default() -> Self {
465 Self::new()
466 }
467}
468
469impl SourceSchema {
470 pub fn get_entity(&self, name: &str) -> Option<&EntitySchema> {
472 self.tables.get(name)
473 }
474
475 pub fn entity_names(&self) -> Vec<&str> {
477 self.tables.keys().map(|name| name.as_str()).collect()
478 }
479}
480
481impl EntitySchema {
482 pub fn get_field(&self, name: &str) -> Option<&FieldSchema> {
484 self.columns.iter().find(|column| column.name == name)
485 }
486
487 pub fn field_names(&self) -> Vec<&str> {
489 self.columns
490 .iter()
491 .map(|column| column.name.as_str())
492 .collect()
493 }
494}
495
496fn hash_source_schema(source: &SourceSchema) -> String {
497 let mut hasher = Sha256::new();
498 hasher.update(source.uri.as_bytes());
499 hasher.update([0]);
500
501 let mut entity_names: Vec<_> = source.tables.keys().cloned().collect();
502 entity_names.sort();
503 for entity_name in entity_names {
504 hasher.update(entity_name.as_bytes());
505 hasher.update([0]);
506
507 if let Some(entity) = source.tables.get(&entity_name) {
508 for field in &entity.columns {
509 hasher.update(field.name.as_bytes());
510 hasher.update([0]);
511 hasher.update(field.shape_type.as_bytes());
512 hasher.update([0]);
513 hasher.update([if field.nullable { 1 } else { 0 }]);
514 }
515 }
516 }
517
518 format!("sha256:{:x}", hasher.finalize())
519}
520
521fn source_fingerprint_hash<'a>(artifact: &'a LockedArtifact, uri: &str) -> Option<&'a str> {
522 if let ArtifactDeterminism::External { fingerprints } = &artifact.determinism {
523 let key = format!("schema:{uri}");
524 if let Some(value) = fingerprints.get(&key) {
525 return Some(value.as_str());
526 }
527 }
528 artifact.inputs.get("schema_hash").map(String::as_str)
529}
530
531fn type_schema_from_entity(entity: &EntitySchema) -> TypeSchema {
532 let schema_name = format!("DbRow_{}", entity.name);
533 let builder =
534 entity
535 .columns
536 .iter()
537 .fold(
538 TypeSchemaBuilder::new(&schema_name),
539 |builder, field| match field.shape_type.as_str() {
540 "int" => builder.i64_field(&field.name),
541 "number" => builder.f64_field(&field.name),
542 "string" => builder.string_field(&field.name),
543 "bool" => builder.bool_field(&field.name),
544 "timestamp" => builder.timestamp_field(&field.name),
545 _ => builder.any_field(&field.name),
546 },
547 );
548 builder.build()
549}
550
551fn source_to_payload(source: &SourceSchema) -> shape_wire::WireValue {
552 let mut entities: Vec<_> = source.tables.values().collect();
553 entities.sort_by(|left, right| left.name.cmp(&right.name));
554
555 let entity_values = entities
556 .into_iter()
557 .map(|entity| {
558 let field_values = entity
559 .columns
560 .iter()
561 .map(|field| {
562 shape_wire::WireValue::Object(BTreeMap::from([
563 (
564 "name".to_string(),
565 shape_wire::WireValue::String(field.name.clone()),
566 ),
567 (
568 "shape_type".to_string(),
569 shape_wire::WireValue::String(field.shape_type.clone()),
570 ),
571 (
572 "nullable".to_string(),
573 shape_wire::WireValue::Bool(field.nullable),
574 ),
575 ]))
576 })
577 .collect::<Vec<_>>();
578
579 shape_wire::WireValue::Object(BTreeMap::from([
580 (
581 "name".to_string(),
582 shape_wire::WireValue::String(entity.name.clone()),
583 ),
584 (
585 "columns".to_string(),
586 shape_wire::WireValue::Array(field_values),
587 ),
588 ]))
589 })
590 .collect::<Vec<_>>();
591
592 shape_wire::WireValue::Object(BTreeMap::from([
593 (
594 "uri".to_string(),
595 shape_wire::WireValue::String(source.uri.clone()),
596 ),
597 (
598 "cached_at".to_string(),
599 shape_wire::WireValue::String(source.cached_at.clone()),
600 ),
601 (
602 "tables".to_string(),
603 shape_wire::WireValue::Array(entity_values),
604 ),
605 ]))
606}
607
608fn payload_to_source(
609 key_hint: &str,
610 payload: &shape_wire::WireValue,
611) -> Result<SourceSchema, String> {
612 let shape_wire::WireValue::Object(map) = payload else {
613 return Err("source payload must be an object".to_string());
614 };
615
616 let uri = map
617 .get("uri")
618 .and_then(shape_wire::WireValue::as_str)
619 .map(|value| value.to_string())
620 .filter(|value| !value.is_empty())
621 .unwrap_or_else(|| key_hint.to_string());
622 let cached_at = map
623 .get("cached_at")
624 .and_then(shape_wire::WireValue::as_str)
625 .unwrap_or("")
626 .to_string();
627
628 let tables_value = map
629 .get("tables")
630 .ok_or_else(|| "source payload missing 'tables'".to_string())?;
631 let shape_wire::WireValue::Array(table_values) = tables_value else {
632 return Err("source payload 'tables' must be an array".to_string());
633 };
634
635 let mut tables = HashMap::new();
636 for table_value in table_values {
637 let entity = payload_to_entity(table_value)?;
638 tables.insert(entity.name.clone(), entity);
639 }
640
641 Ok(SourceSchema {
642 uri,
643 tables,
644 cached_at,
645 })
646}
647
648fn payload_to_entity(value: &shape_wire::WireValue) -> Result<EntitySchema, String> {
649 let shape_wire::WireValue::Object(table_map) = value else {
650 return Err("table payload must be an object".to_string());
651 };
652
653 let name = table_map
654 .get("name")
655 .and_then(shape_wire::WireValue::as_str)
656 .ok_or_else(|| "table payload missing 'name'".to_string())?
657 .to_string();
658 let columns_value = table_map
659 .get("columns")
660 .ok_or_else(|| "table payload missing 'columns'".to_string())?;
661 let shape_wire::WireValue::Array(column_values) = columns_value else {
662 return Err("table payload 'columns' must be an array".to_string());
663 };
664
665 let mut columns = Vec::with_capacity(column_values.len());
666 for column_value in column_values {
667 columns.push(payload_to_field(column_value)?);
668 }
669
670 Ok(EntitySchema { name, columns })
671}
672
673fn payload_to_field(value: &shape_wire::WireValue) -> Result<FieldSchema, String> {
674 let shape_wire::WireValue::Object(column_map) = value else {
675 return Err("column payload must be an object".to_string());
676 };
677
678 let name = column_map
679 .get("name")
680 .and_then(shape_wire::WireValue::as_str)
681 .ok_or_else(|| "column payload missing 'name'".to_string())?
682 .to_string();
683 let shape_type = column_map
684 .get("shape_type")
685 .and_then(shape_wire::WireValue::as_str)
686 .ok_or_else(|| "column payload missing 'shape_type'".to_string())?
687 .to_string();
688 let nullable = column_map
689 .get("nullable")
690 .and_then(shape_wire::WireValue::as_bool)
691 .ok_or_else(|| "column payload missing 'nullable'".to_string())?;
692
693 Ok(FieldSchema {
694 name,
695 shape_type,
696 nullable,
697 })
698}
699
#[cfg(test)]
mod tests {
    use super::*;
    use crate::package_lock::ArtifactDeterminism;

    /// Column fixture.
    fn column(name: &str, shape_type: &str, nullable: bool) -> FieldSchema {
        FieldSchema {
            name: name.to_string(),
            shape_type: shape_type.to_string(),
            nullable,
        }
    }

    /// Source fixture with exactly one table whose key equals its entity name.
    fn one_table_source(
        uri: &str,
        table: &str,
        columns: Vec<FieldSchema>,
        cached_at: &str,
    ) -> SourceSchema {
        let entity = EntitySchema {
            name: table.to_string(),
            columns,
        };
        SourceSchema {
            uri: uri.to_string(),
            tables: HashMap::from([(table.to_string(), entity)]),
            cached_at: cached_at.to_string(),
        }
    }

    /// Fresh lock path inside a temporary directory. Keep the returned `TempDir`
    /// alive for as long as the path is in use.
    fn temp_lock_path() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(CACHE_FILENAME);
        (dir, path)
    }

    /// Write a lock file that contains only `artifact`.
    fn write_single_artifact_lock(artifact: LockedArtifact) -> (tempfile::TempDir, PathBuf) {
        let mut lock = PackageLock::new();
        lock.upsert_artifact(artifact).unwrap();
        let (dir, path) = temp_lock_path();
        lock.write(&path).unwrap();
        (dir, path)
    }

    /// One DuckDB source holding a three-column `users` table.
    fn sample_cache() -> DataSourceSchemaCache {
        let mut cache = DataSourceSchemaCache::new();
        cache.upsert_source(one_table_source(
            "duckdb://analytics.db",
            "users",
            vec![
                column("id", "int", false),
                column("name", "string", false),
                column("age", "int", true),
            ],
            "2026-02-12T10:00:00Z",
        ));
        cache
    }

    #[test]
    fn test_roundtrip_serialization() {
        let (_dir, path) = temp_lock_path();
        sample_cache().save(&path).unwrap();

        let loaded = DataSourceSchemaCache::load(&path).unwrap();
        assert_eq!(loaded.version, SCHEMA_CACHE_VERSION);
        assert_eq!(loaded.sources.len(), 1);

        let source = loaded.get_source("duckdb://analytics.db").unwrap();
        assert_eq!(source.tables.len(), 1);

        let users = source.get_entity("users").unwrap();
        assert_eq!(users.columns.len(), 3);
        let (first, last) = (&users.columns[0], &users.columns[2]);
        assert_eq!(first.name, "id");
        assert_eq!(first.shape_type, "int");
        assert!(!first.nullable);
        assert_eq!(last.name, "age");
        assert!(last.nullable);
    }

    #[test]
    fn test_load_or_empty_missing_file() {
        let cache = DataSourceSchemaCache::load_or_empty(Path::new("/nonexistent/path.toml"));
        assert_eq!(cache.version, SCHEMA_CACHE_VERSION);
        assert!(cache.sources.is_empty());
    }

    #[test]
    fn test_source_helpers() {
        let cache = sample_cache();
        let source = cache.get_source("duckdb://analytics.db").unwrap();

        assert!(source.entity_names().contains(&"users"));

        let users = source.get_entity("users").unwrap();
        assert_eq!(users.field_names(), vec!["id", "name", "age"]);
        assert!(users.get_field("id").is_some());
        assert!(users.get_field("nonexistent").is_none());
    }

    #[test]
    fn test_upsert_source() {
        let empty_source = |cached_at: &str| SourceSchema {
            uri: "duckdb://test.db".to_string(),
            tables: HashMap::new(),
            cached_at: cached_at.to_string(),
        };

        let mut cache = DataSourceSchemaCache::new();
        assert!(cache.sources.is_empty());

        cache.upsert_source(empty_source("2026-01-01T00:00:00Z"));
        assert_eq!(cache.sources.len(), 1);

        cache.upsert_source(empty_source("2026-02-01T00:00:00Z"));
        assert_eq!(cache.sources.len(), 1);
        assert_eq!(
            cache.get_source("duckdb://test.db").unwrap().cached_at,
            "2026-02-01T00:00:00Z"
        );
    }

    #[test]
    fn test_load_with_diagnostics_reports_stale_fingerprint() {
        let cache = sample_cache();
        let source = cache.get_source("duckdb://analytics.db").unwrap();
        let bogus_hash = "sha256:deadbeef".to_string();

        let artifact = LockedArtifact::new(
            SCHEMA_CACHE_NAMESPACE,
            source.uri.clone(),
            SCHEMA_CACHE_PRODUCER,
            ArtifactDeterminism::External {
                fingerprints: BTreeMap::from([(
                    format!("schema:{}", source.uri),
                    bogus_hash.clone(),
                )]),
            },
            BTreeMap::from([
                ("uri".to_string(), source.uri.clone()),
                ("schema_hash".to_string(), bogus_hash),
            ]),
            source_to_payload(source),
        )
        .unwrap();
        let (_dir, path) = write_single_artifact_lock(artifact);

        let (loaded, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(&path).unwrap();
        assert!(loaded.sources.is_empty());
        assert_eq!(diagnostics.len(), 1);
        assert!(diagnostics[0].message.contains("stale schema fingerprint"));
    }

    #[test]
    fn test_load_with_diagnostics_reports_invalid_payload() {
        let artifact = LockedArtifact::new(
            SCHEMA_CACHE_NAMESPACE,
            "broken://source",
            SCHEMA_CACHE_PRODUCER,
            ArtifactDeterminism::External {
                fingerprints: BTreeMap::from([(
                    "schema:broken://source".to_string(),
                    "sha256:abc".to_string(),
                )]),
            },
            BTreeMap::from([
                ("uri".to_string(), "broken://source".to_string()),
                ("schema_hash".to_string(), "sha256:abc".to_string()),
            ]),
            shape_wire::WireValue::String("bad".to_string()),
        )
        .unwrap();
        let (_dir, path) = write_single_artifact_lock(artifact);

        let (loaded, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(&path).unwrap();
        assert!(loaded.sources.is_empty());
        assert_eq!(diagnostics.len(), 1);
        assert!(diagnostics[0].message.contains("payload parse failed"));
    }

    #[test]
    fn test_load_cached_type_schemas_for_uri_prefixes_filters_sources() {
        let mut cache = DataSourceSchemaCache::new();
        for (uri, table) in [
            ("duckdb://analytics.db", "users"),
            ("postgres://localhost/app", "orders"),
        ] {
            cache.upsert_source(one_table_source(
                uri,
                table,
                vec![column("id", "int", false)],
                "2026-02-17T00:00:00Z",
            ));
        }

        let (_dir, path) = temp_lock_path();
        cache.save(&path).unwrap();

        let duck_schemas =
            load_cached_type_schemas_for_uri_prefixes(&path, &["duckdb://"]).unwrap();
        assert_eq!(duck_schemas.len(), 1);
        assert_eq!(duck_schemas[0].name, "DbRow_users");

        let pg_schemas =
            load_cached_type_schemas_for_uri_prefixes(&path, &["postgres://", "postgresql://"])
                .unwrap();
        assert_eq!(pg_schemas.len(), 1);
        assert_eq!(pg_schemas[0].name, "DbRow_orders");
    }

    #[test]
    fn test_load_cached_source_for_uri_with_diagnostics() {
        let (_dir, path) = temp_lock_path();
        sample_cache().save(&path).unwrap();

        let (source, diagnostics) =
            load_cached_source_for_uri_with_diagnostics(&path, "duckdb://analytics.db").unwrap();
        assert_eq!(source.uri, "duckdb://analytics.db");
        assert!(diagnostics.is_empty());
    }

    #[test]
    fn test_default_cache_path_ends_with_shape_lock() {
        assert!(default_cache_path().ends_with(CACHE_FILENAME));
    }
}