Skip to main content

reddb_server/
runtime.rs

//! Embedded runtime: connection pooling, collection scans, and health reporting.
2
3use std::cmp::Ordering;
4use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, VecDeque};
5use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
6use std::sync::{Arc, Mutex};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use crate::api::{RedDBError, RedDBOptions, RedDBResult};
10use crate::catalog::{
11    CatalogAnalyticsJobStatus, CatalogAttentionSummary, CatalogGraphProjectionStatus,
12    CatalogIndexStatus, CatalogModelSnapshot, CollectionDescriptor,
13};
14use crate::health::{HealthProvider, HealthReport};
15use crate::index::IndexCatalog;
16use crate::physical::{
17    ExportDescriptor, ManifestEvent, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalLayout,
18    SnapshotDescriptor,
19};
20use crate::serde_json::Value as JsonValue;
21use crate::storage::engine::pathfinding::{AStar, BellmanFord, Dijkstra, BFS, DFS};
22use crate::storage::engine::{
23    BetweennessCentrality, ClosenessCentrality, ClusteringCoefficient, ConnectedComponents,
24    CycleDetector, DegreeCentrality, EigenvectorCentrality, GraphStore, IvfConfig, IvfIndex,
25    IvfStats, LabelPropagation, Louvain, MetadataEntry, MetadataFilter as VectorMetadataFilter,
26    MetadataValue as VectorMetadataValue, PageRank, PersonalizedPageRank, PhysicalFileHeader,
27    StoredNode, StronglyConnectedComponents, WeaklyConnectedComponents, HITS,
28};
29use crate::storage::query::ast::{
30    AlterOperation, AlterQueueQuery, AlterTableQuery, CompareOp, CreateCollectionQuery,
31    CreateIndexQuery, CreateQueueQuery, CreateTableQuery, CreateTimeSeriesQuery, CreateTreeQuery,
32    CreateVectorQuery, DeleteQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery,
33    DropIndexQuery, DropKvQuery, DropQueueQuery, DropTableQuery, DropTimeSeriesQuery,
34    DropTreeQuery, DropVectorQuery, EventsBackfillQuery, ExplainAlterQuery, ExplainFormat,
35    FieldRef, Filter, FusionStrategy, GraphCommand, HybridQuery, IndexMethod, InsertEntityType,
36    InsertQuery, JoinQuery, JoinType, OrderByClause, ProbabilisticCommand, Projection, QueryExpr,
37    QueueCommand, QueueSelectQuery, QueueSide, SearchCommand, TableQuery, TreeCommand,
38    TruncateQuery, UpdateQuery, VectorQuery, VectorSource,
39};
40use crate::storage::query::is_universal_entity_source as is_universal_query_source;
41use crate::storage::query::modes::{detect_mode, parse_multi, QueryMode};
42use crate::storage::query::planner::{
43    CanonicalLogicalPlan, CanonicalPlanner, CostEstimator, QueryPlanner,
44};
45use crate::storage::query::unified::{UnifiedRecord, UnifiedResult};
46use crate::storage::schema::Value;
47use crate::storage::unified::dsl::{
48    apply_filters, cosine_similarity, Filter as DslFilter, FilterOp as DslFilterOp,
49    FilterValue as DslFilterValue, GraphPatternDsl, HybridQueryBuilder, MatchComponents,
50    QueryResult as DslQueryResult, ScoredMatch, TextSearchBuilder,
51};
52use crate::storage::unified::store::{
53    NativeCatalogSummary, NativeManifestSummary, NativePhysicalState, NativeRecoverySummary,
54    NativeRegistrySummary,
55};
56use crate::storage::unified::{
57    Metadata, MetadataValue as UnifiedMetadataValue, RefTarget, UnifiedMetadataFilter,
58};
59use crate::storage::{
60    EntityData, EntityId, EntityKind, RedDB, RefType, SimilarResult, StoreStats, UnifiedEntity,
61    UnifiedStore,
62};
63
/// Sizing limits for the runtime's connection pool.
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
    /// Configured ceiling on pooled connections.
    pub max_connections: usize,
    /// Configured ceiling on connections kept idle.
    pub max_idle: usize,
}

impl Default for ConnectionPoolConfig {
    /// 64 connections in total, at most 16 of them idle.
    fn default() -> Self {
        const DEFAULT_MAX_CONNECTIONS: usize = 64;
        const DEFAULT_MAX_IDLE: usize = 16;

        ConnectionPoolConfig {
            max_connections: DEFAULT_MAX_CONNECTIONS,
            max_idle: DEFAULT_MAX_IDLE,
        }
    }
}
78
/// Resumption point for a paginated collection scan.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ScanCursor {
    /// Position to resume from (presumably the count of items already
    /// returned — confirm against the scan implementation).
    pub offset: usize,
}
83
84#[derive(Debug, Clone)]
85pub struct ScanPage {
86    pub collection: String,
87    pub items: Vec<UnifiedEntity>,
88    pub next: Option<ScanCursor>,
89    pub total: usize,
90}
91
/// Snapshot of host and process facts reported alongside runtime stats.
#[derive(Debug, Clone)]
pub struct SystemInfo {
    /// Id of the current process.
    pub pid: u32,
    /// `std::thread::available_parallelism()`, or 1 when it cannot be determined.
    pub cpu_cores: usize,
    /// `MemTotal` from `/proc/meminfo` in bytes; 0 when unavailable or not on Linux.
    pub total_memory_bytes: u64,
    /// `MemAvailable` from `/proc/meminfo` in bytes; 0 when unavailable or not on Linux.
    pub available_memory_bytes: u64,
    /// Target OS name (`std::env::consts::OS`).
    pub os: String,
    /// Target CPU architecture (`std::env::consts::ARCH`).
    pub arch: String,
    /// `$HOSTNAME`, falling back to `$COMPUTERNAME`, then `"unknown"`.
    pub hostname: String,
}

impl SystemInfo {
    /// Whether the system has enough cores to benefit from parallelism.
    /// Returns false on single-core machines (or when the core count cannot
    /// be determined), where thread overhead outweighs the gains.
    pub fn should_parallelize() -> bool {
        std::thread::available_parallelism()
            .map(|p| p.get() > 1)
            .unwrap_or(false)
    }

    /// Collect a fresh snapshot of process and host information.
    pub fn collect() -> Self {
        Self {
            pid: std::process::id(),
            cpu_cores: std::thread::available_parallelism()
                .map(|p| p.get())
                .unwrap_or(1),
            total_memory_bytes: Self::read_total_memory(),
            available_memory_bytes: Self::read_available_memory(),
            os: std::env::consts::OS.to_string(),
            arch: std::env::consts::ARCH.to_string(),
            hostname: std::env::var("HOSTNAME")
                .or_else(|_| std::env::var("COMPUTERNAME"))
                .unwrap_or_else(|_| "unknown".to_string()),
        }
    }

