1use std::collections::BTreeMap;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8
9use crate::executor::{GraphQueryExecutor, JsonRow, StoredGraphConfig};
10use crate::provider::GraphProvider;
11use crate::types::{
12 Direction, EdgeExpansion, GraphNode, GraphPayload, GraphRelationship, GraphSchema, GraphStats,
13 NodeMetadata, Props, SearchHits,
14};
15
/// Fixed placeholder timestamp written into `created_at` and `updated_at` of every node
/// built by `row_to_node`.
const NOW: &str = "1970-01-01T00:00:00Z";
17
/// Renders `name` as a double-quoted SQL identifier, doubling any embedded `"`.
fn ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // An embedded quote is escaped by writing it twice.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
/// Renders `v` as a single-quoted SQL string literal, doubling any embedded `'`.
fn lit(v: &str) -> String {
    // Splitting on the quote and re-joining with two quotes doubles every occurrence.
    let doubled = v.split('\'').collect::<Vec<_>>().join("''");
    ["'", doubled.as_str(), "'"].concat()
}
24fn in_list(values: &[String]) -> String {
25 values.iter().map(|v| lit(v)).collect::<Vec<_>>().join(",")
26}
27fn as_str(v: &serde_json::Value) -> String {
29 match v {
30 serde_json::Value::String(s) => s.clone(),
31 other => other.to_string(),
32 }
33}
34
35fn node_source(c: &StoredGraphConfig) -> String {
44 format!(
45 "(select * from (select *, row_number() over (partition by {id} order by {id}) as __rn from {t}) where __rn = 1)",
46 id = ident(&c.id_col), t = ident(&c.node_table),
47 )
48}
49fn edge_source(c: &StoredGraphConfig) -> String {
52 format!(
53 "(select * from (select *, row_number() over (partition by {s},{d},{ty} order by {s}) as __rn from {t}) where __rn = 1)",
54 s = ident(&c.src_col), d = ident(&c.dst_col), ty = ident(&c.type_col), t = ident(&c.edge_table),
55 )
56}
57
58pub(crate) fn node_sample_sql(c: &StoredGraphConfig, limit: usize) -> String {
59 format!("select * from {} limit {}", node_source(c), limit)
60}
61pub(crate) fn edge_sample_sql(c: &StoredGraphConfig, limit: usize) -> String {
62 format!("select * from {} limit {}", edge_source(c), limit)
63}
64pub(crate) fn node_by_id_sql(c: &StoredGraphConfig, id: &str) -> String {
65 format!("select * from {} where {} = {} limit 1", node_source(c), ident(&c.id_col), lit(id))
66}
67pub(crate) fn neighbors_sql(c: &StoredGraphConfig, ids: &[String], dir: Direction, limit: usize) -> String {
68 let list = in_list(ids);
69 let pred = match dir {
70 Direction::Forward => format!("{} in ({list})", ident(&c.src_col)),
71 Direction::Backward => format!("{} in ({list})", ident(&c.dst_col)),
72 Direction::Both => format!("{} in ({list}) or {} in ({list})", ident(&c.src_col), ident(&c.dst_col)),
73 };
74 format!("select * from {} where {pred} limit {limit}", edge_source(c))
75}
76pub(crate) fn search_sql(c: &StoredGraphConfig, text: &str, limit: usize, offset: usize) -> String {
77 let needle = lit(&format!("%{}%", text.to_lowercase()));
78 format!(
79 "select * from {t} where lower(cast({id} as varchar)) like {n} or lower(cast({lbl} as varchar)) like {n} limit {limit} offset {offset}",
80 t = node_source(c), id = ident(&c.id_col), lbl = ident(&c.label_col), n = needle,
81 )
82}
/// SQL counting all rows of `source`, returned in a column named `n`.
///
/// `source` is spliced in verbatim, so it must already be a valid table expression.
pub(crate) fn count_sql(source: &str) -> String {
    ["select count(*) as n from ", source].concat()
}
87pub(crate) fn group_count_sql(source: &str, col: &str) -> String {
89 format!("select cast({c} as varchar) as k, count(*) as n from {t} group by {c}", c = ident(col), t = source)
90}
91
92fn parse_labels(v: Option<&serde_json::Value>) -> Vec<String> {
93 match v {
94 Some(serde_json::Value::Array(a)) => a.iter().map(as_str).collect(),
95 Some(serde_json::Value::String(s)) => vec![s.clone()],
96 Some(other) => vec![other.to_string()],
97 None => vec![],
98 }
99}
100
/// Name of the row column that `collect_props` treats as a packed property blob and tries
/// to decode with `decode_props_blob`.
const PROPS_COL: &str = "props";
104
/// Decodes a hexadecimal string (upper or lower case digits) into bytes.
///
/// Returns `None` when the input is empty, has an odd length, or contains any byte that
/// is not an ASCII hex digit.
fn hex_decode(s: &str) -> Option<Vec<u8>> {
    // Value of one ASCII hex digit; `None` for every other byte.
    fn nibble(byte: u8) -> Option<u8> {
        char::from(byte).to_digit(16).map(|digit| digit as u8)
    }

    let raw = s.as_bytes();
    if raw.is_empty() || raw.len() % 2 == 1 {
        return None;
    }
    // Each pair of digits forms one byte: high nibble first, then low nibble.
    raw.chunks_exact(2)
        .map(|pair| Some((nibble(pair[0])? << 4) | nibble(pair[1])?))
        .collect()
}
125
126fn decode_props_blob(v: &serde_json::Value) -> Option<serde_json::Map<String, serde_json::Value>> {
131 let s = v.as_str()?;
132 if let Some(bytes) = hex_decode(s) {
133 if let Ok(txt) = std::str::from_utf8(&bytes) {
134 if let Ok(serde_json::Value::Object(m)) = serde_json::from_str(txt) {
135 return Some(m);
136 }
137 }
138 }
139 if let Ok(serde_json::Value::Object(m)) = serde_json::from_str::<serde_json::Value>(s) {
140 return Some(m);
141 }
142 None
143}
144
145fn collect_props(row: &JsonRow, role_cols: &[&str]) -> Props {
149 let mut props: Props = BTreeMap::new();
150 let mut blob: Option<&serde_json::Value> = None;
151 for (k, v) in row {
152 if role_cols.contains(&k.as_str()) || k == "__rn" {
153 continue;
154 }
155 if k == PROPS_COL {
156 blob = Some(v);
157 continue;
158 }
159 props.insert(k.clone(), v.clone());
160 }
161 if let Some(v) = blob {
162 match decode_props_blob(v) {
163 Some(obj) => {
164 for (pk, pv) in obj {
165 props.entry(pk).or_insert(pv);
166 }
167 }
168 None => {
170 props.insert(PROPS_COL.to_string(), v.clone());
171 }
172 }
173 }
174 props
175}
176
177pub(crate) fn row_to_node(c: &StoredGraphConfig, row: &JsonRow) -> GraphNode {
178 let id = row.get(&c.id_col).map(as_str).unwrap_or_default();
179 let labels = parse_labels(row.get(&c.label_col));
180 let realm = c.realm_col.as_ref().and_then(|rc| row.get(rc)).map(as_str).unwrap_or_else(|| c.database.clone());
181 let role_cols: [&str; 3] = [c.id_col.as_str(), c.label_col.as_str(), c.realm_col.as_deref().unwrap_or("")];
182 let props = collect_props(row, &role_cols);
183 GraphNode {
184 id, labels, properties: props,
185 metadata: NodeMetadata { created_at: NOW.into(), updated_at: NOW.into(), source_type: Some("stored".into()), source_id: None, realm },
186 }
187}
188
189pub(crate) fn row_to_edge(c: &StoredGraphConfig, row: &JsonRow) -> GraphRelationship {
190 let src = row.get(&c.src_col).map(as_str).unwrap_or_default();
191 let dst = row.get(&c.dst_col).map(as_str).unwrap_or_default();
192 let ty = row.get(&c.type_col).map(as_str).unwrap_or_default();
193 let role_cols = [c.src_col.as_str(), c.dst_col.as_str(), c.type_col.as_str()];
194 let props = collect_props(row, &role_cols);
195 GraphRelationship {
196 id: format!("{src}->{dst}:{ty}"),
197 source_id: src, target_id: dst, relationship_type: ty, properties: props,
198 }
199}
200
/// Graph provider that answers requests by generating SQL from a [`StoredGraphConfig`] and
/// running it through a [`GraphQueryExecutor`].
pub struct StoredGraphProvider {
    /// Database name plus the table and column names used to build queries.
    cfg: StoredGraphConfig,
    /// Executor that runs the generated SQL and returns JSON rows.
    exec: Arc<dyn GraphQueryExecutor>,
}
205
206impl StoredGraphProvider {
207 pub fn new(cfg: StoredGraphConfig, exec: Arc<dyn GraphQueryExecutor>) -> Self {
208 Self { cfg, exec }
209 }
210 async fn rows(&self, sql: String) -> anyhow::Result<Vec<JsonRow>> {
211 self.exec.query(&self.cfg.database, sql).await
212 }
213 fn count_of(rows: &[JsonRow]) -> usize {
214 rows.first().and_then(|r| r.get("n")).and_then(|v| v.as_u64()).unwrap_or(0) as usize
215 }
216}
217
218async fn stats_for(p: &StoredGraphProvider) -> anyhow::Result<GraphStats> {
219 let total_nodes = StoredGraphProvider::count_of(&p.rows(count_sql(&node_source(&p.cfg))).await?);
220 let total_relationships = StoredGraphProvider::count_of(&p.rows(count_sql(&edge_source(&p.cfg))).await?);
221 let mut label_counts = BTreeMap::new();
222 for r in p.rows(group_count_sql(&node_source(&p.cfg), &p.cfg.label_col)).await? {
223 let k = r.get("k").map(as_str).unwrap_or_default();
224 let n = r.get("n").and_then(|v| v.as_u64()).unwrap_or(0) as usize;
225 label_counts.insert(k, n);
226 }
227 let mut relationship_type_counts = BTreeMap::new();
228 for r in p.rows(group_count_sql(&edge_source(&p.cfg), &p.cfg.type_col)).await? {
229 let k = r.get("k").map(as_str).unwrap_or_default();
230 let n = r.get("n").and_then(|v| v.as_u64()).unwrap_or(0) as usize;
231 relationship_type_counts.insert(k, n);
232 }
233 Ok(GraphStats { total_nodes, total_relationships, label_counts, relationship_type_counts })
234}
235
236#[async_trait]
237impl GraphProvider for StoredGraphProvider {
238 async fn overview(&self, _realm: Option<&str>, limit: usize) -> anyhow::Result<GraphPayload> {
239 let nodes: Vec<GraphNode> = self.rows(node_sample_sql(&self.cfg, limit)).await?
240 .iter().map(|r| row_to_node(&self.cfg, r)).collect();
241 let kept: std::collections::HashSet<&String> = nodes.iter().map(|n| &n.id).collect();
242 let edges: Vec<GraphRelationship> = self.rows(edge_sample_sql(&self.cfg, limit.saturating_mul(4))).await?
243 .iter().map(|r| row_to_edge(&self.cfg, r))
244 .filter(|e| kept.contains(&e.source_id) && kept.contains(&e.target_id))
245 .collect();
246 let stats = stats_for(self).await?;
247 Ok(GraphPayload { stats, nodes, edges })
248 }
249 async fn node(&self, id: &str) -> anyhow::Result<Option<GraphNode>> {
250 Ok(self.rows(node_by_id_sql(&self.cfg, id)).await?.first().map(|r| row_to_node(&self.cfg, r)))
251 }
252 async fn neighbors(&self, ids: &[String], dir: Direction, _only_internal: bool, limit: usize) -> anyhow::Result<EdgeExpansion> {
253 if ids.is_empty() { return Ok(EdgeExpansion { edges: vec![], new_node_ids: vec![] }); }
254 let edges: Vec<GraphRelationship> = self.rows(neighbors_sql(&self.cfg, ids, dir, limit)).await?
255 .iter().map(|r| row_to_edge(&self.cfg, r)).collect();
256 let idset: std::collections::HashSet<&String> = ids.iter().collect();
257 let mut new_ids = Vec::new();
258 for e in &edges {
259 for end in [&e.source_id, &e.target_id] {
260 if !idset.contains(end) && !new_ids.contains(end) { new_ids.push(end.clone()); }
261 }
262 }
263 Ok(EdgeExpansion { edges, new_node_ids: new_ids })
264 }
265 async fn subgraph(&self, id: &str, depth: usize) -> anyhow::Result<GraphPayload> {
266 let mut frontier = vec![id.to_string()];
267 let mut all_node_ids: std::collections::HashSet<String> = frontier.iter().cloned().collect();
268 let mut edges: Vec<GraphRelationship> = Vec::new();
269 for _ in 0..depth.max(1) {
270 if frontier.is_empty() { break; }
271 let exp = self.neighbors(&frontier, Direction::Both, true, 500).await?;
272 let mut next = Vec::new();
273 for e in exp.edges {
274 if !edges.iter().any(|k| k.id == e.id) { edges.push(e); }
275 }
276 for nid in exp.new_node_ids {
277 if all_node_ids.insert(nid.clone()) { next.push(nid); }
278 }
279 frontier = next;
280 }
281 let mut nodes = Vec::new();
283 for nid in &all_node_ids {
284 if let Some(n) = self.node(nid).await? { nodes.push(n); }
285 }
286 let stats = GraphStats {
287 total_nodes: nodes.len(), total_relationships: edges.len(),
288 label_counts: BTreeMap::new(), relationship_type_counts: BTreeMap::new(),
289 };
290 Ok(GraphPayload { stats, nodes, edges })
291 }
292 async fn search(&self, text: &str, labels: &[String], _realm: Option<&str>, limit: usize, offset: usize) -> anyhow::Result<SearchHits> {
293 let mut hits: Vec<GraphNode> = self.rows(search_sql(&self.cfg, text, limit, offset)).await?
294 .iter().map(|r| row_to_node(&self.cfg, r)).collect();
295 if !labels.is_empty() {
296 hits.retain(|n| labels.iter().any(|l| n.labels.contains(l)));
297 }
298 let total = hits.len();
299 Ok(SearchHits { hits, total, limit, offset })
300 }
301 async fn stats(&self, _realm: Option<&str>) -> anyhow::Result<GraphStats> {
302 stats_for(self).await
303 }
304 async fn schema(&self) -> anyhow::Result<GraphSchema> {
305 let stats = stats_for(self).await?;
306 Ok(GraphSchema {
307 node_kinds: stats.label_counts.keys().cloned().collect(),
308 edge_types: stats.relationship_type_counts.keys().cloned().collect(),
309 property_keys: BTreeMap::new(),
310 })
311 }
312}
313
#[cfg(test)]
mod sql_tests {
    use super::*;
    use crate::executor::StoredGraphConfig;
    use serde_json::json;

