Skip to main content

reddb_server/
runtime.rs

1//! Embedded runtime with connection pooling, scans and health.
2
3use std::cmp::Ordering;
4use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, VecDeque};
5use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
6use std::sync::{Arc, Mutex};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use crate::api::{RedDBError, RedDBOptions, RedDBResult};
10use crate::catalog::{
11    CatalogAnalyticsJobStatus, CatalogAttentionSummary, CatalogGraphProjectionStatus,
12    CatalogIndexStatus, CatalogModelSnapshot, CollectionDescriptor,
13};
14use crate::health::{HealthProvider, HealthReport};
15use crate::index::IndexCatalog;
16use crate::physical::{
17    ExportDescriptor, ManifestEvent, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalLayout,
18    SnapshotDescriptor,
19};
20use crate::serde_json::Value as JsonValue;
21use crate::storage::engine::pathfinding::{AStar, BellmanFord, Dijkstra, BFS, DFS};
22use crate::storage::engine::{
23    BetweennessCentrality, ClosenessCentrality, ClusteringCoefficient, ConnectedComponents,
24    CycleDetector, DegreeCentrality, EigenvectorCentrality, GraphStore, IvfConfig, IvfIndex,
25    IvfStats, LabelPropagation, Louvain, MetadataEntry, MetadataFilter as VectorMetadataFilter,
26    MetadataValue as VectorMetadataValue, PageRank, PersonalizedPageRank, PhysicalFileHeader,
27    StoredNode, StronglyConnectedComponents, WeaklyConnectedComponents, HITS,
28};
29use crate::storage::query::ast::{
30    AlterOperation, AlterQueueQuery, AlterTableQuery, CompareOp, CreateCollectionQuery,
31    CreateIndexQuery, CreateQueueQuery, CreateTableQuery, CreateTimeSeriesQuery, CreateTreeQuery,
32    CreateVectorQuery, DeleteQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery,
33    DropIndexQuery, DropKvQuery, DropQueueQuery, DropTableQuery, DropTimeSeriesQuery,
34    DropTreeQuery, DropVectorQuery, EventsBackfillQuery, ExplainAlterQuery, ExplainFormat,
35    FieldRef, Filter, FusionStrategy, GraphCommand, HybridQuery, IndexMethod, InsertEntityType,
36    InsertQuery, JoinQuery, JoinType, OrderByClause, ProbabilisticCommand, Projection, QueryExpr,
37    QueueCommand, QueueSelectQuery, QueueSide, SearchCommand, TableQuery, TreeCommand,
38    TruncateQuery, UpdateQuery, VectorQuery, VectorSource,
39};
40use crate::storage::query::is_universal_entity_source as is_universal_query_source;
41use crate::storage::query::modes::{detect_mode, parse_multi, QueryMode};
42use crate::storage::query::planner::{
43    CanonicalLogicalPlan, CanonicalPlanner, CostEstimator, QueryPlanner,
44};
45use crate::storage::query::unified::{UnifiedRecord, UnifiedResult};
46use crate::storage::schema::Value;
47use crate::storage::unified::dsl::{
48    apply_filters, cosine_similarity, Filter as DslFilter, FilterOp as DslFilterOp,
49    FilterValue as DslFilterValue, GraphPatternDsl, HybridQueryBuilder, MatchComponents,
50    QueryResult as DslQueryResult, ScoredMatch, TextSearchBuilder,
51};
52use crate::storage::unified::store::{
53    NativeCatalogSummary, NativeManifestSummary, NativePhysicalState, NativeRecoverySummary,
54    NativeRegistrySummary,
55};
56use crate::storage::unified::{
57    Metadata, MetadataValue as UnifiedMetadataValue, RefTarget, UnifiedMetadataFilter,
58};
59use crate::storage::{
60    EntityData, EntityId, EntityKind, RedDB, RefType, SimilarResult, StoreStats, UnifiedEntity,
61    UnifiedStore,
62};
63
/// Sizing limits for the runtime connection pool.
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
    /// Connection limit; the default configuration uses 64.
    pub max_connections: usize,
    /// Idle-connection limit; the default configuration uses 16.
    pub max_idle: usize,
}

impl Default for ConnectionPoolConfig {
    /// Stock configuration: 64 connections, of which 16 may sit idle.
    fn default() -> Self {
        const DEFAULT_MAX_CONNECTIONS: usize = 64;
        const DEFAULT_MAX_IDLE: usize = 16;

        ConnectionPoolConfig {
            max_connections: DEFAULT_MAX_CONNECTIONS,
            max_idle: DEFAULT_MAX_IDLE,
        }
    }
}
78
/// Scan position expressed as a single `offset`.
// NOTE(review): presumably an item offset into a collection scan — confirm
// against the scan implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ScanCursor {
    pub offset: usize,
}
83
/// One page of scan output: a batch of entities plus an optional follow-up cursor.
#[derive(Debug, Clone)]
pub struct ScanPage {
    /// Collection name (presumably the collection that was scanned).
    pub collection: String,
    /// Entities carried by this page.
    pub items: Vec<UnifiedEntity>,
    /// Cursor for a subsequent page; `None` presumably means the scan is
    /// exhausted — TODO confirm.
    pub next: Option<ScanCursor>,
    // NOTE(review): not visible from here whether `total` counts this page or
    // the whole collection.
    pub total: usize,
}
91
/// Host and process facts gathered by [`SystemInfo::collect`].
#[derive(Debug, Clone)]
pub struct SystemInfo {
    /// Id of the current process.
    pub pid: u32,
    /// Parallelism reported by the standard library; 1 when it cannot be determined.
    pub cpu_cores: usize,
    /// `MemTotal` from `/proc/meminfo`, in bytes; 0 when unavailable (always 0 off Linux).
    pub total_memory_bytes: u64,
    /// `MemAvailable` from `/proc/meminfo`, in bytes; 0 when unavailable (always 0 off Linux).
    pub available_memory_bytes: u64,
    /// Value of `std::env::consts::OS`.
    pub os: String,
    /// Value of `std::env::consts::ARCH`.
    pub arch: String,
    /// Host name, or `"unknown"` when no source yields a non-empty name.
    pub hostname: String,
}

impl SystemInfo {
    /// Whether the system has enough cores to benefit from parallelism.
    /// Returns false on single-core machines where thread overhead > gains,
    /// and also when the available parallelism cannot be queried.
    pub fn should_parallelize() -> bool {
        std::thread::available_parallelism()
            .map(|p| p.get() > 1)
            .unwrap_or(false)
    }

    /// Gathers a fresh snapshot of process and host information.
    ///
    /// Never fails: every probe falls back to a neutral value (`1` core,
    /// `0` bytes, `"unknown"` host name) when its source is unavailable.
    pub fn collect() -> Self {
        Self {
            pid: std::process::id(),
            cpu_cores: std::thread::available_parallelism()
                .map(|p| p.get())
                .unwrap_or(1),
            total_memory_bytes: Self::read_total_memory(),
            available_memory_bytes: Self::read_available_memory(),
            os: std::env::consts::OS.to_string(),
            arch: std::env::consts::ARCH.to_string(),
            hostname: Self::detect_hostname(),
        }
    }

    /// Resolves the host name.
    ///
    /// Order: `HOSTNAME`, then `COMPUTERNAME`, then (Linux only) the kernel's
    /// `/proc/sys/kernel/hostname`, then the literal `"unknown"`. Empty or
    /// whitespace-only values are skipped, because `HOSTNAME` is frequently
    /// unset or not exported to child processes.
    fn detect_hostname() -> String {
        ["HOSTNAME", "COMPUTERNAME"]
            .iter()
            .filter_map(|name| std::env::var(name).ok())
            .map(|value| value.trim().to_string())
            .find(|value| !value.is_empty())
            .or_else(Self::read_kernel_hostname)
            .unwrap_or_else(|| "unknown".to_string())
    }

    /// Host name as reported by the kernel, if readable and non-empty.
    #[cfg(target_os = "linux")]
    fn read_kernel_hostname() -> Option<String> {
        std::fs::read_to_string("/proc/sys/kernel/hostname")
            .ok()
            .map(|raw| raw.trim().to_string())
            .filter(|name| !name.is_empty())
    }

    /// No kernel host-name source is consulted on non-Linux targets.
    #[cfg(not(target_os = "linux"))]
    fn read_kernel_hostname() -> Option<String> {
        None
    }

    /// Extracts `field` (including its trailing colon, e.g. `"MemTotal:"`)
    /// from `/proc/meminfo`-formatted `contents` and converts kB to bytes.
    ///
    /// Only the first line starting with `field` is considered. Returns 0
    /// when the field is missing or its value does not parse; the
    /// multiplication saturates instead of overflowing.
    #[cfg_attr(not(target_os = "linux"), allow(dead_code))]
    fn meminfo_bytes(contents: &str, field: &str) -> u64 {
        contents
            .lines()
            .find_map(|line| line.strip_prefix(field))
            .and_then(|rest| rest.split_whitespace().next())
            .and_then(|kb| kb.parse::<u64>().ok())
            .map_or(0, |kb| kb.saturating_mul(1024))
    }

