1use super::*;
8
9impl RedDBRuntime {
10 pub fn execute_graph_command(
12 &self,
13 raw_query: &str,
14 cmd: &GraphCommand,
15 ) -> RedDBResult<RuntimeQueryResult> {
16 match cmd {
17 GraphCommand::Neighborhood {
18 source,
19 depth,
20 direction,
21 } => {
22 let dir = parse_direction(direction)?;
23 let res = self.graph_neighborhood(source, dir, *depth as usize, None, None)?;
24 let mut result = UnifiedResult::with_columns(vec![
25 "node_id".into(),
26 "label".into(),
27 "node_type".into(),
28 "depth".into(),
29 ]);
30 for visit in &res.nodes {
31 let mut record = UnifiedRecord::new();
32 record.set("node_id", Value::text(visit.node.id.clone()));
33 record.set("label", Value::text(visit.node.label.clone()));
34 record.set("node_type", Value::text(visit.node.node_type.clone()));
35 record.set("depth", Value::Integer(visit.depth as i64));
36 result.push(record);
37 }
38 Ok(RuntimeQueryResult {
39 query: raw_query.to_string(),
40 mode: QueryMode::Sql,
41 statement: "graph_neighborhood",
42 engine: "runtime-graph",
43 result,
44 affected_rows: 0,
45 statement_type: "select",
46 })
47 }
48 GraphCommand::ShortestPath {
49 source,
50 target,
51 algorithm,
52 direction,
53 } => {
54 let dir = parse_direction(direction)?;
55 let alg = parse_path_algorithm(algorithm)?;
56 let res = self.graph_shortest_path(source, target, dir, alg, None, None)?;
57 let mut result = UnifiedResult::with_columns(vec![
58 "source".into(),
59 "target".into(),
60 "nodes_visited".into(),
61 "negative_cycle_detected".into(),
62 "hop_count".into(),
63 "total_weight".into(),
64 ]);
65 let mut record = UnifiedRecord::new();
66 record.set("source", Value::text(res.source));
67 record.set("target", Value::text(res.target));
68 record.set("nodes_visited", Value::Integer(res.nodes_visited as i64));
69 record.set(
70 "negative_cycle_detected",
71 match res.negative_cycle_detected {
72 Some(value) => Value::Boolean(value),
73 None => Value::Null,
74 },
75 );
76 if let Some(ref path) = res.path {
77 record.set("hop_count", Value::Integer(path.hop_count as i64));
78 record.set("total_weight", Value::Float(path.total_weight));
79 } else {
80 record.set("hop_count", Value::Null);
81 record.set("total_weight", Value::Null);
82 }
83 result.push(record);
84 Ok(RuntimeQueryResult {
85 query: raw_query.to_string(),
86 mode: QueryMode::Sql,
87 statement: "graph_shortest_path",
88 engine: "runtime-graph",
89 result,
90 affected_rows: 0,
91 statement_type: "select",
92 })
93 }
94 GraphCommand::Properties => {
95 let res = self.graph_properties(None)?;
96 let mut result = UnifiedResult::with_columns(vec![
97 "node_count".into(),
98 "edge_count".into(),
99 "is_connected".into(),
100 "is_complete".into(),
101 "is_cyclic".into(),
102 "density".into(),
103 ]);
104 let mut record = UnifiedRecord::new();
105 record.set("node_count", Value::Integer(res.node_count as i64));
106 record.set("edge_count", Value::Integer(res.edge_count as i64));
107 record.set("is_connected", Value::Boolean(res.is_connected));
108 record.set("is_complete", Value::Boolean(res.is_complete));
109 record.set("is_cyclic", Value::Boolean(res.is_cyclic));
110 record.set("density", Value::Float(res.density));
111 result.push(record);
112 Ok(RuntimeQueryResult {
113 query: raw_query.to_string(),
114 mode: QueryMode::Sql,
115 statement: "graph_properties",
116 engine: "runtime-graph",
117 result,
118 affected_rows: 0,
119 statement_type: "select",
120 })
121 }
122 GraphCommand::Traverse {
123 source,
124 strategy,
125 depth,
126 direction,
127 } => {
128 let dir = parse_direction(direction)?;
129 let strat = parse_traversal_strategy(strategy)?;
130 let res = self.graph_traverse(source, dir, *depth as usize, strat, None, None)?;
131 let mut result = UnifiedResult::with_columns(vec![
132 "node_id".into(),
133 "label".into(),
134 "node_type".into(),
135 "depth".into(),
136 ]);
137 for visit in &res.visits {
138 let mut record = UnifiedRecord::new();
139 record.set("node_id", Value::text(visit.node.id.clone()));
140 record.set("label", Value::text(visit.node.label.clone()));
141 record.set("node_type", Value::text(visit.node.node_type.clone()));
142 record.set("depth", Value::Integer(visit.depth as i64));
143 result.push(record);
144 }
145 Ok(RuntimeQueryResult {
146 query: raw_query.to_string(),
147 mode: QueryMode::Sql,
148 statement: "graph_traverse",
149 engine: "runtime-graph",
150 result,
151 affected_rows: 0,
152 statement_type: "select",
153 })
154 }
155 GraphCommand::Centrality { algorithm } => {
156 let alg = parse_centrality_algorithm(algorithm)?;
157 let res = self.graph_centrality(alg, 100, false, None, None, None, None)?;
158 let mut result = UnifiedResult::with_columns(vec![
159 "node_id".into(),
160 "label".into(),
161 "score".into(),
162 ]);
163 for score in &res.scores {
164 let mut record = UnifiedRecord::new();
165 record.set("node_id", Value::text(score.node.id.clone()));
166 record.set("label", Value::text(score.node.label.clone()));
167 record.set("score", Value::Float(score.score));
168 result.push(record);
169 }
170 for ds in &res.degree_scores {
171 let mut record = UnifiedRecord::new();
172 record.set("node_id", Value::text(ds.node.id.clone()));
173 record.set("label", Value::text(ds.node.label.clone()));
174 record.set("score", Value::Float(ds.total_degree as f64));
175 result.push(record);
176 }
177 Ok(RuntimeQueryResult {
178 query: raw_query.to_string(),
179 mode: QueryMode::Sql,
180 statement: "graph_centrality",
181 engine: "runtime-graph",
182 result,
183 affected_rows: 0,
184 statement_type: "select",
185 })
186 }
187 GraphCommand::Community {
188 algorithm,
189 max_iterations,
190 } => {
191 let alg = parse_community_algorithm(algorithm)?;
192 let res =
193 self.graph_communities(alg, 1, Some(*max_iterations as usize), None, None)?;
194 let mut result =
195 UnifiedResult::with_columns(vec!["community_id".into(), "size".into()]);
196 for community in &res.communities {
197 let mut record = UnifiedRecord::new();
198 record.set("community_id", Value::text(community.id.clone()));
199 record.set("size", Value::Integer(community.size as i64));
200 result.push(record);
201 }
202 Ok(RuntimeQueryResult {
203 query: raw_query.to_string(),
204 mode: QueryMode::Sql,
205 statement: "graph_community",
206 engine: "runtime-graph",
207 result,
208 affected_rows: 0,
209 statement_type: "select",
210 })
211 }
212 GraphCommand::Components { mode } => {
213 let m = parse_components_mode(mode)?;
214 let res = self.graph_components(m, 1, None)?;
215 let mut result =
216 UnifiedResult::with_columns(vec!["component_id".into(), "size".into()]);
217 for component in &res.components {
218 let mut record = UnifiedRecord::new();
219 record.set("component_id", Value::text(component.id.clone()));
220 record.set("size", Value::Integer(component.size as i64));
221 result.push(record);
222 }
223 Ok(RuntimeQueryResult {
224 query: raw_query.to_string(),
225 mode: QueryMode::Sql,
226 statement: "graph_components",
227 engine: "runtime-graph",
228 result,
229 affected_rows: 0,
230 statement_type: "select",
231 })
232 }
233 GraphCommand::Cycles { max_length } => {
234 let res = self.graph_cycles(*max_length as usize, 100, None)?;
235 let mut result =
236 UnifiedResult::with_columns(vec!["cycle_index".into(), "length".into()]);
237 for (i, cycle) in res.cycles.iter().enumerate() {
238 let mut record = UnifiedRecord::new();
239 record.set("cycle_index", Value::Integer(i as i64));
240 record.set("length", Value::Integer(cycle.nodes.len() as i64));
241 result.push(record);
242 }
243 Ok(RuntimeQueryResult {
244 query: raw_query.to_string(),
245 mode: QueryMode::Sql,
246 statement: "graph_cycles",
247 engine: "runtime-graph",
248 result,
249 affected_rows: 0,
250 statement_type: "select",
251 })
252 }
253 GraphCommand::Clustering => {
254 let res = self.graph_clustering(100, true, None)?;
255 let mut result = UnifiedResult::with_columns(vec![
256 "node_id".into(),
257 "label".into(),
258 "score".into(),
259 ]);
260 let mut global_record = UnifiedRecord::new();
262 global_record.set("node_id", Value::text("__global__"));
263 global_record.set("label", Value::text("global_clustering"));
264 global_record.set("score", Value::Float(res.global));
265 result.push(global_record);
266 for score in &res.local {
267 let mut record = UnifiedRecord::new();
268 record.set("node_id", Value::text(score.node.id.clone()));
269 record.set("label", Value::text(score.node.label.clone()));
270 record.set("score", Value::Float(score.score));
271 result.push(record);
272 }
273 Ok(RuntimeQueryResult {
274 query: raw_query.to_string(),
275 mode: QueryMode::Sql,
276 statement: "graph_clustering",
277 engine: "runtime-graph",
278 result,
279 affected_rows: 0,
280 statement_type: "select",
281 })
282 }
283 GraphCommand::TopologicalSort => {
284 let res = self.graph_topological_sort(None)?;
285 let mut result = UnifiedResult::with_columns(vec![
286 "order".into(),
287 "node_id".into(),
288 "label".into(),
289 ]);
290 for (i, node) in res.ordered_nodes.iter().enumerate() {
291 let mut record = UnifiedRecord::new();
292 record.set("order", Value::Integer(i as i64));
293 record.set("node_id", Value::text(node.id.clone()));
294 record.set("label", Value::text(node.label.clone()));
295 result.push(record);
296 }
297 Ok(RuntimeQueryResult {
298 query: raw_query.to_string(),
299 mode: QueryMode::Sql,
300 statement: "graph_topological_sort",
301 engine: "runtime-graph",
302 result,
303 affected_rows: 0,
304 statement_type: "select",
305 })
306 }
307 }
308 }
309
310 pub fn execute_search_command(
312 &self,
313 raw_query: &str,
314 cmd: &SearchCommand,
315 ) -> RedDBResult<RuntimeQueryResult> {
316 match cmd {
317 SearchCommand::Similar {
318 vector,
319 text,
320 provider,
321 collection,
322 limit,
323 min_score,
324 vector_param,
325 limit_param,
326 min_score_param,
327 text_param,
328 } => {
329 if vector_param.is_some() {
330 return Err(RedDBError::Query(
331 "SEARCH SIMILAR $N vector parameter was not bound before execution"
332 .to_string(),
333 ));
334 }
335 if limit_param.is_some() {
336 return Err(RedDBError::Query(
337 "SEARCH SIMILAR LIMIT $N parameter was not bound before execution"
338 .to_string(),
339 ));
340 }
341 if min_score_param.is_some() {
342 return Err(RedDBError::Query(
343 "SEARCH SIMILAR MIN_SCORE $N parameter was not bound before execution"
344 .to_string(),
345 ));
346 }
347 if text_param.is_some() {
348 return Err(RedDBError::Query(
349 "SEARCH SIMILAR TEXT $N parameter was not bound before execution"
350 .to_string(),
351 ));
352 }
353 let search_vector = if let Some(query_text) = text {
355 let (default_provider, _) = crate::ai::resolve_defaults_from_runtime(self);
356 let provider = match provider.as_deref() {
357 Some(p) => crate::ai::parse_provider(p)?,
358 None => default_provider,
359 };
360 let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
361 let model = std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
362 .ok()
363 .unwrap_or_else(|| provider.default_embedding_model().to_string());
364 let transport = crate::runtime::ai::transport::AiTransport::from_runtime(self);
365 let request = crate::ai::OpenAiEmbeddingRequest {
366 api_key,
367 model,
368 inputs: vec![query_text.clone()],
369 dimensions: None,
370 api_base: provider.resolve_api_base(),
371 };
372 let response = crate::runtime::ai::block_on_ai(async move {
373 crate::ai::openai_embeddings_async(&transport, request).await
374 })
375 .and_then(|result| result)?;
376 response.embeddings.into_iter().next().ok_or_else(|| {
377 RedDBError::Query("embedding API returned no vectors".to_string())
378 })?
379 } else {
380 vector.clone()
381 };
382 let scope = self.ai_scope();
386 let results =
387 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
388 crate::runtime::authorized_search::AuthorizedSearch::execute_similar(
389 self,
390 &scope,
391 collection,
392 &search_vector,
393 *limit,
394 *min_score,
395 )?
396 } else {
397 self.search_similar(collection, &search_vector, *limit, *min_score)?
399 };
400 let mut result =
401 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
402 for sr in &results {
403 let mut record = UnifiedRecord::new();
404 record.set("entity_id", Value::UnsignedInteger(sr.entity_id.raw()));
405 record.set("score", Value::Float(sr.score as f64));
406 result.push(record);
407 }
408 Ok(RuntimeQueryResult {
409 query: raw_query.to_string(),
410 mode: QueryMode::Sql,
411 statement: "search_similar",
412 engine: "runtime-search",
413 result,
414 affected_rows: 0,
415 statement_type: "select",
416 })
417 }
418 SearchCommand::Text {
419 query,
420 collection,
421 limit,
422 fuzzy,
423 limit_param,
424 } => {
425 if limit_param.is_some() {
426 return Err(RedDBError::Query(
427 "SEARCH TEXT LIMIT $N parameter was not bound before execution".to_string(),
428 ));
429 }
430 let collections = collection.as_ref().map(|c| vec![c.clone()]);
431 let scope = self.ai_scope();
433 let res =
434 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
435 crate::runtime::authorized_search::AuthorizedSearch::execute_text(
436 self,
437 &scope,
438 query.clone(),
439 collections,
440 None,
441 None,
442 None,
443 Some(*limit),
444 *fuzzy,
445 )?
446 } else {
447 self.search_text(
448 query.clone(),
449 collections,
450 None,
451 None,
452 None,
453 Some(*limit),
454 *fuzzy,
455 )?
456 };
457 let mut result =
458 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
459 for item in &res.matches {
460 let mut record = UnifiedRecord::new();
461 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
462 record.set("score", Value::Float(item.score as f64));
463 result.push(record);
464 }
465 Ok(RuntimeQueryResult {
466 query: raw_query.to_string(),
467 mode: QueryMode::Sql,
468 statement: "search_text",
469 engine: "runtime-search",
470 result,
471 affected_rows: 0,
472 statement_type: "select",
473 })
474 }
475 SearchCommand::Hybrid {
476 vector,
477 query,
478 collection,
479 limit,
480 limit_param,
481 } => {
482 if limit_param.is_some() {
483 return Err(RedDBError::Query(
484 "SEARCH HYBRID LIMIT $N parameter was not bound before execution"
485 .to_string(),
486 ));
487 }
488 let res = self.search_hybrid(
489 vector.clone(),
490 query.clone(),
491 Some(*limit),
492 Some(vec![collection.clone()]),
493 None,
494 None,
495 None,
496 Vec::new(),
497 None,
498 None,
499 Some(*limit),
500 )?;
501 let mut result =
502 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
503 for item in &res.matches {
504 let mut record = UnifiedRecord::new();
505 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
506 record.set("score", Value::Float(item.score as f64));
507 result.push(record);
508 }
509 Ok(RuntimeQueryResult {
510 query: raw_query.to_string(),
511 mode: QueryMode::Sql,
512 statement: "search_hybrid",
513 engine: "runtime-search",
514 result,
515 affected_rows: 0,
516 statement_type: "select",
517 })
518 }
519 SearchCommand::Multimodal {
520 query,
521 collection,
522 limit,
523 limit_param,
524 } => {
525 if limit_param.is_some() {
526 return Err(RedDBError::Query(
527 "SEARCH MULTIMODAL LIMIT $N parameter was not bound before execution"
528 .to_string(),
529 ));
530 }
531 let collections = collection.as_ref().map(|c| vec![c.clone()]);
532 let res =
533 self.search_multimodal(query.clone(), collections, None, None, Some(*limit))?;
534 let mut result =
535 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
536 for item in &res.matches {
537 let mut record = UnifiedRecord::new();
538 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
539 record.set("score", Value::Float(item.score as f64));
540 result.push(record);
541 }
542 Ok(RuntimeQueryResult {
543 query: raw_query.to_string(),
544 mode: QueryMode::Sql,
545 statement: "search_multimodal",
546 engine: "runtime-search",
547 result,
548 affected_rows: 0,
549 statement_type: "select",
550 })
551 }
552 SearchCommand::Index {
553 index,
554 value,
555 collection,
556 limit,
557 exact,
558 limit_param,
559 } => {
560 if limit_param.is_some() {
561 return Err(RedDBError::Query(
562 "SEARCH INDEX LIMIT $N parameter was not bound before execution"
563 .to_string(),
564 ));
565 }
566 let collections = collection.as_ref().map(|c| vec![c.clone()]);
567 let res = self.search_index(
568 index.clone(),
569 value.clone(),
570 *exact,
571 collections,
572 None,
573 None,
574 Some(*limit),
575 )?;
576 let mut result =
577 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
578 for item in &res.matches {
579 let mut record = UnifiedRecord::new();
580 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
581 record.set("score", Value::Float(item.score as f64));
582 result.push(record);
583 }
584 Ok(RuntimeQueryResult {
585 query: raw_query.to_string(),
586 mode: QueryMode::Sql,
587 statement: "search_index",
588 engine: "runtime-search",
589 result,
590 affected_rows: 0,
591 statement_type: "select",
592 })
593 }
594 SearchCommand::Context {
595 query,
596 field,
597 collection,
598 limit,
599 depth,
600 limit_param,
601 } => {
602 if limit_param.is_some() {
603 return Err(RedDBError::Query(
604 "SEARCH CONTEXT LIMIT $N parameter was not bound before execution"
605 .to_string(),
606 ));
607 }
608 use crate::application::SearchContextInput;
609 let input = SearchContextInput {
613 query: query.clone(),
614 field: field.clone(),
615 vector: None,
616 collections: collection.as_ref().map(|c| vec![c.clone()]),
617 graph_depth: Some(*depth),
618 graph_max_edges: None,
619 max_cross_refs: None,
620 follow_cross_refs: None,
621 expand_graph: None,
622 global_scan: None,
623 reindex: None,
624 limit: Some(*limit),
625 min_score: None,
626 };
627 let scope = self.ai_scope();
628 let res =
629 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
630 crate::runtime::authorized_search::AuthorizedSearch::execute_context(
631 self, &scope, input,
632 )?
633 } else {
634 self.search_context(input)?
635 };
636 let mut result = UnifiedResult::with_columns(vec![
637 "entity_id".into(),
638 "collection".into(),
639 "score".into(),
640 "discovery".into(),
641 "kind".into(),
642 ]);
643 let all_entities = res
644 .tables
645 .iter()
646 .map(|e| (e, "table"))
647 .chain(res.graph.nodes.iter().map(|e| (e, "graph_node")))
648 .chain(res.graph.edges.iter().map(|e| (e, "graph_edge")))
649 .chain(res.vectors.iter().map(|e| (e, "vector")))
650 .chain(res.documents.iter().map(|e| (e, "document")))
651 .chain(res.key_values.iter().map(|e| (e, "kv")));
652 for (entity, kind) in all_entities {
653 let mut record = UnifiedRecord::new();
654 record.set("entity_id", Value::UnsignedInteger(entity.entity.id.raw()));
655 record.set("collection", Value::text(entity.collection.clone()));
656 record.set("score", Value::Float(entity.score as f64));
657 record.set("discovery", Value::text(format!("{:?}", entity.discovery)));
658 record.set("kind", Value::text(kind.to_string()));
659 result.push(record);
660 }
661 Ok(RuntimeQueryResult {
662 query: raw_query.to_string(),
663 mode: QueryMode::Sql,
664 statement: "search_context",
665 engine: "runtime-context",
666 result,
667 affected_rows: 0,
668 statement_type: "select",
669 })
670 }
671 SearchCommand::SpatialRadius {
672 center_lat,
673 center_lon,
674 radius_km,
675 collection,
676 column,
677 limit,
678 limit_param,
679 } => {
680 if limit_param.is_some() {
681 return Err(RedDBError::Query(
682 "SEARCH SPATIAL RADIUS LIMIT $N parameter was not bound before execution"
683 .to_string(),
684 ));
685 }
686 use crate::storage::unified::spatial_index::haversine_km;
687 let _ = column; let store = self.inner.db.store();
689 let entities = store
690 .get_collection(collection)
691 .map(|m| m.query_all(|_| true))
692 .unwrap_or_default();
693
694 let mut hits: Vec<(u64, f64)> = Vec::new();
695 for entity in &entities {
696 if let Some((lat, lon)) = extract_geo_from_entity(entity) {
698 let dist = haversine_km(*center_lat, *center_lon, lat, lon);
699 if dist <= *radius_km {
700 hits.push((entity.id.raw(), dist));
701 }
702 }
703 }
704 hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
705 hits.truncate(*limit);
706
707 let mut result =
708 UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
709 for (id, dist) in &hits {
710 let mut record = UnifiedRecord::new();
711 record.set("entity_id", Value::UnsignedInteger(*id));
712 record.set("distance_km", Value::Float(*dist));
713 result.push(record);
714 }
715 Ok(RuntimeQueryResult {
716 query: raw_query.to_string(),
717 mode: QueryMode::Sql,
718 statement: "search_spatial_radius",
719 engine: "runtime-spatial",
720 result,
721 affected_rows: 0,
722 statement_type: "select",
723 })
724 }
725 SearchCommand::SpatialBbox {
726 min_lat,
727 min_lon,
728 max_lat,
729 max_lon,
730 collection,
731 column,
732 limit,
733 limit_param,
734 } => {
735 if limit_param.is_some() {
736 return Err(RedDBError::Query(
737 "SEARCH SPATIAL BBOX LIMIT $N parameter was not bound before execution"
738 .to_string(),
739 ));
740 }
741 let _ = column;
742 let store = self.inner.db.store();
743 let entities = store
744 .get_collection(collection)
745 .map(|m| m.query_all(|_| true))
746 .unwrap_or_default();
747
748 let mut result = UnifiedResult::with_columns(vec!["entity_id".into()]);
749 let mut count = 0;
750 for entity in &entities {
751 if count >= *limit {
752 break;
753 }
754 if let Some((lat, lon)) = extract_geo_from_entity(entity) {
755 if lat >= *min_lat && lat <= *max_lat && lon >= *min_lon && lon <= *max_lon
756 {
757 let mut record = UnifiedRecord::new();
758 record.set("entity_id", Value::UnsignedInteger(entity.id.raw()));
759 result.push(record);
760 count += 1;
761 }
762 }
763 }
764 Ok(RuntimeQueryResult {
765 query: raw_query.to_string(),
766 mode: QueryMode::Sql,
767 statement: "search_spatial_bbox",
768 engine: "runtime-spatial",
769 result,
770 affected_rows: 0,
771 statement_type: "select",
772 })
773 }
774 SearchCommand::SpatialNearest {
775 lat,
776 lon,
777 k,
778 collection,
779 column,
780 k_param,
781 } => {
782 if k_param.is_some() {
783 return Err(RedDBError::Query(
784 "SEARCH SPATIAL NEAREST K $N parameter was not bound before execution"
785 .to_string(),
786 ));
787 }
788 use crate::storage::unified::spatial_index::haversine_km;
789 let _ = column;
790 let store = self.inner.db.store();
791 let entities = store
792 .get_collection(collection)
793 .map(|m| m.query_all(|_| true))
794 .unwrap_or_default();
795
796 let mut hits: Vec<(u64, f64)> = Vec::new();
797 for entity in &entities {
798 if let Some((elat, elon)) = extract_geo_from_entity(entity) {
799 let dist = haversine_km(*lat, *lon, elat, elon);
800 hits.push((entity.id.raw(), dist));
801 }
802 }
803 hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
804 hits.truncate(*k);
805
806 let mut result =
807 UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
808 for (id, dist) in &hits {
809 let mut record = UnifiedRecord::new();
810 record.set("entity_id", Value::UnsignedInteger(*id));
811 record.set("distance_km", Value::Float(*dist));
812 result.push(record);
813 }
814 Ok(RuntimeQueryResult {
815 query: raw_query.to_string(),
816 mode: QueryMode::Sql,
817 statement: "search_spatial_nearest",
818 engine: "runtime-spatial",
819 result,
820 affected_rows: 0,
821 statement_type: "select",
822 })
823 }
824 }
825 }
826}
827
828fn parse_direction(s: &str) -> RedDBResult<RuntimeGraphDirection> {
833 match s.to_lowercase().as_str() {
834 "outgoing" | "out" => Ok(RuntimeGraphDirection::Outgoing),
835 "incoming" | "in" => Ok(RuntimeGraphDirection::Incoming),
836 "both" | "any" => Ok(RuntimeGraphDirection::Both),
837 _ => Err(RedDBError::Query(format!(
838 "unknown direction: '{s}', expected outgoing|incoming|both"
839 ))),
840 }
841}
842
843fn parse_path_algorithm(s: &str) -> RedDBResult<RuntimeGraphPathAlgorithm> {
844 match s.to_lowercase().as_str() {
845 "bfs" => Ok(RuntimeGraphPathAlgorithm::Bfs),
846 "dijkstra" => Ok(RuntimeGraphPathAlgorithm::Dijkstra),
847 "astar" | "a*" => Ok(RuntimeGraphPathAlgorithm::AStar),
848 "bellman_ford" | "bellmanford" => Ok(RuntimeGraphPathAlgorithm::BellmanFord),
849 _ => Err(RedDBError::Query(format!(
850 "unknown path algorithm: '{s}', expected bfs|dijkstra|astar|bellman_ford"
851 ))),
852 }
853}
854
855fn parse_traversal_strategy(s: &str) -> RedDBResult<RuntimeGraphTraversalStrategy> {
856 match s.to_lowercase().as_str() {
857 "bfs" => Ok(RuntimeGraphTraversalStrategy::Bfs),
858 "dfs" => Ok(RuntimeGraphTraversalStrategy::Dfs),
859 _ => Err(RedDBError::Query(format!(
860 "unknown traversal strategy: '{s}', expected bfs|dfs"
861 ))),
862 }
863}
864
865fn parse_centrality_algorithm(s: &str) -> RedDBResult<RuntimeGraphCentralityAlgorithm> {
866 match s.to_lowercase().as_str() {
867 "degree" => Ok(RuntimeGraphCentralityAlgorithm::Degree),
868 "closeness" => Ok(RuntimeGraphCentralityAlgorithm::Closeness),
869 "betweenness" => Ok(RuntimeGraphCentralityAlgorithm::Betweenness),
870 "eigenvector" => Ok(RuntimeGraphCentralityAlgorithm::Eigenvector),
871 "pagerank" | "page_rank" => Ok(RuntimeGraphCentralityAlgorithm::PageRank),
872 _ => Err(RedDBError::Query(format!(
873 "unknown centrality algorithm: '{s}', expected degree|closeness|betweenness|eigenvector|pagerank"
874 ))),
875 }
876}
877
878fn parse_community_algorithm(s: &str) -> RedDBResult<RuntimeGraphCommunityAlgorithm> {
879 match s.to_lowercase().as_str() {
880 "label_propagation" | "labelpropagation" => {
881 Ok(RuntimeGraphCommunityAlgorithm::LabelPropagation)
882 }
883 "louvain" => Ok(RuntimeGraphCommunityAlgorithm::Louvain),
884 _ => Err(RedDBError::Query(format!(
885 "unknown community algorithm: '{s}', expected label_propagation|louvain"
886 ))),
887 }
888}
889
890fn parse_components_mode(s: &str) -> RedDBResult<RuntimeGraphComponentsMode> {
891 match s.to_lowercase().as_str() {
892 "connected" => Ok(RuntimeGraphComponentsMode::Connected),
893 "weak" | "weakly_connected" => Ok(RuntimeGraphComponentsMode::Weak),
894 "strong" | "strongly_connected" => Ok(RuntimeGraphComponentsMode::Strong),
895 _ => Err(RedDBError::Query(format!(
896 "unknown components mode: '{s}', expected connected|weak|strong"
897 ))),
898 }
899}
900
901fn extract_geo_from_entity(entity: &UnifiedEntity) -> Option<(f64, f64)> {
906 match &entity.data {
907 EntityData::Row(row) => {
908 if let Some(ref named) = row.named {
910 for value in named.values() {
912 if let Value::GeoPoint(lat_micro, lon_micro) = value {
913 return Some((
914 *lat_micro as f64 / 1_000_000.0,
915 *lon_micro as f64 / 1_000_000.0,
916 ));
917 }
918 }
919 let lat =
921 named
922 .get("lat")
923 .or_else(|| named.get("latitude"))
924 .and_then(|v| match v {
925 Value::Float(f) => Some(*f),
926 Value::Integer(i) => Some(*i as f64),
927 _ => None,
928 });
929 let lon = named
930 .get("lon")
931 .or_else(|| named.get("lng"))
932 .or_else(|| named.get("longitude"))
933 .and_then(|v| match v {
934 Value::Float(f) => Some(*f),
935 Value::Integer(i) => Some(*i as f64),
936 _ => None,
937 });
938 if let (Some(la), Some(lo)) = (lat, lon) {
939 return Some((la, lo));
940 }
941 }
942 for value in &row.columns {
944 if let Value::GeoPoint(lat_micro, lon_micro) = value {
945 return Some((
946 *lat_micro as f64 / 1_000_000.0,
947 *lon_micro as f64 / 1_000_000.0,
948 ));
949 }
950 }
951 None
952 }
953 EntityData::Node(node) => {
954 for value in node.properties.values() {
956 if let Value::GeoPoint(lat_micro, lon_micro) = value {
957 return Some((
958 *lat_micro as f64 / 1_000_000.0,
959 *lon_micro as f64 / 1_000_000.0,
960 ));
961 }
962 }
963 let lat = node
964 .properties
965 .get("lat")
966 .or_else(|| node.properties.get("latitude"))
967 .and_then(|v| match v {
968 Value::Float(f) => Some(*f),
969 Value::Integer(i) => Some(*i as f64),
970 _ => None,
971 });
972 let lon = node
973 .properties
974 .get("lon")
975 .or_else(|| node.properties.get("lng"))
976 .or_else(|| node.properties.get("longitude"))
977 .and_then(|v| match v {
978 Value::Float(f) => Some(*f),
979 Value::Integer(i) => Some(*i as f64),
980 _ => None,
981 });
982 if let (Some(la), Some(lo)) = (lat, lon) {
983 return Some((la, lo));
984 }
985 None
986 }
987 _ => None,
988 }
989}