Skip to main content

reddb_server/runtime/
impl_graph_commands.rs

1//! Execution of GRAPH and SEARCH SQL-like commands.
2//!
3//! Maps parsed `GraphCommand` and `SearchCommand` AST nodes to the existing
4//! runtime graph analytics and search methods, returning results wrapped in
5//! `RuntimeQueryResult`.
6
7use super::*;
8use crate::storage::query::ast::GraphCommandOrderBy;
9use std::cmp::Ordering;
10
11impl RedDBRuntime {
12    /// Execute a GRAPH analytics command.
13    pub fn execute_graph_command(
14        &self,
15        raw_query: &str,
16        cmd: &GraphCommand,
17    ) -> RedDBResult<RuntimeQueryResult> {
18        match cmd {
19            GraphCommand::Neighborhood {
20                source,
21                depth,
22                direction,
23                edge_labels,
24            } => {
25                let dir = parse_direction(direction)?;
26                let res = self.graph_neighborhood(
27                    source,
28                    dir,
29                    *depth as usize,
30                    edge_labels.clone(),
31                    None,
32                )?;
33                let mut result = UnifiedResult::with_columns(vec![
34                    "node_id".into(),
35                    "label".into(),
36                    "node_type".into(),
37                    "depth".into(),
38                ]);
39                for visit in &res.nodes {
40                    let mut record = UnifiedRecord::new();
41                    record.set("node_id", Value::text(visit.node.id.clone()));
42                    record.set("label", Value::text(visit.node.label.clone()));
43                    record.set("node_type", Value::text(visit.node.node_type.clone()));
44                    record.set("depth", Value::Integer(visit.depth as i64));
45                    result.push(record);
46                }
47                Ok(RuntimeQueryResult {
48                    query: raw_query.to_string(),
49                    mode: QueryMode::Sql,
50                    statement: "graph_neighborhood",
51                    engine: "runtime-graph",
52                    result,
53                    affected_rows: 0,
54                    statement_type: "select",
55                })
56            }
57            GraphCommand::ShortestPath {
58                source,
59                target,
60                algorithm,
61                direction,
62                limit,
63                order_by,
64            } => {
65                let dir = parse_direction(direction)?;
66                let alg = parse_path_algorithm(algorithm)?;
67                let res = self.graph_shortest_path(source, target, dir, alg, None, None)?;
68                let mut result = UnifiedResult::with_columns(vec![
69                    "source".into(),
70                    "target".into(),
71                    "nodes_visited".into(),
72                    "negative_cycle_detected".into(),
73                    "hop_count".into(),
74                    "total_weight".into(),
75                ]);
76                let mut record = UnifiedRecord::new();
77                record.set("source", Value::text(res.source));
78                record.set("target", Value::text(res.target));
79                record.set("nodes_visited", Value::Integer(res.nodes_visited as i64));
80                record.set(
81                    "negative_cycle_detected",
82                    match res.negative_cycle_detected {
83                        Some(value) => Value::Boolean(value),
84                        None => Value::Null,
85                    },
86                );
87                if let Some(ref path) = res.path {
88                    record.set("hop_count", Value::Integer(path.hop_count as i64));
89                    record.set("total_weight", Value::Float(path.total_weight));
90                } else {
91                    record.set("hop_count", Value::Null);
92                    record.set("total_weight", Value::Null);
93                }
94                result.push(record);
95                apply_graph_order_and_limit(
96                    &mut result,
97                    "graph_shortest_path",
98                    order_by.as_ref(),
99                    limit.map(|n| n as usize),
100                )?;
101                Ok(RuntimeQueryResult {
102                    query: raw_query.to_string(),
103                    mode: QueryMode::Sql,
104                    statement: "graph_shortest_path",
105                    engine: "runtime-graph",
106                    result,
107                    affected_rows: 0,
108                    statement_type: "select",
109                })
110            }
111            GraphCommand::Properties { source } => {
112                if let Some(node_ref) = source {
113                    // Per-node property lookup (#423). Uses the same label
114                    // resolution as NEIGHBORHOOD/TRAVERSE so '<label>' and
115                    // '<numeric id>' both work.
116                    let graph =
117                        materialize_graph_with_projection(self.inner.db.store().as_ref(), None)?;
118                    let resolved = resolve_graph_node_id(&graph, node_ref)?;
119                    let stored = graph
120                        .get_node(&resolved)
121                        .ok_or_else(|| RedDBError::NotFound(node_ref.to_string()))?;
122                    let node_type = self
123                        .inner
124                        .db
125                        .store()
126                        .query_all(|entity| {
127                            entity.id.raw().to_string() == resolved
128                                && matches!(
129                                    entity.kind,
130                                    crate::storage::unified::EntityKind::GraphNode(_)
131                                )
132                        })
133                        .into_iter()
134                        .find_map(|(_, entity)| match entity.kind {
135                            crate::storage::unified::EntityKind::GraphNode(node) => {
136                                Some(node.node_type)
137                            }
138                            _ => None,
139                        })
140                        .unwrap_or_else(|| stored.node_type.clone());
141                    let all_props =
142                        materialize_graph_node_properties(self.inner.db.store().as_ref())?;
143                    let props = all_props.get(&resolved).cloned().unwrap_or_default();
144
145                    // Fixed columns first, then property keys in sorted order so
146                    // the schema is stable across snapshots / wire renders.
147                    let mut prop_keys: Vec<&String> = props.keys().collect();
148                    prop_keys.sort();
149                    let mut columns: Vec<String> = Vec::with_capacity(3 + prop_keys.len());
150                    columns.push("node_id".into());
151                    columns.push("label".into());
152                    columns.push("node_type".into());
153                    for k in &prop_keys {
154                        columns.push((*k).clone());
155                    }
156                    let mut result = UnifiedResult::with_columns(columns);
157                    let mut record = UnifiedRecord::new();
158                    record.set("node_id", Value::text(stored.id.clone()));
159                    record.set("label", Value::text(stored.label.clone()));
160                    record.set("node_type", Value::text(node_type));
161                    for k in &prop_keys {
162                        if let Some(v) = props.get(*k) {
163                            record.set(k.as_str(), v.clone());
164                        }
165                    }
166                    result.push(record);
167                    return Ok(RuntimeQueryResult {
168                        query: raw_query.to_string(),
169                        mode: QueryMode::Sql,
170                        statement: "graph_properties",
171                        engine: "runtime-graph",
172                        result,
173                        affected_rows: 0,
174                        statement_type: "select",
175                    });
176                }
177                let res = self.graph_properties(None)?;
178                let mut result = UnifiedResult::with_columns(vec![
179                    "node_count".into(),
180                    "edge_count".into(),
181                    "is_connected".into(),
182                    "is_complete".into(),
183                    "is_cyclic".into(),
184                    "density".into(),
185                ]);
186                let mut record = UnifiedRecord::new();
187                record.set("node_count", Value::Integer(res.node_count as i64));
188                record.set("edge_count", Value::Integer(res.edge_count as i64));
189                record.set("is_connected", Value::Boolean(res.is_connected));
190                record.set("is_complete", Value::Boolean(res.is_complete));
191                record.set("is_cyclic", Value::Boolean(res.is_cyclic));
192                record.set("density", Value::Float(res.density));
193                result.push(record);
194                Ok(RuntimeQueryResult {
195                    query: raw_query.to_string(),
196                    mode: QueryMode::Sql,
197                    statement: "graph_properties",
198                    engine: "runtime-graph",
199                    result,
200                    affected_rows: 0,
201                    statement_type: "select",
202                })
203            }
204            GraphCommand::Traverse {
205                source,
206                strategy,
207                depth,
208                direction,
209                edge_labels,
210            } => {
211                let dir = parse_direction(direction)?;
212                let strat = parse_traversal_strategy(strategy)?;
213                let res = self.graph_traverse(
214                    source,
215                    dir,
216                    *depth as usize,
217                    strat,
218                    edge_labels.clone(),
219                    None,
220                )?;
221                let mut result = UnifiedResult::with_columns(vec![
222                    "node_id".into(),
223                    "label".into(),
224                    "node_type".into(),
225                    "depth".into(),
226                ]);
227                for visit in &res.visits {
228                    let mut record = UnifiedRecord::new();
229                    record.set("node_id", Value::text(visit.node.id.clone()));
230                    record.set("label", Value::text(visit.node.label.clone()));
231                    record.set("node_type", Value::text(visit.node.node_type.clone()));
232                    record.set("depth", Value::Integer(visit.depth as i64));
233                    result.push(record);
234                }
235                Ok(RuntimeQueryResult {
236                    query: raw_query.to_string(),
237                    mode: QueryMode::Sql,
238                    statement: "graph_traverse",
239                    engine: "runtime-graph",
240                    result,
241                    affected_rows: 0,
242                    statement_type: "select",
243                })
244            }
245            GraphCommand::Centrality {
246                algorithm,
247                limit,
248                order_by,
249            } => {
250                let alg = parse_centrality_algorithm(algorithm)?;
251                // `limit = None` keeps historical implicit top-100 cap.
252                // `Some(0)` returns zero rows (standard SQL LIMIT 0 semantics).
253                let limit_usize = limit.map(|n| n as usize);
254                let order_needs_full_set = order_by
255                    .as_ref()
256                    .map(|order| order.ascending)
257                    .unwrap_or(false);
258                let top_k = if order_needs_full_set {
259                    usize::MAX
260                } else {
261                    limit_usize.unwrap_or(100).max(1)
262                };
263                let res = self.graph_centrality(alg, top_k, false, None, None, None, None)?;
264                let mut result = UnifiedResult::with_columns(vec![
265                    "node_id".into(),
266                    "label".into(),
267                    "score".into(),
268                ]);
269                for score in &res.scores {
270                    let mut record = UnifiedRecord::new();
271                    record.set("node_id", Value::text(score.node.id.clone()));
272                    record.set("label", Value::text(score.node.label.clone()));
273                    record.set("score", Value::Float(score.score));
274                    result.push(record);
275                }
276                for ds in &res.degree_scores {
277                    let mut record = UnifiedRecord::new();
278                    record.set("node_id", Value::text(ds.node.id.clone()));
279                    record.set("label", Value::text(ds.node.label.clone()));
280                    record.set("score", Value::Float(ds.total_degree as f64));
281                    result.push(record);
282                }
283                apply_graph_order_and_limit(
284                    &mut result,
285                    "graph_centrality",
286                    order_by.as_ref(),
287                    Some(limit_usize.unwrap_or(100)),
288                )?;
289                Ok(RuntimeQueryResult {
290                    query: raw_query.to_string(),
291                    mode: QueryMode::Sql,
292                    statement: "graph_centrality",
293                    engine: "runtime-graph",
294                    result,
295                    affected_rows: 0,
296                    statement_type: "select",
297                })
298            }
299            GraphCommand::Community {
300                algorithm,
301                max_iterations,
302                limit,
303                order_by,
304                return_assignments,
305            } => {
306                let alg = parse_community_algorithm(algorithm)?;
307                let res =
308                    self.graph_communities(alg, 1, Some(*max_iterations as usize), None, None)?;
309                let result = if *return_assignments {
310                    // Per-node node→community map (#660). Communities arrive
311                    // sorted (size desc, id asc); within each, sort nodes for a
312                    // deterministic row order. ORDER BY metrics target the
313                    // aggregate shape, so they don't apply here — only LIMIT
314                    // (row cap) is honoured.
315                    let mut result =
316                        UnifiedResult::with_columns(vec!["node_id".into(), "community_id".into()]);
317                    let row_cap = limit.map(|n| n as usize);
318                    'outer: for community in &res.communities {
319                        let mut nodes = community.nodes.clone();
320                        nodes.sort();
321                        for node_id in nodes {
322                            if row_cap.is_some_and(|cap| result.records.len() >= cap) {
323                                break 'outer;
324                            }
325                            let mut record = UnifiedRecord::new();
326                            record.set("node_id", Value::text(node_id));
327                            record.set("community_id", Value::text(community.id.clone()));
328                            result.push(record);
329                        }
330                    }
331                    result
332                } else {
333                    let mut result =
334                        UnifiedResult::with_columns(vec!["community_id".into(), "size".into()]);
335                    for community in &res.communities {
336                        let mut record = UnifiedRecord::new();
337                        record.set("community_id", Value::text(community.id.clone()));
338                        record.set("size", Value::Integer(community.size as i64));
339                        result.push(record);
340                    }
341                    apply_graph_order_and_limit(
342                        &mut result,
343                        "graph_community",
344                        order_by.as_ref(),
345                        limit.map(|n| n as usize),
346                    )?;
347                    result
348                };
349                Ok(RuntimeQueryResult {
350                    query: raw_query.to_string(),
351                    mode: QueryMode::Sql,
352                    statement: "graph_community",
353                    engine: "runtime-graph",
354                    result,
355                    affected_rows: 0,
356                    statement_type: "select",
357                })
358            }
359            GraphCommand::Components {
360                mode,
361                limit,
362                order_by,
363            } => {
364                let m = parse_components_mode(mode)?;
365                let res = self.graph_components(m, 1, None)?;
366                let mut result =
367                    UnifiedResult::with_columns(vec!["component_id".into(), "size".into()]);
368                for component in &res.components {
369                    let mut record = UnifiedRecord::new();
370                    record.set("component_id", Value::text(component.id.clone()));
371                    record.set("size", Value::Integer(component.size as i64));
372                    result.push(record);
373                }
374                apply_graph_order_and_limit(
375                    &mut result,
376                    "graph_components",
377                    order_by.as_ref(),
378                    limit.map(|n| n as usize),
379                )?;
380                Ok(RuntimeQueryResult {
381                    query: raw_query.to_string(),
382                    mode: QueryMode::Sql,
383                    statement: "graph_components",
384                    engine: "runtime-graph",
385                    result,
386                    affected_rows: 0,
387                    statement_type: "select",
388                })
389            }
390            GraphCommand::Cycles { max_length } => {
391                let res = self.graph_cycles(*max_length as usize, 100, None)?;
392                let mut result =
393                    UnifiedResult::with_columns(vec!["cycle_index".into(), "length".into()]);
394                for (i, cycle) in res.cycles.iter().enumerate() {
395                    let mut record = UnifiedRecord::new();
396                    record.set("cycle_index", Value::Integer(i as i64));
397                    record.set("length", Value::Integer(cycle.nodes.len() as i64));
398                    result.push(record);
399                }
400                Ok(RuntimeQueryResult {
401                    query: raw_query.to_string(),
402                    mode: QueryMode::Sql,
403                    statement: "graph_cycles",
404                    engine: "runtime-graph",
405                    result,
406                    affected_rows: 0,
407                    statement_type: "select",
408                })
409            }
410            GraphCommand::Clustering => {
411                let res = self.graph_clustering(100, true, None)?;
412                let mut result = UnifiedResult::with_columns(vec![
413                    "node_id".into(),
414                    "label".into(),
415                    "score".into(),
416                ]);
417                // First row: global coefficient
418                let mut global_record = UnifiedRecord::new();
419                global_record.set("node_id", Value::text("__global__"));
420                global_record.set("label", Value::text("global_clustering"));
421                global_record.set("score", Value::Float(res.global));
422                result.push(global_record);
423                for score in &res.local {
424                    let mut record = UnifiedRecord::new();
425                    record.set("node_id", Value::text(score.node.id.clone()));
426                    record.set("label", Value::text(score.node.label.clone()));
427                    record.set("score", Value::Float(score.score));
428                    result.push(record);
429                }
430                Ok(RuntimeQueryResult {
431                    query: raw_query.to_string(),
432                    mode: QueryMode::Sql,
433                    statement: "graph_clustering",
434                    engine: "runtime-graph",
435                    result,
436                    affected_rows: 0,
437                    statement_type: "select",
438                })
439            }
440            GraphCommand::TopologicalSort => {
441                let res = self.graph_topological_sort(None)?;
442                let mut result = UnifiedResult::with_columns(vec![
443                    "order".into(),
444                    "node_id".into(),
445                    "label".into(),
446                ]);
447                for (i, node) in res.ordered_nodes.iter().enumerate() {
448                    let mut record = UnifiedRecord::new();
449                    record.set("order", Value::Integer(i as i64));
450                    record.set("node_id", Value::text(node.id.clone()));
451                    record.set("label", Value::text(node.label.clone()));
452                    result.push(record);
453                }
454                Ok(RuntimeQueryResult {
455                    query: raw_query.to_string(),
456                    mode: QueryMode::Sql,
457                    statement: "graph_topological_sort",
458                    engine: "runtime-graph",
459                    result,
460                    affected_rows: 0,
461                    statement_type: "select",
462                })
463            }
464        }
465    }
466
467    /// Execute a SEARCH command.
468    pub fn execute_search_command(
469        &self,
470        raw_query: &str,
471        cmd: &SearchCommand,
472    ) -> RedDBResult<RuntimeQueryResult> {
473        match cmd {
474            SearchCommand::Similar {
475                vector,
476                text,
477                provider,
478                collection,
479                limit,
480                min_score,
481                vector_param,
482                limit_param,
483                min_score_param,
484                text_param,
485            } => {
486                if vector_param.is_some() {
487                    return Err(RedDBError::Query(
488                        "SEARCH SIMILAR $N vector parameter was not bound before execution"
489                            .to_string(),
490                    ));
491                }
492                if limit_param.is_some() {
493                    return Err(RedDBError::Query(
494                        "SEARCH SIMILAR LIMIT $N parameter was not bound before execution"
495                            .to_string(),
496                    ));
497                }
498                if min_score_param.is_some() {
499                    return Err(RedDBError::Query(
500                        "SEARCH SIMILAR MIN_SCORE $N parameter was not bound before execution"
501                            .to_string(),
502                    ));
503                }
504                if text_param.is_some() {
505                    return Err(RedDBError::Query(
506                        "SEARCH SIMILAR TEXT $N parameter was not bound before execution"
507                            .to_string(),
508                    ));
509                }
510                // If text provided, generate embedding first (semantic search)
511                let search_vector = if let Some(query_text) = text {
512                    let (default_provider, _) = crate::ai::resolve_defaults_from_runtime(self);
513                    let provider = match provider.as_deref() {
514                        Some(p) => crate::ai::parse_provider(p)?,
515                        None => default_provider,
516                    };
517                    let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
518                    let model = std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
519                        .ok()
520                        .unwrap_or_else(|| provider.default_embedding_model().to_string());
521                    let transport = crate::runtime::ai::transport::AiTransport::from_runtime(self);
522                    let request = crate::ai::OpenAiEmbeddingRequest {
523                        api_key,
524                        model,
525                        inputs: vec![query_text.clone()],
526                        dimensions: None,
527                        api_base: provider.resolve_api_base(),
528                    };
529                    let response = crate::runtime::ai::block_on_ai(async move {
530                        crate::ai::openai_embeddings_async(&transport, request).await
531                    })
532                    .and_then(|result| result)?;
533                    response.embeddings.into_iter().next().ok_or_else(|| {
534                        RedDBError::Query("embedding API returned no vectors".to_string())
535                    })?
536                } else {
537                    vector.clone()
538                };
539                // Issue #119: route through AuthorizedSearch so the
540                // candidate set is gated by `EffectiveScope.visible_collections`
541                // before any similarity score is computed.
542                let scope = self.ai_scope();
543                let results =
544                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
545                        crate::runtime::authorized_search::AuthorizedSearch::execute_similar(
546                            self,
547                            &scope,
548                            collection,
549                            &search_vector,
550                            *limit,
551                            *min_score,
552                        )?
553                    } else {
554                        // Embedded / no-auth caller: keep legacy behaviour.
555                        self.search_similar(collection, &search_vector, *limit, *min_score)?
556                    };
557                let mut result =
558                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
559                for sr in &results {
560                    let mut record = UnifiedRecord::new();
561                    record.set("entity_id", Value::UnsignedInteger(sr.entity_id.raw()));
562                    record.set("score", Value::Float(sr.score as f64));
563                    result.push(record);
564                }
565                Ok(RuntimeQueryResult {
566                    query: raw_query.to_string(),
567                    mode: QueryMode::Sql,
568                    statement: "search_similar",
569                    engine: "runtime-search",
570                    result,
571                    affected_rows: 0,
572                    statement_type: "select",
573                })
574            }
575            SearchCommand::Text {
576                query,
577                collection,
578                limit,
579                fuzzy,
580                limit_param,
581            } => {
582                if limit_param.is_some() {
583                    return Err(RedDBError::Query(
584                        "SEARCH TEXT LIMIT $N parameter was not bound before execution".to_string(),
585                    ));
586                }
587                let collections = collection.as_ref().map(|c| vec![c.clone()]);
588                // Issue #119: gate the candidate set by visible_collections.
589                let scope = self.ai_scope();
590                let res =
591                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
592                        crate::runtime::authorized_search::AuthorizedSearch::execute_text(
593                            self,
594                            &scope,
595                            query.clone(),
596                            collections,
597                            None,
598                            None,
599                            None,
600                            Some(*limit),
601                            *fuzzy,
602                        )?
603                    } else {
604                        self.search_text(
605                            query.clone(),
606                            collections,
607                            None,
608                            None,
609                            None,
610                            Some(*limit),
611                            *fuzzy,
612                        )?
613                    };
614                let mut result =
615                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
616                for item in &res.matches {
617                    let mut record = UnifiedRecord::new();
618                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
619                    record.set("score", Value::Float(item.score as f64));
620                    result.push(record);
621                }
622                Ok(RuntimeQueryResult {
623                    query: raw_query.to_string(),
624                    mode: QueryMode::Sql,
625                    statement: "search_text",
626                    engine: "runtime-search",
627                    result,
628                    affected_rows: 0,
629                    statement_type: "select",
630                })
631            }
632            SearchCommand::Hybrid {
633                vector,
634                query,
635                collection,
636                limit,
637                limit_param,
638            } => {
639                if limit_param.is_some() {
640                    return Err(RedDBError::Query(
641                        "SEARCH HYBRID LIMIT $N parameter was not bound before execution"
642                            .to_string(),
643                    ));
644                }
645                let res = self.search_hybrid(
646                    vector.clone(),
647                    query.clone(),
648                    Some(*limit),
649                    Some(vec![collection.clone()]),
650                    None,
651                    None,
652                    None,
653                    Vec::new(),
654                    None,
655                    None,
656                    Some(*limit),
657                )?;
658                let mut result =
659                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
660                for item in &res.matches {
661                    let mut record = UnifiedRecord::new();
662                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
663                    record.set("score", Value::Float(item.score as f64));
664                    result.push(record);
665                }
666                Ok(RuntimeQueryResult {
667                    query: raw_query.to_string(),
668                    mode: QueryMode::Sql,
669                    statement: "search_hybrid",
670                    engine: "runtime-search",
671                    result,
672                    affected_rows: 0,
673                    statement_type: "select",
674                })
675            }
676            SearchCommand::Multimodal {
677                query,
678                collection,
679                limit,
680                limit_param,
681            } => {
682                if limit_param.is_some() {
683                    return Err(RedDBError::Query(
684                        "SEARCH MULTIMODAL LIMIT $N parameter was not bound before execution"
685                            .to_string(),
686                    ));
687                }
688                let collections = collection.as_ref().map(|c| vec![c.clone()]);
689                let res =
690                    self.search_multimodal(query.clone(), collections, None, None, Some(*limit))?;
691                let mut result =
692                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
693                for item in &res.matches {
694                    let mut record = UnifiedRecord::new();
695                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
696                    record.set("score", Value::Float(item.score as f64));
697                    result.push(record);
698                }
699                Ok(RuntimeQueryResult {
700                    query: raw_query.to_string(),
701                    mode: QueryMode::Sql,
702                    statement: "search_multimodal",
703                    engine: "runtime-search",
704                    result,
705                    affected_rows: 0,
706                    statement_type: "select",
707                })
708            }
709            SearchCommand::Index {
710                index,
711                value,
712                collection,
713                limit,
714                exact,
715                limit_param,
716            } => {
717                if limit_param.is_some() {
718                    return Err(RedDBError::Query(
719                        "SEARCH INDEX LIMIT $N parameter was not bound before execution"
720                            .to_string(),
721                    ));
722                }
723                let collections = collection.as_ref().map(|c| vec![c.clone()]);
724                let res = self.search_index(
725                    index.clone(),
726                    value.clone(),
727                    *exact,
728                    collections,
729                    None,
730                    None,
731                    Some(*limit),
732                )?;
733                let mut result =
734                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
735                for item in &res.matches {
736                    let mut record = UnifiedRecord::new();
737                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
738                    record.set("score", Value::Float(item.score as f64));
739                    result.push(record);
740                }
741                Ok(RuntimeQueryResult {
742                    query: raw_query.to_string(),
743                    mode: QueryMode::Sql,
744                    statement: "search_index",
745                    engine: "runtime-search",
746                    result,
747                    affected_rows: 0,
748                    statement_type: "select",
749                })
750            }
751            SearchCommand::Context {
752                query,
753                field,
754                collection,
755                limit,
756                depth,
757                limit_param,
758            } => {
759                if limit_param.is_some() {
760                    return Err(RedDBError::Query(
761                        "SEARCH CONTEXT LIMIT $N parameter was not bound before execution"
762                            .to_string(),
763                    ));
764                }
765                use crate::application::SearchContextInput;
766                // Issue #119: route through AuthorizedSearch so the
767                // candidate set + every expansion bucket is bounded by
768                // `EffectiveScope.visible_collections`.
769                let input = SearchContextInput {
770                    query: query.clone(),
771                    field: field.clone(),
772                    vector: None,
773                    collections: collection.as_ref().map(|c| vec![c.clone()]),
774                    graph_depth: Some(*depth),
775                    graph_max_edges: None,
776                    max_cross_refs: None,
777                    follow_cross_refs: None,
778                    expand_graph: None,
779                    global_scan: None,
780                    reindex: None,
781                    limit: Some(*limit),
782                    min_score: None,
783                };
784                let scope = self.ai_scope();
785                let res =
786                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
787                        crate::runtime::authorized_search::AuthorizedSearch::execute_context(
788                            self, &scope, input,
789                        )?
790                    } else {
791                        self.search_context(input)?
792                    };
793                let mut result = UnifiedResult::with_columns(vec![
794                    "entity_id".into(),
795                    "collection".into(),
796                    "score".into(),
797                    "discovery".into(),
798                    "kind".into(),
799                ]);
800                let all_entities = res
801                    .tables
802                    .iter()
803                    .map(|e| (e, "table"))
804                    .chain(res.graph.nodes.iter().map(|e| (e, "graph_node")))
805                    .chain(res.graph.edges.iter().map(|e| (e, "graph_edge")))
806                    .chain(res.vectors.iter().map(|e| (e, "vector")))
807                    .chain(res.documents.iter().map(|e| (e, "document")))
808                    .chain(res.key_values.iter().map(|e| (e, "kv")));
809                for (entity, kind) in all_entities {
810                    let mut record = UnifiedRecord::new();
811                    record.set("entity_id", Value::UnsignedInteger(entity.entity.id.raw()));
812                    record.set("collection", Value::text(entity.collection.clone()));
813                    record.set("score", Value::Float(entity.score as f64));
814                    record.set("discovery", Value::text(format!("{:?}", entity.discovery)));
815                    record.set("kind", Value::text(kind.to_string()));
816                    result.push(record);
817                }
818                Ok(RuntimeQueryResult {
819                    query: raw_query.to_string(),
820                    mode: QueryMode::Sql,
821                    statement: "search_context",
822                    engine: "runtime-context",
823                    result,
824                    affected_rows: 0,
825                    statement_type: "select",
826                })
827            }
828            SearchCommand::SpatialRadius {
829                center_lat,
830                center_lon,
831                radius_km,
832                collection,
833                column,
834                limit,
835                limit_param,
836            } => {
837                if limit_param.is_some() {
838                    return Err(RedDBError::Query(
839                        "SEARCH SPATIAL RADIUS LIMIT $N parameter was not bound before execution"
840                            .to_string(),
841                    ));
842                }
843                use crate::storage::unified::spatial_index::haversine_km;
844                let _ = column; // Column indicates which field holds geo data
845                let store = self.inner.db.store();
846                let entities = store
847                    .get_collection(collection)
848                    .map(|m| m.query_all(|_| true))
849                    .unwrap_or_default();
850
851                let mut hits: Vec<(u64, f64)> = Vec::new();
852                for entity in &entities {
853                    // Extract lat/lon from GeoPoint values in entity data
854                    if let Some((lat, lon)) = extract_geo_from_entity(entity) {
855                        let dist = haversine_km(*center_lat, *center_lon, lat, lon);
856                        if dist <= *radius_km {
857                            hits.push((entity.id.raw(), dist));
858                        }
859                    }
860                }
861                hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
862                hits.truncate(*limit);
863
864                let mut result =
865                    UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
866                for (id, dist) in &hits {
867                    let mut record = UnifiedRecord::new();
868                    record.set("entity_id", Value::UnsignedInteger(*id));
869                    record.set("distance_km", Value::Float(*dist));
870                    result.push(record);
871                }
872                Ok(RuntimeQueryResult {
873                    query: raw_query.to_string(),
874                    mode: QueryMode::Sql,
875                    statement: "search_spatial_radius",
876                    engine: "runtime-spatial",
877                    result,
878                    affected_rows: 0,
879                    statement_type: "select",
880                })
881            }
882            SearchCommand::SpatialBbox {
883                min_lat,
884                min_lon,
885                max_lat,
886                max_lon,
887                collection,
888                column,
889                limit,
890                limit_param,
891            } => {
892                if limit_param.is_some() {
893                    return Err(RedDBError::Query(
894                        "SEARCH SPATIAL BBOX LIMIT $N parameter was not bound before execution"
895                            .to_string(),
896                    ));
897                }
898                let _ = column;
899                let store = self.inner.db.store();
900                let entities = store
901                    .get_collection(collection)
902                    .map(|m| m.query_all(|_| true))
903                    .unwrap_or_default();
904
905                let mut result = UnifiedResult::with_columns(vec!["entity_id".into()]);
906                let mut count = 0;
907                for entity in &entities {
908                    if count >= *limit {
909                        break;
910                    }
911                    if let Some((lat, lon)) = extract_geo_from_entity(entity) {
912                        if lat >= *min_lat && lat <= *max_lat && lon >= *min_lon && lon <= *max_lon
913                        {
914                            let mut record = UnifiedRecord::new();
915                            record.set("entity_id", Value::UnsignedInteger(entity.id.raw()));
916                            result.push(record);
917                            count += 1;
918                        }
919                    }
920                }
921                Ok(RuntimeQueryResult {
922                    query: raw_query.to_string(),
923                    mode: QueryMode::Sql,
924                    statement: "search_spatial_bbox",
925                    engine: "runtime-spatial",
926                    result,
927                    affected_rows: 0,
928                    statement_type: "select",
929                })
930            }
931            SearchCommand::SpatialNearest {
932                lat,
933                lon,
934                k,
935                collection,
936                column,
937                k_param,
938            } => {
939                if k_param.is_some() {
940                    return Err(RedDBError::Query(
941                        "SEARCH SPATIAL NEAREST K $N parameter was not bound before execution"
942                            .to_string(),
943                    ));
944                }
945                use crate::storage::unified::spatial_index::haversine_km;
946                let _ = column;
947                let store = self.inner.db.store();
948                let entities = store
949                    .get_collection(collection)
950                    .map(|m| m.query_all(|_| true))
951                    .unwrap_or_default();
952
953                let mut hits: Vec<(u64, f64)> = Vec::new();
954                for entity in &entities {
955                    if let Some((elat, elon)) = extract_geo_from_entity(entity) {
956                        let dist = haversine_km(*lat, *lon, elat, elon);
957                        hits.push((entity.id.raw(), dist));
958                    }
959                }
960                hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
961                hits.truncate(*k);
962
963                let mut result =
964                    UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
965                for (id, dist) in &hits {
966                    let mut record = UnifiedRecord::new();
967                    record.set("entity_id", Value::UnsignedInteger(*id));
968                    record.set("distance_km", Value::Float(*dist));
969                    result.push(record);
970                }
971                Ok(RuntimeQueryResult {
972                    query: raw_query.to_string(),
973                    mode: QueryMode::Sql,
974                    statement: "search_spatial_nearest",
975                    engine: "runtime-spatial",
976                    result,
977                    affected_rows: 0,
978                    statement_type: "select",
979                })
980            }
981        }
982    }
983}
984
985fn apply_graph_order_and_limit(
986    result: &mut UnifiedResult,
987    statement: &str,
988    order_by: Option<&GraphCommandOrderBy>,
989    limit: Option<usize>,
990) -> RedDBResult<()> {
991    if let Some(order) = order_by {
992        let column = graph_order_metric_column(statement, &order.metric)?;
993        let columns = result.columns.clone();
994        result.records.sort_by(|left, right| {
995            let cmp = compare_graph_values(left.get(column), right.get(column));
996            let cmp = if order.ascending { cmp } else { cmp.reverse() };
997            if cmp == Ordering::Equal {
998                compare_graph_rows(left, right, &columns)
999            } else {
1000                cmp
1001            }
1002        });
1003    }
1004    if let Some(limit) = limit {
1005        result.records.truncate(limit);
1006    }
1007    Ok(())
1008}
1009
1010fn graph_order_metric_column(statement: &str, metric: &str) -> RedDBResult<&'static str> {
1011    let metric = metric.to_ascii_lowercase();
1012    match (statement, metric.as_str()) {
1013        ("graph_centrality", "score" | "centrality_score") => Ok("score"),
1014        ("graph_community", "size" | "community_size") => Ok("size"),
1015        ("graph_components", "size" | "component_size") => Ok("size"),
1016        ("graph_shortest_path", "hop_count" | "total_weight" | "nodes_visited") => {
1017            Ok(match metric.as_str() {
1018                "total_weight" => "total_weight",
1019                "nodes_visited" => "nodes_visited",
1020                _ => "hop_count",
1021            })
1022        }
1023        _ => Err(RedDBError::Query(format!(
1024            "unsupported ORDER BY metric '{metric}' for GRAPH {}",
1025            statement.trim_start_matches("graph_")
1026        ))),
1027    }
1028}
1029
1030fn compare_graph_rows(left: &UnifiedRecord, right: &UnifiedRecord, columns: &[String]) -> Ordering {
1031    for column in columns {
1032        let cmp = compare_graph_values(left.get(column), right.get(column));
1033        if cmp != Ordering::Equal {
1034            return cmp;
1035        }
1036    }
1037    Ordering::Equal
1038}
1039
1040fn compare_graph_values(left: Option<&Value>, right: Option<&Value>) -> Ordering {
1041    match (left, right) {
1042        (None, None) => Ordering::Equal,
1043        (None, Some(_)) => Ordering::Less,
1044        (Some(_), None) => Ordering::Greater,
1045        (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
1046        (Some(Value::Null), Some(_)) => Ordering::Less,
1047        (Some(_), Some(Value::Null)) => Ordering::Greater,
1048        (Some(Value::Integer(left)), Some(Value::Integer(right))) => left.cmp(right),
1049        (Some(Value::UnsignedInteger(left)), Some(Value::UnsignedInteger(right))) => {
1050            left.cmp(right)
1051        }
1052        (Some(Value::Float(left)), Some(Value::Float(right))) => {
1053            left.partial_cmp(right).unwrap_or(Ordering::Equal)
1054        }
1055        (Some(Value::Integer(left)), Some(Value::Float(right))) => {
1056            (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1057        }
1058        (Some(Value::Float(left)), Some(Value::Integer(right))) => left
1059            .partial_cmp(&(*right as f64))
1060            .unwrap_or(Ordering::Equal),
1061        (Some(Value::UnsignedInteger(left)), Some(Value::Float(right))) => {
1062            (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1063        }
1064        (Some(Value::Float(left)), Some(Value::UnsignedInteger(right))) => left
1065            .partial_cmp(&(*right as f64))
1066            .unwrap_or(Ordering::Equal),
1067        (Some(Value::Integer(left)), Some(Value::UnsignedInteger(right))) => {
1068            (*left as i128).cmp(&(*right as i128))
1069        }
1070        (Some(Value::UnsignedInteger(left)), Some(Value::Integer(right))) => {
1071            (*left as i128).cmp(&(*right as i128))
1072        }
1073        (Some(Value::Timestamp(left)), Some(Value::Timestamp(right))) => left.cmp(right),
1074        (Some(Value::Text(left)), Some(Value::Text(right))) => left.cmp(right),
1075        (Some(Value::Boolean(left)), Some(Value::Boolean(right))) => left.cmp(right),
1076        (Some(left), Some(right)) => format!("{left:?}").cmp(&format!("{right:?}")),
1077    }
1078}
1079
1080// =============================================================================
1081// Conversion helpers for string -> enum
1082// =============================================================================
1083
1084fn parse_direction(s: &str) -> RedDBResult<RuntimeGraphDirection> {
1085    match s.to_lowercase().as_str() {
1086        "outgoing" | "out" => Ok(RuntimeGraphDirection::Outgoing),
1087        "incoming" | "in" => Ok(RuntimeGraphDirection::Incoming),
1088        "both" | "any" => Ok(RuntimeGraphDirection::Both),
1089        _ => Err(RedDBError::Query(format!(
1090            "unknown direction: '{s}', expected outgoing|incoming|both"
1091        ))),
1092    }
1093}
1094
1095fn parse_path_algorithm(s: &str) -> RedDBResult<RuntimeGraphPathAlgorithm> {
1096    match s.to_lowercase().as_str() {
1097        "bfs" => Ok(RuntimeGraphPathAlgorithm::Bfs),
1098        "dijkstra" => Ok(RuntimeGraphPathAlgorithm::Dijkstra),
1099        "astar" | "a*" => Ok(RuntimeGraphPathAlgorithm::AStar),
1100        "bellman_ford" | "bellmanford" => Ok(RuntimeGraphPathAlgorithm::BellmanFord),
1101        _ => Err(RedDBError::Query(format!(
1102            "unknown path algorithm: '{s}', expected bfs|dijkstra|astar|bellman_ford"
1103        ))),
1104    }
1105}
1106
1107fn parse_traversal_strategy(s: &str) -> RedDBResult<RuntimeGraphTraversalStrategy> {
1108    match s.to_lowercase().as_str() {
1109        "bfs" => Ok(RuntimeGraphTraversalStrategy::Bfs),
1110        "dfs" => Ok(RuntimeGraphTraversalStrategy::Dfs),
1111        _ => Err(RedDBError::Query(format!(
1112            "unknown traversal strategy: '{s}', expected bfs|dfs"
1113        ))),
1114    }
1115}
1116
1117fn parse_centrality_algorithm(s: &str) -> RedDBResult<RuntimeGraphCentralityAlgorithm> {
1118    match s.to_lowercase().as_str() {
1119        "degree" => Ok(RuntimeGraphCentralityAlgorithm::Degree),
1120        "closeness" => Ok(RuntimeGraphCentralityAlgorithm::Closeness),
1121        "betweenness" => Ok(RuntimeGraphCentralityAlgorithm::Betweenness),
1122        "eigenvector" => Ok(RuntimeGraphCentralityAlgorithm::Eigenvector),
1123        "pagerank" | "page_rank" => Ok(RuntimeGraphCentralityAlgorithm::PageRank),
1124        _ => Err(RedDBError::Query(format!(
1125            "unknown centrality algorithm: '{s}', expected degree|closeness|betweenness|eigenvector|pagerank"
1126        ))),
1127    }
1128}
1129
1130fn parse_community_algorithm(s: &str) -> RedDBResult<RuntimeGraphCommunityAlgorithm> {
1131    match s.to_lowercase().as_str() {
1132        "label_propagation" | "labelpropagation" => {
1133            Ok(RuntimeGraphCommunityAlgorithm::LabelPropagation)
1134        }
1135        "louvain" => Ok(RuntimeGraphCommunityAlgorithm::Louvain),
1136        _ => Err(RedDBError::Query(format!(
1137            "unknown community algorithm: '{s}', expected label_propagation|louvain"
1138        ))),
1139    }
1140}
1141
1142fn parse_components_mode(s: &str) -> RedDBResult<RuntimeGraphComponentsMode> {
1143    match s.to_lowercase().as_str() {
1144        "connected" => Ok(RuntimeGraphComponentsMode::Connected),
1145        "weak" | "weakly_connected" => Ok(RuntimeGraphComponentsMode::Weak),
1146        "strong" | "strongly_connected" => Ok(RuntimeGraphComponentsMode::Strong),
1147        _ => Err(RedDBError::Query(format!(
1148            "unknown components mode: '{s}', expected connected|weak|strong"
1149        ))),
1150    }
1151}
1152
1153/// Extract (latitude, longitude) from an entity.
1154///
1155/// Looks for GeoPoint values in the entity data (row columns or node properties)
1156/// or dedicated lat/lon fields. Returns degrees.
1157fn extract_geo_from_entity(entity: &UnifiedEntity) -> Option<(f64, f64)> {
1158    match &entity.data {
1159        EntityData::Row(row) => {
1160            // Search named columns for GeoPoint or lat/lon pairs
1161            if let Some(ref named) = row.named {
1162                // Direct GeoPoint value
1163                for value in named.values() {
1164                    if let Value::GeoPoint(lat_micro, lon_micro) = value {
1165                        return Some((
1166                            *lat_micro as f64 / 1_000_000.0,
1167                            *lon_micro as f64 / 1_000_000.0,
1168                        ));
1169                    }
1170                }
1171                // Try lat/lon or latitude/longitude named fields
1172                let lat =
1173                    named
1174                        .get("lat")
1175                        .or_else(|| named.get("latitude"))
1176                        .and_then(|v| match v {
1177                            Value::Float(f) => Some(*f),
1178                            Value::Integer(i) => Some(*i as f64),
1179                            _ => None,
1180                        });
1181                let lon = named
1182                    .get("lon")
1183                    .or_else(|| named.get("lng"))
1184                    .or_else(|| named.get("longitude"))
1185                    .and_then(|v| match v {
1186                        Value::Float(f) => Some(*f),
1187                        Value::Integer(i) => Some(*i as f64),
1188                        _ => None,
1189                    });
1190                if let (Some(la), Some(lo)) = (lat, lon) {
1191                    return Some((la, lo));
1192                }
1193            }
1194            // Search positional columns for GeoPoint
1195            for value in &row.columns {
1196                if let Value::GeoPoint(lat_micro, lon_micro) = value {
1197                    return Some((
1198                        *lat_micro as f64 / 1_000_000.0,
1199                        *lon_micro as f64 / 1_000_000.0,
1200                    ));
1201                }
1202            }
1203            None
1204        }
1205        EntityData::Node(node) => {
1206            // Search node properties
1207            for value in node.properties.values() {
1208                if let Value::GeoPoint(lat_micro, lon_micro) = value {
1209                    return Some((
1210                        *lat_micro as f64 / 1_000_000.0,
1211                        *lon_micro as f64 / 1_000_000.0,
1212                    ));
1213                }
1214            }
1215            let lat = node
1216                .properties
1217                .get("lat")
1218                .or_else(|| node.properties.get("latitude"))
1219                .and_then(|v| match v {
1220                    Value::Float(f) => Some(*f),
1221                    Value::Integer(i) => Some(*i as f64),
1222                    _ => None,
1223                });
1224            let lon = node
1225                .properties
1226                .get("lon")
1227                .or_else(|| node.properties.get("lng"))
1228                .or_else(|| node.properties.get("longitude"))
1229                .and_then(|v| match v {
1230                    Value::Float(f) => Some(*f),
1231                    Value::Integer(i) => Some(*i as f64),
1232                    _ => None,
1233                });
1234            if let (Some(la), Some(lo)) = (lat, lon) {
1235                return Some((la, lo));
1236            }
1237            None
1238        }
1239        _ => None,
1240    }
1241}