use crate::package_lock::{ArtifactDeterminism, LockedArtifact, PackageLock};
use crate::type_schema::{TypeSchema, TypeSchemaBuilder};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use shape_value::KindedSlot;
use shape_wire::WireValue;
use std::collections::{BTreeMap, HashMap};
use std::path::{Path, PathBuf};
use std::sync::RwLock;
/// File name of the lock file that holds cached schemas; `default_cache_path`
/// joins it onto the current working directory.
pub const CACHE_FILENAME: &str = "shape.lock";
/// Version number stamped on every in-memory `DataSourceSchemaCache`.
const SCHEMA_CACHE_VERSION: u32 = 1;
/// Artifact namespace that marks lock-file entries as belonging to this cache.
const SCHEMA_CACHE_NAMESPACE: &str = "external.datasource.schema";
/// Producer string recorded on every schema artifact written by `save`.
const SCHEMA_CACHE_PRODUCER: &str = "shape-runtime/schema_cache@v1";
/// Process-wide override for the default cache path; `None` means "no override".
/// Written by `set_default_cache_path`, read by `default_cache_path`.
static DEFAULT_CACHE_PATH_OVERRIDE: std::sync::LazyLock<RwLock<Option<PathBuf>>> =
std::sync::LazyLock::new(|| RwLock::new(None));
/// A problem found with one schema artifact while loading the cache.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaCacheDiagnostic {
/// Key of the lock-file artifact the problem relates to.
pub key: String,
/// Human-readable description of what went wrong.
pub message: String,
}
/// In-memory collection of cached data-source schemas.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataSourceSchemaCache {
/// Cache format version; constructors in this file set it to `SCHEMA_CACHE_VERSION`.
pub version: u32,
/// Cached schemas; `upsert_source` and the loaders key each entry by its `uri`.
pub sources: HashMap<String, SourceSchema>,
}
/// Schema of a single data source: its URI and the tables it exposes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceSchema {
/// URI identifying the data source (the tests use e.g. `duckdb://analytics.db`).
pub uri: String,
/// Table schemas, looked up by name through `get_entity`.
pub tables: HashMap<String, EntitySchema>,
/// When the schema was cached, stored as an opaque string. It is not part of
/// the fingerprint computed by `hash_source_schema`.
pub cached_at: String,
}
/// Schema of one table (entity) within a data source.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EntitySchema {
/// Table name; also used to build the `DbRow_<name>` type schema name.
pub name: String,
/// Column definitions, in the order they were supplied.
pub columns: Vec<FieldSchema>,
}
/// Schema of one column of a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldSchema {
/// Column name.
pub name: String,
/// Shape type name, serialized by serde under the key `type`.
/// `type_schema_from_entity` maps `int`, `number`, `string`, `bool` and
/// `timestamp` to typed fields and treats every other value as "any".
#[serde(rename = "type")]
pub shape_type: String,
/// Whether the column may hold null values.
pub nullable: bool,
}
/// Placeholder conversion of a [`SourceSchema`] into a [`KindedSlot`].
///
/// The argument is currently ignored: the result is always `KindedSlot::none()`.
pub fn source_schema_to_nb(_schema: &SourceSchema) -> KindedSlot {
KindedSlot::none()
}
/// Placeholder conversion of a [`KindedSlot`] back into a [`SourceSchema`].
///
/// # Errors
///
/// Always returns `Err`: the decode is not implemented yet and the argument is
/// ignored. The message names the pending work (Phase 2c, ADR-006 §2.7.4).
pub fn source_schema_from_nb(_value: &KindedSlot) -> Result<SourceSchema, String> {
Err("source_schema_from_nb: pending Phase 2c kind-threaded TypedObject decode — see ADR-006 §2.7.4".to_string())
}
/// Decodes a [`SourceSchema`] from a wire object.
///
/// Expected layout:
///
/// * `uri` — required string.
/// * `cached_at` — optional string; defaults to the empty string.
/// * `tables` — required object mapping table name to a table object.
///   * `name` — optional string; defaults to the table's key.
///   * `columns` — required array of column objects.
///     * `name` — required string.
///     * `type` (or `shape_type` when `type` is absent) — required string.
///     * `nullable` — optional bool; anything else counts as `false`.
///
/// # Errors
///
/// Returns a message describing the first structural problem encountered.
pub fn source_schema_from_wire(value: &WireValue) -> Result<SourceSchema, String> {
    // Owned copy of a wire string; `None` for a missing slot or any other variant.
    fn owned_text(slot: Option<&WireValue>) -> Option<String> {
        if let Some(WireValue::String(text)) = slot {
            Some(text.clone())
        } else {
            None
        }
    }

    let WireValue::Object(root) = value else {
        return Err("schema payload must be an object".to_string());
    };
    let Some(uri) = owned_text(root.get("uri")) else {
        return Err("schema payload missing string field 'uri'".to_string());
    };
    let cached_at = owned_text(root.get("cached_at")).unwrap_or_default();
    let table_map = match root.get("tables") {
        Some(WireValue::Object(entries)) => entries,
        Some(_) => return Err("schema payload field 'tables' must be an object".to_string()),
        None => return Err("schema payload missing object field 'tables'".to_string()),
    };

    let mut tables = HashMap::new();
    for (table_name, raw_entity) in table_map {
        let WireValue::Object(entity_fields) = raw_entity else {
            return Err(format!("table '{table_name}' schema must be an object"));
        };
        // An explicit `name` wins; otherwise the map key doubles as the name.
        let entity_name =
            owned_text(entity_fields.get("name")).unwrap_or_else(|| table_name.clone());
        let column_entries = match entity_fields.get("columns") {
            Some(WireValue::Array(items)) => items,
            Some(_) => {
                return Err(format!(
                    "table '{table_name}' field 'columns' must be an array"
                ));
            }
            None => return Err(format!("table '{table_name}' missing 'columns' array")),
        };

        let mut columns = Vec::new();
        for raw_column in column_entries {
            let WireValue::Object(column_fields) = raw_column else {
                return Err(format!(
                    "table '{table_name}' contains non-object column entry"
                ));
            };
            let name = owned_text(column_fields.get("name"))
                .ok_or_else(|| format!("table '{table_name}' column missing string 'name'"))?;
            // `shape_type` is only consulted when `type` is absent altogether.
            let type_slot = column_fields
                .get("type")
                .or_else(|| column_fields.get("shape_type"));
            let shape_type = owned_text(type_slot)
                .ok_or_else(|| format!("table '{table_name}' column '{name}' missing type"))?;
            let nullable = matches!(column_fields.get("nullable"), Some(WireValue::Bool(true)));
            columns.push(FieldSchema {
                name,
                shape_type,
                nullable,
            });
        }

        let entity = EntitySchema {
            name: entity_name,
            columns,
        };
        tables.insert(table_name.clone(), entity);
    }

    Ok(SourceSchema {
        uri,
        tables,
        cached_at,
    })
}
impl DataSourceSchemaCache {
pub fn new() -> Self {
Self {
version: SCHEMA_CACHE_VERSION,
sources: HashMap::new(),
}
}
pub fn save(&self, path: &Path) -> std::io::Result<()> {
let mut lock = PackageLock::read(path).unwrap_or_default();
lock.artifacts
.retain(|artifact| artifact.namespace != SCHEMA_CACHE_NAMESPACE);
let mut uris: Vec<_> = self.sources.keys().cloned().collect();
uris.sort();
for uri in uris {
let Some(source) = self.sources.get(&uri) else {
continue;
};
let schema_hash = hash_source_schema(source);
let payload = source_to_payload(source);
let mut inputs = BTreeMap::new();
inputs.insert("uri".to_string(), uri.clone());
inputs.insert("schema_hash".to_string(), schema_hash.clone());
let determinism = ArtifactDeterminism::External {
fingerprints: BTreeMap::from([(format!("schema:{uri}"), schema_hash)]),
};
let artifact = LockedArtifact::new(
SCHEMA_CACHE_NAMESPACE,
uri,
SCHEMA_CACHE_PRODUCER,
determinism,
inputs,
payload,
)
.map_err(std::io::Error::other)?;
lock.upsert_artifact(artifact)
.map_err(std::io::Error::other)?;
}
lock.write(path)
}
pub fn load(path: &Path) -> std::io::Result<Self> {
let (cache, diagnostics) = Self::load_with_diagnostics(path)?;
if let Some(diag) = diagnostics.first() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid schema artifact '{}': {}", diag.key, diag.message),
));
}
Ok(cache)
}
pub fn load_with_diagnostics(
path: &Path,
) -> std::io::Result<(Self, Vec<SchemaCacheDiagnostic>)> {
let lock = PackageLock::read(path).ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotFound, "shape.lock not found")
})?;
let mut sources = HashMap::new();
let mut diagnostics = Vec::new();
for artifact in lock
.artifacts
.iter()
.filter(|artifact| artifact.namespace == SCHEMA_CACHE_NAMESPACE)
{
let payload = match artifact.payload() {
Ok(payload) => payload,
Err(err) => {
diagnostics.push(SchemaCacheDiagnostic {
key: artifact.key.clone(),
message: format!("payload decode failed: {err}"),
});
continue;
}
};
let source = match payload_to_source(&artifact.key, &payload) {
Ok(source) => source,
Err(err) => {
diagnostics.push(SchemaCacheDiagnostic {
key: artifact.key.clone(),
message: format!("payload parse failed: {err}"),
});
continue;
}
};
if let Some(expected_hash) = source_fingerprint_hash(artifact, &source.uri) {
let actual_hash = hash_source_schema(&source);
if expected_hash != actual_hash {
diagnostics.push(SchemaCacheDiagnostic {
key: artifact.key.clone(),
message: format!(
"stale schema fingerprint (expected {expected_hash}, computed {actual_hash})"
),
});
continue;
}
} else {
diagnostics.push(SchemaCacheDiagnostic {
key: artifact.key.clone(),
message: "missing schema fingerprint".to_string(),
});
continue;
}
sources.insert(source.uri.clone(), source);
}
Ok((
Self {
version: SCHEMA_CACHE_VERSION,
sources,
},
diagnostics,
))
}
pub fn load_or_empty(path: &Path) -> Self {
match Self::load(path) {
Ok(cache) => cache,
Err(_) => Self::new(),
}
}
pub fn get_source(&self, uri: &str) -> Option<&SourceSchema> {
self.sources.get(uri)
}
pub fn upsert_source(&mut self, schema: SourceSchema) {
self.sources.insert(schema.uri.clone(), schema);
}
pub fn is_offline() -> bool {
std::env::var("SHAPE_OFFLINE")
.map(|value| value == "true" || value == "1")
.unwrap_or(false)
}
}
/// Loads the type schemas for every cached table whose source URI starts with
/// one of `uri_prefixes`, discarding any load diagnostics.
///
/// An empty `uri_prefixes` slice selects every source.
///
/// # Errors
///
/// Propagates the error of the diagnostics-returning variant.
pub fn load_cached_type_schemas_for_uri_prefixes(
    cache_path: &Path,
    uri_prefixes: &[&str],
) -> std::io::Result<Vec<TypeSchema>> {
    load_cached_type_schemas_for_uri_prefixes_with_diagnostics(cache_path, uri_prefixes)
        .map(|(schemas, _ignored_diagnostics)| schemas)
}
pub fn load_cached_type_schemas_for_uri_prefixes_with_diagnostics(
cache_path: &Path,
uri_prefixes: &[&str],
) -> std::io::Result<(Vec<TypeSchema>, Vec<SchemaCacheDiagnostic>)> {
let (cache, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(cache_path)?;
let mut schemas = Vec::new();
for source in cache.sources.values() {
if !uri_prefixes.is_empty()
&& !uri_prefixes
.iter()
.any(|prefix| source.uri.starts_with(prefix))
{
continue;
}
for table in source.tables.values() {
schemas.push(type_schema_from_entity(table));
}
}
Ok((schemas, diagnostics))
}
/// Returns the path of the default cache file.
///
/// A path installed through `set_default_cache_path` takes precedence.
/// Otherwise the result is `CACHE_FILENAME` joined onto the current directory,
/// or onto an empty path when the current directory cannot be determined.
/// A poisoned override lock is treated as "no override".
pub fn default_cache_path() -> PathBuf {
    let configured = match DEFAULT_CACHE_PATH_OVERRIDE.read() {
        Ok(slot) => (*slot).clone(),
        Err(_) => None,
    };
    configured.unwrap_or_else(|| {
        std::env::current_dir()
            .unwrap_or_default()
            .join(CACHE_FILENAME)
    })
}
/// Installs (`Some`) or clears (`None`) the process-wide default cache path
/// override.
///
/// Does nothing when the override lock is poisoned.
pub fn set_default_cache_path(path: Option<PathBuf>) {
    let Ok(mut slot) = DEFAULT_CACHE_PATH_OVERRIDE.write() else {
        return;
    };
    *slot = path;
}
/// Strictly loads the cached schema stored for `uri`.
///
/// # Errors
///
/// Propagates errors from the diagnostics-returning variant (unreadable lock,
/// no schema for `uri`). Also returns `InvalidData` describing the first
/// diagnostic when loading the cache produced any diagnostic at all.
pub fn load_cached_source_for_uri(cache_path: &Path, uri: &str) -> std::io::Result<SourceSchema> {
    let (source, diagnostics) = load_cached_source_for_uri_with_diagnostics(cache_path, uri)?;
    match diagnostics.first() {
        None => Ok(source),
        Some(first) => Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("invalid schema artifact '{}': {}", first.key, first.message),
        )),
    }
}
/// Loads the cached schema stored for `uri`, together with every diagnostic
/// produced while loading the cache file.
///
/// # Errors
///
/// Propagates the error from `DataSourceSchemaCache::load_with_diagnostics`,
/// and returns `NotFound` when the loaded cache has no entry for `uri`.
pub fn load_cached_source_for_uri_with_diagnostics(
    cache_path: &Path,
    uri: &str,
) -> std::io::Result<(SourceSchema, Vec<SchemaCacheDiagnostic>)> {
    let (cache, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(cache_path)?;
    match cache.get_source(uri) {
        Some(found) => Ok((found.clone(), diagnostics)),
        None => Err(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            format!("no cached schema for '{uri}'"),
        )),
    }
}
/// The default cache is the empty cache produced by `DataSourceSchemaCache::new`.
impl Default for DataSourceSchemaCache {
fn default() -> Self {
Self::new()
}
}
impl SourceSchema {
    /// Looks up the table stored under the key `name`.
    pub fn get_entity(&self, name: &str) -> Option<&EntitySchema> {
        let tables = &self.tables;
        tables.get(name)
    }

