Skip to main content

reddb_server/
runtime.rs

1//! Embedded runtime with connection pooling, scans and health.
2
3use std::cmp::Ordering;
4use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, VecDeque};
5use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
6use std::sync::{Arc, Mutex};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use crate::api::{RedDBError, RedDBOptions, RedDBResult};
10use crate::catalog::{
11    CatalogAnalyticsJobStatus, CatalogAttentionSummary, CatalogGraphProjectionStatus,
12    CatalogIndexStatus, CatalogModelSnapshot, CollectionDescriptor,
13};
14use crate::health::{HealthProvider, HealthReport};
15use crate::index::IndexCatalog;
16use crate::physical::{
17    ExportDescriptor, ManifestEvent, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalLayout,
18    SnapshotDescriptor,
19};
20use crate::serde_json::Value as JsonValue;
21use crate::storage::engine::pathfinding::{AStar, BellmanFord, Dijkstra, BFS, DFS};
22use crate::storage::engine::{
23    BetweennessCentrality, ClosenessCentrality, ClusteringCoefficient, ConnectedComponents,
24    CycleDetector, DegreeCentrality, EigenvectorCentrality, GraphStore, IvfConfig, IvfIndex,
25    IvfStats, LabelPropagation, Louvain, MetadataEntry, MetadataFilter as VectorMetadataFilter,
26    MetadataValue as VectorMetadataValue, PageRank, PersonalizedPageRank, PhysicalFileHeader,
27    StoredNode, StronglyConnectedComponents, WeaklyConnectedComponents, HITS,
28};
29use crate::storage::query::ast::{
30    AlterOperation, AlterQueueQuery, AlterTableQuery, CompareOp, CreateCollectionQuery,
31    CreateIndexQuery, CreateQueueQuery, CreateTableQuery, CreateTimeSeriesQuery, CreateTreeQuery,
32    CreateVectorQuery, DeleteQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery,
33    DropIndexQuery, DropKvQuery, DropQueueQuery, DropTableQuery, DropTimeSeriesQuery,
34    DropTreeQuery, DropVectorQuery, EventsBackfillQuery, ExplainAlterQuery, ExplainFormat,
35    FieldRef, Filter, FusionStrategy, GraphCommand, HybridQuery, IndexMethod, InsertEntityType,
36    InsertQuery, JoinQuery, JoinType, OrderByClause, ProbabilisticCommand, Projection, QueryExpr,
37    QueueCommand, QueueSelectQuery, QueueSide, SearchCommand, TableQuery, TreeCommand,
38    TruncateQuery, UpdateQuery, VectorQuery, VectorSource,
39};
40use crate::storage::query::is_universal_entity_source as is_universal_query_source;
41use crate::storage::query::modes::{detect_mode, parse_multi, QueryMode};
42use crate::storage::query::planner::{
43    CanonicalLogicalPlan, CanonicalPlanner, CostEstimator, QueryPlanner,
44};
45use crate::storage::query::unified::{UnifiedRecord, UnifiedResult};
46use crate::storage::schema::Value;
47use crate::storage::unified::dsl::{
48    apply_filters, cosine_similarity, Filter as DslFilter, FilterOp as DslFilterOp,
49    FilterValue as DslFilterValue, GraphPatternDsl, HybridQueryBuilder, MatchComponents,
50    QueryResult as DslQueryResult, ScoredMatch, TextSearchBuilder,
51};
52use crate::storage::unified::store::{
53    NativeCatalogSummary, NativeManifestSummary, NativePhysicalState, NativeRecoverySummary,
54    NativeRegistrySummary,
55};
56use crate::storage::unified::{
57    Metadata, MetadataValue as UnifiedMetadataValue, RefTarget, UnifiedMetadataFilter,
58};
59use crate::storage::{
60    EntityData, EntityId, EntityKind, RedDB, RefType, SimilarResult, StoreStats, UnifiedEntity,
61    UnifiedStore,
62};
63
/// Sizing limits for the runtime's logical connection pool.
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
    /// Cap on concurrently active connections (presumably enforced at
    /// checkout — confirm against the pool code).
    pub max_connections: usize,
    /// Cap on idle connection ids retained for reuse.
    pub max_idle: usize,
}

impl Default for ConnectionPoolConfig {
    /// 64 connections, of which at most 16 may sit idle.
    fn default() -> Self {
        Self {
            max_connections: 64,
            max_idle: 16,
        }
    }
}
78
/// Resume position for a paginated collection scan.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ScanCursor {
    /// Offset at which the next page starts (assumed to be an item index —
    /// confirm against the scan implementation).
    pub offset: usize,
}
83
84#[derive(Debug, Clone)]
85pub struct ScanPage {
86    pub collection: String,
87    pub items: Vec<UnifiedEntity>,
88    pub next: Option<ScanCursor>,
89    pub total: usize,
90}
91
/// Snapshot of host and process facts reported alongside runtime stats.
#[derive(Debug, Clone)]
pub struct SystemInfo {
    /// Id of the current OS process.
    pub pid: u32,
    /// Parallelism reported by `std::thread::available_parallelism` (1 if unknown).
    pub cpu_cores: usize,
    /// `MemTotal` from `/proc/meminfo`, in bytes; 0 when unavailable or on
    /// non-Linux targets.
    pub total_memory_bytes: u64,
    /// `MemAvailable` from `/proc/meminfo`, in bytes; 0 when unavailable or
    /// on non-Linux targets.
    pub available_memory_bytes: u64,
    /// `std::env::consts::OS`.
    pub os: String,
    /// `std::env::consts::ARCH`.
    pub arch: String,
    /// Host name, or `"unknown"` when it cannot be determined.
    pub hostname: String,
}

impl SystemInfo {
    /// Whether the system has enough cores to benefit from parallelism.
    /// Returns false on single-core machines where thread overhead > gains.
    pub fn should_parallelize() -> bool {
        std::thread::available_parallelism()
            .map(|p| p.get() > 1)
            .unwrap_or(false)
    }

    /// Gathers a fresh snapshot. Never fails: unreadable sources fall back
    /// to `1` core, `0` bytes of memory, or an `"unknown"` hostname.
    pub fn collect() -> Self {
        Self {
            pid: std::process::id(),
            cpu_cores: std::thread::available_parallelism()
                .map(|p| p.get())
                .unwrap_or(1),
            total_memory_bytes: Self::read_total_memory(),
            available_memory_bytes: Self::read_available_memory(),
            os: std::env::consts::OS.to_string(),
            arch: std::env::consts::ARCH.to_string(),
            hostname: Self::read_hostname(),
        }
    }