    /// Parse a `/proc/meminfo`-style document and return the value on the
    /// first line starting with `key` (e.g. `"MemTotal:"`), converted from kB
    /// to bytes.
    ///
    /// Returns `None` when no line starts with `key` or its value is not an
    /// unsigned integer. The kB→bytes multiplication saturates at `u64::MAX`
    /// instead of wrapping (release) or panicking (debug).
    // Only called from the Linux readers; still compiled elsewhere so it can
    // be unit-tested on any host.
    #[cfg_attr(not(target_os = "linux"), allow(dead_code))]
    fn parse_meminfo_bytes(contents: &str, key: &str) -> Option<u64> {
        let line = contents.lines().find(|line| line.starts_with(key))?;
        let kib = line.split_whitespace().nth(1)?.parse::<u64>().ok()?;
        Some(kib.saturating_mul(1024))
    }

    /// Read one `/proc/meminfo` field in bytes, or 0 if it cannot be read.
    #[cfg(target_os = "linux")]
    fn read_meminfo_bytes(key: &str) -> u64 {
        std::fs::read_to_string("/proc/meminfo")
            .ok()
            .and_then(|contents| Self::parse_meminfo_bytes(&contents, key))
            .unwrap_or(0)
    }

    #[cfg(target_os = "linux")]
    fn read_total_memory() -> u64 {
        Self::read_meminfo_bytes("MemTotal:")
    }

    #[cfg(target_os = "linux")]
    fn read_available_memory() -> u64 {
        Self::read_meminfo_bytes("MemAvailable:")
    }

    #[cfg(not(target_os = "linux"))]
    fn read_total_memory() -> u64 {
        0
    }

    #[cfg(not(target_os = "linux"))]
    fn read_available_memory() -> u64 {
        0
    }
}
172
173#[derive(Debug, Clone)]
174pub struct RuntimeStats {
175    pub active_connections: usize,
176    pub idle_connections: usize,
177    pub total_checkouts: u64,
178    pub paged_mode: bool,
179    pub started_at_unix_ms: u128,
180    pub store: StoreStats,
181    pub system: SystemInfo,
182    pub result_blob_cache: crate::storage::cache::BlobCacheStats,
183    pub kv: KvStats,
184    pub metrics_ingest: MetricsIngestStats,
185}
186
/// Number of recorded operations for one `(tenant, namespace, operation)` triple.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsTenantActivityStats {
    pub tenant: String,
    pub namespace: String,
    pub operation: String,
    pub count: u64,
}

/// Point-in-time totals read from [`MetricsIngestCounters`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct MetricsIngestStats {
    pub samples_accepted: u64,
    pub series_accepted: u64,
    pub samples_rejected: u64,
    pub series_rejected: u64,
    /// Series rejections attributed to the cardinality budget; tracked
    /// separately from `series_rejected`.
    pub series_rejected_cardinality_budget: u64,
}

/// Lock-free accumulators behind [`MetricsIngestStats`].
#[derive(Debug, Default)]
pub(crate) struct MetricsIngestCounters {
    samples_accepted: AtomicU64,
    series_accepted: AtomicU64,
    samples_rejected: AtomicU64,
    series_rejected: AtomicU64,
    series_rejected_cardinality_budget: AtomicU64,
}

impl MetricsIngestCounters {
    /// Add one batch's accepted/rejected sample and series counts.
    pub(crate) fn record(
        &self,
        accepted_samples: u64,
        accepted_series: u64,
        rejected_samples: u64,
        rejected_series: u64,
    ) {
        let deltas = [
            (&self.samples_accepted, accepted_samples),
            (&self.series_accepted, accepted_series),
            (&self.samples_rejected, rejected_samples),
            (&self.series_rejected, rejected_series),
        ];
        for (counter, delta) in deltas {
            counter.fetch_add(delta, AtomicOrdering::Relaxed);
        }
    }

    /// Add `rejected_series` to the cardinality-budget rejection counter.
    pub(crate) fn record_cardinality_budget_rejections(&self, rejected_series: u64) {
        self.series_rejected_cardinality_budget
            .fetch_add(rejected_series, AtomicOrdering::Relaxed);
    }

    /// Copy the counters into a plain value.
    ///
    /// Each field is loaded independently with `Relaxed` ordering, so the
    /// result is not an atomic cut across fields.
    pub(crate) fn snapshot(&self) -> MetricsIngestStats {
        let read = |counter: &AtomicU64| counter.load(AtomicOrdering::Relaxed);
        MetricsIngestStats {
            samples_accepted: read(&self.samples_accepted),
            series_accepted: read(&self.series_accepted),
            samples_rejected: read(&self.samples_rejected),
            series_rejected: read(&self.series_rejected),
            series_rejected_cardinality_budget: read(&self.series_rejected_cardinality_budget),
        }
    }
}

/// Mutex-guarded counts keyed by `(tenant, namespace, operation)`.
#[derive(Debug, Default)]
pub(crate) struct MetricsTenantActivityCounters {
    inner: Mutex<BTreeMap<(String, String, String), u64>>,
}

impl MetricsTenantActivityCounters {
    /// Lock the map, recovering the data if a previous holder panicked.
    fn lock_inner(&self) -> std::sync::MutexGuard<'_, BTreeMap<(String, String, String), u64>> {
        self.inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Count one occurrence of `operation` for `tenant` in `namespace`.
    pub(crate) fn record(&self, tenant: &str, namespace: &str, operation: &str) {
        let key = (
            tenant.to_owned(),
            namespace.to_owned(),
            operation.to_owned(),
        );
        let mut counts = self.lock_inner();
        *counts.entry(key).or_default() += 1;
    }

