1use std::cmp::Ordering;
4use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, VecDeque};
5use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
6use std::sync::{Arc, Mutex};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use crate::api::{RedDBError, RedDBOptions, RedDBResult};
10use crate::catalog::{
11 CatalogAnalyticsJobStatus, CatalogAttentionSummary, CatalogGraphProjectionStatus,
12 CatalogIndexStatus, CatalogModelSnapshot, CollectionDescriptor,
13};
14use crate::health::{HealthProvider, HealthReport};
15use crate::index::IndexCatalog;
16use crate::physical::{
17 ExportDescriptor, ManifestEvent, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalLayout,
18 SnapshotDescriptor,
19};
20use crate::serde_json::Value as JsonValue;
21use crate::storage::engine::pathfinding::{AStar, BellmanFord, Dijkstra, BFS, DFS};
22use crate::storage::engine::{
23 BetweennessCentrality, ClosenessCentrality, ClusteringCoefficient, ConnectedComponents,
24 CycleDetector, DegreeCentrality, EigenvectorCentrality, GraphStore, IvfConfig, IvfIndex,
25 IvfStats, LabelPropagation, Louvain, MetadataEntry, MetadataFilter as VectorMetadataFilter,
26 MetadataValue as VectorMetadataValue, PageRank, PersonalizedPageRank, PhysicalFileHeader,
27 StoredNode, StronglyConnectedComponents, WeaklyConnectedComponents, HITS,
28};
29use crate::storage::query::ast::{
30 AlterOperation, AlterQueueQuery, AlterTableQuery, CompareOp, CreateIndexQuery,
31 CreateQueueQuery, CreateTableQuery, CreateTimeSeriesQuery, CreateTreeQuery, DeleteQuery,
32 DropCollectionQuery, DropDocumentQuery, DropGraphQuery, DropIndexQuery, DropKvQuery,
33 DropQueueQuery, DropTableQuery, DropTimeSeriesQuery, DropTreeQuery, DropVectorQuery,
34 EventsBackfillQuery, ExplainAlterQuery, ExplainFormat, FieldRef, Filter, FusionStrategy,
35 GraphCommand, HybridQuery, IndexMethod, InsertEntityType, InsertQuery, JoinQuery, JoinType,
36 OrderByClause, ProbabilisticCommand, Projection, QueryExpr, QueueCommand, QueueSelectQuery,
37 QueueSide, SearchCommand, TableQuery, TreeCommand, TruncateQuery, UpdateQuery, VectorQuery,
38 VectorSource,
39};
40use crate::storage::query::is_universal_entity_source as is_universal_query_source;
41use crate::storage::query::modes::{detect_mode, parse_multi, QueryMode};
42use crate::storage::query::planner::{
43 CanonicalLogicalPlan, CanonicalPlanner, CostEstimator, QueryPlanner,
44};
45use crate::storage::query::unified::{UnifiedRecord, UnifiedResult};
46use crate::storage::schema::Value;
47use crate::storage::unified::dsl::{
48 apply_filters, cosine_similarity, Filter as DslFilter, FilterOp as DslFilterOp,
49 FilterValue as DslFilterValue, GraphPatternDsl, HybridQueryBuilder, MatchComponents,
50 QueryResult as DslQueryResult, ScoredMatch, TextSearchBuilder,
51};
52use crate::storage::unified::store::{
53 NativeCatalogSummary, NativeManifestSummary, NativePhysicalState, NativeRecoverySummary,
54 NativeRegistrySummary,
55};
56use crate::storage::unified::{
57 Metadata, MetadataValue as UnifiedMetadataValue, RefTarget, UnifiedMetadataFilter,
58};
59use crate::storage::{
60 EntityData, EntityId, EntityKind, RedDB, RefType, SimilarResult, StoreStats, UnifiedEntity,
61 UnifiedStore,
62};
63
/// Sizing limits for the runtime's connection pool.
///
/// NOTE(review): how these limits are enforced is not visible in this part of
/// the file — confirm against the pool checkout/release code.
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
    /// Connection cap; [`Default`] sets it to 64.
    pub max_connections: usize,
    /// Idle-connection cap; [`Default`] sets it to 16.
    pub max_idle: usize,
}

impl Default for ConnectionPoolConfig {
    /// Returns the stock limits: 64 connections, 16 of them idle.
    fn default() -> Self {
        let max_connections = 64;
        let max_idle = 16;
        Self {
            max_connections,
            max_idle,
        }
    }
}
78
/// Position marker for a paged scan (see [`ScanPage::next`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ScanCursor {
    /// Offset carried by the cursor — presumably the index of the next item
    /// to return; confirm against the scan implementation.
    pub offset: usize,
}
83
84#[derive(Debug, Clone)]
85pub struct ScanPage {
86 pub collection: String,
87 pub items: Vec<UnifiedEntity>,
88 pub next: Option<ScanCursor>,
89 pub total: usize,
90}
91
/// Snapshot of host and process facts gathered by [`SystemInfo::collect`].
#[derive(Debug, Clone)]
pub struct SystemInfo {
    /// Id of the current process (`std::process::id`).
    pub pid: u32,
    /// Parallelism reported by `std::thread::available_parallelism`, or 1 when
    /// the standard library cannot determine it.
    pub cpu_cores: usize,
    /// `MemTotal` from `/proc/meminfo`, in bytes. 0 on non-Linux targets or
    /// when the file cannot be read or parsed.
    pub total_memory_bytes: u64,
    /// `MemAvailable` from `/proc/meminfo`, in bytes. 0 on non-Linux targets or
    /// when the file cannot be read or parsed.
    pub available_memory_bytes: u64,
    /// Value of `std::env::consts::OS`.
    pub os: String,
    /// Value of `std::env::consts::ARCH`.
    pub arch: String,
    /// `HOSTNAME` environment variable, else `COMPUTERNAME`, else `"unknown"`.
    pub hostname: String,
}

