#![allow(clippy::unwrap_used)] #![allow(clippy::iter_on_single_items)]
use async_trait::async_trait;
use serde_json::json;
use super::*;
use crate::{
cache::{CacheConfig, FactTableVersionStrategy},
db::WhereOperator,
schema::CompiledSchema,
};
/// Test double that stands in for a real database adapter.
///
/// It carries two atomic counters, `call_count` and `raw_call_count`, so tests
/// can observe how many calls reached it.
struct MockAdapter {
    call_count: std::sync::atomic::AtomicU32,
    raw_call_count: std::sync::atomic::AtomicU32,
}

impl MockAdapter {
    /// Builds a mock with both counters starting at zero.
    fn new() -> Self {
        use std::sync::atomic::AtomicU32;

        let call_count = AtomicU32::new(0);
        let raw_call_count = AtomicU32::new(0);
        Self {
            call_count,
            raw_call_count,
        }
    }

    /// Returns the sum of `call_count` and `raw_call_count`.
    fn call_count(&self) -> u32 {
        use std::sync::atomic::Ordering::SeqCst;

        let counted = self.call_count.load(SeqCst);
        let raw = self.raw_call_count.load(SeqCst);
        counted + raw
    }
}
// Canned `DatabaseAdapter` implementation: every method ignores its arguments
// and returns fixed data. The row-returning and aggregate methods bump one of
// the mock's counters; `execute_function_call` and the metadata methods do not.
#[async_trait]
impl DatabaseAdapter for MockAdapter {
    /// Increments `call_count` and returns the two fixed rows (Alice, Bob).
    async fn execute_with_projection(
        &self,
        _view: &str,
        _projection: Option<&crate::schema::SqlProjectionHint>,
        _where_clause: Option<&WhereClause>,
        _limit: Option<u32>,
        _offset: Option<u32>,
        _order_by: Option<&[OrderByClause]>,
    ) -> Result<Vec<JsonbValue>> {
        self.call_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let alice = JsonbValue::new(json!({"id": 1, "name": "Alice"}));
        let bob = JsonbValue::new(json!({"id": 2, "name": "Bob"}));
        Ok(vec![alice, bob])
    }

    /// Increments `call_count` and returns the two fixed rows (Alice, Bob).
    async fn execute_where_query(
        &self,
        _view: &str,
        _where_clause: Option<&WhereClause>,
        _limit: Option<u32>,
        _offset: Option<u32>,
        _order_by: Option<&[OrderByClause]>,
    ) -> Result<Vec<JsonbValue>> {
        self.call_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let alice = JsonbValue::new(json!({"id": 1, "name": "Alice"}));
        let bob = JsonbValue::new(json!({"id": 2, "name": "Bob"}));
        Ok(vec![alice, bob])
    }

    /// Always reports PostgreSQL.
    fn database_type(&self) -> DatabaseType {
        DatabaseType::PostgreSQL
    }

    /// Always succeeds.
    async fn health_check(&self) -> Result<()> {
        Ok(())
    }

    /// Returns fixed pool numbers that the forwarding test can compare against.
    fn pool_metrics(&self) -> PoolMetrics {
        PoolMetrics {
            total_connections: 10,
            idle_connections: 5,
            active_connections: 3,
            waiting_requests: 0,
        }
    }

    /// Increments `raw_call_count` and returns a single row `{"count": 42}`.
    async fn execute_raw_query(
        &self,
        _sql: &str,
    ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
        self.raw_call_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let count_row = std::collections::HashMap::from([("count".to_string(), json!(42))]);
        Ok(vec![count_row])
    }

    /// Increments `raw_call_count` and returns a single row `{"count": 42}`.
    async fn execute_parameterized_aggregate(
        &self,
        _sql: &str,
        _params: &[serde_json::Value],
    ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
        self.raw_call_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let count_row = std::collections::HashMap::from([("count".to_string(), json!(42))]);
        Ok(vec![count_row])
    }