    /// Resolves the host name.
    ///
    /// Lookup order:
    /// 1. the `HOSTNAME` env var, then the `COMPUTERNAME` env var (blank values ignored);
    /// 2. the kernel's host name (Linux only);
    /// 3. the literal `"unknown"`.
    ///
    /// `HOSTNAME` is a shell variable that is frequently not exported to
    /// daemons, so the OS fallback matters in practice.
    fn read_hostname() -> String {
        std::env::var("HOSTNAME")
            .or_else(|_| std::env::var("COMPUTERNAME"))
            .ok()
            .filter(|name| !name.trim().is_empty())
            .or_else(Self::read_os_hostname)
            .unwrap_or_else(|| "unknown".to_string())
    }

    #[cfg(target_os = "linux")]
    fn read_os_hostname() -> Option<String> {
        std::fs::read_to_string("/proc/sys/kernel/hostname")
            .ok()
            .map(|name| name.trim().to_string())
            .filter(|name| !name.is_empty())
    }

    #[cfg(not(target_os = "linux"))]
    fn read_os_hostname() -> Option<String> {
        None
    }

    /// Extracts the value of the `/proc/meminfo` line starting with `field`
    /// (e.g. `"MemTotal:"`) and converts it from kB to bytes.
    ///
    /// Returns `None` when the field is missing or its value is not an
    /// integer. The kB→bytes multiply saturates instead of overflowing.
    // Only called from Linux-gated code (and tests), hence the allow elsewhere.
    #[cfg_attr(not(target_os = "linux"), allow(dead_code))]
    fn parse_meminfo_bytes(meminfo: &str, field: &str) -> Option<u64> {
        meminfo
            .lines()
            .find(|line| line.starts_with(field))
            .and_then(|line| line.split_whitespace().nth(1))
            .and_then(|kb| kb.parse::<u64>().ok())
            .map(|kb| kb.saturating_mul(1024))
    }

    /// Reads `field` from `/proc/meminfo` in bytes; 0 on any failure.
    #[cfg(target_os = "linux")]
    fn read_meminfo_field(field: &str) -> u64 {
        std::fs::read_to_string("/proc/meminfo")
            .ok()
            .and_then(|contents| Self::parse_meminfo_bytes(&contents, field))
            .unwrap_or(0)
    }

    #[cfg(target_os = "linux")]
    fn read_total_memory() -> u64 {
        Self::read_meminfo_field("MemTotal:")
    }

    #[cfg(target_os = "linux")]
    fn read_available_memory() -> u64 {
        Self::read_meminfo_field("MemAvailable:")
    }

    #[cfg(not(target_os = "linux"))]
    fn read_total_memory() -> u64 {
        0
    }

