1use std::path::Path;
36use std::sync::{Arc, Mutex};
37
38use aonyx_core::{AonyxError, Result};
39use async_trait::async_trait;
40use chrono::{DateTime, Utc};
41use rusqlite::{params, Connection, OptionalExtension, Row};
42use serde::{Deserialize, Serialize};
43use serde_json::Value as JsonValue;
44use uuid::Uuid;
45
/// Identifier of an [`Entity`]; a plain alias for [`Uuid`].
pub type EntityId = Uuid;

/// Identifier of a [`Relation`]; a plain alias for [`Uuid`].
pub type RelationId = Uuid;
51
/// A named, typed record persisted through a [`KgStore`].
///
/// Serializable with serde; `attrs` may be omitted from serialized input.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Entity {
    /// Unique identifier; used as the primary key of the `entities` table.
    pub id: EntityId,
    /// Name of the entity; `find_entities_by_name` matches it exactly.
    pub name: String,
    /// Free-form type label (the tests in this file use e.g. `"person"`, `"project"`).
    pub entity_type: String,
    /// Arbitrary JSON attributes; falls back to the `JsonValue` default when
    /// missing from deserialized input.
    #[serde(default)]
    pub attrs: JsonValue,
    /// Optional timestamp — presumably the start of the validity window (confirm with callers).
    pub valid_from: Option<DateTime<Utc>>,
    /// Optional timestamp — presumably the end of the validity window (confirm with callers).
    pub valid_to: Option<DateTime<Utc>>,
    /// Optional identifier of the document this entity came from (opaque string here).
    pub source_doc_id: Option<String>,
    /// Confidence score; [`Entity::new`] initialises it to `1.0`.
    pub confidence: f32,
    /// Creation timestamp; the SQLite upsert never overwrites it on conflict.
    pub created_at: DateTime<Utc>,
}
75
76impl Entity {
77 pub fn new(name: impl Into<String>, entity_type: impl Into<String>) -> Self {
79 Self {
80 id: Uuid::new_v4(),
81 name: name.into(),
82 entity_type: entity_type.into(),
83 attrs: JsonValue::Null,
84 valid_from: None,
85 valid_to: None,
86 source_doc_id: None,
87 confidence: 1.0,
88 created_at: Utc::now(),
89 }
90 }
91}
92
/// A directed, labelled edge between two [`Entity`] records.
///
/// Serializable with serde; `attrs` may be omitted from serialized input.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Relation {
    /// Unique identifier; used as the primary key of the `relations` table.
    pub id: RelationId,
    /// Id of the entity the edge starts from (matched by `Direction::Out`).
    pub src_id: EntityId,
    /// Id of the entity the edge points to (matched by `Direction::In`).
    pub dst_id: EntityId,
    /// Label of the edge (the tests in this file use e.g. `"ports_patterns_from"`).
    pub predicate: String,
    /// Arbitrary JSON attributes; falls back to the `JsonValue` default when
    /// missing from deserialized input.
    #[serde(default)]
    pub attrs: JsonValue,
    /// Optional timestamp — presumably the start of the validity window (confirm with callers).
    pub valid_from: Option<DateTime<Utc>>,
    /// Optional timestamp — presumably the end of the validity window (confirm with callers).
    pub valid_to: Option<DateTime<Utc>>,
    /// Creation timestamp; the SQLite upsert never overwrites it on conflict.
    pub created_at: DateTime<Utc>,
}
114
115impl Relation {
116 pub fn new(src_id: EntityId, dst_id: EntityId, predicate: impl Into<String>) -> Self {
118 Self {
119 id: Uuid::new_v4(),
120 src_id,
121 dst_id,
122 predicate: predicate.into(),
123 attrs: JsonValue::Null,
124 valid_from: None,
125 valid_to: None,
126 created_at: Utc::now(),
127 }
128 }
129}
130
/// Selects which edges `KgStore::relations_for` returns for an entity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    /// Edges whose `src_id` is the entity.
    Out,
    /// Edges whose `dst_id` is the entity.
    In,
    /// Edges whose `src_id` or `dst_id` is the entity.
    Both,
}
141
/// Async storage interface for [`Entity`] and [`Relation`] records.
///
/// Implementations must be `Send + Sync`. The per-method notes describe what
/// the SQLite implementation in this file does.
#[async_trait]
pub trait KgStore: Send + Sync {
    /// Inserts `entity`, or updates the stored record that has the same id,
    /// and returns that id.
    async fn upsert_entity(&self, entity: Entity) -> Result<EntityId>;