    /// Reads `/proc/meminfo` and returns `field` in bytes (0 on any failure).
    #[cfg(target_os = "linux")]
    fn read_meminfo_bytes(field: &str) -> u64 {
        std::fs::read_to_string("/proc/meminfo")
            .map(|contents| Self::meminfo_bytes(&contents, field))
            .unwrap_or(0)
    }

    /// Total physical memory in bytes, from `MemTotal`.
    #[cfg(target_os = "linux")]
    fn read_total_memory() -> u64 {
        Self::read_meminfo_bytes("MemTotal:")
    }

    /// Available memory in bytes, from `MemAvailable`.
    #[cfg(target_os = "linux")]
    fn read_available_memory() -> u64 {
        Self::read_meminfo_bytes("MemAvailable:")
    }

    /// Memory totals are not probed on non-Linux targets.
    #[cfg(not(target_os = "linux"))]
    fn read_total_memory() -> u64 {
        0
    }

    /// Memory availability is not probed on non-Linux targets.
    #[cfg(not(target_os = "linux"))]
    fn read_available_memory() -> u64 {
        0
    }
}
172
/// Aggregated runtime statistics: connection-pool counters, store statistics,
/// host information, blob-cache statistics, KV counters and metrics-ingest
/// counters.
#[derive(Debug, Clone)]
pub struct RuntimeStats {
    pub active_connections: usize,
    pub idle_connections: usize,
    pub total_checkouts: u64,
    // NOTE(review): presumably reflects whether the store runs in paged mode — confirm.
    pub paged_mode: bool,
    /// Start time as Unix epoch milliseconds (per the field name).
    pub started_at_unix_ms: u128,
    pub store: StoreStats,
    pub system: SystemInfo,
    pub result_blob_cache: crate::storage::cache::BlobCacheStats,
    pub kv: KvStats,
    pub metrics_ingest: MetricsIngestStats,
}
186
/// One row produced by `MetricsTenantActivityCounters::snapshot`: how many
/// times `record` was called for a `(tenant, namespace, operation)` triple.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsTenantActivityStats {
    pub tenant: String,
    pub namespace: String,
    pub operation: String,
    /// Number of recorded calls for this triple.
    pub count: u64,
}
194
/// Plain-value copy of the metrics-ingest counters, as returned by
/// `MetricsIngestCounters::snapshot`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct MetricsIngestStats {
    pub samples_accepted: u64,
    pub series_accepted: u64,
    pub samples_rejected: u64,
    pub series_rejected: u64,
    pub series_rejected_cardinality_budget: u64,
}

/// Atomic accumulators backing [`MetricsIngestStats`].
#[derive(Debug, Default)]
pub(crate) struct MetricsIngestCounters {
    samples_accepted: AtomicU64,
    series_accepted: AtomicU64,
    samples_rejected: AtomicU64,
    series_rejected: AtomicU64,
    series_rejected_cardinality_budget: AtomicU64,
}

impl MetricsIngestCounters {
    /// Adds the given deltas to the accepted/rejected sample and series
    /// counters. The cardinality-budget counter is not touched here.
    pub(crate) fn record(
        &self,
        accepted_samples: u64,
        accepted_series: u64,
        rejected_samples: u64,
        rejected_series: u64,
    ) {
        let updates = [
            (&self.samples_accepted, accepted_samples),
            (&self.series_accepted, accepted_series),
            (&self.samples_rejected, rejected_samples),
            (&self.series_rejected, rejected_series),
        ];
        for (counter, delta) in updates {
            counter.fetch_add(delta, AtomicOrdering::Relaxed);
        }
    }

    /// Adds `rejected_series` to the cardinality-budget rejection counter.
    pub(crate) fn record_cardinality_budget_rejections(&self, rejected_series: u64) {
        let counter = &self.series_rejected_cardinality_budget;
        counter.fetch_add(rejected_series, AtomicOrdering::Relaxed);
    }

