// reddb_server/runtime.rs
1//! Embedded runtime with connection pooling, scans and health.
2
3use std::cmp::Ordering;
4use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, VecDeque};
5use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
6use std::sync::{Arc, Mutex};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use crate::api::{RedDBError, RedDBOptions, RedDBResult};
10use crate::catalog::{
11    CatalogAnalyticsJobStatus, CatalogAttentionSummary, CatalogGraphProjectionStatus,
12    CatalogIndexStatus, CatalogModelSnapshot, CollectionDescriptor,
13};
14use crate::health::{HealthProvider, HealthReport};
15use crate::index::IndexCatalog;
16use crate::physical::{
17    ExportDescriptor, ManifestEvent, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalLayout,
18    SnapshotDescriptor,
19};
20use crate::serde_json::Value as JsonValue;
21use crate::storage::engine::pathfinding::{AStar, BellmanFord, Dijkstra, BFS, DFS};
22use crate::storage::engine::{
23    BetweennessCentrality, ClosenessCentrality, ClusteringCoefficient, ConnectedComponents,
24    CycleDetector, DegreeCentrality, EigenvectorCentrality, GraphStore, IvfConfig, IvfIndex,
25    IvfStats, LabelPropagation, Louvain, MetadataEntry, MetadataFilter as VectorMetadataFilter,
26    MetadataValue as VectorMetadataValue, PageRank, PersonalizedPageRank, PhysicalFileHeader,
27    StoredNode, StronglyConnectedComponents, WeaklyConnectedComponents, HITS,
28};
29use crate::storage::query::ast::{
30    AlterOperation, AlterQueueQuery, AlterTableQuery, CompareOp, CreateCollectionQuery,
31    CreateIndexQuery, CreateQueueQuery, CreateTableQuery, CreateTimeSeriesQuery, CreateTreeQuery,
32    CreateVectorQuery, DeleteQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery,
33    DropIndexQuery, DropKvQuery, DropQueueQuery, DropTableQuery, DropTimeSeriesQuery,
34    DropTreeQuery, DropVectorQuery, EventsBackfillQuery, ExplainAlterQuery, ExplainFormat,
35    FieldRef, Filter, FusionStrategy, GraphCommand, HybridQuery, IndexMethod, InsertEntityType,
36    InsertQuery, JoinQuery, JoinType, OrderByClause, ProbabilisticCommand, Projection, QueryExpr,
37    QueueCommand, QueueSelectQuery, QueueSide, SearchCommand, TableQuery, TreeCommand,
38    TruncateQuery, UpdateQuery, VectorQuery, VectorSource,
39};
40use crate::storage::query::is_universal_entity_source as is_universal_query_source;
41use crate::storage::query::modes::{detect_mode, parse_multi, QueryMode};
42use crate::storage::query::planner::{
43    CanonicalLogicalPlan, CanonicalPlanner, CostEstimator, QueryPlanner,
44};
45use crate::storage::query::unified::{UnifiedRecord, UnifiedResult};
46use crate::storage::schema::Value;
47use crate::storage::unified::dsl::{
48    apply_filters, cosine_similarity, Filter as DslFilter, FilterOp as DslFilterOp,
49    FilterValue as DslFilterValue, GraphPatternDsl, HybridQueryBuilder, MatchComponents,
50    QueryResult as DslQueryResult, ScoredMatch, TextSearchBuilder,
51};
52use crate::storage::unified::store::{
53    NativeCatalogSummary, NativeManifestSummary, NativePhysicalState, NativeRecoverySummary,
54    NativeRegistrySummary,
55};
56use crate::storage::unified::{
57    Metadata, MetadataValue as UnifiedMetadataValue, RefTarget, UnifiedMetadataFilter,
58};
59use crate::storage::{
60    EntityData, EntityId, EntityKind, RedDB, RefType, SimilarResult, StoreStats, UnifiedEntity,
61    UnifiedStore,
62};
63
/// Sizing knobs for the embedded connection pool.
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
    /// Hard cap on connections that may be checked out at once.
    pub max_connections: usize,
    /// Maximum number of idle connections retained for reuse.
    pub max_idle: usize,
}

impl Default for ConnectionPoolConfig {
    /// Conservative defaults for an embedded runtime: 64 concurrent
    /// connections with up to 16 parked idle.
    fn default() -> Self {
        ConnectionPoolConfig {
            max_connections: 64,
            max_idle: 16,
        }
    }
}
78
/// Pagination cursor for collection scans; `offset` is the index of the
/// next item to return.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ScanCursor {
    pub offset: usize,
}

/// One page of results from a collection scan.
#[derive(Debug, Clone)]
pub struct ScanPage {
    /// Collection the page was read from.
    pub collection: String,
    /// Entities on this page.
    pub items: Vec<UnifiedEntity>,
    /// Cursor for the next page, or `None` when the scan is exhausted.
    pub next: Option<ScanCursor>,
    /// Total item count reported by the scan (presumably the full
    /// collection size at scan time — confirm at the producer).
    pub total: usize,
}
91
/// Snapshot of host/process facts surfaced by stats and health endpoints.
#[derive(Debug, Clone)]
pub struct SystemInfo {
    /// OS process id of this runtime.
    pub pid: u32,
    /// Logical CPU count (1 when it cannot be determined).
    pub cpu_cores: usize,
    /// Total physical memory in bytes (0 when unknown / non-Linux).
    pub total_memory_bytes: u64,
    /// Available memory in bytes (0 when unknown / non-Linux).
    pub available_memory_bytes: u64,
    /// Compile-time OS name (`std::env::consts::OS`).
    pub os: String,
    /// Compile-time architecture (`std::env::consts::ARCH`).
    pub arch: String,
    /// Host name from `HOSTNAME`/`COMPUTERNAME`, or `"unknown"`.
    pub hostname: String,
}

impl SystemInfo {
    /// Whether the system has enough cores to benefit from parallelism.
    /// Returns false on single-core machines where thread overhead > gains.
    pub fn should_parallelize() -> bool {
        std::thread::available_parallelism()
            .map(|p| p.get() > 1)
            .unwrap_or(false)
    }

    /// Collect a fresh snapshot. Memory figures are best-effort: they are
    /// read from `/proc/meminfo` on Linux and reported as 0 elsewhere.
    pub fn collect() -> Self {
        Self {
            pid: std::process::id(),
            cpu_cores: std::thread::available_parallelism()
                .map(|p| p.get())
                .unwrap_or(1),
            total_memory_bytes: Self::read_total_memory(),
            available_memory_bytes: Self::read_available_memory(),
            os: std::env::consts::OS.to_string(),
            arch: std::env::consts::ARCH.to_string(),
            hostname: std::env::var("HOSTNAME")
                .or_else(|_| std::env::var("COMPUTERNAME"))
                .unwrap_or_else(|_| "unknown".to_string()),
        }
    }

    /// Parse one `kB`-denominated field (e.g. `"MemTotal:"`) out of
    /// `/proc/meminfo` and convert it to bytes. Returns 0 on any failure
    /// (unreadable file, missing field, parse error). Shared by the two
    /// readers below, which previously duplicated this whole pipeline.
    #[cfg(target_os = "linux")]
    fn read_meminfo_bytes(field: &str) -> u64 {
        std::fs::read_to_string("/proc/meminfo")
            .ok()
            .and_then(|contents| {
                contents
                    .lines()
                    .find(|line| line.starts_with(field))
                    .and_then(|line| line.split_whitespace().nth(1))
                    .and_then(|value| value.parse::<u64>().ok())
            })
            .map_or(0, |kb| kb * 1024)
    }

