Skip to main content

reddb_server/runtime/
impl_graph_commands.rs

1//! Execution of GRAPH and SEARCH SQL-like commands.
2//!
3//! Maps parsed `GraphCommand` and `SearchCommand` AST nodes to the existing
4//! runtime graph analytics and search methods, returning results wrapped in
5//! `RuntimeQueryResult`.
6
7use super::*;
8use crate::storage::query::ast::GraphCommandOrderBy;
9use std::cmp::Ordering;
10
11impl RedDBRuntime {
12    /// Execute a GRAPH analytics command.
13    pub fn execute_graph_command(
14        &self,
15        raw_query: &str,
16        cmd: &GraphCommand,
17    ) -> RedDBResult<RuntimeQueryResult> {
18        match cmd {
19            GraphCommand::Neighborhood {
20                source,
21                depth,
22                direction,
23                edge_labels,
24            } => {
25                let dir = parse_direction(direction)?;
26                let res = self.graph_neighborhood(
27                    source,
28                    dir,
29                    *depth as usize,
30                    edge_labels.clone(),
31                    None,
32                )?;
33                let mut result = UnifiedResult::with_columns(vec![
34                    "node_id".into(),
35                    "label".into(),
36                    "node_type".into(),
37                    "depth".into(),
38                ]);
39                for visit in &res.nodes {
40                    let mut record = UnifiedRecord::new();
41                    record.set("node_id", Value::text(visit.node.id.clone()));
42                    record.set("label", Value::text(visit.node.label.clone()));
43                    record.set("node_type", Value::text(visit.node.node_type.clone()));
44                    record.set("depth", Value::Integer(visit.depth as i64));
45                    result.push(record);
46                }
47                Ok(RuntimeQueryResult {
48                    query: raw_query.to_string(),
49                    mode: QueryMode::Sql,
50                    statement: "graph_neighborhood",
51                    engine: "runtime-graph",
52                    result,
53                    affected_rows: 0,
54                    statement_type: "select",
55                })
56            }
57            GraphCommand::ShortestPath {
58                source,
59                target,
60                algorithm,
61                direction,
62                limit,
63                order_by,
64            } => {
65                let dir = parse_direction(direction)?;
66                let alg = parse_path_algorithm(algorithm)?;
67                let res = self.graph_shortest_path(source, target, dir, alg, None, None)?;
68                let mut result = UnifiedResult::with_columns(vec![
69                    "source".into(),
70                    "target".into(),
71                    "nodes_visited".into(),
72                    "negative_cycle_detected".into(),
73                    "hop_count".into(),
74                    "total_weight".into(),
75                ]);
76                let mut record = UnifiedRecord::new();
77                record.set("source", Value::text(res.source));
78                record.set("target", Value::text(res.target));
79                record.set("nodes_visited", Value::Integer(res.nodes_visited as i64));
80                record.set(
81                    "negative_cycle_detected",
82                    match res.negative_cycle_detected {
83                        Some(value) => Value::Boolean(value),
84                        None => Value::Null,
85                    },
86                );
87                if let Some(ref path) = res.path {
88                    record.set("hop_count", Value::Integer(path.hop_count as i64));
89                    record.set("total_weight", Value::Float(path.total_weight));
90                } else {
91                    record.set("hop_count", Value::Null);
92                    record.set("total_weight", Value::Null);
93                }
94                result.push(record);
95                apply_graph_order_and_limit(
96                    &mut result,
97                    "graph_shortest_path",
98                    order_by.as_ref(),
99                    limit.map(|n| n as usize),
100                )?;
101                Ok(RuntimeQueryResult {
102                    query: raw_query.to_string(),
103                    mode: QueryMode::Sql,
104                    statement: "graph_shortest_path",
105                    engine: "runtime-graph",
106                    result,
107                    affected_rows: 0,
108                    statement_type: "select",
109                })
110            }
111            GraphCommand::Properties { source } => {
112                if let Some(node_ref) = source {
113                    // Per-node property lookup (#423). Uses the same label
114                    // resolution as NEIGHBORHOOD/TRAVERSE so '<label>' and
115                    // '<numeric id>' both work.
116                    let graph = materialize_graph_with_projection(
117                        self.inner.db.store().as_ref(),
118                        None,
119                    )?;
120                    let resolved = resolve_graph_node_id(&graph, node_ref)?;
121                    let stored = graph.get_node(&resolved).ok_or_else(|| {
122                        RedDBError::NotFound(node_ref.to_string())
123                    })?;
124                    let all_props = materialize_graph_node_properties(
125                        self.inner.db.store().as_ref(),
126                    )?;
127                    let props = all_props.get(&resolved).cloned().unwrap_or_default();
128
129                    // Fixed columns first, then property keys in sorted order so
130                    // the schema is stable across snapshots / wire renders.
131                    let mut prop_keys: Vec<&String> = props.keys().collect();
132                    prop_keys.sort();
133                    let mut columns: Vec<String> = Vec::with_capacity(3 + prop_keys.len());
134                    columns.push("node_id".into());
135                    columns.push("label".into());
136                    columns.push("node_type".into());
137                    for k in &prop_keys {
138                        columns.push((*k).clone());
139                    }
140                    let mut result = UnifiedResult::with_columns(columns);
141                    let mut record = UnifiedRecord::new();
142                    record.set("node_id", Value::text(stored.id.clone()));
143                    record.set("label", Value::text(stored.label.clone()));
144                    record.set("node_type", Value::text(stored.node_type.clone()));
145                    for k in &prop_keys {
146                        if let Some(v) = props.get(*k) {
147                            record.set(k.as_str(), v.clone());
148                        }
149                    }
150                    result.push(record);
151                    return Ok(RuntimeQueryResult {
152                        query: raw_query.to_string(),
153                        mode: QueryMode::Sql,
154                        statement: "graph_properties",
155                        engine: "runtime-graph",
156                        result,
157                        affected_rows: 0,
158                        statement_type: "select",
159                    });
160                }
161                let res = self.graph_properties(None)?;
162                let mut result = UnifiedResult::with_columns(vec![
163                    "node_count".into(),
164                    "edge_count".into(),
165                    "is_connected".into(),
166                    "is_complete".into(),
167                    "is_cyclic".into(),
168                    "density".into(),
169                ]);
170                let mut record = UnifiedRecord::new();
171                record.set("node_count", Value::Integer(res.node_count as i64));
172                record.set("edge_count", Value::Integer(res.edge_count as i64));
173                record.set("is_connected", Value::Boolean(res.is_connected));
174                record.set("is_complete", Value::Boolean(res.is_complete));
175                record.set("is_cyclic", Value::Boolean(res.is_cyclic));
176                record.set("density", Value::Float(res.density));
177                result.push(record);
178                Ok(RuntimeQueryResult {
179                    query: raw_query.to_string(),
180                    mode: QueryMode::Sql,
181                    statement: "graph_properties",
182                    engine: "runtime-graph",
183                    result,
184                    affected_rows: 0,
185                    statement_type: "select",
186                })
187            }
188            GraphCommand::Traverse {
189                source,
190                strategy,
191                depth,
192                direction,
193                edge_labels,
194            } => {
195                let dir = parse_direction(direction)?;
196                let strat = parse_traversal_strategy(strategy)?;
197                let res = self.graph_traverse(
198                    source,
199                    dir,
200                    *depth as usize,
201                    strat,
202                    edge_labels.clone(),
203                    None,
204                )?;
205                let mut result = UnifiedResult::with_columns(vec![
206                    "node_id".into(),
207                    "label".into(),
208                    "node_type".into(),
209                    "depth".into(),
210                ]);
211                for visit in &res.visits {
212                    let mut record = UnifiedRecord::new();
213                    record.set("node_id", Value::text(visit.node.id.clone()));
214                    record.set("label", Value::text(visit.node.label.clone()));
215                    record.set("node_type", Value::text(visit.node.node_type.clone()));
216                    record.set("depth", Value::Integer(visit.depth as i64));
217                    result.push(record);
218                }
219                Ok(RuntimeQueryResult {
220                    query: raw_query.to_string(),
221                    mode: QueryMode::Sql,
222                    statement: "graph_traverse",
223                    engine: "runtime-graph",
224                    result,
225                    affected_rows: 0,
226                    statement_type: "select",
227                })
228            }
229            GraphCommand::Centrality {
230                algorithm,
231                limit,
232                order_by,
233            } => {
234                let alg = parse_centrality_algorithm(algorithm)?;
235                // `limit = None` keeps historical implicit top-100 cap.
236                // `Some(0)` returns zero rows (standard SQL LIMIT 0 semantics).
237                let limit_usize = limit.map(|n| n as usize);
238                let order_needs_full_set = order_by
239                    .as_ref()
240                    .map(|order| order.ascending)
241                    .unwrap_or(false);
242                let top_k = if order_needs_full_set {
243                    usize::MAX
244                } else {
245                    limit_usize.unwrap_or(100).max(1)
246                };
247                let res = self.graph_centrality(alg, top_k, false, None, None, None, None)?;
248                let mut result = UnifiedResult::with_columns(vec![
249                    "node_id".into(),
250                    "label".into(),
251                    "score".into(),
252                ]);
253                for score in &res.scores {
254                    let mut record = UnifiedRecord::new();
255                    record.set("node_id", Value::text(score.node.id.clone()));
256                    record.set("label", Value::text(score.node.label.clone()));
257                    record.set("score", Value::Float(score.score));
258                    result.push(record);
259                }
260                for ds in &res.degree_scores {
261                    let mut record = UnifiedRecord::new();
262                    record.set("node_id", Value::text(ds.node.id.clone()));
263                    record.set("label", Value::text(ds.node.label.clone()));
264                    record.set("score", Value::Float(ds.total_degree as f64));
265                    result.push(record);
266                }
267                apply_graph_order_and_limit(
268                    &mut result,
269                    "graph_centrality",
270                    order_by.as_ref(),
271                    Some(limit_usize.unwrap_or(100)),
272                )?;
273                Ok(RuntimeQueryResult {
274                    query: raw_query.to_string(),
275                    mode: QueryMode::Sql,
276                    statement: "graph_centrality",
277                    engine: "runtime-graph",
278                    result,
279                    affected_rows: 0,
280                    statement_type: "select",
281                })
282            }
283            GraphCommand::Community {
284                algorithm,
285                max_iterations,
286                limit,
287                order_by,
288            } => {
289                let alg = parse_community_algorithm(algorithm)?;
290                let res =
291                    self.graph_communities(alg, 1, Some(*max_iterations as usize), None, None)?;
292                let mut result =
293                    UnifiedResult::with_columns(vec!["community_id".into(), "size".into()]);
294                for community in &res.communities {
295                    let mut record = UnifiedRecord::new();
296                    record.set("community_id", Value::text(community.id.clone()));
297                    record.set("size", Value::Integer(community.size as i64));
298                    result.push(record);
299                }
300                apply_graph_order_and_limit(
301                    &mut result,
302                    "graph_community",
303                    order_by.as_ref(),
304                    limit.map(|n| n as usize),
305                )?;
306                Ok(RuntimeQueryResult {
307                    query: raw_query.to_string(),
308                    mode: QueryMode::Sql,
309                    statement: "graph_community",
310                    engine: "runtime-graph",
311                    result,
312                    affected_rows: 0,
313                    statement_type: "select",
314                })
315            }
316            GraphCommand::Components {
317                mode,
318                limit,
319                order_by,
320            } => {
321                let m = parse_components_mode(mode)?;
322                let res = self.graph_components(m, 1, None)?;
323                let mut result =
324                    UnifiedResult::with_columns(vec!["component_id".into(), "size".into()]);
325                for component in &res.components {
326                    let mut record = UnifiedRecord::new();
327                    record.set("component_id", Value::text(component.id.clone()));
328                    record.set("size", Value::Integer(component.size as i64));
329                    result.push(record);
330                }
331                apply_graph_order_and_limit(
332                    &mut result,
333                    "graph_components",
334                    order_by.as_ref(),
335                    limit.map(|n| n as usize),
336                )?;
337                Ok(RuntimeQueryResult {
338                    query: raw_query.to_string(),
339                    mode: QueryMode::Sql,
340                    statement: "graph_components",
341                    engine: "runtime-graph",
342                    result,
343                    affected_rows: 0,
344                    statement_type: "select",
345                })
346            }
347            GraphCommand::Cycles { max_length } => {
348                let res = self.graph_cycles(*max_length as usize, 100, None)?;
349                let mut result =
350                    UnifiedResult::with_columns(vec!["cycle_index".into(), "length".into()]);
351                for (i, cycle) in res.cycles.iter().enumerate() {
352                    let mut record = UnifiedRecord::new();
353                    record.set("cycle_index", Value::Integer(i as i64));
354                    record.set("length", Value::Integer(cycle.nodes.len() as i64));
355                    result.push(record);
356                }
357                Ok(RuntimeQueryResult {
358                    query: raw_query.to_string(),
359                    mode: QueryMode::Sql,
360                    statement: "graph_cycles",
361                    engine: "runtime-graph",
362                    result,
363                    affected_rows: 0,
364                    statement_type: "select",
365                })
366            }
367            GraphCommand::Clustering => {
368                let res = self.graph_clustering(100, true, None)?;
369                let mut result = UnifiedResult::with_columns(vec![
370                    "node_id".into(),
371                    "label".into(),
372                    "score".into(),
373                ]);
374                // First row: global coefficient
375                let mut global_record = UnifiedRecord::new();
376                global_record.set("node_id", Value::text("__global__"));
377                global_record.set("label", Value::text("global_clustering"));
378                global_record.set("score", Value::Float(res.global));
379                result.push(global_record);
380                for score in &res.local {
381                    let mut record = UnifiedRecord::new();
382                    record.set("node_id", Value::text(score.node.id.clone()));
383                    record.set("label", Value::text(score.node.label.clone()));
384                    record.set("score", Value::Float(score.score));
385                    result.push(record);
386                }
387                Ok(RuntimeQueryResult {
388                    query: raw_query.to_string(),
389                    mode: QueryMode::Sql,
390                    statement: "graph_clustering",
391                    engine: "runtime-graph",
392                    result,
393                    affected_rows: 0,
394                    statement_type: "select",
395                })
396            }
397            GraphCommand::TopologicalSort => {
398                let res = self.graph_topological_sort(None)?;
399                let mut result = UnifiedResult::with_columns(vec![
400                    "order".into(),
401                    "node_id".into(),
402                    "label".into(),
403                ]);
404                for (i, node) in res.ordered_nodes.iter().enumerate() {
405                    let mut record = UnifiedRecord::new();
406                    record.set("order", Value::Integer(i as i64));
407                    record.set("node_id", Value::text(node.id.clone()));
408                    record.set("label", Value::text(node.label.clone()));
409                    result.push(record);
410                }
411                Ok(RuntimeQueryResult {
412                    query: raw_query.to_string(),
413                    mode: QueryMode::Sql,
414                    statement: "graph_topological_sort",
415                    engine: "runtime-graph",
416                    result,
417                    affected_rows: 0,
418                    statement_type: "select",
419                })
420            }
421        }
422    }
423
424    /// Execute a SEARCH command.
425    pub fn execute_search_command(
426        &self,
427        raw_query: &str,
428        cmd: &SearchCommand,
429    ) -> RedDBResult<RuntimeQueryResult> {
430        match cmd {
431            SearchCommand::Similar {
432                vector,
433                text,
434                provider,
435                collection,
436                limit,
437                min_score,
438                vector_param,
439                limit_param,
440                min_score_param,
441                text_param,
442            } => {
443                if vector_param.is_some() {
444                    return Err(RedDBError::Query(
445                        "SEARCH SIMILAR $N vector parameter was not bound before execution"
446                            .to_string(),
447                    ));
448                }
449                if limit_param.is_some() {
450                    return Err(RedDBError::Query(
451                        "SEARCH SIMILAR LIMIT $N parameter was not bound before execution"
452                            .to_string(),
453                    ));
454                }
455                if min_score_param.is_some() {
456                    return Err(RedDBError::Query(
457                        "SEARCH SIMILAR MIN_SCORE $N parameter was not bound before execution"
458                            .to_string(),
459                    ));
460                }
461                if text_param.is_some() {
462                    return Err(RedDBError::Query(
463                        "SEARCH SIMILAR TEXT $N parameter was not bound before execution"
464                            .to_string(),
465                    ));
466                }
467                // If text provided, generate embedding first (semantic search)
468                let search_vector = if let Some(query_text) = text {
469                    let (default_provider, _) = crate::ai::resolve_defaults_from_runtime(self);
470                    let provider = match provider.as_deref() {
471                        Some(p) => crate::ai::parse_provider(p)?,
472                        None => default_provider,
473                    };
474                    let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
475                    let model = std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
476                        .ok()
477                        .unwrap_or_else(|| provider.default_embedding_model().to_string());
478                    let transport = crate::runtime::ai::transport::AiTransport::from_runtime(self);
479                    let request = crate::ai::OpenAiEmbeddingRequest {
480                        api_key,
481                        model,
482                        inputs: vec![query_text.clone()],
483                        dimensions: None,
484                        api_base: provider.resolve_api_base(),
485                    };
486                    let response = crate::runtime::ai::block_on_ai(async move {
487                        crate::ai::openai_embeddings_async(&transport, request).await
488                    })
489                    .and_then(|result| result)?;
490                    response.embeddings.into_iter().next().ok_or_else(|| {
491                        RedDBError::Query("embedding API returned no vectors".to_string())
492                    })?
493                } else {
494                    vector.clone()
495                };
496                // Issue #119: route through AuthorizedSearch so the
497                // candidate set is gated by `EffectiveScope.visible_collections`
498                // before any similarity score is computed.
499                let scope = self.ai_scope();
500                let results =
501                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
502                        crate::runtime::authorized_search::AuthorizedSearch::execute_similar(
503                            self,
504                            &scope,
505                            collection,
506                            &search_vector,
507                            *limit,
508                            *min_score,
509                        )?
510                    } else {
511                        // Embedded / no-auth caller: keep legacy behaviour.
512                        self.search_similar(collection, &search_vector, *limit, *min_score)?
513                    };
514                let mut result =
515                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
516                for sr in &results {
517                    let mut record = UnifiedRecord::new();
518                    record.set("entity_id", Value::UnsignedInteger(sr.entity_id.raw()));
519                    record.set("score", Value::Float(sr.score as f64));
520                    result.push(record);
521                }
522                Ok(RuntimeQueryResult {
523                    query: raw_query.to_string(),
524                    mode: QueryMode::Sql,
525                    statement: "search_similar",
526                    engine: "runtime-search",
527                    result,
528                    affected_rows: 0,
529                    statement_type: "select",
530                })
531            }
532            SearchCommand::Text {
533                query,
534                collection,
535                limit,
536                fuzzy,
537                limit_param,
538            } => {
539                if limit_param.is_some() {
540                    return Err(RedDBError::Query(
541                        "SEARCH TEXT LIMIT $N parameter was not bound before execution".to_string(),
542                    ));
543                }
544                let collections = collection.as_ref().map(|c| vec![c.clone()]);
545                // Issue #119: gate the candidate set by visible_collections.
546                let scope = self.ai_scope();
547                let res =
548                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
549                        crate::runtime::authorized_search::AuthorizedSearch::execute_text(
550                            self,
551                            &scope,
552                            query.clone(),
553                            collections,
554                            None,
555                            None,
556                            None,
557                            Some(*limit),
558                            *fuzzy,
559                        )?
560                    } else {
561                        self.search_text(
562                            query.clone(),
563                            collections,
564                            None,
565                            None,
566                            None,
567                            Some(*limit),
568                            *fuzzy,
569                        )?
570                    };
571                let mut result =
572                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
573                for item in &res.matches {
574                    let mut record = UnifiedRecord::new();
575                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
576                    record.set("score", Value::Float(item.score as f64));
577                    result.push(record);
578                }
579                Ok(RuntimeQueryResult {
580                    query: raw_query.to_string(),
581                    mode: QueryMode::Sql,
582                    statement: "search_text",
583                    engine: "runtime-search",
584                    result,
585                    affected_rows: 0,
586                    statement_type: "select",
587                })
588            }
589            SearchCommand::Hybrid {
590                vector,
591                query,
592                collection,
593                limit,
594                limit_param,
595            } => {
596                if limit_param.is_some() {
597                    return Err(RedDBError::Query(
598                        "SEARCH HYBRID LIMIT $N parameter was not bound before execution"
599                            .to_string(),
600                    ));
601                }
602                let res = self.search_hybrid(
603                    vector.clone(),
604                    query.clone(),
605                    Some(*limit),
606                    Some(vec![collection.clone()]),
607                    None,
608                    None,
609                    None,
610                    Vec::new(),
611                    None,
612                    None,
613                    Some(*limit),
614                )?;
615                let mut result =
616                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
617                for item in &res.matches {
618                    let mut record = UnifiedRecord::new();
619                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
620                    record.set("score", Value::Float(item.score as f64));
621                    result.push(record);
622                }
623                Ok(RuntimeQueryResult {
624                    query: raw_query.to_string(),
625                    mode: QueryMode::Sql,
626                    statement: "search_hybrid",
627                    engine: "runtime-search",
628                    result,
629                    affected_rows: 0,
630                    statement_type: "select",
631                })
632            }
633            SearchCommand::Multimodal {
634                query,
635                collection,
636                limit,
637                limit_param,
638            } => {
639                if limit_param.is_some() {
640                    return Err(RedDBError::Query(
641                        "SEARCH MULTIMODAL LIMIT $N parameter was not bound before execution"
642                            .to_string(),
643                    ));
644                }
645                let collections = collection.as_ref().map(|c| vec![c.clone()]);
646                let res =
647                    self.search_multimodal(query.clone(), collections, None, None, Some(*limit))?;
648                let mut result =
649                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
650                for item in &res.matches {
651                    let mut record = UnifiedRecord::new();
652                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
653                    record.set("score", Value::Float(item.score as f64));
654                    result.push(record);
655                }
656                Ok(RuntimeQueryResult {
657                    query: raw_query.to_string(),
658                    mode: QueryMode::Sql,
659                    statement: "search_multimodal",
660                    engine: "runtime-search",
661                    result,
662                    affected_rows: 0,
663                    statement_type: "select",
664                })
665            }
666            SearchCommand::Index {
667                index,
668                value,
669                collection,
670                limit,
671                exact,
672                limit_param,
673            } => {
674                if limit_param.is_some() {
675                    return Err(RedDBError::Query(
676                        "SEARCH INDEX LIMIT $N parameter was not bound before execution"
677                            .to_string(),
678                    ));
679                }
680                let collections = collection.as_ref().map(|c| vec![c.clone()]);
681                let res = self.search_index(
682                    index.clone(),
683                    value.clone(),
684                    *exact,
685                    collections,
686                    None,
687                    None,
688                    Some(*limit),
689                )?;
690                let mut result =
691                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
692                for item in &res.matches {
693                    let mut record = UnifiedRecord::new();
694                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
695                    record.set("score", Value::Float(item.score as f64));
696                    result.push(record);
697                }
698                Ok(RuntimeQueryResult {
699                    query: raw_query.to_string(),
700                    mode: QueryMode::Sql,
701                    statement: "search_index",
702                    engine: "runtime-search",
703                    result,
704                    affected_rows: 0,
705                    statement_type: "select",
706                })
707            }
708            SearchCommand::Context {
709                query,
710                field,
711                collection,
712                limit,
713                depth,
714                limit_param,
715            } => {
716                if limit_param.is_some() {
717                    return Err(RedDBError::Query(
718                        "SEARCH CONTEXT LIMIT $N parameter was not bound before execution"
719                            .to_string(),
720                    ));
721                }
722                use crate::application::SearchContextInput;
723                // Issue #119: route through AuthorizedSearch so the
724                // candidate set + every expansion bucket is bounded by
725                // `EffectiveScope.visible_collections`.
726                let input = SearchContextInput {
727                    query: query.clone(),
728                    field: field.clone(),
729                    vector: None,
730                    collections: collection.as_ref().map(|c| vec![c.clone()]),
731                    graph_depth: Some(*depth),
732                    graph_max_edges: None,
733                    max_cross_refs: None,
734                    follow_cross_refs: None,
735                    expand_graph: None,
736                    global_scan: None,
737                    reindex: None,
738                    limit: Some(*limit),
739                    min_score: None,
740                };
741                let scope = self.ai_scope();
742                let res =
743                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
744                        crate::runtime::authorized_search::AuthorizedSearch::execute_context(
745                            self, &scope, input,
746                        )?
747                    } else {
748                        self.search_context(input)?
749                    };
750                let mut result = UnifiedResult::with_columns(vec![
751                    "entity_id".into(),
752                    "collection".into(),
753                    "score".into(),
754                    "discovery".into(),
755                    "kind".into(),
756                ]);
757                let all_entities = res
758                    .tables
759                    .iter()
760                    .map(|e| (e, "table"))
761                    .chain(res.graph.nodes.iter().map(|e| (e, "graph_node")))
762                    .chain(res.graph.edges.iter().map(|e| (e, "graph_edge")))
763                    .chain(res.vectors.iter().map(|e| (e, "vector")))
764                    .chain(res.documents.iter().map(|e| (e, "document")))
765                    .chain(res.key_values.iter().map(|e| (e, "kv")));
766                for (entity, kind) in all_entities {
767                    let mut record = UnifiedRecord::new();
768                    record.set("entity_id", Value::UnsignedInteger(entity.entity.id.raw()));
769                    record.set("collection", Value::text(entity.collection.clone()));
770                    record.set("score", Value::Float(entity.score as f64));
771                    record.set("discovery", Value::text(format!("{:?}", entity.discovery)));
772                    record.set("kind", Value::text(kind.to_string()));
773                    result.push(record);
774                }
775                Ok(RuntimeQueryResult {
776                    query: raw_query.to_string(),
777                    mode: QueryMode::Sql,
778                    statement: "search_context",
779                    engine: "runtime-context",
780                    result,
781                    affected_rows: 0,
782                    statement_type: "select",
783                })
784            }
785            SearchCommand::SpatialRadius {
786                center_lat,
787                center_lon,
788                radius_km,
789                collection,
790                column,
791                limit,
792                limit_param,
793            } => {
794                if limit_param.is_some() {
795                    return Err(RedDBError::Query(
796                        "SEARCH SPATIAL RADIUS LIMIT $N parameter was not bound before execution"
797                            .to_string(),
798                    ));
799                }
800                use crate::storage::unified::spatial_index::haversine_km;
801                let _ = column; // Column indicates which field holds geo data
802                let store = self.inner.db.store();
803                let entities = store
804                    .get_collection(collection)
805                    .map(|m| m.query_all(|_| true))
806                    .unwrap_or_default();
807
808                let mut hits: Vec<(u64, f64)> = Vec::new();
809                for entity in &entities {
810                    // Extract lat/lon from GeoPoint values in entity data
811                    if let Some((lat, lon)) = extract_geo_from_entity(entity) {
812                        let dist = haversine_km(*center_lat, *center_lon, lat, lon);
813                        if dist <= *radius_km {
814                            hits.push((entity.id.raw(), dist));
815                        }
816                    }
817                }
818                hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
819                hits.truncate(*limit);
820
821                let mut result =
822                    UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
823                for (id, dist) in &hits {
824                    let mut record = UnifiedRecord::new();
825                    record.set("entity_id", Value::UnsignedInteger(*id));
826                    record.set("distance_km", Value::Float(*dist));
827                    result.push(record);
828                }
829                Ok(RuntimeQueryResult {
830                    query: raw_query.to_string(),
831                    mode: QueryMode::Sql,
832                    statement: "search_spatial_radius",
833                    engine: "runtime-spatial",
834                    result,
835                    affected_rows: 0,
836                    statement_type: "select",
837                })
838            }
839            SearchCommand::SpatialBbox {
840                min_lat,
841                min_lon,
842                max_lat,
843                max_lon,
844                collection,
845                column,
846                limit,
847                limit_param,
848            } => {
849                if limit_param.is_some() {
850                    return Err(RedDBError::Query(
851                        "SEARCH SPATIAL BBOX LIMIT $N parameter was not bound before execution"
852                            .to_string(),
853                    ));
854                }
855                let _ = column;
856                let store = self.inner.db.store();
857                let entities = store
858                    .get_collection(collection)
859                    .map(|m| m.query_all(|_| true))
860                    .unwrap_or_default();
861
862                let mut result = UnifiedResult::with_columns(vec!["entity_id".into()]);
863                let mut count = 0;
864                for entity in &entities {
865                    if count >= *limit {
866                        break;
867                    }
868                    if let Some((lat, lon)) = extract_geo_from_entity(entity) {
869                        if lat >= *min_lat && lat <= *max_lat && lon >= *min_lon && lon <= *max_lon
870                        {
871                            let mut record = UnifiedRecord::new();
872                            record.set("entity_id", Value::UnsignedInteger(entity.id.raw()));
873                            result.push(record);
874                            count += 1;
875                        }
876                    }
877                }
878                Ok(RuntimeQueryResult {
879                    query: raw_query.to_string(),
880                    mode: QueryMode::Sql,
881                    statement: "search_spatial_bbox",
882                    engine: "runtime-spatial",
883                    result,
884                    affected_rows: 0,
885                    statement_type: "select",
886                })
887            }
888            SearchCommand::SpatialNearest {
889                lat,
890                lon,
891                k,
892                collection,
893                column,
894                k_param,
895            } => {
896                if k_param.is_some() {
897                    return Err(RedDBError::Query(
898                        "SEARCH SPATIAL NEAREST K $N parameter was not bound before execution"
899                            .to_string(),
900                    ));
901                }
902                use crate::storage::unified::spatial_index::haversine_km;
903                let _ = column;
904                let store = self.inner.db.store();
905                let entities = store
906                    .get_collection(collection)
907                    .map(|m| m.query_all(|_| true))
908                    .unwrap_or_default();
909
910                let mut hits: Vec<(u64, f64)> = Vec::new();
911                for entity in &entities {
912                    if let Some((elat, elon)) = extract_geo_from_entity(entity) {
913                        let dist = haversine_km(*lat, *lon, elat, elon);
914                        hits.push((entity.id.raw(), dist));
915                    }
916                }
917                hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
918                hits.truncate(*k);
919
920                let mut result =
921                    UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
922                for (id, dist) in &hits {
923                    let mut record = UnifiedRecord::new();
924                    record.set("entity_id", Value::UnsignedInteger(*id));
925                    record.set("distance_km", Value::Float(*dist));
926                    result.push(record);
927                }
928                Ok(RuntimeQueryResult {
929                    query: raw_query.to_string(),
930                    mode: QueryMode::Sql,
931                    statement: "search_spatial_nearest",
932                    engine: "runtime-spatial",
933                    result,
934                    affected_rows: 0,
935                    statement_type: "select",
936                })
937            }
938        }
939    }
940}
941
942fn apply_graph_order_and_limit(
943    result: &mut UnifiedResult,
944    statement: &str,
945    order_by: Option<&GraphCommandOrderBy>,
946    limit: Option<usize>,
947) -> RedDBResult<()> {
948    if let Some(order) = order_by {
949        let column = graph_order_metric_column(statement, &order.metric)?;
950        let columns = result.columns.clone();
951        result.records.sort_by(|left, right| {
952            let cmp = compare_graph_values(left.get(column), right.get(column));
953            let cmp = if order.ascending { cmp } else { cmp.reverse() };
954            if cmp == Ordering::Equal {
955                compare_graph_rows(left, right, &columns)
956            } else {
957                cmp
958            }
959        });
960    }
961    if let Some(limit) = limit {
962        result.records.truncate(limit);
963    }
964    Ok(())
965}
966
967fn graph_order_metric_column(statement: &str, metric: &str) -> RedDBResult<&'static str> {
968    let metric = metric.to_ascii_lowercase();
969    match (statement, metric.as_str()) {
970        ("graph_centrality", "score" | "centrality_score") => Ok("score"),
971        ("graph_community", "size" | "community_size") => Ok("size"),
972        ("graph_components", "size" | "component_size") => Ok("size"),
973        ("graph_shortest_path", "hop_count" | "total_weight" | "nodes_visited") => {
974            Ok(match metric.as_str() {
975                "total_weight" => "total_weight",
976                "nodes_visited" => "nodes_visited",
977                _ => "hop_count",
978            })
979        }
980        _ => Err(RedDBError::Query(format!(
981            "unsupported ORDER BY metric '{metric}' for GRAPH {}",
982            statement.trim_start_matches("graph_")
983        ))),
984    }
985}
986
987fn compare_graph_rows(left: &UnifiedRecord, right: &UnifiedRecord, columns: &[String]) -> Ordering {
988    for column in columns {
989        let cmp = compare_graph_values(left.get(column), right.get(column));
990        if cmp != Ordering::Equal {
991            return cmp;
992        }
993    }
994    Ordering::Equal
995}
996
997fn compare_graph_values(left: Option<&Value>, right: Option<&Value>) -> Ordering {
998    match (left, right) {
999        (None, None) => Ordering::Equal,
1000        (None, Some(_)) => Ordering::Less,
1001        (Some(_), None) => Ordering::Greater,
1002        (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
1003        (Some(Value::Null), Some(_)) => Ordering::Less,
1004        (Some(_), Some(Value::Null)) => Ordering::Greater,
1005        (Some(Value::Integer(left)), Some(Value::Integer(right))) => left.cmp(right),
1006        (Some(Value::UnsignedInteger(left)), Some(Value::UnsignedInteger(right))) => {
1007            left.cmp(right)
1008        }
1009        (Some(Value::Float(left)), Some(Value::Float(right))) => {
1010            left.partial_cmp(right).unwrap_or(Ordering::Equal)
1011        }
1012        (Some(Value::Integer(left)), Some(Value::Float(right))) => {
1013            (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1014        }
1015        (Some(Value::Float(left)), Some(Value::Integer(right))) => {
1016            left.partial_cmp(&(*right as f64)).unwrap_or(Ordering::Equal)
1017        }
1018        (Some(Value::UnsignedInteger(left)), Some(Value::Float(right))) => {
1019            (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1020        }
1021        (Some(Value::Float(left)), Some(Value::UnsignedInteger(right))) => {
1022            left.partial_cmp(&(*right as f64)).unwrap_or(Ordering::Equal)
1023        }
1024        (Some(Value::Integer(left)), Some(Value::UnsignedInteger(right))) => {
1025            (*left as i128).cmp(&(*right as i128))
1026        }
1027        (Some(Value::UnsignedInteger(left)), Some(Value::Integer(right))) => {
1028            (*left as i128).cmp(&(*right as i128))
1029        }
1030        (Some(Value::Timestamp(left)), Some(Value::Timestamp(right))) => left.cmp(right),
1031        (Some(Value::Text(left)), Some(Value::Text(right))) => left.cmp(right),
1032        (Some(Value::Boolean(left)), Some(Value::Boolean(right))) => left.cmp(right),
1033        (Some(left), Some(right)) => format!("{left:?}").cmp(&format!("{right:?}")),
1034    }
1035}
1036
1037// =============================================================================
1038// Conversion helpers for string -> enum
1039// =============================================================================
1040
1041fn parse_direction(s: &str) -> RedDBResult<RuntimeGraphDirection> {
1042    match s.to_lowercase().as_str() {
1043        "outgoing" | "out" => Ok(RuntimeGraphDirection::Outgoing),
1044        "incoming" | "in" => Ok(RuntimeGraphDirection::Incoming),
1045        "both" | "any" => Ok(RuntimeGraphDirection::Both),
1046        _ => Err(RedDBError::Query(format!(
1047            "unknown direction: '{s}', expected outgoing|incoming|both"
1048        ))),
1049    }
1050}
1051
1052fn parse_path_algorithm(s: &str) -> RedDBResult<RuntimeGraphPathAlgorithm> {
1053    match s.to_lowercase().as_str() {
1054        "bfs" => Ok(RuntimeGraphPathAlgorithm::Bfs),
1055        "dijkstra" => Ok(RuntimeGraphPathAlgorithm::Dijkstra),
1056        "astar" | "a*" => Ok(RuntimeGraphPathAlgorithm::AStar),
1057        "bellman_ford" | "bellmanford" => Ok(RuntimeGraphPathAlgorithm::BellmanFord),
1058        _ => Err(RedDBError::Query(format!(
1059            "unknown path algorithm: '{s}', expected bfs|dijkstra|astar|bellman_ford"
1060        ))),
1061    }
1062}
1063
1064fn parse_traversal_strategy(s: &str) -> RedDBResult<RuntimeGraphTraversalStrategy> {
1065    match s.to_lowercase().as_str() {
1066        "bfs" => Ok(RuntimeGraphTraversalStrategy::Bfs),
1067        "dfs" => Ok(RuntimeGraphTraversalStrategy::Dfs),
1068        _ => Err(RedDBError::Query(format!(
1069            "unknown traversal strategy: '{s}', expected bfs|dfs"
1070        ))),
1071    }
1072}
1073
1074fn parse_centrality_algorithm(s: &str) -> RedDBResult<RuntimeGraphCentralityAlgorithm> {
1075    match s.to_lowercase().as_str() {
1076        "degree" => Ok(RuntimeGraphCentralityAlgorithm::Degree),
1077        "closeness" => Ok(RuntimeGraphCentralityAlgorithm::Closeness),
1078        "betweenness" => Ok(RuntimeGraphCentralityAlgorithm::Betweenness),
1079        "eigenvector" => Ok(RuntimeGraphCentralityAlgorithm::Eigenvector),
1080        "pagerank" | "page_rank" => Ok(RuntimeGraphCentralityAlgorithm::PageRank),
1081        _ => Err(RedDBError::Query(format!(
1082            "unknown centrality algorithm: '{s}', expected degree|closeness|betweenness|eigenvector|pagerank"
1083        ))),
1084    }
1085}
1086
1087fn parse_community_algorithm(s: &str) -> RedDBResult<RuntimeGraphCommunityAlgorithm> {
1088    match s.to_lowercase().as_str() {
1089        "label_propagation" | "labelpropagation" => {
1090            Ok(RuntimeGraphCommunityAlgorithm::LabelPropagation)
1091        }
1092        "louvain" => Ok(RuntimeGraphCommunityAlgorithm::Louvain),
1093        _ => Err(RedDBError::Query(format!(
1094            "unknown community algorithm: '{s}', expected label_propagation|louvain"
1095        ))),
1096    }
1097}
1098
1099fn parse_components_mode(s: &str) -> RedDBResult<RuntimeGraphComponentsMode> {
1100    match s.to_lowercase().as_str() {
1101        "connected" => Ok(RuntimeGraphComponentsMode::Connected),
1102        "weak" | "weakly_connected" => Ok(RuntimeGraphComponentsMode::Weak),
1103        "strong" | "strongly_connected" => Ok(RuntimeGraphComponentsMode::Strong),
1104        _ => Err(RedDBError::Query(format!(
1105            "unknown components mode: '{s}', expected connected|weak|strong"
1106        ))),
1107    }
1108}
1109
1110/// Extract (latitude, longitude) from an entity.
1111///
1112/// Looks for GeoPoint values in the entity data (row columns or node properties)
1113/// or dedicated lat/lon fields. Returns degrees.
1114fn extract_geo_from_entity(entity: &UnifiedEntity) -> Option<(f64, f64)> {
1115    match &entity.data {
1116        EntityData::Row(row) => {
1117            // Search named columns for GeoPoint or lat/lon pairs
1118            if let Some(ref named) = row.named {
1119                // Direct GeoPoint value
1120                for value in named.values() {
1121                    if let Value::GeoPoint(lat_micro, lon_micro) = value {
1122                        return Some((
1123                            *lat_micro as f64 / 1_000_000.0,
1124                            *lon_micro as f64 / 1_000_000.0,
1125                        ));
1126                    }
1127                }
1128                // Try lat/lon or latitude/longitude named fields
1129                let lat =
1130                    named
1131                        .get("lat")
1132                        .or_else(|| named.get("latitude"))
1133                        .and_then(|v| match v {
1134                            Value::Float(f) => Some(*f),
1135                            Value::Integer(i) => Some(*i as f64),
1136                            _ => None,
1137                        });
1138                let lon = named
1139                    .get("lon")
1140                    .or_else(|| named.get("lng"))
1141                    .or_else(|| named.get("longitude"))
1142                    .and_then(|v| match v {
1143                        Value::Float(f) => Some(*f),
1144                        Value::Integer(i) => Some(*i as f64),
1145                        _ => None,
1146                    });
1147                if let (Some(la), Some(lo)) = (lat, lon) {
1148                    return Some((la, lo));
1149                }
1150            }
1151            // Search positional columns for GeoPoint
1152            for value in &row.columns {
1153                if let Value::GeoPoint(lat_micro, lon_micro) = value {
1154                    return Some((
1155                        *lat_micro as f64 / 1_000_000.0,
1156                        *lon_micro as f64 / 1_000_000.0,
1157                    ));
1158                }
1159            }
1160            None
1161        }
1162        EntityData::Node(node) => {
1163            // Search node properties
1164            for value in node.properties.values() {
1165                if let Value::GeoPoint(lat_micro, lon_micro) = value {
1166                    return Some((
1167                        *lat_micro as f64 / 1_000_000.0,
1168                        *lon_micro as f64 / 1_000_000.0,
1169                    ));
1170                }
1171            }
1172            let lat = node
1173                .properties
1174                .get("lat")
1175                .or_else(|| node.properties.get("latitude"))
1176                .and_then(|v| match v {
1177                    Value::Float(f) => Some(*f),
1178                    Value::Integer(i) => Some(*i as f64),
1179                    _ => None,
1180                });
1181            let lon = node
1182                .properties
1183                .get("lon")
1184                .or_else(|| node.properties.get("lng"))
1185                .or_else(|| node.properties.get("longitude"))
1186                .and_then(|v| match v {
1187                    Value::Float(f) => Some(*f),
1188                    Value::Integer(i) => Some(*i as f64),
1189                    _ => None,
1190                });
1191            if let (Some(la), Some(lo)) = (lat, lon) {
1192                return Some((la, lo));
1193            }
1194            None
1195        }
1196        _ => None,
1197    }
1198}