1use crate::package_lock::{ArtifactDeterminism, LockedArtifact, PackageLock};
7use crate::type_schema::{
8 TypeSchema, TypeSchemaBuilder, typed_object_from_nb_pairs, typed_object_to_hashmap_nb,
9};
10use serde::{Deserialize, Serialize};
11use sha2::{Digest, Sha256};
12use shape_value::ValueWord;
13use shape_wire::WireValue;
14use std::collections::{BTreeMap, HashMap};
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::sync::RwLock;
18
/// File name of the lock file that holds cached data-source schemas; schemas are
/// stored there as artifacts alongside the rest of the package lock.
pub const CACHE_FILENAME: &str = "shape.lock";

/// Version stamped into every cache built by `DataSourceSchemaCache::new` or
/// returned by `DataSourceSchemaCache::load_with_diagnostics`.
const SCHEMA_CACHE_VERSION: u32 = 1;
/// Lock-file artifact namespace reserved for schema-cache artifacts.
const SCHEMA_CACHE_NAMESPACE: &str = "external.datasource.schema";
/// Producer identifier recorded on every schema artifact written by `save`.
const SCHEMA_CACHE_PRODUCER: &str = "shape-runtime/schema_cache@v1";

/// Process-wide override consulted by `default_cache_path`; `None` means
/// "use `CACHE_FILENAME` in the current working directory".
static DEFAULT_CACHE_PATH_OVERRIDE: std::sync::LazyLock<RwLock<Option<PathBuf>>> =
    std::sync::LazyLock::new(|| RwLock::new(None));
