use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use crate::api::{RedDBError, RedDBOptions, RedDBResult};
use crate::catalog::{
CatalogAnalyticsJobStatus, CatalogAttentionSummary, CatalogGraphProjectionStatus,
CatalogIndexStatus, CatalogModelSnapshot, CollectionDescriptor,
};
use crate::health::{HealthProvider, HealthReport};
use crate::index::IndexCatalog;
use crate::physical::{
ExportDescriptor, ManifestEvent, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalLayout,
SnapshotDescriptor,
};
use crate::serde_json::Value as JsonValue;
use crate::storage::engine::pathfinding::{AStar, BellmanFord, Dijkstra, BFS, DFS};
use crate::storage::engine::{
BetweennessCentrality, ClosenessCentrality, ClusteringCoefficient, ConnectedComponents,
CycleDetector, DegreeCentrality, EigenvectorCentrality, GraphStore, IvfConfig, IvfIndex,
IvfStats, LabelPropagation, Louvain, MetadataEntry, MetadataFilter as VectorMetadataFilter,
MetadataValue as VectorMetadataValue, PageRank, PersonalizedPageRank, PhysicalFileHeader,
StoredNode, StronglyConnectedComponents, WeaklyConnectedComponents, HITS,
};
use crate::storage::query::ast::{
AlterOperation, AlterQueueQuery, AlterTableQuery, CompareOp, CreateCollectionQuery,
CreateIndexQuery, CreateQueueQuery, CreateTableQuery, CreateTimeSeriesQuery, CreateTreeQuery,
CreateVectorQuery, DeleteQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery,
DropIndexQuery, DropKvQuery, DropQueueQuery, DropTableQuery, DropTimeSeriesQuery,
DropTreeQuery, DropVectorQuery, EventsBackfillQuery, ExplainAlterQuery, ExplainFormat,
FieldRef, Filter, FusionStrategy, GraphCommand, HybridQuery, IndexMethod, InsertEntityType,
InsertQuery, JoinQuery, JoinType, OrderByClause, ProbabilisticCommand, Projection, QueryExpr,
QueueCommand, QueueSelectQuery, QueueSide, SearchCommand, TableQuery, TreeCommand,
TruncateQuery, UpdateQuery, VectorQuery, VectorSource,
};
use crate::storage::query::is_universal_entity_source as is_universal_query_source;
use crate::storage::query::modes::{detect_mode, parse_multi, QueryMode};
use crate::storage::query::planner::{
CanonicalLogicalPlan, CanonicalPlanner, CostEstimator, QueryPlanner,
};
use crate::storage::query::unified::{UnifiedRecord, UnifiedResult};
use crate::storage::schema::Value;
use crate::storage::unified::dsl::{
apply_filters, cosine_similarity, Filter as DslFilter, FilterOp as DslFilterOp,
FilterValue as DslFilterValue, GraphPatternDsl, HybridQueryBuilder, MatchComponents,
QueryResult as DslQueryResult, ScoredMatch, TextSearchBuilder,
};
use crate::storage::unified::store::{
NativeCatalogSummary, NativeManifestSummary, NativePhysicalState, NativeRecoverySummary,
NativeRegistrySummary,
};
use crate::storage::unified::{
Metadata, MetadataValue as UnifiedMetadataValue, RefTarget, UnifiedMetadataFilter,
};
use crate::storage::{
EntityData, EntityId, EntityKind, RedDB, RefType, SimilarResult, StoreStats, UnifiedEntity,
UnifiedStore,
};
/// Sizing limits for the runtime connection pool.
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
    /// Connection ceiling; `Default` sets it to `64`.
    pub max_connections: usize,
    /// Idle-connection ceiling; `Default` sets it to `16`.
    pub max_idle: usize,
}

impl Default for ConnectionPoolConfig {
    /// Builds the stock configuration (`max_connections = 64`, `max_idle = 16`).
    fn default() -> Self {
        const DEFAULT_MAX_CONNECTIONS: usize = 64;
        const DEFAULT_MAX_IDLE: usize = 16;

        ConnectionPoolConfig {
            max_idle: DEFAULT_MAX_IDLE,
            max_connections: DEFAULT_MAX_CONNECTIONS,
        }
    }
}
/// Cursor marking where a paged scan should resume.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ScanCursor {
/// Resume position.
/// NOTE(review): presumably a zero-based item offset into the scan order — confirm against the scan implementation.
pub offset: usize,
}
/// One page of entities produced by scanning a collection.
#[derive(Debug, Clone)]
pub struct ScanPage {
/// Name of the collection this page belongs to.
pub collection: String,
/// Entities carried by this page.
pub items: Vec<UnifiedEntity>,
/// Cursor for the following page, if any.
/// NOTE(review): presumably `None` once the scan is exhausted — confirm with the producer.
pub next: Option<ScanCursor>,
/// Total count reported alongside the page.
/// NOTE(review): likely the collection-wide total rather than `items.len()` — confirm.
pub total: usize,
}
/// Basic facts about the host and the current process, gathered by
/// [`SystemInfo::collect`].
#[derive(Debug, Clone)]
pub struct SystemInfo {
    /// Id of the current process.
    pub pid: u32,
    /// Parallelism reported by the OS; `1` when it cannot be determined.
    pub cpu_cores: usize,
    /// `MemTotal` from `/proc/meminfo`, in bytes. `0` on non-Linux targets or
    /// when the value cannot be read.
    pub total_memory_bytes: u64,
    /// `MemAvailable` from `/proc/meminfo`, in bytes. `0` on non-Linux targets
    /// or when the value cannot be read.
    pub available_memory_bytes: u64,
    /// Value of `std::env::consts::OS`.
    pub os: String,
    /// Value of `std::env::consts::ARCH`.
    pub arch: String,
    /// Best-effort host name; `"unknown"` when no source yields one.
    pub hostname: String,
}

