pub mod cache;
pub mod compaction;
pub mod comparative;
pub mod config;
pub mod dependency;
pub mod input;
pub mod persistence;
pub mod planner;
pub mod queries;
pub mod query;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use sqry_core::graph::unified::concurrent::GraphSnapshot;
pub use cache::ShardedCache;
pub use comparative::{
ChangeType, ComparativeQueryDb, DiffOptions, DiffOutput, DiffSummary, NodeChange, NodeLocation,
};
pub use config::QueryDbConfig;
pub use dependency::DependencyRecorderGuard;
pub use input::{FileInput, FileInputStore};
pub use persistence::{
DERIVED_FORMAT_VERSION, DERIVED_MAGIC, DerivedHeader, DerivedManifest, LoadError, LoadOutcome,
PersistedEntry, QueryDeps, SkipReason, StagedEntry, load_derived,
};
pub use queries::dispatch::{derived_path, load_derived_opportunistic, make_query_db_cold};
pub use query::{DerivedQuery, QueryKey, QueryRegistry};
pub struct QueryDb {
inputs: FileInputStore,
cache: ShardedCache,
edge_revision: AtomicU64,
metadata_revision: AtomicU64,
registry: QueryRegistry,
snapshot: Arc<GraphSnapshot>,
config: QueryDbConfig,
cache_hits: AtomicU64,
cache_misses: AtomicU64,
pub(crate) cold_load_allowed: AtomicBool,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct QueryDbMetrics {
pub cache_hits: u64,
pub cache_misses: u64,
}
impl QueryDbMetrics {
#[inline]
#[must_use]
pub fn total_gets(&self) -> u64 {
self.cache_hits.saturating_add(self.cache_misses)
}
}
impl QueryDb {
#[must_use]
pub fn new(snapshot: Arc<GraphSnapshot>, config: QueryDbConfig) -> Self {
let inputs = FileInputStore::from_snapshot(&snapshot);
let mut db = Self {
inputs,
cache: ShardedCache::new(config.shard_count),
edge_revision: AtomicU64::new(0),
metadata_revision: AtomicU64::new(0),
registry: QueryRegistry::new(),
snapshot,
config,
cache_hits: AtomicU64::new(0),
cache_misses: AtomicU64::new(0),
cold_load_allowed: AtomicBool::new(true),
};
db.register_builtin_queries();
db
}
fn register_builtin_queries(&mut self) {
self.registry.register::<queries::SccQuery>();
self.registry.register::<queries::CondensationQuery>();
self.registry.register::<queries::ReachabilityQuery>();
self.registry.register::<queries::CallersQuery>();
self.registry.register::<queries::CalleesQuery>();
self.registry.register::<queries::ImportsQuery>();
self.registry.register::<queries::ExportsQuery>();
self.registry.register::<queries::ReferencesQuery>();
self.registry.register::<queries::ImplementsQuery>();
self.registry.register::<queries::CyclesQuery>();
self.registry.register::<queries::IsInCycleQuery>();
self.registry.register::<queries::EntryPointsQuery>();
self.registry
.register::<queries::ReachableFromEntryPointsQuery>();
self.registry.register::<queries::UnusedQuery>();
self.registry.register::<queries::IsNodeUnusedQuery>();
}
pub fn register<Q: DerivedQuery>(&mut self) {
self.registry.register::<Q>();
}
pub fn get<Q: DerivedQuery>(&self, key: &Q::Key) -> Q::Value {
let query_key = QueryKey::new::<Q>(key);
let shard_idx = self.registry.shard_for::<Q>(self.cache.shard_count());
if let Some(value) = self
.cache
.get_if_valid::<Q::Value>(shard_idx, &query_key, |cached| {
self.validate_cached_result::<Q>(cached)
})
{
self.cache_hits.fetch_add(1, Ordering::Relaxed);
return value;
}
if let Some(value) =
self.cache
.get_cold_if_valid::<Q::Value>(shard_idx, &query_key, |cached| {
self.validate_cached_result::<Q>(cached)
})
{
self.cache_hits.fetch_add(1, Ordering::Relaxed);
return value;
}
self.cache_misses.fetch_add(1, Ordering::Relaxed);
let guard = DependencyRecorderGuard::new();
let value = Q::execute(key, self, &self.snapshot);
let file_deps = guard.finish(&self.inputs);
let edge_rev = if Q::TRACKS_EDGE_REVISION {
Some(self.edge_revision.load(Ordering::Acquire))
} else {
None
};
let metadata_rev = if Q::TRACKS_METADATA_REVISION {
Some(self.metadata_revision.load(Ordering::Acquire))
} else {
None
};
if let Err(err) = self.cache.insert_query::<Q>(
shard_idx,
query_key.clone(),
key,
value.clone(),
file_deps.clone(),
edge_rev,
metadata_rev,
&self.config,
) {
log::warn!(
"sqry-db: failed to serialise cache entry for query_type_id={:#06x}: {err}; \
storing typed value without persistence",
Q::QUERY_TYPE_ID,
);
self.cache.insert(
shard_idx,
query_key,
cache::CachedResult::new(value.clone(), file_deps, edge_rev, metadata_rev),
);
}
value
}
#[inline]
#[must_use]
pub fn snapshot(&self) -> &GraphSnapshot {
&self.snapshot
}
#[inline]
#[must_use]
pub fn snapshot_arc(&self) -> Arc<GraphSnapshot> {
Arc::clone(&self.snapshot)
}
pub fn set_snapshot(&mut self, snapshot: Arc<GraphSnapshot>) {
self.snapshot = snapshot;
}
#[inline]
#[must_use]
pub fn inputs(&self) -> &FileInputStore {
&self.inputs
}
#[inline]
pub fn inputs_mut(&mut self) -> &mut FileInputStore {
&mut self.inputs
}
#[inline]
#[must_use]
pub fn edge_revision(&self) -> u64 {
self.edge_revision.load(Ordering::Acquire)
}
pub fn bump_edge_revision(&self) -> u64 {
self.edge_revision.fetch_add(1, Ordering::Release) + 1
}
#[inline]
#[must_use]
pub fn metadata_revision(&self) -> u64 {
self.metadata_revision.load(Ordering::Acquire)
}
pub fn bump_metadata_revision(&self) -> u64 {
self.metadata_revision.fetch_add(1, Ordering::Release) + 1
}
#[inline]
#[must_use]
pub fn config(&self) -> &QueryDbConfig {
&self.config
}
#[inline]
#[must_use]
pub fn metrics(&self) -> QueryDbMetrics {
QueryDbMetrics {
cache_hits: self.cache_hits.load(Ordering::Relaxed),
cache_misses: self.cache_misses.load(Ordering::Relaxed),
}
}
pub fn invalidate_all(&self) {
self.cache.clear_all();
}
#[inline]
#[must_use]
pub fn cold_load_allowed(&self) -> bool {
self.cold_load_allowed.load(Ordering::Acquire)
}
pub(crate) fn commit_staged_load(
&mut self,
header: persistence::DerivedHeader,
staged: Vec<persistence::StagedEntry>,
) {
self.edge_revision
.store(header.edge_revision, Ordering::Release);
self.metadata_revision
.store(header.metadata_revision, Ordering::Release);
self.inputs.restore_revisions(&header.file_revisions);
for entry in staged {
self.cache.insert_validated(
entry.query_type_id,
entry.raw_key_bytes.into(),
entry.raw_result_bytes.into(),
entry.deps,
);
}
}
pub(crate) fn iter_persistent_cache_entries(
&self,
) -> impl Iterator<Item = cache::PersistableEntry> + '_ {
self.cache.iter_persistent()
}
fn validate_cached_result<Q: DerivedQuery>(&self, cached: &cache::CachedResult) -> bool {
if !cached.validate_file_deps(&self.inputs) {
return false;
}
if Q::TRACKS_EDGE_REVISION {
if let Some(cached_rev) = cached.edge_revision() {
if cached_rev != self.edge_revision.load(Ordering::Acquire) {
return false;
}
} else {
return false;
}
}
if Q::TRACKS_METADATA_REVISION {
if let Some(cached_rev) = cached.metadata_revision() {
if cached_rev != self.metadata_revision.load(Ordering::Acquire) {
return false;
}
} else {
return false;
}
}
true
}
}
unsafe impl Send for QueryDb {}
unsafe impl Sync for QueryDb {}
#[cfg(test)]
mod tests {
use super::*;
use sqry_core::graph::unified::concurrent::CodeGraph;
#[test]
fn query_db_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<QueryDb>();
}
#[test]
fn query_db_new_built_in_registration_is_stable_under_repeats() {
let snapshot = std::sync::Arc::new(CodeGraph::new().snapshot());
let mut db = QueryDb::new(snapshot, QueryDbConfig::default());
let count_after_new = db.registry.registered_count();
db.register::<queries::SccQuery>();
db.register::<queries::CondensationQuery>();
db.register::<queries::ReachabilityQuery>();
db.register::<queries::CallersQuery>();
db.register::<queries::CalleesQuery>();
db.register::<queries::ImportsQuery>();
db.register::<queries::ExportsQuery>();
db.register::<queries::ReferencesQuery>();
db.register::<queries::ImplementsQuery>();
db.register::<queries::CyclesQuery>();
db.register::<queries::IsInCycleQuery>();
db.register::<queries::EntryPointsQuery>();
db.register::<queries::ReachableFromEntryPointsQuery>();
db.register::<queries::UnusedQuery>();
db.register::<queries::IsNodeUnusedQuery>();
assert_eq!(
db.registry.registered_count(),
count_after_new,
"repeat register of built-ins must not bump count"
);
assert!(
count_after_new >= 15,
"QueryDb::new must pre-register at least the 15 DB14 built-ins, got {count_after_new}"
);
}
}