28
/// A problem found with one schema artifact while loading the lock file.
///
/// Produced by `DataSourceSchemaCache::load_with_diagnostics`; the offending
/// artifact is skipped rather than failing the whole load.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaCacheDiagnostic {
    /// Key of the lock-file artifact that was rejected.
    pub key: String,
    /// Human-readable reason the artifact was rejected.
    pub message: String,
}
35
/// In-memory collection of cached data-source schemas.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataSourceSchemaCache {
    /// Cache format version; `new` and the load functions set it to `SCHEMA_CACHE_VERSION`.
    pub version: u32,
    /// Cached schemas keyed by source URI (`upsert_source` keys by `SourceSchema::uri`).
    pub sources: HashMap<String, SourceSchema>,
}
44
/// Schema snapshot of one external data source.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceSchema {
    /// URI identifying the source (the tests use e.g. `duckdb://analytics.db`).
    pub uri: String,
    /// Entities (tables) of the source, keyed by table name.
    pub tables: HashMap<String, EntitySchema>,
    /// When the snapshot was taken, kept as an opaque string.
    /// NOTE(review): the tests use RFC 3339 timestamps; confirm that producers always do.
    pub cached_at: String,
}
55
/// Schema of one entity (table) within a source.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EntitySchema {
    /// Entity name; `type_schema_from_entity` derives the type name `DbRow_<name>` from it.
    pub name: String,
    /// Columns in their declared order.
    pub columns: Vec<FieldSchema>,
}
64
/// Schema of one column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldSchema {
    /// Column name.
    pub name: String,
    /// Shape type name, serialised by serde as `type`. `type_schema_from_entity`
    /// recognises `int`, `number`, `string`, `bool` and `timestamp`; any other
    /// value becomes an untyped (`any`) field.
    #[serde(rename = "type")]
    pub shape_type: String,
    /// Whether the column may hold nulls.
    pub nullable: bool,
}
76
77pub fn source_schema_to_nb(schema: &SourceSchema) -> ValueWord {
82 let mut table_pairs: Vec<(String, ValueWord)> = schema
83 .tables
84 .iter()
85 .map(|(table_name, entity)| {
86 let columns = entity
87 .columns
88 .iter()
89 .map(|column| {
90 typed_object_from_nb_pairs(&[
91 (
92 "name",
93 ValueWord::from_string(Arc::new(column.name.clone())),
94 ),
95 (
96 "type",
97 ValueWord::from_string(Arc::new(column.shape_type.clone())),
98 ),
99 ("nullable", ValueWord::from_bool(column.nullable)),
100 ])
101 })
102 .collect::<Vec<_>>();
103
104 let entity_nb = typed_object_from_nb_pairs(&[
105 (
106 "name",
107 ValueWord::from_string(Arc::new(entity.name.clone())),
108 ),
109 ("columns", ValueWord::from_array(Arc::new(columns))),
110 ]);
111
112 (table_name.clone(), entity_nb)
113 })
114 .collect();
115 table_pairs.sort_by(|left, right| left.0.cmp(&right.0));
116
117 let table_refs: Vec<(&str, ValueWord)> = table_pairs
118 .iter()
119 .map(|(name, value)| (name.as_str(), value.clone()))
120 .collect();
121 let tables_nb = typed_object_from_nb_pairs(&table_refs);
122
123 typed_object_from_nb_pairs(&[
124 ("uri", ValueWord::from_string(Arc::new(schema.uri.clone()))),
125 ("tables", tables_nb),
126 (
127 "cached_at",
128 ValueWord::from_string(Arc::new(schema.cached_at.clone())),
129 ),
130 ])
131}
132
133pub fn source_schema_from_nb(value: &ValueWord) -> Result<SourceSchema, String> {
135 let object = typed_object_to_hashmap_nb(value)
136 .ok_or_else(|| "schema payload must be a typed object".to_string())?;
137
138 let uri = object
139 .get("uri")
140 .and_then(|v| v.as_str().map(|s| s.to_string()))
141 .ok_or_else(|| "schema payload missing string field 'uri'".to_string())?;
142
143 let cached_at = object
144 .get("cached_at")
145 .and_then(|v| v.as_str().map(|s| s.to_string()))
146 .unwrap_or_default();
147
148 let tables_nb = object
149 .get("tables")
150 .ok_or_else(|| "schema payload missing object field 'tables'".to_string())?;
151 let tables_obj = typed_object_to_hashmap_nb(tables_nb)
152 .ok_or_else(|| "schema payload field 'tables' must be an object".to_string())?;
153
154 let mut tables = HashMap::new();
155 for (table_name, entity_nb) in tables_obj {
156 let entity_obj = typed_object_to_hashmap_nb(&entity_nb)
157 .ok_or_else(|| format!("table '{table_name}' schema must be an object"))?;
158
159 let entity_name = entity_obj
160 .get("name")
161 .and_then(|v| v.as_str().map(|s| s.to_string()))
162 .unwrap_or_else(|| table_name.clone());
163
164 let columns_nb = entity_obj
165 .get("columns")
166 .ok_or_else(|| format!("table '{table_name}' missing 'columns' array"))?;
167 let columns_arr = columns_nb
168 .as_any_array()
169 .ok_or_else(|| format!("table '{table_name}' field 'columns' must be an array"))?
170 .to_generic();
171
172 let mut columns = Vec::new();
173 for column_nb in columns_arr.iter() {
174 let column_obj = typed_object_to_hashmap_nb(column_nb)
175 .ok_or_else(|| format!("table '{table_name}' contains non-object column entry"))?;
176 let name = column_obj
177 .get("name")
178 .and_then(|v| v.as_str().map(|s| s.to_string()))
179 .ok_or_else(|| format!("table '{table_name}' column missing string 'name'"))?;
180 let shape_type = column_obj
181 .get("type")
182 .or_else(|| column_obj.get("shape_type"))
183 .and_then(|v| v.as_str().map(|s| s.to_string()))
184 .ok_or_else(|| format!("table '{table_name}' column '{name}' missing type"))?;
185 let nullable = column_obj
186 .get("nullable")
187 .and_then(|v| v.as_bool())
188 .unwrap_or(false);
189
190 columns.push(FieldSchema {
191 name,
192 shape_type,
193 nullable,
194 });
195 }
196
197 tables.insert(
198 table_name.clone(),
199 EntitySchema {
200 name: entity_name,
201 columns,
202 },
203 );
204 }
205
206 Ok(SourceSchema {
207 uri,
208 tables,
209 cached_at,
210 })
211}
212
213pub fn source_schema_from_wire(value: &WireValue) -> Result<SourceSchema, String> {
215 let object = match value {
216 WireValue::Object(map) => map,
217 _ => return Err("schema payload must be an object".to_string()),
218 };
219
220 let uri = object
221 .get("uri")
222 .and_then(|v| match v {
223 WireValue::String(s) => Some(s.clone()),
224 _ => None,
225 })
226 .ok_or_else(|| "schema payload missing string field 'uri'".to_string())?;
227
228 let cached_at = object
229 .get("cached_at")
230 .and_then(|v| match v {
231 WireValue::String(s) => Some(s.clone()),
232 _ => None,
233 })
234 .unwrap_or_default();
235
236 let tables_value = object
237 .get("tables")
238 .ok_or_else(|| "schema payload missing object field 'tables'".to_string())?;
239 let tables_object = match tables_value {
240 WireValue::Object(map) => map,
241 _ => return Err("schema payload field 'tables' must be an object".to_string()),
242 };
243
244 let mut tables = HashMap::new();
245 for (table_name, entity_value) in tables_object {
246 let entity_obj = match entity_value {
247 WireValue::Object(map) => map,
248 _ => return Err(format!("table '{table_name}' schema must be an object")),
249 };
250
251 let entity_name = entity_obj
252 .get("name")
253 .and_then(|v| match v {
254 WireValue::String(s) => Some(s.clone()),
255 _ => None,
256 })
257 .unwrap_or_else(|| table_name.clone());
258
259 let columns_value = entity_obj
260 .get("columns")
261 .ok_or_else(|| format!("table '{table_name}' missing 'columns' array"))?;
262 let columns_array = match columns_value {
263 WireValue::Array(values) => values,
264 _ => {
265 return Err(format!(
266 "table '{table_name}' field 'columns' must be an array"
267 ));
268 }
269 };
270
271 let mut columns = Vec::new();
272 for column_value in columns_array {
273 let column_obj = match column_value {
274 WireValue::Object(map) => map,
275 _ => {
276 return Err(format!(
277 "table '{table_name}' contains non-object column entry"
278 ));
279 }
280 };
281
282 let name = column_obj
283 .get("name")
284 .and_then(|v| match v {
285 WireValue::String(s) => Some(s.clone()),
286 _ => None,
287 })
288 .ok_or_else(|| format!("table '{table_name}' column missing string 'name'"))?;
289
290 let shape_type = column_obj
291 .get("type")
292 .or_else(|| column_obj.get("shape_type"))
293 .and_then(|v| match v {
294 WireValue::String(s) => Some(s.clone()),
295 _ => None,
296 })
297 .ok_or_else(|| format!("table '{table_name}' column '{name}' missing type"))?;
298
299 let nullable = column_obj
300 .get("nullable")
301 .and_then(|v| match v {
302 WireValue::Bool(b) => Some(*b),
303 _ => None,
304 })
305 .unwrap_or(false);
306
307 columns.push(FieldSchema {
308 name,
309 shape_type,
310 nullable,
311 });
312 }
313
314 tables.insert(
315 table_name.clone(),
316 EntitySchema {
317 name: entity_name,
318 columns,
319 },
320 );
321 }
322
323 Ok(SourceSchema {
324 uri,
325 tables,
326 cached_at,
327 })
328}
329
330impl DataSourceSchemaCache {
331 pub fn new() -> Self {
333 Self {
334 version: SCHEMA_CACHE_VERSION,
335 sources: HashMap::new(),
336 }
337 }
338
339 pub fn save(&self, path: &Path) -> std::io::Result<()> {
341 let mut lock = PackageLock::read(path).unwrap_or_default();
342
343 lock.artifacts
345 .retain(|artifact| artifact.namespace != SCHEMA_CACHE_NAMESPACE);
346
347 let mut uris: Vec<_> = self.sources.keys().cloned().collect();
348 uris.sort();
349
350 for uri in uris {
351 let Some(source) = self.sources.get(&uri) else {
352 continue;
353 };
354
355 let schema_hash = hash_source_schema(source);
356 let payload = source_to_payload(source);
357
358 let mut inputs = BTreeMap::new();
359 inputs.insert("uri".to_string(), uri.clone());
360 inputs.insert("schema_hash".to_string(), schema_hash.clone());
361
362 let determinism = ArtifactDeterminism::External {
363 fingerprints: BTreeMap::from([(format!("schema:{uri}"), schema_hash)]),
364 };
365
366 let artifact = LockedArtifact::new(
367 SCHEMA_CACHE_NAMESPACE,
368 uri,
369 SCHEMA_CACHE_PRODUCER,
370 determinism,
371 inputs,
372 payload,
373 )
374 .map_err(std::io::Error::other)?;
375
376 lock.upsert_artifact(artifact)
377 .map_err(std::io::Error::other)?;
378 }
379
380 lock.write(path)
381 }
382
383 pub fn load(path: &Path) -> std::io::Result<Self> {
385 let (cache, diagnostics) = Self::load_with_diagnostics(path)?;
386 if let Some(diag) = diagnostics.first() {
387 return Err(std::io::Error::new(
388 std::io::ErrorKind::InvalidData,
389 format!("invalid schema artifact '{}': {}", diag.key, diag.message),
390 ));
391 }
392 Ok(cache)
393 }
394
395 pub fn load_with_diagnostics(
398 path: &Path,
399 ) -> std::io::Result<(Self, Vec<SchemaCacheDiagnostic>)> {
400 let lock = PackageLock::read(path).ok_or_else(|| {
401 std::io::Error::new(std::io::ErrorKind::NotFound, "shape.lock not found")
402 })?;
403
404 let mut sources = HashMap::new();
405 let mut diagnostics = Vec::new();
406 for artifact in lock
407 .artifacts
408 .iter()
409 .filter(|artifact| artifact.namespace == SCHEMA_CACHE_NAMESPACE)
410 {
411 let payload = match artifact.payload() {
412 Ok(payload) => payload,
413 Err(err) => {
414 diagnostics.push(SchemaCacheDiagnostic {
415 key: artifact.key.clone(),
416 message: format!("payload decode failed: {err}"),
417 });
418 continue;
419 }
420 };
421 let source = match payload_to_source(&artifact.key, &payload) {
422 Ok(source) => source,
423 Err(err) => {
424 diagnostics.push(SchemaCacheDiagnostic {
425 key: artifact.key.clone(),
426 message: format!("payload parse failed: {err}"),
427 });
428 continue;
429 }
430 };
431
432 if let Some(expected_hash) = source_fingerprint_hash(artifact, &source.uri) {
433 let actual_hash = hash_source_schema(&source);
434 if expected_hash != actual_hash {
435 diagnostics.push(SchemaCacheDiagnostic {
436 key: artifact.key.clone(),
437 message: format!(
438 "stale schema fingerprint (expected {expected_hash}, computed {actual_hash})"
439 ),
440 });
441 continue;
442 }
443 } else {
444 diagnostics.push(SchemaCacheDiagnostic {
445 key: artifact.key.clone(),
446 message: "missing schema fingerprint".to_string(),
447 });
448 continue;
449 }
450 sources.insert(source.uri.clone(), source);
451 }
452
453 Ok((
454 Self {
455 version: SCHEMA_CACHE_VERSION,
456 sources,
457 },
458 diagnostics,
459 ))
460 }
461
462 pub fn load_or_empty(path: &Path) -> Self {
464 match Self::load(path) {
465 Ok(cache) => cache,
466 Err(_) => Self::new(),
467 }
468 }
469
470 pub fn get_source(&self, uri: &str) -> Option<&SourceSchema> {
472 self.sources.get(uri)
473 }
474
475 pub fn upsert_source(&mut self, schema: SourceSchema) {
477 self.sources.insert(schema.uri.clone(), schema);
478 }
479
480 pub fn is_offline() -> bool {
482 std::env::var("SHAPE_OFFLINE")
483 .map(|value| value == "true" || value == "1")
484 .unwrap_or(false)
485 }
486}
487
488pub fn load_cached_type_schemas_for_uri_prefixes(
493 cache_path: &Path,
494 uri_prefixes: &[&str],
495) -> std::io::Result<Vec<TypeSchema>> {
496 let (schemas, _diagnostics) =
497 load_cached_type_schemas_for_uri_prefixes_with_diagnostics(cache_path, uri_prefixes)?;
498 Ok(schemas)
499}
500
501pub fn load_cached_type_schemas_for_uri_prefixes_with_diagnostics(
504 cache_path: &Path,
505 uri_prefixes: &[&str],
506) -> std::io::Result<(Vec<TypeSchema>, Vec<SchemaCacheDiagnostic>)> {
507 let (cache, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(cache_path)?;
508 let mut schemas = Vec::new();
509
510 for source in cache.sources.values() {
511 if !uri_prefixes.is_empty()
512 && !uri_prefixes
513 .iter()
514 .any(|prefix| source.uri.starts_with(prefix))
515 {
516 continue;
517 }
518
519 for table in source.tables.values() {
520 schemas.push(type_schema_from_entity(table));
521 }
522 }
523
524 Ok((schemas, diagnostics))
525}
526
527pub fn default_cache_path() -> PathBuf {
529 if let Ok(guard) = DEFAULT_CACHE_PATH_OVERRIDE.read() {
530 if let Some(path) = guard.as_ref() {
531 return path.clone();
532 }
533 }
534
535 std::env::current_dir()
536 .unwrap_or_default()
537 .join(CACHE_FILENAME)
538}
539
540pub fn set_default_cache_path(path: Option<PathBuf>) {
543 if let Ok(mut guard) = DEFAULT_CACHE_PATH_OVERRIDE.write() {
544 *guard = path;
545 }
546}
547
548pub fn load_cached_source_for_uri(cache_path: &Path, uri: &str) -> std::io::Result<SourceSchema> {
550 let (source, diagnostics) = load_cached_source_for_uri_with_diagnostics(cache_path, uri)?;
551 if let Some(diag) = diagnostics.first() {
552 return Err(std::io::Error::new(
553 std::io::ErrorKind::InvalidData,
554 format!("invalid schema artifact '{}': {}", diag.key, diag.message),
555 ));
556 }
557 Ok(source)
558}
559
560pub fn load_cached_source_for_uri_with_diagnostics(
563 cache_path: &Path,
564 uri: &str,
565) -> std::io::Result<(SourceSchema, Vec<SchemaCacheDiagnostic>)> {
566 let (cache, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(cache_path)?;
567 let source = cache.get_source(uri).cloned().ok_or_else(|| {
568 std::io::Error::new(
569 std::io::ErrorKind::NotFound,
570 format!("no cached schema for '{uri}'"),
571 )
572 })?;
573 Ok((source, diagnostics))
574}
575
576impl Default for DataSourceSchemaCache {
577 fn default() -> Self {
578 Self::new()
579 }
580}
581
582impl SourceSchema {
583 pub fn get_entity(&self, name: &str) -> Option<&EntitySchema> {
585 self.tables.get(name)
586 }
587
588 pub fn entity_names(&self) -> Vec<&str> {
590 self.tables.keys().map(|name| name.as_str()).collect()
591 }
592}
593
594impl EntitySchema {
595 pub fn get_field(&self, name: &str) -> Option<&FieldSchema> {
597 self.columns.iter().find(|column| column.name == name)
598 }
599
600 pub fn field_names(&self) -> Vec<&str> {
602 self.columns
603 .iter()
604 .map(|column| column.name.as_str())
605 .collect()
606 }
607}
608
609fn hash_source_schema(source: &SourceSchema) -> String {
610 let mut hasher = Sha256::new();
611 hasher.update(source.uri.as_bytes());
612 hasher.update([0]);
613
614 let mut entity_names: Vec<_> = source.tables.keys().cloned().collect();
615 entity_names.sort();
616 for entity_name in entity_names {
617 hasher.update(entity_name.as_bytes());
618 hasher.update([0]);
619
620 if let Some(entity) = source.tables.get(&entity_name) {
621 for field in &entity.columns {
622 hasher.update(field.name.as_bytes());
623 hasher.update([0]);
624 hasher.update(field.shape_type.as_bytes());
625 hasher.update([0]);
626 hasher.update([if field.nullable { 1 } else { 0 }]);
627 }
628 }
629 }
630
631 format!("sha256:{:x}", hasher.finalize())
632}
633
634fn source_fingerprint_hash<'a>(artifact: &'a LockedArtifact, uri: &str) -> Option<&'a str> {
635 if let ArtifactDeterminism::External { fingerprints } = &artifact.determinism {
636 let key = format!("schema:{uri}");
637 if let Some(value) = fingerprints.get(&key) {
638 return Some(value.as_str());
639 }
640 }
641 artifact.inputs.get("schema_hash").map(String::as_str)
642}
643
644fn type_schema_from_entity(entity: &EntitySchema) -> TypeSchema {
645 let schema_name = format!("DbRow_{}", entity.name);
646 let builder =
647 entity
648 .columns
649 .iter()
650 .fold(
651 TypeSchemaBuilder::new(&schema_name),
652 |builder, field| match field.shape_type.as_str() {
653 "int" => builder.i64_field(&field.name),
654 "number" => builder.f64_field(&field.name),
655 "string" => builder.string_field(&field.name),
656 "bool" => builder.bool_field(&field.name),
657 "timestamp" => builder.timestamp_field(&field.name),
658 _ => builder.any_field(&field.name),
659 },
660 );
661 builder.build()
662}
663
664fn source_to_payload(source: &SourceSchema) -> shape_wire::WireValue {
665 let mut entities: Vec<_> = source.tables.values().collect();
666 entities.sort_by(|left, right| left.name.cmp(&right.name));
667
668 let entity_values = entities
669 .into_iter()
670 .map(|entity| {
671 let field_values = entity
672 .columns
673 .iter()
674 .map(|field| {
675 shape_wire::WireValue::Object(BTreeMap::from([
676 (
677 "name".to_string(),
678 shape_wire::WireValue::String(field.name.clone()),
679 ),
680 (
681 "shape_type".to_string(),
682 shape_wire::WireValue::String(field.shape_type.clone()),
683 ),
684 (
685 "nullable".to_string(),
686 shape_wire::WireValue::Bool(field.nullable),
687 ),
688 ]))
689 })
690 .collect::<Vec<_>>();
691
692 shape_wire::WireValue::Object(BTreeMap::from([
693 (
694 "name".to_string(),
695 shape_wire::WireValue::String(entity.name.clone()),
696 ),
697 (
698 "columns".to_string(),
699 shape_wire::WireValue::Array(field_values),
700 ),
701 ]))
702 })
703 .collect::<Vec<_>>();
704
705 shape_wire::WireValue::Object(BTreeMap::from([
706 (
707 "uri".to_string(),
708 shape_wire::WireValue::String(source.uri.clone()),
709 ),
710 (
711 "cached_at".to_string(),
712 shape_wire::WireValue::String(source.cached_at.clone()),
713 ),
714 (
715 "tables".to_string(),
716 shape_wire::WireValue::Array(entity_values),
717 ),
718 ]))
719}
720
721fn payload_to_source(
722 key_hint: &str,
723 payload: &shape_wire::WireValue,
724) -> Result<SourceSchema, String> {
725 let shape_wire::WireValue::Object(map) = payload else {
726 return Err("source payload must be an object".to_string());
727 };
728
729 let uri = map
730 .get("uri")
731 .and_then(shape_wire::WireValue::as_str)
732 .map(|value| value.to_string())
733 .filter(|value| !value.is_empty())
734 .unwrap_or_else(|| key_hint.to_string());
735 let cached_at = map
736 .get("cached_at")
737 .and_then(shape_wire::WireValue::as_str)
738 .unwrap_or("")
739 .to_string();
740
741 let tables_value = map
742 .get("tables")
743 .ok_or_else(|| "source payload missing 'tables'".to_string())?;
744 let shape_wire::WireValue::Array(table_values) = tables_value else {
745 return Err("source payload 'tables' must be an array".to_string());
746 };
747
748 let mut tables = HashMap::new();
749 for table_value in table_values {
750 let entity = payload_to_entity(table_value)?;
751 tables.insert(entity.name.clone(), entity);
752 }
753
754 Ok(SourceSchema {
755 uri,
756 tables,
757 cached_at,
758 })
759}
760
761fn payload_to_entity(value: &shape_wire::WireValue) -> Result<EntitySchema, String> {
762 let shape_wire::WireValue::Object(table_map) = value else {
763 return Err("table payload must be an object".to_string());
764 };
765
766 let name = table_map
767 .get("name")
768 .and_then(shape_wire::WireValue::as_str)
769 .ok_or_else(|| "table payload missing 'name'".to_string())?
770 .to_string();
771 let columns_value = table_map
772 .get("columns")
773 .ok_or_else(|| "table payload missing 'columns'".to_string())?;
774 let shape_wire::WireValue::Array(column_values) = columns_value else {
775 return Err("table payload 'columns' must be an array".to_string());
776 };
777
778 let mut columns = Vec::with_capacity(column_values.len());
779 for column_value in column_values {
780 columns.push(payload_to_field(column_value)?);
781 }
782
783 Ok(EntitySchema { name, columns })
784}
785
786fn payload_to_field(value: &shape_wire::WireValue) -> Result<FieldSchema, String> {
787 let shape_wire::WireValue::Object(column_map) = value else {
788 return Err("column payload must be an object".to_string());
789 };
790
791 let name = column_map
792 .get("name")
793 .and_then(shape_wire::WireValue::as_str)
794 .ok_or_else(|| "column payload missing 'name'".to_string())?
795 .to_string();
796 let shape_type = column_map
797 .get("shape_type")
798 .and_then(shape_wire::WireValue::as_str)
799 .ok_or_else(|| "column payload missing 'shape_type'".to_string())?
800 .to_string();
801 let nullable = column_map
802 .get("nullable")
803 .and_then(shape_wire::WireValue::as_bool)
804 .ok_or_else(|| "column payload missing 'nullable'".to_string())?;
805
806 Ok(FieldSchema {
807 name,
808 shape_type,
809 nullable,
810 })
811}
812
#[cfg(test)]
mod tests {
    use super::*;
    use crate::package_lock::ArtifactDeterminism;