    /// All recorded triples with their counts, in ascending key order.
    pub(crate) fn snapshot(&self) -> Vec<MetricsTenantActivityStats> {
        let counts = self.lock_inner();
        let mut rows = Vec::with_capacity(counts.len());
        for ((tenant, namespace, operation), &count) in counts.iter() {
            rows.push(MetricsTenantActivityStats {
                tenant: tenant.clone(),
                namespace: namespace.clone(),
                operation: operation.clone(),
                count,
            });
        }
        rows
    }
}
286
/// Point-in-time copy of the KV operation counters.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct KvStats {
    pub puts: u64,
    pub gets: u64,
    pub deletes: u64,
    pub incrs: u64,
    pub cas_success: u64,
    pub cas_conflict: u64,
    /// Gauge (incremented and decremented), unlike the other monotonic fields.
    pub watch_streams_active: u64,
    pub watch_events_emitted: u64,
    pub watch_drops: u64,
}
299
/// Atomic accumulators behind [`KvStats`]; every field starts at zero.
#[derive(Debug, Default)]
pub(crate) struct KvStatsCounters {
    puts: AtomicU64,
    gets: AtomicU64,
    deletes: AtomicU64,
    incrs: AtomicU64,
    cas_success: AtomicU64,
    cas_conflict: AtomicU64,
    // Gauge: incremented when a watch stream starts, decremented when it ends.
    watch_streams_active: AtomicU64,
    watch_events_emitted: AtomicU64,
    watch_drops: AtomicU64,
}
312
313#[derive(Debug, Default)]
314pub(crate) struct KvTagIndex {
315    tag_to_entries: parking_lot::RwLock<HashMap<(String, String), HashMap<String, EntityId>>>,
316    key_to_tags: parking_lot::RwLock<HashMap<(String, String), BTreeSet<String>>>,
317}
318
319impl KvTagIndex {
320    pub(crate) fn replace(&self, collection: &str, key: &str, id: EntityId, tags: &[String]) {
321        let entry_key = (collection.to_string(), key.to_string());
322        let new_tags: BTreeSet<String> = tags
323            .iter()
324            .map(|tag| tag.trim())
325            .filter(|tag| !tag.is_empty())
326            .map(ToOwned::to_owned)
327            .collect();
328
329        let old_tags = {
330            let mut key_to_tags = self.key_to_tags.write();
331            if new_tags.is_empty() {
332                key_to_tags.remove(&entry_key)
333            } else {
334                key_to_tags.insert(entry_key.clone(), new_tags.clone())
335            }
336        };
337
338        let mut tag_to_entries = self.tag_to_entries.write();
339        if let Some(old_tags) = old_tags {
340            for tag in old_tags {
341                let scoped = (collection.to_string(), tag);
342                let remove_scoped = if let Some(entries) = tag_to_entries.get_mut(&scoped) {
343                    entries.remove(key);
344                    entries.is_empty()
345                } else {
346                    false
347                };
348                if remove_scoped {
349                    tag_to_entries.remove(&scoped);
350                }
351            }
352        }
353
354        for tag in new_tags {
355            tag_to_entries
356                .entry((collection.to_string(), tag))
357                .or_default()
358                .insert(key.to_string(), id);
359        }
360    }
361
362    pub(crate) fn remove(&self, collection: &str, key: &str) {
363        let entry_key = (collection.to_string(), key.to_string());
364        let old_tags = self.key_to_tags.write().remove(&entry_key);
365        let Some(old_tags) = old_tags else {
366            return;
367        };
368
369        let mut tag_to_entries = self.tag_to_entries.write();
370        for tag in old_tags {
371            let scoped = (collection.to_string(), tag);
372            let remove_scoped = if let Some(entries) = tag_to_entries.get_mut(&scoped) {
373                entries.remove(key);
374                entries.is_empty()
375            } else {
376                false
377            };
378            if remove_scoped {
379                tag_to_entries.remove(&scoped);
380            }
381        }
382    }
383
384    pub(crate) fn entries_for_tags(
385        &self,
386        collection: &str,
387        tags: &[String],
388    ) -> Vec<(String, EntityId)> {
389        if tags.is_empty() {
390            return Vec::new();
391        }
392
393        let tag_to_entries = self.tag_to_entries.read();
394        let mut out: HashMap<String, EntityId> = HashMap::new();
395        for tag in tags {
396            let scoped = (collection.to_string(), tag.trim().to_string());
397            if let Some(entries) = tag_to_entries.get(&scoped) {
398                for (key, id) in entries {
399                    out.entry(key.clone()).or_insert(*id);
400                }
401            }
402        }
403        out.into_iter().collect()
404    }
405
406    pub(crate) fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
407        self.key_to_tags
408            .read()
409            .get(&(collection.to_string(), key.to_string()))
410            .map(|tags| tags.iter().cloned().collect())
411            .unwrap_or_default()
412    }
413}
414
415impl KvStatsCounters {
416    pub(crate) fn snapshot(&self) -> KvStats {
417        KvStats {
418            puts: self.puts.load(AtomicOrdering::Relaxed),
419            gets: self.gets.load(AtomicOrdering::Relaxed),
420            deletes: self.deletes.load(AtomicOrdering::Relaxed),
421            incrs: self.incrs.load(AtomicOrdering::Relaxed),
422            cas_success: self.cas_success.load(AtomicOrdering::Relaxed),
423            cas_conflict: self.cas_conflict.load(AtomicOrdering::Relaxed),
424            watch_streams_active: self.watch_streams_active.load(AtomicOrdering::Relaxed),
425            watch_events_emitted: self.watch_events_emitted.load(AtomicOrdering::Relaxed),
426            watch_drops: self.watch_drops.load(AtomicOrdering::Relaxed),
427        }
428    }
429
430    pub(crate) fn incr_puts(&self) {
431        self.puts.fetch_add(1, AtomicOrdering::Relaxed);
432    }
433
434    pub(crate) fn incr_gets(&self) {
435        self.gets.fetch_add(1, AtomicOrdering::Relaxed);
436    }
437
438    pub(crate) fn incr_deletes(&self) {
439        self.deletes.fetch_add(1, AtomicOrdering::Relaxed);
440    }
441
442    pub(crate) fn incr_incrs(&self) {
443        self.incrs.fetch_add(1, AtomicOrdering::Relaxed);
444    }
445
446    pub(crate) fn incr_cas_success(&self) {
447        self.cas_success.fetch_add(1, AtomicOrdering::Relaxed);
448    }
449
450    pub(crate) fn incr_cas_conflict(&self) {
451        self.cas_conflict.fetch_add(1, AtomicOrdering::Relaxed);
452    }
453
454    pub(crate) fn incr_watch_streams_active(&self) {
455        self.watch_streams_active
456            .fetch_add(1, AtomicOrdering::Relaxed);
457    }
458
459    pub(crate) fn decr_watch_streams_active(&self) {
460        self.watch_streams_active
461            .fetch_sub(1, AtomicOrdering::Relaxed);
462    }
463
464    pub(crate) fn incr_watch_events_emitted(&self) {
465        self.watch_events_emitted
466            .fetch_add(1, AtomicOrdering::Relaxed);
467    }
468
469    pub(crate) fn add_watch_drops(&self, count: u64) {
470        self.watch_drops.fetch_add(count, AtomicOrdering::Relaxed);
471    }
472}
473
474#[derive(Debug, Clone)]
475pub struct RuntimeQueryResult {
476    pub query: String,
477    pub mode: QueryMode,
478    pub statement: &'static str,
479    pub engine: &'static str,
480    pub result: UnifiedResult,
481    pub affected_rows: u64,
482    /// High-level statement type: "select", "insert", "update", "delete", "create", "drop", "alter"
483    pub statement_type: &'static str,
484}
485
486impl RuntimeQueryResult {
487    /// Construct a result representing a DML operation (insert/update/delete).
488    pub fn dml_result(
489        query: String,
490        affected: u64,
491        statement_type: &'static str,
492        engine: &'static str,
493    ) -> Self {
494        Self {
495            query,
496            mode: QueryMode::Sql,
497            statement: statement_type,
498            engine,
499            result: UnifiedResult::empty(),
500            affected_rows: affected,
501            statement_type,
502        }
503    }
504
505    /// Construct a result representing a DDL message (create/drop/alter).
506    pub fn ok_message(query: String, message: &str, statement_type: &'static str) -> Self {
507        let mut result = UnifiedResult::empty();
508        let mut record = UnifiedRecord::new();
509        record.set("message", Value::text(message.to_string()));
510        result.push(record);
511        result.columns = vec!["message".to_string()];
512
513        Self {
514            query,
515            mode: QueryMode::Sql,
516            statement: statement_type,
517            engine: "runtime-ddl",
518            result,
519            affected_rows: 0,
520            statement_type,
521        }
522    }
523
524    /// Construct a multi-column record result for read-only meta commands
525    /// (EXPLAIN ALTER, schema introspection, etc.). Each row is a Vec of
526    /// (column_name, value) pairs in column order.
527    pub fn ok_records(
528        query: String,
529        columns: Vec<String>,
530        rows: Vec<Vec<(String, Value)>>,
531        statement_type: &'static str,
532    ) -> Self {
533        let mut result = UnifiedResult::empty();
534        for row in rows {
535            let mut record = UnifiedRecord::new();
536            for (k, v) in row {
537                record.set(&k, v);
538            }
539            result.push(record);
540        }
541        result.columns = columns;
542
543        Self {
544            query,
545            mode: QueryMode::Sql,
546            statement: statement_type,
547            engine: "runtime-meta",
548            result,
549            affected_rows: 0,
550            statement_type,
551        }
552    }
553}
554
555#[derive(Debug, Clone)]
556pub struct RuntimeQueryExplain {
557    pub query: String,
558    pub mode: QueryMode,
559    pub statement: &'static str,
560    pub is_universal: bool,
561    pub plan_cost: crate::storage::query::planner::PlanCost,
562    pub estimated_rows: f64,
563    pub estimated_selectivity: f64,
564    pub estimated_confidence: f64,
565    pub passes_applied: Vec<String>,
566    pub logical_plan: CanonicalLogicalPlan,
567    /// Names of any CTEs declared by a leading `WITH` clause. Empty
568    /// for non-CTE queries. The plan tree is built against the
569    /// post-inlining body, so each CTE's body is reachable inside
570    /// `logical_plan` as a regular `Subquery` (or, for bare refs, the
571    /// inlined Table node verbatim). This list lets renderers prepend
572    /// `CteScan` markers so operators see which CTEs were resolved.
573    pub cte_materializations: Vec<String>,
574}
575
576#[derive(Debug, Clone)]
577pub struct RuntimeIvfMatch {
578    pub entity_id: u64,
579    pub distance: f32,
580    pub entity: Option<UnifiedEntity>,
581}
582
583#[derive(Debug, Clone)]
584pub struct RuntimeIvfSearchResult {
585    pub collection: String,
586    pub k: usize,
587    pub n_lists: usize,
588    pub n_probes: usize,
589    pub stats: IvfStats,
590    pub matches: Vec<RuntimeIvfMatch>,
591}
592
/// Which edges to follow from a node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphDirection {
    Outgoing,
    Incoming,
    Both,
}