    #[cfg(target_os = "linux")]
    fn read_total_memory() -> u64 {
        Self::read_meminfo_bytes("MemTotal:")
    }

    #[cfg(target_os = "linux")]
    fn read_available_memory() -> u64 {
        Self::read_meminfo_bytes("MemAvailable:")
    }

    // No portable memory probe is implemented for non-Linux targets;
    // callers treat 0 as "unknown".
    #[cfg(not(target_os = "linux"))]
    fn read_total_memory() -> u64 {
        0
    }

    #[cfg(not(target_os = "linux"))]
    fn read_available_memory() -> u64 {
        0
    }
}
172
/// Aggregate runtime statistics returned by stats endpoints.
#[derive(Debug, Clone)]
pub struct RuntimeStats {
    /// Connections currently checked out of the pool.
    pub active_connections: usize,
    /// Connections parked in the pool for reuse.
    pub idle_connections: usize,
    /// Lifetime number of pool checkouts.
    pub total_checkouts: u64,
    /// NOTE(review): presumably whether paged storage mode is enabled —
    /// confirm against the producer of this struct.
    pub paged_mode: bool,
    /// Runtime start time, Unix epoch milliseconds.
    pub started_at_unix_ms: u128,
    /// Storage-engine statistics.
    pub store: StoreStats,
    /// Host/process snapshot (see `SystemInfo`).
    pub system: SystemInfo,
    /// Result blob cache statistics.
    pub result_blob_cache: crate::storage::cache::BlobCacheStats,
    /// KV operation counters (see `KvStats`).
    pub kv: KvStats,
    /// Metrics ingest counters (see `MetricsIngestStats`).
    pub metrics_ingest: MetricsIngestStats,
}
186
/// One (tenant, namespace, operation) activity tally, as reported by
/// `MetricsTenantActivityCounters::snapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsTenantActivityStats {
    pub tenant: String,
    pub namespace: String,
    pub operation: String,
    /// Number of times this operation was recorded for the pair above.
    pub count: u64,
}
194
/// Point-in-time metrics-ingest totals exposed through `RuntimeStats`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct MetricsIngestStats {
    pub samples_accepted: u64,
    pub series_accepted: u64,
    pub samples_rejected: u64,
    pub series_rejected: u64,
    pub series_rejected_cardinality_budget: u64,
}

/// Atomic counters behind `MetricsIngestStats`. All updates use relaxed
/// ordering: these are monotonic tallies, not synchronization points.
#[derive(Debug, Default)]
pub(crate) struct MetricsIngestCounters {
    samples_accepted: AtomicU64,
    series_accepted: AtomicU64,
    samples_rejected: AtomicU64,
    series_rejected: AtomicU64,
    series_rejected_cardinality_budget: AtomicU64,
}

impl MetricsIngestCounters {
    /// Tally the outcome of one ingest batch.
    pub(crate) fn record(
        &self,
        accepted_samples: u64,
        accepted_series: u64,
        rejected_samples: u64,
        rejected_series: u64,
    ) {
        let updates = [
            (&self.samples_accepted, accepted_samples),
            (&self.series_accepted, accepted_series),
            (&self.samples_rejected, rejected_samples),
            (&self.series_rejected, rejected_series),
        ];
        for (counter, delta) in updates {
            counter.fetch_add(delta, AtomicOrdering::Relaxed);
        }
    }

    /// Tally series dropped specifically for exceeding the cardinality budget.
    pub(crate) fn record_cardinality_budget_rejections(&self, rejected_series: u64) {
        self.series_rejected_cardinality_budget
            .fetch_add(rejected_series, AtomicOrdering::Relaxed);
    }

    /// Read all counters into an owned snapshot. Each load is relaxed, so
    /// fields may be mutually skewed under concurrent writers.
    pub(crate) fn snapshot(&self) -> MetricsIngestStats {
        let load = |counter: &AtomicU64| counter.load(AtomicOrdering::Relaxed);
        MetricsIngestStats {
            samples_accepted: load(&self.samples_accepted),
            series_accepted: load(&self.series_accepted),
            samples_rejected: load(&self.samples_rejected),
            series_rejected: load(&self.series_rejected),
            series_rejected_cardinality_budget: load(&self.series_rejected_cardinality_budget),
        }
    }
}
248
249#[derive(Debug, Default)]
250pub(crate) struct MetricsTenantActivityCounters {
251    inner: Mutex<BTreeMap<(String, String, String), u64>>,
252}
253
254impl MetricsTenantActivityCounters {
255    pub(crate) fn record(&self, tenant: &str, namespace: &str, operation: &str) {
256        let mut inner = self
257            .inner
258            .lock()
259            .unwrap_or_else(|poison| poison.into_inner());
260        let key = (
261            tenant.to_string(),
262            namespace.to_string(),
263            operation.to_string(),
264        );
265        *inner.entry(key).or_insert(0) += 1;
266    }
267
268    pub(crate) fn snapshot(&self) -> Vec<MetricsTenantActivityStats> {
269        let inner = self
270            .inner
271            .lock()
272            .unwrap_or_else(|poison| poison.into_inner());
273        inner
274            .iter()
275            .map(
276                |((tenant, namespace, operation), count)| MetricsTenantActivityStats {
277                    tenant: tenant.clone(),
278                    namespace: namespace.clone(),
279                    operation: operation.clone(),
280                    count: *count,
281                },
282            )
283            .collect()
284    }
285}
286
/// Point-in-time KV operation totals exposed through `RuntimeStats`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct KvStats {
    pub puts: u64,
    pub gets: u64,
    pub deletes: u64,
    pub incrs: u64,
    /// Compare-and-swap operations counted as successes.
    pub cas_success: u64,
    /// Compare-and-swap operations counted as conflicts.
    pub cas_conflict: u64,
    /// Watch streams currently open — goes up and down, unlike the other
    /// monotonic counters.
    pub watch_streams_active: u64,
    pub watch_events_emitted: u64,
    /// Count of dropped watch events (see `KvStatsCounters::add_watch_drops`).
    pub watch_drops: u64,
}

