1use rustc_hash::FxHashMap;
2use std::collections::{HashSet, VecDeque};
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use lru::LruCache;
9use parking_lot::{Mutex, MutexGuard};
10use rusqlite::{params, types::ToSql, Connection, OpenFlags};
11
12use crate::config::{Durability, SqliteTuning};
13use crate::errors::{MCSError, Result};
14use crate::types::{Entity, Relation};
15
16fn sqlite_err(e: rusqlite::Error) -> MCSError {
17 MCSError::IoError(std::io::Error::other(e))
18}
19
20const fn is_not_found(e: &rusqlite::Error) -> bool {
21 matches!(e, rusqlite::Error::QueryReturnedNoRows)
22}
23
/// Current wall-clock time in microseconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0` instead of an error.
#[inline]
fn now_us() -> i64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_micros() as i64
}
31
/// 64-bit FNV-1a hash of `name`'s UTF-8 bytes, reinterpreted as `i64` so it
/// can be stored in SQLite's signed `entity.name_hash` column.
#[inline]
pub(crate) fn name_hash(name: &str) -> i64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    let hash = name.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    });
    hash as i64
}
41
42fn load_observations(conn: &Connection, entity_id: i64) -> Result<Vec<String>> {
43 let mut stmt = conn
44 .prepare_cached("SELECT body FROM observation WHERE entity_id = ?1 ORDER BY idx")
45 .map_err(sqlite_err)?;
46 let rows = stmt
47 .query_map(params![entity_id], |row| row.get::<_, String>(0))
48 .map_err(sqlite_err)?
49 .filter_map(|r| r.ok())
50 .collect::<Vec<_>>();
51 Ok(rows)
52}
53
54fn load_observations_opt(conn: &Connection, entity_id: i64) -> Vec<String> {
55 load_observations(conn, entity_id).unwrap_or_default()
56}
57
58fn entity_name_lookup(conn: &Connection, name: &str) -> Result<Option<i64>> {
59 let h = name_hash(name);
60 let mut stmt = conn
61 .prepare_cached(
62 "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
63 )
64 .map_err(sqlite_err)?;
65 match stmt.query_row(params![h, name], |row| row.get::<_, i64>(0)) {
66 Ok(id) => Ok(Some(id)),
67 Err(e) if is_not_found(&e) => Ok(None),
68 Err(e) => Err(sqlite_err(e)),
69 }
70}
71
72fn get_type_id(conn: &Connection, type_name: &str, kind: i64) -> Result<i64> {
73 let mut sel = conn
74 .prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
75 .map_err(sqlite_err)?;
76 if let Ok(id) = sel.query_row(params![kind, type_name], |row| row.get::<_, i64>(0)) {
77 return Ok(id);
78 }
79 conn.execute(
80 "INSERT INTO type_dict (kind, name, count) VALUES (?1, ?2, 0)",
81 params![kind, type_name],
82 )
83 .map_err(sqlite_err)?;
84 Ok(conn.last_insert_rowid())
85}
86
87fn lookup_type_id(conn: &Connection, type_name: &str, kind: i64) -> Option<i64> {
91 conn.prepare_cached("SELECT id FROM type_dict WHERE kind = ?1 AND name = ?2")
92 .ok()?
93 .query_row(params![kind, type_name], |row| row.get::<_, i64>(0))
94 .ok()
95}
96
97fn inc_type_count(conn: &Connection, type_id: i64, delta: i64) -> Result<()> {
98 conn.execute(
99 "UPDATE type_dict SET count = count + ?1 WHERE id = ?2",
100 params![delta, type_id],
101 )
102 .map_err(sqlite_err)?;
103 Ok(())
104}
105
106fn inc_graph_stat(conn: &Connection, key: &str, delta: i64) -> Result<()> {
107 conn.execute(
108 "UPDATE graph_stat SET value = value + ?1 WHERE key = ?2",
109 params![delta, key],
110 )
111 .map_err(sqlite_err)?;
112 Ok(())
113}
114
115fn read_graph_stat(conn: &Connection, key: &str) -> Result<i64> {
116 conn.query_row(
117 "SELECT value FROM graph_stat WHERE key = ?1",
118 params![key],
119 |row| row.get(0),
120 )
121 .map_err(sqlite_err)
122}
123
124fn name_of_type(conn: &Connection, type_id: i64) -> Result<String> {
125 conn.query_row(
126 "SELECT name FROM type_dict WHERE id = ?1",
127 params![type_id],
128 |row| row.get(0),
129 )
130 .map_err(sqlite_err)
131}
132
133fn select_all_types(conn: &Connection, kind: i64) -> Result<Vec<(String, usize)>> {
134 let mut stmt = conn
135 .prepare_cached(
136 "SELECT name, count FROM type_dict WHERE kind = ?1 AND count > 0 ORDER BY count DESC",
137 )
138 .map_err(sqlite_err)?;
139 let rows = stmt
140 .query_map(params![kind], |row| {
141 Ok((
142 row.get::<_, String>(0)?,
143 row.get::<_, i64>(1)? as usize,
144 ))
145 })
146 .map_err(sqlite_err)?
147 .filter_map(|r| r.ok())
148 .collect();
149 Ok(rows)
150}
151
152fn entity_by_id(conn: &Connection, id: i64) -> Result<Entity> {
153 let mut stmt = conn
154 .prepare_cached(
155 "SELECT e.name, t.name,
156 COALESCE((SELECT json_group_array(o.body ORDER BY o.idx) FROM observation o WHERE o.entity_id = e.id), '[]')
157 FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1 AND e.flags = 0",
158 )
159 .map_err(sqlite_err)?;
160 let (name, etype, obs_json): (String, String, String) = stmt
161 .query_row(params![id], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
162 .map_err(sqlite_err)?;
163 let observations: Vec<String> = serde_json::from_str(&obs_json).unwrap_or_default();
164 Ok(Entity {
165 name,
166 entity_type: etype,
167 observations,
168 })
169}
170
/// Which relation direction(s) to follow.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    /// Outgoing edges only.
    Outgoing,
    /// Incoming edges only.
    Incoming,
    /// Both directions.
    Both,
}

impl Direction {
    /// Parses an optional direction keyword.
    ///
    /// Only the exact upper-case strings `"OUTGOING"` and `"INCOMING"` are
    /// recognised; `None` and every other value map to [`Direction::Both`].
    pub fn parse(s: Option<&str>) -> Self {
        match s.unwrap_or_default() {
            "OUTGOING" => Self::Outgoing,
            "INCOMING" => Self::Incoming,
            _ => Self::Both,
        }
    }
}
188
/// Appends `raw` to `buf` as a double-quoted JSON string literal.
///
/// `"` and `\` are backslash-escaped, and `\n`, `\r`, `\t`, backspace and form
/// feed use their short escapes. Every other control character in
/// `U+0000..=U+001F` is written as `\u00XX`, as RFC 8259 requires. All other
/// characters, including non-ASCII ones, are copied unchanged.
///
/// Done in one pass. A control character anywhere in `raw` is escaped, even
/// when it precedes a quote or backslash.
pub fn push_json_str(buf: &mut String, raw: &str) {
    buf.reserve(raw.len() + 2);
    buf.push('"');
    // `start` is the beginning of the pending run of bytes that need no
    // escaping; the run is copied in bulk when an escapable byte is reached.
    let mut start = 0;
    for (i, b) in raw.bytes().enumerate() {
        let short = match b {
            b'"' => Some('"'),
            b'\\' => Some('\\'),
            b'\n' => Some('n'),
            b'\r' => Some('r'),
            b'\t' => Some('t'),
            0x08 => Some('b'),
            0x0C => Some('f'),
            0x00..=0x1F => None,
            _ => continue,
        };
        // Every escapable byte is ASCII, so `i` is always a char boundary.
        buf.push_str(&raw[start..i]);
        match short {
            Some(c) => {
                buf.push('\\');
                buf.push(c);
            }
            None => write_escape_unicode(buf, b),
        }
        start = i + 1;
    }
    buf.push_str(&raw[start..]);
    buf.push('"');
}