    /// Loads every counter with relaxed ordering and returns the values.
    /// The counters are read one by one, not as a single atomic unit.
    pub(crate) fn snapshot(&self) -> MetricsIngestStats {
        let read = |counter: &AtomicU64| counter.load(AtomicOrdering::Relaxed);

        let samples_accepted = read(&self.samples_accepted);
        let series_accepted = read(&self.series_accepted);
        let samples_rejected = read(&self.samples_rejected);
        let series_rejected = read(&self.series_rejected);
        let series_rejected_cardinality_budget = read(&self.series_rejected_cardinality_budget);

        MetricsIngestStats {
            samples_accepted,
            series_accepted,
            samples_rejected,
            series_rejected,
            series_rejected_cardinality_budget,
        }
    }
}
248
249#[derive(Debug, Default)]
250pub(crate) struct MetricsTenantActivityCounters {
251    inner: Mutex<BTreeMap<(String, String, String), u64>>,
252}
253
254impl MetricsTenantActivityCounters {
255    pub(crate) fn record(&self, tenant: &str, namespace: &str, operation: &str) {
256        let mut inner = self
257            .inner
258            .lock()
259            .unwrap_or_else(|poison| poison.into_inner());
260        let key = (
261            tenant.to_string(),
262            namespace.to_string(),
263            operation.to_string(),
264        );
265        *inner.entry(key).or_insert(0) += 1;
266    }
267
268    pub(crate) fn snapshot(&self) -> Vec<MetricsTenantActivityStats> {
269        let inner = self
270            .inner
271            .lock()
272            .unwrap_or_else(|poison| poison.into_inner());
273        inner
274            .iter()
275            .map(
276                |((tenant, namespace, operation), count)| MetricsTenantActivityStats {
277                    tenant: tenant.clone(),
278                    namespace: namespace.clone(),
279                    operation: operation.clone(),
280                    count: *count,
281                },
282            )
283            .collect()
284    }
285}
286
/// Plain-value copy of the KV counters, as returned by
/// `KvStatsCounters::snapshot`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct KvStats {
    pub puts: u64,
    pub gets: u64,
    pub deletes: u64,
    pub incrs: u64,
    pub cas_success: u64,
    pub cas_conflict: u64,
    /// Value of the watch-stream gauge (incremented and decremented by the counters).
    pub watch_streams_active: u64,
    pub watch_events_emitted: u64,
    pub watch_drops: u64,
}
299
/// Atomic accumulators backing `KvStats`; one `AtomicU64` per reported field.
#[derive(Debug, Default)]
pub(crate) struct KvStatsCounters {
    puts: AtomicU64,
    gets: AtomicU64,
    deletes: AtomicU64,
    incrs: AtomicU64,
    cas_success: AtomicU64,
    cas_conflict: AtomicU64,
    // The only field that is also decremented (see `decr_watch_streams_active`).
    watch_streams_active: AtomicU64,
    watch_events_emitted: AtomicU64,
    watch_drops: AtomicU64,
}
312
313#[derive(Debug, Default)]
314pub(crate) struct KvTagIndex {
315    tag_to_entries: parking_lot::RwLock<HashMap<(String, String), HashMap<String, EntityId>>>,
316    key_to_tags: parking_lot::RwLock<HashMap<(String, String), BTreeSet<String>>>,
317}
318
319impl KvTagIndex {
320    pub(crate) fn replace(&self, collection: &str, key: &str, id: EntityId, tags: &[String]) {
321        let entry_key = (collection.to_string(), key.to_string());
322        let new_tags: BTreeSet<String> = tags
323            .iter()
324            .map(|tag| tag.trim())
325            .filter(|tag| !tag.is_empty())
326            .map(ToOwned::to_owned)
327            .collect();
328
329        let old_tags = {
330            let mut key_to_tags = self.key_to_tags.write();
331            if new_tags.is_empty() {
332                key_to_tags.remove(&entry_key)
333            } else {
334                key_to_tags.insert(entry_key.clone(), new_tags.clone())
335            }
336        };
337
338        let mut tag_to_entries = self.tag_to_entries.write();
339        if let Some(old_tags) = old_tags {
340            for tag in old_tags {
341                let scoped = (collection.to_string(), tag);
342                let remove_scoped = if let Some(entries) = tag_to_entries.get_mut(&scoped) {
343                    entries.remove(key);
344                    entries.is_empty()
345                } else {
346                    false
347                };
348                if remove_scoped {
349                    tag_to_entries.remove(&scoped);
350                }
351            }
352        }
353
354        for tag in new_tags {
355            tag_to_entries
356                .entry((collection.to_string(), tag))
357                .or_default()
358                .insert(key.to_string(), id);
359        }
360    }
361
362    pub(crate) fn remove(&self, collection: &str, key: &str) {
363        let entry_key = (collection.to_string(), key.to_string());
364        let old_tags = self.key_to_tags.write().remove(&entry_key);
365        let Some(old_tags) = old_tags else {
366            return;
367        };
368
369        let mut tag_to_entries = self.tag_to_entries.write();
370        for tag in old_tags {
371            let scoped = (collection.to_string(), tag);
372            let remove_scoped = if let Some(entries) = tag_to_entries.get_mut(&scoped) {
373                entries.remove(key);
374                entries.is_empty()
375            } else {
376                false
377            };
378            if remove_scoped {
379                tag_to_entries.remove(&scoped);
380            }
381        }
382    }
383
384    pub(crate) fn entries_for_tags(
385        &self,
386        collection: &str,
387        tags: &[String],
388    ) -> Vec<(String, EntityId)> {
389        if tags.is_empty() {
390            return Vec::new();
391        }
392
393        let tag_to_entries = self.tag_to_entries.read();
394        let mut out: HashMap<String, EntityId> = HashMap::new();
395        for tag in tags {
396            let scoped = (collection.to_string(), tag.trim().to_string());
397            if let Some(entries) = tag_to_entries.get(&scoped) {
398                for (key, id) in entries {
399                    out.entry(key.clone()).or_insert(*id);
400                }
401            }
402        }
403        out.into_iter().collect()
404    }
405
406    pub(crate) fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
407        self.key_to_tags
408            .read()
409            .get(&(collection.to_string(), key.to_string()))
410            .map(|tags| tags.iter().cloned().collect())
411            .unwrap_or_default()
412    }
413}
414
415impl KvStatsCounters {
416    pub(crate) fn snapshot(&self) -> KvStats {
417        KvStats {
418            puts: self.puts.load(AtomicOrdering::Relaxed),
419            gets: self.gets.load(AtomicOrdering::Relaxed),
420            deletes: self.deletes.load(AtomicOrdering::Relaxed),
421            incrs: self.incrs.load(AtomicOrdering::Relaxed),
422            cas_success: self.cas_success.load(AtomicOrdering::Relaxed),
423            cas_conflict: self.cas_conflict.load(AtomicOrdering::Relaxed),
424            watch_streams_active: self.watch_streams_active.load(AtomicOrdering::Relaxed),
425            watch_events_emitted: self.watch_events_emitted.load(AtomicOrdering::Relaxed),
426            watch_drops: self.watch_drops.load(AtomicOrdering::Relaxed),
427        }
428    }
429
430    pub(crate) fn incr_puts(&self) {
431        self.puts.fetch_add(1, AtomicOrdering::Relaxed);
432    }
433
434    pub(crate) fn incr_gets(&self) {
435        self.gets.fetch_add(1, AtomicOrdering::Relaxed);
436    }
437
438    pub(crate) fn incr_deletes(&self) {
439        self.deletes.fetch_add(1, AtomicOrdering::Relaxed);
440    }
441
442    pub(crate) fn incr_incrs(&self) {
443        self.incrs.fetch_add(1, AtomicOrdering::Relaxed);
444    }
445
446    pub(crate) fn incr_cas_success(&self) {
447        self.cas_success.fetch_add(1, AtomicOrdering::Relaxed);
448    }
449
450    pub(crate) fn incr_cas_conflict(&self) {
451        self.cas_conflict.fetch_add(1, AtomicOrdering::Relaxed);
452    }
453
454    pub(crate) fn incr_watch_streams_active(&self) {
455        self.watch_streams_active
456            .fetch_add(1, AtomicOrdering::Relaxed);
457    }
458
459    pub(crate) fn decr_watch_streams_active(&self) {
460        self.watch_streams_active
461            .fetch_sub(1, AtomicOrdering::Relaxed);
462    }
463
464    pub(crate) fn incr_watch_events_emitted(&self) {
465        self.watch_events_emitted
466            .fetch_add(1, AtomicOrdering::Relaxed);
467    }
468
469    pub(crate) fn add_watch_drops(&self, count: u64) {
470        self.watch_drops.fetch_add(count, AtomicOrdering::Relaxed);
471    }
472}
473
/// Outcome of executing one query through the runtime: the query text,
/// labels describing how it was handled, the result set and row count.
#[derive(Debug, Clone)]
pub struct RuntimeQueryResult {
    /// Query text this result belongs to.
    pub query: String,
    pub mode: QueryMode,
    // The constructors in this file set `statement` to the same value as
    // `statement_type`.
    pub statement: &'static str,
    /// Label of the component that produced the result (e.g. "runtime-ddl",
    /// "runtime-meta" in the constructors of this file).
    pub engine: &'static str,
    pub result: UnifiedResult,
    pub affected_rows: u64,
    /// High-level statement type: "select", "insert", "update", "delete", "create", "drop", "alter"
    pub statement_type: &'static str,
    // NOTE(review): semantics of the bookmark token are not visible here.
    pub bookmark: Option<String>,
}
486
487impl RuntimeQueryResult {
488    /// Construct a result representing a DML operation (insert/update/delete).
489    pub fn dml_result(
490        query: String,
491        affected: u64,
492        statement_type: &'static str,
493        engine: &'static str,
494    ) -> Self {
495        Self {
496            query,
497            mode: QueryMode::Sql,
498            statement: statement_type,
499            engine,
500            result: UnifiedResult::empty(),
501            affected_rows: affected,
502            statement_type,
503            bookmark: None,
504        }
505    }
506
507    /// Construct a result representing a DDL message (create/drop/alter).
508    pub fn ok_message(query: String, message: &str, statement_type: &'static str) -> Self {
509        let mut result = UnifiedResult::empty();
510        let mut record = UnifiedRecord::new();
511        record.set("message", Value::text(message.to_string()));
512        result.push(record);
513        result.columns = vec!["message".to_string()];
514
515        Self {
516            query,
517            mode: QueryMode::Sql,
518            statement: statement_type,
519            engine: "runtime-ddl",
520            result,
521            affected_rows: 0,
522            statement_type,
523            bookmark: None,
524        }
525    }
526
527    /// Construct a multi-column record result for read-only meta commands
528    /// (EXPLAIN ALTER, schema introspection, etc.). Each row is a Vec of
529    /// (column_name, value) pairs in column order.
530    pub fn ok_records(
531        query: String,
532        columns: Vec<String>,
533        rows: Vec<Vec<(String, Value)>>,
534        statement_type: &'static str,
535    ) -> Self {
536        let mut result = UnifiedResult::empty();
537        for row in rows {
538            let mut record = UnifiedRecord::new();
539            for (k, v) in row {
540                record.set(&k, v);
541            }
542            result.push(record);
543        }
544        result.columns = columns;
545
546        Self {
547            query,
548            mode: QueryMode::Sql,
549            statement: statement_type,
550            engine: "runtime-meta",
551            result,
552            affected_rows: 0,
553            statement_type,
554            bookmark: None,
555        }
556    }
557
558    pub fn bookmark_token(&self) -> Option<&str> {
559        self.bookmark.as_deref()
560    }
561}
562
/// Explain output for a query: planner estimates, the optimizer passes that
/// were applied and the canonical logical plan.
#[derive(Debug, Clone)]
pub struct RuntimeQueryExplain {
    /// Query text that was explained.
    pub query: String,
    pub mode: QueryMode,
    pub statement: &'static str,
    // NOTE(review): presumably set when the query targets the universal
    // entity source — confirm.
    pub is_universal: bool,
    pub plan_cost: crate::storage::query::planner::PlanCost,
    pub estimated_rows: f64,
    pub estimated_selectivity: f64,
    pub estimated_confidence: f64,
    pub passes_applied: Vec<String>,
    pub logical_plan: CanonicalLogicalPlan,
    /// Names of any CTEs declared by a leading `WITH` clause. Empty
    /// for non-CTE queries. The plan tree is built against the
    /// post-inlining body, so each CTE's body is reachable inside
    /// `logical_plan` as a regular `Subquery` (or, for bare refs, the
    /// inlined Table node verbatim). This list lets renderers prepend
    /// `CteScan` markers so operators see which CTEs were resolved.
    pub cte_materializations: Vec<String>,
}
583
/// A single IVF search hit: entity id, distance and optionally the entity itself.
#[derive(Debug, Clone)]
pub struct RuntimeIvfMatch {
    pub entity_id: u64,
    pub distance: f32,
    // NOTE(review): presumably `None` when the entity could not be loaded — confirm.
    pub entity: Option<UnifiedEntity>,
}
590
/// Result of an IVF search over one collection: the search parameters, index
/// statistics and the matches.
#[derive(Debug, Clone)]
pub struct RuntimeIvfSearchResult {
    pub collection: String,
    // NOTE(review): `k`, `n_lists` and `n_probes` look like the usual IVF
    // parameters (result count, list count, probed lists) — confirm.
    pub k: usize,
    pub n_lists: usize,
    pub n_probes: usize,
    pub stats: IvfStats,
    pub matches: Vec<RuntimeIvfMatch>,
}
600
/// Edge-direction selector carried by the graph request/result types below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphDirection {
    Outgoing,
    Incoming,
    Both,
}
607
/// Traversal strategy selector: breadth-first (`Bfs`) or depth-first (`Dfs`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphTraversalStrategy {
    Bfs,
    Dfs,
}
613
/// Path-finding algorithm selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphPathAlgorithm {
    Bfs,
    Dijkstra,
    AStar,
    BellmanFord,
}
621
/// Graph node as exposed in runtime results: identity, labels and edge counts.
#[derive(Debug, Clone)]
pub struct RuntimeGraphNode {
    pub id: String,
    pub label: String,
    pub node_type: String,
    /// Number of outgoing edges.
    pub out_edge_count: u32,
    /// Number of incoming edges.
    pub in_edge_count: u32,
}
630
/// Graph edge as exposed in runtime results.
#[derive(Debug, Clone)]
pub struct RuntimeGraphEdge {
    // NOTE(review): `source`/`target` presumably hold node ids — confirm.
    pub source: String,
    pub target: String,
    pub edge_type: String,
    pub weight: f32,
}
638
/// A node paired with the depth at which it was recorded.
// NOTE(review): depth is presumably measured in hops from the source node — confirm.
#[derive(Debug, Clone)]
pub struct RuntimeGraphVisit {
    pub depth: usize,
    pub node: RuntimeGraphNode,
}
644
/// Result of a neighborhood query: the request parameters plus the visited
/// nodes and the edges between them.
#[derive(Debug, Clone)]
pub struct RuntimeGraphNeighborhoodResult {
    pub source: String,
    pub direction: RuntimeGraphDirection,
    pub max_depth: usize,
    pub nodes: Vec<RuntimeGraphVisit>,
    pub edges: Vec<RuntimeGraphEdge>,
}
653
/// Result of a graph traversal: the request parameters, the visits and the edges.
// NOTE(review): `visits` is presumably in visitation order — confirm.
#[derive(Debug, Clone)]
pub struct RuntimeGraphTraversalResult {
    pub source: String,
    pub direction: RuntimeGraphDirection,
    pub strategy: RuntimeGraphTraversalStrategy,
    pub max_depth: usize,
    pub visits: Vec<RuntimeGraphVisit>,
    pub edges: Vec<RuntimeGraphEdge>,
}
663
/// A path through the graph: its nodes and edges plus hop count and total weight.
#[derive(Debug, Clone)]
pub struct RuntimeGraphPath {
    pub hop_count: usize,
    pub total_weight: f64,
    pub nodes: Vec<RuntimeGraphNode>,
    pub edges: Vec<RuntimeGraphEdge>,
}
671
/// Result of a path query between `source` and `target`.
#[derive(Debug, Clone)]
pub struct RuntimeGraphPathResult {
    pub source: String,
    pub target: String,
    pub direction: RuntimeGraphDirection,
    pub algorithm: RuntimeGraphPathAlgorithm,
    pub nodes_visited: usize,
    // NOTE(review): presumably only populated by algorithms that can detect
    // negative cycles (e.g. Bellman-Ford) — confirm.
    pub negative_cycle_detected: Option<bool>,
    // NOTE(review): `None` presumably means no path was found — confirm.
    pub path: Option<RuntimeGraphPath>,
}
682
/// Which kind of components a components query computes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphComponentsMode {
    Connected,
    Weak,
    Strong,
}
689
/// Centrality algorithm selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphCentralityAlgorithm {
    Degree,
    Closeness,
    Betweenness,
    Eigenvector,
    PageRank,
}
698
/// Community-detection algorithm selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphCommunityAlgorithm {
    LabelPropagation,
    Louvain,
}
704
/// One component reported by a components query.
#[derive(Debug, Clone)]
pub struct RuntimeGraphComponent {
    pub id: String,
    // NOTE(review): presumably equal to `nodes.len()` — confirm.
    pub size: usize,
    pub nodes: Vec<String>,
}
711
/// Result of a components query: the mode used, the component count and the
/// components themselves.
#[derive(Debug, Clone)]
pub struct RuntimeGraphComponentsResult {
    pub mode: RuntimeGraphComponentsMode,
    pub count: usize,
    pub components: Vec<RuntimeGraphComponent>,
}
718
/// A node paired with a floating-point score. Used for centrality scores and
/// also for the local clustering and HITS hub/authority lists in this file.
#[derive(Debug, Clone)]
pub struct RuntimeGraphCentralityScore {
    pub node: RuntimeGraphNode,
    pub score: f64,
}
724
/// Degree figures for one node.
#[derive(Debug, Clone)]
pub struct RuntimeGraphDegreeScore {
    pub node: RuntimeGraphNode,
    pub in_degree: usize,
    pub out_degree: usize,
    // NOTE(review): presumably `in_degree + out_degree` — confirm.
    pub total_degree: usize,
}
732
/// Result of a centrality computation.
// NOTE(review): the optional fields are presumably filled only by algorithms
// they apply to, and `degree_scores` only for `Degree` — confirm.
#[derive(Debug, Clone)]
pub struct RuntimeGraphCentralityResult {
    pub algorithm: RuntimeGraphCentralityAlgorithm,
    pub normalized: Option<bool>,
    pub iterations: Option<usize>,
    pub converged: Option<bool>,
    pub scores: Vec<RuntimeGraphCentralityScore>,
    pub degree_scores: Vec<RuntimeGraphDegreeScore>,
}
742
/// One community reported by a community-detection run.
#[derive(Debug, Clone)]
pub struct RuntimeGraphCommunity {
    pub id: String,
    // NOTE(review): presumably equal to `nodes.len()` — confirm.
    pub size: usize,
    pub nodes: Vec<String>,
}
749
/// Result of a community-detection run.
// NOTE(review): the optional fields are presumably algorithm-specific
// (e.g. `modularity`/`passes` for Louvain) — confirm.
#[derive(Debug, Clone)]
pub struct RuntimeGraphCommunityResult {
    pub algorithm: RuntimeGraphCommunityAlgorithm,
    pub count: usize,
    pub iterations: Option<usize>,
    pub converged: Option<bool>,
    pub modularity: Option<f64>,
    pub passes: Option<usize>,
    pub communities: Vec<RuntimeGraphCommunity>,
}
760
/// Clustering figures: one global value, per-node local values and an
/// optional triangle count.
#[derive(Debug, Clone)]
pub struct RuntimeGraphClusteringResult {
    pub global: f64,
    pub local: Vec<RuntimeGraphCentralityScore>,
    pub triangle_count: Option<usize>,
}
767
/// Result of a HITS run: iteration bookkeeping plus hub and authority scores.
#[derive(Debug, Clone)]
pub struct RuntimeGraphHitsResult {
    pub iterations: usize,
    pub converged: bool,
    pub hubs: Vec<RuntimeGraphCentralityScore>,
    pub authorities: Vec<RuntimeGraphCentralityScore>,
}
775
/// Cycles found in the graph, each represented as a path.
#[derive(Debug, Clone)]
pub struct RuntimeGraphCyclesResult {
    // NOTE(review): presumably true when enumeration stopped at a configured
    // cap, so `cycles` may be incomplete — confirm.
    pub limit_reached: bool,
    pub cycles: Vec<RuntimeGraphPath>,
}
781
/// Result of a topological sort.
#[derive(Debug, Clone)]
pub struct RuntimeGraphTopologicalSortResult {
    // NOTE(review): contents of `ordered_nodes` when `acyclic` is false are
    // not visible from here.
    pub acyclic: bool,
    pub ordered_nodes: Vec<RuntimeGraphNode>,
}
787
/// Structural summary of a graph: counts, component counts, boolean shape
/// predicates and densities.
// NOTE(review): the exact definition of each predicate (e.g. `is_circular`
// versus `is_cyclic`, `density` versus `density_directed`) lives in the code
// that fills this struct — confirm there.
#[derive(Debug, Clone)]
pub struct RuntimeGraphPropertiesResult {
    pub node_count: usize,
    pub edge_count: usize,
    pub self_loop_count: usize,
    pub negative_edge_count: usize,
    pub connected_component_count: usize,
    pub weak_component_count: usize,
    pub strong_component_count: usize,
    pub is_empty: bool,
    pub is_connected: bool,
    pub is_weakly_connected: bool,
    pub is_strongly_connected: bool,
    pub is_complete: bool,
    pub is_complete_directed: bool,
    pub is_cyclic: bool,
    pub is_circular: bool,
    pub is_acyclic: bool,
    pub is_tree: bool,
    pub density: f64,
    pub density_directed: f64,
}
810
811// ============================================================================
812// Context Search types
813// ============================================================================
814
/// Result of a context search: matches grouped by data model, the
/// connections between them and a summary.
#[derive(Debug, Clone)]
pub struct ContextSearchResult {
    /// Query text that was searched.
    pub query: String,
    pub tables: Vec<ContextEntity>,
    pub graph: ContextGraphResult,
    pub vectors: Vec<ContextEntity>,
    pub documents: Vec<ContextEntity>,
    pub key_values: Vec<ContextEntity>,
    pub connections: Vec<ContextConnection>,
    pub summary: ContextSummary,
}
826
/// An entity returned by context search, with its score, how it was
/// discovered and the collection it is attributed to.
#[derive(Debug, Clone)]
pub struct ContextEntity {
    pub entity: UnifiedEntity,
    // NOTE(review): score scale and direction are not visible from here.
    pub score: f32,
    pub discovery: DiscoveryMethod,
    pub collection: String,
}
834
/// How a context-search entity was found.
#[derive(Debug, Clone)]
pub enum DiscoveryMethod {
    /// Found through an index; `field` names the field involved.
    Indexed {
        field: String,
    },
    /// Found by a global scan.
    GlobalScan,
    /// Reached via a cross-reference from entity `source_id`.
    CrossReference {
        source_id: u64,
        ref_type: String,
    },
    /// Reached via graph traversal from entity `source_id`.
    GraphTraversal {
        source_id: u64,
        edge_type: String,
        depth: usize,
    },
    /// Found by a vector query with the given similarity.
    VectorQuery {
        similarity: f32,
    },
}
854
/// Graph portion of a context-search result: node entities and edge entities.
#[derive(Debug, Clone)]
pub struct ContextGraphResult {
    pub nodes: Vec<ContextEntity>,
    pub edges: Vec<ContextEntity>,
}
860
/// A typed, weighted link between two entities in a context-search result.
#[derive(Debug, Clone)]
pub struct ContextConnection {
    pub from_id: u64,
    pub to_id: u64,
    pub connection_type: ContextConnectionType,
    pub weight: f32,
}
868
/// Kind of link described by a `ContextConnection`.
// NOTE(review): the `String` payloads presumably carry the reference/edge
// type name and the `f32` a similarity value — confirm.
#[derive(Debug, Clone)]
pub enum ContextConnectionType {
    CrossRef(String),
    GraphEdge(String),
    VectorSimilarity(f32),
}
875
/// Counters and timing describing one context-search execution.
#[derive(Debug, Clone)]
pub struct ContextSummary {
    pub total_entities: usize,
    pub direct_matches: usize,
    pub expanded_via_graph: usize,
    pub expanded_via_cross_refs: usize,
    pub expanded_via_vector_query: usize,
    pub collections_searched: usize,
    /// Execution time in microseconds (per the `_us` suffix).
    pub execution_time_us: u64,
    pub tiers_used: Vec<String>,
    pub entities_reindexed: usize,
}
888
/// Mutable bookkeeping for the connection pool.
struct PoolState {
    /// Next id value; starts at 1.
    next_id: u64,
    /// Active-connection counter; starts at 0.
    active: usize,
    /// Ids currently idle; starts empty.
    idle: Vec<u64>,
    /// Checkout counter; starts at 0.
    total_checkouts: u64,
}