/// Relaxed atomic counters backing `KvStats`; snapshotted via
/// `KvStatsCounters::snapshot`.
#[derive(Debug, Default)]
pub(crate) struct KvStatsCounters {
    puts: AtomicU64,
    gets: AtomicU64,
    deletes: AtomicU64,
    incrs: AtomicU64,
    cas_success: AtomicU64,
    cas_conflict: AtomicU64,
    watch_streams_active: AtomicU64,
    watch_events_emitted: AtomicU64,
    watch_drops: AtomicU64,
}
312
/// In-memory two-way index between KV entries and their tags.
///
/// `tag_to_entries` maps (collection, tag) -> {key -> entity id} for tag
/// lookups; `key_to_tags` maps (collection, key) -> sorted tag set for
/// reverse lookups and cleanup. The two sides are kept consistent by
/// `replace`/`remove` below.
#[derive(Debug, Default)]
pub(crate) struct KvTagIndex {
    tag_to_entries: parking_lot::RwLock<HashMap<(String, String), HashMap<String, EntityId>>>,
    key_to_tags: parking_lot::RwLock<HashMap<(String, String), BTreeSet<String>>>,
}
318
impl KvTagIndex {
    /// Replace the tag set for `(collection, key)` with `tags`.
    ///
    /// Tags are trimmed and empties discarded. The forward map
    /// (`key_to_tags`) is updated first, yielding the previous tag set;
    /// the reverse map (`tag_to_entries`) is then reconciled: stale
    /// reverse entries are removed and the new tags inserted. The two
    /// write locks are never held simultaneously — the first guard is
    /// scoped to the `old_tags` block.
    pub(crate) fn replace(&self, collection: &str, key: &str, id: EntityId, tags: &[String]) {
        let entry_key = (collection.to_string(), key.to_string());
        // Normalize: trim whitespace and drop empty tags.
        let new_tags: BTreeSet<String> = tags
            .iter()
            .map(|tag| tag.trim())
            .filter(|tag| !tag.is_empty())
            .map(ToOwned::to_owned)
            .collect();

        // Swap in the new forward mapping and capture the old tag set.
        // Empty tag sets are represented by absence, not an empty entry.
        let old_tags = {
            let mut key_to_tags = self.key_to_tags.write();
            if new_tags.is_empty() {
                key_to_tags.remove(&entry_key)
            } else {
                key_to_tags.insert(entry_key.clone(), new_tags.clone())
            }
        };

        let mut tag_to_entries = self.tag_to_entries.write();
        // Remove reverse entries for tags the key previously carried,
        // pruning any (collection, tag) bucket that becomes empty.
        if let Some(old_tags) = old_tags {
            for tag in old_tags {
                let scoped = (collection.to_string(), tag);
                let remove_scoped = if let Some(entries) = tag_to_entries.get_mut(&scoped) {
                    entries.remove(key);
                    entries.is_empty()
                } else {
                    false
                };
                if remove_scoped {
                    tag_to_entries.remove(&scoped);
                }
            }
        }

        // Register the key under each of its (normalized) new tags.
        for tag in new_tags {
            tag_to_entries
                .entry((collection.to_string(), tag))
                .or_default()
                .insert(key.to_string(), id);
        }
    }

    /// Remove `(collection, key)` from both sides of the index, pruning
    /// any (collection, tag) buckets its removal leaves empty. The
    /// forward-map write guard is a statement temporary, so the two
    /// locks are not held at once here either.
    pub(crate) fn remove(&self, collection: &str, key: &str) {
        let entry_key = (collection.to_string(), key.to_string());
        let old_tags = self.key_to_tags.write().remove(&entry_key);
        let Some(old_tags) = old_tags else {
            return;
        };

        let mut tag_to_entries = self.tag_to_entries.write();
        for tag in old_tags {
            let scoped = (collection.to_string(), tag);
            let remove_scoped = if let Some(entries) = tag_to_entries.get_mut(&scoped) {
                entries.remove(key);
                entries.is_empty()
            } else {
                false
            };
            if remove_scoped {
                tag_to_entries.remove(&scoped);
            }
        }
    }

    /// Union of entries carrying ANY of `tags` within `collection`
    /// (OR semantics). Tags are trimmed before lookup; an empty `tags`
    /// slice short-circuits to an empty result. Duplicate keys across
    /// tags are deduplicated, keeping the first id seen.
    pub(crate) fn entries_for_tags(
        &self,
        collection: &str,
        tags: &[String],
    ) -> Vec<(String, EntityId)> {
        if tags.is_empty() {
            return Vec::new();
        }

        let tag_to_entries = self.tag_to_entries.read();
        let mut out: HashMap<String, EntityId> = HashMap::new();
        for tag in tags {
            let scoped = (collection.to_string(), tag.trim().to_string());
            if let Some(entries) = tag_to_entries.get(&scoped) {
                for (key, id) in entries {
                    out.entry(key.clone()).or_insert(*id);
                }
            }
        }
        out.into_iter().collect()
    }

    /// The tags currently recorded for `(collection, key)`, in sorted
    /// order (backed by a `BTreeSet`); empty if the key is unindexed.
    pub(crate) fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
        self.key_to_tags
            .read()
            .get(&(collection.to_string(), key.to_string()))
            .map(|tags| tags.iter().cloned().collect())
            .unwrap_or_default()
    }
}
414
415impl KvStatsCounters {
416    pub(crate) fn snapshot(&self) -> KvStats {
417        KvStats {
418            puts: self.puts.load(AtomicOrdering::Relaxed),
419            gets: self.gets.load(AtomicOrdering::Relaxed),
420            deletes: self.deletes.load(AtomicOrdering::Relaxed),
421            incrs: self.incrs.load(AtomicOrdering::Relaxed),
422            cas_success: self.cas_success.load(AtomicOrdering::Relaxed),
423            cas_conflict: self.cas_conflict.load(AtomicOrdering::Relaxed),
424            watch_streams_active: self.watch_streams_active.load(AtomicOrdering::Relaxed),
425            watch_events_emitted: self.watch_events_emitted.load(AtomicOrdering::Relaxed),
426            watch_drops: self.watch_drops.load(AtomicOrdering::Relaxed),
427        }
428    }
429
430    pub(crate) fn incr_puts(&self) {
431        self.puts.fetch_add(1, AtomicOrdering::Relaxed);
432    }
433
434    pub(crate) fn incr_gets(&self) {
435        self.gets.fetch_add(1, AtomicOrdering::Relaxed);
436    }
437
438    pub(crate) fn incr_deletes(&self) {
439        self.deletes.fetch_add(1, AtomicOrdering::Relaxed);
440    }
441
442    pub(crate) fn incr_incrs(&self) {
443        self.incrs.fetch_add(1, AtomicOrdering::Relaxed);
444    }
445
446    pub(crate) fn incr_cas_success(&self) {
447        self.cas_success.fetch_add(1, AtomicOrdering::Relaxed);
448    }
449
450    pub(crate) fn incr_cas_conflict(&self) {
451        self.cas_conflict.fetch_add(1, AtomicOrdering::Relaxed);
452    }
453
454    pub(crate) fn incr_watch_streams_active(&self) {
455        self.watch_streams_active
456            .fetch_add(1, AtomicOrdering::Relaxed);
457    }
458
459    pub(crate) fn decr_watch_streams_active(&self) {
460        self.watch_streams_active
461            .fetch_sub(1, AtomicOrdering::Relaxed);
462    }
463
464    pub(crate) fn incr_watch_events_emitted(&self) {
465        self.watch_events_emitted
466            .fetch_add(1, AtomicOrdering::Relaxed);
467    }
468
469    pub(crate) fn add_watch_drops(&self, count: u64) {
470        self.watch_drops.fetch_add(count, AtomicOrdering::Relaxed);
471    }
472}
473
/// Outcome of executing one query through the runtime.
#[derive(Debug, Clone)]
pub struct RuntimeQueryResult {
    /// Original query text as submitted.
    pub query: String,
    /// Query-language mode the text was executed under.
    pub mode: QueryMode,
    /// Statement label; for the constructors in this file it mirrors
    /// `statement_type`.
    pub statement: &'static str,
    /// Engine that produced the result (e.g. "runtime-ddl", "runtime-meta").
    pub engine: &'static str,
    /// Result rows and columns; empty for pure DML results.
    pub result: UnifiedResult,
    /// Rows affected by DML; 0 for DDL/meta results.
    pub affected_rows: u64,
    /// High-level statement type: "select", "insert", "update", "delete", "create", "drop", "alter"
    pub statement_type: &'static str,
}
485
486impl RuntimeQueryResult {
487    /// Construct a result representing a DML operation (insert/update/delete).
488    pub fn dml_result(
489        query: String,
490        affected: u64,
491        statement_type: &'static str,
492        engine: &'static str,
493    ) -> Self {
494        Self {
495            query,
496            mode: QueryMode::Sql,
497            statement: statement_type,
498            engine,
499            result: UnifiedResult::empty(),
500            affected_rows: affected,
501            statement_type,
502        }
503    }
504
505    /// Construct a result representing a DDL message (create/drop/alter).
506    pub fn ok_message(query: String, message: &str, statement_type: &'static str) -> Self {
507        let mut result = UnifiedResult::empty();
508        let mut record = UnifiedRecord::new();
509        record.set("message", Value::text(message.to_string()));
510        result.push(record);
511        result.columns = vec!["message".to_string()];
512
513        Self {
514            query,
515            mode: QueryMode::Sql,
516            statement: statement_type,
517            engine: "runtime-ddl",
518            result,
519            affected_rows: 0,
520            statement_type,
521        }
522    }
523
524    /// Construct a multi-column record result for read-only meta commands
525    /// (EXPLAIN ALTER, schema introspection, etc.). Each row is a Vec of
526    /// (column_name, value) pairs in column order.
527    pub fn ok_records(
528        query: String,
529        columns: Vec<String>,
530        rows: Vec<Vec<(String, Value)>>,
531        statement_type: &'static str,
532    ) -> Self {
533        let mut result = UnifiedResult::empty();
534        for row in rows {
535            let mut record = UnifiedRecord::new();
536            for (k, v) in row {
537                record.set(&k, v);
538            }
539            result.push(record);
540        }
541        result.columns = columns;
542
543        Self {
544            query,
545            mode: QueryMode::Sql,
546            statement: statement_type,
547            engine: "runtime-meta",
548            result,
549            affected_rows: 0,
550            statement_type,
551        }
552    }
553}
554
/// EXPLAIN output for one query: logical plan, cost estimates, and
/// optimizer bookkeeping.
#[derive(Debug, Clone)]
pub struct RuntimeQueryExplain {
    /// Original query text.
    pub query: String,
    /// Query-language mode the text was parsed under.
    pub mode: QueryMode,
    pub statement: &'static str,
    /// Whether the statement reads from a universal entity source.
    pub is_universal: bool,
    pub plan_cost: crate::storage::query::planner::PlanCost,
    /// Estimated output row count.
    pub estimated_rows: f64,
    pub estimated_selectivity: f64,
    /// Planner's confidence in the estimates above.
    pub estimated_confidence: f64,
    /// Names of optimizer passes applied while planning.
    pub passes_applied: Vec<String>,
    pub logical_plan: CanonicalLogicalPlan,
    /// Names of any CTEs declared by a leading `WITH` clause. Empty
    /// for non-CTE queries. The plan tree is built against the
    /// post-inlining body, so each CTE's body is reachable inside
    /// `logical_plan` as a regular `Subquery` (or, for bare refs, the
    /// inlined Table node verbatim). This list lets renderers prepend
    /// `CteScan` markers so operators see which CTEs were resolved.
    pub cte_materializations: Vec<String>,
}
575
/// One hit from an IVF vector search.
#[derive(Debug, Clone)]
pub struct RuntimeIvfMatch {
    pub entity_id: u64,
    /// Distance between the query vector and this match.
    pub distance: f32,
    /// The matched entity when it could be resolved; `None` otherwise.
    pub entity: Option<UnifiedEntity>,
}