impl SystemInfo {
    /// Returns `true` when the standard library reports more than one unit of
    /// available parallelism; `false` when it reports one or cannot tell.
    pub fn should_parallelize() -> bool {
        matches!(std::thread::available_parallelism(), Ok(p) if p.get() > 1)
    }

    /// Gathers a fresh [`SystemInfo`] for the current process.
    ///
    /// Never fails: every probe falls back to a neutral value (1 core, 0 bytes,
    /// `"unknown"` hostname) when the underlying source is unavailable.
    pub fn collect() -> Self {
        let cpu_cores = std::thread::available_parallelism()
            .map(|p| p.get())
            .unwrap_or(1);
        let hostname = std::env::var("HOSTNAME")
            .or_else(|_| std::env::var("COMPUTERNAME"))
            .unwrap_or_else(|_| "unknown".to_string());

        Self {
            pid: std::process::id(),
            cpu_cores,
            total_memory_bytes: Self::read_total_memory(),
            available_memory_bytes: Self::read_available_memory(),
            os: std::env::consts::OS.to_string(),
            arch: std::env::consts::ARCH.to_string(),
            hostname,
        }
    }

    /// Extracts one `/proc/meminfo`-style field and converts it from KiB to bytes.
    ///
    /// `field` is the line prefix including the colon (e.g. `"MemTotal:"`). The
    /// first line starting with that prefix is used; its second
    /// whitespace-separated token must parse as `u64`. Returns `None` when the
    /// field is missing or malformed. The multiplication saturates instead of
    /// overflowing.
    // Only the Linux readers call this; keep other targets warning-free.
    #[cfg_attr(not(target_os = "linux"), allow(dead_code))]
    fn parse_meminfo_bytes(contents: &str, field: &str) -> Option<u64> {
        let line = contents.lines().find(|l| l.starts_with(field))?;
        let kb = line.split_whitespace().nth(1)?.parse::<u64>().ok()?;
        Some(kb.saturating_mul(1024))
    }

    /// Reads `field` from `/proc/meminfo`; 0 on any read or parse failure.
    #[cfg(target_os = "linux")]
    fn read_meminfo_bytes(field: &str) -> u64 {
        std::fs::read_to_string("/proc/meminfo")
            .ok()
            .and_then(|contents| Self::parse_meminfo_bytes(&contents, field))
            .unwrap_or(0)
    }

    /// Total memory in bytes (`MemTotal`), or 0 when unavailable.
    #[cfg(target_os = "linux")]
    fn read_total_memory() -> u64 {
        Self::read_meminfo_bytes("MemTotal:")
    }

    /// Available memory in bytes (`MemAvailable`), or 0 when unavailable.
    #[cfg(target_os = "linux")]
    fn read_available_memory() -> u64 {
        Self::read_meminfo_bytes("MemAvailable:")
    }

    /// Non-Linux targets have no probe; always 0.
    #[cfg(not(target_os = "linux"))]
    fn read_total_memory() -> u64 {
        0
    }