/// Traversal order: breadth-first or depth-first.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphTraversalStrategy {
    Bfs,
    Dfs,
}

/// Shortest-path algorithm selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphPathAlgorithm {
    Bfs,
    Dijkstra,
    AStar,
    BellmanFord,
}
613
/// A graph node as reported by runtime graph APIs.
#[derive(Debug, Clone)]
pub struct RuntimeGraphNode {
    pub id: String,
    pub label: String,
    pub node_type: String,
    pub out_edge_count: u32,
    pub in_edge_count: u32,
}

/// A directed, typed, weighted edge between two node ids.
#[derive(Debug, Clone)]
pub struct RuntimeGraphEdge {
    pub source: String,
    pub target: String,
    pub edge_type: String,
    pub weight: f32,
}

/// A node reached during a traversal, tagged with the depth it was found at.
#[derive(Debug, Clone)]
pub struct RuntimeGraphVisit {
    pub depth: usize,
    pub node: RuntimeGraphNode,
}
636
637#[derive(Debug, Clone)]
638pub struct RuntimeGraphNeighborhoodResult {
639    pub source: String,
640    pub direction: RuntimeGraphDirection,
641    pub max_depth: usize,
642    pub nodes: Vec<RuntimeGraphVisit>,
643    pub edges: Vec<RuntimeGraphEdge>,
644}
645
646#[derive(Debug, Clone)]
647pub struct RuntimeGraphTraversalResult {
648    pub source: String,
649    pub direction: RuntimeGraphDirection,
650    pub strategy: RuntimeGraphTraversalStrategy,
651    pub max_depth: usize,
652    pub visits: Vec<RuntimeGraphVisit>,
653    pub edges: Vec<RuntimeGraphEdge>,
654}
655
656#[derive(Debug, Clone)]
657pub struct RuntimeGraphPath {
658    pub hop_count: usize,
659    pub total_weight: f64,
660    pub nodes: Vec<RuntimeGraphNode>,
661    pub edges: Vec<RuntimeGraphEdge>,
662}
663
664#[derive(Debug, Clone)]
665pub struct RuntimeGraphPathResult {
666    pub source: String,
667    pub target: String,
668    pub direction: RuntimeGraphDirection,
669    pub algorithm: RuntimeGraphPathAlgorithm,
670    pub nodes_visited: usize,
671    pub negative_cycle_detected: Option<bool>,
672    pub path: Option<RuntimeGraphPath>,
673}
674
/// Kind of connectivity used when grouping nodes into components.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphComponentsMode {
    Connected,
    Weak,
    Strong,
}

/// Centrality algorithm selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphCentralityAlgorithm {
    Degree,
    Closeness,
    Betweenness,
    Eigenvector,
    PageRank,
}