    /// Builds a cache with one DuckDB source holding a three-column `users` table
    /// (`id` int, `name` string, nullable `age` int).
    fn sample_cache() -> DataSourceSchemaCache {
        let mut cache = DataSourceSchemaCache::new();
        let mut tables = HashMap::new();
        tables.insert(
            "users".to_string(),
            EntitySchema {
                name: "users".to_string(),
                columns: vec![
                    FieldSchema {
                        name: "id".to_string(),
                        shape_type: "int".to_string(),
                        nullable: false,
                    },
                    FieldSchema {
                        name: "name".to_string(),
                        shape_type: "string".to_string(),
                        nullable: false,
                    },
                    FieldSchema {
                        name: "age".to_string(),
                        shape_type: "int".to_string(),
                        nullable: true,
                    },
                ],
            },
        );
        cache.upsert_source(SourceSchema {
            uri: "duckdb://analytics.db".to_string(),
            tables,
            cached_at: "2026-02-12T10:00:00Z".to_string(),
        });
        cache
    }

    // save() followed by load() must reproduce sources, tables and column metadata,
    // including column order and nullability.
    #[test]
    fn test_roundtrip_serialization() {
        let cache = sample_cache();

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(CACHE_FILENAME);
        cache.save(&path).unwrap();

        let loaded = DataSourceSchemaCache::load(&path).unwrap();
        assert_eq!(loaded.version, SCHEMA_CACHE_VERSION);
        assert_eq!(loaded.sources.len(), 1);

        let conn = loaded.get_source("duckdb://analytics.db").unwrap();
        assert_eq!(conn.tables.len(), 1);

        let users = conn.get_entity("users").unwrap();
        assert_eq!(users.columns.len(), 3);
        assert_eq!(users.columns[0].name, "id");
        assert_eq!(users.columns[0].shape_type, "int");
        assert!(!users.columns[0].nullable);
        assert_eq!(users.columns[2].name, "age");
        assert!(users.columns[2].nullable);
    }