impl Default for PoolState {
    /// Empty pool state whose id sequence begins at 1.
    fn default() -> Self {
        const FIRST_ID: u64 = 1;

        let idle = Vec::new();
        PoolState {
            next_id: FIRST_ID,
            active: 0,
            idle,
            total_checkouts: 0,
        }
    }
}
906
/// A cached query result together with the instant it was cached and a set
/// of scope names.
// NOTE(review): `scopes` presumably drives invalidation (e.g. affected
// collections) — confirm against the cache code.
#[derive(Debug, Clone)]
struct RuntimeResultCacheEntry {
    result: RuntimeQueryResult,
    cached_at: std::time::Instant,
    scopes: HashSet<String>,
}
913
/// Metric name for the cache shadow-divergence counter.
pub const METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL: &str = "cache_shadow_divergence_total";
/// Stable metric names for the query/graph-analytics result cache
/// (issue #802). Exposed under the `red.metrics` surface as
/// `reddb_result_cache_{hit,miss,evict}_total` and readable in-process
/// via `RedDBRuntime::result_cache_metrics`.
pub const METRIC_RESULT_CACHE_HIT_TOTAL: &str = "result_cache_hit_total";
pub const METRIC_RESULT_CACHE_MISS_TOTAL: &str = "result_cache_miss_total";
pub const METRIC_RESULT_CACHE_EVICT_TOTAL: &str = "result_cache_evict_total";
/// Namespace string for the ask-answer cache.
pub(crate) const ASK_ANSWER_CACHE_NAMESPACE: &str = "runtime.ask_answer_cache";
/// Number of shards created by `RmwLockTable::new`.
const RMW_LOCK_SHARDS: usize = 64;
924
925struct RmwLockTable {
926    shards: Vec<parking_lot::Mutex<HashMap<String, Arc<parking_lot::Mutex<()>>>>>,
927}
928
929impl RmwLockTable {
930    fn new() -> Self {
931        let shards = (0..RMW_LOCK_SHARDS)
932            .map(|_| parking_lot::Mutex::new(HashMap::new()))
933            .collect();
934        Self { shards }
935    }
936
937    fn lock_for(&self, collection: &str, key: &str) -> Arc<parking_lot::Mutex<()>> {
938        use std::hash::{Hash, Hasher};
939
940        let mut hasher = std::collections::hash_map::DefaultHasher::new();
941        collection.hash(&mut hasher);
942        key.hash(&mut hasher);
943        let shard_idx = (hasher.finish() as usize) % self.shards.len();
944        let map_key = format!("{collection}\u{1f}{key}");
945        let mut shard = self.shards[shard_idx].lock();
946        shard
947            .entry(map_key)
948            .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
949            .clone()
950    }
951}
952
/// Shared state of the embedded runtime.
///
/// Both `RedDBRuntime` and `RuntimeConnection` hold an `Arc<RuntimeInner>`,
/// so every field here is shared between all handles that point at the same
/// instance. Mutable pieces are wrapped in locks or atomics.
struct RuntimeInner {
    /// Shared handle to the underlying `RedDB` instance.
    db: Arc<RedDB>,
    /// Physical layout descriptor (`crate::physical::PhysicalLayout`).
    layout: PhysicalLayout,
    /// Index catalog (`crate::index::IndexCatalog`).
    indices: IndexCatalog,
    /// Connection-pool configuration.
    pool_config: ConnectionPoolConfig,
    /// Mutable connection-pool state, guarded by a `std::sync::Mutex`.
    pool: Mutex<PoolState>,
    /// Start timestamp. Going by the name, milliseconds since the Unix
    /// epoch — NOTE(review): confirm where it is captured.
    started_at_unix_ms: u128,
    /// Store from the sibling `probabilistic_store` module.
    probabilistic: probabilistic_store::ProbabilisticStore,
    /// Store from the sibling `index_store` module.
    index_store: index_store::IndexStore,
    /// Change-data-capture buffer (`replication::cdc::CdcBuffer`).
    cdc: crate::replication::cdc::CdcBuffer,
    /// Backup scheduler (`replication::scheduler::BackupScheduler`).
    backup_scheduler: crate::replication::scheduler::BackupScheduler,
    /// Query-planner plan cache behind a read/write lock.
    query_cache: parking_lot::RwLock<crate::storage::query::planner::cache::PlanCache>,
    /// Result-cache entries keyed by string, paired with a queue of keys.
    /// NOTE(review): the queue presumably records insertion order for
    /// eviction — confirm against the cache implementation.
    result_cache: parking_lot::RwLock<(
        HashMap<String, RuntimeResultCacheEntry>,
        std::collections::VecDeque<String>,
    )>,
    /// Blob cache (`storage::cache::BlobCache`) associated with query results.
    result_blob_cache: crate::storage::cache::BlobCache,
    /// Entry map plus key queue with the same shape as `result_cache`.
    /// NOTE(review): presumably the bookkeeping for `result_blob_cache` —
    /// confirm.
    result_blob_entries: parking_lot::RwLock<(
        HashMap<String, RuntimeResultCacheEntry>,
        std::collections::VecDeque<String>,
    )>,
    /// Set of ASK answer cache keys paired with a queue of keys.
    /// NOTE(review): presumably membership plus eviction order — confirm.
    ask_answer_cache_entries:
        parking_lot::RwLock<(HashSet<String>, std::collections::VecDeque<String>)>,
    /// Atomic counter; by its name it tallies result-cache shadow
    /// divergences — NOTE(review): confirm where it is incremented.
    result_cache_shadow_divergences: std::sync::atomic::AtomicU64,
    /// Result-cache observability counters (issue #802). Process-cumulative
    /// hit/miss/eviction tallies surfaced under `red.metrics`.
    result_cache_hits: std::sync::atomic::AtomicU64,
    result_cache_misses: std::sync::atomic::AtomicU64,
    result_cache_evictions: std::sync::atomic::AtomicU64,
    /// String-keyed map of `cost_guard::DailyState` values.
    /// NOTE(review): presumably per-caller daily ASK spend — confirm the key.
    ask_daily_spend:
        parking_lot::RwLock<HashMap<String, crate::runtime::ai::cost_guard::DailyState>>,
    /// Process-local queue message locks used to emulate `SKIP LOCKED`-style
    /// claim semantics for concurrent queue consumers inside this runtime.
    queue_message_locks: parking_lot::RwLock<HashMap<String, Arc<parking_lot::Mutex<()>>>>,
    /// Process-local read-modify-write locks. The table is sharded by
    /// `(collection, key)` and each entry has its own mutex, so unrelated keys
    /// in the same collection do not serialize behind one global lock.
    rmw_locks: RmwLockTable,
    /// Set of table names. NOTE(review): presumably tables whose planner
    /// state is stale and needs refreshing — confirm.
    planner_dirty_tables: parking_lot::RwLock<HashSet<String>>,
    /// Shared `ec::config::EcRegistry`.
    ec_registry: Arc<crate::ec::config::EcRegistry>,
    /// Shared `auth::registry::ConfigRegistry`.
    config_registry: Arc<crate::auth::registry::ConfigRegistry>,
    /// Worker from `ec::worker`.
    ec_worker: crate::ec::worker::EcWorker,
    /// Optional AuthStore — injected by server boot when auth is
    /// enabled. Required for `Value::Secret` auto-encrypt/decrypt
    /// because the AES key lives in the vault KV under the
    /// `red.secret.aes_key` entry.
    auth_store: parking_lot::RwLock<Option<Arc<crate::auth::store::AuthStore>>>,
    /// Optional OAuth/OIDC JWT validator. Wired by server boot when
    /// the operator configures issuer + JWKS via env / CLI. HTTP and
    /// wire transports read this on every bearer-token request and,
    /// when the token decodes as a JWT, validate it against the
    /// configured issuer + audience + signature before falling back to
    /// the local AuthStore lookup.
    oauth_validator: parking_lot::RwLock<Option<Arc<crate::auth::oauth::OAuthValidator>>>,
    /// View registry (Phase 2.1 PG parity).
    ///
    /// Holds the parsed `SELECT` body for every view created via
    /// `CREATE [MATERIALIZED] VIEW`. Queries that reference a view name
    /// substitute the stored `QueryExpr` at execution time. Materialized
    /// views additionally back onto the shared `MaterializedViewCache`
    /// (see `RuntimeInner::materialized_views`).
    ///
    /// This is in-memory only in Phase 2.1 — view definitions do not
    /// survive a restart. Persistence is a Phase 3 follow-up.
    views: parking_lot::RwLock<HashMap<String, Arc<crate::storage::query::ast::CreateViewQuery>>>,
    /// Shared `MaterializedViewCache` that materialized views back onto.
    materialized_views: parking_lot::RwLock<crate::storage::cache::result::MaterializedViewCache>,
    /// Per-collection retention sweeper state (issue #584 slice 12).
    /// Tracks `last_sweep_at_ms`, `rows_swept_total`, and the latest
    /// pending-rows estimate that feeds the three new columns on
    /// `red.retention`. In-memory only; resets across restart.
    pub(crate) retention_sweeper:
        parking_lot::RwLock<crate::runtime::retention_sweeper::RetentionSweeperState>,
    /// MVCC snapshot manager (Phase 2.3 PG parity).
    ///
    /// Allocates monotonic `xid`s on BEGIN and tracks the active/aborted
    /// sets used by `Snapshot::sees` to filter tuples by visibility. Each
    /// query evaluates `entity.is_visible(snapshot.xid)` — pre-MVCC rows
    /// (`xmin == 0`) stay visible to every snapshot, preserving backward
    /// compatibility with data written before the xid fields existed.
    snapshot_manager: Arc<crate::storage::transaction::snapshot::SnapshotManager>,
    /// Connection → active transaction context map (Phase 2.3 PG parity).
    ///
    /// Keyed by connection id from `RuntimeConnection`. Populated on BEGIN,
    /// cleared on COMMIT/ROLLBACK. When a statement executes outside a
    /// transaction (autocommit path) no entry exists and writes stamp
    /// `xid=0` — identical to pre-MVCC behaviour.
    tx_contexts:
        parking_lot::RwLock<HashMap<u64, crate::storage::transaction::snapshot::TxnContext>>,
    /// Intent-lock hierarchy (IS/IX/S/X) used to break the implicit
    /// global-write serialisation in write paths. Populated at boot
    /// with `concurrency.locking.deadlock_timeout_ms` from the matrix
    /// and wired through DML/DDL dispatch in later P1 tasks.
    /// Dormant until P1.T3 flips the read path to `(Global,IS) →
    /// (Collection,IS)` and P1.T4/T5 pick up writes/DDL.
    lock_manager: Arc<crate::storage::transaction::lock::LockManager>,
    /// Perf-parity env-var overrides (`REDDB_<UP_DOTTED_KEY>`).
    /// Populated once at boot, read by every config getter; takes
    /// precedence over the persisted red_config value so operators
    /// can hot-fix a bad config by restarting with a different env
    /// var set. Keys are restricted to those declared in the matrix.
    env_config_overrides: HashMap<String, String>,
    /// Transaction-local tenant override (`SET LOCAL TENANT '<id>'`).
    /// Keyed by connection id, mirroring `tx_contexts`. Lives only while
    /// a transaction is open — `COMMIT` / `ROLLBACK` evict the entry,
    /// returning the connection to whichever session-level tenant
    /// (`SET TENANT 'x'`) was active before the transaction began.
    /// Wins over the session value but loses to a per-statement
    /// `WITHIN TENANT '<id>' …` override on the same call.
    tx_local_tenants: parking_lot::RwLock<HashMap<u64, Option<String>>>,
    /// Row-level security policies (Phase 2.5 PG parity).
    ///
    /// Keyed by `(table_name, policy_name)`; the set of tables with RLS
    /// enforcement toggled on lives in `rls_enabled_tables`. Filter
    /// enforcement hooks into the read path via `collect_rls_filters()`
    /// — see `runtime::impl_core`.
    rls_policies: parking_lot::RwLock<
        HashMap<(String, String), Arc<crate::storage::query::ast::CreatePolicyQuery>>,
    >,
    /// Tables with RLS enforcement toggled on (companion to `rls_policies`).
    rls_enabled_tables: parking_lot::RwLock<HashSet<String>>,
    /// Foreign Data Wrapper registry (Phase 3.2 PG parity).
    ///
    /// Maps server names → wrapper instances and foreign-table names →
    /// definitions. Queries referencing a registered foreign table are
    /// re-routed to `ForeignTableRegistry::scan` by the read-path
    /// rewriter; reads against unknown names fall through to the native
    /// collection lookup.
    foreign_tables: Arc<crate::storage::fdw::ForeignTableRegistry>,
    /// Per-connection list of tuples marked for deletion by the current
    /// transaction (Phase 2.3.2b MVCC tombstones + 2.3.2e savepoints).
    /// Each entry is `(collection, entity_id, stamper_xid, previous_xmax)`
    /// — the xid that stamped xmax on the tuple plus the value it
    /// replaced. For a plain transaction the
    /// stamper equals `ctx.xid`; with savepoints the stamper equals
    /// the innermost open sub-xid so ROLLBACK TO SAVEPOINT can revive
    /// only the matching subset. COMMIT drains the whole conn list
    /// and keeps the committed tombstones; ROLLBACK (whole-tx) revives them all;
    /// ROLLBACK TO SAVEPOINT revives those with `stamper_xid >=
    /// savepoint_xid`. Autocommit DELETE bypasses this map.
    pending_tombstones: parking_lot::RwLock<
        HashMap<
            u64,
            Vec<(
                String,
                crate::storage::unified::entity::EntityId,
                crate::storage::transaction::snapshot::Xid,
                crate::storage::transaction::snapshot::Xid,
            )>,
        >,
    >,
    /// Per-connection table-row UPDATE versions created by an open
    /// transaction. Each entry is `(collection, old_entity_id,
    /// new_entity_id, stamper_xid, previous_xmax)`. COMMIT keeps both physical
    /// versions and drops the pending marker; ROLLBACK revives the old
    /// version and removes the new uncommitted version.
    pending_versioned_updates: parking_lot::RwLock<
        HashMap<
            u64,
            Vec<(
                String,
                crate::storage::unified::entity::EntityId,
                crate::storage::unified::entity::EntityId,
                crate::storage::transaction::snapshot::Xid,
                crate::storage::transaction::snapshot::Xid,
            )>,
        >,
    >,
    /// `u64`-keyed lists of `KvWatchEvent`s.
    /// NOTE(review): presumably keyed by connection id like the neighbouring
    /// `pending_*` maps and flushed on COMMIT — confirm.
    pending_kv_watch_events:
        parking_lot::RwLock<HashMap<u64, Vec<crate::replication::cdc::KvWatchEvent>>>,
    /// `u64`-keyed deferred store WAL actions.
    /// NOTE(review): presumably keyed by connection id and applied when the
    /// transaction finishes — confirm.
    pending_store_wal_actions:
        parking_lot::RwLock<HashMap<u64, crate::storage::unified::DeferredStoreWalActions>>,
    /// Table-scoped tenancy registry (Phase 2.5.4).
    ///
    /// Maps `table_name → tenant_column`. DML auto-fill looks here to
    /// inject `CURRENT_TENANT()` on INSERTs that omit the column, and
    /// DDL keeps the in-memory registry in sync with the
    /// `tenant_tables.*` keys in red_config. Read-side enforcement
    /// piggy-backs on the existing RLS infrastructure: every entry
    /// installs an implicit `col = CURRENT_TENANT()` policy.
    tenant_tables: parking_lot::RwLock<HashMap<String, String>>,
    /// Monotonic epoch bumped on every DDL / schema-mutating operation
    /// that calls `invalidate_plan_cache`. Prepared statements capture
    /// this at PREPARE and re-check at EXECUTE — a mismatch means the
    /// cached shape may reference dropped or renamed columns and the
    /// client must re-PREPARE.
    ddl_epoch: std::sync::atomic::AtomicU64,
    /// Public-mutation gate (PLAN.md W1).
    ///
    /// Built once at construction from the immutable subset of
    /// `RedDBOptions` (read_only flag + replication role). Every public
    /// mutation surface — SQL DML/DDL, gRPC mutating RPCs, HTTP/native
    /// wire mutations, admin maintenance endpoints, serverless
    /// lifecycle — consults `write_gate.check(WriteKind::*)` before
    /// dispatching to storage. The replica internal apply path
    /// (`LogicalChangeApplier`) reaches into the store directly and
    /// bypasses the gate by construction.
    write_gate: Arc<crate::runtime::write_gate::WriteGate>,
    /// Process lifecycle state machine (PLAN.md Phase 1 — Lifecycle
    /// Contract). Drives `/health/live`, `/health/ready`,
    /// `/health/startup`, and `POST /admin/shutdown` so any
    /// orchestrator (K8s preStop, Fly autostop, ECS task drain,
    /// systemd) can coordinate without losing data.
    lifecycle: crate::runtime::lifecycle::Lifecycle,
    /// Operator-imposed resource limits (PLAN.md Phase 4.1).
    /// Read once at boot from `RED_MAX_*` env vars; consulted by
    /// observability and (in follow-up commits) the per-write
    /// enforcement points.
    resource_limits: crate::runtime::resource_limits::ResourceLimits,
    /// Append-only audit log for admin mutations (PLAN.md Phase
    /// 6.5). Lives next to the primary `.rdb` file so backup +
    /// restore flows ship it alongside the data.
    audit_log: Arc<crate::runtime::audit_log::AuditLogger>,
    /// Durable control-plane evidence ledger. Producer slices emit
    /// through this sink and decide fail-open/fail-closed using
    /// `control_event_config`.
    control_event_ledger:
        parking_lot::RwLock<Arc<dyn crate::runtime::control_events::ControlEventLedger>>,
    /// Configuration producers consult to choose fail-open vs fail-closed
    /// when emitting to `control_event_ledger`.
    control_event_config: crate::runtime::control_events::ControlEventConfig,
    /// Data-plane query audit stream. Kept separate from the Control
    /// Event Ledger so scoped query metadata cannot be confused with
    /// governance evidence.
    query_audit: Arc<crate::runtime::query_audit::QueryAuditStream>,
    /// Serverless writer-lease state machine. `None` when the operator
    /// did not opt into lease fencing (`RED_LEASE_REQUIRED` unset/false).
    /// When set, owns the {acquire/refresh/release/lost} transitions and
    /// is the single place that mutates `write_gate.set_lease_state` and
    /// records lease/* audit entries — keeping those two side-effects
    /// from drifting.
    lease_lifecycle: std::sync::OnceLock<Arc<crate::runtime::lease_lifecycle::LeaseLifecycle>>,
    /// PLAN.md Phase 11.5 — counters bumped by the replica apply
    /// loop on `Gap` / `Divergence` / `Apply` errors so /metrics
    /// surfaces them as `reddb_replica_apply_errors_total{kind}`.
    replica_apply_metrics: Arc<crate::replication::logical::ReplicaApplyMetrics>,
    /// PLAN.md Phase 4.4 — per-caller QPS quotas. Disabled (no-op)
    /// when `RED_MAX_QPS_PER_CALLER` is unset.
    quota_bucket: crate::runtime::quota_bucket::QuotaBucket,
    /// Issue #120 — token → schema entity reverse index, kept current
    /// incrementally on DDL events. Consumed by AskPipeline (issue
    /// #121) Stage 2 to narrow vector-search candidates before any
    /// embedding compute. Mutated only from DDL execution paths.
    schema_vocabulary: parking_lot::RwLock<crate::runtime::schema_vocabulary::SchemaVocabulary>,
    /// Issue #205 — dedicated slow-query sink (`red-slow.log`).
    /// Built once at runtime startup; below-threshold calls pay only a
    /// single relaxed atomic load. Threshold + sample-pct come from
    /// `runtime.slow_query.threshold_ms` / `.sample_pct` (config matrix)
    /// at construction; live tuning via the config tree is a follow-up.
    slow_query_logger: Arc<crate::telemetry::slow_query_logger::SlowQueryLogger>,
    /// Process-local normal-KV operation counters. These are intentionally
    /// runtime-local; persistent accounting belongs in catalog stats.
    kv_stats: KvStatsCounters,
    /// Metrics-ingest counters (`MetricsIngestCounters`).
    metrics_ingest_stats: MetricsIngestCounters,
    /// Per-tenant metrics activity counters (`MetricsTenantActivityCounters`).
    metrics_tenant_activity_stats: MetricsTenantActivityCounters,
    /// Slice 10 of issue #527 — Prometheus counters for the queue
    /// lifecycle (delivered/acked/nacked) rendered onto `/metrics`.
    /// Process-local; counters reset on restart by design.
    queue_telemetry: std::sync::Arc<queue_telemetry::QueueTelemetryCounters>,
    /// Issue #742 — consumer presence (heartbeat / lease / lifecycle
    /// state per (queue, group, consumer)). Process-local in this
    /// slice; durability across restart lands in the follow-up that
    /// mirrors the registry into `red_queue_meta` rows. Independent
    /// of pending-delivery state by design — see
    /// `storage::queue::presence` for the contract.
    queue_presence: std::sync::Arc<crate::storage::queue::presence::ConsumerPresenceRegistry>,
    /// Issue #743 — vector + TurboQuant introspection registry. Red UI
    /// vector toolbars (and `red.*` vector virtual tables) read
    /// per-collection metadata + artifact state from here so they
    /// never have to reach into `engine::vector_store` or
    /// `engine::turboquant::*` internals. Engine publish points (build
    /// start / finish, fallback toggle, drop) call into this in
    /// follow-up slices of PRD #735; the public Rust surface lives on
    /// the runtime and does not change when those land.
    vector_introspection:
        std::sync::Arc<crate::storage::vector::introspection::VectorIntrospectionRegistry>,
    /// Slice C of PRD #718 — local wait registry for `QUEUE READ … WAIT`.
    /// Producer commits notify; readers park here until wake-or-timeout
    /// or until `cancel_all` is invoked at shutdown.
    queue_wait_registry: std::sync::Arc<queue_wait_registry::QueueWaitRegistry>,
    /// Per-connection list of `(scope, queue)` pairs that should be
    /// notified through `queue_wait_registry` on COMMIT (slice C of
    /// PRD #718). Push paths buffer here while inside a txn; the
    /// COMMIT path drains and notifies. ROLLBACK discards — by design
    /// rolled-back enqueues do not deliver and do not wake waiters.
    pending_queue_wakes: parking_lot::RwLock<HashMap<u64, Vec<(String, String)>>>,
    /// Process-local normal-KV tag index used by `INVALIDATE TAGS`.
    kv_tag_index: KvTagIndex,
    /// Issue #524 — in-memory chain-tip cache per collection. Populated lazily
    /// by the first INSERT or `GET /chain-tip` call after restart and updated
    /// atomically with each chain INSERT. Backed by a single mutex so a chain
    /// INSERT serialises against concurrent submitters — the loser observes
    /// the advanced tip and surfaces `BlockchainConflict` to its caller.
    chain_tip_cache:
        parking_lot::Mutex<HashMap<String, crate::runtime::blockchain_kind::ChainTipFull>>,
    /// Issue #525 — in-memory mirror of the persisted `integrity` flag per
    /// chain collection.  `true` means INSERTs must be rejected with
    /// `ChainIntegrityBroken`.  Loaded lazily from `red_config` on first
    /// access so the flag survives restart.
    chain_integrity_broken: parking_lot::Mutex<HashMap<String, bool>>,
    /// Issue #765 / S6 — in-memory cache of input-stream integrity tombstone
    /// RID ranges. Loaded lazily from `red_config` on first read so the set
    /// survives restart (the durable list lives under
    /// `stream.integrity.tombstones`). `integrity_tombstones_state` is the
    /// hot-path gate: `0` = unloaded, `1` = loaded-empty (reads skip
    /// filtering after a single relaxed load), `2` = loaded-with-tombstones.
    integrity_tombstones:
        parking_lot::Mutex<Vec<crate::runtime::integrity_tombstone::TombstoneRange>>,
    /// Load-state gate for `integrity_tombstones` (`0`/`1`/`2` as listed on
    /// that field).
    integrity_tombstones_state: std::sync::atomic::AtomicU8,
}
1259
/// Public handle to the embedded runtime.
///
/// The only field is an `Arc<RuntimeInner>`, so the derived `Clone` produces
/// another handle to the same shared state rather than a copy of it.
#[derive(Clone)]
pub struct RedDBRuntime {
    inner: Arc<RuntimeInner>,
}
1264
/// A single connection onto the runtime.
///
/// Carries a numeric connection id next to a shared `Arc<RuntimeInner>`;
/// several `RuntimeInner` maps (for example `tx_contexts`) are documented as
/// keyed by this id.
pub struct RuntimeConnection {
    id: u64,
    inner: Arc<RuntimeInner>,
}
1269
1270pub struct CausalSession {
1271    runtime: RedDBRuntime,
1272    bookmark: Option<crate::replication::CausalBookmark>,
1273    wait_timeout: std::time::Duration,
1274}
1275
1276impl CausalSession {
1277    pub fn bookmark_token(&self) -> Option<String> {
1278        self.bookmark.map(|bookmark| bookmark.encode())
1279    }
1280
1281    pub fn inject_bookmark(&mut self, token: &str) -> RedDBResult<()> {
1282        let bookmark = crate::replication::CausalBookmark::decode(token)
1283            .map_err(|err| RedDBError::InvalidOperation(err.to_string()))?;
1284        self.bookmark = Some(bookmark);
1285        Ok(())
1286    }
1287
1288    pub fn execute_query(&mut self, query: &str) -> RedDBResult<RuntimeQueryResult> {
1289        if is_select_query_text(query) {
1290            if let Some(bookmark) = self.bookmark {
1291                self.runtime
1292                    .wait_for_bookmark(&bookmark, self.wait_timeout)?;
1293            }
1294        }
1295        let result = self.runtime.execute_query(query)?;
1296        if let Some(token) = result.bookmark.as_deref() {
1297            self.inject_bookmark(token)?;
1298        }
1299        Ok(result)
1300    }
1301}
1302
/// Reports whether `query` begins with the `SELECT` keyword.
///
/// Leading whitespace is ignored and the keyword is matched
/// ASCII-case-insensitively. The keyword must end at a token boundary: the
/// character after it, if any, may not be alphanumeric or `_`. A bare prefix
/// comparison would also accept identifiers that merely start with "select"
/// (`selected_rows`, `select1`), which are not `SELECT` statements.
fn is_select_query_text(query: &str) -> bool {
    const KEYWORD: &str = "select";

    let text = query.trim_start();
    // `get` yields `None` when the text is too short or the cut would land
    // inside a multi-byte character; neither case can be the keyword.
    let Some(head) = text.get(..KEYWORD.len()) else {
        return false;
    };
    if !head.eq_ignore_ascii_case(KEYWORD) {
        return false;
    }
    // Slicing is safe: `get` above proved this offset is a char boundary.
    !text[KEYWORD.len()..]
        .chars()
        .next()
        .is_some_and(|next| next.is_alphanumeric() || next == '_')
}
1309
1310pub mod ai;
1311pub mod analytics_schema_registry;
1312pub(crate) mod analytics_source_catalog;
1313pub mod ask_pipeline;
1314pub mod audit_log;
1315pub mod audit_query;
1316pub mod authorized_search;
1317pub mod batch_insert;
1318pub mod blockchain_kind;
1319mod collection_contract;
1320pub mod config_matrix;
1321pub mod config_overlay;
1322pub mod config_watcher;
1323pub mod continuous_materialized_view;
1324pub mod control_events;
1325pub(crate) mod ddl;
1326pub mod disk_space_monitor;
1327mod dml_target_scan;
1328pub mod evidence_export;
1329mod expr_eval;
1330mod graph_dsl;
1331mod health_connection;
1332mod impl_config;
1333pub(crate) mod impl_core;
1334mod impl_ddl;
1335mod impl_dml;
1336mod impl_ec;
1337mod impl_events;
1338mod impl_graph;
1339mod impl_graph_commands;
1340pub mod impl_kv;
1341mod impl_migrations;
1342mod impl_native;
1343mod impl_physical;
1344mod impl_probabilistic;
1345pub mod impl_queue;
1346mod impl_search;
1347mod impl_timeseries;
1348mod impl_tree;
1349mod impl_vcs;
1350mod index_store;
1351pub mod integrity_tombstone;
1352mod join_filter;
1353mod keyed_spine;
1354pub mod kv_watch;
1355pub mod lease_lifecycle;
1356pub mod lease_loop;
1357pub mod lease_timer_wheel;
1358pub mod lifecycle;
1359pub mod locking;
1360pub(crate) mod materialization_limit;
1361pub(crate) mod metric_descriptor_catalog;
1362pub(crate) mod mutation;
1363pub(crate) mod primary_queue_store;
1364mod probabilistic_store;
1365pub mod query_audit;
1366pub(crate) mod query_exec;
1367mod queue_delivery;
1368pub(crate) mod queue_lifecycle;
1369pub(crate) mod queue_telemetry;
1370pub(crate) mod queue_wait_registry;
1371pub mod quota_bucket;
1372mod record_search;
1373mod red_schema;
1374pub(crate) mod replica_queue_store;
1375pub mod resource_limits;
1376pub(crate) mod retention_filter;
1377pub(crate) mod retention_sweeper;
1378pub(crate) mod scalar_evaluator;
1379pub mod schema_diff;
1380pub mod schema_vocabulary;
1381pub(crate) mod sessionize;
1382pub mod signed_chain;
1383pub mod signed_writes_kind;
1384pub(crate) mod slo_descriptor_catalog;
1385pub mod snapshot_reuse;
1386mod statement_frame;
1387mod table_row_mvcc_resolver;
1388pub mod turbo_crash_inject;
1389mod vector_index;
1390pub mod vector_turbo_kind;
1391pub(crate) mod window_phase;
1392pub mod within_clause;
1393pub mod write_gate;
1394
1395pub use self::graph_dsl::*;
1396use self::join_filter::*;
1397use self::query_exec::*;
1398use self::record_search::*;
1399pub use self::statement_frame::EffectiveScope;
1400
1401/// Re-exports for transports + tests that need per-connection
1402/// isolation, tenant / auth thread-locals, and MVCC snapshot
1403/// utilities. Mirrors what PG-wire / gRPC / HTTP middleware already
1404/// call, and is enough to emulate independent connections in
1405/// integration tests.
1406pub mod mvcc {
1407    pub use super::impl_core::{
1408        capture_current_snapshot, clear_current_auth_identity, clear_current_connection_id,
1409        clear_current_snapshot, clear_current_tenant, current_connection_id, current_tenant,
1410        entity_visible_under_current_snapshot, entity_visible_with_context,
1411        set_current_auth_identity, set_current_connection_id, set_current_snapshot,
1412        set_current_tenant, snapshot_bundle, with_snapshot_bundle, SnapshotBundle, SnapshotContext,
1413    };
1414}
1415
/// Public helpers re-exported for use by the presentation layer.
///
/// Both functions forward unchanged to their `runtime_*` counterparts in the
/// private `record_search` module.
pub mod record_search_helpers {
    use crate::storage::query::UnifiedRecord;
    use crate::storage::UnifiedEntity;
    use std::collections::BTreeSet;

    /// Returns the `(&'static str, BTreeSet<String>)` pair computed by
    /// `record_search::runtime_entity_type_and_capabilities` for `entity`.
    // NOTE(review): judging by the names, the pair is the entity's type label
    // and its capability names — confirm against `record_search`.
    pub fn entity_type_and_capabilities(
        entity: &UnifiedEntity,
    ) -> (&'static str, BTreeSet<String>) {
        super::record_search::runtime_entity_type_and_capabilities(entity)
    }

    /// Materialise any entity kind (TableRow, Node, Edge, Vector,
    /// TimeSeriesPoint, QueueMessage) into a `UnifiedRecord` whose
    /// `values` carry the native fields. Used by the RLS evaluator
    /// when a non-table collection matches a `CompareExpr` policy.
    pub fn any_record_from_entity(entity: UnifiedEntity) -> Option<UnifiedRecord> {
        super::record_search::runtime_any_record_from_entity(entity)
    }
}