/// Community-detection algorithm selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphCommunityAlgorithm {
    LabelPropagation,
    Louvain,
}
696
697#[derive(Debug, Clone)]
698pub struct RuntimeGraphComponent {
699    pub id: String,
700    pub size: usize,
701    pub nodes: Vec<String>,
702}
703
704#[derive(Debug, Clone)]
705pub struct RuntimeGraphComponentsResult {
706    pub mode: RuntimeGraphComponentsMode,
707    pub count: usize,
708    pub components: Vec<RuntimeGraphComponent>,
709}
710
711#[derive(Debug, Clone)]
712pub struct RuntimeGraphCentralityScore {
713    pub node: RuntimeGraphNode,
714    pub score: f64,
715}
716
717#[derive(Debug, Clone)]
718pub struct RuntimeGraphDegreeScore {
719    pub node: RuntimeGraphNode,
720    pub in_degree: usize,
721    pub out_degree: usize,
722    pub total_degree: usize,
723}
724
725#[derive(Debug, Clone)]
726pub struct RuntimeGraphCentralityResult {
727    pub algorithm: RuntimeGraphCentralityAlgorithm,
728    pub normalized: Option<bool>,
729    pub iterations: Option<usize>,
730    pub converged: Option<bool>,
731    pub scores: Vec<RuntimeGraphCentralityScore>,
732    pub degree_scores: Vec<RuntimeGraphDegreeScore>,
733}
734
735#[derive(Debug, Clone)]
736pub struct RuntimeGraphCommunity {
737    pub id: String,
738    pub size: usize,
739    pub nodes: Vec<String>,
740}
741
742#[derive(Debug, Clone)]
743pub struct RuntimeGraphCommunityResult {
744    pub algorithm: RuntimeGraphCommunityAlgorithm,
745    pub count: usize,
746    pub iterations: Option<usize>,
747    pub converged: Option<bool>,
748    pub modularity: Option<f64>,
749    pub passes: Option<usize>,
750    pub communities: Vec<RuntimeGraphCommunity>,
751}
752
753#[derive(Debug, Clone)]
754pub struct RuntimeGraphClusteringResult {
755    pub global: f64,
756    pub local: Vec<RuntimeGraphCentralityScore>,
757    pub triangle_count: Option<usize>,
758}
759
760#[derive(Debug, Clone)]
761pub struct RuntimeGraphHitsResult {
762    pub iterations: usize,
763    pub converged: bool,
764    pub hubs: Vec<RuntimeGraphCentralityScore>,
765    pub authorities: Vec<RuntimeGraphCentralityScore>,
766}
767
768#[derive(Debug, Clone)]
769pub struct RuntimeGraphCyclesResult {
770    pub limit_reached: bool,
771    pub cycles: Vec<RuntimeGraphPath>,
772}
773
774#[derive(Debug, Clone)]
775pub struct RuntimeGraphTopologicalSortResult {
776    pub acyclic: bool,
777    pub ordered_nodes: Vec<RuntimeGraphNode>,
778}
779
780#[derive(Debug, Clone)]
781pub struct RuntimeGraphPropertiesResult {
782    pub node_count: usize,
783    pub edge_count: usize,
784    pub self_loop_count: usize,
785    pub negative_edge_count: usize,
786    pub connected_component_count: usize,
787    pub weak_component_count: usize,
788    pub strong_component_count: usize,
789    pub is_empty: bool,
790    pub is_connected: bool,
791    pub is_weakly_connected: bool,
792    pub is_strongly_connected: bool,
793    pub is_complete: bool,
794    pub is_complete_directed: bool,
795    pub is_cyclic: bool,
796    pub is_circular: bool,
797    pub is_acyclic: bool,
798    pub is_tree: bool,
799    pub density: f64,
800    pub density_directed: f64,
801}
802
// ============================================================================
// Context Search types
// ============================================================================
806
807#[derive(Debug, Clone)]
808pub struct ContextSearchResult {
809    pub query: String,
810    pub tables: Vec<ContextEntity>,
811    pub graph: ContextGraphResult,
812    pub vectors: Vec<ContextEntity>,
813    pub documents: Vec<ContextEntity>,
814    pub key_values: Vec<ContextEntity>,
815    pub connections: Vec<ContextConnection>,
816    pub summary: ContextSummary,
817}
818
819#[derive(Debug, Clone)]
820pub struct ContextEntity {
821    pub entity: UnifiedEntity,
822    pub score: f32,
823    pub discovery: DiscoveryMethod,
824    pub collection: String,
825}
826
827#[derive(Debug, Clone)]
828pub enum DiscoveryMethod {
829    Indexed {
830        field: String,
831    },
832    GlobalScan,
833    CrossReference {
834        source_id: u64,
835        ref_type: String,
836    },
837    GraphTraversal {
838        source_id: u64,
839        edge_type: String,
840        depth: usize,
841    },
842    VectorQuery {
843        similarity: f32,
844    },
845}
846
847#[derive(Debug, Clone)]
848pub struct ContextGraphResult {
849    pub nodes: Vec<ContextEntity>,
850    pub edges: Vec<ContextEntity>,
851}
852
/// A link between two entity ids discovered during context search.
#[derive(Debug, Clone)]
pub struct ContextConnection {
    pub from_id: u64,
    pub to_id: u64,
    pub connection_type: ContextConnectionType,
    pub weight: f32,
}

/// Kind of link represented by a [`ContextConnection`].
#[derive(Debug, Clone)]
pub enum ContextConnectionType {
    CrossRef(String),
    GraphEdge(String),
    VectorSimilarity(f32),
}

/// Counters and timing describing one context search.
#[derive(Debug, Clone)]
pub struct ContextSummary {
    pub total_entities: usize,
    pub direct_matches: usize,
    pub expanded_via_graph: usize,
    pub expanded_via_cross_refs: usize,
    pub expanded_via_vector_query: usize,
    pub collections_searched: usize,
    /// Elapsed time in microseconds.
    pub execution_time_us: u64,
    pub tiers_used: Vec<String>,
    pub entities_reindexed: usize,
}
880
/// Mutable bookkeeping for the connection pool.
struct PoolState {
    /// Id assigned to the next new connection (ids start at 1).
    next_id: u64,
    /// Connections currently checked out.
    active: usize,
    /// Ids of connections available for reuse.
    idle: Vec<u64>,
    /// Cumulative checkout count.
    total_checkouts: u64,
}

