use std::sync::Arc;
use crate::bridge::dispatch::Dispatcher;
use crate::control::catalog_entry::entry::CatalogEntry;
use crate::control::catalog_entry::post_apply::gateway_invalidation::invalidate_gateway_cache_for_entry;
use crate::control::gateway::plan_cache::{PlanCache, PlanCacheKey, hash_sql};
use crate::control::gateway::version_set::GatewayVersionSet;
use crate::control::gateway::{Gateway, PlanCacheInvalidator};
use crate::control::security::catalog::StoredCollection;
use crate::control::state::SharedState;
use crate::wal::WalManager;
/// Builds a `SharedState` for tests with a gateway and a plan-cache
/// invalidator installed on it.
///
/// Returns the shared state together with a handle to the gateway's plan
/// cache, so tests can plant entries and observe evictions directly.
fn make_test_state() -> (Arc<SharedState>, Arc<PlanCache>) {
    // The temp directory handle is forgotten on purpose: its destructor never
    // runs, so the WAL file is not removed when this function returns.
    let tmp = tempfile::tempdir().expect("tmpdir");
    let wal_file = tmp.path().join("test.wal");
    std::mem::forget(tmp);

    let wal_manager = Arc::new(WalManager::open_for_testing(&wal_file).expect("wal"));
    let (dispatcher, _data_sides) = Dispatcher::new(1, 64);
    let state = SharedState::new(dispatcher, wal_manager);

    // The gateway gets its own handle to the state before the state is patched.
    let gw = Arc::new(Gateway::new(Arc::clone(&state)));
    let cache = Arc::clone(&gw.plan_cache);
    let cache_invalidator = Arc::new(PlanCacheInvalidator::new(&gw.plan_cache));

    // NOTE(review): this writes through a pointer obtained from a shared `Arc`
    // while the gateway already holds a clone of it. It presumably relies on
    // nothing reading these two fields during setup — confirm, or replace with
    // a safe initialization path if `SharedState` offers one.
    unsafe {
        let raw_state = Arc::as_ptr(&state) as *mut SharedState;
        (*raw_state).gateway = Some(gw);
        (*raw_state).gateway_invalidator = Some(cache_invalidator);
    }

    (state, cache)
}
/// Inserts a placeholder KV `Get` plan for `col` into `cache` and returns the
/// key it was stored under.
///
/// The key is derived from the text `SELECT * FROM {col}` and a version set
/// that pins `col` at version 1.
fn plant_sentinel(cache: &PlanCache, col: &str) -> PlanCacheKey {
    use crate::bridge::physical_plan::{KvOp, PhysicalPlan};

    let sql_text_hash = hash_sql(&format!("SELECT * FROM {col}"));
    let version_set = GatewayVersionSet::from_pairs(vec![(col.into(), 1)]);
    let sentinel_key = PlanCacheKey {
        sql_text_hash,
        placeholder_types_hash: 0,
        version_set,
    };

    let get_op = KvOp::Get {
        collection: col.into(),
        key: Vec::new(),
        rls_filters: Vec::new(),
        surrogate_ceiling: None,
    };
    cache.insert(sentinel_key.clone(), Arc::new(PhysicalPlan::Kv(get_op)));

    sentinel_key
}
/// A `PutCollection` entry carrying a newer descriptor version must remove the
/// plan that was cached for that collection.
#[tokio::test]
async fn put_collection_evicts_stale_plan_entries() {
    let (state, plan_cache) = make_test_state();
    let sentinel = plant_sentinel(&plan_cache, "orders");
    assert_eq!(plan_cache.len(), 1);

    // The sentinel was planted at version 1; publish version 2.
    let mut collection = StoredCollection::new(1, "orders", "alice");
    collection.descriptor_version = 2;
    let put = CatalogEntry::PutCollection(Box::new(collection));

    invalidate_gateway_cache_for_entry(&put, &state);

    assert_eq!(
        plan_cache.len(),
        0,
        "put_collection must evict stale entries"
    );
    assert!(plan_cache.get(&sentinel).is_none());
}
/// A `DeactivateCollection` entry must remove the plan that was cached for the
/// named collection.
#[tokio::test]
async fn deactivate_collection_evicts_plan_entries() {
    let (state, plan_cache) = make_test_state();
    let sentinel = plant_sentinel(&plan_cache, "products");
    assert_eq!(plan_cache.len(), 1);

    let deactivate = CatalogEntry::DeactivateCollection {
        tenant_id: 1,
        name: "products".into(),
    };
    invalidate_gateway_cache_for_entry(&deactivate, &state);

    assert_eq!(
        plan_cache.len(),
        0,
        "deactivate_collection must evict entries"
    );
    assert!(plan_cache.get(&sentinel).is_none());
}
/// Asserts that applying `entry` leaves the plan cache untouched.
///
/// Plants a sentinel plan for `sentinel_col`, runs the invalidation hook for
/// `entry`, then checks that the cache size is unchanged and the sentinel is
/// still retrievable. `label` prefixes the failure messages so the offending
/// variant is identifiable. Finishes by calling `invalidate_descriptor` for
/// the sentinel collection.
fn assert_noop(
    shared: &Arc<SharedState>,
    cache: &Arc<PlanCache>,
    entry: CatalogEntry,
    label: &str,
) {
    let sentinel = plant_sentinel(cache, "sentinel_col");
    let expected_len = cache.len();

    invalidate_gateway_cache_for_entry(&entry, shared);

    assert_eq!(cache.len(), expected_len, "{label}: cache must not change");
    let still_cached = cache.get(&sentinel).is_some();
    assert!(still_cached, "{label}: sentinel entry must survive");

    // Presumably cleanup so the next case starts from the same cache
    // contents — confirm against `invalidate_descriptor` semantics.
    cache.invalidate_descriptor("sentinel_col", 0);
}
/// Every catalog entry variant listed here must leave the plan cache alone.
///
/// Each case is checked through `assert_noop`, in the order listed, against
/// one shared test state.
#[tokio::test]
async fn no_op_variants_do_not_evict_plan_cache() {
    use crate::control::security::catalog::sequence_types::StoredSequence;

    let (shared, cache) = make_test_state();

    // (entry to apply, label used in assertion failure messages)
    let cases = [
        (
            CatalogEntry::DeleteSequence {
                tenant_id: 1,
                name: "seq".into(),
            },
            "DeleteSequence",
        ),
        (
            CatalogEntry::PutSequence(Box::new(StoredSequence::new(
                1,
                "seq2".into(),
                "alice".into(),
            ))),
            "PutSequence",
        ),
        (
            CatalogEntry::DeleteTrigger {
                tenant_id: 1,
                name: "trig".into(),
            },
            "DeleteTrigger",
        ),
        (
            CatalogEntry::DeleteFunction {
                tenant_id: 1,
                name: "fn_".into(),
            },
            "DeleteFunction",
        ),
        (
            CatalogEntry::DeleteProcedure {
                tenant_id: 1,
                name: "proc".into(),
            },
            "DeleteProcedure",
        ),
        (
            CatalogEntry::DeleteSchedule {
                tenant_id: 1,
                name: "sched".into(),
            },
            "DeleteSchedule",
        ),
        (
            CatalogEntry::DeleteChangeStream {
                tenant_id: 1,
                name: "stream".into(),
            },
            "DeleteChangeStream",
        ),
        (
            CatalogEntry::DeactivateUser {
                username: "bob".into(),
            },
            "DeactivateUser",
        ),
        (
            CatalogEntry::DeleteRole {
                name: "analyst".into(),
            },
            "DeleteRole",
        ),
        (
            CatalogEntry::RevokeApiKey {
                key_id: "key_abc".into(),
            },
            "RevokeApiKey",
        ),
        (
            CatalogEntry::DeleteMaterializedView {
                tenant_id: 1,
                name: "mv_orders".into(),
            },
            "DeleteMaterializedView",
        ),
        (CatalogEntry::DeleteTenant { tenant_id: 42 }, "DeleteTenant"),
        (
            CatalogEntry::DeleteRlsPolicy {
                tenant_id: 1,
                collection: "orders".into(),
                name: "tenant_isolation".into(),
            },
            "DeleteRlsPolicy",
        ),
        (
            CatalogEntry::DeletePermission {
                target: "collection:1:orders".into(),
                grantee: "user:bob".into(),
                permission: "read".into(),
            },
            "DeletePermission",
        ),
        (
            CatalogEntry::DeleteOwner {
                object_type: "collection".into(),
                tenant_id: 1,
                object_name: "orders".into(),
            },
            "DeleteOwner",
        ),
    ];

    for (entry, label) in cases {
        assert_noop(&shared, &cache, entry, label);
    }
}
/// Invalidation must return without panicking on a `SharedState` that was
/// built without installing a gateway or plan-cache invalidator.
///
/// The test has no assertions; completing without a panic is the pass
/// condition.
#[tokio::test]
async fn no_gateway_invalidator_is_safe_noop() {
    // Keep the WAL inside a fresh per-test temp directory rather than at a
    // fixed `/tmp/...` path: a fixed path is shared by concurrent test runs,
    // can pick up a WAL file left by a previous run, and is not portable.
    // `dir` stays bound for the whole test so the directory exists while the
    // WAL is open, and is cleaned up when the test ends.
    let dir = tempfile::tempdir().expect("tmpdir");
    let wal_path = dir.path().join("test.wal");
    let wal = Arc::new(WalManager::open_for_testing(&wal_path).expect("wal"));

    let (dispatcher, _) = Dispatcher::new(1, 64);
    let shared = SharedState::new(dispatcher, wal);

    let entry = CatalogEntry::PutCollection(Box::new(StoredCollection::new(1, "x", "alice")));
    invalidate_gateway_cache_for_entry(&entry, &shared);
}