    // A missing lock file yields an empty, current-version cache rather than an error.
    #[test]
    fn test_load_or_empty_missing_file() {
        let cache = DataSourceSchemaCache::load_or_empty(Path::new("/nonexistent/path.toml"));
        assert_eq!(cache.version, SCHEMA_CACHE_VERSION);
        assert!(cache.sources.is_empty());
    }

    // Lookup helpers on SourceSchema and EntitySchema.
    #[test]
    fn test_source_helpers() {
        let cache = sample_cache();
        let conn = cache.get_source("duckdb://analytics.db").unwrap();

        let names = conn.entity_names();
        assert!(names.contains(&"users"));

        let users = conn.get_entity("users").unwrap();
        assert_eq!(users.field_names(), vec!["id", "name", "age"]);
        assert!(users.get_field("id").is_some());
        assert!(users.get_field("nonexistent").is_none());
    }

    // upsert_source replaces an existing entry with the same URI instead of adding one.
    #[test]
    fn test_upsert_source() {
        let mut cache = DataSourceSchemaCache::new();
        assert!(cache.sources.is_empty());

        cache.upsert_source(SourceSchema {
            uri: "duckdb://test.db".to_string(),
            tables: HashMap::new(),
            cached_at: "2026-01-01T00:00:00Z".to_string(),
        });
        assert_eq!(cache.sources.len(), 1);

        cache.upsert_source(SourceSchema {
            uri: "duckdb://test.db".to_string(),
            tables: HashMap::new(),
            cached_at: "2026-02-01T00:00:00Z".to_string(),
        });
        assert_eq!(cache.sources.len(), 1);
        assert_eq!(
            cache.get_source("duckdb://test.db").unwrap().cached_at,
            "2026-02-01T00:00:00Z"
        );
    }