    #[cfg(not(target_os = "linux"))]
    fn read_available_memory() -> u64 {
        0
    }
}
172
173#[derive(Debug, Clone)]
174pub struct RuntimeStats {
175    pub active_connections: usize,
176    pub idle_connections: usize,
177    pub total_checkouts: u64,
178    pub paged_mode: bool,
179    pub started_at_unix_ms: u128,
180    pub store: StoreStats,
181    pub system: SystemInfo,
182    pub result_blob_cache: crate::storage::cache::BlobCacheStats,
183    pub kv: KvStats,
184}
185
/// Plain snapshot of the KV operation counters (see `KvStatsCounters::snapshot`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct KvStats {
    /// Put operations recorded.
    pub puts: u64,
    /// Get operations recorded.
    pub gets: u64,
    /// Delete operations recorded.
    pub deletes: u64,
    /// Increment operations recorded.
    pub incrs: u64,
    /// Compare-and-swap operations that succeeded.
    pub cas_success: u64,
    /// Compare-and-swap operations that hit a conflict.
    pub cas_conflict: u64,
    /// Watch streams currently open (a gauge, not a cumulative counter).
    pub watch_streams_active: u64,
    /// Watch events emitted to subscribers.
    pub watch_events_emitted: u64,
    /// Watch events dropped.
    pub watch_drops: u64,
}
198
/// Lock-free live counters behind [`KvStats`].
///
/// All fields start at zero (`Default`). They are updated with `Relaxed`
/// atomics by the `incr_*`/`decr_*`/`add_*` helpers.
#[derive(Debug, Default)]
pub(crate) struct KvStatsCounters {
    puts: AtomicU64,
    gets: AtomicU64,
    deletes: AtomicU64,
    incrs: AtomicU64,
    cas_success: AtomicU64,
    cas_conflict: AtomicU64,
    /// Gauge: incremented when a watch stream opens, decremented when it closes.
    watch_streams_active: AtomicU64,
    watch_events_emitted: AtomicU64,
    watch_drops: AtomicU64,
}
211
212#[derive(Debug, Default)]
213pub(crate) struct KvTagIndex {
214    tag_to_entries: parking_lot::RwLock<HashMap<(String, String), HashMap<String, EntityId>>>,
215    key_to_tags: parking_lot::RwLock<HashMap<(String, String), BTreeSet<String>>>,
216}
217
218impl KvTagIndex {
219    pub(crate) fn replace(&self, collection: &str, key: &str, id: EntityId, tags: &[String]) {
220        let entry_key = (collection.to_string(), key.to_string());
221        let new_tags: BTreeSet<String> = tags
222            .iter()
223            .map(|tag| tag.trim())
224            .filter(|tag| !tag.is_empty())
225            .map(ToOwned::to_owned)
226            .collect();
227
228        let old_tags = {
229            let mut key_to_tags = self.key_to_tags.write();
230            if new_tags.is_empty() {
231                key_to_tags.remove(&entry_key)
232            } else {
233                key_to_tags.insert(entry_key.clone(), new_tags.clone())
234            }
235        };
236
237        let mut tag_to_entries = self.tag_to_entries.write();
238        if let Some(old_tags) = old_tags {
239            for tag in old_tags {
240                let scoped = (collection.to_string(), tag);
241                let remove_scoped = if let Some(entries) = tag_to_entries.get_mut(&scoped) {
242                    entries.remove(key);
243                    entries.is_empty()
244                } else {
245                    false
246                };
247                if remove_scoped {
248                    tag_to_entries.remove(&scoped);
249                }
250            }
251        }
252
253        for tag in new_tags {
254            tag_to_entries
255                .entry((collection.to_string(), tag))
256                .or_default()
257                .insert(key.to_string(), id);
258        }
259    }
260
261    pub(crate) fn remove(&self, collection: &str, key: &str) {
262        let entry_key = (collection.to_string(), key.to_string());
263        let old_tags = self.key_to_tags.write().remove(&entry_key);
264        let Some(old_tags) = old_tags else {
265            return;
266        };
267
268        let mut tag_to_entries = self.tag_to_entries.write();
269        for tag in old_tags {
270            let scoped = (collection.to_string(), tag);
271            let remove_scoped = if let Some(entries) = tag_to_entries.get_mut(&scoped) {
272                entries.remove(key);
273                entries.is_empty()
274            } else {
275                false
276            };
277            if remove_scoped {
278                tag_to_entries.remove(&scoped);
279            }
280        }
281    }
282
283    pub(crate) fn entries_for_tags(
284        &self,
285        collection: &str,
286        tags: &[String],
287    ) -> Vec<(String, EntityId)> {
288        if tags.is_empty() {
289            return Vec::new();
290        }
291
292        let tag_to_entries = self.tag_to_entries.read();
293        let mut out: HashMap<String, EntityId> = HashMap::new();
294        for tag in tags {
295            let scoped = (collection.to_string(), tag.trim().to_string());
296            if let Some(entries) = tag_to_entries.get(&scoped) {
297                for (key, id) in entries {
298                    out.entry(key.clone()).or_insert(*id);
299                }
300            }
301        }
302        out.into_iter().collect()
303    }
304
305    pub(crate) fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
306        self.key_to_tags
307            .read()
308            .get(&(collection.to_string(), key.to_string()))
309            .map(|tags| tags.iter().cloned().collect())
310            .unwrap_or_default()
311    }
312}
313
314impl KvStatsCounters {
315    pub(crate) fn snapshot(&self) -> KvStats {
316        KvStats {
317            puts: self.puts.load(AtomicOrdering::Relaxed),
318            gets: self.gets.load(AtomicOrdering::Relaxed),
319            deletes: self.deletes.load(AtomicOrdering::Relaxed),
320            incrs: self.incrs.load(AtomicOrdering::Relaxed),
321            cas_success: self.cas_success.load(AtomicOrdering::Relaxed),
322            cas_conflict: self.cas_conflict.load(AtomicOrdering::Relaxed),
323            watch_streams_active: self.watch_streams_active.load(AtomicOrdering::Relaxed),
324            watch_events_emitted: self.watch_events_emitted.load(AtomicOrdering::Relaxed),
325            watch_drops: self.watch_drops.load(AtomicOrdering::Relaxed),
326        }
327    }
328
329    pub(crate) fn incr_puts(&self) {
330        self.puts.fetch_add(1, AtomicOrdering::Relaxed);
331    }
332
333    pub(crate) fn incr_gets(&self) {
334        self.gets.fetch_add(1, AtomicOrdering::Relaxed);
335    }
336
337    pub(crate) fn incr_deletes(&self) {
338        self.deletes.fetch_add(1, AtomicOrdering::Relaxed);
339    }
340
341    pub(crate) fn incr_incrs(&self) {
342        self.incrs.fetch_add(1, AtomicOrdering::Relaxed);
343    }
344
345    pub(crate) fn incr_cas_success(&self) {
346        self.cas_success.fetch_add(1, AtomicOrdering::Relaxed);
347    }
348
349    pub(crate) fn incr_cas_conflict(&self) {
350        self.cas_conflict.fetch_add(1, AtomicOrdering::Relaxed);
351    }
352
353    pub(crate) fn incr_watch_streams_active(&self) {
354        self.watch_streams_active
355            .fetch_add(1, AtomicOrdering::Relaxed);
356    }
357
358    pub(crate) fn decr_watch_streams_active(&self) {
359        self.watch_streams_active
360            .fetch_sub(1, AtomicOrdering::Relaxed);
361    }
362
363    pub(crate) fn incr_watch_events_emitted(&self) {
364        self.watch_events_emitted
365            .fetch_add(1, AtomicOrdering::Relaxed);
366    }
367
368    pub(crate) fn add_watch_drops(&self, count: u64) {
369        self.watch_drops.fetch_add(count, AtomicOrdering::Relaxed);
370    }
371}
372
373#[derive(Debug, Clone)]
374pub struct RuntimeQueryResult {
375    pub query: String,
376    pub mode: QueryMode,
377    pub statement: &'static str,
378    pub engine: &'static str,
379    pub result: UnifiedResult,
380    pub affected_rows: u64,
381    /// High-level statement type: "select", "insert", "update", "delete", "create", "drop", "alter"
382    pub statement_type: &'static str,
383}
384
385impl RuntimeQueryResult {
386    /// Construct a result representing a DML operation (insert/update/delete).
387    pub fn dml_result(
388        query: String,
389        affected: u64,
390        statement_type: &'static str,
391        engine: &'static str,
392    ) -> Self {
393        Self {
394            query,
395            mode: QueryMode::Sql,
396            statement: statement_type,
397            engine,
398            result: UnifiedResult::empty(),
399            affected_rows: affected,
400            statement_type,
401        }
402    }
403
404    /// Construct a result representing a DDL message (create/drop/alter).
405    pub fn ok_message(query: String, message: &str, statement_type: &'static str) -> Self {
406        let mut result = UnifiedResult::empty();
407        let mut record = UnifiedRecord::new();
408        record.set("message", Value::text(message.to_string()));
409        result.push(record);
410        result.columns = vec!["message".to_string()];
411
412        Self {
413            query,
414            mode: QueryMode::Sql,
415            statement: statement_type,
416            engine: "runtime-ddl",
417            result,
418            affected_rows: 0,
419            statement_type,
420        }
421    }
422
423    /// Construct a multi-column record result for read-only meta commands
424    /// (EXPLAIN ALTER, schema introspection, etc.). Each row is a Vec of
425    /// (column_name, value) pairs in column order.
426    pub fn ok_records(
427        query: String,
428        columns: Vec<String>,
429        rows: Vec<Vec<(String, Value)>>,
430        statement_type: &'static str,
431    ) -> Self {
432        let mut result = UnifiedResult::empty();
433        for row in rows {
434            let mut record = UnifiedRecord::new();
435            for (k, v) in row {
436                record.set(&k, v);
437            }
438            result.push(record);
439        }
440        result.columns = columns;
441
442        Self {
443            query,
444            mode: QueryMode::Sql,
445            statement: statement_type,
446            engine: "runtime-meta",
447            result,
448            affected_rows: 0,
449            statement_type,
450        }
451    }
452}
453
454#[derive(Debug, Clone)]
455pub struct RuntimeQueryExplain {
456    pub query: String,
457    pub mode: QueryMode,
458    pub statement: &'static str,
459    pub is_universal: bool,
460    pub plan_cost: crate::storage::query::planner::PlanCost,
461    pub estimated_rows: f64,
462    pub estimated_selectivity: f64,
463    pub estimated_confidence: f64,
464    pub passes_applied: Vec<String>,
465    pub logical_plan: CanonicalLogicalPlan,
466    /// Names of any CTEs declared by a leading `WITH` clause. Empty
467    /// for non-CTE queries. The plan tree is built against the
468    /// post-inlining body, so each CTE's body is reachable inside
469    /// `logical_plan` as a regular `Subquery` (or, for bare refs, the
470    /// inlined Table node verbatim). This list lets renderers prepend
471    /// `CteScan` markers so operators see which CTEs were resolved.
472    pub cte_materializations: Vec<String>,
473}
474
475#[derive(Debug, Clone)]
476pub struct RuntimeIvfMatch {
477    pub entity_id: u64,
478    pub distance: f32,
479    pub entity: Option<UnifiedEntity>,
480}
481
482#[derive(Debug, Clone)]
483pub struct RuntimeIvfSearchResult {
484    pub collection: String,
485    pub k: usize,
486    pub n_lists: usize,
487    pub n_probes: usize,
488    pub stats: IvfStats,
489    pub matches: Vec<RuntimeIvfMatch>,
490}
491
/// Which edges to follow from a node during graph operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphDirection {
    /// Edges leaving the node.
    Outgoing,
    /// Edges arriving at the node.
    Incoming,
    /// Edges in either direction.
    Both,
}