impl Default for PoolState {
    fn default() -> Self {
        PoolState {
            next_id: 1,
            active: 0,
            idle: vec![],
            total_checkouts: 0,
        }
    }
}
898
899#[derive(Debug, Clone)]
900struct RuntimeResultCacheEntry {
901    result: RuntimeQueryResult,
902    cached_at: std::time::Instant,
903    scopes: HashSet<String>,
904}
905
/// Metric name for result-cache shadow divergences (presumably fed by
/// `RuntimeInner::result_cache_shadow_divergences` — confirm).
pub const METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL: &str = "cache_shadow_divergence_total";
/// Namespace string used for the ask-answer cache.
pub(crate) const ASK_ANSWER_CACHE_NAMESPACE: &str = "runtime.ask_answer_cache";
/// Number of shards in `RmwLockTable`.
const RMW_LOCK_SHARDS: usize = 64;
909
910struct RmwLockTable {
911    shards: Vec<parking_lot::Mutex<HashMap<String, Arc<parking_lot::Mutex<()>>>>>,
912}
913
914impl RmwLockTable {
915    fn new() -> Self {
916        let shards = (0..RMW_LOCK_SHARDS)
917            .map(|_| parking_lot::Mutex::new(HashMap::new()))
918            .collect();
919        Self { shards }
920    }
921
922    fn lock_for(&self, collection: &str, key: &str) -> Arc<parking_lot::Mutex<()>> {
923        use std::hash::{Hash, Hasher};
924
925        let mut hasher = std::collections::hash_map::DefaultHasher::new();
926        collection.hash(&mut hasher);
927        key.hash(&mut hasher);
928        let shard_idx = (hasher.finish() as usize) % self.shards.len();
929        let map_key = format!("{collection}\u{1f}{key}");
930        let mut shard = self.shards[shard_idx].lock();
931        shard
932            .entry(map_key)
933            .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
934            .clone()
935    }
936}
937
938struct RuntimeInner {
939    db: Arc<RedDB>,
940    layout: PhysicalLayout,
941    indices: IndexCatalog,
942    pool_config: ConnectionPoolConfig,
943    pool: Mutex<PoolState>,
944    started_at_unix_ms: u128,
945    probabilistic: probabilistic_store::ProbabilisticStore,
946    index_store: index_store::IndexStore,
947    cdc: crate::replication::cdc::CdcBuffer,
948    backup_scheduler: crate::replication::scheduler::BackupScheduler,
949    query_cache: parking_lot::RwLock<crate::storage::query::planner::cache::PlanCache>,
950    result_cache: parking_lot::RwLock<(
951        HashMap<String, RuntimeResultCacheEntry>,
952        std::collections::VecDeque<String>,
953    )>,
954    result_blob_cache: crate::storage::cache::BlobCache,
955    result_blob_entries: parking_lot::RwLock<(
956        HashMap<String, RuntimeResultCacheEntry>,
957        std::collections::VecDeque<String>,
958    )>,
959    ask_answer_cache_entries:
960        parking_lot::RwLock<(HashSet<String>, std::collections::VecDeque<String>)>,
961    result_cache_shadow_divergences: std::sync::atomic::AtomicU64,
962    ask_daily_spend:
963        parking_lot::RwLock<HashMap<String, crate::runtime::ai::cost_guard::DailyState>>,
964    /// Process-local queue message locks used to emulate `SKIP LOCKED`-style
965    /// claim semantics for concurrent queue consumers inside this runtime.
966    queue_message_locks: parking_lot::RwLock<HashMap<String, Arc<parking_lot::Mutex<()>>>>,
967    /// Process-local read-modify-write locks. The table is sharded by
968    /// `(collection, key)` and each entry has its own mutex, so unrelated keys
969    /// in the same collection do not serialize behind one global lock.
970    rmw_locks: RmwLockTable,
971    planner_dirty_tables: parking_lot::RwLock<HashSet<String>>,
972    ec_registry: Arc<crate::ec::config::EcRegistry>,
973    ec_worker: crate::ec::worker::EcWorker,
974    /// Optional AuthStore — injected by server boot when auth is
975    /// enabled. Required for `Value::Secret` auto-encrypt/decrypt
976    /// because the AES key lives in the vault KV under the
977    /// `red.secret.aes_key` entry.
978    auth_store: parking_lot::RwLock<Option<Arc<crate::auth::store::AuthStore>>>,
979    /// Optional OAuth/OIDC JWT validator. Wired by server boot when
980    /// the operator configures issuer + JWKS via env / CLI. HTTP and
981    /// wire transports read this on every bearer-token request and,
982    /// when the token decodes as a JWT, validate it against the
983    /// configured issuer + audience + signature before falling back to
984    /// the local AuthStore lookup.
985    oauth_validator: parking_lot::RwLock<Option<Arc<crate::auth::oauth::OAuthValidator>>>,
986    /// View registry (Phase 2.1 PG parity).
987    ///
988    /// Holds the parsed `SELECT` body for every view created via
989    /// `CREATE [MATERIALIZED] VIEW`. Queries that reference a view name
990    /// substitute the stored `QueryExpr` at execution time. Materialized
991    /// views additionally back onto the shared `MaterializedViewCache`
992    /// (see `RuntimeInner::materialized_views`).
993    ///
994    /// This is in-memory only in Phase 2.1 — view definitions do not
995    /// survive a restart. Persistence is a Phase 3 follow-up.
996    views: parking_lot::RwLock<HashMap<String, Arc<crate::storage::query::ast::CreateViewQuery>>>,
997    materialized_views: parking_lot::RwLock<crate::storage::cache::result::MaterializedViewCache>,
998    /// Per-collection retention sweeper state (issue #584 slice 12).
999    /// Tracks `last_sweep_at_ms`, `rows_swept_total`, and the latest
1000    /// pending-rows estimate that feeds the three new columns on
1001    /// `red.retention`. In-memory only; resets across restart.
1002    pub(crate) retention_sweeper:
1003        parking_lot::RwLock<crate::runtime::retention_sweeper::RetentionSweeperState>,
1004    /// MVCC snapshot manager (Phase 2.3 PG parity).
1005    ///
1006    /// Allocates monotonic `xid`s on BEGIN and tracks the active/aborted
1007    /// sets used by `Snapshot::sees` to filter tuples by visibility. Each
1008    /// query evaluates `entity.is_visible(snapshot.xid)` — pre-MVCC rows
1009    /// (`xmin == 0`) stay visible to every snapshot, preserving backward
1010    /// compatibility with data written before the xid fields existed.
1011    snapshot_manager: Arc<crate::storage::transaction::snapshot::SnapshotManager>,
1012    /// Connection → active transaction context map (Phase 2.3 PG parity).
1013    ///
1014    /// Keyed by connection id from `RuntimeConnection`. Populated on BEGIN,
1015    /// cleared on COMMIT/ROLLBACK. When a statement executes outside a
1016    /// transaction (autocommit path) no entry exists and writes stamp
1017    /// `xid=0` — identical to pre-MVCC behaviour.
1018    tx_contexts:
1019        parking_lot::RwLock<HashMap<u64, crate::storage::transaction::snapshot::TxnContext>>,
1020    /// Intent-lock hierarchy (IS/IX/S/X) used to break the implicit
1021    /// global-write serialisation in write paths. Populated at boot
1022    /// with `concurrency.locking.deadlock_timeout_ms` from the matrix
1023    /// and wired through DML/DDL dispatch in later P1 tasks.
1024    /// Dormant until P1.T3 flips the read path to `(Global,IS) →
1025    /// (Collection,IS)` and P1.T4/T5 pick up writes/DDL.
1026    lock_manager: Arc<crate::storage::transaction::lock::LockManager>,
1027    /// Perf-parity env-var overrides (`REDDB_<UP_DOTTED_KEY>`).
1028    /// Populated once at boot, read by every config getter; takes
1029    /// precedence over the persisted red_config value so operators
1030    /// can hot-fix a bad config by restarting with a different env
1031    /// var set. Keys are restricted to those declared in the matrix.
1032    env_config_overrides: HashMap<String, String>,
1033    /// Transaction-local tenant override (`SET LOCAL TENANT '<id>'`).
1034    /// Keyed by connection id, mirroring `tx_contexts`. Lives only while
1035    /// a transaction is open — `COMMIT` / `ROLLBACK` evict the entry,
1036    /// returning the connection to whichever session-level tenant
1037    /// (`SET TENANT 'x'`) was active before the transaction began.
1038    /// Wins over the session value but loses to a per-statement
1039    /// `WITHIN TENANT '<id>' …` override on the same call.
1040    tx_local_tenants: parking_lot::RwLock<HashMap<u64, Option<String>>>,
1041    /// Row-level security policies (Phase 2.5 PG parity).
1042    ///
1043    /// Keyed by `(table_name, policy_name)`; the set of tables with RLS
1044    /// enforcement toggled on lives in `rls_enabled_tables`. Filter
1045    /// enforcement hooks into the read path via `collect_rls_filters()`
1046    /// — see `runtime::impl_core`.
1047    rls_policies: parking_lot::RwLock<
1048        HashMap<(String, String), Arc<crate::storage::query::ast::CreatePolicyQuery>>,
1049    >,
1050    rls_enabled_tables: parking_lot::RwLock<HashSet<String>>,
1051    /// Foreign Data Wrapper registry (Phase 3.2 PG parity).
1052    ///
1053    /// Maps server names → wrapper instances and foreign-table names →
1054    /// definitions. Queries referencing a registered foreign table are
1055    /// re-routed to `ForeignTableRegistry::scan` by the read-path
1056    /// rewriter; reads against unknown names fall through to the native
1057    /// collection lookup.
1058    foreign_tables: Arc<crate::storage::fdw::ForeignTableRegistry>,
1059    /// Per-connection list of tuples marked for deletion by the current
1060    /// transaction (Phase 2.3.2b MVCC tombstones + 2.3.2e savepoints).
1061    /// Each entry is `(collection, entity_id, stamper_xid, previous_xmax)`
1062    /// — the xid that stamped xmax on the tuple plus the value it
1063    /// replaced. For a plain transaction the
1064    /// stamper equals `ctx.xid`; with savepoints the stamper equals
1065    /// the innermost open sub-xid so ROLLBACK TO SAVEPOINT can revive
1066    /// only the matching subset. COMMIT drains the whole conn list
1067    /// and keeps the committed tombstones; ROLLBACK (whole-tx) revives them all;
1068    /// ROLLBACK TO SAVEPOINT revives those with `stamper_xid >=
1069    /// savepoint_xid`. Autocommit DELETE bypasses this map.
1070    pending_tombstones: parking_lot::RwLock<
1071        HashMap<
1072            u64,
1073            Vec<(
1074                String,
1075                crate::storage::unified::entity::EntityId,
1076                crate::storage::transaction::snapshot::Xid,
1077                crate::storage::transaction::snapshot::Xid,
1078            )>,
1079        >,
1080    >,
1081    /// Per-connection table-row UPDATE versions created by an open
1082    /// transaction. Each entry is `(collection, old_entity_id,
1083    /// new_entity_id, stamper_xid, previous_xmax)`. COMMIT keeps both physical
1084    /// versions and drops the pending marker; ROLLBACK revives the old
1085    /// version and removes the new uncommitted version.
1086    pending_versioned_updates: parking_lot::RwLock<
1087        HashMap<
1088            u64,
1089            Vec<(
1090                String,
1091                crate::storage::unified::entity::EntityId,
1092                crate::storage::unified::entity::EntityId,
1093                crate::storage::transaction::snapshot::Xid,
1094                crate::storage::transaction::snapshot::Xid,
1095            )>,
1096        >,
1097    >,
1098    pending_kv_watch_events:
1099        parking_lot::RwLock<HashMap<u64, Vec<crate::replication::cdc::KvWatchEvent>>>,
1100    pending_store_wal_actions:
1101        parking_lot::RwLock<HashMap<u64, crate::storage::unified::DeferredStoreWalActions>>,
1102    /// Table-scoped tenancy registry (Phase 2.5.4).
1103    ///
1104    /// Maps `table_name → tenant_column`. DML auto-fill looks here to
1105    /// inject `CURRENT_TENANT()` on INSERTs that omit the column, and
1106    /// DDL keeps the in-memory registry in sync with the
1107    /// `tenant_tables.*` keys in red_config. Read-side enforcement
1108    /// piggy-backs on the existing RLS infrastructure: every entry
1109    /// installs an implicit `col = CURRENT_TENANT()` policy.
1110    tenant_tables: parking_lot::RwLock<HashMap<String, String>>,
1111    /// Monotonic epoch bumped on every DDL / schema-mutating operation
1112    /// that calls `invalidate_plan_cache`. Prepared statements capture
1113    /// this at PREPARE and re-check at EXECUTE — a mismatch means the
1114    /// cached shape may reference dropped or renamed columns and the
1115    /// client must re-PREPARE.
1116    ddl_epoch: std::sync::atomic::AtomicU64,
1117    /// Public-mutation gate (PLAN.md W1).
1118    ///
1119    /// Built once at construction from the immutable subset of
1120    /// `RedDBOptions` (read_only flag + replication role). Every public
1121    /// mutation surface — SQL DML/DDL, gRPC mutating RPCs, HTTP/native
1122    /// wire mutations, admin maintenance endpoints, serverless
1123    /// lifecycle — consults `write_gate.check(WriteKind::*)` before
1124    /// dispatching to storage. The replica internal apply path
1125    /// (`LogicalChangeApplier`) reaches into the store directly and
1126    /// bypasses the gate by construction.
1127    write_gate: Arc<crate::runtime::write_gate::WriteGate>,
1128    /// Process lifecycle state machine (PLAN.md Phase 1 — Lifecycle
1129    /// Contract). Drives `/health/live`, `/health/ready`,
1130    /// `/health/startup`, and `POST /admin/shutdown` so any
1131    /// orchestrator (K8s preStop, Fly autostop, ECS task drain,
1132    /// systemd) can coordinate without losing data.
1133    lifecycle: crate::runtime::lifecycle::Lifecycle,
1134    /// Operator-imposed resource limits (PLAN.md Phase 4.1).
1135    /// Read once at boot from `RED_MAX_*` env vars; consulted by
1136    /// observability and (in follow-up commits) the per-write
1137    /// enforcement points.
1138    resource_limits: crate::runtime::resource_limits::ResourceLimits,
1139    /// Append-only audit log for admin mutations (PLAN.md Phase
1140    /// 6.5). Lives next to the primary `.rdb` file so backup +
1141    /// restore flows ship it alongside the data.
1142    audit_log: Arc<crate::runtime::audit_log::AuditLogger>,
1143    /// Serverless writer-lease state machine. `None` when the operator
1144    /// did not opt into lease fencing (`RED_LEASE_REQUIRED` unset/false).
1145    /// When set, owns the {acquire/refresh/release/lost} transitions and
1146    /// is the single place that mutates `write_gate.set_lease_state` and
1147    /// records lease/* audit entries — keeping those two side-effects
1148    /// from drifting.
1149    lease_lifecycle: std::sync::OnceLock<Arc<crate::runtime::lease_lifecycle::LeaseLifecycle>>,
1150    /// PLAN.md Phase 11.5 — counters bumped by the replica apply
1151    /// loop on `Gap` / `Divergence` / `Apply` errors so /metrics
1152    /// surfaces them as `reddb_replica_apply_errors_total{kind}`.
1153    replica_apply_metrics: crate::replication::logical::ReplicaApplyMetrics,
1154    /// PLAN.md Phase 4.4 — per-caller QPS quotas. Disabled (no-op)
1155    /// when `RED_MAX_QPS_PER_CALLER` is unset.
1156    quota_bucket: crate::runtime::quota_bucket::QuotaBucket,
1157    /// Issue #120 — token → schema entity reverse index, kept current
1158    /// incrementally on DDL events. Consumed by AskPipeline (issue
1159    /// #121) Stage 2 to narrow vector-search candidates before any
1160    /// embedding compute. Mutated only from DDL execution paths.
1161    schema_vocabulary: parking_lot::RwLock<crate::runtime::schema_vocabulary::SchemaVocabulary>,
1162    /// Issue #205 — dedicated slow-query sink (`red-slow.log`).
1163    /// Built once at runtime startup; below-threshold calls pay only a
1164    /// single relaxed atomic load. Threshold + sample-pct come from
1165    /// `runtime.slow_query.threshold_ms` / `.sample_pct` (config matrix)
1166    /// at construction; live tuning via the config tree is a follow-up.
1167    slow_query_logger: Arc<crate::telemetry::slow_query_logger::SlowQueryLogger>,
1168    /// Process-local normal-KV operation counters. These are intentionally
1169    /// runtime-local; persistent accounting belongs in catalog stats.
1170    kv_stats: KvStatsCounters,
1171    metrics_ingest_stats: MetricsIngestCounters,
1172    metrics_tenant_activity_stats: MetricsTenantActivityCounters,
1173    /// Slice 10 of issue #527 — Prometheus counters for the queue
1174    /// lifecycle (delivered/acked/nacked) rendered onto `/metrics`.
1175    /// Process-local; counters reset on restart by design.
1176    queue_telemetry: std::sync::Arc<queue_telemetry::QueueTelemetryCounters>,
1177    /// Process-local normal-KV tag index used by `INVALIDATE TAGS`.
1178    kv_tag_index: KvTagIndex,
1179    /// Issue #524 — in-memory chain-tip cache per collection. Populated lazily
1180    /// by the first INSERT or `GET /chain-tip` call after restart and updated
1181    /// atomically with each chain INSERT. Backed by a single mutex so a chain
1182    /// INSERT serialises against concurrent submitters — the loser observes
1183    /// the advanced tip and surfaces `BlockchainConflict` to its caller.
1184    chain_tip_cache: parking_lot::Mutex<
1185        HashMap<String, crate::runtime::blockchain_kind::ChainTipFull>,
1186    >,
1187    /// Issue #525 — in-memory mirror of the persisted `integrity` flag per
1188    /// chain collection.  `true` means INSERTs must be rejected with
1189    /// `ChainIntegrityBroken`.  Loaded lazily from `red_config` on first
1190    /// access so the flag survives restart.
1191    chain_integrity_broken: parking_lot::Mutex<HashMap<String, bool>>,
1192}
1193
/// Public handle to the embedded runtime.
///
/// Cloning is cheap. The only field is an `Arc`, so every clone shares the
/// same `RuntimeInner` and sees the same state.
#[derive(Clone)]
pub struct RedDBRuntime {
    /// Shared runtime state.
    inner: Arc<RuntimeInner>,
}
1198
/// A connection to the runtime.
///
/// Pairs a connection id with the same shared `RuntimeInner` that a
/// `RedDBRuntime` handle holds.
pub struct RuntimeConnection {
    /// Connection id. Per-connection runtime maps such as
    /// `RuntimeInner::tx_contexts` are keyed by it.
    id: u64,
    /// Shared runtime state.
    inner: Arc<RuntimeInner>,
}
1203
// Runtime submodules. Each declaration's visibility is deliberate:
// - `pub` modules are exported from `runtime`;
// - `pub(crate)` modules are shared with the rest of the crate only;
// - bare `mod` modules are private to `runtime` and its descendants.
// The list is mostly alphabetical. `primary_queue_store`,
// `replica_queue_store` and `sessionize` sit out of order.
// NOTE(review): the order is left untouched in case any module relies on
// textual `macro_rules!` ordering.
pub mod ai;
pub mod analytics_schema_registry;
pub mod ask_pipeline;
pub mod audit_log;
pub mod audit_query;
pub mod authorized_search;
pub mod batch_insert;
pub mod blockchain_kind;
mod collection_contract;
pub mod config_matrix;
pub mod config_overlay;
pub mod config_watcher;
pub(crate) mod ddl;
pub mod disk_space_monitor;
mod dml_target_scan;
mod expr_eval;
mod graph_dsl;
mod health_connection;
mod impl_config;
pub(crate) mod impl_core;
mod impl_ddl;
mod impl_dml;
mod impl_ec;
mod impl_events;
mod impl_graph;
mod impl_graph_commands;
pub mod impl_kv;
mod impl_migrations;
mod impl_native;
mod impl_physical;
mod impl_probabilistic;
pub mod impl_queue;
mod impl_search;
mod impl_timeseries;
mod impl_tree;
mod impl_vcs;
mod index_store;
mod join_filter;
mod keyed_spine;
pub mod kv_watch;
pub mod lease_lifecycle;
pub mod lease_loop;
pub mod lease_timer_wheel;
pub mod lifecycle;
pub mod locking;
pub(crate) mod mutation;
mod probabilistic_store;
pub(crate) mod query_exec;
mod queue_delivery;
pub(crate) mod queue_lifecycle;
pub(crate) mod queue_telemetry;
pub(crate) mod primary_queue_store;
pub(crate) mod replica_queue_store;
pub mod quota_bucket;
mod record_search;
mod red_schema;
pub(crate) mod retention_filter;
pub(crate) mod retention_sweeper;
pub(crate) mod sessionize;
pub mod resource_limits;
pub(crate) mod scalar_evaluator;
pub mod schema_diff;
pub mod schema_vocabulary;
pub mod signed_chain;
pub mod signed_writes_kind;
pub mod snapshot_reuse;
mod statement_frame;
mod table_row_mvcc_resolver;
mod vector_index;
pub mod within_clause;
pub mod write_gate;
1275
1276pub use self::graph_dsl::*;
1277use self::join_filter::*;
1278use self::query_exec::*;
1279use self::record_search::*;
1280pub use self::statement_frame::EffectiveScope;
1281
1282/// Re-exports for transports + tests that need per-connection
1283/// isolation, tenant / auth thread-locals, and MVCC snapshot
1284/// utilities. Mirrors what PG-wire / gRPC / HTTP middleware already
1285/// call, and is enough to emulate independent connections in
1286/// integration tests.
1287pub mod mvcc {
1288    pub use super::impl_core::{
1289        capture_current_snapshot, clear_current_auth_identity, clear_current_connection_id,
1290        clear_current_snapshot, clear_current_tenant, current_connection_id, current_tenant,
1291        entity_visible_under_current_snapshot, entity_visible_with_context,
1292        set_current_auth_identity, set_current_connection_id, set_current_snapshot,
1293        set_current_tenant, snapshot_bundle, with_snapshot_bundle, SnapshotBundle, SnapshotContext,
1294    };
1295}
1296
1297/// Public helpers re-exported for use by the presentation layer.
1298pub mod record_search_helpers {
1299    use crate::storage::query::UnifiedRecord;
1300    use crate::storage::UnifiedEntity;
1301    use std::collections::BTreeSet;
1302
1303    pub fn entity_type_and_capabilities(
1304        entity: &UnifiedEntity,
1305    ) -> (&'static str, BTreeSet<String>) {
1306        super::record_search::runtime_entity_type_and_capabilities(entity)
1307    }
1308
1309    /// Materialise any entity kind (TableRow, Node, Edge, Vector,
1310    /// TimeSeriesPoint, QueueMessage) into a `UnifiedRecord` whose
1311    /// `values` carry the native fields. Used by the RLS evaluator
1312    /// when a non-table collection matches a `CompareExpr` policy.
1313    pub fn any_record_from_entity(entity: UnifiedEntity) -> Option<UnifiedRecord> {
1314        super::record_search::runtime_any_record_from_entity(entity)
1315    }
1316}