1use std::cmp::Ordering;
4use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, VecDeque};
5use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
6use std::sync::{Arc, Mutex};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use crate::api::{RedDBError, RedDBOptions, RedDBResult};
10use crate::catalog::{
11 CatalogAnalyticsJobStatus, CatalogAttentionSummary, CatalogGraphProjectionStatus,
12 CatalogIndexStatus, CatalogModelSnapshot, CollectionDescriptor,
13};
14use crate::health::{HealthProvider, HealthReport};
15use crate::index::IndexCatalog;
16use crate::physical::{
17 ExportDescriptor, ManifestEvent, PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalLayout,
18 SnapshotDescriptor,
19};
20use crate::serde_json::Value as JsonValue;
21use crate::storage::engine::pathfinding::{AStar, BellmanFord, Dijkstra, BFS, DFS};
22use crate::storage::engine::{
23 BetweennessCentrality, ClosenessCentrality, ClusteringCoefficient, ConnectedComponents,
24 CycleDetector, DegreeCentrality, EigenvectorCentrality, GraphStore, IvfConfig, IvfIndex,
25 IvfStats, LabelPropagation, Louvain, MetadataEntry, MetadataFilter as VectorMetadataFilter,
26 MetadataValue as VectorMetadataValue, PageRank, PersonalizedPageRank, PhysicalFileHeader,
27 StoredNode, StronglyConnectedComponents, WeaklyConnectedComponents, HITS,
28};
29use crate::storage::query::ast::{
30 AlterOperation, AlterQueueQuery, AlterTableQuery, CompareOp, CreateCollectionQuery,
31 CreateIndexQuery, CreateQueueQuery, CreateTableQuery, CreateTimeSeriesQuery, CreateTreeQuery,
32 CreateVectorQuery, DeleteQuery, DropCollectionQuery, DropDocumentQuery, DropGraphQuery,
33 DropIndexQuery, DropKvQuery, DropQueueQuery, DropTableQuery, DropTimeSeriesQuery,
34 DropTreeQuery, DropVectorQuery, EventsBackfillQuery, ExplainAlterQuery, ExplainFormat,
35 FieldRef, Filter, FusionStrategy, GraphCommand, HybridQuery, IndexMethod, InsertEntityType,
36 InsertQuery, JoinQuery, JoinType, OrderByClause, ProbabilisticCommand, Projection, QueryExpr,
37 QueueCommand, QueueSelectQuery, QueueSide, SearchCommand, TableQuery, TreeCommand,
38 TruncateQuery, UpdateQuery, VectorQuery, VectorSource,
39};
40use crate::storage::query::is_universal_entity_source as is_universal_query_source;
41use crate::storage::query::modes::{detect_mode, parse_multi, QueryMode};
42use crate::storage::query::planner::{
43 CanonicalLogicalPlan, CanonicalPlanner, CostEstimator, QueryPlanner,
44};
45use crate::storage::query::unified::{UnifiedRecord, UnifiedResult};
46use crate::storage::schema::Value;
47use crate::storage::unified::dsl::{
48 apply_filters, cosine_similarity, Filter as DslFilter, FilterOp as DslFilterOp,
49 FilterValue as DslFilterValue, GraphPatternDsl, HybridQueryBuilder, MatchComponents,
50 QueryResult as DslQueryResult, ScoredMatch, TextSearchBuilder,
51};
52use crate::storage::unified::store::{
53 NativeCatalogSummary, NativeManifestSummary, NativePhysicalState, NativeRecoverySummary,
54 NativeRegistrySummary,
55};
56use crate::storage::unified::{
57 Metadata, MetadataValue as UnifiedMetadataValue, RefTarget, UnifiedMetadataFilter,
58};
59use crate::storage::{
60 EntityData, EntityId, EntityKind, RedDB, RefType, SimilarResult, StoreStats, UnifiedEntity,
61 UnifiedStore,
62};
63
64#[derive(Debug, Clone)]
65pub struct ConnectionPoolConfig {
66 pub max_connections: usize,
67 pub max_idle: usize,
68}
69
70impl Default for ConnectionPoolConfig {
71 fn default() -> Self {
72 Self {
73 max_connections: 64,
74 max_idle: 16,
75 }
76 }
77}
78
79#[derive(Debug, Clone, Copy, PartialEq, Eq)]
80pub struct ScanCursor {
81 pub offset: usize,
82}
83
84#[derive(Debug, Clone)]
85pub struct ScanPage {
86 pub collection: String,
87 pub items: Vec<UnifiedEntity>,
88 pub next: Option<ScanCursor>,
89 pub total: usize,
90}
91
92#[derive(Debug, Clone)]
93pub struct SystemInfo {
94 pub pid: u32,
95 pub cpu_cores: usize,
96 pub total_memory_bytes: u64,
97 pub available_memory_bytes: u64,
98 pub os: String,
99 pub arch: String,
100 pub hostname: String,
101}
102
103impl SystemInfo {
104 pub fn should_parallelize() -> bool {
107 std::thread::available_parallelism()
108 .map(|p| p.get() > 1)
109 .unwrap_or(false)
110 }
111
112 pub fn collect() -> Self {
113 Self {
114 pid: std::process::id(),
115 cpu_cores: std::thread::available_parallelism()
116 .map(|p| p.get())
117 .unwrap_or(1),
118 total_memory_bytes: Self::read_total_memory(),
119 available_memory_bytes: Self::read_available_memory(),
120 os: std::env::consts::OS.to_string(),
121 arch: std::env::consts::ARCH.to_string(),
122 hostname: std::env::var("HOSTNAME")
123 .or_else(|_| std::env::var("COMPUTERNAME"))
124 .unwrap_or_else(|_| "unknown".to_string()),
125 }
126 }
127
128 #[cfg(target_os = "linux")]
129 fn read_total_memory() -> u64 {
130 std::fs::read_to_string("/proc/meminfo")
131 .ok()
132 .and_then(|s| {
133 s.lines()
134 .find(|l| l.starts_with("MemTotal:"))
135 .and_then(|l| {
136 l.split_whitespace()
137 .nth(1)
138 .and_then(|v| v.parse::<u64>().ok())
139 })
140 .map(|kb| kb * 1024)
141 })
142 .unwrap_or(0)
143 }
144
145 #[cfg(target_os = "linux")]
146 fn read_available_memory() -> u64 {
147 std::fs::read_to_string("/proc/meminfo")
148 .ok()
149 .and_then(|s| {
150 s.lines()
151 .find(|l| l.starts_with("MemAvailable:"))
152 .and_then(|l| {
153 l.split_whitespace()
154 .nth(1)
155 .and_then(|v| v.parse::<u64>().ok())
156 })
157 .map(|kb| kb * 1024)
158 })
159 .unwrap_or(0)
160 }
161
162 #[cfg(not(target_os = "linux"))]
163 fn read_total_memory() -> u64 {
164 0
165 }
166
167 #[cfg(not(target_os = "linux"))]
168 fn read_available_memory() -> u64 {
169 0
170 }
171}
172
173#[derive(Debug, Clone)]
174pub struct RuntimeStats {
175 pub active_connections: usize,
176 pub idle_connections: usize,
177 pub total_checkouts: u64,
178 pub paged_mode: bool,
179 pub started_at_unix_ms: u128,
180 pub store: StoreStats,
181 pub system: SystemInfo,
182 pub result_blob_cache: crate::storage::cache::BlobCacheStats,
183 pub kv: KvStats,
184 pub metrics_ingest: MetricsIngestStats,
185}
186
187#[derive(Debug, Clone, PartialEq, Eq)]
188pub struct MetricsTenantActivityStats {
189 pub tenant: String,
190 pub namespace: String,
191 pub operation: String,
192 pub count: u64,
193}
194
195#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
196pub struct MetricsIngestStats {
197 pub samples_accepted: u64,
198 pub series_accepted: u64,
199 pub samples_rejected: u64,
200 pub series_rejected: u64,
201 pub series_rejected_cardinality_budget: u64,
202}
203
204#[derive(Debug, Default)]
205pub(crate) struct MetricsIngestCounters {
206 samples_accepted: AtomicU64,
207 series_accepted: AtomicU64,
208 samples_rejected: AtomicU64,
209 series_rejected: AtomicU64,
210 series_rejected_cardinality_budget: AtomicU64,
211}
212
213impl MetricsIngestCounters {
214 pub(crate) fn record(
215 &self,
216 accepted_samples: u64,
217 accepted_series: u64,
218 rejected_samples: u64,
219 rejected_series: u64,
220 ) {
221 self.samples_accepted
222 .fetch_add(accepted_samples, AtomicOrdering::Relaxed);
223 self.series_accepted
224 .fetch_add(accepted_series, AtomicOrdering::Relaxed);
225 self.samples_rejected
226 .fetch_add(rejected_samples, AtomicOrdering::Relaxed);
227 self.series_rejected
228 .fetch_add(rejected_series, AtomicOrdering::Relaxed);
229 }
230
231 pub(crate) fn record_cardinality_budget_rejections(&self, rejected_series: u64) {
232 self.series_rejected_cardinality_budget
233 .fetch_add(rejected_series, AtomicOrdering::Relaxed);
234 }
235
236 pub(crate) fn snapshot(&self) -> MetricsIngestStats {
237 MetricsIngestStats {
238 samples_accepted: self.samples_accepted.load(AtomicOrdering::Relaxed),
239 series_accepted: self.series_accepted.load(AtomicOrdering::Relaxed),
240 samples_rejected: self.samples_rejected.load(AtomicOrdering::Relaxed),
241 series_rejected: self.series_rejected.load(AtomicOrdering::Relaxed),
242 series_rejected_cardinality_budget: self
243 .series_rejected_cardinality_budget
244 .load(AtomicOrdering::Relaxed),
245 }
246 }
247}
248
249#[derive(Debug, Default)]
250pub(crate) struct MetricsTenantActivityCounters {
251 inner: Mutex<BTreeMap<(String, String, String), u64>>,
252}
253
254impl MetricsTenantActivityCounters {
255 pub(crate) fn record(&self, tenant: &str, namespace: &str, operation: &str) {
256 let mut inner = self
257 .inner
258 .lock()
259 .unwrap_or_else(|poison| poison.into_inner());
260 let key = (
261 tenant.to_string(),
262 namespace.to_string(),
263 operation.to_string(),
264 );
265 *inner.entry(key).or_insert(0) += 1;
266 }
267
268 pub(crate) fn snapshot(&self) -> Vec<MetricsTenantActivityStats> {
269 let inner = self
270 .inner
271 .lock()
272 .unwrap_or_else(|poison| poison.into_inner());
273 inner
274 .iter()
275 .map(
276 |((tenant, namespace, operation), count)| MetricsTenantActivityStats {
277 tenant: tenant.clone(),
278 namespace: namespace.clone(),
279 operation: operation.clone(),
280 count: *count,
281 },
282 )
283 .collect()
284 }
285}
286
287#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
288pub struct KvStats {
289 pub puts: u64,
290 pub gets: u64,
291 pub deletes: u64,
292 pub incrs: u64,
293 pub cas_success: u64,
294 pub cas_conflict: u64,
295 pub watch_streams_active: u64,
296 pub watch_events_emitted: u64,
297 pub watch_drops: u64,
298}
299
300#[derive(Debug, Default)]
301pub(crate) struct KvStatsCounters {
302 puts: AtomicU64,
303 gets: AtomicU64,
304 deletes: AtomicU64,
305 incrs: AtomicU64,
306 cas_success: AtomicU64,
307 cas_conflict: AtomicU64,
308 watch_streams_active: AtomicU64,
309 watch_events_emitted: AtomicU64,
310 watch_drops: AtomicU64,
311}
312
313#[derive(Debug, Default)]
314pub(crate) struct KvTagIndex {
315 tag_to_entries: parking_lot::RwLock<HashMap<(String, String), HashMap<String, EntityId>>>,
316 key_to_tags: parking_lot::RwLock<HashMap<(String, String), BTreeSet<String>>>,
317}
318
319impl KvTagIndex {
320 pub(crate) fn replace(&self, collection: &str, key: &str, id: EntityId, tags: &[String]) {
321 let entry_key = (collection.to_string(), key.to_string());
322 let new_tags: BTreeSet<String> = tags
323 .iter()
324 .map(|tag| tag.trim())
325 .filter(|tag| !tag.is_empty())
326 .map(ToOwned::to_owned)
327 .collect();
328
329 let old_tags = {
330 let mut key_to_tags = self.key_to_tags.write();
331 if new_tags.is_empty() {
332 key_to_tags.remove(&entry_key)
333 } else {
334 key_to_tags.insert(entry_key.clone(), new_tags.clone())
335 }
336 };
337
338 let mut tag_to_entries = self.tag_to_entries.write();
339 if let Some(old_tags) = old_tags {
340 for tag in old_tags {
341 let scoped = (collection.to_string(), tag);
342 let remove_scoped = if let Some(entries) = tag_to_entries.get_mut(&scoped) {
343 entries.remove(key);
344 entries.is_empty()
345 } else {
346 false
347 };
348 if remove_scoped {
349 tag_to_entries.remove(&scoped);
350 }
351 }
352 }
353
354 for tag in new_tags {
355 tag_to_entries
356 .entry((collection.to_string(), tag))
357 .or_default()
358 .insert(key.to_string(), id);
359 }
360 }
361
362 pub(crate) fn remove(&self, collection: &str, key: &str) {
363 let entry_key = (collection.to_string(), key.to_string());
364 let old_tags = self.key_to_tags.write().remove(&entry_key);
365 let Some(old_tags) = old_tags else {
366 return;
367 };
368
369 let mut tag_to_entries = self.tag_to_entries.write();
370 for tag in old_tags {
371 let scoped = (collection.to_string(), tag);
372 let remove_scoped = if let Some(entries) = tag_to_entries.get_mut(&scoped) {
373 entries.remove(key);
374 entries.is_empty()
375 } else {
376 false
377 };
378 if remove_scoped {
379 tag_to_entries.remove(&scoped);
380 }
381 }
382 }
383
384 pub(crate) fn entries_for_tags(
385 &self,
386 collection: &str,
387 tags: &[String],
388 ) -> Vec<(String, EntityId)> {
389 if tags.is_empty() {
390 return Vec::new();
391 }
392
393 let tag_to_entries = self.tag_to_entries.read();
394 let mut out: HashMap<String, EntityId> = HashMap::new();
395 for tag in tags {
396 let scoped = (collection.to_string(), tag.trim().to_string());
397 if let Some(entries) = tag_to_entries.get(&scoped) {
398 for (key, id) in entries {
399 out.entry(key.clone()).or_insert(*id);
400 }
401 }
402 }
403 out.into_iter().collect()
404 }
405
406 pub(crate) fn tags_for_key(&self, collection: &str, key: &str) -> Vec<String> {
407 self.key_to_tags
408 .read()
409 .get(&(collection.to_string(), key.to_string()))
410 .map(|tags| tags.iter().cloned().collect())
411 .unwrap_or_default()
412 }
413}
414
415impl KvStatsCounters {
416 pub(crate) fn snapshot(&self) -> KvStats {
417 KvStats {
418 puts: self.puts.load(AtomicOrdering::Relaxed),
419 gets: self.gets.load(AtomicOrdering::Relaxed),
420 deletes: self.deletes.load(AtomicOrdering::Relaxed),
421 incrs: self.incrs.load(AtomicOrdering::Relaxed),
422 cas_success: self.cas_success.load(AtomicOrdering::Relaxed),
423 cas_conflict: self.cas_conflict.load(AtomicOrdering::Relaxed),
424 watch_streams_active: self.watch_streams_active.load(AtomicOrdering::Relaxed),
425 watch_events_emitted: self.watch_events_emitted.load(AtomicOrdering::Relaxed),
426 watch_drops: self.watch_drops.load(AtomicOrdering::Relaxed),
427 }
428 }
429
430 pub(crate) fn incr_puts(&self) {
431 self.puts.fetch_add(1, AtomicOrdering::Relaxed);
432 }
433
434 pub(crate) fn incr_gets(&self) {
435 self.gets.fetch_add(1, AtomicOrdering::Relaxed);
436 }
437
438 pub(crate) fn incr_deletes(&self) {
439 self.deletes.fetch_add(1, AtomicOrdering::Relaxed);
440 }
441
442 pub(crate) fn incr_incrs(&self) {
443 self.incrs.fetch_add(1, AtomicOrdering::Relaxed);
444 }
445
446 pub(crate) fn incr_cas_success(&self) {
447 self.cas_success.fetch_add(1, AtomicOrdering::Relaxed);
448 }
449
450 pub(crate) fn incr_cas_conflict(&self) {
451 self.cas_conflict.fetch_add(1, AtomicOrdering::Relaxed);
452 }
453
454 pub(crate) fn incr_watch_streams_active(&self) {
455 self.watch_streams_active
456 .fetch_add(1, AtomicOrdering::Relaxed);
457 }
458
459 pub(crate) fn decr_watch_streams_active(&self) {
460 self.watch_streams_active
461 .fetch_sub(1, AtomicOrdering::Relaxed);
462 }
463
464 pub(crate) fn incr_watch_events_emitted(&self) {
465 self.watch_events_emitted
466 .fetch_add(1, AtomicOrdering::Relaxed);
467 }
468
469 pub(crate) fn add_watch_drops(&self, count: u64) {
470 self.watch_drops.fetch_add(count, AtomicOrdering::Relaxed);
471 }
472}
473
474#[derive(Debug, Clone)]
475pub struct RuntimeQueryResult {
476 pub query: String,
477 pub mode: QueryMode,
478 pub statement: &'static str,
479 pub engine: &'static str,
480 pub result: UnifiedResult,
481 pub affected_rows: u64,
482 pub statement_type: &'static str,
484 pub bookmark: Option<String>,
485}
486
487impl RuntimeQueryResult {
488 pub fn dml_result(
490 query: String,
491 affected: u64,
492 statement_type: &'static str,
493 engine: &'static str,
494 ) -> Self {
495 Self {
496 query,
497 mode: QueryMode::Sql,
498 statement: statement_type,
499 engine,
500 result: UnifiedResult::empty(),
501 affected_rows: affected,
502 statement_type,
503 bookmark: None,
504 }
505 }
506
507 pub fn ok_message(query: String, message: &str, statement_type: &'static str) -> Self {
509 let mut result = UnifiedResult::empty();
510 let mut record = UnifiedRecord::new();
511 record.set("message", Value::text(message.to_string()));
512 result.push(record);
513 result.columns = vec!["message".to_string()];
514
515 Self {
516 query,
517 mode: QueryMode::Sql,
518 statement: statement_type,
519 engine: "runtime-ddl",
520 result,
521 affected_rows: 0,
522 statement_type,
523 bookmark: None,
524 }
525 }
526
527 pub fn ok_records(
531 query: String,
532 columns: Vec<String>,
533 rows: Vec<Vec<(String, Value)>>,
534 statement_type: &'static str,
535 ) -> Self {
536 let mut result = UnifiedResult::empty();
537 for row in rows {
538 let mut record = UnifiedRecord::new();
539 for (k, v) in row {
540 record.set(&k, v);
541 }
542 result.push(record);
543 }
544 result.columns = columns;
545
546 Self {
547 query,
548 mode: QueryMode::Sql,
549 statement: statement_type,
550 engine: "runtime-meta",
551 result,
552 affected_rows: 0,
553 statement_type,
554 bookmark: None,
555 }
556 }
557
558 pub fn bookmark_token(&self) -> Option<&str> {
559 self.bookmark.as_deref()
560 }
561}
562
563#[derive(Debug, Clone)]
564pub struct RuntimeQueryExplain {
565 pub query: String,
566 pub mode: QueryMode,
567 pub statement: &'static str,
568 pub is_universal: bool,
569 pub plan_cost: crate::storage::query::planner::PlanCost,
570 pub estimated_rows: f64,
571 pub estimated_selectivity: f64,
572 pub estimated_confidence: f64,
573 pub passes_applied: Vec<String>,
574 pub logical_plan: CanonicalLogicalPlan,
575 pub cte_materializations: Vec<String>,
582}
583
584#[derive(Debug, Clone)]
585pub struct RuntimeIvfMatch {
586 pub entity_id: u64,
587 pub distance: f32,
588 pub entity: Option<UnifiedEntity>,
589}
590
591#[derive(Debug, Clone)]
592pub struct RuntimeIvfSearchResult {
593 pub collection: String,
594 pub k: usize,
595 pub n_lists: usize,
596 pub n_probes: usize,
597 pub stats: IvfStats,
598 pub matches: Vec<RuntimeIvfMatch>,
599}
600
601#[derive(Debug, Clone, Copy, PartialEq, Eq)]
602pub enum RuntimeGraphDirection {
603 Outgoing,
604 Incoming,
605 Both,
606}
607
608#[derive(Debug, Clone, Copy, PartialEq, Eq)]
609pub enum RuntimeGraphTraversalStrategy {
610 Bfs,
611 Dfs,
612}
613
614#[derive(Debug, Clone, Copy, PartialEq, Eq)]
615pub enum RuntimeGraphPathAlgorithm {
616 Bfs,
617 Dijkstra,
618 AStar,
619 BellmanFord,
620}
621
622#[derive(Debug, Clone)]
623pub struct RuntimeGraphNode {
624 pub id: String,
625 pub label: String,
626 pub node_type: String,
627 pub out_edge_count: u32,
628 pub in_edge_count: u32,
629}
630
631#[derive(Debug, Clone)]
632pub struct RuntimeGraphEdge {
633 pub source: String,
634 pub target: String,
635 pub edge_type: String,
636 pub weight: f32,
637}
638
639#[derive(Debug, Clone)]
640pub struct RuntimeGraphVisit {
641 pub depth: usize,
642 pub node: RuntimeGraphNode,
643}
644
645#[derive(Debug, Clone)]
646pub struct RuntimeGraphNeighborhoodResult {
647 pub source: String,
648 pub direction: RuntimeGraphDirection,
649 pub max_depth: usize,
650 pub nodes: Vec<RuntimeGraphVisit>,
651 pub edges: Vec<RuntimeGraphEdge>,
652}
653
654#[derive(Debug, Clone)]
655pub struct RuntimeGraphTraversalResult {
656 pub source: String,
657 pub direction: RuntimeGraphDirection,
658 pub strategy: RuntimeGraphTraversalStrategy,
659 pub max_depth: usize,
660 pub visits: Vec<RuntimeGraphVisit>,
661 pub edges: Vec<RuntimeGraphEdge>,
662}
663
664#[derive(Debug, Clone)]
665pub struct RuntimeGraphPath {
666 pub hop_count: usize,
667 pub total_weight: f64,
668 pub nodes: Vec<RuntimeGraphNode>,
669 pub edges: Vec<RuntimeGraphEdge>,
670}
671
672#[derive(Debug, Clone)]
673pub struct RuntimeGraphPathResult {
674 pub source: String,
675 pub target: String,
676 pub direction: RuntimeGraphDirection,
677 pub algorithm: RuntimeGraphPathAlgorithm,
678 pub nodes_visited: usize,
679 pub negative_cycle_detected: Option<bool>,
680 pub path: Option<RuntimeGraphPath>,
681}
682
683#[derive(Debug, Clone, Copy, PartialEq, Eq)]
684pub enum RuntimeGraphComponentsMode {
685 Connected,
686 Weak,
687 Strong,
688}
689
690#[derive(Debug, Clone, Copy, PartialEq, Eq)]
691pub enum RuntimeGraphCentralityAlgorithm {
692 Degree,
693 Closeness,
694 Betweenness,
695 Eigenvector,
696 PageRank,
697}
698
699#[derive(Debug, Clone, Copy, PartialEq, Eq)]
700pub enum RuntimeGraphCommunityAlgorithm {
701 LabelPropagation,
702 Louvain,
703}
704
705#[derive(Debug, Clone)]
706pub struct RuntimeGraphComponent {
707 pub id: String,
708 pub size: usize,
709 pub nodes: Vec<String>,
710}
711
712#[derive(Debug, Clone)]
713pub struct RuntimeGraphComponentsResult {
714 pub mode: RuntimeGraphComponentsMode,
715 pub count: usize,
716 pub components: Vec<RuntimeGraphComponent>,
717}
718
719#[derive(Debug, Clone)]
720pub struct RuntimeGraphCentralityScore {
721 pub node: RuntimeGraphNode,
722 pub score: f64,
723}
724
725#[derive(Debug, Clone)]
726pub struct RuntimeGraphDegreeScore {
727 pub node: RuntimeGraphNode,
728 pub in_degree: usize,
729 pub out_degree: usize,
730 pub total_degree: usize,
731}
732
733#[derive(Debug, Clone)]
734pub struct RuntimeGraphCentralityResult {
735 pub algorithm: RuntimeGraphCentralityAlgorithm,
736 pub normalized: Option<bool>,
737 pub iterations: Option<usize>,
738 pub converged: Option<bool>,
739 pub scores: Vec<RuntimeGraphCentralityScore>,
740 pub degree_scores: Vec<RuntimeGraphDegreeScore>,
741}
742
743#[derive(Debug, Clone)]
744pub struct RuntimeGraphCommunity {
745 pub id: String,
746 pub size: usize,
747 pub nodes: Vec<String>,
748}
749
750#[derive(Debug, Clone)]
751pub struct RuntimeGraphCommunityResult {
752 pub algorithm: RuntimeGraphCommunityAlgorithm,
753 pub count: usize,
754 pub iterations: Option<usize>,
755 pub converged: Option<bool>,
756 pub modularity: Option<f64>,
757 pub passes: Option<usize>,
758 pub communities: Vec<RuntimeGraphCommunity>,
759}
760
761#[derive(Debug, Clone)]
762pub struct RuntimeGraphClusteringResult {
763 pub global: f64,
764 pub local: Vec<RuntimeGraphCentralityScore>,
765 pub triangle_count: Option<usize>,
766}
767
768#[derive(Debug, Clone)]
769pub struct RuntimeGraphHitsResult {
770 pub iterations: usize,
771 pub converged: bool,
772 pub hubs: Vec<RuntimeGraphCentralityScore>,
773 pub authorities: Vec<RuntimeGraphCentralityScore>,
774}
775
776#[derive(Debug, Clone)]
777pub struct RuntimeGraphCyclesResult {
778 pub limit_reached: bool,
779 pub cycles: Vec<RuntimeGraphPath>,
780}
781
782#[derive(Debug, Clone)]
783pub struct RuntimeGraphTopologicalSortResult {
784 pub acyclic: bool,
785 pub ordered_nodes: Vec<RuntimeGraphNode>,
786}
787
788#[derive(Debug, Clone)]
789pub struct RuntimeGraphPropertiesResult {
790 pub node_count: usize,
791 pub edge_count: usize,
792 pub self_loop_count: usize,
793 pub negative_edge_count: usize,
794 pub connected_component_count: usize,
795 pub weak_component_count: usize,
796 pub strong_component_count: usize,
797 pub is_empty: bool,
798 pub is_connected: bool,
799 pub is_weakly_connected: bool,
800 pub is_strongly_connected: bool,
801 pub is_complete: bool,
802 pub is_complete_directed: bool,
803 pub is_cyclic: bool,
804 pub is_circular: bool,
805 pub is_acyclic: bool,
806 pub is_tree: bool,
807 pub density: f64,
808 pub density_directed: f64,
809}
810
811#[derive(Debug, Clone)]
816pub struct ContextSearchResult {
817 pub query: String,
818 pub tables: Vec<ContextEntity>,
819 pub graph: ContextGraphResult,
820 pub vectors: Vec<ContextEntity>,
821 pub documents: Vec<ContextEntity>,
822 pub key_values: Vec<ContextEntity>,
823 pub connections: Vec<ContextConnection>,
824 pub summary: ContextSummary,
825}
826
827#[derive(Debug, Clone)]
828pub struct ContextEntity {
829 pub entity: UnifiedEntity,
830 pub score: f32,
831 pub discovery: DiscoveryMethod,
832 pub collection: String,
833}
834
835#[derive(Debug, Clone)]
836pub enum DiscoveryMethod {
837 Indexed {
838 field: String,
839 },
840 GlobalScan,
841 CrossReference {
842 source_id: u64,
843 ref_type: String,
844 },
845 GraphTraversal {
846 source_id: u64,
847 edge_type: String,
848 depth: usize,
849 },
850 VectorQuery {
851 similarity: f32,
852 },
853}
854
855#[derive(Debug, Clone)]
856pub struct ContextGraphResult {
857 pub nodes: Vec<ContextEntity>,
858 pub edges: Vec<ContextEntity>,
859}
860
861#[derive(Debug, Clone)]
862pub struct ContextConnection {
863 pub from_id: u64,
864 pub to_id: u64,
865 pub connection_type: ContextConnectionType,
866 pub weight: f32,
867}
868
869#[derive(Debug, Clone)]
870pub enum ContextConnectionType {
871 CrossRef(String),
872 GraphEdge(String),
873 VectorSimilarity(f32),
874}
875
876#[derive(Debug, Clone)]
877pub struct ContextSummary {
878 pub total_entities: usize,
879 pub direct_matches: usize,
880 pub expanded_via_graph: usize,
881 pub expanded_via_cross_refs: usize,
882 pub expanded_via_vector_query: usize,
883 pub collections_searched: usize,
884 pub execution_time_us: u64,
885 pub tiers_used: Vec<String>,
886 pub entities_reindexed: usize,
887}
888
889struct PoolState {
890 next_id: u64,
891 active: usize,
892 idle: Vec<u64>,
893 total_checkouts: u64,
894}
895
896impl Default for PoolState {
897 fn default() -> Self {
898 Self {
899 next_id: 1,
900 active: 0,
901 idle: Vec::new(),
902 total_checkouts: 0,
903 }
904 }
905}
906
907#[derive(Debug, Clone)]
908struct RuntimeResultCacheEntry {
909 result: RuntimeQueryResult,
910 cached_at: std::time::Instant,
911 scopes: HashSet<String>,
912}
913
914pub const METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL: &str = "cache_shadow_divergence_total";
915pub const METRIC_RESULT_CACHE_HIT_TOTAL: &str = "result_cache_hit_total";
920pub const METRIC_RESULT_CACHE_MISS_TOTAL: &str = "result_cache_miss_total";
921pub const METRIC_RESULT_CACHE_EVICT_TOTAL: &str = "result_cache_evict_total";
922pub(crate) const ASK_ANSWER_CACHE_NAMESPACE: &str = "runtime.ask_answer_cache";
923const RMW_LOCK_SHARDS: usize = 64;
924
925struct RmwLockTable {
926 shards: Vec<parking_lot::Mutex<HashMap<String, Arc<parking_lot::Mutex<()>>>>>,
927}
928
929impl RmwLockTable {
930 fn new() -> Self {
931 let shards = (0..RMW_LOCK_SHARDS)
932 .map(|_| parking_lot::Mutex::new(HashMap::new()))
933 .collect();
934 Self { shards }
935 }
936
937 fn lock_for(&self, collection: &str, key: &str) -> Arc<parking_lot::Mutex<()>> {
938 use std::hash::{Hash, Hasher};
939
940 let mut hasher = std::collections::hash_map::DefaultHasher::new();
941 collection.hash(&mut hasher);
942 key.hash(&mut hasher);
943 let shard_idx = (hasher.finish() as usize) % self.shards.len();
944 let map_key = format!("{collection}\u{1f}{key}");
945 let mut shard = self.shards[shard_idx].lock();
946 shard
947 .entry(map_key)
948 .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
949 .clone()
950 }
951}
952
953struct RuntimeInner {
954 db: Arc<RedDB>,
955 layout: PhysicalLayout,
956 indices: IndexCatalog,
957 pool_config: ConnectionPoolConfig,
958 pool: Mutex<PoolState>,
959 started_at_unix_ms: u128,
960 probabilistic: probabilistic_store::ProbabilisticStore,
961 index_store: index_store::IndexStore,
962 cdc: crate::replication::cdc::CdcBuffer,
963 backup_scheduler: crate::replication::scheduler::BackupScheduler,
964 query_cache: parking_lot::RwLock<crate::storage::query::planner::cache::PlanCache>,
965 result_cache: parking_lot::RwLock<(
966 HashMap<String, RuntimeResultCacheEntry>,
967 std::collections::VecDeque<String>,
968 )>,
969 result_blob_cache: crate::storage::cache::BlobCache,
970 result_blob_entries: parking_lot::RwLock<(
971 HashMap<String, RuntimeResultCacheEntry>,
972 std::collections::VecDeque<String>,
973 )>,
974 ask_answer_cache_entries:
975 parking_lot::RwLock<(HashSet<String>, std::collections::VecDeque<String>)>,
976 result_cache_shadow_divergences: std::sync::atomic::AtomicU64,
977 result_cache_hits: std::sync::atomic::AtomicU64,
980 result_cache_misses: std::sync::atomic::AtomicU64,
981 result_cache_evictions: std::sync::atomic::AtomicU64,
982 ask_daily_spend:
983 parking_lot::RwLock<HashMap<String, crate::runtime::ai::cost_guard::DailyState>>,
984 queue_message_locks: parking_lot::RwLock<HashMap<String, Arc<parking_lot::Mutex<()>>>>,
987 rmw_locks: RmwLockTable,
991 planner_dirty_tables: parking_lot::RwLock<HashSet<String>>,
992 ec_registry: Arc<crate::ec::config::EcRegistry>,
993 config_registry: Arc<crate::auth::registry::ConfigRegistry>,
994 ec_worker: crate::ec::worker::EcWorker,
995 auth_store: parking_lot::RwLock<Option<Arc<crate::auth::store::AuthStore>>>,
1000 oauth_validator: parking_lot::RwLock<Option<Arc<crate::auth::oauth::OAuthValidator>>>,
1007 views: parking_lot::RwLock<HashMap<String, Arc<crate::storage::query::ast::CreateViewQuery>>>,
1018 materialized_views: parking_lot::RwLock<crate::storage::cache::result::MaterializedViewCache>,
1019 pub(crate) retention_sweeper:
1024 parking_lot::RwLock<crate::runtime::retention_sweeper::RetentionSweeperState>,
1025 snapshot_manager: Arc<crate::storage::transaction::snapshot::SnapshotManager>,
1033 tx_contexts:
1040 parking_lot::RwLock<HashMap<u64, crate::storage::transaction::snapshot::TxnContext>>,
1041 lock_manager: Arc<crate::storage::transaction::lock::LockManager>,
1048 env_config_overrides: HashMap<String, String>,
1054 tx_local_tenants: parking_lot::RwLock<HashMap<u64, Option<String>>>,
1062 rls_policies: parking_lot::RwLock<
1069 HashMap<(String, String), Arc<crate::storage::query::ast::CreatePolicyQuery>>,
1070 >,
1071 rls_enabled_tables: parking_lot::RwLock<HashSet<String>>,
1072 foreign_tables: Arc<crate::storage::fdw::ForeignTableRegistry>,
1080 pending_tombstones: parking_lot::RwLock<
1092 HashMap<
1093 u64,
1094 Vec<(
1095 String,
1096 crate::storage::unified::entity::EntityId,
1097 crate::storage::transaction::snapshot::Xid,
1098 crate::storage::transaction::snapshot::Xid,
1099 )>,
1100 >,
1101 >,
1102 pending_versioned_updates: parking_lot::RwLock<
1108 HashMap<
1109 u64,
1110 Vec<(
1111 String,
1112 crate::storage::unified::entity::EntityId,
1113 crate::storage::unified::entity::EntityId,
1114 crate::storage::transaction::snapshot::Xid,
1115 crate::storage::transaction::snapshot::Xid,
1116 )>,
1117 >,
1118 >,
1119 pending_kv_watch_events:
1120 parking_lot::RwLock<HashMap<u64, Vec<crate::replication::cdc::KvWatchEvent>>>,
1121 pending_store_wal_actions:
1122 parking_lot::RwLock<HashMap<u64, crate::storage::unified::DeferredStoreWalActions>>,
1123 tenant_tables: parking_lot::RwLock<HashMap<String, String>>,
1132 ddl_epoch: std::sync::atomic::AtomicU64,
1138 write_gate: Arc<crate::runtime::write_gate::WriteGate>,
1149 lifecycle: crate::runtime::lifecycle::Lifecycle,
1155 resource_limits: crate::runtime::resource_limits::ResourceLimits,
1160 audit_log: Arc<crate::runtime::audit_log::AuditLogger>,
1164 control_event_ledger:
1168 parking_lot::RwLock<Arc<dyn crate::runtime::control_events::ControlEventLedger>>,
1169 control_event_config: crate::runtime::control_events::ControlEventConfig,
1170 query_audit: Arc<crate::runtime::query_audit::QueryAuditStream>,
1174 lease_lifecycle: std::sync::OnceLock<Arc<crate::runtime::lease_lifecycle::LeaseLifecycle>>,
1181 replica_apply_metrics: Arc<crate::replication::logical::ReplicaApplyMetrics>,
1185 quota_bucket: crate::runtime::quota_bucket::QuotaBucket,
1188 schema_vocabulary: parking_lot::RwLock<crate::runtime::schema_vocabulary::SchemaVocabulary>,
1193 slow_query_logger: Arc<crate::telemetry::slow_query_logger::SlowQueryLogger>,
1199 kv_stats: KvStatsCounters,
1202 metrics_ingest_stats: MetricsIngestCounters,
1203 metrics_tenant_activity_stats: MetricsTenantActivityCounters,
1204 queue_telemetry: std::sync::Arc<queue_telemetry::QueueTelemetryCounters>,
1208 queue_presence: std::sync::Arc<crate::storage::queue::presence::ConsumerPresenceRegistry>,
1215 vector_introspection:
1224 std::sync::Arc<crate::storage::vector::introspection::VectorIntrospectionRegistry>,
1225 queue_wait_registry: std::sync::Arc<queue_wait_registry::QueueWaitRegistry>,
1229 pending_queue_wakes: parking_lot::RwLock<HashMap<u64, Vec<(String, String)>>>,
1235 kv_tag_index: KvTagIndex,
1237 chain_tip_cache:
1243 parking_lot::Mutex<HashMap<String, crate::runtime::blockchain_kind::ChainTipFull>>,
1244 chain_integrity_broken: parking_lot::Mutex<HashMap<String, bool>>,
1249 integrity_tombstones:
1256 parking_lot::Mutex<Vec<crate::runtime::integrity_tombstone::TombstoneRange>>,
1257 integrity_tombstones_state: std::sync::atomic::AtomicU8,
1258}
1259
1260#[derive(Clone)]
1261pub struct RedDBRuntime {
1262 inner: Arc<RuntimeInner>,
1263}
1264
1265pub struct RuntimeConnection {
1266 id: u64,
1267 inner: Arc<RuntimeInner>,
1268}
1269
1270pub struct CausalSession {
1271 runtime: RedDBRuntime,
1272 bookmark: Option<crate::replication::CausalBookmark>,
1273 wait_timeout: std::time::Duration,
1274}
1275
1276impl CausalSession {
1277 pub fn bookmark_token(&self) -> Option<String> {
1278 self.bookmark.map(|bookmark| bookmark.encode())
1279 }
1280
1281 pub fn inject_bookmark(&mut self, token: &str) -> RedDBResult<()> {
1282 let bookmark = crate::replication::CausalBookmark::decode(token)
1283 .map_err(|err| RedDBError::InvalidOperation(err.to_string()))?;
1284 self.bookmark = Some(bookmark);
1285 Ok(())
1286 }
1287
1288 pub fn execute_query(&mut self, query: &str) -> RedDBResult<RuntimeQueryResult> {
1289 if is_select_query_text(query) {
1290 if let Some(bookmark) = self.bookmark {
1291 self.runtime
1292 .wait_for_bookmark(&bookmark, self.wait_timeout)?;
1293 }
1294 }
1295 let result = self.runtime.execute_query(query)?;
1296 if let Some(token) = result.bookmark.as_deref() {
1297 self.inject_bookmark(token)?;
1298 }
1299 Ok(result)
1300 }
1301}
1302
1303fn is_select_query_text(query: &str) -> bool {
1304 query
1305 .trim_start()
1306 .get(..6)
1307 .is_some_and(|prefix| prefix.eq_ignore_ascii_case("select"))
1308}
1309
1310pub mod ai;
1311pub mod analytics_schema_registry;
1312pub(crate) mod analytics_source_catalog;
1313pub mod ask_pipeline;
1314pub mod audit_log;
1315pub mod audit_query;
1316pub mod authorized_search;
1317pub mod batch_insert;
1318pub mod blockchain_kind;
1319mod collection_contract;
1320pub mod config_matrix;
1321pub mod config_overlay;
1322pub mod config_watcher;
1323pub mod continuous_materialized_view;
1324pub mod control_events;
1325pub(crate) mod ddl;
1326pub mod disk_space_monitor;
1327mod dml_target_scan;
1328pub mod evidence_export;
1329mod expr_eval;
1330mod graph_dsl;
1331mod health_connection;
1332mod impl_config;
1333pub(crate) mod impl_core;
1334mod impl_ddl;
1335mod impl_dml;
1336mod impl_ec;
1337mod impl_events;
1338mod impl_graph;
1339mod impl_graph_commands;
1340pub mod impl_kv;
1341mod impl_migrations;
1342mod impl_native;
1343mod impl_physical;
1344mod impl_probabilistic;
1345pub mod impl_queue;
1346mod impl_search;
1347mod impl_timeseries;
1348mod impl_tree;
1349mod impl_vcs;
1350mod index_store;
1351pub mod integrity_tombstone;
1352mod join_filter;
1353mod keyed_spine;
1354pub mod kv_watch;
1355pub mod lease_lifecycle;
1356pub mod lease_loop;
1357pub mod lease_timer_wheel;
1358pub mod lifecycle;
1359pub mod locking;
1360pub(crate) mod materialization_limit;
1361pub(crate) mod metric_descriptor_catalog;
1362pub(crate) mod mutation;
1363pub(crate) mod primary_queue_store;
1364mod probabilistic_store;
1365pub mod query_audit;
1366pub(crate) mod query_exec;
1367mod queue_delivery;
1368pub(crate) mod queue_lifecycle;
1369pub(crate) mod queue_telemetry;
1370pub(crate) mod queue_wait_registry;
1371pub mod quota_bucket;
1372mod record_search;
1373mod red_schema;
1374pub(crate) mod replica_queue_store;
1375pub mod resource_limits;
1376pub(crate) mod retention_filter;
1377pub(crate) mod retention_sweeper;
1378pub(crate) mod scalar_evaluator;
1379pub mod schema_diff;
1380pub mod schema_vocabulary;
1381pub(crate) mod sessionize;
1382pub mod signed_chain;
1383pub mod signed_writes_kind;
1384pub(crate) mod slo_descriptor_catalog;
1385pub mod snapshot_reuse;
1386mod statement_frame;
1387mod table_row_mvcc_resolver;
1388pub mod turbo_crash_inject;
1389mod vector_index;
1390pub mod vector_turbo_kind;
1391pub(crate) mod window_phase;
1392pub mod within_clause;
1393pub mod write_gate;
1394
1395pub use self::graph_dsl::*;
1396use self::join_filter::*;
1397use self::query_exec::*;
1398use self::record_search::*;
1399pub use self::statement_frame::EffectiveScope;
1400
1401pub mod mvcc {
1407 pub use super::impl_core::{
1408 capture_current_snapshot, clear_current_auth_identity, clear_current_connection_id,
1409 clear_current_snapshot, clear_current_tenant, current_connection_id, current_tenant,
1410 entity_visible_under_current_snapshot, entity_visible_with_context,
1411 set_current_auth_identity, set_current_connection_id, set_current_snapshot,
1412 set_current_tenant, snapshot_bundle, with_snapshot_bundle, SnapshotBundle, SnapshotContext,
1413 };
1414}
1415
1416pub mod record_search_helpers {
1418 use crate::storage::query::UnifiedRecord;
1419 use crate::storage::UnifiedEntity;
1420 use std::collections::BTreeSet;
1421
1422 pub fn entity_type_and_capabilities(
1423 entity: &UnifiedEntity,
1424 ) -> (&'static str, BTreeSet<String>) {
1425 super::record_search::runtime_entity_type_and_capabilities(entity)
1426 }
1427
1428 pub fn any_record_from_entity(entity: UnifiedEntity) -> Option<UnifiedRecord> {
1433 super::record_search::runtime_any_record_from_entity(entity)
1434 }
1435}