Skip to main content

reddb_server/runtime/
impl_graph_commands.rs

1//! Execution of GRAPH and SEARCH SQL-like commands.
2//!
3//! Maps parsed `GraphCommand` and `SearchCommand` AST nodes to the existing
4//! runtime graph analytics and search methods, returning results wrapped in
5//! `RuntimeQueryResult`.
6
7use super::*;
8
9impl RedDBRuntime {
10    /// Execute a GRAPH analytics command.
11    pub fn execute_graph_command(
12        &self,
13        raw_query: &str,
14        cmd: &GraphCommand,
15    ) -> RedDBResult<RuntimeQueryResult> {
16        match cmd {
17            GraphCommand::Neighborhood {
18                source,
19                depth,
20                direction,
21            } => {
22                let dir = parse_direction(direction)?;
23                let res = self.graph_neighborhood(source, dir, *depth as usize, None, None)?;
24                let mut result = UnifiedResult::with_columns(vec![
25                    "node_id".into(),
26                    "label".into(),
27                    "node_type".into(),
28                    "depth".into(),
29                ]);
30                for visit in &res.nodes {
31                    let mut record = UnifiedRecord::new();
32                    record.set("node_id", Value::text(visit.node.id.clone()));
33                    record.set("label", Value::text(visit.node.label.clone()));
34                    record.set("node_type", Value::text(visit.node.node_type.clone()));
35                    record.set("depth", Value::Integer(visit.depth as i64));
36                    result.push(record);
37                }
38                Ok(RuntimeQueryResult {
39                    query: raw_query.to_string(),
40                    mode: QueryMode::Sql,
41                    statement: "graph_neighborhood",
42                    engine: "runtime-graph",
43                    result,
44                    affected_rows: 0,
45                    statement_type: "select",
46                })
47            }
48            GraphCommand::ShortestPath {
49                source,
50                target,
51                algorithm,
52                direction,
53            } => {
54                let dir = parse_direction(direction)?;
55                let alg = parse_path_algorithm(algorithm)?;
56                let res = self.graph_shortest_path(source, target, dir, alg, None, None)?;
57                let mut result = UnifiedResult::with_columns(vec![
58                    "source".into(),
59                    "target".into(),
60                    "nodes_visited".into(),
61                    "negative_cycle_detected".into(),
62                    "hop_count".into(),
63                    "total_weight".into(),
64                ]);
65                let mut record = UnifiedRecord::new();
66                record.set("source", Value::text(res.source));
67                record.set("target", Value::text(res.target));
68                record.set("nodes_visited", Value::Integer(res.nodes_visited as i64));
69                record.set(
70                    "negative_cycle_detected",
71                    match res.negative_cycle_detected {
72                        Some(value) => Value::Boolean(value),
73                        None => Value::Null,
74                    },
75                );
76                if let Some(ref path) = res.path {
77                    record.set("hop_count", Value::Integer(path.hop_count as i64));
78                    record.set("total_weight", Value::Float(path.total_weight));
79                } else {
80                    record.set("hop_count", Value::Null);
81                    record.set("total_weight", Value::Null);
82                }
83                result.push(record);
84                Ok(RuntimeQueryResult {
85                    query: raw_query.to_string(),
86                    mode: QueryMode::Sql,
87                    statement: "graph_shortest_path",
88                    engine: "runtime-graph",
89                    result,
90                    affected_rows: 0,
91                    statement_type: "select",
92                })
93            }
94            GraphCommand::Properties => {
95                let res = self.graph_properties(None)?;
96                let mut result = UnifiedResult::with_columns(vec![
97                    "node_count".into(),
98                    "edge_count".into(),
99                    "is_connected".into(),
100                    "is_complete".into(),
101                    "is_cyclic".into(),
102                    "density".into(),
103                ]);
104                let mut record = UnifiedRecord::new();
105                record.set("node_count", Value::Integer(res.node_count as i64));
106                record.set("edge_count", Value::Integer(res.edge_count as i64));
107                record.set("is_connected", Value::Boolean(res.is_connected));
108                record.set("is_complete", Value::Boolean(res.is_complete));
109                record.set("is_cyclic", Value::Boolean(res.is_cyclic));
110                record.set("density", Value::Float(res.density));
111                result.push(record);
112                Ok(RuntimeQueryResult {
113                    query: raw_query.to_string(),
114                    mode: QueryMode::Sql,
115                    statement: "graph_properties",
116                    engine: "runtime-graph",
117                    result,
118                    affected_rows: 0,
119                    statement_type: "select",
120                })
121            }
122            GraphCommand::Traverse {
123                source,
124                strategy,
125                depth,
126                direction,
127            } => {
128                let dir = parse_direction(direction)?;
129                let strat = parse_traversal_strategy(strategy)?;
130                let res = self.graph_traverse(source, dir, *depth as usize, strat, None, None)?;
131                let mut result = UnifiedResult::with_columns(vec![
132                    "node_id".into(),
133                    "label".into(),
134                    "node_type".into(),
135                    "depth".into(),
136                ]);
137                for visit in &res.visits {
138                    let mut record = UnifiedRecord::new();
139                    record.set("node_id", Value::text(visit.node.id.clone()));
140                    record.set("label", Value::text(visit.node.label.clone()));
141                    record.set("node_type", Value::text(visit.node.node_type.clone()));
142                    record.set("depth", Value::Integer(visit.depth as i64));
143                    result.push(record);
144                }
145                Ok(RuntimeQueryResult {
146                    query: raw_query.to_string(),
147                    mode: QueryMode::Sql,
148                    statement: "graph_traverse",
149                    engine: "runtime-graph",
150                    result,
151                    affected_rows: 0,
152                    statement_type: "select",
153                })
154            }
155            GraphCommand::Centrality { algorithm } => {
156                let alg = parse_centrality_algorithm(algorithm)?;
157                let res = self.graph_centrality(alg, 100, false, None, None, None, None)?;
158                let mut result = UnifiedResult::with_columns(vec![
159                    "node_id".into(),
160                    "label".into(),
161                    "score".into(),
162                ]);
163                for score in &res.scores {
164                    let mut record = UnifiedRecord::new();
165                    record.set("node_id", Value::text(score.node.id.clone()));
166                    record.set("label", Value::text(score.node.label.clone()));
167                    record.set("score", Value::Float(score.score));
168                    result.push(record);
169                }
170                for ds in &res.degree_scores {
171                    let mut record = UnifiedRecord::new();
172                    record.set("node_id", Value::text(ds.node.id.clone()));
173                    record.set("label", Value::text(ds.node.label.clone()));
174                    record.set("score", Value::Float(ds.total_degree as f64));
175                    result.push(record);
176                }
177                Ok(RuntimeQueryResult {
178                    query: raw_query.to_string(),
179                    mode: QueryMode::Sql,
180                    statement: "graph_centrality",
181                    engine: "runtime-graph",
182                    result,
183                    affected_rows: 0,
184                    statement_type: "select",
185                })
186            }
187            GraphCommand::Community {
188                algorithm,
189                max_iterations,
190            } => {
191                let alg = parse_community_algorithm(algorithm)?;
192                let res =
193                    self.graph_communities(alg, 1, Some(*max_iterations as usize), None, None)?;
194                let mut result =
195                    UnifiedResult::with_columns(vec!["community_id".into(), "size".into()]);
196                for community in &res.communities {
197                    let mut record = UnifiedRecord::new();
198                    record.set("community_id", Value::text(community.id.clone()));
199                    record.set("size", Value::Integer(community.size as i64));
200                    result.push(record);
201                }
202                Ok(RuntimeQueryResult {
203                    query: raw_query.to_string(),
204                    mode: QueryMode::Sql,
205                    statement: "graph_community",
206                    engine: "runtime-graph",
207                    result,
208                    affected_rows: 0,
209                    statement_type: "select",
210                })
211            }
212            GraphCommand::Components { mode } => {
213                let m = parse_components_mode(mode)?;
214                let res = self.graph_components(m, 1, None)?;
215                let mut result =
216                    UnifiedResult::with_columns(vec!["component_id".into(), "size".into()]);
217                for component in &res.components {
218                    let mut record = UnifiedRecord::new();
219                    record.set("component_id", Value::text(component.id.clone()));
220                    record.set("size", Value::Integer(component.size as i64));
221                    result.push(record);
222                }
223                Ok(RuntimeQueryResult {
224                    query: raw_query.to_string(),
225                    mode: QueryMode::Sql,
226                    statement: "graph_components",
227                    engine: "runtime-graph",
228                    result,
229                    affected_rows: 0,
230                    statement_type: "select",
231                })
232            }
233            GraphCommand::Cycles { max_length } => {
234                let res = self.graph_cycles(*max_length as usize, 100, None)?;
235                let mut result =
236                    UnifiedResult::with_columns(vec!["cycle_index".into(), "length".into()]);
237                for (i, cycle) in res.cycles.iter().enumerate() {
238                    let mut record = UnifiedRecord::new();
239                    record.set("cycle_index", Value::Integer(i as i64));
240                    record.set("length", Value::Integer(cycle.nodes.len() as i64));
241                    result.push(record);
242                }
243                Ok(RuntimeQueryResult {
244                    query: raw_query.to_string(),
245                    mode: QueryMode::Sql,
246                    statement: "graph_cycles",
247                    engine: "runtime-graph",
248                    result,
249                    affected_rows: 0,
250                    statement_type: "select",
251                })
252            }
253            GraphCommand::Clustering => {
254                let res = self.graph_clustering(100, true, None)?;
255                let mut result = UnifiedResult::with_columns(vec![
256                    "node_id".into(),
257                    "label".into(),
258                    "score".into(),
259                ]);
260                // First row: global coefficient
261                let mut global_record = UnifiedRecord::new();
262                global_record.set("node_id", Value::text("__global__"));
263                global_record.set("label", Value::text("global_clustering"));
264                global_record.set("score", Value::Float(res.global));
265                result.push(global_record);
266                for score in &res.local {
267                    let mut record = UnifiedRecord::new();
268                    record.set("node_id", Value::text(score.node.id.clone()));
269                    record.set("label", Value::text(score.node.label.clone()));
270                    record.set("score", Value::Float(score.score));
271                    result.push(record);
272                }
273                Ok(RuntimeQueryResult {
274                    query: raw_query.to_string(),
275                    mode: QueryMode::Sql,
276                    statement: "graph_clustering",
277                    engine: "runtime-graph",
278                    result,
279                    affected_rows: 0,
280                    statement_type: "select",
281                })
282            }
283            GraphCommand::TopologicalSort => {
284                let res = self.graph_topological_sort(None)?;
285                let mut result = UnifiedResult::with_columns(vec![
286                    "order".into(),
287                    "node_id".into(),
288                    "label".into(),
289                ]);
290                for (i, node) in res.ordered_nodes.iter().enumerate() {
291                    let mut record = UnifiedRecord::new();
292                    record.set("order", Value::Integer(i as i64));
293                    record.set("node_id", Value::text(node.id.clone()));
294                    record.set("label", Value::text(node.label.clone()));
295                    result.push(record);
296                }
297                Ok(RuntimeQueryResult {
298                    query: raw_query.to_string(),
299                    mode: QueryMode::Sql,
300                    statement: "graph_topological_sort",
301                    engine: "runtime-graph",
302                    result,
303                    affected_rows: 0,
304                    statement_type: "select",
305                })
306            }
307        }
308    }
309
310    /// Execute a SEARCH command.
311    pub fn execute_search_command(
312        &self,
313        raw_query: &str,
314        cmd: &SearchCommand,
315    ) -> RedDBResult<RuntimeQueryResult> {
316        match cmd {
317            SearchCommand::Similar {
318                vector,
319                text,
320                provider,
321                collection,
322                limit,
323                min_score,
324            } => {
325                // If text provided, generate embedding first (semantic search)
326                let search_vector = if let Some(query_text) = text {
327                    let (default_provider, _) = crate::ai::resolve_defaults_from_runtime(self);
328                    let provider = match provider.as_deref() {
329                        Some(p) => crate::ai::parse_provider(p)?,
330                        None => default_provider,
331                    };
332                    let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
333                    let model = std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
334                        .ok()
335                        .unwrap_or_else(|| provider.default_embedding_model().to_string());
336                    let transport = crate::runtime::ai::transport::AiTransport::from_runtime(self);
337                    let request = crate::ai::OpenAiEmbeddingRequest {
338                        api_key,
339                        model,
340                        inputs: vec![query_text.clone()],
341                        dimensions: None,
342                        api_base: provider.resolve_api_base(),
343                    };
344                    let response = crate::runtime::ai::block_on_ai(async move {
345                        crate::ai::openai_embeddings_async(&transport, request).await
346                    })
347                    .and_then(|result| result)?;
348                    response.embeddings.into_iter().next().ok_or_else(|| {
349                        RedDBError::Query("embedding API returned no vectors".to_string())
350                    })?
351                } else {
352                    vector.clone()
353                };
354                // Issue #119: route through AuthorizedSearch so the
355                // candidate set is gated by `EffectiveScope.visible_collections`
356                // before any similarity score is computed.
357                let scope = self.ai_scope();
358                let results =
359                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
360                        crate::runtime::authorized_search::AuthorizedSearch::execute_similar(
361                            self,
362                            &scope,
363                            collection,
364                            &search_vector,
365                            *limit,
366                            *min_score,
367                        )?
368                    } else {
369                        // Embedded / no-auth caller: keep legacy behaviour.
370                        self.search_similar(collection, &search_vector, *limit, *min_score)?
371                    };
372                let mut result =
373                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
374                for sr in &results {
375                    let mut record = UnifiedRecord::new();
376                    record.set("entity_id", Value::UnsignedInteger(sr.entity_id.raw()));
377                    record.set("score", Value::Float(sr.score as f64));
378                    result.push(record);
379                }
380                Ok(RuntimeQueryResult {
381                    query: raw_query.to_string(),
382                    mode: QueryMode::Sql,
383                    statement: "search_similar",
384                    engine: "runtime-search",
385                    result,
386                    affected_rows: 0,
387                    statement_type: "select",
388                })
389            }
390            SearchCommand::Text {
391                query,
392                collection,
393                limit,
394                fuzzy,
395            } => {
396                let collections = collection.as_ref().map(|c| vec![c.clone()]);
397                // Issue #119: gate the candidate set by visible_collections.
398                let scope = self.ai_scope();
399                let res =
400                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
401                        crate::runtime::authorized_search::AuthorizedSearch::execute_text(
402                            self,
403                            &scope,
404                            query.clone(),
405                            collections,
406                            None,
407                            None,
408                            None,
409                            Some(*limit),
410                            *fuzzy,
411                        )?
412                    } else {
413                        self.search_text(
414                            query.clone(),
415                            collections,
416                            None,
417                            None,
418                            None,
419                            Some(*limit),
420                            *fuzzy,
421                        )?
422                    };
423                let mut result =
424                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
425                for item in &res.matches {
426                    let mut record = UnifiedRecord::new();
427                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
428                    record.set("score", Value::Float(item.score as f64));
429                    result.push(record);
430                }
431                Ok(RuntimeQueryResult {
432                    query: raw_query.to_string(),
433                    mode: QueryMode::Sql,
434                    statement: "search_text",
435                    engine: "runtime-search",
436                    result,
437                    affected_rows: 0,
438                    statement_type: "select",
439                })
440            }
441            SearchCommand::Hybrid {
442                vector,
443                query,
444                collection,
445                limit,
446            } => {
447                let res = self.search_hybrid(
448                    vector.clone(),
449                    query.clone(),
450                    Some(*limit),
451                    Some(vec![collection.clone()]),
452                    None,
453                    None,
454                    None,
455                    Vec::new(),
456                    None,
457                    None,
458                    Some(*limit),
459                )?;
460                let mut result =
461                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
462                for item in &res.matches {
463                    let mut record = UnifiedRecord::new();
464                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
465                    record.set("score", Value::Float(item.score as f64));
466                    result.push(record);
467                }
468                Ok(RuntimeQueryResult {
469                    query: raw_query.to_string(),
470                    mode: QueryMode::Sql,
471                    statement: "search_hybrid",
472                    engine: "runtime-search",
473                    result,
474                    affected_rows: 0,
475                    statement_type: "select",
476                })
477            }
478            SearchCommand::Multimodal {
479                query,
480                collection,
481                limit,
482            } => {
483                let collections = collection.as_ref().map(|c| vec![c.clone()]);
484                let res =
485                    self.search_multimodal(query.clone(), collections, None, None, Some(*limit))?;
486                let mut result =
487                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
488                for item in &res.matches {
489                    let mut record = UnifiedRecord::new();
490                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
491                    record.set("score", Value::Float(item.score as f64));
492                    result.push(record);
493                }
494                Ok(RuntimeQueryResult {
495                    query: raw_query.to_string(),
496                    mode: QueryMode::Sql,
497                    statement: "search_multimodal",
498                    engine: "runtime-search",
499                    result,
500                    affected_rows: 0,
501                    statement_type: "select",
502                })
503            }
504            SearchCommand::Index {
505                index,
506                value,
507                collection,
508                limit,
509                exact,
510            } => {
511                let collections = collection.as_ref().map(|c| vec![c.clone()]);
512                let res = self.search_index(
513                    index.clone(),
514                    value.clone(),
515                    *exact,
516                    collections,
517                    None,
518                    None,
519                    Some(*limit),
520                )?;
521                let mut result =
522                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
523                for item in &res.matches {
524                    let mut record = UnifiedRecord::new();
525                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
526                    record.set("score", Value::Float(item.score as f64));
527                    result.push(record);
528                }
529                Ok(RuntimeQueryResult {
530                    query: raw_query.to_string(),
531                    mode: QueryMode::Sql,
532                    statement: "search_index",
533                    engine: "runtime-search",
534                    result,
535                    affected_rows: 0,
536                    statement_type: "select",
537                })
538            }
539            SearchCommand::Context {
540                query,
541                field,
542                collection,
543                limit,
544                depth,
545            } => {
546                use crate::application::SearchContextInput;
547                // Issue #119: route through AuthorizedSearch so the
548                // candidate set + every expansion bucket is bounded by
549                // `EffectiveScope.visible_collections`.
550                let input = SearchContextInput {
551                    query: query.clone(),
552                    field: field.clone(),
553                    vector: None,
554                    collections: collection.as_ref().map(|c| vec![c.clone()]),
555                    graph_depth: Some(*depth),
556                    graph_max_edges: None,
557                    max_cross_refs: None,
558                    follow_cross_refs: None,
559                    expand_graph: None,
560                    global_scan: None,
561                    reindex: None,
562                    limit: Some(*limit),
563                    min_score: None,
564                };
565                let scope = self.ai_scope();
566                let res =
567                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
568                        crate::runtime::authorized_search::AuthorizedSearch::execute_context(
569                            self, &scope, input,
570                        )?
571                    } else {
572                        self.search_context(input)?
573                    };
574                let mut result = UnifiedResult::with_columns(vec![
575                    "entity_id".into(),
576                    "collection".into(),
577                    "score".into(),
578                    "discovery".into(),
579                    "kind".into(),
580                ]);
581                let all_entities = res
582                    .tables
583                    .iter()
584                    .map(|e| (e, "table"))
585                    .chain(res.graph.nodes.iter().map(|e| (e, "graph_node")))
586                    .chain(res.graph.edges.iter().map(|e| (e, "graph_edge")))
587                    .chain(res.vectors.iter().map(|e| (e, "vector")))
588                    .chain(res.documents.iter().map(|e| (e, "document")))
589                    .chain(res.key_values.iter().map(|e| (e, "kv")));
590                for (entity, kind) in all_entities {
591                    let mut record = UnifiedRecord::new();
592                    record.set("entity_id", Value::UnsignedInteger(entity.entity.id.raw()));
593                    record.set("collection", Value::text(entity.collection.clone()));
594                    record.set("score", Value::Float(entity.score as f64));
595                    record.set("discovery", Value::text(format!("{:?}", entity.discovery)));
596                    record.set("kind", Value::text(kind.to_string()));
597                    result.push(record);
598                }
599                Ok(RuntimeQueryResult {
600                    query: raw_query.to_string(),
601                    mode: QueryMode::Sql,
602                    statement: "search_context",
603                    engine: "runtime-context",
604                    result,
605                    affected_rows: 0,
606                    statement_type: "select",
607                })
608            }
609            SearchCommand::SpatialRadius {
610                center_lat,
611                center_lon,
612                radius_km,
613                collection,
614                column,
615                limit,
616            } => {
617                use crate::storage::unified::spatial_index::haversine_km;
618                let _ = column; // Column indicates which field holds geo data
619                let store = self.inner.db.store();
620                let entities = store
621                    .get_collection(collection)
622                    .map(|m| m.query_all(|_| true))
623                    .unwrap_or_default();
624
625                let mut hits: Vec<(u64, f64)> = Vec::new();
626                for entity in &entities {
627                    // Extract lat/lon from GeoPoint values in entity data
628                    if let Some((lat, lon)) = extract_geo_from_entity(entity) {
629                        let dist = haversine_km(*center_lat, *center_lon, lat, lon);
630                        if dist <= *radius_km {
631                            hits.push((entity.id.raw(), dist));
632                        }
633                    }
634                }
635                hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
636                hits.truncate(*limit);
637
638                let mut result =
639                    UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
640                for (id, dist) in &hits {
641                    let mut record = UnifiedRecord::new();
642                    record.set("entity_id", Value::UnsignedInteger(*id));
643                    record.set("distance_km", Value::Float(*dist));
644                    result.push(record);
645                }
646                Ok(RuntimeQueryResult {
647                    query: raw_query.to_string(),
648                    mode: QueryMode::Sql,
649                    statement: "search_spatial_radius",
650                    engine: "runtime-spatial",
651                    result,
652                    affected_rows: 0,
653                    statement_type: "select",
654                })
655            }
656            SearchCommand::SpatialBbox {
657                min_lat,
658                min_lon,
659                max_lat,
660                max_lon,
661                collection,
662                column,
663                limit,
664            } => {
665                let _ = column;
666                let store = self.inner.db.store();
667                let entities = store
668                    .get_collection(collection)
669                    .map(|m| m.query_all(|_| true))
670                    .unwrap_or_default();
671
672                let mut result = UnifiedResult::with_columns(vec!["entity_id".into()]);
673                let mut count = 0;
674                for entity in &entities {
675                    if count >= *limit {
676                        break;
677                    }
678                    if let Some((lat, lon)) = extract_geo_from_entity(entity) {
679                        if lat >= *min_lat && lat <= *max_lat && lon >= *min_lon && lon <= *max_lon
680                        {
681                            let mut record = UnifiedRecord::new();
682                            record.set("entity_id", Value::UnsignedInteger(entity.id.raw()));
683                            result.push(record);
684                            count += 1;
685                        }
686                    }
687                }
688                Ok(RuntimeQueryResult {
689                    query: raw_query.to_string(),
690                    mode: QueryMode::Sql,
691                    statement: "search_spatial_bbox",
692                    engine: "runtime-spatial",
693                    result,
694                    affected_rows: 0,
695                    statement_type: "select",
696                })
697            }
698            SearchCommand::SpatialNearest {
699                lat,
700                lon,
701                k,
702                collection,
703                column,
704            } => {
705                use crate::storage::unified::spatial_index::haversine_km;
706                let _ = column;
707                let store = self.inner.db.store();
708                let entities = store
709                    .get_collection(collection)
710                    .map(|m| m.query_all(|_| true))
711                    .unwrap_or_default();
712
713                let mut hits: Vec<(u64, f64)> = Vec::new();
714                for entity in &entities {
715                    if let Some((elat, elon)) = extract_geo_from_entity(entity) {
716                        let dist = haversine_km(*lat, *lon, elat, elon);
717                        hits.push((entity.id.raw(), dist));
718                    }
719                }
720                hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
721                hits.truncate(*k);
722
723                let mut result =
724                    UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
725                for (id, dist) in &hits {
726                    let mut record = UnifiedRecord::new();
727                    record.set("entity_id", Value::UnsignedInteger(*id));
728                    record.set("distance_km", Value::Float(*dist));
729                    result.push(record);
730                }
731                Ok(RuntimeQueryResult {
732                    query: raw_query.to_string(),
733                    mode: QueryMode::Sql,
734                    statement: "search_spatial_nearest",
735                    engine: "runtime-spatial",
736                    result,
737                    affected_rows: 0,
738                    statement_type: "select",
739                })
740            }
741        }
742    }
743}
744
745// =============================================================================
746// Conversion helpers for string -> enum
747// =============================================================================
748
749fn parse_direction(s: &str) -> RedDBResult<RuntimeGraphDirection> {
750    match s.to_lowercase().as_str() {
751        "outgoing" | "out" => Ok(RuntimeGraphDirection::Outgoing),
752        "incoming" | "in" => Ok(RuntimeGraphDirection::Incoming),
753        "both" | "any" => Ok(RuntimeGraphDirection::Both),
754        _ => Err(RedDBError::Query(format!(
755            "unknown direction: '{s}', expected outgoing|incoming|both"
756        ))),
757    }
758}
759
760fn parse_path_algorithm(s: &str) -> RedDBResult<RuntimeGraphPathAlgorithm> {
761    match s.to_lowercase().as_str() {
762        "bfs" => Ok(RuntimeGraphPathAlgorithm::Bfs),
763        "dijkstra" => Ok(RuntimeGraphPathAlgorithm::Dijkstra),
764        "astar" | "a*" => Ok(RuntimeGraphPathAlgorithm::AStar),
765        "bellman_ford" | "bellmanford" => Ok(RuntimeGraphPathAlgorithm::BellmanFord),
766        _ => Err(RedDBError::Query(format!(
767            "unknown path algorithm: '{s}', expected bfs|dijkstra|astar|bellman_ford"
768        ))),
769    }
770}
771
772fn parse_traversal_strategy(s: &str) -> RedDBResult<RuntimeGraphTraversalStrategy> {
773    match s.to_lowercase().as_str() {
774        "bfs" => Ok(RuntimeGraphTraversalStrategy::Bfs),
775        "dfs" => Ok(RuntimeGraphTraversalStrategy::Dfs),
776        _ => Err(RedDBError::Query(format!(
777            "unknown traversal strategy: '{s}', expected bfs|dfs"
778        ))),
779    }
780}
781
782fn parse_centrality_algorithm(s: &str) -> RedDBResult<RuntimeGraphCentralityAlgorithm> {
783    match s.to_lowercase().as_str() {
784        "degree" => Ok(RuntimeGraphCentralityAlgorithm::Degree),
785        "closeness" => Ok(RuntimeGraphCentralityAlgorithm::Closeness),
786        "betweenness" => Ok(RuntimeGraphCentralityAlgorithm::Betweenness),
787        "eigenvector" => Ok(RuntimeGraphCentralityAlgorithm::Eigenvector),
788        "pagerank" | "page_rank" => Ok(RuntimeGraphCentralityAlgorithm::PageRank),
789        _ => Err(RedDBError::Query(format!(
790            "unknown centrality algorithm: '{s}', expected degree|closeness|betweenness|eigenvector|pagerank"
791        ))),
792    }
793}
794
795fn parse_community_algorithm(s: &str) -> RedDBResult<RuntimeGraphCommunityAlgorithm> {
796    match s.to_lowercase().as_str() {
797        "label_propagation" | "labelpropagation" => {
798            Ok(RuntimeGraphCommunityAlgorithm::LabelPropagation)
799        }
800        "louvain" => Ok(RuntimeGraphCommunityAlgorithm::Louvain),
801        _ => Err(RedDBError::Query(format!(
802            "unknown community algorithm: '{s}', expected label_propagation|louvain"
803        ))),
804    }
805}
806
807fn parse_components_mode(s: &str) -> RedDBResult<RuntimeGraphComponentsMode> {
808    match s.to_lowercase().as_str() {
809        "connected" => Ok(RuntimeGraphComponentsMode::Connected),
810        "weak" | "weakly_connected" => Ok(RuntimeGraphComponentsMode::Weak),
811        "strong" | "strongly_connected" => Ok(RuntimeGraphComponentsMode::Strong),
812        _ => Err(RedDBError::Query(format!(
813            "unknown components mode: '{s}', expected connected|weak|strong"
814        ))),
815    }
816}
817
818/// Extract (latitude, longitude) from an entity.
819///
820/// Looks for GeoPoint values in the entity data (row columns or node properties)
821/// or dedicated lat/lon fields. Returns degrees.
822fn extract_geo_from_entity(entity: &UnifiedEntity) -> Option<(f64, f64)> {
823    match &entity.data {
824        EntityData::Row(row) => {
825            // Search named columns for GeoPoint or lat/lon pairs
826            if let Some(ref named) = row.named {
827                // Direct GeoPoint value
828                for value in named.values() {
829                    if let Value::GeoPoint(lat_micro, lon_micro) = value {
830                        return Some((
831                            *lat_micro as f64 / 1_000_000.0,
832                            *lon_micro as f64 / 1_000_000.0,
833                        ));
834                    }
835                }
836                // Try lat/lon or latitude/longitude named fields
837                let lat =
838                    named
839                        .get("lat")
840                        .or_else(|| named.get("latitude"))
841                        .and_then(|v| match v {
842                            Value::Float(f) => Some(*f),
843                            Value::Integer(i) => Some(*i as f64),
844                            _ => None,
845                        });
846                let lon = named
847                    .get("lon")
848                    .or_else(|| named.get("lng"))
849                    .or_else(|| named.get("longitude"))
850                    .and_then(|v| match v {
851                        Value::Float(f) => Some(*f),
852                        Value::Integer(i) => Some(*i as f64),
853                        _ => None,
854                    });
855                if let (Some(la), Some(lo)) = (lat, lon) {
856                    return Some((la, lo));
857                }
858            }
859            // Search positional columns for GeoPoint
860            for value in &row.columns {
861                if let Value::GeoPoint(lat_micro, lon_micro) = value {
862                    return Some((
863                        *lat_micro as f64 / 1_000_000.0,
864                        *lon_micro as f64 / 1_000_000.0,
865                    ));
866                }
867            }
868            None
869        }
870        EntityData::Node(node) => {
871            // Search node properties
872            for value in node.properties.values() {
873                if let Value::GeoPoint(lat_micro, lon_micro) = value {
874                    return Some((
875                        *lat_micro as f64 / 1_000_000.0,
876                        *lon_micro as f64 / 1_000_000.0,
877                    ));
878                }
879            }
880            let lat = node
881                .properties
882                .get("lat")
883                .or_else(|| node.properties.get("latitude"))
884                .and_then(|v| match v {
885                    Value::Float(f) => Some(*f),
886                    Value::Integer(i) => Some(*i as f64),
887                    _ => None,
888                });
889            let lon = node
890                .properties
891                .get("lon")
892                .or_else(|| node.properties.get("lng"))
893                .or_else(|| node.properties.get("longitude"))
894                .and_then(|v| match v {
895                    Value::Float(f) => Some(*f),
896                    Value::Integer(i) => Some(*i as f64),
897                    _ => None,
898                });
899            if let (Some(la), Some(lo)) = (lat, lon) {
900                return Some((la, lo));
901            }
902            None
903        }
904        _ => None,
905    }
906}