Skip to main content

reddb_server/runtime/
impl_graph_commands.rs

1//! Execution of GRAPH and SEARCH SQL-like commands.
2//!
3//! Maps parsed `GraphCommand` and `SearchCommand` AST nodes to the existing
4//! runtime graph analytics and search methods, returning results wrapped in
5//! `RuntimeQueryResult`.
6
7use super::*;
8use crate::storage::query::ast::GraphCommandOrderBy;
9use std::cmp::Ordering;
10
11impl RedDBRuntime {
12    /// Execute a GRAPH analytics command.
13    pub fn execute_graph_command(
14        &self,
15        raw_query: &str,
16        cmd: &GraphCommand,
17    ) -> RedDBResult<RuntimeQueryResult> {
18        match cmd {
19            GraphCommand::Neighborhood {
20                source,
21                depth,
22                direction,
23                edge_labels,
24            } => {
25                let dir = parse_direction(direction)?;
26                let res = self.graph_neighborhood(
27                    source,
28                    dir,
29                    *depth as usize,
30                    edge_labels.clone(),
31                    None,
32                )?;
33                let mut result = UnifiedResult::with_columns(vec![
34                    "node_id".into(),
35                    "label".into(),
36                    "node_type".into(),
37                    "depth".into(),
38                ]);
39                for visit in &res.nodes {
40                    let mut record = UnifiedRecord::new();
41                    record.set("node_id", Value::text(visit.node.id.clone()));
42                    record.set("label", Value::text(visit.node.label.clone()));
43                    record.set("node_type", Value::text(visit.node.node_type.clone()));
44                    record.set("depth", Value::Integer(visit.depth as i64));
45                    result.push(record);
46                }
47                Ok(RuntimeQueryResult {
48                    query: raw_query.to_string(),
49                    mode: QueryMode::Sql,
50                    statement: "graph_neighborhood",
51                    engine: "runtime-graph",
52                    result,
53                    affected_rows: 0,
54                    statement_type: "select",
55                })
56            }
57            GraphCommand::ShortestPath {
58                source,
59                target,
60                algorithm,
61                direction,
62                limit,
63                order_by,
64            } => {
65                let dir = parse_direction(direction)?;
66                let alg = parse_path_algorithm(algorithm)?;
67                let res = self.graph_shortest_path(source, target, dir, alg, None, None)?;
68                let mut result = UnifiedResult::with_columns(vec![
69                    "source".into(),
70                    "target".into(),
71                    "nodes_visited".into(),
72                    "negative_cycle_detected".into(),
73                    "path_found".into(),
74                    "hop_count".into(),
75                    "total_weight".into(),
76                ]);
77                let mut record = UnifiedRecord::new();
78                record.set("source", Value::text(res.source));
79                record.set("target", Value::text(res.target));
80                record.set("nodes_visited", Value::Integer(res.nodes_visited as i64));
81                record.set(
82                    "negative_cycle_detected",
83                    match res.negative_cycle_detected {
84                        Some(value) => Value::Boolean(value),
85                        None => Value::Null,
86                    },
87                );
88                if let Some(ref path) = res.path {
89                    record.set("path_found", Value::Boolean(true));
90                    record.set("hop_count", Value::Integer(path.hop_count as i64));
91                    record.set("total_weight", Value::Float(path.total_weight));
92                } else {
93                    record.set("path_found", Value::Boolean(false));
94                    record.set("hop_count", Value::Null);
95                    record.set("total_weight", Value::Null);
96                }
97                result.push(record);
98                apply_graph_order_and_limit(
99                    &mut result,
100                    "graph_shortest_path",
101                    order_by.as_ref(),
102                    limit.map(|n| n as usize),
103                )?;
104                Ok(RuntimeQueryResult {
105                    query: raw_query.to_string(),
106                    mode: QueryMode::Sql,
107                    statement: "graph_shortest_path",
108                    engine: "runtime-graph",
109                    result,
110                    affected_rows: 0,
111                    statement_type: "select",
112                })
113            }
114            GraphCommand::Properties { source } => {
115                if let Some(node_ref) = source {
116                    // Per-node property lookup (#423). Uses the same label
117                    // resolution as NEIGHBORHOOD/TRAVERSE so '<label>' and
118                    // '<numeric id>' both work.
119                    let graph =
120                        materialize_graph_with_projection(self.inner.db.store().as_ref(), None)?;
121                    let resolved = resolve_graph_node_id(&graph, node_ref)?;
122                    let stored = graph
123                        .get_node(&resolved)
124                        .ok_or_else(|| RedDBError::NotFound(node_ref.to_string()))?;
125                    let node_type = self
126                        .inner
127                        .db
128                        .store()
129                        .query_all(|entity| {
130                            entity.id.raw().to_string() == resolved
131                                && matches!(
132                                    entity.kind,
133                                    crate::storage::unified::EntityKind::GraphNode(_)
134                                )
135                        })
136                        .into_iter()
137                        .find_map(|(_, entity)| match entity.kind {
138                            crate::storage::unified::EntityKind::GraphNode(node) => {
139                                Some(node.node_type)
140                            }
141                            _ => None,
142                        })
143                        .unwrap_or_else(|| stored.node_type.clone());
144                    let all_props =
145                        materialize_graph_node_properties(self.inner.db.store().as_ref())?;
146                    let props = all_props.get(&resolved).cloned().unwrap_or_default();
147
148                    // Fixed columns first, then property keys in sorted order so
149                    // the schema is stable across snapshots / wire renders.
150                    let mut prop_keys: Vec<&String> = props.keys().collect();
151                    prop_keys.sort();
152                    let mut columns: Vec<String> = Vec::with_capacity(3 + prop_keys.len());
153                    columns.push("node_id".into());
154                    columns.push("label".into());
155                    columns.push("node_type".into());
156                    for k in &prop_keys {
157                        columns.push((*k).clone());
158                    }
159                    let mut result = UnifiedResult::with_columns(columns);
160                    let mut record = UnifiedRecord::new();
161                    record.set("node_id", Value::text(stored.id.clone()));
162                    record.set("label", Value::text(stored.label.clone()));
163                    record.set("node_type", Value::text(node_type));
164                    for k in &prop_keys {
165                        if let Some(v) = props.get(*k) {
166                            record.set(k.as_str(), v.clone());
167                        }
168                    }
169                    result.push(record);
170                    return Ok(RuntimeQueryResult {
171                        query: raw_query.to_string(),
172                        mode: QueryMode::Sql,
173                        statement: "graph_properties",
174                        engine: "runtime-graph",
175                        result,
176                        affected_rows: 0,
177                        statement_type: "select",
178                    });
179                }
180                let res = self.graph_properties(None)?;
181                let mut result = UnifiedResult::with_columns(vec![
182                    "node_count".into(),
183                    "edge_count".into(),
184                    "is_connected".into(),
185                    "is_complete".into(),
186                    "is_cyclic".into(),
187                    "density".into(),
188                ]);
189                let mut record = UnifiedRecord::new();
190                record.set("node_count", Value::Integer(res.node_count as i64));
191                record.set("edge_count", Value::Integer(res.edge_count as i64));
192                record.set("is_connected", Value::Boolean(res.is_connected));
193                record.set("is_complete", Value::Boolean(res.is_complete));
194                record.set("is_cyclic", Value::Boolean(res.is_cyclic));
195                record.set("density", Value::Float(res.density));
196                result.push(record);
197                Ok(RuntimeQueryResult {
198                    query: raw_query.to_string(),
199                    mode: QueryMode::Sql,
200                    statement: "graph_properties",
201                    engine: "runtime-graph",
202                    result,
203                    affected_rows: 0,
204                    statement_type: "select",
205                })
206            }
207            GraphCommand::Traverse {
208                source,
209                strategy,
210                depth,
211                direction,
212                edge_labels,
213            } => {
214                let dir = parse_direction(direction)?;
215                let strat = parse_traversal_strategy(strategy)?;
216                let res = self.graph_traverse(
217                    source,
218                    dir,
219                    *depth as usize,
220                    strat,
221                    edge_labels.clone(),
222                    None,
223                )?;
224                let mut result = UnifiedResult::with_columns(vec![
225                    "node_id".into(),
226                    "label".into(),
227                    "node_type".into(),
228                    "depth".into(),
229                ]);
230                for visit in &res.visits {
231                    let mut record = UnifiedRecord::new();
232                    record.set("node_id", Value::text(visit.node.id.clone()));
233                    record.set("label", Value::text(visit.node.label.clone()));
234                    record.set("node_type", Value::text(visit.node.node_type.clone()));
235                    record.set("depth", Value::Integer(visit.depth as i64));
236                    result.push(record);
237                }
238                Ok(RuntimeQueryResult {
239                    query: raw_query.to_string(),
240                    mode: QueryMode::Sql,
241                    statement: "graph_traverse",
242                    engine: "runtime-graph",
243                    result,
244                    affected_rows: 0,
245                    statement_type: "select",
246                })
247            }
248            GraphCommand::Centrality {
249                algorithm,
250                limit,
251                order_by,
252            } => {
253                let alg = parse_centrality_algorithm(algorithm)?;
254                // `limit = None` keeps historical implicit top-100 cap.
255                // `Some(0)` returns zero rows (standard SQL LIMIT 0 semantics).
256                let limit_usize = limit.map(|n| n as usize);
257                let order_needs_full_set = order_by
258                    .as_ref()
259                    .map(|order| order.ascending)
260                    .unwrap_or(false);
261                let top_k = if order_needs_full_set {
262                    usize::MAX
263                } else {
264                    limit_usize.unwrap_or(100).max(1)
265                };
266                let res = self.graph_centrality(alg, top_k, false, None, None, None, None)?;
267                let mut result = UnifiedResult::with_columns(vec![
268                    "node_id".into(),
269                    "label".into(),
270                    "score".into(),
271                ]);
272                for score in &res.scores {
273                    let mut record = UnifiedRecord::new();
274                    record.set("node_id", Value::text(score.node.id.clone()));
275                    record.set("label", Value::text(score.node.label.clone()));
276                    record.set("score", Value::Float(score.score));
277                    result.push(record);
278                }
279                for ds in &res.degree_scores {
280                    let mut record = UnifiedRecord::new();
281                    record.set("node_id", Value::text(ds.node.id.clone()));
282                    record.set("label", Value::text(ds.node.label.clone()));
283                    record.set("score", Value::Float(ds.total_degree as f64));
284                    result.push(record);
285                }
286                apply_graph_order_and_limit(
287                    &mut result,
288                    "graph_centrality",
289                    order_by.as_ref(),
290                    Some(limit_usize.unwrap_or(100)),
291                )?;
292                Ok(RuntimeQueryResult {
293                    query: raw_query.to_string(),
294                    mode: QueryMode::Sql,
295                    statement: "graph_centrality",
296                    engine: "runtime-graph",
297                    result,
298                    affected_rows: 0,
299                    statement_type: "select",
300                })
301            }
302            GraphCommand::Community {
303                algorithm,
304                max_iterations,
305                limit,
306                order_by,
307                return_assignments,
308            } => {
309                let alg = parse_community_algorithm(algorithm)?;
310                let res =
311                    self.graph_communities(alg, 1, Some(*max_iterations as usize), None, None)?;
312                let result = if *return_assignments {
313                    // Per-node node→community map (#660). Communities arrive
314                    // sorted (size desc, id asc); within each, sort nodes for a
315                    // deterministic row order. ORDER BY metrics target the
316                    // aggregate shape, so they don't apply here — only LIMIT
317                    // (row cap) is honoured.
318                    let mut result =
319                        UnifiedResult::with_columns(vec!["node_id".into(), "community_id".into()]);
320                    let row_cap = limit.map(|n| n as usize);
321                    'outer: for community in &res.communities {
322                        let mut nodes = community.nodes.clone();
323                        nodes.sort();
324                        for node_id in nodes {
325                            if row_cap.is_some_and(|cap| result.records.len() >= cap) {
326                                break 'outer;
327                            }
328                            let mut record = UnifiedRecord::new();
329                            record.set("node_id", Value::text(node_id));
330                            record.set("community_id", Value::text(community.id.clone()));
331                            result.push(record);
332                        }
333                    }
334                    result
335                } else {
336                    let mut result =
337                        UnifiedResult::with_columns(vec!["community_id".into(), "size".into()]);
338                    for community in &res.communities {
339                        let mut record = UnifiedRecord::new();
340                        record.set("community_id", Value::text(community.id.clone()));
341                        record.set("size", Value::Integer(community.size as i64));
342                        result.push(record);
343                    }
344                    apply_graph_order_and_limit(
345                        &mut result,
346                        "graph_community",
347                        order_by.as_ref(),
348                        limit.map(|n| n as usize),
349                    )?;
350                    result
351                };
352                Ok(RuntimeQueryResult {
353                    query: raw_query.to_string(),
354                    mode: QueryMode::Sql,
355                    statement: "graph_community",
356                    engine: "runtime-graph",
357                    result,
358                    affected_rows: 0,
359                    statement_type: "select",
360                })
361            }
362            GraphCommand::Components {
363                mode,
364                limit,
365                order_by,
366            } => {
367                let m = parse_components_mode(mode)?;
368                let res = self.graph_components(m, 1, None)?;
369                let mut result =
370                    UnifiedResult::with_columns(vec!["component_id".into(), "size".into()]);
371                for component in &res.components {
372                    let mut record = UnifiedRecord::new();
373                    record.set("component_id", Value::text(component.id.clone()));
374                    record.set("size", Value::Integer(component.size as i64));
375                    result.push(record);
376                }
377                apply_graph_order_and_limit(
378                    &mut result,
379                    "graph_components",
380                    order_by.as_ref(),
381                    limit.map(|n| n as usize),
382                )?;
383                Ok(RuntimeQueryResult {
384                    query: raw_query.to_string(),
385                    mode: QueryMode::Sql,
386                    statement: "graph_components",
387                    engine: "runtime-graph",
388                    result,
389                    affected_rows: 0,
390                    statement_type: "select",
391                })
392            }
393            GraphCommand::Cycles { max_length } => {
394                let res = self.graph_cycles(*max_length as usize, 100, None)?;
395                let mut result =
396                    UnifiedResult::with_columns(vec!["cycle_index".into(), "length".into()]);
397                for (i, cycle) in res.cycles.iter().enumerate() {
398                    let mut record = UnifiedRecord::new();
399                    record.set("cycle_index", Value::Integer(i as i64));
400                    record.set("length", Value::Integer(cycle.nodes.len() as i64));
401                    result.push(record);
402                }
403                Ok(RuntimeQueryResult {
404                    query: raw_query.to_string(),
405                    mode: QueryMode::Sql,
406                    statement: "graph_cycles",
407                    engine: "runtime-graph",
408                    result,
409                    affected_rows: 0,
410                    statement_type: "select",
411                })
412            }
413            GraphCommand::Clustering => {
414                let res = self.graph_clustering(100, true, None)?;
415                let mut result = UnifiedResult::with_columns(vec![
416                    "node_id".into(),
417                    "label".into(),
418                    "score".into(),
419                ]);
420                // First row: global coefficient
421                let mut global_record = UnifiedRecord::new();
422                global_record.set("node_id", Value::text("__global__"));
423                global_record.set("label", Value::text("global_clustering"));
424                global_record.set("score", Value::Float(res.global));
425                result.push(global_record);
426                for score in &res.local {
427                    let mut record = UnifiedRecord::new();
428                    record.set("node_id", Value::text(score.node.id.clone()));
429                    record.set("label", Value::text(score.node.label.clone()));
430                    record.set("score", Value::Float(score.score));
431                    result.push(record);
432                }
433                Ok(RuntimeQueryResult {
434                    query: raw_query.to_string(),
435                    mode: QueryMode::Sql,
436                    statement: "graph_clustering",
437                    engine: "runtime-graph",
438                    result,
439                    affected_rows: 0,
440                    statement_type: "select",
441                })
442            }
443            GraphCommand::TopologicalSort => {
444                let res = self.graph_topological_sort(None)?;
445                let mut result = UnifiedResult::with_columns(vec![
446                    "order".into(),
447                    "node_id".into(),
448                    "label".into(),
449                ]);
450                for (i, node) in res.ordered_nodes.iter().enumerate() {
451                    let mut record = UnifiedRecord::new();
452                    record.set("order", Value::Integer(i as i64));
453                    record.set("node_id", Value::text(node.id.clone()));
454                    record.set("label", Value::text(node.label.clone()));
455                    result.push(record);
456                }
457                Ok(RuntimeQueryResult {
458                    query: raw_query.to_string(),
459                    mode: QueryMode::Sql,
460                    statement: "graph_topological_sort",
461                    engine: "runtime-graph",
462                    result,
463                    affected_rows: 0,
464                    statement_type: "select",
465                })
466            }
467        }
468    }
469
470    /// Execute a SEARCH command.
471    pub fn execute_search_command(
472        &self,
473        raw_query: &str,
474        cmd: &SearchCommand,
475    ) -> RedDBResult<RuntimeQueryResult> {
476        match cmd {
477            SearchCommand::Similar {
478                vector,
479                text,
480                provider,
481                collection,
482                limit,
483                min_score,
484                vector_param,
485                limit_param,
486                min_score_param,
487                text_param,
488            } => {
489                if vector_param.is_some() {
490                    return Err(RedDBError::Query(
491                        "SEARCH SIMILAR $N vector parameter was not bound before execution"
492                            .to_string(),
493                    ));
494                }
495                if limit_param.is_some() {
496                    return Err(RedDBError::Query(
497                        "SEARCH SIMILAR LIMIT $N parameter was not bound before execution"
498                            .to_string(),
499                    ));
500                }
501                if min_score_param.is_some() {
502                    return Err(RedDBError::Query(
503                        "SEARCH SIMILAR MIN_SCORE $N parameter was not bound before execution"
504                            .to_string(),
505                    ));
506                }
507                if text_param.is_some() {
508                    return Err(RedDBError::Query(
509                        "SEARCH SIMILAR TEXT $N parameter was not bound before execution"
510                            .to_string(),
511                    ));
512                }
513                // If text provided, generate embedding first (semantic search)
514                let search_vector = if let Some(query_text) = text {
515                    let (default_provider, _) = crate::ai::resolve_defaults_from_runtime(self);
516                    let provider = match provider.as_deref() {
517                        Some(p) => crate::ai::parse_provider(p)?,
518                        None => default_provider,
519                    };
520                    let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
521                    let model = std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
522                        .ok()
523                        .unwrap_or_else(|| provider.default_embedding_model().to_string());
524                    let transport = crate::runtime::ai::transport::AiTransport::from_runtime(self);
525                    let request = crate::ai::OpenAiEmbeddingRequest {
526                        api_key,
527                        model,
528                        inputs: vec![query_text.clone()],
529                        dimensions: None,
530                        api_base: provider.resolve_api_base(),
531                    };
532                    let response = crate::runtime::ai::block_on_ai(async move {
533                        crate::ai::openai_embeddings_async(&transport, request).await
534                    })
535                    .and_then(|result| result)?;
536                    response.embeddings.into_iter().next().ok_or_else(|| {
537                        RedDBError::Query("embedding API returned no vectors".to_string())
538                    })?
539                } else {
540                    vector.clone()
541                };
542                // Issue #119: route through AuthorizedSearch so the
543                // candidate set is gated by `EffectiveScope.visible_collections`
544                // before any similarity score is computed.
545                let scope = self.ai_scope();
546                let results =
547                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
548                        crate::runtime::authorized_search::AuthorizedSearch::execute_similar(
549                            self,
550                            &scope,
551                            collection,
552                            &search_vector,
553                            *limit,
554                            *min_score,
555                        )?
556                    } else {
557                        // Embedded / no-auth caller: keep legacy behaviour.
558                        self.search_similar(collection, &search_vector, *limit, *min_score)?
559                    };
560                let mut result =
561                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
562                for sr in &results {
563                    let mut record = UnifiedRecord::new();
564                    record.set("entity_id", Value::UnsignedInteger(sr.entity_id.raw()));
565                    record.set("score", Value::Float(sr.score as f64));
566                    result.push(record);
567                }
568                Ok(RuntimeQueryResult {
569                    query: raw_query.to_string(),
570                    mode: QueryMode::Sql,
571                    statement: "search_similar",
572                    engine: "runtime-search",
573                    result,
574                    affected_rows: 0,
575                    statement_type: "select",
576                })
577            }
578            SearchCommand::Text {
579                query,
580                collection,
581                limit,
582                fuzzy,
583                limit_param,
584            } => {
585                if limit_param.is_some() {
586                    return Err(RedDBError::Query(
587                        "SEARCH TEXT LIMIT $N parameter was not bound before execution".to_string(),
588                    ));
589                }
590                let collections = collection.as_ref().map(|c| vec![c.clone()]);
591                // Issue #119: gate the candidate set by visible_collections.
592                let scope = self.ai_scope();
593                let res =
594                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
595                        crate::runtime::authorized_search::AuthorizedSearch::execute_text(
596                            self,
597                            &scope,
598                            query.clone(),
599                            collections,
600                            None,
601                            None,
602                            None,
603                            Some(*limit),
604                            *fuzzy,
605                        )?
606                    } else {
607                        self.search_text(
608                            query.clone(),
609                            collections,
610                            None,
611                            None,
612                            None,
613                            Some(*limit),
614                            *fuzzy,
615                        )?
616                    };
617                let mut result =
618                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
619                for item in &res.matches {
620                    let mut record = UnifiedRecord::new();
621                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
622                    record.set("score", Value::Float(item.score as f64));
623                    result.push(record);
624                }
625                Ok(RuntimeQueryResult {
626                    query: raw_query.to_string(),
627                    mode: QueryMode::Sql,
628                    statement: "search_text",
629                    engine: "runtime-search",
630                    result,
631                    affected_rows: 0,
632                    statement_type: "select",
633                })
634            }
635            SearchCommand::Hybrid {
636                vector,
637                query,
638                collection,
639                limit,
640                limit_param,
641            } => {
642                if limit_param.is_some() {
643                    return Err(RedDBError::Query(
644                        "SEARCH HYBRID LIMIT $N parameter was not bound before execution"
645                            .to_string(),
646                    ));
647                }
648                let res = self.search_hybrid(
649                    vector.clone(),
650                    query.clone(),
651                    Some(*limit),
652                    Some(vec![collection.clone()]),
653                    None,
654                    None,
655                    None,
656                    Vec::new(),
657                    None,
658                    None,
659                    Some(*limit),
660                )?;
661                let mut result =
662                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
663                for item in &res.matches {
664                    let mut record = UnifiedRecord::new();
665                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
666                    record.set("score", Value::Float(item.score as f64));
667                    result.push(record);
668                }
669                Ok(RuntimeQueryResult {
670                    query: raw_query.to_string(),
671                    mode: QueryMode::Sql,
672                    statement: "search_hybrid",
673                    engine: "runtime-search",
674                    result,
675                    affected_rows: 0,
676                    statement_type: "select",
677                })
678            }
679            SearchCommand::Multimodal {
680                query,
681                collection,
682                limit,
683                limit_param,
684            } => {
685                if limit_param.is_some() {
686                    return Err(RedDBError::Query(
687                        "SEARCH MULTIMODAL LIMIT $N parameter was not bound before execution"
688                            .to_string(),
689                    ));
690                }
691                let collections = collection.as_ref().map(|c| vec![c.clone()]);
692                let res =
693                    self.search_multimodal(query.clone(), collections, None, None, Some(*limit))?;
694                let mut result =
695                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
696                for item in &res.matches {
697                    let mut record = UnifiedRecord::new();
698                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
699                    record.set("score", Value::Float(item.score as f64));
700                    result.push(record);
701                }
702                Ok(RuntimeQueryResult {
703                    query: raw_query.to_string(),
704                    mode: QueryMode::Sql,
705                    statement: "search_multimodal",
706                    engine: "runtime-search",
707                    result,
708                    affected_rows: 0,
709                    statement_type: "select",
710                })
711            }
712            SearchCommand::Index {
713                index,
714                value,
715                collection,
716                limit,
717                exact,
718                limit_param,
719            } => {
720                if limit_param.is_some() {
721                    return Err(RedDBError::Query(
722                        "SEARCH INDEX LIMIT $N parameter was not bound before execution"
723                            .to_string(),
724                    ));
725                }
726                let collections = collection.as_ref().map(|c| vec![c.clone()]);
727                let res = self.search_index(
728                    index.clone(),
729                    value.clone(),
730                    *exact,
731                    collections,
732                    None,
733                    None,
734                    Some(*limit),
735                )?;
736                let mut result =
737                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
738                for item in &res.matches {
739                    let mut record = UnifiedRecord::new();
740                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
741                    record.set("score", Value::Float(item.score as f64));
742                    result.push(record);
743                }
744                Ok(RuntimeQueryResult {
745                    query: raw_query.to_string(),
746                    mode: QueryMode::Sql,
747                    statement: "search_index",
748                    engine: "runtime-search",
749                    result,
750                    affected_rows: 0,
751                    statement_type: "select",
752                })
753            }
754            SearchCommand::Context {
755                query,
756                field,
757                collection,
758                limit,
759                depth,
760                limit_param,
761            } => {
762                if limit_param.is_some() {
763                    return Err(RedDBError::Query(
764                        "SEARCH CONTEXT LIMIT $N parameter was not bound before execution"
765                            .to_string(),
766                    ));
767                }
768                use crate::application::SearchContextInput;
769                // Issue #119: route through AuthorizedSearch so the
770                // candidate set + every expansion bucket is bounded by
771                // `EffectiveScope.visible_collections`.
772                let input = SearchContextInput {
773                    query: query.clone(),
774                    field: field.clone(),
775                    vector: None,
776                    collections: collection.as_ref().map(|c| vec![c.clone()]),
777                    graph_depth: Some(*depth),
778                    graph_max_edges: None,
779                    max_cross_refs: None,
780                    follow_cross_refs: None,
781                    expand_graph: None,
782                    global_scan: None,
783                    reindex: None,
784                    limit: Some(*limit),
785                    min_score: None,
786                };
787                let scope = self.ai_scope();
788                let res =
789                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
790                        crate::runtime::authorized_search::AuthorizedSearch::execute_context(
791                            self, &scope, input,
792                        )?
793                    } else {
794                        self.search_context(input)?
795                    };
796                let mut result = UnifiedResult::with_columns(vec![
797                    "entity_id".into(),
798                    "collection".into(),
799                    "score".into(),
800                    "discovery".into(),
801                    "kind".into(),
802                ]);
803                let all_entities = res
804                    .tables
805                    .iter()
806                    .map(|e| (e, "table"))
807                    .chain(res.graph.nodes.iter().map(|e| (e, "graph_node")))
808                    .chain(res.graph.edges.iter().map(|e| (e, "graph_edge")))
809                    .chain(res.vectors.iter().map(|e| (e, "vector")))
810                    .chain(res.documents.iter().map(|e| (e, "document")))
811                    .chain(res.key_values.iter().map(|e| (e, "kv")));
812                for (entity, kind) in all_entities {
813                    let mut record = UnifiedRecord::new();
814                    record.set("entity_id", Value::UnsignedInteger(entity.entity.id.raw()));
815                    record.set("collection", Value::text(entity.collection.clone()));
816                    record.set("score", Value::Float(entity.score as f64));
817                    record.set("discovery", Value::text(format!("{:?}", entity.discovery)));
818                    record.set("kind", Value::text(kind.to_string()));
819                    result.push(record);
820                }
821                Ok(RuntimeQueryResult {
822                    query: raw_query.to_string(),
823                    mode: QueryMode::Sql,
824                    statement: "search_context",
825                    engine: "runtime-context",
826                    result,
827                    affected_rows: 0,
828                    statement_type: "select",
829                })
830            }
831            SearchCommand::SpatialRadius {
832                center_lat,
833                center_lon,
834                radius_km,
835                collection,
836                column,
837                limit,
838                limit_param,
839            } => {
840                if limit_param.is_some() {
841                    return Err(RedDBError::Query(
842                        "SEARCH SPATIAL RADIUS LIMIT $N parameter was not bound before execution"
843                            .to_string(),
844                    ));
845                }
846                use crate::storage::unified::spatial_index::haversine_km;
847                let _ = column; // Column indicates which field holds geo data
848                let store = self.inner.db.store();
849                let entities = store
850                    .get_collection(collection)
851                    .map(|m| m.query_all(|_| true))
852                    .unwrap_or_default();
853
854                let mut hits: Vec<(u64, f64)> = Vec::new();
855                for entity in &entities {
856                    // Extract lat/lon from GeoPoint values in entity data
857                    if let Some((lat, lon)) = extract_geo_from_entity(entity) {
858                        let dist = haversine_km(*center_lat, *center_lon, lat, lon);
859                        if dist <= *radius_km {
860                            hits.push((entity.id.raw(), dist));
861                        }
862                    }
863                }
864                hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
865                hits.truncate(*limit);
866
867                let mut result =
868                    UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
869                for (id, dist) in &hits {
870                    let mut record = UnifiedRecord::new();
871                    record.set("entity_id", Value::UnsignedInteger(*id));
872                    record.set("distance_km", Value::Float(*dist));
873                    result.push(record);
874                }
875                Ok(RuntimeQueryResult {
876                    query: raw_query.to_string(),
877                    mode: QueryMode::Sql,
878                    statement: "search_spatial_radius",
879                    engine: "runtime-spatial",
880                    result,
881                    affected_rows: 0,
882                    statement_type: "select",
883                })
884            }
885            SearchCommand::SpatialBbox {
886                min_lat,
887                min_lon,
888                max_lat,
889                max_lon,
890                collection,
891                column,
892                limit,
893                limit_param,
894            } => {
895                if limit_param.is_some() {
896                    return Err(RedDBError::Query(
897                        "SEARCH SPATIAL BBOX LIMIT $N parameter was not bound before execution"
898                            .to_string(),
899                    ));
900                }
901                let _ = column;
902                let store = self.inner.db.store();
903                let entities = store
904                    .get_collection(collection)
905                    .map(|m| m.query_all(|_| true))
906                    .unwrap_or_default();
907
908                let mut result = UnifiedResult::with_columns(vec!["entity_id".into()]);
909                let mut count = 0;
910                for entity in &entities {
911                    if count >= *limit {
912                        break;
913                    }
914                    if let Some((lat, lon)) = extract_geo_from_entity(entity) {
915                        if lat >= *min_lat && lat <= *max_lat && lon >= *min_lon && lon <= *max_lon
916                        {
917                            let mut record = UnifiedRecord::new();
918                            record.set("entity_id", Value::UnsignedInteger(entity.id.raw()));
919                            result.push(record);
920                            count += 1;
921                        }
922                    }
923                }
924                Ok(RuntimeQueryResult {
925                    query: raw_query.to_string(),
926                    mode: QueryMode::Sql,
927                    statement: "search_spatial_bbox",
928                    engine: "runtime-spatial",
929                    result,
930                    affected_rows: 0,
931                    statement_type: "select",
932                })
933            }
934            SearchCommand::SpatialNearest {
935                lat,
936                lon,
937                k,
938                collection,
939                column,
940                k_param,
941            } => {
942                if k_param.is_some() {
943                    return Err(RedDBError::Query(
944                        "SEARCH SPATIAL NEAREST K $N parameter was not bound before execution"
945                            .to_string(),
946                    ));
947                }
948                use crate::storage::unified::spatial_index::haversine_km;
949                let _ = column;
950                let store = self.inner.db.store();
951                let entities = store
952                    .get_collection(collection)
953                    .map(|m| m.query_all(|_| true))
954                    .unwrap_or_default();
955
956                let mut hits: Vec<(u64, f64)> = Vec::new();
957                for entity in &entities {
958                    if let Some((elat, elon)) = extract_geo_from_entity(entity) {
959                        let dist = haversine_km(*lat, *lon, elat, elon);
960                        hits.push((entity.id.raw(), dist));
961                    }
962                }
963                hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
964                hits.truncate(*k);
965
966                let mut result =
967                    UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
968                for (id, dist) in &hits {
969                    let mut record = UnifiedRecord::new();
970                    record.set("entity_id", Value::UnsignedInteger(*id));
971                    record.set("distance_km", Value::Float(*dist));
972                    result.push(record);
973                }
974                Ok(RuntimeQueryResult {
975                    query: raw_query.to_string(),
976                    mode: QueryMode::Sql,
977                    statement: "search_spatial_nearest",
978                    engine: "runtime-spatial",
979                    result,
980                    affected_rows: 0,
981                    statement_type: "select",
982                })
983            }
984        }
985    }
986}
987
988fn apply_graph_order_and_limit(
989    result: &mut UnifiedResult,
990    statement: &str,
991    order_by: Option<&GraphCommandOrderBy>,
992    limit: Option<usize>,
993) -> RedDBResult<()> {
994    if let Some(order) = order_by {
995        let column = graph_order_metric_column(statement, &order.metric)?;
996        let columns = result.columns.clone();
997        result.records.sort_by(|left, right| {
998            let cmp = compare_graph_values(left.get(column), right.get(column));
999            let cmp = if order.ascending { cmp } else { cmp.reverse() };
1000            if cmp == Ordering::Equal {
1001                compare_graph_rows(left, right, &columns)
1002            } else {
1003                cmp
1004            }
1005        });
1006    }
1007    if let Some(limit) = limit {
1008        result.records.truncate(limit);
1009    }
1010    Ok(())
1011}
1012
1013fn graph_order_metric_column(statement: &str, metric: &str) -> RedDBResult<&'static str> {
1014    let metric = metric.to_ascii_lowercase();
1015    match (statement, metric.as_str()) {
1016        ("graph_centrality", "score" | "centrality_score") => Ok("score"),
1017        ("graph_community", "size" | "community_size") => Ok("size"),
1018        ("graph_components", "size" | "component_size") => Ok("size"),
1019        ("graph_shortest_path", "hop_count" | "total_weight" | "nodes_visited") => {
1020            Ok(match metric.as_str() {
1021                "total_weight" => "total_weight",
1022                "nodes_visited" => "nodes_visited",
1023                _ => "hop_count",
1024            })
1025        }
1026        _ => Err(RedDBError::Query(format!(
1027            "unsupported ORDER BY metric '{metric}' for GRAPH {}",
1028            statement.trim_start_matches("graph_")
1029        ))),
1030    }
1031}
1032
1033fn compare_graph_rows(left: &UnifiedRecord, right: &UnifiedRecord, columns: &[String]) -> Ordering {
1034    for column in columns {
1035        let cmp = compare_graph_values(left.get(column), right.get(column));
1036        if cmp != Ordering::Equal {
1037            return cmp;
1038        }
1039    }
1040    Ordering::Equal
1041}
1042
1043fn compare_graph_values(left: Option<&Value>, right: Option<&Value>) -> Ordering {
1044    match (left, right) {
1045        (None, None) => Ordering::Equal,
1046        (None, Some(_)) => Ordering::Less,
1047        (Some(_), None) => Ordering::Greater,
1048        (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
1049        (Some(Value::Null), Some(_)) => Ordering::Less,
1050        (Some(_), Some(Value::Null)) => Ordering::Greater,
1051        (Some(Value::Integer(left)), Some(Value::Integer(right))) => left.cmp(right),
1052        (Some(Value::UnsignedInteger(left)), Some(Value::UnsignedInteger(right))) => {
1053            left.cmp(right)
1054        }
1055        (Some(Value::Float(left)), Some(Value::Float(right))) => {
1056            left.partial_cmp(right).unwrap_or(Ordering::Equal)
1057        }
1058        (Some(Value::Integer(left)), Some(Value::Float(right))) => {
1059            (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1060        }
1061        (Some(Value::Float(left)), Some(Value::Integer(right))) => left
1062            .partial_cmp(&(*right as f64))
1063            .unwrap_or(Ordering::Equal),
1064        (Some(Value::UnsignedInteger(left)), Some(Value::Float(right))) => {
1065            (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1066        }
1067        (Some(Value::Float(left)), Some(Value::UnsignedInteger(right))) => left
1068            .partial_cmp(&(*right as f64))
1069            .unwrap_or(Ordering::Equal),
1070        (Some(Value::Integer(left)), Some(Value::UnsignedInteger(right))) => {
1071            (*left as i128).cmp(&(*right as i128))
1072        }
1073        (Some(Value::UnsignedInteger(left)), Some(Value::Integer(right))) => {
1074            (*left as i128).cmp(&(*right as i128))
1075        }
1076        (Some(Value::Timestamp(left)), Some(Value::Timestamp(right))) => left.cmp(right),
1077        (Some(Value::Text(left)), Some(Value::Text(right))) => left.cmp(right),
1078        (Some(Value::Boolean(left)), Some(Value::Boolean(right))) => left.cmp(right),
1079        (Some(left), Some(right)) => format!("{left:?}").cmp(&format!("{right:?}")),
1080    }
1081}
1082
1083// =============================================================================
1084// Conversion helpers for string -> enum
1085// =============================================================================
1086
1087fn parse_direction(s: &str) -> RedDBResult<RuntimeGraphDirection> {
1088    match s.to_lowercase().as_str() {
1089        "outgoing" | "out" => Ok(RuntimeGraphDirection::Outgoing),
1090        "incoming" | "in" => Ok(RuntimeGraphDirection::Incoming),
1091        "both" | "any" => Ok(RuntimeGraphDirection::Both),
1092        _ => Err(RedDBError::Query(format!(
1093            "unknown direction: '{s}', expected outgoing|incoming|both"
1094        ))),
1095    }
1096}
1097
1098fn parse_path_algorithm(s: &str) -> RedDBResult<RuntimeGraphPathAlgorithm> {
1099    match s.to_lowercase().as_str() {
1100        "bfs" => Ok(RuntimeGraphPathAlgorithm::Bfs),
1101        "dijkstra" => Ok(RuntimeGraphPathAlgorithm::Dijkstra),
1102        "astar" | "a*" => Ok(RuntimeGraphPathAlgorithm::AStar),
1103        "bellman_ford" | "bellmanford" => Ok(RuntimeGraphPathAlgorithm::BellmanFord),
1104        _ => Err(RedDBError::Query(format!(
1105            "unknown path algorithm: '{s}', expected bfs|dijkstra|astar|bellman_ford"
1106        ))),
1107    }
1108}
1109
1110fn parse_traversal_strategy(s: &str) -> RedDBResult<RuntimeGraphTraversalStrategy> {
1111    match s.to_lowercase().as_str() {
1112        "bfs" => Ok(RuntimeGraphTraversalStrategy::Bfs),
1113        "dfs" => Ok(RuntimeGraphTraversalStrategy::Dfs),
1114        _ => Err(RedDBError::Query(format!(
1115            "unknown traversal strategy: '{s}', expected bfs|dfs"
1116        ))),
1117    }
1118}
1119
1120fn parse_centrality_algorithm(s: &str) -> RedDBResult<RuntimeGraphCentralityAlgorithm> {
1121    match s.to_lowercase().as_str() {
1122        "degree" => Ok(RuntimeGraphCentralityAlgorithm::Degree),
1123        "closeness" => Ok(RuntimeGraphCentralityAlgorithm::Closeness),
1124        "betweenness" => Ok(RuntimeGraphCentralityAlgorithm::Betweenness),
1125        "eigenvector" => Ok(RuntimeGraphCentralityAlgorithm::Eigenvector),
1126        "pagerank" | "page_rank" => Ok(RuntimeGraphCentralityAlgorithm::PageRank),
1127        _ => Err(RedDBError::Query(format!(
1128            "unknown centrality algorithm: '{s}', expected degree|closeness|betweenness|eigenvector|pagerank"
1129        ))),
1130    }
1131}
1132
1133fn parse_community_algorithm(s: &str) -> RedDBResult<RuntimeGraphCommunityAlgorithm> {
1134    match s.to_lowercase().as_str() {
1135        "label_propagation" | "labelpropagation" => {
1136            Ok(RuntimeGraphCommunityAlgorithm::LabelPropagation)
1137        }
1138        "louvain" => Ok(RuntimeGraphCommunityAlgorithm::Louvain),
1139        _ => Err(RedDBError::Query(format!(
1140            "unknown community algorithm: '{s}', expected label_propagation|louvain"
1141        ))),
1142    }
1143}
1144
1145fn parse_components_mode(s: &str) -> RedDBResult<RuntimeGraphComponentsMode> {
1146    match s.to_lowercase().as_str() {
1147        "connected" => Ok(RuntimeGraphComponentsMode::Connected),
1148        "weak" | "weakly_connected" => Ok(RuntimeGraphComponentsMode::Weak),
1149        "strong" | "strongly_connected" => Ok(RuntimeGraphComponentsMode::Strong),
1150        _ => Err(RedDBError::Query(format!(
1151            "unknown components mode: '{s}', expected connected|weak|strong"
1152        ))),
1153    }
1154}
1155
1156/// Extract (latitude, longitude) from an entity.
1157///
1158/// Looks for GeoPoint values in the entity data (row columns or node properties)
1159/// or dedicated lat/lon fields. Returns degrees.
1160fn extract_geo_from_entity(entity: &UnifiedEntity) -> Option<(f64, f64)> {
1161    match &entity.data {
1162        EntityData::Row(row) => {
1163            // Search named columns for GeoPoint or lat/lon pairs
1164            if let Some(ref named) = row.named {
1165                // Direct GeoPoint value
1166                for value in named.values() {
1167                    if let Value::GeoPoint(lat_micro, lon_micro) = value {
1168                        return Some((
1169                            *lat_micro as f64 / 1_000_000.0,
1170                            *lon_micro as f64 / 1_000_000.0,
1171                        ));
1172                    }
1173                }
1174                // Try lat/lon or latitude/longitude named fields
1175                let lat =
1176                    named
1177                        .get("lat")
1178                        .or_else(|| named.get("latitude"))
1179                        .and_then(|v| match v {
1180                            Value::Float(f) => Some(*f),
1181                            Value::Integer(i) => Some(*i as f64),
1182                            _ => None,
1183                        });
1184                let lon = named
1185                    .get("lon")
1186                    .or_else(|| named.get("lng"))
1187                    .or_else(|| named.get("longitude"))
1188                    .and_then(|v| match v {
1189                        Value::Float(f) => Some(*f),
1190                        Value::Integer(i) => Some(*i as f64),
1191                        _ => None,
1192                    });
1193                if let (Some(la), Some(lo)) = (lat, lon) {
1194                    return Some((la, lo));
1195                }
1196            }
1197            // Search positional columns for GeoPoint
1198            for value in &row.columns {
1199                if let Value::GeoPoint(lat_micro, lon_micro) = value {
1200                    return Some((
1201                        *lat_micro as f64 / 1_000_000.0,
1202                        *lon_micro as f64 / 1_000_000.0,
1203                    ));
1204                }
1205            }
1206            None
1207        }
1208        EntityData::Node(node) => {
1209            // Search node properties
1210            for value in node.properties.values() {
1211                if let Value::GeoPoint(lat_micro, lon_micro) = value {
1212                    return Some((
1213                        *lat_micro as f64 / 1_000_000.0,
1214                        *lon_micro as f64 / 1_000_000.0,
1215                    ));
1216                }
1217            }
1218            let lat = node
1219                .properties
1220                .get("lat")
1221                .or_else(|| node.properties.get("latitude"))
1222                .and_then(|v| match v {
1223                    Value::Float(f) => Some(*f),
1224                    Value::Integer(i) => Some(*i as f64),
1225                    _ => None,
1226                });
1227            let lon = node
1228                .properties
1229                .get("lon")
1230                .or_else(|| node.properties.get("lng"))
1231                .or_else(|| node.properties.get("longitude"))
1232                .and_then(|v| match v {
1233                    Value::Float(f) => Some(*f),
1234                    Value::Integer(i) => Some(*i as f64),
1235                    _ => None,
1236                });
1237            if let (Some(la), Some(lo)) = (lat, lon) {
1238                return Some((la, lo));
1239            }
1240            None
1241        }
1242        _ => None,
1243    }
1244}