    /// Non-Linux targets have no probe; always 0.
    #[cfg(not(target_os = "linux"))]
    fn read_available_memory() -> u64 {
        0
    }
}
172
173#[derive(Debug, Clone)]
174pub struct RuntimeStats {
175 pub active_connections: usize,
176 pub idle_connections: usize,
177 pub total_checkouts: u64,
178 pub paged_mode: bool,
179 pub started_at_unix_ms: u128,
180 pub store: StoreStats,
181 pub system: SystemInfo,
182 pub result_blob_cache: crate::storage::cache::BlobCacheStats,
183 pub kv: KvStats,
184}
185
/// Plain-value snapshot of the key-value counters.
///
/// Field names mirror the atomics in `KvStatsCounters`; what each operation
/// counts is decided by the callers of the `incr_*` methods, not by this type.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct KvStats {
    /// Count of put operations.
    pub puts: u64,
    /// Count of get operations.
    pub gets: u64,
    /// Count of delete operations.
    pub deletes: u64,
    /// Count of increment operations.
    pub incrs: u64,
    /// Count of compare-and-swap successes.
    pub cas_success: u64,
    /// Count of compare-and-swap conflicts.
    pub cas_conflict: u64,
    /// Gauge of currently active watch streams.
    pub watch_streams_active: u64,
    /// Count of watch events emitted.
    pub watch_events_emitted: u64,
    /// Count of dropped watch events.
    pub watch_drops: u64,
}
198
/// Live, lock-free counters behind [`KvStats`].
///
/// Every field is an [`AtomicU64`] starting at zero (via `Default`), so the
/// counters can be bumped through a shared reference.
#[derive(Debug, Default)]
pub(crate) struct KvStatsCounters {
    /// Put operations.
    puts: AtomicU64,
    /// Get operations.
    gets: AtomicU64,
    /// Delete operations.
    deletes: AtomicU64,
    /// Increment operations.
    incrs: AtomicU64,
    /// Compare-and-swap successes.
    cas_success: AtomicU64,
    /// Compare-and-swap conflicts.
    cas_conflict: AtomicU64,
    /// Gauge: currently active watch streams (incremented and decremented).
    watch_streams_active: AtomicU64,
    /// Watch events emitted.
    watch_events_emitted: AtomicU64,
    /// Watch events dropped.
    watch_drops: AtomicU64,
}
211
212#[derive(Debug, Default)]
213pub(crate) struct KvTagIndex {
214 tag_to_entries: parking_lot::RwLock<HashMap<(String, String), HashMap<String, EntityId>>>,
215 key_to_tags: parking_lot::RwLock<HashMap<(String, String), BTreeSet<String>>>,
216}
217
218impl KvTagIndex {
219 pub(crate) fn replace(&self, collection: &str, key: &str, id: EntityId, tags: &[String]) {
220 let entry_key = (collection.to_string(), key.to_string());
221 let new_tags: BTreeSet<String> = tags
222 .iter()
223 .map(|tag| tag.trim())
224 .filter(|tag| !tag.is_empty())
225 .map(ToOwned::to_owned)
226 .collect();
227
228 let old_tags = {
229 let mut key_to_tags = self.key_to_tags.write();
230 if new_tags.is_empty() {
231 key_to_tags.remove(&entry_key)
232 } else {
233 key_to_tags.insert(entry_key.clone(), new_tags.clone())
234 }
235 };
236
237 let mut tag_to_entries = self.tag_to_entries.write();
238 if let Some(old_tags) = old_tags {
239 for tag in old_tags {
240 let scoped = (collection.to_string(), tag);
241 let remove_scoped = if let Some(entries) = tag_to_entries.get_mut(&scoped) {
242 entries.remove(key);
243 entries.is_empty()
244 } else {
245 false
246 };
247 if remove_scoped {
248 tag_to_entries.remove(&scoped);
249 }
250 }
251 }
252
253 for tag in new_tags {
254 tag_to_entries
255 .entry((collection.to_string(), tag))
256 .or_default()
257 .insert(key.to_string(), id);
258 }
259 }
260
261 pub(crate) fn remove(&self, collection: &str, key: &str) {
262 let entry_key = (collection.to_string(), key.to_string());
263 let old_tags = self.key_to_tags.write().remove(&entry_key);
264 let Some(old_tags) = old_tags else {
265 return;
266 };
267
268 let mut tag_to_entries = self.tag_to_entries.write();
269 for tag in old_tags {
270 let scoped = (collection.to_string(), tag);
271 let remove_scoped = if let Some(entries) = tag_to_entries.get_mut(&scoped) {
272 entries.remove(key);
273 entries.is_empty()
274 } else {
275 false
276 };
277 if remove_scoped {
278 tag_to_entries.remove(&scoped);
279 }
280 }
281 }
282
283 pub(crate) fn entries_for_tags(
284 &self,
285 collection: &str,
286 tags: &[String],
287 ) -> Vec<(String, EntityId)> {
288 if tags.is_empty() {
289 return Vec::new();
290 }
291
292 let tag_to_entries = self.tag_to_entries.read();
293 let mut out: HashMap<String, EntityId> = HashMap::new();
294 for tag in tags {
295 let scoped = (collection.to_string(), tag.trim().to_string());
296 if let Some(entries) = tag_to_entries.get(&scoped) {
297 for (key, id) in entries {
298 out.entry(key.clone()).or_insert(*id);
299 }
300 }
301 }
302 out.into_iter().collect()
303 }
304
305 pub(crate) fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
306 self.key_to_tags
307 .read()
308 .get(&(collection.to_string(), key.to_string()))
309 .map(|tags| tags.iter().cloned().collect())
310 .unwrap_or_default()
311 }
312}
313
314impl KvStatsCounters {
315 pub(crate) fn snapshot(&self) -> KvStats {
316 KvStats {
317 puts: self.puts.load(AtomicOrdering::Relaxed),
318 gets: self.gets.load(AtomicOrdering::Relaxed),
319 deletes: self.deletes.load(AtomicOrdering::Relaxed),
320 incrs: self.incrs.load(AtomicOrdering::Relaxed),
321 cas_success: self.cas_success.load(AtomicOrdering::Relaxed),
322 cas_conflict: self.cas_conflict.load(AtomicOrdering::Relaxed),
323 watch_streams_active: self.watch_streams_active.load(AtomicOrdering::Relaxed),
324 watch_events_emitted: self.watch_events_emitted.load(AtomicOrdering::Relaxed),
325 watch_drops: self.watch_drops.load(AtomicOrdering::Relaxed),
326 }
327 }
328
329 pub(crate) fn incr_puts(&self) {
330 self.puts.fetch_add(1, AtomicOrdering::Relaxed);
331 }
332
333 pub(crate) fn incr_gets(&self) {
334 self.gets.fetch_add(1, AtomicOrdering::Relaxed);
335 }
336
337 pub(crate) fn incr_deletes(&self) {
338 self.deletes.fetch_add(1, AtomicOrdering::Relaxed);
339 }
340
341 pub(crate) fn incr_incrs(&self) {
342 self.incrs.fetch_add(1, AtomicOrdering::Relaxed);
343 }
344
345 pub(crate) fn incr_cas_success(&self) {
346 self.cas_success.fetch_add(1, AtomicOrdering::Relaxed);
347 }
348
349 pub(crate) fn incr_cas_conflict(&self) {
350 self.cas_conflict.fetch_add(1, AtomicOrdering::Relaxed);
351 }
352
353 pub(crate) fn incr_watch_streams_active(&self) {
354 self.watch_streams_active
355 .fetch_add(1, AtomicOrdering::Relaxed);
356 }
357
358 pub(crate) fn decr_watch_streams_active(&self) {
359 self.watch_streams_active
360 .fetch_sub(1, AtomicOrdering::Relaxed);
361 }
362
363 pub(crate) fn incr_watch_events_emitted(&self) {
364 self.watch_events_emitted
365 .fetch_add(1, AtomicOrdering::Relaxed);
366 }
367
368 pub(crate) fn add_watch_drops(&self, count: u64) {
369 self.watch_drops.fetch_add(count, AtomicOrdering::Relaxed);
370 }
371}
372
373#[derive(Debug, Clone)]
374pub struct RuntimeQueryResult {
375 pub query: String,
376 pub mode: QueryMode,
377 pub statement: &'static str,
378 pub engine: &'static str,
379 pub result: UnifiedResult,
380 pub affected_rows: u64,
381 pub statement_type: &'static str,
383}
384
385impl RuntimeQueryResult {
386 pub fn dml_result(
388 query: String,
389 affected: u64,
390 statement_type: &'static str,
391 engine: &'static str,
392 ) -> Self {
393 Self {
394 query,
395 mode: QueryMode::Sql,
396 statement: statement_type,
397 engine,
398 result: UnifiedResult::empty(),
399 affected_rows: affected,
400 statement_type,
401 }
402 }
403
404 pub fn ok_message(query: String, message: &str, statement_type: &'static str) -> Self {
406 let mut result = UnifiedResult::empty();
407 let mut record = UnifiedRecord::new();
408 record.set("message", Value::text(message.to_string()));
409 result.push(record);
410 result.columns = vec!["message".to_string()];
411
412 Self {
413 query,
414 mode: QueryMode::Sql,
415 statement: statement_type,
416 engine: "runtime-ddl",
417 result,
418 affected_rows: 0,
419 statement_type,
420 }
421 }
422
423 pub fn ok_records(
427 query: String,
428 columns: Vec<String>,
429 rows: Vec<Vec<(String, Value)>>,
430 statement_type: &'static str,
431 ) -> Self {
432 let mut result = UnifiedResult::empty();
433 for row in rows {
434 let mut record = UnifiedRecord::new();
435 for (k, v) in row {
436 record.set(&k, v);
437 }
438 result.push(record);
439 }
440 result.columns = columns;
441
442 Self {
443 query,
444 mode: QueryMode::Sql,
445 statement: statement_type,
446 engine: "runtime-meta",
447 result,
448 affected_rows: 0,
449 statement_type,
450 }
451 }
452}
453
454#[derive(Debug, Clone)]
455pub struct RuntimeQueryExplain {
456 pub query: String,
457 pub mode: QueryMode,
458 pub statement: &'static str,
459 pub is_universal: bool,
460 pub plan_cost: crate::storage::query::planner::PlanCost,
461 pub estimated_rows: f64,
462 pub estimated_selectivity: f64,
463 pub estimated_confidence: f64,
464 pub passes_applied: Vec<String>,
465 pub logical_plan: CanonicalLogicalPlan,
466 pub cte_materializations: Vec<String>,
473}
474
475#[derive(Debug, Clone)]
476pub struct RuntimeIvfMatch {
477 pub entity_id: u64,
478 pub distance: f32,
479 pub entity: Option<UnifiedEntity>,
480}
481
482#[derive(Debug, Clone)]
483pub struct RuntimeIvfSearchResult {
484 pub collection: String,
485 pub k: usize,
486 pub n_lists: usize,
487 pub n_probes: usize,
488 pub stats: IvfStats,
489 pub matches: Vec<RuntimeIvfMatch>,
490}
491
/// Edge direction selector for graph operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphDirection {
    /// Outgoing edges.
    Outgoing,
    /// Incoming edges.
    Incoming,
    /// Edges in either direction.
    Both,
}
498
/// Traversal order selector for graph traversals.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphTraversalStrategy {
    /// Breadth-first traversal.
    Bfs,
    /// Depth-first traversal.
    Dfs,
}
504
/// Path-finding algorithm selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphPathAlgorithm {
    /// Breadth-first search.
    Bfs,
    /// Dijkstra's algorithm.
    Dijkstra,
    /// A* search.
    AStar,
    /// Bellman-Ford algorithm.
    BellmanFord,
}
512
/// Graph node as exposed in runtime graph results.
#[derive(Debug, Clone)]
pub struct RuntimeGraphNode {
    /// Node identifier.
    pub id: String,
    /// Node label.
    pub label: String,
    /// Node type name.
    pub node_type: String,
    /// Number of outgoing edges recorded for the node.
    pub out_edge_count: u32,
    /// Number of incoming edges recorded for the node.
    pub in_edge_count: u32,
}
521
/// Graph edge as exposed in runtime graph results.
#[derive(Debug, Clone)]
pub struct RuntimeGraphEdge {
    /// Identifier of the edge's source node.
    pub source: String,
    /// Identifier of the edge's target node.
    pub target: String,
    /// Edge type name.
    pub edge_type: String,
    /// Edge weight.
    pub weight: f32,
}
529
530#[derive(Debug, Clone)]
531pub struct RuntimeGraphVisit {
532 pub depth: usize,
533 pub node: RuntimeGraphNode,
534}
535
536#[derive(Debug, Clone)]
537pub struct RuntimeGraphNeighborhoodResult {
538 pub source: String,
539 pub direction: RuntimeGraphDirection,
540 pub max_depth: usize,
541 pub nodes: Vec<RuntimeGraphVisit>,
542 pub edges: Vec<RuntimeGraphEdge>,
543}
544
545#[derive(Debug, Clone)]
546pub struct RuntimeGraphTraversalResult {
547 pub source: String,
548 pub direction: RuntimeGraphDirection,
549 pub strategy: RuntimeGraphTraversalStrategy,
550 pub max_depth: usize,
551 pub visits: Vec<RuntimeGraphVisit>,
552 pub edges: Vec<RuntimeGraphEdge>,
553}
554
555#[derive(Debug, Clone)]
556pub struct RuntimeGraphPath {
557 pub hop_count: usize,
558 pub total_weight: f64,
559 pub nodes: Vec<RuntimeGraphNode>,
560 pub edges: Vec<RuntimeGraphEdge>,
561}
562
563#[derive(Debug, Clone)]
564pub struct RuntimeGraphPathResult {
565 pub source: String,
566 pub target: String,
567 pub direction: RuntimeGraphDirection,
568 pub algorithm: RuntimeGraphPathAlgorithm,
569 pub nodes_visited: usize,
570 pub negative_cycle_detected: Option<bool>,
571 pub path: Option<RuntimeGraphPath>,
572}
573
/// Selector for the kind of component analysis to run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphComponentsMode {
    /// Connected components.
    Connected,
    /// Weakly connected components.
    Weak,
    /// Strongly connected components.
    Strong,
}
580
/// Centrality algorithm selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphCentralityAlgorithm {
    /// Degree centrality.
    Degree,
    /// Closeness centrality.
    Closeness,
    /// Betweenness centrality.
    Betweenness,
    /// Eigenvector centrality.
    Eigenvector,
    /// PageRank.
    PageRank,
}
589
/// Community-detection algorithm selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeGraphCommunityAlgorithm {
    /// Label propagation.
    LabelPropagation,
    /// Louvain method.
    Louvain,
}
595
/// One component found by a component analysis.
#[derive(Debug, Clone)]
pub struct RuntimeGraphComponent {
    /// Component identifier.
    pub id: String,
    /// Size recorded for the component — presumably `nodes.len()`; confirm
    /// with the producer.
    pub size: usize,
    /// Identifiers of the member nodes.
    pub nodes: Vec<String>,
}
602
603#[derive(Debug, Clone)]
604pub struct RuntimeGraphComponentsResult {
605 pub mode: RuntimeGraphComponentsMode,
606 pub count: usize,
607 pub components: Vec<RuntimeGraphComponent>,
608}
609
610#[derive(Debug, Clone)]
611pub struct RuntimeGraphCentralityScore {
612 pub node: RuntimeGraphNode,
613 pub score: f64,
614}
615
616#[derive(Debug, Clone)]
617pub struct RuntimeGraphDegreeScore {
618 pub node: RuntimeGraphNode,
619 pub in_degree: usize,
620 pub out_degree: usize,
621 pub total_degree: usize,
622}
623
624#[derive(Debug, Clone)]
625pub struct RuntimeGraphCentralityResult {
626 pub algorithm: RuntimeGraphCentralityAlgorithm,
627 pub normalized: Option<bool>,
628 pub iterations: Option<usize>,
629 pub converged: Option<bool>,
630 pub scores: Vec<RuntimeGraphCentralityScore>,
631 pub degree_scores: Vec<RuntimeGraphDegreeScore>,
632}
633
/// One community found by community detection.
#[derive(Debug, Clone)]
pub struct RuntimeGraphCommunity {
    /// Community identifier.
    pub id: String,
    /// Size recorded for the community — presumably `nodes.len()`; confirm
    /// with the producer.
    pub size: usize,
    /// Identifiers of the member nodes.
    pub nodes: Vec<String>,
}
640
641#[derive(Debug, Clone)]
642pub struct RuntimeGraphCommunityResult {
643 pub algorithm: RuntimeGraphCommunityAlgorithm,
644 pub count: usize,
645 pub iterations: Option<usize>,
646 pub converged: Option<bool>,
647 pub modularity: Option<f64>,
648 pub passes: Option<usize>,
649 pub communities: Vec<RuntimeGraphCommunity>,
650}
651
652#[derive(Debug, Clone)]
653pub struct RuntimeGraphClusteringResult {
654 pub global: f64,
655 pub local: Vec<RuntimeGraphCentralityScore>,
656 pub triangle_count: Option<usize>,
657}
658
659#[derive(Debug, Clone)]
660pub struct RuntimeGraphHitsResult {
661 pub iterations: usize,
662 pub converged: bool,
663 pub hubs: Vec<RuntimeGraphCentralityScore>,
664 pub authorities: Vec<RuntimeGraphCentralityScore>,
665}
666
667#[derive(Debug, Clone)]
668pub struct RuntimeGraphCyclesResult {
669 pub limit_reached: bool,
670 pub cycles: Vec<RuntimeGraphPath>,
671}
672
673#[derive(Debug, Clone)]
674pub struct RuntimeGraphTopologicalSortResult {
675 pub acyclic: bool,
676 pub ordered_nodes: Vec<RuntimeGraphNode>,
677}
678
/// Structural summary of a graph: counts, boolean properties and densities.
///
/// NOTE(review): the exact definition behind each flag (e.g. `is_circular`
/// versus `is_cyclic`) lives in the producer, outside this part of the file.
#[derive(Debug, Clone)]
pub struct RuntimeGraphPropertiesResult {
    // --- counts ---
    /// Number of nodes.
    pub node_count: usize,
    /// Number of edges.
    pub edge_count: usize,
    /// Number of self-loops.
    pub self_loop_count: usize,
    /// Number of negative edges (presumably negative-weight; confirm).
    pub negative_edge_count: usize,
    /// Number of connected components.
    pub connected_component_count: usize,
    /// Number of weakly connected components.
    pub weak_component_count: usize,
    /// Number of strongly connected components.
    pub strong_component_count: usize,
    // --- boolean properties ---
    /// Emptiness flag.
    pub is_empty: bool,
    /// Connectedness flag.
    pub is_connected: bool,
    /// Weak-connectedness flag.
    pub is_weakly_connected: bool,
    /// Strong-connectedness flag.
    pub is_strongly_connected: bool,
    /// Completeness flag.
    pub is_complete: bool,
    /// Completeness flag for the directed interpretation.
    pub is_complete_directed: bool,
    /// Cyclic flag.
    pub is_cyclic: bool,
    /// Circular flag.
    pub is_circular: bool,
    /// Acyclic flag.
    pub is_acyclic: bool,
    /// Tree flag.
    pub is_tree: bool,
    // --- densities ---
    /// Density value.
    pub density: f64,
    /// Density value for the directed interpretation.
    pub density_directed: f64,
}
701
702#[derive(Debug, Clone)]
707pub struct ContextSearchResult {
708 pub query: String,
709 pub tables: Vec<ContextEntity>,
710 pub graph: ContextGraphResult,
711 pub vectors: Vec<ContextEntity>,
712 pub documents: Vec<ContextEntity>,
713 pub key_values: Vec<ContextEntity>,
714 pub connections: Vec<ContextConnection>,
715 pub summary: ContextSummary,
716}
717
718#[derive(Debug, Clone)]
719pub struct ContextEntity {
720 pub entity: UnifiedEntity,
721 pub score: f32,
722 pub discovery: DiscoveryMethod,
723 pub collection: String,
724}
725
/// How a context-search entity was found.
///
/// NOTE(review): the search pipeline that assigns these variants is outside
/// this part of the file; variant notes restate the declared shape.
#[derive(Debug, Clone)]
pub enum DiscoveryMethod {
    /// Found through an index; `field` names the indexed field.
    Indexed {
        field: String,
    },
    /// Found by a global scan.
    GlobalScan,
    /// Reached through a cross-reference from entity `source_id`, of reference
    /// type `ref_type`.
    CrossReference {
        source_id: u64,
        ref_type: String,
    },
    /// Reached by graph traversal from entity `source_id` over `edge_type`
    /// edges, at the recorded `depth`.
    GraphTraversal {
        source_id: u64,
        edge_type: String,
        depth: usize,
    },
    /// Found by a vector query with the recorded `similarity`.
    VectorQuery {
        similarity: f32,
    },
}
745
746#[derive(Debug, Clone)]
747pub struct ContextGraphResult {
748 pub nodes: Vec<ContextEntity>,
749 pub edges: Vec<ContextEntity>,
750}
751
752#[derive(Debug, Clone)]
753pub struct ContextConnection {
754 pub from_id: u64,
755 pub to_id: u64,
756 pub connection_type: ContextConnectionType,
757 pub weight: f32,
758}
759
/// Kind of link carried by a context connection.
#[derive(Debug, Clone)]
pub enum ContextConnectionType {
    /// Cross-reference; the payload is a string (presumably the reference
    /// type name — confirm with the producer).
    CrossRef(String),
    /// Graph edge; the payload is a string (presumably the edge type name —
    /// confirm with the producer).
    GraphEdge(String),
    /// Vector similarity; the payload is the similarity value.
    VectorSimilarity(f32),
}
766
/// Aggregate figures describing one context search.
#[derive(Debug, Clone)]
pub struct ContextSummary {
    /// Total number of entities reported.
    pub total_entities: usize,
    /// Number of direct matches.
    pub direct_matches: usize,
    /// Number of entities added via graph expansion.
    pub expanded_via_graph: usize,
    /// Number of entities added via cross-reference expansion.
    pub expanded_via_cross_refs: usize,
    /// Number of entities added via vector-query expansion.
    pub expanded_via_vector_query: usize,
    /// Number of collections searched.
    pub collections_searched: usize,
    /// Execution time in microseconds (per the field name).
    pub execution_time_us: u64,
    /// Names of the tiers used; the set of possible values is not visible here.
    pub tiers_used: Vec<String>,
    /// Number of entities reindexed.
    pub entities_reindexed: usize,
}
779
/// Mutable bookkeeping for the connection pool.
struct PoolState {
    /// Id counter; starts at 1 (presumably the next id to hand out — confirm
    /// with the checkout code).
    next_id: u64,
    /// Count of active connections; starts at 0.
    active: usize,
    /// Ids currently held idle; starts empty.
    idle: Vec<u64>,
    /// Checkout counter; starts at 0.
    total_checkouts: u64,
}

