1use super::*;
8use crate::storage::query::ast::GraphCommandOrderBy;
9use std::cmp::Ordering;
10
11impl RedDBRuntime {
12 pub fn execute_graph_command(
14 &self,
15 raw_query: &str,
16 cmd: &GraphCommand,
17 ) -> RedDBResult<RuntimeQueryResult> {
18 match cmd {
19 GraphCommand::Neighborhood {
20 source,
21 depth,
22 direction,
23 edge_labels,
24 } => {
25 let dir = parse_direction(direction)?;
26 let res = self.graph_neighborhood(
27 source,
28 dir,
29 *depth as usize,
30 edge_labels.clone(),
31 None,
32 )?;
33 let mut result = UnifiedResult::with_columns(vec![
34 "node_id".into(),
35 "label".into(),
36 "node_type".into(),
37 "depth".into(),
38 ]);
39 for visit in &res.nodes {
40 let mut record = UnifiedRecord::new();
41 record.set("node_id", Value::text(visit.node.id.clone()));
42 record.set("label", Value::text(visit.node.label.clone()));
43 record.set("node_type", Value::text(visit.node.node_type.clone()));
44 record.set("depth", Value::Integer(visit.depth as i64));
45 result.push(record);
46 }
47 Ok(RuntimeQueryResult {
48 query: raw_query.to_string(),
49 mode: QueryMode::Sql,
50 statement: "graph_neighborhood",
51 engine: "runtime-graph",
52 result,
53 affected_rows: 0,
54 statement_type: "select",
55 })
56 }
57 GraphCommand::ShortestPath {
58 source,
59 target,
60 algorithm,
61 direction,
62 limit,
63 order_by,
64 } => {
65 let dir = parse_direction(direction)?;
66 let alg = parse_path_algorithm(algorithm)?;
67 let res = self.graph_shortest_path(source, target, dir, alg, None, None)?;
68 let mut result = UnifiedResult::with_columns(vec![
69 "source".into(),
70 "target".into(),
71 "nodes_visited".into(),
72 "negative_cycle_detected".into(),
73 "hop_count".into(),
74 "total_weight".into(),
75 ]);
76 let mut record = UnifiedRecord::new();
77 record.set("source", Value::text(res.source));
78 record.set("target", Value::text(res.target));
79 record.set("nodes_visited", Value::Integer(res.nodes_visited as i64));
80 record.set(
81 "negative_cycle_detected",
82 match res.negative_cycle_detected {
83 Some(value) => Value::Boolean(value),
84 None => Value::Null,
85 },
86 );
87 if let Some(ref path) = res.path {
88 record.set("hop_count", Value::Integer(path.hop_count as i64));
89 record.set("total_weight", Value::Float(path.total_weight));
90 } else {
91 record.set("hop_count", Value::Null);
92 record.set("total_weight", Value::Null);
93 }
94 result.push(record);
95 apply_graph_order_and_limit(
96 &mut result,
97 "graph_shortest_path",
98 order_by.as_ref(),
99 limit.map(|n| n as usize),
100 )?;
101 Ok(RuntimeQueryResult {
102 query: raw_query.to_string(),
103 mode: QueryMode::Sql,
104 statement: "graph_shortest_path",
105 engine: "runtime-graph",
106 result,
107 affected_rows: 0,
108 statement_type: "select",
109 })
110 }
111 GraphCommand::Properties { source } => {
112 if let Some(node_ref) = source {
113 let graph =
117 materialize_graph_with_projection(self.inner.db.store().as_ref(), None)?;
118 let resolved = resolve_graph_node_id(&graph, node_ref)?;
119 let stored = graph
120 .get_node(&resolved)
121 .ok_or_else(|| RedDBError::NotFound(node_ref.to_string()))?;
122 let node_type = self
123 .inner
124 .db
125 .store()
126 .query_all(|entity| {
127 entity.id.raw().to_string() == resolved
128 && matches!(
129 entity.kind,
130 crate::storage::unified::EntityKind::GraphNode(_)
131 )
132 })
133 .into_iter()
134 .find_map(|(_, entity)| match entity.kind {
135 crate::storage::unified::EntityKind::GraphNode(node) => {
136 Some(node.node_type)
137 }
138 _ => None,
139 })
140 .unwrap_or_else(|| stored.node_type.clone());
141 let all_props =
142 materialize_graph_node_properties(self.inner.db.store().as_ref())?;
143 let props = all_props.get(&resolved).cloned().unwrap_or_default();
144
145 let mut prop_keys: Vec<&String> = props.keys().collect();
148 prop_keys.sort();
149 let mut columns: Vec<String> = Vec::with_capacity(3 + prop_keys.len());
150 columns.push("node_id".into());
151 columns.push("label".into());
152 columns.push("node_type".into());
153 for k in &prop_keys {
154 columns.push((*k).clone());
155 }
156 let mut result = UnifiedResult::with_columns(columns);
157 let mut record = UnifiedRecord::new();
158 record.set("node_id", Value::text(stored.id.clone()));
159 record.set("label", Value::text(stored.label.clone()));
160 record.set("node_type", Value::text(node_type));
161 for k in &prop_keys {
162 if let Some(v) = props.get(*k) {
163 record.set(k.as_str(), v.clone());
164 }
165 }
166 result.push(record);
167 return Ok(RuntimeQueryResult {
168 query: raw_query.to_string(),
169 mode: QueryMode::Sql,
170 statement: "graph_properties",
171 engine: "runtime-graph",
172 result,
173 affected_rows: 0,
174 statement_type: "select",
175 });
176 }
177 let res = self.graph_properties(None)?;
178 let mut result = UnifiedResult::with_columns(vec![
179 "node_count".into(),
180 "edge_count".into(),
181 "is_connected".into(),
182 "is_complete".into(),
183 "is_cyclic".into(),
184 "density".into(),
185 ]);
186 let mut record = UnifiedRecord::new();
187 record.set("node_count", Value::Integer(res.node_count as i64));
188 record.set("edge_count", Value::Integer(res.edge_count as i64));
189 record.set("is_connected", Value::Boolean(res.is_connected));
190 record.set("is_complete", Value::Boolean(res.is_complete));
191 record.set("is_cyclic", Value::Boolean(res.is_cyclic));
192 record.set("density", Value::Float(res.density));
193 result.push(record);
194 Ok(RuntimeQueryResult {
195 query: raw_query.to_string(),
196 mode: QueryMode::Sql,
197 statement: "graph_properties",
198 engine: "runtime-graph",
199 result,
200 affected_rows: 0,
201 statement_type: "select",
202 })
203 }
204 GraphCommand::Traverse {
205 source,
206 strategy,
207 depth,
208 direction,
209 edge_labels,
210 } => {
211 let dir = parse_direction(direction)?;
212 let strat = parse_traversal_strategy(strategy)?;
213 let res = self.graph_traverse(
214 source,
215 dir,
216 *depth as usize,
217 strat,
218 edge_labels.clone(),
219 None,
220 )?;
221 let mut result = UnifiedResult::with_columns(vec![
222 "node_id".into(),
223 "label".into(),
224 "node_type".into(),
225 "depth".into(),
226 ]);
227 for visit in &res.visits {
228 let mut record = UnifiedRecord::new();
229 record.set("node_id", Value::text(visit.node.id.clone()));
230 record.set("label", Value::text(visit.node.label.clone()));
231 record.set("node_type", Value::text(visit.node.node_type.clone()));
232 record.set("depth", Value::Integer(visit.depth as i64));
233 result.push(record);
234 }
235 Ok(RuntimeQueryResult {
236 query: raw_query.to_string(),
237 mode: QueryMode::Sql,
238 statement: "graph_traverse",
239 engine: "runtime-graph",
240 result,
241 affected_rows: 0,
242 statement_type: "select",
243 })
244 }
245 GraphCommand::Centrality {
246 algorithm,
247 limit,
248 order_by,
249 } => {
250 let alg = parse_centrality_algorithm(algorithm)?;
251 let limit_usize = limit.map(|n| n as usize);
254 let order_needs_full_set = order_by
255 .as_ref()
256 .map(|order| order.ascending)
257 .unwrap_or(false);
258 let top_k = if order_needs_full_set {
259 usize::MAX
260 } else {
261 limit_usize.unwrap_or(100).max(1)
262 };
263 let res = self.graph_centrality(alg, top_k, false, None, None, None, None)?;
264 let mut result = UnifiedResult::with_columns(vec![
265 "node_id".into(),
266 "label".into(),
267 "score".into(),
268 ]);
269 for score in &res.scores {
270 let mut record = UnifiedRecord::new();
271 record.set("node_id", Value::text(score.node.id.clone()));
272 record.set("label", Value::text(score.node.label.clone()));
273 record.set("score", Value::Float(score.score));
274 result.push(record);
275 }
276 for ds in &res.degree_scores {
277 let mut record = UnifiedRecord::new();
278 record.set("node_id", Value::text(ds.node.id.clone()));
279 record.set("label", Value::text(ds.node.label.clone()));
280 record.set("score", Value::Float(ds.total_degree as f64));
281 result.push(record);
282 }
283 apply_graph_order_and_limit(
284 &mut result,
285 "graph_centrality",
286 order_by.as_ref(),
287 Some(limit_usize.unwrap_or(100)),
288 )?;
289 Ok(RuntimeQueryResult {
290 query: raw_query.to_string(),
291 mode: QueryMode::Sql,
292 statement: "graph_centrality",
293 engine: "runtime-graph",
294 result,
295 affected_rows: 0,
296 statement_type: "select",
297 })
298 }
299 GraphCommand::Community {
300 algorithm,
301 max_iterations,
302 limit,
303 order_by,
304 } => {
305 let alg = parse_community_algorithm(algorithm)?;
306 let res =
307 self.graph_communities(alg, 1, Some(*max_iterations as usize), None, None)?;
308 let mut result =
309 UnifiedResult::with_columns(vec!["community_id".into(), "size".into()]);
310 for community in &res.communities {
311 let mut record = UnifiedRecord::new();
312 record.set("community_id", Value::text(community.id.clone()));
313 record.set("size", Value::Integer(community.size as i64));
314 result.push(record);
315 }
316 apply_graph_order_and_limit(
317 &mut result,
318 "graph_community",
319 order_by.as_ref(),
320 limit.map(|n| n as usize),
321 )?;
322 Ok(RuntimeQueryResult {
323 query: raw_query.to_string(),
324 mode: QueryMode::Sql,
325 statement: "graph_community",
326 engine: "runtime-graph",
327 result,
328 affected_rows: 0,
329 statement_type: "select",
330 })
331 }
332 GraphCommand::Components {
333 mode,
334 limit,
335 order_by,
336 } => {
337 let m = parse_components_mode(mode)?;
338 let res = self.graph_components(m, 1, None)?;
339 let mut result =
340 UnifiedResult::with_columns(vec!["component_id".into(), "size".into()]);
341 for component in &res.components {
342 let mut record = UnifiedRecord::new();
343 record.set("component_id", Value::text(component.id.clone()));
344 record.set("size", Value::Integer(component.size as i64));
345 result.push(record);
346 }
347 apply_graph_order_and_limit(
348 &mut result,
349 "graph_components",
350 order_by.as_ref(),
351 limit.map(|n| n as usize),
352 )?;
353 Ok(RuntimeQueryResult {
354 query: raw_query.to_string(),
355 mode: QueryMode::Sql,
356 statement: "graph_components",
357 engine: "runtime-graph",
358 result,
359 affected_rows: 0,
360 statement_type: "select",
361 })
362 }
363 GraphCommand::Cycles { max_length } => {
364 let res = self.graph_cycles(*max_length as usize, 100, None)?;
365 let mut result =
366 UnifiedResult::with_columns(vec!["cycle_index".into(), "length".into()]);
367 for (i, cycle) in res.cycles.iter().enumerate() {
368 let mut record = UnifiedRecord::new();
369 record.set("cycle_index", Value::Integer(i as i64));
370 record.set("length", Value::Integer(cycle.nodes.len() as i64));
371 result.push(record);
372 }
373 Ok(RuntimeQueryResult {
374 query: raw_query.to_string(),
375 mode: QueryMode::Sql,
376 statement: "graph_cycles",
377 engine: "runtime-graph",
378 result,
379 affected_rows: 0,
380 statement_type: "select",
381 })
382 }
383 GraphCommand::Clustering => {
384 let res = self.graph_clustering(100, true, None)?;
385 let mut result = UnifiedResult::with_columns(vec![
386 "node_id".into(),
387 "label".into(),
388 "score".into(),
389 ]);
390 let mut global_record = UnifiedRecord::new();
392 global_record.set("node_id", Value::text("__global__"));
393 global_record.set("label", Value::text("global_clustering"));
394 global_record.set("score", Value::Float(res.global));
395 result.push(global_record);
396 for score in &res.local {
397 let mut record = UnifiedRecord::new();
398 record.set("node_id", Value::text(score.node.id.clone()));
399 record.set("label", Value::text(score.node.label.clone()));
400 record.set("score", Value::Float(score.score));
401 result.push(record);
402 }
403 Ok(RuntimeQueryResult {
404 query: raw_query.to_string(),
405 mode: QueryMode::Sql,
406 statement: "graph_clustering",
407 engine: "runtime-graph",
408 result,
409 affected_rows: 0,
410 statement_type: "select",
411 })
412 }
413 GraphCommand::TopologicalSort => {
414 let res = self.graph_topological_sort(None)?;
415 let mut result = UnifiedResult::with_columns(vec![
416 "order".into(),
417 "node_id".into(),
418 "label".into(),
419 ]);
420 for (i, node) in res.ordered_nodes.iter().enumerate() {
421 let mut record = UnifiedRecord::new();
422 record.set("order", Value::Integer(i as i64));
423 record.set("node_id", Value::text(node.id.clone()));
424 record.set("label", Value::text(node.label.clone()));
425 result.push(record);
426 }
427 Ok(RuntimeQueryResult {
428 query: raw_query.to_string(),
429 mode: QueryMode::Sql,
430 statement: "graph_topological_sort",
431 engine: "runtime-graph",
432 result,
433 affected_rows: 0,
434 statement_type: "select",
435 })
436 }
437 }
438 }
439
440 pub fn execute_search_command(
442 &self,
443 raw_query: &str,
444 cmd: &SearchCommand,
445 ) -> RedDBResult<RuntimeQueryResult> {
446 match cmd {
447 SearchCommand::Similar {
448 vector,
449 text,
450 provider,
451 collection,
452 limit,
453 min_score,
454 vector_param,
455 limit_param,
456 min_score_param,
457 text_param,
458 } => {
459 if vector_param.is_some() {
460 return Err(RedDBError::Query(
461 "SEARCH SIMILAR $N vector parameter was not bound before execution"
462 .to_string(),
463 ));
464 }
465 if limit_param.is_some() {
466 return Err(RedDBError::Query(
467 "SEARCH SIMILAR LIMIT $N parameter was not bound before execution"
468 .to_string(),
469 ));
470 }
471 if min_score_param.is_some() {
472 return Err(RedDBError::Query(
473 "SEARCH SIMILAR MIN_SCORE $N parameter was not bound before execution"
474 .to_string(),
475 ));
476 }
477 if text_param.is_some() {
478 return Err(RedDBError::Query(
479 "SEARCH SIMILAR TEXT $N parameter was not bound before execution"
480 .to_string(),
481 ));
482 }
483 let search_vector = if let Some(query_text) = text {
485 let (default_provider, _) = crate::ai::resolve_defaults_from_runtime(self);
486 let provider = match provider.as_deref() {
487 Some(p) => crate::ai::parse_provider(p)?,
488 None => default_provider,
489 };
490 let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
491 let model = std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
492 .ok()
493 .unwrap_or_else(|| provider.default_embedding_model().to_string());
494 let transport = crate::runtime::ai::transport::AiTransport::from_runtime(self);
495 let request = crate::ai::OpenAiEmbeddingRequest {
496 api_key,
497 model,
498 inputs: vec![query_text.clone()],
499 dimensions: None,
500 api_base: provider.resolve_api_base(),
501 };
502 let response = crate::runtime::ai::block_on_ai(async move {
503 crate::ai::openai_embeddings_async(&transport, request).await
504 })
505 .and_then(|result| result)?;
506 response.embeddings.into_iter().next().ok_or_else(|| {
507 RedDBError::Query("embedding API returned no vectors".to_string())
508 })?
509 } else {
510 vector.clone()
511 };
512 let scope = self.ai_scope();
516 let results =
517 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
518 crate::runtime::authorized_search::AuthorizedSearch::execute_similar(
519 self,
520 &scope,
521 collection,
522 &search_vector,
523 *limit,
524 *min_score,
525 )?
526 } else {
527 self.search_similar(collection, &search_vector, *limit, *min_score)?
529 };
530 let mut result =
531 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
532 for sr in &results {
533 let mut record = UnifiedRecord::new();
534 record.set("entity_id", Value::UnsignedInteger(sr.entity_id.raw()));
535 record.set("score", Value::Float(sr.score as f64));
536 result.push(record);
537 }
538 Ok(RuntimeQueryResult {
539 query: raw_query.to_string(),
540 mode: QueryMode::Sql,
541 statement: "search_similar",
542 engine: "runtime-search",
543 result,
544 affected_rows: 0,
545 statement_type: "select",
546 })
547 }
548 SearchCommand::Text {
549 query,
550 collection,
551 limit,
552 fuzzy,
553 limit_param,
554 } => {
555 if limit_param.is_some() {
556 return Err(RedDBError::Query(
557 "SEARCH TEXT LIMIT $N parameter was not bound before execution".to_string(),
558 ));
559 }
560 let collections = collection.as_ref().map(|c| vec![c.clone()]);
561 let scope = self.ai_scope();
563 let res =
564 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
565 crate::runtime::authorized_search::AuthorizedSearch::execute_text(
566 self,
567 &scope,
568 query.clone(),
569 collections,
570 None,
571 None,
572 None,
573 Some(*limit),
574 *fuzzy,
575 )?
576 } else {
577 self.search_text(
578 query.clone(),
579 collections,
580 None,
581 None,
582 None,
583 Some(*limit),
584 *fuzzy,
585 )?
586 };
587 let mut result =
588 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
589 for item in &res.matches {
590 let mut record = UnifiedRecord::new();
591 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
592 record.set("score", Value::Float(item.score as f64));
593 result.push(record);
594 }
595 Ok(RuntimeQueryResult {
596 query: raw_query.to_string(),
597 mode: QueryMode::Sql,
598 statement: "search_text",
599 engine: "runtime-search",
600 result,
601 affected_rows: 0,
602 statement_type: "select",
603 })
604 }
605 SearchCommand::Hybrid {
606 vector,
607 query,
608 collection,
609 limit,
610 limit_param,
611 } => {
612 if limit_param.is_some() {
613 return Err(RedDBError::Query(
614 "SEARCH HYBRID LIMIT $N parameter was not bound before execution"
615 .to_string(),
616 ));
617 }
618 let res = self.search_hybrid(
619 vector.clone(),
620 query.clone(),
621 Some(*limit),
622 Some(vec![collection.clone()]),
623 None,
624 None,
625 None,
626 Vec::new(),
627 None,
628 None,
629 Some(*limit),
630 )?;
631 let mut result =
632 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
633 for item in &res.matches {
634 let mut record = UnifiedRecord::new();
635 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
636 record.set("score", Value::Float(item.score as f64));
637 result.push(record);
638 }
639 Ok(RuntimeQueryResult {
640 query: raw_query.to_string(),
641 mode: QueryMode::Sql,
642 statement: "search_hybrid",
643 engine: "runtime-search",
644 result,
645 affected_rows: 0,
646 statement_type: "select",
647 })
648 }
649 SearchCommand::Multimodal {
650 query,
651 collection,
652 limit,
653 limit_param,
654 } => {
655 if limit_param.is_some() {
656 return Err(RedDBError::Query(
657 "SEARCH MULTIMODAL LIMIT $N parameter was not bound before execution"
658 .to_string(),
659 ));
660 }
661 let collections = collection.as_ref().map(|c| vec![c.clone()]);
662 let res =
663 self.search_multimodal(query.clone(), collections, None, None, Some(*limit))?;
664 let mut result =
665 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
666 for item in &res.matches {
667 let mut record = UnifiedRecord::new();
668 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
669 record.set("score", Value::Float(item.score as f64));
670 result.push(record);
671 }
672 Ok(RuntimeQueryResult {
673 query: raw_query.to_string(),
674 mode: QueryMode::Sql,
675 statement: "search_multimodal",
676 engine: "runtime-search",
677 result,
678 affected_rows: 0,
679 statement_type: "select",
680 })
681 }
682 SearchCommand::Index {
683 index,
684 value,
685 collection,
686 limit,
687 exact,
688 limit_param,
689 } => {
690 if limit_param.is_some() {
691 return Err(RedDBError::Query(
692 "SEARCH INDEX LIMIT $N parameter was not bound before execution"
693 .to_string(),
694 ));
695 }
696 let collections = collection.as_ref().map(|c| vec![c.clone()]);
697 let res = self.search_index(
698 index.clone(),
699 value.clone(),
700 *exact,
701 collections,
702 None,
703 None,
704 Some(*limit),
705 )?;
706 let mut result =
707 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
708 for item in &res.matches {
709 let mut record = UnifiedRecord::new();
710 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
711 record.set("score", Value::Float(item.score as f64));
712 result.push(record);
713 }
714 Ok(RuntimeQueryResult {
715 query: raw_query.to_string(),
716 mode: QueryMode::Sql,
717 statement: "search_index",
718 engine: "runtime-search",
719 result,
720 affected_rows: 0,
721 statement_type: "select",
722 })
723 }
724 SearchCommand::Context {
725 query,
726 field,
727 collection,
728 limit,
729 depth,
730 limit_param,
731 } => {
732 if limit_param.is_some() {
733 return Err(RedDBError::Query(
734 "SEARCH CONTEXT LIMIT $N parameter was not bound before execution"
735 .to_string(),
736 ));
737 }
738 use crate::application::SearchContextInput;
739 let input = SearchContextInput {
743 query: query.clone(),
744 field: field.clone(),
745 vector: None,
746 collections: collection.as_ref().map(|c| vec![c.clone()]),
747 graph_depth: Some(*depth),
748 graph_max_edges: None,
749 max_cross_refs: None,
750 follow_cross_refs: None,
751 expand_graph: None,
752 global_scan: None,
753 reindex: None,
754 limit: Some(*limit),
755 min_score: None,
756 };
757 let scope = self.ai_scope();
758 let res =
759 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
760 crate::runtime::authorized_search::AuthorizedSearch::execute_context(
761 self, &scope, input,
762 )?
763 } else {
764 self.search_context(input)?
765 };
766 let mut result = UnifiedResult::with_columns(vec![
767 "entity_id".into(),
768 "collection".into(),
769 "score".into(),
770 "discovery".into(),
771 "kind".into(),
772 ]);
773 let all_entities = res
774 .tables
775 .iter()
776 .map(|e| (e, "table"))
777 .chain(res.graph.nodes.iter().map(|e| (e, "graph_node")))
778 .chain(res.graph.edges.iter().map(|e| (e, "graph_edge")))
779 .chain(res.vectors.iter().map(|e| (e, "vector")))
780 .chain(res.documents.iter().map(|e| (e, "document")))
781 .chain(res.key_values.iter().map(|e| (e, "kv")));
782 for (entity, kind) in all_entities {
783 let mut record = UnifiedRecord::new();
784 record.set("entity_id", Value::UnsignedInteger(entity.entity.id.raw()));
785 record.set("collection", Value::text(entity.collection.clone()));
786 record.set("score", Value::Float(entity.score as f64));
787 record.set("discovery", Value::text(format!("{:?}", entity.discovery)));
788 record.set("kind", Value::text(kind.to_string()));
789 result.push(record);
790 }
791 Ok(RuntimeQueryResult {
792 query: raw_query.to_string(),
793 mode: QueryMode::Sql,
794 statement: "search_context",
795 engine: "runtime-context",
796 result,
797 affected_rows: 0,
798 statement_type: "select",
799 })
800 }
801 SearchCommand::SpatialRadius {
802 center_lat,
803 center_lon,
804 radius_km,
805 collection,
806 column,
807 limit,
808 limit_param,
809 } => {
810 if limit_param.is_some() {
811 return Err(RedDBError::Query(
812 "SEARCH SPATIAL RADIUS LIMIT $N parameter was not bound before execution"
813 .to_string(),
814 ));
815 }
816 use crate::storage::unified::spatial_index::haversine_km;
817 let _ = column; let store = self.inner.db.store();
819 let entities = store
820 .get_collection(collection)
821 .map(|m| m.query_all(|_| true))
822 .unwrap_or_default();
823
824 let mut hits: Vec<(u64, f64)> = Vec::new();
825 for entity in &entities {
826 if let Some((lat, lon)) = extract_geo_from_entity(entity) {
828 let dist = haversine_km(*center_lat, *center_lon, lat, lon);
829 if dist <= *radius_km {
830 hits.push((entity.id.raw(), dist));
831 }
832 }
833 }
834 hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
835 hits.truncate(*limit);
836
837 let mut result =
838 UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
839 for (id, dist) in &hits {
840 let mut record = UnifiedRecord::new();
841 record.set("entity_id", Value::UnsignedInteger(*id));
842 record.set("distance_km", Value::Float(*dist));
843 result.push(record);
844 }
845 Ok(RuntimeQueryResult {
846 query: raw_query.to_string(),
847 mode: QueryMode::Sql,
848 statement: "search_spatial_radius",
849 engine: "runtime-spatial",
850 result,
851 affected_rows: 0,
852 statement_type: "select",
853 })
854 }
855 SearchCommand::SpatialBbox {
856 min_lat,
857 min_lon,
858 max_lat,
859 max_lon,
860 collection,
861 column,
862 limit,
863 limit_param,
864 } => {
865 if limit_param.is_some() {
866 return Err(RedDBError::Query(
867 "SEARCH SPATIAL BBOX LIMIT $N parameter was not bound before execution"
868 .to_string(),
869 ));
870 }
871 let _ = column;
872 let store = self.inner.db.store();
873 let entities = store
874 .get_collection(collection)
875 .map(|m| m.query_all(|_| true))
876 .unwrap_or_default();
877
878 let mut result = UnifiedResult::with_columns(vec!["entity_id".into()]);
879 let mut count = 0;
880 for entity in &entities {
881 if count >= *limit {
882 break;
883 }
884 if let Some((lat, lon)) = extract_geo_from_entity(entity) {
885 if lat >= *min_lat && lat <= *max_lat && lon >= *min_lon && lon <= *max_lon
886 {
887 let mut record = UnifiedRecord::new();
888 record.set("entity_id", Value::UnsignedInteger(entity.id.raw()));
889 result.push(record);
890 count += 1;
891 }
892 }
893 }
894 Ok(RuntimeQueryResult {
895 query: raw_query.to_string(),
896 mode: QueryMode::Sql,
897 statement: "search_spatial_bbox",
898 engine: "runtime-spatial",
899 result,
900 affected_rows: 0,
901 statement_type: "select",
902 })
903 }
904 SearchCommand::SpatialNearest {
905 lat,
906 lon,
907 k,
908 collection,
909 column,
910 k_param,
911 } => {
912 if k_param.is_some() {
913 return Err(RedDBError::Query(
914 "SEARCH SPATIAL NEAREST K $N parameter was not bound before execution"
915 .to_string(),
916 ));
917 }
918 use crate::storage::unified::spatial_index::haversine_km;
919 let _ = column;
920 let store = self.inner.db.store();
921 let entities = store
922 .get_collection(collection)
923 .map(|m| m.query_all(|_| true))
924 .unwrap_or_default();
925
926 let mut hits: Vec<(u64, f64)> = Vec::new();
927 for entity in &entities {
928 if let Some((elat, elon)) = extract_geo_from_entity(entity) {
929 let dist = haversine_km(*lat, *lon, elat, elon);
930 hits.push((entity.id.raw(), dist));
931 }
932 }
933 hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
934 hits.truncate(*k);
935
936 let mut result =
937 UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
938 for (id, dist) in &hits {
939 let mut record = UnifiedRecord::new();
940 record.set("entity_id", Value::UnsignedInteger(*id));
941 record.set("distance_km", Value::Float(*dist));
942 result.push(record);
943 }
944 Ok(RuntimeQueryResult {
945 query: raw_query.to_string(),
946 mode: QueryMode::Sql,
947 statement: "search_spatial_nearest",
948 engine: "runtime-spatial",
949 result,
950 affected_rows: 0,
951 statement_type: "select",
952 })
953 }
954 }
955 }
956}
957
958fn apply_graph_order_and_limit(
959 result: &mut UnifiedResult,
960 statement: &str,
961 order_by: Option<&GraphCommandOrderBy>,
962 limit: Option<usize>,
963) -> RedDBResult<()> {
964 if let Some(order) = order_by {
965 let column = graph_order_metric_column(statement, &order.metric)?;
966 let columns = result.columns.clone();
967 result.records.sort_by(|left, right| {
968 let cmp = compare_graph_values(left.get(column), right.get(column));
969 let cmp = if order.ascending { cmp } else { cmp.reverse() };
970 if cmp == Ordering::Equal {
971 compare_graph_rows(left, right, &columns)
972 } else {
973 cmp
974 }
975 });
976 }
977 if let Some(limit) = limit {
978 result.records.truncate(limit);
979 }
980 Ok(())
981}
982
983fn graph_order_metric_column(statement: &str, metric: &str) -> RedDBResult<&'static str> {
984 let metric = metric.to_ascii_lowercase();
985 match (statement, metric.as_str()) {
986 ("graph_centrality", "score" | "centrality_score") => Ok("score"),
987 ("graph_community", "size" | "community_size") => Ok("size"),
988 ("graph_components", "size" | "component_size") => Ok("size"),
989 ("graph_shortest_path", "hop_count" | "total_weight" | "nodes_visited") => {
990 Ok(match metric.as_str() {
991 "total_weight" => "total_weight",
992 "nodes_visited" => "nodes_visited",
993 _ => "hop_count",
994 })
995 }
996 _ => Err(RedDBError::Query(format!(
997 "unsupported ORDER BY metric '{metric}' for GRAPH {}",
998 statement.trim_start_matches("graph_")
999 ))),
1000 }
1001}
1002
1003fn compare_graph_rows(left: &UnifiedRecord, right: &UnifiedRecord, columns: &[String]) -> Ordering {
1004 for column in columns {
1005 let cmp = compare_graph_values(left.get(column), right.get(column));
1006 if cmp != Ordering::Equal {
1007 return cmp;
1008 }
1009 }
1010 Ordering::Equal
1011}
1012
1013fn compare_graph_values(left: Option<&Value>, right: Option<&Value>) -> Ordering {
1014 match (left, right) {
1015 (None, None) => Ordering::Equal,
1016 (None, Some(_)) => Ordering::Less,
1017 (Some(_), None) => Ordering::Greater,
1018 (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
1019 (Some(Value::Null), Some(_)) => Ordering::Less,
1020 (Some(_), Some(Value::Null)) => Ordering::Greater,
1021 (Some(Value::Integer(left)), Some(Value::Integer(right))) => left.cmp(right),
1022 (Some(Value::UnsignedInteger(left)), Some(Value::UnsignedInteger(right))) => {
1023 left.cmp(right)
1024 }
1025 (Some(Value::Float(left)), Some(Value::Float(right))) => {
1026 left.partial_cmp(right).unwrap_or(Ordering::Equal)
1027 }
1028 (Some(Value::Integer(left)), Some(Value::Float(right))) => {
1029 (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1030 }
1031 (Some(Value::Float(left)), Some(Value::Integer(right))) => left
1032 .partial_cmp(&(*right as f64))
1033 .unwrap_or(Ordering::Equal),
1034 (Some(Value::UnsignedInteger(left)), Some(Value::Float(right))) => {
1035 (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1036 }
1037 (Some(Value::Float(left)), Some(Value::UnsignedInteger(right))) => left
1038 .partial_cmp(&(*right as f64))
1039 .unwrap_or(Ordering::Equal),
1040 (Some(Value::Integer(left)), Some(Value::UnsignedInteger(right))) => {
1041 (*left as i128).cmp(&(*right as i128))
1042 }
1043 (Some(Value::UnsignedInteger(left)), Some(Value::Integer(right))) => {
1044 (*left as i128).cmp(&(*right as i128))
1045 }
1046 (Some(Value::Timestamp(left)), Some(Value::Timestamp(right))) => left.cmp(right),
1047 (Some(Value::Text(left)), Some(Value::Text(right))) => left.cmp(right),
1048 (Some(Value::Boolean(left)), Some(Value::Boolean(right))) => left.cmp(right),
1049 (Some(left), Some(right)) => format!("{left:?}").cmp(&format!("{right:?}")),
1050 }
1051}
1052
1053fn parse_direction(s: &str) -> RedDBResult<RuntimeGraphDirection> {
1058 match s.to_lowercase().as_str() {
1059 "outgoing" | "out" => Ok(RuntimeGraphDirection::Outgoing),
1060 "incoming" | "in" => Ok(RuntimeGraphDirection::Incoming),
1061 "both" | "any" => Ok(RuntimeGraphDirection::Both),
1062 _ => Err(RedDBError::Query(format!(
1063 "unknown direction: '{s}', expected outgoing|incoming|both"
1064 ))),
1065 }
1066}
1067
1068fn parse_path_algorithm(s: &str) -> RedDBResult<RuntimeGraphPathAlgorithm> {
1069 match s.to_lowercase().as_str() {
1070 "bfs" => Ok(RuntimeGraphPathAlgorithm::Bfs),
1071 "dijkstra" => Ok(RuntimeGraphPathAlgorithm::Dijkstra),
1072 "astar" | "a*" => Ok(RuntimeGraphPathAlgorithm::AStar),
1073 "bellman_ford" | "bellmanford" => Ok(RuntimeGraphPathAlgorithm::BellmanFord),
1074 _ => Err(RedDBError::Query(format!(
1075 "unknown path algorithm: '{s}', expected bfs|dijkstra|astar|bellman_ford"
1076 ))),
1077 }
1078}
1079
1080fn parse_traversal_strategy(s: &str) -> RedDBResult<RuntimeGraphTraversalStrategy> {
1081 match s.to_lowercase().as_str() {
1082 "bfs" => Ok(RuntimeGraphTraversalStrategy::Bfs),
1083 "dfs" => Ok(RuntimeGraphTraversalStrategy::Dfs),
1084 _ => Err(RedDBError::Query(format!(
1085 "unknown traversal strategy: '{s}', expected bfs|dfs"
1086 ))),
1087 }
1088}
1089
1090fn parse_centrality_algorithm(s: &str) -> RedDBResult<RuntimeGraphCentralityAlgorithm> {
1091 match s.to_lowercase().as_str() {
1092 "degree" => Ok(RuntimeGraphCentralityAlgorithm::Degree),
1093 "closeness" => Ok(RuntimeGraphCentralityAlgorithm::Closeness),
1094 "betweenness" => Ok(RuntimeGraphCentralityAlgorithm::Betweenness),
1095 "eigenvector" => Ok(RuntimeGraphCentralityAlgorithm::Eigenvector),
1096 "pagerank" | "page_rank" => Ok(RuntimeGraphCentralityAlgorithm::PageRank),
1097 _ => Err(RedDBError::Query(format!(
1098 "unknown centrality algorithm: '{s}', expected degree|closeness|betweenness|eigenvector|pagerank"
1099 ))),
1100 }
1101}
1102
1103fn parse_community_algorithm(s: &str) -> RedDBResult<RuntimeGraphCommunityAlgorithm> {
1104 match s.to_lowercase().as_str() {
1105 "label_propagation" | "labelpropagation" => {
1106 Ok(RuntimeGraphCommunityAlgorithm::LabelPropagation)
1107 }
1108 "louvain" => Ok(RuntimeGraphCommunityAlgorithm::Louvain),
1109 _ => Err(RedDBError::Query(format!(
1110 "unknown community algorithm: '{s}', expected label_propagation|louvain"
1111 ))),
1112 }
1113}
1114
1115fn parse_components_mode(s: &str) -> RedDBResult<RuntimeGraphComponentsMode> {
1116 match s.to_lowercase().as_str() {
1117 "connected" => Ok(RuntimeGraphComponentsMode::Connected),
1118 "weak" | "weakly_connected" => Ok(RuntimeGraphComponentsMode::Weak),
1119 "strong" | "strongly_connected" => Ok(RuntimeGraphComponentsMode::Strong),
1120 _ => Err(RedDBError::Query(format!(
1121 "unknown components mode: '{s}', expected connected|weak|strong"
1122 ))),
1123 }
1124}
1125
1126fn extract_geo_from_entity(entity: &UnifiedEntity) -> Option<(f64, f64)> {
1131 match &entity.data {
1132 EntityData::Row(row) => {
1133 if let Some(ref named) = row.named {
1135 for value in named.values() {
1137 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1138 return Some((
1139 *lat_micro as f64 / 1_000_000.0,
1140 *lon_micro as f64 / 1_000_000.0,
1141 ));
1142 }
1143 }
1144 let lat =
1146 named
1147 .get("lat")
1148 .or_else(|| named.get("latitude"))
1149 .and_then(|v| match v {
1150 Value::Float(f) => Some(*f),
1151 Value::Integer(i) => Some(*i as f64),
1152 _ => None,
1153 });
1154 let lon = named
1155 .get("lon")
1156 .or_else(|| named.get("lng"))
1157 .or_else(|| named.get("longitude"))
1158 .and_then(|v| match v {
1159 Value::Float(f) => Some(*f),
1160 Value::Integer(i) => Some(*i as f64),
1161 _ => None,
1162 });
1163 if let (Some(la), Some(lo)) = (lat, lon) {
1164 return Some((la, lo));
1165 }
1166 }
1167 for value in &row.columns {
1169 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1170 return Some((
1171 *lat_micro as f64 / 1_000_000.0,
1172 *lon_micro as f64 / 1_000_000.0,
1173 ));
1174 }
1175 }
1176 None
1177 }
1178 EntityData::Node(node) => {
1179 for value in node.properties.values() {
1181 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1182 return Some((
1183 *lat_micro as f64 / 1_000_000.0,
1184 *lon_micro as f64 / 1_000_000.0,
1185 ));
1186 }
1187 }
1188 let lat = node
1189 .properties
1190 .get("lat")
1191 .or_else(|| node.properties.get("latitude"))
1192 .and_then(|v| match v {
1193 Value::Float(f) => Some(*f),
1194 Value::Integer(i) => Some(*i as f64),
1195 _ => None,
1196 });
1197 let lon = node
1198 .properties
1199 .get("lon")
1200 .or_else(|| node.properties.get("lng"))
1201 .or_else(|| node.properties.get("longitude"))
1202 .and_then(|v| match v {
1203 Value::Float(f) => Some(*f),
1204 Value::Integer(i) => Some(*i as f64),
1205 _ => None,
1206 });
1207 if let (Some(la), Some(lo)) = (lat, lon) {
1208 return Some((la, lo));
1209 }
1210 None
1211 }
1212 _ => None,
1213 }
1214}