1use super::*;
8use crate::storage::query::ast::GraphCommandOrderBy;
9use std::cmp::Ordering;
10
11impl RedDBRuntime {
12 pub fn execute_graph_command(
14 &self,
15 raw_query: &str,
16 cmd: &GraphCommand,
17 ) -> RedDBResult<RuntimeQueryResult> {
18 match cmd {
19 GraphCommand::Neighborhood {
20 source,
21 depth,
22 direction,
23 edge_labels,
24 } => {
25 let dir = parse_direction(direction)?;
26 let res = self.graph_neighborhood(
27 source,
28 dir,
29 *depth as usize,
30 edge_labels.clone(),
31 None,
32 )?;
33 let mut result = UnifiedResult::with_columns(vec![
34 "node_id".into(),
35 "label".into(),
36 "node_type".into(),
37 "depth".into(),
38 ]);
39 for visit in &res.nodes {
40 let mut record = UnifiedRecord::new();
41 record.set("node_id", Value::text(visit.node.id.clone()));
42 record.set("label", Value::text(visit.node.label.clone()));
43 record.set("node_type", Value::text(visit.node.node_type.clone()));
44 record.set("depth", Value::Integer(visit.depth as i64));
45 result.push(record);
46 }
47 Ok(RuntimeQueryResult {
48 query: raw_query.to_string(),
49 mode: QueryMode::Sql,
50 statement: "graph_neighborhood",
51 engine: "runtime-graph",
52 result,
53 affected_rows: 0,
54 statement_type: "select",
55 })
56 }
57 GraphCommand::ShortestPath {
58 source,
59 target,
60 algorithm,
61 direction,
62 limit,
63 order_by,
64 } => {
65 let dir = parse_direction(direction)?;
66 let alg = parse_path_algorithm(algorithm)?;
67 let res = self.graph_shortest_path(source, target, dir, alg, None, None)?;
68 let mut result = UnifiedResult::with_columns(vec![
69 "source".into(),
70 "target".into(),
71 "nodes_visited".into(),
72 "negative_cycle_detected".into(),
73 "path_found".into(),
74 "hop_count".into(),
75 "total_weight".into(),
76 ]);
77 let mut record = UnifiedRecord::new();
78 record.set("source", Value::text(res.source));
79 record.set("target", Value::text(res.target));
80 record.set("nodes_visited", Value::Integer(res.nodes_visited as i64));
81 record.set(
82 "negative_cycle_detected",
83 match res.negative_cycle_detected {
84 Some(value) => Value::Boolean(value),
85 None => Value::Null,
86 },
87 );
88 if let Some(ref path) = res.path {
89 record.set("path_found", Value::Boolean(true));
90 record.set("hop_count", Value::Integer(path.hop_count as i64));
91 record.set("total_weight", Value::Float(path.total_weight));
92 } else {
93 record.set("path_found", Value::Boolean(false));
94 record.set("hop_count", Value::Null);
95 record.set("total_weight", Value::Null);
96 }
97 result.push(record);
98 apply_graph_order_and_limit(
99 &mut result,
100 "graph_shortest_path",
101 order_by.as_ref(),
102 limit.map(|n| n as usize),
103 )?;
104 Ok(RuntimeQueryResult {
105 query: raw_query.to_string(),
106 mode: QueryMode::Sql,
107 statement: "graph_shortest_path",
108 engine: "runtime-graph",
109 result,
110 affected_rows: 0,
111 statement_type: "select",
112 })
113 }
114 GraphCommand::Properties { source } => {
115 if let Some(node_ref) = source {
116 let graph =
120 materialize_graph_with_projection(self.inner.db.store().as_ref(), None)?;
121 let resolved = resolve_graph_node_id(&graph, node_ref)?;
122 let stored = graph
123 .get_node(&resolved)
124 .ok_or_else(|| RedDBError::NotFound(node_ref.to_string()))?;
125 let node_type = self
126 .inner
127 .db
128 .store()
129 .query_all(|entity| {
130 entity.id.raw().to_string() == resolved
131 && matches!(
132 entity.kind,
133 crate::storage::unified::EntityKind::GraphNode(_)
134 )
135 })
136 .into_iter()
137 .find_map(|(_, entity)| match entity.kind {
138 crate::storage::unified::EntityKind::GraphNode(node) => {
139 Some(node.node_type)
140 }
141 _ => None,
142 })
143 .unwrap_or_else(|| stored.node_type.clone());
144 let all_props =
145 materialize_graph_node_properties(self.inner.db.store().as_ref())?;
146 let props = all_props.get(&resolved).cloned().unwrap_or_default();
147
148 let mut prop_keys: Vec<&String> = props.keys().collect();
151 prop_keys.sort();
152 let mut columns: Vec<String> = Vec::with_capacity(3 + prop_keys.len());
153 columns.push("node_id".into());
154 columns.push("label".into());
155 columns.push("node_type".into());
156 for k in &prop_keys {
157 columns.push((*k).clone());
158 }
159 let mut result = UnifiedResult::with_columns(columns);
160 let mut record = UnifiedRecord::new();
161 record.set("node_id", Value::text(stored.id.clone()));
162 record.set("label", Value::text(stored.label.clone()));
163 record.set("node_type", Value::text(node_type));
164 for k in &prop_keys {
165 if let Some(v) = props.get(*k) {
166 record.set(k.as_str(), v.clone());
167 }
168 }
169 result.push(record);
170 return Ok(RuntimeQueryResult {
171 query: raw_query.to_string(),
172 mode: QueryMode::Sql,
173 statement: "graph_properties",
174 engine: "runtime-graph",
175 result,
176 affected_rows: 0,
177 statement_type: "select",
178 });
179 }
180 let res = self.graph_properties(None)?;
181 let mut result = UnifiedResult::with_columns(vec![
182 "node_count".into(),
183 "edge_count".into(),
184 "is_connected".into(),
185 "is_complete".into(),
186 "is_cyclic".into(),
187 "density".into(),
188 ]);
189 let mut record = UnifiedRecord::new();
190 record.set("node_count", Value::Integer(res.node_count as i64));
191 record.set("edge_count", Value::Integer(res.edge_count as i64));
192 record.set("is_connected", Value::Boolean(res.is_connected));
193 record.set("is_complete", Value::Boolean(res.is_complete));
194 record.set("is_cyclic", Value::Boolean(res.is_cyclic));
195 record.set("density", Value::Float(res.density));
196 result.push(record);
197 Ok(RuntimeQueryResult {
198 query: raw_query.to_string(),
199 mode: QueryMode::Sql,
200 statement: "graph_properties",
201 engine: "runtime-graph",
202 result,
203 affected_rows: 0,
204 statement_type: "select",
205 })
206 }
207 GraphCommand::Traverse {
208 source,
209 strategy,
210 depth,
211 direction,
212 edge_labels,
213 } => {
214 let dir = parse_direction(direction)?;
215 let strat = parse_traversal_strategy(strategy)?;
216 let res = self.graph_traverse(
217 source,
218 dir,
219 *depth as usize,
220 strat,
221 edge_labels.clone(),
222 None,
223 )?;
224 let mut result = UnifiedResult::with_columns(vec![
225 "node_id".into(),
226 "label".into(),
227 "node_type".into(),
228 "depth".into(),
229 ]);
230 for visit in &res.visits {
231 let mut record = UnifiedRecord::new();
232 record.set("node_id", Value::text(visit.node.id.clone()));
233 record.set("label", Value::text(visit.node.label.clone()));
234 record.set("node_type", Value::text(visit.node.node_type.clone()));
235 record.set("depth", Value::Integer(visit.depth as i64));
236 result.push(record);
237 }
238 Ok(RuntimeQueryResult {
239 query: raw_query.to_string(),
240 mode: QueryMode::Sql,
241 statement: "graph_traverse",
242 engine: "runtime-graph",
243 result,
244 affected_rows: 0,
245 statement_type: "select",
246 })
247 }
248 GraphCommand::Centrality {
249 algorithm,
250 limit,
251 order_by,
252 } => {
253 let alg = parse_centrality_algorithm(algorithm)?;
254 let limit_usize = limit.map(|n| n as usize);
257 let order_needs_full_set = order_by
258 .as_ref()
259 .map(|order| order.ascending)
260 .unwrap_or(false);
261 let top_k = if order_needs_full_set {
262 usize::MAX
263 } else {
264 limit_usize.unwrap_or(100).max(1)
265 };
266 let res = self.graph_centrality(alg, top_k, false, None, None, None, None)?;
267 let mut result = UnifiedResult::with_columns(vec![
268 "node_id".into(),
269 "label".into(),
270 "score".into(),
271 ]);
272 for score in &res.scores {
273 let mut record = UnifiedRecord::new();
274 record.set("node_id", Value::text(score.node.id.clone()));
275 record.set("label", Value::text(score.node.label.clone()));
276 record.set("score", Value::Float(score.score));
277 result.push(record);
278 }
279 for ds in &res.degree_scores {
280 let mut record = UnifiedRecord::new();
281 record.set("node_id", Value::text(ds.node.id.clone()));
282 record.set("label", Value::text(ds.node.label.clone()));
283 record.set("score", Value::Float(ds.total_degree as f64));
284 result.push(record);
285 }
286 apply_graph_order_and_limit(
287 &mut result,
288 "graph_centrality",
289 order_by.as_ref(),
290 Some(limit_usize.unwrap_or(100)),
291 )?;
292 Ok(RuntimeQueryResult {
293 query: raw_query.to_string(),
294 mode: QueryMode::Sql,
295 statement: "graph_centrality",
296 engine: "runtime-graph",
297 result,
298 affected_rows: 0,
299 statement_type: "select",
300 })
301 }
302 GraphCommand::Community {
303 algorithm,
304 max_iterations,
305 limit,
306 order_by,
307 return_assignments,
308 } => {
309 let alg = parse_community_algorithm(algorithm)?;
310 let res =
311 self.graph_communities(alg, 1, Some(*max_iterations as usize), None, None)?;
312 let result = if *return_assignments {
313 let mut result =
319 UnifiedResult::with_columns(vec!["node_id".into(), "community_id".into()]);
320 let row_cap = limit.map(|n| n as usize);
321 'outer: for community in &res.communities {
322 let mut nodes = community.nodes.clone();
323 nodes.sort();
324 for node_id in nodes {
325 if row_cap.is_some_and(|cap| result.records.len() >= cap) {
326 break 'outer;
327 }
328 let mut record = UnifiedRecord::new();
329 record.set("node_id", Value::text(node_id));
330 record.set("community_id", Value::text(community.id.clone()));
331 result.push(record);
332 }
333 }
334 result
335 } else {
336 let mut result =
337 UnifiedResult::with_columns(vec!["community_id".into(), "size".into()]);
338 for community in &res.communities {
339 let mut record = UnifiedRecord::new();
340 record.set("community_id", Value::text(community.id.clone()));
341 record.set("size", Value::Integer(community.size as i64));
342 result.push(record);
343 }
344 apply_graph_order_and_limit(
345 &mut result,
346 "graph_community",
347 order_by.as_ref(),
348 limit.map(|n| n as usize),
349 )?;
350 result
351 };
352 Ok(RuntimeQueryResult {
353 query: raw_query.to_string(),
354 mode: QueryMode::Sql,
355 statement: "graph_community",
356 engine: "runtime-graph",
357 result,
358 affected_rows: 0,
359 statement_type: "select",
360 })
361 }
362 GraphCommand::Components {
363 mode,
364 limit,
365 order_by,
366 } => {
367 let m = parse_components_mode(mode)?;
368 let res = self.graph_components(m, 1, None)?;
369 let mut result =
370 UnifiedResult::with_columns(vec!["component_id".into(), "size".into()]);
371 for component in &res.components {
372 let mut record = UnifiedRecord::new();
373 record.set("component_id", Value::text(component.id.clone()));
374 record.set("size", Value::Integer(component.size as i64));
375 result.push(record);
376 }
377 apply_graph_order_and_limit(
378 &mut result,
379 "graph_components",
380 order_by.as_ref(),
381 limit.map(|n| n as usize),
382 )?;
383 Ok(RuntimeQueryResult {
384 query: raw_query.to_string(),
385 mode: QueryMode::Sql,
386 statement: "graph_components",
387 engine: "runtime-graph",
388 result,
389 affected_rows: 0,
390 statement_type: "select",
391 })
392 }
393 GraphCommand::Cycles { max_length } => {
394 let res = self.graph_cycles(*max_length as usize, 100, None)?;
395 let mut result =
396 UnifiedResult::with_columns(vec!["cycle_index".into(), "length".into()]);
397 for (i, cycle) in res.cycles.iter().enumerate() {
398 let mut record = UnifiedRecord::new();
399 record.set("cycle_index", Value::Integer(i as i64));
400 record.set("length", Value::Integer(cycle.nodes.len() as i64));
401 result.push(record);
402 }
403 Ok(RuntimeQueryResult {
404 query: raw_query.to_string(),
405 mode: QueryMode::Sql,
406 statement: "graph_cycles",
407 engine: "runtime-graph",
408 result,
409 affected_rows: 0,
410 statement_type: "select",
411 })
412 }
413 GraphCommand::Clustering => {
414 let res = self.graph_clustering(100, true, None)?;
415 let mut result = UnifiedResult::with_columns(vec![
416 "node_id".into(),
417 "label".into(),
418 "score".into(),
419 ]);
420 let mut global_record = UnifiedRecord::new();
422 global_record.set("node_id", Value::text("__global__"));
423 global_record.set("label", Value::text("global_clustering"));
424 global_record.set("score", Value::Float(res.global));
425 result.push(global_record);
426 for score in &res.local {
427 let mut record = UnifiedRecord::new();
428 record.set("node_id", Value::text(score.node.id.clone()));
429 record.set("label", Value::text(score.node.label.clone()));
430 record.set("score", Value::Float(score.score));
431 result.push(record);
432 }
433 Ok(RuntimeQueryResult {
434 query: raw_query.to_string(),
435 mode: QueryMode::Sql,
436 statement: "graph_clustering",
437 engine: "runtime-graph",
438 result,
439 affected_rows: 0,
440 statement_type: "select",
441 })
442 }
443 GraphCommand::TopologicalSort => {
444 let res = self.graph_topological_sort(None)?;
445 let mut result = UnifiedResult::with_columns(vec![
446 "order".into(),
447 "node_id".into(),
448 "label".into(),
449 ]);
450 for (i, node) in res.ordered_nodes.iter().enumerate() {
451 let mut record = UnifiedRecord::new();
452 record.set("order", Value::Integer(i as i64));
453 record.set("node_id", Value::text(node.id.clone()));
454 record.set("label", Value::text(node.label.clone()));
455 result.push(record);
456 }
457 Ok(RuntimeQueryResult {
458 query: raw_query.to_string(),
459 mode: QueryMode::Sql,
460 statement: "graph_topological_sort",
461 engine: "runtime-graph",
462 result,
463 affected_rows: 0,
464 statement_type: "select",
465 })
466 }
467 }
468 }
469
470 pub fn execute_search_command(
472 &self,
473 raw_query: &str,
474 cmd: &SearchCommand,
475 ) -> RedDBResult<RuntimeQueryResult> {
476 match cmd {
477 SearchCommand::Similar {
478 vector,
479 text,
480 provider,
481 collection,
482 limit,
483 min_score,
484 vector_param,
485 limit_param,
486 min_score_param,
487 text_param,
488 } => {
489 if vector_param.is_some() {
490 return Err(RedDBError::Query(
491 "SEARCH SIMILAR $N vector parameter was not bound before execution"
492 .to_string(),
493 ));
494 }
495 if limit_param.is_some() {
496 return Err(RedDBError::Query(
497 "SEARCH SIMILAR LIMIT $N parameter was not bound before execution"
498 .to_string(),
499 ));
500 }
501 if min_score_param.is_some() {
502 return Err(RedDBError::Query(
503 "SEARCH SIMILAR MIN_SCORE $N parameter was not bound before execution"
504 .to_string(),
505 ));
506 }
507 if text_param.is_some() {
508 return Err(RedDBError::Query(
509 "SEARCH SIMILAR TEXT $N parameter was not bound before execution"
510 .to_string(),
511 ));
512 }
513 let search_vector = if let Some(query_text) = text {
515 let (default_provider, _) = crate::ai::resolve_defaults_from_runtime(self);
516 let provider = match provider.as_deref() {
517 Some(p) => crate::ai::parse_provider(p)?,
518 None => default_provider,
519 };
520 let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
521 let model = std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
522 .ok()
523 .unwrap_or_else(|| provider.default_embedding_model().to_string());
524 let transport = crate::runtime::ai::transport::AiTransport::from_runtime(self);
525 let request = crate::ai::OpenAiEmbeddingRequest {
526 api_key,
527 model,
528 inputs: vec![query_text.clone()],
529 dimensions: None,
530 api_base: provider.resolve_api_base(),
531 };
532 let response = crate::runtime::ai::block_on_ai(async move {
533 crate::ai::openai_embeddings_async(&transport, request).await
534 })
535 .and_then(|result| result)?;
536 response.embeddings.into_iter().next().ok_or_else(|| {
537 RedDBError::Query("embedding API returned no vectors".to_string())
538 })?
539 } else {
540 vector.clone()
541 };
542 let scope = self.ai_scope();
546 let results =
547 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
548 crate::runtime::authorized_search::AuthorizedSearch::execute_similar(
549 self,
550 &scope,
551 collection,
552 &search_vector,
553 *limit,
554 *min_score,
555 )?
556 } else {
557 self.search_similar(collection, &search_vector, *limit, *min_score)?
559 };
560 let mut result =
561 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
562 for sr in &results {
563 let mut record = UnifiedRecord::new();
564 record.set("entity_id", Value::UnsignedInteger(sr.entity_id.raw()));
565 record.set("score", Value::Float(sr.score as f64));
566 result.push(record);
567 }
568 Ok(RuntimeQueryResult {
569 query: raw_query.to_string(),
570 mode: QueryMode::Sql,
571 statement: "search_similar",
572 engine: "runtime-search",
573 result,
574 affected_rows: 0,
575 statement_type: "select",
576 })
577 }
578 SearchCommand::Text {
579 query,
580 collection,
581 limit,
582 fuzzy,
583 limit_param,
584 } => {
585 if limit_param.is_some() {
586 return Err(RedDBError::Query(
587 "SEARCH TEXT LIMIT $N parameter was not bound before execution".to_string(),
588 ));
589 }
590 let collections = collection.as_ref().map(|c| vec![c.clone()]);
591 let scope = self.ai_scope();
593 let res =
594 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
595 crate::runtime::authorized_search::AuthorizedSearch::execute_text(
596 self,
597 &scope,
598 query.clone(),
599 collections,
600 None,
601 None,
602 None,
603 Some(*limit),
604 *fuzzy,
605 )?
606 } else {
607 self.search_text(
608 query.clone(),
609 collections,
610 None,
611 None,
612 None,
613 Some(*limit),
614 *fuzzy,
615 )?
616 };
617 let mut result =
618 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
619 for item in &res.matches {
620 let mut record = UnifiedRecord::new();
621 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
622 record.set("score", Value::Float(item.score as f64));
623 result.push(record);
624 }
625 Ok(RuntimeQueryResult {
626 query: raw_query.to_string(),
627 mode: QueryMode::Sql,
628 statement: "search_text",
629 engine: "runtime-search",
630 result,
631 affected_rows: 0,
632 statement_type: "select",
633 })
634 }
635 SearchCommand::Hybrid {
636 vector,
637 query,
638 collection,
639 limit,
640 limit_param,
641 } => {
642 if limit_param.is_some() {
643 return Err(RedDBError::Query(
644 "SEARCH HYBRID LIMIT $N parameter was not bound before execution"
645 .to_string(),
646 ));
647 }
648 let res = self.search_hybrid(
649 vector.clone(),
650 query.clone(),
651 Some(*limit),
652 Some(vec![collection.clone()]),
653 None,
654 None,
655 None,
656 Vec::new(),
657 None,
658 None,
659 Some(*limit),
660 )?;
661 let mut result =
662 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
663 for item in &res.matches {
664 let mut record = UnifiedRecord::new();
665 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
666 record.set("score", Value::Float(item.score as f64));
667 result.push(record);
668 }
669 Ok(RuntimeQueryResult {
670 query: raw_query.to_string(),
671 mode: QueryMode::Sql,
672 statement: "search_hybrid",
673 engine: "runtime-search",
674 result,
675 affected_rows: 0,
676 statement_type: "select",
677 })
678 }
679 SearchCommand::Multimodal {
680 query,
681 collection,
682 limit,
683 limit_param,
684 } => {
685 if limit_param.is_some() {
686 return Err(RedDBError::Query(
687 "SEARCH MULTIMODAL LIMIT $N parameter was not bound before execution"
688 .to_string(),
689 ));
690 }
691 let collections = collection.as_ref().map(|c| vec![c.clone()]);
692 let res =
693 self.search_multimodal(query.clone(), collections, None, None, Some(*limit))?;
694 let mut result =
695 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
696 for item in &res.matches {
697 let mut record = UnifiedRecord::new();
698 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
699 record.set("score", Value::Float(item.score as f64));
700 result.push(record);
701 }
702 Ok(RuntimeQueryResult {
703 query: raw_query.to_string(),
704 mode: QueryMode::Sql,
705 statement: "search_multimodal",
706 engine: "runtime-search",
707 result,
708 affected_rows: 0,
709 statement_type: "select",
710 })
711 }
712 SearchCommand::Index {
713 index,
714 value,
715 collection,
716 limit,
717 exact,
718 limit_param,
719 } => {
720 if limit_param.is_some() {
721 return Err(RedDBError::Query(
722 "SEARCH INDEX LIMIT $N parameter was not bound before execution"
723 .to_string(),
724 ));
725 }
726 let collections = collection.as_ref().map(|c| vec![c.clone()]);
727 let res = self.search_index(
728 index.clone(),
729 value.clone(),
730 *exact,
731 collections,
732 None,
733 None,
734 Some(*limit),
735 )?;
736 let mut result =
737 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
738 for item in &res.matches {
739 let mut record = UnifiedRecord::new();
740 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
741 record.set("score", Value::Float(item.score as f64));
742 result.push(record);
743 }
744 Ok(RuntimeQueryResult {
745 query: raw_query.to_string(),
746 mode: QueryMode::Sql,
747 statement: "search_index",
748 engine: "runtime-search",
749 result,
750 affected_rows: 0,
751 statement_type: "select",
752 })
753 }
754 SearchCommand::Context {
755 query,
756 field,
757 collection,
758 limit,
759 depth,
760 limit_param,
761 } => {
762 if limit_param.is_some() {
763 return Err(RedDBError::Query(
764 "SEARCH CONTEXT LIMIT $N parameter was not bound before execution"
765 .to_string(),
766 ));
767 }
768 use crate::application::SearchContextInput;
769 let input = SearchContextInput {
773 query: query.clone(),
774 field: field.clone(),
775 vector: None,
776 collections: collection.as_ref().map(|c| vec![c.clone()]),
777 graph_depth: Some(*depth),
778 graph_max_edges: None,
779 max_cross_refs: None,
780 follow_cross_refs: None,
781 expand_graph: None,
782 global_scan: None,
783 reindex: None,
784 limit: Some(*limit),
785 min_score: None,
786 };
787 let scope = self.ai_scope();
788 let res =
789 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
790 crate::runtime::authorized_search::AuthorizedSearch::execute_context(
791 self, &scope, input,
792 )?
793 } else {
794 self.search_context(input)?
795 };
796 let mut result = UnifiedResult::with_columns(vec![
797 "entity_id".into(),
798 "collection".into(),
799 "score".into(),
800 "discovery".into(),
801 "kind".into(),
802 ]);
803 let all_entities = res
804 .tables
805 .iter()
806 .map(|e| (e, "table"))
807 .chain(res.graph.nodes.iter().map(|e| (e, "graph_node")))
808 .chain(res.graph.edges.iter().map(|e| (e, "graph_edge")))
809 .chain(res.vectors.iter().map(|e| (e, "vector")))
810 .chain(res.documents.iter().map(|e| (e, "document")))
811 .chain(res.key_values.iter().map(|e| (e, "kv")));
812 for (entity, kind) in all_entities {
813 let mut record = UnifiedRecord::new();
814 record.set("entity_id", Value::UnsignedInteger(entity.entity.id.raw()));
815 record.set("collection", Value::text(entity.collection.clone()));
816 record.set("score", Value::Float(entity.score as f64));
817 record.set("discovery", Value::text(format!("{:?}", entity.discovery)));
818 record.set("kind", Value::text(kind.to_string()));
819 result.push(record);
820 }
821 Ok(RuntimeQueryResult {
822 query: raw_query.to_string(),
823 mode: QueryMode::Sql,
824 statement: "search_context",
825 engine: "runtime-context",
826 result,
827 affected_rows: 0,
828 statement_type: "select",
829 })
830 }
831 SearchCommand::SpatialRadius {
832 center_lat,
833 center_lon,
834 radius_km,
835 collection,
836 column,
837 limit,
838 limit_param,
839 } => {
840 if limit_param.is_some() {
841 return Err(RedDBError::Query(
842 "SEARCH SPATIAL RADIUS LIMIT $N parameter was not bound before execution"
843 .to_string(),
844 ));
845 }
846 use crate::storage::unified::spatial_index::haversine_km;
847 let _ = column; let store = self.inner.db.store();
849 let entities = store
850 .get_collection(collection)
851 .map(|m| m.query_all(|_| true))
852 .unwrap_or_default();
853
854 let mut hits: Vec<(u64, f64)> = Vec::new();
855 for entity in &entities {
856 if let Some((lat, lon)) = extract_geo_from_entity(entity) {
858 let dist = haversine_km(*center_lat, *center_lon, lat, lon);
859 if dist <= *radius_km {
860 hits.push((entity.id.raw(), dist));
861 }
862 }
863 }
864 hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
865 hits.truncate(*limit);
866
867 let mut result =
868 UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
869 for (id, dist) in &hits {
870 let mut record = UnifiedRecord::new();
871 record.set("entity_id", Value::UnsignedInteger(*id));
872 record.set("distance_km", Value::Float(*dist));
873 result.push(record);
874 }
875 Ok(RuntimeQueryResult {
876 query: raw_query.to_string(),
877 mode: QueryMode::Sql,
878 statement: "search_spatial_radius",
879 engine: "runtime-spatial",
880 result,
881 affected_rows: 0,
882 statement_type: "select",
883 })
884 }
885 SearchCommand::SpatialBbox {
886 min_lat,
887 min_lon,
888 max_lat,
889 max_lon,
890 collection,
891 column,
892 limit,
893 limit_param,
894 } => {
895 if limit_param.is_some() {
896 return Err(RedDBError::Query(
897 "SEARCH SPATIAL BBOX LIMIT $N parameter was not bound before execution"
898 .to_string(),
899 ));
900 }
901 let _ = column;
902 let store = self.inner.db.store();
903 let entities = store
904 .get_collection(collection)
905 .map(|m| m.query_all(|_| true))
906 .unwrap_or_default();
907
908 let mut result = UnifiedResult::with_columns(vec!["entity_id".into()]);
909 let mut count = 0;
910 for entity in &entities {
911 if count >= *limit {
912 break;
913 }
914 if let Some((lat, lon)) = extract_geo_from_entity(entity) {
915 if lat >= *min_lat && lat <= *max_lat && lon >= *min_lon && lon <= *max_lon
916 {
917 let mut record = UnifiedRecord::new();
918 record.set("entity_id", Value::UnsignedInteger(entity.id.raw()));
919 result.push(record);
920 count += 1;
921 }
922 }
923 }
924 Ok(RuntimeQueryResult {
925 query: raw_query.to_string(),
926 mode: QueryMode::Sql,
927 statement: "search_spatial_bbox",
928 engine: "runtime-spatial",
929 result,
930 affected_rows: 0,
931 statement_type: "select",
932 })
933 }
934 SearchCommand::SpatialNearest {
935 lat,
936 lon,
937 k,
938 collection,
939 column,
940 k_param,
941 } => {
942 if k_param.is_some() {
943 return Err(RedDBError::Query(
944 "SEARCH SPATIAL NEAREST K $N parameter was not bound before execution"
945 .to_string(),
946 ));
947 }
948 use crate::storage::unified::spatial_index::haversine_km;
949 let _ = column;
950 let store = self.inner.db.store();
951 let entities = store
952 .get_collection(collection)
953 .map(|m| m.query_all(|_| true))
954 .unwrap_or_default();
955
956 let mut hits: Vec<(u64, f64)> = Vec::new();
957 for entity in &entities {
958 if let Some((elat, elon)) = extract_geo_from_entity(entity) {
959 let dist = haversine_km(*lat, *lon, elat, elon);
960 hits.push((entity.id.raw(), dist));
961 }
962 }
963 hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
964 hits.truncate(*k);
965
966 let mut result =
967 UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
968 for (id, dist) in &hits {
969 let mut record = UnifiedRecord::new();
970 record.set("entity_id", Value::UnsignedInteger(*id));
971 record.set("distance_km", Value::Float(*dist));
972 result.push(record);
973 }
974 Ok(RuntimeQueryResult {
975 query: raw_query.to_string(),
976 mode: QueryMode::Sql,
977 statement: "search_spatial_nearest",
978 engine: "runtime-spatial",
979 result,
980 affected_rows: 0,
981 statement_type: "select",
982 })
983 }
984 }
985 }
986}
987
988fn apply_graph_order_and_limit(
989 result: &mut UnifiedResult,
990 statement: &str,
991 order_by: Option<&GraphCommandOrderBy>,
992 limit: Option<usize>,
993) -> RedDBResult<()> {
994 if let Some(order) = order_by {
995 let column = graph_order_metric_column(statement, &order.metric)?;
996 let columns = result.columns.clone();
997 result.records.sort_by(|left, right| {
998 let cmp = compare_graph_values(left.get(column), right.get(column));
999 let cmp = if order.ascending { cmp } else { cmp.reverse() };
1000 if cmp == Ordering::Equal {
1001 compare_graph_rows(left, right, &columns)
1002 } else {
1003 cmp
1004 }
1005 });
1006 }
1007 if let Some(limit) = limit {
1008 result.records.truncate(limit);
1009 }
1010 Ok(())
1011}
1012
1013fn graph_order_metric_column(statement: &str, metric: &str) -> RedDBResult<&'static str> {
1014 let metric = metric.to_ascii_lowercase();
1015 match (statement, metric.as_str()) {
1016 ("graph_centrality", "score" | "centrality_score") => Ok("score"),
1017 ("graph_community", "size" | "community_size") => Ok("size"),
1018 ("graph_components", "size" | "component_size") => Ok("size"),
1019 ("graph_shortest_path", "hop_count" | "total_weight" | "nodes_visited") => {
1020 Ok(match metric.as_str() {
1021 "total_weight" => "total_weight",
1022 "nodes_visited" => "nodes_visited",
1023 _ => "hop_count",
1024 })
1025 }
1026 _ => Err(RedDBError::Query(format!(
1027 "unsupported ORDER BY metric '{metric}' for GRAPH {}",
1028 statement.trim_start_matches("graph_")
1029 ))),
1030 }
1031}
1032
1033fn compare_graph_rows(left: &UnifiedRecord, right: &UnifiedRecord, columns: &[String]) -> Ordering {
1034 for column in columns {
1035 let cmp = compare_graph_values(left.get(column), right.get(column));
1036 if cmp != Ordering::Equal {
1037 return cmp;
1038 }
1039 }
1040 Ordering::Equal
1041}
1042
1043fn compare_graph_values(left: Option<&Value>, right: Option<&Value>) -> Ordering {
1044 match (left, right) {
1045 (None, None) => Ordering::Equal,
1046 (None, Some(_)) => Ordering::Less,
1047 (Some(_), None) => Ordering::Greater,
1048 (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
1049 (Some(Value::Null), Some(_)) => Ordering::Less,
1050 (Some(_), Some(Value::Null)) => Ordering::Greater,
1051 (Some(Value::Integer(left)), Some(Value::Integer(right))) => left.cmp(right),
1052 (Some(Value::UnsignedInteger(left)), Some(Value::UnsignedInteger(right))) => {
1053 left.cmp(right)
1054 }
1055 (Some(Value::Float(left)), Some(Value::Float(right))) => {
1056 left.partial_cmp(right).unwrap_or(Ordering::Equal)
1057 }
1058 (Some(Value::Integer(left)), Some(Value::Float(right))) => {
1059 (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1060 }
1061 (Some(Value::Float(left)), Some(Value::Integer(right))) => left
1062 .partial_cmp(&(*right as f64))
1063 .unwrap_or(Ordering::Equal),
1064 (Some(Value::UnsignedInteger(left)), Some(Value::Float(right))) => {
1065 (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1066 }
1067 (Some(Value::Float(left)), Some(Value::UnsignedInteger(right))) => left
1068 .partial_cmp(&(*right as f64))
1069 .unwrap_or(Ordering::Equal),
1070 (Some(Value::Integer(left)), Some(Value::UnsignedInteger(right))) => {
1071 (*left as i128).cmp(&(*right as i128))
1072 }
1073 (Some(Value::UnsignedInteger(left)), Some(Value::Integer(right))) => {
1074 (*left as i128).cmp(&(*right as i128))
1075 }
1076 (Some(Value::Timestamp(left)), Some(Value::Timestamp(right))) => left.cmp(right),
1077 (Some(Value::Text(left)), Some(Value::Text(right))) => left.cmp(right),
1078 (Some(Value::Boolean(left)), Some(Value::Boolean(right))) => left.cmp(right),
1079 (Some(left), Some(right)) => format!("{left:?}").cmp(&format!("{right:?}")),
1080 }
1081}
1082
1083fn parse_direction(s: &str) -> RedDBResult<RuntimeGraphDirection> {
1088 match s.to_lowercase().as_str() {
1089 "outgoing" | "out" => Ok(RuntimeGraphDirection::Outgoing),
1090 "incoming" | "in" => Ok(RuntimeGraphDirection::Incoming),
1091 "both" | "any" => Ok(RuntimeGraphDirection::Both),
1092 _ => Err(RedDBError::Query(format!(
1093 "unknown direction: '{s}', expected outgoing|incoming|both"
1094 ))),
1095 }
1096}
1097
1098fn parse_path_algorithm(s: &str) -> RedDBResult<RuntimeGraphPathAlgorithm> {
1099 match s.to_lowercase().as_str() {
1100 "bfs" => Ok(RuntimeGraphPathAlgorithm::Bfs),
1101 "dijkstra" => Ok(RuntimeGraphPathAlgorithm::Dijkstra),
1102 "astar" | "a*" => Ok(RuntimeGraphPathAlgorithm::AStar),
1103 "bellman_ford" | "bellmanford" => Ok(RuntimeGraphPathAlgorithm::BellmanFord),
1104 _ => Err(RedDBError::Query(format!(
1105 "unknown path algorithm: '{s}', expected bfs|dijkstra|astar|bellman_ford"
1106 ))),
1107 }
1108}
1109
1110fn parse_traversal_strategy(s: &str) -> RedDBResult<RuntimeGraphTraversalStrategy> {
1111 match s.to_lowercase().as_str() {
1112 "bfs" => Ok(RuntimeGraphTraversalStrategy::Bfs),
1113 "dfs" => Ok(RuntimeGraphTraversalStrategy::Dfs),
1114 _ => Err(RedDBError::Query(format!(
1115 "unknown traversal strategy: '{s}', expected bfs|dfs"
1116 ))),
1117 }
1118}
1119
1120fn parse_centrality_algorithm(s: &str) -> RedDBResult<RuntimeGraphCentralityAlgorithm> {
1121 match s.to_lowercase().as_str() {
1122 "degree" => Ok(RuntimeGraphCentralityAlgorithm::Degree),
1123 "closeness" => Ok(RuntimeGraphCentralityAlgorithm::Closeness),
1124 "betweenness" => Ok(RuntimeGraphCentralityAlgorithm::Betweenness),
1125 "eigenvector" => Ok(RuntimeGraphCentralityAlgorithm::Eigenvector),
1126 "pagerank" | "page_rank" => Ok(RuntimeGraphCentralityAlgorithm::PageRank),
1127 _ => Err(RedDBError::Query(format!(
1128 "unknown centrality algorithm: '{s}', expected degree|closeness|betweenness|eigenvector|pagerank"
1129 ))),
1130 }
1131}
1132
1133fn parse_community_algorithm(s: &str) -> RedDBResult<RuntimeGraphCommunityAlgorithm> {
1134 match s.to_lowercase().as_str() {
1135 "label_propagation" | "labelpropagation" => {
1136 Ok(RuntimeGraphCommunityAlgorithm::LabelPropagation)
1137 }
1138 "louvain" => Ok(RuntimeGraphCommunityAlgorithm::Louvain),
1139 _ => Err(RedDBError::Query(format!(
1140 "unknown community algorithm: '{s}', expected label_propagation|louvain"
1141 ))),
1142 }
1143}
1144
1145fn parse_components_mode(s: &str) -> RedDBResult<RuntimeGraphComponentsMode> {
1146 match s.to_lowercase().as_str() {
1147 "connected" => Ok(RuntimeGraphComponentsMode::Connected),
1148 "weak" | "weakly_connected" => Ok(RuntimeGraphComponentsMode::Weak),
1149 "strong" | "strongly_connected" => Ok(RuntimeGraphComponentsMode::Strong),
1150 _ => Err(RedDBError::Query(format!(
1151 "unknown components mode: '{s}', expected connected|weak|strong"
1152 ))),
1153 }
1154}
1155
1156fn extract_geo_from_entity(entity: &UnifiedEntity) -> Option<(f64, f64)> {
1161 match &entity.data {
1162 EntityData::Row(row) => {
1163 if let Some(ref named) = row.named {
1165 for value in named.values() {
1167 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1168 return Some((
1169 *lat_micro as f64 / 1_000_000.0,
1170 *lon_micro as f64 / 1_000_000.0,
1171 ));
1172 }
1173 }
1174 let lat =
1176 named
1177 .get("lat")
1178 .or_else(|| named.get("latitude"))
1179 .and_then(|v| match v {
1180 Value::Float(f) => Some(*f),
1181 Value::Integer(i) => Some(*i as f64),
1182 _ => None,
1183 });
1184 let lon = named
1185 .get("lon")
1186 .or_else(|| named.get("lng"))
1187 .or_else(|| named.get("longitude"))
1188 .and_then(|v| match v {
1189 Value::Float(f) => Some(*f),
1190 Value::Integer(i) => Some(*i as f64),
1191 _ => None,
1192 });
1193 if let (Some(la), Some(lo)) = (lat, lon) {
1194 return Some((la, lo));
1195 }
1196 }
1197 for value in &row.columns {
1199 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1200 return Some((
1201 *lat_micro as f64 / 1_000_000.0,
1202 *lon_micro as f64 / 1_000_000.0,
1203 ));
1204 }
1205 }
1206 None
1207 }
1208 EntityData::Node(node) => {
1209 for value in node.properties.values() {
1211 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1212 return Some((
1213 *lat_micro as f64 / 1_000_000.0,
1214 *lon_micro as f64 / 1_000_000.0,
1215 ));
1216 }
1217 }
1218 let lat = node
1219 .properties
1220 .get("lat")
1221 .or_else(|| node.properties.get("latitude"))
1222 .and_then(|v| match v {
1223 Value::Float(f) => Some(*f),
1224 Value::Integer(i) => Some(*i as f64),
1225 _ => None,
1226 });
1227 let lon = node
1228 .properties
1229 .get("lon")
1230 .or_else(|| node.properties.get("lng"))
1231 .or_else(|| node.properties.get("longitude"))
1232 .and_then(|v| match v {
1233 Value::Float(f) => Some(*f),
1234 Value::Integer(i) => Some(*i as f64),
1235 _ => None,
1236 });
1237 if let (Some(la), Some(lo)) = (lat, lon) {
1238 return Some((la, lo));
1239 }
1240 None
1241 }
1242 _ => None,
1243 }
1244}