1use rustc_hash::FxHashMap;
2use std::collections::{HashSet, VecDeque};
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use lru::LruCache;
9use parking_lot::{Mutex, MutexGuard};
10use rusqlite::{params, types::ToSql, Connection, OpenFlags};
11
12use crate::config::{Durability, SqliteTuning};
13use crate::errors::{MCSError, Result};
14use crate::types::{Entity, Relation};
15
16fn sqlite_err(e: rusqlite::Error) -> MCSError {
17 MCSError::IoError(std::io::Error::other(e))
18}
19
20const fn is_not_found(e: &rusqlite::Error) -> bool {
21 matches!(e, rusqlite::Error::QueryReturnedNoRows)
22}
23
/// Current wall-clock time in microseconds since the Unix epoch.
///
/// A clock set before 1970 yields 0 instead of an error.
#[inline]
fn now_us() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    since_epoch.as_micros() as i64
}
31
/// 64-bit FNV-1a hash of the UTF-8 bytes of `name`, reinterpreted as `i64`.
///
/// Stored in `entity.name_hash` so name lookups can use an integer index;
/// callers still compare `name` itself to resolve collisions.
#[inline]
pub(crate) fn name_hash(name: &str) -> i64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0100_0000_01b3;
    let digest = name
        .bytes()
        .fold(FNV_OFFSET_BASIS, |acc, byte| {
            (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        });
    digest as i64
}
41
42fn load_observations(conn: &Connection, entity_id: i64) -> Result<Vec<String>> {
43 let mut stmt = conn
44 .prepare_cached("SELECT body FROM observation WHERE entity_id = ?1 ORDER BY idx")
45 .map_err(sqlite_err)?;
46 let rows = stmt
47 .query_map(params![entity_id], |row| row.get::<_, String>(0))
48 .map_err(sqlite_err)?
49 .filter_map(|r| r.ok())
50 .collect::<Vec<_>>();
51 Ok(rows)
52}
53
54fn load_observations_opt(conn: &Connection, entity_id: i64) -> Vec<String> {
55 load_observations(conn, entity_id).unwrap_or_default()
56}
57
58fn entity_name_lookup(conn: &Connection, name: &str) -> Result<Option<i64>> {
59 let h = name_hash(name);
60 let mut stmt = conn
61 .prepare_cached(
62 "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
63 )
64 .map_err(sqlite_err)?;
65 match stmt.query_row(params![h, name], |row| row.get::<_, i64>(0)) {
66 Ok(id) => Ok(Some(id)),
67 Err(e) if is_not_found(&e) => Ok(None),
68 Err(e) => Err(sqlite_err(e)),
69 }
70}
71
72fn get_type_id(conn: &Connection, type_name: &str, kind: i64) -> Result<i64> {
73 let mut sel = conn
74 .prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
75 .map_err(sqlite_err)?;
76 if let Ok(id) = sel.query_row(params![kind, type_name], |row| row.get::<_, i64>(0)) {
77 return Ok(id);
78 }
79 conn.execute(
80 "INSERT INTO type_dict (kind, name, count) VALUES (?1, ?2, 0)",
81 params![kind, type_name],
82 )
83 .map_err(sqlite_err)?;
84 Ok(conn.last_insert_rowid())
85}
86
87fn lookup_type_id(conn: &Connection, type_name: &str, kind: i64) -> Option<i64> {
91 conn.prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
92 .ok()?
93 .query_row(params![kind, type_name], |row| row.get::<_, i64>(0))
94 .ok()
95}
96
97fn inc_type_count(conn: &Connection, type_id: i64, delta: i64) -> Result<()> {
98 conn.execute(
99 "UPDATE type_dict SET count = count + ?1 WHERE id = ?2",
100 params![delta, type_id],
101 )
102 .map_err(sqlite_err)?;
103 Ok(())
104}
105
106fn inc_graph_stat(conn: &Connection, key: &str, delta: i64) -> Result<()> {
107 conn.execute(
108 "UPDATE graph_stat SET value = value + ?1 WHERE key = ?2",
109 params![delta, key],
110 )
111 .map_err(sqlite_err)?;
112 Ok(())
113}
114
115fn read_graph_stat(conn: &Connection, key: &str) -> Result<i64> {
116 conn.query_row(
117 "SELECT value FROM graph_stat WHERE key = ?1",
118 params![key],
119 |row| row.get(0),
120 )
121 .map_err(sqlite_err)
122}
123
124fn name_of_type(conn: &Connection, type_id: i64) -> Result<String> {
125 conn.query_row(
126 "SELECT name FROM type_dict WHERE id = ?1",
127 params![type_id],
128 |row| row.get(0),
129 )
130 .map_err(sqlite_err)
131}
132
133fn select_all_types(conn: &Connection, kind: i64) -> Result<Vec<(String, usize)>> {
134 let mut stmt = conn
135 .prepare_cached(
136 "SELECT name, count FROM type_dict WHERE kind = ?1 AND count > 0 ORDER BY count DESC",
137 )
138 .map_err(sqlite_err)?;
139 let rows = stmt
140 .query_map(params![kind], |row| {
141 Ok((
142 row.get::<_, String>(0)?,
143 row.get::<_, i64>(1)? as usize,
144 ))
145 })
146 .map_err(sqlite_err)?
147 .filter_map(|r| r.ok())
148 .collect();
149 Ok(rows)
150}
151
152fn entity_by_id(conn: &Connection, id: i64) -> Result<Entity> {
153 let mut stmt = conn
154 .prepare_cached(
155 "SELECT e.name, t.name,
156 COALESCE((SELECT json_group_array(o.body ORDER BY o.idx) FROM observation o WHERE o.entity_id = e.id), '[]')
157 FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1 AND e.flags = 0",
158 )
159 .map_err(sqlite_err)?;
160 let (name, etype, obs_json): (String, String, String) = stmt
161 .query_row(params![id], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
162 .map_err(sqlite_err)?;
163 let observations: Vec<String> = serde_json::from_str(&obs_json).unwrap_or_default();
164 Ok(Entity {
165 name,
166 entity_type: etype,
167 observations,
168 })
169}
170
/// Edge-direction selector: outgoing, incoming, or both.
/// NOTE(review): presumably consumed by traversal/query code outside this view.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    Outgoing,
    Incoming,
    Both,
}

impl Direction {
    /// Parses `"OUTGOING"` / `"INCOMING"` (case-sensitive).
    ///
    /// Anything else, including `None`, maps to [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        match s.unwrap_or_default() {
            "OUTGOING" => Self::Outgoing,
            "INCOMING" => Self::Incoming,
            _ => Self::Both,
        }
    }
}
188
/// Appends `raw` to `buf` as a double-quoted JSON string literal.
///
/// - `"` and `\` are backslash-escaped.
/// - Newline, carriage return, tab, backspace (0x08) and form feed (0x0C) use
///   their short escapes.
/// - Every other byte below 0x20 is written as `\u00xx`.
/// - All other characters, including non-ASCII, are copied unchanged.
///
/// The output is always valid JSON. The previous two-pass version emitted
/// control characters raw whenever they appeared before the last short-escaped
/// character.
pub fn push_json_str(buf: &mut String, raw: &str) {
    buf.reserve(raw.len() + 2);
    buf.push('"');
    // `start` marks the beginning of the pending run of bytes that need no
    // escaping. Every escaped byte is ASCII, so `start` and `i` always fall on
    // char boundaries and the slices below cannot panic.
    let mut start = 0;
    for (i, &b) in raw.as_bytes().iter().enumerate() {
        let short = match b {
            b'"' => '"',
            b'\\' => '\\',
            b'\n' => 'n',
            b'\r' => 'r',
            b'\t' => 't',
            0x08 => 'b',
            0x0C => 'f',
            0x00..=0x1F => {
                buf.push_str(&raw[start..i]);
                write_escape_unicode(buf, b);
                start = i + 1;
                continue;
            }
            _ => continue,
        };
        buf.push_str(&raw[start..i]);
        buf.push('\\');
        buf.push(short);
        start = i + 1;
    }
    buf.push_str(&raw[start..]);
    buf.push('"');
}