impl SystemInfo {
    /// Returns `true` when the OS reports more than one unit of available
    /// parallelism, and `false` when it reports one or the query fails.
    pub fn should_parallelize() -> bool {
        std::thread::available_parallelism().map_or(false, |p| p.get() > 1)
    }

    /// Gathers a fresh [`SystemInfo`] for the current process.
    ///
    /// Never fails: every probe that cannot be answered falls back to a
    /// neutral value (`1` core, `0` bytes, `"unknown"` host name).
    pub fn collect() -> Self {
        Self {
            pid: std::process::id(),
            cpu_cores: std::thread::available_parallelism().map_or(1, |p| p.get()),
            total_memory_bytes: Self::read_total_memory(),
            available_memory_bytes: Self::read_available_memory(),
            os: std::env::consts::OS.to_string(),
            arch: std::env::consts::ARCH.to_string(),
            hostname: Self::detect_hostname(),
        }
    }

    /// Resolves the host name.
    ///
    /// Order of precedence: the `HOSTNAME` environment variable, then
    /// `COMPUTERNAME`, then (Linux only) `/proc/sys/kernel/hostname`, and
    /// finally the literal `"unknown"`. Empty values are skipped, because
    /// `HOSTNAME` is frequently an unexported shell variable and would
    /// otherwise leave most services reporting `"unknown"` or `""`.
    fn detect_hostname() -> String {
        ["HOSTNAME", "COMPUTERNAME"]
            .iter()
            .filter_map(|name| std::env::var(*name).ok())
            .find(|value| !value.is_empty())
            .or_else(Self::read_kernel_hostname)
            .unwrap_or_else(|| "unknown".to_string())
    }

    /// Reads the kernel's host name from procfs; `None` if unreadable or blank.
    #[cfg(target_os = "linux")]
    fn read_kernel_hostname() -> Option<String> {
        std::fs::read_to_string("/proc/sys/kernel/hostname")
            .ok()
            .map(|raw| raw.trim().to_string())
            .filter(|name| !name.is_empty())
    }

    /// No procfs host name source outside Linux.
    #[cfg(not(target_os = "linux"))]
    fn read_kernel_hostname() -> Option<String> {
        None
    }

    /// Looks up `field` (including the trailing colon, e.g. `"MemTotal:"`) in
    /// `/proc/meminfo` and converts its kB value to bytes.
    ///
    /// Returns `0` when the file is unreadable, the field is missing, or the
    /// value does not parse. The conversion saturates instead of overflowing.
    #[cfg(target_os = "linux")]
    fn read_meminfo_bytes(field: &str) -> u64 {
        std::fs::read_to_string("/proc/meminfo")
            .ok()
            .and_then(|contents| {
                contents
                    .lines()
                    .find(|line| line.starts_with(field))
                    .and_then(|line| line.split_whitespace().nth(1))
                    .and_then(|kb| kb.parse::<u64>().ok())
            })
            .map_or(0, |kb| kb.saturating_mul(1024))
    }

    /// Total physical memory in bytes, or `0` when unavailable.
    #[cfg(target_os = "linux")]
    fn read_total_memory() -> u64 {
        Self::read_meminfo_bytes("MemTotal:")
    }

    /// Memory currently available in bytes, or `0` when unavailable.
    #[cfg(target_os = "linux")]
    fn read_available_memory() -> u64 {
        Self::read_meminfo_bytes("MemAvailable:")
    }

    /// Memory probing is only implemented for Linux; other targets report `0`.
    #[cfg(not(target_os = "linux"))]
    fn read_total_memory() -> u64 {
        0
    }

