1use std::collections::{HashMap, HashSet, VecDeque};
2
3use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
4use tracing::{debug, info};
5
6use crate::config::KgConfig;
7use crate::error::KgError;
8use crate::store::sqlite_schema::{DDL, FTS_TRIGGERS};
9use crate::traits::KnowledgeGraph;
10use crate::types::attribute::AttributeValue;
11use crate::types::confidence::Confidence;
12use crate::types::consistency::{ConsistencyIssue, ConsistencyIssueType, IssueSeverity};
13use crate::types::entity::{Entity, EntityType};
14use crate::types::graph::{GraphStats, PathStep, SubGraph};
15use crate::types::import::{ImportResult, MergeStrategy, UpsertResult};
16use crate::types::provenance::Provenance;
17use crate::types::query::{
18 EntityPage, EntityQuery, RelationQuery,
19};
20use crate::types::relation::{Relation, WeightStrategy};
21
/// SQLite-backed store that implements the `KnowledgeGraph` trait on top of an
/// sqlx connection pool.
#[derive(Debug)]
pub struct SqliteKnowledgeGraph {
    /// Connection pool every query in this type runs against.
    pool: SqlitePool,
    // Copied from the config in `new`; nothing in the visible part of this file
    // reads it yet, hence the lint allowance.
    #[allow(dead_code)]
    merge_strategy: MergeStrategy,
    /// Supplies the per-relation cost used when ranking paths.
    weight_strategy: WeightStrategy,
    /// Largest `depth` accepted by `get_neighbors`.
    max_bfs_depth: u32,
    /// Largest `max_depth` accepted by `all_paths`.
    max_path_search: u32,
}
32
33impl SqliteKnowledgeGraph {
34 pub async fn new(path: &str, config: KgConfig) -> Result<Self, KgError> {
35 let pool = SqlitePoolOptions::new()
36 .max_connections(config.storage.pool_size)
37 .connect(&format!("sqlite:{}", path))
38 .await
39 .map_err(|e| KgError::Database(e.to_string()))?;
40
41 sqlx::query("PRAGMA journal_mode=WAL")
42 .execute(&pool)
43 .await
44 .map_err(|e| KgError::Database(e.to_string()))?;
45
46 let this = Self {
47 pool,
48 merge_strategy: config.merge_strategy,
49 weight_strategy: config.weight_strategy,
50 max_bfs_depth: config.max_bfs_depth,
51 max_path_search: config.max_path_search,
52 };
53
54 this.run_migrations().await?;
55 Ok(this)
56 }
57
58 async fn run_migrations(&self) -> Result<(), KgError> {
59 for stmt in DDL {
60 sqlx::query(stmt)
61 .execute(&self.pool)
62 .await
63 .map_err(|e| KgError::Database(format!("Migration failed: {}", e)))?;
64 }
65 for stmt in FTS_TRIGGERS {
66 let _ = sqlx::query(stmt).execute(&self.pool).await;
67 }
68 debug!("sqlite schema migrations complete");
69 Ok(())
70 }
71}
72
73#[async_trait::async_trait]
74impl KnowledgeGraph for SqliteKnowledgeGraph {
75 async fn upsert_entity(&self, entity: Entity) -> Result<UpsertResult, KgError> {
78 let existing: Option<EntityRow> = sqlx::query_as(
79 "SELECT id, name, entity_type, attributes_json, description, created_at, updated_at,
80 version, source, tags_json, aliases_json
81 FROM entities WHERE id = ?",
82 )
83 .bind(&entity.id)
84 .fetch_optional(&self.pool)
85 .await
86 .map_err(|e| KgError::Database(e.to_string()))?;
87
88 let attrs_json =
89 serde_json::to_string(&entity.attributes).map_err(|e| KgError::Serialization(e.to_string()))?;
90 let tags_json =
91 serde_json::to_string(&entity.tags).map_err(|e| KgError::Serialization(e.to_string()))?;
92 let aliases_json =
93 serde_json::to_string(&entity.aliases).map_err(|e| KgError::Serialization(e.to_string()))?;
94 let entity_type = entity.entity_type.as_str();
95
96 if let Some(row) = existing {
97 let mut changed = Vec::new();
98 let conflicts = Vec::new();
99
100 if row.name != entity.name {
101 changed.push("name".into());
102 }
103 if row.entity_type != entity_type {
104 changed.push("entity_type".into());
105 }
106
107 if changed.is_empty() {
108 return Ok(UpsertResult::Unchanged);
109 }
110
111 sqlx::query(
112 "UPDATE entities SET name=?, entity_type=?, attributes_json=?, description=?,
113 updated_at=?, version=version+1, source=?, tags_json=?, aliases_json=?
114 WHERE id=?",
115 )
116 .bind(&entity.name)
117 .bind(&entity_type)
118 .bind(&attrs_json)
119 .bind(&entity.description)
120 .bind(current_epoch_ms() as i64)
121 .bind(&entity.source)
122 .bind(&tags_json)
123 .bind(&aliases_json)
124 .bind(&entity.id)
125 .execute(&self.pool)
126 .await
127 .map_err(|e| KgError::Database(e.to_string()))?;
128
129 Ok(UpsertResult::Updated { changed_fields: changed, conflicts })
130 } else {
131 sqlx::query(
132 "INSERT INTO entities (id, name, entity_type, attributes_json, description,
133 created_at, updated_at, version, source, tags_json, aliases_json)
134 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)",
135 )
136 .bind(&entity.id)
137 .bind(&entity.name)
138 .bind(&entity_type)
139 .bind(&attrs_json)
140 .bind(&entity.description)
141 .bind(entity.created_at as i64)
142 .bind(entity.updated_at as i64)
143 .bind(&entity.source)
144 .bind(&tags_json)
145 .bind(&aliases_json)
146 .execute(&self.pool)
147 .await
148 .map_err(|e| KgError::Database(e.to_string()))?;
149
150 Ok(UpsertResult::Created)
151 }
152 }
153
154 async fn get_entity(&self, id: &str) -> Result<Option<Entity>, KgError> {
155 let row: Option<EntityRow> = sqlx::query_as(
156 "SELECT id, name, entity_type, attributes_json, description, created_at, updated_at,
157 version, source, tags_json, aliases_json
158 FROM entities WHERE id = ?",
159 )
160 .bind(id)
161 .fetch_optional(&self.pool)
162 .await
163 .map_err(|e| KgError::Database(e.to_string()))?;
164
165 Ok(row.map(|r| r.into()))
166 }
167
168 async fn search_entities(&self, query: &EntityQuery) -> Result<EntityPage, KgError> {
169 let use_fts = query.name_contains.is_some();
170
171 let select_cols = "e.id, e.name, e.entity_type, e.attributes_json, e.description, \
172 e.created_at, e.updated_at, e.version, e.source, e.tags_json, e.aliases_json";
173
174 let mut sql = if use_fts {
175 format!(
176 "SELECT {} FROM entities e JOIN entities_fts fts ON e.rowid = fts.rowid WHERE entities_fts MATCH ?",
177 select_cols
178 )
179 } else {
180 format!("SELECT {} FROM entities e WHERE 1=1", select_cols)
181 };
182 let mut params: Vec<String> = Vec::new();
183
184 if use_fts {
185 let fts_query = format!("{}*", query.name_contains.as_ref().unwrap());
187 params.push(fts_query);
188 } else if let Some(ref name) = query.name_contains {
189 params.push(format!("%{}%", name));
190 sql.push_str(" AND e.name LIKE ?");
191 }
192
193 if let Some(ref aliases) = query.alias_contains {
194 params.push(format!("%{}%", aliases));
195 sql.push_str(" AND e.aliases_json LIKE ?");
196 }
197 if let Some(ref types) = query.entity_types {
198 if !types.is_empty() {
199 let type_strs: Vec<String> = types.iter().map(|t| t.as_str()).collect();
200 let placeholders: Vec<String> = type_strs.iter().map(|_| "?".to_string()).collect();
201 sql.push_str(&format!(" AND e.entity_type IN ({})", placeholders.join(",")));
202 params.extend(type_strs);
203 }
204 }
205 if let Some(ref source) = query.source {
206 params.push(source.clone());
207 sql.push_str(" AND e.source = ?");
208 }
209 if let Some(ref tag_filter) = query.tags {
211 if !tag_filter.tags.is_empty() {
212 match tag_filter.mode {
213 crate::types::query::TagFilterMode::Or => {
214 let tag_conditions: Vec<String> = tag_filter.tags.iter().map(|_| {
215 "e.tags_json LIKE '%' || ? || '%'".to_string()
216 }).collect();
217 sql.push_str(&format!(" AND ({})", tag_conditions.join(" OR ")));
218 params.extend(tag_filter.tags.iter().cloned());
219 }
220 crate::types::query::TagFilterMode::And => {
221 for tag in &tag_filter.tags {
222 sql.push_str(" AND e.tags_json LIKE '%' || ? || '%'");
223 params.push(tag.clone());
224 }
225 }
226 }
227 }
228 }
229 for attr in &query.attribute_filters {
231 let json_path = format!("$.{}", attr.key);
232 match attr.operator {
233 crate::types::query::FilterOperator::Eq => {
234 sql.push_str(" AND json_extract(e.attributes_json, ?) = ?");
235 params.push(json_path);
236 params.push(attr.value.clone());
237 }
238 crate::types::query::FilterOperator::Contains => {
239 sql.push_str(" AND json_extract(e.attributes_json, ?) LIKE '%' || ? || '%'");
240 params.push(json_path);
241 params.push(attr.value.clone());
242 }
243 _ => {
244 sql.push_str(" AND json_extract(e.attributes_json, ?) = ?");
246 params.push(json_path);
247 params.push(attr.value.clone());
248 }
249 }
250 }
251
252 let count_sql = sql.replace(
254 &format!("SELECT {}", select_cols),
255 "SELECT COUNT(*)",
256 );
257
258 let mut count_query = sqlx::query_scalar(&count_sql);
259 for p in ¶ms {
260 count_query = count_query.bind(p);
261 }
262 let total: i64 = count_query
263 .fetch_one(&self.pool)
264 .await
265 .map_err(|e| KgError::Database(e.to_string()))?;
266
267 let order = match query.sort_by {
269 Some(crate::types::query::EntitySortField::Name) => "e.name ASC",
270 Some(crate::types::query::EntitySortField::CreatedAt) => "e.created_at DESC",
271 Some(crate::types::query::EntitySortField::UpdatedAt) => "e.updated_at DESC",
272 Some(crate::types::query::EntitySortField::EntityType) => "e.entity_type ASC",
273 Some(crate::types::query::EntitySortField::RelationCount) => "e.updated_at DESC",
274 None => {
275 if use_fts { "ORDER BY rank" } else { "ORDER BY e.updated_at DESC" }
276 }
277 };
278 sql.push_str(&format!(" {} LIMIT ? OFFSET ?", order));
279
280 let mut fetch_query = sqlx::query_as::<_, EntityRow>(&sql);
281 for p in ¶ms {
282 fetch_query = fetch_query.bind(p);
283 }
284 fetch_query = fetch_query
285 .bind(query.page.limit as i64)
286 .bind(query.page.offset as i64);
287
288 let rows: Vec<EntityRow> = fetch_query
289 .fetch_all(&self.pool)
290 .await
291 .map_err(|e| KgError::Database(e.to_string()))?;
292
293 let total = total as usize;
294 let items: Vec<Entity> = rows.into_iter().map(|r| r.into()).collect();
295 let has_more = query.page.offset + query.page.limit < total;
296
297 Ok(EntityPage { items, total, has_more })
298 }
299
300 async fn delete_entity(&self, id: &str) -> Result<usize, KgError> {
301 let mut txn = self.pool.begin().await.map_err(|e| KgError::Database(e.to_string()))?;
302
303 let outcome = {
304 let conn = std::ops::DerefMut::deref_mut(&mut txn);
305
306 let relation_count: (i64,) = sqlx::query_as(
307 "SELECT COUNT(*) FROM relations WHERE source_id = ? OR target_id = ?",
308 )
309 .bind(id)
310 .bind(id)
311 .fetch_one(&mut *conn)
312 .await
313 .map_err(|e| KgError::Database(e.to_string()))?;
314
315 sqlx::query("DELETE FROM relations WHERE source_id = ? OR target_id = ?")
316 .bind(id)
317 .bind(id)
318 .execute(&mut *conn)
319 .await
320 .map_err(|e| KgError::Database(e.to_string()))?;
321
322 sqlx::query("DELETE FROM entities WHERE id = ?")
323 .bind(id)
324 .execute(&mut *conn)
325 .await
326 .map_err(|e| KgError::Database(e.to_string()))?;
327
328 Ok::<usize, KgError>(relation_count.0 as usize)
329 };
330
331 match outcome {
332 Ok(count) => {
333 txn.commit().await.map_err(|e| KgError::Database(e.to_string()))?;
334 Ok(count)
335 }
336 Err(e) => {
337 let _ = txn.rollback().await;
338 Err(e)
339 }
340 }
341 }
342
343 async fn get_entities_batch(&self, ids: &[&str]) -> Result<Vec<Entity>, KgError> {
344 if ids.is_empty() {
345 return Ok(vec![]);
346 }
347 let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
348 let sql = format!(
349 "SELECT id, name, entity_type, attributes_json, description, created_at, updated_at,
350 version, source, tags_json, aliases_json
351 FROM entities WHERE id IN ({})",
352 placeholders.join(",")
353 );
354
355 let mut query = sqlx::query_as::<_, EntityRow>(&sql);
356 for id in ids {
357 query = query.bind(id);
358 }
359
360 let rows: Vec<EntityRow> = query
361 .fetch_all(&self.pool)
362 .await
363 .map_err(|e| KgError::Database(e.to_string()))?;
364
365 Ok(rows.into_iter().map(|r| r.into()).collect())
366 }
367
368 async fn upsert_relation(&self, relation: Relation) -> Result<UpsertResult, KgError> {
371 let existing: Option<RelationRow> = sqlx::query_as(
372 "SELECT id, source_id, target_id, relation_type, properties_json, confidence,
373 provenance_json, valid_from, valid_to, created_at, weight
374 FROM relations WHERE id = ?",
375 )
376 .bind(&relation.id)
377 .fetch_optional(&self.pool)
378 .await
379 .map_err(|e| KgError::Database(e.to_string()))?;
380
381 let props_json = serde_json::to_string(&relation.properties)
382 .map_err(|e| KgError::Serialization(e.to_string()))?;
383 let provenance_json = relation
384 .provenance
385 .as_ref()
386 .map(|p| serde_json::to_string(p))
387 .transpose()
388 .map_err(|e| KgError::Serialization(e.to_string()))?;
389
390 if existing.is_some() {
391 sqlx::query(
392 "UPDATE relations SET relation_type=?, properties_json=?, confidence=?,
393 provenance_json=?, valid_from=?, valid_to=?, weight=?
394 WHERE id=?",
395 )
396 .bind(&relation.relation_type)
397 .bind(&props_json)
398 .bind(relation.confidence.as_f32())
399 .bind(&provenance_json)
400 .bind(relation.valid_from.map(|v| v as i64))
401 .bind(relation.valid_to.map(|v| v as i64))
402 .bind(relation.weight)
403 .bind(&relation.id)
404 .execute(&self.pool)
405 .await
406 .map_err(|e| KgError::Database(e.to_string()))?;
407
408 Ok(UpsertResult::Updated {
409 changed_fields: vec!["relation_type".into()],
410 conflicts: vec![],
411 })
412 } else {
413 sqlx::query(
414 "INSERT INTO relations (id, source_id, target_id, relation_type, properties_json,
415 confidence, provenance_json, valid_from, valid_to, created_at, weight)
416 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
417 )
418 .bind(&relation.id)
419 .bind(&relation.source_id)
420 .bind(&relation.target_id)
421 .bind(&relation.relation_type)
422 .bind(&props_json)
423 .bind(relation.confidence.as_f32())
424 .bind(&provenance_json)
425 .bind(relation.valid_from.map(|v| v as i64))
426 .bind(relation.valid_to.map(|v| v as i64))
427 .bind(relation.created_at as i64)
428 .bind(relation.weight)
429 .execute(&self.pool)
430 .await
431 .map_err(|e| KgError::Database(e.to_string()))?;
432
433 Ok(UpsertResult::Created)
434 }
435 }
436
437 async fn get_relations(&self, entity_id: &str) -> Result<Vec<Relation>, KgError> {
438 let rows: Vec<RelationRow> = sqlx::query_as(
439 "SELECT id, source_id, target_id, relation_type, properties_json, confidence,
440 provenance_json, valid_from, valid_to, created_at, weight
441 FROM relations WHERE source_id = ? OR target_id = ?",
442 )
443 .bind(entity_id)
444 .bind(entity_id)
445 .fetch_all(&self.pool)
446 .await
447 .map_err(|e| KgError::Database(e.to_string()))?;
448
449 Ok(rows.into_iter().map(|r| r.into()).collect())
450 }
451
452 async fn query_relations(&self, query: &RelationQuery) -> Result<Vec<Relation>, KgError> {
453 let mut sql = String::from(
454 "SELECT id, source_id, target_id, relation_type, properties_json, confidence,
455 provenance_json, valid_from, valid_to, created_at, weight
456 FROM relations WHERE 1=1",
457 );
458 let mut params: Vec<String> = Vec::new();
459
460 if let Some(ref sid) = query.source_id {
461 params.push(sid.clone());
462 sql.push_str(" AND source_id = ?");
463 }
464 if let Some(ref tid) = query.target_id {
465 params.push(tid.clone());
466 sql.push_str(" AND target_id = ?");
467 }
468 if let Some(ref eid) = query.entity_id {
469 params.push(eid.clone());
470 params.push(eid.clone());
471 sql.push_str(" AND (source_id = ? OR target_id = ?)");
472 }
473 if let Some(ref rt) = query.relation_type {
474 params.push(rt.clone());
475 sql.push_str(" AND relation_type = ?");
476 }
477 if let Some(ref rts) = query.relation_types {
478 if !rts.is_empty() {
479 let placeholders: Vec<String> = rts.iter().map(|_| "?".to_string()).collect();
480 sql.push_str(&format!(" AND relation_type IN ({})", placeholders.join(",")));
481 params.extend(rts.iter().cloned());
482 }
483 }
484 if let Some(ref min_conf) = query.min_confidence {
485 sql.push_str(" AND confidence >= ?");
486 params.push(min_conf.as_f32().to_string());
487 }
488 if let Some(valid_at) = query.valid_at {
489 sql.push_str(" AND (valid_from IS NULL OR valid_from <= ?) AND (valid_to IS NULL OR valid_to >= ?)");
490 params.push(valid_at.to_string());
491 params.push(valid_at.to_string());
492 }
493
494 sql.push_str(" LIMIT ? OFFSET ?");
495
496 let mut fetch_query = sqlx::query_as::<_, RelationRow>(&sql);
497 for p in ¶ms {
498 fetch_query = fetch_query.bind(p);
499 }
500 fetch_query = fetch_query
501 .bind(query.page.limit as i64)
502 .bind(query.page.offset as i64);
503
504 let rows: Vec<RelationRow> = fetch_query
505 .fetch_all(&self.pool)
506 .await
507 .map_err(|e| KgError::Database(e.to_string()))?;
508
509 Ok(rows.into_iter().map(|r| r.into()).collect())
510 }
511
512 async fn delete_relation(&self, id: &str) -> Result<(), KgError> {
513 let result = sqlx::query("DELETE FROM relations WHERE id = ?")
514 .bind(id)
515 .execute(&self.pool)
516 .await
517 .map_err(|e| KgError::Database(e.to_string()))?;
518
519 if result.rows_affected() == 0 {
520 return Err(KgError::RelationNotFound(id.to_string()));
521 }
522 Ok(())
523 }
524
525 async fn get_neighbors(&self, entity_id: &str, depth: u32) -> Result<SubGraph, KgError> {
528 if depth > self.max_bfs_depth {
529 return Err(KgError::MaxDepthExceeded {
530 depth,
531 max: self.max_bfs_depth,
532 });
533 }
534
535 let center = self
536 .get_entity(entity_id)
537 .await?
538 .ok_or_else(|| KgError::EntityNotFound(entity_id.to_string()))?;
539
540 let mut visited_entities: HashMap<String, Entity> = HashMap::new();
541 let mut visited_relations: Vec<Relation> = Vec::new();
542 let mut queue: VecDeque<(String, u32)> = VecDeque::new();
543
544 visited_entities.insert(entity_id.to_string(), center.clone());
545 queue.push_back((entity_id.to_string(), 0));
546
547 while let Some((current_id, current_depth)) = queue.pop_front() {
548 if current_depth >= depth {
549 continue;
550 }
551
552 let relations = self.get_relations(¤t_id).await?;
554 for rel in relations {
555 let neighbor_id = if rel.source_id == current_id {
556 rel.target_id.clone()
557 } else {
558 rel.source_id.clone()
559 };
560
561 visited_relations.push(rel);
562
563 if !visited_entities.contains_key(&neighbor_id) {
564 if let Some(entity) = self.get_entity(&neighbor_id).await? {
565 visited_entities.insert(neighbor_id.clone(), entity);
566 queue.push_back((neighbor_id, current_depth + 1));
567 }
568 }
569 }
570 }
571
572 let entities: Vec<Entity> = visited_entities
573 .into_iter()
574 .filter(|(id, _)| id != entity_id)
575 .map(|(_, e)| e)
576 .collect();
577
578 Ok(SubGraph {
579 center,
580 entities,
581 relations: visited_relations,
582 })
583 }
584
585 async fn shortest_path(
586 &self,
587 from: &str,
588 to: &str,
589 ) -> Result<Option<Vec<PathStep>>, KgError> {
590 if from == to {
591 return Ok(Some(vec![]));
592 }
593
594 let entity_rows: Vec<EntityRow> = sqlx::query_as(
596 "SELECT id, name, entity_type, attributes_json, description, created_at, updated_at,
597 version, source, tags_json, aliases_json FROM entities",
598 )
599 .fetch_all(&self.pool)
600 .await
601 .map_err(|e| KgError::Database(e.to_string()))?;
602
603 let entities: HashMap<String, Entity> =
604 entity_rows.into_iter().map(|r| (r.id.clone(), r.into())).collect();
605
606 let relation_rows: Vec<RelationRow> = sqlx::query_as(
607 "SELECT id, source_id, target_id, relation_type, properties_json, confidence,
608 provenance_json, valid_from, valid_to, created_at, weight FROM relations",
609 )
610 .fetch_all(&self.pool)
611 .await
612 .map_err(|e| KgError::Database(e.to_string()))?;
613
614 let relations: Vec<Relation> = relation_rows.into_iter().map(|r| r.into()).collect();
615
616 let mut adj: HashMap<String, Vec<(String, Relation)>> = HashMap::new();
618 for rel in &relations {
619 adj.entry(rel.source_id.clone())
620 .or_default()
621 .push((rel.target_id.clone(), rel.clone()));
622 adj.entry(rel.target_id.clone())
623 .or_default()
624 .push((rel.source_id.clone(), rel.clone()));
625 }
626
627 let mut dist: HashMap<String, f32> = HashMap::new();
628 let mut prev: HashMap<String, (String, Relation)> = HashMap::new();
629 let initial_dist = f32::MAX;
630
631 for id in entities.keys() {
632 dist.insert(id.clone(), initial_dist);
633 }
634 dist.insert(from.to_string(), 0.0);
635
636 let mut queue: Vec<(f32, String)> = Vec::new();
637 queue.push((0.0, from.to_string()));
638
639 while let Some((_d, u)) = queue.pop() {
640 if let Some(neighbors) = adj.get(&u) {
641 for (v, rel) in neighbors {
642 let weight = self.weight_strategy.relation_cost(rel);
643 let alt = dist.get(&u).copied().unwrap_or(initial_dist) + weight;
644 if alt < dist.get(v).copied().unwrap_or(initial_dist) {
645 dist.insert(v.clone(), alt);
646 prev.insert(v.clone(), (u.clone(), rel.clone()));
647 queue.push((-alt, v.clone()));
649 }
650 }
651 }
652 queue.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
653 }
654
655 if !prev.contains_key(to) && from != to {
656 return Ok(None);
657 }
658
659 let mut path = Vec::new();
661 let mut current = to.to_string();
662 while current != from {
663 if let Some((prev_node, rel)) = prev.get(¤t) {
664 let entity = entities.get(¤t).cloned().unwrap();
665 path.push(PathStep { entity, relation: rel.clone() });
666 current = prev_node.clone();
667 } else {
668 break;
669 }
670 }
671 path.reverse();
673
674 Ok(Some(path))
675 }
676
677 async fn all_paths(
678 &self,
679 from: &str,
680 to: &str,
681 max_depth: u32,
682 ) -> Result<Vec<Vec<PathStep>>, KgError> {
683 if max_depth > self.max_path_search {
684 return Err(KgError::MaxDepthExceeded {
685 depth: max_depth,
686 max: self.max_path_search,
687 });
688 }
689
690 let entity_rows: Vec<EntityRow> = sqlx::query_as(
691 "SELECT id, name, entity_type, attributes_json, description, created_at, updated_at,
692 version, source, tags_json, aliases_json FROM entities",
693 )
694 .fetch_all(&self.pool)
695 .await
696 .map_err(|e| KgError::Database(e.to_string()))?;
697
698 let entities: HashMap<String, Entity> =
699 entity_rows.into_iter().map(|r| (r.id.clone(), r.into())).collect();
700
701 let relation_rows: Vec<RelationRow> = sqlx::query_as(
702 "SELECT id, source_id, target_id, relation_type, properties_json, confidence,
703 provenance_json, valid_from, valid_to, created_at, weight FROM relations",
704 )
705 .fetch_all(&self.pool)
706 .await
707 .map_err(|e| KgError::Database(e.to_string()))?;
708
709 let relations: Vec<Relation> = relation_rows.into_iter().map(|r| r.into()).collect();
710
711 let mut adj: HashMap<String, Vec<(String, Relation)>> = HashMap::new();
713 for rel in &relations {
714 adj.entry(rel.source_id.clone())
715 .or_default()
716 .push((rel.target_id.clone(), rel.clone()));
717 adj.entry(rel.target_id.clone())
718 .or_default()
719 .push((rel.source_id.clone(), rel.clone()));
720 }
721
722 let mut all_paths: Vec<Vec<PathStep>> = Vec::new();
723 let mut visited: HashSet<String> = HashSet::new();
724 let mut current_path: Vec<PathStep> = Vec::new();
725
726 dfs_all_paths(
727 from,
728 to,
729 max_depth,
730 &entities,
731 &adj,
732 &mut visited,
733 &mut current_path,
734 &mut all_paths,
735 );
736
737 all_paths.sort_by(|a, b| {
738 let a_cost: f32 = a.iter().map(|step| self.weight_strategy.relation_cost(&step.relation)).sum();
739 let b_cost: f32 = b.iter().map(|step| self.weight_strategy.relation_cost(&step.relation)).sum();
740 a_cost.partial_cmp(&b_cost).unwrap_or(std::cmp::Ordering::Equal)
741 });
742
743 Ok(all_paths)
744 }
745
746 async fn batch_import(
749 &self,
750 entities: Vec<Entity>,
751 relations: Vec<Relation>,
752 ) -> Result<ImportResult, KgError> {
753 let mut txn = self.pool.begin().await.map_err(|e| KgError::Database(e.to_string()))?;
754
755 let outcome = {
757 let conn = std::ops::DerefMut::deref_mut(&mut txn);
758 let mut result = ImportResult::default();
759
760 for entity in &entities {
761 match batch_upsert_entity(conn, entity).await {
762 Ok(UpsertResult::Created) => result.entities_created += 1,
763 Ok(UpsertResult::Updated { conflicts, .. }) => {
764 result.entities_updated += 1;
765 result.conflicts.extend(conflicts);
766 }
767 Ok(UpsertResult::Unchanged) => result.entities_skipped += 1,
768 Err(e) => return Err(e),
769 }
770 }
771
772 for relation in &relations {
773 match batch_upsert_relation(conn, relation).await {
774 Ok(UpsertResult::Created) => result.relations_created += 1,
775 Ok(UpsertResult::Updated { .. }) => result.relations_updated += 1,
776 Ok(UpsertResult::Unchanged) => {}
777 Err(e) => return Err(e),
778 }
779 }
780
781 Ok(result)
782 };
783
784 match outcome {
785 Ok(result) => {
786 txn.commit().await.map_err(|e| KgError::Database(e.to_string()))?;
787 info!(
788 entities_created = %result.entities_created,
789 entities_updated = %result.entities_updated,
790 relations_created = %result.relations_created,
791 "batch import completed"
792 );
793 Ok(result)
794 }
795 Err(e) => {
796 let _ = txn.rollback().await;
797 Err(e)
798 }
799 }
800 }
801
802 async fn check_consistency(&self) -> Result<Vec<ConsistencyIssue>, KgError> {
805 let mut issues = Vec::new();
806
807 let orphans: Vec<OrphanRelationRow> = sqlx::query_as(
809 "SELECT r.id, r.source_id, r.target_id
810 FROM relations r
811 LEFT JOIN entities e1 ON r.source_id = e1.id
812 LEFT JOIN entities e2 ON r.target_id = e2.id
813 WHERE e1.id IS NULL OR e2.id IS NULL",
814 )
815 .fetch_all(&self.pool)
816 .await
817 .map_err(|e| KgError::Database(e.to_string()))?;
818
819 for o in orphans {
820 issues.push(ConsistencyIssue {
821 severity: IssueSeverity::Error,
822 issue_type: ConsistencyIssueType::OrphanRelation,
823 description: format!("Relation {} references a non-existent entity", o.id),
824 related_entities: vec![o.source_id, o.target_id],
825 related_relations: vec![o.id],
826 });
827 }
828
829 let self_refs: Vec<RelationRow> = sqlx::query_as(
831 "SELECT id, source_id, target_id, relation_type, properties_json, confidence,
832 provenance_json, valid_from, valid_to, created_at, weight
833 FROM relations WHERE source_id = target_id",
834 )
835 .fetch_all(&self.pool)
836 .await
837 .map_err(|e| KgError::Database(e.to_string()))?;
838
839 for rel in self_refs {
840 issues.push(ConsistencyIssue {
841 severity: IssueSeverity::Warning,
842 issue_type: ConsistencyIssueType::SelfReferencing,
843 description: format!("Relation {} self-references entity {}", rel.id, rel.source_id),
844 related_entities: vec![rel.source_id],
845 related_relations: vec![rel.id],
846 });
847 }
848
849 let orphan_entities: Vec<(String, String)> = sqlx::query_as(
851 "SELECT e.id, e.name FROM entities e
852 WHERE e.id NOT IN (SELECT source_id FROM relations)
853 AND e.id NOT IN (SELECT target_id FROM relations)",
854 )
855 .fetch_all(&self.pool)
856 .await
857 .map_err(|e| KgError::Database(e.to_string()))?;
858
859 for (id, name) in orphan_entities {
860 issues.push(ConsistencyIssue {
861 severity: IssueSeverity::Info,
862 issue_type: ConsistencyIssueType::OrphanEntity,
863 description: format!("Entity {} ({}) has no relations", name, id),
864 related_entities: vec![id],
865 related_relations: vec![],
866 });
867 }
868
869 let now = current_epoch_ms();
871 let expired: Vec<RelationRow> = sqlx::query_as(
872 "SELECT id, source_id, target_id, relation_type, properties_json, confidence,
873 provenance_json, valid_from, valid_to, created_at, weight
874 FROM relations WHERE valid_to IS NOT NULL AND valid_to < ?",
875 )
876 .bind(now as i64)
877 .fetch_all(&self.pool)
878 .await
879 .map_err(|e| KgError::Database(e.to_string()))?;
880
881 for rel in expired {
882 issues.push(ConsistencyIssue {
883 severity: IssueSeverity::Warning,
884 issue_type: ConsistencyIssueType::ExpiredRelation,
885 description: format!("Relation {} has expired (valid_to < now)", rel.id),
886 related_entities: vec![rel.source_id, rel.target_id],
887 related_relations: vec![rel.id],
888 });
889 }
890
891 Ok(issues)
892 }
893
894 async fn stats(&self) -> Result<GraphStats, KgError> {
897 let total_entities: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM entities")
898 .fetch_one(&self.pool)
899 .await
900 .map_err(|e| KgError::Database(e.to_string()))?;
901
902 let total_relations: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM relations")
903 .fetch_one(&self.pool)
904 .await
905 .map_err(|e| KgError::Database(e.to_string()))?;
906
907 let entity_types: Vec<(String, i64)> = sqlx::query_as(
908 "SELECT entity_type, COUNT(*) as cnt FROM entities GROUP BY entity_type",
909 )
910 .fetch_all(&self.pool)
911 .await
912 .map_err(|e| KgError::Database(e.to_string()))?;
913
914 let relation_types: Vec<(String, i64)> = sqlx::query_as(
915 "SELECT relation_type, COUNT(*) as cnt FROM relations GROUP BY relation_type",
916 )
917 .fetch_all(&self.pool)
918 .await
919 .map_err(|e| KgError::Database(e.to_string()))?;
920
921 let degrees: Vec<(i64,)> = sqlx::query_as(
923 "SELECT COUNT(*) FROM relations GROUP BY source_id
924 UNION ALL SELECT COUNT(*) FROM relations GROUP BY target_id",
925 )
926 .fetch_all(&self.pool)
927 .await
928 .map_err(|e| KgError::Database(e.to_string()))?;
929
930 let degree_values: Vec<usize> = degrees.into_iter().map(|d| d.0 as usize).collect();
931 let avg_degree = if degree_values.is_empty() {
932 0.0
933 } else {
934 degree_values.iter().sum::<usize>() as f64 / degree_values.len() as f64
935 };
936 let max_degree = degree_values.iter().max().copied().unwrap_or(0);
937
938 let orphan_entities: (i64,) = sqlx::query_as(
940 "SELECT COUNT(*) FROM entities e
941 WHERE e.id NOT IN (SELECT source_id FROM relations)
942 AND e.id NOT IN (SELECT target_id FROM relations)",
943 )
944 .fetch_one(&self.pool)
945 .await
946 .map_err(|e| KgError::Database(e.to_string()))?;
947
948 let db_size: (i64,) = sqlx::query_as(
950 "SELECT COALESCE(SUM(pgsize), 0) FROM dbstat",
951 )
952 .fetch_one(&self.pool)
953 .await
954 .map_err(|e| KgError::Database(e.to_string()))?;
955
956 Ok(GraphStats {
957 total_entities: total_entities.0 as usize,
958 total_relations: total_relations.0 as usize,
959 entity_types: entity_types.into_iter().map(|(k, v)| (k, v as usize)).collect(),
960 relation_types: relation_types.into_iter().map(|(k, v)| (k, v as usize)).collect(),
961 avg_degree,
962 max_degree,
963 orphan_entities: orphan_entities.0 as usize,
964 db_size_bytes: db_size.0 as u64,
965 })
966 }
967}
968
969#[allow(clippy::too_many_arguments)]
972fn dfs_all_paths(
973 current: &str,
974 target: &str,
975 max_depth: u32,
976 entities: &HashMap<String, Entity>,
977 adj: &HashMap<String, Vec<(String, Relation)>>,
978 visited: &mut HashSet<String>,
979 current_path: &mut Vec<PathStep>,
980 all_paths: &mut Vec<Vec<PathStep>>,
981) {
982 if current == target {
983 all_paths.push(current_path.clone());
984 return;
985 }
986 if current_path.len() >= max_depth as usize {
987 return;
988 }
989 visited.insert(current.to_string());
990
991 if let Some(neighbors) = adj.get(current) {
992 for (neighbor, rel) in neighbors {
993 if visited.contains(neighbor.as_str()) {
994 continue;
995 }
996 if let Some(entity) = entities.get(neighbor).cloned() {
997 current_path.push(PathStep {
998 entity,
999 relation: rel.clone(),
1000 });
1001 dfs_all_paths(
1002 neighbor, target, max_depth, entities, adj,
1003 visited, current_path, all_paths,
1004 );
1005 current_path.pop();
1006 }
1007 }
1008 }
1009
1010 visited.remove(current);
1011}
1012
1013async fn batch_upsert_entity(
1016 conn: &mut sqlx::SqliteConnection,
1017 entity: &Entity,
1018) -> Result<UpsertResult, KgError> {
1019 let existing: Option<EntityRow> = sqlx::query_as(
1020 "SELECT id, name, entity_type, attributes_json, description, created_at, updated_at,
1021 version, source, tags_json, aliases_json
1022 FROM entities WHERE id = ?",
1023 )
1024 .bind(&entity.id)
1025 .fetch_optional(&mut *conn)
1026 .await
1027 .map_err(|e| KgError::Database(e.to_string()))?;
1028
1029 let attrs_json =
1030 serde_json::to_string(&entity.attributes).map_err(|e| KgError::Serialization(e.to_string()))?;
1031 let tags_json =
1032 serde_json::to_string(&entity.tags).map_err(|e| KgError::Serialization(e.to_string()))?;
1033 let aliases_json =
1034 serde_json::to_string(&entity.aliases).map_err(|e| KgError::Serialization(e.to_string()))?;
1035 let entity_type = entity.entity_type.as_str();
1036
1037 if let Some(row) = existing {
1038 let mut changed = Vec::new();
1039 let conflicts = Vec::new();
1040
1041 if row.name != entity.name {
1042 changed.push("name".into());
1043 }
1044 if row.entity_type != entity_type {
1045 changed.push("entity_type".into());
1046 }
1047
1048 if changed.is_empty() {
1049 return Ok(UpsertResult::Unchanged);
1050 }
1051
1052 sqlx::query(
1053 "UPDATE entities SET name=?, entity_type=?, attributes_json=?, description=?,
1054 updated_at=?, version=version+1, source=?, tags_json=?, aliases_json=?
1055 WHERE id=?",
1056 )
1057 .bind(&entity.name)
1058 .bind(&entity_type)
1059 .bind(&attrs_json)
1060 .bind(&entity.description)
1061 .bind(current_epoch_ms() as i64)
1062 .bind(&entity.source)
1063 .bind(&tags_json)
1064 .bind(&aliases_json)
1065 .bind(&entity.id)
1066 .execute(&mut *conn)
1067 .await
1068 .map_err(|e| KgError::Database(e.to_string()))?;
1069
1070 Ok(UpsertResult::Updated { changed_fields: changed, conflicts })
1071 } else {
1072 sqlx::query(
1073 "INSERT INTO entities (id, name, entity_type, attributes_json, description,
1074 created_at, updated_at, version, source, tags_json, aliases_json)
1075 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)",
1076 )
1077 .bind(&entity.id)
1078 .bind(&entity.name)
1079 .bind(&entity_type)
1080 .bind(&attrs_json)
1081 .bind(&entity.description)
1082 .bind(entity.created_at as i64)
1083 .bind(entity.updated_at as i64)
1084 .bind(&entity.source)
1085 .bind(&tags_json)
1086 .bind(&aliases_json)
1087 .execute(&mut *conn)
1088 .await
1089 .map_err(|e| KgError::Database(e.to_string()))?;
1090
1091 Ok(UpsertResult::Created)
1092 }
1093}
1094
1095async fn batch_upsert_relation(
1096 conn: &mut sqlx::SqliteConnection,
1097 relation: &Relation,
1098) -> Result<UpsertResult, KgError> {
1099 let existing: Option<RelationRow> = sqlx::query_as(
1100 "SELECT id, source_id, target_id, relation_type, properties_json, confidence,
1101 provenance_json, valid_from, valid_to, created_at, weight
1102 FROM relations WHERE id = ?",
1103 )
1104 .bind(&relation.id)
1105 .fetch_optional(&mut *conn)
1106 .await
1107 .map_err(|e| KgError::Database(e.to_string()))?;
1108
1109 let props_json = serde_json::to_string(&relation.properties)
1110 .map_err(|e| KgError::Serialization(e.to_string()))?;
1111 let provenance_json = relation
1112 .provenance
1113 .as_ref()
1114 .map(|p| serde_json::to_string(p))
1115 .transpose()
1116 .map_err(|e| KgError::Serialization(e.to_string()))?;
1117
1118 if existing.is_some() {
1119 sqlx::query(
1120 "UPDATE relations SET relation_type=?, properties_json=?, confidence=?,
1121 provenance_json=?, valid_from=?, valid_to=?, weight=?
1122 WHERE id=?",
1123 )
1124 .bind(&relation.relation_type)
1125 .bind(&props_json)
1126 .bind(relation.confidence.as_f32())
1127 .bind(&provenance_json)
1128 .bind(relation.valid_from.map(|v| v as i64))
1129 .bind(relation.valid_to.map(|v| v as i64))
1130 .bind(relation.weight)
1131 .bind(&relation.id)
1132 .execute(&mut *conn)
1133 .await
1134 .map_err(|e| KgError::Database(e.to_string()))?;
1135
1136 Ok(UpsertResult::Updated {
1137 changed_fields: vec!["relation_type".into()],
1138 conflicts: vec![],
1139 })
1140 } else {
1141 sqlx::query(
1142 "INSERT INTO relations (id, source_id, target_id, relation_type, properties_json,
1143 confidence, provenance_json, valid_from, valid_to, created_at, weight)
1144 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
1145 )
1146 .bind(&relation.id)
1147 .bind(&relation.source_id)
1148 .bind(&relation.target_id)
1149 .bind(&relation.relation_type)
1150 .bind(&props_json)
1151 .bind(relation.confidence.as_f32())
1152 .bind(&provenance_json)
1153 .bind(relation.valid_from.map(|v| v as i64))
1154 .bind(relation.valid_to.map(|v| v as i64))
1155 .bind(relation.created_at as i64)
1156 .bind(relation.weight)
1157 .execute(&mut *conn)
1158 .await
1159 .map_err(|e| KgError::Database(e.to_string()))?;
1160
1161 Ok(UpsertResult::Created)
1162 }
1163}
1164
/// Raw database row for one record of the `entities` table.
///
/// Field names match the column names so `sqlx::FromRow` can populate the
/// struct directly; it is turned into the domain type through
/// `From<EntityRow> for Entity`.
#[derive(Debug, sqlx::FromRow)]
struct EntityRow {
    /// Primary key used for lookups (`WHERE id = ?`).
    id: String,
    /// Entity name.
    name: String,
    /// Entity type in its string form; parsed with `EntityType::from_str`.
    entity_type: String,
    /// JSON text holding the attribute map (`HashMap<String, AttributeValue>`).
    attributes_json: String,
    /// Optional free-text description.
    description: Option<String>,
    /// Creation timestamp as stored; converted to `u64` on the domain type.
    created_at: i64,
    /// Last-modification timestamp as stored; converted to `u64` on the domain type.
    updated_at: i64,
    /// Row version: inserted as 1 and incremented by each batch update.
    version: i64,
    /// Optional source of the entity.
    source: Option<String>,
    /// JSON text holding the tag list (`Vec<String>`).
    tags_json: String,
    /// JSON text holding the alias list (`Vec<String>`).
    aliases_json: String,
}
1181
1182impl From<EntityRow> for Entity {
1183 fn from(r: EntityRow) -> Self {
1184 let attributes: HashMap<String, AttributeValue> =
1185 serde_json::from_str(&r.attributes_json).unwrap_or_default();
1186 let tags: Vec<String> = serde_json::from_str(&r.tags_json).unwrap_or_default();
1187 let aliases: Vec<String> = serde_json::from_str(&r.aliases_json).unwrap_or_default();
1188
1189 Self {
1190 id: r.id,
1191 name: r.name,
1192 entity_type: EntityType::from_str(&r.entity_type),
1193 attributes,
1194 description: r.description,
1195 created_at: r.created_at as u64,
1196 updated_at: r.updated_at as u64,
1197 version: r.version as u64,
1198 source: r.source,
1199 tags,
1200 aliases,
1201 }
1202 }
1203}
1204
/// Raw database row for one record of the `relations` table.
///
/// Field names match the column names so `sqlx::FromRow` can populate the
/// struct directly; it is turned into the domain type through
/// `From<RelationRow> for Relation`.
#[derive(Debug, sqlx::FromRow)]
struct RelationRow {
    /// Primary key used for lookups (`WHERE id = ?`).
    id: String,
    /// Id stored as the relation's source endpoint.
    source_id: String,
    /// Id stored as the relation's target endpoint.
    target_id: String,
    /// Relation type, kept as a plain string.
    relation_type: String,
    /// JSON text holding the property map (`HashMap<String, String>`).
    properties_json: String,
    /// Confidence as a raw float; wrapped with `Confidence::from_f32`.
    confidence: f32,
    /// Optional JSON text holding a serialized `Provenance`.
    provenance_json: Option<String>,
    /// Optional start of the validity window as stored.
    valid_from: Option<i64>,
    /// Optional end of the validity window as stored.
    valid_to: Option<i64>,
    /// Creation timestamp as stored; converted to `u64` on the domain type.
    created_at: i64,
    /// Optional weight of the relation.
    weight: Option<f32>,
}
1219
1220impl From<RelationRow> for Relation {
1221 fn from(r: RelationRow) -> Self {
1222 let properties: HashMap<String, String> =
1223 serde_json::from_str(&r.properties_json).unwrap_or_default();
1224 let provenance: Option<Provenance> = r
1225 .provenance_json
1226 .and_then(|j| serde_json::from_str(&j).ok());
1227
1228 Self {
1229 id: r.id,
1230 source_id: r.source_id,
1231 target_id: r.target_id,
1232 relation_type: r.relation_type,
1233 properties,
1234 confidence: Confidence::from_f32(r.confidence),
1235 provenance,
1236 valid_from: r.valid_from.map(|v| v as u64),
1237 valid_to: r.valid_to.map(|v| v as u64),
1238 created_at: r.created_at as u64,
1239 weight: r.weight,
1240 }
1241 }
1242}
1243
/// Minimal projection of a relation row: just its id and both endpoint ids.
///
/// NOTE(review): judging by the name, this is presumably the result shape of
/// a query that finds relations whose endpoints no longer exist — confirm at
/// the query site.
#[derive(Debug, sqlx::FromRow)]
struct OrphanRelationRow {
    /// Id of the relation.
    id: String,
    /// Id stored as the relation's source endpoint.
    source_id: String,
    /// Id stored as the relation's target endpoint.
    target_id: String,
}
1250
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the result is 0
/// rather than an error. The millisecond count is narrowed to `u64` with an
/// `as` cast.
fn current_epoch_ms() -> u64 {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        // Clock is set before 1970: treat as the epoch itself.
        Err(_) => 0,
    }
}