Skip to main content

reddb_server/runtime/
impl_graph_commands.rs

1//! Execution of GRAPH and SEARCH SQL-like commands.
2//!
3//! Maps parsed `GraphCommand` and `SearchCommand` AST nodes to the existing
4//! runtime graph analytics and search methods, returning results wrapped in
5//! `RuntimeQueryResult`.
6
7use super::*;
8use crate::storage::query::ast::GraphCommandOrderBy;
9use std::cmp::Ordering;
10
11impl RedDBRuntime {
12    /// Execute a GRAPH analytics command.
    ///
    /// Matches on the `GraphCommand` variant, calls the corresponding
    /// `graph_*` runtime method (or, for per-node `Properties`, the
    /// materialization helpers), and flattens the answer into a tabular
    /// `UnifiedResult`.
    ///
    /// Every arm returns a `RuntimeQueryResult` with `mode: QueryMode::Sql`,
    /// `engine: "runtime-graph"`, `affected_rows: 0` and
    /// `statement_type: "select"`; only `statement` and the result columns
    /// differ per variant.
    ///
    /// # Parameters
    ///
    /// * `raw_query` - original query text; copied verbatim into the
    ///   `query` field of the returned result.
    /// * `cmd` - parsed GRAPH command to execute.
    ///
    /// # Errors
    ///
    /// Propagates (via `?`) any error from the `parse_*` helpers, the
    /// underlying `graph_*` runtime calls, the graph materialization helpers
    /// and `apply_graph_order_and_limit`. Per-node `Properties` additionally
    /// returns `RedDBError::NotFound` when the resolved node id is not
    /// present in the materialized graph.
    ///
    /// NOTE(review): several AST integers (`depth`, `limit`, `max_length`,
    /// `max_iterations`) are converted with unchecked `as usize` casts; their
    /// source types are not visible here - confirm they cannot be negative or
    /// wider than `usize`.
13    pub fn execute_graph_command(
14        &self,
15        raw_query: &str,
16        cmd: &GraphCommand,
17    ) -> RedDBResult<RuntimeQueryResult> {
18        match cmd {
            // NEIGHBORHOOD: one output row per entry of `res.nodes`, with the
            // depth reported by the runtime for that visit.
19            GraphCommand::Neighborhood {
20                source,
21                depth,
22                direction,
23                edge_labels,
24            } => {
25                let dir = parse_direction(direction)?;
26                let res = self.graph_neighborhood(
27                    source,
28                    dir,
29                    *depth as usize,
30                    edge_labels.clone(),
31                    None,
32                )?;
33                let mut result = UnifiedResult::with_columns(vec![
34                    "node_id".into(),
35                    "label".into(),
36                    "node_type".into(),
37                    "depth".into(),
38                ]);
39                for visit in &res.nodes {
40                    let mut record = UnifiedRecord::new();
41                    record.set("node_id", Value::text(visit.node.id.clone()));
42                    record.set("label", Value::text(visit.node.label.clone()));
43                    record.set("node_type", Value::text(visit.node.node_type.clone()));
44                    record.set("depth", Value::Integer(visit.depth as i64));
45                    result.push(record);
46                }
47                Ok(RuntimeQueryResult {
48                    query: raw_query.to_string(),
49                    mode: QueryMode::Sql,
50                    statement: "graph_neighborhood",
51                    engine: "runtime-graph",
52                    result,
53                    affected_rows: 0,
54                    statement_type: "select",
55                })
56            }
            // SHORTEST PATH: always builds exactly one summary row; ORDER BY /
            // LIMIT are then applied to that single-row result.
57            GraphCommand::ShortestPath {
58                source,
59                target,
60                algorithm,
61                direction,
62                limit,
63                order_by,
64            } => {
65                let dir = parse_direction(direction)?;
66                let alg = parse_path_algorithm(algorithm)?;
67                let res = self.graph_shortest_path(source, target, dir, alg, None, None)?;
68                let mut result = UnifiedResult::with_columns(vec![
69                    "source".into(),
70                    "target".into(),
71                    "nodes_visited".into(),
72                    "negative_cycle_detected".into(),
73                    "hop_count".into(),
74                    "total_weight".into(),
75                ]);
76                let mut record = UnifiedRecord::new();
77                record.set("source", Value::text(res.source));
78                record.set("target", Value::text(res.target));
79                record.set("nodes_visited", Value::Integer(res.nodes_visited as i64));
                // `negative_cycle_detected` is optional in the runtime answer;
                // an absent value is surfaced as SQL NULL rather than `false`.
80                record.set(
81                    "negative_cycle_detected",
82                    match res.negative_cycle_detected {
83                        Some(value) => Value::Boolean(value),
84                        None => Value::Null,
85                    },
86                );
                // No path in the answer: keep the row but report NULL for the
                // path metrics.
87                if let Some(ref path) = res.path {
88                    record.set("hop_count", Value::Integer(path.hop_count as i64));
89                    record.set("total_weight", Value::Float(path.total_weight));
90                } else {
91                    record.set("hop_count", Value::Null);
92                    record.set("total_weight", Value::Null);
93                }
94                result.push(record);
95                apply_graph_order_and_limit(
96                    &mut result,
97                    "graph_shortest_path",
98                    order_by.as_ref(),
99                    limit.map(|n| n as usize),
100                )?;
101                Ok(RuntimeQueryResult {
102                    query: raw_query.to_string(),
103                    mode: QueryMode::Sql,
104                    statement: "graph_shortest_path",
105                    engine: "runtime-graph",
106                    result,
107                    affected_rows: 0,
108                    statement_type: "select",
109                })
110            }
            // PROPERTIES has two shapes: with a `source` it returns one row
            // describing that node (fixed columns plus its properties);
            // without one it returns a single row of whole-graph statistics.
111            GraphCommand::Properties { source } => {
112                if let Some(node_ref) = source {
113                    // Per-node property lookup (#423). Uses the same label
114                    // resolution as NEIGHBORHOOD/TRAVERSE so '<label>' and
115                    // '<numeric id>' both work.
                    // NOTE(review): the store is materialized twice on this
                    // path (graph, then node properties); check whether a
                    // single pass is available if this shows up in profiles.
116                    let graph =
117                        materialize_graph_with_projection(self.inner.db.store().as_ref(), None)?;
118                    let resolved = resolve_graph_node_id(&graph, node_ref)?;
119                    let stored = graph
120                        .get_node(&resolved)
121                        .ok_or_else(|| RedDBError::NotFound(node_ref.to_string()))?;
122                    let all_props =
123                        materialize_graph_node_properties(self.inner.db.store().as_ref())?;
                    // A node with no entry in the property map yields an empty
                    // property set, not an error.
124                    let props = all_props.get(&resolved).cloned().unwrap_or_default();
125
126                    // Fixed columns first, then property keys in sorted order so
127                    // the schema is stable across snapshots / wire renders.
128                    let mut prop_keys: Vec<&String> = props.keys().collect();
129                    prop_keys.sort();
130                    let mut columns: Vec<String> = Vec::with_capacity(3 + prop_keys.len());
131                    columns.push("node_id".into());
132                    columns.push("label".into());
133                    columns.push("node_type".into());
134                    for k in &prop_keys {
135                        columns.push((*k).clone());
136                    }
137                    let mut result = UnifiedResult::with_columns(columns);
138                    let mut record = UnifiedRecord::new();
139                    record.set("node_id", Value::text(stored.id.clone()));
140                    record.set("label", Value::text(stored.label.clone()));
141                    record.set("node_type", Value::text(stored.node_type.clone()));
                    // Property values are written after the fixed fields, in
                    // the same sorted key order used for the column list.
142                    for k in &prop_keys {
143                        if let Some(v) = props.get(*k) {
144                            record.set(k.as_str(), v.clone());
145                        }
146                    }
147                    result.push(record);
                    // Early return: the whole-graph summary below only runs
                    // when no node was named.
148                    return Ok(RuntimeQueryResult {
149                        query: raw_query.to_string(),
150                        mode: QueryMode::Sql,
151                        statement: "graph_properties",
152                        engine: "runtime-graph",
153                        result,
154                        affected_rows: 0,
155                        statement_type: "select",
156                    });
157                }
                // Whole-graph summary: a single row of counts and flags taken
                // from `graph_properties`.
158                let res = self.graph_properties(None)?;
159                let mut result = UnifiedResult::with_columns(vec![
160                    "node_count".into(),
161                    "edge_count".into(),
162                    "is_connected".into(),
163                    "is_complete".into(),
164                    "is_cyclic".into(),
165                    "density".into(),
166                ]);
167                let mut record = UnifiedRecord::new();
168                record.set("node_count", Value::Integer(res.node_count as i64));
169                record.set("edge_count", Value::Integer(res.edge_count as i64));
170                record.set("is_connected", Value::Boolean(res.is_connected));
171                record.set("is_complete", Value::Boolean(res.is_complete));
172                record.set("is_cyclic", Value::Boolean(res.is_cyclic));
173                record.set("density", Value::Float(res.density));
174                result.push(record);
175                Ok(RuntimeQueryResult {
176                    query: raw_query.to_string(),
177                    mode: QueryMode::Sql,
178                    statement: "graph_properties",
179                    engine: "runtime-graph",
180                    result,
181                    affected_rows: 0,
182                    statement_type: "select",
183                })
184            }
            // TRAVERSE: same output schema as NEIGHBORHOOD, but rows come from
            // `res.visits` and the traversal strategy is caller-selected.
185            GraphCommand::Traverse {
186                source,
187                strategy,
188                depth,
189                direction,
190                edge_labels,
191            } => {
192                let dir = parse_direction(direction)?;
193                let strat = parse_traversal_strategy(strategy)?;
194                let res = self.graph_traverse(
195                    source,
196                    dir,
197                    *depth as usize,
198                    strat,
199                    edge_labels.clone(),
200                    None,
201                )?;
202                let mut result = UnifiedResult::with_columns(vec![
203                    "node_id".into(),
204                    "label".into(),
205                    "node_type".into(),
206                    "depth".into(),
207                ]);
208                for visit in &res.visits {
209                    let mut record = UnifiedRecord::new();
210                    record.set("node_id", Value::text(visit.node.id.clone()));
211                    record.set("label", Value::text(visit.node.label.clone()));
212                    record.set("node_type", Value::text(visit.node.node_type.clone()));
213                    record.set("depth", Value::Integer(visit.depth as i64));
214                    result.push(record);
215                }
216                Ok(RuntimeQueryResult {
217                    query: raw_query.to_string(),
218                    mode: QueryMode::Sql,
219                    statement: "graph_traverse",
220                    engine: "runtime-graph",
221                    result,
222                    affected_rows: 0,
223                    statement_type: "select",
224                })
225            }
            // CENTRALITY: rows are (node_id, label, score). Both `res.scores`
            // and `res.degree_scores` are appended to the same result.
226            GraphCommand::Centrality {
227                algorithm,
228                limit,
229                order_by,
230            } => {
231                let alg = parse_centrality_algorithm(algorithm)?;
232                // `limit = None` keeps historical implicit top-100 cap.
233                // `Some(0)` returns zero rows (standard SQL LIMIT 0 semantics).
234                let limit_usize = limit.map(|n| n as usize);
                // With an ascending ORDER BY the runtime is asked for
                // `usize::MAX` entries instead of a top-k cut - presumably
                // because the rows wanted are not the highest-scoring ones.
                // NOTE(review): a descending ORDER BY on a column other than
                // `score` still goes through the top-k cut below before
                // sorting - confirm that is intended.
235                let order_needs_full_set = order_by
236                    .as_ref()
237                    .map(|order| order.ascending)
238                    .unwrap_or(false);
                // `.max(1)` means the runtime is never asked for a top-0 set;
                // for LIMIT 0 the final `Some(0)` passed to
                // `apply_graph_order_and_limit` is what applies the limit.
239                let top_k = if order_needs_full_set {
240                    usize::MAX
241                } else {
242                    limit_usize.unwrap_or(100).max(1)
243                };
244                let res = self.graph_centrality(alg, top_k, false, None, None, None, None)?;
245                let mut result = UnifiedResult::with_columns(vec![
246                    "node_id".into(),
247                    "label".into(),
248                    "score".into(),
249                ]);
250                for score in &res.scores {
251                    let mut record = UnifiedRecord::new();
252                    record.set("node_id", Value::text(score.node.id.clone()));
253                    record.set("label", Value::text(score.node.label.clone()));
254                    record.set("score", Value::Float(score.score));
255                    result.push(record);
256                }
                // Degree results report `total_degree`, converted to a float
                // so it fits the shared `score` column.
257                for ds in &res.degree_scores {
258                    let mut record = UnifiedRecord::new();
259                    record.set("node_id", Value::text(ds.node.id.clone()));
260                    record.set("label", Value::text(ds.node.label.clone()));
261                    record.set("score", Value::Float(ds.total_degree as f64));
262                    result.push(record);
263                }
                // The limit passed here is always `Some`: the explicit LIMIT,
                // or 100 when none was given.
264                apply_graph_order_and_limit(
265                    &mut result,
266                    "graph_centrality",
267                    order_by.as_ref(),
268                    Some(limit_usize.unwrap_or(100)),
269                )?;
270                Ok(RuntimeQueryResult {
271                    query: raw_query.to_string(),
272                    mode: QueryMode::Sql,
273                    statement: "graph_centrality",
274                    engine: "runtime-graph",
275                    result,
276                    affected_rows: 0,
277                    statement_type: "select",
278                })
279            }
            // COMMUNITY: one row per detected community (id and size).
280            GraphCommand::Community {
281                algorithm,
282                max_iterations,
283                limit,
284                order_by,
285            } => {
286                let alg = parse_community_algorithm(algorithm)?;
                // NOTE(review): the literal `1` is presumably a minimum
                // community size - confirm against `graph_communities`.
287                let res =
288                    self.graph_communities(alg, 1, Some(*max_iterations as usize), None, None)?;
289                let mut result =
290                    UnifiedResult::with_columns(vec!["community_id".into(), "size".into()]);
291                for community in &res.communities {
292                    let mut record = UnifiedRecord::new();
293                    record.set("community_id", Value::text(community.id.clone()));
294                    record.set("size", Value::Integer(community.size as i64));
295                    result.push(record);
296                }
297                apply_graph_order_and_limit(
298                    &mut result,
299                    "graph_community",
300                    order_by.as_ref(),
301                    limit.map(|n| n as usize),
302                )?;
303                Ok(RuntimeQueryResult {
304                    query: raw_query.to_string(),
305                    mode: QueryMode::Sql,
306                    statement: "graph_community",
307                    engine: "runtime-graph",
308                    result,
309                    affected_rows: 0,
310                    statement_type: "select",
311                })
312            }
            // COMPONENTS: one row per component (id and size).
313            GraphCommand::Components {
314                mode,
315                limit,
316                order_by,
317            } => {
318                let m = parse_components_mode(mode)?;
                // NOTE(review): the literal `1` is presumably a minimum
                // component size - confirm against `graph_components`.
319                let res = self.graph_components(m, 1, None)?;
320                let mut result =
321                    UnifiedResult::with_columns(vec!["component_id".into(), "size".into()]);
322                for component in &res.components {
323                    let mut record = UnifiedRecord::new();
324                    record.set("component_id", Value::text(component.id.clone()));
325                    record.set("size", Value::Integer(component.size as i64));
326                    result.push(record);
327                }
328                apply_graph_order_and_limit(
329                    &mut result,
330                    "graph_components",
331                    order_by.as_ref(),
332                    limit.map(|n| n as usize),
333                )?;
334                Ok(RuntimeQueryResult {
335                    query: raw_query.to_string(),
336                    mode: QueryMode::Sql,
337                    statement: "graph_components",
338                    engine: "runtime-graph",
339                    result,
340                    affected_rows: 0,
341                    statement_type: "select",
342                })
343            }
            // CYCLES: one row per returned cycle; `cycle_index` is the
            // zero-based position in `res.cycles`, `length` is the number of
            // nodes in that cycle.
344            GraphCommand::Cycles { max_length } => {
                // NOTE(review): the literal `100` is presumably a cap on the
                // number of cycles returned - confirm against `graph_cycles`.
345                let res = self.graph_cycles(*max_length as usize, 100, None)?;
346                let mut result =
347                    UnifiedResult::with_columns(vec!["cycle_index".into(), "length".into()]);
348                for (i, cycle) in res.cycles.iter().enumerate() {
349                    let mut record = UnifiedRecord::new();
350                    record.set("cycle_index", Value::Integer(i as i64));
351                    record.set("length", Value::Integer(cycle.nodes.len() as i64));
352                    result.push(record);
353                }
354                Ok(RuntimeQueryResult {
355                    query: raw_query.to_string(),
356                    mode: QueryMode::Sql,
357                    statement: "graph_cycles",
358                    engine: "runtime-graph",
359                    result,
360                    affected_rows: 0,
361                    statement_type: "select",
362                })
363            }
            // CLUSTERING: a synthetic `__global__` row carrying the global
            // coefficient, followed by one row per entry of `res.local`.
364            GraphCommand::Clustering => {
                // NOTE(review): `100` / `true` are presumably a top-k cap and
                // an "include local scores" flag - confirm against
                // `graph_clustering`.
365                let res = self.graph_clustering(100, true, None)?;
366                let mut result = UnifiedResult::with_columns(vec![
367                    "node_id".into(),
368                    "label".into(),
369                    "score".into(),
370                ]);
371                // First row: global coefficient
372                let mut global_record = UnifiedRecord::new();
373                global_record.set("node_id", Value::text("__global__"));
374                global_record.set("label", Value::text("global_clustering"));
375                global_record.set("score", Value::Float(res.global));
376                result.push(global_record);
377                for score in &res.local {
378                    let mut record = UnifiedRecord::new();
379                    record.set("node_id", Value::text(score.node.id.clone()));
380                    record.set("label", Value::text(score.node.label.clone()));
381                    record.set("score", Value::Float(score.score));
382                    result.push(record);
383                }
384                Ok(RuntimeQueryResult {
385                    query: raw_query.to_string(),
386                    mode: QueryMode::Sql,
387                    statement: "graph_clustering",
388                    engine: "runtime-graph",
389                    result,
390                    affected_rows: 0,
391                    statement_type: "select",
392                })
393            }
            // TOPOLOGICAL SORT: rows follow `res.ordered_nodes`; `order` is
            // the zero-based position in that sequence.
394            GraphCommand::TopologicalSort => {
395                let res = self.graph_topological_sort(None)?;
396                let mut result = UnifiedResult::with_columns(vec![
397                    "order".into(),
398                    "node_id".into(),
399                    "label".into(),
400                ]);
401                for (i, node) in res.ordered_nodes.iter().enumerate() {
402                    let mut record = UnifiedRecord::new();
403                    record.set("order", Value::Integer(i as i64));
404                    record.set("node_id", Value::text(node.id.clone()));
405                    record.set("label", Value::text(node.label.clone()));
406                    result.push(record);
407                }
408                Ok(RuntimeQueryResult {
409                    query: raw_query.to_string(),
410                    mode: QueryMode::Sql,
411                    statement: "graph_topological_sort",
412                    engine: "runtime-graph",
413                    result,
414                    affected_rows: 0,
415                    statement_type: "select",
416                })
417            }
418        }
419    }
420
421    /// Execute a SEARCH command.
422    pub fn execute_search_command(
423        &self,
424        raw_query: &str,
425        cmd: &SearchCommand,
426    ) -> RedDBResult<RuntimeQueryResult> {
427        match cmd {
428            SearchCommand::Similar {
429                vector,
430                text,
431                provider,
432                collection,
433                limit,
434                min_score,
435                vector_param,
436                limit_param,
437                min_score_param,
438                text_param,
439            } => {
440                if vector_param.is_some() {
441                    return Err(RedDBError::Query(
442                        "SEARCH SIMILAR $N vector parameter was not bound before execution"
443                            .to_string(),
444                    ));
445                }
446                if limit_param.is_some() {
447                    return Err(RedDBError::Query(
448                        "SEARCH SIMILAR LIMIT $N parameter was not bound before execution"
449                            .to_string(),
450                    ));
451                }
452                if min_score_param.is_some() {
453                    return Err(RedDBError::Query(
454                        "SEARCH SIMILAR MIN_SCORE $N parameter was not bound before execution"
455                            .to_string(),
456                    ));
457                }
458                if text_param.is_some() {
459                    return Err(RedDBError::Query(
460                        "SEARCH SIMILAR TEXT $N parameter was not bound before execution"
461                            .to_string(),
462                    ));
463                }
464                // If text provided, generate embedding first (semantic search)
465                let search_vector = if let Some(query_text) = text {
466                    let (default_provider, _) = crate::ai::resolve_defaults_from_runtime(self);
467                    let provider = match provider.as_deref() {
468                        Some(p) => crate::ai::parse_provider(p)?,
469                        None => default_provider,
470                    };
471                    let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
472                    let model = std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
473                        .ok()
474                        .unwrap_or_else(|| provider.default_embedding_model().to_string());
475                    let transport = crate::runtime::ai::transport::AiTransport::from_runtime(self);
476                    let request = crate::ai::OpenAiEmbeddingRequest {
477                        api_key,
478                        model,
479                        inputs: vec![query_text.clone()],
480                        dimensions: None,
481                        api_base: provider.resolve_api_base(),
482                    };
483                    let response = crate::runtime::ai::block_on_ai(async move {
484                        crate::ai::openai_embeddings_async(&transport, request).await
485                    })
486                    .and_then(|result| result)?;
487                    response.embeddings.into_iter().next().ok_or_else(|| {
488                        RedDBError::Query("embedding API returned no vectors".to_string())
489                    })?
490                } else {
491                    vector.clone()
492                };
493                // Issue #119: route through AuthorizedSearch so the
494                // candidate set is gated by `EffectiveScope.visible_collections`
495                // before any similarity score is computed.
496                let scope = self.ai_scope();
497                let results =
498                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
499                        crate::runtime::authorized_search::AuthorizedSearch::execute_similar(
500                            self,
501                            &scope,
502                            collection,
503                            &search_vector,
504                            *limit,
505                            *min_score,
506                        )?
507                    } else {
508                        // Embedded / no-auth caller: keep legacy behaviour.
509                        self.search_similar(collection, &search_vector, *limit, *min_score)?
510                    };
511                let mut result =
512                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
513                for sr in &results {
514                    let mut record = UnifiedRecord::new();
515                    record.set("entity_id", Value::UnsignedInteger(sr.entity_id.raw()));
516                    record.set("score", Value::Float(sr.score as f64));
517                    result.push(record);
518                }
519                Ok(RuntimeQueryResult {
520                    query: raw_query.to_string(),
521                    mode: QueryMode::Sql,
522                    statement: "search_similar",
523                    engine: "runtime-search",
524                    result,
525                    affected_rows: 0,
526                    statement_type: "select",
527                })
528            }
529            SearchCommand::Text {
530                query,
531                collection,
532                limit,
533                fuzzy,
534                limit_param,
535            } => {
536                if limit_param.is_some() {
537                    return Err(RedDBError::Query(
538                        "SEARCH TEXT LIMIT $N parameter was not bound before execution".to_string(),
539                    ));
540                }
541                let collections = collection.as_ref().map(|c| vec![c.clone()]);
542                // Issue #119: gate the candidate set by visible_collections.
543                let scope = self.ai_scope();
544                let res =
545                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
546                        crate::runtime::authorized_search::AuthorizedSearch::execute_text(
547                            self,
548                            &scope,
549                            query.clone(),
550                            collections,
551                            None,
552                            None,
553                            None,
554                            Some(*limit),
555                            *fuzzy,
556                        )?
557                    } else {
558                        self.search_text(
559                            query.clone(),
560                            collections,
561                            None,
562                            None,
563                            None,
564                            Some(*limit),
565                            *fuzzy,
566                        )?
567                    };
568                let mut result =
569                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
570                for item in &res.matches {
571                    let mut record = UnifiedRecord::new();
572                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
573                    record.set("score", Value::Float(item.score as f64));
574                    result.push(record);
575                }
576                Ok(RuntimeQueryResult {
577                    query: raw_query.to_string(),
578                    mode: QueryMode::Sql,
579                    statement: "search_text",
580                    engine: "runtime-search",
581                    result,
582                    affected_rows: 0,
583                    statement_type: "select",
584                })
585            }
586            SearchCommand::Hybrid {
587                vector,
588                query,
589                collection,
590                limit,
591                limit_param,
592            } => {
593                if limit_param.is_some() {
594                    return Err(RedDBError::Query(
595                        "SEARCH HYBRID LIMIT $N parameter was not bound before execution"
596                            .to_string(),
597                    ));
598                }
599                let res = self.search_hybrid(
600                    vector.clone(),
601                    query.clone(),
602                    Some(*limit),
603                    Some(vec![collection.clone()]),
604                    None,
605                    None,
606                    None,
607                    Vec::new(),
608                    None,
609                    None,
610                    Some(*limit),
611                )?;
612                let mut result =
613                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
614                for item in &res.matches {
615                    let mut record = UnifiedRecord::new();
616                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
617                    record.set("score", Value::Float(item.score as f64));
618                    result.push(record);
619                }
620                Ok(RuntimeQueryResult {
621                    query: raw_query.to_string(),
622                    mode: QueryMode::Sql,
623                    statement: "search_hybrid",
624                    engine: "runtime-search",
625                    result,
626                    affected_rows: 0,
627                    statement_type: "select",
628                })
629            }
630            SearchCommand::Multimodal {
631                query,
632                collection,
633                limit,
634                limit_param,
635            } => {
636                if limit_param.is_some() {
637                    return Err(RedDBError::Query(
638                        "SEARCH MULTIMODAL LIMIT $N parameter was not bound before execution"
639                            .to_string(),
640                    ));
641                }
642                let collections = collection.as_ref().map(|c| vec![c.clone()]);
643                let res =
644                    self.search_multimodal(query.clone(), collections, None, None, Some(*limit))?;
645                let mut result =
646                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
647                for item in &res.matches {
648                    let mut record = UnifiedRecord::new();
649                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
650                    record.set("score", Value::Float(item.score as f64));
651                    result.push(record);
652                }
653                Ok(RuntimeQueryResult {
654                    query: raw_query.to_string(),
655                    mode: QueryMode::Sql,
656                    statement: "search_multimodal",
657                    engine: "runtime-search",
658                    result,
659                    affected_rows: 0,
660                    statement_type: "select",
661                })
662            }
663            SearchCommand::Index {
664                index,
665                value,
666                collection,
667                limit,
668                exact,
669                limit_param,
670            } => {
671                if limit_param.is_some() {
672                    return Err(RedDBError::Query(
673                        "SEARCH INDEX LIMIT $N parameter was not bound before execution"
674                            .to_string(),
675                    ));
676                }
677                let collections = collection.as_ref().map(|c| vec![c.clone()]);
678                let res = self.search_index(
679                    index.clone(),
680                    value.clone(),
681                    *exact,
682                    collections,
683                    None,
684                    None,
685                    Some(*limit),
686                )?;
687                let mut result =
688                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
689                for item in &res.matches {
690                    let mut record = UnifiedRecord::new();
691                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
692                    record.set("score", Value::Float(item.score as f64));
693                    result.push(record);
694                }
695                Ok(RuntimeQueryResult {
696                    query: raw_query.to_string(),
697                    mode: QueryMode::Sql,
698                    statement: "search_index",
699                    engine: "runtime-search",
700                    result,
701                    affected_rows: 0,
702                    statement_type: "select",
703                })
704            }
705            SearchCommand::Context {
706                query,
707                field,
708                collection,
709                limit,
710                depth,
711                limit_param,
712            } => {
713                if limit_param.is_some() {
714                    return Err(RedDBError::Query(
715                        "SEARCH CONTEXT LIMIT $N parameter was not bound before execution"
716                            .to_string(),
717                    ));
718                }
719                use crate::application::SearchContextInput;
720                // Issue #119: route through AuthorizedSearch so the
721                // candidate set + every expansion bucket is bounded by
722                // `EffectiveScope.visible_collections`.
723                let input = SearchContextInput {
724                    query: query.clone(),
725                    field: field.clone(),
726                    vector: None,
727                    collections: collection.as_ref().map(|c| vec![c.clone()]),
728                    graph_depth: Some(*depth),
729                    graph_max_edges: None,
730                    max_cross_refs: None,
731                    follow_cross_refs: None,
732                    expand_graph: None,
733                    global_scan: None,
734                    reindex: None,
735                    limit: Some(*limit),
736                    min_score: None,
737                };
738                let scope = self.ai_scope();
739                let res =
740                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
741                        crate::runtime::authorized_search::AuthorizedSearch::execute_context(
742                            self, &scope, input,
743                        )?
744                    } else {
745                        self.search_context(input)?
746                    };
747                let mut result = UnifiedResult::with_columns(vec![
748                    "entity_id".into(),
749                    "collection".into(),
750                    "score".into(),
751                    "discovery".into(),
752                    "kind".into(),
753                ]);
754                let all_entities = res
755                    .tables
756                    .iter()
757                    .map(|e| (e, "table"))
758                    .chain(res.graph.nodes.iter().map(|e| (e, "graph_node")))
759                    .chain(res.graph.edges.iter().map(|e| (e, "graph_edge")))
760                    .chain(res.vectors.iter().map(|e| (e, "vector")))
761                    .chain(res.documents.iter().map(|e| (e, "document")))
762                    .chain(res.key_values.iter().map(|e| (e, "kv")));
763                for (entity, kind) in all_entities {
764                    let mut record = UnifiedRecord::new();
765                    record.set("entity_id", Value::UnsignedInteger(entity.entity.id.raw()));
766                    record.set("collection", Value::text(entity.collection.clone()));
767                    record.set("score", Value::Float(entity.score as f64));
768                    record.set("discovery", Value::text(format!("{:?}", entity.discovery)));
769                    record.set("kind", Value::text(kind.to_string()));
770                    result.push(record);
771                }
772                Ok(RuntimeQueryResult {
773                    query: raw_query.to_string(),
774                    mode: QueryMode::Sql,
775                    statement: "search_context",
776                    engine: "runtime-context",
777                    result,
778                    affected_rows: 0,
779                    statement_type: "select",
780                })
781            }
782            SearchCommand::SpatialRadius {
783                center_lat,
784                center_lon,
785                radius_km,
786                collection,
787                column,
788                limit,
789                limit_param,
790            } => {
791                if limit_param.is_some() {
792                    return Err(RedDBError::Query(
793                        "SEARCH SPATIAL RADIUS LIMIT $N parameter was not bound before execution"
794                            .to_string(),
795                    ));
796                }
797                use crate::storage::unified::spatial_index::haversine_km;
798                let _ = column; // Column indicates which field holds geo data
799                let store = self.inner.db.store();
800                let entities = store
801                    .get_collection(collection)
802                    .map(|m| m.query_all(|_| true))
803                    .unwrap_or_default();
804
805                let mut hits: Vec<(u64, f64)> = Vec::new();
806                for entity in &entities {
807                    // Extract lat/lon from GeoPoint values in entity data
808                    if let Some((lat, lon)) = extract_geo_from_entity(entity) {
809                        let dist = haversine_km(*center_lat, *center_lon, lat, lon);
810                        if dist <= *radius_km {
811                            hits.push((entity.id.raw(), dist));
812                        }
813                    }
814                }
815                hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
816                hits.truncate(*limit);
817
818                let mut result =
819                    UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
820                for (id, dist) in &hits {
821                    let mut record = UnifiedRecord::new();
822                    record.set("entity_id", Value::UnsignedInteger(*id));
823                    record.set("distance_km", Value::Float(*dist));
824                    result.push(record);
825                }
826                Ok(RuntimeQueryResult {
827                    query: raw_query.to_string(),
828                    mode: QueryMode::Sql,
829                    statement: "search_spatial_radius",
830                    engine: "runtime-spatial",
831                    result,
832                    affected_rows: 0,
833                    statement_type: "select",
834                })
835            }
836            SearchCommand::SpatialBbox {
837                min_lat,
838                min_lon,
839                max_lat,
840                max_lon,
841                collection,
842                column,
843                limit,
844                limit_param,
845            } => {
846                if limit_param.is_some() {
847                    return Err(RedDBError::Query(
848                        "SEARCH SPATIAL BBOX LIMIT $N parameter was not bound before execution"
849                            .to_string(),
850                    ));
851                }
852                let _ = column;
853                let store = self.inner.db.store();
854                let entities = store
855                    .get_collection(collection)
856                    .map(|m| m.query_all(|_| true))
857                    .unwrap_or_default();
858
859                let mut result = UnifiedResult::with_columns(vec!["entity_id".into()]);
860                let mut count = 0;
861                for entity in &entities {
862                    if count >= *limit {
863                        break;
864                    }
865                    if let Some((lat, lon)) = extract_geo_from_entity(entity) {
866                        if lat >= *min_lat && lat <= *max_lat && lon >= *min_lon && lon <= *max_lon
867                        {
868                            let mut record = UnifiedRecord::new();
869                            record.set("entity_id", Value::UnsignedInteger(entity.id.raw()));
870                            result.push(record);
871                            count += 1;
872                        }
873                    }
874                }
875                Ok(RuntimeQueryResult {
876                    query: raw_query.to_string(),
877                    mode: QueryMode::Sql,
878                    statement: "search_spatial_bbox",
879                    engine: "runtime-spatial",
880                    result,
881                    affected_rows: 0,
882                    statement_type: "select",
883                })
884            }
885            SearchCommand::SpatialNearest {
886                lat,
887                lon,
888                k,
889                collection,
890                column,
891                k_param,
892            } => {
893                if k_param.is_some() {
894                    return Err(RedDBError::Query(
895                        "SEARCH SPATIAL NEAREST K $N parameter was not bound before execution"
896                            .to_string(),
897                    ));
898                }
899                use crate::storage::unified::spatial_index::haversine_km;
900                let _ = column;
901                let store = self.inner.db.store();
902                let entities = store
903                    .get_collection(collection)
904                    .map(|m| m.query_all(|_| true))
905                    .unwrap_or_default();
906
907                let mut hits: Vec<(u64, f64)> = Vec::new();
908                for entity in &entities {
909                    if let Some((elat, elon)) = extract_geo_from_entity(entity) {
910                        let dist = haversine_km(*lat, *lon, elat, elon);
911                        hits.push((entity.id.raw(), dist));
912                    }
913                }
914                hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
915                hits.truncate(*k);
916
917                let mut result =
918                    UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
919                for (id, dist) in &hits {
920                    let mut record = UnifiedRecord::new();
921                    record.set("entity_id", Value::UnsignedInteger(*id));
922                    record.set("distance_km", Value::Float(*dist));
923                    result.push(record);
924                }
925                Ok(RuntimeQueryResult {
926                    query: raw_query.to_string(),
927                    mode: QueryMode::Sql,
928                    statement: "search_spatial_nearest",
929                    engine: "runtime-spatial",
930                    result,
931                    affected_rows: 0,
932                    statement_type: "select",
933                })
934            }
935        }
936    }
937}
938
939fn apply_graph_order_and_limit(
940    result: &mut UnifiedResult,
941    statement: &str,
942    order_by: Option<&GraphCommandOrderBy>,
943    limit: Option<usize>,
944) -> RedDBResult<()> {
945    if let Some(order) = order_by {
946        let column = graph_order_metric_column(statement, &order.metric)?;
947        let columns = result.columns.clone();
948        result.records.sort_by(|left, right| {
949            let cmp = compare_graph_values(left.get(column), right.get(column));
950            let cmp = if order.ascending { cmp } else { cmp.reverse() };
951            if cmp == Ordering::Equal {
952                compare_graph_rows(left, right, &columns)
953            } else {
954                cmp
955            }
956        });
957    }
958    if let Some(limit) = limit {
959        result.records.truncate(limit);
960    }
961    Ok(())
962}
963
964fn graph_order_metric_column(statement: &str, metric: &str) -> RedDBResult<&'static str> {
965    let metric = metric.to_ascii_lowercase();
966    match (statement, metric.as_str()) {
967        ("graph_centrality", "score" | "centrality_score") => Ok("score"),
968        ("graph_community", "size" | "community_size") => Ok("size"),
969        ("graph_components", "size" | "component_size") => Ok("size"),
970        ("graph_shortest_path", "hop_count" | "total_weight" | "nodes_visited") => {
971            Ok(match metric.as_str() {
972                "total_weight" => "total_weight",
973                "nodes_visited" => "nodes_visited",
974                _ => "hop_count",
975            })
976        }
977        _ => Err(RedDBError::Query(format!(
978            "unsupported ORDER BY metric '{metric}' for GRAPH {}",
979            statement.trim_start_matches("graph_")
980        ))),
981    }
982}
983
984fn compare_graph_rows(left: &UnifiedRecord, right: &UnifiedRecord, columns: &[String]) -> Ordering {
985    for column in columns {
986        let cmp = compare_graph_values(left.get(column), right.get(column));
987        if cmp != Ordering::Equal {
988            return cmp;
989        }
990    }
991    Ordering::Equal
992}
993
994fn compare_graph_values(left: Option<&Value>, right: Option<&Value>) -> Ordering {
995    match (left, right) {
996        (None, None) => Ordering::Equal,
997        (None, Some(_)) => Ordering::Less,
998        (Some(_), None) => Ordering::Greater,
999        (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
1000        (Some(Value::Null), Some(_)) => Ordering::Less,
1001        (Some(_), Some(Value::Null)) => Ordering::Greater,
1002        (Some(Value::Integer(left)), Some(Value::Integer(right))) => left.cmp(right),
1003        (Some(Value::UnsignedInteger(left)), Some(Value::UnsignedInteger(right))) => {
1004            left.cmp(right)
1005        }
1006        (Some(Value::Float(left)), Some(Value::Float(right))) => {
1007            left.partial_cmp(right).unwrap_or(Ordering::Equal)
1008        }
1009        (Some(Value::Integer(left)), Some(Value::Float(right))) => {
1010            (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1011        }
1012        (Some(Value::Float(left)), Some(Value::Integer(right))) => left
1013            .partial_cmp(&(*right as f64))
1014            .unwrap_or(Ordering::Equal),
1015        (Some(Value::UnsignedInteger(left)), Some(Value::Float(right))) => {
1016            (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1017        }
1018        (Some(Value::Float(left)), Some(Value::UnsignedInteger(right))) => left
1019            .partial_cmp(&(*right as f64))
1020            .unwrap_or(Ordering::Equal),
1021        (Some(Value::Integer(left)), Some(Value::UnsignedInteger(right))) => {
1022            (*left as i128).cmp(&(*right as i128))
1023        }
1024        (Some(Value::UnsignedInteger(left)), Some(Value::Integer(right))) => {
1025            (*left as i128).cmp(&(*right as i128))
1026        }
1027        (Some(Value::Timestamp(left)), Some(Value::Timestamp(right))) => left.cmp(right),
1028        (Some(Value::Text(left)), Some(Value::Text(right))) => left.cmp(right),
1029        (Some(Value::Boolean(left)), Some(Value::Boolean(right))) => left.cmp(right),
1030        (Some(left), Some(right)) => format!("{left:?}").cmp(&format!("{right:?}")),
1031    }
1032}
1033
1034// =============================================================================
1035// Conversion helpers for string -> enum
1036// =============================================================================
1037
1038fn parse_direction(s: &str) -> RedDBResult<RuntimeGraphDirection> {
1039    match s.to_lowercase().as_str() {
1040        "outgoing" | "out" => Ok(RuntimeGraphDirection::Outgoing),
1041        "incoming" | "in" => Ok(RuntimeGraphDirection::Incoming),
1042        "both" | "any" => Ok(RuntimeGraphDirection::Both),
1043        _ => Err(RedDBError::Query(format!(
1044            "unknown direction: '{s}', expected outgoing|incoming|both"
1045        ))),
1046    }
1047}
1048
1049fn parse_path_algorithm(s: &str) -> RedDBResult<RuntimeGraphPathAlgorithm> {
1050    match s.to_lowercase().as_str() {
1051        "bfs" => Ok(RuntimeGraphPathAlgorithm::Bfs),
1052        "dijkstra" => Ok(RuntimeGraphPathAlgorithm::Dijkstra),
1053        "astar" | "a*" => Ok(RuntimeGraphPathAlgorithm::AStar),
1054        "bellman_ford" | "bellmanford" => Ok(RuntimeGraphPathAlgorithm::BellmanFord),
1055        _ => Err(RedDBError::Query(format!(
1056            "unknown path algorithm: '{s}', expected bfs|dijkstra|astar|bellman_ford"
1057        ))),
1058    }
1059}
1060
1061fn parse_traversal_strategy(s: &str) -> RedDBResult<RuntimeGraphTraversalStrategy> {
1062    match s.to_lowercase().as_str() {
1063        "bfs" => Ok(RuntimeGraphTraversalStrategy::Bfs),
1064        "dfs" => Ok(RuntimeGraphTraversalStrategy::Dfs),
1065        _ => Err(RedDBError::Query(format!(
1066            "unknown traversal strategy: '{s}', expected bfs|dfs"
1067        ))),
1068    }
1069}
1070
1071fn parse_centrality_algorithm(s: &str) -> RedDBResult<RuntimeGraphCentralityAlgorithm> {
1072    match s.to_lowercase().as_str() {
1073        "degree" => Ok(RuntimeGraphCentralityAlgorithm::Degree),
1074        "closeness" => Ok(RuntimeGraphCentralityAlgorithm::Closeness),
1075        "betweenness" => Ok(RuntimeGraphCentralityAlgorithm::Betweenness),
1076        "eigenvector" => Ok(RuntimeGraphCentralityAlgorithm::Eigenvector),
1077        "pagerank" | "page_rank" => Ok(RuntimeGraphCentralityAlgorithm::PageRank),
1078        _ => Err(RedDBError::Query(format!(
1079            "unknown centrality algorithm: '{s}', expected degree|closeness|betweenness|eigenvector|pagerank"
1080        ))),
1081    }
1082}
1083
1084fn parse_community_algorithm(s: &str) -> RedDBResult<RuntimeGraphCommunityAlgorithm> {
1085    match s.to_lowercase().as_str() {
1086        "label_propagation" | "labelpropagation" => {
1087            Ok(RuntimeGraphCommunityAlgorithm::LabelPropagation)
1088        }
1089        "louvain" => Ok(RuntimeGraphCommunityAlgorithm::Louvain),
1090        _ => Err(RedDBError::Query(format!(
1091            "unknown community algorithm: '{s}', expected label_propagation|louvain"
1092        ))),
1093    }
1094}
1095
1096fn parse_components_mode(s: &str) -> RedDBResult<RuntimeGraphComponentsMode> {
1097    match s.to_lowercase().as_str() {
1098        "connected" => Ok(RuntimeGraphComponentsMode::Connected),
1099        "weak" | "weakly_connected" => Ok(RuntimeGraphComponentsMode::Weak),
1100        "strong" | "strongly_connected" => Ok(RuntimeGraphComponentsMode::Strong),
1101        _ => Err(RedDBError::Query(format!(
1102            "unknown components mode: '{s}', expected connected|weak|strong"
1103        ))),
1104    }
1105}
1106
1107/// Extract (latitude, longitude) from an entity.
1108///
1109/// Looks for GeoPoint values in the entity data (row columns or node properties)
1110/// or dedicated lat/lon fields. Returns degrees.
1111fn extract_geo_from_entity(entity: &UnifiedEntity) -> Option<(f64, f64)> {
1112    match &entity.data {
1113        EntityData::Row(row) => {
1114            // Search named columns for GeoPoint or lat/lon pairs
1115            if let Some(ref named) = row.named {
1116                // Direct GeoPoint value
1117                for value in named.values() {
1118                    if let Value::GeoPoint(lat_micro, lon_micro) = value {
1119                        return Some((
1120                            *lat_micro as f64 / 1_000_000.0,
1121                            *lon_micro as f64 / 1_000_000.0,
1122                        ));
1123                    }
1124                }
1125                // Try lat/lon or latitude/longitude named fields
1126                let lat =
1127                    named
1128                        .get("lat")
1129                        .or_else(|| named.get("latitude"))
1130                        .and_then(|v| match v {
1131                            Value::Float(f) => Some(*f),
1132                            Value::Integer(i) => Some(*i as f64),
1133                            _ => None,
1134                        });
1135                let lon = named
1136                    .get("lon")
1137                    .or_else(|| named.get("lng"))
1138                    .or_else(|| named.get("longitude"))
1139                    .and_then(|v| match v {
1140                        Value::Float(f) => Some(*f),
1141                        Value::Integer(i) => Some(*i as f64),
1142                        _ => None,
1143                    });
1144                if let (Some(la), Some(lo)) = (lat, lon) {
1145                    return Some((la, lo));
1146                }
1147            }
1148            // Search positional columns for GeoPoint
1149            for value in &row.columns {
1150                if let Value::GeoPoint(lat_micro, lon_micro) = value {
1151                    return Some((
1152                        *lat_micro as f64 / 1_000_000.0,
1153                        *lon_micro as f64 / 1_000_000.0,
1154                    ));
1155                }
1156            }
1157            None
1158        }
1159        EntityData::Node(node) => {
1160            // Search node properties
1161            for value in node.properties.values() {
1162                if let Value::GeoPoint(lat_micro, lon_micro) = value {
1163                    return Some((
1164                        *lat_micro as f64 / 1_000_000.0,
1165                        *lon_micro as f64 / 1_000_000.0,
1166                    ));
1167                }
1168            }
1169            let lat = node
1170                .properties
1171                .get("lat")
1172                .or_else(|| node.properties.get("latitude"))
1173                .and_then(|v| match v {
1174                    Value::Float(f) => Some(*f),
1175                    Value::Integer(i) => Some(*i as f64),
1176                    _ => None,
1177                });
1178            let lon = node
1179                .properties
1180                .get("lon")
1181                .or_else(|| node.properties.get("lng"))
1182                .or_else(|| node.properties.get("longitude"))
1183                .and_then(|v| match v {
1184                    Value::Float(f) => Some(*f),
1185                    Value::Integer(i) => Some(*i as f64),
1186                    _ => None,
1187                });
1188            if let (Some(la), Some(lo)) = (lat, lon) {
1189                return Some((la, lo));
1190            }
1191            None
1192        }
1193        _ => None,
1194    }
1195}