    /// Inserts `relation`, or updates the stored record that has the same id,
    /// and returns that id.
    async fn upsert_relation(&self, relation: Relation) -> Result<RelationId>;

    /// Looks up a single entity by id; `Ok(None)` when no such entity exists.
    async fn get_entity(&self, id: EntityId) -> Result<Option<Entity>>;

    /// Returns the entities whose name equals `name` (exact match in the
    /// SQLite implementation).
    async fn find_entities_by_name(&self, name: &str) -> Result<Vec<Entity>>;

    /// Returns the relations touching `entity_id`, filtered by `direction`.
    async fn relations_for(
        &self,
        entity_id: EntityId,
        direction: Direction,
    ) -> Result<Vec<Relation>>;

    /// Returns the number of stored entities.
    async fn count_entities(&self) -> Result<usize>;

    /// Returns at most `limit` entities; the SQLite implementation orders
    /// them by `created_at`, newest first.
    async fn list_entities(&self, limit: usize) -> Result<Vec<Entity>>;

    /// Returns at most `limit` relations; the SQLite implementation orders
    /// them by `created_at`, newest first.
    async fn list_relations(&self, limit: usize) -> Result<Vec<Relation>>;
}
175
/// [`KgStore`] implementation backed by a single rusqlite [`Connection`].
///
/// The connection sits behind `Arc<Mutex<_>>`, so clones of the store share
/// the same underlying connection.
#[derive(Clone)]
pub struct SqliteKgStore {
    /// Shared connection; locked for the duration of each store operation.
    conn: Arc<Mutex<Connection>>,
}
186
187impl SqliteKgStore {
188 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
190 let conn = Connection::open(path.as_ref())
191 .map_err(|e| AonyxError::Memory(format!("open kg db: {e}")))?;
192 Self::migrate(&conn)?;
193 Ok(Self {
194 conn: Arc::new(Mutex::new(conn)),
195 })
196 }
197
198 pub fn open_in_memory() -> Result<Self> {
200 let conn = Connection::open_in_memory()
201 .map_err(|e| AonyxError::Memory(format!("open in-memory kg: {e}")))?;
202 Self::migrate(&conn)?;
203 Ok(Self {
204 conn: Arc::new(Mutex::new(conn)),
205 })
206 }
207
208 fn migrate(conn: &Connection) -> Result<()> {
209 conn.execute_batch(MIGRATION_V1)
210 .map_err(|e| AonyxError::Memory(format!("migrate kg schema: {e}")))?;
211 Ok(())
212 }
213}
214
/// Schema applied by `SqliteKgStore::migrate` on every open.
///
/// Creates the `entities` and `relations` tables plus their lookup indexes.
/// Every statement uses `IF NOT EXISTS`, so re-running the batch is a no-op.
/// Timestamps and JSON attributes are stored as `TEXT`.
///
/// NOTE(review): `relations.src_id` / `dst_id` declare `REFERENCES entities(id)`,
/// but nothing in this file sets `PRAGMA foreign_keys`; whether the references
/// are enforced depends on how SQLite was built/configured — confirm.
const MIGRATION_V1: &str = r#"
CREATE TABLE IF NOT EXISTS entities (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    entity_type TEXT NOT NULL,
    attrs_json TEXT,
    valid_from TEXT,
    valid_to TEXT,
    source_doc_id TEXT,
    confidence REAL NOT NULL DEFAULT 1.0,
    created_at TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name);
CREATE INDEX IF NOT EXISTS idx_entities_type ON entities(entity_type);

