1use super::*;
8use crate::storage::query::ast::GraphCommandOrderBy;
9use std::cmp::Ordering;
10
11impl RedDBRuntime {
12 pub fn execute_graph_command(
14 &self,
15 raw_query: &str,
16 cmd: &GraphCommand,
17 ) -> RedDBResult<RuntimeQueryResult> {
18 match cmd {
19 GraphCommand::Neighborhood {
20 source,
21 depth,
22 direction,
23 edge_labels,
24 } => {
25 let dir = parse_direction(direction)?;
26 let res = self.graph_neighborhood(
27 source,
28 dir,
29 *depth as usize,
30 edge_labels.clone(),
31 None,
32 )?;
33 let mut result = UnifiedResult::with_columns(vec![
34 "node_id".into(),
35 "label".into(),
36 "node_type".into(),
37 "depth".into(),
38 ]);
39 for visit in &res.nodes {
40 let mut record = UnifiedRecord::new();
41 record.set("node_id", Value::text(visit.node.id.clone()));
42 record.set("label", Value::text(visit.node.label.clone()));
43 record.set("node_type", Value::text(visit.node.node_type.clone()));
44 record.set("depth", Value::Integer(visit.depth as i64));
45 result.push(record);
46 }
47 Ok(RuntimeQueryResult {
48 query: raw_query.to_string(),
49 mode: QueryMode::Sql,
50 statement: "graph_neighborhood",
51 engine: "runtime-graph",
52 result,
53 affected_rows: 0,
54 statement_type: "select",
55 })
56 }
57 GraphCommand::ShortestPath {
58 source,
59 target,
60 algorithm,
61 direction,
62 limit,
63 order_by,
64 } => {
65 let dir = parse_direction(direction)?;
66 let alg = parse_path_algorithm(algorithm)?;
67 let res = self.graph_shortest_path(source, target, dir, alg, None, None)?;
68 let mut result = UnifiedResult::with_columns(vec![
69 "source".into(),
70 "target".into(),
71 "nodes_visited".into(),
72 "negative_cycle_detected".into(),
73 "hop_count".into(),
74 "total_weight".into(),
75 ]);
76 let mut record = UnifiedRecord::new();
77 record.set("source", Value::text(res.source));
78 record.set("target", Value::text(res.target));
79 record.set("nodes_visited", Value::Integer(res.nodes_visited as i64));
80 record.set(
81 "negative_cycle_detected",
82 match res.negative_cycle_detected {
83 Some(value) => Value::Boolean(value),
84 None => Value::Null,
85 },
86 );
87 if let Some(ref path) = res.path {
88 record.set("hop_count", Value::Integer(path.hop_count as i64));
89 record.set("total_weight", Value::Float(path.total_weight));
90 } else {
91 record.set("hop_count", Value::Null);
92 record.set("total_weight", Value::Null);
93 }
94 result.push(record);
95 apply_graph_order_and_limit(
96 &mut result,
97 "graph_shortest_path",
98 order_by.as_ref(),
99 limit.map(|n| n as usize),
100 )?;
101 Ok(RuntimeQueryResult {
102 query: raw_query.to_string(),
103 mode: QueryMode::Sql,
104 statement: "graph_shortest_path",
105 engine: "runtime-graph",
106 result,
107 affected_rows: 0,
108 statement_type: "select",
109 })
110 }
111 GraphCommand::Properties { source } => {
112 if let Some(node_ref) = source {
113 let graph =
117 materialize_graph_with_projection(self.inner.db.store().as_ref(), None)?;
118 let resolved = resolve_graph_node_id(&graph, node_ref)?;
119 let stored = graph
120 .get_node(&resolved)
121 .ok_or_else(|| RedDBError::NotFound(node_ref.to_string()))?;
122 let node_type = self
123 .inner
124 .db
125 .store()
126 .query_all(|entity| {
127 entity.id.raw().to_string() == resolved
128 && matches!(
129 entity.kind,
130 crate::storage::unified::EntityKind::GraphNode(_)
131 )
132 })
133 .into_iter()
134 .find_map(|(_, entity)| match entity.kind {
135 crate::storage::unified::EntityKind::GraphNode(node) => {
136 Some(node.node_type)
137 }
138 _ => None,
139 })
140 .unwrap_or_else(|| stored.node_type.clone());
141 let all_props =
142 materialize_graph_node_properties(self.inner.db.store().as_ref())?;
143 let props = all_props.get(&resolved).cloned().unwrap_or_default();
144
145 let mut prop_keys: Vec<&String> = props.keys().collect();
148 prop_keys.sort();
149 let mut columns: Vec<String> = Vec::with_capacity(3 + prop_keys.len());
150 columns.push("node_id".into());
151 columns.push("label".into());
152 columns.push("node_type".into());
153 for k in &prop_keys {
154 columns.push((*k).clone());
155 }
156 let mut result = UnifiedResult::with_columns(columns);
157 let mut record = UnifiedRecord::new();
158 record.set("node_id", Value::text(stored.id.clone()));
159 record.set("label", Value::text(stored.label.clone()));
160 record.set("node_type", Value::text(node_type));
161 for k in &prop_keys {
162 if let Some(v) = props.get(*k) {
163 record.set(k.as_str(), v.clone());
164 }
165 }
166 result.push(record);
167 return Ok(RuntimeQueryResult {
168 query: raw_query.to_string(),
169 mode: QueryMode::Sql,
170 statement: "graph_properties",
171 engine: "runtime-graph",
172 result,
173 affected_rows: 0,
174 statement_type: "select",
175 });
176 }
177 let res = self.graph_properties(None)?;
178 let mut result = UnifiedResult::with_columns(vec![
179 "node_count".into(),
180 "edge_count".into(),
181 "is_connected".into(),
182 "is_complete".into(),
183 "is_cyclic".into(),
184 "density".into(),
185 ]);
186 let mut record = UnifiedRecord::new();
187 record.set("node_count", Value::Integer(res.node_count as i64));
188 record.set("edge_count", Value::Integer(res.edge_count as i64));
189 record.set("is_connected", Value::Boolean(res.is_connected));
190 record.set("is_complete", Value::Boolean(res.is_complete));
191 record.set("is_cyclic", Value::Boolean(res.is_cyclic));
192 record.set("density", Value::Float(res.density));
193 result.push(record);
194 Ok(RuntimeQueryResult {
195 query: raw_query.to_string(),
196 mode: QueryMode::Sql,
197 statement: "graph_properties",
198 engine: "runtime-graph",
199 result,
200 affected_rows: 0,
201 statement_type: "select",
202 })
203 }
204 GraphCommand::Traverse {
205 source,
206 strategy,
207 depth,
208 direction,
209 edge_labels,
210 } => {
211 let dir = parse_direction(direction)?;
212 let strat = parse_traversal_strategy(strategy)?;
213 let res = self.graph_traverse(
214 source,
215 dir,
216 *depth as usize,
217 strat,
218 edge_labels.clone(),
219 None,
220 )?;
221 let mut result = UnifiedResult::with_columns(vec![
222 "node_id".into(),
223 "label".into(),
224 "node_type".into(),
225 "depth".into(),
226 ]);
227 for visit in &res.visits {
228 let mut record = UnifiedRecord::new();
229 record.set("node_id", Value::text(visit.node.id.clone()));
230 record.set("label", Value::text(visit.node.label.clone()));
231 record.set("node_type", Value::text(visit.node.node_type.clone()));
232 record.set("depth", Value::Integer(visit.depth as i64));
233 result.push(record);
234 }
235 Ok(RuntimeQueryResult {
236 query: raw_query.to_string(),
237 mode: QueryMode::Sql,
238 statement: "graph_traverse",
239 engine: "runtime-graph",
240 result,
241 affected_rows: 0,
242 statement_type: "select",
243 })
244 }
245 GraphCommand::Centrality {
246 algorithm,
247 limit,
248 order_by,
249 } => {
250 let alg = parse_centrality_algorithm(algorithm)?;
251 let limit_usize = limit.map(|n| n as usize);
254 let order_needs_full_set = order_by
255 .as_ref()
256 .map(|order| order.ascending)
257 .unwrap_or(false);
258 let top_k = if order_needs_full_set {
259 usize::MAX
260 } else {
261 limit_usize.unwrap_or(100).max(1)
262 };
263 let res = self.graph_centrality(alg, top_k, false, None, None, None, None)?;
264 let mut result = UnifiedResult::with_columns(vec![
265 "node_id".into(),
266 "label".into(),
267 "score".into(),
268 ]);
269 for score in &res.scores {
270 let mut record = UnifiedRecord::new();
271 record.set("node_id", Value::text(score.node.id.clone()));
272 record.set("label", Value::text(score.node.label.clone()));
273 record.set("score", Value::Float(score.score));
274 result.push(record);
275 }
276 for ds in &res.degree_scores {
277 let mut record = UnifiedRecord::new();
278 record.set("node_id", Value::text(ds.node.id.clone()));
279 record.set("label", Value::text(ds.node.label.clone()));
280 record.set("score", Value::Float(ds.total_degree as f64));
281 result.push(record);
282 }
283 apply_graph_order_and_limit(
284 &mut result,
285 "graph_centrality",
286 order_by.as_ref(),
287 Some(limit_usize.unwrap_or(100)),
288 )?;
289 Ok(RuntimeQueryResult {
290 query: raw_query.to_string(),
291 mode: QueryMode::Sql,
292 statement: "graph_centrality",
293 engine: "runtime-graph",
294 result,
295 affected_rows: 0,
296 statement_type: "select",
297 })
298 }
299 GraphCommand::Community {
300 algorithm,
301 max_iterations,
302 limit,
303 order_by,
304 return_assignments,
305 } => {
306 let alg = parse_community_algorithm(algorithm)?;
307 let res =
308 self.graph_communities(alg, 1, Some(*max_iterations as usize), None, None)?;
309 let result = if *return_assignments {
310 let mut result =
316 UnifiedResult::with_columns(vec!["node_id".into(), "community_id".into()]);
317 let row_cap = limit.map(|n| n as usize);
318 'outer: for community in &res.communities {
319 let mut nodes = community.nodes.clone();
320 nodes.sort();
321 for node_id in nodes {
322 if row_cap.is_some_and(|cap| result.records.len() >= cap) {
323 break 'outer;
324 }
325 let mut record = UnifiedRecord::new();
326 record.set("node_id", Value::text(node_id));
327 record.set("community_id", Value::text(community.id.clone()));
328 result.push(record);
329 }
330 }
331 result
332 } else {
333 let mut result =
334 UnifiedResult::with_columns(vec!["community_id".into(), "size".into()]);
335 for community in &res.communities {
336 let mut record = UnifiedRecord::new();
337 record.set("community_id", Value::text(community.id.clone()));
338 record.set("size", Value::Integer(community.size as i64));
339 result.push(record);
340 }
341 apply_graph_order_and_limit(
342 &mut result,
343 "graph_community",
344 order_by.as_ref(),
345 limit.map(|n| n as usize),
346 )?;
347 result
348 };
349 Ok(RuntimeQueryResult {
350 query: raw_query.to_string(),
351 mode: QueryMode::Sql,
352 statement: "graph_community",
353 engine: "runtime-graph",
354 result,
355 affected_rows: 0,
356 statement_type: "select",
357 })
358 }
359 GraphCommand::Components {
360 mode,
361 limit,
362 order_by,
363 } => {
364 let m = parse_components_mode(mode)?;
365 let res = self.graph_components(m, 1, None)?;
366 let mut result =
367 UnifiedResult::with_columns(vec!["component_id".into(), "size".into()]);
368 for component in &res.components {
369 let mut record = UnifiedRecord::new();
370 record.set("component_id", Value::text(component.id.clone()));
371 record.set("size", Value::Integer(component.size as i64));
372 result.push(record);
373 }
374 apply_graph_order_and_limit(
375 &mut result,
376 "graph_components",
377 order_by.as_ref(),
378 limit.map(|n| n as usize),
379 )?;
380 Ok(RuntimeQueryResult {
381 query: raw_query.to_string(),
382 mode: QueryMode::Sql,
383 statement: "graph_components",
384 engine: "runtime-graph",
385 result,
386 affected_rows: 0,
387 statement_type: "select",
388 })
389 }
390 GraphCommand::Cycles { max_length } => {
391 let res = self.graph_cycles(*max_length as usize, 100, None)?;
392 let mut result =
393 UnifiedResult::with_columns(vec!["cycle_index".into(), "length".into()]);
394 for (i, cycle) in res.cycles.iter().enumerate() {
395 let mut record = UnifiedRecord::new();
396 record.set("cycle_index", Value::Integer(i as i64));
397 record.set("length", Value::Integer(cycle.nodes.len() as i64));
398 result.push(record);
399 }
400 Ok(RuntimeQueryResult {
401 query: raw_query.to_string(),
402 mode: QueryMode::Sql,
403 statement: "graph_cycles",
404 engine: "runtime-graph",
405 result,
406 affected_rows: 0,
407 statement_type: "select",
408 })
409 }
410 GraphCommand::Clustering => {
411 let res = self.graph_clustering(100, true, None)?;
412 let mut result = UnifiedResult::with_columns(vec![
413 "node_id".into(),
414 "label".into(),
415 "score".into(),
416 ]);
417 let mut global_record = UnifiedRecord::new();
419 global_record.set("node_id", Value::text("__global__"));
420 global_record.set("label", Value::text("global_clustering"));
421 global_record.set("score", Value::Float(res.global));
422 result.push(global_record);
423 for score in &res.local {
424 let mut record = UnifiedRecord::new();
425 record.set("node_id", Value::text(score.node.id.clone()));
426 record.set("label", Value::text(score.node.label.clone()));
427 record.set("score", Value::Float(score.score));
428 result.push(record);
429 }
430 Ok(RuntimeQueryResult {
431 query: raw_query.to_string(),
432 mode: QueryMode::Sql,
433 statement: "graph_clustering",
434 engine: "runtime-graph",
435 result,
436 affected_rows: 0,
437 statement_type: "select",
438 })
439 }
440 GraphCommand::TopologicalSort => {
441 let res = self.graph_topological_sort(None)?;
442 let mut result = UnifiedResult::with_columns(vec![
443 "order".into(),
444 "node_id".into(),
445 "label".into(),
446 ]);
447 for (i, node) in res.ordered_nodes.iter().enumerate() {
448 let mut record = UnifiedRecord::new();
449 record.set("order", Value::Integer(i as i64));
450 record.set("node_id", Value::text(node.id.clone()));
451 record.set("label", Value::text(node.label.clone()));
452 result.push(record);
453 }
454 Ok(RuntimeQueryResult {
455 query: raw_query.to_string(),
456 mode: QueryMode::Sql,
457 statement: "graph_topological_sort",
458 engine: "runtime-graph",
459 result,
460 affected_rows: 0,
461 statement_type: "select",
462 })
463 }
464 }
465 }
466
467 pub fn execute_search_command(
469 &self,
470 raw_query: &str,
471 cmd: &SearchCommand,
472 ) -> RedDBResult<RuntimeQueryResult> {
473 match cmd {
474 SearchCommand::Similar {
475 vector,
476 text,
477 provider,
478 collection,
479 limit,
480 min_score,
481 vector_param,
482 limit_param,
483 min_score_param,
484 text_param,
485 } => {
486 if vector_param.is_some() {
487 return Err(RedDBError::Query(
488 "SEARCH SIMILAR $N vector parameter was not bound before execution"
489 .to_string(),
490 ));
491 }
492 if limit_param.is_some() {
493 return Err(RedDBError::Query(
494 "SEARCH SIMILAR LIMIT $N parameter was not bound before execution"
495 .to_string(),
496 ));
497 }
498 if min_score_param.is_some() {
499 return Err(RedDBError::Query(
500 "SEARCH SIMILAR MIN_SCORE $N parameter was not bound before execution"
501 .to_string(),
502 ));
503 }
504 if text_param.is_some() {
505 return Err(RedDBError::Query(
506 "SEARCH SIMILAR TEXT $N parameter was not bound before execution"
507 .to_string(),
508 ));
509 }
510 let search_vector = if let Some(query_text) = text {
512 let (default_provider, _) = crate::ai::resolve_defaults_from_runtime(self);
513 let provider = match provider.as_deref() {
514 Some(p) => crate::ai::parse_provider(p)?,
515 None => default_provider,
516 };
517 let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
518 let model = std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
519 .ok()
520 .unwrap_or_else(|| provider.default_embedding_model().to_string());
521 let transport = crate::runtime::ai::transport::AiTransport::from_runtime(self);
522 let request = crate::ai::OpenAiEmbeddingRequest {
523 api_key,
524 model,
525 inputs: vec![query_text.clone()],
526 dimensions: None,
527 api_base: provider.resolve_api_base(),
528 };
529 let response = crate::runtime::ai::block_on_ai(async move {
530 crate::ai::openai_embeddings_async(&transport, request).await
531 })
532 .and_then(|result| result)?;
533 response.embeddings.into_iter().next().ok_or_else(|| {
534 RedDBError::Query("embedding API returned no vectors".to_string())
535 })?
536 } else {
537 vector.clone()
538 };
539 let scope = self.ai_scope();
543 let results =
544 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
545 crate::runtime::authorized_search::AuthorizedSearch::execute_similar(
546 self,
547 &scope,
548 collection,
549 &search_vector,
550 *limit,
551 *min_score,
552 )?
553 } else {
554 self.search_similar(collection, &search_vector, *limit, *min_score)?
556 };
557 let mut result =
558 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
559 for sr in &results {
560 let mut record = UnifiedRecord::new();
561 record.set("entity_id", Value::UnsignedInteger(sr.entity_id.raw()));
562 record.set("score", Value::Float(sr.score as f64));
563 result.push(record);
564 }
565 Ok(RuntimeQueryResult {
566 query: raw_query.to_string(),
567 mode: QueryMode::Sql,
568 statement: "search_similar",
569 engine: "runtime-search",
570 result,
571 affected_rows: 0,
572 statement_type: "select",
573 })
574 }
575 SearchCommand::Text {
576 query,
577 collection,
578 limit,
579 fuzzy,
580 limit_param,
581 } => {
582 if limit_param.is_some() {
583 return Err(RedDBError::Query(
584 "SEARCH TEXT LIMIT $N parameter was not bound before execution".to_string(),
585 ));
586 }
587 let collections = collection.as_ref().map(|c| vec![c.clone()]);
588 let scope = self.ai_scope();
590 let res =
591 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
592 crate::runtime::authorized_search::AuthorizedSearch::execute_text(
593 self,
594 &scope,
595 query.clone(),
596 collections,
597 None,
598 None,
599 None,
600 Some(*limit),
601 *fuzzy,
602 )?
603 } else {
604 self.search_text(
605 query.clone(),
606 collections,
607 None,
608 None,
609 None,
610 Some(*limit),
611 *fuzzy,
612 )?
613 };
614 let mut result =
615 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
616 for item in &res.matches {
617 let mut record = UnifiedRecord::new();
618 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
619 record.set("score", Value::Float(item.score as f64));
620 result.push(record);
621 }
622 Ok(RuntimeQueryResult {
623 query: raw_query.to_string(),
624 mode: QueryMode::Sql,
625 statement: "search_text",
626 engine: "runtime-search",
627 result,
628 affected_rows: 0,
629 statement_type: "select",
630 })
631 }
632 SearchCommand::Hybrid {
633 vector,
634 query,
635 collection,
636 limit,
637 limit_param,
638 } => {
639 if limit_param.is_some() {
640 return Err(RedDBError::Query(
641 "SEARCH HYBRID LIMIT $N parameter was not bound before execution"
642 .to_string(),
643 ));
644 }
645 let res = self.search_hybrid(
646 vector.clone(),
647 query.clone(),
648 Some(*limit),
649 Some(vec![collection.clone()]),
650 None,
651 None,
652 None,
653 Vec::new(),
654 None,
655 None,
656 Some(*limit),
657 )?;
658 let mut result =
659 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
660 for item in &res.matches {
661 let mut record = UnifiedRecord::new();
662 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
663 record.set("score", Value::Float(item.score as f64));
664 result.push(record);
665 }
666 Ok(RuntimeQueryResult {
667 query: raw_query.to_string(),
668 mode: QueryMode::Sql,
669 statement: "search_hybrid",
670 engine: "runtime-search",
671 result,
672 affected_rows: 0,
673 statement_type: "select",
674 })
675 }
676 SearchCommand::Multimodal {
677 query,
678 collection,
679 limit,
680 limit_param,
681 } => {
682 if limit_param.is_some() {
683 return Err(RedDBError::Query(
684 "SEARCH MULTIMODAL LIMIT $N parameter was not bound before execution"
685 .to_string(),
686 ));
687 }
688 let collections = collection.as_ref().map(|c| vec![c.clone()]);
689 let res =
690 self.search_multimodal(query.clone(), collections, None, None, Some(*limit))?;
691 let mut result =
692 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
693 for item in &res.matches {
694 let mut record = UnifiedRecord::new();
695 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
696 record.set("score", Value::Float(item.score as f64));
697 result.push(record);
698 }
699 Ok(RuntimeQueryResult {
700 query: raw_query.to_string(),
701 mode: QueryMode::Sql,
702 statement: "search_multimodal",
703 engine: "runtime-search",
704 result,
705 affected_rows: 0,
706 statement_type: "select",
707 })
708 }
709 SearchCommand::Index {
710 index,
711 value,
712 collection,
713 limit,
714 exact,
715 limit_param,
716 } => {
717 if limit_param.is_some() {
718 return Err(RedDBError::Query(
719 "SEARCH INDEX LIMIT $N parameter was not bound before execution"
720 .to_string(),
721 ));
722 }
723 let collections = collection.as_ref().map(|c| vec![c.clone()]);
724 let res = self.search_index(
725 index.clone(),
726 value.clone(),
727 *exact,
728 collections,
729 None,
730 None,
731 Some(*limit),
732 )?;
733 let mut result =
734 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
735 for item in &res.matches {
736 let mut record = UnifiedRecord::new();
737 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
738 record.set("score", Value::Float(item.score as f64));
739 result.push(record);
740 }
741 Ok(RuntimeQueryResult {
742 query: raw_query.to_string(),
743 mode: QueryMode::Sql,
744 statement: "search_index",
745 engine: "runtime-search",
746 result,
747 affected_rows: 0,
748 statement_type: "select",
749 })
750 }
751 SearchCommand::Context {
752 query,
753 field,
754 collection,
755 limit,
756 depth,
757 limit_param,
758 } => {
759 if limit_param.is_some() {
760 return Err(RedDBError::Query(
761 "SEARCH CONTEXT LIMIT $N parameter was not bound before execution"
762 .to_string(),
763 ));
764 }
765 use crate::application::SearchContextInput;
766 let input = SearchContextInput {
770 query: query.clone(),
771 field: field.clone(),
772 vector: None,
773 collections: collection.as_ref().map(|c| vec![c.clone()]),
774 graph_depth: Some(*depth),
775 graph_max_edges: None,
776 max_cross_refs: None,
777 follow_cross_refs: None,
778 expand_graph: None,
779 global_scan: None,
780 reindex: None,
781 limit: Some(*limit),
782 min_score: None,
783 };
784 let scope = self.ai_scope();
785 let res =
786 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
787 crate::runtime::authorized_search::AuthorizedSearch::execute_context(
788 self, &scope, input,
789 )?
790 } else {
791 self.search_context(input)?
792 };
793 let mut result = UnifiedResult::with_columns(vec![
794 "entity_id".into(),
795 "collection".into(),
796 "score".into(),
797 "discovery".into(),
798 "kind".into(),
799 ]);
800 let all_entities = res
801 .tables
802 .iter()
803 .map(|e| (e, "table"))
804 .chain(res.graph.nodes.iter().map(|e| (e, "graph_node")))
805 .chain(res.graph.edges.iter().map(|e| (e, "graph_edge")))
806 .chain(res.vectors.iter().map(|e| (e, "vector")))
807 .chain(res.documents.iter().map(|e| (e, "document")))
808 .chain(res.key_values.iter().map(|e| (e, "kv")));
809 for (entity, kind) in all_entities {
810 let mut record = UnifiedRecord::new();
811 record.set("entity_id", Value::UnsignedInteger(entity.entity.id.raw()));
812 record.set("collection", Value::text(entity.collection.clone()));
813 record.set("score", Value::Float(entity.score as f64));
814 record.set("discovery", Value::text(format!("{:?}", entity.discovery)));
815 record.set("kind", Value::text(kind.to_string()));
816 result.push(record);
817 }
818 Ok(RuntimeQueryResult {
819 query: raw_query.to_string(),
820 mode: QueryMode::Sql,
821 statement: "search_context",
822 engine: "runtime-context",
823 result,
824 affected_rows: 0,
825 statement_type: "select",
826 })
827 }
828 SearchCommand::SpatialRadius {
829 center_lat,
830 center_lon,
831 radius_km,
832 collection,
833 column,
834 limit,
835 limit_param,
836 } => {
837 if limit_param.is_some() {
838 return Err(RedDBError::Query(
839 "SEARCH SPATIAL RADIUS LIMIT $N parameter was not bound before execution"
840 .to_string(),
841 ));
842 }
843 use crate::storage::unified::spatial_index::haversine_km;
844 let _ = column; let store = self.inner.db.store();
846 let entities = store
847 .get_collection(collection)
848 .map(|m| m.query_all(|_| true))
849 .unwrap_or_default();
850
851 let mut hits: Vec<(u64, f64)> = Vec::new();
852 for entity in &entities {
853 if let Some((lat, lon)) = extract_geo_from_entity(entity) {
855 let dist = haversine_km(*center_lat, *center_lon, lat, lon);
856 if dist <= *radius_km {
857 hits.push((entity.id.raw(), dist));
858 }
859 }
860 }
861 hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
862 hits.truncate(*limit);
863
864 let mut result =
865 UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
866 for (id, dist) in &hits {
867 let mut record = UnifiedRecord::new();
868 record.set("entity_id", Value::UnsignedInteger(*id));
869 record.set("distance_km", Value::Float(*dist));
870 result.push(record);
871 }
872 Ok(RuntimeQueryResult {
873 query: raw_query.to_string(),
874 mode: QueryMode::Sql,
875 statement: "search_spatial_radius",
876 engine: "runtime-spatial",
877 result,
878 affected_rows: 0,
879 statement_type: "select",
880 })
881 }
882 SearchCommand::SpatialBbox {
883 min_lat,
884 min_lon,
885 max_lat,
886 max_lon,
887 collection,
888 column,
889 limit,
890 limit_param,
891 } => {
892 if limit_param.is_some() {
893 return Err(RedDBError::Query(
894 "SEARCH SPATIAL BBOX LIMIT $N parameter was not bound before execution"
895 .to_string(),
896 ));
897 }
898 let _ = column;
899 let store = self.inner.db.store();
900 let entities = store
901 .get_collection(collection)
902 .map(|m| m.query_all(|_| true))
903 .unwrap_or_default();
904
905 let mut result = UnifiedResult::with_columns(vec!["entity_id".into()]);
906 let mut count = 0;
907 for entity in &entities {
908 if count >= *limit {
909 break;
910 }
911 if let Some((lat, lon)) = extract_geo_from_entity(entity) {
912 if lat >= *min_lat && lat <= *max_lat && lon >= *min_lon && lon <= *max_lon
913 {
914 let mut record = UnifiedRecord::new();
915 record.set("entity_id", Value::UnsignedInteger(entity.id.raw()));
916 result.push(record);
917 count += 1;
918 }
919 }
920 }
921 Ok(RuntimeQueryResult {
922 query: raw_query.to_string(),
923 mode: QueryMode::Sql,
924 statement: "search_spatial_bbox",
925 engine: "runtime-spatial",
926 result,
927 affected_rows: 0,
928 statement_type: "select",
929 })
930 }
931 SearchCommand::SpatialNearest {
932 lat,
933 lon,
934 k,
935 collection,
936 column,
937 k_param,
938 } => {
939 if k_param.is_some() {
940 return Err(RedDBError::Query(
941 "SEARCH SPATIAL NEAREST K $N parameter was not bound before execution"
942 .to_string(),
943 ));
944 }
945 use crate::storage::unified::spatial_index::haversine_km;
946 let _ = column;
947 let store = self.inner.db.store();
948 let entities = store
949 .get_collection(collection)
950 .map(|m| m.query_all(|_| true))
951 .unwrap_or_default();
952
953 let mut hits: Vec<(u64, f64)> = Vec::new();
954 for entity in &entities {
955 if let Some((elat, elon)) = extract_geo_from_entity(entity) {
956 let dist = haversine_km(*lat, *lon, elat, elon);
957 hits.push((entity.id.raw(), dist));
958 }
959 }
960 hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
961 hits.truncate(*k);
962
963 let mut result =
964 UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
965 for (id, dist) in &hits {
966 let mut record = UnifiedRecord::new();
967 record.set("entity_id", Value::UnsignedInteger(*id));
968 record.set("distance_km", Value::Float(*dist));
969 result.push(record);
970 }
971 Ok(RuntimeQueryResult {
972 query: raw_query.to_string(),
973 mode: QueryMode::Sql,
974 statement: "search_spatial_nearest",
975 engine: "runtime-spatial",
976 result,
977 affected_rows: 0,
978 statement_type: "select",
979 })
980 }
981 }
982 }
983}
984
985fn apply_graph_order_and_limit(
986 result: &mut UnifiedResult,
987 statement: &str,
988 order_by: Option<&GraphCommandOrderBy>,
989 limit: Option<usize>,
990) -> RedDBResult<()> {
991 if let Some(order) = order_by {
992 let column = graph_order_metric_column(statement, &order.metric)?;
993 let columns = result.columns.clone();
994 result.records.sort_by(|left, right| {
995 let cmp = compare_graph_values(left.get(column), right.get(column));
996 let cmp = if order.ascending { cmp } else { cmp.reverse() };
997 if cmp == Ordering::Equal {
998 compare_graph_rows(left, right, &columns)
999 } else {
1000 cmp
1001 }
1002 });
1003 }
1004 if let Some(limit) = limit {
1005 result.records.truncate(limit);
1006 }
1007 Ok(())
1008}
1009
1010fn graph_order_metric_column(statement: &str, metric: &str) -> RedDBResult<&'static str> {
1011 let metric = metric.to_ascii_lowercase();
1012 match (statement, metric.as_str()) {
1013 ("graph_centrality", "score" | "centrality_score") => Ok("score"),
1014 ("graph_community", "size" | "community_size") => Ok("size"),
1015 ("graph_components", "size" | "component_size") => Ok("size"),
1016 ("graph_shortest_path", "hop_count" | "total_weight" | "nodes_visited") => {
1017 Ok(match metric.as_str() {
1018 "total_weight" => "total_weight",
1019 "nodes_visited" => "nodes_visited",
1020 _ => "hop_count",
1021 })
1022 }
1023 _ => Err(RedDBError::Query(format!(
1024 "unsupported ORDER BY metric '{metric}' for GRAPH {}",
1025 statement.trim_start_matches("graph_")
1026 ))),
1027 }
1028}
1029
1030fn compare_graph_rows(left: &UnifiedRecord, right: &UnifiedRecord, columns: &[String]) -> Ordering {
1031 for column in columns {
1032 let cmp = compare_graph_values(left.get(column), right.get(column));
1033 if cmp != Ordering::Equal {
1034 return cmp;
1035 }
1036 }
1037 Ordering::Equal
1038}
1039
1040fn compare_graph_values(left: Option<&Value>, right: Option<&Value>) -> Ordering {
1041 match (left, right) {
1042 (None, None) => Ordering::Equal,
1043 (None, Some(_)) => Ordering::Less,
1044 (Some(_), None) => Ordering::Greater,
1045 (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
1046 (Some(Value::Null), Some(_)) => Ordering::Less,
1047 (Some(_), Some(Value::Null)) => Ordering::Greater,
1048 (Some(Value::Integer(left)), Some(Value::Integer(right))) => left.cmp(right),
1049 (Some(Value::UnsignedInteger(left)), Some(Value::UnsignedInteger(right))) => {
1050 left.cmp(right)
1051 }
1052 (Some(Value::Float(left)), Some(Value::Float(right))) => {
1053 left.partial_cmp(right).unwrap_or(Ordering::Equal)
1054 }
1055 (Some(Value::Integer(left)), Some(Value::Float(right))) => {
1056 (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1057 }
1058 (Some(Value::Float(left)), Some(Value::Integer(right))) => left
1059 .partial_cmp(&(*right as f64))
1060 .unwrap_or(Ordering::Equal),
1061 (Some(Value::UnsignedInteger(left)), Some(Value::Float(right))) => {
1062 (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1063 }
1064 (Some(Value::Float(left)), Some(Value::UnsignedInteger(right))) => left
1065 .partial_cmp(&(*right as f64))
1066 .unwrap_or(Ordering::Equal),
1067 (Some(Value::Integer(left)), Some(Value::UnsignedInteger(right))) => {
1068 (*left as i128).cmp(&(*right as i128))
1069 }
1070 (Some(Value::UnsignedInteger(left)), Some(Value::Integer(right))) => {
1071 (*left as i128).cmp(&(*right as i128))
1072 }
1073 (Some(Value::Timestamp(left)), Some(Value::Timestamp(right))) => left.cmp(right),
1074 (Some(Value::Text(left)), Some(Value::Text(right))) => left.cmp(right),
1075 (Some(Value::Boolean(left)), Some(Value::Boolean(right))) => left.cmp(right),
1076 (Some(left), Some(right)) => format!("{left:?}").cmp(&format!("{right:?}")),
1077 }
1078}
1079
1080fn parse_direction(s: &str) -> RedDBResult<RuntimeGraphDirection> {
1085 match s.to_lowercase().as_str() {
1086 "outgoing" | "out" => Ok(RuntimeGraphDirection::Outgoing),
1087 "incoming" | "in" => Ok(RuntimeGraphDirection::Incoming),
1088 "both" | "any" => Ok(RuntimeGraphDirection::Both),
1089 _ => Err(RedDBError::Query(format!(
1090 "unknown direction: '{s}', expected outgoing|incoming|both"
1091 ))),
1092 }
1093}
1094
1095fn parse_path_algorithm(s: &str) -> RedDBResult<RuntimeGraphPathAlgorithm> {
1096 match s.to_lowercase().as_str() {
1097 "bfs" => Ok(RuntimeGraphPathAlgorithm::Bfs),
1098 "dijkstra" => Ok(RuntimeGraphPathAlgorithm::Dijkstra),
1099 "astar" | "a*" => Ok(RuntimeGraphPathAlgorithm::AStar),
1100 "bellman_ford" | "bellmanford" => Ok(RuntimeGraphPathAlgorithm::BellmanFord),
1101 _ => Err(RedDBError::Query(format!(
1102 "unknown path algorithm: '{s}', expected bfs|dijkstra|astar|bellman_ford"
1103 ))),
1104 }
1105}
1106
1107fn parse_traversal_strategy(s: &str) -> RedDBResult<RuntimeGraphTraversalStrategy> {
1108 match s.to_lowercase().as_str() {
1109 "bfs" => Ok(RuntimeGraphTraversalStrategy::Bfs),
1110 "dfs" => Ok(RuntimeGraphTraversalStrategy::Dfs),
1111 _ => Err(RedDBError::Query(format!(
1112 "unknown traversal strategy: '{s}', expected bfs|dfs"
1113 ))),
1114 }
1115}
1116
1117fn parse_centrality_algorithm(s: &str) -> RedDBResult<RuntimeGraphCentralityAlgorithm> {
1118 match s.to_lowercase().as_str() {
1119 "degree" => Ok(RuntimeGraphCentralityAlgorithm::Degree),
1120 "closeness" => Ok(RuntimeGraphCentralityAlgorithm::Closeness),
1121 "betweenness" => Ok(RuntimeGraphCentralityAlgorithm::Betweenness),
1122 "eigenvector" => Ok(RuntimeGraphCentralityAlgorithm::Eigenvector),
1123 "pagerank" | "page_rank" => Ok(RuntimeGraphCentralityAlgorithm::PageRank),
1124 _ => Err(RedDBError::Query(format!(
1125 "unknown centrality algorithm: '{s}', expected degree|closeness|betweenness|eigenvector|pagerank"
1126 ))),
1127 }
1128}
1129
1130fn parse_community_algorithm(s: &str) -> RedDBResult<RuntimeGraphCommunityAlgorithm> {
1131 match s.to_lowercase().as_str() {
1132 "label_propagation" | "labelpropagation" => {
1133 Ok(RuntimeGraphCommunityAlgorithm::LabelPropagation)
1134 }
1135 "louvain" => Ok(RuntimeGraphCommunityAlgorithm::Louvain),
1136 _ => Err(RedDBError::Query(format!(
1137 "unknown community algorithm: '{s}', expected label_propagation|louvain"
1138 ))),
1139 }
1140}
1141
1142fn parse_components_mode(s: &str) -> RedDBResult<RuntimeGraphComponentsMode> {
1143 match s.to_lowercase().as_str() {
1144 "connected" => Ok(RuntimeGraphComponentsMode::Connected),
1145 "weak" | "weakly_connected" => Ok(RuntimeGraphComponentsMode::Weak),
1146 "strong" | "strongly_connected" => Ok(RuntimeGraphComponentsMode::Strong),
1147 _ => Err(RedDBError::Query(format!(
1148 "unknown components mode: '{s}', expected connected|weak|strong"
1149 ))),
1150 }
1151}
1152
1153fn extract_geo_from_entity(entity: &UnifiedEntity) -> Option<(f64, f64)> {
1158 match &entity.data {
1159 EntityData::Row(row) => {
1160 if let Some(ref named) = row.named {
1162 for value in named.values() {
1164 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1165 return Some((
1166 *lat_micro as f64 / 1_000_000.0,
1167 *lon_micro as f64 / 1_000_000.0,
1168 ));
1169 }
1170 }
1171 let lat =
1173 named
1174 .get("lat")
1175 .or_else(|| named.get("latitude"))
1176 .and_then(|v| match v {
1177 Value::Float(f) => Some(*f),
1178 Value::Integer(i) => Some(*i as f64),
1179 _ => None,
1180 });
1181 let lon = named
1182 .get("lon")
1183 .or_else(|| named.get("lng"))
1184 .or_else(|| named.get("longitude"))
1185 .and_then(|v| match v {
1186 Value::Float(f) => Some(*f),
1187 Value::Integer(i) => Some(*i as f64),
1188 _ => None,
1189 });
1190 if let (Some(la), Some(lo)) = (lat, lon) {
1191 return Some((la, lo));
1192 }
1193 }
1194 for value in &row.columns {
1196 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1197 return Some((
1198 *lat_micro as f64 / 1_000_000.0,
1199 *lon_micro as f64 / 1_000_000.0,
1200 ));
1201 }
1202 }
1203 None
1204 }
1205 EntityData::Node(node) => {
1206 for value in node.properties.values() {
1208 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1209 return Some((
1210 *lat_micro as f64 / 1_000_000.0,
1211 *lon_micro as f64 / 1_000_000.0,
1212 ));
1213 }
1214 }
1215 let lat = node
1216 .properties
1217 .get("lat")
1218 .or_else(|| node.properties.get("latitude"))
1219 .and_then(|v| match v {
1220 Value::Float(f) => Some(*f),
1221 Value::Integer(i) => Some(*i as f64),
1222 _ => None,
1223 });
1224 let lon = node
1225 .properties
1226 .get("lon")
1227 .or_else(|| node.properties.get("lng"))
1228 .or_else(|| node.properties.get("longitude"))
1229 .and_then(|v| match v {
1230 Value::Float(f) => Some(*f),
1231 Value::Integer(i) => Some(*i as f64),
1232 _ => None,
1233 });
1234 if let (Some(la), Some(lo)) = (lat, lon) {
1235 return Some((la, lo));
1236 }
1237 None
1238 }
1239 _ => None,
1240 }
1241}