1use super::*;
8use crate::storage::query::ast::GraphCommandOrderBy;
9use std::cmp::Ordering;
10
11impl RedDBRuntime {
12 pub fn execute_graph_command(
14 &self,
15 raw_query: &str,
16 cmd: &GraphCommand,
17 ) -> RedDBResult<RuntimeQueryResult> {
18 match cmd {
19 GraphCommand::Neighborhood {
20 source,
21 depth,
22 direction,
23 edge_labels,
24 } => {
25 let dir = parse_direction(direction)?;
26 let res = self.graph_neighborhood(
27 source,
28 dir,
29 *depth as usize,
30 edge_labels.clone(),
31 None,
32 )?;
33 let mut result = UnifiedResult::with_columns(vec![
34 "node_id".into(),
35 "label".into(),
36 "node_type".into(),
37 "depth".into(),
38 ]);
39 for visit in &res.nodes {
40 let mut record = UnifiedRecord::new();
41 record.set("node_id", Value::text(visit.node.id.clone()));
42 record.set("label", Value::text(visit.node.label.clone()));
43 record.set("node_type", Value::text(visit.node.node_type.clone()));
44 record.set("depth", Value::Integer(visit.depth as i64));
45 result.push(record);
46 }
47 Ok(RuntimeQueryResult {
48 query: raw_query.to_string(),
49 mode: QueryMode::Sql,
50 statement: "graph_neighborhood",
51 engine: "runtime-graph",
52 result,
53 affected_rows: 0,
54 statement_type: "select",
55 })
56 }
57 GraphCommand::ShortestPath {
58 source,
59 target,
60 algorithm,
61 direction,
62 limit,
63 order_by,
64 } => {
65 let dir = parse_direction(direction)?;
66 let alg = parse_path_algorithm(algorithm)?;
67 let res = self.graph_shortest_path(source, target, dir, alg, None, None)?;
68 let mut result = UnifiedResult::with_columns(vec![
69 "source".into(),
70 "target".into(),
71 "nodes_visited".into(),
72 "negative_cycle_detected".into(),
73 "hop_count".into(),
74 "total_weight".into(),
75 ]);
76 let mut record = UnifiedRecord::new();
77 record.set("source", Value::text(res.source));
78 record.set("target", Value::text(res.target));
79 record.set("nodes_visited", Value::Integer(res.nodes_visited as i64));
80 record.set(
81 "negative_cycle_detected",
82 match res.negative_cycle_detected {
83 Some(value) => Value::Boolean(value),
84 None => Value::Null,
85 },
86 );
87 if let Some(ref path) = res.path {
88 record.set("hop_count", Value::Integer(path.hop_count as i64));
89 record.set("total_weight", Value::Float(path.total_weight));
90 } else {
91 record.set("hop_count", Value::Null);
92 record.set("total_weight", Value::Null);
93 }
94 result.push(record);
95 apply_graph_order_and_limit(
96 &mut result,
97 "graph_shortest_path",
98 order_by.as_ref(),
99 limit.map(|n| n as usize),
100 )?;
101 Ok(RuntimeQueryResult {
102 query: raw_query.to_string(),
103 mode: QueryMode::Sql,
104 statement: "graph_shortest_path",
105 engine: "runtime-graph",
106 result,
107 affected_rows: 0,
108 statement_type: "select",
109 })
110 }
111 GraphCommand::Properties { source } => {
112 if let Some(node_ref) = source {
113 let graph = materialize_graph_with_projection(
117 self.inner.db.store().as_ref(),
118 None,
119 )?;
120 let resolved = resolve_graph_node_id(&graph, node_ref)?;
121 let stored = graph.get_node(&resolved).ok_or_else(|| {
122 RedDBError::NotFound(node_ref.to_string())
123 })?;
124 let all_props = materialize_graph_node_properties(
125 self.inner.db.store().as_ref(),
126 )?;
127 let props = all_props.get(&resolved).cloned().unwrap_or_default();
128
129 let mut prop_keys: Vec<&String> = props.keys().collect();
132 prop_keys.sort();
133 let mut columns: Vec<String> = Vec::with_capacity(3 + prop_keys.len());
134 columns.push("node_id".into());
135 columns.push("label".into());
136 columns.push("node_type".into());
137 for k in &prop_keys {
138 columns.push((*k).clone());
139 }
140 let mut result = UnifiedResult::with_columns(columns);
141 let mut record = UnifiedRecord::new();
142 record.set("node_id", Value::text(stored.id.clone()));
143 record.set("label", Value::text(stored.label.clone()));
144 record.set("node_type", Value::text(stored.node_type.clone()));
145 for k in &prop_keys {
146 if let Some(v) = props.get(*k) {
147 record.set(k.as_str(), v.clone());
148 }
149 }
150 result.push(record);
151 return Ok(RuntimeQueryResult {
152 query: raw_query.to_string(),
153 mode: QueryMode::Sql,
154 statement: "graph_properties",
155 engine: "runtime-graph",
156 result,
157 affected_rows: 0,
158 statement_type: "select",
159 });
160 }
161 let res = self.graph_properties(None)?;
162 let mut result = UnifiedResult::with_columns(vec![
163 "node_count".into(),
164 "edge_count".into(),
165 "is_connected".into(),
166 "is_complete".into(),
167 "is_cyclic".into(),
168 "density".into(),
169 ]);
170 let mut record = UnifiedRecord::new();
171 record.set("node_count", Value::Integer(res.node_count as i64));
172 record.set("edge_count", Value::Integer(res.edge_count as i64));
173 record.set("is_connected", Value::Boolean(res.is_connected));
174 record.set("is_complete", Value::Boolean(res.is_complete));
175 record.set("is_cyclic", Value::Boolean(res.is_cyclic));
176 record.set("density", Value::Float(res.density));
177 result.push(record);
178 Ok(RuntimeQueryResult {
179 query: raw_query.to_string(),
180 mode: QueryMode::Sql,
181 statement: "graph_properties",
182 engine: "runtime-graph",
183 result,
184 affected_rows: 0,
185 statement_type: "select",
186 })
187 }
188 GraphCommand::Traverse {
189 source,
190 strategy,
191 depth,
192 direction,
193 edge_labels,
194 } => {
195 let dir = parse_direction(direction)?;
196 let strat = parse_traversal_strategy(strategy)?;
197 let res = self.graph_traverse(
198 source,
199 dir,
200 *depth as usize,
201 strat,
202 edge_labels.clone(),
203 None,
204 )?;
205 let mut result = UnifiedResult::with_columns(vec![
206 "node_id".into(),
207 "label".into(),
208 "node_type".into(),
209 "depth".into(),
210 ]);
211 for visit in &res.visits {
212 let mut record = UnifiedRecord::new();
213 record.set("node_id", Value::text(visit.node.id.clone()));
214 record.set("label", Value::text(visit.node.label.clone()));
215 record.set("node_type", Value::text(visit.node.node_type.clone()));
216 record.set("depth", Value::Integer(visit.depth as i64));
217 result.push(record);
218 }
219 Ok(RuntimeQueryResult {
220 query: raw_query.to_string(),
221 mode: QueryMode::Sql,
222 statement: "graph_traverse",
223 engine: "runtime-graph",
224 result,
225 affected_rows: 0,
226 statement_type: "select",
227 })
228 }
229 GraphCommand::Centrality {
230 algorithm,
231 limit,
232 order_by,
233 } => {
234 let alg = parse_centrality_algorithm(algorithm)?;
235 let limit_usize = limit.map(|n| n as usize);
238 let order_needs_full_set = order_by
239 .as_ref()
240 .map(|order| order.ascending)
241 .unwrap_or(false);
242 let top_k = if order_needs_full_set {
243 usize::MAX
244 } else {
245 limit_usize.unwrap_or(100).max(1)
246 };
247 let res = self.graph_centrality(alg, top_k, false, None, None, None, None)?;
248 let mut result = UnifiedResult::with_columns(vec![
249 "node_id".into(),
250 "label".into(),
251 "score".into(),
252 ]);
253 for score in &res.scores {
254 let mut record = UnifiedRecord::new();
255 record.set("node_id", Value::text(score.node.id.clone()));
256 record.set("label", Value::text(score.node.label.clone()));
257 record.set("score", Value::Float(score.score));
258 result.push(record);
259 }
260 for ds in &res.degree_scores {
261 let mut record = UnifiedRecord::new();
262 record.set("node_id", Value::text(ds.node.id.clone()));
263 record.set("label", Value::text(ds.node.label.clone()));
264 record.set("score", Value::Float(ds.total_degree as f64));
265 result.push(record);
266 }
267 apply_graph_order_and_limit(
268 &mut result,
269 "graph_centrality",
270 order_by.as_ref(),
271 Some(limit_usize.unwrap_or(100)),
272 )?;
273 Ok(RuntimeQueryResult {
274 query: raw_query.to_string(),
275 mode: QueryMode::Sql,
276 statement: "graph_centrality",
277 engine: "runtime-graph",
278 result,
279 affected_rows: 0,
280 statement_type: "select",
281 })
282 }
283 GraphCommand::Community {
284 algorithm,
285 max_iterations,
286 limit,
287 order_by,
288 } => {
289 let alg = parse_community_algorithm(algorithm)?;
290 let res =
291 self.graph_communities(alg, 1, Some(*max_iterations as usize), None, None)?;
292 let mut result =
293 UnifiedResult::with_columns(vec!["community_id".into(), "size".into()]);
294 for community in &res.communities {
295 let mut record = UnifiedRecord::new();
296 record.set("community_id", Value::text(community.id.clone()));
297 record.set("size", Value::Integer(community.size as i64));
298 result.push(record);
299 }
300 apply_graph_order_and_limit(
301 &mut result,
302 "graph_community",
303 order_by.as_ref(),
304 limit.map(|n| n as usize),
305 )?;
306 Ok(RuntimeQueryResult {
307 query: raw_query.to_string(),
308 mode: QueryMode::Sql,
309 statement: "graph_community",
310 engine: "runtime-graph",
311 result,
312 affected_rows: 0,
313 statement_type: "select",
314 })
315 }
316 GraphCommand::Components {
317 mode,
318 limit,
319 order_by,
320 } => {
321 let m = parse_components_mode(mode)?;
322 let res = self.graph_components(m, 1, None)?;
323 let mut result =
324 UnifiedResult::with_columns(vec!["component_id".into(), "size".into()]);
325 for component in &res.components {
326 let mut record = UnifiedRecord::new();
327 record.set("component_id", Value::text(component.id.clone()));
328 record.set("size", Value::Integer(component.size as i64));
329 result.push(record);
330 }
331 apply_graph_order_and_limit(
332 &mut result,
333 "graph_components",
334 order_by.as_ref(),
335 limit.map(|n| n as usize),
336 )?;
337 Ok(RuntimeQueryResult {
338 query: raw_query.to_string(),
339 mode: QueryMode::Sql,
340 statement: "graph_components",
341 engine: "runtime-graph",
342 result,
343 affected_rows: 0,
344 statement_type: "select",
345 })
346 }
347 GraphCommand::Cycles { max_length } => {
348 let res = self.graph_cycles(*max_length as usize, 100, None)?;
349 let mut result =
350 UnifiedResult::with_columns(vec!["cycle_index".into(), "length".into()]);
351 for (i, cycle) in res.cycles.iter().enumerate() {
352 let mut record = UnifiedRecord::new();
353 record.set("cycle_index", Value::Integer(i as i64));
354 record.set("length", Value::Integer(cycle.nodes.len() as i64));
355 result.push(record);
356 }
357 Ok(RuntimeQueryResult {
358 query: raw_query.to_string(),
359 mode: QueryMode::Sql,
360 statement: "graph_cycles",
361 engine: "runtime-graph",
362 result,
363 affected_rows: 0,
364 statement_type: "select",
365 })
366 }
367 GraphCommand::Clustering => {
368 let res = self.graph_clustering(100, true, None)?;
369 let mut result = UnifiedResult::with_columns(vec![
370 "node_id".into(),
371 "label".into(),
372 "score".into(),
373 ]);
374 let mut global_record = UnifiedRecord::new();
376 global_record.set("node_id", Value::text("__global__"));
377 global_record.set("label", Value::text("global_clustering"));
378 global_record.set("score", Value::Float(res.global));
379 result.push(global_record);
380 for score in &res.local {
381 let mut record = UnifiedRecord::new();
382 record.set("node_id", Value::text(score.node.id.clone()));
383 record.set("label", Value::text(score.node.label.clone()));
384 record.set("score", Value::Float(score.score));
385 result.push(record);
386 }
387 Ok(RuntimeQueryResult {
388 query: raw_query.to_string(),
389 mode: QueryMode::Sql,
390 statement: "graph_clustering",
391 engine: "runtime-graph",
392 result,
393 affected_rows: 0,
394 statement_type: "select",
395 })
396 }
397 GraphCommand::TopologicalSort => {
398 let res = self.graph_topological_sort(None)?;
399 let mut result = UnifiedResult::with_columns(vec![
400 "order".into(),
401 "node_id".into(),
402 "label".into(),
403 ]);
404 for (i, node) in res.ordered_nodes.iter().enumerate() {
405 let mut record = UnifiedRecord::new();
406 record.set("order", Value::Integer(i as i64));
407 record.set("node_id", Value::text(node.id.clone()));
408 record.set("label", Value::text(node.label.clone()));
409 result.push(record);
410 }
411 Ok(RuntimeQueryResult {
412 query: raw_query.to_string(),
413 mode: QueryMode::Sql,
414 statement: "graph_topological_sort",
415 engine: "runtime-graph",
416 result,
417 affected_rows: 0,
418 statement_type: "select",
419 })
420 }
421 }
422 }
423
424 pub fn execute_search_command(
426 &self,
427 raw_query: &str,
428 cmd: &SearchCommand,
429 ) -> RedDBResult<RuntimeQueryResult> {
430 match cmd {
431 SearchCommand::Similar {
432 vector,
433 text,
434 provider,
435 collection,
436 limit,
437 min_score,
438 vector_param,
439 limit_param,
440 min_score_param,
441 text_param,
442 } => {
443 if vector_param.is_some() {
444 return Err(RedDBError::Query(
445 "SEARCH SIMILAR $N vector parameter was not bound before execution"
446 .to_string(),
447 ));
448 }
449 if limit_param.is_some() {
450 return Err(RedDBError::Query(
451 "SEARCH SIMILAR LIMIT $N parameter was not bound before execution"
452 .to_string(),
453 ));
454 }
455 if min_score_param.is_some() {
456 return Err(RedDBError::Query(
457 "SEARCH SIMILAR MIN_SCORE $N parameter was not bound before execution"
458 .to_string(),
459 ));
460 }
461 if text_param.is_some() {
462 return Err(RedDBError::Query(
463 "SEARCH SIMILAR TEXT $N parameter was not bound before execution"
464 .to_string(),
465 ));
466 }
467 let search_vector = if let Some(query_text) = text {
469 let (default_provider, _) = crate::ai::resolve_defaults_from_runtime(self);
470 let provider = match provider.as_deref() {
471 Some(p) => crate::ai::parse_provider(p)?,
472 None => default_provider,
473 };
474 let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
475 let model = std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
476 .ok()
477 .unwrap_or_else(|| provider.default_embedding_model().to_string());
478 let transport = crate::runtime::ai::transport::AiTransport::from_runtime(self);
479 let request = crate::ai::OpenAiEmbeddingRequest {
480 api_key,
481 model,
482 inputs: vec![query_text.clone()],
483 dimensions: None,
484 api_base: provider.resolve_api_base(),
485 };
486 let response = crate::runtime::ai::block_on_ai(async move {
487 crate::ai::openai_embeddings_async(&transport, request).await
488 })
489 .and_then(|result| result)?;
490 response.embeddings.into_iter().next().ok_or_else(|| {
491 RedDBError::Query("embedding API returned no vectors".to_string())
492 })?
493 } else {
494 vector.clone()
495 };
496 let scope = self.ai_scope();
500 let results =
501 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
502 crate::runtime::authorized_search::AuthorizedSearch::execute_similar(
503 self,
504 &scope,
505 collection,
506 &search_vector,
507 *limit,
508 *min_score,
509 )?
510 } else {
511 self.search_similar(collection, &search_vector, *limit, *min_score)?
513 };
514 let mut result =
515 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
516 for sr in &results {
517 let mut record = UnifiedRecord::new();
518 record.set("entity_id", Value::UnsignedInteger(sr.entity_id.raw()));
519 record.set("score", Value::Float(sr.score as f64));
520 result.push(record);
521 }
522 Ok(RuntimeQueryResult {
523 query: raw_query.to_string(),
524 mode: QueryMode::Sql,
525 statement: "search_similar",
526 engine: "runtime-search",
527 result,
528 affected_rows: 0,
529 statement_type: "select",
530 })
531 }
532 SearchCommand::Text {
533 query,
534 collection,
535 limit,
536 fuzzy,
537 limit_param,
538 } => {
539 if limit_param.is_some() {
540 return Err(RedDBError::Query(
541 "SEARCH TEXT LIMIT $N parameter was not bound before execution".to_string(),
542 ));
543 }
544 let collections = collection.as_ref().map(|c| vec![c.clone()]);
545 let scope = self.ai_scope();
547 let res =
548 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
549 crate::runtime::authorized_search::AuthorizedSearch::execute_text(
550 self,
551 &scope,
552 query.clone(),
553 collections,
554 None,
555 None,
556 None,
557 Some(*limit),
558 *fuzzy,
559 )?
560 } else {
561 self.search_text(
562 query.clone(),
563 collections,
564 None,
565 None,
566 None,
567 Some(*limit),
568 *fuzzy,
569 )?
570 };
571 let mut result =
572 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
573 for item in &res.matches {
574 let mut record = UnifiedRecord::new();
575 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
576 record.set("score", Value::Float(item.score as f64));
577 result.push(record);
578 }
579 Ok(RuntimeQueryResult {
580 query: raw_query.to_string(),
581 mode: QueryMode::Sql,
582 statement: "search_text",
583 engine: "runtime-search",
584 result,
585 affected_rows: 0,
586 statement_type: "select",
587 })
588 }
589 SearchCommand::Hybrid {
590 vector,
591 query,
592 collection,
593 limit,
594 limit_param,
595 } => {
596 if limit_param.is_some() {
597 return Err(RedDBError::Query(
598 "SEARCH HYBRID LIMIT $N parameter was not bound before execution"
599 .to_string(),
600 ));
601 }
602 let res = self.search_hybrid(
603 vector.clone(),
604 query.clone(),
605 Some(*limit),
606 Some(vec![collection.clone()]),
607 None,
608 None,
609 None,
610 Vec::new(),
611 None,
612 None,
613 Some(*limit),
614 )?;
615 let mut result =
616 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
617 for item in &res.matches {
618 let mut record = UnifiedRecord::new();
619 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
620 record.set("score", Value::Float(item.score as f64));
621 result.push(record);
622 }
623 Ok(RuntimeQueryResult {
624 query: raw_query.to_string(),
625 mode: QueryMode::Sql,
626 statement: "search_hybrid",
627 engine: "runtime-search",
628 result,
629 affected_rows: 0,
630 statement_type: "select",
631 })
632 }
633 SearchCommand::Multimodal {
634 query,
635 collection,
636 limit,
637 limit_param,
638 } => {
639 if limit_param.is_some() {
640 return Err(RedDBError::Query(
641 "SEARCH MULTIMODAL LIMIT $N parameter was not bound before execution"
642 .to_string(),
643 ));
644 }
645 let collections = collection.as_ref().map(|c| vec![c.clone()]);
646 let res =
647 self.search_multimodal(query.clone(), collections, None, None, Some(*limit))?;
648 let mut result =
649 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
650 for item in &res.matches {
651 let mut record = UnifiedRecord::new();
652 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
653 record.set("score", Value::Float(item.score as f64));
654 result.push(record);
655 }
656 Ok(RuntimeQueryResult {
657 query: raw_query.to_string(),
658 mode: QueryMode::Sql,
659 statement: "search_multimodal",
660 engine: "runtime-search",
661 result,
662 affected_rows: 0,
663 statement_type: "select",
664 })
665 }
666 SearchCommand::Index {
667 index,
668 value,
669 collection,
670 limit,
671 exact,
672 limit_param,
673 } => {
674 if limit_param.is_some() {
675 return Err(RedDBError::Query(
676 "SEARCH INDEX LIMIT $N parameter was not bound before execution"
677 .to_string(),
678 ));
679 }
680 let collections = collection.as_ref().map(|c| vec![c.clone()]);
681 let res = self.search_index(
682 index.clone(),
683 value.clone(),
684 *exact,
685 collections,
686 None,
687 None,
688 Some(*limit),
689 )?;
690 let mut result =
691 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
692 for item in &res.matches {
693 let mut record = UnifiedRecord::new();
694 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
695 record.set("score", Value::Float(item.score as f64));
696 result.push(record);
697 }
698 Ok(RuntimeQueryResult {
699 query: raw_query.to_string(),
700 mode: QueryMode::Sql,
701 statement: "search_index",
702 engine: "runtime-search",
703 result,
704 affected_rows: 0,
705 statement_type: "select",
706 })
707 }
708 SearchCommand::Context {
709 query,
710 field,
711 collection,
712 limit,
713 depth,
714 limit_param,
715 } => {
716 if limit_param.is_some() {
717 return Err(RedDBError::Query(
718 "SEARCH CONTEXT LIMIT $N parameter was not bound before execution"
719 .to_string(),
720 ));
721 }
722 use crate::application::SearchContextInput;
723 let input = SearchContextInput {
727 query: query.clone(),
728 field: field.clone(),
729 vector: None,
730 collections: collection.as_ref().map(|c| vec![c.clone()]),
731 graph_depth: Some(*depth),
732 graph_max_edges: None,
733 max_cross_refs: None,
734 follow_cross_refs: None,
735 expand_graph: None,
736 global_scan: None,
737 reindex: None,
738 limit: Some(*limit),
739 min_score: None,
740 };
741 let scope = self.ai_scope();
742 let res =
743 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
744 crate::runtime::authorized_search::AuthorizedSearch::execute_context(
745 self, &scope, input,
746 )?
747 } else {
748 self.search_context(input)?
749 };
750 let mut result = UnifiedResult::with_columns(vec![
751 "entity_id".into(),
752 "collection".into(),
753 "score".into(),
754 "discovery".into(),
755 "kind".into(),
756 ]);
757 let all_entities = res
758 .tables
759 .iter()
760 .map(|e| (e, "table"))
761 .chain(res.graph.nodes.iter().map(|e| (e, "graph_node")))
762 .chain(res.graph.edges.iter().map(|e| (e, "graph_edge")))
763 .chain(res.vectors.iter().map(|e| (e, "vector")))
764 .chain(res.documents.iter().map(|e| (e, "document")))
765 .chain(res.key_values.iter().map(|e| (e, "kv")));
766 for (entity, kind) in all_entities {
767 let mut record = UnifiedRecord::new();
768 record.set("entity_id", Value::UnsignedInteger(entity.entity.id.raw()));
769 record.set("collection", Value::text(entity.collection.clone()));
770 record.set("score", Value::Float(entity.score as f64));
771 record.set("discovery", Value::text(format!("{:?}", entity.discovery)));
772 record.set("kind", Value::text(kind.to_string()));
773 result.push(record);
774 }
775 Ok(RuntimeQueryResult {
776 query: raw_query.to_string(),
777 mode: QueryMode::Sql,
778 statement: "search_context",
779 engine: "runtime-context",
780 result,
781 affected_rows: 0,
782 statement_type: "select",
783 })
784 }
785 SearchCommand::SpatialRadius {
786 center_lat,
787 center_lon,
788 radius_km,
789 collection,
790 column,
791 limit,
792 limit_param,
793 } => {
794 if limit_param.is_some() {
795 return Err(RedDBError::Query(
796 "SEARCH SPATIAL RADIUS LIMIT $N parameter was not bound before execution"
797 .to_string(),
798 ));
799 }
800 use crate::storage::unified::spatial_index::haversine_km;
801 let _ = column; let store = self.inner.db.store();
803 let entities = store
804 .get_collection(collection)
805 .map(|m| m.query_all(|_| true))
806 .unwrap_or_default();
807
808 let mut hits: Vec<(u64, f64)> = Vec::new();
809 for entity in &entities {
810 if let Some((lat, lon)) = extract_geo_from_entity(entity) {
812 let dist = haversine_km(*center_lat, *center_lon, lat, lon);
813 if dist <= *radius_km {
814 hits.push((entity.id.raw(), dist));
815 }
816 }
817 }
818 hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
819 hits.truncate(*limit);
820
821 let mut result =
822 UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
823 for (id, dist) in &hits {
824 let mut record = UnifiedRecord::new();
825 record.set("entity_id", Value::UnsignedInteger(*id));
826 record.set("distance_km", Value::Float(*dist));
827 result.push(record);
828 }
829 Ok(RuntimeQueryResult {
830 query: raw_query.to_string(),
831 mode: QueryMode::Sql,
832 statement: "search_spatial_radius",
833 engine: "runtime-spatial",
834 result,
835 affected_rows: 0,
836 statement_type: "select",
837 })
838 }
839 SearchCommand::SpatialBbox {
840 min_lat,
841 min_lon,
842 max_lat,
843 max_lon,
844 collection,
845 column,
846 limit,
847 limit_param,
848 } => {
849 if limit_param.is_some() {
850 return Err(RedDBError::Query(
851 "SEARCH SPATIAL BBOX LIMIT $N parameter was not bound before execution"
852 .to_string(),
853 ));
854 }
855 let _ = column;
856 let store = self.inner.db.store();
857 let entities = store
858 .get_collection(collection)
859 .map(|m| m.query_all(|_| true))
860 .unwrap_or_default();
861
862 let mut result = UnifiedResult::with_columns(vec!["entity_id".into()]);
863 let mut count = 0;
864 for entity in &entities {
865 if count >= *limit {
866 break;
867 }
868 if let Some((lat, lon)) = extract_geo_from_entity(entity) {
869 if lat >= *min_lat && lat <= *max_lat && lon >= *min_lon && lon <= *max_lon
870 {
871 let mut record = UnifiedRecord::new();
872 record.set("entity_id", Value::UnsignedInteger(entity.id.raw()));
873 result.push(record);
874 count += 1;
875 }
876 }
877 }
878 Ok(RuntimeQueryResult {
879 query: raw_query.to_string(),
880 mode: QueryMode::Sql,
881 statement: "search_spatial_bbox",
882 engine: "runtime-spatial",
883 result,
884 affected_rows: 0,
885 statement_type: "select",
886 })
887 }
888 SearchCommand::SpatialNearest {
889 lat,
890 lon,
891 k,
892 collection,
893 column,
894 k_param,
895 } => {
896 if k_param.is_some() {
897 return Err(RedDBError::Query(
898 "SEARCH SPATIAL NEAREST K $N parameter was not bound before execution"
899 .to_string(),
900 ));
901 }
902 use crate::storage::unified::spatial_index::haversine_km;
903 let _ = column;
904 let store = self.inner.db.store();
905 let entities = store
906 .get_collection(collection)
907 .map(|m| m.query_all(|_| true))
908 .unwrap_or_default();
909
910 let mut hits: Vec<(u64, f64)> = Vec::new();
911 for entity in &entities {
912 if let Some((elat, elon)) = extract_geo_from_entity(entity) {
913 let dist = haversine_km(*lat, *lon, elat, elon);
914 hits.push((entity.id.raw(), dist));
915 }
916 }
917 hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
918 hits.truncate(*k);
919
920 let mut result =
921 UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
922 for (id, dist) in &hits {
923 let mut record = UnifiedRecord::new();
924 record.set("entity_id", Value::UnsignedInteger(*id));
925 record.set("distance_km", Value::Float(*dist));
926 result.push(record);
927 }
928 Ok(RuntimeQueryResult {
929 query: raw_query.to_string(),
930 mode: QueryMode::Sql,
931 statement: "search_spatial_nearest",
932 engine: "runtime-spatial",
933 result,
934 affected_rows: 0,
935 statement_type: "select",
936 })
937 }
938 }
939 }
940}
941
942fn apply_graph_order_and_limit(
943 result: &mut UnifiedResult,
944 statement: &str,
945 order_by: Option<&GraphCommandOrderBy>,
946 limit: Option<usize>,
947) -> RedDBResult<()> {
948 if let Some(order) = order_by {
949 let column = graph_order_metric_column(statement, &order.metric)?;
950 let columns = result.columns.clone();
951 result.records.sort_by(|left, right| {
952 let cmp = compare_graph_values(left.get(column), right.get(column));
953 let cmp = if order.ascending { cmp } else { cmp.reverse() };
954 if cmp == Ordering::Equal {
955 compare_graph_rows(left, right, &columns)
956 } else {
957 cmp
958 }
959 });
960 }
961 if let Some(limit) = limit {
962 result.records.truncate(limit);
963 }
964 Ok(())
965}
966
967fn graph_order_metric_column(statement: &str, metric: &str) -> RedDBResult<&'static str> {
968 let metric = metric.to_ascii_lowercase();
969 match (statement, metric.as_str()) {
970 ("graph_centrality", "score" | "centrality_score") => Ok("score"),
971 ("graph_community", "size" | "community_size") => Ok("size"),
972 ("graph_components", "size" | "component_size") => Ok("size"),
973 ("graph_shortest_path", "hop_count" | "total_weight" | "nodes_visited") => {
974 Ok(match metric.as_str() {
975 "total_weight" => "total_weight",
976 "nodes_visited" => "nodes_visited",
977 _ => "hop_count",
978 })
979 }
980 _ => Err(RedDBError::Query(format!(
981 "unsupported ORDER BY metric '{metric}' for GRAPH {}",
982 statement.trim_start_matches("graph_")
983 ))),
984 }
985}
986
987fn compare_graph_rows(left: &UnifiedRecord, right: &UnifiedRecord, columns: &[String]) -> Ordering {
988 for column in columns {
989 let cmp = compare_graph_values(left.get(column), right.get(column));
990 if cmp != Ordering::Equal {
991 return cmp;
992 }
993 }
994 Ordering::Equal
995}
996
997fn compare_graph_values(left: Option<&Value>, right: Option<&Value>) -> Ordering {
998 match (left, right) {
999 (None, None) => Ordering::Equal,
1000 (None, Some(_)) => Ordering::Less,
1001 (Some(_), None) => Ordering::Greater,
1002 (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
1003 (Some(Value::Null), Some(_)) => Ordering::Less,
1004 (Some(_), Some(Value::Null)) => Ordering::Greater,
1005 (Some(Value::Integer(left)), Some(Value::Integer(right))) => left.cmp(right),
1006 (Some(Value::UnsignedInteger(left)), Some(Value::UnsignedInteger(right))) => {
1007 left.cmp(right)
1008 }
1009 (Some(Value::Float(left)), Some(Value::Float(right))) => {
1010 left.partial_cmp(right).unwrap_or(Ordering::Equal)
1011 }
1012 (Some(Value::Integer(left)), Some(Value::Float(right))) => {
1013 (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1014 }
1015 (Some(Value::Float(left)), Some(Value::Integer(right))) => {
1016 left.partial_cmp(&(*right as f64)).unwrap_or(Ordering::Equal)
1017 }
1018 (Some(Value::UnsignedInteger(left)), Some(Value::Float(right))) => {
1019 (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1020 }
1021 (Some(Value::Float(left)), Some(Value::UnsignedInteger(right))) => {
1022 left.partial_cmp(&(*right as f64)).unwrap_or(Ordering::Equal)
1023 }
1024 (Some(Value::Integer(left)), Some(Value::UnsignedInteger(right))) => {
1025 (*left as i128).cmp(&(*right as i128))
1026 }
1027 (Some(Value::UnsignedInteger(left)), Some(Value::Integer(right))) => {
1028 (*left as i128).cmp(&(*right as i128))
1029 }
1030 (Some(Value::Timestamp(left)), Some(Value::Timestamp(right))) => left.cmp(right),
1031 (Some(Value::Text(left)), Some(Value::Text(right))) => left.cmp(right),
1032 (Some(Value::Boolean(left)), Some(Value::Boolean(right))) => left.cmp(right),
1033 (Some(left), Some(right)) => format!("{left:?}").cmp(&format!("{right:?}")),
1034 }
1035}
1036
1037fn parse_direction(s: &str) -> RedDBResult<RuntimeGraphDirection> {
1042 match s.to_lowercase().as_str() {
1043 "outgoing" | "out" => Ok(RuntimeGraphDirection::Outgoing),
1044 "incoming" | "in" => Ok(RuntimeGraphDirection::Incoming),
1045 "both" | "any" => Ok(RuntimeGraphDirection::Both),
1046 _ => Err(RedDBError::Query(format!(
1047 "unknown direction: '{s}', expected outgoing|incoming|both"
1048 ))),
1049 }
1050}
1051
1052fn parse_path_algorithm(s: &str) -> RedDBResult<RuntimeGraphPathAlgorithm> {
1053 match s.to_lowercase().as_str() {
1054 "bfs" => Ok(RuntimeGraphPathAlgorithm::Bfs),
1055 "dijkstra" => Ok(RuntimeGraphPathAlgorithm::Dijkstra),
1056 "astar" | "a*" => Ok(RuntimeGraphPathAlgorithm::AStar),
1057 "bellman_ford" | "bellmanford" => Ok(RuntimeGraphPathAlgorithm::BellmanFord),
1058 _ => Err(RedDBError::Query(format!(
1059 "unknown path algorithm: '{s}', expected bfs|dijkstra|astar|bellman_ford"
1060 ))),
1061 }
1062}
1063
1064fn parse_traversal_strategy(s: &str) -> RedDBResult<RuntimeGraphTraversalStrategy> {
1065 match s.to_lowercase().as_str() {
1066 "bfs" => Ok(RuntimeGraphTraversalStrategy::Bfs),
1067 "dfs" => Ok(RuntimeGraphTraversalStrategy::Dfs),
1068 _ => Err(RedDBError::Query(format!(
1069 "unknown traversal strategy: '{s}', expected bfs|dfs"
1070 ))),
1071 }
1072}
1073
1074fn parse_centrality_algorithm(s: &str) -> RedDBResult<RuntimeGraphCentralityAlgorithm> {
1075 match s.to_lowercase().as_str() {
1076 "degree" => Ok(RuntimeGraphCentralityAlgorithm::Degree),
1077 "closeness" => Ok(RuntimeGraphCentralityAlgorithm::Closeness),
1078 "betweenness" => Ok(RuntimeGraphCentralityAlgorithm::Betweenness),
1079 "eigenvector" => Ok(RuntimeGraphCentralityAlgorithm::Eigenvector),
1080 "pagerank" | "page_rank" => Ok(RuntimeGraphCentralityAlgorithm::PageRank),
1081 _ => Err(RedDBError::Query(format!(
1082 "unknown centrality algorithm: '{s}', expected degree|closeness|betweenness|eigenvector|pagerank"
1083 ))),
1084 }
1085}
1086
1087fn parse_community_algorithm(s: &str) -> RedDBResult<RuntimeGraphCommunityAlgorithm> {
1088 match s.to_lowercase().as_str() {
1089 "label_propagation" | "labelpropagation" => {
1090 Ok(RuntimeGraphCommunityAlgorithm::LabelPropagation)
1091 }
1092 "louvain" => Ok(RuntimeGraphCommunityAlgorithm::Louvain),
1093 _ => Err(RedDBError::Query(format!(
1094 "unknown community algorithm: '{s}', expected label_propagation|louvain"
1095 ))),
1096 }
1097}
1098
1099fn parse_components_mode(s: &str) -> RedDBResult<RuntimeGraphComponentsMode> {
1100 match s.to_lowercase().as_str() {
1101 "connected" => Ok(RuntimeGraphComponentsMode::Connected),
1102 "weak" | "weakly_connected" => Ok(RuntimeGraphComponentsMode::Weak),
1103 "strong" | "strongly_connected" => Ok(RuntimeGraphComponentsMode::Strong),
1104 _ => Err(RedDBError::Query(format!(
1105 "unknown components mode: '{s}', expected connected|weak|strong"
1106 ))),
1107 }
1108}
1109
1110fn extract_geo_from_entity(entity: &UnifiedEntity) -> Option<(f64, f64)> {
1115 match &entity.data {
1116 EntityData::Row(row) => {
1117 if let Some(ref named) = row.named {
1119 for value in named.values() {
1121 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1122 return Some((
1123 *lat_micro as f64 / 1_000_000.0,
1124 *lon_micro as f64 / 1_000_000.0,
1125 ));
1126 }
1127 }
1128 let lat =
1130 named
1131 .get("lat")
1132 .or_else(|| named.get("latitude"))
1133 .and_then(|v| match v {
1134 Value::Float(f) => Some(*f),
1135 Value::Integer(i) => Some(*i as f64),
1136 _ => None,
1137 });
1138 let lon = named
1139 .get("lon")
1140 .or_else(|| named.get("lng"))
1141 .or_else(|| named.get("longitude"))
1142 .and_then(|v| match v {
1143 Value::Float(f) => Some(*f),
1144 Value::Integer(i) => Some(*i as f64),
1145 _ => None,
1146 });
1147 if let (Some(la), Some(lo)) = (lat, lon) {
1148 return Some((la, lo));
1149 }
1150 }
1151 for value in &row.columns {
1153 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1154 return Some((
1155 *lat_micro as f64 / 1_000_000.0,
1156 *lon_micro as f64 / 1_000_000.0,
1157 ));
1158 }
1159 }
1160 None
1161 }
1162 EntityData::Node(node) => {
1163 for value in node.properties.values() {
1165 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1166 return Some((
1167 *lat_micro as f64 / 1_000_000.0,
1168 *lon_micro as f64 / 1_000_000.0,
1169 ));
1170 }
1171 }
1172 let lat = node
1173 .properties
1174 .get("lat")
1175 .or_else(|| node.properties.get("latitude"))
1176 .and_then(|v| match v {
1177 Value::Float(f) => Some(*f),
1178 Value::Integer(i) => Some(*i as f64),
1179 _ => None,
1180 });
1181 let lon = node
1182 .properties
1183 .get("lon")
1184 .or_else(|| node.properties.get("lng"))
1185 .or_else(|| node.properties.get("longitude"))
1186 .and_then(|v| match v {
1187 Value::Float(f) => Some(*f),
1188 Value::Integer(i) => Some(*i as f64),
1189 _ => None,
1190 });
1191 if let (Some(la), Some(lo)) = (lat, lon) {
1192 return Some((la, lo));
1193 }
1194 None
1195 }
1196 _ => None,
1197 }
1198}