CREATE TABLE IF NOT EXISTS relations (
    id TEXT NOT NULL PRIMARY KEY,
    src_id TEXT NOT NULL REFERENCES entities(id),
    dst_id TEXT NOT NULL REFERENCES entities(id),
    predicate TEXT NOT NULL,
    attrs_json TEXT,
    valid_from TEXT,
    valid_to TEXT,
    created_at TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_relations_src ON relations(src_id);
CREATE INDEX IF NOT EXISTS idx_relations_dst ON relations(dst_id);
CREATE INDEX IF NOT EXISTS idx_relations_predicate ON relations(predicate);
"#;
246
247fn parse_uuid(s: &str) -> rusqlite::Result<Uuid> {
248 Uuid::parse_str(s).map_err(|e| {
249 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
250 })
251}
252
253fn parse_ts(s: Option<String>) -> Option<DateTime<Utc>> {
254 s.and_then(|raw| {
255 DateTime::parse_from_rfc3339(&raw)
256 .ok()
257 .map(|d| d.with_timezone(&Utc))
258 })
259}
260
261fn entity_from_row(row: &Row<'_>) -> rusqlite::Result<Entity> {
262 let id_str: String = row.get(0)?;
263 let name: String = row.get(1)?;
264 let entity_type: String = row.get(2)?;
265 let attrs_json: Option<String> = row.get(3)?;
266 let valid_from_raw: Option<String> = row.get(4)?;
267 let valid_to_raw: Option<String> = row.get(5)?;
268 let source_doc_id: Option<String> = row.get(6)?;
269 let confidence: f32 = row.get(7)?;
270 let created_at_raw: String = row.get(8)?;
271
272 let attrs = attrs_json
273 .and_then(|s| serde_json::from_str(&s).ok())
274 .unwrap_or(JsonValue::Null);
275 let created_at = DateTime::parse_from_rfc3339(&created_at_raw)
276 .map(|d| d.with_timezone(&Utc))
277 .unwrap_or_else(|_| Utc::now());
278
279 Ok(Entity {
280 id: parse_uuid(&id_str)?,
281 name,
282 entity_type,
283 attrs,
284 valid_from: parse_ts(valid_from_raw),
285 valid_to: parse_ts(valid_to_raw),
286 source_doc_id,
287 confidence,
288 created_at,
289 })
290}
291
292fn relation_from_row(row: &Row<'_>) -> rusqlite::Result<Relation> {
293 let id_str: String = row.get(0)?;
294 let src_str: String = row.get(1)?;
295 let dst_str: String = row.get(2)?;
296 let predicate: String = row.get(3)?;
297 let attrs_json: Option<String> = row.get(4)?;
298 let valid_from_raw: Option<String> = row.get(5)?;
299 let valid_to_raw: Option<String> = row.get(6)?;
300 let created_at_raw: String = row.get(7)?;
301
302 let attrs = attrs_json
303 .and_then(|s| serde_json::from_str(&s).ok())
304 .unwrap_or(JsonValue::Null);
305 let created_at = DateTime::parse_from_rfc3339(&created_at_raw)
306 .map(|d| d.with_timezone(&Utc))
307 .unwrap_or_else(|_| Utc::now());
308
309 Ok(Relation {
310 id: parse_uuid(&id_str)?,
311 src_id: parse_uuid(&src_str)?,
312 dst_id: parse_uuid(&dst_str)?,
313 predicate,
314 attrs,
315 valid_from: parse_ts(valid_from_raw),
316 valid_to: parse_ts(valid_to_raw),
317 created_at,
318 })
319}
320
/// Column list used by every `SELECT` on `entities`; `entity_from_row` reads
/// the columns by position, so the order here must match its indices.
const ENTITY_COLUMNS: &str =
    "id, name, entity_type, attrs_json, valid_from, valid_to, source_doc_id, confidence, created_at";

/// Column list used by every `SELECT` on `relations`; `relation_from_row`
/// reads the columns by position, so the order here must match its indices.
const RELATION_COLUMNS: &str =
    "id, src_id, dst_id, predicate, attrs_json, valid_from, valid_to, created_at";
326
327#[async_trait]
328impl KgStore for SqliteKgStore {
329 async fn upsert_entity(&self, entity: Entity) -> Result<EntityId> {
330 let conn = self.conn.clone();
331 let id = entity.id;
332 tokio::task::spawn_blocking(move || -> Result<()> {
333 let lock = conn.lock().expect("kg mutex poisoned");
334 lock.execute(
335 r#"
336 INSERT INTO entities (id, name, entity_type, attrs_json, valid_from, valid_to, source_doc_id, confidence, created_at)
337 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
338 ON CONFLICT(id) DO UPDATE SET
339 name = excluded.name,
340 entity_type = excluded.entity_type,
341 attrs_json = excluded.attrs_json,
342 valid_from = excluded.valid_from,
343 valid_to = excluded.valid_to,
344 source_doc_id = excluded.source_doc_id,
345 confidence = excluded.confidence
346 "#,
347 params![
348 entity.id.to_string(),
349 entity.name,
350 entity.entity_type,
351 serde_json::to_string(&entity.attrs).ok(),
352 entity.valid_from.map(|d| d.to_rfc3339()),
353 entity.valid_to.map(|d| d.to_rfc3339()),
354 entity.source_doc_id,
355 entity.confidence,
356 entity.created_at.to_rfc3339(),
357 ],
358 )
359 .map_err(|e| AonyxError::Memory(format!("upsert_entity: {e}")))?;
360 Ok(())
361 })
362 .await
363 .map_err(|e| AonyxError::Memory(format!("kg upsert_entity join: {e}")))??;
364 Ok(id)
365 }
366
367 async fn upsert_relation(&self, relation: Relation) -> Result<RelationId> {
368 let conn = self.conn.clone();
369 let id = relation.id;
370 tokio::task::spawn_blocking(move || -> Result<()> {
371 let lock = conn.lock().expect("kg mutex poisoned");
372 lock.execute(
373 r#"
374 INSERT INTO relations (id, src_id, dst_id, predicate, attrs_json, valid_from, valid_to, created_at)
375 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
376 ON CONFLICT(id) DO UPDATE SET
377 src_id = excluded.src_id,
378 dst_id = excluded.dst_id,
379 predicate = excluded.predicate,
380 attrs_json = excluded.attrs_json,
381 valid_from = excluded.valid_from,
382 valid_to = excluded.valid_to
383 "#,
384 params![
385 relation.id.to_string(),
386 relation.src_id.to_string(),
387 relation.dst_id.to_string(),
388 relation.predicate,
389 serde_json::to_string(&relation.attrs).ok(),
390 relation.valid_from.map(|d| d.to_rfc3339()),
391 relation.valid_to.map(|d| d.to_rfc3339()),
392 relation.created_at.to_rfc3339(),
393 ],
394 )
395 .map_err(|e| AonyxError::Memory(format!("upsert_relation: {e}")))?;
396 Ok(())
397 })
398 .await
399 .map_err(|e| AonyxError::Memory(format!("kg upsert_relation join: {e}")))??;
400 Ok(id)
401 }
402
403 async fn get_entity(&self, id: EntityId) -> Result<Option<Entity>> {
404 let conn = self.conn.clone();
405 tokio::task::spawn_blocking(move || -> Result<Option<Entity>> {
406 let lock = conn.lock().expect("kg mutex poisoned");
407 let sql = format!("SELECT {ENTITY_COLUMNS} FROM entities WHERE id = ?1");
408 let mut stmt = lock
409 .prepare(&sql)
410 .map_err(|e| AonyxError::Memory(format!("prepare get_entity: {e}")))?;
411 let row = stmt
412 .query_row(params![id.to_string()], entity_from_row)
413 .optional()
414 .map_err(|e| AonyxError::Memory(format!("get_entity: {e}")))?;
415 Ok(row)
416 })
417 .await
418 .map_err(|e| AonyxError::Memory(format!("kg get_entity join: {e}")))?
419 }
420
421 async fn find_entities_by_name(&self, name: &str) -> Result<Vec<Entity>> {
422 let conn = self.conn.clone();
423 let needle = name.to_string();
424 tokio::task::spawn_blocking(move || -> Result<Vec<Entity>> {
425 let lock = conn.lock().expect("kg mutex poisoned");
426 let sql = format!("SELECT {ENTITY_COLUMNS} FROM entities WHERE name = ?1");
427 let mut stmt = lock
428 .prepare(&sql)
429 .map_err(|e| AonyxError::Memory(format!("prepare find_entities_by_name: {e}")))?;
430 let rows = stmt
431 .query_map(params![needle], entity_from_row)
432 .map_err(|e| AonyxError::Memory(format!("query find_entities_by_name: {e}")))?;
433 let mut out = Vec::new();
434 for r in rows {
435 out.push(r.map_err(|e| AonyxError::Memory(format!("row decode: {e}")))?);
436 }
437 Ok(out)
438 })
439 .await
440 .map_err(|e| AonyxError::Memory(format!("kg find_entities_by_name join: {e}")))?
441 }
442
443 async fn relations_for(
444 &self,
445 entity_id: EntityId,
446 direction: Direction,
447 ) -> Result<Vec<Relation>> {
448 let conn = self.conn.clone();
449 tokio::task::spawn_blocking(move || -> Result<Vec<Relation>> {
450 let lock = conn.lock().expect("kg mutex poisoned");
451 let where_clause = match direction {
452 Direction::Out => "WHERE src_id = ?1",
453 Direction::In => "WHERE dst_id = ?1",
454 Direction::Both => "WHERE src_id = ?1 OR dst_id = ?1",
455 };
456 let sql = format!("SELECT {RELATION_COLUMNS} FROM relations {where_clause}");
457 let mut stmt = lock
458 .prepare(&sql)
459 .map_err(|e| AonyxError::Memory(format!("prepare relations_for: {e}")))?;
460 let rows = stmt
461 .query_map(params![entity_id.to_string()], relation_from_row)
462 .map_err(|e| AonyxError::Memory(format!("query relations_for: {e}")))?;
463 let mut out = Vec::new();
464 for r in rows {
465 out.push(r.map_err(|e| AonyxError::Memory(format!("row decode: {e}")))?);
466 }
467 Ok(out)
468 })
469 .await
470 .map_err(|e| AonyxError::Memory(format!("kg relations_for join: {e}")))?
471 }
472
473 async fn count_entities(&self) -> Result<usize> {
474 let conn = self.conn.clone();
475 tokio::task::spawn_blocking(move || -> Result<usize> {
476 let lock = conn.lock().expect("kg mutex poisoned");
477 let n: i64 = lock
478 .query_row("SELECT COUNT(*) FROM entities", [], |r| r.get(0))
479 .map_err(|e| AonyxError::Memory(format!("count_entities: {e}")))?;
480 Ok(n.max(0) as usize)
481 })
482 .await
483 .map_err(|e| AonyxError::Memory(format!("kg count_entities join: {e}")))?
484 }
485
486 async fn list_entities(&self, limit: usize) -> Result<Vec<Entity>> {
487 let conn = self.conn.clone();
488 tokio::task::spawn_blocking(move || -> Result<Vec<Entity>> {
489 let lock = conn.lock().expect("kg mutex poisoned");
490 let sql =
491 format!("SELECT {ENTITY_COLUMNS} FROM entities ORDER BY created_at DESC LIMIT ?1");
492 let mut stmt = lock
493 .prepare(&sql)
494 .map_err(|e| AonyxError::Memory(format!("prepare list_entities: {e}")))?;
495 let rows = stmt
496 .query_map(params![limit as i64], entity_from_row)
497 .map_err(|e| AonyxError::Memory(format!("query list_entities: {e}")))?;
498 let mut out = Vec::new();
499 for r in rows {
500 out.push(r.map_err(|e| AonyxError::Memory(format!("row decode: {e}")))?);
501 }
502 Ok(out)
503 })
504 .await
505 .map_err(|e| AonyxError::Memory(format!("kg list_entities join: {e}")))?
506 }
507
508 async fn list_relations(&self, limit: usize) -> Result<Vec<Relation>> {
509 let conn = self.conn.clone();
510 tokio::task::spawn_blocking(move || -> Result<Vec<Relation>> {
511 let lock = conn.lock().expect("kg mutex poisoned");
512 let sql = format!(
513 "SELECT {RELATION_COLUMNS} FROM relations ORDER BY created_at DESC LIMIT ?1"
514 );
515 let mut stmt = lock
516 .prepare(&sql)
517 .map_err(|e| AonyxError::Memory(format!("prepare list_relations: {e}")))?;
518 let rows = stmt
519 .query_map(params![limit as i64], relation_from_row)
520 .map_err(|e| AonyxError::Memory(format!("query list_relations: {e}")))?;
521 let mut out = Vec::new();
522 for r in rows {
523 out.push(r.map_err(|e| AonyxError::Memory(format!("row decode: {e}")))?);
524 }
525 Ok(out)
526 })
527 .await
528 .map_err(|e| AonyxError::Memory(format!("kg list_relations join: {e}")))?
529 }
530}
531
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh, already-migrated in-memory store for a single test.
    fn fresh_store() -> SqliteKgStore {
        SqliteKgStore::open_in_memory().expect("open in-memory")
    }

    /// A newly opened store has a usable schema and contains no entities.
    #[tokio::test]
    async fn open_in_memory_runs_migrations() {
        let kg = fresh_store();
        let total = kg.count_entities().await.unwrap();
        assert_eq!(total, 0);
    }

    /// An upserted entity can be read back by the id the upsert returned.
    #[tokio::test]
    async fn upsert_and_fetch_entity() {
        let kg = fresh_store();
        let id = kg
            .upsert_entity(Entity::new("Damien", "person"))
            .await
            .unwrap();
        let fetched = kg.get_entity(id).await.unwrap().expect("entity exists");
        assert_eq!(fetched.name, "Damien");
        assert_eq!(fetched.entity_type, "person");
        assert_eq!(fetched.confidence, 1.0);
    }

    /// Upserting the same id twice updates the row instead of adding one.
    #[tokio::test]
    async fn upsert_is_idempotent() {
        let kg = fresh_store();
        let original = Entity::new("Aonyx Agent", "project");
        let id = kg.upsert_entity(original.clone()).await.unwrap();

        let renamed = Entity {
            id,
            name: "Aonyx Agent (renamed)".into(),
            ..original
        };
        kg.upsert_entity(renamed).await.unwrap();

        assert_eq!(kg.count_entities().await.unwrap(), 1);
        let fetched = kg.get_entity(id).await.unwrap().expect("entity exists");
        assert_eq!(fetched.name, "Aonyx Agent (renamed)");
    }

    /// Name lookup returns only the entities with exactly that name.
    #[tokio::test]
    async fn find_by_name_returns_matching_entities() {
        let kg = fresh_store();
        for who in ["Alice", "Bob"] {
            kg.upsert_entity(Entity::new(who, "person")).await.unwrap();
        }
        let matches = kg.find_entities_by_name("Alice").await.unwrap();
        assert_eq!(matches.len(), 1);
        assert_eq!(matches[0].name, "Alice");
    }

    /// One edge is visible as outgoing from its source, incoming to its
    /// destination, and through `Direction::Both`.
    #[tokio::test]
    async fn relations_can_be_queried_in_both_directions() {
        let kg = fresh_store();
        let a_id = kg
            .upsert_entity(Entity::new("Aonyx Agent", "project"))
            .await
            .unwrap();
        let b_id = kg
            .upsert_entity(Entity::new("Aonyx RAG", "project"))
            .await
            .unwrap();
        let edge = Relation::new(a_id, b_id, "ports_patterns_from");
        kg.upsert_relation(edge).await.unwrap();

        let outgoing = kg.relations_for(a_id, Direction::Out).await.unwrap();
        let incoming = kg.relations_for(b_id, Direction::In).await.unwrap();
        let either = kg.relations_for(a_id, Direction::Both).await.unwrap();

        assert_eq!(outgoing.len(), 1);
        assert_eq!(incoming.len(), 1);
        assert_eq!(either.len(), 1);
        assert_eq!(outgoing[0].predicate, "ports_patterns_from");
        assert_eq!(outgoing[0].src_id, a_id);
        assert_eq!(outgoing[0].dst_id, b_id);
    }

    /// Listing returns the most recently created entity first and honours
    /// the limit.
    #[tokio::test]
    async fn list_entities_orders_newest_first_and_caps_limit() {
        let kg = fresh_store();
        let pause = std::time::Duration::from_millis(2);
        for name in ["A", "B", "C"] {
            kg.upsert_entity(Entity::new(name, "thing")).await.unwrap();
            // Keep `created_at` values distinct so the ordering is deterministic.
            tokio::time::sleep(pause).await;
        }
        let everything = kg.list_entities(100).await.unwrap();
        assert_eq!(everything.len(), 3);
        assert_eq!(everything[0].name, "C");
        let capped = kg.list_entities(2).await.unwrap();
        assert_eq!(capped.len(), 2);
    }

    /// Listing relations returns the most recently created edge first.
    #[tokio::test]
    async fn list_relations_returns_recent_edges_first() {
        let kg = fresh_store();
        let a = kg.upsert_entity(Entity::new("a", "x")).await.unwrap();
        let b = kg.upsert_entity(Entity::new("b", "x")).await.unwrap();
        let c = kg.upsert_entity(Entity::new("c", "x")).await.unwrap();

        kg.upsert_relation(Relation::new(a, b, "older"))
            .await
            .unwrap();
        // Keep `created_at` values distinct so the ordering is deterministic.
        tokio::time::sleep(std::time::Duration::from_millis(2)).await;
        kg.upsert_relation(Relation::new(b, c, "newer"))
            .await
            .unwrap();

        let edges = kg.list_relations(10).await.unwrap();
        assert_eq!(edges.len(), 2);
        assert_eq!(edges[0].predicate, "newer");
    }
}