    /// Config fixture: tables `kg_nodes` / `kg_edges` with a `realm` column configured.
    fn cfg() -> StoredGraphConfig {
        StoredGraphConfig {
            database: "kg".into(),
            node_table: "kg_nodes".into(),
            edge_table: "kg_edges".into(),
            id_col: "id".into(),
            label_col: "labels".into(),
            src_col: "src".into(),
            dst_col: "dst".into(),
            type_col: "type".into(),
            realm_col: Some("realm".into()),
        }
    }

    /// Builds a row from `(column, value)` pairs.
    fn row_of(pairs: &[(&str, serde_json::Value)]) -> JsonRow {
        pairs.iter().map(|(k, v)| (k.to_string(), v.clone())).collect()
    }

    /// Asserts that `haystack` contains every fragment, printing `haystack` on failure.
    fn assert_contains_all(haystack: &str, fragments: &[&str]) {
        for fragment in fragments {
            assert!(haystack.contains(fragment), "{haystack}");
        }
    }

    #[test]
    fn node_by_id_sql_quotes_and_escapes() {
        let sql = node_by_id_sql(&cfg(), "a'b");
        assert_contains_all(&sql, &[r#"from "kg_nodes""#, r#""id" = 'a''b'"#]);
        assert!(sql.to_lowercase().contains("limit 1"));
    }

    #[test]
    fn neighbors_sql_both_directions() {
        let ids = vec!["x".to_string(), "y".to_string()];
        let sql = neighbors_sql(&cfg(), &ids, Direction::Both, 50);
        assert_contains_all(&sql, &[r#""src" in ('x','y')"#, r#""dst" in ('x','y')"#]);
        assert!(sql.to_lowercase().contains("limit 50"));
    }

    #[test]
    fn node_sample_dedups_by_id() {
        let sql = node_sample_sql(&cfg(), 50).to_lowercase();
        assert_contains_all(
            &sql,
            &["row_number()", r#"partition by "id""#, "kg_nodes", "limit 50"],
        );
    }

    #[test]
    fn edge_sample_dedups_by_src_dst_type() {
        let sql = edge_sample_sql(&cfg(), 50).to_lowercase();
        assert_contains_all(
            &sql,
            &["row_number()", r#"partition by "src","dst","type""#, "kg_edges"],
        );
    }

    #[test]
    fn count_and_group_count_run_over_deduped_source() {
        let config = cfg();
        let source = node_source(&config);

        let cnt = count_sql(&source).to_lowercase();
        assert_contains_all(&cnt, &["count(*)", "row_number()"]);

        let grp = group_count_sql(&source, &config.label_col).to_lowercase();
        assert_contains_all(&grp, &["group by", "row_number()"]);
    }

    #[test]
    fn row_to_node_uses_roles() {
        let row = row_of(&[
            ("id", json!("n1")),
            ("labels", json!("Service")),
            ("realm", json!("prod")),
            ("owner", json!("team-a")),
        ]);
        let node = row_to_node(&cfg(), &row);
        assert_eq!(node.id, "n1");
        assert_eq!(node.labels, vec!["Service".to_string()]);
        assert_eq!(node.metadata.realm, "prod");
        assert_eq!(node.properties.get("owner").unwrap(), &json!("team-a"));
        assert!(!node.properties.contains_key("id"));
    }

    #[test]
    fn row_to_edge_uses_roles() {
        let row = row_of(&[
            ("src", json!("a")),
            ("dst", json!("b")),
            ("type", json!("CALLS")),
            ("weight", json!(5)),
        ]);
        let edge = row_to_edge(&cfg(), &row);
        assert_eq!(edge.source_id, "a");
        assert_eq!(edge.target_id, "b");
        assert_eq!(edge.relationship_type, "CALLS");
        assert_eq!(edge.properties.get("weight").unwrap(), &json!(5));
    }

    #[test]
    fn hex_decode_round_trips() {
        let decoded = hex_decode("7b226b223a2276227d").unwrap();
        assert_eq!(decoded, br#"{"k":"v"}"#.to_vec());
        assert!(hex_decode("abc").is_none(), "odd length");
        assert!(hex_decode("zz").is_none(), "non-hex");
    }

    #[test]
    fn decode_props_blob_handles_hex_and_plain_json() {
        // Hex-encoded `{"k":"v"}`.
        let from_hex = decode_props_blob(&json!("7b226b223a2276227d")).expect("hex json");
        assert_eq!(from_hex.get("k").unwrap(), &json!("v"));

        // Plain JSON text is accepted as a fallback.
        let from_plain = decode_props_blob(&json!(r#"{"a":1}"#)).unwrap();
        assert_eq!(from_plain.get("a").unwrap(), &json!(1));

        // Valid hex that is not JSON decodes to nothing.
        assert!(decode_props_blob(&json!("deadbeef")).is_none());
    }

    #[test]
    fn collect_props_merges_blob_and_explicit_wins() {
        // Hex-encoded `{"language":"python","name":"blob-name"}`.
        let blob = "7b226c616e6775616765223a22707974686f6e222c226e616d65223a22626c6f622d6e616d65227d";
        let row = row_of(&[
            ("id", json!("file:1")),
            ("labels", json!("CodeFile")),
            ("name", json!("real-name")),
            ("props", json!(blob)),
            ("__rn", json!(1)),
        ]);
        let props = collect_props(&row, &["id", "labels"]);

        // Blob-only keys are merged in; explicit columns win on conflict.
        assert_eq!(props.get("language").unwrap(), &json!("python"));
        assert_eq!(props.get("name").unwrap(), &json!("real-name"));

        // Helper, blob and role columns never surface as properties.
        assert!(!props.contains_key("__rn"));
        assert!(!props.contains_key("props"));
        assert!(!props.contains_key("id"));
    }

    #[test]
    fn collect_props_keeps_undecodable_blob_raw() {
        let row = row_of(&[("props", json!("not-json"))]);
        let props = collect_props(&row, &[]);
        assert_eq!(props.get("props").unwrap(), &json!("not-json"));
    }
}
451
#[cfg(test)]
mod provider_tests {
    use super::*;
    use crate::executor::{GraphQueryExecutor, JsonRow, StoredGraphConfig};
    use serde_json::json;

    /// Executor stub that answers from canned rows chosen by inspecting the SQL text.
    struct FakeExec;

    /// Builds a row from `(column, value)` pairs.
    fn row(pairs: &[(&str, serde_json::Value)]) -> JsonRow {
        pairs.iter().map(|(k, v)| (k.to_string(), v.clone())).collect()
    }

    #[async_trait]
    impl GraphQueryExecutor for FakeExec {
        async fn query(&self, _db: &str, sql: String) -> anyhow::Result<Vec<JsonRow>> {
            let lowered = sql.to_lowercase();
            let mentions = |needle: &str| lowered.contains(needle);

            // Count queries are matched first because they also mention the table names.
            let rows = if mentions("count(*)") {
                if mentions("group by") {
                    vec![row(&[("k", json!("Service")), ("n", json!(2))])]
                } else {
                    vec![row(&[("n", json!(2))])]
                }
            } else if mentions("kg_nodes") {
                vec![
                    row(&[("id", json!("a")), ("labels", json!("Service"))]),
                    row(&[("id", json!("b")), ("labels", json!("Service"))]),
                ]
            } else if mentions("kg_edges") {
                vec![row(&[("src", json!("a")), ("dst", json!("b")), ("type", json!("CALLS"))])]
            } else {
                Vec::new()
            };
            Ok(rows)
        }
    }

    /// Provider over the stub executor, with no realm column configured.
    fn provider() -> StoredGraphProvider {
        let cfg = StoredGraphConfig {
            database: "kg".into(),
            node_table: "kg_nodes".into(),
            edge_table: "kg_edges".into(),
            id_col: "id".into(),
            label_col: "labels".into(),
            src_col: "src".into(),
            dst_col: "dst".into(),
            type_col: "type".into(),
            realm_col: None,
        };
        StoredGraphProvider::new(cfg, std::sync::Arc::new(FakeExec))
    }

    #[tokio::test]
    async fn overview_shapes_nodes_edges_and_stats() {
        let p = provider();
        let payload = p.overview(None, 100).await.unwrap();
        assert_eq!(payload.nodes.len(), 2);
        assert_eq!(payload.edges.len(), 1);
        assert_eq!(payload.edges[0].relationship_type, "CALLS");
        assert_eq!(payload.stats.total_nodes, 2);
        assert_eq!(payload.stats.label_counts.get("Service").copied(), Some(2));
    }

    #[tokio::test]
    async fn neighbors_collects_new_ids() {
        let p = provider();
        let seeds = vec!["a".to_string()];
        let expansion = p.neighbors(&seeds, Direction::Both, true, 50).await.unwrap();
        assert_eq!(expansion.edges.len(), 1);
        assert_eq!(expansion.new_node_ids, vec!["b".to_string()]);
    }
}