/// Appends the `\u00XX` JSON escape (lower-case hex) for control byte `b`.
#[inline(never)]
fn write_escape_unicode(buf: &mut String, b: u8) {
    use std::fmt::Write;
    write!(buf, "\\u{:04x}", b).expect("writing to a String cannot fail");
}
229
/// Cached copy of the bookkeeping columns of one `entity` row.
#[derive(Clone, Copy)]
struct EntityMeta {
    /// Row id in `entity`.
    id: i64,
    /// `type_dict` id of the entity's type.
    type_id: i64,
    /// Mirror of `entity.obs_count`.
    obs_count: i64,
    /// Mirror of `entity.out_deg`.
    out_deg: i64,
    /// Mirror of `entity.in_deg`.
    in_deg: i64,
}
240
241struct TxGuard<'a> {
244 conn: &'a Connection,
245 done: bool,
246}
247
248impl<'a> TxGuard<'a> {
249 fn begin(conn: &'a Connection) -> Result<Self> {
250 conn.execute_batch("BEGIN IMMEDIATE").map_err(sqlite_err)?;
255 Ok(Self { conn, done: false })
256 }
257
258 fn commit(mut self) -> Result<()> {
259 self.done = true;
260 self.conn.execute_batch("COMMIT").map_err(sqlite_err)
261 }
262}
263
264impl Drop for TxGuard<'_> {
265 fn drop(&mut self) {
266 if !self.done {
267 let _ = self.conn.execute_batch("ROLLBACK");
268 }
269 }
270}
271
272struct ReaderPool {
279 conns: Vec<Mutex<Connection>>,
280 next: AtomicUsize,
281}
282
283impl ReaderPool {
284 fn get(&self) -> MutexGuard<'_, Connection> {
288 for c in &self.conns {
289 if let Some(g) = c.try_lock() {
290 return g;
291 }
292 }
293 let i = self.next.fetch_add(1, Ordering::Relaxed) % self.conns.len();
294 self.conns[i].lock()
295 }
296}
297
298pub struct GraphHandle {
301 writer: Mutex<Connection>,
304 readers: ReaderPool,
306 seq_entity: AtomicI64,
307 seq_obs: AtomicI64,
308 cache: Mutex<LruCache<String, EntityMeta>>,
309}
310
311fn open_reader(path: &Path, tuning: &SqliteTuning) -> Result<Connection> {
318 let conn = Connection::open_with_flags(
319 path,
320 OpenFlags::SQLITE_OPEN_READ_WRITE
321 | OpenFlags::SQLITE_OPEN_NO_MUTEX
322 | OpenFlags::SQLITE_OPEN_URI,
323 )
324 .map_err(sqlite_err)?;
325 conn.busy_timeout(Duration::from_millis(tuning.busy_timeout_ms))
326 .map_err(sqlite_err)?;
327 conn.execute_batch(&format!(
328 "PRAGMA query_only = ON;
329 PRAGMA cache_size = -{};
330 PRAGMA temp_store = MEMORY;
331 PRAGMA mmap_size = {};",
332 tuning.cache_size_kb, tuning.mmap_size
333 ))
334 .map_err(sqlite_err)?;
335 Ok(conn)
336}
337
338impl GraphHandle {
339 pub fn new(
340 path: &Path,
341 durability: Durability,
342 tuning: SqliteTuning,
343 lru_cache_size: NonZeroUsize,
344 read_pool_size: usize,
345 ) -> Result<Self> {
346 let conn = Connection::open(path).map_err(sqlite_err)?;
347 conn.busy_timeout(Duration::from_millis(tuning.busy_timeout_ms))
350 .map_err(sqlite_err)?;
351
352 conn.execute_batch(&format!(
358 "PRAGMA page_size = {};
359 PRAGMA auto_vacuum = INCREMENTAL;",
360 tuning.page_size
361 ))
362 .map_err(sqlite_err)?;
363
364 conn.execute_batch(&format!(
365 "PRAGMA journal_mode = WAL;
366 PRAGMA foreign_keys = OFF;
367 PRAGMA cache_size = -{};
368 PRAGMA temp_store = MEMORY;
369 PRAGMA busy_timeout = {};
370 PRAGMA synchronous = NORMAL;
371 PRAGMA journal_size_limit = {};",
372 tuning.cache_size_kb, tuning.busy_timeout_ms, tuning.journal_size_limit
373 ))
374 .map_err(sqlite_err)?;
375
376 conn.execute_batch(
377 "CREATE TABLE IF NOT EXISTS entity (
378 id INTEGER PRIMARY KEY,
379 name_hash INTEGER NOT NULL,
380 name TEXT NOT NULL,
381 type_id INTEGER NOT NULL,
382 obs_count INTEGER NOT NULL DEFAULT 0,
383 out_deg INTEGER NOT NULL DEFAULT 0,
384 in_deg INTEGER NOT NULL DEFAULT 0,
385 created_us INTEGER NOT NULL,
386 updated_us INTEGER NOT NULL,
387 flags INTEGER NOT NULL DEFAULT 0
388 ) STRICT;
389
390 CREATE INDEX IF NOT EXISTS entity_by_hash
391 ON entity(name_hash, type_id, obs_count, out_deg, in_deg)
392 WHERE flags = 0;
393
394 CREATE INDEX IF NOT EXISTS entity_name_ci
395 ON entity(lower(name))
396 WHERE flags = 0;
397
398 CREATE TABLE IF NOT EXISTS observation (
399 id INTEGER PRIMARY KEY,
400 entity_id INTEGER NOT NULL,
401 idx INTEGER NOT NULL,
402 body TEXT NOT NULL,
403 created_us INTEGER NOT NULL
404 ) STRICT;
405
406 CREATE INDEX IF NOT EXISTS obs_by_entity
407 ON observation(entity_id, idx);
408
409 CREATE TABLE IF NOT EXISTS relation (
410 from_id INTEGER NOT NULL,
411 to_id INTEGER NOT NULL,
412 type_id INTEGER NOT NULL,
413 created_us INTEGER NOT NULL
414 ) STRICT;
415
416 CREATE INDEX IF NOT EXISTS rel_out
417 ON relation(from_id, type_id, to_id);
418
419 CREATE INDEX IF NOT EXISTS rel_in
420 ON relation(to_id, type_id, from_id);
421
422 CREATE VIRTUAL TABLE IF NOT EXISTS name_fts
423 USING fts5(name, content='entity', content_rowid='id',
424 tokenize='unicode61 remove_diacritics 2');
425
426 CREATE VIRTUAL TABLE IF NOT EXISTS obs_fts
427 USING fts5(body, content='observation', content_rowid='id',
428 tokenize='unicode61 remove_diacritics 2');
429
430 CREATE TRIGGER IF NOT EXISTS obs_fts_ai AFTER INSERT ON observation BEGIN
431 INSERT INTO obs_fts(rowid, body) VALUES (new.id, new.body);
432 END;
433
434 CREATE TRIGGER IF NOT EXISTS obs_fts_bd BEFORE DELETE ON observation BEGIN
435 INSERT INTO obs_fts(obs_fts, rowid, body) VALUES ('delete', old.id, '');
436 END;
437
438 CREATE TABLE IF NOT EXISTS type_dict (
439 id INTEGER PRIMARY KEY,
440 kind INTEGER NOT NULL,
441 name TEXT NOT NULL,
442 count INTEGER NOT NULL DEFAULT 0
443 ) STRICT;
444
445 CREATE INDEX IF NOT EXISTS type_by_name
446 ON type_dict(kind, name);
447
448 CREATE TABLE IF NOT EXISTS graph_stat (
449 key TEXT NOT NULL PRIMARY KEY,
450 value INTEGER NOT NULL
451 ) STRICT, WITHOUT ROWID;
452
453 CREATE TABLE IF NOT EXISTS hub_degree (
454 entity_id INTEGER PRIMARY KEY,
455 out_deg INTEGER NOT NULL,
456 in_deg INTEGER NOT NULL
457 ) STRICT;
458
459 CREATE TABLE IF NOT EXISTS partition_map (
460 table_name TEXT NOT NULL PRIMARY KEY,
461 role INTEGER NOT NULL,
462 type_id INTEGER,
463 row_count INTEGER NOT NULL DEFAULT 0
464 ) STRICT, WITHOUT ROWID;",
465 )
466 .map_err(sqlite_err)?;
467
468 conn.execute_batch(&format!("PRAGMA mmap_size = {};", tuning.mmap_size))
469 .map_err(sqlite_err)?;
470
471 let sync_pragma = match durability {
472 Durability::Sync => "PRAGMA synchronous = FULL",
473 Durability::Async => "PRAGMA synchronous = NORMAL",
474 };
475 conn.execute_batch(sync_pragma).map_err(sqlite_err)?;
476
477 conn.execute_batch("PRAGMA analysis_limit = 400;")
480 .map_err(sqlite_err)?;
481
482 let has_stat: bool = conn
483 .query_row("SELECT 1 FROM graph_stat LIMIT 1", [], |_| Ok(()))
484 .is_ok();
485 if !has_stat {
486 conn.execute_batch(
487 "INSERT INTO graph_stat(key, value) VALUES
488 ('entities', 0), ('relations', 0), ('observations', 0),
489 ('entity_seq', 0), ('obs_seq', 0);",
490 )
491 .map_err(sqlite_err)?;
492 }
493
494 conn.execute_batch("PRAGMA optimize;").map_err(sqlite_err)?;
495
496 let seq_entity = read_graph_stat(&conn, "entity_seq").unwrap_or(0);
497 let seq_obs = read_graph_stat(&conn, "obs_seq").unwrap_or(0);
498
499 let pool_size = read_pool_size.max(1);
502 let mut conns = Vec::with_capacity(pool_size);
503 for _ in 0..pool_size {
504 conns.push(Mutex::new(open_reader(path, &tuning)?));
505 }
506 let readers = ReaderPool {
507 conns,
508 next: AtomicUsize::new(0),
509 };
510
511 Ok(Self {
512 writer: Mutex::new(conn),
513 readers,
514 seq_entity: AtomicI64::new(seq_entity),
515 seq_obs: AtomicI64::new(seq_obs),
516 cache: Mutex::new(LruCache::new(lru_cache_size)),
517 })
518 }
519
520 fn next_entity_id(&self) -> i64 {
521 self.seq_entity.fetch_add(1, Ordering::Relaxed) + 1
522 }
523
524 fn next_obs_id(&self) -> i64 {
525 self.seq_obs.fetch_add(1, Ordering::Relaxed) + 1
526 }
527
528 fn meta_get(&self, name: &str) -> Option<EntityMeta> {
529 self.cache.lock().get(name).copied()
530 }
531
532 fn meta_set(&self, name: &str, m: EntityMeta) {
533 self.cache.lock().put(name.to_string(), m);
534 }
535
536 fn meta_remove(&self, name: &str) {
537 self.cache.lock().pop(name);
538 }
539
540 fn meta_update(&self, name: &str, f: impl FnOnce(&mut EntityMeta)) {
541 let mut cache = self.cache.lock();
542 if let Some(m) = cache.get_mut(name) {
543 f(m);
544 }
545 }
546
547 fn get_entity_id(&self, conn: &Connection, name: &str) -> Result<Option<(i64, i64, i64, i64)>> {
548 if let Some(m) = self.meta_get(name) {
549 return Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)));
550 }
551 let h = name_hash(name);
552 let mut stmt = conn
553 .prepare_cached(
554 "SELECT id, type_id, obs_count, out_deg, in_deg
555 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
556 )
557 .map_err(sqlite_err)?;
558 match stmt.query_row(params![h, name], |row| {
559 Ok(EntityMeta {
560 id: row.get(0)?,
561 type_id: row.get(1)?,
562 obs_count: row.get(2)?,
563 out_deg: row.get(3)?,
564 in_deg: row.get(4)?,
565 })
566 }) {
567 Ok(m) => {
568 self.meta_set(name, m);
569 Ok(Some((m.id, m.type_id, m.out_deg, m.in_deg)))
570 }
571 Err(e) if is_not_found(&e) => Ok(None),
572 Err(e) => Err(sqlite_err(e)),
573 }
574 }
575
576 fn sync_seqs(&self, conn: &Connection) -> Result<()> {
577 let seq_e = self.seq_entity.load(Ordering::Relaxed);
578 let seq_o = self.seq_obs.load(Ordering::Relaxed);
579 conn.execute(
580 "UPDATE graph_stat SET value = CASE key WHEN 'entity_seq' THEN ?1 WHEN 'obs_seq' THEN ?2 ELSE value END
581 WHERE key IN ('entity_seq', 'obs_seq')",
582 params![seq_e, seq_o],
583 )
584 .map_err(sqlite_err)?;
585 Ok(())
586 }
587
588 pub fn get_entity(&self, name: &str) -> Result<Option<Entity>> {
591 if name.is_empty() {
592 return Ok(None);
593 }
594
595 if let Some(m) = self.meta_get(name) {
596 let conn = self.readers.get();
597 let etype = name_of_type(&conn, m.type_id).unwrap_or_default();
598 let observations = load_observations_opt(&conn, m.id);
599 return Ok(Some(Entity {
600 name: name.to_string(),
601 entity_type: etype,
602 observations,
603 }));
604 }
605
606 let conn = self.readers.get();
607 let h = name_hash(name);
608 let mut stmt = conn
609 .prepare_cached(
610 "SELECT e.id, e.type_id, e.name, t.name,
611 e.obs_count, e.out_deg, e.in_deg
612 FROM entity e
613 JOIN type_dict t ON t.id = e.type_id
614 WHERE e.name_hash = ?1 AND e.name = ?2 AND e.flags = 0",
615 )
616 .map_err(sqlite_err)?;
617 match stmt.query_row(params![h, name], |row| {
618 let id: i64 = row.get(0)?;
619 let type_id: i64 = row.get(1)?;
620 let ename: String = row.get(2)?;
621 let etype: String = row.get(3)?;
622 let obs_count: i64 = row.get(4)?;
623 let out_deg: i64 = row.get(5)?;
624 let in_deg: i64 = row.get(6)?;
625 Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg))
626 }) {
627 Ok((id, type_id, ename, etype, obs_count, out_deg, in_deg)) => {
628 let observations = load_observations_opt(&conn, id);
629 drop(stmt);
630 drop(conn);
631 self.meta_set(&ename, EntityMeta { id, type_id, obs_count, out_deg, in_deg });
632 Ok(Some(Entity {
633 name: ename,
634 entity_type: etype,
635 observations,
636 }))
637 }
638 Err(e) if is_not_found(&e) => Ok(None),
639 Err(e) => Err(sqlite_err(e)),
640 }
641 }
642
643 pub fn create_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
644 let conn = self.writer.lock();
645 let tx = TxGuard::begin(&conn)?;
646
647 let mut ins_ent = conn
648 .prepare_cached(
649 "INSERT INTO entity (id, name_hash, name, type_id, obs_count, out_deg, in_deg, created_us, updated_us, flags)
650 SELECT ?1, ?2, ?3, ?4, ?5, 0, 0, ?6, ?6, 0
651 WHERE NOT EXISTS (SELECT 1 FROM entity WHERE name_hash = ?2 AND name = ?3 AND flags = 0)",
652 )
653 .map_err(sqlite_err)?;
654
655 let mut ins_fts = conn
656 .prepare_cached("INSERT INTO name_fts (rowid, name) VALUES (?1, ?2)")
657 .map_err(sqlite_err)?;
658
659 let batch_ts = now_us();
660 let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
661 let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
662 let mut total_entities: i64 = 0;
663 let mut total_obs: i64 = 0;
664 let mut created = Vec::new();
665 let mut created_metas: Vec<(String, EntityMeta)> = Vec::new();
666 let mut obs_sql = String::new();
667
668 for entity in entities {
669 if entity.name.is_empty() {
670 continue;
671 }
672 let h = name_hash(&entity.name);
673 let id = self.next_entity_id();
674 let type_id = match type_cache.get(entity.entity_type.as_str()) {
675 Some(t) => *t,
676 None => {
677 let t = get_type_id(&conn, &entity.entity_type, 0)?;
678 type_cache.insert(entity.entity_type.clone(), t);
679 t
680 }
681 };
682 let obs_count = entity.observations.len() as i64;
683
684 let changed = ins_ent
685 .execute(params![id, h, entity.name, type_id, obs_count, batch_ts])
686 .map_err(sqlite_err)?;
687 if changed == 0 {
688 continue;
689 }
690
691 let n = entity.observations.len();
692 if n > 0 {
693 obs_sql.clear();
694
695 let mut oids = Vec::with_capacity(n);
696 let mut idxs = Vec::with_capacity(n);
697 for _ in 0..n {
698 oids.push(self.next_obs_id());
699 }
700 for i in 0..n as i64 {
701 idxs.push(i);
702 }
703
704 obs_sql.push_str("INSERT INTO observation (id,entity_id,idx,body,created_us) VALUES");
705 for i in 0..n {
706 if i > 0 { obs_sql.push(','); }
707 obs_sql.push_str("(?,?,?,?,?)");
708 }
709
710 let mut obs_params: Vec<&dyn ToSql> = Vec::with_capacity(n * 5);
711 for (i, obs) in entity.observations.iter().enumerate() {
712 obs_params.push(&oids[i]);
713 obs_params.push(&id);
714 obs_params.push(&idxs[i]);
715 obs_params.push(obs);
716 obs_params.push(&batch_ts);
717 }
718
719 conn.execute(&obs_sql, obs_params.as_slice())
720 .map_err(sqlite_err)?;
721 }
722
723 ins_fts
724 .execute(params![id, entity.name])
725 .map_err(sqlite_err)?;
726
727 *type_deltas.entry(type_id).or_insert(0) += 1;
728 total_entities += 1;
729 total_obs += obs_count;
730
731 created.push(entity.clone());
732 created_metas.push((entity.name.clone(), EntityMeta {
733 id,
734 type_id,
735 obs_count,
736 out_deg: 0,
737 in_deg: 0,
738 }));
739 }
740
741 if total_entities > 0 {
742 for (type_id, delta) in &type_deltas {
743 inc_type_count(&conn, *type_id, *delta)?;
744 }
745 inc_graph_stat(&conn, "entities", total_entities)?;
746 inc_graph_stat(&conn, "observations", total_obs)?;
747 self.sync_seqs(&conn)?;
748 }
749
750 tx.commit()?;
751
752 if !created_metas.is_empty() {
757 let mut cache = self.cache.lock();
758 for (name, meta) in &created_metas {
759 cache.put(name.clone(), *meta);
760 }
761 }
762
763 Ok(created)
764 }
765
766 pub fn delete_entities(&self, names: &[String]) -> Result<()> {
767 if names.is_empty() {
768 return Ok(());
769 }
770 let conn = self.writer.lock();
771
772 let mut resolved: Vec<(i64, i64, String)> = Vec::with_capacity(names.len());
774 let mut sel = conn
775 .prepare_cached(
776 "SELECT id, type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
777 )
778 .map_err(sqlite_err)?;
779 for name in names {
780 let h = name_hash(name);
781 let (id, type_id) = match sel.query_row(params![h, name], |row| {
782 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
783 }) {
784 Ok(v) => v,
785 Err(e) if is_not_found(&e) => continue,
786 Err(e) => return Err(sqlite_err(e)),
787 };
788 resolved.push((id, type_id, name.clone()));
789 }
790
791 if resolved.is_empty() {
792 return Ok(());
793 }
794
795 let ids: Vec<i64> = resolved.iter().map(|(id, _, _)| *id).collect();
796 let n = ids.len();
797
798 let obs_p: Vec<String> = (0..n).map(|i| format!("?{}", i + 1)).collect();
800 let obs_sql = format!(
801 "DELETE FROM observation WHERE entity_id IN ({})",
802 obs_p.join(",")
803 );
804 let obs_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
805 let obs_deleted = conn
806 .execute(&obs_sql, obs_refs.as_slice())
807 .map_err(sqlite_err)? as i64;
808
809 let rel_sql = format!(
811 "DELETE FROM relation WHERE from_id IN ({}) OR to_id IN ({})",
812 obs_p.join(","),
813 obs_p.join(",")
814 );
815 let rel_refs: Vec<&dyn ToSql> = ids.iter().map(|id| id as &dyn ToSql).collect();
816 let rel_deleted = conn
817 .execute(&rel_sql, rel_refs.as_slice())
818 .map_err(sqlite_err)? as i64;
819
820 let fts_values: Vec<String> = (0..n)
822 .map(|_| "('delete', ?, '')".to_string())
823 .collect();
824 let fts_sql = format!(
825 "INSERT INTO name_fts(name_fts, rowid, name) VALUES {}",
826 fts_values.join(", ")
827 );
828 conn.execute(&fts_sql, rusqlite::params_from_iter(&ids))
829 .map_err(sqlite_err)?;
830
831 let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
833 for &(_, type_id, _) in &resolved {
834 *type_deltas.entry(type_id).or_insert(0) += 1;
835 }
836
837 if !type_deltas.is_empty() {
839 let m = type_deltas.len();
840 let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
841 let type_vals: Vec<i64> = type_deltas.values().map(|v| -*v).collect();
842 let mut case_parts: Vec<String> = Vec::with_capacity(m);
843 let mut id_parts: Vec<String> = Vec::with_capacity(m);
844 for i in 0..m {
845 case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
846 id_parts.push(format!("?{}", i + 1));
847 }
848 let sql = format!(
849 "UPDATE type_dict SET count = MAX(0, count + CASE id {} ELSE 0 END) WHERE id IN ({})",
850 case_parts.join(" "),
851 id_parts.join(","),
852 );
853 let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
854 for id in &type_keys {
855 params.push(Box::new(*id));
856 }
857 for delta in &type_vals {
858 params.push(Box::new(*delta));
859 }
860 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
861 conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
862 }
863
864 conn.execute(
866 &format!("DELETE FROM entity WHERE id IN ({})", obs_p.join(",")),
867 ids.iter().map(|id| id as &dyn ToSql).collect::<Vec<_>>().as_slice(),
868 )
869 .map_err(sqlite_err)?;
870
871 inc_graph_stat(&conn, "entities", -(n as i64))?;
873 inc_graph_stat(&conn, "observations", -obs_deleted)?;
874 inc_graph_stat(&conn, "relations", -rel_deleted)?;
875
876 for (_, _, name) in &resolved {
878 self.meta_remove(name);
879 }
880
881 Ok(())
882 }
883
884 #[cfg(feature = "code")]
889 pub fn code_purge_file(&self, rel_path: &str) -> Result<usize> {
890 let defines = self.search_relations(Some(rel_path), None, Some("defines"));
891 let mut names: Vec<String> = defines.into_iter().map(|r| r.to).collect();
892 names.push(rel_path.to_string());
893 let n = names.len();
894 self.delete_entities(&names)?;
895 Ok(n)
896 }
897
898 pub fn create_relations(&self, relations: &[Relation]) -> Result<Vec<Relation>> {
899 let conn = self.writer.lock();
900 let tx = TxGuard::begin(&conn)?;
901
902 let mut ins = conn
903 .prepare_cached(
904 "INSERT INTO relation (from_id, to_id, type_id, created_us)
905 SELECT ?1, ?2, ?3, ?4
906 WHERE NOT EXISTS (SELECT 1 FROM relation WHERE from_id = ?1 AND to_id = ?2 AND type_id = ?3)",
907 )
908 .map_err(sqlite_err)?;
909
910 let ts = now_us();
911 let mut type_cache: FxHashMap<String, i64> = FxHashMap::default();
912 let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
913 let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
914 let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
915 let mut total_relations: i64 = 0;
916 let mut created = Vec::new();
917
918 for rel in relations {
919 let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
920 Some(v) => v,
921 None => continue,
922 };
923 let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
924 Some(v) => v,
925 None => continue,
926 };
927 let type_id = match type_cache.get(rel.relation_type.as_str()) {
928 Some(t) => *t,
929 None => {
930 let t = get_type_id(&conn, &rel.relation_type, 1)?;
931 type_cache.insert(rel.relation_type.clone(), t);
932 t
933 }
934 };
935
936 let changed = ins
937 .execute(params![from_id, to_id, type_id, ts])
938 .map_err(sqlite_err)?;
939 if changed == 0 {
940 continue;
941 }
942
943 *out_deltas.entry(from_id).or_insert(0) += 1;
944 *in_deltas.entry(to_id).or_insert(0) += 1;
945 *type_deltas.entry(type_id).or_insert(0) += 1;
946 total_relations += 1;
947
948 created.push(rel.clone());
949 }
950
951 if total_relations > 0 {
952 for (id, delta) in &out_deltas {
953 conn.execute(
954 "UPDATE entity SET out_deg = out_deg + ?1 WHERE id = ?2",
955 params![delta, id],
956 )
957 .map_err(sqlite_err)?;
958 }
959 for (id, delta) in in_deltas {
960 conn.execute(
961 "UPDATE entity SET in_deg = in_deg + ?1 WHERE id = ?2",
962 params![delta, id],
963 )
964 .map_err(sqlite_err)?;
965 }
966 for (type_id, delta) in &type_deltas {
967 inc_type_count(&conn, *type_id, *delta)?;
968 }
969 inc_graph_stat(&conn, "relations", total_relations)?;
970 }
971
972 tx.commit()?;
973
974 if !created.is_empty() {
977 let mut cache = self.cache.lock();
978 for rel in &created {
979 if let Some(m) = cache.get_mut(&rel.from) {
980 m.out_deg += 1;
981 }
982 if let Some(m) = cache.get_mut(&rel.to) {
983 m.in_deg += 1;
984 }
985 }
986 }
987
988 Ok(created)
989 }
990
991 pub fn delete_relations(&self, relations: &[Relation]) -> Result<()> {
992 if relations.is_empty() {
993 return Ok(());
994 }
995 let conn = self.writer.lock();
996
997 let mut triples: Vec<(i64, i64, i64)> = Vec::with_capacity(relations.len());
999 let mut names: Vec<(String, String)> = Vec::with_capacity(relations.len());
1000 for rel in relations {
1001 let (from_id, _, _, _) = match self.get_entity_id(&conn, &rel.from)? {
1002 Some(v) => v,
1003 None => continue,
1004 };
1005 let (to_id, _, _, _) = match self.get_entity_id(&conn, &rel.to)? {
1006 Some(v) => v,
1007 None => continue,
1008 };
1009 let type_id = match get_type_id(&conn, &rel.relation_type, 1) {
1010 Ok(id) => id,
1011 Err(_) => continue,
1012 };
1013 triples.push((from_id, to_id, type_id));
1014 names.push((rel.from.clone(), rel.to.clone()));
1015 }
1016
1017 if triples.is_empty() {
1018 return Ok(());
1019 }
1020
1021 let mut sql = String::from(
1023 "DELETE FROM relation WHERE (from_id, to_id, type_id) IN (",
1024 );
1025 for (i, _) in triples.iter().enumerate() {
1026 if i > 0 {
1027 sql.push_str(", ");
1028 }
1029 let base = (i * 3) + 1;
1030 sql.push_str(&format!("SELECT ?{b}, ?{bp1}, ?{bp2}", b = base, bp1 = base + 1, bp2 = base + 2));
1031 }
1032 sql.push(')');
1033
1034 let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(triples.len() * 3);
1035 for &(f, t, tp) in &triples {
1036 param_values.push(Box::new(f));
1037 param_values.push(Box::new(t));
1038 param_values.push(Box::new(tp));
1039 }
1040 let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1041 let total = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1042 if total == 0 {
1043 return Ok(());
1044 }
1045
1046 let mut out_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1048 let mut in_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1049 let mut type_deltas: FxHashMap<i64, i64> = FxHashMap::default();
1050 for &(from_id, to_id, type_id) in &triples {
1051 *out_deltas.entry(from_id).or_insert(0) += 1;
1052 *in_deltas.entry(to_id).or_insert(0) += 1;
1053 *type_deltas.entry(type_id).or_insert(0) += 1;
1054 }
1055
1056 let out_keys: Vec<i64> = out_deltas.keys().cloned().collect();
1058 let out_vals: Vec<i64> = out_deltas.values().cloned().collect();
1059 if !out_keys.is_empty() {
1060 let m = out_keys.len();
1061 let mut case_parts: Vec<String> = Vec::with_capacity(m);
1062 let mut id_parts: Vec<String> = Vec::with_capacity(m);
1063 for i in 0..m {
1064 case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1065 id_parts.push(format!("?{}", i + 1));
1066 }
1067 let sql = format!(
1068 "UPDATE entity SET out_deg = MAX(0, out_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1069 case_parts.join(" "),
1070 id_parts.join(","),
1071 );
1072 let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1073 for id in &out_keys {
1074 params.push(Box::new(*id));
1075 }
1076 for delta in &out_vals {
1077 params.push(Box::new(*delta));
1078 }
1079 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1080 conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1081 }
1082
1083 let in_keys: Vec<i64> = in_deltas.keys().cloned().collect();
1085 let in_vals: Vec<i64> = in_deltas.values().cloned().collect();
1086 if !in_keys.is_empty() {
1087 let m = in_keys.len();
1088 let mut case_parts: Vec<String> = Vec::with_capacity(m);
1089 let mut id_parts: Vec<String> = Vec::with_capacity(m);
1090 for i in 0..m {
1091 case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1092 id_parts.push(format!("?{}", i + 1));
1093 }
1094 let sql = format!(
1095 "UPDATE entity SET in_deg = MAX(0, in_deg - CASE id {} ELSE 0 END) WHERE id IN ({})",
1096 case_parts.join(" "),
1097 id_parts.join(","),
1098 );
1099 let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1100 for id in &in_keys {
1101 params.push(Box::new(*id));
1102 }
1103 for delta in &in_vals {
1104 params.push(Box::new(*delta));
1105 }
1106 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1107 conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1108 }
1109
1110 let type_keys: Vec<i64> = type_deltas.keys().cloned().collect();
1112 let type_vals: Vec<i64> = type_deltas.values().cloned().collect();
1113 if !type_keys.is_empty() {
1114 let m = type_keys.len();
1115 let mut case_parts: Vec<String> = Vec::with_capacity(m);
1116 let mut id_parts: Vec<String> = Vec::with_capacity(m);
1117 for i in 0..m {
1118 case_parts.push(format!("WHEN ?{} THEN ?{}", i + 1, m + i + 1));
1119 id_parts.push(format!("?{}", i + 1));
1120 }
1121 let sql = format!(
1122 "UPDATE type_dict SET count = MAX(0, count - CASE id {} ELSE 0 END) WHERE id IN ({})",
1123 case_parts.join(" "),
1124 id_parts.join(","),
1125 );
1126 let mut params: Vec<Box<dyn ToSql>> = Vec::with_capacity(2 * m);
1127 for id in &type_keys {
1128 params.push(Box::new(*id));
1129 }
1130 for delta in &type_vals {
1131 params.push(Box::new(*delta));
1132 }
1133 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
1134 conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)?;
1135 }
1136
1137 inc_graph_stat(&conn, "relations", -(total as i64))?;
1138
1139 for (from, to) in &names {
1142 self.meta_update(from, |m| m.out_deg = m.out_deg.saturating_sub(1));
1143 self.meta_update(to, |m| m.in_deg = m.in_deg.saturating_sub(1));
1144 }
1145
1146 Ok(())
1147 }
1148
1149 pub fn add_observations(&self, entity_name: &str, contents: &[String]) -> Result<Vec<String>> {
1150 let conn = self.writer.lock();
1151 let (id, _type_id, _, _) = match self.get_entity_id(&conn, entity_name)? {
1152 Some(v) => v,
1153 None => {
1154 return Err(MCSError::InvalidParams(format!(
1155 "Entity '{entity_name}' not found"
1156 )))
1157 }
1158 };
1159
1160 let mut max_idx: i64 = conn
1161 .query_row(
1162 "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1163 params![id],
1164 |row| row.get(0),
1165 )
1166 .map_err(sqlite_err)?;
1167
1168 let ts = now_us();
1169 let mut ins_obs = conn
1170 .prepare_cached(
1171 "INSERT INTO observation (id, entity_id, idx, body, created_us) VALUES (?1, ?2, ?3, ?4, ?5)",
1172 )
1173 .map_err(sqlite_err)?;
1174
1175 for content in contents {
1176 max_idx += 1;
1177 let oid = self.next_obs_id();
1178 ins_obs
1179 .execute(params![oid, id, max_idx, content, ts])
1180 .map_err(sqlite_err)?;
1181 }
1182 let added = contents.to_vec();
1183
1184 let count: i64 = contents.len() as i64;
1185 conn.execute(
1186 "UPDATE entity SET obs_count = obs_count + ?1, updated_us = ?2 WHERE id = ?3",
1187 params![count, ts, id],
1188 )
1189 .map_err(sqlite_err)?;
1190
1191 inc_graph_stat(&conn, "observations", count)?;
1192 self.sync_seqs(&conn)?;
1193
1194 self.meta_update(entity_name, |m| m.obs_count += count);
1195
1196 Ok(added)
1197 }
1198
1199 pub fn delete_observations(&self, entity_name: &str, observations: &[String]) -> Result<()> {
1200 if observations.is_empty() {
1201 return Ok(());
1202 }
1203 let conn = self.writer.lock();
1204 let (id, _, _, _) = match self.get_entity_id(&conn, entity_name)? {
1205 Some(v) => v,
1206 None => {
1207 return Err(MCSError::InvalidParams(format!(
1208 "Entity '{entity_name}' not found"
1209 )))
1210 }
1211 };
1212
1213 let placeholders: Vec<String> = (0..observations.len())
1214 .map(|i| format!("?{}", i + 2))
1215 .collect();
1216 let sql = format!(
1217 "DELETE FROM observation WHERE entity_id = ?1 AND body IN ({})",
1218 placeholders.join(",")
1219 );
1220
1221 let mut param_values: Vec<Box<dyn ToSql>> = Vec::with_capacity(1 + observations.len());
1222 param_values.push(Box::new(id));
1223 for obs in observations {
1224 param_values.push(Box::new(obs.as_str()));
1225 }
1226 let param_refs: Vec<&dyn ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
1227 let removed = conn.execute(&sql, param_refs.as_slice()).map_err(sqlite_err)? as i64;
1228
1229 if removed > 0 {
1230 conn.execute(
1231 "UPDATE entity SET obs_count = MAX(0, obs_count - ?1), updated_us = ?2 WHERE id = ?3",
1232 params![removed, now_us(), id],
1233 )
1234 .map_err(sqlite_err)?;
1235 inc_graph_stat(&conn, "observations", -removed)?;
1236
1237 self.meta_update(entity_name, |m| m.obs_count = m.obs_count.saturating_sub(removed));
1238 }
1239
1240 Ok(())
1241 }
1242
1243 pub fn upsert_entities(&self, entities: &[Entity]) -> Result<Vec<Entity>> {
1244 let mut results = Vec::new();
1245 for entity in entities {
1246 if let Some(existing) = self.get_entity(&entity.name)? {
1247 if existing.entity_type != entity.entity_type {
1249 let conn = self.writer.lock();
1250 let old_type_id = conn
1251 .query_row(
1252 "SELECT type_id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1253 params![name_hash(&entity.name), entity.name],
1254 |row| row.get::<_, i64>(0),
1255 )
1256 .map_err(sqlite_err)?;
1257 let new_type_id = get_type_id(&conn, &entity.entity_type, 0)?;
1258 inc_type_count(&conn, old_type_id, -1)?;
1259 inc_type_count(&conn, new_type_id, 1)?;
1260 conn.execute(
1261 "UPDATE entity SET type_id = ?1, updated_us = ?2 WHERE name_hash = ?3 AND name = ?4",
1262 params![new_type_id, now_us(), name_hash(&entity.name), entity.name],
1263 )
1264 .map_err(sqlite_err)?;
1265 self.meta_remove(&entity.name);
1267 }
1268 let existing_set: HashSet<&str> =
1270 existing.observations.iter().map(|s| s.as_str()).collect();
1271 let to_add: Vec<String> = entity
1272 .observations
1273 .iter()
1274 .filter(|o| !existing_set.contains(o.as_str()))
1275 .cloned()
1276 .collect();
1277 if !to_add.is_empty() {
1278 self.add_observations(&entity.name, &to_add)?;
1279 }
1280 let updated = self
1281 .get_entity(&entity.name)?
1282 .unwrap_or(entity.clone());
1283 results.push(updated);
1284 } else {
1285 let c = self.create_entities(std::slice::from_ref(entity))?;
1286 if let Some(e) = c.into_iter().next() {
1287 results.push(e);
1288 }
1289 }
1290 }
1291 Ok(results)
1292 }
1293
1294 pub fn merge_entities(&self, source: &str, target: &str) -> Result<Entity> {
1295 let conn = self.writer.lock();
1296 let (src_id, _, _, _) = match self.get_entity_id(&conn, source)? {
1297 Some(v) => v,
1298 None => {
1299 return Err(MCSError::InvalidParams(format!(
1300 "Source entity '{source}' not found"
1301 )))
1302 }
1303 };
1304 let (tgt_id, _, _, _) = match self.get_entity_id(&conn, target)? {
1305 Some(v) => v,
1306 None => {
1307 return Err(MCSError::InvalidParams(format!(
1308 "Target entity '{target}' not found"
1309 )))
1310 }
1311 };
1312
1313 if src_id == tgt_id {
1314 return self.get_entity(target)?.ok_or_else(|| {
1315 MCSError::InvalidParams("Target entity not found after merge".into())
1316 });
1317 }
1318
1319 let mut obs_count: i64 = 0;
1321 {
1322 let mut max_idx: i64 = conn
1323 .query_row(
1324 "SELECT COALESCE(MAX(idx), -1) FROM observation WHERE entity_id = ?1",
1325 params![tgt_id],
1326 |row| row.get(0),
1327 )
1328 .map_err(sqlite_err)?;
1329 let mut sel_obs = conn
1330 .prepare_cached(
1331 "SELECT id, body FROM observation WHERE entity_id = ?1 ORDER BY idx",
1332 )
1333 .map_err(sqlite_err)?;
1334 let mut upd_obs = conn
1335 .prepare_cached("UPDATE observation SET entity_id = ?1, idx = ?2 WHERE id = ?3")
1336 .map_err(sqlite_err)?;
1337 let rows: Vec<(i64, String)> = sel_obs
1338 .query_map(params![src_id], |row| {
1339 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
1340 })
1341 .map_err(sqlite_err)?
1342 .filter_map(|r| r.ok())
1343 .collect();
1344 for (oid, _body) in &rows {
1345 max_idx += 1;
1346 upd_obs
1347 .execute(params![tgt_id, max_idx, oid])
1348 .map_err(sqlite_err)?;
1349 obs_count += 1;
1350 }
1351 }
1352
1353 conn.execute(
1355 "UPDATE OR IGNORE relation SET from_id = ?1 WHERE from_id = ?2",
1356 params![tgt_id, src_id],
1357 )
1358 .map_err(sqlite_err)?;
1359 conn.execute(
1360 "UPDATE OR IGNORE relation SET to_id = ?1 WHERE to_id = ?2",
1361 params![tgt_id, src_id],
1362 )
1363 .map_err(sqlite_err)?;
1364 conn.execute("DELETE FROM relation WHERE from_id = ?1", params![src_id])
1368 .map_err(sqlite_err)?;
1369 conn.execute("DELETE FROM relation WHERE to_id = ?1", params![src_id])
1370 .map_err(sqlite_err)?;
1371
1372 let out_add: i64 = conn
1374 .query_row(
1375 "SELECT COUNT(*) FROM relation WHERE from_id = ?1",
1376 params![tgt_id],
1377 |row| row.get(0),
1378 )
1379 .map_err(sqlite_err)?;
1380 let in_add: i64 = conn
1381 .query_row(
1382 "SELECT COUNT(*) FROM relation WHERE to_id = ?1",
1383 params![tgt_id],
1384 |row| row.get(0),
1385 )
1386 .map_err(sqlite_err)?;
1387 conn.execute(
1388 "UPDATE entity SET out_deg = ?1, in_deg = ?2, obs_count = obs_count + ?3, updated_us = ?4 WHERE id = ?5",
1389 params![out_add, in_add, obs_count, now_us(), tgt_id],
1390 )
1391 .map_err(sqlite_err)?;
1392
1393 conn.execute(
1395 "INSERT INTO name_fts(name_fts, rowid, name) VALUES ('delete', ?1, '')",
1396 params![src_id],
1397 )
1398 .map_err(sqlite_err)?;
1399 conn.execute("DELETE FROM entity WHERE id = ?1", params![src_id])
1400 .map_err(sqlite_err)?;
1401
1402 inc_graph_stat(&conn, "entities", -1)?;
1403 self.meta_remove(source);
1404
1405 if let Ok(meta) = conn.query_row(
1407 "SELECT id, type_id, obs_count, out_deg, in_deg FROM entity WHERE id = ?1",
1408 params![tgt_id],
1409 |row| {
1410 Ok(EntityMeta {
1411 id: row.get(0)?,
1412 type_id: row.get(1)?,
1413 obs_count: row.get(2)?,
1414 out_deg: row.get(3)?,
1415 in_deg: row.get(4)?,
1416 })
1417 },
1418 ) {
1419 self.meta_set(target, meta);
1420 }
1421
1422 let (name, etype): (String, String) = conn
1423 .query_row(
1424 "SELECT e.name, t.name FROM entity e JOIN type_dict t ON t.id = e.type_id WHERE e.id = ?1",
1425 params![tgt_id],
1426 |row| Ok((row.get(0)?, row.get(1)?)),
1427 )
1428 .map_err(sqlite_err)?;
1429 let observations = load_observations_opt(&conn, tgt_id);
1430
1431 Ok(Entity {
1432 name,
1433 entity_type: etype,
1434 observations,
1435 })
1436 }
1437
1438 pub fn search_nodes_filtered(
1439 &self,
1440 query: &str,
1441 filter_type: Option<&str>,
1442 offset: usize,
1443 limit: usize,
1444 ) -> Vec<Entity> {
1445 if query.is_empty() {
1446 return Vec::new();
1447 }
1448 let conn = self.readers.get();
1449
1450 let mut entity_ids: Vec<i64> = Vec::new();
1452 let mut seen: HashSet<i64> = HashSet::new();
1453
1454 if let Ok(mut stmt) = conn.prepare(
1455 "SELECT rowid FROM name_fts WHERE name_fts MATCH ?1 ORDER BY rank LIMIT ?2",
1456 ) {
1457 let limit_i64 = (limit + offset) as i64;
1458 if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1459 row.get::<_, i64>(0)
1460 }) {
1461 for row in rows.flatten() {
1462 if seen.insert(row) {
1463 entity_ids.push(row);
1464 }
1465 }
1466 }
1467 }
1468
1469 if let Ok(mut stmt) = conn.prepare(
1470 "SELECT entity_id FROM obs_fts JOIN observation ON obs_fts.rowid = observation.id
1471 WHERE obs_fts MATCH ?1
1472 GROUP BY entity_id
1473 LIMIT ?2",
1474 ) {
1475 let limit_i64 = (limit + offset) as i64;
1476 if let Ok(rows) = stmt.query_map(params![query, limit_i64], |row| {
1477 row.get::<_, i64>(0)
1478 }) {
1479 for row in rows.flatten() {
1480 if seen.insert(row) {
1481 entity_ids.push(row);
1482 }
1483 }
1484 }
1485 }
1486
1487 let mut results = Vec::new();
1489 let mut count: usize = 0;
1490 for eid in entity_ids {
1491 if let Ok(entity) = entity_by_id(&conn, eid) {
1492 if let Some(ft) = filter_type
1493 && !ft.is_empty() && entity.entity_type != ft {
1494 continue;
1495 }
1496 if count < offset {
1497 count += 1;
1498 continue;
1499 }
1500 if results.len() >= limit {
1501 break;
1502 }
1503 results.push(entity);
1504 count += 1;
1505 }
1506 }
1507
1508 results
1509 }
1510
1511 pub fn read_graph_filtered(
1512 &self,
1513 filter_type: Option<&str>,
1514 offset: usize,
1515 limit: usize,
1516 ) -> Result<String> {
1517 let conn = self.readers.get();
1518
1519 let limit_sql: i64 = if limit == usize::MAX {
1520 -1
1521 } else {
1522 limit.min(i64::MAX as usize) as i64
1523 };
1524 let offset_sql: i64 = offset as i64;
1525
1526 let filter = filter_type.filter(|ft| !ft.is_empty());
1532 let ids: Vec<i64> = if let Some(ft) = filter {
1533 let mut stmt = conn
1534 .prepare_cached(
1535 "SELECT e.id FROM entity e
1536 WHERE e.type_id = (SELECT id FROM type_dict WHERE kind = 0 AND name = ?1)
1537 AND e.flags = 0
1538 ORDER BY e.id LIMIT ?2 OFFSET ?3",
1539 )
1540 .map_err(sqlite_err)?;
1541 stmt.query_map(params![ft, limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1542 .map_err(sqlite_err)?
1543 .filter_map(|r| r.ok())
1544 .collect()
1545 } else {
1546 let mut stmt = conn
1547 .prepare_cached(
1548 "SELECT e.id FROM entity e WHERE e.flags = 0
1549 ORDER BY e.id LIMIT ?1 OFFSET ?2",
1550 )
1551 .map_err(sqlite_err)?;
1552 stmt.query_map(params![limit_sql, offset_sql], |r| r.get::<_, i64>(0))
1553 .map_err(sqlite_err)?
1554 .filter_map(|r| r.ok())
1555 .collect()
1556 };
1557
1558 if ids.is_empty() {
1559 return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
1560 }
1561
1562 let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
1563
1564 let entities_json: String = {
1565 let sql = format!(
1566 "SELECT COALESCE(json_group_array(json_object(
1567 'name', e.name,
1568 'entityType', t.name,
1569 'observations', COALESCE((
1570 SELECT json_group_array(o.body ORDER BY o.idx)
1571 FROM observation o WHERE o.entity_id = e.id
1572 ), json('[]'))
1573 ) ORDER BY e.id), json('[]'))
1574 FROM entity e
1575 JOIN type_dict t ON t.id = e.type_id
1576 WHERE e.id IN ({placeholders}) AND e.flags = 0"
1577 );
1578 conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
1579 row.get::<_, String>(0)
1580 })
1581 .map_err(sqlite_err)?
1582 };
1583
1584 let relations_json: String = {
1585 let sql = format!(
1586 "SELECT COALESCE(json_group_array(json_object(
1587 'from', e1.name,
1588 'to', e2.name,
1589 'relationType', t.name
1590 )), json('[]'))
1591 FROM relation r
1592 JOIN entity e1 ON e1.id = r.from_id
1593 JOIN entity e2 ON e2.id = r.to_id
1594 JOIN type_dict t ON t.id = r.type_id
1595 WHERE r.from_id IN ({placeholders}) AND r.to_id IN ({placeholders})
1596 AND e1.flags = 0 AND e2.flags = 0"
1597 );
1598 let all_params: Vec<&dyn ToSql> = ids
1599 .iter()
1600 .map(|id| id as &dyn ToSql)
1601 .chain(ids.iter().map(|id| id as &dyn ToSql))
1602 .collect();
1603 conn.query_row(&sql, all_params.as_slice(), |row| row.get::<_, String>(0))
1604 .map_err(sqlite_err)?
1605 };
1606
1607 let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1608 out.push_str("{\"entities\":");
1609 out.push_str(&entities_json);
1610 out.push_str(",\"relations\":");
1611 out.push_str(&relations_json);
1612 out.push('}');
1613 Ok(out)
1614 }
1615
1616 pub fn open_nodes(&self, names: &[String]) -> String {
1617 let conn = self.readers.get();
1618 let mut entity_ids: Vec<i64> = Vec::new();
1619
1620 for name in names {
1621 let h = name_hash(name);
1622 if let Ok(Some(id)) = conn
1623 .query_row(
1624 "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1625 params![h, name],
1626 |row| row.get::<_, i64>(0),
1627 )
1628 .map(Some)
1629 .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
1630 {
1631 entity_ids.push(id);
1632 }
1633 }
1634
1635 if entity_ids.is_empty() {
1636 return r#"{"entities":[],"relations":[]}"#.to_string();
1637 }
1638
1639 let placeholders: Vec<String> = entity_ids.iter().map(|_| "?".to_string()).collect();
1640 let ids_str = placeholders.join(",");
1641
1642 let entities_json: String = {
1643 let sql = format!(
1644 "SELECT COALESCE(json_group_array(json_object(
1645 'name', e.name,
1646 'entityType', t.name,
1647 'observations', COALESCE((
1648 SELECT json_group_array(o.body ORDER BY o.idx)
1649 FROM observation o WHERE o.entity_id = e.id
1650 ), json('[]'))
1651 ) ORDER BY e.id), json('[]'))
1652 FROM entity e
1653 JOIN type_dict t ON t.id = e.type_id
1654 WHERE e.id IN ({ids_str}) AND e.flags = 0"
1655 );
1656 conn.query_row(&sql, rusqlite::params_from_iter(&entity_ids), |row| {
1657 row.get::<_, String>(0)
1658 })
1659 .unwrap_or_else(|_| "[]".to_string())
1660 };
1661
1662 let relations_json: String = {
1663 let sql = format!(
1664 "SELECT COALESCE(json_group_array(json_object(
1665 'from', e1.name,
1666 'to', e2.name,
1667 'relationType', t.name
1668 )), json('[]'))
1669 FROM relation r
1670 JOIN entity e1 ON e1.id = r.from_id
1671 JOIN entity e2 ON e2.id = r.to_id
1672 JOIN type_dict t ON t.id = r.type_id
1673 WHERE (r.from_id IN ({ids_str}) OR r.to_id IN ({ids_str}))
1674 AND e1.flags = 0 AND e2.flags = 0"
1675 );
1676 let all_params: Vec<&dyn rusqlite::types::ToSql> = entity_ids
1677 .iter()
1678 .map(|id| id as &dyn rusqlite::types::ToSql)
1679 .chain(entity_ids.iter().map(|id| id as &dyn rusqlite::types::ToSql))
1680 .collect();
1681 let mut stmt = conn.prepare(&sql).unwrap();
1682 stmt.query_row(all_params.as_slice(), |row| row.get::<_, String>(0))
1683 .unwrap_or_else(|_| "[]".to_string())
1684 };
1685
1686 let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
1687 out.push_str("{\"entities\":");
1688 out.push_str(&entities_json);
1689 out.push_str(",\"relations\":");
1690 out.push_str(&relations_json);
1691 out.push('}');
1692 out
1693 }
1694
1695 pub fn entities_exist(&self, names: &[String]) -> Result<Vec<bool>> {
1696 let conn = self.readers.get();
1697 let mut results = Vec::with_capacity(names.len());
1698 for name in names {
1699 let h = name_hash(name);
1700 let exists: bool = conn
1701 .query_row(
1702 "SELECT 1 FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
1703 params![h, name],
1704 |_| Ok(()),
1705 )
1706 .is_ok();
1707 results.push(exists);
1708 }
1709 Ok(results)
1710 }
1711
1712 pub fn degree(&self, name: &str, direction: Direction) -> Result<usize> {
1713 let conn = self.readers.get();
1714 let (_, _, out_d, in_d) = match self.get_entity_id(&conn, name)? {
1715 Some(v) => v,
1716 None => {
1717 return Err(MCSError::InvalidParams(format!(
1718 "Entity '{name}' not found"
1719 )))
1720 }
1721 };
1722 Ok(match direction {
1723 Direction::Outgoing => out_d as usize,
1724 Direction::Incoming => in_d as usize,
1725 Direction::Both => (out_d + in_d) as usize,
1726 })
1727 }
1728
1729 pub fn get_entity_count(&self) -> Result<usize> {
1730 let conn = self.readers.get();
1731 read_graph_stat(&conn, "entities")
1732 .map(|v| v as usize)
1733 .map_err(|_| MCSError::MemoryError("Failed to read entity count".into()))
1734 }
1735
1736 pub fn get_relation_count(&self) -> Result<usize> {
1737 let conn = self.readers.get();
1738 read_graph_stat(&conn, "relations")
1739 .map(|v| v as usize)
1740 .map_err(|_| MCSError::MemoryError("Failed to read relation count".into()))
1741 }
1742
1743 pub fn search_relations(
1744 &self,
1745 from: Option<&str>,
1746 to: Option<&str>,
1747 rtype: Option<&str>,
1748 ) -> Vec<Relation> {
1749 let conn = self.readers.get();
1750 let mut results = Vec::new();
1751
1752 let from_id = from
1758 .filter(|f| !f.is_empty())
1759 .map(|f| entity_name_lookup(&conn, f).ok().flatten().unwrap_or(-1));
1760 let to_id = to
1761 .filter(|t| !t.is_empty())
1762 .map(|t| entity_name_lookup(&conn, t).ok().flatten().unwrap_or(-1));
1763 let type_id = rtype
1764 .filter(|rt| !rt.is_empty())
1765 .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));
1766
1767 match (from_id, to_id, type_id) {
1768 (Some(fid), Some(tid), Some(tpid)) => {
1769 if let Ok(mut stmt) = conn.prepare_cached(
1770 "SELECT e1.name, e2.name, t.name
1771 FROM relation r
1772 JOIN entity e1 ON e1.id = r.from_id
1773 JOIN entity e2 ON e2.id = r.to_id
1774 JOIN type_dict t ON t.id = r.type_id
1775 WHERE r.from_id = ?1 AND r.to_id = ?2 AND r.type_id = ?3
1776 AND e1.flags = 0 AND e2.flags = 0
1777 ORDER BY r.from_id, r.to_id"
1778 )
1779 && let Ok(rows) = stmt.query_map(params![fid, tid, tpid], |row| {
1780 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1781 }) {
1782 for row in rows.flatten() { results.push(row); }
1783 }
1784 }
1785 (Some(fid), Some(tid), None) => {
1786 if let Ok(mut stmt) = conn.prepare_cached(
1787 "SELECT e1.name, e2.name, t.name
1788 FROM relation r
1789 JOIN entity e1 ON e1.id = r.from_id
1790 JOIN entity e2 ON e2.id = r.to_id
1791 JOIN type_dict t ON t.id = r.type_id
1792 WHERE r.from_id = ?1 AND r.to_id = ?2
1793 AND e1.flags = 0 AND e2.flags = 0
1794 ORDER BY r.from_id, r.to_id"
1795 )
1796 && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
1797 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1798 }) {
1799 for row in rows.flatten() { results.push(row); }
1800 }
1801 }
1802 (Some(fid), None, Some(tpid)) => {
1803 if let Ok(mut stmt) = conn.prepare_cached(
1804 "SELECT e1.name, e2.name, t.name
1805 FROM relation r
1806 JOIN entity e1 ON e1.id = r.from_id
1807 JOIN entity e2 ON e2.id = r.to_id
1808 JOIN type_dict t ON t.id = r.type_id
1809 WHERE r.from_id = ?1 AND r.type_id = ?2
1810 AND e1.flags = 0 AND e2.flags = 0
1811 ORDER BY r.from_id, r.to_id"
1812 )
1813 && let Ok(rows) = stmt.query_map(params![fid, tpid], |row| {
1814 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1815 }) {
1816 for row in rows.flatten() { results.push(row); }
1817 }
1818 }
1819 (None, Some(tid), Some(tpid)) => {
1820 if let Ok(mut stmt) = conn.prepare_cached(
1821 "SELECT e1.name, e2.name, t.name
1822 FROM relation r
1823 JOIN entity e1 ON e1.id = r.from_id
1824 JOIN entity e2 ON e2.id = r.to_id
1825 JOIN type_dict t ON t.id = r.type_id
1826 WHERE r.to_id = ?1 AND r.type_id = ?2
1827 AND e1.flags = 0 AND e2.flags = 0
1828 ORDER BY r.from_id, r.to_id"
1829 )
1830 && let Ok(rows) = stmt.query_map(params![tid, tpid], |row| {
1831 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1832 }) {
1833 for row in rows.flatten() { results.push(row); }
1834 }
1835 }
1836 (Some(fid), None, None) => {
1837 if let Ok(mut stmt) = conn.prepare_cached(
1838 "SELECT e1.name, e2.name, t.name
1839 FROM relation r
1840 JOIN entity e1 ON e1.id = r.from_id
1841 JOIN entity e2 ON e2.id = r.to_id
1842 JOIN type_dict t ON t.id = r.type_id
1843 WHERE r.from_id = ?1
1844 AND e1.flags = 0 AND e2.flags = 0
1845 ORDER BY r.from_id, r.to_id"
1846 )
1847 && let Ok(rows) = stmt.query_map(params![fid], |row| {
1848 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1849 }) {
1850 for row in rows.flatten() { results.push(row); }
1851 }
1852 }
1853 (None, Some(tid), None) => {
1854 if let Ok(mut stmt) = conn.prepare_cached(
1855 "SELECT e1.name, e2.name, t.name
1856 FROM relation r
1857 JOIN entity e1 ON e1.id = r.from_id
1858 JOIN entity e2 ON e2.id = r.to_id
1859 JOIN type_dict t ON t.id = r.type_id
1860 WHERE r.to_id = ?1
1861 AND e1.flags = 0 AND e2.flags = 0
1862 ORDER BY r.from_id, r.to_id"
1863 )
1864 && let Ok(rows) = stmt.query_map(params![tid], |row| {
1865 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1866 }) {
1867 for row in rows.flatten() { results.push(row); }
1868 }
1869 }
1870 (None, None, Some(tpid)) => {
1871 if let Ok(mut stmt) = conn.prepare_cached(
1872 "SELECT e1.name, e2.name, t.name
1873 FROM relation r
1874 JOIN entity e1 ON e1.id = r.from_id
1875 JOIN entity e2 ON e2.id = r.to_id
1876 JOIN type_dict t ON t.id = r.type_id
1877 WHERE r.type_id = ?1
1878 AND e1.flags = 0 AND e2.flags = 0
1879 ORDER BY r.from_id, r.to_id"
1880 )
1881 && let Ok(rows) = stmt.query_map(params![tpid], |row| {
1882 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1883 }) {
1884 for row in rows.flatten() { results.push(row); }
1885 }
1886 }
1887 (None, None, None) => {
1888 if let Ok(mut stmt) = conn.prepare_cached(
1889 "SELECT e1.name, e2.name, t.name
1890 FROM relation r
1891 JOIN entity e1 ON e1.id = r.from_id
1892 JOIN entity e2 ON e2.id = r.to_id
1893 JOIN type_dict t ON t.id = r.type_id
1894 WHERE e1.flags = 0 AND e2.flags = 0
1895 ORDER BY r.from_id, r.to_id"
1896 )
1897 && let Ok(rows) = stmt.query_map([], |row| {
1898 Ok(Relation { from: row.get(0)?, to: row.get(1)?, relation_type: row.get(2)? })
1899 }) {
1900 for row in rows.flatten() { results.push(row); }
1901 }
1902 }
1903 }
1904 results
1905 }
1906
1907 pub fn find_path(&self, from: &str, to: &str) -> Result<Option<Vec<String>>> {
1908 let conn = self.readers.get();
1909 let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
1910 Some(v) => v,
1911 None => {
1912 return Err(MCSError::InvalidParams(format!(
1913 "Source entity '{from}' not found"
1914 )))
1915 }
1916 };
1917 let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
1918 Some(v) => v,
1919 None => {
1920 return Err(MCSError::InvalidParams(format!(
1921 "Target entity '{to}' not found"
1922 )))
1923 }
1924 };
1925
1926 if from_id == to_id {
1927 return Ok(Some(vec![from.to_string()]));
1928 }
1929
1930 let mut visited = HashSet::new();
1932 let mut parent: FxHashMap<i64, i64> = FxHashMap::default();
1933 let mut queue = VecDeque::new();
1934 visited.insert(from_id);
1935 queue.push_back(from_id);
1936
1937 while let Some(cur) = queue.pop_front() {
1938 if cur == to_id {
1939 break;
1940 }
1941 if let Ok(mut stmt) =
1943 conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
1944 && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1945 for row in rows.flatten() {
1946 if visited.insert(row) {
1947 parent.insert(row, cur);
1948 queue.push_back(row);
1949 }
1950 }
1951 }
1952 if let Ok(mut stmt) =
1954 conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
1955 && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
1956 for row in rows.flatten() {
1957 if visited.insert(row) {
1958 parent.insert(row, cur);
1959 queue.push_back(row);
1960 }
1961 }
1962 }
1963 }
1964
1965 if !parent.contains_key(&to_id) && to_id != from_id {
1966 return Ok(None);
1967 }
1968
1969 let mut path = Vec::new();
1970 let mut cur = to_id;
1971 path.push(cur);
1972 while let Some(&p) = parent.get(&cur) {
1973 path.push(p);
1974 cur = p;
1975 if cur == from_id {
1976 break;
1977 }
1978 }
1979 path.reverse();
1980
1981 let placeholders: Vec<String> = path.iter().map(|_| "?".to_string()).collect();
1982 let sql = format!(
1983 "SELECT id, name FROM entity WHERE id IN ({})",
1984 placeholders.join(",")
1985 );
1986 let name_map: FxHashMap<i64, String> = if let Ok(mut stmt) = conn.prepare(&sql)
1987 && let Ok(rows) = stmt.query_map(
1988 rusqlite::params_from_iter(&path),
1989 |row| Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)),
1990 ) {
1991 rows.flatten().collect()
1992 } else {
1993 FxHashMap::default()
1994 };
1995
1996 let name_path: Vec<String> = path.iter().filter_map(|id| name_map.get(id).cloned()).collect();
1997
1998 Ok(Some(name_path))
1999 }
2000
2001 pub fn compact(&self) -> Result<()> {
2002 let conn = self.writer.lock();
2003 conn.execute_batch("PRAGMA incremental_vacuum;").map_err(sqlite_err)?;
2004 Ok(())
2005 }
2006
2007 pub fn neighbors(
2008 &self,
2009 name: &str,
2010 direction: Direction,
2011 rtype: Option<&str>,
2012 depth: u32,
2013 ) -> Result<String> {
2014 self._traverse(name, direction, rtype, depth, true)
2015 }
2016
2017 pub fn extract_subgraph(
2018 &self,
2019 names: &[String],
2020 depth: u32,
2021 ) -> Result<String> {
2022 if names.is_empty() {
2023 return Ok(r#"{"entities":[],"relations":[]}"#.to_string());
2024 }
2025
2026 let conn = self.readers.get();
2027 let mut all_entity_ids: HashSet<i64> = HashSet::new();
2028 let mut frontier: HashSet<i64> = HashSet::new();
2029 let mut all_rel_pairs: HashSet<(i64, i64, i64)> = HashSet::new();
2030
2031 for name in names {
2033 let h = name_hash(name);
2034 if let Ok(Some(id)) = conn
2035 .query_row(
2036 "SELECT id FROM entity WHERE name_hash = ?1 AND name = ?2 AND flags = 0",
2037 params![h, name],
2038 |row| row.get::<_, i64>(0),
2039 )
2040 .map(Some)
2041 .or_else(|e| if is_not_found(&e) { Ok(None) } else { Err(sqlite_err(e)) })
2042 {
2043 all_entity_ids.insert(id);
2044 frontier.insert(id);
2045 }
2046 }
2047
2048 let mut current_depth = 0u32;
2049 while current_depth < depth && !frontier.is_empty() {
2050 let mut next_frontier: HashSet<i64> = HashSet::new();
2051
2052 const CHUNK: usize = 500;
2055 let frontier_ids: Vec<i64> = frontier.iter().copied().collect();
2056 for chunk in frontier_ids.chunks(CHUNK) {
2057 let placeholders: Vec<String> = chunk.iter().map(|_| "?".to_string()).collect();
2058 let in_clause = placeholders.join(",");
2059
2060 if let Ok(mut stmt) = conn.prepare(
2062 &format!(
2063 "SELECT from_id, to_id, type_id FROM relation WHERE from_id IN ({in_clause})",
2064 )
2065 )
2066 && let Ok(rows) = stmt.query_map(
2067 rusqlite::params_from_iter(chunk),
2068 |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?)),
2069 )
2070 {
2071 for row in rows.flatten() {
2072 let (from_id, to_id, type_id) = row;
2073 all_rel_pairs.insert((from_id, to_id, type_id));
2074 if all_entity_ids.insert(to_id) {
2075 next_frontier.insert(to_id);
2076 }
2077 }
2078 }
2079
2080 if let Ok(mut stmt) = conn.prepare(
2082 &format!(
2083 "SELECT from_id, to_id, type_id FROM relation WHERE to_id IN ({in_clause})",
2084 )
2085 )
2086 && let Ok(rows) = stmt.query_map(
2087 rusqlite::params_from_iter(chunk),
2088 |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?)),
2089 )
2090 {
2091 for row in rows.flatten() {
2092 let (from_id, to_id, type_id) = row;
2093 all_rel_pairs.insert((from_id, to_id, type_id));
2094 if all_entity_ids.insert(from_id) {
2095 next_frontier.insert(from_id);
2096 }
2097 }
2098 }
2099 }
2100 frontier = next_frontier;
2101 current_depth += 1;
2102 }
2103
2104 let entities_json: String = {
2105 if all_entity_ids.is_empty() {
2106 "[]".to_string()
2107 } else {
2108 let ids: Vec<i64> = all_entity_ids.iter().copied().collect();
2109 let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
2110 let sql = format!(
2111 "SELECT COALESCE(json_group_array(json_object(
2112 'name', e.name,
2113 'entityType', t.name,
2114 'observations', COALESCE((
2115 SELECT json_group_array(o.body ORDER BY o.idx)
2116 FROM observation o WHERE o.entity_id = e.id
2117 ), json('[]'))
2118 ) ORDER BY e.id), json('[]'))
2119 FROM entity e
2120 JOIN type_dict t ON t.id = e.type_id
2121 WHERE e.id IN ({}) AND e.flags = 0",
2122 placeholders.join(",")
2123 );
2124 conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
2125 row.get::<_, String>(0)
2126 })
2127 .unwrap_or_else(|_| "[]".to_string())
2128 }
2129 };
2130
2131 let relations_json: String = {
2132 if all_rel_pairs.is_empty() {
2133 "[]".to_string()
2134 } else {
2135 let vals: Vec<String> = all_rel_pairs.iter().map(|_| "(?, ?, ?)".to_string()).collect();
2136 let sql = format!(
2137 "WITH r(from_id, to_id, type_id) AS (VALUES {})
2138 SELECT COALESCE(json_group_array(json_object(
2139 'from', e1.name,
2140 'to', e2.name,
2141 'relationType', t.name
2142 )), json('[]'))
2143 FROM r
2144 JOIN entity e1 ON e1.id = r.from_id
2145 JOIN entity e2 ON e2.id = r.to_id
2146 JOIN type_dict t ON t.id = r.type_id
2147 WHERE e1.flags = 0 AND e2.flags = 0",
2148 vals.join(", ")
2149 );
2150 let params: Vec<&dyn ToSql> = all_rel_pairs.iter()
2151 .flat_map(|(f, t, tp)| {
2152 vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
2153 })
2154 .collect();
2155 let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
2156 stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
2157 .unwrap_or_else(|_| "[]".to_string())
2158 }
2159 };
2160
2161 let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
2162 out.push_str("{\"entities\":");
2163 out.push_str(&entities_json);
2164 out.push_str(",\"relations\":");
2165 out.push_str(&relations_json);
2166 out.push('}');
2167 Ok(out)
2168 }
2169
2170 pub fn describe_entity(&self, name: &str) -> Result<Entity> {
2171 self.get_entity(name)?
2172 .ok_or_else(|| MCSError::InvalidParams(format!("Entity '{name}' not found")))
2173 }
2174
2175 pub fn entity_type_counts(&self) -> Vec<(String, usize)> {
2176 let conn = self.readers.get();
2177 select_all_types(&conn, 0).unwrap_or_default()
2178 }
2179
2180 pub fn relation_type_counts(&self) -> Vec<(String, usize)> {
2181 let conn = self.readers.get();
2182 select_all_types(&conn, 1).unwrap_or_default()
2183 }
2184
2185 pub fn batch_get_entities(&self, names: &[String]) -> Vec<Option<Entity>> {
2186 names
2187 .iter()
2188 .map(|n| self.get_entity(n).unwrap_or(None))
2189 .collect()
2190 }
2191
2192 pub fn find_all_paths(
2193 &self,
2194 from: &str,
2195 to: &str,
2196 max_depth: usize,
2197 max_paths: usize,
2198 ) -> Result<Vec<Vec<String>>> {
2199 let conn = self.readers.get();
2200 let (from_id, _, _, _) = match self.get_entity_id(&conn, from)? {
2201 Some(v) => v,
2202 None => {
2203 return Err(MCSError::InvalidParams(format!(
2204 "Source entity '{from}' not found"
2205 )))
2206 }
2207 };
2208 let (to_id, _, _, _) = match self.get_entity_id(&conn, to)? {
2209 Some(v) => v,
2210 None => {
2211 return Err(MCSError::InvalidParams(format!(
2212 "Target entity '{to}' not found"
2213 )))
2214 }
2215 };
2216
2217 if from_id == to_id {
2218 return Ok(vec![vec![from.to_string()]]);
2219 }
2220
2221 let mut all_paths: Vec<Vec<i64>> = Vec::new();
2223 let mut queue: VecDeque<(i64, Vec<i64>)> = VecDeque::new();
2224 queue.push_back((from_id, vec![from_id]));
2225
2226 const MAX_QUEUE_SIZE: usize = 10_000_000;
2227
2228 while let Some((cur, path)) = queue.pop_front() {
2229 if all_paths.len() >= max_paths {
2230 break;
2231 }
2232 if path.len() > max_depth {
2233 continue;
2234 }
2235
2236 if let Ok(mut stmt) =
2238 conn.prepare_cached("SELECT to_id FROM relation WHERE from_id = ?1")
2239 && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2240 for next_id in rows.flatten() {
2241 if next_id == to_id {
2242 let mut full_path = path.clone();
2243 full_path.push(next_id);
2244 all_paths.push(full_path);
2245 if all_paths.len() >= max_paths {
2246 break;
2247 }
2248 } else if !path.contains(&next_id) && path.len() < max_depth {
2249 if queue.len() >= MAX_QUEUE_SIZE {
2250 return Err(MCSError::InvalidParams(
2251 "Path exploration queue exceeded limit (too many paths on highly connected graph)".to_string()
2252 ));
2253 }
2254 let mut new_path = path.clone();
2255 new_path.push(next_id);
2256 queue.push_back((next_id, new_path));
2257 }
2258 }
2259 }
2260
2261 if let Ok(mut stmt) =
2263 conn.prepare_cached("SELECT from_id FROM relation WHERE to_id = ?1")
2264 && let Ok(rows) = stmt.query_map(params![cur], |row| row.get::<_, i64>(0)) {
2265 for next_id in rows.flatten() {
2266 if next_id == to_id {
2267 let mut full_path = path.clone();
2268 full_path.push(next_id);
2269 all_paths.push(full_path);
2270 if all_paths.len() >= max_paths {
2271 break;
2272 }
2273 } else if !path.contains(&next_id) && path.len() < max_depth {
2274 if queue.len() >= MAX_QUEUE_SIZE {
2275 return Err(MCSError::InvalidParams(
2276 "Path exploration queue exceeded limit (too many paths on highly connected graph)".to_string()
2277 ));
2278 }
2279 let mut new_path = path.clone();
2280 new_path.push(next_id);
2281 queue.push_back((next_id, new_path));
2282 }
2283 }
2284 }
2285 }
2286
2287 let all_ids: HashSet<i64> = all_paths.iter().flat_map(|p| p.iter()).copied().collect();
2289 let id_list: Vec<i64> = all_ids.into_iter().collect();
2290 let name_map: FxHashMap<i64, String> = if id_list.is_empty() {
2291 FxHashMap::default()
2292 } else {
2293 let placeholders: Vec<String> = id_list.iter().map(|_| "?".to_string()).collect();
2294 let sql = format!(
2295 "SELECT id, name FROM entity WHERE id IN ({})",
2296 placeholders.join(",")
2297 );
2298 if let Ok(mut stmt) = conn.prepare(&sql)
2299 && let Ok(rows) = stmt.query_map(
2300 rusqlite::params_from_iter(&id_list),
2301 |row| Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)),
2302 ) {
2303 rows.flatten().collect()
2304 } else {
2305 FxHashMap::default()
2306 }
2307 };
2308
2309 let mut named_paths: Vec<Vec<String>> = Vec::with_capacity(all_paths.len());
2310 for path_ids in all_paths {
2311 let named: Vec<String> = path_ids.iter().filter_map(|id| name_map.get(id).cloned()).collect();
2312 named_paths.push(named);
2313 }
2314
2315 Ok(named_paths)
2316 }
2317
2318 pub fn export(&self, _format: &str, max_rows: i64) -> Result<String> {
2323 let conn = self.readers.get();
2324 conn.query_row(
2327 "SELECT json_object(
2328 'entities', COALESCE((
2329 SELECT json_group_array(json_object(
2330 'name', e.name,
2331 'entityType', t.name,
2332 'observations', COALESCE((
2333 SELECT json_group_array(o.body ORDER BY o.idx)
2334 FROM observation o WHERE o.entity_id = e.id
2335 ), json('[]'))
2336 ) ORDER BY e.id)
2337 FROM (
2338 SELECT id, name, type_id FROM entity
2339 WHERE flags = 0 ORDER BY id LIMIT ?1
2340 ) e
2341 JOIN type_dict t ON t.id = e.type_id
2342 ), json('[]')),
2343 'relations', COALESCE((
2344 SELECT json_group_array(json_object(
2345 'from', e1.name,
2346 'to', e2.name,
2347 'relationType', t.name
2348 ))
2349 FROM (
2350 SELECT from_id, to_id, type_id FROM relation LIMIT ?1
2351 ) r
2352 JOIN entity e1 ON e1.id = r.from_id
2353 JOIN entity e2 ON e2.id = r.to_id
2354 JOIN type_dict t ON t.id = r.type_id
2355 WHERE e1.flags = 0 AND e2.flags = 0
2356 ), json('[]'))
2357 )",
2358 params![max_rows],
2359 |row| row.get::<_, String>(0),
2360 )
2361 .map_err(sqlite_err)
2362 }
2363
2364 pub fn wipe(&self) -> Result<()> {
2365 let conn = self.writer.lock();
2366 conn.execute_batch(
2371 "DELETE FROM observation;
2372 DELETE FROM relation;
2373 DELETE FROM entity;
2374 DELETE FROM type_dict;
2375 INSERT INTO name_fts(name_fts) VALUES('delete-all');
2376 INSERT INTO obs_fts(obs_fts) VALUES('delete-all');
2377 UPDATE graph_stat SET value = 0 WHERE key IN ('entities', 'relations', 'observations');
2378 UPDATE graph_stat SET value = 0 WHERE key IN ('entity_seq', 'obs_seq');",
2379 )
2380 .map_err(sqlite_err)?;
2381 self.seq_entity.store(0, Ordering::Relaxed);
2382 self.seq_obs.store(0, Ordering::Relaxed);
2383 self.cache.lock().clear();
2384 Ok(())
2385 }
2386
2387 pub fn run_maintenance(&self) -> Result<()> {
2390 let conn = self.writer.lock();
2391
2392 conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
2393 .map_err(sqlite_err)?;
2394
2395 conn.execute_batch("PRAGMA optimize(0x10000);")
2396 .map_err(sqlite_err)?;
2397
2398 conn.execute_batch(
2399 "INSERT INTO name_fts(name_fts) VALUES('optimize');
2400 INSERT INTO obs_fts(obs_fts) VALUES('optimize');",
2401 )
2402 .map_err(sqlite_err)?;
2403
2404 Ok(())
2405 }
2406
2407 pub fn checkpoint_passive(&self) -> Result<()> {
2411 let conn = self.writer.lock();
2412 conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE);")
2413 .map_err(sqlite_err)?;
2414 Ok(())
2415 }
2416
2417 fn _traverse(
2418 &self,
2419 name: &str,
2420 direction: Direction,
2421 rtype: Option<&str>,
2422 depth: u32,
2423 _include_relations: bool,
2425 ) -> Result<String> {
2426 let conn = self.readers.get();
2427 let (start_id, _, _, _) = match self.get_entity_id(&conn, name)? {
2428 Some(v) => v,
2429 None => {
2430 return Err(MCSError::InvalidParams(format!(
2431 "Entity '{name}' not found"
2432 )))
2433 }
2434 };
2435
2436 let mut all_ids: HashSet<i64> = HashSet::new();
2437 let mut all_rels: HashSet<(i64, i64, i64)> = HashSet::new();
2438 let mut frontier: HashSet<i64> = HashSet::new();
2439 all_ids.insert(start_id);
2440 frontier.insert(start_id);
2441
2442 let type_filter: Option<i64> = rtype
2448 .filter(|rt| !rt.is_empty())
2449 .map(|rt| lookup_type_id(&conn, rt, 1).unwrap_or(-1));
2450
2451 let mut q_out_t = conn.prepare_cached(
2453 "SELECT to_id, type_id FROM relation WHERE from_id = ?1 AND type_id = ?2");
2454 let mut q_out = conn.prepare_cached(
2455 "SELECT to_id, type_id FROM relation WHERE from_id = ?1");
2456 let mut q_in_t = conn.prepare_cached(
2457 "SELECT from_id, type_id FROM relation WHERE to_id = ?1 AND type_id = ?2");
2458 let mut q_in = conn.prepare_cached(
2459 "SELECT from_id, type_id FROM relation WHERE to_id = ?1");
2460
2461 let mut cur_depth = 0u32;
2462 while cur_depth < depth && !frontier.is_empty() {
2463 let mut next_frontier: HashSet<i64> = HashSet::new();
2464
2465 for &fid in &frontier {
2466 if direction == Direction::Outgoing || direction == Direction::Both {
2467 if let Some(tid) = type_filter {
2468 if let Ok(ref mut stmt) = q_out_t
2469 && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
2470 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2471 }) {
2472 for row in rows.flatten() {
2473 let (to_id, t_id) = row;
2474 all_rels.insert((fid, to_id, t_id));
2475 if all_ids.insert(to_id) { next_frontier.insert(to_id); }
2476 }
2477 }
2478 } else if let Ok(ref mut stmt) = q_out
2479 && let Ok(rows) = stmt.query_map(params![fid], |row| {
2480 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2481 }) {
2482 for row in rows.flatten() {
2483 let (to_id, t_id) = row;
2484 all_rels.insert((fid, to_id, t_id));
2485 if all_ids.insert(to_id) { next_frontier.insert(to_id); }
2486 }
2487 }
2488 }
2489
2490 if direction == Direction::Incoming || direction == Direction::Both {
2491 if let Some(tid) = type_filter {
2492 if let Ok(ref mut stmt) = q_in_t
2493 && let Ok(rows) = stmt.query_map(params![fid, tid], |row| {
2494 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2495 }) {
2496 for row in rows.flatten() {
2497 let (from_id, t_id) = row;
2498 all_rels.insert((from_id, fid, t_id));
2499 if all_ids.insert(from_id) { next_frontier.insert(from_id); }
2500 }
2501 }
2502 } else if let Ok(ref mut stmt) = q_in
2503 && let Ok(rows) = stmt.query_map(params![fid], |row| {
2504 Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
2505 }) {
2506 for row in rows.flatten() {
2507 let (from_id, t_id) = row;
2508 all_rels.insert((from_id, fid, t_id));
2509 if all_ids.insert(from_id) { next_frontier.insert(from_id); }
2510 }
2511 }
2512 }
2513 }
2514
2515 frontier = next_frontier;
2516 cur_depth += 1;
2517 }
2518
2519 let entities_json: String = {
2520 if all_ids.is_empty() {
2521 "[]".to_string()
2522 } else {
2523 let ids: Vec<i64> = all_ids.iter().copied().collect();
2524 let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
2525 let sql = format!(
2526 "SELECT COALESCE(json_group_array(json_object(
2527 'name', e.name,
2528 'entityType', t.name,
2529 'observations', COALESCE((
2530 SELECT json_group_array(o.body ORDER BY o.idx)
2531 FROM observation o WHERE o.entity_id = e.id
2532 ), json('[]'))
2533 ) ORDER BY e.id), json('[]'))
2534 FROM entity e
2535 JOIN type_dict t ON t.id = e.type_id
2536 WHERE e.id IN ({}) AND e.flags = 0",
2537 placeholders.join(",")
2538 );
2539 conn.query_row(&sql, rusqlite::params_from_iter(&ids), |row| {
2540 row.get::<_, String>(0)
2541 })
2542 .unwrap_or_else(|_| "[]".to_string())
2543 }
2544 };
2545
2546 let relations_json: String = {
2547 if all_rels.is_empty() {
2548 "[]".to_string()
2549 } else {
2550 let vals: Vec<String> = all_rels.iter().map(|_| "(?, ?, ?)".to_string()).collect();
2551 let sql = format!(
2552 "WITH r(from_id, to_id, type_id) AS (VALUES {})
2553 SELECT COALESCE(json_group_array(json_object(
2554 'from', e1.name,
2555 'to', e2.name,
2556 'relationType', t.name
2557 )), json('[]'))
2558 FROM r
2559 JOIN entity e1 ON e1.id = r.from_id
2560 JOIN entity e2 ON e2.id = r.to_id
2561 JOIN type_dict t ON t.id = r.type_id
2562 WHERE e1.flags = 0 AND e2.flags = 0",
2563 vals.join(", ")
2564 );
2565 let params: Vec<&dyn ToSql> = all_rels.iter()
2566 .flat_map(|(f, t, tp)| {
2567 vec![f as &dyn ToSql, t as &dyn ToSql, tp as &dyn ToSql]
2568 })
2569 .collect();
2570 let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
2571 stmt.query_row(params.as_slice(), |row| row.get::<_, String>(0))
2572 .unwrap_or_else(|_| "[]".to_string())
2573 }
2574 };
2575
2576 let mut out = String::with_capacity(32 + entities_json.len() + relations_json.len());
2577 out.push_str("{\"entities\":");
2578 out.push_str(&entities_json);
2579 out.push_str(",\"relations\":");
2580 out.push_str(&relations_json);
2581 out.push('}');
2582 Ok(out)
2583 }
2584}
2585
2586#[cfg(test)]
2589mod tests {
2590 use super::*;
2591 use serde_json::Value;
2592 use std::ops::Deref;
2593 use std::path::PathBuf;
2594
    /// Test fixture pairing a `GraphHandle` (field `0`) with the path of its
    /// temporary database file (field `1`). The `Drop` impl removes that file
    /// and its `-wal`/`-shm` companions.
    struct TestKg(GraphHandle, PathBuf);
2596
2597 impl Deref for TestKg {
2598 type Target = GraphHandle;
2599 fn deref(&self) -> &GraphHandle {
2600 &self.0
2601 }
2602 }
2603
2604 impl Drop for TestKg {
2605 fn drop(&mut self) {
2606 cleanup_db(&self.1);
2607 }
2608 }
2609
2610 fn cleanup_db(path: &std::path::Path) {
2611 let _ = std::fs::remove_file(path);
2612 let _ = std::fs::remove_file(path.with_extension("db-wal"));
2613 let _ = std::fs::remove_file(path.with_extension("db-shm"));
2614 }
2615
2616 fn new_kg() -> TestKg {
2617 use std::sync::atomic::AtomicU64;
2618 use std::sync::atomic::Ordering;
2619 static COUNTER: AtomicU64 = AtomicU64::new(0);
2620 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
2621 let dir = std::env::temp_dir();
2622 let path = dir.join(format!("kg_test_{}_{}.db", std::process::id(), n));
2623 cleanup_db(&path);
2624 let kg = GraphHandle::new(&path, Durability::Async, SqliteTuning::default(), NonZeroUsize::new(10000).unwrap(), 4).expect("create KG");
2625 TestKg(kg, path)
2626 }
2627
2628 #[test]
2629 fn test_create_and_get_entity() {
2630 let kg = new_kg();
2631 let entities = vec![Entity {
2632 name: "test".into(),
2633 entity_type: "person".into(),
2634 observations: vec!["obs1".into(), "obs2".into()],
2635 }];
2636 let created = kg.create_entities(&entities).unwrap();
2637 assert_eq!(created.len(), 1);
2638
2639 let got = kg.get_entity("test").unwrap().unwrap();
2640 assert_eq!(got.name, "test");
2641 assert_eq!(got.entity_type, "person");
2642 assert_eq!(got.observations, vec!["obs1", "obs2"]);
2643 }
2644
2645 #[test]
2646 fn test_get_nonexistent() {
2647 let kg = new_kg();
2648 assert!(kg.get_entity("nonexistent").unwrap().is_none());
2649 }
2650
2651 #[test]
2652 fn test_delete_entity() {
2653 let kg = new_kg();
2654 kg.create_entities(&[Entity {
2655 name: "del".into(),
2656 entity_type: "t".into(),
2657 observations: vec![],
2658 }])
2659 .unwrap();
2660 assert!(kg.get_entity("del").unwrap().is_some());
2661 kg.delete_entities(&["del".to_string()]).unwrap();
2662 assert!(kg.get_entity("del").unwrap().is_none());
2663 }
2664
2665 #[test]
2666 fn test_add_and_delete_observations() {
2667 let kg = new_kg();
2668 kg.create_entities(&[Entity {
2669 name: "obs_test".into(),
2670 entity_type: "t".into(),
2671 observations: vec!["a".into()],
2672 }])
2673 .unwrap();
2674
2675 let added = kg.add_observations("obs_test", &["b".into(), "c".into()]).unwrap();
2676 assert_eq!(added.len(), 2);
2677
2678 let ent = kg.get_entity("obs_test").unwrap().unwrap();
2679 assert!(ent.observations.contains(&"b".into()));
2680 assert!(ent.observations.contains(&"c".into()));
2681
2682 kg.delete_observations("obs_test", &["b".into()]).unwrap();
2683 let ent = kg.get_entity("obs_test").unwrap().unwrap();
2684 assert!(!ent.observations.contains(&"b".into()));
2685 assert!(ent.observations.contains(&"c".into()));
2686 assert!(ent.observations.contains(&"a".into()));
2687 }
2688
2689 #[test]
2690 fn test_create_relations() {
2691 let kg = new_kg();
2692 kg.create_entities(&[
2693 Entity {
2694 name: "A".into(),
2695 entity_type: "node".into(),
2696 observations: vec![],
2697 },
2698 Entity {
2699 name: "B".into(),
2700 entity_type: "node".into(),
2701 observations: vec![],
2702 },
2703 ])
2704 .unwrap();
2705
2706 let rels = kg
2707 .create_relations(&[Relation {
2708 from: "A".into(),
2709 to: "B".into(),
2710 relation_type: "edge".into(),
2711 }])
2712 .unwrap();
2713 assert_eq!(rels.len(), 1);
2714
2715 assert_eq!(kg.get_entity_count().unwrap(), 2);
2716 assert_eq!(kg.get_relation_count().unwrap(), 1);
2717 }
2718
2719 #[test]
2720 fn test_search_nodes() {
2721 let kg = new_kg();
2722 kg.create_entities(&[Entity {
2723 name: "Einstein".into(),
2724 entity_type: "scientist".into(),
2725 observations: vec!["physics".into(), "relativity".into()],
2726 }])
2727 .unwrap();
2728
2729 let results = kg.search_nodes_filtered("physics", None, 0, 10);
2730 assert_eq!(results.len(), 1);
2731 assert_eq!(results[0].name, "Einstein");
2732
2733 let results = kg.search_nodes_filtered("physics", Some("scientist"), 0, 10);
2734 assert_eq!(results.len(), 1);
2735
2736 let results = kg.search_nodes_filtered("physics", Some("nonexistent"), 0, 10);
2737 assert_eq!(results.len(), 0);
2738 }
2739
2740 #[test]
2741 fn test_find_path() {
2742 let kg = new_kg();
2743 kg.create_entities(&[
2744 Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2745 Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2746 Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2747 ]).unwrap();
2748
2749 kg.create_relations(&[
2750 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2751 Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2752 ]).unwrap();
2753
2754 let path = kg.find_path("A", "C").unwrap().unwrap();
2755 assert_eq!(path, vec!["A", "B", "C"]);
2756 }
2757
2758 #[test]
2759 fn test_degree() {
2760 let kg = new_kg();
2761 kg.create_entities(&[
2762 Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2763 Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2764 Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2765 ]).unwrap();
2766
2767 kg.create_relations(&[
2768 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2769 Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2770 ]).unwrap();
2771
2772 assert_eq!(kg.degree("A", Direction::Outgoing).unwrap(), 2);
2773 assert_eq!(kg.degree("A", Direction::Incoming).unwrap(), 0);
2774 assert_eq!(kg.degree("B", Direction::Incoming).unwrap(), 1);
2775 }
2776
2777 #[test]
2778 fn test_neighbors() {
2779 let kg = new_kg();
2780 kg.create_entities(&[
2781 Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2782 Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2783 ]).unwrap();
2784
2785 kg.create_relations(&[Relation {
2786 from: "A".into(), to: "B".into(), relation_type: "e".into(),
2787 }]).unwrap();
2788
2789 let result = kg.neighbors("A", Direction::Outgoing, None, 1).unwrap();
2790 let v: Value = serde_json::from_str(&result).unwrap();
2791 assert_eq!(v["entities"].as_array().unwrap().len(), 2);
2792 assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2793 }
2794
2795 #[test]
2796 fn test_open_nodes() {
2797 let kg = new_kg();
2798 kg.create_entities(&[
2799 Entity { name: "X".into(), entity_type: "n".into(), observations: vec!["obs_x".into()] },
2800 Entity { name: "Y".into(), entity_type: "n".into(), observations: vec!["obs_y".into()] },
2801 ]).unwrap();
2802
2803 kg.create_relations(&[Relation {
2804 from: "X".into(), to: "Y".into(), relation_type: "e".into(),
2805 }]).unwrap();
2806
2807 let result = kg.open_nodes(&["X".into()]);
2808 let v: Value = serde_json::from_str(&result).unwrap();
2809 assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2810 assert_eq!(v["relations"].as_array().unwrap().len(), 1);
2811 }
2812
2813 #[test]
2814 fn test_entities_exist() {
2815 let kg = new_kg();
2816 kg.create_entities(&[Entity {
2817 name: "exists".into(), entity_type: "t".into(), observations: vec![],
2818 }]).unwrap();
2819
2820 let res = kg.entities_exist(&["exists".into(), "missing".into()]).unwrap();
2821 assert_eq!(res, vec![true, false]);
2822 }
2823
2824 #[test]
2825 fn test_describe_entity() {
2826 let kg = new_kg();
2827 kg.create_entities(&[Entity {
2828 name: "desc".into(), entity_type: "t".into(), observations: vec!["o".into()],
2829 }]).unwrap();
2830
2831 let entity = kg.describe_entity("desc").unwrap();
2832 assert_eq!(entity.name, "desc");
2833 }
2834
2835 #[test]
2836 fn test_entity_type_counts() {
2837 let kg = new_kg();
2838 kg.create_entities(&[
2839 Entity { name: "a".into(), entity_type: "person".into(), observations: vec![] },
2840 Entity { name: "b".into(), entity_type: "person".into(), observations: vec![] },
2841 Entity { name: "c".into(), entity_type: "place".into(), observations: vec![] },
2842 ]).unwrap();
2843
2844 let counts = kg.entity_type_counts();
2845 let map: FxHashMap<_, _> = counts.into_iter().collect();
2846 assert_eq!(map.get("person"), Some(&2));
2847 assert_eq!(map.get("place"), Some(&1));
2848 }
2849
2850 #[test]
2851 fn test_relation_type_counts() {
2852 let kg = new_kg();
2853 kg.create_entities(&[
2854 Entity { name: "a".into(), entity_type: "n".into(), observations: vec![] },
2855 Entity { name: "b".into(), entity_type: "n".into(), observations: vec![] },
2856 Entity { name: "c".into(), entity_type: "n".into(), observations: vec![] },
2857 ]).unwrap();
2858
2859 kg.create_relations(&[
2860 Relation { from: "a".into(), to: "b".into(), relation_type: "knows".into() },
2861 Relation { from: "a".into(), to: "c".into(), relation_type: "knows".into() },
2862 ]).unwrap();
2863
2864 let counts = kg.relation_type_counts();
2865 let map: FxHashMap<_, _> = counts.into_iter().collect();
2866 assert_eq!(map.get("knows"), Some(&2));
2867 }
2868
2869 #[test]
2870 fn test_upsert_entities() {
2871 let kg = new_kg();
2872 kg.create_entities(&[Entity {
2873 name: "u".into(), entity_type: "old".into(), observations: vec!["existing".into()],
2874 }]).unwrap();
2875
2876 kg.upsert_entities(&[Entity {
2878 name: "u".into(), entity_type: "new".into(), observations: vec!["existing".into(), "added".into()],
2879 }]).unwrap();
2880
2881 let ent = kg.get_entity("u").unwrap().unwrap();
2882 assert_eq!(ent.entity_type, "new");
2883 assert!(ent.observations.contains(&"added".into()));
2884 assert!(ent.observations.contains(&"existing".into()));
2885 }
2886
2887 #[test]
2888 fn test_merge_entities() {
2889 let kg = new_kg();
2890 kg.create_entities(&[
2891 Entity { name: "source".into(), entity_type: "t".into(), observations: vec!["src_obs".into()] },
2892 Entity { name: "target".into(), entity_type: "t".into(), observations: vec!["tgt_obs".into()] },
2893 ]).unwrap();
2894
2895 kg.create_relations(&[Relation {
2896 from: "source".into(), to: "target".into(), relation_type: "e".into(),
2897 }]).unwrap();
2898
2899 let merged = kg.merge_entities("source", "target").unwrap();
2900 assert_eq!(merged.name, "target");
2901 assert_eq!(kg.get_entity("source").unwrap().is_none(), true);
2902 }
2903
2904 #[test]
2905 fn test_find_all_paths() {
2906 let kg = new_kg();
2907 kg.create_entities(&[
2908 Entity { name: "A".into(), entity_type: "n".into(), observations: vec![] },
2909 Entity { name: "B".into(), entity_type: "n".into(), observations: vec![] },
2910 Entity { name: "C".into(), entity_type: "n".into(), observations: vec![] },
2911 ]).unwrap();
2912
2913 kg.create_relations(&[
2914 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
2915 Relation { from: "B".into(), to: "C".into(), relation_type: "e".into() },
2916 Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
2917 ]).unwrap();
2918
2919 let paths = kg.find_all_paths("A", "C", 5, 10).unwrap();
2920 assert!(paths.len() >= 2);
2921 }
2922
2923 #[test]
2924 fn test_batch_get_entities() {
2925 let kg = new_kg();
2926 kg.create_entities(&[
2927 Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
2928 Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
2929 ]).unwrap();
2930
2931 let results = kg.batch_get_entities(&["a".into(), "missing".into(), "b".into()]);
2932 assert_eq!(results.len(), 3);
2933 assert!(results[0].is_some());
2934 assert!(results[1].is_none());
2935 assert!(results[2].is_some());
2936 }
2937
2938 #[test]
2939 fn test_export_graph() {
2940 let kg = new_kg();
2941 kg.create_entities(&[Entity {
2942 name: "exp".into(), entity_type: "t".into(), observations: vec!["o".into()],
2943 }]).unwrap();
2944
2945 let exported = kg.export("json", i64::MAX).unwrap();
2946 assert!(exported.contains("exp"));
2947 assert!(exported.contains("o"));
2948 }
2949
2950 #[test]
2951 fn test_graph_stats() {
2952 let kg = new_kg();
2953 assert_eq!(kg.get_entity_count().unwrap(), 0);
2954 assert_eq!(kg.get_relation_count().unwrap(), 0);
2955
2956 kg.create_entities(&[Entity {
2957 name: "s".into(), entity_type: "t".into(), observations: vec![],
2958 }]).unwrap();
2959
2960 assert_eq!(kg.get_entity_count().unwrap(), 1);
2961 }
2962
2963 #[test]
2964 fn test_read_graph_filtered() {
2965 let kg = new_kg();
2966 kg.create_entities(&[
2967 Entity { name: "p1".into(), entity_type: "person".into(), observations: vec![] },
2968 Entity { name: "p2".into(), entity_type: "place".into(), observations: vec![] },
2969 ]).unwrap();
2970
2971 let out = kg.read_graph_filtered(Some("person"), 0, 10).unwrap();
2972 let v: Value = serde_json::from_str(&out).unwrap();
2973 assert_eq!(v["entities"].as_array().unwrap().len(), 1);
2974 assert_eq!(v["entities"][0]["name"], "p1");
2975 }
2976
2977 #[test]
2978 fn test_wipe() {
2979 let kg = new_kg();
2980 kg.create_entities(&[Entity {
2981 name: "w".into(), entity_type: "t".into(), observations: vec!["o".into()],
2982 }]).unwrap();
2983 assert_eq!(kg.get_entity_count().unwrap(), 1);
2984
2985 kg.wipe().unwrap();
2986 assert_eq!(kg.get_entity_count().unwrap(), 0);
2987 }
2988
2989 #[test]
2990 fn test_push_json_str() {
2991 let mut buf = String::new();
2992 push_json_str(&mut buf, "hello");
2993 assert_eq!(buf, "\"hello\"");
2994 let mut buf = String::new();
2995 push_json_str(&mut buf, "he\"llo");
2996 assert_eq!(buf, "\"he\\\"llo\"");
2997 }
2998
2999 #[test]
3002 fn test_create_entities_empty_input() {
3003 let kg = new_kg();
3004 let created = kg.create_entities(&[]).unwrap();
3005 assert!(created.is_empty());
3006 }
3007
3008 #[test]
3009 fn test_create_entities_skip_empty_name() {
3010 let kg = new_kg();
3011 let created = kg.create_entities(&[Entity {
3012 name: "".into(),
3013 entity_type: "t".into(),
3014 observations: vec![],
3015 }])
3016 .unwrap();
3017 assert!(created.is_empty());
3018 assert_eq!(kg.get_entity_count().unwrap(), 0);
3019 }
3020
3021 #[test]
3022 fn test_create_entities_duplicate_names() {
3023 let kg = new_kg();
3024 let e = Entity {
3025 name: "dup".into(),
3026 entity_type: "t".into(),
3027 observations: vec!["obs".into()],
3028 };
3029 let first = kg.create_entities(&[e.clone()]).unwrap();
3030 assert_eq!(first.len(), 1);
3031 let second = kg.create_entities(&[e.clone()]).unwrap();
3032 assert!(second.is_empty());
3033 assert_eq!(kg.get_entity_count().unwrap(), 1);
3034 }
3035
3036 #[test]
3037 fn test_create_entities_partial_duplicates() {
3038 let kg = new_kg();
3039 let created = kg.create_entities(&[
3040 Entity { name: "a".into(), entity_type: "t".into(), observations: vec![] },
3041 Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
3042 ]).unwrap();
3043 assert_eq!(created.len(), 2);
3044
3045 let second = kg.create_entities(&[
3046 Entity { name: "b".into(), entity_type: "t".into(), observations: vec![] },
3047 Entity { name: "c".into(), entity_type: "t".into(), observations: vec![] },
3048 ]).unwrap();
3049 assert_eq!(second.len(), 1); assert_eq!(second[0].name, "c");
3051 assert_eq!(kg.get_entity_count().unwrap(), 3);
3052 }
3053
3054 #[test]
3055 fn test_create_entities_mixed_empty_and_valid() {
3056 let kg = new_kg();
3057 let created = kg.create_entities(&[
3058 Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
3059 Entity { name: "valid".into(), entity_type: "t".into(), observations: vec![] },
3060 Entity { name: "".into(), entity_type: "t".into(), observations: vec![] },
3061 ]).unwrap();
3062 assert_eq!(created.len(), 1);
3063 assert_eq!(created[0].name, "valid");
3064 assert_eq!(kg.get_entity_count().unwrap(), 1);
3065 }
3066
3067 #[test]
3068 fn test_create_entities_same_name_in_batch() {
3069 let kg = new_kg();
3070 let created = kg.create_entities(&[
3071 Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
3072 Entity { name: "dup_in_batch".into(), entity_type: "t".into(), observations: vec![] },
3073 ]).unwrap();
3074 assert_eq!(created.len(), 1);
3075 assert_eq!(kg.get_entity_count().unwrap(), 1);
3076 }
3077
3078 #[test]
3081 fn test_create_relations_empty_input() {
3082 let kg = new_kg();
3083 let rels = kg.create_relations(&[]).unwrap();
3084 assert!(rels.is_empty());
3085 }
3086
3087 #[test]
3088 fn test_create_relations_nonexistent_from() {
3089 let kg = new_kg();
3090 kg.create_entities(&[Entity {
3091 name: "B".into(), entity_type: "t".into(), observations: vec![],
3092 }]).unwrap();
3093
3094 let rels = kg.create_relations(&[Relation {
3095 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3096 }]).unwrap();
3097 assert!(rels.is_empty());
3098 assert_eq!(kg.get_relation_count().unwrap(), 0);
3099 }
3100
3101 #[test]
3102 fn test_create_relations_nonexistent_to() {
3103 let kg = new_kg();
3104 kg.create_entities(&[Entity {
3105 name: "A".into(), entity_type: "t".into(), observations: vec![],
3106 }]).unwrap();
3107
3108 let rels = kg.create_relations(&[Relation {
3109 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3110 }]).unwrap();
3111 assert!(rels.is_empty());
3112 assert_eq!(kg.get_relation_count().unwrap(), 0);
3113 }
3114
3115 #[test]
3116 fn test_create_relations_both_nonexistent() {
3117 let kg = new_kg();
3118 let rels = kg.create_relations(&[Relation {
3119 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3120 }]).unwrap();
3121 assert!(rels.is_empty());
3122 }
3123
3124 #[test]
3125 fn test_create_relations_self_loop() {
3126 let kg = new_kg();
3127 kg.create_entities(&[Entity {
3128 name: "self".into(), entity_type: "t".into(), observations: vec![],
3129 }]).unwrap();
3130
3131 let rels = kg.create_relations(&[Relation {
3132 from: "self".into(), to: "self".into(), relation_type: "loop".into(),
3133 }]).unwrap();
3134 assert_eq!(rels.len(), 1);
3135 assert_eq!(kg.get_relation_count().unwrap(), 1);
3136 assert_eq!(kg.degree("self", Direction::Outgoing).unwrap(), 1);
3137 assert_eq!(kg.degree("self", Direction::Incoming).unwrap(), 1);
3138 }
3139
3140 #[test]
3141 fn test_create_relations_duplicate() {
3142 let kg = new_kg();
3143 kg.create_entities(&[
3144 Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3145 Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3146 ]).unwrap();
3147
3148 let r = Relation {
3149 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3150 };
3151 let first = kg.create_relations(&[r.clone()]).unwrap();
3152 assert_eq!(first.len(), 1);
3153
3154 let second = kg.create_relations(&[r.clone()]).unwrap();
3155 assert!(second.is_empty());
3156 assert_eq!(kg.get_relation_count().unwrap(), 1);
3157 }
3158
3159 #[test]
3160 fn test_create_relations_new_type_auto_created() {
3161 let kg = new_kg();
3162 kg.create_entities(&[
3163 Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3164 Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3165 ]).unwrap();
3166
3167 let rels = kg.create_relations(&[Relation {
3168 from: "A".into(), to: "B".into(), relation_type: "brand_new_type".into(),
3169 }]).unwrap();
3170 assert_eq!(rels.len(), 1);
3171
3172 let counts = kg.relation_type_counts();
3173 let map: FxHashMap<_, _> = counts.into_iter().collect();
3174 assert_eq!(map.get("brand_new_type"), Some(&1));
3175 }
3176
3177 #[test]
3178 fn test_create_relations_degree_updates() {
3179 let kg = new_kg();
3180 kg.create_entities(&[
3181 Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3182 Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3183 Entity { name: "C".into(), entity_type: "t".into(), observations: vec![] },
3184 ]).unwrap();
3185
3186 kg.create_relations(&[
3187 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
3188 Relation { from: "A".into(), to: "C".into(), relation_type: "e".into() },
3189 ]).unwrap();
3190
3191 assert_eq!(kg.degree("A", Direction::Outgoing).unwrap(), 2);
3192 assert_eq!(kg.degree("A", Direction::Incoming).unwrap(), 0);
3193 assert_eq!(kg.degree("B", Direction::Incoming).unwrap(), 1);
3194 assert_eq!(kg.degree("C", Direction::Incoming).unwrap(), 1);
3195 assert_eq!(kg.degree("A", Direction::Both).unwrap(), 2);
3196 }
3197
3198 #[test]
3199 fn test_create_relations_delete_and_recreate() {
3200 let kg = new_kg();
3201 kg.create_entities(&[
3202 Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3203 Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3204 ]).unwrap();
3205
3206 let r = Relation {
3207 from: "A".into(), to: "B".into(), relation_type: "e".into(),
3208 };
3209 kg.create_relations(&[r.clone()]).unwrap();
3210 assert_eq!(kg.get_relation_count().unwrap(), 1);
3211
3212 kg.delete_relations(&[r.clone()]).unwrap();
3213 assert_eq!(kg.get_relation_count().unwrap(), 0);
3214
3215 let re = kg.create_relations(&[r.clone()]).unwrap();
3217 assert_eq!(re.len(), 1);
3218 assert_eq!(kg.get_relation_count().unwrap(), 1);
3219 }
3220
3221 #[test]
3224 fn test_create_entities_then_relations_then_delete_entity_with_relations() {
3225 let kg = new_kg();
3226 kg.create_entities(&[
3227 Entity { name: "A".into(), entity_type: "t".into(), observations: vec![] },
3228 Entity { name: "B".into(), entity_type: "t".into(), observations: vec![] },
3229 ]).unwrap();
3230 kg.create_relations(&[
3231 Relation { from: "A".into(), to: "B".into(), relation_type: "e".into() },
3232 ]).unwrap();
3233
3234 assert_eq!(kg.get_relation_count().unwrap(), 1);
3235
3236 kg.delete_entities(&["A".into()]).unwrap();
3238 assert_eq!(kg.get_entity("A").unwrap().is_none(), true);
3239 assert_eq!(kg.get_relation_count().unwrap(), 0);
3240 }
3241
3242 #[test]
3243 fn test_graph_stats_after_entity_with_observations() {
3244 let kg = new_kg();
3245 kg.create_entities(&[Entity {
3246 name: "stat".into(), entity_type: "t".into(),
3247 observations: vec!["o1".into(), "o2".into(), "o3".into()],
3248 }]).unwrap();
3249
3250 let ecount = kg.get_entity_count().unwrap();
3251 assert_eq!(ecount, 1);
3253
3254 kg.delete_entities(&["stat".into()]).unwrap();
3256 assert_eq!(kg.get_entity_count().unwrap(), 0);
3257 }
3258
3259 fn new_kg_with_pool(read_pool_size: usize) -> TestKg {
3262 use std::sync::atomic::AtomicU64;
3263 static COUNTER: AtomicU64 = AtomicU64::new(1_000_000);
3264 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
3265 let path = std::env::temp_dir().join(format!("kg_pool_{}_{}.db", std::process::id(), n));
3266 cleanup_db(&path);
3267 let kg = GraphHandle::new(
3268 &path,
3269 Durability::Async,
3270 SqliteTuning::default(),
3271 NonZeroUsize::new(10_000).unwrap(),
3272 read_pool_size,
3273 )
3274 .expect("create KG");
3275 TestKg(kg, path)
3276 }
3277
3278 fn seed_line(kg: &GraphHandle, n: usize) {
3279 let entities: Vec<Entity> = (0..n)
3280 .map(|i| Entity {
3281 name: format!("n{i}"),
3282 entity_type: "node".into(),
3283 observations: vec![format!("obs of n{i}")],
3284 })
3285 .collect();
3286 kg.create_entities(&entities).unwrap();
3287 let rels: Vec<Relation> = (0..n.saturating_sub(1))
3288 .map(|i| Relation {
3289 from: format!("n{i}"),
3290 to: format!("n{}", i + 1),
3291 relation_type: "edge".into(),
3292 })
3293 .collect();
3294 if !rels.is_empty() {
3295 kg.create_relations(&rels).unwrap();
3296 }
3297 }
3298
3299 fn count_relations(graph_json: &str) -> usize {
3300 let v: Value = serde_json::from_str(graph_json).unwrap();
3301 v["relations"].as_array().unwrap().len()
3302 }
3303
3304 fn count_entities(graph_json: &str) -> usize {
3305 let v: Value = serde_json::from_str(graph_json).unwrap();
3306 v["entities"].as_array().unwrap().len()
3307 }
3308
3309 #[test]
3312 fn test_pool_size_one_still_works() {
3313 let kg = new_kg_with_pool(1);
3314 seed_line(&kg, 5);
3315 assert_eq!(kg.get_entity_count().unwrap(), 5);
3316 assert!(kg.get_entity("n2").unwrap().is_some());
3317 let g = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
3318 assert_eq!(count_entities(&g), 5);
3319 }
3320
3321 #[test]
3322 fn test_reads_see_committed_writes() {
3323 let kg = new_kg_with_pool(4);
3326 kg.create_entities(&[Entity {
3327 name: "fresh".into(),
3328 entity_type: "t".into(),
3329 observations: vec!["v".into()],
3330 }])
3331 .unwrap();
3332 let got = kg.get_entity("fresh").unwrap().unwrap();
3334 assert_eq!(got.observations, vec!["v"]);
3335 }
3336
3337 #[test]
3338 fn test_concurrent_readers_consistent() {
3339 let kg = new_kg_with_pool(4);
3342 seed_line(&kg, 50);
3343
3344 std::thread::scope(|s| {
3345 for _ in 0..8 {
3347 s.spawn(|| {
3348 for _ in 0..200 {
3349 let _ = kg.get_entity("n10");
3350 let _ = kg.search_nodes_filtered("obs", None, 0, 10);
3351 let _ = kg.read_graph_filtered(None, 0, 100);
3352 let _ = kg.get_entity_count();
3353 let _ = kg.neighbors("n10", Direction::Both, None, 2);
3354 }
3355 });
3356 }
3357 s.spawn(|| {
3359 for i in 100..160 {
3360 kg.create_entities(&[Entity {
3361 name: format!("w{i}"),
3362 entity_type: "node".into(),
3363 observations: vec![format!("w obs {i}")],
3364 }])
3365 .unwrap();
3366 }
3367 });
3368 });
3369
3370 assert_eq!(kg.get_entity_count().unwrap(), 110);
3372 assert!(kg.get_entity("w159").unwrap().is_some());
3373 }
3374
3375 #[test]
3376 fn test_reader_pool_rejects_writes_internally() {
3377 let kg = new_kg_with_pool(1);
3383 seed_line(&kg, 3);
3384 std::thread::scope(|s| {
3385 for _ in 0..4 {
3386 s.spawn(|| {
3387 for _ in 0..100 {
3388 let _ = kg.read_graph_filtered(None, 0, 10);
3389 }
3390 });
3391 }
3392 });
3393 assert_eq!(kg.get_entity_count().unwrap(), 3);
3394 }
3395
3396 #[test]
3399 fn test_read_graph_relations_scoped_to_page() {
3400 let kg = new_kg_with_pool(2);
3401 seed_line(&kg, 4);
3403
3404 let full = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
3406 assert_eq!(count_entities(&full), 4);
3407 assert_eq!(count_relations(&full), 3);
3408
3409 let page1 = kg.read_graph_filtered(None, 0, 1).unwrap();
3412 assert_eq!(count_entities(&page1), 1);
3413 assert_eq!(count_relations(&page1), 0);
3414
3415 let page2 = kg.read_graph_filtered(None, 0, 2).unwrap();
3418 assert_eq!(count_entities(&page2), 2);
3419 assert_eq!(count_relations(&page2), 1);
3420 }
3421
3422 #[test]
3423 fn test_read_graph_pagination_offset() {
3424 let kg = new_kg_with_pool(2);
3425 seed_line(&kg, 5);
3426 let g = kg.read_graph_filtered(None, 2, 2).unwrap();
3427 assert_eq!(count_entities(&g), 2);
3428 assert!(!g.contains("\"n0\""));
3430 assert!(!g.contains("\"n1\""));
3431 assert!(g.contains("\"n2\""));
3432 assert!(g.contains("\"n3\""));
3433 }
3434
3435 #[test]
3436 fn test_read_graph_empty() {
3437 let kg = new_kg_with_pool(2);
3438 let g = kg.read_graph_filtered(None, 0, usize::MAX).unwrap();
3439 assert_eq!(g, r#"{"entities":[],"relations":[]}"#);
3440 }
3441
3442 #[test]
3443 fn test_read_graph_filtered_by_type() {
3444 let kg = new_kg_with_pool(2);
3445 kg.create_entities(&[
3446 Entity { name: "p1".into(), entity_type: "person".into(), observations: vec![] },
3447 Entity { name: "q1".into(), entity_type: "place".into(), observations: vec![] },
3448 Entity { name: "p2".into(), entity_type: "person".into(), observations: vec![] },
3449 ])
3450 .unwrap();
3451 let g = kg.read_graph_filtered(Some("person"), 0, usize::MAX).unwrap();
3452 assert_eq!(count_entities(&g), 2);
3453 assert!(g.contains("\"p1\""));
3454 assert!(g.contains("\"p2\""));
3455 assert!(!g.contains("\"q1\""));
3456 }
3457
3458 #[test]
3459 fn test_export_respects_max_rows() {
3460 let kg = new_kg_with_pool(2);
3461 seed_line(&kg, 5);
3462
3463 let full = kg.export("json", i64::MAX).unwrap();
3465 assert_eq!(count_entities(&full), 5);
3466 assert_eq!(count_relations(&full), 4);
3467
3468 let capped = kg.export("json", 2).unwrap();
3470 assert_eq!(count_entities(&capped), 2);
3471 assert_eq!(count_relations(&capped), 2);
3472 }
3473
3474 #[test]
3475 fn test_export_negative_max_rows_is_unbounded() {
3476 let kg = new_kg_with_pool(2);
3477 seed_line(&kg, 3);
3478 let out = kg.export("json", -1).unwrap();
3480 assert_eq!(count_entities(&out), 3);
3481 }
3482
3483 #[test]
3486 fn test_many_small_write_batches_stay_consistent() {
3487 let kg = new_kg_with_pool(2);
3488 for i in 0..100 {
3489 kg.create_entities(&[Entity {
3490 name: format!("e{i}"),
3491 entity_type: "t".into(),
3492 observations: vec![format!("o{i}")],
3493 }])
3494 .unwrap();
3495 }
3496 assert_eq!(kg.get_entity_count().unwrap(), 100);
3497 let hits = kg.search_nodes_filtered("e57", None, 0, 10);
3500 assert!(hits.iter().any(|e| e.name == "e57"));
3501 }
3502
3503 #[test]
3506 fn test_wipe_clears_name_and_obs_fts() {
3507 let kg = new_kg_with_pool(2);
3508 kg.create_entities(&[Entity {
3509 name: "Einstein".into(),
3510 entity_type: "scientist".into(),
3511 observations: vec!["physics".into()],
3512 }])
3513 .unwrap();
3514
3515 assert_eq!(kg.search_nodes_filtered("Einstein", None, 0, 10).len(), 1);
3517 assert_eq!(kg.search_nodes_filtered("physics", None, 0, 10).len(), 1);
3518
3519 kg.wipe().unwrap();
3520
3521 assert_eq!(kg.get_entity_count().unwrap(), 0);
3524 assert!(kg.search_nodes_filtered("Einstein", None, 0, 10).is_empty());
3525 assert!(kg.search_nodes_filtered("physics", None, 0, 10).is_empty());
3526 }
3527
3528 #[test]
3529 fn test_wipe_then_recreate_search_works() {
3530 let kg = new_kg_with_pool(2);
3533 kg.create_entities(&[Entity {
3534 name: "Einstein".into(),
3535 entity_type: "scientist".into(),
3536 observations: vec!["physics".into()],
3537 }])
3538 .unwrap();
3539 kg.wipe().unwrap();
3540
3541 kg.create_entities(&[Entity {
3542 name: "Einstein".into(),
3543 entity_type: "scientist".into(),
3544 observations: vec!["physics".into(), "relativity".into()],
3545 }])
3546 .unwrap();
3547
3548 let by_name = kg.search_nodes_filtered("Einstein", None, 0, 10);
3549 assert_eq!(by_name.len(), 1, "exactly one Einstein after recreate");
3550 let by_obs = kg.search_nodes_filtered("relativity", None, 0, 10);
3551 assert_eq!(by_obs.len(), 1);
3552 assert_eq!(kg.get_entity_count().unwrap(), 1);
3553 }
3554
3555 #[test]
3558 fn test_search_relations_missing_type_returns_empty() {
3559 let kg = new_kg_with_pool(2);
3560 seed_line(&kg, 3); let r = kg.search_relations(None, None, Some("does_not_exist"));
3564 assert!(r.is_empty());
3565 let types = kg.relation_type_counts();
3567 assert!(types.iter().all(|(t, _)| t != "does_not_exist"));
3568 }
3569
3570 #[test]
3571 fn test_search_relations_missing_from_returns_empty() {
3572 let kg = new_kg_with_pool(2);
3573 seed_line(&kg, 3);
3574 let r = kg.search_relations(Some("ghost"), None, None);
3575 assert!(r.is_empty(), "missing 'from' must not match every relation");
3576 }
3577
3578 #[test]
3579 fn test_search_relations_existing_filters_still_work() {
3580 let kg = new_kg_with_pool(2);
3581 seed_line(&kg, 3);
3582 let r = kg.search_relations(Some("n0"), None, Some("edge"));
3583 assert_eq!(r.len(), 1);
3584 assert_eq!(r[0].from, "n0");
3585 assert_eq!(r[0].to, "n1");
3586 }
3587
3588 #[test]
3589 fn test_neighbors_missing_type_returns_only_start() {
3590 let kg = new_kg_with_pool(2);
3591 seed_line(&kg, 3);
3592 let json = kg
3593 .neighbors("n0", Direction::Both, Some("nonexistent"), 2)
3594 .unwrap();
3595 assert_eq!(count_entities(&json), 1);
3597 assert_eq!(count_relations(&json), 0);
3598 }
3599
3600 #[test]
3601 fn test_neighbors_existing_type_filters() {
3602 let kg = new_kg_with_pool(2);
3603 kg.create_entities(&[
3604 Entity { name: "a".into(), entity_type: "n".into(), observations: vec![] },
3605 Entity { name: "b".into(), entity_type: "n".into(), observations: vec![] },
3606 Entity { name: "c".into(), entity_type: "n".into(), observations: vec![] },
3607 ])
3608 .unwrap();
3609 kg.create_relations(&[
3610 Relation { from: "a".into(), to: "b".into(), relation_type: "knows".into() },
3611 Relation { from: "a".into(), to: "c".into(), relation_type: "likes".into() },
3612 ])
3613 .unwrap();
3614 let json = kg.neighbors("a", Direction::Outgoing, Some("knows"), 1).unwrap();
3615 assert!(json.contains("\"b\""));
3616 assert!(!json.contains("\"c\""));
3617 assert_eq!(count_relations(&json), 1);
3618 }
3619
3620 #[test]
3621 fn test_sqlite_tuning_applied_to_fresh_db() {
3622 use std::sync::atomic::AtomicU64;
3623 static COUNTER: AtomicU64 = AtomicU64::new(2_000_000);
3624 let n = COUNTER.fetch_add(1, Ordering::SeqCst);
3625 let path = std::env::temp_dir().join(format!("kg_tuning_{}_{}.db", std::process::id(), n));
3626 cleanup_db(&path);
3627
3628 let tuning = SqliteTuning {
3629 page_size: 8192,
3630 ..SqliteTuning::default()
3631 };
3632 let kg = TestKg(
3633 GraphHandle::new(&path, Durability::Async, tuning, NonZeroUsize::new(64).unwrap(), 2)
3634 .expect("create KG"),
3635 path.clone(),
3636 );
3637 kg.create_entities(&[Entity {
3638 name: "a".into(),
3639 entity_type: "n".into(),
3640 observations: vec!["o".into()],
3641 }])
3642 .unwrap();
3643
3644 let probe = Connection::open(&path).unwrap();
3647 let page_size: i64 = probe.query_row("PRAGMA page_size", [], |r| r.get(0)).unwrap();
3648 assert_eq!(page_size, 8192);
3649 let auto_vacuum: i64 = probe.query_row("PRAGMA auto_vacuum", [], |r| r.get(0)).unwrap();
3650 assert_eq!(auto_vacuum, 2, "expected INCREMENTAL auto_vacuum");
3651 let journal: String = probe.query_row("PRAGMA journal_mode", [], |r| r.get(0)).unwrap();
3652 assert_eq!(journal.to_lowercase(), "wal");
3653 }
3654
3655 #[test]
3656 fn test_checkpoint_passive_is_noop_safe() {
3657 let kg = new_kg();
3658 kg.checkpoint_passive().unwrap();
3660 kg.create_entities(&[Entity {
3661 name: "a".into(),
3662 entity_type: "n".into(),
3663 observations: vec!["o".into()],
3664 }])
3665 .unwrap();
3666 kg.checkpoint_passive().unwrap();
3668 kg.checkpoint_passive().unwrap();
3669 assert!(kg.get_entity("a").unwrap().is_some());
3671 }
3672}