Skip to main content

xz_knowledge_graph/store/
sqlite.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2
3use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
4use tracing::{debug, info};
5
6use crate::config::KgConfig;
7use crate::error::KgError;
8use crate::store::sqlite_schema::{DDL, FTS_TRIGGERS};
9use crate::traits::KnowledgeGraph;
10use crate::types::attribute::AttributeValue;
11use crate::types::confidence::Confidence;
12use crate::types::consistency::{ConsistencyIssue, ConsistencyIssueType, IssueSeverity};
13use crate::types::entity::{Entity, EntityType};
14use crate::types::graph::{GraphStats, PathStep, SubGraph};
15use crate::types::import::{ImportResult, MergeStrategy, UpsertResult};
16use crate::types::provenance::Provenance;
17use crate::types::query::{
18    EntityPage, EntityQuery, RelationQuery,
19};
20use crate::types::relation::{Relation, WeightStrategy};
21
22/// SQLite-backed knowledge graph implementation.
23#[derive(Debug)]
24pub struct SqliteKnowledgeGraph {
25    pool: SqlitePool,
26    #[allow(dead_code)]
27    merge_strategy: MergeStrategy,
28    weight_strategy: WeightStrategy,
29    max_bfs_depth: u32,
30    max_path_search: u32,
31}
32
33impl SqliteKnowledgeGraph {
34    pub async fn new(path: &str, config: KgConfig) -> Result<Self, KgError> {
35        let pool = SqlitePoolOptions::new()
36            .max_connections(config.storage.pool_size)
37            .connect(&format!("sqlite:{}", path))
38            .await
39            .map_err(|e| KgError::Database(e.to_string()))?;
40
41        sqlx::query("PRAGMA journal_mode=WAL")
42            .execute(&pool)
43            .await
44            .map_err(|e| KgError::Database(e.to_string()))?;
45
46        let this = Self {
47            pool,
48            merge_strategy: config.merge_strategy,
49            weight_strategy: config.weight_strategy,
50            max_bfs_depth: config.max_bfs_depth,
51            max_path_search: config.max_path_search,
52        };
53
54        this.run_migrations().await?;
55        Ok(this)
56    }
57
58    async fn run_migrations(&self) -> Result<(), KgError> {
59        for stmt in DDL {
60            sqlx::query(stmt)
61                .execute(&self.pool)
62                .await
63                .map_err(|e| KgError::Database(format!("Migration failed: {}", e)))?;
64        }
65        for stmt in FTS_TRIGGERS {
66            let _ = sqlx::query(stmt).execute(&self.pool).await;
67        }
68        debug!("sqlite schema migrations complete");
69        Ok(())
70    }
71}
72
73#[async_trait::async_trait]
74impl KnowledgeGraph for SqliteKnowledgeGraph {
75    // === Entity Operations ===
76
77    async fn upsert_entity(&self, entity: Entity) -> Result<UpsertResult, KgError> {
78        let existing: Option<EntityRow> = sqlx::query_as(
79            "SELECT id, name, entity_type, attributes_json, description, created_at, updated_at,
80                    version, source, tags_json, aliases_json
81             FROM entities WHERE id = ?",
82        )
83        .bind(&entity.id)
84        .fetch_optional(&self.pool)
85        .await
86        .map_err(|e| KgError::Database(e.to_string()))?;
87
88        let attrs_json =
89            serde_json::to_string(&entity.attributes).map_err(|e| KgError::Serialization(e.to_string()))?;
90        let tags_json =
91            serde_json::to_string(&entity.tags).map_err(|e| KgError::Serialization(e.to_string()))?;
92        let aliases_json =
93            serde_json::to_string(&entity.aliases).map_err(|e| KgError::Serialization(e.to_string()))?;
94        let entity_type = entity.entity_type.as_str();
95
96        if let Some(row) = existing {
97            let mut changed = Vec::new();
98            let conflicts = Vec::new();
99
100            if row.name != entity.name {
101                changed.push("name".into());
102            }
103            if row.entity_type != entity_type {
104                changed.push("entity_type".into());
105            }
106
107            if changed.is_empty() {
108                return Ok(UpsertResult::Unchanged);
109            }
110
111            sqlx::query(
112                "UPDATE entities SET name=?, entity_type=?, attributes_json=?, description=?,
113                 updated_at=?, version=version+1, source=?, tags_json=?, aliases_json=?
114                 WHERE id=?",
115            )
116            .bind(&entity.name)
117            .bind(&entity_type)
118            .bind(&attrs_json)
119            .bind(&entity.description)
120            .bind(current_epoch_ms() as i64)
121            .bind(&entity.source)
122            .bind(&tags_json)
123            .bind(&aliases_json)
124            .bind(&entity.id)
125            .execute(&self.pool)
126            .await
127            .map_err(|e| KgError::Database(e.to_string()))?;
128
129            Ok(UpsertResult::Updated { changed_fields: changed, conflicts })
130        } else {
131            sqlx::query(
132                "INSERT INTO entities (id, name, entity_type, attributes_json, description,
133                 created_at, updated_at, version, source, tags_json, aliases_json)
134                 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)",
135            )
136            .bind(&entity.id)
137            .bind(&entity.name)
138            .bind(&entity_type)
139            .bind(&attrs_json)
140            .bind(&entity.description)
141            .bind(entity.created_at as i64)
142            .bind(entity.updated_at as i64)
143            .bind(&entity.source)
144            .bind(&tags_json)
145            .bind(&aliases_json)
146            .execute(&self.pool)
147            .await
148            .map_err(|e| KgError::Database(e.to_string()))?;
149
150            Ok(UpsertResult::Created)
151        }
152    }
153
154    async fn get_entity(&self, id: &str) -> Result<Option<Entity>, KgError> {
155        let row: Option<EntityRow> = sqlx::query_as(
156            "SELECT id, name, entity_type, attributes_json, description, created_at, updated_at,
157                    version, source, tags_json, aliases_json
158             FROM entities WHERE id = ?",
159        )
160        .bind(id)
161        .fetch_optional(&self.pool)
162        .await
163        .map_err(|e| KgError::Database(e.to_string()))?;
164
165        Ok(row.map(|r| r.into()))
166    }
167
168    async fn search_entities(&self, query: &EntityQuery) -> Result<EntityPage, KgError> {
169        let use_fts = query.name_contains.is_some();
170
171        let select_cols = "e.id, e.name, e.entity_type, e.attributes_json, e.description, \
172            e.created_at, e.updated_at, e.version, e.source, e.tags_json, e.aliases_json";
173
174        let mut sql = if use_fts {
175            format!(
176                "SELECT {} FROM entities e JOIN entities_fts fts ON e.rowid = fts.rowid WHERE entities_fts MATCH ?",
177                select_cols
178            )
179        } else {
180            format!("SELECT {} FROM entities e WHERE 1=1", select_cols)
181        };
182        let mut params: Vec<String> = Vec::new();
183
184        if use_fts {
185            // FTS5 query: append * for prefix matching
186            let fts_query = format!("{}*", query.name_contains.as_ref().unwrap());
187            params.push(fts_query);
188        } else if let Some(ref name) = query.name_contains {
189            params.push(format!("%{}%", name));
190            sql.push_str(" AND e.name LIKE ?");
191        }
192
193        if let Some(ref aliases) = query.alias_contains {
194            params.push(format!("%{}%", aliases));
195            sql.push_str(" AND e.aliases_json LIKE ?");
196        }
197        if let Some(ref types) = query.entity_types {
198            if !types.is_empty() {
199                let type_strs: Vec<String> = types.iter().map(|t| t.as_str()).collect();
200                let placeholders: Vec<String> = type_strs.iter().map(|_| "?".to_string()).collect();
201                sql.push_str(&format!(" AND e.entity_type IN ({})", placeholders.join(",")));
202                params.extend(type_strs);
203            }
204        }
205        if let Some(ref source) = query.source {
206            params.push(source.clone());
207            sql.push_str(" AND e.source = ?");
208        }
209        // Tag filter
210        if let Some(ref tag_filter) = query.tags {
211            if !tag_filter.tags.is_empty() {
212                match tag_filter.mode {
213                    crate::types::query::TagFilterMode::Or => {
214                        let tag_conditions: Vec<String> = tag_filter.tags.iter().map(|_| {
215                            "e.tags_json LIKE '%' || ? || '%'".to_string()
216                        }).collect();
217                        sql.push_str(&format!(" AND ({})", tag_conditions.join(" OR ")));
218                        params.extend(tag_filter.tags.iter().cloned());
219                    }
220                    crate::types::query::TagFilterMode::And => {
221                        for tag in &tag_filter.tags {
222                            sql.push_str(" AND e.tags_json LIKE '%' || ? || '%'");
223                            params.push(tag.clone());
224                        }
225                    }
226                }
227            }
228        }
229        // Attribute filters
230        for attr in &query.attribute_filters {
231            let json_path = format!("$.{}", attr.key);
232            match attr.operator {
233                crate::types::query::FilterOperator::Eq => {
234                    sql.push_str(" AND json_extract(e.attributes_json, ?) = ?");
235                    params.push(json_path);
236                    params.push(attr.value.clone());
237                }
238                crate::types::query::FilterOperator::Contains => {
239                    sql.push_str(" AND json_extract(e.attributes_json, ?) LIKE '%' || ? || '%'");
240                    params.push(json_path);
241                    params.push(attr.value.clone());
242                }
243                _ => {
244                    // Other operators: use JSON value comparison
245                    sql.push_str(" AND json_extract(e.attributes_json, ?) = ?");
246                    params.push(json_path);
247                    params.push(attr.value.clone());
248                }
249            }
250        }
251
252        // Count
253        let count_sql = sql.replace(
254            &format!("SELECT {}", select_cols),
255            "SELECT COUNT(*)",
256        );
257
258        let mut count_query = sqlx::query_scalar(&count_sql);
259        for p in &params {
260            count_query = count_query.bind(p);
261        }
262        let total: i64 = count_query
263            .fetch_one(&self.pool)
264            .await
265            .map_err(|e| KgError::Database(e.to_string()))?;
266
267        // Sort
268        let order = match query.sort_by {
269            Some(crate::types::query::EntitySortField::Name) => "e.name ASC",
270            Some(crate::types::query::EntitySortField::CreatedAt) => "e.created_at DESC",
271            Some(crate::types::query::EntitySortField::UpdatedAt) => "e.updated_at DESC",
272            Some(crate::types::query::EntitySortField::EntityType) => "e.entity_type ASC",
273            Some(crate::types::query::EntitySortField::RelationCount) => "e.updated_at DESC",
274            None => {
275                if use_fts { "ORDER BY rank" } else { "ORDER BY e.updated_at DESC" }
276            }
277        };
278        sql.push_str(&format!(" {} LIMIT ? OFFSET ?", order));
279
280        let mut fetch_query = sqlx::query_as::<_, EntityRow>(&sql);
281        for p in &params {
282            fetch_query = fetch_query.bind(p);
283        }
284        fetch_query = fetch_query
285            .bind(query.page.limit as i64)
286            .bind(query.page.offset as i64);
287
288        let rows: Vec<EntityRow> = fetch_query
289            .fetch_all(&self.pool)
290            .await
291            .map_err(|e| KgError::Database(e.to_string()))?;
292
293        let total = total as usize;
294        let items: Vec<Entity> = rows.into_iter().map(|r| r.into()).collect();
295        let has_more = query.page.offset + query.page.limit < total;
296
297        Ok(EntityPage { items, total, has_more })
298    }
299
300    async fn delete_entity(&self, id: &str) -> Result<usize, KgError> {
301        let mut txn = self.pool.begin().await.map_err(|e| KgError::Database(e.to_string()))?;
302
303        let outcome = {
304            let conn = std::ops::DerefMut::deref_mut(&mut txn);
305
306            let relation_count: (i64,) = sqlx::query_as(
307                "SELECT COUNT(*) FROM relations WHERE source_id = ? OR target_id = ?",
308            )
309            .bind(id)
310            .bind(id)
311            .fetch_one(&mut *conn)
312            .await
313            .map_err(|e| KgError::Database(e.to_string()))?;
314
315            sqlx::query("DELETE FROM relations WHERE source_id = ? OR target_id = ?")
316                .bind(id)
317                .bind(id)
318                .execute(&mut *conn)
319                .await
320                .map_err(|e| KgError::Database(e.to_string()))?;
321
322            sqlx::query("DELETE FROM entities WHERE id = ?")
323                .bind(id)
324                .execute(&mut *conn)
325                .await
326                .map_err(|e| KgError::Database(e.to_string()))?;
327
328            Ok::<usize, KgError>(relation_count.0 as usize)
329        };
330
331        match outcome {
332            Ok(count) => {
333                txn.commit().await.map_err(|e| KgError::Database(e.to_string()))?;
334                Ok(count)
335            }
336            Err(e) => {
337                let _ = txn.rollback().await;
338                Err(e)
339            }
340        }
341    }
342
343    async fn get_entities_batch(&self, ids: &[&str]) -> Result<Vec<Entity>, KgError> {
344        if ids.is_empty() {
345            return Ok(vec![]);
346        }
347        let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
348        let sql = format!(
349            "SELECT id, name, entity_type, attributes_json, description, created_at, updated_at,
350                    version, source, tags_json, aliases_json
351             FROM entities WHERE id IN ({})",
352            placeholders.join(",")
353        );
354
355        let mut query = sqlx::query_as::<_, EntityRow>(&sql);
356        for id in ids {
357            query = query.bind(id);
358        }
359
360        let rows: Vec<EntityRow> = query
361            .fetch_all(&self.pool)
362            .await
363            .map_err(|e| KgError::Database(e.to_string()))?;
364
365        Ok(rows.into_iter().map(|r| r.into()).collect())
366    }
367
368    // === Relation Operations ===
369
370    async fn upsert_relation(&self, relation: Relation) -> Result<UpsertResult, KgError> {
371        let existing: Option<RelationRow> = sqlx::query_as(
372            "SELECT id, source_id, target_id, relation_type, properties_json, confidence,
373                    provenance_json, valid_from, valid_to, created_at, weight
374             FROM relations WHERE id = ?",
375        )
376        .bind(&relation.id)
377        .fetch_optional(&self.pool)
378        .await
379        .map_err(|e| KgError::Database(e.to_string()))?;
380
381        let props_json = serde_json::to_string(&relation.properties)
382            .map_err(|e| KgError::Serialization(e.to_string()))?;
383        let provenance_json = relation
384            .provenance
385            .as_ref()
386            .map(|p| serde_json::to_string(p))
387            .transpose()
388            .map_err(|e| KgError::Serialization(e.to_string()))?;
389
390        if existing.is_some() {
391            sqlx::query(
392                "UPDATE relations SET relation_type=?, properties_json=?, confidence=?,
393                 provenance_json=?, valid_from=?, valid_to=?, weight=?
394                 WHERE id=?",
395            )
396            .bind(&relation.relation_type)
397            .bind(&props_json)
398            .bind(relation.confidence.as_f32())
399            .bind(&provenance_json)
400            .bind(relation.valid_from.map(|v| v as i64))
401            .bind(relation.valid_to.map(|v| v as i64))
402            .bind(relation.weight)
403            .bind(&relation.id)
404            .execute(&self.pool)
405            .await
406            .map_err(|e| KgError::Database(e.to_string()))?;
407
408            Ok(UpsertResult::Updated {
409                changed_fields: vec!["relation_type".into()],
410                conflicts: vec![],
411            })
412        } else {
413            sqlx::query(
414                "INSERT INTO relations (id, source_id, target_id, relation_type, properties_json,
415                 confidence, provenance_json, valid_from, valid_to, created_at, weight)
416                 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
417            )
418            .bind(&relation.id)
419            .bind(&relation.source_id)
420            .bind(&relation.target_id)
421            .bind(&relation.relation_type)
422            .bind(&props_json)
423            .bind(relation.confidence.as_f32())
424            .bind(&provenance_json)
425            .bind(relation.valid_from.map(|v| v as i64))
426            .bind(relation.valid_to.map(|v| v as i64))
427            .bind(relation.created_at as i64)
428            .bind(relation.weight)
429            .execute(&self.pool)
430            .await
431            .map_err(|e| KgError::Database(e.to_string()))?;
432
433            Ok(UpsertResult::Created)
434        }
435    }
436
437    async fn get_relations(&self, entity_id: &str) -> Result<Vec<Relation>, KgError> {
438        let rows: Vec<RelationRow> = sqlx::query_as(
439            "SELECT id, source_id, target_id, relation_type, properties_json, confidence,
440                    provenance_json, valid_from, valid_to, created_at, weight
441             FROM relations WHERE source_id = ? OR target_id = ?",
442        )
443        .bind(entity_id)
444        .bind(entity_id)
445        .fetch_all(&self.pool)
446        .await
447        .map_err(|e| KgError::Database(e.to_string()))?;
448
449        Ok(rows.into_iter().map(|r| r.into()).collect())
450    }
451
452    async fn query_relations(&self, query: &RelationQuery) -> Result<Vec<Relation>, KgError> {
453        let mut sql = String::from(
454            "SELECT id, source_id, target_id, relation_type, properties_json, confidence,
455                    provenance_json, valid_from, valid_to, created_at, weight
456             FROM relations WHERE 1=1",
457        );
458        let mut params: Vec<String> = Vec::new();
459
460        if let Some(ref sid) = query.source_id {
461            params.push(sid.clone());
462            sql.push_str(" AND source_id = ?");
463        }
464        if let Some(ref tid) = query.target_id {
465            params.push(tid.clone());
466            sql.push_str(" AND target_id = ?");
467        }
468        if let Some(ref eid) = query.entity_id {
469            params.push(eid.clone());
470            params.push(eid.clone());
471            sql.push_str(" AND (source_id = ? OR target_id = ?)");
472        }
473        if let Some(ref rt) = query.relation_type {
474            params.push(rt.clone());
475            sql.push_str(" AND relation_type = ?");
476        }
477        if let Some(ref rts) = query.relation_types {
478            if !rts.is_empty() {
479                let placeholders: Vec<String> = rts.iter().map(|_| "?".to_string()).collect();
480                sql.push_str(&format!(" AND relation_type IN ({})", placeholders.join(",")));
481                params.extend(rts.iter().cloned());
482            }
483        }
484        if let Some(ref min_conf) = query.min_confidence {
485            sql.push_str(" AND confidence >= ?");
486            params.push(min_conf.as_f32().to_string());
487        }
488        if let Some(valid_at) = query.valid_at {
489            sql.push_str(" AND (valid_from IS NULL OR valid_from <= ?) AND (valid_to IS NULL OR valid_to >= ?)");
490            params.push(valid_at.to_string());
491            params.push(valid_at.to_string());
492        }
493
494        sql.push_str(" LIMIT ? OFFSET ?");
495
496        let mut fetch_query = sqlx::query_as::<_, RelationRow>(&sql);
497        for p in &params {
498            fetch_query = fetch_query.bind(p);
499        }
500        fetch_query = fetch_query
501            .bind(query.page.limit as i64)
502            .bind(query.page.offset as i64);
503
504        let rows: Vec<RelationRow> = fetch_query
505            .fetch_all(&self.pool)
506            .await
507            .map_err(|e| KgError::Database(e.to_string()))?;
508
509        Ok(rows.into_iter().map(|r| r.into()).collect())
510    }
511
512    async fn delete_relation(&self, id: &str) -> Result<(), KgError> {
513        let result = sqlx::query("DELETE FROM relations WHERE id = ?")
514            .bind(id)
515            .execute(&self.pool)
516            .await
517            .map_err(|e| KgError::Database(e.to_string()))?;
518
519        if result.rows_affected() == 0 {
520            return Err(KgError::RelationNotFound(id.to_string()));
521        }
522        Ok(())
523    }
524
525    // === Graph Traversal ===
526
527    async fn get_neighbors(&self, entity_id: &str, depth: u32) -> Result<SubGraph, KgError> {
528        if depth > self.max_bfs_depth {
529            return Err(KgError::MaxDepthExceeded {
530                depth,
531                max: self.max_bfs_depth,
532            });
533        }
534
535        let center = self
536            .get_entity(entity_id)
537            .await?
538            .ok_or_else(|| KgError::EntityNotFound(entity_id.to_string()))?;
539
540        let mut visited_entities: HashMap<String, Entity> = HashMap::new();
541        let mut visited_relations: Vec<Relation> = Vec::new();
542        let mut queue: VecDeque<(String, u32)> = VecDeque::new();
543
544        visited_entities.insert(entity_id.to_string(), center.clone());
545        queue.push_back((entity_id.to_string(), 0));
546
547        while let Some((current_id, current_depth)) = queue.pop_front() {
548            if current_depth >= depth {
549                continue;
550            }
551
552            // Get all relations for the current entity
553            let relations = self.get_relations(&current_id).await?;
554            for rel in relations {
555                let neighbor_id = if rel.source_id == current_id {
556                    rel.target_id.clone()
557                } else {
558                    rel.source_id.clone()
559                };
560
561                visited_relations.push(rel);
562
563                if !visited_entities.contains_key(&neighbor_id) {
564                    if let Some(entity) = self.get_entity(&neighbor_id).await? {
565                        visited_entities.insert(neighbor_id.clone(), entity);
566                        queue.push_back((neighbor_id, current_depth + 1));
567                    }
568                }
569            }
570        }
571
572        let entities: Vec<Entity> = visited_entities
573            .into_iter()
574            .filter(|(id, _)| id != entity_id)
575            .map(|(_, e)| e)
576            .collect();
577
578        Ok(SubGraph {
579            center,
580            entities,
581            relations: visited_relations,
582        })
583    }
584
585    async fn shortest_path(
586        &self,
587        from: &str,
588        to: &str,
589    ) -> Result<Option<Vec<PathStep>>, KgError> {
590        if from == to {
591            return Ok(Some(vec![]));
592        }
593
594        // Load all entities and relations into memory for path finding
595        let entity_rows: Vec<EntityRow> = sqlx::query_as(
596            "SELECT id, name, entity_type, attributes_json, description, created_at, updated_at,
597                    version, source, tags_json, aliases_json FROM entities",
598        )
599        .fetch_all(&self.pool)
600        .await
601        .map_err(|e| KgError::Database(e.to_string()))?;
602
603        let entities: HashMap<String, Entity> =
604            entity_rows.into_iter().map(|r| (r.id.clone(), r.into())).collect();
605
606        let relation_rows: Vec<RelationRow> = sqlx::query_as(
607            "SELECT id, source_id, target_id, relation_type, properties_json, confidence,
608                    provenance_json, valid_from, valid_to, created_at, weight FROM relations",
609        )
610        .fetch_all(&self.pool)
611        .await
612        .map_err(|e| KgError::Database(e.to_string()))?;
613
614        let relations: Vec<Relation> = relation_rows.into_iter().map(|r| r.into()).collect();
615
616        // Build adjacency lists
617        let mut adj: HashMap<String, Vec<(String, Relation)>> = HashMap::new();
618        for rel in &relations {
619            adj.entry(rel.source_id.clone())
620                .or_default()
621                .push((rel.target_id.clone(), rel.clone()));
622            adj.entry(rel.target_id.clone())
623                .or_default()
624                .push((rel.source_id.clone(), rel.clone()));
625        }
626
627        let mut dist: HashMap<String, f32> = HashMap::new();
628        let mut prev: HashMap<String, (String, Relation)> = HashMap::new();
629        let initial_dist = f32::MAX;
630
631        for id in entities.keys() {
632            dist.insert(id.clone(), initial_dist);
633        }
634        dist.insert(from.to_string(), 0.0);
635
636        let mut queue: Vec<(f32, String)> = Vec::new();
637        queue.push((0.0, from.to_string()));
638
639        while let Some((_d, u)) = queue.pop() {
640            if let Some(neighbors) = adj.get(&u) {
641                for (v, rel) in neighbors {
642                    let weight = self.weight_strategy.relation_cost(rel);
643                    let alt = dist.get(&u).copied().unwrap_or(initial_dist) + weight;
644                    if alt < dist.get(v).copied().unwrap_or(initial_dist) {
645                        dist.insert(v.clone(), alt);
646                        prev.insert(v.clone(), (u.clone(), rel.clone()));
647                        // Push negative so we pop smallest (min-heap via sort)
648                        queue.push((-alt, v.clone()));
649                    }
650                }
651            }
652            queue.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
653        }
654
655        if !prev.contains_key(to) && from != to {
656            return Ok(None);
657        }
658
659        // Reconstruct path
660        let mut path = Vec::new();
661        let mut current = to.to_string();
662        while current != from {
663            if let Some((prev_node, rel)) = prev.get(&current) {
664                let entity = entities.get(&current).cloned().unwrap();
665                path.push(PathStep { entity, relation: rel.clone() });
666                current = prev_node.clone();
667            } else {
668                break;
669            }
670        }
671        // Add the starting entity
672        path.reverse();
673
674        Ok(Some(path))
675    }
676
677    async fn all_paths(
678        &self,
679        from: &str,
680        to: &str,
681        max_depth: u32,
682    ) -> Result<Vec<Vec<PathStep>>, KgError> {
683        if max_depth > self.max_path_search {
684            return Err(KgError::MaxDepthExceeded {
685                depth: max_depth,
686                max: self.max_path_search,
687            });
688        }
689
690        let entity_rows: Vec<EntityRow> = sqlx::query_as(
691            "SELECT id, name, entity_type, attributes_json, description, created_at, updated_at,
692                    version, source, tags_json, aliases_json FROM entities",
693        )
694        .fetch_all(&self.pool)
695        .await
696        .map_err(|e| KgError::Database(e.to_string()))?;
697
698        let entities: HashMap<String, Entity> =
699            entity_rows.into_iter().map(|r| (r.id.clone(), r.into())).collect();
700
701        let relation_rows: Vec<RelationRow> = sqlx::query_as(
702            "SELECT id, source_id, target_id, relation_type, properties_json, confidence,
703                    provenance_json, valid_from, valid_to, created_at, weight FROM relations",
704        )
705        .fetch_all(&self.pool)
706        .await
707        .map_err(|e| KgError::Database(e.to_string()))?;
708
709        let relations: Vec<Relation> = relation_rows.into_iter().map(|r| r.into()).collect();
710
711        // Build adjacency lists
712        let mut adj: HashMap<String, Vec<(String, Relation)>> = HashMap::new();
713        for rel in &relations {
714            adj.entry(rel.source_id.clone())
715                .or_default()
716                .push((rel.target_id.clone(), rel.clone()));
717            adj.entry(rel.target_id.clone())
718                .or_default()
719                .push((rel.source_id.clone(), rel.clone()));
720        }
721
722        let mut all_paths: Vec<Vec<PathStep>> = Vec::new();
723        let mut visited: HashSet<String> = HashSet::new();
724        let mut current_path: Vec<PathStep> = Vec::new();
725
726        dfs_all_paths(
727            from,
728            to,
729            max_depth,
730            &entities,
731            &adj,
732            &mut visited,
733            &mut current_path,
734            &mut all_paths,
735        );
736
737        all_paths.sort_by(|a, b| {
738            let a_cost: f32 = a.iter().map(|step| self.weight_strategy.relation_cost(&step.relation)).sum();
739            let b_cost: f32 = b.iter().map(|step| self.weight_strategy.relation_cost(&step.relation)).sum();
740            a_cost.partial_cmp(&b_cost).unwrap_or(std::cmp::Ordering::Equal)
741        });
742
743        Ok(all_paths)
744    }
745
746    // === Batch Operations ===
747
748    async fn batch_import(
749        &self,
750        entities: Vec<Entity>,
751        relations: Vec<Relation>,
752    ) -> Result<ImportResult, KgError> {
753        let mut txn = self.pool.begin().await.map_err(|e| KgError::Database(e.to_string()))?;
754
755        // Scoped to allow fallback to rollback after conn is dropped
756        let outcome = {
757            let conn = std::ops::DerefMut::deref_mut(&mut txn);
758            let mut result = ImportResult::default();
759
760            for entity in &entities {
761                match batch_upsert_entity(conn, entity).await {
762                    Ok(UpsertResult::Created) => result.entities_created += 1,
763                    Ok(UpsertResult::Updated { conflicts, .. }) => {
764                        result.entities_updated += 1;
765                        result.conflicts.extend(conflicts);
766                    }
767                    Ok(UpsertResult::Unchanged) => result.entities_skipped += 1,
768                    Err(e) => return Err(e),
769                }
770            }
771
772            for relation in &relations {
773                match batch_upsert_relation(conn, relation).await {
774                    Ok(UpsertResult::Created) => result.relations_created += 1,
775                    Ok(UpsertResult::Updated { .. }) => result.relations_updated += 1,
776                    Ok(UpsertResult::Unchanged) => {}
777                    Err(e) => return Err(e),
778                }
779            }
780
781            Ok(result)
782        };
783
784        match outcome {
785            Ok(result) => {
786                txn.commit().await.map_err(|e| KgError::Database(e.to_string()))?;
787                info!(
788                    entities_created = %result.entities_created,
789                    entities_updated = %result.entities_updated,
790                    relations_created = %result.relations_created,
791                    "batch import completed"
792                );
793                Ok(result)
794            }
795            Err(e) => {
796                let _ = txn.rollback().await;
797                Err(e)
798            }
799        }
800    }
801
802    // === Consistency ===
803
804    async fn check_consistency(&self) -> Result<Vec<ConsistencyIssue>, KgError> {
805        let mut issues = Vec::new();
806
807        // Check 1: Orphan relations
808        let orphans: Vec<OrphanRelationRow> = sqlx::query_as(
809            "SELECT r.id, r.source_id, r.target_id
810             FROM relations r
811             LEFT JOIN entities e1 ON r.source_id = e1.id
812             LEFT JOIN entities e2 ON r.target_id = e2.id
813             WHERE e1.id IS NULL OR e2.id IS NULL",
814        )
815        .fetch_all(&self.pool)
816        .await
817        .map_err(|e| KgError::Database(e.to_string()))?;
818
819        for o in orphans {
820            issues.push(ConsistencyIssue {
821                severity: IssueSeverity::Error,
822                issue_type: ConsistencyIssueType::OrphanRelation,
823                description: format!("Relation {} references a non-existent entity", o.id),
824                related_entities: vec![o.source_id, o.target_id],
825                related_relations: vec![o.id],
826            });
827        }
828
829        // Check 2: Self-referencing
830        let self_refs: Vec<RelationRow> = sqlx::query_as(
831            "SELECT id, source_id, target_id, relation_type, properties_json, confidence,
832                    provenance_json, valid_from, valid_to, created_at, weight
833             FROM relations WHERE source_id = target_id",
834        )
835        .fetch_all(&self.pool)
836        .await
837        .map_err(|e| KgError::Database(e.to_string()))?;
838
839        for rel in self_refs {
840            issues.push(ConsistencyIssue {
841                severity: IssueSeverity::Warning,
842                issue_type: ConsistencyIssueType::SelfReferencing,
843                description: format!("Relation {} self-references entity {}", rel.id, rel.source_id),
844                related_entities: vec![rel.source_id],
845                related_relations: vec![rel.id],
846            });
847        }
848
849        // Check 3: Orphan entities
850        let orphan_entities: Vec<(String, String)> = sqlx::query_as(
851            "SELECT e.id, e.name FROM entities e
852             WHERE e.id NOT IN (SELECT source_id FROM relations)
853               AND e.id NOT IN (SELECT target_id FROM relations)",
854        )
855        .fetch_all(&self.pool)
856        .await
857        .map_err(|e| KgError::Database(e.to_string()))?;
858
859        for (id, name) in orphan_entities {
860            issues.push(ConsistencyIssue {
861                severity: IssueSeverity::Info,
862                issue_type: ConsistencyIssueType::OrphanEntity,
863                description: format!("Entity {} ({}) has no relations", name, id),
864                related_entities: vec![id],
865                related_relations: vec![],
866            });
867        }
868
869        // Check 4: Expired relations
870        let now = current_epoch_ms();
871        let expired: Vec<RelationRow> = sqlx::query_as(
872            "SELECT id, source_id, target_id, relation_type, properties_json, confidence,
873                    provenance_json, valid_from, valid_to, created_at, weight
874             FROM relations WHERE valid_to IS NOT NULL AND valid_to < ?",
875        )
876        .bind(now as i64)
877        .fetch_all(&self.pool)
878        .await
879        .map_err(|e| KgError::Database(e.to_string()))?;
880
881        for rel in expired {
882            issues.push(ConsistencyIssue {
883                severity: IssueSeverity::Warning,
884                issue_type: ConsistencyIssueType::ExpiredRelation,
885                description: format!("Relation {} has expired (valid_to < now)", rel.id),
886                related_entities: vec![rel.source_id, rel.target_id],
887                related_relations: vec![rel.id],
888            });
889        }
890
891        Ok(issues)
892    }
893
894    // === Statistics ===
895
896    async fn stats(&self) -> Result<GraphStats, KgError> {
897        let total_entities: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM entities")
898            .fetch_one(&self.pool)
899            .await
900            .map_err(|e| KgError::Database(e.to_string()))?;
901
902        let total_relations: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM relations")
903            .fetch_one(&self.pool)
904            .await
905            .map_err(|e| KgError::Database(e.to_string()))?;
906
907        let entity_types: Vec<(String, i64)> = sqlx::query_as(
908            "SELECT entity_type, COUNT(*) as cnt FROM entities GROUP BY entity_type",
909        )
910        .fetch_all(&self.pool)
911        .await
912        .map_err(|e| KgError::Database(e.to_string()))?;
913
914        let relation_types: Vec<(String, i64)> = sqlx::query_as(
915            "SELECT relation_type, COUNT(*) as cnt FROM relations GROUP BY relation_type",
916        )
917        .fetch_all(&self.pool)
918        .await
919        .map_err(|e| KgError::Database(e.to_string()))?;
920
921        // Calculate degrees
922        let degrees: Vec<(i64,)> = sqlx::query_as(
923            "SELECT COUNT(*) FROM relations GROUP BY source_id
924             UNION ALL SELECT COUNT(*) FROM relations GROUP BY target_id",
925        )
926        .fetch_all(&self.pool)
927        .await
928        .map_err(|e| KgError::Database(e.to_string()))?;
929
930        let degree_values: Vec<usize> = degrees.into_iter().map(|d| d.0 as usize).collect();
931        let avg_degree = if degree_values.is_empty() {
932            0.0
933        } else {
934            degree_values.iter().sum::<usize>() as f64 / degree_values.len() as f64
935        };
936        let max_degree = degree_values.iter().max().copied().unwrap_or(0);
937
938        // Orphan entities
939        let orphan_entities: (i64,) = sqlx::query_as(
940            "SELECT COUNT(*) FROM entities e
941             WHERE e.id NOT IN (SELECT source_id FROM relations)
942               AND e.id NOT IN (SELECT target_id FROM relations)",
943        )
944        .fetch_one(&self.pool)
945        .await
946        .map_err(|e| KgError::Database(e.to_string()))?;
947
948        // DB size
949        let db_size: (i64,) = sqlx::query_as(
950            "SELECT COALESCE(SUM(pgsize), 0) FROM dbstat",
951        )
952        .fetch_one(&self.pool)
953        .await
954        .map_err(|e| KgError::Database(e.to_string()))?;
955
956        Ok(GraphStats {
957            total_entities: total_entities.0 as usize,
958            total_relations: total_relations.0 as usize,
959            entity_types: entity_types.into_iter().map(|(k, v)| (k, v as usize)).collect(),
960            relation_types: relation_types.into_iter().map(|(k, v)| (k, v as usize)).collect(),
961            avg_degree,
962            max_degree,
963            orphan_entities: orphan_entities.0 as usize,
964            db_size_bytes: db_size.0 as u64,
965        })
966    }
967}
968
969// === DFS helper ===
970
971#[allow(clippy::too_many_arguments)]
972fn dfs_all_paths(
973    current: &str,
974    target: &str,
975    max_depth: u32,
976    entities: &HashMap<String, Entity>,
977    adj: &HashMap<String, Vec<(String, Relation)>>,
978    visited: &mut HashSet<String>,
979    current_path: &mut Vec<PathStep>,
980    all_paths: &mut Vec<Vec<PathStep>>,
981) {
982    if current == target {
983        all_paths.push(current_path.clone());
984        return;
985    }
986    if current_path.len() >= max_depth as usize {
987        return;
988    }
989    visited.insert(current.to_string());
990
991    if let Some(neighbors) = adj.get(current) {
992        for (neighbor, rel) in neighbors {
993            if visited.contains(neighbor.as_str()) {
994                continue;
995            }
996            if let Some(entity) = entities.get(neighbor).cloned() {
997                current_path.push(PathStep {
998                    entity,
999                    relation: rel.clone(),
1000                });
1001                dfs_all_paths(
1002                    neighbor, target, max_depth, entities, adj,
1003                    visited, current_path, all_paths,
1004                );
1005                current_path.pop();
1006            }
1007        }
1008    }
1009
1010    visited.remove(current);
1011}
1012
1013// === Batch helpers (operate on a &mut SqliteConnection within a transaction) ===
1014
1015async fn batch_upsert_entity(
1016    conn: &mut sqlx::SqliteConnection,
1017    entity: &Entity,
1018) -> Result<UpsertResult, KgError> {
1019    let existing: Option<EntityRow> = sqlx::query_as(
1020        "SELECT id, name, entity_type, attributes_json, description, created_at, updated_at,
1021                version, source, tags_json, aliases_json
1022         FROM entities WHERE id = ?",
1023    )
1024    .bind(&entity.id)
1025    .fetch_optional(&mut *conn)
1026    .await
1027    .map_err(|e| KgError::Database(e.to_string()))?;
1028
1029    let attrs_json =
1030        serde_json::to_string(&entity.attributes).map_err(|e| KgError::Serialization(e.to_string()))?;
1031    let tags_json =
1032        serde_json::to_string(&entity.tags).map_err(|e| KgError::Serialization(e.to_string()))?;
1033    let aliases_json =
1034        serde_json::to_string(&entity.aliases).map_err(|e| KgError::Serialization(e.to_string()))?;
1035    let entity_type = entity.entity_type.as_str();
1036
1037    if let Some(row) = existing {
1038        let mut changed = Vec::new();
1039        let conflicts = Vec::new();
1040
1041        if row.name != entity.name {
1042            changed.push("name".into());
1043        }
1044        if row.entity_type != entity_type {
1045            changed.push("entity_type".into());
1046        }
1047
1048        if changed.is_empty() {
1049            return Ok(UpsertResult::Unchanged);
1050        }
1051
1052        sqlx::query(
1053            "UPDATE entities SET name=?, entity_type=?, attributes_json=?, description=?,
1054             updated_at=?, version=version+1, source=?, tags_json=?, aliases_json=?
1055             WHERE id=?",
1056        )
1057        .bind(&entity.name)
1058        .bind(&entity_type)
1059        .bind(&attrs_json)
1060        .bind(&entity.description)
1061        .bind(current_epoch_ms() as i64)
1062        .bind(&entity.source)
1063        .bind(&tags_json)
1064        .bind(&aliases_json)
1065        .bind(&entity.id)
1066        .execute(&mut *conn)
1067        .await
1068        .map_err(|e| KgError::Database(e.to_string()))?;
1069
1070        Ok(UpsertResult::Updated { changed_fields: changed, conflicts })
1071    } else {
1072        sqlx::query(
1073            "INSERT INTO entities (id, name, entity_type, attributes_json, description,
1074             created_at, updated_at, version, source, tags_json, aliases_json)
1075             VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?)",
1076        )
1077        .bind(&entity.id)
1078        .bind(&entity.name)
1079        .bind(&entity_type)
1080        .bind(&attrs_json)
1081        .bind(&entity.description)
1082        .bind(entity.created_at as i64)
1083        .bind(entity.updated_at as i64)
1084        .bind(&entity.source)
1085        .bind(&tags_json)
1086        .bind(&aliases_json)
1087        .execute(&mut *conn)
1088        .await
1089        .map_err(|e| KgError::Database(e.to_string()))?;
1090
1091        Ok(UpsertResult::Created)
1092    }
1093}
1094
1095async fn batch_upsert_relation(
1096    conn: &mut sqlx::SqliteConnection,
1097    relation: &Relation,
1098) -> Result<UpsertResult, KgError> {
1099    let existing: Option<RelationRow> = sqlx::query_as(
1100        "SELECT id, source_id, target_id, relation_type, properties_json, confidence,
1101                provenance_json, valid_from, valid_to, created_at, weight
1102         FROM relations WHERE id = ?",
1103    )
1104    .bind(&relation.id)
1105    .fetch_optional(&mut *conn)
1106    .await
1107    .map_err(|e| KgError::Database(e.to_string()))?;
1108
1109    let props_json = serde_json::to_string(&relation.properties)
1110        .map_err(|e| KgError::Serialization(e.to_string()))?;
1111    let provenance_json = relation
1112        .provenance
1113        .as_ref()
1114        .map(|p| serde_json::to_string(p))
1115        .transpose()
1116        .map_err(|e| KgError::Serialization(e.to_string()))?;
1117
1118    if existing.is_some() {
1119        sqlx::query(
1120            "UPDATE relations SET relation_type=?, properties_json=?, confidence=?,
1121             provenance_json=?, valid_from=?, valid_to=?, weight=?
1122             WHERE id=?",
1123        )
1124        .bind(&relation.relation_type)
1125        .bind(&props_json)
1126        .bind(relation.confidence.as_f32())
1127        .bind(&provenance_json)
1128        .bind(relation.valid_from.map(|v| v as i64))
1129        .bind(relation.valid_to.map(|v| v as i64))
1130        .bind(relation.weight)
1131        .bind(&relation.id)
1132        .execute(&mut *conn)
1133        .await
1134        .map_err(|e| KgError::Database(e.to_string()))?;
1135
1136        Ok(UpsertResult::Updated {
1137            changed_fields: vec!["relation_type".into()],
1138            conflicts: vec![],
1139        })
1140    } else {
1141        sqlx::query(
1142            "INSERT INTO relations (id, source_id, target_id, relation_type, properties_json,
1143             confidence, provenance_json, valid_from, valid_to, created_at, weight)
1144             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
1145        )
1146        .bind(&relation.id)
1147        .bind(&relation.source_id)
1148        .bind(&relation.target_id)
1149        .bind(&relation.relation_type)
1150        .bind(&props_json)
1151        .bind(relation.confidence.as_f32())
1152        .bind(&provenance_json)
1153        .bind(relation.valid_from.map(|v| v as i64))
1154        .bind(relation.valid_to.map(|v| v as i64))
1155        .bind(relation.created_at as i64)
1156        .bind(relation.weight)
1157        .execute(&mut *conn)
1158        .await
1159        .map_err(|e| KgError::Database(e.to_string()))?;
1160
1161        Ok(UpsertResult::Created)
1162    }
1163}
1164
1165// === Row types ===
1166
/// Raw row of the `entities` table, as selected by the queries in this file.
///
/// Field names must match the selected column names, because `sqlx::FromRow`
/// maps columns to fields by name. Conversion to the domain type is done by
/// `From<EntityRow> for Entity`.
#[derive(Debug, sqlx::FromRow)]
struct EntityRow {
    id: String,
    name: String,
    /// String form of the entity type (written from `EntityType::as_str`).
    entity_type: String,
    /// JSON-encoded attribute map.
    attributes_json: String,
    description: Option<String>,
    /// Epoch timestamps stored as SQLite signed integers. The update path writes
    /// `current_epoch_ms()`, i.e. milliseconds.
    created_at: i64,
    updated_at: i64,
    /// Row version; starts at 1 on insert and is incremented on each update.
    version: i64,
    source: Option<String>,
    /// JSON-encoded list of tags.
    tags_json: String,
    /// JSON-encoded list of aliases.
    aliases_json: String,
}
1181
1182impl From<EntityRow> for Entity {
1183    fn from(r: EntityRow) -> Self {
1184        let attributes: HashMap<String, AttributeValue> =
1185            serde_json::from_str(&r.attributes_json).unwrap_or_default();
1186        let tags: Vec<String> = serde_json::from_str(&r.tags_json).unwrap_or_default();
1187        let aliases: Vec<String> = serde_json::from_str(&r.aliases_json).unwrap_or_default();
1188
1189        Self {
1190            id: r.id,
1191            name: r.name,
1192            entity_type: EntityType::from_str(&r.entity_type),
1193            attributes,
1194            description: r.description,
1195            created_at: r.created_at as u64,
1196            updated_at: r.updated_at as u64,
1197            version: r.version as u64,
1198            source: r.source,
1199            tags,
1200            aliases,
1201        }
1202    }
1203}
1204
/// Raw row of the `relations` table, as selected by the queries in this file.
///
/// Field names must match the selected column names, because `sqlx::FromRow`
/// maps columns to fields by name. Conversion to the domain type is done by
/// `From<RelationRow> for Relation`.
#[derive(Debug, sqlx::FromRow)]
struct RelationRow {
    id: String,
    source_id: String,
    target_id: String,
    relation_type: String,
    /// JSON-encoded string-to-string property map.
    properties_json: String,
    /// Written from `Confidence::as_f32`.
    confidence: f32,
    /// JSON-encoded provenance, or NULL when the relation has none.
    provenance_json: Option<String>,
    /// Optional validity window bounds (epoch timestamps as signed integers).
    /// `check_consistency` compares `valid_to` with `current_epoch_ms()`.
    valid_from: Option<i64>,
    valid_to: Option<i64>,
    created_at: i64,
    weight: Option<f32>,
}
1219
1220impl From<RelationRow> for Relation {
1221    fn from(r: RelationRow) -> Self {
1222        let properties: HashMap<String, String> =
1223            serde_json::from_str(&r.properties_json).unwrap_or_default();
1224        let provenance: Option<Provenance> = r
1225            .provenance_json
1226            .and_then(|j| serde_json::from_str(&j).ok());
1227
1228        Self {
1229            id: r.id,
1230            source_id: r.source_id,
1231            target_id: r.target_id,
1232            relation_type: r.relation_type,
1233            properties,
1234            confidence: Confidence::from_f32(r.confidence),
1235            provenance,
1236            valid_from: r.valid_from.map(|v| v as u64),
1237            valid_to: r.valid_to.map(|v| v as u64),
1238            created_at: r.created_at as u64,
1239            weight: r.weight,
1240        }
1241    }
1242}
1243
/// Narrow projection of `relations` (`id`, `source_id`, `target_id`) used by the
/// orphan-relation check in `check_consistency`.
#[derive(Debug, sqlx::FromRow)]
struct OrphanRelationRow {
    id: String,
    source_id: String,
    target_id: String,
}
1250
1251// === Utility ===
1252
/// Milliseconds elapsed since the Unix epoch, according to the system clock.
///
/// A clock set before 1970 yields `0` rather than an error.
fn current_epoch_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}