/// Full response of an IVF search, including index statistics.
#[derive(Debug, Clone)]
pub struct RuntimeIvfSearchResult {
    pub collection: String,
    /// Requested result count.
    pub k: usize,
    /// Number of IVF inverted lists in the index.
    pub n_lists: usize,
    /// Number of lists probed for this query.
    pub n_probes: usize,
    pub stats: IvfStats,
    pub matches: Vec<RuntimeIvfMatch>,
}

/// Edge-direction filter for graph operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphDirection {
    Outgoing,
    Incoming,
    Both,
}

/// Visit order for graph walks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphTraversalStrategy {
    Bfs,
    Dfs,
}

/// Pathfinding algorithm selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphPathAlgorithm {
    Bfs,
    Dijkstra,
    AStar,
    BellmanFord,
}
613
/// Node summary returned by graph operations.
#[derive(Debug, Clone)]
pub struct RuntimeGraphNode {
    pub id: String,
    pub label: String,
    pub node_type: String,
    pub out_edge_count: u32,
    pub in_edge_count: u32,
}

/// Directed, typed, weighted edge.
#[derive(Debug, Clone)]
pub struct RuntimeGraphEdge {
    pub source: String,
    pub target: String,
    pub edge_type: String,
    pub weight: f32,
}

/// A node reached during traversal, tagged with its depth from the source.
#[derive(Debug, Clone)]
pub struct RuntimeGraphVisit {
    pub depth: usize,
    pub node: RuntimeGraphNode,
}

/// Result of a depth-bounded neighborhood expansion around `source`.
#[derive(Debug, Clone)]
pub struct RuntimeGraphNeighborhoodResult {
    pub source: String,
    pub direction: RuntimeGraphDirection,
    pub max_depth: usize,
    pub nodes: Vec<RuntimeGraphVisit>,
    pub edges: Vec<RuntimeGraphEdge>,
}

/// Result of a BFS/DFS traversal from `source`.
#[derive(Debug, Clone)]
pub struct RuntimeGraphTraversalResult {
    pub source: String,
    pub direction: RuntimeGraphDirection,
    pub strategy: RuntimeGraphTraversalStrategy,
    pub max_depth: usize,
    pub visits: Vec<RuntimeGraphVisit>,
    pub edges: Vec<RuntimeGraphEdge>,
}

/// A concrete path through the graph.
#[derive(Debug, Clone)]
pub struct RuntimeGraphPath {
    pub hop_count: usize,
    pub total_weight: f64,
    pub nodes: Vec<RuntimeGraphNode>,
    pub edges: Vec<RuntimeGraphEdge>,
}

/// Result of a pathfinding query; `path` is `None` when no route exists.
#[derive(Debug, Clone)]
pub struct RuntimeGraphPathResult {
    pub source: String,
    pub target: String,
    pub direction: RuntimeGraphDirection,
    pub algorithm: RuntimeGraphPathAlgorithm,
    pub nodes_visited: usize,
    /// `Some` when the algorithm checks for negative cycles (presumably
    /// Bellman-Ford only — confirm at the producer).
    pub negative_cycle_detected: Option<bool>,
    pub path: Option<RuntimeGraphPath>,
}
674
/// Connected-components variant selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphComponentsMode {
    Connected,
    Weak,
    Strong,
}

/// Centrality algorithm selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphCentralityAlgorithm {
    Degree,
    Closeness,
    Betweenness,
    Eigenvector,
    PageRank,
}

/// Community-detection algorithm selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphCommunityAlgorithm {
    LabelPropagation,
    Louvain,
}

/// One component: its id, size, and member node ids.
#[derive(Debug, Clone)]
pub struct RuntimeGraphComponent {
    pub id: String,
    pub size: usize,
    pub nodes: Vec<String>,
}

/// Result of a components computation.
#[derive(Debug, Clone)]
pub struct RuntimeGraphComponentsResult {
    pub mode: RuntimeGraphComponentsMode,
    pub count: usize,
    pub components: Vec<RuntimeGraphComponent>,
}