    /// Returns no rows and touches no counter.
    async fn execute_function_call(
        &self,
        _function_name: &str,
        _args: &[serde_json::Value],
    ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
        Ok(Vec::new())
    }
}
// Empty impl: `MockAdapter` supplies no methods of its own here and relies on
// whatever `SupportsMutations` (declared outside this file) provides by default.
impl SupportsMutations for MockAdapter {}
/// Two identical queries in a row must reach the wrapped adapter only once.
#[tokio::test]
async fn test_cache_miss_then_hit() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );
    let active_filter = WhereClause::Field {
        path: vec!["active".to_string()],
        operator: WhereOperator::Eq,
        value: json!(true),
    };

    // Both rounds return the same two rows; the mock's counter stays at 1
    // because the second round is answered from the cache.
    for failure_msg in [
        "mock adapter must not fail on first query",
        "mock adapter must not fail on second query",
    ] {
        let rows = adapter
            .execute_where_query("v_user", Some(&active_filter), None, None, None)
            .await
            .expect(failure_msg);
        assert_eq!(rows.len(), 2);
        assert_eq!(adapter.inner().call_count(), 1);
    }
}
/// Queries that differ only in the WHERE value must not share a cache entry.
#[tokio::test]
async fn test_different_where_clauses_produce_different_cache_entries() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );
    let id_is_one = WhereClause::Field {
        path: vec!["id".to_string()],
        operator: WhereOperator::Eq,
        value: json!(1),
    };
    let id_is_two = WhereClause::Field {
        path: vec!["id".to_string()],
        operator: WhereOperator::Eq,
        value: json!(2),
    };

    // Each distinct filter is a miss, so the counter grows by one per query.
    for (filter, expected_calls) in [(&id_is_one, 1), (&id_is_two, 2)] {
        adapter
            .execute_where_query("v_user", Some(filter), None, None, None)
            .await
            .expect("mock adapter must not fail");
        assert_eq!(adapter.inner().call_count(), expected_calls);
    }
}
/// Invalidating a view evicts its cached result, so the next query is a miss.
#[tokio::test]
async fn test_invalidation_clears_cache() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );
    let status_filter = WhereClause::Field {
        path: vec!["status".to_string()],
        operator: WhereOperator::Eq,
        value: json!("active"),
    };
    let filter = Some(&status_filter);

    // Miss, then hit: the counter is 1 after both rounds.
    for _ in 0..2 {
        adapter
            .execute_where_query("v_user", filter, None, None, None)
            .await
            .unwrap();
        assert_eq!(adapter.inner().call_count(), 1);
    }

    let target_views = ["v_user".to_string()];
    let evicted = adapter
        .invalidate_views(&target_views)
        .expect("invalidate_views must succeed");
    assert_eq!(evicted, 1);

    // After eviction the same query has to go back to the mock.
    adapter
        .execute_where_query("v_user", filter, None, None, None)
        .await
        .unwrap();
    assert_eq!(adapter.inner().call_count(), 2);
}
/// Queries that differ only in `limit` must not share a cache entry.
#[tokio::test]
async fn test_different_limits_produce_different_cache_entries() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );

    let small_limit = Some(10);
    adapter
        .execute_where_query("v_user", None, small_limit, None, None)
        .await
        .expect("mock adapter must not fail");
    assert_eq!(adapter.inner().call_count(), 1);

    // A different limit is a separate entry, hence a second call to the mock.
    let large_limit = Some(20);
    adapter
        .execute_where_query("v_user", None, large_limit, None, None)
        .await
        .unwrap();
    assert_eq!(adapter.inner().call_count(), 2);
}
/// With a disabled cache configuration every query reaches the wrapped adapter.
#[tokio::test]
async fn test_cache_disabled() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::disabled()),
        "1.0.0".to_string(),
    );
    let status_filter = WhereClause::Field {
        path: vec!["status".to_string()],
        operator: WhereOperator::Eq,
        value: json!("active"),
    };

    // The same query twice: the counter must read 1, then 2.
    for expected_calls in 1..=2 {
        adapter
            .execute_where_query("v_user", Some(&status_filter), None, None, None)
            .await
            .unwrap();
        assert_eq!(adapter.inner().call_count(), expected_calls);
    }
}
#[tokio::test]
async fn test_all_queries_are_cached() {
let mock = MockAdapter::new();
let cache = QueryResultCache::new(CacheConfig::enabled());
let adapter = CachedDatabaseAdapter::new(mock, cache, "1.0.0".to_string());
adapter.execute_where_query("v_user", None, None, None, None).await.unwrap();
assert_eq!(adapter.inner().call_count(), 1);
adapter.execute_where_query("v_user", None, None, None, None).await.unwrap();
assert_eq!(adapter.inner().call_count(), 1);
adapter
.execute_where_query("v_user", None, Some(1000), None, None)
.await
.unwrap();
assert_eq!(adapter.inner().call_count(), 2);
adapter
.execute_where_query("v_user", None, Some(1000), None, None)
.await
.unwrap();
assert_eq!(adapter.inner().call_count(), 2);
let where_clause = WhereClause::Field {
path: vec!["id".to_string()],
operator: WhereOperator::Eq,
value: json!(1),
};
adapter
.execute_where_query("v_user", Some(&where_clause), None, None, None)
.await
.unwrap();
assert_eq!(adapter.inner().call_count(), 3);
adapter
.execute_where_query("v_user", Some(&where_clause), None, None, None)
.await
.unwrap();
assert_eq!(adapter.inner().call_count(), 3); }
/// Two adapters sharing one cache but carrying different schema versions must
/// not see each other's entries.
#[tokio::test]
async fn test_schema_version_change_invalidates_cache() {
    let shared_cache = Arc::new(QueryResultCache::new(CacheConfig::enabled()));
    let shared_versions = Arc::new(FactTableVersionProvider::default());

    // Builds an adapter over a fresh mock; only the schema version varies.
    let build_adapter = |schema_version: &str| CachedDatabaseAdapter {
        adapter: MockAdapter::new(),
        cache: Arc::clone(&shared_cache),
        schema_version: schema_version.to_string(),
        view_ttl_overrides: HashMap::new(),
        cacheable_views: std::collections::HashSet::new(),
        opt_in_mode: false,
        fact_table_config: FactTableCacheConfig::default(),
        version_provider: Arc::clone(&shared_versions),
        cascade_invalidator: None,
    };

    // Populate the shared cache under version 1.0.0.
    let adapter_v1 = build_adapter("1.0.0");
    adapter_v1
        .execute_where_query("v_user", None, None, None, None)
        .await
        .unwrap();

    // The same query under 2.0.0 must still reach its own mock.
    let adapter_v2 = build_adapter("2.0.0");
    adapter_v2
        .execute_where_query("v_user", None, None, None, None)
        .await
        .unwrap();
    assert_eq!(adapter_v2.inner().call_count(), 1);
}
/// `database_type` on the caching wrapper reports the mock's PostgreSQL value.
#[tokio::test]
async fn test_forwards_database_type() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );

    let reported = adapter.database_type();
    assert_eq!(reported, DatabaseType::PostgreSQL);
}
/// `health_check` on the caching wrapper succeeds when the mock's does.
#[tokio::test]
async fn test_forwards_health_check() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );

    let outcome = adapter.health_check().await;
    outcome.unwrap();
}
/// `pool_metrics` on the caching wrapper exposes the mock's fixed numbers.
#[tokio::test]
async fn test_forwards_pool_metrics() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );

    let pool = adapter.pool_metrics();
    assert_eq!(pool.total_connections, 10);
    assert_eq!(pool.idle_connections, 5);
}
/// Checks `inner()`, `cache()` and `schema_version()` on a freshly built adapter.
#[tokio::test]
async fn test_inner_and_cache_accessors() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );

    // Nothing has been queried yet, so both the mock and the cache report zero.
    assert_eq!(adapter.inner().call_count(), 0);

    let stats = adapter.cache().metrics().unwrap();
    assert_eq!(stats.hits, 0);

    assert_eq!(adapter.schema_version(), "1.0.0");
}
use super::super::cascade_response_parser::CascadeResponseParser;
/// A cascade response naming one updated `User` evicts the cached `v_user` result.
#[tokio::test]
async fn test_invalidate_cascade_entities_with_single_entity() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );
    let status_filter = WhereClause::Field {
        path: vec!["status".to_string()],
        operator: WhereOperator::Eq,
        value: json!("active"),
    };
    let filter = Some(&status_filter);

    // Miss, then hit: the counter is 1 after both rounds.
    for _ in 0..2 {
        adapter
            .execute_where_query("v_user", filter, None, None, None)
            .await
            .unwrap();
        assert_eq!(adapter.inner().call_count(), 1);
    }

    let mutation_response = json!({
        "createPost": {
            "cascade": {
                "updated": [
                    {
                        "__typename": "User",
                        "id": "550e8400-e29b-41d4-a716-446655440000"
                    }
                ],
                "deleted": []
            }
        }
    });
    let response_parser = CascadeResponseParser::new();
    let evicted = adapter
        .invalidate_cascade_entities(&mutation_response, &response_parser)
        .unwrap();
    assert_eq!(evicted, 1);

    // The cached entry is gone, so this query reaches the mock again.
    adapter
        .execute_where_query("v_user", filter, None, None, None)
        .await
        .unwrap();
    assert_eq!(adapter.inner().call_count(), 2);
}
/// A cascade response naming `User`, `Post` and `Comment` evicts all three views.
#[tokio::test]
async fn test_invalidate_cascade_entities_with_multiple_entities() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );
    let status_filter = WhereClause::Field {
        path: vec!["status".to_string()],
        operator: WhereOperator::Eq,
        value: json!("active"),
    };
    let views = ["v_user", "v_post", "v_comment"];

    // Warm one cache entry per view.
    for view in views {
        adapter
            .execute_where_query(view, Some(&status_filter), None, None, None)
            .await
            .unwrap();
    }
    assert_eq!(adapter.inner().call_count(), 3);

    let mutation_response = json!({
        "updateUser": {
            "cascade": {
                "updated": [
                    {"__typename": "User", "id": "u-1"},
                    {"__typename": "Post", "id": "p-1"},
                    {"__typename": "Comment", "id": "c-1"}
                ],
                "deleted": []
            }
        }
    });
    let response_parser = CascadeResponseParser::new();
    let evicted = adapter
        .invalidate_cascade_entities(&mutation_response, &response_parser)
        .unwrap();
    assert_eq!(evicted, 3);

    // Every view was evicted, so each re-query is another call to the mock.
    for view in views {
        adapter
            .execute_where_query(view, Some(&status_filter), None, None, None)
            .await
            .unwrap();
    }
    assert_eq!(adapter.inner().call_count(), 6);
}
/// Entities listed under `deleted` evict their views just like updated ones.
#[tokio::test]
async fn test_invalidate_cascade_entities_with_deleted_entities() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );
    let status_filter = WhereClause::Field {
        path: vec!["status".to_string()],
        operator: WhereOperator::Eq,
        value: json!("active"),
    };
    let views = ["v_post", "v_comment"];

    // Warm one cache entry per view.
    for view in views {
        adapter
            .execute_where_query(view, Some(&status_filter), None, None, None)
            .await
            .unwrap();
    }
    assert_eq!(adapter.inner().call_count(), 2);

    let mutation_response = json!({
        "deletePost": {
            "cascade": {
                "updated": [],
                "deleted": [
                    {"__typename": "Post", "id": "p-123"},
                    {"__typename": "Comment", "id": "c-456"}
                ]
            }
        }
    });
    let response_parser = CascadeResponseParser::new();
    let evicted = adapter
        .invalidate_cascade_entities(&mutation_response, &response_parser)
        .unwrap();
    assert_eq!(evicted, 2);

    // Both entries are gone; re-querying doubles the call count.
    for view in views {
        adapter
            .execute_where_query(view, Some(&status_filter), None, None, None)
            .await
            .unwrap();
    }
    assert_eq!(adapter.inner().call_count(), 4);
}
/// A mutation response without a `cascade` field evicts nothing.
#[tokio::test]
async fn test_invalidate_cascade_entities_with_no_cascade_field() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );
    let status_filter = WhereClause::Field {
        path: vec!["status".to_string()],
        operator: WhereOperator::Eq,
        value: json!("active"),
    };
    let filter = Some(&status_filter);

    adapter
        .execute_where_query("v_user", filter, None, None, None)
        .await
        .unwrap();
    assert_eq!(adapter.inner().call_count(), 1);

    let mutation_response = json!({
        "createPost": {
            "post": {
                "id": "p-123",
                "title": "Hello"
            }
        }
    });
    let response_parser = CascadeResponseParser::new();
    let evicted = adapter
        .invalidate_cascade_entities(&mutation_response, &response_parser)
        .unwrap();
    assert_eq!(evicted, 0);

    // The entry survived, so the repeat query does not reach the mock.
    adapter
        .execute_where_query("v_user", filter, None, None, None)
        .await
        .unwrap();
    assert_eq!(adapter.inner().call_count(), 1);
}
/// One updated `User` plus one deleted `Post` evicts both corresponding views.
#[tokio::test]
async fn test_invalidate_cascade_entities_mixed_updated_and_deleted() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );
    let status_filter = WhereClause::Field {
        path: vec!["status".to_string()],
        operator: WhereOperator::Eq,
        value: json!("active"),
    };
    let views = ["v_user", "v_post"];

    // Warm one cache entry per view.
    for view in views {
        adapter
            .execute_where_query(view, Some(&status_filter), None, None, None)
            .await
            .unwrap();
    }
    assert_eq!(adapter.inner().call_count(), 2);

    let mutation_response = json!({
        "mutation": {
            "cascade": {
                "updated": [
                    {"__typename": "User", "id": "u-1"}
                ],
                "deleted": [
                    {"__typename": "Post", "id": "p-1"}
                ]
            }
        }
    });
    let response_parser = CascadeResponseParser::new();
    let evicted = adapter
        .invalidate_cascade_entities(&mutation_response, &response_parser)
        .unwrap();
    assert_eq!(evicted, 2);

    // Both entries are gone; re-querying doubles the call count.
    for view in views {
        adapter
            .execute_where_query(view, Some(&status_filter), None, None, None)
            .await
            .unwrap();
    }
    assert_eq!(adapter.inner().call_count(), 4);
}
/// Three updated `User` entities in one response count as a single eviction.
#[tokio::test]
async fn test_cascade_invalidation_deduplicates_entity_types() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );
    let status_filter = WhereClause::Field {
        path: vec!["status".to_string()],
        operator: WhereOperator::Eq,
        value: json!("active"),
    };

    adapter
        .execute_where_query("v_user", Some(&status_filter), None, None, None)
        .await
        .unwrap();
    assert_eq!(adapter.inner().call_count(), 1);

    let mutation_response = json!({
        "mutation": {
            "cascade": {
                "updated": [
                    {"__typename": "User", "id": "u-1"},
                    {"__typename": "User", "id": "u-2"},
                    {"__typename": "User", "id": "u-3"}
                ],
                "deleted": []
            }
        }
    });
    let response_parser = CascadeResponseParser::new();
    let evicted = adapter
        .invalidate_cascade_entities(&mutation_response, &response_parser)
        .unwrap();
    assert_eq!(evicted, 1);
}
/// Cascade-based and explicit view-based invalidation report the same count
/// when they target the same two views.
#[tokio::test]
async fn test_cascade_invalidation_vs_view_invalidation_same_result() {
    let status_filter = WhereClause::Field {
        path: vec!["status".to_string()],
        operator: WhereOperator::Eq,
        value: json!("active"),
    };
    let views = ["v_user", "v_post"];

    // Path 1: warm both views, then invalidate through a cascade response.
    let cascade_adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );
    for view in views {
        cascade_adapter
            .execute_where_query(view, Some(&status_filter), None, None, None)
            .await
            .unwrap();
    }
    let mutation_response = json!({
        "mutation": {
            "cascade": {
                "updated": [
                    {"__typename": "User", "id": "u-1"},
                    {"__typename": "Post", "id": "p-1"}
                ],
                "deleted": []
            }
        }
    });
    let response_parser = CascadeResponseParser::new();
    let evicted_by_cascade = cascade_adapter
        .invalidate_cascade_entities(&mutation_response, &response_parser)
        .unwrap();

    // Path 2: a separate adapter and cache, invalidated by naming the views.
    let view_adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );
    for view in views {
        view_adapter
            .execute_where_query(view, Some(&status_filter), None, None, None)
            .await
            .unwrap();
    }
    let named_views = ["v_user".to_string(), "v_post".to_string()];
    let evicted_by_view = view_adapter.invalidate_views(&named_views).unwrap();

    assert_eq!(evicted_by_cascade, 2);
    assert_eq!(evicted_by_view, 2);
}
/// A cascade with empty `updated` and `deleted` lists evicts nothing.
#[tokio::test]
async fn test_cascade_invalidation_with_empty_cascade() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );
    let status_filter = WhereClause::Field {
        path: vec!["status".to_string()],
        operator: WhereOperator::Eq,
        value: json!("active"),
    };
    let filter = Some(&status_filter);

    adapter
        .execute_where_query("v_user", filter, None, None, None)
        .await
        .unwrap();
    assert_eq!(adapter.inner().call_count(), 1);

    let mutation_response = json!({
        "mutation": {
            "cascade": {
                "updated": [],
                "deleted": []
            }
        }
    });
    let response_parser = CascadeResponseParser::new();
    let evicted = adapter
        .invalidate_cascade_entities(&mutation_response, &response_parser)
        .unwrap();
    assert_eq!(evicted, 0);

    // The entry survived, so the repeat query does not reach the mock.
    adapter
        .execute_where_query("v_user", filter, None, None, None)
        .await
        .unwrap();
    assert_eq!(adapter.inner().call_count(), 1);
}
/// Table-driven check of `extract_fact_table_from_sql`: `tf_`-prefixed tables are
/// returned lower-cased, anything else yields `None`.
#[test]
fn test_extract_fact_table_from_sql() {
    // (SQL text, expected fact table)
    let cases: [(&str, Option<&str>); 5] = [
        (
            "SELECT SUM(revenue) FROM tf_sales WHERE year = 2024",
            Some("tf_sales"),
        ),
        ("SELECT COUNT(*) FROM tf_page_views", Some("tf_page_views")),
        ("select sum(x) FROM TF_EVENTS", Some("tf_events")),
        ("SELECT * FROM users WHERE id = 1", None),
        ("SELECT 1 + 1", None),
    ];

    for (sql, expected_table) in cases {
        assert_eq!(
            CachedDatabaseAdapter::<MockAdapter>::extract_fact_table_from_sql(sql),
            expected_table.map(String::from)
        );
    }
}
/// The aggregation cache key must change when either the table version tag or
/// the schema version changes, with the SQL held constant.
#[test]
fn test_generate_aggregation_cache_key() {
    let sql = "SELECT SUM(x) FROM tf_sales";

    let baseline = CachedDatabaseAdapter::<MockAdapter>::generate_aggregation_cache_key(
        sql,
        "1.0.0",
        Some("tv:42"),
    );
    // Same schema version, different table version tag.
    let bumped_table = CachedDatabaseAdapter::<MockAdapter>::generate_aggregation_cache_key(
        sql,
        "1.0.0",
        Some("tv:43"),
    );
    // Same table version tag, different schema version.
    let bumped_schema = CachedDatabaseAdapter::<MockAdapter>::generate_aggregation_cache_key(
        sql,
        "2.0.0",
        Some("tv:42"),
    );

    assert_ne!(baseline, bumped_table);
    assert_ne!(baseline, bumped_schema);
    assert_ne!(bumped_table, bumped_schema);
}
/// With a `TimeBased` strategy on `tf_sales`, a repeated aggregation is a cache hit.
#[tokio::test]
async fn test_aggregation_caching_time_based() {
    let mut strategies = FactTableCacheConfig::default();
    strategies.set_strategy("tf_sales", FactTableVersionStrategy::TimeBased { ttl_seconds: 300 });
    let adapter = CachedDatabaseAdapter::with_fact_table_config(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
        strategies,
    );
    let sql = "SELECT SUM(revenue) FROM tf_sales";

    // Miss, then hit: the counter is 1 after both rounds.
    for _ in 0..2 {
        let _ = adapter.execute_aggregation_query(sql).await.unwrap();
        assert_eq!(adapter.inner().call_count(), 1);
    }
}
/// With a `SchemaVersion` strategy on `tf_historical_rates`, a repeated
/// aggregation is a cache hit.
#[tokio::test]
async fn test_aggregation_caching_schema_version() {
    let mut strategies = FactTableCacheConfig::default();
    strategies.set_strategy("tf_historical_rates", FactTableVersionStrategy::SchemaVersion);
    let adapter = CachedDatabaseAdapter::with_fact_table_config(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
        strategies,
    );
    let sql = "SELECT AVG(rate) FROM tf_historical_rates";

    // Miss, then hit: the counter is 1 after both rounds.
    for _ in 0..2 {
        let _ = adapter.execute_aggregation_query(sql).await.unwrap();
        assert_eq!(adapter.inner().call_count(), 1);
    }
}
/// Without any fact-table strategy configured, aggregations are never cached.
#[tokio::test]
async fn test_aggregation_caching_disabled_by_default() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::default()),
        "1.0.0".to_string(),
    );
    let sql = "SELECT SUM(revenue) FROM tf_sales";

    // Every round reaches the mock: the counter reads 1, then 2.
    for expected_calls in 1..=2 {
        let _ = adapter.execute_aggregation_query(sql).await.unwrap();
        assert_eq!(adapter.inner().call_count(), expected_calls);
    }
}
/// Aggregations over a table that is not a fact table (`users`) are not cached,
/// even when a default fact-table strategy is set.
#[tokio::test]
async fn test_aggregation_caching_non_fact_table() {
    let strategies = FactTableCacheConfig::with_default(FactTableVersionStrategy::SchemaVersion);
    let adapter = CachedDatabaseAdapter::with_fact_table_config(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
        strategies,
    );
    let sql = "SELECT COUNT(*) FROM users";

    // Every round reaches the mock: the counter reads 1, then 2.
    for expected_calls in 1..=2 {
        let _ = adapter.execute_aggregation_query(sql).await.unwrap();
        assert_eq!(adapter.inner().call_count(), expected_calls);
    }
}
/// Distinct aggregation SQL gets distinct cache entries; repeating an earlier
/// statement is served from the cache.
#[tokio::test]
async fn test_aggregation_caching_different_queries() {
    let mut strategies = FactTableCacheConfig::default();
    strategies.set_strategy("tf_sales", FactTableVersionStrategy::SchemaVersion);
    let adapter = CachedDatabaseAdapter::with_fact_table_config(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
        strategies,
    );

    // (SQL text, call count expected right after running it)
    let steps = [
        ("SELECT SUM(revenue) FROM tf_sales WHERE year = 2024", 1),
        ("SELECT SUM(revenue) FROM tf_sales WHERE year = 2023", 2),
        // Same statement as the first step: a hit, so the count stays at 2.
        ("SELECT SUM(revenue) FROM tf_sales WHERE year = 2024", 2),
    ];

    for (sql, expected_calls) in steps {
        let _ = adapter.execute_aggregation_query(sql).await.unwrap();
        assert_eq!(adapter.inner().call_count(), expected_calls);
    }
}
/// `fact_table_config()` exposes the configured strategy for `tf_sales` and
/// reports `Disabled` for a table that was never configured.
#[tokio::test]
async fn test_fact_table_config_accessor() {
    let mut strategies = FactTableCacheConfig::default();
    strategies.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
    let adapter = CachedDatabaseAdapter::with_fact_table_config(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
        strategies,
    );

    let expectations = [
        ("tf_sales", FactTableVersionStrategy::VersionTable),
        ("tf_other", FactTableVersionStrategy::Disabled),
    ];
    for (table, expected_strategy) in expectations {
        assert_eq!(
            adapter.fact_table_config().get_strategy(table),
            &expected_strategy
        );
    }
}
/// With a cascade invalidator registered, invalidating `v_user` also evicts the
/// views that depend on it directly and transitively.
#[tokio::test]
async fn test_cascade_invalidator_expands_transitive_views() {
    // Dependency chain: v_dashboard -> v_user_stats -> v_user.
    let mut dependency_graph = CascadeInvalidator::new();
    for (dependent, source) in [("v_user_stats", "v_user"), ("v_dashboard", "v_user_stats")] {
        dependency_graph.add_dependency(dependent, source).unwrap();
    }

    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    )
    .with_cascade_invalidator(dependency_graph);

    let id_filter = WhereClause::Field {
        path: vec!["id".to_string()],
        operator: WhereOperator::Eq,
        value: json!(1),
    };
    let views = ["v_user", "v_user_stats", "v_dashboard"];

    // Warm one cache entry per view.
    for view in views {
        adapter
            .execute_where_query(view, Some(&id_filter), None, None, None)
            .await
            .unwrap();
    }
    assert_eq!(adapter.inner().call_count(), 3);

    let root_views = ["v_user".to_string()];
    let evicted = adapter.invalidate_views(&root_views).unwrap();
    assert_eq!(evicted, 3, "All three views should be invalidated via cascade");

    for view in views {
        adapter
            .execute_where_query(view, Some(&id_filter), None, None, None)
            .await
            .unwrap();
    }
    assert_eq!(
        adapter.inner().call_count(),
        6,
        "All three should be cache misses after cascade"
    );
}
/// Without a cascade invalidator, only the explicitly named view is evicted.
#[tokio::test]
async fn test_no_cascade_invalidator_only_direct_views() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );
    let id_filter = WhereClause::Field {
        path: vec!["id".to_string()],
        operator: WhereOperator::Eq,
        value: json!(1),
    };
    let views = ["v_user", "v_user_stats"];

    // Warm one cache entry per view.
    for view in views {
        adapter
            .execute_where_query(view, Some(&id_filter), None, None, None)
            .await
            .unwrap();
    }
    assert_eq!(adapter.inner().call_count(), 2);

    let named_views = ["v_user".to_string()];
    let evicted = adapter.invalidate_views(&named_views).unwrap();
    assert_eq!(evicted, 1);

    for view in views {
        adapter
            .execute_where_query(view, Some(&id_filter), None, None, None)
            .await
            .unwrap();
    }
    assert_eq!(
        adapter.inner().call_count(),
        3,
        "Only v_user should be a miss; v_user_stats is still cached"
    );
}
/// Test double whose only state is one atomic counter, `bump_call_count`.
struct BumpAdapter {
    bump_call_count: std::sync::atomic::AtomicU32,
}