/// Appends `b` as a JSON `\u00xx` escape with lower-case hex digits.
#[inline(never)]
fn write_escape_unicode(buf: &mut String, b: u8) {
    use std::fmt::Write;
    write!(buf, "\\u{:04x}", b).expect("formatting into a String is infallible");
}
229
/// Cached per-entity metadata. It mirrors the like-named columns of the
/// `entity` table so hot paths can skip a row lookup.
#[derive(Clone, Copy)]
struct EntityMeta {
    /// `entity.id`.
    id: i64,
    /// `entity.type_id` (a `type_dict` id).
    type_id: i64,
    /// `entity.obs_count`.
    obs_count: i64,
    /// `entity.out_deg`.
    out_deg: i64,
    /// `entity.in_deg`.
    in_deg: i64,
}
240
241struct TxGuard<'a> {
244 conn: &'a Connection,
245 done: bool,
246}
247
248impl<'a> TxGuard<'a> {
249 fn begin(conn: &'a Connection) -> Result<Self> {
250 conn.execute_batch("BEGIN IMMEDIATE").map_err(sqlite_err)?;
255 Ok(Self { conn, done: false })
256 }
257
258 fn commit(mut self) -> Result<()> {
259 self.done = true;
260 self.conn.execute_batch("COMMIT").map_err(sqlite_err)
261 }
262}
263
264impl Drop for TxGuard<'_> {
265 fn drop(&mut self) {
266 if !self.done {
267 let _ = self.conn.execute_batch("ROLLBACK");
268 }
269 }
270}
271
272struct ReaderPool {
279 conns: Vec<Mutex<Connection>>,
280 next: AtomicUsize,
281}
282
283impl ReaderPool {
284 fn get(&self) -> MutexGuard<'_, Connection> {
288 for c in &self.conns {
289 if let Some(g) = c.try_lock() {
290 return g;
291 }
292 }
293 let i = self.next.fetch_add(1, Ordering::Relaxed) % self.conns.len();
294 self.conns[i].lock()
295 }
296}
297
/// Handle to the SQLite-backed graph store.
///
/// It holds:
/// - one writer connection;
/// - a pool of query-only reader connections;
/// - in-memory id sequences;
/// - an LRU cache of per-entity metadata.
///
/// Field order matters: it fixes the order in which the connections are
/// dropped (writer first, then readers).
pub struct GraphHandle {
    /// The single write connection. Every visible mutating method locks it
    /// first, which serializes writers within this handle.
    writer: Mutex<Connection>,
    /// Reader connections opened with `PRAGMA query_only = ON`.
    readers: ReaderPool,
    /// Last allocated `entity.id`; persisted to `graph_stat.entity_seq` by `sync_seqs`.
    seq_entity: AtomicI64,
    /// Last allocated `observation.id`; persisted to `graph_stat.obs_seq` by `sync_seqs`.
    seq_obs: AtomicI64,
    /// Entity name -> metadata cache used to skip `entity` row lookups.
    cache: Mutex<LruCache<String, EntityMeta>>,
}
310
311fn open_reader(path: &Path, tuning: &SqliteTuning) -> Result<Connection> {
318 let conn = Connection::open_with_flags(
319 path,
320 OpenFlags::SQLITE_OPEN_READ_WRITE
321 | OpenFlags::SQLITE_OPEN_NO_MUTEX
322 | OpenFlags::SQLITE_OPEN_URI,
323 )
324 .map_err(sqlite_err)?;
325 conn.busy_timeout(Duration::from_millis(tuning.busy_timeout_ms))
326 .map_err(sqlite_err)?;
327 conn.execute_batch(&format!(
328 "PRAGMA query_only = ON;
329 PRAGMA cache_size = -{};
330 PRAGMA temp_store = MEMORY;
331 PRAGMA mmap_size = {};",
332 tuning.cache_size_kb, tuning.mmap_size
333 ))
334 .map_err(sqlite_err)?;
335 Ok(conn)
336}
337
338impl GraphHandle {
339 pub fn new(
340 path: &Path,
341 durability: Durability,
342 tuning: SqliteTuning,
343 lru_cache_size: NonZeroUsize,
344 read_pool_size: usize,
345 ) -> Result<Self> {
346 let conn = Connection::open(path).map_err(sqlite_err)?;
347 conn.busy_timeout(Duration::from_millis(tuning.busy_timeout_ms))
350 .map_err(sqlite_err)?;
351
352 conn.execute_batch(&format!(
358 "PRAGMA page_size = {};
359 PRAGMA auto_vacuum = INCREMENTAL;",
360 tuning.page_size
361 ))
362 .map_err(sqlite_err)?;
363
364 conn.execute_batch(&format!(
365 "PRAGMA journal_mode = WAL;
366 PRAGMA foreign_keys = OFF;
367 PRAGMA cache_size = -{};
368 PRAGMA temp_store = MEMORY;
369 PRAGMA busy_timeout = {};
370 PRAGMA synchronous = NORMAL;
371 PRAGMA journal_size_limit = {};",
372 tuning.cache_size_kb, tuning.busy_timeout_ms, tuning.journal_size_limit
373 ))
374 .map_err(sqlite_err)?;
375
376 conn.execute_batch(
377 "CREATE TABLE IF NOT EXISTS entity (
378 id INTEGER PRIMARY KEY,
379 name_hash INTEGER NOT NULL,
380 name TEXT NOT NULL,
381 type_id INTEGER NOT NULL,
382 obs_count INTEGER NOT NULL DEFAULT 0,
383 out_deg INTEGER NOT NULL DEFAULT 0,
384 in_deg INTEGER NOT NULL DEFAULT 0,
385 created_us INTEGER NOT NULL,
386 updated_us INTEGER NOT NULL,
387 flags INTEGER NOT NULL DEFAULT 0
388 ) STRICT;
389
390 CREATE INDEX IF NOT EXISTS entity_by_hash
391 ON entity(name_hash, type_id, obs_count, out_deg, in_deg)
392 WHERE flags = 0;
393
394 CREATE INDEX IF NOT EXISTS entity_name_ci
395 ON entity(lower(name))
396 WHERE flags = 0;
397
398 CREATE TABLE IF NOT EXISTS observation (
399 id INTEGER PRIMARY KEY,
400 entity_id INTEGER NOT NULL,
401 idx INTEGER NOT NULL,
402 body TEXT NOT NULL,
403 created_us INTEGER NOT NULL
404 ) STRICT;
405
406 CREATE INDEX IF NOT EXISTS obs_by_entity
407 ON observation(entity_id, idx);
408
409 CREATE TABLE IF NOT EXISTS relation (
410 from_id INTEGER NOT NULL,
411 to_id INTEGER NOT NULL,
412 type_id INTEGER NOT NULL,
413 created_us INTEGER NOT NULL
414 ) STRICT;
415
416 CREATE INDEX IF NOT EXISTS rel_out
417 ON relation(from_id, type_id, to_id);
418
419 CREATE INDEX IF NOT EXISTS rel_in
420 ON relation(to_id, type_id, from_id);
421
422 CREATE VIRTUAL TABLE IF NOT EXISTS name_fts
423 USING fts5(name, content='entity', content_rowid='id',
424 tokenize='unicode61 remove_diacritics 2');
425
426 CREATE VIRTUAL TABLE IF NOT EXISTS obs_fts
427 USING fts5(body, content='observation', content_rowid='id',
428 tokenize='unicode61 remove_diacritics 2');
429
430 CREATE TRIGGER IF NOT EXISTS obs_fts_ai AFTER INSERT ON observation BEGIN
431 INSERT INTO obs_fts(rowid, body) VALUES (new.id, new.body);
432 END;
433
434 CREATE TRIGGER IF NOT EXISTS obs_fts_bd BEFORE DELETE ON observation BEGIN
435 INSERT INTO obs_fts(obs_fts, rowid, body) VALUES ('delete', old.id, '');
436 END;
437
438 CREATE TABLE IF NOT EXISTS type_dict (
439 id INTEGER PRIMARY KEY,
440 kind INTEGER NOT NULL,
441 name TEXT NOT NULL,
442 count INTEGER NOT NULL DEFAULT 0
443 ) STRICT;
444
445 CREATE INDEX IF NOT EXISTS type_by_name
446 ON type_dict(kind, name);
447
448 CREATE TABLE IF NOT EXISTS graph_stat (
449 key TEXT NOT NULL PRIMARY KEY,
450 value INTEGER NOT NULL
451 ) STRICT, WITHOUT ROWID;
452
453 CREATE TABLE IF NOT EXISTS hub_degree (
454 entity_id INTEGER PRIMARY KEY,
455 out_deg INTEGER NOT NULL,
456 in_deg INTEGER NOT NULL
457 ) STRICT;
458
459 CREATE TABLE IF NOT EXISTS partition_map (
460 table_name TEXT NOT NULL PRIMARY KEY,
461 role INTEGER NOT NULL,
462 type_id INTEGER,
463 row_count INTEGER NOT NULL DEFAULT 0
464 ) STRICT, WITHOUT ROWID;",
465 )
466 .map_err(sqlite_err)?;
467
468 conn.execute_batch(&format!("PRAGMA mmap_size = {};", tuning.mmap_size))
469 .map_err(sqlite_err)?;
470
471 let sync_pragma = match durability {
472 Durability::Sync => "PRAGMA synchronous = FULL",
473 Durability::Async => "PRAGMA synchronous = NORMAL",
474 };
475 conn.execute_batch(sync_pragma).map_err(sqlite_err)?;
476
477 conn.execute_batch("PRAGMA analysis_limit = 400;")
480 .map_err(sqlite_err)?;
481
482 let has_stat: bool = conn
483 .query_row("SELECT 1 FROM graph_stat LIMIT 1", [], |_| Ok(()))
484 .is_ok();
485 if !has_stat {
486 conn.execute_batch(
487 "INSERT INTO graph_stat(key, value) VALUES
488 ('entities', 0), ('relations', 0), ('observations', 0),
489 ('entity_seq', 0), ('obs_seq', 0);",
490 )
491 .map_err(sqlite_err)?;
492 }
493
494 conn.execute_batch("PRAGMA optimize;").map_err(sqlite_err)?;
495
496 let seq_entity = read_graph_stat(&conn, "entity_seq").unwrap_or(0);
497 let seq_obs = read_graph_stat(&conn, "obs_seq").unwrap_or(0);
498
499 let pool_size = read_pool_size.max(1);
502 let mut conns = Vec::with_capacity(pool_size);
503 for _ in 0..pool_size {
504 conns.push(Mutex::new(open_reader(path, &tuning)?));
505 }
506 let readers = ReaderPool {
507 conns,
508 next: AtomicUsize::new(0),
509 };
510
511 Ok(Self {
512 writer: Mutex::new(conn),
513 readers,
514 seq_entity: AtomicI64::new(seq_entity),
515 seq_obs: AtomicI64::new(seq_obs),
516 cache: Mutex::new(LruCache::new(lru_cache_size)),
517 })
518 }
519
520 fn next_entity_id(&self) -> i64 {
521 self.seq_entity.fetch_add(1, Ordering::Relaxed) + 1
522 }
523
524 fn next_obs_id(&self) -> i64 {
525 self.seq_obs.fetch_add(1, Ordering::Relaxed) + 1
526 }
527
528 fn meta_get(&self, name: &str) -> Option<EntityMeta> {
529 self.cache.lock().get(name).copied()
530 }
531
532 fn meta_set(&self, name: &str, m: EntityMeta) {
533 self.cache.lock().put(name.to_string(), m);
534 }
535
536 fn meta_remove(&self, name: &str) {
537 self.cache.lock().pop(name);
538 }
539
540 fn meta_update(&self, name: &str, f: impl FnOnce(&mut EntityMeta)) {
541 let mut cache = self.cache.lock();
542 if let Some(m) = cache.get_mut(name) {
543 f(m);
544 }
545 }
546
547 fn get_entity_id(&self, conn: &Connection, name: &str) -> Result<Option<(i64, i64, i64, i64)>> {
548 if let Some(m) = self.meta_get(name) {
549 return Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)));
550 }
551 let h = name_hash(name);
552 let mut stmt = conn
553 .prepare_cached(
554 "SELECT id, type_id, obs_count, out_deg, in_deg
555 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
556 )
557 .map_err(sqlite_err)?;
558 match stmt.query_row(params![h, name], |row| {
559 Ok(EntityMeta {
560 id: row.get(0)?,
561 type_id: row.get(1)?,
562 obs_count: row.get(2)?,
563 out_deg: row.get(3)?,
564 in_deg: row.get(4)?,
565 })
566 }) {
567 Ok(m) => {
568 self.meta_set(name, m);
569 Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)))
570 }
571 Err(e) if is_not_found(&e) => Ok(None),
572 Err(e) => Err(sqlite_err(e)),
573 }
574 }
575
576 fn sync_seqs(&self, conn: &Connection) -> Result<()> {
577 let seq_e = self.seq_entity.load(Ordering::Relaxed);
578 let seq_o = self.seq_obs.load(Ordering::Relaxed);
579 conn.execute(
580 "UPDATE graph_stat SET value = CASE key WHEN 'entity_seq' THEN ?1 WHEN 'obs_seq' THEN ?2 ELSE value END
581 WHERE key IN ('entity_seq', 'obs_seq')",
582 params![seq_e, seq_o],
583 )
584 .map_err(sqlite_err)?;
585 Ok(())
586 }
587
588 pub fn get_entity(&self, name: &str) -> Result<Option<Entity>> {
591 if name.is_empty() {
592 return Ok(None);
593 }
594
595 if let Some(m) = self.meta_get(name) {
596 let conn = self.readers.get();
597 let etype = name_of_type(&conn, m.type_id).unwrap_or_default();
598 let observations = load_observations_opt(&conn, m.id);
599 return Ok(Some(Entity {
600 name: name.to_string(),
601 entity_type: etype,
602 observations,
603 }));
604 }
605
606 let conn = self.readers.get();
607 let h = name_hash(name);
608 let mut stmt = conn
609 .prepare_cached(
610 "SELECT e.id, e.type_id, e.name, t.name,
611 e.obs_count, e.out_deg, e.in_deg
612 FROM entity e
613 JOIN type_dict t ON t.id = e.type_id
614 WHERE e.name_hash = ?1 AND e.name = ?2 AND e.flags = 0",
615 )
616 .map_err(sqlite_err)?;
617 match stmt.query_row(params![h, name], |row| {
618 let id: i64 = row.get(0)?;
619 let type_id: i64 = row.get(1)?;
620 let ename: String = row.get(2)?;
621 let etype: String = row.get(3)?;
622 let obs_count: i64 = row.get(4)?;
623 let out_deg: i64 = row.get(5)?;
624 let in_deg: i64 = row.get(6)?;
625 Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg))
626 }) {
627 Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg)) => {
628 let observations = load_observations_opt(&conn, id);
629 drop(stmt);
630 drop(conn);
631 self.meta_set(&ename, EntityMeta { id, type_id, obs_count, out_deg, in_deg });
632 Ok(Some(Entity {
633 name: ename,
634 entity_type: etype,
635 observations,
636 }))
637 }
638 Err(e) if is_not_found(&e) => Ok(None),
639 Err(e) => Err(sqlite_err(e)),
640 }
641 }
642
643 pub fn create_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
644 let conn = self.writer.lock();
645 let tx = TxGuard::begin(&conn)?;
646
647 let mut ins_ent = conn
648 .prepare_cached(
649 "INSERT INTO entity (id, name_hash, name, type_id, obs_count, out_deg, in_deg, created_us, updated_us, flags)
650 SELECT ?1, ?2, ?3, ?4, ?5, 0, 0, ?6, ?6, 0
651 WHERE NOT EXISTS (SELECT 1 FROM entity WHERE name_hash = ?2 AND name = ?3 AND flags = 0)",
652 )
653 .map_err(sqlite_err)?;
654
655 let mut ins_fts = conn
656 .prepare_cached("INSERT INTO name_fts (rowid, name) VALUES (?1, ?2)")
657 .map_err(sqlite_err)?;
658
659 let batch_ts = now_us();
660 let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
661 let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
662 let mut total_entities: i64 = 0;
663 let mut total_obs: i64 = 0;
664 let mut created = Vec::new();
665 let mut created_metas: Vec<(String, EntityMeta)> = Vec::new();
666 let mut obs_sql = String::new();
667
668 for entity in entities {
669 if entity.name.is_empty() {
670 continue;
671 }
672 let h = name_hash(&entity.name);
673 let id = self.next_entity_id();
674 let type_id = match type_cache.get(entity.entity_type.as_str()) {
675 Some(t) => *t,
676 None => {
677 let t = get_type_id(&conn, &entity.entity_type, 0)?;
678 type_cache.insert(entity.entity_type.clone(), t);
679 t
680 }
681 };
682 let obs_count = entity.observations.len() as i64;
683
684 let changed = ins_ent
685 .execute(params![id, h, entity.name, type_id, obs_count, batch_ts])
686 .map_err(sqlite_err)?;
687 if changed == 0 {
688 continue;
689 }
690
691 let n = entity.observations.len();
692 if n > 0 {
693 obs_sql.clear();
694
695 let mut oids = Vec::with_capacity(n);
696 let mut idxs = Vec::with_capacity(n);
697 for _ in 0..n {
698 oids.push(self.next_obs_id());
699 }
700 for i in 0..n as i64 {
701 idxs.push(i);
702 }
703
704 obs_sql.push_str("INSERT INTO observation (id,entity_id,idx,body,created_us) VALUES");
705 for i in 0..n {
706 if i > 0 { obs_sql.push(','); }
707 obs_sql.push_str("(?,?,?,?,?)");
708 }
709
710 let mut obs_params: Vec<&dyn ToSql> = Vec::with_capacity(n * 5);
711 for (i, obs) in entity.observations.iter().enumerate() {
712 obs_params.push(&oids[i]);
713 obs_params.push(&id);
714 obs_params.push(&idxs[i]);
715 obs_params.push(obs);
716 obs_params.push(&batch_ts);
717 }
718
719 conn.execute(&obs_sql, obs_params.as_slice())
720 .map_err(sqlite_err)?;
721 }
722
723 ins_fts
724 .execute(params![id, entity.name])
725 .map_err(sqlite_err)?;
726
727 *type_deltas.entry(type_id).or_insert(0) += 1;
728 total_entities += 1;
729 total_obs += obs_count;
730
731 created.push(entity.clone());
732 created_metas.push((entity.name.clone(), EntityMeta {
733 id,
734 type_id,
735 obs_count,
736 out_deg: 0,
737 in_deg: 0,
738 }));
739 }
740
741 if total_entities > 0 {
742 for (type_id, delta) in &type_deltas {
743 inc_type_count(&conn, *type_id, *delta)?;
744 }
745 inc_graph_stat(&conn, "entities", total_entities)?;
746 inc_graph_stat(&conn, "observations", total_obs)?;
747 self.sync_seqs(&conn)?;
748 }
749
750 tx.commit()?;
751
752 if !created_metas.is_empty() {
757 let mut cache = self.cache.lock();
758 for (name, meta) in &created_metas {
759 cache.put(name.clone(), *meta);
760 }
761 }
762
763 Ok(created)
764 }
765
766 pub fn delete_entities(&self, names: &[String]) -> Result<()> {
767 if names.is_empty() {
768 return Ok(());
769 }
770 let conn = self.writer.lock();
771
772 let mut resolved: Vec<(i64, i64, String)> = Vec::with_capacity(names.len());
774 let mut sel = conn
775 .prepare_cached(
776 "SELECT id, type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
777 )
778 .map_err(sqlite_err)?;
779 for name in names {
780 let h = name_hash(name);
781 let (id, type_id) = match sel.query_row(params![h, name], |row| {
782 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
783 }) {
784 Ok(v) => v,
785 Err(e) if is_not_found(&e) => continue,
786 Err(e) => return Err(sqlite_err(e)),
787 };
788 resolved.push((id, type_id, name.clone()));
789 }
790
791 if resolved.is_empty() {
792 return Ok(());
793 }
794
795 let ids: Vec<i64> = resolved.iter().map(|(id, _, _)| *id).collect();
796 let n = ids.len();
797
798 let obs_p: Vec<String> = (0..n).map(|i| format!("?{}", i + 1)).collect();
800 let obs_sql = format!(
801 "DELETE FROM observation WHERE entity_id IN ({})",
802 obs_p.join(",")
803 );
804 let obs_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
805 let obs_deleted = conn
806 .execute(&obs_sql, obs_refs.as_slice())
807 .map_err(sqlite_err)? as i64;
808
809 let rel_sql = format!(
811 "DELETE FROM relation WHERE from_id IN ({}) OR to_id IN ({})",
812 obs_p.join(","),
813 obs_p.join(",")
814 );
815 let rel_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
816 let rel_deleted = conn
817 .execute(&rel_sql, rel_refs.as_slice())
818 .map_err(sqlite_err)? as i64;
819
820 let fts_values: Vec<String> = (0..n)
822 .map(|_| "('delete', ?, '')".to_string())
823 .collect();
824 let fts_sql = format!(
825 "INSERT INTO name_fts(name_fts, rowid, name) VALUES {}",
826 fts_values.join(", ")
827 );
828 conn.execute(&fts_sql, rusqlite::params_from_iter(&ids))
829 .map_err(sqlite_err)?;
830
831 let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
833 for &(_, type_id, _) in &resolved {
834 *type_deltas.entry(type_id).or_insert(0) += 1;
835 }
836
837 if !type_deltas.is_empty() {
839 let m = type_deltas.len();
840 let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
841 let type_vals: Vec<i64> = type_deltas.values().map(|v| -*v).collect();
842 let mut case_parts: Vec<String> = Vec::with_capacity(m);
843 let mut id_parts: Vec<String> = Vec::with_capacity(m);
844 for i in 0..m {
845 case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
846 id_parts.push(format!("?{}", i + 1));
847 }
848 let sql = format!(
849 "UPDATE type_dict SET count = MAX(0, count + CASE id {} ELSE 0 END) WHERE id IN ({})",
850 case_parts.join(" "),
851 id_parts.join(","),
852 );
853 let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
854 for id in &type_keys {
855 params.push(Box::new(*id));
856 }
857 for delta in &type_vals {
858 params.push(Box::new(*delta));
859 }
860 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
861 conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
862 }
863
864 conn.execute(
866 &format!("DELETE FROM entity WHERE id IN ({})", obs_p.join(",")),
867 ids.iter().map(|id| id as &dyn ToSql).collect::<Vec<_>>().as_slice(),
868 )
869 .map_err(sqlite_err)?;
870
871 inc_graph_stat(&conn, "entities", -(n as i64))?;
873 inc_graph_stat(&conn, "observations", -obs_deleted)?;
874 inc_graph_stat(&conn, "relations", -rel_deleted)?;
875
876 for (_, _, name) in &resolved {
878 self.meta_remove(name);
879 }
880
881 Ok(())
882 }
883
884 pub fn create_relations(&self, relations: &[Relation]) -> Result<Vec<Relation>> {
885 let conn = self.writer.lock();
886 let tx = TxGuard::begin(&conn)?;
887
888 let mut ins = conn
889 .prepare_cached(
890 "INSERT INTO relation (from_id, to_id, type_id, created_us)
891 SELECT ?1, ?2, ?3, ?4
892 WHERE NOT EXISTS (SELECT 1 FROM relation WHERE from_id = ?1 AND to_id = ?2 AND type_id = ?3)",
893 )
894 .map_err(sqlite_err)?;
895
896 let ts = now_us();
897 let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
898 let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
899 let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
900 let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
901 let mut total_relations: i64 = 0;
902 let mut created = Vec::new();
903
904 for rel in relations {
905 let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
906 Some(v) => v,
907 None => continue,
908 };
909 let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
910 Some(v) => v,
911 None => continue,
912 };
913 let type_id = match type_cache.get(rel.relation_type.as_str()) {
914 Some(t) => *t,
915 None => {
916 let t = get_type_id(&conn, &rel.relation_type, 1)?;
917 type_cache.insert(rel.relation_type.clone(), t);
918 t
919 }
920 };
921
922 let changed = ins
923 .execute(params![from_id, to_id, type_id, ts])
924 .map_err(sqlite_err)?;
925 if changed == 0 {
926 continue;
927 }
928
929 *out_deltas.entry(from_id).or_insert(0) += 1;
930 *in_deltas.entry(to_id).or_insert(0) += 1;
931 *type_deltas.entry(type_id).or_insert(0) += 1;
932 total_relations += 1;
933
934 created.push(rel.clone());
935 }
936
937 if total_relations > 0 {
938 for (id, delta) in &out_deltas {
939 conn.execute(
940 "UPDATE entity SET out_deg = out_deg + ?1 WHERE id = ?2",
941 params![delta, id],
942 )
943 .map_err(sqlite_err)?;
944 }
945 for (id, delta) in in_deltas {
946 conn.execute(
947 "UPDATE entity SET in_deg = in_deg + ?1 WHERE id = ?2",
948 params![delta, id],
949 )
950 .map_err(sqlite_err)?;
951 }
952 for (type_id, delta) in &type_deltas {
953 inc_type_count(&conn, *type_id, *delta)?;
954 }
955 inc_graph_stat(&conn, "relations", total_relations)?;
956 }
957
958 tx.commit()?;
959
960 if !created.is_empty() {
963 let mut cache = self.cache.lock();
964 for rel in &created {
965 if let Some(m) = cache.get_mut(&rel.from) {
966 m.out_deg += 1;
967 }
968 if let Some(m) = cache.get_mut(&rel.to) {
969 m.in_deg += 1;
970 }
971 }
972 }
973
974 Ok(created)
975 }
976
977 pub fn delete_relations(&self, relations: &[Relation]) -> Result<()> {
978 if relations.is_empty() {
979 return Ok(());
980 }
981 let conn = self.writer.lock();
982
983 let mut triples: Vec<(i64, i64, i64)> = Vec::with_capacity(relations.len());
985 let mut names: Vec<(String, String)> = Vec::with_capacity(relations.len());
986 for rel in relations {
987 let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
988 Some(v) => v,
989 None => continue,
990 };
991 let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
992 Some(v) => v,
993 None => continue,
994 };
995 let type_id = match get_type_id(&conn, &rel.relation_type, 1) {
996 Ok(id) => id,
997 Err(_) => continue,
998 };
999 triples.push((from_id, to_id, type_id));
1000 names.push((rel.from.clone(), rel.to.clone()));
1001 }
1002
1003 if triples.is_empty() {
1004 return Ok(());
1005 }
1006
1007 let mut sql = String::from(
1009 "DELETE FROM relation WHERE (from_id, to_id, type_id) IN (",
1010 );
1011 for (i, _) in triples.iter().enumerate() {
1012 if i > 0 {
1013 sql.push_str(", ");
1014 }
1015 let base = (i * 3) + 1;
1016 sql.push_str(&format!("SELECT ?{b}, ?{bp1}, ?{bp2}", b = base, bp1 = base + 1, bp2 = base + 2));
1017 }
1018 sql.push(')');
1019
1020 let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(triples.len() * 3);
1021 for &(f, t, tp) in &triples {
1022 param_values.push(Box::new(f));
1023 param_values.push(Box::new(t));
1024 param_values.push(Box::new(tp));
1025 }
1026 let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1027 let total = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1028 if total == 0 {
1029 return Ok(());
1030 }
1031
1032 let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1034 let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1035 let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1036 for &(from_id, to_id, type_id) in &triples {
1037 *out_deltas.entry(from_id).or_insert(0) += 1;
1038 *in_deltas.entry(to_id).or_insert(0) += 1;
1039 *type_deltas.entry(type_id).or_insert(0) += 1;
1040 }
1041
1042 let out_keys: Vec<i64> = out_deltas.keys().cloned().collect();
1044 let out_vals: Vec<i64> = out_deltas.values().cloned().collect();
1045 if !out_keys.is_empty() {
1046 let m = out_keys.len();
1047 let mut case_parts: Vec<String> = Vec::with_capacity(m);
1048 let mut id_parts: Vec<String> = Vec::with_capacity(m);
1049 for i in 0..m {
1050 case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1051 id_parts.push(format!("?{}", i + 1));
1052 }
1053 let sql = format!(
1054 "UPDATE entity SET out_deg = MAX(0, out_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1055 case_parts.join(" "),
1056 id_parts.join(","),
1057 );
1058 let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1059 for id in &out_keys {
1060 params.push(Box::new(*id));
1061 }
1062 for delta in &out_vals {
1063 params.push(Box::new(*delta));
1064 }
1065 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1066 conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1067 }
1068
1069 let in_keys: Vec<i64> = in_deltas.keys().cloned().collect();
1071 let in_vals: Vec<i64> = in_deltas.values().cloned().collect();
1072 if !in_keys.is_empty() {
1073 let m = in_keys.len();
1074 let mut case_parts: Vec<String> = Vec::with_capacity(m);
1075 let mut id_parts: Vec<String> = Vec::with_capacity(m);
1076 for i in 0..m {
1077 case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1078 id_parts.push(format!("?{}", i + 1));
1079 }
1080 let sql = format!(
1081 "UPDATE entity SET in_deg = MAX(0, in_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1082 case_parts.join(" "),
1083 id_parts.join(","),
1084 );
1085 let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1086 for id in &in_keys {
1087 params.push(Box::new(*id));
1088 }
1089 for delta in &in_vals {
1090 params.push(Box::new(*delta));
1091 }
1092 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1093 conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1094 }
1095
1096 let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
1098 let type_vals: Vec<i64> = type_deltas.values().cloned().collect();
1099 if !type_keys.is_empty() {
1100 let m = type_keys.len();
1101 let mut case_parts: Vec<String> = Vec::with_capacity(m);
1102 let mut id_parts: Vec<String> = Vec::with_capacity(m);
1103 for i in 0..m {
1104 case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1105 id_parts.push(format!("?{}", i + 1));
1106 }
1107 let sql = format!(
1108 "UPDATE type_dict SET count = MAX(0, count - CASE id {} ELSE 0 END) WHERE id IN ({})",
1109 case_parts.join(" "),
1110 id_parts.join(","),
1111 );
1112 let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1113 for id in &type_keys {
1114 params.push(Box::new(*id));
1115 }
1116 for delta in &type_vals {
1117 params.push(Box::new(*delta));
1118 }
1119 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1120 conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1121 }
1122
1123 inc_graph_stat(&conn, "relations", -(total as i64))?;
1124
1125 for (from, to) in &names {
1128 self.meta_update(from, |m| m.out_deg = m.out_deg.saturating_sub(1));
1129 self.meta_update(to, |m| m.in_deg = m.in_deg.saturating_sub(1));
1130 }
1131
1132 Ok(())
1133 }
1134
1135 pub fn add_observations(&self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1136 let conn = self.writer.lock();
1137 let (id, _type_id, _, _) = match self.get_entity_id(&conn, entity_name)? {
1138 Some(v) => v,
1139 None => {
1140 return Err(MCSError::InvalidParams(format!(
1141 "Entity '{entity_name}' not found"
1142 )))
1143 }
1144 };
1145
1146 let mut max_idx: i64 = conn
1147 .query_row(
1148 "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1149 params![id],
1150 |row| row.get(0),
1151 )
1152 .map_err(sqlite_err)?;
1153
1154 let ts = now_us();
1155 let mut ins_obs = conn
1156 .prepare_cached(
1157 "INSERT INTO observation (id, entity_id, idx, body, created_us) VALUES (?1, ?2, ?3, ?4, ?5)",
1158 )
1159 .map_err(sqlite_err)?;
1160
1161 for content in contents {
1162 max_idx += 1;
1163 let oid = self.next_obs_id();
1164 ins_obs
1165 .execute(params![oid, id, max_idx, content, ts])
1166 .map_err(sqlite_err)?;
1167 }
1168 let added = contents.to_vec();
1169
1170 let count: i64 = contents.len() as i64;
1171 conn.execute(
1172 "UPDATE entity SET obs_count = obs_count + ?1, updated_us = ?2 WHERE id = ?3",
1173 params![count, ts, id],
1174 )
1175 .map_err(sqlite_err)?;
1176
1177 inc_graph_stat(&conn, "observations", count)?;
1178 self.sync_seqs(&conn)?;
1179
1180 self.meta_update(entity_name, |m| m.obs_count += count);
1181
1182 Ok(added)
1183 }
1184
1185 pub fn delete_observations(&self, entity_name: &str, observations: &[String]) -> Result<()> {
1186 if observations.is_empty() {
1187 return Ok(());
1188 }
1189 let conn = self.writer.lock();
1190 let (id, _, _, _) = match self.get_entity_id(&conn, entity_name)? {
1191 Some(v) => v,
1192 None => {
1193 return Err(MCSError::InvalidParams(format!(
1194 "Entity '{entity_name}' not found"
1195 )))
1196 }
1197 };
1198
1199 let placeholders: Vec<String> = (0..observations.len())
1200 .map(|i| format!("?{}", i + 2))
1201 .collect();
1202 let sql = format!(
1203 "DELETE FROM observation WHERE entity_id = ?1 AND body IN ({})",
1204 placeholders.join(",")
1205 );
1206
1207 let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(1 + observations.len());
1208 param_values.push(Box::new(id));
1209 for obs in observations {
1210 param_values.push(Box::new(obs.as_str()));
1211 }
1212 let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1213 let removed = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)? as i64;
1214
1215 if removed > 0 {
1216 conn.execute(
1217 "UPDATE entity SET obs_count = MAX(0, obs_count - ?1), updated_us = ?2 WHERE id = ?3",
1218 params![removed, now_us(), id],
1219 )
1220 .map_err(sqlite_err)?;
1221 inc_graph_stat(&conn, "observations", -removed)?;
1222
1223 self.meta_update(entity_name, |m| m.obs_count = m.obs_count.saturating_sub(removed));
1224 }
1225
1226 Ok(())
1227 }
1228
1229 pub fn upsert_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
1230 let mut results = Vec::new();
1231 for entity in entities {
1232 if let Some(existing) = self.get_entity(&entity.name)? {
1233 if existing.entity_type != entity.entity_type {
1235 let conn = self.writer.lock();
1236 let old_type_id = conn
1237 .query_row(
1238 "SELECT type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1239 params![name_hash(&entity.name), entity.name],
1240 |row| row.get::<_, i64>(0),
1241 )
1242 .map_err(sqlite_err)?;
1243 let new_type_id = get_type_id(&conn, &entity.entity_type, 0)?;
1244 inc_type_count(&conn, old_type_id, -1)?;
1245 inc_type_count(&conn, new_type_id, 1)?;
1246 conn.execute(
1247 "UPDATE entity SET type_id = ?1, updated_us = ?2 WHERE name_hash = ?3 AND name = ?4",
1248 params![new_type_id, now_us(), name_hash(&entity.name), entity.name],
1249 )
1250 .map_err(sqlite_err)?;
1251 self.meta_remove(&entity.name);
1253 }
1254 let existing_set: HashSet<&str> =
1256 existing.observations.iter().map(|s| s.as_str()).collect();
1257 let to_add: Vec<String> = entity
1258 .observations
1259 .iter()
1260 .filter(|o| !existing_set.contains(o.as_str()))
1261 .cloned()
1262 .collect();
1263 if !to_add.is_empty() {
1264 self.add_observations(&entity.name, &to_add)?;
1265 }
1266 let updated = self
1267 .get_entity(&entity.name)?
1268 .unwrap_or(entity.clone());
1269 results.push(updated);
1270 } else {
1271 let c = self.create_entities(std::slice::from_ref(entity))?;
1272 if let Some(e) = c.into_iter().next() {
1273 results.push(e);
1274 }
1275 }
1276 }
1277 Ok(results)
1278 }
1279
1280 pub fn merge_entities(&self, source: &str, target: &str) -> Result<Entity> {
1281 let conn = self.writer.lock();
1282 let (src_id, _, _, _) = match self.get_entity_id(&conn, source)? {
1283 Some(v) => v,
1284 None => {
1285 return Err(MCSError::InvalidParams(format!(
1286 "Source entity '{source}' not found"
1287 )))
1288 }
1289 };
1290 let (tgt_id, _, _, _) = match self.get_entity_id(&conn, target)? {
1291 Some(v) => v,
1292 None => {
1293 return Err(MCSError::InvalidParams(format!(
1294 "Target entity '{target}' not found"
1295 )))
1296 }
1297 };
1298
1299 if src_id == tgt_id {
1300 return self.get_entity(target)?.ok_or_else(|| {
1301 MCSError::InvalidParams("Target entity not found after merge".into())
1302 });
1303 }
1304
1305 let mut obs_count: i64 = 0;
1307 {
1308 let mut max_idx: i64 = conn
1309 .query_row(
1310 "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1311 params![tgt_id],
1312 |row| row.get(0),
1313 )
1314 .map_err(sqlite_err)?;
1315 let mut sel_obs = conn
1316 .prepare_cached(
1317 "SELECT id, body FROM observation WHERE entity_id = ?1 ORDER BY idx",
1318 )
1319 .map_err(sqlite_err)?;
1320 let mut upd_obs = conn
1321 .prepare_cached("UPDATE observation SET entity_id = ?1, idx = ?2 WHERE id = ?3")
1322 .map_err(sqlite_err)?;
1323 let rows: Vec<(i64, String)> = sel_obs
1324 .query_map(params![src_id], |row| {
1325 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1326 })
1327 .map_err(sqlite_err)?
1328 .filter_map(|r| r.ok())
1329 .collect();
1330 for (oid, _body) in &rows {
1331 max_idx += 1;
1332 upd_obs
1333 .execute(params![tgt_id, max_idx, oid])
1334 .map_err(sqlite_err)?;
1335 obs_count += 1;
1336 }
1337 }
1338
1339 conn.execute(
1341 "UPDATE OR IGNORE relation SET from_id = ?1 WHERE from_id = ?2",
1342 params![tgt_id, src_id],
1343 )
1344 .map_err(sqlite_err)?;
1345 conn.execute(
1346 "UPDATE OR IGNORE relation SET to_id = ?1 WHERE to_id = ?2",
1347 params![tgt_id, src_id],
1348 )
1349 .map_err(sqlite_err)?;
1350 conn.execute("DELETE FROM relation WHERE from_id = ?1", params![src_id])
1354 .map_err(sqlite_err)?;
1355 conn.execute("DELETE FROM relation WHERE to_id = ?1", params![src_id])
1356 .map_err(sqlite_err)?;
1357
1358 let out_add: i64 = conn
1360 .query_row(
1361 "SELECT COUNT(*) FROM relation WHERE from_id = ?1",
1362 params![tgt_id],
1363 |row| row.get(0),
1364 )
1365 .map_err(sqlite_err)?;
1366 let in_add: i64 = conn
1367 .query_row(
1368 "SELECT COUNT(*) FROM relation WHERE to_id = ?1",
1369 params![tgt_id],
1370 |row| row.get(0),
1371 )
1372 .map_err(sqlite_err)?;
1373 conn.execute(
1374 "UPDATE entity SET out_deg = ?1, in_deg = ?2, obs_count = obs_count + ?3, updated_us = ?4 WHERE id = ?5",
1375 params![out_add, in_add, obs_count, now_us(), tgt_id],
1376 )
1377 .map_err(sqlite_err)?;
1378
1379 conn.execute(
1381 "INSERT INTO name_fts(name_fts, rowid, name) VALUES ('delete', ?1, '')",
1382 params![src_id],
1383 )
1384 .map_err(sqlite_err)?;
1385 conn.execute("DELETE FROM entity WHERE id = ?1", params![src_id])
1386 .map_err(sqlite_err)?;
1387
1388 inc_graph_stat(&conn, "entities", -1)?;
1389 self.meta_remove(source);
1390
1391 if let Ok(meta) = conn.query_row(
1393 "SELECT id, type_id, obs_count, out_deg, in_deg FROM entity WHERE id = ?1",
1394 params![tgt_id],
1395 |row| {
1396 Ok(EntityMeta {
1397 id: row.get(0)?,
1398 type_id: row.get(1)?,
1399 obs_count: row.get(2)?,
1400 out_deg: row.get(3)?,
1401 in_deg: row.get(4)?,
1402 })
1403 },
1404 ) {
1405 self.meta_set(target, meta);
1406 }
1407
1408 let (name, etype): (String, String) = conn
1409 .query_row(
1410 "SELECT e.name, t.name FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1",
1411 params![tgt_id],
1412 |row| Ok((row.get(0)?, row.get(1)?)),
1413 )
1414 .map_err(sqlite_err)?;
1415 let observations = load_observations_opt(&conn, tgt_id);
1416
1417 Ok(Entity {
1418 name,
1419 entity_type: etype,
1420 observations,
1421 })
1422 }
1423
1424 pub fn search_nodes_filtered(
1425 &self,
1426 query: &str,
1427 filter_type: Option<&str>,
1428 offset: usize,
1429 limit: usize,
1430 ) -> Vec<Entity> {
1431 if query.is_empty() {
1432 return Vec::new();
1433 }
1434 let conn = self.readers.get();
1435
1436 let mut entity_ids: Vec<i64> = Vec::new();
1438 let mut seen: HashSet<i64> = HashSet::new();
1439
1440 if let Ok(mut stmt) = conn.prepare(
1441 "SELECT rowid FROM name_fts WHERE name_fts MATCH ?1 ORDER BY rank LIMIT ?2",
1442 ) {
1443 let limit_i64 = (limit + offset) as i64;
1444 if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1445 row.get::<_, i64>(0)
1446 }) {
1447 for row in rows.flatten() {
1448 if seen.insert(row) {
1449 entity_ids.push(row);
1450 }
1451 }
1452 }
1453 }
1454
1455 if let Ok(mut stmt) = conn.prepare(
1456 "SELECT entity_id FROM obs_fts JOIN observation ON obs_fts.rowid = observation.id
1457 WHERE obs_fts MATCH ?1
1458 GROUP BY entity_id
1459 LIMIT ?2",
1460 ) {
1461 let limit_i64 = (limit + offset) as i64;
1462 if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1463 row.get::<_, i64>(0)
1464 }) {
1465 for row in rows.flatten() {
1466 if seen.insert(row) {
1467 entity_ids.push(row);
1468 }
1469 }
1470 }
1471 }
1472
1473 let mut results = Vec::new();
1475 let mut count: usize = 0;
1476 for eid in entity_ids {
1477 if let Ok(entity) = entity_by_id(&conn, eid) {
1478 if let Some(ft) = filter_type
1479 && !ft.is_empty() && entity.entity_type != ft {
1480 continue;
1481 }
1482 if count < offset {
1483 count += 1;
1484 continue;
1485 }
1486 if results.len() >= limit {
1487 break;
1488 }
1489 results.push(entity);
1490 count += 1;
1491 }
1492 }
1493
1494 results
1495 }
1496
1497 pub fn read_graph_filtered(
1498 &self,
1499 filter_type: Option<&str>,
1500 offset: usize,
1501 limit: usize,
1502 ) -> Result<String> {
1503 let conn = self.readers.get();
1504
1505 let limit_sql: i64 = if limit == usize::MAX {
1506 -1
1507 } else {
1508 limit.min(i64::MAX as usize) as i64
1509 };
1510 let offset_sql: i64 = offset as i64;
1511
1512 let filter = filter_type.filter(|ft| !ft.is_empty());
1518 let ids: Vec<i64> = if let Some(ft) = filter {
1519 let mut stmt = conn
1520 .prepare_cached(
1521 "SELECT e.id FROM entity e
1522 WHERE e.type_id = (SELECT id FROM type_dict WHERE kind = 0 AND name = ?1)
1523 AND e.flags = 0
1524 ORDER BY e.id LIMIT ?2 OFFSET ?3",
1525 )
1526 .map_err(sqlite_err)?;
1527 stmt.query_map(params![ft, limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1528 .map_err(sqlite_err)?
1529 .filter_map(|r| r.ok())
1530 .collect()
1531 } else {
1532 let mut stmt = conn
1533 .prepare_cached(
1534 "SELECT e.id FROM entity e WHERE e.flags = 0
1535 ORDER BY e.id LIMIT ?1 OFFSET ?2",
1536 )
1537 .map_err(sqlite_err)?;
1538 stmt.query_map(params![limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1539 .map_err(sqlite_err)?
1540 .filter_map(|r| r.ok())
1541 .collect()
1542 };
1543
1544 if ids.is_empty() {
1545 return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
1546 }
1547
1548 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1549
1550 let entities_json: String = {
1551 let sql = format!(
1552 "SELECT COALESCE(json_group_array(json_object(
1553 'name', e.name,
1554 'entityType', t.name,
1555 'observations', COALESCE((
1556 SELECT json_group_array(o.body ORDER BY o.idx)
1557 FROM observation o WHERE o.entity_id = e.id
1558 ), json('[]'))
1559 ) ORDER BY e.id), json('[]'))
1560 FROM entity e
1561 JOIN type_dict t ON t.id = e.type_id
1562 WHERE e.id IN ({placeholders}) AND e.flags = 0"
1563 );
1564 conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
1565 row.get::<_, String>(0)
1566 })
1567 .map_err(sqlite_err)?
1568 };
1569
1570 let relations_json: String = {
1571 let sql = format!(
1572 "SELECT COALESCE(json_group_array(json_object(
1573 'from', e1.name,
1574 'to', e2.name,
1575 'relationType', t.name
1576 )), json('[]'))
1577 FROM relation r
1578 JOIN entity e1 ON e1.id = r.from_id
1579 JOIN entity e2 ON e2.id = r.to_id
1580 JOIN type_dict t ON t.id = r.type_id
1581 WHERE r.from_id IN ({placeholders}) AND r.to_id IN ({placeholders})
1582 AND e1.flags = 0 AND e2.flags = 0"
1583 );
1584 let all_params: Vec<&dyn ToSql> = ids
1585 .iter()
1586 .map(|id| id as &dyn ToSql)
1587 .chain(ids.iter().map(|id| id as &dyn ToSql))
1588 .collect();
1589 conn.query_row(&sql, all_params.as_slice(), |row| row.get::<_, String>(0))
1590 .map_err(sqlite_err)?
1591 };
1592
1593 let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1594 out.push_str("{\"entities\":");
1595 out.push_str(&entities_json);
1596 out.push_str(",\"relations\":");
1597 out.push_str(&relations_json);
1598 out.push('}');
1599 Ok(out)
1600 }
1601
1602 pub fn open_nodes(&self, names: &[String]) -> String {
1603 let conn = self.readers.get();
1604 let mut entity_ids: Vec<i64> = Vec::new();
1605
1606 for name in names {
1607 let h = name_hash(name);
1608 if let Ok(Some(id)) = conn
1609 .query_row(
1610 "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1611 params![h, name],
1612 |row| row.get::<_, i64>(0),
1613 )
1614 .map(Some)
1615 .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
1616 {
1617 entity_ids.push(id);
1618 }
1619 }
1620
1621 if entity_ids.is_empty() {
1622 return r#"{"entities":[],"relations":[]}"#.to_string();
1623 }
1624
1625 let placeholders: Vec<String> = entity_ids.iter().map(|_| "?".to_string()).collect();
1626 let ids_str = placeholders.join(",");
1627
1628 let entities_json: String = {
1629 let sql = format!(
1630 "SELECT COALESCE(json_group_array(json_object(
1631 'name', e.name,
1632 'entityType', t.name,
1633 'observations', COALESCE((
1634 SELECT json_group_array(o.body ORDER BY o.idx)
1635 FROM observation o WHERE o.entity_id = e.id
1636 ), json('[]'))
1637 ) ORDER BY e.id), json('[]'))
1638 FROM entity e
1639 JOIN type_dict t ON t.id = e.type_id
1640 WHERE e.id IN ({ids_str}) AND e.flags = 0"
1641 );
1642 conn.query_row(&sql, rusqlite::params_from_iter(&entity_ids), |row| {
1643 row.get::<_, String>(0)
1644 })
1645 .unwrap_or_else(|_| "[]".to_string())
1646 };
1647
1648 let relations_json: String = {
1649 let sql = format!(
1650 "SELECT COALESCE(json_group_array(json_object(
1651 'from', e1.name,
1652 'to', e2.name,
1653 'relationType', t.name
1654 )), json('[]'))
1655 FROM relation r
1656 JOIN entity e1 ON e1.id = r.from_id
1657 JOIN entity e2 ON e2.id = r.to_id
1658 JOIN type_dict t ON t.id = r.type_id
1659 WHERE (r.from_id IN ({ids_str}) OR r.to_id IN ({ids_str}))
1660 AND e1.flags = 0 AND e2.flags = 0"
1661 );
1662 let all_params: Vec<&dyn rusqlite::types::ToSql> = entity_ids
1663 .iter()
1664 .map(|id| id as &dyn rusqlite::types::ToSql)
1665 .chain(entity_ids.iter().map(|id| id as &dyn rusqlite::types::ToSql))
1666 .collect();
1667 let mut stmt = conn.prepare(&sql).unwrap();
1668 stmt.query_row(all_params.as_slice(), |row| row.get::<_, String>(0))
1669 .unwrap_or_else(|_| "[]".to_string())
1670 };
1671
1672 let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1673 out.push_str("{\"entities\":");
1674 out.push_str(&entities_json);
1675 out.push_str(",\"relations\":");
1676 out.push_str(&relations_json);
1677 out.push('}');
1678 out
1679 }
1680
1681 pub fn entities_exist(&self, names: &[String]) -> Result<Vec<bool>> {
1682 let conn = self.readers.get();
1683 let mut results = Vec::with_capacity(names.len());
1684 for name in names {
1685 let h = name_hash(name);
1686 let exists: bool = conn
1687 .query_row(
1688 "SELECT 1 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1689 params![h, name],
1690 |_| Ok(()),
1691 )
1692 .is_ok();
1693 results.push(exists);
1694 }
1695 Ok(results)
1696 }
1697
1698 pub fn degree(&self, name: &str, direction: Direction) -> Result<usize> {
1699 let conn = self.readers.get();
1700 let (_, _, out_d, in_d) = match self.get_entity_id(&conn, name)? {
1701 Some(v) => v,
1702 None => {
1703 return Err(MCSError::InvalidParams(format!(
1704 "Entity '{name}' not found"
1705 )))
1706 }
1707 };
1708 Ok(match direction {
1709 Direction::Outgoing => out_d as usize,
1710 Direction::Incoming => in_d as usize,
1711 Direction::Both => (out_d + in_d) as usize,
1712 })
1713 }
1714
1715 pub fn get_entity_count(&self) -> Result<usize> {
1716 let conn = self.readers.get();
1717 read_graph_stat(&conn, "entities")
1718 .map(|v| v as usize)
1719 .map_err(|_| MCSError::MemoryError("Failed to read entity count".into()))
1720 }
1721
1722 pub fn get_relation_count(&self) -> Result<usize> {
1723 let conn = self.readers.get();
1724 read_graph_stat(&conn, "relations")
1725 .map(|v| v as usize)
1726 .map_err(|_| MCSError::MemoryError("Failed to read relation count".into()))
1727 }
1728
1729 pub fn search_relations(
1730 &self,
1731 from: Option<&str>,
1732 to: Option<&str>,
1733 rtype: Option<&str>,
1734 ) -> Vec<Relation> {
1735 let conn = self.readers.get();
1736 let mut results = Vec::new();
1737
1738 let from_id = from
1744 .filter(|f| !f.is_empty())
1745 .map(|f| entity_name_lookup(&conn, f).ok().flatten().unwrap_or(-1));
1746 let to_id = to
1747 .filter(|t| !t.is_empty())
1748 .map(|t| entity_name_lookup(&conn, t).ok().flatten().unwrap_or(-1));
1749 let type_id = rtype
1750 .filter(|rt| !rt.is_empty())
1751 .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));
1752
1753 match (from_id, to_id, type_id) {
1754 (Some(fid), Some(tid), Some(tpid)) => {
1755 if let Ok(mut stmt) = conn.prepare_cached(
1756 "SELECT e1.name, e2.name, t.name
1757 FROM relation r
1758 JOIN entity e1 ON e1.id = r.from_id
1759 JOIN entity e2 ON e2.id = r.to_id
1760 JOIN type_dict t ON t.id = r.type_id
1761 WHERE r.from_id = ?1 AND r.to_id = ?2 AND r.type_id = ?3
1762 AND e1.flags = 0 AND e2.flags = 0
1763 ORDER BY r.from_id, r.to_id"
1764 )
1765 && let Ok(rows) = stmt.query_map(params![fid, tid, tpid], |row| {
1766 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1767 }) {
1768 for row in rows.flatten() { results.push(row); }
1769 }
1770 }
1771 (Some(fid), Some(tid), None) => {
1772 if let Ok(mut stmt) = conn.prepare_cached(
1773 "SELECT e1.name, e2.name, t.name
1774 FROM relation r
1775 JOIN entity e1 ON e1.id = r.from_id
1776 JOIN entity e2 ON e2.id = r.to_id
1777 JOIN type_dict t ON t.id = r.type_id
1778 WHERE r.from_id = ?1 AND r.to_id = ?2
1779 AND e1.flags = 0 AND e2.flags = 0
1780 ORDER BY r.from_id, r.to_id"
1781 )
1782 && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
1783 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1784 }) {
1785 for row in rows.flatten() { results.push(row); }
1786 }
1787 }
1788 (Some(fid), None, Some(tpid)) => {
1789 if let Ok(mut stmt) = conn.prepare_cached(
1790 "SELECT e1.name, e2.name, t.name
1791 FROM relation r
1792 JOIN entity e1 ON e1.id = r.from_id
1793 JOIN entity e2 ON e2.id = r.to_id
1794 JOIN type_dict t ON t.id = r.type_id
1795 WHERE r.from_id = ?1 AND r.type_id = ?2
1796 AND e1.flags = 0 AND e2.flags = 0
1797 ORDER BY r.from_id, r.to_id"
1798 )
1799 && let Ok(rows) = stmt.query_map(params![fid, tpid], |row| {
1800 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1801 }) {
1802 for row in rows.flatten() { results.push(row); }
1803 }
1804 }
1805 (None, Some(tid), Some(tpid)) => {
1806 if let Ok(mut stmt) = conn.prepare_cached(
1807 "SELECT e1.name, e2.name, t.name
1808 FROM relation r
1809 JOIN entity e1 ON e1.id = r.from_id
1810 JOIN entity e2 ON e2.id = r.to_id
1811 JOIN type_dict t ON t.id = r.type_id
1812 WHERE r.to_id = ?1 AND r.type_id = ?2
1813 AND e1.flags = 0 AND e2.flags = 0
1814 ORDER BY r.from_id, r.to_id"
1815 )
1816 && let Ok(rows) = stmt.query_map(params![tid, tpid], |row| {
1817 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1818 }) {
1819 for row in rows.flatten() { results.push(row); }
1820 }
1821 }
1822 (Some(fid), None, None) => {
1823 if let Ok(mut stmt) = conn.prepare_cached(
1824 "SELECT e1.name, e2.name, t.name
1825 FROM relation r
1826 JOIN entity e1 ON e1.id = r.from_id
1827 JOIN entity e2 ON e2.id = r.to_id
1828 JOIN type_dict t ON t.id = r.type_id
1829 WHERE r.from_id = ?1
1830 AND e1.flags = 0 AND e2.flags = 0
1831 ORDER BY r.from_id, r.to_id"
1832 )
1833 && let Ok(rows) = stmt.query_map(params![fid], |row| {
1834 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1835 }) {
1836 for row in rows.flatten() { results.push(row); }
1837 }
1838 }
1839 (None, Some(tid), None) => {
1840 if let Ok(mut stmt) = conn.prepare_cached(
1841 "SELECT e1.name, e2.name, t.name
1842 FROM relation r
1843 JOIN entity e1 ON e1.id = r.from_id
1844 JOIN entity e2 ON e2.id = r.to_id
1845 JOIN type_dict t ON t.id = r.type_id
1846 WHERE r.to_id = ?1
1847 AND e1.flags = 0 AND e2.flags = 0
1848 ORDER BY r.from_id, r.to_id"
1849 )
1850 && let Ok(rows) = stmt.query_map(params![tid], |row| {
1851 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1852 }) {
1853 for row in rows.flatten() { results.push(row); }
1854 }
1855 }
1856 (None, None, Some(tpid)) => {
1857 if let Ok(mut stmt) = conn.prepare_cached(
1858 "SELECT e1.name, e2.name, t.name
1859 FROM relation r
1860 JOIN entity e1 ON e1.id = r.from_id
1861 JOIN entity e2 ON e2.id = r.to_id
1862 JOIN type_dict t ON t.id = r.type_id
1863 WHERE r.type_id = ?1
1864 AND e1.flags = 0 AND e2.flags = 0
1865 ORDER BY r.from_id, r.to_id"
1866 )
1867 && let Ok(rows) = stmt.query_map(params![tpid], |row| {
1868 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1869 }) {
1870 for row in rows.flatten() { results.push(row); }
1871 }
1872 }
1873 (None, None, None) => {
1874 if let Ok(mut stmt) = conn.prepare_cached(
1875 "SELECT e1.name, e2.name, t.name
1876 FROM relation r
1877 JOIN entity e1 ON e1.id = r.from_id
1878 JOIN entity e2 ON e2.id = r.to_id
1879 JOIN type_dict t ON t.id = r.type_id
1880 WHERE e1.flags = 0 AND e2.flags = 0
1881 ORDER BY r.from_id, r.to_id"
1882 )
1883 && let Ok(rows) = stmt.query_map([], |row| {
1884 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1885 }) {
1886 for row in rows.flatten() { results.push(row); }
1887 }
1888 }
1889 }
1890 results
1891 }
1892
1893 pub fn find_path(&self, from: &str, to: &str) -> Result<Option<Vec<String>>> {
1894 let conn = self.readers.get();
1895 let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
1896 Some(v) => v,
1897 None => {
1898 return Err(MCSError::InvalidParams(format!(
1899 "Source entity '{from}' not found"
1900 )))
1901 }
1902 };
1903 let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
1904 Some(v) => v,
1905 None => {
1906 return Err(MCSError::InvalidParams(format!(
1907 "Target entity '{to}' not found"
1908 )))
1909 }
1910 };
1911
1912 if from_id == to_id {
1913 return Ok(Some(vec![from.to_string()]));
1914 }
1915
1916 let mut visited = HashSet::new();
1918 let mut parent: FxHashMap<i64, i64> = FxHashMap::default();
1919 let mut queue = VecDeque::new();
1920 visited.insert(from_id);
1921 queue.push_back(from_id);
1922
1923 while let Some(cur) = queue.pop_front() {
1924 if cur == to_id {
1925 break;
1926 }
1927 if let Ok(mut stmt) =
1929 conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
1930 && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1931 for row in rows.flatten() {
1932 if visited.insert(row) {
1933 parent.insert(row, cur);
1934 queue.push_back(row);
1935 }
1936 }
1937 }
1938 if let Ok(mut stmt) =
1940 conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
1941 && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1942 for row in rows.flatten() {
1943 if visited.insert(row) {
1944 parent.insert(row, cur);
1945 queue.push_back(row);
1946 }
1947 }
1948 }
1949 }
1950
1951 if !parent.contains_key(&to_id) && to_id != from_id {
1952 return Ok(None);
1953 }
1954
1955 let mut path = Vec::new();
1956 let mut cur = to_id;
1957 path.push(cur);
1958 while let Some(&p) = parent.get(&cur) {
1959 path.push(p);
1960 cur = p;
1961 if cur == from_id {
1962 break;
1963 }
1964 }
1965 path.reverse();
1966
1967 let placeholders: Vec<String> = path.iter().map(|_| "?".to_string()).collect();
1968 let sql = format!(
1969 "SELECT id, name FROM entity WHERE id IN ({})",
1970 placeholders.join(",")
1971 );
1972 let name_map: FxHashMap<i64, String> = if let Ok(mut stmt) = conn.prepare(&sql)
1973 && let Ok(rows) = stmt.query_map(
1974 rusqlite::params_from_iter(&path),
1975 |row| Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)),
1976 ) {
1977 rows.flatten().collect()
1978 } else {
1979 FxHashMap::default()
1980 };
1981
1982 let name_path: Vec<String> = path.iter().filter_map(|id| name_map.get(id).cloned()).collect();
1983
1984 Ok(Some(name_path))
1985 }
1986
1987 pub fn compact(&self) -> Result<()> {
1988 let conn = self.writer.lock();
1989 conn.execute_batch("PRAGMA incremental_vacuum;").map_err(sqlite_err)?;
1990 Ok(())
1991 }
1992
1993 pub fn neighbors(
1994 &self,
1995 name: &str,
1996 direction: Direction,
1997 rtype: Option<&str>,
1998 depth: u32,
1999 ) -> Result<String> {
2000 self._traverse(name, direction, rtype, depth, true)
2001 }
2002
2003 pub fn extract_subgraph(
2004 &self,
2005 names: &[String],
2006 depth: u32,
2007 ) -> Result<String> {
2008 if names.is_empty() {
2009 return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
2010 }
2011
2012 let conn = self.readers.get();
2013 let mut all_entity_ids: HashSet<i64> = HashSet::new();
2014 let mut frontier: HashSet<i64> = HashSet::new();
2015 let mut all_rel_pairs: HashSet<(i64, i64, i64)> = HashSet::new();
2016
2017 for name in names {
2019 let h = name_hash(name);
2020 if let Ok(Some(id)) = conn
2021 .query_row(
2022 "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
2023 params![h, name],
2024 |row| row.get::<_, i64>(0),
2025 )
2026 .map(Some)
2027 .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
2028 {
2029 all_entity_ids.insert(id);
2030 frontier.insert(id);
2031 }
2032 }
2033
2034 let mut current_depth = 0u32;
2035 while current_depth < depth && !frontier.is_empty() {
2036 let mut next_frontier: HashSet<i64> = HashSet::new();
2037
2038 const CHUNK: usize = 500;
2041 let frontier_ids: Vec<i64> = frontier.iter().copied().collect();
2042 for chunk in frontier_ids.chunks(CHUNK) {
2043 let placeholders: Vec<String> = chunk.iter().map(|_| "?".to_string()).collect();
2044 let in_clause = placeholders.join(",");
2045
2046 if let Ok(mut stmt) = conn.prepare(
2048 &format!(
2049 "SELECT from_id, to_id, type_id FROM relation WHERE from_id IN ({in_clause})",
2050 )
2051 )
2052 && let Ok(rows) = stmt.query_map(
2053 rusqlite::params_from_iter(chunk),
2054 |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?)),
2055 )
2056 {
2057 for row in rows.flatten() {
2058 let (from_id, to_id, type_id) = row;
2059 all_rel_pairs.insert((from_id, to_id, type_id));
2060 if all_entity_ids.insert(to_id) {
2061 next_frontier.insert(to_id);
2062 }
2063 }
2064 }
2065
2066 if let Ok(mut stmt) = conn.prepare(
2068 &format!(
2069 "SELECT from_id, to_id, type_id FROM relation WHERE to_id IN ({in_clause})",
2070 )
2071 )
2072 && let Ok(rows) = stmt.query_map(
2073 rusqlite::params_from_iter(chunk),
2074 |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?)),
2075 )
2076 {
2077 for row in rows.flatten() {
2078 let (from_id, to_id, type_id) = row;
2079 all_rel_pairs.insert((from_id, to_id, type_id));
2080 if all_entity_ids.insert(from_id) {
2081 next_frontier.insert(from_id);
2082 }
2083 }
2084 }
2085 }
2086 frontier = next_frontier;
2087 current_depth += 1;
2088 }
2089
2090 let entities_json: String = {
2091 if all_entity_ids.is_empty() {
2092 "[]".to_string()
2093 } else {
2094 let ids: Vec<i64> = all_entity_ids.iter().copied().collect();
2095 let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
2096 let sql = format!(
2097 "SELECT COALESCE(json_group_array(json_object(
2098 'name', e.name,
2099 'entityType', t.name,
2100 'observations', COALESCE((
2101 SELECT json_group_array(o.body ORDER BY o.idx)
2102 FROM observation o WHERE o.entity_id = e.id
2103 ), json('[]'))
2104 ) ORDER BY e.id), json('[]'))
2105 FROM entity e
2106 JOIN type_dict t ON t.id = e.type_id
2107 WHERE e.id IN ({}) AND e.flags = 0",
2108 placeholders.join(",")
2109 );
2110 conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
2111 row.get::<_, String>(0)
2112 })
2113 .unwrap_or_else(|_| "[]".to_string())
2114 }
2115 };
2116
2117 let relations_json: String = {
2118 if all_rel_pairs.is_empty() {
2119 "[]".to_string()
2120 } else {
2121 let vals: Vec<String> = all_rel_pairs.iter().map(|_| "(?, ?, ?)".to_string()).collect();
2122 let sql = format!(
2123 "WITH r(from_id, to_id, type_id) AS (VALUES {})
2124 SELECT COALESCE(json_group_array(json_object(
2125 'from', e1.name,
2126 'to', e2.name,
2127 'relationType', t.name
2128 )), json('[]'))
2129 FROM r
2130 JOIN entity e1 ON e1.id = r.from_id
2131 JOIN entity e2 ON e2.id = r.to_id
2132 JOIN type_dict t ON t.id = r.type_id
2133 WHERE e1.flags = 0 AND e2.flags = 0",
2134 vals.join(", ")
2135 );
2136 let params: Vec<&dyn ToSql> = all_rel_pairs.iter()
2137 .flat_map(|(f, t, tp)| {
2138 vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
2139 })
2140 .collect();
2141 let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
2142 stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
2143 .unwrap_or_else(|_| "[]".to_string())
2144 }
2145 };
2146
2147 let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
2148 out.push_str("{\"entities\":");
2149 out.push_str(&entities_json);
2150 out.push_str(",\"relations\":");
2151 out.push_str(&relations_json);
2152 out.push('}');
2153 Ok(out)
2154 }
2155
2156 pub fn describe_entity(&self, name: &str) -> Result<Entity> {
2157 self.get_entity(name)?
2158 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))
2159 }
2160
2161 pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
2162 let conn = self.readers.get();
2163 select_all_types(&conn, 0).unwrap_or_default()
2164 }
2165
2166 pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
2167 let conn = self.readers.get();
2168 select_all_types(&conn, 1).unwrap_or_default()
2169 }
2170
2171 pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
2172 names
2173 .iter()
2174 .map(|n| self.get_entity(n).unwrap_or(None))
2175 .collect()
2176 }
2177
2178 pub fn find_all_paths(
2179 &self,
2180 from: &str,
2181 to: &str,
2182 max_depth: usize,
2183 max_paths: usize,
2184 ) -> Result<Vec<Vec<String>>> {
2185 let conn = self.readers.get();
2186 let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
2187 Some(v) => v,
2188 None => {
2189 return Err(MCSError::InvalidParams(format!(
2190 "Source entity '{from}' not found"
2191 )))
2192 }
2193 };
2194 let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
2195 Some(v) => v,
2196 None => {
2197 return Err(MCSError::InvalidParams(format!(
2198 "Target entity '{to}' not found"
2199 )))
2200 }
2201 };
2202
2203 if from_id == to_id {
2204 return Ok(vec![vec![from.to_string()]]);
2205 }
2206
2207 let mut all_paths: Vec<Vec<i64>> = Vec::new();
2209 let mut queue: VecDeque<(i64, Vec<i64>)> = VecDeque::new();
2210 queue.push_back((from_id, vec![from_id]));
2211
2212 const MAX_QUEUE_SIZE: usize = 10_000_000;
2213
2214 while let Some((cur, path)) = queue.pop_front() {
2215 if all_paths.len() >= max_paths {
2216 break;
2217 }
2218 if path.len() > max_depth {
2219 continue;
2220 }
2221
2222 if let Ok(mut stmt) =
2224 conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
2225 && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2226 for next_id in rows.flatten() {
2227 if next_id == to_id {
2228 let mut full_path = path.clone();
2229 full_path.push(next_id);
2230 all_paths.push(full_path);
2231 if all_paths.len() >= max_paths {
2232 break;
2233 }
2234 } else if !path.contains(&next_id) && path.len() < max_depth {
2235 if queue.len() >= MAX_QUEUE_SIZE {
2236 return Err(MCSError::InvalidParams(
2237 "Path exploration queue exceeded limit (too many paths on highly connected graph)".to_string()
2238 ));
2239 }
2240 let mut new_path = path.clone();
2241 new_path.push(next_id);
2242 queue.push_back((next_id, new_path));
2243 }
2244 }
2245 }
2246
2247 if let Ok(mut stmt) =
2249 conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
2250 && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2251 for next_id in rows.flatten() {
2252 if next_id == to_id {
2253 let mut full_path = path.clone();
2254 full_path.push(next_id);
2255 all_paths.push(full_path);
2256 if all_paths.len() >= max_paths {
2257 break;
2258 }
2259 } else if !path.contains(&next_id) && path.len() < max_depth {
2260 if queue.len() >= MAX_QUEUE_SIZE {
2261 return Err(MCSError::InvalidParams(
2262 "Path exploration queue exceeded limit (too many paths on highly connected graph)".to_string()
2263 ));
2264 }
2265 let mut new_path = path.clone();
2266 new_path.push(next_id);
2267 queue.push_back((next_id, new_path));
2268 }
2269 }
2270 }
2271 }
2272
2273 let all_ids: HashSet<i64> = all_paths.iter().flat_map(|p| p.iter()).copied().collect();
2275 let id_list: Vec<i64> = all_ids.into_iter().collect();
2276 let name_map: FxHashMap<i64, String> = if id_list.is_empty() {
2277 FxHashMap::default()
2278 } else {
2279 let placeholders: Vec<String> = id_list.iter().map(|_| "?".to_string()).collect();
2280 let sql = format!(
2281 "SELECT id, name FROM entity WHERE id IN ({})",
2282 placeholders.join(",")
2283 );
2284 if let Ok(mut stmt) = conn.prepare(&sql)
2285 && let Ok(rows) = stmt.query_map(
2286 rusqlite::params_from_iter(&id_list),
2287 |row| Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)),
2288 ) {
2289 rows.flatten().collect()
2290 } else {
2291 FxHashMap::default()
2292 }
2293 };
2294
2295 let mut named_paths: Vec<Vec<String>> = Vec::with_capacity(all_paths.len());
2296 for path_ids in all_paths {
2297 let named: Vec<String> = path_ids.iter().filter_map(|id| name_map.get(id).cloned()).collect();
2298 named_paths.push(named);
2299 }
2300
2301 Ok(named_paths)
2302 }
2303
2304 pub fn export(&self, _format: &str, max_rows: i64) -> Result<String> {
2309 let conn = self.readers.get();
2310 conn.query_row(
2313 "SELECT json_object(
2314 'entities', COALESCE((
2315 SELECT json_group_array(json_object(
2316 'name', e.name,
2317 'entityType', t.name,
2318 'observations', COALESCE((
2319 SELECT json_group_array(o.body ORDER BY o.idx)
2320 FROM observation o WHERE o.entity_id = e.id
2321 ), json('[]'))
2322 ) ORDER BY e.id)
2323 FROM (
2324 SELECT id, name, type_id FROM entity
2325 WHERE flags = 0 ORDER BY id LIMIT ?1
2326 ) e
2327 JOIN type_dict t ON t.id = e.type_id
2328 ), json('[]')),
2329 'relations', COALESCE((
2330 SELECT json_group_array(json_object(
2331 'from', e1.name,
2332 'to', e2.name,
2333 'relationType', t.name
2334 ))
2335 FROM (
2336 SELECT from_id, to_id, type_id FROM relation LIMIT ?1
2337 ) r
2338 JOIN entity e1 ON e1.id = r.from_id
2339 JOIN entity e2 ON e2.id = r.to_id
2340 JOIN type_dict t ON t.id = r.type_id
2341 WHERE e1.flags = 0 AND e2.flags = 0
2342 ), json('[]'))
2343 )",
2344 params![max_rows],
2345 |row| row.get::<_, String>(0),
2346 )
2347 .map_err(sqlite_err)
2348 }
2349
2350 pub fn wipe(&self) -> Result<()> {
2351 let conn = self.writer.lock();
2352 conn.execute_batch(
2357 "DELETE FROM observation;
2358 DELETE FROM relation;
2359 DELETE FROM entity;
2360 DELETE FROM type_dict;
2361 INSERT INTO name_fts(name_fts) VALUES('delete-all');
2362 INSERT INTO obs_fts(obs_fts) VALUES('delete-all');
2363 UPDATE graph_stat SET value = 0 WHERE key IN ('entities', 'relations', 'observations');
2364 UPDATE graph_stat SET value = 0 WHERE key IN ('entity_seq', 'obs_seq');",
2365 )
2366 .map_err(sqlite_err)?;
2367 self.seq_entity.store(0, Ordering::Relaxed);
2368 self.seq_obs.store(0, Ordering::Relaxed);
2369 self.cache.lock().clear();
2370 Ok(())
2371 }
2372
2373 pub fn run_maintenance(&self) -> Result<()> {
2376 let conn = self.writer.lock();
2377
2378 conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
2379 .map_err(sqlite_err)?;
2380
2381 conn.execute_batch("PRAGMA optimize(0x10000);")
2382 .map_err(sqlite_err)?;
2383
2384 conn.execute_batch(
2385 "INSERT INTO name_fts(name_fts) VALUES('optimize');
2386 INSERT INTO obs_fts(obs_fts) VALUES('optimize');",
2387 )
2388 .map_err(sqlite_err)?;
2389
2390 Ok(())
2391 }
2392
2393 pub fn checkpoint_passive(&self) -> Result<()> {
2397 let conn = self.writer.lock();
2398 conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE);")
2399 .map_err(sqlite_err)?;
2400 Ok(())
2401 }
2402
2403 fn _traverse(
2404 &self,
2405 name: &str,
2406 direction: Direction,
2407 rtype: Option<&str>,
2408 depth: u32,
2409 _include_relations: bool,
2411 ) -> Result<String> {
2412 let conn = self.readers.get();
2413 let (start_id, _, _, _) = match self.get_entity_id(&conn, name)? {
2414 Some(v) => v,
2415 None => {
2416 return Err(MCSError::InvalidParams(format!(
2417 "Entity '{name}' not found"
2418 )))
2419 }
2420 };
2421
2422 let mut all_ids: HashSet<i64> = HashSet::new();
2423 let mut all_rels: HashSet<(i64, i64, i64)> = HashSet::new();
2424 let mut frontier: HashSet<i64> = HashSet::new();
2425 all_ids.insert(start_id);
2426 frontier.insert(start_id);
2427
2428 let type_filter: Option<i64> = rtype
2434 .filter(|rt| !rt.is_empty())
2435 .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));
2436
2437 let mut q_out_t = conn.prepare_cached(
2439 "SELECT to_id, type_id FROM relation WHERE from_id = ?1 AND type_id = ?2");
2440 let mut q_out = conn.prepare_cached(
2441 "SELECT to_id, type_id FROM relation WHERE from_id = ?1");
2442 let mut q_in_t = conn.prepare_cached(
2443 "SELECT from_id, type_id FROM relation WHERE to_id = ?1 AND type_id = ?2");
2444 let mut q_in = conn.prepare_cached(
2445 "SELECT from_id, type_id FROM relation WHERE to_id = ?1");
2446
2447 let mut cur_depth = 0u32;
2448 while cur_depth < depth && !frontier.is_empty() {
2449 let mut next_frontier: HashSet<i64> = HashSet::new();
2450
2451 for &fid in &frontier {
2452 if direction == Direction::Outgoing || direction == Direction::Both {
2453 if let Some(tid) = type_filter {
2454 if let Ok(ref mut stmt) = q_out_t
2455 && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
2456 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2457 }) {
2458 for row in rows.flatten() {
2459 let (to_id, t_id) = row;
2460 all_rels.insert((fid, to_id, t_id));
2461 if all_ids.insert(to_id) { next_frontier.insert(to_id); }
2462 }
2463 }
2464 } else if let Ok(ref mut stmt) = q_out
2465 && let Ok(rows) = stmt.query_map(params![fid], |row| {
2466 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2467 }) {
2468 for row in rows.flatten() {
2469 let (to_id, t_id) = row;
2470 all_rels.insert((fid, to_id, t_id));
2471 if all_ids.insert(to_id) { next_frontier.insert(to_id); }
2472 }
2473 }
2474 }
2475
2476 if direction == Direction::Incoming || direction == Direction::Both {
2477 if let Some(tid) = type_filter {
2478 if let Ok(ref mut stmt) = q_in_t
2479 && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
2480 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2481 }) {
2482 for row in rows.flatten() {
2483 let (from_id, t_id) = row;
2484 all_rels.insert((from_id, fid, t_id));
2485 if all_ids.insert(from_id) { next_frontier.insert(from_id); }
2486 }
2487 }
2488 } else if let Ok(ref mut stmt) = q_in
2489 && let Ok(rows) = stmt.query_map(params![fid], |row| {
2490 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2491 }) {
2492 for row in rows.flatten() {
2493 let (from_id, t_id) = row;
2494 all_rels.insert((from_id, fid, t_id));
2495 if all_ids.insert(from_id) { next_frontier.insert(from_id); }
2496 }
2497 }
2498 }
2499 }
2500
2501 frontier = next_frontier;
2502 cur_depth += 1;
2503 }
2504
2505 let entities_json: String = {
2506 if all_ids.is_empty() {
2507 "[]".to_string()
2508 } else {
2509 let ids: Vec<i64> = all_ids.iter().copied().collect();
2510 let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
2511 let sql = format!(
2512 "SELECT COALESCE(json_group_array(json_object(
2513 'name', e.name,
2514 'entityType', t.name,
2515 'observations', COALESCE((
2516 SELECT json_group_array(o.body ORDER BY o.idx)
2517 FROM observation o WHERE o.entity_id = e.id
2518 ), json('[]'))
2519 ) ORDER BY e.id), json('[]'))
2520 FROM entity e
2521 JOIN type_dict t ON t.id = e.type_id
2522 WHERE e.id IN ({}) AND e.flags = 0",
2523 placeholders.join(",")
2524 );
2525 conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
2526 row.get::<_, String>(0)
2527 })
2528 .unwrap_or_else(|_| "[]".to_string())
2529 }
2530 };
2531
2532 let relations_json: String = {
2533 if all_rels.is_empty() {
2534 "[]".to_string()
2535 } else {
2536 let vals: Vec<String> = all_rels.iter().map(|_| "(?, ?, ?)".to_string()).collect();
2537 let sql = format!(
2538 "WITH r(from_id, to_id, type_id) AS (VALUES {})
2539 SELECT COALESCE(json_group_array(json_object(
2540 'from', e1.name,
2541 'to', e2.name,
2542 'relationType', t.name
2543 )), json('[]'))
2544 FROM r
2545 JOIN entity e1 ON e1.id = r.from_id
2546 JOIN entity e2 ON e2.id = r.to_id
2547 JOIN type_dict t ON t.id = r.type_id
2548 WHERE e1.flags = 0 AND e2.flags = 0",
2549 vals.join(", ")
2550 );
2551 let params: Vec<&dyn ToSql> = all_rels.iter()
2552 .flat_map(|(f, t, tp)| {
2553 vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
2554 })
2555 .collect();
2556 let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
2557 stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
2558 .unwrap_or_else(|_| "[]".to_string())
2559 }
2560 };
2561
2562 let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
2563 out.push_str("{\"entities\":");
2564 out.push_str(&entities_json);
2565 out.push_str(",\"relations\":");
2566 out.push_str(&relations_json);
2567 out.push('}');
2568 Ok(out)
2569 }
2570}
2571
2572#[cfg(test)]
2575mod tests {
2576 use super::*;
2577 use serde_json::Value;
2578 use std::ops::Deref;
2579 use std::path::PathBuf;
2580
2581 struct TestKg(GraphHandle, PathBuf);
2582
2583 impl Deref for TestKg {
2584 type Target = GraphHandle;
2585 fn deref(&self) -> &GraphHandle {
2586 &self.0
2587 }
2588 }
2589
2590 impl Drop for TestKg {
2591 fn drop(&mut self) {
2592 cleanup_db(&self.1);
2593 }
2594 }
2595
2596 fn cleanup_db(path: &std::path::Path) {
2597 let _ = std::fs::remove_file(path);
2598 let _ = std::fs::remove_file(path.with_extension("db-wal"));
2599 let _ = std::fs::remove_file(path.with_extension("db-shm"));
2600 }
2601
2602 fn new_kg() -> TestKg {
2603 use std::sync::atomic::AtomicU64;
2604 use std::sync::atomic::Ordering;
2605 static COUNTER: AtomicU64 = AtomicU64::new(0);
2606 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
2607 let dir = std::env::temp_dir();
2608 let path = dir.join(format!("kg_test_{}_{}.db", std::process::id(), n));
2609 cleanup_db(&path);
2610 let kg = GraphHandle::new(&path, Durability::Async, SqliteTuning::default(), NonZeroUsize::new(10000).unwrap(), 4).expect("create KG");
2611 TestKg(kg, path)
2612 }
2613
2614 #[test]
2615 fn test_create_and_get_entity() {
2616 let kg = new_kg();
2617 let entities = vec![Entity {
2618 name: "test".into(),
2619 entity_type: "person".into(),
2620 observations: vec!["obs1".into(), "obs2".into()],
2621 }];
2622 let created = kg.create_entities(&entities).unwrap();
2623 assert_eq!(created.len(), 1);
2624
2625 let got = kg.get_entity("test").unwrap().unwrap();
2626 assert_eq!(got.name, "test");
2627 assert_eq!(got.entity_type, "person");
2628 assert_eq!(got.observations, vec!["obs1", "obs2"]);
2629 }
2630
2631 #[test]
2632 fn test_get_nonexistent() {
2633 let kg = new_kg();
2634 assert!(kg.get_entity("nonexistent").unwrap().is_none());
2635 }
2636
2637 #[test]
2638 fn test_delete_entity() {
2639 let kg = new_kg();
2640 kg.create_entities(&[Entity {
2641 name: "del".into(),
2642 entity_type: "t".into(),
2643 observations: vec![],
2644 }])
2645 .unwrap();
2646 assert!(kg.get_entity("del").unwrap().is_some());
2647 kg.delete_entities(&["del".to_string()]).unwrap();
2648 assert!(kg.get_entity("del").unwrap().is_none());
2649 }
2650
2651 #[test]
2652 fn test_add_and_delete_observations() {
2653 let kg = new_kg();
2654 kg.create_entities(&[Entity {
2655 name: "obs_test".into(),
2656 entity_type: "t".into(),
2657 observations: vec!["a".into()],
2658 }])
2659 .unwrap();
2660
2661 let added = kg.add_observations("obs_test", &["b".into(), "c".into()]).unwrap();
2662 assert_eq!(added.len(), 2);
2663
2664 let ent = kg.get_entity("obs_test").unwrap().unwrap();
2665 assert!(ent.observations.contains(&"b".into()));
2666 assert!(ent.observations.contains(&"c".into()));
2667
2668 kg.delete_observations("obs_test", &["b".into()]).unwrap();
2669 let ent = kg.get_entity("obs_test").unwrap().unwrap();
2670 assert!(!ent.observations.contains(&"b".into()));
2671 assert!(ent.observations.contains(&"c".into()));
2672 assert!(ent.observations.contains(&"a".into()));
2673 }
2674
2675 #[test]
2676 fn test_create_relations() {
2677 let kg = new_kg();
2678 kg.create_entities(&[
2679 Entity {
2680 name: "A".into(),
2681 entity_type: "node".into(),
2682 observations: vec![],
2683 },
2684 Entity {
2685 name: "B".into(),
2686 entity_type: "node".into(),
2687 observations: vec![],
2688 },
2689 ])
2690 .unwrap();
2691
2692 let rels = kg
2693 .create_relations(&[Relation {
2694 from: "A".into(),
2695 to: "B".into(),
2696 relation_type: "edge".into(),
2697 }])
2698 .unwrap();
2699 assert_eq!(rels.len(), 1);
2700
2701 assert_eq!(kg.get_entity_count().unwrap(), 2);
2702 assert_eq!(kg.get_relation_count().unwrap(), 1);
2703 }
2704
2705 #[test]
2706 fn test_search_nodes() {
2707 let kg = new_kg();
2708 kg.create_entities(&[Entity {
2709 name: "Einstein".into(),
2710 entity_type: "scientist".into(),
2711 observations: vec!["physics".into(), "relativity".into()],
2712 }])
2713 .unwrap();
2714
2715 let results = kg.search_nodes_filtered("physics", None, 0, 10);
2716 assert_eq!(results.len(), 1);
2717 assert_eq!(results[0].name, "Einstein");
2718
2719 let results = kg.search_nodes_filtered("physics", Some("scientist"), 0, 10);
2720 assert_eq!(results.len(), 1);
2721
2722 let results = kg.search_nodes_filtered("physics", Some("nonexistent"), 0, 10);
2723 assert_eq!(results.len(), 0);
2724 }
2725
2726 #[test]
2727 fn test_find_path() {
2728 let kg = new_kg();
2729 kg.create_entities(&[
2730 Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2731 Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2732 Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2733 ]).unwrap();
2734
2735 kg.create_relations(&[
2736 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2737 Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2738 ]).unwrap();
2739
2740 let path = kg.find_path("A", "C").unwrap().unwrap();
2741 assert_eq!(path, vec!["A", "B", "C"]);
2742 }
2743
2744 #[test]
2745 fn test_degree() {
2746 let kg = new_kg();
2747 kg.create_entities(&[
2748 Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2749 Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2750 Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2751 ]).unwrap();
2752
2753 kg.create_relations(&[
2754 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2755 Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2756 ]).unwrap();
2757
2758 assert_eq!(kg.degree("A", Direction::Outgoing).unwrap(), 2);
2759 assert_eq!(kg.degree("A", Direction::Incoming).unwrap(), 0);
2760 assert_eq!(kg.degree("B", Direction::Incoming).unwrap(), 1);
2761 }
2762
2763 #[test]
2764 fn test_neighbors() {
2765 let kg = new_kg();
2766 kg.create_entities(&[
2767 Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2768 Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2769 ]).unwrap();
2770
2771 kg.create_relations(&[Relation {
2772 from: "A".into(), to: "B".into(), relation_type: "e".into(),
2773 }]).unwrap();
2774
2775 let result = kg.neighbors("A", Direction::Outgoing, None, 1).unwrap();
2776 let v: Value = serde_json::from_str(&result).unwrap();
2777 assert_eq!(v["entities"].as_array().unwrap().len(), 2);
2778 assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2779 }
2780
2781 #[test]
2782 fn test_open_nodes() {
2783 let kg = new_kg();
2784 kg.create_entities(&[
2785 Entity { name: "X".into(), entity_type: "n".into(), observations: vec!["obs_x".into()] },
2786 Entity { name: "Y".into(), entity_type: "n".into(), observations: vec!["obs_y".into()] },
2787 ]).unwrap();
2788
2789 kg.create_relations(&[Relation {
2790 from: "X".into(), to: "Y".into(), relation_type: "e".into(),
2791 }]).unwrap();
2792
2793 let result = kg.open_nodes(&["X".into()]);
2794 let v: Value = serde_json::from_str(&result).unwrap();
2795 assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2796 assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2797 }
2798
2799 #[test]
2800 fn test_entities_exist() {
2801 let kg = new_kg();
2802 kg.create_entities(&[Entity {
2803 name: "exists".into(), entity_type: "t".into(), observations: vec![],
2804 }]).unwrap();
2805
2806 let res = kg.entities_exist(&["exists".into(), "missing".into()]).unwrap();
2807 assert_eq!(res, vec![true, false]);
2808 }
2809
2810 #[test]
2811 fn test_describe_entity() {
2812 let kg = new_kg();
2813 kg.create_entities(&[Entity {
2814 name: "desc".into(), entity_type: "t".into(), observations: vec!["o".into()],
2815 }]).unwrap();
2816
2817 let entity = kg.describe_entity("desc").unwrap();
2818 assert_eq!(entity.name, "desc");
2819 }
2820
2821 #[test]
2822 fn test_entity_type_counts() {
2823 let kg = new_kg();
2824 kg.create_entities(&[
2825 Entity { name: "a".into(), entity_type: "person".into(), observations: vec![] },
2826 Entity { name: "b".into(), entity_type: "person".into(), observations: vec![] },
2827 Entity { name: "c".into(), entity_type: "place".into(), observations: vec![] },
2828 ]).unwrap();
2829
2830 let counts = kg.entity_type_counts();
2831 let map: FxHashMap<_, _> = counts.into_iter().collect();
2832 assert_eq!(map.get("person"), Some(&2));
2833 assert_eq!(map.get("place"), Some(&1));
2834 }
2835
2836 #[test]
2837 fn test_relation_type_counts() {
2838 let kg = new_kg();
2839 kg.create_entities(&[
2840 Entity { name: "a".into(), entity_type: "n".into(), observations: vec![] },
2841 Entity { name: "b".into(), entity_type: "n".into(), observations: vec![] },
2842 Entity { name: "c".into(), entity_type: "n".into(), observations: vec![] },
2843 ]).unwrap();
2844
2845 kg.create_relations(&[
2846 Relation { from: "a".into(), to: "b".into(), relation_type: "knows".into() },
2847 Relation { from: "a".into(), to: "c".into(), relation_type: "knows".into() },
2848 ]).unwrap();
2849
2850 let counts = kg.relation_type_counts();
2851 let map: FxHashMap<_, _> = counts.into_iter().collect();
2852 assert_eq!(map.get("knows"), Some(&2));
2853 }
2854
2855 #[test]
2856 fn test_upsert_entities() {
2857 let kg = new_kg();
2858 kg.create_entities(&[Entity {
2859 name: "u".into(), entity_type: "old".into(), observations: vec!["existing".into()],
2860 }]).unwrap();
2861
2862 kg.upsert_entities(&[Entity {
2864 name: "u".into(), entity_type: "new".into(), observations: vec!["existing".into(), "added".into()],
2865 }]).unwrap();
2866
2867 let ent = kg.get_entity("u").unwrap().unwrap();
2868 assert_eq!(ent.entity_type, "new");
2869 assert!(ent.observations.contains(&"added".into()));
2870 assert!(ent.observations.contains(&"existing".into()));
2871 }
2872
2873 #[test]
2874 fn test_merge_entities() {
2875 let kg = new_kg();
2876 kg.create_entities(&[
2877 Entity { name: "source".into(), entity_type: "t".into(), observations: vec!["src_obs".into()] },
2878 Entity { name: "target".into(), entity_type: "t".into(), observations: vec!["tgt_obs".into()] },
2879 ]).unwrap();
2880
2881 kg.create_relations(&[Relation {
2882 from: "source".into(), to: "target".into(), relation_type: "e".into(),
2883 }]).unwrap();
2884
2885 let merged = kg.merge_entities("source", "target").unwrap();
2886 assert_eq!(merged.name, "target");
2887 assert_eq!(kg.get_entity("source").unwrap().is_none(), true);
2888 }
2889
2890 #[test]
2891 fn test_find_all_paths() {
2892 let kg = new_kg();
2893 kg.create_entities(&[
2894 Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2895 Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2896 Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2897 ]).unwrap();
2898
2899 kg.create_relations(&[
2900 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2901 Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2902 Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2903 ]).unwrap();
2904
2905 let paths = kg.find_all_paths("A", "C", 5, 10).unwrap();
2906 assert!(paths.len() >= 2);
2907 }
2908
2909 #[test]
2910 fn test_batch_get_entities() {
2911 let kg = new_kg();
2912 kg.create_entities(&[
2913 Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
2914 Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
2915 ]).unwrap();
2916
2917 let results = kg.batch_get_entities(&["a".into(), "missing".into(), "b".into()]);
2918 assert_eq!(results.len(), 3);
2919 assert!(results[0].is_some());
2920 assert!(results[1].is_none());
2921 assert!(results[2].is_some());
2922 }
2923
2924 #[test]
2925 fn test_export_graph() {
2926 let kg = new_kg();
2927 kg.create_entities(&[Entity {
2928 name: "exp".into(), entity_type: "t".into(), observations: vec!["o".into()],
2929 }]).unwrap();
2930
2931 let exported = kg.export("json", i64::MAX).unwrap();
2932 assert!(exported.contains("exp"));
2933 assert!(exported.contains("o"));
2934 }
2935
2936 #[test]
2937 fn test_graph_stats() {
2938 let kg = new_kg();
2939 assert_eq!(kg.get_entity_count().unwrap(), 0);
2940 assert_eq!(kg.get_relation_count().unwrap(), 0);
2941
2942 kg.create_entities(&[Entity {
2943 name: "s".into(), entity_type: "t".into(), observations: vec![],
2944 }]).unwrap();
2945
2946 assert_eq!(kg.get_entity_count().unwrap(), 1);
2947 }
2948
2949 #[test]
2950 fn test_read_graph_filtered() {
2951 let kg = new_kg();
2952 kg.create_entities(&[
2953 Entity { name: "p1".into(), entity_type: "person".into(), observations: vec![] },
2954 Entity { name: "p2".into(), entity_type: "place".into(), observations: vec![] },
2955 ]).unwrap();
2956
2957 let out = kg.read_graph_filtered(Some("person"), 0, 10).unwrap();
2958 let v: Value = serde_json::from_str(&out).unwrap();
2959 assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2960 assert_eq!(v["entities"][0]["name"], "p1");
2961 }
2962
2963 #[test]
2964 fn test_wipe() {
2965 let kg = new_kg();
2966 kg.create_entities(&[Entity {
2967 name: "w".into(), entity_type: "t".into(), observations: vec!["o".into()],
2968 }]).unwrap();
2969 assert_eq!(kg.get_entity_count().unwrap(), 1);
2970
2971 kg.wipe().unwrap();
2972 assert_eq!(kg.get_entity_count().unwrap(), 0);
2973 }
2974
2975 #[test]
2976 fn test_push_json_str() {
2977 let mut buf = String::new();
2978 push_json_str(&mut buf, "hello");
2979 assert_eq!(buf, "\"hello\"");
2980 let mut buf = String::new();
2981 push_json_str(&mut buf, "he\"llo");
2982 assert_eq!(buf, "\"he\\\"llo\"");
2983 }
2984
2985 #[test]
2988 fn test_create_entities_empty_input() {
2989 let kg = new_kg();
2990 let created = kg.create_entities(&[]).unwrap();
2991 assert!(created.is_empty());
2992 }
2993
2994 #[test]
2995 fn test_create_entities_skip_empty_name() {
2996 let kg = new_kg();
2997 let created = kg.create_entities(&[Entity {
2998 name: "".into(),
2999 entity_type: "t".into(),
3000 observations: vec![],
3001 }])
3002 .unwrap();
3003 assert!(created.is_empty());
3004 assert_eq!(kg.get_entity_count().unwrap(), 0);
3005 }
3006
3007 #[test]
3008 fn test_create_entities_duplicate_names() {
3009 let kg = new_kg();
3010 let e = Entity {
3011 name: "dup".into(),
3012 entity_type: "t".into(),
3013 observations: vec!["obs".into()],
3014 };
3015 let first = kg.create_entities(&[e.clone()]).unwrap();
3016 assert_eq!(first.len(), 1);
3017 let second = kg.create_entities(&[e.clone()]).unwrap();
3018 assert!(second.is_empty());
3019 assert_eq!(kg.get_entity_count().unwrap(), 1);
3020 }
3021
3022 #[test]
3023 fn test_create_entities_partial_duplicates() {
3024 let kg = new_kg();
3025 let created = kg.create_entities(&[
3026 Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
3027 Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
3028 ]).unwrap();
3029 assert_eq!(created.len(), 2);
3030
3031 let second = kg.create_entities(&[
3032 Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
3033 Entity { name: "c".into(), entity_type: "t".into(), observations: vec![] },
3034 ]).unwrap();
3035 assert_eq!(second.len(), 1); assert_eq!(second[0].name, "c");
3037 assert_eq!(kg.get_entity_count().unwrap(), 3);
3038 }
3039
3040 #[test]
3041 fn test_create_entities_mixed_empty_and_valid() {
3042 let kg = new_kg();
3043 let created = kg.create_entities(&[
3044 Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
3045 Entity { name: "valid".into(), entity_type: "t".into(), observations: vec![] },
3046 Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
3047 ]).unwrap();
3048 assert_eq!(created.len(), 1);
3049 assert_eq!(created[0].name, "valid");
3050 assert_eq!(kg.get_entity_count().unwrap(), 1);
3051 }
3052
3053 #[test]
3054 fn test_create_entities_same_name_in_batch() {
3055 let kg = new_kg();
3056 let created = kg.create_entities(&[
3057 Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
3058 Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
3059 ]).unwrap();
3060 assert_eq!(created.len(), 1);
3061 assert_eq!(kg.get_entity_count().unwrap(), 1);
3062 }
3063
3064 #[test]
3067 fn test_create_relations_empty_input() {
3068 let kg = new_kg();
3069 let rels = kg.create_relations(&[]).unwrap();
3070 assert!(rels.is_empty());
3071 }
3072
3073 #[test]
3074 fn test_create_relations_nonexistent_from() {
3075 let kg = new_kg();
3076 kg.create_entities(&[Entity {
3077 name: "B".into(), entity_type: "t".into(), observations: vec![],
3078 }]).unwrap();
3079
3080 let rels = kg.create_relations(&[Relation {
3081 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3082 }]).unwrap();
3083 assert!(rels.is_empty());
3084 assert_eq!(kg.get_relation_count().unwrap(), 0);
3085 }
3086
3087 #[test]
3088 fn test_create_relations_nonexistent_to() {
3089 let kg = new_kg();
3090 kg.create_entities(&[Entity {
3091 name: "A".into(), entity_type: "t".into(), observations: vec![],
3092 }]).unwrap();
3093
3094 let rels = kg.create_relations(&[Relation {
3095 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3096 }]).unwrap();
3097 assert!(rels.is_empty());
3098 assert_eq!(kg.get_relation_count().unwrap(), 0);
3099 }
3100
3101 #[test]
3102 fn test_create_relations_both_nonexistent() {
3103 let kg = new_kg();
3104 let rels = kg.create_relations(&[Relation {
3105 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3106 }]).unwrap();
3107 assert!(rels.is_empty());
3108 }
3109
3110 #[test]
3111 fn test_create_relations_self_loop() {
3112 let kg = new_kg();
3113 kg.create_entities(&[Entity {
3114 name: "self".into(), entity_type: "t".into(), observations: vec![],
3115 }]).unwrap();
3116
3117 let rels = kg.create_relations(&[Relation {
3118 from: "self".into(), to: "self".into(), relation_type: "loop".into(),
3119 }]).unwrap();
3120 assert_eq!(rels.len(), 1);
3121 assert_eq!(kg.get_relation_count().unwrap(), 1);
3122 assert_eq!(kg.degree("self", Direction::Outgoing).unwrap(), 1);
3123 assert_eq!(kg.degree("self", Direction::Incoming).unwrap(), 1);
3124 }
3125
3126 #[test]
3127 fn test_create_relations_duplicate() {
3128 let kg = new_kg();
3129 kg.create_entities(&[
3130 Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3131 Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3132 ]).unwrap();
3133
3134 let r = Relation {
3135 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3136 };
3137 let first = kg.create_relations(&[r.clone()]).unwrap();
3138 assert_eq!(first.len(), 1);
3139
3140 let second = kg.create_relations(&[r.clone()]).unwrap();
3141 assert!(second.is_empty());
3142 assert_eq!(kg.get_relation_count().unwrap(), 1);
3143 }
3144
3145 #[test]
3146 fn test_create_relations_new_type_auto_created() {
3147 let kg = new_kg();
3148 kg.create_entities(&[
3149 Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3150 Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3151 ]).unwrap();
3152
3153 let rels = kg.create_relations(&[Relation {
3154 from: "A".into(), to: "B".into(), relation_type: "brand_new_type".into(),
3155 }]).unwrap();
3156 assert_eq!(rels.len(), 1);
3157
3158 let counts = kg.relation_type_counts();
3159 let map: FxHashMap<_, _> = counts.into_iter().collect();
3160 assert_eq!(map.get("brand_new_type"), Some(&1));
3161 }
3162
3163 #[test]
3164 fn test_create_relations_degree_updates() {
3165 let kg = new_kg();
3166 kg.create_entities(&[
3167 Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3168 Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3169 Entity { name: "C".into(), entity_type: "t".into(), observations: vec![] },
3170 ]).unwrap();
3171
3172 kg.create_relations(&[
3173 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
3174 Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
3175 ]).unwrap();
3176
3177 assert_eq!(kg.degree("A", Direction::Outgoing).unwrap(), 2);
3178 assert_eq!(kg.degree("A", Direction::Incoming).unwrap(), 0);
3179 assert_eq!(kg.degree("B", Direction::Incoming).unwrap(), 1);
3180 assert_eq!(kg.degree("C", Direction::Incoming).unwrap(), 1);
3181 assert_eq!(kg.degree("A", Direction::Both).unwrap(), 2);
3182 }
3183
3184 #[test]
3185 fn test_create_relations_delete_and_recreate() {
3186 let kg = new_kg();
3187 kg.create_entities(&[
3188 Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3189 Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3190 ]).unwrap();
3191
3192 let r = Relation {
3193 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3194 };
3195 kg.create_relations(&[r.clone()]).unwrap();
3196 assert_eq!(kg.get_relation_count().unwrap(), 1);
3197
3198 kg.delete_relations(&[r.clone()]).unwrap();
3199 assert_eq!(kg.get_relation_count().unwrap(), 0);
3200
3201 let re = kg.create_relations(&[r.clone()]).unwrap();
3203 assert_eq!(re.len(), 1);
3204 assert_eq!(kg.get_relation_count().unwrap(), 1);
3205 }
3206
#[test]
fn test_create_entities_then_relations_then_delete_entity_with_relations() {
    // Deleting an entity must also drop the relations attached to it, while
    // leaving the other endpoint of those relations in place.
    let kg = new_kg();
    kg.create_entities(&[
        Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
        Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
    ])
    .unwrap();
    kg.create_relations(&[Relation {
        from: "A".into(),
        to: "B".into(),
        relation_type: "e".into(),
    }])
    .unwrap();

    assert_eq!(kg.get_relation_count().unwrap(), 1);

    kg.delete_entities(&["A".into()]).unwrap();

    // `assert!(cond)` rather than `assert_eq!(cond, true)` (clippy::bool_assert_comparison).
    assert!(kg.get_entity("A").unwrap().is_none(), "deleted entity must not be returned");
    assert!(kg.get_entity("B").unwrap().is_some(), "untouched endpoint must survive");
    assert_eq!(kg.get_relation_count().unwrap(), 0);
}
3227
#[test]
fn test_graph_stats_after_entity_with_observations() {
    // Counts must reflect an entity that carries observations, the observations
    // must be readable back in insertion order, and deletion must remove it all.
    let kg = new_kg();
    kg.create_entities(&[Entity {
        name: "stat".into(),
        entity_type: "t".into(),
        observations: vec!["o1".into(), "o2".into(), "o3".into()],
    }])
    .unwrap();

    let ecount = kg.get_entity_count().unwrap();
    assert_eq!(ecount, 1);

    // Previously the observations were written but never verified.
    let stored = kg
        .get_entity("stat")
        .unwrap()
        .expect("entity was just created");
    assert_eq!(stored.observations, vec!["o1", "o2", "o3"]);

    kg.delete_entities(&["stat".into()]).unwrap();
    assert_eq!(kg.get_entity_count().unwrap(), 0);
    assert!(kg.get_entity("stat").unwrap().is_none());
}
3244
3245 fn new_kg_with_pool(read_pool_size: usize) -> TestKg {
3248 use std::sync::atomic::AtomicU64;
3249 static COUNTER: AtomicU64 = AtomicU64::new(1_000_000);
3250 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
3251 let path = std::env::temp_dir().join(format!("kg_pool_{}_{}.db", std::process::id(), n));
3252 cleanup_db(&path);
3253 let kg = GraphHandle::new(
3254 &path,
3255 Durability::Async,
3256 SqliteTuning::default(),
3257 NonZeroUsize::new(10_000).unwrap(),
3258 read_pool_size,
3259 )
3260 .expect("create KG");
3261 TestKg(kg, path)
3262 }
3263
3264 fn seed_line(kg: &GraphHandle, n: usize) {
3265 let entities: Vec<Entity> = (0..n)
3266 .map(|i| Entity {
3267 name: format!("n{i}"),
3268 entity_type: "node".into(),
3269 observations: vec![format!("obs of n{i}")],
3270 })
3271 .collect();
3272 kg.create_entities(&entities).unwrap();
3273 let rels: Vec<Relation> = (0..n.saturating_sub(1))
3274 .map(|i| Relation {
3275 from: format!("n{i}"),
3276 to: format!("n{}", i + 1),
3277 relation_type: "edge".into(),
3278 })
3279 .collect();
3280 if !rels.is_empty() {
3281 kg.create_relations(&rels).unwrap();
3282 }
3283 }
3284
3285 fn count_relations(graph_json: &str) -> usize {
3286 let v: Value = serde_json::from_str(graph_json).unwrap();
3287 v["relations"].as_array().unwrap().len()
3288 }
3289
3290 fn count_entities(graph_json: &str) -> usize {
3291 let v: Value = serde_json::from_str(graph_json).unwrap();
3292 v["entities"].as_array().unwrap().len()
3293 }
3294
#[test]
fn test_pool_size_one_still_works() {
    // A read pool with a single connection must still serve the count, point
    // lookup, and full-graph read paths.
    let kg = new_kg_with_pool(1);
    seed_line(&kg, 5);

    let total = kg.get_entity_count().unwrap();
    assert_eq!(total, 5);

    let middle = kg.get_entity("n2").unwrap();
    assert!(middle.is_some());

    let graph_json = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
    assert_eq!(count_entities(&graph_json), 5);
}
3306
#[test]
fn test_reads_see_committed_writes() {
    // A lookup issued right after `create_entities` returns must observe the
    // newly written entity and its observation.
    let kg = new_kg_with_pool(4);
    let fresh = Entity {
        name: "fresh".into(),
        entity_type: "t".into(),
        observations: vec!["v".into()],
    };
    kg.create_entities(&[fresh]).unwrap();

    let lookup = kg.get_entity("fresh").unwrap();
    let got = lookup.unwrap();
    assert_eq!(got.observations, vec!["v"]);
}
3322
#[test]
fn test_concurrent_readers_consistent() {
    let kg = new_kg_with_pool(4);
    // 50 seeded entities; the writer below adds 60 more (w100..=w159).
    seed_line(&kg, 50);

    std::thread::scope(|s| {
        for _ in 0..8 {
            s.spawn(|| {
                for _ in 0..200 {
                    // Previously every result was discarded, so only panics or
                    // deadlocks could fail this test. Successful reads are now
                    // checked against what must hold at every instant. Errors
                    // are still tolerated, because this test targets
                    // consistency, not error-free contention.
                    if let Ok(found) = kg.get_entity("n10") {
                        assert!(
                            found.is_some(),
                            "seeded entity n10 vanished during concurrent writes"
                        );
                    }
                    let _ = kg.search_nodes_filtered("obs", None, 0, 10);
                    let _ = kg.read_graph_filtered(None, 0, 100);
                    if let Ok(count) = kg.get_entity_count() {
                        assert!(
                            (50..=110).contains(&count),
                            "entity count {count} outside [50, 110]"
                        );
                    }
                    let _ = kg.neighbors("n10", Direction::Both, None, 2);
                }
            });
        }
        s.spawn(|| {
            for i in 100..160 {
                kg.create_entities(&[Entity {
                    name: format!("w{i}"),
                    entity_type: "node".into(),
                    observations: vec![format!("w obs {i}")],
                }])
                .unwrap();
            }
        });
    });

    // Once all threads have joined, every write must be visible.
    assert_eq!(kg.get_entity_count().unwrap(), 110);
    assert!(kg.get_entity("w159").unwrap().is_some());
}
3360
#[test]
fn test_reader_pool_rejects_writes_internally() {
    // NOTE(review): despite the name, this body never observes write rejection
    // directly. It hammers a single-connection read pool from several threads,
    // ignoring each read's result, then checks the stored data is unchanged.
    let kg = new_kg_with_pool(1);
    seed_line(&kg, 3);

    let reader_threads = 4;
    let reads_per_thread = 100;
    std::thread::scope(|scope| {
        for _ in 0..reader_threads {
            scope.spawn(|| {
                (0..reads_per_thread).for_each(|_| {
                    let _ = kg.read_graph_filtered(None, 0, 10);
                });
            });
        }
    });

    assert_eq!(kg.get_entity_count().unwrap(), 3);
}
3381
#[test]
fn test_read_graph_relations_scoped_to_page() {
    // On the path n0 -> n1 -> n2 -> n3, a page should only carry relations
    // among the entities it returns, as the expected counts below encode.
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 4);

    // (limit, expected entities, expected relations)
    let cases = [(usize::MAX, 4, 3), (1, 1, 0), (2, 2, 1)];
    for (limit, want_entities, want_relations) in cases {
        let page = kg.read_graph_filtered(None, 0, limit).unwrap();
        assert_eq!(count_entities(&page), want_entities);
        assert_eq!(count_relations(&page), want_relations);
    }
}
3407
#[test]
fn test_read_graph_pagination_offset() {
    // Offset 2 with limit 2 over n0..n4 must return exactly n2 and n3.
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 5);

    let page = kg.read_graph_filtered(None, 2, 2).unwrap();
    assert_eq!(count_entities(&page), 2);
    for skipped in ["\"n0\"", "\"n1\""] {
        assert!(!page.contains(skipped));
    }
    for kept in ["\"n2\"", "\"n3\""] {
        assert!(page.contains(kept));
    }
}
3420
#[test]
fn test_read_graph_empty() {
    // An empty store serializes to two empty arrays, byte for byte.
    let kg = new_kg_with_pool(2);
    let expected = r#"{"entities":[],"relations":[]}"#;
    let actual = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
    assert_eq!(actual, expected);
}
3427
#[test]
fn test_read_graph_filtered_by_type() {
    // Filtering by entity type keeps only matching entities.
    let kg = new_kg_with_pool(2);
    let make = |name: &str, ty: &str| Entity {
        name: name.into(),
        entity_type: ty.into(),
        observations: vec![],
    };
    kg.create_entities(&[
        make("p1", "person"),
        make("q1", "place"),
        make("p2", "person"),
    ])
    .unwrap();

    let people = kg.read_graph_filtered(Some("person"), 0, usize::MAX).unwrap();
    assert_eq!(count_entities(&people), 2);
    assert!(people.contains("\"p1\""));
    assert!(people.contains("\"p2\""));
    assert!(!people.contains("\"q1\""));
}
3443
#[test]
fn test_export_respects_max_rows() {
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 5);

    // Effectively unbounded: all 5 entities and all 4 path relations.
    let full = kg.export("json", i64::MAX).unwrap();
    assert_eq!((count_entities(&full), count_relations(&full)), (5, 4));

    // max_rows = 2: both lists are expected to hold 2 items.
    let capped = kg.export("json", 2).unwrap();
    assert_eq!((count_entities(&capped), count_relations(&capped)), (2, 2));
}
3459
#[test]
fn test_export_negative_max_rows_is_unbounded() {
    // A negative max_rows must not truncate the export.
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 3);
    let exported = kg.export("json", -1).unwrap();
    assert_eq!(count_entities(&exported), 3);
}
3468
#[test]
fn test_many_small_write_batches_stay_consistent() {
    // One `create_entities` call per entity, 100 times over; both the count
    // and search must see every write.
    let kg = new_kg_with_pool(2);
    (0..100).for_each(|i| {
        let entity = Entity {
            name: format!("e{i}"),
            entity_type: "t".into(),
            observations: vec![format!("o{i}")],
        };
        kg.create_entities(&[entity]).unwrap();
    });

    assert_eq!(kg.get_entity_count().unwrap(), 100);
    let hits = kg.search_nodes_filtered("e57", None, 0, 10);
    assert!(hits.iter().any(|hit| hit.name == "e57"));
}
3488
#[test]
fn test_wipe_clears_name_and_obs_fts() {
    // After `wipe`, neither a name search nor an observation search may
    // return the removed entity.
    let kg = new_kg_with_pool(2);
    kg.create_entities(&[Entity {
        name: "Einstein".into(),
        entity_type: "scientist".into(),
        observations: vec!["physics".into()],
    }])
    .unwrap();

    let hits = |query: &str| kg.search_nodes_filtered(query, None, 0, 10);

    assert_eq!(hits("Einstein").len(), 1);
    assert_eq!(hits("physics").len(), 1);

    kg.wipe().unwrap();

    assert_eq!(kg.get_entity_count().unwrap(), 0);
    assert!(hits("Einstein").is_empty());
    assert!(hits("physics").is_empty());
}
3513
#[test]
fn test_wipe_then_recreate_search_works() {
    // Recreating an entity after `wipe` must yield exactly one search hit,
    // with no stale duplicate from before the wipe.
    let kg = new_kg_with_pool(2);
    let einstein = |observations| Entity {
        name: "Einstein".into(),
        entity_type: "scientist".into(),
        observations,
    };

    kg.create_entities(&[einstein(vec!["physics".into()])]).unwrap();
    kg.wipe().unwrap();
    kg.create_entities(&[einstein(vec!["physics".into(), "relativity".into()])])
        .unwrap();

    let name_hits = kg.search_nodes_filtered("Einstein", None, 0, 10);
    assert_eq!(name_hits.len(), 1, "exactly one Einstein after recreate");
    let observation_hits = kg.search_nodes_filtered("relativity", None, 0, 10);
    assert_eq!(observation_hits.len(), 1);
    assert_eq!(kg.get_entity_count().unwrap(), 1);
}
3540
#[test]
fn test_search_relations_missing_type_returns_empty() {
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 3);

    // An unknown relation type matches nothing...
    let matches = kg.search_relations(None, None, Some("does_not_exist"));
    assert!(matches.is_empty());

    // ...and must not appear in the per-type counts afterwards.
    let known_types = kg.relation_type_counts();
    assert!(!known_types.iter().any(|(ty, _)| ty == "does_not_exist"));
}
3555
#[test]
fn test_search_relations_missing_from_returns_empty() {
    // A `from` filter naming a nonexistent entity must yield no relations,
    // rather than being ignored.
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 3);
    let from_ghost = kg.search_relations(Some("ghost"), None, None);
    assert!(from_ghost.is_empty(), "missing 'from' must not match every relation");
}
3563
#[test]
fn test_search_relations_existing_filters_still_work() {
    // Filtering by an existing `from` and type returns exactly the n0 -> n1 edge.
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 3);

    let found = kg.search_relations(Some("n0"), None, Some("edge"));
    assert_eq!(found.len(), 1);
    let only = &found[0];
    assert_eq!(only.from, "n0");
    assert_eq!(only.to, "n1");
}
3573
#[test]
fn test_neighbors_missing_type_returns_only_start() {
    // With a relation-type filter that matches no edge, the neighborhood is
    // just the start node and no relations.
    let kg = new_kg_with_pool(2);
    seed_line(&kg, 3);

    let subgraph = kg
        .neighbors("n0", Direction::Both, Some("nonexistent"), 2)
        .unwrap();
    assert_eq!(count_entities(&subgraph), 1);
    assert_eq!(count_relations(&subgraph), 0);
}
3585
#[test]
fn test_neighbors_existing_type_filters() {
    // From `a`, following only `knows` edges one hop reaches `b` but not `c`
    // (which is linked by `likes`).
    let kg = new_kg_with_pool(2);
    let nodes: Vec<Entity> = ["a", "b", "c"]
        .iter()
        .map(|name| Entity {
            name: (*name).into(),
            entity_type: "n".into(),
            observations: vec![],
        })
        .collect();
    kg.create_entities(&nodes).unwrap();

    let edges = [("b", "knows"), ("c", "likes")].map(|(to, kind)| Relation {
        from: "a".into(),
        to: to.into(),
        relation_type: kind.into(),
    });
    kg.create_relations(&edges).unwrap();

    let subgraph = kg
        .neighbors("a", Direction::Outgoing, Some("knows"), 1)
        .unwrap();
    assert!(subgraph.contains("\"b\""));
    assert!(!subgraph.contains("\"c\""));
    assert_eq!(count_relations(&subgraph), 1);
}
3605
#[test]
fn test_sqlite_tuning_applied_to_fresh_db() {
    use std::sync::atomic::AtomicU64;
    static COUNTER: AtomicU64 = AtomicU64::new(2_000_000);

    let seq = COUNTER.fetch_add(1, Ordering::SeqCst);
    let db_file = format!("kg_tuning_{}_{}.db", std::process::id(), seq);
    let path = std::env::temp_dir().join(db_file);
    cleanup_db(&path);

    // Only page_size deviates from the default tuning.
    let tuning = SqliteTuning {
        page_size: 8192,
        ..SqliteTuning::default()
    };
    let cache_capacity = NonZeroUsize::new(64).unwrap();
    let handle = GraphHandle::new(&path, Durability::Async, tuning, cache_capacity, 2)
        .expect("create KG");
    let kg = TestKg(handle, path.clone());

    kg.create_entities(&[Entity {
        name: "a".into(),
        entity_type: "n".into(),
        observations: vec!["o".into()],
    }])
    .unwrap();

    // Inspect the on-disk settings through an independent connection. It is
    // declared after `kg`, so it is dropped (closed) before `kg`.
    let probe = Connection::open(&path).unwrap();
    let pragma_i64 =
        |pragma: &str| -> i64 { probe.query_row(pragma, [], |row| row.get(0)).unwrap() };

    assert_eq!(pragma_i64("PRAGMA page_size"), 8192);
    // SQLite reports auto_vacuum as 0 = NONE, 1 = FULL, 2 = INCREMENTAL.
    assert_eq!(pragma_i64("PRAGMA auto_vacuum"), 2, "expected INCREMENTAL auto_vacuum");
    let journal: String = probe
        .query_row("PRAGMA journal_mode", [], |row| row.get(0))
        .unwrap();
    assert_eq!(journal.to_lowercase(), "wal");
}
3640
#[test]
fn test_checkpoint_passive_is_noop_safe() {
    let kg = new_kg();

    // A passive checkpoint on an untouched database must succeed.
    kg.checkpoint_passive().unwrap();

    kg.create_entities(&[Entity {
        name: "a".into(),
        entity_type: "n".into(),
        observations: vec!["o".into()],
    }])
    .unwrap();

    // Back-to-back checkpoints after a write must succeed and lose nothing.
    for _ in 0..2 {
        kg.checkpoint_passive().unwrap();
    }
    assert!(kg.get_entity("a").unwrap().is_some());
}
3658}