1use std::cmp::Ordering;
4use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, VecDeque};
5use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
6use std::sync::{Arc, Mutex};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use crate::api::{RedDBError, RedDBOptions, RedDBResult};
10use crate::catalog::{
11 CatalogAnalyticsJobStatus, CatalogAttentionSummary, CatalogGraphProjectionStatus,
12 CatalogIndexStatus, CatalogModelSnapshot, CollectionDescriptor,
13};
14use crate::health::{HealthProvider, HealthReport};
15use crate::index::IndexCatalog;
16use crate::physical::{
17 ExportDescriptor, ManifestEvent, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalLayout,
18 SnapshotDescriptor,
19};
20use crate::serde_json::Value as JsonValue;
21use crate::storage::engine::pathfinding::{AStar, BellmanFord, Dijkstra, BFS, DFS};
22use crate::storage::engine::{
23 BetweennessCentrality, ClosenessCentrality, ClusteringCoefficient, ConnectedComponents,
24 CycleDetector, DegreeCentrality, EigenvectorCentrality, GraphStore, IvfConfig, IvfIndex,
25 IvfStats, LabelPropagation, Louvain, MetadataEntry, MetadataFilter as VectorMetadataFilter,
26 MetadataValue as VectorMetadataValue, PageRank, PersonalizedPageRank, PhysicalFileHeader,
27 StoredNode, StronglyConnectedComponents, WeaklyConnectedComponents, HITS,
28};
29use crate::storage::query::ast::{
30 AlterOperation, AlterQueueQuery, AlterTableQuery, CompareOp, CreateCollectionQuery,
31 CreateIndexQuery, CreateQueueQuery, CreateTableQuery, CreateTimeSeriesQuery, CreateTreeQuery,
32 CreateVectorQuery, DeleteQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery,
33 DropIndexQuery, DropKvQuery, DropQueueQuery, DropTableQuery, DropTimeSeriesQuery,
34 DropTreeQuery, DropVectorQuery, EventsBackfillQuery, ExplainAlterQuery, ExplainFormat,
35 FieldRef, Filter, FusionStrategy, GraphCommand, HybridQuery, IndexMethod, InsertEntityType,
36 InsertQuery, JoinQuery, JoinType, OrderByClause, ProbabilisticCommand, Projection, QueryExpr,
37 QueueCommand, QueueSelectQuery, QueueSide, SearchCommand, TableQuery, TreeCommand,
38 TruncateQuery, UpdateQuery, VectorQuery, VectorSource,
39};
40use crate::storage::query::is_universal_entity_source as is_universal_query_source;
41use crate::storage::query::modes::{detect_mode, parse_multi, QueryMode};
42use crate::storage::query::planner::{
43 CanonicalLogicalPlan, CanonicalPlanner, CostEstimator, QueryPlanner,
44};
45use crate::storage::query::unified::{UnifiedRecord, UnifiedResult};
46use crate::storage::schema::Value;
47use crate::storage::unified::dsl::{
48 apply_filters, cosine_similarity, Filter as DslFilter, FilterOp as DslFilterOp,
49 FilterValue as DslFilterValue, GraphPatternDsl, HybridQueryBuilder, MatchComponents,
50 QueryResult as DslQueryResult, ScoredMatch, TextSearchBuilder,
51};
52use crate::storage::unified::store::{
53 NativeCatalogSummary, NativeManifestSummary, NativePhysicalState, NativeRecoverySummary,
54 NativeRegistrySummary,
55};
56use crate::storage::unified::{
57 Metadata, MetadataValue as UnifiedMetadataValue, RefTarget, UnifiedMetadataFilter,
58};
59use crate::storage::{
60 EntityData, EntityId, EntityKind, RedDB, RefType, SimilarResult, StoreStats, UnifiedEntity,
61 UnifiedStore,
62};
63
64#[derive(Debug, Clone)]
65pub struct ConnectionPoolConfig {
66 pub max_connections: usize,
67 pub max_idle: usize,
68}
69
70impl Default for ConnectionPoolConfig {
71 fn default() -> Self {
72 Self {
73 max_connections: 64,
74 max_idle: 16,
75 }
76 }
77}
78
79#[derive(Debug, Clone, Copy, PartialEq, Eq)]
80pub struct ScanCursor {
81 pub offset: usize,
82}
83
84#[derive(Debug, Clone)]
85pub struct ScanPage {
86 pub collection: String,
87 pub items: Vec<UnifiedEntity>,
88 pub next: Option<ScanCursor>,
89 pub total: usize,
90}
91
92#[derive(Debug, Clone)]
93pub struct SystemInfo {
94 pub pid: u32,
95 pub cpu_cores: usize,
96 pub total_memory_bytes: u64,
97 pub available_memory_bytes: u64,
98 pub os: String,
99 pub arch: String,
100 pub hostname: String,
101}
102
103impl SystemInfo {
104 pub fn should_parallelize() -> bool {
107 std::thread::available_parallelism()
108 .map(|p| p.get() > 1)
109 .unwrap_or(false)
110 }
111
112 pub fn collect() -> Self {
113 Self {
114 pid: std::process::id(),
115 cpu_cores: std::thread::available_parallelism()
116 .map(|p| p.get())
117 .unwrap_or(1),
118 total_memory_bytes: Self::read_total_memory(),
119 available_memory_bytes: Self::read_available_memory(),
120 os: std::env::consts::OS.to_string(),
121 arch: std::env::consts::ARCH.to_string(),
122 hostname: std::env::var("HOSTNAME")
123 .or_else(|_| std::env::var("COMPUTERNAME"))
124 .unwrap_or_else(|_| "unknown".to_string()),
125 }
126 }
127
128 #[cfg(target_os = "linux")]
129 fn read_total_memory() -> u64 {
130 std::fs::read_to_string("/proc/meminfo")
131 .ok()
132 .and_then(|s| {
133 s.lines()
134 .find(|l| l.starts_with("MemTotal:"))
135 .and_then(|l| {
136 l.split_whitespace()
137 .nth(1)
138 .and_then(|v| v.parse::<u64>().ok())
139 })
140 .map(|kb| kb * 1024)
141 })
142 .unwrap_or(0)
143 }
144
145 #[cfg(target_os = "linux")]
146 fn read_available_memory() -> u64 {
147 std::fs::read_to_string("/proc/meminfo")
148 .ok()
149 .and_then(|s| {
150 s.lines()
151 .find(|l| l.starts_with("MemAvailable:"))
152 .and_then(|l| {
153 l.split_whitespace()
154 .nth(1)
155 .and_then(|v| v.parse::<u64>().ok())
156 })
157 .map(|kb| kb * 1024)
158 })
159 .unwrap_or(0)
160 }
161
162 #[cfg(not(target_os = "linux"))]
163 fn read_total_memory() -> u64 {
164 0
165 }
166
167 #[cfg(not(target_os = "linux"))]
168 fn read_available_memory() -> u64 {
169 0
170 }
171}
172
173#[derive(Debug, Clone)]
174pub struct RuntimeStats {
175 pub active_connections: usize,
176 pub idle_connections: usize,
177 pub total_checkouts: u64,
178 pub paged_mode: bool,
179 pub started_at_unix_ms: u128,
180 pub store: StoreStats,
181 pub system: SystemInfo,
182 pub result_blob_cache: crate::storage::cache::BlobCacheStats,
183 pub kv: KvStats,
184 pub metrics_ingest: MetricsIngestStats,
185}
186
187#[derive(Debug, Clone, PartialEq, Eq)]
188pub struct MetricsTenantActivityStats {
189 pub tenant: String,
190 pub namespace: String,
191 pub operation: String,
192 pub count: u64,
193}
194
195#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
196pub struct MetricsIngestStats {
197 pub samples_accepted: u64,
198 pub series_accepted: u64,
199 pub samples_rejected: u64,
200 pub series_rejected: u64,
201 pub series_rejected_cardinality_budget: u64,
202}
203
204#[derive(Debug, Default)]
205pub(crate) struct MetricsIngestCounters {
206 samples_accepted: AtomicU64,
207 series_accepted: AtomicU64,
208 samples_rejected: AtomicU64,
209 series_rejected: AtomicU64,
210 series_rejected_cardinality_budget: AtomicU64,
211}
212
213impl MetricsIngestCounters {
214 pub(crate) fn record(
215 &self,
216 accepted_samples: u64,
217 accepted_series: u64,
218 rejected_samples: u64,
219 rejected_series: u64,
220 ) {
221 self.samples_accepted
222 .fetch_add(accepted_samples, AtomicOrdering::Relaxed);
223 self.series_accepted
224 .fetch_add(accepted_series, AtomicOrdering::Relaxed);
225 self.samples_rejected
226 .fetch_add(rejected_samples, AtomicOrdering::Relaxed);
227 self.series_rejected
228 .fetch_add(rejected_series, AtomicOrdering::Relaxed);
229 }
230
231 pub(crate) fn record_cardinality_budget_rejections(&self, rejected_series: u64) {
232 self.series_rejected_cardinality_budget
233 .fetch_add(rejected_series, AtomicOrdering::Relaxed);
234 }
235
236 pub(crate) fn snapshot(&self) -> MetricsIngestStats {
237 MetricsIngestStats {
238 samples_accepted: self.samples_accepted.load(AtomicOrdering::Relaxed),
239 series_accepted: self.series_accepted.load(AtomicOrdering::Relaxed),
240 samples_rejected: self.samples_rejected.load(AtomicOrdering::Relaxed),
241 series_rejected: self.series_rejected.load(AtomicOrdering::Relaxed),
242 series_rejected_cardinality_budget: self
243 .series_rejected_cardinality_budget
244 .load(AtomicOrdering::Relaxed),
245 }
246 }
247}
248
249#[derive(Debug, Default)]
250pub(crate) struct MetricsTenantActivityCounters {
251 inner: Mutex<BTreeMap<(String, String, String), u64>>,
252}
253
254impl MetricsTenantActivityCounters {
255 pub(crate) fn record(&self, tenant: &str, namespace: &str, operation: &str) {
256 let mut inner = self
257 .inner
258 .lock()
259 .unwrap_or_else(|poison| poison.into_inner());
260 let key = (
261 tenant.to_string(),
262 namespace.to_string(),
263 operation.to_string(),
264 );
265 *inner.entry(key).or_insert(0) += 1;
266 }
267
268 pub(crate) fn snapshot(&self) -> Vec<MetricsTenantActivityStats> {
269 let inner = self
270 .inner
271 .lock()
272 .unwrap_or_else(|poison| poison.into_inner());
273 inner
274 .iter()
275 .map(
276 |((tenant, namespace, operation), count)| MetricsTenantActivityStats {
277 tenant: tenant.clone(),
278 namespace: namespace.clone(),
279 operation: operation.clone(),
280 count: *count,
281 },
282 )
283 .collect()
284 }
285}
286
287#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
288pub struct KvStats {
289 pub puts: u64,
290 pub gets: u64,
291 pub deletes: u64,
292 pub incrs: u64,
293 pub cas_success: u64,
294 pub cas_conflict: u64,
295 pub watch_streams_active: u64,
296 pub watch_events_emitted: u64,
297 pub watch_drops: u64,
298}
299
300#[derive(Debug, Default)]
301pub(crate) struct KvStatsCounters {
302 puts: AtomicU64,
303 gets: AtomicU64,
304 deletes: AtomicU64,
305 incrs: AtomicU64,
306 cas_success: AtomicU64,
307 cas_conflict: AtomicU64,
308 watch_streams_active: AtomicU64,
309 watch_events_emitted: AtomicU64,
310 watch_drops: AtomicU64,
311}
312
313#[derive(Debug, Default)]
314pub(crate) struct KvTagIndex {
315 tag_to_entries: parking_lot::RwLock<HashMap<(String, String), HashMap<String, EntityId>>>,
316 key_to_tags: parking_lot::RwLock<HashMap<(String, String), BTreeSet<String>>>,
317}
318
319impl KvTagIndex {
320 pub(crate) fn replace(&self, collection: &str, key: &str, id: EntityId, tags: &[String]) {
321 let entry_key = (collection.to_string(), key.to_string());
322 let new_tags: BTreeSet<String> = tags
323 .iter()
324 .map(|tag| tag.trim())
325 .filter(|tag| !tag.is_empty())
326 .map(ToOwned::to_owned)
327 .collect();
328
329 let old_tags = {
330 let mut key_to_tags = self.key_to_tags.write();
331 if new_tags.is_empty() {
332 key_to_tags.remove(&entry_key)
333 } else {
334 key_to_tags.insert(entry_key.clone(), new_tags.clone())
335 }
336 };
337
338 let mut tag_to_entries = self.tag_to_entries.write();
339 if let Some(old_tags) = old_tags {
340 for tag in old_tags {
341 let scoped = (collection.to_string(), tag);
342 let remove_scoped = if let Some(entries) = tag_to_entries.get_mut(&scoped) {
343 entries.remove(key);
344 entries.is_empty()
345 } else {
346 false
347 };
348 if remove_scoped {
349 tag_to_entries.remove(&scoped);
350 }
351 }
352 }
353
354 for tag in new_tags {
355 tag_to_entries
356 .entry((collection.to_string(), tag))
357 .or_default()
358 .insert(key.to_string(), id);
359 }
360 }
361
362 pub(crate) fn remove(&self, collection: &str, key: &str) {
363 let entry_key = (collection.to_string(), key.to_string());
364 let old_tags = self.key_to_tags.write().remove(&entry_key);
365 let Some(old_tags) = old_tags else {
366 return;
367 };
368
369 let mut tag_to_entries = self.tag_to_entries.write();
370 for tag in old_tags {
371 let scoped = (collection.to_string(), tag);
372 let remove_scoped = if let Some(entries) = tag_to_entries.get_mut(&scoped) {
373 entries.remove(key);
374 entries.is_empty()
375 } else {
376 false
377 };
378 if remove_scoped {
379 tag_to_entries.remove(&scoped);
380 }
381 }
382 }
383
384 pub(crate) fn entries_for_tags(
385 &self,
386 collection: &str,
387 tags: &[String],
388 ) -> Vec<(String, EntityId)> {
389 if tags.is_empty() {
390 return Vec::new();
391 }
392
393 let tag_to_entries = self.tag_to_entries.read();
394 let mut out: HashMap<String, EntityId> = HashMap::new();
395 for tag in tags {
396 let scoped = (collection.to_string(), tag.trim().to_string());
397 if let Some(entries) = tag_to_entries.get(&scoped) {
398 for (key, id) in entries {
399 out.entry(key.clone()).or_insert(*id);
400 }
401 }
402 }
403 out.into_iter().collect()
404 }
405
406 pub(crate) fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
407 self.key_to_tags
408 .read()
409 .get(&(collection.to_string(), key.to_string()))
410 .map(|tags| tags.iter().cloned().collect())
411 .unwrap_or_default()
412 }
413}
414
415impl KvStatsCounters {
416 pub(crate) fn snapshot(&self) -> KvStats {
417 KvStats {
418 puts: self.puts.load(AtomicOrdering::Relaxed),
419 gets: self.gets.load(AtomicOrdering::Relaxed),
420 deletes: self.deletes.load(AtomicOrdering::Relaxed),
421 incrs: self.incrs.load(AtomicOrdering::Relaxed),
422 cas_success: self.cas_success.load(AtomicOrdering::Relaxed),
423 cas_conflict: self.cas_conflict.load(AtomicOrdering::Relaxed),
424 watch_streams_active: self.watch_streams_active.load(AtomicOrdering::Relaxed),
425 watch_events_emitted: self.watch_events_emitted.load(AtomicOrdering::Relaxed),
426 watch_drops: self.watch_drops.load(AtomicOrdering::Relaxed),
427 }
428 }
429
430 pub(crate) fn incr_puts(&self) {
431 self.puts.fetch_add(1, AtomicOrdering::Relaxed);
432 }
433
434 pub(crate) fn incr_gets(&self) {
435 self.gets.fetch_add(1, AtomicOrdering::Relaxed);
436 }
437
438 pub(crate) fn incr_deletes(&self) {
439 self.deletes.fetch_add(1, AtomicOrdering::Relaxed);
440 }
441
442 pub(crate) fn incr_incrs(&self) {
443 self.incrs.fetch_add(1, AtomicOrdering::Relaxed);
444 }
445
446 pub(crate) fn incr_cas_success(&self) {
447 self.cas_success.fetch_add(1, AtomicOrdering::Relaxed);
448 }
449
450 pub(crate) fn incr_cas_conflict(&self) {
451 self.cas_conflict.fetch_add(1, AtomicOrdering::Relaxed);
452 }
453
454 pub(crate) fn incr_watch_streams_active(&self) {
455 self.watch_streams_active
456 .fetch_add(1, AtomicOrdering::Relaxed);
457 }
458
459 pub(crate) fn decr_watch_streams_active(&self) {
460 self.watch_streams_active
461 .fetch_sub(1, AtomicOrdering::Relaxed);
462 }
463
464 pub(crate) fn incr_watch_events_emitted(&self) {
465 self.watch_events_emitted
466 .fetch_add(1, AtomicOrdering::Relaxed);
467 }
468
469 pub(crate) fn add_watch_drops(&self, count: u64) {
470 self.watch_drops.fetch_add(count, AtomicOrdering::Relaxed);
471 }
472}
473
474#[derive(Debug, Clone)]
475pub struct RuntimeQueryResult {
476 pub query: String,
477 pub mode: QueryMode,
478 pub statement: &'static str,
479 pub engine: &'static str,
480 pub result: UnifiedResult,
481 pub affected_rows: u64,
482 pub statement_type: &'static str,
484}
485
486impl RuntimeQueryResult {
487 pub fn dml_result(
489 query: String,
490 affected: u64,
491 statement_type: &'static str,
492 engine: &'static str,
493 ) -> Self {
494 Self {
495 query,
496 mode: QueryMode::Sql,
497 statement: statement_type,
498 engine,
499 result: UnifiedResult::empty(),
500 affected_rows: affected,
501 statement_type,
502 }
503 }
504
505 pub fn ok_message(query: String, message: &str, statement_type: &'static str) -> Self {
507 let mut result = UnifiedResult::empty();
508 let mut record = UnifiedRecord::new();
509 record.set("message", Value::text(message.to_string()));
510 result.push(record);
511 result.columns = vec!["message".to_string()];
512
513 Self {
514 query,
515 mode: QueryMode::Sql,
516 statement: statement_type,
517 engine: "runtime-ddl",
518 result,
519 affected_rows: 0,
520 statement_type,
521 }
522 }
523
524 pub fn ok_records(
528 query: String,
529 columns: Vec<String>,
530 rows: Vec<Vec<(String, Value)>>,
531 statement_type: &'static str,
532 ) -> Self {
533 let mut result = UnifiedResult::empty();
534 for row in rows {
535 let mut record = UnifiedRecord::new();
536 for (k, v) in row {
537 record.set(&k, v);
538 }
539 result.push(record);
540 }
541 result.columns = columns;
542
543 Self {
544 query,
545 mode: QueryMode::Sql,
546 statement: statement_type,
547 engine: "runtime-meta",
548 result,
549 affected_rows: 0,
550 statement_type,
551 }
552 }
553}
554
555#[derive(Debug, Clone)]
556pub struct RuntimeQueryExplain {
557 pub query: String,
558 pub mode: QueryMode,
559 pub statement: &'static str,
560 pub is_universal: bool,
561 pub plan_cost: crate::storage::query::planner::PlanCost,
562 pub estimated_rows: f64,
563 pub estimated_selectivity: f64,
564 pub estimated_confidence: f64,
565 pub passes_applied: Vec<String>,
566 pub logical_plan: CanonicalLogicalPlan,
567 pub cte_materializations: Vec<String>,
574}
575
576#[derive(Debug, Clone)]
577pub struct RuntimeIvfMatch {
578 pub entity_id: u64,
579 pub distance: f32,
580 pub entity: Option<UnifiedEntity>,
581}
582
583#[derive(Debug, Clone)]
584pub struct RuntimeIvfSearchResult {
585 pub collection: String,
586 pub k: usize,
587 pub n_lists: usize,
588 pub n_probes: usize,
589 pub stats: IvfStats,
590 pub matches: Vec<RuntimeIvfMatch>,
591}
592
593#[derive(Debug, Clone, Copy, PartialEq, Eq)]
594pub enum RuntimeGraphDirection {
595 Outgoing,
596 Incoming,
597 Both,
598}
599
600#[derive(Debug, Clone, Copy, PartialEq, Eq)]
601pub enum RuntimeGraphTraversalStrategy {
602 Bfs,
603 Dfs,
604}
605
606#[derive(Debug, Clone, Copy, PartialEq, Eq)]
607pub enum RuntimeGraphPathAlgorithm {
608 Bfs,
609 Dijkstra,
610 AStar,
611 BellmanFord,
612}
613
614#[derive(Debug, Clone)]
615pub struct RuntimeGraphNode {
616 pub id: String,
617 pub label: String,
618 pub node_type: String,
619 pub out_edge_count: u32,
620 pub in_edge_count: u32,
621}
622
623#[derive(Debug, Clone)]
624pub struct RuntimeGraphEdge {
625 pub source: String,
626 pub target: String,
627 pub edge_type: String,
628 pub weight: f32,
629}
630
631#[derive(Debug, Clone)]
632pub struct RuntimeGraphVisit {
633 pub depth: usize,
634 pub node: RuntimeGraphNode,
635}
636
637#[derive(Debug, Clone)]
638pub struct RuntimeGraphNeighborhoodResult {
639 pub source: String,
640 pub direction: RuntimeGraphDirection,
641 pub max_depth: usize,
642 pub nodes: Vec<RuntimeGraphVisit>,
643 pub edges: Vec<RuntimeGraphEdge>,
644}
645
646#[derive(Debug, Clone)]
647pub struct RuntimeGraphTraversalResult {
648 pub source: String,
649 pub direction: RuntimeGraphDirection,
650 pub strategy: RuntimeGraphTraversalStrategy,
651 pub max_depth: usize,
652 pub visits: Vec<RuntimeGraphVisit>,
653 pub edges: Vec<RuntimeGraphEdge>,
654}
655
656#[derive(Debug, Clone)]
657pub struct RuntimeGraphPath {
658 pub hop_count: usize,
659 pub total_weight: f64,
660 pub nodes: Vec<RuntimeGraphNode>,
661 pub edges: Vec<RuntimeGraphEdge>,
662}
663
664#[derive(Debug, Clone)]
665pub struct RuntimeGraphPathResult {
666 pub source: String,
667 pub target: String,
668 pub direction: RuntimeGraphDirection,
669 pub algorithm: RuntimeGraphPathAlgorithm,
670 pub nodes_visited: usize,
671 pub negative_cycle_detected: Option<bool>,
672 pub path: Option<RuntimeGraphPath>,
673}
674
675#[derive(Debug, Clone, Copy, PartialEq, Eq)]
676pub enum RuntimeGraphComponentsMode {
677 Connected,
678 Weak,
679 Strong,
680}
681
682#[derive(Debug, Clone, Copy, PartialEq, Eq)]
683pub enum RuntimeGraphCentralityAlgorithm {
684 Degree,
685 Closeness,
686 Betweenness,
687 Eigenvector,
688 PageRank,
689}
690
691#[derive(Debug, Clone, Copy, PartialEq, Eq)]
692pub enum RuntimeGraphCommunityAlgorithm {
693 LabelPropagation,
694 Louvain,
695}
696
697#[derive(Debug, Clone)]
698pub struct RuntimeGraphComponent {
699 pub id: String,
700 pub size: usize,
701 pub nodes: Vec<String>,
702}
703
704#[derive(Debug, Clone)]
705pub struct RuntimeGraphComponentsResult {
706 pub mode: RuntimeGraphComponentsMode,
707 pub count: usize,
708 pub components: Vec<RuntimeGraphComponent>,
709}
710
711#[derive(Debug, Clone)]
712pub struct RuntimeGraphCentralityScore {
713 pub node: RuntimeGraphNode,
714 pub score: f64,
715}
716
717#[derive(Debug, Clone)]
718pub struct RuntimeGraphDegreeScore {
719 pub node: RuntimeGraphNode,
720 pub in_degree: usize,
721 pub out_degree: usize,
722 pub total_degree: usize,
723}
724
725#[derive(Debug, Clone)]
726pub struct RuntimeGraphCentralityResult {
727 pub algorithm: RuntimeGraphCentralityAlgorithm,
728 pub normalized: Option<bool>,
729 pub iterations: Option<usize>,
730 pub converged: Option<bool>,
731 pub scores: Vec<RuntimeGraphCentralityScore>,
732 pub degree_scores: Vec<RuntimeGraphDegreeScore>,
733}
734
735#[derive(Debug, Clone)]
736pub struct RuntimeGraphCommunity {
737 pub id: String,
738 pub size: usize,
739 pub nodes: Vec<String>,
740}
741
742#[derive(Debug, Clone)]
743pub struct RuntimeGraphCommunityResult {
744 pub algorithm: RuntimeGraphCommunityAlgorithm,
745 pub count: usize,
746 pub iterations: Option<usize>,
747 pub converged: Option<bool>,
748 pub modularity: Option<f64>,
749 pub passes: Option<usize>,
750 pub communities: Vec<RuntimeGraphCommunity>,
751}
752
753#[derive(Debug, Clone)]
754pub struct RuntimeGraphClusteringResult {
755 pub global: f64,
756 pub local: Vec<RuntimeGraphCentralityScore>,
757 pub triangle_count: Option<usize>,
758}
759
760#[derive(Debug, Clone)]
761pub struct RuntimeGraphHitsResult {
762 pub iterations: usize,
763 pub converged: bool,
764 pub hubs: Vec<RuntimeGraphCentralityScore>,
765 pub authorities: Vec<RuntimeGraphCentralityScore>,
766}
767
768#[derive(Debug, Clone)]
769pub struct RuntimeGraphCyclesResult {
770 pub limit_reached: bool,
771 pub cycles: Vec<RuntimeGraphPath>,
772}
773
774#[derive(Debug, Clone)]
775pub struct RuntimeGraphTopologicalSortResult {
776 pub acyclic: bool,
777 pub ordered_nodes: Vec<RuntimeGraphNode>,
778}
779
780#[derive(Debug, Clone)]
781pub struct RuntimeGraphPropertiesResult {
782 pub node_count: usize,
783 pub edge_count: usize,
784 pub self_loop_count: usize,
785 pub negative_edge_count: usize,
786 pub connected_component_count: usize,
787 pub weak_component_count: usize,
788 pub strong_component_count: usize,
789 pub is_empty: bool,
790 pub is_connected: bool,
791 pub is_weakly_connected: bool,
792 pub is_strongly_connected: bool,
793 pub is_complete: bool,
794 pub is_complete_directed: bool,
795 pub is_cyclic: bool,
796 pub is_circular: bool,
797 pub is_acyclic: bool,
798 pub is_tree: bool,
799 pub density: f64,
800 pub density_directed: f64,
801}
802
803#[derive(Debug, Clone)]
808pub struct ContextSearchResult {
809 pub query: String,
810 pub tables: Vec<ContextEntity>,
811 pub graph: ContextGraphResult,
812 pub vectors: Vec<ContextEntity>,
813 pub documents: Vec<ContextEntity>,
814 pub key_values: Vec<ContextEntity>,
815 pub connections: Vec<ContextConnection>,
816 pub summary: ContextSummary,
817}
818
819#[derive(Debug, Clone)]
820pub struct ContextEntity {
821 pub entity: UnifiedEntity,
822 pub score: f32,
823 pub discovery: DiscoveryMethod,
824 pub collection: String,
825}
826
827#[derive(Debug, Clone)]
828pub enum DiscoveryMethod {
829 Indexed {
830 field: String,
831 },
832 GlobalScan,
833 CrossReference {
834 source_id: u64,
835 ref_type: String,
836 },
837 GraphTraversal {
838 source_id: u64,
839 edge_type: String,
840 depth: usize,
841 },
842 VectorQuery {
843 similarity: f32,
844 },
845}
846
847#[derive(Debug, Clone)]
848pub struct ContextGraphResult {
849 pub nodes: Vec<ContextEntity>,
850 pub edges: Vec<ContextEntity>,
851}
852
853#[derive(Debug, Clone)]
854pub struct ContextConnection {
855 pub from_id: u64,
856 pub to_id: u64,
857 pub connection_type: ContextConnectionType,
858 pub weight: f32,
859}
860
861#[derive(Debug, Clone)]
862pub enum ContextConnectionType {
863 CrossRef(String),
864 GraphEdge(String),
865 VectorSimilarity(f32),
866}
867
868#[derive(Debug, Clone)]
869pub struct ContextSummary {
870 pub total_entities: usize,
871 pub direct_matches: usize,
872 pub expanded_via_graph: usize,
873 pub expanded_via_cross_refs: usize,
874 pub expanded_via_vector_query: usize,
875 pub collections_searched: usize,
876 pub execution_time_us: u64,
877 pub tiers_used: Vec<String>,
878 pub entities_reindexed: usize,
879}
880
881struct PoolState {
882 next_id: u64,
883 active: usize,
884 idle: Vec<u64>,
885 total_checkouts: u64,
886}
887
888impl Default for PoolState {
889 fn default() -> Self {
890 Self {
891 next_id: 1,
892 active: 0,
893 idle: Vec::new(),
894 total_checkouts: 0,
895 }
896 }
897}
898
899#[derive(Debug, Clone)]
900struct RuntimeResultCacheEntry {
901 result: RuntimeQueryResult,
902 cached_at: std::time::Instant,
903 scopes: HashSet<String>,
904}
905
906pub const METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL: &str = "cache_shadow_divergence_total";
907pub(crate) const ASK_ANSWER_CACHE_NAMESPACE: &str = "runtime.ask_answer_cache";
908const RMW_LOCK_SHARDS: usize = 64;
909
910struct RmwLockTable {
911 shards: Vec<parking_lot::Mutex<HashMap<String, Arc<parking_lot::Mutex<()>>>>>,
912}
913
914impl RmwLockTable {
915 fn new() -> Self {
916 let shards = (0..RMW_LOCK_SHARDS)
917 .map(|_| parking_lot::Mutex::new(HashMap::new()))
918 .collect();
919 Self { shards }
920 }
921
922 fn lock_for(&self, collection: &str, key: &str) -> Arc<parking_lot::Mutex<()>> {
923 use std::hash::{Hash, Hasher};
924
925 let mut hasher = std::collections::hash_map::DefaultHasher::new();
926 collection.hash(&mut hasher);
927 key.hash(&mut hasher);
928 let shard_idx = (hasher.finish() as usize) % self.shards.len();
929 let map_key = format!("{collection}\u{1f}{key}");
930 let mut shard = self.shards[shard_idx].lock();
931 shard
932 .entry(map_key)
933 .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
934 .clone()
935 }
936}
937
938struct RuntimeInner {
939 db: Arc<RedDB>,
940 layout: PhysicalLayout,
941 indices: IndexCatalog,
942 pool_config: ConnectionPoolConfig,
943 pool: Mutex<PoolState>,
944 started_at_unix_ms: u128,
945 probabilistic: probabilistic_store::ProbabilisticStore,
946 index_store: index_store::IndexStore,
947 cdc: crate::replication::cdc::CdcBuffer,
948 backup_scheduler: crate::replication::scheduler::BackupScheduler,
949 query_cache: parking_lot::RwLock<crate::storage::query::planner::cache::PlanCache>,
950 result_cache: parking_lot::RwLock<(
951 HashMap<String, RuntimeResultCacheEntry>,
952 std::collections::VecDeque<String>,
953 )>,
954 result_blob_cache: crate::storage::cache::BlobCache,
955 result_blob_entries: parking_lot::RwLock<(
956 HashMap<String, RuntimeResultCacheEntry>,
957 std::collections::VecDeque<String>,
958 )>,
959 ask_answer_cache_entries:
960 parking_lot::RwLock<(HashSet<String>, std::collections::VecDeque<String>)>,
961 result_cache_shadow_divergences: std::sync::atomic::AtomicU64,
962 ask_daily_spend:
963 parking_lot::RwLock<HashMap<String, crate::runtime::ai::cost_guard::DailyState>>,
964 queue_message_locks: parking_lot::RwLock<HashMap<String, Arc<parking_lot::Mutex<()>>>>,
967 rmw_locks: RmwLockTable,
971 planner_dirty_tables: parking_lot::RwLock<HashSet<String>>,
972 ec_registry: Arc<crate::ec::config::EcRegistry>,
973 ec_worker: crate::ec::worker::EcWorker,
974 auth_store: parking_lot::RwLock<Option<Arc<crate::auth::store::AuthStore>>>,
979 oauth_validator: parking_lot::RwLock<Option<Arc<crate::auth::oauth::OAuthValidator>>>,
986 views: parking_lot::RwLock<HashMap<String, Arc<crate::storage::query::ast::CreateViewQuery>>>,
997 materialized_views: parking_lot::RwLock<crate::storage::cache::result::MaterializedViewCache>,
998 snapshot_manager: Arc<crate::storage::transaction::snapshot::SnapshotManager>,
1006 tx_contexts:
1013 parking_lot::RwLock<HashMap<u64, crate::storage::transaction::snapshot::TxnContext>>,
1014 lock_manager: Arc<crate::storage::transaction::lock::LockManager>,
1021 env_config_overrides: HashMap<String, String>,
1027 tx_local_tenants: parking_lot::RwLock<HashMap<u64, Option<String>>>,
1035 rls_policies: parking_lot::RwLock<
1042 HashMap<(String, String), Arc<crate::storage::query::ast::CreatePolicyQuery>>,
1043 >,
1044 rls_enabled_tables: parking_lot::RwLock<HashSet<String>>,
1045 foreign_tables: Arc<crate::storage::fdw::ForeignTableRegistry>,
1053 pending_tombstones: parking_lot::RwLock<
1065 HashMap<
1066 u64,
1067 Vec<(
1068 String,
1069 crate::storage::unified::entity::EntityId,
1070 crate::storage::transaction::snapshot::Xid,
1071 crate::storage::transaction::snapshot::Xid,
1072 )>,
1073 >,
1074 >,
1075 pending_versioned_updates: parking_lot::RwLock<
1081 HashMap<
1082 u64,
1083 Vec<(
1084 String,
1085 crate::storage::unified::entity::EntityId,
1086 crate::storage::unified::entity::EntityId,
1087 crate::storage::transaction::snapshot::Xid,
1088 crate::storage::transaction::snapshot::Xid,
1089 )>,
1090 >,
1091 >,
1092 pending_kv_watch_events:
1093 parking_lot::RwLock<HashMap<u64, Vec<crate::replication::cdc::KvWatchEvent>>>,
1094 pending_store_wal_actions:
1095 parking_lot::RwLock<HashMap<u64, crate::storage::unified::DeferredStoreWalActions>>,
1096 tenant_tables: parking_lot::RwLock<HashMap<String, String>>,
1105 ddl_epoch: std::sync::atomic::AtomicU64,
1111 write_gate: Arc<crate::runtime::write_gate::WriteGate>,
1122 lifecycle: crate::runtime::lifecycle::Lifecycle,
1128 resource_limits: crate::runtime::resource_limits::ResourceLimits,
1133 audit_log: Arc<crate::runtime::audit_log::AuditLogger>,
1137 lease_lifecycle: std::sync::OnceLock<Arc<crate::runtime::lease_lifecycle::LeaseLifecycle>>,
1144 replica_apply_metrics: crate::replication::logical::ReplicaApplyMetrics,
1148 quota_bucket: crate::runtime::quota_bucket::QuotaBucket,
1151 schema_vocabulary: parking_lot::RwLock<crate::runtime::schema_vocabulary::SchemaVocabulary>,
1156 slow_query_logger: Arc<crate::telemetry::slow_query_logger::SlowQueryLogger>,
1162 kv_stats: KvStatsCounters,
1165 metrics_ingest_stats: MetricsIngestCounters,
1166 metrics_tenant_activity_stats: MetricsTenantActivityCounters,
1167 kv_tag_index: KvTagIndex,
1169}
1170
1171#[derive(Clone)]
1172pub struct RedDBRuntime {
1173 inner: Arc<RuntimeInner>,
1174}
1175
1176pub struct RuntimeConnection {
1177 id: u64,
1178 inner: Arc<RuntimeInner>,
1179}
1180
1181pub mod ai;
1182pub mod ask_pipeline;
1183pub mod audit_log;
1184pub mod audit_query;
1185pub mod authorized_search;
1186mod collection_contract;
1187pub mod config_matrix;
1188pub mod config_overlay;
1189pub mod config_watcher;
1190pub(crate) mod ddl;
1191pub mod disk_space_monitor;
1192mod dml_target_scan;
1193mod expr_eval;
1194mod graph_dsl;
1195mod health_connection;
1196mod impl_config;
1197pub(crate) mod impl_core;
1198mod impl_ddl;
1199mod impl_dml;
1200mod impl_ec;
1201mod impl_events;
1202mod impl_graph;
1203mod impl_graph_commands;
1204pub mod impl_kv;
1205mod impl_migrations;
1206mod impl_native;
1207mod impl_physical;
1208mod impl_probabilistic;
1209pub mod impl_queue;
1210mod impl_search;
1211mod impl_timeseries;
1212mod impl_tree;
1213mod impl_vcs;
1214mod index_store;
1215mod join_filter;
1216mod keyed_spine;
1217pub mod kv_watch;
1218pub mod lease_lifecycle;
1219pub mod lease_loop;
1220pub mod lease_timer_wheel;
1221pub mod lifecycle;
1222pub mod locking;
1223pub(crate) mod mutation;
1224mod probabilistic_store;
1225pub(crate) mod query_exec;
1226mod queue_delivery;
1227pub mod quota_bucket;
1228mod record_search;
1229mod red_schema;
1230pub mod resource_limits;
1231pub(crate) mod scalar_evaluator;
1232pub mod schema_diff;
1233pub mod schema_vocabulary;
1234pub mod snapshot_reuse;
1235mod statement_frame;
1236mod table_row_mvcc_resolver;
1237mod vector_index;
1238pub mod within_clause;
1239pub mod write_gate;
1240
1241pub use self::graph_dsl::*;
1242use self::join_filter::*;
1243use self::query_exec::*;
1244use self::record_search::*;
1245pub use self::statement_frame::EffectiveScope;
1246
1247pub mod mvcc {
1253 pub use super::impl_core::{
1254 capture_current_snapshot, clear_current_auth_identity, clear_current_connection_id,
1255 clear_current_snapshot, clear_current_tenant, current_connection_id, current_tenant,
1256 entity_visible_under_current_snapshot, entity_visible_with_context,
1257 set_current_auth_identity, set_current_connection_id, set_current_snapshot,
1258 set_current_tenant, snapshot_bundle, with_snapshot_bundle, SnapshotBundle, SnapshotContext,
1259 };
1260}
1261
1262pub mod record_search_helpers {
1264 use crate::storage::query::UnifiedRecord;
1265 use crate::storage::UnifiedEntity;
1266 use std::collections::BTreeSet;
1267
1268 pub fn entity_type_and_capabilities(
1269 entity: &UnifiedEntity,
1270 ) -> (&'static str, BTreeSet<String>) {
1271 super::record_search::runtime_entity_type_and_capabilities(entity)
1272 }
1273
1274 pub fn any_record_from_entity(entity: UnifiedEntity) -> Option<UnifiedRecord> {
1279 super::record_search::runtime_any_record_from_entity(entity)
1280 }
1281}