Skip to main content

reddb_server/runtime/
impl_graph_commands.rs

1//! Execution of GRAPH and SEARCH SQL-like commands.
2//!
3//! Maps parsed `GraphCommand` and `SearchCommand` AST nodes to the existing
4//! runtime graph analytics and search methods, returning results wrapped in
5//! `RuntimeQueryResult`.
6
7use super::*;
8use crate::storage::query::ast::GraphCommandOrderBy;
9use std::cmp::Ordering;
10
11impl RedDBRuntime {
12    /// Execute a GRAPH analytics command.
13    pub fn execute_graph_command(
14        &self,
15        raw_query: &str,
16        cmd: &GraphCommand,
17    ) -> RedDBResult<RuntimeQueryResult> {
18        match cmd {
19            GraphCommand::Neighborhood {
20                source,
21                depth,
22                direction,
23                edge_labels,
24            } => {
25                let dir = parse_direction(direction)?;
26                let res = self.graph_neighborhood(
27                    source,
28                    dir,
29                    *depth as usize,
30                    edge_labels.clone(),
31                    None,
32                )?;
33                let mut result = UnifiedResult::with_columns(vec![
34                    "node_id".into(),
35                    "label".into(),
36                    "node_type".into(),
37                    "depth".into(),
38                ]);
39                for visit in &res.nodes {
40                    let mut record = UnifiedRecord::new();
41                    record.set("node_id", Value::text(visit.node.id.clone()));
42                    record.set("label", Value::text(visit.node.label.clone()));
43                    record.set("node_type", Value::text(visit.node.node_type.clone()));
44                    record.set("depth", Value::Integer(visit.depth as i64));
45                    result.push(record);
46                }
47                Ok(RuntimeQueryResult {
48                    query: raw_query.to_string(),
49                    mode: QueryMode::Sql,
50                    statement: "graph_neighborhood",
51                    engine: "runtime-graph",
52                    result,
53                    affected_rows: 0,
54                    statement_type: "select",
55                    bookmark: None,
56                })
57            }
58            GraphCommand::ShortestPath {
59                source,
60                target,
61                algorithm,
62                direction,
63                limit,
64                order_by,
65            } => {
66                let dir = parse_direction(direction)?;
67                let alg = parse_path_algorithm(algorithm)?;
68                let res = self.graph_shortest_path(source, target, dir, alg, None, None)?;
69                let mut result = UnifiedResult::with_columns(vec![
70                    "source".into(),
71                    "target".into(),
72                    "nodes_visited".into(),
73                    "negative_cycle_detected".into(),
74                    "path_found".into(),
75                    "hop_count".into(),
76                    "total_weight".into(),
77                ]);
78                let mut record = UnifiedRecord::new();
79                record.set("source", Value::text(res.source));
80                record.set("target", Value::text(res.target));
81                record.set("nodes_visited", Value::Integer(res.nodes_visited as i64));
82                record.set(
83                    "negative_cycle_detected",
84                    match res.negative_cycle_detected {
85                        Some(value) => Value::Boolean(value),
86                        None => Value::Null,
87                    },
88                );
89                if let Some(ref path) = res.path {
90                    record.set("path_found", Value::Boolean(true));
91                    record.set("hop_count", Value::Integer(path.hop_count as i64));
92                    record.set("total_weight", Value::Float(path.total_weight));
93                } else {
94                    record.set("path_found", Value::Boolean(false));
95                    record.set("hop_count", Value::Null);
96                    record.set("total_weight", Value::Null);
97                }
98                result.push(record);
99                apply_graph_order_and_limit(
100                    &mut result,
101                    "graph_shortest_path",
102                    order_by.as_ref(),
103                    limit.map(|n| n as usize),
104                )?;
105                Ok(RuntimeQueryResult {
106                    query: raw_query.to_string(),
107                    mode: QueryMode::Sql,
108                    statement: "graph_shortest_path",
109                    engine: "runtime-graph",
110                    result,
111                    affected_rows: 0,
112                    statement_type: "select",
113                    bookmark: None,
114                })
115            }
116            GraphCommand::Properties { source } => {
117                if let Some(node_ref) = source {
118                    // Per-node property lookup (#423). Uses the same label
119                    // resolution as NEIGHBORHOOD/TRAVERSE so '<label>' and
120                    // '<numeric id>' both work.
121                    let graph =
122                        materialize_graph_with_projection(self.inner.db.store().as_ref(), None)?;
123                    let resolved = resolve_graph_node_id(&graph, node_ref)?;
124                    let stored = graph
125                        .get_node(&resolved)
126                        .ok_or_else(|| RedDBError::NotFound(node_ref.to_string()))?;
127                    let node_type = self
128                        .inner
129                        .db
130                        .store()
131                        .query_all(|entity| {
132                            entity.id.raw().to_string() == resolved
133                                && matches!(
134                                    entity.kind,
135                                    crate::storage::unified::EntityKind::GraphNode(_)
136                                )
137                        })
138                        .into_iter()
139                        .find_map(|(_, entity)| match entity.kind {
140                            crate::storage::unified::EntityKind::GraphNode(node) => {
141                                Some(node.node_type)
142                            }
143                            _ => None,
144                        })
145                        .unwrap_or_else(|| stored.node_type.clone());
146                    let all_props =
147                        materialize_graph_node_properties(self.inner.db.store().as_ref())?;
148                    let props = all_props.get(&resolved).cloned().unwrap_or_default();
149
150                    // Fixed columns first, then property keys in sorted order so
151                    // the schema is stable across snapshots / wire renders.
152                    let mut prop_keys: Vec<&String> = props.keys().collect();
153                    prop_keys.sort();
154                    let mut columns: Vec<String> = Vec::with_capacity(3 + prop_keys.len());
155                    columns.push("node_id".into());
156                    columns.push("label".into());
157                    columns.push("node_type".into());
158                    for k in &prop_keys {
159                        columns.push((*k).clone());
160                    }
161                    let mut result = UnifiedResult::with_columns(columns);
162                    let mut record = UnifiedRecord::new();
163                    record.set("node_id", Value::text(stored.id.clone()));
164                    record.set("label", Value::text(stored.label.clone()));
165                    record.set("node_type", Value::text(node_type));
166                    for k in &prop_keys {
167                        if let Some(v) = props.get(*k) {
168                            record.set(k.as_str(), v.clone());
169                        }
170                    }
171                    result.push(record);
172                    return Ok(RuntimeQueryResult {
173                        query: raw_query.to_string(),
174                        mode: QueryMode::Sql,
175                        statement: "graph_properties",
176                        engine: "runtime-graph",
177                        result,
178                        affected_rows: 0,
179                        statement_type: "select",
180                        bookmark: None,
181                    });
182                }
183                let res = self.graph_properties(None)?;
184                let mut result = UnifiedResult::with_columns(vec![
185                    "node_count".into(),
186                    "edge_count".into(),
187                    "is_connected".into(),
188                    "is_complete".into(),
189                    "is_cyclic".into(),
190                    "density".into(),
191                ]);
192                let mut record = UnifiedRecord::new();
193                record.set("node_count", Value::Integer(res.node_count as i64));
194                record.set("edge_count", Value::Integer(res.edge_count as i64));
195                record.set("is_connected", Value::Boolean(res.is_connected));
196                record.set("is_complete", Value::Boolean(res.is_complete));
197                record.set("is_cyclic", Value::Boolean(res.is_cyclic));
198                record.set("density", Value::Float(res.density));
199                result.push(record);
200                Ok(RuntimeQueryResult {
201                    query: raw_query.to_string(),
202                    mode: QueryMode::Sql,
203                    statement: "graph_properties",
204                    engine: "runtime-graph",
205                    result,
206                    affected_rows: 0,
207                    statement_type: "select",
208                    bookmark: None,
209                })
210            }
211            GraphCommand::Traverse {
212                source,
213                strategy,
214                depth,
215                direction,
216                edge_labels,
217            } => {
218                let dir = parse_direction(direction)?;
219                let strat = parse_traversal_strategy(strategy)?;
220                let res = self.graph_traverse(
221                    source,
222                    dir,
223                    *depth as usize,
224                    strat,
225                    edge_labels.clone(),
226                    None,
227                )?;
228                let mut result = UnifiedResult::with_columns(vec![
229                    "node_id".into(),
230                    "label".into(),
231                    "node_type".into(),
232                    "depth".into(),
233                ]);
234                for visit in &res.visits {
235                    let mut record = UnifiedRecord::new();
236                    record.set("node_id", Value::text(visit.node.id.clone()));
237                    record.set("label", Value::text(visit.node.label.clone()));
238                    record.set("node_type", Value::text(visit.node.node_type.clone()));
239                    record.set("depth", Value::Integer(visit.depth as i64));
240                    result.push(record);
241                }
242                Ok(RuntimeQueryResult {
243                    query: raw_query.to_string(),
244                    mode: QueryMode::Sql,
245                    statement: "graph_traverse",
246                    engine: "runtime-graph",
247                    result,
248                    affected_rows: 0,
249                    statement_type: "select",
250                    bookmark: None,
251                })
252            }
253            GraphCommand::Centrality {
254                algorithm,
255                limit,
256                order_by,
257            } => {
258                let alg = parse_centrality_algorithm(algorithm)?;
259                // `limit = None` keeps historical implicit top-100 cap.
260                // `Some(0)` returns zero rows (standard SQL LIMIT 0 semantics).
261                let limit_usize = limit.map(|n| n as usize);
262                let order_needs_full_set = order_by
263                    .as_ref()
264                    .map(|order| order.ascending)
265                    .unwrap_or(false);
266                let top_k = if order_needs_full_set {
267                    usize::MAX
268                } else {
269                    limit_usize.unwrap_or(100).max(1)
270                };
271                let res = self.graph_centrality(alg, top_k, false, None, None, None, None)?;
272                let mut result = UnifiedResult::with_columns(vec![
273                    "node_id".into(),
274                    "label".into(),
275                    "score".into(),
276                ]);
277                for score in &res.scores {
278                    let mut record = UnifiedRecord::new();
279                    record.set("node_id", Value::text(score.node.id.clone()));
280                    record.set("label", Value::text(score.node.label.clone()));
281                    record.set("score", Value::Float(score.score));
282                    result.push(record);
283                }
284                for ds in &res.degree_scores {
285                    let mut record = UnifiedRecord::new();
286                    record.set("node_id", Value::text(ds.node.id.clone()));
287                    record.set("label", Value::text(ds.node.label.clone()));
288                    record.set("score", Value::Float(ds.total_degree as f64));
289                    result.push(record);
290                }
291                apply_graph_order_and_limit(
292                    &mut result,
293                    "graph_centrality",
294                    order_by.as_ref(),
295                    Some(limit_usize.unwrap_or(100)),
296                )?;
297                Ok(RuntimeQueryResult {
298                    query: raw_query.to_string(),
299                    mode: QueryMode::Sql,
300                    statement: "graph_centrality",
301                    engine: "runtime-graph",
302                    result,
303                    affected_rows: 0,
304                    statement_type: "select",
305                    bookmark: None,
306                })
307            }
308            GraphCommand::Community {
309                algorithm,
310                max_iterations,
311                limit,
312                order_by,
313                return_assignments,
314            } => {
315                let alg = parse_community_algorithm(algorithm)?;
316                let res =
317                    self.graph_communities(alg, 1, Some(*max_iterations as usize), None, None)?;
318                let result = if *return_assignments {
319                    // Per-node node→community map (#660). Communities arrive
320                    // sorted (size desc, id asc); within each, sort nodes for a
321                    // deterministic row order. ORDER BY metrics target the
322                    // aggregate shape, so they don't apply here — only LIMIT
323                    // (row cap) is honoured.
324                    let mut result =
325                        UnifiedResult::with_columns(vec!["node_id".into(), "community_id".into()]);
326                    let row_cap = limit.map(|n| n as usize);
327                    'outer: for community in &res.communities {
328                        let mut nodes = community.nodes.clone();
329                        nodes.sort();
330                        for node_id in nodes {
331                            if row_cap.is_some_and(|cap| result.records.len() >= cap) {
332                                break 'outer;
333                            }
334                            let mut record = UnifiedRecord::new();
335                            record.set("node_id", Value::text(node_id));
336                            record.set("community_id", Value::text(community.id.clone()));
337                            result.push(record);
338                        }
339                    }
340                    result
341                } else {
342                    let mut result =
343                        UnifiedResult::with_columns(vec!["community_id".into(), "size".into()]);
344                    for community in &res.communities {
345                        let mut record = UnifiedRecord::new();
346                        record.set("community_id", Value::text(community.id.clone()));
347                        record.set("size", Value::Integer(community.size as i64));
348                        result.push(record);
349                    }
350                    apply_graph_order_and_limit(
351                        &mut result,
352                        "graph_community",
353                        order_by.as_ref(),
354                        limit.map(|n| n as usize),
355                    )?;
356                    result
357                };
358                Ok(RuntimeQueryResult {
359                    query: raw_query.to_string(),
360                    mode: QueryMode::Sql,
361                    statement: "graph_community",
362                    engine: "runtime-graph",
363                    result,
364                    affected_rows: 0,
365                    statement_type: "select",
366                    bookmark: None,
367                })
368            }
369            GraphCommand::Components {
370                mode,
371                limit,
372                order_by,
373            } => {
374                let m = parse_components_mode(mode)?;
375                let res = self.graph_components(m, 1, None)?;
376                let mut result =
377                    UnifiedResult::with_columns(vec!["component_id".into(), "size".into()]);
378                for component in &res.components {
379                    let mut record = UnifiedRecord::new();
380                    record.set("component_id", Value::text(component.id.clone()));
381                    record.set("size", Value::Integer(component.size as i64));
382                    result.push(record);
383                }
384                apply_graph_order_and_limit(
385                    &mut result,
386                    "graph_components",
387                    order_by.as_ref(),
388                    limit.map(|n| n as usize),
389                )?;
390                Ok(RuntimeQueryResult {
391                    query: raw_query.to_string(),
392                    mode: QueryMode::Sql,
393                    statement: "graph_components",
394                    engine: "runtime-graph",
395                    result,
396                    affected_rows: 0,
397                    statement_type: "select",
398                    bookmark: None,
399                })
400            }
401            GraphCommand::Cycles { max_length } => {
402                let res = self.graph_cycles(*max_length as usize, 100, None)?;
403                let mut result =
404                    UnifiedResult::with_columns(vec!["cycle_index".into(), "length".into()]);
405                for (i, cycle) in res.cycles.iter().enumerate() {
406                    let mut record = UnifiedRecord::new();
407                    record.set("cycle_index", Value::Integer(i as i64));
408                    record.set("length", Value::Integer(cycle.nodes.len() as i64));
409                    result.push(record);
410                }
411                Ok(RuntimeQueryResult {
412                    query: raw_query.to_string(),
413                    mode: QueryMode::Sql,
414                    statement: "graph_cycles",
415                    engine: "runtime-graph",
416                    result,
417                    affected_rows: 0,
418                    statement_type: "select",
419                    bookmark: None,
420                })
421            }
422            GraphCommand::Clustering => {
423                let res = self.graph_clustering(100, true, None)?;
424                let mut result = UnifiedResult::with_columns(vec![
425                    "node_id".into(),
426                    "label".into(),
427                    "score".into(),
428                ]);
429                // First row: global coefficient
430                let mut global_record = UnifiedRecord::new();
431                global_record.set("node_id", Value::text("__global__"));
432                global_record.set("label", Value::text("global_clustering"));
433                global_record.set("score", Value::Float(res.global));
434                result.push(global_record);
435                for score in &res.local {
436                    let mut record = UnifiedRecord::new();
437                    record.set("node_id", Value::text(score.node.id.clone()));
438                    record.set("label", Value::text(score.node.label.clone()));
439                    record.set("score", Value::Float(score.score));
440                    result.push(record);
441                }
442                Ok(RuntimeQueryResult {
443                    query: raw_query.to_string(),
444                    mode: QueryMode::Sql,
445                    statement: "graph_clustering",
446                    engine: "runtime-graph",
447                    result,
448                    affected_rows: 0,
449                    statement_type: "select",
450                    bookmark: None,
451                })
452            }
453            GraphCommand::TopologicalSort => {
454                let res = self.graph_topological_sort(None)?;
455                let mut result = UnifiedResult::with_columns(vec![
456                    "order".into(),
457                    "node_id".into(),
458                    "label".into(),
459                ]);
460                for (i, node) in res.ordered_nodes.iter().enumerate() {
461                    let mut record = UnifiedRecord::new();
462                    record.set("order", Value::Integer(i as i64));
463                    record.set("node_id", Value::text(node.id.clone()));
464                    record.set("label", Value::text(node.label.clone()));
465                    result.push(record);
466                }
467                Ok(RuntimeQueryResult {
468                    query: raw_query.to_string(),
469                    mode: QueryMode::Sql,
470                    statement: "graph_topological_sort",
471                    engine: "runtime-graph",
472                    result,
473                    affected_rows: 0,
474                    statement_type: "select",
475                    bookmark: None,
476                })
477            }
478        }
479    }
480
481    /// Execute a SEARCH command.
482    pub fn execute_search_command(
483        &self,
484        raw_query: &str,
485        cmd: &SearchCommand,
486    ) -> RedDBResult<RuntimeQueryResult> {
487        match cmd {
488            SearchCommand::Similar {
489                vector,
490                text,
491                provider,
492                collection,
493                limit,
494                min_score,
495                vector_param,
496                limit_param,
497                min_score_param,
498                text_param,
499            } => {
500                if vector_param.is_some() {
501                    return Err(RedDBError::Query(
502                        "SEARCH SIMILAR $N vector parameter was not bound before execution"
503                            .to_string(),
504                    ));
505                }
506                if limit_param.is_some() {
507                    return Err(RedDBError::Query(
508                        "SEARCH SIMILAR LIMIT $N parameter was not bound before execution"
509                            .to_string(),
510                    ));
511                }
512                if min_score_param.is_some() {
513                    return Err(RedDBError::Query(
514                        "SEARCH SIMILAR MIN_SCORE $N parameter was not bound before execution"
515                            .to_string(),
516                    ));
517                }
518                if text_param.is_some() {
519                    return Err(RedDBError::Query(
520                        "SEARCH SIMILAR TEXT $N parameter was not bound before execution"
521                            .to_string(),
522                    ));
523                }
524                // If text provided, generate embedding first (semantic search)
525                let search_vector = if let Some(query_text) = text {
526                    let (default_provider, _) = crate::ai::resolve_defaults_from_runtime(self);
527                    let provider = match provider.as_deref() {
528                        Some(p) => crate::ai::parse_provider(p)?,
529                        None => default_provider,
530                    };
531                    // S3 / #711: planner-level provider gate. Runs
532                    // before the credential resolver so the resolver
533                    // audit event is not emitted when policy denies.
534                    crate::runtime::ai::provider_gate::enforce(self, &provider)?;
535                    let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
536                    let model = std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
537                        .ok()
538                        .unwrap_or_else(|| provider.default_embedding_model().to_string());
539                    let transport = crate::runtime::ai::transport::AiTransport::from_runtime(self);
540                    let request = crate::ai::OpenAiEmbeddingRequest {
541                        api_key,
542                        model,
543                        inputs: vec![query_text.clone()],
544                        dimensions: None,
545                        api_base: provider.resolve_api_base(),
546                    };
547                    let response = crate::runtime::ai::block_on_ai(async move {
548                        crate::ai::openai_embeddings_async(&transport, request).await
549                    })
550                    .and_then(|result| result)?;
551                    response.embeddings.into_iter().next().ok_or_else(|| {
552                        RedDBError::Query("embedding API returned no vectors".to_string())
553                    })?
554                } else {
555                    vector.clone()
556                };
557                // Issue #119: route through AuthorizedSearch so the
558                // candidate set is gated by `EffectiveScope.visible_collections`
559                // before any similarity score is computed.
560                let scope = self.ai_scope();
561                let results =
562                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
563                        crate::runtime::authorized_search::AuthorizedSearch::execute_similar(
564                            self,
565                            &scope,
566                            collection,
567                            &search_vector,
568                            *limit,
569                            *min_score,
570                        )?
571                    } else {
572                        // Embedded / no-auth caller: keep legacy behaviour.
573                        self.search_similar(collection, &search_vector, *limit, *min_score)?
574                    };
575                let mut result =
576                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
577                for sr in &results {
578                    let mut record = UnifiedRecord::new();
579                    record.set("entity_id", Value::UnsignedInteger(sr.entity_id.raw()));
580                    record.set("score", Value::Float(sr.score as f64));
581                    result.push(record);
582                }
583                Ok(RuntimeQueryResult {
584                    query: raw_query.to_string(),
585                    mode: QueryMode::Sql,
586                    statement: "search_similar",
587                    engine: "runtime-search",
588                    result,
589                    affected_rows: 0,
590                    statement_type: "select",
591                    bookmark: None,
592                })
593            }
594            SearchCommand::Text {
595                query,
596                collection,
597                limit,
598                fuzzy,
599                limit_param,
600            } => {
601                if limit_param.is_some() {
602                    return Err(RedDBError::Query(
603                        "SEARCH TEXT LIMIT $N parameter was not bound before execution".to_string(),
604                    ));
605                }
606                let collections = collection.as_ref().map(|c| vec![c.clone()]);
607                // Issue #119: gate the candidate set by visible_collections.
608                let scope = self.ai_scope();
609                let res =
610                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
611                        crate::runtime::authorized_search::AuthorizedSearch::execute_text(
612                            self,
613                            &scope,
614                            query.clone(),
615                            collections,
616                            None,
617                            None,
618                            None,
619                            Some(*limit),
620                            *fuzzy,
621                        )?
622                    } else {
623                        self.search_text(
624                            query.clone(),
625                            collections,
626                            None,
627                            None,
628                            None,
629                            Some(*limit),
630                            *fuzzy,
631                        )?
632                    };
633                let mut result =
634                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
635                for item in &res.matches {
636                    let mut record = UnifiedRecord::new();
637                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
638                    record.set("score", Value::Float(item.score as f64));
639                    result.push(record);
640                }
641                Ok(RuntimeQueryResult {
642                    query: raw_query.to_string(),
643                    mode: QueryMode::Sql,
644                    statement: "search_text",
645                    engine: "runtime-search",
646                    result,
647                    affected_rows: 0,
648                    statement_type: "select",
649                    bookmark: None,
650                })
651            }
652            SearchCommand::Hybrid {
653                vector,
654                query,
655                collection,
656                limit,
657                limit_param,
658            } => {
659                if limit_param.is_some() {
660                    return Err(RedDBError::Query(
661                        "SEARCH HYBRID LIMIT $N parameter was not bound before execution"
662                            .to_string(),
663                    ));
664                }
665                let res = self.search_hybrid(
666                    vector.clone(),
667                    query.clone(),
668                    Some(*limit),
669                    Some(vec![collection.clone()]),
670                    None,
671                    None,
672                    None,
673                    Vec::new(),
674                    None,
675                    None,
676                    Some(*limit),
677                )?;
678                let mut result =
679                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
680                for item in &res.matches {
681                    let mut record = UnifiedRecord::new();
682                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
683                    record.set("score", Value::Float(item.score as f64));
684                    result.push(record);
685                }
686                Ok(RuntimeQueryResult {
687                    query: raw_query.to_string(),
688                    mode: QueryMode::Sql,
689                    statement: "search_hybrid",
690                    engine: "runtime-search",
691                    result,
692                    affected_rows: 0,
693                    statement_type: "select",
694                    bookmark: None,
695                })
696            }
697            SearchCommand::Multimodal {
698                query,
699                collection,
700                limit,
701                limit_param,
702            } => {
703                if limit_param.is_some() {
704                    return Err(RedDBError::Query(
705                        "SEARCH MULTIMODAL LIMIT $N parameter was not bound before execution"
706                            .to_string(),
707                    ));
708                }
709                let collections = collection.as_ref().map(|c| vec![c.clone()]);
710                let res =
711                    self.search_multimodal(query.clone(), collections, None, None, Some(*limit))?;
712                let mut result =
713                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
714                for item in &res.matches {
715                    let mut record = UnifiedRecord::new();
716                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
717                    record.set("score", Value::Float(item.score as f64));
718                    result.push(record);
719                }
720                Ok(RuntimeQueryResult {
721                    query: raw_query.to_string(),
722                    mode: QueryMode::Sql,
723                    statement: "search_multimodal",
724                    engine: "runtime-search",
725                    result,
726                    affected_rows: 0,
727                    statement_type: "select",
728                    bookmark: None,
729                })
730            }
731            SearchCommand::Index {
732                index,
733                value,
734                collection,
735                limit,
736                exact,
737                limit_param,
738            } => {
739                if limit_param.is_some() {
740                    return Err(RedDBError::Query(
741                        "SEARCH INDEX LIMIT $N parameter was not bound before execution"
742                            .to_string(),
743                    ));
744                }
745                let collections = collection.as_ref().map(|c| vec![c.clone()]);
746                let res = self.search_index(
747                    index.clone(),
748                    value.clone(),
749                    *exact,
750                    collections,
751                    None,
752                    None,
753                    Some(*limit),
754                )?;
755                let mut result =
756                    UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
757                for item in &res.matches {
758                    let mut record = UnifiedRecord::new();
759                    record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
760                    record.set("score", Value::Float(item.score as f64));
761                    result.push(record);
762                }
763                Ok(RuntimeQueryResult {
764                    query: raw_query.to_string(),
765                    mode: QueryMode::Sql,
766                    statement: "search_index",
767                    engine: "runtime-search",
768                    result,
769                    affected_rows: 0,
770                    statement_type: "select",
771                    bookmark: None,
772                })
773            }
774            SearchCommand::Context {
775                query,
776                field,
777                collection,
778                limit,
779                depth,
780                limit_param,
781            } => {
782                if limit_param.is_some() {
783                    return Err(RedDBError::Query(
784                        "SEARCH CONTEXT LIMIT $N parameter was not bound before execution"
785                            .to_string(),
786                    ));
787                }
788                use crate::application::SearchContextInput;
789                // Issue #119: route through AuthorizedSearch so the
790                // candidate set + every expansion bucket is bounded by
791                // `EffectiveScope.visible_collections`.
792                let input = SearchContextInput {
793                    query: query.clone(),
794                    field: field.clone(),
795                    vector: None,
796                    collections: collection.as_ref().map(|c| vec![c.clone()]),
797                    graph_depth: Some(*depth),
798                    graph_max_edges: None,
799                    max_cross_refs: None,
800                    follow_cross_refs: None,
801                    expand_graph: None,
802                    global_scan: None,
803                    reindex: None,
804                    limit: Some(*limit),
805                    min_score: None,
806                };
807                let scope = self.ai_scope();
808                let res =
809                    if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
810                        crate::runtime::authorized_search::AuthorizedSearch::execute_context(
811                            self, &scope, input,
812                        )?
813                    } else {
814                        self.search_context(input)?
815                    };
816                let mut result = UnifiedResult::with_columns(vec![
817                    "entity_id".into(),
818                    "collection".into(),
819                    "score".into(),
820                    "discovery".into(),
821                    "kind".into(),
822                ]);
823                let all_entities = res
824                    .tables
825                    .iter()
826                    .map(|e| (e, "table"))
827                    .chain(res.graph.nodes.iter().map(|e| (e, "graph_node")))
828                    .chain(res.graph.edges.iter().map(|e| (e, "graph_edge")))
829                    .chain(res.vectors.iter().map(|e| (e, "vector")))
830                    .chain(res.documents.iter().map(|e| (e, "document")))
831                    .chain(res.key_values.iter().map(|e| (e, "kv")));
832                for (entity, kind) in all_entities {
833                    let mut record = UnifiedRecord::new();
834                    record.set("entity_id", Value::UnsignedInteger(entity.entity.id.raw()));
835                    record.set("collection", Value::text(entity.collection.clone()));
836                    record.set("score", Value::Float(entity.score as f64));
837                    record.set("discovery", Value::text(format!("{:?}", entity.discovery)));
838                    record.set("kind", Value::text(kind.to_string()));
839                    result.push(record);
840                }
841                Ok(RuntimeQueryResult {
842                    query: raw_query.to_string(),
843                    mode: QueryMode::Sql,
844                    statement: "search_context",
845                    engine: "runtime-context",
846                    result,
847                    affected_rows: 0,
848                    statement_type: "select",
849                    bookmark: None,
850                })
851            }
852            SearchCommand::SpatialRadius {
853                center_lat,
854                center_lon,
855                radius_km,
856                collection,
857                column,
858                limit,
859                limit_param,
860            } => {
861                if limit_param.is_some() {
862                    return Err(RedDBError::Query(
863                        "SEARCH SPATIAL RADIUS LIMIT $N parameter was not bound before execution"
864                            .to_string(),
865                    ));
866                }
867                use crate::storage::unified::spatial_index::haversine_km;
868                let _ = column; // Column indicates which field holds geo data
869                let store = self.inner.db.store();
870                let entities = store
871                    .get_collection(collection)
872                    .map(|m| m.query_all(|_| true))
873                    .unwrap_or_default();
874
875                let mut hits: Vec<(u64, f64)> = Vec::new();
876                for entity in &entities {
877                    // Extract lat/lon from GeoPoint values in entity data
878                    if let Some((lat, lon)) = extract_geo_from_entity(entity) {
879                        let dist = haversine_km(*center_lat, *center_lon, lat, lon);
880                        if dist <= *radius_km {
881                            hits.push((entity.id.raw(), dist));
882                        }
883                    }
884                }
885                hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
886                hits.truncate(*limit);
887
888                let mut result =
889                    UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
890                for (id, dist) in &hits {
891                    let mut record = UnifiedRecord::new();
892                    record.set("entity_id", Value::UnsignedInteger(*id));
893                    record.set("distance_km", Value::Float(*dist));
894                    result.push(record);
895                }
896                Ok(RuntimeQueryResult {
897                    query: raw_query.to_string(),
898                    mode: QueryMode::Sql,
899                    statement: "search_spatial_radius",
900                    engine: "runtime-spatial",
901                    result,
902                    affected_rows: 0,
903                    statement_type: "select",
904                    bookmark: None,
905                })
906            }
907            SearchCommand::SpatialBbox {
908                min_lat,
909                min_lon,
910                max_lat,
911                max_lon,
912                collection,
913                column,
914                limit,
915                limit_param,
916            } => {
917                if limit_param.is_some() {
918                    return Err(RedDBError::Query(
919                        "SEARCH SPATIAL BBOX LIMIT $N parameter was not bound before execution"
920                            .to_string(),
921                    ));
922                }
923                let _ = column;
924                let store = self.inner.db.store();
925                let entities = store
926                    .get_collection(collection)
927                    .map(|m| m.query_all(|_| true))
928                    .unwrap_or_default();
929
930                let mut result = UnifiedResult::with_columns(vec!["entity_id".into()]);
931                let mut count = 0;
932                for entity in &entities {
933                    if count >= *limit {
934                        break;
935                    }
936                    if let Some((lat, lon)) = extract_geo_from_entity(entity) {
937                        if lat >= *min_lat && lat <= *max_lat && lon >= *min_lon && lon <= *max_lon
938                        {
939                            let mut record = UnifiedRecord::new();
940                            record.set("entity_id", Value::UnsignedInteger(entity.id.raw()));
941                            result.push(record);
942                            count += 1;
943                        }
944                    }
945                }
946                Ok(RuntimeQueryResult {
947                    query: raw_query.to_string(),
948                    mode: QueryMode::Sql,
949                    statement: "search_spatial_bbox",
950                    engine: "runtime-spatial",
951                    result,
952                    affected_rows: 0,
953                    statement_type: "select",
954                    bookmark: None,
955                })
956            }
957            SearchCommand::SpatialNearest {
958                lat,
959                lon,
960                k,
961                collection,
962                column,
963                k_param,
964            } => {
965                if k_param.is_some() {
966                    return Err(RedDBError::Query(
967                        "SEARCH SPATIAL NEAREST K $N parameter was not bound before execution"
968                            .to_string(),
969                    ));
970                }
971                use crate::storage::unified::spatial_index::haversine_km;
972                let _ = column;
973                let store = self.inner.db.store();
974                let entities = store
975                    .get_collection(collection)
976                    .map(|m| m.query_all(|_| true))
977                    .unwrap_or_default();
978
979                let mut hits: Vec<(u64, f64)> = Vec::new();
980                for entity in &entities {
981                    if let Some((elat, elon)) = extract_geo_from_entity(entity) {
982                        let dist = haversine_km(*lat, *lon, elat, elon);
983                        hits.push((entity.id.raw(), dist));
984                    }
985                }
986                hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
987                hits.truncate(*k);
988
989                let mut result =
990                    UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
991                for (id, dist) in &hits {
992                    let mut record = UnifiedRecord::new();
993                    record.set("entity_id", Value::UnsignedInteger(*id));
994                    record.set("distance_km", Value::Float(*dist));
995                    result.push(record);
996                }
997                Ok(RuntimeQueryResult {
998                    query: raw_query.to_string(),
999                    mode: QueryMode::Sql,
1000                    statement: "search_spatial_nearest",
1001                    engine: "runtime-spatial",
1002                    result,
1003                    affected_rows: 0,
1004                    statement_type: "select",
1005                    bookmark: None,
1006                })
1007            }
1008        }
1009    }
1010}
1011
1012fn apply_graph_order_and_limit(
1013    result: &mut UnifiedResult,
1014    statement: &str,
1015    order_by: Option<&GraphCommandOrderBy>,
1016    limit: Option<usize>,
1017) -> RedDBResult<()> {
1018    if let Some(order) = order_by {
1019        let column = graph_order_metric_column(statement, &order.metric)?;
1020        let columns = result.columns.clone();
1021        result.records.sort_by(|left, right| {
1022            let cmp = compare_graph_values(left.get(column), right.get(column));
1023            let cmp = if order.ascending { cmp } else { cmp.reverse() };
1024            if cmp == Ordering::Equal {
1025                compare_graph_rows(left, right, &columns)
1026            } else {
1027                cmp
1028            }
1029        });
1030    }
1031    if let Some(limit) = limit {
1032        result.records.truncate(limit);
1033    }
1034    Ok(())
1035}
1036
1037fn graph_order_metric_column(statement: &str, metric: &str) -> RedDBResult<&'static str> {
1038    let metric = metric.to_ascii_lowercase();
1039    match (statement, metric.as_str()) {
1040        ("graph_centrality", "score" | "centrality_score") => Ok("score"),
1041        ("graph_community", "size" | "community_size") => Ok("size"),
1042        ("graph_components", "size" | "component_size") => Ok("size"),
1043        ("graph_shortest_path", "hop_count" | "total_weight" | "nodes_visited") => {
1044            Ok(match metric.as_str() {
1045                "total_weight" => "total_weight",
1046                "nodes_visited" => "nodes_visited",
1047                _ => "hop_count",
1048            })
1049        }
1050        _ => Err(RedDBError::Query(format!(
1051            "unsupported ORDER BY metric '{metric}' for GRAPH {}",
1052            statement.trim_start_matches("graph_")
1053        ))),
1054    }
1055}
1056
1057fn compare_graph_rows(left: &UnifiedRecord, right: &UnifiedRecord, columns: &[String]) -> Ordering {
1058    for column in columns {
1059        let cmp = compare_graph_values(left.get(column), right.get(column));
1060        if cmp != Ordering::Equal {
1061            return cmp;
1062        }
1063    }
1064    Ordering::Equal
1065}
1066
1067fn compare_graph_values(left: Option<&Value>, right: Option<&Value>) -> Ordering {
1068    match (left, right) {
1069        (None, None) => Ordering::Equal,
1070        (None, Some(_)) => Ordering::Less,
1071        (Some(_), None) => Ordering::Greater,
1072        (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
1073        (Some(Value::Null), Some(_)) => Ordering::Less,
1074        (Some(_), Some(Value::Null)) => Ordering::Greater,
1075        (Some(Value::Integer(left)), Some(Value::Integer(right))) => left.cmp(right),
1076        (Some(Value::UnsignedInteger(left)), Some(Value::UnsignedInteger(right))) => {
1077            left.cmp(right)
1078        }
1079        (Some(Value::Float(left)), Some(Value::Float(right))) => {
1080            left.partial_cmp(right).unwrap_or(Ordering::Equal)
1081        }
1082        (Some(Value::Integer(left)), Some(Value::Float(right))) => {
1083            (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1084        }
1085        (Some(Value::Float(left)), Some(Value::Integer(right))) => left
1086            .partial_cmp(&(*right as f64))
1087            .unwrap_or(Ordering::Equal),
1088        (Some(Value::UnsignedInteger(left)), Some(Value::Float(right))) => {
1089            (*left as f64).partial_cmp(right).unwrap_or(Ordering::Equal)
1090        }
1091        (Some(Value::Float(left)), Some(Value::UnsignedInteger(right))) => left
1092            .partial_cmp(&(*right as f64))
1093            .unwrap_or(Ordering::Equal),
1094        (Some(Value::Integer(left)), Some(Value::UnsignedInteger(right))) => {
1095            (*left as i128).cmp(&(*right as i128))
1096        }
1097        (Some(Value::UnsignedInteger(left)), Some(Value::Integer(right))) => {
1098            (*left as i128).cmp(&(*right as i128))
1099        }
1100        (Some(Value::Timestamp(left)), Some(Value::Timestamp(right))) => left.cmp(right),
1101        (Some(Value::Text(left)), Some(Value::Text(right))) => left.cmp(right),
1102        (Some(Value::Boolean(left)), Some(Value::Boolean(right))) => left.cmp(right),
1103        (Some(left), Some(right)) => format!("{left:?}").cmp(&format!("{right:?}")),
1104    }
1105}
1106
1107// =============================================================================
1108// Conversion helpers for string -> enum
1109// =============================================================================
1110
1111fn parse_direction(s: &str) -> RedDBResult<RuntimeGraphDirection> {
1112    match s.to_lowercase().as_str() {
1113        "outgoing" | "out" => Ok(RuntimeGraphDirection::Outgoing),
1114        "incoming" | "in" => Ok(RuntimeGraphDirection::Incoming),
1115        "both" | "any" => Ok(RuntimeGraphDirection::Both),
1116        _ => Err(RedDBError::Query(format!(
1117            "unknown direction: '{s}', expected outgoing|incoming|both"
1118        ))),
1119    }
1120}
1121
1122fn parse_path_algorithm(s: &str) -> RedDBResult<RuntimeGraphPathAlgorithm> {
1123    match s.to_lowercase().as_str() {
1124        "bfs" => Ok(RuntimeGraphPathAlgorithm::Bfs),
1125        "dijkstra" => Ok(RuntimeGraphPathAlgorithm::Dijkstra),
1126        "astar" | "a*" => Ok(RuntimeGraphPathAlgorithm::AStar),
1127        "bellman_ford" | "bellmanford" => Ok(RuntimeGraphPathAlgorithm::BellmanFord),
1128        _ => Err(RedDBError::Query(format!(
1129            "unknown path algorithm: '{s}', expected bfs|dijkstra|astar|bellman_ford"
1130        ))),
1131    }
1132}
1133
1134fn parse_traversal_strategy(s: &str) -> RedDBResult<RuntimeGraphTraversalStrategy> {
1135    match s.to_lowercase().as_str() {
1136        "bfs" => Ok(RuntimeGraphTraversalStrategy::Bfs),
1137        "dfs" => Ok(RuntimeGraphTraversalStrategy::Dfs),
1138        _ => Err(RedDBError::Query(format!(
1139            "unknown traversal strategy: '{s}', expected bfs|dfs"
1140        ))),
1141    }
1142}
1143
1144fn parse_centrality_algorithm(s: &str) -> RedDBResult<RuntimeGraphCentralityAlgorithm> {
1145    match s.to_lowercase().as_str() {
1146        "degree" => Ok(RuntimeGraphCentralityAlgorithm::Degree),
1147        "closeness" => Ok(RuntimeGraphCentralityAlgorithm::Closeness),
1148        "betweenness" => Ok(RuntimeGraphCentralityAlgorithm::Betweenness),
1149        "eigenvector" => Ok(RuntimeGraphCentralityAlgorithm::Eigenvector),
1150        "pagerank" | "page_rank" => Ok(RuntimeGraphCentralityAlgorithm::PageRank),
1151        _ => Err(RedDBError::Query(format!(
1152            "unknown centrality algorithm: '{s}', expected degree|closeness|betweenness|eigenvector|pagerank"
1153        ))),
1154    }
1155}
1156
1157fn parse_community_algorithm(s: &str) -> RedDBResult<RuntimeGraphCommunityAlgorithm> {
1158    match s.to_lowercase().as_str() {
1159        "label_propagation" | "labelpropagation" => {
1160            Ok(RuntimeGraphCommunityAlgorithm::LabelPropagation)
1161        }
1162        "louvain" => Ok(RuntimeGraphCommunityAlgorithm::Louvain),
1163        _ => Err(RedDBError::Query(format!(
1164            "unknown community algorithm: '{s}', expected label_propagation|louvain"
1165        ))),
1166    }
1167}
1168
1169fn parse_components_mode(s: &str) -> RedDBResult<RuntimeGraphComponentsMode> {
1170    match s.to_lowercase().as_str() {
1171        "connected" => Ok(RuntimeGraphComponentsMode::Connected),
1172        "weak" | "weakly_connected" => Ok(RuntimeGraphComponentsMode::Weak),
1173        "strong" | "strongly_connected" => Ok(RuntimeGraphComponentsMode::Strong),
1174        _ => Err(RedDBError::Query(format!(
1175            "unknown components mode: '{s}', expected connected|weak|strong"
1176        ))),
1177    }
1178}
1179
1180/// Extract (latitude, longitude) from an entity.
1181///
1182/// Looks for GeoPoint values in the entity data (row columns or node properties)
1183/// or dedicated lat/lon fields. Returns degrees.
1184fn extract_geo_from_entity(entity: &UnifiedEntity) -> Option<(f64, f64)> {
1185    match &entity.data {
1186        EntityData::Row(row) => {
1187            // Search named columns for GeoPoint or lat/lon pairs
1188            if let Some(ref named) = row.named {
1189                // Direct GeoPoint value
1190                for value in named.values() {
1191                    if let Value::GeoPoint(lat_micro, lon_micro) = value {
1192                        return Some((
1193                            *lat_micro as f64 / 1_000_000.0,
1194                            *lon_micro as f64 / 1_000_000.0,
1195                        ));
1196                    }
1197                }
1198                // Try lat/lon or latitude/longitude named fields
1199                let lat =
1200                    named
1201                        .get("lat")
1202                        .or_else(|| named.get("latitude"))
1203                        .and_then(|v| match v {
1204                            Value::Float(f) => Some(*f),
1205                            Value::Integer(i) => Some(*i as f64),
1206                            _ => None,
1207                        });
1208                let lon = named
1209                    .get("lon")
1210                    .or_else(|| named.get("lng"))
1211                    .or_else(|| named.get("longitude"))
1212                    .and_then(|v| match v {
1213                        Value::Float(f) => Some(*f),
1214                        Value::Integer(i) => Some(*i as f64),
1215                        _ => None,
1216                    });
1217                if let (Some(la), Some(lo)) = (lat, lon) {
1218                    return Some((la, lo));
1219                }
1220            }
1221            // Search positional columns for GeoPoint
1222            for value in &row.columns {
1223                if let Value::GeoPoint(lat_micro, lon_micro) = value {
1224                    return Some((
1225                        *lat_micro as f64 / 1_000_000.0,
1226                        *lon_micro as f64 / 1_000_000.0,
1227                    ));
1228                }
1229            }
1230            None
1231        }
1232        EntityData::Node(node) => {
1233            // Search node properties
1234            for value in node.properties.values() {
1235                if let Value::GeoPoint(lat_micro, lon_micro) = value {
1236                    return Some((
1237                        *lat_micro as f64 / 1_000_000.0,
1238                        *lon_micro as f64 / 1_000_000.0,
1239                    ));
1240                }
1241            }
1242            let lat = node
1243                .properties
1244                .get("lat")
1245                .or_else(|| node.properties.get("latitude"))
1246                .and_then(|v| match v {
1247                    Value::Float(f) => Some(*f),
1248                    Value::Integer(i) => Some(*i as f64),
1249                    _ => None,
1250                });
1251            let lon = node
1252                .properties
1253                .get("lon")
1254                .or_else(|| node.properties.get("lng"))
1255                .or_else(|| node.properties.get("longitude"))
1256                .and_then(|v| match v {
1257                    Value::Float(f) => Some(*f),
1258                    Value::Integer(i) => Some(*i as f64),
1259                    _ => None,
1260                });
1261            if let (Some(la), Some(lo)) = (lat, lon) {
1262                return Some((la, lo));
1263            }
1264            None
1265        }
1266        _ => None,
1267    }
1268}