    /// Memory probing is only implemented for Linux; other targets report `0`.
    #[cfg(not(target_os = "linux"))]
    fn read_available_memory() -> u64 {
        0
    }
}
/// Aggregated runtime statistics: pool counters, store stats, host info,
/// blob-cache stats and KV counters.
#[derive(Debug, Clone)]
pub struct RuntimeStats {
/// Number of connections currently counted as active.
pub active_connections: usize,
/// Number of connections currently counted as idle.
pub idle_connections: usize,
/// Running total of connection checkouts.
pub total_checkouts: u64,
/// NOTE(review): presumably whether the store runs in paged mode — confirm with the producer.
pub paged_mode: bool,
/// Runtime start time. NOTE(review): name suggests milliseconds since the Unix epoch — confirm.
pub started_at_unix_ms: u128,
/// Statistics reported by the underlying store.
pub store: StoreStats,
/// Host/process information.
pub system: SystemInfo,
/// Statistics of the result blob cache.
pub result_blob_cache: crate::storage::cache::BlobCacheStats,
/// KV operation counters.
pub kv: KvStats,
}
/// Plain-value copy of the KV counters, as produced by
/// `KvStatsCounters::snapshot`. All fields default to zero.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct KvStats {
/// Count recorded via `incr_puts`.
pub puts: u64,
/// Count recorded via `incr_gets`.
pub gets: u64,
/// Count recorded via `incr_deletes`.
pub deletes: u64,
/// Count recorded via `incr_incrs`.
pub incrs: u64,
/// Count recorded via `incr_cas_success`.
pub cas_success: u64,
/// Count recorded via `incr_cas_conflict`.
pub cas_conflict: u64,
/// Gauge: incremented and decremented as watch streams come and go.
pub watch_streams_active: u64,
/// Count recorded via `incr_watch_events_emitted`.
pub watch_events_emitted: u64,
/// Sum of the amounts passed to `add_watch_drops`.
pub watch_drops: u64,
}
/// Atomic counters backing `KvStats`.
///
/// Each field mirrors the `KvStats` field of the same name; the accessor
/// methods on this type read and update them with relaxed ordering.
#[derive(Debug, Default)]
pub(crate) struct KvStatsCounters {
puts: AtomicU64,
gets: AtomicU64,
deletes: AtomicU64,
incrs: AtomicU64,
cas_success: AtomicU64,
cas_conflict: AtomicU64,
// Gauge rather than a monotonic counter: it is also decremented.
watch_streams_active: AtomicU64,
watch_events_emitted: AtomicU64,
watch_drops: AtomicU64,
}
/// Two-way index between KV keys and their tags, scoped per collection.
///
/// The two maps are guarded by independent locks and are kept in step by
/// `replace` and `remove`.
#[derive(Debug, Default)]
pub(crate) struct KvTagIndex {
// (collection, tag) -> { key -> entity id } for every key carrying that tag.
tag_to_entries: parking_lot::RwLock<HashMap<(String, String), HashMap<String, EntityId>>>,
// (collection, key) -> ordered set of tags currently attached to that key.
key_to_tags: parking_lot::RwLock<HashMap<(String, String), BTreeSet<String>>>,
}
impl KvTagIndex {
pub(crate) fn replace(&self, collection: &str, key: &str, id: EntityId, tags: &[String]) {
let entry_key = (collection.to_string(), key.to_string());
let new_tags: BTreeSet<String> = tags
.iter()
.map(|tag| tag.trim())
.filter(|tag| !tag.is_empty())
.map(ToOwned::to_owned)
.collect();
let old_tags = {
let mut key_to_tags = self.key_to_tags.write();
if new_tags.is_empty() {
key_to_tags.remove(&entry_key)
} else {
key_to_tags.insert(entry_key.clone(), new_tags.clone())
}
};
let mut tag_to_entries = self.tag_to_entries.write();
if let Some(old_tags) = old_tags {
for tag in old_tags {
let scoped = (collection.to_string(), tag);
let remove_scoped = if let Some(entries) = tag_to_entries.get_mut(&scoped) {
entries.remove(key);
entries.is_empty()
} else {
false
};
if remove_scoped {
tag_to_entries.remove(&scoped);
}
}
}
for tag in new_tags {
tag_to_entries
.entry((collection.to_string(), tag))
.or_default()
.insert(key.to_string(), id);
}
}
pub(crate) fn remove(&self, collection: &str, key: &str) {
let entry_key = (collection.to_string(), key.to_string());
let old_tags = self.key_to_tags.write().remove(&entry_key);
let Some(old_tags) = old_tags else {
return;
};
let mut tag_to_entries = self.tag_to_entries.write();
for tag in old_tags {
let scoped = (collection.to_string(), tag);
let remove_scoped = if let Some(entries) = tag_to_entries.get_mut(&scoped) {
entries.remove(key);
entries.is_empty()
} else {
false
};
if remove_scoped {
tag_to_entries.remove(&scoped);
}
}
}
pub(crate) fn entries_for_tags(
&self,
collection: &str,
tags: &[String],
) -> Vec<(String, EntityId)> {
if tags.is_empty() {
return Vec::new();
}
let tag_to_entries = self.tag_to_entries.read();
let mut out: HashMap<String, EntityId> = HashMap::new();
for tag in tags {
let scoped = (collection.to_string(), tag.trim().to_string());
if let Some(entries) = tag_to_entries.get(&scoped) {
for (key, id) in entries {
out.entry(key.clone()).or_insert(*id);
}
}
}
out.into_iter().collect()
}
pub(crate) fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
self.key_to_tags
.read()
.get(&(collection.to_string(), key.to_string()))
.map(|tags| tags.iter().cloned().collect())
.unwrap_or_default()
}
}
impl KvStatsCounters {
pub(crate) fn snapshot(&self) -> KvStats {
KvStats {
puts: self.puts.load(AtomicOrdering::Relaxed),
gets: self.gets.load(AtomicOrdering::Relaxed),
deletes: self.deletes.load(AtomicOrdering::Relaxed),
incrs: self.incrs.load(AtomicOrdering::Relaxed),
cas_success: self.cas_success.load(AtomicOrdering::Relaxed),
cas_conflict: self.cas_conflict.load(AtomicOrdering::Relaxed),
watch_streams_active: self.watch_streams_active.load(AtomicOrdering::Relaxed),
watch_events_emitted: self.watch_events_emitted.load(AtomicOrdering::Relaxed),
watch_drops: self.watch_drops.load(AtomicOrdering::Relaxed),
}
}
pub(crate) fn incr_puts(&self) {
self.puts.fetch_add(1, AtomicOrdering::Relaxed);
}
pub(crate) fn incr_gets(&self) {
self.gets.fetch_add(1, AtomicOrdering::Relaxed);
}
pub(crate) fn incr_deletes(&self) {
self.deletes.fetch_add(1, AtomicOrdering::Relaxed);
}
pub(crate) fn incr_incrs(&self) {
self.incrs.fetch_add(1, AtomicOrdering::Relaxed);
}
pub(crate) fn incr_cas_success(&self) {
self.cas_success.fetch_add(1, AtomicOrdering::Relaxed);
}
pub(crate) fn incr_cas_conflict(&self) {
self.cas_conflict.fetch_add(1, AtomicOrdering::Relaxed);
}
pub(crate) fn incr_watch_streams_active(&self) {
self.watch_streams_active
.fetch_add(1, AtomicOrdering::Relaxed);
}
pub(crate) fn decr_watch_streams_active(&self) {
self.watch_streams_active
.fetch_sub(1, AtomicOrdering::Relaxed);
}
pub(crate) fn incr_watch_events_emitted(&self) {
self.watch_events_emitted
.fetch_add(1, AtomicOrdering::Relaxed);
}
pub(crate) fn add_watch_drops(&self, count: u64) {
self.watch_drops.fetch_add(count, AtomicOrdering::Relaxed);
}
}
/// Outcome of executing one query through the runtime.
#[derive(Debug, Clone)]
pub struct RuntimeQueryResult {
/// Query text this result belongs to.
pub query: String,
/// Query mode the statement was handled under.
pub mode: QueryMode,
/// Statement label. The constructors in this file set it to the same value as `statement_type`.
pub statement: &'static str,
/// Label of the engine/path that produced the result (e.g. `"runtime-ddl"`, `"runtime-meta"`).
pub engine: &'static str,
/// Result rows and column names.
pub result: UnifiedResult,
/// Affected-row count; the message/record constructors in this file set it to `0`.
pub affected_rows: u64,
/// Statement type label supplied by the caller.
pub statement_type: &'static str,
}
impl RuntimeQueryResult {
pub fn dml_result(
query: String,
affected: u64,
statement_type: &'static str,
engine: &'static str,
) -> Self {
Self {
query,
mode: QueryMode::Sql,
statement: statement_type,
engine,
result: UnifiedResult::empty(),
affected_rows: affected,
statement_type,
}
}
pub fn ok_message(query: String, message: &str, statement_type: &'static str) -> Self {
let mut result = UnifiedResult::empty();
let mut record = UnifiedRecord::new();
record.set("message", Value::text(message.to_string()));
result.push(record);
result.columns = vec!["message".to_string()];
Self {
query,
mode: QueryMode::Sql,
statement: statement_type,
engine: "runtime-ddl",
result,
affected_rows: 0,
statement_type,
}
}
pub fn ok_records(
query: String,
columns: Vec<String>,
rows: Vec<Vec<(String, Value)>>,
statement_type: &'static str,
) -> Self {
let mut result = UnifiedResult::empty();
for row in rows {
let mut record = UnifiedRecord::new();
for (k, v) in row {
record.set(&k, v);
}
result.push(record);
}
result.columns = columns;
Self {
query,
mode: QueryMode::Sql,
statement: statement_type,
engine: "runtime-meta",
result,
affected_rows: 0,
statement_type,
}
}
}
/// Explain output for a query: planner cost, estimates and the logical plan.
#[derive(Debug, Clone)]
pub struct RuntimeQueryExplain {
/// Query text that was explained.
pub query: String,
/// Query mode of the statement.
pub mode: QueryMode,
/// Statement label.
pub statement: &'static str,
/// NOTE(review): presumably set when the query targets the universal entity source — confirm with the producer.
pub is_universal: bool,
/// Cost reported by the planner.
pub plan_cost: crate::storage::query::planner::PlanCost,
/// Estimated row count.
pub estimated_rows: f64,
/// Estimated selectivity. NOTE(review): range not visible here (likely 0..=1) — confirm.
pub estimated_selectivity: f64,
/// Confidence attached to the estimates. NOTE(review): scale not visible here — confirm.
pub estimated_confidence: f64,
/// Names of the planner passes that were applied.
pub passes_applied: Vec<String>,
/// Canonical logical plan for the query.
pub logical_plan: CanonicalLogicalPlan,
/// NOTE(review): presumably names of CTEs that get materialized — confirm.
pub cte_materializations: Vec<String>,
}
/// One hit returned by an IVF vector search.
#[derive(Debug, Clone)]
pub struct RuntimeIvfMatch {
/// Id of the matched entity.
pub entity_id: u64,
/// Distance between the query and this match. NOTE(review): metric is not visible here — confirm.
pub distance: f32,
/// The matched entity when it was loaded alongside the hit; `None` otherwise.
pub entity: Option<UnifiedEntity>,
}
/// Result of an IVF vector search over one collection.
#[derive(Debug, Clone)]
pub struct RuntimeIvfSearchResult {
/// Collection that was searched.
pub collection: String,
/// Number of neighbours requested.
pub k: usize,
/// NOTE(review): presumably the number of inverted lists (clusters) in the index — confirm.
pub n_lists: usize,
/// NOTE(review): presumably the number of lists probed per query — confirm.
pub n_probes: usize,
/// Statistics reported by the IVF index.
pub stats: IvfStats,
/// Matches found.
pub matches: Vec<RuntimeIvfMatch>,
}
/// Edge direction selector for graph operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphDirection {
/// Outgoing edges.
Outgoing,
/// Incoming edges.
Incoming,
/// Edges in either direction.
Both,
}
/// Traversal order selector for graph traversals.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphTraversalStrategy {
/// Breadth-first.
Bfs,
/// Depth-first.
Dfs,
}
/// Algorithm selector for path queries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphPathAlgorithm {
/// Breadth-first search.
Bfs,
/// Dijkstra's algorithm.
Dijkstra,
/// A* search.
AStar,
/// Bellman-Ford.
BellmanFord,
}
/// Graph node as exposed in runtime graph results.
#[derive(Debug, Clone)]
pub struct RuntimeGraphNode {
/// Node identifier.
pub id: String,
/// Node label.
pub label: String,
/// Node type name.
pub node_type: String,
/// Number of outgoing edges recorded for the node.
pub out_edge_count: u32,
/// Number of incoming edges recorded for the node.
pub in_edge_count: u32,
}
/// Graph edge as exposed in runtime graph results.
#[derive(Debug, Clone)]
pub struct RuntimeGraphEdge {
/// Identifier of the source node.
pub source: String,
/// Identifier of the target node.
pub target: String,
/// Edge type name.
pub edge_type: String,
/// Edge weight.
pub weight: f32,
}
/// A node together with the depth at which it was reached.
#[derive(Debug, Clone)]
pub struct RuntimeGraphVisit {
/// Depth of the visit. NOTE(review): presumably hops from the source node — confirm.
pub depth: usize,
/// The visited node.
pub node: RuntimeGraphNode,
}
/// Result of a neighborhood query around one source node.
#[derive(Debug, Clone)]
pub struct RuntimeGraphNeighborhoodResult {
/// Identifier of the source node.
pub source: String,
/// Edge direction that was followed.
pub direction: RuntimeGraphDirection,
/// Depth limit that was applied.
pub max_depth: usize,
/// Nodes found, each with its depth.
pub nodes: Vec<RuntimeGraphVisit>,
/// Edges found.
pub edges: Vec<RuntimeGraphEdge>,
}
/// Result of a graph traversal from one source node.
#[derive(Debug, Clone)]
pub struct RuntimeGraphTraversalResult {
/// Identifier of the source node.
pub source: String,
/// Edge direction that was followed.
pub direction: RuntimeGraphDirection,
/// Traversal order that was used.
pub strategy: RuntimeGraphTraversalStrategy,
/// Depth limit that was applied.
pub max_depth: usize,
/// Visited nodes, each with its depth.
pub visits: Vec<RuntimeGraphVisit>,
/// Edges encountered.
pub edges: Vec<RuntimeGraphEdge>,
}
/// A path through the graph, given as its nodes and edges.
#[derive(Debug, Clone)]
pub struct RuntimeGraphPath {
/// Number of hops in the path.
pub hop_count: usize,
/// Accumulated weight of the path.
pub total_weight: f64,
/// Nodes on the path. NOTE(review): presumably in path order — confirm.
pub nodes: Vec<RuntimeGraphNode>,
/// Edges on the path. NOTE(review): presumably in path order — confirm.
pub edges: Vec<RuntimeGraphEdge>,
}
/// Result of a path query between two nodes.
#[derive(Debug, Clone)]
pub struct RuntimeGraphPathResult {
/// Identifier of the start node.
pub source: String,
/// Identifier of the destination node.
pub target: String,
/// Edge direction that was followed.
pub direction: RuntimeGraphDirection,
/// Algorithm that was used.
pub algorithm: RuntimeGraphPathAlgorithm,
/// Number of nodes visited during the search.
pub nodes_visited: usize,
/// Negative-cycle flag when the algorithm reports one.
/// NOTE(review): presumably only populated for Bellman-Ford — confirm.
pub negative_cycle_detected: Option<bool>,
/// The path found, or `None` when there is none.
pub path: Option<RuntimeGraphPath>,
}
/// Selects which kind of components a components query computes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphComponentsMode {
/// Connected components.
Connected,
/// Weakly connected components.
Weak,
/// Strongly connected components.
Strong,
}
/// Algorithm selector for centrality queries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphCentralityAlgorithm {
/// Degree centrality.
Degree,
/// Closeness centrality.
Closeness,
/// Betweenness centrality.
Betweenness,
/// Eigenvector centrality.
Eigenvector,
/// PageRank.
PageRank,
}
/// Algorithm selector for community detection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphCommunityAlgorithm {
/// Label propagation.
LabelPropagation,
/// Louvain method.
Louvain,
}
/// One component of the graph.
#[derive(Debug, Clone)]
pub struct RuntimeGraphComponent {
/// Component identifier.
pub id: String,
/// Size of the component. NOTE(review): presumably equals `nodes.len()` — confirm.
pub size: usize,
/// Identifiers of the member nodes.
pub nodes: Vec<String>,
}
/// Result of a components query.
#[derive(Debug, Clone)]
pub struct RuntimeGraphComponentsResult {
/// Kind of components that was computed.
pub mode: RuntimeGraphComponentsMode,
/// Number of components reported.
pub count: usize,
/// The components.
pub components: Vec<RuntimeGraphComponent>,
}
/// A node paired with a floating-point score.
#[derive(Debug, Clone)]
pub struct RuntimeGraphCentralityScore {
/// The scored node.
pub node: RuntimeGraphNode,
/// Score assigned to the node.
pub score: f64,
}
/// Degree counts for one node.
#[derive(Debug, Clone)]
pub struct RuntimeGraphDegreeScore {
/// The node the counts belong to.
pub node: RuntimeGraphNode,
/// In-degree of the node.
pub in_degree: usize,
/// Out-degree of the node.
pub out_degree: usize,
/// Total degree. NOTE(review): presumably `in_degree + out_degree` — confirm.
pub total_degree: usize,
}
/// Result of a centrality query.
///
/// The optional fields are only meaningful for some algorithms; which ones
/// fill them is not visible in this file.
#[derive(Debug, Clone)]
pub struct RuntimeGraphCentralityResult {
/// Algorithm that was used.
pub algorithm: RuntimeGraphCentralityAlgorithm,
/// Whether scores were normalized, when the algorithm reports it.
pub normalized: Option<bool>,
/// Iteration count, when the algorithm reports it.
pub iterations: Option<usize>,
/// Convergence flag, when the algorithm reports it.
pub converged: Option<bool>,
/// Per-node scores.
pub scores: Vec<RuntimeGraphCentralityScore>,
/// Per-node degree counts. NOTE(review): presumably only filled for `Degree` — confirm.
pub degree_scores: Vec<RuntimeGraphDegreeScore>,
}
/// One detected community.
#[derive(Debug, Clone)]
pub struct RuntimeGraphCommunity {
/// Community identifier.
pub id: String,
/// Size of the community. NOTE(review): presumably equals `nodes.len()` — confirm.
pub size: usize,
/// Identifiers of the member nodes.
pub nodes: Vec<String>,
}
/// Result of a community-detection query.
///
/// The optional fields are only meaningful for some algorithms; which ones
/// fill them is not visible in this file.
#[derive(Debug, Clone)]
pub struct RuntimeGraphCommunityResult {
/// Algorithm that was used.
pub algorithm: RuntimeGraphCommunityAlgorithm,
/// Number of communities reported.
pub count: usize,
/// Iteration count, when the algorithm reports it.
pub iterations: Option<usize>,
/// Convergence flag, when the algorithm reports it.
pub converged: Option<bool>,
/// Modularity of the partition, when the algorithm reports it.
pub modularity: Option<f64>,
/// Number of passes, when the algorithm reports it.
pub passes: Option<usize>,
/// The communities.
pub communities: Vec<RuntimeGraphCommunity>,
}
/// Result of a clustering-coefficient query.
#[derive(Debug, Clone)]
pub struct RuntimeGraphClusteringResult {
/// Global clustering coefficient.
pub global: f64,
/// Per-node (local) coefficients.
pub local: Vec<RuntimeGraphCentralityScore>,
/// Triangle count, when it was computed.
pub triangle_count: Option<usize>,
}
/// Result of a HITS (hubs and authorities) query.
#[derive(Debug, Clone)]
pub struct RuntimeGraphHitsResult {
/// Number of iterations that were run.
pub iterations: usize,
/// Whether the computation converged.
pub converged: bool,
/// Hub scores per node.
pub hubs: Vec<RuntimeGraphCentralityScore>,
/// Authority scores per node.
pub authorities: Vec<RuntimeGraphCentralityScore>,
}
/// Result of a cycle-detection query.
#[derive(Debug, Clone)]
pub struct RuntimeGraphCyclesResult {
/// NOTE(review): presumably set when the search stopped at a caller-supplied cycle limit — confirm.
pub limit_reached: bool,
/// Cycles found, each expressed as a path.
pub cycles: Vec<RuntimeGraphPath>,
}
/// Result of a topological sort.
#[derive(Debug, Clone)]
pub struct RuntimeGraphTopologicalSortResult {
/// Whether the graph was found to be acyclic.
pub acyclic: bool,
/// Nodes in sorted order. NOTE(review): contents when `acyclic` is `false` are not visible here — confirm.
pub ordered_nodes: Vec<RuntimeGraphNode>,
}
/// Structural properties of a graph: counts, connectivity flags and densities.
///
/// NOTE(review): the exact definitions behind the boolean flags (for example
/// how `is_cyclic` differs from `is_circular`) live in the producer and are
/// not visible in this file — confirm before relying on them.
#[derive(Debug, Clone)]
pub struct RuntimeGraphPropertiesResult {
// --- counts ---
pub node_count: usize,
pub edge_count: usize,
pub self_loop_count: usize,
pub negative_edge_count: usize,
pub connected_component_count: usize,
pub weak_component_count: usize,
pub strong_component_count: usize,
// --- boolean classifications ---
pub is_empty: bool,
pub is_connected: bool,
pub is_weakly_connected: bool,
pub is_strongly_connected: bool,
pub is_complete: bool,
pub is_complete_directed: bool,
pub is_cyclic: bool,
pub is_circular: bool,
pub is_acyclic: bool,
pub is_tree: bool,
// --- densities ---
pub density: f64,
pub density_directed: f64,
}
/// Result of a context search, with hits grouped by the kind of data they
/// came from.
#[derive(Debug, Clone)]
pub struct ContextSearchResult {
/// Query text that was searched for.
pub query: String,
/// Hits from tables.
pub tables: Vec<ContextEntity>,
/// Hits from the graph (nodes and edges).
pub graph: ContextGraphResult,
/// Hits from vectors.
pub vectors: Vec<ContextEntity>,
/// Hits from documents.
pub documents: Vec<ContextEntity>,
/// Hits from key-value entries.
pub key_values: Vec<ContextEntity>,
/// Connections between returned entities.
pub connections: Vec<ContextConnection>,
/// Counters and timing for the search.
pub summary: ContextSummary,
}
/// One entity returned by a context search.
#[derive(Debug, Clone)]
pub struct ContextEntity {
/// The entity itself.
pub entity: UnifiedEntity,
/// Relevance score. NOTE(review): scale is not visible here — confirm.
pub score: f32,
/// How the entity was discovered.
pub discovery: DiscoveryMethod,
/// Collection the entity belongs to.
pub collection: String,
}
/// Describes how a context-search entity was discovered.
#[derive(Debug, Clone)]
pub enum DiscoveryMethod {
/// Found through an index lookup.
Indexed {
/// Name of the indexed field.
field: String,
},
/// Found by a global scan.
GlobalScan,
/// Reached by following a cross-reference.
CrossReference {
/// Id of the entity the reference was followed from.
source_id: u64,
/// Reference type name.
ref_type: String,
},
/// Reached by traversing the graph.
GraphTraversal {
/// Id of the entity the traversal started from.
source_id: u64,
/// Type name of the edge that was followed.
edge_type: String,
/// Depth at which the entity was reached.
depth: usize,
},
/// Found by a vector query.
VectorQuery {
/// Similarity reported for the hit. NOTE(review): metric is not visible here — confirm.
similarity: f32,
},
}
/// Graph portion of a context search result.
#[derive(Debug, Clone)]
pub struct ContextGraphResult {
/// Node entities.
pub nodes: Vec<ContextEntity>,
/// Edge entities.
pub edges: Vec<ContextEntity>,
}
/// A link between two entities in a context search result.
#[derive(Debug, Clone)]
pub struct ContextConnection {
/// Id of the entity the connection starts at.
pub from_id: u64,
/// Id of the entity the connection ends at.
pub to_id: u64,
/// Kind of connection.
pub connection_type: ContextConnectionType,
/// Weight of the connection.
pub weight: f32,
}
/// Kind of link described by a `ContextConnection`.
#[derive(Debug, Clone)]
pub enum ContextConnectionType {
/// Cross-reference. NOTE(review): payload is presumably the reference type name — confirm.
CrossRef(String),
/// Graph edge. NOTE(review): payload is presumably the edge type name — confirm.
GraphEdge(String),
/// Vector similarity. NOTE(review): payload is presumably the similarity value — confirm.
VectorSimilarity(f32),
}
/// Counters and timing describing one context search.
#[derive(Debug, Clone)]
pub struct ContextSummary {
/// Total number of entities returned.
pub total_entities: usize,
/// Number of entities matched directly.
pub direct_matches: usize,
/// Number of entities added by graph expansion.
pub expanded_via_graph: usize,
/// Number of entities added by following cross-references.
pub expanded_via_cross_refs: usize,
/// Number of entities added by vector queries.
pub expanded_via_vector_query: usize,
/// Number of collections that were searched.
pub collections_searched: usize,
/// Execution time. NOTE(review): the `_us` suffix suggests microseconds — confirm.
pub execution_time_us: u64,
/// Names of the search tiers that were used.
pub tiers_used: Vec<String>,
/// NOTE(review): presumably the number of entities re-indexed as a side effect of the search — confirm.
pub entities_reindexed: usize,
}
/// Mutable bookkeeping for the connection pool.
struct PoolState {
    /// Next connection id to hand out; starts at `1`.
    next_id: u64,
    /// Count of active connections.
    active: usize,
    /// Ids of idle connections.
    idle: Vec<u64>,
    /// Running total of checkouts.
    total_checkouts: u64,
}