/// A node paired with its centrality score.
#[derive(Debug, Clone)]
pub struct RuntimeGraphCentralityScore {
    pub node: RuntimeGraphNode,
    pub score: f64,
}

/// Degree breakdown for one node.
#[derive(Debug, Clone)]
pub struct RuntimeGraphDegreeScore {
    pub node: RuntimeGraphNode,
    pub in_degree: usize,
    pub out_degree: usize,
    pub total_degree: usize,
}

/// Result of a centrality computation. The `Option` fields are only
/// populated by algorithms they apply to; presumably `degree_scores` is
/// filled for `Degree` and `scores` for the others — confirm at the
/// producer.
#[derive(Debug, Clone)]
pub struct RuntimeGraphCentralityResult {
    pub algorithm: RuntimeGraphCentralityAlgorithm,
    pub normalized: Option<bool>,
    pub iterations: Option<usize>,
    pub converged: Option<bool>,
    pub scores: Vec<RuntimeGraphCentralityScore>,
    pub degree_scores: Vec<RuntimeGraphDegreeScore>,
}

/// One detected community.
#[derive(Debug, Clone)]
pub struct RuntimeGraphCommunity {
    pub id: String,
    pub size: usize,
    pub nodes: Vec<String>,
}

/// Result of community detection. `Option` fields are algorithm-specific
/// (presumably `modularity`/`passes` for Louvain, `iterations`/`converged`
/// for label propagation — confirm at the producer).
#[derive(Debug, Clone)]
pub struct RuntimeGraphCommunityResult {
    pub algorithm: RuntimeGraphCommunityAlgorithm,
    pub count: usize,
    pub iterations: Option<usize>,
    pub converged: Option<bool>,
    pub modularity: Option<f64>,
    pub passes: Option<usize>,
    pub communities: Vec<RuntimeGraphCommunity>,
}

/// Global and per-node clustering coefficients.
#[derive(Debug, Clone)]
pub struct RuntimeGraphClusteringResult {
    pub global: f64,
    pub local: Vec<RuntimeGraphCentralityScore>,
    pub triangle_count: Option<usize>,
}

/// HITS hub/authority scores.
#[derive(Debug, Clone)]
pub struct RuntimeGraphHitsResult {
    pub iterations: usize,
    pub converged: bool,
    pub hubs: Vec<RuntimeGraphCentralityScore>,
    pub authorities: Vec<RuntimeGraphCentralityScore>,
}

/// Cycles found in the graph; `limit_reached` signals a truncated listing.
#[derive(Debug, Clone)]
pub struct RuntimeGraphCyclesResult {
    pub limit_reached: bool,
    pub cycles: Vec<RuntimeGraphPath>,
}

/// Topological ordering; presumably only a complete order when `acyclic`
/// is true — confirm at the producer.
#[derive(Debug, Clone)]
pub struct RuntimeGraphTopologicalSortResult {
    pub acyclic: bool,
    pub ordered_nodes: Vec<RuntimeGraphNode>,
}

/// Structural properties of the whole graph.
#[derive(Debug, Clone)]
pub struct RuntimeGraphPropertiesResult {
    pub node_count: usize,
    pub edge_count: usize,
    pub self_loop_count: usize,
    pub negative_edge_count: usize,
    pub connected_component_count: usize,
    pub weak_component_count: usize,
    pub strong_component_count: usize,
    pub is_empty: bool,
    pub is_connected: bool,
    pub is_weakly_connected: bool,
    pub is_strongly_connected: bool,
    pub is_complete: bool,
    pub is_complete_directed: bool,
    pub is_cyclic: bool,
    pub is_circular: bool,
    pub is_acyclic: bool,
    pub is_tree: bool,
    pub density: f64,
    pub density_directed: f64,
}
802
803// ============================================================================
804// Context Search types
805// ============================================================================
806
/// Cross-model search result: matches from each storage paradigm plus the
/// connections discovered between them.
#[derive(Debug, Clone)]
pub struct ContextSearchResult {
    pub query: String,
    pub tables: Vec<ContextEntity>,
    pub graph: ContextGraphResult,
    pub vectors: Vec<ContextEntity>,
    pub documents: Vec<ContextEntity>,
    pub key_values: Vec<ContextEntity>,
    pub connections: Vec<ContextConnection>,
    pub summary: ContextSummary,
}

/// One matched entity, its relevance score, and how it was found.
#[derive(Debug, Clone)]
pub struct ContextEntity {
    pub entity: UnifiedEntity,
    pub score: f32,
    pub discovery: DiscoveryMethod,
    pub collection: String,
}

/// How a context-search entity was discovered.
#[derive(Debug, Clone)]
pub enum DiscoveryMethod {
    /// Matched through an index on `field`.
    Indexed {
        field: String,
    },
    /// Matched during a global scan.
    GlobalScan,
    /// Reached by following a cross-reference from entity `source_id`.
    CrossReference {
        source_id: u64,
        ref_type: String,
    },
    /// Reached by traversing graph edges from `source_id` at `depth` hops.
    GraphTraversal {
        source_id: u64,
        edge_type: String,
        depth: usize,
    },
    /// Matched by vector similarity.
    VectorQuery {
        similarity: f32,
    },
}

/// Graph-side matches: nodes plus edges, both as scored entities.
#[derive(Debug, Clone)]
pub struct ContextGraphResult {
    pub nodes: Vec<ContextEntity>,
    pub edges: Vec<ContextEntity>,
}

/// A discovered link between two matched entities.
#[derive(Debug, Clone)]
pub struct ContextConnection {
    pub from_id: u64,
    pub to_id: u64,
    pub connection_type: ContextConnectionType,
    pub weight: f32,
}

/// Kind of connection between two context-search entities.
#[derive(Debug, Clone)]
pub enum ContextConnectionType {
    CrossRef(String),
    GraphEdge(String),
    VectorSimilarity(f32),
}

/// Execution summary for one context search.
#[derive(Debug, Clone)]
pub struct ContextSummary {
    pub total_entities: usize,
    /// Entities matched directly (not via expansion).
    pub direct_matches: usize,
    pub expanded_via_graph: usize,
    pub expanded_via_cross_refs: usize,
    pub expanded_via_vector_query: usize,
    pub collections_searched: usize,
    /// Wall-clock search time in microseconds.
    pub execution_time_us: u64,
    /// Names of the search tiers that contributed results.
    pub tiers_used: Vec<String>,
    pub entities_reindexed: usize,
}
880
/// Mutable bookkeeping for the connection pool, guarded by the pool mutex.
struct PoolState {
    /// Next connection id to hand out (seeded at 1).
    next_id: u64,
    /// Connections currently checked out.
    active: usize,
    /// Ids of connections parked for reuse.
    idle: Vec<u64>,
    /// Lifetime count of checkouts.
    total_checkouts: u64,
}

impl Default for PoolState {
    fn default() -> Self {
        PoolState {
            next_id: 1,
            active: 0,
            idle: Vec::new(),
            total_checkouts: 0,
        }
    }
}
898
/// One cached query result plus the scopes it depends on.
#[derive(Debug, Clone)]
struct RuntimeResultCacheEntry {
    result: RuntimeQueryResult,
    /// Insertion time, for age/TTL checks.
    cached_at: std::time::Instant,
    /// Scope keys tied to this entry (presumably table/collection names
    /// whose mutation invalidates it — confirm at the invalidation path).
    scopes: HashSet<String>,
}