    /// Returns the keys of all tables, in the map's (unspecified) iteration order.
    pub fn entity_names(&self) -> Vec<&str> {
        let mut names = Vec::with_capacity(self.tables.len());
        for key in self.tables.keys() {
            names.push(key.as_str());
        }
        names
    }
}
impl EntitySchema {
    /// Returns the first column named `name`, if any.
    pub fn get_field(&self, name: &str) -> Option<&FieldSchema> {
        for column in &self.columns {
            if column.name == name {
                return Some(column);
            }
        }
        None
    }

    /// Returns the column names in declaration order.
    pub fn field_names(&self) -> Vec<&str> {
        let mut names = Vec::with_capacity(self.columns.len());
        for column in &self.columns {
            names.push(column.name.as_str());
        }
        names
    }
}
fn hash_source_schema(source: &SourceSchema) -> String {
let mut hasher = Sha256::new();
hasher.update(source.uri.as_bytes());
hasher.update([0]);
let mut entity_names: Vec<_> = source.tables.keys().cloned().collect();
entity_names.sort();
for entity_name in entity_names {
hasher.update(entity_name.as_bytes());
hasher.update([0]);
if let Some(entity) = source.tables.get(&entity_name) {
for field in &entity.columns {
hasher.update(field.name.as_bytes());
hasher.update([0]);
hasher.update(field.shape_type.as_bytes());
hasher.update([0]);
hasher.update([if field.nullable { 1 } else { 0 }]);
}
}
}
format!("sha256:{:x}", hasher.finalize())
}
/// Returns the fingerprint recorded on `artifact` for the source `uri`.
///
/// The `schema:<uri>` entry of an `External` determinism record is preferred.
/// If that is absent, the `schema_hash` input is used. `None` means the
/// artifact carries neither.
fn source_fingerprint_hash<'a>(artifact: &'a LockedArtifact, uri: &str) -> Option<&'a str> {
    let recorded = match &artifact.determinism {
        ArtifactDeterminism::External { fingerprints } => {
            let lookup_key = format!("schema:{uri}");
            fingerprints.get(&lookup_key)
        }
        _ => None,
    };
    recorded
        .or_else(|| artifact.inputs.get("schema_hash"))
        .map(String::as_str)
}
/// Builds the `DbRow_<table>` type schema for a table.
///
/// Columns are added in declaration order. `int`, `number`, `string`, `bool`
/// and `timestamp` map to the matching typed field; any other type name
/// becomes an "any" field. Nullability is not consulted here.
fn type_schema_from_entity(entity: &EntitySchema) -> TypeSchema {
    let schema_name = format!("DbRow_{}", entity.name);
    let mut builder = TypeSchemaBuilder::new(&schema_name);
    for field in &entity.columns {
        let column = &field.name;
        builder = match field.shape_type.as_str() {
            "int" => builder.i64_field(column),
            "number" => builder.f64_field(column),
            "string" => builder.string_field(column),
            "bool" => builder.bool_field(column),
            "timestamp" => builder.timestamp_field(column),
            _ => builder.any_field(column),
        };
    }
    builder.build()
}
fn source_to_payload(source: &SourceSchema) -> shape_wire::WireValue {
let mut entities: Vec<_> = source.tables.values().collect();
entities.sort_by(|left, right| left.name.cmp(&right.name));
let entity_values = entities
.into_iter()
.map(|entity| {
let field_values = entity
.columns
.iter()
.map(|field| {
shape_wire::WireValue::Object(BTreeMap::from([
(
"name".to_string(),
shape_wire::WireValue::String(field.name.clone()),
),
(
"shape_type".to_string(),
shape_wire::WireValue::String(field.shape_type.clone()),
),
(
"nullable".to_string(),
shape_wire::WireValue::Bool(field.nullable),
),
]))
})
.collect::<Vec<_>>();
shape_wire::WireValue::Object(BTreeMap::from([
(
"name".to_string(),
shape_wire::WireValue::String(entity.name.clone()),
),
(
"columns".to_string(),
shape_wire::WireValue::Array(field_values),
),
]))
})
.collect::<Vec<_>>();
shape_wire::WireValue::Object(BTreeMap::from([
(
"uri".to_string(),
shape_wire::WireValue::String(source.uri.clone()),
),
(
"cached_at".to_string(),
shape_wire::WireValue::String(source.cached_at.clone()),
),
(
"tables".to_string(),
shape_wire::WireValue::Array(entity_values),
),
]))
}
fn payload_to_source(
key_hint: &str,
payload: &shape_wire::WireValue,
) -> Result<SourceSchema, String> {
let shape_wire::WireValue::Object(map) = payload else {
return Err("source payload must be an object".to_string());
};
let uri = map
.get("uri")
.and_then(shape_wire::WireValue::as_str)
.map(|value| value.to_string())
.filter(|value| !value.is_empty())
.unwrap_or_else(|| key_hint.to_string());
let cached_at = map
.get("cached_at")
.and_then(shape_wire::WireValue::as_str)
.unwrap_or("")
.to_string();
let tables_value = map
.get("tables")
.ok_or_else(|| "source payload missing 'tables'".to_string())?;
let shape_wire::WireValue::Array(table_values) = tables_value else {
return Err("source payload 'tables' must be an array".to_string());
};
let mut tables = HashMap::new();
for table_value in table_values {
let entity = payload_to_entity(table_value)?;
tables.insert(entity.name.clone(), entity);
}
Ok(SourceSchema {
uri,
tables,
cached_at,
})
}
fn payload_to_entity(value: &shape_wire::WireValue) -> Result<EntitySchema, String> {
let shape_wire::WireValue::Object(table_map) = value else {
return Err("table payload must be an object".to_string());
};
let name = table_map
.get("name")
.and_then(shape_wire::WireValue::as_str)
.ok_or_else(|| "table payload missing 'name'".to_string())?
.to_string();
let columns_value = table_map
.get("columns")
.ok_or_else(|| "table payload missing 'columns'".to_string())?;
let shape_wire::WireValue::Array(column_values) = columns_value else {
return Err("table payload 'columns' must be an array".to_string());
};
let mut columns = Vec::with_capacity(column_values.len());
for column_value in column_values {
columns.push(payload_to_field(column_value)?);
}
Ok(EntitySchema { name, columns })
}
fn payload_to_field(value: &shape_wire::WireValue) -> Result<FieldSchema, String> {
let shape_wire::WireValue::Object(column_map) = value else {
return Err("column payload must be an object".to_string());
};
let name = column_map
.get("name")
.and_then(shape_wire::WireValue::as_str)
.ok_or_else(|| "column payload missing 'name'".to_string())?
.to_string();
let shape_type = column_map
.get("shape_type")
.and_then(shape_wire::WireValue::as_str)
.ok_or_else(|| "column payload missing 'shape_type'".to_string())?
.to_string();
let nullable = column_map
.get("nullable")
.and_then(shape_wire::WireValue::as_bool)
.ok_or_else(|| "column payload missing 'nullable'".to_string())?;
Ok(FieldSchema {
name,
shape_type,
nullable,
})
}
// Unit tests for the schema cache: save/load round trip, lookup helpers,
// diagnostics for rejected artifacts, prefix filtering and the default path.
#[cfg(test)]
mod tests {
use super::*;
use crate::package_lock::ArtifactDeterminism;
// Builds a cache holding one source, `duckdb://analytics.db`, with a single
// `users` table of three columns (`id`, `name`, nullable `age`).
fn sample_cache() -> DataSourceSchemaCache {
let mut cache = DataSourceSchemaCache::new();
let mut tables = HashMap::new();
tables.insert(
"users".to_string(),
EntitySchema {
name: "users".to_string(),
columns: vec![
FieldSchema {
name: "id".to_string(),
shape_type: "int".to_string(),
nullable: false,
},
FieldSchema {
name: "name".to_string(),
shape_type: "string".to_string(),
nullable: false,
},
FieldSchema {
name: "age".to_string(),
shape_type: "int".to_string(),
nullable: true,
},
],
},
);
cache.upsert_source(SourceSchema {
uri: "duckdb://analytics.db".to_string(),
tables,
cached_at: "2026-02-12T10:00:00Z".to_string(),
});
cache
}
// Saving and strictly loading the sample cache preserves the source, its
// table and the column order, types and nullability.
#[test]
fn test_roundtrip_serialization() {
let cache = sample_cache();
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join(CACHE_FILENAME);
cache.save(&path).unwrap();
let loaded = DataSourceSchemaCache::load(&path).unwrap();
assert_eq!(loaded.version, SCHEMA_CACHE_VERSION);
assert_eq!(loaded.sources.len(), 1);
let conn = loaded.get_source("duckdb://analytics.db").unwrap();
assert_eq!(conn.tables.len(), 1);
let users = conn.get_entity("users").unwrap();
assert_eq!(users.columns.len(), 3);
assert_eq!(users.columns[0].name, "id");
assert_eq!(users.columns[0].shape_type, "int");
assert!(!users.columns[0].nullable);
assert_eq!(users.columns[2].name, "age");
assert!(users.columns[2].nullable);
}
// `load_or_empty` yields an empty cache for a path expected not to exist.
#[test]
fn test_load_or_empty_missing_file() {
let cache = DataSourceSchemaCache::load_or_empty(Path::new("/nonexistent/path.toml"));
assert_eq!(cache.version, SCHEMA_CACHE_VERSION);
assert!(cache.sources.is_empty());
}
// Exercises the lookup helpers on `SourceSchema` and `EntitySchema`.
#[test]
fn test_source_helpers() {
let cache = sample_cache();
let conn = cache.get_source("duckdb://analytics.db").unwrap();
let names = conn.entity_names();
assert!(names.contains(&"users"));
let users = conn.get_entity("users").unwrap();
assert_eq!(users.field_names(), vec!["id", "name", "age"]);
assert!(users.get_field("id").is_some());
assert!(users.get_field("nonexistent").is_none());
}
// Upserting twice under the same URI replaces the entry rather than adding one.
#[test]
fn test_upsert_source() {
let mut cache = DataSourceSchemaCache::new();
assert!(cache.sources.is_empty());
cache.upsert_source(SourceSchema {
uri: "duckdb://test.db".to_string(),
tables: HashMap::new(),
cached_at: "2026-01-01T00:00:00Z".to_string(),
});
assert_eq!(cache.sources.len(), 1);
cache.upsert_source(SourceSchema {
uri: "duckdb://test.db".to_string(),
tables: HashMap::new(),
cached_at: "2026-02-01T00:00:00Z".to_string(),
});
assert_eq!(cache.sources.len(), 1);
assert_eq!(
cache.get_source("duckdb://test.db").unwrap().cached_at,
"2026-02-01T00:00:00Z"
);
}
// An artifact whose recorded fingerprint does not match its payload is
// skipped and reported as stale.
#[test]
fn test_load_with_diagnostics_reports_stale_fingerprint() {
let cache = sample_cache();
let source = cache.get_source("duckdb://analytics.db").unwrap();
let payload = source_to_payload(source);
let artifact = LockedArtifact::new(
SCHEMA_CACHE_NAMESPACE,
source.uri.clone(),
SCHEMA_CACHE_PRODUCER,
ArtifactDeterminism::External {
fingerprints: BTreeMap::from([(
format!("schema:{}", source.uri),
"sha256:deadbeef".to_string(),
)]),
},
BTreeMap::from([
("uri".to_string(), source.uri.clone()),
("schema_hash".to_string(), "sha256:deadbeef".to_string()),
]),
payload,
)
.unwrap();
let mut lock = PackageLock::new();
lock.upsert_artifact(artifact).unwrap();
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join(CACHE_FILENAME);
lock.write(&path).unwrap();
let (loaded, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(&path).unwrap();
assert!(loaded.sources.is_empty());
assert_eq!(diagnostics.len(), 1);
assert!(diagnostics[0].message.contains("stale schema fingerprint"));
}
// An artifact whose payload is a bare string (not an object) is skipped and
// reported as a parse failure.
#[test]
fn test_load_with_diagnostics_reports_invalid_payload() {
let artifact = LockedArtifact::new(
SCHEMA_CACHE_NAMESPACE,
"broken://source",
SCHEMA_CACHE_PRODUCER,
ArtifactDeterminism::External {
fingerprints: BTreeMap::from([(
"schema:broken://source".to_string(),
"sha256:abc".to_string(),
)]),
},
BTreeMap::from([
("uri".to_string(), "broken://source".to_string()),
("schema_hash".to_string(), "sha256:abc".to_string()),
]),
shape_wire::WireValue::String("bad".to_string()),
)
.unwrap();
let mut lock = PackageLock::new();
lock.upsert_artifact(artifact).unwrap();
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join(CACHE_FILENAME);
lock.write(&path).unwrap();
let (loaded, diagnostics) = DataSourceSchemaCache::load_with_diagnostics(&path).unwrap();
assert!(loaded.sources.is_empty());
assert_eq!(diagnostics.len(), 1);
assert!(diagnostics[0].message.contains("payload parse failed"));
}
// With a DuckDB and a Postgres source cached, each prefix list selects only
// the tables of the matching source.
#[test]
fn test_load_cached_type_schemas_for_uri_prefixes_filters_sources() {
let mut cache = DataSourceSchemaCache::new();
let mut duck_tables = HashMap::new();
duck_tables.insert(
"users".to_string(),
EntitySchema {
name: "users".to_string(),
columns: vec![FieldSchema {
name: "id".to_string(),
shape_type: "int".to_string(),
nullable: false,
}],
},
);
cache.upsert_source(SourceSchema {
uri: "duckdb://analytics.db".to_string(),
tables: duck_tables,
cached_at: "2026-02-17T00:00:00Z".to_string(),
});
let mut pg_tables = HashMap::new();
pg_tables.insert(
"orders".to_string(),
EntitySchema {
name: "orders".to_string(),
columns: vec![FieldSchema {
name: "id".to_string(),
shape_type: "int".to_string(),
nullable: false,
}],
},
);
cache.upsert_source(SourceSchema {
uri: "postgres://localhost/app".to_string(),
tables: pg_tables,
cached_at: "2026-02-17T00:00:00Z".to_string(),
});
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join(CACHE_FILENAME);
cache.save(&path).unwrap();
let duck_schemas =
load_cached_type_schemas_for_uri_prefixes(&path, &["duckdb://"]).unwrap();
assert_eq!(duck_schemas.len(), 1);
assert_eq!(duck_schemas[0].name, "DbRow_users");
let pg_schemas =
load_cached_type_schemas_for_uri_prefixes(&path, &["postgres://", "postgresql://"])
.unwrap();
assert_eq!(pg_schemas.len(), 1);
assert_eq!(pg_schemas[0].name, "DbRow_orders");
}
// Loading a single source from a freshly saved cache succeeds without diagnostics.
#[test]
fn test_load_cached_source_for_uri_with_diagnostics() {
let cache = sample_cache();
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join(CACHE_FILENAME);
cache.save(&path).unwrap();
let (source, diagnostics) =
load_cached_source_for_uri_with_diagnostics(&path, "duckdb://analytics.db").unwrap();
assert_eq!(source.uri, "duckdb://analytics.db");
assert!(diagnostics.is_empty());
}
// The default cache path ends with the lock file name.
#[test]
fn test_default_cache_path_ends_with_shape_lock() {
let path = default_cache_path();
assert!(path.ends_with(CACHE_FILENAME));
}
}