1use super::*;
8use crate::storage::query::ast::GraphCommandOrderBy;
9use std::cmp::Ordering;
10
11impl RedDBRuntime {
12 pub fn execute_graph_command(
14 &self,
15 raw_query: &str,
16 cmd: &GraphCommand,
17 ) -> RedDBResult<RuntimeQueryResult> {
18 match cmd {
19 GraphCommand::Neighborhood {
20 source,
21 depth,
22 direction,
23 edge_labels,
24 } => {
25 let dir = parse_direction(direction)?;
26 let res = self.graph_neighborhood(
27 source,
28 dir,
29 *depth as usize,
30 edge_labels.clone(),
31 None,
32 )?;
33 let mut result = UnifiedResult::with_columns(vec![
34 "node_id".into(),
35 "label".into(),
36 "node_type".into(),
37 "depth".into(),
38 ]);
39 for visit in &res.nodes {
40 let mut record = UnifiedRecord::new();
41 record.set("node_id", Value::text(visit.node.id.clone()));
42 record.set("label", Value::text(visit.node.label.clone()));
43 record.set("node_type", Value::text(visit.node.node_type.clone()));
44 record.set("depth", Value::Integer(visit.depth as i64));
45 result.push(record);
46 }
47 Ok(RuntimeQueryResult {
48 query: raw_query.to_string(),
49 mode: QueryMode::Sql,
50 statement: "graph_neighborhood",
51 engine: "runtime-graph",
52 result,
53 affected_rows: 0,
54 statement_type: "select",
55 bookmark: None,
56 })
57 }
58 GraphCommand::ShortestPath {
59 source,
60 target,
61 algorithm,
62 direction,
63 limit,
64 order_by,
65 } => {
66 let dir = parse_direction(direction)?;
67 let alg = parse_path_algorithm(algorithm)?;
68 let res = self.graph_shortest_path(source, target, dir, alg, None, None)?;
69 let mut result = UnifiedResult::with_columns(vec![
70 "source".into(),
71 "target".into(),
72 "nodes_visited".into(),
73 "negative_cycle_detected".into(),
74 "path_found".into(),
75 "hop_count".into(),
76 "total_weight".into(),
77 ]);
78 let mut record = UnifiedRecord::new();
79 record.set("source", Value::text(res.source));
80 record.set("target", Value::text(res.target));
81 record.set("nodes_visited", Value::Integer(res.nodes_visited as i64));
82 record.set(
83 "negative_cycle_detected",
84 match res.negative_cycle_detected {
85 Some(value) => Value::Boolean(value),
86 None => Value::Null,
87 },
88 );
89 if let Some(ref path) = res.path {
90 record.set("path_found", Value::Boolean(true));
91 record.set("hop_count", Value::Integer(path.hop_count as i64));
92 record.set("total_weight", Value::Float(path.total_weight));
93 } else {
94 record.set("path_found", Value::Boolean(false));
95 record.set("hop_count", Value::Null);
96 record.set("total_weight", Value::Null);
97 }
98 result.push(record);
99 apply_graph_order_and_limit(
100 &mut result,
101 "graph_shortest_path",
102 order_by.as_ref(),
103 limit.map(|n| n as usize),
104 )?;
105 Ok(RuntimeQueryResult {
106 query: raw_query.to_string(),
107 mode: QueryMode::Sql,
108 statement: "graph_shortest_path",
109 engine: "runtime-graph",
110 result,
111 affected_rows: 0,
112 statement_type: "select",
113 bookmark: None,
114 })
115 }
116 GraphCommand::Properties { source } => {
117 if let Some(node_ref) = source {
118 let graph =
122 materialize_graph_with_projection(self.inner.db.store().as_ref(), None)?;
123 let resolved = resolve_graph_node_id(&graph, node_ref)?;
124 let stored = graph
125 .get_node(&resolved)
126 .ok_or_else(|| RedDBError::NotFound(node_ref.to_string()))?;
127 let node_type = self
128 .inner
129 .db
130 .store()
131 .query_all(|entity| {
132 entity.id.raw().to_string() == resolved
133 && matches!(
134 entity.kind,
135 crate::storage::unified::EntityKind::GraphNode(_)
136 )
137 })
138 .into_iter()
139 .find_map(|(_, entity)| match entity.kind {
140 crate::storage::unified::EntityKind::GraphNode(node) => {
141 Some(node.node_type)
142 }
143 _ => None,
144 })
145 .unwrap_or_else(|| stored.node_type.clone());
146 let all_props =
147 materialize_graph_node_properties(self.inner.db.store().as_ref())?;
148 let props = all_props.get(&resolved).cloned().unwrap_or_default();
149
150 let mut prop_keys: Vec<&String> = props.keys().collect();
153 prop_keys.sort();
154 let mut columns: Vec<String> = Vec::with_capacity(3 + prop_keys.len());
155 columns.push("node_id".into());
156 columns.push("label".into());
157 columns.push("node_type".into());
158 for k in &prop_keys {
159 columns.push((*k).clone());
160 }
161 let mut result = UnifiedResult::with_columns(columns);
162 let mut record = UnifiedRecord::new();
163 record.set("node_id", Value::text(stored.id.clone()));
164 record.set("label", Value::text(stored.label.clone()));
165 record.set("node_type", Value::text(node_type));
166 for k in &prop_keys {
167 if let Some(v) = props.get(*k) {
168 record.set(k.as_str(), v.clone());
169 }
170 }
171 result.push(record);
172 return Ok(RuntimeQueryResult {
173 query: raw_query.to_string(),
174 mode: QueryMode::Sql,
175 statement: "graph_properties",
176 engine: "runtime-graph",
177 result,
178 affected_rows: 0,
179 statement_type: "select",
180 bookmark: None,
181 });
182 }
183 let res = self.graph_properties(None)?;
184 let mut result = UnifiedResult::with_columns(vec![
185 "node_count".into(),
186 "edge_count".into(),
187 "is_connected".into(),
188 "is_complete".into(),
189 "is_cyclic".into(),
190 "density".into(),
191 ]);
192 let mut record = UnifiedRecord::new();
193 record.set("node_count", Value::Integer(res.node_count as i64));
194 record.set("edge_count", Value::Integer(res.edge_count as i64));
195 record.set("is_connected", Value::Boolean(res.is_connected));
196 record.set("is_complete", Value::Boolean(res.is_complete));
197 record.set("is_cyclic", Value::Boolean(res.is_cyclic));
198 record.set("density", Value::Float(res.density));
199 result.push(record);
200 Ok(RuntimeQueryResult {
201 query: raw_query.to_string(),
202 mode: QueryMode::Sql,
203 statement: "graph_properties",
204 engine: "runtime-graph",
205 result,
206 affected_rows: 0,
207 statement_type: "select",
208 bookmark: None,
209 })
210 }
211 GraphCommand::Traverse {
212 source,
213 strategy,
214 depth,
215 direction,
216 edge_labels,
217 } => {
218 let dir = parse_direction(direction)?;
219 let strat = parse_traversal_strategy(strategy)?;
220 let res = self.graph_traverse(
221 source,
222 dir,
223 *depth as usize,
224 strat,
225 edge_labels.clone(),
226 None,
227 )?;
228 let mut result = UnifiedResult::with_columns(vec![
229 "node_id".into(),
230 "label".into(),
231 "node_type".into(),
232 "depth".into(),
233 ]);
234 for visit in &res.visits {
235 let mut record = UnifiedRecord::new();
236 record.set("node_id", Value::text(visit.node.id.clone()));
237 record.set("label", Value::text(visit.node.label.clone()));
238 record.set("node_type", Value::text(visit.node.node_type.clone()));
239 record.set("depth", Value::Integer(visit.depth as i64));
240 result.push(record);
241 }
242 Ok(RuntimeQueryResult {
243 query: raw_query.to_string(),
244 mode: QueryMode::Sql,
245 statement: "graph_traverse",
246 engine: "runtime-graph",
247 result,
248 affected_rows: 0,
249 statement_type: "select",
250 bookmark: None,
251 })
252 }
253 GraphCommand::Centrality {
254 algorithm,
255 limit,
256 order_by,
257 } => {
258 let alg = parse_centrality_algorithm(algorithm)?;
259 let limit_usize = limit.map(|n| n as usize);
262 let order_needs_full_set = order_by
263 .as_ref()
264 .map(|order| order.ascending)
265 .unwrap_or(false);
266 let top_k = if order_needs_full_set {
267 usize::MAX
268 } else {
269 limit_usize.unwrap_or(100).max(1)
270 };
271 let res = self.graph_centrality(alg, top_k, false, None, None, None, None)?;
272 let mut result = UnifiedResult::with_columns(vec![
273 "node_id".into(),
274 "label".into(),
275 "score".into(),
276 ]);
277 for score in &res.scores {
278 let mut record = UnifiedRecord::new();
279 record.set("node_id", Value::text(score.node.id.clone()));
280 record.set("label", Value::text(score.node.label.clone()));
281 record.set("score", Value::Float(score.score));
282 result.push(record);
283 }
284 for ds in &res.degree_scores {
285 let mut record = UnifiedRecord::new();
286 record.set("node_id", Value::text(ds.node.id.clone()));
287 record.set("label", Value::text(ds.node.label.clone()));
288 record.set("score", Value::Float(ds.total_degree as f64));
289 result.push(record);
290 }
291 apply_graph_order_and_limit(
292 &mut result,
293 "graph_centrality",
294 order_by.as_ref(),
295 Some(limit_usize.unwrap_or(100)),
296 )?;
297 Ok(RuntimeQueryResult {
298 query: raw_query.to_string(),
299 mode: QueryMode::Sql,
300 statement: "graph_centrality",
301 engine: "runtime-graph",
302 result,
303 affected_rows: 0,
304 statement_type: "select",
305 bookmark: None,
306 })
307 }
308 GraphCommand::Community {
309 algorithm,
310 max_iterations,
311 limit,
312 order_by,
313 return_assignments,
314 } => {
315 let alg = parse_community_algorithm(algorithm)?;
316 let res =
317 self.graph_communities(alg, 1, Some(*max_iterations as usize), None, None)?;
318 let result = if *return_assignments {
319 let mut result =
325 UnifiedResult::with_columns(vec!["node_id".into(), "community_id".into()]);
326 let row_cap = limit.map(|n| n as usize);
327 'outer: for community in &res.communities {
328 let mut nodes = community.nodes.clone();
329 nodes.sort();
330 for node_id in nodes {
331 if row_cap.is_some_and(|cap| result.records.len() >= cap) {
332 break 'outer;
333 }
334 let mut record = UnifiedRecord::new();
335 record.set("node_id", Value::text(node_id));
336 record.set("community_id", Value::text(community.id.clone()));
337 result.push(record);
338 }
339 }
340 result
341 } else {
342 let mut result =
343 UnifiedResult::with_columns(vec!["community_id".into(), "size".into()]);
344 for community in &res.communities {
345 let mut record = UnifiedRecord::new();
346 record.set("community_id", Value::text(community.id.clone()));
347 record.set("size", Value::Integer(community.size as i64));
348 result.push(record);
349 }
350 apply_graph_order_and_limit(
351 &mut result,
352 "graph_community",
353 order_by.as_ref(),
354 limit.map(|n| n as usize),
355 )?;
356 result
357 };
358 Ok(RuntimeQueryResult {
359 query: raw_query.to_string(),
360 mode: QueryMode::Sql,
361 statement: "graph_community",
362 engine: "runtime-graph",
363 result,
364 affected_rows: 0,
365 statement_type: "select",
366 bookmark: None,
367 })
368 }
369 GraphCommand::Components {
370 mode,
371 limit,
372 order_by,
373 } => {
374 let m = parse_components_mode(mode)?;
375 let res = self.graph_components(m, 1, None)?;
376 let mut result =
377 UnifiedResult::with_columns(vec!["component_id".into(), "size".into()]);
378 for component in &res.components {
379 let mut record = UnifiedRecord::new();
380 record.set("component_id", Value::text(component.id.clone()));
381 record.set("size", Value::Integer(component.size as i64));
382 result.push(record);
383 }
384 apply_graph_order_and_limit(
385 &mut result,
386 "graph_components",
387 order_by.as_ref(),
388 limit.map(|n| n as usize),
389 )?;
390 Ok(RuntimeQueryResult {
391 query: raw_query.to_string(),
392 mode: QueryMode::Sql,
393 statement: "graph_components",
394 engine: "runtime-graph",
395 result,
396 affected_rows: 0,
397 statement_type: "select",
398 bookmark: None,
399 })
400 }
401 GraphCommand::Cycles { max_length } => {
402 let res = self.graph_cycles(*max_length as usize, 100, None)?;
403 let mut result =
404 UnifiedResult::with_columns(vec!["cycle_index".into(), "length".into()]);
405 for (i, cycle) in res.cycles.iter().enumerate() {
406 let mut record = UnifiedRecord::new();
407 record.set("cycle_index", Value::Integer(i as i64));
408 record.set("length", Value::Integer(cycle.nodes.len() as i64));
409 result.push(record);
410 }
411 Ok(RuntimeQueryResult {
412 query: raw_query.to_string(),
413 mode: QueryMode::Sql,
414 statement: "graph_cycles",
415 engine: "runtime-graph",
416 result,
417 affected_rows: 0,
418 statement_type: "select",
419 bookmark: None,
420 })
421 }
422 GraphCommand::Clustering => {
423 let res = self.graph_clustering(100, true, None)?;
424 let mut result = UnifiedResult::with_columns(vec![
425 "node_id".into(),
426 "label".into(),
427 "score".into(),
428 ]);
429 let mut global_record = UnifiedRecord::new();
431 global_record.set("node_id", Value::text("__global__"));
432 global_record.set("label", Value::text("global_clustering"));
433 global_record.set("score", Value::Float(res.global));
434 result.push(global_record);
435 for score in &res.local {
436 let mut record = UnifiedRecord::new();
437 record.set("node_id", Value::text(score.node.id.clone()));
438 record.set("label", Value::text(score.node.label.clone()));
439 record.set("score", Value::Float(score.score));
440 result.push(record);
441 }
442 Ok(RuntimeQueryResult {
443 query: raw_query.to_string(),
444 mode: QueryMode::Sql,
445 statement: "graph_clustering",
446 engine: "runtime-graph",
447 result,
448 affected_rows: 0,
449 statement_type: "select",
450 bookmark: None,
451 })
452 }
453 GraphCommand::TopologicalSort => {
454 let res = self.graph_topological_sort(None)?;
455 let mut result = UnifiedResult::with_columns(vec![
456 "order".into(),
457 "node_id".into(),
458 "label".into(),
459 ]);
460 for (i, node) in res.ordered_nodes.iter().enumerate() {
461 let mut record = UnifiedRecord::new();
462 record.set("order", Value::Integer(i as i64));
463 record.set("node_id", Value::text(node.id.clone()));
464 record.set("label", Value::text(node.label.clone()));
465 result.push(record);
466 }
467 Ok(RuntimeQueryResult {
468 query: raw_query.to_string(),
469 mode: QueryMode::Sql,
470 statement: "graph_topological_sort",
471 engine: "runtime-graph",
472 result,
473 affected_rows: 0,
474 statement_type: "select",
475 bookmark: None,
476 })
477 }
478 }
479 }
480
481 pub fn execute_search_command(
483 &self,
484 raw_query: &str,
485 cmd: &SearchCommand,
486 ) -> RedDBResult<RuntimeQueryResult> {
487 match cmd {
488 SearchCommand::Similar {
489 vector,
490 text,
491 provider,
492 collection,
493 limit,
494 min_score,
495 vector_param,
496 limit_param,
497 min_score_param,
498 text_param,
499 } => {
500 if vector_param.is_some() {
501 return Err(RedDBError::Query(
502 "SEARCH SIMILAR $N vector parameter was not bound before execution"
503 .to_string(),
504 ));
505 }
506 if limit_param.is_some() {
507 return Err(RedDBError::Query(
508 "SEARCH SIMILAR LIMIT $N parameter was not bound before execution"
509 .to_string(),
510 ));
511 }
512 if min_score_param.is_some() {
513 return Err(RedDBError::Query(
514 "SEARCH SIMILAR MIN_SCORE $N parameter was not bound before execution"
515 .to_string(),
516 ));
517 }
518 if text_param.is_some() {
519 return Err(RedDBError::Query(
520 "SEARCH SIMILAR TEXT $N parameter was not bound before execution"
521 .to_string(),
522 ));
523 }
524 let search_vector = if let Some(query_text) = text {
526 let (default_provider, _) = crate::ai::resolve_defaults_from_runtime(self);
527 let provider = match provider.as_deref() {
528 Some(p) => crate::ai::parse_provider(p)?,
529 None => default_provider,
530 };
531 crate::runtime::ai::provider_gate::enforce(self, &provider)?;
535 let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
536 let model = std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
537 .ok()
538 .unwrap_or_else(|| provider.default_embedding_model().to_string());
539 let transport = crate::runtime::ai::transport::AiTransport::from_runtime(self);
540 let request = crate::ai::OpenAiEmbeddingRequest {
541 api_key,
542 model,
543 inputs: vec![query_text.clone()],
544 dimensions: None,
545 api_base: provider.resolve_api_base(),
546 };
547 let response = crate::runtime::ai::block_on_ai(async move {
548 crate::ai::openai_embeddings_async(&transport, request).await
549 })
550 .and_then(|result| result)?;
551 response.embeddings.into_iter().next().ok_or_else(|| {
552 RedDBError::Query("embedding API returned no vectors".to_string())
553 })?
554 } else {
555 vector.clone()
556 };
557 let scope = self.ai_scope();
561 let results =
562 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
563 crate::runtime::authorized_search::AuthorizedSearch::execute_similar(
564 self,
565 &scope,
566 collection,
567 &search_vector,
568 *limit,
569 *min_score,
570 )?
571 } else {
572 self.search_similar(collection, &search_vector, *limit, *min_score)?
574 };
575 let mut result =
576 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
577 for sr in &results {
578 let mut record = UnifiedRecord::new();
579 record.set("entity_id", Value::UnsignedInteger(sr.entity_id.raw()));
580 record.set("score", Value::Float(sr.score as f64));
581 result.push(record);
582 }
583 Ok(RuntimeQueryResult {
584 query: raw_query.to_string(),
585 mode: QueryMode::Sql,
586 statement: "search_similar",
587 engine: "runtime-search",
588 result,
589 affected_rows: 0,
590 statement_type: "select",
591 bookmark: None,
592 })
593 }
594 SearchCommand::Text {
595 query,
596 collection,
597 limit,
598 fuzzy,
599 limit_param,
600 } => {
601 if limit_param.is_some() {
602 return Err(RedDBError::Query(
603 "SEARCH TEXT LIMIT $N parameter was not bound before execution".to_string(),
604 ));
605 }
606 let collections = collection.as_ref().map(|c| vec![c.clone()]);
607 let scope = self.ai_scope();
609 let res =
610 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
611 crate::runtime::authorized_search::AuthorizedSearch::execute_text(
612 self,
613 &scope,
614 query.clone(),
615 collections,
616 None,
617 None,
618 None,
619 Some(*limit),
620 *fuzzy,
621 )?
622 } else {
623 self.search_text(
624 query.clone(),
625 collections,
626 None,
627 None,
628 None,
629 Some(*limit),
630 *fuzzy,
631 )?
632 };
633 let mut result =
634 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
635 for item in &res.matches {
636 let mut record = UnifiedRecord::new();
637 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
638 record.set("score", Value::Float(item.score as f64));
639 result.push(record);
640 }
641 Ok(RuntimeQueryResult {
642 query: raw_query.to_string(),
643 mode: QueryMode::Sql,
644 statement: "search_text",
645 engine: "runtime-search",
646 result,
647 affected_rows: 0,
648 statement_type: "select",
649 bookmark: None,
650 })
651 }
652 SearchCommand::Hybrid {
653 vector,
654 query,
655 collection,
656 limit,
657 limit_param,
658 } => {
659 if limit_param.is_some() {
660 return Err(RedDBError::Query(
661 "SEARCH HYBRID LIMIT $N parameter was not bound before execution"
662 .to_string(),
663 ));
664 }
665 let res = self.search_hybrid(
666 vector.clone(),
667 query.clone(),
668 Some(*limit),
669 Some(vec![collection.clone()]),
670 None,
671 None,
672 None,
673 Vec::new(),
674 None,
675 None,
676 Some(*limit),
677 )?;
678 let mut result =
679 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
680 for item in &res.matches {
681 let mut record = UnifiedRecord::new();
682 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
683 record.set("score", Value::Float(item.score as f64));
684 result.push(record);
685 }
686 Ok(RuntimeQueryResult {
687 query: raw_query.to_string(),
688 mode: QueryMode::Sql,
689 statement: "search_hybrid",
690 engine: "runtime-search",
691 result,
692 affected_rows: 0,
693 statement_type: "select",
694 bookmark: None,
695 })
696 }
697 SearchCommand::Multimodal {
698 query,
699 collection,
700 limit,
701 limit_param,
702 } => {
703 if limit_param.is_some() {
704 return Err(RedDBError::Query(
705 "SEARCH MULTIMODAL LIMIT $N parameter was not bound before execution"
706 .to_string(),
707 ));
708 }
709 let collections = collection.as_ref().map(|c| vec![c.clone()]);
710 let res =
711 self.search_multimodal(query.clone(), collections, None, None, Some(*limit))?;
712 let mut result =
713 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
714 for item in &res.matches {
715 let mut record = UnifiedRecord::new();
716 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
717 record.set("score", Value::Float(item.score as f64));
718 result.push(record);
719 }
720 Ok(RuntimeQueryResult {
721 query: raw_query.to_string(),
722 mode: QueryMode::Sql,
723 statement: "search_multimodal",
724 engine: "runtime-search",
725 result,
726 affected_rows: 0,
727 statement_type: "select",
728 bookmark: None,
729 })
730 }
731 SearchCommand::Index {
732 index,
733 value,
734 collection,
735 limit,
736 exact,
737 limit_param,
738 } => {
739 if limit_param.is_some() {
740 return Err(RedDBError::Query(
741 "SEARCH INDEX LIMIT $N parameter was not bound before execution"
742 .to_string(),
743 ));
744 }
745 let collections = collection.as_ref().map(|c| vec![c.clone()]);
746 let res = self.search_index(
747 index.clone(),
748 value.clone(),
749 *exact,
750 collections,
751 None,
752 None,
753 Some(*limit),
754 )?;
755 let mut result =
756 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
757 for item in &res.matches {
758 let mut record = UnifiedRecord::new();
759 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
760 record.set("score", Value::Float(item.score as f64));
761 result.push(record);
762 }
763 Ok(RuntimeQueryResult {
764 query: raw_query.to_string(),
765 mode: QueryMode::Sql,
766 statement: "search_index",
767 engine: "runtime-search",
768 result,
769 affected_rows: 0,
770 statement_type: "select",
771 bookmark: None,
772 })
773 }
774 SearchCommand::Context {
775 query,
776 field,
777 collection,
778 limit,
779 depth,
780 limit_param,
781 } => {
782 if limit_param.is_some() {
783 return Err(RedDBError::Query(
784 "SEARCH CONTEXT LIMIT $N parameter was not bound before execution"
785 .to_string(),
786 ));
787 }
788 use crate::application::SearchContextInput;
789 let input = SearchContextInput {
793 query: query.clone(),
794 field: field.clone(),
795 vector: None,
796 collections: collection.as_ref().map(|c| vec![c.clone()]),
797 graph_depth: Some(*depth),
798 graph_max_edges: None,
799 max_cross_refs: None,
800 follow_cross_refs: None,
801 expand_graph: None,
802 global_scan: None,
803 reindex: None,
804 limit: Some(*limit),
805 min_score: None,
806 };
807 let scope = self.ai_scope();
808 let res =
809 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
810 crate::runtime::authorized_search::AuthorizedSearch::execute_context(
811 self, &scope, input,
812 )?
813 } else {
814 self.search_context(input)?
815 };
816 let mut result = UnifiedResult::with_columns(vec![
817 "entity_id".into(),
818 "collection".into(),
819 "score".into(),
820 "discovery".into(),
821 "kind".into(),
822 ]);
823 let all_entities = res
824 .tables
825 .iter()
826 .map(|e| (e, "table"))
827 .chain(res.graph.nodes.iter().map(|e| (e, "graph_node")))
828 .chain(res.graph.edges.iter().map(|e| (e, "graph_edge")))
829 .chain(res.vectors.iter().map(|e| (e, "vector")))
830 .chain(res.documents.iter().map(|e| (e, "document")))
831 .chain(res.key_values.iter().map(|e| (e, "kv")));
832 for (entity, kind) in all_entities {
833 let mut record = UnifiedRecord::new();
834 record.set("entity_id", Value::UnsignedInteger(entity.entity.id.raw()));
835 record.set("collection", Value::text(entity.collection.clone()));
836 record.set("score", Value::Float(entity.score as f64));
837 record.set("discovery", Value::text(format!("{:?}", entity.discovery)));
838 record.set("kind", Value::text(kind.to_string()));
839 result.push(record);
840 }
841 Ok(RuntimeQueryResult {
842 query: raw_query.to_string(),
843 mode: QueryMode::Sql,
844 statement: "search_context",
845 engine: "runtime-context",
846 result,
847 affected_rows: 0,
848 statement_type: "select",
849 bookmark: None,
850 })
851 }
852 SearchCommand::SpatialRadius {
853 center_lat,
854 center_lon,
855 radius_km,
856 collection,
857 column,
858 limit,
859 limit_param,
860 } => {
861 if limit_param.is_some() {
862 return Err(RedDBError::Query(
863 "SEARCH SPATIAL RADIUS LIMIT $N parameter was not bound before execution"
864 .to_string(),
865 ));
866 }
867 use crate::storage::unified::spatial_index::haversine_km;
868 let _ = column; let store = self.inner.db.store();
870 let entities = store
871 .get_collection(collection)
872 .map(|m| m.query_all(|_| true))
873 .unwrap_or_default();
874
875 let mut hits: Vec<(u64, f64)> = Vec::new();
876 for entity in &entities {
877 if let Some((lat, lon)) = extract_geo_from_entity(entity) {
879 let dist = haversine_km(*center_lat, *center_lon, lat, lon);
880 if dist <= *radius_km {
881 hits.push((entity.id.raw(), dist));
882 }
883 }
884 }
885 hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
886 hits.truncate(*limit);
887
888 let mut result =
889 UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
890 for (id, dist) in &hits {
891 let mut record = UnifiedRecord::new();
892 record.set("entity_id", Value::UnsignedInteger(*id));
893 record.set("distance_km", Value::Float(*dist));
894 result.push(record);
895 }
896 Ok(RuntimeQueryResult {
897 query: raw_query.to_string(),
898 mode: QueryMode::Sql,
899 statement: "search_spatial_radius",
900 engine: "runtime-spatial",
901 result,
902 affected_rows: 0,
903 statement_type: "select",
904 bookmark: None,
905 })
906 }
907 SearchCommand::SpatialBbox {
908 min_lat,
909 min_lon,
910 max_lat,
911 max_lon,
912 collection,
913 column,
914 limit,
915 limit_param,
916 } => {
917 if limit_param.is_some() {
918 return Err(RedDBError::Query(
919 "SEARCH SPATIAL BBOX LIMIT $N parameter was not bound before execution"
920 .to_string(),
921 ));
922 }
923 let _ = column;
924 let store = self.inner.db.store();
925 let entities = store
926 .get_collection(collection)
927 .map(|m| m.query_all(|_| true))
928 .unwrap_or_default();
929
930 let mut result = UnifiedResult::with_columns(vec!["entity_id".into()]);
931 let mut count = 0;
932 for entity in &entities {
933 if count >= *limit {
934 break;
935 }
936 if let Some((lat, lon)) = extract_geo_from_entity(entity) {
937 if lat >= *min_lat && lat <= *max_lat && lon >= *min_lon && lon <= *max_lon
938 {
939 let mut record = UnifiedRecord::new();
940 record.set("entity_id", Value::UnsignedInteger(entity.id.raw()));
941 result.push(record);
942 count += 1;
943 }
944 }
945 }
946 Ok(RuntimeQueryResult {
947 query: raw_query.to_string(),
948 mode: QueryMode::Sql,
949 statement: "search_spatial_bbox",
950 engine: "runtime-spatial",
951 result,
952 affected_rows: 0,
953 statement_type: "select",
954 bookmark: None,
955 })
956 }
957 SearchCommand::SpatialNearest {
958 lat,
959 lon,
960 k,
961 collection,
962 column,
963 k_param,
964 } => {
965 if k_param.is_some() {
966 return Err(RedDBError::Query(
967 "SEARCH SPATIAL NEAREST K $N parameter was not bound before execution"
968 .to_string(),
969 ));
970 }
971 use crate::storage::unified::spatial_index::haversine_km;
972 let _ = column;
973 let store = self.inner.db.store();
974 let entities = store
975 .get_collection(collection)
976 .map(|m| m.query_all(|_| true))
977 .unwrap_or_default();
978
979 let mut hits: Vec<(u64, f64)> = Vec::new();
980 for entity in &entities {
981 if let Some((elat, elon)) = extract_geo_from_entity(entity) {
982 let dist = haversine_km(*lat, *lon, elat, elon);
983 hits.push((entity.id.raw(), dist));
984 }
985 }
986 hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
987 hits.truncate(*k);
988
989 let mut result =
990 UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
991 for (id, dist) in &hits {
992 let mut record = UnifiedRecord::new();
993 record.set("entity_id", Value::UnsignedInteger(*id));
994 record.set("distance_km", Value::Float(*dist));
995 result.push(record);
996 }
997 Ok(RuntimeQueryResult {
998 query: raw_query.to_string(),
999 mode: QueryMode::Sql,
1000 statement: "search_spatial_nearest",
1001 engine: "runtime-spatial",
1002 result,
1003 affected_rows: 0,
1004 statement_type: "select",
1005 bookmark: None,
1006 })
1007 }
1008 }
1009 }
1010}
1011
1012fn apply_graph_order_and_limit(
1013 result: &mut UnifiedResult,
1014 statement: &str,
1015 order_by: Option<&GraphCommandOrderBy>,
1016 limit: Option<usize>,
1017) -> RedDBResult<()> {
1018 if let Some(order) = order_by {
1019 let column = graph_order_metric_column(statement, &order.metric)?;
1020 let columns = result.columns.clone();
1021 result.records.sort_by(|left, right| {
1022 let cmp = compare_graph_values(left.get(column), right.get(column));
1023 let cmp = if order.ascending { cmp } else { cmp.reverse() };
1024 if cmp == Ordering::Equal {
1025 compare_graph_rows(left, right, &columns)
1026 } else {
1027 cmp
1028 }
1029 });
1030 }
1031 if let Some(limit) = limit {
1032 result.records.truncate(limit);
1033 }
1034 Ok(())
1035}
1036
1037fn graph_order_metric_column(statement: &str, metric: &str) -> RedDBResult<&'static str> {
1038 let metric = metric.to_ascii_lowercase();
1039 match (statement, metric.as_str()) {
1040 ("graph_centrality", "score" | "centrality_score") => Ok("score"),
1041 ("graph_community", "size" | "community_size") => Ok("size"),
1042 ("graph_components", "size" | "component_size") => Ok("size"),
1043 ("graph_shortest_path", "hop_count" | "total_weight" | "nodes_visited") => {
1044 Ok(match metric.as_str() {
1045 "total_weight" => "total_weight",
1046 "nodes_visited" => "nodes_visited",
1047 _ => "hop_count",
1048 })
1049 }
1050 _ => Err(RedDBError::Query(format!(
1051 "unsupported ORDER BY metric '{metric}' for GRAPH {}",
1052 statement.trim_start_matches("graph_")
1053 ))),
1054 }
1055}
1056
1057fn compare_graph_rows(left: &UnifiedRecord, right: &UnifiedRecord, columns: &[String]) -> Ordering {
1058 for column in columns {
1059 let cmp = compare_graph_values(left.get(column), right.get(column));
1060 if cmp != Ordering::Equal {
1061 return cmp;
1062 }
1063 }
1064 Ordering::Equal
1065}
1066
1067fn compare_graph_values(left: Option<&Value>, right: Option<&Value>) -> Ordering {
1068 match (left, right) {
1069 (None, None) => Ordering::Equal,
1070 (None, Some(_)) => Ordering::Less,
1071 (Some(_), None) => Ordering::Greater,
1072 (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
1073 (Some(Value::Null), Some(_)) => Ordering::Less,
1074 (Some(_), Some(Value::Null)) => Ordering::Greater,
1075 (Some(Value::Integer(left)), Some(Value::Integer(right))) => left.cmp(right),
1076 (Some(Value::UnsignedInteger(left)), Some(Value::UnsignedInteger(right))) => {
1077 left.cmp(right)
1078 }
1079 (Some(Value::Float(left)), Some(Value::Float(right))) => {
1080 left.partial_cmp(right).unwrap_or(Ordering::Equal)
1081 }
1082 (Some(Value::Integer(left)), Some(Value::Float(right))) => {
1083 (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1084 }
1085 (Some(Value::Float(left)), Some(Value::Integer(right))) => left
1086 .partial_cmp(&(*right as f64))
1087 .unwrap_or(Ordering::Equal),
1088 (Some(Value::UnsignedInteger(left)), Some(Value::Float(right))) => {
1089 (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1090 }
1091 (Some(Value::Float(left)), Some(Value::UnsignedInteger(right))) => left
1092 .partial_cmp(&(*right as f64))
1093 .unwrap_or(Ordering::Equal),
1094 (Some(Value::Integer(left)), Some(Value::UnsignedInteger(right))) => {
1095 (*left as i128).cmp(&(*right as i128))
1096 }
1097 (Some(Value::UnsignedInteger(left)), Some(Value::Integer(right))) => {
1098 (*left as i128).cmp(&(*right as i128))
1099 }
1100 (Some(Value::Timestamp(left)), Some(Value::Timestamp(right))) => left.cmp(right),
1101 (Some(Value::Text(left)), Some(Value::Text(right))) => left.cmp(right),
1102 (Some(Value::Boolean(left)), Some(Value::Boolean(right))) => left.cmp(right),
1103 (Some(left), Some(right)) => format!("{left:?}").cmp(&format!("{right:?}")),
1104 }
1105}
1106
1107fn parse_direction(s: &str) -> RedDBResult<RuntimeGraphDirection> {
1112 match s.to_lowercase().as_str() {
1113 "outgoing" | "out" => Ok(RuntimeGraphDirection::Outgoing),
1114 "incoming" | "in" => Ok(RuntimeGraphDirection::Incoming),
1115 "both" | "any" => Ok(RuntimeGraphDirection::Both),
1116 _ => Err(RedDBError::Query(format!(
1117 "unknown direction: '{s}', expected outgoing|incoming|both"
1118 ))),
1119 }
1120}
1121
1122fn parse_path_algorithm(s: &str) -> RedDBResult<RuntimeGraphPathAlgorithm> {
1123 match s.to_lowercase().as_str() {
1124 "bfs" => Ok(RuntimeGraphPathAlgorithm::Bfs),
1125 "dijkstra" => Ok(RuntimeGraphPathAlgorithm::Dijkstra),
1126 "astar" | "a*" => Ok(RuntimeGraphPathAlgorithm::AStar),
1127 "bellman_ford" | "bellmanford" => Ok(RuntimeGraphPathAlgorithm::BellmanFord),
1128 _ => Err(RedDBError::Query(format!(
1129 "unknown path algorithm: '{s}', expected bfs|dijkstra|astar|bellman_ford"
1130 ))),
1131 }
1132}
1133
1134fn parse_traversal_strategy(s: &str) -> RedDBResult<RuntimeGraphTraversalStrategy> {
1135 match s.to_lowercase().as_str() {
1136 "bfs" => Ok(RuntimeGraphTraversalStrategy::Bfs),
1137 "dfs" => Ok(RuntimeGraphTraversalStrategy::Dfs),
1138 _ => Err(RedDBError::Query(format!(
1139 "unknown traversal strategy: '{s}', expected bfs|dfs"
1140 ))),
1141 }
1142}
1143
1144fn parse_centrality_algorithm(s: &str) -> RedDBResult<RuntimeGraphCentralityAlgorithm> {
1145 match s.to_lowercase().as_str() {
1146 "degree" => Ok(RuntimeGraphCentralityAlgorithm::Degree),
1147 "closeness" => Ok(RuntimeGraphCentralityAlgorithm::Closeness),
1148 "betweenness" => Ok(RuntimeGraphCentralityAlgorithm::Betweenness),
1149 "eigenvector" => Ok(RuntimeGraphCentralityAlgorithm::Eigenvector),
1150 "pagerank" | "page_rank" => Ok(RuntimeGraphCentralityAlgorithm::PageRank),
1151 _ => Err(RedDBError::Query(format!(
1152 "unknown centrality algorithm: '{s}', expected degree|closeness|betweenness|eigenvector|pagerank"
1153 ))),
1154 }
1155}
1156
1157fn parse_community_algorithm(s: &str) -> RedDBResult<RuntimeGraphCommunityAlgorithm> {
1158 match s.to_lowercase().as_str() {
1159 "label_propagation" | "labelpropagation" => {
1160 Ok(RuntimeGraphCommunityAlgorithm::LabelPropagation)
1161 }
1162 "louvain" => Ok(RuntimeGraphCommunityAlgorithm::Louvain),
1163 _ => Err(RedDBError::Query(format!(
1164 "unknown community algorithm: '{s}', expected label_propagation|louvain"
1165 ))),
1166 }
1167}
1168
1169fn parse_components_mode(s: &str) -> RedDBResult<RuntimeGraphComponentsMode> {
1170 match s.to_lowercase().as_str() {
1171 "connected" => Ok(RuntimeGraphComponentsMode::Connected),
1172 "weak" | "weakly_connected" => Ok(RuntimeGraphComponentsMode::Weak),
1173 "strong" | "strongly_connected" => Ok(RuntimeGraphComponentsMode::Strong),
1174 _ => Err(RedDBError::Query(format!(
1175 "unknown components mode: '{s}', expected connected|weak|strong"
1176 ))),
1177 }
1178}
1179
1180fn extract_geo_from_entity(entity: &UnifiedEntity) -> Option<(f64, f64)> {
1185 match &entity.data {
1186 EntityData::Row(row) => {
1187 if let Some(ref named) = row.named {
1189 for value in named.values() {
1191 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1192 return Some((
1193 *lat_micro as f64 / 1_000_000.0,
1194 *lon_micro as f64 / 1_000_000.0,
1195 ));
1196 }
1197 }
1198 let lat =
1200 named
1201 .get("lat")
1202 .or_else(|| named.get("latitude"))
1203 .and_then(|v| match v {
1204 Value::Float(f) => Some(*f),
1205 Value::Integer(i) => Some(*i as f64),
1206 _ => None,
1207 });
1208 let lon = named
1209 .get("lon")
1210 .or_else(|| named.get("lng"))
1211 .or_else(|| named.get("longitude"))
1212 .and_then(|v| match v {
1213 Value::Float(f) => Some(*f),
1214 Value::Integer(i) => Some(*i as f64),
1215 _ => None,
1216 });
1217 if let (Some(la), Some(lo)) = (lat, lon) {
1218 return Some((la, lo));
1219 }
1220 }
1221 for value in &row.columns {
1223 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1224 return Some((
1225 *lat_micro as f64 / 1_000_000.0,
1226 *lon_micro as f64 / 1_000_000.0,
1227 ));
1228 }
1229 }
1230 None
1231 }
1232 EntityData::Node(node) => {
1233 for value in node.properties.values() {
1235 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1236 return Some((
1237 *lat_micro as f64 / 1_000_000.0,
1238 *lon_micro as f64 / 1_000_000.0,
1239 ));
1240 }
1241 }
1242 let lat = node
1243 .properties
1244 .get("lat")
1245 .or_else(|| node.properties.get("latitude"))
1246 .and_then(|v| match v {
1247 Value::Float(f) => Some(*f),
1248 Value::Integer(i) => Some(*i as f64),
1249 _ => None,
1250 });
1251 let lon = node
1252 .properties
1253 .get("lon")
1254 .or_else(|| node.properties.get("lng"))
1255 .or_else(|| node.properties.get("longitude"))
1256 .and_then(|v| match v {
1257 Value::Float(f) => Some(*f),
1258 Value::Integer(i) => Some(*i as f64),
1259 _ => None,
1260 });
1261 if let (Some(la), Some(lo)) = (lat, lon) {
1262 return Some((la, lo));
1263 }
1264 None
1265 }
1266 _ => None,
1267 }
1268}