/// Metric name for counting shadow-cache divergences.
pub const METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL: &str = "cache_shadow_divergence_total";
/// Namespace key for ask-answer cache entries.
pub(crate) const ASK_ANSWER_CACHE_NAMESPACE: &str = "runtime.ask_answer_cache";
/// Shard count for `RmwLockTable`.
const RMW_LOCK_SHARDS: usize = 64;
909
910struct RmwLockTable {
911    shards: Vec<parking_lot::Mutex<HashMap<String, Arc<parking_lot::Mutex<()>>>>>,
912}
913
914impl RmwLockTable {
915    fn new() -> Self {
916        let shards = (0..RMW_LOCK_SHARDS)
917            .map(|_| parking_lot::Mutex::new(HashMap::new()))
918            .collect();
919        Self { shards }
920    }
921
922    fn lock_for(&self, collection: &str, key: &str) -> Arc<parking_lot::Mutex<()>> {
923        use std::hash::{Hash, Hasher};
924
925        let mut hasher = std::collections::hash_map::DefaultHasher::new();
926        collection.hash(&mut hasher);
927        key.hash(&mut hasher);
928        let shard_idx = (hasher.finish() as usize) % self.shards.len();
929        let map_key = format!("{collection}\u{1f}{key}");
930        let mut shard = self.shards[shard_idx].lock();
931        shard
932            .entry(map_key)
933            .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
934            .clone()
935    }
936}
937
struct RuntimeInner {
    /// Shared handle to the embedded `RedDB` store that every runtime
    /// surface ultimately dispatches into.
    db: Arc<RedDB>,
    /// Physical on-disk layout descriptor for this database.
    layout: PhysicalLayout,
    /// Secondary-index catalog.
    indices: IndexCatalog,
    /// Immutable connection-pool settings; the mutable pool state lives
    /// in `pool` below.
    pool_config: ConnectionPoolConfig,
    /// Mutable connection-pool bookkeeping, guarded by a std mutex.
    pool: Mutex<PoolState>,
    /// Runtime start time, milliseconds since the Unix epoch
    /// (presumably surfaced as uptime by health/metrics — confirm at
    /// call sites).
    started_at_unix_ms: u128,
    /// Probabilistic data-structure store (see `probabilistic_store`).
    probabilistic: probabilistic_store::ProbabilisticStore,
    /// Runtime-managed index storage (see `index_store`).
    index_store: index_store::IndexStore,
    /// Change-data-capture buffer for the replication subsystem.
    cdc: crate::replication::cdc::CdcBuffer,
    /// Scheduler driving periodic backups.
    backup_scheduler: crate::replication::scheduler::BackupScheduler,
    /// Cached query plans; invalidated by DDL via
    /// `invalidate_plan_cache` (see `ddl_epoch` below).
    query_cache: parking_lot::RwLock<crate::storage::query::planner::cache::PlanCache>,
    /// Result cache: entry map plus a key queue — NOTE(review): the
    /// `VecDeque` presumably records insertion order for eviction;
    /// confirm in the cache implementation.
    result_cache: parking_lot::RwLock<(
        HashMap<String, RuntimeResultCacheEntry>,
        std::collections::VecDeque<String>,
    )>,
    /// Blob-backed storage for large cached results.
    result_blob_cache: crate::storage::cache::BlobCache,
    /// Bookkeeping for `result_blob_cache`, same map + key-queue shape
    /// as `result_cache`.
    result_blob_entries: parking_lot::RwLock<(
        HashMap<String, RuntimeResultCacheEntry>,
        std::collections::VecDeque<String>,
    )>,
    /// Keys of cached ask answers plus their key queue.
    ask_answer_cache_entries:
        parking_lot::RwLock<(HashSet<String>, std::collections::VecDeque<String>)>,
    /// NOTE(review): name suggests this counts result-cache
    /// shadow-comparison divergences; confirm where it is incremented.
    result_cache_shadow_divergences: std::sync::atomic::AtomicU64,
    /// Per-key daily spend state for the AI cost guard.
    ask_daily_spend:
        parking_lot::RwLock<HashMap<String, crate::runtime::ai::cost_guard::DailyState>>,
    /// Process-local queue message locks used to emulate `SKIP LOCKED`-style
    /// claim semantics for concurrent queue consumers inside this runtime.
    queue_message_locks: parking_lot::RwLock<HashMap<String, Arc<parking_lot::Mutex<()>>>>,
    /// Process-local read-modify-write locks. The table is sharded by
    /// `(collection, key)` and each entry has its own mutex, so unrelated keys
    /// in the same collection do not serialize behind one global lock.
    rmw_locks: RmwLockTable,
    /// NOTE(review): presumably the set of tables whose planner state
    /// was dirtied and needs refresh — confirm at the mutation sites.
    planner_dirty_tables: parking_lot::RwLock<HashSet<String>>,
    /// EC subsystem configuration registry (see `crate::ec::config`).
    ec_registry: Arc<crate::ec::config::EcRegistry>,
    /// Background worker for the EC subsystem (see `crate::ec::worker`).
    ec_worker: crate::ec::worker::EcWorker,
    /// Optional AuthStore — injected by server boot when auth is
    /// enabled. Required for `Value::Secret` auto-encrypt/decrypt
    /// because the AES key lives in the vault KV under the
    /// `red.secret.aes_key` entry.
    auth_store: parking_lot::RwLock<Option<Arc<crate::auth::store::AuthStore>>>,
    /// Optional OAuth/OIDC JWT validator. Wired by server boot when
    /// the operator configures issuer + JWKS via env / CLI. HTTP and
    /// wire transports read this on every bearer-token request and,
    /// when the token decodes as a JWT, validate it against the
    /// configured issuer + audience + signature before falling back to
    /// the local AuthStore lookup.
    oauth_validator: parking_lot::RwLock<Option<Arc<crate::auth::oauth::OAuthValidator>>>,
    /// View registry (Phase 2.1 PG parity).
    ///
    /// Holds the parsed `SELECT` body for every view created via
    /// `CREATE [MATERIALIZED] VIEW`. Queries that reference a view name
    /// substitute the stored `QueryExpr` at execution time. Materialized
    /// views additionally back onto the shared `MaterializedViewCache`
    /// (see `RuntimeInner::materialized_views`).
    ///
    /// This is in-memory only in Phase 2.1 — view definitions do not
    /// survive a restart. Persistence is a Phase 3 follow-up.
    views: parking_lot::RwLock<HashMap<String, Arc<crate::storage::query::ast::CreateViewQuery>>>,
    /// Shared cache backing materialized views (referenced by the
    /// `views` registry documented above).
    materialized_views: parking_lot::RwLock<crate::storage::cache::result::MaterializedViewCache>,
    /// MVCC snapshot manager (Phase 2.3 PG parity).
    ///
    /// Allocates monotonic `xid`s on BEGIN and tracks the active/aborted
    /// sets used by `Snapshot::sees` to filter tuples by visibility. Each
    /// query evaluates `entity.is_visible(snapshot.xid)` — pre-MVCC rows
    /// (`xmin == 0`) stay visible to every snapshot, preserving backward
    /// compatibility with data written before the xid fields existed.
    snapshot_manager: Arc<crate::storage::transaction::snapshot::SnapshotManager>,
    /// Connection → active transaction context map (Phase 2.3 PG parity).
    ///
    /// Keyed by connection id from `RuntimeConnection`. Populated on BEGIN,
    /// cleared on COMMIT/ROLLBACK. When a statement executes outside a
    /// transaction (autocommit path) no entry exists and writes stamp
    /// `xid=0` — identical to pre-MVCC behaviour.
    tx_contexts:
        parking_lot::RwLock<HashMap<u64, crate::storage::transaction::snapshot::TxnContext>>,
    /// Intent-lock hierarchy (IS/IX/S/X) used to break the implicit
    /// global-write serialisation in write paths. Populated at boot
    /// with `concurrency.locking.deadlock_timeout_ms` from the matrix
    /// and wired through DML/DDL dispatch in later P1 tasks.
    /// Dormant until P1.T3 flips the read path to `(Global,IS) →
    /// (Collection,IS)` and P1.T4/T5 pick up writes/DDL.
    lock_manager: Arc<crate::storage::transaction::lock::LockManager>,
    /// Perf-parity env-var overrides (`REDDB_<UP_DOTTED_KEY>`).
    /// Populated once at boot, read by every config getter; takes
    /// precedence over the persisted red_config value so operators
    /// can hot-fix a bad config by restarting with a different env
    /// var set. Keys are restricted to those declared in the matrix.
    env_config_overrides: HashMap<String, String>,
    /// Transaction-local tenant override (`SET LOCAL TENANT '<id>'`).
    /// Keyed by connection id, mirroring `tx_contexts`. Lives only while
    /// a transaction is open — `COMMIT` / `ROLLBACK` evict the entry,
    /// returning the connection to whichever session-level tenant
    /// (`SET TENANT 'x'`) was active before the transaction began.
    /// Wins over the session value but loses to a per-statement
    /// `WITHIN TENANT '<id>' …` override on the same call.
    tx_local_tenants: parking_lot::RwLock<HashMap<u64, Option<String>>>,
    /// Row-level security policies (Phase 2.5 PG parity).
    ///
    /// Keyed by `(table_name, policy_name)`; the set of tables with RLS
    /// enforcement toggled on lives in `rls_enabled_tables`. Filter
    /// enforcement hooks into the read path via `collect_rls_filters()`
    /// — see `runtime::impl_core`.
    rls_policies: parking_lot::RwLock<
        HashMap<(String, String), Arc<crate::storage::query::ast::CreatePolicyQuery>>,
    >,
    /// Tables with RLS enforcement toggled on (see `rls_policies`).
    rls_enabled_tables: parking_lot::RwLock<HashSet<String>>,
    /// Foreign Data Wrapper registry (Phase 3.2 PG parity).
    ///
    /// Maps server names → wrapper instances and foreign-table names →
    /// definitions. Queries referencing a registered foreign table are
    /// re-routed to `ForeignTableRegistry::scan` by the read-path
    /// rewriter; reads against unknown names fall through to the native
    /// collection lookup.
    foreign_tables: Arc<crate::storage::fdw::ForeignTableRegistry>,
    /// Per-connection list of tuples marked for deletion by the current
    /// transaction (Phase 2.3.2b MVCC tombstones + 2.3.2e savepoints).
    /// Each entry is `(collection, entity_id, stamper_xid, previous_xmax)`
    /// — the xid that stamped xmax on the tuple plus the value it
    /// replaced. For a plain transaction the
    /// stamper equals `ctx.xid`; with savepoints the stamper equals
    /// the innermost open sub-xid so ROLLBACK TO SAVEPOINT can revive
    /// only the matching subset. COMMIT drains the whole conn list
    /// and keeps the committed tombstones; ROLLBACK (whole-tx) revives them all;
    /// ROLLBACK TO SAVEPOINT revives those with `stamper_xid >=
    /// savepoint_xid`. Autocommit DELETE bypasses this map.
    pending_tombstones: parking_lot::RwLock<
        HashMap<
            u64,
            Vec<(
                String,
                crate::storage::unified::entity::EntityId,
                crate::storage::transaction::snapshot::Xid,
                crate::storage::transaction::snapshot::Xid,
            )>,
        >,
    >,
    /// Per-connection table-row UPDATE versions created by an open
    /// transaction. Each entry is `(collection, old_entity_id,
    /// new_entity_id, stamper_xid, previous_xmax)`. COMMIT keeps both physical
    /// versions and drops the pending marker; ROLLBACK revives the old
    /// version and removes the new uncommitted version.
    pending_versioned_updates: parking_lot::RwLock<
        HashMap<
            u64,
            Vec<(
                String,
                crate::storage::unified::entity::EntityId,
                crate::storage::unified::entity::EntityId,
                crate::storage::transaction::snapshot::Xid,
                crate::storage::transaction::snapshot::Xid,
            )>,
        >,
    >,
    /// NOTE(review): presumably KV watch events buffered per connection
    /// while a transaction is open — confirm the flush/discard points.
    pending_kv_watch_events:
        parking_lot::RwLock<HashMap<u64, Vec<crate::replication::cdc::KvWatchEvent>>>,
    /// NOTE(review): presumably per-connection deferred store/WAL
    /// actions (see `DeferredStoreWalActions`) — confirm drain point.
    pending_store_wal_actions:
        parking_lot::RwLock<HashMap<u64, crate::storage::unified::DeferredStoreWalActions>>,
    /// Table-scoped tenancy registry (Phase 2.5.4).
    ///
    /// Maps `table_name → tenant_column`. DML auto-fill looks here to
    /// inject `CURRENT_TENANT()` on INSERTs that omit the column, and
    /// DDL keeps the in-memory registry in sync with the
    /// `tenant_tables.*` keys in red_config. Read-side enforcement
    /// piggy-backs on the existing RLS infrastructure: every entry
    /// installs an implicit `col = CURRENT_TENANT()` policy.
    tenant_tables: parking_lot::RwLock<HashMap<String, String>>,
    /// Monotonic epoch bumped on every DDL / schema-mutating operation
    /// that calls `invalidate_plan_cache`. Prepared statements capture
    /// this at PREPARE and re-check at EXECUTE — a mismatch means the
    /// cached shape may reference dropped or renamed columns and the
    /// client must re-PREPARE.
    ddl_epoch: std::sync::atomic::AtomicU64,
    /// Public-mutation gate (PLAN.md W1).
    ///
    /// Built once at construction from the immutable subset of
    /// `RedDBOptions` (read_only flag + replication role). Every public
    /// mutation surface — SQL DML/DDL, gRPC mutating RPCs, HTTP/native
    /// wire mutations, admin maintenance endpoints, serverless
    /// lifecycle — consults `write_gate.check(WriteKind::*)` before
    /// dispatching to storage. The replica internal apply path
    /// (`LogicalChangeApplier`) reaches into the store directly and
    /// bypasses the gate by construction.
    write_gate: Arc<crate::runtime::write_gate::WriteGate>,
    /// Process lifecycle state machine (PLAN.md Phase 1 — Lifecycle
    /// Contract). Drives `/health/live`, `/health/ready`,
    /// `/health/startup`, and `POST /admin/shutdown` so any
    /// orchestrator (K8s preStop, Fly autostop, ECS task drain,
    /// systemd) can coordinate without losing data.
    lifecycle: crate::runtime::lifecycle::Lifecycle,
    /// Operator-imposed resource limits (PLAN.md Phase 4.1).
    /// Read once at boot from `RED_MAX_*` env vars; consulted by
    /// observability and (in follow-up commits) the per-write
    /// enforcement points.
    resource_limits: crate::runtime::resource_limits::ResourceLimits,
    /// Append-only audit log for admin mutations (PLAN.md Phase
    /// 6.5). Lives next to the primary `.rdb` file so backup +
    /// restore flows ship it alongside the data.
    audit_log: Arc<crate::runtime::audit_log::AuditLogger>,
    /// Serverless writer-lease state machine. `None` when the operator
    /// did not opt into lease fencing (`RED_LEASE_REQUIRED` unset/false).
    /// When set, owns the {acquire/refresh/release/lost} transitions and
    /// is the single place that mutates `write_gate.set_lease_state` and
    /// records lease/* audit entries — keeping those two side-effects
    /// from drifting.
    lease_lifecycle: std::sync::OnceLock<Arc<crate::runtime::lease_lifecycle::LeaseLifecycle>>,
    /// PLAN.md Phase 11.5 — counters bumped by the replica apply
    /// loop on `Gap` / `Divergence` / `Apply` errors so /metrics
    /// surfaces them as `reddb_replica_apply_errors_total{kind}`.
    replica_apply_metrics: crate::replication::logical::ReplicaApplyMetrics,
    /// PLAN.md Phase 4.4 — per-caller QPS quotas. Disabled (no-op)
    /// when `RED_MAX_QPS_PER_CALLER` is unset.
    quota_bucket: crate::runtime::quota_bucket::QuotaBucket,
    /// Issue #120 — token → schema entity reverse index, kept current
    /// incrementally on DDL events. Consumed by AskPipeline (issue
    /// #121) Stage 2 to narrow vector-search candidates before any
    /// embedding compute. Mutated only from DDL execution paths.
    schema_vocabulary: parking_lot::RwLock<crate::runtime::schema_vocabulary::SchemaVocabulary>,
    /// Issue #205 — dedicated slow-query sink (`red-slow.log`).
    /// Built once at runtime startup; below-threshold calls pay only a
    /// single relaxed atomic load. Threshold + sample-pct come from
    /// `runtime.slow_query.threshold_ms` / `.sample_pct` (config matrix)
    /// at construction; live tuning via the config tree is a follow-up.
    slow_query_logger: Arc<crate::telemetry::slow_query_logger::SlowQueryLogger>,
    /// Process-local normal-KV operation counters. These are intentionally
    /// runtime-local; persistent accounting belongs in catalog stats.
    kv_stats: KvStatsCounters,
    /// Process-local metrics-ingest counters (runtime-local, like
    /// `kv_stats`).
    metrics_ingest_stats: MetricsIngestCounters,
    /// Process-local per-tenant activity counters (runtime-local, like
    /// `kv_stats`).
    metrics_tenant_activity_stats: MetricsTenantActivityCounters,
    /// Process-local normal-KV tag index used by `INVALIDATE TAGS`.
    kv_tag_index: KvTagIndex,
}
1170
/// Cloneable public handle to the runtime. All clones share the same
/// `RuntimeInner` through the `Arc`, so cloning is a cheap refcount
/// bump rather than a copy of runtime state.
#[derive(Clone)]
pub struct RedDBRuntime {
    /// Shared runtime state; see `RuntimeInner` for the field-by-field map.
    inner: Arc<RuntimeInner>,
}
1175
/// A single logical connection onto the shared runtime. The `id` keys
/// the per-connection state tables in `RuntimeInner` (e.g.
/// `tx_contexts`, `tx_local_tenants`, `pending_tombstones`), giving
/// each connection isolated transaction/tenant state.
pub struct RuntimeConnection {
    /// Connection id used as the key into `RuntimeInner`'s
    /// per-connection maps (see `RuntimeInner::tx_contexts`).
    id: u64,
    /// Shared runtime state this connection operates against.
    inner: Arc<RuntimeInner>,
}
1180
// Runtime submodule tree. Visibility is deliberate: `pub` modules are
// consumed outside this module (transports, tests), `pub(crate)`
// modules are shared with the rest of the crate, and plain `mod`s are
// implementation details private to `runtime`.
pub mod ai;
pub mod ask_pipeline;
pub mod audit_log;
pub mod audit_query;
pub mod authorized_search;
mod collection_contract;
pub mod config_matrix;
pub mod config_overlay;
pub mod config_watcher;
pub(crate) mod ddl;
pub mod disk_space_monitor;
mod dml_target_scan;
mod expr_eval;
mod graph_dsl;
mod health_connection;
mod impl_config;
pub(crate) mod impl_core;
mod impl_ddl;
mod impl_dml;
mod impl_ec;
mod impl_events;
mod impl_graph;
mod impl_graph_commands;
pub mod impl_kv;
mod impl_migrations;
mod impl_native;
mod impl_physical;
mod impl_probabilistic;
pub mod impl_queue;
mod impl_search;
mod impl_timeseries;
mod impl_tree;
mod impl_vcs;
mod index_store;
mod join_filter;
mod keyed_spine;
pub mod kv_watch;
pub mod lease_lifecycle;
pub mod lease_loop;
pub mod lease_timer_wheel;
pub mod lifecycle;
pub mod locking;
pub(crate) mod mutation;
mod probabilistic_store;
pub(crate) mod query_exec;
mod queue_delivery;
pub mod quota_bucket;
mod record_search;
mod red_schema;
pub mod resource_limits;
pub(crate) mod scalar_evaluator;
pub mod schema_diff;
pub mod schema_vocabulary;
pub mod snapshot_reuse;
mod statement_frame;
mod table_row_mvcc_resolver;
mod vector_index;
pub mod within_clause;
pub mod write_gate;
1240
1241pub use self::graph_dsl::*;
1242use self::join_filter::*;
1243use self::query_exec::*;
1244use self::record_search::*;
1245pub use self::statement_frame::EffectiveScope;
1246
1247/// Re-exports for transports + tests that need per-connection
1248/// isolation, tenant / auth thread-locals, and MVCC snapshot
1249/// utilities. Mirrors what PG-wire / gRPC / HTTP middleware already
1250/// call, and is enough to emulate independent connections in
1251/// integration tests.
1252pub mod mvcc {
1253    pub use super::impl_core::{
1254        capture_current_snapshot, clear_current_auth_identity, clear_current_connection_id,
1255        clear_current_snapshot, clear_current_tenant, current_connection_id, current_tenant,
1256        entity_visible_under_current_snapshot, entity_visible_with_context,
1257        set_current_auth_identity, set_current_connection_id, set_current_snapshot,
1258        set_current_tenant, snapshot_bundle, with_snapshot_bundle, SnapshotBundle, SnapshotContext,
1259    };
1260}
1261
1262/// Public helpers re-exported for use by the presentation layer.
1263pub mod record_search_helpers {
1264    use crate::storage::query::UnifiedRecord;
1265    use crate::storage::UnifiedEntity;
1266    use std::collections::BTreeSet;
1267
1268    pub fn entity_type_and_capabilities(
1269        entity: &UnifiedEntity,
1270    ) -> (&'static str, BTreeSet<String>) {
1271        super::record_search::runtime_entity_type_and_capabilities(entity)
1272    }
1273
1274    /// Materialise any entity kind (TableRow, Node, Edge, Vector,
1275    /// TimeSeriesPoint, QueueMessage) into a `UnifiedRecord` whose
1276    /// `values` carry the native fields. Used by the RLS evaluator
1277    /// when a non-table collection matches a `CompareExpr` policy.
1278    pub fn any_record_from_entity(entity: UnifiedEntity) -> Option<UnifiedRecord> {
1279        super::record_search::runtime_any_record_from_entity(entity)
1280    }
1281}