/// Order in which a traversal expands nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphTraversalStrategy {
    /// Breadth-first.
    Bfs,
    /// Depth-first.
    Dfs,
}

/// Algorithm used to search for a path between two nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphPathAlgorithm {
    /// Breadth-first search.
    Bfs,
    /// Dijkstra's shortest path.
    Dijkstra,
    /// A* search.
    AStar,
    /// Bellman-Ford shortest path.
    BellmanFord,
}
512
/// A graph node as exposed by runtime graph APIs.
#[derive(Debug, Clone)]
pub struct RuntimeGraphNode {
    /// Node id.
    pub id: String,
    /// Display label.
    pub label: String,
    /// Node type name.
    pub node_type: String,
    /// Number of outgoing edges.
    pub out_edge_count: u32,
    /// Number of incoming edges.
    pub in_edge_count: u32,
}

/// A directed, weighted graph edge as exposed by runtime graph APIs.
#[derive(Debug, Clone)]
pub struct RuntimeGraphEdge {
    /// Id of the source node.
    pub source: String,
    /// Id of the target node.
    pub target: String,
    /// Edge type name.
    pub edge_type: String,
    /// Edge weight.
    pub weight: f32,
}
529
530#[derive(Debug, Clone)]
531pub struct RuntimeGraphVisit {
532    pub depth: usize,
533    pub node: RuntimeGraphNode,
534}
535
536#[derive(Debug, Clone)]
537pub struct RuntimeGraphNeighborhoodResult {
538    pub source: String,
539    pub direction: RuntimeGraphDirection,
540    pub max_depth: usize,
541    pub nodes: Vec<RuntimeGraphVisit>,
542    pub edges: Vec<RuntimeGraphEdge>,
543}
544
545#[derive(Debug, Clone)]
546pub struct RuntimeGraphTraversalResult {
547    pub source: String,
548    pub direction: RuntimeGraphDirection,
549    pub strategy: RuntimeGraphTraversalStrategy,
550    pub max_depth: usize,
551    pub visits: Vec<RuntimeGraphVisit>,
552    pub edges: Vec<RuntimeGraphEdge>,
553}
554
555#[derive(Debug, Clone)]
556pub struct RuntimeGraphPath {
557    pub hop_count: usize,
558    pub total_weight: f64,
559    pub nodes: Vec<RuntimeGraphNode>,
560    pub edges: Vec<RuntimeGraphEdge>,
561}
562
563#[derive(Debug, Clone)]
564pub struct RuntimeGraphPathResult {
565    pub source: String,
566    pub target: String,
567    pub direction: RuntimeGraphDirection,
568    pub algorithm: RuntimeGraphPathAlgorithm,
569    pub nodes_visited: usize,
570    pub negative_cycle_detected: Option<bool>,
571    pub path: Option<RuntimeGraphPath>,
572}
573
/// Kind of connectivity used when computing graph components.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphComponentsMode {
    /// Connected components.
    Connected,
    /// Weakly connected components.
    Weak,
    /// Strongly connected components.
    Strong,
}

/// Centrality measure to compute.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphCentralityAlgorithm {
    /// Degree centrality.
    Degree,
    /// Closeness centrality.
    Closeness,
    /// Betweenness centrality.
    Betweenness,
    /// Eigenvector centrality.
    Eigenvector,
    /// PageRank.
    PageRank,
}