impl Default for PoolState {
    /// Fresh state: nothing active or idle, no checkouts, id counter at 1.
    fn default() -> Self {
        let idle: Vec<u64> = Vec::new();
        Self {
            next_id: 1,
            active: 0,
            idle,
            total_checkouts: 0,
        }
    }
}
797
798#[derive(Debug, Clone)]
799struct RuntimeResultCacheEntry {
800 result: RuntimeQueryResult,
801 cached_at: std::time::Instant,
802 scopes: HashSet<String>,
803}
804
/// Metric name `cache_shadow_divergence_total`.
///
/// NOTE(review): presumably the exported name of the result-cache shadow
/// divergence counter — confirm where it is emitted.
pub const METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL: &str = "cache_shadow_divergence_total";
806
807struct RuntimeInner {
808 db: Arc<RedDB>,
809 layout: PhysicalLayout,
810 indices: IndexCatalog,
811 pool_config: ConnectionPoolConfig,
812 pool: Mutex<PoolState>,
813 started_at_unix_ms: u128,
814 probabilistic: probabilistic_store::ProbabilisticStore,
815 index_store: index_store::IndexStore,
816 cdc: crate::replication::cdc::CdcBuffer,
817 backup_scheduler: crate::replication::scheduler::BackupScheduler,
818 query_cache: parking_lot::RwLock<crate::storage::query::planner::cache::PlanCache>,
819 result_cache: parking_lot::RwLock<(
820 HashMap<String, RuntimeResultCacheEntry>,
821 std::collections::VecDeque<String>,
822 )>,
823 result_blob_cache: crate::storage::cache::BlobCache,
824 result_blob_entries: parking_lot::RwLock<(
825 HashMap<String, RuntimeResultCacheEntry>,
826 std::collections::VecDeque<String>,
827 )>,
828 result_cache_shadow_divergences: std::sync::atomic::AtomicU64,
829 queue_message_locks: parking_lot::RwLock<HashMap<String, Arc<parking_lot::Mutex<()>>>>,
832 planner_dirty_tables: parking_lot::RwLock<HashSet<String>>,
833 ec_registry: Arc<crate::ec::config::EcRegistry>,
834 ec_worker: crate::ec::worker::EcWorker,
835 auth_store: parking_lot::RwLock<Option<Arc<crate::auth::store::AuthStore>>>,
840 oauth_validator: parking_lot::RwLock<Option<Arc<crate::auth::oauth::OAuthValidator>>>,
847 views: parking_lot::RwLock<HashMap<String, Arc<crate::storage::query::ast::CreateViewQuery>>>,
858 materialized_views: parking_lot::RwLock<crate::storage::cache::result::MaterializedViewCache>,
859 snapshot_manager: Arc<crate::storage::transaction::snapshot::SnapshotManager>,
867 tx_contexts:
874 parking_lot::RwLock<HashMap<u64, crate::storage::transaction::snapshot::TxnContext>>,
875 lock_manager: Arc<crate::storage::transaction::lock::LockManager>,
882 env_config_overrides: HashMap<String, String>,
888 tx_local_tenants: parking_lot::RwLock<HashMap<u64, Option<String>>>,
896 rls_policies: parking_lot::RwLock<
903 HashMap<(String, String), Arc<crate::storage::query::ast::CreatePolicyQuery>>,
904 >,
905 rls_enabled_tables: parking_lot::RwLock<HashSet<String>>,
906 foreign_tables: Arc<crate::storage::fdw::ForeignTableRegistry>,
914 pending_tombstones: parking_lot::RwLock<
925 HashMap<
926 u64,
927 Vec<(
928 String,
929 crate::storage::unified::entity::EntityId,
930 crate::storage::transaction::snapshot::Xid,
931 )>,
932 >,
933 >,
934 pending_kv_watch_events:
935 parking_lot::RwLock<HashMap<u64, Vec<crate::replication::cdc::KvWatchEvent>>>,
936 tenant_tables: parking_lot::RwLock<HashMap<String, String>>,
945 ddl_epoch: std::sync::atomic::AtomicU64,
951 write_gate: Arc<crate::runtime::write_gate::WriteGate>,
962 lifecycle: crate::runtime::lifecycle::Lifecycle,
968 resource_limits: crate::runtime::resource_limits::ResourceLimits,
973 audit_log: Arc<crate::runtime::audit_log::AuditLogger>,
977 lease_lifecycle: std::sync::OnceLock<Arc<crate::runtime::lease_lifecycle::LeaseLifecycle>>,
984 replica_apply_metrics: crate::replication::logical::ReplicaApplyMetrics,
988 quota_bucket: crate::runtime::quota_bucket::QuotaBucket,
991 schema_vocabulary: parking_lot::RwLock<crate::runtime::schema_vocabulary::SchemaVocabulary>,
996 slow_query_logger: Arc<crate::telemetry::slow_query_logger::SlowQueryLogger>,
1002 kv_stats: KvStatsCounters,
1005 kv_tag_index: KvTagIndex,
1007}
1008
1009#[derive(Clone)]
1010pub struct RedDBRuntime {
1011 inner: Arc<RuntimeInner>,
1012}
1013
1014pub struct RuntimeConnection {
1015 id: u64,
1016 inner: Arc<RuntimeInner>,
1017}
1018
1019pub mod ai;
1020pub mod ask_pipeline;
1021pub mod audit_log;
1022pub mod audit_query;
1023pub mod authorized_search;
1024mod collection_contract;
1025pub mod config_matrix;
1026pub mod config_overlay;
1027pub mod config_watcher;
1028pub(crate) mod ddl;
1029pub mod disk_space_monitor;
1030mod dml_target_scan;
1031mod expr_eval;
1032mod graph_dsl;
1033mod health_connection;
1034mod impl_config;
1035pub(crate) mod impl_core;
1036mod impl_ddl;
1037mod impl_dml;
1038mod impl_ec;
1039mod impl_events;
1040mod impl_graph;
1041mod impl_graph_commands;
1042pub mod impl_kv;
1043mod impl_migrations;
1044mod impl_native;
1045mod impl_physical;
1046mod impl_probabilistic;
1047pub mod impl_queue;
1048mod impl_search;
1049mod impl_timeseries;
1050mod impl_tree;
1051mod impl_vcs;
1052mod index_store;
1053mod join_filter;
1054mod keyed_spine;
1055pub mod kv_watch;
1056pub mod lease_lifecycle;
1057pub mod lease_loop;
1058pub mod lease_timer_wheel;
1059pub mod lifecycle;
1060pub mod locking;
1061pub(crate) mod mutation;
1062mod probabilistic_store;
1063pub(crate) mod query_exec;
1064mod queue_delivery;
1065pub mod quota_bucket;
1066mod record_search;
1067mod red_schema;
1068pub mod resource_limits;
1069pub(crate) mod scalar_evaluator;
1070pub mod schema_diff;
1071pub mod schema_vocabulary;
1072pub mod snapshot_reuse;
1073mod statement_frame;
1074pub mod within_clause;
1075pub mod write_gate;
1076
1077pub use self::graph_dsl::*;
1078use self::join_filter::*;
1079use self::query_exec::*;
1080use self::record_search::*;
1081pub use self::statement_frame::EffectiveScope;
1082
1083pub mod mvcc {
1089 pub use super::impl_core::{
1090 capture_current_snapshot, clear_current_auth_identity, clear_current_connection_id,
1091 clear_current_snapshot, clear_current_tenant, current_connection_id, current_tenant,
1092 entity_visible_under_current_snapshot, entity_visible_with_context,
1093 set_current_auth_identity, set_current_connection_id, set_current_snapshot,
1094 set_current_tenant, snapshot_bundle, with_snapshot_bundle, SnapshotBundle, SnapshotContext,
1095 };
1096}
1097
1098pub mod record_search_helpers {
1100 use crate::storage::query::UnifiedRecord;
1101 use crate::storage::UnifiedEntity;
1102 use std::collections::BTreeSet;
1103
1104 pub fn entity_type_and_capabilities(
1105 entity: &UnifiedEntity,
1106 ) -> (&'static str, BTreeSet<String>) {
1107 super::record_search::runtime_entity_type_and_capabilities(entity)
1108 }
1109
1110 pub fn any_record_from_entity(entity: UnifiedEntity) -> Option<UnifiedRecord> {
1115 super::record_search::runtime_any_record_from_entity(entity)
1116 }
1117}