impl BumpAdapter {
    /// Builds an adapter with the counter at zero.
    fn new() -> Self {
        use std::sync::atomic::AtomicU32;

        let bump_call_count = AtomicU32::new(0);
        Self { bump_call_count }
    }

    /// Returns the current value of the counter.
    fn bump_call_count(&self) -> u32 {
        use std::sync::atomic::Ordering::SeqCst;

        self.bump_call_count.load(SeqCst)
    }
}
// `DatabaseAdapter` for the version-bump tests: every query method returns no
// rows; only `execute_function_call("bump_tf_version", ..)` does real work.
#[async_trait]
impl DatabaseAdapter for BumpAdapter {
    /// Returns no rows.
    async fn execute_where_query(
        &self,
        _view: &str,
        _where_clause: Option<&WhereClause>,
        _limit: Option<u32>,
        _offset: Option<u32>,
        _order_by: Option<&[OrderByClause]>,
    ) -> Result<Vec<JsonbValue>> {
        Ok(Vec::new())
    }

    /// Returns no rows.
    async fn execute_with_projection(
        &self,
        _view: &str,
        _projection: Option<&crate::schema::SqlProjectionHint>,
        _where_clause: Option<&WhereClause>,
        _limit: Option<u32>,
        _offset: Option<u32>,
        _order_by: Option<&[OrderByClause]>,
    ) -> Result<Vec<JsonbValue>> {
        Ok(Vec::new())
    }