    // An artifact whose recorded fingerprint does not match its payload is skipped
    // and reported as a stale-fingerprint diagnostic.
    #[test]
    fn test_load_with_diagnostics_reports_stale_fingerprint() {
        let cache = sample_cache();
        let source = cache.get_source("duckdb://analytics.db").unwrap();

        let payload = source_to_payload(source);
        let artifact = LockedArtifact::new(
            SCHEMA_CACHE_NAMESPACE,
            source.uri.clone(),
            SCHEMA_CACHE_PRODUCER,
            ArtifactDeterminism::External {
                fingerprints: BTreeMap::from([(
                    format!("schema:{}", source.uri),
                    "sha256:deadbeef".to_string(),
                )]),
            },
            BTreeMap::from([
                ("uri".to_string(), source.uri.clone()),
                ("schema_hash".to_string(), "sha256:deadbeef".to_string()),
            ]),
            payload,
        )
        .unwrap();

        let mut lock = PackageLock::new();
        lock.upsert_artifact(artifact).unwrap();

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(CACHE_FILENAME);
        lock.write(&path).unwrap();

        let (loaded, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(&path).unwrap();
        assert!(loaded.sources.is_empty());
        assert_eq!(diagnostics.len(), 1);
        assert!(diagnostics[0].message.contains("stale schema fingerprint"));
    }

    // A payload that is not an object is skipped and reported as a parse failure.
    #[test]
    fn test_load_with_diagnostics_reports_invalid_payload() {
        let artifact = LockedArtifact::new(
            SCHEMA_CACHE_NAMESPACE,
            "broken://source",
            SCHEMA_CACHE_PRODUCER,
            ArtifactDeterminism::External {
                fingerprints: BTreeMap::from([(
                    "schema:broken://source".to_string(),
                    "sha256:abc".to_string(),
                )]),
            },
            BTreeMap::from([
                ("uri".to_string(), "broken://source".to_string()),
                ("schema_hash".to_string(), "sha256:abc".to_string()),
            ]),
            shape_wire::WireValue::String("bad".to_string()),
        )
        .unwrap();

        let mut lock = PackageLock::new();
        lock.upsert_artifact(artifact).unwrap();

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(CACHE_FILENAME);
        lock.write(&path).unwrap();

        let (loaded, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(&path).unwrap();
        assert!(loaded.sources.is_empty());
        assert_eq!(diagnostics.len(), 1);
        assert!(diagnostics[0].message.contains("payload parse failed"));
    }

    // URI-prefix filtering returns only the type schemas of matching sources.
    #[test]
    fn test_load_cached_type_schemas_for_uri_prefixes_filters_sources() {
        let mut cache = DataSourceSchemaCache::new();

        let mut duck_tables = HashMap::new();
        duck_tables.insert(
            "users".to_string(),
            EntitySchema {
                name: "users".to_string(),
                columns: vec![FieldSchema {
                    name: "id".to_string(),
                    shape_type: "int".to_string(),
                    nullable: false,
                }],
            },
        );
        cache.upsert_source(SourceSchema {
            uri: "duckdb://analytics.db".to_string(),
            tables: duck_tables,
            cached_at: "2026-02-17T00:00:00Z".to_string(),
        });

        let mut pg_tables = HashMap::new();
        pg_tables.insert(
            "orders".to_string(),
            EntitySchema {
                name: "orders".to_string(),
                columns: vec![FieldSchema {
                    name: "id".to_string(),
                    shape_type: "int".to_string(),
                    nullable: false,
                }],
            },
        );
        cache.upsert_source(SourceSchema {
            uri: "postgres://localhost/app".to_string(),
            tables: pg_tables,
            cached_at: "2026-02-17T00:00:00Z".to_string(),
        });

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(CACHE_FILENAME);
        cache.save(&path).unwrap();

        let duck_schemas =
            load_cached_type_schemas_for_uri_prefixes(&path, &["duckdb://"]).unwrap();
        assert_eq!(duck_schemas.len(), 1);
        assert_eq!(duck_schemas[0].name, "DbRow_users");

        let pg_schemas =
            load_cached_type_schemas_for_uri_prefixes(&path, &["postgres://", "postgresql://"])
                .unwrap();
        assert_eq!(pg_schemas.len(), 1);
        assert_eq!(pg_schemas[0].name, "DbRow_orders");
    }

    // A freshly saved cache yields the requested source with no diagnostics.
    #[test]
    fn test_load_cached_source_for_uri_with_diagnostics() {
        let cache = sample_cache();
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(CACHE_FILENAME);
        cache.save(&path).unwrap();

        let (source, diagnostics) =
            load_cached_source_for_uri_with_diagnostics(&path, "duckdb://analytics.db").unwrap();
        assert_eq!(source.uri, "duckdb://analytics.db");
        assert!(diagnostics.is_empty());
    }

    // With no override installed, the default path ends with the lock file name.
    #[test]
    fn test_default_cache_path_ends_with_shape_lock() {
        let path = default_cache_path();
        assert!(path.ends_with(CACHE_FILENAME));
    }
}