1use super::*;
8use crate::storage::query::ast::GraphCommandOrderBy;
9use std::cmp::Ordering;
10
11impl RedDBRuntime {
12 pub fn execute_graph_command(
14 &self,
15 raw_query: &str,
16 cmd: &GraphCommand,
17 ) -> RedDBResult<RuntimeQueryResult> {
18 match cmd {
19 GraphCommand::Neighborhood {
20 source,
21 depth,
22 direction,
23 edge_labels,
24 } => {
25 let dir = parse_direction(direction)?;
26 let res = self.graph_neighborhood(
27 source,
28 dir,
29 *depth as usize,
30 edge_labels.clone(),
31 None,
32 )?;
33 let mut result = UnifiedResult::with_columns(vec![
34 "node_id".into(),
35 "label".into(),
36 "node_type".into(),
37 "depth".into(),
38 ]);
39 for visit in &res.nodes {
40 let mut record = UnifiedRecord::new();
41 record.set("node_id", Value::text(visit.node.id.clone()));
42 record.set("label", Value::text(visit.node.label.clone()));
43 record.set("node_type", Value::text(visit.node.node_type.clone()));
44 record.set("depth", Value::Integer(visit.depth as i64));
45 result.push(record);
46 }
47 Ok(RuntimeQueryResult {
48 query: raw_query.to_string(),
49 mode: QueryMode::Sql,
50 statement: "graph_neighborhood",
51 engine: "runtime-graph",
52 result,
53 affected_rows: 0,
54 statement_type: "select",
55 })
56 }
57 GraphCommand::ShortestPath {
58 source,
59 target,
60 algorithm,
61 direction,
62 limit,
63 order_by,
64 } => {
65 let dir = parse_direction(direction)?;
66 let alg = parse_path_algorithm(algorithm)?;
67 let res = self.graph_shortest_path(source, target, dir, alg, None, None)?;
68 let mut result = UnifiedResult::with_columns(vec![
69 "source".into(),
70 "target".into(),
71 "nodes_visited".into(),
72 "negative_cycle_detected".into(),
73 "hop_count".into(),
74 "total_weight".into(),
75 ]);
76 let mut record = UnifiedRecord::new();
77 record.set("source", Value::text(res.source));
78 record.set("target", Value::text(res.target));
79 record.set("nodes_visited", Value::Integer(res.nodes_visited as i64));
80 record.set(
81 "negative_cycle_detected",
82 match res.negative_cycle_detected {
83 Some(value) => Value::Boolean(value),
84 None => Value::Null,
85 },
86 );
87 if let Some(ref path) = res.path {
88 record.set("hop_count", Value::Integer(path.hop_count as i64));
89 record.set("total_weight", Value::Float(path.total_weight));
90 } else {
91 record.set("hop_count", Value::Null);
92 record.set("total_weight", Value::Null);
93 }
94 result.push(record);
95 apply_graph_order_and_limit(
96 &mut result,
97 "graph_shortest_path",
98 order_by.as_ref(),
99 limit.map(|n| n as usize),
100 )?;
101 Ok(RuntimeQueryResult {
102 query: raw_query.to_string(),
103 mode: QueryMode::Sql,
104 statement: "graph_shortest_path",
105 engine: "runtime-graph",
106 result,
107 affected_rows: 0,
108 statement_type: "select",
109 })
110 }
111 GraphCommand::Properties { source } => {
112 if let Some(node_ref) = source {
113 let graph =
117 materialize_graph_with_projection(self.inner.db.store().as_ref(), None)?;
118 let resolved = resolve_graph_node_id(&graph, node_ref)?;
119 let stored = graph
120 .get_node(&resolved)
121 .ok_or_else(|| RedDBError::NotFound(node_ref.to_string()))?;
122 let all_props =
123 materialize_graph_node_properties(self.inner.db.store().as_ref())?;
124 let props = all_props.get(&resolved).cloned().unwrap_or_default();
125
126 let mut prop_keys: Vec<&String> = props.keys().collect();
129 prop_keys.sort();
130 let mut columns: Vec<String> = Vec::with_capacity(3 + prop_keys.len());
131 columns.push("node_id".into());
132 columns.push("label".into());
133 columns.push("node_type".into());
134 for k in &prop_keys {
135 columns.push((*k).clone());
136 }
137 let mut result = UnifiedResult::with_columns(columns);
138 let mut record = UnifiedRecord::new();
139 record.set("node_id", Value::text(stored.id.clone()));
140 record.set("label", Value::text(stored.label.clone()));
141 record.set("node_type", Value::text(stored.node_type.clone()));
142 for k in &prop_keys {
143 if let Some(v) = props.get(*k) {
144 record.set(k.as_str(), v.clone());
145 }
146 }
147 result.push(record);
148 return Ok(RuntimeQueryResult {
149 query: raw_query.to_string(),
150 mode: QueryMode::Sql,
151 statement: "graph_properties",
152 engine: "runtime-graph",
153 result,
154 affected_rows: 0,
155 statement_type: "select",
156 });
157 }
158 let res = self.graph_properties(None)?;
159 let mut result = UnifiedResult::with_columns(vec![
160 "node_count".into(),
161 "edge_count".into(),
162 "is_connected".into(),
163 "is_complete".into(),
164 "is_cyclic".into(),
165 "density".into(),
166 ]);
167 let mut record = UnifiedRecord::new();
168 record.set("node_count", Value::Integer(res.node_count as i64));
169 record.set("edge_count", Value::Integer(res.edge_count as i64));
170 record.set("is_connected", Value::Boolean(res.is_connected));
171 record.set("is_complete", Value::Boolean(res.is_complete));
172 record.set("is_cyclic", Value::Boolean(res.is_cyclic));
173 record.set("density", Value::Float(res.density));
174 result.push(record);
175 Ok(RuntimeQueryResult {
176 query: raw_query.to_string(),
177 mode: QueryMode::Sql,
178 statement: "graph_properties",
179 engine: "runtime-graph",
180 result,
181 affected_rows: 0,
182 statement_type: "select",
183 })
184 }
185 GraphCommand::Traverse {
186 source,
187 strategy,
188 depth,
189 direction,
190 edge_labels,
191 } => {
192 let dir = parse_direction(direction)?;
193 let strat = parse_traversal_strategy(strategy)?;
194 let res = self.graph_traverse(
195 source,
196 dir,
197 *depth as usize,
198 strat,
199 edge_labels.clone(),
200 None,
201 )?;
202 let mut result = UnifiedResult::with_columns(vec![
203 "node_id".into(),
204 "label".into(),
205 "node_type".into(),
206 "depth".into(),
207 ]);
208 for visit in &res.visits {
209 let mut record = UnifiedRecord::new();
210 record.set("node_id", Value::text(visit.node.id.clone()));
211 record.set("label", Value::text(visit.node.label.clone()));
212 record.set("node_type", Value::text(visit.node.node_type.clone()));
213 record.set("depth", Value::Integer(visit.depth as i64));
214 result.push(record);
215 }
216 Ok(RuntimeQueryResult {
217 query: raw_query.to_string(),
218 mode: QueryMode::Sql,
219 statement: "graph_traverse",
220 engine: "runtime-graph",
221 result,
222 affected_rows: 0,
223 statement_type: "select",
224 })
225 }
226 GraphCommand::Centrality {
227 algorithm,
228 limit,
229 order_by,
230 } => {
231 let alg = parse_centrality_algorithm(algorithm)?;
232 let limit_usize = limit.map(|n| n as usize);
235 let order_needs_full_set = order_by
236 .as_ref()
237 .map(|order| order.ascending)
238 .unwrap_or(false);
239 let top_k = if order_needs_full_set {
240 usize::MAX
241 } else {
242 limit_usize.unwrap_or(100).max(1)
243 };
244 let res = self.graph_centrality(alg, top_k, false, None, None, None, None)?;
245 let mut result = UnifiedResult::with_columns(vec![
246 "node_id".into(),
247 "label".into(),
248 "score".into(),
249 ]);
250 for score in &res.scores {
251 let mut record = UnifiedRecord::new();
252 record.set("node_id", Value::text(score.node.id.clone()));
253 record.set("label", Value::text(score.node.label.clone()));
254 record.set("score", Value::Float(score.score));
255 result.push(record);
256 }
257 for ds in &res.degree_scores {
258 let mut record = UnifiedRecord::new();
259 record.set("node_id", Value::text(ds.node.id.clone()));
260 record.set("label", Value::text(ds.node.label.clone()));
261 record.set("score", Value::Float(ds.total_degree as f64));
262 result.push(record);
263 }
264 apply_graph_order_and_limit(
265 &mut result,
266 "graph_centrality",
267 order_by.as_ref(),
268 Some(limit_usize.unwrap_or(100)),
269 )?;
270 Ok(RuntimeQueryResult {
271 query: raw_query.to_string(),
272 mode: QueryMode::Sql,
273 statement: "graph_centrality",
274 engine: "runtime-graph",
275 result,
276 affected_rows: 0,
277 statement_type: "select",
278 })
279 }
280 GraphCommand::Community {
281 algorithm,
282 max_iterations,
283 limit,
284 order_by,
285 } => {
286 let alg = parse_community_algorithm(algorithm)?;
287 let res =
288 self.graph_communities(alg, 1, Some(*max_iterations as usize), None, None)?;
289 let mut result =
290 UnifiedResult::with_columns(vec!["community_id".into(), "size".into()]);
291 for community in &res.communities {
292 let mut record = UnifiedRecord::new();
293 record.set("community_id", Value::text(community.id.clone()));
294 record.set("size", Value::Integer(community.size as i64));
295 result.push(record);
296 }
297 apply_graph_order_and_limit(
298 &mut result,
299 "graph_community",
300 order_by.as_ref(),
301 limit.map(|n| n as usize),
302 )?;
303 Ok(RuntimeQueryResult {
304 query: raw_query.to_string(),
305 mode: QueryMode::Sql,
306 statement: "graph_community",
307 engine: "runtime-graph",
308 result,
309 affected_rows: 0,
310 statement_type: "select",
311 })
312 }
313 GraphCommand::Components {
314 mode,
315 limit,
316 order_by,
317 } => {
318 let m = parse_components_mode(mode)?;
319 let res = self.graph_components(m, 1, None)?;
320 let mut result =
321 UnifiedResult::with_columns(vec!["component_id".into(), "size".into()]);
322 for component in &res.components {
323 let mut record = UnifiedRecord::new();
324 record.set("component_id", Value::text(component.id.clone()));
325 record.set("size", Value::Integer(component.size as i64));
326 result.push(record);
327 }
328 apply_graph_order_and_limit(
329 &mut result,
330 "graph_components",
331 order_by.as_ref(),
332 limit.map(|n| n as usize),
333 )?;
334 Ok(RuntimeQueryResult {
335 query: raw_query.to_string(),
336 mode: QueryMode::Sql,
337 statement: "graph_components",
338 engine: "runtime-graph",
339 result,
340 affected_rows: 0,
341 statement_type: "select",
342 })
343 }
344 GraphCommand::Cycles { max_length } => {
345 let res = self.graph_cycles(*max_length as usize, 100, None)?;
346 let mut result =
347 UnifiedResult::with_columns(vec!["cycle_index".into(), "length".into()]);
348 for (i, cycle) in res.cycles.iter().enumerate() {
349 let mut record = UnifiedRecord::new();
350 record.set("cycle_index", Value::Integer(i as i64));
351 record.set("length", Value::Integer(cycle.nodes.len() as i64));
352 result.push(record);
353 }
354 Ok(RuntimeQueryResult {
355 query: raw_query.to_string(),
356 mode: QueryMode::Sql,
357 statement: "graph_cycles",
358 engine: "runtime-graph",
359 result,
360 affected_rows: 0,
361 statement_type: "select",
362 })
363 }
364 GraphCommand::Clustering => {
365 let res = self.graph_clustering(100, true, None)?;
366 let mut result = UnifiedResult::with_columns(vec![
367 "node_id".into(),
368 "label".into(),
369 "score".into(),
370 ]);
371 let mut global_record = UnifiedRecord::new();
373 global_record.set("node_id", Value::text("__global__"));
374 global_record.set("label", Value::text("global_clustering"));
375 global_record.set("score", Value::Float(res.global));
376 result.push(global_record);
377 for score in &res.local {
378 let mut record = UnifiedRecord::new();
379 record.set("node_id", Value::text(score.node.id.clone()));
380 record.set("label", Value::text(score.node.label.clone()));
381 record.set("score", Value::Float(score.score));
382 result.push(record);
383 }
384 Ok(RuntimeQueryResult {
385 query: raw_query.to_string(),
386 mode: QueryMode::Sql,
387 statement: "graph_clustering",
388 engine: "runtime-graph",
389 result,
390 affected_rows: 0,
391 statement_type: "select",
392 })
393 }
394 GraphCommand::TopologicalSort => {
395 let res = self.graph_topological_sort(None)?;
396 let mut result = UnifiedResult::with_columns(vec![
397 "order".into(),
398 "node_id".into(),
399 "label".into(),
400 ]);
401 for (i, node) in res.ordered_nodes.iter().enumerate() {
402 let mut record = UnifiedRecord::new();
403 record.set("order", Value::Integer(i as i64));
404 record.set("node_id", Value::text(node.id.clone()));
405 record.set("label", Value::text(node.label.clone()));
406 result.push(record);
407 }
408 Ok(RuntimeQueryResult {
409 query: raw_query.to_string(),
410 mode: QueryMode::Sql,
411 statement: "graph_topological_sort",
412 engine: "runtime-graph",
413 result,
414 affected_rows: 0,
415 statement_type: "select",
416 })
417 }
418 }
419 }
420
421 pub fn execute_search_command(
423 &self,
424 raw_query: &str,
425 cmd: &SearchCommand,
426 ) -> RedDBResult<RuntimeQueryResult> {
427 match cmd {
428 SearchCommand::Similar {
429 vector,
430 text,
431 provider,
432 collection,
433 limit,
434 min_score,
435 vector_param,
436 limit_param,
437 min_score_param,
438 text_param,
439 } => {
440 if vector_param.is_some() {
441 return Err(RedDBError::Query(
442 "SEARCH SIMILAR $N vector parameter was not bound before execution"
443 .to_string(),
444 ));
445 }
446 if limit_param.is_some() {
447 return Err(RedDBError::Query(
448 "SEARCH SIMILAR LIMIT $N parameter was not bound before execution"
449 .to_string(),
450 ));
451 }
452 if min_score_param.is_some() {
453 return Err(RedDBError::Query(
454 "SEARCH SIMILAR MIN_SCORE $N parameter was not bound before execution"
455 .to_string(),
456 ));
457 }
458 if text_param.is_some() {
459 return Err(RedDBError::Query(
460 "SEARCH SIMILAR TEXT $N parameter was not bound before execution"
461 .to_string(),
462 ));
463 }
464 let search_vector = if let Some(query_text) = text {
466 let (default_provider, _) = crate::ai::resolve_defaults_from_runtime(self);
467 let provider = match provider.as_deref() {
468 Some(p) => crate::ai::parse_provider(p)?,
469 None => default_provider,
470 };
471 let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
472 let model = std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
473 .ok()
474 .unwrap_or_else(|| provider.default_embedding_model().to_string());
475 let transport = crate::runtime::ai::transport::AiTransport::from_runtime(self);
476 let request = crate::ai::OpenAiEmbeddingRequest {
477 api_key,
478 model,
479 inputs: vec![query_text.clone()],
480 dimensions: None,
481 api_base: provider.resolve_api_base(),
482 };
483 let response = crate::runtime::ai::block_on_ai(async move {
484 crate::ai::openai_embeddings_async(&transport, request).await
485 })
486 .and_then(|result| result)?;
487 response.embeddings.into_iter().next().ok_or_else(|| {
488 RedDBError::Query("embedding API returned no vectors".to_string())
489 })?
490 } else {
491 vector.clone()
492 };
493 let scope = self.ai_scope();
497 let results =
498 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
499 crate::runtime::authorized_search::AuthorizedSearch::execute_similar(
500 self,
501 &scope,
502 collection,
503 &search_vector,
504 *limit,
505 *min_score,
506 )?
507 } else {
508 self.search_similar(collection, &search_vector, *limit, *min_score)?
510 };
511 let mut result =
512 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
513 for sr in &results {
514 let mut record = UnifiedRecord::new();
515 record.set("entity_id", Value::UnsignedInteger(sr.entity_id.raw()));
516 record.set("score", Value::Float(sr.score as f64));
517 result.push(record);
518 }
519 Ok(RuntimeQueryResult {
520 query: raw_query.to_string(),
521 mode: QueryMode::Sql,
522 statement: "search_similar",
523 engine: "runtime-search",
524 result,
525 affected_rows: 0,
526 statement_type: "select",
527 })
528 }
529 SearchCommand::Text {
530 query,
531 collection,
532 limit,
533 fuzzy,
534 limit_param,
535 } => {
536 if limit_param.is_some() {
537 return Err(RedDBError::Query(
538 "SEARCH TEXT LIMIT $N parameter was not bound before execution".to_string(),
539 ));
540 }
541 let collections = collection.as_ref().map(|c| vec![c.clone()]);
542 let scope = self.ai_scope();
544 let res =
545 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
546 crate::runtime::authorized_search::AuthorizedSearch::execute_text(
547 self,
548 &scope,
549 query.clone(),
550 collections,
551 None,
552 None,
553 None,
554 Some(*limit),
555 *fuzzy,
556 )?
557 } else {
558 self.search_text(
559 query.clone(),
560 collections,
561 None,
562 None,
563 None,
564 Some(*limit),
565 *fuzzy,
566 )?
567 };
568 let mut result =
569 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
570 for item in &res.matches {
571 let mut record = UnifiedRecord::new();
572 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
573 record.set("score", Value::Float(item.score as f64));
574 result.push(record);
575 }
576 Ok(RuntimeQueryResult {
577 query: raw_query.to_string(),
578 mode: QueryMode::Sql,
579 statement: "search_text",
580 engine: "runtime-search",
581 result,
582 affected_rows: 0,
583 statement_type: "select",
584 })
585 }
586 SearchCommand::Hybrid {
587 vector,
588 query,
589 collection,
590 limit,
591 limit_param,
592 } => {
593 if limit_param.is_some() {
594 return Err(RedDBError::Query(
595 "SEARCH HYBRID LIMIT $N parameter was not bound before execution"
596 .to_string(),
597 ));
598 }
599 let res = self.search_hybrid(
600 vector.clone(),
601 query.clone(),
602 Some(*limit),
603 Some(vec![collection.clone()]),
604 None,
605 None,
606 None,
607 Vec::new(),
608 None,
609 None,
610 Some(*limit),
611 )?;
612 let mut result =
613 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
614 for item in &res.matches {
615 let mut record = UnifiedRecord::new();
616 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
617 record.set("score", Value::Float(item.score as f64));
618 result.push(record);
619 }
620 Ok(RuntimeQueryResult {
621 query: raw_query.to_string(),
622 mode: QueryMode::Sql,
623 statement: "search_hybrid",
624 engine: "runtime-search",
625 result,
626 affected_rows: 0,
627 statement_type: "select",
628 })
629 }
630 SearchCommand::Multimodal {
631 query,
632 collection,
633 limit,
634 limit_param,
635 } => {
636 if limit_param.is_some() {
637 return Err(RedDBError::Query(
638 "SEARCH MULTIMODAL LIMIT $N parameter was not bound before execution"
639 .to_string(),
640 ));
641 }
642 let collections = collection.as_ref().map(|c| vec![c.clone()]);
643 let res =
644 self.search_multimodal(query.clone(), collections, None, None, Some(*limit))?;
645 let mut result =
646 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
647 for item in &res.matches {
648 let mut record = UnifiedRecord::new();
649 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
650 record.set("score", Value::Float(item.score as f64));
651 result.push(record);
652 }
653 Ok(RuntimeQueryResult {
654 query: raw_query.to_string(),
655 mode: QueryMode::Sql,
656 statement: "search_multimodal",
657 engine: "runtime-search",
658 result,
659 affected_rows: 0,
660 statement_type: "select",
661 })
662 }
663 SearchCommand::Index {
664 index,
665 value,
666 collection,
667 limit,
668 exact,
669 limit_param,
670 } => {
671 if limit_param.is_some() {
672 return Err(RedDBError::Query(
673 "SEARCH INDEX LIMIT $N parameter was not bound before execution"
674 .to_string(),
675 ));
676 }
677 let collections = collection.as_ref().map(|c| vec![c.clone()]);
678 let res = self.search_index(
679 index.clone(),
680 value.clone(),
681 *exact,
682 collections,
683 None,
684 None,
685 Some(*limit),
686 )?;
687 let mut result =
688 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
689 for item in &res.matches {
690 let mut record = UnifiedRecord::new();
691 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
692 record.set("score", Value::Float(item.score as f64));
693 result.push(record);
694 }
695 Ok(RuntimeQueryResult {
696 query: raw_query.to_string(),
697 mode: QueryMode::Sql,
698 statement: "search_index",
699 engine: "runtime-search",
700 result,
701 affected_rows: 0,
702 statement_type: "select",
703 })
704 }
705 SearchCommand::Context {
706 query,
707 field,
708 collection,
709 limit,
710 depth,
711 limit_param,
712 } => {
713 if limit_param.is_some() {
714 return Err(RedDBError::Query(
715 "SEARCH CONTEXT LIMIT $N parameter was not bound before execution"
716 .to_string(),
717 ));
718 }
719 use crate::application::SearchContextInput;
720 let input = SearchContextInput {
724 query: query.clone(),
725 field: field.clone(),
726 vector: None,
727 collections: collection.as_ref().map(|c| vec![c.clone()]),
728 graph_depth: Some(*depth),
729 graph_max_edges: None,
730 max_cross_refs: None,
731 follow_cross_refs: None,
732 expand_graph: None,
733 global_scan: None,
734 reindex: None,
735 limit: Some(*limit),
736 min_score: None,
737 };
738 let scope = self.ai_scope();
739 let res =
740 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
741 crate::runtime::authorized_search::AuthorizedSearch::execute_context(
742 self, &scope, input,
743 )?
744 } else {
745 self.search_context(input)?
746 };
747 let mut result = UnifiedResult::with_columns(vec![
748 "entity_id".into(),
749 "collection".into(),
750 "score".into(),
751 "discovery".into(),
752 "kind".into(),
753 ]);
754 let all_entities = res
755 .tables
756 .iter()
757 .map(|e| (e, "table"))
758 .chain(res.graph.nodes.iter().map(|e| (e, "graph_node")))
759 .chain(res.graph.edges.iter().map(|e| (e, "graph_edge")))
760 .chain(res.vectors.iter().map(|e| (e, "vector")))
761 .chain(res.documents.iter().map(|e| (e, "document")))
762 .chain(res.key_values.iter().map(|e| (e, "kv")));
763 for (entity, kind) in all_entities {
764 let mut record = UnifiedRecord::new();
765 record.set("entity_id", Value::UnsignedInteger(entity.entity.id.raw()));
766 record.set("collection", Value::text(entity.collection.clone()));
767 record.set("score", Value::Float(entity.score as f64));
768 record.set("discovery", Value::text(format!("{:?}", entity.discovery)));
769 record.set("kind", Value::text(kind.to_string()));
770 result.push(record);
771 }
772 Ok(RuntimeQueryResult {
773 query: raw_query.to_string(),
774 mode: QueryMode::Sql,
775 statement: "search_context",
776 engine: "runtime-context",
777 result,
778 affected_rows: 0,
779 statement_type: "select",
780 })
781 }
782 SearchCommand::SpatialRadius {
783 center_lat,
784 center_lon,
785 radius_km,
786 collection,
787 column,
788 limit,
789 limit_param,
790 } => {
791 if limit_param.is_some() {
792 return Err(RedDBError::Query(
793 "SEARCH SPATIAL RADIUS LIMIT $N parameter was not bound before execution"
794 .to_string(),
795 ));
796 }
797 use crate::storage::unified::spatial_index::haversine_km;
798 let _ = column; let store = self.inner.db.store();
800 let entities = store
801 .get_collection(collection)
802 .map(|m| m.query_all(|_| true))
803 .unwrap_or_default();
804
805 let mut hits: Vec<(u64, f64)> = Vec::new();
806 for entity in &entities {
807 if let Some((lat, lon)) = extract_geo_from_entity(entity) {
809 let dist = haversine_km(*center_lat, *center_lon, lat, lon);
810 if dist <= *radius_km {
811 hits.push((entity.id.raw(), dist));
812 }
813 }
814 }
815 hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
816 hits.truncate(*limit);
817
818 let mut result =
819 UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
820 for (id, dist) in &hits {
821 let mut record = UnifiedRecord::new();
822 record.set("entity_id", Value::UnsignedInteger(*id));
823 record.set("distance_km", Value::Float(*dist));
824 result.push(record);
825 }
826 Ok(RuntimeQueryResult {
827 query: raw_query.to_string(),
828 mode: QueryMode::Sql,
829 statement: "search_spatial_radius",
830 engine: "runtime-spatial",
831 result,
832 affected_rows: 0,
833 statement_type: "select",
834 })
835 }
836 SearchCommand::SpatialBbox {
837 min_lat,
838 min_lon,
839 max_lat,
840 max_lon,
841 collection,
842 column,
843 limit,
844 limit_param,
845 } => {
846 if limit_param.is_some() {
847 return Err(RedDBError::Query(
848 "SEARCH SPATIAL BBOX LIMIT $N parameter was not bound before execution"
849 .to_string(),
850 ));
851 }
852 let _ = column;
853 let store = self.inner.db.store();
854 let entities = store
855 .get_collection(collection)
856 .map(|m| m.query_all(|_| true))
857 .unwrap_or_default();
858
859 let mut result = UnifiedResult::with_columns(vec!["entity_id".into()]);
860 let mut count = 0;
861 for entity in &entities {
862 if count >= *limit {
863 break;
864 }
865 if let Some((lat, lon)) = extract_geo_from_entity(entity) {
866 if lat >= *min_lat && lat <= *max_lat && lon >= *min_lon && lon <= *max_lon
867 {
868 let mut record = UnifiedRecord::new();
869 record.set("entity_id", Value::UnsignedInteger(entity.id.raw()));
870 result.push(record);
871 count += 1;
872 }
873 }
874 }
875 Ok(RuntimeQueryResult {
876 query: raw_query.to_string(),
877 mode: QueryMode::Sql,
878 statement: "search_spatial_bbox",
879 engine: "runtime-spatial",
880 result,
881 affected_rows: 0,
882 statement_type: "select",
883 })
884 }
885 SearchCommand::SpatialNearest {
886 lat,
887 lon,
888 k,
889 collection,
890 column,
891 k_param,
892 } => {
893 if k_param.is_some() {
894 return Err(RedDBError::Query(
895 "SEARCH SPATIAL NEAREST K $N parameter was not bound before execution"
896 .to_string(),
897 ));
898 }
899 use crate::storage::unified::spatial_index::haversine_km;
900 let _ = column;
901 let store = self.inner.db.store();
902 let entities = store
903 .get_collection(collection)
904 .map(|m| m.query_all(|_| true))
905 .unwrap_or_default();
906
907 let mut hits: Vec<(u64, f64)> = Vec::new();
908 for entity in &entities {
909 if let Some((elat, elon)) = extract_geo_from_entity(entity) {
910 let dist = haversine_km(*lat, *lon, elat, elon);
911 hits.push((entity.id.raw(), dist));
912 }
913 }
914 hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
915 hits.truncate(*k);
916
917 let mut result =
918 UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
919 for (id, dist) in &hits {
920 let mut record = UnifiedRecord::new();
921 record.set("entity_id", Value::UnsignedInteger(*id));
922 record.set("distance_km", Value::Float(*dist));
923 result.push(record);
924 }
925 Ok(RuntimeQueryResult {
926 query: raw_query.to_string(),
927 mode: QueryMode::Sql,
928 statement: "search_spatial_nearest",
929 engine: "runtime-spatial",
930 result,
931 affected_rows: 0,
932 statement_type: "select",
933 })
934 }
935 }
936 }
937}
938
939fn apply_graph_order_and_limit(
940 result: &mut UnifiedResult,
941 statement: &str,
942 order_by: Option<&GraphCommandOrderBy>,
943 limit: Option<usize>,
944) -> RedDBResult<()> {
945 if let Some(order) = order_by {
946 let column = graph_order_metric_column(statement, &order.metric)?;
947 let columns = result.columns.clone();
948 result.records.sort_by(|left, right| {
949 let cmp = compare_graph_values(left.get(column), right.get(column));
950 let cmp = if order.ascending { cmp } else { cmp.reverse() };
951 if cmp == Ordering::Equal {
952 compare_graph_rows(left, right, &columns)
953 } else {
954 cmp
955 }
956 });
957 }
958 if let Some(limit) = limit {
959 result.records.truncate(limit);
960 }
961 Ok(())
962}
963
964fn graph_order_metric_column(statement: &str, metric: &str) -> RedDBResult<&'static str> {
965 let metric = metric.to_ascii_lowercase();
966 match (statement, metric.as_str()) {
967 ("graph_centrality", "score" | "centrality_score") => Ok("score"),
968 ("graph_community", "size" | "community_size") => Ok("size"),
969 ("graph_components", "size" | "component_size") => Ok("size"),
970 ("graph_shortest_path", "hop_count" | "total_weight" | "nodes_visited") => {
971 Ok(match metric.as_str() {
972 "total_weight" => "total_weight",
973 "nodes_visited" => "nodes_visited",
974 _ => "hop_count",
975 })
976 }
977 _ => Err(RedDBError::Query(format!(
978 "unsupported ORDER BY metric '{metric}' for GRAPH {}",
979 statement.trim_start_matches("graph_")
980 ))),
981 }
982}
983
984fn compare_graph_rows(left: &UnifiedRecord, right: &UnifiedRecord, columns: &[String]) -> Ordering {
985 for column in columns {
986 let cmp = compare_graph_values(left.get(column), right.get(column));
987 if cmp != Ordering::Equal {
988 return cmp;
989 }
990 }
991 Ordering::Equal
992}
993
994fn compare_graph_values(left: Option<&Value>, right: Option<&Value>) -> Ordering {
995 match (left, right) {
996 (None, None) => Ordering::Equal,
997 (None, Some(_)) => Ordering::Less,
998 (Some(_), None) => Ordering::Greater,
999 (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
1000 (Some(Value::Null), Some(_)) => Ordering::Less,
1001 (Some(_), Some(Value::Null)) => Ordering::Greater,
1002 (Some(Value::Integer(left)), Some(Value::Integer(right))) => left.cmp(right),
1003 (Some(Value::UnsignedInteger(left)), Some(Value::UnsignedInteger(right))) => {
1004 left.cmp(right)
1005 }
1006 (Some(Value::Float(left)), Some(Value::Float(right))) => {
1007 left.partial_cmp(right).unwrap_or(Ordering::Equal)
1008 }
1009 (Some(Value::Integer(left)), Some(Value::Float(right))) => {
1010 (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1011 }
1012 (Some(Value::Float(left)), Some(Value::Integer(right))) => left
1013 .partial_cmp(&(*right as f64))
1014 .unwrap_or(Ordering::Equal),
1015 (Some(Value::UnsignedInteger(left)), Some(Value::Float(right))) => {
1016 (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1017 }
1018 (Some(Value::Float(left)), Some(Value::UnsignedInteger(right))) => left
1019 .partial_cmp(&(*right as f64))
1020 .unwrap_or(Ordering::Equal),
1021 (Some(Value::Integer(left)), Some(Value::UnsignedInteger(right))) => {
1022 (*left as i128).cmp(&(*right as i128))
1023 }
1024 (Some(Value::UnsignedInteger(left)), Some(Value::Integer(right))) => {
1025 (*left as i128).cmp(&(*right as i128))
1026 }
1027 (Some(Value::Timestamp(left)), Some(Value::Timestamp(right))) => left.cmp(right),
1028 (Some(Value::Text(left)), Some(Value::Text(right))) => left.cmp(right),
1029 (Some(Value::Boolean(left)), Some(Value::Boolean(right))) => left.cmp(right),
1030 (Some(left), Some(right)) => format!("{left:?}").cmp(&format!("{right:?}")),
1031 }
1032}
1033
1034fn parse_direction(s: &str) -> RedDBResult<RuntimeGraphDirection> {
1039 match s.to_lowercase().as_str() {
1040 "outgoing" | "out" => Ok(RuntimeGraphDirection::Outgoing),
1041 "incoming" | "in" => Ok(RuntimeGraphDirection::Incoming),
1042 "both" | "any" => Ok(RuntimeGraphDirection::Both),
1043 _ => Err(RedDBError::Query(format!(
1044 "unknown direction: '{s}', expected outgoing|incoming|both"
1045 ))),
1046 }
1047}
1048
1049fn parse_path_algorithm(s: &str) -> RedDBResult<RuntimeGraphPathAlgorithm> {
1050 match s.to_lowercase().as_str() {
1051 "bfs" => Ok(RuntimeGraphPathAlgorithm::Bfs),
1052 "dijkstra" => Ok(RuntimeGraphPathAlgorithm::Dijkstra),
1053 "astar" | "a*" => Ok(RuntimeGraphPathAlgorithm::AStar),
1054 "bellman_ford" | "bellmanford" => Ok(RuntimeGraphPathAlgorithm::BellmanFord),
1055 _ => Err(RedDBError::Query(format!(
1056 "unknown path algorithm: '{s}', expected bfs|dijkstra|astar|bellman_ford"
1057 ))),
1058 }
1059}
1060
1061fn parse_traversal_strategy(s: &str) -> RedDBResult<RuntimeGraphTraversalStrategy> {
1062 match s.to_lowercase().as_str() {
1063 "bfs" => Ok(RuntimeGraphTraversalStrategy::Bfs),
1064 "dfs" => Ok(RuntimeGraphTraversalStrategy::Dfs),
1065 _ => Err(RedDBError::Query(format!(
1066 "unknown traversal strategy: '{s}', expected bfs|dfs"
1067 ))),
1068 }
1069}
1070
1071fn parse_centrality_algorithm(s: &str) -> RedDBResult<RuntimeGraphCentralityAlgorithm> {
1072 match s.to_lowercase().as_str() {
1073 "degree" => Ok(RuntimeGraphCentralityAlgorithm::Degree),
1074 "closeness" => Ok(RuntimeGraphCentralityAlgorithm::Closeness),
1075 "betweenness" => Ok(RuntimeGraphCentralityAlgorithm::Betweenness),
1076 "eigenvector" => Ok(RuntimeGraphCentralityAlgorithm::Eigenvector),
1077 "pagerank" | "page_rank" => Ok(RuntimeGraphCentralityAlgorithm::PageRank),
1078 _ => Err(RedDBError::Query(format!(
1079 "unknown centrality algorithm: '{s}', expected degree|closeness|betweenness|eigenvector|pagerank"
1080 ))),
1081 }
1082}
1083
1084fn parse_community_algorithm(s: &str) -> RedDBResult<RuntimeGraphCommunityAlgorithm> {
1085 match s.to_lowercase().as_str() {
1086 "label_propagation" | "labelpropagation" => {
1087 Ok(RuntimeGraphCommunityAlgorithm::LabelPropagation)
1088 }
1089 "louvain" => Ok(RuntimeGraphCommunityAlgorithm::Louvain),
1090 _ => Err(RedDBError::Query(format!(
1091 "unknown community algorithm: '{s}', expected label_propagation|louvain"
1092 ))),
1093 }
1094}
1095
1096fn parse_components_mode(s: &str) -> RedDBResult<RuntimeGraphComponentsMode> {
1097 match s.to_lowercase().as_str() {
1098 "connected" => Ok(RuntimeGraphComponentsMode::Connected),
1099 "weak" | "weakly_connected" => Ok(RuntimeGraphComponentsMode::Weak),
1100 "strong" | "strongly_connected" => Ok(RuntimeGraphComponentsMode::Strong),
1101 _ => Err(RedDBError::Query(format!(
1102 "unknown components mode: '{s}', expected connected|weak|strong"
1103 ))),
1104 }
1105}
1106
1107fn extract_geo_from_entity(entity: &UnifiedEntity) -> Option<(f64, f64)> {
1112 match &entity.data {
1113 EntityData::Row(row) => {
1114 if let Some(ref named) = row.named {
1116 for value in named.values() {
1118 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1119 return Some((
1120 *lat_micro as f64 / 1_000_000.0,
1121 *lon_micro as f64 / 1_000_000.0,
1122 ));
1123 }
1124 }
1125 let lat =
1127 named
1128 .get("lat")
1129 .or_else(|| named.get("latitude"))
1130 .and_then(|v| match v {
1131 Value::Float(f) => Some(*f),
1132 Value::Integer(i) => Some(*i as f64),
1133 _ => None,
1134 });
1135 let lon = named
1136 .get("lon")
1137 .or_else(|| named.get("lng"))
1138 .or_else(|| named.get("longitude"))
1139 .and_then(|v| match v {
1140 Value::Float(f) => Some(*f),
1141 Value::Integer(i) => Some(*i as f64),
1142 _ => None,
1143 });
1144 if let (Some(la), Some(lo)) = (lat, lon) {
1145 return Some((la, lo));
1146 }
1147 }
1148 for value in &row.columns {
1150 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1151 return Some((
1152 *lat_micro as f64 / 1_000_000.0,
1153 *lon_micro as f64 / 1_000_000.0,
1154 ));
1155 }
1156 }
1157 None
1158 }
1159 EntityData::Node(node) => {
1160 for value in node.properties.values() {
1162 if let Value::GeoPoint(lat_micro, lon_micro) = value {
1163 return Some((
1164 *lat_micro as f64 / 1_000_000.0,
1165 *lon_micro as f64 / 1_000_000.0,
1166 ));
1167 }
1168 }
1169 let lat = node
1170 .properties
1171 .get("lat")
1172 .or_else(|| node.properties.get("latitude"))
1173 .and_then(|v| match v {
1174 Value::Float(f) => Some(*f),
1175 Value::Integer(i) => Some(*i as f64),
1176 _ => None,
1177 });
1178 let lon = node
1179 .properties
1180 .get("lon")
1181 .or_else(|| node.properties.get("lng"))
1182 .or_else(|| node.properties.get("longitude"))
1183 .and_then(|v| match v {
1184 Value::Float(f) => Some(*f),
1185 Value::Integer(i) => Some(*i as f64),
1186 _ => None,
1187 });
1188 if let (Some(la), Some(lo)) = (lat, lon) {
1189 return Some((la, lo));
1190 }
1191 None
1192 }
1193 _ => None,
1194 }
1195}