Skip to main content

reddb_server/runtime/
impl_graph_commands.rs

1//! Execution of GRAPH and SEARCH SQL-like commands.
2//!
3//! Maps parsed `GraphCommand` and `SearchCommand` AST nodes to the existing
4//! runtime graph analytics and search methods, returning results wrapped in
5//! `RuntimeQueryResult`.
6
7use super::*;
8
9impl RedDBRuntime {
10    /// Execute a GRAPH analytics command.
11    pub fn execute_graph_command(
12        &self,
13        raw_query: &str,
14        cmd: &GraphCommand,
15    ) -> RedDBResult<RuntimeQueryResult> {
16        match cmd {
17            GraphCommand::Neighborhood {
18                source,
19                depth,
20                direction,
21            } => {
22                let dir = parse_direction(direction)?;
23                let res = self.graph_neighborhood(source, dir, *depth as usize, None, None)?;
24                let mut result = UnifiedResult::with_columns(vec![
25                    "node_id".into(),
26                    "label".into(),
27                    "node_type".into(),
28                    "depth".into(),
29                ]);
30                for visit in &res.nodes {
31                    let mut record = UnifiedRecord::new();
32                    record.set("node_id", Value::text(visit.node.id.clone()));
33                    record.set("label", Value::text(visit.node.label.clone()));
34                    record.set("node_type", Value::text(visit.node.node_type.clone()));
35                    record.set("depth", Value::Integer(visit.depth as i64));
36                    result.push(record);
37                }
38                Ok(RuntimeQueryResult {
39                    query: raw_query.to_string(),
40                    mode: QueryMode::Sql,
41                    statement: "graph_neighborhood",
42                    engine: "runtime-graph",
43                    result,
44                    affected_rows: 0,
45                    statement_type: "select",
46                })
47            }
48            GraphCommand::ShortestPath {
49                source,
50                target,
51                algorithm,
52                direction,
53            } => {
54                let dir = parse_direction(direction)?;
55                let alg = parse_path_algorithm(algorithm)?;
56                let res = self.graph_shortest_path(source, target, dir, alg, None, None)?;
57                let mut result = UnifiedResult::with_columns(vec![
58                    "source".into(),
59                    "target".into(),
60                    "nodes_visited".into(),
61                    "negative_cycle_detected".into(),
62                    "hop_count".into(),
63                    "total_weight".into(),
64                ]);
65                let mut record = UnifiedRecord::new();
66                record.set("source", Value::text(res.source));
67                record.set("target", Value::text(res.target));
68                record.set("nodes_visited", Value::Integer(res.nodes_visited as i64));
69                record.set(
70                    "negative_cycle_detected",
71                    match res.negative_cycle_detected {
72                        Some(value) => Value::Boolean(value),
73                        None => Value::Null,
74                    },
75                );
76                if let Some(ref path) = res.path {
77                    record.set("hop_count", Value::Integer(path.hop_count as i64));
78                    record.set("total_weight", Value::Float(path.total_weight));
79                } else {
80                    record.set("hop_count", Value::Null);
81                    record.set("total_weight", Value::Null);
82                }
83                result.push(record);
84                Ok(RuntimeQueryResult {
85                    query: raw_query.to_string(),
86                    mode: QueryMode::Sql,
87                    statement: "graph_shortest_path",
88                    engine: "runtime-graph",
89                    result,
90                    affected_rows: 0,
91                    statement_type: "select",
92                })
93            }
94            GraphCommand::Properties => {
95                let res = self.graph_properties(None)?;
96                let mut result = UnifiedResult::with_columns(vec![
97                    "node_count".into(),
98                    "edge_count".into(),
99                    "is_connected".into(),
100                    "is_complete".into(),
101                    "is_cyclic".into(),
102                    "density".into(),
103                ]);
104                let mut record = UnifiedRecord::new();
105                record.set("node_count", Value::Integer(res.node_count as i64));
106                record.set("edge_count", Value::Integer(res.edge_count as i64));
107                record.set("is_connected", Value::Boolean(res.is_connected));
108                record.set("is_complete", Value::Boolean(res.is_complete));
109                record.set("is_cyclic", Value::Boolean(res.is_cyclic));
110                record.set("density", Value::Float(res.density));
111                result.push(record);
112                Ok(RuntimeQueryResult {
113                    query: raw_query.to_string(),
114                    mode: QueryMode::Sql,
115                    statement: "graph_properties",
116                    engine: "runtime-graph",
117                    result,
118                    affected_rows: 0,
119                    statement_type: "select",
120                })
121            }
122            GraphCommand::Traverse {
123                source,
124                strategy,
125                depth,
126                direction,
127            } => {
128                let dir = parse_direction(direction)?;
129                let strat = parse_traversal_strategy(strategy)?;
130                let res = self.graph_traverse(source, dir, *depth as usize, strat, None, None)?;
131                let mut result = UnifiedResult::with_columns(vec![
132                    "node_id".into(),
133                    "label".into(),
134                    "node_type".into(),
135                    "depth".into(),
136                ]);
137                for visit in &res.visits {
138                    let mut record = UnifiedRecord::new();
139                    record.set("node_id", Value::text(visit.node.id.clone()));
140                    record.set("label", Value::text(visit.node.label.clone()));
141                    record.set("node_type", Value::text(visit.node.node_type.clone()));
142                    record.set("depth", Value::Integer(visit.depth as i64));
143                    result.push(record);
144                }
145                Ok(RuntimeQueryResult {
146                    query: raw_query.to_string(),
147                    mode: QueryMode::Sql,
148                    statement: "graph_traverse",
149                    engine: "runtime-graph",
150                    result,
151                    affected_rows: 0,
152                    statement_type: "select",
153                })
154            }
155            GraphCommand::Centrality { algorithm } => {
156                let alg = parse_centrality_algorithm(algorithm)?;
157                let res = self.graph_centrality(alg, 100, false, None, None, None, None)?;
158                let mut result = UnifiedResult::with_columns(vec![
159                    "node_id".into(),
160                    "label".into(),
161                    "score".into(),
162                ]);
163                for score in &res.scores {
164                    let mut record = UnifiedRecord::new();
165                    record.set("node_id", Value::text(score.node.id.clone()));
166                    record.set("label", Value::text(score.node.label.clone()));
167                    record.set("score", Value::Float(score.score));
168                    result.push(record);
169                }
170                for ds in &res.degree_scores {
171                    let mut record = UnifiedRecord::new();
172                    record.set("node_id", Value::text(ds.node.id.clone()));
173                    record.set("label", Value::text(ds.node.label.clone()));
174                    record.set("score", Value::Float(ds.total_degree as f64));
175                    result.push(record);
176                }
177                Ok(RuntimeQueryResult {
178                    query: raw_query.to_string(),
179                    mode: QueryMode::Sql,
180                    statement: "graph_centrality",
181                    engine: "runtime-graph",
182                    result,
183                    affected_rows: 0,
184                    statement_type: "select",
185                })
186            }
187            GraphCommand::Community {
188                algorithm,
189                max_iterations,
190            } => {
191                let alg = parse_community_algorithm(algorithm)?;
192                let res =
193                    self.graph_communities(alg, 1, Some(*max_iterations as usize), None, None)?;
194                let mut result =
195                    UnifiedResult::with_columns(vec!["community_id".into(), "size".into()]);
196                for community in &res.communities {
197                    let mut record = UnifiedRecord::new();
198                    record.set("community_id", Value::text(community.id.clone()));
199                    record.set("size", Value::Integer(community.size as i64));
200                    result.push(record);
201                }
202                Ok(RuntimeQueryResult {
203                    query: raw_query.to_string(),
204                    mode: QueryMode::Sql,
205                    statement: "graph_community",
206                    engine: "runtime-graph",
207                    result,
208                    affected_rows: 0,
209                    statement_type: "select",
210                })
211            }
212            GraphCommand::Components { mode } => {
213                let m = parse_components_mode(mode)?;
214                let res = self.graph_components(m, 1, None)?;
215                let mut result =
216                    UnifiedResult::with_columns(vec!["component_id".into(), "size".into()]);
217                for component in &res.components {
218                    let mut record = UnifiedRecord::new();
219                    record.set("component_id", Value::text(component.id.clone()));
220                    record.set("size", Value::Integer(component.size as i64));
221                    result.push(record);
222                }
223                Ok(RuntimeQueryResult {
224                    query: raw_query.to_string(),
225                    mode: QueryMode::Sql,
226                    statement: "graph_components",
227                    engine: "runtime-graph",
228                    result,
229                    affected_rows: 0,
230                    statement_type: "select",
231                })
232            }
233            GraphCommand::Cycles { max_length } => {
234                let res = self.graph_cycles(*max_length as usize, 100, None)?;
235                let mut result =
236                    UnifiedResult::with_columns(vec!["cycle_index".into(), "length".into()]);
237                for (i, cycle) in res.cycles.iter().enumerate() {
238                    let mut record = UnifiedRecord::new();
239                    record.set("cycle_index", Value::Integer(i as i64));
240                    record.set("length", Value::Integer(cycle.nodes.len() as i64));
241                    result.push(record);
242                }
243                Ok(RuntimeQueryResult {
244                    query: raw_query.to_string(),
245                    mode: QueryMode::Sql,
246                    statement: "graph_cycles",
247                    engine: "runtime-graph",
248                    result,
249                    affected_rows: 0,
250                    statement_type: "select",
251                })
252            }
253            GraphCommand::Clustering => {
254                let res = self.graph_clustering(100, true, None)?;
255                let mut result = UnifiedResult::with_columns(vec![
256                    "node_id".into(),
257                    "label".into(),
258                    "score".into(),
259                ]);
260                // First row: global coefficient
261                let mut global_record = UnifiedRecord::new();
262                global_record.set("node_id", Value::text("__global__"));
263                global_record.set("label", Value::text("global_clustering"));
264                global_record.set("score", Value::Float(res.global));
265                result.push(global_record);
266                for score in &res.local {
267                    let mut record = UnifiedRecord::new();
268                    record.set("node_id", Value::text(score.node.id.clone()));
269                    record.set("label", Value::text(score.node.label.clone()));
270                    record.set("score", Value::Float(score.score));
271                    result.push(record);
272                }
273                Ok(RuntimeQueryResult {
274                    query: raw_query.to_string(),
275                    mode: QueryMode::Sql,
276                    statement: "graph_clustering",
277                    engine: "runtime-graph",
278                    result,
279                    affected_rows: 0,
280                    statement_type: "select",
281                })
282            }
283            GraphCommand::TopologicalSort => {
284                let res = self.graph_topological_sort(None)?;
285                let mut result = UnifiedResult::with_columns(vec![
286                    "order".into(),
287                    "node_id".into(),
288                    "label".into(),
289                ]);
290                for (i, node) in res.ordered_nodes.iter().enumerate() {
291                    let mut record = UnifiedRecord::new();
292                    record.set("order", Value::Integer(i as i64));
293                    record.set("node_id", Value::text(node.id.clone()));
294                    record.set("label", Value::text(node.label.clone()));
295                    result.push(record);
296                }
297                Ok(RuntimeQueryResult {
298                    query: raw_query.to_string(),
299                    mode: QueryMode::Sql,
300                    statement: "graph_topological_sort",
301                    engine: "runtime-graph",
302                    result,
303                    affected_rows: 0,
304                    statement_type: "select",
305                })
306            }
307        }
308    }
309
310    /// Execute a SEARCH command.
311    pub fn execute_search_command(
312        &self,
313        raw_query: &str,
314        cmd: &SearchCommand,
315    ) -> RedDBResult<RuntimeQueryResult> {
316        match cmd {
317            SearchCommand::Similar {
318                vector,
319                text,
320                provider,
321                collection,
322                limit,
323                min_score,
324                vector_param,
325                limit_param,
326                min_score_param,
327                text_param,
328            } => {
329                if vector_param.is_some() {
330                    return Err(RedDBError::Query(
331                        "SEARCH SIMILAR $N vector parameter was not bound before execution"
332                            .to_string(),
333                    ));
334                }
335                if limit_param.is_some() {
336                    return Err(RedDBError::Query(
337                        "SEARCH SIMILAR LIMIT $N parameter was not bound before execution"
338                            .to_string(),
339                    ));
340                }
341                if min_score_param.is_some() {
342                    return Err(RedDBError::Query(
343                        "SEARCH SIMILAR MIN_SCORE $N parameter was not bound before execution"
344                            .to_string(),
345                    ));
346                }
347                if text_param.is_some() {
348                    return Err(RedDBError::Query(
349                        "SEARCH SIMILAR TEXT $N parameter was not bound before execution"
350                            .to_string(),
351                    ));
352                }
353                // If text provided, generate embedding first (semantic search)
354                let search_vector = if let Some(query_text) = text {
355                    let (default_provider, _) = crate::ai::resolve_defaults_from_runtime(self);
356                    let provider = match provider.as_deref() {
357                        Some(p) => crate::ai::parse_provider(p)?,
358                        None => default_provider,
359                    };
360                    let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
361                    let model = std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
362                        .ok()
363                        .unwrap_or_else(|| provider.default_embedding_model().to_string());
364                    let transport = crate::runtime::ai::transport::AiTransport::from_runtime(self);
365                    let request = crate::ai::OpenAiEmbeddingRequest {
366                        api_key,
367                        model,
368                        inputs: vec![query_text.clone()],
369                        dimensions: None,
370                        api_base: provider.resolve_api_base(),
371                    };
372                    let response = crate::runtime::ai::block_on_ai(async move {
373                        crate::ai::openai_embeddings_async(&transport, request).await
374                    })
375                    .and_then(|result| result)?;
376                    response.embeddings.into_iter().next().ok_or_else(|| {
377                        RedDBError::Query("embedding API returned no vectors".to_string())
378                    })?
379                } else {
380                    vector.clone()
381                };
382                // Issue #119: route through AuthorizedSearch so the
383                // candidate set is gated by `EffectiveScope.visible_collections`
384                // before any similarity score is computed.
385                let scope = self.ai_scope();
386                let results =
387                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
388                        crate::runtime::authorized_search::AuthorizedSearch::execute_similar(
389                            self,
390                            &scope,
391                            collection,
392                            &search_vector,
393                            *limit,
394                            *min_score,
395                        )?
396                    } else {
397                        // Embedded / no-auth caller: keep legacy behaviour.
398                        self.search_similar(collection, &search_vector, *limit, *min_score)?
399                    };
400                let mut result =
401                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
402                for sr in &results {
403                    let mut record = UnifiedRecord::new();
404                    record.set("entity_id", Value::UnsignedInteger(sr.entity_id.raw()));
405                    record.set("score", Value::Float(sr.score as f64));
406                    result.push(record);
407                }
408                Ok(RuntimeQueryResult {
409                    query: raw_query.to_string(),
410                    mode: QueryMode::Sql,
411                    statement: "search_similar",
412                    engine: "runtime-search",
413                    result,
414                    affected_rows: 0,
415                    statement_type: "select",
416                })
417            }
418            SearchCommand::Text {
419                query,
420                collection,
421                limit,
422                fuzzy,
423                limit_param,
424            } => {
425                if limit_param.is_some() {
426                    return Err(RedDBError::Query(
427                        "SEARCH TEXT LIMIT $N parameter was not bound before execution".to_string(),
428                    ));
429                }
430                let collections = collection.as_ref().map(|c| vec![c.clone()]);
431                // Issue #119: gate the candidate set by visible_collections.
432                let scope = self.ai_scope();
433                let res =
434                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
435                        crate::runtime::authorized_search::AuthorizedSearch::execute_text(
436                            self,
437                            &scope,
438                            query.clone(),
439                            collections,
440                            None,
441                            None,
442                            None,
443                            Some(*limit),
444                            *fuzzy,
445                        )?
446                    } else {
447                        self.search_text(
448                            query.clone(),
449                            collections,
450                            None,
451                            None,
452                            None,
453                            Some(*limit),
454                            *fuzzy,
455                        )?
456                    };
457                let mut result =
458                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
459                for item in &res.matches {
460                    let mut record = UnifiedRecord::new();
461                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
462                    record.set("score", Value::Float(item.score as f64));
463                    result.push(record);
464                }
465                Ok(RuntimeQueryResult {
466                    query: raw_query.to_string(),
467                    mode: QueryMode::Sql,
468                    statement: "search_text",
469                    engine: "runtime-search",
470                    result,
471                    affected_rows: 0,
472                    statement_type: "select",
473                })
474            }
475            SearchCommand::Hybrid {
476                vector,
477                query,
478                collection,
479                limit,
480                limit_param,
481            } => {
482                if limit_param.is_some() {
483                    return Err(RedDBError::Query(
484                        "SEARCH HYBRID LIMIT $N parameter was not bound before execution"
485                            .to_string(),
486                    ));
487                }
488                let res = self.search_hybrid(
489                    vector.clone(),
490                    query.clone(),
491                    Some(*limit),
492                    Some(vec![collection.clone()]),
493                    None,
494                    None,
495                    None,
496                    Vec::new(),
497                    None,
498                    None,
499                    Some(*limit),
500                )?;
501                let mut result =
502                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
503                for item in &res.matches {
504                    let mut record = UnifiedRecord::new();
505                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
506                    record.set("score", Value::Float(item.score as f64));
507                    result.push(record);
508                }
509                Ok(RuntimeQueryResult {
510                    query: raw_query.to_string(),
511                    mode: QueryMode::Sql,
512                    statement: "search_hybrid",
513                    engine: "runtime-search",
514                    result,
515                    affected_rows: 0,
516                    statement_type: "select",
517                })
518            }
519            SearchCommand::Multimodal {
520                query,
521                collection,
522                limit,
523                limit_param,
524            } => {
525                if limit_param.is_some() {
526                    return Err(RedDBError::Query(
527                        "SEARCH MULTIMODAL LIMIT $N parameter was not bound before execution"
528                            .to_string(),
529                    ));
530                }
531                let collections = collection.as_ref().map(|c| vec![c.clone()]);
532                let res =
533                    self.search_multimodal(query.clone(), collections, None, None, Some(*limit))?;
534                let mut result =
535                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
536                for item in &res.matches {
537                    let mut record = UnifiedRecord::new();
538                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
539                    record.set("score", Value::Float(item.score as f64));
540                    result.push(record);
541                }
542                Ok(RuntimeQueryResult {
543                    query: raw_query.to_string(),
544                    mode: QueryMode::Sql,
545                    statement: "search_multimodal",
546                    engine: "runtime-search",
547                    result,
548                    affected_rows: 0,
549                    statement_type: "select",
550                })
551            }
552            SearchCommand::Index {
553                index,
554                value,
555                collection,
556                limit,
557                exact,
558                limit_param,
559            } => {
560                if limit_param.is_some() {
561                    return Err(RedDBError::Query(
562                        "SEARCH INDEX LIMIT $N parameter was not bound before execution"
563                            .to_string(),
564                    ));
565                }
566                let collections = collection.as_ref().map(|c| vec![c.clone()]);
567                let res = self.search_index(
568                    index.clone(),
569                    value.clone(),
570                    *exact,
571                    collections,
572                    None,
573                    None,
574                    Some(*limit),
575                )?;
576                let mut result =
577                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
578                for item in &res.matches {
579                    let mut record = UnifiedRecord::new();
580                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
581                    record.set("score", Value::Float(item.score as f64));
582                    result.push(record);
583                }
584                Ok(RuntimeQueryResult {
585                    query: raw_query.to_string(),
586                    mode: QueryMode::Sql,
587                    statement: "search_index",
588                    engine: "runtime-search",
589                    result,
590                    affected_rows: 0,
591                    statement_type: "select",
592                })
593            }
594            SearchCommand::Context {
595                query,
596                field,
597                collection,
598                limit,
599                depth,
600                limit_param,
601            } => {
602                if limit_param.is_some() {
603                    return Err(RedDBError::Query(
604                        "SEARCH CONTEXT LIMIT $N parameter was not bound before execution"
605                            .to_string(),
606                    ));
607                }
608                use crate::application::SearchContextInput;
609                // Issue #119: route through AuthorizedSearch so the
610                // candidate set + every expansion bucket is bounded by
611                // `EffectiveScope.visible_collections`.
612                let input = SearchContextInput {
613                    query: query.clone(),
614                    field: field.clone(),
615                    vector: None,
616                    collections: collection.as_ref().map(|c| vec![c.clone()]),
617                    graph_depth: Some(*depth),
618                    graph_max_edges: None,
619                    max_cross_refs: None,
620                    follow_cross_refs: None,
621                    expand_graph: None,
622                    global_scan: None,
623                    reindex: None,
624                    limit: Some(*limit),
625                    min_score: None,
626                };
627                let scope = self.ai_scope();
628                let res =
629                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
630                        crate::runtime::authorized_search::AuthorizedSearch::execute_context(
631                            self, &scope, input,
632                        )?
633                    } else {
634                        self.search_context(input)?
635                    };
636                let mut result = UnifiedResult::with_columns(vec![
637                    "entity_id".into(),
638                    "collection".into(),
639                    "score".into(),
640                    "discovery".into(),
641                    "kind".into(),
642                ]);
643                let all_entities = res
644                    .tables
645                    .iter()
646                    .map(|e| (e, "table"))
647                    .chain(res.graph.nodes.iter().map(|e| (e, "graph_node")))
648                    .chain(res.graph.edges.iter().map(|e| (e, "graph_edge")))
649                    .chain(res.vectors.iter().map(|e| (e, "vector")))
650                    .chain(res.documents.iter().map(|e| (e, "document")))
651                    .chain(res.key_values.iter().map(|e| (e, "kv")));
652                for (entity, kind) in all_entities {
653                    let mut record = UnifiedRecord::new();
654                    record.set("entity_id", Value::UnsignedInteger(entity.entity.id.raw()));
655                    record.set("collection", Value::text(entity.collection.clone()));
656                    record.set("score", Value::Float(entity.score as f64));
657                    record.set("discovery", Value::text(format!("{:?}", entity.discovery)));
658                    record.set("kind", Value::text(kind.to_string()));
659                    result.push(record);
660                }
661                Ok(RuntimeQueryResult {
662                    query: raw_query.to_string(),
663                    mode: QueryMode::Sql,
664                    statement: "search_context",
665                    engine: "runtime-context",
666                    result,
667                    affected_rows: 0,
668                    statement_type: "select",
669                })
670            }
671            SearchCommand::SpatialRadius {
672                center_lat,
673                center_lon,
674                radius_km,
675                collection,
676                column,
677                limit,
678                limit_param,
679            } => {
680                if limit_param.is_some() {
681                    return Err(RedDBError::Query(
682                        "SEARCH SPATIAL RADIUS LIMIT $N parameter was not bound before execution"
683                            .to_string(),
684                    ));
685                }
686                use crate::storage::unified::spatial_index::haversine_km;
687                let _ = column; // Column indicates which field holds geo data
688                let store = self.inner.db.store();
689                let entities = store
690                    .get_collection(collection)
691                    .map(|m| m.query_all(|_| true))
692                    .unwrap_or_default();
693
694                let mut hits: Vec<(u64, f64)> = Vec::new();
695                for entity in &entities {
696                    // Extract lat/lon from GeoPoint values in entity data
697                    if let Some((lat, lon)) = extract_geo_from_entity(entity) {
698                        let dist = haversine_km(*center_lat, *center_lon, lat, lon);
699                        if dist <= *radius_km {
700                            hits.push((entity.id.raw(), dist));
701                        }
702                    }
703                }
704                hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
705                hits.truncate(*limit);
706
707                let mut result =
708                    UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
709                for (id, dist) in &hits {
710                    let mut record = UnifiedRecord::new();
711                    record.set("entity_id", Value::UnsignedInteger(*id));
712                    record.set("distance_km", Value::Float(*dist));
713                    result.push(record);
714                }
715                Ok(RuntimeQueryResult {
716                    query: raw_query.to_string(),
717                    mode: QueryMode::Sql,
718                    statement: "search_spatial_radius",
719                    engine: "runtime-spatial",
720                    result,
721                    affected_rows: 0,
722                    statement_type: "select",
723                })
724            }
725            SearchCommand::SpatialBbox {
726                min_lat,
727                min_lon,
728                max_lat,
729                max_lon,
730                collection,
731                column,
732                limit,
733                limit_param,
734            } => {
735                if limit_param.is_some() {
736                    return Err(RedDBError::Query(
737                        "SEARCH SPATIAL BBOX LIMIT $N parameter was not bound before execution"
738                            .to_string(),
739                    ));
740                }
741                let _ = column;
742                let store = self.inner.db.store();
743                let entities = store
744                    .get_collection(collection)
745                    .map(|m| m.query_all(|_| true))
746                    .unwrap_or_default();
747
748                let mut result = UnifiedResult::with_columns(vec!["entity_id".into()]);
749                let mut count = 0;
750                for entity in &entities {
751                    if count >= *limit {
752                        break;
753                    }
754                    if let Some((lat, lon)) = extract_geo_from_entity(entity) {
755                        if lat >= *min_lat && lat <= *max_lat && lon >= *min_lon && lon <= *max_lon
756                        {
757                            let mut record = UnifiedRecord::new();
758                            record.set("entity_id", Value::UnsignedInteger(entity.id.raw()));
759                            result.push(record);
760                            count += 1;
761                        }
762                    }
763                }
764                Ok(RuntimeQueryResult {
765                    query: raw_query.to_string(),
766                    mode: QueryMode::Sql,
767                    statement: "search_spatial_bbox",
768                    engine: "runtime-spatial",
769                    result,
770                    affected_rows: 0,
771                    statement_type: "select",
772                })
773            }
774            SearchCommand::SpatialNearest {
775                lat,
776                lon,
777                k,
778                collection,
779                column,
780                k_param,
781            } => {
782                if k_param.is_some() {
783                    return Err(RedDBError::Query(
784                        "SEARCH SPATIAL NEAREST K $N parameter was not bound before execution"
785                            .to_string(),
786                    ));
787                }
788                use crate::storage::unified::spatial_index::haversine_km;
789                let _ = column;
790                let store = self.inner.db.store();
791                let entities = store
792                    .get_collection(collection)
793                    .map(|m| m.query_all(|_| true))
794                    .unwrap_or_default();
795
796                let mut hits: Vec<(u64, f64)> = Vec::new();
797                for entity in &entities {
798                    if let Some((elat, elon)) = extract_geo_from_entity(entity) {
799                        let dist = haversine_km(*lat, *lon, elat, elon);
800                        hits.push((entity.id.raw(), dist));
801                    }
802                }
803                hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
804                hits.truncate(*k);
805
806                let mut result =
807                    UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
808                for (id, dist) in &hits {
809                    let mut record = UnifiedRecord::new();
810                    record.set("entity_id", Value::UnsignedInteger(*id));
811                    record.set("distance_km", Value::Float(*dist));
812                    result.push(record);
813                }
814                Ok(RuntimeQueryResult {
815                    query: raw_query.to_string(),
816                    mode: QueryMode::Sql,
817                    statement: "search_spatial_nearest",
818                    engine: "runtime-spatial",
819                    result,
820                    affected_rows: 0,
821                    statement_type: "select",
822                })
823            }
824        }
825    }
826}
827
828// =============================================================================
829// Conversion helpers for string -> enum
830// =============================================================================
831
832fn parse_direction(s: &str) -> RedDBResult<RuntimeGraphDirection> {
833    match s.to_lowercase().as_str() {
834        "outgoing" | "out" => Ok(RuntimeGraphDirection::Outgoing),
835        "incoming" | "in" => Ok(RuntimeGraphDirection::Incoming),
836        "both" | "any" => Ok(RuntimeGraphDirection::Both),
837        _ => Err(RedDBError::Query(format!(
838            "unknown direction: '{s}', expected outgoing|incoming|both"
839        ))),
840    }
841}
842
843fn parse_path_algorithm(s: &str) -> RedDBResult<RuntimeGraphPathAlgorithm> {
844    match s.to_lowercase().as_str() {
845        "bfs" => Ok(RuntimeGraphPathAlgorithm::Bfs),
846        "dijkstra" => Ok(RuntimeGraphPathAlgorithm::Dijkstra),
847        "astar" | "a*" => Ok(RuntimeGraphPathAlgorithm::AStar),
848        "bellman_ford" | "bellmanford" => Ok(RuntimeGraphPathAlgorithm::BellmanFord),
849        _ => Err(RedDBError::Query(format!(
850            "unknown path algorithm: '{s}', expected bfs|dijkstra|astar|bellman_ford"
851        ))),
852    }
853}
854
855fn parse_traversal_strategy(s: &str) -> RedDBResult<RuntimeGraphTraversalStrategy> {
856    match s.to_lowercase().as_str() {
857        "bfs" => Ok(RuntimeGraphTraversalStrategy::Bfs),
858        "dfs" => Ok(RuntimeGraphTraversalStrategy::Dfs),
859        _ => Err(RedDBError::Query(format!(
860            "unknown traversal strategy: '{s}', expected bfs|dfs"
861        ))),
862    }
863}
864
865fn parse_centrality_algorithm(s: &str) -> RedDBResult<RuntimeGraphCentralityAlgorithm> {
866    match s.to_lowercase().as_str() {
867        "degree" => Ok(RuntimeGraphCentralityAlgorithm::Degree),
868        "closeness" => Ok(RuntimeGraphCentralityAlgorithm::Closeness),
869        "betweenness" => Ok(RuntimeGraphCentralityAlgorithm::Betweenness),
870        "eigenvector" => Ok(RuntimeGraphCentralityAlgorithm::Eigenvector),
871        "pagerank" | "page_rank" => Ok(RuntimeGraphCentralityAlgorithm::PageRank),
872        _ => Err(RedDBError::Query(format!(
873            "unknown centrality algorithm: '{s}', expected degree|closeness|betweenness|eigenvector|pagerank"
874        ))),
875    }
876}
877
878fn parse_community_algorithm(s: &str) -> RedDBResult<RuntimeGraphCommunityAlgorithm> {
879    match s.to_lowercase().as_str() {
880        "label_propagation" | "labelpropagation" => {
881            Ok(RuntimeGraphCommunityAlgorithm::LabelPropagation)
882        }
883        "louvain" => Ok(RuntimeGraphCommunityAlgorithm::Louvain),
884        _ => Err(RedDBError::Query(format!(
885            "unknown community algorithm: '{s}', expected label_propagation|louvain"
886        ))),
887    }
888}
889
890fn parse_components_mode(s: &str) -> RedDBResult<RuntimeGraphComponentsMode> {
891    match s.to_lowercase().as_str() {
892        "connected" => Ok(RuntimeGraphComponentsMode::Connected),
893        "weak" | "weakly_connected" => Ok(RuntimeGraphComponentsMode::Weak),
894        "strong" | "strongly_connected" => Ok(RuntimeGraphComponentsMode::Strong),
895        _ => Err(RedDBError::Query(format!(
896            "unknown components mode: '{s}', expected connected|weak|strong"
897        ))),
898    }
899}
900
901/// Extract (latitude, longitude) from an entity.
902///
903/// Looks for GeoPoint values in the entity data (row columns or node properties)
904/// or dedicated lat/lon fields. Returns degrees.
905fn extract_geo_from_entity(entity: &UnifiedEntity) -> Option<(f64, f64)> {
906    match &entity.data {
907        EntityData::Row(row) => {
908            // Search named columns for GeoPoint or lat/lon pairs
909            if let Some(ref named) = row.named {
910                // Direct GeoPoint value
911                for value in named.values() {
912                    if let Value::GeoPoint(lat_micro, lon_micro) = value {
913                        return Some((
914                            *lat_micro as f64 / 1_000_000.0,
915                            *lon_micro as f64 / 1_000_000.0,
916                        ));
917                    }
918                }
919                // Try lat/lon or latitude/longitude named fields
920                let lat =
921                    named
922                        .get("lat")
923                        .or_else(|| named.get("latitude"))
924                        .and_then(|v| match v {
925                            Value::Float(f) => Some(*f),
926                            Value::Integer(i) => Some(*i as f64),
927                            _ => None,
928                        });
929                let lon = named
930                    .get("lon")
931                    .or_else(|| named.get("lng"))
932                    .or_else(|| named.get("longitude"))
933                    .and_then(|v| match v {
934                        Value::Float(f) => Some(*f),
935                        Value::Integer(i) => Some(*i as f64),
936                        _ => None,
937                    });
938                if let (Some(la), Some(lo)) = (lat, lon) {
939                    return Some((la, lo));
940                }
941            }
942            // Search positional columns for GeoPoint
943            for value in &row.columns {
944                if let Value::GeoPoint(lat_micro, lon_micro) = value {
945                    return Some((
946                        *lat_micro as f64 / 1_000_000.0,
947                        *lon_micro as f64 / 1_000_000.0,
948                    ));
949                }
950            }
951            None
952        }
953        EntityData::Node(node) => {
954            // Search node properties
955            for value in node.properties.values() {
956                if let Value::GeoPoint(lat_micro, lon_micro) = value {
957                    return Some((
958                        *lat_micro as f64 / 1_000_000.0,
959                        *lon_micro as f64 / 1_000_000.0,
960                    ));
961                }
962            }
963            let lat = node
964                .properties
965                .get("lat")
966                .or_else(|| node.properties.get("latitude"))
967                .and_then(|v| match v {
968                    Value::Float(f) => Some(*f),
969                    Value::Integer(i) => Some(*i as f64),
970                    _ => None,
971                });
972            let lon = node
973                .properties
974                .get("lon")
975                .or_else(|| node.properties.get("lng"))
976                .or_else(|| node.properties.get("longitude"))
977                .and_then(|v| match v {
978                    Value::Float(f) => Some(*f),
979                    Value::Integer(i) => Some(*i as f64),
980                    _ => None,
981                });
982            if let (Some(la), Some(lo)) = (lat, lon) {
983                return Some((la, lo));
984            }
985            None
986        }
987        _ => None,
988    }
989}