Skip to main content

reddb_server/runtime/
impl_graph_commands.rs

1//! Execution of GRAPH and SEARCH SQL-like commands.
2//!
3//! Maps parsed `GraphCommand` and `SearchCommand` AST nodes to the existing
4//! runtime graph analytics and search methods, returning results wrapped in
5//! `RuntimeQueryResult`.
6
7use super::*;
8use crate::storage::query::ast::GraphCommandOrderBy;
9use std::cmp::Ordering;
10
11impl RedDBRuntime {
12    /// Execute a GRAPH analytics command.
13    pub fn execute_graph_command(
14        &self,
15        raw_query: &str,
16        cmd: &GraphCommand,
17    ) -> RedDBResult<RuntimeQueryResult> {
18        match cmd {
19            GraphCommand::Neighborhood {
20                source,
21                depth,
22                direction,
23                edge_labels,
24            } => {
25                let dir = parse_direction(direction)?;
26                let res = self.graph_neighborhood(
27                    source,
28                    dir,
29                    *depth as usize,
30                    edge_labels.clone(),
31                    None,
32                )?;
33                let mut result = UnifiedResult::with_columns(vec![
34                    "node_id".into(),
35                    "label".into(),
36                    "node_type".into(),
37                    "depth".into(),
38                ]);
39                for visit in &res.nodes {
40                    let mut record = UnifiedRecord::new();
41                    record.set("node_id", Value::text(visit.node.id.clone()));
42                    record.set("label", Value::text(visit.node.label.clone()));
43                    record.set("node_type", Value::text(visit.node.node_type.clone()));
44                    record.set("depth", Value::Integer(visit.depth as i64));
45                    result.push(record);
46                }
47                Ok(RuntimeQueryResult {
48                    query: raw_query.to_string(),
49                    mode: QueryMode::Sql,
50                    statement: "graph_neighborhood",
51                    engine: "runtime-graph",
52                    result,
53                    affected_rows: 0,
54                    statement_type: "select",
55                })
56            }
57            GraphCommand::ShortestPath {
58                source,
59                target,
60                algorithm,
61                direction,
62                limit,
63                order_by,
64            } => {
65                let dir = parse_direction(direction)?;
66                let alg = parse_path_algorithm(algorithm)?;
67                let res = self.graph_shortest_path(source, target, dir, alg, None, None)?;
68                let mut result = UnifiedResult::with_columns(vec![
69                    "source".into(),
70                    "target".into(),
71                    "nodes_visited".into(),
72                    "negative_cycle_detected".into(),
73                    "hop_count".into(),
74                    "total_weight".into(),
75                ]);
76                let mut record = UnifiedRecord::new();
77                record.set("source", Value::text(res.source));
78                record.set("target", Value::text(res.target));
79                record.set("nodes_visited", Value::Integer(res.nodes_visited as i64));
80                record.set(
81                    "negative_cycle_detected",
82                    match res.negative_cycle_detected {
83                        Some(value) => Value::Boolean(value),
84                        None => Value::Null,
85                    },
86                );
87                if let Some(ref path) = res.path {
88                    record.set("hop_count", Value::Integer(path.hop_count as i64));
89                    record.set("total_weight", Value::Float(path.total_weight));
90                } else {
91                    record.set("hop_count", Value::Null);
92                    record.set("total_weight", Value::Null);
93                }
94                result.push(record);
95                apply_graph_order_and_limit(
96                    &mut result,
97                    "graph_shortest_path",
98                    order_by.as_ref(),
99                    limit.map(|n| n as usize),
100                )?;
101                Ok(RuntimeQueryResult {
102                    query: raw_query.to_string(),
103                    mode: QueryMode::Sql,
104                    statement: "graph_shortest_path",
105                    engine: "runtime-graph",
106                    result,
107                    affected_rows: 0,
108                    statement_type: "select",
109                })
110            }
111            GraphCommand::Properties { source } => {
112                if let Some(node_ref) = source {
113                    // Per-node property lookup (#423). Uses the same label
114                    // resolution as NEIGHBORHOOD/TRAVERSE so '<label>' and
115                    // '<numeric id>' both work.
116                    let graph =
117                        materialize_graph_with_projection(self.inner.db.store().as_ref(), None)?;
118                    let resolved = resolve_graph_node_id(&graph, node_ref)?;
119                    let stored = graph
120                        .get_node(&resolved)
121                        .ok_or_else(|| RedDBError::NotFound(node_ref.to_string()))?;
122                    let node_type = self
123                        .inner
124                        .db
125                        .store()
126                        .query_all(|entity| {
127                            entity.id.raw().to_string() == resolved
128                                && matches!(
129                                    entity.kind,
130                                    crate::storage::unified::EntityKind::GraphNode(_)
131                                )
132                        })
133                        .into_iter()
134                        .find_map(|(_, entity)| match entity.kind {
135                            crate::storage::unified::EntityKind::GraphNode(node) => {
136                                Some(node.node_type)
137                            }
138                            _ => None,
139                        })
140                        .unwrap_or_else(|| stored.node_type.clone());
141                    let all_props =
142                        materialize_graph_node_properties(self.inner.db.store().as_ref())?;
143                    let props = all_props.get(&resolved).cloned().unwrap_or_default();
144
145                    // Fixed columns first, then property keys in sorted order so
146                    // the schema is stable across snapshots / wire renders.
147                    let mut prop_keys: Vec<&String> = props.keys().collect();
148                    prop_keys.sort();
149                    let mut columns: Vec<String> = Vec::with_capacity(3 + prop_keys.len());
150                    columns.push("node_id".into());
151                    columns.push("label".into());
152                    columns.push("node_type".into());
153                    for k in &prop_keys {
154                        columns.push((*k).clone());
155                    }
156                    let mut result = UnifiedResult::with_columns(columns);
157                    let mut record = UnifiedRecord::new();
158                    record.set("node_id", Value::text(stored.id.clone()));
159                    record.set("label", Value::text(stored.label.clone()));
160                    record.set("node_type", Value::text(node_type));
161                    for k in &prop_keys {
162                        if let Some(v) = props.get(*k) {
163                            record.set(k.as_str(), v.clone());
164                        }
165                    }
166                    result.push(record);
167                    return Ok(RuntimeQueryResult {
168                        query: raw_query.to_string(),
169                        mode: QueryMode::Sql,
170                        statement: "graph_properties",
171                        engine: "runtime-graph",
172                        result,
173                        affected_rows: 0,
174                        statement_type: "select",
175                    });
176                }
177                let res = self.graph_properties(None)?;
178                let mut result = UnifiedResult::with_columns(vec![
179                    "node_count".into(),
180                    "edge_count".into(),
181                    "is_connected".into(),
182                    "is_complete".into(),
183                    "is_cyclic".into(),
184                    "density".into(),
185                ]);
186                let mut record = UnifiedRecord::new();
187                record.set("node_count", Value::Integer(res.node_count as i64));
188                record.set("edge_count", Value::Integer(res.edge_count as i64));
189                record.set("is_connected", Value::Boolean(res.is_connected));
190                record.set("is_complete", Value::Boolean(res.is_complete));
191                record.set("is_cyclic", Value::Boolean(res.is_cyclic));
192                record.set("density", Value::Float(res.density));
193                result.push(record);
194                Ok(RuntimeQueryResult {
195                    query: raw_query.to_string(),
196                    mode: QueryMode::Sql,
197                    statement: "graph_properties",
198                    engine: "runtime-graph",
199                    result,
200                    affected_rows: 0,
201                    statement_type: "select",
202                })
203            }
204            GraphCommand::Traverse {
205                source,
206                strategy,
207                depth,
208                direction,
209                edge_labels,
210            } => {
211                let dir = parse_direction(direction)?;
212                let strat = parse_traversal_strategy(strategy)?;
213                let res = self.graph_traverse(
214                    source,
215                    dir,
216                    *depth as usize,
217                    strat,
218                    edge_labels.clone(),
219                    None,
220                )?;
221                let mut result = UnifiedResult::with_columns(vec![
222                    "node_id".into(),
223                    "label".into(),
224                    "node_type".into(),
225                    "depth".into(),
226                ]);
227                for visit in &res.visits {
228                    let mut record = UnifiedRecord::new();
229                    record.set("node_id", Value::text(visit.node.id.clone()));
230                    record.set("label", Value::text(visit.node.label.clone()));
231                    record.set("node_type", Value::text(visit.node.node_type.clone()));
232                    record.set("depth", Value::Integer(visit.depth as i64));
233                    result.push(record);
234                }
235                Ok(RuntimeQueryResult {
236                    query: raw_query.to_string(),
237                    mode: QueryMode::Sql,
238                    statement: "graph_traverse",
239                    engine: "runtime-graph",
240                    result,
241                    affected_rows: 0,
242                    statement_type: "select",
243                })
244            }
245            GraphCommand::Centrality {
246                algorithm,
247                limit,
248                order_by,
249            } => {
250                let alg = parse_centrality_algorithm(algorithm)?;
251                // `limit = None` keeps historical implicit top-100 cap.
252                // `Some(0)` returns zero rows (standard SQL LIMIT 0 semantics).
253                let limit_usize = limit.map(|n| n as usize);
254                let order_needs_full_set = order_by
255                    .as_ref()
256                    .map(|order| order.ascending)
257                    .unwrap_or(false);
258                let top_k = if order_needs_full_set {
259                    usize::MAX
260                } else {
261                    limit_usize.unwrap_or(100).max(1)
262                };
263                let res = self.graph_centrality(alg, top_k, false, None, None, None, None)?;
264                let mut result = UnifiedResult::with_columns(vec![
265                    "node_id".into(),
266                    "label".into(),
267                    "score".into(),
268                ]);
269                for score in &res.scores {
270                    let mut record = UnifiedRecord::new();
271                    record.set("node_id", Value::text(score.node.id.clone()));
272                    record.set("label", Value::text(score.node.label.clone()));
273                    record.set("score", Value::Float(score.score));
274                    result.push(record);
275                }
276                for ds in &res.degree_scores {
277                    let mut record = UnifiedRecord::new();
278                    record.set("node_id", Value::text(ds.node.id.clone()));
279                    record.set("label", Value::text(ds.node.label.clone()));
280                    record.set("score", Value::Float(ds.total_degree as f64));
281                    result.push(record);
282                }
283                apply_graph_order_and_limit(
284                    &mut result,
285                    "graph_centrality",
286                    order_by.as_ref(),
287                    Some(limit_usize.unwrap_or(100)),
288                )?;
289                Ok(RuntimeQueryResult {
290                    query: raw_query.to_string(),
291                    mode: QueryMode::Sql,
292                    statement: "graph_centrality",
293                    engine: "runtime-graph",
294                    result,
295                    affected_rows: 0,
296                    statement_type: "select",
297                })
298            }
299            GraphCommand::Community {
300                algorithm,
301                max_iterations,
302                limit,
303                order_by,
304            } => {
305                let alg = parse_community_algorithm(algorithm)?;
306                let res =
307                    self.graph_communities(alg, 1, Some(*max_iterations as usize), None, None)?;
308                let mut result =
309                    UnifiedResult::with_columns(vec!["community_id".into(), "size".into()]);
310                for community in &res.communities {
311                    let mut record = UnifiedRecord::new();
312                    record.set("community_id", Value::text(community.id.clone()));
313                    record.set("size", Value::Integer(community.size as i64));
314                    result.push(record);
315                }
316                apply_graph_order_and_limit(
317                    &mut result,
318                    "graph_community",
319                    order_by.as_ref(),
320                    limit.map(|n| n as usize),
321                )?;
322                Ok(RuntimeQueryResult {
323                    query: raw_query.to_string(),
324                    mode: QueryMode::Sql,
325                    statement: "graph_community",
326                    engine: "runtime-graph",
327                    result,
328                    affected_rows: 0,
329                    statement_type: "select",
330                })
331            }
332            GraphCommand::Components {
333                mode,
334                limit,
335                order_by,
336            } => {
337                let m = parse_components_mode(mode)?;
338                let res = self.graph_components(m, 1, None)?;
339                let mut result =
340                    UnifiedResult::with_columns(vec!["component_id".into(), "size".into()]);
341                for component in &res.components {
342                    let mut record = UnifiedRecord::new();
343                    record.set("component_id", Value::text(component.id.clone()));
344                    record.set("size", Value::Integer(component.size as i64));
345                    result.push(record);
346                }
347                apply_graph_order_and_limit(
348                    &mut result,
349                    "graph_components",
350                    order_by.as_ref(),
351                    limit.map(|n| n as usize),
352                )?;
353                Ok(RuntimeQueryResult {
354                    query: raw_query.to_string(),
355                    mode: QueryMode::Sql,
356                    statement: "graph_components",
357                    engine: "runtime-graph",
358                    result,
359                    affected_rows: 0,
360                    statement_type: "select",
361                })
362            }
363            GraphCommand::Cycles { max_length } => {
364                let res = self.graph_cycles(*max_length as usize, 100, None)?;
365                let mut result =
366                    UnifiedResult::with_columns(vec!["cycle_index".into(), "length".into()]);
367                for (i, cycle) in res.cycles.iter().enumerate() {
368                    let mut record = UnifiedRecord::new();
369                    record.set("cycle_index", Value::Integer(i as i64));
370                    record.set("length", Value::Integer(cycle.nodes.len() as i64));
371                    result.push(record);
372                }
373                Ok(RuntimeQueryResult {
374                    query: raw_query.to_string(),
375                    mode: QueryMode::Sql,
376                    statement: "graph_cycles",
377                    engine: "runtime-graph",
378                    result,
379                    affected_rows: 0,
380                    statement_type: "select",
381                })
382            }
383            GraphCommand::Clustering => {
384                let res = self.graph_clustering(100, true, None)?;
385                let mut result = UnifiedResult::with_columns(vec![
386                    "node_id".into(),
387                    "label".into(),
388                    "score".into(),
389                ]);
390                // First row: global coefficient
391                let mut global_record = UnifiedRecord::new();
392                global_record.set("node_id", Value::text("__global__"));
393                global_record.set("label", Value::text("global_clustering"));
394                global_record.set("score", Value::Float(res.global));
395                result.push(global_record);
396                for score in &res.local {
397                    let mut record = UnifiedRecord::new();
398                    record.set("node_id", Value::text(score.node.id.clone()));
399                    record.set("label", Value::text(score.node.label.clone()));
400                    record.set("score", Value::Float(score.score));
401                    result.push(record);
402                }
403                Ok(RuntimeQueryResult {
404                    query: raw_query.to_string(),
405                    mode: QueryMode::Sql,
406                    statement: "graph_clustering",
407                    engine: "runtime-graph",
408                    result,
409                    affected_rows: 0,
410                    statement_type: "select",
411                })
412            }
413            GraphCommand::TopologicalSort => {
414                let res = self.graph_topological_sort(None)?;
415                let mut result = UnifiedResult::with_columns(vec![
416                    "order".into(),
417                    "node_id".into(),
418                    "label".into(),
419                ]);
420                for (i, node) in res.ordered_nodes.iter().enumerate() {
421                    let mut record = UnifiedRecord::new();
422                    record.set("order", Value::Integer(i as i64));
423                    record.set("node_id", Value::text(node.id.clone()));
424                    record.set("label", Value::text(node.label.clone()));
425                    result.push(record);
426                }
427                Ok(RuntimeQueryResult {
428                    query: raw_query.to_string(),
429                    mode: QueryMode::Sql,
430                    statement: "graph_topological_sort",
431                    engine: "runtime-graph",
432                    result,
433                    affected_rows: 0,
434                    statement_type: "select",
435                })
436            }
437        }
438    }
439
440    /// Execute a SEARCH command.
441    pub fn execute_search_command(
442        &self,
443        raw_query: &str,
444        cmd: &SearchCommand,
445    ) -> RedDBResult<RuntimeQueryResult> {
446        match cmd {
447            SearchCommand::Similar {
448                vector,
449                text,
450                provider,
451                collection,
452                limit,
453                min_score,
454                vector_param,
455                limit_param,
456                min_score_param,
457                text_param,
458            } => {
459                if vector_param.is_some() {
460                    return Err(RedDBError::Query(
461                        "SEARCH SIMILAR $N vector parameter was not bound before execution"
462                            .to_string(),
463                    ));
464                }
465                if limit_param.is_some() {
466                    return Err(RedDBError::Query(
467                        "SEARCH SIMILAR LIMIT $N parameter was not bound before execution"
468                            .to_string(),
469                    ));
470                }
471                if min_score_param.is_some() {
472                    return Err(RedDBError::Query(
473                        "SEARCH SIMILAR MIN_SCORE $N parameter was not bound before execution"
474                            .to_string(),
475                    ));
476                }
477                if text_param.is_some() {
478                    return Err(RedDBError::Query(
479                        "SEARCH SIMILAR TEXT $N parameter was not bound before execution"
480                            .to_string(),
481                    ));
482                }
483                // If text provided, generate embedding first (semantic search)
484                let search_vector = if let Some(query_text) = text {
485                    let (default_provider, _) = crate::ai::resolve_defaults_from_runtime(self);
486                    let provider = match provider.as_deref() {
487                        Some(p) => crate::ai::parse_provider(p)?,
488                        None => default_provider,
489                    };
490                    let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
491                    let model = std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
492                        .ok()
493                        .unwrap_or_else(|| provider.default_embedding_model().to_string());
494                    let transport = crate::runtime::ai::transport::AiTransport::from_runtime(self);
495                    let request = crate::ai::OpenAiEmbeddingRequest {
496                        api_key,
497                        model,
498                        inputs: vec![query_text.clone()],
499                        dimensions: None,
500                        api_base: provider.resolve_api_base(),
501                    };
502                    let response = crate::runtime::ai::block_on_ai(async move {
503                        crate::ai::openai_embeddings_async(&transport, request).await
504                    })
505                    .and_then(|result| result)?;
506                    response.embeddings.into_iter().next().ok_or_else(|| {
507                        RedDBError::Query("embedding API returned no vectors".to_string())
508                    })?
509                } else {
510                    vector.clone()
511                };
512                // Issue #119: route through AuthorizedSearch so the
513                // candidate set is gated by `EffectiveScope.visible_collections`
514                // before any similarity score is computed.
515                let scope = self.ai_scope();
516                let results =
517                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
518                        crate::runtime::authorized_search::AuthorizedSearch::execute_similar(
519                            self,
520                            &scope,
521                            collection,
522                            &search_vector,
523                            *limit,
524                            *min_score,
525                        )?
526                    } else {
527                        // Embedded / no-auth caller: keep legacy behaviour.
528                        self.search_similar(collection, &search_vector, *limit, *min_score)?
529                    };
530                let mut result =
531                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
532                for sr in &results {
533                    let mut record = UnifiedRecord::new();
534                    record.set("entity_id", Value::UnsignedInteger(sr.entity_id.raw()));
535                    record.set("score", Value::Float(sr.score as f64));
536                    result.push(record);
537                }
538                Ok(RuntimeQueryResult {
539                    query: raw_query.to_string(),
540                    mode: QueryMode::Sql,
541                    statement: "search_similar",
542                    engine: "runtime-search",
543                    result,
544                    affected_rows: 0,
545                    statement_type: "select",
546                })
547            }
548            SearchCommand::Text {
549                query,
550                collection,
551                limit,
552                fuzzy,
553                limit_param,
554            } => {
555                if limit_param.is_some() {
556                    return Err(RedDBError::Query(
557                        "SEARCH TEXT LIMIT $N parameter was not bound before execution".to_string(),
558                    ));
559                }
560                let collections = collection.as_ref().map(|c| vec![c.clone()]);
561                // Issue #119: gate the candidate set by visible_collections.
562                let scope = self.ai_scope();
563                let res =
564                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
565                        crate::runtime::authorized_search::AuthorizedSearch::execute_text(
566                            self,
567                            &scope,
568                            query.clone(),
569                            collections,
570                            None,
571                            None,
572                            None,
573                            Some(*limit),
574                            *fuzzy,
575                        )?
576                    } else {
577                        self.search_text(
578                            query.clone(),
579                            collections,
580                            None,
581                            None,
582                            None,
583                            Some(*limit),
584                            *fuzzy,
585                        )?
586                    };
587                let mut result =
588                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
589                for item in &res.matches {
590                    let mut record = UnifiedRecord::new();
591                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
592                    record.set("score", Value::Float(item.score as f64));
593                    result.push(record);
594                }
595                Ok(RuntimeQueryResult {
596                    query: raw_query.to_string(),
597                    mode: QueryMode::Sql,
598                    statement: "search_text",
599                    engine: "runtime-search",
600                    result,
601                    affected_rows: 0,
602                    statement_type: "select",
603                })
604            }
605            SearchCommand::Hybrid {
606                vector,
607                query,
608                collection,
609                limit,
610                limit_param,
611            } => {
612                if limit_param.is_some() {
613                    return Err(RedDBError::Query(
614                        "SEARCH HYBRID LIMIT $N parameter was not bound before execution"
615                            .to_string(),
616                    ));
617                }
618                let res = self.search_hybrid(
619                    vector.clone(),
620                    query.clone(),
621                    Some(*limit),
622                    Some(vec![collection.clone()]),
623                    None,
624                    None,
625                    None,
626                    Vec::new(),
627                    None,
628                    None,
629                    Some(*limit),
630                )?;
631                let mut result =
632                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
633                for item in &res.matches {
634                    let mut record = UnifiedRecord::new();
635                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
636                    record.set("score", Value::Float(item.score as f64));
637                    result.push(record);
638                }
639                Ok(RuntimeQueryResult {
640                    query: raw_query.to_string(),
641                    mode: QueryMode::Sql,
642                    statement: "search_hybrid",
643                    engine: "runtime-search",
644                    result,
645                    affected_rows: 0,
646                    statement_type: "select",
647                })
648            }
649            SearchCommand::Multimodal {
650                query,
651                collection,
652                limit,
653                limit_param,
654            } => {
655                if limit_param.is_some() {
656                    return Err(RedDBError::Query(
657                        "SEARCH MULTIMODAL LIMIT $N parameter was not bound before execution"
658                            .to_string(),
659                    ));
660                }
661                let collections = collection.as_ref().map(|c| vec![c.clone()]);
662                let res =
663                    self.search_multimodal(query.clone(), collections, None, None, Some(*limit))?;
664                let mut result =
665                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
666                for item in &res.matches {
667                    let mut record = UnifiedRecord::new();
668                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
669                    record.set("score", Value::Float(item.score as f64));
670                    result.push(record);
671                }
672                Ok(RuntimeQueryResult {
673                    query: raw_query.to_string(),
674                    mode: QueryMode::Sql,
675                    statement: "search_multimodal",
676                    engine: "runtime-search",
677                    result,
678                    affected_rows: 0,
679                    statement_type: "select",
680                })
681            }
682            SearchCommand::Index {
683                index,
684                value,
685                collection,
686                limit,
687                exact,
688                limit_param,
689            } => {
690                if limit_param.is_some() {
691                    return Err(RedDBError::Query(
692                        "SEARCH INDEX LIMIT $N parameter was not bound before execution"
693                            .to_string(),
694                    ));
695                }
696                let collections = collection.as_ref().map(|c| vec![c.clone()]);
697                let res = self.search_index(
698                    index.clone(),
699                    value.clone(),
700                    *exact,
701                    collections,
702                    None,
703                    None,
704                    Some(*limit),
705                )?;
706                let mut result =
707                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
708                for item in &res.matches {
709                    let mut record = UnifiedRecord::new();
710                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
711                    record.set("score", Value::Float(item.score as f64));
712                    result.push(record);
713                }
714                Ok(RuntimeQueryResult {
715                    query: raw_query.to_string(),
716                    mode: QueryMode::Sql,
717                    statement: "search_index",
718                    engine: "runtime-search",
719                    result,
720                    affected_rows: 0,
721                    statement_type: "select",
722                })
723            }
724            SearchCommand::Context {
725                query,
726                field,
727                collection,
728                limit,
729                depth,
730                limit_param,
731            } => {
732                if limit_param.is_some() {
733                    return Err(RedDBError::Query(
734                        "SEARCH CONTEXT LIMIT $N parameter was not bound before execution"
735                            .to_string(),
736                    ));
737                }
738                use crate::application::SearchContextInput;
739                // Issue #119: route through AuthorizedSearch so the
740                // candidate set + every expansion bucket is bounded by
741                // `EffectiveScope.visible_collections`.
742                let input = SearchContextInput {
743                    query: query.clone(),
744                    field: field.clone(),
745                    vector: None,
746                    collections: collection.as_ref().map(|c| vec![c.clone()]),
747                    graph_depth: Some(*depth),
748                    graph_max_edges: None,
749                    max_cross_refs: None,
750                    follow_cross_refs: None,
751                    expand_graph: None,
752                    global_scan: None,
753                    reindex: None,
754                    limit: Some(*limit),
755                    min_score: None,
756                };
757                let scope = self.ai_scope();
758                let res =
759                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
760                        crate::runtime::authorized_search::AuthorizedSearch::execute_context(
761                            self, &scope, input,
762                        )?
763                    } else {
764                        self.search_context(input)?
765                    };
766                let mut result = UnifiedResult::with_columns(vec![
767                    "entity_id".into(),
768                    "collection".into(),
769                    "score".into(),
770                    "discovery".into(),
771                    "kind".into(),
772                ]);
773                let all_entities = res
774                    .tables
775                    .iter()
776                    .map(|e| (e, "table"))
777                    .chain(res.graph.nodes.iter().map(|e| (e, "graph_node")))
778                    .chain(res.graph.edges.iter().map(|e| (e, "graph_edge")))
779                    .chain(res.vectors.iter().map(|e| (e, "vector")))
780                    .chain(res.documents.iter().map(|e| (e, "document")))
781                    .chain(res.key_values.iter().map(|e| (e, "kv")));
782                for (entity, kind) in all_entities {
783                    let mut record = UnifiedRecord::new();
784                    record.set("entity_id", Value::UnsignedInteger(entity.entity.id.raw()));
785                    record.set("collection", Value::text(entity.collection.clone()));
786                    record.set("score", Value::Float(entity.score as f64));
787                    record.set("discovery", Value::text(format!("{:?}", entity.discovery)));
788                    record.set("kind", Value::text(kind.to_string()));
789                    result.push(record);
790                }
791                Ok(RuntimeQueryResult {
792                    query: raw_query.to_string(),
793                    mode: QueryMode::Sql,
794                    statement: "search_context",
795                    engine: "runtime-context",
796                    result,
797                    affected_rows: 0,
798                    statement_type: "select",
799                })
800            }
801            SearchCommand::SpatialRadius {
802                center_lat,
803                center_lon,
804                radius_km,
805                collection,
806                column,
807                limit,
808                limit_param,
809            } => {
810                if limit_param.is_some() {
811                    return Err(RedDBError::Query(
812                        "SEARCH SPATIAL RADIUS LIMIT $N parameter was not bound before execution"
813                            .to_string(),
814                    ));
815                }
816                use crate::storage::unified::spatial_index::haversine_km;
817                let _ = column; // Column indicates which field holds geo data
818                let store = self.inner.db.store();
819                let entities = store
820                    .get_collection(collection)
821                    .map(|m| m.query_all(|_| true))
822                    .unwrap_or_default();
823
824                let mut hits: Vec<(u64, f64)> = Vec::new();
825                for entity in &entities {
826                    // Extract lat/lon from GeoPoint values in entity data
827                    if let Some((lat, lon)) = extract_geo_from_entity(entity) {
828                        let dist = haversine_km(*center_lat, *center_lon, lat, lon);
829                        if dist <= *radius_km {
830                            hits.push((entity.id.raw(), dist));
831                        }
832                    }
833                }
834                hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
835                hits.truncate(*limit);
836
837                let mut result =
838                    UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
839                for (id, dist) in &hits {
840                    let mut record = UnifiedRecord::new();
841                    record.set("entity_id", Value::UnsignedInteger(*id));
842                    record.set("distance_km", Value::Float(*dist));
843                    result.push(record);
844                }
845                Ok(RuntimeQueryResult {
846                    query: raw_query.to_string(),
847                    mode: QueryMode::Sql,
848                    statement: "search_spatial_radius",
849                    engine: "runtime-spatial",
850                    result,
851                    affected_rows: 0,
852                    statement_type: "select",
853                })
854            }
855            SearchCommand::SpatialBbox {
856                min_lat,
857                min_lon,
858                max_lat,
859                max_lon,
860                collection,
861                column,
862                limit,
863                limit_param,
864            } => {
865                if limit_param.is_some() {
866                    return Err(RedDBError::Query(
867                        "SEARCH SPATIAL BBOX LIMIT $N parameter was not bound before execution"
868                            .to_string(),
869                    ));
870                }
871                let _ = column;
872                let store = self.inner.db.store();
873                let entities = store
874                    .get_collection(collection)
875                    .map(|m| m.query_all(|_| true))
876                    .unwrap_or_default();
877
878                let mut result = UnifiedResult::with_columns(vec!["entity_id".into()]);
879                let mut count = 0;
880                for entity in &entities {
881                    if count >= *limit {
882                        break;
883                    }
884                    if let Some((lat, lon)) = extract_geo_from_entity(entity) {
885                        if lat >= *min_lat && lat <= *max_lat && lon >= *min_lon && lon <= *max_lon
886                        {
887                            let mut record = UnifiedRecord::new();
888                            record.set("entity_id", Value::UnsignedInteger(entity.id.raw()));
889                            result.push(record);
890                            count += 1;
891                        }
892                    }
893                }
894                Ok(RuntimeQueryResult {
895                    query: raw_query.to_string(),
896                    mode: QueryMode::Sql,
897                    statement: "search_spatial_bbox",
898                    engine: "runtime-spatial",
899                    result,
900                    affected_rows: 0,
901                    statement_type: "select",
902                })
903            }
904            SearchCommand::SpatialNearest {
905                lat,
906                lon,
907                k,
908                collection,
909                column,
910                k_param,
911            } => {
912                if k_param.is_some() {
913                    return Err(RedDBError::Query(
914                        "SEARCH SPATIAL NEAREST K $N parameter was not bound before execution"
915                            .to_string(),
916                    ));
917                }
918                use crate::storage::unified::spatial_index::haversine_km;
919                let _ = column;
920                let store = self.inner.db.store();
921                let entities = store
922                    .get_collection(collection)
923                    .map(|m| m.query_all(|_| true))
924                    .unwrap_or_default();
925
926                let mut hits: Vec<(u64, f64)> = Vec::new();
927                for entity in &entities {
928                    if let Some((elat, elon)) = extract_geo_from_entity(entity) {
929                        let dist = haversine_km(*lat, *lon, elat, elon);
930                        hits.push((entity.id.raw(), dist));
931                    }
932                }
933                hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
934                hits.truncate(*k);
935
936                let mut result =
937                    UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
938                for (id, dist) in &hits {
939                    let mut record = UnifiedRecord::new();
940                    record.set("entity_id", Value::UnsignedInteger(*id));
941                    record.set("distance_km", Value::Float(*dist));
942                    result.push(record);
943                }
944                Ok(RuntimeQueryResult {
945                    query: raw_query.to_string(),
946                    mode: QueryMode::Sql,
947                    statement: "search_spatial_nearest",
948                    engine: "runtime-spatial",
949                    result,
950                    affected_rows: 0,
951                    statement_type: "select",
952                })
953            }
954        }
955    }
956}
957
958fn apply_graph_order_and_limit(
959    result: &mut UnifiedResult,
960    statement: &str,
961    order_by: Option<&GraphCommandOrderBy>,
962    limit: Option<usize>,
963) -> RedDBResult<()> {
964    if let Some(order) = order_by {
965        let column = graph_order_metric_column(statement, &order.metric)?;
966        let columns = result.columns.clone();
967        result.records.sort_by(|left, right| {
968            let cmp = compare_graph_values(left.get(column), right.get(column));
969            let cmp = if order.ascending { cmp } else { cmp.reverse() };
970            if cmp == Ordering::Equal {
971                compare_graph_rows(left, right, &columns)
972            } else {
973                cmp
974            }
975        });
976    }
977    if let Some(limit) = limit {
978        result.records.truncate(limit);
979    }
980    Ok(())
981}
982
983fn graph_order_metric_column(statement: &str, metric: &str) -> RedDBResult<&'static str> {
984    let metric = metric.to_ascii_lowercase();
985    match (statement, metric.as_str()) {
986        ("graph_centrality", "score" | "centrality_score") => Ok("score"),
987        ("graph_community", "size" | "community_size") => Ok("size"),
988        ("graph_components", "size" | "component_size") => Ok("size"),
989        ("graph_shortest_path", "hop_count" | "total_weight" | "nodes_visited") => {
990            Ok(match metric.as_str() {
991                "total_weight" => "total_weight",
992                "nodes_visited" => "nodes_visited",
993                _ => "hop_count",
994            })
995        }
996        _ => Err(RedDBError::Query(format!(
997            "unsupported ORDER BY metric '{metric}' for GRAPH {}",
998            statement.trim_start_matches("graph_")
999        ))),
1000    }
1001}
1002
1003fn compare_graph_rows(left: &UnifiedRecord, right: &UnifiedRecord, columns: &[String]) -> Ordering {
1004    for column in columns {
1005        let cmp = compare_graph_values(left.get(column), right.get(column));
1006        if cmp != Ordering::Equal {
1007            return cmp;
1008        }
1009    }
1010    Ordering::Equal
1011}
1012
1013fn compare_graph_values(left: Option<&Value>, right: Option<&Value>) -> Ordering {
1014    match (left, right) {
1015        (None, None) => Ordering::Equal,
1016        (None, Some(_)) => Ordering::Less,
1017        (Some(_), None) => Ordering::Greater,
1018        (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
1019        (Some(Value::Null), Some(_)) => Ordering::Less,
1020        (Some(_), Some(Value::Null)) => Ordering::Greater,
1021        (Some(Value::Integer(left)), Some(Value::Integer(right))) => left.cmp(right),
1022        (Some(Value::UnsignedInteger(left)), Some(Value::UnsignedInteger(right))) => {
1023            left.cmp(right)
1024        }
1025        (Some(Value::Float(left)), Some(Value::Float(right))) => {
1026            left.partial_cmp(right).unwrap_or(Ordering::Equal)
1027        }
1028        (Some(Value::Integer(left)), Some(Value::Float(right))) => {
1029            (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1030        }
1031        (Some(Value::Float(left)), Some(Value::Integer(right))) => left
1032            .partial_cmp(&(*right as f64))
1033            .unwrap_or(Ordering::Equal),
1034        (Some(Value::UnsignedInteger(left)), Some(Value::Float(right))) => {
1035            (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1036        }
1037        (Some(Value::Float(left)), Some(Value::UnsignedInteger(right))) => left
1038            .partial_cmp(&(*right as f64))
1039            .unwrap_or(Ordering::Equal),
1040        (Some(Value::Integer(left)), Some(Value::UnsignedInteger(right))) => {
1041            (*left as i128).cmp(&(*right as i128))
1042        }
1043        (Some(Value::UnsignedInteger(left)), Some(Value::Integer(right))) => {
1044            (*left as i128).cmp(&(*right as i128))
1045        }
1046        (Some(Value::Timestamp(left)), Some(Value::Timestamp(right))) => left.cmp(right),
1047        (Some(Value::Text(left)), Some(Value::Text(right))) => left.cmp(right),
1048        (Some(Value::Boolean(left)), Some(Value::Boolean(right))) => left.cmp(right),
1049        (Some(left), Some(right)) => format!("{left:?}").cmp(&format!("{right:?}")),
1050    }
1051}
1052
1053// =============================================================================
1054// Conversion helpers for string -> enum
1055// =============================================================================
1056
1057fn parse_direction(s: &str) -> RedDBResult<RuntimeGraphDirection> {
1058    match s.to_lowercase().as_str() {
1059        "outgoing" | "out" => Ok(RuntimeGraphDirection::Outgoing),
1060        "incoming" | "in" => Ok(RuntimeGraphDirection::Incoming),
1061        "both" | "any" => Ok(RuntimeGraphDirection::Both),
1062        _ => Err(RedDBError::Query(format!(
1063            "unknown direction: '{s}', expected outgoing|incoming|both"
1064        ))),
1065    }
1066}
1067
1068fn parse_path_algorithm(s: &str) -> RedDBResult<RuntimeGraphPathAlgorithm> {
1069    match s.to_lowercase().as_str() {
1070        "bfs" => Ok(RuntimeGraphPathAlgorithm::Bfs),
1071        "dijkstra" => Ok(RuntimeGraphPathAlgorithm::Dijkstra),
1072        "astar" | "a*" => Ok(RuntimeGraphPathAlgorithm::AStar),
1073        "bellman_ford" | "bellmanford" => Ok(RuntimeGraphPathAlgorithm::BellmanFord),
1074        _ => Err(RedDBError::Query(format!(
1075            "unknown path algorithm: '{s}', expected bfs|dijkstra|astar|bellman_ford"
1076        ))),
1077    }
1078}
1079
1080fn parse_traversal_strategy(s: &str) -> RedDBResult<RuntimeGraphTraversalStrategy> {
1081    match s.to_lowercase().as_str() {
1082        "bfs" => Ok(RuntimeGraphTraversalStrategy::Bfs),
1083        "dfs" => Ok(RuntimeGraphTraversalStrategy::Dfs),
1084        _ => Err(RedDBError::Query(format!(
1085            "unknown traversal strategy: '{s}', expected bfs|dfs"
1086        ))),
1087    }
1088}
1089
1090fn parse_centrality_algorithm(s: &str) -> RedDBResult<RuntimeGraphCentralityAlgorithm> {
1091    match s.to_lowercase().as_str() {
1092        "degree" => Ok(RuntimeGraphCentralityAlgorithm::Degree),
1093        "closeness" => Ok(RuntimeGraphCentralityAlgorithm::Closeness),
1094        "betweenness" => Ok(RuntimeGraphCentralityAlgorithm::Betweenness),
1095        "eigenvector" => Ok(RuntimeGraphCentralityAlgorithm::Eigenvector),
1096        "pagerank" | "page_rank" => Ok(RuntimeGraphCentralityAlgorithm::PageRank),
1097        _ => Err(RedDBError::Query(format!(
1098            "unknown centrality algorithm: '{s}', expected degree|closeness|betweenness|eigenvector|pagerank"
1099        ))),
1100    }
1101}
1102
1103fn parse_community_algorithm(s: &str) -> RedDBResult<RuntimeGraphCommunityAlgorithm> {
1104    match s.to_lowercase().as_str() {
1105        "label_propagation" | "labelpropagation" => {
1106            Ok(RuntimeGraphCommunityAlgorithm::LabelPropagation)
1107        }
1108        "louvain" => Ok(RuntimeGraphCommunityAlgorithm::Louvain),
1109        _ => Err(RedDBError::Query(format!(
1110            "unknown community algorithm: '{s}', expected label_propagation|louvain"
1111        ))),
1112    }
1113}
1114
1115fn parse_components_mode(s: &str) -> RedDBResult<RuntimeGraphComponentsMode> {
1116    match s.to_lowercase().as_str() {
1117        "connected" => Ok(RuntimeGraphComponentsMode::Connected),
1118        "weak" | "weakly_connected" => Ok(RuntimeGraphComponentsMode::Weak),
1119        "strong" | "strongly_connected" => Ok(RuntimeGraphComponentsMode::Strong),
1120        _ => Err(RedDBError::Query(format!(
1121            "unknown components mode: '{s}', expected connected|weak|strong"
1122        ))),
1123    }
1124}
1125
1126/// Extract (latitude, longitude) from an entity.
1127///
1128/// Looks for GeoPoint values in the entity data (row columns or node properties)
1129/// or dedicated lat/lon fields. Returns degrees.
1130fn extract_geo_from_entity(entity: &UnifiedEntity) -> Option<(f64, f64)> {
1131    match &entity.data {
1132        EntityData::Row(row) => {
1133            // Search named columns for GeoPoint or lat/lon pairs
1134            if let Some(ref named) = row.named {
1135                // Direct GeoPoint value
1136                for value in named.values() {
1137                    if let Value::GeoPoint(lat_micro, lon_micro) = value {
1138                        return Some((
1139                            *lat_micro as f64 / 1_000_000.0,
1140                            *lon_micro as f64 / 1_000_000.0,
1141                        ));
1142                    }
1143                }
1144                // Try lat/lon or latitude/longitude named fields
1145                let lat =
1146                    named
1147                        .get("lat")
1148                        .or_else(|| named.get("latitude"))
1149                        .and_then(|v| match v {
1150                            Value::Float(f) => Some(*f),
1151                            Value::Integer(i) => Some(*i as f64),
1152                            _ => None,
1153                        });
1154                let lon = named
1155                    .get("lon")
1156                    .or_else(|| named.get("lng"))
1157                    .or_else(|| named.get("longitude"))
1158                    .and_then(|v| match v {
1159                        Value::Float(f) => Some(*f),
1160                        Value::Integer(i) => Some(*i as f64),
1161                        _ => None,
1162                    });
1163                if let (Some(la), Some(lo)) = (lat, lon) {
1164                    return Some((la, lo));
1165                }
1166            }
1167            // Search positional columns for GeoPoint
1168            for value in &row.columns {
1169                if let Value::GeoPoint(lat_micro, lon_micro) = value {
1170                    return Some((
1171                        *lat_micro as f64 / 1_000_000.0,
1172                        *lon_micro as f64 / 1_000_000.0,
1173                    ));
1174                }
1175            }
1176            None
1177        }
1178        EntityData::Node(node) => {
1179            // Search node properties
1180            for value in node.properties.values() {
1181                if let Value::GeoPoint(lat_micro, lon_micro) = value {
1182                    return Some((
1183                        *lat_micro as f64 / 1_000_000.0,
1184                        *lon_micro as f64 / 1_000_000.0,
1185                    ));
1186                }
1187            }
1188            let lat = node
1189                .properties
1190                .get("lat")
1191                .or_else(|| node.properties.get("latitude"))
1192                .and_then(|v| match v {
1193                    Value::Float(f) => Some(*f),
1194                    Value::Integer(i) => Some(*i as f64),
1195                    _ => None,
1196                });
1197            let lon = node
1198                .properties
1199                .get("lon")
1200                .or_else(|| node.properties.get("lng"))
1201                .or_else(|| node.properties.get("longitude"))
1202                .and_then(|v| match v {
1203                    Value::Float(f) => Some(*f),
1204                    Value::Integer(i) => Some(*i as f64),
1205                    _ => None,
1206                });
1207            if let (Some(la), Some(lo)) = (lat, lon) {
1208                return Some((la, lo));
1209            }
1210            None
1211        }
1212        _ => None,
1213    }
1214}