/// Community-detection algorithm to run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphCommunityAlgorithm {
    /// Label propagation.
    LabelPropagation,
    /// Louvain modularity optimisation.
    Louvain,
}
595
596#[derive(Debug, Clone)]
597pub struct RuntimeGraphComponent {
598    pub id: String,
599    pub size: usize,
600    pub nodes: Vec<String>,
601}
602
603#[derive(Debug, Clone)]
604pub struct RuntimeGraphComponentsResult {
605    pub mode: RuntimeGraphComponentsMode,
606    pub count: usize,
607    pub components: Vec<RuntimeGraphComponent>,
608}
609
610#[derive(Debug, Clone)]
611pub struct RuntimeGraphCentralityScore {
612    pub node: RuntimeGraphNode,
613    pub score: f64,
614}
615
616#[derive(Debug, Clone)]
617pub struct RuntimeGraphDegreeScore {
618    pub node: RuntimeGraphNode,
619    pub in_degree: usize,
620    pub out_degree: usize,
621    pub total_degree: usize,
622}
623
624#[derive(Debug, Clone)]
625pub struct RuntimeGraphCentralityResult {
626    pub algorithm: RuntimeGraphCentralityAlgorithm,
627    pub normalized: Option<bool>,
628    pub iterations: Option<usize>,
629    pub converged: Option<bool>,
630    pub scores: Vec<RuntimeGraphCentralityScore>,
631    pub degree_scores: Vec<RuntimeGraphDegreeScore>,
632}
633
634#[derive(Debug, Clone)]
635pub struct RuntimeGraphCommunity {
636    pub id: String,
637    pub size: usize,
638    pub nodes: Vec<String>,
639}
640
641#[derive(Debug, Clone)]
642pub struct RuntimeGraphCommunityResult {
643    pub algorithm: RuntimeGraphCommunityAlgorithm,
644    pub count: usize,
645    pub iterations: Option<usize>,
646    pub converged: Option<bool>,
647    pub modularity: Option<f64>,
648    pub passes: Option<usize>,
649    pub communities: Vec<RuntimeGraphCommunity>,
650}
651
652#[derive(Debug, Clone)]
653pub struct RuntimeGraphClusteringResult {
654    pub global: f64,
655    pub local: Vec<RuntimeGraphCentralityScore>,
656    pub triangle_count: Option<usize>,
657}
658
659#[derive(Debug, Clone)]
660pub struct RuntimeGraphHitsResult {
661    pub iterations: usize,
662    pub converged: bool,
663    pub hubs: Vec<RuntimeGraphCentralityScore>,
664    pub authorities: Vec<RuntimeGraphCentralityScore>,
665}
666
667#[derive(Debug, Clone)]
668pub struct RuntimeGraphCyclesResult {
669    pub limit_reached: bool,
670    pub cycles: Vec<RuntimeGraphPath>,
671}
672
673#[derive(Debug, Clone)]
674pub struct RuntimeGraphTopologicalSortResult {
675    pub acyclic: bool,
676    pub ordered_nodes: Vec<RuntimeGraphNode>,
677}
678
/// Structural summary of a graph: counts, connectivity and shape flags, and
/// density figures.
#[derive(Debug, Clone)]
pub struct RuntimeGraphPropertiesResult {
    /// Number of nodes.
    pub node_count: usize,
    /// Number of edges.
    pub edge_count: usize,
    /// Edges whose source equals their target.
    pub self_loop_count: usize,
    /// Edges with negative weight.
    pub negative_edge_count: usize,
    /// Number of connected components.
    pub connected_component_count: usize,
    /// Number of weakly connected components.
    pub weak_component_count: usize,
    /// Number of strongly connected components.
    pub strong_component_count: usize,
    /// Whether the graph has no nodes.
    pub is_empty: bool,
    /// Whether the graph forms one connected component.
    pub is_connected: bool,
    /// Whether the graph forms one weakly connected component.
    pub is_weakly_connected: bool,
    /// Whether the graph forms one strongly connected component.
    pub is_strongly_connected: bool,
    /// Completeness flag (undirected sense).
    pub is_complete: bool,
    /// Completeness flag (directed sense).
    pub is_complete_directed: bool,
    /// Whether any cycle exists.
    pub is_cyclic: bool,
    /// Whether the graph is a single cycle (definition per the computing code — confirm).
    pub is_circular: bool,
    /// Whether no cycle exists.
    pub is_acyclic: bool,
    /// Whether the graph is a tree.
    pub is_tree: bool,
    /// Edge density (undirected formula).
    pub density: f64,
    /// Edge density (directed formula).
    pub density_directed: f64,
}
701
702// ============================================================================
703// Context Search types
704// ============================================================================
705
706#[derive(Debug, Clone)]
707pub struct ContextSearchResult {
708    pub query: String,
709    pub tables: Vec<ContextEntity>,
710    pub graph: ContextGraphResult,
711    pub vectors: Vec<ContextEntity>,
712    pub documents: Vec<ContextEntity>,
713    pub key_values: Vec<ContextEntity>,
714    pub connections: Vec<ContextConnection>,
715    pub summary: ContextSummary,
716}
717
718#[derive(Debug, Clone)]
719pub struct ContextEntity {
720    pub entity: UnifiedEntity,
721    pub score: f32,
722    pub discovery: DiscoveryMethod,
723    pub collection: String,
724}
725
/// How a context-search result was discovered.
#[derive(Debug, Clone)]
pub enum DiscoveryMethod {
    /// Found through an index on `field`.
    Indexed {
        field: String,
    },
    /// Found by scanning without an index.
    GlobalScan,
    /// Reached by following a cross-reference of `ref_type` from `source_id`.
    CrossReference {
        source_id: u64,
        ref_type: String,
    },
    /// Reached by walking `edge_type` edges from `source_id`, `depth` hops away.
    GraphTraversal {
        source_id: u64,
        edge_type: String,
        depth: usize,
    },
    /// Found by vector similarity, with the similarity value recorded.
    VectorQuery {
        similarity: f32,
    },
}
745
746#[derive(Debug, Clone)]
747pub struct ContextGraphResult {
748    pub nodes: Vec<ContextEntity>,
749    pub edges: Vec<ContextEntity>,
750}
751
/// A weighted link between two entities surfaced by a context search.
#[derive(Debug, Clone)]
pub struct ContextConnection {
    /// Id of the originating entity.
    pub from_id: u64,
    /// Id of the target entity.
    pub to_id: u64,
    /// Kind of link.
    pub connection_type: ContextConnectionType,
    /// Strength of the link.
    pub weight: f32,
}

/// Kind of link between two context-search entities.
#[derive(Debug, Clone)]
pub enum ContextConnectionType {
    /// Cross-reference, carrying the reference type name.
    CrossRef(String),
    /// Graph edge, carrying the edge type name.
    GraphEdge(String),
    /// Vector similarity, carrying the similarity value.
    VectorSimilarity(f32),
}

/// Run statistics for a context search.
#[derive(Debug, Clone)]
pub struct ContextSummary {
    /// Total entities returned.
    pub total_entities: usize,
    /// Entities matched directly by the query.
    pub direct_matches: usize,
    /// Entities added by graph expansion.
    pub expanded_via_graph: usize,
    /// Entities added by following cross-references.
    pub expanded_via_cross_refs: usize,
    /// Entities added by vector similarity.
    pub expanded_via_vector_query: usize,
    /// Number of collections searched.
    pub collections_searched: usize,
    /// Wall-clock execution time in microseconds.
    pub execution_time_us: u64,
    /// Names of the search tiers used.
    pub tiers_used: Vec<String>,
    /// Entities re-indexed during the search.
    pub entities_reindexed: usize,
}
779
/// Mutable connection-pool bookkeeping (held behind `RuntimeInner::pool`'s mutex).
struct PoolState {
    /// Next connection id to hand out; ids start at 1.
    next_id: u64,
    /// Connections currently checked out.
    active: usize,
    /// Ids of returned connections available for reuse.
    idle: Vec<u64>,
    /// Cumulative number of checkouts.
    total_checkouts: u64,
}

