1use super::*;
8
9impl RedDBRuntime {
10 pub fn execute_graph_command(
12 &self,
13 raw_query: &str,
14 cmd: &GraphCommand,
15 ) -> RedDBResult<RuntimeQueryResult> {
16 match cmd {
17 GraphCommand::Neighborhood {
18 source,
19 depth,
20 direction,
21 } => {
22 let dir = parse_direction(direction)?;
23 let res = self.graph_neighborhood(source, dir, *depth as usize, None, None)?;
24 let mut result = UnifiedResult::with_columns(vec![
25 "node_id".into(),
26 "label".into(),
27 "node_type".into(),
28 "depth".into(),
29 ]);
30 for visit in &res.nodes {
31 let mut record = UnifiedRecord::new();
32 record.set("node_id", Value::text(visit.node.id.clone()));
33 record.set("label", Value::text(visit.node.label.clone()));
34 record.set("node_type", Value::text(visit.node.node_type.clone()));
35 record.set("depth", Value::Integer(visit.depth as i64));
36 result.push(record);
37 }
38 Ok(RuntimeQueryResult {
39 query: raw_query.to_string(),
40 mode: QueryMode::Sql,
41 statement: "graph_neighborhood",
42 engine: "runtime-graph",
43 result,
44 affected_rows: 0,
45 statement_type: "select",
46 })
47 }
48 GraphCommand::ShortestPath {
49 source,
50 target,
51 algorithm,
52 direction,
53 } => {
54 let dir = parse_direction(direction)?;
55 let alg = parse_path_algorithm(algorithm)?;
56 let res = self.graph_shortest_path(source, target, dir, alg, None, None)?;
57 let mut result = UnifiedResult::with_columns(vec![
58 "source".into(),
59 "target".into(),
60 "nodes_visited".into(),
61 "negative_cycle_detected".into(),
62 "hop_count".into(),
63 "total_weight".into(),
64 ]);
65 let mut record = UnifiedRecord::new();
66 record.set("source", Value::text(res.source));
67 record.set("target", Value::text(res.target));
68 record.set("nodes_visited", Value::Integer(res.nodes_visited as i64));
69 record.set(
70 "negative_cycle_detected",
71 match res.negative_cycle_detected {
72 Some(value) => Value::Boolean(value),
73 None => Value::Null,
74 },
75 );
76 if let Some(ref path) = res.path {
77 record.set("hop_count", Value::Integer(path.hop_count as i64));
78 record.set("total_weight", Value::Float(path.total_weight));
79 } else {
80 record.set("hop_count", Value::Null);
81 record.set("total_weight", Value::Null);
82 }
83 result.push(record);
84 Ok(RuntimeQueryResult {
85 query: raw_query.to_string(),
86 mode: QueryMode::Sql,
87 statement: "graph_shortest_path",
88 engine: "runtime-graph",
89 result,
90 affected_rows: 0,
91 statement_type: "select",
92 })
93 }
94 GraphCommand::Properties => {
95 let res = self.graph_properties(None)?;
96 let mut result = UnifiedResult::with_columns(vec![
97 "node_count".into(),
98 "edge_count".into(),
99 "is_connected".into(),
100 "is_complete".into(),
101 "is_cyclic".into(),
102 "density".into(),
103 ]);
104 let mut record = UnifiedRecord::new();
105 record.set("node_count", Value::Integer(res.node_count as i64));
106 record.set("edge_count", Value::Integer(res.edge_count as i64));
107 record.set("is_connected", Value::Boolean(res.is_connected));
108 record.set("is_complete", Value::Boolean(res.is_complete));
109 record.set("is_cyclic", Value::Boolean(res.is_cyclic));
110 record.set("density", Value::Float(res.density));
111 result.push(record);
112 Ok(RuntimeQueryResult {
113 query: raw_query.to_string(),
114 mode: QueryMode::Sql,
115 statement: "graph_properties",
116 engine: "runtime-graph",
117 result,
118 affected_rows: 0,
119 statement_type: "select",
120 })
121 }
122 GraphCommand::Traverse {
123 source,
124 strategy,
125 depth,
126 direction,
127 } => {
128 let dir = parse_direction(direction)?;
129 let strat = parse_traversal_strategy(strategy)?;
130 let res = self.graph_traverse(source, dir, *depth as usize, strat, None, None)?;
131 let mut result = UnifiedResult::with_columns(vec![
132 "node_id".into(),
133 "label".into(),
134 "node_type".into(),
135 "depth".into(),
136 ]);
137 for visit in &res.visits {
138 let mut record = UnifiedRecord::new();
139 record.set("node_id", Value::text(visit.node.id.clone()));
140 record.set("label", Value::text(visit.node.label.clone()));
141 record.set("node_type", Value::text(visit.node.node_type.clone()));
142 record.set("depth", Value::Integer(visit.depth as i64));
143 result.push(record);
144 }
145 Ok(RuntimeQueryResult {
146 query: raw_query.to_string(),
147 mode: QueryMode::Sql,
148 statement: "graph_traverse",
149 engine: "runtime-graph",
150 result,
151 affected_rows: 0,
152 statement_type: "select",
153 })
154 }
155 GraphCommand::Centrality { algorithm } => {
156 let alg = parse_centrality_algorithm(algorithm)?;
157 let res = self.graph_centrality(alg, 100, false, None, None, None, None)?;
158 let mut result = UnifiedResult::with_columns(vec![
159 "node_id".into(),
160 "label".into(),
161 "score".into(),
162 ]);
163 for score in &res.scores {
164 let mut record = UnifiedRecord::new();
165 record.set("node_id", Value::text(score.node.id.clone()));
166 record.set("label", Value::text(score.node.label.clone()));
167 record.set("score", Value::Float(score.score));
168 result.push(record);
169 }
170 for ds in &res.degree_scores {
171 let mut record = UnifiedRecord::new();
172 record.set("node_id", Value::text(ds.node.id.clone()));
173 record.set("label", Value::text(ds.node.label.clone()));
174 record.set("score", Value::Float(ds.total_degree as f64));
175 result.push(record);
176 }
177 Ok(RuntimeQueryResult {
178 query: raw_query.to_string(),
179 mode: QueryMode::Sql,
180 statement: "graph_centrality",
181 engine: "runtime-graph",
182 result,
183 affected_rows: 0,
184 statement_type: "select",
185 })
186 }
187 GraphCommand::Community {
188 algorithm,
189 max_iterations,
190 } => {
191 let alg = parse_community_algorithm(algorithm)?;
192 let res =
193 self.graph_communities(alg, 1, Some(*max_iterations as usize), None, None)?;
194 let mut result =
195 UnifiedResult::with_columns(vec!["community_id".into(), "size".into()]);
196 for community in &res.communities {
197 let mut record = UnifiedRecord::new();
198 record.set("community_id", Value::text(community.id.clone()));
199 record.set("size", Value::Integer(community.size as i64));
200 result.push(record);
201 }
202 Ok(RuntimeQueryResult {
203 query: raw_query.to_string(),
204 mode: QueryMode::Sql,
205 statement: "graph_community",
206 engine: "runtime-graph",
207 result,
208 affected_rows: 0,
209 statement_type: "select",
210 })
211 }
212 GraphCommand::Components { mode } => {
213 let m = parse_components_mode(mode)?;
214 let res = self.graph_components(m, 1, None)?;
215 let mut result =
216 UnifiedResult::with_columns(vec!["component_id".into(), "size".into()]);
217 for component in &res.components {
218 let mut record = UnifiedRecord::new();
219 record.set("component_id", Value::text(component.id.clone()));
220 record.set("size", Value::Integer(component.size as i64));
221 result.push(record);
222 }
223 Ok(RuntimeQueryResult {
224 query: raw_query.to_string(),
225 mode: QueryMode::Sql,
226 statement: "graph_components",
227 engine: "runtime-graph",
228 result,
229 affected_rows: 0,
230 statement_type: "select",
231 })
232 }
233 GraphCommand::Cycles { max_length } => {
234 let res = self.graph_cycles(*max_length as usize, 100, None)?;
235 let mut result =
236 UnifiedResult::with_columns(vec!["cycle_index".into(), "length".into()]);
237 for (i, cycle) in res.cycles.iter().enumerate() {
238 let mut record = UnifiedRecord::new();
239 record.set("cycle_index", Value::Integer(i as i64));
240 record.set("length", Value::Integer(cycle.nodes.len() as i64));
241 result.push(record);
242 }
243 Ok(RuntimeQueryResult {
244 query: raw_query.to_string(),
245 mode: QueryMode::Sql,
246 statement: "graph_cycles",
247 engine: "runtime-graph",
248 result,
249 affected_rows: 0,
250 statement_type: "select",
251 })
252 }
253 GraphCommand::Clustering => {
254 let res = self.graph_clustering(100, true, None)?;
255 let mut result = UnifiedResult::with_columns(vec![
256 "node_id".into(),
257 "label".into(),
258 "score".into(),
259 ]);
260 let mut global_record = UnifiedRecord::new();
262 global_record.set("node_id", Value::text("__global__"));
263 global_record.set("label", Value::text("global_clustering"));
264 global_record.set("score", Value::Float(res.global));
265 result.push(global_record);
266 for score in &res.local {
267 let mut record = UnifiedRecord::new();
268 record.set("node_id", Value::text(score.node.id.clone()));
269 record.set("label", Value::text(score.node.label.clone()));
270 record.set("score", Value::Float(score.score));
271 result.push(record);
272 }
273 Ok(RuntimeQueryResult {
274 query: raw_query.to_string(),
275 mode: QueryMode::Sql,
276 statement: "graph_clustering",
277 engine: "runtime-graph",
278 result,
279 affected_rows: 0,
280 statement_type: "select",
281 })
282 }
283 GraphCommand::TopologicalSort => {
284 let res = self.graph_topological_sort(None)?;
285 let mut result = UnifiedResult::with_columns(vec![
286 "order".into(),
287 "node_id".into(),
288 "label".into(),
289 ]);
290 for (i, node) in res.ordered_nodes.iter().enumerate() {
291 let mut record = UnifiedRecord::new();
292 record.set("order", Value::Integer(i as i64));
293 record.set("node_id", Value::text(node.id.clone()));
294 record.set("label", Value::text(node.label.clone()));
295 result.push(record);
296 }
297 Ok(RuntimeQueryResult {
298 query: raw_query.to_string(),
299 mode: QueryMode::Sql,
300 statement: "graph_topological_sort",
301 engine: "runtime-graph",
302 result,
303 affected_rows: 0,
304 statement_type: "select",
305 })
306 }
307 }
308 }
309
310 pub fn execute_search_command(
312 &self,
313 raw_query: &str,
314 cmd: &SearchCommand,
315 ) -> RedDBResult<RuntimeQueryResult> {
316 match cmd {
317 SearchCommand::Similar {
318 vector,
319 text,
320 provider,
321 collection,
322 limit,
323 min_score,
324 } => {
325 let search_vector = if let Some(query_text) = text {
327 let (default_provider, _) = crate::ai::resolve_defaults_from_runtime(self);
328 let provider = match provider.as_deref() {
329 Some(p) => crate::ai::parse_provider(p)?,
330 None => default_provider,
331 };
332 let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
333 let model = std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
334 .ok()
335 .unwrap_or_else(|| provider.default_embedding_model().to_string());
336 let transport = crate::runtime::ai::transport::AiTransport::from_runtime(self);
337 let request = crate::ai::OpenAiEmbeddingRequest {
338 api_key,
339 model,
340 inputs: vec![query_text.clone()],
341 dimensions: None,
342 api_base: provider.resolve_api_base(),
343 };
344 let response = crate::runtime::ai::block_on_ai(async move {
345 crate::ai::openai_embeddings_async(&transport, request).await
346 })
347 .and_then(|result| result)?;
348 response.embeddings.into_iter().next().ok_or_else(|| {
349 RedDBError::Query("embedding API returned no vectors".to_string())
350 })?
351 } else {
352 vector.clone()
353 };
354 let scope = self.ai_scope();
358 let results =
359 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
360 crate::runtime::authorized_search::AuthorizedSearch::execute_similar(
361 self,
362 &scope,
363 collection,
364 &search_vector,
365 *limit,
366 *min_score,
367 )?
368 } else {
369 self.search_similar(collection, &search_vector, *limit, *min_score)?
371 };
372 let mut result =
373 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
374 for sr in &results {
375 let mut record = UnifiedRecord::new();
376 record.set("entity_id", Value::UnsignedInteger(sr.entity_id.raw()));
377 record.set("score", Value::Float(sr.score as f64));
378 result.push(record);
379 }
380 Ok(RuntimeQueryResult {
381 query: raw_query.to_string(),
382 mode: QueryMode::Sql,
383 statement: "search_similar",
384 engine: "runtime-search",
385 result,
386 affected_rows: 0,
387 statement_type: "select",
388 })
389 }
390 SearchCommand::Text {
391 query,
392 collection,
393 limit,
394 fuzzy,
395 } => {
396 let collections = collection.as_ref().map(|c| vec![c.clone()]);
397 let scope = self.ai_scope();
399 let res =
400 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
401 crate::runtime::authorized_search::AuthorizedSearch::execute_text(
402 self,
403 &scope,
404 query.clone(),
405 collections,
406 None,
407 None,
408 None,
409 Some(*limit),
410 *fuzzy,
411 )?
412 } else {
413 self.search_text(
414 query.clone(),
415 collections,
416 None,
417 None,
418 None,
419 Some(*limit),
420 *fuzzy,
421 )?
422 };
423 let mut result =
424 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
425 for item in &res.matches {
426 let mut record = UnifiedRecord::new();
427 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
428 record.set("score", Value::Float(item.score as f64));
429 result.push(record);
430 }
431 Ok(RuntimeQueryResult {
432 query: raw_query.to_string(),
433 mode: QueryMode::Sql,
434 statement: "search_text",
435 engine: "runtime-search",
436 result,
437 affected_rows: 0,
438 statement_type: "select",
439 })
440 }
441 SearchCommand::Hybrid {
442 vector,
443 query,
444 collection,
445 limit,
446 } => {
447 let res = self.search_hybrid(
448 vector.clone(),
449 query.clone(),
450 Some(*limit),
451 Some(vec![collection.clone()]),
452 None,
453 None,
454 None,
455 Vec::new(),
456 None,
457 None,
458 Some(*limit),
459 )?;
460 let mut result =
461 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
462 for item in &res.matches {
463 let mut record = UnifiedRecord::new();
464 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
465 record.set("score", Value::Float(item.score as f64));
466 result.push(record);
467 }
468 Ok(RuntimeQueryResult {
469 query: raw_query.to_string(),
470 mode: QueryMode::Sql,
471 statement: "search_hybrid",
472 engine: "runtime-search",
473 result,
474 affected_rows: 0,
475 statement_type: "select",
476 })
477 }
478 SearchCommand::Multimodal {
479 query,
480 collection,
481 limit,
482 } => {
483 let collections = collection.as_ref().map(|c| vec![c.clone()]);
484 let res =
485 self.search_multimodal(query.clone(), collections, None, None, Some(*limit))?;
486 let mut result =
487 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
488 for item in &res.matches {
489 let mut record = UnifiedRecord::new();
490 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
491 record.set("score", Value::Float(item.score as f64));
492 result.push(record);
493 }
494 Ok(RuntimeQueryResult {
495 query: raw_query.to_string(),
496 mode: QueryMode::Sql,
497 statement: "search_multimodal",
498 engine: "runtime-search",
499 result,
500 affected_rows: 0,
501 statement_type: "select",
502 })
503 }
504 SearchCommand::Index {
505 index,
506 value,
507 collection,
508 limit,
509 exact,
510 } => {
511 let collections = collection.as_ref().map(|c| vec![c.clone()]);
512 let res = self.search_index(
513 index.clone(),
514 value.clone(),
515 *exact,
516 collections,
517 None,
518 None,
519 Some(*limit),
520 )?;
521 let mut result =
522 UnifiedResult::with_columns(vec!["entity_id".into(), "score".into()]);
523 for item in &res.matches {
524 let mut record = UnifiedRecord::new();
525 record.set("entity_id", Value::UnsignedInteger(item.entity.id.raw()));
526 record.set("score", Value::Float(item.score as f64));
527 result.push(record);
528 }
529 Ok(RuntimeQueryResult {
530 query: raw_query.to_string(),
531 mode: QueryMode::Sql,
532 statement: "search_index",
533 engine: "runtime-search",
534 result,
535 affected_rows: 0,
536 statement_type: "select",
537 })
538 }
539 SearchCommand::Context {
540 query,
541 field,
542 collection,
543 limit,
544 depth,
545 } => {
546 use crate::application::SearchContextInput;
547 let input = SearchContextInput {
551 query: query.clone(),
552 field: field.clone(),
553 vector: None,
554 collections: collection.as_ref().map(|c| vec![c.clone()]),
555 graph_depth: Some(*depth),
556 graph_max_edges: None,
557 max_cross_refs: None,
558 follow_cross_refs: None,
559 expand_graph: None,
560 global_scan: None,
561 reindex: None,
562 limit: Some(*limit),
563 min_score: None,
564 };
565 let scope = self.ai_scope();
566 let res =
567 if super::statement_frame::ReadFrame::visible_collections(&scope).is_some() {
568 crate::runtime::authorized_search::AuthorizedSearch::execute_context(
569 self, &scope, input,
570 )?
571 } else {
572 self.search_context(input)?
573 };
574 let mut result = UnifiedResult::with_columns(vec![
575 "entity_id".into(),
576 "collection".into(),
577 "score".into(),
578 "discovery".into(),
579 "kind".into(),
580 ]);
581 let all_entities = res
582 .tables
583 .iter()
584 .map(|e| (e, "table"))
585 .chain(res.graph.nodes.iter().map(|e| (e, "graph_node")))
586 .chain(res.graph.edges.iter().map(|e| (e, "graph_edge")))
587 .chain(res.vectors.iter().map(|e| (e, "vector")))
588 .chain(res.documents.iter().map(|e| (e, "document")))
589 .chain(res.key_values.iter().map(|e| (e, "kv")));
590 for (entity, kind) in all_entities {
591 let mut record = UnifiedRecord::new();
592 record.set("entity_id", Value::UnsignedInteger(entity.entity.id.raw()));
593 record.set("collection", Value::text(entity.collection.clone()));
594 record.set("score", Value::Float(entity.score as f64));
595 record.set("discovery", Value::text(format!("{:?}", entity.discovery)));
596 record.set("kind", Value::text(kind.to_string()));
597 result.push(record);
598 }
599 Ok(RuntimeQueryResult {
600 query: raw_query.to_string(),
601 mode: QueryMode::Sql,
602 statement: "search_context",
603 engine: "runtime-context",
604 result,
605 affected_rows: 0,
606 statement_type: "select",
607 })
608 }
609 SearchCommand::SpatialRadius {
610 center_lat,
611 center_lon,
612 radius_km,
613 collection,
614 column,
615 limit,
616 } => {
617 use crate::storage::unified::spatial_index::haversine_km;
618 let _ = column; let store = self.inner.db.store();
620 let entities = store
621 .get_collection(collection)
622 .map(|m| m.query_all(|_| true))
623 .unwrap_or_default();
624
625 let mut hits: Vec<(u64, f64)> = Vec::new();
626 for entity in &entities {
627 if let Some((lat, lon)) = extract_geo_from_entity(entity) {
629 let dist = haversine_km(*center_lat, *center_lon, lat, lon);
630 if dist <= *radius_km {
631 hits.push((entity.id.raw(), dist));
632 }
633 }
634 }
635 hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
636 hits.truncate(*limit);
637
638 let mut result =
639 UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
640 for (id, dist) in &hits {
641 let mut record = UnifiedRecord::new();
642 record.set("entity_id", Value::UnsignedInteger(*id));
643 record.set("distance_km", Value::Float(*dist));
644 result.push(record);
645 }
646 Ok(RuntimeQueryResult {
647 query: raw_query.to_string(),
648 mode: QueryMode::Sql,
649 statement: "search_spatial_radius",
650 engine: "runtime-spatial",
651 result,
652 affected_rows: 0,
653 statement_type: "select",
654 })
655 }
656 SearchCommand::SpatialBbox {
657 min_lat,
658 min_lon,
659 max_lat,
660 max_lon,
661 collection,
662 column,
663 limit,
664 } => {
665 let _ = column;
666 let store = self.inner.db.store();
667 let entities = store
668 .get_collection(collection)
669 .map(|m| m.query_all(|_| true))
670 .unwrap_or_default();
671
672 let mut result = UnifiedResult::with_columns(vec!["entity_id".into()]);
673 let mut count = 0;
674 for entity in &entities {
675 if count >= *limit {
676 break;
677 }
678 if let Some((lat, lon)) = extract_geo_from_entity(entity) {
679 if lat >= *min_lat && lat <= *max_lat && lon >= *min_lon && lon <= *max_lon
680 {
681 let mut record = UnifiedRecord::new();
682 record.set("entity_id", Value::UnsignedInteger(entity.id.raw()));
683 result.push(record);
684 count += 1;
685 }
686 }
687 }
688 Ok(RuntimeQueryResult {
689 query: raw_query.to_string(),
690 mode: QueryMode::Sql,
691 statement: "search_spatial_bbox",
692 engine: "runtime-spatial",
693 result,
694 affected_rows: 0,
695 statement_type: "select",
696 })
697 }
698 SearchCommand::SpatialNearest {
699 lat,
700 lon,
701 k,
702 collection,
703 column,
704 } => {
705 use crate::storage::unified::spatial_index::haversine_km;
706 let _ = column;
707 let store = self.inner.db.store();
708 let entities = store
709 .get_collection(collection)
710 .map(|m| m.query_all(|_| true))
711 .unwrap_or_default();
712
713 let mut hits: Vec<(u64, f64)> = Vec::new();
714 for entity in &entities {
715 if let Some((elat, elon)) = extract_geo_from_entity(entity) {
716 let dist = haversine_km(*lat, *lon, elat, elon);
717 hits.push((entity.id.raw(), dist));
718 }
719 }
720 hits.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
721 hits.truncate(*k);
722
723 let mut result =
724 UnifiedResult::with_columns(vec!["entity_id".into(), "distance_km".into()]);
725 for (id, dist) in &hits {
726 let mut record = UnifiedRecord::new();
727 record.set("entity_id", Value::UnsignedInteger(*id));
728 record.set("distance_km", Value::Float(*dist));
729 result.push(record);
730 }
731 Ok(RuntimeQueryResult {
732 query: raw_query.to_string(),
733 mode: QueryMode::Sql,
734 statement: "search_spatial_nearest",
735 engine: "runtime-spatial",
736 result,
737 affected_rows: 0,
738 statement_type: "select",
739 })
740 }
741 }
742 }
743}
744
745fn parse_direction(s: &str) -> RedDBResult<RuntimeGraphDirection> {
750 match s.to_lowercase().as_str() {
751 "outgoing" | "out" => Ok(RuntimeGraphDirection::Outgoing),
752 "incoming" | "in" => Ok(RuntimeGraphDirection::Incoming),
753 "both" | "any" => Ok(RuntimeGraphDirection::Both),
754 _ => Err(RedDBError::Query(format!(
755 "unknown direction: '{s}', expected outgoing|incoming|both"
756 ))),
757 }
758}
759
760fn parse_path_algorithm(s: &str) -> RedDBResult<RuntimeGraphPathAlgorithm> {
761 match s.to_lowercase().as_str() {
762 "bfs" => Ok(RuntimeGraphPathAlgorithm::Bfs),
763 "dijkstra" => Ok(RuntimeGraphPathAlgorithm::Dijkstra),
764 "astar" | "a*" => Ok(RuntimeGraphPathAlgorithm::AStar),
765 "bellman_ford" | "bellmanford" => Ok(RuntimeGraphPathAlgorithm::BellmanFord),
766 _ => Err(RedDBError::Query(format!(
767 "unknown path algorithm: '{s}', expected bfs|dijkstra|astar|bellman_ford"
768 ))),
769 }
770}
771
772fn parse_traversal_strategy(s: &str) -> RedDBResult<RuntimeGraphTraversalStrategy> {
773 match s.to_lowercase().as_str() {
774 "bfs" => Ok(RuntimeGraphTraversalStrategy::Bfs),
775 "dfs" => Ok(RuntimeGraphTraversalStrategy::Dfs),
776 _ => Err(RedDBError::Query(format!(
777 "unknown traversal strategy: '{s}', expected bfs|dfs"
778 ))),
779 }
780}
781
782fn parse_centrality_algorithm(s: &str) -> RedDBResult<RuntimeGraphCentralityAlgorithm> {
783 match s.to_lowercase().as_str() {
784 "degree" => Ok(RuntimeGraphCentralityAlgorithm::Degree),
785 "closeness" => Ok(RuntimeGraphCentralityAlgorithm::Closeness),
786 "betweenness" => Ok(RuntimeGraphCentralityAlgorithm::Betweenness),
787 "eigenvector" => Ok(RuntimeGraphCentralityAlgorithm::Eigenvector),
788 "pagerank" | "page_rank" => Ok(RuntimeGraphCentralityAlgorithm::PageRank),
789 _ => Err(RedDBError::Query(format!(
790 "unknown centrality algorithm: '{s}', expected degree|closeness|betweenness|eigenvector|pagerank"
791 ))),
792 }
793}
794
795fn parse_community_algorithm(s: &str) -> RedDBResult<RuntimeGraphCommunityAlgorithm> {
796 match s.to_lowercase().as_str() {
797 "label_propagation" | "labelpropagation" => {
798 Ok(RuntimeGraphCommunityAlgorithm::LabelPropagation)
799 }
800 "louvain" => Ok(RuntimeGraphCommunityAlgorithm::Louvain),
801 _ => Err(RedDBError::Query(format!(
802 "unknown community algorithm: '{s}', expected label_propagation|louvain"
803 ))),
804 }
805}
806
807fn parse_components_mode(s: &str) -> RedDBResult<RuntimeGraphComponentsMode> {
808 match s.to_lowercase().as_str() {
809 "connected" => Ok(RuntimeGraphComponentsMode::Connected),
810 "weak" | "weakly_connected" => Ok(RuntimeGraphComponentsMode::Weak),
811 "strong" | "strongly_connected" => Ok(RuntimeGraphComponentsMode::Strong),
812 _ => Err(RedDBError::Query(format!(
813 "unknown components mode: '{s}', expected connected|weak|strong"
814 ))),
815 }
816}
817
818fn extract_geo_from_entity(entity: &UnifiedEntity) -> Option<(f64, f64)> {
823 match &entity.data {
824 EntityData::Row(row) => {
825 if let Some(ref named) = row.named {
827 for value in named.values() {
829 if let Value::GeoPoint(lat_micro, lon_micro) = value {
830 return Some((
831 *lat_micro as f64 / 1_000_000.0,
832 *lon_micro as f64 / 1_000_000.0,
833 ));
834 }
835 }
836 let lat =
838 named
839 .get("lat")
840 .or_else(|| named.get("latitude"))
841 .and_then(|v| match v {
842 Value::Float(f) => Some(*f),
843 Value::Integer(i) => Some(*i as f64),
844 _ => None,
845 });
846 let lon = named
847 .get("lon")
848 .or_else(|| named.get("lng"))
849 .or_else(|| named.get("longitude"))
850 .and_then(|v| match v {
851 Value::Float(f) => Some(*f),
852 Value::Integer(i) => Some(*i as f64),
853 _ => None,
854 });
855 if let (Some(la), Some(lo)) = (lat, lon) {
856 return Some((la, lo));
857 }
858 }
859 for value in &row.columns {
861 if let Value::GeoPoint(lat_micro, lon_micro) = value {
862 return Some((
863 *lat_micro as f64 / 1_000_000.0,
864 *lon_micro as f64 / 1_000_000.0,
865 ));
866 }
867 }
868 None
869 }
870 EntityData::Node(node) => {
871 for value in node.properties.values() {
873 if let Value::GeoPoint(lat_micro, lon_micro) = value {
874 return Some((
875 *lat_micro as f64 / 1_000_000.0,
876 *lon_micro as f64 / 1_000_000.0,
877 ));
878 }
879 }
880 let lat = node
881 .properties
882 .get("lat")
883 .or_else(|| node.properties.get("latitude"))
884 .and_then(|v| match v {
885 Value::Float(f) => Some(*f),
886 Value::Integer(i) => Some(*i as f64),
887 _ => None,
888 });
889 let lon = node
890 .properties
891 .get("lon")
892 .or_else(|| node.properties.get("lng"))
893 .or_else(|| node.properties.get("longitude"))
894 .and_then(|v| match v {
895 Value::Float(f) => Some(*f),
896 Value::Integer(i) => Some(*i as f64),
897 _ => None,
898 });
899 if let (Some(la), Some(lo)) = (lat, lon) {
900 return Some((la, lo));
901 }
902 None
903 }
904 _ => None,
905 }
906}