    /// Always reports PostgreSQL.
    fn database_type(&self) -> DatabaseType {
        DatabaseType::PostgreSQL
    }

    /// Always succeeds.
    async fn health_check(&self) -> Result<()> {
        Ok(())
    }

    /// Returns fixed pool numbers for a single idle connection.
    fn pool_metrics(&self) -> PoolMetrics {
        PoolMetrics {
            total_connections: 1,
            idle_connections: 1,
            active_connections: 0,
            waiting_requests: 0,
        }
    }

    /// Returns no rows.
    async fn execute_raw_query(
        &self,
        _sql: &str,
    ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
        Ok(Vec::new())
    }

    /// Returns no rows.
    async fn execute_parameterized_aggregate(
        &self,
        _sql: &str,
        _params: &[serde_json::Value],
    ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
        Ok(Vec::new())
    }

    /// For `bump_tf_version`: increments the counter and returns one row whose
    /// `bump_tf_version` column is the previous counter value plus 2 (so the
    /// first call yields 2). Any other function name returns no rows.
    async fn execute_function_call(
        &self,
        function_name: &str,
        _args: &[serde_json::Value],
    ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
        if function_name != "bump_tf_version" {
            return Ok(Vec::new());
        }

        // `fetch_add` hands back the value from before the increment.
        let previous_calls = self
            .bump_call_count
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let new_version = i64::from(previous_calls) + 2;
        let version_row =
            std::collections::HashMap::from([("bump_tf_version".to_string(), json!(new_version))]);
        Ok(vec![version_row])
    }
}
/// Bumping a `VersionTable` fact table calls `bump_tf_version` once and stores
/// the returned version (2) in the version provider.
#[tokio::test]
async fn test_bump_fact_table_versions_updates_version_cache() {
    let mut strategies = FactTableCacheConfig::default();
    strategies.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
    let adapter = CachedDatabaseAdapter::with_fact_table_config(
        BumpAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
        strategies,
    );

    // No version is cached before the first bump.
    let version_before = adapter.version_provider().get_cached_version("tf_sales");
    assert!(version_before.is_none());

    let tables = ["tf_sales".to_string()];
    adapter.bump_fact_table_versions(&tables).await.unwrap();

    assert_eq!(adapter.inner().bump_call_count(), 1);
    let version_after = adapter.version_provider().get_cached_version("tf_sales");
    assert_eq!(version_after, Some(2));
}
/// Only tables configured with `VersionTable` trigger a `bump_tf_version` call;
/// `TimeBased` and `SchemaVersion` tables are skipped.
#[tokio::test]
async fn test_bump_fact_table_versions_skips_non_version_table_strategy() {
    let mut strategies = FactTableCacheConfig::default();
    strategies.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
    strategies.set_strategy("tf_events", FactTableVersionStrategy::TimeBased { ttl_seconds: 300 });
    strategies.set_strategy("tf_hist", FactTableVersionStrategy::SchemaVersion);
    let adapter = CachedDatabaseAdapter::with_fact_table_config(
        BumpAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
        strategies,
    );

    let tables = ["tf_sales", "tf_events", "tf_hist"].map(String::from);
    adapter.bump_fact_table_versions(&tables).await.unwrap();

    assert_eq!(
        adapter.inner().bump_call_count(),
        1,
        "Only VersionTable strategy calls bump_tf_version"
    );

    // Only the VersionTable table ends up with a cached version.
    assert!(adapter.version_provider().get_cached_version("tf_sales").is_some());
    for skipped_table in ["tf_events", "tf_hist"] {
        assert!(adapter.version_provider().get_cached_version(skipped_table).is_none());
    }
}
/// Bumping an empty table list succeeds and never calls `bump_tf_version`.
#[tokio::test]
async fn test_bump_fact_table_versions_empty_list_is_noop() {
    let inner = BumpAdapter::new();
    let cache = QueryResultCache::new(CacheConfig::enabled());
    let adapter = CachedDatabaseAdapter::new(inner, cache, "1.0.0".to_string());

    adapter.bump_fact_table_versions(&[]).await.unwrap();

    let bumps = adapter.inner().bump_call_count();
    assert_eq!(bumps, 0);
}
/// `v_<word>` maps to the capitalised word.
#[test]
fn test_view_name_to_entity_type_single_word() {
    use crate::cache::adapter::view_name_to_entity_type;

    for (view, entity) in [("v_user", "User"), ("v_product", "Product")] {
        assert_eq!(view_name_to_entity_type(view), Some(entity.to_string()));
    }
}
/// Underscore-separated words after the prefix are joined in PascalCase.
#[test]
fn test_view_name_to_entity_type_multi_word() {
    use crate::cache::adapter::view_name_to_entity_type;

    let cases = [
        ("v_order_item", "OrderItem"),
        ("v_user_profile", "UserProfile"),
        ("v_a_b_c", "ABC"),
    ];
    for (view, entity) in cases {
        assert_eq!(view_name_to_entity_type(view), Some(entity.to_string()));
    }
}
/// Prefixes other than `v_` (here `tv_` and `mat_`) are stripped the same way.
#[test]
fn test_view_name_to_entity_type_arbitrary_prefix() {
    use crate::cache::adapter::view_name_to_entity_type;

    for (view, entity) in [("tv_user_event", "UserEvent"), ("mat_order", "Order")] {
        assert_eq!(view_name_to_entity_type(view), Some(entity.to_string()));
    }
}
/// Names without an underscore-delimited prefix yield `None`.
#[test]
fn test_view_name_to_entity_type_no_prefix() {
    use crate::cache::adapter::view_name_to_entity_type;

    for view in ["users", "orders"] {
        assert_eq!(view_name_to_entity_type(view), None);
    }
}
/// A name with nothing after the prefix separator yields `None`.
#[test]
fn test_view_name_to_entity_type_empty_after_prefix() {
    use crate::cache::adapter::view_name_to_entity_type;

    for view in ["v_", "_"] {
        assert_eq!(view_name_to_entity_type(view), None);
    }
}
/// When TTL overrides exist only for `v_expensive`, queries against `v_user`
/// are never served from the cache.
#[tokio::test]
async fn test_non_cacheable_view_always_hits_db() {
    let ttl_overrides = HashMap::from([("v_expensive".to_string(), 300_u64)]);
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    )
    .with_view_ttl_overrides(ttl_overrides);

    adapter
        .execute_where_query("v_user", None, None, None, None)
        .await
        .unwrap();
    assert_eq!(adapter.inner().call_count(), 1);

    // Same query again: it must still reach the mock.
    adapter
        .execute_where_query("v_user", None, None, None, None)
        .await
        .unwrap();
    let calls_after_repeat = adapter.inner().call_count();
    assert_eq!(
        calls_after_repeat,
        2,
        "non-cacheable view must not be served from cache"
    );
}
/// A view that has a TTL override (`v_expensive`) is cached as usual.
#[tokio::test]
async fn test_cacheable_view_is_still_cached() {
    let ttl_overrides = HashMap::from([("v_expensive".to_string(), 300_u64)]);
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    )
    .with_view_ttl_overrides(ttl_overrides);

    adapter
        .execute_where_query("v_expensive", None, None, None, None)
        .await
        .unwrap();
    assert_eq!(adapter.inner().call_count(), 1);

    // Same query again: answered from the cache, counter unchanged.
    adapter
        .execute_where_query("v_expensive", None, None, None, None)
        .await
        .unwrap();
    let calls_after_repeat = adapter.inner().call_count();
    assert_eq!(
        calls_after_repeat,
        1,
        "opt-in view must be served from cache on second call"
    );
}
/// An adapter built without any TTL overrides caches every view.
#[tokio::test]
async fn test_all_views_cacheable_when_no_overrides_set() {
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    );

    adapter
        .execute_where_query("v_user", None, None, None, None)
        .await
        .unwrap();
    assert_eq!(adapter.inner().call_count(), 1);

    // Same query again: answered from the cache, counter unchanged.
    adapter
        .execute_where_query("v_user", None, None, None, None)
        .await
        .unwrap();
    let calls_after_repeat = adapter.inner().call_count();
    assert_eq!(
        calls_after_repeat,
        1,
        "with no schema loaded all views must be cached"
    );
}
/// Explicitly supplying an empty TTL-override map makes every query bypass the
/// cache (regression check referenced as #187 in the assertion message).
#[tokio::test]
async fn test_schema_without_ttl_annotations_bypasses_cache() {
    let no_overrides = HashMap::new();
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    )
    .with_view_ttl_overrides(no_overrides);

    adapter
        .execute_where_query("v_user", None, None, None, None)
        .await
        .unwrap();
    assert_eq!(adapter.inner().call_count(), 1);

    // Same query again: it must still reach the mock.
    adapter
        .execute_where_query("v_user", None, None, None, None)
        .await
        .unwrap();
    let calls_after_repeat = adapter.inner().call_count();
    assert_eq!(
        calls_after_repeat,
        2,
        "schema with no TTL annotations must bypass cache on every request (#187)"
    );
}
/// Deriving TTL overrides from a default (unannotated) `CompiledSchema` makes
/// every query bypass the cache.
#[tokio::test]
async fn test_ttl_overrides_from_empty_schema_bypasses_cache() {
    let unannotated_schema = CompiledSchema::default();
    let adapter = CachedDatabaseAdapter::new(
        MockAdapter::new(),
        QueryResultCache::new(CacheConfig::enabled()),
        "1.0.0".to_string(),
    )
    .with_ttl_overrides_from_schema(&unannotated_schema);

    adapter
        .execute_where_query("v_user", None, None, None, None)
        .await
        .unwrap();
    assert_eq!(adapter.inner().call_count(), 1);

    // Same query again: it must still reach the mock.
    adapter
        .execute_where_query("v_user", None, None, None, None)
        .await
        .unwrap();
    let calls_after_repeat = adapter.inner().call_count();
    assert_eq!(
        calls_after_repeat,
        2,
        "with_ttl_overrides_from_schema on unannotated schema must bypass cache entirely"
    );
}