impl Default for PoolState {
    /// Empty pool whose first connection id will be 1.
    fn default() -> Self {
        Self {
            next_id: 1,
            active: 0,
            idle: Vec::new(),
            total_checkouts: 0,
        }
    }
}
797
798#[derive(Debug, Clone)]
799struct RuntimeResultCacheEntry {
800    result: RuntimeQueryResult,
801    cached_at: std::time::Instant,
802    scopes: HashSet<String>,
803}
804
/// Metric name for the count of result-cache shadow divergences.
pub const METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL: &str = "cache_shadow_divergence_total";
/// Cache namespace under which ASK answers are cached.
pub(crate) const ASK_ANSWER_CACHE_NAMESPACE: &str = "runtime.ask_answer_cache";
807
808struct RuntimeInner {
809    db: Arc<RedDB>,
810    layout: PhysicalLayout,
811    indices: IndexCatalog,
812    pool_config: ConnectionPoolConfig,
813    pool: Mutex<PoolState>,
814    started_at_unix_ms: u128,
815    probabilistic: probabilistic_store::ProbabilisticStore,
816    index_store: index_store::IndexStore,
817    cdc: crate::replication::cdc::CdcBuffer,
818    backup_scheduler: crate::replication::scheduler::BackupScheduler,
819    query_cache: parking_lot::RwLock<crate::storage::query::planner::cache::PlanCache>,
820    result_cache: parking_lot::RwLock<(
821        HashMap<String, RuntimeResultCacheEntry>,
822        std::collections::VecDeque<String>,
823    )>,
824    result_blob_cache: crate::storage::cache::BlobCache,
825    result_blob_entries: parking_lot::RwLock<(
826        HashMap<String, RuntimeResultCacheEntry>,
827        std::collections::VecDeque<String>,
828    )>,
829    ask_answer_cache_entries:
830        parking_lot::RwLock<(HashSet<String>, std::collections::VecDeque<String>)>,
831    result_cache_shadow_divergences: std::sync::atomic::AtomicU64,
832    ask_daily_spend:
833        parking_lot::RwLock<HashMap<String, crate::runtime::ai::cost_guard::DailyState>>,
834    /// Process-local queue message locks used to emulate `SKIP LOCKED`-style
835    /// claim semantics for concurrent queue consumers inside this runtime.
836    queue_message_locks: parking_lot::RwLock<HashMap<String, Arc<parking_lot::Mutex<()>>>>,
837    planner_dirty_tables: parking_lot::RwLock<HashSet<String>>,
838    ec_registry: Arc<crate::ec::config::EcRegistry>,
839    ec_worker: crate::ec::worker::EcWorker,
840    /// Optional AuthStore — injected by server boot when auth is
841    /// enabled. Required for `Value::Secret` auto-encrypt/decrypt
842    /// because the AES key lives in the vault KV under the
843    /// `red.secret.aes_key` entry.
844    auth_store: parking_lot::RwLock<Option<Arc<crate::auth::store::AuthStore>>>,
845    /// Optional OAuth/OIDC JWT validator. Wired by server boot when
846    /// the operator configures issuer + JWKS via env / CLI. HTTP and
847    /// wire transports read this on every bearer-token request and,
848    /// when the token decodes as a JWT, validate it against the
849    /// configured issuer + audience + signature before falling back to
850    /// the local AuthStore lookup.
851    oauth_validator: parking_lot::RwLock<Option<Arc<crate::auth::oauth::OAuthValidator>>>,
852    /// View registry (Phase 2.1 PG parity).
853    ///
854    /// Holds the parsed `SELECT` body for every view created via
855    /// `CREATE [MATERIALIZED] VIEW`. Queries that reference a view name
856    /// substitute the stored `QueryExpr` at execution time. Materialized
857    /// views additionally back onto the shared `MaterializedViewCache`
858    /// (see `RuntimeInner::materialized_views`).
859    ///
860    /// This is in-memory only in Phase 2.1 — view definitions do not
861    /// survive a restart. Persistence is a Phase 3 follow-up.
862    views: parking_lot::RwLock<HashMap<String, Arc<crate::storage::query::ast::CreateViewQuery>>>,
863    materialized_views: parking_lot::RwLock<crate::storage::cache::result::MaterializedViewCache>,
864    /// MVCC snapshot manager (Phase 2.3 PG parity).
865    ///
866    /// Allocates monotonic `xid`s on BEGIN and tracks the active/aborted
867    /// sets used by `Snapshot::sees` to filter tuples by visibility. Each
868    /// query evaluates `entity.is_visible(snapshot.xid)` — pre-MVCC rows
869    /// (`xmin == 0`) stay visible to every snapshot, preserving backward
870    /// compatibility with data written before the xid fields existed.
871    snapshot_manager: Arc<crate::storage::transaction::snapshot::SnapshotManager>,
872    /// Connection → active transaction context map (Phase 2.3 PG parity).
873    ///
874    /// Keyed by connection id from `RuntimeConnection`. Populated on BEGIN,
875    /// cleared on COMMIT/ROLLBACK. When a statement executes outside a
876    /// transaction (autocommit path) no entry exists and writes stamp
877    /// `xid=0` — identical to pre-MVCC behaviour.
878    tx_contexts:
879        parking_lot::RwLock<HashMap<u64, crate::storage::transaction::snapshot::TxnContext>>,
880    /// Intent-lock hierarchy (IS/IX/S/X) used to break the implicit
881    /// global-write serialisation in write paths. Populated at boot
882    /// with `concurrency.locking.deadlock_timeout_ms` from the matrix
883    /// and wired through DML/DDL dispatch in later P1 tasks.
884    /// Dormant until P1.T3 flips the read path to `(Global,IS) →
885    /// (Collection,IS)` and P1.T4/T5 pick up writes/DDL.
886    lock_manager: Arc<crate::storage::transaction::lock::LockManager>,
887    /// Perf-parity env-var overrides (`REDDB_<UP_DOTTED_KEY>`).
888    /// Populated once at boot, read by every config getter; takes
889    /// precedence over the persisted red_config value so operators
890    /// can hot-fix a bad config by restarting with a different env
891    /// var set. Keys are restricted to those declared in the matrix.
892    env_config_overrides: HashMap<String, String>,
893    /// Transaction-local tenant override (`SET LOCAL TENANT '<id>'`).
894    /// Keyed by connection id, mirroring `tx_contexts`. Lives only while
895    /// a transaction is open — `COMMIT` / `ROLLBACK` evict the entry,
896    /// returning the connection to whichever session-level tenant
897    /// (`SET TENANT 'x'`) was active before the transaction began.
898    /// Wins over the session value but loses to a per-statement
899    /// `WITHIN TENANT '<id>' …` override on the same call.
900    tx_local_tenants: parking_lot::RwLock<HashMap<u64, Option<String>>>,
901    /// Row-level security policies (Phase 2.5 PG parity).
902    ///
903    /// Keyed by `(table_name, policy_name)`; the set of tables with RLS
904    /// enforcement toggled on lives in `rls_enabled_tables`. Filter
905    /// enforcement hooks into the read path via `collect_rls_filters()`
906    /// — see `runtime::impl_core`.
907    rls_policies: parking_lot::RwLock<
908        HashMap<(String, String), Arc<crate::storage::query::ast::CreatePolicyQuery>>,
909    >,
910    rls_enabled_tables: parking_lot::RwLock<HashSet<String>>,
911    /// Foreign Data Wrapper registry (Phase 3.2 PG parity).
912    ///
913    /// Maps server names → wrapper instances and foreign-table names →
914    /// definitions. Queries referencing a registered foreign table are
915    /// re-routed to `ForeignTableRegistry::scan` by the read-path
916    /// rewriter; reads against unknown names fall through to the native
917    /// collection lookup.
918    foreign_tables: Arc<crate::storage::fdw::ForeignTableRegistry>,
919    /// Per-connection list of tuples marked for deletion by the current
920    /// transaction (Phase 2.3.2b MVCC tombstones + 2.3.2e savepoints).
921    /// Each entry is `(collection, entity_id, stamper_xid, previous_xmax)`
922    /// — the xid that stamped xmax on the tuple plus the value it
923    /// replaced. For a plain transaction the
924    /// stamper equals `ctx.xid`; with savepoints the stamper equals
925    /// the innermost open sub-xid so ROLLBACK TO SAVEPOINT can revive
926    /// only the matching subset. COMMIT drains the whole conn list
927    /// and keeps the committed tombstones; ROLLBACK (whole-tx) revives them all;
928    /// ROLLBACK TO SAVEPOINT revives those with `stamper_xid >=
929    /// savepoint_xid`. Autocommit DELETE bypasses this map.
930    pending_tombstones: parking_lot::RwLock<
931        HashMap<
932            u64,
933            Vec<(
934                String,
935                crate::storage::unified::entity::EntityId,
936                crate::storage::transaction::snapshot::Xid,
937                crate::storage::transaction::snapshot::Xid,
938            )>,
939        >,
940    >,
941    /// Per-connection table-row UPDATE versions created by an open
942    /// transaction. Each entry is `(collection, old_entity_id,
943    /// new_entity_id, stamper_xid, previous_xmax)`. COMMIT keeps both physical
944    /// versions and drops the pending marker; ROLLBACK revives the old
945    /// version and removes the new uncommitted version.
946    pending_versioned_updates: parking_lot::RwLock<
947        HashMap<
948            u64,
949            Vec<(
950                String,
951                crate::storage::unified::entity::EntityId,
952                crate::storage::unified::entity::EntityId,
953                crate::storage::transaction::snapshot::Xid,
954                crate::storage::transaction::snapshot::Xid,
955            )>,
956        >,
957    >,
958    pending_kv_watch_events:
959        parking_lot::RwLock<HashMap<u64, Vec<crate::replication::cdc::KvWatchEvent>>>,
960    pending_store_wal_actions:
961        parking_lot::RwLock<HashMap<u64, crate::storage::unified::DeferredStoreWalActions>>,
962    /// Table-scoped tenancy registry (Phase 2.5.4).
963    ///
964    /// Maps `table_name → tenant_column`. DML auto-fill looks here to
965    /// inject `CURRENT_TENANT()` on INSERTs that omit the column, and
966    /// DDL keeps the in-memory registry in sync with the
967    /// `tenant_tables.*` keys in red_config. Read-side enforcement
968    /// piggy-backs on the existing RLS infrastructure: every entry
969    /// installs an implicit `col = CURRENT_TENANT()` policy.
970    tenant_tables: parking_lot::RwLock<HashMap<String, String>>,
971    /// Monotonic epoch bumped on every DDL / schema-mutating operation
972    /// that calls `invalidate_plan_cache`. Prepared statements capture
973    /// this at PREPARE and re-check at EXECUTE — a mismatch means the
974    /// cached shape may reference dropped or renamed columns and the
975    /// client must re-PREPARE.
976    ddl_epoch: std::sync::atomic::AtomicU64,
977    /// Public-mutation gate (PLAN.md W1).
978    ///
979    /// Built once at construction from the immutable subset of
980    /// `RedDBOptions` (read_only flag + replication role). Every public
981    /// mutation surface — SQL DML/DDL, gRPC mutating RPCs, HTTP/native
982    /// wire mutations, admin maintenance endpoints, serverless
983    /// lifecycle — consults `write_gate.check(WriteKind::*)` before
984    /// dispatching to storage. The replica internal apply path
985    /// (`LogicalChangeApplier`) reaches into the store directly and
986    /// bypasses the gate by construction.
987    write_gate: Arc<crate::runtime::write_gate::WriteGate>,
988    /// Process lifecycle state machine (PLAN.md Phase 1 — Lifecycle
989    /// Contract). Drives `/health/live`, `/health/ready`,
990    /// `/health/startup`, and `POST /admin/shutdown` so any
991    /// orchestrator (K8s preStop, Fly autostop, ECS task drain,
992    /// systemd) can coordinate without losing data.
993    lifecycle: crate::runtime::lifecycle::Lifecycle,
994    /// Operator-imposed resource limits (PLAN.md Phase 4.1).
995    /// Read once at boot from `RED_MAX_*` env vars; consulted by
996    /// observability and (in follow-up commits) the per-write
997    /// enforcement points.
998    resource_limits: crate::runtime::resource_limits::ResourceLimits,
999    /// Append-only audit log for admin mutations (PLAN.md Phase
1000    /// 6.5). Lives next to the primary `.rdb` file so backup +
1001    /// restore flows ship it alongside the data.
1002    audit_log: Arc<crate::runtime::audit_log::AuditLogger>,
1003    /// Serverless writer-lease state machine. `None` when the operator
1004    /// did not opt into lease fencing (`RED_LEASE_REQUIRED` unset/false).
1005    /// When set, owns the {acquire/refresh/release/lost} transitions and
1006    /// is the single place that mutates `write_gate.set_lease_state` and
1007    /// records lease/* audit entries — keeping those two side-effects
1008    /// from drifting.
1009    lease_lifecycle: std::sync::OnceLock<Arc<crate::runtime::lease_lifecycle::LeaseLifecycle>>,
1010    /// PLAN.md Phase 11.5 — counters bumped by the replica apply
1011    /// loop on `Gap` / `Divergence` / `Apply` errors so /metrics
1012    /// surfaces them as `reddb_replica_apply_errors_total{kind}`.
1013    replica_apply_metrics: crate::replication::logical::ReplicaApplyMetrics,
1014    /// PLAN.md Phase 4.4 — per-caller QPS quotas. Disabled (no-op)
1015    /// when `RED_MAX_QPS_PER_CALLER` is unset.
1016    quota_bucket: crate::runtime::quota_bucket::QuotaBucket,
1017    /// Issue #120 — token → schema entity reverse index, kept current
1018    /// incrementally on DDL events. Consumed by AskPipeline (issue
1019    /// #121) Stage 2 to narrow vector-search candidates before any
1020    /// embedding compute. Mutated only from DDL execution paths.
1021    schema_vocabulary: parking_lot::RwLock<crate::runtime::schema_vocabulary::SchemaVocabulary>,
1022    /// Issue #205 — dedicated slow-query sink (`red-slow.log`).
1023    /// Built once at runtime startup; below-threshold calls pay only a
1024    /// single relaxed atomic load. Threshold + sample-pct come from
1025    /// `runtime.slow_query.threshold_ms` / `.sample_pct` (config matrix)
1026    /// at construction; live tuning via the config tree is a follow-up.
1027    slow_query_logger: Arc<crate::telemetry::slow_query_logger::SlowQueryLogger>,
1028    /// Process-local normal-KV operation counters. These are intentionally
1029    /// runtime-local; persistent accounting belongs in catalog stats.
1030    kv_stats: KvStatsCounters,
1031    /// Process-local normal-KV tag index used by `INVALIDATE TAGS`.
1032    kv_tag_index: KvTagIndex,
1033}
1034
/// Cloneable handle to the embedded runtime.
///
/// All state lives in a single `RuntimeInner` behind an `Arc`, so cloning a
/// `RedDBRuntime` (via the derived `Clone`) only bumps the reference count:
/// every clone observes and mutates the same underlying runtime.
#[derive(Clone)]
pub struct RedDBRuntime {
    /// Shared runtime state; cloned by reference count, never deep-copied.
    inner: Arc<RuntimeInner>,
}
1039
/// Connection-scoped handle onto the runtime state.
///
/// Pairs a numeric `id` with an `Arc<RuntimeInner>`, which is the same state
/// type that backs `RedDBRuntime`. NOTE(review): `id` is presumably the
/// connection id that keys the per-connection maps in `RuntimeInner`
/// (`tx_contexts`, `tx_local_tenants`, `pending_tombstones`, ...). Confirm
/// this against the code that constructs `RuntimeConnection`.
pub struct RuntimeConnection {
    /// Connection identifier (see the hedged note above).
    id: u64,
    /// Runtime state this connection operates on.
    inner: Arc<RuntimeInner>,
}
1044
// Submodules of `runtime`.
//
// Visibility differs per module:
// - `pub` modules are reachable wherever `runtime` itself is.
// - `pub(crate)` modules are shared with the rest of the crate.
// - Plain `mod` entries are private to `runtime` and its child modules.
//
// The `impl_*` modules presumably hold `impl` blocks for the runtime types,
// split by feature area. Confirm against the module files.
// NOTE(review): `impl_kv` and `impl_queue` are `pub` while every other
// `impl_*` module is private. Confirm the wider visibility is intended.
pub mod ai;
pub mod ask_pipeline;
pub mod audit_log;
pub mod audit_query;
pub mod authorized_search;
mod collection_contract;
pub mod config_matrix;
pub mod config_overlay;
pub mod config_watcher;
pub(crate) mod ddl;
pub mod disk_space_monitor;
mod dml_target_scan;
mod expr_eval;
mod graph_dsl;
mod health_connection;
mod impl_config;
pub(crate) mod impl_core;
mod impl_ddl;
mod impl_dml;
mod impl_ec;
mod impl_events;
mod impl_graph;
mod impl_graph_commands;
pub mod impl_kv;
mod impl_migrations;
mod impl_native;
mod impl_physical;
mod impl_probabilistic;
pub mod impl_queue;
mod impl_search;
mod impl_timeseries;
mod impl_tree;
mod impl_vcs;
mod index_store;
mod join_filter;
mod keyed_spine;
pub mod kv_watch;
pub mod lease_lifecycle;
pub mod lease_loop;
pub mod lease_timer_wheel;
pub mod lifecycle;
pub mod locking;
pub(crate) mod mutation;
mod probabilistic_store;
pub(crate) mod query_exec;
mod queue_delivery;
pub mod quota_bucket;
mod record_search;
mod red_schema;
pub mod resource_limits;
pub(crate) mod scalar_evaluator;
pub mod schema_diff;
pub mod schema_vocabulary;
pub mod snapshot_reuse;
mod statement_frame;
mod vector_index;
pub mod within_clause;
pub mod write_gate;
1103
1104pub use self::graph_dsl::*;
1105use self::join_filter::*;
1106use self::query_exec::*;
1107use self::record_search::*;
1108pub use self::statement_frame::EffectiveScope;
1109
1110/// Re-exports for transports + tests that need per-connection
1111/// isolation, tenant / auth thread-locals, and MVCC snapshot
1112/// utilities. Mirrors what PG-wire / gRPC / HTTP middleware already
1113/// call, and is enough to emulate independent connections in
1114/// integration tests.
1115pub mod mvcc {
1116    pub use super::impl_core::{
1117        capture_current_snapshot, clear_current_auth_identity, clear_current_connection_id,
1118        clear_current_snapshot, clear_current_tenant, current_connection_id, current_tenant,
1119        entity_visible_under_current_snapshot, entity_visible_with_context,
1120        set_current_auth_identity, set_current_connection_id, set_current_snapshot,
1121        set_current_tenant, snapshot_bundle, with_snapshot_bundle, SnapshotBundle, SnapshotContext,
1122    };
1123}
1124
1125/// Public helpers re-exported for use by the presentation layer.
1126pub mod record_search_helpers {
1127    use crate::storage::query::UnifiedRecord;
1128    use crate::storage::UnifiedEntity;
1129    use std::collections::BTreeSet;
1130
1131    pub fn entity_type_and_capabilities(
1132        entity: &UnifiedEntity,
1133    ) -> (&'static str, BTreeSet<String>) {
1134        super::record_search::runtime_entity_type_and_capabilities(entity)
1135    }
1136
1137    /// Materialise any entity kind (TableRow, Node, Edge, Vector,
1138    /// TimeSeriesPoint, QueueMessage) into a `UnifiedRecord` whose
1139    /// `values` carry the native fields. Used by the RLS evaluator
1140    /// when a non-table collection matches a `CompareExpr` policy.
1141    pub fn any_record_from_entity(entity: UnifiedEntity) -> Option<UnifiedRecord> {
1142        super::record_search::runtime_any_record_from_entity(entity)
1143    }
1144}