impl Default for PoolState {
    /// Starts with no active or idle connections, zero checkouts, and ids
    /// beginning at `1`.
    fn default() -> Self {
        const FIRST_CONNECTION_ID: u64 = 1;

        PoolState {
            idle: Vec::new(),
            active: 0,
            total_checkouts: 0,
            next_id: FIRST_CONNECTION_ID,
        }
    }
}
/// A cached query result together with its caching metadata.
#[derive(Debug, Clone)]
struct RuntimeResultCacheEntry {
// The cached result.
result: RuntimeQueryResult,
// Monotonic timestamp taken when the entry was cached.
cached_at: std::time::Instant,
// NOTE(review): presumably the collections/tables the result depends on, used for invalidation — confirm.
scopes: HashSet<String>,
}
/// Metric name for the cache shadow-divergence total.
/// NOTE(review): presumably paired with `RuntimeInner::result_cache_shadow_divergences` — confirm.
pub const METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL: &str = "cache_shadow_divergence_total";
/// Namespace string for the ASK answer cache.
pub(crate) const ASK_ANSWER_CACHE_NAMESPACE: &str = "runtime.ask_answer_cache";
/// Shared state behind a runtime: the database handle plus the caches,
/// registries, transaction bookkeeping and operational services that
/// `RedDBRuntime` and `RuntimeConnection` reference through an `Arc`.
///
/// Mutable members carry their own lock or atomic, so the struct is used
/// through shared references.
struct RuntimeInner {
// --- storage handle, layout and indices ---
db: Arc<RedDB>,
layout: PhysicalLayout,
indices: IndexCatalog,
// --- connection pool ---
pool_config: ConnectionPoolConfig,
pool: Mutex<PoolState>,
started_at_unix_ms: u128,
probabilistic: probabilistic_store::ProbabilisticStore,
index_store: index_store::IndexStore,
// --- replication ---
cdc: crate::replication::cdc::CdcBuffer,
backup_scheduler: crate::replication::scheduler::BackupScheduler,
// --- plan and result caches ---
query_cache: parking_lot::RwLock<crate::storage::query::planner::cache::PlanCache>,
// NOTE(review): the map + queue pairs below presumably hold the entries and their eviction order — confirm.
result_cache: parking_lot::RwLock<(
HashMap<String, RuntimeResultCacheEntry>,
std::collections::VecDeque<String>,
)>,
result_blob_cache: crate::storage::cache::BlobCache,
result_blob_entries: parking_lot::RwLock<(
HashMap<String, RuntimeResultCacheEntry>,
std::collections::VecDeque<String>,
)>,
ask_answer_cache_entries:
parking_lot::RwLock<(HashSet<String>, std::collections::VecDeque<String>)>,
result_cache_shadow_divergences: std::sync::atomic::AtomicU64,
ask_daily_spend:
parking_lot::RwLock<HashMap<String, crate::runtime::ai::cost_guard::DailyState>>,
queue_message_locks: parking_lot::RwLock<HashMap<String, Arc<parking_lot::Mutex<()>>>>,
planner_dirty_tables: parking_lot::RwLock<HashSet<String>>,
ec_registry: Arc<crate::ec::config::EcRegistry>,
ec_worker: crate::ec::worker::EcWorker,
// --- authentication ---
auth_store: parking_lot::RwLock<Option<Arc<crate::auth::store::AuthStore>>>,
oauth_validator: parking_lot::RwLock<Option<Arc<crate::auth::oauth::OAuthValidator>>>,
// --- views ---
views: parking_lot::RwLock<HashMap<String, Arc<crate::storage::query::ast::CreateViewQuery>>>,
materialized_views: parking_lot::RwLock<crate::storage::cache::result::MaterializedViewCache>,
// --- transactions and snapshots ---
snapshot_manager: Arc<crate::storage::transaction::snapshot::SnapshotManager>,
// NOTE(review): the `u64` keys of the per-transaction maps below are presumably connection ids — confirm.
tx_contexts:
parking_lot::RwLock<HashMap<u64, crate::storage::transaction::snapshot::TxnContext>>,
lock_manager: Arc<crate::storage::transaction::lock::LockManager>,
env_config_overrides: HashMap<String, String>,
tx_local_tenants: parking_lot::RwLock<HashMap<u64, Option<String>>>,
// --- row-level security and foreign tables ---
rls_policies: parking_lot::RwLock<
HashMap<(String, String), Arc<crate::storage::query::ast::CreatePolicyQuery>>,
>,
rls_enabled_tables: parking_lot::RwLock<HashSet<String>>,
foreign_tables: Arc<crate::storage::fdw::ForeignTableRegistry>,
// --- work deferred per transaction ---
pending_tombstones: parking_lot::RwLock<
HashMap<
u64,
Vec<(
String,
crate::storage::unified::entity::EntityId,
crate::storage::transaction::snapshot::Xid,
crate::storage::transaction::snapshot::Xid,
)>,
>,
>,
pending_versioned_updates: parking_lot::RwLock<
HashMap<
u64,
Vec<(
String,
crate::storage::unified::entity::EntityId,
crate::storage::unified::entity::EntityId,
crate::storage::transaction::snapshot::Xid,
crate::storage::transaction::snapshot::Xid,
)>,
>,
>,
pending_kv_watch_events:
parking_lot::RwLock<HashMap<u64, Vec<crate::replication::cdc::KvWatchEvent>>>,
pending_store_wal_actions:
parking_lot::RwLock<HashMap<u64, crate::storage::unified::DeferredStoreWalActions>>,
tenant_tables: parking_lot::RwLock<HashMap<String, String>>,
ddl_epoch: std::sync::atomic::AtomicU64,
// --- operational services ---
write_gate: Arc<crate::runtime::write_gate::WriteGate>,
lifecycle: crate::runtime::lifecycle::Lifecycle,
resource_limits: crate::runtime::resource_limits::ResourceLimits,
audit_log: Arc<crate::runtime::audit_log::AuditLogger>,
// Set at most once (`OnceLock`).
lease_lifecycle: std::sync::OnceLock<Arc<crate::runtime::lease_lifecycle::LeaseLifecycle>>,
replica_apply_metrics: crate::replication::logical::ReplicaApplyMetrics,
quota_bucket: crate::runtime::quota_bucket::QuotaBucket,
schema_vocabulary: parking_lot::RwLock<crate::runtime::schema_vocabulary::SchemaVocabulary>,
slow_query_logger: Arc<crate::telemetry::slow_query_logger::SlowQueryLogger>,
// --- KV counters and tag index ---
kv_stats: KvStatsCounters,
kv_tag_index: KvTagIndex,
}
/// Handle to the runtime.
///
/// Cloning is cheap: every clone shares the same `RuntimeInner` through an
/// `Arc`.
#[derive(Clone)]
pub struct RedDBRuntime {
inner: Arc<RuntimeInner>,
}
/// A connection handle: an id plus a shared reference to the runtime state.
pub struct RuntimeConnection {
// NOTE(review): presumably allocated from `PoolState::next_id` — confirm with the checkout code.
id: u64,
inner: Arc<RuntimeInner>,
}
pub mod ai;
pub mod ask_pipeline;
pub mod audit_log;
pub mod audit_query;
pub mod authorized_search;
mod collection_contract;
pub mod config_matrix;
pub mod config_overlay;
pub mod config_watcher;
pub(crate) mod ddl;
pub mod disk_space_monitor;
mod dml_target_scan;
mod expr_eval;
mod graph_dsl;
mod health_connection;
mod impl_config;
pub(crate) mod impl_core;
mod impl_ddl;
mod impl_dml;
mod impl_ec;
mod impl_events;
mod impl_graph;
mod impl_graph_commands;
pub mod impl_kv;
mod impl_migrations;
mod impl_native;
mod impl_physical;
mod impl_probabilistic;
pub mod impl_queue;
mod impl_search;
mod impl_timeseries;
mod impl_tree;
mod impl_vcs;
mod index_store;
mod join_filter;
mod keyed_spine;
pub mod kv_watch;
pub mod lease_lifecycle;
pub mod lease_loop;
pub mod lease_timer_wheel;
pub mod lifecycle;
pub mod locking;
pub(crate) mod mutation;
mod probabilistic_store;
pub(crate) mod query_exec;
mod queue_delivery;
pub mod quota_bucket;
mod record_search;
mod red_schema;
pub mod resource_limits;
pub(crate) mod scalar_evaluator;
pub mod schema_diff;
pub mod schema_vocabulary;
pub mod snapshot_reuse;
mod statement_frame;
mod vector_index;
pub mod within_clause;
pub mod write_gate;
pub use self::graph_dsl::*;
use self::join_filter::*;
use self::query_exec::*;
use self::record_search::*;
pub use self::statement_frame::EffectiveScope;
/// Public facade over `impl_core`, which is only `pub(crate)`.
///
/// Re-exports the snapshot, tenant, connection-id and auth-identity helpers
/// so they are reachable through a public path.
pub mod mvcc {
pub use super::impl_core::{
capture_current_snapshot, clear_current_auth_identity, clear_current_connection_id,
clear_current_snapshot, clear_current_tenant, current_connection_id, current_tenant,
entity_visible_under_current_snapshot, entity_visible_with_context,
set_current_auth_identity, set_current_connection_id, set_current_snapshot,
set_current_tenant, snapshot_bundle, with_snapshot_bundle, SnapshotBundle, SnapshotContext,
};
}
/// Public wrappers around helpers that live in the private `record_search`
/// module.
pub mod record_search_helpers {
use crate::storage::query::UnifiedRecord;
use crate::storage::UnifiedEntity;
use std::collections::BTreeSet;
/// Returns the entity's type label and its set of capability names.
///
/// Forwards to `record_search::runtime_entity_type_and_capabilities`.
pub fn entity_type_and_capabilities(
entity: &UnifiedEntity,
) -> (&'static str, BTreeSet<String>) {
super::record_search::runtime_entity_type_and_capabilities(entity)
}
/// Converts an entity into a `UnifiedRecord`, or `None` when the underlying
/// helper yields no record for it.
///
/// Forwards to `record_search::runtime_any_record_from_entity`.
pub fn any_record_from_entity(entity: UnifiedEntity) -> Option<UnifiedRecord> {
super::record_search::runtime_any_record_from_entity(entity)
}
}